refactor: remove sleep from trigger, and adressed MR comments in sim_data

This commit is contained in:
2024-01-31 18:46:34 +01:00
parent 6cac04aa52
commit 10e9acff8a
2 changed files with 148 additions and 79 deletions

View File

@ -195,7 +195,7 @@ class SimCamera(Device):
save_file = Cpt(SetableSignal, name="save_file", value=False, kind=Kind.config)
# image shape, only adjustable via config.
image_shape = Cpt(ReadOnlySignal, name="image_shape", value=SHAPE, kind=Kind.config)
image_shape = Cpt(SetableSignal, name="image_shape", value=SHAPE, kind=Kind.config)
image = Cpt(
ComputedReadOnlySignal,
name="image",
@ -232,7 +232,6 @@ class SimCamera(Device):
for _ in range(self.burst.get()):
# Send data for each trigger
self._run_subs(sub_type=self.SUB_MONITOR, value=self.image.get())
ttime.sleep(self.exp_time.get())
if self._stopped:
raise DeviceStop
except DeviceStop:

View File

@ -50,30 +50,39 @@ class SimulatedDataBase:
This methods should be implemented by the subclass.
It should set the default parameters for:
- self._params (dict used for e.g. computation of gaussian)
- self._simulation_type (SimulationType, e.g. 'constant', 'gauss').
- self._noise (NoiseType, e.g. 'none', 'uniform', 'poisson')
It sets the default parameters for the simulated data,
in self._params that are required for the simulation of for instance
the siumulation type gaussian.
It sets the default parameters for the simulated data in
self._params and calls self._update_init_params()
"""
def get_sim_params(self) -> dict:
"""Return the parameters self._params of the simulation."""
"""Return the currently parameters for the active simulation type in sim_type.
These parameters can be changed with set_sim_params.
Returns:
dict: Parameters of the currently active simulation in sim_type.
"""
return self._active_params
def set_sim_params(self, params: dict) -> None:
"""Set the parameters self._params of the simulation."""
"""Change the current set of parameters for the active simulation type.
Args:
params (dict): New parameters for the active simulation type.
Raises:
SimulatedDataException: If the new parameters can not be set or is not part of the parameters initiated.
"""
for k, v in params.items():
try:
self._active_params[k] = v
except KeyError:
# TODO propagate msg to client!
logger.warning(
f"Could not set {k} to {v} in {self._active_params}.KeyError raised. Ignoring."
)
if k == "noise":
self._active_params[k] = NoiseType(v)
else:
self._active_params[k] = v
except Exception as exc:
raise SimulatedDataException(
f"Could not set {k} to {v} in {self._active_params} with exception {exc}"
) from exc
def get_sim_type(self) -> SimulationType:
"""Return the simulation type of the simulation.
@ -87,11 +96,11 @@ class SimulatedDataBase:
"""Set the simulation type of the simulation."""
try:
self._simulation_type = SimulationType(simulation_type)
except ValueError:
except ValueError as exc:
raise SimulatedDataException(
f"Could not set simulation type to {simulation_type}. Valid options are 'constant'"
" and 'gauss'"
)
) from exc
self._active_params = self._all_params.get(self._simulation_type, None)
def _compute_sim_state(self, signal_name: str) -> None:
@ -110,19 +119,49 @@ class SimulatedDataBase:
self.sim_state[signal_name]["value"] = value
self.sim_state[signal_name]["timestamp"] = ttime.time()
def _update_init_params(self, sim_type_default: SimulationType) -> None:
"""Update the initial parameters of the simulated data with input from deviceConfig.
Args:
sim_type_default (SimulationType): Default simulation type to use if not specified in deviceConfig.
"""
init_params = self.parent.init_sim_params
for sim_type in self._all_params.values():
for sim_type_config_element in sim_type:
if init_params:
if sim_type_config_element in init_params:
sim_type[sim_type_config_element] = init_params[sim_type_config_element]
# Set simulation type to default if not specified in deviceConfig
sim_type_select = (
init_params.get("sim_type", sim_type_default) if init_params else sim_type_default
)
self.set_sim_type(sim_type_select)
class SimulatedDataMonitor(SimulatedDataBase):
"""Simulated data for a monitor."""
def init_paramaters(self, **kwargs):
"""Initialize the parameters for the Simulated Data
"""Initialize the parameters for the simulated data
Ref_motor is the motor that is used to compute the gaussian.
Amp is the amplitude of the gaussian.
Cen is the center of the gaussian.
Sig is the sigma of the gaussian.
Noise is the type of noise to add to the signal. Be aware that poisson noise will round the value to an integer-like values.
Noise multiplier is the multiplier of the noise, only relevant for uniform noise.
This method will fill self._all_params with the default parameters for
SimulationType.CONSTANT and SimulationType.GAUSSIAN.
New simulation types can be added by adding a new key to self._all_params,
together with the required parameters for that simulation type. Please
also complement the docstring of this method with the new simulation type.
For SimulationType.CONSTANT:
Amp is the amplitude of the constant value.
Noise is the type of noise to add to the signal. Available options are 'poisson', 'uniform' or 'none'.
Noise multiplier is the multiplier of the noise, only relevant for uniform noise.
For SimulationType.GAUSSIAN:
ref_motor is the motor that is used as reference to compute the gaussian.
amp is the amplitude of the gaussian.
cen is the center of the gaussian.
sig is the sigma of the gaussian.
noise is the type of noise to add to the signal. Available options are 'poisson', 'uniform' or 'none'.
noise multiplier is the multiplier of the noise, only relevant for uniform noise.
"""
self._all_params = {
SimulationType.CONSTANT: {
@ -139,23 +178,17 @@ class SimulatedDataMonitor(SimulatedDataBase):
"noise_multiplier": 0.1,
},
}
if self.parent.init_sim_params:
sim_type = self.parent.init_sim_params.pop("sym_type", SimulationType.CONSTANT)
for v in self._all_params.values():
for k in v.keys():
if k in self.parent.init_sim_params:
v[k] = self.parent.init_sim_params[k]
else:
sim_type = SimulationType.CONSTANT
self.set_sim_type(sim_type)
# Update init parameters and set simulation type to Constant if not specified otherwise in init_sim_params
self._update_init_params(sim_type_default=SimulationType.CONSTANT)
def _compute_sim_state(self, signal_name: str) -> None:
"""Update the simulated state of the device.
It will update the value in self.sim_state with the value computed by
the chosen simulation type.
Args:
signal_name (str): Name of the signal to update.
sim_type (SimulationType, optional): Type of simulation to steer simulated data. Defaults to SimulationType.CONSTANT.
"""
if self.get_sim_type() == SimulationType.CONSTANT:
value = self._compute_constant()
@ -165,14 +198,17 @@ class SimulatedDataMonitor(SimulatedDataBase):
self.update_sim_state(signal_name, value)
def _compute_constant(self) -> float:
"""Compute a random value."""
"""Computes constant value and adds noise if activated."""
v = self._active_params["amp"]
if self._active_params["noise"] == NoiseType.POISSON:
v = np.random.poisson(np.round(v), 1)[0]
return v
elif self._active_params["noise"] == NoiseType.UNIFORM:
v += np.random.uniform(-1, 1) * self._active_params["noise_multiplier"]
return v
elif self._active_params["noise"] == NoiseType.NONE:
v = self._active_params["amp"]
return v
else:
# TODO Propagate msg to client!
logger.warning(
@ -180,15 +216,17 @@ class SimulatedDataMonitor(SimulatedDataBase):
" 'uniform' or 'none'. Returning 0."
)
return 0
return v
def _compute_gaussian(self) -> float:
"""Compute a gaussian value.
"""Computes return value for sim_type = "gauss".
Based on the parameters in self._params, a value of a gaussian distributed
is computed with respected to the motor position of ref_motor.
The value is based on the parameters for the gaussian in
self._active_params and the position of the ref_motor
and adds noise based on the noise type.
If computation fails, it returns 0.
Returns: float
"""
params = self._active_params
@ -214,10 +252,31 @@ class SimulatedDataMonitor(SimulatedDataBase):
class SimulatedDataCamera(SimulatedDataBase):
"""Simulated data for a 2D camera."""
"""Simulated class to compute data for a 2D camera."""
def init_paramaters(self, **kwargs):
"""Initialize the parameters for the simulated camera data"""
"""Initialize the parameters for the simulated data
This method will fill self._all_params with the default parameters for
SimulationType.CONSTANT and SimulationType.GAUSSIAN.
New simulation types can be added by adding a new key to self._all_params,
together with the required parameters for that simulation type. Please
also complement the docstring of this method with the new simulation type.
For SimulationType.CONSTANT:
Amp is the amplitude of the constant value.
Noise is the type of noise to add to the signal. Available options are 'poisson', 'uniform' or 'none'.
Noise multiplier is the multiplier of the noise, only relevant for uniform noise.
For SimulationType.GAUSSIAN:
amp is the amplitude of the gaussian.
cen_off is the pixel offset from the center of the gaussian from the center of the image.
It is passed as a numpy array.
cov is the 2D covariance matrix used to specify the shape of the gaussian.
It is a 2x2 matrix and will be passed as a numpy array.
noise is the type of noise to add to the signal. Available options are 'poisson', 'uniform' or 'none'.
noise multiplier is the multiplier of the noise, only relevant for uniform noise.
"""
self._all_params = {
SimulationType.CONSTANT: {
"amp": 100,
@ -226,29 +285,23 @@ class SimulatedDataCamera(SimulatedDataBase):
},
SimulationType.GAUSSIAN: {
"amp": 100,
"cen": np.array([50, 50]),
"cov": np.array([[10, 0], [0, 10]]),
"cen_off": np.array([0, 0]),
"cov": np.array([[10, 5], [5, 10]]),
"noise": NoiseType.NONE,
"noise_multiplier": 0.1,
},
}
if self.parent.init_sim_params:
sim_type = self.parent.init_sim_params.pop("sym_type", SimulationType.CONSTANT)
for v in self._all_params.values():
for k in v.keys():
if k in self.parent.init_sim_params:
v[k] = self.parent.init_sim_params[k]
else:
sim_type = SimulationType.CONSTANT
self.set_sim_type(sim_type)
# Update init parameters and set simulation type to Gaussian if not specified otherwise in init_sim_params
self._update_init_params(sim_type_default=SimulationType.GAUSSIAN)
def _compute_sim_state(self, signal_name: str) -> None:
"""Update the simulated state of the device.
It will update the value in self.sim_state with the value computed by
the chosen simulation type.
Args:
signal_name (str): Name of the signal to update.
sim_type (SimulationType, optional): Type of simulation to steer simulated data. Defaults to SimulationType.CONSTANT.
"""
if self.get_sim_type() == SimulationType.CONSTANT:
value = self._compute_constant()
@ -258,61 +311,76 @@ class SimulatedDataCamera(SimulatedDataBase):
self.update_sim_state(signal_name, value)
def _compute_constant(self) -> float:
"""Compute a random value."""
"""Compute a return value for sim_type = Constant."""
# tuple with shape
shape = self.sim_state[self.parent.image_shape.name]["value"]
v = self._active_params["amp"] * np.ones(shape, dtype=np.uint16)
if self._active_params["noise"] == NoiseType.POISSON:
v = np.random.poisson(np.round(v), v.shape)
return v
elif self._active_params["noise"] == NoiseType.UNIFORM:
if self._active_params["noise"] == NoiseType.UNIFORM:
multiplier = self._active_params["noise_multiplier"]
v += np.random.randint(-multiplier, multiplier, v.shape)
return v
elif self._active_params["noise"] == NoiseType.NONE:
if self._active_params["noise"] == NoiseType.NONE:
return v
else:
# TODO Propagate msg to client!
logger.warning(
f"Unknown noise type {self._active_params['noise']}. Please choose from 'poisson',"
" 'uniform' or 'none'. Returning 0."
)
return 0
# TODO Propagate msg to client!
logger.warning(
f"Unknown noise type {self._active_params['noise']}. Please choose from 'poisson',"
" 'uniform' or 'none'. Returning 0."
)
return 0
def _compute_multivariate_gaussian(self, pos: np.ndarray, cen: np.ndarray, cov: np.ndarray):
"""Return the multivariate Gaussian distribution on array pos."""
def _compute_multivariate_gaussian(
self, pos: np.ndarray, cen_off: np.ndarray, cov: np.ndarray
) -> np.ndarray:
"""Computes and returns the multivariate Gaussian distribution.
dim = cen.shape[0]
Args:
pos (np.ndarray): Position of the gaussian.
cen_off (np.ndarray): Offset from cener of image for the gaussian.
cov (np.ndarray): Covariance matrix of the gaussian.
Returns:
np.ndarray: Multivariate Gaussian distribution.
"""
dim = cen_off.shape[0]
cov_det = np.linalg.det(cov)
cov_inv = np.linalg.inv(cov)
N = np.sqrt((2 * np.pi) ** dim * cov_det)
# This einsum call calculates (x-mu)T.Sigma-1.(x-mu) in a vectorized
# way across all the input variables.
fac = np.einsum("...k,kl,...l->...", pos - cen, cov_inv, pos - cen)
fac = np.einsum("...k,kl,...l->...", pos - cen_off, cov_inv, pos - cen_off)
return np.exp(-fac / 2) / N
def _compute_gaussian(self) -> float:
"""Compute a gaussian value.
"""Computes return value for sim_type = "gauss".
Based on the parameters in self._params, a value of a gaussian distributed
is computed with respected to the motor position of ref_motor.
The value is based on the parameters for the gaussian in
self._active_params and adds noise based on the noise type.
If computation fails, it returns 0.
Returns: float
"""
params = self._active_params
shape = self.sim_state[self.parent.image_shape.name]["value"]
try:
X, Y = np.meshgrid(
np.linspace(0, shape[0] - 1, shape[0]),
np.linspace(0, shape[1] - 1, shape[1]),
np.linspace(-shape[0] / 2, shape[0] / 2, shape[0]),
np.linspace(-shape[1] / 2, shape[1] / 2, shape[1]),
)
pos = np.empty((*X.shape, 2))
pos[:, :, 0] = X
pos[:, :, 1] = Y
v = self._compute_multivariate_gaussian(pos=pos, cen=params["cen"], cov=params["cov"])
v = self._compute_multivariate_gaussian(
pos=pos, cen_off=params["cen_off"], cov=params["cov"]
)
# divide by max(v) to ensure that maximum is params["amp"]
v *= params["amp"] / np.max(v)
@ -324,10 +392,12 @@ class SimulatedDataCamera(SimulatedDataBase):
if params["noise"] == NoiseType.POISSON:
v = np.random.poisson(np.round(v), v.shape)
return v
elif params["noise"] == NoiseType.UNIFORM:
if params["noise"] == NoiseType.UNIFORM:
multiplier = params["noise_multiplier"]
v += np.random.uniform(-multiplier, multiplier, v.shape)
return v
if self._active_params["noise"] == NoiseType.NONE:
return v
except SimulatedDataException as exc:
# TODO Propagate msg to client!
logger.warning(