From faab59c3a58923ce54cf91879ce252346b2d28a2 Mon Sep 17 00:00:00 2001 From: appel_c Date: Tue, 10 Feb 2026 15:36:06 +0100 Subject: [PATCH] w --- csaxs_bec/devices/omny/galil/galil_ophyd.py | 23 ++++++++++----------- csaxs_bec/devices/omny/galil/galil_rio.py | 14 +++++-------- 2 files changed, 16 insertions(+), 21 deletions(-) diff --git a/csaxs_bec/devices/omny/galil/galil_ophyd.py b/csaxs_bec/devices/omny/galil/galil_ophyd.py index 3c03bed..c1215b1 100644 --- a/csaxs_bec/devices/omny/galil/galil_ophyd.py +++ b/csaxs_bec/devices/omny/galil/galil_ophyd.py @@ -4,6 +4,7 @@ This module contains the base class for Galil controllers as well as the signals import functools import time +from typing import Any from bec_lib import bec_logger from ophyd.utils import ReadOnlyError @@ -347,25 +348,23 @@ class GalilSignalBase(SocketSignal): def __init__(self, signal_name, **kwargs): self.signal_name = signal_name super().__init__(**kwargs) - self.controller = self._find_controller_recursively() + self.controller: Controller = self._find_attribute_recursively("controller") - def _find_controller_recursively(self) -> Controller: + def _find_attribute_recursively(self, attribute: str) -> Any: """ - Find controller instance recursively for nested sub-devices. - This is for example needed for the GalilRIO which has DDC components for the analog - and digital channels. (DDC components are sub-devices). + Find an attribute recursively for nested sub-devices. + This is needed to find for example the controller instance for DDC components, + thus nested devices. """ - _MAX_DEPTH = 10 # to prevent infinite recursion + max_depth = 10 # to prevent infinite recursion current_parent = self.parent depth = 0 - while current_parent is not None and depth < _MAX_DEPTH: - if hasattr(current_parent, "controller") and isinstance( - current_parent.controller, Controller - ): - return current_parent.controller + while current_parent is not None and depth < max_depth: + if hasattr(current_parent, attribute): + return getattr(current_parent, attribute) current_parent = getattr(current_parent, "parent", None) depth += 1 - raise RuntimeError("Controller not found within maximum depth") + raise RuntimeError(f"Attribute '{attribute}' not found within maximum depth {max_depth}.") class GalilSignalRO(GalilSignalBase): diff --git a/csaxs_bec/devices/omny/galil/galil_rio.py b/csaxs_bec/devices/omny/galil/galil_rio.py index c9838b8..52f7d5a 100644 --- a/csaxs_bec/devices/omny/galil/galil_rio.py +++ b/csaxs_bec/devices/omny/galil/galil_rio.py @@ -84,6 +84,7 @@ class GalilRIOSignalRO(GalilSignalRO): def __init__(self, signal_name: str, channel: int, parent: GalilRIO, **kwargs): super().__init__(signal_name, parent=parent, **kwargs) + self._readback_metadata = self._find_attribute_recursively("_readback_metadata") self._channel = channel self._metadata["connected"] = False @@ -99,7 +100,7 @@ class GalilRIOSignalRO(GalilSignalRO): def get(self): """Get current analog channel values from the Galil RIO controller.""" # If the last readback has happend more than _READ_TIMEOUT seconds ago, read all channels again - if time.monotonic() - self.parent.last_readback > self._READ_TIMEOUT: + if time.monotonic() - self._readback_metadata["last_readback"] > self._READ_TIMEOUT: self._readback = self._socket_get() return self._readback @@ -109,7 +110,7 @@ class GalilRIOSignalRO(GalilSignalRO): Update all analog channel readbacks based on the provided list of values. List of values must be in order from an_ch0 to an_ch7. - We first have to update the _last_readback timestamp of the GalilRIO parent device. + We first have to update the timestamp of the GalilRIO _readback_metadata device. Then we update all readbacks of all an_ch channels, before we run any subscriptions. This ensures that all readbacks are updated before any subscriptions are run, which may themselves read other channels. @@ -120,7 +121,7 @@ class GalilRIOSignalRO(GalilSignalRO): """ timestamp = time.time() # Update parent's last readback before running subscriptions!! - self.parent._last_readback = time.monotonic() + self._readback_metadata["last_readback"] = time.monotonic() updates: dict[str, tuple[float, float]] = {} # attr_name -> (new_val, old_val) # Update all readbacks first for walk in self.parent.walk_signals(): @@ -300,17 +301,12 @@ class GalilRIO(PSIDeviceBase): self.controller = GalilRIOController( socket_cls=socket_cls, socket_host=host, socket_port=port, device_manager=device_manager ) - self._last_readback: float = time.monotonic() + self._readback_metadata: dict[str, float] = {"last_readback": time.monotonic()} super().__init__(name=name, device_manager=device_manager, scan_info=scan_info, **kwargs) self.controller.subscribe( self._update_connection_state, event_type=self.SUB_CONNECTION_CHANGE ) - @property - def last_readback(self) -> float: - """Return the time of the last readback from the controller.""" - return self._last_readback - # pylint: disable=arguments-differ def wait_for_connection(self, timeout: float = 30.0) -> None: """Wait for the RIO controller to be connected within timeout period."""