From a49c6f6a625a576524fceca62dd0a1582a4a4a7d Mon Sep 17 00:00:00 2001 From: wakonig_k Date: Thu, 15 Aug 2024 15:16:24 +0200 Subject: [PATCH] refactor: moved sim test devices to sim_test_devices --- ophyd_devices/sim/__init__.py | 3 ++- ophyd_devices/sim/sim.py | 1 - ophyd_devices/sim/sim_positioner.py | 24 ----------------------- ophyd_devices/sim/sim_test_devices.py | 28 ++++++++++++++++++++++++++- 4 files changed, 29 insertions(+), 27 deletions(-) diff --git a/ophyd_devices/sim/__init__.py b/ophyd_devices/sim/__init__.py index 0bddf55..e83baf5 100644 --- a/ophyd_devices/sim/__init__.py +++ b/ophyd_devices/sim/__init__.py @@ -4,7 +4,8 @@ from .sim_flyer import SimFlyer SynFlyer = SimFlyer from .sim_frameworks import SlitProxy from .sim_monitor import SimMonitor -from .sim_positioner import SimPositioner, SimPositionerWithCommFailure, SimPositionerWithController +from .sim_positioner import SimPositioner from .sim_signals import ReadOnlySignal, SetableSignal +from .sim_test_devices import SimPositionerWithCommFailure, SimPositionerWithController from .sim_waveform import SimWaveform from .sim_xtreme import SynXtremeOtf diff --git a/ophyd_devices/sim/sim.py b/ophyd_devices/sim/sim.py index 7c72f38..bddb0ae 100644 --- a/ophyd_devices/sim/sim.py +++ b/ophyd_devices/sim/sim.py @@ -2,7 +2,6 @@ from ophyd import Component as Cpt from ophyd import Device from ophyd import DynamicDeviceComponent as Dcpt -from ophyd_devices.sim.sim_positioner import SimPositionerWithCommFailure # noqa: F401 from ophyd_devices.sim.sim_positioner import SimPositioner from ophyd_devices.sim.sim_signals import SetableSignal as SynSignal diff --git a/ophyd_devices/sim/sim_positioner.py b/ophyd_devices/sim/sim_positioner.py index 3cd60dd..dadb0e6 100644 --- a/ophyd_devices/sim/sim_positioner.py +++ b/ophyd_devices/sim/sim_positioner.py @@ -13,7 +13,6 @@ from typeguard import typechecked from ophyd_devices.sim.sim_data import SimulatedPositioner from ophyd_devices.sim.sim_signals import ReadOnlySignal, SetableSignal -from ophyd_devices.sim.sim_test_devices import DummyController from ophyd_devices.sim.sim_utils import LinearTrajectory, stop_trajectory from ophyd_devices.utils.errors import DeviceStopError @@ -89,7 +88,6 @@ class SimPositioner(Device, PositionerBase): self.update_frequency = update_frequency self._stopped = False - self.dummy_controller = DummyController() self.sim = self.sim_cls(parent=self, **kwargs) @@ -254,25 +252,3 @@ class SimLinearTrajectoryPositioner(SimPositioner): st.set_exception(exc=exc) finally: self._set_sim_state(self.motor_is_moving.name, 0) - - -class SimPositionerWithCommFailure(SimPositioner): - fails = Cpt(SetableSignal, value=0) - - def move(self, value: float, **kwargs) -> DeviceStatus: - if self.fails.get() == 1: - raise RuntimeError("Communication failure") - if self.fails.get() == 2: - while not self._stopped: - ttime.sleep(1) - status = DeviceStatus(self) - status.set_exception(RuntimeError("Communication failure")) - return super().move(value, **kwargs) - - -class SimPositionerWithController(SimPositioner): - USER_ACCESS = ["sim", "readback", "dummy_controller", "registered_proxies"] - - def __init__(self, *args, **kwargs): - super().__init__(*args, **kwargs) - self.controller = DummyController() diff --git a/ophyd_devices/sim/sim_test_devices.py b/ophyd_devices/sim/sim_test_devices.py index 2be5e16..84fb0a0 100644 --- a/ophyd_devices/sim/sim_test_devices.py +++ b/ophyd_devices/sim/sim_test_devices.py @@ -4,7 +4,11 @@ import time as ttime import numpy as np from bec_lib import messages from bec_lib.endpoints import MessageEndpoints -from ophyd import Device, OphydObject, PositionerBase +from ophyd import Component as Cpt +from ophyd import Device, DeviceStatus, OphydObject, PositionerBase + +from ophyd_devices.sim.sim_positioner import SimPositioner +from ophyd_devices.sim.sim_signals import SetableSignal class DummyControllerDevice(Device): @@ -149,3 +153,25 @@ class SynFlyerLamNI(Device, PositionerBase): flyer = threading.Thread(target=produce_data, args=(self, metadata)) flyer.start() + + +class SimPositionerWithCommFailure(SimPositioner): + fails = Cpt(SetableSignal, value=0) + + def move(self, value: float, **kwargs) -> DeviceStatus: + if self.fails.get() == 1: + raise RuntimeError("Communication failure") + if self.fails.get() == 2: + while not self._stopped: + ttime.sleep(1) + status = DeviceStatus(self) + status.set_exception(RuntimeError("Communication failure")) + return super().move(value, **kwargs) + + +class SimPositionerWithController(SimPositioner): + USER_ACCESS = ["sim", "readback", "dummy_controller", "registered_proxies"] + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.dummy_controller = DummyController()