Compare commits

..

1 Commits

Author SHA1 Message Date
82d47c7511 feat(galil-rio): Add GalilRIO device with 8 analog channels
All checks were successful
CI for csaxs_bec / test (pull_request) Successful in 1m28s
CI for csaxs_bec / test (push) Successful in 1m30s
2026-01-29 08:20:38 +01:00
2 changed files with 338 additions and 19 deletions

View File

@@ -1,10 +1,46 @@
"""
Module for the Galil RIO (RIO-471xx) controller interface. The controller is a compact PLC
with Ethernet. It has digital and analog I/O as well as counters and timers.
Link to the Galil RIO vendor page:
https://www.galil.com/plcs/remote-io/rio-471xx
This module provides the GalilRIOController for communication with the RIO controller
over TCP/IP. It also provides a device integration that interfaces to these
8 analog channels.
"""
from __future__ import annotations
import time
from typing import TYPE_CHECKING
from bec_lib.logger import bec_logger
from ophyd import Component as Cpt
from ophyd_devices import PSIDeviceBase
from ophyd_devices.utils.controller import Controller, threadlocked
from ophyd_devices.utils.socket import SocketSignal
from ophyd_devices.utils.socket import SocketIO
from csaxs_bec.devices.omny.galil.galil_ophyd import GalilCommunicationError, retry_once
from csaxs_bec.devices.omny.galil.galil_ophyd import (
GalilCommunicationError,
GalilSignalRO,
retry_once,
)
if TYPE_CHECKING: # pragma: no cover
from bec_lib.devicemanager import ScanInfo
from bec_server.device_server.devices.devicemanager import DeviceManagerDS
logger = bec_logger.logger
class GalilRIO(Controller):
class GalilRIOController(Controller):
"""
Controller Class for Galil RIO controller communication.
Multiple controllers are in use at the cSAXS beamline:
- 129.129.98.64 (port 23)
"""
@threadlocked
def socket_put(self, val: str) -> None:
@@ -28,8 +64,154 @@ class GalilRIO(Controller):
)
class GalilRIOSignalBase(SocketSignal):
def __init__(self, signal_name, **kwargs):
self.signal_name = signal_name
super().__init__(**kwargs)
self.rio_controller = self.parent.rio_controller
class GalilRIOSignalRO(GalilSignalRO):
"""
Read-only Signal for reading a single analog input channel from the Galil RIO controller.
It always read all 8 analog channels at once, and updates the reabacks of all channels.
New readbacks are only fetched from the controller if the last readback is older than
_READ_TIMEOUT seconds, otherwise the last cached readback is returned to reduce network traffic.
Args:
signal_name (str): Name of the signal.
channel (int): Analog channel number (0-7).
parent (GalilRIO): Parent GalilRIO device.
"""
_NUM_ANALOG_CHANNELS = 8
_READ_TIMEOUT = 0.1 # seconds
def __init__(self, signal_name: str, channel: int, parent: GalilRIO, **kwargs):
super().__init__(signal_name, parent=parent, **kwargs)
self._channel = channel
self._metadata["connected"] = False
def _socket_get(self) -> float:
"""Get command for the readback signal"""
cmd = "MG@" + ",@".join([f"AN[{ii}]" for ii in range(self._NUM_ANALOG_CHANNELS)])
ret = self.controller.socket_put_and_receive(cmd)
values = [float(val) for val in ret.strip().split(" ")]
# This updates all channels' readbacks, including self._readback
self._update_all_channels(values)
return self._readback
def get(self):
"""Get current analog channel values from the Galil RIO controller."""
# If the last readback has happend more than _READ_TIMEOUT seconds ago, read all channels again
if time.monotonic() - self.parent.last_readback > self._READ_TIMEOUT:
self._readback = self._socket_get()
return self._readback
# pylint: disable=protected-access
def _update_all_channels(self, values: list[float]) -> None:
"""
Update all analog channel readbacks based on the provided list of values.
List of values must be in order from an_ch0 to an_ch7.
We first have to update the _last_readback timestamp of the GalilRIO parent device.
Then we update all readbacks of all an_ch channels, before we run any subscriptions.
This ensures that all readbacks are updated before any subscriptions are run, which
may themselves read other channels.
Args:
values (list[float]): List of 8 float values corresponding to the analog channels.
They must be in order from an_ch0 to an_ch7.
"""
timestamp = time.time()
# Update parent's last readback before running subscriptions!!
self.parent._last_readback = time.monotonic()
updates: dict[str, tuple[float, float]] = {} # attr_name -> (new_val, old_val)
# Update all readbacks first
for walk in self.parent.walk_signals():
if walk.item.attr_name.startswith("an_ch"):
idx = int(walk.item.attr_name[-1])
if 0 <= idx < len(values):
old_val = walk.item._readback
new_val = values[idx]
walk.item._metadata["timestamp"] = timestamp
walk.item._readback = new_val
updates[walk.item.attr_name] = (new_val, old_val)
# Run subscriptions after all readbacks have been updated
for walk in self.parent.walk_signals():
if walk.item.attr_name in updates:
new_val, old_val = updates[walk.item.attr_name]
walk.item._run_subs(
sub_type=walk.item.SUB_VALUE,
old_value=old_val,
value=new_val,
timestamp=timestamp,
)
class GalilRIO(PSIDeviceBase):
"""
Galil RIO controller integration with 8 analog input channels. To implement the device,
please provide the appropriate host and port (default port is 23).
Args:
host (str): Hostname or IP address of the Galil RIO controller.
"""
SUB_CONNECTION_CHANGE = "connection_change"
an_ch0 = Cpt(GalilRIOSignalRO, signal_name="an_ch0", channel=0, doc="Analog input channel 0")
an_ch1 = Cpt(GalilRIOSignalRO, signal_name="an_ch1", channel=1, doc="Analog input channel 1")
an_ch2 = Cpt(GalilRIOSignalRO, signal_name="an_ch2", channel=2, doc="Analog input channel 2")
an_ch3 = Cpt(GalilRIOSignalRO, signal_name="an_ch3", channel=3, doc="Analog input channel 3")
an_ch4 = Cpt(GalilRIOSignalRO, signal_name="an_ch4", channel=4, doc="Analog input channel 4")
an_ch5 = Cpt(GalilRIOSignalRO, signal_name="an_ch5", channel=5, doc="Analog input channel 5")
an_ch6 = Cpt(GalilRIOSignalRO, signal_name="an_ch6", channel=6, doc="Analog input channel 6")
an_ch7 = Cpt(GalilRIOSignalRO, signal_name="an_ch7", channel=7, doc="Analog input channel 7")
def __init__(
self,
*,
name: str,
host: str,
device_manager: DeviceManagerDS,
port: int | None = None,
socket_cls: type[SocketIO] = SocketIO,
scan_info: ScanInfo | None = None,
**kwargs,
):
if port is None:
port = 23 # Default port for Galil RIO controller
self.controller = GalilRIOController(
socket_cls=socket_cls, socket_host=host, socket_port=port, device_manager=device_manager
)
self._last_readback: float = time.monotonic()
super().__init__(name=name, device_manager=device_manager, scan_info=scan_info, **kwargs)
self.controller.subscribe(
self._update_connection_state, event_type=self.SUB_CONNECTION_CHANGE
)
@property
def last_readback(self) -> float:
"""Return the time of the last readback from the controller."""
return self._last_readback
# pylint: disable=arguments-differ
def wait_for_connection(self, timeout: float = 30.0) -> None:
"""Wait for the RIO controller to be connected within timeout period."""
self.controller.on(timeout=timeout)
def destroy(self) -> None:
"""Make sure to turn off the controller socket on destroy."""
self.controller.off(update_config=False)
return super().destroy()
# pylint: disable=protected-access
def _update_connection_state(self, **kwargs):
for walk in self.walk_signals():
walk.item._metadata["connected"] = self.controller.connected
if __name__ == "__main__":
HOST_NAME = "129.129.98.64"
from bec_server.device_server.tests.utils import DMMock
dm = DMMock()
rio = GalilRIO(name="rio", host=HOST_NAME, device_manager=dm)
rio.wait_for_connection(timeout=10)
print("Connected:", rio.an_ch1.read())
print("All channels:", rio.read())

View File

@@ -1,13 +1,15 @@
import copy
import inspect
from unittest import mock
import numpy as np
import pytest
from bec_server.device_server.tests.utils import DMMock
from ophyd_devices.tests.utils import SocketMock
from csaxs_bec.devices.npoint.npoint import NPointAxis, NPointController
from csaxs_bec.devices.omny.galil.fgalil_ophyd import FlomniGalilController, FlomniGalilMotor
from csaxs_bec.devices.omny.galil.fupr_ophyd import FuprGalilController, FuprGalilMotor
from csaxs_bec.devices.omny.galil.galil_rio import GalilRIO, GalilRIOController, GalilRIOSignalRO
from csaxs_bec.devices.omny.galil.lgalil_ophyd import LamniGalilController, LamniGalilMotor
from csaxs_bec.devices.omny.galil.ogalil_ophyd import OMNYGalilController, OMNYGalilMotor
from csaxs_bec.devices.omny.galil.sgalil_ophyd import GalilController, SGalilMotor
@@ -173,9 +175,9 @@ def test_find_reference(leyex, axis_nr, socket_put_messages, socket_get_messages
assert leyex.controller.sock.buffer_put == socket_put_messages
def test_wait_for_connection_called():
def test_wait_for_connection_called(dm_with_devices):
"""Test that wait_for_connection is called on all motors that have a socket controller."""
dm = DMMock()
dm = dm_with_devices
testable_connections = [
(NPointAxis, NPointController),
(FlomniGalilMotor, FlomniGalilController),
@@ -187,6 +189,7 @@ def test_wait_for_connection_called():
(RtLamniMotor, RtLamniController),
(RtOMNYMotor, RtOMNYController),
(SmaractMotor, SmaractController),
(GalilRIO, GalilRIOController),
]
for motor_cls, controller_cls in testable_connections:
# Store values to restore later
@@ -195,14 +198,20 @@ def test_wait_for_connection_called():
controller_cls._reset_controller()
controller_cls._axes_per_controller = 3
motor = motor_cls(
"C",
name="test_motor",
host="mpc2680.psi.ch",
port=8081,
socket_cls=SocketMock,
device_manager=dm,
)
inspect_args = inspect.getfullargspec(motor_cls.__init__).args
inspect_kwargs = inspect.getfullargspec(motor_cls.__init__).kwonlyargs
if len(inspect_args) > 1:
args = ("C",)
else:
args = ()
kwargs = {
"name": "test_motor",
"host": "mpc2680.psi.ch",
"port": 8081,
"device_manager": dm,
"socket_cls": SocketMock,
}
motor = motor_cls(*args, **kwargs)
with mock.patch.object(motor.controller, "on") as mock_on:
motor.wait_for_connection(timeout=5.0)
@@ -219,3 +228,131 @@ def test_wait_for_connection_called():
finally:
controller_cls._reset_controller()
controller_cls._axes_per_controller = ctrl_axis_backup
########################
#### Test Galil RIO ####
########################
@pytest.fixture
def galil_rio(dm_with_devices):
try:
rio = GalilRIO(
name="galil_rio",
host="129.129.0.1",
socket_cls=SocketMock,
device_manager=dm_with_devices,
)
rio.wait_for_connection()
yield rio
finally:
rio.destroy()
def test_galil_rio_initialization(galil_rio):
"""
Test that the Galil RIO signal can establish a connection.
"""
assert galil_rio.controller.connected is True
# All signals should be connected if the controller is connected
for walk in galil_rio.walk_signals():
signal = walk.item
assert signal.connected is True
assert galil_rio.controller._socket_host == "129.129.0.1"
assert galil_rio.controller._socket_port == 23 # Default port
def test_galil_rio_signal_read(galil_rio):
"""
Test that the Galil RIO signal can read values correctly.
"""
###########
## Test read of all channels
###########
assert galil_rio.an_ch0._READ_TIMEOUT == 0.1 # Default read timeout of 100ms
# Mock the socket to return specific values
galil_rio.controller.sock.buffer_recv = [b" 1.234 2.345 3.456 4.567 5.678 6.789 7.890 8.901"]
galil_rio._last_readback = 0 # Force read from controller
read_values = galil_rio.read()
assert len(read_values) == 8 # 8 channels
expected_values = {
galil_rio.an_ch0.name: {"value": 1.234},
galil_rio.an_ch1.name: {"value": 2.345},
galil_rio.an_ch2.name: {"value": 3.456},
galil_rio.an_ch3.name: {"value": 4.567},
galil_rio.an_ch4.name: {"value": 5.678},
galil_rio.an_ch5.name: {"value": 6.789},
galil_rio.an_ch6.name: {"value": 7.890},
galil_rio.an_ch7.name: {"value": 8.901},
}
# All timestamps should be the same
assert all(
ret["timestamp"] == read_values[galil_rio.an_ch0.name]["timestamp"]
for signal_name, ret in read_values.items()
)
# Check values
for signal_name, expected in expected_values.items():
assert np.isclose(read_values[signal_name]["value"], expected["value"])
assert "timestamp" in read_values[signal_name]
# Check communication command to socker
assert galil_rio.controller.sock.buffer_put == [
b"MG@AN[0],@AN[1],@AN[2],@AN[3],@AN[4],@AN[5],@AN[6],@AN[7]\r"
]
###########
## Test read of single channel with callback
###########
# Add callback to update readback
value_callback_buffer: list[tuple] = []
def value_callback(value, old_value, **kwargs):
obj = kwargs.get("obj")
galil = obj.parent
readback = galil.read()
value_callback_buffer.append(readback)
galil_rio.an_ch0.subscribe(value_callback, run=False)
galil_rio.controller.sock.buffer_recv = [b" 2.5 2.6 2.7 2.8 2.9 3.0 3.1 3.2"]
expected_values = [2.5, 2.6, 2.7, 2.8, 2.9, 3.0, 3.1, 3.2]
##################
## Test cached readback
##################
# Should have used the cached value
for walk in galil_rio.walk_signals():
walk.item._READ_TIMEOUT = 10 # Make sure cached read is used
ret = galil_rio.an_ch0.read()
# Should not trigger callback since value did not change
assert np.isclose(ret[galil_rio.an_ch0.name]["value"], 1.234)
# Same timestamp as for another channel as this is cached read
assert np.isclose(ret[galil_rio.an_ch0.name]["timestamp"], galil_rio.an_ch7.timestamp)
assert len(value_callback_buffer) == 0
##################
## Test unchached read from controller
##################
# Now force a read from the controller
galil_rio._last_readback = 0 # Force read from controller
ret = galil_rio.an_ch0.read()
assert np.isclose(ret[galil_rio.an_ch0.name]["value"], 2.5)
# Check callback invocation, but only 1 callback even with galil_rio.read() call in callback
assert len(value_callback_buffer) == 1
values = [value["value"] for value in value_callback_buffer[0].values()]
assert np.isclose(values, expected_values).all()
assert all(
[
value["timestamp"] == value_callback_buffer[0][galil_rio.an_ch0.name]["timestamp"]
for value in value_callback_buffer[0].values()
]
)