From 96a5f1b86a17f099a935c460baf397d9c93bc612 Mon Sep 17 00:00:00 2001 From: appel_c Date: Fri, 9 Feb 2024 19:21:28 +0100 Subject: [PATCH] refactor: refactored SimMonitor and SimCamera --- ophyd_devices/sim/sim.py | 270 +++++++++++---------------------------- 1 file changed, 74 insertions(+), 196 deletions(-) diff --git a/ophyd_devices/sim/sim.py b/ophyd_devices/sim/sim.py index eb9156d..a0dd91f 100644 --- a/ophyd_devices/sim/sim.py +++ b/ophyd_devices/sim/sim.py @@ -1,4 +1,3 @@ -from collections import defaultdict import os import threading import time as ttime @@ -9,10 +8,12 @@ from bec_lib import MessageEndpoints, bec_logger, messages from ophyd import Component as Cpt from ophyd import Device, DeviceStatus, Kind from ophyd import DynamicDeviceComponent as Dcpt -from ophyd import OphydObject, PositionerBase, Signal -from ophyd.sim import EnumSignal, SynSignal -from ophyd.utils import LimitError, ReadOnlyError +from ophyd import PositionerBase, Signal +from ophyd.sim import SynSignal +from ophyd.utils import LimitError +from ophyd_devices.epics.devices.bec_scaninfo_mixin import BecScaninfoMixin from ophyd_devices.sim.sim_data import SimulatedDataBase, SimulatedDataCamera, SimulatedDataMonitor +from ophyd_devices.sim.sim_additional_devices import DummyController from ophyd_devices.sim.sim_signals import SetableSignal, ReadOnlySignal, ComputedReadOnlySignal @@ -27,21 +28,20 @@ class SimMonitor(Device): """ A simulated device mimic any 1D Axis (position, temperature, beam). - Readback functionality can be configured + It's readback is a computed signal, which is configurable by the user and from the command line. + The corresponding simulation class is sim_cls=SimulatedDataMonitor, more details on defaults within the simulation class. + + >>> monitor = SimMonitor(name="monitor") Parameters ---------- - name : string, keyword only - value : object, optional - The initial value. Default is 0. - delay : number, optional - Simulates how long it takes the device to "move". Default is 0 seconds. - precision : integer, optional - Digits of precision. Default is 3. - parent : Device, optional - Used internally if this Signal is made part of a larger Device. - kind : a member the Kind IntEnum (or equivalent integer), optional - Default is Kind.normal. See Kind for options. + name (string) : Name of the device. This is the only required argmuent, passed on to all signals of the device. + precision (integer) : Precision of the readback in digits, written to .describe(). Default is 3 digits. + sim_init (dict) : Dictionary to initiate parameters of the simulation, check simulation type defaults for more details. + parent : Parent device, optional, is used internally if this signal/device is part of a larger device. + kind : A member the Kind IntEnum (or equivalent integer), optional. Default is Kind.normal. See Kind for options. + device_manager : DeviceManager from BEC, optional . Within startup of simulation, device_manager is passed on automatically. + """ USER_ACCESS = ["sim"] @@ -55,130 +55,43 @@ class SimMonitor(Device): def __init__( self, - *, name, - value=0, - delay=0, - precision=3, - tolerance: float = 0.5, + *, + precision: int = 3, sim_init: dict = None, parent=None, - labels=None, kind=None, device_manager=None, **kwargs, ): - self.delay = delay self.precision = precision - self.tolerance = tolerance self.init_sim_params = sim_init self.sim = self.sim_cls(parent=self, device_manager=device_manager, **kwargs) - super().__init__(name=name, parent=parent, labels=labels, kind=kind, **kwargs) + super().__init__(name=name, parent=parent, kind=kind, **kwargs) self.sim.sim_state[self.name] = self.sim.sim_state.pop(self.readback.name, None) self.readback.name = self.name -class SynGaussBEC(Device): - """ - Evaluate a point on a Gaussian based on the value of a motor. +class SimCamera(Device): + """A simulated device mimic any 2D camera. + + It's image is a computed signal, which is configurable by the user and from the command line. + The corresponding simulation class is sim_cls=SimulatedDataCamera, more details on defaults within the simulation class. + + >>> camera = SimCamera(name="camera") Parameters ---------- - name : string - motor : Device - motor_field : string - center : number - center of peak - Imax : number - max intensity of peak - sigma : number, optional - Default is 1. - noise : {'poisson', 'uniform', None}, optional - Add noise to the gaussian peak. - noise_multiplier : float, optional - Only relevant for 'uniform' noise. Multiply the random amount of - noise by 'noise_multiplier' - random_state : numpy random state object, optional - np.random.RandomState(0), to generate random number with given seed + name (string) : Name of the device. This is the only required argmuent, passed on to all signals of the device. + precision (integer) : Precision of the readback in digits, written to .describe(). Default is 3 digits. + sim_init (dict) : Dictionary to initiate parameters of the simulation, check simulation type defaults for more details. + parent : Parent device, optional, is used internally if this signal/device is part of a larger device. + kind : A member the Kind IntEnum (or equivalent integer), optional. Default is Kind.normal. See Kind for options. + device_manager : DeviceManager from BEC, optional . Within startup of simulation, device_manager is passed on automatically. - Example - ------- - motor = SynAxis(name='motor') - det = SynGauss('det', motor, 'motor', center=0, Imax=1, sigma=1) """ - val = Cpt(ComputedReadOnlySignal, value=0, kind=Kind.hinted) - Imax = Cpt(Signal, value=10, kind=Kind.config) - center = Cpt(Signal, value=0, kind=Kind.config) - sigma = Cpt(Signal, value=1, kind=Kind.config) - motor = Cpt(Signal, value="samx", kind=Kind.config) - noise = Cpt( - EnumSignal, - value="none", - kind=Kind.config, - enum_strings=("none", "poisson", "uniform"), - ) - noise_multiplier = Cpt(Signal, value=1, kind=Kind.config) - - def __init__(self, name, *, device_manager=None, random_state=None, **kwargs): - self.device_manager = device_manager - set_later = {} - for k in ("sigma", "noise", "noise_multiplier"): - v = kwargs.pop(k, None) - if v is not None: - set_later[k] = v - self.sim_state = defaultdict(lambda: {}) - super().__init__(name=name, **kwargs) - self.sim_state[self.name] = self.sim_state.pop(self.val.name, None) - self.val.name = self.name - - self.random_state = random_state or np.random - self.precision = 3 - - for k, v in set_later.items(): - getattr(self, k).put(v) - - def _compute_sim_state(self, signal_name: str) -> None: - try: - m = self.device_manager.devices[self.motor.get()].obj.read()[self.motor.get()]["value"] - # we need to do this one at a time because - # - self.read() may be screwed with by the user - # - self.get() would cause infinite recursion - Imax = self.Imax.get() - center = self.center.get() - sigma = self.sigma.get() - noise = self.noise.get() - noise_multiplier = self.noise_multiplier.get() - v = Imax * np.exp(-((m - center) ** 2) / (2 * sigma**2)) - if noise == "poisson": - v = int(self.random_state.poisson(np.round(v), 1)) - elif noise == "uniform": - v += self.random_state.uniform(-1, 1) * noise_multiplier - self.sim_state[signal_name]["value"] = v - self.sim_state[signal_name]["timestamp"] = ttime.time() - except Exception as exc: - logger.warning(f"Failed to compute sim state with exception {exc}") - self.sim_state[signal_name]["value"] = 0 - self.sim_state[signal_name]["timestamp"] = ttime.time() - - def get(self, *args, **kwargs): - self.sim_state["readback"] = self._compute() - self.sim_state["readback_ts"] = ttime.time() - return self.val.get() - - -class _SLSDetectorConfigSignal(Signal): - def put(self, value, *, timestamp=None, force=False): - self._readback = value - self.parent.sim_state[self.name] = value - - def get(self): - self._readback = self.parent.sim_state[self.name] - return self.parent.sim_state[self.name] - - -class SimCamera(Device): USER_ACCESS = ["sim"] sim_cls = SimulatedDataCamera @@ -189,12 +102,8 @@ class SimCamera(Device): exp_time = Cpt(SetableSignal, name="exp_time", value=1, kind=Kind.config) file_path = Cpt(SetableSignal, name="file_path", value="", kind=Kind.config) - file_pattern = Cpt(SetableSignal, name="file_pattern", value="", kind=Kind.config) frames = Cpt(SetableSignal, name="frames", value=1, kind=Kind.config) - burst = Cpt(SetableSignal, name="burst", value=1, kind=Kind.config) - save_file = Cpt(SetableSignal, name="save_file", value=False, kind=Kind.config) - # image shape, only adjustable via config. image_shape = Cpt(SetableSignal, name="image_shape", value=SHAPE, kind=Kind.config) image = Cpt( ComputedReadOnlySignal, @@ -205,8 +114,8 @@ class SimCamera(Device): def __init__( self, - *, name, + *, kind=None, parent=None, sim_init: dict = None, @@ -219,10 +128,17 @@ class SimCamera(Device): super().__init__(name=name, parent=parent, kind=kind, **kwargs) self._stopped = False - self.file_name = "" - self.metadata = {} + self.scaninfo = None + self._update_scaninfo() - def trigger(self): + def trigger(self) -> DeviceStatus: + """Trigger the camera to acquire images. + + This method can be called from BEC during a scan. It will acquire images and send them to BEC. + Whether the trigger is send from BEC is determined by the softwareTrigger argument in the device config. + + Here, we also run a callback on SUB_MONITOR to send the image data the device_monitor endpoint in BEC. + """ status = DeviceStatus(self) self.subscribe(status._finished, event_type=self.SUB_ACQ_DONE, run=False) @@ -230,7 +146,6 @@ class SimCamera(Device): def acquire(): try: for _ in range(self.burst.get()): - # Send data for each trigger self._run_subs(sub_type=self.SUB_MONITOR, value=self.image.get()) if self._stopped: raise DeviceStop @@ -243,28 +158,41 @@ class SimCamera(Device): threading.Thread(target=acquire, daemon=True).start() return status - def stage(self) -> list[object]: - """Stage the camera - - Receive scan message from REDIS first, extract relevant scan data, - and set all signals for the scan, e.g. scan_number, file_name, frames, etc. + def _update_scaninfo(self) -> None: + """Update scaninfo from BecScaninfoMixing + This depends on device manager and operation/sim_mode """ - msg = self.device_manager.producer.get(MessageEndpoints.scan_status()) - scan_msg = messages.ScanStatusMessage.loads(msg) - self.metadata = { - "scanID": scan_msg.content["scanID"], - "RID": scan_msg.content["info"]["RID"], - "queueID": scan_msg.content["info"]["queueID"], - } - scan_number = scan_msg.content["info"]["scan_number"] - self.frames.set(scan_msg.content["info"]["num_points"]) - self.file_name = os.path.join( - self.file_path.get(), self.file_pattern.get().format(scan_number) + self.scaninfo = BecScaninfoMixin(self.device_manager) + + def stage(self) -> list[object]: + """Stage the camera for upcoming scan + + This method is called from BEC in preparation of a scan. + It receives metadata about the scan from BEC, + compiles it and prepares the camera for the scan. + + FYI: No data is written to disk in the simulation, but upon each trigger it + is published to the device_monitor endpoint in REDIS. + """ + if self._staged: + return super().stag e() + self.scaninfo.load_scan_metadata() + self.file_path.set( + os.path.join( + self.file_path.get(), self.file_pattern.get().format(self.scaninfo.scan_number) + ) ) + self.frames = self.scaninfo.num_points * self.scaninfo.frames_per_trigger + self.exp_time = self.scaninfo.exp_time self._stopped = False return super().stage() def _send_data_to_bec(self) -> None: + """Send data to BEC. + + Reads out all signals of type Kind.config, and send them to BEC. + Happens once for each scan. + """ config_readout = { signal.item.name: signal.item.get() for signal in self.walk_signals() @@ -272,7 +200,7 @@ class SimCamera(Device): } signals = {"config": config_readout, "data": self.file_name} - msg = messages.DeviceMessage(signals=signals, metadata=self.metadata) + msg = messages.DeviceMessage(signals=signals, metadata=self.scaninfo.metadata) self.device_manager.producer.set_and_publish( MessageEndpoints.device_read(self.name), msg.dumps() ) @@ -289,53 +217,11 @@ class SimCamera(Device): return super().unstage() def stop(self, *, success=False): + """Stop the device""" self._stopped = True super().stop(success=success) -class DummyController: - USER_ACCESS = [ - "some_var", - "controller_show_all", - "_func_with_args", - "_func_with_args_and_kwargs", - "_func_with_kwargs", - "_func_without_args_kwargs", - ] - some_var = 10 - another_var = 20 - - def on(self): - self._connected = True - - def off(self): - self._connected = False - - def _func_with_args(self, *args): - return args - - def _func_with_args_and_kwargs(self, *args, **kwargs): - return args, kwargs - - def _func_with_kwargs(self, **kwargs): - return kwargs - - def _func_without_args_kwargs(self): - return None - - def controller_show_all(self): - """dummy controller show all - - Raises: - in: _description_ - LimitError: _description_ - - Returns: - _type_: _description_ - """ - print(self.some_var) - - class DummyControllerDevice(Device): USER_ACCESS = ["controller"] @@ -439,14 +325,6 @@ class SynFlyer(Device, PositionerBase): flyer.start() -class SynController(OphydObject): - def on(self): - pass - - def off(self): - pass - - class SynFlyerLamNI(Device, PositionerBase): def __init__( self,