From f5ab78e933c2bbb34c571a72c25a7fc5c2b20e65 Mon Sep 17 00:00:00 2001 From: appel_c Date: Thu, 17 Oct 2024 17:46:58 +0200 Subject: [PATCH] feat: add test device for return status for stage/unstage --- ophyd_devices/sim/sim_data.py | 4 ++ ophyd_devices/sim/sim_test_devices.py | 64 +++++++++++++++++++++++++++ 2 files changed, 68 insertions(+) diff --git a/ophyd_devices/sim/sim_data.py b/ophyd_devices/sim/sim_data.py index f5532cd..83df2eb 100644 --- a/ophyd_devices/sim/sim_data.py +++ b/ophyd_devices/sim/sim_data.py @@ -597,6 +597,10 @@ class SimulatedDataCamera(SimulatedDataBase): method = "_compute_constant" elif self._model == SimulationType2D.GAUSSIAN: method = "_compute_gaussian" + else: + raise SimulatedDataException( + f"Model {self._model} not found in {self._model_lookup.keys()}." + ) value = self.execute_simulation_method( signal_name=signal_name, method=getattr(self, method) ) diff --git a/ophyd_devices/sim/sim_test_devices.py b/ophyd_devices/sim/sim_test_devices.py index 84fb0a0..9a51edc 100644 --- a/ophyd_devices/sim/sim_test_devices.py +++ b/ophyd_devices/sim/sim_test_devices.py @@ -1,15 +1,20 @@ import threading import time as ttime +import traceback import numpy as np from bec_lib import messages from bec_lib.endpoints import MessageEndpoints +from bec_lib.logger import bec_logger from ophyd import Component as Cpt from ophyd import Device, DeviceStatus, OphydObject, PositionerBase +from ophyd_devices.sim.sim_camera import SimCamera from ophyd_devices.sim.sim_positioner import SimPositioner from ophyd_devices.sim.sim_signals import SetableSignal +logger = bec_logger.logger + class DummyControllerDevice(Device): USER_ACCESS = ["controller"] @@ -175,3 +180,62 @@ class SimPositionerWithController(SimPositioner): def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.dummy_controller = DummyController() + + +class SimCameraWithStageStatus(SimCamera): + """Simulated camera which returns a status object when staged. + + Note: This is a minimum implementation for test purposes without refactoring + the super().stage() method. In theory, the super().stage() method should take + into account a thread event to stop the staging process if the device is stopped. + """ + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.raise_on_stage = False + if "raise_on_stage" in kwargs: + self.raise_on_stage = kwargs.pop("raise_on_stage") + + def stage(self): + status = DeviceStatus(self) + + def _stage_device(obj, status: DeviceStatus): + """Start thread to stage the device""" + try: + logger.info(f"Staging device {obj.name} with status object") + super(SimCamera, obj).stage() + if obj.raise_on_stage is True: + raise RuntimeError(f"Error during staging in {obj.name}") + # pylint: disable=broad-except + except Exception as exc: + content = traceback.format_exc() + logger.warning(f"Error in staging of {obj.name}; Traceback: {content}") + status.set_exception(exc=exc) + else: + status.set_finished() + + thread = threading.Thread(target=_stage_device, args=(self, status), daemon=True) + thread.start() + return status + + def unstage(self): + status = DeviceStatus(self) + + def _unstage_device(obj, status: DeviceStatus): + """Start thread to stage the device""" + try: + logger.info(f"Unstaging device {obj.name} with status object") + super(SimCamera, obj).unstage() + if obj.raise_on_stage is True: + raise RuntimeError(f"Error during unstaging in {obj.name}") + # pylint: disable=broad-except + except Exception as exc: + content = traceback.format_exc() + logger.warning(f"Error in unstaging of {obj.name}; Traceback: {content}") + status.set_exception(exc=exc) + else: + status.set_finished() + + thread = threading.Thread(target=_unstage_device, args=(self, status), daemon=True) + thread.start() + return status