mirror of
https://github.com/bec-project/ophyd_devices.git
synced 2026-01-16 15:29:20 +01:00
refactor: renamed SynAxisOPPAS to SimPositioner; moved readback/setpoint/ismoving signal to sim_signals; closes 27
This commit is contained in:
@@ -6,7 +6,7 @@ from .npoint.npoint import NPointAxis
|
|||||||
from .rt_lamni import RtLamniMotor
|
from .rt_lamni import RtLamniMotor
|
||||||
from .sim.sim import (
|
from .sim.sim import (
|
||||||
SynAxisMonitor,
|
SynAxisMonitor,
|
||||||
SynAxisOPAAS,
|
SimPositioner,
|
||||||
SynDeviceOPAAS,
|
SynDeviceOPAAS,
|
||||||
SynFlyer,
|
SynFlyer,
|
||||||
SynSignalRO,
|
SynSignalRO,
|
||||||
|
|||||||
@@ -1,9 +1,11 @@
|
|||||||
from .sim import (
|
from .sim import (
|
||||||
SynAxisMonitor,
|
SynAxisMonitor,
|
||||||
SynAxisOPAAS,
|
SimPositioner,
|
||||||
SynDynamicComponents,
|
SynDynamicComponents,
|
||||||
SynFlyer,
|
SynFlyer,
|
||||||
SynSignalRO,
|
SynSignalRO,
|
||||||
SynSLSDetector,
|
SynSLSDetector,
|
||||||
)
|
)
|
||||||
from .sim_xtreme import SynXtremeOtf
|
from .sim_xtreme import SynXtremeOtf
|
||||||
|
|
||||||
|
from .sim_signals import ReadbackSignal, SetpointSignal, IsMovingSignal
|
||||||
|
|||||||
@@ -9,9 +9,11 @@ from ophyd import Component as Cpt
|
|||||||
from ophyd import Device, DeviceStatus
|
from ophyd import Device, DeviceStatus
|
||||||
from ophyd import DynamicDeviceComponent as Dcpt
|
from ophyd import DynamicDeviceComponent as Dcpt
|
||||||
from ophyd import OphydObject, PositionerBase, Signal
|
from ophyd import OphydObject, PositionerBase, Signal
|
||||||
from ophyd.sim import EnumSignal, SynSignal, _ReadbackSignal, _SetpointSignal
|
from ophyd.sim import EnumSignal, SynSignal
|
||||||
from ophyd.utils import LimitError, ReadOnlyError
|
from ophyd.utils import LimitError, ReadOnlyError
|
||||||
|
|
||||||
|
from ophyd_devices.sim.sim_signals import ReadbackSignal, SetpointSignal, IsMovingSignal
|
||||||
|
|
||||||
logger = bec_logger.logger
|
logger = bec_logger.logger
|
||||||
|
|
||||||
|
|
||||||
@@ -35,39 +37,6 @@ class SynSignalRO(Signal):
|
|||||||
return self._readback
|
return self._readback
|
||||||
|
|
||||||
|
|
||||||
class _ReadbackSignal(Signal):
|
|
||||||
def __init__(self, *args, **kwargs):
|
|
||||||
super().__init__(*args, **kwargs)
|
|
||||||
self._metadata.update(
|
|
||||||
connected=True,
|
|
||||||
write_access=False,
|
|
||||||
)
|
|
||||||
|
|
||||||
def get(self):
|
|
||||||
self._readback = self.parent.sim_state["readback"]
|
|
||||||
self.parent.sim_state["readback_ts"] = ttime.time()
|
|
||||||
return self._readback
|
|
||||||
|
|
||||||
def describe(self):
|
|
||||||
res = super().describe()
|
|
||||||
# There should be only one key here, but for the sake of
|
|
||||||
# generality....
|
|
||||||
for k in res:
|
|
||||||
res[k]["precision"] = self.parent.precision
|
|
||||||
return res
|
|
||||||
|
|
||||||
@property
|
|
||||||
def timestamp(self):
|
|
||||||
"""Timestamp of the readback value"""
|
|
||||||
return self.parent.sim_state["readback_ts"]
|
|
||||||
|
|
||||||
def put(self, value, *, timestamp=None, force=False):
|
|
||||||
raise ReadOnlyError("The signal {} is readonly.".format(self.name))
|
|
||||||
|
|
||||||
def set(self, value, *, timestamp=None, force=False):
|
|
||||||
raise ReadOnlyError("The signal {} is readonly.".format(self.name))
|
|
||||||
|
|
||||||
|
|
||||||
class _ReadbackSignalCompute(Signal):
|
class _ReadbackSignalCompute(Signal):
|
||||||
def __init__(self, *args, **kwargs):
|
def __init__(self, *args, **kwargs):
|
||||||
super().__init__(*args, **kwargs)
|
super().__init__(*args, **kwargs)
|
||||||
@@ -133,50 +102,6 @@ class _ReadbackSignalRand(Signal):
|
|||||||
raise ReadOnlyError("The signal {} is readonly.".format(self.name))
|
raise ReadOnlyError("The signal {} is readonly.".format(self.name))
|
||||||
|
|
||||||
|
|
||||||
class _SetpointSignal(Signal):
|
|
||||||
def put(self, value, *, timestamp=None, force=False):
|
|
||||||
self._readback = float(value)
|
|
||||||
self.parent.set(float(value))
|
|
||||||
|
|
||||||
def get(self):
|
|
||||||
self._readback = self.parent.sim_state["setpoint"]
|
|
||||||
return self.parent.sim_state["setpoint"]
|
|
||||||
|
|
||||||
def describe(self):
|
|
||||||
res = super().describe()
|
|
||||||
# There should be only one key here, but for the sake of generality....
|
|
||||||
for k in res:
|
|
||||||
res[k]["precision"] = self.parent.precision
|
|
||||||
return res
|
|
||||||
|
|
||||||
@property
|
|
||||||
def timestamp(self):
|
|
||||||
"""Timestamp of the readback value"""
|
|
||||||
return self.parent.sim_state["setpoint_ts"]
|
|
||||||
|
|
||||||
|
|
||||||
class _IsMovingSignal(Signal):
|
|
||||||
def put(self, value, *, timestamp=None, force=False):
|
|
||||||
self._readback = float(value)
|
|
||||||
self.parent.set(float(value))
|
|
||||||
|
|
||||||
def get(self):
|
|
||||||
self._readback = self.parent.sim_state["is_moving"]
|
|
||||||
return self.parent.sim_state["is_moving"]
|
|
||||||
|
|
||||||
def describe(self):
|
|
||||||
res = super().describe()
|
|
||||||
# There should be only one key here, but for the sake of generality....
|
|
||||||
for k in res:
|
|
||||||
res[k]["precision"] = self.parent.precision
|
|
||||||
return res
|
|
||||||
|
|
||||||
@property
|
|
||||||
def timestamp(self):
|
|
||||||
"""Timestamp of the readback value"""
|
|
||||||
return self.parent.sim_state["is_moving_ts"]
|
|
||||||
|
|
||||||
|
|
||||||
class SynAxisMonitor(Device):
|
class SynAxisMonitor(Device):
|
||||||
"""
|
"""
|
||||||
A synthetic settable Device mimic any 1D Axis (position, temperature).
|
A synthetic settable Device mimic any 1D Axis (position, temperature).
|
||||||
@@ -578,9 +503,9 @@ class SynFlyerLamNI(Device, PositionerBase):
|
|||||||
flyer.start()
|
flyer.start()
|
||||||
|
|
||||||
|
|
||||||
class SynAxisOPAAS(Device, PositionerBase):
|
class SimPositioner(Device, PositionerBase):
|
||||||
"""
|
"""
|
||||||
A synthetic settable Device mimic any 1D Axis (position, temperature).
|
A simulated device mimicing any 1D Axis device (position, temperature, rotation).
|
||||||
|
|
||||||
Parameters
|
Parameters
|
||||||
----------
|
----------
|
||||||
@@ -602,18 +527,24 @@ class SynAxisOPAAS(Device, PositionerBase):
|
|||||||
Default is Kind.normal. See Kind for options.
|
Default is Kind.normal. See Kind for options.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
# Specify which attributes are accessible via BEC client
|
||||||
USER_ACCESS = ["sim_state", "readback", "speed", "dummy_controller"]
|
USER_ACCESS = ["sim_state", "readback", "speed", "dummy_controller"]
|
||||||
readback = Cpt(_ReadbackSignal, value=0, kind="hinted")
|
|
||||||
setpoint = Cpt(_SetpointSignal, value=0, kind="normal")
|
|
||||||
motor_is_moving = Cpt(_IsMovingSignal, value=0, kind="normal")
|
|
||||||
|
|
||||||
|
# Define the signals as class attributes
|
||||||
|
readback = Cpt(ReadbackSignal, value=0, kind="hinted")
|
||||||
|
setpoint = Cpt(SetpointSignal, value=0, kind="normal")
|
||||||
|
motor_is_moving = Cpt(IsMovingSignal, value=0, kind="normal")
|
||||||
|
|
||||||
|
# Config signals
|
||||||
velocity = Cpt(Signal, value=1, kind="config")
|
velocity = Cpt(Signal, value=1, kind="config")
|
||||||
acceleration = Cpt(Signal, value=1, kind="config")
|
acceleration = Cpt(Signal, value=1, kind="config")
|
||||||
|
|
||||||
|
# Ommitted signals
|
||||||
high_limit_travel = Cpt(Signal, value=0, kind="omitted")
|
high_limit_travel = Cpt(Signal, value=0, kind="omitted")
|
||||||
low_limit_travel = Cpt(Signal, value=0, kind="omitted")
|
low_limit_travel = Cpt(Signal, value=0, kind="omitted")
|
||||||
unused = Cpt(Signal, value=1, kind="omitted")
|
unused = Cpt(Signal, value=1, kind="omitted")
|
||||||
|
|
||||||
|
# TODO add short description to these two lines and explain what this does
|
||||||
SUB_READBACK = "readback"
|
SUB_READBACK = "readback"
|
||||||
_default_sub = SUB_READBACK
|
_default_sub = SUB_READBACK
|
||||||
|
|
||||||
@@ -623,7 +554,7 @@ class SynAxisOPAAS(Device, PositionerBase):
|
|||||||
name,
|
name,
|
||||||
readback_func=None,
|
readback_func=None,
|
||||||
value=0,
|
value=0,
|
||||||
delay=0,
|
delay=1,
|
||||||
speed=1,
|
speed=1,
|
||||||
update_frequency=2,
|
update_frequency=2,
|
||||||
precision=3,
|
precision=3,
|
||||||
@@ -638,6 +569,7 @@ class SynAxisOPAAS(Device, PositionerBase):
|
|||||||
def readback_func(x):
|
def readback_func(x):
|
||||||
return x
|
return x
|
||||||
|
|
||||||
|
# TODO what is this, check if still needed..
|
||||||
sentinel = object()
|
sentinel = object()
|
||||||
loop = kwargs.pop("loop", sentinel)
|
loop = kwargs.pop("loop", sentinel)
|
||||||
if loop is not sentinel:
|
if loop is not sentinel:
|
||||||
@@ -646,8 +578,9 @@ class SynAxisOPAAS(Device, PositionerBase):
|
|||||||
"Your input will be ignored and may raise in the future",
|
"Your input will be ignored and may raise in the future",
|
||||||
stacklevel=2,
|
stacklevel=2,
|
||||||
)
|
)
|
||||||
self.sim_state = {}
|
|
||||||
self._readback_func = readback_func
|
self._readback_func = readback_func
|
||||||
|
# Whether motions should be instantaneous or depend on motor velocity
|
||||||
self.delay = delay
|
self.delay = delay
|
||||||
self.precision = precision
|
self.precision = precision
|
||||||
self.speed = speed
|
self.speed = speed
|
||||||
@@ -655,9 +588,10 @@ class SynAxisOPAAS(Device, PositionerBase):
|
|||||||
self.tolerance = kwargs.pop("tolerance", 0.05)
|
self.tolerance = kwargs.pop("tolerance", 0.05)
|
||||||
self._stopped = False
|
self._stopped = False
|
||||||
|
|
||||||
self.dummy_controller = DummyController()
|
# self.dummy_controller = DummyController()
|
||||||
|
|
||||||
# initialize values
|
# initialize inner dictionary with simulated state
|
||||||
|
self.sim_state = {}
|
||||||
self.sim_state["setpoint"] = value
|
self.sim_state["setpoint"] = value
|
||||||
self.sim_state["setpoint_ts"] = ttime.time()
|
self.sim_state["setpoint_ts"] = ttime.time()
|
||||||
self.sim_state["readback"] = readback_func(value)
|
self.sim_state["readback"] = readback_func(value)
|
||||||
@@ -667,6 +601,7 @@ class SynAxisOPAAS(Device, PositionerBase):
|
|||||||
|
|
||||||
super().__init__(name=name, parent=parent, labels=labels, kind=kind, **kwargs)
|
super().__init__(name=name, parent=parent, labels=labels, kind=kind, **kwargs)
|
||||||
self.readback.name = self.name
|
self.readback.name = self.name
|
||||||
|
# Init limits from deviceConfig
|
||||||
if limits is not None:
|
if limits is not None:
|
||||||
assert len(limits) == 2
|
assert len(limits) == 2
|
||||||
self.low_limit_travel.put(limits[0])
|
self.low_limit_travel.put(limits[0])
|
||||||
@@ -674,55 +609,48 @@ class SynAxisOPAAS(Device, PositionerBase):
|
|||||||
|
|
||||||
@property
|
@property
|
||||||
def limits(self):
|
def limits(self):
|
||||||
|
"""Return the limits of the simulated device."""
|
||||||
return (self.low_limit_travel.get(), self.high_limit_travel.get())
|
return (self.low_limit_travel.get(), self.high_limit_travel.get())
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def low_limit(self):
|
def low_limit(self):
|
||||||
|
"""Return the low limit of the simulated device."""
|
||||||
return self.limits[0]
|
return self.limits[0]
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def high_limit(self):
|
def high_limit(self):
|
||||||
|
"""Return the high limit of the simulated device."""
|
||||||
return self.limits[1]
|
return self.limits[1]
|
||||||
|
|
||||||
def check_value(self, pos):
|
def check_value(self, value: any):
|
||||||
"""Check that the position is within the soft limits"""
|
"""
|
||||||
|
Check that requested position is within existing limits.
|
||||||
|
|
||||||
|
This function has to be implemented on the top level of the positioner.
|
||||||
|
"""
|
||||||
low_limit, high_limit = self.limits
|
low_limit, high_limit = self.limits
|
||||||
|
|
||||||
if low_limit < high_limit and not (low_limit <= pos <= high_limit):
|
if low_limit < high_limit and not low_limit <= value <= high_limit:
|
||||||
raise LimitError(f"position={pos} not within limits {self.limits}")
|
raise LimitError(f"position={value} not within limits {self.limits}")
|
||||||
|
|
||||||
def set(self, value):
|
def move(self, value, **kwargs) -> DeviceStatus:
|
||||||
|
"""Change the setpoint of the simulated device, and simultaneously initiated a motion."""
|
||||||
self._stopped = False
|
self._stopped = False
|
||||||
self.check_value(value)
|
self.check_value(value)
|
||||||
old_setpoint = self.sim_state["setpoint"]
|
old_setpoint = self.sim_state["setpoint"]
|
||||||
self.sim_state["is_moving"] = 1
|
self.sim_state["is_moving"] = 1
|
||||||
self.motor_is_moving._run_subs(
|
|
||||||
sub_type=self.motor_is_moving.SUB_VALUE,
|
|
||||||
old_value=0,
|
|
||||||
value=1,
|
|
||||||
timestamp=self.sim_state["is_moving_ts"],
|
|
||||||
)
|
|
||||||
self.sim_state["setpoint"] = value
|
self.sim_state["setpoint"] = value
|
||||||
self.sim_state["setpoint_ts"] = ttime.time()
|
self.sim_state["setpoint_ts"] = ttime.time()
|
||||||
self.setpoint._run_subs(
|
|
||||||
sub_type=self.setpoint.SUB_VALUE,
|
|
||||||
old_value=old_setpoint,
|
|
||||||
value=self.sim_state["setpoint"],
|
|
||||||
timestamp=self.sim_state["setpoint_ts"],
|
|
||||||
)
|
|
||||||
|
|
||||||
def update_state(val):
|
def update_state(val):
|
||||||
|
"""Update the state of the simulated device."""
|
||||||
if self._stopped:
|
if self._stopped:
|
||||||
raise DeviceStop
|
raise DeviceStop
|
||||||
old_readback = self.sim_state["readback"]
|
old_readback = self.sim_state["readback"]
|
||||||
self.sim_state["readback"] = val
|
self.sim_state["readback"] = val
|
||||||
self.sim_state["readback_ts"] = ttime.time()
|
self.sim_state["readback_ts"] = ttime.time()
|
||||||
self.readback._run_subs(
|
|
||||||
sub_type=self.readback.SUB_VALUE,
|
# Run subscription on "readback"
|
||||||
old_value=old_readback,
|
|
||||||
value=self.sim_state["readback"],
|
|
||||||
timestamp=self.sim_state["readback_ts"],
|
|
||||||
)
|
|
||||||
self._run_subs(
|
self._run_subs(
|
||||||
sub_type=self.SUB_READBACK,
|
sub_type=self.SUB_READBACK,
|
||||||
old_value=old_readback,
|
old_value=old_readback,
|
||||||
@@ -732,53 +660,59 @@ class SynAxisOPAAS(Device, PositionerBase):
|
|||||||
|
|
||||||
st = DeviceStatus(device=self)
|
st = DeviceStatus(device=self)
|
||||||
if self.delay:
|
if self.delay:
|
||||||
|
# If self.delay is not 0, we use the speed and updated frequency of the device to compute the motion
|
||||||
def move_and_finish():
|
def move_and_finish():
|
||||||
|
"""Move the simulated device and finish the motion."""
|
||||||
success = True
|
success = True
|
||||||
try:
|
try:
|
||||||
|
# Compute final position with some jitter
|
||||||
move_val = self.sim_state["setpoint"] + self.tolerance * np.random.uniform(
|
move_val = self.sim_state["setpoint"] + self.tolerance * np.random.uniform(
|
||||||
-1, 1
|
-1, 1
|
||||||
)
|
)
|
||||||
|
# Compute the number of updates needed to reach the final position with the given speed
|
||||||
updates = np.ceil(
|
updates = np.ceil(
|
||||||
np.abs(old_setpoint - move_val) / self.speed * self.update_frequency
|
np.abs(old_setpoint - move_val) / self.speed * self.update_frequency
|
||||||
)
|
)
|
||||||
|
# Loop over the updates and update the state of the simulated device
|
||||||
for ii in np.linspace(old_setpoint, move_val, int(updates)):
|
for ii in np.linspace(old_setpoint, move_val, int(updates)):
|
||||||
ttime.sleep(1 / self.update_frequency)
|
ttime.sleep(1 / self.update_frequency)
|
||||||
update_state(ii)
|
update_state(ii)
|
||||||
|
# Update the state of the simulated device to the final position
|
||||||
update_state(move_val)
|
update_state(move_val)
|
||||||
self.sim_state["is_moving"] = 0
|
self.sim_state["is_moving"] = 0
|
||||||
self.sim_state["is_moving_ts"] = ttime.time()
|
self.sim_state["is_moving_ts"] = ttime.time()
|
||||||
self.motor_is_moving._run_subs(
|
|
||||||
sub_type=self.motor_is_moving.SUB_VALUE,
|
|
||||||
old_value=1,
|
|
||||||
value=0,
|
|
||||||
timestamp=self.sim_state["is_moving_ts"],
|
|
||||||
)
|
|
||||||
except DeviceStop:
|
except DeviceStop:
|
||||||
success = False
|
success = False
|
||||||
finally:
|
finally:
|
||||||
self._stopped = False
|
self._stopped = False
|
||||||
|
# Call function from positioner base to indicate that motion finished with success
|
||||||
self._done_moving(success=success)
|
self._done_moving(success=success)
|
||||||
|
# Set status to finished
|
||||||
st.set_finished()
|
st.set_finished()
|
||||||
|
|
||||||
|
# Start motion in Thread
|
||||||
threading.Thread(target=move_and_finish, daemon=True).start()
|
threading.Thread(target=move_and_finish, daemon=True).start()
|
||||||
# raise TimeoutError
|
|
||||||
|
|
||||||
else:
|
else:
|
||||||
|
# If self.delay is 0, we move the simulated device instantaneously
|
||||||
update_state(value)
|
update_state(value)
|
||||||
self._done_moving()
|
self._done_moving()
|
||||||
st.set_finished()
|
st.set_finished()
|
||||||
return st
|
return st
|
||||||
|
|
||||||
def stop(self, *, success=False):
|
def stop(self, *, success=False):
|
||||||
|
"""Stop the motion of the simulated device."""
|
||||||
super().stop(success=success)
|
super().stop(success=success)
|
||||||
self._stopped = True
|
self._stopped = True
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def position(self):
|
def position(self):
|
||||||
|
"""Return the current position of the simulated device."""
|
||||||
return self.readback.get()
|
return self.readback.get()
|
||||||
|
|
||||||
|
@property
|
||||||
def egu(self):
|
def egu(self):
|
||||||
|
"""Return the engineering units of the simulated device."""
|
||||||
return "mm"
|
return "mm"
|
||||||
|
|
||||||
|
|
||||||
@@ -871,12 +805,12 @@ class SynDynamicComponents(Device):
|
|||||||
|
|
||||||
|
|
||||||
class SynDeviceSubOPAAS(Device):
|
class SynDeviceSubOPAAS(Device):
|
||||||
zsub = Cpt(SynAxisOPAAS, name="zsub")
|
zsub = Cpt(SimPositioner, name="zsub")
|
||||||
|
|
||||||
|
|
||||||
class SynDeviceOPAAS(Device):
|
class SynDeviceOPAAS(Device):
|
||||||
x = Cpt(SynAxisOPAAS, name="x")
|
x = Cpt(SimPositioner, name="x")
|
||||||
y = Cpt(SynAxisOPAAS, name="y")
|
y = Cpt(SimPositioner, name="y")
|
||||||
z = Cpt(SynDeviceSubOPAAS, name="z")
|
z = Cpt(SynDeviceSubOPAAS, name="z")
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
96
ophyd_devices/sim/sim_signals.py
Normal file
96
ophyd_devices/sim/sim_signals.py
Normal file
@@ -0,0 +1,96 @@
|
|||||||
|
import time as ttime
|
||||||
|
|
||||||
|
from bec_lib import bec_logger
|
||||||
|
from ophyd import Signal
|
||||||
|
from ophyd.utils import ReadOnlyError
|
||||||
|
|
||||||
|
logger = bec_logger.logger
|
||||||
|
|
||||||
|
|
||||||
|
class ReadbackSignal(Signal):
|
||||||
|
"""Readback signal for simulated devices.
|
||||||
|
|
||||||
|
It will return the value of the readback signal based on the position
|
||||||
|
created in the sim_state dictionary of the parent device.
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(self, *args, **kwargs):
|
||||||
|
super().__init__(*args, **kwargs)
|
||||||
|
self._metadata.update(
|
||||||
|
connected=True,
|
||||||
|
write_access=False,
|
||||||
|
)
|
||||||
|
|
||||||
|
def get(self, **kwargs):
|
||||||
|
"""Get the current position of the simulated device."""
|
||||||
|
self._readback = self.parent.sim_state["readback"]
|
||||||
|
self.parent.sim_state["readback_ts"] = ttime.time()
|
||||||
|
return self._readback
|
||||||
|
|
||||||
|
def describe(self):
|
||||||
|
"""Describe the readback signal."""
|
||||||
|
res = super().describe()
|
||||||
|
res[self.name]["precision"] = self.parent.precision
|
||||||
|
return res
|
||||||
|
|
||||||
|
@property
|
||||||
|
def timestamp(self):
|
||||||
|
"""Timestamp of the readback value"""
|
||||||
|
return self.parent.sim_state["readback_ts"]
|
||||||
|
|
||||||
|
def put(self, value, *, timestamp=None, force=False, **kwargs):
|
||||||
|
"""Put method, should raise ReadOnlyError since the signal is readonly."""
|
||||||
|
raise ReadOnlyError(f"The signal {self.name} is readonly.")
|
||||||
|
|
||||||
|
def set(self, value, *, timestamp=None, force=False, **kwargs):
|
||||||
|
"""Set method, should raise ReadOnlyError since the signal is readonly."""
|
||||||
|
raise ReadOnlyError(f"The signal {self.name} is readonly.")
|
||||||
|
|
||||||
|
|
||||||
|
class SetpointSignal(Signal):
|
||||||
|
"""Setpoint signal for simulated devices.
|
||||||
|
|
||||||
|
When read, it will return the "setpoint" key from the dictionary sim_state,
|
||||||
|
and whe put it will call the set method of the parent device with the value.
|
||||||
|
"""
|
||||||
|
|
||||||
|
def put(self, value, *, timestamp=None, force=False, **kwargs):
|
||||||
|
"""Put the value to the simulated device."""
|
||||||
|
self._readback = float(value)
|
||||||
|
self.parent.set(float(value))
|
||||||
|
|
||||||
|
def get(self, **kwargs):
|
||||||
|
"""Get the current setpoint of the simulated device."""
|
||||||
|
self._readback = self.parent.sim_state["setpoint"]
|
||||||
|
return self.parent.sim_state["setpoint"]
|
||||||
|
|
||||||
|
@property
|
||||||
|
def timestamp(self):
|
||||||
|
"""Timestamp of the readback value"""
|
||||||
|
return self.parent.sim_state["setpoint_ts"]
|
||||||
|
|
||||||
|
|
||||||
|
class IsMovingSignal(Signal):
|
||||||
|
"""IsMoving signal for simulated devices.
|
||||||
|
|
||||||
|
When read, it will return the "is_moving" key from the dictionary sim_state,
|
||||||
|
and whe put it will call the set method of the parent device with the value.
|
||||||
|
"""
|
||||||
|
|
||||||
|
def get(self, **kwargs):
|
||||||
|
self._readback = self.parent.sim_state["is_moving"]
|
||||||
|
self.parent.sim_state["is_moving_ts"] = ttime.time()
|
||||||
|
return self.parent.sim_state["is_moving"]
|
||||||
|
|
||||||
|
@property
|
||||||
|
def timestamp(self):
|
||||||
|
"""Timestamp of the readback value"""
|
||||||
|
return self.parent.sim_state["is_moving_ts"]
|
||||||
|
|
||||||
|
def put(self, value, *, timestamp=None, force=False, **kwargs):
|
||||||
|
"""Put method, should raise ReadOnlyError since the signal is readonly."""
|
||||||
|
raise ReadOnlyError(f"The signal {self.name} is readonly.")
|
||||||
|
|
||||||
|
def set(self, value, *, timestamp=None, force=False, **kwargs):
|
||||||
|
"""Set method, should raise ReadOnlyError since the signal is readonly."""
|
||||||
|
raise ReadOnlyError(f"The signal {self.name} is readonly.")
|
||||||
Reference in New Issue
Block a user