diff --git a/ophyd_devices/__init__.py b/ophyd_devices/__init__.py index 1f88e9f..eccecdf 100644 --- a/ophyd_devices/__init__.py +++ b/ophyd_devices/__init__.py @@ -14,6 +14,7 @@ from .sim.sim import SimPositioner from .sim.sim import SimPositioner as SynAxisOPAAS from .sim.sim import SynDeviceOPAAS, SynFlyer from .sim.sim_signals import ReadOnlySignal +from .sim.sim_frameworks import DeviceProxy, SlitLookup from .sim.sim_signals import ReadOnlySignal as SynSignalRO from .sls_devices.sls_devices import SLSInfo, SLSOperatorMessages from .smaract.smaract_ophyd import SmaractMotor diff --git a/ophyd_devices/sim/__init__.py b/ophyd_devices/sim/__init__.py index f185011..2cd6e77 100644 --- a/ophyd_devices/sim/__init__.py +++ b/ophyd_devices/sim/__init__.py @@ -8,3 +8,5 @@ from .sim import ( from .sim_xtreme import SynXtremeOtf from .sim_signals import SetableSignal, ReadOnlySignal, ComputedReadOnlySignal + +from .sim_frameworks import SlitLookup diff --git a/ophyd_devices/sim/sim.py b/ophyd_devices/sim/sim.py index 58be827..d390592 100644 --- a/ophyd_devices/sim/sim.py +++ b/ophyd_devices/sim/sim.py @@ -1,3 +1,4 @@ +from collections import defaultdict import os import threading import time as ttime @@ -68,11 +69,17 @@ class SimMonitor(Device): self.precision = precision self.init_sim_params = sim_init self.sim = self.sim_cls(parent=self, device_manager=device_manager, **kwargs) + self._lookup_table = [] super().__init__(name=name, parent=parent, kind=kind, **kwargs) self.sim.sim_state[self.name] = self.sim.sim_state.pop(self.readback.name, None) self.readback.name = self.name + @property + def lookup_table(self) -> None: + """lookup_table property""" + return self._lookup_table + class SimCamera(Device): """A simulated device mimic any 2D camera. @@ -127,6 +134,7 @@ class SimCamera(Device): ): self.device_manager = device_manager self.init_sim_params = sim_init + self._lookup_table = [] self.sim = self.sim_cls(parent=self, device_manager=device_manager, **kwargs) super().__init__(name=name, parent=parent, kind=kind, **kwargs) @@ -135,6 +143,11 @@ class SimCamera(Device): self.scaninfo = None self._update_scaninfo() + @property + def lookup_table(self) -> None: + """lookup_table property""" + return self._lookup_table + def trigger(self) -> DeviceStatus: """Trigger the camera to acquire images. @@ -277,6 +290,7 @@ class SimPositioner(Device, PositionerBase): self.precision = precision self.tolerance = tolerance self.init_sim_params = sim + self._lookup_table = [] self.speed = speed self.update_frequency = update_frequency @@ -286,7 +300,7 @@ class SimPositioner(Device, PositionerBase): # initialize inner dictionary with simulated state self.sim = self.sim_cls(parent=self, **kwargs) - super().__init__(name=name, labels=labels, kind=kind, **kwargs) + super().__init__(name=name, labels=labels, parent=parent, kind=kind, **kwargs) # Rename self.readback.name to self.name, also in self.sim_state self.sim.sim_state[self.name] = self.sim.sim_state.pop(self.readback.name, None) self.readback.name = self.name @@ -311,6 +325,11 @@ class SimPositioner(Device, PositionerBase): """Return the high limit of the simulated device.""" return self.limits[1] + @property + def lookup_table(self) -> None: + """lookup_table property""" + return self._lookup_table + def check_value(self, value: any): """ Check that requested position is within existing limits. @@ -519,4 +538,4 @@ class SynDeviceSubOPAAS(Device): class SynDeviceOPAAS(Device): x = Cpt(SimPositioner, name="x") y = Cpt(SimPositioner, name="y") - z = Cpt(SimPositioner, name="z") + z = Cpt(SynDeviceSubOPAAS, name="z") diff --git a/ophyd_devices/sim/sim_data.py b/ophyd_devices/sim/sim_data.py index eab7618..3179503 100644 --- a/ophyd_devices/sim/sim_data.py +++ b/ophyd_devices/sim/sim_data.py @@ -3,7 +3,6 @@ from __future__ import annotations from collections import defaultdict import enum -import inspect import time as ttime import numpy as np @@ -52,37 +51,22 @@ class SimulatedDataBase: self._all_params = defaultdict(dict) self.device_manager = device_manager self._simulation_type = None - self.lookup_table = getattr(self.parent, "lookup_table", None) + self.lookup_table = getattr(self.parent, "lookup_table", []) self.init_paramaters(**kwargs) self._active_params = self._all_params.get(self._simulation_type, None) - # self.register_in_lookup_table() - - # self.lookup_table = self.update_lookup_table() - - # def update_lookup_table(self) -> None: - # """Update the lookup table with the new value for the signal.""" - # table = getattr(self.device_manager.lookup_table, self.parent.name, None) - - # return getattr(self.device_manager.lookup_table, self.parent.name, None) - - # def register_in_lookup_table(self) -> None: - # """Register the simulated device in the lookup table.""" - # self.device_manager.lookup_table[self.parent.name] = {"obj": self, "method": "_compute_sim_state", "args": (), "kwargs": {}} def execute_simulation_method(self, *args, method=None, **kwargs) -> any: """Execute the method from the lookup table.""" - - if self.lookup_table and self.parent.name in self.lookup_table: - # obj = self.parent.lookup_table[self.parent.name]["obj"] - method = self.lookup_table[self.parent.name]["method"] - args = self.lookup_table[self.parent.name]["args"] - kwargs = self.lookup_table[self.parent.name]["kwargs"] - # Do I need args and kwargs! Why!! + if self.lookup_table and self.device_manager.devices.get(self.lookup_table[0]) is not None: + sim_device = self.device_manager.devices.get(self.lookup_table[0]) + # pylint: disable=protected-access + if sim_device.enabled is True: + method = sim_device.obj.lookup[self.parent.name]["method"] + args = sim_device.obj.lookup[self.parent.name]["args"] + kwargs = sim_device.obj.lookup[self.parent.name]["kwargs"] if method is not None: - method_arguments = list(inspect.signature(method).parameters.keys()) - if all([True for arg in method_arguments if arg in args or arg in kwargs]): - return method(*args, **kwargs) + return method(*args, **kwargs) raise SimulatedDataException(f"Method {method} is not available for {self.parent.name}") def init_paramaters(self, **kwargs): @@ -330,7 +314,7 @@ class SimulatedDataCamera(SimulatedDataBase): }, }, SimulationType.GAUSSIAN: { - "amp": 500, + "amp": 100, "cen_off": np.array([0, 0]), "cov": np.array([[10, 5], [5, 10]]), "noise": NoiseType.NONE, diff --git a/ophyd_devices/sim/sim_frameworks.py b/ophyd_devices/sim/sim_frameworks.py index ad9f4b6..4c6f784 100644 --- a/ophyd_devices/sim/sim_frameworks.py +++ b/ophyd_devices/sim/sim_frameworks.py @@ -1,64 +1,95 @@ import numpy as np - from scipy.ndimage import gaussian_filter from collections import defaultdict from ophyd_devices.sim.sim_data import NoiseType +from ophyd_devices.utils.bec_device_base import BECDeviceBase -class PinholeLookup: - """Pinhole lookup table for simulated devices. +class DeviceProxy(BECDeviceBase): + """DeviceProxy class inherits from BECDeviceBase.""" - When activated, it will create a lookup table for a simulated camera based on the config. - The lookup table will be used to simulate the effect of a pinhole on the camera image. - An example config is shown below, for the dev.eiger, with dev.samx and dev.samy as reference motors. - eiger: - cen_off: [0, 0] # [x,y] - cov: [[1000, 500], [200, 1000]] # [[x,x],[y,y]] - pixel_size: 0.01 - signal: image - ref_motors: [samx, samy] - slit_width: [1, 2] - motor_dir: [0, 1] # x:0 , y:1, z:2 coordinates +class SlitLookup(DeviceProxy): """ + Simulation framework to immidate the behaviour of slits. + + This device is a proxy that is meant to overrides the behaviour of a SimCamera. + You may use this to simulate the effect of slits on the camera image. + + Parameters can be configured via the deviceConfig field in the device_config. + The example below shows the configuration for a pinhole simulation on an Eiger detector, + where the pinhole is defined by the position of motors samx and samy. These devices must + exist in your config. + + To update for instance the pixel_size directly, you can directly access the DeviceConfig via + `dev.eiger.get_device_config()` or update it `dev.eiger.get_device_config({'eiger' : {'pixel_size': 0.1}})` + + slit_sim: + readoutPriority: on_request + deviceClass: SlitLookup + deviceConfig: + eiger: + cen_off: [0, 0] # [x,y] + cov: [[1000, 500], [200, 1000]] # [[x,x],[y,y]] + pixel_size: 0.01 + ref_motors: [samx, samy] + slit_width: [1, 1] + motor_dir: [0, 1] # x:0 , y:1, z:2 coordinates + enabled: true + readOnly: false + """ + + USER_ACCESS = ["enabled", "lookup", "help"] def __init__( self, - *args, name, + *args, device_manager=None, - config: dict = None, **kwargs, ): self.name = name self.device_manager = device_manager - self.config = config - self._enabled = True + self.config = None self._lookup = defaultdict(dict) - self._gaussian_blur_sigma = 8 + self._gaussian_blur_sigma = 5 + super().__init__(name, *args, **kwargs) + + def help(self) -> None: + """Print documentation for the SlitLookup device.""" + print(self.__doc__) + + def _update_device_config(self, config: dict) -> None: + """Update the config from the device_config for the pinhole lookup table. + + Args: + config (dict): Config dictionary. + """ + self.config = config self._compile_lookup() @property def lookup(self): """lookup property""" - return ( - self._lookup - if getattr(self.device_manager.devices, self.name).enabled is True - else None - ) + return self._lookup + + @lookup.setter + def lookup(self, update: dict) -> None: + """lookup setter""" + self._lookup.update(update) def _compile_lookup(self): """Compile the lookup table for the simulated camera.""" for device_name in self.config.keys(): - self.lookup[device_name] = { - "obj": self, + self._lookup[device_name] = { + # "obj": self, "method": self._compute, - "args": {}, - "kwargs": {"device_name": device_name}, + "args": (device_name,), + "kwargs": {}, } - def _compute(self, *args, device_name: str = None, **kwargs) -> np.ndarray: + def _compute(self, device_name: str, *args, **kwargs) -> np.ndarray: """ Compute the lookup table for the simulated camera. It copies the sim_camera bevahiour and adds a mask to simulate the effect of a pinhole. @@ -69,7 +100,7 @@ class PinholeLookup: Returns: np.ndarray: Lookup table for the simulated camera. """ - device_obj = self.device_manager.devices.get(device_name) + device_obj = self.device_manager.devices.get(device_name).obj params = device_obj.sim._all_params.get("gauss") shape = device_obj.image_shape.get() params.update( @@ -87,7 +118,7 @@ class PinholeLookup: device_pos=device_pos, ref_motors=self.config[device_name]["ref_motors"], width=self.config[device_name]["slit_width"], - dir=self.config[device_name]["motor_dir"], + direction=self.config[device_name]["motor_dir"], ) valid_mask = self._blur_image(valid_mask, sigma=self._gaussian_blur_sigma) v *= valid_mask @@ -95,7 +126,7 @@ class PinholeLookup: v = device_obj.sim._add_hot_pixel(v, params["hot_pixel"]) return v - def _blur_image(self, image: np.ndarray, sigma: float = 5) -> np.ndarray: + def _blur_image(self, image: np.ndarray, sigma: float = 1) -> np.ndarray: """Blur the image with a gaussian filter. Args: @@ -108,14 +139,26 @@ class PinholeLookup: return gaussian_filter(image, sigma=sigma) def _create_mask( - self, device_pos: np.ndarray, ref_motors: list[str], width: list[float], dir: list[int] + self, + device_pos: np.ndarray, + ref_motors: list[str], + width: list[float], + direction: list[int], ): - mask = np.ones_like(device_pos, dtype=bool) + mask = np.ones_like(device_pos) for ii, motor_name in enumerate(ref_motors): - motor_pos = self.device_manager.devices.get(motor_name).read()[motor_name]["value"] + motor_pos = self.device_manager.devices.get(motor_name).obj.read()[motor_name]["value"] edges = [motor_pos + width[ii] / 2, motor_pos - width[ii] / 2] - mask[..., dir[ii]] = np.logical_and( - device_pos[..., dir[ii]] > np.min(edges), device_pos[..., dir[ii]] < np.max(edges) + mask[..., direction[ii]] = np.logical_and( + device_pos[..., direction[ii]] > np.min(edges), + device_pos[..., direction[ii]] < np.max(edges), ) return np.prod(mask, axis=2) + + +if __name__ == "__main__": + # Example usage + pinhole = SlitLookup(name="pinhole", device_manager=None) + pinhole.describe() + print(pinhole)