fix: make SimulatedDataMonitor robust to inf/nan

This commit is contained in:
2025-07-22 18:52:20 +02:00
committed by David Perl
parent 6ff034d191
commit d2659bf0b1
2 changed files with 12 additions and 2 deletions

View File

@@ -6,6 +6,7 @@ import time as ttime
from abc import ABC, abstractmethod from abc import ABC, abstractmethod
from collections import defaultdict from collections import defaultdict
from copy import deepcopy from copy import deepcopy
from math import copysign, isinf, isnan
import numpy as np import numpy as np
from bec_lib import bec_logger from bec_lib import bec_logger
@@ -72,6 +73,14 @@ DEFAULT_PARAMS_HOT_PIXEL = {
} }
def _safeint(val: float) -> int:
if isnan(val):
return 0
if isinf(val):
return int(copysign(2_147_483_647, val))
return int(val)
class SimulatedDataBase(ABC): class SimulatedDataBase(ABC):
"""Abstract base class for simulated data. """Abstract base class for simulated data.
@@ -429,7 +438,7 @@ class SimulatedDataMonitor(SimulatedDataBase):
else: else:
motor_pos = 0 motor_pos = 0
method = self._model method = self._model
value = int(method.eval(params=self._model_params, x=motor_pos)) value = _safeint(method.eval(params=self._model_params, x=motor_pos))
return self._add_noise(value, self.params["noise"], self.params["noise_multiplier"]) return self._add_noise(value, self.params["noise"], self.params["noise_multiplier"])
def _add_noise(self, v: int, noise: NoiseType, noise_multiplier: float) -> int: def _add_noise(self, v: int, noise: NoiseType, noise_multiplier: float) -> int:

View File

@@ -24,6 +24,7 @@ from ophyd_devices.interfaces.protocols.bec_protocols import (
BECSignalProtocol, BECSignalProtocol,
) )
from ophyd_devices.sim.sim_camera import SimCamera from ophyd_devices.sim.sim_camera import SimCamera
from ophyd_devices.sim.sim_data import _safeint
from ophyd_devices.sim.sim_flyer import SimFlyer from ophyd_devices.sim.sim_flyer import SimFlyer
from ophyd_devices.sim.sim_frameworks.h5_image_replay_proxy import H5ImageReplayProxy from ophyd_devices.sim.sim_frameworks.h5_image_replay_proxy import H5ImageReplayProxy
from ophyd_devices.sim.sim_frameworks.slit_proxy import SlitProxy from ophyd_devices.sim.sim_frameworks.slit_proxy import SlitProxy
@@ -233,7 +234,7 @@ def test_monitor_readback(monitor, center):
elif "center" in monitor.sim.params: elif "center" in monitor.sim.params:
monitor.sim.params["center"] = center monitor.sim.params["center"] = center
assert isinstance(monitor.read()[monitor.name]["value"], monitor.BIT_DEPTH) assert isinstance(monitor.read()[monitor.name]["value"], monitor.BIT_DEPTH)
expected_value = monitor.sim._model.eval(monitor.sim._model_params, x=motor_pos) expected_value = _safeint(monitor.sim._model.eval(monitor.sim._model_params, x=motor_pos))
print(expected_value, monitor.read()[monitor.name]["value"]) print(expected_value, monitor.read()[monitor.name]["value"])
tolerance = ( tolerance = (
monitor.sim.params["noise_multipler"] + 1 monitor.sim.params["noise_multipler"] + 1