diff --git a/ophyd_devices/epics/devices/eiger9m_csaxs.py b/ophyd_devices/epics/devices/eiger9m_csaxs.py index 962476b..3037e32 100644 --- a/ophyd_devices/epics/devices/eiger9m_csaxs.py +++ b/ophyd_devices/epics/devices/eiger9m_csaxs.py @@ -1,78 +1,60 @@ import enum import threading import time -from typing import Any, List +from bec_lib.core.devicemanager import DeviceStatus import numpy as np import os + +from typing import Any, List + from ophyd import EpicsSignal, EpicsSignalRO, EpicsSignalWithRBV from ophyd import DetectorBase, Device from ophyd import ADComponent as ADCpt +from std_daq_client import StdDaqClient + from bec_lib.core import BECMessage, MessageEndpoints, threadlocked from bec_lib.core.file_utils import FileWriterMixin from bec_lib.core import bec_logger -from ophyd_devices.utils import bec_utils as bec_utils - -from std_daq_client import StdDaqClient from ophyd_devices.epics.devices.bec_scaninfo_mixin import BecScaninfoMixin - +from ophyd_devices.utils import bec_utils logger = bec_logger.logger class EigerError(Exception): + """Base class for exceptions in this module.""" + pass class EigerTimeoutError(Exception): + """Raised when the Eiger does not respond in time during unstage.""" + pass class SlsDetectorCam(Device): - # detector_type = ADCpt(EpicsSignalRO, "DetectorType_RBV") - # setting = ADCpt(EpicsSignalWithRBV, "Setting") - # delay_time = ADCpt(EpicsSignalWithRBV, "DelayTime") + """SLS Detector Camera - Eiger 9M + + Base class to map EPICS PVs to ophyd signals. + """ + threshold_energy = ADCpt(EpicsSignalWithRBV, "ThresholdEnergy") beam_energy = ADCpt(EpicsSignalWithRBV, "BeamEnergy") - # enable_trimbits = ADCpt(EpicsSignalWithRBV, "Trimbits") bit_depth = ADCpt(EpicsSignalWithRBV, "BitDepth") - # num_gates = ADCpt(EpicsSignalWithRBV, "NumGates") - num_cycles = ADCpt(EpicsSignalWithRBV, "NumCycles") + num_images = ADCpt(EpicsSignalWithRBV, "NumCycles") num_frames = ADCpt(EpicsSignalWithRBV, "NumFrames") - timing_mode = ADCpt(EpicsSignalWithRBV, "TimingMode") + trigger_mode = ADCpt(EpicsSignalWithRBV, "TimingMode") trigger_software = ADCpt(EpicsSignal, "TriggerSoftware") - # high_voltage = ADCpt(EpicsSignalWithRBV, "HighVoltage") - # Receiver and data callback - # receiver_mode = ADCpt(EpicsSignalWithRBV, "ReceiverMode") - # receiver_stream = ADCpt(EpicsSignalWithRBV, "ReceiverStream") - # enable_data = ADCpt(EpicsSignalWithRBV, "UseDataCallback") - # missed_packets = ADCpt(EpicsSignalRO, "ReceiverMissedPackets_RBV") - # Direct settings access - # setup_file = ADCpt(EpicsSignal, "SetupFile") - # load_setup = ADCpt(EpicsSignal, "LoadSetup") - # command = ADCpt(EpicsSignal, "Command") - # Mythen 3 - # counter_mask = ADCpt(EpicsSignalWithRBV, "CounterMask") - # counter1_threshold = ADCpt(EpicsSignalWithRBV, "Counter1Threshold") - # counter2_threshold = ADCpt(EpicsSignalWithRBV, "Counter2Threshold") - # counter3_threshold = ADCpt(EpicsSignalWithRBV, "Counter3Threshold") - # gate1_delay = ADCpt(EpicsSignalWithRBV, "Gate1Delay") - # gate1_width = ADCpt(EpicsSignalWithRBV, "Gate1Width") - # gate2_delay = ADCpt(EpicsSignalWithRBV, "Gate2Delay") - # gate2_width = ADCpt(EpicsSignalWithRBV, "Gate2Width") - # gate3_delay = ADCpt(EpicsSignalWithRBV, "Gate3Delay") - # gate3_width = ADCpt(EpicsSignalWithRBV, "Gate3Width") - # # Moench - # json_frame_mode = ADCpt(EpicsSignalWithRBV, "JsonFrameMode") - # json_detector_mode = ADCpt(EpicsSignalWithRBV, "JsonDetectorMode") - - # fixes due to missing PVs from CamBase acquire = ADCpt(EpicsSignal, "Acquire") detector_state = ADCpt(EpicsSignalRO, "DetectorState_RBV") class TriggerSource(int, enum.Enum): + """Trigger signals for Eiger9M detector""" + AUTO = 0 TRIGGER = 1 GATING = 2 @@ -80,6 +62,8 @@ class TriggerSource(int, enum.Enum): class DetectorState(int, enum.Enum): + """Detector states for Eiger9M detector""" + IDLE = 0 ERROR = 1 WAITING = 2 @@ -105,6 +89,7 @@ class Eiger9mCsaxs(DetectorBase): """ + # Specify which functions are revealed to the user in BEC client USER_ACCESS = [ "describe", ] @@ -124,6 +109,18 @@ class Eiger9mCsaxs(DetectorBase): sim_mode=False, **kwargs, ): + """Initialize the Eiger9M detector + Args: + #TODO add here the parameters for kind, read_attrs, configuration_attrs, parent + prefix (str): PV prefix (X12SA-ES-EIGER9M:) + name (str): 'eiger' + kind (str): + read_attrs (list): + configuration_attrs (list): + parent (object): + device_manager (object): BEC device manager + sim_mode (bool): simulation mode to start the detector without BEC, e.g. from ipython shell + """ super().__init__( prefix=prefix, name=name, @@ -133,13 +130,16 @@ class Eiger9mCsaxs(DetectorBase): parent=parent, **kwargs, ) - self._stopped = False - self._lock = threading.RLock() if device_manager is None and not sim_mode: raise EigerError("Add DeviceManager to initialization or init with sim_mode=True") + # Not sure if this is needed, comment it for now! + # self._lock = threading.RLock() + self._stopped = False self.name = name - self.wait_for_connection() # Make sure to be connected before talking to PVs + self.wait_for_connection() + # Spin up connections for simulation or BEC mode + # TODO check if sim_mode still works. Is it needed? I believe filewriting might be handled properly if not sim_mode: from bec_lib.core.bec_service import SERVICE_CONFIG @@ -147,32 +147,70 @@ class Eiger9mCsaxs(DetectorBase): self._producer = self.device_manager.producer self.service_cfg = SERVICE_CONFIG.config["service_config"]["file_writer"] else: + base_path = f"/sls/X12SA/data/{self.scaninfo.username}/Data10/" self._producer = bec_utils.MockProducer() self.device_manager = bec_utils.MockDeviceManager() self.scaninfo = BecScaninfoMixin(device_manager, sim_mode) self.scaninfo.load_scan_metadata() - self.service_cfg = {"base_path": f"/sls/X12SA/data/{self.scaninfo.username}/Data10/"} + self.service_cfg = {"base_path": base_path} + self.scaninfo = BecScaninfoMixin(device_manager, sim_mode) self.scaninfo.load_scan_metadata() - # TODO - self.filepath = "" - self.filewriter = FileWriterMixin(self.service_cfg) - self.reduce_readout = 1e-3 # 3 ms - self.triggermode = 0 # 0 : internal, scan must set this if hardware triggered - self._init_eiger9m() - self._init_standard_daq() + self._init() - # self.mokev = self.device_manager.devices.mokev.read()[ - # self.device_manager.devices.mokev.name - # ]["value"] + # TODO function for abstract class? + def _init(self) -> None: + """Initialize detector, filewriter and set default parameters""" + self._default_parameter() + self._init_detector() + self._init_filewriter() - def _init_eiger9m(self) -> None: - """Init parameters for Eiger 9m""" + # TODO function for abstract class? + def _default_parameter(self) -> None: + """Set default parameters for Eiger 9M + readout (float) : readout time in seconds + """ + self.reduce_readout = 1e-3 + + # TODO function for abstract class? + def _init_detector(self) -> None: + """Init parameters for Eiger 9m. + Depends on hardware configuration and delay generators. + At this point it is set up for gating mode (09/2023). + """ + self._stop_det() self._set_trigger(TriggerSource.GATING) - self.stop_acquisition() + + # TODO function for abstract class? + def _init_filewriter(self) -> None: + """Init parameters for filewriter. + For the Eiger9M, the data backend is std_daq client. + Setting up these parameters depends on the backend, and would need to change upon changes in the backend. + """ + self.std_rest_server_url = "http://xbl-daq-29:5000" + self.std_client = StdDaqClient(url_base=self.std_rest_server_url) + self.std_client.stop_writer() + timeout = 0 + # TODO changing e-account was not possible during beamtimes. + # self._update_std_cfg("writer_user_id", int(self.scaninfo.username.strip(" e"))) + # time.sleep(5) + # TODO is this the only state to wait for or should we wait for more from the std_daq client? + while not self.std_client.get_status()["state"] == "READY": + time.sleep(0.1) + timeout = timeout + 0.1 + logger.info("Waiting for std_daq init.") + if timeout > 5: + if not self.std_client.get_status()["state"]: + raise EigerError( + f"Std client not in READY state, returns: {self.std_client.get_status()}" + ) + else: + return def _update_std_cfg(self, cfg_key: str, value: Any) -> None: + """Update std_daq config with new e-account for the current beamtime""" + # TODO Do we need all the loggers here, should this be properly refactored with a DEBUG mode? cfg = self.std_client.get_config() old_value = cfg.get(cfg_key) logger.info(old_value) @@ -189,71 +227,54 @@ class Eiger9mCsaxs(DetectorBase): logger.info(f"Updated std_daq config for key {cfg_key} from {old_value} to {value}") self.std_client.set_config(cfg) - def _init_standard_daq(self) -> None: - self.std_rest_server_url = "http://xbl-daq-29:5000" - self.std_client = StdDaqClient(url_base=self.std_rest_server_url) - self.std_client.stop_writer() - timeout = 0 - # TODO put back change of e-account! - # self._update_std_cfg("writer_user_id", int(self.scaninfo.username.strip(" e"))) - # time.sleep(5) - while not self.std_client.get_status()["state"] == "READY": - time.sleep(0.1) - timeout = timeout + 0.1 - logger.info("Waiting for std_daq init.") - if timeout > 5: - if not self.std_client.get_status()["state"]: - raise EigerError( - f"Std client not in READY state, returns: {self.std_client.get_status()}" - ) - else: - return + # TODO function for abstract class? + def stage(self) -> List[object]: + """Stage command, called from BEC in preparation of a scan. + This will iniate the preparation of detector and file writer. + The following functuions are called (at least): + - _prep_file_writer + - _prep_det + - _publish_file_location + The device returns a List[object] from the Ophyd Device class. - def _prep_det(self) -> None: - self._set_det_threshold() - self._set_acquisition_params() - self._set_trigger(TriggerSource.GATING) + #TODO make sure this is fullfiled - def _set_det_threshold(self) -> None: - # threshold_energy PV exists on Eiger 9M? - factor = 1 - if self.cam.threshold_energy._metadata["units"] == "eV": - factor = 1000 - setp_energy = int(self.mokev * factor) - energy = self.cam.beam_energy.read()[self.cam.beam_energy.name]["value"] - if setp_energy != energy: - self.cam.beam_energy.set(setp_energy) # .wait() - threshold = self.cam.threshold_energy.read()[self.cam.threshold_energy.name]["value"] - if not np.isclose(setp_energy / 2, threshold, rtol=0.05): - self.cam.threshold_energy.set(setp_energy / 2) # .wait() - - def _set_acquisition_params(self) -> None: - # self.cam.acquire_time.set(self.scaninfo.exp_time) - # Set acquisition parameters slightly shorter then cycle - # self.cam.acquire_period.set( - # self.scaninfo.exp_time + (self.scaninfo.readout_time - self.reduce_readout) - # ) - self.cam.num_cycles.put(int(self.scaninfo.num_points * self.scaninfo.frames_per_trigger)) - self.cam.num_frames.put(1) - - def _set_trigger(self, trigger_source: TriggerSource) -> None: - """Set trigger source for the detector, either directly to value or TriggerSource.* with - AUTO = 0 - TRIGGER = 1 - GATING = 2 - BURST_TRIGGER = 3 + Staging not idempotent and should raise + :obj:`RedundantStaging` if staged twice without an + intermediate :meth:`~BlueskyInterface.unstage`. """ - value = int(trigger_source) - self.cam.timing_mode.put(value) + self._stopped = False + self.scaninfo.load_scan_metadata() + self.mokev = self.device_manager.devices.mokev.obj.read()[ + self.device_manager.devices.mokev.name + ]["value"] + # TODO refactor logger.info to DEBUG mode? + self._prep_file_writer() + self._prep_det() + state = False + self._publish_file_location(done=state, successful=state) + self._arm_acquisition() + # TODO Fix should take place in EPICS or directly on the hardware! + # We observed that the detector missed triggers in the beginning in case BEC was to fast. Adding 50ms delay solved this + time.sleep(0.05) + return super().stage() + # TODO function for abstract class? def _prep_file_writer(self) -> None: + """Prepare file writer for scan + + self.filewriter is a FileWriterMixin object that hosts logic for compiling the filepath + """ self.filepath = self.filewriter.compile_full_filename( self.scaninfo.scan_number, f"{self.name}.h5", 1000, 5, True ) + # TODO needed, should be checked from the filerwriter mixin right? while not os.path.exists(os.path.dirname(self.filepath)): time.sleep(0.1) - self._close_file_writer() + + self._stop_file_writer() logger.info(f" std_daq output filepath {self.filepath}") + # TODO Discuss with Leo if this is needed, or how to start the async writing best try: self.std_client.start_writer_async( { @@ -272,100 +293,66 @@ class Eiger9mCsaxs(DetectorBase): break time.sleep(0.005) - def _close_file_writer(self) -> None: + # TODO function for abstract class? + def _stop_file_writer(self) -> None: + """Close file writer""" self.std_client.stop_writer() - pass + # TODO can I wait for a status message here maybe? To ensure writer returned - def stage(self) -> List[object]: - """stage the detector and file writer""" - self._stopped = False - self._acquisition_done = False - self.scaninfo.load_scan_metadata() - self.mokev = self.device_manager.devices.mokev.obj.read()[ - self.device_manager.devices.mokev.name - ]["value"] - - self._prep_file_writer() - self._prep_det() - logger.info("Waiting for std daq to be armed") - logger.info("std_daq is ready") - - msg = BECMessage.FileMessage(file_path=self.filepath, done=False) - self._producer.set_and_publish( - MessageEndpoints.public_file(self.scaninfo.scanID, self.name), - msg.dumps(), - ) - msg = BECMessage.FileMessage(file_path=self.filepath, done=False) - self._producer.set_and_publish( - MessageEndpoints.file_event(self.name), - msg.dumps(), - ) - self.arm_acquisition() - - self._stopped = False - # We see that we miss a trigger occasionally, it seems that status msg from the ioc are not realiable - time.sleep(0.05) - return super().stage() - - @threadlocked - def unstage(self) -> List[object]: - """unstage the detector and file writer""" - logger.info("Waiting for Eiger9M to finish") - old_scanID = self.scaninfo.scanID - self.scaninfo.load_scan_metadata() - logger.info(f"Old scanID: {old_scanID}, ") - if self.scaninfo.scanID != old_scanID: - self._stopped = True - if self._stopped == True: - return super().unstage() - self._eiger9M_finished() - # Message to BEC - state = True - - msg = BECMessage.FileMessage(file_path=self.filepath, done=True, successful=state) - self._producer.set_and_publish( - MessageEndpoints.public_file(self.scaninfo.scanID, self.name), - msg.dumps(), - ) - self._stopped = False - logger.info("Eiger9M finished") - return super().unstage() - - @threadlocked - def _eiger9M_finished(self): - """Function with 10s timeout""" - timer = 0 - while True: - det_ctrl = self.cam.acquire.read()[self.cam.acquire.name]["value"] - # det_ctrl = 0 - std_ctrl = self.std_client.get_status()["acquisition"]["state"] - status = self.std_client.get_status() - received_frames = status["acquisition"]["stats"]["n_write_completed"] - total_frames = int(self.scaninfo.num_points * self.scaninfo.frames_per_trigger) - # TODO if no writing was performed before - if det_ctrl == 0 and std_ctrl == "FINISHED" and total_frames == received_frames: - break - if self._stopped == True: - self.stop_acquisition() - self._close_file_writer() - break - time.sleep(0.1) - timer += 0.1 - if timer > 5: - self._stopped == True - self._close_file_writer() - self.stop_acquisition() - raise EigerTimeoutError( - f"Reached timeout with detector state {det_ctrl}, std_daq state {std_ctrl} and received frames of {received_frames} for the file writer" - ) - self._close_file_writer() - - def arm_acquisition(self) -> None: - """Start acquisition in software trigger mode, - or arm the detector in hardware of the detector + # TODO function for abstract class? + def _prep_det(self) -> None: + """Prepare detector for scan. + Includes checking the detector threshold, setting the acquisition parameters and setting the trigger source """ + self._set_det_threshold() + self._set_acquisition_params() + self._set_trigger(TriggerSource.GATING) + + def _set_det_threshold(self) -> None: + """Set correct detector threshold to 1/2 of current X-ray energy, allow 5% tolerance""" + # threshold energy might be in eV or keV + factor = 1 + if self.cam.threshold_energy._metadata["units"] == "eV": + factor = 1000 + setpoint = int(self.mokev * factor) + energy = self.cam.beam_energy.read()[self.cam.beam_energy.name]["value"] + if setpoint != energy: + self.cam.beam_energy.set(setpoint) + threshold = self.cam.threshold_energy.read()[self.cam.threshold_energy.name]["value"] + if not np.isclose(setpoint / 2, threshold, rtol=0.05): + self.cam.threshold_energy.set(setpoint / 2) + + def _set_acquisition_params(self) -> None: + """Set acquisition parameters for the detector""" + self.cam.num_images.put(int(self.scaninfo.num_points * self.scaninfo.frames_per_trigger)) + self.cam.num_frames.put(1) + + # TODO function for abstract class? + call it for each scan?? + def _set_trigger(self, trigger_source: TriggerSource) -> None: + """Set trigger source for the detector. + Check the TriggerSource enum for possible values + """ + value = int(trigger_source) + self.cam.trigger_mode.put(value) + + def _publish_file_location(self, done=False, successful=False) -> None: + """Publish the filepath to REDIS + First msg for file writer and the second one for other listeners (e.g. radial integ) + """ + pipe = self._producer.pipeline() + msg = BECMessage.FileMessage(file_path=self.filepath, done=done, successful=successful) + self._producer.set_and_publish( + MessageEndpoints.public_file(self.scaninfo.scanID, self.name), msg.dumps(), pipe=pipe + ) + self._producer.set_and_publish( + MessageEndpoints.file_event(self.name), msg.dumps(), pip=pipe + ) + pipe.execute() + + # TODO function for abstract class? + def _arm_acquisition(self) -> None: + """Arm Eiger detector for acquisition""" self.cam.acquire.put(1) - logger.info("Waiting for Eiger9m to be armed") while True: det_ctrl = self.cam.detector_state.read()[self.cam.detector_state.name]["value"] if det_ctrl == int(DetectorState.RUNNING): @@ -373,15 +360,96 @@ class Eiger9mCsaxs(DetectorBase): if self._stopped == True: break time.sleep(0.005) - logger.info("Eiger9m is armed") - def stop_acquisition(self) -> None: + # TODO function for abstract class? + def trigger(self) -> DeviceStatus: + """Trigger the detector, called from BEC.""" + self._on_trigger() + return super().trigger() + + # TODO function for abstract class? + def _on_trigger(self): + """Specify action that should be taken upon trigger signal.""" + pass + + # TODO function for abstract class? + # TODO threadlocked was attached, in principle unstage needs to be fast and should possibly called multiple times + @threadlocked + def unstage(self) -> List[object]: + """Unstage the device. + + This method must be idempotent, multiple calls (without a new + call to 'stage') have no effect. + + Functions called: + - _finished + - _publish_file_location + """ + # TODO solution for multiple calls of the function to avoid calling the finished loop. + # Loop to avoid calling the finished loop multiple times + old_scanID = self.scaninfo.scanID + self.scaninfo.load_scan_metadata() + if self.scaninfo.scanID != old_scanID: + self._stopped = True + if self._stopped == True: + return super().unstage() + self._finished() + state = True + self._publish_file_location(done=state, successful=state) + self._stopped = False + return super().unstage() + + # TODO function for abstract class? + # TODO necessary, how can we make this cleaner. + @threadlocked + def _finished(self): + """Check if acquisition is finished. + + This function is called from unstage and stop + and will check detector and file backend status. + Timeouts after given time + + Functions called: + - _stop_det + - _stop_file_writer + """ + sleep_time = 0.1 + timeout = 5 + timer = 0 + # Check status with timeout, break out if _stopped=True + while True: + det_ctrl = self.cam.acquire.read()[self.cam.acquire.name]["value"] + std_ctrl = self.std_client.get_status()["acquisition"]["state"] + status = self.std_client.get_status() + received_frames = status["acquisition"]["stats"]["n_write_completed"] + total_frames = int(self.scaninfo.num_points * self.scaninfo.frames_per_trigger) + if det_ctrl == 0 and std_ctrl == "FINISHED" and total_frames == received_frames: + break + if self._stopped == True: + self._stop_det() + self._stop_file_writer() + break + time.sleep(sleep_time) + timer += sleep_time + if timer > timeout: + self._stopped == True + self._stop_det() + self._stop_file_writer() + raise EigerTimeoutError( + f"Reached timeout with detector state {det_ctrl}, std_daq state {std_ctrl} and received frames of {received_frames} for the file writer" + ) + self._stop_det() + self._stop_file_writer() + + def _stop_det(self) -> None: """Stop the detector and wait for the proper status message""" - logger.info("Waiting for Eiger9m to be armed") elapsed_time = 0 sleep_time = 0.01 + timeout = 5 + # Stop acquisition self.cam.acquire.put(0) retry = False + # Check status while True: det_ctrl = self.cam.detector_state.read()[self.cam.detector_state.name]["value"] if det_ctrl == int(DetectorState.IDLE): @@ -390,16 +458,17 @@ class Eiger9mCsaxs(DetectorBase): break time.sleep(sleep_time) elapsed_time += sleep_time - if elapsed_time > 2 and not retry: + if elapsed_time > timeout // 2 and not retry: retry = True + # Retry to stop acquisition self.cam.acquire.put(0) - if elapsed_time > 5: + if elapsed_time > timeout: raise EigerTimeoutError("Failed to stop the acquisition. IOC did not update.") def stop(self, *, success=False) -> None: """Stop the scan, with camera and file writer""" - self.stop_acquisition() - self._close_file_writer() + self._stop_det() + self._stop_file_writer() super().stop(success=success) self._stopped = True diff --git a/ophyd_devices/epics/devices/falcon_csaxs.py b/ophyd_devices/epics/devices/falcon_csaxs.py index f52f87d..4a5bd1b 100644 --- a/ophyd_devices/epics/devices/falcon_csaxs.py +++ b/ophyd_devices/epics/devices/falcon_csaxs.py @@ -1,37 +1,47 @@ import enum import os import time -from typing import List -from ophyd import EpicsSignal, EpicsSignalRO, EpicsSignalWithRBV, Component as Cpt, Device +from typing import List +from bec_lib.core.devicemanager import DeviceStatus + +from ophyd import EpicsSignal, EpicsSignalRO, EpicsSignalWithRBV, Component as Cpt, Device from ophyd.mca import EpicsMCARecord -from ophyd.areadetector.plugins import HDF5Plugin_V21, FilePlugin_V22 from bec_lib.core.file_utils import FileWriterMixin from bec_lib.core import MessageEndpoints, BECMessage from bec_lib.core import bec_logger -from ophyd_devices.epics.devices.bec_scaninfo_mixin import BecScaninfoMixin +from ophyd_devices.epics.devices.bec_scaninfo_mixin import BecScaninfoMixin from ophyd_devices.utils import bec_utils logger = bec_logger.logger class FalconError(Exception): + """Base class for exceptions in this module.""" + pass class FalconTimeoutError(Exception): + """Raised when the Falcon does not respond in time during unstage.""" + pass class DetectorState(int, enum.Enum): + """Detector states for Falcon detector""" + DONE = 0 ACQUIRING = 1 class EpicsDXPFalcon(Device): - """All high-level DXP parameters for each channel""" + """DXP parameters for Falcon detector + + Base class to map EPICS PVs from DXP parameters to ophyd signals. + """ elapsed_live_time = Cpt(EpicsSignal, "ElapsedLiveTime") elapsed_real_time = Cpt(EpicsSignal, "ElapsedRealTime") @@ -51,15 +61,17 @@ class EpicsDXPFalcon(Device): current_pixel = Cpt(EpicsSignalRO, "CurrentPixel") -class FalconHDF5Plugins(Device): # HDF5Plugin_V21, FilePlugin_V22): +class FalconHDF5Plugins(Device): + """HDF5 parameters for Falcon detector + + Base class to map EPICS PVs from HDF5 Plugin to ophyd signals. + """ + capture = Cpt(EpicsSignalWithRBV, "Capture") enable = Cpt(EpicsSignalWithRBV, "EnableCallbacks", string=True, kind="config") xml_file_name = Cpt(EpicsSignalWithRBV, "XMLFileName", string=True, kind="config") lazy_open = Cpt(EpicsSignalWithRBV, "LazyOpen", string=True, doc="0='No' 1='Yes'") temp_suffix = Cpt(EpicsSignalWithRBV, "TempSuffix", string=True) - # file_path = Cpt( - # EpicsSignalWithRBV, "FilePath", string=True, kind="config", path_semantics="posix" - # ) file_path = Cpt(EpicsSignalWithRBV, "FilePath", string=True, kind="config") file_name = Cpt(EpicsSignalWithRBV, "FileName", string=True, kind="config") file_template = Cpt(EpicsSignalWithRBV, "FileTemplate", string=True, kind="config") @@ -70,29 +82,40 @@ class FalconHDF5Plugins(Device): # HDF5Plugin_V21, FilePlugin_V22): class FalconCsaxs(Device): - """FalxonX1 with HDF5 writer""" + """Falcon Sitoro detector for CSAXS + + Parent class: Device + Device classes: EpicsDXPFalcon dxp1:, EpicsMCARecord mca1, FalconHDF5Plugins HDF1: + + Attributes: + name str: 'falcon' + prefix (str): PV prefix ("X12SA-SITORO:) + + """ + + # Specify which functions are revealed to the user in BEC client + USER_ACCESS = [ + "describe", + ] dxp = Cpt(EpicsDXPFalcon, "dxp1:") mca = Cpt(EpicsMCARecord, "mca1") hdf5 = Cpt(FalconHDF5Plugins, "HDF1:") - # Control + # specify Epics PVs for Falcon + # TODO consider moving this outside of this class! stop_all = Cpt(EpicsSignal, "StopAll") erase_all = Cpt(EpicsSignal, "EraseAll") start_all = Cpt(EpicsSignal, "StartAll") state = Cpt(EpicsSignal, "Acquiring") - # Preset options preset_mode = Cpt(EpicsSignal, "PresetMode") # 0 No preset 1 Real time 2 Events 3 Triggers preset_real = Cpt(EpicsSignal, "PresetReal") preset_events = Cpt(EpicsSignal, "PresetEvents") preset_triggers = Cpt(EpicsSignal, "PresetTriggers") - # read-only diagnostics triggers = Cpt(EpicsSignalRO, "MaxTriggers", lazy=True) events = Cpt(EpicsSignalRO, "MaxEvents", lazy=True) input_count_rate = Cpt(EpicsSignalRO, "MaxInputCountRate", lazy=True) output_count_rate = Cpt(EpicsSignalRO, "MaxOutputCountRate", lazy=True) - - # Mapping control collect_mode = Cpt(EpicsSignal, "CollectMode") # 0 MCA spectra, 1 MCA mapping pixel_advance_mode = Cpt(EpicsSignal, "PixelAdvanceMode") ignore_gate = Cpt(EpicsSignal, "IgnoreGate") @@ -102,8 +125,6 @@ class FalconCsaxs(Device): pixels_per_run = Cpt(EpicsSignal, "PixelsPerRun") nd_array_mode = Cpt(EpicsSignal, "NDArrayMode") - # HDF5 - def __init__( self, prefix="", @@ -117,6 +138,18 @@ class FalconCsaxs(Device): sim_mode=False, **kwargs, ): + """Initialize Falcon detector + Args: + #TODO add here the parameters for kind, read_attrs, configuration_attrs, parent + prefix (str): PV prefix ("X12SA-SITORO:) + name (str): 'falcon' + kind (str): + read_attrs (list): + configuration_attrs (list): + parent (object): + device_manager (object): BEC device manager + sim_mode (bool): simulation mode to start the detector without BEC, e.g. from ipython shell + """ super().__init__( prefix=prefix, name=name, @@ -130,7 +163,8 @@ class FalconCsaxs(Device): raise FalconError("Add DeviceManager to initialization or init with sim_mode=True") self._stopped = False self.name = name - self.wait_for_connection() # Make sure to be connected before talking to PVs + self.wait_for_connection() + # Spin up connections for simulation or BEC mode if not sim_mode: from bec_lib.core.bec_service import SERVICE_CONFIG @@ -138,37 +172,60 @@ class FalconCsaxs(Device): self._producer = self.device_manager.producer self.service_cfg = SERVICE_CONFIG.config["service_config"]["file_writer"] else: + base_path = f"/sls/X12SA/data/{self.scaninfo.username}/Data10/" self._producer = bec_utils.MockProducer() self.device_manager = bec_utils.MockDeviceManager() self.scaninfo = BecScaninfoMixin(device_manager, sim_mode) self.scaninfo.load_scan_metadata() - self.service_cfg = {"base_path": f"/sls/X12SA/data/{self.scaninfo.username}/Data10/"} + self.service_cfg = {"base_path": base_path} + self.scaninfo = BecScaninfoMixin(device_manager, sim_mode) self.scaninfo.load_scan_metadata() self.filewriter = FileWriterMixin(self.service_cfg) + self._init() - self.readout = 0.003 # 3 ms + def _init(self) -> None: + """Initialize detector, filewriter and set default parameters""" + self._default_parameter() + self._init_detector() + self._init_filewriter() + + def _default_parameter(self) -> None: + """Set default parameters for Falcon + readout (float): readout time in seconds + _value_pixel_per_buffer (int): number of spectra in buffer of Falcon Sitoro""" + self.readout = 1e-3 self._value_pixel_per_buffer = 20 # 16 - self._clean_up() - self._init_hdf5_saving() - self._init_mapping_mode() + self._stop_det() + self._stop_file_writer() - def _clean_up(self) -> None: - """Clean up""" - self.hdf5.capture.put(0) + def _stop_det(self) -> None: + """ "Stop detector""" self.stop_all.put(1) self.erase_all.put(1) - def _init_hdf5_saving(self) -> None: - """Set up hdf5 save parameters""" + def _stop_file_writer(self) -> None: + """ "Stop the file writer""" + self.hdf5.capture.put(0) + + def _init_filewriter(self) -> None: + """Initialize file writer for Falcon. + This includes setting variables for the HDF5 plugin (EPICS) that is used to write the data. + """ self.hdf5.enable.put(1) # EnableCallbacks self.hdf5.xml_file_name.put("layout.xml") # Points to hardcopy of HDF5 Layout xml file - self.hdf5.lazy_open.put(1) # Yes -> To be checked how to add FilePlugin_V21+ + self.hdf5.lazy_open.put( + 1 + ) # Potentially not needed, means a temp data file is created first, could be 0 self.hdf5.temp_suffix.put("") # -> To be checked how to add FilePlugin_V22+ - self.hdf5.queue_size.put(2000) + self.hdf5.queue_size.put(2000) # size of queue for spectra in the buffer - def _init_mapping_mode(self) -> None: - """Set up mapping mode params""" + def _init_detector(self) -> None: + """Initialize Falcon detector. + The detector is operated in MCA mapping mode. + Parameters here affect the triggering, gating etc. + This includes also the readout chunk size and whether data is segmented into spectra in EPICS. + """ self.collect_mode.put(1) # 1 MCA Mapping, 0 MCA Spectrum self.preset_mode.put(1) # 1 Realtime self.input_logic_polarity.put(0) # 0 Normal, 1 Inverted @@ -176,24 +233,47 @@ class FalconCsaxs(Device): self.ignore_gate.put(0) # 1 Yes, 0 No self.auto_pixels_per_buffer.put(0) # 0 Manual 1 Auto self.pixels_per_buffer.put(self._value_pixel_per_buffer) # - self.nd_array_mode.put(1) + self.nd_array_mode.put(1) # Segmentation happens in EPICS + + def stage(self) -> List[object]: + """Stage command, called from BEC in preparation of a scan. + This will iniate the preparation of detector and file writer. + The following functuions are called (at least): + - _prep_file_writer + - _prep_det + - _publish_file_location + The device returns a List[object] from the Ophyd Device class. + + #TODO make sure this is fullfiled + + Staging not idempotent and should raise + :obj:`RedundantStaging` if staged twice without an + intermediate :meth:`~BlueskyInterface.unstage`. + """ + self._stopped = False + self.scaninfo.load_scan_metadata() + self.mokev = self.device_manager.devices.mokev.obj.read()[ + self.device_manager.devices.mokev.name + ]["value"] + self._prep_file_writer() + self._prep_det() + state = False + self._publish_file_location(done=state, successful=state) + self._arm_acquisition() + return super().stage() def _prep_det(self) -> None: """Prepare detector for acquisition""" self.collect_mode.put(1) self.preset_real.put(self.scaninfo.exp_time) self.pixels_per_run.put(int(self.scaninfo.num_points * self.scaninfo.frames_per_trigger)) - # self.auto_pixels_per_buffer.put(0) - # self.pixels_per_buffer.put(self._value_pixel_per_buffer) def _prep_file_writer(self) -> None: - """Prep HDF5 weriting""" - # TODO creta filename and destination path from filepath - self.destination_path = self.filewriter.compile_full_filename( + """Prepare filewriting from HDF5 plugin""" + self.filepath = self.filewriter.compile_full_filename( self.scaninfo.scan_number, f"{self.name}.h5", 1000, 5, True ) - # self.hdf5.file_path.set(self.destination_path) - file_path, file_name = os.path.split(self.destination_path) + file_path, file_name = os.path.split(self.filepath) self.hdf5.file_path.put(file_path) self.hdf5.file_name.put(file_name) self.hdf5.file_template.put(f"%s%s") @@ -202,29 +282,23 @@ class FalconCsaxs(Device): self.hdf5.array_counter.put(0) self.hdf5.capture.put(1) - def stage(self) -> List[object]: - """stage the detector and file writer""" - # TODO clean up needed? - self._stopped = False - self.scaninfo.load_scan_metadata() - self.mokev = self.device_manager.devices.mokev.obj.read()[ - self.device_manager.devices.mokev.name - ]["value"] - - logger.info("Waiting for pilatus2 to be armed") - self._prep_det() - logger.info("Pilatus2 armed") - logger.info("Waiting for pilatus2 zmq stream to be ready") - self._prep_file_writer() - logger.info("Pilatus2 zmq ready") - - msg = BECMessage.FileMessage(file_path=self.destination_path, done=False) + def _publish_file_location(self, done=False, successful=False) -> None: + """Publish the filepath to REDIS + First msg for file writer and the second one for other listeners (e.g. radial integ) + """ + pipe = self._producer.pipeline() + msg = BECMessage.FileMessage(file_path=self.filepath, done=done, successful=successful) self._producer.set_and_publish( - MessageEndpoints.public_file(self.scaninfo.scanID, self.name), - msg.dumps(), + MessageEndpoints.public_file(self.scaninfo.scanID, self.name), msg.dumps(), pipe=pipe ) - self.arm_acquisition() - logger.info("Waiting for Falcon to be armed") + self._producer.set_and_publish( + MessageEndpoints.file_event(self.name), msg.dumps(), pip=pipe + ) + pipe.execute() + + def _arm_acquisition(self) -> None: + """Arm Falcon detector for acquisition""" + self.start_all.put(1) while True: det_ctrl = self.state.read()[self.state.name]["value"] if det_ctrl == int(DetectorState.ACQUIRING): @@ -232,14 +306,28 @@ class FalconCsaxs(Device): if self._stopped == True: break time.sleep(0.005) - logger.info("Falcon is armed") - return super().stage() - def arm_acquisition(self) -> None: - self.start_all.put(1) + # TODO function for abstract class? + def trigger(self) -> DeviceStatus: + """Trigger the detector, called from BEC.""" + self._on_trigger() + return super().trigger() + + # TODO function for abstract class? + def _on_trigger(self): + """Specify action that should be taken upon trigger signal.""" + pass def unstage(self) -> List[object]: - logger.info("Waiting for Falcon to return from acquisition") + """Unstage the device. + + This method must be idempotent, multiple calls (without a new + call to 'stage') have no effect. + + Functions called: + - _finished + - _publish_file_location + """ old_scanID = self.scaninfo.scanID self.scaninfo.load_scan_metadata() logger.info(f"Old scanID: {old_scanID}, ") @@ -247,20 +335,25 @@ class FalconCsaxs(Device): self._stopped = True if self._stopped: return super().unstage() - self._falcon_finished() - self._clean_up() + self._finished() state = True - msg = BECMessage.FileMessage(file_path=self.destination_path, done=True, successful=state) - self._producer.set_and_publish( - MessageEndpoints.public_file(self.scaninfo.metadata["scanID"], self.name), - msg.dumps(), - ) + self._publish_file_location(done=state, successful=state) self._stopped = False - logger.info("Falcon done") return super().unstage() - def _falcon_finished(self): - """Function with 10s timeout""" + def _finished(self): + """Check if acquisition is finished. + + This function is called from unstage and stop + and will check detector and file backend status. + Timeouts after given time + + Functions called: + - _stop_det + - _stop_file_writer + """ + sleep_time = 0.1 + timeout = 5 timer = 0 while True: det_ctrl = self.state.read()[self.state.name]["value"] @@ -268,25 +361,25 @@ class FalconCsaxs(Device): received_frames = self.dxp.current_pixel.get() written_frames = self.hdf5.array_counter.get() total_frames = int(self.scaninfo.num_points * self.scaninfo.frames_per_trigger) - # TODO if no writing was performed before + # TODO Could check state of detector (det_ctrl) and file writer (writer_ctrl) if total_frames == received_frames and total_frames == written_frames: break if self._stopped == True: break - time.sleep(0.1) - timer += 0.1 - if timer > 5: + time.sleep(sleep_time) + timer += sleep_time + if timer > timeout: logger.info( f"Falcon missed a trigger: received trigger {received_frames}, send data {written_frames} from total_frames {total_frames}" ) break - # raise FalconTimeoutError - # f"Reached timeout with detector state {det_ctrl}, falcon state {writer_ctrl}, received trigger {received_frames} and files written {written_frames}" - # ) + self._stop_det() + self._stop_file_writer() def stop(self, *, success=False) -> None: """Stop the scan, with camera and file writer""" - self._clean_up() + self._stop_det() + self._stop_file_writer() super().stop(success=success) self._stopped = True diff --git a/ophyd_devices/epics/devices/pilatus_csaxs.py b/ophyd_devices/epics/devices/pilatus_csaxs.py index e175eb7..1982d86 100644 --- a/ophyd_devices/epics/devices/pilatus_csaxs.py +++ b/ophyd_devices/epics/devices/pilatus_csaxs.py @@ -1,32 +1,36 @@ import enum import json import os -import subprocess import time -from typing import List +from bec_lib.core.devicemanager import DeviceStatus import requests import numpy as np +from typing import List + from ophyd import EpicsSignal, EpicsSignalRO, EpicsSignalWithRBV from ophyd import DetectorBase, Device, Staged from ophyd import ADComponent as ADCpt -from ophyd_devices.utils import bec_utils as bec_utils from bec_lib.core import BECMessage, MessageEndpoints from bec_lib.core.file_utils import FileWriterMixin from bec_lib.core import bec_logger - +from ophyd_devices.utils import bec_utils as bec_utils from ophyd_devices.epics.devices.bec_scaninfo_mixin import BecScaninfoMixin logger = bec_logger.logger class PilatusError(Exception): + """Base class for exceptions in this module.""" + pass class PilatusTimeoutError(Exception): + """Raised when the Pilatus does not respond in time during unstage.""" + pass @@ -38,50 +42,12 @@ class TriggerSource(int, enum.Enum): ALGINMENT = 4 -class SlsDetectorCam(Device): # CamBase, FileBase): - # detector_type = ADCpt(EpicsSignalRO, "DetectorType_RBV") - # setting = ADCpt(EpicsSignalWithRBV, "Setting") - # beam_energy = ADCpt(EpicsSignalWithRBV, "BeamEnergy") - # enable_trimbits = ADCpt(EpicsSignalWithRBV, "Trimbits") - # bit_depth = ADCpt(EpicsSignalWithRBV, "BitDepth") - # trigger_software = ADCpt(EpicsSignal, "TriggerSoftware") - # high_voltage = ADCpt(EpicsSignalWithRBV, "HighVoltage") - # Receiver and data callback - # receiver_mode = ADCpt(EpicsSignalWithRBV, "ReceiverMode") - # receiver_stream = ADCpt(EpicsSignalWithRBV, "ReceiverStream") - # enable_data = ADCpt(EpicsSignalWithRBV, "UseDataCallback") - # missed_packets = ADCpt(EpicsSignalRO, "ReceiverMissedPackets_RBV") - # # Direct settings access - # setup_file = ADCpt(EpicsSignal, "SetupFile") - # load_setup = ADCpt(EpicsSignal, "LoadSetup") - # command = ADCpt(EpicsSignal, "Command") - # Mythen 3 - # counter_mask = ADCpt(EpicsSignalWithRBV, "CounterMask") - # counter1_threshold = ADCpt(EpicsSignalWithRBV, "Counter1Threshold") - # counter2_threshold = ADCpt(EpicsSignalWithRBV, "Counter2Threshold") - # counter3_threshold = ADCpt(EpicsSignalWithRBV, "Counter3Threshold") - # gate1_delay = ADCpt(EpicsSignalWithRBV, "Gate1Delay") - # gate1_width = ADCpt(EpicsSignalWithRBV, "Gate1Width") - # gate2_delay = ADCpt(EpicsSignalWithRBV, "Gate2Delay") - # gate2_width = ADCpt(EpicsSignalWithRBV, "Gate2Width") - # gate3_delay = ADCpt(EpicsSignalWithRBV, "Gate3Delay") - # gate3_width = ADCpt(EpicsSignalWithRBV, "Gate3Width") - # Moench - # json_frame_mode = ADCpt(EpicsSignalWithRBV, "JsonFrameMode") - # json_detector_mode = ADCpt(EpicsSignalWithRBV, "JsonDetectorMode") +class SlsDetectorCam(Device): + """SLS Detector Camera - Pilatus - # Eiger9M - # delay_time = ADCpt(EpicsSignalWithRBV, "DelayTime") - # num_frames = ADCpt(EpicsSignalWithRBV, "NumFrames") - # acquire = ADCpt(EpicsSignal, "Acquire") - # acquire_time = ADCpt(EpicsSignal, 'AcquireTime') - # detector_state = ADCpt(EpicsSignalRO, "DetectorState_RBV") - # threshold_energy = ADCpt(EpicsSignalWithRBV, "ThresholdEnergy") - # num_gates = ADCpt(EpicsSignalWithRBV, "NumGates") - # num_cycles = ADCpt(EpicsSignalWithRBV, "NumCycles") - # timing_mode = ADCpt(EpicsSignalWithRBV, "TimingMode") + Base class to map EPICS PVs to ophyd signals. + """ - # Pilatus_2 300k num_images = ADCpt(EpicsSignalWithRBV, "NumImages") num_exposures = ADCpt(EpicsSignalWithRBV, "NumExposures") delay_time = ADCpt(EpicsSignalWithRBV, "NumExposures") @@ -116,6 +82,11 @@ class PilatusCsaxs(DetectorBase): """ + # Specify which functions are revealed to the user in BEC client + USER_ACCESS = [ + "describe", + ] + cam = ADCpt(SlsDetectorCam, "cam1:") def __init__( @@ -131,6 +102,18 @@ class PilatusCsaxs(DetectorBase): sim_mode=False, **kwargs, ): + """Initialize the Pilatus detector + Args: + #TODO add here the parameters for kind, read_attrs, configuration_attrs, parent + prefix (str): PV prefix ("X12SA-ES-PILATUS300K:) + name (str): 'pilatus_2' + kind (str): + read_attrs (list): + configuration_attrs (list): + parent (object): + device_manager (object): BEC device manager + sim_mode (bool): simulation mode to start the detector without BEC, e.g. from ipython shell + """ super().__init__( prefix=prefix, name=name, @@ -144,7 +127,8 @@ class PilatusCsaxs(DetectorBase): raise PilatusError("Add DeviceManager to initialization or init with sim_mode=True") self.name = name - self.wait_for_connection() # Make sure to be connected before talking to PVs + self.wait_for_connection() + # Spin up connections for simulation or BEC mode if not sim_mode: from bec_lib.core.bec_service import SERVICE_CONFIG @@ -152,24 +136,39 @@ class PilatusCsaxs(DetectorBase): self._producer = self.device_manager.producer self.service_cfg = SERVICE_CONFIG.config["service_config"]["file_writer"] else: + base_path = f"/sls/X12SA/data/{self.scaninfo.username}/Data10/" self._producer = bec_utils.MockProducer() self.device_manager = bec_utils.MockDeviceManager() self.scaninfo = BecScaninfoMixin(device_manager, sim_mode) self.scaninfo.load_scan_metadata() - self.service_cfg = {"base_path": f"/sls/X12SA/data/{self.scaninfo.username}/Data10/"} + self.service_cfg = {"base_path": base_path} self.scaninfo = BecScaninfoMixin(device_manager, sim_mode) - self.filepath_h5 = "" - + self.scaninfo.load_scan_metadata() self.filewriter = FileWriterMixin(self.service_cfg) - self.readout = 1e-3 # 3 ms + self._init() - # TODO maybe needed - # self._close_file_writer() + def _init(self) -> None: + """Initialize detector, filewriter and set default parameters""" + self._default_parameter() + self._init_detector() + self._init_filewriter() - def _get_current_scan_msg(self) -> BECMessage.ScanStatusMessage: - msg = self.device_manager.producer.get(MessageEndpoints.scan_status()) - return BECMessage.ScanStatusMessage.loads(msg) + def _default_parameter(self) -> None: + """Set default parameters for Pilatus300k detector + readout (float): readout time in seconds + """ + self.reduce_readout = 1e-3 + + def _init_detector(self) -> None: + """Initialize the detector""" + # TODO add check if detector is running + pass + + def _init_filewriter(self) -> None: + """Initialize the file writer""" + # TODO in case the data backend is rewritten, add check if it is ready! + pass def _prep_det(self) -> None: # TODO slow reaction, seemed to have timeout. @@ -181,10 +180,10 @@ class PilatusCsaxs(DetectorBase): factor = 1 if self.cam.threshold_energy._metadata["units"] == "eV": factor = 1000 - setp_energy = int(self.mokev * factor) + setpoint = int(self.mokev * factor) threshold = self.cam.threshold_energy.read()[self.cam.threshold_energy.name]["value"] - if not np.isclose(setp_energy / 2, threshold, rtol=0.05): - self.cam.threshold_energy.set(setp_energy / 2) + if not np.isclose(setpoint / 2, threshold, rtol=0.05): + self.cam.threshold_energy.set(setpoint / 2) def _set_acquisition_params(self) -> None: """set acquisition parameters on the detector""" @@ -220,7 +219,7 @@ class PilatusCsaxs(DetectorBase): self._stop_file_writer() time.sleep(0.1) - self.filepath_h5 = self.filewriter.compile_full_filename( + self.filepath_raw = self.filewriter.compile_full_filename( self.scaninfo.scan_number, "pilatus_2.h5", 1000, 5, True ) self.cam.file_path.put(f"/dev/shm/zmq/") @@ -232,19 +231,19 @@ class PilatusCsaxs(DetectorBase): # compile filename basepath = f"/sls/X12SA/data/{self.scaninfo.username}/Data10/pilatus_2/" - self.destination_path = os.path.join( + self.filepath = os.path.join( basepath, self.filewriter.get_scan_directory(self.scaninfo.scan_number, 1000, 5), ) # Make directory if needed - os.makedirs(self.destination_path, exist_ok=True) + os.makedirs(self.filepath, exist_ok=True) data_msg = { "source": [ { "searchPath": "/", "searchPattern": "glob:*.cbf", - "destinationPath": self.destination_path, + "destinationPath": self.filepath, } ] } @@ -339,32 +338,76 @@ class PilatusCsaxs(DetectorBase): res.raise_for_status() def stage(self) -> List[object]: - """stage the detector and file writer""" - self._acquisition_done = False + """Stage command, called from BEC in preparation of a scan. + This will iniate the preparation of detector and file writer. + The following functuions are called: + - _prep_file_writer + - _prep_det + - _publish_file_location + + The device returns a List[object] from the Ophyd Device class. + + #TODO make sure this is fullfiled + + Staging not idempotent and should raise + :obj:`RedundantStaging` if staged twice without an + intermediate :meth:`~BlueskyInterface.unstage`. + """ self._stopped = False self.scaninfo.load_scan_metadata() self.mokev = self.device_manager.devices.mokev.obj.read()[ self.device_manager.devices.mokev.name ]["value"] - - logger.info("Waiting for pilatus2 to be armed") - self._prep_det() - logger.info("Pilatus2 armed") - logger.info("Waiting for pilatus2 zmq stream to be ready") + # TODO refactor logger.info to DEBUG mode? self._prep_file_writer() - logger.info("Pilatus2 zmq ready") - msg = BECMessage.FileMessage( - file_path=self.filepath_h5, done=False, metadata={"input_path": self.destination_path} - ) + self._prep_det() + state = False + self._publish_file_location(done=state, successful=state) return super().stage() + # TODO might be useful for base class def pre_scan(self) -> None: + """ " Pre_scan gets executed right before""" + self._arm_acquisition() + + def _arm_acquisition(self) -> None: self.acquire() + def _publish_file_location(self, done=False, successful=False) -> None: + """Publish the filepath to REDIS + First msg for file writer and the second one for other listeners (e.g. radial integ) + """ + pipe = self._producer.pipeline() + msg = BECMessage.FileMessage(file_path=self.filepath, done=done, successful=successful) + self._producer.set_and_publish( + MessageEndpoints.public_file(self.scaninfo.scanID, self.name), msg.dumps(), pipe=pipe + ) + self._producer.set_and_publish( + MessageEndpoints.file_event(self.name), msg.dumps(), pip=pipe + ) + pipe.execute() + + # TODO function for abstract class? + def trigger(self) -> DeviceStatus: + """Trigger the detector, called from BEC.""" + self._on_trigger() + return super().trigger() + + # TODO function for abstract class? + def _on_trigger(self): + """Specify action that should be taken upon trigger signal.""" + pass + def unstage(self) -> List[object]: - """unstage the detector and file writer""" - # Reset to software trigger - logger.info("Waiting for Pilatus to return from acquisition") + """Unstage the device. + + This method must be idempotent, multiple calls (without a new + call to 'stage') have no effect. + + Functions called: + - _finished + - _publish_file_location + """ old_scanID = self.scaninfo.scanID self.scaninfo.load_scan_metadata() logger.info(f"Old scanID: {old_scanID}, ") @@ -372,27 +415,37 @@ class PilatusCsaxs(DetectorBase): self._stopped = True if self._stopped: return super().unstage() - self._pilatus_finished() - msg = BECMessage.FileMessage( - file_path=self.filepath_h5, done=True, metadata={"input_path": self.destination_path} - ) - self._producer.set_and_publish( - MessageEndpoints.public_file(self.scaninfo.scanID, self.name), - msg.dumps(), - ) - self._producer.set_and_publish( - MessageEndpoints.file_event(self.name), - msg.dumps(), - ) - logger.info("Pilatus2 done") + self._finished() + state = True + self._publish_file_location(done=state, successful=state) + self._start_h5converter(done=state) return super().unstage() - def _pilatus_finished(self) -> None: - # time.sleep(2) + def _start_h5converter(self, done=False) -> None: + """Start the h5converter""" + msg = BECMessage.FileMessage( + file_path=self.filepath_raw, done=done, metadata={"input_path": self.filepath} + ) + self._producer.set_and_publish( + MessageEndpoints.public_file(self.scaninfo.scanID, self.name), msg.dumps() + ) + + def _finished(self) -> None: + """Check if acquisition is finished. + + This function is called from unstage and stop + and will check detector and file backend status. + Timeouts after given time + + Functions called: + - _stop_det + - _stop_file_writer + """ while True: if self.device_manager.devices.mcs.obj._staged != Staged.yes: break time.sleep(0.1) + # TODO implement a waiting function or not # time.sleep(2) # timer = 0 # while True: @@ -412,7 +465,9 @@ class PilatusCsaxs(DetectorBase): # # f"Pilatus timeout with detector state {self.cam.acquire.get()} and camserver return status: {rtr} " # # ) + self._stop_det() self._stop_file_writer() + # TODO explore if sleep is needed time.sleep(0.5) self._close_file_writer() @@ -421,15 +476,18 @@ class PilatusCsaxs(DetectorBase): or arm the detector in hardware of the detector """ self.cam.acquire.put(1) + # TODO check if sleep of 1s is needed, could be that less is enough time.sleep(1) + def _stop_det(self) -> None: + """Stop the detector""" + self.cam.acquire.put(0) + def stop(self, *, success=False) -> None: """Stop the scan, with camera and file writer""" - self.cam.acquire.put(0) + self._stop_det() self._stop_file_writer() - # TODO maybe needed self._close_file_writer() - # self.unstage() super().stop(success=success) self._stopped = True