Add Digital Out signals to GalilRIO #138

Merged
appel_c merged 6 commits from add/rio-galil-set-gain into main 2026-02-16 17:51:37 +01:00
5 changed files with 237 additions and 126 deletions

View File

@@ -4,6 +4,7 @@ This module contains the base class for Galil controllers as well as the signals
import functools
import time
from typing import Any
from bec_lib import bec_logger
from ophyd.utils import ReadOnlyError
@@ -347,7 +348,7 @@ class GalilSignalBase(SocketSignal):
def __init__(self, signal_name, **kwargs):
self.signal_name = signal_name
super().__init__(**kwargs)
self.controller = self.parent.controller
self.controller = self.root.controller if hasattr(self.root, "controller") else None
class GalilSignalRO(GalilSignalBase):

View File

@@ -6,24 +6,26 @@ Link to the Galil RIO vendor page:
https://www.galil.com/plcs/remote-io/rio-471xx
This module provides the GalilRIOController for communication with the RIO controller
over TCP/IP. It also provides a device integration that interfaces to these
8 analog channels.
over TCP/IP. It also provides a device integration that interfaces to its
8 analog channels, and 16 digital output channels. Some PLCs may have 24 digital output channels,
which can be easily supported by changing the _NUM_DIGITAL_OUTPUT_CHANNELS variable.
"""
from __future__ import annotations
import time
from typing import TYPE_CHECKING
from typing import TYPE_CHECKING, Literal
from bec_lib.logger import bec_logger
from ophyd import Component as Cpt
from ophyd import DynamicDeviceComponent as DDC
from ophyd import Kind
from ophyd.utils import ReadOnlyError
from ophyd_devices import PSIDeviceBase
from ophyd_devices.utils.controller import Controller, threadlocked
from ophyd_devices.utils.socket import SocketIO
from csaxs_bec.devices.omny.galil.galil_ophyd import (
GalilCommunicationError,
GalilSignalRO,
GalilSignalBase,
retry_once,
)
@@ -35,15 +37,11 @@ logger = bec_logger.logger
class GalilRIOController(Controller):
"""
Controller Class for Galil RIO controller communication.
Multiple controllers are in use at the cSAXS beamline:
- 129.129.98.64 (port 23)
"""
"""Controller Class for Galil RIO controller communication."""
@threadlocked
def socket_put(self, val: str) -> None:
"""Socker put method."""
self.sock.put(f"{val}\r".encode())
@retry_once
@@ -64,21 +62,95 @@ class GalilRIOController(Controller):
)
class GalilRIOSignalRO(GalilSignalRO):
class GalilRIOAnalogSignalRO(GalilSignalBase):
"""
Read-only Signal for reading a single analog input channel from the Galil RIO controller.
It always read all 8 analog channels at once, and updates the reabacks of all channels.
New readbacks are only fetched from the controller if the last readback is older than
_READ_TIMEOUT seconds, otherwise the last cached readback is returned to reduce network traffic.
Signal for reading analog input channels of the Galil RIO controller. This signal is read-only, so
the set method raises a ReadOnlyError. The get method retrieves the values of all analog
channels in a single socket command. The readback values of all channels are updated based
on the response, and subscriptions are run for all channels. Readings are cached as implemented
in the SocketSignal class, so that multiple reads of the same channel within an update cycle do
not result in multiple socket calls.
Args:
signal_name (str): Name of the signal.
channel (int): Analog channel number (0-7).
parent (GalilRIO): Parent GalilRIO device.
signal_name (str): The name of the signal, e.g. "ch0", "ch1", ..., "ch7"
channel (int): The channel number corresponding to the signal, e.g. 0 for "ch0", 1 for "ch1", ...
parent (GalilRIO): The parent device instance that this signal belongs to.
"""
_NUM_ANALOG_CHANNELS = 8
_READ_TIMEOUT = 0.1 # seconds
def __init__(self, signal_name: str, channel: int, parent: GalilRIO, **kwargs):
super().__init__(signal_name=signal_name, parent=parent, **kwargs)
self._channel = channel
self._metadata["connected"] = False
self._metadata["write_access"] = False
def _socket_set(self, val):
"""Read-only signal, so set method raises an error."""
raise ReadOnlyError(f"Signal {self.name} is read-only.")
def _socket_get(self) -> float:
"""Get command for the readback signal"""
cmd = "MG@" + ", @".join([f"AN[{ii}]" for ii in range(self._NUM_ANALOG_CHANNELS)])
ret = self.controller.socket_put_and_receive(cmd)
values = [float(val) for val in ret.strip().split(" ")]
# Run updates for all channels. This also updates the _readback and metadata timestamp
# value of this channel.
self._update_all_channels(values)
return self._readback
# pylint: disable=protected-access
def _update_all_channels(self, values: list[float]) -> None:
"""
Method to receive a list of readback values for channels 0 to 7. Updates for each channel idx
are applied to the corresponding GalilRIOAnalogSignalRO signal with matching attr_name "ch{idx}".
We also update the _last_readback attribute of each of the signals, to avoid multiple socket calls,
but rather use the cached value of the combined reading for all channels.
Args:
values (list[float]): List of new readback values for all channels, where the
index corresponds to the channel number (0-7).
"""
updates: dict[str, tuple[float, float]] = {} # attr_name -> (new_val, old_val)
# Update all readbacks first
for walk in self.parent.walk_signals():
if isinstance(walk.item, GalilRIOAnalogSignalRO):
idx = int(walk.item.attr_name[-1])
if 0 <= idx < len(values):
old_val = walk.item._readback
new_val = values[idx]
walk.item._metadata["timestamp"] = self._last_readback
walk.item._last_readback = self._last_readback
walk.item._readback = new_val
if (
idx != self._channel
): # Only run subscriptions on other channels, not on itself
# as this is handled by the SocketSignal and we want to avoid running multiple
# subscriptions for the same channel update
updates[walk.item.attr_name] = (new_val, old_val)
else:
logger.warning(
f"Received {len(values)} values but found channel index {idx} in signal {walk.item.name}. Skipping update for this signal."
)
# Run subscriptions after all readbacks have been updated
# on all channels except the one that triggered the update
for walk in self.parent.walk_signals():
if walk.item.attr_name in updates:
new_val, old_val = updates[walk.item.attr_name]
walk.item._run_subs(
sub_type=walk.item.SUB_VALUE,
old_value=old_val,
value=new_val,
timestamp=self._last_readback,
)
class GalilRIODigitalOutSignal(GalilSignalBase):
"""Signal for controlling digital outputs of the Galil RIO controller."""
_NUM_DIGITAL_OUTPUT_CHANNELS = 16
def __init__(self, signal_name: str, channel: int, parent: GalilRIO, **kwargs):
super().__init__(signal_name, parent=parent, **kwargs)
@@ -87,81 +159,83 @@ class GalilRIOSignalRO(GalilSignalRO):
def _socket_get(self) -> float:
"""Get command for the readback signal"""
cmd = "MG@" + ",@".join([f"AN[{ii}]" for ii in range(self._NUM_ANALOG_CHANNELS)])
cmd = f"MG@OUT[{self._channel}]"
ret = self.controller.socket_put_and_receive(cmd)
values = [float(val) for val in ret.strip().split(" ")]
# This updates all channels' readbacks, including self._readback
self._update_all_channels(values)
self._readback = float(ret.strip())
return self._readback
def get(self):
"""Get current analog channel values from the Galil RIO controller."""
# If the last readback has happend more than _READ_TIMEOUT seconds ago, read all channels again
if time.monotonic() - self.parent.last_readback > self._READ_TIMEOUT:
self._readback = self._socket_get()
return self._readback
def _socket_set(self, val: Literal[0, 1]) -> None:
"""Set command for the digital output signal. Value should be 0 or 1."""
# pylint: disable=protected-access
def _update_all_channels(self, values: list[float]) -> None:
"""
Update all analog channel readbacks based on the provided list of values.
List of values must be in order from an_ch0 to an_ch7.
if val not in (0, 1):
raise ValueError("Digital output value must be 0 or 1.")
cmd = f"SB{self._channel}" if val == 1 else f"CB{self._channel}"
self.controller.socket_put_confirmed(cmd)
We first have to update the _last_readback timestamp of the GalilRIO parent device.
Then we update all readbacks of all an_ch channels, before we run any subscriptions.
This ensures that all readbacks are updated before any subscriptions are run, which
may themselves read other channels.
Args:
values (list[float]): List of 8 float values corresponding to the analog channels.
They must be in order from an_ch0 to an_ch7.
"""
timestamp = time.time()
# Update parent's last readback before running subscriptions!!
self.parent._last_readback = time.monotonic()
updates: dict[str, tuple[float, float]] = {} # attr_name -> (new_val, old_val)
# Update all readbacks first
for walk in self.parent.walk_signals():
if walk.item.attr_name.startswith("an_ch"):
idx = int(walk.item.attr_name[-1])
if 0 <= idx < len(values):
old_val = walk.item._readback
new_val = values[idx]
walk.item._metadata["timestamp"] = timestamp
walk.item._readback = new_val
updates[walk.item.attr_name] = (new_val, old_val)
def _create_analog_channels(num_channels: int) -> dict[str, tuple]:
"""
Helper method to create a dictionary of analog channel definitions for the DynamicDeviceComponent.
# Run subscriptions after all readbacks have been updated
for walk in self.parent.walk_signals():
if walk.item.attr_name in updates:
new_val, old_val = updates[walk.item.attr_name]
walk.item._run_subs(
sub_type=walk.item.SUB_VALUE,
old_value=old_val,
value=new_val,
timestamp=timestamp,
)
Args:
num_channels (int): The number of analog channels to create.
"""
an_channels = {}
for i in range(0, num_channels):
an_channels[f"ch{i}"] = (
GalilRIOAnalogSignalRO,
f"ch{i}",
{"kind": Kind.normal, "notify_bec": True, "channel": i, "doc": f"Analog channel {i}."},
)
return an_channels
def _create_digital_output_channels(num_channels: int) -> dict[str, tuple]:
"""
Helper method to create a dictionary of digital output channel definitions for the DynamicDeviceComponent.
Args:
num_channels (int): The number of digital output channels to create.
"""
di_out_channels = {}
for i in range(0, num_channels):
di_out_channels[f"ch{i}"] = (
GalilRIODigitalOutSignal,
f"ch{i}",
{
"kind": Kind.config,
"notify_bec": True,
"channel": i,
"doc": f"Digital output channel {i}.",
},
)
return di_out_channels
class GalilRIO(PSIDeviceBase):
"""
Galil RIO controller integration with 8 analog input channels. To implement the device,
please provide the appropriate host and port (default port is 23).
Galil RIO controller integration with 16 digital output channels and 8 analog input channels.
The default port for the controller is 23.
Args:
host (str): Hostname or IP address of the Galil RIO controller.
port (int, optional): Port number for the TCP/IP connection. Defaults to 23.
socket_cls (type[SocketIO], optional): Socket class to use for communication. Defaults to SocketIO.
scan_info (ScanInfo, optional): ScanInfo object for the device.
device_manager (DeviceManagerDS): The device manager instance that manages this device.
**kwargs: Additional keyword arguments passed to the PSIDeviceBase constructor.
"""
SUB_CONNECTION_CHANGE = "connection_change"
an_ch0 = Cpt(GalilRIOSignalRO, signal_name="an_ch0", channel=0, doc="Analog input channel 0")
an_ch1 = Cpt(GalilRIOSignalRO, signal_name="an_ch1", channel=1, doc="Analog input channel 1")
an_ch2 = Cpt(GalilRIOSignalRO, signal_name="an_ch2", channel=2, doc="Analog input channel 2")
an_ch3 = Cpt(GalilRIOSignalRO, signal_name="an_ch3", channel=3, doc="Analog input channel 3")
an_ch4 = Cpt(GalilRIOSignalRO, signal_name="an_ch4", channel=4, doc="Analog input channel 4")
an_ch5 = Cpt(GalilRIOSignalRO, signal_name="an_ch5", channel=5, doc="Analog input channel 5")
an_ch6 = Cpt(GalilRIOSignalRO, signal_name="an_ch6", channel=6, doc="Analog input channel 6")
an_ch7 = Cpt(GalilRIOSignalRO, signal_name="an_ch7", channel=7, doc="Analog input channel 7")
#############################
### Analog input channels ###
#############################
analog_in = DDC(_create_analog_channels(GalilRIOAnalogSignalRO._NUM_ANALOG_CHANNELS))
digital_out = DDC(
_create_digital_output_channels(GalilRIODigitalOutSignal._NUM_DIGITAL_OUTPUT_CHANNELS)
)
def __init__(
self,
@@ -177,19 +251,18 @@ class GalilRIO(PSIDeviceBase):
if port is None:
port = 23 # Default port for Galil RIO controller
self.controller = GalilRIOController(
socket_cls=socket_cls, socket_host=host, socket_port=port, device_manager=device_manager
name=f"GalilRIOController_{name}",
socket_cls=socket_cls,
socket_host=host,
socket_port=port,
device_manager=device_manager,
)
self._last_readback: float = time.monotonic()
self._readback_metadata: dict[str, float] = {"last_readback": 0.0}
super().__init__(name=name, device_manager=device_manager, scan_info=scan_info, **kwargs)
self.controller.subscribe(
self._update_connection_state, event_type=self.SUB_CONNECTION_CHANGE
)
@property
def last_readback(self) -> float:
"""Return the time of the last readback from the controller."""
return self._last_readback
# pylint: disable=arguments-differ
def wait_for_connection(self, timeout: float = 30.0) -> None:
"""Wait for the RIO controller to be connected within timeout period."""
@@ -207,7 +280,7 @@ class GalilRIO(PSIDeviceBase):
if __name__ == "__main__":
HOST_NAME = "129.129.98.64"
HOST_NAME = "129.129.122.14"
from bec_server.device_server.tests.utils import DMMock
dm = DMMock()

View File

@@ -1,14 +0,0 @@
"""
Conftest runs for all tests in this directory and subdirectories. Thereby, we know for
certain that the SocketSignal.READBACK_TIMEOUT is set to 0 for all tests, which prevents
hanging tests when a readback is attempted on a non-connected socket.
"""
# conftest.py
import pytest
from ophyd_devices.utils.socket import SocketSignal
@pytest.fixture(autouse=True)
def patch_socket_timeout(monkeypatch):
monkeypatch.setattr(SocketSignal, "READBACK_TIMEOUT", 0.0)

View File

@@ -2,6 +2,7 @@ from unittest import mock
import pytest
from ophyd_devices.tests.utils import SocketMock
from ophyd_devices.utils.socket import SocketSignal
from csaxs_bec.devices.omny.galil.fupr_ophyd import FuprGalilController, FuprGalilMotor
@@ -17,6 +18,11 @@ def fsamroy(dm_with_devices):
socket_cls=SocketMock,
device_manager=dm_with_devices,
)
for walk in fsamroy_motor.walk_signals():
if isinstance(walk.item, SocketSignal):
walk.item._readback_timeout = (
0.0 # Set the readback timeout to 0 to avoid waiting during tests
)
fsamroy_motor.controller.on()
assert isinstance(fsamroy_motor.controller, FuprGalilController)
yield fsamroy_motor

View File

@@ -9,7 +9,11 @@ from ophyd_devices.tests.utils import SocketMock
from csaxs_bec.devices.npoint.npoint import NPointAxis, NPointController
from csaxs_bec.devices.omny.galil.fgalil_ophyd import FlomniGalilController, FlomniGalilMotor
from csaxs_bec.devices.omny.galil.fupr_ophyd import FuprGalilController, FuprGalilMotor
from csaxs_bec.devices.omny.galil.galil_rio import GalilRIO, GalilRIOController, GalilRIOSignalRO
from csaxs_bec.devices.omny.galil.galil_rio import (
GalilRIO,
GalilRIOAnalogSignalRO,
GalilRIOController,
)
from csaxs_bec.devices.omny.galil.lgalil_ophyd import LamniGalilController, LamniGalilMotor
from csaxs_bec.devices.omny.galil.ogalil_ophyd import OMNYGalilController, OMNYGalilMotor
from csaxs_bec.devices.omny.galil.sgalil_ophyd import GalilController, SGalilMotor
@@ -272,26 +276,27 @@ def test_galil_rio_signal_read(galil_rio):
## Test read of all channels
###########
assert galil_rio.an_ch0._READ_TIMEOUT == 0.1 # Default read timeout of 100ms
assert galil_rio.analog_in.ch0._readback_timeout == 0.1 # Default read timeout of 100ms
# Mock the socket to return specific values
galil_rio.controller.sock.buffer_recv = [b" 1.234 2.345 3.456 4.567 5.678 6.789 7.890 8.901"]
galil_rio._last_readback = 0 # Force read from controller
analog_bufffer = b" 1.234 2.345 3.456 4.567 5.678 6.789 7.890 8.901\r\n"
galil_rio.controller.sock.buffer_recv = [] # Clear any existing buffer
galil_rio.controller.sock.buffer_recv.append(analog_bufffer)
read_values = galil_rio.read()
assert len(read_values) == 8 # 8 channels
expected_values = {
galil_rio.an_ch0.name: {"value": 1.234},
galil_rio.an_ch1.name: {"value": 2.345},
galil_rio.an_ch2.name: {"value": 3.456},
galil_rio.an_ch3.name: {"value": 4.567},
galil_rio.an_ch4.name: {"value": 5.678},
galil_rio.an_ch5.name: {"value": 6.789},
galil_rio.an_ch6.name: {"value": 7.890},
galil_rio.an_ch7.name: {"value": 8.901},
galil_rio.analog_in.ch0.name: {"value": 1.234},
galil_rio.analog_in.ch1.name: {"value": 2.345},
galil_rio.analog_in.ch2.name: {"value": 3.456},
galil_rio.analog_in.ch3.name: {"value": 4.567},
galil_rio.analog_in.ch4.name: {"value": 5.678},
galil_rio.analog_in.ch5.name: {"value": 6.789},
galil_rio.analog_in.ch6.name: {"value": 7.890},
galil_rio.analog_in.ch7.name: {"value": 8.901},
}
# All timestamps should be the same
assert all(
ret["timestamp"] == read_values[galil_rio.an_ch0.name]["timestamp"]
ret["timestamp"] == read_values[galil_rio.analog_in.ch0.name]["timestamp"]
for signal_name, ret in read_values.items()
)
# Check values
@@ -301,7 +306,7 @@ def test_galil_rio_signal_read(galil_rio):
# Check communication command to socker
assert galil_rio.controller.sock.buffer_put == [
b"MG@AN[0],@AN[1],@AN[2],@AN[3],@AN[4],@AN[5],@AN[6],@AN[7]\r"
b"MG@AN[0], @AN[1], @AN[2], @AN[3], @AN[4], @AN[5], @AN[6], @AN[7]\r"
]
###########
@@ -313,11 +318,11 @@ def test_galil_rio_signal_read(galil_rio):
def value_callback(value, old_value, **kwargs):
obj = kwargs.get("obj")
galil = obj.parent
galil = obj.parent.parent
readback = galil.read()
value_callback_buffer.append(readback)
galil_rio.an_ch0.subscribe(value_callback, run=False)
galil_rio.analog_in.ch0.subscribe(value_callback, run=False)
galil_rio.controller.sock.buffer_recv = [b" 2.5 2.6 2.7 2.8 2.9 3.0 3.1 3.2"]
expected_values = [2.5, 2.6, 2.7, 2.8, 2.9, 3.0, 3.1, 3.2]
@@ -327,13 +332,15 @@ def test_galil_rio_signal_read(galil_rio):
# Should have used the cached value
for walk in galil_rio.walk_signals():
walk.item._READ_TIMEOUT = 10 # Make sure cached read is used
ret = galil_rio.an_ch0.read()
walk.item._readback_timeout = 10 # Make sure cached read is used
ret = galil_rio.analog_in.ch0.read()
# Should not trigger callback since value did not change
assert np.isclose(ret[galil_rio.an_ch0.name]["value"], 1.234)
assert np.isclose(ret[galil_rio.analog_in.ch0.name]["value"], 1.234)
# Same timestamp as for another channel as this is cached read
assert np.isclose(ret[galil_rio.an_ch0.name]["timestamp"], galil_rio.an_ch7.timestamp)
assert np.isclose(
ret[galil_rio.analog_in.ch0.name]["timestamp"], galil_rio.analog_in.ch7.timestamp
)
assert len(value_callback_buffer) == 0
##################
@@ -341,10 +348,10 @@ def test_galil_rio_signal_read(galil_rio):
##################
# Now force a read from the controller
galil_rio._last_readback = 0 # Force read from controller
ret = galil_rio.an_ch0.read()
galil_rio.analog_in.ch0._last_readback = 0 # Force read from controller
ret = galil_rio.analog_in.ch0.read()
assert np.isclose(ret[galil_rio.an_ch0.name]["value"], 2.5)
assert np.isclose(ret[galil_rio.analog_in.ch0.name]["value"], 2.5)
# Check callback invocation, but only 1 callback even with galil_rio.read() call in callback
assert len(value_callback_buffer) == 1
@@ -352,7 +359,45 @@ def test_galil_rio_signal_read(galil_rio):
assert np.isclose(values, expected_values).all()
assert all(
[
value["timestamp"] == value_callback_buffer[0][galil_rio.an_ch0.name]["timestamp"]
value["timestamp"]
== value_callback_buffer[0][galil_rio.analog_in.ch0.name]["timestamp"]
for value in value_callback_buffer[0].values()
]
)
def test_galil_rio_digital_out_signal(galil_rio):
"""
Test that the Galil RIO digital output signal can be set correctly.
"""
## Test Read from digital output channels
buffer_receive = []
excepted_put_buffer = []
for ii in range(galil_rio.digital_out.ch0._NUM_DIGITAL_OUTPUT_CHANNELS):
cmd = f"MG@OUT[{ii}]\r".encode()
excepted_put_buffer.append(cmd)
recv = " 1.000".encode()
buffer_receive.append(recv)
galil_rio.controller.sock.buffer_recv = buffer_receive # Mock response for readback
digital_read = galil_rio.read_configuration() # Read to populate readback values
for walk in galil_rio.digital_out.walk_signals():
assert np.isclose(digital_read[walk.item.name]["value"], 1.0)
assert galil_rio.controller.sock.buffer_put == excepted_put_buffer
# Test writing to digital output channels
galil_rio.controller.sock.buffer_put = [] # Clear buffer put
galil_rio.controller.sock.buffer_recv = [b":"] # Mock response for readback
# Set digital output channel 0 to high
galil_rio.digital_out.ch0.put(1)
assert galil_rio.controller.sock.buffer_put == [b"SB0\r"]
# Set digital output channel 0 to low
galil_rio.controller.sock.buffer_put = [] # Clear buffer put
galil_rio.controller.sock.buffer_recv = [b":"] # Mock response for readback
galil_rio.digital_out.ch0.put(0)
assert galil_rio.controller.sock.buffer_put == [b"CB0\r"]