Source code for Galaxia_ananke.Input

#!/usr/bin/env python
"""
Contains the Input class definition

Please note that this module is private. The Input class is
available in the main ``Galaxia`` namespace - use that instead.
"""
from typing import Union, Tuple, Dict
import re
import pathlib
import numpy as np
import ebf
from astropy.utils import classproperty

from ._constants import *
from ._templates import *
from ._defaults import *
from .utils import make_symlink, compare_given_and_required, confirm_equal_length_arrays_in_dict
from .photometry.PhotoSystem import PhotoSystem

__all__ = ['Input']


FOURTHIRDPI = 4*np.pi/3

[docs] class Input: _position_prop = ('pos3', "Position coordinates in $kpc$ (Nx3)") _velocity_prop = ('vel3', "Velocity coordinates in $km/s$ (Nx3)") _mass_prop = ('mass', "Stellar masses in solar masses") _age_prop = ('age', "Stellar ages in years and decimal logarithmic scale") _metallicity_prop = ('feh', "Stellar metallicity $[Fe/H]$ in dex relative to solar") _parentindex_prop = ('parentid', "Index of parent particle") _populationindex_prop = ('id', "Index of parent particle population") _partitionindex_prop = ('partitionid', "Index of the data partition that contains the particle") _formationdistance_prop = ('dform', "Formation distance of parent particle in kpc") _heliumabundance_prop = ('helium', "Helium abundance $[He/H]$ in $dex$") _carbonabundance_prop = ('carbon', "Carbon abundance $[C/H]$ in $dex$") _nitrogenabundance_prop = ('nitrogen', "Nitrogen abundance $[N/H]$ in $dex$") _oxygenabundance_prop = ('oxygen', "Oxygen abundance $[O/H]$ in $dex$") _neonabundance_prop = ('neon', "Neon abundance $[Ne/H]$ in $dex$") _magnesiumabundance_prop = ('magnesium', "Magnesium abundance $[Mg/H]$ in $dex$") _siliconabundance_prop = ('silicon', "Silicon abundance $[Si/H]$ in $dex$") _sulphurabundance_prop = ('sulphur', "Sulphur abundance $[S/H]$ in $dex$") _calciumabundance_prop = ('calcium', "Calcium abundance $[Ca/H]$ in $dex$") _alphaabundance_prop = ('alpha', "Alpha abundance $[Mg/Fe]$ in $dex$") _kernels = 'h_cubic' _density = 'density' _positiondensity_prop = ('rho_pos', 'Position space density in $kpc^{-3}$') _velocitydensity_prop = ('rho_vel', 'Velocity space density in $[km/s]^{-3}$')
[docs] def __init__(self, *args, **kwargs) -> None: """ Driver to store and prepare the input data for Galaxia. Call signatures:: input = Input(particles, {rho_pos}, {rho_vel}=None, input_dir='{GALAXIA_TMP}', name='{DEFAULT_SIMNAME}', ngb={TTAGS_nres}, k_factor=1., former_kernel=False) input = Input(pname, kname, former_kernel=False) Parameters ---------- particles : dict Dictionary where each elements represent the properties of the input particles, given as equal-length array_like objects. {particles_dictionary_description} Input comes with class method make_dummy_particles_input to produce a dummy example of that dictionary. {rho_pos} : array_like Contains the position-determined kernel density estimates for the input particles. Must have equal lengths as the elements in the particles dictionary. {rho_vel} : array_like Contains the velocity-determined kernel density estimates for the input particles. Must have equal lengths as the elements in the particles dictionary. input_dir : string or pathlib.Path Optional arguments to specify path for the directory where Galaxia's input data should be generated. Default to '{GALAXIA_TMP}'. name : string Optional name Galaxia should use for the input files. Default to '{DEFAULT_SIMNAME}'. ngb : int Number of neighbouring particles Galaxia should consider. Default to {TTAGS_nres}. ONLY SUPPORT 8, 32, 64 & 128. k_factor : float Scaling factor applied to the kernels lengths to adjust all the kernels sizes uniformly. Lower values reduces the kernels extents, while higher values increases them. Default to 1 (no adjustment). pname : string Path to existing pre-formatted particles EBF files to use as input for Galaxia. This keyword argument must be used in conjunction with kname. Default to None if unused. kname : string Path to existing pre-formatted kernel EBF files to use as input for Galaxia. This keyword argument must be used in conjunction with pname. Default to None if unused. former_kernel : bool or dict Flag that allow the utilization of the former implementation for the kernels lengths with the consideration of a kernel normalization factor. If providing a dictionary, you can configure the former kernel normalization factor knorm by including the value knorm under key 'knorm'. Default to False, if True, knorm defaults to 0.596831. """ if args: if len(args) not in [2,3]: raise # TODO mix & match args & kwargs for particles and rho_pos kwargs['particles'] = args[0] kwargs[self._rho_pos] = args[1] kwargs[self._rho_vel] = args[2] if len(args) == 3 else kwargs.get(self._rho_vel, None) self.__input_files_exist = False elif {'pname', 'kname'}.issubset(kwargs.keys()): # TODO implement case where pname and kname non-formated names _pname = kwargs['pname'] = pathlib.Path(kwargs['pname']) _kname = kwargs['kname'] = pathlib.Path(kwargs['kname']) if _pname.parent != _kname.parent: raise ValueError(f"Given pname file {_pname} and kname file {_kname} need to share the same parent directory") kwargs['input_dir'] = _pname.parent kwargs['name'] = re.findall("(.*).ebf", _pname.name)[0] _hdim, kwargs['ngb'] = map(int, re.findall(f"{kwargs['name']}_d(\\d*)n(\\d*)_den.ebf", _kname.name)[0]) # TODO what if _hdim is 3 ? kwargs['particles'] = ebf.read(_pname) _k = ebf.read(_kname) _mass = _k[self._mass] # dummy line to check format kwargs[self._rho_pos] = _k[self._density] _k_factor = _k[self._kernels][:,0] * np.cbrt(FOURTHIRDPI*_k[self._density]) if kwargs.get('former_kernel', False): _knorm = _k_factor/(np.sqrt(kwargs['ngb']) * np.cbrt(FOURTHIRDPI)) # _knorm = _k[self._kernels][:,0] * np.cbrt(_k[self._density]) / np.sqrt(kwargs['ngb']) _knorm = np.median(_knorm) if len(np.unique(np.round(_knorm/(2*np.finfo(_knorm.dtype).eps)).astype('int')))==1 else _knorm kwargs[self._rho_vel] = (np.sqrt(kwargs['ngb']) * _knorm / _k[self._kernels][:,1])**3 kwargs['former_kernel'] = {'knorm': _knorm} else: kwargs[self._rho_vel] = (_k_factor / _k[self._kernels][:,1])**3 / FOURTHIRDPI kwargs['k_factor'] = _k_factor self.__input_files_exist = True else: raise ValueError("Wrong signature: please consult help of the Input constructor") self.__particles = kwargs['particles'].copy() self.__verify_particles(self.particles) self.__complete_particles(self.particles) self.__pos_density = kwargs[self._rho_pos] self.__vel_density = kwargs.get(self._rho_vel) self.__input_dir = pathlib.Path(kwargs.get('input_dir', GALAXIA_TMP)) self.__name = kwargs.get('name', DEFAULT_SIMNAME) self.__pname = kwargs.get('pname', None) self.__kname = kwargs.get('kname', None) self.__ngb = kwargs.get('ngb', FTTAGS.nres) __old = kwargs.get('former_kernel', False) if __old and not isinstance(__old, dict): __old = {} __knorm = __old.get('knorm', 0.596831) if isinstance(__old, dict) else None self.__k_factor = kwargs.get('k_factor', 1. if __knorm is None else np.sqrt(self.ngb) * __knorm * np.cbrt(FOURTHIRDPI))
@classproperty def particles_dictionary_description(cls): description = """ The particle dictionary includes the following properties with corresponding keys: {_required_properties} Additionally, Galaxia can optionally receive particle properties that will be carried over to the generated synthetic star, those include the following: {_optional_properties} """.format(_required_properties=''.join( [f"\n * {desc} via key ``{str(key)}``" for key, desc in Input._required_properties]), _optional_properties=''.join( [f"\n * {desc} via key ``{str(key)}``" for key, desc in Input._optional_properties])) return description @classproperty def _required_properties(cls): return { cls._position_prop, cls._velocity_prop, cls._mass_prop, cls._age_prop, cls._metallicity_prop } @classproperty def _optional_properties(cls): return { cls._parentindex_prop, cls._partitionindex_prop, cls._populationindex_prop, cls._formationdistance_prop, cls._heliumabundance_prop, cls._carbonabundance_prop, cls._nitrogenabundance_prop, cls._oxygenabundance_prop, cls._neonabundance_prop, cls._magnesiumabundance_prop, cls._siliconabundance_prop, cls._sulphurabundance_prop, cls._calciumabundance_prop, cls._alphaabundance_prop } @classproperty def _required_keys_in_particles(cls): return {_property[0] for _property in cls._required_properties} @classproperty def _optional_keys_in_particles(cls): return {_property[0] for _property in cls._optional_properties} @classproperty def all_possible_keys_in_particles(cls): return cls._required_keys_in_particles.union(cls._optional_keys_in_particles) @classproperty def _pos(cls): return cls._position_prop[0] @classproperty def _vel(cls): return cls._velocity_prop[0] @classproperty def _mass(cls): return cls._mass_prop[0] @classproperty def _age(cls): return cls._age_prop[0] @classproperty def _feh(cls): return cls._metallicity_prop[0] @classproperty def _He(cls): return cls._heliumabundance_prop[0] @classproperty def _C(cls): return cls._carbonabundance_prop[0] @classproperty def _N(cls): return cls._nitrogenabundance_prop[0] @classproperty def _O(cls): return cls._oxygenabundance_prop[0] @classproperty def _Ne(cls): return cls._neonabundance_prop[0] @classproperty def _Mg(cls): return cls._magnesiumabundance_prop[0] @classproperty def _Si(cls): return cls._siliconabundance_prop[0] @classproperty def _S(cls): return cls._sulphurabundance_prop[0] @classproperty def _Ca(cls): return cls._calciumabundance_prop[0] @classproperty def _elem_list(cls): return [cls._He, cls._C, cls._N, cls._O, cls._Ne, cls._Mg, cls._Si, cls._S, cls._Ca] @classproperty def _alph(cls): return cls._alphaabundance_prop[0] @classproperty def _parentid(cls): return cls._parentindex_prop[0] @classproperty def _partitionid(cls): return cls._partitionindex_prop[0] @classproperty def _dform(cls): return cls._formationdistance_prop[0] @classproperty def _pop_id(cls): return cls._populationindex_prop[0] @classproperty def _rho_pos(cls): return cls._positiondensity_prop[0] @classproperty def _rho_vel(cls): return cls._velocitydensity_prop[0] @property def particles(self): return self.__particles @property def length(self): return len(self.particles[self._mass]) @property def rho_pos(self): return self.__pos_density @property def rho_vel(self): return self.__vel_density @property def rho(self): return np.vstack([self.rho_pos, self.rho_vel]).T # TODO what if rho_vel is None @property def hdim(self): return 3 if self.rho_vel is None else 6 @property def name(self) -> str: return self.__name @property def ngb(self): return self.__ngb @property def k_factor(self): return self.__k_factor @property def _input_dir(self): return self.__input_dir @property def kernels(self): return self.k_factor/np.cbrt(FOURTHIRDPI*self.rho) @property def kname(self): # TODO replace with functools cached_property? if self.__kname is None: self.__kname = self._input_dir / f"{self.name}_d{self.hdim}n{self.ngb}_den.ebf" return self.__kname @property def pname(self): if self.__pname is None: self.__pname = self._input_dir / f"{self.name}.ebf" return self.__pname
[docs] def keys(self): return self.particles.keys()
[docs] def optional_keys(self): return list(set(self.keys()).intersection(self._optional_keys_in_particles))
[docs] def prepare_input(self, photosys: PhotoSystem, cmd_magnames: Union[str,Dict[str,str]], **kwargs) -> Tuple[str, pathlib.Path, Dict[str, Union[str,float,int]]]: cmd_magnames: str = photosys.check_cmd_magnames(cmd_magnames) parfile, for_parfile = self._write_parameter_file(photosys, cmd_magnames, **kwargs) sorter = np.lexsort((self.particles[self._partitionid],)) kname = self._write_kernels(sorter) pname = self._write_particles(sorter) temp_filename = self._prepare_nbody1(kname, pname) return self.name, parfile, for_parfile
def _write_parameter_file(self, photosys: PhotoSystem, cmd_magnames: str, **kwargs) -> Tuple[pathlib.Path, Dict[str, Union[str,float,int]]]: parfile = pathlib.Path(kwargs.pop('parfile', DEFAULT_PARFILE)) # TODO make temporary? create a global record of temporary files? if not parfile.is_absolute(): parfile = self._input_dir / parfile for_parfile = DEFAULTS_FOR_PARFILE.copy() for_parfile.update(**{FTTAGS.photo_categ: photosys.category, FTTAGS.photo_sys: photosys.name, FTTAGS.mag_color_names: cmd_magnames, FTTAGS.nres: self.ngb}, **kwargs) parfile.write_text(PARFILE_TEMPLATE.substitute(for_parfile)) return parfile, for_parfile def _write_kernels(self, sorter): kname = self.kname if not self.__input_files_exist: ebf.initialize(self.kname) ebf.write(kname, f"/{self._density}", self.rho_pos[sorter], "a") ebf.write(kname, f"/{self._kernels}", self.kernels[sorter], "a") ebf.write(kname, f"/{self._mass}", self.particles[self._mass][sorter], "a") return kname def _write_particles(self, sorter): pname = self.pname if not self.__input_files_exist: ebf.initialize(pname) for key in self._required_keys_in_particles: ebf.write(pname, f"/{key}", self.particles[key][sorter], 'a') for key in self._optional_keys_in_particles: ebf.write(pname, f"/{key}", self.particles[key][sorter] if key in self.keys() else np.zeros(self.length), 'a') return pname def _prepare_nbody1(self, kname: pathlib.Path, pname: pathlib.Path): temp_dir = GALAXIA_NBODY1 / self.name temp_dir.mkdir(parents=True, exist_ok=True) make_symlink(kname, temp_dir) make_symlink(pname, temp_dir) temp_filename = (GALAXIA_FILENAMES / self.name).with_suffix('.txt') temp_filename.write_text(FILENAME_TEMPLATE.substitute(name=self.name, pname=pname.name)) return temp_filename @classmethod def __verify_particles(cls, particles): compare_given_and_required(particles.keys(), cls._required_keys_in_particles, cls._optional_keys_in_particles, error_message="Given particle data covers wrong set of keys") confirm_equal_length_arrays_in_dict(particles, cls._mass, error_message_dict_name='particles') # TODO check format, if dataframe-like @classmethod def __complete_particles(cls, particles): if cls._parentid not in particles: particles[cls._parentid] = np.arange(particles[cls._mass].shape[0]) if cls._partitionid not in particles: particles[cls._partitionid] = np.zeros(particles[cls._mass].shape[0], dtype='int') if cls._dform not in particles: particles[cls._dform] = 0*particles[cls._mass]
[docs] @classmethod def make_dummy_particles_input(cls, n_parts=10**5): """ Generate an example dummy input particles dictionary for Input made of randomly generated arrays. Parameters ---------- n_parts : int Number of particles the example include. Default to 10**5. Returns ------- p : dict Dummy example input particles dictionary for Input. Notes ----- {particles_dictionary_description} """ p = {} p[cls._pos] = 30*np.random.randn(n_parts, 3) p[cls._vel] = 50*np.random.randn(n_parts, 3) p[cls._mass] = 5500 + 700*np.random.randn(n_parts) p[cls._age] = 9.7 + 0.4*np.random.randn(n_parts) p[cls._feh] = -0.7 + 0.4*np.random.randn(n_parts) for el in cls._elem_list: p[el] = -0.6 + 0.4*np.random.randn(n_parts) p[cls._alph] = p[cls._Mg] - p[cls._feh] p[cls._parentid] = np.arange(n_parts).astype('int') p[cls._partitionid] = np.zeros(n_parts, dtype='int') p[cls._dform] = np.zeros(n_parts, dtype='float32') p[cls._pop_id] = np.zeros(n_parts, dtype='int') return p
[docs] @classmethod def make_dummy_densities_input(cls, n_parts=10**5): """ Generate an example dummy input densities for Input made of randomly generated arrays. Parameters ---------- n_parts : int Number of particles the example include. Default to 10**5. Returns ------- rho_pos : array Dummy example input position densities for Input. rho_vel : array Dummy example input velocity densities for Input. """ rho_pos = np.exp(-2.9 + 1.1*np.random.randn(n_parts)) rho_vel = np.exp(-4.4 + 1.1*np.random.randn(n_parts)) return rho_pos, rho_vel
Input.__init__.__doc__ = Input.__init__.__doc__.format(GALAXIA_TMP=GALAXIA_TMP, DEFAULT_SIMNAME=DEFAULT_SIMNAME, TTAGS_nres=FTTAGS.nres, particles_dictionary_description=Input.particles_dictionary_description, rho_pos=Input._rho_pos, rho_vel=Input._rho_vel) Input.make_dummy_particles_input.__func__.__doc__ = Input.make_dummy_particles_input.__doc__.format( particles_dictionary_description=Input.particles_dictionary_description) if __name__ == '__main__': pass