first draft

This commit is contained in:
2025-09-03 15:19:31 +02:00
parent 2f043c23ae
commit 22a56dfe25
3 changed files with 282 additions and 12 deletions

View File

@@ -1,12 +0,0 @@
from ophyd import Component as Cpt
from ophyd.areadetector.cam import ADBase, PilatusDetectorCam
from ophyd.areadetector.plugins import HDF5Plugin_V22 as HDF5Plugin
class PilatusDetector(ADBase):
cam = Cpt(PilatusDetectorCam, 'cam1:')
hdf = Cpt(HDF5Plugin, 'HDF1:')
def __init__(self, *, name: str, prefix: str = "", **kwargs):
super().__init__(name=name, prefix=prefix, **kwargs)
self.wait_for_connection(all_signals=True, timeout=30)

View File

View File

@@ -0,0 +1,282 @@
"""Pilatus AD integration at Debye beamline."""
from __future__ import annotations
import enum
import threading
import time
from typing import TYPE_CHECKING
from bec_lib.file_utils import get_full_path
from bec_lib.logger import bec_logger
from ophyd import Component as Cpt
from ophyd import EpicsSignal, Kind
from ophyd.areadetector.cam import ADBase, PilatusDetectorCam
from ophyd.areadetector.plugins import HDF5Plugin_V22 as HDF5Plugin
from ophyd.status import WaitTimeoutError
from ophyd_devices import AndStatus, CompareStatus, DeviceStatus, PreviewSignal, TransitionStatus
from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase
if TYPE_CHECKING: # pragma: no cover
from bec_lib.devicemanager import ScanInfo
from bec_lib.messages import ScanStatusMessage
from bec_server.device_server.device_server import DeviceManagerDS
PILATUS_READOUT_TIME = 0.1 # in s
PILATUS_ACQUIRE_TIME = (
999999 # This time is the timeout of the detector in operation mode, so it needs to be large.
)
logger = bec_logger.logger
class ACQUIREMODE(int, enum.Enum):
"""Pilatus Acquisition Modes"""
DONE = 0
ACQUIRING = 1
class FILEWRITEMODE(int, enum.Enum):
"""HDF5 Plugin FileWrite Mode"""
SINGLE = 0
CAPTURE = 1
STREAM = 2
class COMPRESSIONALGORITHM(int, enum.Enum):
"""HDF5 Plugin Compression Algorithm"""
NONE = 0
NBIT = 1 # Don't use that..
SZIP = 2
ZLIB = 3
class TRIGGERMODE(int, enum.Enum):
"""Pilatus Trigger Modes"""
INTERNAL = 0
EXT_ENABLE = 1
EXT_TRIGGER = 2
MULT_TRIGGER = 3
ALIGNMENT = 4
def description(self) -> str:
"""Return a description of the trigger mode."""
descriptions = {
TRIGGERMODE.INTERNAL: "Internal trigger mode, images are acquired on internal trigger.",
TRIGGERMODE.EXT_ENABLE: "External Enable trigger mode; check manual as details are currently unknown",
TRIGGERMODE.EXT_TRIGGER: "External Trigger mode, images are acquired on external trigger signal. All images on single trigger.",
TRIGGERMODE.MULT_TRIGGER: "Multiple External Trigger mode, images are acquired on multiple external trigger signals. One image per trigger.",
TRIGGERMODE.ALIGNMENT: "Alignment mode, used for beam alignment.",
}
return descriptions.get(self, "Unknown")
def __str__(self):
return self.description()
class Pilatus(PSIDeviceBase, ADBase):
"""
Pilatus Base integration for Debye.
Prefix of the detector is 'X01DA-ES2-PIL:'
Args:
prefix (str) : Prefix for the IOC
name (str) : Name of the detector
scan_info (ScanInfo | None) : ScanInfo object passed through the device by the device_manager
device_manager (DeviceManager | None) : DeviceManager object passed through the device by the device_manager
"""
cam = Cpt(PilatusDetectorCam, "cam1:")
hdf = Cpt(HDF5Plugin, "HDF1:")
trigger_shot = Cpt(
EpicsSignal,
"X01DA-OP-MO1:BRAGG:xrd_trig_req",
kind=Kind.omitted,
doc="Trigger PV from MO1 Bragg",
)
trigger_source = Cpt(
EpicsSignal,
"X01DA-OP-MO1:BRAGG:xrd_trig_src_ENUM",
kind=Kind.omitted,
doc="Trigger Source; PV, 0 : EPICS, 1 : INPOS",
)
trigger_mode = Cpt(
EpicsSignal,
"X01DA-OP-MO1:BRAGG:xrd_trig_mode_ENUM",
kind=Kind.omitted,
doc="Trigger Mode; 0 : PULSE, 1 : CONDITION",
)
trigger_pulse_length = Cpt(
EpicsSignal,
"X01DA-OP-MO1:BRAGG:xrd_trig_len",
kind=Kind.omitted,
doc="Trigger Pulse Length in seconds",
)
preview = Cpt(
PreviewSignal,
name="preview",
ndim=2,
num_rotation_90=0, # Check this
doc="Preview signal for the Pilatus Detector",
)
def __init__(
self,
*,
name: str,
prefix: str = "",
scan_info: ScanInfo | None = None,
device_manager: DeviceManagerDS | None = None,
**kwargs,
):
super().__init__(
name=name, prefix=prefix, scan_info=scan_info, device_manager=device_manager, **kwargs
)
self.device_manager = device_manager
self._readout_time = PILATUS_READOUT_TIME
########################################
# Beamline Specific Implementations #
########################################
def on_init(self) -> None:
"""
Called when the device is initialized.
No signals are connected at this point. If you like to
set default values on signals, please use on_connected instead.
"""
def on_connected(self) -> None:
"""
Called after the device is connected and its signals are connected.
Default values for signals should be set here.
"""
status = CompareStatus(self.cam.acquire, ACQUIREMODE.DONE.value)
try:
status.wait(timeout=5)
except WaitTimeoutError:
logger.warning(
f"Camera device {self.name} was running an acquisition. Stopping acquisition."
)
self.cam.acquire.put(0)
self.hdf.capture.put(0)
self.cam.trigger_mode.set(TRIGGERMODE.MULT_TRIGGER.value).wait(5)
self.hdf.file_write_mode.set(FILEWRITEMODE.STREAM.value).wait(5)
self.hdf.file_template.set("%s%s.h5").wait(5)
self.hdf.auto_save.set(1).wait(5)
self.hdf.compression.set(COMPRESSIONALGORITHM.NONE.value).wait(5) # To test which to use
# TODO: Start background thread that polls data
def on_stage(self) -> DeviceStatus:
"""
Called while staging the device.
Information about the upcoming scan can be accessed from the scan_info (self.scan_info.msg) object.
"""
scan_msg: ScanStatusMessage = self.scan_info.msg
exp_time = scan_msg.scan_parameters.get("exposure_time", 0.0)
if exp_time - self._readout_time <= 0:
raise ValueError(
f"Exposure time {exp_time} is too short for Pilatus with readout_time {self._readout_time}."
)
detector_exp_time = exp_time - self._readout_time
n_images = scan_msg.num_points * scan_msg.scan_parameters.get("frames_per_trigger", 1)
full_path = get_full_path(scan_msg, name="pilatus")
file_path = "/".join(full_path.split("/")[:-1])
file_name = full_path.split("/")[-1]
# TODO Check hown long this takes, make it asynchronous if slow..
start_time = time.time()
self.cam.array_callbacks.set(1).wait(5) # Enable array callbacks
self.hdf.enable.set(1).wait(5) # Enable HDF5 plugin
# Camera settings
self.cam.num_exposures.set(n_images).wait(5)
self.cam.acquire_time.set(detector_exp_time).wait(5)
self.cam.acquire_period.set(PILATUS_ACQUIRE_TIME).wait(5)
# HDF5 settings
logger.debug(f"Setting HDF5 file path to {file_path} and file name to {file_name}")
self.hdf.file_path.set(file_path).wait(5)
self.hdf.file_name.set(file_name).wait(5)
self.hdf.num_capture.set(n_images).wait(5)
self.cam.array_counter.set(0).wait(5) # Reset array counter
logger.warning(
f"Finished setting up detector {self.name} after {time.time() - start_time:.2f} seconds."
)
def on_unstage(self) -> None:
"""Called while unstaging the device."""
def on_pre_scan(self) -> DeviceStatus:
"""Called right before the scan starts on all devices automatically."""
status_hdf = TransitionStatus(self.hdf.capture, ACQUIREMODE.ACQUIRING.value)
status_cam = TransitionStatus(self.cam.acquire, ACQUIREMODE.ACQUIRING.value)
status = AndStatus(status_hdf, status_cam) # , name=f"{self.name}_on_pre_scan")
self.cam.acquire.put(1)
self.hdf.capture.put(1)
return status
def on_trigger(self) -> DeviceStatus:
"""Called when the device is triggered."""
# TODO should we fetch the image counter value, or rather use our own count
# TODO check logic for xas scans!
if self.scan_info.msg.scan_type == "step":
img_counter = self.cam.array_counter.get()
status = CompareStatus(self.cam.array_counter, img_counter + 1)
self.trigger_shot.put(1)
return status
def on_complete(self) -> DeviceStatus:
"""Called to inquire if a device has completed a scans."""
status_hdf = CompareStatus(self.hdf.capture, ACQUIREMODE.DONE.value)
status_cam = CompareStatus(self.cam.acquire, ACQUIREMODE.DONE.value)
num_images = self.scan_info.msg.num_points * self.scan_info.msg.scan_parameters.get(
"frames_per_trigger", 1
)
status_img_written = CompareStatus(self.hdf.num_captured, num_images)
# TODO change to new ANDSTATUS
status_cam = AndStatus(status_hdf, status_cam)
status = AndStatus(status_cam, status_img_written) # , name=f"{self.name}_on_complete")
return status
def on_kickoff(self) -> None:
"""Called to kickoff a device for a fly scan. Has to be called explicitly."""
def on_stop(self) -> None:
"""Called when the device is stopped."""
self.cam.acquire.put(0)
self.hdf.capture.put(0)
def on_destroy(self) -> None:
"""Called when the device is destroyed. Cleanup resources here."""
if __name__ == "__main__":
pilatus = Pilatus(name="pilatus", prefix="X01DA-ES2-PIL:")
pilatus.on_connected()
pilatus.wait_for_connection(all_signals=True, timeout=20)
pilatus.scan_info.msg.num_points = 10
pilatus.scan_info.msg.scan_parameters["exposure_time"] = 1.0
pilatus.scan_info.msg.scan_parameters["frames_per_trigger"] = 1
pilatus.scan_info.msg.info["file_components"] = (
"/sls/x01da/data/p22481/raw/data/S00000-00999/S00001/S00001",
"h5",
)
pilatus.on_stage()
logger.info(f"Stage done")
pilatus.on_pre_scan().wait(timeout=5)
logger.info(f"Pre-scan done")
for ii in range(pilatus.scan_info.msg.num_points):
logger.info(f"Triggering image {ii+1}/{pilatus.scan_info.msg.num_points}")
pilatus.on_trigger().wait()
pilatus.on_complete().wait(timeout=5)
logger.info(f"Complete done")
pilatus.on_unstage()
logger.info(f"Unstage done")