mirror of
https://github.com/bec-project/ophyd_devices.git
synced 2025-05-28 15:40:42 +02:00
refactor: add DeviceProxy class to sim_framework
refactor(__init__): remove bec_device_base from import refactor: cleanup __init__ refactor: cleanup refactor: cleanup, renaming and small fixes to sim_framework. refactor: cleanup imports refactor: cleanup
This commit is contained in:
parent
491e105af0
commit
01c8559319
@ -14,6 +14,7 @@ from .sim.sim import SimPositioner
|
||||
from .sim.sim import SimPositioner as SynAxisOPAAS
|
||||
from .sim.sim import SynDeviceOPAAS, SynFlyer
|
||||
from .sim.sim_signals import ReadOnlySignal
|
||||
from .sim.sim_frameworks import DeviceProxy, SlitLookup
|
||||
from .sim.sim_signals import ReadOnlySignal as SynSignalRO
|
||||
from .sls_devices.sls_devices import SLSInfo, SLSOperatorMessages
|
||||
from .smaract.smaract_ophyd import SmaractMotor
|
||||
|
@ -8,3 +8,5 @@ from .sim import (
|
||||
from .sim_xtreme import SynXtremeOtf
|
||||
|
||||
from .sim_signals import SetableSignal, ReadOnlySignal, ComputedReadOnlySignal
|
||||
|
||||
from .sim_frameworks import SlitLookup
|
||||
|
@ -1,3 +1,4 @@
|
||||
from collections import defaultdict
|
||||
import os
|
||||
import threading
|
||||
import time as ttime
|
||||
@ -68,11 +69,17 @@ class SimMonitor(Device):
|
||||
self.precision = precision
|
||||
self.init_sim_params = sim_init
|
||||
self.sim = self.sim_cls(parent=self, device_manager=device_manager, **kwargs)
|
||||
self._lookup_table = []
|
||||
|
||||
super().__init__(name=name, parent=parent, kind=kind, **kwargs)
|
||||
self.sim.sim_state[self.name] = self.sim.sim_state.pop(self.readback.name, None)
|
||||
self.readback.name = self.name
|
||||
|
||||
@property
|
||||
def lookup_table(self) -> None:
|
||||
"""lookup_table property"""
|
||||
return self._lookup_table
|
||||
|
||||
|
||||
class SimCamera(Device):
|
||||
"""A simulated device mimic any 2D camera.
|
||||
@ -127,6 +134,7 @@ class SimCamera(Device):
|
||||
):
|
||||
self.device_manager = device_manager
|
||||
self.init_sim_params = sim_init
|
||||
self._lookup_table = []
|
||||
self.sim = self.sim_cls(parent=self, device_manager=device_manager, **kwargs)
|
||||
|
||||
super().__init__(name=name, parent=parent, kind=kind, **kwargs)
|
||||
@ -135,6 +143,11 @@ class SimCamera(Device):
|
||||
self.scaninfo = None
|
||||
self._update_scaninfo()
|
||||
|
||||
@property
|
||||
def lookup_table(self) -> None:
|
||||
"""lookup_table property"""
|
||||
return self._lookup_table
|
||||
|
||||
def trigger(self) -> DeviceStatus:
|
||||
"""Trigger the camera to acquire images.
|
||||
|
||||
@ -277,6 +290,7 @@ class SimPositioner(Device, PositionerBase):
|
||||
self.precision = precision
|
||||
self.tolerance = tolerance
|
||||
self.init_sim_params = sim
|
||||
self._lookup_table = []
|
||||
|
||||
self.speed = speed
|
||||
self.update_frequency = update_frequency
|
||||
@ -286,7 +300,7 @@ class SimPositioner(Device, PositionerBase):
|
||||
# initialize inner dictionary with simulated state
|
||||
self.sim = self.sim_cls(parent=self, **kwargs)
|
||||
|
||||
super().__init__(name=name, labels=labels, kind=kind, **kwargs)
|
||||
super().__init__(name=name, labels=labels, parent=parent, kind=kind, **kwargs)
|
||||
# Rename self.readback.name to self.name, also in self.sim_state
|
||||
self.sim.sim_state[self.name] = self.sim.sim_state.pop(self.readback.name, None)
|
||||
self.readback.name = self.name
|
||||
@ -311,6 +325,11 @@ class SimPositioner(Device, PositionerBase):
|
||||
"""Return the high limit of the simulated device."""
|
||||
return self.limits[1]
|
||||
|
||||
@property
|
||||
def lookup_table(self) -> None:
|
||||
"""lookup_table property"""
|
||||
return self._lookup_table
|
||||
|
||||
def check_value(self, value: any):
|
||||
"""
|
||||
Check that requested position is within existing limits.
|
||||
@ -519,4 +538,4 @@ class SynDeviceSubOPAAS(Device):
|
||||
class SynDeviceOPAAS(Device):
|
||||
x = Cpt(SimPositioner, name="x")
|
||||
y = Cpt(SimPositioner, name="y")
|
||||
z = Cpt(SimPositioner, name="z")
|
||||
z = Cpt(SynDeviceSubOPAAS, name="z")
|
||||
|
@ -3,7 +3,6 @@ from __future__ import annotations
|
||||
from collections import defaultdict
|
||||
|
||||
import enum
|
||||
import inspect
|
||||
import time as ttime
|
||||
import numpy as np
|
||||
|
||||
@ -52,37 +51,22 @@ class SimulatedDataBase:
|
||||
self._all_params = defaultdict(dict)
|
||||
self.device_manager = device_manager
|
||||
self._simulation_type = None
|
||||
self.lookup_table = getattr(self.parent, "lookup_table", None)
|
||||
self.lookup_table = getattr(self.parent, "lookup_table", [])
|
||||
self.init_paramaters(**kwargs)
|
||||
self._active_params = self._all_params.get(self._simulation_type, None)
|
||||
# self.register_in_lookup_table()
|
||||
|
||||
# self.lookup_table = self.update_lookup_table()
|
||||
|
||||
# def update_lookup_table(self) -> None:
|
||||
# """Update the lookup table with the new value for the signal."""
|
||||
# table = getattr(self.device_manager.lookup_table, self.parent.name, None)
|
||||
|
||||
# return getattr(self.device_manager.lookup_table, self.parent.name, None)
|
||||
|
||||
# def register_in_lookup_table(self) -> None:
|
||||
# """Register the simulated device in the lookup table."""
|
||||
# self.device_manager.lookup_table[self.parent.name] = {"obj": self, "method": "_compute_sim_state", "args": (), "kwargs": {}}
|
||||
|
||||
def execute_simulation_method(self, *args, method=None, **kwargs) -> any:
|
||||
"""Execute the method from the lookup table."""
|
||||
|
||||
if self.lookup_table and self.parent.name in self.lookup_table:
|
||||
# obj = self.parent.lookup_table[self.parent.name]["obj"]
|
||||
method = self.lookup_table[self.parent.name]["method"]
|
||||
args = self.lookup_table[self.parent.name]["args"]
|
||||
kwargs = self.lookup_table[self.parent.name]["kwargs"]
|
||||
# Do I need args and kwargs! Why!!
|
||||
if self.lookup_table and self.device_manager.devices.get(self.lookup_table[0]) is not None:
|
||||
sim_device = self.device_manager.devices.get(self.lookup_table[0])
|
||||
# pylint: disable=protected-access
|
||||
if sim_device.enabled is True:
|
||||
method = sim_device.obj.lookup[self.parent.name]["method"]
|
||||
args = sim_device.obj.lookup[self.parent.name]["args"]
|
||||
kwargs = sim_device.obj.lookup[self.parent.name]["kwargs"]
|
||||
|
||||
if method is not None:
|
||||
method_arguments = list(inspect.signature(method).parameters.keys())
|
||||
if all([True for arg in method_arguments if arg in args or arg in kwargs]):
|
||||
return method(*args, **kwargs)
|
||||
return method(*args, **kwargs)
|
||||
raise SimulatedDataException(f"Method {method} is not available for {self.parent.name}")
|
||||
|
||||
def init_paramaters(self, **kwargs):
|
||||
@ -330,7 +314,7 @@ class SimulatedDataCamera(SimulatedDataBase):
|
||||
},
|
||||
},
|
||||
SimulationType.GAUSSIAN: {
|
||||
"amp": 500,
|
||||
"amp": 100,
|
||||
"cen_off": np.array([0, 0]),
|
||||
"cov": np.array([[10, 5], [5, 10]]),
|
||||
"noise": NoiseType.NONE,
|
||||
|
@ -1,64 +1,95 @@
|
||||
import numpy as np
|
||||
|
||||
from scipy.ndimage import gaussian_filter
|
||||
|
||||
from collections import defaultdict
|
||||
from ophyd_devices.sim.sim_data import NoiseType
|
||||
from ophyd_devices.utils.bec_device_base import BECDeviceBase
|
||||
|
||||
|
||||
class PinholeLookup:
|
||||
"""Pinhole lookup table for simulated devices.
|
||||
class DeviceProxy(BECDeviceBase):
|
||||
"""DeviceProxy class inherits from BECDeviceBase."""
|
||||
|
||||
When activated, it will create a lookup table for a simulated camera based on the config.
|
||||
The lookup table will be used to simulate the effect of a pinhole on the camera image.
|
||||
An example config is shown below, for the dev.eiger, with dev.samx and dev.samy as reference motors.
|
||||
|
||||
eiger:
|
||||
cen_off: [0, 0] # [x,y]
|
||||
cov: [[1000, 500], [200, 1000]] # [[x,x],[y,y]]
|
||||
pixel_size: 0.01
|
||||
signal: image
|
||||
ref_motors: [samx, samy]
|
||||
slit_width: [1, 2]
|
||||
motor_dir: [0, 1] # x:0 , y:1, z:2 coordinates
|
||||
class SlitLookup(DeviceProxy):
|
||||
"""
|
||||
Simulation framework to immidate the behaviour of slits.
|
||||
|
||||
This device is a proxy that is meant to overrides the behaviour of a SimCamera.
|
||||
You may use this to simulate the effect of slits on the camera image.
|
||||
|
||||
Parameters can be configured via the deviceConfig field in the device_config.
|
||||
The example below shows the configuration for a pinhole simulation on an Eiger detector,
|
||||
where the pinhole is defined by the position of motors samx and samy. These devices must
|
||||
exist in your config.
|
||||
|
||||
To update for instance the pixel_size directly, you can directly access the DeviceConfig via
|
||||
`dev.eiger.get_device_config()` or update it `dev.eiger.get_device_config({'eiger' : {'pixel_size': 0.1}})`
|
||||
|
||||
slit_sim:
|
||||
readoutPriority: on_request
|
||||
deviceClass: SlitLookup
|
||||
deviceConfig:
|
||||
eiger:
|
||||
cen_off: [0, 0] # [x,y]
|
||||
cov: [[1000, 500], [200, 1000]] # [[x,x],[y,y]]
|
||||
pixel_size: 0.01
|
||||
ref_motors: [samx, samy]
|
||||
slit_width: [1, 1]
|
||||
motor_dir: [0, 1] # x:0 , y:1, z:2 coordinates
|
||||
enabled: true
|
||||
readOnly: false
|
||||
"""
|
||||
|
||||
USER_ACCESS = ["enabled", "lookup", "help"]
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
*args,
|
||||
name,
|
||||
*args,
|
||||
device_manager=None,
|
||||
config: dict = None,
|
||||
**kwargs,
|
||||
):
|
||||
self.name = name
|
||||
self.device_manager = device_manager
|
||||
self.config = config
|
||||
self._enabled = True
|
||||
self.config = None
|
||||
self._lookup = defaultdict(dict)
|
||||
self._gaussian_blur_sigma = 8
|
||||
self._gaussian_blur_sigma = 5
|
||||
super().__init__(name, *args, **kwargs)
|
||||
|
||||
def help(self) -> None:
|
||||
"""Print documentation for the SlitLookup device."""
|
||||
print(self.__doc__)
|
||||
|
||||
def _update_device_config(self, config: dict) -> None:
|
||||
"""Update the config from the device_config for the pinhole lookup table.
|
||||
|
||||
Args:
|
||||
config (dict): Config dictionary.
|
||||
"""
|
||||
self.config = config
|
||||
self._compile_lookup()
|
||||
|
||||
@property
|
||||
def lookup(self):
|
||||
"""lookup property"""
|
||||
return (
|
||||
self._lookup
|
||||
if getattr(self.device_manager.devices, self.name).enabled is True
|
||||
else None
|
||||
)
|
||||
return self._lookup
|
||||
|
||||
@lookup.setter
|
||||
def lookup(self, update: dict) -> None:
|
||||
"""lookup setter"""
|
||||
self._lookup.update(update)
|
||||
|
||||
def _compile_lookup(self):
|
||||
"""Compile the lookup table for the simulated camera."""
|
||||
for device_name in self.config.keys():
|
||||
self.lookup[device_name] = {
|
||||
"obj": self,
|
||||
self._lookup[device_name] = {
|
||||
# "obj": self,
|
||||
"method": self._compute,
|
||||
"args": {},
|
||||
"kwargs": {"device_name": device_name},
|
||||
"args": (device_name,),
|
||||
"kwargs": {},
|
||||
}
|
||||
|
||||
def _compute(self, *args, device_name: str = None, **kwargs) -> np.ndarray:
|
||||
def _compute(self, device_name: str, *args, **kwargs) -> np.ndarray:
|
||||
"""
|
||||
Compute the lookup table for the simulated camera.
|
||||
It copies the sim_camera bevahiour and adds a mask to simulate the effect of a pinhole.
|
||||
@ -69,7 +100,7 @@ class PinholeLookup:
|
||||
Returns:
|
||||
np.ndarray: Lookup table for the simulated camera.
|
||||
"""
|
||||
device_obj = self.device_manager.devices.get(device_name)
|
||||
device_obj = self.device_manager.devices.get(device_name).obj
|
||||
params = device_obj.sim._all_params.get("gauss")
|
||||
shape = device_obj.image_shape.get()
|
||||
params.update(
|
||||
@ -87,7 +118,7 @@ class PinholeLookup:
|
||||
device_pos=device_pos,
|
||||
ref_motors=self.config[device_name]["ref_motors"],
|
||||
width=self.config[device_name]["slit_width"],
|
||||
dir=self.config[device_name]["motor_dir"],
|
||||
direction=self.config[device_name]["motor_dir"],
|
||||
)
|
||||
valid_mask = self._blur_image(valid_mask, sigma=self._gaussian_blur_sigma)
|
||||
v *= valid_mask
|
||||
@ -95,7 +126,7 @@ class PinholeLookup:
|
||||
v = device_obj.sim._add_hot_pixel(v, params["hot_pixel"])
|
||||
return v
|
||||
|
||||
def _blur_image(self, image: np.ndarray, sigma: float = 5) -> np.ndarray:
|
||||
def _blur_image(self, image: np.ndarray, sigma: float = 1) -> np.ndarray:
|
||||
"""Blur the image with a gaussian filter.
|
||||
|
||||
Args:
|
||||
@ -108,14 +139,26 @@ class PinholeLookup:
|
||||
return gaussian_filter(image, sigma=sigma)
|
||||
|
||||
def _create_mask(
|
||||
self, device_pos: np.ndarray, ref_motors: list[str], width: list[float], dir: list[int]
|
||||
self,
|
||||
device_pos: np.ndarray,
|
||||
ref_motors: list[str],
|
||||
width: list[float],
|
||||
direction: list[int],
|
||||
):
|
||||
mask = np.ones_like(device_pos, dtype=bool)
|
||||
mask = np.ones_like(device_pos)
|
||||
for ii, motor_name in enumerate(ref_motors):
|
||||
motor_pos = self.device_manager.devices.get(motor_name).read()[motor_name]["value"]
|
||||
motor_pos = self.device_manager.devices.get(motor_name).obj.read()[motor_name]["value"]
|
||||
edges = [motor_pos + width[ii] / 2, motor_pos - width[ii] / 2]
|
||||
mask[..., dir[ii]] = np.logical_and(
|
||||
device_pos[..., dir[ii]] > np.min(edges), device_pos[..., dir[ii]] < np.max(edges)
|
||||
mask[..., direction[ii]] = np.logical_and(
|
||||
device_pos[..., direction[ii]] > np.min(edges),
|
||||
device_pos[..., direction[ii]] < np.max(edges),
|
||||
)
|
||||
|
||||
return np.prod(mask, axis=2)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
# Example usage
|
||||
pinhole = SlitLookup(name="pinhole", device_manager=None)
|
||||
pinhole.describe()
|
||||
print(pinhole)
|
||||
|
Loading…
x
Reference in New Issue
Block a user