test: fix flaky test for positioner is moving

This commit is contained in:
appel_c 2025-02-10 10:48:53 +01:00
parent d7a251dcb1
commit 1cc8a67ded
2 changed files with 34 additions and 16 deletions

View File

@ -190,7 +190,7 @@ class SimPositioner(Device, PositionerBase):
for status in self._status_list:
status.set_exception(exc=exc)
finally:
self._set_sim_state(self.motor_is_moving.name, 0)
self.motor_is_moving.put(0)
if not self._stopped:
self._update_state(self.readback.get())
self._status_list = []
@ -199,8 +199,8 @@ class SimPositioner(Device, PositionerBase):
"""Change the setpoint of the simulated device, and simultaneously initiate a motion."""
self._stopped = False
self.check_value(value)
self._set_sim_state(self.motor_is_moving.name, 1)
self._set_sim_state(self.setpoint.name, value)
self.motor_is_moving.put(1)
self.setpoint.put(value)
st = DeviceStatus(device=self)
self._status_list.append(st)
@ -210,7 +210,7 @@ class SimPositioner(Device, PositionerBase):
self.move_thread.start()
else:
self._done_moving()
self._set_sim_state(self.motor_is_moving.name, 0)
self.motor_is_moving.put(0)
self._update_state(value)
st.set_finished()
return st

View File

@ -268,22 +268,40 @@ def test_positioner_move(positioner):
)
@pytest.mark.timeout(30)
def test_positioner_motor_is_moving_signal(positioner):
"""Test that motor is moving is 0 and 1 while (not) moving"""
done = threading.Event()
def cb_motor_done():
done.set()
recorded_data = []
cid = None
positioner.move(0).wait()
positioner.velocity.set(3)
assert positioner.motor_is_moving.get() == 0
status = positioner.move(5)
status.add_callback(cb_motor_done)
assert positioner.motor_is_moving.get() == 1
status.wait() # Wait will not block until callbacks are executed
done.wait(5) # Wait for the additional callback to be executed
assert positioner.motor_is_moving.get() == 0
init_velocity = positioner.velocity.get()
def motor_is_moving_cb(value: any, obj, **kwargs):
data = obj.read()[f"{obj.name}_motor_is_moving"]["value"]
recorded_data.append(data)
try:
cid = positioner.subscribe(motor_is_moving_cb, event_type="readback", run=False)
status = positioner.move(-20)
status.wait()
positioner.velocity.set(10)
status = positioner.move(20)
status.wait()
# Check that motor was moving and motor_is_moving switched to "1"
assert any(recorded_data)
# Check that motor is not moving and motor_is_moving switched back to "0"
assert recorded_data[-1] == 0
finally:
# Restore initial velocity, remove subscription
positioner.velocity.set(init_velocity)
if cid is not None:
positioner.unsubscribe(cid)
@pytest.mark.parametrize(