w
Some checks failed
CI for csaxs_bec / test (push) Failing after 1m28s

This commit is contained in:
2026-02-10 15:36:06 +01:00
parent be701e97d3
commit faab59c3a5
2 changed files with 16 additions and 21 deletions

View File

@@ -4,6 +4,7 @@ This module contains the base class for Galil controllers as well as the signals
import functools
import time
from typing import Any
from bec_lib import bec_logger
from ophyd.utils import ReadOnlyError
@@ -347,25 +348,23 @@ class GalilSignalBase(SocketSignal):
def __init__(self, signal_name, **kwargs):
self.signal_name = signal_name
super().__init__(**kwargs)
self.controller = self._find_controller_recursively()
self.controller: Controller = self._find_attribute_recursively("controller")
def _find_controller_recursively(self) -> Controller:
def _find_attribute_recursively(self, attribute: str) -> Any:
"""
Find controller instance recursively for nested sub-devices.
This is for example needed for the GalilRIO which has DDC components for the analog
and digital channels. (DDC components are sub-devices).
Find an attribute recursively for nested sub-devices.
This is needed to find for example the controller instance for DDC components,
thus nested devices.
"""
_MAX_DEPTH = 10 # to prevent infinite recursion
max_depth = 10 # to prevent infinite recursion
current_parent = self.parent
depth = 0
while current_parent is not None and depth < _MAX_DEPTH:
if hasattr(current_parent, "controller") and isinstance(
current_parent.controller, Controller
):
return current_parent.controller
while current_parent is not None and depth < max_depth:
if hasattr(current_parent, attribute):
return getattr(current_parent, attribute)
current_parent = getattr(current_parent, "parent", None)
depth += 1
raise RuntimeError("Controller not found within maximum depth")
raise RuntimeError(f"Attribute '{attribute}' not found within maximum depth {max_depth}.")
class GalilSignalRO(GalilSignalBase):

View File

@@ -84,6 +84,7 @@ class GalilRIOSignalRO(GalilSignalRO):
def __init__(self, signal_name: str, channel: int, parent: GalilRIO, **kwargs):
super().__init__(signal_name, parent=parent, **kwargs)
self._readback_metadata = self._find_attribute_recursively("_readback_metadata")
self._channel = channel
self._metadata["connected"] = False
@@ -99,7 +100,7 @@ class GalilRIOSignalRO(GalilSignalRO):
def get(self):
"""Get current analog channel values from the Galil RIO controller."""
# If the last readback has happend more than _READ_TIMEOUT seconds ago, read all channels again
if time.monotonic() - self.parent.last_readback > self._READ_TIMEOUT:
if time.monotonic() - self._readback_metadata["last_readback"] > self._READ_TIMEOUT:
self._readback = self._socket_get()
return self._readback
@@ -109,7 +110,7 @@ class GalilRIOSignalRO(GalilSignalRO):
Update all analog channel readbacks based on the provided list of values.
List of values must be in order from an_ch0 to an_ch7.
We first have to update the _last_readback timestamp of the GalilRIO parent device.
We first have to update the timestamp of the GalilRIO _readback_metadata device.
Then we update all readbacks of all an_ch channels, before we run any subscriptions.
This ensures that all readbacks are updated before any subscriptions are run, which
may themselves read other channels.
@@ -120,7 +121,7 @@ class GalilRIOSignalRO(GalilSignalRO):
"""
timestamp = time.time()
# Update parent's last readback before running subscriptions!!
self.parent._last_readback = time.monotonic()
self._readback_metadata["last_readback"] = time.monotonic()
updates: dict[str, tuple[float, float]] = {} # attr_name -> (new_val, old_val)
# Update all readbacks first
for walk in self.parent.walk_signals():
@@ -300,17 +301,12 @@ class GalilRIO(PSIDeviceBase):
self.controller = GalilRIOController(
socket_cls=socket_cls, socket_host=host, socket_port=port, device_manager=device_manager
)
self._last_readback: float = time.monotonic()
self._readback_metadata: dict[str, float] = {"last_readback": time.monotonic()}
super().__init__(name=name, device_manager=device_manager, scan_info=scan_info, **kwargs)
self.controller.subscribe(
self._update_connection_state, event_type=self.SUB_CONNECTION_CHANGE
)
@property
def last_readback(self) -> float:
"""Return the time of the last readback from the controller."""
return self._last_readback
# pylint: disable=arguments-differ
def wait_for_connection(self, timeout: float = 30.0) -> None:
"""Wait for the RIO controller to be connected within timeout period."""