w
Some checks failed
CI for csaxs_bec / test (push) Failing after 1m27s

This commit is contained in:
2026-02-10 15:49:21 +01:00
parent faab59c3a5
commit a935dbf496

View File

@@ -13,11 +13,12 @@ over TCP/IP. It also provides a device integration that interfaces to these
from __future__ import annotations
import time
from typing import TYPE_CHECKING, Literal
from typing import TYPE_CHECKING, Literal, Type
from bec_lib.logger import bec_logger
from ophyd import DynamicDeviceComponent as DDC
from ophyd import Kind
from ophyd.utils import ReadOnlyError
from ophyd_devices import PSIDeviceBase
from ophyd_devices.utils.controller import Controller, threadlocked
from ophyd_devices.utils.socket import SocketIO
@@ -66,7 +67,7 @@ class GalilRIOController(Controller):
)
class GalilRIOSignalRO(GalilSignalRO):
class GalilRIOSignal(GalilSignalBase):
"""
Read-only Signal for reading a single analog input channel from the Galil RIO controller.
It always read all 8 analog channels at once, and updates the reabacks of all channels.
@@ -79,7 +80,6 @@ class GalilRIOSignalRO(GalilSignalRO):
parent (GalilRIO): Parent GalilRIO device.
"""
_NUM_ANALOG_CHANNELS = 8
_READ_TIMEOUT = 0.1 # seconds
def __init__(self, signal_name: str, channel: int, parent: GalilRIO, **kwargs):
@@ -88,15 +88,6 @@ class GalilRIOSignalRO(GalilSignalRO):
self._channel = channel
self._metadata["connected"] = False
def _socket_get(self) -> float:
"""Get command for the readback signal"""
cmd = "MG@" + ",@".join([f"AN[{ii}]" for ii in range(self._NUM_ANALOG_CHANNELS)])
ret = self.controller.socket_put_and_receive(cmd)
values = [float(val) for val in ret.strip().split(" ")]
# This updates all channels' readbacks, including self._readback
self._update_all_channels(values)
return self._readback
def get(self):
"""Get current analog channel values from the Galil RIO controller."""
# If the last readback has happend more than _READ_TIMEOUT seconds ago, read all channels again
@@ -105,7 +96,7 @@ class GalilRIOSignalRO(GalilSignalRO):
return self._readback
# pylint: disable=protected-access
def _update_all_channels(self, values: list[float]) -> None:
def _update_all_channels(self, values: list[float], signal_cls: Type[GalilRIOSignal]) -> None:
"""
Update all analog channel readbacks based on the provided list of values.
List of values must be in order from an_ch0 to an_ch7.
@@ -118,6 +109,7 @@ class GalilRIOSignalRO(GalilSignalRO):
Args:
values (list[float]): List of 8 float values corresponding to the analog channels.
They must be in order from an_ch0 to an_ch7.
signal_cls (Type[GalilRIOSignal]): The class of the signal to update, used to identify which signals to update.
"""
timestamp = time.time()
# Update parent's last readback before running subscriptions!!
@@ -125,7 +117,7 @@ class GalilRIOSignalRO(GalilSignalRO):
updates: dict[str, tuple[float, float]] = {} # attr_name -> (new_val, old_val)
# Update all readbacks first
for walk in self.parent.walk_signals():
if walk.item.attr_name.startswith("an_ch"):
if isinstance(walk.item, signal_cls):
idx = int(walk.item.attr_name[-1])
if 0 <= idx < len(values):
old_val = walk.item._readback
@@ -146,80 +138,51 @@ class GalilRIOSignalRO(GalilSignalRO):
)
class GalilRIODigitalOutSignal(GalilSignalBase):
"""
Signal for controlling digital outputs of the Galil RIO controller.
# TODO Check and consider if we want to also add readout on all, and a readback throttling
"""
class GalilRIOSignalRO(GalilRIOSignal):
_NUM_ANALOG_CHANNELS = 8
def __init__(self, signal_name: str, channel: int, parent: GalilRIO, **kwargs):
super().__init__(signal_name, parent=parent, **kwargs)
self._channel = channel
self._metadata["connected"] = False
super().__init__(signal_name=signal_name, channel=channel, parent=parent, **kwargs)
self._metadata["write_access"] = False
def _socket_set(self, val):
raise ReadOnlyError("Read-only signals cannot be set")
def _socket_get(self) -> float:
"""Get command for the readback signal"""
cmd = f"MG@OUT[{self._channel}]"
cmd = "MG@" + ",@".join([f"AN[{ii}]" for ii in range(self._NUM_ANALOG_CHANNELS)])
ret = self.controller.socket_put_and_receive(cmd)
logger.debug(f"Received readback for digital output channel {self._channel}: {ret}")
self._readback = float(ret.strip())
values = [float(val) for val in ret.strip().split(" ")]
# This updates all channels' readbacks, including self._readback
self._update_all_channels(values, signal_cls=GalilRIOSignalRO)
return self._readback
def _socket_set(self, value: Literal[0, 1]) -> None:
class GalilRIODigitalOutSignal(GalilRIOSignal): # We reuse the logic implemented for Galil
"""
Signal for controlling digital outputs of the Galil RIO controller.
"""
_NUM_DIGITAL_OUTPUT_CHANNELS = 10
def _socket_get(self) -> float:
"""Get command for the readback signal"""
cmd = "MG@" + ",@".join([f"OUT[{ii}]" for ii in range(self._NUM_DIGITAL_OUTPUT_CHANNELS)])
ret = self.controller.socket_put_and_receive(cmd)
values = [float(val) for val in ret.strip().split(" ")]
cmd = f"MG@OUT[{self._channel}]"
self._update_all_channels(values, signal_cls=GalilRIODigitalOutSignal)
return self._readback
def _socket_set(self, val: Literal[0, 1]) -> None:
"""Set command for the digital output signal. Value should be 0 or 1."""
if value not in (0, 1):
if val not in (0, 1):
raise ValueError("Digital output value must be 0 or 1.")
cmd = f"SB{self._channel}" if value == 1 else f"CB{self._channel}"
cmd = f"SB{self._channel}" if val == 1 else f"CB{self._channel}"
self.controller.socket_put_confirmed(cmd)
def put(
self,
value,
force=False,
connection_timeout=1,
callback=None,
use_complete=None,
timeout=1,
**kwargs,
):
"""Using channel access, set the write PV to `value`.
Keyword arguments are passed on to callbacks
Parameters
----------
value : any
The value to set
force : bool, optional
Skip checking the value in Python first
connection_timeout : float, optional
If not already connected, allow up to `connection_timeout` seconds
for the connection to complete.
use_complete : bool, optional
Override put completion settings
callback : callable
Callback for when the put has completed
timeout : float, optional
Timeout before assuming that put has failed. (Only relevant if
put completion is used.)
"""
if not force:
pass
# self.check_value(value)
old_value = self._readback
self.wait_for_connection(timeout=connection_timeout)
if use_complete is None:
use_complete = False
self._socket_set(value)
timestamp = time.time()
super().put(value, timestamp=timestamp, force=True)
self._run_subs(
sub_type=self.SUB_SETPOINT, old_value=old_value, value=value, timestamp=timestamp
)
def _create_analog_channels(num_channels: int) -> dict[str, tuple]:
"""
@@ -283,7 +246,7 @@ class GalilRIO(PSIDeviceBase):
#############################
analog_in = DDC(_create_analog_channels(8)) # Creates an_ch0 to an_ch7
digital_out = DDC(_create_digital_output_channels(8)) # Creates di_out0 to di_out7
digital_out = DDC(_create_digital_output_channels(10)) # Creates di_out0 to di_out9
def __init__(
self,