Compare commits
1 Commits
feat-csaxs
...
main
| Author | SHA1 | Date | |
|---|---|---|---|
|
6d404cad12
|
@@ -13,6 +13,14 @@ from ophyd_devices import PSIDeviceBase
|
||||
logger = bec_logger.logger
|
||||
|
||||
|
||||
class MonitorSignal(Signal):
|
||||
"""A simple wrapper around ophyd Signal that automatically monitors the signal for changes."""
|
||||
|
||||
def __init__(self, *, name, auto_monitor=False, **kwargs):
|
||||
super().__init__(name=name, **kwargs)
|
||||
self.auto_monitor = auto_monitor
|
||||
|
||||
|
||||
class OMNYFastShutter(PSIDeviceBase, Device):
|
||||
"""
|
||||
Fast Shutter control for OMNY setup. If started with at the beamline, it will expose
|
||||
@@ -26,7 +34,7 @@ class OMNYFastShutter(PSIDeviceBase, Device):
|
||||
SUB_VALUE = "value"
|
||||
_default_sub = SUB_VALUE
|
||||
|
||||
shutter = Cpt(Signal, name="shutter")
|
||||
shutter = Cpt(MonitorSignal, name="shutter", auto_monitor=True)
|
||||
|
||||
# -----------------------------------------------------
|
||||
# User-facing shutter control functions
|
||||
|
||||
69
tests/tests_devices/test_omny_shutter.py
Normal file
69
tests/tests_devices/test_omny_shutter.py
Normal file
@@ -0,0 +1,69 @@
|
||||
import pytest
|
||||
from bec_server.device_server.tests.utils import DMMock
|
||||
|
||||
from csaxs_bec.devices.omny.shutter import MonitorSignal, OMNYFastShutter
|
||||
|
||||
|
||||
@pytest.mark.parametrize("auto_monitor", [False, True])
|
||||
def test_monitor_signal_stores_auto_monitor(auto_monitor):
|
||||
signal = MonitorSignal(name="signal", auto_monitor=auto_monitor)
|
||||
|
||||
assert signal.auto_monitor is auto_monitor
|
||||
|
||||
|
||||
def test_monitor_signal_put_propagates_value_to_readback_callback():
|
||||
signal = MonitorSignal(name="signal", auto_monitor=True)
|
||||
initial_value = signal.read()[signal.name]["value"]
|
||||
callback_values = []
|
||||
callback_reads = []
|
||||
|
||||
def _test_cb(value, old_value, **kwargs):
|
||||
callback_values.append((value, old_value))
|
||||
callback_reads.append(kwargs["obj"].read())
|
||||
|
||||
signal.subscribe(_test_cb, event_type=signal.SUB_VALUE, run=False)
|
||||
|
||||
signal.put(1)
|
||||
|
||||
assert callback_values == [(1, initial_value)]
|
||||
assert len(callback_reads) == 1
|
||||
assert callback_reads[0][signal.name]["value"] == 1
|
||||
assert signal.read()[signal.name]["value"] == 1
|
||||
|
||||
signal.put(0)
|
||||
assert callback_values == [(1, initial_value), (0, 1)]
|
||||
assert len(callback_reads) == 2
|
||||
assert callback_reads[1][signal.name]["value"] == 0
|
||||
assert signal.read()[signal.name]["value"] == 0
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def omny_fast_shutter():
|
||||
shutter = OMNYFastShutter(name="omny_fast_shutter", device_manager=DMMock())
|
||||
|
||||
try:
|
||||
yield shutter
|
||||
finally:
|
||||
shutter.destroy()
|
||||
|
||||
|
||||
def test_omny_fast_shutter_uses_monitor_signal_with_auto_monitor(omny_fast_shutter):
|
||||
assert isinstance(omny_fast_shutter.shutter, MonitorSignal)
|
||||
assert omny_fast_shutter.shutter.auto_monitor is True
|
||||
|
||||
|
||||
def test_omny_fast_shutter_propagates_signal_changes_to_device_readback(omny_fast_shutter):
|
||||
signal_name = omny_fast_shutter.shutter.name
|
||||
callback_reads = []
|
||||
|
||||
def _test_cb(**kwargs):
|
||||
callback_reads.append(omny_fast_shutter.read())
|
||||
|
||||
omny_fast_shutter.shutter.subscribe(_test_cb, event_type=omny_fast_shutter.shutter.SUB_VALUE, run=False)
|
||||
|
||||
omny_fast_shutter.shutter.put(1)
|
||||
|
||||
assert len(callback_reads) == 1
|
||||
assert callback_reads[0][signal_name]["value"] == 1
|
||||
assert omny_fast_shutter.read()[signal_name]["value"] == 1
|
||||
assert omny_fast_shutter.fshstatus() == 1
|
||||
Reference in New Issue
Block a user