diff --git a/ophyd_devices/__init__.py b/ophyd_devices/__init__.py index e89b012..264f7eb 100644 --- a/ophyd_devices/__init__.py +++ b/ophyd_devices/__init__.py @@ -18,7 +18,7 @@ from .sim.sim import SimMonitor as SynAxisMonitor from .sim.sim import SimMonitor as SynGaussBEC from .sim.sim import SimPositioner from .sim.sim import SimPositioner as SynAxisOPAAS -from .sim.sim import SynDeviceOPAAS +from .sim.sim import SimWaveform, SynDeviceOPAAS from .sim.sim_frameworks import DeviceProxy, H5ImageReplayProxy, SlitProxy from .sim.sim_signals import ReadOnlySignal from .sim.sim_signals import ReadOnlySignal as SynSignalRO diff --git a/ophyd_devices/sim/__init__.py b/ophyd_devices/sim/__init__.py index 003f7cb..58c6f29 100644 --- a/ophyd_devices/sim/__init__.py +++ b/ophyd_devices/sim/__init__.py @@ -1,7 +1,7 @@ from .sim import SimCamera from .sim import SimFlyer from .sim import SimFlyer as SynFlyer -from .sim import SimMonitor, SimPositioner +from .sim import SimMonitor, SimPositioner, SimWaveform from .sim_frameworks import SlitProxy from .sim_signals import ReadOnlySignal, SetableSignal from .sim_xtreme import SynXtremeOtf diff --git a/ophyd_devices/sim/sim.py b/ophyd_devices/sim/sim.py index bb54c8b..bd08687 100644 --- a/ophyd_devices/sim/sim.py +++ b/ophyd_devices/sim/sim.py @@ -16,6 +16,7 @@ from ophyd.utils import LimitError from ophyd_devices.sim.sim_data import ( SimulatedDataCamera, SimulatedDataMonitor, + SimulatedDataWaveform, SimulatedPositioner, ) from ophyd_devices.sim.sim_signals import ReadOnlySignal, SetableSignal @@ -220,6 +221,141 @@ class SimCamera(Device): super().stop(success=success) +class SimWaveform(Device): + """A simulated device mimic any 1D Waveform detector. + + It's waveform is a computed signal, which is configurable by the user and from the command line. + The corresponding simulation class is sim_cls=SimulatedDataWaveform, more details on defaults within the simulation class. + + >>> waveform = SimWaveform(name="waveform") + + Parameters + ---------- + name (string) : Name of the device. This is the only required argmuent, passed on to all signals of the device. + precision (integer) : Precision of the readback in digits, written to .describe(). Default is 3 digits. + sim_init (dict) : Dictionary to initiate parameters of the simulation, check simulation type defaults for more details. + parent : Parent device, optional, is used internally if this signal/device is part of a larger device. + kind : A member the Kind IntEnum (or equivalent integer), optional. Default is Kind.normal. See Kind for options. + device_manager : DeviceManager from BEC, optional . Within startup of simulation, device_manager is passed on automatically. + + """ + + USER_ACCESS = ["sim", "registered_proxies"] + + sim_cls = SimulatedDataWaveform + SHAPE = (1000,) + BIT_DEPTH = np.uint16 + + SUB_MONITOR = "monitor" + _default_sub = SUB_MONITOR + + exp_time = Cpt(SetableSignal, name="exp_time", value=1, kind=Kind.config) + file_path = Cpt(SetableSignal, name="file_path", value="", kind=Kind.config) + file_pattern = Cpt(SetableSignal, name="file_pattern", value="", kind=Kind.config) + frames = Cpt(SetableSignal, name="frames", value=1, kind=Kind.config) + burst = Cpt(SetableSignal, name="burst", value=1, kind=Kind.config) + + waveform_shape = Cpt(SetableSignal, name="waveform_shape", value=SHAPE, kind=Kind.config) + waveform = Cpt( + ReadOnlySignal, + name="waveform", + value=np.empty(SHAPE, dtype=BIT_DEPTH), + compute_readback=True, + kind=Kind.omitted, + ) + + def __init__( + self, name, *, kind=None, parent=None, sim_init: dict = None, device_manager=None, **kwargs + ): + self.device_manager = device_manager + self.init_sim_params = sim_init + self._registered_proxies = {} + self.sim = self.sim_cls(parent=self, device_manager=device_manager, **kwargs) + + super().__init__(name=name, parent=parent, kind=kind, **kwargs) + self._stopped = False + self._staged = False + self.scaninfo = None + self._update_scaninfo() + + @property + def registered_proxies(self) -> None: + """Dictionary of registered signal_names and proxies.""" + return self._registered_proxies + + def trigger(self) -> DeviceStatus: + """Trigger the camera to acquire images. + + This method can be called from BEC during a scan. It will acquire images and send them to BEC. + Whether the trigger is send from BEC is determined by the softwareTrigger argument in the device config. + + Here, we also run a callback on SUB_MONITOR to send the image data the device_monitor endpoint in BEC. + """ + status = DeviceStatus(self) + + self.subscribe(status._finished, event_type=self.SUB_ACQ_DONE, run=False) + + def acquire(): + try: + for _ in range(self.burst.get()): + self._run_subs(sub_type=self.SUB_MONITOR, value=self.waveform.get()) + if self._stopped: + raise DeviceStop + except DeviceStop: + pass + finally: + self._stopped = False + self._done_acquiring() + + threading.Thread(target=acquire, daemon=True).start() + return status + + def _update_scaninfo(self) -> None: + """Update scaninfo from BecScaninfoMixing + This depends on device manager and operation/sim_mode + """ + self.scaninfo = BecScaninfoMixin(self.device_manager) + + def stage(self) -> list[object]: + """Stage the camera for upcoming scan + + This method is called from BEC in preparation of a scan. + It receives metadata about the scan from BEC, + compiles it and prepares the camera for the scan. + + FYI: No data is written to disk in the simulation, but upon each trigger it + is published to the device_monitor endpoint in REDIS. + """ + if self._staged: + return super().stage() + self.scaninfo.load_scan_metadata() + self.file_path.set( + os.path.join( + self.file_path.get(), self.file_pattern.get().format(self.scaninfo.scan_number) + ) + ) + self.frames.set(self.scaninfo.num_points * self.scaninfo.frames_per_trigger) + self.exp_time.set(self.scaninfo.exp_time) + self.burst.set(self.scaninfo.frames_per_trigger) + self._stopped = False + return super().stage() + + def unstage(self) -> list[object]: + """Unstage the device + + Send reads from all config signals to redis + """ + if self._stopped is True or not self._staged: + return super().unstage() + + return super().unstage() + + def stop(self, *, success=False): + """Stop the device""" + self._stopped = True + super().stop(success=success) + + class SimPositioner(Device, PositionerBase): """ A simulated device mimicing any 1D Axis device (position, temperature, rotation). @@ -577,5 +713,12 @@ class SimPositionerWithCommFailure(SimPositioner): if __name__ == "__main__": - cam = SimCamera(name="cam") - cam.image.read() + waveform = SimWaveform(name="waveform") + waveform.sim.sim_select_model(waveform.sim.sim_get_models()[7]) + waveform.sim.sim_params = { + "amplitude": 1500, + "noise_multiplier": 168, + "sigma": 50, + "center": 350, + } + waveform.waveform.get() diff --git a/ophyd_devices/sim/sim_data.py b/ophyd_devices/sim/sim_data.py index f60afb0..173b6ea 100644 --- a/ophyd_devices/sim/sim_data.py +++ b/ophyd_devices/sim/sim_data.py @@ -5,6 +5,7 @@ import inspect import time as ttime from abc import ABC, abstractmethod from collections import defaultdict +from copy import deepcopy import numpy as np from bec_lib import bec_logger @@ -310,8 +311,8 @@ class SimulatedDataMonitor(SimulatedDataBase): self._init_default() def _get_additional_params(self) -> None: - params = DEFAULT_PARAMS_NOISE.copy() - params.update(DEFAULT_PARAMS_MOTOR.copy()) + params = deepcopy(DEFAULT_PARAMS_NOISE) + params.update(deepcopy(DEFAULT_PARAMS_MOTOR)) return params def _init_default(self) -> None: @@ -439,6 +440,65 @@ class SimulatedDataMonitor(SimulatedDataBase): return v +class SimulatedDataWaveform(SimulatedDataMonitor): + """Simulated data class for a waveform. + + The class inherits from SimulatedDataMonitor, + and overwrites the relevant methods to compute + a simulated waveform for each point. + """ + + def _get_additional_params(self) -> None: + params = deepcopy(DEFAULT_PARAMS_NOISE) + return params + + def compute_sim_state(self, signal_name: str, compute_readback: bool) -> None: + """Update the simulated state of the device. + + It will update the value in self.sim_state with the value computed by + the chosen simulation type. + + Args: + signal_name (str): Name of the signal to update. + """ + if compute_readback: + method = self._compute + value = self.execute_simulation_method(method=method, signal_name=signal_name) + value = self.bit_depth(value) + self.update_sim_state(signal_name, value) + + def _compute(self, *args, **kwargs) -> np.ndarray: + """ + Compute the return value for active model. + + Returns: + np.array: Values computed for the activate model. + """ + size = self.parent.waveform_shape.get() + size = size[0] if isinstance(size, tuple) else size + method = self._model + value = method.eval(params=self._model_params, x=np.array(range(size))) + value *= self.sim_params["amplitude"] / np.max(value) + return self._add_noise(value, self.sim_params["noise"], self.sim_params["noise_multiplier"]) + + def _add_noise(self, v: np.ndarray, noise: NoiseType, noise_multiplier: float) -> np.ndarray: + """Add noise to the simulated data. + + Args: + v (np.ndarray): Simulated data. + noise (NoiseType): Type of noise to add. + """ + if noise == NoiseType.POISSON: + v = np.random.poisson(np.round(v), v.shape) + return v + if noise == NoiseType.UNIFORM: + v += np.random.uniform(-noise_multiplier, noise_multiplier, v.shape) + v[v <= 0] = 0 + return v + if noise == NoiseType.NONE: + return v + + class SimulatedDataCamera(SimulatedDataBase): """Simulated class to compute data for a 2D camera.""" @@ -468,24 +528,24 @@ class SimulatedDataCamera(SimulatedDataBase): return model_lookup def _get_additional_params(self) -> None: - params = DEFAULT_PARAMS_NOISE.copy() - params.update(DEFAULT_PARAMS_HOT_PIXEL.copy()) + params = deepcopy(DEFAULT_PARAMS_NOISE) + params.update(deepcopy(DEFAULT_PARAMS_HOT_PIXEL)) return params def _init_default_camera_params(self) -> None: """Initiate additional params for the simulated camera.""" self._all_default_model_params.update( { - self._model_lookup[ - SimulationType2D.CONSTANT.value - ]: DEFAULT_PARAMS_CAMERA_CONSTANT.copy() + self._model_lookup[SimulationType2D.CONSTANT.value]: deepcopy( + DEFAULT_PARAMS_CAMERA_CONSTANT + ) } ) self._all_default_model_params.update( { - self._model_lookup[ - SimulationType2D.GAUSSIAN.value - ]: DEFAULT_PARAMS_CAMERA_GAUSSIAN.copy() + self._model_lookup[SimulationType2D.GAUSSIAN.value]: deepcopy( + DEFAULT_PARAMS_CAMERA_GAUSSIAN + ) } )