fix: bugfixes after adding tests

This commit is contained in:
2023-10-30 22:24:25 +01:00
parent cf09883a9b
commit 72b88482ca

View File

@ -1,78 +1,60 @@
import enum import enum
import threading
import time import time
from typing import Any, List import threading
from bec_lib.core.devicemanager import DeviceStatus
import numpy as np import numpy as np
import os import os
from typing import Any, List
from ophyd import EpicsSignal, EpicsSignalRO, EpicsSignalWithRBV from ophyd import EpicsSignal, EpicsSignalRO, EpicsSignalWithRBV
from ophyd import DetectorBase, Device from ophyd import DetectorBase, Device
from ophyd import ADComponent as ADCpt from ophyd import ADComponent as ADCpt
from std_daq_client import StdDaqClient
from bec_lib.core import BECMessage, MessageEndpoints, threadlocked from bec_lib.core import BECMessage, MessageEndpoints, threadlocked
from bec_lib.core.file_utils import FileWriterMixin from bec_lib.core.file_utils import FileWriterMixin
from bec_lib.core import bec_logger from bec_lib.core import bec_logger
from ophyd_devices.utils import bec_utils as bec_utils
from std_daq_client import StdDaqClient
from ophyd_devices.epics.devices.bec_scaninfo_mixin import BecScaninfoMixin from ophyd_devices.epics.devices.bec_scaninfo_mixin import BecScaninfoMixin
from ophyd_devices.utils import bec_utils
logger = bec_logger.logger logger = bec_logger.logger
class EigerError(Exception): class EigerError(Exception):
"""Base class for exceptions in this module."""
pass pass
class EigerTimeoutError(Exception): class EigerTimeoutError(Exception):
"""Raised when the Eiger does not respond in time during unstage."""
pass pass
class SlsDetectorCam(Device): class SlsDetectorCam(Device):
# detector_type = ADCpt(EpicsSignalRO, "DetectorType_RBV") """SLS Detector Camera - Eiger 9M
# setting = ADCpt(EpicsSignalWithRBV, "Setting")
# delay_time = ADCpt(EpicsSignalWithRBV, "DelayTime") Base class to map EPICS PVs to ophyd signals.
"""
threshold_energy = ADCpt(EpicsSignalWithRBV, "ThresholdEnergy") threshold_energy = ADCpt(EpicsSignalWithRBV, "ThresholdEnergy")
beam_energy = ADCpt(EpicsSignalWithRBV, "BeamEnergy") beam_energy = ADCpt(EpicsSignalWithRBV, "BeamEnergy")
# enable_trimbits = ADCpt(EpicsSignalWithRBV, "Trimbits")
bit_depth = ADCpt(EpicsSignalWithRBV, "BitDepth") bit_depth = ADCpt(EpicsSignalWithRBV, "BitDepth")
# num_gates = ADCpt(EpicsSignalWithRBV, "NumGates") num_images = ADCpt(EpicsSignalWithRBV, "NumCycles")
num_cycles = ADCpt(EpicsSignalWithRBV, "NumCycles")
num_frames = ADCpt(EpicsSignalWithRBV, "NumFrames") num_frames = ADCpt(EpicsSignalWithRBV, "NumFrames")
timing_mode = ADCpt(EpicsSignalWithRBV, "TimingMode") trigger_mode = ADCpt(EpicsSignalWithRBV, "TimingMode")
trigger_software = ADCpt(EpicsSignal, "TriggerSoftware") trigger_software = ADCpt(EpicsSignal, "TriggerSoftware")
# high_voltage = ADCpt(EpicsSignalWithRBV, "HighVoltage")
# Receiver and data callback
# receiver_mode = ADCpt(EpicsSignalWithRBV, "ReceiverMode")
# receiver_stream = ADCpt(EpicsSignalWithRBV, "ReceiverStream")
# enable_data = ADCpt(EpicsSignalWithRBV, "UseDataCallback")
# missed_packets = ADCpt(EpicsSignalRO, "ReceiverMissedPackets_RBV")
# Direct settings access
# setup_file = ADCpt(EpicsSignal, "SetupFile")
# load_setup = ADCpt(EpicsSignal, "LoadSetup")
# command = ADCpt(EpicsSignal, "Command")
# Mythen 3
# counter_mask = ADCpt(EpicsSignalWithRBV, "CounterMask")
# counter1_threshold = ADCpt(EpicsSignalWithRBV, "Counter1Threshold")
# counter2_threshold = ADCpt(EpicsSignalWithRBV, "Counter2Threshold")
# counter3_threshold = ADCpt(EpicsSignalWithRBV, "Counter3Threshold")
# gate1_delay = ADCpt(EpicsSignalWithRBV, "Gate1Delay")
# gate1_width = ADCpt(EpicsSignalWithRBV, "Gate1Width")
# gate2_delay = ADCpt(EpicsSignalWithRBV, "Gate2Delay")
# gate2_width = ADCpt(EpicsSignalWithRBV, "Gate2Width")
# gate3_delay = ADCpt(EpicsSignalWithRBV, "Gate3Delay")
# gate3_width = ADCpt(EpicsSignalWithRBV, "Gate3Width")
# # Moench
# json_frame_mode = ADCpt(EpicsSignalWithRBV, "JsonFrameMode")
# json_detector_mode = ADCpt(EpicsSignalWithRBV, "JsonDetectorMode")
# fixes due to missing PVs from CamBase
acquire = ADCpt(EpicsSignal, "Acquire") acquire = ADCpt(EpicsSignal, "Acquire")
detector_state = ADCpt(EpicsSignalRO, "DetectorState_RBV") detector_state = ADCpt(EpicsSignalRO, "DetectorState_RBV")
class TriggerSource(int, enum.Enum): class TriggerSource(int, enum.Enum):
"""Trigger signals for Eiger9M detector"""
AUTO = 0 AUTO = 0
TRIGGER = 1 TRIGGER = 1
GATING = 2 GATING = 2
@ -80,6 +62,8 @@ class TriggerSource(int, enum.Enum):
class DetectorState(int, enum.Enum): class DetectorState(int, enum.Enum):
"""Detector states for Eiger9M detector"""
IDLE = 0 IDLE = 0
ERROR = 1 ERROR = 1
WAITING = 2 WAITING = 2
@ -105,6 +89,7 @@ class Eiger9mCsaxs(DetectorBase):
""" """
# Specify which functions are revealed to the user in BEC client
USER_ACCESS = [ USER_ACCESS = [
"describe", "describe",
] ]
@ -124,6 +109,18 @@ class Eiger9mCsaxs(DetectorBase):
sim_mode=False, sim_mode=False,
**kwargs, **kwargs,
): ):
"""Initialize the Eiger9M detector
Args:
#TODO add here the parameters for kind, read_attrs, configuration_attrs, parent
prefix (str): PV prefix (X12SA-ES-EIGER9M:)
name (str): 'eiger'
kind (str):
read_attrs (list):
configuration_attrs (list):
parent (object):
device_manager (object): BEC device manager
sim_mode (bool): simulation mode to start the detector without BEC, e.g. from ipython shell
"""
super().__init__( super().__init__(
prefix=prefix, prefix=prefix,
name=name, name=name,
@ -133,46 +130,90 @@ class Eiger9mCsaxs(DetectorBase):
parent=parent, parent=parent,
**kwargs, **kwargs,
) )
self._stopped = False
self._lock = threading.RLock()
if device_manager is None and not sim_mode: if device_manager is None and not sim_mode:
raise EigerError("Add DeviceManager to initialization or init with sim_mode=True") raise EigerError("Add DeviceManager to initialization or init with sim_mode=True")
# TODO check if threadlock is needed for unstage
self._lock = threading.RLock()
self._stopped = False
self.name = name self.name = name
self.wait_for_connection() # Make sure to be connected before talking to PVs self.service_cfg = None
self.std_client = None
self.wait_for_connection(all_signals=True)
if not sim_mode: if not sim_mode:
from bec_lib.core.bec_service import SERVICE_CONFIG self._update_service_config()
self.device_manager = device_manager self.device_manager = device_manager
self._producer = self.device_manager.producer self._producer = self.device_manager.producer
self.service_cfg = SERVICE_CONFIG.config["service_config"]["file_writer"]
else: else:
self._producer = bec_utils.MockProducer() self._producer = bec_utils.MockProducer()
self.device_manager = bec_utils.MockDeviceManager() self.device_manager = bec_utils.MockDeviceManager()
self.scaninfo = BecScaninfoMixin(device_manager, sim_mode) self.scaninfo = BecScaninfoMixin(device_manager, sim_mode)
self.scaninfo.load_scan_metadata() self.scaninfo.load_scan_metadata()
self.service_cfg = {"base_path": f"/sls/X12SA/data/{self.scaninfo.username}/Data10/"} base_path = f"/sls/X12SA/data/{self.scaninfo.username}/Data10/"
self.service_cfg = {"base_path": base_path}
self.scaninfo = BecScaninfoMixin(device_manager, sim_mode) self.scaninfo = BecScaninfoMixin(device_manager, sim_mode)
self.scaninfo.load_scan_metadata() self.scaninfo.load_scan_metadata()
# TODO
self.filepath = ""
self.filewriter = FileWriterMixin(self.service_cfg) self.filewriter = FileWriterMixin(self.service_cfg)
self.reduce_readout = 1e-3 # 3 ms self._init()
self.triggermode = 0 # 0 : internal, scan must set this if hardware triggered
self._init_eiger9m()
self._init_standard_daq()
# self.mokev = self.device_manager.devices.mokev.read()[ def _update_service_config(self) -> None:
# self.device_manager.devices.mokev.name from bec_lib.core.bec_service import SERVICE_CONFIG
# ]["value"]
def _init_eiger9m(self) -> None: self.service_cfg = SERVICE_CONFIG.config["service_config"]["file_writer"]
"""Init parameters for Eiger 9m"""
# TODO function for abstract class?
def _init(self) -> None:
"""Initialize detector, filewriter and set default parameters"""
self._default_parameter()
self._init_detector()
self._init_filewriter()
# TODO function for abstract class?
def _default_parameter(self) -> None:
"""Set default parameters for Eiger 9M
readout (float) : readout time in seconds
"""
self.reduce_readout = 1e-3
# TODO function for abstract class?
def _init_detector(self) -> None:
"""Init parameters for Eiger 9m.
Depends on hardware configuration and delay generators.
At this point it is set up for gating mode (09/2023).
"""
self._stop_det()
self._set_trigger(TriggerSource.GATING) self._set_trigger(TriggerSource.GATING)
self.stop_acquisition()
# TODO function for abstract class?
def _init_filewriter(self) -> None:
"""Init parameters for filewriter.
For the Eiger9M, the data backend is std_daq client.
Setting up these parameters depends on the backend, and would need to change upon changes in the backend.
"""
self.std_rest_server_url = "http://xbl-daq-29:5000"
self.std_client = StdDaqClient(url_base=self.std_rest_server_url)
self.std_client.stop_writer()
timeout = 0
# TODO put back change of e-account! and check with Leo which status to wait for
eacc = self.scaninfo.username
self._update_std_cfg("writer_user_id", int(eacc.strip(" e")))
time.sleep(5)
while not self.std_client.get_status()["state"] == "READY":
time.sleep(0.1)
timeout = timeout + 0.1
logger.info("Waiting for std_daq init.")
if timeout > 5:
if not self.std_client.get_status()["state"] == "READY":
raise EigerError(
f"Std client not in READY state, returns: {self.std_client.get_status()}"
)
else:
return
def _update_std_cfg(self, cfg_key: str, value: Any) -> None: def _update_std_cfg(self, cfg_key: str, value: Any) -> None:
"""Update std_daq config with new e-account for the current beamtime"""
# TODO Do we need all the loggers here, should this be properly refactored with a DEBUG mode?
cfg = self.std_client.get_config() cfg = self.std_client.get_config()
old_value = cfg.get(cfg_key) old_value = cfg.get(cfg_key)
logger.info(old_value) logger.info(old_value)
@ -189,71 +230,60 @@ class Eiger9mCsaxs(DetectorBase):
logger.info(f"Updated std_daq config for key {cfg_key} from {old_value} to {value}") logger.info(f"Updated std_daq config for key {cfg_key} from {old_value} to {value}")
self.std_client.set_config(cfg) self.std_client.set_config(cfg)
def _init_standard_daq(self) -> None: # TODO function for abstract class?
self.std_rest_server_url = "http://xbl-daq-29:5000" def stage(self) -> List[object]:
self.std_client = StdDaqClient(url_base=self.std_rest_server_url) """Stage command, called from BEC in preparation of a scan.
self.std_client.stop_writer() This will iniate the preparation of detector and file writer.
timeout = 0 The following functuions are called (at least):
# TODO put back change of e-account! - _prep_file_writer
# self._update_std_cfg("writer_user_id", int(self.scaninfo.username.strip(" e"))) - _prep_det
# time.sleep(5) - _publish_file_location
while not self.std_client.get_status()["state"] == "READY": The device returns a List[object] from the Ophyd Device class.
time.sleep(0.1)
timeout = timeout + 0.1
logger.info("Waiting for std_daq init.")
if timeout > 5:
if not self.std_client.get_status()["state"]:
raise EigerError(
f"Std client not in READY state, returns: {self.std_client.get_status()}"
)
else:
return
def _prep_det(self) -> None: #TODO make sure this is fullfiled
self._set_det_threshold()
self._set_acquisition_params()
self._set_trigger(TriggerSource.GATING)
def _set_det_threshold(self) -> None: Staging not idempotent and should raise
# threshold_energy PV exists on Eiger 9M? :obj:`RedundantStaging` if staged twice without an
factor = 1 intermediate :meth:`~BlueskyInterface.unstage`.
if self.cam.threshold_energy._metadata["units"] == "eV":
factor = 1000
setp_energy = int(self.mokev * factor)
energy = self.cam.beam_energy.read()[self.cam.beam_energy.name]["value"]
if setp_energy != energy:
self.cam.beam_energy.set(setp_energy) # .wait()
threshold = self.cam.threshold_energy.read()[self.cam.threshold_energy.name]["value"]
if not np.isclose(setp_energy / 2, threshold, rtol=0.05):
self.cam.threshold_energy.set(setp_energy / 2) # .wait()
def _set_acquisition_params(self) -> None:
# self.cam.acquire_time.set(self.scaninfo.exp_time)
# Set acquisition parameters slightly shorter then cycle
# self.cam.acquire_period.set(
# self.scaninfo.exp_time + (self.scaninfo.readout_time - self.reduce_readout)
# )
self.cam.num_cycles.put(int(self.scaninfo.num_points * self.scaninfo.frames_per_trigger))
self.cam.num_frames.put(1)
def _set_trigger(self, trigger_source: TriggerSource) -> None:
"""Set trigger source for the detector, either directly to value or TriggerSource.* with
AUTO = 0
TRIGGER = 1
GATING = 2
BURST_TRIGGER = 3
""" """
value = int(trigger_source) self._stopped = False
self.cam.timing_mode.put(value) self.scaninfo.load_scan_metadata()
self.mokev = self.device_manager.devices.mokev.obj.read()[
self.device_manager.devices.mokev.name
]["value"]
# TODO refactor logger.info to DEBUG mode?
self._prep_file_writer()
self._prep_det()
state = False
self._publish_file_location(done=state)
self._arm_acquisition()
# TODO Fix should take place in EPICS or directly on the hardware!
# We observed that the detector missed triggers in the beginning in case BEC was to fast. Adding 50ms delay solved this
time.sleep(0.05)
return super().stage()
def _filepath_exists(self, filepath: str) -> None:
timer = 0
while not os.path.exists(os.path.dirname(self.filepath)):
timer = time + 0.1
time.sleep(0.1)
if timer > 3:
raise EigerError(f"Timeout of 3s reached for filepath {self.filepath}")
# TODO function for abstract class?
def _prep_file_writer(self) -> None: def _prep_file_writer(self) -> None:
"""Prepare file writer for scan
self.filewriter is a FileWriterMixin object that hosts logic for compiling the filepath
"""
timer = 0
self.filepath = self.filewriter.compile_full_filename( self.filepath = self.filewriter.compile_full_filename(
self.scaninfo.scan_number, f"{self.name}.h5", 1000, 5, True self.scaninfo.scan_number, f"{self.name}.h5", 1000, 5, True
) )
while not os.path.exists(os.path.dirname(self.filepath)): self._filepath_exists(self.filepath)
time.sleep(0.1) self._stop_file_writer()
self._close_file_writer()
logger.info(f" std_daq output filepath {self.filepath}") logger.info(f" std_daq output filepath {self.filepath}")
# TODO Discuss with Leo if this is needed, or how to start the async writing best
try: try:
self.std_client.start_writer_async( self.std_client.start_writer_async(
{ {
@ -267,121 +297,192 @@ class Eiger9mCsaxs(DetectorBase):
raise EigerError(f"Timeout of start_writer_async with {exc}") raise EigerError(f"Timeout of start_writer_async with {exc}")
while True: while True:
timer = timer + 0.01
det_ctrl = self.std_client.get_status()["acquisition"]["state"] det_ctrl = self.std_client.get_status()["acquisition"]["state"]
if det_ctrl == "WAITING_IMAGES": if det_ctrl == "WAITING_IMAGES":
break break
time.sleep(0.005) time.sleep(0.01)
def _close_file_writer(self) -> None:
self.std_client.stop_writer()
pass
def stage(self) -> List[object]:
"""stage the detector and file writer"""
self._stopped = False
self._acquisition_done = False
self.scaninfo.load_scan_metadata()
self.mokev = self.device_manager.devices.mokev.obj.read()[
self.device_manager.devices.mokev.name
]["value"]
self._prep_file_writer()
self._prep_det()
logger.info("Waiting for std daq to be armed")
logger.info("std_daq is ready")
msg = BECMessage.FileMessage(file_path=self.filepath, done=False)
self._producer.set_and_publish(
MessageEndpoints.public_file(self.scaninfo.scanID, self.name),
msg.dumps(),
)
msg = BECMessage.FileMessage(file_path=self.filepath, done=False)
self._producer.set_and_publish(
MessageEndpoints.file_event(self.name),
msg.dumps(),
)
self.arm_acquisition()
self._stopped = False
# We see that we miss a trigger occasionally, it seems that status msg from the ioc are not realiable
time.sleep(0.05)
return super().stage()
@threadlocked
def unstage(self) -> List[object]:
"""unstage the detector and file writer"""
logger.info("Waiting for Eiger9M to finish")
old_scanID = self.scaninfo.scanID
self.scaninfo.load_scan_metadata()
logger.info(f"Old scanID: {old_scanID}, ")
if self.scaninfo.scanID != old_scanID:
self._stopped = True
if self._stopped == True:
return super().unstage()
self._eiger9M_finished()
# Message to BEC
state = True
msg = BECMessage.FileMessage(file_path=self.filepath, done=True, successful=state)
self._producer.set_and_publish(
MessageEndpoints.public_file(self.scaninfo.scanID, self.name),
msg.dumps(),
)
self._stopped = False
logger.info("Eiger9M finished")
return super().unstage()
@threadlocked
def _eiger9M_finished(self):
"""Function with 10s timeout"""
timer = 0
while True:
det_ctrl = self.cam.acquire.read()[self.cam.acquire.name]["value"]
# det_ctrl = 0
std_ctrl = self.std_client.get_status()["acquisition"]["state"]
status = self.std_client.get_status()
received_frames = status["acquisition"]["stats"]["n_write_completed"]
total_frames = int(self.scaninfo.num_points * self.scaninfo.frames_per_trigger)
# TODO if no writing was performed before
if det_ctrl == 0 and std_ctrl == "FINISHED" and total_frames == received_frames:
break
if self._stopped == True:
self.stop_acquisition()
self._close_file_writer()
break
time.sleep(0.1)
timer += 0.1
if timer > 5: if timer > 5:
self._stopped == True
self._close_file_writer() self._close_file_writer()
self.stop_acquisition() raise EigerError(
raise EigerTimeoutError( f"Timeout of 5s reached for std_daq start_writer_async with std_daq client status {det_ctrl}"
f"Reached timeout with detector state {det_ctrl}, std_daq state {std_ctrl} and received frames of {received_frames} for the file writer"
) )
self._close_file_writer()
def arm_acquisition(self) -> None: # TODO function for abstract class?
"""Start acquisition in software trigger mode, def _stop_file_writer(self) -> None:
or arm the detector in hardware of the detector """Close file writer"""
self.std_client.stop_writer()
# TODO can I wait for a status message here maybe? To ensure writer stopped and returned
# TODO function for abstract class?
def _prep_det(self) -> None:
"""Prepare detector for scan.
Includes checking the detector threshold, setting the acquisition parameters and setting the trigger source
""" """
self._set_det_threshold()
self._set_acquisition_params()
self._set_trigger(TriggerSource.GATING)
def _set_det_threshold(self) -> None:
"""Set correct detector threshold to 1/2 of current X-ray energy, allow 5% tolerance"""
# threshold energy might be in eV or keV
factor = 1
unit = getattr(self.cam.threshold_energy, "units", None)
if unit != None and unit == "eV":
factor = 1000
setpoint = int(self.mokev * factor)
energy = self.cam.beam_energy.read()[self.cam.beam_energy.name]["value"]
if setpoint != energy:
self.cam.beam_energy.set(setpoint)
threshold = self.cam.threshold_energy.read()[self.cam.threshold_energy.name]["value"]
if not np.isclose(setpoint / 2, threshold, rtol=0.05):
self.cam.threshold_energy.set(setpoint / 2)
def _set_acquisition_params(self) -> None:
"""Set acquisition parameters for the detector"""
self.cam.num_images.put(int(self.scaninfo.num_points * self.scaninfo.frames_per_trigger))
self.cam.num_frames.put(1)
# TODO function for abstract class? + call it for each scan??
def _set_trigger(self, trigger_source: TriggerSource) -> None:
"""Set trigger source for the detector.
Check the TriggerSource enum for possible values
Args:
trigger_source (TriggerSource): Trigger source for the detector
"""
value = int(trigger_source)
self.cam.trigger_mode.put(value)
def _publish_file_location(self, done: bool = False, successful: bool = None) -> None:
"""Publish the filepath to REDIS.
We publish two events here:
- file_event: event for the filewriter
- public_file: event for any secondary service (e.g. radial integ code)
Args:
done (bool): True if scan is finished
successful (bool): True if scan was successful
"""
pipe = self._producer.pipeline()
if successful is None:
msg = BECMessage.FileMessage(file_path=self.filepath, done=done)
else:
msg = BECMessage.FileMessage(file_path=self.filepath, done=done, successful=successful)
self._producer.set_and_publish(
MessageEndpoints.public_file(self.scaninfo.scanID, self.name), msg.dumps(), pipe=pipe
)
self._producer.set_and_publish(
MessageEndpoints.file_event(self.name), msg.dumps(), pipe=pipe
)
pipe.execute()
# TODO function for abstract class?
def _arm_acquisition(self) -> None:
"""Arm Eiger detector for acquisition"""
timer = 0
self.cam.acquire.put(1) self.cam.acquire.put(1)
logger.info("Waiting for Eiger9m to be armed")
while True: while True:
det_ctrl = self.cam.detector_state.read()[self.cam.detector_state.name]["value"] det_ctrl = self.cam.detector_state.read()[self.cam.detector_state.name]["value"]
if det_ctrl == int(DetectorState.RUNNING): if det_ctrl == int(DetectorState.RUNNING):
break break
if self._stopped == True: if self._stopped == True:
break break
time.sleep(0.005) time.sleep(0.01)
logger.info("Eiger9m is armed") timer += 0.01
if timer > 5:
self.stop()
raise EigerTimeoutError("Failed to arm the acquisition. IOC did not update.")
def stop_acquisition(self) -> None: # TODO function for abstract class?
def trigger(self) -> DeviceStatus:
"""Trigger the detector, called from BEC."""
self._on_trigger()
return super().trigger()
# TODO function for abstract class?
def _on_trigger(self):
"""Specify action that should be taken upon trigger signal."""
pass
# TODO function for abstract class?
# TODO threadlocked was attached, in principle unstage needs to be fast and should possibly called multiple times
@threadlocked
def unstage(self) -> List[object]:
"""Unstage the device.
This method must be idempotent, multiple calls (without a new
call to 'stage') have no effect.
Functions called:
- _finished
- _publish_file_location
"""
# TODO solution for multiple calls of the function to avoid calling the finished loop.
# Loop to avoid calling the finished loop multiple times
old_scanID = self.scaninfo.scanID
self.scaninfo.load_scan_metadata()
if self.scaninfo.scanID != old_scanID:
self._stopped = True
if self._stopped == True:
return super().unstage()
self._finished()
state = True
self._publish_file_location(done=state, successful=state)
self._stopped = False
return super().unstage()
# TODO function for abstract class?
# TODO necessary, how can we make this cleaner.
@threadlocked
def _finished(self):
"""Check if acquisition is finished.
This function is called from unstage and stop
and will check detector and file backend status.
Timeouts after given time
Functions called:
- _stop_det
- _stop_file_writer
"""
sleep_time = 0.1
timeout = 5
timer = 0
# Check status with timeout, break out if _stopped=True
while True:
det_ctrl = self.cam.acquire.read()[self.cam.acquire.name]["value"]
status = self.std_client.get_status()
std_ctrl = status["acquisition"]["state"]
received_frames = status["acquisition"]["stats"]["n_write_completed"]
total_frames = int(self.scaninfo.num_points * self.scaninfo.frames_per_trigger)
if det_ctrl == 0 and std_ctrl == "FINISHED" and total_frames == received_frames:
break
if self._stopped == True:
break
time.sleep(sleep_time)
timer += sleep_time
if timer > timeout:
self._stopped == True
self._stop_det()
self._stop_file_writer()
raise EigerTimeoutError(
f"Reached timeout with detector state {det_ctrl}, std_daq state {std_ctrl} and received frames of {received_frames} for the file writer"
)
self._stop_det()
self._stop_file_writer()
def _stop_det(self) -> None:
"""Stop the detector and wait for the proper status message""" """Stop the detector and wait for the proper status message"""
logger.info("Waiting for Eiger9m to be armed")
elapsed_time = 0 elapsed_time = 0
sleep_time = 0.01 sleep_time = 0.01
timeout = 5
# Stop acquisition
self.cam.acquire.put(0) self.cam.acquire.put(0)
retry = False retry = False
# Check status
while True: while True:
det_ctrl = self.cam.detector_state.read()[self.cam.detector_state.name]["value"] det_ctrl = self.cam.detector_state.read()[self.cam.detector_state.name]["value"]
if det_ctrl == int(DetectorState.IDLE): if det_ctrl == int(DetectorState.IDLE):
@ -390,16 +491,17 @@ class Eiger9mCsaxs(DetectorBase):
break break
time.sleep(sleep_time) time.sleep(sleep_time)
elapsed_time += sleep_time elapsed_time += sleep_time
if elapsed_time > 2 and not retry: if elapsed_time > timeout // 2 and not retry:
retry = True retry = True
# Retry to stop acquisition
self.cam.acquire.put(0) self.cam.acquire.put(0)
if elapsed_time > 5: if elapsed_time > timeout:
raise EigerTimeoutError("Failed to stop the acquisition. IOC did not update.") raise EigerTimeoutError("Failed to stop the acquisition. IOC did not update.")
def stop(self, *, success=False) -> None: def stop(self, *, success=False) -> None:
"""Stop the scan, with camera and file writer""" """Stop the scan, with camera and file writer"""
self.stop_acquisition() self._stop_det()
self._close_file_writer() self._stop_file_writer()
super().stop(success=success) super().stop(success=success)
self._stopped = True self._stopped = True