This commit is contained in:
2025-09-03 18:51:53 +02:00
parent 7360829ed2
commit b0fc180405

View File

@@ -7,6 +7,7 @@ import threading
import time
from typing import TYPE_CHECKING
import numpy as np
from bec_lib.file_utils import get_full_path
from bec_lib.logger import bec_logger
from ophyd import Component as Cpt
@@ -14,12 +15,14 @@ from ophyd import EpicsSignal, Kind
from ophyd.areadetector.cam import ADBase, PilatusDetectorCam
from ophyd.areadetector.plugins import HDF5Plugin_V22 as HDF5Plugin
from ophyd.status import WaitTimeoutError
from ophyd_devices import AndStatus, CompareStatus, DeviceStatus, PreviewSignal
from ophyd_devices import AndStatus, CompareStatus, DeviceStatus, FileEventSignal, PreviewSignal
from ophyd_devices.devices.areadetector.plugins import ImagePlugin_V35 as ImagePlugin
from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase
from ophyd_devices.utils.psi_device_base_utils import TaskStatus
if TYPE_CHECKING: # pragma: no cover
from bec_lib.devicemanager import ScanInfo
from bec_lib.messages import ScanStatusMessage
from bec_lib.messages import DevicePreviewMessage, ScanStatusMessage
from bec_server.device_server.device_server import DeviceManagerDS
PILATUS_READOUT_TIME = 0.1 # in s
@@ -27,6 +30,8 @@ PILATUS_ACQUIRE_TIME = (
999999 # This time is the timeout of the detector in operation mode, so it needs to be large.
)
# pylint: disable=redefined-outer-name
logger = bec_logger.logger
@@ -92,6 +97,7 @@ class Pilatus(PSIDeviceBase, ADBase):
cam = Cpt(PilatusDetectorCam, "cam1:")
hdf = Cpt(HDF5Plugin, "HDF1:")
image1 = Cpt(ImagePlugin, "image1:")
filter_number = Cpt(
EpicsSignal, "cam1:FileNumber", kind=Kind.omitted, doc="File number for ramdisk"
)
@@ -134,6 +140,7 @@ class Pilatus(PSIDeviceBase, ADBase):
num_rotation_90=0, # Check this
doc="Preview signal for the Pilatus Detector",
)
file_event = Cpt(FileEventSignal, name="file_event")
def __init__(
self,
@@ -149,6 +156,29 @@ class Pilatus(PSIDeviceBase, ADBase):
)
self.device_manager = device_manager
self._readout_time = PILATUS_READOUT_TIME
self._full_path = ""
self._poll_thread_stop_event = threading.Event()
self._task_status: TaskStatus | None = None
self._poll_rate = 1 # 1Hz
########################################
# Custom Beamline Methods #
########################################
def _poll_array_data(self):
while not self._poll_thread_stop_event.wait(1 / self._poll_rate):
value = self.image1.array_data.get()
if value is None:
continue
width = self.image1.array_size.width.get()
height = self.image1.array_size.height.get()
# Geometry correction for the image
data = np.reshape(value, (height, width))
last_image = self.preview.get()
if np.array_equal(data, last_image):
# No update if image is the same, ~2.5ms on 2400x2400 image (6M)
return
self.preview.put(data)
########################################
# Beamline Specific Implementations #
@@ -185,84 +215,110 @@ class Pilatus(PSIDeviceBase, ADBase):
self.hdf.auto_save.set(1).wait(5)
self.hdf.lazy_open.set(1).wait(5)
self.hdf.compression.set(COMPRESSIONALGORITHM.NONE.value).wait(5) # To test which to use
# Start polling thread...
self._task_status = self.task_handler.submit_task(task=self._poll_array_data, run=True)
# TODO: Start background thread that polls data
def on_stage(self) -> DeviceStatus:
def on_stage(self) -> DeviceStatus | None:
"""
Called while staging the device.
Information about the upcoming scan can be accessed from the scan_info (self.scan_info.msg) object.
"""
scan_msg: ScanStatusMessage = self.scan_info.msg
exp_time = scan_msg.scan_parameters.get("exposure_time", 0.0)
if exp_time - self._readout_time <= 0:
raise ValueError(
f"Exposure time {exp_time} is too short for Pilatus with readout_time {self._readout_time}."
)
detector_exp_time = exp_time - self._readout_time
n_images = scan_msg.num_points * scan_msg.scan_parameters.get("frames_per_trigger", 1)
full_path = get_full_path(scan_msg, name="pilatus")
file_path = "/".join(full_path.split("/")[:-1])
file_name = full_path.split("/")[-1]
if scan_msg.scan_name.startswith("xas"):
return None
# TODO implement logic for 'xas' scans
else:
exp_time = scan_msg.scan_parameters.get("exposure_time", 0.0)
if exp_time - self._readout_time <= 0:
raise ValueError(
f"Exposure time {exp_time} is too short for Pilatus with readout_time {self._readout_time}."
)
detector_exp_time = exp_time - self._readout_time
n_images = scan_msg.num_points * scan_msg.scan_parameters.get("frames_per_trigger", 1)
self._full_path = get_full_path(scan_msg, name="pilatus")
file_path = "/".join(self._full_path.split("/")[:-1])
file_name = self._full_path.split("/")[-1]
# TODO Check hown long this takes, make it asynchronous if slow..
start_time = time.time()
self.cam.array_callbacks.put(1) # Enable array callbacks
self.hdf.enable.put(1) # Enable HDF5 plugin
# Camera settings
self.cam.num_exposures.put(1) # why
self.cam.num_images.put(n_images)
self.cam.acquire_time.put(detector_exp_time)
self.cam.acquire_period.put(PILATUS_ACQUIRE_TIME)
self.filter_number.put(0)
# HDF5 settings
logger.debug(f"Setting HDF5 file path to {file_path} and file name to {file_name}")
self.hdf.file_path.put(file_path)
self.hdf.file_name.put(file_name)
self.hdf.num_capture.put(n_images)
self.cam.array_counter.put(0) # Reset array counter
logger.warning(
f"Finished setting up detector {self.name} after {time.time() - start_time:.2f} seconds."
)
# Prepare detector and backend
self.cam.array_callbacks.set(1).wait(5) # Enable array callbacks
self.hdf.enable.set(1).wait(5) # Enable HDF5 plugin
# Camera settings
self.cam.num_exposures.set(1).wait(5) # why
self.cam.num_images.set(n_images).wait(5)
self.cam.acquire_time.set(detector_exp_time).wait(5)
self.cam.acquire_period.set(PILATUS_ACQUIRE_TIME).wait(5)
self.filter_number.set(0).wait(5)
# HDF5 settings
logger.debug(f"Setting HDF5 file path to {file_path} and file name to {file_name}")
self.hdf.file_path.set(file_path).wait(5)
self.hdf.file_name.set(file_name).wait(5)
self.hdf.num_capture.set(n_images).wait(5)
self.cam.array_counter.set(0).wait(5) # Reset array counter
self.file_event.put(
file_path=self._full_path, done=False, successful=False
) # TODO add h5_entry dict
return None
def on_unstage(self) -> None:
"""Called while unstaging the device."""
def on_pre_scan(self) -> DeviceStatus:
def on_pre_scan(self) -> DeviceStatus | None:
"""Called right before the scan starts on all devices automatically."""
status_hdf = CompareStatus(self.hdf.capture, ACQUIREMODE.ACQUIRING.value)
status_cam = CompareStatus(self.cam.acquire, ACQUIREMODE.ACQUIRING.value)
status = AndStatus(status_hdf, status_cam) # , name=f"{self.name}_on_pre_scan")
self.cam.acquire.put(1)
self.hdf.capture.put(1)
return status
if self.scan_info.msg.scan_name.startswith("xas"):
# TODO implement logic for 'xas' scans
return None
else:
status_hdf = CompareStatus(self.hdf.capture, ACQUIREMODE.ACQUIRING.value)
status_cam = CompareStatus(self.cam.acquire, ACQUIREMODE.ACQUIRING.value)
status = AndStatus(status_hdf, status_cam) # , name=f"{self.name}_on_pre_scan")
self.cam.acquire.put(1)
self.hdf.capture.put(1)
return status
def on_trigger(self) -> DeviceStatus:
def on_trigger(self) -> DeviceStatus | None:
"""Called when the device is triggered."""
# TODO should we fetch the image counter value, or rather use our own count
# TODO check logic for xas scans!
if self.scan_info.msg.scan_type == "step":
if self.scan_info.msg.scan_name.startswith("xas"):
return None
# TODO implement logic for 'xas' scans
else:
start_time = time.time()
logger.warning(f"Triggering image with num_captured {self.hdf.num_captured.get()}")
img_counter = self.hdf.num_captured.get()
status = CompareStatus(self.hdf.num_captured, img_counter + 1)
logger.warning(f"Triggering took image {time.time() - start_time:.3f} seconds")
self.trigger_shot.put(1)
self.cancel_on_stop(status)
return status
def on_complete(self) -> DeviceStatus:
def _complete_callback(self, status: DeviceStatus):
"""Callback for when the device completes a scan."""
if status.success:
status.device.file_event.put(
file_path=status.device._full_path, done=True, successful=True
)
else:
status.device.file_event.put(
file_path=status.device._full_path, done=True, successful=False
)
def on_complete(self) -> DeviceStatus | None:
"""Called to inquire if a device has completed a scans."""
status_hdf = CompareStatus(self.hdf.capture, ACQUIREMODE.DONE.value)
status_cam = CompareStatus(self.cam.acquire, ACQUIREMODE.DONE.value)
num_images = self.scan_info.msg.num_points * self.scan_info.msg.scan_parameters.get(
"frames_per_trigger", 1
)
status_img_written = CompareStatus(self.hdf.num_captured, num_images)
# TODO change to new ANDSTATUS
status_cam = AndStatus(status_hdf, status_cam)
status = AndStatus(status_cam, status_img_written) # , name=f"{self.name}_on_complete")
return status
if self.scan_info.msg.scan_name.startswith("xas"):
# TODO implement logic for 'xas' scans
return None
else:
status_hdf = CompareStatus(self.hdf.capture, ACQUIREMODE.DONE.value)
status_cam = CompareStatus(self.cam.acquire, ACQUIREMODE.DONE.value)
num_images = self.scan_info.msg.num_points * self.scan_info.msg.scan_parameters.get(
"frames_per_trigger", 1
)
status_img_written = CompareStatus(self.hdf.num_captured, num_images)
# TODO change to new ANDSTATUS
status_cam = AndStatus(status_hdf, status_cam)
status = AndStatus(status_cam, status_img_written) # , name=f"{self.name}_on_complete")
status.add_callback(self._complete_callback) # Callback that writing was successful
return status
def on_kickoff(self) -> None:
"""Called to kickoff a device for a fly scan. Has to be called explicitly."""
@@ -274,6 +330,7 @@ class Pilatus(PSIDeviceBase, ADBase):
def on_destroy(self) -> None:
"""Called when the device is destroyed. Cleanup resources here."""
self._poll_thread_stop_event.set()
if __name__ == "__main__":
@@ -299,6 +356,12 @@ if __name__ == "__main__":
# time.sleep(1)
logger.info(f"Triggering image {ii+1}/{pilatus.scan_info.msg.num_points}")
pilatus.on_trigger().wait()
p = pilatus.preview.get()
if p is not None:
p: DevicePreviewMessage
logger.warning(
f"Preview shape: {p.data.shape}, max: {np.max(p.data)}, min: {np.min(p.data)}"
)
pilatus.on_complete().wait(timeout=5)
logger.info(f"Complete done")
pilatus.on_unstage()