w
All checks were successful
CI for csaxs_bec / test (pull_request) Successful in 1m29s
CI for csaxs_bec / test (push) Successful in 1m31s

This commit is contained in:
2026-01-26 10:55:45 +01:00
parent 8200d3f7c5
commit 8c7e8ab34f
2 changed files with 86 additions and 36 deletions

View File

@@ -73,7 +73,7 @@ class GalilRIOSignalRO(GalilSignalRO):
"""
_NUM_ANALOG_CHANNELS = 8
_READ_TIMEOUT = 0.5 # seconds
_READ_TIMEOUT = 0.1 # seconds
def __init__(self, signal_name: str, channel: int, parent: GalilRIO, **kwargs):
super().__init__(signal_name, parent=parent, **kwargs)
@@ -92,11 +92,8 @@ class GalilRIOSignalRO(GalilSignalRO):
def get(self):
"""Get current analog channel values from the Galil RIO controller."""
# If the last readback has happend more than 0.5 seconds ago, read all channels again
logger.debug(f"Reading value for {self.name}, last readback at {self.parent.last_readback}")
if time.monotonic() - self.parent.last_readback < 0.5: # self._READ_TIMEOUT:
logger.debug(f"Using cached readback value for {self.name}: {self._readback}")
return self._readback
self._readback = self._socket_get()
if time.monotonic() - self.parent.last_readback > self._READ_TIMEOUT:
self._readback = self._socket_get()
return self._readback
# pylint: disable=protected-access
@@ -112,6 +109,8 @@ class GalilRIOSignalRO(GalilSignalRO):
timestamp = time.time()
# NOTE: Update parent's last readback before running subscriptions!!
self.parent._last_readback = time.monotonic()
updates: dict[str, tuple[float, float]] = {} # attr_name -> (new_val, old_val)
# First we update all readbacks
for walk in self.parent.walk_signals():
if walk.item.attr_name.startswith("an_ch"):
idx = int(walk.item.attr_name[-1])
@@ -120,12 +119,18 @@ class GalilRIOSignalRO(GalilSignalRO):
new_val = values[idx]
walk.item._metadata["timestamp"] = timestamp
walk.item._readback = new_val
walk.item._run_subs(
sub_type=walk.item.SUB_VALUE,
old_value=old_val,
value=new_val,
timestamp=timestamp,
)
updates[walk.item.attr_name] = (new_val, old_val)
# Then we run all subscriptions as this ensures that all readbacks are updated
for walk in self.parent.walk_signals():
if walk.item.attr_name in updates:
new_val, old_val = updates[walk.item.attr_name]
walk.item._run_subs(
sub_type=walk.item.SUB_VALUE,
old_value=old_val,
value=new_val,
timestamp=timestamp,
)
class GalilRIO(PSIDeviceBase):

View File

@@ -1,4 +1,5 @@
import copy
import inspect
from unittest import mock
import numpy as np
@@ -197,14 +198,20 @@ def test_wait_for_connection_called(dm_with_devices):
controller_cls._reset_controller()
controller_cls._axes_per_controller = 3
motor = motor_cls(
"C",
name="test_motor",
host="mpc2680.psi.ch",
port=8081,
socket_cls=SocketMock,
device_manager=dm,
)
inspect_args = inspect.getfullargspec(motor_cls.__init__).args
inspect_kwargs = inspect.getfullargspec(motor_cls.__init__).kwonlyargs
if len(inspect_args) > 1:
args = ("C",)
else:
args = ()
kwargs = {
"name": "test_motor",
"host": "mpc2680.psi.ch",
"port": 8081,
"device_manager": dm,
"socket_cls": SocketMock,
}
motor = motor_cls(*args, **kwargs)
with mock.patch.object(motor.controller, "on") as mock_on:
motor.wait_for_connection(timeout=5.0)
@@ -261,9 +268,14 @@ def test_galil_rio_signal_read(galil_rio):
"""
Test that the Galil RIO signal can read values correctly.
"""
###########
## Test read of all channels
###########
assert galil_rio.an_ch0._READ_TIMEOUT == 0.1 # Default read timeout of 100ms
# Mock the socket to return specific values
galil_rio.controller.sock.buffer_recv = [b" 1.234 2.345 3.456 4.567 5.678 6.789 7.890 8.901"]
galil_rio._last_readback = 0 # Force read from controller
read_values = galil_rio.read()
assert len(read_values) == 8 # 8 channels
@@ -277,6 +289,12 @@ def test_galil_rio_signal_read(galil_rio):
galil_rio.an_ch6.name: {"value": 7.890},
galil_rio.an_ch7.name: {"value": 8.901},
}
# All timestamps should be the same
assert all(
ret["timestamp"] == read_values[galil_rio.an_ch0.name]["timestamp"]
for signal_name, ret in read_values.items()
)
# Check values
for signal_name, expected in expected_values.items():
assert np.isclose(read_values[signal_name]["value"], expected["value"])
assert "timestamp" in read_values[signal_name]
@@ -286,28 +304,55 @@ def test_galil_rio_signal_read(galil_rio):
b"MG@AN[0],@AN[1],@AN[2],@AN[3],@AN[4],@AN[5],@AN[6],@AN[7]\r"
]
###########
## Test read of single channel with callback
###########
# Add callback to update readback
value_callback_buffer: list[tuple] = []
def value_callback(value, old_value, **kwargs):
value_callback_buffer.append((value, old_value))
obj = kwargs.get("obj")
galil = obj.parent
readback = galil.read()
value_callback_buffer.append(readback)
galil_rio.an_ch0.subscribe(value_callback)
galil_rio.controller.sock.buffer_recv = [b" 1.234"]
galil_rio.an_ch0.subscribe(value_callback, run=False)
galil_rio.controller.sock.buffer_recv = [b" 2.5 2.6 2.7 2.8 2.9 3.0 3.1 3.2"]
expected_values = [2.5, 2.6, 2.7, 2.8, 2.9, 3.0, 3.1, 3.2]
val = galil_rio.an_ch0.read()
##################
## Test cached readback
##################
assert np.isclose(val[galil_rio.an_ch0.name]["value"], 1.234)
# Should have used the cached value
for walk in galil_rio.walk_signals():
walk.item._READ_TIMEOUT = 10 # Make sure cached read is used
ret = galil_rio.an_ch0.read()
# Check callback invocation
# Should not trigger callback since value did not change
assert np.isclose(ret[galil_rio.an_ch0.name]["value"], 1.234)
# Same timestamp as for another channel as this is cached read
assert np.isclose(ret[galil_rio.an_ch0.name]["timestamp"], galil_rio.an_ch7.timestamp)
assert len(value_callback_buffer) == 0
##################
## Test unchached read from controller
##################
# Now force a read from the controller
galil_rio._last_readback = 0 # Force read from controller
ret = galil_rio.an_ch0.read()
assert np.isclose(ret[galil_rio.an_ch0.name]["value"], 2.5)
# Check callback invocation, but only 1 callback even with galil_rio.read() call in callback
assert len(value_callback_buffer) == 1
# Check callback values match expected
assert np.isclose(value_callback_buffer[0], (1.234, 0)).all()
# Test another read with different value
value_callback_buffer.clear()
galil_rio.controller.sock.buffer_recv = [b" 2.5"]
val = galil_rio.an_ch0.read()
assert np.isclose(value_callback_buffer[0], (2.5, 1.234)).all()
values = [value["value"] for value in value_callback_buffer[0].values()]
assert np.isclose(values, expected_values).all()
assert all(
[
value["timestamp"] == value_callback_buffer[0][galil_rio.an_ch0.name]["timestamp"]
for value in value_callback_buffer[0].values()
]
)