feat: add timeout functionality to ophyd devices

This commit is contained in:
appel_c 2023-09-07 14:04:32 +02:00
parent e48f4a305e
commit c80d9ab29b
4 changed files with 121 additions and 59 deletions

View File

@ -24,6 +24,10 @@ class EigerError(Exception):
pass pass
class EigerTimeoutError(Exception):
pass
class SlsDetectorCam(Device): class SlsDetectorCam(Device):
detector_type = ADCpt(EpicsSignalRO, "DetectorType_RBV") detector_type = ADCpt(EpicsSignalRO, "DetectorType_RBV")
setting = ADCpt(EpicsSignalWithRBV, "Setting") setting = ADCpt(EpicsSignalWithRBV, "Setting")
@ -262,6 +266,8 @@ class Eiger9mCsaxs(DetectorBase):
def stage(self) -> List[object]: def stage(self) -> List[object]:
"""stage the detector and file writer""" """stage the detector and file writer"""
self._stopped = False
self._acquisition_done = False
self.scaninfo.load_scan_metadata() self.scaninfo.load_scan_metadata()
self.mokev = self.device_manager.devices.mokev.obj.read()[ self.mokev = self.device_manager.devices.mokev.obj.read()[
self.device_manager.devices.mokev.name self.device_manager.devices.mokev.name
@ -297,26 +303,8 @@ class Eiger9mCsaxs(DetectorBase):
def unstage(self) -> List[object]: def unstage(self) -> List[object]:
"""unstage the detector and file writer""" """unstage the detector and file writer"""
logger.info("Waiting for Eiger9M to return from acquisition") logger.info("Waiting for Eiger9M to finish")
while True: self._eiger9M_finished()
det_ctrl = self.cam.acquire.read()[self.cam.acquire.name]["value"]
if det_ctrl == 0:
break
if self._stopped == True:
break
time.sleep(0.005)
logger.info("Eiger9M finished")
logger.info("Waiting for std daq to receive images")
while True:
det_ctrl = self.std_client.get_status()["acquisition"]["state"]
# TODO if no writing was performed before
if det_ctrl == "FINISHED":
break
if self._stopped == True:
break
time.sleep(0.005)
logger.info("Std_daq finished")
# Message to BEC # Message to BEC
state = True state = True
@ -326,8 +314,30 @@ class Eiger9mCsaxs(DetectorBase):
msg.dumps(), msg.dumps(),
) )
self._stopped = False self._stopped = False
logger.info("Eiger9M finished")
return super().unstage() return super().unstage()
def _eiger9M_finished(self):
"""Function with 10s timeout"""
timer = 0
while True:
det_ctrl = self.cam.acquire.read()[self.cam.acquire.name]["value"]
std_ctrl = self.std_client.get_status()["acquisition"]["state"]
status = self.std_client.get_status()
received_frames = status["acquisition"]["stats"]["n_write_completed"]
total_frames = int(self.scaninfo.num_points * self.scaninfo.frames_per_trigger)
# TODO if no writing was performed before
if det_ctrl == 0 and std_ctrl == "FINISHED" and total_frames == received_frames:
break
if self._stopped == True:
break
time.sleep(0.1)
timer += 0.1
if timer > 5:
raise EigerTimeoutError(
f"Reached timeout with detector state {det_ctrl}, std_daq state {std_ctrl} and received frames of {received_frames} for the file writer"
)
def arm_acquisition(self) -> None: def arm_acquisition(self) -> None:
"""Start acquisition in software trigger mode, """Start acquisition in software trigger mode,
or arm the detector in hardware of the detector or arm the detector in hardware of the detector

View File

@ -21,6 +21,10 @@ class FalconError(Exception):
pass pass
class FalconTimeoutError(Exception):
pass
class DetectorState(int, enum.Enum): class DetectorState(int, enum.Enum):
DONE = 0 DONE = 0
ACQUIRING = 1 ACQUIRING = 1
@ -61,7 +65,6 @@ class FalconHDF5Plugins(Device): # HDF5Plugin_V21, FilePlugin_V22):
file_template = Cpt(EpicsSignalWithRBV, "FileTemplate", string=True, kind="config") file_template = Cpt(EpicsSignalWithRBV, "FileTemplate", string=True, kind="config")
num_capture = Cpt(EpicsSignalWithRBV, "NumCapture", kind="config") num_capture = Cpt(EpicsSignalWithRBV, "NumCapture", kind="config")
file_write_mode = Cpt(EpicsSignalWithRBV, "FileWriteMode", kind="config") file_write_mode = Cpt(EpicsSignalWithRBV, "FileWriteMode", kind="config")
capture = Cpt(EpicsSignalWithRBV, "Capture")
class FalconCsaxs(Device): class FalconCsaxs(Device):
@ -196,7 +199,7 @@ class FalconCsaxs(Device):
def stage(self) -> List[object]: def stage(self) -> List[object]:
"""stage the detector and file writer""" """stage the detector and file writer"""
# TODO clean up needed? # TODO clean up needed?
# self._clean_up() self._stopped = False
self.scaninfo.load_scan_metadata() self.scaninfo.load_scan_metadata()
self.mokev = self.device_manager.devices.mokev.obj.read()[ self.mokev = self.device_manager.devices.mokev.obj.read()[
self.device_manager.devices.mokev.name self.device_manager.devices.mokev.name
@ -224,7 +227,6 @@ class FalconCsaxs(Device):
break break
time.sleep(0.005) time.sleep(0.005)
logger.info("Falcon is armed") logger.info("Falcon is armed")
self._stopped = False
return super().stage() return super().stage()
def arm_acquisition(self) -> None: def arm_acquisition(self) -> None:
@ -232,15 +234,9 @@ class FalconCsaxs(Device):
def unstage(self) -> List[object]: def unstage(self) -> List[object]:
logger.info("Waiting for Falcon to return from acquisition") logger.info("Waiting for Falcon to return from acquisition")
while True: if self._stopped:
det_ctrl = self.state.read()[self.state.name]["value"] return super().unstage()
if det_ctrl == int(DetectorState.DONE): self._falcon_finished()
break
if self._stopped == True:
break
time.sleep(0.005)
logger.info("Falcon done")
# TODO needed?
self._clean_up() self._clean_up()
state = True state = True
msg = BECMessage.FileMessage(file_path=self.destination_path, done=True, successful=state) msg = BECMessage.FileMessage(file_path=self.destination_path, done=True, successful=state)
@ -249,8 +245,29 @@ class FalconCsaxs(Device):
msg.dumps(), msg.dumps(),
) )
self._stopped = False self._stopped = False
logger.info("Falcon done")
return super().unstage() return super().unstage()
def _falcon_finished(self):
"""Function with 10s timeout"""
timer = 0
while True:
det_ctrl = self.acquiring.get()
writer_ctrl = self.hdf5.capture.get()
received_frames = self.dxp.current_pixel.get()
total_frames = int(self.scaninfo.num_points * self.scaninfo.frames_per_trigger)
# TODO if no writing was performed before
if det_ctrl == 0 and writer_ctrl == 0 and total_frames == received_frames:
break
if self._stopped == True:
break
time.sleep(0.1)
timer += 0.1
if timer > 5:
raise FalconTimeoutError(
f"Reached timeout with detector state {det_ctrl}, std_daq state {writer_ctrl} and received frames of {received_frames} for the file writer"
)
def stop(self, *, success=False) -> None: def stop(self, *, success=False) -> None:
"""Stop the scan, with camera and file writer""" """Stop the scan, with camera and file writer"""
self._clean_up() self._clean_up()

View File

@ -21,7 +21,11 @@ from bec_lib.core import bec_logger, threadlocked
logger = bec_logger.logger logger = bec_logger.logger
class MCSError(Exception): class McsError(Exception):
pass
class McsTimeoutError(Exception):
pass pass
@ -169,7 +173,7 @@ class McsCsaxs(SIS38XX):
**kwargs, **kwargs,
) )
if device_manager is None and not sim_mode: if device_manager is None and not sim_mode:
raise MCSError("Add DeviceManager to initialization or init with sim_mode=True") raise McsError("Add DeviceManager to initialization or init with sim_mode=True")
self.name = name self.name = name
self._stream_ttl = 1800 self._stream_ttl = 1800
@ -236,13 +240,6 @@ class McsCsaxs(SIS38XX):
self.mca_data[obj.attr_name] = kwargs["value"][1:] self.mca_data[obj.attr_name] = kwargs["value"][1:]
if len(self.mca_names) != len(self.mca_data): if len(self.mca_names) != len(self.mca_data):
return return
# ref_entry = self.mca_data[self.mca_names[0]]
# if not ref_entry:
# self.mca_data = defaultdict(lambda: [])
# return
# if isinstance(ref_entry, list) and (ref_entry > 0):
# return
self._updated = True self._updated = True
self.counter += 1 self.counter += 1
if (self.scaninfo.scan_type == "fly" and self.counter == self.num_lines.get()) or ( if (self.scaninfo.scan_type == "fly" and self.counter == self.num_lines.get()) or (
@ -293,9 +290,9 @@ class McsCsaxs(SIS38XX):
elif self.scaninfo.scan_type == "fly": elif self.scaninfo.scan_type == "fly":
self.n_points = int(self.scaninfo.num_points / int(self.num_lines.get()) + 1) self.n_points = int(self.scaninfo.num_points / int(self.num_lines.get()) + 1)
else: else:
raise MCSError(f"Scantype {self.scaninfo} not implemented for MCS card") raise McsError(f"Scantype {self.scaninfo} not implemented for MCS card")
if self.n_points > 10000: if self.n_points > 10000:
raise MCSError( raise McsError(
f"Requested number of points N={self.n_points} exceeds hardware limit of mcs card 10000 (N-1)" f"Requested number of points N={self.n_points} exceeds hardware limit of mcs card 10000 (N-1)"
) )
self.num_use_all.set(self.n_points) self.num_use_all.set(self.n_points)
@ -320,6 +317,8 @@ class McsCsaxs(SIS38XX):
def stage(self) -> List[object]: def stage(self) -> List[object]:
"""stage the detector and file writer""" """stage the detector and file writer"""
self._stopped = False
self._acquisition_done = False
logger.info("Stage mcs") logger.info("Stage mcs")
self.scaninfo.load_scan_metadata() self.scaninfo.load_scan_metadata()
self._prep_det() self._prep_det()
@ -344,16 +343,30 @@ class McsCsaxs(SIS38XX):
def unstage(self) -> List[object]: def unstage(self) -> List[object]:
"""unstage""" """unstage"""
logger.info("Waiting for mcs to finish acquisition") logger.info("Waiting for mcs to finish acquisition")
while not self._acquisition_done: self._mcs_finished()
# monitor signal instead?
if self._stopped:
break
time.sleep(0.005)
self._acquisition_done = False self._acquisition_done = False
self._stopped = False self._stopped = False
logger.info("mcs done") logger.info("mcs done")
return super().unstage() return super().unstage()
def _mcs_finished(self):
"""Function with 10s timeout"""
timer = 0
while True:
if self._acquisition_done == True and self.acquiring.get() == 0:
break
if self._stopped == True:
break
time.sleep(0.1)
timer += 0.1
if timer > 5:
total_frames = self.counter * int(
self.scaninfo.num_points / self.num_lines.get()
) + max(self.current_channel.get() - 1, 0)
raise McsTimeoutError(
f"Reached timeout with mcs in state {self.acquiring.get()} and {total_frames} frames arriving at the mcs card"
)
def arm_acquisition(self) -> None: def arm_acquisition(self) -> None:
"""Arm acquisition """Arm acquisition
Options: Options:

View File

@ -25,6 +25,10 @@ class PilatusError(Exception):
pass pass
class PilatusTimeoutError(Exception):
pass
class TriggerSource(int, enum.Enum): class TriggerSource(int, enum.Enum):
INTERNAL = 0 INTERNAL = 0
EXT_ENABLE = 1 EXT_ENABLE = 1
@ -321,8 +325,8 @@ class PilatusCsaxs(DetectorBase):
def stage(self) -> List[object]: def stage(self) -> List[object]:
"""stage the detector and file writer""" """stage the detector and file writer"""
self._close_file_writer() self._acquisition_done = False
self._stop_file_writer() self._stopped = False
self.scaninfo.load_scan_metadata() self.scaninfo.load_scan_metadata()
self.mokev = self.device_manager.devices.mokev.obj.read()[ self.mokev = self.device_manager.devices.mokev.obj.read()[
self.device_manager.devices.mokev.name self.device_manager.devices.mokev.name
@ -337,7 +341,6 @@ class PilatusCsaxs(DetectorBase):
msg = BECMessage.FileMessage( msg = BECMessage.FileMessage(
file_path=self.filepath_h5, done=False, metadata={"input_path": self.destination_path} file_path=self.filepath_h5, done=False, metadata={"input_path": self.destination_path}
) )
return super().stage() return super().stage()
def pre_scan(self) -> None: def pre_scan(self) -> None:
@ -346,11 +349,10 @@ class PilatusCsaxs(DetectorBase):
def unstage(self) -> List[object]: def unstage(self) -> List[object]:
"""unstage the detector and file writer""" """unstage the detector and file writer"""
# Reset to software trigger # Reset to software trigger
self.triggermode = 0 logger.info("Waiting for Pilatus to return from acquisition")
# TODO if images are missing, consider adding delay if self._stopped:
self._close_file_writer() return super().unstage()
self._stop_file_writer() self._pilatus_finished()
# Only sent this out once data is written to disk since cbf to hdf5 converter will be triggered
msg = BECMessage.FileMessage( msg = BECMessage.FileMessage(
file_path=self.filepath_h5, done=True, metadata={"input_path": self.destination_path} file_path=self.filepath_h5, done=True, metadata={"input_path": self.destination_path}
) )
@ -365,16 +367,37 @@ class PilatusCsaxs(DetectorBase):
logger.info("Pilatus2 done") logger.info("Pilatus2 done")
return super().unstage() return super().unstage()
def _pilatus_finished(self) -> None:
timer = 0
while True:
rtr = self.cam.status_message_camserver
if self.cam.acquire.get() == 0 and rtr == "Camserver returned OK":
break
if self._stopped == True:
break
time.sleep(0.1)
timer += 0.1
if timer > 5:
self._close_file_writer()
self._stop_file_writer()
raise PilatusTimeoutError(
f"Pilatus timeout with detector state {self.cam.acquire.get()} and camserver return status: {rtr} "
)
self._close_file_writer()
self._stop_file_writer()
def acquire(self) -> None: def acquire(self) -> None:
"""Start acquisition in software trigger mode, """Start acquisition in software trigger mode,
or arm the detector in hardware of the detector or arm the detector in hardware of the detector
""" """
self.cam.acquire.set(1) self.cam.acquire.put(1)
def stop(self, *, success=False) -> None: def stop(self, *, success=False) -> None:
"""Stop the scan, with camera and file writer""" """Stop the scan, with camera and file writer"""
self.cam.acquire.set(0) self.cam.acquire.put(0)
self._stop_file_writer() self._stop_file_writer()
# TODO maybe needed
# self._close_file_writer()
# self.unstage() # self.unstage()
super().stop(success=success) super().stop(success=success)
self._stopped = True self._stopped = True
@ -383,4 +406,3 @@ class PilatusCsaxs(DetectorBase):
# Automatically connect to test environmenr if directly invoked # Automatically connect to test environmenr if directly invoked
if __name__ == "__main__": if __name__ == "__main__":
pilatus_2 = PilatusCsaxs(name="pilatus_2", prefix="X12SA-ES-PILATUS300K:", sim_mode=True) pilatus_2 = PilatusCsaxs(name="pilatus_2", prefix="X12SA-ES-PILATUS300K:", sim_mode=True)
pilatus_2.stage()