refactor(pilatus): Cleanupt PIlatus integration
This commit is contained in:
@@ -17,16 +17,12 @@ from ophyd.areadetector.cam import ADBase, PilatusDetectorCam
|
||||
from ophyd.areadetector.plugins import HDF5Plugin_V22 as HDF5Plugin
|
||||
from ophyd.areadetector.plugins import ImagePlugin_V22 as ImagePlugin
|
||||
from ophyd.status import WaitTimeoutError
|
||||
from ophyd_devices import (
|
||||
AndStatusWithList,
|
||||
CompareStatus,
|
||||
DeviceStatus,
|
||||
FileEventSignal,
|
||||
PreviewSignal,
|
||||
)
|
||||
from ophyd_devices import CompareStatus, DeviceStatus, FileEventSignal, PreviewSignal
|
||||
from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase
|
||||
from pydantic import BaseModel, Field
|
||||
|
||||
from debye_bec.devices.pilatus.utils import AndStatusWithList
|
||||
|
||||
if TYPE_CHECKING: # pragma: no cover
|
||||
from bec_lib.devicemanager import ScanInfo
|
||||
from bec_lib.messages import DevicePreviewMessage, ScanStatusMessage
|
||||
@@ -83,19 +79,20 @@ class TRIGGERMODE(int, enum.Enum):
|
||||
MULT_TRIGGER = 3
|
||||
ALIGNMENT = 4
|
||||
|
||||
|
||||
class MONOTRIGGERSOURCE(int, enum.Enum):
|
||||
""""Mono XRD trigger source"""
|
||||
""" "Mono XRD trigger source"""
|
||||
|
||||
EPICS = 0
|
||||
INPOS = 1
|
||||
|
||||
|
||||
class MONOTRIGGERMODE(int, enum.Enum):
|
||||
""""Mono XRD trigger mode"""
|
||||
""" "Mono XRD trigger mode"""
|
||||
|
||||
PULSE = 0
|
||||
CONDITION = 1
|
||||
|
||||
|
||||
def description(self) -> str:
|
||||
"""Return a description of the trigger mode."""
|
||||
descriptions = {
|
||||
@@ -109,7 +106,6 @@ class MONOTRIGGERMODE(int, enum.Enum):
|
||||
|
||||
def __str__(self):
|
||||
return self.description()
|
||||
|
||||
|
||||
|
||||
class ScanParameter(BaseModel):
|
||||
@@ -217,12 +213,12 @@ class Pilatus(PSIDeviceBase, ADBase):
|
||||
@property
|
||||
def baseline_signals(self):
|
||||
"""Define baseline signals"""
|
||||
return[
|
||||
return [
|
||||
self.cam.acquire_time,
|
||||
self.cam.num_exposures,
|
||||
self.cam.threshold_energy,
|
||||
self.cam.gain_menu,
|
||||
self.cam.pixel_cut_off
|
||||
self.cam.pixel_cut_off,
|
||||
]
|
||||
|
||||
def __init__(
|
||||
@@ -246,10 +242,7 @@ class Pilatus(PSIDeviceBase, ADBase):
|
||||
)
|
||||
self._poll_thread_kill_event = threading.Event()
|
||||
self._poll_rate = 1 # Poll rate in Hz
|
||||
self.xas_xrd_scan_names = [
|
||||
"xas_simple_scan_with_xrd",
|
||||
"xas_advanced_scan_with_xrd",
|
||||
]
|
||||
self.xas_xrd_scan_names = ["xas_simple_scan_with_xrd", "xas_advanced_scan_with_xrd"]
|
||||
self.n_images = None
|
||||
# self._live_mode_thread = threading.Thread(
|
||||
# target=self._live_mode_loop, daemon=True, name=f"{self.name}_live_mode_thread"
|
||||
@@ -377,6 +370,69 @@ class Pilatus(PSIDeviceBase, ADBase):
|
||||
)
|
||||
return status
|
||||
|
||||
def _calculate_trigger(self, scan_msg: ScanStatusMessage):
|
||||
self._update_scan_parameter()
|
||||
total_osc = 0
|
||||
total_trig_lo = 0
|
||||
total_trig_hi = 0
|
||||
calc_duration = 0
|
||||
n_trig_lo = 1
|
||||
n_trig_hi = 1
|
||||
init_lo = 1
|
||||
init_hi = 1
|
||||
lo_done = 0
|
||||
hi_done = 0
|
||||
if not self.scan_parameter.break_enable_low:
|
||||
lo_done = 1
|
||||
if not self.scan_parameter.break_enable_high:
|
||||
hi_done = 1
|
||||
start_time = time.time()
|
||||
while True:
|
||||
# TODO, we should not use infinite loops, for now let's add the escape Timeout of 20s, but should eventually be reviewed.
|
||||
if time.time() - start_time > 20:
|
||||
raise RuntimeError(
|
||||
f"Calculating the number of triggers for scan {scan_msg.scan_name} took more than 20 seconds, aborting."
|
||||
)
|
||||
total_osc = total_osc + 2
|
||||
calc_duration = calc_duration + 2 * self.scan_parameter.scan_time
|
||||
|
||||
if self.scan_parameter.break_enable_low and n_trig_lo >= self.scan_parameter.cycle_low:
|
||||
n_trig_lo = 1
|
||||
calc_duration = calc_duration + self.scan_parameter.break_time_low
|
||||
if init_lo:
|
||||
lo_done = 1
|
||||
init_lo = 0
|
||||
else:
|
||||
n_trig_lo += 1
|
||||
|
||||
if (
|
||||
self.scan_parameter.break_enable_high
|
||||
and n_trig_hi >= self.scan_parameter.cycle_high
|
||||
):
|
||||
n_trig_hi = 1
|
||||
calc_duration = calc_duration + self.scan_parameter.break_time_high
|
||||
if init_hi:
|
||||
hi_done = 1
|
||||
init_hi = 0
|
||||
else:
|
||||
n_trig_hi += 1
|
||||
|
||||
if lo_done and hi_done:
|
||||
n = np.floor(self.scan_parameter.scan_duration / calc_duration)
|
||||
total_osc = total_osc * n
|
||||
if self.scan_parameter.break_enable_low:
|
||||
total_trig_lo = n + 1
|
||||
if self.scan_parameter.break_enable_high:
|
||||
total_trig_hi = n + 1
|
||||
calc_duration = calc_duration * n
|
||||
lo_done = 0
|
||||
hi_done = 0
|
||||
|
||||
if calc_duration >= self.scan_parameter.scan_duration:
|
||||
break
|
||||
|
||||
return total_trig_lo + total_trig_hi
|
||||
|
||||
########################################
|
||||
# Beamline Specific Implementations #
|
||||
########################################
|
||||
@@ -427,84 +483,45 @@ class Pilatus(PSIDeviceBase, ADBase):
|
||||
(self.scan_info.msg) object.
|
||||
"""
|
||||
# self.stop_live_mode() # Make sure that live mode is stopped if scan runs
|
||||
self._update_scan_parameter()
|
||||
|
||||
scan_msg: ScanStatusMessage = self.scan_info.msg
|
||||
if scan_msg.scan_name in self.xas_xrd_scan_names:
|
||||
total_osc = 0
|
||||
total_trig_lo = 0
|
||||
total_trig_hi = 0
|
||||
calc_duration = 0
|
||||
n_trig_lo = 1
|
||||
n_trig_hi = 1
|
||||
init_lo = 1
|
||||
init_hi = 1
|
||||
lo_done = 0
|
||||
hi_done = 0
|
||||
if not self.scan_parameter.break_enable_low:
|
||||
lo_done = 1
|
||||
if not self.scan_parameter.break_enable_high:
|
||||
hi_done = 1
|
||||
while True:
|
||||
total_osc = total_osc + 2
|
||||
calc_duration = calc_duration + 2 * self.scan_parameter.scan_time
|
||||
|
||||
if self.scan_parameter.break_enable_low and n_trig_lo >= self.scan_parameter.cycle_low:
|
||||
n_trig_lo = 1
|
||||
calc_duration = calc_duration + self.scan_parameter.break_time_low
|
||||
if init_lo:
|
||||
lo_done = 1
|
||||
init_lo = 0
|
||||
else:
|
||||
n_trig_lo += 1
|
||||
|
||||
if self.scan_parameter.break_enable_high and n_trig_hi >= self.scan_parameter.cycle_high:
|
||||
n_trig_hi = 1
|
||||
calc_duration = calc_duration + self.scan_parameter.break_time_high
|
||||
if init_hi:
|
||||
hi_done = 1
|
||||
init_hi = 0
|
||||
else:
|
||||
n_trig_hi += 1
|
||||
|
||||
if lo_done and hi_done:
|
||||
n = np.floor(self.scan_parameter.scan_duration / calc_duration)
|
||||
total_osc = total_osc * n
|
||||
if self.scan_parameter.break_enable_low:
|
||||
total_trig_lo = n + 1
|
||||
if self.scan_parameter.break_enable_high:
|
||||
total_trig_hi = n + 1
|
||||
calc_duration = calc_duration * n
|
||||
lo_done = 0
|
||||
hi_done = 0
|
||||
|
||||
if calc_duration >= self.scan_parameter.scan_duration:
|
||||
break
|
||||
|
||||
# logger.info(f'total_osc: {total_osc}')
|
||||
# logger.info(f'total trig low: {total_trig_lo}')
|
||||
# logger.info(f'total trig high: {total_trig_hi}')
|
||||
|
||||
self._update_scan_parameter()
|
||||
# Compute number of triggers
|
||||
total_trig_lo, total_trig_hi = self._calculate_trigger(scan_msg)
|
||||
# Set the number of images, we may also set this to a higher values if preferred and stop the acquisition
|
||||
# TODO This logic is prone to errors, as we rely on the scans to nicely resolve to n_images. We should
|
||||
# use here instead a way of settings the n_images independently of the scan parameters to avoid running out of sync
|
||||
# with the complete method. Ideally we comput them in the scan itself.. This is much safer IMO!
|
||||
self.n_images = (total_trig_lo + total_trig_hi) * self.scan_parameter.n_of_trigger
|
||||
exp_time = self.scan_parameter.exp_time
|
||||
self.trigger_source.set(MONOTRIGGERSOURCE.INPOS).wait(5)
|
||||
self.trigger_n_of.set(self.scan_parameter.n_of_trigger).wait(5)
|
||||
|
||||
elif scan_msg.scan_type == 'step':
|
||||
self.n_images = scan_msg.num_points * scan_msg.scan_parameters.get("frames_per_trigger", 1)
|
||||
elif scan_msg.scan_type == "step":
|
||||
self.n_images = scan_msg.num_points * scan_msg.scan_parameters.get(
|
||||
"frames_per_trigger", 1
|
||||
)
|
||||
exp_time = scan_msg.scan_parameters.get("exp_time")
|
||||
self.trigger_source.set(MONOTRIGGERSOURCE.EPICS).wait(5)
|
||||
self.trigger_n_of.set(1).wait(5) # BEC will trigger each acquisition
|
||||
self.trigger_n_of.set(1).wait(5) # BEC will trigger each acquisition
|
||||
else:
|
||||
# TODO how to deal with fly scans?
|
||||
return None
|
||||
# Common settings
|
||||
self.trigger_mode.set(MONOTRIGGERMODE.PULSE).wait(5)
|
||||
self.trigger_period.set(exp_time).wait(5)
|
||||
self.trigger_pulse_length.set(0.005).wait(5) # Pulse length of 5 ms enough for Pilatus and NIDAQ
|
||||
self.trigger_pulse_length.set(0.005).wait(
|
||||
5
|
||||
) # Pulse length of 5 ms enough for Pilatus and NIDAQ
|
||||
|
||||
if exp_time - self._readout_time <= 0:
|
||||
raise ValueError((f"Exposure time {exp_time} is too short ",
|
||||
f"for Pilatus with readout_time {self._readout_time}."
|
||||
))
|
||||
raise ValueError(
|
||||
(
|
||||
f"Exposure time {exp_time} is too short ",
|
||||
f"for Pilatus with readout_time {self._readout_time}.",
|
||||
)
|
||||
)
|
||||
detector_exp_time = exp_time - self._readout_time
|
||||
self._full_path = get_full_path(scan_msg, name="pilatus")
|
||||
file_path = "/".join(self._full_path.split("/")[:-1])
|
||||
@@ -519,7 +536,9 @@ class Pilatus(PSIDeviceBase, ADBase):
|
||||
self.cam.acquire_period.set(exp_time).wait(5)
|
||||
self.filter_number.set(0).wait(5)
|
||||
# HDF5 settings
|
||||
logger.debug(f"Setting HDF5 file path to {file_path} and file name to {file_name}. full_path is {self._full_path}")
|
||||
logger.debug(
|
||||
f"Setting HDF5 file path to {file_path} and file name to {file_name}. full_path is {self._full_path}"
|
||||
)
|
||||
self.hdf.file_path.set(file_path).wait(5)
|
||||
self.hdf.file_name.set(file_name).wait(5)
|
||||
self.hdf.num_capture.set(self.n_images).wait(5)
|
||||
@@ -537,7 +556,9 @@ class Pilatus(PSIDeviceBase, ADBase):
|
||||
def on_pre_scan(self) -> DeviceStatus | None:
|
||||
"""Called right before the scan starts on all devices automatically."""
|
||||
scan_msg: ScanStatusMessage = self.scan_info.msg
|
||||
if scan_msg.scan_name in self.xas_xrd_scan_names or scan_msg.scan_type == 'step':
|
||||
if (
|
||||
scan_msg.scan_name in self.xas_xrd_scan_names or scan_msg.scan_type == "step"
|
||||
): # TODO how to deal with fly scans?
|
||||
status_hdf = CompareStatus(self.hdf.capture, ACQUIREMODE.ACQUIRING.value)
|
||||
status_cam = CompareStatus(self.cam.acquire, ACQUIREMODE.ACQUIRING.value)
|
||||
status_cam_server = CompareStatus(self.cam.armed, DETECTORSTATE.ARMED.value)
|
||||
@@ -554,24 +575,23 @@ class Pilatus(PSIDeviceBase, ADBase):
|
||||
def on_trigger(self) -> DeviceStatus | None:
|
||||
"""Called when the device is triggered."""
|
||||
scan_msg: ScanStatusMessage = self.scan_info.msg
|
||||
if scan_msg.scan_name in self.xas_xrd_scan_names:
|
||||
return None
|
||||
elif scan_msg.scan_type == 'step':
|
||||
start_time = time.time()
|
||||
logger.warning(f"Triggering image with num_captured {self.hdf.num_captured.get()}")
|
||||
img_counter = self.hdf.num_captured.get()
|
||||
status = CompareStatus(self.hdf.num_captured, img_counter + 1)
|
||||
logger.warning(f"Triggering took image {time.time() - start_time:.3f} seconds")
|
||||
self.trigger_shot.put(1)
|
||||
self.cancel_on_stop(status)
|
||||
return status
|
||||
else:
|
||||
if not scan_msg.scan_type == "step":
|
||||
return None
|
||||
start_time = time.time()
|
||||
img_counter = self.hdf.num_captured.get()
|
||||
logger.debug(f"Triggering image with num_captured {img_counter}")
|
||||
status = CompareStatus(self.hdf.num_captured, img_counter + 1)
|
||||
logger.debug(f"Triggering took image {time.time() - start_time:.3f} seconds")
|
||||
self.trigger_shot.put(1)
|
||||
self.cancel_on_stop(status)
|
||||
return status
|
||||
|
||||
def _complete_callback(self, status: DeviceStatus):
|
||||
"""Callback for when the device completes a scan."""
|
||||
scan_msg: ScanStatusMessage = self.scan_info.msg
|
||||
if scan_msg.scan_name in self.xas_xrd_scan_names or scan_msg.scan_type == 'step':
|
||||
if (
|
||||
scan_msg.scan_name in self.xas_xrd_scan_names or scan_msg.scan_type == "step"
|
||||
): # TODO how to deal with fly scans?
|
||||
if status.success:
|
||||
status.device.file_event.put(
|
||||
file_path=status.device._full_path, # pylint: disable:protected-access
|
||||
@@ -592,19 +612,24 @@ class Pilatus(PSIDeviceBase, ADBase):
|
||||
def on_complete(self) -> DeviceStatus | None:
|
||||
"""Called to inquire if a device has completed a scans."""
|
||||
scan_msg: ScanStatusMessage = self.scan_info.msg
|
||||
if scan_msg.scan_name in self.xas_xrd_scan_names or scan_msg.scan_type == 'step':
|
||||
if (
|
||||
scan_msg.scan_name in self.xas_xrd_scan_names or scan_msg.scan_type == "step"
|
||||
): # TODO how to deal with fly scans?
|
||||
status_hdf = CompareStatus(self.hdf.capture, ACQUIREMODE.DONE.value)
|
||||
status_cam = CompareStatus(self.cam.acquire, ACQUIREMODE.DONE.value)
|
||||
status_cam_server = CompareStatus(self.cam.armed, DETECTORSTATE.UNARMED.value)
|
||||
if self.scan_info.msg.scan_name in self.xas_xrd_scan_names:
|
||||
# For long scans, it can be that the mono will execute one cycle more,
|
||||
# meaning a few more XRD triggers will be sent
|
||||
status_img_written = CompareStatus(self.hdf.num_captured, self.n_images, operation='>=')
|
||||
status_img_written = CompareStatus(
|
||||
self.hdf.num_captured, self.n_images, operation=">="
|
||||
)
|
||||
else:
|
||||
status_img_written = CompareStatus(self.hdf.num_captured, self.n_images)
|
||||
status_img_written = CompareStatus(self.hdf.num_captured, self.n_images)
|
||||
status = AndStatusWithList(
|
||||
device=self, status_list=[status_hdf, status_cam, status_img_written, status_cam_server]
|
||||
device=self,
|
||||
status_list=[status_hdf, status_cam, status_img_written, status_cam_server],
|
||||
)
|
||||
status.add_callback(self._complete_callback) # Callback that writing was successful
|
||||
self.cancel_on_stop(status)
|
||||
|
||||
238
debye_bec/devices/pilatus/utils.py
Normal file
238
debye_bec/devices/pilatus/utils.py
Normal file
@@ -0,0 +1,238 @@
|
||||
"""Temporary utility module for Status Object implementations."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from typing import TYPE_CHECKING
|
||||
|
||||
from ophyd import Device, DeviceStatus, StatusBase
|
||||
|
||||
|
||||
class AndStatusWithList(DeviceStatus):
|
||||
"""
|
||||
Custom implementation of the AndStatus that combines the
|
||||
option to add multiple statuses as a list, and in addition
|
||||
allows for adding the Device as an object to access its
|
||||
methods.
|
||||
|
||||
Args"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
device: Device,
|
||||
status_list: StatusBase | DeviceStatus | list[StatusBase | DeviceStatus],
|
||||
**kwargs,
|
||||
):
|
||||
self.all_statuses = status_list if isinstance(status_list, list) else [status_list]
|
||||
super().__init__(device=device, **kwargs)
|
||||
self._trace_attributes["all"] = [st._trace_attributes for st in self.all_statuses]
|
||||
|
||||
def inner(status):
|
||||
with self._lock:
|
||||
if self._externally_initiated_completion:
|
||||
return
|
||||
if self.done: # Return if status is already done.. It must be resolved already
|
||||
return
|
||||
|
||||
for st in self.all_statuses:
|
||||
with st._lock:
|
||||
if st.done and not st.success:
|
||||
self.set_exception(st.exception()) # st._exception
|
||||
return
|
||||
|
||||
if all(st.done for st in self.all_statuses) and all(
|
||||
st.success for st in self.all_statuses
|
||||
):
|
||||
self.set_finished()
|
||||
|
||||
for st in self.all_statuses:
|
||||
with st._lock:
|
||||
st.add_callback(inner)
|
||||
|
||||
# TODO improve __repr__ and __str__
|
||||
def __repr__(self):
|
||||
return "<AndStatusWithList({self.all_statuses!r})>".format(self=self)
|
||||
|
||||
def __str__(self):
|
||||
return "<AndStatusWithList(done={self.done}, success={self.success})>".format(self=self)
|
||||
|
||||
def __contains__(self, status: StatusBase | DeviceStatus) -> bool:
|
||||
for child in self.all_statuses:
|
||||
if child == status:
|
||||
return True
|
||||
if isinstance(child, AndStatusWithList):
|
||||
if status in child:
|
||||
return True
|
||||
|
||||
return False
|
||||
|
||||
# TODO Check if this actually works....
|
||||
def set_exception(self, exc):
|
||||
super().set_exception(exc)
|
||||
# Propagate the exception to all sub-statuses that are not done yet.
|
||||
with self._lock:
|
||||
for st in self.all_statuses:
|
||||
with st._lock:
|
||||
if not st.done:
|
||||
st.set_exception(exc)
|
||||
|
||||
def _run_callbacks(self):
|
||||
"""
|
||||
Set the Event and run the callbacks.
|
||||
"""
|
||||
if self.timeout is None:
|
||||
timeout = None
|
||||
else:
|
||||
timeout = self.timeout + self.settle_time
|
||||
if not self._settled_event.wait(timeout):
|
||||
self.log.warning("%r has timed out", self)
|
||||
with self._externally_initiated_completion_lock:
|
||||
if self._exception is None:
|
||||
exc = TimeoutError(
|
||||
f"AndStatus from device {self.device.name} failed to complete in specified timeout of {self.timeout + self.settle_time}."
|
||||
)
|
||||
self._exception = exc
|
||||
# Mark this as "settled".
|
||||
try:
|
||||
self._settled()
|
||||
except Exception:
|
||||
self.log.exception("%r encountered error during _settled()", self)
|
||||
with self._lock:
|
||||
self._event.set()
|
||||
if self._exception is not None:
|
||||
try:
|
||||
self._handle_failure()
|
||||
except Exception:
|
||||
self.log.exception("%r encountered an error during _handle_failure()", self)
|
||||
for cb in self._callbacks:
|
||||
try:
|
||||
cb(self)
|
||||
except Exception:
|
||||
self.log.exception(
|
||||
"An error was raised on a background thread while "
|
||||
"running the callback %r(%r).",
|
||||
cb,
|
||||
self,
|
||||
)
|
||||
self._callbacks.clear()
|
||||
|
||||
|
||||
class AndStatus(StatusBase):
|
||||
"""Custom AndStatus for TimePix detector."""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
left: StatusBase | DeviceStatus | list[StatusBase | DeviceStatus] | None,
|
||||
name: str | Device | None = None,
|
||||
right: StatusBase | DeviceStatus | list[StatusBase | DeviceStatus] | None = None,
|
||||
**kwargs,
|
||||
):
|
||||
self.left = left if isinstance(left, list) else [left]
|
||||
if right is not None:
|
||||
self.right = right if isinstance(right, list) else [right]
|
||||
else:
|
||||
self.right = []
|
||||
self.all_statuses = self.left + self.right
|
||||
if name is None:
|
||||
name = "unname_status"
|
||||
elif isinstance(name, Device):
|
||||
name = name.name
|
||||
else:
|
||||
name = name
|
||||
self.name = name
|
||||
super().__init__(**kwargs)
|
||||
self._trace_attributes["left"] = [st._trace_attributes for st in self.left]
|
||||
self._trace_attributes["right"] = [st._trace_attributes for st in self.right]
|
||||
|
||||
def inner(status):
|
||||
with self._lock:
|
||||
if self._externally_initiated_completion:
|
||||
return
|
||||
if self.done: # Return if status is already done.. It must be resolved already
|
||||
return
|
||||
|
||||
for st in self.all_statuses:
|
||||
with st._lock:
|
||||
if st.done and not st.success:
|
||||
self.set_exception(st.exception()) # st._exception
|
||||
return
|
||||
|
||||
if all(st.done for st in self.all_statuses) and all(
|
||||
st.success for st in self.all_statuses
|
||||
):
|
||||
self.set_finished()
|
||||
|
||||
for st in self.all_statuses:
|
||||
with st._lock:
|
||||
st.add_callback(inner)
|
||||
|
||||
def __repr__(self):
|
||||
return "({self.left!r} & {self.right!r})".format(self=self)
|
||||
|
||||
def __str__(self):
|
||||
return "{0}(done={1.done}, " "success={1.success})" "".format(self.__class__.__name__, self)
|
||||
|
||||
def __contains__(self, status: StatusBase) -> bool:
|
||||
for child in [self.left, self.right]:
|
||||
if child == status:
|
||||
return True
|
||||
if isinstance(child, AndStatus):
|
||||
if status in child:
|
||||
return True
|
||||
|
||||
return False
|
||||
|
||||
def _run_callbacks(self):
|
||||
"""
|
||||
Set the Event and run the callbacks.
|
||||
"""
|
||||
if self.timeout is None:
|
||||
timeout = None
|
||||
else:
|
||||
timeout = self.timeout + self.settle_time
|
||||
if not self._settled_event.wait(timeout):
|
||||
# We have timed out. It's possible that set_finished() has already
|
||||
# been called but we got here before the settle_time timer expired.
|
||||
# And it's possible that in this space be between the above
|
||||
# statement timing out grabbing the lock just below,
|
||||
# set_exception(exc) has been called. Both of these possibilties
|
||||
# are accounted for.
|
||||
self.log.warning("%r has timed out", self)
|
||||
with self._externally_initiated_completion_lock:
|
||||
# Set the exception and mark the Status as done, unless
|
||||
# set_exception(exc) was called externally before we grabbed
|
||||
# the lock.
|
||||
if self._exception is None:
|
||||
exc = TimeoutError(
|
||||
f"Status with name {self.name} failed to complete in specified timeout of {self.timeout + self.settle_time}."
|
||||
)
|
||||
self._exception = exc
|
||||
# Mark this as "settled".
|
||||
try:
|
||||
self._settled()
|
||||
except Exception:
|
||||
# No alternative but to log this. We can't supersede set_exception,
|
||||
# and we have to continue and run the callbacks.
|
||||
self.log.exception("%r encountered error during _settled()", self)
|
||||
# Now we know whether or not we have succeed or failed, either by
|
||||
# timeout above or by set_exception(exc), so we can set the Event that
|
||||
# will mark this Status as done.
|
||||
with self._lock:
|
||||
self._event.set()
|
||||
if self._exception is not None:
|
||||
try:
|
||||
self._handle_failure()
|
||||
except Exception:
|
||||
self.log.exception("%r encountered an error during _handle_failure()", self)
|
||||
# The callbacks have access to self, from which they can distinguish
|
||||
# success or failure.
|
||||
for cb in self._callbacks:
|
||||
try:
|
||||
cb(self)
|
||||
except Exception:
|
||||
self.log.exception(
|
||||
"An error was raised on a background thread while "
|
||||
"running the callback %r(%r).",
|
||||
cb,
|
||||
self,
|
||||
)
|
||||
self._callbacks.clear()
|
||||
Reference in New Issue
Block a user