From 5c02e1ecaea2b282e838fcea13c0e18d9beeb10e Mon Sep 17 00:00:00 2001 From: appel_c Date: Tue, 21 Jan 2025 15:23:37 +0100 Subject: [PATCH] feat: add sim device for tests that returns device status for stage/unstage --- ophyd_devices/sim/sim_test_devices.py | 52 ++++++++++++++++++++++++++- 1 file changed, 51 insertions(+), 1 deletion(-) diff --git a/ophyd_devices/sim/sim_test_devices.py b/ophyd_devices/sim/sim_test_devices.py index 9d989d0..88353bd 100644 --- a/ophyd_devices/sim/sim_test_devices.py +++ b/ophyd_devices/sim/sim_test_devices.py @@ -7,7 +7,7 @@ from bec_lib import messages from bec_lib.endpoints import MessageEndpoints from bec_lib.logger import bec_logger from ophyd import Component as Cpt -from ophyd import Device, DeviceStatus, OphydObject, PositionerBase +from ophyd import Device, DeviceStatus, OphydObject, PositionerBase, Staged from ophyd_devices.sim.sim_camera import SimCamera from ophyd_devices.sim.sim_positioner import SimPositioner @@ -72,6 +72,56 @@ class DummyController: print(self.some_var) +class SimDeviceWithStatusStageUnstage(Device): + """SimDevice with stage and unstage methods that return a status object. + + Methods resolve once the stage_thread_event or unstage_thread_event is set. + Stop always resolves immediately. + """ + + def __init__(self, name, **kwargs): + super().__init__(name=name, **kwargs) + self.stage_thread = None + self.stage_thread_event = None + self.stopped = False + + def stage(self) -> DeviceStatus: + """Stage the device and return a status object.""" + self.stopped = False + if self.stage_thread is not None: + self.stage_thread_event.set() + self.stage_thread.join() + self.stage_thread_event = threading.Event() + status = DeviceStatus(self) + + def stage_device(status: DeviceStatus): + + self.stage_thread_event.wait() + + if self.stopped is True: + exc = RuntimeError(f"Device {self.name} was stopped") + status.set_exception(exc) + else: + self._staged = Staged.yes + status.set_finished() + + self.stage_thread = threading.Thread(target=stage_device, args=(status,)) + self.stage_thread.start() + return status + + def unstage(self) -> DeviceStatus: + """Unstage the device and return a status object.""" + self.stopped = False + return super().unstage() + + def stop(self, success: bool = False): + """Stop the device and set the stopped flag.""" + self.stopped = True + if self.stage_thread_event: + self.stage_thread_event.set() + super().stop(success=success) + + class SynController(OphydObject): def on(self): pass