feat: add panda box csaxs integration

This commit is contained in:
x01dc
2026-01-14 12:25:47 +01:00
committed by appel_c
parent 3d62bea04b
commit f5b898ea1c
3 changed files with 114 additions and 0 deletions

View File

@@ -0,0 +1,32 @@
samx:
readoutPriority: baseline
deviceClass: ophyd_devices.SimPositioner
deviceConfig:
delay: 1
limits:
- -50
- 50
tolerance: 0.01
update_frequency: 400
deviceTags:
- user motors
enabled: true
readOnly: false
bpm4i:
readoutPriority: monitored
deviceClass: ophyd_devices.SimMonitor
deviceConfig:
deviceTags:
- beamline
enabled: true
readOnly: false
panda:
readoutPriority: async
deviceClass: csaxs_bec.devices.panda_box.panda_box.PandaBoxCSAXS
deviceConfig:
host: omny-panda.psi.ch
deviceTags:
- detector
enabled: true
readOnly: false
softwareTrigger: false

View File

View File

@@ -0,0 +1,82 @@
from ophyd_devices.devices.pandabox.pandabox import PandaBox, PandaState
from ophyd_devices import AsyncMultiSignal, StatusBase
from pandablocks.responses import FrameData
from bec_lib.logger import bec_logger
import time
logger = bec_logger.logger
class PandaBoxCSAXS(PandaBox):
data = AsyncMultiSignal(
name="data",
ndim=0,
max_size=1000,
signals=["FMC_IN.VAL1.Min", "FMC_IN.VAL1.Max", "FMC_IN.VAL1.Mean", "FMC_IN.VAL2.Value", "PCAP.GATE_DURATION.Value"],
async_update={"type" : "add", "max_shape" : [None]}
)
def on_init(self):
super().on_init()
self._acquisition_group = "panda"
def on_connected(self):
super().on_connected()
self.add_data_callback(data_type=PandaState.FRAME.value,callback=self._receive_frame_data)
return
def _receive_frame_data(self, data:FrameData) -> None:
logger.info(f"Received frame data with signals {data}")
out = self._compile_frame_data_to_dict(frame_data=data)
logger.info(f"Compiled data {out}")
self.data.put(out, acquisition_group=self._acquisition_group)
def on_complete(self):
#TODO make async...
captured = 0
start_time = time.monotonic_ns()
try:
while captured < self.scan_info.msg.num_points:
logger.info(f"Run with captured {captured} and expected points : {self.scan_info.msg.num_points}.")
ret = self.send_raw("*PCAP.CAPTURED?")
captured=int(ret[0].split("=")[-1])
logger.warning(f"Received captured {captured}.")
time.sleep(0.5)
if (time.monotonic_ns() -start_time) > 30:
break
finally:
logger.info(f"Running disarm")
self._disarm()
status = StatusBase(obj=self)
self.add_status_callback(status, success=[PandaState.END, PandaState.DISARMED], failure=[PandaState.READY])
self.cancel_on_stop(status)
return status
if __name__ == "__main__":
import time
panda = PandaBoxCSAXS(name='panda', host="omny-panda.psi.ch")
panda.on_connected()
status = StatusBase(obj=panda)
panda.add_status_callback(status=status, success=[PandaState.DISARMED], failure=[])
panda.stop()
status.wait(timeout=2)
panda.unstage()
logger.info(f"Panda connected")
ret = panda.stage()
logger.info(f"Panda staged")
ret = panda.pre_scan()
ret.wait(timeout=5)
logger.info(f"Panda pre scan done")
time.sleep(5)
panda.stop()
st = panda.complete()
st.wait(timeout=5)
logger.info(f"Measurement completed")
panda.unstage()
logger.info(f"Panda Unstaged")