ophyd test script
This commit is contained in:
66
superxas_bec/devices/test_script_ophyd.py
Normal file
66
superxas_bec/devices/test_script_ophyd.py
Normal file
@@ -0,0 +1,66 @@
|
||||
import time
|
||||
|
||||
from ophyd import Component as Cpt
|
||||
from ophyd import Device, EpicsSignal, EpicsSignalRO
|
||||
from ophyd.status import SubscriptionStatus
|
||||
|
||||
|
||||
class DummyDevice(Device):
|
||||
"""
|
||||
A dummy device for testing purposes.
|
||||
"""
|
||||
|
||||
i_0 = Cpt(EpicsSignalRO, "ES1-SAI_01:MEAN", doc="I0 signal")
|
||||
trigger = Cpt(EpicsSignal, "ES1:SMPL", doc="Trigger signal")
|
||||
trigger_done = Cpt(EpicsSignalRO, "ES1:SMPL-DONE", auto_monitor=True, doc="Trigger done signal")
|
||||
|
||||
xmap_start = Cpt(EpicsSignal, "SITORO:EraseStart", doc="XMAP start signal")
|
||||
xmap_stop = Cpt(EpicsSignal, "SITORO:StopAll", doc="XMAP stop signal")
|
||||
xmap_acquiring = Cpt(
|
||||
EpicsSignalRO, "SITORO:Acquiring", auto_monitor=True, doc="XMAP acquiring signal"
|
||||
)
|
||||
xmap_icr = Cpt(EpicsSignalRO, "SITORO:dxp1:InputCountRate", doc="XMAP input count rate")
|
||||
xmap_ocr = Cpt(EpicsSignalRO, "SITORO:dxp1:OutputCountRate", doc="XMAP output count rate")
|
||||
xmap_roi = Cpt(EpicsSignalRO, "SITORO:mca1.R0", doc="XMAP ROI signal")
|
||||
|
||||
|
||||
def state_changed_callback(*, value, old_value, **kwargs):
|
||||
"""Callback for acquiring signal changes."""
|
||||
if old_value == 0 and value == 1 or old_value == 1 and value == 0:
|
||||
print(f"State changed: {old_value} -> {value}")
|
||||
return True
|
||||
return False
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
# Create an instance of the dummy device
|
||||
print("Initializing DummyDevice...")
|
||||
time_started = time.time()
|
||||
|
||||
device = DummyDevice(name="test_device", prefix="X10DA-")
|
||||
|
||||
device.wait_for_connection(timeout=50, all_signals=True)
|
||||
print(f"Device initialized in {time.time() - time_started:.2f} seconds.")
|
||||
|
||||
for i in range(500):
|
||||
start_time = time.time()
|
||||
status = SubscriptionStatus(device.xmap_acquiring, state_changed_callback)
|
||||
device.xmap_start.put(1)
|
||||
status.wait(timeout=5)
|
||||
|
||||
status2 = SubscriptionStatus(device.trigger_done, state_changed_callback)
|
||||
device.trigger.put(1)
|
||||
status2.wait(timeout=5)
|
||||
|
||||
status3 = SubscriptionStatus(device.xmap_acquiring, state_changed_callback)
|
||||
device.xmap_stop.put(1)
|
||||
status3.wait(timeout=5)
|
||||
|
||||
time.sleep(0.1)
|
||||
|
||||
i0 = device.i_0.get()
|
||||
icr = device.xmap_icr.get()
|
||||
ocr = device.xmap_ocr.get()
|
||||
roi = device.xmap_roi.get()
|
||||
|
||||
print(f"time={time.time() - start_time:.4f}", i0, icr, ocr, roi)
|
||||
Reference in New Issue
Block a user