feat: add lmfit for SimMonitor, refactored sim_data with baseclass, introduce slitproxy

This commit is contained in:
2024-02-22 14:42:35 +01:00
parent 2da6379e8e
commit 800c22e959
9 changed files with 822 additions and 535 deletions

View File

@ -8,13 +8,15 @@ from .npoint.npoint import NPointAxis
from .rt_lamni import RtFlomniMotor, RtLamniMotor
from .sim.sim import SimCamera
from .sim.sim import SimMonitor
from .sim.sim import SimFlyer
from .sim.sim import SimFlyer as SynFlyer
from .sim.sim import SimMonitor as SynAxisMonitor
from .sim.sim import SimMonitor as SynGaussBEC
from .sim.sim import SimPositioner
from .sim.sim import SimPositioner as SynAxisOPAAS
from .sim.sim import SynDeviceOPAAS, SynFlyer
from .sim.sim import SynDeviceOPAAS
from .sim.sim_signals import ReadOnlySignal
from .sim.sim_frameworks import DeviceProxy, SlitLookup
from .sim.sim_frameworks import DeviceProxy, SlitProxy
from .sim.sim_signals import ReadOnlySignal as SynSignalRO
from .sls_devices.sls_devices import SLSInfo, SLSOperatorMessages
from .smaract.smaract_ophyd import SmaractMotor

View File

@ -2,11 +2,11 @@ from .sim import (
SimPositioner,
SimMonitor,
SimCamera,
SynDynamicComponents,
SynFlyer,
SimFlyer,
SimFlyer as SynFlyer,
)
from .sim_xtreme import SynXtremeOtf
from .sim_signals import SetableSignal, ReadOnlySignal, ComputedReadOnlySignal
from .sim_signals import SetableSignal, ReadOnlySignal
from .sim_frameworks import SlitLookup
from .sim_frameworks import SlitProxy

View File

@ -1,23 +1,32 @@
from collections import defaultdict
import os
import threading
import time as ttime
import warnings
import numpy as np
from bec_lib import MessageEndpoints, bec_logger, messages
from ophyd import Component as Cpt
from ophyd import DynamicDeviceComponent as Dcpt
from ophyd import Device, DeviceStatus, Kind
from ophyd import PositionerBase, Signal
from ophyd import PositionerBase
from ophyd.flyers import FlyerInterface
from ophyd.sim import SynSignal
from ophyd.status import StatusBase
from ophyd.utils import LimitError
from ophyd_devices.utils.bec_scaninfo_mixin import BecScaninfoMixin
from ophyd_devices.sim.sim_data import SimulatedDataBase, SimulatedDataCamera, SimulatedDataMonitor
from ophyd_devices.sim.sim_data import (
SimulatedPositioner,
SimulatedDataCamera,
SimulatedDataMonitor,
)
from ophyd_devices.sim.sim_test_devices import DummyController
from ophyd_devices.sim.sim_signals import SetableSignal, ReadOnlySignal, ComputedReadOnlySignal
from ophyd_devices.sim.sim_signals import SetableSignal, ReadOnlySignal
logger = bec_logger.logger
@ -46,11 +55,11 @@ class SimMonitor(Device):
"""
USER_ACCESS = ["sim"]
USER_ACCESS = ["sim", "registered_proxies"]
sim_cls = SimulatedDataMonitor
readback = Cpt(ComputedReadOnlySignal, value=0, kind=Kind.hinted)
readback = Cpt(ReadOnlySignal, value=0, kind=Kind.hinted, compute_readback=True)
SUB_READBACK = "readback"
_default_sub = SUB_READBACK
@ -69,16 +78,16 @@ class SimMonitor(Device):
self.precision = precision
self.init_sim_params = sim_init
self.sim = self.sim_cls(parent=self, device_manager=device_manager, **kwargs)
self._lookup_table = []
self._registered_proxies = {}
super().__init__(name=name, parent=parent, kind=kind, **kwargs)
self.sim.sim_state[self.name] = self.sim.sim_state.pop(self.readback.name, None)
self.readback.name = self.name
@property
def lookup_table(self) -> None:
"""lookup_table property"""
return self._lookup_table
def registered_proxies(self) -> None:
"""Dictionary of registered signal_names and proxies."""
return self._registered_proxies
class SimCamera(Device):
@ -100,7 +109,7 @@ class SimCamera(Device):
"""
USER_ACCESS = ["sim"]
USER_ACCESS = ["sim", "registered_proxies"]
sim_cls = SimulatedDataCamera
SHAPE = (100, 100)
@ -116,9 +125,10 @@ class SimCamera(Device):
image_shape = Cpt(SetableSignal, name="image_shape", value=SHAPE, kind=Kind.config)
image = Cpt(
ComputedReadOnlySignal,
ReadOnlySignal,
name="image",
value=np.empty(SHAPE, dtype=np.uint16),
compute_readback=True,
kind=Kind.omitted,
)
@ -134,7 +144,7 @@ class SimCamera(Device):
):
self.device_manager = device_manager
self.init_sim_params = sim_init
self._lookup_table = []
self._registered_proxies = {}
self.sim = self.sim_cls(parent=self, device_manager=device_manager, **kwargs)
super().__init__(name=name, parent=parent, kind=kind, **kwargs)
@ -144,9 +154,9 @@ class SimCamera(Device):
self._update_scaninfo()
@property
def lookup_table(self) -> None:
"""lookup_table property"""
return self._lookup_table
def registered_proxies(self) -> None:
"""Dictionary of registered signal_names and proxies."""
return self._registered_proxies
def trigger(self) -> DeviceStatus:
"""Trigger the camera to acquire images.
@ -225,30 +235,29 @@ class SimPositioner(Device, PositionerBase):
"""
A simulated device mimicing any 1D Axis device (position, temperature, rotation).
>>> motor = SimPositioner(name="motor")
Parameters
----------
name : string, keyword only
readback_func : callable, optional
When the Device is set to ``x``, its readback will be updated to
``f(x)``. This can be used to introduce random noise or a systematic
offset.
Expected signature: ``f(x) -> value``.
value : object, optional
The initial value. Default is 0.
delay : number, optional
Simulates how long it takes the device to "move". Default is 0 seconds.
precision : integer, optional
Digits of precision. Default is 3.
parent : Device, optional
Used internally if this Signal is made part of a larger Device.
kind : a member the Kind IntEnum (or equivalent integer), optional
Default is Kind.normal. See Kind for options.
name (string) : Name of the device. This is the only required argmuent, passed on to all signals of the device.\
Optional parameters:
----------
delay (int) : If 0, execution of move will be instant. If 1, exectution will depend on motor velocity. Default is 1.
update_frequency (int) : Frequency in Hz of the update of the simulated state during a move. Default is 2 Hz.
precision (integer) : Precision of the readback in digits, written to .describe(). Default is 3 digits.
tolerance (float) : Tolerance of the positioner to accept reaching target positions. Default is 0.5.
limits (tuple) : Tuple of the low and high limits of the positioner. Overrides low/high_limit_travel is specified. Default is None.
parent : Parent device, optional, is used internally if this signal/device is part of a larger device.
kind : A member the Kind IntEnum (or equivalent integer), optional. Default is Kind.normal. See Kind for options.
device_manager : DeviceManager from BEC, optional . Within startup of simulation, device_manager is passed on automatically.
sim_init (dict) : Dictionary to initiate parameters of the simulation, check simulation type defaults for more details.
"""
# Specify which attributes are accessible via BEC client
USER_ACCESS = ["sim", "readback", "speed", "dummy_controller"]
USER_ACCESS = ["sim", "readback", "speed", "dummy_controller", "registered_proxies"]
sim_cls = SimulatedDataBase
sim_cls = SimulatedPositioner
# Define the signals as class attributes
readback = Cpt(ReadOnlySignal, name="readback", value=0, kind=Kind.hinted)
@ -256,60 +265,61 @@ class SimPositioner(Device, PositionerBase):
motor_is_moving = Cpt(SetableSignal, value=0, kind=Kind.normal)
# Config signals
velocity = Cpt(SetableSignal, value=1, kind=Kind.config)
velocity = Cpt(SetableSignal, value=100, kind=Kind.config)
acceleration = Cpt(SetableSignal, value=1, kind=Kind.config)
# Ommitted signals
high_limit_travel = Cpt(SetableSignal, value=0, kind=Kind.omitted)
low_limit_travel = Cpt(SetableSignal, value=0, kind=Kind.omitted)
unused = Cpt(Signal, value=1, kind=Kind.omitted)
unused = Cpt(SetableSignal, value=1, kind=Kind.omitted)
SUB_READBACK = "readback"
_default_sub = SUB_READBACK
# pylint: disable=too-many-arguments
def __init__(
self,
*,
name,
readback_func=None,
value=0,
delay=1,
speed=1,
*,
delay: int = 1,
update_frequency=2,
precision=3,
parent=None,
labels=None,
kind=None,
limits=None,
tolerance: float = 0.5,
sim: dict = None,
limits=None,
parent=None,
kind=None,
device_manager=None,
sim_init: dict = None,
# TODO remove after refactoring config
speed: float = 100,
**kwargs,
):
# Whether motions should be instantaneous or depend on motor velocity
self.delay = delay
self.device_manager = device_manager
self.precision = precision
self.tolerance = tolerance
self.init_sim_params = sim
self._lookup_table = []
self.init_sim_params = sim_init
self._registered_proxies = {}
self.speed = speed
self.update_frequency = update_frequency
self._stopped = False
self.dummy_controller = DummyController()
# initialize inner dictionary with simulated state
self.sim = self.sim_cls(parent=self, **kwargs)
super().__init__(name=name, labels=labels, parent=parent, kind=kind, **kwargs)
# Rename self.readback.name to self.name, also in self.sim_state
super().__init__(name=name, parent=parent, kind=kind, **kwargs)
self.sim.sim_state[self.name] = self.sim.sim_state.pop(self.readback.name, None)
self.readback.name = self.name
# Init limits from deviceConfig
if limits is not None:
assert len(limits) == 2
self.low_limit_travel.put(limits[0])
self.high_limit_travel.put(limits[1])
# @property
# def connected(self):
# """Return the connected state of the simulated device."""
# return self.dummy_controller.connected
@property
def limits(self):
"""Return the limits of the simulated device."""
@ -325,11 +335,11 @@ class SimPositioner(Device, PositionerBase):
"""Return the high limit of the simulated device."""
return self.limits[1]
@property
def lookup_table(self) -> None:
"""lookup_table property"""
return self._lookup_table
def registered_proxies(self) -> None:
"""Dictionary of registered signal_names and proxies."""
return self._registered_proxies
# pylint: disable=arguments-differ
def check_value(self, value: any):
"""
Check that requested position is within existing limits.
@ -375,42 +385,41 @@ class SimPositioner(Device, PositionerBase):
st = DeviceStatus(device=self)
if self.delay:
# If self.delay is not 0, we use the speed and updated frequency of the device to compute the motion
def move_and_finish():
"""Move the simulated device and finish the motion."""
success = True
try:
# Compute final position with some jitter
move_val = self._get_sim_state(
self.setpoint.name
) + self.tolerance * np.random.uniform(-1, 1)
# Compute the number of updates needed to reach the final position with the given speed
updates = np.ceil(
np.abs(old_setpoint - move_val) / self.speed * self.update_frequency
np.abs(old_setpoint - move_val)
/ self.velocity.get()
* self.update_frequency
)
# Loop over the updates and update the state of the simulated device
for ii in np.linspace(old_setpoint, move_val, int(updates)):
ttime.sleep(1 / self.update_frequency)
update_state(ii)
# Update the state of the simulated device to the final position
update_state(move_val)
self._set_sim_state(self.motor_is_moving, 0)
except DeviceStop:
success = False
finally:
self._stopped = False
# Call function from positioner base to indicate that motion finished with success
self._done_moving(success=success)
# Set status to finished
self._set_sim_state(self.motor_is_moving.name, 0)
st.set_finished()
# Start motion in Thread
threading.Thread(target=move_and_finish, daemon=True).start()
else:
# If self.delay is 0, we move the simulated device instantaneously
update_state(value)
self._done_moving()
self._set_sim_state(self.motor_is_moving.name, 0)
st.set_finished()
return st
@ -420,7 +429,7 @@ class SimPositioner(Device, PositionerBase):
self._stopped = True
@property
def position(self):
def position(self) -> float:
"""Return the current position of the simulated device."""
return self.readback.get()
@ -430,57 +439,81 @@ class SimPositioner(Device, PositionerBase):
return "mm"
class SynFlyer(Device, PositionerBase):
class SimFlyer(Device, PositionerBase, FlyerInterface):
"""A simulated device mimicing any 2D Flyer device (position, temperature, rotation).
The corresponding simulation class is sim_cls=SimulatedPositioner, more details on defaults within the simulation class.
>>> flyer = SimFlyer(name="flyer")
Parameters
----------
name (string) : Name of the device. This is the only required argmuent, passed on to all signals of the device.
precision (integer) : Precision of the readback in digits, written to .describe(). Default is 3 digits.
parent : Parent device, optional, is used internally if this signal/device is part of a larger device.
kind : A member the Kind IntEnum (or equivalent integer), optional. Default is Kind.normal. See Kind for options.
device_manager : DeviceManager from BEC, optional . Within startup of simulation, device_manager is passed on automatically.
"""
USER_ACCESS = ["sim", "registered_proxies"]
sim_cls = SimulatedPositioner
readback = Cpt(
ReadOnlySignal, name="readback", value=0, kind=Kind.hinted, compute_readback=False
)
def __init__(
self,
name: str,
*,
name,
readback_func=None,
value=0,
delay=0,
speed=1,
update_frequency=2,
precision=3,
precision: int = 3,
parent=None,
labels=None,
kind=None,
device_manager=None,
# TODO remove after refactoring config
speed: float = 100,
delay: int = 1,
update_frequency: int = 100,
**kwargs,
):
if readback_func is None:
def readback_func(x):
return x
sentinel = object()
loop = kwargs.pop("loop", sentinel)
if loop is not sentinel:
warnings.warn(
f"{self.__class__} no longer takes a loop as input. "
"Your input will be ignored and may raise in the future",
stacklevel=2,
)
self.sim_state = {}
self._readback_func = readback_func
self.delay = delay
self.sim = self.sim_cls(parent=self, device_manager=device_manager, **kwargs)
self.precision = precision
self.tolerance = kwargs.pop("tolerance", 0.5)
self.device_manager = device_manager
self._registered_proxies = {}
# initialize values
self.sim_state["readback"] = readback_func(value)
self.sim_state["readback_ts"] = ttime.time()
super().__init__(name=name, parent=parent, kind=kind, **kwargs)
self.sim.sim_state[self.name] = self.sim.sim_state.pop(self.readback.name, None)
self.readback.name = self.name
super().__init__(name=name, parent=parent, labels=labels, kind=kind, **kwargs)
@property
def registered_proxies(self) -> None:
"""Dictionary of registered signal_names and proxies."""
return self._registered_proxies
@property
def hints(self):
"""Return the hints of the simulated device."""
return {"fields": ["flyer_samx", "flyer_samy"]}
@property
def egu(self) -> str:
"""Return the engineering units of the simulated device."""
return "mm"
def complete(self) -> StatusBase:
"""Complete the motion of the simulated device."""
status = DeviceStatus(self)
status.set_finished()
return status
def kickoff(self, metadata, num_pos, positions, exp_time: float = 0):
"""Kickoff the flyer to execute code during the scan."""
positions = np.asarray(positions)
def produce_data(device, metadata):
"""Simulate the data being produced by the flyer."""
buffer_time = 0.2
elapsed_time = 0
bundle = messages.BundleMessage()
@ -529,10 +562,6 @@ class SynFlyer(Device, PositionerBase):
flyer.start()
class SynDynamicComponents(Device):
messages = Dcpt({f"message{i}": (SynSignal, None, {"name": f"msg{i}"}) for i in range(1, 6)})
class SynDeviceSubOPAAS(Device):
zsub = Cpt(SimPositioner, name="zsub")
@ -541,3 +570,12 @@ class SynDeviceOPAAS(Device):
x = Cpt(SimPositioner, name="x")
y = Cpt(SimPositioner, name="y")
z = Cpt(SynDeviceSubOPAAS, name="z")
class SynDynamicComponents(Device):
messages = Dcpt({f"message{i}": (SynSignal, None, {"name": f"msg{i}"}) for i in range(1, 6)})
if __name__ == "__main__":
cam = SimCamera(name="cam")
cam.image.read()

View File

@ -1,10 +1,15 @@
from __future__ import annotations
from collections import defaultdict
from abc import ABC, abstractmethod
from prettytable import PrettyTable
import enum
import inspect
import time as ttime
import numpy as np
from lmfit import models, Model
from bec_lib import bec_logger
@ -15,11 +20,11 @@ class SimulatedDataException(Exception):
"""Exception raised when there is an issue with the simulated data."""
class SimulationType(str, enum.Enum):
class SimulationType2D(str, enum.Enum):
"""Type of simulation to steer simulated data."""
CONSTANT = "constant"
GAUSSIAN = "gauss"
GAUSSIAN = "gaussian"
class NoiseType(str, enum.Enum):
@ -37,178 +42,374 @@ class HotPixelType(str, enum.Enum):
FLUCTUATING = "fluctuating"
class SimulatedDataBase:
DEFAULT_PARAMS_LMFIT = {
"c0": 1,
"c1": 1,
"c2": 1,
"c3": 1,
"c4": 1,
"c": 100,
"amplitude": 100,
"center": 0,
"sigma": 1,
}
DEFAULT_PARAMS_NOISE = {
"noise": NoiseType.UNIFORM,
"noise_multiplier": 10,
}
DEFAULT_PARAMS_MOTOR = {
"ref_motor": "samx",
}
DEFAULT_PARAMS_CAMERA_GAUSSIAN = {
"amplitude": 100,
"center_offset": np.array([0, 0]),
"covariance": np.array([[400, 100], [100, 400]]),
}
DEFAULT_PARAMS_CAMERA_CONSTANT = {
"amplitude": 100,
}
DEFAULT_PARAMS_HOT_PIXEL = {
"hot_pixel_coords": np.array([[24, 24], [50, 20], [4, 40]]),
"hot_pixel_types": [
HotPixelType.FLUCTUATING,
HotPixelType.CONSTANT,
HotPixelType.FLUCTUATING,
],
"hot_pixel_values": np.array([1e4, 1e6, 1e4]),
}
class SimulatedDataBase(ABC):
"""Abstract base class for simulated data.
This class should be subclassed to implement the simulated data for a specific device.
It provides the basic functionality to set and get data from the simulated data class
---------------------
The class provides the following methods:
- execute_simulation_method: execute a method from the simulated data class or reroute execution to device proxy class
- sim_select_model: select the active simulation model
- sim_params: get the parameters for the active simulation mdoel
- sim_models: get the available simulation models
- update_sim_state: update the simulated state of the device
"""
USER_ACCESS = [
"get_sim_params",
"set_sim_params",
"get_sim_type",
"set_sim_type",
"sim_params",
"sim_select_model",
"sim_get_models",
"sim_show_all",
]
def __init__(self, *args, parent=None, device_manager=None, **kwargs) -> None:
"""
Note:
self._model_params duplicates parameters from _params that are solely relevant for the model used.
This facilitates easier and faster access for computing the simulated state using the lmfit package.
"""
self.parent = parent
self.sim_state = defaultdict(dict)
self._all_params = defaultdict(dict)
self.device_manager = device_manager
self._simulation_type = None
self.lookup_table = getattr(self.parent, "lookup_table", [])
self.init_paramaters(**kwargs)
self._active_params = self._all_params.get(self._simulation_type, None)
self.sim_state = defaultdict(dict)
self.registered_proxies = getattr(self.parent, "registered_proxies", {})
self._model = {}
self._model_params = None
self._params = {}
def execute_simulation_method(self, *args, method=None, **kwargs) -> any:
"""Execute the method from the lookup table."""
if self.lookup_table and self.device_manager.devices.get(self.lookup_table[0]) is not None:
sim_device = self.device_manager.devices.get(self.lookup_table[0])
# pylint: disable=protected-access
if sim_device.enabled is True:
method = sim_device.obj.lookup[self.parent.name]["method"]
args = sim_device.obj.lookup[self.parent.name]["args"]
kwargs = sim_device.obj.lookup[self.parent.name]["kwargs"]
def execute_simulation_method(self, *args, method=None, signal_name: str = "", **kwargs) -> any:
"""
Execute either the provided method or reroutes the method execution
to a device proxy in case it is registered in self.parentregistered_proxies.
"""
if self.registered_proxies and self.device_manager:
for proxy_name, signal in self.registered_proxies.items():
if signal == signal_name or f"{self.parent.name}_{signal}" == signal_name:
sim_proxy = self.device_manager.devices.get(proxy_name, None)
if sim_proxy and sim_proxy.enabled is True:
method = sim_proxy.obj.lookup[self.parent.name]["method"]
args = sim_proxy.obj.lookup[self.parent.name]["args"]
kwargs = sim_proxy.obj.lookup[self.parent.name]["kwargs"]
break
if method is not None:
return method(*args, **kwargs)
raise SimulatedDataException(f"Method {method} is not available for {self.parent.name}")
def init_paramaters(self, **kwargs):
"""Initialize the parameters for the Simulated Data
This methods should be implemented by the subclass.
It sets the default parameters for the simulated data in
self._params and calls self._update_init_params()
def sim_select_model(self, model: str) -> None:
"""
def get_sim_params(self) -> dict:
"""Return the currently parameters for the active simulation type in sim_type.
These parameters can be changed with set_sim_params.
Returns:
dict: Parameters of the currently active simulation in sim_type.
"""
return self._active_params
def set_sim_params(self, params: dict) -> None:
"""Change the current set of parameters for the active simulation type.
Method to select the active simulation model.
It will initiate the model_cls and parameters for the model.
Args:
params (dict): New parameters for the active simulation type.
model (str): Name of the simulation model to select.
Raises:
SimulatedDataException: If the new parameters can not be set or is not part of the parameters initiated.
"""
for k, v in params.items():
try:
if k == "noise":
self._active_params[k] = NoiseType(v)
else:
self._active_params[k] = v
except Exception as exc:
raise SimulatedDataException(
f"Could not set {k} to {v} in {self._active_params} with exception {exc}"
) from exc
model_cls = self.get_model_cls(model)
self._model = model_cls() if callable(model_cls) else model_cls
self._params = self.get_params_for_model_cls()
self._params.update(self._get_additional_params())
print(self._get_table_active_simulation())
def get_sim_type(self) -> SimulationType:
"""Return the simulation type of the simulation.
@property
def sim_params(self) -> dict:
"""
Property that returns the parameters for the active simulation model. It can also
be used to set the parameters for the active simulation updating the parameters of the model.
Returns:
SimulationType: Type of simulation (e.g. "constant" or "gauss).
dict: Parameters for the active simulation model.
The following example shows how to update the noise parameter of the current simulation.
>>> dev.<device>.sim.sim_params = {"noise": "poisson"}
"""
return self._simulation_type
return self._params
def set_sim_type(self, simulation_type: SimulationType) -> None:
"""Set the simulation type of the simulation."""
try:
self._simulation_type = SimulationType(simulation_type)
except ValueError as exc:
raise SimulatedDataException(
f"Could not set simulation type to {simulation_type}. Valid options are 'constant'"
" and 'gauss'"
) from exc
self._active_params = self._all_params.get(self._simulation_type, None)
def _compute_sim_state(self, signal_name: str) -> None:
"""Update the simulated state of the device.
If no computation is relevant, ignore this method.
Otherwise implement it in the subclass.
@sim_params.setter
def sim_params(self, params: dict):
"""
Method to set the parameters for the active simulation model.
"""
for k, v in params.items():
if k in self.sim_params:
if k == "noise":
self._params[k] = NoiseType(v)
elif k == "hot_pixel_types":
self._params[k] = [HotPixelType(entry) for entry in v]
else:
self._params[k] = v
if isinstance(self._model, Model) and k in self._model_params:
self._model_params[k].value = v
else:
raise SimulatedDataException(f"Parameter {k} not found in {self.sim_params}.")
def sim_get_models(self) -> list:
"""
Method to get the all available simulation models.
"""
return self.get_all_sim_models()
def update_sim_state(self, signal_name: str, value: any) -> None:
"""Update the simulated state of the device.
Args:
signal_name (str): Name of the signal to update.
value (any): Value to update in the simulated state.
"""
self.sim_state[signal_name]["value"] = value
self.sim_state[signal_name]["timestamp"] = ttime.time()
def _update_init_params(
self,
sim_type_default: SimulationType,
) -> None:
"""Update the initial parameters of the simulated data with input from deviceConfig.
@abstractmethod
def _get_additional_params(self) -> dict:
"""Initialize the default parameters for the noise."""
Args:
sim_type_default (SimulationType): Default simulation type to use if not specified in deviceConfig.
@abstractmethod
def get_model_cls(self, model: str) -> any:
"""
init_params = getattr(self.parent, "init_sim_params", None)
for sim_type in self._all_params.values():
for sim_type_config_element in sim_type:
if init_params:
if sim_type_config_element in init_params:
sim_type[sim_type_config_element] = init_params[sim_type_config_element]
# Set simulation type to default if not specified in deviceConfig
sim_type_select = (
init_params.get("sim_type", sim_type_default) if init_params else sim_type_default
Method to get the class for the active simulation model_cls
"""
@abstractmethod
def get_params_for_model_cls(self) -> dict:
"""
Method to get the parameters for the active simulation model.
"""
@abstractmethod
def get_all_sim_models(self) -> list[str]:
"""
Method to get all names from the available simulation models.
Returns:
list: List of available simulation models.
"""
@abstractmethod
def compute_sim_state(self, signal_name: str, compute_readback: bool) -> None:
"""
Method to compute the simulated state of the device.
"""
def _get_table_active_simulation(self, width: int = 140) -> PrettyTable:
"""Return a table with the active simulation model and parameters."""
table = PrettyTable()
table.title = f"Currently active model: {self._model}"
table.field_names = ["Parameter", "Value", "Type"]
for k, v in self.sim_params.items():
table.add_row([k, f"{v}", f"{type(v)}"])
table._min_width["Parameter"] = 25 if width > 75 else width // 3
table._min_width["Type"] = 25 if width > 75 else width // 3
table.max_table_width = width
table._min_table_width = width
return table
def _get_table_method_information(self, width: int = 140) -> PrettyTable:
"""Return a table with the information about methods."""
table = PrettyTable()
table.max_width["Value"] = 120
table.hrules = 1
table.title = "Available methods within the simulation module"
table.field_names = ["Method", "Docstring"]
table.add_row(
[
self.sim_get_models.__name__,
f"{self.sim_get_models.__doc__}",
]
)
self.set_sim_type(sim_type_select)
table.add_row([self.sim_select_model.__name__, self.sim_select_model.__doc__])
table.add_row(["sim_params", self.__class__.sim_params.__doc__])
table.max_table_width = width
table._min_table_width = width
table.align["Docstring"] = "l"
return table
def sim_show_all(self):
"""Returns a summary about the active simulation and available methods."""
width = 150
print(self._get_table_active_simulation(width=width))
print(self._get_table_method_information(width=width))
table = PrettyTable()
table.title = "Simulation module for current device"
table.field_names = ["All available models"]
table.add_row([", ".join(self.get_all_sim_models())])
table.max_table_width = width
table._min_table_width = width
print(table)
class SimulatedPositioner(SimulatedDataBase):
"""Simulated data class for a positioner."""
def _init_default_additional_params(self) -> None:
"""No need to init additional parameters for Positioner."""
def get_model_cls(self, model: str) -> any:
"""For the simulated positioners, no simulation models are currently implemented."""
return None
def get_params_for_model_cls(self) -> dict:
"""For the simulated positioners, no simulation models are currently implemented."""
return {}
def get_all_sim_models(self) -> list[str]:
"""
For the simulated positioners, no simulation models are currently implemented.
Returns:
list: List of available simulation models.
"""
return []
def _get_additional_params(self) -> dict:
"""No need to add additional parameters for Positioner."""
return {}
def compute_sim_state(self, signal_name: str, compute_readback: bool) -> None:
"""
For the simulated positioners, a computed signal is currently not used.
The position is updated by the parent device, and readback/setpoint values
have a jitter/tolerance introduced directly in the parent class (SimPositioner).
"""
if compute_readback:
method = None
value = self.execute_simulation_method(method=method, signal_name=signal_name)
self.update_sim_state(signal_name, value)
class SimulatedDataMonitor(SimulatedDataBase):
"""Simulated data for a monitor."""
"""Simulated data class for a monitor."""
def init_paramaters(self, **kwargs):
"""Initialize the parameters for the simulated data
def __init__(self, *args, parent=None, device_manager=None, **kwargs) -> None:
self._model_lookup = self.init_lmfit_models()
super().__init__(*args, parent=parent, device_manager=device_manager, **kwargs)
self._init_default()
This method will fill self._all_params with the default parameters for
SimulationType.CONSTANT and SimulationType.GAUSSIAN.
New simulation types can be added by adding a new key to self._all_params,
together with the required parameters for that simulation type. Please
also complement the docstring of this method with the new simulation type.
def _get_additional_params(self) -> None:
params = DEFAULT_PARAMS_NOISE.copy()
params.update(DEFAULT_PARAMS_MOTOR.copy())
return params
For SimulationType.CONSTANT:
Amp is the amplitude of the constant value.
Noise is the type of noise to add to the signal. Available options are 'poisson', 'uniform' or 'none'.
Noise multiplier is the multiplier of the noise, only relevant for uniform noise.
def _init_default(self) -> None:
"""Initialize the default parameters for the simulated data."""
self.sim_select_model("ConstantModel")
For SimulationType.GAUSSIAN:
ref_motor is the motor that is used as reference to compute the gaussian.
amp is the amplitude of the gaussian.
cen is the center of the gaussian.
sig is the sigma of the gaussian.
noise is the type of noise to add to the signal. Available options are 'poisson', 'uniform' or 'none'.
noise multiplier is the multiplier of the noise, only relevant for uniform noise.
def get_model_cls(self, model: str) -> any:
"""Get the class for the active simulation model."""
if model not in self._model_lookup:
raise SimulatedDataException(f"Model {model} not found in {self._model_lookup.keys()}.")
return self._model_lookup[model]
def get_all_sim_models(self) -> list[str]:
"""
self._all_params = {
SimulationType.CONSTANT: {
"amp": 100,
"noise": NoiseType.POISSON,
"noise_multiplier": 0.1,
},
SimulationType.GAUSSIAN: {
"ref_motor": "samx",
"amp": 100,
"cen": 0,
"sig": 1,
"noise": NoiseType.NONE,
"noise_multiplier": 0.1,
},
}
# Update init parameters and set simulation type to Constant if not specified otherwise in init_sim_params
self._update_init_params(sim_type_default=SimulationType.CONSTANT)
Method to get all names from the available simulation models from the lmfit.models pool.
def _compute_sim_state(self, signal_name: str) -> None:
Returns:
list: List of available simulation models.
"""
return list(self._model_lookup.keys())
def get_params_for_model_cls(self) -> dict:
"""Get the parameters for the active simulation model.
Check if default parameters are available for lmfit parameters.
Args:
sim_model (str): Name of the simulation model.
Returns:
dict: {name: value} for the active simulation model.
"""
rtr = {}
params = self._model.make_params()
for name, parameter in params.items():
if name in DEFAULT_PARAMS_LMFIT:
rtr[name] = DEFAULT_PARAMS_LMFIT[name]
parameter.value = rtr[name]
else:
if not any([np.isnan(parameter.value), np.isinf(parameter.value)]):
rtr[name] = parameter.value
else:
rtr[name] = 1
parameter.value = 1
self._model_params = params
return rtr
def model_lookup(self):
"""Get available models from lmfit.models."""
return self._model_lookup
def init_lmfit_models(self) -> dict:
"""
Get available models from lmfit.models.
Exclude Gaussian2dModel, ExpressionModel, Model, SplineModel.
Returns:
dictionary of model name : model class pairs for available models from LMFit.
"""
model_lookup = {}
for name, model_cls in inspect.getmembers(models):
try:
is_model = issubclass(model_cls, Model)
except TypeError:
is_model = False
if is_model and name not in [
"Gaussian2dModel",
"ExpressionModel",
"Model",
"SplineModel",
]:
model_lookup[name] = model_cls
return model_lookup
def compute_sim_state(self, signal_name: str, compute_readback: bool) -> None:
"""Update the simulated state of the device.
It will update the value in self.sim_state with the value computed by
@ -217,153 +418,193 @@ class SimulatedDataMonitor(SimulatedDataBase):
Args:
signal_name (str): Name of the signal to update.
"""
if self.get_sim_type() == SimulationType.CONSTANT:
method = "_compute_constant"
# value = self._compute_constant()
elif self.get_sim_type() == SimulationType.GAUSSIAN:
method = "_compute_gaussian"
# value = self._compute_gaussian()
if compute_readback:
method = self._compute
value = self.execute_simulation_method(method=method, signal_name=signal_name)
self.update_sim_state(signal_name, value)
value = self.execute_simulation_method(method=getattr(self, method))
def _compute(self, *args, **kwargs) -> float:
mot_name = self.sim_params["ref_motor"]
if self.device_manager and mot_name in self.device_manager.devices:
motor_pos = self.device_manager.devices[mot_name].obj.read()[mot_name]["value"]
else:
motor_pos = 0
method = self._model
value = float(method.eval(params=self._model_params, x=motor_pos))
return self._add_noise(value, self.sim_params["noise"], self.sim_params["noise_multiplier"])
def _add_noise(self, v: float, noise: NoiseType, noise_multiplier: float) -> float:
"""
Add the currently activated noise to the simulated data.
If NoiseType.NONE is active, the value will be returned
Args:
v (float): Value to add noise to.
Returns:
float: Value with added noise.
"""
if noise == NoiseType.POISSON:
ceiled_v = np.ceil(v)
v = np.random.poisson(ceiled_v, 1)[0] if ceiled_v > 0 else ceiled_v
return v
elif noise == NoiseType.UNIFORM:
v += np.random.uniform(-1, 1) * noise_multiplier
return v
return v
class SimulatedDataCamera(SimulatedDataBase):
"""Simulated class to compute data for a 2D camera."""
def __init__(self, *args, parent=None, device_manager=None, **kwargs) -> None:
self._model_lookup = self.init_2D_models()
self._all_default_model_params = defaultdict(dict)
self._init_default_camera_params()
super().__init__(*args, parent=parent, device_manager=device_manager, **kwargs)
self._init_default()
def _init_default(self) -> None:
"""Initialize the default model for a simulated camera
Use the default model "Gaussian".
"""
self.sim_select_model(SimulationType2D.GAUSSIAN)
def init_2D_models(self) -> dict:
"""
Get the available models for 2D camera simulations.
"""
model_lookup = {}
for _, model_cls in inspect.getmembers(SimulationType2D):
if isinstance(model_cls, SimulationType2D):
model_lookup[model_cls.value] = model_cls
return model_lookup
def _get_additional_params(self) -> None:
params = DEFAULT_PARAMS_NOISE.copy()
params.update(DEFAULT_PARAMS_HOT_PIXEL.copy())
return params
def _init_default_camera_params(self) -> None:
"""Initiate additional params for the simulated camera."""
self._all_default_model_params.update(
{
self._model_lookup[
SimulationType2D.CONSTANT.value
]: DEFAULT_PARAMS_CAMERA_CONSTANT.copy()
}
)
self._all_default_model_params.update(
{
self._model_lookup[
SimulationType2D.GAUSSIAN.value
]: DEFAULT_PARAMS_CAMERA_GAUSSIAN.copy()
}
)
def get_model_cls(self, model: str) -> any:
"""For the simulated positioners, no simulation models are currently implemented."""
if model not in self._model_lookup:
raise SimulatedDataException(f"Model {model} not found in {self._model_lookup.keys()}.")
return self._model_lookup[model]
def get_params_for_model_cls(self) -> dict:
"""For the simulated positioners, no simulation models are currently implemented."""
return self._all_default_model_params[self._model.value]
def get_all_sim_models(self) -> list[str]:
"""
For the simulated positioners, no simulation models are currently implemented.
Returns:
list: List of available simulation models.
"""
return [entry.value for entry in self._model_lookup.values()]
def compute_sim_state(self, signal_name: str, compute_readback: bool) -> None:
"""Update the simulated state of the device.
It will update the value in self.sim_state with the value computed by
the chosen simulation type.
Args:
signal_name (str) : Name of the signal to update.
compute_readback (bool) : Flag whether to compute readback based on function hosted in SimulatedData
"""
if compute_readback:
if self._model == SimulationType2D.CONSTANT:
method = "_compute_constant"
elif self._model == SimulationType2D.GAUSSIAN:
method = "_compute_gaussian"
value = self.execute_simulation_method(
signal_name=signal_name, method=getattr(self, method)
)
else:
value = self._compute_empty_image()
self.update_sim_state(signal_name, value)
def _compute_constant(self) -> float:
"""Computes constant value and adds noise if activated."""
v = self._active_params["amp"]
if self._active_params["noise"] == NoiseType.POISSON:
v = np.random.poisson(np.round(v), 1)[0]
return v
elif self._active_params["noise"] == NoiseType.UNIFORM:
v += np.random.uniform(-1, 1) * self._active_params["noise_multiplier"]
return v
elif self._active_params["noise"] == NoiseType.NONE:
v = self._active_params["amp"]
return v
else:
def _compute_empty_image(self) -> np.ndarray:
"""Computes return value for sim_type = "empty_image".
Returns:
float: 0
"""
try:
shape = self.parent.image_shape.get()
return np.zeros(shape)
except SimulatedDataException as exc:
raise SimulatedDataException(
f"Unknown noise type {self._active_params['noise']}. Please choose from 'poisson',"
" 'uniform' or 'none'."
)
f"Could not compute empty image for {self.parent.name} with {exc} raised. Deactivate eiger to continue."
) from exc
def _compute_constant(self) -> np.ndarray:
"""Compute a return value for SimulationType2D constant."""
try:
shape = self.parent.image_shape.get()
v = self._model_params.get("amplitude") * np.ones(shape, dtype=np.uint16)
return self._add_noise(v, self.sim_params["noise"], self.sim_params["noise_multiplier"])
except SimulatedDataException as exc:
raise SimulatedDataException(
f"Could not compute constant for {self.parent.name} with {exc} raised. Deactivate eiger to continue."
) from exc
def _compute_gaussian(self) -> float:
"""Computes return value for sim_type = "gauss".
The value is based on the parameters for the gaussian in
self._active_params and the position of the ref_motor
and adds noise based on the noise type.
self._active_params and adds noise based on the noise type.
If computation fails, it returns 0.
Returns: float
"""
params = self._active_params
try:
motor_pos = self.device_manager.devices[params["ref_motor"]].obj.read()[
params["ref_motor"]
]["value"]
v = params["amp"] * np.exp(
-((motor_pos - params["cen"]) ** 2) / (2 * params["sig"] ** 2)
amp = self.sim_params.get("amplitude")
cov = self.sim_params.get("covariance")
cen_off = self.sim_params.get("center_offset")
shape = self.sim_state[self.parent.image_shape.name]["value"]
pos, offset, cov, amp = self._prepare_params_gauss(
amp=amp, cov=cov, offset=cen_off, shape=shape
)
v = self._compute_multivariate_gaussian(pos=pos, cen_off=offset, cov=cov, amp=amp)
v = self._add_noise(
v,
noise=self.sim_params["noise"],
noise_multiplier=self.sim_params["noise_multiplier"],
)
return self._add_hot_pixel(
v,
coords=self.sim_params["hot_pixel_coords"],
hot_pixel_types=self.sim_params["hot_pixel_types"],
values=self.sim_params["hot_pixel_values"],
)
if params["noise"] == NoiseType.POISSON:
v = np.random.poisson(np.round(v), 1)[0]
elif params["noise"] == NoiseType.UNIFORM:
v += np.random.uniform(-1, 1) * params["noise_multiplier"]
return v
except SimulatedDataException as exc:
raise SimulatedDataException(
f"Could not compute gaussian for {self.parent.name} with {exc} raised. Deactivate eiger to continue."
) from exc
class SimulatedDataCamera(SimulatedDataBase):
"""Simulated class to compute data for a 2D camera."""
def init_paramaters(self, **kwargs):
"""Initialize the parameters for the simulated data
This method will fill self._all_params with the default parameters for
SimulationType.CONSTANT and SimulationType.GAUSSIAN.
New simulation types can be added by adding a new key to self._all_params,
together with the required parameters for that simulation type. Please
also complement the docstring of this method with the new simulation type.
For SimulationType.CONSTANT:
Amp is the amplitude of the constant value.
Noise is the type of noise to add to the signal. Available options are 'poisson', 'uniform' or 'none'.
Noise multiplier is the multiplier of the noise, only relevant for uniform noise.
For SimulationType.GAUSSIAN:
amp is the amplitude of the gaussian.
cen_off is the pixel offset from the center of the gaussian from the center of the image.
It is passed as a numpy array.
cov is the 2D covariance matrix used to specify the shape of the gaussian.
It is a 2x2 matrix and will be passed as a numpy array.
noise is the type of noise to add to the signal. Available options are 'poisson', 'uniform' or 'none'.
noise multiplier is the multiplier of the noise, only relevant for uniform noise.
"""
self._all_params = {
SimulationType.CONSTANT: {
"amp": 100,
"noise": NoiseType.POISSON,
"noise_multiplier": 0.1,
"hot_pixel": {
"coords": np.array([[100, 100], [200, 200]]),
"type": [HotPixelType.CONSTANT, HotPixelType.FLUCTUATING],
"value": [1e6, 1e4],
},
},
SimulationType.GAUSSIAN: {
"amp": 100,
"cen_off": np.array([0, 0]),
"cov": np.array([[10, 5], [5, 10]]),
"noise": NoiseType.NONE,
"noise_multiplier": 0.1,
"hot_pixel": {
"coords": np.array([[240, 240], [50, 20], [40, 400]]),
"type": [
HotPixelType.FLUCTUATING,
HotPixelType.CONSTANT,
HotPixelType.FLUCTUATING,
],
"value": np.array([1e4, 1e6, 1e4]),
},
},
}
# Update init parameters and set simulation type to Gaussian if not specified otherwise in init_sim_params
self._update_init_params(sim_type_default=SimulationType.GAUSSIAN)
def _compute_sim_state(self, signal_name: str) -> None:
"""Update the simulated state of the device.
It will update the value in self.sim_state with the value computed by
the chosen simulation type.
Args:
signal_name (str): Name of the signal to update.
"""
if self.get_sim_type() == SimulationType.CONSTANT:
method = "_compute_constant"
# value = self._compute_constant()
elif self.get_sim_type() == SimulationType.GAUSSIAN:
method = "_compute_gaussian"
# value = self._compute_gaussian()
value = self.execute_simulation_method(method=getattr(self, method))
self.update_sim_state(signal_name, value)
def _compute_constant(self) -> float:
"""Compute a return value for sim_type = Constant."""
try:
shape = self.sim_state[self.parent.image_shape.name]["value"]
v = self._active_params["amp"] * np.ones(shape, dtype=np.uint16)
return self._add_noise(v, self._active_params["noise"])
except SimulatedDataException as exc:
raise SimulatedDataException(
f"Could not compute constant for {self.parent.name} with {exc} raised. Deactivate eiger to continue."
) from exc
def _compute_multivariate_gaussian(
self,
pos: np.ndarray | list,
@ -375,7 +616,7 @@ class SimulatedDataCamera(SimulatedDataBase):
Args:
pos (np.ndarray): Position of the gaussian.
cen_off (np.ndarray): Offset from cener of image for the gaussian.
cen_off (np.ndarray): Offset from center of image for the gaussian.
cov (np.ndarray): Covariance matrix of the gaussian.
Returns:
@ -398,11 +639,15 @@ class SimulatedDataCamera(SimulatedDataBase):
v *= amp / np.max(v)
return v
def _prepare_params_gauss(self, params: dict, shape: tuple) -> tuple:
def _prepare_params_gauss(
self, amp: float, cov: np.ndarray, offset: np.ndarray, shape: tuple
) -> tuple:
"""Prepare the positions for the gaussian.
Args:
params (dict): Parameters for the gaussian.
amp (float): Amplitude of the gaussian.
cov (np.ndarray): Covariance matrix of the gaussian.
offset (np.ndarray): Offset from the center of the image.
shape (tuple): Shape of the image.
Returns:
tuple: Positions, offset and covariance matrix for the gaussian.
@ -415,12 +660,9 @@ class SimulatedDataCamera(SimulatedDataBase):
pos[:, :, 0] = x
pos[:, :, 1] = y
offset = params["cen_off"]
cov = params["cov"]
amp = params["amp"]
return pos, offset, cov, amp
def _add_noise(self, v: np.ndarray, noise: NoiseType) -> np.ndarray:
def _add_noise(self, v: np.ndarray, noise: NoiseType, noise_multiplier: float) -> np.ndarray:
"""Add noise to the simulated data.
Args:
@ -431,51 +673,26 @@ class SimulatedDataCamera(SimulatedDataBase):
v = np.random.poisson(np.round(v), v.shape)
return v
if noise == NoiseType.UNIFORM:
multiplier = self._active_params["noise_multiplier"]
v += np.random.uniform(-multiplier, multiplier, v.shape)
v += np.random.uniform(-noise_multiplier, noise_multiplier, v.shape)
return v
if self._active_params["noise"] == NoiseType.NONE:
if noise == NoiseType.NONE:
return v
def _add_hot_pixel(self, v: np.ndarray, hot_pixel: dict) -> np.ndarray:
def _add_hot_pixel(
self, v: np.ndarray, coords: list, hot_pixel_types: list, values: list
) -> np.ndarray:
"""Add hot pixels to the simulated data.
Args:
v (np.ndarray): Simulated data.
hot_pixel (dict): Hot pixel parameters.
"""
for coords, hot_pixel_type, value in zip(
hot_pixel["coords"], hot_pixel["type"], hot_pixel["value"]
):
if coords[0] < v.shape[0] and coords[1] < v.shape[1]:
for coord, hot_pixel_type, value in zip(coords, hot_pixel_types, values):
if coord[0] < v.shape[0] and coord[1] < v.shape[1]:
if hot_pixel_type == HotPixelType.CONSTANT:
v[coords[0], coords[1]] = value
v[coord[0], coord[1]] = value
elif hot_pixel_type == HotPixelType.FLUCTUATING:
maximum = np.max(v) if np.max(v) != 0 else 1
if v[coords[0], coords[1]] / maximum > 0.5:
v[coords[0], coords[1]] = value
if v[coord[0], coord[1]] / maximum > 0.5:
v[coord[0], coord[1]] = value
return v
def _compute_gaussian(self) -> float:
"""Computes return value for sim_type = "gauss".
The value is based on the parameters for the gaussian in
self._active_params and adds noise based on the noise type.
If computation fails, it returns 0.
Returns: float
"""
try:
params = self._active_params
shape = self.sim_state[self.parent.image_shape.name]["value"]
pos, offset, cov, amp = self._prepare_params_gauss(self._active_params, shape)
v = self._compute_multivariate_gaussian(pos=pos, cen_off=offset, cov=cov, amp=amp)
v = self._add_noise(v, params["noise"])
return self._add_hot_pixel(v, params["hot_pixel"])
except SimulatedDataException as exc:
raise SimulatedDataException(
f"Could not compute gaussian for {self.parent.name} with {exc} raised. Deactivate eiger to continue."
) from exc

View File

@ -10,7 +10,7 @@ class DeviceProxy(BECDeviceBase):
"""DeviceProxy class inherits from BECDeviceBase."""
class SlitLookup(DeviceProxy):
class SlitProxy(DeviceProxy):
"""
Simulation framework to immidate the behaviour of slits.
@ -27,11 +27,12 @@ class SlitLookup(DeviceProxy):
slit_sim:
readoutPriority: on_request
deviceClass: SlitLookup
deviceClass: SlitProxy
deviceConfig:
eiger:
cen_off: [0, 0] # [x,y]
cov: [[1000, 500], [200, 1000]] # [[x,x],[y,y]]
signal_name: image
center_offset: [0, 0] # [x,y]
covariance: [[1000, 500], [200, 1000]] # [[x,x],[y,y]]
pixel_size: 0.01
ref_motors: [samx, samy]
slit_width: [1, 1]
@ -61,7 +62,9 @@ class SlitLookup(DeviceProxy):
print(self.__doc__)
def _update_device_config(self, config: dict) -> None:
"""Update the config from the device_config for the pinhole lookup table.
"""
BEC will call this method on every object upon initializing devices to pass over the deviceConfig
from the config file. It can be conveniently be used to hand over initial parameters to the device.
Args:
config (dict): Config dictionary.
@ -83,8 +86,8 @@ class SlitLookup(DeviceProxy):
"""Compile the lookup table for the simulated camera."""
for device_name in self.config.keys():
self._lookup[device_name] = {
# "obj": self,
"method": self._compute,
"signal_name": self.config[device_name]["signal_name"],
"args": (device_name,),
"kwargs": {},
}
@ -96,22 +99,28 @@ class SlitLookup(DeviceProxy):
Args:
device_name (str): Name of the device.
signal_name (str): Name of the signal.
Returns:
np.ndarray: Lookup table for the simulated camera.
"""
device_obj = self.device_manager.devices.get(device_name).obj
params = device_obj.sim._all_params.get("gauss")
params = device_obj.sim.sim_params
shape = device_obj.image_shape.get()
params.update(
{
"noise": NoiseType.POISSON,
"cov": np.array(self.config[device_name]["cov"]),
"cen_off": np.array(self.config[device_name]["cen_off"]),
"covariance": np.array(self.config[device_name]["covariance"]),
"center_offset": np.array(self.config[device_name]["center_offset"]),
}
)
amp = params.get("amplitude")
cov = params.get("covariance")
cen_off = params.get("center_offset")
pos, offset, cov, amp = device_obj.sim._prepare_params_gauss(params, shape)
pos, offset, cov, amp = device_obj.sim._prepare_params_gauss(
amp=amp, cov=cov, offset=cen_off, shape=shape
)
v = device_obj.sim._compute_multivariate_gaussian(pos=pos, cen_off=offset, cov=cov, amp=amp)
device_pos = self.config[device_name]["pixel_size"] * pos
valid_mask = self._create_mask(
@ -122,8 +131,15 @@ class SlitLookup(DeviceProxy):
)
valid_mask = self._blur_image(valid_mask, sigma=self._gaussian_blur_sigma)
v *= valid_mask
v = device_obj.sim._add_noise(v, params["noise"])
v = device_obj.sim._add_hot_pixel(v, params["hot_pixel"])
v = device_obj.sim._add_noise(
v, noise=params["noise"], noise_multiplier=params["noise_multiplier"]
)
v = device_obj.sim._add_hot_pixel(
v,
coords=params["hot_pixel_coords"],
hot_pixel_types=params["hot_pixel_types"],
values=params["hot_pixel_values"],
)
return v
def _blur_image(self, image: np.ndarray, sigma: float = 1) -> np.ndarray:
@ -159,6 +175,6 @@ class SlitLookup(DeviceProxy):
if __name__ == "__main__":
# Example usage
pinhole = SlitLookup(name="pinhole", device_manager=None)
pinhole = SlitProxy(name="pinhole", device_manager=None)
pinhole.describe()
print(pinhole)

View File

@ -1,28 +1,40 @@
import time as ttime
import time
import numpy as np
from bec_lib import bec_logger
import numpy as np
from ophyd import Signal, Kind
from ophyd.utils import ReadOnlyError
logger = bec_logger.logger
# Readout precision for Setable/Readonly/ComputedReadonly signals
# Readout precision for Setable/ReadOnlySignal signals
PRECISION = 3
class SetableSignal(Signal):
"""Setable signal for simulated devices.
It will return the value of the readback signal based on the position
created in the sim_state dictionary of the parent device.
The signal will store the value in sim_state of the SimulatedData class of the parent device.
It will also return the value from sim_state when get is called. Compared to the ReadOnlySignal,
this signal can be written to.
>>> signal = SetableSignal(name="signal", parent=parent, value=0)
Parameters
----------
name (string) : Name of the signal
parent (object) : Parent object of the signal, default none.
value (any) : Initial value of the signal, default 0.
kind (int) : Kind of the signal, default Kind.normal.
precision (float) : Precision of the signal, default PRECISION.
"""
def __init__(
self,
*args,
name: str,
value: any = None,
*args,
value: any = 0,
kind: int = Kind.normal,
precision: float = PRECISION,
**kwargs,
@ -34,7 +46,6 @@ class SetableSignal(Signal):
)
self._value = value
self.precision = precision
# Init the sim_state, if self.parent.sim available, use it, else use self.parent
self.sim = getattr(self.parent, "sim", self.parent)
self._update_sim_state(value)
@ -50,6 +61,7 @@ class SetableSignal(Signal):
"""Update the timestamp of the readback value."""
return self.sim.sim_state[self.name]["timestamp"]
# pylint: disable=arguments-differ
def get(self):
"""Get the current position of the simulated device.
@ -58,6 +70,7 @@ class SetableSignal(Signal):
self._value = self._get_value()
return self._value
# pylint: disable=arguments-differ
def put(self, value):
"""Put the value to the simulated device.
@ -83,111 +96,55 @@ class SetableSignal(Signal):
class ReadOnlySignal(Signal):
"""Readonly signal for simulated devices.
"""Computed readback signal for simulated devices.
If initiated without a value, it will set the initial value to 0.
The readback will be computed from a function hosted in the SimulatedData class from the parent device
if compute_readback is True. Else, it will return the value stored int sim.sim_state directly.
>>> signal = ComputedReadOnlySignal(name="signal", parent=parent, value=0, compute_readback=True)
Parameters
----------
name (string) : Name of the signal
parent (object) : Parent object of the signal, default none.
value (any) : Initial value of the signal, default 0.
kind (int) : Kind of the signal, default Kind.normal.
precision (float) : Precision of the signal, default PRECISION.
compute_readback (bool) : Flag whether to compute readback based on function hosted in SimulatedData
class. If False, sim_state value will be returned, if True, new value will be computed
"""
def __init__(
self,
*args,
name: str,
*args,
parent=None,
value: any = 0,
kind: int = Kind.normal,
precision: float = PRECISION,
compute_readback: bool = False,
**kwargs,
):
super().__init__(*args, name=name, value=value, kind=kind, **kwargs)
super().__init__(*args, name=name, parent=parent, value=value, kind=kind, **kwargs)
self._metadata.update(
connected=True,
write_access=False,
)
self.precision = precision
self._value = value
# Init the sim_state, if self.parent.sim available, use it, else use self.parent
self.precision = precision
self.compute_readback = compute_readback
self.sim = getattr(self.parent, "sim", None)
self._init_sim_state()
if self.sim:
self._init_sim_state()
def _init_sim_state(self) -> None:
"""Init the readback value and timestamp in sim_state"""
if self.sim:
self.sim.update_sim_state(self.name, self._value)
def _get_value(self) -> any:
"""Get the value of the readback from sim_state."""
if self.sim:
return self.sim.sim_state[self.name]["value"]
else:
return np.random.rand()
def _get_timestamp(self) -> any:
"""Get the timestamp of the readback from sim_state."""
if self.sim:
return self.sim.sim_state[self.name]["timestamp"]
else:
return ttime.time()
def get(self) -> any:
"""Get the current position of the simulated device.
Core function for signal.
"""
self._value = self._get_value()
return self._value
def put(self, value) -> None:
"""Put method, should raise ReadOnlyError since the signal is readonly."""
raise ReadOnlyError(f"The signal {self.name} is readonly.")
def describe(self):
"""Describe the readback signal.
Core function for signal.
"""
res = super().describe()
if self.precision is not None:
res[self.name]["precision"] = self.precision
return res
@property
def timestamp(self):
"""Timestamp of the readback value"""
return self._get_timestamp()
class ComputedReadOnlySignal(Signal):
"""Computed readback signal for simulated devices.
It will return the value computed from the sim_state of the signal.
This can be configured in parent.sim.
"""
def __init__(
self,
*args,
name: str,
value: any = None,
kind: int = Kind.normal,
precision: float = PRECISION,
**kwargs,
):
super().__init__(*args, name=name, value=value, kind=kind, **kwargs)
self._metadata.update(
connected=True,
write_access=False,
)
self._value = value
self.precision = precision
# Init the sim_state, if self.parent.sim available, use it, else use self.parent
self.sim = getattr(self.parent, "sim", self.parent)
self._update_sim_state()
"""Create the initial sim_state in the SimulatedData class of the parent device."""
self.sim.update_sim_state(self.name, self._value)
def _update_sim_state(self) -> None:
"""Update the readback value.
Call _compute_sim_state in parent device which updates the sim_state.
"""
self.sim._compute_sim_state(self.name)
"""Update the readback value."""
self.sim.compute_sim_state(signal_name=self.name, compute_readback=self.compute_readback)
def _get_value(self) -> any:
"""Update the timestamp of the readback value."""
@ -197,15 +154,16 @@ class ComputedReadOnlySignal(Signal):
"""Update the timestamp of the readback value."""
return self.sim.sim_state[self.name]["timestamp"]
# pylint: disable=arguments-differ
def get(self):
"""Get the current position of the simulated device.
Core function for signal.
"""
self._update_sim_state()
self._value = self._get_value()
return self._value
"""Get the current position of the simulated device."""
if self.sim:
self._update_sim_state()
self._value = self._get_value()
return self._value
return np.random.rand()
# pylint: disable=arguments-differ
def put(self, value) -> None:
"""Put method, should raise ReadOnlyError since the signal is readonly."""
raise ReadOnlyError(f"The signal {self.name} is readonly.")
@ -223,15 +181,6 @@ class ComputedReadOnlySignal(Signal):
@property
def timestamp(self):
"""Timestamp of the readback value"""
return self._get_timestamp()
if __name__ == "__main__":
from ophyd_devices.sim import SimPositioner
positioner = SimPositioner(name="positioner", parent=None)
print(positioner.velocity.get())
positioner.velocity.put(10)
print(positioner.velocity.get())
positioner.velocity.put(1)
print(positioner.velocity.get())
if self.sim:
return self._get_timestamp()
return time.time()

View File

@ -14,6 +14,7 @@ class DummyControllerDevice(Device):
class DummyController:
USER_ACCESS = [
"some_var",
"some_var_property",
"controller_show_all",
"_func_with_args",
"_func_with_args_and_kwargs",
@ -23,11 +24,19 @@ class DummyController:
some_var = 10
another_var = 20
def __init__(self) -> None:
self._some_var_property = None
self.connected = False
@property
def some_var_property(self):
return self._some_var_property
def on(self):
self._connected = True
self.connected = True
def off(self):
self._connected = False
self.connected = False
def _func_with_args(self, *args):
return args

50
ophyd_devices/sim/test.py Normal file
View File

@ -0,0 +1,50 @@
import lmfit
import inspect
class LmfitModelMixin:
# def __init__(self):
# self.model = lmfit.models.GaussianModel()
# self.params = self.model.make_params()
# self.params["center"].set(value=0)
# self.params["amplitude"].set(value=1)
# self.params["sigma"].set(value=1)
@staticmethod
def available_models() -> dict:
"""
Get available models from lmfit.models.
Exclude Gaussian2dModel, ExpressionModel, Model, SplineModel.
"""
avail_models = {}
for name, model_cls in inspect.getmembers(lmfit.models):
try:
is_model = issubclass(model_cls, lmfit.model.Model)
except TypeError:
is_model = False
if is_model and name not in [
"Gaussian2dModel",
"ExpressionModel",
"Model",
"SplineModel",
]:
avail_models[name] = model_cls
return avail_models
def create_properties(self):
"""
Create properties for model parameters.
"""
for name in self.available_models():
setattr(self, name, param)
@staticmethod
def get_model(model: str) -> lmfit.Model:
"""Get model for given string."""
if isinstance(model, str):
model = getattr(lmfit.models, model, None)
if not model:
raise ValueError(f"Model {model} not found.")
return model

View File

@ -1,8 +1,14 @@
from ophyd_devices.utils.bec_device_base import BECDeviceBase, BECDevice
from ophyd import Device, Signal
def test_BECDeviceBase():
# Test the BECDeviceBase class
test = BECDeviceBase(name="test")
assert isinstance(test, BECDevice)
assert test.connected is True
bec_device_base = BECDeviceBase(name="test")
assert isinstance(bec_device_base, BECDevice)
assert bec_device_base.connected is True
signal = Signal(name="signal")
assert isinstance(signal, BECDevice)
device = Device(name="device")
assert isinstance(device, BECDevice)