From f311ce5d1c082c107f782916c2fb724a34a92099 Mon Sep 17 00:00:00 2001 From: appel_c Date: Tue, 30 Jan 2024 13:12:00 +0100 Subject: [PATCH] feat: refactor simulation, introduce SimCamera, SimMonitor in addition to existing classes --- ophyd_devices/sim/sim.py | 483 +++++++++++++++++---------------------- 1 file changed, 215 insertions(+), 268 deletions(-) diff --git a/ophyd_devices/sim/sim.py b/ophyd_devices/sim/sim.py index 503ea41..983d295 100644 --- a/ophyd_devices/sim/sim.py +++ b/ophyd_devices/sim/sim.py @@ -1,3 +1,4 @@ +from collections import defaultdict import os import threading import time as ttime @@ -6,13 +7,14 @@ import warnings import numpy as np from bec_lib import MessageEndpoints, bec_logger, messages from ophyd import Component as Cpt -from ophyd import Device, DeviceStatus +from ophyd import Device, DeviceStatus, Kind from ophyd import DynamicDeviceComponent as Dcpt from ophyd import OphydObject, PositionerBase, Signal from ophyd.sim import EnumSignal, SynSignal from ophyd.utils import LimitError, ReadOnlyError +from ophyd_devices.sim.sim_data import SimulatedDataBase, SimulatedDataCamera, SimulatedDataMonitor -from ophyd_devices.sim.sim_signals import ReadbackSignal, SetpointSignal, IsMovingSignal +from ophyd_devices.sim.sim_signals import SetableSignal, ReadOnlySignal, ComputedReadOnlySignal logger = bec_logger.logger @@ -21,99 +23,15 @@ class DeviceStop(Exception): pass -class SynSignalRO(Signal): - def __init__(self, *args, **kwargs): - super().__init__(*args, **kwargs) - self._metadata.update( - write_access=False, - ) - - def wait_for_connection(self, timeout=0): - super().wait_for_connection(timeout) - self._metadata.update(connected=True) - - def get(self, **kwargs): - self._readback = np.random.rand() - return self._readback - - -class _ReadbackSignalCompute(Signal): - def __init__(self, *args, **kwargs): - super().__init__(*args, **kwargs) - self._metadata.update( - connected=True, - write_access=False, - ) - - def get(self): - readback = self.parent._compute() - self._readback = self.parent.sim_state["readback"] = readback - return readback - - def describe(self): - res = super().describe() - # There should be only one key here, but for the sake of - # generality.... - for k in res: - res[k]["precision"] = self.parent.precision - return res - - @property - def timestamp(self): - """Timestamp of the readback value""" - return self.parent.sim_state["readback_ts"] - - def put(self, value, *, timestamp=None, force=False): - raise ReadOnlyError("The signal {} is readonly.".format(self.name)) - - def set(self, value, *, timestamp=None, force=False): - raise ReadOnlyError("The signal {} is readonly.".format(self.name)) - - -class _ReadbackSignalRand(Signal): - def __init__(self, *args, **kwargs): - super().__init__(*args, **kwargs) - self._metadata.update( - connected=True, - write_access=False, - ) - - def get(self): - self._readback = np.random.rand() - return self._readback - - def describe(self): - res = super().describe() - # There should be only one key here, but for the sake of - # generality.... - for k in res: - res[k]["precision"] = self.parent.precision - return res - - @property - def timestamp(self): - """Timestamp of the readback value""" - return self.parent.sim_state["readback_ts"] - - def put(self, value, *, timestamp=None, force=False): - raise ReadOnlyError("The signal {} is readonly.".format(self.name)) - - def set(self, value, *, timestamp=None, force=False): - raise ReadOnlyError("The signal {} is readonly.".format(self.name)) - - -class SynAxisMonitor(Device): +class SimMonitor(Device): """ - A synthetic settable Device mimic any 1D Axis (position, temperature). + A simulated device mimic any 1D Axis (position, temperature, beam). + + Readback functionality can be configured Parameters ---------- name : string, keyword only - readback_func : callable, optional - When the Device is set to ``x``, its readback will be updated to - ``f(x)``. This can be used to introduce random noise or a systematic - offset. - Expected signature: ``f(x) -> value``. value : object, optional The initial value. Default is 0. delay : number, optional @@ -126,7 +44,11 @@ class SynAxisMonitor(Device): Default is Kind.normal. See Kind for options. """ - readback = Cpt(_ReadbackSignalRand, value=0, kind="hinted") + USER_ACCESS = ["sim"] + + sim_cls = SimulatedDataMonitor + + readback = Cpt(ComputedReadOnlySignal, value=0, kind=Kind.hinted) SUB_READBACK = "readback" _default_sub = SUB_READBACK @@ -135,42 +57,117 @@ class SynAxisMonitor(Device): self, *, name, - readback_func=None, value=0, delay=0, precision=3, + tolerance: float = 0.5, + sim_init: dict = None, parent=None, labels=None, kind=None, + device_manager=None, **kwargs, ): - if readback_func is None: - - def readback_func(x): - return x - - sentinel = object() - loop = kwargs.pop("loop", sentinel) - if loop is not sentinel: - warnings.warn( - f"{self.__class__} no longer takes a loop as input. " - "Your input will be ignored and may raise in the future", - stacklevel=2, - ) - self.sim_state = {} - self._readback_func = readback_func self.delay = delay self.precision = precision - self.tolerance = kwargs.pop("tolerance", 0.5) - - # initialize values - self.sim_state["readback"] = readback_func(value) - self.sim_state["readback_ts"] = ttime.time() + self.tolerance = tolerance + self.init_sim_params = sim_init + self.sim = self.sim_cls(parent=self, device_manager=device_manager, **kwargs) super().__init__(name=name, parent=parent, labels=labels, kind=kind, **kwargs) + self.sim.sim_state[self.name] = self.sim.sim_state.pop(self.readback.name, None) self.readback.name = self.name +class SynGaussBEC(Device): + """ + Evaluate a point on a Gaussian based on the value of a motor. + + Parameters + ---------- + name : string + motor : Device + motor_field : string + center : number + center of peak + Imax : number + max intensity of peak + sigma : number, optional + Default is 1. + noise : {'poisson', 'uniform', None}, optional + Add noise to the gaussian peak. + noise_multiplier : float, optional + Only relevant for 'uniform' noise. Multiply the random amount of + noise by 'noise_multiplier' + random_state : numpy random state object, optional + np.random.RandomState(0), to generate random number with given seed + + Example + ------- + motor = SynAxis(name='motor') + det = SynGauss('det', motor, 'motor', center=0, Imax=1, sigma=1) + """ + + val = Cpt(ComputedReadOnlySignal, value=0, kind=Kind.hinted) + Imax = Cpt(Signal, value=10, kind=Kind.config) + center = Cpt(Signal, value=0, kind=Kind.config) + sigma = Cpt(Signal, value=1, kind=Kind.config) + motor = Cpt(Signal, value="samx", kind=Kind.config) + noise = Cpt( + EnumSignal, + value="none", + kind=Kind.config, + enum_strings=("none", "poisson", "uniform"), + ) + noise_multiplier = Cpt(Signal, value=1, kind=Kind.config) + + def __init__(self, name, *, device_manager=None, random_state=None, **kwargs): + self.device_manager = device_manager + set_later = {} + for k in ("sigma", "noise", "noise_multiplier"): + v = kwargs.pop(k, None) + if v is not None: + set_later[k] = v + self.sim_state = defaultdict(lambda: {}) + super().__init__(name=name, **kwargs) + self.sim_state[self.name] = self.sim_state.pop(self.val.name, None) + self.val.name = self.name + + self.random_state = random_state or np.random + self.precision = 3 + + for k, v in set_later.items(): + getattr(self, k).put(v) + + def _compute_sim_state(self, signal_name: str) -> None: + try: + m = self.device_manager.devices[self.motor.get()].obj.read()[self.motor.get()]["value"] + # we need to do this one at a time because + # - self.read() may be screwed with by the user + # - self.get() would cause infinite recursion + Imax = self.Imax.get() + center = self.center.get() + sigma = self.sigma.get() + noise = self.noise.get() + noise_multiplier = self.noise_multiplier.get() + v = Imax * np.exp(-((m - center) ** 2) / (2 * sigma**2)) + if noise == "poisson": + v = int(self.random_state.poisson(np.round(v), 1)) + elif noise == "uniform": + v += self.random_state.uniform(-1, 1) * noise_multiplier + self.sim_state[signal_name]["value"] = v + self.sim_state[signal_name]["timestamp"] = ttime.time() + except Exception as exc: + logger.warning(f"Failed to compute sim state with exception {exc}") + self.sim_state[signal_name]["value"] = 0 + self.sim_state[signal_name]["timestamp"] = ttime.time() + + def get(self, *args, **kwargs): + self.sim_state["readback"] = self._compute() + self.sim_state["readback_ts"] = ttime.time() + return self.val.get() + + class _SLSDetectorConfigSignal(Signal): def put(self, value, *, timestamp=None, force=False): self._readback = value @@ -181,26 +178,46 @@ class _SLSDetectorConfigSignal(Signal): return self.parent.sim_state[self.name] -class SynSLSDetector(Device): - USER_ACCESS = [] - exp_time = Cpt(_SLSDetectorConfigSignal, name="exp_time", value=1, kind="config") - file_path = Cpt(_SLSDetectorConfigSignal, name="file_path", value="", kind="config") - file_pattern = Cpt(_SLSDetectorConfigSignal, name="file_pattern", value="", kind="config") - frames = Cpt(_SLSDetectorConfigSignal, name="frames", value=1, kind="config") - burst = Cpt(_SLSDetectorConfigSignal, name="burst", value=1, kind="config") - save_file = Cpt(_SLSDetectorConfigSignal, name="save_file", value=False, kind="config") +class SimCamera(Device): + USER_ACCESS = ["sim"] - def __init__(self, *, name, kind=None, parent=None, device_manager=None, **kwargs): + sim_cls = SimulatedDataCamera + SHAPE = (100, 100) + + SUB_MONITOR = "monitor" + _default_sub = SUB_MONITOR + + exp_time = Cpt(SetableSignal, name="exp_time", value=1, kind=Kind.config) + file_path = Cpt(SetableSignal, name="file_path", value="", kind=Kind.config) + file_pattern = Cpt(SetableSignal, name="file_pattern", value="", kind=Kind.config) + frames = Cpt(SetableSignal, name="frames", value=1, kind=Kind.config) + burst = Cpt(SetableSignal, name="burst", value=1, kind=Kind.config) + save_file = Cpt(SetableSignal, name="save_file", value=False, kind=Kind.config) + + # image shape, only adjustable via config. + image_shape = Cpt(ReadOnlySignal, name="image_shape", value=SHAPE, kind=Kind.config) + image = Cpt( + ComputedReadOnlySignal, + name="image", + value=np.empty(SHAPE, dtype=np.uint16), + kind=Kind.omitted, + ) + + def __init__( + self, + *, + name, + kind=None, + parent=None, + sim_init: dict = None, + device_manager=None, + **kwargs, + ): self.device_manager = device_manager + self.init_sim_params = sim_init + self.sim = self.sim_cls(parent=self, device_manager=device_manager, **kwargs) + super().__init__(name=name, parent=parent, kind=kind, **kwargs) - self.sim_state = { - f"{self.name}_file_path": "~/Data10/data/", - f"{self.name}_file_pattern": f"{self.name}_{{:05d}}.h5", - f"{self.name}_frames": 1, - f"{self.name}_burst": 1, - f"{self.name}_save_file": False, - f"{self.name}_exp_time": 0, - } self._stopped = False self.file_name = "" self.metadata = {} @@ -213,6 +230,8 @@ class SynSLSDetector(Device): def acquire(): try: for _ in range(self.burst.get()): + # Send data for each trigger + self._run_subs(sub_type=self.SUB_MONITOR, value=self.image.get()) ttime.sleep(self.exp_time.get()) if self._stopped: raise DeviceStop @@ -226,6 +245,11 @@ class SynSLSDetector(Device): return status def stage(self) -> list[object]: + """Stage the camera + + Receive scan message from REDIS first, extract relevant scan data, + and set all signals for the scan, e.g. scan_number, file_name, frames, etc. + """ msg = self.device_manager.producer.get(MessageEndpoints.scan_status()) scan_msg = messages.ScanStatusMessage.loads(msg) self.metadata = { @@ -238,19 +262,36 @@ class SynSLSDetector(Device): self.file_name = os.path.join( self.file_path.get(), self.file_pattern.get().format(scan_number) ) + self._stopped = False return super().stage() - def unstage(self) -> list[object]: - signals = {"config": self.sim_state, "data": self.file_name} + def _send_data_to_bec(self) -> None: + config_readout = { + signal.item.name: signal.item.get() + for signal in self.walk_signals() + if signal.item._kind == Kind.config + } + + signals = {"config": config_readout, "data": self.file_name} msg = messages.DeviceMessage(signals=signals, metadata=self.metadata) self.device_manager.producer.set_and_publish( MessageEndpoints.device_read(self.name), msg.dumps() ) + + def unstage(self) -> list[object]: + """Unstage the device + + Send reads from all config signals to redis + """ + if self._stopped is True or not self._staged: + return super().unstage() + self._send_data_to_bec() + return super().unstage() def stop(self, *, success=False): - super().stop(success=success) self._stopped = True + super().stop(success=success) class DummyController: @@ -528,21 +569,23 @@ class SimPositioner(Device, PositionerBase): """ # Specify which attributes are accessible via BEC client - USER_ACCESS = ["sim_state", "readback", "speed", "dummy_controller"] + USER_ACCESS = ["sim", "readback", "speed", "dummy_controller"] + + sim_cls = SimulatedDataBase # Define the signals as class attributes - readback = Cpt(ReadbackSignal, value=0, kind="hinted") - setpoint = Cpt(SetpointSignal, value=0, kind="normal") - motor_is_moving = Cpt(IsMovingSignal, value=0, kind="normal") + readback = Cpt(ReadOnlySignal, name="readback", value=0, kind=Kind.hinted) + setpoint = Cpt(SetableSignal, value=0, kind=Kind.normal) + motor_is_moving = Cpt(SetableSignal, value=0, kind=Kind.normal) # Config signals - velocity = Cpt(Signal, value=1, kind="config") - acceleration = Cpt(Signal, value=1, kind="config") + velocity = Cpt(SetableSignal, value=1, kind=Kind.config) + acceleration = Cpt(SetableSignal, value=1, kind=Kind.config) # Ommitted signals - high_limit_travel = Cpt(Signal, value=0, kind="omitted") - low_limit_travel = Cpt(Signal, value=0, kind="omitted") - unused = Cpt(Signal, value=1, kind="omitted") + high_limit_travel = Cpt(SetableSignal, value=0, kind=Kind.omitted) + low_limit_travel = Cpt(SetableSignal, value=0, kind=Kind.omitted) + unused = Cpt(Signal, value=1, kind=Kind.omitted) # TODO add short description to these two lines and explain what this does SUB_READBACK = "readback" @@ -562,44 +605,27 @@ class SimPositioner(Device, PositionerBase): labels=None, kind=None, limits=None, + tolerance: float = 0.5, + sim: dict = None, **kwargs, ): - if readback_func is None: - - def readback_func(x): - return x - - # TODO what is this, check if still needed.. - sentinel = object() - loop = kwargs.pop("loop", sentinel) - if loop is not sentinel: - warnings.warn( - f"{self.__class__} no longer takes a loop as input. " - "Your input will be ignored and may raise in the future", - stacklevel=2, - ) - - self._readback_func = readback_func # Whether motions should be instantaneous or depend on motor velocity self.delay = delay self.precision = precision + self.tolerance = tolerance + self.init_sim_params = sim + self.speed = speed self.update_frequency = update_frequency - self.tolerance = kwargs.pop("tolerance", 0.05) self._stopped = False - self.dummy_controller = DummyController() # initialize inner dictionary with simulated state - self.sim_state = {} - self.sim_state["setpoint"] = value - self.sim_state["setpoint_ts"] = ttime.time() - self.sim_state["readback"] = readback_func(value) - self.sim_state["readback_ts"] = ttime.time() - self.sim_state["is_moving"] = 0 - self.sim_state["is_moving_ts"] = ttime.time() + self.sim = self.sim_cls(parent=self, **kwargs) - super().__init__(name=name, parent=parent, labels=labels, kind=kind, **kwargs) + super().__init__(name=name, labels=labels, kind=kind, **kwargs) + # Rename self.readback.name to self.name, also in self.sim_state + self.sim.sim_state[self.name] = self.sim.sim_state.pop(self.readback.name, None) self.readback.name = self.name # Init limits from deviceConfig if limits is not None: @@ -633,29 +659,36 @@ class SimPositioner(Device, PositionerBase): if low_limit < high_limit and not low_limit <= value <= high_limit: raise LimitError(f"position={value} not within limits {self.limits}") - def move(self, value, **kwargs) -> DeviceStatus: + def _set_sim_state(self, signal_name: str, value: any) -> None: + """Update the simulated state of the device.""" + self.sim.sim_state[signal_name]["value"] = value + self.sim.sim_state[signal_name]["timestamp"] = ttime.time() + + def _get_sim_state(self, signal_name: str) -> any: + """Return the simulated state of the device.""" + return self.sim.sim_state[signal_name]["value"] + + def move(self, value: float, **kwargs) -> DeviceStatus: """Change the setpoint of the simulated device, and simultaneously initiated a motion.""" self._stopped = False self.check_value(value) - old_setpoint = self.sim_state["setpoint"] - self.sim_state["is_moving"] = 1 - self.sim_state["setpoint"] = value - self.sim_state["setpoint_ts"] = ttime.time() + old_setpoint = self._get_sim_state(self.setpoint.name) + self._set_sim_state(self.motor_is_moving.name, 1) + self._set_sim_state(self.setpoint.name, value) def update_state(val): """Update the state of the simulated device.""" if self._stopped: raise DeviceStop - old_readback = self.sim_state["readback"] - self.sim_state["readback"] = val - self.sim_state["readback_ts"] = ttime.time() + old_readback = self._get_sim_state(self.readback.name) + self._set_sim_state(self.readback.name, val) # Run subscription on "readback" self._run_subs( sub_type=self.SUB_READBACK, old_value=old_readback, - value=self.sim_state["readback"], - timestamp=self.sim_state["readback_ts"], + value=self.sim.sim_state[self.readback.name]["value"], + timestamp=self.sim.sim_state[self.readback.name]["timestamp"], ) st = DeviceStatus(device=self) @@ -666,9 +699,9 @@ class SimPositioner(Device, PositionerBase): success = True try: # Compute final position with some jitter - move_val = self.sim_state["setpoint"] + self.tolerance * np.random.uniform( - -1, 1 - ) + move_val = self._get_sim_state( + self.setpoint.name + ) + self.tolerance * np.random.uniform(-1, 1) # Compute the number of updates needed to reach the final position with the given speed updates = np.ceil( np.abs(old_setpoint - move_val) / self.speed * self.update_frequency @@ -679,8 +712,7 @@ class SimPositioner(Device, PositionerBase): update_state(ii) # Update the state of the simulated device to the final position update_state(move_val) - self.sim_state["is_moving"] = 0 - self.sim_state["is_moving_ts"] = ttime.time() + self._set_sim_state(self.motor_is_moving, 0) except DeviceStop: success = False finally: @@ -716,90 +748,6 @@ class SimPositioner(Device, PositionerBase): return "mm" -class SynGaussBEC(Device): - """ - Evaluate a point on a Gaussian based on the value of a motor. - - Parameters - ---------- - name : string - motor : Device - motor_field : string - center : number - center of peak - Imax : number - max intensity of peak - sigma : number, optional - Default is 1. - noise : {'poisson', 'uniform', None}, optional - Add noise to the gaussian peak. - noise_multiplier : float, optional - Only relevant for 'uniform' noise. Multiply the random amount of - noise by 'noise_multiplier' - random_state : numpy random state object, optional - np.random.RandomState(0), to generate random number with given seed - - Example - ------- - motor = SynAxis(name='motor') - det = SynGauss('det', motor, 'motor', center=0, Imax=1, sigma=1) - """ - - val = Cpt(_ReadbackSignalCompute, value=0, kind="hinted") - Imax = Cpt(Signal, value=10, kind="config") - center = Cpt(Signal, value=0, kind="config") - sigma = Cpt(Signal, value=1, kind="config") - motor = Cpt(Signal, value="samx", kind="config") - noise = Cpt( - EnumSignal, - value="none", - kind="config", - enum_strings=("none", "poisson", "uniform"), - ) - noise_multiplier = Cpt(Signal, value=1, kind="config") - - def __init__(self, name, *, device_manager=None, random_state=None, **kwargs): - self.device_manager = device_manager - set_later = {} - for k in ("sigma", "noise", "noise_multiplier"): - v = kwargs.pop(k, None) - if v is not None: - set_later[k] = v - super().__init__(name=name, **kwargs) - - self.random_state = random_state or np.random - self.val.name = self.name - self.precision = 3 - self.sim_state = {"readback": 0, "readback_ts": ttime.time()} - for k, v in set_later.items(): - getattr(self, k).put(v) - - def _compute(self): - try: - m = self.device_manager.devices[self.motor.get()].obj.read()[self.motor.get()]["value"] - # we need to do this one at a time because - # - self.read() may be screwed with by the user - # - self.get() would cause infinite recursion - Imax = self.Imax.get() - center = self.center.get() - sigma = self.sigma.get() - noise = self.noise.get() - noise_multiplier = self.noise_multiplier.get() - v = Imax * np.exp(-((m - center) ** 2) / (2 * sigma**2)) - if noise == "poisson": - v = int(self.random_state.poisson(np.round(v), 1)) - elif noise == "uniform": - v += self.random_state.uniform(-1, 1) * noise_multiplier - return v - except Exception: - return 0 - - def get(self, *args, **kwargs): - self.sim_state["readback"] = self._compute() - self.sim_state["readback_ts"] = ttime.time() - return self.val.get() - - class SynDynamicComponents(Device): messages = Dcpt({f"message{i}": (SynSignal, None, {"name": f"msg{i}"}) for i in range(1, 6)}) @@ -815,5 +763,4 @@ class SynDeviceOPAAS(Device): if __name__ == "__main__": - det = SynSLSDetector(name="moench") - det.trigger() + gauss = SynGaussBEC(name="gauss")