Compare commits
2 Commits
smaract_im
...
fix/socket
| Author | SHA1 | Date | |
|---|---|---|---|
| eae967a04e | |||
|
82d47c7511
|
@@ -1,10 +1,46 @@
|
|||||||
|
"""
|
||||||
|
Module for the Galil RIO (RIO-471xx) controller interface. The controller is a compact PLC
|
||||||
|
with Ethernet. It has digital and analog I/O as well as counters and timers.
|
||||||
|
|
||||||
|
Link to the Galil RIO vendor page:
|
||||||
|
https://www.galil.com/plcs/remote-io/rio-471xx
|
||||||
|
|
||||||
|
This module provides the GalilRIOController for communication with the RIO controller
|
||||||
|
over TCP/IP. It also provides a device integration that interfaces to these
|
||||||
|
8 analog channels.
|
||||||
|
"""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import time
|
||||||
|
from typing import TYPE_CHECKING
|
||||||
|
|
||||||
|
from bec_lib.logger import bec_logger
|
||||||
|
from ophyd import Component as Cpt
|
||||||
|
from ophyd_devices import PSIDeviceBase
|
||||||
from ophyd_devices.utils.controller import Controller, threadlocked
|
from ophyd_devices.utils.controller import Controller, threadlocked
|
||||||
from ophyd_devices.utils.socket import SocketSignal
|
from ophyd_devices.utils.socket import SocketIO
|
||||||
|
|
||||||
from csaxs_bec.devices.omny.galil.galil_ophyd import GalilCommunicationError, retry_once
|
from csaxs_bec.devices.omny.galil.galil_ophyd import (
|
||||||
|
GalilCommunicationError,
|
||||||
|
GalilSignalRO,
|
||||||
|
retry_once,
|
||||||
|
)
|
||||||
|
|
||||||
|
if TYPE_CHECKING: # pragma: no cover
|
||||||
|
from bec_lib.devicemanager import ScanInfo
|
||||||
|
from bec_server.device_server.devices.devicemanager import DeviceManagerDS
|
||||||
|
|
||||||
|
logger = bec_logger.logger
|
||||||
|
|
||||||
|
|
||||||
class GalilRIO(Controller):
|
class GalilRIOController(Controller):
|
||||||
|
"""
|
||||||
|
Controller Class for Galil RIO controller communication.
|
||||||
|
|
||||||
|
Multiple controllers are in use at the cSAXS beamline:
|
||||||
|
- 129.129.98.64 (port 23)
|
||||||
|
"""
|
||||||
|
|
||||||
@threadlocked
|
@threadlocked
|
||||||
def socket_put(self, val: str) -> None:
|
def socket_put(self, val: str) -> None:
|
||||||
@@ -28,8 +64,154 @@ class GalilRIO(Controller):
|
|||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
class GalilRIOSignalBase(SocketSignal):
|
class GalilRIOSignalRO(GalilSignalRO):
|
||||||
def __init__(self, signal_name, **kwargs):
|
"""
|
||||||
self.signal_name = signal_name
|
Read-only Signal for reading a single analog input channel from the Galil RIO controller.
|
||||||
super().__init__(**kwargs)
|
It always read all 8 analog channels at once, and updates the reabacks of all channels.
|
||||||
self.rio_controller = self.parent.rio_controller
|
New readbacks are only fetched from the controller if the last readback is older than
|
||||||
|
_READ_TIMEOUT seconds, otherwise the last cached readback is returned to reduce network traffic.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
signal_name (str): Name of the signal.
|
||||||
|
channel (int): Analog channel number (0-7).
|
||||||
|
parent (GalilRIO): Parent GalilRIO device.
|
||||||
|
"""
|
||||||
|
|
||||||
|
_NUM_ANALOG_CHANNELS = 8
|
||||||
|
_READ_TIMEOUT = 0.1 # seconds
|
||||||
|
|
||||||
|
def __init__(self, signal_name: str, channel: int, parent: GalilRIO, **kwargs):
|
||||||
|
super().__init__(signal_name, parent=parent, **kwargs)
|
||||||
|
self._channel = channel
|
||||||
|
self._metadata["connected"] = False
|
||||||
|
|
||||||
|
def _socket_get(self) -> float:
|
||||||
|
"""Get command for the readback signal"""
|
||||||
|
cmd = "MG@" + ",@".join([f"AN[{ii}]" for ii in range(self._NUM_ANALOG_CHANNELS)])
|
||||||
|
ret = self.controller.socket_put_and_receive(cmd)
|
||||||
|
values = [float(val) for val in ret.strip().split(" ")]
|
||||||
|
# This updates all channels' readbacks, including self._readback
|
||||||
|
self._update_all_channels(values)
|
||||||
|
return self._readback
|
||||||
|
|
||||||
|
def get(self):
|
||||||
|
"""Get current analog channel values from the Galil RIO controller."""
|
||||||
|
# If the last readback has happend more than _READ_TIMEOUT seconds ago, read all channels again
|
||||||
|
if time.monotonic() - self.parent.last_readback > self._READ_TIMEOUT:
|
||||||
|
self._readback = self._socket_get()
|
||||||
|
return self._readback
|
||||||
|
|
||||||
|
# pylint: disable=protected-access
|
||||||
|
def _update_all_channels(self, values: list[float]) -> None:
|
||||||
|
"""
|
||||||
|
Update all analog channel readbacks based on the provided list of values.
|
||||||
|
List of values must be in order from an_ch0 to an_ch7.
|
||||||
|
|
||||||
|
We first have to update the _last_readback timestamp of the GalilRIO parent device.
|
||||||
|
Then we update all readbacks of all an_ch channels, before we run any subscriptions.
|
||||||
|
This ensures that all readbacks are updated before any subscriptions are run, which
|
||||||
|
may themselves read other channels.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
values (list[float]): List of 8 float values corresponding to the analog channels.
|
||||||
|
They must be in order from an_ch0 to an_ch7.
|
||||||
|
"""
|
||||||
|
timestamp = time.time()
|
||||||
|
# Update parent's last readback before running subscriptions!!
|
||||||
|
self.parent._last_readback = time.monotonic()
|
||||||
|
updates: dict[str, tuple[float, float]] = {} # attr_name -> (new_val, old_val)
|
||||||
|
# Update all readbacks first
|
||||||
|
for walk in self.parent.walk_signals():
|
||||||
|
if walk.item.attr_name.startswith("an_ch"):
|
||||||
|
idx = int(walk.item.attr_name[-1])
|
||||||
|
if 0 <= idx < len(values):
|
||||||
|
old_val = walk.item._readback
|
||||||
|
new_val = values[idx]
|
||||||
|
walk.item._metadata["timestamp"] = timestamp
|
||||||
|
walk.item._readback = new_val
|
||||||
|
updates[walk.item.attr_name] = (new_val, old_val)
|
||||||
|
|
||||||
|
# Run subscriptions after all readbacks have been updated
|
||||||
|
for walk in self.parent.walk_signals():
|
||||||
|
if walk.item.attr_name in updates:
|
||||||
|
new_val, old_val = updates[walk.item.attr_name]
|
||||||
|
walk.item._run_subs(
|
||||||
|
sub_type=walk.item.SUB_VALUE,
|
||||||
|
old_value=old_val,
|
||||||
|
value=new_val,
|
||||||
|
timestamp=timestamp,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
class GalilRIO(PSIDeviceBase):
|
||||||
|
"""
|
||||||
|
Galil RIO controller integration with 8 analog input channels. To implement the device,
|
||||||
|
please provide the appropriate host and port (default port is 23).
|
||||||
|
|
||||||
|
Args:
|
||||||
|
host (str): Hostname or IP address of the Galil RIO controller.
|
||||||
|
"""
|
||||||
|
|
||||||
|
SUB_CONNECTION_CHANGE = "connection_change"
|
||||||
|
|
||||||
|
an_ch0 = Cpt(GalilRIOSignalRO, signal_name="an_ch0", channel=0, doc="Analog input channel 0")
|
||||||
|
an_ch1 = Cpt(GalilRIOSignalRO, signal_name="an_ch1", channel=1, doc="Analog input channel 1")
|
||||||
|
an_ch2 = Cpt(GalilRIOSignalRO, signal_name="an_ch2", channel=2, doc="Analog input channel 2")
|
||||||
|
an_ch3 = Cpt(GalilRIOSignalRO, signal_name="an_ch3", channel=3, doc="Analog input channel 3")
|
||||||
|
an_ch4 = Cpt(GalilRIOSignalRO, signal_name="an_ch4", channel=4, doc="Analog input channel 4")
|
||||||
|
an_ch5 = Cpt(GalilRIOSignalRO, signal_name="an_ch5", channel=5, doc="Analog input channel 5")
|
||||||
|
an_ch6 = Cpt(GalilRIOSignalRO, signal_name="an_ch6", channel=6, doc="Analog input channel 6")
|
||||||
|
an_ch7 = Cpt(GalilRIOSignalRO, signal_name="an_ch7", channel=7, doc="Analog input channel 7")
|
||||||
|
|
||||||
|
def __init__(
|
||||||
|
self,
|
||||||
|
*,
|
||||||
|
name: str,
|
||||||
|
host: str,
|
||||||
|
device_manager: DeviceManagerDS,
|
||||||
|
port: int | None = None,
|
||||||
|
socket_cls: type[SocketIO] = SocketIO,
|
||||||
|
scan_info: ScanInfo | None = None,
|
||||||
|
**kwargs,
|
||||||
|
):
|
||||||
|
if port is None:
|
||||||
|
port = 23 # Default port for Galil RIO controller
|
||||||
|
self.controller = GalilRIOController(
|
||||||
|
socket_cls=socket_cls, socket_host=host, socket_port=port, device_manager=device_manager
|
||||||
|
)
|
||||||
|
self._last_readback: float = time.monotonic()
|
||||||
|
super().__init__(name=name, device_manager=device_manager, scan_info=scan_info, **kwargs)
|
||||||
|
self.controller.subscribe(
|
||||||
|
self._update_connection_state, event_type=self.SUB_CONNECTION_CHANGE
|
||||||
|
)
|
||||||
|
|
||||||
|
@property
|
||||||
|
def last_readback(self) -> float:
|
||||||
|
"""Return the time of the last readback from the controller."""
|
||||||
|
return self._last_readback
|
||||||
|
|
||||||
|
# pylint: disable=arguments-differ
|
||||||
|
def wait_for_connection(self, timeout: float = 30.0) -> None:
|
||||||
|
"""Wait for the RIO controller to be connected within timeout period."""
|
||||||
|
self.controller.on(timeout=timeout)
|
||||||
|
|
||||||
|
def destroy(self) -> None:
|
||||||
|
"""Make sure to turn off the controller socket on destroy."""
|
||||||
|
self.controller.off(update_config=False)
|
||||||
|
return super().destroy()
|
||||||
|
|
||||||
|
# pylint: disable=protected-access
|
||||||
|
def _update_connection_state(self, **kwargs):
|
||||||
|
for walk in self.walk_signals():
|
||||||
|
walk.item._metadata["connected"] = self.controller.connected
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
HOST_NAME = "129.129.98.64"
|
||||||
|
from bec_server.device_server.tests.utils import DMMock
|
||||||
|
|
||||||
|
dm = DMMock()
|
||||||
|
rio = GalilRIO(name="rio", host=HOST_NAME, device_manager=dm)
|
||||||
|
rio.wait_for_connection(timeout=10)
|
||||||
|
print("Connected:", rio.an_ch1.read())
|
||||||
|
print("All channels:", rio.read())
|
||||||
|
|||||||
14
tests/conftest.py
Normal file
14
tests/conftest.py
Normal file
@@ -0,0 +1,14 @@
|
|||||||
|
"""
|
||||||
|
Conftest runs for all tests in this directory and subdirectories. Thereby, we know for
|
||||||
|
certain that the SocketSignal.READBACK_TIMEOUT is set to 0 for all tests, which prevents
|
||||||
|
hanging tests when a readback is attempted on a non-connected socket.
|
||||||
|
"""
|
||||||
|
|
||||||
|
# conftest.py
|
||||||
|
import pytest
|
||||||
|
from ophyd_devices.utils.socket import SocketSignal
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture(autouse=True)
|
||||||
|
def patch_socket_timeout(monkeypatch):
|
||||||
|
monkeypatch.setattr(SocketSignal, "READBACK_TIMEOUT", 0.0)
|
||||||
@@ -1,13 +1,15 @@
|
|||||||
import copy
|
import copy
|
||||||
|
import inspect
|
||||||
from unittest import mock
|
from unittest import mock
|
||||||
|
|
||||||
|
import numpy as np
|
||||||
import pytest
|
import pytest
|
||||||
from bec_server.device_server.tests.utils import DMMock
|
|
||||||
from ophyd_devices.tests.utils import SocketMock
|
from ophyd_devices.tests.utils import SocketMock
|
||||||
|
|
||||||
from csaxs_bec.devices.npoint.npoint import NPointAxis, NPointController
|
from csaxs_bec.devices.npoint.npoint import NPointAxis, NPointController
|
||||||
from csaxs_bec.devices.omny.galil.fgalil_ophyd import FlomniGalilController, FlomniGalilMotor
|
from csaxs_bec.devices.omny.galil.fgalil_ophyd import FlomniGalilController, FlomniGalilMotor
|
||||||
from csaxs_bec.devices.omny.galil.fupr_ophyd import FuprGalilController, FuprGalilMotor
|
from csaxs_bec.devices.omny.galil.fupr_ophyd import FuprGalilController, FuprGalilMotor
|
||||||
|
from csaxs_bec.devices.omny.galil.galil_rio import GalilRIO, GalilRIOController, GalilRIOSignalRO
|
||||||
from csaxs_bec.devices.omny.galil.lgalil_ophyd import LamniGalilController, LamniGalilMotor
|
from csaxs_bec.devices.omny.galil.lgalil_ophyd import LamniGalilController, LamniGalilMotor
|
||||||
from csaxs_bec.devices.omny.galil.ogalil_ophyd import OMNYGalilController, OMNYGalilMotor
|
from csaxs_bec.devices.omny.galil.ogalil_ophyd import OMNYGalilController, OMNYGalilMotor
|
||||||
from csaxs_bec.devices.omny.galil.sgalil_ophyd import GalilController, SGalilMotor
|
from csaxs_bec.devices.omny.galil.sgalil_ophyd import GalilController, SGalilMotor
|
||||||
@@ -173,9 +175,9 @@ def test_find_reference(leyex, axis_nr, socket_put_messages, socket_get_messages
|
|||||||
assert leyex.controller.sock.buffer_put == socket_put_messages
|
assert leyex.controller.sock.buffer_put == socket_put_messages
|
||||||
|
|
||||||
|
|
||||||
def test_wait_for_connection_called():
|
def test_wait_for_connection_called(dm_with_devices):
|
||||||
"""Test that wait_for_connection is called on all motors that have a socket controller."""
|
"""Test that wait_for_connection is called on all motors that have a socket controller."""
|
||||||
dm = DMMock()
|
dm = dm_with_devices
|
||||||
testable_connections = [
|
testable_connections = [
|
||||||
(NPointAxis, NPointController),
|
(NPointAxis, NPointController),
|
||||||
(FlomniGalilMotor, FlomniGalilController),
|
(FlomniGalilMotor, FlomniGalilController),
|
||||||
@@ -187,6 +189,7 @@ def test_wait_for_connection_called():
|
|||||||
(RtLamniMotor, RtLamniController),
|
(RtLamniMotor, RtLamniController),
|
||||||
(RtOMNYMotor, RtOMNYController),
|
(RtOMNYMotor, RtOMNYController),
|
||||||
(SmaractMotor, SmaractController),
|
(SmaractMotor, SmaractController),
|
||||||
|
(GalilRIO, GalilRIOController),
|
||||||
]
|
]
|
||||||
for motor_cls, controller_cls in testable_connections:
|
for motor_cls, controller_cls in testable_connections:
|
||||||
# Store values to restore later
|
# Store values to restore later
|
||||||
@@ -195,14 +198,20 @@ def test_wait_for_connection_called():
|
|||||||
controller_cls._reset_controller()
|
controller_cls._reset_controller()
|
||||||
controller_cls._axes_per_controller = 3
|
controller_cls._axes_per_controller = 3
|
||||||
|
|
||||||
motor = motor_cls(
|
inspect_args = inspect.getfullargspec(motor_cls.__init__).args
|
||||||
"C",
|
inspect_kwargs = inspect.getfullargspec(motor_cls.__init__).kwonlyargs
|
||||||
name="test_motor",
|
if len(inspect_args) > 1:
|
||||||
host="mpc2680.psi.ch",
|
args = ("C",)
|
||||||
port=8081,
|
else:
|
||||||
socket_cls=SocketMock,
|
args = ()
|
||||||
device_manager=dm,
|
kwargs = {
|
||||||
)
|
"name": "test_motor",
|
||||||
|
"host": "mpc2680.psi.ch",
|
||||||
|
"port": 8081,
|
||||||
|
"device_manager": dm,
|
||||||
|
"socket_cls": SocketMock,
|
||||||
|
}
|
||||||
|
motor = motor_cls(*args, **kwargs)
|
||||||
with mock.patch.object(motor.controller, "on") as mock_on:
|
with mock.patch.object(motor.controller, "on") as mock_on:
|
||||||
|
|
||||||
motor.wait_for_connection(timeout=5.0)
|
motor.wait_for_connection(timeout=5.0)
|
||||||
@@ -219,3 +228,131 @@ def test_wait_for_connection_called():
|
|||||||
finally:
|
finally:
|
||||||
controller_cls._reset_controller()
|
controller_cls._reset_controller()
|
||||||
controller_cls._axes_per_controller = ctrl_axis_backup
|
controller_cls._axes_per_controller = ctrl_axis_backup
|
||||||
|
|
||||||
|
|
||||||
|
########################
|
||||||
|
#### Test Galil RIO ####
|
||||||
|
########################
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def galil_rio(dm_with_devices):
|
||||||
|
try:
|
||||||
|
rio = GalilRIO(
|
||||||
|
name="galil_rio",
|
||||||
|
host="129.129.0.1",
|
||||||
|
socket_cls=SocketMock,
|
||||||
|
device_manager=dm_with_devices,
|
||||||
|
)
|
||||||
|
rio.wait_for_connection()
|
||||||
|
yield rio
|
||||||
|
finally:
|
||||||
|
rio.destroy()
|
||||||
|
|
||||||
|
|
||||||
|
def test_galil_rio_initialization(galil_rio):
|
||||||
|
"""
|
||||||
|
Test that the Galil RIO signal can establish a connection.
|
||||||
|
"""
|
||||||
|
assert galil_rio.controller.connected is True
|
||||||
|
# All signals should be connected if the controller is connected
|
||||||
|
for walk in galil_rio.walk_signals():
|
||||||
|
signal = walk.item
|
||||||
|
assert signal.connected is True
|
||||||
|
|
||||||
|
assert galil_rio.controller._socket_host == "129.129.0.1"
|
||||||
|
assert galil_rio.controller._socket_port == 23 # Default port
|
||||||
|
|
||||||
|
|
||||||
|
def test_galil_rio_signal_read(galil_rio):
|
||||||
|
"""
|
||||||
|
Test that the Galil RIO signal can read values correctly.
|
||||||
|
"""
|
||||||
|
###########
|
||||||
|
## Test read of all channels
|
||||||
|
###########
|
||||||
|
|
||||||
|
assert galil_rio.an_ch0._READ_TIMEOUT == 0.1 # Default read timeout of 100ms
|
||||||
|
# Mock the socket to return specific values
|
||||||
|
galil_rio.controller.sock.buffer_recv = [b" 1.234 2.345 3.456 4.567 5.678 6.789 7.890 8.901"]
|
||||||
|
galil_rio._last_readback = 0 # Force read from controller
|
||||||
|
|
||||||
|
read_values = galil_rio.read()
|
||||||
|
assert len(read_values) == 8 # 8 channels
|
||||||
|
expected_values = {
|
||||||
|
galil_rio.an_ch0.name: {"value": 1.234},
|
||||||
|
galil_rio.an_ch1.name: {"value": 2.345},
|
||||||
|
galil_rio.an_ch2.name: {"value": 3.456},
|
||||||
|
galil_rio.an_ch3.name: {"value": 4.567},
|
||||||
|
galil_rio.an_ch4.name: {"value": 5.678},
|
||||||
|
galil_rio.an_ch5.name: {"value": 6.789},
|
||||||
|
galil_rio.an_ch6.name: {"value": 7.890},
|
||||||
|
galil_rio.an_ch7.name: {"value": 8.901},
|
||||||
|
}
|
||||||
|
# All timestamps should be the same
|
||||||
|
assert all(
|
||||||
|
ret["timestamp"] == read_values[galil_rio.an_ch0.name]["timestamp"]
|
||||||
|
for signal_name, ret in read_values.items()
|
||||||
|
)
|
||||||
|
# Check values
|
||||||
|
for signal_name, expected in expected_values.items():
|
||||||
|
assert np.isclose(read_values[signal_name]["value"], expected["value"])
|
||||||
|
assert "timestamp" in read_values[signal_name]
|
||||||
|
|
||||||
|
# Check communication command to socker
|
||||||
|
assert galil_rio.controller.sock.buffer_put == [
|
||||||
|
b"MG@AN[0],@AN[1],@AN[2],@AN[3],@AN[4],@AN[5],@AN[6],@AN[7]\r"
|
||||||
|
]
|
||||||
|
|
||||||
|
###########
|
||||||
|
## Test read of single channel with callback
|
||||||
|
###########
|
||||||
|
|
||||||
|
# Add callback to update readback
|
||||||
|
value_callback_buffer: list[tuple] = []
|
||||||
|
|
||||||
|
def value_callback(value, old_value, **kwargs):
|
||||||
|
obj = kwargs.get("obj")
|
||||||
|
galil = obj.parent
|
||||||
|
readback = galil.read()
|
||||||
|
value_callback_buffer.append(readback)
|
||||||
|
|
||||||
|
galil_rio.an_ch0.subscribe(value_callback, run=False)
|
||||||
|
galil_rio.controller.sock.buffer_recv = [b" 2.5 2.6 2.7 2.8 2.9 3.0 3.1 3.2"]
|
||||||
|
expected_values = [2.5, 2.6, 2.7, 2.8, 2.9, 3.0, 3.1, 3.2]
|
||||||
|
|
||||||
|
##################
|
||||||
|
## Test cached readback
|
||||||
|
##################
|
||||||
|
|
||||||
|
# Should have used the cached value
|
||||||
|
for walk in galil_rio.walk_signals():
|
||||||
|
walk.item._READ_TIMEOUT = 10 # Make sure cached read is used
|
||||||
|
ret = galil_rio.an_ch0.read()
|
||||||
|
|
||||||
|
# Should not trigger callback since value did not change
|
||||||
|
assert np.isclose(ret[galil_rio.an_ch0.name]["value"], 1.234)
|
||||||
|
# Same timestamp as for another channel as this is cached read
|
||||||
|
assert np.isclose(ret[galil_rio.an_ch0.name]["timestamp"], galil_rio.an_ch7.timestamp)
|
||||||
|
assert len(value_callback_buffer) == 0
|
||||||
|
|
||||||
|
##################
|
||||||
|
## Test unchached read from controller
|
||||||
|
##################
|
||||||
|
|
||||||
|
# Now force a read from the controller
|
||||||
|
galil_rio._last_readback = 0 # Force read from controller
|
||||||
|
ret = galil_rio.an_ch0.read()
|
||||||
|
|
||||||
|
assert np.isclose(ret[galil_rio.an_ch0.name]["value"], 2.5)
|
||||||
|
|
||||||
|
# Check callback invocation, but only 1 callback even with galil_rio.read() call in callback
|
||||||
|
assert len(value_callback_buffer) == 1
|
||||||
|
values = [value["value"] for value in value_callback_buffer[0].values()]
|
||||||
|
assert np.isclose(values, expected_values).all()
|
||||||
|
assert all(
|
||||||
|
[
|
||||||
|
value["timestamp"] == value_callback_buffer[0][galil_rio.an_ch0.name]["timestamp"]
|
||||||
|
for value in value_callback_buffer[0].values()
|
||||||
|
]
|
||||||
|
)
|
||||||
|
|||||||
Reference in New Issue
Block a user