feat(galil-rio): Add di_out channels to GalilRIO
Some checks failed
CI for csaxs_bec / test (push) Failing after 1m27s

This commit is contained in:
2026-02-10 12:39:09 +01:00
parent 82d47c7511
commit f42f8e207c

View File

@@ -13,16 +13,18 @@ over TCP/IP. It also provides a device integration that interfaces to these
from __future__ import annotations
import time
from typing import TYPE_CHECKING
from typing import TYPE_CHECKING, Literal
from bec_lib.logger import bec_logger
from ophyd import Component as Cpt
from ophyd import DynamicDeviceComponent as DDC
from ophyd_devices import PSIDeviceBase
from ophyd_devices.utils.controller import Controller, threadlocked
from ophyd_devices.utils.socket import SocketIO
from csaxs_bec.devices.omny.galil.galil_ophyd import (
GalilCommunicationError,
GalilSignalBase,
GalilSignalRO,
retry_once,
)
@@ -143,6 +145,32 @@ class GalilRIOSignalRO(GalilSignalRO):
)
class GalilRIODigitalOutSignal(GalilSignalBase):
"""
Signal for controlling digital outputs of the Galil RIO controller.
# TODO Check and consider if we want to also add readout on all, and a readback throttling
"""
def __init__(self, signal_name: str, channel: int, parent: GalilRIO, **kwargs):
super().__init__(signal_name, parent=parent, **kwargs)
self._channel = channel
self._metadata["connected"] = False
def _socket_get(self) -> float:
"""Get command for the readback signal"""
cmd = f"@OUT[{self._channel}]"
ret = self.controller.socket_put_and_receive(cmd)
self._readback = float(ret.strip().split(" "))
return self._readback
def _socket_put(self, value: Literal[0, 1]) -> None:
"""Set command for the digital output signal. Value should be 0 or 1."""
if value not in (0, 1):
raise ValueError("Digital output value must be 0 or 1.")
cmd = f"SB{self._channel}" if value == 1 else f"CB{self._channel}"
self.controller.socket_put_confirmed(cmd)
class GalilRIO(PSIDeviceBase):
"""
Galil RIO controller integration with 8 analog input channels. To implement the device,
@@ -154,6 +182,10 @@ class GalilRIO(PSIDeviceBase):
SUB_CONNECTION_CHANGE = "connection_change"
#############################
### Analog input channels ###
#############################
an_ch0 = Cpt(GalilRIOSignalRO, signal_name="an_ch0", channel=0, doc="Analog input channel 0")
an_ch1 = Cpt(GalilRIOSignalRO, signal_name="an_ch1", channel=1, doc="Analog input channel 1")
an_ch2 = Cpt(GalilRIOSignalRO, signal_name="an_ch2", channel=2, doc="Analog input channel 2")
@@ -163,6 +195,40 @@ class GalilRIO(PSIDeviceBase):
an_ch6 = Cpt(GalilRIOSignalRO, signal_name="an_ch6", channel=6, doc="Analog input channel 6")
an_ch7 = Cpt(GalilRIOSignalRO, signal_name="an_ch7", channel=7, doc="Analog input channel 7")
###############################
### Digital output channels ###
###############################
di_out0 = Cpt(
GalilRIODigitalOutSignal, signal_name="di_out0", channel=0, doc="Digital output channel 0"
)
di_out1 = Cpt(
GalilRIODigitalOutSignal, signal_name="di_out1", channel=1, doc="Digital output channel 1"
)
di_out2 = Cpt(
GalilRIODigitalOutSignal, signal_name="di_out2", channel=2, doc="Digital output channel 2"
)
di_out3 = Cpt(
GalilRIODigitalOutSignal, signal_name="di_out3", channel=3, doc="Digital output channel 3"
)
di_out4 = Cpt(
GalilRIODigitalOutSignal, signal_name="di_out4", channel=4, doc="Digital output channel 4"
)
di_out5 = Cpt(
GalilRIODigitalOutSignal, signal_name="di_out5", channel=5, doc="Digital output channel 5"
)
di_out6 = Cpt(
GalilRIODigitalOutSignal, signal_name="di_out6", channel=6, doc="Digital output channel 6"
)
di_out7 = Cpt(
GalilRIODigitalOutSignal, signal_name="di_out7", channel=7, doc="Digital output channel 7"
)
di_out8 = Cpt(
GalilRIODigitalOutSignal, signal_name="di_out8", channel=8, doc="Digital output channel 8"
)
di_out9 = Cpt(
GalilRIODigitalOutSignal, signal_name="di_out9", channel=9, doc="Digital output channel 9"
)
def __init__(
self,
*,