From 903755325230b2c93eba219dd0e4d2aadd05d16f Mon Sep 17 00:00:00 2001 From: Mathias Guijarro Date: Fri, 28 Jun 2024 15:22:28 +0200 Subject: [PATCH] refactor: make it easier to subclass SimPositioner --- ophyd_devices/sim/sim_positioner.py | 102 ++++++++++++++-------------- 1 file changed, 51 insertions(+), 51 deletions(-) diff --git a/ophyd_devices/sim/sim_positioner.py b/ophyd_devices/sim/sim_positioner.py index 9eb654d..15798ab 100644 --- a/ophyd_devices/sim/sim_positioner.py +++ b/ophyd_devices/sim/sim_positioner.py @@ -76,6 +76,7 @@ class SimPositioner(Device, PositionerBase): sim_init: dict = None, **kwargs, ): + self.move_thread = None self.delay = delay self.device_manager = device_manager self.precision = precision @@ -139,65 +140,62 @@ class SimPositioner(Device, PositionerBase): """Return the simulated state of the device.""" return self.sim.sim_state[signal_name]["value"] + def _update_state(self, val): + """Update the state of the simulated device.""" + if self._stopped: + raise DeviceStop + old_readback = self._get_sim_state(self.readback.name) + self._set_sim_state(self.readback.name, val) + + # Run subscription on "readback" + self._run_subs( + sub_type=self.SUB_READBACK, + old_value=old_readback, + value=self.sim.sim_state[self.readback.name]["value"], + timestamp=self.sim.sim_state[self.readback.name]["timestamp"], + ) + + def _move_and_finish(self, start_pos, stop_pos, st): + """Move the simulated device and finish the motion.""" + success = True + + try: + target = stop_pos + self.tolerance.get() * np.random.uniform(-1, 1) + + updates = np.ceil( + np.abs(target - start_pos) / self.velocity.get() * self.update_frequency + ) + + for ii in np.linspace(start_pos, target, int(updates)): + ttime.sleep(1 / self.update_frequency) + self._update_state(ii) + if self._stopped: + break + else: + self._update_state(target) + except DeviceStop: + success = False + finally: + self._done_moving(success=success) + self._set_sim_state(self.motor_is_moving.name, 0) + st.set_finished() + def move(self, value: float, **kwargs) -> DeviceStatus: - """Change the setpoint of the simulated device, and simultaneously initiated a motion.""" + """Change the setpoint of the simulated device, and simultaneously initiate a motion.""" self._stopped = False self.check_value(value) - old_setpoint = self._get_sim_state(self.setpoint.name) + start_pos = self._get_sim_state(self.setpoint.name) self._set_sim_state(self.motor_is_moving.name, 1) self._set_sim_state(self.setpoint.name, value) - def update_state(val): - """Update the state of the simulated device.""" - if self._stopped: - raise DeviceStop - old_readback = self._get_sim_state(self.readback.name) - self._set_sim_state(self.readback.name, val) - - # Run subscription on "readback" - self._run_subs( - sub_type=self.SUB_READBACK, - old_value=old_readback, - value=self.sim.sim_state[self.readback.name]["value"], - timestamp=self.sim.sim_state[self.readback.name]["timestamp"], - ) - st = DeviceStatus(device=self) if self.delay: - - def move_and_finish(): - """Move the simulated device and finish the motion.""" - success = True - try: - move_val = self._get_sim_state( - self.setpoint.name - ) + self.tolerance.get() * np.random.uniform(-1, 1) - - updates = np.ceil( - np.abs(old_setpoint - move_val) - / self.velocity.get() - * self.update_frequency - ) - - for ii in np.linspace(old_setpoint, move_val, int(updates)): - ttime.sleep(1 / self.update_frequency) - update_state(ii) - - self._set_sim_state(self.motor_is_moving.name, 0) - update_state(move_val) - - except DeviceStop: - success = False - finally: - self._stopped = False - self._done_moving(success=success) - self._set_sim_state(self.motor_is_moving.name, 0) - st.set_finished() - - threading.Thread(target=move_and_finish, daemon=True).start() - + self.move_thread = threading.Thread( + target=self._move_and_finish, args=(start_pos, value, st) + ) + self.move_thread.start() else: - update_state(value) + self._update_state(value) self._done_moving() self._set_sim_state(self.motor_is_moving.name, 0) st.set_finished() @@ -205,8 +203,10 @@ class SimPositioner(Device, PositionerBase): def stop(self, *, success=False): """Stop the motion of the simulated device.""" - super().stop(success=success) self._stopped = True + if self.move_thread: + self.move_thread.join() + super().stop(success=success) @property def position(self) -> float: