From b0fc18040513f8afe8ee6d03a160aeabdf45f77d Mon Sep 17 00:00:00 2001 From: appel_c Date: Wed, 3 Sep 2025 18:51:53 +0200 Subject: [PATCH] w --- debye_bec/devices/pilatus/pilatus.py | 175 ++++++++++++++++++--------- 1 file changed, 119 insertions(+), 56 deletions(-) diff --git a/debye_bec/devices/pilatus/pilatus.py b/debye_bec/devices/pilatus/pilatus.py index 48f347a..3157e4f 100644 --- a/debye_bec/devices/pilatus/pilatus.py +++ b/debye_bec/devices/pilatus/pilatus.py @@ -7,6 +7,7 @@ import threading import time from typing import TYPE_CHECKING +import numpy as np from bec_lib.file_utils import get_full_path from bec_lib.logger import bec_logger from ophyd import Component as Cpt @@ -14,12 +15,14 @@ from ophyd import EpicsSignal, Kind from ophyd.areadetector.cam import ADBase, PilatusDetectorCam from ophyd.areadetector.plugins import HDF5Plugin_V22 as HDF5Plugin from ophyd.status import WaitTimeoutError -from ophyd_devices import AndStatus, CompareStatus, DeviceStatus, PreviewSignal +from ophyd_devices import AndStatus, CompareStatus, DeviceStatus, FileEventSignal, PreviewSignal +from ophyd_devices.devices.areadetector.plugins import ImagePlugin_V35 as ImagePlugin from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase +from ophyd_devices.utils.psi_device_base_utils import TaskStatus if TYPE_CHECKING: # pragma: no cover from bec_lib.devicemanager import ScanInfo - from bec_lib.messages import ScanStatusMessage + from bec_lib.messages import DevicePreviewMessage, ScanStatusMessage from bec_server.device_server.device_server import DeviceManagerDS PILATUS_READOUT_TIME = 0.1 # in s @@ -27,6 +30,8 @@ PILATUS_ACQUIRE_TIME = ( 999999 # This time is the timeout of the detector in operation mode, so it needs to be large. ) +# pylint: disable=redefined-outer-name + logger = bec_logger.logger @@ -92,6 +97,7 @@ class Pilatus(PSIDeviceBase, ADBase): cam = Cpt(PilatusDetectorCam, "cam1:") hdf = Cpt(HDF5Plugin, "HDF1:") + image1 = Cpt(ImagePlugin, "image1:") filter_number = Cpt( EpicsSignal, "cam1:FileNumber", kind=Kind.omitted, doc="File number for ramdisk" ) @@ -134,6 +140,7 @@ class Pilatus(PSIDeviceBase, ADBase): num_rotation_90=0, # Check this doc="Preview signal for the Pilatus Detector", ) + file_event = Cpt(FileEventSignal, name="file_event") def __init__( self, @@ -149,6 +156,29 @@ class Pilatus(PSIDeviceBase, ADBase): ) self.device_manager = device_manager self._readout_time = PILATUS_READOUT_TIME + self._full_path = "" + self._poll_thread_stop_event = threading.Event() + self._task_status: TaskStatus | None = None + self._poll_rate = 1 # 1Hz + + ######################################## + # Custom Beamline Methods # + ######################################## + + def _poll_array_data(self): + while not self._poll_thread_stop_event.wait(1 / self._poll_rate): + value = self.image1.array_data.get() + if value is None: + continue + width = self.image1.array_size.width.get() + height = self.image1.array_size.height.get() + # Geometry correction for the image + data = np.reshape(value, (height, width)) + last_image = self.preview.get() + if np.array_equal(data, last_image): + # No update if image is the same, ~2.5ms on 2400x2400 image (6M) + return + self.preview.put(data) ######################################## # Beamline Specific Implementations # @@ -185,84 +215,110 @@ class Pilatus(PSIDeviceBase, ADBase): self.hdf.auto_save.set(1).wait(5) self.hdf.lazy_open.set(1).wait(5) self.hdf.compression.set(COMPRESSIONALGORITHM.NONE.value).wait(5) # To test which to use + # Start polling thread... + self._task_status = self.task_handler.submit_task(task=self._poll_array_data, run=True) - # TODO: Start background thread that polls data - - def on_stage(self) -> DeviceStatus: + def on_stage(self) -> DeviceStatus | None: """ Called while staging the device. Information about the upcoming scan can be accessed from the scan_info (self.scan_info.msg) object. """ scan_msg: ScanStatusMessage = self.scan_info.msg - exp_time = scan_msg.scan_parameters.get("exposure_time", 0.0) - if exp_time - self._readout_time <= 0: - raise ValueError( - f"Exposure time {exp_time} is too short for Pilatus with readout_time {self._readout_time}." - ) - detector_exp_time = exp_time - self._readout_time - n_images = scan_msg.num_points * scan_msg.scan_parameters.get("frames_per_trigger", 1) - full_path = get_full_path(scan_msg, name="pilatus") - file_path = "/".join(full_path.split("/")[:-1]) - file_name = full_path.split("/")[-1] + if scan_msg.scan_name.startswith("xas"): + return None + # TODO implement logic for 'xas' scans + else: + exp_time = scan_msg.scan_parameters.get("exposure_time", 0.0) + if exp_time - self._readout_time <= 0: + raise ValueError( + f"Exposure time {exp_time} is too short for Pilatus with readout_time {self._readout_time}." + ) + detector_exp_time = exp_time - self._readout_time + n_images = scan_msg.num_points * scan_msg.scan_parameters.get("frames_per_trigger", 1) + self._full_path = get_full_path(scan_msg, name="pilatus") + file_path = "/".join(self._full_path.split("/")[:-1]) + file_name = self._full_path.split("/")[-1] - # TODO Check hown long this takes, make it asynchronous if slow.. - start_time = time.time() - self.cam.array_callbacks.put(1) # Enable array callbacks - self.hdf.enable.put(1) # Enable HDF5 plugin - # Camera settings - self.cam.num_exposures.put(1) # why - self.cam.num_images.put(n_images) - self.cam.acquire_time.put(detector_exp_time) - self.cam.acquire_period.put(PILATUS_ACQUIRE_TIME) - self.filter_number.put(0) - # HDF5 settings - logger.debug(f"Setting HDF5 file path to {file_path} and file name to {file_name}") - self.hdf.file_path.put(file_path) - self.hdf.file_name.put(file_name) - self.hdf.num_capture.put(n_images) - self.cam.array_counter.put(0) # Reset array counter - logger.warning( - f"Finished setting up detector {self.name} after {time.time() - start_time:.2f} seconds." - ) + # Prepare detector and backend + self.cam.array_callbacks.set(1).wait(5) # Enable array callbacks + self.hdf.enable.set(1).wait(5) # Enable HDF5 plugin + # Camera settings + self.cam.num_exposures.set(1).wait(5) # why + self.cam.num_images.set(n_images).wait(5) + self.cam.acquire_time.set(detector_exp_time).wait(5) + self.cam.acquire_period.set(PILATUS_ACQUIRE_TIME).wait(5) + self.filter_number.set(0).wait(5) + # HDF5 settings + logger.debug(f"Setting HDF5 file path to {file_path} and file name to {file_name}") + self.hdf.file_path.set(file_path).wait(5) + self.hdf.file_name.set(file_name).wait(5) + self.hdf.num_capture.set(n_images).wait(5) + self.cam.array_counter.set(0).wait(5) # Reset array counter + self.file_event.put( + file_path=self._full_path, done=False, successful=False + ) # TODO add h5_entry dict + return None def on_unstage(self) -> None: """Called while unstaging the device.""" - def on_pre_scan(self) -> DeviceStatus: + def on_pre_scan(self) -> DeviceStatus | None: """Called right before the scan starts on all devices automatically.""" - status_hdf = CompareStatus(self.hdf.capture, ACQUIREMODE.ACQUIRING.value) - status_cam = CompareStatus(self.cam.acquire, ACQUIREMODE.ACQUIRING.value) - status = AndStatus(status_hdf, status_cam) # , name=f"{self.name}_on_pre_scan") - self.cam.acquire.put(1) - self.hdf.capture.put(1) - return status + if self.scan_info.msg.scan_name.startswith("xas"): + # TODO implement logic for 'xas' scans + return None + else: + status_hdf = CompareStatus(self.hdf.capture, ACQUIREMODE.ACQUIRING.value) + status_cam = CompareStatus(self.cam.acquire, ACQUIREMODE.ACQUIRING.value) + status = AndStatus(status_hdf, status_cam) # , name=f"{self.name}_on_pre_scan") + self.cam.acquire.put(1) + self.hdf.capture.put(1) + return status - def on_trigger(self) -> DeviceStatus: + def on_trigger(self) -> DeviceStatus | None: """Called when the device is triggered.""" - # TODO should we fetch the image counter value, or rather use our own count - # TODO check logic for xas scans! - if self.scan_info.msg.scan_type == "step": + if self.scan_info.msg.scan_name.startswith("xas"): + return None + # TODO implement logic for 'xas' scans + else: start_time = time.time() logger.warning(f"Triggering image with num_captured {self.hdf.num_captured.get()}") img_counter = self.hdf.num_captured.get() status = CompareStatus(self.hdf.num_captured, img_counter + 1) logger.warning(f"Triggering took image {time.time() - start_time:.3f} seconds") self.trigger_shot.put(1) + self.cancel_on_stop(status) return status - def on_complete(self) -> DeviceStatus: + def _complete_callback(self, status: DeviceStatus): + """Callback for when the device completes a scan.""" + if status.success: + status.device.file_event.put( + file_path=status.device._full_path, done=True, successful=True + ) + else: + status.device.file_event.put( + file_path=status.device._full_path, done=True, successful=False + ) + + def on_complete(self) -> DeviceStatus | None: """Called to inquire if a device has completed a scans.""" - status_hdf = CompareStatus(self.hdf.capture, ACQUIREMODE.DONE.value) - status_cam = CompareStatus(self.cam.acquire, ACQUIREMODE.DONE.value) - num_images = self.scan_info.msg.num_points * self.scan_info.msg.scan_parameters.get( - "frames_per_trigger", 1 - ) - status_img_written = CompareStatus(self.hdf.num_captured, num_images) - # TODO change to new ANDSTATUS - status_cam = AndStatus(status_hdf, status_cam) - status = AndStatus(status_cam, status_img_written) # , name=f"{self.name}_on_complete") - return status + if self.scan_info.msg.scan_name.startswith("xas"): + # TODO implement logic for 'xas' scans + return None + else: + status_hdf = CompareStatus(self.hdf.capture, ACQUIREMODE.DONE.value) + status_cam = CompareStatus(self.cam.acquire, ACQUIREMODE.DONE.value) + num_images = self.scan_info.msg.num_points * self.scan_info.msg.scan_parameters.get( + "frames_per_trigger", 1 + ) + status_img_written = CompareStatus(self.hdf.num_captured, num_images) + # TODO change to new ANDSTATUS + status_cam = AndStatus(status_hdf, status_cam) + status = AndStatus(status_cam, status_img_written) # , name=f"{self.name}_on_complete") + status.add_callback(self._complete_callback) # Callback that writing was successful + return status def on_kickoff(self) -> None: """Called to kickoff a device for a fly scan. Has to be called explicitly.""" @@ -274,6 +330,7 @@ class Pilatus(PSIDeviceBase, ADBase): def on_destroy(self) -> None: """Called when the device is destroyed. Cleanup resources here.""" + self._poll_thread_stop_event.set() if __name__ == "__main__": @@ -299,6 +356,12 @@ if __name__ == "__main__": # time.sleep(1) logger.info(f"Triggering image {ii+1}/{pilatus.scan_info.msg.num_points}") pilatus.on_trigger().wait() + p = pilatus.preview.get() + if p is not None: + p: DevicePreviewMessage + logger.warning( + f"Preview shape: {p.data.shape}, max: {np.max(p.data)}, min: {np.min(p.data)}" + ) pilatus.on_complete().wait(timeout=5) logger.info(f"Complete done") pilatus.on_unstage()