fix: fixed sim test positioner import

This commit is contained in:
2024-08-13 11:41:06 +02:00
parent c560930ce3
commit 287b6588fc
3 changed files with 24 additions and 16 deletions

View File

@ -2,9 +2,9 @@ from ophyd import Component as Cpt
from ophyd import Device
from ophyd import DynamicDeviceComponent as Dcpt
from ophyd_devices.sim.sim_positioner import SimPositionerWithCommFailure # noqa: F401
from ophyd_devices.sim.sim_positioner import SimPositioner
from ophyd_devices.sim.sim_signals import SetableSignal as SynSignal
from ophyd_devices.sim.sim_test_devices import SimPositionerWithCommFailure # noqa: F401
class SynDeviceSubOPAAS(Device):

View File

@ -254,17 +254,3 @@ class SimLinearTrajectoryPositioner(SimPositioner):
st.set_exception(exc=exc)
finally:
self._set_sim_state(self.motor_is_moving.name, 0)
class SimPositionerWithCommFailure(SimPositioner):
fails = Cpt(SetableSignal, value=0)
def move(self, value: float, **kwargs) -> DeviceStatus:
if self.fails.get() == 1:
raise RuntimeError("Communication failure")
if self.fails.get() == 2:
while not self._stopped:
ttime.sleep(1)
status = DeviceStatus(self)
status.set_exception(RuntimeError("Communication failure"))
return super().move(value, **kwargs)

View File

@ -4,7 +4,11 @@ import time as ttime
import numpy as np
from bec_lib import messages
from bec_lib.endpoints import MessageEndpoints
from ophyd import Device, OphydObject, PositionerBase
from ophyd import Component as Cpt
from ophyd import Device, DeviceStatus, OphydObject, PositionerBase
from ophyd_devices.sim.sim_positioner import SimPositioner
from ophyd_devices.sim.sim_signals import SetableSignal
class DummyControllerDevice(Device):
@ -149,3 +153,21 @@ class SynFlyerLamNI(Device, PositionerBase):
flyer = threading.Thread(target=produce_data, args=(self, metadata))
flyer.start()
class SimPositionerWithCommFailure(SimPositioner):
"""
A simulated positioner that can be configured to fail on move.
"""
fails = Cpt(SetableSignal, value=0)
def move(self, value: float, **kwargs) -> DeviceStatus:
if self.fails.get() == 1:
raise RuntimeError("Communication failure")
if self.fails.get() == 2:
while not self._stopped:
ttime.sleep(1)
status = DeviceStatus(self)
status.set_exception(RuntimeError("Communication failure"))
return super().move(value, **kwargs)