cleanupt: pilatus

This commit is contained in:
2025-09-16 17:33:40 +02:00
parent 9b38139967
commit 0bd29120fb

View File

@@ -17,16 +17,12 @@ from ophyd.areadetector.cam import ADBase, PilatusDetectorCam
from ophyd.areadetector.plugins import HDF5Plugin_V22 as HDF5Plugin
from ophyd.areadetector.plugins import ImagePlugin_V22 as ImagePlugin
from ophyd.status import WaitTimeoutError
from ophyd_devices import (
AndStatusWithList,
CompareStatus,
DeviceStatus,
FileEventSignal,
PreviewSignal,
)
from ophyd_devices import CompareStatus, DeviceStatus, FileEventSignal, PreviewSignal
from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase
from pydantic import BaseModel, Field
from debye_bec.devices.pilatus.utils import AndStatusWithList
if TYPE_CHECKING: # pragma: no cover
from bec_lib.devicemanager import ScanInfo
from bec_lib.messages import DevicePreviewMessage, ScanStatusMessage
@@ -83,19 +79,20 @@ class TRIGGERMODE(int, enum.Enum):
MULT_TRIGGER = 3
ALIGNMENT = 4
class MONOTRIGGERSOURCE(int, enum.Enum):
""""Mono XRD trigger source"""
""" "Mono XRD trigger source"""
EPICS = 0
INPOS = 1
class MONOTRIGGERMODE(int, enum.Enum):
""""Mono XRD trigger mode"""
""" "Mono XRD trigger mode"""
PULSE = 0
CONDITION = 1
def description(self) -> str:
"""Return a description of the trigger mode."""
descriptions = {
@@ -109,7 +106,6 @@ class MONOTRIGGERMODE(int, enum.Enum):
def __str__(self):
return self.description()
class ScanParameter(BaseModel):
@@ -217,12 +213,12 @@ class Pilatus(PSIDeviceBase, ADBase):
@property
def baseline_signals(self):
"""Define baseline signals"""
return[
return [
self.cam.acquire_time,
self.cam.num_exposures,
self.cam.threshold_energy,
self.cam.gain_menu,
self.cam.pixel_cut_off
self.cam.pixel_cut_off,
]
def __init__(
@@ -246,10 +242,7 @@ class Pilatus(PSIDeviceBase, ADBase):
)
self._poll_thread_kill_event = threading.Event()
self._poll_rate = 1 # Poll rate in Hz
self.xas_xrd_scan_names = [
"xas_simple_scan_with_xrd",
"xas_advanced_scan_with_xrd",
]
self.xas_xrd_scan_names = ["xas_simple_scan_with_xrd", "xas_advanced_scan_with_xrd"]
self.n_images = None
# self._live_mode_thread = threading.Thread(
# target=self._live_mode_loop, daemon=True, name=f"{self.name}_live_mode_thread"
@@ -377,6 +370,69 @@ class Pilatus(PSIDeviceBase, ADBase):
)
return status
def _calculate_trigger(self, scan_msg: ScanStatusMessage):
self._update_scan_parameter()
total_osc = 0
total_trig_lo = 0
total_trig_hi = 0
calc_duration = 0
n_trig_lo = 1
n_trig_hi = 1
init_lo = 1
init_hi = 1
lo_done = 0
hi_done = 0
if not self.scan_parameter.break_enable_low:
lo_done = 1
if not self.scan_parameter.break_enable_high:
hi_done = 1
start_time = time.time()
while True:
# TODO, we should not use infinite loops, for now let's add the escape Timeout of 20s, but should eventually be reviewed.
if time.time() - start_time > 20:
raise RuntimeError(
f"Calculating the number of triggers for scan {scan_msg.scan_name} took more than 20 seconds, aborting."
)
total_osc = total_osc + 2
calc_duration = calc_duration + 2 * self.scan_parameter.scan_time
if self.scan_parameter.break_enable_low and n_trig_lo >= self.scan_parameter.cycle_low:
n_trig_lo = 1
calc_duration = calc_duration + self.scan_parameter.break_time_low
if init_lo:
lo_done = 1
init_lo = 0
else:
n_trig_lo += 1
if (
self.scan_parameter.break_enable_high
and n_trig_hi >= self.scan_parameter.cycle_high
):
n_trig_hi = 1
calc_duration = calc_duration + self.scan_parameter.break_time_high
if init_hi:
hi_done = 1
init_hi = 0
else:
n_trig_hi += 1
if lo_done and hi_done:
n = np.floor(self.scan_parameter.scan_duration / calc_duration)
total_osc = total_osc * n
if self.scan_parameter.break_enable_low:
total_trig_lo = n + 1
if self.scan_parameter.break_enable_high:
total_trig_hi = n + 1
calc_duration = calc_duration * n
lo_done = 0
hi_done = 0
if calc_duration >= self.scan_parameter.scan_duration:
break
return total_trig_lo + total_trig_hi
########################################
# Beamline Specific Implementations #
########################################
@@ -427,84 +483,42 @@ class Pilatus(PSIDeviceBase, ADBase):
(self.scan_info.msg) object.
"""
# self.stop_live_mode() # Make sure that live mode is stopped if scan runs
self._update_scan_parameter()
scan_msg: ScanStatusMessage = self.scan_info.msg
if scan_msg.scan_name in self.xas_xrd_scan_names:
total_osc = 0
total_trig_lo = 0
total_trig_hi = 0
calc_duration = 0
n_trig_lo = 1
n_trig_hi = 1
init_lo = 1
init_hi = 1
lo_done = 0
hi_done = 0
if not self.scan_parameter.break_enable_low:
lo_done = 1
if not self.scan_parameter.break_enable_high:
hi_done = 1
while True:
total_osc = total_osc + 2
calc_duration = calc_duration + 2 * self.scan_parameter.scan_time
if self.scan_parameter.break_enable_low and n_trig_lo >= self.scan_parameter.cycle_low:
n_trig_lo = 1
calc_duration = calc_duration + self.scan_parameter.break_time_low
if init_lo:
lo_done = 1
init_lo = 0
else:
n_trig_lo += 1
if self.scan_parameter.break_enable_high and n_trig_hi >= self.scan_parameter.cycle_high:
n_trig_hi = 1
calc_duration = calc_duration + self.scan_parameter.break_time_high
if init_hi:
hi_done = 1
init_hi = 0
else:
n_trig_hi += 1
if lo_done and hi_done:
n = np.floor(self.scan_parameter.scan_duration / calc_duration)
total_osc = total_osc * n
if self.scan_parameter.break_enable_low:
total_trig_lo = n + 1
if self.scan_parameter.break_enable_high:
total_trig_hi = n + 1
calc_duration = calc_duration * n
lo_done = 0
hi_done = 0
if calc_duration >= self.scan_parameter.scan_duration:
break
# logger.info(f'total_osc: {total_osc}')
# logger.info(f'total trig low: {total_trig_lo}')
# logger.info(f'total trig high: {total_trig_hi}')
self._update_scan_parameter()
# Compute number of triggers
total_trig_lo, total_trig_hi = self._calculate_trigger(scan_msg)
# Set the number of images, we may also set this to a higher values if preferred and stop the acquisition
self.n_images = (total_trig_lo + total_trig_hi) * self.scan_parameter.n_of_trigger
exp_time = self.scan_parameter.exp_time
self.trigger_source.set(MONOTRIGGERSOURCE.INPOS).wait(5)
self.trigger_n_of.set(self.scan_parameter.n_of_trigger).wait(5)
elif scan_msg.scan_type == 'step':
self.n_images = scan_msg.num_points * scan_msg.scan_parameters.get("frames_per_trigger", 1)
elif scan_msg.scan_type == "step":
self.n_images = scan_msg.num_points * scan_msg.scan_parameters.get(
"frames_per_trigger", 1
)
exp_time = scan_msg.scan_parameters.get("exp_time")
self.trigger_source.set(MONOTRIGGERSOURCE.EPICS).wait(5)
self.trigger_n_of.set(1).wait(5) # BEC will trigger each acquisition
self.trigger_n_of.set(1).wait(5) # BEC will trigger each acquisition
else:
# TODO how to deal with fly scans?
return None
# Common settings
self.trigger_mode.set(MONOTRIGGERMODE.PULSE).wait(5)
self.trigger_period.set(exp_time).wait(5)
self.trigger_pulse_length.set(0.005).wait(5) # Pulse length of 5 ms enough for Pilatus and NIDAQ
self.trigger_pulse_length.set(0.005).wait(
5
) # Pulse length of 5 ms enough for Pilatus and NIDAQ
if exp_time - self._readout_time <= 0:
raise ValueError((f"Exposure time {exp_time} is too short ",
f"for Pilatus with readout_time {self._readout_time}."
))
raise ValueError(
(
f"Exposure time {exp_time} is too short ",
f"for Pilatus with readout_time {self._readout_time}.",
)
)
detector_exp_time = exp_time - self._readout_time
self._full_path = get_full_path(scan_msg, name="pilatus")
file_path = "/".join(self._full_path.split("/")[:-1])
@@ -519,7 +533,9 @@ class Pilatus(PSIDeviceBase, ADBase):
self.cam.acquire_period.set(exp_time).wait(5)
self.filter_number.set(0).wait(5)
# HDF5 settings
logger.debug(f"Setting HDF5 file path to {file_path} and file name to {file_name}. full_path is {self._full_path}")
logger.debug(
f"Setting HDF5 file path to {file_path} and file name to {file_name}. full_path is {self._full_path}"
)
self.hdf.file_path.set(file_path).wait(5)
self.hdf.file_name.set(file_name).wait(5)
self.hdf.num_capture.set(self.n_images).wait(5)
@@ -537,7 +553,9 @@ class Pilatus(PSIDeviceBase, ADBase):
def on_pre_scan(self) -> DeviceStatus | None:
"""Called right before the scan starts on all devices automatically."""
scan_msg: ScanStatusMessage = self.scan_info.msg
if scan_msg.scan_name in self.xas_xrd_scan_names or scan_msg.scan_type == 'step':
if (
scan_msg.scan_name in self.xas_xrd_scan_names or scan_msg.scan_type == "step"
): # TODO how to deal with fly scans?
status_hdf = CompareStatus(self.hdf.capture, ACQUIREMODE.ACQUIRING.value)
status_cam = CompareStatus(self.cam.acquire, ACQUIREMODE.ACQUIRING.value)
status_cam_server = CompareStatus(self.cam.armed, DETECTORSTATE.ARMED.value)
@@ -554,24 +572,23 @@ class Pilatus(PSIDeviceBase, ADBase):
def on_trigger(self) -> DeviceStatus | None:
"""Called when the device is triggered."""
scan_msg: ScanStatusMessage = self.scan_info.msg
if scan_msg.scan_name in self.xas_xrd_scan_names:
return None
elif scan_msg.scan_type == 'step':
start_time = time.time()
logger.warning(f"Triggering image with num_captured {self.hdf.num_captured.get()}")
img_counter = self.hdf.num_captured.get()
status = CompareStatus(self.hdf.num_captured, img_counter + 1)
logger.warning(f"Triggering took image {time.time() - start_time:.3f} seconds")
self.trigger_shot.put(1)
self.cancel_on_stop(status)
return status
else:
if not scan_msg.scan_type == "step":
return None
start_time = time.time()
img_counter = self.hdf.num_captured.get()
logger.debug(f"Triggering image with num_captured {img_counter}")
status = CompareStatus(self.hdf.num_captured, img_counter + 1)
logger.debug(f"Triggering took image {time.time() - start_time:.3f} seconds")
self.trigger_shot.put(1)
self.cancel_on_stop(status)
return status
def _complete_callback(self, status: DeviceStatus):
"""Callback for when the device completes a scan."""
scan_msg: ScanStatusMessage = self.scan_info.msg
if scan_msg.scan_name in self.xas_xrd_scan_names or scan_msg.scan_type == 'step':
if (
scan_msg.scan_name in self.xas_xrd_scan_names or scan_msg.scan_type == "step"
): # TODO how to deal with fly scans?
if status.success:
status.device.file_event.put(
file_path=status.device._full_path, # pylint: disable:protected-access
@@ -592,19 +609,24 @@ class Pilatus(PSIDeviceBase, ADBase):
def on_complete(self) -> DeviceStatus | None:
"""Called to inquire if a device has completed a scans."""
scan_msg: ScanStatusMessage = self.scan_info.msg
if scan_msg.scan_name in self.xas_xrd_scan_names or scan_msg.scan_type == 'step':
if (
scan_msg.scan_name in self.xas_xrd_scan_names or scan_msg.scan_type == "step"
): # TODO how to deal with fly scans?
status_hdf = CompareStatus(self.hdf.capture, ACQUIREMODE.DONE.value)
status_cam = CompareStatus(self.cam.acquire, ACQUIREMODE.DONE.value)
status_cam_server = CompareStatus(self.cam.armed, DETECTORSTATE.UNARMED.value)
if self.scan_info.msg.scan_name in self.xas_xrd_scan_names:
# For long scans, it can be that the mono will execute one cycle more,
# meaning a few more XRD triggers will be sent
status_img_written = CompareStatus(self.hdf.num_captured, self.n_images, operation='>=')
status_img_written = CompareStatus(
self.hdf.num_captured, self.n_images, operation=">="
)
else:
status_img_written = CompareStatus(self.hdf.num_captured, self.n_images)
status_img_written = CompareStatus(self.hdf.num_captured, self.n_images)
status = AndStatusWithList(
device=self, status_list=[status_hdf, status_cam, status_img_written, status_cam_server]
device=self,
status_list=[status_hdf, status_cam, status_img_written, status_cam_server],
)
status.add_callback(self._complete_callback) # Callback that writing was successful
self.cancel_on_stop(status)