diff --git a/ophyd_devices/sim/sim.py b/ophyd_devices/sim/sim.py index 983d295..eb9156d 100644 --- a/ophyd_devices/sim/sim.py +++ b/ophyd_devices/sim/sim.py @@ -195,7 +195,7 @@ class SimCamera(Device): save_file = Cpt(SetableSignal, name="save_file", value=False, kind=Kind.config) # image shape, only adjustable via config. - image_shape = Cpt(ReadOnlySignal, name="image_shape", value=SHAPE, kind=Kind.config) + image_shape = Cpt(SetableSignal, name="image_shape", value=SHAPE, kind=Kind.config) image = Cpt( ComputedReadOnlySignal, name="image", @@ -232,7 +232,6 @@ class SimCamera(Device): for _ in range(self.burst.get()): # Send data for each trigger self._run_subs(sub_type=self.SUB_MONITOR, value=self.image.get()) - ttime.sleep(self.exp_time.get()) if self._stopped: raise DeviceStop except DeviceStop: diff --git a/ophyd_devices/sim/sim_data.py b/ophyd_devices/sim/sim_data.py index 3786370..22dbc8f 100644 --- a/ophyd_devices/sim/sim_data.py +++ b/ophyd_devices/sim/sim_data.py @@ -50,30 +50,39 @@ class SimulatedDataBase: This methods should be implemented by the subclass. - It should set the default parameters for: - - self._params (dict used for e.g. computation of gaussian) - - self._simulation_type (SimulationType, e.g. 'constant', 'gauss'). - - self._noise (NoiseType, e.g. 'none', 'uniform', 'poisson') - - It sets the default parameters for the simulated data, - in self._params that are required for the simulation of for instance - the siumulation type gaussian. + It sets the default parameters for the simulated data in + self._params and calls self._update_init_params() """ def get_sim_params(self) -> dict: - """Return the parameters self._params of the simulation.""" + """Return the currently parameters for the active simulation type in sim_type. + + These parameters can be changed with set_sim_params. + + Returns: + dict: Parameters of the currently active simulation in sim_type. + """ return self._active_params def set_sim_params(self, params: dict) -> None: - """Set the parameters self._params of the simulation.""" + """Change the current set of parameters for the active simulation type. + + Args: + params (dict): New parameters for the active simulation type. + + Raises: + SimulatedDataException: If the new parameters can not be set or is not part of the parameters initiated. + """ for k, v in params.items(): try: - self._active_params[k] = v - except KeyError: - # TODO propagate msg to client! - logger.warning( - f"Could not set {k} to {v} in {self._active_params}.KeyError raised. Ignoring." - ) + if k == "noise": + self._active_params[k] = NoiseType(v) + else: + self._active_params[k] = v + except Exception as exc: + raise SimulatedDataException( + f"Could not set {k} to {v} in {self._active_params} with exception {exc}" + ) from exc def get_sim_type(self) -> SimulationType: """Return the simulation type of the simulation. @@ -87,11 +96,11 @@ class SimulatedDataBase: """Set the simulation type of the simulation.""" try: self._simulation_type = SimulationType(simulation_type) - except ValueError: + except ValueError as exc: raise SimulatedDataException( f"Could not set simulation type to {simulation_type}. Valid options are 'constant'" " and 'gauss'" - ) + ) from exc self._active_params = self._all_params.get(self._simulation_type, None) def _compute_sim_state(self, signal_name: str) -> None: @@ -110,19 +119,49 @@ class SimulatedDataBase: self.sim_state[signal_name]["value"] = value self.sim_state[signal_name]["timestamp"] = ttime.time() + def _update_init_params(self, sim_type_default: SimulationType) -> None: + """Update the initial parameters of the simulated data with input from deviceConfig. + + Args: + sim_type_default (SimulationType): Default simulation type to use if not specified in deviceConfig. + """ + init_params = self.parent.init_sim_params + for sim_type in self._all_params.values(): + for sim_type_config_element in sim_type: + if init_params: + if sim_type_config_element in init_params: + sim_type[sim_type_config_element] = init_params[sim_type_config_element] + # Set simulation type to default if not specified in deviceConfig + sim_type_select = ( + init_params.get("sim_type", sim_type_default) if init_params else sim_type_default + ) + self.set_sim_type(sim_type_select) + class SimulatedDataMonitor(SimulatedDataBase): """Simulated data for a monitor.""" def init_paramaters(self, **kwargs): - """Initialize the parameters for the Simulated Data + """Initialize the parameters for the simulated data - Ref_motor is the motor that is used to compute the gaussian. - Amp is the amplitude of the gaussian. - Cen is the center of the gaussian. - Sig is the sigma of the gaussian. - Noise is the type of noise to add to the signal. Be aware that poisson noise will round the value to an integer-like values. - Noise multiplier is the multiplier of the noise, only relevant for uniform noise. + This method will fill self._all_params with the default parameters for + SimulationType.CONSTANT and SimulationType.GAUSSIAN. + New simulation types can be added by adding a new key to self._all_params, + together with the required parameters for that simulation type. Please + also complement the docstring of this method with the new simulation type. + + For SimulationType.CONSTANT: + Amp is the amplitude of the constant value. + Noise is the type of noise to add to the signal. Available options are 'poisson', 'uniform' or 'none'. + Noise multiplier is the multiplier of the noise, only relevant for uniform noise. + + For SimulationType.GAUSSIAN: + ref_motor is the motor that is used as reference to compute the gaussian. + amp is the amplitude of the gaussian. + cen is the center of the gaussian. + sig is the sigma of the gaussian. + noise is the type of noise to add to the signal. Available options are 'poisson', 'uniform' or 'none'. + noise multiplier is the multiplier of the noise, only relevant for uniform noise. """ self._all_params = { SimulationType.CONSTANT: { @@ -139,23 +178,17 @@ class SimulatedDataMonitor(SimulatedDataBase): "noise_multiplier": 0.1, }, } - - if self.parent.init_sim_params: - sim_type = self.parent.init_sim_params.pop("sym_type", SimulationType.CONSTANT) - for v in self._all_params.values(): - for k in v.keys(): - if k in self.parent.init_sim_params: - v[k] = self.parent.init_sim_params[k] - else: - sim_type = SimulationType.CONSTANT - self.set_sim_type(sim_type) + # Update init parameters and set simulation type to Constant if not specified otherwise in init_sim_params + self._update_init_params(sim_type_default=SimulationType.CONSTANT) def _compute_sim_state(self, signal_name: str) -> None: """Update the simulated state of the device. + It will update the value in self.sim_state with the value computed by + the chosen simulation type. + Args: signal_name (str): Name of the signal to update. - sim_type (SimulationType, optional): Type of simulation to steer simulated data. Defaults to SimulationType.CONSTANT. """ if self.get_sim_type() == SimulationType.CONSTANT: value = self._compute_constant() @@ -165,14 +198,17 @@ class SimulatedDataMonitor(SimulatedDataBase): self.update_sim_state(signal_name, value) def _compute_constant(self) -> float: - """Compute a random value.""" + """Computes constant value and adds noise if activated.""" v = self._active_params["amp"] if self._active_params["noise"] == NoiseType.POISSON: v = np.random.poisson(np.round(v), 1)[0] + return v elif self._active_params["noise"] == NoiseType.UNIFORM: v += np.random.uniform(-1, 1) * self._active_params["noise_multiplier"] + return v elif self._active_params["noise"] == NoiseType.NONE: v = self._active_params["amp"] + return v else: # TODO Propagate msg to client! logger.warning( @@ -180,15 +216,17 @@ class SimulatedDataMonitor(SimulatedDataBase): " 'uniform' or 'none'. Returning 0." ) return 0 - return v def _compute_gaussian(self) -> float: - """Compute a gaussian value. + """Computes return value for sim_type = "gauss". - Based on the parameters in self._params, a value of a gaussian distributed - is computed with respected to the motor position of ref_motor. + The value is based on the parameters for the gaussian in + self._active_params and the position of the ref_motor + and adds noise based on the noise type. If computation fails, it returns 0. + + Returns: float """ params = self._active_params @@ -214,10 +252,31 @@ class SimulatedDataMonitor(SimulatedDataBase): class SimulatedDataCamera(SimulatedDataBase): - """Simulated data for a 2D camera.""" + """Simulated class to compute data for a 2D camera.""" def init_paramaters(self, **kwargs): - """Initialize the parameters for the simulated camera data""" + """Initialize the parameters for the simulated data + + This method will fill self._all_params with the default parameters for + SimulationType.CONSTANT and SimulationType.GAUSSIAN. + New simulation types can be added by adding a new key to self._all_params, + together with the required parameters for that simulation type. Please + also complement the docstring of this method with the new simulation type. + + For SimulationType.CONSTANT: + Amp is the amplitude of the constant value. + Noise is the type of noise to add to the signal. Available options are 'poisson', 'uniform' or 'none'. + Noise multiplier is the multiplier of the noise, only relevant for uniform noise. + + For SimulationType.GAUSSIAN: + amp is the amplitude of the gaussian. + cen_off is the pixel offset from the center of the gaussian from the center of the image. + It is passed as a numpy array. + cov is the 2D covariance matrix used to specify the shape of the gaussian. + It is a 2x2 matrix and will be passed as a numpy array. + noise is the type of noise to add to the signal. Available options are 'poisson', 'uniform' or 'none'. + noise multiplier is the multiplier of the noise, only relevant for uniform noise. + """ self._all_params = { SimulationType.CONSTANT: { "amp": 100, @@ -226,29 +285,23 @@ class SimulatedDataCamera(SimulatedDataBase): }, SimulationType.GAUSSIAN: { "amp": 100, - "cen": np.array([50, 50]), - "cov": np.array([[10, 0], [0, 10]]), + "cen_off": np.array([0, 0]), + "cov": np.array([[10, 5], [5, 10]]), "noise": NoiseType.NONE, "noise_multiplier": 0.1, }, } - - if self.parent.init_sim_params: - sim_type = self.parent.init_sim_params.pop("sym_type", SimulationType.CONSTANT) - for v in self._all_params.values(): - for k in v.keys(): - if k in self.parent.init_sim_params: - v[k] = self.parent.init_sim_params[k] - else: - sim_type = SimulationType.CONSTANT - self.set_sim_type(sim_type) + # Update init parameters and set simulation type to Gaussian if not specified otherwise in init_sim_params + self._update_init_params(sim_type_default=SimulationType.GAUSSIAN) def _compute_sim_state(self, signal_name: str) -> None: """Update the simulated state of the device. + It will update the value in self.sim_state with the value computed by + the chosen simulation type. + Args: signal_name (str): Name of the signal to update. - sim_type (SimulationType, optional): Type of simulation to steer simulated data. Defaults to SimulationType.CONSTANT. """ if self.get_sim_type() == SimulationType.CONSTANT: value = self._compute_constant() @@ -258,61 +311,76 @@ class SimulatedDataCamera(SimulatedDataBase): self.update_sim_state(signal_name, value) def _compute_constant(self) -> float: - """Compute a random value.""" + """Compute a return value for sim_type = Constant.""" + # tuple with shape shape = self.sim_state[self.parent.image_shape.name]["value"] v = self._active_params["amp"] * np.ones(shape, dtype=np.uint16) if self._active_params["noise"] == NoiseType.POISSON: v = np.random.poisson(np.round(v), v.shape) return v - elif self._active_params["noise"] == NoiseType.UNIFORM: + if self._active_params["noise"] == NoiseType.UNIFORM: multiplier = self._active_params["noise_multiplier"] v += np.random.randint(-multiplier, multiplier, v.shape) return v - elif self._active_params["noise"] == NoiseType.NONE: + if self._active_params["noise"] == NoiseType.NONE: return v - else: - # TODO Propagate msg to client! - logger.warning( - f"Unknown noise type {self._active_params['noise']}. Please choose from 'poisson'," - " 'uniform' or 'none'. Returning 0." - ) - return 0 + # TODO Propagate msg to client! + logger.warning( + f"Unknown noise type {self._active_params['noise']}. Please choose from 'poisson'," + " 'uniform' or 'none'. Returning 0." + ) + return 0 - def _compute_multivariate_gaussian(self, pos: np.ndarray, cen: np.ndarray, cov: np.ndarray): - """Return the multivariate Gaussian distribution on array pos.""" + def _compute_multivariate_gaussian( + self, pos: np.ndarray, cen_off: np.ndarray, cov: np.ndarray + ) -> np.ndarray: + """Computes and returns the multivariate Gaussian distribution. - dim = cen.shape[0] + Args: + pos (np.ndarray): Position of the gaussian. + cen_off (np.ndarray): Offset from cener of image for the gaussian. + cov (np.ndarray): Covariance matrix of the gaussian. + + Returns: + np.ndarray: Multivariate Gaussian distribution. + """ + + dim = cen_off.shape[0] cov_det = np.linalg.det(cov) cov_inv = np.linalg.inv(cov) N = np.sqrt((2 * np.pi) ** dim * cov_det) # This einsum call calculates (x-mu)T.Sigma-1.(x-mu) in a vectorized # way across all the input variables. - fac = np.einsum("...k,kl,...l->...", pos - cen, cov_inv, pos - cen) + fac = np.einsum("...k,kl,...l->...", pos - cen_off, cov_inv, pos - cen_off) return np.exp(-fac / 2) / N def _compute_gaussian(self) -> float: - """Compute a gaussian value. + """Computes return value for sim_type = "gauss". - Based on the parameters in self._params, a value of a gaussian distributed - is computed with respected to the motor position of ref_motor. + The value is based on the parameters for the gaussian in + self._active_params and adds noise based on the noise type. If computation fails, it returns 0. + + Returns: float """ params = self._active_params shape = self.sim_state[self.parent.image_shape.name]["value"] try: X, Y = np.meshgrid( - np.linspace(0, shape[0] - 1, shape[0]), - np.linspace(0, shape[1] - 1, shape[1]), + np.linspace(-shape[0] / 2, shape[0] / 2, shape[0]), + np.linspace(-shape[1] / 2, shape[1] / 2, shape[1]), ) pos = np.empty((*X.shape, 2)) pos[:, :, 0] = X pos[:, :, 1] = Y - v = self._compute_multivariate_gaussian(pos=pos, cen=params["cen"], cov=params["cov"]) + v = self._compute_multivariate_gaussian( + pos=pos, cen_off=params["cen_off"], cov=params["cov"] + ) # divide by max(v) to ensure that maximum is params["amp"] v *= params["amp"] / np.max(v) @@ -324,10 +392,12 @@ class SimulatedDataCamera(SimulatedDataBase): if params["noise"] == NoiseType.POISSON: v = np.random.poisson(np.round(v), v.shape) return v - elif params["noise"] == NoiseType.UNIFORM: + if params["noise"] == NoiseType.UNIFORM: multiplier = params["noise_multiplier"] v += np.random.uniform(-multiplier, multiplier, v.shape) return v + if self._active_params["noise"] == NoiseType.NONE: + return v except SimulatedDataException as exc: # TODO Propagate msg to client! logger.warning(