refactor(panda-box): refactor Pandabox, moving logic to base class
All checks were successful
CI for csaxs_bec / test (push) Successful in 1m22s
CI for csaxs_bec / test (pull_request) Successful in 1m23s

This commit is contained in:
2026-02-09 17:13:09 +01:00
parent db747c0c20
commit 2c86fe64e8

View File

@@ -1,64 +1,71 @@
from ophyd_devices.devices.pandabox.pandabox import PandaBox, PandaState
from ophyd_devices import AsyncMultiSignal, StatusBase
"""Module to integrate the PandaBox for cSAXS measurements."""
from pandablocks.responses import FrameData
from bec_lib.logger import bec_logger
import time
from bec_lib.logger import bec_logger
from ophyd_devices import AsyncMultiSignal, StatusBase
from ophyd_devices.devices.panda_box.panda_box import PandaBox, PandaState
from pandablocks.responses import FrameData
logger = bec_logger.logger
class PandaBoxCSAXS(PandaBox):
data = AsyncMultiSignal(
name="data",
ndim=0,
max_size=1000,
signals=["FMC_IN.VAL1.Min", "FMC_IN.VAL1.Max", "FMC_IN.VAL1.Mean", "FMC_IN.VAL2.Value", "PCAP.GATE_DURATION.Value"],
async_update={"type" : "add", "max_shape" : [None]}
)
class PandaBoxCSAXS(PandaBox):
def on_init(self):
super().on_init()
self._acquisition_group = "panda"
self._acquisition_group = "burst"
def on_connected(self):
super().on_connected()
self.add_data_callback(data_type=PandaState.FRAME.value,callback=self._receive_frame_data)
return
def _receive_frame_data(self, data:FrameData) -> None:
logger.info(f"Received frame data with signals {data}")
out = self._compile_frame_data_to_dict(frame_data=data)
logger.info(f"Compiled data {out}")
self.data.put(out, acquisition_group=self._acquisition_group)
def on_stage(self):
# TODO, adjust as seen fit.
# Adjust the acquisition group based on scan parameters if needed
if self.scan_info.msg.scan_type == "fly":
self._acquisition_group = "fly"
elif self.scan_info.msg.scan_type == "step":
if self.scan_info.msg.scan_parameters["frames_per_trigger"] == 1:
self._acquisition_group = "monitored"
else:
self._acquisition_group = "burst"
def on_complete(self):
#TODO make async...
captured = 0
start_time = time.monotonic_ns()
try:
while captured < self.scan_info.msg.num_points:
logger.info(f"Run with captured {captured} and expected points : {self.scan_info.msg.num_points}.")
ret = self.send_raw("*PCAP.CAPTURED?")
captured=int(ret[0].split("=")[-1])
logger.warning(f"Received captured {captured}.")
time.sleep(0.5)
if (time.monotonic_ns() -start_time) > 30:
break
finally:
logger.info(f"Running disarm")
self._disarm()
status = StatusBase(obj=self)
self.add_status_callback(status, success=[PandaState.END, PandaState.DISARMED], failure=[PandaState.READY])
self.cancel_on_stop(status)
return status
"""On complete is called after the scan is complete. We need to wait for the capture to complete before we can disarm the PandaBox."""
def _check_capture_complete():
timeout = 10
captured = 0
start_time = time.monotonic_ns()
try:
while captured < self.scan_info.msg.num_points:
logger.debug(
f"Run with captured {captured} and expected points : {self.scan_info.msg.num_points}."
)
ret = self.send_raw("*PCAP.CAPTURED?")
captured = int(ret[0].split("=")[-1])
time.sleep(0.5)
if (time.monotonic_ns() - start_time) > timeout:
break
finally:
self._disarm()
# NOTE: This utility class allows to submit a blocking function to a thread and return a status object
# that can be awaited for. This allows for asynchronous waiting for the capture to complete without blocking
# the main duty cycle of the device server. The device server knows how to handle the status object (future)
# and will wait for it to complete.
status = self.task_handler.submit_task(_check_capture_complete, run=True)
status_panda_state = StatusBase(obj=self)
self.add_status_callback(
status, success=[PandaState.END, PandaState.DISARMED], failure=[PandaState.READY]
)
ret_status = status & status_panda_state
self.cancel_on_stop(ret_status)
return ret_status
if __name__ == "__main__":
import time
panda = PandaBoxCSAXS(name='panda', host="omny-panda.psi.ch")
panda = PandaBoxCSAXS(name="panda", host="omny-panda.psi.ch")
panda.on_connected()
status = StatusBase(obj=panda)
panda.add_status_callback(status=status, success=[PandaState.DISARMED], failure=[])
@@ -78,5 +85,3 @@ if __name__ == "__main__":
logger.info(f"Measurement completed")
panda.unstage()
logger.info(f"Panda Unstaged")