From dd0fe31cb753f770899337928ddede470269c869 Mon Sep 17 00:00:00 2001 From: gac-x01da Date: Wed, 3 Sep 2025 13:24:18 +0200 Subject: [PATCH] feat(pilatus): Initial commit of Pilatus integration --- debye_bec/device_configs/x01da_pilatus.yaml | 9 + debye_bec/devices/pilatus/__init__.py | 0 debye_bec/devices/pilatus/pilatus.py | 408 ++++++++++++++++++++ 3 files changed, 417 insertions(+) create mode 100644 debye_bec/device_configs/x01da_pilatus.yaml create mode 100644 debye_bec/devices/pilatus/__init__.py create mode 100644 debye_bec/devices/pilatus/pilatus.py diff --git a/debye_bec/device_configs/x01da_pilatus.yaml b/debye_bec/device_configs/x01da_pilatus.yaml new file mode 100644 index 0000000..261cfa9 --- /dev/null +++ b/debye_bec/device_configs/x01da_pilatus.yaml @@ -0,0 +1,9 @@ + pilatus: + readoutPriority: async + description: Pilatus + deviceClass: debye_bec.devices.pilatus.PilatusDetector + deviceConfig: + prefix: "X01DA-ES2-PIL:" + onFailure: retry + enabled: true + softwareTrigger: false \ No newline at end of file diff --git a/debye_bec/devices/pilatus/__init__.py b/debye_bec/devices/pilatus/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/debye_bec/devices/pilatus/pilatus.py b/debye_bec/devices/pilatus/pilatus.py new file mode 100644 index 0000000..8a570db --- /dev/null +++ b/debye_bec/devices/pilatus/pilatus.py @@ -0,0 +1,408 @@ +"""Pilatus AD integration at Debye beamline.""" + +from __future__ import annotations + +import enum +import threading +import time +import traceback +from typing import TYPE_CHECKING + +import numpy as np +from bec_lib.file_utils import get_full_path +from bec_lib.logger import bec_logger +from ophyd import Component as Cpt +from ophyd import EpicsSignal, Kind +from ophyd.areadetector.cam import ADBase, PilatusDetectorCam +from ophyd.areadetector.plugins import HDF5Plugin_V22 as HDF5Plugin +from ophyd.status import WaitTimeoutError +from ophyd_devices import ( + AndStatusWithList, + CompareStatus, + DeviceStatus, + FileEventSignal, + PreviewSignal, +) +from ophyd_devices.devices.areadetector.plugins import ImagePlugin_V35 as ImagePlugin +from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase +from ophyd_devices.utils.psi_device_base_utils import TaskStatus + +if TYPE_CHECKING: # pragma: no cover + from bec_lib.devicemanager import ScanInfo + from bec_lib.messages import DevicePreviewMessage, ScanStatusMessage + from bec_server.device_server.device_server import DeviceManagerDS + +PILATUS_READOUT_TIME = 0.1 # in s +PILATUS_ACQUIRE_TIME = ( + 999999 # This time is the timeout of the detector in operation mode, so it needs to be large. +) + +# pylint: disable=redefined-outer-name + +logger = bec_logger.logger + + +class DETECTORSTATE(int, enum.Enum): + """Pilatus Detector States from CamServer""" + + UNARMED = 0 + ARMED = 1 + + +class ACQUIREMODE(int, enum.Enum): + """Pilatus Acquisition Modes""" + + DONE = 0 + ACQUIRING = 1 + + +class FILEWRITEMODE(int, enum.Enum): + """HDF5 Plugin FileWrite Mode""" + + SINGLE = 0 + CAPTURE = 1 + STREAM = 2 + + +class COMPRESSIONALGORITHM(int, enum.Enum): + """HDF5 Plugin Compression Algorithm""" + + NONE = 0 + NBIT = 1 # Don't use that.. + SZIP = 2 + ZLIB = 3 + + +class TRIGGERMODE(int, enum.Enum): + """Pilatus Trigger Modes""" + + INTERNAL = 0 + EXT_ENABLE = 1 + EXT_TRIGGER = 2 + MULT_TRIGGER = 3 + ALIGNMENT = 4 + + def description(self) -> str: + """Return a description of the trigger mode.""" + descriptions = { + TRIGGERMODE.INTERNAL: "Internal trigger mode, images are acquired on internal trigger.", + TRIGGERMODE.EXT_ENABLE: "External Enable trigger mode; check manual as details are currently unknown", + TRIGGERMODE.EXT_TRIGGER: "External Trigger mode, images are acquired on external trigger signal. All images on single trigger.", + TRIGGERMODE.MULT_TRIGGER: "Multiple External Trigger mode, images are acquired on multiple external trigger signals. One image per trigger.", + TRIGGERMODE.ALIGNMENT: "Alignment mode, used for beam alignment.", + } + return descriptions.get(self, "Unknown") + + def __str__(self): + return self.description() + + +class Pilatus(PSIDeviceBase, ADBase): + """ + Pilatus Base integration for Debye. + Prefix of the detector is 'X01DA-ES2-PIL:' + + Args: + prefix (str) : Prefix for the IOC + name (str) : Name of the detector + scan_info (ScanInfo | None) : ScanInfo object passed through the device by the device_manager + device_manager (DeviceManager | None) : DeviceManager object passed through the device by the device_manager + """ + + cam = Cpt(PilatusDetectorCam, "cam1:") + hdf = Cpt(HDF5Plugin, "HDF1:") + image1 = Cpt(ImagePlugin, "image1:") + filter_number = Cpt( + EpicsSignal, "cam1:FileNumber", kind=Kind.omitted, doc="File number for ramdisk" + ) + trigger_shot = Cpt( + EpicsSignal, + read_pv="X01DA-OP-MO1:BRAGG:xrd_trig_req", + write_pv="X01DA-OP-MO1:BRAGG:xrd_trig_req", + add_prefix=("a",), + kind=Kind.omitted, + doc="Trigger PV from MO1 Bragg", + ) + trigger_source = Cpt( + EpicsSignal, + read_pv="X01DA-OP-MO1:BRAGG:xrd_trig_src_ENUM_RBV", + write_pv="X01DA-OP-MO1:BRAGG:xrd_trig_src_ENUM", + add_prefix=("a",), + kind=Kind.omitted, + doc="Trigger Source; PV, 0 : EPICS, 1 : INPOS", + ) + trigger_mode = Cpt( + EpicsSignal, + read_pv="X01DA-OP-MO1:BRAGG:xrd_trig_mode_ENUM_RBV", + write_pv="X01DA-OP-MO1:BRAGG:xrd_trig_mode_ENUM", + add_prefix=("a",), + kind=Kind.omitted, + doc="Trigger Mode; 0 : PULSE, 1 : CONDITION", + ) + trigger_pulse_length = Cpt( + EpicsSignal, + read_pv="X01DA-OP-MO1:BRAGG:xrd_trig_len_RBV", + write_pv="X01DA-OP-MO1:BRAGG:xrd_trig_len", + add_prefix=("a",), + kind=Kind.omitted, + doc="Trigger Pulse Length in seconds", + ) + preview = Cpt( + PreviewSignal, + name="preview", + ndim=2, + num_rotation_90=0, # Check this + doc="Preview signal for the Pilatus Detector", + ) + file_event = Cpt(FileEventSignal, name="file_event") + + def __init__( + self, + *, + name: str, + prefix: str = "", + scan_info: ScanInfo | None = None, + device_manager: DeviceManagerDS | None = None, + **kwargs, + ): + super().__init__( + name=name, prefix=prefix, scan_info=scan_info, device_manager=device_manager, **kwargs + ) + self.device_manager = device_manager + self._readout_time = PILATUS_READOUT_TIME + self._full_path = "" + self._poll_thread_stop_event = threading.Event() + self._task_status: TaskStatus | None = None + self._poll_rate = 1 # 1Hz + + ######################################## + # Custom Beamline Methods # + ######################################## + + def _poll_array_data(self): + while not self._poll_thread_stop_event.wait(1 / self._poll_rate): + logger.debug("Polling Pilatus array data for preview...") + try: + value = self.image1.array_data.get() + if value is None: + continue + width = self.image1.array_size.width.get() + height = self.image1.array_size.height.get() + # Geometry correction for the image + data = np.reshape(value, (height, width)) + last_image: DevicePreviewMessage = self.preview.get() + if last_image is None: + return + elif np.array_equal(data, last_image.data): + # No update if image is the same, ~2.5ms on 2400x2400 image (6M) + logger.debug( + f"Pilatus preview image for {self.name} is the same as last one, not updating." + ) + return + self.preview.put(data) + except Exception: # pylint: disable=broad-except + content = traceback.format_exc() + logger.error( + f"Error while polling array data for preview of {self.name}: {content}" + ) + + ######################################## + # Beamline Specific Implementations # + ######################################## + + def on_init(self) -> None: + """ + Called when the device is initialized. + + No signals are connected at this point. If you like to + set default values on signals, please use on_connected instead. + """ + + def on_connected(self) -> None: + """ + Called after the device is connected and its signals are connected. + Default values for signals should be set here. + """ + status_cam = CompareStatus(self.cam.acquire, ACQUIREMODE.DONE.value) + status_hdf = CompareStatus(self.hdf.capture, ACQUIREMODE.DONE.value) + try: + status_cam.wait(timeout=5) + status_hdf.wait(timeout=5) + except WaitTimeoutError: + logger.warning( + f"Camera device {self.name} was running an acquisition. Stopping acquisition." + ) + self.cam.acquire.put(0) + self.hdf.capture.put(0) + + self.cam.trigger_mode.set(TRIGGERMODE.MULT_TRIGGER.value).wait(5) + self.cam.image_file_tmot.set(60).wait(5) + self.hdf.file_write_mode.set(FILEWRITEMODE.STREAM.value).wait(5) + self.hdf.file_template.set("%s%s").wait(5) + self.hdf.auto_save.set(1).wait(5) + self.hdf.lazy_open.set(1).wait(5) + self.hdf.compression.set(COMPRESSIONALGORITHM.NONE.value).wait(5) # To test which to use + # Start polling thread... + self._task_status = self.task_handler.submit_task(task=self._poll_array_data, run=True) + + def on_stage(self) -> DeviceStatus | None: + """ + Called while staging the device. + + Information about the upcoming scan can be accessed from the scan_info (self.scan_info.msg) object. + """ + scan_msg: ScanStatusMessage = self.scan_info.msg + if scan_msg.scan_name.startswith("xas"): + return None + # TODO implement logic for 'xas' scans + else: + exp_time = scan_msg.scan_parameters.get("exposure_time", 0.0) + if exp_time - self._readout_time <= 0: + raise ValueError( + f"Exposure time {exp_time} is too short for Pilatus with readout_time {self._readout_time}." + ) + detector_exp_time = exp_time - self._readout_time + n_images = scan_msg.num_points * scan_msg.scan_parameters.get("frames_per_trigger", 1) + self._full_path = get_full_path(scan_msg, name="pilatus") + file_path = "/".join(self._full_path.split("/")[:-1]) + file_name = self._full_path.split("/")[-1] + + # Prepare detector and backend + self.cam.array_callbacks.set(1).wait(5) # Enable array callbacks + self.hdf.enable.set(1).wait(5) # Enable HDF5 plugin + # Camera settings + self.cam.num_exposures.set(1).wait(5) + self.cam.num_images.set(n_images).wait(5) + self.cam.acquire_time.set(exp_time).wait(5) # let's try this + self.cam.acquire_period.set(PILATUS_ACQUIRE_TIME).wait(5) + self.filter_number.set(0).wait(5) + # HDF5 settings + logger.debug(f"Setting HDF5 file path to {file_path} and file name to {file_name}") + self.hdf.file_path.set(file_path).wait(5) + self.hdf.file_name.set(file_name).wait(5) + self.hdf.num_capture.set(n_images).wait(5) + self.cam.array_counter.set(0).wait(5) # Reset array counter + self.file_event.put( + file_path=self._full_path, done=False, successful=False + ) # TODO add h5_entry dict + return None + + def on_unstage(self) -> None: + """Called while unstaging the device.""" + + def on_pre_scan(self) -> DeviceStatus | None: + """Called right before the scan starts on all devices automatically.""" + if self.scan_info.msg.scan_name.startswith("xas"): + # TODO implement logic for 'xas' scans + return None + else: + status_hdf = CompareStatus(self.hdf.capture, ACQUIREMODE.ACQUIRING.value) + status_cam = CompareStatus(self.cam.acquire, ACQUIREMODE.ACQUIRING.value) + status_cam_server = CompareStatus(self.cam.armed, DETECTORSTATE.ARMED.value) + status = AndStatusWithList( + device=self, status_list=[status_hdf, status_cam, status_cam_server] + ) + self.cam.acquire.put(1) + self.hdf.capture.put(1) + self.cancel_on_stop(status) + return status + + def on_trigger(self) -> DeviceStatus | None: + """Called when the device is triggered.""" + if self.scan_info.msg.scan_name.startswith("xas"): + return None + # TODO implement logic for 'xas' scans + else: + start_time = time.time() + logger.warning(f"Triggering image with num_captured {self.hdf.num_captured.get()}") + img_counter = self.hdf.num_captured.get() + status = CompareStatus(self.hdf.num_captured, img_counter + 1) + logger.warning(f"Triggering took image {time.time() - start_time:.3f} seconds") + self.trigger_shot.put(1) + self.cancel_on_stop(status) + return status + + def _complete_callback(self, status: DeviceStatus): + """Callback for when the device completes a scan.""" + if status.success: + status.device.file_event.put( + file_path=status.device._full_path, done=True, successful=True + ) + else: + status.device.file_event.put( + file_path=status.device._full_path, done=True, successful=False + ) + + def on_complete(self) -> DeviceStatus | None: + """Called to inquire if a device has completed a scans.""" + if self.scan_info.msg.scan_name.startswith("xas"): + # TODO implement logic for 'xas' scans + return None + else: + status_hdf = CompareStatus(self.hdf.capture, ACQUIREMODE.DONE.value) + status_cam = CompareStatus(self.cam.acquire, ACQUIREMODE.DONE.value) + status_cam_server = CompareStatus(self.cam.armed, DETECTORSTATE.UNARMED.value) + num_images = self.scan_info.msg.num_points * self.scan_info.msg.scan_parameters.get( + "frames_per_trigger", 1 + ) + status_img_written = CompareStatus(self.hdf.num_captured, num_images) + status = AndStatusWithList( + device=self, + status_list=[status_hdf, status_cam, status_img_written, status_cam_server], + ) + status.add_callback(self._complete_callback) # Callback that writing was successful + self.cancel_on_stop(status) + return status + + def on_kickoff(self) -> None: + """Called to kickoff a device for a fly scan. Has to be called explicitly.""" + + def on_stop(self) -> None: + """Called when the device is stopped.""" + self.cam.acquire.put(0) + self.hdf.capture.put(0) + + def on_destroy(self) -> None: + """Called when the device is destroyed. Cleanup resources here.""" + self.on_stop() + self._poll_thread_stop_event.set() + + +if __name__ == "__main__": + try: + pilatus = Pilatus(name="pilatus", prefix="X01DA-ES2-PIL:") + logger.info(f"Calling wait for connection") + # pilatus.wait_for_connection(all_signals=True, timeout=20) + logger.info(f"Connecting to pilatus...") + pilatus.on_connected() + for exp_time, scan_number, n_pnts in zip([0.5, 1.0, 2.0], [1, 2, 3], [30, 20, 10]): + logger.info(f"Sleeping for 5s") + time.sleep(5) + pilatus.scan_info.msg.num_points = n_pnts + pilatus.scan_info.msg.scan_parameters["exposure_time"] = exp_time + pilatus.scan_info.msg.scan_parameters["frames_per_trigger"] = 1 + pilatus.scan_info.msg.info["file_components"] = ( + f"/sls/x01da/data/p22481/raw/data/S00000-00999/S{scan_number:05d}/S{scan_number:05d}", + "h5", + ) + pilatus.on_stage() + logger.info(f"Stage done") + pilatus.on_pre_scan().wait(timeout=5) + logger.info(f"Pre-scan done") + for ii in range(pilatus.scan_info.msg.num_points): + # if ii == 0: + # time.sleep(1) + logger.info(f"Triggering image {ii+1}/{pilatus.scan_info.msg.num_points}") + pilatus.on_trigger().wait() + p = pilatus.preview.get() + if p is not None: + p: DevicePreviewMessage + logger.warning( + f"Preview shape: {p.data.shape}, max: {np.max(p.data)}, min: {np.min(p.data)}, mean: {np.mean(p.data)}" + ) + pilatus.on_complete().wait(timeout=5) + logger.info(f"Complete done") + pilatus.on_unstage() + logger.info(f"Unstage done") + finally: + pilatus.on_destroy()