Source code for galaxia_ananke.photometry.IsochroneFile
#!/usr/bin/env python
#
# Author: Adrien CR Thob
# Copyright (C) 2022 Adrien CR Thob
#
# This file is part of the py-Galaxia-ananke project,
# <https://github.com/athob/py-Galaxia-ananke>, which is licensed
# under the GNU Affero General Public License v3.0 (AGPL-3.0).
#
# The full copyright notice, including terms governing use, modification,
# and redistribution, is contained in the files LICENSE and COPYRIGHT,
# which can be found at the root of the source code distribution tree:
# - LICENSE <https://github.com/athob/py-Galaxia-ananke/blob/main/LICENSE>
# - COPYRIGHT <https://github.com/athob/py-Galaxia-ananke/blob/main/COPYRIGHT>
#
"""
Docstring
"""
from __future__ import annotations
from typing import TYPE_CHECKING, Union
from functools import cached_property
import pathlib
import re
import pandas as pd
from astropy.io import ascii
from .._constants import *
if TYPE_CHECKING:
from .Isochrone import Isochrone
__all__ = ['IsochroneFile']
[docs]
class IsochroneFile:
_file_format = "output_{}.dat"
[docs]
def __init__(self, *args, isochrone: Isochrone = None) -> None:
if isochrone is None:
raise TypeError(f"Keyword argument 'isochrone' missing")
self._isochrone = isochrone
if not args:
raise TypeError("Isochrone requires at least one argument")
elif len(args) in [1, 2]:
self._path = pathlib.Path(args[0])
if len(args) == 2:
self._write_table(args[1])
else:
raise TypeError(f"Too many arguments ({len(args)} given)")
def __repr__(self) -> str:
cls = self.__class__.__name__
description = ', '.join([(f"{prop}={getattr(self, prop)}") for prop in ['filename']])
return f'{cls}({description})'
def _write_table(self, data: Union[pd.DataFrame, ascii.core.Table]) -> None:
if isinstance(data, pd.DataFrame):
data = ascii.core.Table.from_pandas(data)
elif not isinstance(data, ascii.core.Table):
raise ValueError("Given data should either be a pandas DataFrame or an astropy Table")
data.sort([self.isochrone._age, self.isochrone._mini])
data.write(self.path, format='ascii.commented_header')
def _open(self, *args, **kwargs):
return open(self._path, *args, **kwargs)
@property
def path(self):
return self._path
@property
def filename(self):
return self._path.name
@property
def metallicity(self):
return float(re.findall("(?<={})(.*)(?={})".format(*tuple(self._file_format.split('{}'))),
self.filename)[0])
@cached_property
def column_names(self):
with self._open('r') as f:
_temp = '#'
while (_temp[0] == '#') if _temp else False:
_temp = f.readline()
if (_temp[0] == '#') if _temp else False:
header = _temp
header = header.strip('#').strip(' ').strip('\n').replace('\t',' ')
while header.count(' '):
header = header.replace(' ', ' ')
return header.split(' ')
@cached_property
def data(self) -> ascii.core.Table:
return ascii.read(self._path, names=self.column_names)
@property
def isochrone(self):
return self._isochrone
if __name__ == '__main__':
raise NotImplementedError()