fix(sim positions): fixed support for setting a new setpoint while the motor is still moving

This commit is contained in:
wakonig_k 2024-11-19 11:13:10 +01:00
parent 719474221b
commit 1482124e24

View File

@ -90,6 +90,7 @@ class SimPositioner(Device, PositionerBase):
self._stopped = False
self.sim = self.sim_cls(parent=self, **kwargs)
self._status_list = []
super().__init__(name=name, parent=parent, kind=kind, **kwargs)
self.sim.sim_state[self.name] = self.sim.sim_state.pop(self.readback.name, None)
@ -155,46 +156,58 @@ class SimPositioner(Device, PositionerBase):
timestamp=self.sim.sim_state[self.readback.name]["timestamp"],
)
def _move_and_finish(self, start_pos, stop_pos, st):
"""Move the simulated device and finish the motion."""
target = stop_pos + self.tolerance.get() * np.random.uniform(-1, 1)
updates = np.ceil(np.abs(target - start_pos) / self.velocity.get() * self.update_frequency)
def _move_to_setpoint(self) -> None:
"""Move the simulated device to the setpoint."""
try:
for ii in np.linspace(start_pos, target, int(updates)):
ttime.sleep(1 / self.update_frequency)
self._update_state(ii)
while True:
setpoint = self.setpoint.get()
value = self.readback.get()
increment = np.sign(setpoint - value) * self.velocity.get() / self.update_frequency
next_val = value + increment + np.random.uniform(-1, 1) * self.tolerance.get()
# Check if next_val would overshoot the setpoint
if (increment > 0 and next_val > setpoint) or (
increment < 0 and next_val < setpoint
):
next_val = setpoint + np.random.uniform(-1, 1) * self.tolerance.get()
self._update_state(next_val)
if np.isclose(setpoint, next_val, atol=self.tolerance.get()):
break
if self._stopped:
raise DeviceStopError(f"{self.name} was stopped")
self._update_state(target)
st.set_finished()
ttime.sleep(1 / self.update_frequency)
self._update_state(self.readback.get())
for status in self._status_list:
status.set_finished()
# pylint: disable=broad-except
except Exception as exc:
content = traceback.format_exc()
logger.warning(
f"Error in on_complete call in device {self.name}. Error traceback: {content}"
)
st.set_exception(exc=exc)
for status in self._status_list:
status.set_exception(exc=exc)
finally:
self._set_sim_state(self.motor_is_moving.name, 0)
if not self._stopped:
self._update_state(target)
self._update_state(self.readback.get())
self._status_list = []
def move(self, value: float, **kwargs) -> DeviceStatus:
"""Change the setpoint of the simulated device, and simultaneously initiate a motion."""
self._stopped = False
self.check_value(value)
start_pos = self._get_sim_state(self.setpoint.name)
self._set_sim_state(self.motor_is_moving.name, 1)
self._set_sim_state(self.setpoint.name, value)
st = DeviceStatus(device=self)
self._status_list.append(st)
if self.delay:
self.move_thread = threading.Thread(
target=self._move_and_finish, args=(start_pos, value, st)
)
self.move_thread.start()
if self.move_thread is None or not self.move_thread.is_alive():
self.move_thread = threading.Thread(target=self._move_to_setpoint)
self.move_thread.start()
else:
self._done_moving()
self._set_sim_state(self.motor_is_moving.name, 0)
@ -254,3 +267,26 @@ class SimLinearTrajectoryPositioner(SimPositioner):
st.set_exception(exc=exc)
finally:
self._set_sim_state(self.motor_is_moving.name, 0)
def move(self, value: float, **kwargs) -> DeviceStatus:
"""Change the setpoint of the simulated device, and simultaneously initiate a motion."""
self._stopped = False
self.check_value(value)
self._set_sim_state(self.motor_is_moving.name, 1)
self._set_sim_state(self.setpoint.name, value)
st = DeviceStatus(device=self)
if self.delay:
if self.move_thread is None or not self.move_thread.is_alive():
self.move_thread = threading.Thread(
target=self._move_and_finish, args=(self.position, value, st)
)
self.move_thread.start()
else:
raise RuntimeError(f"{self.name} is already moving. Cannot start a new move.")
else:
self._done_moving()
self._set_sim_state(self.motor_is_moving.name, 0)
self._update_state(value)
st.set_finished()
return st