refactor: moved sim test devices to sim_test_devices
This commit is contained in:
parent
87d2b22b51
commit
a49c6f6a62
@ -4,7 +4,8 @@ from .sim_flyer import SimFlyer
|
|||||||
SynFlyer = SimFlyer
|
SynFlyer = SimFlyer
|
||||||
from .sim_frameworks import SlitProxy
|
from .sim_frameworks import SlitProxy
|
||||||
from .sim_monitor import SimMonitor
|
from .sim_monitor import SimMonitor
|
||||||
from .sim_positioner import SimPositioner, SimPositionerWithCommFailure, SimPositionerWithController
|
from .sim_positioner import SimPositioner
|
||||||
from .sim_signals import ReadOnlySignal, SetableSignal
|
from .sim_signals import ReadOnlySignal, SetableSignal
|
||||||
|
from .sim_test_devices import SimPositionerWithCommFailure, SimPositionerWithController
|
||||||
from .sim_waveform import SimWaveform
|
from .sim_waveform import SimWaveform
|
||||||
from .sim_xtreme import SynXtremeOtf
|
from .sim_xtreme import SynXtremeOtf
|
||||||
|
@ -2,7 +2,6 @@ from ophyd import Component as Cpt
|
|||||||
from ophyd import Device
|
from ophyd import Device
|
||||||
from ophyd import DynamicDeviceComponent as Dcpt
|
from ophyd import DynamicDeviceComponent as Dcpt
|
||||||
|
|
||||||
from ophyd_devices.sim.sim_positioner import SimPositionerWithCommFailure # noqa: F401
|
|
||||||
from ophyd_devices.sim.sim_positioner import SimPositioner
|
from ophyd_devices.sim.sim_positioner import SimPositioner
|
||||||
from ophyd_devices.sim.sim_signals import SetableSignal as SynSignal
|
from ophyd_devices.sim.sim_signals import SetableSignal as SynSignal
|
||||||
|
|
||||||
|
@ -13,7 +13,6 @@ from typeguard import typechecked
|
|||||||
|
|
||||||
from ophyd_devices.sim.sim_data import SimulatedPositioner
|
from ophyd_devices.sim.sim_data import SimulatedPositioner
|
||||||
from ophyd_devices.sim.sim_signals import ReadOnlySignal, SetableSignal
|
from ophyd_devices.sim.sim_signals import ReadOnlySignal, SetableSignal
|
||||||
from ophyd_devices.sim.sim_test_devices import DummyController
|
|
||||||
from ophyd_devices.sim.sim_utils import LinearTrajectory, stop_trajectory
|
from ophyd_devices.sim.sim_utils import LinearTrajectory, stop_trajectory
|
||||||
from ophyd_devices.utils.errors import DeviceStopError
|
from ophyd_devices.utils.errors import DeviceStopError
|
||||||
|
|
||||||
@ -89,7 +88,6 @@ class SimPositioner(Device, PositionerBase):
|
|||||||
|
|
||||||
self.update_frequency = update_frequency
|
self.update_frequency = update_frequency
|
||||||
self._stopped = False
|
self._stopped = False
|
||||||
self.dummy_controller = DummyController()
|
|
||||||
|
|
||||||
self.sim = self.sim_cls(parent=self, **kwargs)
|
self.sim = self.sim_cls(parent=self, **kwargs)
|
||||||
|
|
||||||
@ -254,25 +252,3 @@ class SimLinearTrajectoryPositioner(SimPositioner):
|
|||||||
st.set_exception(exc=exc)
|
st.set_exception(exc=exc)
|
||||||
finally:
|
finally:
|
||||||
self._set_sim_state(self.motor_is_moving.name, 0)
|
self._set_sim_state(self.motor_is_moving.name, 0)
|
||||||
|
|
||||||
|
|
||||||
class SimPositionerWithCommFailure(SimPositioner):
|
|
||||||
fails = Cpt(SetableSignal, value=0)
|
|
||||||
|
|
||||||
def move(self, value: float, **kwargs) -> DeviceStatus:
|
|
||||||
if self.fails.get() == 1:
|
|
||||||
raise RuntimeError("Communication failure")
|
|
||||||
if self.fails.get() == 2:
|
|
||||||
while not self._stopped:
|
|
||||||
ttime.sleep(1)
|
|
||||||
status = DeviceStatus(self)
|
|
||||||
status.set_exception(RuntimeError("Communication failure"))
|
|
||||||
return super().move(value, **kwargs)
|
|
||||||
|
|
||||||
|
|
||||||
class SimPositionerWithController(SimPositioner):
|
|
||||||
USER_ACCESS = ["sim", "readback", "dummy_controller", "registered_proxies"]
|
|
||||||
|
|
||||||
def __init__(self, *args, **kwargs):
|
|
||||||
super().__init__(*args, **kwargs)
|
|
||||||
self.controller = DummyController()
|
|
||||||
|
@ -4,7 +4,11 @@ import time as ttime
|
|||||||
import numpy as np
|
import numpy as np
|
||||||
from bec_lib import messages
|
from bec_lib import messages
|
||||||
from bec_lib.endpoints import MessageEndpoints
|
from bec_lib.endpoints import MessageEndpoints
|
||||||
from ophyd import Device, OphydObject, PositionerBase
|
from ophyd import Component as Cpt
|
||||||
|
from ophyd import Device, DeviceStatus, OphydObject, PositionerBase
|
||||||
|
|
||||||
|
from ophyd_devices.sim.sim_positioner import SimPositioner
|
||||||
|
from ophyd_devices.sim.sim_signals import SetableSignal
|
||||||
|
|
||||||
|
|
||||||
class DummyControllerDevice(Device):
|
class DummyControllerDevice(Device):
|
||||||
@ -149,3 +153,25 @@ class SynFlyerLamNI(Device, PositionerBase):
|
|||||||
|
|
||||||
flyer = threading.Thread(target=produce_data, args=(self, metadata))
|
flyer = threading.Thread(target=produce_data, args=(self, metadata))
|
||||||
flyer.start()
|
flyer.start()
|
||||||
|
|
||||||
|
|
||||||
|
class SimPositionerWithCommFailure(SimPositioner):
|
||||||
|
fails = Cpt(SetableSignal, value=0)
|
||||||
|
|
||||||
|
def move(self, value: float, **kwargs) -> DeviceStatus:
|
||||||
|
if self.fails.get() == 1:
|
||||||
|
raise RuntimeError("Communication failure")
|
||||||
|
if self.fails.get() == 2:
|
||||||
|
while not self._stopped:
|
||||||
|
ttime.sleep(1)
|
||||||
|
status = DeviceStatus(self)
|
||||||
|
status.set_exception(RuntimeError("Communication failure"))
|
||||||
|
return super().move(value, **kwargs)
|
||||||
|
|
||||||
|
|
||||||
|
class SimPositionerWithController(SimPositioner):
|
||||||
|
USER_ACCESS = ["sim", "readback", "dummy_controller", "registered_proxies"]
|
||||||
|
|
||||||
|
def __init__(self, *args, **kwargs):
|
||||||
|
super().__init__(*args, **kwargs)
|
||||||
|
self.dummy_controller = DummyController()
|
||||||
|
Loading…
x
Reference in New Issue
Block a user