refactor: make it easier to subclass SimPositioner

This commit is contained in:
guijar_m 2024-06-28 15:22:28 +02:00 committed by guijar_m
parent d3ef7df4e9
commit 9037553252

View File

@ -76,6 +76,7 @@ class SimPositioner(Device, PositionerBase):
sim_init: dict = None, sim_init: dict = None,
**kwargs, **kwargs,
): ):
self.move_thread = None
self.delay = delay self.delay = delay
self.device_manager = device_manager self.device_manager = device_manager
self.precision = precision self.precision = precision
@ -139,65 +140,62 @@ class SimPositioner(Device, PositionerBase):
"""Return the simulated state of the device.""" """Return the simulated state of the device."""
return self.sim.sim_state[signal_name]["value"] return self.sim.sim_state[signal_name]["value"]
def _update_state(self, val):
"""Update the state of the simulated device."""
if self._stopped:
raise DeviceStop
old_readback = self._get_sim_state(self.readback.name)
self._set_sim_state(self.readback.name, val)
# Run subscription on "readback"
self._run_subs(
sub_type=self.SUB_READBACK,
old_value=old_readback,
value=self.sim.sim_state[self.readback.name]["value"],
timestamp=self.sim.sim_state[self.readback.name]["timestamp"],
)
def _move_and_finish(self, start_pos, stop_pos, st):
"""Move the simulated device and finish the motion."""
success = True
try:
target = stop_pos + self.tolerance.get() * np.random.uniform(-1, 1)
updates = np.ceil(
np.abs(target - start_pos) / self.velocity.get() * self.update_frequency
)
for ii in np.linspace(start_pos, target, int(updates)):
ttime.sleep(1 / self.update_frequency)
self._update_state(ii)
if self._stopped:
break
else:
self._update_state(target)
except DeviceStop:
success = False
finally:
self._done_moving(success=success)
self._set_sim_state(self.motor_is_moving.name, 0)
st.set_finished()
def move(self, value: float, **kwargs) -> DeviceStatus: def move(self, value: float, **kwargs) -> DeviceStatus:
"""Change the setpoint of the simulated device, and simultaneously initiated a motion.""" """Change the setpoint of the simulated device, and simultaneously initiate a motion."""
self._stopped = False self._stopped = False
self.check_value(value) self.check_value(value)
old_setpoint = self._get_sim_state(self.setpoint.name) start_pos = self._get_sim_state(self.setpoint.name)
self._set_sim_state(self.motor_is_moving.name, 1) self._set_sim_state(self.motor_is_moving.name, 1)
self._set_sim_state(self.setpoint.name, value) self._set_sim_state(self.setpoint.name, value)
def update_state(val):
"""Update the state of the simulated device."""
if self._stopped:
raise DeviceStop
old_readback = self._get_sim_state(self.readback.name)
self._set_sim_state(self.readback.name, val)
# Run subscription on "readback"
self._run_subs(
sub_type=self.SUB_READBACK,
old_value=old_readback,
value=self.sim.sim_state[self.readback.name]["value"],
timestamp=self.sim.sim_state[self.readback.name]["timestamp"],
)
st = DeviceStatus(device=self) st = DeviceStatus(device=self)
if self.delay: if self.delay:
self.move_thread = threading.Thread(
def move_and_finish(): target=self._move_and_finish, args=(start_pos, value, st)
"""Move the simulated device and finish the motion.""" )
success = True self.move_thread.start()
try:
move_val = self._get_sim_state(
self.setpoint.name
) + self.tolerance.get() * np.random.uniform(-1, 1)
updates = np.ceil(
np.abs(old_setpoint - move_val)
/ self.velocity.get()
* self.update_frequency
)
for ii in np.linspace(old_setpoint, move_val, int(updates)):
ttime.sleep(1 / self.update_frequency)
update_state(ii)
self._set_sim_state(self.motor_is_moving.name, 0)
update_state(move_val)
except DeviceStop:
success = False
finally:
self._stopped = False
self._done_moving(success=success)
self._set_sim_state(self.motor_is_moving.name, 0)
st.set_finished()
threading.Thread(target=move_and_finish, daemon=True).start()
else: else:
update_state(value) self._update_state(value)
self._done_moving() self._done_moving()
self._set_sim_state(self.motor_is_moving.name, 0) self._set_sim_state(self.motor_is_moving.name, 0)
st.set_finished() st.set_finished()
@ -205,8 +203,10 @@ class SimPositioner(Device, PositionerBase):
def stop(self, *, success=False): def stop(self, *, success=False):
"""Stop the motion of the simulated device.""" """Stop the motion of the simulated device."""
super().stop(success=success)
self._stopped = True self._stopped = True
if self.move_thread:
self.move_thread.join()
super().stop(success=success)
@property @property
def position(self) -> float: def position(self) -> float: