diff --git a/ophyd_devices/__init__.py b/ophyd_devices/__init__.py index c51c5a7..383618f 100644 --- a/ophyd_devices/__init__.py +++ b/ophyd_devices/__init__.py @@ -6,7 +6,7 @@ from .npoint.npoint import NPointAxis from .rt_lamni import RtLamniMotor from .sim.sim import ( SynAxisMonitor, - SynAxisOPAAS, + SimPositioner, SynDeviceOPAAS, SynFlyer, SynSignalRO, diff --git a/ophyd_devices/sim/__init__.py b/ophyd_devices/sim/__init__.py index 3c8036c..92d8fb6 100644 --- a/ophyd_devices/sim/__init__.py +++ b/ophyd_devices/sim/__init__.py @@ -1,9 +1,11 @@ from .sim import ( SynAxisMonitor, - SynAxisOPAAS, + SimPositioner, SynDynamicComponents, SynFlyer, SynSignalRO, SynSLSDetector, ) from .sim_xtreme import SynXtremeOtf + +from .sim_signals import ReadbackSignal, SetpointSignal, IsMovingSignal diff --git a/ophyd_devices/sim/sim.py b/ophyd_devices/sim/sim.py index 23311ab..f4beb9f 100644 --- a/ophyd_devices/sim/sim.py +++ b/ophyd_devices/sim/sim.py @@ -9,9 +9,11 @@ from ophyd import Component as Cpt from ophyd import Device, DeviceStatus from ophyd import DynamicDeviceComponent as Dcpt from ophyd import OphydObject, PositionerBase, Signal -from ophyd.sim import EnumSignal, SynSignal, _ReadbackSignal, _SetpointSignal +from ophyd.sim import EnumSignal, SynSignal from ophyd.utils import LimitError, ReadOnlyError +from ophyd_devices.sim.sim_signals import ReadbackSignal, SetpointSignal, IsMovingSignal + logger = bec_logger.logger @@ -35,39 +37,6 @@ class SynSignalRO(Signal): return self._readback -class _ReadbackSignal(Signal): - def __init__(self, *args, **kwargs): - super().__init__(*args, **kwargs) - self._metadata.update( - connected=True, - write_access=False, - ) - - def get(self): - self._readback = self.parent.sim_state["readback"] - self.parent.sim_state["readback_ts"] = ttime.time() - return self._readback - - def describe(self): - res = super().describe() - # There should be only one key here, but for the sake of - # generality.... - for k in res: - res[k]["precision"] = self.parent.precision - return res - - @property - def timestamp(self): - """Timestamp of the readback value""" - return self.parent.sim_state["readback_ts"] - - def put(self, value, *, timestamp=None, force=False): - raise ReadOnlyError("The signal {} is readonly.".format(self.name)) - - def set(self, value, *, timestamp=None, force=False): - raise ReadOnlyError("The signal {} is readonly.".format(self.name)) - - class _ReadbackSignalCompute(Signal): def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) @@ -133,50 +102,6 @@ class _ReadbackSignalRand(Signal): raise ReadOnlyError("The signal {} is readonly.".format(self.name)) -class _SetpointSignal(Signal): - def put(self, value, *, timestamp=None, force=False): - self._readback = float(value) - self.parent.set(float(value)) - - def get(self): - self._readback = self.parent.sim_state["setpoint"] - return self.parent.sim_state["setpoint"] - - def describe(self): - res = super().describe() - # There should be only one key here, but for the sake of generality.... - for k in res: - res[k]["precision"] = self.parent.precision - return res - - @property - def timestamp(self): - """Timestamp of the readback value""" - return self.parent.sim_state["setpoint_ts"] - - -class _IsMovingSignal(Signal): - def put(self, value, *, timestamp=None, force=False): - self._readback = float(value) - self.parent.set(float(value)) - - def get(self): - self._readback = self.parent.sim_state["is_moving"] - return self.parent.sim_state["is_moving"] - - def describe(self): - res = super().describe() - # There should be only one key here, but for the sake of generality.... - for k in res: - res[k]["precision"] = self.parent.precision - return res - - @property - def timestamp(self): - """Timestamp of the readback value""" - return self.parent.sim_state["is_moving_ts"] - - class SynAxisMonitor(Device): """ A synthetic settable Device mimic any 1D Axis (position, temperature). @@ -578,9 +503,9 @@ class SynFlyerLamNI(Device, PositionerBase): flyer.start() -class SynAxisOPAAS(Device, PositionerBase): +class SimPositioner(Device, PositionerBase): """ - A synthetic settable Device mimic any 1D Axis (position, temperature). + A simulated device mimicing any 1D Axis device (position, temperature, rotation). Parameters ---------- @@ -602,18 +527,24 @@ class SynAxisOPAAS(Device, PositionerBase): Default is Kind.normal. See Kind for options. """ + # Specify which attributes are accessible via BEC client USER_ACCESS = ["sim_state", "readback", "speed", "dummy_controller"] - readback = Cpt(_ReadbackSignal, value=0, kind="hinted") - setpoint = Cpt(_SetpointSignal, value=0, kind="normal") - motor_is_moving = Cpt(_IsMovingSignal, value=0, kind="normal") + # Define the signals as class attributes + readback = Cpt(ReadbackSignal, value=0, kind="hinted") + setpoint = Cpt(SetpointSignal, value=0, kind="normal") + motor_is_moving = Cpt(IsMovingSignal, value=0, kind="normal") + + # Config signals velocity = Cpt(Signal, value=1, kind="config") acceleration = Cpt(Signal, value=1, kind="config") + # Ommitted signals high_limit_travel = Cpt(Signal, value=0, kind="omitted") low_limit_travel = Cpt(Signal, value=0, kind="omitted") unused = Cpt(Signal, value=1, kind="omitted") + # TODO add short description to these two lines and explain what this does SUB_READBACK = "readback" _default_sub = SUB_READBACK @@ -623,7 +554,7 @@ class SynAxisOPAAS(Device, PositionerBase): name, readback_func=None, value=0, - delay=0, + delay=1, speed=1, update_frequency=2, precision=3, @@ -638,6 +569,7 @@ class SynAxisOPAAS(Device, PositionerBase): def readback_func(x): return x + # TODO what is this, check if still needed.. sentinel = object() loop = kwargs.pop("loop", sentinel) if loop is not sentinel: @@ -646,8 +578,9 @@ class SynAxisOPAAS(Device, PositionerBase): "Your input will be ignored and may raise in the future", stacklevel=2, ) - self.sim_state = {} + self._readback_func = readback_func + # Whether motions should be instantaneous or depend on motor velocity self.delay = delay self.precision = precision self.speed = speed @@ -655,9 +588,10 @@ class SynAxisOPAAS(Device, PositionerBase): self.tolerance = kwargs.pop("tolerance", 0.05) self._stopped = False - self.dummy_controller = DummyController() + # self.dummy_controller = DummyController() - # initialize values + # initialize inner dictionary with simulated state + self.sim_state = {} self.sim_state["setpoint"] = value self.sim_state["setpoint_ts"] = ttime.time() self.sim_state["readback"] = readback_func(value) @@ -667,6 +601,7 @@ class SynAxisOPAAS(Device, PositionerBase): super().__init__(name=name, parent=parent, labels=labels, kind=kind, **kwargs) self.readback.name = self.name + # Init limits from deviceConfig if limits is not None: assert len(limits) == 2 self.low_limit_travel.put(limits[0]) @@ -674,55 +609,48 @@ class SynAxisOPAAS(Device, PositionerBase): @property def limits(self): + """Return the limits of the simulated device.""" return (self.low_limit_travel.get(), self.high_limit_travel.get()) @property def low_limit(self): + """Return the low limit of the simulated device.""" return self.limits[0] @property def high_limit(self): + """Return the high limit of the simulated device.""" return self.limits[1] - def check_value(self, pos): - """Check that the position is within the soft limits""" + def check_value(self, value: any): + """ + Check that requested position is within existing limits. + + This function has to be implemented on the top level of the positioner. + """ low_limit, high_limit = self.limits - if low_limit < high_limit and not (low_limit <= pos <= high_limit): - raise LimitError(f"position={pos} not within limits {self.limits}") + if low_limit < high_limit and not low_limit <= value <= high_limit: + raise LimitError(f"position={value} not within limits {self.limits}") - def set(self, value): + def move(self, value, **kwargs) -> DeviceStatus: + """Change the setpoint of the simulated device, and simultaneously initiated a motion.""" self._stopped = False self.check_value(value) old_setpoint = self.sim_state["setpoint"] self.sim_state["is_moving"] = 1 - self.motor_is_moving._run_subs( - sub_type=self.motor_is_moving.SUB_VALUE, - old_value=0, - value=1, - timestamp=self.sim_state["is_moving_ts"], - ) self.sim_state["setpoint"] = value self.sim_state["setpoint_ts"] = ttime.time() - self.setpoint._run_subs( - sub_type=self.setpoint.SUB_VALUE, - old_value=old_setpoint, - value=self.sim_state["setpoint"], - timestamp=self.sim_state["setpoint_ts"], - ) def update_state(val): + """Update the state of the simulated device.""" if self._stopped: raise DeviceStop old_readback = self.sim_state["readback"] self.sim_state["readback"] = val self.sim_state["readback_ts"] = ttime.time() - self.readback._run_subs( - sub_type=self.readback.SUB_VALUE, - old_value=old_readback, - value=self.sim_state["readback"], - timestamp=self.sim_state["readback_ts"], - ) + + # Run subscription on "readback" self._run_subs( sub_type=self.SUB_READBACK, old_value=old_readback, @@ -732,53 +660,59 @@ class SynAxisOPAAS(Device, PositionerBase): st = DeviceStatus(device=self) if self.delay: - + # If self.delay is not 0, we use the speed and updated frequency of the device to compute the motion def move_and_finish(): + """Move the simulated device and finish the motion.""" success = True try: + # Compute final position with some jitter move_val = self.sim_state["setpoint"] + self.tolerance * np.random.uniform( -1, 1 ) + # Compute the number of updates needed to reach the final position with the given speed updates = np.ceil( np.abs(old_setpoint - move_val) / self.speed * self.update_frequency ) + # Loop over the updates and update the state of the simulated device for ii in np.linspace(old_setpoint, move_val, int(updates)): ttime.sleep(1 / self.update_frequency) update_state(ii) + # Update the state of the simulated device to the final position update_state(move_val) self.sim_state["is_moving"] = 0 self.sim_state["is_moving_ts"] = ttime.time() - self.motor_is_moving._run_subs( - sub_type=self.motor_is_moving.SUB_VALUE, - old_value=1, - value=0, - timestamp=self.sim_state["is_moving_ts"], - ) except DeviceStop: success = False finally: self._stopped = False + # Call function from positioner base to indicate that motion finished with success self._done_moving(success=success) + # Set status to finished st.set_finished() + # Start motion in Thread threading.Thread(target=move_and_finish, daemon=True).start() - # raise TimeoutError else: + # If self.delay is 0, we move the simulated device instantaneously update_state(value) self._done_moving() st.set_finished() return st def stop(self, *, success=False): + """Stop the motion of the simulated device.""" super().stop(success=success) self._stopped = True @property def position(self): + """Return the current position of the simulated device.""" return self.readback.get() + @property def egu(self): + """Return the engineering units of the simulated device.""" return "mm" @@ -871,12 +805,12 @@ class SynDynamicComponents(Device): class SynDeviceSubOPAAS(Device): - zsub = Cpt(SynAxisOPAAS, name="zsub") + zsub = Cpt(SimPositioner, name="zsub") class SynDeviceOPAAS(Device): - x = Cpt(SynAxisOPAAS, name="x") - y = Cpt(SynAxisOPAAS, name="y") + x = Cpt(SimPositioner, name="x") + y = Cpt(SimPositioner, name="y") z = Cpt(SynDeviceSubOPAAS, name="z") diff --git a/ophyd_devices/sim/sim_signals.py b/ophyd_devices/sim/sim_signals.py new file mode 100644 index 0000000..b0287f1 --- /dev/null +++ b/ophyd_devices/sim/sim_signals.py @@ -0,0 +1,96 @@ +import time as ttime + +from bec_lib import bec_logger +from ophyd import Signal +from ophyd.utils import ReadOnlyError + +logger = bec_logger.logger + + +class ReadbackSignal(Signal): + """Readback signal for simulated devices. + + It will return the value of the readback signal based on the position + created in the sim_state dictionary of the parent device. + """ + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self._metadata.update( + connected=True, + write_access=False, + ) + + def get(self, **kwargs): + """Get the current position of the simulated device.""" + self._readback = self.parent.sim_state["readback"] + self.parent.sim_state["readback_ts"] = ttime.time() + return self._readback + + def describe(self): + """Describe the readback signal.""" + res = super().describe() + res[self.name]["precision"] = self.parent.precision + return res + + @property + def timestamp(self): + """Timestamp of the readback value""" + return self.parent.sim_state["readback_ts"] + + def put(self, value, *, timestamp=None, force=False, **kwargs): + """Put method, should raise ReadOnlyError since the signal is readonly.""" + raise ReadOnlyError(f"The signal {self.name} is readonly.") + + def set(self, value, *, timestamp=None, force=False, **kwargs): + """Set method, should raise ReadOnlyError since the signal is readonly.""" + raise ReadOnlyError(f"The signal {self.name} is readonly.") + + +class SetpointSignal(Signal): + """Setpoint signal for simulated devices. + + When read, it will return the "setpoint" key from the dictionary sim_state, + and whe put it will call the set method of the parent device with the value. + """ + + def put(self, value, *, timestamp=None, force=False, **kwargs): + """Put the value to the simulated device.""" + self._readback = float(value) + self.parent.set(float(value)) + + def get(self, **kwargs): + """Get the current setpoint of the simulated device.""" + self._readback = self.parent.sim_state["setpoint"] + return self.parent.sim_state["setpoint"] + + @property + def timestamp(self): + """Timestamp of the readback value""" + return self.parent.sim_state["setpoint_ts"] + + +class IsMovingSignal(Signal): + """IsMoving signal for simulated devices. + + When read, it will return the "is_moving" key from the dictionary sim_state, + and whe put it will call the set method of the parent device with the value. + """ + + def get(self, **kwargs): + self._readback = self.parent.sim_state["is_moving"] + self.parent.sim_state["is_moving_ts"] = ttime.time() + return self.parent.sim_state["is_moving"] + + @property + def timestamp(self): + """Timestamp of the readback value""" + return self.parent.sim_state["is_moving_ts"] + + def put(self, value, *, timestamp=None, force=False, **kwargs): + """Put method, should raise ReadOnlyError since the signal is readonly.""" + raise ReadOnlyError(f"The signal {self.name} is readonly.") + + def set(self, value, *, timestamp=None, force=False, **kwargs): + """Set method, should raise ReadOnlyError since the signal is readonly.""" + raise ReadOnlyError(f"The signal {self.name} is readonly.")