""" Classes for stroing various configurations needed for reduction. """ import argparse from dataclasses import dataclass, field, Field, fields, MISSING from enum import StrEnum from typing import get_args, get_origin, List, Optional, Tuple, Union from datetime import datetime from os import path import numpy as np import logging class Defaults: # fileIdentifier outputPath = '.' rawPath = ['.', path.join('.','raw'), path.join('..','raw'), path.join('..','..','raw')] year = datetime.now().year normalisationFileIdentifier = [] normalisationMethod = 'o' monitorType = 'auto' # subtract outputName = "fromEOS" outputFormat = ['Rqz.ort'] incidentAngle = 'alphaF' qResolution = 0.01 #timeSlize scale = [1] # autoscale lambdaRange = [2., 15.] thetaRange = [-12., 12.] thetaRangeR = [-0.75, 0.75] yRange = [11, 41] qzRange = [0.005, 0.51] chopperSpeed = 500 chopperPhase = 0.0 chopperPhaseOffset = -9.1 muOffset = 0 mu = 0 nu = 0 sampleModel = None lowCurrentThreshold = 50 # @dataclass class CommandlineParameterConfig: argument: str # default parameter for command line resutign ins "--argument" add_argument_args: dict # all arguments that will be passed to add_argument method short_form: Optional[str] = None group: str = 'misc' priority: int = 0 def __gt__(self, other): """ Sort required arguments first, then use priority, then name """ return (not self.add_argument_args.get('required', False), -self.priority, self.argument)>( not other.add_argument_args.get('required', False), -other.priority, other.argument) class ArgParsable: def __init_subclass__(cls): # create a nice documentation string that takes help into account cls.__doc__ = cls.__name__ + " Parameters:\n" for key, typ in cls.__annotations__.items(): if get_origin(typ) is Union and type(None) in get_args(typ): optional = True typ = get_args(typ)[0] else: optional = False value = getattr(cls, key, None) cls.__doc__ += f" {key} ({typ.__name__})" if isinstance(value, Field): if value.default is not MISSING: cls.__doc__ += f" = {value.default}" if 'help' in value.metadata: cls.__doc__ += f" - {value.metadata['help']}" elif value is not None: cls.__doc__ += f" = {value}" if optional: cls.__doc__ += " [Optional]" cls.__doc__ += "\n" return cls @classmethod def get_commandline_parameters(cls) -> List[CommandlineParameterConfig]: """ Return a list of arguments used in building the command line parameters. Union types besides Optional are not supported. """ output = [] for field in fields(cls): args={} if field.default is not MISSING: args['default'] = field.default args['required'] = False elif field.default_factory is not MISSING: args['default'] = field.default_factory() args['required'] = False else: args['required'] = True if get_origin(field.type) is Union and type(None) in get_args(field.type): # optional argument typ = get_args(field.type)[0] del(args['default']) else: typ = field.type if get_origin(typ) is list: args['nargs'] = '+' typ = get_args(typ)[0] elif get_origin(typ) is tuple: args['nargs'] = len(get_args(typ)) typ = get_args(typ)[0] if issubclass(typ, StrEnum): args['choices'] = [ci.value for ci in typ] if field.default is not MISSING: args['default'] = field.default.value typ = str if typ is bool: args['action'] = 'store_false' if field.default else 'store_true' else: args['type'] = typ if 'help' in field.metadata: args['help'] = field.metadata['help'] output.append(CommandlineParameterConfig( field.name, add_argument_args=args, group=field.metadata.get('group', 'misc'), short_form=field.metadata.get('short', None), priority=field.metadata.get('priority', 0), )) return output @classmethod def from_args(cls, args: argparse.Namespace): """ Create the child class from the command line argument Namespace object. All attributes that are not needed for this class are ignored. """ inpargs = {} for field in fields(cls): value = getattr(args, field.name) typ = field.type if get_origin(field.type) is Union and type(None) in get_args(field.type): # optional argument typ = get_args(field.type)[0] if issubclass(typ, StrEnum): # convert str to enum try: value = typ(value) except ValueError: choices = [ci.value for ci in typ] raise ValueError(f"Parameter --{field.name} has to be one of {choices}") inpargs[field.name] = value return cls(**inpargs) @dataclass class ReaderConfig(ArgParsable): year: int = field(default=datetime.now().year, metadata={'short': 'Y', 'group': 'input data', 'help': 'year the measurement was performed'}) rawPath: List[str] = field(default_factory=lambda: ['.', path.join('.','raw'), path.join('..','raw'), path.join('..','..','raw')], metadata={ 'short': 'rp', 'group': 'input data', 'help': 'Search paths for hdf files'}) startTime: Optional[float] = None class IncidentAngle(StrEnum): alphaF = 'alphaF' mu = 'mu' nu = 'nu' class MonitorType(StrEnum): auto = 'a' proton_charge = 'p' time = 't' neutron_monitor = 'n' debug = 'x' @dataclass class ExperimentConfig(ArgParsable): chopperPhase: float chopperSpeed: float yRange: Tuple[float, float] lambdaRange: Tuple[float, float] lowCurrentThreshold: float incidentAngle: IncidentAngle = IncidentAngle.alphaF sampleModel: Optional[str] = None chopperPhaseOffset: float = 0 mu: Optional[float] = None nu: Optional[float] = None muOffset: Optional[float] = None monitorType: MonitorType = field(default=MonitorType.auto, metadata={'short': 'mt', 'group': 'input data', 'help': 'one of [a]uto, [p]rotonCurrent, [t]ime or [n]eutronMonitor'}) class NormalisationMethod(StrEnum): direct_beam = 'd' over_illuminated = 'o' under_illuminated = 'u' @dataclass class ReductionConfig(ArgParsable): qResolution: float qzRange: Tuple[float, float] thetaRange: Tuple[float, float] #thetaRangeR: Tuple[float, float] thetaRangeR: List[float] fileIdentifier: List[str] = field(metadata={'short': 'f', 'priority': 100, 'group': 'input data', 'help': 'file number(s) or offset (if < 1)'}) normalisationMethod: NormalisationMethod = field(default=NormalisationMethod.over_illuminated, metadata={'short': 'nm', 'priority': 90, 'group': 'input data', 'help': 'normalisation method: [o]verillumination, [u]nderillumination, [d]irect_beam'}) scale: List[float] = field(default_factory=lambda: [1.]) #per file scaling; if less elements than files use the last one autoscale: bool = False # TODO: This made no sense, it is used as single bool. subtract: Optional[str] = field(default=None, metadata={'short': 'sub', 'group': 'input data', 'help': 'File with R(q_z) curve to be subtracted (in .Rqz.ort format)'}) normalisationFileIdentifier: Optional[List[str]] = field(default=None, metadata={'short': 'n', 'priority': 90, 'group': 'input data', 'help': 'file number(s) of normalisation measurement'}) timeSlize: Optional[List[float]] = None @dataclass class OutputConfig(ArgParsable): outputFormats: List[str] outputName: str outputPath: str @dataclass class EOSConfig: reader: ReaderConfig experiment: ExperimentConfig reduction: ReductionConfig output: OutputConfig _call_string_overwrite=None #@property #def call_string(self)->str: # if self._call_string_overwrite: # return self._call_string_overwrite # else: # return self.calculate_call_string() def call_string(self): base = 'python eos.py' inpt = '' if self.reader.year: inpt += f' -Y {self.reader.year}' else: inpt += f' -Y {datetime.now().year}' if np.shape(self.reader.rawPath)[0] == 1: inpt += f' --rawPath {self.reader.rawPath}' if self.reduction.subtract: inpt += f' -subtract {self.reduction.subtract}' if self.reduction.normalisationFileIdentifier: inpt += f' -n {" ".join(self.reduction.normalisationFileIdentifier)}' if self.reduction.fileIdentifier: inpt += f' -f {" ".join(self.reduction.fileIdentifier)}' otpt = '' if self.reduction.qResolution: otpt += f' -r {self.reduction.qResolution}' if self.output.outputPath != '.': inpt += f' --outputdPath {self.output.outputPath}' if self.output.outputName: otpt += f' -o {self.output.outputName}' if self.output.outputFormats != ['Rqz.ort']: otpt += f' -of {" ".join(self.output.outputFormats)}' mask = '' if self.experiment.yRange != Defaults.yRange: mask += f' -y {" ".join(str(ii) for ii in self.experiment.yRange)}' if self.experiment.lambdaRange!= Defaults.lambdaRange: mask += f' -l {" ".join(str(ff) for ff in self.experiment.lambdaRange)}' if self.reduction.thetaRange != Defaults.thetaRange: mask += f' -t {" ".join(str(ff) for ff in self.reduction.thetaRange)}' elif self.reduction.thetaRangeR != Defaults.thetaRangeR: mask += f' -T {" ".join(str(ff) for ff in self.reduction.thetaRangeR)}' if self.experiment.qzRange!= Defaults.qzRange: mask += f' -q {" ".join(str(ff) for ff in self.experiment.qzRange)}' para = '' if self.experiment.chopperPhase != Defaults.chopperPhase: para += f' --chopperPhase {self.experiment.chopperPhase}' if self.experiment.chopperPhaseOffset != Defaults.chopperPhaseOffset: para += f' --chopperPhaseOffset {self.experiment.chopperPhaseOffset}' if self.experiment.mu: para += f' --mu {self.experiment.mu}' elif self.experiment.muOffset: para += f' --muOffset {self.experiment.muOffset}' if self.experiment.nu: para += f' --nu {self.experiment.nu}' modl = '' if self.experiment.sampleModel: modl += f" --sampleModel '{self.experiment.sampleModel}'" acts = '' if self.reduction.autoscale: acts += f' --autoscale {" ".join(str(ff) for ff in self.reduction.autoscale)}' if self.reduction.scale != Defaults.scale: acts += f' --scale {self.reduction.scale}' if self.reduction.timeSlize: acts += f' --timeSlize {" ".join(str(ff) for ff in self.reduction.timeSlize)}' mlst = base + inpt + otpt if mask: mlst += mask if para: mlst += para if acts: mlst += acts if modl: mlst += modl if len(mlst) > 70: mlst = base + ' ' + inpt + ' ' + otpt if mask: mlst += ' ' + mask if para: mlst += ' ' + para if acts: mlst += ' ' + acts if modl: mlst += ' ' + modl logging.debug(f'Argument list build in EOSConfig.call_string: {mlst}') return mlst