@@ -16,8 +16,8 @@ import time
|
||||
from typing import TYPE_CHECKING, Literal
|
||||
|
||||
from bec_lib.logger import bec_logger
|
||||
from ophyd import Component as Cpt
|
||||
from ophyd import DynamicDeviceComponent as DDC
|
||||
from ophyd import Kind
|
||||
from ophyd_devices import PSIDeviceBase
|
||||
from ophyd_devices.utils.controller import Controller, threadlocked
|
||||
from ophyd_devices.utils.socket import SocketIO
|
||||
@@ -223,6 +223,42 @@ class GalilRIODigitalOutSignal(GalilSignalBase):
|
||||
)
|
||||
|
||||
|
||||
def _create_analog_channels(num_channels: int) -> dict[str, tuple]:
|
||||
"""
|
||||
Create a dictionary of analog channel definitions for the DynamicDeviceComponent.
|
||||
Starts from channel 0 to num_channels - 1.
|
||||
|
||||
Args:
|
||||
num_channels (int): The number of analog channels to create.
|
||||
"""
|
||||
an_channels = {}
|
||||
for i in range(0, num_channels):
|
||||
an_channels[f"an_ch{i}"] = (
|
||||
GalilRIOSignalRO,
|
||||
f"ch{i}.VAL",
|
||||
{"kind": Kind.normal, "auto_monitor": True, "doc": f"Analog channel {i}."},
|
||||
)
|
||||
return an_channels
|
||||
|
||||
|
||||
def _create_digital_output_channels(num_channels: int) -> dict[str, tuple]:
|
||||
"""
|
||||
Create a dictionary of digital output channel definitions for the DynamicDeviceComponent.
|
||||
Starts from channel 0 to num_channels - 1.
|
||||
|
||||
Args:
|
||||
num_channels (int): The number of digital output channels to create.
|
||||
"""
|
||||
di_out_channels = {}
|
||||
for i in range(0, num_channels):
|
||||
di_out_channels[f"di_out{i}"] = (
|
||||
GalilRIODigitalOutSignal,
|
||||
f"di_out_ch{i}.VAL",
|
||||
{"kind": Kind.config, "auto_monitor": True, "doc": f"Digital output channel {i}."},
|
||||
)
|
||||
return di_out_channels
|
||||
|
||||
|
||||
class GalilRIO(PSIDeviceBase):
|
||||
"""
|
||||
Galil RIO controller integration with 8 analog input channels. To implement the device,
|
||||
@@ -238,48 +274,8 @@ class GalilRIO(PSIDeviceBase):
|
||||
### Analog input channels ###
|
||||
#############################
|
||||
|
||||
an_ch0 = Cpt(GalilRIOSignalRO, signal_name="an_ch0", channel=0, doc="Analog input channel 0")
|
||||
an_ch1 = Cpt(GalilRIOSignalRO, signal_name="an_ch1", channel=1, doc="Analog input channel 1")
|
||||
an_ch2 = Cpt(GalilRIOSignalRO, signal_name="an_ch2", channel=2, doc="Analog input channel 2")
|
||||
an_ch3 = Cpt(GalilRIOSignalRO, signal_name="an_ch3", channel=3, doc="Analog input channel 3")
|
||||
an_ch4 = Cpt(GalilRIOSignalRO, signal_name="an_ch4", channel=4, doc="Analog input channel 4")
|
||||
an_ch5 = Cpt(GalilRIOSignalRO, signal_name="an_ch5", channel=5, doc="Analog input channel 5")
|
||||
an_ch6 = Cpt(GalilRIOSignalRO, signal_name="an_ch6", channel=6, doc="Analog input channel 6")
|
||||
an_ch7 = Cpt(GalilRIOSignalRO, signal_name="an_ch7", channel=7, doc="Analog input channel 7")
|
||||
|
||||
###############################
|
||||
### Digital output channels ###
|
||||
###############################
|
||||
di_out0 = Cpt(
|
||||
GalilRIODigitalOutSignal, signal_name="di_out0", channel=0, doc="Digital output channel 0"
|
||||
)
|
||||
di_out1 = Cpt(
|
||||
GalilRIODigitalOutSignal, signal_name="di_out1", channel=1, doc="Digital output channel 1"
|
||||
)
|
||||
di_out2 = Cpt(
|
||||
GalilRIODigitalOutSignal, signal_name="di_out2", channel=2, doc="Digital output channel 2"
|
||||
)
|
||||
di_out3 = Cpt(
|
||||
GalilRIODigitalOutSignal, signal_name="di_out3", channel=3, doc="Digital output channel 3"
|
||||
)
|
||||
di_out4 = Cpt(
|
||||
GalilRIODigitalOutSignal, signal_name="di_out4", channel=4, doc="Digital output channel 4"
|
||||
)
|
||||
di_out5 = Cpt(
|
||||
GalilRIODigitalOutSignal, signal_name="di_out5", channel=5, doc="Digital output channel 5"
|
||||
)
|
||||
di_out6 = Cpt(
|
||||
GalilRIODigitalOutSignal, signal_name="di_out6", channel=6, doc="Digital output channel 6"
|
||||
)
|
||||
di_out7 = Cpt(
|
||||
GalilRIODigitalOutSignal, signal_name="di_out7", channel=7, doc="Digital output channel 7"
|
||||
)
|
||||
di_out8 = Cpt(
|
||||
GalilRIODigitalOutSignal, signal_name="di_out8", channel=8, doc="Digital output channel 8"
|
||||
)
|
||||
di_out9 = Cpt(
|
||||
GalilRIODigitalOutSignal, signal_name="di_out9", channel=9, doc="Digital output channel 9"
|
||||
)
|
||||
an = DDC(_create_analog_channels(8)) # Creates an_ch0 to an_ch7
|
||||
di = DDC(_create_digital_output_channels(8)) # Creates di_out0 to di_out7
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
@@ -315,7 +311,7 @@ class GalilRIO(PSIDeviceBase):
|
||||
|
||||
def destroy(self) -> None:
|
||||
"""Make sure to turn off the controller socket on destroy."""
|
||||
self.controller.off(update_config=False)
|
||||
self.controller.off(update_normal=False)
|
||||
return super().destroy()
|
||||
|
||||
# pylint: disable=protected-access
|
||||
|
||||
@@ -274,11 +274,18 @@ def test_galil_rio_signal_read(galil_rio):
|
||||
|
||||
assert galil_rio.an_ch0._READ_TIMEOUT == 0.1 # Default read timeout of 100ms
|
||||
# Mock the socket to return specific values
|
||||
galil_rio.controller.sock.buffer_recv = [b" 1.234 2.345 3.456 4.567 5.678 6.789 7.890 8.901"]
|
||||
an_buffer = b" 1.234 2.345 3.456 4.567 5.678 6.789 7.890 8.901\r\n"
|
||||
di_buffer = b"0.0\r\n"
|
||||
galil_rio.controller.sock.buffer_recv = [] # Clear any existing buffer
|
||||
for name in galil_rio.component_names:
|
||||
if name.startswith("an_ch"):
|
||||
galil_rio.controller.sock.buffer_recv.append(an_buffer)
|
||||
elif name.startswith("di_ou"):
|
||||
galil_rio.controller.sock.buffer_recv.append(di_buffer)
|
||||
galil_rio._last_readback = 0 # Force read from controller
|
||||
|
||||
read_values = galil_rio.read()
|
||||
assert len(read_values) == 8 # 8 channels
|
||||
assert len(read_values) == 16 # 16 channels
|
||||
expected_values = {
|
||||
galil_rio.an_ch0.name: {"value": 1.234},
|
||||
galil_rio.an_ch1.name: {"value": 2.345},
|
||||
|
||||
Reference in New Issue
Block a user