From 5496b59ae2254495845a0fae2754cdd935b4fb7b Mon Sep 17 00:00:00 2001 From: appel_c Date: Fri, 1 Mar 2024 13:03:37 +0100 Subject: [PATCH] feat: add proxy for h5 image replay for SimCamera --- ophyd_devices/__init__.py | 2 +- ophyd_devices/sim/sim_frameworks.py | 276 +++++++++++++++++++++++----- ophyd_devices/sim/sim_signals.py | 148 ++++++++++++++- 3 files changed, 379 insertions(+), 47 deletions(-) diff --git a/ophyd_devices/__init__.py b/ophyd_devices/__init__.py index 9b3073f..8205c20 100644 --- a/ophyd_devices/__init__.py +++ b/ophyd_devices/__init__.py @@ -16,7 +16,7 @@ from .sim.sim import SimPositioner from .sim.sim import SimPositioner as SynAxisOPAAS from .sim.sim import SynDeviceOPAAS from .sim.sim_signals import ReadOnlySignal -from .sim.sim_frameworks import DeviceProxy, SlitProxy +from .sim.sim_frameworks import DeviceProxy, SlitProxy, H5ImageReplayProxy from .sim.sim_signals import ReadOnlySignal as SynSignalRO from .sls_devices.sls_devices import SLSInfo, SLSOperatorMessages from .smaract.smaract_ophyd import SmaractMotor diff --git a/ophyd_devices/sim/sim_frameworks.py b/ophyd_devices/sim/sim_frameworks.py index fe13cfb..45eeb08 100644 --- a/ophyd_devices/sim/sim_frameworks.py +++ b/ophyd_devices/sim/sim_frameworks.py @@ -1,13 +1,82 @@ import numpy as np from scipy.ndimage import gaussian_filter +from abc import ABC, abstractmethod + +import h5py +import hdf5plugin + +from ophyd import Staged, Kind from collections import defaultdict from ophyd_devices.sim.sim_data import NoiseType +from ophyd_devices.sim.sim_signals import CustomSetableSignal from ophyd_devices.utils.bec_device_base import BECDeviceBase -class DeviceProxy(BECDeviceBase): - """DeviceProxy class inherits from BECDeviceBase.""" +class DeviceProxy(BECDeviceBase, ABC): + """DeviceProxy class inherits from BECDeviceBase. + + It is an abstract class that is meant to be used as a base class for all device proxies. + The minimum requirement for a device proxy is to implement the _compute method. + """ + + def __init__( + self, + name, + *args, + device_manager=None, + **kwargs, + ): + self.name = name + self.device_manager = device_manager + self.config = None + self._lookup = defaultdict(dict) + super().__init__(name, *args, device_manager=device_manager, **kwargs) + self._signals = dict() + + @property + def lookup(self): + """lookup property""" + return self._lookup + + @lookup.setter + def lookup(self, update: dict) -> None: + """lookup setter""" + self._lookup.update(update) + + def _update_device_config(self, config: dict) -> None: + """ + BEC will call this method on every object upon initializing devices to pass over the deviceConfig + from the config file. It can be conveniently be used to hand over initial parameters to the device. + + Args: + config (dict): Config dictionary. + """ + self.config = config + self._compile_lookup() + + def _compile_lookup(self): + """Compile the lookup table for the device.""" + for device_name in self.config.keys(): + self._lookup[device_name] = { + "method": self._compute, + "signal_name": self.config[device_name]["signal_name"], + "args": (device_name,), + "kwargs": {}, + } + + @abstractmethod + def _compute(self, device_name: str, *args, **kwargs) -> any: + """ + The purpose of this method is to compute the readback value for the signal of the device + that this proxy is attached to. This method is meant to be overriden by the user. + P + + Args: + device_name (str): Name of the device. + + Returns: + """ class SlitProxy(DeviceProxy): @@ -26,7 +95,7 @@ class SlitProxy(DeviceProxy): `dev.eiger.get_device_config()` or update it `dev.eiger.get_device_config({'eiger' : {'pixel_size': 0.1}})` slit_sim: - readoutPriority: on_request + readoutPriority: baseline deviceClass: SlitProxy deviceConfig: eiger: @@ -50,48 +119,13 @@ class SlitProxy(DeviceProxy): device_manager=None, **kwargs, ): - self.name = name - self.device_manager = device_manager - self.config = None - self._lookup = defaultdict(dict) self._gaussian_blur_sigma = 5 - super().__init__(name, *args, **kwargs) + super().__init__(name, *args, device_manager=device_manager, **kwargs) def help(self) -> None: """Print documentation for the SlitLookup device.""" print(self.__doc__) - def _update_device_config(self, config: dict) -> None: - """ - BEC will call this method on every object upon initializing devices to pass over the deviceConfig - from the config file. It can be conveniently be used to hand over initial parameters to the device. - - Args: - config (dict): Config dictionary. - """ - self.config = config - self._compile_lookup() - - @property - def lookup(self): - """lookup property""" - return self._lookup - - @lookup.setter - def lookup(self, update: dict) -> None: - """lookup setter""" - self._lookup.update(update) - - def _compile_lookup(self): - """Compile the lookup table for the simulated camera.""" - for device_name in self.config.keys(): - self._lookup[device_name] = { - "method": self._compute, - "signal_name": self.config[device_name]["signal_name"], - "args": (device_name,), - "kwargs": {}, - } - def _compute(self, device_name: str, *args, **kwargs) -> np.ndarray: """ Compute the lookup table for the simulated camera. @@ -173,8 +207,166 @@ class SlitProxy(DeviceProxy): return np.prod(mask, axis=2) +class H5ImageReplayProxy(DeviceProxy): + """This Proxy clas can be used to reply images from an h5 file. + + If the requested images is larger than the available iamges, the images will be replayed from the beginning. + + h5_image_sim: + readoutPriority: baseline + deviceClass: H5ImageReplayProxy + deviceConfig: + eiger: + signal_name: image + file_source: /path/to/h5file.h5 + h5_entry: /entry/data + enabled: true + readOnly: false + """ + + USER_ACCESS = ["file_source", "h5_entry"] + + def __init__( + self, + name, + *args, + device_manager=None, + **kwargs, + ): + self.h5_file = None + self.h5_dataset = None + self._number_of_images = None + self.mode = "r" + self._staged = Staged.no + self._image = None + self._index = 0 + super().__init__(name, *args, device_manager=device_manager, **kwargs) + self.file_source = CustomSetableSignal( + name="file_source", value="", parent=self, kind=Kind.normal + ) + self.h5_entry = CustomSetableSignal( + name="h5_entry", value="", parent=self, kind=Kind.normal + ) + + @property + def component_names(self) -> list[str]: + """Return the names of the components.""" + return ["file_source", "h5_entry"] + + def _update_device_config(self, config: dict) -> None: + super()._update_device_config(config) + if len(config.keys()) > 1: + raise RuntimeError( + f"The current implementation of device {self.name} can only data for a single device. The config hosts multiple keys {config.keys()}" + ) + self._init_signals() + + def _init_signals(self): + """Initialize the signals for the device.""" + if "file_source" in self.config[list(self.config.keys())[0]]: + self.file_source.set(self.config[list(self.config.keys())[0]]["file_source"]) + if "h5_entry" in self.config[list(self.config.keys())[0]]: + self.h5_entry.set(self.config[list(self.config.keys())[0]]["h5_entry"]) + + def _open_h5_file(self) -> None: + """Open an HDF5 fiel and return a reference to the dataset without loading its content. + + Args: + fname (str): File name. + enty (str): Entry name. + mode (str): Mode of the file, default "r". + + Returns: + h5py.Dataset: Reference to the dataset. + """ + self.h5_file = h5py.File(self.file_source.get(), mode=self.mode) + self.h5_dataset = self.h5_file[self.h5_entry.get()] + self._number_of_images = self.h5_dataset.shape[0] + + def _close_h5_file(self) -> None: + """Close the HDF5 file.""" + self.h5_file.close() + + def stop(self) -> None: + """Stop the device.""" + if self.h5_file: + self._close_h5_file() + self.h5_file = None + self.h5_dataset = None + self._number_of_images = None + self._index = 0 + + def stage(self) -> list[object]: + """Stage the device. + This opens the HDF5 file, unstaging will close it. + """ + + if self._staged != Staged.no: + return [self] + try: + self._open_h5_file() + except Exception as exc: + if self.h5_file: + self.stop() + raise FileNotFoundError( + f"Could not open h5file {self.file_source.get()} or access data set {self.h5_dataset.get()} in file" + ) from exc + + self._staged = Staged.yes + return [self] + + def unstage(self) -> list[object]: + """Unstage the device.""" + if self.h5_file: + self.stop() + self._staged = Staged.no + return [self] + + def _load_image(self): + """Get the image from the h5 file. + + Args: + index (int): Index of the image. + + Returns: + np.ndarray: Image. + """ + if self.h5_file: + slice_nr = self._index % self._number_of_images + self._index = self._index + 1 + self._image = self.h5_dataset[slice_nr, ...] + return + try: + self.stage() + slice_nr = self._index % self._number_of_images + self._index = self._index + 1 + self._image = self.h5_dataset[slice_nr, ...] + self.unstage() + except Exception as exc: + raise FileNotFoundError( + f"Could not open h5file {self.file_source.get()} or access data set {self.h5_dataset.get()} in file" + ) from exc + + def _compute(self, device_name: str, *args, **kwargs) -> np.ndarray: + """Compute the image. + + Returns: + np.ndarray: Image. + """ + self._load_image() + return self._image + + if __name__ == "__main__": # Example usage - pinhole = SlitProxy(name="pinhole", device_manager=None) - pinhole.describe() - print(pinhole) + tmp = H5ImageReplayProxy(name="tmp", device_manager=None) + config = { + "eiger": { + "signal_name": "image", + "file_source": "/Users/appel_c/switchdrive/Sharefolder/AgBH_2D_gridscan/projection_000006_data_000001.h5", + "h5_entry": "/entry/data/data", + } + } + tmp._update_device_config(config) + tmp.stage() + print(tmp) diff --git a/ophyd_devices/sim/sim_signals.py b/ophyd_devices/sim/sim_signals.py index 51c6004..40c864a 100644 --- a/ophyd_devices/sim/sim_signals.py +++ b/ophyd_devices/sim/sim_signals.py @@ -5,6 +5,8 @@ from bec_lib import bec_logger from ophyd import Signal, Kind from ophyd.utils import ReadOnlyError +from ophyd_devices.utils.bec_device_base import BECDeviceBase + logger = bec_logger.logger # Readout precision for Setable/ReadOnlySignal signals @@ -17,6 +19,8 @@ class SetableSignal(Signal): The signal will store the value in sim_state of the SimulatedData class of the parent device. It will also return the value from sim_state when get is called. Compared to the ReadOnlySignal, this signal can be written to. + The setable signal inherits from the Signal class of ophyd, thus the class attribute needs to be + initiated as a Component (class from ophyd). >>> signal = SetableSignal(name="signal", parent=parent, value=0) @@ -46,20 +50,25 @@ class SetableSignal(Signal): ) self._value = value self.precision = precision - self.sim = getattr(self.parent, "sim", self.parent) + self.sim = getattr(self.parent, "sim", None) self._update_sim_state(value) def _update_sim_state(self, value: any) -> None: """Update the readback value.""" - self.sim.update_sim_state(self.name, value) + if self.sim: + self.sim.update_sim_state(self.name, value) def _get_value(self) -> any: """Update the timestamp of the readback value.""" - return self.sim.sim_state[self.name]["value"] + if self.sim: + return self.sim.sim_state[self.name]["value"] + return self._value def _get_timestamp(self) -> any: """Update the timestamp of the readback value.""" - return self.sim.sim_state[self.name]["timestamp"] + if self.sim: + return self.sim.sim_state[self.name]["timestamp"] + return time.time() # pylint: disable=arguments-differ def get(self): @@ -100,6 +109,8 @@ class ReadOnlySignal(Signal): The readback will be computed from a function hosted in the SimulatedData class from the parent device if compute_readback is True. Else, it will return the value stored int sim.sim_state directly. + The readonly signal inherits from the Signal class of ophyd, thus the class attribute needs to be + initiated as a Component (class from ophyd). >>> signal = ComputedReadOnlySignal(name="signal", parent=parent, value=0, compute_readback=True) @@ -184,3 +195,132 @@ class ReadOnlySignal(Signal): if self.sim: return self._get_timestamp() return time.time() + + +class CustomSetableSignal(BECDeviceBase): + """Custom signal for simulated devices. The custom signal can be read-only, setable or computed. + In comparison to above, this signal is not a class from ophyd, but an own implementation of a signal. + + It works in the same fashion as the SetableSignal and ReadOnlySignal, however, it is + not needed to initiate it as a Component (ophyd) within the parent device class. + + >>> signal = SetableSignal(name="signal", parent=parent, value=0) + + Parameters + ---------- + + name (string) : Name of the signal + parent (object) : Parent object of the signal, default none. + value (any) : Initial value of the signal, default 0. + kind (int) : Kind of the signal, default Kind.normal. + precision (float) : Precision of the signal, default PRECISION. + """ + + USER_ACCESS = ["put", "get", "set"] + + def __init__( + self, + name: str, + *args, + parent=None, + value: any = 0, + kind: int = Kind.normal, + precision: float = PRECISION, + **kwargs, + ): + if parent: + name = f"{parent.name}_{name}" + super().__init__(*args, name=name, parent=parent, kind=kind, **kwargs) + self._metadata = {"connected": self.connected, "write_access": True} + self._value = value + self._timestamp = time.time() + self._dtype = type(value) + self._shape = self._get_shape(value) + self.precision = precision + self.sim = getattr(self.parent, "sim", None) + self._update_sim_state(value) + + def _get_shape(self, value: any) -> list: + """Get the shape of the value. + **Note: This logic is from ophyd, and replicated here. + There would be more sophisticated ways, but to keep it consistent, it is replicated here.** + """ + if isinstance(value, np.ndarray): + return list(value.shape) + if isinstance(value, list): + return len(value) + return [] + + def _update_sim_state(self, value: any) -> None: + """Update the readback value.""" + if self.sim: + self.sim.update_sim_state(self.name, value) + + def _get_value(self) -> any: + """Update the timestamp of the readback value.""" + if self.sim: + return self.sim.sim_state[self.name]["value"] + return self._value + + def _get_timestamp(self) -> any: + """Update the timestamp of the readback value.""" + if self.sim: + return self.sim.sim_state[self.name]["timestamp"] + return self._timestamp + + # pylint: disable=arguments-differ + def get(self): + """Get the current position of the simulated device. + + Core function for signal. + """ + self._value = self._get_value() + return self._value + + # pylint: disable=arguments-differ + def put(self, value): + """Put the value to the simulated device. + + Core function for signal. + """ + self._update_sim_state(value) + self._value = value + self._timestamp = time.time() + + def describe(self): + """Describe the readback signal. + + Core function for signal. + """ + res = { + self.name: { + "source": str(self.__class__), + "dtype": self._dtype, + "shape": self._shape, + } + } + if self.precision is not None: + res[self.name]["precision"] = self.precision + return res + + def set(self, value): + """Set method""" + self.put(value) + + @property + def timestamp(self): + """Timestamp of the readback value""" + return self._get_timestamp() + + def read(self): + """Read method""" + return { + self.name: { + "value": self.get(), + "timestamp": self.timestamp, + } + } + + def read_configuration(self): + """Read method""" + return self.read()