diff --git a/ophyd_devices/sim/sim.py b/ophyd_devices/sim/sim.py index 496710d..58be827 100644 --- a/ophyd_devices/sim/sim.py +++ b/ophyd_devices/sim/sim.py @@ -251,7 +251,6 @@ class SimPositioner(Device, PositionerBase): low_limit_travel = Cpt(SetableSignal, value=0, kind=Kind.omitted) unused = Cpt(Signal, value=1, kind=Kind.omitted) - # TODO add short description to these two lines and explain what this does SUB_READBACK = "readback" _default_sub = SUB_READBACK diff --git a/ophyd_devices/sim/sim_data.py b/ophyd_devices/sim/sim_data.py index 22dbc8f..eab7618 100644 --- a/ophyd_devices/sim/sim_data.py +++ b/ophyd_devices/sim/sim_data.py @@ -1,6 +1,9 @@ -from abc import ABC, abstractmethod +from __future__ import annotations + from collections import defaultdict + import enum +import inspect import time as ttime import numpy as np @@ -28,6 +31,13 @@ class NoiseType(str, enum.Enum): POISSON = "poisson" +class HotPixelType(str, enum.Enum): + """Type of hot pixel to add to simulated data.""" + + CONSTANT = "constant" + FLUCTUATING = "fluctuating" + + class SimulatedDataBase: USER_ACCESS = [ "get_sim_params", @@ -38,12 +48,42 @@ class SimulatedDataBase: def __init__(self, *args, parent=None, device_manager=None, **kwargs) -> None: self.parent = parent - self.sim_state = defaultdict(lambda: {}) - self._all_params = defaultdict(lambda: {}) + self.sim_state = defaultdict(dict) + self._all_params = defaultdict(dict) self.device_manager = device_manager self._simulation_type = None + self.lookup_table = getattr(self.parent, "lookup_table", None) self.init_paramaters(**kwargs) self._active_params = self._all_params.get(self._simulation_type, None) + # self.register_in_lookup_table() + + # self.lookup_table = self.update_lookup_table() + + # def update_lookup_table(self) -> None: + # """Update the lookup table with the new value for the signal.""" + # table = getattr(self.device_manager.lookup_table, self.parent.name, None) + + # return getattr(self.device_manager.lookup_table, self.parent.name, None) + + # def register_in_lookup_table(self) -> None: + # """Register the simulated device in the lookup table.""" + # self.device_manager.lookup_table[self.parent.name] = {"obj": self, "method": "_compute_sim_state", "args": (), "kwargs": {}} + + def execute_simulation_method(self, *args, method=None, **kwargs) -> any: + """Execute the method from the lookup table.""" + + if self.lookup_table and self.parent.name in self.lookup_table: + # obj = self.parent.lookup_table[self.parent.name]["obj"] + method = self.lookup_table[self.parent.name]["method"] + args = self.lookup_table[self.parent.name]["args"] + kwargs = self.lookup_table[self.parent.name]["kwargs"] + # Do I need args and kwargs! Why!! + + if method is not None: + method_arguments = list(inspect.signature(method).parameters.keys()) + if all([True for arg in method_arguments if arg in args or arg in kwargs]): + return method(*args, **kwargs) + raise SimulatedDataException(f"Method {method} is not available for {self.parent.name}") def init_paramaters(self, **kwargs): """Initialize the parameters for the Simulated Data @@ -119,13 +159,16 @@ class SimulatedDataBase: self.sim_state[signal_name]["value"] = value self.sim_state[signal_name]["timestamp"] = ttime.time() - def _update_init_params(self, sim_type_default: SimulationType) -> None: + def _update_init_params( + self, + sim_type_default: SimulationType, + ) -> None: """Update the initial parameters of the simulated data with input from deviceConfig. Args: sim_type_default (SimulationType): Default simulation type to use if not specified in deviceConfig. """ - init_params = self.parent.init_sim_params + init_params = getattr(self.parent, "init_sim_params", None) for sim_type in self._all_params.values(): for sim_type_config_element in sim_type: if init_params: @@ -191,10 +234,13 @@ class SimulatedDataMonitor(SimulatedDataBase): signal_name (str): Name of the signal to update. """ if self.get_sim_type() == SimulationType.CONSTANT: - value = self._compute_constant() + method = "_compute_constant" + # value = self._compute_constant() elif self.get_sim_type() == SimulationType.GAUSSIAN: - value = self._compute_gaussian() + method = "_compute_gaussian" + # value = self._compute_gaussian() + value = self.execute_simulation_method(method=getattr(self, method)) self.update_sim_state(signal_name, value) def _compute_constant(self) -> float: @@ -210,12 +256,10 @@ class SimulatedDataMonitor(SimulatedDataBase): v = self._active_params["amp"] return v else: - # TODO Propagate msg to client! - logger.warning( + raise SimulatedDataException( f"Unknown noise type {self._active_params['noise']}. Please choose from 'poisson'," - " 'uniform' or 'none'. Returning 0." + " 'uniform' or 'none'." ) - return 0 def _compute_gaussian(self) -> float: """Computes return value for sim_type = "gauss". @@ -243,12 +287,9 @@ class SimulatedDataMonitor(SimulatedDataBase): v += np.random.uniform(-1, 1) * params["noise_multiplier"] return v except SimulatedDataException as exc: - # TODO Propagate msg to client! - logger.warning( - f"Could not compute gaussian for {params['ref_motor']} with {exc} raised." - "Returning 0 instead." - ) - return 0 + raise SimulatedDataException( + f"Could not compute gaussian for {self.parent.name} with {exc} raised. Deactivate eiger to continue." + ) from exc class SimulatedDataCamera(SimulatedDataBase): @@ -282,13 +323,27 @@ class SimulatedDataCamera(SimulatedDataBase): "amp": 100, "noise": NoiseType.POISSON, "noise_multiplier": 0.1, + "hot_pixel": { + "coords": np.array([[100, 100], [200, 200]]), + "type": [HotPixelType.CONSTANT, HotPixelType.FLUCTUATING], + "value": [1e6, 1e4], + }, }, SimulationType.GAUSSIAN: { - "amp": 100, + "amp": 500, "cen_off": np.array([0, 0]), "cov": np.array([[10, 5], [5, 10]]), "noise": NoiseType.NONE, "noise_multiplier": 0.1, + "hot_pixel": { + "coords": np.array([[240, 240], [50, 20], [40, 400]]), + "type": [ + HotPixelType.FLUCTUATING, + HotPixelType.CONSTANT, + HotPixelType.FLUCTUATING, + ], + "value": np.array([1e4, 1e6, 1e4]), + }, }, } # Update init parameters and set simulation type to Gaussian if not specified otherwise in init_sim_params @@ -304,36 +359,33 @@ class SimulatedDataCamera(SimulatedDataBase): signal_name (str): Name of the signal to update. """ if self.get_sim_type() == SimulationType.CONSTANT: - value = self._compute_constant() + method = "_compute_constant" + # value = self._compute_constant() elif self.get_sim_type() == SimulationType.GAUSSIAN: - value = self._compute_gaussian() + method = "_compute_gaussian" + # value = self._compute_gaussian() + + value = self.execute_simulation_method(method=getattr(self, method)) self.update_sim_state(signal_name, value) def _compute_constant(self) -> float: """Compute a return value for sim_type = Constant.""" - - # tuple with shape - shape = self.sim_state[self.parent.image_shape.name]["value"] - v = self._active_params["amp"] * np.ones(shape, dtype=np.uint16) - if self._active_params["noise"] == NoiseType.POISSON: - v = np.random.poisson(np.round(v), v.shape) - return v - if self._active_params["noise"] == NoiseType.UNIFORM: - multiplier = self._active_params["noise_multiplier"] - v += np.random.randint(-multiplier, multiplier, v.shape) - return v - if self._active_params["noise"] == NoiseType.NONE: - return v - # TODO Propagate msg to client! - logger.warning( - f"Unknown noise type {self._active_params['noise']}. Please choose from 'poisson'," - " 'uniform' or 'none'. Returning 0." - ) - return 0 + try: + shape = self.sim_state[self.parent.image_shape.name]["value"] + v = self._active_params["amp"] * np.ones(shape, dtype=np.uint16) + return self._add_noise(v, self._active_params["noise"]) + except SimulatedDataException as exc: + raise SimulatedDataException( + f"Could not compute constant for {self.parent.name} with {exc} raised. Deactivate eiger to continue." + ) from exc def _compute_multivariate_gaussian( - self, pos: np.ndarray, cen_off: np.ndarray, cov: np.ndarray + self, + pos: np.ndarray | list, + cen_off: np.ndarray | list, + cov: np.ndarray | list, + amp: float, ) -> np.ndarray: """Computes and returns the multivariate Gaussian distribution. @@ -345,16 +397,80 @@ class SimulatedDataCamera(SimulatedDataBase): Returns: np.ndarray: Multivariate Gaussian distribution. """ - + if isinstance(pos, list): + pos = np.array(pos) + if isinstance(cen_off, list): + cen_off = np.array(cen_off) + if isinstance(cov, list): + cov = np.array(cov) dim = cen_off.shape[0] cov_det = np.linalg.det(cov) cov_inv = np.linalg.inv(cov) - N = np.sqrt((2 * np.pi) ** dim * cov_det) + norm = np.sqrt((2 * np.pi) ** dim * cov_det) # This einsum call calculates (x-mu)T.Sigma-1.(x-mu) in a vectorized # way across all the input variables. fac = np.einsum("...k,kl,...l->...", pos - cen_off, cov_inv, pos - cen_off) + v = np.exp(-fac / 2) / norm + v *= amp / np.max(v) + return v - return np.exp(-fac / 2) / N + def _prepare_params_gauss(self, params: dict, shape: tuple) -> tuple: + """Prepare the positions for the gaussian. + + Args: + params (dict): Parameters for the gaussian. + shape (tuple): Shape of the image. + Returns: + tuple: Positions, offset and covariance matrix for the gaussian. + """ + x, y = np.meshgrid( + np.linspace(-shape[0] / 2, shape[0] / 2, shape[0]), + np.linspace(-shape[1] / 2, shape[1] / 2, shape[1]), + ) + pos = np.empty((*x.shape, 2)) + pos[:, :, 0] = x + pos[:, :, 1] = y + + offset = params["cen_off"] + cov = params["cov"] + amp = params["amp"] + return pos, offset, cov, amp + + def _add_noise(self, v: np.ndarray, noise: NoiseType) -> np.ndarray: + """Add noise to the simulated data. + + Args: + v (np.ndarray): Simulated data. + noise (NoiseType): Type of noise to add. + """ + if noise == NoiseType.POISSON: + v = np.random.poisson(np.round(v), v.shape) + return v + if noise == NoiseType.UNIFORM: + multiplier = self._active_params["noise_multiplier"] + v += np.random.uniform(-multiplier, multiplier, v.shape) + return v + if self._active_params["noise"] == NoiseType.NONE: + return v + + def _add_hot_pixel(self, v: np.ndarray, hot_pixel: dict) -> np.ndarray: + """Add hot pixels to the simulated data. + + Args: + v (np.ndarray): Simulated data. + hot_pixel (dict): Hot pixel parameters. + """ + for coords, hot_pixel_type, value in zip( + hot_pixel["coords"], hot_pixel["type"], hot_pixel["value"] + ): + if coords[0] < v.shape[0] and coords[1] < v.shape[1]: + if hot_pixel_type == HotPixelType.CONSTANT: + v[coords[0], coords[1]] = value + elif hot_pixel_type == HotPixelType.FLUCTUATING: + maximum = np.max(v) if np.max(v) != 0 else 1 + if v[coords[0], coords[1]] / maximum > 0.5: + v[coords[0], coords[1]] = value + return v def _compute_gaussian(self) -> float: """Computes return value for sim_type = "gauss". @@ -367,41 +483,15 @@ class SimulatedDataCamera(SimulatedDataBase): Returns: float """ - params = self._active_params - shape = self.sim_state[self.parent.image_shape.name]["value"] try: - X, Y = np.meshgrid( - np.linspace(-shape[0] / 2, shape[0] / 2, shape[0]), - np.linspace(-shape[1] / 2, shape[1] / 2, shape[1]), - ) - pos = np.empty((*X.shape, 2)) - pos[:, :, 0] = X - pos[:, :, 1] = Y + params = self._active_params + shape = self.sim_state[self.parent.image_shape.name]["value"] + pos, offset, cov, amp = self._prepare_params_gauss(self._active_params, shape) - v = self._compute_multivariate_gaussian( - pos=pos, cen_off=params["cen_off"], cov=params["cov"] - ) - # divide by max(v) to ensure that maximum is params["amp"] - v *= params["amp"] / np.max(v) - - # TODO add dependency from motor position -> #transmission factor, sigmoidal form from 0 to 1 as a function of motor pos - # motor_pos = self.device_manager.devices[params["ref_motor"]].obj.read()[ - # params["ref_motor"] - # ]["value"] - - if params["noise"] == NoiseType.POISSON: - v = np.random.poisson(np.round(v), v.shape) - return v - if params["noise"] == NoiseType.UNIFORM: - multiplier = params["noise_multiplier"] - v += np.random.uniform(-multiplier, multiplier, v.shape) - return v - if self._active_params["noise"] == NoiseType.NONE: - return v + v = self._compute_multivariate_gaussian(pos=pos, cen_off=offset, cov=cov, amp=amp) + v = self._add_noise(v, params["noise"]) + return self._add_hot_pixel(v, params["hot_pixel"]) except SimulatedDataException as exc: - # TODO Propagate msg to client! - logger.warning( - f"Could not compute gaussian for {params['ref_motor']} with {exc} raised." - "Returning 0 instead." - ) - return 0 + raise SimulatedDataException( + f"Could not compute gaussian for {self.parent.name} with {exc} raised. Deactivate eiger to continue." + ) from exc