feat: move signals to own file and refactor access pattern to sim_state data.

This commit is contained in:
appel_c 2024-01-30 13:10:58 +01:00
parent 2ccd096ead
commit 6f3c2383b5

View File

@ -1,96 +1,245 @@
import time as ttime import time as ttime
from bec_lib import bec_logger from bec_lib import bec_logger
from ophyd import Signal import numpy as np
from ophyd import Signal, Kind
from ophyd.utils import ReadOnlyError from ophyd.utils import ReadOnlyError
logger = bec_logger.logger logger = bec_logger.logger
# Readout precision for Setable/Readonly/ComputedReadonly signals
PRECISION = 3
class ReadbackSignal(Signal):
"""Readback signal for simulated devices. class SetableSignal(Signal):
"""Setable signal for simulated devices.
It will return the value of the readback signal based on the position It will return the value of the readback signal based on the position
created in the sim_state dictionary of the parent device. created in the sim_state dictionary of the parent device.
""" """
def __init__(self, *args, **kwargs): def __init__(
super().__init__(*args, **kwargs) self,
*args,
name: str,
value: any = None,
kind: int = Kind.normal,
precision: float = PRECISION,
**kwargs,
):
super().__init__(*args, name=name, value=value, kind=kind, **kwargs)
self._metadata.update( self._metadata.update(
connected=True, connected=True,
write_access=False, write_access=False,
) )
self._value = value
self.precision = precision
# Init the sim_state, if self.parent.sim available, use it, else use self.parent
self.sim = getattr(self.parent, "sim", self.parent)
self._update_sim_state(value)
def get(self, **kwargs): def _update_sim_state(self, value: any) -> None:
"""Get the current position of the simulated device.""" """Update the readback value."""
self._readback = self.parent.sim_state["readback"] self.sim.update_sim_state(self.name, value)
self.parent.sim_state["readback_ts"] = ttime.time()
return self._readback def _get_value(self) -> any:
"""Update the timestamp of the readback value."""
return self.sim.sim_state[self.name]["value"]
def _get_timestamp(self) -> any:
"""Update the timestamp of the readback value."""
return self.sim.sim_state[self.name]["timestamp"]
def get(self):
"""Get the current position of the simulated device.
Core function for signal.
"""
self._value = self._get_value()
return self._value
def put(self, value):
"""Put the value to the simulated device.
Core function for signal.
"""
self._update_sim_state(value)
self._value = value
def describe(self): def describe(self):
"""Describe the readback signal.""" """Describe the readback signal.
Core function for signal.
"""
res = super().describe() res = super().describe()
res[self.name]["precision"] = self.parent.precision if self.precision is not None:
res[self.name]["precision"] = self.precision
return res return res
@property @property
def timestamp(self): def timestamp(self):
"""Timestamp of the readback value""" """Timestamp of the readback value"""
return self.parent.sim_state["readback_ts"] return self._get_timestamp()
def put(self, value, *, timestamp=None, force=False, **kwargs):
class ReadOnlySignal(Signal):
"""Readonly signal for simulated devices.
If initiated without a value, it will set the initial value to 0.
"""
def __init__(
self,
*args,
name: str,
value: any = 0,
kind: int = Kind.normal,
precision: float = PRECISION,
**kwargs,
):
super().__init__(*args, name=name, value=value, kind=kind, **kwargs)
self._metadata.update(
connected=True,
write_access=False,
)
self.precision = precision
self._value = value
# Init the sim_state, if self.parent.sim available, use it, else use self.parent
self.sim = getattr(self.parent, "sim", None)
self._init_sim_state()
def _init_sim_state(self) -> None:
"""Init the readback value and timestamp in sim_state"""
if self.sim:
self.sim.update_sim_state(self.name, self._value)
def _get_value(self) -> any:
"""Get the value of the readback from sim_state."""
if self.sim:
return self.sim.sim_state[self.name]["value"]
else:
return np.random.rand()
def _get_timestamp(self) -> any:
"""Get the timestamp of the readback from sim_state."""
if self.sim:
return self.sim.sim_state[self.name]["timestamp"]
else:
return ttime.time()
def get(self) -> any:
"""Get the current position of the simulated device.
Core function for signal.
"""
self._value = self._get_value()
return self._value
def put(self, value) -> None:
"""Put method, should raise ReadOnlyError since the signal is readonly.""" """Put method, should raise ReadOnlyError since the signal is readonly."""
raise ReadOnlyError(f"The signal {self.name} is readonly.") raise ReadOnlyError(f"The signal {self.name} is readonly.")
def set(self, value, *, timestamp=None, force=False, **kwargs): def set(self, value) -> None:
"""Set method, should raise ReadOnlyError since the signal is readonly.""" """Set method, should raise ReadOnlyError since the signal is readonly."""
raise ReadOnlyError(f"The signal {self.name} is readonly.") raise ReadOnlyError(f"The signal {self.name} is readonly.")
def describe(self):
"""Describe the readback signal.
class SetpointSignal(Signal): Core function for signal.
"""Setpoint signal for simulated devices.
When read, it will return the "setpoint" key from the dictionary sim_state,
and whe put it will call the set method of the parent device with the value.
""" """
res = super().describe()
def put(self, value, *, timestamp=None, force=False, **kwargs): if self.precision is not None:
"""Put the value to the simulated device.""" res[self.name]["precision"] = self.precision
self._readback = float(value) return res
self.parent.set(float(value))
def get(self, **kwargs):
"""Get the current setpoint of the simulated device."""
self._readback = self.parent.sim_state["setpoint"]
return self.parent.sim_state["setpoint"]
@property @property
def timestamp(self): def timestamp(self):
"""Timestamp of the readback value""" """Timestamp of the readback value"""
return self.parent.sim_state["setpoint_ts"] return self._get_timestamp()
class IsMovingSignal(Signal): class ComputedReadOnlySignal(Signal):
"""IsMoving signal for simulated devices. """Computed readback signal for simulated devices.
When read, it will return the "is_moving" key from the dictionary sim_state, It will return the value computed from the sim_state of the signal.
and whe put it will call the set method of the parent device with the value. This can be configured in parent.sim.
""" """
def get(self, **kwargs): def __init__(
self._readback = self.parent.sim_state["is_moving"] self,
self.parent.sim_state["is_moving_ts"] = ttime.time() *args,
return self.parent.sim_state["is_moving"] name: str,
value: any = None,
kind: int = Kind.normal,
precision: float = PRECISION,
**kwargs,
):
super().__init__(*args, name=name, value=value, kind=kind, **kwargs)
self._metadata.update(
connected=True,
write_access=False,
)
self._value = value
self.precision = precision
# Init the sim_state, if self.parent.sim available, use it, else use self.parent
self.sim = getattr(self.parent, "sim", self.parent)
self._update_sim_state()
def _update_sim_state(self) -> None:
"""Update the readback value.
Call _compute_sim_state in parent device which updates the sim_state.
"""
self.sim._compute_sim_state(self.name)
def _get_value(self) -> any:
"""Update the timestamp of the readback value."""
return self.sim.sim_state[self.name]["value"]
def _get_timestamp(self) -> any:
"""Update the timestamp of the readback value."""
return self.sim.sim_state[self.name]["timestamp"]
def get(self):
"""Get the current position of the simulated device.
Core function for signal.
"""
self._update_sim_state()
self._value = self._get_value()
return self._value
def put(self, value) -> None:
"""Put method, should raise ReadOnlyError since the signal is readonly."""
raise ReadOnlyError(f"The signal {self.name} is readonly.")
def set(self, value) -> None:
"""Set method, should raise ReadOnlyError since the signal is readonly."""
raise ReadOnlyError(f"The signal {self.name} is readonly.")
def describe(self):
"""Describe the readback signal.
Core function for signal.
"""
res = super().describe()
if self.precision is not None:
res[self.name]["precision"] = self.precision
return res
@property @property
def timestamp(self): def timestamp(self):
"""Timestamp of the readback value""" """Timestamp of the readback value"""
return self.parent.sim_state["is_moving_ts"] return self._get_timestamp()
def put(self, value, *, timestamp=None, force=False, **kwargs):
"""Put method, should raise ReadOnlyError since the signal is readonly."""
raise ReadOnlyError(f"The signal {self.name} is readonly.")
def set(self, value, *, timestamp=None, force=False, **kwargs): if __name__ == "__main__":
"""Set method, should raise ReadOnlyError since the signal is readonly.""" from ophyd_devices.sim import SimPositioner
raise ReadOnlyError(f"The signal {self.name} is readonly.")
positioner = SimPositioner(name="positioner", parent=None)
print(positioner.velocity.get())
positioner.velocity.put(10)
print(positioner.velocity.get())
positioner.velocity.put(1)
print(positioner.velocity.get())