mirror of
https://github.com/bec-project/ophyd_devices.git
synced 2026-05-12 22:55:35 +02:00
fix(pseudo-motor): Add subscription callbacks to readback signal, and SUB_READBACK event from positioners
This commit is contained in:
@@ -98,8 +98,23 @@ class PSIPseudoMotorBase(ABC, PSIDeviceBase, PositionerBase):
|
||||
move_signature = inspect.signature(positioner.move)
|
||||
if "wait" in move_signature.parameters:
|
||||
self._positioner_move_kwargs[name] = {"wait": False}
|
||||
|
||||
# Subscribe callback to updates of the readback signals
|
||||
self.readback.subscribe(self._run_readback_signal_subs, event_type=self.readback.SUB_VALUE)
|
||||
# Subscribe to "readback" event on each positioner
|
||||
for positioner in self.positioner_objects.values():
|
||||
positioner.subscribe(self._run_readback_event_subs, event_type=positioner.SUB_READBACK)
|
||||
return super().wait_for_connection(*args, **kwargs)
|
||||
|
||||
def _run_readback_signal_subs(self, value: float, old_value: float, **kwargs):
|
||||
"""Run subscriptions on the readback signal when it updates."""
|
||||
self._run_subs(sub_type=self.SUB_READBACK, old_value=old_value, value=value)
|
||||
|
||||
def _run_readback_event_subs(self, value: float, old_value: float, **kwargs):
|
||||
"""Run subscriptions on the readback event when it updates."""
|
||||
new_val = self.readback.get()
|
||||
self._run_subs(sub_type=self.SUB_READBACK, value=new_val)
|
||||
|
||||
def _check_method_signatures(self) -> None:
|
||||
"""Ensure calculation method parameters match configured positioner keys."""
|
||||
input_names = set(self.positioner_objects.keys())
|
||||
|
||||
Reference in New Issue
Block a user