From a0b2f273902eeadedab1ef747accacbc68b27ef0 Mon Sep 17 00:00:00 2001 From: appel_c Date: Fri, 8 May 2026 16:46:21 +0200 Subject: [PATCH] fix(sim-positioner): Only run readback subs if values changed by more than the tolerance --- ophyd_devices/sim/sim_positioner.py | 49 +++++++++++++++++++---------- 1 file changed, 32 insertions(+), 17 deletions(-) diff --git a/ophyd_devices/sim/sim_positioner.py b/ophyd_devices/sim/sim_positioner.py index 92b007e..b9509d7 100644 --- a/ophyd_devices/sim/sim_positioner.py +++ b/ophyd_devices/sim/sim_positioner.py @@ -106,6 +106,7 @@ class SimPositioner(Device, PositionerBase): self.sim = self.sim_cls(parent=self, **kwargs) self._status_list = [] + self._active_callbacks: set[str] = set() super().__init__(name=name, parent=parent, kind=kind, **kwargs) self.sim.sim_state[self.name] = self.sim.sim_state.pop(self.readback.name, None) @@ -116,6 +117,18 @@ class SimPositioner(Device, PositionerBase): self.high_limit_travel.put(limits[1]) if self.sim_init: self.sim.set_init(self.sim_init) + self._last_emitted_readback_value = self.readback.get() + + def _run_subs(self, *args, sub_type, **kwargs): + """Prevent concurrent callbacks for the same subscription type.""" + if sub_type in self._active_callbacks: + return + try: + self._active_callbacks.add(sub_type) + super()._run_subs(*args, sub_type=sub_type, **kwargs) + finally: + if sub_type in self._active_callbacks: + self._active_callbacks.remove(sub_type) self.settle_time.subscribe(self._on_settle_time_change, run=False) self.timeout.subscribe(self._on_timeout_change, run=False) @@ -171,23 +184,25 @@ class SimPositioner(Device, PositionerBase): """Return the simulated state of the device.""" return self.sim.sim_state[signal_name]["value"] - def _update_state(self, val): + def _update_readback(self, val): """Update the state of the simulated device.""" old_readback = self._get_sim_state(self.readback.name) self._set_sim_state(self.readback.name, val) + if abs(val - self._last_emitted_readback_value) < self.tolerance.get(): + return # Run subscriptions only if the change in readback is larger than the tolerance # Run subscription on "readback" self._run_subs( sub_type=self.SUB_READBACK, old_value=old_readback, - value=self.sim.sim_state[self.readback.name]["value"], + value=self._get_sim_state(self.readback.name), timestamp=self.sim.sim_state[self.readback.name]["timestamp"], ) # Run subscription on "value" self.readback._run_subs( sub_type=self.readback.SUB_VALUE, old_value=old_readback, - value=self.sim.sim_state[self.readback.name]["value"], + value=self._get_sim_state(self.readback.name), timestamp=self.sim.sim_state[self.readback.name]["timestamp"], ) @@ -195,8 +210,8 @@ class SimPositioner(Device, PositionerBase): """Move the simulated device to the setpoint.""" try: while True: - setpoint = self.setpoint.get() - value = self.readback.get() + setpoint = self._get_sim_state(self.setpoint.name) + value = self._get_sim_state(self.readback.name) increment = np.sign(setpoint - value) * self.velocity.get() / self.update_frequency next_val = value + increment + np.random.uniform(-1, 1) * self.tolerance.get() @@ -207,13 +222,13 @@ class SimPositioner(Device, PositionerBase): ): next_val = setpoint + np.random.uniform(-1, 1) * self.tolerance.get() - self._update_state(next_val) + self._update_readback(next_val) if np.isclose(setpoint, next_val, atol=self.tolerance.get()): break if self._stopped: raise DeviceStopError(f"{self.name} was stopped") ttime.sleep(1 / self.update_frequency) - self._update_state(self.readback.get()) + self._update_readback(self.readback.get()) # pylint: disable=broad-except except Exception as exc: content = traceback.format_exc() @@ -228,7 +243,7 @@ class SimPositioner(Device, PositionerBase): with self._lock: self.motor_is_moving.put(0) if not self._stopped: - self._update_state(self.readback.get()) + self._update_readback(self.readback.get()) for status in self._status_list: if not status.done: status.set_finished() @@ -238,11 +253,11 @@ class SimPositioner(Device, PositionerBase): """Change the setpoint of the simulated device, and simultaneously initiate a motion.""" self._stopped = False self.check_value(value) - self.motor_is_moving.put(1) self.setpoint.put(value) st = DeviceStatus(device=self) with self._lock: + self.motor_is_moving.put(1) self._status_list.append(st) if self.delay: if self.move_thread is None or not self.move_thread.is_alive(): @@ -251,7 +266,7 @@ class SimPositioner(Device, PositionerBase): else: self._done_moving() self.motor_is_moving.put(0) - self._update_state(value) + self._update_readback(value) st.set_finished() return st @@ -289,13 +304,13 @@ class SimLinearTrajectoryPositioner(SimPositioner): try: while not traj.ended: ttime.sleep(1 / self.update_frequency) - self._update_state(traj.position()) + self._update_readback(traj.position()) if self._stopped: # simulate deceleration traj = stop_trajectory(traj) while not traj.ended: ttime.sleep(1 / self.update_frequency) - self._update_state(traj.position()) + self._update_readback(traj.position()) raise DeviceStopError(f"{self.name} was stopped") st.set_finished() # pylint: disable=broad-except @@ -306,14 +321,14 @@ class SimLinearTrajectoryPositioner(SimPositioner): ) st.set_exception(exc=exc) finally: - self._set_sim_state(self.motor_is_moving.name, 0) + self.motor_is_moving.put(0) def move(self, value: float, **kwargs) -> DeviceStatus: """Change the setpoint of the simulated device, and simultaneously initiate a motion.""" self._stopped = False self.check_value(value) - self._set_sim_state(self.motor_is_moving.name, 1) - self._set_sim_state(self.setpoint.name, value) + self.motor_is_moving.put(1) + self._update_readback(value) st = DeviceStatus(device=self) if self.delay: @@ -326,7 +341,7 @@ class SimLinearTrajectoryPositioner(SimPositioner): raise RuntimeError(f"{self.name} is already moving. Cannot start a new move.") else: self._done_moving() - self._set_sim_state(self.motor_is_moving.name, 0) - self._update_state(value) + self.motor_is_moving.put(0) + self._update_readback(value) st.set_finished() return st