feat(galil-rio): Add GalilRio Signal for analog channels
All checks were successful
CI for csaxs_bec / test (push) Successful in 1m18s
CI for csaxs_bec / test (pull_request) Successful in 1m16s

This commit is contained in:
2026-01-16 09:52:45 +01:00
parent 2cf2f4b4e4
commit d396091d97

View File

@@ -1,10 +1,37 @@
"""
Module for the Galil RIO (RIO-471xx) controller interface. The controller is a compact PLC
with Ethernet. It has digital and analog I/O as well as counters and timers.
Link to the Galil RIO vendor page:
https://www.galil.com/plcs/remote-io/rio-471xx
This module provides the GalilRIOController for communication with the RIO controller
over TCP/IP as well as a read-only Ophyd Signal, which reads from all 8 analog input channels.
This signal can be used to create virtual devices in BEC that process the analog input values
and combine them into more complex signals. For this purpose, the GalilRIOSignal should be
integrated as a 'monitored' or 'baseline' signal in the device config of BEC.
"""
from __future__ import annotations
import time
from typing import TYPE_CHECKING
from ophyd_devices.utils.controller import Controller, threadlocked
from ophyd_devices.utils.socket import SocketSignal
from ophyd_devices.utils.socket import SocketIO, SocketSignal
from csaxs_bec.devices.omny.galil.galil_ophyd import GalilCommunicationError, retry_once
from csaxs_bec.devices.omny.galil.galil_ophyd import (
GalilCommunicationError,
ReadOnlyError,
retry_once,
)
if TYPE_CHECKING: # pragma: no cover
from bec_server.device_server.devices.devicemanager import DeviceManagerDS
class GalilRIO(Controller):
class GalilRIOController(Controller):
"""Controller Class for Galil RIO controller communication."""
@threadlocked
def socket_put(self, val: str) -> None:
@@ -28,8 +55,72 @@ class GalilRIO(Controller):
)
class GalilRIOSignalBase(SocketSignal):
def __init__(self, signal_name, **kwargs):
self.signal_name = signal_name
super().__init__(**kwargs)
self.rio_controller = self.parent.rio_controller
class GalilRIOSignal(SocketSignal):
"""
Read-only Signal for reading _NUM_ANALOG_CH analog input channels from Galil RIO controller.
The signal reads all channels at once and returns a list of float values.
"""
_NUM_ANALOG_CH = 8
def __init__(
self,
name: str,
host: str = "129.129.98.64",
port: int = 23,
socket_cls=SocketIO,
device_manager: DeviceManagerDS | None = None,
**kwargs,
):
super().__init__(name=name, **kwargs)
self._metadata["write_access"] = False
self.controller = GalilRIOController(
socket_cls=socket_cls, socket_host=host, socket_port=port, device_manager=device_manager
)
self._metadata["connected"] = False
self._readback = [0.0] * self._NUM_ANALOG_CH # Set proper initial value type
def wait_for_connection(self, timeout=10, **kwargs):
"""
Wait for socket connection to be established within timeout period.
This is needed to ensure that the controller is connected before starting
to read values.
Args:
timeout (int): Time in seconds to wait for connection
"""
self.controller.on(timeout=timeout)
self._metadata["connected"] = True
def destroy(self):
"""Make sure to turn off the controller socket on destroy."""
self.controller.off()
return super().destroy()
def _socket_set(self, val):
raise ReadOnlyError("Read-only signals cannot be set")
@threadlocked
def _socket_get(self) -> list[float]:
"""Get command for the readback signal
Returns:
list[float]: List of analog channel values
"""
cmd = "MG@" + ",@".join([f"AN[{ii}]" for ii in range(self._NUM_ANALOG_CH)])
ret = self.controller.socket_put_and_receive(cmd)
timestamp = time.time()
self._metadata["timestamp"] = timestamp
return [float(val) for val in ret.strip().split(" ")]
def get(self):
"""Get current analog channel values from the Galil RIO controller."""
old_val = self._readback if isinstance(self._readback, list) else []
self._readback = self._socket_get()
self._run_subs(
sub_type=self.SUB_VALUE,
old_value=old_val,
value=self._readback,
timestamp=self._metadata.get("timestamp", time.time()),
)
return self._readback