diff --git a/ophyd_devices/sim/sim_positioner.py b/ophyd_devices/sim/sim_positioner.py index f06824a..8939703 100644 --- a/ophyd_devices/sim/sim_positioner.py +++ b/ophyd_devices/sim/sim_positioner.py @@ -190,7 +190,7 @@ class SimPositioner(Device, PositionerBase): for status in self._status_list: status.set_exception(exc=exc) finally: - self._set_sim_state(self.motor_is_moving.name, 0) + self.motor_is_moving.put(0) if not self._stopped: self._update_state(self.readback.get()) self._status_list = [] @@ -199,8 +199,8 @@ class SimPositioner(Device, PositionerBase): """Change the setpoint of the simulated device, and simultaneously initiate a motion.""" self._stopped = False self.check_value(value) - self._set_sim_state(self.motor_is_moving.name, 1) - self._set_sim_state(self.setpoint.name, value) + self.motor_is_moving.put(1) + self.setpoint.put(value) st = DeviceStatus(device=self) self._status_list.append(st) @@ -210,7 +210,7 @@ class SimPositioner(Device, PositionerBase): self.move_thread.start() else: self._done_moving() - self._set_sim_state(self.motor_is_moving.name, 0) + self.motor_is_moving.put(0) self._update_state(value) st.set_finished() return st diff --git a/tests/test_simulation.py b/tests/test_simulation.py index e2f40d6..b8d84e4 100644 --- a/tests/test_simulation.py +++ b/tests/test_simulation.py @@ -268,22 +268,40 @@ def test_positioner_move(positioner): ) +@pytest.mark.timeout(30) def test_positioner_motor_is_moving_signal(positioner): """Test that motor is moving is 0 and 1 while (not) moving""" - done = threading.Event() - def cb_motor_done(): - done.set() + recorded_data = [] + cid = None - positioner.move(0).wait() - positioner.velocity.set(3) - assert positioner.motor_is_moving.get() == 0 - status = positioner.move(5) - status.add_callback(cb_motor_done) - assert positioner.motor_is_moving.get() == 1 - status.wait() # Wait will not block until callbacks are executed - done.wait(5) # Wait for the additional callback to be executed - assert positioner.motor_is_moving.get() == 0 + init_velocity = positioner.velocity.get() + + def motor_is_moving_cb(value: any, obj, **kwargs): + data = obj.read()[f"{obj.name}_motor_is_moving"]["value"] + recorded_data.append(data) + + try: + cid = positioner.subscribe(motor_is_moving_cb, event_type="readback", run=False) + + status = positioner.move(-20) + status.wait() + + positioner.velocity.set(10) + + status = positioner.move(20) + status.wait() + + # Check that motor was moving and motor_is_moving switched to "1" + assert any(recorded_data) + + # Check that motor is not moving and motor_is_moving switched back to "0" + assert recorded_data[-1] == 0 + finally: + # Restore initial velocity, remove subscription + positioner.velocity.set(init_velocity) + if cid is not None: + positioner.unsubscribe(cid) @pytest.mark.parametrize(