diff --git a/csaxs_bec/devices/epics/fast_shutter.py b/csaxs_bec/devices/epics/fast_shutter.py index 2ae4ce1..9064b15 100644 --- a/csaxs_bec/devices/epics/fast_shutter.py +++ b/csaxs_bec/devices/epics/fast_shutter.py @@ -1,12 +1,11 @@ -import time -import socket +""" +Shutter device for the cSAXS beamline with 2 PVs. One is connected to a +signal can be set to control the shutter signal, and the other is a readback signal +that can be monitored to check the shutter status as it may be controlled directly by +the delay generator.""" + from ophyd import Component as Cpt -from ophyd import Device, Kind -from ophyd import EpicsSignal - - -class FastEpicsShutterError(Exception): - pass +from ophyd import Device, EpicsSignal, Kind class cSAXSFastEpicsShutter(Device): @@ -22,45 +21,35 @@ class cSAXSFastEpicsShutter(Device): shutter = Cpt(EpicsSignal, "OUT_01", kind=Kind.normal, auto_monitor=True) shutter_readback = Cpt(EpicsSignal, "IN_01", kind=Kind.normal, auto_monitor=True) - # ----------------------------------------------------- # User-facing shutter control functions # ----------------------------------------------------- - def fshopen(self): + # pylint: disable=protetced-access + def fshopen(self) -> None: """Open the fast shutter.""" - try: - self.shutter.put(1, wait=True) - except Exception as ex: - raise OMNYFastEpicsShutterError(f"Failed to open shutter: {ex}") + self.shutter.set(1).wait(timeout=self.shutter._timeout) # 2s default for ES - def fshclose(self): + # pylint: disable=protetced-access + def fshclose(self) -> None: """Close the fast shutter.""" - try: - self.shutter.put(0, wait=True) - except Exception as ex: - raise OMNYFastEpicsShutterError(f"Failed to close shutter: {ex}") + self.shutter.set(0).wait(timeout=self.shutter._timeout) # 2s default for ES - def fshstatus(self): - """Return the fast shutter status (0=closed, 1=open).""" - try: - return self.shutter.get() - except Exception as ex: - raise OMNYFastEpicsShutterError(f"Failed to read shutter status: {ex}") + def fshstatus(self) -> int: + """Return the fast shutter control status (0=closed, 1=open).""" + return self.shutter.get() # Ensure we have the latest value from EPICS - def fshstatus_readback(self): + def fshstatus_readback(self) -> int: """Return the fast shutter status (0=closed, 1=open).""" - try: - return self.shutter_readback.get() - except Exception as ex: - raise OMNYFastEpicsShutterError(f"Failed to read shutter status: {ex}") - - def fshinfo(self): + return self.shutter_readback.get() # Ensure we have the latest value from EPICS + + def fshinfo(self) -> None: """Print information about which EPICS PV channel is being used.""" pvname = self.shutter.pvname shutter_readback_pvname = self.shutter_readback.pvname - print(f"Fast shutter connected to EPICS channel: {pvname} with shutter readback: {shutter_readback_pvname}") - return pvname + print( + f"Fast shutter connected to EPICS channel: {pvname} with shutter readback: {shutter_readback_pvname}" + ) def help(self): """Display available user methods.""" @@ -68,5 +57,6 @@ class cSAXSFastEpicsShutter(Device): for method in self.USER_ACCESS: print(f" - {method}") + if __name__ == "__main__": fsh = cSAXSFastEpicsShutter(name="fsh", prefix="X12SA-ES1-TTL:") diff --git a/csaxs_bec/devices/omny/shutter.py b/csaxs_bec/devices/omny/shutter.py index 3ffd7e6..983b632 100644 --- a/csaxs_bec/devices/omny/shutter.py +++ b/csaxs_bec/devices/omny/shutter.py @@ -1,82 +1,87 @@ -import time -import socket +""" +Fast Shutter control for OMNY setup. If started with a config file in which the device_manager +has a 'fsh' device (cSAXSFastEpicsShutter), this device will be used as the shutter. +Otherwise, the device will create a dummy shutter device that will log warnings when shutter +methods are called, but will not raise exceptions. +""" + +from bec_lib.logger import bec_logger from ophyd import Component as Cpt -from ophyd import Device -from ophyd import EpicsSignal +from ophyd import Device, Signal +from ophyd_devices import PSIDeviceBase + +logger = bec_logger.logger -class OMNYFastEpicsShutterError(Exception): - pass - - -def _detect_host_pv(): - """Detect host subnet and return appropriate PV name.""" - try: - hostname = socket.gethostname() - local_ip = socket.gethostbyname(hostname) - if local_ip.startswith("129.129.122."): - return "X12SA-ES1-TTL:OUT_01" - else: - return "XOMNYI-XEYE-DUMMYSHUTTER:0" - except Exception as ex: - print(f"Warning: could not detect IP subnet ({ex}), using dummy shutter.") - return "XOMNYI-XEYE-DUMMYSHUTTER:0" - - -class OMNYFastEpicsShutter(Device): +class OMNYFastEpicsShutter(PSIDeviceBase, Device): """ - Fast EPICS shutter with automatic PV selection based on host subnet. + Fast Shutter control for OMNY setup. If started with at the beamline, it will expose + the shutter control methods (fshopen, fshclose, fshstatus, fshinfo) from the + cSAXSFastEpicsShutter device. The device is identified by the 'fsh' name in the device manager. + If the 'fsh' device is not found in the device manager, this device will create a dummy shutter + and log warnings when shutter methods are called, but will not raise exceptions. """ - USER_ACCESS = ["fshopen", "fshclose", "fshstatus", "fshinfo", "help"] + USER_ACCESS = ["fshopen", "fshclose", "fshstatus", "fshinfo", "help", "fshstatus_readback"] SUB_VALUE = "value" _default_sub = SUB_VALUE - # PV is detected dynamically at import time - shutter = Cpt(EpicsSignal, name="shutter", read_pv=_detect_host_pv(), auto_monitor=True) - - def __init__(self, prefix="", *, name, **kwargs): - super().__init__(prefix, name=name, **kwargs) - self.shutter.subscribe(self._emit_value) - - def _emit_value(self, **kwargs): - timestamp = kwargs.pop("timestamp", time.time()) - self.wait_for_connection() - self._run_subs(sub_type=self.SUB_VALUE, timestamp=timestamp, obj=self) + shutter = Cpt(Signal, name="shutter") # ----------------------------------------------------- # User-facing shutter control functions # ----------------------------------------------------- + # pylint: disable=invalid-name + def _check_if_cSAXS_shutter_exists_in_config(self) -> bool: + """ + Check on the device manager if the shutter device exists. + + Returns: + bool: True if the 'fsh' device exists in the device manager, False otherwise + """ + if self.device_manager.devices.get("fsh", None) is None: + logger.warning(f"Fast shutter device not found for {self.name}.") + return False + return True + def fshopen(self): """Open the fast shutter.""" - try: - self.shutter.put(1, wait=True) - except Exception as ex: - raise OMNYFastEpicsShutterError(f"Failed to open shutter: {ex}") + if self._check_if_cSAXS_shutter_exists_in_config(): + return self.device_manager.devices["fsh"].fshopen() + else: + self.shutter.put(1) def fshclose(self): """Close the fast shutter.""" - try: - self.shutter.put(0, wait=True) - except Exception as ex: - raise OMNYFastEpicsShutterError(f"Failed to close shutter: {ex}") + if self._check_if_cSAXS_shutter_exists_in_config(): + return self.device_manager.devices["fsh"].fshclose() + else: + self.shutter.put(0) def fshstatus(self): """Return the fast shutter status (0=closed, 1=open).""" - try: + if self._check_if_cSAXS_shutter_exists_in_config(): + return self.device_manager.devices["fsh"].fshstatus() + else: return self.shutter.get() - except Exception as ex: - raise OMNYFastEpicsShutterError(f"Failed to read shutter status: {ex}") def fshinfo(self): """Print information about which EPICS PV channel is being used.""" - pvname = self.shutter.pvname - print(f"Fast shutter connected to EPICS channel: {pvname}") - return pvname + if self._check_if_cSAXS_shutter_exists_in_config(): + return self.device_manager.devices["fsh"].fshinfo() + else: + print("Using dummy fast shutter device. No EPICS channel is connected.") def help(self): """Display available user methods.""" print("Available methods:") for method in self.USER_ACCESS: print(f" - {method}") + + def fshstatus_readback(self): + """Return the fast shutter status (0=closed, 1=open) from the readback signal.""" + if self._check_if_cSAXS_shutter_exists_in_config(): + return self.device_manager.devices["fsh"].fshstatus_readback() + else: + self.shutter.get() diff --git a/tests/tests_devices/test_epics_devices.py b/tests/tests_devices/test_epics_devices.py new file mode 100644 index 0000000..327847b --- /dev/null +++ b/tests/tests_devices/test_epics_devices.py @@ -0,0 +1,37 @@ +"""Module to test epics devices.""" + +import pytest +from ophyd_devices.tests.utils import patched_device + +from csaxs_bec.devices.epics.fast_shutter import cSAXSFastEpicsShutter + + +@pytest.fixture +def fast_shutter_device(): + """Fixture to create a patched cSAXSFastEpicsShutter device for testing.""" + with patched_device(cSAXSFastEpicsShutter, name="fsh", prefix="X12SA-ES1-TTL:") as device: + yield device + + +def test_fast_shutter_methods(fast_shutter_device): + """Test the user-facing methods of the cSAXSFastEpicsShutter device.""" + assert fast_shutter_device.name == "fsh", "Device name should be 'fsh'" + assert fast_shutter_device.prefix == "X12SA-ES1-TTL:", "Device prefix is 'X12SA-ES1-TTL:'" + # Test fshopen and fshclose + fast_shutter_device.fshopen() + assert fast_shutter_device.shutter.get() == 1, "Shutter should be open (1) after fshopen()" + assert fast_shutter_device.fshstatus() == 1, "fshstatus should return 1 when shutter is open" + + fast_shutter_device.fshclose() + assert fast_shutter_device.shutter.get() == 0, "Shutter should be closed (0) after fshclose()" + assert fast_shutter_device.fshstatus() == 0, "fshstatus should return 0 when shutter is closed" + + # shutter_readback is connected to separate PV. + fast_shutter_device.shutter_readback._read_pv.mock_data = 1 # Simulate readback showing open + assert ( + fast_shutter_device.fshstatus_readback() == 1 + ), "fshstatus_readback should return 1 when shutter readback shows open" + fast_shutter_device.shutter_readback._read_pv.mock_data = 0 # Simulate readback showing closed + assert ( + fast_shutter_device.fshstatus_readback() == 0 + ), "fshstatus_readback should return 0 when shutter readback shows closed"