feat: add SimWaveform for 1D waveform simulations

This commit is contained in:
appel_c 2024-04-12 18:13:07 +02:00
parent f287efc831
commit bf73bf41c4
4 changed files with 217 additions and 14 deletions

View File

@ -18,7 +18,7 @@ from .sim.sim import SimMonitor as SynAxisMonitor
from .sim.sim import SimMonitor as SynGaussBEC from .sim.sim import SimMonitor as SynGaussBEC
from .sim.sim import SimPositioner from .sim.sim import SimPositioner
from .sim.sim import SimPositioner as SynAxisOPAAS from .sim.sim import SimPositioner as SynAxisOPAAS
from .sim.sim import SynDeviceOPAAS from .sim.sim import SimWaveform, SynDeviceOPAAS
from .sim.sim_frameworks import DeviceProxy, H5ImageReplayProxy, SlitProxy from .sim.sim_frameworks import DeviceProxy, H5ImageReplayProxy, SlitProxy
from .sim.sim_signals import ReadOnlySignal from .sim.sim_signals import ReadOnlySignal
from .sim.sim_signals import ReadOnlySignal as SynSignalRO from .sim.sim_signals import ReadOnlySignal as SynSignalRO

View File

@ -1,7 +1,7 @@
from .sim import SimCamera from .sim import SimCamera
from .sim import SimFlyer from .sim import SimFlyer
from .sim import SimFlyer as SynFlyer from .sim import SimFlyer as SynFlyer
from .sim import SimMonitor, SimPositioner from .sim import SimMonitor, SimPositioner, SimWaveform
from .sim_frameworks import SlitProxy from .sim_frameworks import SlitProxy
from .sim_signals import ReadOnlySignal, SetableSignal from .sim_signals import ReadOnlySignal, SetableSignal
from .sim_xtreme import SynXtremeOtf from .sim_xtreme import SynXtremeOtf

View File

@ -16,6 +16,7 @@ from ophyd.utils import LimitError
from ophyd_devices.sim.sim_data import ( from ophyd_devices.sim.sim_data import (
SimulatedDataCamera, SimulatedDataCamera,
SimulatedDataMonitor, SimulatedDataMonitor,
SimulatedDataWaveform,
SimulatedPositioner, SimulatedPositioner,
) )
from ophyd_devices.sim.sim_signals import ReadOnlySignal, SetableSignal from ophyd_devices.sim.sim_signals import ReadOnlySignal, SetableSignal
@ -220,6 +221,141 @@ class SimCamera(Device):
super().stop(success=success) super().stop(success=success)
class SimWaveform(Device):
"""A simulated device mimic any 1D Waveform detector.
It's waveform is a computed signal, which is configurable by the user and from the command line.
The corresponding simulation class is sim_cls=SimulatedDataWaveform, more details on defaults within the simulation class.
>>> waveform = SimWaveform(name="waveform")
Parameters
----------
name (string) : Name of the device. This is the only required argmuent, passed on to all signals of the device.
precision (integer) : Precision of the readback in digits, written to .describe(). Default is 3 digits.
sim_init (dict) : Dictionary to initiate parameters of the simulation, check simulation type defaults for more details.
parent : Parent device, optional, is used internally if this signal/device is part of a larger device.
kind : A member the Kind IntEnum (or equivalent integer), optional. Default is Kind.normal. See Kind for options.
device_manager : DeviceManager from BEC, optional . Within startup of simulation, device_manager is passed on automatically.
"""
USER_ACCESS = ["sim", "registered_proxies"]
sim_cls = SimulatedDataWaveform
SHAPE = (1000,)
BIT_DEPTH = np.uint16
SUB_MONITOR = "monitor"
_default_sub = SUB_MONITOR
exp_time = Cpt(SetableSignal, name="exp_time", value=1, kind=Kind.config)
file_path = Cpt(SetableSignal, name="file_path", value="", kind=Kind.config)
file_pattern = Cpt(SetableSignal, name="file_pattern", value="", kind=Kind.config)
frames = Cpt(SetableSignal, name="frames", value=1, kind=Kind.config)
burst = Cpt(SetableSignal, name="burst", value=1, kind=Kind.config)
waveform_shape = Cpt(SetableSignal, name="waveform_shape", value=SHAPE, kind=Kind.config)
waveform = Cpt(
ReadOnlySignal,
name="waveform",
value=np.empty(SHAPE, dtype=BIT_DEPTH),
compute_readback=True,
kind=Kind.omitted,
)
def __init__(
self, name, *, kind=None, parent=None, sim_init: dict = None, device_manager=None, **kwargs
):
self.device_manager = device_manager
self.init_sim_params = sim_init
self._registered_proxies = {}
self.sim = self.sim_cls(parent=self, device_manager=device_manager, **kwargs)
super().__init__(name=name, parent=parent, kind=kind, **kwargs)
self._stopped = False
self._staged = False
self.scaninfo = None
self._update_scaninfo()
@property
def registered_proxies(self) -> None:
"""Dictionary of registered signal_names and proxies."""
return self._registered_proxies
def trigger(self) -> DeviceStatus:
"""Trigger the camera to acquire images.
This method can be called from BEC during a scan. It will acquire images and send them to BEC.
Whether the trigger is send from BEC is determined by the softwareTrigger argument in the device config.
Here, we also run a callback on SUB_MONITOR to send the image data the device_monitor endpoint in BEC.
"""
status = DeviceStatus(self)
self.subscribe(status._finished, event_type=self.SUB_ACQ_DONE, run=False)
def acquire():
try:
for _ in range(self.burst.get()):
self._run_subs(sub_type=self.SUB_MONITOR, value=self.waveform.get())
if self._stopped:
raise DeviceStop
except DeviceStop:
pass
finally:
self._stopped = False
self._done_acquiring()
threading.Thread(target=acquire, daemon=True).start()
return status
def _update_scaninfo(self) -> None:
"""Update scaninfo from BecScaninfoMixing
This depends on device manager and operation/sim_mode
"""
self.scaninfo = BecScaninfoMixin(self.device_manager)
def stage(self) -> list[object]:
"""Stage the camera for upcoming scan
This method is called from BEC in preparation of a scan.
It receives metadata about the scan from BEC,
compiles it and prepares the camera for the scan.
FYI: No data is written to disk in the simulation, but upon each trigger it
is published to the device_monitor endpoint in REDIS.
"""
if self._staged:
return super().stage()
self.scaninfo.load_scan_metadata()
self.file_path.set(
os.path.join(
self.file_path.get(), self.file_pattern.get().format(self.scaninfo.scan_number)
)
)
self.frames.set(self.scaninfo.num_points * self.scaninfo.frames_per_trigger)
self.exp_time.set(self.scaninfo.exp_time)
self.burst.set(self.scaninfo.frames_per_trigger)
self._stopped = False
return super().stage()
def unstage(self) -> list[object]:
"""Unstage the device
Send reads from all config signals to redis
"""
if self._stopped is True or not self._staged:
return super().unstage()
return super().unstage()
def stop(self, *, success=False):
"""Stop the device"""
self._stopped = True
super().stop(success=success)
class SimPositioner(Device, PositionerBase): class SimPositioner(Device, PositionerBase):
""" """
A simulated device mimicing any 1D Axis device (position, temperature, rotation). A simulated device mimicing any 1D Axis device (position, temperature, rotation).
@ -577,5 +713,12 @@ class SimPositionerWithCommFailure(SimPositioner):
if __name__ == "__main__": if __name__ == "__main__":
cam = SimCamera(name="cam") waveform = SimWaveform(name="waveform")
cam.image.read() waveform.sim.sim_select_model(waveform.sim.sim_get_models()[7])
waveform.sim.sim_params = {
"amplitude": 1500,
"noise_multiplier": 168,
"sigma": 50,
"center": 350,
}
waveform.waveform.get()

View File

@ -5,6 +5,7 @@ import inspect
import time as ttime import time as ttime
from abc import ABC, abstractmethod from abc import ABC, abstractmethod
from collections import defaultdict from collections import defaultdict
from copy import deepcopy
import numpy as np import numpy as np
from bec_lib import bec_logger from bec_lib import bec_logger
@ -310,8 +311,8 @@ class SimulatedDataMonitor(SimulatedDataBase):
self._init_default() self._init_default()
def _get_additional_params(self) -> None: def _get_additional_params(self) -> None:
params = DEFAULT_PARAMS_NOISE.copy() params = deepcopy(DEFAULT_PARAMS_NOISE)
params.update(DEFAULT_PARAMS_MOTOR.copy()) params.update(deepcopy(DEFAULT_PARAMS_MOTOR))
return params return params
def _init_default(self) -> None: def _init_default(self) -> None:
@ -439,6 +440,65 @@ class SimulatedDataMonitor(SimulatedDataBase):
return v return v
class SimulatedDataWaveform(SimulatedDataMonitor):
"""Simulated data class for a waveform.
The class inherits from SimulatedDataMonitor,
and overwrites the relevant methods to compute
a simulated waveform for each point.
"""
def _get_additional_params(self) -> None:
params = deepcopy(DEFAULT_PARAMS_NOISE)
return params
def compute_sim_state(self, signal_name: str, compute_readback: bool) -> None:
"""Update the simulated state of the device.
It will update the value in self.sim_state with the value computed by
the chosen simulation type.
Args:
signal_name (str): Name of the signal to update.
"""
if compute_readback:
method = self._compute
value = self.execute_simulation_method(method=method, signal_name=signal_name)
value = self.bit_depth(value)
self.update_sim_state(signal_name, value)
def _compute(self, *args, **kwargs) -> np.ndarray:
"""
Compute the return value for active model.
Returns:
np.array: Values computed for the activate model.
"""
size = self.parent.waveform_shape.get()
size = size[0] if isinstance(size, tuple) else size
method = self._model
value = method.eval(params=self._model_params, x=np.array(range(size)))
value *= self.sim_params["amplitude"] / np.max(value)
return self._add_noise(value, self.sim_params["noise"], self.sim_params["noise_multiplier"])
def _add_noise(self, v: np.ndarray, noise: NoiseType, noise_multiplier: float) -> np.ndarray:
"""Add noise to the simulated data.
Args:
v (np.ndarray): Simulated data.
noise (NoiseType): Type of noise to add.
"""
if noise == NoiseType.POISSON:
v = np.random.poisson(np.round(v), v.shape)
return v
if noise == NoiseType.UNIFORM:
v += np.random.uniform(-noise_multiplier, noise_multiplier, v.shape)
v[v <= 0] = 0
return v
if noise == NoiseType.NONE:
return v
class SimulatedDataCamera(SimulatedDataBase): class SimulatedDataCamera(SimulatedDataBase):
"""Simulated class to compute data for a 2D camera.""" """Simulated class to compute data for a 2D camera."""
@ -468,24 +528,24 @@ class SimulatedDataCamera(SimulatedDataBase):
return model_lookup return model_lookup
def _get_additional_params(self) -> None: def _get_additional_params(self) -> None:
params = DEFAULT_PARAMS_NOISE.copy() params = deepcopy(DEFAULT_PARAMS_NOISE)
params.update(DEFAULT_PARAMS_HOT_PIXEL.copy()) params.update(deepcopy(DEFAULT_PARAMS_HOT_PIXEL))
return params return params
def _init_default_camera_params(self) -> None: def _init_default_camera_params(self) -> None:
"""Initiate additional params for the simulated camera.""" """Initiate additional params for the simulated camera."""
self._all_default_model_params.update( self._all_default_model_params.update(
{ {
self._model_lookup[ self._model_lookup[SimulationType2D.CONSTANT.value]: deepcopy(
SimulationType2D.CONSTANT.value DEFAULT_PARAMS_CAMERA_CONSTANT
]: DEFAULT_PARAMS_CAMERA_CONSTANT.copy() )
} }
) )
self._all_default_model_params.update( self._all_default_model_params.update(
{ {
self._model_lookup[ self._model_lookup[SimulationType2D.GAUSSIAN.value]: deepcopy(
SimulationType2D.GAUSSIAN.value DEFAULT_PARAMS_CAMERA_GAUSSIAN
]: DEFAULT_PARAMS_CAMERA_GAUSSIAN.copy() )
} }
) )