Files
eos/libeos/options.py

561 lines
20 KiB
Python

"""
Classes for storing various configurations needed for reduction.
"""
import argparse
from dataclasses import dataclass, field, Field, fields, MISSING
from enum import StrEnum
from typing import get_args, get_origin, List, Optional, Tuple, Union
from datetime import datetime
from os import path
import numpy as np
import logging
@dataclass
class CommandlineParameterConfig:
    # name used on the command line, resulting in "--argument"
    argument: str
    # keyword arguments forwarded verbatim to ArgumentParser.add_argument
    add_argument_args: dict
    short_form: Optional[str] = None
    group: str = 'misc'
    priority: int = 0

    def _sort_key(self):
        # required arguments first (False sorts before True), then higher
        # priority, then alphabetically by argument name
        is_required = self.add_argument_args.get('required', False)
        return (not is_required, -self.priority, self.argument)

    def __gt__(self, other):
        """
        Sort required arguments first, then use priority, then name
        """
        return self._sort_key() > other._sort_key()
class ArgParsable:
    """
    Mixin for dataclasses that are filled from command line arguments.

    On subclass creation a readable class docstring is generated from the
    field annotations. The class also provides helpers to build argparse
    option descriptions from the dataclass fields
    (get_commandline_parameters) and to construct an instance back from a
    parsed argparse.Namespace (from_args).
    """
    def __init_subclass__(cls):
        # create a nice documentation string that takes help into account
        cls.__doc__ = cls.__name__ + " Parameters:\n"
        for key, typ in cls.__annotations__.items():
            # Optional[X] is represented as Union[X, NoneType]; unwrap to X
            if get_origin(typ) is Union and type(None) in get_args(typ):
                optional = True
                typ = get_args(typ)[0]
            else:
                optional = False
            # __init_subclass__ runs before the @dataclass decorator, so
            # class attributes are still dataclasses.Field objects (from
            # field(...)) or plain default values
            value = getattr(cls, key, None)
            cls.__doc__ += f" {key} ({typ.__name__})"
            if isinstance(value, Field):
                if value.default is not MISSING:
                    cls.__doc__ += f" = {value.default}"
                if 'help' in value.metadata:
                    cls.__doc__ += f" - {value.metadata['help']}"
            elif value is not None:
                # plain class-level default without field()
                cls.__doc__ += f" = {value}"
            if optional:
                cls.__doc__ += " [Optional]"
            cls.__doc__ += "\n"
        # NOTE(review): the return value of __init_subclass__ is ignored by
        # Python; returning cls is harmless but has no effect
        return cls

    @classmethod
    def get_commandline_parameters(cls) -> List[CommandlineParameterConfig]:
        """
        Return a list of arguments used in building the command line parameters.
        Union types besides Optional are not supported.
        """
        output = []
        # NOTE: the loop variable shadows the imported dataclasses.field
        # helper, but only within this method
        for field in fields(cls):
            args={}
            if field.default is not MISSING:
                args['default'] = field.default
                args['required'] = False
            elif field.default_factory is not MISSING:
                # materialise the factory value so argparse can report it
                args['default'] = field.default_factory()
                args['required'] = False
            else:
                # no default defined -> mandatory command line argument
                args['required'] = True
            if get_origin(field.type) is Union and type(None) in get_args(field.type):
                # optional argument
                typ = get_args(field.type)[0]
                # NOTE(review): assumes every Optional field defines a
                # default (otherwise this raises KeyError) -- holds for all
                # config classes in this module
                del(args['default'])
            else:
                typ = field.type
            if get_origin(typ) is list:
                # variable length list -> accept one or more values
                args['nargs'] = '+'
                typ = get_args(typ)[0]
            elif get_origin(typ) is tuple:
                # fixed length tuple -> accept exactly that many values
                args['nargs'] = len(get_args(typ))
                typ = get_args(typ)[0]
            if issubclass(typ, StrEnum):
                # enums are passed as plain strings restricted to the
                # member values; the default is unwrapped accordingly
                args['choices'] = [ci.value for ci in typ]
                if field.default is not MISSING:
                    args['default'] = field.default.value
                typ = str
            if typ is bool:
                # booleans become flags that invert their default value
                args['action'] = 'store_false' if field.default else 'store_true'
            else:
                args['type'] = typ
            if 'help' in field.metadata:
                args['help'] = field.metadata['help']
            output.append(CommandlineParameterConfig(
                field.name,
                add_argument_args=args,
                group=field.metadata.get('group', 'misc'),
                short_form=field.metadata.get('short', None),
                priority=field.metadata.get('priority', 0),
            ))
        return output

    @classmethod
    def from_args(cls, args: argparse.Namespace):
        """
        Create the child class from the command line argument Namespace object.
        All attributes that are not needed for this class are ignored.
        """
        inpargs = {}
        for field in fields(cls):
            value = getattr(args, field.name)
            typ = field.type
            if get_origin(field.type) is Union and type(None) in get_args(field.type):
                # optional argument
                typ = get_args(field.type)[0]
            if isinstance(typ, type) and issubclass(typ, StrEnum):
                # convert str to enum
                try:
                    value = typ(value)
                except ValueError:
                    choices = [ci.value for ci in typ]
                    raise ValueError(f"Parameter --{field.name} has to be one of {choices}")
            inpargs[field.name] = value
        return cls(**inpargs)
# definition of command line arguments
@dataclass
class ReaderConfig(ArgParsable):
    """
    Configuration for locating and reading the raw (hdf) data files.
    """
    # NOTE(review): the default is evaluated once at import time, so a
    # long-running process started before new year keeps the old value --
    # confirm this is acceptable
    year: int = field(
        default=datetime.now().year,
        metadata={
            'short': 'Y',
            'group': 'input data',
            'help': 'year the measurement was performed',
        },
    )
    # directories searched (in order) for the raw files
    rawPath: List[str] = field(
        default_factory=lambda: ['.', path.join('.','raw'), path.join('..','raw'), path.join('..','..','raw')],
        metadata={
            'short': 'rp',
            'group': 'input data',
            'help': 'search paths for hdf files',
        },
    )
    startTime: Optional[float] = field(
        default = None,
        metadata={
            'short': 'st',
            'group': 'data manicure',
            # fixed typo in user-facing help text: 'aquisition' -> 'acquisition'
            'help': 'set time zero other than the start of the data acquisition',
        },
    )
class IncidentAngle(StrEnum):
    """
    Source used to calculate the incident angle alphaI.

    Per the 'incidentAngle' help text in ExperimentConfig:
    alphaF directly, mu+kappa+delta_kappa, or (nu+kappa+delta_kappa)/2.
    """
    alphaF = 'alphaF'
    mu = 'mu'
    nu = 'nu'
class MonitorType(StrEnum):
    """
    Normalisation monitor options; the values are the single-letter
    abbreviations used on the command line.
    """
    auto = 'a'
    proton_charge = 'p'
    time = 't'
    neutron_monitor = 'n'
    debug = 'x'
@dataclass
class ExperimentConfig(ArgParsable):
    """
    Instrument and sample settings of the measurement.
    """
    chopperPhase: float = field(
        default=0,
        metadata={
            'short': 'cp',
            'group': 'instrument settings',
            'help': 'phase between opening of chopper 1 and closing of chopper 2 window',
        },
    )
    chopperPhaseOffset: float = field(
        default=-5,
        metadata={
            'short': 'co',
            'group': 'instrument settings',
            'help': 'phase between chopper 1 index pulse and closing edge',
        },
    )
    chopperSpeed: float = field(
        default=500,
        metadata={
            'short': 'cs',
            'group': 'instrument settings',
            'help': 'rotation speed of the chopper disks in rpm',
        },
    )
    # NOTE(review): annotated as Tuple but the factory returns a list; the
    # command line machinery only uses the tuple length (nargs=2) -- confirm
    # before tightening the default to a tuple
    yRange: Tuple[float, float] = field(
        default_factory=lambda: [18, 48],
        metadata={
            'short': 'y',
            'group': 'region of interest',
            'help': 'horizontal pixel range on the detector to be used',
        },
    )
    lambdaRange: Tuple[float, float] = field(
        default_factory=lambda: [3, 12.5],
        metadata={
            'short': 'l',
            'group': 'region of interest',
            'help': 'wavelength range to be used (in angstrom)',
        },
    )
    lowCurrentThreshold: float = field(
        default=50,
        metadata={
            'short': 'pt',
            'group': 'instrument settings',
            'help': 'proton current below which the events are ignored (per chopper pulse)',
        },
    )
    # annotated with the IncidentAngle enum (was plain str although the
    # default is an enum member) so the command line gets proper choices and
    # from_args converts back to the enum -- consistent with monitorType
    incidentAngle: IncidentAngle = field(
        default=IncidentAngle.alphaF,
        metadata={
            'short': 'ai',
            'group': 'instrument settings',
            'help': 'calculate alphaI = [alphaF], [mu]+kappa+delta_kappa or ([nu]+kappa+delta_kappa)/2',
        },
    )
    sampleModel: Optional[str] = field(
        default=None,
        metadata={
            'short': 'sm',
            'group': 'sample',
            'help': 'orso type string to describe the sample in one line',
        },
    )
    mu: Optional[float] = field(
        default=None,
        metadata={
            'short': 'mu',
            'group': 'sample',
            'help': 'inclination of the sample surface w.r.t. the instrument horizon',
        },
    )
    nu: Optional[float] = field(
        default=None,
        metadata={
            'short': 'nu',
            'group': 'sample',
            'help': 'inclination of the detector w.r.t. the instrument horizon',
        },
    )
    # NOTE(review): Optional with a non-None default; the command line
    # machinery drops the default for Optional fields, so the CLI default is
    # None while direct construction yields 0 -- confirm this is intended
    muOffset: Optional[float] = field(
        default=0,
        metadata={
            'short': 'm',
            'group': 'sample',
            'help': 'correction offset for mu misalignment (mu_real = mu_file + mu_offset)',
        },
    )
    monitorType: MonitorType = field(
        default=MonitorType.auto,
        metadata={
            'short': 'mt',
            'group': 'instrument settings',
            'help': 'one of [a]uto, [p]rotonCurrent, [t]ime or [n]eutronMonitor',
        },
    )
class NormalisationMethod(StrEnum):
    """
    How the reflectivity is normalised; the values are the single-letter
    abbreviations used on the command line.
    """
    direct_beam = 'd'
    over_illuminated = 'o'
    under_illuminated = 'u'
@dataclass
class ReductionConfig(ArgParsable):
    """
    Parameters steering the data reduction itself: binning, regions of
    interest, input/normalisation file selection and scaling.
    """
    qResolution: float = field(
        default=0.01,
        metadata={
            'short': 'r',
            'group': 'data manicure',
            'help': 'output resolution of q-scale Delta q / q',
        },
    )
    qzRange: Tuple[float, float] = field(
        default_factory=lambda: [0.005, 0.51],
        metadata={
            'short': 'q',
            'group': 'region of interest',
            'help': '?',
        },
    )
    thetaRange: Tuple[float, float] = field(
        default_factory=lambda: [-12., 12.],
        metadata={
            'short': 't',
            'group': 'region of interest',
            'help': 'absolute theta region of interest',
        },
    )
    thetaRangeR: Tuple[float, float] = field(
        default_factory=lambda: [-0.75, 0.75],
        metadata={
            'short': 'T',
            'group': 'region of interest',
            'help': 'theta region of interest w.r.t. beam center',
        },
    )
    fileIdentifier: List[str] = field(
        default_factory=lambda: ['0'],
        metadata={
            'short': 'f',
            'priority': 100,
            'group': 'input data',
            'help': 'file number(s) or offset (if < 1)',
        },
    )
    normalisationMethod: NormalisationMethod = field(
        default=NormalisationMethod.over_illuminated,
        metadata={
            'short': 'nm',
            'priority': 90,
            'group': 'input data',
            'help': 'normalisation method: [o]verillumination, [u]nderillumination, [d]irect_beam'})
    scale: List[float] = field(
        default_factory=lambda: [1.],
        metadata={
            'short': 's',
            'group': 'data manicure',
            'help': '(list of) scaling factors, if less elements than files use the last one',
        },
    )
    # TODO: This made no sense, it is used as single bool.
    autoscale: bool = False
    subtract: Optional[str] = field(
        default=None,
        metadata={
            'short': 'sub',
            'group': 'input data',
            'help': 'File with R(q_z) curve to be subtracted (in .Rqz.ort format)'})
    # NOTE(review): the default [None] is a list containing None, not an
    # empty list -- callers have to filter the placeholder before use;
    # confirm whether an empty list default would be safe instead
    normalisationFileIdentifier: Optional[List[str]] = field(
        default_factory=lambda: [None],
        metadata={
            'short': 'n',
            'priority': 90,
            'group': 'input data',
            'help': 'file number(s) of normalisation measurement'})
    # NOTE(review): same [None] placeholder default as above
    timeSlize: Optional[List[float]] = field(
        default_factory=lambda: [None],
        metadata={
            'short': 'ts',
            'group': 'region of interest',
            'help': 'time slizing <interval> ,[<start> [,stop]]',
        },
    )

    def _expand_file_list(self, short_notation: str):
        """
        Evaluate string entry for file number lists.

        Supported comma separated entries: single numbers ("12"), ranges
        ("10-15", stop inclusive) and stepped ranges ("10-20:2").
        A leading '-' is treated as a sign, not a range separator, so
        negative offsets like "-1" (see fileIdentifier help) parse
        correctly instead of raising ValueError.
        """
        file_list = []
        for entry in short_notation.split(','):
            # only a '-' after the first character separates a range
            if '-' in entry[1:]:
                rng, _, step = entry.partition(':')
                sep = rng.index('-', 1)
                start = int(rng[:sep])
                stop = int(rng[sep + 1:])
                file_list += range(start, stop + 1, int(step) if step else 1)
            else:
                file_list.append(int(entry))
        file_list.sort()
        return file_list

    def data_files(self):
        """Expanded file number lists for all measurement identifiers."""
        return list(map(self._expand_file_list, self.fileIdentifier))

    def normal_files(self):
        """Expanded file number lists for all normalisation identifiers."""
        return list(map(self._expand_file_list, self.normalisationFileIdentifier))
class OutputFomatOption(StrEnum):
    """
    Selectable output formats: fully qualified (curve type + file format)
    or shorthands selecting all matching combinations.

    NOTE(review): class name misspells 'Format'; kept as-is since it is a
    public identifier used elsewhere.
    """
    # fully qualified: R(q_z) / R(lambda,theta) curves in .ort/.orb files
    Rqz_ort = "Rqz.ort"
    Rqz_orb = "Rqz.orb"
    Rlt_ort = "Rlt.ort"
    Rlt_orb = "Rlt.orb"
    # shorthands: all curves of one file format
    ort = "ort"
    orb = "orb"
    # shorthands: one curve type in all file formats
    Rqz = "Rqz"
    Rlt = "Rlt"
@dataclass
class OutputConfig(ArgParsable):
    """
    Configuration of the reduction output: formats, file name and location.
    """
    outputFormats: List[OutputFomatOption] = field(
        default_factory=lambda: ['Rqz.ort'],
        metadata={
            'short': 'of',
            'group': 'output',
            'help': 'one of "Rqz[.ort]", "Rlt[.ort]" or both with "ort"',
        },
    )
    outputName: str = field(
        default='fromEOS',
        metadata={
            'short': 'o',
            'group': 'output',
            'help': '?',
        },
    )
    outputPath: str = field(
        default='.',
        metadata={
            'short': 'op',
            'group': 'output',
            'help': '?',
        },
    )

    def _output_format_list(self, outputFormat):
        """
        Translate the user supplied format selectors (which may contain
        shorthands like "ort" or "Rqz") into the list of concrete formats.
        """
        # each concrete format is selected when any of its aliases appears
        selectors = (
            (OutputFomatOption.Rqz_ort,
             (OutputFomatOption.ort, OutputFomatOption.Rqz_ort, OutputFomatOption.Rqz)),
            (OutputFomatOption.Rlt_ort,
             (OutputFomatOption.ort, OutputFomatOption.Rlt_ort, OutputFomatOption.Rlt)),
            (OutputFomatOption.Rqz_orb,
             (OutputFomatOption.orb, OutputFomatOption.Rqz_orb, OutputFomatOption.Rqz)),
            (OutputFomatOption.Rlt_orb,
             (OutputFomatOption.orb, OutputFomatOption.Rlt_orb, OutputFomatOption.Rlt)),
        )
        chosen = [concrete for concrete, aliases in selectors
                  if any(alias in outputFormat for alias in aliases)]
        return sorted(chosen, reverse=True)

    def __post_init__(self):
        # normalise whatever the user passed into the canonical format list
        self.outputFormats = self._output_format_list(self.outputFormats)
# ===================================
@dataclass
class EOSConfig:
    """
    Top-level container bundling all configuration groups for a reduction
    run; can reconstruct an equivalent 'python eos.py ...' call string.
    """
    reader: ReaderConfig
    experiment: ExperimentConfig
    reduction: ReductionConfig
    output: OutputConfig
    # optional manual replacement for the generated call string
    # NOTE(review): presumably set/read by code outside this chunk -- confirm
    _call_string_overwrite = None

    def call_string(self) -> str:
        """
        Build a command line string equivalent to this configuration.

        Returns the full 'python eos.py ...' string; argument groups are
        joined with extra spacing when the result exceeds 70 characters.
        """
        base = 'python eos.py'
        # input related arguments
        inpt = ''
        if self.reader.year:
            inpt += f' -Y {self.reader.year}'
        else:
            inpt += f' -Y {datetime.now().year}'
        if len(self.reader.rawPath) == 1:
            # a single entry means a user-chosen search path; emit the path
            # itself, not the list repr (was f'{self.reader.rawPath}')
            inpt += f' --rawPath {self.reader.rawPath[0]}'
        if self.reduction.subtract:
            # proper long option (was '-subtract', which argparse rejects)
            inpt += f' --subtract {self.reduction.subtract}'
        # the field default is [None]; drop placeholders before joining,
        # otherwise str.join raises TypeError
        norm_ids = [ni for ni in (self.reduction.normalisationFileIdentifier or []) if ni is not None]
        if norm_ids:
            inpt += f' -n {" ".join(norm_ids)}'
        if self.reduction.fileIdentifier:
            inpt += f' -f {" ".join(self.reduction.fileIdentifier)}'
        # output related arguments
        otpt = ''
        if self.reduction.qResolution:
            otpt += f' -r {self.reduction.qResolution}'
        if self.output.outputPath != '.':
            # fixed option typo: '--outputdPath' -> '--outputPath'
            inpt += f' --outputPath {self.output.outputPath}'
        if self.output.outputName:
            otpt += f' -o {self.output.outputName}'
        if self.output.outputFormats != ['Rqz.ort']:
            otpt += f' -of {" ".join(self.output.outputFormats)}'
        # region of interest arguments
        mask = ''
        # TODO: Check if you want these parameters for the case of default call
        mask += f' -y {" ".join(str(ii) for ii in self.experiment.yRange)}'
        mask += f' -l {" ".join(str(ff) for ff in self.experiment.lambdaRange)}'
        mask += f' -t {" ".join(str(ff) for ff in self.reduction.thetaRange)}'
        mask += f' -T {" ".join(str(ff) for ff in self.reduction.thetaRangeR)}'
        mask += f' -q {" ".join(str(ff) for ff in self.reduction.qzRange)}'
        # instrument / sample parameters
        para = ''
        # TODO: Check if we want these parameters for defaults
        para += f' --chopperPhase {self.experiment.chopperPhase}'
        para += f' --chopperPhaseOffset {self.experiment.chopperPhaseOffset}'
        if self.experiment.mu:
            para += f' --mu {self.experiment.mu}'
        elif self.experiment.muOffset:
            para += f' --muOffset {self.experiment.muOffset}'
        if self.experiment.nu:
            para += f' --nu {self.experiment.nu}'
        modl = ''
        if self.experiment.sampleModel:
            modl += f" --sampleModel '{self.experiment.sampleModel}'"
        # data manicure actions
        acts = ''
        if self.reduction.autoscale:
            # autoscale is a plain bool flag; iterating it with join (as the
            # previous code did) would raise TypeError
            acts += ' --autoscale'
        # TODO: Check if should be shown if not default
        # join the scale factors instead of emitting the list repr
        acts += f' --scale {" ".join(str(ff) for ff in self.reduction.scale)}'
        # the field default is [None]; drop placeholders so the default does
        # not emit a literal 'None'
        slize = [ts for ts in (self.reduction.timeSlize or []) if ts is not None]
        if slize:
            acts += f' --timeSlize {" ".join(str(ff) for ff in slize)}'
        mlst = base + inpt + otpt
        if mask:
            mlst += mask
        if para:
            mlst += para
        if acts:
            mlst += acts
        if modl:
            mlst += modl
        if len(mlst) > 70:
            # long call: separate the argument groups with extra spaces
            mlst = base + ' ' + inpt + ' ' + otpt
            if mask:
                mlst += ' ' + mask
            if para:
                mlst += ' ' + para
            if acts:
                mlst += ' ' + acts
            if modl:
                mlst += ' ' + modl
        logging.debug('Argument list build in EOSConfig.call_string: %s', mlst)
        return mlst