From b6ac8eaae135060ddfc7acd3e12e8f44c042c77e Mon Sep 17 00:00:00 2001 From: appel_c Date: Tue, 10 Feb 2026 14:14:14 +0100 Subject: [PATCH] w --- csaxs_bec/devices/omny/galil/galil_rio.py | 50 ++++++++++++++++++++++- tests/tests_devices/test_galil.py | 15 +++++++ 2 files changed, 64 insertions(+), 1 deletion(-) diff --git a/csaxs_bec/devices/omny/galil/galil_rio.py b/csaxs_bec/devices/omny/galil/galil_rio.py index 7dde0f6..0b558a5 100644 --- a/csaxs_bec/devices/omny/galil/galil_rio.py +++ b/csaxs_bec/devices/omny/galil/galil_rio.py @@ -163,13 +163,61 @@ class GalilRIODigitalOutSignal(GalilSignalBase): self._readback = float(ret.strip().split(" ")) return self._readback - def _socket_put(self, value: Literal[0, 1]) -> None: + def _socket_set(self, value: Literal[0, 1]) -> None: """Set command for the digital output signal. Value should be 0 or 1.""" + if value not in (0, 1): raise ValueError("Digital output value must be 0 or 1.") cmd = f"SB{self._channel}" if value == 1 else f"CB{self._channel}" self.controller.socket_put_confirmed(cmd) + def put( + self, + value, + force=False, + connection_timeout=1, + callback=None, + use_complete=None, + timeout=1, + **kwargs, + ): + """Using channel access, set the write PV to `value`. + + Keyword arguments are passed on to callbacks + + Parameters + ---------- + value : any + The value to set + force : bool, optional + Skip checking the value in Python first + connection_timeout : float, optional + If not already connected, allow up to `connection_timeout` seconds + for the connection to complete. + use_complete : bool, optional + Override put completion settings + callback : callable + Callback for when the put has completed + timeout : float, optional + Timeout before assuming that put has failed. (Only relevant if + put completion is used.) + """ + if not force: + pass + # self.check_value(value) + old_value = self._readback + self.wait_for_connection(timeout=connection_timeout) + if use_complete is None: + use_complete = False + + self._socket_set(value) + + timestamp = time.time() + super().put(value, timestamp=timestamp, force=True) + self._run_subs( + sub_type=self.SUB_SETPOINT, old_value=old_value, value=value, timestamp=timestamp + ) + class GalilRIO(PSIDeviceBase): """ diff --git a/tests/tests_devices/test_galil.py b/tests/tests_devices/test_galil.py index 3024640..567d9db 100644 --- a/tests/tests_devices/test_galil.py +++ b/tests/tests_devices/test_galil.py @@ -356,3 +356,18 @@ def test_galil_rio_signal_read(galil_rio): for value in value_callback_buffer[0].values() ] ) + + +def test_galil_rio_digital_out_signal(galil_rio): + """ + Test that the Galil RIO digital output signal can be set correctly. + """ + # Set digital output channel 0 to high + galil_rio.controller.sock.buffer_recv = [b":"] # Mock response for readback] + galil_rio.di_out0.put(1) + assert galil_rio.controller.sock.buffer_put == [b"SB0\r"] + + galil_rio.controller.sock.buffer_recv = [b":"] # Mock response for readback + # Set digital output channel 0 to low + galil_rio.di_out0.put(0) + assert galil_rio.controller.sock.buffer_put == [b"SB0\r", b"CB0\r"]