refactor: refactored SimMonitor and SimCamera

This commit is contained in:
2024-02-09 19:21:28 +01:00
parent 4ce12711ac
commit 96a5f1b86a

View File

@@ -1,4 +1,3 @@
from collections import defaultdict
import os import os
import threading import threading
import time as ttime import time as ttime
@@ -9,10 +8,12 @@ from bec_lib import MessageEndpoints, bec_logger, messages
from ophyd import Component as Cpt from ophyd import Component as Cpt
from ophyd import Device, DeviceStatus, Kind from ophyd import Device, DeviceStatus, Kind
from ophyd import DynamicDeviceComponent as Dcpt from ophyd import DynamicDeviceComponent as Dcpt
from ophyd import OphydObject, PositionerBase, Signal from ophyd import PositionerBase, Signal
from ophyd.sim import EnumSignal, SynSignal from ophyd.sim import SynSignal
from ophyd.utils import LimitError, ReadOnlyError from ophyd.utils import LimitError
from ophyd_devices.epics.devices.bec_scaninfo_mixin import BecScaninfoMixin
from ophyd_devices.sim.sim_data import SimulatedDataBase, SimulatedDataCamera, SimulatedDataMonitor from ophyd_devices.sim.sim_data import SimulatedDataBase, SimulatedDataCamera, SimulatedDataMonitor
from ophyd_devices.sim.sim_additional_devices import DummyController
from ophyd_devices.sim.sim_signals import SetableSignal, ReadOnlySignal, ComputedReadOnlySignal from ophyd_devices.sim.sim_signals import SetableSignal, ReadOnlySignal, ComputedReadOnlySignal
@@ -27,21 +28,20 @@ class SimMonitor(Device):
""" """
A simulated device mimic any 1D Axis (position, temperature, beam). A simulated device mimic any 1D Axis (position, temperature, beam).
Readback functionality can be configured It's readback is a computed signal, which is configurable by the user and from the command line.
The corresponding simulation class is sim_cls=SimulatedDataMonitor, more details on defaults within the simulation class.
>>> monitor = SimMonitor(name="monitor")
Parameters Parameters
---------- ----------
name : string, keyword only name (string) : Name of the device. This is the only required argmuent, passed on to all signals of the device.
value : object, optional precision (integer) : Precision of the readback in digits, written to .describe(). Default is 3 digits.
The initial value. Default is 0. sim_init (dict) : Dictionary to initiate parameters of the simulation, check simulation type defaults for more details.
delay : number, optional parent : Parent device, optional, is used internally if this signal/device is part of a larger device.
Simulates how long it takes the device to "move". Default is 0 seconds. kind : A member the Kind IntEnum (or equivalent integer), optional. Default is Kind.normal. See Kind for options.
precision : integer, optional device_manager : DeviceManager from BEC, optional . Within startup of simulation, device_manager is passed on automatically.
Digits of precision. Default is 3.
parent : Device, optional
Used internally if this Signal is made part of a larger Device.
kind : a member the Kind IntEnum (or equivalent integer), optional
Default is Kind.normal. See Kind for options.
""" """
USER_ACCESS = ["sim"] USER_ACCESS = ["sim"]
@@ -55,130 +55,43 @@ class SimMonitor(Device):
def __init__( def __init__(
self, self,
*,
name, name,
value=0, *,
delay=0, precision: int = 3,
precision=3,
tolerance: float = 0.5,
sim_init: dict = None, sim_init: dict = None,
parent=None, parent=None,
labels=None,
kind=None, kind=None,
device_manager=None, device_manager=None,
**kwargs, **kwargs,
): ):
self.delay = delay
self.precision = precision self.precision = precision
self.tolerance = tolerance
self.init_sim_params = sim_init self.init_sim_params = sim_init
self.sim = self.sim_cls(parent=self, device_manager=device_manager, **kwargs) self.sim = self.sim_cls(parent=self, device_manager=device_manager, **kwargs)
super().__init__(name=name, parent=parent, labels=labels, kind=kind, **kwargs) super().__init__(name=name, parent=parent, kind=kind, **kwargs)
self.sim.sim_state[self.name] = self.sim.sim_state.pop(self.readback.name, None) self.sim.sim_state[self.name] = self.sim.sim_state.pop(self.readback.name, None)
self.readback.name = self.name self.readback.name = self.name
class SynGaussBEC(Device): class SimCamera(Device):
""" """A simulated device mimic any 2D camera.
Evaluate a point on a Gaussian based on the value of a motor.
It's image is a computed signal, which is configurable by the user and from the command line.
The corresponding simulation class is sim_cls=SimulatedDataCamera, more details on defaults within the simulation class.
>>> camera = SimCamera(name="camera")
Parameters Parameters
---------- ----------
name : string name (string) : Name of the device. This is the only required argmuent, passed on to all signals of the device.
motor : Device precision (integer) : Precision of the readback in digits, written to .describe(). Default is 3 digits.
motor_field : string sim_init (dict) : Dictionary to initiate parameters of the simulation, check simulation type defaults for more details.
center : number parent : Parent device, optional, is used internally if this signal/device is part of a larger device.
center of peak kind : A member the Kind IntEnum (or equivalent integer), optional. Default is Kind.normal. See Kind for options.
Imax : number device_manager : DeviceManager from BEC, optional . Within startup of simulation, device_manager is passed on automatically.
max intensity of peak
sigma : number, optional
Default is 1.
noise : {'poisson', 'uniform', None}, optional
Add noise to the gaussian peak.
noise_multiplier : float, optional
Only relevant for 'uniform' noise. Multiply the random amount of
noise by 'noise_multiplier'
random_state : numpy random state object, optional
np.random.RandomState(0), to generate random number with given seed
Example
-------
motor = SynAxis(name='motor')
det = SynGauss('det', motor, 'motor', center=0, Imax=1, sigma=1)
""" """
val = Cpt(ComputedReadOnlySignal, value=0, kind=Kind.hinted)
Imax = Cpt(Signal, value=10, kind=Kind.config)
center = Cpt(Signal, value=0, kind=Kind.config)
sigma = Cpt(Signal, value=1, kind=Kind.config)
motor = Cpt(Signal, value="samx", kind=Kind.config)
noise = Cpt(
EnumSignal,
value="none",
kind=Kind.config,
enum_strings=("none", "poisson", "uniform"),
)
noise_multiplier = Cpt(Signal, value=1, kind=Kind.config)
def __init__(self, name, *, device_manager=None, random_state=None, **kwargs):
self.device_manager = device_manager
set_later = {}
for k in ("sigma", "noise", "noise_multiplier"):
v = kwargs.pop(k, None)
if v is not None:
set_later[k] = v
self.sim_state = defaultdict(lambda: {})
super().__init__(name=name, **kwargs)
self.sim_state[self.name] = self.sim_state.pop(self.val.name, None)
self.val.name = self.name
self.random_state = random_state or np.random
self.precision = 3
for k, v in set_later.items():
getattr(self, k).put(v)
def _compute_sim_state(self, signal_name: str) -> None:
try:
m = self.device_manager.devices[self.motor.get()].obj.read()[self.motor.get()]["value"]
# we need to do this one at a time because
# - self.read() may be screwed with by the user
# - self.get() would cause infinite recursion
Imax = self.Imax.get()
center = self.center.get()
sigma = self.sigma.get()
noise = self.noise.get()
noise_multiplier = self.noise_multiplier.get()
v = Imax * np.exp(-((m - center) ** 2) / (2 * sigma**2))
if noise == "poisson":
v = int(self.random_state.poisson(np.round(v), 1))
elif noise == "uniform":
v += self.random_state.uniform(-1, 1) * noise_multiplier
self.sim_state[signal_name]["value"] = v
self.sim_state[signal_name]["timestamp"] = ttime.time()
except Exception as exc:
logger.warning(f"Failed to compute sim state with exception {exc}")
self.sim_state[signal_name]["value"] = 0
self.sim_state[signal_name]["timestamp"] = ttime.time()
def get(self, *args, **kwargs):
self.sim_state["readback"] = self._compute()
self.sim_state["readback_ts"] = ttime.time()
return self.val.get()
class _SLSDetectorConfigSignal(Signal):
def put(self, value, *, timestamp=None, force=False):
self._readback = value
self.parent.sim_state[self.name] = value
def get(self):
self._readback = self.parent.sim_state[self.name]
return self.parent.sim_state[self.name]
class SimCamera(Device):
USER_ACCESS = ["sim"] USER_ACCESS = ["sim"]
sim_cls = SimulatedDataCamera sim_cls = SimulatedDataCamera
@@ -189,12 +102,8 @@ class SimCamera(Device):
exp_time = Cpt(SetableSignal, name="exp_time", value=1, kind=Kind.config) exp_time = Cpt(SetableSignal, name="exp_time", value=1, kind=Kind.config)
file_path = Cpt(SetableSignal, name="file_path", value="", kind=Kind.config) file_path = Cpt(SetableSignal, name="file_path", value="", kind=Kind.config)
file_pattern = Cpt(SetableSignal, name="file_pattern", value="", kind=Kind.config)
frames = Cpt(SetableSignal, name="frames", value=1, kind=Kind.config) frames = Cpt(SetableSignal, name="frames", value=1, kind=Kind.config)
burst = Cpt(SetableSignal, name="burst", value=1, kind=Kind.config)
save_file = Cpt(SetableSignal, name="save_file", value=False, kind=Kind.config)
# image shape, only adjustable via config.
image_shape = Cpt(SetableSignal, name="image_shape", value=SHAPE, kind=Kind.config) image_shape = Cpt(SetableSignal, name="image_shape", value=SHAPE, kind=Kind.config)
image = Cpt( image = Cpt(
ComputedReadOnlySignal, ComputedReadOnlySignal,
@@ -205,8 +114,8 @@ class SimCamera(Device):
def __init__( def __init__(
self, self,
*,
name, name,
*,
kind=None, kind=None,
parent=None, parent=None,
sim_init: dict = None, sim_init: dict = None,
@@ -219,10 +128,17 @@ class SimCamera(Device):
super().__init__(name=name, parent=parent, kind=kind, **kwargs) super().__init__(name=name, parent=parent, kind=kind, **kwargs)
self._stopped = False self._stopped = False
self.file_name = "" self.scaninfo = None
self.metadata = {} self._update_scaninfo()
def trigger(self): def trigger(self) -> DeviceStatus:
"""Trigger the camera to acquire images.
This method can be called from BEC during a scan. It will acquire images and send them to BEC.
Whether the trigger is send from BEC is determined by the softwareTrigger argument in the device config.
Here, we also run a callback on SUB_MONITOR to send the image data the device_monitor endpoint in BEC.
"""
status = DeviceStatus(self) status = DeviceStatus(self)
self.subscribe(status._finished, event_type=self.SUB_ACQ_DONE, run=False) self.subscribe(status._finished, event_type=self.SUB_ACQ_DONE, run=False)
@@ -230,7 +146,6 @@ class SimCamera(Device):
def acquire(): def acquire():
try: try:
for _ in range(self.burst.get()): for _ in range(self.burst.get()):
# Send data for each trigger
self._run_subs(sub_type=self.SUB_MONITOR, value=self.image.get()) self._run_subs(sub_type=self.SUB_MONITOR, value=self.image.get())
if self._stopped: if self._stopped:
raise DeviceStop raise DeviceStop
@@ -243,28 +158,41 @@ class SimCamera(Device):
threading.Thread(target=acquire, daemon=True).start() threading.Thread(target=acquire, daemon=True).start()
return status return status
def stage(self) -> list[object]: def _update_scaninfo(self) -> None:
"""Stage the camera """Update scaninfo from BecScaninfoMixing
This depends on device manager and operation/sim_mode
Receive scan message from REDIS first, extract relevant scan data,
and set all signals for the scan, e.g. scan_number, file_name, frames, etc.
""" """
msg = self.device_manager.producer.get(MessageEndpoints.scan_status()) self.scaninfo = BecScaninfoMixin(self.device_manager)
scan_msg = messages.ScanStatusMessage.loads(msg)
self.metadata = { def stage(self) -> list[object]:
"scanID": scan_msg.content["scanID"], """Stage the camera for upcoming scan
"RID": scan_msg.content["info"]["RID"],
"queueID": scan_msg.content["info"]["queueID"], This method is called from BEC in preparation of a scan.
} It receives metadata about the scan from BEC,
scan_number = scan_msg.content["info"]["scan_number"] compiles it and prepares the camera for the scan.
self.frames.set(scan_msg.content["info"]["num_points"])
self.file_name = os.path.join( FYI: No data is written to disk in the simulation, but upon each trigger it
self.file_path.get(), self.file_pattern.get().format(scan_number) is published to the device_monitor endpoint in REDIS.
"""
if self._staged:
return super().stag e()
self.scaninfo.load_scan_metadata()
self.file_path.set(
os.path.join(
self.file_path.get(), self.file_pattern.get().format(self.scaninfo.scan_number)
)
) )
self.frames = self.scaninfo.num_points * self.scaninfo.frames_per_trigger
self.exp_time = self.scaninfo.exp_time
self._stopped = False self._stopped = False
return super().stage() return super().stage()
def _send_data_to_bec(self) -> None: def _send_data_to_bec(self) -> None:
"""Send data to BEC.
Reads out all signals of type Kind.config, and send them to BEC.
Happens once for each scan.
"""
config_readout = { config_readout = {
signal.item.name: signal.item.get() signal.item.name: signal.item.get()
for signal in self.walk_signals() for signal in self.walk_signals()
@@ -272,7 +200,7 @@ class SimCamera(Device):
} }
signals = {"config": config_readout, "data": self.file_name} signals = {"config": config_readout, "data": self.file_name}
msg = messages.DeviceMessage(signals=signals, metadata=self.metadata) msg = messages.DeviceMessage(signals=signals, metadata=self.scaninfo.metadata)
self.device_manager.producer.set_and_publish( self.device_manager.producer.set_and_publish(
MessageEndpoints.device_read(self.name), msg.dumps() MessageEndpoints.device_read(self.name), msg.dumps()
) )
@@ -289,53 +217,11 @@ class SimCamera(Device):
return super().unstage() return super().unstage()
def stop(self, *, success=False): def stop(self, *, success=False):
"""Stop the device"""
self._stopped = True self._stopped = True
super().stop(success=success) super().stop(success=success)
class DummyController:
USER_ACCESS = [
"some_var",
"controller_show_all",
"_func_with_args",
"_func_with_args_and_kwargs",
"_func_with_kwargs",
"_func_without_args_kwargs",
]
some_var = 10
another_var = 20
def on(self):
self._connected = True
def off(self):
self._connected = False
def _func_with_args(self, *args):
return args
def _func_with_args_and_kwargs(self, *args, **kwargs):
return args, kwargs
def _func_with_kwargs(self, **kwargs):
return kwargs
def _func_without_args_kwargs(self):
return None
def controller_show_all(self):
"""dummy controller show all
Raises:
in: _description_
LimitError: _description_
Returns:
_type_: _description_
"""
print(self.some_var)
class DummyControllerDevice(Device): class DummyControllerDevice(Device):
USER_ACCESS = ["controller"] USER_ACCESS = ["controller"]
@@ -439,14 +325,6 @@ class SynFlyer(Device, PositionerBase):
flyer.start() flyer.start()
class SynController(OphydObject):
def on(self):
pass
def off(self):
pass
class SynFlyerLamNI(Device, PositionerBase): class SynFlyerLamNI(Device, PositionerBase):
def __init__( def __init__(
self, self,