diff --git a/ophyd_devices/sim/sim.py b/ophyd_devices/sim/sim.py index 7c72f38..82ec4c4 100644 --- a/ophyd_devices/sim/sim.py +++ b/ophyd_devices/sim/sim.py @@ -2,9 +2,9 @@ from ophyd import Component as Cpt from ophyd import Device from ophyd import DynamicDeviceComponent as Dcpt -from ophyd_devices.sim.sim_positioner import SimPositionerWithCommFailure # noqa: F401 from ophyd_devices.sim.sim_positioner import SimPositioner from ophyd_devices.sim.sim_signals import SetableSignal as SynSignal +from ophyd_devices.sim.sim_test_devices import SimPositionerWithCommFailure # noqa: F401 class SynDeviceSubOPAAS(Device): diff --git a/ophyd_devices/sim/sim_positioner.py b/ophyd_devices/sim/sim_positioner.py index cc4fb2a..4633b65 100644 --- a/ophyd_devices/sim/sim_positioner.py +++ b/ophyd_devices/sim/sim_positioner.py @@ -254,17 +254,3 @@ class SimLinearTrajectoryPositioner(SimPositioner): st.set_exception(exc=exc) finally: self._set_sim_state(self.motor_is_moving.name, 0) - - -class SimPositionerWithCommFailure(SimPositioner): - fails = Cpt(SetableSignal, value=0) - - def move(self, value: float, **kwargs) -> DeviceStatus: - if self.fails.get() == 1: - raise RuntimeError("Communication failure") - if self.fails.get() == 2: - while not self._stopped: - ttime.sleep(1) - status = DeviceStatus(self) - status.set_exception(RuntimeError("Communication failure")) - return super().move(value, **kwargs) diff --git a/ophyd_devices/sim/sim_test_devices.py b/ophyd_devices/sim/sim_test_devices.py index 2be5e16..e7ed377 100644 --- a/ophyd_devices/sim/sim_test_devices.py +++ b/ophyd_devices/sim/sim_test_devices.py @@ -4,7 +4,11 @@ import time as ttime import numpy as np from bec_lib import messages from bec_lib.endpoints import MessageEndpoints -from ophyd import Device, OphydObject, PositionerBase +from ophyd import Component as Cpt +from ophyd import Device, DeviceStatus, OphydObject, PositionerBase + +from ophyd_devices.sim.sim_positioner import SimPositioner +from ophyd_devices.sim.sim_signals import SetableSignal class DummyControllerDevice(Device): @@ -149,3 +153,21 @@ class SynFlyerLamNI(Device, PositionerBase): flyer = threading.Thread(target=produce_data, args=(self, metadata)) flyer.start() + + +class SimPositionerWithCommFailure(SimPositioner): + """ + A simulated positioner that can be configured to fail on move. + """ + + fails = Cpt(SetableSignal, value=0) + + def move(self, value: float, **kwargs) -> DeviceStatus: + if self.fails.get() == 1: + raise RuntimeError("Communication failure") + if self.fails.get() == 2: + while not self._stopped: + ttime.sleep(1) + status = DeviceStatus(self) + status.set_exception(RuntimeError("Communication failure")) + return super().move(value, **kwargs)