From a70478db2d3bbef1bfe8a1f9710b9c4405288413 Mon Sep 17 00:00:00 2001 From: appel_c Date: Tue, 24 Jun 2025 14:10:58 +0200 Subject: [PATCH] ophyd test script --- superxas_bec/devices/test_script_ophyd.py | 66 +++++++++++++++++++++++ 1 file changed, 66 insertions(+) create mode 100644 superxas_bec/devices/test_script_ophyd.py diff --git a/superxas_bec/devices/test_script_ophyd.py b/superxas_bec/devices/test_script_ophyd.py new file mode 100644 index 0000000..863c403 --- /dev/null +++ b/superxas_bec/devices/test_script_ophyd.py @@ -0,0 +1,66 @@ +import time + +from ophyd import Component as Cpt +from ophyd import Device, EpicsSignal, EpicsSignalRO +from ophyd.status import SubscriptionStatus + + +class DummyDevice(Device): + """ + A dummy device for testing purposes. + """ + + i_0 = Cpt(EpicsSignalRO, "ES1-SAI_01:MEAN", doc="I0 signal") + trigger = Cpt(EpicsSignal, "ES1:SMPL", doc="Trigger signal") + trigger_done = Cpt(EpicsSignalRO, "ES1:SMPL-DONE", auto_monitor=True, doc="Trigger done signal") + + xmap_start = Cpt(EpicsSignal, "SITORO:EraseStart", doc="XMAP start signal") + xmap_stop = Cpt(EpicsSignal, "SITORO:StopAll", doc="XMAP stop signal") + xmap_acquiring = Cpt( + EpicsSignalRO, "SITORO:Acquiring", auto_monitor=True, doc="XMAP acquiring signal" + ) + xmap_icr = Cpt(EpicsSignalRO, "SITORO:dxp1:InputCountRate", doc="XMAP input count rate") + xmap_ocr = Cpt(EpicsSignalRO, "SITORO:dxp1:OutputCountRate", doc="XMAP output count rate") + xmap_roi = Cpt(EpicsSignalRO, "SITORO:mca1.R0", doc="XMAP ROI signal") + + +def state_changed_callback(*, value, old_value, **kwargs): + """Callback for acquiring signal changes.""" + if old_value == 0 and value == 1 or old_value == 1 and value == 0: + print(f"State changed: {old_value} -> {value}") + return True + return False + + +if __name__ == "__main__": + # Create an instance of the dummy device + print("Initializing DummyDevice...") + time_started = time.time() + + device = DummyDevice(name="test_device", prefix="X10DA-") + + device.wait_for_connection(timeout=50, all_signals=True) + print(f"Device initialized in {time.time() - time_started:.2f} seconds.") + + for i in range(500): + start_time = time.time() + status = SubscriptionStatus(device.xmap_acquiring, state_changed_callback) + device.xmap_start.put(1) + status.wait(timeout=5) + + status2 = SubscriptionStatus(device.trigger_done, state_changed_callback) + device.trigger.put(1) + status2.wait(timeout=5) + + status3 = SubscriptionStatus(device.xmap_acquiring, state_changed_callback) + device.xmap_stop.put(1) + status3.wait(timeout=5) + + time.sleep(0.1) + + i0 = device.i_0.get() + icr = device.xmap_icr.get() + ocr = device.xmap_ocr.get() + roi = device.xmap_roi.get() + + print(f"time={time.time() - start_time:.4f}", i0, icr, ocr, roi)