#!/usr/bin/env python
#
# Author: Adrien CR Thob
# Copyright (C) 2022 Adrien CR Thob
#
# This file is part of the py-Galaxia-ananke project,
# <https://github.com/athob/py-Galaxia-ananke>, which is licensed
# under the GNU Affero General Public License v3.0 (AGPL-3.0).
#
# The full copyright notice, including terms governing use, modification,
# and redistribution, is contained in the files LICENSE and COPYRIGHT,
# which can be found at the root of the source code distribution tree:
# - LICENSE <https://github.com/athob/py-Galaxia-ananke/blob/main/LICENSE>
# - COPYRIGHT <https://github.com/athob/py-Galaxia-ananke/blob/main/COPYRIGHT>
#
"""
Contains the Survey class definition
Please note that this module is private. The Survey class is
available in the main ``galaxia_ananke`` namespace - use that instead.
"""
from __future__ import annotations
from types import MappingProxyType
from typing import TYPE_CHECKING, Any, Optional, Union, Tuple, List, Set, Dict, Iterable
from numpy.typing import NDArray, ArrayLike
from warnings import warn
from functools import cached_property
import re
import pathlib
from pprint import PrettyPrinter
from ._constants import *
from ._templates import *
from ._defaults import *
from .utils import CallableDFtoInt, execute, lexicalorder_dict, mark_metadata_prop, collect_metadata_marked_properties, hash_iterable
from . import photometry
from .photometry.PhotoSystem import PhotoSystem
from .Output import Output
if TYPE_CHECKING:
from . import Input
__all__ = ['Survey']
[docs]
@collect_metadata_marked_properties
class Survey:
[docs]
def __init__(self, input: Input, photo_sys: Union[str,List[str]] = DEFAULT_PSYS, surveyname: str = DEFAULT_SURVEYNAME, verbose: bool = True) -> None:
    """
    Driver to exploit the input object and run Galaxia with it.

    Call signature::

        survey = Survey(input,
                        photo_sys={DEFAULT_PSYS},
                        surveyname='{DEFAULT_SURVEYNAME}')

    Parameters
    ----------
    input : :obj:`Input`
        Input object storing the particle data.

    photo_sys : string or list
        Name(s) of the photometric system(s) Galaxia should use to
        generate the survey. Default to {DEFAULT_PSYS}.
        Available photometric systems can be found with the photometry
        submodule - please refer to its documentation for further
        details.

    surveyname : string
        Optional name Galaxia should use for the output files. Default
        to '{DEFAULT_SURVEYNAME}'.

    verbose : bool
        Verbose boolean flag stored on the survey and handed over to
        the routine that executes the Galaxia commands. Default to
        True.
    """
    self.__surveyname: str = surveyname
    self.__input: Input = input
    self.__photosystems: List[PhotoSystem] = self.prepare_photosystems(photo_sys)
    self.__verbose: bool = verbose
    # the three attributes below stay None until the survey parameters
    # and output get prepared (see _prepare_survey_parameters_and_output)
    self.__fileparam: Optional[MappingProxyType[str, Union[str,float,int]]] = None
    self.__extraparam: Optional[MappingProxyType[str, Union[str,float,int]]] = None
    self.__output: Optional[Output] = None
# Docstrings are stripped (``__doc__`` is None) when Python runs with -OO,
# so the placeholders are only filled in when a docstring is present.
if __init__.__doc__ is not None:
    __init__.__doc__ = __init__.__doc__.format(DEFAULT_SURVEYNAME=DEFAULT_SURVEYNAME,
                                               DEFAULT_PSYS=DEFAULT_PSYS)
def __repr__(self) -> str:
    """Return ``ClassName(surveyname=..., photo_sys=...)`` for this survey."""
    shown_props = ('surveyname', 'photo_sys')
    pairs = [f"{name}={getattr(self, name)}" for name in shown_props]
    description = ', '.join(pairs)
    return f'{self.__class__.__name__}({description})'
[docs]
@classmethod
def prepare_photosystems(cls, photo_sys: Union[str, List[str]]) -> List[PhotoSystem]:
    """
    Resolve photometric system name(s) into their PhotoSystem objects.

    Parameters
    ----------
    photo_sys : string or list
        Name, or list of names, of photometric system(s). Each name is
        looked up in ``photometry.available_photo_systems``.

    Returns
    -------
    photosystems : list
        The looked-up objects, in the order the names were given.
    """
    # a lone name is handled as a one-element list
    names = [photo_sys] if isinstance(photo_sys, str) else photo_sys
    return [photometry.available_photo_systems[name] for name in names]
[docs]
@classmethod
def set_isochrones_from_photosys(cls, photo_sys: str) -> list[PhotoSystem]:
    """
    Deprecated alias of class method ``prepare_photosystems``.

    Emits a DeprecationWarning, then returns whatever
    ``prepare_photosystems`` returns for the same argument.
    """
    deprecation_message = 'This class method will be deprecated, please use instead class method prepare_photosystems'
    warn(deprecation_message, DeprecationWarning, stacklevel=2)
    photosystems = cls.prepare_photosystems(photo_sys)
    return photosystems
def _prepare_survey_parameters_and_output(self, cmd_magnames: Union[str,Dict[str,str]], n_gens: Iterable[int], **kwargs) -> None:
    """
    Assemble the survey parameters and create the Output handler.

    Parameters
    ----------
    cmd_magnames : string or dict
        Color-magnitude filter names. They are passed through the
        first photometric system's ``check_cmd_magnames`` and the
        result is stored among the parameter file entries.

    n_gens : iterable of int
        Generation numbers. Each one is recorded as an extra
        parameter under a key of the form ``n_gen_<index>``.

    **kwargs
        Additional entries written over the defaults for the
        parameter file.
    """
    # n_gens is traversed twice below (max, then enumerate): materialize
    # it once so that a one-shot iterator is not silently exhausted
    n_gens = tuple(n_gens)
    # the first photometric system is the one written in the parameter file
    photosys = self.photosystems[0]
    cmd_magnames: str = photosys.check_cmd_magnames(cmd_magnames)
    parameters: Dict[str, Union[str,float,int]] = DEFAULTS_FOR_PARFILE.copy()
    parameters.update(**{FTTAGS.photo_categ: photosys.category, FTTAGS.photo_sys: photosys.name, FTTAGS.mag_color_names: cmd_magnames, FTTAGS.nres: self.ngb}, **kwargs)
    # zero-padding width of the n_gen_* keys follows the largest generation number
    n_gen_tag_length = len(str(max(n_gens)))
    self.__fileparam = MappingProxyType(parameters)
    self.__extraparam = MappingProxyType({
        **{f"n_gen_{i:0{n_gen_tag_length}d}": n for i,n in enumerate(n_gens)},
        # remaining photometric systems: pair the tags given by
        # FTTAGS.append_photo(n) with each system's categ_and_name
        **{k: v for n,PS in enumerate(self.photosystems[1:], start=1)
                for k,v in zip(FTTAGS.append_photo(n), PS.categ_and_name)}
        })
    self.__output = Output(self)
def _write_parameter_file(self) -> Tuple[pathlib.Path, Dict[str, Union[str,float,int]]]:
    """
    Write the Galaxia parameter file in the input directory.

    The file is left untouched only when caching is enabled and it
    already exists with exactly the text that would be written.

    Returns
    -------
    parfile : pathlib.Path
        Path of the parameter file.

    parameters : dict
        The parameter file entries, as given by ``fileparam``.
    """
    file_parameters: Dict[str, Union[str,float,int]] = self.fileparam
    tagged_name: str = self.surveyname_hash
    parfile: pathlib.Path = self.inputdir / PARFILENAME_TEMPLATE.substitute({FTTAGS.name: tagged_name})  # TODO make temporary? create a global record of temporary files?
    parfile_text: str = PARFILE_TEMPLATE.substitute({FTTAGS.output_file: tagged_name, **file_parameters})
    # short-circuits in the same order as the checks are listed: caching
    # flag first, then file existence, then the content comparison
    already_up_to_date = (self.caching
                          and parfile.exists()
                          and parfile.read_text() == parfile_text)
    if not already_up_to_date:
        parfile.write_text(parfile_text)
    return parfile, file_parameters
def _run_survey(self, parfile: pathlib.Path, n_gens: Iterable[int], max_gen_workers: int) -> None:
    """
    Build one run command per generation and execute them.

    Parameters
    ----------
    parfile : pathlib.Path
        Parameter file substituted in every command.

    n_gens : iterable of int
        Generation numbers, one command is built for each.

    max_gen_workers : int
        Passed as ``max_workers`` to ``execute``.
    """
    cmds = []
    for ngen in n_gens:
        # the hdim block is left empty when no hdim is set
        if self.hdim is None:
            hdim_block = ''
        else:
            hdim_block = HDIMBLOCK_TEMPLATE.substitute(**{CTTAGS.hdim: self.hdim})
        substitutions = {
            CTTAGS.hdim_block: hdim_block,
            CTTAGS.nfile: self.inputname_hash,
            CTTAGS.ngen: ngen,
            CTTAGS.parfile: parfile,
        }
        cmds.append(RUN_TEMPLATE.substitute(**substitutions))
    execute(cmds, max_workers=max_gen_workers, verbose=self.verbose)
def _append_survey(self, photosystem: PhotoSystem, max_gen_workers: Optional[int]) -> None:
    """
    Build one append command per output file and execute them.

    Parameters
    ----------
    photosystem : :obj:`PhotoSystem`
        Photometric system whose ``category`` and ``name`` are
        substituted in every command.

    max_gen_workers : int or None
        Passed as ``max_workers`` to ``execute``. When None, the number
        of output files is used instead.
    """
    # Snapshot the output files once, so the worker count and the list of
    # commands are always derived from the very same set of files
    filenames = list(self.__ebf_output_files_glob)
    if max_gen_workers is None:
        max_gen_workers = len(filenames)
    cmds = [APPEND_TEMPLATE.substitute(**{
                CTTAGS.pcat     : photosystem.category,
                CTTAGS.psys     : photosystem.name,
                CTTAGS.filename : filename
            }) for filename in filenames]
    execute(cmds, max_workers=max_gen_workers, verbose=self.verbose)
def _vanilla_survey(self, cmd_magnames: Union[str,Dict[str,str]] = DEFAULT_CMD,
                    fsample: float = 1, input_sorter: ArrayLike = None,
                    n_jobs: int = None, n_gens: Union[int, Iterable[int]] = (0,),
                    max_gen_workers: int = None, **kwargs) -> None:
    """
    Prepare the parameters and input, then run the catalog generations
    and the append step of every additional photometric system.

    Parameters
    ----------
    cmd_magnames : string or dict
        Forwarded to ``_prepare_survey_parameters_and_output``.

    fsample : float
        Forwarded as the ``fsample`` parameter file entry.

    input_sorter : array_like
        Assigned to the ``input_sorter`` attribute of the input object.

    n_jobs : int
        Deprecated, replaces ``n_gens`` when given as an integer.

    n_gens : int or iterable of int
        Generation numbers to run. An integer ``n`` stands for
        ``range(n)``.

    max_gen_workers : int
        Maximum number of workers handed over to the run and append
        steps. Default to the number of generations; a warning is
        emitted when it is given explicitly.

    **kwargs
        Forwarded to ``_prepare_survey_parameters_and_output``.

    Raises
    ------
    ValueError
        If ``n_gens`` holds no generation at all.
    """
    if isinstance(n_jobs, int):
        n_gens = n_jobs
        warn('Keyword argument n_jobs will be deprecated, please use instead keyword argument n_gens. Consider also reading doc regarding keyword argument max_pp_workers.', DeprecationWarning, stacklevel=2)
    if isinstance(n_gens, int):
        n_gens = range(n_gens)
    # n_gens gets measured and traversed several times further down:
    # materialize it so that any iterable (e.g. a generator) is supported
    n_gens = tuple(n_gens)
    if not n_gens:
        # fail early with an explicit message instead of the max() error
        # that would otherwise surface while preparing the parameters
        raise ValueError("n_gens must contain at least one generation to run")
    if max_gen_workers is None:
        max_gen_workers = len(n_gens)
    else:
        warn('The keyword argument max_gen_workers is currently not implemented.', stacklevel=2)
    self.input.input_sorter = input_sorter
    self._prepare_survey_parameters_and_output(cmd_magnames, n_gens, fsample=fsample, **kwargs)
    # only the parameter file path is needed from what prepare_input returns
    _inputname, parfile, _for_parfile = self.input.prepare_input(self)
    #
    self.check_state_before_running(description='run_survey_complete')(self._run_survey)(parfile, n_gens=n_gens, max_gen_workers=max_gen_workers)
    for photosystem in self.photosystems[1:]:
        self.check_state_before_running(description=f'append_{photosystem.name}_complete', level=1)(self._append_survey)(photosystem, max_gen_workers=max_gen_workers)
[docs]
def make_survey(self, *, verbose: bool = True, partitioning_rule: CallableDFtoInt = None,
                max_pp_workers: int = 1, pp_auto_flush: bool = True, **kwargs) -> Output:
    """
    Driver to exploit the input object and run Galaxia with it.

    Call signature::

        output = self.make_survey(cmd_magnames='{DEFAULT_CMD}',
                                  fsample=1, verbose=True, **kwargs)

    All keyword arguments other than ``verbose``, ``partitioning_rule``,
    ``max_pp_workers`` and ``pp_auto_flush`` are forwarded to the
    routine that prepares and runs the catalog generations.

    Parameters
    ----------
    cmd_magnames : string
        Names of the filters Galaxia should use for the color-
        magnitude diagram box selection. The given string must meet
        the following format::

            "band1,band2-band3"

        where ``band1`` is the magnitude filter and ``(band2, band3)``
        are the filters that define the ``band2-band3`` color index.
        The filter names must correspond to filters that are part of
        the first chosen photometric system in photo_sys. Default to
        ``'{DEFAULT_CMD}'``

    fsample : float
        Sampling rate from 0 to 1 for the resulting synthetic star
        survey. 1 returns a full sample while any value under returns
        partial surveys. Default to 1.

    input_sorter : array_like
        TODO

    n_gens, n_jobs : int or iterable of int
        Number of independent catalog generations ran in parallel. Can
        also receive an iterable containing each generation number to
        run in parallel. Default to 1. Usage of n_jobs is deprecated
        and will be removed.

    max_gen_workers : int
        CURRENTLY NOT PROPERLY IMPLEMENTED
        Maximum number of workers to parallelize the initial catalog
        generations. Default to the number of independent generations
        in n_gens.

    max_pp_workers : int
        Maximum number of workers to parallelize the post-processing
        pipelines after the initial catalog generation. Default to 1.

    pp_auto_flush : bool
        TODO

    verbose : bool
        Verbose boolean flag to allow pipeline to print what it's doing
        to stdout. Default to True.

    partitioning_rule : TODO
        TODO

    parfile : string
        Name of file where Input should save the parameters for
        Galaxia. Default to '{DEFAULT_PARFILE}'

    output_dir : string or pathlib.Path
        Path to directory where to save the input/output files of
        Galaxia. Default to '{TTAGS_output_dir}'

    app_mag_lim_lo, app_mag_lim_hi, abs_mag_lim_lo, abs_mag_lim_hi, color_lim_lo, color_lim_hi : float
        These allow to specify the limits of the chosen color-magnitude
        diagram box selection (``lo`` for lower and ``hi`` for upper).
        ``app_mag``, ``abs_mag`` and ``color`` represent respectively
        limits in apparent magnitudes, absolute magnitudes and color
        index. Default values follow those set in the dictionary::
        {DEFAULT_CMD_BOX}

    rSun0, rSun1, rSun2 : float
        Coordinates for the observer position in kpc. Respectively
        default to::

            {TTAGS_rSun0}, {TTAGS_rSun1} & {TTAGS_rSun2}

    vSun0, vSun1, vSun2 : float
        Coordinates for the observer velocity in km/s. Respectively
        default to::

            {TTAGS_vSun0}, {TTAGS_vSun1} & {TTAGS_vSun2}

    r_max, r_min : float
        Extent of the shell of radii from observer location within
        which particles should be considered by Galaxia. Respectively
        default to::

            {TTAGS_r_max} & {TTAGS_r_min}

    rand_seed : int
        Seed to be used by Galaxia's pseudorandom number generator.
        Default to {TTAGS_rand_seed}

    nstart : int
        Index at which to start indexing synthetic stars. Default
        to {TTAGS_nstart}

    longitude, latitude : float
        Currently not implemented. Respectively default to::

            {TTAGS_longitude} & {TTAGS_latitude}

    star_type : int
        Currently not implemented. Default to {TTAGS_star_type}

    geometry_opt : int
        Currently not implemented. Default to {TTAGS_geometry_opt}

    survey_area : float
        Currently not implemented. Default to {TTAGS_survey_area}

    pop_id : int
        Currently not implemented. Default to {TTAGS_pop_id}

    warp_flare_on : int
        Currently not implemented. Default to {TTAGS_warp_flare_on}

    photo_error : int
        Currently not implemented. Default to {TTAGS_photo_error}

    Returns
    -------
    output : :obj:`Output`
        Handler with utilities to utilize the output survey and its
        data.
    """  # TODO Move documentation around to the subroutines where they should go
    # NOTE(review): the verbose setter ignores None, yet the default of True
    # used here always replaces the value given to the constructor - confirm
    # that this is the intended behavior
    self.verbose = verbose
    self._vanilla_survey(**kwargs)
    self.output.read_galaxia_output(partitioning_rule, max_pp_workers, pp_auto_flush)
    self.output.post_process_output()
    return self.output
# Docstrings are stripped (``__doc__`` is None) when Python runs with -OO,
# so the placeholders are only filled in when a docstring is present.
if make_survey.__doc__ is not None:
    make_survey.__doc__ = make_survey.__doc__.format(
        DEFAULT_CMD=DEFAULT_CMD,
        DEFAULT_PARFILE=DEFAULT_PARFILE,
        # the pretty-printed dictionary is rendered as a literal block: every
        # line of it is indented deeper than the parameter description above
        DEFAULT_CMD_BOX=('\n' + PrettyPrinter(width=60).pformat(DEFAULT_CMD_BOX)
                         ).replace('\n', '\n' + 12*' '),
        **{f"TTAGS_{key}": val
           for key, val in DEFAULTS_FOR_PARFILE.items()})
@property
def has_no_fileparam(self) -> bool:
    """True while the parameter file entries are still unset (None)."""
    is_unset: bool = self.__fileparam is None
    return is_unset
@property
def fileparam(self) -> Dict[str, Union[str,float,int]]:
    """
    Copy, as a plain dict, of the parameter file entries.

    Raises
    ------
    RuntimeError
        While ``has_no_fileparam`` is true.
    """
    if not self.has_no_fileparam:
        return dict(self.__fileparam)
    raise RuntimeError("Survey hasn't been made yet, run method `make_survey` first")
@property
def _extraparam(self) -> Dict[str, Union[str,float,int]]:
    """Extra parameters exactly as stored on the instance (no copy made)."""
    stored = self.__extraparam
    return stored
@property
def parameters(self) -> Dict[str, Union[str,float,int]]:
    """
    Parameter file entries merged with the extra parameters, the
    latter taking precedence on shared keys.
    """
    merged = {**self.fileparam}
    merged.update({**self._extraparam})
    return merged
@property
def n_gens(self) -> List[int]:
    """
    Generation numbers recorded among the extra parameters.

    The values of every ``n_gen_<index>`` extra parameter are gathered
    in a dictionary keyed by the integer index, which is passed through
    ``lexicalorder_dict`` before its values are returned as a list.
    """
    # raw string: "\d" in a regular string literal is an invalid escape
    # sequence; at least one digit is required so int() never sees ''
    return list(lexicalorder_dict({
        int(n_gen[0]): value
        for key, value in self._extraparam.items()
        if (n_gen:=re.findall(r"n_gen_(\d+)", key))
        }).values())
@property
def fsample(self) -> float:
    """Number of generations times the ``fsample`` parameter file entry."""
    generation_count = len(self.n_gens)
    per_generation = self.fileparam[FTTAGS.fsample]
    return generation_count*per_generation
@cached_property
def _surveyhash(self) -> bytes:
    """
    Hash of the survey parameters, computed once and then cached.

    Each parameter value, taken from ``lexicalorder_dict(parameters)``,
    is turned into a string and encoded with HASH_ENCODING before being
    handed over to ``hash_iterable``.
    """
    ordered_values = lexicalorder_dict(self.parameters).values()
    encoded_values = (str(value).encode(HASH_ENCODING) for value in ordered_values)
    return hash_iterable(encoded_values)
@property
@mark_metadata_prop
def hash(self) -> str:
    """Survey hash (``_surveyhash``) decoded into a string."""
    raw_hash = self._surveyhash
    return raw_hash.decode()
@property
@mark_metadata_prop
def surveyname(self) -> str:
    """Survey name, as given at construction."""
    name: str = self.__surveyname
    return name
@property
def append_hash(self) -> bool:
    """The ``append_hash`` attribute of the input object."""
    source = self.input
    return source.append_hash
@property
def surveyname_hash(self) -> str:
    """
    Survey name, followed by an underscore and the first 7 characters
    of the survey hash when ``append_hash`` is true.
    """
    if self.append_hash:
        suffix = f"_{self.hash[:7]}"
    else:
        suffix = ""
    return self.surveyname + suffix
@property
def metadata(self) -> Dict[str, Any]:
    """Public access to the ``_metadata`` attribute."""
    collected = self._metadata
    return collected
@property
def input(self) -> Input:
    """Input object given at construction."""
    input_object: Input = self.__input
    return input_object
@property
def photosystems(self) -> List[PhotoSystem]:
    """Photometric systems prepared at construction, as stored (no copy)."""
    systems: List[PhotoSystem] = self.__photosystems
    return systems
@property
def isochrones(self):
    """
    Deprecated alias of property ``photosystems``.

    Emits a DeprecationWarning before returning ``photosystems``.
    """
    deprecation_message = 'This property will be deprecated, please use instead property photosystems'
    warn(deprecation_message, DeprecationWarning, stacklevel=2)
    systems = self.photosystems
    return systems
@property
def photo_sys(self) -> Set[str]:
    """Set holding the ``key`` attribute of every photometric system."""
    keys: Set[str] = set()
    for system in self.photosystems:
        keys.add(system.key)
    return keys
@property
def verbose(self) -> bool:
    """Verbose flag currently stored on the survey."""
    flag: bool = self.__verbose
    return flag
@verbose.setter
def verbose(self, value: Optional[bool]) -> None:
    """Store a new verbose flag; None leaves the stored flag unchanged."""
    if value is None:
        return
    self.__verbose = value
@property
def has_no_output(self) -> bool:
    """True while the output handler is still unset (None)."""
    is_unset: bool = self.__output is None
    return is_unset
@property
def output(self):
    """
    Output handler stored on the survey.

    Raises
    ------
    RuntimeError
        While ``has_no_output`` is true.
    """
    if not self.has_no_output:
        return self.__output
    raise RuntimeError("Survey hasn't been made yet, run method `make_survey` first")
@property
def caching(self) -> bool:
    """The ``caching`` attribute of the input object."""
    source = self.input
    return source.caching
@property
def hdim(self) -> int:
    """The ``hdim`` attribute of the input object."""
    source = self.input
    return source.hdim
@property
def inputname_hash(self) -> str:
    """The ``name_hash`` attribute of the input object."""
    source = self.input
    return source.name_hash
@property
def inputdir(self) -> pathlib.Path:
    """The ``_input_dir`` attribute of the input object."""
    source = self.input
    return source._input_dir
@property
def ngb(self) -> int:
    """The ``ngb`` attribute of the input object."""
    source = self.input
    return source.ngb
@property
def check_state_before_running(self):
    """The ``check_state_before_running`` attribute of the output handler."""
    handler = self.output
    return handler.check_state_before_running
@property
def __ebf_output_files_glob(self):
    """The ``_ebf_glob`` attribute of the output handler."""
    handler = self.output
    return handler._ebf_glob
# This module only holds definitions: running it as a script is not supported
if __name__ == "__main__":
    raise NotImplementedError()