"""
|
|
@package pmsco.project
|
|
project-independent classes which store and handle model parameters.
|
|
|
|
the most important class defined here is Project.
|
|
each calculation project needs to derive its own project class from it.
|
|
the ModelSpace and CalculatorParams classes are typically used unchanged.
|
|
|
|
@note nomenclature: the term @e parameters has several meanings in the code and documentation.
|
|
the following distinctive terms are used in updated documentation sections.
|
|
ambiguous terms may still be present in older code sections.
|
|
@arg <em>calculation parameters</em> set of specific parameters passed as input to the calculation programs.
|
|
the amount and meaning of these parameters depend on the calculation code used.
|
|
typically, many of these parameters remain fixed, or change very rarely in the course of the study.
|
|
@arg <em>model parameters</em> concise set of independent physical parameters
|
|
that define the system in one calculation instance.
|
|
these parameters are varied systematically by the optimization process.
|
|
they are mapped to calculation parameters and a cluster by code derived from the Project class.
|
|
|
|
@author Matthias Muntwiler, matthias.muntwiler@psi.ch
|
|
|
|
@copyright (c) 2015-25 by Paul Scherrer Institut @n
|
|
Licensed under the Apache License, Version 2.0 (the "License"); @n
|
|
you may not use this file except in compliance with the License.
|
|
You may obtain a copy of the License at
|
|
http://www.apache.org/licenses/LICENSE-2.0
|
|
"""
|
|
|
|
import collections
|
|
import datetime
|
|
import logging
|
|
import os
|
|
import numpy as np
|
|
import numpy.typing as npt
|
|
from pathlib import Path
|
|
import re
|
|
from typing import Any, Callable, Dict, Generator, Iterable, List, Mapping, Optional, Sequence, Set, Tuple, Union
|
|
|
|
from pmsco.calculators.calculator import Calculator, InternalAtomicCalculator
|
|
from pmsco.calculators.edac import EdacCalculator
|
|
from pmsco.cluster import Cluster
|
|
import pmsco.config as config
|
|
import pmsco.data
|
|
import pmsco.database.project as db_project
|
|
from pmsco.dispatch import CalcID, CalculationTask
|
|
from pmsco.files import FileTracker, FILE_CATEGORIES_TO_KEEP
|
|
from pmsco.handlers import (DomainHandler, EmitterHandler, EnergyRegionHandler, ScanHandler,
|
|
SingleModelHandler, SingleRegionHandler, TaskHandler)
|
|
from pmsco.helpers import BraceMessage as BMsg
|
|
from pmsco.optimizers.genetic import GeneticOptimizationHandler
|
|
from pmsco.optimizers.swarm import ParticleSwarmHandler
|
|
from pmsco.optimizers.grid import GridSearchHandler
|
|
from pmsco.optimizers.table import TableModelHandler
|
|
from pmsco.reports.base import ProjectReport
|
|
from pmsco.scan import Scan, ScanLoader, ScanCreator
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
ParamSpace = collections.namedtuple('ParamSpace', ['start', 'min', 'max', 'step'])
|
|
|
|
Numeric = Union[int, float, np.number]
|
|
PathLike = Union[str, os.PathLike]
|
|
|
|
|
|


class ModelSpace(config.ConfigurableObject):
    """
    Domain of model parameters.

    The model space declares the model parameters and defines their domain.
    A dimension can have a finite range or a fixed value.
    In the case of a range, start and step values can be given.

    Parameter names can be defined almost freely by the project.
    They must contain only alphanumeric and underscore characters.
    Names starting with an underscore are reserved for the optimizers.

    The object storage is organized in start, min, max and step dictionaries keyed by parameter name.
    The `add_param` and `get_param` methods give access to a `ParamSpace` structure by parameter name.
    """

    ## @var start (dict)
    # Dictionary of start values for each model parameter.
    #
    # The start value can be the initial guess for an optimization run,
    # or the actual value for a single calculation.
    #
    # There must be one item for each model parameter,
    # where the key is the name of the parameter, and the value its physical value.

    ## @var min (dict)
    # Dictionary of minimum values for each model parameter.
    #
    # The minimum defines the lower bound of the allowed interval for a model parameter.
    #
    # There must be one item for each model parameter,
    # where the key is the name of the parameter, and the value its physical value.

    ## @var max (dict)
    # Dictionary of maximum values for each model parameter.
    #
    # The maximum defines the upper bound of the allowed interval for a model parameter.
    #
    # There must be one item for each model parameter,
    # where the key is the name of the parameter, and the value its physical value.

    ## @var step (dict)
    # Dictionary of step sizes for each model parameter.
    #
    # Depending on the optimization mode, the step is a guess of how fast values should vary,
    # e.g. step size, gradient, velocity, ...
    #
    # There must be one item for each model parameter,
    # where the key is the name of the parameter, and the value its physical value.

    def __init__(self):
        """
        initialize the domain object with empty dictionaries.
        """
        super().__init__()
        self.start: Dict[str, Numeric] = {}
        self.min: Dict[str, Numeric] = {}
        self.max: Dict[str, Numeric] = {}
        self.step: Dict[str, Numeric] = {}

    def _eval_param_value(self, expr: Optional[Union[Numeric, str]]) -> Numeric:
        """
        Evaluate a parameter expression.

        If the expression has a numeric type, it is cast to numpy.float64.
        Else, it is evaluated using Python's `eval` function.
        The expression may use the symbols from `self.project_symbols`.
        These normally include built-in functions as well as the `math` and `numpy` modules.

        This function is used to parse expressions from a runfile.

        @param expr: Numeric value or string expression.
            numpy.nan or None result in numpy.nan.
            An empty string results in a ValueError.
        @return: numpy.float64; numpy.nan if expr has a wrong type (e.g. NoneType).
        @raise Exceptions that occur during the evaluation of a string expression are passed on.
        """

        try:
            value = np.float64(expr)
        except ValueError:
            if expr:
                value = np.float64(eval(expr, self.project_symbols))
            else:
                raise
        except TypeError:
            value = np.nan

        return value
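
    # Example (sketch, not part of the original code): how values and
    # expressions are evaluated, assuming `pi` is available via
    # self.project_symbols:
    #
    #     self._eval_param_value(1.5)        # -> 1.5
    #     self._eval_param_value("2 * pi")   # -> 6.2831...
    #     self._eval_param_value(None)       # -> nan
    #     self._eval_param_value("")         # -> raises ValueError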

    def add_param(self,
                  name: str,
                  start: Numeric,
                  min: Optional[Numeric] = None,
                  max: Optional[Numeric] = None,
                  step: Optional[Numeric] = None,
                  width: Optional[Numeric] = None) -> None:
        """
        Set the domain of one parameter with all necessary values at once.

        The exact meaning of the arguments depends on the calculation mode.

        The parameters can be given as floats or expressions that evaluate to scalar values.
        Expressions may use built-in functions and the math or numpy module.

        @param name Name of the parameter (alphanumeric and underscore characters only).
            It is recommended to use short but distinctive names.

        @param start Start value.

        @param min Lower bound of the parameter interval.
            Must be less than or equal to start.
            If None, the field is set to start.

        @param max Upper bound of the parameter interval.
            Must be greater than or equal to start.
            If None, the field is set to start.

        @param width Width of the parameter interval.
            Instead of min and max, the interval can be set centered around the start value.
            This is equivalent to min = start - width/2, max = start + width/2.
            This argument overrides min and max. Don't use both forms at the same time.

        @param step Step size.
            Must be greater than or equal to zero.
            If None, the field is set to zero.

        @raise ValueError if invalid values of required arguments are given.
        """

        if not name:
            raise ValueError("Parameter name cannot be empty")

        try:
            start = self._eval_param_value(start)
        except (AttributeError, KeyError, NameError, TypeError, ValueError, SyntaxError):
            raise ValueError(f"Invalid start value of parameter {name}: {start}")
        if np.isnan(start):
            raise ValueError(f"Missing start value of parameter {name}")

        try:
            min = self._eval_param_value(min) if min is not None else np.nan
        except (AttributeError, KeyError, NameError, TypeError, ValueError, SyntaxError):
            raise ValueError(f"Invalid min value of parameter {name}: {min}")

        try:
            max = self._eval_param_value(max) if max is not None else np.nan
        except (AttributeError, KeyError, NameError, TypeError, ValueError, SyntaxError):
            raise ValueError(f"Invalid max value of parameter {name}: {max}")

        try:
            width = self._eval_param_value(width) if width is not None else np.nan
        except (AttributeError, KeyError, NameError, TypeError, ValueError, SyntaxError):
            raise ValueError(f"Invalid width value of parameter {name}: {width}")

        try:
            step = self._eval_param_value(step) if step is not None else np.nan
        except (AttributeError, KeyError, NameError, TypeError, ValueError, SyntaxError):
            raise ValueError(f"Invalid step value of parameter {name}: {step}")
        else:
            if step < 0:
                raise ValueError(f"Invalid step value of parameter {name}: {step}")

        self.start[name] = start
        self.min[name] = min if not np.isnan(min) else start
        self.max[name] = max if not np.isnan(max) else start
        if not np.isnan(width):
            self.min[name] = start - width / 2.
            self.max[name] = start + width / 2.
        self.step[name] = step if not np.isnan(step) else 0.0
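
    # Example (sketch): typical declarations; the parameter names and values
    # are illustrative only.
    #
    #     ms = ModelSpace()
    #     ms.add_param('dz', start=0.15, min=-0.2, max=0.5, step=0.05)
    #     ms.add_param('phi', start=30.0, width=20.0, step=2.0)  # min=20, max=40
    #     ms.add_param('V0', start="10.0 + 2.0")                 # expression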

    def get_param(self, name: str) -> ParamSpace:
        """
        Get all values of a model parameter in a named tuple.

        @param name Name of the parameter.

        @return named tuple `ParamSpace(start, min, max, step)` of the parameter.

        @raise KeyError if the parameter is not defined.
        """

        return ParamSpace(self.start[name], self.min[name], self.max[name], self.step[name])

    def set_param_dict(self, d: Dict[str, Dict[str, Numeric]]) -> None:
        """
        Initialize model space from dictionary.

        @param d: Dictionary with two levels:
            the top level are parameter names,
            the second level the space descriptors 'start', 'min', 'max', 'step' and 'width'.
            The values can be numeric values or expressions that evaluate to scalar values.
            See add_param() for possible combinations and accepted values.
        @return: None
        """

        self.__init__()
        for k, v in d.items():
            self.add_param(k, **v)
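
    # Example (sketch): the two-level dictionary form, as it may appear in a
    # runfile; the names and values are illustrative.
    #
    #     ms.set_param_dict({
    #         'dz': {'start': 0.15, 'min': -0.2, 'max': 0.5, 'step': 0.05},
    #         'phi': {'start': 30.0, 'width': 20.0, 'step': 2.0},
    #     })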

    def get_param_dict(self) -> Dict[str, Dict[str, Numeric]]:
        """
        Return model space parameters in dictionary form.

        The top level are parameter names,
        the second level the space descriptors 'start', 'min', 'max' and 'step'.

        @return: dict
        """

        d = {}
        for name in self.start:
            d[name] = {'start': self.start[name], 'min': self.min[name], 'max': self.max[name], 'step': self.step[name]}

        return d


class CalculatorParams:
    """
    Calculation parameters for a single scattering calculation job.

    This class holds all the calculation parameters that are passed via input file to the calculation program.

    The class can hold parameters for both the MSC and EDAC codes.
    Some parameters are used by both codes, others are used just by one of them.
    Newer features such as multiple emitters, multiple domains, and others are supported in EDAC mode only.
    MSC mode is currently not maintained.

    Objects of this class are created by the implementation of the create_params() method
    of the actual project class.
    """

    ## @var angular_resolution (float)
    # FWHM angular resolution of the detector.
    #
    # maps to:
    # @arg emission angle window (EDAC)
    # @arg angular_broadening (MSC)

    ## @var binding_energy (float)
    # initial state binding energy with respect to the Fermi level in eV
    #

    ## @var initial_state (str)
    # initial state
    #
    # 1s, 2p, 2p1/2, etc.
    #

    ## @var phase_files (dict)
    # dictionary of phase or scattering matrix element files.
    #
    # the keys are atomic numbers, the values file names.
    # whether the files contain phase shifts or matrix elements depends on the calculator.
    # EDAC determines the kind of information from the first line in the file.
    #
    # if the dictionary is empty or the files don't exist,
    # the scattering matrix is computed by the calculator (if supported).
    #
    # maps to:
    # @arg scatterer (EDAC)
    # @arg atomic_number, phase_file (MSC)

    ## @var phase_output_classes (int or iterable of int)
    # atom classes for which to output phase files
    #
    # if the atomic scattering factors are calculated internally,
    # EDAC can export them to scattering files.
    #
    # this parameter can be one of
    # @arg None (default) no phase output,
    # @arg integer number defining a range 0:N-1 of atom classes,
    # @arg iterable (e.g., set or sequence) of atom classes to export.
    #
    # the problem is that EDAC expects the user to list each atom class to export,
    # though it is not possible to know how many classes there will be
    # or which atoms belong to which class before the calculation is actually done.
    # the number of classes will be between the number of different elements and the number of atoms.
    #
    # thus, this parameter should normally be left at its default value
    # and used only in specific situations that can be processed manually.
    # if the parameter is non-default, EDAC will also produce a cluster output
    # that includes a mapping between atomic coordinates and atom classes.
    #
    # @note the files generated belong to the category "output".
    # you need to specify `--keep-files output` to prevent them from getting cleaned up.

    ## @var polarization (str)
    # photon polarization
    #
    # 'H', 'V', 'L', 'R', 'U'
    #

    ## @var rme_files (dict)
    # dictionary of radial matrix element files.
    #
    # if the dictionary is empty or the files don't exist,
    # the radial matrix defaults to the rme_xxx_xxx attributes.
    #
    # in EDAC, RME files or constants are considered only if @ref phase_files are specified.
    #

    ## @var work_function (float)
    # work function in eV
    #
    # the energy scale of EDAC is referenced to the vacuum level
    # but data files are referenced to the Fermi level.
    # the @ref pmsco.calculators.edac module adds the work function to the kinetic energy before it calls EDAC.
    #

    def __init__(self):
        self.title: str = "default parameters"
        self.comment: str = "set by project.CalculatorParams()"
        self.cluster_file: PathLike = ""
        self.output_file: PathLike = ""
        self.scan_file: PathLike = ""
        self.initial_state: str = "1s"
        self.binding_energy: Numeric = 0.0
        self.polarization: str = "H"
        self.angular_resolution: Numeric = 1.0
        self.z_surface: Numeric = 0.0
        self.inner_potential: Numeric = 10.0
        self.work_function: Numeric = 0.0
        self.symmetry_range: Numeric = 360.0
        self.polar_incidence_angle: Numeric = 60.0
        self.azimuthal_incidence_angle: Numeric = 0.0
        self.experiment_temperature: Numeric = 300.0
        self.debye_temperature: Numeric = 400.0
        self.debye_wavevector: Numeric = 1.0
        self.phase_files: Dict[int, PathLike] = {}
        self.rme_files: Dict[int, PathLike] = {}
        self.rme_minus_value: Numeric = 0.1
        self.rme_minus_shift: Numeric = 0.0
        self.rme_plus_value: Numeric = 1.0
        self.rme_plus_shift: Numeric = 0.0
        # used by MSC only
        self.spherical_order: int = 2
        self.scattering_level: int = 5
        self.fcut: Numeric = 15.0
        self.cut: Numeric = 15.0
        self.lattice_constant: Numeric = 1.0
        self.msq_displacement: Dict[int, Numeric] = {}
        self.planewave_attenuation = 1.0
        self.vibration_model = "N"
        self.substrate_atomic_mass = 1.0
        # used by EDAC only
        self.emitters: Iterable[Tuple[Numeric, Numeric, Numeric, int]] = [(0.0, 0.0, 0.0, 0)]
        self.lmax: int = 15
        self.dmax: Numeric = 5.0
        self.orders: Iterable[int] = [20]
        self.phase_output_classes: Optional[Union[int, Iterable[int]]] = None

    @property
    def l_init(self) -> int:
        """
        initial state l quantum number.

        this is converted from the initial_state property.

        @return: (int) 0..3
        """
        return "spdf".index(self.initial_state[1])


class ProjectDirectories(collections.UserDict):
    """
    Dictionary of project directories.

    This class encapsulates a mapping of keys to directory paths and methods to resolve placeholders.
    Placeholders have the format `${identifier}` and resolve to the correspondingly named items from:
    - this dictionary itself
    - the project attributes listed in project_attr
    - the project tags
    - an extra mapping provided to the resolve method

    The resolve_directories() method resolves all items at once,
    the resolve_path() method resolves just one item.

    The dictionary values can be strings or pathlib.Path objects.

    Initial values can be passed to the constructor as a dictionary or keyword arguments.
    """

    def __init__(self, project_: 'Project', *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.project = project_
        self.project_attr = ['project_name', 'job_name', 'mode']

    def _get_path_dict(self, extra_mapping: Optional[Dict[str, PathLike]] = None,
                       include_absolute_paths: bool = True) -> Dict[str, PathLike]:
        """
        Auxiliary function for resolve_path.

        Compile a dictionary for path resolution. See @ref resolve_path.

        @param extra_mapping: Custom placeholders to substitute.
        @param include_absolute_paths: Include items that represent absolute paths.
            If False, absolute paths from this dictionary are not included in the result.
        @return: dictionary
        """

        d = {k: v for k, v in self.data.items() if v and (include_absolute_paths or not Path(v).is_absolute())}
        for attr in self.project_attr:
            d[attr] = getattr(self.project, attr)
        d.update(self.project.job_tags)
        if extra_mapping:
            d.update(extra_mapping)
        return d

    def resolve_directories(self, check: bool = True) -> None:
        """
        Resolve the paths of the directories property.

        @param check: If True, verify that no placeholders remain in the resolved paths;
            raise a ValueError if any do.
        @return: None
        """

        pattern = r"\$\{(\w*)\}"
        for i in range(len(self.data)):
            self.data = {k: Path(self.resolve_path(v)) for k, v in self.data.items()}
            if check:
                unresolved = [path for path in self.data.values() if re.search(pattern, str(path)) is not None]
                if len(unresolved) == 0:
                    break
            else:
                break
        else:
            raise ValueError('Cannot resolve directory placeholders')
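
    # Example (sketch): nested placeholders resolve over several passes.
    # With output = "${work}/out" and report = "${output}/report",
    # the first pass expands report to "${work}/out/report",
    # and a later pass substitutes ${work} with the working directory.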

    def resolve_path(self, path: PathLike, extra_mapping: Optional[Dict[str, PathLike]] = None,
                     allow_absolute: bool = True) -> PathLike:
        """
        Resolve a path or file name by template substitution.

        Replace placeholders of the form `${identifier}`
        by values from project-related dictionaries.
        Placeholders are looked up in (in order of precedence):
        - `extra_mapping`
        - `project.job_tags`
        - `project.mode`, `project.job_name`, `project.project_name`
        - `self`

        @note Placeholders that can't be resolved are not replaced!

        @param path: (str or pathlib.Path) Template string containing placeholders in the form `${identifier}`,
            or `Path` object containing one or more placeholders.

        @param extra_mapping: Custom placeholders to substitute.

        @param allow_absolute: Allow absolute paths for substitutes.
            Used by internal recursive calls to prevent insertion of an absolute path.

        @return: Resolved path or string
        """

        pattern = r"\$\{(\w*)\}"
        if isinstance(path, Path):
            if path.is_absolute():
                allow_absolute = False
            parts = []
            for p in path.parts:
                parts.append(self.resolve_path(p, extra_mapping=extra_mapping, allow_absolute=allow_absolute))
                allow_absolute = False
            r = Path(*parts)
        elif path:
            d = self._get_path_dict(extra_mapping=extra_mapping, include_absolute_paths=allow_absolute)

            def replacement(mo):
                try:
                    return str(d[mo.group(1)])
                except KeyError:
                    return mo.group(0)

            r = re.sub(pattern, replacement, str(path))
        else:
            r = path

        return r
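
    # Example (sketch): assuming directories['output'] = Path("/scratch/out")
    # and project.job_name = "run1":
    #
    #     dirs.resolve_path("${output}/${job_name}.log")
    #     # -> "/scratch/out/run1.log"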


# placeholder indicating that the default handler should be chosen.
# any other value means that the default is overridden by the user.
# default handlers are resolved in Project.validate.
DefaultHandler = None


# noinspection PyMethodMayBeStatic
class Project(config.ConfigurableObject):
    """
    Base class of a calculation project.

    Each calculation project must derive from this class.
    It contains all parameters and data necessary to run the calculation.
    It contains or references code for certain tasks like cluster generation, calculation parameters,
    calculation of modulation functions and R-factors.

    The attributes should be populated in the constructor of the derived class or (recommended) via runfile.
    It is essential that the attributes are set correctly before calculation.

    The call sequence of project methods is as follows:
    1. Constructor `__init__`.
    2. `set_properties` inherited from `ConfigurableObject` assigns values from the runfile.
    3. `validate` resolves directories, instantiates objects, loads scan data,
       and checks the validity of important attribute values.
    4. `setup` prepares task handlers and reports for the calculation.
    5. During the calculations, the various `calc`, `combine`, `evaluate` functions are called as necessary.
    6. After the calculations, `cleanup` can do some final processing.
       The code must not rely on cleanup being called, though.
       A resource manager may kill the process at any time.
    """

    ## @var features (dictionary)
    #
    # calculation features and versions supported by the project.
    #
    # the dictionary contains key-value pairs where the key is the name of the feature and value is a version number.
    # this field conditionally enables new software features that may break backward compatibility.
    # derived projects should fill this field with the supported version
    # upon creation (in their __init__ method or create_project() factory).
    # version 0 (default) means that the feature is disabled.
    #
    # the following features can be enabled (list may be incomplete):
    # as of this version, no optional features are defined.
    #
    # @note rather than introducing new features and, particularly, new versions that rely on this mechanism,
    # developers of generic code should check whether backward compatibility could be achieved in a simpler way,
    # e.g. by implementing additional methods whose default behaviour is the same as in the previous version.
    # in some cases it may be better to refactor all current project code.
    #

    ## @var scans (list of Scan objects)
    # list of experimental scans for which calculations are to be run.
    #
    # during project initialization, this list must be populated with Scan, ScanLoader or ScanCreator objects.
    # while Scan objects contain all scan data, the latter two classes contain only scan specifications
    # which are expanded (i.e. files are loaded or arrays are calculated) just before the calculations start.
    # the Project.add_scan() method is a short-cut to create the respective scan object from few arguments.
    # before the calculation starts, all objects are converted into fully specified Scan objects
    # and scan data is loaded or calculated.
    #
    # there are two ways to fill this list:
    # either the project code fills it as a part of its initialization (create_project),
    # or the list is populated via the run-file.

    ## @var domains (list of arbitrary objects)
    # list of domains for which calculations are to be run.
    #
    # it is up to the derived class what kind of objects are stored in the list.
    # the recommended kind of objects are dictionaries which hold parameter values,
    # similar to the model dictionaries.
    #
    # the list must be populated by calling the add_domain() method.

    ## @var cluster_generator (ClusterGenerator object)
    # provides the cluster generator methods.
    #
    # a project must provide a cluster generator object that is derived from ClusterGenerator.
    # at least the ClusterGenerator.create_cluster method must be implemented.
    # if emitters should be run in parallel, the ClusterGenerator.count_emitters must be implemented as well.
    #
    # the initial value is a LegacyClusterGenerator object
    # which routes cluster calls back to the project for compatibility with older project code.

    ## @var optimizer_params (dict)
    # optional parameters of the model optimizer.
    #
    # this is a dictionary that can have (among others) the following values.
    # for a detailed list, see the documentation of the respective model handler.
    #
    # @arg @c 'pop_size' (int)
    # population size (number of particles) in the swarm or genetic optimization mode.
    # by default, the population size is set to the number of parallel processes or 4, whichever is greater.
    # you may want to override the default value in cases where the automatic choice is not appropriate.
    # @arg @c 'seed_file' (string)
    # name of a file containing the results from previous optimization runs.
    # this can be used to resume a swarm or genetic optimization where it was interrupted before.
    # the seed file is a space-delimited, multi-column, text file,
    # e.g., the output file of a previous optimization.
    # by default, no seed is loaded.
    # @arg @c 'recalc_seed' (bool)
    # select whether the R-factors of the seed models are calculated again.
    # set this argument to False only if the calculation is a continuation of a previous one
    # without any changes to the code.

    ## @var directories
    # dictionary for various directory paths.
    #
    # home: user's home directory.
    # work: working directory at job start.
    # data: where to load experimental data (scan files) from.
    # project: directory of the project module.
    # output: where to write output and intermediate files.
    # report: directory for graphical reports.
    # temp: for temporary files.
    #
    # the paths should be pathlib.Path objects.
    # strings are accepted as well.
    #
    # directly after initialization (__init__ constructor or runfile configuration),
    # the paths can contain ${identifier}-style placeholders
    # that refer to other directories items, job tags and some other project attributes.
    # they are resolved to final paths by the validate method.
    #
    # output_dir and output_file are set at once by @ref set_output.

    ## @var output_file (Path)
    # file name root for data files produced during the calculation, including intermediate files.
    #
    # this is the concatenation of self.directories['output'] and self.job_name.
    # assignment to this property will update the two underlying attributes.

    ## @var db_file (string)
    # name of an sqlite3 database file where the calculation results should be stored.
    #
    # the default value is ':memory:', which creates a volatile in-memory database.

    ## @var timedelta_limit (datetime.timedelta)
    # wall time after which no new calculations should be started.
    #
    # the actual wall time may be longer by the remaining time of running calculations.
    # running calculations will not be aborted.
    #
    # the time_limit property is an alternative representation in hours.
    # reading and writing it accesses timedelta_limit.

    ## @var combined_scan
    # combined raw data from scans.
    # updated by self.load_scans().

    ## @var combined_modf
    # combined modulation function from scans.
    # updated by self.load_scans().

    ## @var files
    # list of all generated data files with metadata.
    # the list is used by model handlers to decide which files can be deleted at run time to save disk space.
    #
    # files.categories_to_delete determines which files can be deleted.

    ## @var git_hash
    # git hash of the running code
    #
    # the attribute is normally set by the main pmsco module but can be overwritten by the run file.
    # it is part of the job metadata and stored in the job record of the database.

    ## @var handler_classes
    # Classes of the task handlers used in the calculation process
    #
    # Normally, PMSCO chooses the appropriate task handlers automatically based on arguments.
    # This happens in the `Project.validate` methods.
    # The default behavior can be overridden in one of the following ways:
    # - Specify an explicit class in the constructor of the subclass.
    # - Specify an explicit class in the runfile.
    # - Override a `validate_xxxx_handler` method.

    ## @var keep_best
    # number of best models for which result files should be kept.
    #
    # this attribute determines how many models are kept based on R-factor ranking at each node of the task tree
    # (up to keep_levels).

    ## @var keep_levels
    # numeric task level down to which R-factors are considered when model files are cleaned up.
    #
    # @arg 0 = model level: combined results only.
    # @arg 1 = scan level: scan nodes in addition to combined results (level 0).
    # @arg 2 = domain level: domain nodes in addition to level 1.
    # @arg 3 = emitter level: emitter nodes in addition to level 2.
    # @arg 4 = region level: region nodes in addition to level 3.

    ## @var atomic_scattering_factory
    # factory function to create an atomic scattering calculator
    #
    # this can also be the name of a class.
    # the calculator must inherit from pmsco.calculators.calculator.AtomicCalculator.
    # the name of atomic scattering calculator classes should end in AtomicCalculator.

    ## @var multiple_scattering_factory
    # factory function to create a multiple scattering calculator
    #
    # this can also be the name of a class.
    # the calculator must inherit from pmsco.calculators.calculator.Calculator
    #
    # example: pmsco.calculators.edac.EdacCalculator
    #

    ## @var reports
    # list of reports
    #
    # reports are a configurable way of generating extra graphics or data files
    # during an optimization job.
    #
    # the objects must inherit from ProjectReport.
    # the reports are called each time the calculation of a model finishes.
    #
    # the reports list can be configured in the runfile or project constructor.
    # the Project.setup() method resolves symbolic file paths and calls setup on each report.

    def __init__(self):
        super().__init__()
        self.mode: str = "single"
        self.project_name: str = self.__class__.__name__
        self.job_name: str = "pmsco0"
        self.job_tags: Dict[str, Union[str, Numeric]] = {}
        self.git_hash: str = ""
        self.description: str = ""
        self.features: Dict[str, Union[str, Numeric]] = {}
        self.cluster_format: int = pmsco.cluster.FMT_EDAC
        self.cluster_generator: pmsco.cluster.ClusterGenerator = pmsco.cluster.LegacyClusterGenerator(self)
        self._model_space: Optional[ModelSpace] = None
        self.scans: List[Scan] = []
        self.domains: List[Dict[str, Any]] = []
        self.optimizer_params: Dict[str, Any] = {
            'pop_size': 0,
            'seed_file': "",
            'seed_limit': 0,
            'recalc_seed': True,
            'table_file': ""
        }
        self.directories: ProjectDirectories = ProjectDirectories(self,
                                                                  home=Path.home(),
                                                                  work=Path.cwd(),
                                                                  data="",
                                                                  project="",
                                                                  output="",
                                                                  report=Path("${output}", "report"),
                                                                  temp="")
        self.log_file: PathLike = ""
        self.log_level: str = "WARNING"
        self.db_file: PathLike = ':memory:'
        self.timedelta_limit: datetime.timedelta = datetime.timedelta(days=1)
        self.combined_scan: Optional[npt.ArrayLike] = None
        self.combined_modf: Optional[npt.ArrayLike] = None
        self.files: FileTracker = FileTracker()
        self.keep_files: Iterable[str] = list(FILE_CATEGORIES_TO_KEEP)
        self.keep_levels: int = 1
        self.keep_best: int = 10
        self.handler_classes: Dict[str, Optional[type[TaskHandler]]] = {
            'model': DefaultHandler,
            'scan': DefaultHandler,
            'domain': DefaultHandler,
            'emit': DefaultHandler,
            'region': DefaultHandler
        }
        self.atomic_scattering_factory: type[Calculator] = InternalAtomicCalculator
        self.multiple_scattering_factory: type[Calculator] = EdacCalculator
        self.reports: List[ProjectReport] = []
        self._tasks_fields = []
        self._db = db_project.ProjectDatabase()

    def set_properties(self, symbols: Optional[Mapping[str, Any]],
                       data_dict: config.DataDict,
                       project: config.ConfigurableObject) -> None:
        """
        Set configurable properties.

        Inherits from `ConfigurableObject` and additionally resolves calculator class names.

        @param symbols: symbol table used to resolve calculator class names given as strings.
        @param data_dict: dictionary of properties to assign (see `ConfigurableObject.set_properties`).
        @param project: parent project, passed on to the base implementation.
        @return: None
        """

        super().set_properties(symbols, data_dict, project)
        if isinstance(self.atomic_scattering_factory, str):
            self.atomic_scattering_factory = eval(self.atomic_scattering_factory, symbols)
        if isinstance(self.multiple_scattering_factory, str):
            self.multiple_scattering_factory = eval(self.multiple_scattering_factory, symbols)

    def validate(self):
        """
        Validate the project parameters before starting the calculations.

        - Check and fix attributes that may cause trouble or go unnoticed if they are wrong.
        - Fix attributes which may be incomplete after loading a run-file.
        - Look up scattering factories that are declared as string.
        - Resolve placeholders in the directories.
        - Resolve placeholders in the output_file.
        - Make output_file and output_dir consistent (so that output_file includes output_dir).
        - Call `create_model_space` if the `model_space` attribute is undefined.
        - Load scan data.

        Failed critical checks raise an exception (AssertionError, AttributeError, KeyError, ValueError).
        Checks that cause an attribute to revert to its default value are logged as warnings.

        @note To check the syntax of a run-file, set the calculation mode to 'validate' and run pmsco.
        This will pass the validate method but will stop execution before calculations are started.

        @raise AssertionError if a parameter is not correct.
        @raise AttributeError if a class name cannot be resolved.
        """

        assert self.mode in {"single", "swarm", "genetic", "grid", "table", "test", "validate"}
        assert self.job_name

        self.directories.resolve_directories(check=True)
        self.directories['output'].mkdir(parents=True, exist_ok=True)
        self.db_file = self.directories.resolve_path(self.db_file)
        self.optimizer_params['seed_file'] = self.directories.resolve_path(self.optimizer_params['seed_file'])

        if self._model_space is None or not self._model_space.start:
            logger.warning("undefined model_space attribute, trying project's create_model_space")
            self._model_space = self.create_model_space()

        self.load_scans()

        self.validate_model_handler()
        self.validate_scan_handler()
        self.validate_domain_handler()
        self.validate_emitter_handler()
        self.validate_region_handler()

        for report in self.reports:
            report.validate(self)

    def validate_model_handler(self):
        """
        Validate the model handler.

        Check that `self.handler_classes['model']` contains a valid TaskHandler.
        If none is set, choose the appropriate handler corresponding to the `mode` attribute.

        The default behavior can be overridden by specifying an explicit class in the runfile
        or by overriding this method.
        The class must derive from TaskHandler, else an AssertionError is raised.
        """

        handler_class = self.handler_classes.get('model')

        if handler_class is None:
            if self.mode == 'single':
                handler_class = SingleModelHandler
            elif self.mode == 'grid':
                handler_class = GridSearchHandler
            elif self.mode == 'swarm':
                handler_class = ParticleSwarmHandler
            elif self.mode == 'genetic':
                handler_class = GeneticOptimizationHandler
            elif self.mode == 'table':
                handler_class = TableModelHandler
            else:
                handler_class = SingleModelHandler
                logger.error(f"invalid optimization mode {self.mode}, defaulting to single")

        assert issubclass(handler_class, TaskHandler)
        self.handler_classes['model'] = handler_class

    def validate_scan_handler(self):
        """
        Validate the scan handler.

        Check that `self.handler_classes['scan']` contains a valid TaskHandler.
        If none is set, choose the default `ScanHandler`.

        The default behavior can be overridden by specifying an explicit class in the runfile
        or by overriding this method.
        The class must derive from TaskHandler, else an AssertionError is raised.
        """

        handler_class = self.handler_classes.get('scan')

        if handler_class is None:
            handler_class = ScanHandler

        assert issubclass(handler_class, TaskHandler)
        self.handler_classes['scan'] = handler_class

    def validate_domain_handler(self):
        """
        Validate the domain handler.

        Check that `self.handler_classes['domain']` contains a valid TaskHandler.
        If none is set, choose the default `DomainHandler`.

        The default behavior can be overridden by specifying an explicit class in the runfile
        or by overriding this method.
        The class must derive from TaskHandler, else an AssertionError is raised.
        """

        handler_class = self.handler_classes.get('domain')

        if handler_class is None:
            handler_class = DomainHandler

        assert issubclass(handler_class, TaskHandler)
        self.handler_classes['domain'] = handler_class

    def validate_emitter_handler(self):
        """
        Validate the emitter handler.

        Check that `self.handler_classes['emit']` contains a valid TaskHandler.
        If none is set, choose the default `EmitterHandler`.

        The default behavior can be overridden by specifying an explicit class in the runfile
        or by overriding this method.
        The class must derive from TaskHandler, else an AssertionError is raised.
        """

        handler_class = self.handler_classes.get('emit')

        if handler_class is None:
            handler_class = EmitterHandler

        assert issubclass(handler_class, TaskHandler)
        self.handler_classes['emit'] = handler_class

    def validate_region_handler(self):
        """
        Validate the region handler.

        Check that `self.handler_classes['region']` contains a valid TaskHandler.
        If none is set, choose one of `EnergyRegionHandler`
        (if the project includes an energy scan with at least 10 steps)
        or `SingleRegionHandler`.

        Angle scans do not benefit from region splitting in EDAC.

        The default behavior can be overridden by specifying an explicit class in the runfile
        or by overriding this method.
        The class must derive from TaskHandler, else an AssertionError is raised.
        """

        handler_class = self.handler_classes.get('region')

        if handler_class is None:
            energy_scans = 0
            for scan in self.scans:
                if scan.energies.shape[0] >= 10:
                    energy_scans += 1

            if energy_scans >= 1:
                handler_class = EnergyRegionHandler
            else:
                handler_class = SingleRegionHandler

        assert issubclass(handler_class, TaskHandler)
        self.handler_classes['region'] = handler_class

    @property
    def data_dir(self) -> PathLike:
        return self.directories['data']

    @data_dir.setter
    def data_dir(self, path: PathLike):
        self.directories['data'] = Path(path)

    @property
    def output_dir(self) -> PathLike:
        return self.directories['output']

    @output_dir.setter
    def output_dir(self, path: PathLike):
        self.directories['output'] = Path(path)

    @property
    def output_file(self) -> PathLike:
        return Path(self.directories['output'], self.job_name)

    @output_file.setter
    def output_file(self, filename: PathLike) -> None:
        """
        set path and base name of output file.

        the path is copied to the output_dir attribute.
        the file stem is copied to the job_name attribute.

        @param filename: (PathLike)
        """
        p = Path(filename)
        s = str(p.parent)
        if s and s != ".":
            self.directories['output'] = p.parent
        s = str(p.stem)
        if s:
            self.job_name = s
        else:
            raise ValueError("invalid output file name")

    @property
    def time_limit(self) -> float:
        """
        Wall time limit in hours.

        @return: hours
        """

        return self.timedelta_limit.total_seconds() / 3600

    @time_limit.setter
    def time_limit(self, hours: float) -> None:
        self.timedelta_limit = datetime.timedelta(hours=hours)

    def create_model_space(self) -> Optional[ModelSpace]:
        """
        create a project.ModelSpace object which defines the allowed range for model parameters.

        there are three ways for a project to declare the model space:
        1. implement the @ref create_model_space method.
           this is the older way and may become deprecated in a future version.
        2. assign a ModelSpace to the self.model_space property directly
           (in the @ref validate method).
        3. declare the model space in the run-file.

        this method is called by the validate method only if self._model_space is undefined.

        @return ModelSpace object
        """
        return None

    @property
    def model_space(self) -> ModelSpace:
        """
        ModelSpace object that defines the allowed range for model parameters.

        there are three ways for a project to declare the model space:
        1. implement the @ref create_model_space method.
           this is the older way and may become deprecated in a future version.
        2. assign a ModelSpace to the self.model_space property directly
           (in the @ref validate method).
        3. declare the model space in the run-file.

        initially, this property is None.
        """
        return self._model_space

    @model_space.setter
    def model_space(self, value: ModelSpace) -> None:
        if isinstance(value, ModelSpace):
            self._model_space = value
        elif hasattr(value, 'items'):
            self._model_space = ModelSpace()
            self._model_space.set_param_dict(value)
        else:
            raise ValueError("incompatible object type")

    def create_params(self, model: Dict[str, Numeric], index: CalcID) -> Optional[CalculatorParams]:
        """
        create a CalculatorParams object given the model parameters and calculation index.

        the base implementation returns None; the actual project class must implement this method.

        @param model (dictionary) model parameters to be used in the calculation.

        @param index (named tuple CalcID) calculation index.
            the method should consider only the following attributes:
            @arg `scan` scan index (index into Project.scans)
            @arg `domain` domain index (index into Project.domains)
        """
        return None

    def clear_scans(self):
        """
        clear scans.

        delete all scans in self.scans and empty the list.

        @return: None
        """
        self.scans = []
        self.combined_scan = None
        self.combined_modf = None

    def add_scan(self, filename: PathLike, emitter: str, initial_state: str, is_modf: bool = False,
                 positions: Optional[Dict[str, npt.NDArray]] = None) -> Union[ScanLoader, ScanCreator]:
        """
        add a scan specification to the scans list.

        this is a shortcut for adding a ScanCreator or ScanLoader object to the self.scans list.
        the creator or loader are converted into full Scan objects just before the calculation starts
        (in the self.setup() method).

        the extension must be one of pmsco.data.DATATYPES (case insensitive)
        corresponding to the meaning of the columns in the file.

        caution: EDAC can only calculate equidistant, rectangular scans.
        the following scans are currently supported:

        * intensity vs energy at fixed theta, phi
        * intensity vs analyser angle vs energy at normal emission (theta = 0, constant phi)
        * intensity vs theta, phi, or alpha
        * intensity vs theta and phi (hemisphere or hologram scan)

        @param filename: (string) file name of the experimental data, possibly including a path.
            the file is not loaded when the optional positions argument is present,
            but the filename may serve as basename for output files (e.g. modulation function).

        @param positions: (optional, dictionary of numpy arrays) scan positions.
            if specified, the file given by filename is _not_ loaded,
            and the scan positions are initialized from this dictionary.
            the dictionary keys are the possible scan dimensions: 'e', 't', 'p', 'a'.
            the arrays are one-dimensional and contain unique, equidistant positions.
            constant dimensions have shape 1. see @ref Scan.define_scan.

        @param emitter: (string) chemical symbol of the photo-emitting atom, e.g. "Cu".

        @param initial_state: (string) nl term of the initial state of the atom, e.g. "2p".

        @param is_modf: (bool) declares whether the file contains the modulation function (True),
            or intensity (False, default). In the latter case, the modulation function is calculated internally.

        @return (ScanLoader or ScanCreator) the new scan object (which is also a member of self.scans).
        """

        if positions is not None:
            scan = ScanCreator()
            scan.positions = positions
        else:
            scan = ScanLoader()
            scan.is_modf = is_modf

        scan.filename = filename
        scan.emitter = emitter
        scan.initial_state = initial_state
        self.scans.append(scan)

        return scan
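
    # Example (sketch): one measured hologram scan loaded from a file and one
    # synthetic energy scan defined by explicit positions; the file names,
    # element and initial state are illustrative.
    #
    #     project.add_scan("data/cu2p_hemi.etpi", emitter="Cu", initial_state="2p")
    #     project.add_scan("out/escan.etpi", emitter="Cu", initial_state="2p",
    #                      positions={'e': np.arange(50., 201., 2.),
    #                                 't': np.array([0.]),
    #                                 'p': np.array([0.]),
    #                                 'a': np.array([0.])})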

    def load_scans(self):
        """
        load all scan data.

        initially, the self.scans list may contain objects of different classes (Scan, ScanLoader, ScanCreator)
        depending on the project initialization.
        this method loads all data, so that the scans list contains only Scan objects.

        also, the self.combined_scan and self.combined_modf fields are calculated from the scans.
        """
        has_raw_data = True
        has_mod_func = True
        loaded_scans = []

        for scan_proto in self.scans:
            scan = scan_proto.load(dirs=self.directories)
            loaded_scans.append(scan)
            if scan.modulation is None:
                try:
                    scan.modulation = self.calc_modulation(scan, scan.raw_data)
                except ValueError:
                    logger.error(f"error calculating the modulation function of scan {scan_proto}.")
            has_raw_data = has_raw_data and scan.raw_data is not None
            has_mod_func = has_mod_func and scan.modulation is not None
        self.scans = loaded_scans

        if has_raw_data:
            stack1 = [scan.raw_data for scan in self.scans]
            dtype = pmsco.data.common_dtype(stack1)
            stack2 = [pmsco.data.restructure_data(data, dtype) for data in stack1]
            self.combined_scan = np.hstack(tuple(stack2))
        else:
            self.combined_scan = None

        if has_mod_func:
            stack1 = [scan.modulation for scan in self.scans]
            dtype = pmsco.data.common_dtype(stack1)
            stack2 = [pmsco.data.restructure_data(data, dtype) for data in stack1]
            self.combined_modf = np.hstack(tuple(stack2))
        else:
            self.combined_modf = None

    def clear_domains(self):
        """
        clear domains.

        delete all domains in self.domains and empty the list.

        @return: None
        """
        self.domains = []

    def add_domain(self, domain: Dict[str, Any]):
        """
        add a domain to the list of domains.

        this class declares the list of domains.
        it does not define what should be in the list of domains.
        however, there must be an entry for each domain to be calculated.
        if the list is empty, no calculation will be executed.

        @attention initially, the domains list is empty.
        your project needs to add at least one domain.
        otherwise, no calculation will be executed.

        @param domain: it is up to the derived project class to specify and interpret the data stored here.
            it is recommended to store a dictionary with domain parameters similar to the model parameters.

        @return: None
        """
        self.domains.append(domain)
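
    # Example (sketch): two rotational domains; the 'zrot' key is illustrative,
    # its interpretation is entirely up to the derived project class.
    #
    #     project.add_domain({'zrot': 0.0})
    #     project.add_domain({'zrot': 30.0})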

    def log_project_args(self):
        """
        send some common project attributes to the log.

        the attributes are normally logged at WARNING level.

        this method is called by the main pmsco module after creating the project and assigning command line arguments.
        it may be overridden to add logs of attributes of the sub-class.

        @return: None
        """
        try:
            for key in self.directories:
                val = self.directories[key]
                lev = logging.WARNING if val else logging.DEBUG
                logger.log(lev, f"directories['{key}']: {val}")

            logger.warning("output file: {0}".format(self.output_file))
            logger.warning("database: {0}".format(self.db_file))

            logger.warning("atomic scattering: {0}".format(self.atomic_scattering_factory))
            logger.warning("multiple scattering: {0}".format(self.multiple_scattering_factory))
            logger.warning("optimization mode: {0}".format(self.mode))

            for key in sorted(self.optimizer_params):
                val = self.optimizer_params[key]
                lev = logging.WARNING if val else logging.DEBUG
                logger.log(lev, "optimizer_params['{k}']: {v}".format(k=key, v=val))

            _files_to_keep = pmsco.files.FILE_CATEGORIES - self.files.categories_to_delete
            logger.warning("intermediate files to keep: {0}".format(", ".join(_files_to_keep)))

            for idx, scan in enumerate(self.scans):
                logger.warning(f"scan {idx}: {scan}")
            for idx, dom in enumerate(self.domains):
                logger.warning(f"domain {idx}: {dom}")

        except AttributeError:
            logger.warning("AttributeError in log_project_args")

    def combine_domains(self, parent_task: CalculationTask, child_tasks: Iterable[CalculationTask]):
        """
        combine the results of different domains into one result and calculate the modulation function.

        the domain results are read from the file system using the indices defined by the child_tasks,
        and the combined result is written to the file system with the index defined by parent_task.

        by default, this method adds all domains with equal weight.
        weights can be defined in the model dictionary with keys 'wdom0', 'wdom1', etc.
        missing weights default to 1.
        to avoid correlated parameters, one domain must always have a fixed weight.
        it is recommended to leave 'wdom0' at its default.

        @param parent_task: (CalculationTask) parent task of the domain tasks.
            the method must write the results to the files indicated
            by the @c result_filename and @c modf_filename attributes.

        @param child_tasks: (sequence of CalculationTask) tasks which identify each domain.
            the method must read the source data from the files
            indicated by the @c result_filename attributes.
            the sequence is sorted by task ID, i.e., essentially, by domain index.

        @return: None

        @raise IndexError if child_tasks is empty

        @raise IOError if a filename is missing

        @note the weights of the domains (in derived classes) can be part of the optimizable model parameters.
        the model parameters are available as the @c model attribute of the calculation tasks.
        """

        result_data = None
        sum_weights = 0.
        for task in child_tasks:
            data = pmsco.data.load_data(task.result_filename)
            if result_data is None:
                result_data = data.copy()
                result_data['i'] = 0.
            try:
                weight = task.model['wdom{}'.format(task.id.domain)]
            except KeyError:
                weight = 1.
            result_data['i'] += weight * data['i']
            sum_weights += weight
        result_data['i'] /= sum_weights

        pmsco.data.save_data(parent_task.result_filename, result_data)

        if self.scans[parent_task.id.scan].modulation is not None:
            result_modf = self.calc_modulation(parent_task.id.scan, result_data)
            pmsco.data.save_data(parent_task.modf_filename, result_modf)
        else:
            parent_task.modf_filename = ""
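
    # Example: with two domains and a model that defines 'wdom1' (while 'wdom0'
    # stays at its default of 1), the combined intensity evaluates to
    #
    #     I = (1 * I_dom0 + wdom1 * I_dom1) / (1 + wdom1)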

    def combine_emitters(self, parent_task: CalculationTask, child_tasks: Iterable[CalculationTask]):
        """
        combine the results of different emitters into one result and calculate the modulation function.

        the emitter results are read from the file system using the indices defined by the child_tasks,
        and the combined result is written to the file system with the index defined by parent_task.

        by default, this method adds all emitters with equal weight.

        sub-classes may override this method and implement expansion of equivalent emitters,
        unequal weights, etc.

        @param parent_task: (CalculationTask) parent task of the emitter tasks.
            the method must write the results to the files indicated
            by the @c result_filename and @c modf_filename attributes.

        @param child_tasks: (sequence of CalculationTask) tasks which identify each emitter.
            the method must read the source data from the files
            indicated by the @c result_filename attributes.
            the sequence is sorted by task ID, i.e., essentially, by the emitter index.

        @return: None

        @raise IndexError if child_tasks is empty

        @raise IOError if a filename is missing

        @note the weights of the emitters (in derived classes) can be part of the optimizable model parameters.
        the model parameters are available as the @c model attribute of the calculation tasks.
        """

        result_data = None
        for task in child_tasks:
            data = pmsco.data.load_data(task.result_filename)
            if result_data is not None:
                result_data['i'] += data['i']
            else:
                result_data = data

        pmsco.data.save_data(parent_task.result_filename, result_data)

        if self.scans[parent_task.id.scan].modulation is not None:
            result_modf = self.calc_modulation(parent_task.id.scan, result_data)
            pmsco.data.save_data(parent_task.modf_filename, result_modf)
        else:
            parent_task.modf_filename = ""

    def combine_scans(self, parent_task: CalculationTask, child_tasks: Iterable[CalculationTask]):
        """
        combine results of different scans into one result, for intensity and modulation.

        the scan results are read from the file system using the indices defined by the child_tasks,
        and the combined result is written to the file system with the index defined by parent_task.

        the datasets of the scans are appended.
        this is done for intensity and modulation data independently.

        @param parent_task: (CalculationTask) parent task of the scan tasks.
            the method must write the results to the files indicated
            by the @c result_filename and @c modf_filename attributes.

        @param child_tasks: (sequence of CalculationTask) tasks which identify each scan.
            the method must read the source data from the files
            indicated by the @c result_filename attributes.
            the sequence is sorted by task ID, i.e., essentially, by scan index.

        @return: None

        @raise IndexError if child_tasks is empty.
        """

        # intensity
        try:
            stack1 = [pmsco.data.load_data(task.result_filename) for task in child_tasks]
        except IOError:
            parent_task.result_filename = ""
        else:
            dtype = pmsco.data.common_dtype(stack1)
            stack2 = [pmsco.data.restructure_data(data, dtype) for data in stack1]
            result_data = np.hstack(tuple(stack2))
            pmsco.data.save_data(parent_task.result_filename, result_data)

        # modulation
        try:
            stack1 = [pmsco.data.load_data(task.modf_filename) for task in child_tasks]
        except IOError:
            parent_task.modf_filename = ""
        else:
            dtype = pmsco.data.common_dtype(stack1)
            stack2 = [pmsco.data.restructure_data(data, dtype) for data in stack1]
            result_modf = np.hstack(tuple(stack2))
            pmsco.data.save_data(parent_task.modf_filename, result_modf)
|
|
|
|
    def combine_regions(self, parent_task: CalculationTask, child_tasks: Iterable[CalculationTask]):
        """
        combine results from different regions into one result, for intensity and modulation.

        the region results are read from the file system using the indices defined by the child_tasks,
        and the combined result is written to the file system with the index defined by parent_task.

        the datasets of the regions are appended and sorted in the standard order of the data module.
        if the resulting length differs from the corresponding experimental scan,
        an error is printed to the logger, but the calculation continues.

        the modulation function is calculated by calling @ref calc_modulation.

        @param parent_task: (CalculationTask) parent task of the region tasks.
        the method writes the results to the file names
        given by the @c result_filename and @c modf_filename attributes.

        @param child_tasks: (sequence of CalculationTask) tasks which identify each region.
        the method reads the source data from the files
        indicated by the @c result_filename attributes.
        the sequence is sorted by task ID, i.e., essentially, by region index.

        @return: None

        @raise IndexError if child_tasks is empty.
        """
        # intensity
        try:
            stack1 = [pmsco.data.load_data(task.result_filename) for task in child_tasks]
        except IOError:
            parent_task.result_valid = False
            parent_task.result_filename = ""
        else:
            dtype = pmsco.data.common_dtype(stack1)
            stack2 = [pmsco.data.restructure_data(data, dtype) for data in stack1]
            result_data = np.hstack(tuple(stack2))
            pmsco.data.sort_data(result_data)
            pmsco.data.save_data(parent_task.result_filename, result_data)

            scan = self.scans[parent_task.id.scan]
            if result_data.shape[0] != scan.raw_data.shape[0]:
                logger.error(BMsg("scan length mismatch: combined result: {result}, experimental data: {expected}",
                                  result=result_data.shape[0], expected=scan.raw_data.shape[0]))

        # modulation
        try:
            data = pmsco.data.load_data(parent_task.result_filename)
            modf = self.calc_modulation(parent_task.id.scan, data)
        except IOError:
            parent_task.modf_filename = ""
        else:
            pmsco.data.save_data(parent_task.modf_filename, modf)

    def setup(self, handlers: Dict[str, TaskHandler]):
        """
        prepare for calculations.

        this method is called in the master process before starting the task loop.
        at this point the task handlers have been created and set up.
        if the project needs to change settings of task handlers it can do so in this method.
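
        for example, a derived project could adjust a handler attribute here
        (a minimal sketch; @c some_attribute is purely illustrative):
        @code{.py}
        def setup(self, handlers):
            super().setup(handlers)
            handlers['scan'].some_attribute = "custom value"
        @endcode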

        this instance writes the header of the tasks.dat file
        that will receive sub-task evaluation results from the evaluate_result() method.
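        the header contains the underscore-prefixed r-factor, task index and time fields,
        followed by the model parameters in alphabetical order.
        for hypothetical model parameters @c dx and @c dy it reads:
        @code{.py}
        # _rfac _model _scan _domain _emit _region _secs dx dy
        @endcode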

        it also initializes the database where the task results will be stored.
        this is either a volatile in-memory database or a user-specified sqlite3 database file.

        @param handlers: dictionary listing the initialized task handler instances.
        the dictionary keys are the attribute names of pmsco.dispatch.CalcID:
        'model', 'scan', 'domain', 'emit' and 'region'.

        @return: None
        """

        fields = ["rfac"]
        fields.extend(CalcID._fields)
        fields.append("secs")
        fields = ["_" + f for f in fields]
        model_fields = list(self.model_space.start.keys())
        model_fields.sort(key=lambda name: name.lower())
        fields.extend(model_fields)
        self._tasks_fields = fields

        if 'all' in self.keep_files:
            cats = set([])
        else:
            cats = pmsco.files.FILE_CATEGORIES - set(self.keep_files)
            cats -= {'report'}
            if self.mode == 'single':
                cats -= {'model'}
        self.files.categories_to_delete = cats

        Path(self.output_file).parent.mkdir(parents=True, exist_ok=True)
        tasks_file = Path(self.output_file).with_suffix(".tasks.dat")
        with open(tasks_file, "wt", encoding="latin1") as outfile:
            outfile.write("# ")
            outfile.write(" ".join(fields))
            outfile.write("\n")

        self._db.connect(self.db_file)
        self._db.ingest_project_metadata(self)

        for report in self.reports:
            report.set_database(self._db)

    def evaluate_result(self, parent_task: CalculationTask, child_tasks: Iterable[CalculationTask]):
        """
        evaluate the result of a calculation task.

        this method is called from the add_result method of the task handlers at each level.
        it gives the project a hook to check the progress of a model at any level of the task tree.

        the method calculates the r-factor by calling the Project.calc_rfactor method.
        the result is written to the task.rfac field and to the .tasks.dat file.
        invalid and region-level results are skipped.

        this method is called in the master process only.
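
        a derived project can extend this hook, e.g. to track progress
        (a minimal sketch; the log message is illustrative):
        @code{.py}
        def evaluate_result(self, parent_task, child_tasks):
            super().evaluate_result(parent_task, child_tasks)
            if parent_task.result_valid and parent_task.id.level == 'model':
                logger.info(f"model {parent_task.id.model}: R = {parent_task.rfac}")
        @endcode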

        @param parent_task: (CalculationTask) a calculation task.

        @param child_tasks: (sequence of CalculationTask) tasks which identify each scan.
        the sequence must be sorted by task ID.

        @return: None
        """
        if parent_task.result_valid and parent_task.id.region == -1:
            try:
                parent_task.rfac = self.calc_rfactor(parent_task, child_tasks)
            except ValueError:
                parent_task.result_valid = False
                logger.warning(BMsg("calculation {0} resulted in an undefined R-factor.", parent_task.id))
                return None

            values_dict = parent_task.id._asdict()
            values_dict = {"_" + k: v for k, v in values_dict.items()}
            values_dict.update(parent_task.model)
            values_dict['_rfac'] = parent_task.rfac
            values_dict['_secs'] = parent_task.time.total_seconds()
            values_list = [values_dict[field] for field in self._tasks_fields]

            tasks_file = Path(self.output_file).with_suffix(".tasks.dat")
            with open(tasks_file, "at", encoding="latin1") as outfile:
                outfile.write(" ".join(format(value) for value in values_list) + "\n")
            if parent_task.delta:
                delta_dict = parent_task.delta
            else:
                delta_dict = None

            self._db.ingest_result(parent_task.id, values_dict, delta_dict)

            if parent_task.result_valid and parent_task.id.level == 'model':
                for report in self.reports:
                    if report.enabled and 'model' in report.trigger_levels:
                        logger.info(f"calling report {report.__class__.__name__} on model {parent_task.id.model}")
                        report.select_data(jobs=self._db.db_job_id, calcs=parent_task.id)
                        report.create_report()

        return None

    # noinspection PyUnusedLocal
    def calc_modulation(self, scan: Union[int, Scan], data: npt.NDArray) -> npt.NDArray:
        """
        Calculate the modulation function configured for a scan.

        This method identifies the modulation function calculator of the scan and calls it on the given data.
        If the scan does not define the type of modulation function, the default is pmsco.data.default_modfunc.
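
        For example, a project may configure the modulation function of a scan explicitly
        and call this method on raw data (a minimal sketch; @c raw_data stands for a
        structured array in one of the supported formats):
        @code{.py}
        scan = self.scans[0]
        scan.modulation_func = pmsco.data.calc_modfunc_loess
        scan.modulation_args = {}
        modf = self.calc_modulation(scan, raw_data)
        @endcode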

        @param scan: Scan object or scan index that defines the modulation function.
        Can alternatively be any object that defines the modulation_func and modulation_args attributes
        like in the Scan class.

        @param data: Structured numpy.ndarray in EI, ETPI, or ETPAI format.
        Can contain a one- or multi-dimensional scan.
        The scan coordinates must be on a rectangular or hemispherical grid.
        For maximum compatibility, the array should be sorted,
        though for the default calc_modfunc_loess function this is not required.

        @return copy of the data array with the modulation function in the 'i' column.
        """

        try:
            modfunc = scan.modulation_func
            modargs = scan.modulation_args
        except AttributeError:
            try:
                modfunc = self.scans[scan].modulation_func
                modargs = self.scans[scan].modulation_args
            except (IndexError, TypeError):
                logger.error(f"Unknown scan {scan} in Project.calc_modulation. "
                             f"Defaulting to pmsco.data.default_modfunc.")
                modfunc = pmsco.data.default_modfunc
                modargs = {}

        return modfunc(data, **modargs)

    def calc_rfactor(self, parent_task: CalculationTask, child_tasks: Iterable[CalculationTask]) -> float:
        """
        Calculate the r-factor of a task.

        The R-factor is calculated on the experimental and simulated modulation functions.
        The algorithm differs for the model level and the lower task levels.
        At the model level, the calculation is delegated to Project.combine_rfactors.
        At all other levels, the calculation is delegated to Project.calc_scan_rfactor,
        where the simulated data is loaded from the file specified by parent_task
        and the experimental data is taken from Project.scans.

        This method is called by the task handlers.
        All child tasks belonging to the parent task must be complete.

        To select a specific R-factor algorithm,
        set the R-factor function in the Scan.rfactor_func attribute,
        which can be specified in the runfile.
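
        For example (a minimal sketch in a project's setup code):
        @code{.py}
        self.scans[0].rfactor_func = pmsco.data.default_rfactor
        self.scans[0].rfactor_args = {}
        @endcode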

        In special (rare) cases, the project sub-class can override Project.calc_scan_rfactor
        and/or Project.combine_rfactors.

        @version In earlier versions,
        projects had to override this method to implement their algorithm.
        This has led to duplication of common code.
        The r-factor algorithm is now distributed over several methods
        and can be specified in the runfile.

        @param parent_task: (CalculationTask) a calculation task.

        @param child_tasks: (sequence of CalculationTask) tasks which identify each scan.
        the sequence must be sorted by task ID.

        @return (float) calculated R-factor.

        @raise ValueError if the function fails (e.g. division by zero or all elements non-finite).
        """

        if parent_task.id.scan >= 0:
            result_r = self.calc_scan_rfactor(parent_task)
        else:
            result_r = self.combine_rfactors(parent_task, child_tasks)

        return result_r

    def calc_scan_rfactor(self, task: CalculationTask) -> float:
        """
        Calculate the R-factor of simulated diffraction data at the scan level.

        The method calls the rfactor_func function of the scan referred to by the task
        to calculate the R-factor on the modulation functions referred to by the task.

        Override this method in your project if you want to calculate the R-factor on other data,
        e.g. unnormalized intensity.
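
        A minimal sketch of such an override, comparing raw intensities instead
        (assuming the scan's rfactor_func accepts intensity data):
        @code{.py}
        def calc_scan_rfactor(self, task):
            scan = self.scans[task.id.scan]
            exp_data = scan.raw_data
            task_data = pmsco.data.load_data(task.result_filename)
            return scan.rfactor_func(exp_data, task_data, **scan.rfactor_args)
        @endcode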

        The R-factor function should be selected in the scan object.

        @param task: (CalculationTask) a calculation task at the scan level (scan index must be >= 0).
        task.modf_filename must point to a valid data file.

        @return: (float) scalar R-factor

        @raise ValueError if the function fails (e.g. division by zero or all elements non-finite).
        """

        scan = self.scans[task.id.scan]
        exp_data = scan.modulation
        task_data = pmsco.data.load_data(task.modf_filename)
        return scan.rfactor_func(exp_data, task_data, **scan.rfactor_args)

    def combine_rfactors(self, parent_task: CalculationTask, child_tasks: Iterable[CalculationTask]) -> float:
        """
        combine r-factors of child tasks.

        the r-factors are taken from the rfac attribute of the child_tasks.
        by default, this method delegates to @ref combine_rfactors_average,
        which returns the unweighted average of the child r-factors.
        an alternative implementation is @ref combine_rfactors_datastack.

        the method is intended to be used at the model level (children are scans),
        though it can technically be used at any level where child r-factors are available.

        @param parent_task: (CalculationTask) parent task for which the r-factor is calculated,
        i.e. a model task.

        @param child_tasks: (sequence of CalculationTask) child tasks of parent_task
        that may be consulted for calculating the r-factor.

        @return: (float) r-factor, NaN if parent task is invalid

        @raise ValueError or IndexError if child_tasks is empty.
        """

        if parent_task.result_valid:
            return self.combine_rfactors_average(parent_task, child_tasks)
        else:
            return float('nan')

    def combine_rfactors_average(self, parent_task: CalculationTask, child_tasks: Iterable[CalculationTask]) -> float:
        """
        combine r-factors of child tasks by averaging.

        the r-factors are taken from the rfac attribute of the child_tasks.
        the result is the unweighted average of the child r-factors.

        to produce a balanced result, every child dataset must contain a similar amount of information.
        if this is not the case, the child r-factors must be weighted.
        weighting is currently not implemented but may be introduced in a future version.
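
        a derived project that needs weighting could override @ref combine_rfactors
        along these lines (a minimal sketch; the @c weight attribute of the scans is hypothetical):
        @code{.py}
        def combine_rfactors(self, parent_task, child_tasks):
            if not parent_task.result_valid:
                return float('nan')
            tasks = list(child_tasks)
            weights = [self.scans[t.id.scan].weight for t in tasks]
            rsum = sum(w * t.rfac for w, t in zip(weights, tasks))
            return rsum / sum(weights)
        @endcode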

        the method is intended to be used at the model level (children are scans),
        though it can technically be used at any level where child r-factors are available.

        @param parent_task: (CalculationTask) parent task for which the r-factor is calculated,
        i.e. a model task.

        @param child_tasks: (sequence of CalculationTask) child tasks of parent_task
        that may be consulted for calculating the r-factor.

        @return: (float) r-factor, NaN if parent task is invalid

        @raise ValueError or IndexError if child_tasks is empty.
        """
        if parent_task.result_valid:
            rsum = 0.
            count = 0
            for task in child_tasks:
                rsum += task.rfac
                count += 1
            return rsum / count
        else:
            return float('nan')

    def combine_rfactors_datastack(self, parent_task: CalculationTask, child_tasks: Iterable[CalculationTask]) -> float:
        """
        combine r-factors of child tasks by explicit calculation on the combined result.

        this is an alternative implementation of combine_rfactors.
        instead of using the r-factors from child tasks,
        it re-calculates the r-factor for the combined dataset.
        this method avoids the issue of weighting
        but can introduce bias if the amplitudes of the child datasets differ substantially.

        the simulated dataset is loaded from the file specified by the parent task,
        and the corresponding experimental data is taken from self.combined_modf.

        to activate this method, assign it to combine_rfactors
        in the overriding __init__ or setup method:
        @code{.py}
        self.combine_rfactors = self.combine_rfactors_datastack
        @endcode

        @param parent_task: (CalculationTask) parent task for which the r-factor is calculated,
        i.e. a model task.

        @param child_tasks: (sequence of CalculationTask) child tasks of parent_task
        that may be consulted for calculating the r-factor.

        @return: (float) r-factor, NaN if parent task is invalid
        """
        if parent_task.result_valid:
            task_data = pmsco.data.load_data(parent_task.modf_filename)
            exp_data = self.combined_modf
            return pmsco.data.default_rfactor(exp_data, task_data)
        else:
            return float('nan')

    def export_cluster(self, index: CalcID, filename: PathLike, cluster: Cluster) -> Dict[str, str]:
        """
        export the cluster of a calculation task in XYZ format for diagnostics and reporting.

        this method is called with the final cluster just before it is handed over to the calculator.
        it saves the atom coordinates in XYZ format for future reference (e.g. graphics).

        the method creates two files:
        @arg a file with extension '.xyz' contains the whole cluster in XYZ format.
        @arg a file with extension '.emit.xyz' contains only the emitter atoms in XYZ format.

        the first part of the file name is formatted with the output name and the complete task identification.
        the files are registered with the file tracker in the 'cluster' category
        so that they will be deleted unless the cluster category is selected for keeping.

        a derived project class may override or extend this method
        to carry out further diagnostics or reporting on the cluster.
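
        a minimal sketch of such an extension
        (assuming the Cluster class provides a get_atom_count method):
        @code{.py}
        def export_cluster(self, index, filename, cluster):
            _files = super().export_cluster(index, filename, cluster)
            logger.info(f"cluster of task {index} contains {cluster.get_atom_count()} atoms")
            return _files
        @endcode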

        @param index: (CalcID) calculation index to which the cluster belongs.
        region may be -1 if only one cluster is exported for all regions
        (clusters do not depend on the scan region).
        emit may be -1 if the cluster is a master from which emitter-related child clusters are derived.

        @param filename: (str) base file name for the output files.
        the filename should be formatted using pmsco.dispatch.CalculationTask.format_filename().
        extensions are appended by this method.

        @param cluster: a pmsco.cluster.Cluster object with all atom positions and emitters.

        @return: dictionary listing the names of the created files with their category.
        the dictionary key is the file name,
        the value is the file category (cluster).
        """
        _files = {}
        xyz_filename = filename + ".xyz"
        cluster.save_to_file(xyz_filename, fmt=pmsco.cluster.FMT_XYZ)
        _files[xyz_filename] = 'cluster'

        xyz_filename = filename + ".emit.xyz"
        cluster.save_to_file(xyz_filename, fmt=pmsco.cluster.FMT_XYZ, emitters_only=True)
        _files[xyz_filename] = 'cluster'

        return _files

    def before_atomic_scattering(self, task: CalculationTask, par: CalculatorParams, clu: Cluster) -> \
            Tuple[Optional[CalculatorParams], Optional[Cluster]]:
        """
        project hook before atomic scattering factors are calculated.

        this method derives modified CalculatorParams and Cluster objects for the atomic scattering calculation
        from the original objects that will be used in the multiple scattering calculation.

        in the basic version, the method does not change the objects
        except that it returns None for the root task (reference cluster).
        subclasses may override it to modify or replace the cluster.

        @param task: @ref pmsco.dispatch.CalculationTask object representing the current calculation task.
        if the model index is -1, the project can return the global reference cluster
        (to calculate the fixed scattering factors that will be used for all models)
        or None if no global scattering factors should be calculated.
        do not modify this object!

        @param par: @ref pmsco.project.CalculatorParams object representing the preliminary
        multiple scattering input parameters of the current task.
        the method can make modifications to this object instance directly.

        @param clu: @ref pmsco.cluster.Cluster object representing the preliminary
        multiple scattering cluster of the current task.
        the method can make modifications to this object instance directly.

        @return: a tuple (par, clu) where par and clu are the input parameters and cluster
        to be used for the calculation of atomic scattering factors.
        these should either be the original function arguments,
        or copies of the original arguments.
        if atomic scattering factors should not be calculated, the return values should be None.
        """

        if task.id.model >= 0:
            return par, clu
        else:
            return None, None

    def after_atomic_scattering(self, task: CalculationTask, par: CalculatorParams, clu: Cluster) -> \
            Tuple[Optional[CalculatorParams], Optional[Cluster]]:
        """
        project hook after atomic scattering factors are calculated.

        this method cleans up the CalculatorParams and Cluster objects from the atomic scattering calculation
        so that they can be used in the multiple scattering calculation.

        in the basic version, the method just passes on the input parameters for model tasks
        and returns None for the root task.
        subclasses may override it and modify the cluster and/or input parameters
        so that the desired atomic scattering factors are used.

        @param task: @ref pmsco.dispatch.CalculationTask object representing the current calculation task.
        if the model index is -1, the project should return the global reference cluster
        (to calculate the fixed scattering factors that will be used for all models)
        or None if no global scattering factors should be calculated.

        @param par: @ref pmsco.project.CalculatorParams object representing the preliminary
        multiple scattering input parameters of the current task.

        @param clu: @ref pmsco.cluster.Cluster object representing the preliminary
        multiple scattering cluster of the current task.
        do not modify this object, make a copy!

        @return: a tuple (par, clu) where par and clu are the input parameters and cluster
        to be used for the multiple scattering calculation.
        these should either be the original function arguments,
        or copies of the original arguments.
        """
        if task.id.model >= 0:
            return par, clu
        else:
            return None, None

    def cleanup(self):
        """
        wrap up the calculation job.

        - call final reports
        - delete unwanted files at the end of a project
        - close the database.

        @return: None
        """
        for report in self.reports:
            if report.enabled and 'end' in report.trigger_levels:
                logger.info(f"calling report {report.__class__.__name__} at end")
                report.select_data(jobs=self._db.db_job_id)
                report.create_report()

        self.cleanup_files(incomplete_models=True)
        self._db = None

    def cleanup_files(self, keep: int = 0, incomplete_models: bool = False) -> None:
        """
        delete uninteresting files (any time).

        delete all files that
        belong to one of the self.files.categories_to_delete categories or
        do not belong to one of the "best" models.

        "best" models are a number (self.keep_best) of models that gave the lowest R-factors
        at each task level from root to self.keep_levels.
        for example, if `keep_best = 10` and `keep_levels = 1`,
        the 10 best models at the top level and the 10 best at the scan level are kept.
        this means that in total up to `n = 10 + 10 * n_scans` models may be kept,
        where n_scans is the number of scan files in the job.

        this method can be called at any time during the calculation process.
        it executes on complete models only
        unless incomplete_models is True.

        @param keep: minimum number of best models to keep.
        0 (default): use the project parameter self.keep_best.

        @param incomplete_models: (bool) delete files of incomplete models as well.
        by default (False), incomplete models are not deleted.

        @return None
        """
        self.files.delete_files(incomplete_models=incomplete_models)
        if 'rfac' in self.files.categories_to_delete:
            keep = max(keep, self.keep_best)
            keepers = self._db.query_best_task_models(self.keep_levels, keep)
            self.files.delete_models(keep=keepers)