diff --git a/ophyd_devices/__init__.py b/ophyd_devices/__init__.py index eccecdf..9b3073f 100644 --- a/ophyd_devices/__init__.py +++ b/ophyd_devices/__init__.py @@ -8,13 +8,15 @@ from .npoint.npoint import NPointAxis from .rt_lamni import RtFlomniMotor, RtLamniMotor from .sim.sim import SimCamera from .sim.sim import SimMonitor +from .sim.sim import SimFlyer +from .sim.sim import SimFlyer as SynFlyer from .sim.sim import SimMonitor as SynAxisMonitor from .sim.sim import SimMonitor as SynGaussBEC from .sim.sim import SimPositioner from .sim.sim import SimPositioner as SynAxisOPAAS -from .sim.sim import SynDeviceOPAAS, SynFlyer +from .sim.sim import SynDeviceOPAAS from .sim.sim_signals import ReadOnlySignal -from .sim.sim_frameworks import DeviceProxy, SlitLookup +from .sim.sim_frameworks import DeviceProxy, SlitProxy from .sim.sim_signals import ReadOnlySignal as SynSignalRO from .sls_devices.sls_devices import SLSInfo, SLSOperatorMessages from .smaract.smaract_ophyd import SmaractMotor diff --git a/ophyd_devices/sim/__init__.py b/ophyd_devices/sim/__init__.py index 2cd6e77..48b5c93 100644 --- a/ophyd_devices/sim/__init__.py +++ b/ophyd_devices/sim/__init__.py @@ -2,11 +2,11 @@ from .sim import ( SimPositioner, SimMonitor, SimCamera, - SynDynamicComponents, - SynFlyer, + SimFlyer, + SimFlyer as SynFlyer, ) from .sim_xtreme import SynXtremeOtf -from .sim_signals import SetableSignal, ReadOnlySignal, ComputedReadOnlySignal +from .sim_signals import SetableSignal, ReadOnlySignal -from .sim_frameworks import SlitLookup +from .sim_frameworks import SlitProxy diff --git a/ophyd_devices/sim/sim.py b/ophyd_devices/sim/sim.py index 6dcae0f..fd64603 100644 --- a/ophyd_devices/sim/sim.py +++ b/ophyd_devices/sim/sim.py @@ -1,23 +1,32 @@ -from collections import defaultdict import os import threading import time as ttime -import warnings - import numpy as np + from bec_lib import MessageEndpoints, bec_logger, messages + from ophyd import Component as Cpt from ophyd import DynamicDeviceComponent as Dcpt from ophyd import Device, DeviceStatus, Kind -from ophyd import PositionerBase, Signal +from ophyd import PositionerBase + +from ophyd.flyers import FlyerInterface + from ophyd.sim import SynSignal +from ophyd.status import StatusBase from ophyd.utils import LimitError + from ophyd_devices.utils.bec_scaninfo_mixin import BecScaninfoMixin -from ophyd_devices.sim.sim_data import SimulatedDataBase, SimulatedDataCamera, SimulatedDataMonitor + +from ophyd_devices.sim.sim_data import ( + SimulatedPositioner, + SimulatedDataCamera, + SimulatedDataMonitor, +) from ophyd_devices.sim.sim_test_devices import DummyController -from ophyd_devices.sim.sim_signals import SetableSignal, ReadOnlySignal, ComputedReadOnlySignal +from ophyd_devices.sim.sim_signals import SetableSignal, ReadOnlySignal logger = bec_logger.logger @@ -46,11 +55,11 @@ class SimMonitor(Device): """ - USER_ACCESS = ["sim"] + USER_ACCESS = ["sim", "registered_proxies"] sim_cls = SimulatedDataMonitor - readback = Cpt(ComputedReadOnlySignal, value=0, kind=Kind.hinted) + readback = Cpt(ReadOnlySignal, value=0, kind=Kind.hinted, compute_readback=True) SUB_READBACK = "readback" _default_sub = SUB_READBACK @@ -69,16 +78,16 @@ class SimMonitor(Device): self.precision = precision self.init_sim_params = sim_init self.sim = self.sim_cls(parent=self, device_manager=device_manager, **kwargs) - self._lookup_table = [] + self._registered_proxies = {} super().__init__(name=name, parent=parent, kind=kind, **kwargs) self.sim.sim_state[self.name] = self.sim.sim_state.pop(self.readback.name, None) self.readback.name = self.name @property - def lookup_table(self) -> None: - """lookup_table property""" - return self._lookup_table + def registered_proxies(self) -> None: + """Dictionary of registered signal_names and proxies.""" + return self._registered_proxies class SimCamera(Device): @@ -100,7 +109,7 @@ class SimCamera(Device): """ - USER_ACCESS = ["sim"] + USER_ACCESS = ["sim", "registered_proxies"] sim_cls = SimulatedDataCamera SHAPE = (100, 100) @@ -116,9 +125,10 @@ class SimCamera(Device): image_shape = Cpt(SetableSignal, name="image_shape", value=SHAPE, kind=Kind.config) image = Cpt( - ComputedReadOnlySignal, + ReadOnlySignal, name="image", value=np.empty(SHAPE, dtype=np.uint16), + compute_readback=True, kind=Kind.omitted, ) @@ -134,7 +144,7 @@ class SimCamera(Device): ): self.device_manager = device_manager self.init_sim_params = sim_init - self._lookup_table = [] + self._registered_proxies = {} self.sim = self.sim_cls(parent=self, device_manager=device_manager, **kwargs) super().__init__(name=name, parent=parent, kind=kind, **kwargs) @@ -144,9 +154,9 @@ class SimCamera(Device): self._update_scaninfo() @property - def lookup_table(self) -> None: - """lookup_table property""" - return self._lookup_table + def registered_proxies(self) -> None: + """Dictionary of registered signal_names and proxies.""" + return self._registered_proxies def trigger(self) -> DeviceStatus: """Trigger the camera to acquire images. @@ -225,30 +235,29 @@ class SimPositioner(Device, PositionerBase): """ A simulated device mimicing any 1D Axis device (position, temperature, rotation). + >>> motor = SimPositioner(name="motor") + Parameters ---------- - name : string, keyword only - readback_func : callable, optional - When the Device is set to ``x``, its readback will be updated to - ``f(x)``. This can be used to introduce random noise or a systematic - offset. - Expected signature: ``f(x) -> value``. - value : object, optional - The initial value. Default is 0. - delay : number, optional - Simulates how long it takes the device to "move". Default is 0 seconds. - precision : integer, optional - Digits of precision. Default is 3. - parent : Device, optional - Used internally if this Signal is made part of a larger Device. - kind : a member the Kind IntEnum (or equivalent integer), optional - Default is Kind.normal. See Kind for options. + name (string) : Name of the device. This is the only required argmuent, passed on to all signals of the device.\ + Optional parameters: + ---------- + delay (int) : If 0, execution of move will be instant. If 1, exectution will depend on motor velocity. Default is 1. + update_frequency (int) : Frequency in Hz of the update of the simulated state during a move. Default is 2 Hz. + precision (integer) : Precision of the readback in digits, written to .describe(). Default is 3 digits. + tolerance (float) : Tolerance of the positioner to accept reaching target positions. Default is 0.5. + limits (tuple) : Tuple of the low and high limits of the positioner. Overrides low/high_limit_travel is specified. Default is None. + parent : Parent device, optional, is used internally if this signal/device is part of a larger device. + kind : A member the Kind IntEnum (or equivalent integer), optional. Default is Kind.normal. See Kind for options. + device_manager : DeviceManager from BEC, optional . Within startup of simulation, device_manager is passed on automatically. + sim_init (dict) : Dictionary to initiate parameters of the simulation, check simulation type defaults for more details. + """ # Specify which attributes are accessible via BEC client - USER_ACCESS = ["sim", "readback", "speed", "dummy_controller"] + USER_ACCESS = ["sim", "readback", "speed", "dummy_controller", "registered_proxies"] - sim_cls = SimulatedDataBase + sim_cls = SimulatedPositioner # Define the signals as class attributes readback = Cpt(ReadOnlySignal, name="readback", value=0, kind=Kind.hinted) @@ -256,60 +265,61 @@ class SimPositioner(Device, PositionerBase): motor_is_moving = Cpt(SetableSignal, value=0, kind=Kind.normal) # Config signals - velocity = Cpt(SetableSignal, value=1, kind=Kind.config) + velocity = Cpt(SetableSignal, value=100, kind=Kind.config) acceleration = Cpt(SetableSignal, value=1, kind=Kind.config) # Ommitted signals high_limit_travel = Cpt(SetableSignal, value=0, kind=Kind.omitted) low_limit_travel = Cpt(SetableSignal, value=0, kind=Kind.omitted) - unused = Cpt(Signal, value=1, kind=Kind.omitted) + unused = Cpt(SetableSignal, value=1, kind=Kind.omitted) SUB_READBACK = "readback" _default_sub = SUB_READBACK + # pylint: disable=too-many-arguments def __init__( self, - *, name, - readback_func=None, - value=0, - delay=1, - speed=1, + *, + delay: int = 1, update_frequency=2, precision=3, - parent=None, - labels=None, - kind=None, - limits=None, tolerance: float = 0.5, - sim: dict = None, + limits=None, + parent=None, + kind=None, + device_manager=None, + sim_init: dict = None, + # TODO remove after refactoring config + speed: float = 100, **kwargs, ): - # Whether motions should be instantaneous or depend on motor velocity self.delay = delay + self.device_manager = device_manager self.precision = precision self.tolerance = tolerance - self.init_sim_params = sim - self._lookup_table = [] + self.init_sim_params = sim_init + self._registered_proxies = {} - self.speed = speed self.update_frequency = update_frequency self._stopped = False self.dummy_controller = DummyController() - # initialize inner dictionary with simulated state self.sim = self.sim_cls(parent=self, **kwargs) - super().__init__(name=name, labels=labels, parent=parent, kind=kind, **kwargs) - # Rename self.readback.name to self.name, also in self.sim_state + super().__init__(name=name, parent=parent, kind=kind, **kwargs) self.sim.sim_state[self.name] = self.sim.sim_state.pop(self.readback.name, None) self.readback.name = self.name - # Init limits from deviceConfig if limits is not None: assert len(limits) == 2 self.low_limit_travel.put(limits[0]) self.high_limit_travel.put(limits[1]) + # @property + # def connected(self): + # """Return the connected state of the simulated device.""" + # return self.dummy_controller.connected + @property def limits(self): """Return the limits of the simulated device.""" @@ -325,11 +335,11 @@ class SimPositioner(Device, PositionerBase): """Return the high limit of the simulated device.""" return self.limits[1] - @property - def lookup_table(self) -> None: - """lookup_table property""" - return self._lookup_table + def registered_proxies(self) -> None: + """Dictionary of registered signal_names and proxies.""" + return self._registered_proxies + # pylint: disable=arguments-differ def check_value(self, value: any): """ Check that requested position is within existing limits. @@ -375,42 +385,41 @@ class SimPositioner(Device, PositionerBase): st = DeviceStatus(device=self) if self.delay: - # If self.delay is not 0, we use the speed and updated frequency of the device to compute the motion + def move_and_finish(): """Move the simulated device and finish the motion.""" success = True try: - # Compute final position with some jitter move_val = self._get_sim_state( self.setpoint.name ) + self.tolerance * np.random.uniform(-1, 1) - # Compute the number of updates needed to reach the final position with the given speed + updates = np.ceil( - np.abs(old_setpoint - move_val) / self.speed * self.update_frequency + np.abs(old_setpoint - move_val) + / self.velocity.get() + * self.update_frequency ) - # Loop over the updates and update the state of the simulated device + for ii in np.linspace(old_setpoint, move_val, int(updates)): ttime.sleep(1 / self.update_frequency) update_state(ii) - # Update the state of the simulated device to the final position + update_state(move_val) self._set_sim_state(self.motor_is_moving, 0) except DeviceStop: success = False finally: self._stopped = False - # Call function from positioner base to indicate that motion finished with success self._done_moving(success=success) - # Set status to finished + self._set_sim_state(self.motor_is_moving.name, 0) st.set_finished() - # Start motion in Thread threading.Thread(target=move_and_finish, daemon=True).start() else: - # If self.delay is 0, we move the simulated device instantaneously update_state(value) self._done_moving() + self._set_sim_state(self.motor_is_moving.name, 0) st.set_finished() return st @@ -420,7 +429,7 @@ class SimPositioner(Device, PositionerBase): self._stopped = True @property - def position(self): + def position(self) -> float: """Return the current position of the simulated device.""" return self.readback.get() @@ -430,57 +439,81 @@ class SimPositioner(Device, PositionerBase): return "mm" -class SynFlyer(Device, PositionerBase): +class SimFlyer(Device, PositionerBase, FlyerInterface): + """A simulated device mimicing any 2D Flyer device (position, temperature, rotation). + + The corresponding simulation class is sim_cls=SimulatedPositioner, more details on defaults within the simulation class. + + >>> flyer = SimFlyer(name="flyer") + + Parameters + ---------- + name (string) : Name of the device. This is the only required argmuent, passed on to all signals of the device. + precision (integer) : Precision of the readback in digits, written to .describe(). Default is 3 digits. + parent : Parent device, optional, is used internally if this signal/device is part of a larger device. + kind : A member the Kind IntEnum (or equivalent integer), optional. Default is Kind.normal. See Kind for options. + device_manager : DeviceManager from BEC, optional . Within startup of simulation, device_manager is passed on automatically. + """ + + USER_ACCESS = ["sim", "registered_proxies"] + + sim_cls = SimulatedPositioner + + readback = Cpt( + ReadOnlySignal, name="readback", value=0, kind=Kind.hinted, compute_readback=False + ) + def __init__( self, + name: str, *, - name, - readback_func=None, - value=0, - delay=0, - speed=1, - update_frequency=2, - precision=3, + precision: int = 3, parent=None, - labels=None, kind=None, device_manager=None, + # TODO remove after refactoring config + speed: float = 100, + delay: int = 1, + update_frequency: int = 100, **kwargs, ): - if readback_func is None: - def readback_func(x): - return x - - sentinel = object() - loop = kwargs.pop("loop", sentinel) - if loop is not sentinel: - warnings.warn( - f"{self.__class__} no longer takes a loop as input. " - "Your input will be ignored and may raise in the future", - stacklevel=2, - ) - self.sim_state = {} - self._readback_func = readback_func - self.delay = delay + self.sim = self.sim_cls(parent=self, device_manager=device_manager, **kwargs) self.precision = precision - self.tolerance = kwargs.pop("tolerance", 0.5) self.device_manager = device_manager + self._registered_proxies = {} - # initialize values - self.sim_state["readback"] = readback_func(value) - self.sim_state["readback_ts"] = ttime.time() + super().__init__(name=name, parent=parent, kind=kind, **kwargs) + self.sim.sim_state[self.name] = self.sim.sim_state.pop(self.readback.name, None) + self.readback.name = self.name - super().__init__(name=name, parent=parent, labels=labels, kind=kind, **kwargs) + @property + def registered_proxies(self) -> None: + """Dictionary of registered signal_names and proxies.""" + return self._registered_proxies @property def hints(self): + """Return the hints of the simulated device.""" return {"fields": ["flyer_samx", "flyer_samy"]} + @property + def egu(self) -> str: + """Return the engineering units of the simulated device.""" + return "mm" + + def complete(self) -> StatusBase: + """Complete the motion of the simulated device.""" + status = DeviceStatus(self) + status.set_finished() + return status + def kickoff(self, metadata, num_pos, positions, exp_time: float = 0): + """Kickoff the flyer to execute code during the scan.""" positions = np.asarray(positions) def produce_data(device, metadata): + """Simulate the data being produced by the flyer.""" buffer_time = 0.2 elapsed_time = 0 bundle = messages.BundleMessage() @@ -529,10 +562,6 @@ class SynFlyer(Device, PositionerBase): flyer.start() -class SynDynamicComponents(Device): - messages = Dcpt({f"message{i}": (SynSignal, None, {"name": f"msg{i}"}) for i in range(1, 6)}) - - class SynDeviceSubOPAAS(Device): zsub = Cpt(SimPositioner, name="zsub") @@ -541,3 +570,12 @@ class SynDeviceOPAAS(Device): x = Cpt(SimPositioner, name="x") y = Cpt(SimPositioner, name="y") z = Cpt(SynDeviceSubOPAAS, name="z") + + +class SynDynamicComponents(Device): + messages = Dcpt({f"message{i}": (SynSignal, None, {"name": f"msg{i}"}) for i in range(1, 6)}) + + +if __name__ == "__main__": + cam = SimCamera(name="cam") + cam.image.read() diff --git a/ophyd_devices/sim/sim_data.py b/ophyd_devices/sim/sim_data.py index 3179503..4443515 100644 --- a/ophyd_devices/sim/sim_data.py +++ b/ophyd_devices/sim/sim_data.py @@ -1,10 +1,15 @@ from __future__ import annotations from collections import defaultdict +from abc import ABC, abstractmethod + +from prettytable import PrettyTable import enum +import inspect import time as ttime import numpy as np +from lmfit import models, Model from bec_lib import bec_logger @@ -15,11 +20,11 @@ class SimulatedDataException(Exception): """Exception raised when there is an issue with the simulated data.""" -class SimulationType(str, enum.Enum): +class SimulationType2D(str, enum.Enum): """Type of simulation to steer simulated data.""" CONSTANT = "constant" - GAUSSIAN = "gauss" + GAUSSIAN = "gaussian" class NoiseType(str, enum.Enum): @@ -37,178 +42,374 @@ class HotPixelType(str, enum.Enum): FLUCTUATING = "fluctuating" -class SimulatedDataBase: +DEFAULT_PARAMS_LMFIT = { + "c0": 1, + "c1": 1, + "c2": 1, + "c3": 1, + "c4": 1, + "c": 100, + "amplitude": 100, + "center": 0, + "sigma": 1, +} + +DEFAULT_PARAMS_NOISE = { + "noise": NoiseType.UNIFORM, + "noise_multiplier": 10, +} + +DEFAULT_PARAMS_MOTOR = { + "ref_motor": "samx", +} + +DEFAULT_PARAMS_CAMERA_GAUSSIAN = { + "amplitude": 100, + "center_offset": np.array([0, 0]), + "covariance": np.array([[400, 100], [100, 400]]), +} + +DEFAULT_PARAMS_CAMERA_CONSTANT = { + "amplitude": 100, +} + +DEFAULT_PARAMS_HOT_PIXEL = { + "hot_pixel_coords": np.array([[24, 24], [50, 20], [4, 40]]), + "hot_pixel_types": [ + HotPixelType.FLUCTUATING, + HotPixelType.CONSTANT, + HotPixelType.FLUCTUATING, + ], + "hot_pixel_values": np.array([1e4, 1e6, 1e4]), +} + + +class SimulatedDataBase(ABC): + """Abstract base class for simulated data. + + This class should be subclassed to implement the simulated data for a specific device. + It provides the basic functionality to set and get data from the simulated data class + + --------------------- + The class provides the following methods: + + - execute_simulation_method: execute a method from the simulated data class or reroute execution to device proxy class + - sim_select_model: select the active simulation model + - sim_params: get the parameters for the active simulation mdoel + - sim_models: get the available simulation models + - update_sim_state: update the simulated state of the device + """ + USER_ACCESS = [ - "get_sim_params", - "set_sim_params", - "get_sim_type", - "set_sim_type", + "sim_params", + "sim_select_model", + "sim_get_models", + "sim_show_all", ] def __init__(self, *args, parent=None, device_manager=None, **kwargs) -> None: + """ + Note: + self._model_params duplicates parameters from _params that are solely relevant for the model used. + This facilitates easier and faster access for computing the simulated state using the lmfit package. + """ self.parent = parent - self.sim_state = defaultdict(dict) - self._all_params = defaultdict(dict) self.device_manager = device_manager - self._simulation_type = None - self.lookup_table = getattr(self.parent, "lookup_table", []) - self.init_paramaters(**kwargs) - self._active_params = self._all_params.get(self._simulation_type, None) + self.sim_state = defaultdict(dict) + self.registered_proxies = getattr(self.parent, "registered_proxies", {}) + self._model = {} + self._model_params = None + self._params = {} - def execute_simulation_method(self, *args, method=None, **kwargs) -> any: - """Execute the method from the lookup table.""" - if self.lookup_table and self.device_manager.devices.get(self.lookup_table[0]) is not None: - sim_device = self.device_manager.devices.get(self.lookup_table[0]) - # pylint: disable=protected-access - if sim_device.enabled is True: - method = sim_device.obj.lookup[self.parent.name]["method"] - args = sim_device.obj.lookup[self.parent.name]["args"] - kwargs = sim_device.obj.lookup[self.parent.name]["kwargs"] + def execute_simulation_method(self, *args, method=None, signal_name: str = "", **kwargs) -> any: + """ + Execute either the provided method or reroutes the method execution + to a device proxy in case it is registered in self.parentregistered_proxies. + """ + if self.registered_proxies and self.device_manager: + for proxy_name, signal in self.registered_proxies.items(): + if signal == signal_name or f"{self.parent.name}_{signal}" == signal_name: + sim_proxy = self.device_manager.devices.get(proxy_name, None) + if sim_proxy and sim_proxy.enabled is True: + method = sim_proxy.obj.lookup[self.parent.name]["method"] + args = sim_proxy.obj.lookup[self.parent.name]["args"] + kwargs = sim_proxy.obj.lookup[self.parent.name]["kwargs"] + break if method is not None: return method(*args, **kwargs) raise SimulatedDataException(f"Method {method} is not available for {self.parent.name}") - def init_paramaters(self, **kwargs): - """Initialize the parameters for the Simulated Data - - This methods should be implemented by the subclass. - - It sets the default parameters for the simulated data in - self._params and calls self._update_init_params() + def sim_select_model(self, model: str) -> None: """ - - def get_sim_params(self) -> dict: - """Return the currently parameters for the active simulation type in sim_type. - - These parameters can be changed with set_sim_params. - - Returns: - dict: Parameters of the currently active simulation in sim_type. - """ - return self._active_params - - def set_sim_params(self, params: dict) -> None: - """Change the current set of parameters for the active simulation type. + Method to select the active simulation model. + It will initiate the model_cls and parameters for the model. Args: - params (dict): New parameters for the active simulation type. + model (str): Name of the simulation model to select. - Raises: - SimulatedDataException: If the new parameters can not be set or is not part of the parameters initiated. """ - for k, v in params.items(): - try: - if k == "noise": - self._active_params[k] = NoiseType(v) - else: - self._active_params[k] = v - except Exception as exc: - raise SimulatedDataException( - f"Could not set {k} to {v} in {self._active_params} with exception {exc}" - ) from exc + model_cls = self.get_model_cls(model) + self._model = model_cls() if callable(model_cls) else model_cls + self._params = self.get_params_for_model_cls() + self._params.update(self._get_additional_params()) + print(self._get_table_active_simulation()) - def get_sim_type(self) -> SimulationType: - """Return the simulation type of the simulation. + @property + def sim_params(self) -> dict: + """ + Property that returns the parameters for the active simulation model. It can also + be used to set the parameters for the active simulation updating the parameters of the model. Returns: - SimulationType: Type of simulation (e.g. "constant" or "gauss). + dict: Parameters for the active simulation model. + + The following example shows how to update the noise parameter of the current simulation. + >>> dev..sim.sim_params = {"noise": "poisson"} """ - return self._simulation_type + return self._params - def set_sim_type(self, simulation_type: SimulationType) -> None: - """Set the simulation type of the simulation.""" - try: - self._simulation_type = SimulationType(simulation_type) - except ValueError as exc: - raise SimulatedDataException( - f"Could not set simulation type to {simulation_type}. Valid options are 'constant'" - " and 'gauss'" - ) from exc - self._active_params = self._all_params.get(self._simulation_type, None) - - def _compute_sim_state(self, signal_name: str) -> None: - """Update the simulated state of the device. - - If no computation is relevant, ignore this method. - Otherwise implement it in the subclass. + @sim_params.setter + def sim_params(self, params: dict): """ + Method to set the parameters for the active simulation model. + """ + for k, v in params.items(): + if k in self.sim_params: + if k == "noise": + self._params[k] = NoiseType(v) + elif k == "hot_pixel_types": + self._params[k] = [HotPixelType(entry) for entry in v] + else: + self._params[k] = v + if isinstance(self._model, Model) and k in self._model_params: + self._model_params[k].value = v + else: + raise SimulatedDataException(f"Parameter {k} not found in {self.sim_params}.") + + def sim_get_models(self) -> list: + """ + Method to get the all available simulation models. + """ + return self.get_all_sim_models() def update_sim_state(self, signal_name: str, value: any) -> None: """Update the simulated state of the device. Args: signal_name (str): Name of the signal to update. + value (any): Value to update in the simulated state. """ self.sim_state[signal_name]["value"] = value self.sim_state[signal_name]["timestamp"] = ttime.time() - def _update_init_params( - self, - sim_type_default: SimulationType, - ) -> None: - """Update the initial parameters of the simulated data with input from deviceConfig. + @abstractmethod + def _get_additional_params(self) -> dict: + """Initialize the default parameters for the noise.""" - Args: - sim_type_default (SimulationType): Default simulation type to use if not specified in deviceConfig. + @abstractmethod + def get_model_cls(self, model: str) -> any: """ - init_params = getattr(self.parent, "init_sim_params", None) - for sim_type in self._all_params.values(): - for sim_type_config_element in sim_type: - if init_params: - if sim_type_config_element in init_params: - sim_type[sim_type_config_element] = init_params[sim_type_config_element] - # Set simulation type to default if not specified in deviceConfig - sim_type_select = ( - init_params.get("sim_type", sim_type_default) if init_params else sim_type_default + Method to get the class for the active simulation model_cls + """ + + @abstractmethod + def get_params_for_model_cls(self) -> dict: + """ + Method to get the parameters for the active simulation model. + """ + + @abstractmethod + def get_all_sim_models(self) -> list[str]: + """ + Method to get all names from the available simulation models. + + Returns: + list: List of available simulation models. + """ + + @abstractmethod + def compute_sim_state(self, signal_name: str, compute_readback: bool) -> None: + """ + Method to compute the simulated state of the device. + """ + + def _get_table_active_simulation(self, width: int = 140) -> PrettyTable: + """Return a table with the active simulation model and parameters.""" + table = PrettyTable() + table.title = f"Currently active model: {self._model}" + table.field_names = ["Parameter", "Value", "Type"] + for k, v in self.sim_params.items(): + table.add_row([k, f"{v}", f"{type(v)}"]) + table._min_width["Parameter"] = 25 if width > 75 else width // 3 + table._min_width["Type"] = 25 if width > 75 else width // 3 + table.max_table_width = width + table._min_table_width = width + + return table + + def _get_table_method_information(self, width: int = 140) -> PrettyTable: + """Return a table with the information about methods.""" + table = PrettyTable() + table.max_width["Value"] = 120 + table.hrules = 1 + table.title = "Available methods within the simulation module" + table.field_names = ["Method", "Docstring"] + + table.add_row( + [ + self.sim_get_models.__name__, + f"{self.sim_get_models.__doc__}", + ] ) - self.set_sim_type(sim_type_select) + table.add_row([self.sim_select_model.__name__, self.sim_select_model.__doc__]) + table.add_row(["sim_params", self.__class__.sim_params.__doc__]) + table.max_table_width = width + table._min_table_width = width + table.align["Docstring"] = "l" + + return table + + def sim_show_all(self): + """Returns a summary about the active simulation and available methods.""" + width = 150 + print(self._get_table_active_simulation(width=width)) + print(self._get_table_method_information(width=width)) + table = PrettyTable() + table.title = "Simulation module for current device" + table.field_names = ["All available models"] + table.add_row([", ".join(self.get_all_sim_models())]) + table.max_table_width = width + table._min_table_width = width + print(table) + + +class SimulatedPositioner(SimulatedDataBase): + """Simulated data class for a positioner.""" + + def _init_default_additional_params(self) -> None: + """No need to init additional parameters for Positioner.""" + + def get_model_cls(self, model: str) -> any: + """For the simulated positioners, no simulation models are currently implemented.""" + return None + + def get_params_for_model_cls(self) -> dict: + """For the simulated positioners, no simulation models are currently implemented.""" + return {} + + def get_all_sim_models(self) -> list[str]: + """ + For the simulated positioners, no simulation models are currently implemented. + + Returns: + list: List of available simulation models. + """ + return [] + + def _get_additional_params(self) -> dict: + """No need to add additional parameters for Positioner.""" + return {} + + def compute_sim_state(self, signal_name: str, compute_readback: bool) -> None: + """ + For the simulated positioners, a computed signal is currently not used. + The position is updated by the parent device, and readback/setpoint values + have a jitter/tolerance introduced directly in the parent class (SimPositioner). + """ + if compute_readback: + method = None + value = self.execute_simulation_method(method=method, signal_name=signal_name) + self.update_sim_state(signal_name, value) class SimulatedDataMonitor(SimulatedDataBase): - """Simulated data for a monitor.""" + """Simulated data class for a monitor.""" - def init_paramaters(self, **kwargs): - """Initialize the parameters for the simulated data + def __init__(self, *args, parent=None, device_manager=None, **kwargs) -> None: + self._model_lookup = self.init_lmfit_models() + super().__init__(*args, parent=parent, device_manager=device_manager, **kwargs) + self._init_default() - This method will fill self._all_params with the default parameters for - SimulationType.CONSTANT and SimulationType.GAUSSIAN. - New simulation types can be added by adding a new key to self._all_params, - together with the required parameters for that simulation type. Please - also complement the docstring of this method with the new simulation type. + def _get_additional_params(self) -> None: + params = DEFAULT_PARAMS_NOISE.copy() + params.update(DEFAULT_PARAMS_MOTOR.copy()) + return params - For SimulationType.CONSTANT: - Amp is the amplitude of the constant value. - Noise is the type of noise to add to the signal. Available options are 'poisson', 'uniform' or 'none'. - Noise multiplier is the multiplier of the noise, only relevant for uniform noise. + def _init_default(self) -> None: + """Initialize the default parameters for the simulated data.""" + self.sim_select_model("ConstantModel") - For SimulationType.GAUSSIAN: - ref_motor is the motor that is used as reference to compute the gaussian. - amp is the amplitude of the gaussian. - cen is the center of the gaussian. - sig is the sigma of the gaussian. - noise is the type of noise to add to the signal. Available options are 'poisson', 'uniform' or 'none'. - noise multiplier is the multiplier of the noise, only relevant for uniform noise. + def get_model_cls(self, model: str) -> any: + """Get the class for the active simulation model.""" + if model not in self._model_lookup: + raise SimulatedDataException(f"Model {model} not found in {self._model_lookup.keys()}.") + return self._model_lookup[model] + + def get_all_sim_models(self) -> list[str]: """ - self._all_params = { - SimulationType.CONSTANT: { - "amp": 100, - "noise": NoiseType.POISSON, - "noise_multiplier": 0.1, - }, - SimulationType.GAUSSIAN: { - "ref_motor": "samx", - "amp": 100, - "cen": 0, - "sig": 1, - "noise": NoiseType.NONE, - "noise_multiplier": 0.1, - }, - } - # Update init parameters and set simulation type to Constant if not specified otherwise in init_sim_params - self._update_init_params(sim_type_default=SimulationType.CONSTANT) + Method to get all names from the available simulation models from the lmfit.models pool. - def _compute_sim_state(self, signal_name: str) -> None: + Returns: + list: List of available simulation models. + """ + return list(self._model_lookup.keys()) + + def get_params_for_model_cls(self) -> dict: + """Get the parameters for the active simulation model. + + Check if default parameters are available for lmfit parameters. + + Args: + sim_model (str): Name of the simulation model. + Returns: + dict: {name: value} for the active simulation model. + """ + rtr = {} + params = self._model.make_params() + for name, parameter in params.items(): + if name in DEFAULT_PARAMS_LMFIT: + rtr[name] = DEFAULT_PARAMS_LMFIT[name] + parameter.value = rtr[name] + else: + if not any([np.isnan(parameter.value), np.isinf(parameter.value)]): + rtr[name] = parameter.value + else: + rtr[name] = 1 + parameter.value = 1 + self._model_params = params + return rtr + + def model_lookup(self): + """Get available models from lmfit.models.""" + return self._model_lookup + + def init_lmfit_models(self) -> dict: + """ + Get available models from lmfit.models. + + Exclude Gaussian2dModel, ExpressionModel, Model, SplineModel. + + Returns: + dictionary of model name : model class pairs for available models from LMFit. + """ + model_lookup = {} + for name, model_cls in inspect.getmembers(models): + try: + is_model = issubclass(model_cls, Model) + except TypeError: + is_model = False + if is_model and name not in [ + "Gaussian2dModel", + "ExpressionModel", + "Model", + "SplineModel", + ]: + model_lookup[name] = model_cls + + return model_lookup + + def compute_sim_state(self, signal_name: str, compute_readback: bool) -> None: """Update the simulated state of the device. It will update the value in self.sim_state with the value computed by @@ -217,153 +418,193 @@ class SimulatedDataMonitor(SimulatedDataBase): Args: signal_name (str): Name of the signal to update. """ - if self.get_sim_type() == SimulationType.CONSTANT: - method = "_compute_constant" - # value = self._compute_constant() - elif self.get_sim_type() == SimulationType.GAUSSIAN: - method = "_compute_gaussian" - # value = self._compute_gaussian() + if compute_readback: + method = self._compute + value = self.execute_simulation_method(method=method, signal_name=signal_name) + self.update_sim_state(signal_name, value) - value = self.execute_simulation_method(method=getattr(self, method)) + def _compute(self, *args, **kwargs) -> float: + mot_name = self.sim_params["ref_motor"] + if self.device_manager and mot_name in self.device_manager.devices: + motor_pos = self.device_manager.devices[mot_name].obj.read()[mot_name]["value"] + else: + motor_pos = 0 + method = self._model + value = float(method.eval(params=self._model_params, x=motor_pos)) + return self._add_noise(value, self.sim_params["noise"], self.sim_params["noise_multiplier"]) + + def _add_noise(self, v: float, noise: NoiseType, noise_multiplier: float) -> float: + """ + Add the currently activated noise to the simulated data. + If NoiseType.NONE is active, the value will be returned + + Args: + v (float): Value to add noise to. + Returns: + float: Value with added noise. + """ + if noise == NoiseType.POISSON: + ceiled_v = np.ceil(v) + v = np.random.poisson(ceiled_v, 1)[0] if ceiled_v > 0 else ceiled_v + return v + elif noise == NoiseType.UNIFORM: + v += np.random.uniform(-1, 1) * noise_multiplier + return v + return v + + +class SimulatedDataCamera(SimulatedDataBase): + """Simulated class to compute data for a 2D camera.""" + + def __init__(self, *args, parent=None, device_manager=None, **kwargs) -> None: + self._model_lookup = self.init_2D_models() + self._all_default_model_params = defaultdict(dict) + self._init_default_camera_params() + super().__init__(*args, parent=parent, device_manager=device_manager, **kwargs) + self._init_default() + + def _init_default(self) -> None: + """Initialize the default model for a simulated camera + + Use the default model "Gaussian". + """ + self.sim_select_model(SimulationType2D.GAUSSIAN) + + def init_2D_models(self) -> dict: + """ + Get the available models for 2D camera simulations. + """ + model_lookup = {} + for _, model_cls in inspect.getmembers(SimulationType2D): + if isinstance(model_cls, SimulationType2D): + model_lookup[model_cls.value] = model_cls + return model_lookup + + def _get_additional_params(self) -> None: + params = DEFAULT_PARAMS_NOISE.copy() + params.update(DEFAULT_PARAMS_HOT_PIXEL.copy()) + return params + + def _init_default_camera_params(self) -> None: + """Initiate additional params for the simulated camera.""" + self._all_default_model_params.update( + { + self._model_lookup[ + SimulationType2D.CONSTANT.value + ]: DEFAULT_PARAMS_CAMERA_CONSTANT.copy() + } + ) + self._all_default_model_params.update( + { + self._model_lookup[ + SimulationType2D.GAUSSIAN.value + ]: DEFAULT_PARAMS_CAMERA_GAUSSIAN.copy() + } + ) + + def get_model_cls(self, model: str) -> any: + """For the simulated positioners, no simulation models are currently implemented.""" + if model not in self._model_lookup: + raise SimulatedDataException(f"Model {model} not found in {self._model_lookup.keys()}.") + return self._model_lookup[model] + + def get_params_for_model_cls(self) -> dict: + """For the simulated positioners, no simulation models are currently implemented.""" + return self._all_default_model_params[self._model.value] + + def get_all_sim_models(self) -> list[str]: + """ + For the simulated positioners, no simulation models are currently implemented. + + Returns: + list: List of available simulation models. + """ + return [entry.value for entry in self._model_lookup.values()] + + def compute_sim_state(self, signal_name: str, compute_readback: bool) -> None: + """Update the simulated state of the device. + + It will update the value in self.sim_state with the value computed by + the chosen simulation type. + + Args: + signal_name (str) : Name of the signal to update. + compute_readback (bool) : Flag whether to compute readback based on function hosted in SimulatedData + """ + if compute_readback: + if self._model == SimulationType2D.CONSTANT: + method = "_compute_constant" + elif self._model == SimulationType2D.GAUSSIAN: + method = "_compute_gaussian" + value = self.execute_simulation_method( + signal_name=signal_name, method=getattr(self, method) + ) + else: + value = self._compute_empty_image() self.update_sim_state(signal_name, value) - def _compute_constant(self) -> float: - """Computes constant value and adds noise if activated.""" - v = self._active_params["amp"] - if self._active_params["noise"] == NoiseType.POISSON: - v = np.random.poisson(np.round(v), 1)[0] - return v - elif self._active_params["noise"] == NoiseType.UNIFORM: - v += np.random.uniform(-1, 1) * self._active_params["noise_multiplier"] - return v - elif self._active_params["noise"] == NoiseType.NONE: - v = self._active_params["amp"] - return v - else: + def _compute_empty_image(self) -> np.ndarray: + """Computes return value for sim_type = "empty_image". + + Returns: + float: 0 + """ + try: + shape = self.parent.image_shape.get() + return np.zeros(shape) + except SimulatedDataException as exc: raise SimulatedDataException( - f"Unknown noise type {self._active_params['noise']}. Please choose from 'poisson'," - " 'uniform' or 'none'." - ) + f"Could not compute empty image for {self.parent.name} with {exc} raised. Deactivate eiger to continue." + ) from exc + + def _compute_constant(self) -> np.ndarray: + """Compute a return value for SimulationType2D constant.""" + try: + shape = self.parent.image_shape.get() + v = self._model_params.get("amplitude") * np.ones(shape, dtype=np.uint16) + return self._add_noise(v, self.sim_params["noise"], self.sim_params["noise_multiplier"]) + except SimulatedDataException as exc: + raise SimulatedDataException( + f"Could not compute constant for {self.parent.name} with {exc} raised. Deactivate eiger to continue." + ) from exc def _compute_gaussian(self) -> float: """Computes return value for sim_type = "gauss". The value is based on the parameters for the gaussian in - self._active_params and the position of the ref_motor - and adds noise based on the noise type. + self._active_params and adds noise based on the noise type. If computation fails, it returns 0. Returns: float """ - params = self._active_params try: - motor_pos = self.device_manager.devices[params["ref_motor"]].obj.read()[ - params["ref_motor"] - ]["value"] - v = params["amp"] * np.exp( - -((motor_pos - params["cen"]) ** 2) / (2 * params["sig"] ** 2) + amp = self.sim_params.get("amplitude") + cov = self.sim_params.get("covariance") + cen_off = self.sim_params.get("center_offset") + shape = self.sim_state[self.parent.image_shape.name]["value"] + pos, offset, cov, amp = self._prepare_params_gauss( + amp=amp, cov=cov, offset=cen_off, shape=shape + ) + + v = self._compute_multivariate_gaussian(pos=pos, cen_off=offset, cov=cov, amp=amp) + v = self._add_noise( + v, + noise=self.sim_params["noise"], + noise_multiplier=self.sim_params["noise_multiplier"], + ) + return self._add_hot_pixel( + v, + coords=self.sim_params["hot_pixel_coords"], + hot_pixel_types=self.sim_params["hot_pixel_types"], + values=self.sim_params["hot_pixel_values"], ) - if params["noise"] == NoiseType.POISSON: - v = np.random.poisson(np.round(v), 1)[0] - elif params["noise"] == NoiseType.UNIFORM: - v += np.random.uniform(-1, 1) * params["noise_multiplier"] - return v except SimulatedDataException as exc: raise SimulatedDataException( f"Could not compute gaussian for {self.parent.name} with {exc} raised. Deactivate eiger to continue." ) from exc - -class SimulatedDataCamera(SimulatedDataBase): - """Simulated class to compute data for a 2D camera.""" - - def init_paramaters(self, **kwargs): - """Initialize the parameters for the simulated data - - This method will fill self._all_params with the default parameters for - SimulationType.CONSTANT and SimulationType.GAUSSIAN. - New simulation types can be added by adding a new key to self._all_params, - together with the required parameters for that simulation type. Please - also complement the docstring of this method with the new simulation type. - - For SimulationType.CONSTANT: - Amp is the amplitude of the constant value. - Noise is the type of noise to add to the signal. Available options are 'poisson', 'uniform' or 'none'. - Noise multiplier is the multiplier of the noise, only relevant for uniform noise. - - For SimulationType.GAUSSIAN: - amp is the amplitude of the gaussian. - cen_off is the pixel offset from the center of the gaussian from the center of the image. - It is passed as a numpy array. - cov is the 2D covariance matrix used to specify the shape of the gaussian. - It is a 2x2 matrix and will be passed as a numpy array. - noise is the type of noise to add to the signal. Available options are 'poisson', 'uniform' or 'none'. - noise multiplier is the multiplier of the noise, only relevant for uniform noise. - """ - self._all_params = { - SimulationType.CONSTANT: { - "amp": 100, - "noise": NoiseType.POISSON, - "noise_multiplier": 0.1, - "hot_pixel": { - "coords": np.array([[100, 100], [200, 200]]), - "type": [HotPixelType.CONSTANT, HotPixelType.FLUCTUATING], - "value": [1e6, 1e4], - }, - }, - SimulationType.GAUSSIAN: { - "amp": 100, - "cen_off": np.array([0, 0]), - "cov": np.array([[10, 5], [5, 10]]), - "noise": NoiseType.NONE, - "noise_multiplier": 0.1, - "hot_pixel": { - "coords": np.array([[240, 240], [50, 20], [40, 400]]), - "type": [ - HotPixelType.FLUCTUATING, - HotPixelType.CONSTANT, - HotPixelType.FLUCTUATING, - ], - "value": np.array([1e4, 1e6, 1e4]), - }, - }, - } - # Update init parameters and set simulation type to Gaussian if not specified otherwise in init_sim_params - self._update_init_params(sim_type_default=SimulationType.GAUSSIAN) - - def _compute_sim_state(self, signal_name: str) -> None: - """Update the simulated state of the device. - - It will update the value in self.sim_state with the value computed by - the chosen simulation type. - - Args: - signal_name (str): Name of the signal to update. - """ - if self.get_sim_type() == SimulationType.CONSTANT: - method = "_compute_constant" - # value = self._compute_constant() - elif self.get_sim_type() == SimulationType.GAUSSIAN: - method = "_compute_gaussian" - # value = self._compute_gaussian() - - value = self.execute_simulation_method(method=getattr(self, method)) - - self.update_sim_state(signal_name, value) - - def _compute_constant(self) -> float: - """Compute a return value for sim_type = Constant.""" - try: - shape = self.sim_state[self.parent.image_shape.name]["value"] - v = self._active_params["amp"] * np.ones(shape, dtype=np.uint16) - return self._add_noise(v, self._active_params["noise"]) - except SimulatedDataException as exc: - raise SimulatedDataException( - f"Could not compute constant for {self.parent.name} with {exc} raised. Deactivate eiger to continue." - ) from exc - def _compute_multivariate_gaussian( self, pos: np.ndarray | list, @@ -375,7 +616,7 @@ class SimulatedDataCamera(SimulatedDataBase): Args: pos (np.ndarray): Position of the gaussian. - cen_off (np.ndarray): Offset from cener of image for the gaussian. + cen_off (np.ndarray): Offset from center of image for the gaussian. cov (np.ndarray): Covariance matrix of the gaussian. Returns: @@ -398,11 +639,15 @@ class SimulatedDataCamera(SimulatedDataBase): v *= amp / np.max(v) return v - def _prepare_params_gauss(self, params: dict, shape: tuple) -> tuple: + def _prepare_params_gauss( + self, amp: float, cov: np.ndarray, offset: np.ndarray, shape: tuple + ) -> tuple: """Prepare the positions for the gaussian. Args: - params (dict): Parameters for the gaussian. + amp (float): Amplitude of the gaussian. + cov (np.ndarray): Covariance matrix of the gaussian. + offset (np.ndarray): Offset from the center of the image. shape (tuple): Shape of the image. Returns: tuple: Positions, offset and covariance matrix for the gaussian. @@ -415,12 +660,9 @@ class SimulatedDataCamera(SimulatedDataBase): pos[:, :, 0] = x pos[:, :, 1] = y - offset = params["cen_off"] - cov = params["cov"] - amp = params["amp"] return pos, offset, cov, amp - def _add_noise(self, v: np.ndarray, noise: NoiseType) -> np.ndarray: + def _add_noise(self, v: np.ndarray, noise: NoiseType, noise_multiplier: float) -> np.ndarray: """Add noise to the simulated data. Args: @@ -431,51 +673,26 @@ class SimulatedDataCamera(SimulatedDataBase): v = np.random.poisson(np.round(v), v.shape) return v if noise == NoiseType.UNIFORM: - multiplier = self._active_params["noise_multiplier"] - v += np.random.uniform(-multiplier, multiplier, v.shape) + v += np.random.uniform(-noise_multiplier, noise_multiplier, v.shape) return v - if self._active_params["noise"] == NoiseType.NONE: + if noise == NoiseType.NONE: return v - def _add_hot_pixel(self, v: np.ndarray, hot_pixel: dict) -> np.ndarray: + def _add_hot_pixel( + self, v: np.ndarray, coords: list, hot_pixel_types: list, values: list + ) -> np.ndarray: """Add hot pixels to the simulated data. Args: v (np.ndarray): Simulated data. hot_pixel (dict): Hot pixel parameters. """ - for coords, hot_pixel_type, value in zip( - hot_pixel["coords"], hot_pixel["type"], hot_pixel["value"] - ): - if coords[0] < v.shape[0] and coords[1] < v.shape[1]: + for coord, hot_pixel_type, value in zip(coords, hot_pixel_types, values): + if coord[0] < v.shape[0] and coord[1] < v.shape[1]: if hot_pixel_type == HotPixelType.CONSTANT: - v[coords[0], coords[1]] = value + v[coord[0], coord[1]] = value elif hot_pixel_type == HotPixelType.FLUCTUATING: maximum = np.max(v) if np.max(v) != 0 else 1 - if v[coords[0], coords[1]] / maximum > 0.5: - v[coords[0], coords[1]] = value + if v[coord[0], coord[1]] / maximum > 0.5: + v[coord[0], coord[1]] = value return v - - def _compute_gaussian(self) -> float: - """Computes return value for sim_type = "gauss". - - The value is based on the parameters for the gaussian in - self._active_params and adds noise based on the noise type. - - If computation fails, it returns 0. - - Returns: float - """ - - try: - params = self._active_params - shape = self.sim_state[self.parent.image_shape.name]["value"] - pos, offset, cov, amp = self._prepare_params_gauss(self._active_params, shape) - - v = self._compute_multivariate_gaussian(pos=pos, cen_off=offset, cov=cov, amp=amp) - v = self._add_noise(v, params["noise"]) - return self._add_hot_pixel(v, params["hot_pixel"]) - except SimulatedDataException as exc: - raise SimulatedDataException( - f"Could not compute gaussian for {self.parent.name} with {exc} raised. Deactivate eiger to continue." - ) from exc diff --git a/ophyd_devices/sim/sim_frameworks.py b/ophyd_devices/sim/sim_frameworks.py index 4c6f784..fe13cfb 100644 --- a/ophyd_devices/sim/sim_frameworks.py +++ b/ophyd_devices/sim/sim_frameworks.py @@ -10,7 +10,7 @@ class DeviceProxy(BECDeviceBase): """DeviceProxy class inherits from BECDeviceBase.""" -class SlitLookup(DeviceProxy): +class SlitProxy(DeviceProxy): """ Simulation framework to immidate the behaviour of slits. @@ -27,11 +27,12 @@ class SlitLookup(DeviceProxy): slit_sim: readoutPriority: on_request - deviceClass: SlitLookup + deviceClass: SlitProxy deviceConfig: eiger: - cen_off: [0, 0] # [x,y] - cov: [[1000, 500], [200, 1000]] # [[x,x],[y,y]] + signal_name: image + center_offset: [0, 0] # [x,y] + covariance: [[1000, 500], [200, 1000]] # [[x,x],[y,y]] pixel_size: 0.01 ref_motors: [samx, samy] slit_width: [1, 1] @@ -61,7 +62,9 @@ class SlitLookup(DeviceProxy): print(self.__doc__) def _update_device_config(self, config: dict) -> None: - """Update the config from the device_config for the pinhole lookup table. + """ + BEC will call this method on every object upon initializing devices to pass over the deviceConfig + from the config file. It can be conveniently be used to hand over initial parameters to the device. Args: config (dict): Config dictionary. @@ -83,8 +86,8 @@ class SlitLookup(DeviceProxy): """Compile the lookup table for the simulated camera.""" for device_name in self.config.keys(): self._lookup[device_name] = { - # "obj": self, "method": self._compute, + "signal_name": self.config[device_name]["signal_name"], "args": (device_name,), "kwargs": {}, } @@ -96,22 +99,28 @@ class SlitLookup(DeviceProxy): Args: device_name (str): Name of the device. + signal_name (str): Name of the signal. Returns: np.ndarray: Lookup table for the simulated camera. """ device_obj = self.device_manager.devices.get(device_name).obj - params = device_obj.sim._all_params.get("gauss") + params = device_obj.sim.sim_params shape = device_obj.image_shape.get() params.update( { "noise": NoiseType.POISSON, - "cov": np.array(self.config[device_name]["cov"]), - "cen_off": np.array(self.config[device_name]["cen_off"]), + "covariance": np.array(self.config[device_name]["covariance"]), + "center_offset": np.array(self.config[device_name]["center_offset"]), } ) + amp = params.get("amplitude") + cov = params.get("covariance") + cen_off = params.get("center_offset") - pos, offset, cov, amp = device_obj.sim._prepare_params_gauss(params, shape) + pos, offset, cov, amp = device_obj.sim._prepare_params_gauss( + amp=amp, cov=cov, offset=cen_off, shape=shape + ) v = device_obj.sim._compute_multivariate_gaussian(pos=pos, cen_off=offset, cov=cov, amp=amp) device_pos = self.config[device_name]["pixel_size"] * pos valid_mask = self._create_mask( @@ -122,8 +131,15 @@ class SlitLookup(DeviceProxy): ) valid_mask = self._blur_image(valid_mask, sigma=self._gaussian_blur_sigma) v *= valid_mask - v = device_obj.sim._add_noise(v, params["noise"]) - v = device_obj.sim._add_hot_pixel(v, params["hot_pixel"]) + v = device_obj.sim._add_noise( + v, noise=params["noise"], noise_multiplier=params["noise_multiplier"] + ) + v = device_obj.sim._add_hot_pixel( + v, + coords=params["hot_pixel_coords"], + hot_pixel_types=params["hot_pixel_types"], + values=params["hot_pixel_values"], + ) return v def _blur_image(self, image: np.ndarray, sigma: float = 1) -> np.ndarray: @@ -159,6 +175,6 @@ class SlitLookup(DeviceProxy): if __name__ == "__main__": # Example usage - pinhole = SlitLookup(name="pinhole", device_manager=None) + pinhole = SlitProxy(name="pinhole", device_manager=None) pinhole.describe() print(pinhole) diff --git a/ophyd_devices/sim/sim_signals.py b/ophyd_devices/sim/sim_signals.py index f0ae27c..51c6004 100644 --- a/ophyd_devices/sim/sim_signals.py +++ b/ophyd_devices/sim/sim_signals.py @@ -1,28 +1,40 @@ -import time as ttime +import time +import numpy as np from bec_lib import bec_logger -import numpy as np from ophyd import Signal, Kind from ophyd.utils import ReadOnlyError logger = bec_logger.logger -# Readout precision for Setable/Readonly/ComputedReadonly signals +# Readout precision for Setable/ReadOnlySignal signals PRECISION = 3 class SetableSignal(Signal): """Setable signal for simulated devices. - It will return the value of the readback signal based on the position - created in the sim_state dictionary of the parent device. + The signal will store the value in sim_state of the SimulatedData class of the parent device. + It will also return the value from sim_state when get is called. Compared to the ReadOnlySignal, + this signal can be written to. + + >>> signal = SetableSignal(name="signal", parent=parent, value=0) + + Parameters + ---------- + + name (string) : Name of the signal + parent (object) : Parent object of the signal, default none. + value (any) : Initial value of the signal, default 0. + kind (int) : Kind of the signal, default Kind.normal. + precision (float) : Precision of the signal, default PRECISION. """ def __init__( self, - *args, name: str, - value: any = None, + *args, + value: any = 0, kind: int = Kind.normal, precision: float = PRECISION, **kwargs, @@ -34,7 +46,6 @@ class SetableSignal(Signal): ) self._value = value self.precision = precision - # Init the sim_state, if self.parent.sim available, use it, else use self.parent self.sim = getattr(self.parent, "sim", self.parent) self._update_sim_state(value) @@ -50,6 +61,7 @@ class SetableSignal(Signal): """Update the timestamp of the readback value.""" return self.sim.sim_state[self.name]["timestamp"] + # pylint: disable=arguments-differ def get(self): """Get the current position of the simulated device. @@ -58,6 +70,7 @@ class SetableSignal(Signal): self._value = self._get_value() return self._value + # pylint: disable=arguments-differ def put(self, value): """Put the value to the simulated device. @@ -83,111 +96,55 @@ class SetableSignal(Signal): class ReadOnlySignal(Signal): - """Readonly signal for simulated devices. + """Computed readback signal for simulated devices. - If initiated without a value, it will set the initial value to 0. + The readback will be computed from a function hosted in the SimulatedData class from the parent device + if compute_readback is True. Else, it will return the value stored int sim.sim_state directly. + + >>> signal = ComputedReadOnlySignal(name="signal", parent=parent, value=0, compute_readback=True) + + Parameters + ---------- + + name (string) : Name of the signal + parent (object) : Parent object of the signal, default none. + value (any) : Initial value of the signal, default 0. + kind (int) : Kind of the signal, default Kind.normal. + precision (float) : Precision of the signal, default PRECISION. + compute_readback (bool) : Flag whether to compute readback based on function hosted in SimulatedData + class. If False, sim_state value will be returned, if True, new value will be computed """ def __init__( self, - *args, name: str, + *args, + parent=None, value: any = 0, kind: int = Kind.normal, precision: float = PRECISION, + compute_readback: bool = False, **kwargs, ): - super().__init__(*args, name=name, value=value, kind=kind, **kwargs) + super().__init__(*args, name=name, parent=parent, value=value, kind=kind, **kwargs) self._metadata.update( connected=True, write_access=False, ) - self.precision = precision self._value = value - # Init the sim_state, if self.parent.sim available, use it, else use self.parent + self.precision = precision + self.compute_readback = compute_readback self.sim = getattr(self.parent, "sim", None) - self._init_sim_state() + if self.sim: + self._init_sim_state() def _init_sim_state(self) -> None: - """Init the readback value and timestamp in sim_state""" - if self.sim: - self.sim.update_sim_state(self.name, self._value) - - def _get_value(self) -> any: - """Get the value of the readback from sim_state.""" - if self.sim: - return self.sim.sim_state[self.name]["value"] - else: - return np.random.rand() - - def _get_timestamp(self) -> any: - """Get the timestamp of the readback from sim_state.""" - if self.sim: - return self.sim.sim_state[self.name]["timestamp"] - else: - return ttime.time() - - def get(self) -> any: - """Get the current position of the simulated device. - - Core function for signal. - """ - self._value = self._get_value() - return self._value - - def put(self, value) -> None: - """Put method, should raise ReadOnlyError since the signal is readonly.""" - raise ReadOnlyError(f"The signal {self.name} is readonly.") - - def describe(self): - """Describe the readback signal. - - Core function for signal. - """ - res = super().describe() - if self.precision is not None: - res[self.name]["precision"] = self.precision - return res - - @property - def timestamp(self): - """Timestamp of the readback value""" - return self._get_timestamp() - - -class ComputedReadOnlySignal(Signal): - """Computed readback signal for simulated devices. - - It will return the value computed from the sim_state of the signal. - This can be configured in parent.sim. - """ - - def __init__( - self, - *args, - name: str, - value: any = None, - kind: int = Kind.normal, - precision: float = PRECISION, - **kwargs, - ): - super().__init__(*args, name=name, value=value, kind=kind, **kwargs) - self._metadata.update( - connected=True, - write_access=False, - ) - self._value = value - self.precision = precision - # Init the sim_state, if self.parent.sim available, use it, else use self.parent - self.sim = getattr(self.parent, "sim", self.parent) - self._update_sim_state() + """Create the initial sim_state in the SimulatedData class of the parent device.""" + self.sim.update_sim_state(self.name, self._value) def _update_sim_state(self) -> None: - """Update the readback value. - - Call _compute_sim_state in parent device which updates the sim_state. - """ - self.sim._compute_sim_state(self.name) + """Update the readback value.""" + self.sim.compute_sim_state(signal_name=self.name, compute_readback=self.compute_readback) def _get_value(self) -> any: """Update the timestamp of the readback value.""" @@ -197,15 +154,16 @@ class ComputedReadOnlySignal(Signal): """Update the timestamp of the readback value.""" return self.sim.sim_state[self.name]["timestamp"] + # pylint: disable=arguments-differ def get(self): - """Get the current position of the simulated device. - - Core function for signal. - """ - self._update_sim_state() - self._value = self._get_value() - return self._value + """Get the current position of the simulated device.""" + if self.sim: + self._update_sim_state() + self._value = self._get_value() + return self._value + return np.random.rand() + # pylint: disable=arguments-differ def put(self, value) -> None: """Put method, should raise ReadOnlyError since the signal is readonly.""" raise ReadOnlyError(f"The signal {self.name} is readonly.") @@ -223,15 +181,6 @@ class ComputedReadOnlySignal(Signal): @property def timestamp(self): """Timestamp of the readback value""" - return self._get_timestamp() - - -if __name__ == "__main__": - from ophyd_devices.sim import SimPositioner - - positioner = SimPositioner(name="positioner", parent=None) - print(positioner.velocity.get()) - positioner.velocity.put(10) - print(positioner.velocity.get()) - positioner.velocity.put(1) - print(positioner.velocity.get()) + if self.sim: + return self._get_timestamp() + return time.time() diff --git a/ophyd_devices/sim/sim_test_devices.py b/ophyd_devices/sim/sim_test_devices.py index 2030b66..3f1b1ea 100644 --- a/ophyd_devices/sim/sim_test_devices.py +++ b/ophyd_devices/sim/sim_test_devices.py @@ -14,6 +14,7 @@ class DummyControllerDevice(Device): class DummyController: USER_ACCESS = [ "some_var", + "some_var_property", "controller_show_all", "_func_with_args", "_func_with_args_and_kwargs", @@ -23,11 +24,19 @@ class DummyController: some_var = 10 another_var = 20 + def __init__(self) -> None: + self._some_var_property = None + self.connected = False + + @property + def some_var_property(self): + return self._some_var_property + def on(self): - self._connected = True + self.connected = True def off(self): - self._connected = False + self.connected = False def _func_with_args(self, *args): return args diff --git a/ophyd_devices/sim/test.py b/ophyd_devices/sim/test.py new file mode 100644 index 0000000..7ffdb0d --- /dev/null +++ b/ophyd_devices/sim/test.py @@ -0,0 +1,50 @@ +import lmfit +import inspect + + +class LmfitModelMixin: + + # def __init__(self): + # self.model = lmfit.models.GaussianModel() + # self.params = self.model.make_params() + # self.params["center"].set(value=0) + # self.params["amplitude"].set(value=1) + # self.params["sigma"].set(value=1) + + @staticmethod + def available_models() -> dict: + """ + Get available models from lmfit.models. + + Exclude Gaussian2dModel, ExpressionModel, Model, SplineModel. + """ + avail_models = {} + for name, model_cls in inspect.getmembers(lmfit.models): + try: + is_model = issubclass(model_cls, lmfit.model.Model) + except TypeError: + is_model = False + if is_model and name not in [ + "Gaussian2dModel", + "ExpressionModel", + "Model", + "SplineModel", + ]: + avail_models[name] = model_cls + return avail_models + + def create_properties(self): + """ + Create properties for model parameters. + """ + for name in self.available_models(): + setattr(self, name, param) + + @staticmethod + def get_model(model: str) -> lmfit.Model: + """Get model for given string.""" + if isinstance(model, str): + model = getattr(lmfit.models, model, None) + if not model: + raise ValueError(f"Model {model} not found.") + return model diff --git a/tests/test_simulation.py b/tests/test_simulation.py index d0a7712..c046f23 100644 --- a/tests/test_simulation.py +++ b/tests/test_simulation.py @@ -1,8 +1,14 @@ from ophyd_devices.utils.bec_device_base import BECDeviceBase, BECDevice +from ophyd import Device, Signal + def test_BECDeviceBase(): # Test the BECDeviceBase class - test = BECDeviceBase(name="test") - assert isinstance(test, BECDevice) - assert test.connected is True + bec_device_base = BECDeviceBase(name="test") + assert isinstance(bec_device_base, BECDevice) + assert bec_device_base.connected is True + signal = Signal(name="signal") + assert isinstance(signal, BECDevice) + device = Device(name="device") + assert isinstance(device, BECDevice)