fix: sim_init param considered upon init of simulated devices

This commit is contained in:
2024-06-21 11:24:23 +02:00
parent 73a8f4d986
commit 73dc26b099
4 changed files with 83 additions and 34 deletions

View File

@ -79,7 +79,7 @@ class SimMonitor(Device):
**kwargs, **kwargs,
): ):
self.precision = precision self.precision = precision
self.init_sim_params = sim_init self.sim_init = sim_init
self.device_manager = device_manager self.device_manager = device_manager
self.sim = self.sim_cls(parent=self, **kwargs) self.sim = self.sim_cls(parent=self, **kwargs)
self._registered_proxies = {} self._registered_proxies = {}
@ -87,6 +87,8 @@ class SimMonitor(Device):
super().__init__(name=name, parent=parent, kind=kind, **kwargs) super().__init__(name=name, parent=parent, kind=kind, **kwargs)
self.sim.sim_state[self.name] = self.sim.sim_state.pop(self.readback.name, None) self.sim.sim_state[self.name] = self.sim.sim_state.pop(self.readback.name, None)
self.readback.name = self.name self.readback.name = self.name
if self.sim_init:
self.sim.sim_set_init(self.sim_init)
@property @property
def registered_proxies(self) -> None: def registered_proxies(self) -> None:
@ -141,7 +143,7 @@ class SimCameraSetup(CustomDetectorMixin):
self.parent.h5_writer.prepare( self.parent.h5_writer.prepare(
file_path=self.parent.filepath.get(), h5_entry="/entry/data/data" file_path=self.parent.filepath.get(), h5_entry="/entry/data/data"
) )
self.publish_file_location(done=False) self.publish_file_location(done=False, successful=False)
self.parent.stopped = False self.parent.stopped = False
def on_unstage(self) -> None: def on_unstage(self) -> None:
@ -200,13 +202,15 @@ class SimCamera(PSIDetectorBase):
def __init__( def __init__(
self, name, *, kind=None, parent=None, sim_init: dict = None, device_manager=None, **kwargs self, name, *, kind=None, parent=None, sim_init: dict = None, device_manager=None, **kwargs
): ):
self.init_sim_params = sim_init self.sim_init = sim_init
self._registered_proxies = {} self._registered_proxies = {}
self.sim = self.sim_cls(parent=self, **kwargs) self.sim = self.sim_cls(parent=self, **kwargs)
self.h5_writer = H5Writer() self.h5_writer = H5Writer()
super().__init__( super().__init__(
name=name, parent=parent, kind=kind, device_manager=device_manager, **kwargs name=name, parent=parent, kind=kind, device_manager=device_manager, **kwargs
) )
if self.sim_init:
self.sim.sim_set_init(self.sim_init)
@property @property
def registered_proxies(self) -> None: def registered_proxies(self) -> None:
@ -269,7 +273,7 @@ class SimWaveform(Device):
self, name, *, kind=None, parent=None, sim_init: dict = None, device_manager=None, **kwargs self, name, *, kind=None, parent=None, sim_init: dict = None, device_manager=None, **kwargs
): ):
self.device_manager = device_manager self.device_manager = device_manager
self.init_sim_params = sim_init self.sim_init = sim_init
self._registered_proxies = {} self._registered_proxies = {}
self.sim = self.sim_cls(parent=self, **kwargs) self.sim = self.sim_cls(parent=self, **kwargs)
@ -278,6 +282,8 @@ class SimWaveform(Device):
self._staged = False self._staged = False
self.scaninfo = None self.scaninfo = None
self._update_scaninfo() self._update_scaninfo()
if self.sim_init:
self.sim.sim_set_init(self.sim_init)
@property @property
def registered_proxies(self) -> None: def registered_proxies(self) -> None:
@ -420,7 +426,7 @@ class SimPositioner(Device, PositionerBase):
self.delay = delay self.delay = delay
self.device_manager = device_manager self.device_manager = device_manager
self.precision = precision self.precision = precision
self.init_sim_params = sim_init self.sim_init = sim_init
self._registered_proxies = {} self._registered_proxies = {}
self.update_frequency = update_frequency self.update_frequency = update_frequency
@ -436,11 +442,8 @@ class SimPositioner(Device, PositionerBase):
assert len(limits) == 2 assert len(limits) == 2
self.low_limit_travel.put(limits[0]) self.low_limit_travel.put(limits[0])
self.high_limit_travel.put(limits[1]) self.high_limit_travel.put(limits[1])
if self.sim_init:
# @property self.sim.sim_set_init(self.sim_init)
# def connected(self):
# """Return the connected state of the simulated device."""
# return self.dummy_controller.connected
@property @property
def limits(self): def limits(self):

View File

@ -260,6 +260,29 @@ class SimulatedDataBase(ABC):
table._min_table_width = width table._min_table_width = width
print(table) print(table)
def sim_set_init(self, sim_init: dict["model", "params"]) -> None:
"""Set the initial simulation parameters.
Args:
sim_init (dict["model"]): Dictionary to initiate parameters of the simulation.
"""
model = sim_init.get("model", None)
if model:
try:
self.sim_select_model(model)
except Exception as e:
logger.info(
f"Model {model} not found in available models: {self.sim_get_models()}. Exception raised {e}"
)
params = sim_init.get("params", None)
if params:
try:
self.sim_params = params
except Exception as e:
logger.info(
f"Seeting the simulation parameters failed for {params} and active model {self.self._model()}. Exception raised {e}"
)
class SimulatedPositioner(SimulatedDataBase): class SimulatedPositioner(SimulatedDataBase):
"""Simulated data class for a positioner.""" """Simulated data class for a positioner."""
@ -680,6 +703,11 @@ class SimulatedDataCamera(SimulatedDataBase):
dim = cen_off.shape[0] dim = cen_off.shape[0]
cov_det = np.linalg.det(cov) cov_det = np.linalg.det(cov)
cov_inv = np.linalg.inv(cov) cov_inv = np.linalg.inv(cov)
input = (2 * np.pi) ** dim * cov_det
if input < 0:
raise SimulatedDataException(
f"Covariance matrix leads to negative input for sqrt: {input}"
)
norm = np.sqrt((2 * np.pi) ** dim * cov_det) norm = np.sqrt((2 * np.pi) ** dim * cov_det)
# This einsum call calculates (x-mu)T.Sigma-1.(x-mu) in a vectorized # This einsum call calculates (x-mu)T.Sigma-1.(x-mu) in a vectorized
# way across all the input variables. # way across all the input variables.

View File

@ -87,20 +87,7 @@ class SlitProxy(DeviceProxy):
To update for instance the pixel_size directly, you can directly access the DeviceConfig via To update for instance the pixel_size directly, you can directly access the DeviceConfig via
`dev.eiger.get_device_config()` or update it `dev.eiger.get_device_config({'eiger' : {'pixel_size': 0.1}})` `dev.eiger.get_device_config()` or update it `dev.eiger.get_device_config({'eiger' : {'pixel_size': 0.1}})`
slit_sim: An example for the configuration of this is device is in ophyd_devices.configs.ophyd_devices_simulation.yaml
readoutPriority: baseline
deviceClass: SlitProxy
deviceConfig:
eiger:
signal_name: image
center_offset: [0, 0] # [x,y]
covariance: [[1000, 500], [200, 1000]] # [[x,x],[y,y]]
pixel_size: 0.01
ref_motors: [samx, samy]
slit_width: [1, 1]
motor_dir: [0, 1] # x:0 , y:1, z:2 coordinates
enabled: true
readOnly: false
""" """
USER_ACCESS = ["enabled", "lookup", "help"] USER_ACCESS = ["enabled", "lookup", "help"]
@ -199,16 +186,7 @@ class H5ImageReplayProxy(DeviceProxy):
If the number of requested images is larger than the number of available iamges, the images will be replayed from the beginning. If the number of requested images is larger than the number of available iamges, the images will be replayed from the beginning.
h5_image_sim: An example for the configuration of this is device is in ophyd_devices.configs.ophyd_devices_simulation.yaml
readoutPriority: baseline
deviceClass: H5ImageReplayProxy
deviceConfig:
eiger:
signal_name: image
file_source: /path/to/h5file.h5
h5_entry: /entry/data
enabled: true
readOnly: false
""" """
USER_ACCESS = ["file_source", "h5_entry"] USER_ACCESS = ["file_source", "h5_entry"]

View File

@ -94,6 +94,46 @@ def flyer(name="flyer"):
yield fly yield fly
def test_camera_with_sim_init():
"""Test to see if the sim init parameters are passed to the device"""
dm = DMMock()
sim = SimCamera(name="sim", device_manager=dm)
assert sim.sim._model.value == "gaussian"
model = "constant"
params = {
"amplitude": 300,
"noise": "uniform",
"noise_multiplier": 1,
"hot_pixel_coords": [[0, 0], [50, 50]],
"hot_pixel_types": ["fluctuating", "constant"],
"hot_pixel_values": [2.0, 2.0],
}
sim = SimCamera(name="sim", device_manager=dm, sim_init={"model": model, "params": params})
assert sim.sim._model.value == model
assert sim.sim.sim_params == params
def test_monitor_with_sim_init():
"""Test to see if the sim init parameters are passed to the device"""
dm = DMMock()
sim = SimMonitor(name="sim", device_manager=dm)
assert sim.sim._model._name == "constant"
model = "GaussianModel"
params = {
"amplitude": 500,
"center": 5,
"sigma": 4,
"noise": "uniform",
"noise_multiplier": 1,
"ref_motor": "samy",
}
sim = SimMonitor(name="sim", device_manager=dm, sim_init={"model": model, "params": params})
assert sim.sim._model._name == model.strip("Model").lower()
diff_keys = set(sim.sim.sim_params.keys()) - set(params.keys())
for k in params:
assert sim.sim.sim_params[k] == params[k]
def test_signal__init__(signal): def test_signal__init__(signal):
"""Test the BECProtocol class""" """Test the BECProtocol class"""
assert isinstance(signal, BECDeviceProtocol) assert isinstance(signal, BECDeviceProtocol)