from .StarList import StarList
from .file_helpers import *
import pandas as pd
from .fileformats import *
DAO_file_firstline = ' NL NX NY LOWBAD HIGHBAD THRESH AP1 PH/ADU RNOISE FRAD'
[docs]def convert_dao_type(starlist, new_daotype, update_daotype=True):
# type: (StarList, DAO.FType) -> bool
"""Converts from one daotype to another
Only subset of conversions supported. You can also provide own map (directory) of column names"""
for col in new_daotype.columns:
if col not in starlist.columns:
ct = _get_col_type(new_daotype.extension, col)
if not ct.optional:
if ct.default is None:
return False
else:
starlist.loc[:, col] = ct.default # new col
if update_daotype:
starlist.DAO_type = new_daotype
return True
[docs]def read_dao_file(file, dao_type = None):
"""
Construct StarList from daophot output file.
The header lines in file may be missing.
:rtype: StarList
:param file: open stream or filename, if stream dao_type must be specified
:param dao_type: file format, one of DAO.XXX_FILE constants:
- DAO.COO_FILE
- DAO.AP_FILE
- DAO.LST_FILE
- DAO.NEI_FILE
- DAO.ALS_FILE
If missing filename extension will be used to determine file type
if file is provided as filename
:return: StarList instance
"""
ret = _parse_file(file, dao_type)
return ret
[docs]def write_dao_file(starlist, file, dao_type=None, with_header=None):
"""
Write StarList object into daophot file.
:param starlist: StarList instance to be writen
:param file: writable stream or filename, if stream dao_type must be specified
:param dao_type: file format, one of DAO.XXX_FILE constants:
- DAO.COO_FILE
- DAO.AP_FILE
- DAO.LST_FILE
- DAO.NEI_FILE
- DAO.ALS_FILE
If missing extension of file will be used to determine file type
if file is provided as filename
:param with_header: True, False or None. If None header will be written if not None in starlist
:rtype: None
"""
if dao_type is None:
if starlist.DAO_type is None:
raise Exception('Can not determine file format')
dao_type = starlist.DAO_type
elif dao_type != starlist.DAO_type:
converted = convert_dao_type(starlist, dao_type, update_daotype=False)
if not converted:
raise Exception('Can not convert columns {} into {} '.format(starlist.columns, dao_type.columns))
_write_file(starlist, file, dao_type, with_header=with_header)
def _get_col_type(file_ext, column):
# type: (str, str) -> DAO.CType
coltype = DAO.columns.get((file_ext, column)) # lookup for (filetype,col)
if not coltype:
coltype = DAO.columns.get(column) # lookup for col
if not coltype:
coltype = DAO.columns['_default']
return coltype
def _write_file(starlist, file, dao_type, with_header=None):
f, to_close = get_stream(file, 'w')
if with_header != False:
if starlist.DAO_hdr is not None:
starlist.DAO_hdr['NL'] = dao_type.NL
write_dao_header(starlist.DAO_hdr, f)
f.write('\n')
elif with_header:
raise Exception('Attempt to save dao file with header from StarList with DAO_hdr=None')
_write_table(starlist, f, dao_type)
close_files(to_close)
[docs]def dump_dao_hdr(hdr, line_prefix=''):
"""
returns two line string representation of header dictionary
:param dict hdr: dao header dictionary like StarList.DAO_hdr
:param str line_prefix: add this prefix at beginning of every line (e.g. comment char)
:rtype:str
"""
first_line = ''
second_line = ''
second_line_frmt = ['{:3.0f}', '{:6.0f}', '{:6.0f}', '{:8.1f}', '{:8.1f}', '{:8.2f}',
'{:8.2f}', '{:8.2f}', '{:8.2f}', '{:8.2f}']
second_line_frmt.reverse()
try:
for token in DAO_file_firstline.split(' '):
if token != '':
val = hdr[token]
first_line += token
fmt = second_line_frmt.pop()
second_line += fmt.format(float(val))
first_line += ' '
except KeyError:
# sometimes header is shorter, it's OK'
pass
return line_prefix + first_line + '\n' + line_prefix + second_line + '\n'
def _write_table(starlist, file, dao_type):
# preapre columns (from daotype in order but only existing in starlist)
pd.options.mode.chained_assignment = None # default='warn'
columns = [c for c in dao_type.columns if c in starlist.columns]
coltypes = [_get_col_type(dao_type.extension, c) for c in columns]
towrite = starlist[columns]
for i, row in towrite.iterrows():
for col, coltype, val in zip(columns, coltypes, row):
if pd.isnull(val):
val = coltype.NaN[0]
file.write(coltype.format.format(val))
file.write('\n')
def _parse_file(file, dao_type):
if dao_type is None and isinstance(file, str):
_, ext = os.path.splitext(file)
dao_type = DAO.file_types.get(ext)
f, to_close = get_stream(file, 'r')
try:
hdr, _ = read_dao_header(f)
fl = _parse_table(f, hdr, dao_type)
finally:
close_files(to_close)
fl.DAO_hdr = hdr
return fl
[docs]def parse_dao_hdr(hdr, val, line_prefix=''):
"""
creates dao header dict form two lines of file header
:param str hdr: first line
:param str val: second line
:param line_prefix: expected line prefix
:return: dict with dao header compatible with StarList.DAO_header
"""
# cut prefix
hdr = hdr[len(line_prefix):] # first line
val = val[len(line_prefix):] # second line
# split keys, split values, zip them into list of tuples, then create dictionary out of that
return dict(zip(hdr.split(), val.split()))
def _parse_table(f, hdr, dao_type):
if dao_type is not None and dao_type.read_cols is not None: # limit number of read cols
df = pd.read_table(f, header=None, sep='\s+', usecols=range(dao_type.read_cols))
else:
df = pd.read_table(f, header=None, sep='\s+')
#df.insert(0, 'id', df.index.to_series())
if dao_type is None:
dao_type = _guess_filetype(hdr, df)
if dao_type == DAO.AP_FILE: # two row per star format correction
odd = df.iloc[0::2]
odd.columns = DAO.AP_FILE_ODD.columns[:odd.columns.size]
even = df.iloc[1::2]
even.columns = DAO.AP_FILE_EVEN.columns[:even.columns.size]
even.index = odd.index
df = odd.join(even, rsuffix='foo')
else:
df.columns = dao_type.columns[:df.columns.size]
df.id = df.id.astype(int)
df.index = df.id
# find NaN
for col in df.columns:
coltype = _get_col_type(dao_type.extension, col)
if coltype.NaN:
df[col].replace(coltype.NaN, pd.np.nan, inplace=True)
ret = StarList(df)
ret.DAO_type = dao_type
return ret
def _guess_filetype(header, table):
type = DAO.UNKNOWN_FILE
if header:
colno = table.columns.size
NL = int(header['NL'])
type = DAO.UNKNOWN_FILE
if NL == 1:
if colno == 7:
type = DAO.COO_FILE
elif colno == 9:
type = DAO.ALS_FILE
elif colno == 5:
type = DAO.SHORT_FILE
elif NL == 2:
type = DAO.AP_FILE
elif NL == 3:
if colno == 6:
type = DAO.LST_FILE
elif colno == 5:
type = DAO.NEI_FILE
return type