Compare commits
4 Commits
refactor/m
...
fix/remove
| Author | SHA1 | Date | |
|---|---|---|---|
| afdc64e296 | |||
| bc31c00e1f | |||
|
|
38671f074e | ||
|
|
92e39a5f75 |
@@ -210,13 +210,11 @@ class LamNI(LamNIOpticsMixin):
|
|||||||
self.feedback_status()
|
self.feedback_status()
|
||||||
|
|
||||||
def feedback_status(self):
|
def feedback_status(self):
|
||||||
if self.device_manager.devices.rtx.controller.feedback_is_running():
|
self.device_manager.devices.rtx.controller.show_feedback_status()
|
||||||
print("The rt feedback is \x1b[92mrunning\x1b[0m.")
|
|
||||||
else:
|
|
||||||
print("The rt feedback is \x1b[91mNOT\x1b[0m running.")
|
|
||||||
|
|
||||||
def show_interferometer_positions(self):
|
def show_interferometer_positions(self):
|
||||||
self.device_manager.devices.rtx.controller.show_interferometer_positions()
|
self.device_manager.devices.rtx.controller.show_feedback_status()
|
||||||
|
|
||||||
def show_signal_strength(self):
|
def show_signal_strength(self):
|
||||||
self.device_manager.devices.rtx.controller.show_signal_strength_interferometer()
|
self.device_manager.devices.rtx.controller.show_signal_strength_interferometer()
|
||||||
|
|||||||
@@ -13,6 +13,7 @@ which can be easily supported by changing the _NUM_DIGITAL_OUTPUT_CHANNELS varia
|
|||||||
|
|
||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import time
|
||||||
from typing import TYPE_CHECKING, Literal
|
from typing import TYPE_CHECKING, Literal
|
||||||
|
|
||||||
from bec_lib.logger import bec_logger
|
from bec_lib.logger import bec_logger
|
||||||
@@ -78,12 +79,38 @@ class GalilRIOAnalogSignalRO(GalilSignalBase):
|
|||||||
"""
|
"""
|
||||||
|
|
||||||
_NUM_ANALOG_CHANNELS = 8
|
_NUM_ANALOG_CHANNELS = 8
|
||||||
|
READBACK_TIMEOUT = 0.1 # time to wait in between two readback attemps in seconds, otherwise return cached value
|
||||||
|
|
||||||
def __init__(self, signal_name: str, channel: int, parent: GalilRIO, **kwargs):
|
def __init__(
|
||||||
|
self,
|
||||||
|
signal_name: str,
|
||||||
|
channel: int,
|
||||||
|
parent: GalilRIO,
|
||||||
|
readback_timeout: float = None,
|
||||||
|
**kwargs,
|
||||||
|
):
|
||||||
super().__init__(signal_name=signal_name, parent=parent, **kwargs)
|
super().__init__(signal_name=signal_name, parent=parent, **kwargs)
|
||||||
self._channel = channel
|
self._channel = channel
|
||||||
self._metadata["connected"] = False
|
self._metadata["connected"] = False
|
||||||
|
self._readback_timeout = (
|
||||||
|
readback_timeout if readback_timeout is not None else self.READBACK_TIMEOUT
|
||||||
|
)
|
||||||
self._metadata["write_access"] = False
|
self._metadata["write_access"] = False
|
||||||
|
self._last_readback = 0.0
|
||||||
|
|
||||||
|
def get(self):
|
||||||
|
current_time = time.monotonic()
|
||||||
|
if current_time - self._last_readback > self._readback_timeout:
|
||||||
|
old_value = self._readback
|
||||||
|
self._last_readback = current_time # _socket_get may rely on this value to be set.
|
||||||
|
self._readback = self._socket_get()
|
||||||
|
self._run_subs(
|
||||||
|
sub_type=self.SUB_VALUE,
|
||||||
|
old_value=old_value,
|
||||||
|
value=self._readback,
|
||||||
|
timestamp=current_time,
|
||||||
|
)
|
||||||
|
return self._readback
|
||||||
|
|
||||||
def _socket_set(self, val):
|
def _socket_set(self, val):
|
||||||
"""Read-only signal, so set method raises an error."""
|
"""Read-only signal, so set method raises an error."""
|
||||||
@@ -136,6 +163,8 @@ class GalilRIOAnalogSignalRO(GalilSignalBase):
|
|||||||
|
|
||||||
# Run subscriptions after all readbacks have been updated
|
# Run subscriptions after all readbacks have been updated
|
||||||
# on all channels except the one that triggered the update
|
# on all channels except the one that triggered the update
|
||||||
|
# TODO for now skip running subscribers, this should be re-implemented
|
||||||
|
# once we properly handle subscriptions from bec running "read"
|
||||||
for walk in self.parent.walk_signals():
|
for walk in self.parent.walk_signals():
|
||||||
if walk.item.attr_name in updates:
|
if walk.item.attr_name in updates:
|
||||||
new_val, old_val = updates[walk.item.attr_name]
|
new_val, old_val = updates[walk.item.attr_name]
|
||||||
@@ -185,7 +214,7 @@ def _create_analog_channels(num_channels: int) -> dict[str, tuple]:
|
|||||||
an_channels[f"ch{i}"] = (
|
an_channels[f"ch{i}"] = (
|
||||||
GalilRIOAnalogSignalRO,
|
GalilRIOAnalogSignalRO,
|
||||||
f"ch{i}",
|
f"ch{i}",
|
||||||
{"kind": Kind.normal, "notify_bec": True, "channel": i, "doc": f"Analog channel {i}."},
|
{"kind": Kind.normal, "channel": i, "doc": f"Analog channel {i}."},
|
||||||
)
|
)
|
||||||
return an_channels
|
return an_channels
|
||||||
|
|
||||||
@@ -202,12 +231,7 @@ def _create_digital_output_channels(num_channels: int) -> dict[str, tuple]:
|
|||||||
di_out_channels[f"ch{i}"] = (
|
di_out_channels[f"ch{i}"] = (
|
||||||
GalilRIODigitalOutSignal,
|
GalilRIODigitalOutSignal,
|
||||||
f"ch{i}",
|
f"ch{i}",
|
||||||
{
|
{"kind": Kind.config, "channel": i, "doc": f"Digital output channel {i}."},
|
||||||
"kind": Kind.config,
|
|
||||||
"notify_bec": True,
|
|
||||||
"channel": i,
|
|
||||||
"doc": f"Digital output channel {i}.",
|
|
||||||
},
|
|
||||||
)
|
)
|
||||||
return di_out_channels
|
return di_out_channels
|
||||||
|
|
||||||
|
|||||||
@@ -65,10 +65,8 @@ class RtLamniController(Controller):
|
|||||||
"_position_sampling_single_read",
|
"_position_sampling_single_read",
|
||||||
"_position_sampling_single_reset_and_start_sampling",
|
"_position_sampling_single_reset_and_start_sampling",
|
||||||
"show_signal_strength_interferometer",
|
"show_signal_strength_interferometer",
|
||||||
"show_interferometer_positions",
|
|
||||||
"show_analog_signals",
|
"show_analog_signals",
|
||||||
"show_feedback_status",
|
"show_feedback_status",
|
||||||
|
|
||||||
]
|
]
|
||||||
|
|
||||||
def __init__(
|
def __init__(
|
||||||
|
|||||||
@@ -35,16 +35,16 @@ def test_save_frame(bec_client_mock):
|
|||||||
lamni = LamNI(client)
|
lamni = LamNI(client)
|
||||||
align = XrayEyeAlign(client, lamni)
|
align = XrayEyeAlign(client, lamni)
|
||||||
with mock.patch(
|
with mock.patch(
|
||||||
"csaxs_bec.bec_ipython_client.plugins.LamNI.x_ray_eye_align.epics_put"
|
"csaxs_bec.bec_ipython_client.plugins.LamNI.alignment.epics_put"
|
||||||
) as epics_put_mock:
|
) as epics_put_mock:
|
||||||
align.save_frame()
|
align.save_frame()
|
||||||
epics_put_mock.assert_called_once_with("XOMNYI-XEYE-SAVFRAME:0", 1)
|
epics_put_mock.assert_called_once_with("XOMNYI-XEYE-SAVFRAME:0", 1)
|
||||||
|
|
||||||
|
|
||||||
def test_update_frame(bec_client_mock):
|
def test_update_frame(bec_client_mock):
|
||||||
epics_put = "csaxs_bec.bec_ipython_client.plugins.LamNI.x_ray_eye_align.epics_put"
|
epics_put = "csaxs_bec.bec_ipython_client.plugins.LamNI.alignment.epics_put"
|
||||||
epics_get = "csaxs_bec.bec_ipython_client.plugins.LamNI.x_ray_eye_align.epics_get"
|
epics_get = "csaxs_bec.bec_ipython_client.plugins.LamNI.alignment.epics_get"
|
||||||
fshopen = "csaxs_bec.bec_ipython_client.plugins.LamNI.x_ray_eye_align.fshopen"
|
fshopen = "csaxs_bec.bec_ipython_client.plugins.LamNI.alignment.fshopen"
|
||||||
client = bec_client_mock
|
client = bec_client_mock
|
||||||
client.device_manager.devices.xeye = DeviceBase(
|
client.device_manager.devices.xeye = DeviceBase(
|
||||||
name="xeye",
|
name="xeye",
|
||||||
|
|||||||
Reference in New Issue
Block a user