From c80d9ab29bcc85a46b59f3be8fb86b990c3ed299 Mon Sep 17 00:00:00 2001 From: appel_c Date: Thu, 7 Sep 2023 14:04:32 +0200 Subject: [PATCH] feat: add timeout functionality to ophyd devices --- ophyd_devices/epics/devices/eiger9m_csaxs.py | 50 ++++++++++++-------- ophyd_devices/epics/devices/falcon_csaxs.py | 41 +++++++++++----- ophyd_devices/epics/devices/mcs_csaxs.py | 45 +++++++++++------- ophyd_devices/epics/devices/pilatus_csaxs.py | 44 ++++++++++++----- 4 files changed, 121 insertions(+), 59 deletions(-) diff --git a/ophyd_devices/epics/devices/eiger9m_csaxs.py b/ophyd_devices/epics/devices/eiger9m_csaxs.py index 7556a9b..3c70e64 100644 --- a/ophyd_devices/epics/devices/eiger9m_csaxs.py +++ b/ophyd_devices/epics/devices/eiger9m_csaxs.py @@ -24,6 +24,10 @@ class EigerError(Exception): pass +class EigerTimeoutError(Exception): + pass + + class SlsDetectorCam(Device): detector_type = ADCpt(EpicsSignalRO, "DetectorType_RBV") setting = ADCpt(EpicsSignalWithRBV, "Setting") @@ -262,6 +266,8 @@ class Eiger9mCsaxs(DetectorBase): def stage(self) -> List[object]: """stage the detector and file writer""" + self._stopped = False + self._acquisition_done = False self.scaninfo.load_scan_metadata() self.mokev = self.device_manager.devices.mokev.obj.read()[ self.device_manager.devices.mokev.name @@ -297,26 +303,8 @@ class Eiger9mCsaxs(DetectorBase): def unstage(self) -> List[object]: """unstage the detector and file writer""" - logger.info("Waiting for Eiger9M to return from acquisition") - while True: - det_ctrl = self.cam.acquire.read()[self.cam.acquire.name]["value"] - if det_ctrl == 0: - break - if self._stopped == True: - break - time.sleep(0.005) - logger.info("Eiger9M finished") - - logger.info("Waiting for std daq to receive images") - while True: - det_ctrl = self.std_client.get_status()["acquisition"]["state"] - # TODO if no writing was performed before - if det_ctrl == "FINISHED": - break - if self._stopped == True: - break - time.sleep(0.005) - logger.info("Std_daq finished") + logger.info("Waiting for Eiger9M to finish") + self._eiger9M_finished() # Message to BEC state = True @@ -326,8 +314,30 @@ class Eiger9mCsaxs(DetectorBase): msg.dumps(), ) self._stopped = False + logger.info("Eiger9M finished") return super().unstage() + def _eiger9M_finished(self): + """Function with 10s timeout""" + timer = 0 + while True: + det_ctrl = self.cam.acquire.read()[self.cam.acquire.name]["value"] + std_ctrl = self.std_client.get_status()["acquisition"]["state"] + status = self.std_client.get_status() + received_frames = status["acquisition"]["stats"]["n_write_completed"] + total_frames = int(self.scaninfo.num_points * self.scaninfo.frames_per_trigger) + # TODO if no writing was performed before + if det_ctrl == 0 and std_ctrl == "FINISHED" and total_frames == received_frames: + break + if self._stopped == True: + break + time.sleep(0.1) + timer += 0.1 + if timer > 5: + raise EigerTimeoutError( + f"Reached timeout with detector state {det_ctrl}, std_daq state {std_ctrl} and received frames of {received_frames} for the file writer" + ) + def arm_acquisition(self) -> None: """Start acquisition in software trigger mode, or arm the detector in hardware of the detector diff --git a/ophyd_devices/epics/devices/falcon_csaxs.py b/ophyd_devices/epics/devices/falcon_csaxs.py index 319ce13..35084f0 100644 --- a/ophyd_devices/epics/devices/falcon_csaxs.py +++ b/ophyd_devices/epics/devices/falcon_csaxs.py @@ -21,6 +21,10 @@ class FalconError(Exception): pass +class FalconTimeoutError(Exception): + pass + + class DetectorState(int, enum.Enum): DONE = 0 ACQUIRING = 1 @@ -61,7 +65,6 @@ class FalconHDF5Plugins(Device): # HDF5Plugin_V21, FilePlugin_V22): file_template = Cpt(EpicsSignalWithRBV, "FileTemplate", string=True, kind="config") num_capture = Cpt(EpicsSignalWithRBV, "NumCapture", kind="config") file_write_mode = Cpt(EpicsSignalWithRBV, "FileWriteMode", kind="config") - capture = Cpt(EpicsSignalWithRBV, "Capture") class FalconCsaxs(Device): @@ -196,7 +199,7 @@ class FalconCsaxs(Device): def stage(self) -> List[object]: """stage the detector and file writer""" # TODO clean up needed? - # self._clean_up() + self._stopped = False self.scaninfo.load_scan_metadata() self.mokev = self.device_manager.devices.mokev.obj.read()[ self.device_manager.devices.mokev.name @@ -224,7 +227,6 @@ class FalconCsaxs(Device): break time.sleep(0.005) logger.info("Falcon is armed") - self._stopped = False return super().stage() def arm_acquisition(self) -> None: @@ -232,15 +234,9 @@ class FalconCsaxs(Device): def unstage(self) -> List[object]: logger.info("Waiting for Falcon to return from acquisition") - while True: - det_ctrl = self.state.read()[self.state.name]["value"] - if det_ctrl == int(DetectorState.DONE): - break - if self._stopped == True: - break - time.sleep(0.005) - logger.info("Falcon done") - # TODO needed? + if self._stopped: + return super().unstage() + self._falcon_finished() self._clean_up() state = True msg = BECMessage.FileMessage(file_path=self.destination_path, done=True, successful=state) @@ -249,8 +245,29 @@ class FalconCsaxs(Device): msg.dumps(), ) self._stopped = False + logger.info("Falcon done") return super().unstage() + def _falcon_finished(self): + """Function with 10s timeout""" + timer = 0 + while True: + det_ctrl = self.acquiring.get() + writer_ctrl = self.hdf5.capture.get() + received_frames = self.dxp.current_pixel.get() + total_frames = int(self.scaninfo.num_points * self.scaninfo.frames_per_trigger) + # TODO if no writing was performed before + if det_ctrl == 0 and writer_ctrl == 0 and total_frames == received_frames: + break + if self._stopped == True: + break + time.sleep(0.1) + timer += 0.1 + if timer > 5: + raise FalconTimeoutError( + f"Reached timeout with detector state {det_ctrl}, std_daq state {writer_ctrl} and received frames of {received_frames} for the file writer" + ) + def stop(self, *, success=False) -> None: """Stop the scan, with camera and file writer""" self._clean_up() diff --git a/ophyd_devices/epics/devices/mcs_csaxs.py b/ophyd_devices/epics/devices/mcs_csaxs.py index 5d61f62..c0bf756 100644 --- a/ophyd_devices/epics/devices/mcs_csaxs.py +++ b/ophyd_devices/epics/devices/mcs_csaxs.py @@ -21,7 +21,11 @@ from bec_lib.core import bec_logger, threadlocked logger = bec_logger.logger -class MCSError(Exception): +class McsError(Exception): + pass + + +class McsTimeoutError(Exception): pass @@ -169,7 +173,7 @@ class McsCsaxs(SIS38XX): **kwargs, ) if device_manager is None and not sim_mode: - raise MCSError("Add DeviceManager to initialization or init with sim_mode=True") + raise McsError("Add DeviceManager to initialization or init with sim_mode=True") self.name = name self._stream_ttl = 1800 @@ -236,13 +240,6 @@ class McsCsaxs(SIS38XX): self.mca_data[obj.attr_name] = kwargs["value"][1:] if len(self.mca_names) != len(self.mca_data): return - # ref_entry = self.mca_data[self.mca_names[0]] - # if not ref_entry: - # self.mca_data = defaultdict(lambda: []) - # return - # if isinstance(ref_entry, list) and (ref_entry > 0): - # return - self._updated = True self.counter += 1 if (self.scaninfo.scan_type == "fly" and self.counter == self.num_lines.get()) or ( @@ -293,9 +290,9 @@ class McsCsaxs(SIS38XX): elif self.scaninfo.scan_type == "fly": self.n_points = int(self.scaninfo.num_points / int(self.num_lines.get()) + 1) else: - raise MCSError(f"Scantype {self.scaninfo} not implemented for MCS card") + raise McsError(f"Scantype {self.scaninfo} not implemented for MCS card") if self.n_points > 10000: - raise MCSError( + raise McsError( f"Requested number of points N={self.n_points} exceeds hardware limit of mcs card 10000 (N-1)" ) self.num_use_all.set(self.n_points) @@ -320,6 +317,8 @@ class McsCsaxs(SIS38XX): def stage(self) -> List[object]: """stage the detector and file writer""" + self._stopped = False + self._acquisition_done = False logger.info("Stage mcs") self.scaninfo.load_scan_metadata() self._prep_det() @@ -344,16 +343,30 @@ class McsCsaxs(SIS38XX): def unstage(self) -> List[object]: """unstage""" logger.info("Waiting for mcs to finish acquisition") - while not self._acquisition_done: - # monitor signal instead? - if self._stopped: - break - time.sleep(0.005) + self._mcs_finished() self._acquisition_done = False self._stopped = False logger.info("mcs done") return super().unstage() + def _mcs_finished(self): + """Function with 10s timeout""" + timer = 0 + while True: + if self._acquisition_done == True and self.acquiring.get() == 0: + break + if self._stopped == True: + break + time.sleep(0.1) + timer += 0.1 + if timer > 5: + total_frames = self.counter * int( + self.scaninfo.num_points / self.num_lines.get() + ) + max(self.current_channel.get() - 1, 0) + raise McsTimeoutError( + f"Reached timeout with mcs in state {self.acquiring.get()} and {total_frames} frames arriving at the mcs card" + ) + def arm_acquisition(self) -> None: """Arm acquisition Options: diff --git a/ophyd_devices/epics/devices/pilatus_csaxs.py b/ophyd_devices/epics/devices/pilatus_csaxs.py index 2efee34..f3a2a4a 100644 --- a/ophyd_devices/epics/devices/pilatus_csaxs.py +++ b/ophyd_devices/epics/devices/pilatus_csaxs.py @@ -25,6 +25,10 @@ class PilatusError(Exception): pass +class PilatusTimeoutError(Exception): + pass + + class TriggerSource(int, enum.Enum): INTERNAL = 0 EXT_ENABLE = 1 @@ -321,8 +325,8 @@ class PilatusCsaxs(DetectorBase): def stage(self) -> List[object]: """stage the detector and file writer""" - self._close_file_writer() - self._stop_file_writer() + self._acquisition_done = False + self._stopped = False self.scaninfo.load_scan_metadata() self.mokev = self.device_manager.devices.mokev.obj.read()[ self.device_manager.devices.mokev.name @@ -337,7 +341,6 @@ class PilatusCsaxs(DetectorBase): msg = BECMessage.FileMessage( file_path=self.filepath_h5, done=False, metadata={"input_path": self.destination_path} ) - return super().stage() def pre_scan(self) -> None: @@ -346,11 +349,10 @@ class PilatusCsaxs(DetectorBase): def unstage(self) -> List[object]: """unstage the detector and file writer""" # Reset to software trigger - self.triggermode = 0 - # TODO if images are missing, consider adding delay - self._close_file_writer() - self._stop_file_writer() - # Only sent this out once data is written to disk since cbf to hdf5 converter will be triggered + logger.info("Waiting for Pilatus to return from acquisition") + if self._stopped: + return super().unstage() + self._pilatus_finished() msg = BECMessage.FileMessage( file_path=self.filepath_h5, done=True, metadata={"input_path": self.destination_path} ) @@ -365,16 +367,37 @@ class PilatusCsaxs(DetectorBase): logger.info("Pilatus2 done") return super().unstage() + def _pilatus_finished(self) -> None: + timer = 0 + while True: + rtr = self.cam.status_message_camserver + if self.cam.acquire.get() == 0 and rtr == "Camserver returned OK": + break + if self._stopped == True: + break + time.sleep(0.1) + timer += 0.1 + if timer > 5: + self._close_file_writer() + self._stop_file_writer() + raise PilatusTimeoutError( + f"Pilatus timeout with detector state {self.cam.acquire.get()} and camserver return status: {rtr} " + ) + self._close_file_writer() + self._stop_file_writer() + def acquire(self) -> None: """Start acquisition in software trigger mode, or arm the detector in hardware of the detector """ - self.cam.acquire.set(1) + self.cam.acquire.put(1) def stop(self, *, success=False) -> None: """Stop the scan, with camera and file writer""" - self.cam.acquire.set(0) + self.cam.acquire.put(0) self._stop_file_writer() + # TODO maybe needed + # self._close_file_writer() # self.unstage() super().stop(success=success) self._stopped = True @@ -383,4 +406,3 @@ class PilatusCsaxs(DetectorBase): # Automatically connect to test environmenr if directly invoked if __name__ == "__main__": pilatus_2 = PilatusCsaxs(name="pilatus_2", prefix="X12SA-ES-PILATUS300K:", sim_mode=True) - pilatus_2.stage()