Compare commits

..

4 Commits

Author SHA1 Message Date
afdc64e296 fix(rio): fix rio cached readings
All checks were successful
CI for csaxs_bec / test (push) Successful in 1m44s
CI for csaxs_bec / test (pull_request) Successful in 1m40s
2026-02-26 16:15:29 +01:00
bc31c00e1f fix(tests): x_ray_eye_align correct imports fixed after refactor of LamNI
All checks were successful
CI for csaxs_bec / test (pull_request) Successful in 1m36s
CI for csaxs_bec / test (push) Successful in 1m37s
2026-02-23 13:25:09 +01:00
x01dc
38671f074e minor printout fix
Some checks failed
CI for csaxs_bec / test (pull_request) Failing after 1m30s
CI for csaxs_bec / test (push) Failing after 1m32s
2026-02-23 12:44:04 +01:00
x01dc
92e39a5f75 minor adjmustment 2026-02-23 12:35:56 +01:00
4 changed files with 39 additions and 19 deletions

View File

@@ -210,13 +210,11 @@ class LamNI(LamNIOpticsMixin):
self.feedback_status() self.feedback_status()
def feedback_status(self): def feedback_status(self):
if self.device_manager.devices.rtx.controller.feedback_is_running(): self.device_manager.devices.rtx.controller.show_feedback_status()
print("The rt feedback is \x1b[92mrunning\x1b[0m.")
else:
print("The rt feedback is \x1b[91mNOT\x1b[0m running.")
def show_interferometer_positions(self): def show_interferometer_positions(self):
self.device_manager.devices.rtx.controller.show_interferometer_positions() self.device_manager.devices.rtx.controller.show_feedback_status()
def show_signal_strength(self): def show_signal_strength(self):
self.device_manager.devices.rtx.controller.show_signal_strength_interferometer() self.device_manager.devices.rtx.controller.show_signal_strength_interferometer()

View File

@@ -13,6 +13,7 @@ which can be easily supported by changing the _NUM_DIGITAL_OUTPUT_CHANNELS varia
from __future__ import annotations from __future__ import annotations
import time
from typing import TYPE_CHECKING, Literal from typing import TYPE_CHECKING, Literal
from bec_lib.logger import bec_logger from bec_lib.logger import bec_logger
@@ -78,12 +79,38 @@ class GalilRIOAnalogSignalRO(GalilSignalBase):
""" """
_NUM_ANALOG_CHANNELS = 8 _NUM_ANALOG_CHANNELS = 8
READBACK_TIMEOUT = 0.1 # time to wait in between two readback attemps in seconds, otherwise return cached value
def __init__(self, signal_name: str, channel: int, parent: GalilRIO, **kwargs): def __init__(
self,
signal_name: str,
channel: int,
parent: GalilRIO,
readback_timeout: float = None,
**kwargs,
):
super().__init__(signal_name=signal_name, parent=parent, **kwargs) super().__init__(signal_name=signal_name, parent=parent, **kwargs)
self._channel = channel self._channel = channel
self._metadata["connected"] = False self._metadata["connected"] = False
self._readback_timeout = (
readback_timeout if readback_timeout is not None else self.READBACK_TIMEOUT
)
self._metadata["write_access"] = False self._metadata["write_access"] = False
self._last_readback = 0.0
def get(self):
current_time = time.monotonic()
if current_time - self._last_readback > self._readback_timeout:
old_value = self._readback
self._last_readback = current_time # _socket_get may rely on this value to be set.
self._readback = self._socket_get()
self._run_subs(
sub_type=self.SUB_VALUE,
old_value=old_value,
value=self._readback,
timestamp=current_time,
)
return self._readback
def _socket_set(self, val): def _socket_set(self, val):
"""Read-only signal, so set method raises an error.""" """Read-only signal, so set method raises an error."""
@@ -136,6 +163,8 @@ class GalilRIOAnalogSignalRO(GalilSignalBase):
# Run subscriptions after all readbacks have been updated # Run subscriptions after all readbacks have been updated
# on all channels except the one that triggered the update # on all channels except the one that triggered the update
# TODO for now skip running subscribers, this should be re-implemented
# once we properly handle subscriptions from bec running "read"
for walk in self.parent.walk_signals(): for walk in self.parent.walk_signals():
if walk.item.attr_name in updates: if walk.item.attr_name in updates:
new_val, old_val = updates[walk.item.attr_name] new_val, old_val = updates[walk.item.attr_name]
@@ -185,7 +214,7 @@ def _create_analog_channels(num_channels: int) -> dict[str, tuple]:
an_channels[f"ch{i}"] = ( an_channels[f"ch{i}"] = (
GalilRIOAnalogSignalRO, GalilRIOAnalogSignalRO,
f"ch{i}", f"ch{i}",
{"kind": Kind.normal, "notify_bec": True, "channel": i, "doc": f"Analog channel {i}."}, {"kind": Kind.normal, "channel": i, "doc": f"Analog channel {i}."},
) )
return an_channels return an_channels
@@ -202,12 +231,7 @@ def _create_digital_output_channels(num_channels: int) -> dict[str, tuple]:
di_out_channels[f"ch{i}"] = ( di_out_channels[f"ch{i}"] = (
GalilRIODigitalOutSignal, GalilRIODigitalOutSignal,
f"ch{i}", f"ch{i}",
{ {"kind": Kind.config, "channel": i, "doc": f"Digital output channel {i}."},
"kind": Kind.config,
"notify_bec": True,
"channel": i,
"doc": f"Digital output channel {i}.",
},
) )
return di_out_channels return di_out_channels

View File

@@ -65,10 +65,8 @@ class RtLamniController(Controller):
"_position_sampling_single_read", "_position_sampling_single_read",
"_position_sampling_single_reset_and_start_sampling", "_position_sampling_single_reset_and_start_sampling",
"show_signal_strength_interferometer", "show_signal_strength_interferometer",
"show_interferometer_positions",
"show_analog_signals", "show_analog_signals",
"show_feedback_status", "show_feedback_status",
] ]
def __init__( def __init__(

View File

@@ -35,16 +35,16 @@ def test_save_frame(bec_client_mock):
lamni = LamNI(client) lamni = LamNI(client)
align = XrayEyeAlign(client, lamni) align = XrayEyeAlign(client, lamni)
with mock.patch( with mock.patch(
"csaxs_bec.bec_ipython_client.plugins.LamNI.x_ray_eye_align.epics_put" "csaxs_bec.bec_ipython_client.plugins.LamNI.alignment.epics_put"
) as epics_put_mock: ) as epics_put_mock:
align.save_frame() align.save_frame()
epics_put_mock.assert_called_once_with("XOMNYI-XEYE-SAVFRAME:0", 1) epics_put_mock.assert_called_once_with("XOMNYI-XEYE-SAVFRAME:0", 1)
def test_update_frame(bec_client_mock): def test_update_frame(bec_client_mock):
epics_put = "csaxs_bec.bec_ipython_client.plugins.LamNI.x_ray_eye_align.epics_put" epics_put = "csaxs_bec.bec_ipython_client.plugins.LamNI.alignment.epics_put"
epics_get = "csaxs_bec.bec_ipython_client.plugins.LamNI.x_ray_eye_align.epics_get" epics_get = "csaxs_bec.bec_ipython_client.plugins.LamNI.alignment.epics_get"
fshopen = "csaxs_bec.bec_ipython_client.plugins.LamNI.x_ray_eye_align.fshopen" fshopen = "csaxs_bec.bec_ipython_client.plugins.LamNI.alignment.fshopen"
client = bec_client_mock client = bec_client_mock
client.device_manager.devices.xeye = DeviceBase( client.device_manager.devices.xeye = DeviceBase(
name="xeye", name="xeye",