Source code for galaxia_ananke.photometry.Isochrone

#!/usr/bin/env python
#
# Author: Adrien CR Thob
# Copyright (C) 2022  Adrien CR Thob
#
# This file is part of the py-Galaxia-ananke project,
# <https://github.com/athob/py-Galaxia-ananke>, which is licensed
# under the GNU Affero General Public License v3.0 (AGPL-3.0).
# 
# The full copyright notice, including terms governing use, modification,
# and redistribution, is contained in the files LICENSE and COPYRIGHT,
# which can be found at the root of the source code distribution tree:
# - LICENSE <https://github.com/athob/py-Galaxia-ananke/blob/main/LICENSE>
# - COPYRIGHT <https://github.com/athob/py-Galaxia-ananke/blob/main/COPYRIGHT>
#
"""
Docstring
"""
import sys
from typing import Tuple, List
from numpy.typing import NDArray
from functools import cached_property
from operator import itemgetter
import pathlib
import shutil

import numpy as np

from .._constants import *
from ..utils import compare_given_and_required
from .IsochroneFile import IsochroneFile
from .Formatting import *

__all__ = ['Isochrone']


[docs] class Isochrone: """ Ages need to be in log scale """ _zini = default_formatting._zini _age = default_formatting._age _mini = default_formatting._mini _mact = default_formatting._mact _lum = default_formatting._lum _teff = default_formatting._teff _grav = default_formatting._grav _required_keys = [_age, _mini, _mact, _lum, _teff, _grav] _required_metallicities = {0.0001, 0.0002, 0.0004, 0.0005, 0.0006, 0.0007, 0.0008, 0.0009, 0.001, 0.0012, 0.0014, 0.0016, 0.0018, 0.002, 0.0022, 0.0024, 0.0026, 0.003, 0.0034, 0.004, 0.005, 0.006, 0.007, 0.008, 0.009, 0.01, 0.012, 0.014, 0.016, 0.018, 0.02, 0.024, 0.028, 0.03} _file_descriptor = "IsoFileDescriptor.txt"
[docs] def __init__(self, *args, overwrite=False) -> None: if not args: raise TypeError("Isochrone requires at least one argument") elif len(args) == 1: self._path = pathlib.Path(args[0]) elif len(args) == 2: self._path = ISOCHRONES_PATH / CUSTOM_PHOTOCAT / args[0] if self.path.exists(): if overwrite: shutil.rmtree(self.path) else: raise FileExistsError(f"Isochrone '{args[0]}' already exists at '{self.path}' (use `overwrite` kwarg to ignore)") self._write_isochrone_files(args[1]) else: raise TypeError(f"Too many arguments ({len(args)} given)")
def __repr__(self) -> str: cls = self.__class__.__name__ description = ', '.join([(f"{prop}={getattr(self, prop)}") for prop in ['category', 'name']]) return f'{cls}({description})' def _write_isochrone_files(self, isochrone_data: dict): self.path.mkdir(parents=True, exist_ok=True) iso_column_order = self._write_file_descriptor(isochrone_data) for feh, iso in isochrone_data.items(): path = self._path / IsochroneFile._file_format.format(format(feh, '.6f')) _temp = IsochroneFile(path, iso[iso_column_order], isochrone=self) def _write_file_descriptor(self, isochrone_data): metallicities, headers = zip(*[(feh, list(iso.keys())) for feh, iso in isochrone_data.items()]) compare_given_and_required(metallicities, self._required_metallicities, error_message="Given isochrone data covers wrong set of metallicities") check = [] for header in headers: if header not in check: check.append(header) if len(check) > 1: raise ValueError("Given isochrone dataframes have unequal headers") header = check[0] remain = set(self._required_keys).difference(header) if remain: raise ValueError(f"Given isochrone dataframes have incomplete headers: missing {remain}") magnames = sorted(list(set(header).difference(self._required_keys))) if not magnames: raise ValueError(f"Given isochrone dataframes have no magnitude columns") iso_column_order = self._required_keys + magnames with self.file_descriptor_path.open('w') as f: f.write( f"Python_{self.name} {len(iso_column_order)} {len(self._required_keys)} {len(magnames)} {' '.join(magnames)}\n\n") return iso_column_order @property def path(self): return self._path @property def category(self): return self.path.parent.name @property def name(self): return self.path.name @property def file_descriptor_path(self): return self._path / self._file_descriptor @property def has_file_descriptor(self): return self.file_descriptor_path.exists() @cached_property def isochrone_files(self): return list(map(lambda path: IsochroneFile(path, isochrone=self), sorted(self._path.glob(IsochroneFile._file_format.format('*'))))) @cached_property def file_descriptor_content(self) -> List[str]: if self.has_file_descriptor: with open(self.file_descriptor_path,'r') as f: return f.readline().strip('\n').split() else: raise NotImplementedError("Photometric system doesn't have an IsoFileDescriptor.txt file") @cached_property def __fd_photosysname(self) -> str: return self.file_descriptor_content[0] @cached_property def __fd_fields(self) -> int: return int(self.file_descriptor_content[1]) @cached_property def _fd_startid(self) -> int: return int(self.file_descriptor_content[2]) @cached_property def _fd_nmags(self) -> int: return int(self.file_descriptor_content[3]) @cached_property def magnitude_itemgetter(self) -> itemgetter: return itemgetter(*range(self._fd_startid, self._fd_startid+self._fd_nmags)) @cached_property def mag_names(self) -> List[str]: return self.file_descriptor_content[4:] @cached_property def formatting(self): if False: # sys.version_info >= (3,10): # TODO uncomment below when upgrading officially to 3.10 # match self.name: # case 'WFIRST+HST__WFC3' if self.category == 'padova': # return oldpadova_fomatting_withlogage # case ('WFIRST' | 'LSST' | 'GAIA__0' | 'GAIA__DR2') if self.category == 'padova': # return oldpadova_fomatting # case ('CTIO__DECam' | 'LSST_DP0' | 'Roman' | 'Euclid' | 'JWST') if self.category == 'padova': # return padova_formatting # case _: # return default_formatting pass else: if self.name == 'WFIRST+HST__WFC3' and self.category == 'padova': return oldpadova_fomatting_withlogage if self.name in ['WFIRST','LSST','GAIA__0','GAIA__DR2','HST__WFC3'] and self.category == 'padova': return oldpadova_fomatting if self.name in ['CTIO__DECam','LSST_DP0','Roman','Euclid','JWST__NIRCam','JWST__MIRI'] and self.category == 'padova': return padova_formatting else: return default_formatting @cached_property def qtables_dictionary(self): # log10(fe[i]/0.0152 return {self.formatting.metallicity_converter(iso_file.metallicity): self.formatting.qtable_per_age_from_isochronefile(iso_file) for iso_file in self.isochrone_files} @cached_property def qtables_unique_ages_dictionary(self): return {metallicity: qtables_per_age_dict.keys() for metallicity, qtables_per_age_dict in self.qtables_dictionary.items()} @cached_property def consolidated_points_and_values(self) -> Tuple[NDArray,NDArray]: list_of_points = [] list_of_values = [] for metallicity, unique_ages in self.qtables_unique_ages_dictionary.items(): unique_ages = list(unique_ages) qtables = self.qtables_dictionary[metallicity] for i_age, age in enumerate(unique_ages): qtable = qtables[age] list_of_points.append(qtable[[self._zini, self._age, self._mini]].to_pandas().to_numpy()) list_of_values.append(qtable[self.mag_names].to_pandas().to_numpy()) if qtable[-1][self._lum].value == -9.999: # bit of a stretch for white dwarf case, but that will do for now for what I need if age != unique_ages[-1]: list_of_points.append(list_of_points[-1][[-1]].copy()) list_of_points[-1][:,1] = unique_ages[-1] list_of_values.append(list_of_values[-1][[-1]].copy()) else: # also a stretch, no neutron star, just black holes next_age = unique_ages[i_age+1] max_mini_next_age = qtables[next_age][self._mini].max() dead_next_age_qtable = qtable[qtable[self._mini] > max_mini_next_age] list_of_points.append(dead_next_age_qtable[[self._zini, self._age, self._mini]].to_pandas().to_numpy()) list_of_points[-1][:,1] = next_age list_of_values.append(dead_next_age_qtable[self.mag_names].to_pandas().to_numpy()) list_of_values[-1][:,:] = 999 list_of_points.append(list_of_points[-1][[-1]].copy()) list_of_points[-1][:,1] = unique_ages[-1] list_of_values.append(list_of_values[-1][[-1]].copy()) list_of_lengths = np.array(list(map(len, list_of_points))) successive_indices = np.hstack([0,np.cumsum(list_of_lengths)]) points = np.empty((successive_indices[-1], 3)) values = np.empty((successive_indices[-1], len(self.mag_names))) for first, last, sub_points, sub_values in zip(successive_indices[:-1], successive_indices[1:], list_of_points, list_of_values): points[first:last] = sub_points values[first:last] = sub_values return points, values
if __name__ == '__main__': raise NotImplementedError()