feat: add sim device for tests that returns device status for stage/unstage

This commit is contained in:
appel_c 2025-01-21 15:23:37 +01:00
parent b2e09ca70d
commit 5c02e1ecae

View File

@ -7,7 +7,7 @@ from bec_lib import messages
from bec_lib.endpoints import MessageEndpoints
from bec_lib.logger import bec_logger
from ophyd import Component as Cpt
from ophyd import Device, DeviceStatus, OphydObject, PositionerBase
from ophyd import Device, DeviceStatus, OphydObject, PositionerBase, Staged
from ophyd_devices.sim.sim_camera import SimCamera
from ophyd_devices.sim.sim_positioner import SimPositioner
@ -72,6 +72,56 @@ class DummyController:
print(self.some_var)
class SimDeviceWithStatusStageUnstage(Device):
"""SimDevice with stage and unstage methods that return a status object.
Methods resolve once the stage_thread_event or unstage_thread_event is set.
Stop always resolves immediately.
"""
def __init__(self, name, **kwargs):
super().__init__(name=name, **kwargs)
self.stage_thread = None
self.stage_thread_event = None
self.stopped = False
def stage(self) -> DeviceStatus:
"""Stage the device and return a status object."""
self.stopped = False
if self.stage_thread is not None:
self.stage_thread_event.set()
self.stage_thread.join()
self.stage_thread_event = threading.Event()
status = DeviceStatus(self)
def stage_device(status: DeviceStatus):
self.stage_thread_event.wait()
if self.stopped is True:
exc = RuntimeError(f"Device {self.name} was stopped")
status.set_exception(exc)
else:
self._staged = Staged.yes
status.set_finished()
self.stage_thread = threading.Thread(target=stage_device, args=(status,))
self.stage_thread.start()
return status
def unstage(self) -> DeviceStatus:
"""Unstage the device and return a status object."""
self.stopped = False
return super().unstage()
def stop(self, success: bool = False):
"""Stop the device and set the stopped flag."""
self.stopped = True
if self.stage_thread_event:
self.stage_thread_event.set()
super().stop(success=success)
class SynController(OphydObject):
def on(self):
pass