diff --git a/csaxs_bec/devices/omny/shutter.py b/csaxs_bec/devices/omny/shutter.py index a4b7312..1f10051 100644 --- a/csaxs_bec/devices/omny/shutter.py +++ b/csaxs_bec/devices/omny/shutter.py @@ -13,6 +13,14 @@ from ophyd_devices import PSIDeviceBase logger = bec_logger.logger +class MonitorSignal(Signal): + """A simple wrapper around ophyd Signal that automatically monitors the signal for changes.""" + + def __init__(self, *, name, auto_monitor=False, **kwargs): + super().__init__(name=name, **kwargs) + self.auto_monitor = auto_monitor + + class OMNYFastShutter(PSIDeviceBase, Device): """ Fast Shutter control for OMNY setup. If started with at the beamline, it will expose @@ -26,7 +34,7 @@ class OMNYFastShutter(PSIDeviceBase, Device): SUB_VALUE = "value" _default_sub = SUB_VALUE - shutter = Cpt(Signal, name="shutter") + shutter = Cpt(MonitorSignal, name="shutter", auto_monitor=True) # ----------------------------------------------------- # User-facing shutter control functions diff --git a/tests/tests_devices/test_omny_shutter.py b/tests/tests_devices/test_omny_shutter.py new file mode 100644 index 0000000..b2bc1c9 --- /dev/null +++ b/tests/tests_devices/test_omny_shutter.py @@ -0,0 +1,69 @@ +import pytest +from bec_server.device_server.tests.utils import DMMock + +from csaxs_bec.devices.omny.shutter import MonitorSignal, OMNYFastShutter + + +@pytest.mark.parametrize("auto_monitor", [False, True]) +def test_monitor_signal_stores_auto_monitor(auto_monitor): + signal = MonitorSignal(name="signal", auto_monitor=auto_monitor) + + assert signal.auto_monitor is auto_monitor + + +def test_monitor_signal_put_propagates_value_to_readback_callback(): + signal = MonitorSignal(name="signal", auto_monitor=True) + initial_value = signal.read()[signal.name]["value"] + callback_values = [] + callback_reads = [] + + def _test_cb(value, old_value, **kwargs): + callback_values.append((value, old_value)) + callback_reads.append(kwargs["obj"].read()) + + signal.subscribe(_test_cb, event_type=signal.SUB_VALUE, run=False) + + signal.put(1) + + assert callback_values == [(1, initial_value)] + assert len(callback_reads) == 1 + assert callback_reads[0][signal.name]["value"] == 1 + assert signal.read()[signal.name]["value"] == 1 + + signal.put(0) + assert callback_values == [(1, initial_value), (0, 1)] + assert len(callback_reads) == 2 + assert callback_reads[1][signal.name]["value"] == 0 + assert signal.read()[signal.name]["value"] == 0 + + +@pytest.fixture +def omny_fast_shutter(): + shutter = OMNYFastShutter(name="omny_fast_shutter", device_manager=DMMock()) + + try: + yield shutter + finally: + shutter.destroy() + + +def test_omny_fast_shutter_uses_monitor_signal_with_auto_monitor(omny_fast_shutter): + assert isinstance(omny_fast_shutter.shutter, MonitorSignal) + assert omny_fast_shutter.shutter.auto_monitor is True + + +def test_omny_fast_shutter_propagates_signal_changes_to_device_readback(omny_fast_shutter): + signal_name = omny_fast_shutter.shutter.name + callback_reads = [] + + def _test_cb(**kwargs): + callback_reads.append(omny_fast_shutter.read()) + + omny_fast_shutter.shutter.subscribe(_test_cb, event_type=omny_fast_shutter.shutter.SUB_VALUE, run=False) + + omny_fast_shutter.shutter.put(1) + + assert len(callback_reads) == 1 + assert callback_reads[0][signal_name]["value"] == 1 + assert omny_fast_shutter.read()[signal_name]["value"] == 1 + assert omny_fast_shutter.fshstatus() == 1