feat: refactor simulation, introduce SimCamera, SimMonitor in addition to existing classes

This commit is contained in:
appel_c 2024-01-30 13:12:00 +01:00
parent 8cc955b202
commit f311ce5d1c

View File

@ -1,3 +1,4 @@
from collections import defaultdict
import os
import threading
import time as ttime
@ -6,13 +7,14 @@ import warnings
import numpy as np
from bec_lib import MessageEndpoints, bec_logger, messages
from ophyd import Component as Cpt
from ophyd import Device, DeviceStatus
from ophyd import Device, DeviceStatus, Kind
from ophyd import DynamicDeviceComponent as Dcpt
from ophyd import OphydObject, PositionerBase, Signal
from ophyd.sim import EnumSignal, SynSignal
from ophyd.utils import LimitError, ReadOnlyError
from ophyd_devices.sim.sim_data import SimulatedDataBase, SimulatedDataCamera, SimulatedDataMonitor
from ophyd_devices.sim.sim_signals import ReadbackSignal, SetpointSignal, IsMovingSignal
from ophyd_devices.sim.sim_signals import SetableSignal, ReadOnlySignal, ComputedReadOnlySignal
logger = bec_logger.logger
@ -21,99 +23,15 @@ class DeviceStop(Exception):
pass
class SynSignalRO(Signal):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._metadata.update(
write_access=False,
)
def wait_for_connection(self, timeout=0):
super().wait_for_connection(timeout)
self._metadata.update(connected=True)
def get(self, **kwargs):
self._readback = np.random.rand()
return self._readback
class _ReadbackSignalCompute(Signal):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._metadata.update(
connected=True,
write_access=False,
)
def get(self):
readback = self.parent._compute()
self._readback = self.parent.sim_state["readback"] = readback
return readback
def describe(self):
res = super().describe()
# There should be only one key here, but for the sake of
# generality....
for k in res:
res[k]["precision"] = self.parent.precision
return res
@property
def timestamp(self):
"""Timestamp of the readback value"""
return self.parent.sim_state["readback_ts"]
def put(self, value, *, timestamp=None, force=False):
raise ReadOnlyError("The signal {} is readonly.".format(self.name))
def set(self, value, *, timestamp=None, force=False):
raise ReadOnlyError("The signal {} is readonly.".format(self.name))
class _ReadbackSignalRand(Signal):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._metadata.update(
connected=True,
write_access=False,
)
def get(self):
self._readback = np.random.rand()
return self._readback
def describe(self):
res = super().describe()
# There should be only one key here, but for the sake of
# generality....
for k in res:
res[k]["precision"] = self.parent.precision
return res
@property
def timestamp(self):
"""Timestamp of the readback value"""
return self.parent.sim_state["readback_ts"]
def put(self, value, *, timestamp=None, force=False):
raise ReadOnlyError("The signal {} is readonly.".format(self.name))
def set(self, value, *, timestamp=None, force=False):
raise ReadOnlyError("The signal {} is readonly.".format(self.name))
class SynAxisMonitor(Device):
class SimMonitor(Device):
"""
A synthetic settable Device mimic any 1D Axis (position, temperature).
A simulated device mimic any 1D Axis (position, temperature, beam).
Readback functionality can be configured
Parameters
----------
name : string, keyword only
readback_func : callable, optional
When the Device is set to ``x``, its readback will be updated to
``f(x)``. This can be used to introduce random noise or a systematic
offset.
Expected signature: ``f(x) -> value``.
value : object, optional
The initial value. Default is 0.
delay : number, optional
@ -126,7 +44,11 @@ class SynAxisMonitor(Device):
Default is Kind.normal. See Kind for options.
"""
readback = Cpt(_ReadbackSignalRand, value=0, kind="hinted")
USER_ACCESS = ["sim"]
sim_cls = SimulatedDataMonitor
readback = Cpt(ComputedReadOnlySignal, value=0, kind=Kind.hinted)
SUB_READBACK = "readback"
_default_sub = SUB_READBACK
@ -135,42 +57,117 @@ class SynAxisMonitor(Device):
self,
*,
name,
readback_func=None,
value=0,
delay=0,
precision=3,
tolerance: float = 0.5,
sim_init: dict = None,
parent=None,
labels=None,
kind=None,
device_manager=None,
**kwargs,
):
if readback_func is None:
def readback_func(x):
return x
sentinel = object()
loop = kwargs.pop("loop", sentinel)
if loop is not sentinel:
warnings.warn(
f"{self.__class__} no longer takes a loop as input. "
"Your input will be ignored and may raise in the future",
stacklevel=2,
)
self.sim_state = {}
self._readback_func = readback_func
self.delay = delay
self.precision = precision
self.tolerance = kwargs.pop("tolerance", 0.5)
# initialize values
self.sim_state["readback"] = readback_func(value)
self.sim_state["readback_ts"] = ttime.time()
self.tolerance = tolerance
self.init_sim_params = sim_init
self.sim = self.sim_cls(parent=self, device_manager=device_manager, **kwargs)
super().__init__(name=name, parent=parent, labels=labels, kind=kind, **kwargs)
self.sim.sim_state[self.name] = self.sim.sim_state.pop(self.readback.name, None)
self.readback.name = self.name
class SynGaussBEC(Device):
"""
Evaluate a point on a Gaussian based on the value of a motor.
Parameters
----------
name : string
motor : Device
motor_field : string
center : number
center of peak
Imax : number
max intensity of peak
sigma : number, optional
Default is 1.
noise : {'poisson', 'uniform', None}, optional
Add noise to the gaussian peak.
noise_multiplier : float, optional
Only relevant for 'uniform' noise. Multiply the random amount of
noise by 'noise_multiplier'
random_state : numpy random state object, optional
np.random.RandomState(0), to generate random number with given seed
Example
-------
motor = SynAxis(name='motor')
det = SynGauss('det', motor, 'motor', center=0, Imax=1, sigma=1)
"""
val = Cpt(ComputedReadOnlySignal, value=0, kind=Kind.hinted)
Imax = Cpt(Signal, value=10, kind=Kind.config)
center = Cpt(Signal, value=0, kind=Kind.config)
sigma = Cpt(Signal, value=1, kind=Kind.config)
motor = Cpt(Signal, value="samx", kind=Kind.config)
noise = Cpt(
EnumSignal,
value="none",
kind=Kind.config,
enum_strings=("none", "poisson", "uniform"),
)
noise_multiplier = Cpt(Signal, value=1, kind=Kind.config)
def __init__(self, name, *, device_manager=None, random_state=None, **kwargs):
self.device_manager = device_manager
set_later = {}
for k in ("sigma", "noise", "noise_multiplier"):
v = kwargs.pop(k, None)
if v is not None:
set_later[k] = v
self.sim_state = defaultdict(lambda: {})
super().__init__(name=name, **kwargs)
self.sim_state[self.name] = self.sim_state.pop(self.val.name, None)
self.val.name = self.name
self.random_state = random_state or np.random
self.precision = 3
for k, v in set_later.items():
getattr(self, k).put(v)
def _compute_sim_state(self, signal_name: str) -> None:
try:
m = self.device_manager.devices[self.motor.get()].obj.read()[self.motor.get()]["value"]
# we need to do this one at a time because
# - self.read() may be screwed with by the user
# - self.get() would cause infinite recursion
Imax = self.Imax.get()
center = self.center.get()
sigma = self.sigma.get()
noise = self.noise.get()
noise_multiplier = self.noise_multiplier.get()
v = Imax * np.exp(-((m - center) ** 2) / (2 * sigma**2))
if noise == "poisson":
v = int(self.random_state.poisson(np.round(v), 1))
elif noise == "uniform":
v += self.random_state.uniform(-1, 1) * noise_multiplier
self.sim_state[signal_name]["value"] = v
self.sim_state[signal_name]["timestamp"] = ttime.time()
except Exception as exc:
logger.warning(f"Failed to compute sim state with exception {exc}")
self.sim_state[signal_name]["value"] = 0
self.sim_state[signal_name]["timestamp"] = ttime.time()
def get(self, *args, **kwargs):
self.sim_state["readback"] = self._compute()
self.sim_state["readback_ts"] = ttime.time()
return self.val.get()
class _SLSDetectorConfigSignal(Signal):
def put(self, value, *, timestamp=None, force=False):
self._readback = value
@ -181,26 +178,46 @@ class _SLSDetectorConfigSignal(Signal):
return self.parent.sim_state[self.name]
class SynSLSDetector(Device):
USER_ACCESS = []
exp_time = Cpt(_SLSDetectorConfigSignal, name="exp_time", value=1, kind="config")
file_path = Cpt(_SLSDetectorConfigSignal, name="file_path", value="", kind="config")
file_pattern = Cpt(_SLSDetectorConfigSignal, name="file_pattern", value="", kind="config")
frames = Cpt(_SLSDetectorConfigSignal, name="frames", value=1, kind="config")
burst = Cpt(_SLSDetectorConfigSignal, name="burst", value=1, kind="config")
save_file = Cpt(_SLSDetectorConfigSignal, name="save_file", value=False, kind="config")
class SimCamera(Device):
USER_ACCESS = ["sim"]
def __init__(self, *, name, kind=None, parent=None, device_manager=None, **kwargs):
sim_cls = SimulatedDataCamera
SHAPE = (100, 100)
SUB_MONITOR = "monitor"
_default_sub = SUB_MONITOR
exp_time = Cpt(SetableSignal, name="exp_time", value=1, kind=Kind.config)
file_path = Cpt(SetableSignal, name="file_path", value="", kind=Kind.config)
file_pattern = Cpt(SetableSignal, name="file_pattern", value="", kind=Kind.config)
frames = Cpt(SetableSignal, name="frames", value=1, kind=Kind.config)
burst = Cpt(SetableSignal, name="burst", value=1, kind=Kind.config)
save_file = Cpt(SetableSignal, name="save_file", value=False, kind=Kind.config)
# image shape, only adjustable via config.
image_shape = Cpt(ReadOnlySignal, name="image_shape", value=SHAPE, kind=Kind.config)
image = Cpt(
ComputedReadOnlySignal,
name="image",
value=np.empty(SHAPE, dtype=np.uint16),
kind=Kind.omitted,
)
def __init__(
self,
*,
name,
kind=None,
parent=None,
sim_init: dict = None,
device_manager=None,
**kwargs,
):
self.device_manager = device_manager
self.init_sim_params = sim_init
self.sim = self.sim_cls(parent=self, device_manager=device_manager, **kwargs)
super().__init__(name=name, parent=parent, kind=kind, **kwargs)
self.sim_state = {
f"{self.name}_file_path": "~/Data10/data/",
f"{self.name}_file_pattern": f"{self.name}_{{:05d}}.h5",
f"{self.name}_frames": 1,
f"{self.name}_burst": 1,
f"{self.name}_save_file": False,
f"{self.name}_exp_time": 0,
}
self._stopped = False
self.file_name = ""
self.metadata = {}
@ -213,6 +230,8 @@ class SynSLSDetector(Device):
def acquire():
try:
for _ in range(self.burst.get()):
# Send data for each trigger
self._run_subs(sub_type=self.SUB_MONITOR, value=self.image.get())
ttime.sleep(self.exp_time.get())
if self._stopped:
raise DeviceStop
@ -226,6 +245,11 @@ class SynSLSDetector(Device):
return status
def stage(self) -> list[object]:
"""Stage the camera
Receive scan message from REDIS first, extract relevant scan data,
and set all signals for the scan, e.g. scan_number, file_name, frames, etc.
"""
msg = self.device_manager.producer.get(MessageEndpoints.scan_status())
scan_msg = messages.ScanStatusMessage.loads(msg)
self.metadata = {
@ -238,19 +262,36 @@ class SynSLSDetector(Device):
self.file_name = os.path.join(
self.file_path.get(), self.file_pattern.get().format(scan_number)
)
self._stopped = False
return super().stage()
def unstage(self) -> list[object]:
signals = {"config": self.sim_state, "data": self.file_name}
def _send_data_to_bec(self) -> None:
config_readout = {
signal.item.name: signal.item.get()
for signal in self.walk_signals()
if signal.item._kind == Kind.config
}
signals = {"config": config_readout, "data": self.file_name}
msg = messages.DeviceMessage(signals=signals, metadata=self.metadata)
self.device_manager.producer.set_and_publish(
MessageEndpoints.device_read(self.name), msg.dumps()
)
def unstage(self) -> list[object]:
"""Unstage the device
Send reads from all config signals to redis
"""
if self._stopped is True or not self._staged:
return super().unstage()
self._send_data_to_bec()
return super().unstage()
def stop(self, *, success=False):
super().stop(success=success)
self._stopped = True
super().stop(success=success)
class DummyController:
@ -528,21 +569,23 @@ class SimPositioner(Device, PositionerBase):
"""
# Specify which attributes are accessible via BEC client
USER_ACCESS = ["sim_state", "readback", "speed", "dummy_controller"]
USER_ACCESS = ["sim", "readback", "speed", "dummy_controller"]
sim_cls = SimulatedDataBase
# Define the signals as class attributes
readback = Cpt(ReadbackSignal, value=0, kind="hinted")
setpoint = Cpt(SetpointSignal, value=0, kind="normal")
motor_is_moving = Cpt(IsMovingSignal, value=0, kind="normal")
readback = Cpt(ReadOnlySignal, name="readback", value=0, kind=Kind.hinted)
setpoint = Cpt(SetableSignal, value=0, kind=Kind.normal)
motor_is_moving = Cpt(SetableSignal, value=0, kind=Kind.normal)
# Config signals
velocity = Cpt(Signal, value=1, kind="config")
acceleration = Cpt(Signal, value=1, kind="config")
velocity = Cpt(SetableSignal, value=1, kind=Kind.config)
acceleration = Cpt(SetableSignal, value=1, kind=Kind.config)
# Ommitted signals
high_limit_travel = Cpt(Signal, value=0, kind="omitted")
low_limit_travel = Cpt(Signal, value=0, kind="omitted")
unused = Cpt(Signal, value=1, kind="omitted")
high_limit_travel = Cpt(SetableSignal, value=0, kind=Kind.omitted)
low_limit_travel = Cpt(SetableSignal, value=0, kind=Kind.omitted)
unused = Cpt(Signal, value=1, kind=Kind.omitted)
# TODO add short description to these two lines and explain what this does
SUB_READBACK = "readback"
@ -562,44 +605,27 @@ class SimPositioner(Device, PositionerBase):
labels=None,
kind=None,
limits=None,
tolerance: float = 0.5,
sim: dict = None,
**kwargs,
):
if readback_func is None:
def readback_func(x):
return x
# TODO what is this, check if still needed..
sentinel = object()
loop = kwargs.pop("loop", sentinel)
if loop is not sentinel:
warnings.warn(
f"{self.__class__} no longer takes a loop as input. "
"Your input will be ignored and may raise in the future",
stacklevel=2,
)
self._readback_func = readback_func
# Whether motions should be instantaneous or depend on motor velocity
self.delay = delay
self.precision = precision
self.tolerance = tolerance
self.init_sim_params = sim
self.speed = speed
self.update_frequency = update_frequency
self.tolerance = kwargs.pop("tolerance", 0.05)
self._stopped = False
self.dummy_controller = DummyController()
# initialize inner dictionary with simulated state
self.sim_state = {}
self.sim_state["setpoint"] = value
self.sim_state["setpoint_ts"] = ttime.time()
self.sim_state["readback"] = readback_func(value)
self.sim_state["readback_ts"] = ttime.time()
self.sim_state["is_moving"] = 0
self.sim_state["is_moving_ts"] = ttime.time()
self.sim = self.sim_cls(parent=self, **kwargs)
super().__init__(name=name, parent=parent, labels=labels, kind=kind, **kwargs)
super().__init__(name=name, labels=labels, kind=kind, **kwargs)
# Rename self.readback.name to self.name, also in self.sim_state
self.sim.sim_state[self.name] = self.sim.sim_state.pop(self.readback.name, None)
self.readback.name = self.name
# Init limits from deviceConfig
if limits is not None:
@ -633,29 +659,36 @@ class SimPositioner(Device, PositionerBase):
if low_limit < high_limit and not low_limit <= value <= high_limit:
raise LimitError(f"position={value} not within limits {self.limits}")
def move(self, value, **kwargs) -> DeviceStatus:
def _set_sim_state(self, signal_name: str, value: any) -> None:
"""Update the simulated state of the device."""
self.sim.sim_state[signal_name]["value"] = value
self.sim.sim_state[signal_name]["timestamp"] = ttime.time()
def _get_sim_state(self, signal_name: str) -> any:
"""Return the simulated state of the device."""
return self.sim.sim_state[signal_name]["value"]
def move(self, value: float, **kwargs) -> DeviceStatus:
"""Change the setpoint of the simulated device, and simultaneously initiated a motion."""
self._stopped = False
self.check_value(value)
old_setpoint = self.sim_state["setpoint"]
self.sim_state["is_moving"] = 1
self.sim_state["setpoint"] = value
self.sim_state["setpoint_ts"] = ttime.time()
old_setpoint = self._get_sim_state(self.setpoint.name)
self._set_sim_state(self.motor_is_moving.name, 1)
self._set_sim_state(self.setpoint.name, value)
def update_state(val):
"""Update the state of the simulated device."""
if self._stopped:
raise DeviceStop
old_readback = self.sim_state["readback"]
self.sim_state["readback"] = val
self.sim_state["readback_ts"] = ttime.time()
old_readback = self._get_sim_state(self.readback.name)
self._set_sim_state(self.readback.name, val)
# Run subscription on "readback"
self._run_subs(
sub_type=self.SUB_READBACK,
old_value=old_readback,
value=self.sim_state["readback"],
timestamp=self.sim_state["readback_ts"],
value=self.sim.sim_state[self.readback.name]["value"],
timestamp=self.sim.sim_state[self.readback.name]["timestamp"],
)
st = DeviceStatus(device=self)
@ -666,9 +699,9 @@ class SimPositioner(Device, PositionerBase):
success = True
try:
# Compute final position with some jitter
move_val = self.sim_state["setpoint"] + self.tolerance * np.random.uniform(
-1, 1
)
move_val = self._get_sim_state(
self.setpoint.name
) + self.tolerance * np.random.uniform(-1, 1)
# Compute the number of updates needed to reach the final position with the given speed
updates = np.ceil(
np.abs(old_setpoint - move_val) / self.speed * self.update_frequency
@ -679,8 +712,7 @@ class SimPositioner(Device, PositionerBase):
update_state(ii)
# Update the state of the simulated device to the final position
update_state(move_val)
self.sim_state["is_moving"] = 0
self.sim_state["is_moving_ts"] = ttime.time()
self._set_sim_state(self.motor_is_moving, 0)
except DeviceStop:
success = False
finally:
@ -716,90 +748,6 @@ class SimPositioner(Device, PositionerBase):
return "mm"
class SynGaussBEC(Device):
"""
Evaluate a point on a Gaussian based on the value of a motor.
Parameters
----------
name : string
motor : Device
motor_field : string
center : number
center of peak
Imax : number
max intensity of peak
sigma : number, optional
Default is 1.
noise : {'poisson', 'uniform', None}, optional
Add noise to the gaussian peak.
noise_multiplier : float, optional
Only relevant for 'uniform' noise. Multiply the random amount of
noise by 'noise_multiplier'
random_state : numpy random state object, optional
np.random.RandomState(0), to generate random number with given seed
Example
-------
motor = SynAxis(name='motor')
det = SynGauss('det', motor, 'motor', center=0, Imax=1, sigma=1)
"""
val = Cpt(_ReadbackSignalCompute, value=0, kind="hinted")
Imax = Cpt(Signal, value=10, kind="config")
center = Cpt(Signal, value=0, kind="config")
sigma = Cpt(Signal, value=1, kind="config")
motor = Cpt(Signal, value="samx", kind="config")
noise = Cpt(
EnumSignal,
value="none",
kind="config",
enum_strings=("none", "poisson", "uniform"),
)
noise_multiplier = Cpt(Signal, value=1, kind="config")
def __init__(self, name, *, device_manager=None, random_state=None, **kwargs):
self.device_manager = device_manager
set_later = {}
for k in ("sigma", "noise", "noise_multiplier"):
v = kwargs.pop(k, None)
if v is not None:
set_later[k] = v
super().__init__(name=name, **kwargs)
self.random_state = random_state or np.random
self.val.name = self.name
self.precision = 3
self.sim_state = {"readback": 0, "readback_ts": ttime.time()}
for k, v in set_later.items():
getattr(self, k).put(v)
def _compute(self):
try:
m = self.device_manager.devices[self.motor.get()].obj.read()[self.motor.get()]["value"]
# we need to do this one at a time because
# - self.read() may be screwed with by the user
# - self.get() would cause infinite recursion
Imax = self.Imax.get()
center = self.center.get()
sigma = self.sigma.get()
noise = self.noise.get()
noise_multiplier = self.noise_multiplier.get()
v = Imax * np.exp(-((m - center) ** 2) / (2 * sigma**2))
if noise == "poisson":
v = int(self.random_state.poisson(np.round(v), 1))
elif noise == "uniform":
v += self.random_state.uniform(-1, 1) * noise_multiplier
return v
except Exception:
return 0
def get(self, *args, **kwargs):
self.sim_state["readback"] = self._compute()
self.sim_state["readback_ts"] = ttime.time()
return self.val.get()
class SynDynamicComponents(Device):
messages = Dcpt({f"message{i}": (SynSignal, None, {"name": f"msg{i}"}) for i in range(1, 6)})
@ -815,5 +763,4 @@ class SynDeviceOPAAS(Device):
if __name__ == "__main__":
det = SynSLSDetector(name="moench")
det.trigger()
gauss = SynGaussBEC(name="gauss")