Compare commits
4 Commits
refactor/m
...
fix/remove
| Author | SHA1 | Date | |
|---|---|---|---|
| afdc64e296 | |||
| bc31c00e1f | |||
|
|
38671f074e | ||
|
|
92e39a5f75 |
@@ -210,13 +210,11 @@ class LamNI(LamNIOpticsMixin):
|
||||
self.feedback_status()
|
||||
|
||||
def feedback_status(self):
|
||||
if self.device_manager.devices.rtx.controller.feedback_is_running():
|
||||
print("The rt feedback is \x1b[92mrunning\x1b[0m.")
|
||||
else:
|
||||
print("The rt feedback is \x1b[91mNOT\x1b[0m running.")
|
||||
self.device_manager.devices.rtx.controller.show_feedback_status()
|
||||
|
||||
|
||||
def show_interferometer_positions(self):
|
||||
self.device_manager.devices.rtx.controller.show_interferometer_positions()
|
||||
self.device_manager.devices.rtx.controller.show_feedback_status()
|
||||
|
||||
def show_signal_strength(self):
|
||||
self.device_manager.devices.rtx.controller.show_signal_strength_interferometer()
|
||||
|
||||
@@ -13,6 +13,7 @@ which can be easily supported by changing the _NUM_DIGITAL_OUTPUT_CHANNELS varia
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import time
|
||||
from typing import TYPE_CHECKING, Literal
|
||||
|
||||
from bec_lib.logger import bec_logger
|
||||
@@ -78,12 +79,38 @@ class GalilRIOAnalogSignalRO(GalilSignalBase):
|
||||
"""
|
||||
|
||||
_NUM_ANALOG_CHANNELS = 8
|
||||
READBACK_TIMEOUT = 0.1 # time to wait in between two readback attemps in seconds, otherwise return cached value
|
||||
|
||||
def __init__(self, signal_name: str, channel: int, parent: GalilRIO, **kwargs):
|
||||
def __init__(
|
||||
self,
|
||||
signal_name: str,
|
||||
channel: int,
|
||||
parent: GalilRIO,
|
||||
readback_timeout: float = None,
|
||||
**kwargs,
|
||||
):
|
||||
super().__init__(signal_name=signal_name, parent=parent, **kwargs)
|
||||
self._channel = channel
|
||||
self._metadata["connected"] = False
|
||||
self._readback_timeout = (
|
||||
readback_timeout if readback_timeout is not None else self.READBACK_TIMEOUT
|
||||
)
|
||||
self._metadata["write_access"] = False
|
||||
self._last_readback = 0.0
|
||||
|
||||
def get(self):
|
||||
current_time = time.monotonic()
|
||||
if current_time - self._last_readback > self._readback_timeout:
|
||||
old_value = self._readback
|
||||
self._last_readback = current_time # _socket_get may rely on this value to be set.
|
||||
self._readback = self._socket_get()
|
||||
self._run_subs(
|
||||
sub_type=self.SUB_VALUE,
|
||||
old_value=old_value,
|
||||
value=self._readback,
|
||||
timestamp=current_time,
|
||||
)
|
||||
return self._readback
|
||||
|
||||
def _socket_set(self, val):
|
||||
"""Read-only signal, so set method raises an error."""
|
||||
@@ -136,6 +163,8 @@ class GalilRIOAnalogSignalRO(GalilSignalBase):
|
||||
|
||||
# Run subscriptions after all readbacks have been updated
|
||||
# on all channels except the one that triggered the update
|
||||
# TODO for now skip running subscribers, this should be re-implemented
|
||||
# once we properly handle subscriptions from bec running "read"
|
||||
for walk in self.parent.walk_signals():
|
||||
if walk.item.attr_name in updates:
|
||||
new_val, old_val = updates[walk.item.attr_name]
|
||||
@@ -185,7 +214,7 @@ def _create_analog_channels(num_channels: int) -> dict[str, tuple]:
|
||||
an_channels[f"ch{i}"] = (
|
||||
GalilRIOAnalogSignalRO,
|
||||
f"ch{i}",
|
||||
{"kind": Kind.normal, "notify_bec": True, "channel": i, "doc": f"Analog channel {i}."},
|
||||
{"kind": Kind.normal, "channel": i, "doc": f"Analog channel {i}."},
|
||||
)
|
||||
return an_channels
|
||||
|
||||
@@ -202,12 +231,7 @@ def _create_digital_output_channels(num_channels: int) -> dict[str, tuple]:
|
||||
di_out_channels[f"ch{i}"] = (
|
||||
GalilRIODigitalOutSignal,
|
||||
f"ch{i}",
|
||||
{
|
||||
"kind": Kind.config,
|
||||
"notify_bec": True,
|
||||
"channel": i,
|
||||
"doc": f"Digital output channel {i}.",
|
||||
},
|
||||
{"kind": Kind.config, "channel": i, "doc": f"Digital output channel {i}."},
|
||||
)
|
||||
return di_out_channels
|
||||
|
||||
|
||||
@@ -65,10 +65,8 @@ class RtLamniController(Controller):
|
||||
"_position_sampling_single_read",
|
||||
"_position_sampling_single_reset_and_start_sampling",
|
||||
"show_signal_strength_interferometer",
|
||||
"show_interferometer_positions",
|
||||
"show_analog_signals",
|
||||
"show_feedback_status",
|
||||
|
||||
]
|
||||
|
||||
def __init__(
|
||||
|
||||
@@ -35,16 +35,16 @@ def test_save_frame(bec_client_mock):
|
||||
lamni = LamNI(client)
|
||||
align = XrayEyeAlign(client, lamni)
|
||||
with mock.patch(
|
||||
"csaxs_bec.bec_ipython_client.plugins.LamNI.x_ray_eye_align.epics_put"
|
||||
"csaxs_bec.bec_ipython_client.plugins.LamNI.alignment.epics_put"
|
||||
) as epics_put_mock:
|
||||
align.save_frame()
|
||||
epics_put_mock.assert_called_once_with("XOMNYI-XEYE-SAVFRAME:0", 1)
|
||||
|
||||
|
||||
def test_update_frame(bec_client_mock):
|
||||
epics_put = "csaxs_bec.bec_ipython_client.plugins.LamNI.x_ray_eye_align.epics_put"
|
||||
epics_get = "csaxs_bec.bec_ipython_client.plugins.LamNI.x_ray_eye_align.epics_get"
|
||||
fshopen = "csaxs_bec.bec_ipython_client.plugins.LamNI.x_ray_eye_align.fshopen"
|
||||
epics_put = "csaxs_bec.bec_ipython_client.plugins.LamNI.alignment.epics_put"
|
||||
epics_get = "csaxs_bec.bec_ipython_client.plugins.LamNI.alignment.epics_get"
|
||||
fshopen = "csaxs_bec.bec_ipython_client.plugins.LamNI.alignment.fshopen"
|
||||
client = bec_client_mock
|
||||
client.device_manager.devices.xeye = DeviceBase(
|
||||
name="xeye",
|
||||
|
||||
Reference in New Issue
Block a user