From a935dbf496e7c3455f56e4f28e406950a7f3daec Mon Sep 17 00:00:00 2001 From: appel_c Date: Tue, 10 Feb 2026 15:49:21 +0100 Subject: [PATCH] w --- csaxs_bec/devices/omny/galil/galil_rio.py | 115 ++++++++-------------- 1 file changed, 39 insertions(+), 76 deletions(-) diff --git a/csaxs_bec/devices/omny/galil/galil_rio.py b/csaxs_bec/devices/omny/galil/galil_rio.py index 52f7d5a..597ca12 100644 --- a/csaxs_bec/devices/omny/galil/galil_rio.py +++ b/csaxs_bec/devices/omny/galil/galil_rio.py @@ -13,11 +13,12 @@ over TCP/IP. It also provides a device integration that interfaces to these from __future__ import annotations import time -from typing import TYPE_CHECKING, Literal +from typing import TYPE_CHECKING, Literal, Type from bec_lib.logger import bec_logger from ophyd import DynamicDeviceComponent as DDC from ophyd import Kind +from ophyd.utils import ReadOnlyError from ophyd_devices import PSIDeviceBase from ophyd_devices.utils.controller import Controller, threadlocked from ophyd_devices.utils.socket import SocketIO @@ -66,7 +67,7 @@ class GalilRIOController(Controller): ) -class GalilRIOSignalRO(GalilSignalRO): +class GalilRIOSignal(GalilSignalBase): """ Read-only Signal for reading a single analog input channel from the Galil RIO controller. It always read all 8 analog channels at once, and updates the reabacks of all channels. @@ -79,7 +80,6 @@ class GalilRIOSignalRO(GalilSignalRO): parent (GalilRIO): Parent GalilRIO device. """ - _NUM_ANALOG_CHANNELS = 8 _READ_TIMEOUT = 0.1 # seconds def __init__(self, signal_name: str, channel: int, parent: GalilRIO, **kwargs): @@ -88,15 +88,6 @@ class GalilRIOSignalRO(GalilSignalRO): self._channel = channel self._metadata["connected"] = False - def _socket_get(self) -> float: - """Get command for the readback signal""" - cmd = "MG@" + ",@".join([f"AN[{ii}]" for ii in range(self._NUM_ANALOG_CHANNELS)]) - ret = self.controller.socket_put_and_receive(cmd) - values = [float(val) for val in ret.strip().split(" ")] - # This updates all channels' readbacks, including self._readback - self._update_all_channels(values) - return self._readback - def get(self): """Get current analog channel values from the Galil RIO controller.""" # If the last readback has happend more than _READ_TIMEOUT seconds ago, read all channels again @@ -105,7 +96,7 @@ class GalilRIOSignalRO(GalilSignalRO): return self._readback # pylint: disable=protected-access - def _update_all_channels(self, values: list[float]) -> None: + def _update_all_channels(self, values: list[float], signal_cls: Type[GalilRIOSignal]) -> None: """ Update all analog channel readbacks based on the provided list of values. List of values must be in order from an_ch0 to an_ch7. @@ -118,6 +109,7 @@ class GalilRIOSignalRO(GalilSignalRO): Args: values (list[float]): List of 8 float values corresponding to the analog channels. They must be in order from an_ch0 to an_ch7. + signal_cls (Type[GalilRIOSignal]): The class of the signal to update, used to identify which signals to update. """ timestamp = time.time() # Update parent's last readback before running subscriptions!! @@ -125,7 +117,7 @@ class GalilRIOSignalRO(GalilSignalRO): updates: dict[str, tuple[float, float]] = {} # attr_name -> (new_val, old_val) # Update all readbacks first for walk in self.parent.walk_signals(): - if walk.item.attr_name.startswith("an_ch"): + if isinstance(walk.item, signal_cls): idx = int(walk.item.attr_name[-1]) if 0 <= idx < len(values): old_val = walk.item._readback @@ -146,80 +138,51 @@ class GalilRIOSignalRO(GalilSignalRO): ) -class GalilRIODigitalOutSignal(GalilSignalBase): - """ - Signal for controlling digital outputs of the Galil RIO controller. - # TODO Check and consider if we want to also add readout on all, and a readback throttling - """ +class GalilRIOSignalRO(GalilRIOSignal): + + _NUM_ANALOG_CHANNELS = 8 def __init__(self, signal_name: str, channel: int, parent: GalilRIO, **kwargs): - super().__init__(signal_name, parent=parent, **kwargs) - self._channel = channel - self._metadata["connected"] = False + super().__init__(signal_name=signal_name, channel=channel, parent=parent, **kwargs) + self._metadata["write_access"] = False + + def _socket_set(self, val): + raise ReadOnlyError("Read-only signals cannot be set") def _socket_get(self) -> float: """Get command for the readback signal""" - cmd = f"MG@OUT[{self._channel}]" + cmd = "MG@" + ",@".join([f"AN[{ii}]" for ii in range(self._NUM_ANALOG_CHANNELS)]) ret = self.controller.socket_put_and_receive(cmd) - logger.debug(f"Received readback for digital output channel {self._channel}: {ret}") - self._readback = float(ret.strip()) + values = [float(val) for val in ret.strip().split(" ")] + # This updates all channels' readbacks, including self._readback + self._update_all_channels(values, signal_cls=GalilRIOSignalRO) return self._readback - def _socket_set(self, value: Literal[0, 1]) -> None: + +class GalilRIODigitalOutSignal(GalilRIOSignal): # We reuse the logic implemented for Galil + """ + Signal for controlling digital outputs of the Galil RIO controller. + """ + + _NUM_DIGITAL_OUTPUT_CHANNELS = 10 + + def _socket_get(self) -> float: + """Get command for the readback signal""" + cmd = "MG@" + ",@".join([f"OUT[{ii}]" for ii in range(self._NUM_DIGITAL_OUTPUT_CHANNELS)]) + ret = self.controller.socket_put_and_receive(cmd) + values = [float(val) for val in ret.strip().split(" ")] + cmd = f"MG@OUT[{self._channel}]" + self._update_all_channels(values, signal_cls=GalilRIODigitalOutSignal) + return self._readback + + def _socket_set(self, val: Literal[0, 1]) -> None: """Set command for the digital output signal. Value should be 0 or 1.""" - if value not in (0, 1): + if val not in (0, 1): raise ValueError("Digital output value must be 0 or 1.") - cmd = f"SB{self._channel}" if value == 1 else f"CB{self._channel}" + cmd = f"SB{self._channel}" if val == 1 else f"CB{self._channel}" self.controller.socket_put_confirmed(cmd) - def put( - self, - value, - force=False, - connection_timeout=1, - callback=None, - use_complete=None, - timeout=1, - **kwargs, - ): - """Using channel access, set the write PV to `value`. - - Keyword arguments are passed on to callbacks - - Parameters - ---------- - value : any - The value to set - force : bool, optional - Skip checking the value in Python first - connection_timeout : float, optional - If not already connected, allow up to `connection_timeout` seconds - for the connection to complete. - use_complete : bool, optional - Override put completion settings - callback : callable - Callback for when the put has completed - timeout : float, optional - Timeout before assuming that put has failed. (Only relevant if - put completion is used.) - """ - if not force: - pass - # self.check_value(value) - old_value = self._readback - self.wait_for_connection(timeout=connection_timeout) - if use_complete is None: - use_complete = False - - self._socket_set(value) - - timestamp = time.time() - super().put(value, timestamp=timestamp, force=True) - self._run_subs( - sub_type=self.SUB_SETPOINT, old_value=old_value, value=value, timestamp=timestamp - ) - def _create_analog_channels(num_channels: int) -> dict[str, tuple]: """ @@ -283,7 +246,7 @@ class GalilRIO(PSIDeviceBase): ############################# analog_in = DDC(_create_analog_channels(8)) # Creates an_ch0 to an_ch7 - digital_out = DDC(_create_digital_output_channels(8)) # Creates di_out0 to di_out7 + digital_out = DDC(_create_digital_output_channels(10)) # Creates di_out0 to di_out9 def __init__( self,