#!/usr/bin/env python
#
# Author: Adrien CR Thob
# Copyright (C) 2022 Adrien CR Thob
#
# This file is part of the py-Galaxia-ananke project,
# <https://github.com/athob/py-Galaxia-ananke>, which is licensed
# under the GNU Affero General Public License v3.0 (AGPL-3.0).
#
# The full copyright notice, including terms governing use, modification,
# and redistribution, is contained in the files LICENSE and COPYRIGHT,
# which can be found at the root of the source code distribution tree:
# - LICENSE <https://github.com/athob/py-Galaxia-ananke/blob/main/LICENSE>
# - COPYRIGHT <https://github.com/athob/py-Galaxia-ananke/blob/main/COPYRIGHT>
#
"""
Contains the Output class definition
Please note that this module is private. The Output class is
available in the main ``galaxia_ananke`` namespace - use that instead.
"""
from __future__ import annotations
from typing import TYPE_CHECKING, Any, Optional, Union, Tuple, List, Dict, Iterable
from numpy.typing import NDArray, ArrayLike
from warnings import warn
from functools import cached_property
import concurrent.futures
import pathos
import gc
import os
import pathlib
import itertools
import numpy as np
import h5py as h5
import ebf
import vaex
import pandas as pd
from astropy import units, coordinates
import vaex.dataframe
from .__metadata__ import *
from ._constants import *
from ._templates import *
from ._defaults import *
from .utils import classproperty, CallableDFtoNone, CallableDFtoInt, RecordingDataFrame, State, common_entries, metadata_dicts_to_consolidated_json
from .photometry.PhotoSystem import PhotoSystem
from . import Input
if TYPE_CHECKING:
from . import Survey
__all__ = ['Output']
[docs]
def shift_g_lon(lon): # restrict longitude values to be within (-180,180)
return -((-lon+180)%360-180)
def _flush_extra_columns_to_hdf5(vaex_df: vaex.DataFrame, hdf5_file: pathlib.Path,
with_columns: Optional[Iterable] = (),
update_metadata: Optional[Dict[str, Union[str,float,int]]] = None,
verbose: bool = True) -> None: # temporary until vaex supports it
_temp = vaex.open(hdf5_file)
old_column_names = set(_temp.column_names)
_temp.close()
extra_columns = [k for k in set(vaex_df.column_names)-old_column_names if not k.startswith('__')]
with_columns = list(set(with_columns) - set(extra_columns))
with h5.File(hdf5_file, 'r+') as f5:
for k in extra_columns:
f5.create_dataset(name=k, data=vaex_df[k].to_numpy())
if verbose and extra_columns:
print(f"Exported the following quantities to {hdf5_file}")
print(extra_columns)
for k in with_columns:
f5[k][...] = vaex_df[k].to_numpy()
if verbose and len(with_columns):
print(f"Overwritten the following quantities to {hdf5_file}")
print(with_columns)
if isinstance(update_metadata, dict):
for k, v in update_metadata.items():
f5.attrs[k] = v if isinstance(v, (bool, int, float, str)) else v(f5.attrs)
def _decorate_post_processing(pp: CallableDFtoNone, hdf5_path_input: bool = False, flush_with_columns: Optional[Iterable] = (), max_thread_workers: int = None, update_metadata: Optional[Dict[str, Union[str,float,int]]] = None, verbose: bool = True) -> CallableDFtoNone:
def new_pp(*args) -> None:
first_arg = args[0]
if not isinstance(first_arg, list):
first_arg = [first_arg]
for _temp in first_arg:
if hdf5_path_input:
hdf5_file: pathlib.Path = _temp
old_vaex_main_executor = vaex.dataframe.main_executor
vaex.dataframe.main_executor = vaex.execution.ExecutorLocal(vaex.multithreading.ThreadPoolIndex(max_workers=max_thread_workers))
vaex_df: vaex.DataFrame = vaex.open(hdf5_file)
else:
vaex_df: vaex.DataFrame = _temp
pp(vaex_df, *args[1:])
if hdf5_path_input:
_flush_extra_columns_to_hdf5(vaex_df, hdf5_file, with_columns=flush_with_columns, update_metadata=update_metadata, verbose=verbose)
vaex_df.close()
vaex.dataframe.main_executor = old_vaex_main_executor
gc.collect()
return new_pp
[docs]
class Output:
_position_prop = (('px', 'py', 'pz'), "Position coordinates in $kpc$")
_velocity_prop = (('vx', 'vy', 'vz'), "Velocity coordinates in $km/s$")
_celestial_prop = (('ra', 'dec'), "Celestial equatorial coordinates in $degrees$")
_galactic_prop = (('glon', 'glat'), "Celestial galactic coordinates in $degrees$")
_distance_prop = ('rad', "Distance in $kpc$")
_modulus_prop = ('dmod', "Distance modulus in magnitude units")
_trgbmass_prop = ('mtip', "Tip of the Red Giant Branch stellar mass in solar masses")
_currentmass_prop = ('mact', "Current stellar mass in solar masses")
_initialmass_prop = ('minit', "Zero Age Main Sequence stellar mass in solar masses")
_age_prop = ('age', "Stellar ages in years and decimal logarithmic scale")
_surfacegravity_prop = ('grav', "Surface gravity in CGS units and decimal logarithmic scale")
_metallicity_prop = ('feh', "Stellar metallicity $[Fe/H]$ in $dex$ relative to solar")
_temperature_prop = ('teff', "Surface temperature in Kelvin and decimal logarithmic scale")
_luminosity_prop = ('lum', "Stellar luminosity in solar luminosities and decimal logarithmic scale")
_parentindex_prop = Input._parentindex_prop
_partitionindex_prop = Input._partitionindex_prop
_satindex_prop = Input._populationindex_prop
_satindex = 'satid'
_particleflag_prop = ('partid', "Flag = 1 if star not at center of its parent particle")
_parallax_prop = ('pi', "Parallax in milliarcseconds")
_propermotion_prop = (('mura', 'mudec'), "Equatorial proper motions in milliarcseconds per year")
_galacticpropermotion_prop = (('mul', 'mub'), "Galactic proper motions in milliarcseconds per year")
_radialvelocity_prop = ('vr', "Radial velocity in $km/s$")
_vaex_under_list = ['_repr_html_']
[docs]
def __init__(self, survey: Survey) -> None:
"""
Driver to exploit the output of Galaxia.
Call signature::
output = Output(survey)
Parameters
----------
survey : :obj:`Survey`
Survey object that returned this output.
Notes
-----
An Output object almost behaves as a vaex DataFrame, also please
consult ``vaex`` online tutorials for more hands-on information:
https://vaex.io/docs/tutorial.html
The DataFrame represents the catalogue with columns corresponding
to properties of the stars from the synthetic stellar population
it simulates.
.. warning:: When generated directly by ``galaxia_ananke``, the
catalogue properties reflect directly the quantities
as computed by Galaxia. However the catalogue can be
modified/amended by applying post-processing routines
using the method
``apply_post_process_pipeline_and_flush``. Also if
such ``Output`` object was generated by other software
than ``galaxia_ananke``, post-processing may have been
applied: also please refer to that software
documentation for a more complete overview of the
catalogue.
The catalogue properties include the photometric magnitudes per
filter, with each filter identified by a key in the following
lowercase format:
``photosys_filtername``
where
* ``photo_sys`` corresponds to the chosen photometric system
* ``filtername`` corresponds to a filter name of that system
As an example, the photometry in filters ``gbp``, ``grp`` & ``g``
of the Gaia DR2 system identified as ``GAIA__DR2`` are respectively
under keys ``gaia__dr2_gbp``, ``gaia__dr2_grp`` & ``gaia__dr2_g``.
With those are also always included the following properties:
{_output_properties}
Additionally, depending on what optional properties were provided
with the input particle data, the output can also include the
following properties:
{_optional_properties}
"""
self.__survey: Survey = survey
self.__vaex = None
self.__vaex_per_partition = None
self.__path = None
self.__make_state()
if not self.caching:
self.__clear_ebfs(force=True)
self._max_pp_workers = 1
self._pp_auto_flush = True
class _State(State):
pass
@classproperty
def _export_properties(cls):
return {
cls._position_prop,
cls._velocity_prop,
cls._distance_prop,
cls._modulus_prop,
cls._trgbmass_prop,
cls._currentmass_prop,
cls._initialmass_prop,
cls._age_prop,
cls._surfacegravity_prop,
cls._metallicity_prop,
cls._temperature_prop,
cls._luminosity_prop,
cls._parentindex_prop,
cls._particleflag_prop,
cls._partitionindex_prop
}
@classproperty
def _postprocess_properties(cls):
return {
cls._celestial_prop,
cls._galactic_prop,
cls._parallax_prop,
cls._propermotion_prop,
cls._galacticpropermotion_prop,
cls._radialvelocity_prop
}
@classproperty
def _all_optional_properties(cls):
return Input._optional_properties \
- {cls._parentindex_prop, cls._partitionindex_prop, cls._satindex_prop} \
| {(cls._satindex, cls._satindex_prop[1])}
@classproperty
def _export_keys(cls):
return tuple(sum([(_p[0],) if isinstance(_p[0], str) else _p[0] for _p in cls._export_properties], ()))
@classproperty
def _postprocess_keys(cls):
return tuple(sum([(_p[0],) if isinstance(_p[0], str) else _p[0] for _p in cls._postprocess_properties], ()))
@classproperty
def _pos(cls):
return cls._position_prop[0]
@classproperty
def _vel(cls):
return cls._velocity_prop[0]
@classproperty
def _cel(cls):
return cls._celestial_prop[0]
@classproperty
def _gal(cls):
return cls._galactic_prop[0]
@classproperty
def _rad(cls):
return cls._distance_prop[0]
@classproperty
def _dmod(cls):
return cls._modulus_prop[0]
@classproperty
def _mtip(cls):
return cls._trgbmass_prop[0]
@classproperty
def _mact(cls):
return cls._currentmass_prop[0]
@classproperty
def _minit(cls):
return cls._initialmass_prop[0]
@classproperty
def _age(cls):
return cls._age_prop[0]
@classproperty
def _grav(cls):
return cls._surfacegravity_prop[0]
@classproperty
def _feh(cls):
return cls._metallicity_prop[0]
@classproperty
def _teff(cls):
return cls._temperature_prop[0]
@classproperty
def _lum(cls):
return cls._luminosity_prop[0]
@classproperty
def _parentid(cls):
return cls._parentindex_prop[0]
@classproperty
def _partitionid(cls):
return cls._partitionindex_prop[0]
@classproperty
def _partid(cls):
return cls._particleflag_prop[0]
@classproperty
def _pi(cls):
return cls._parallax_prop[0]
@classproperty
def _mu(cls):
return cls._propermotion_prop[0]
@classproperty
def _mugal(cls):
return cls._galacticpropermotion_prop[0]
@classproperty
def _vr(cls):
return cls._radialvelocity_prop[0]
def __dir__(self):
return sorted({i for i in self.__vaex.__dir__() if not i.startswith('_')}.union(
super(Output, self).__dir__()).union(
self._vaex_under_list if self.__vaex is not None else []))
def __repr__(self) -> str:
return repr(self._vaex)
def __getitem__(self, item: str):
return self._vaex[item]
def __setitem__(self, item: str, value):
self._vaex[item] = value
def __getattr__(self, item: str):
if (item in self.__vaex.__dir__() and not item.startswith('_')) or (item in self._vaex_under_list and self.__vaex is not None):
return getattr(self.__vaex, item)
else:
return self.__getattribute__(item)
@classmethod
def _compile_export_mag_names(cls, photosystems: list[PhotoSystem]) -> Tuple[str]:
return tuple(itertools.chain.from_iterable([photosystem.to_export_keys for photosystem in photosystems]))
@classmethod
def _make_export_keys(cls, photosystems: list[PhotoSystem], extra_keys=()) -> Tuple[str]:
return tuple(set(cls._export_keys).union(extra_keys).union(cls._compile_export_mag_names(photosystems)))
@classmethod
def _make_catalogue_keys(cls, photosystems: list[PhotoSystem], extra_keys=()) -> Tuple[str]:
return cls._make_export_keys(photosystems, extra_keys=cls._postprocess_keys+extra_keys)
def _make_input_optional_keys(self) -> Tuple[str]:
return tuple(k if k != self._satindex_prop[0] else self._satindex for k in self.survey.input.optional_keys())
def _redefine_partitions_in_ebfs(self, partitioning_rule: Optional[CallableDFtoInt]) -> None:
if partitioning_rule is not None: # TODO test instead partitioning_rule?
ebfs: List[pathlib.Path] = self._ebfs
export_keys: Tuple[str] = self.export_keys
# with concurrent.futures.ThreadPoolExecutor(max_workers=self._max_pp_workers) as executor: # credit to https://www.squash.io/how-to-parallelize-a-simple-python-loop/
# # Submit tasks to the executor
# futures = [executor.submit(self._singlethread_redefine_partitions, ebf_file, partitioning_rule, export_keys)
# for ebf_file in ebfs]
# # Collect the results
# _ = [future.result() for future in concurrent.futures.as_completed(futures)]
with pathos.pools.ProcessPool(self._max_pp_workers) as executor: # credit to https://github.com/uqfoundation/pathos/issues/158#issuecomment-449636971
# Submit tasks to the executor
futures = [executor.apipe(self._singlethread_redefine_partitions, ebf_file, partitioning_rule, export_keys)
for ebf_file in ebfs]
# Collect the results
_ = [future.get() for future in futures]
@classmethod
def _singlethread_redefine_partitions(cls, ebf_file: pathlib.Path, partitioning_rule: CallableDFtoInt, export_keys: Tuple[str]) -> None:
dummy_df = RecordingDataFrame([], columns=export_keys, dtype=float)
_ = partitioning_rule(dummy_df)
ebf_df = pd.DataFrame({key: ebf.read(str(ebf_file), f"/{key}") for key in dummy_df.record_of_all_used_keys})
new_partition_id = partitioning_rule(ebf_df)
del ebf_df
ebf.update_ind(str(ebf_file), f'/{cls._partitionid}', new_partition_id)
partition_id_sorter = np.argsort(new_partition_id)
del new_partition_id
gc.collect()
for key in ebf._EbfMap.keys(str(ebf_file)):
if key not in [b'/log', b'/center'] and not key.startswith(b'/.'):
ebf.update_ind(str(ebf_file), key, ebf.read(str(ebf_file), key)[partition_id_sorter])
# Update ebf file log to flag that a modification was made
full_log = str(ebf.read(str(ebf_file), '/log')).rstrip('\n')
ebf._EbfTable.remove(str(ebf_file), '/log') # TODO update when a more dedicated routine is available
ebf.write(
str(ebf_file), '/log',
f"{full_log}\n"
f"# Modified by Python's {NAME} v{__version__},\n"
f"# software available at <{__url__}>.\n",
'a'
)
def _ebf_to_hdf5(self) -> None:
ebfs: List[pathlib.Path] = self._ebfs
export_keys: Tuple[str] = self.export_keys
partition_ids: List[int] = list(self._hdf5s.keys())
# with concurrent.futures.ThreadPoolExecutor(max_workers=self._max_pp_workers) as executor: # credit to https://www.squash.io/how-to-parallelize-a-simple-python-loop/
# # Submit tasks to the executor
# futures = [executor.submit(self._singlethread_ebf_to_hdf5, i, hdf5_file, part_slices_in_ebfs, part_lengths_in_ebfs, ebfs, export_keys)
# for i, hdf5_file, part_slices_in_ebfs, part_lengths_in_ebfs in common_entries(self._hdf5s, self.__ebfs_part_slices, self.__ebfs_part_lengths)]
# # Collect the results
# _ = [future.result() for future in concurrent.futures.as_completed(futures)]
with pathos.pools.ProcessPool(self._max_pp_workers) as executor: # credit to https://github.com/uqfoundation/pathos/issues/158#issuecomment-449636971
# Submit tasks to the executor
futures = [executor.apipe(self._singlethread_ebf_to_hdf5, i, partition_id, partition_ids, hdf5_file, part_slices_in_ebfs, part_lengths_in_ebfs, ebfs, export_keys, self.all_metadata, self.verbose)
for i, (partition_id, hdf5_file, part_slices_in_ebfs, part_lengths_in_ebfs) in enumerate(common_entries(self._hdf5s, self.__ebfs_part_slices, self.__ebfs_part_lengths))]
# Collect the results
_ = [future.get() for future in futures]
if not(self._pp_auto_flush):
self.__reload_vaex()
@classmethod
def _singlethread_ebf_to_hdf5(cls, i: int, partition_id: int,
partition_ids: List[int],
hdf5_file: pathlib.Path,
part_slices_in_ebfs: Dict[str, List[slice]],
part_lengths_in_ebfs: Dict[str, int],
ebfs: List[pathlib.Path], export_keys: Tuple[str],
metadata: Dict[str, Union[str,float,int]],
verbose: bool = True) -> None:
header_ebf: str = str(ebfs[0].resolve())
ebfs: List[pathlib.Path] = [ebf_path for ebf_path in ebfs if ebf_path.name in part_lengths_in_ebfs]
n_ebfs: int = len(ebfs)
if ebfs:
i_ebf: int = i % n_ebfs
ebfs: List[pathlib.Path] = ebfs[i_ebf:]+ebfs[:i_ebf]
header_ebf: str = str(ebfs[0].resolve())
#
data_length: int = sum(part_lengths_in_ebfs.values())
ebfs_slices: Dict[str, slice] = {
ebf_path.name: slice(bounds[0],bounds[1])
for ebf_path, bounds
in zip(ebfs,
np.repeat(
np.cumsum(
[0]+[part_lengths_in_ebfs[ebf_path.name] for ebf_path in ebfs]
),
[1]+(n_ebfs-1)*[2]+[1]
).reshape((n_ebfs,2)
) if ebfs else []
)
}
with h5.File(hdf5_file, 'w') as f5:
f5datasets = {name: f5.create_dataset(name=name,
shape=(data_length,),
dtype=ebf.getHeader(header_ebf, f"/{name}").get_dtype())
for name in export_keys}
ebf_logs = []
for ebf_path in ebfs:
ebf_name: str = ebf_path.name
ebf_str: str = str(ebf_path.resolve())
ebf_log: Dict[str, str] = cls.__parse_galaxia_ebf_log(ebf_str)
f5data_slice: slice = ebfs_slices[ebf_name]
part_slices: List[slice] = part_slices_in_ebfs[ebf_name]
for name in export_keys:
head = f5data_slice.start
for p_slice in part_slices:
f5datasets[name][head:(head:=head+p_slice.stop-p_slice.start)] = ebf.read(
ebf_str, f"/{name}", begin=p_slice.start, end=p_slice.stop
)
if verbose:
print(f"Exported the following quantities from {ebf_path} to {hdf5_file} for partition {partition_id}")
print(list(f5.keys()))
ebf_logs.append(ebf_log)
f5.attrs["Galaxia_ebf_output_logs"] = metadata_dicts_to_consolidated_json(ebf_logs, metadata_name = "Galaxia_ebf_output_logs")
f5.attrs[METADATA_HEADER_KEY] = (
f"File generated by Python module {NAME} v{__version__}, "
f"software available at <{__url__}>."
)
f5.attrs[f"Cat_partition_id"] = partition_id
f5.attrs[f"All_partition_ids"] = partition_ids
for k, v in metadata.items():
f5.attrs[k] = v
@classmethod
def __parse_galaxia_ebf_log(cls, ebf_str: str) -> Dict[str, str]:
log_string: str = str(ebf.read(ebf_str, '/log'))
log_header, log_body = log_string.split('\n# <parameterfile>\n')
log_body, log_footer = log_body.split('\n# </parameterfile>\n')
log_body: str = log_body.replace('\n# ','\n')
log_dict: Dict[str, str] = {parts[0]: ' '.join(parts[1:]) for parts in (line.split() for line in log_body.splitlines()) if len(parts)>0}
return {"log_header": log_header, **log_dict, "log_footer": log_footer}
[docs]
def read_galaxia_output(self, partitioning_rule: Optional[CallableDFtoInt], max_pp_workers: int, pp_auto_flush: bool) -> None:
self._max_pp_workers = max_pp_workers
self._pp_auto_flush = pp_auto_flush
self.check_state_before_running(description="redefine_partitions_in_ebfs")(self._redefine_partitions_in_ebfs)(partitioning_rule)
self.check_state_before_running(description="convert_ebf_to_hdf5", level=1)(self._ebf_to_hdf5)()
### DEFINING POST PROCESSING PIPELINES BELOW # TODO consider a PostProcess class that runs postprocess pipeline at __call__ and holds flush_with_columns
@classmethod
def __pp_convert_cartesian_to_galactic(cls, df: pd.DataFrame) -> None:
"""
converts positions & velocities from mock catalog Cartesian coordinates (relative to solar position)
into Galactic coordinates, assuming Sun is on -x axis (use rotateStars)
"""
GC = coordinates.Galactic(u = df[cls._pos[0]].to_numpy()*units.kpc,
v = df[cls._pos[1]].to_numpy()*units.kpc,
w = df[cls._pos[2]].to_numpy()*units.kpc,
U = df[cls._vel[0]].to_numpy()*units.km/units.s,
V = df[cls._vel[1]].to_numpy()*units.km/units.s,
W = df[cls._vel[2]].to_numpy()*units.km/units.s,
representation_type = coordinates.CartesianRepresentation,
differential_type = coordinates.CartesianDifferential)
df[cls._gal[0]] = shift_g_lon(GC.spherical.lon.value)
df[cls._gal[1]] = GC.spherical.lat.value
df[cls._rad] = GC.spherical.distance.value
####################################
if GC.shape[0]:
df[cls._mugal[0]] = GC.sphericalcoslat.differentials['s'].d_lon_coslat.value
df[cls._mugal[1]] = GC.sphericalcoslat.differentials['s'].d_lat.value
df[cls._vr] = GC.sphericalcoslat.differentials['s'].d_distance.value
else:
df[cls._mugal[0]] = df[cls._mugal[1]] = df[cls._vr] = df[cls._pos[0]].to_numpy()+df[cls._vel[0]].to_numpy()
@classmethod
def __pp_convert_galactic_to_icrs(cls, df: pd.DataFrame) -> None:
"""
converts PMs in galactic coordinates (mulcosb, mub) in arcsec/yr (as output by Galaxia)
to ra/dec in mas/yr (units of output catalog)
"""
GC = coordinates.Galactic(l = df[cls._gal[0]].to_numpy()*units.degree,
b = df[cls._gal[1]].to_numpy()*units.degree,
distance = df[cls._rad].to_numpy()*units.kpc,
pm_l_cosb = df[cls._mugal[0]].to_numpy()*units.mas/units.yr,
pm_b = df[cls._mugal[1]].to_numpy()*units.mas/units.yr,
radial_velocity = df[cls._vr].to_numpy()*units.km/units.s)
GC_icrs = GC.transform_to(coordinates.ICRS())
df[cls._cel[0]] = GC_icrs.ra.value
df[cls._cel[1]] = GC_icrs.dec.value
####################################
if GC.shape[0]:
df[cls._mu[0]] = GC_icrs.pm_ra_cosdec.to(units.mas/units.yr).value
df[cls._mu[1]] = GC_icrs.pm_dec.to(units.mas/units.yr).value
else:
df[cls._mu[0]] = df[cls._mu[1]] = df[cls._gal[0]].to_numpy()+df[cls._mugal[0]].to_numpy()
@classmethod
def __pp_convert_icrs_to_galactic(cls, df: pd.DataFrame) -> None:
"""
converts PMs from ICRS coordinates (muacosd, mudec) to Galactic (mul, mub)
input and output in mas/yr for PMs and degrees for positions
also exports the galactic lat and longitude
"""
IC = coordinates.ICRS(ra = df[cls._cel[0]].to_numpy()*units.degree,
dec = df[cls._cel[1]].to_numpy()*units.degree,
pm_ra_cosdec = df[cls._mu[0]].to_numpy()*units.mas/units.yr,
pm_dec = df[cls._mu[1]].to_numpy()*units.mas/units.yr)
IC_gal = IC.transform_to(coordinates.Galactic())
df[cls._gal[0]] = shift_g_lon(IC_gal.l.value)
df[cls._gal[1]] = IC_gal.b.value
####################################
if IC.shape[0]:
df[cls._mugal[0]] = IC_gal.pm_l_cosb.to(units.mas/units.yr).value
df[cls._mugal[1]] = IC_gal.pm_b.to(units.mas/units.yr).value
else:
df[cls._mugal[0]] = df[cls._mugal[1]] = df[cls._cel[0]].to_numpy()+df[cls._mu[0]].to_numpy()
@classmethod
def __pp_last_conversions(cls, df: pd.DataFrame) -> None:
df[cls._pi] = 1.0/df[cls._rad] # parallax in mas (from distance in kpc)
df[cls._teff] = 10**df[cls._teff] #Galaxia returns log10(teff/K)
df[cls._lum] = 10**df[cls._lum] #Galaxia returns log10(lum/lsun)
@property
def _max_pp_workers(self) -> int:
return self.__max_pp_workers
@_max_pp_workers.setter
def _max_pp_workers(self, value: int) -> None:
self.__max_pp_workers: int = value
@property
def _pp_auto_flush(self) -> bool:
return self.__pp_auto_flush
@_pp_auto_flush.setter
def _pp_auto_flush(self, value: bool) -> None:
self.__pp_auto_flush: bool = value
@staticmethod
def __consolidate_partitions_per_process(partitions: NDArray, lengths: NDArray, max_workers: int) -> Dict[int, List[int]]:
df = pd.DataFrame({'partition': partitions, 'length': lengths, 'process_id': 0*lengths})
df.set_index('partition', drop=True, inplace=True)
df.sort_values('length', inplace=True)
df['length_cumsum'] = df.length.cumsum()
df['length_cumsum_norm'] = (
df.length_cumsum/df.length_cumsum.iloc[-1]
if df.length_cumsum.iloc[-1]
else np.linspace(0,1,df.shape[0]+1)[1:]
)
df['process_id'] = np.ceil(df.length_cumsum_norm*max_workers).astype('int')-1
unique = df.process_id.unique()
df['process_id'] = df.process_id.map(dict(zip(unique, range(len(unique)))))
df['temp'] = df.length * (-1)**(df.process_id)
df.sort_values(['process_id','temp'], inplace=True)
return df.groupby('process_id').groups
[docs]
def apply_post_process_pipeline_and_flush(self, post_process: CallableDFtoNone, *args,
flush_with_columns=(), hold_flush: bool = False, hold_reload: bool = False,
consolidate_partitions_per_process: bool = False,
update_metadata: Optional[Dict[str, Union[str,float,int]]] = None) -> None:
"""
Apply a given post processing routine to the catalogue
Parameters
----------
post_process : callable
Post processing pipeline to apply to the catalogue. This must
be defined as a callable that returns nothing, and take only
positional arguments, the first of which being the DataFrame
representing the catalogue.
\*args : callable args
Any other positinoal arguments that should be passed to the
``post_process`` callable pipeline, in the order they should
be passed.
flush_with_columns : iterable
If given an iterable structure of existing column keys, the
flushing done after application of the post-processing
will also overwrite those in the backend file with their
current in-memory values. Default to an empty tuple.
hold_flush : bool
Flag to hold the flushing from being done after application of
the post-processing. Default to False.
hold_reload : bool
Flag to hold the reload from being done after application of
the post-processing and flushing. Default to False.
consolidate_partitions_per_process : bool
TODO
update_metadata : dict
TODO
"""
# post_process(self._vaex, *args)
vaex_df_or_hdf5_or_list_s = self._hdf5s if self._pp_auto_flush else self._vaex_per_partition
if consolidate_partitions_per_process:
partitions_per_process: Dict[int, List[int]] = self.__consolidate_partitions_per_process(
np.array(list(self.__partitions_lengths.keys())),
np.array(list(self.__partitions_lengths.values())),
self._max_pp_workers
)
vaex_df_or_hdf5_or_list_s = {process: [vaex_df_or_hdf5_or_list_s[i] for i in partitions]
for process, partitions in partitions_per_process.items()}
if update_metadata is not None:
pass # TODO check format of update_metadata ?
if self._pp_auto_flush:
with pathos.pools.ProcessPool(self._max_pp_workers) as executor: # credit to https://github.com/uqfoundation/pathos/issues/158#issuecomment-449636971
# Submit tasks to the executor
futures = [executor.apipe(_decorate_post_processing(post_process,
self._pp_auto_flush,
flush_with_columns=flush_with_columns,
max_thread_workers=int(np.ceil(os.cpu_count()/self._max_pp_workers)),
update_metadata=update_metadata,
verbose=self.verbose),
vaex_df_or_hdf5_or_list, *args)
for vaex_df_or_hdf5_or_list in vaex_df_or_hdf5_or_list_s.values()]
# Collect the results
_ = [future.get() for future in futures]
else:
with concurrent.futures.ThreadPoolExecutor(max_workers=self._max_pp_workers) as executor: # credit to https://www.squash.io/how-to-parallelize-a-simple-python-loop/
# Submit tasks to the executor
futures = [executor.submit(_decorate_post_processing(post_process,
self._pp_auto_flush,
flush_with_columns=flush_with_columns,
update_metadata=update_metadata,
verbose=self.verbose),
vaex_df_or_hdf5, *args)
for vaex_df_or_hdf5 in vaex_df_or_hdf5_or_list_s]
# Collect the results
_ = [future.result() for future in concurrent.futures.as_completed(futures)]
if not(hold_flush) and not(self._pp_auto_flush):
self.flush_extra_columns_to_hdf5(with_columns=flush_with_columns)
if not(hold_reload):
self.__reload_vaex()
[docs]
def post_process_output(self) -> None:
self.check_state_before_running(description="pp_cartesian_to_galactic")(self._pp_convert_cartesian_to_galactic)(hold_reload=True)
self.check_state_before_running(description="pp_galactic_to_icrs", level=1)(self._pp_convert_galactic_to_icrs)(hold_reload=True)
self.check_state_before_running(description="pp_last_conversions", level=1)(self._pp_last_conversions)(hold_reload=True)
self.__reload_vaex()
def _pp_convert_cartesian_to_galactic(self, **kwargs) -> None:
pipeline_name = "convert_cartesian_to_galactic"
if self.verbose:
print(f"Running {pipeline_name} post-processing pipeline")
self.apply_post_process_pipeline_and_flush(self.__pp_convert_cartesian_to_galactic, flush_with_columns=self._gal+(self._rad,), **kwargs)
def _pp_convert_galactic_to_icrs(self, **kwargs) -> None:
pipeline_name = "convert_galactic_to_icrs"
if self.verbose:
print(f"Running {pipeline_name} post-processing pipeline")
self.apply_post_process_pipeline_and_flush(self.__pp_convert_galactic_to_icrs, flush_with_columns=self._cel, **kwargs)
def _pp_convert_icrs_to_galactic(self, **kwargs) -> None:
pipeline_name = "convert_icrs_to_galactic"
if self.verbose:
print(f"Running {pipeline_name} post-processing pipeline")
self.apply_post_process_pipeline_and_flush(self.__pp_convert_icrs_to_galactic, flush_with_columns=self._gal+self._mugal, **kwargs)
def _pp_last_conversions(self, **kwargs) -> None:
pipeline_name = "last_conversions"
if self.verbose:
print(f"Running {pipeline_name} post-processing pipeline")
self.apply_post_process_pipeline_and_flush(self.__pp_last_conversions, flush_with_columns=(self._teff, self._lum), **kwargs)
def __name_with_ext(self, ext):
name_base = self._file_base
return name_base.parent / f"{name_base.name}{ext}"
[docs]
def save(self, path): # TODO Gotta update this
"""
Save output to new path
.. danger:: currently not implemented
"""
raise NotImplementedError
old_path = self._path
self.__path = pathlib.Path(path)
self._vaex.close()
old_path.rename(self._path)
self.__vaex = vaex.open(self._path)
def __make_state(self):
self.__state: Output._State = self._State(self.__name_with_ext('.dummy'), self)
@property
def _state(self) -> Output._State:
return self.__state
@property
def check_state_before_running(self):
return self._state.check_state_file_before_running
@property
def caching(self):
return self.survey.caching
@property
def verbose(self):
return self.survey.verbose
@property
def survey(self):
return self.__survey
@property
def photosystems(self):
return self.survey.photosystems
@property
def export_keys(self) -> Tuple[str]:
return self._make_export_keys(self.photosystems, extra_keys=self._make_input_optional_keys())
@property
def catalogue_keys(self) -> Tuple[str]:
return self._make_catalogue_keys(self.photosystems, extra_keys=self._make_input_optional_keys())
@property
def output_dir(self):
return pathlib.Path(self.parameters[FTTAGS.output_dir])
@property
def output_name(self):
return f"{self.survey.surveyname_hash}.{self.survey.inputname_hash}"
@property
def rsun_skycoord(self):
_temp = [self.parameters[k] for k in FTTAGS.rSun]
return coordinates.SkyCoord(u=_temp[0], v=_temp[1], w=_temp[2], unit='kpc', representation_type='cartesian', frame='galactic')
@property
def parameters(self) -> Dict[str, Union[str,float,int]]:
return self.survey.parameters
@property
def parameter_mag_color_names(self) -> str:
return self.parameters[FTTAGS.mag_color_names]
@property
def parameter_magnitude_name(self) -> str:
return self.parameter_mag_color_names.split(',')[0]
@property
def parameter_abs_mag_hi(self) -> str:
return self.parameters[FTTAGS.abs_mag_lim_hi]
@property
def parameter_app_mag_hi(self) -> str:
return self.parameters[FTTAGS.app_mag_lim_hi]
@property
def all_metadata(self) -> Dict[str, Any]:
return {
k: v if isinstance(v, (bool, int, float, str)) else str(v) # TODO consider Iterable case?
for k, v in {
**{f"{PARAMETER_METADATA_PREFIX}{k}": v for k, v in self.parameters.items()},
**{f"{INPUT_METADATA_PREFIX}{k}": v for k, v in self.survey.input.metadata.items()},
**{f"{SURVEY_METADATA_PREFIX}{k}": v for k, v in self.survey.metadata.items()},
}.items()
}
@cached_property
def __ebfs_partitions(self) -> Dict[int, Dict[str, NDArray]]:
return_dict = {}
for ebf_path in self._ebfs:
if ebf_path.exists():
ebf_name: str = ebf_path.name
ebf_str: str = str(ebf_path.resolve())
indices_generator = (item
for item in pd.DataFrame(
ebf.read(ebf_str, f"/{self._partitionid}")
).groupby([0]).indices.items() # groupby ignores ids that don't appear
if item[0]>=0) # hard-coded condition to only take partition id >= 0
for i, ind in indices_generator:
if i in return_dict:
return_dict[i][ebf_name] = ind
else:
return_dict[i] = {ebf_name: ind}
else:
raise RuntimeError("Don't attempt creating an Output object on your own, those are meant to be returned by Survey")
if not return_dict:
return_dict[-1] = {}
return return_dict
@cached_property
def __ebfs_part_lengths(self) -> Dict[int, Dict[str, int]]:
return {i: {ebf_name: len(indices)
for ebf_name,indices in indices_per_ebf.items()}
for i,indices_per_ebf in self.__ebfs_partitions.items()}
@cached_property
def __partitions_lengths(self) -> Dict[int, int]:
return {i: sum(part_lengths_in_ebfs.values()) for i, part_lengths_in_ebfs in self.__ebfs_part_lengths.items()}
@cached_property
def __ebfs_part_slices(self) -> Dict[int, Dict[str, List[slice]]]:
return {i: {ebf_name: [slice(start, stop)
for start, stop in zip(
[indices[0]]+indices[where_slice_change+1].tolist(),
(indices[where_slice_change]+1).tolist() + [indices[-1]+1]
)]
for ebf_name,indices in indices_per_ebf.items()
if (where_slice_change:=np.where(np.diff(indices)>1)[0]) is not None}
for i,indices_per_ebf in self.__ebfs_partitions.items()}
@cached_property
def __vaex_partitions(self) -> Dict[int, NDArray]:
return self._vaex[self._partitionid].to_pandas_series().to_frame().groupby([0]).indices
@cached_property
def __vaex_partition_slices(self) -> Dict[int, slice]:
return {i: slice(indices[0], indices[-1]+1) for i,indices in self.__vaex_partitions.items()}
@property
def _vaex(self):
if self.__vaex is None:
raise RuntimeError("Don't attempt creating an Output object on your own, those are meant to be returned by Survey")
else:
return self.__vaex
@property
def _vaex_per_partition(self):
if self.__vaex_per_partition is None:
raise RuntimeError("Don't attempt creating an Output object on your own, those are meant to be returned by Survey")
else:
return self.__vaex_per_partition
@property
def _path(self):
raise NotImplementedError
if self.__path is None:
return self.__hdf5
else:
return self.__path
@property
def _file_base(self):
return self.output_dir / self.output_name
@property
def _ebf_glob_pattern(self):
return self.__name_with_ext('.*.ebf')
@property
def _ebf_glob(self):
_temp = self._ebf_glob_pattern
return _temp.parent.glob(_temp.name)
@cached_property
def _ebfs(self):
ebfs = list(self._ebf_glob)
if not ebfs:
raise RuntimeError("Galaxia didn't return any ebf output files")
return ebfs
def __clear_ebfs(self, force: bool = False) -> None:
for ebf in self._ebf_glob:
if True if force else input(f"You are about to remove {ebf}, do I proceed? [y/N] ") == 'y':
ebf.unlink()
@property
def _hdf5_glob_pattern(self):
return self.__name_with_ext('.*.h5')
@cached_property
def _hdf5s(self):
pattern = self._hdf5_glob_pattern
partitions = self.__ebfs_partitions
length_tags = len(str(max(tuple(partitions.keys())+(0,))))
return {i: pattern.parent / pattern.name.replace('*',f"{i:0{length_tags}d}") for i in partitions.keys()}
[docs]
def flush_extra_columns_to_hdf5(self, with_columns: Optional[Iterable] = (), update_metadata: Optional[Dict[str, Union[str,float,int]]] = None) -> None: # temporary until vaex supports it
"""
Flush the dataframe new columns to its backend memory-mapped file
Parameters
----------
with_columns : iterable
If given an iterable structure of existing column keys, the
flushing will also overwrite those in the backend file with
their current in-memory values. Default to an empty tuple.
update_metadata : dict
TODO
"""
# with pathos.pools.ProcessPool(self.__max_pp_workers) as executor: # credit to https://github.com/uqfoundation/pathos/issues/158#issuecomment-449636971
# # Submit tasks to the executor
# futures = [executor.apipe(_flush_extra_columns_to_hdf5, vaex_df, hdf5_file, with_columns, update_metadata, self.verbose)
# for _, hdf5_file, vaex_df in common_entries(self._hdf5s, self._vaex_per_partition)]
# # Collect the results
# _ = [future.get() for future in futures]
with concurrent.futures.ThreadPoolExecutor(max_workers=self._max_pp_workers) as executor: # credit to https://www.squash.io/how-to-parallelize-a-simple-python-loop/
# Submit tasks to the executor
futures = [executor.submit(_flush_extra_columns_to_hdf5, vaex_df, hdf5_file, with_columns, update_metadata, self.verbose)
for _, hdf5_file, vaex_df in common_entries(self._hdf5s, self._vaex_per_partition)]
# Collect the results
_ = [future.result() for future in concurrent.futures.as_completed(futures)]
# self.__reload_vaex()
# gc.collect()
def __reload_vaex(self) -> None:
if self.__vaex is not None:
self.__vaex.close()
if self._hdf5s.values():
self.__vaex = vaex.open_many(map(str,self._hdf5s.values()))
else:
raise RuntimeError("Corrupted HDF5 internal dictionary")
if self.__vaex_per_partition is not None and not self._pp_auto_flush:
for i in self.__vaex_per_partition:
self.__vaex_per_partition[i].close()
self.__vaex_per_partition = {i: vaex.open(str(hdf5_file)) for i, hdf5_file in self._hdf5s.items()}
gc.collect()
Output.__init__.__doc__ = Output.__init__.__doc__.format(_output_properties=''.join(
[f"\n * {desc} via key{'' if isinstance(key, str) else 's'} ``{str(key).replace(chr(39),'')}``"
for key, desc in Output._export_properties.union(Output._postprocess_properties)]),
_optional_properties=''.join(
[f"\n * {desc} via key{'' if isinstance(key, str) else 's'} ``{str(key).replace(chr(39),'')}``"
for key, desc in Output._all_optional_properties]))
if __name__ == '__main__':
raise NotImplementedError()