diff --git a/debye_bec/devices/pilatus/pilatus.py b/debye_bec/devices/pilatus/pilatus.py index b62ca35..e12aac1 100644 --- a/debye_bec/devices/pilatus/pilatus.py +++ b/debye_bec/devices/pilatus/pilatus.py @@ -17,16 +17,12 @@ from ophyd.areadetector.cam import ADBase, PilatusDetectorCam from ophyd.areadetector.plugins import HDF5Plugin_V22 as HDF5Plugin from ophyd.areadetector.plugins import ImagePlugin_V22 as ImagePlugin from ophyd.status import WaitTimeoutError -from ophyd_devices import ( - AndStatusWithList, - CompareStatus, - DeviceStatus, - FileEventSignal, - PreviewSignal, -) +from ophyd_devices import CompareStatus, DeviceStatus, FileEventSignal, PreviewSignal from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase from pydantic import BaseModel, Field +from debye_bec.devices.pilatus.utils import AndStatusWithList + if TYPE_CHECKING: # pragma: no cover from bec_lib.devicemanager import ScanInfo from bec_lib.messages import DevicePreviewMessage, ScanStatusMessage @@ -83,19 +79,20 @@ class TRIGGERMODE(int, enum.Enum): MULT_TRIGGER = 3 ALIGNMENT = 4 + class MONOTRIGGERSOURCE(int, enum.Enum): - """"Mono XRD trigger source""" + """ "Mono XRD trigger source""" EPICS = 0 INPOS = 1 + class MONOTRIGGERMODE(int, enum.Enum): - """"Mono XRD trigger mode""" + """ "Mono XRD trigger mode""" PULSE = 0 CONDITION = 1 - def description(self) -> str: """Return a description of the trigger mode.""" descriptions = { @@ -109,7 +106,6 @@ class MONOTRIGGERMODE(int, enum.Enum): def __str__(self): return self.description() - class ScanParameter(BaseModel): @@ -217,12 +213,12 @@ class Pilatus(PSIDeviceBase, ADBase): @property def baseline_signals(self): """Define baseline signals""" - return[ + return [ self.cam.acquire_time, self.cam.num_exposures, self.cam.threshold_energy, self.cam.gain_menu, - self.cam.pixel_cut_off + self.cam.pixel_cut_off, ] def __init__( @@ -246,10 +242,7 @@ class Pilatus(PSIDeviceBase, ADBase): ) self._poll_thread_kill_event = threading.Event() self._poll_rate = 1 # Poll rate in Hz - self.xas_xrd_scan_names = [ - "xas_simple_scan_with_xrd", - "xas_advanced_scan_with_xrd", - ] + self.xas_xrd_scan_names = ["xas_simple_scan_with_xrd", "xas_advanced_scan_with_xrd"] self.n_images = None # self._live_mode_thread = threading.Thread( # target=self._live_mode_loop, daemon=True, name=f"{self.name}_live_mode_thread" @@ -377,6 +370,69 @@ class Pilatus(PSIDeviceBase, ADBase): ) return status + def _calculate_trigger(self, scan_msg: ScanStatusMessage): + self._update_scan_parameter() + total_osc = 0 + total_trig_lo = 0 + total_trig_hi = 0 + calc_duration = 0 + n_trig_lo = 1 + n_trig_hi = 1 + init_lo = 1 + init_hi = 1 + lo_done = 0 + hi_done = 0 + if not self.scan_parameter.break_enable_low: + lo_done = 1 + if not self.scan_parameter.break_enable_high: + hi_done = 1 + start_time = time.time() + while True: + # TODO, we should not use infinite loops, for now let's add the escape Timeout of 20s, but should eventually be reviewed. + if time.time() - start_time > 20: + raise RuntimeError( + f"Calculating the number of triggers for scan {scan_msg.scan_name} took more than 20 seconds, aborting." + ) + total_osc = total_osc + 2 + calc_duration = calc_duration + 2 * self.scan_parameter.scan_time + + if self.scan_parameter.break_enable_low and n_trig_lo >= self.scan_parameter.cycle_low: + n_trig_lo = 1 + calc_duration = calc_duration + self.scan_parameter.break_time_low + if init_lo: + lo_done = 1 + init_lo = 0 + else: + n_trig_lo += 1 + + if ( + self.scan_parameter.break_enable_high + and n_trig_hi >= self.scan_parameter.cycle_high + ): + n_trig_hi = 1 + calc_duration = calc_duration + self.scan_parameter.break_time_high + if init_hi: + hi_done = 1 + init_hi = 0 + else: + n_trig_hi += 1 + + if lo_done and hi_done: + n = np.floor(self.scan_parameter.scan_duration / calc_duration) + total_osc = total_osc * n + if self.scan_parameter.break_enable_low: + total_trig_lo = n + 1 + if self.scan_parameter.break_enable_high: + total_trig_hi = n + 1 + calc_duration = calc_duration * n + lo_done = 0 + hi_done = 0 + + if calc_duration >= self.scan_parameter.scan_duration: + break + + return total_trig_lo + total_trig_hi + ######################################## # Beamline Specific Implementations # ######################################## @@ -427,84 +483,45 @@ class Pilatus(PSIDeviceBase, ADBase): (self.scan_info.msg) object. """ # self.stop_live_mode() # Make sure that live mode is stopped if scan runs - self._update_scan_parameter() + scan_msg: ScanStatusMessage = self.scan_info.msg if scan_msg.scan_name in self.xas_xrd_scan_names: - total_osc = 0 - total_trig_lo = 0 - total_trig_hi = 0 - calc_duration = 0 - n_trig_lo = 1 - n_trig_hi = 1 - init_lo = 1 - init_hi = 1 - lo_done = 0 - hi_done = 0 - if not self.scan_parameter.break_enable_low: - lo_done = 1 - if not self.scan_parameter.break_enable_high: - hi_done = 1 - while True: - total_osc = total_osc + 2 - calc_duration = calc_duration + 2 * self.scan_parameter.scan_time - - if self.scan_parameter.break_enable_low and n_trig_lo >= self.scan_parameter.cycle_low: - n_trig_lo = 1 - calc_duration = calc_duration + self.scan_parameter.break_time_low - if init_lo: - lo_done = 1 - init_lo = 0 - else: - n_trig_lo += 1 - - if self.scan_parameter.break_enable_high and n_trig_hi >= self.scan_parameter.cycle_high: - n_trig_hi = 1 - calc_duration = calc_duration + self.scan_parameter.break_time_high - if init_hi: - hi_done = 1 - init_hi = 0 - else: - n_trig_hi += 1 - - if lo_done and hi_done: - n = np.floor(self.scan_parameter.scan_duration / calc_duration) - total_osc = total_osc * n - if self.scan_parameter.break_enable_low: - total_trig_lo = n + 1 - if self.scan_parameter.break_enable_high: - total_trig_hi = n + 1 - calc_duration = calc_duration * n - lo_done = 0 - hi_done = 0 - - if calc_duration >= self.scan_parameter.scan_duration: - break - - # logger.info(f'total_osc: {total_osc}') - # logger.info(f'total trig low: {total_trig_lo}') - # logger.info(f'total trig high: {total_trig_hi}') - + self._update_scan_parameter() + # Compute number of triggers + total_trig_lo, total_trig_hi = self._calculate_trigger(scan_msg) + # Set the number of images, we may also set this to a higher values if preferred and stop the acquisition + # TODO This logic is prone to errors, as we rely on the scans to nicely resolve to n_images. We should + # use here instead a way of settings the n_images independently of the scan parameters to avoid running out of sync + # with the complete method. Ideally we comput them in the scan itself.. This is much safer IMO! self.n_images = (total_trig_lo + total_trig_hi) * self.scan_parameter.n_of_trigger exp_time = self.scan_parameter.exp_time self.trigger_source.set(MONOTRIGGERSOURCE.INPOS).wait(5) self.trigger_n_of.set(self.scan_parameter.n_of_trigger).wait(5) - elif scan_msg.scan_type == 'step': - self.n_images = scan_msg.num_points * scan_msg.scan_parameters.get("frames_per_trigger", 1) + elif scan_msg.scan_type == "step": + self.n_images = scan_msg.num_points * scan_msg.scan_parameters.get( + "frames_per_trigger", 1 + ) exp_time = scan_msg.scan_parameters.get("exp_time") self.trigger_source.set(MONOTRIGGERSOURCE.EPICS).wait(5) - self.trigger_n_of.set(1).wait(5) # BEC will trigger each acquisition + self.trigger_n_of.set(1).wait(5) # BEC will trigger each acquisition else: + # TODO how to deal with fly scans? return None # Common settings self.trigger_mode.set(MONOTRIGGERMODE.PULSE).wait(5) self.trigger_period.set(exp_time).wait(5) - self.trigger_pulse_length.set(0.005).wait(5) # Pulse length of 5 ms enough for Pilatus and NIDAQ + self.trigger_pulse_length.set(0.005).wait( + 5 + ) # Pulse length of 5 ms enough for Pilatus and NIDAQ if exp_time - self._readout_time <= 0: - raise ValueError((f"Exposure time {exp_time} is too short ", - f"for Pilatus with readout_time {self._readout_time}." - )) + raise ValueError( + ( + f"Exposure time {exp_time} is too short ", + f"for Pilatus with readout_time {self._readout_time}.", + ) + ) detector_exp_time = exp_time - self._readout_time self._full_path = get_full_path(scan_msg, name="pilatus") file_path = "/".join(self._full_path.split("/")[:-1]) @@ -519,7 +536,9 @@ class Pilatus(PSIDeviceBase, ADBase): self.cam.acquire_period.set(exp_time).wait(5) self.filter_number.set(0).wait(5) # HDF5 settings - logger.debug(f"Setting HDF5 file path to {file_path} and file name to {file_name}. full_path is {self._full_path}") + logger.debug( + f"Setting HDF5 file path to {file_path} and file name to {file_name}. full_path is {self._full_path}" + ) self.hdf.file_path.set(file_path).wait(5) self.hdf.file_name.set(file_name).wait(5) self.hdf.num_capture.set(self.n_images).wait(5) @@ -537,7 +556,9 @@ class Pilatus(PSIDeviceBase, ADBase): def on_pre_scan(self) -> DeviceStatus | None: """Called right before the scan starts on all devices automatically.""" scan_msg: ScanStatusMessage = self.scan_info.msg - if scan_msg.scan_name in self.xas_xrd_scan_names or scan_msg.scan_type == 'step': + if ( + scan_msg.scan_name in self.xas_xrd_scan_names or scan_msg.scan_type == "step" + ): # TODO how to deal with fly scans? status_hdf = CompareStatus(self.hdf.capture, ACQUIREMODE.ACQUIRING.value) status_cam = CompareStatus(self.cam.acquire, ACQUIREMODE.ACQUIRING.value) status_cam_server = CompareStatus(self.cam.armed, DETECTORSTATE.ARMED.value) @@ -554,24 +575,23 @@ class Pilatus(PSIDeviceBase, ADBase): def on_trigger(self) -> DeviceStatus | None: """Called when the device is triggered.""" scan_msg: ScanStatusMessage = self.scan_info.msg - if scan_msg.scan_name in self.xas_xrd_scan_names: - return None - elif scan_msg.scan_type == 'step': - start_time = time.time() - logger.warning(f"Triggering image with num_captured {self.hdf.num_captured.get()}") - img_counter = self.hdf.num_captured.get() - status = CompareStatus(self.hdf.num_captured, img_counter + 1) - logger.warning(f"Triggering took image {time.time() - start_time:.3f} seconds") - self.trigger_shot.put(1) - self.cancel_on_stop(status) - return status - else: + if not scan_msg.scan_type == "step": return None + start_time = time.time() + img_counter = self.hdf.num_captured.get() + logger.debug(f"Triggering image with num_captured {img_counter}") + status = CompareStatus(self.hdf.num_captured, img_counter + 1) + logger.debug(f"Triggering took image {time.time() - start_time:.3f} seconds") + self.trigger_shot.put(1) + self.cancel_on_stop(status) + return status def _complete_callback(self, status: DeviceStatus): """Callback for when the device completes a scan.""" scan_msg: ScanStatusMessage = self.scan_info.msg - if scan_msg.scan_name in self.xas_xrd_scan_names or scan_msg.scan_type == 'step': + if ( + scan_msg.scan_name in self.xas_xrd_scan_names or scan_msg.scan_type == "step" + ): # TODO how to deal with fly scans? if status.success: status.device.file_event.put( file_path=status.device._full_path, # pylint: disable:protected-access @@ -592,19 +612,24 @@ class Pilatus(PSIDeviceBase, ADBase): def on_complete(self) -> DeviceStatus | None: """Called to inquire if a device has completed a scans.""" scan_msg: ScanStatusMessage = self.scan_info.msg - if scan_msg.scan_name in self.xas_xrd_scan_names or scan_msg.scan_type == 'step': + if ( + scan_msg.scan_name in self.xas_xrd_scan_names or scan_msg.scan_type == "step" + ): # TODO how to deal with fly scans? status_hdf = CompareStatus(self.hdf.capture, ACQUIREMODE.DONE.value) status_cam = CompareStatus(self.cam.acquire, ACQUIREMODE.DONE.value) status_cam_server = CompareStatus(self.cam.armed, DETECTORSTATE.UNARMED.value) if self.scan_info.msg.scan_name in self.xas_xrd_scan_names: # For long scans, it can be that the mono will execute one cycle more, # meaning a few more XRD triggers will be sent - status_img_written = CompareStatus(self.hdf.num_captured, self.n_images, operation='>=') + status_img_written = CompareStatus( + self.hdf.num_captured, self.n_images, operation=">=" + ) else: status_img_written = CompareStatus(self.hdf.num_captured, self.n_images) status_img_written = CompareStatus(self.hdf.num_captured, self.n_images) status = AndStatusWithList( - device=self, status_list=[status_hdf, status_cam, status_img_written, status_cam_server] + device=self, + status_list=[status_hdf, status_cam, status_img_written, status_cam_server], ) status.add_callback(self._complete_callback) # Callback that writing was successful self.cancel_on_stop(status) diff --git a/debye_bec/devices/pilatus/utils.py b/debye_bec/devices/pilatus/utils.py new file mode 100644 index 0000000..d3f3de3 --- /dev/null +++ b/debye_bec/devices/pilatus/utils.py @@ -0,0 +1,238 @@ +"""Temporary utility module for Status Object implementations.""" + +from __future__ import annotations + +from typing import TYPE_CHECKING + +from ophyd import Device, DeviceStatus, StatusBase + + +class AndStatusWithList(DeviceStatus): + """ + Custom implementation of the AndStatus that combines the + option to add multiple statuses as a list, and in addition + allows for adding the Device as an object to access its + methods. + + Args""" + + def __init__( + self, + device: Device, + status_list: StatusBase | DeviceStatus | list[StatusBase | DeviceStatus], + **kwargs, + ): + self.all_statuses = status_list if isinstance(status_list, list) else [status_list] + super().__init__(device=device, **kwargs) + self._trace_attributes["all"] = [st._trace_attributes for st in self.all_statuses] + + def inner(status): + with self._lock: + if self._externally_initiated_completion: + return + if self.done: # Return if status is already done.. It must be resolved already + return + + for st in self.all_statuses: + with st._lock: + if st.done and not st.success: + self.set_exception(st.exception()) # st._exception + return + + if all(st.done for st in self.all_statuses) and all( + st.success for st in self.all_statuses + ): + self.set_finished() + + for st in self.all_statuses: + with st._lock: + st.add_callback(inner) + + # TODO improve __repr__ and __str__ + def __repr__(self): + return "".format(self=self) + + def __str__(self): + return "".format(self=self) + + def __contains__(self, status: StatusBase | DeviceStatus) -> bool: + for child in self.all_statuses: + if child == status: + return True + if isinstance(child, AndStatusWithList): + if status in child: + return True + + return False + + # TODO Check if this actually works.... + def set_exception(self, exc): + super().set_exception(exc) + # Propagate the exception to all sub-statuses that are not done yet. + with self._lock: + for st in self.all_statuses: + with st._lock: + if not st.done: + st.set_exception(exc) + + def _run_callbacks(self): + """ + Set the Event and run the callbacks. + """ + if self.timeout is None: + timeout = None + else: + timeout = self.timeout + self.settle_time + if not self._settled_event.wait(timeout): + self.log.warning("%r has timed out", self) + with self._externally_initiated_completion_lock: + if self._exception is None: + exc = TimeoutError( + f"AndStatus from device {self.device.name} failed to complete in specified timeout of {self.timeout + self.settle_time}." + ) + self._exception = exc + # Mark this as "settled". + try: + self._settled() + except Exception: + self.log.exception("%r encountered error during _settled()", self) + with self._lock: + self._event.set() + if self._exception is not None: + try: + self._handle_failure() + except Exception: + self.log.exception("%r encountered an error during _handle_failure()", self) + for cb in self._callbacks: + try: + cb(self) + except Exception: + self.log.exception( + "An error was raised on a background thread while " + "running the callback %r(%r).", + cb, + self, + ) + self._callbacks.clear() + + +class AndStatus(StatusBase): + """Custom AndStatus for TimePix detector.""" + + def __init__( + self, + left: StatusBase | DeviceStatus | list[StatusBase | DeviceStatus] | None, + name: str | Device | None = None, + right: StatusBase | DeviceStatus | list[StatusBase | DeviceStatus] | None = None, + **kwargs, + ): + self.left = left if isinstance(left, list) else [left] + if right is not None: + self.right = right if isinstance(right, list) else [right] + else: + self.right = [] + self.all_statuses = self.left + self.right + if name is None: + name = "unname_status" + elif isinstance(name, Device): + name = name.name + else: + name = name + self.name = name + super().__init__(**kwargs) + self._trace_attributes["left"] = [st._trace_attributes for st in self.left] + self._trace_attributes["right"] = [st._trace_attributes for st in self.right] + + def inner(status): + with self._lock: + if self._externally_initiated_completion: + return + if self.done: # Return if status is already done.. It must be resolved already + return + + for st in self.all_statuses: + with st._lock: + if st.done and not st.success: + self.set_exception(st.exception()) # st._exception + return + + if all(st.done for st in self.all_statuses) and all( + st.success for st in self.all_statuses + ): + self.set_finished() + + for st in self.all_statuses: + with st._lock: + st.add_callback(inner) + + def __repr__(self): + return "({self.left!r} & {self.right!r})".format(self=self) + + def __str__(self): + return "{0}(done={1.done}, " "success={1.success})" "".format(self.__class__.__name__, self) + + def __contains__(self, status: StatusBase) -> bool: + for child in [self.left, self.right]: + if child == status: + return True + if isinstance(child, AndStatus): + if status in child: + return True + + return False + + def _run_callbacks(self): + """ + Set the Event and run the callbacks. + """ + if self.timeout is None: + timeout = None + else: + timeout = self.timeout + self.settle_time + if not self._settled_event.wait(timeout): + # We have timed out. It's possible that set_finished() has already + # been called but we got here before the settle_time timer expired. + # And it's possible that in this space be between the above + # statement timing out grabbing the lock just below, + # set_exception(exc) has been called. Both of these possibilties + # are accounted for. + self.log.warning("%r has timed out", self) + with self._externally_initiated_completion_lock: + # Set the exception and mark the Status as done, unless + # set_exception(exc) was called externally before we grabbed + # the lock. + if self._exception is None: + exc = TimeoutError( + f"Status with name {self.name} failed to complete in specified timeout of {self.timeout + self.settle_time}." + ) + self._exception = exc + # Mark this as "settled". + try: + self._settled() + except Exception: + # No alternative but to log this. We can't supersede set_exception, + # and we have to continue and run the callbacks. + self.log.exception("%r encountered error during _settled()", self) + # Now we know whether or not we have succeed or failed, either by + # timeout above or by set_exception(exc), so we can set the Event that + # will mark this Status as done. + with self._lock: + self._event.set() + if self._exception is not None: + try: + self._handle_failure() + except Exception: + self.log.exception("%r encountered an error during _handle_failure()", self) + # The callbacks have access to self, from which they can distinguish + # success or failure. + for cb in self._callbacks: + try: + cb(self) + except Exception: + self.log.exception( + "An error was raised on a background thread while " + "running the callback %r(%r).", + cb, + self, + ) + self._callbacks.clear()