diff --git a/csaxs_bec/devices/omny/galil/galil_rio.py b/csaxs_bec/devices/omny/galil/galil_rio.py index 17a88f9..aa20fdf 100644 --- a/csaxs_bec/devices/omny/galil/galil_rio.py +++ b/csaxs_bec/devices/omny/galil/galil_rio.py @@ -1,10 +1,46 @@ +""" +Module for the Galil RIO (RIO-471xx) controller interface. The controller is a compact PLC +with Ethernet. It has digital and analog I/O as well as counters and timers. + +Link to the Galil RIO vendor page: +https://www.galil.com/plcs/remote-io/rio-471xx + +This module provides the GalilRIOController for communication with the RIO controller +over TCP/IP. It also provides a device integration that interfaces to these +8 analog channels. +""" + +from __future__ import annotations + +import time +from typing import TYPE_CHECKING + +from bec_lib.logger import bec_logger +from ophyd import Component as Cpt +from ophyd_devices import PSIDeviceBase from ophyd_devices.utils.controller import Controller, threadlocked -from ophyd_devices.utils.socket import SocketSignal +from ophyd_devices.utils.socket import SocketIO -from csaxs_bec.devices.omny.galil.galil_ophyd import GalilCommunicationError, retry_once +from csaxs_bec.devices.omny.galil.galil_ophyd import ( + GalilCommunicationError, + GalilSignalRO, + retry_once, +) + +if TYPE_CHECKING: # pragma: no cover + from bec_lib.devicemanager import ScanInfo + from bec_server.device_server.devices.devicemanager import DeviceManagerDS + +logger = bec_logger.logger -class GalilRIO(Controller): +class GalilRIOController(Controller): + """ + Controller Class for Galil RIO controller communication. + + Multiple controllers are in use at the cSAXS beamline: + - 129.129.98.64 (port 23) + """ @threadlocked def socket_put(self, val: str) -> None: @@ -28,8 +64,154 @@ class GalilRIO(Controller): ) -class GalilRIOSignalBase(SocketSignal): - def __init__(self, signal_name, **kwargs): - self.signal_name = signal_name - super().__init__(**kwargs) - self.rio_controller = self.parent.rio_controller +class GalilRIOSignalRO(GalilSignalRO): + """ + Read-only Signal for reading a single analog input channel from the Galil RIO controller. + It always read all 8 analog channels at once, and updates the reabacks of all channels. + New readbacks are only fetched from the controller if the last readback is older than + _READ_TIMEOUT seconds, otherwise the last cached readback is returned to reduce network traffic. + + Args: + signal_name (str): Name of the signal. + channel (int): Analog channel number (0-7). + parent (GalilRIO): Parent GalilRIO device. + """ + + _NUM_ANALOG_CHANNELS = 8 + _READ_TIMEOUT = 0.1 # seconds + + def __init__(self, signal_name: str, channel: int, parent: GalilRIO, **kwargs): + super().__init__(signal_name, parent=parent, **kwargs) + self._channel = channel + self._metadata["connected"] = False + + def _socket_get(self) -> float: + """Get command for the readback signal""" + cmd = "MG@" + ",@".join([f"AN[{ii}]" for ii in range(self._NUM_ANALOG_CHANNELS)]) + ret = self.controller.socket_put_and_receive(cmd) + values = [float(val) for val in ret.strip().split(" ")] + # This updates all channels' readbacks, including self._readback + self._update_all_channels(values) + return self._readback + + def get(self): + """Get current analog channel values from the Galil RIO controller.""" + # If the last readback has happend more than _READ_TIMEOUT seconds ago, read all channels again + if time.monotonic() - self.parent.last_readback > self._READ_TIMEOUT: + self._readback = self._socket_get() + return self._readback + + # pylint: disable=protected-access + def _update_all_channels(self, values: list[float]) -> None: + """ + Update all analog channel readbacks based on the provided list of values. + List of values must be in order from an_ch0 to an_ch7. + + We first have to update the _last_readback timestamp of the GalilRIO parent device. + Then we update all readbacks of all an_ch channels, before we run any subscriptions. + This ensures that all readbacks are updated before any subscriptions are run, which + may themselves read other channels. + + Args: + values (list[float]): List of 8 float values corresponding to the analog channels. + They must be in order from an_ch0 to an_ch7. + """ + timestamp = time.time() + # Update parent's last readback before running subscriptions!! + self.parent._last_readback = time.monotonic() + updates: dict[str, tuple[float, float]] = {} # attr_name -> (new_val, old_val) + # Update all readbacks first + for walk in self.parent.walk_signals(): + if walk.item.attr_name.startswith("an_ch"): + idx = int(walk.item.attr_name[-1]) + if 0 <= idx < len(values): + old_val = walk.item._readback + new_val = values[idx] + walk.item._metadata["timestamp"] = timestamp + walk.item._readback = new_val + updates[walk.item.attr_name] = (new_val, old_val) + + # Run subscriptions after all readbacks have been updated + for walk in self.parent.walk_signals(): + if walk.item.attr_name in updates: + new_val, old_val = updates[walk.item.attr_name] + walk.item._run_subs( + sub_type=walk.item.SUB_VALUE, + old_value=old_val, + value=new_val, + timestamp=timestamp, + ) + + +class GalilRIO(PSIDeviceBase): + """ + Galil RIO controller integration with 8 analog input channels. To implement the device, + please provide the appropriate host and port (default port is 23). + + Args: + host (str): Hostname or IP address of the Galil RIO controller. + """ + + SUB_CONNECTION_CHANGE = "connection_change" + + an_ch0 = Cpt(GalilRIOSignalRO, signal_name="an_ch0", channel=0, doc="Analog input channel 0") + an_ch1 = Cpt(GalilRIOSignalRO, signal_name="an_ch1", channel=1, doc="Analog input channel 1") + an_ch2 = Cpt(GalilRIOSignalRO, signal_name="an_ch2", channel=2, doc="Analog input channel 2") + an_ch3 = Cpt(GalilRIOSignalRO, signal_name="an_ch3", channel=3, doc="Analog input channel 3") + an_ch4 = Cpt(GalilRIOSignalRO, signal_name="an_ch4", channel=4, doc="Analog input channel 4") + an_ch5 = Cpt(GalilRIOSignalRO, signal_name="an_ch5", channel=5, doc="Analog input channel 5") + an_ch6 = Cpt(GalilRIOSignalRO, signal_name="an_ch6", channel=6, doc="Analog input channel 6") + an_ch7 = Cpt(GalilRIOSignalRO, signal_name="an_ch7", channel=7, doc="Analog input channel 7") + + def __init__( + self, + *, + name: str, + host: str, + device_manager: DeviceManagerDS, + port: int | None = None, + socket_cls: type[SocketIO] = SocketIO, + scan_info: ScanInfo | None = None, + **kwargs, + ): + if port is None: + port = 23 # Default port for Galil RIO controller + self.controller = GalilRIOController( + socket_cls=socket_cls, socket_host=host, socket_port=port, device_manager=device_manager + ) + self._last_readback: float = time.monotonic() + super().__init__(name=name, device_manager=device_manager, scan_info=scan_info, **kwargs) + self.controller.subscribe( + self._update_connection_state, event_type=self.SUB_CONNECTION_CHANGE + ) + + @property + def last_readback(self) -> float: + """Return the time of the last readback from the controller.""" + return self._last_readback + + # pylint: disable=arguments-differ + def wait_for_connection(self, timeout: float = 30.0) -> None: + """Wait for the RIO controller to be connected within timeout period.""" + self.controller.on(timeout=timeout) + + def destroy(self) -> None: + """Make sure to turn off the controller socket on destroy.""" + self.controller.off(update_config=False) + return super().destroy() + + # pylint: disable=protected-access + def _update_connection_state(self, **kwargs): + for walk in self.walk_signals(): + walk.item._metadata["connected"] = self.controller.connected + + +if __name__ == "__main__": + HOST_NAME = "129.129.98.64" + from bec_server.device_server.tests.utils import DMMock + + dm = DMMock() + rio = GalilRIO(name="rio", host=HOST_NAME, device_manager=dm) + rio.wait_for_connection(timeout=10) + print("Connected:", rio.an_ch1.read()) + print("All channels:", rio.read()) diff --git a/tests/tests_devices/test_galil.py b/tests/tests_devices/test_galil.py index 841f4f4..3024640 100644 --- a/tests/tests_devices/test_galil.py +++ b/tests/tests_devices/test_galil.py @@ -1,13 +1,15 @@ import copy +import inspect from unittest import mock +import numpy as np import pytest -from bec_server.device_server.tests.utils import DMMock from ophyd_devices.tests.utils import SocketMock from csaxs_bec.devices.npoint.npoint import NPointAxis, NPointController from csaxs_bec.devices.omny.galil.fgalil_ophyd import FlomniGalilController, FlomniGalilMotor from csaxs_bec.devices.omny.galil.fupr_ophyd import FuprGalilController, FuprGalilMotor +from csaxs_bec.devices.omny.galil.galil_rio import GalilRIO, GalilRIOController, GalilRIOSignalRO from csaxs_bec.devices.omny.galil.lgalil_ophyd import LamniGalilController, LamniGalilMotor from csaxs_bec.devices.omny.galil.ogalil_ophyd import OMNYGalilController, OMNYGalilMotor from csaxs_bec.devices.omny.galil.sgalil_ophyd import GalilController, SGalilMotor @@ -173,9 +175,9 @@ def test_find_reference(leyex, axis_nr, socket_put_messages, socket_get_messages assert leyex.controller.sock.buffer_put == socket_put_messages -def test_wait_for_connection_called(): +def test_wait_for_connection_called(dm_with_devices): """Test that wait_for_connection is called on all motors that have a socket controller.""" - dm = DMMock() + dm = dm_with_devices testable_connections = [ (NPointAxis, NPointController), (FlomniGalilMotor, FlomniGalilController), @@ -187,6 +189,7 @@ def test_wait_for_connection_called(): (RtLamniMotor, RtLamniController), (RtOMNYMotor, RtOMNYController), (SmaractMotor, SmaractController), + (GalilRIO, GalilRIOController), ] for motor_cls, controller_cls in testable_connections: # Store values to restore later @@ -195,14 +198,20 @@ def test_wait_for_connection_called(): controller_cls._reset_controller() controller_cls._axes_per_controller = 3 - motor = motor_cls( - "C", - name="test_motor", - host="mpc2680.psi.ch", - port=8081, - socket_cls=SocketMock, - device_manager=dm, - ) + inspect_args = inspect.getfullargspec(motor_cls.__init__).args + inspect_kwargs = inspect.getfullargspec(motor_cls.__init__).kwonlyargs + if len(inspect_args) > 1: + args = ("C",) + else: + args = () + kwargs = { + "name": "test_motor", + "host": "mpc2680.psi.ch", + "port": 8081, + "device_manager": dm, + "socket_cls": SocketMock, + } + motor = motor_cls(*args, **kwargs) with mock.patch.object(motor.controller, "on") as mock_on: motor.wait_for_connection(timeout=5.0) @@ -219,3 +228,131 @@ def test_wait_for_connection_called(): finally: controller_cls._reset_controller() controller_cls._axes_per_controller = ctrl_axis_backup + + +######################## +#### Test Galil RIO #### +######################## + + +@pytest.fixture +def galil_rio(dm_with_devices): + try: + rio = GalilRIO( + name="galil_rio", + host="129.129.0.1", + socket_cls=SocketMock, + device_manager=dm_with_devices, + ) + rio.wait_for_connection() + yield rio + finally: + rio.destroy() + + +def test_galil_rio_initialization(galil_rio): + """ + Test that the Galil RIO signal can establish a connection. + """ + assert galil_rio.controller.connected is True + # All signals should be connected if the controller is connected + for walk in galil_rio.walk_signals(): + signal = walk.item + assert signal.connected is True + + assert galil_rio.controller._socket_host == "129.129.0.1" + assert galil_rio.controller._socket_port == 23 # Default port + + +def test_galil_rio_signal_read(galil_rio): + """ + Test that the Galil RIO signal can read values correctly. + """ + ########### + ## Test read of all channels + ########### + + assert galil_rio.an_ch0._READ_TIMEOUT == 0.1 # Default read timeout of 100ms + # Mock the socket to return specific values + galil_rio.controller.sock.buffer_recv = [b" 1.234 2.345 3.456 4.567 5.678 6.789 7.890 8.901"] + galil_rio._last_readback = 0 # Force read from controller + + read_values = galil_rio.read() + assert len(read_values) == 8 # 8 channels + expected_values = { + galil_rio.an_ch0.name: {"value": 1.234}, + galil_rio.an_ch1.name: {"value": 2.345}, + galil_rio.an_ch2.name: {"value": 3.456}, + galil_rio.an_ch3.name: {"value": 4.567}, + galil_rio.an_ch4.name: {"value": 5.678}, + galil_rio.an_ch5.name: {"value": 6.789}, + galil_rio.an_ch6.name: {"value": 7.890}, + galil_rio.an_ch7.name: {"value": 8.901}, + } + # All timestamps should be the same + assert all( + ret["timestamp"] == read_values[galil_rio.an_ch0.name]["timestamp"] + for signal_name, ret in read_values.items() + ) + # Check values + for signal_name, expected in expected_values.items(): + assert np.isclose(read_values[signal_name]["value"], expected["value"]) + assert "timestamp" in read_values[signal_name] + + # Check communication command to socker + assert galil_rio.controller.sock.buffer_put == [ + b"MG@AN[0],@AN[1],@AN[2],@AN[3],@AN[4],@AN[5],@AN[6],@AN[7]\r" + ] + + ########### + ## Test read of single channel with callback + ########### + + # Add callback to update readback + value_callback_buffer: list[tuple] = [] + + def value_callback(value, old_value, **kwargs): + obj = kwargs.get("obj") + galil = obj.parent + readback = galil.read() + value_callback_buffer.append(readback) + + galil_rio.an_ch0.subscribe(value_callback, run=False) + galil_rio.controller.sock.buffer_recv = [b" 2.5 2.6 2.7 2.8 2.9 3.0 3.1 3.2"] + expected_values = [2.5, 2.6, 2.7, 2.8, 2.9, 3.0, 3.1, 3.2] + + ################## + ## Test cached readback + ################## + + # Should have used the cached value + for walk in galil_rio.walk_signals(): + walk.item._READ_TIMEOUT = 10 # Make sure cached read is used + ret = galil_rio.an_ch0.read() + + # Should not trigger callback since value did not change + assert np.isclose(ret[galil_rio.an_ch0.name]["value"], 1.234) + # Same timestamp as for another channel as this is cached read + assert np.isclose(ret[galil_rio.an_ch0.name]["timestamp"], galil_rio.an_ch7.timestamp) + assert len(value_callback_buffer) == 0 + + ################## + ## Test unchached read from controller + ################## + + # Now force a read from the controller + galil_rio._last_readback = 0 # Force read from controller + ret = galil_rio.an_ch0.read() + + assert np.isclose(ret[galil_rio.an_ch0.name]["value"], 2.5) + + # Check callback invocation, but only 1 callback even with galil_rio.read() call in callback + assert len(value_callback_buffer) == 1 + values = [value["value"] for value in value_callback_buffer[0].values()] + assert np.isclose(values, expected_values).all() + assert all( + [ + value["timestamp"] == value_callback_buffer[0][galil_rio.an_ch0.name]["timestamp"] + for value in value_callback_buffer[0].values() + ] + )