feat(galil-rio): Add di_out channels to GalilRIO
This commit is contained in:
@@ -4,6 +4,7 @@ This module contains the base class for Galil controllers as well as the signals
|
||||
|
||||
import functools
|
||||
import time
|
||||
from typing import Any
|
||||
|
||||
from bec_lib import bec_logger
|
||||
from ophyd.utils import ReadOnlyError
|
||||
@@ -347,7 +348,23 @@ class GalilSignalBase(SocketSignal):
|
||||
def __init__(self, signal_name, **kwargs):
|
||||
self.signal_name = signal_name
|
||||
super().__init__(**kwargs)
|
||||
self.controller = self.parent.controller
|
||||
self.controller: Controller = self._find_attribute_recursively("controller")
|
||||
|
||||
def _find_attribute_recursively(self, attribute: str) -> Any:
|
||||
"""
|
||||
Find an attribute recursively for nested sub-devices.
|
||||
This is needed to find for example the controller instance for DDC components,
|
||||
thus nested devices.
|
||||
"""
|
||||
max_depth = 10 # to prevent infinite recursion
|
||||
current_parent = self.parent
|
||||
depth = 0
|
||||
while current_parent is not None and depth < max_depth:
|
||||
if hasattr(current_parent, attribute):
|
||||
return getattr(current_parent, attribute)
|
||||
current_parent = getattr(current_parent, "parent", None)
|
||||
depth += 1
|
||||
raise RuntimeError(f"Attribute '{attribute}' not found within maximum depth {max_depth}.")
|
||||
|
||||
|
||||
class GalilSignalRO(GalilSignalBase):
|
||||
|
||||
@@ -13,16 +13,19 @@ over TCP/IP. It also provides a device integration that interfaces to these
|
||||
from __future__ import annotations
|
||||
|
||||
import time
|
||||
from typing import TYPE_CHECKING
|
||||
from typing import TYPE_CHECKING, Literal, Type
|
||||
|
||||
from bec_lib.logger import bec_logger
|
||||
from ophyd import Component as Cpt
|
||||
from ophyd import DynamicDeviceComponent as DDC
|
||||
from ophyd import Kind
|
||||
from ophyd.utils import ReadOnlyError
|
||||
from ophyd_devices import PSIDeviceBase
|
||||
from ophyd_devices.utils.controller import Controller, threadlocked
|
||||
from ophyd_devices.utils.socket import SocketIO
|
||||
|
||||
from csaxs_bec.devices.omny.galil.galil_ophyd import (
|
||||
GalilCommunicationError,
|
||||
GalilSignalBase,
|
||||
GalilSignalRO,
|
||||
retry_once,
|
||||
)
|
||||
@@ -64,7 +67,7 @@ class GalilRIOController(Controller):
|
||||
)
|
||||
|
||||
|
||||
class GalilRIOSignalRO(GalilSignalRO):
|
||||
class GalilRIOSignal(GalilSignalBase):
|
||||
"""
|
||||
Read-only Signal for reading a single analog input channel from the Galil RIO controller.
|
||||
It always read all 8 analog channels at once, and updates the reabacks of all channels.
|
||||
@@ -77,37 +80,28 @@ class GalilRIOSignalRO(GalilSignalRO):
|
||||
parent (GalilRIO): Parent GalilRIO device.
|
||||
"""
|
||||
|
||||
_NUM_ANALOG_CHANNELS = 8
|
||||
_READ_TIMEOUT = 0.1 # seconds
|
||||
|
||||
def __init__(self, signal_name: str, channel: int, parent: GalilRIO, **kwargs):
|
||||
super().__init__(signal_name, parent=parent, **kwargs)
|
||||
self._readback_metadata = self._find_attribute_recursively("_readback_metadata")
|
||||
self._channel = channel
|
||||
self._metadata["connected"] = False
|
||||
|
||||
def _socket_get(self) -> float:
|
||||
"""Get command for the readback signal"""
|
||||
cmd = "MG@" + ",@".join([f"AN[{ii}]" for ii in range(self._NUM_ANALOG_CHANNELS)])
|
||||
ret = self.controller.socket_put_and_receive(cmd)
|
||||
values = [float(val) for val in ret.strip().split(" ")]
|
||||
# This updates all channels' readbacks, including self._readback
|
||||
self._update_all_channels(values)
|
||||
return self._readback
|
||||
|
||||
def get(self):
|
||||
"""Get current analog channel values from the Galil RIO controller."""
|
||||
# If the last readback has happend more than _READ_TIMEOUT seconds ago, read all channels again
|
||||
if time.monotonic() - self.parent.last_readback > self._READ_TIMEOUT:
|
||||
if time.monotonic() - self._readback_metadata["last_readback"] > self._READ_TIMEOUT:
|
||||
self._readback = self._socket_get()
|
||||
return self._readback
|
||||
|
||||
# pylint: disable=protected-access
|
||||
def _update_all_channels(self, values: list[float]) -> None:
|
||||
def _update_all_channels(self, values: list[float], signal_cls: Type[GalilRIOSignal]) -> None:
|
||||
"""
|
||||
Update all analog channel readbacks based on the provided list of values.
|
||||
List of values must be in order from an_ch0 to an_ch7.
|
||||
|
||||
We first have to update the _last_readback timestamp of the GalilRIO parent device.
|
||||
We first have to update the timestamp of the GalilRIO _readback_metadata device.
|
||||
Then we update all readbacks of all an_ch channels, before we run any subscriptions.
|
||||
This ensures that all readbacks are updated before any subscriptions are run, which
|
||||
may themselves read other channels.
|
||||
@@ -115,14 +109,15 @@ class GalilRIOSignalRO(GalilSignalRO):
|
||||
Args:
|
||||
values (list[float]): List of 8 float values corresponding to the analog channels.
|
||||
They must be in order from an_ch0 to an_ch7.
|
||||
signal_cls (Type[GalilRIOSignal]): The class of the signal to update, used to identify which signals to update.
|
||||
"""
|
||||
timestamp = time.time()
|
||||
# Update parent's last readback before running subscriptions!!
|
||||
self.parent._last_readback = time.monotonic()
|
||||
self._readback_metadata["last_readback"] = time.monotonic()
|
||||
updates: dict[str, tuple[float, float]] = {} # attr_name -> (new_val, old_val)
|
||||
# Update all readbacks first
|
||||
for walk in self.parent.walk_signals():
|
||||
if walk.item.attr_name.startswith("an_ch"):
|
||||
if isinstance(walk.item, signal_cls):
|
||||
idx = int(walk.item.attr_name[-1])
|
||||
if 0 <= idx < len(values):
|
||||
old_val = walk.item._readback
|
||||
@@ -143,6 +138,96 @@ class GalilRIOSignalRO(GalilSignalRO):
|
||||
)
|
||||
|
||||
|
||||
class GalilRIOSignalRO(GalilRIOSignal):
|
||||
|
||||
_NUM_ANALOG_CHANNELS = 8
|
||||
|
||||
def __init__(self, signal_name: str, channel: int, parent: GalilRIO, **kwargs):
|
||||
super().__init__(signal_name=signal_name, channel=channel, parent=parent, **kwargs)
|
||||
self._metadata["write_access"] = False
|
||||
|
||||
def _socket_set(self, val):
|
||||
raise ReadOnlyError(f"Signal {self.name} is read-only.")
|
||||
|
||||
def _socket_get(self) -> float:
|
||||
"""Get command for the readback signal"""
|
||||
cmd = "MG@" + ", @".join([f"AN[{ii}]" for ii in range(self._NUM_ANALOG_CHANNELS)])
|
||||
ret = self.controller.socket_put_and_receive(cmd)
|
||||
values = [float(val) for val in ret.strip().split(" ")]
|
||||
# This updates all channels' readbacks, including self._readback
|
||||
self._update_all_channels(values, signal_cls=GalilRIOSignalRO)
|
||||
return self._readback
|
||||
|
||||
|
||||
class GalilRIODigitalOutSignal(GalilRIOSignal): # We reuse the logic implemented for Galil
|
||||
"""
|
||||
Signal for controlling digital outputs of the Galil RIO controller.
|
||||
"""
|
||||
|
||||
_NUM_DIGITAL_OUTPUT_CHANNELS = 24
|
||||
|
||||
def _socket_get(self) -> float:
|
||||
"""Get command for the readback signal"""
|
||||
cmd = f"MG@OUT[{self._channel}]"
|
||||
ret = self.controller.socket_put_and_receive(cmd)
|
||||
self._readback = float(ret.strip())
|
||||
return self._readback
|
||||
|
||||
def _socket_set(self, val: Literal[0, 1]) -> None:
|
||||
"""Set command for the digital output signal. Value should be 0 or 1."""
|
||||
|
||||
if val not in (0, 1):
|
||||
raise ValueError("Digital output value must be 0 or 1.")
|
||||
cmd = f"SB{self._channel}" if val == 1 else f"CB{self._channel}"
|
||||
self.controller.socket_put_confirmed(cmd)
|
||||
|
||||
|
||||
def _create_analog_channels(num_channels: int) -> dict[str, tuple]:
|
||||
"""
|
||||
Create a dictionary of analog channel definitions for the DynamicDeviceComponent.
|
||||
Starts from channel 0 to num_channels - 1.
|
||||
|
||||
Args:
|
||||
num_channels (int): The number of analog channels to create.
|
||||
"""
|
||||
an_channels = {}
|
||||
for i in range(0, num_channels):
|
||||
an_channels[f"ch{i}"] = (
|
||||
GalilRIOSignalRO,
|
||||
f"ch{i}",
|
||||
{
|
||||
"kind": Kind.normal,
|
||||
"auto_monitor": True,
|
||||
"channel": i,
|
||||
"doc": f"Analog channel {i}.",
|
||||
},
|
||||
)
|
||||
return an_channels
|
||||
|
||||
|
||||
def _create_digital_output_channels(num_channels: int) -> dict[str, tuple]:
|
||||
"""
|
||||
Create a dictionary of digital output channel definitions for the DynamicDeviceComponent.
|
||||
Starts from channel 0 to num_channels - 1.
|
||||
|
||||
Args:
|
||||
num_channels (int): The number of digital output channels to create.
|
||||
"""
|
||||
di_out_channels = {}
|
||||
for i in range(0, num_channels):
|
||||
di_out_channels[f"ch{i}"] = (
|
||||
GalilRIODigitalOutSignal,
|
||||
f"ch{i}",
|
||||
{
|
||||
"kind": Kind.config,
|
||||
"auto_monitor": True,
|
||||
"channel": i,
|
||||
"doc": f"Digital output channel {i}.",
|
||||
},
|
||||
)
|
||||
return di_out_channels
|
||||
|
||||
|
||||
class GalilRIO(PSIDeviceBase):
|
||||
"""
|
||||
Galil RIO controller integration with 8 analog input channels. To implement the device,
|
||||
@@ -154,14 +239,16 @@ class GalilRIO(PSIDeviceBase):
|
||||
|
||||
SUB_CONNECTION_CHANGE = "connection_change"
|
||||
|
||||
an_ch0 = Cpt(GalilRIOSignalRO, signal_name="an_ch0", channel=0, doc="Analog input channel 0")
|
||||
an_ch1 = Cpt(GalilRIOSignalRO, signal_name="an_ch1", channel=1, doc="Analog input channel 1")
|
||||
an_ch2 = Cpt(GalilRIOSignalRO, signal_name="an_ch2", channel=2, doc="Analog input channel 2")
|
||||
an_ch3 = Cpt(GalilRIOSignalRO, signal_name="an_ch3", channel=3, doc="Analog input channel 3")
|
||||
an_ch4 = Cpt(GalilRIOSignalRO, signal_name="an_ch4", channel=4, doc="Analog input channel 4")
|
||||
an_ch5 = Cpt(GalilRIOSignalRO, signal_name="an_ch5", channel=5, doc="Analog input channel 5")
|
||||
an_ch6 = Cpt(GalilRIOSignalRO, signal_name="an_ch6", channel=6, doc="Analog input channel 6")
|
||||
an_ch7 = Cpt(GalilRIOSignalRO, signal_name="an_ch7", channel=7, doc="Analog input channel 7")
|
||||
#############################
|
||||
### Analog input channels ###
|
||||
#############################
|
||||
|
||||
analog_in = DDC(
|
||||
_create_analog_channels(GalilRIOSignalRO._NUM_ANALOG_CHANNELS)
|
||||
) # Creates ch0 to ch7
|
||||
digital_out = DDC(
|
||||
_create_digital_output_channels(GalilRIODigitalOutSignal._NUM_DIGITAL_OUTPUT_CHANNELS)
|
||||
) # Creates ch0 to ch23
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
@@ -177,19 +264,18 @@ class GalilRIO(PSIDeviceBase):
|
||||
if port is None:
|
||||
port = 23 # Default port for Galil RIO controller
|
||||
self.controller = GalilRIOController(
|
||||
socket_cls=socket_cls, socket_host=host, socket_port=port, device_manager=device_manager
|
||||
name=f"GalilRIOController_{name}",
|
||||
socket_cls=socket_cls,
|
||||
socket_host=host,
|
||||
socket_port=port,
|
||||
device_manager=device_manager,
|
||||
)
|
||||
self._last_readback: float = time.monotonic()
|
||||
self._readback_metadata: dict[str, float] = {"last_readback": 0.0}
|
||||
super().__init__(name=name, device_manager=device_manager, scan_info=scan_info, **kwargs)
|
||||
self.controller.subscribe(
|
||||
self._update_connection_state, event_type=self.SUB_CONNECTION_CHANGE
|
||||
)
|
||||
|
||||
@property
|
||||
def last_readback(self) -> float:
|
||||
"""Return the time of the last readback from the controller."""
|
||||
return self._last_readback
|
||||
|
||||
# pylint: disable=arguments-differ
|
||||
def wait_for_connection(self, timeout: float = 30.0) -> None:
|
||||
"""Wait for the RIO controller to be connected within timeout period."""
|
||||
|
||||
@@ -274,11 +274,18 @@ def test_galil_rio_signal_read(galil_rio):
|
||||
|
||||
assert galil_rio.an_ch0._READ_TIMEOUT == 0.1 # Default read timeout of 100ms
|
||||
# Mock the socket to return specific values
|
||||
galil_rio.controller.sock.buffer_recv = [b" 1.234 2.345 3.456 4.567 5.678 6.789 7.890 8.901"]
|
||||
an_buffer = b" 1.234 2.345 3.456 4.567 5.678 6.789 7.890 8.901\r\n"
|
||||
di_buffer = b"0.0\r\n"
|
||||
galil_rio.controller.sock.buffer_recv = [] # Clear any existing buffer
|
||||
for name in galil_rio.component_names:
|
||||
if name.startswith("an_ch"):
|
||||
galil_rio.controller.sock.buffer_recv.append(an_buffer)
|
||||
elif name.startswith("di_ou"):
|
||||
galil_rio.controller.sock.buffer_recv.append(di_buffer)
|
||||
galil_rio._last_readback = 0 # Force read from controller
|
||||
|
||||
read_values = galil_rio.read()
|
||||
assert len(read_values) == 8 # 8 channels
|
||||
assert len(read_values) == 16 # 16 channels
|
||||
expected_values = {
|
||||
galil_rio.an_ch0.name: {"value": 1.234},
|
||||
galil_rio.an_ch1.name: {"value": 2.345},
|
||||
@@ -356,3 +363,18 @@ def test_galil_rio_signal_read(galil_rio):
|
||||
for value in value_callback_buffer[0].values()
|
||||
]
|
||||
)
|
||||
|
||||
|
||||
def test_galil_rio_digital_out_signal(galil_rio):
|
||||
"""
|
||||
Test that the Galil RIO digital output signal can be set correctly.
|
||||
"""
|
||||
# Set digital output channel 0 to high
|
||||
galil_rio.controller.sock.buffer_recv = [b":"] # Mock response for readback]
|
||||
galil_rio.di_out0.put(1)
|
||||
assert galil_rio.controller.sock.buffer_put == [b"SB0\r"]
|
||||
|
||||
galil_rio.controller.sock.buffer_recv = [b":"] # Mock response for readback
|
||||
# Set digital output channel 0 to low
|
||||
galil_rio.di_out0.put(0)
|
||||
assert galil_rio.controller.sock.buffer_put == [b"SB0\r", b"CB0\r"]
|
||||
|
||||
Reference in New Issue
Block a user