fix(fast-shutter): Fix cSAXS fast shutter device
Some checks failed
CI for csaxs_bec / test (push) Failing after 1m30s
CI for csaxs_bec / test (pull_request) Failing after 1m30s

This commit is contained in:
2026-02-09 08:29:08 +01:00
parent ed8d012632
commit 6fad4f2034
3 changed files with 116 additions and 84 deletions

View File

@@ -1,12 +1,11 @@
import time
import socket
"""
Shutter device for the cSAXS beamline with 2 PVs. One is connected to a
signal can be set to control the shutter signal, and the other is a readback signal
that can be monitored to check the shutter status as it may be controlled directly by
the delay generator."""
from ophyd import Component as Cpt
from ophyd import Device, Kind
from ophyd import EpicsSignal
class FastEpicsShutterError(Exception):
pass
from ophyd import Device, EpicsSignal, Kind
class cSAXSFastEpicsShutter(Device):
@@ -22,45 +21,35 @@ class cSAXSFastEpicsShutter(Device):
shutter = Cpt(EpicsSignal, "OUT_01", kind=Kind.normal, auto_monitor=True)
shutter_readback = Cpt(EpicsSignal, "IN_01", kind=Kind.normal, auto_monitor=True)
# -----------------------------------------------------
# User-facing shutter control functions
# -----------------------------------------------------
def fshopen(self):
# pylint: disable=protetced-access
def fshopen(self) -> None:
"""Open the fast shutter."""
try:
self.shutter.put(1, wait=True)
except Exception as ex:
raise OMNYFastEpicsShutterError(f"Failed to open shutter: {ex}")
self.shutter.set(1).wait(timeout=self.shutter._timeout) # 2s default for ES
def fshclose(self):
# pylint: disable=protetced-access
def fshclose(self) -> None:
"""Close the fast shutter."""
try:
self.shutter.put(0, wait=True)
except Exception as ex:
raise OMNYFastEpicsShutterError(f"Failed to close shutter: {ex}")
self.shutter.set(0).wait(timeout=self.shutter._timeout) # 2s default for ES
def fshstatus(self):
"""Return the fast shutter status (0=closed, 1=open)."""
try:
return self.shutter.get()
except Exception as ex:
raise OMNYFastEpicsShutterError(f"Failed to read shutter status: {ex}")
def fshstatus(self) -> int:
"""Return the fast shutter control status (0=closed, 1=open)."""
return self.shutter.get() # Ensure we have the latest value from EPICS
def fshstatus_readback(self):
def fshstatus_readback(self) -> int:
"""Return the fast shutter status (0=closed, 1=open)."""
try:
return self.shutter_readback.get()
except Exception as ex:
raise OMNYFastEpicsShutterError(f"Failed to read shutter status: {ex}")
def fshinfo(self):
return self.shutter_readback.get() # Ensure we have the latest value from EPICS
def fshinfo(self) -> None:
"""Print information about which EPICS PV channel is being used."""
pvname = self.shutter.pvname
shutter_readback_pvname = self.shutter_readback.pvname
print(f"Fast shutter connected to EPICS channel: {pvname} with shutter readback: {shutter_readback_pvname}")
return pvname
print(
f"Fast shutter connected to EPICS channel: {pvname} with shutter readback: {shutter_readback_pvname}"
)
def help(self):
"""Display available user methods."""
@@ -68,5 +57,6 @@ class cSAXSFastEpicsShutter(Device):
for method in self.USER_ACCESS:
print(f" - {method}")
if __name__ == "__main__":
fsh = cSAXSFastEpicsShutter(name="fsh", prefix="X12SA-ES1-TTL:")

View File

@@ -1,82 +1,87 @@
import time
import socket
"""
Fast Shutter control for OMNY setup. If started with a config file in which the device_manager
has a 'fsh' device (cSAXSFastEpicsShutter), this device will be used as the shutter.
Otherwise, the device will create a dummy shutter device that will log warnings when shutter
methods are called, but will not raise exceptions.
"""
from bec_lib.logger import bec_logger
from ophyd import Component as Cpt
from ophyd import Device
from ophyd import EpicsSignal
from ophyd import Device, Signal
from ophyd_devices import PSIDeviceBase
logger = bec_logger.logger
class OMNYFastEpicsShutterError(Exception):
pass
def _detect_host_pv():
"""Detect host subnet and return appropriate PV name."""
try:
hostname = socket.gethostname()
local_ip = socket.gethostbyname(hostname)
if local_ip.startswith("129.129.122."):
return "X12SA-ES1-TTL:OUT_01"
else:
return "XOMNYI-XEYE-DUMMYSHUTTER:0"
except Exception as ex:
print(f"Warning: could not detect IP subnet ({ex}), using dummy shutter.")
return "XOMNYI-XEYE-DUMMYSHUTTER:0"
class OMNYFastEpicsShutter(Device):
class OMNYFastEpicsShutter(PSIDeviceBase, Device):
"""
Fast EPICS shutter with automatic PV selection based on host subnet.
Fast Shutter control for OMNY setup. If started with at the beamline, it will expose
the shutter control methods (fshopen, fshclose, fshstatus, fshinfo) from the
cSAXSFastEpicsShutter device. The device is identified by the 'fsh' name in the device manager.
If the 'fsh' device is not found in the device manager, this device will create a dummy shutter
and log warnings when shutter methods are called, but will not raise exceptions.
"""
USER_ACCESS = ["fshopen", "fshclose", "fshstatus", "fshinfo", "help"]
USER_ACCESS = ["fshopen", "fshclose", "fshstatus", "fshinfo", "help", "fshstatus_readback"]
SUB_VALUE = "value"
_default_sub = SUB_VALUE
# PV is detected dynamically at import time
shutter = Cpt(EpicsSignal, name="shutter", read_pv=_detect_host_pv(), auto_monitor=True)
def __init__(self, prefix="", *, name, **kwargs):
super().__init__(prefix, name=name, **kwargs)
self.shutter.subscribe(self._emit_value)
def _emit_value(self, **kwargs):
timestamp = kwargs.pop("timestamp", time.time())
self.wait_for_connection()
self._run_subs(sub_type=self.SUB_VALUE, timestamp=timestamp, obj=self)
shutter = Cpt(Signal, name="shutter")
# -----------------------------------------------------
# User-facing shutter control functions
# -----------------------------------------------------
# pylint: disable=invalid-name
def _check_if_cSAXS_shutter_exists_in_config(self) -> bool:
"""
Check on the device manager if the shutter device exists.
Returns:
bool: True if the 'fsh' device exists in the device manager, False otherwise
"""
if self.device_manager.devices.get("fsh", None) is None:
logger.warning(f"Fast shutter device not found for {self.name}.")
return False
return True
def fshopen(self):
"""Open the fast shutter."""
try:
self.shutter.put(1, wait=True)
except Exception as ex:
raise OMNYFastEpicsShutterError(f"Failed to open shutter: {ex}")
if self._check_if_cSAXS_shutter_exists_in_config():
return self.device_manager.devices["fsh"].fshopen()
else:
self.shutter.put(1)
def fshclose(self):
"""Close the fast shutter."""
try:
self.shutter.put(0, wait=True)
except Exception as ex:
raise OMNYFastEpicsShutterError(f"Failed to close shutter: {ex}")
if self._check_if_cSAXS_shutter_exists_in_config():
return self.device_manager.devices["fsh"].fshclose()
else:
self.shutter.put(0)
def fshstatus(self):
"""Return the fast shutter status (0=closed, 1=open)."""
try:
if self._check_if_cSAXS_shutter_exists_in_config():
return self.device_manager.devices["fsh"].fshstatus()
else:
return self.shutter.get()
except Exception as ex:
raise OMNYFastEpicsShutterError(f"Failed to read shutter status: {ex}")
def fshinfo(self):
"""Print information about which EPICS PV channel is being used."""
pvname = self.shutter.pvname
print(f"Fast shutter connected to EPICS channel: {pvname}")
return pvname
if self._check_if_cSAXS_shutter_exists_in_config():
return self.device_manager.devices["fsh"].fshinfo()
else:
print("Using dummy fast shutter device. No EPICS channel is connected.")
def help(self):
"""Display available user methods."""
print("Available methods:")
for method in self.USER_ACCESS:
print(f" - {method}")
def fshstatus_readback(self):
"""Return the fast shutter status (0=closed, 1=open) from the readback signal."""
if self._check_if_cSAXS_shutter_exists_in_config():
return self.device_manager.devices["fsh"].fshstatus_readback()
else:
self.shutter.get()

View File

@@ -0,0 +1,37 @@
"""Module to test epics devices."""
import pytest
from ophyd_devices.tests.utils import patched_device
from csaxs_bec.devices.epics.fast_shutter import cSAXSFastEpicsShutter
@pytest.fixture
def fast_shutter_device():
"""Fixture to create a patched cSAXSFastEpicsShutter device for testing."""
with patched_device(cSAXSFastEpicsShutter, name="fsh", prefix="X12SA-ES1-TTL:") as device:
yield device
def test_fast_shutter_methods(fast_shutter_device):
"""Test the user-facing methods of the cSAXSFastEpicsShutter device."""
assert fast_shutter_device.name == "fsh", "Device name should be 'fsh'"
assert fast_shutter_device.prefix == "X12SA-ES1-TTL:", "Device prefix is 'X12SA-ES1-TTL:'"
# Test fshopen and fshclose
fast_shutter_device.fshopen()
assert fast_shutter_device.shutter.get() == 1, "Shutter should be open (1) after fshopen()"
assert fast_shutter_device.fshstatus() == 1, "fshstatus should return 1 when shutter is open"
fast_shutter_device.fshclose()
assert fast_shutter_device.shutter.get() == 0, "Shutter should be closed (0) after fshclose()"
assert fast_shutter_device.fshstatus() == 0, "fshstatus should return 0 when shutter is closed"
# shutter_readback is connected to separate PV.
fast_shutter_device.shutter_readback._read_pv.mock_data = 1 # Simulate readback showing open
assert (
fast_shutter_device.fshstatus_readback() == 1
), "fshstatus_readback should return 1 when shutter readback shows open"
fast_shutter_device.shutter_readback._read_pv.mock_data = 0 # Simulate readback showing closed
assert (
fast_shutter_device.fshstatus_readback() == 0
), "fshstatus_readback should return 0 when shutter readback shows closed"