diff --git a/ophyd_devices/sim/sim_frameworks.py b/ophyd_devices/sim/sim_frameworks.py deleted file mode 100644 index 46a62ec..0000000 --- a/ophyd_devices/sim/sim_frameworks.py +++ /dev/null @@ -1,312 +0,0 @@ -from abc import ABC, abstractmethod -from collections import defaultdict - -import h5py - -# Necessary import to allow h5py to open compressed h5files. -# pylint: disable=unused-import -import hdf5plugin # noqa: F401 -import numpy as np -from ophyd import Component, Kind, Staged -from scipy.ndimage import gaussian_filter - -from ophyd_devices.sim.sim_data import NoiseType -from ophyd_devices.sim.sim_signals import SetableSignal -from ophyd_devices.utils.bec_device_base import BECDeviceBase - - -class DeviceProxy(BECDeviceBase, ABC): - """DeviceProxy class inherits from BECDeviceBase. - - It is an abstract class that is meant to be used as a base class for all device proxies. - The minimum requirement for a device proxy is to implement the _compute method. - """ - - def __init__(self, name, *args, device_manager=None, **kwargs): - self.name = name - self.device_manager = device_manager - self.config = None - self._lookup = defaultdict(dict) - super().__init__(name, *args, device_manager=device_manager, **kwargs) - self._signals = dict() - - @property - def lookup(self): - """lookup property""" - return self._lookup - - @lookup.setter - def lookup(self, update: dict) -> None: - """lookup setter""" - self._lookup.update(update) - - def _update_device_config(self, config: dict) -> None: - """ - BEC will call this method on every object upon initializing devices to pass over the deviceConfig - from the config file. It can be conveniently be used to hand over initial parameters to the device. - - Args: - config (dict): Config dictionary. - """ - self.config = config - self._compile_lookup() - - def _compile_lookup(self): - """Compile the lookup table for the device.""" - for device_name in self.config.keys(): - self._lookup[device_name] = { - "method": self._compute, - "signal_name": self.config[device_name]["signal_name"], - "args": (device_name,), - "kwargs": {}, - } - - @abstractmethod - def _compute(self, device_name: str, *args, **kwargs) -> any: - """ - The purpose of this method is to compute the readback value for the signal of the device - that this proxy is attached to. This method is meant to be overriden by the user. - P - - Args: - device_name (str): Name of the device. - - Returns: - """ - - -class SlitProxy(DeviceProxy): - """ - Simulation framework to immidate the behaviour of slits. - - This device is a proxy that is meant to overrides the behaviour of a SimCamera. - You may use this to simulate the effect of slits on the camera image. - - Parameters can be configured via the deviceConfig field in the device_config. - The example below shows the configuration for a pinhole simulation on an Eiger detector, - where the pinhole is defined by the position of motors samx and samy. These devices must - exist in your config. - - To update for instance the pixel_size directly, you can directly access the DeviceConfig via - `dev.eiger.get_device_config()` or update it `dev.eiger.get_device_config({'eiger' : {'pixel_size': 0.1}})` - - An example for the configuration of this is device is in ophyd_devices.configs.ophyd_devices_simulation.yaml - """ - - USER_ACCESS = ["enabled", "lookup", "help"] - - def __init__(self, name, *args, device_manager=None, **kwargs): - self._gaussian_blur_sigma = 5 - super().__init__(name, *args, device_manager=device_manager, **kwargs) - - def help(self) -> None: - """Print documentation for the SlitLookup device.""" - print(self.__doc__) - - def _compute(self, device_name: str, *args, **kwargs) -> np.ndarray: - """ - Compute the lookup table for the simulated camera. - It copies the sim_camera bevahiour and adds a mask to simulate the effect of a pinhole. - - Args: - device_name (str): Name of the device. - signal_name (str): Name of the signal. - - Returns: - np.ndarray: Lookup table for the simulated camera. - """ - device_obj = self.device_manager.devices.get(device_name).obj - params = device_obj.sim.params - shape = device_obj.image_shape.get() - params.update( - { - "noise": NoiseType.POISSON, - "covariance": np.array(self.config[device_name]["covariance"]), - "center_offset": np.array(self.config[device_name]["center_offset"]), - } - ) - amp = params.get("amplitude") - cov = params.get("covariance") - cen_off = params.get("center_offset") - - pos, offset, cov, amp = device_obj.sim._prepare_params_gauss( - amp=amp, cov=cov, offset=cen_off, shape=shape - ) - v = device_obj.sim._compute_multivariate_gaussian(pos=pos, cen_off=offset, cov=cov, amp=amp) - device_pos = self.config[device_name]["pixel_size"] * pos - valid_mask = self._create_mask( - device_pos=device_pos, - ref_motors=self.config[device_name]["ref_motors"], - width=self.config[device_name]["slit_width"], - direction=self.config[device_name]["motor_dir"], - ) - valid_mask = self._blur_image(valid_mask, sigma=self._gaussian_blur_sigma) - v *= valid_mask - v = device_obj.sim._add_noise( - v, noise=params["noise"], noise_multiplier=params["noise_multiplier"] - ) - v = device_obj.sim._add_hot_pixel( - v, - coords=params["hot_pixel_coords"], - hot_pixel_types=params["hot_pixel_types"], - values=params["hot_pixel_values"], - ) - return v - - def _blur_image(self, image: np.ndarray, sigma: float = 1) -> np.ndarray: - """Blur the image with a gaussian filter. - - Args: - image (np.ndarray): Image to be blurred. - sigma (float): Sigma for the gaussian filter. - - Returns: - np.ndarray: Blurred image. - """ - return gaussian_filter(image, sigma=sigma) - - def _create_mask( - self, - device_pos: np.ndarray, - ref_motors: list[str], - width: list[float], - direction: list[int], - ): - mask = np.ones_like(device_pos) - for ii, motor_name in enumerate(ref_motors): - motor_pos = self.device_manager.devices.get(motor_name).obj.read()[motor_name]["value"] - edges = [motor_pos + width[ii] / 2, motor_pos - width[ii] / 2] - mask[..., direction[ii]] = np.logical_and( - device_pos[..., direction[ii]] > np.min(edges), - device_pos[..., direction[ii]] < np.max(edges), - ) - - return np.prod(mask, axis=2) - - -class H5ImageReplayProxy(DeviceProxy): - """This Proxy class can be used to replay images from an h5 file. - - If the number of requested images is larger than the number of available iamges, - the images will be replayed from the beginning. - - An example for the configuration of this is device is in ophyd_devices.configs.ophyd_devices_simulation.yaml - """ - - USER_ACCESS = ["file_source", "h5_entry"] - - def __init__(self, name, *args, device_manager=None, **kwargs): - self.h5_file = None - self.h5_dataset = None - self._number_of_images = None - self._staged = Staged.no - self._image = None - self._index = 0 - self._file_source = "" - self._h5_entry = "" - super().__init__(name, *args, device_manager=device_manager, **kwargs) - - @property - def file_source(self) -> str: - """File source property.""" - return self._file_source - - @file_source.setter - def file_source(self, file_source: str) -> None: - self._file_source = file_source - - @property - def h5_entry(self) -> str: - """H5 entry property.""" - return self._h5_entry - - @h5_entry.setter - def h5_entry(self, h5_entry: str) -> None: - self._h5_entry = h5_entry - - def _update_device_config(self, config: dict) -> None: - super()._update_device_config(config) - if len(config.keys()) > 1: - raise RuntimeError( - f"The current implementation of device {self.name} can only replay data for a single device. The config has information about multiple devices {config.keys()}" - ) - self._init_signals() - - def _init_signals(self): - """Initialize the signals for the device.""" - if "file_source" in self.config[list(self.config.keys())[0]]: - self.file_source = self.config[list(self.config.keys())[0]]["file_source"] - if "h5_entry" in self.config[list(self.config.keys())[0]]: - self.h5_entry = self.config[list(self.config.keys())[0]]["h5_entry"] - - def _open_h5_file(self) -> None: - """Opens the HDF5 file found in the file_source signal and the HDF5 dataset specified by the h5_entry signal.""" - self.h5_file = h5py.File(self.file_source, mode="r") - self.h5_dataset = self.h5_file[self.h5_entry] - self._number_of_images = self.h5_dataset.shape[0] - - def _close_h5_file(self) -> None: - """Close the HDF5 file.""" - self.h5_file.close() - - def stop(self) -> None: - """Stop the device.""" - if self.h5_file: - self._close_h5_file() - self.h5_file = None - self.h5_dataset = None - self._number_of_images = None - self._index = 0 - - def stage(self) -> list[object]: - """Stage the device. - This opens the HDF5 dataset, unstaging will close it. - """ - - if self._staged != Staged.no: - return [self] - try: - self._open_h5_file() - except Exception as exc: - if self.h5_file: - self.stop() - raise FileNotFoundError( - f"Could not open h5file {self.file_source} or access data set {self.h5_dataset} in file" - ) from exc - - self._staged = Staged.yes - return [self] - - def unstage(self) -> list[object]: - """Unstage the device, also closes the HDF5 dataset""" - if self.h5_file: - self.stop() - self._staged = Staged.no - return [self] - - def _load_image(self): - """Try loading the image from the h5 dataset, and set it to self._image.""" - if self.h5_file: - slice_nr = self._index % self._number_of_images - self._index = self._index + 1 - self._image = self.h5_dataset[slice_nr, ...] - return - try: - self.stage() - slice_nr = self._index % self._number_of_images - self._index = self._index + 1 - self._image = self.h5_dataset[slice_nr, ...] - self.unstage() - except Exception as exc: - raise FileNotFoundError( - f"Could not open h5file {self.file_source} or access data set {self.h5_dataset} in file" - ) from exc - - def _compute(self, device_name: str, *args, **kwargs) -> np.ndarray: - """Compute the image. - - Returns: - np.ndarray: Image. - """ - self._load_image() - return self._image diff --git a/ophyd_devices/sim/sim_frameworks/__init__.py b/ophyd_devices/sim/sim_frameworks/__init__.py new file mode 100644 index 0000000..48310fe --- /dev/null +++ b/ophyd_devices/sim/sim_frameworks/__init__.py @@ -0,0 +1,5 @@ +from ophyd_devices.sim.sim_frameworks.device_proxy import DeviceProxy +from ophyd_devices.sim.sim_frameworks.h5_image_replay_proxy import H5ImageReplayProxy +from ophyd_devices.sim.sim_frameworks.slit_proxy import SlitProxy + +__all__ = ["DeviceProxy", "H5ImageReplayProxy", "SlitProxy"] diff --git a/ophyd_devices/sim/sim_frameworks/device_proxy.py b/ophyd_devices/sim/sim_frameworks/device_proxy.py new file mode 100644 index 0000000..fb3ed4c --- /dev/null +++ b/ophyd_devices/sim/sim_frameworks/device_proxy.py @@ -0,0 +1,64 @@ +from abc import ABC, abstractmethod +from collections import defaultdict + +from ophyd_devices.utils.bec_device_base import BECDeviceBase + + +class DeviceProxy(BECDeviceBase, ABC): + """DeviceProxy class inherits from BECDeviceBase. + + It is an abstract class that is meant to be used as a base class for all device proxies. + The minimum requirement for a device proxy is to implement the _compute method. + """ + + def __init__(self, name, *args, device_manager=None, **kwargs): + self.name = name + self.device_manager = device_manager + self.config = None + self._lookup = defaultdict(dict) + super().__init__(name, *args, device_manager=device_manager, **kwargs) + self._signals = dict() + + @property + def lookup(self): + """lookup property""" + return self._lookup + + @lookup.setter + def lookup(self, update: dict) -> None: + """lookup setter""" + self._lookup.update(update) + + def _update_device_config(self, config: dict) -> None: + """ + BEC will call this method on every object upon initializing devices to pass over the deviceConfig + from the config file. It can be conveniently be used to hand over initial parameters to the device. + + Args: + config (dict): Config dictionary. + """ + self.config = config + self._compile_lookup() + + def _compile_lookup(self): + """Compile the lookup table for the device.""" + for device_name in self.config.keys(): + self._lookup[device_name] = { + "method": self._compute, + "signal_name": self.config[device_name]["signal_name"], + "args": (device_name,), + "kwargs": {}, + } + + @abstractmethod + def _compute(self, device_name: str, *args, **kwargs) -> any: + """ + The purpose of this method is to compute the readback value for the signal of the device + that this proxy is attached to. This method is meant to be overriden by the user. + P + + Args: + device_name (str): Name of the device. + + Returns: + """ diff --git a/ophyd_devices/sim/sim_frameworks/h5_image_replay_proxy.py b/ophyd_devices/sim/sim_frameworks/h5_image_replay_proxy.py new file mode 100644 index 0000000..2570838 --- /dev/null +++ b/ophyd_devices/sim/sim_frameworks/h5_image_replay_proxy.py @@ -0,0 +1,138 @@ +import h5py + +# Necessary import to allow h5py to open compressed h5files. +# pylint: disable=unused-import +import hdf5plugin # noqa: F401 +import numpy as np +from ophyd import Component, Kind, Staged +from scipy.ndimage import gaussian_filter + +from ophyd_devices.sim.sim_frameworks.device_proxy import DeviceProxy + + +class H5ImageReplayProxy(DeviceProxy): + """This Proxy class can be used to replay images from an h5 file. + + If the number of requested images is larger than the number of available iamges, + the images will be replayed from the beginning. + + An example for the configuration of this is device is in ophyd_devices.configs.ophyd_devices_simulation.yaml + """ + + USER_ACCESS = ["file_source", "h5_entry"] + + def __init__(self, name, *args, device_manager=None, **kwargs): + self.h5_file = None + self.h5_dataset = None + self._number_of_images = None + self._staged = Staged.no + self._image = None + self._index = 0 + self._file_source = "" + self._h5_entry = "" + super().__init__(name, *args, device_manager=device_manager, **kwargs) + + @property + def file_source(self) -> str: + """File source property.""" + return self._file_source + + @file_source.setter + def file_source(self, file_source: str) -> None: + self._file_source = file_source + + @property + def h5_entry(self) -> str: + """H5 entry property.""" + return self._h5_entry + + @h5_entry.setter + def h5_entry(self, h5_entry: str) -> None: + self._h5_entry = h5_entry + + def _update_device_config(self, config: dict) -> None: + super()._update_device_config(config) + if len(config.keys()) > 1: + raise RuntimeError( + f"The current implementation of device {self.name} can only replay data for a single device. The config has information about multiple devices {config.keys()}" + ) + self._init_signals() + + def _init_signals(self): + """Initialize the signals for the device.""" + if "file_source" in self.config[list(self.config.keys())[0]]: + self.file_source = self.config[list(self.config.keys())[0]]["file_source"] + if "h5_entry" in self.config[list(self.config.keys())[0]]: + self.h5_entry = self.config[list(self.config.keys())[0]]["h5_entry"] + + def _open_h5_file(self) -> None: + """Opens the HDF5 file found in the file_source signal and the HDF5 dataset specified by the h5_entry signal.""" + self.h5_file = h5py.File(self.file_source, mode="r") + self.h5_dataset = self.h5_file[self.h5_entry] + self._number_of_images = self.h5_dataset.shape[0] + + def _close_h5_file(self) -> None: + """Close the HDF5 file.""" + self.h5_file.close() + + def stop(self) -> None: + """Stop the device.""" + if self.h5_file: + self._close_h5_file() + self.h5_file = None + self.h5_dataset = None + self._number_of_images = None + self._index = 0 + + def stage(self) -> list[object]: + """Stage the device. + This opens the HDF5 dataset, unstaging will close it. + """ + + if self._staged != Staged.no: + return [self] + try: + self._open_h5_file() + except Exception as exc: + if self.h5_file: + self.stop() + raise FileNotFoundError( + f"Could not open h5file {self.file_source} or access data set {self.h5_dataset} in file" + ) from exc + + self._staged = Staged.yes + return [self] + + def unstage(self) -> list[object]: + """Unstage the device, also closes the HDF5 dataset""" + if self.h5_file: + self.stop() + self._staged = Staged.no + return [self] + + def _load_image(self): + """Try loading the image from the h5 dataset, and set it to self._image.""" + if self.h5_file: + slice_nr = self._index % self._number_of_images + self._index = self._index + 1 + self._image = self.h5_dataset[slice_nr, ...] + return + try: + self.stage() + slice_nr = self._index % self._number_of_images + self._index = self._index + 1 + self._image = self.h5_dataset[slice_nr, ...] + self.unstage() + except Exception as exc: + raise FileNotFoundError( + f"Could not open h5file {self.file_source} or access data set {self.h5_dataset} in file" + ) from exc + + def _compute(self, device_name: str, *args, **kwargs) -> np.ndarray: + """Compute the image. + + Returns: + np.ndarray: Image. + """ + self._load_image() + return self._image diff --git a/ophyd_devices/sim/sim_frameworks/slit_proxy.py b/ophyd_devices/sim/sim_frameworks/slit_proxy.py new file mode 100644 index 0000000..96e6652 --- /dev/null +++ b/ophyd_devices/sim/sim_frameworks/slit_proxy.py @@ -0,0 +1,114 @@ +import numpy as np +from scipy.ndimage import gaussian_filter + +from ophyd_devices.sim.sim_data import NoiseType +from ophyd_devices.sim.sim_frameworks.device_proxy import DeviceProxy + + +class SlitProxy(DeviceProxy): + """ + Simulation framework to immitate the behaviour of slits. + + This device is a proxy that is meant to override the behaviour of a SimCamera. + You may use this to simulate the effect of slits on the camera image. + + Parameters can be configured via the DeviceConfig field in the device_config. + The example below shows the configuration for a pinhole simulation on an Eiger detector, + where the pinhole is defined by the position of motors samx and samy. These devices must + exist in your config. + + To update for instance the pixel_size directly, you can directly access the DeviceConfig via + `dev.eiger.get_device_config()` or update it `dev.eiger.set_device_config({'eiger' : {'pixel_size': 0.1}})` + + An example for the configuration of this is device is in ophyd_devices.configs.ophyd_devices_simulation.yaml + """ + + USER_ACCESS = ["enabled", "lookup", "help"] + + def __init__(self, name, *args, device_manager=None, **kwargs): + self._gaussian_blur_sigma = 5 + super().__init__(name, *args, device_manager=device_manager, **kwargs) + + def help(self) -> None: + """Print documentation for the SlitLookup device.""" + print(self.__doc__) + + def _compute(self, device_name: str, *args, **kwargs) -> np.ndarray: + """ + Compute the lookup table for the simulated camera. + It copies the sim_camera behaviour and adds a mask to simulate the effect of a pinhole. + + Args: + device_name (str): Name of the device. + signal_name (str): Name of the signal. + + Returns: + np.ndarray: Lookup table for the simulated camera. + """ + device_obj = self.device_manager.devices.get(device_name).obj + params = device_obj.sim.params + shape = device_obj.image_shape.get() + params.update( + { + "noise": NoiseType.POISSON, + "covariance": np.array(self.config[device_name]["covariance"]), + "center_offset": np.array(self.config[device_name]["center_offset"]), + } + ) + amp = params.get("amplitude") + cov = params.get("covariance") + cen_off = params.get("center_offset") + + pos, offset, cov, amp = device_obj.sim._prepare_params_gauss( + amp=amp, cov=cov, offset=cen_off, shape=shape + ) + v = device_obj.sim._compute_multivariate_gaussian(pos=pos, cen_off=offset, cov=cov, amp=amp) + device_pos = self.config[device_name]["pixel_size"] * pos + valid_mask = self._create_mask( + device_pos=device_pos, + ref_motors=self.config[device_name]["ref_motors"], + width=self.config[device_name]["slit_width"], + direction=self.config[device_name]["motor_dir"], + ) + valid_mask = self._blur_image(valid_mask, sigma=self._gaussian_blur_sigma) + v *= valid_mask + v = device_obj.sim._add_noise( + v, noise=params["noise"], noise_multiplier=params["noise_multiplier"] + ) + v = device_obj.sim._add_hot_pixel( + v, + coords=params["hot_pixel_coords"], + hot_pixel_types=params["hot_pixel_types"], + values=params["hot_pixel_values"], + ) + return v + + def _blur_image(self, image: np.ndarray, sigma: float = 1) -> np.ndarray: + """Blur the image with a gaussian filter. + + Args: + image (np.ndarray): Image to be blurred. + sigma (float): Sigma for the gaussian filter. + + Returns: + np.ndarray: Blurred image. + """ + return gaussian_filter(image, sigma=sigma) + + def _create_mask( + self, + device_pos: np.ndarray, + ref_motors: list[str], + width: list[float], + direction: list[int], + ): + mask = np.ones_like(device_pos) + for ii, motor_name in enumerate(ref_motors): + motor_pos = self.device_manager.devices.get(motor_name).obj.read()[motor_name]["value"] + edges = [motor_pos + width[ii] / 2, motor_pos - width[ii] / 2] + mask[..., direction[ii]] = np.logical_and( + device_pos[..., direction[ii]] > np.min(edges), + device_pos[..., direction[ii]] < np.max(edges), + ) + + return np.prod(mask, axis=2)