Source code for Galaxia_ananke.Output

#!/usr/bin/env python
"""
Contains the Output class definition

Please note that this module is private. The Output class is
available in the main ``Galaxia`` namespace - use that instead.
"""
from __future__ import annotations
from typing import TYPE_CHECKING, Optional, Tuple, List, Dict, Iterable
from numpy.typing import NDArray, ArrayLike
from warnings import warn
from functools import cached_property
import concurrent.futures
import gc
import pathlib
import itertools
import numpy as np
import h5py as h5
import ebf
import vaex
import pandas as pd
from astropy import units, coordinates
from astropy.utils import classproperty

from ._constants import *
from ._templates import *
from ._defaults import *
from .utils import CallableDFtoNone, common_entries
from .photometry.PhotoSystem import PhotoSystem
from . import Input

if TYPE_CHECKING:
    from . import Survey

__all__ = ['Output']


[docs] def shift_g_lon(lon): # restrict longitude values to be within (-180,180) return -((-lon+180)%360-180)
def _decorate_post_processing(pp: CallableDFtoNone) -> CallableDFtoNone: def new_pp(*args) -> None: pp(*args) gc.collect() return new_pp
[docs] class Output: _position_prop = (('px', 'py', 'pz'), "Position coordinates in $kpc$") _velocity_prop = (('vx', 'vy', 'vz'), "Velocity coordinates in $km/s$") _celestial_prop = (('ra', 'dec'), "Celestial equatorial coordinates in $degrees$") _galactic_prop = (('glon', 'glat'), "Celestial galactic coordinates in $degrees$") _distance_prop = ('rad', "Distance in $kpc$") _modulus_prop = ('dmod', "Distance modulus in magnitude units") _trgbmass_prop = ('mtip', "Tip of the Red Giant Branch stellar mass in solar masses") _currentmass_prop = ('mact', "Current stellar mass in solar masses") _zamsmass_prop = ('smass', "Zero Age Main Sequence stellar mass in solar masses") _age_prop = ('age', "Stellar ages in years and decimal logarithmic scale") _surfacegravity_prop = ('grav', "Surface gravity in CGS units and decimal logarithmic scale") _metallicity_prop = ('feh', "Stellar metallicity $[Fe/H]$ in $dex$ relative to solar") _temperature_prop = ('teff', "Surface temperature in Kelvin and decimal logarithmic scale") _luminosity_prop = ('lum', "Stellar luminosity in solar luminosities and decimal logarithmic scale") _parentindex_prop = Input._parentindex_prop _partitionindex_prop = Input._partitionindex_prop _satindex_prop = Input._populationindex_prop _satindex = 'satid' _particleflag_prop = ('partid', "Flag = 1 if star not at center of its parent particle") _parallax_prop = ('pi', "Parallax in milliarcseconds") _propermotion_prop = (('mura', 'mudec'), "Equatorial proper motions in milliarcseconds per year") _galacticpropermotion_prop = (('mul', 'mub'), "Galactic proper motions in milliarcseconds per year") _radialvelocity_prop = ('vr', "Radial velocity in $km/s$") _vaex_under_list = ['_repr_html_']
[docs] def __init__(self, survey: Survey, parameters: dict) -> None: """ Driver to exploit the output of Galaxia. Call signature:: output = Output(survey, parameters) Parameters ---------- survey : :obj:`Survey` Survey object that returned this output. parameters : dict Dictionary all of parameters passed by Survey that were used to generate this output. Notes ----- An Output object almost behaves as a vaex DataFrame, also please consult ``vaex`` online tutorials for more hands-on information: https://vaex.io/docs/tutorial.html The DataFrame represents the catalogue with columns corresponding to properties of the stars from the synthetic stellar population it simulates. .. warning:: When generated directly by ``Galaxia_ananke``, the catalogue properties reflect directly the quantities as computed by Galaxia. However the catalogue can be modified/amended by applying post-processing routines using the method ``apply_post_process_pipeline_and_flush``. Also if such ``Output`` object was generated by other software than ``Galaxia_ananke``, post-processing may have been applied: also please refer to that software documentation for a more complete overview of the catalogue. The catalogue properties include the photometric magnitudes per filter, with each filter identified by a key in the following lowercase format: ``photosys_filtername`` where * ``photo_sys`` corresponds to the chosen photometric system * ``filtername`` corresponds to a filter name of that system As an example, the photometry in filters ``gbp``, ``grp`` & ``g`` of the Gaia DR2 system identified as ``GAIA__DR2`` are respectively under keys ``gaia__dr2_gbp``, ``gaia__dr2_grp`` & ``gaia__dr2_g``. With those are also always included the following properties: {_output_properties} Additionally, depending on what optional properties were provided with the input particle data, the output can also include the following properties: {_optional_properties} """ self.__survey = survey self.__parameters = parameters self.__vaex = None self.__vaex_per_partition = None self.__path = None self.__clear_ebfs(force=True)
@classproperty def _export_properties(cls): return { cls._position_prop, cls._velocity_prop, cls._celestial_prop, cls._galactic_prop, cls._distance_prop, cls._modulus_prop, cls._trgbmass_prop, cls._currentmass_prop, cls._zamsmass_prop, cls._age_prop, cls._surfacegravity_prop, cls._metallicity_prop, cls._temperature_prop, cls._luminosity_prop, cls._parentindex_prop, cls._particleflag_prop, cls._partitionindex_prop } @classproperty def _postprocess_properties(cls): return { cls._parallax_prop, cls._propermotion_prop, cls._galacticpropermotion_prop, cls._radialvelocity_prop } @classproperty def _all_optional_properties(cls): return Input._optional_properties \ - {cls._parentindex_prop, cls._partitionindex_prop, cls._satindex_prop} \ | {(cls._satindex, cls._satindex_prop[1])} @classproperty def _export_keys(cls): return tuple(sum([(_p[0],) if isinstance(_p[0], str) else _p[0] for _p in cls._export_properties], ())) @classproperty def _postprocess_keys(cls): return tuple(sum([(_p[0],) if isinstance(_p[0], str) else _p[0] for _p in cls._postprocess_properties], ())) @classproperty def _pos(cls): return cls._position_prop[0] @classproperty def _vel(cls): return cls._velocity_prop[0] @classproperty def _cel(cls): return cls._celestial_prop[0] @classproperty def _gal(cls): return cls._galactic_prop[0] @classproperty def _rad(cls): return cls._distance_prop[0] @classproperty def _dmod(cls): return cls._modulus_prop[0] @classproperty def _mtip(cls): return cls._trgbmass_prop[0] @classproperty def _mact(cls): return cls._currentmass_prop[0] @classproperty def _mini(cls): return cls._zamsmass_prop[0] @classproperty def _age(cls): return cls._age_prop[0] @classproperty def _grav(cls): return cls._surfacegravity_prop[0] @classproperty def _feh(cls): return cls._metallicity_prop[0] @classproperty def _teff(cls): return cls._temperature_prop[0] @classproperty def _lum(cls): return cls._luminosity_prop[0] @classproperty def _parentid(cls): return cls._parentindex_prop[0] @classproperty def _partitionid(cls): return cls._partitionindex_prop[0] @classproperty def _partid(cls): return cls._particleflag_prop[0] @classproperty def _pi(cls): return cls._parallax_prop[0] @classproperty def _mu(cls): return cls._propermotion_prop[0] @classproperty def _mugal(cls): return cls._galacticpropermotion_prop[0] @classproperty def _vr(cls): return cls._radialvelocity_prop[0] def __dir__(self): return sorted({i for i in self.__vaex.__dir__() if not i.startswith('_')}.union( super(Output, self).__dir__()).union( self._vaex_under_list if self.__vaex is not None else [])) def __repr__(self) -> str: return repr(self._vaex) def __getitem__(self, item): return self._vaex[item] def __setitem__(self, item, value): self._vaex[item] = value def __getattr__(self, item): if (item in self.__vaex.__dir__() and not item.startswith('_')) or (item in self._vaex_under_list and self.__vaex is not None): return getattr(self.__vaex, item) else: return self.__getattribute__(item) @classmethod def _compile_export_mag_names(cls, photosystems: list[PhotoSystem]) -> Tuple[str]: return tuple(itertools.chain.from_iterable([photosystem.to_export_keys for photosystem in photosystems])) @classmethod def _make_export_keys(cls, photosystems: list[PhotoSystem], extra_keys=()) -> Tuple[str]: return tuple(set(cls._export_keys).union(extra_keys).union(cls._compile_export_mag_names(photosystems))) @classmethod def _make_catalogue_keys(cls, photosystems: list[PhotoSystem], extra_keys=()) -> Tuple[str]: return cls._make_export_keys(photosystems, extra_keys=cls._postprocess_keys+extra_keys) def _make_input_optional_keys(self) -> Tuple[str]: return tuple(k if k != self._satindex_prop[0] else self._satindex for k in self.survey.input.optional_keys()) def __ebf_to_hdf5_older(self): warn('This method is deprecated and does nothing at this time, this will be removed in future versions', DeprecationWarning, stacklevel=2) return hdf5_file = self.__hdf5 with h5.File(hdf5_file, 'w') as f5: for k in self.export_keys: # print(f"Exporting {k}...") f5.create_dataset(name=k, data=ebf.read(str(self.__ebf), f"/{k}")) print(f"Exported the following quantities to {hdf5_file}") print(list(f5.keys())) self.__vaex = vaex.open(hdf5_file) def __ebf_to_hdf5_old(self): warn('This method is deprecated and does nothing at this time, this will be removed in future versions', DeprecationWarning, stacklevel=2) return for i, hdf5_file, partition_slices, partition_indices in common_entries(self._hdf5s, self.__ebf_part_slices, self.__ebf_partitions): with h5.File(hdf5_file, 'w') as f5: for k in self.export_keys: # print(f"Exporting {k}...") # f5.create_dataset(name=k, data=ebf.read_ind(str(self._ebf), f"/{k}", partition_indices)) data = np.zeros(partition_indices.shape[0], dtype=ebf.read_ind(str(self.__ebf), f"/{k}", [0]).dtype) head = 0 for p_slice in partition_slices: data[head:(head:=head+p_slice.stop-p_slice.start)] = ebf.read(str(self.__ebf), f"/{k}", begin=p_slice.start, end=p_slice.stop) f5.create_dataset(name=k, data=data) print(f"Exported the following quantities to {hdf5_file} for partition {i}") print(list(f5.keys())) self.__vaex = vaex.open_many(map(str,self._hdf5s.values())) def _ebf_to_hdf5(self) -> None: ebfs: List[pathlib.Path] = self._ebfs export_keys: Tuple[str] = self.export_keys with concurrent.futures.ThreadPoolExecutor() as executor: # credit to https://www.squash.io/how-to-parallelize-a-simple-python-loop/ # Submit tasks to the executor futures = [executor.submit(self.__singlethread_ebf_to_hdf5, i, hdf5_file, part_slices_in_ebfs, part_lengths_in_ebfs, ebfs, export_keys) for i, hdf5_file, part_slices_in_ebfs, part_lengths_in_ebfs in common_entries(self._hdf5s, self.__ebfs_part_slices, self.__ebfs_part_lengths)] # Collect the results _ = [future.result() for future in concurrent.futures.as_completed(futures)] self.__reload_vaex() @classmethod def __singlethread_ebf_to_hdf5(cls, i: int, hdf5_file: pathlib.Path, part_slices_in_ebfs: Dict[str, List[slice]], part_lengths_in_ebfs: Dict[str, int], ebfs: List[pathlib.Path], export_keys: Tuple[str]) -> None: n_ebfs: int = len(ebfs) data_length: int = sum(part_lengths_in_ebfs.values()) ebfs_slices: Dict[slice] = {ebf_path.name: slice(bounds[0],bounds[1]) for ebf_path, bounds in zip(ebfs, np.repeat(np.cumsum( [0]+[part_lengths_in_ebfs[ebf_path.name] for ebf_path in ebfs] ), [1]+(n_ebfs-1)*[2]+[1] ).reshape((n_ebfs,2)))} ebf_sorter: NDArray = (i + np.arange(n_ebfs)) % n_ebfs i_ebf: int = ebf_sorter[0] first_ebf_str: str = str(ebfs[i_ebf].resolve()) with h5.File(hdf5_file, 'w') as f5: f5datasets = {name: f5.create_dataset(name=name, shape=(data_length,), dtype=ebf.read_ind(first_ebf_str, f"/{name}", [0]).dtype) for name in export_keys} for ebf_path in ebfs[i_ebf:]+ebfs[:i_ebf]: ebf_name: str = ebf_path.name ebf_str: str = str(ebf_path.resolve()) f5data_slice: slice = ebfs_slices[ebf_name] part_slices: List[slice] = part_slices_in_ebfs[ebf_name] for name in export_keys: head = f5data_slice.start for p_slice in part_slices: f5datasets[name][head:(head:=head+p_slice.stop-p_slice.start)] = ebf.read( ebf_str, f"/{name}", begin=p_slice.start, end=p_slice.stop ) print(f"Exported the following quantities from {ebf_path} to {hdf5_file} for partition {i}") print(list(f5.keys())) ### DEFINING POST PROCESSING PIPELINES BELOW # TODO consider a PostProcess class that runs postprocess pipeline at __call__ and holds flush_with_columns @classmethod def __pp_convert_cartesian_to_galactic(cls, df: pd.DataFrame) -> None: """ converts positions & velocities from mock catalog Cartesian coordinates (relative to solar position) into Galactic coordinates, assuming Sun is on -x axis (use rotateStars) """ GC = coordinates.Galactic(u = df[cls._pos[0]].to_numpy()*units.kpc, v = df[cls._pos[1]].to_numpy()*units.kpc, w = df[cls._pos[2]].to_numpy()*units.kpc, U = df[cls._vel[0]].to_numpy()*units.km/units.s, V = df[cls._vel[1]].to_numpy()*units.km/units.s, W = df[cls._vel[2]].to_numpy()*units.km/units.s, representation_type = coordinates.CartesianRepresentation, differential_type = coordinates.CartesianDifferential) df[cls._gal[0]] = shift_g_lon(GC.spherical.lon.value) df[cls._gal[1]] = GC.spherical.lat.value df[cls._rad] = GC.spherical.distance.value #################################### df[cls._mugal[0]] = GC.sphericalcoslat.differentials['s'].d_lon_coslat.value df[cls._mugal[1]] = GC.sphericalcoslat.differentials['s'].d_lat.value df[cls._vr] = GC.sphericalcoslat.differentials['s'].d_distance.value @classmethod def __pp_convert_galactic_to_icrs(cls, df: pd.DataFrame) -> None: """ converts PMs in galactic coordinates (mulcosb, mub) in arcsec/yr (as output by Galaxia) to ra/dec in mas/yr (units of output catalog) """ GC = coordinates.Galactic(l = df[cls._gal[0]].to_numpy()*units.degree, b = df[cls._gal[1]].to_numpy()*units.degree, distance = df[cls._rad].to_numpy()*units.kpc, pm_l_cosb = df[cls._mugal[0]].to_numpy()*units.mas/units.yr, pm_b = df[cls._mugal[1]].to_numpy()*units.mas/units.yr, radial_velocity = df[cls._vr].to_numpy()*units.km/units.s) GC_icrs = GC.transform_to(coordinates.ICRS()) df[cls._cel[0]] = GC_icrs.ra.value df[cls._cel[1]] = GC_icrs.dec.value #################################### df[cls._mu[0]] = GC_icrs.pm_ra_cosdec.to(units.mas/units.yr).value df[cls._mu[1]] = GC_icrs.pm_dec.to(units.mas/units.yr).value @classmethod def __pp_convert_icrs_to_galactic(cls, df: pd.DataFrame) -> None: """ converts PMs from ICRS coordinates (muacosd, mudec) to Galactic (mul, mub) input and output in mas/yr for PMs and degrees for positions also exports the galactic lat and longitude """ IC = coordinates.ICRS(ra = df[cls._cel[0]].to_numpy()*units.degree, dec = df[cls._cel[1]].to_numpy()*units.degree, pm_ra_cosdec = df[cls._mu[0]].to_numpy()*units.mas/units.yr, pm_dec = df[cls._mu[1]].to_numpy()*units.mas/units.yr) IC_gal = IC.transform_to(coordinates.Galactic()) df[cls._gal[0]] = shift_g_lon(IC_gal.l.value) df[cls._gal[1]] = IC_gal.b.value df[cls._mugal[0]] = IC_gal.pm_l_cosb.to(units.mas/units.yr).value df[cls._mugal[1]] = IC_gal.pm_b.to(units.mas/units.yr).value @classmethod def __pp_last_conversions(cls, df: pd.DataFrame) -> None: df[cls._pi] = 1.0/df[cls._rad] # parallax in mas (from distance in kpc) df[cls._teff] = 10**df[cls._teff] #Galaxia returns log10(teff/K) df[cls._lum] = 10**df[cls._lum] #Galaxia returns log10(lum/lsun)
[docs] def apply_post_process_pipeline_and_flush(self, post_process: CallableDFtoNone, *args, flush_with_columns=(), hold_flush: bool = False) -> None: """ Apply a given post processing routine to the catalogue Parameters ---------- post_process : callable Post processing pipeline to apply to the catalogue. This must be defined as a callable that returns nothing, and take only positional arguments, the first of which being the DataFrame representing the catalogue. \*args : callable args Any other positinoal arguments that should be passed to the ``post_process`` callable pipeline, in the order they should be passed. flush_with_columns : iterable If given an iterable structure of existing column keys, the flushing done after application of the post-processing will also overwrite those in the backend file with their current in-memory values. Default to an empty tuple. hold_flush : bool Flag to hold the flushing from being done after application of the post-processing. Default to False. """ # post_process(self._vaex, *args) with concurrent.futures.ThreadPoolExecutor() as executor: # credit to https://www.squash.io/how-to-parallelize-a-simple-python-loop/ # Submit tasks to the executor futures = [executor.submit(_decorate_post_processing(post_process), vaex_df, *args) for vaex_df in self._vaex_per_partition.values()] # Collect the results _ = [future.result() for future in concurrent.futures.as_completed(futures)] if not(hold_flush): self.flush_extra_columns_to_hdf5(with_columns=flush_with_columns)
def _post_process(self) -> None: self._pp_convert_cartesian_to_galactic() self._pp_convert_galactic_to_icrs() self._pp_last_conversions() def _pp_convert_cartesian_to_galactic(self) -> None: pipeline_name = "convert_cartesian_to_galactic" print(f"Running {pipeline_name} post-processing pipeline") self.apply_post_process_pipeline_and_flush(self.__pp_convert_cartesian_to_galactic, flush_with_columns=self._gal+(self._rad,)) def _pp_convert_galactic_to_icrs(self) -> None: pipeline_name = "convert_galactic_to_icrs" print(f"Running {pipeline_name} post-processing pipeline") self.apply_post_process_pipeline_and_flush(self.__pp_convert_galactic_to_icrs, flush_with_columns=self._cel) def _pp_convert_icrs_to_galactic(self) -> None: pipeline_name = "convert_icrs_to_galactic" print(f"Running {pipeline_name} post-processing pipeline") self.apply_post_process_pipeline_and_flush(self.__pp_convert_icrs_to_galactic, flush_with_columns=self._gal+self._mugal) def _pp_last_conversions(self) -> None: pipeline_name = "last_conversions" print(f"Running {pipeline_name} post-processing pipeline") self.apply_post_process_pipeline_and_flush(self.__pp_last_conversions, flush_with_columns=(self._teff, self._lum)) def __name_with_ext(self, ext): name_base = self._file_base return name_base.parent / f"{name_base.name}{ext}"
[docs] def save(self, path): # TODO Gotta update this """ Save output to new path .. danger:: currently not implemented """ raise NotImplementedError old_path = self._path self.__path = pathlib.Path(path) self._vaex.close() old_path.rename(self._path) self.__vaex = vaex.open(self._path)
@property def survey(self): return self.__survey @property def photosystems(self): return self.survey.photosystems @property def isochrones(self): warn('This property will be deprecated, please use instead property photosystems', DeprecationWarning, stacklevel=2) return self.photosystems @property def export_keys(self) -> Tuple[str]: return self._make_export_keys(self.photosystems, extra_keys=self._make_input_optional_keys()) @property def catalogue_keys(self) -> Tuple[str]: return self._make_catalogue_keys(self.photosystems, extra_keys=self._make_input_optional_keys()) @property def output_dir(self): return pathlib.Path(self._parameters[FTTAGS.output_dir]) @property def output_name(self): return f"{self.survey.surveyname}.{self.survey.inputname}" @property def rsun_skycoord(self): _temp = [self._parameters[k] for k in FTTAGS.rSun] return coordinates.SkyCoord(u=_temp[0], v=_temp[1], w=_temp[2], unit='kpc', representation_type='cartesian', frame='galactic') @property def _parameters(self): return self.__parameters @cached_property def __ebf_partitions(self) -> Dict[int, NDArray]: warn('This property is deprecated as remnant of the single ebf output implementation, this will be removed in future versions', DeprecationWarning, stacklevel=2) if self.__ebf.exists(): return pd.DataFrame(ebf.read(str(self.__ebf), f"/{self._partitionid}")).groupby([0]).indices else: raise RuntimeError("Don't attempt creating an Output object on your own, those are meant to be returned by Survey") @cached_property def __ebfs_partitions(self) -> Dict[int, Dict[str, NDArray]]: return_dict = {} for ebf_path in self._ebfs: if ebf_path.exists(): ebf_name: str = ebf_path.name ebf_str: str = str(ebf_path.resolve()) for i, ind in pd.DataFrame(ebf.read(ebf_str, f"/{self._partitionid}")).groupby([0]).indices.items(): if i in return_dict: return_dict[i][ebf_name] = ind else: return_dict[i] = {ebf_name: ind} else: raise RuntimeError("Don't attempt creating an Output object on your own, those are meant to be returned by Survey") return return_dict @property def __ebfs_part_lengths(self) -> Dict[int, Dict[str, int]]: return {i: {ebf_name: len(indices) for ebf_name,indices in indices_per_ebf.items()} for i,indices_per_ebf in self.__ebfs_partitions.items()} @cached_property def __ebf_part_slices(self) -> Dict[int, List[slice]]: warn('This property is deprecated as remnant of the single ebf output implementation, this will be removed in future versions', DeprecationWarning, stacklevel=2) return {i: [slice(start, stop) for start, stop in zip( [indices[0]]+indices[where_slice_change+1].tolist(), (indices[where_slice_change]+1).tolist() + [indices[-1]+1] )] for i,indices in self.__ebf_partitions.items() if (where_slice_change:=np.where(np.diff(indices)>1)[0]) is not None} @cached_property def __ebfs_part_slices(self) -> Dict[int, Dict[str, List[slice]]]: return {i: {ebf_name: [slice(start, stop) for start, stop in zip( [indices[0]]+indices[where_slice_change+1].tolist(), (indices[where_slice_change]+1).tolist() + [indices[-1]+1] )] for ebf_name,indices in indices_per_ebf.items() if (where_slice_change:=np.where(np.diff(indices)>1)[0]) is not None} for i,indices_per_ebf in self.__ebfs_partitions.items()} @cached_property def __vaex_partitions(self) -> Dict[int, NDArray]: return self._vaex[self._partitionid].to_pandas_series().to_frame().groupby([0]).indices @cached_property def __vaex_partition_slices(self) -> Dict[int, slice]: return {i: slice(indices[0], indices[-1]+1) for i,indices in self.__vaex_partitions.items()} @property def _vaex(self): if self.__vaex is None: raise RuntimeError("Don't attempt creating an Output object on your own, those are meant to be returned by Survey") else: return self.__vaex @property def _vaex_per_partition(self): if self.__vaex_per_partition is None: raise RuntimeError("Don't attempt creating an Output object on your own, those are meant to be returned by Survey") else: return self.__vaex_per_partition @property def _path(self): raise NotImplementedError if self.__path is None: return self.__hdf5 else: return self.__path @property def _file_base(self): return self.output_dir / self.output_name @cached_property def __ebf(self): warn('This property is deprecated as remnant of the single ebf/single hdf5 output implementation, this will be removed in future versions', DeprecationWarning, stacklevel=2) return next(self._ebf_glob) @property def _ebf_glob_pattern(self): return self.__name_with_ext('.*.ebf') @property def _ebf_glob(self): _temp = self._ebf_glob_pattern return _temp.parent.glob(_temp.name) @cached_property def _ebfs(self): return list(self._ebf_glob) def __clear_ebfs(self, force: bool = False) -> None: for ebf in self._ebf_glob: if True if force else input(f"You are about to remove {ebf}, do I proceed? [y/N] ") == 'y': ebf.unlink() @cached_property def __hdf5(self): warn('This property is deprecated as remnant of the single ebf/single hdf5 output implementation, this will be removed in future versions', DeprecationWarning, stacklevel=2) return self.__name_with_ext('.h5') @property def _hdf5_glob_pattern(self): return self.__name_with_ext('.*.h5') @cached_property def _hdf5s(self): pattern = self._hdf5_glob_pattern partitions = self.__ebfs_partitions length_tags = len(str(max(partitions.keys()))) return {i: pattern.parent / pattern.name.replace('*',f"{i:0{length_tags}d}") for i in partitions.keys()} def __flush_extra_columns_to_hdf5_older(self, with_columns=()): # temporary until vaex supports it warn('This method is deprecated and does nothing at this time, this will be removed in future versions', DeprecationWarning, stacklevel=2) return hdf5_file = self.__hdf5 old_column_names = set(vaex.open(hdf5_file).column_names) with h5.File(hdf5_file, 'r+') as f5: extra_columns = [k for k in set(self.column_names)-old_column_names if not k.startswith('__')] for k in extra_columns: f5.create_dataset(name=k, data=self[k].to_numpy()) if extra_columns: print(f"Exported the following quantities to {hdf5_file}") print(extra_columns) for k in with_columns: f5[k][...] = self[k].to_numpy() if with_columns: print(f"Overwritten the following quantities to {hdf5_file}") print(with_columns) self.__vaex = vaex.open(hdf5_file) def __flush_extra_columns_to_hdf5_old(self, with_columns=()): # temporary until vaex supports it warn('This method is deprecated and does nothing at this time, this will be removed in future versions', DeprecationWarning, stacklevel=2) return old_column_names = set(vaex.open(str(self._hdf5s[0])).column_names) extra_columns = [k for k in set(self.column_names)-old_column_names if not k.startswith('__')] for i, hdf5_file, vaex_slice in common_entries(self._hdf5s, self.__vaex_partition_slices): with h5.File(hdf5_file, 'r+') as f5: for k in extra_columns: f5.create_dataset(name=k, data=self[vaex_slice][k].to_numpy()) if extra_columns: print(f"Exported the following quantities to {hdf5_file} for partition {i}") print(extra_columns) for k in with_columns: f5[k][...] = self[vaex_slice][k].to_numpy() if with_columns: print(f"Overwritten the following quantities to {hdf5_file} for partition {i}") print(with_columns) self.__reload_vaex() def __singlethread_flush_extra_columns_to_hdf5(self, vaex_df: pd.DataFrame, hdf5_file: pathlib.Path, with_columns: Optional[Iterable] = ()) -> None: # temporary until vaex supports it old_column_names = set(vaex.open(hdf5_file).column_names) extra_columns = [k for k in set(vaex_df.column_names)-old_column_names if not k.startswith('__')] with h5.File(hdf5_file, 'r+') as f5: for k in extra_columns: f5.create_dataset(name=k, data=vaex_df[k].to_numpy()) if extra_columns: print(f"Exported the following quantities to {hdf5_file}") print(extra_columns) for k in with_columns: f5[k][...] = vaex_df[k].to_numpy() if len(with_columns): print(f"Overwritten the following quantities to {hdf5_file}") print(with_columns)
[docs] def flush_extra_columns_to_hdf5(self, with_columns: Optional[Iterable] = ()) -> None: # temporary until vaex supports it """ Flush the dataframe new columns to its backend memory-mapped file Parameters ---------- with_columns : iterable If given an iterable structure of existing column keys, the flushing will also overwrite those in the backend file with their current in-memory values. Default to an empty tuple. """ with concurrent.futures.ThreadPoolExecutor() as executor: # credit to https://www.squash.io/how-to-parallelize-a-simple-python-loop/ # Submit tasks to the executor futures = [executor.submit(self.__singlethread_flush_extra_columns_to_hdf5, vaex_df, hdf5_file, with_columns) for _, hdf5_file, vaex_df in common_entries(self._hdf5s, self._vaex_per_partition)] # Collect the results _ = [future.result() for future in concurrent.futures.as_completed(futures)] self.__reload_vaex() gc.collect()
def __reload_vaex(self) -> None: self.__vaex = vaex.open_many(map(str,self._hdf5s.values())) self.__vaex_per_partition = {i: vaex.open(str(hdf5_file)) for i, hdf5_file in self._hdf5s.items()}
Output.__init__.__doc__ = Output.__init__.__doc__.format(_output_properties=''.join( [f"\n * {desc} via key{'' if isinstance(key, str) else 's'} ``{str(key).replace(chr(39),'')}``" for key, desc in Output._export_properties.union(Output._postprocess_properties)]), _optional_properties=''.join( [f"\n * {desc} via key{'' if isinstance(key, str) else 's'} ``{str(key).replace(chr(39),'')}``" for key, desc in Output._all_optional_properties])) if __name__ == '__main__': pass