Compare commits
7 Commits
fix/socket
...
feat/add_j
| Author | SHA1 | Date | |
|---|---|---|---|
| 9463a94e96 | |||
| c4e5d58987 | |||
| 74d4a05094 | |||
| 53cc177827 | |||
| 4f46fa23d4 | |||
| 6fadaf12ff | |||
| a4501b9420 |
@@ -42,3 +42,14 @@ ids_cam:
|
||||
enabled: true
|
||||
readoutPriority: async
|
||||
softwareTrigger: True
|
||||
|
||||
eiger_1_5:
|
||||
description: Eiger 1.5M in-vacuum detector
|
||||
deviceClass: csaxs_bec.devices.jungfraujoch.eiger_1_5m.Eiger1_5M
|
||||
deviceConfig:
|
||||
detector_distance: 100
|
||||
beam_center: [0, 0]
|
||||
onFailure: raise
|
||||
enabled: true
|
||||
readoutPriority: async
|
||||
softwareTrigger: False
|
||||
|
||||
@@ -33,6 +33,7 @@ from __future__ import annotations
|
||||
|
||||
import threading
|
||||
import time
|
||||
import traceback
|
||||
from typing import TYPE_CHECKING
|
||||
|
||||
from bec_lib.logger import bec_logger
|
||||
@@ -160,6 +161,9 @@ class DDG1(PSIDeviceBase, DelayGeneratorCSAXS):
|
||||
# f = e + 1us
|
||||
# e has refernce to d, f has reference to e
|
||||
self.set_delay_pairs(channel="ef", delay=0, width=1e-6)
|
||||
time.sleep(
|
||||
0.2
|
||||
) # After staging, make sure that the DDG HW has some time to process changes properly.
|
||||
|
||||
def _prepare_mcs_on_trigger(self, mcs: MCSCardCSAXS) -> None:
|
||||
"""Prepare the MCS card for the next trigger.
|
||||
@@ -188,7 +192,13 @@ class DDG1(PSIDeviceBase, DelayGeneratorCSAXS):
|
||||
while (
|
||||
self._poll_thread_run_event.is_set() and not self._poll_thread_kill_event.is_set()
|
||||
):
|
||||
self._poll_loop()
|
||||
try:
|
||||
self._poll_loop()
|
||||
except Exception: # pylint: disable=broad-except
|
||||
content = traceback.format_exc()
|
||||
logger.error(
|
||||
f"Exception in polling loop thread, polling continues...\n Error content:\n{content}"
|
||||
)
|
||||
|
||||
self._poll_thread_poll_loop_done.set()
|
||||
|
||||
@@ -199,13 +209,17 @@ class DDG1(PSIDeviceBase, DelayGeneratorCSAXS):
|
||||
The 20ms sleep was added to ensure that the event status is not polled too frequently,
|
||||
and to give the device time to process the previous command. This was found empirically
|
||||
to be necessary to avoid missing events.
|
||||
IMPORTANT: Do not remove sleeps or try to optimize this logic. This seems to be a
|
||||
fragile balance between polling frequency and device processing time. Also in between
|
||||
start/stop of polling. Please also consider that there is a sleep in on_trigger and
|
||||
that this might also be necessary to avoid that HW becomes unavailable/unstable.
|
||||
"""
|
||||
self.state.proc_status.put(1, use_complete=True)
|
||||
time.sleep(0.02) # 20ms delay for processing, important for not missing events
|
||||
if self._poll_thread_run_event.is_set() and not self._poll_thread_kill_event.is_set():
|
||||
if self._poll_thread_kill_event.is_set() or not self._poll_thread_run_event.is_set():
|
||||
return
|
||||
self.state.event_status.get(use_monitor=False)
|
||||
if self._poll_thread_run_event.is_set() and not self._poll_thread_kill_event.is_set():
|
||||
if self._poll_thread_kill_event.is_set() or not self._poll_thread_run_event.is_set():
|
||||
return
|
||||
time.sleep(0.02) # 20ms delay for processing, important for not missing events
|
||||
|
||||
@@ -256,6 +270,9 @@ class DDG1(PSIDeviceBase, DelayGeneratorCSAXS):
|
||||
# Stop polling, poll once manually to ensure that the register is clean
|
||||
self._stop_polling()
|
||||
self._poll_thread_poll_loop_done.wait(timeout=1)
|
||||
# IMPORTANT: Keep this sleep setting, as it is necessary to avoid that the HW
|
||||
# becomes unresponsive. This was found empirically and seems to be necessary
|
||||
time.sleep(0.02)
|
||||
|
||||
# Prepare the MCS card for the next software trigger
|
||||
mcs = self.device_manager.devices.get("mcs", None)
|
||||
|
||||
@@ -1,381 +0,0 @@
|
||||
import enum
|
||||
import os
|
||||
import threading
|
||||
import time
|
||||
from typing import Any
|
||||
|
||||
import numpy as np
|
||||
from bec_lib.logger import bec_logger
|
||||
from ophyd import ADComponent as ADCpt
|
||||
from ophyd import Device, EpicsSignal, EpicsSignalRO, EpicsSignalWithRBV
|
||||
from ophyd_devices.interfaces.base_classes.psi_detector_base import (
|
||||
CustomDetectorMixin,
|
||||
PSIDetectorBase,
|
||||
)
|
||||
from std_daq_client import StdDaqClient
|
||||
|
||||
logger = bec_logger.logger
|
||||
|
||||
|
||||
class EigerError(Exception):
|
||||
"""Base class for exceptions in this module."""
|
||||
|
||||
|
||||
class EigerTimeoutError(EigerError):
|
||||
"""Raised when the Eiger does not respond in time."""
|
||||
|
||||
|
||||
class Eiger9MSetup(CustomDetectorMixin):
|
||||
"""Eiger setup class
|
||||
|
||||
Parent class: CustomDetectorMixin
|
||||
|
||||
"""
|
||||
|
||||
def __init__(self, *args, parent: Device = None, **kwargs) -> None:
|
||||
super().__init__(*args, parent=parent, **kwargs)
|
||||
self.std_rest_server_url = (
|
||||
kwargs["file_writer_url"] if "file_writer_url" in kwargs else "http://xbl-daq-29:5000"
|
||||
)
|
||||
self.std_client = None
|
||||
self._lock = threading.RLock()
|
||||
|
||||
def on_init(self) -> None:
|
||||
"""Initialize the detector"""
|
||||
self.initialize_default_parameter()
|
||||
self.initialize_detector()
|
||||
self.initialize_detector_backend()
|
||||
|
||||
def initialize_detector(self) -> None:
|
||||
"""Initialize detector"""
|
||||
self.stop_detector()
|
||||
self.parent.cam.trigger_mode.put(TriggerSource.GATING)
|
||||
|
||||
def initialize_default_parameter(self) -> None:
|
||||
"""Set default parameters for Eiger9M detector"""
|
||||
self.update_readout_time()
|
||||
|
||||
def update_readout_time(self) -> None:
|
||||
"""Set readout time for Eiger9M detector"""
|
||||
readout_time = (
|
||||
self.parent.scaninfo.readout_time
|
||||
if hasattr(self.parent.scaninfo, "readout_time")
|
||||
else self.parent.MIN_READOUT
|
||||
)
|
||||
self.parent.readout_time = max(readout_time, self.parent.MIN_READOUT)
|
||||
|
||||
def initialize_detector_backend(self) -> None:
|
||||
"""Initialize detector backend"""
|
||||
|
||||
self.std_client = StdDaqClient(url_base=self.std_rest_server_url)
|
||||
self.std_client.stop_writer()
|
||||
eacc = self.parent.scaninfo.username
|
||||
self.update_std_cfg("writer_user_id", int(eacc.strip(" e")))
|
||||
|
||||
signal_conditions = [(lambda: self.std_client.get_status()["state"], "READY")]
|
||||
if not self.wait_for_signals(
|
||||
signal_conditions=signal_conditions,
|
||||
timeout=self.parent.TIMEOUT_FOR_SIGNALS,
|
||||
all_signals=True,
|
||||
):
|
||||
raise EigerTimeoutError(
|
||||
f"Std client not in READY state, returns: {self.std_client.get_status()}"
|
||||
)
|
||||
|
||||
def update_std_cfg(self, cfg_key: str, value: Any) -> None:
|
||||
"""
|
||||
Update std_daq config
|
||||
|
||||
Checks that the new value matches the type of the former entry.
|
||||
|
||||
Args:
|
||||
cfg_key (str) : config key of value to be updated
|
||||
value (Any) : value to be updated for the specified key
|
||||
|
||||
Raises:
|
||||
Raises EigerError if the key was not in the config before and if the new value does not match the type of the old value
|
||||
|
||||
"""
|
||||
|
||||
cfg = self.std_client.get_config()
|
||||
old_value = cfg.get(cfg_key)
|
||||
if old_value is None:
|
||||
raise EigerError(
|
||||
f"Tried to change entry for key {cfg_key} in std_config that does not exist"
|
||||
)
|
||||
if not isinstance(value, type(old_value)):
|
||||
raise EigerError(
|
||||
f"Type of new value {type(value)}:{value} does not match old value"
|
||||
f" {type(old_value)}:{old_value}"
|
||||
)
|
||||
|
||||
cfg.update({cfg_key: value})
|
||||
logger.debug(cfg)
|
||||
self.std_client.set_config(cfg)
|
||||
logger.debug(f"Updated std_daq config for key {cfg_key} from {old_value} to {value}")
|
||||
|
||||
def on_stage(self) -> None:
|
||||
"""Prepare the detector for scan"""
|
||||
self.prepare_detector()
|
||||
self.prepare_data_backend()
|
||||
self.publish_file_location(done=False, successful=False)
|
||||
self.arm_acquisition()
|
||||
|
||||
def prepare_detector(self) -> None:
|
||||
"""Prepare detector for scan"""
|
||||
self.set_detector_threshold()
|
||||
self.set_acquisition_params()
|
||||
self.parent.cam.trigger_mode.put(TriggerSource.GATING)
|
||||
|
||||
def set_detector_threshold(self) -> None:
|
||||
"""
|
||||
Set the detector threshold
|
||||
|
||||
The function sets the detector threshold automatically to 1/2 of the beam energy.
|
||||
"""
|
||||
mokev = self.parent.device_manager.devices.mokev.obj.read()[
|
||||
self.parent.device_manager.devices.mokev.name
|
||||
]["value"]
|
||||
factor = 1
|
||||
unit = getattr(self.parent.cam.threshold_energy, "units", None)
|
||||
|
||||
if unit is not None and unit == "eV":
|
||||
factor = 1000
|
||||
setpoint = int(mokev * factor)
|
||||
energy = self.parent.cam.beam_energy.read()[self.parent.cam.beam_energy.name]["value"]
|
||||
|
||||
if setpoint != energy:
|
||||
self.parent.cam.beam_energy.set(setpoint)
|
||||
|
||||
threshold = self.parent.cam.threshold_energy.read()[self.parent.cam.threshold_energy.name][
|
||||
"value"
|
||||
]
|
||||
if not np.isclose(setpoint / 2, threshold, rtol=0.05):
|
||||
self.parent.cam.threshold_energy.set(setpoint / 2)
|
||||
|
||||
def set_acquisition_params(self) -> None:
|
||||
"""Set acquisition parameters for the detector"""
|
||||
self.parent.cam.num_images.put(
|
||||
int(self.parent.scaninfo.num_points * self.parent.scaninfo.frames_per_trigger)
|
||||
)
|
||||
self.parent.cam.num_frames.put(1)
|
||||
self.update_readout_time()
|
||||
|
||||
def prepare_data_backend(self) -> None:
|
||||
"""Prepare the data backend for the scan"""
|
||||
self.parent.filepath.set(
|
||||
self.parent.filewriter.compile_full_filename(f"{self.parent.name}.h5")
|
||||
).wait()
|
||||
self.filepath_exists(self.parent.filepath.get())
|
||||
self.stop_detector_backend()
|
||||
try:
|
||||
self.std_client.start_writer_async(
|
||||
{
|
||||
"output_file": self.parent.filepath.get(),
|
||||
"n_images": int(
|
||||
self.parent.scaninfo.num_points * self.parent.scaninfo.frames_per_trigger
|
||||
),
|
||||
}
|
||||
)
|
||||
except Exception as exc:
|
||||
time.sleep(5)
|
||||
if self.std_client.get_status()["state"] == "READY":
|
||||
raise EigerTimeoutError(f"Timeout of start_writer_async with {exc}") from exc
|
||||
|
||||
signal_conditions = [
|
||||
(lambda: self.std_client.get_status()["acquisition"]["state"], "WAITING_IMAGES")
|
||||
]
|
||||
if not self.wait_for_signals(
|
||||
signal_conditions=signal_conditions,
|
||||
timeout=self.parent.TIMEOUT_FOR_SIGNALS,
|
||||
check_stopped=False,
|
||||
all_signals=True,
|
||||
):
|
||||
raise EigerTimeoutError(
|
||||
"Timeout of 5s reached for std_daq start_writer_async with std_daq client status"
|
||||
f" {self.std_client.get_status()}"
|
||||
)
|
||||
|
||||
def on_unstage(self) -> None:
|
||||
"""Unstage the detector"""
|
||||
pass
|
||||
|
||||
def on_complete(self) -> None:
|
||||
"""Complete the detector"""
|
||||
self.finished(timeout=self.parent.TIMEOUT_FOR_SIGNALS)
|
||||
self.publish_file_location(done=True, successful=True)
|
||||
|
||||
def on_stop(self) -> None:
|
||||
"""Stop the detector"""
|
||||
self.stop_detector()
|
||||
self.stop_detector_backend()
|
||||
|
||||
def stop_detector(self) -> None:
|
||||
"""Stop the detector"""
|
||||
|
||||
# Stop detector
|
||||
self.parent.cam.acquire.put(0)
|
||||
signal_conditions = [
|
||||
(
|
||||
lambda: self.parent.cam.detector_state.read()[self.parent.cam.detector_state.name][
|
||||
"value"
|
||||
],
|
||||
DetectorState.IDLE,
|
||||
)
|
||||
]
|
||||
|
||||
if not self.wait_for_signals(
|
||||
signal_conditions=signal_conditions,
|
||||
timeout=self.parent.TIMEOUT_FOR_SIGNALS - self.parent.TIMEOUT_FOR_SIGNALS // 2,
|
||||
check_stopped=True,
|
||||
all_signals=False,
|
||||
):
|
||||
# Retry stop detector and wait for remaining time
|
||||
self.parent.cam.acquire.put(0)
|
||||
if not self.wait_for_signals(
|
||||
signal_conditions=signal_conditions,
|
||||
timeout=self.parent.TIMEOUT_FOR_SIGNALS - self.parent.TIMEOUT_FOR_SIGNALS // 2,
|
||||
check_stopped=True,
|
||||
all_signals=False,
|
||||
):
|
||||
raise EigerTimeoutError(
|
||||
f"Failed to stop detector, detector state {signal_conditions[0][0]}"
|
||||
)
|
||||
|
||||
def stop_detector_backend(self) -> None:
|
||||
"""Close file writer"""
|
||||
self.std_client.stop_writer()
|
||||
|
||||
def filepath_exists(self, filepath: str) -> None:
|
||||
"""Check if filepath exists"""
|
||||
signal_conditions = [(lambda: os.path.exists(os.path.dirname(filepath)), True)]
|
||||
if not self.wait_for_signals(
|
||||
signal_conditions=signal_conditions,
|
||||
timeout=self.parent.TIMEOUT_FOR_SIGNALS,
|
||||
check_stopped=False,
|
||||
all_signals=True,
|
||||
):
|
||||
raise EigerError(f"Timeout of 3s reached for filepath {filepath}")
|
||||
|
||||
def arm_acquisition(self) -> None:
|
||||
"""Arm Eiger detector for acquisition"""
|
||||
self.parent.cam.acquire.put(1)
|
||||
signal_conditions = [
|
||||
(
|
||||
lambda: self.parent.cam.detector_state.read()[self.parent.cam.detector_state.name][
|
||||
"value"
|
||||
],
|
||||
DetectorState.RUNNING,
|
||||
)
|
||||
]
|
||||
if not self.wait_for_signals(
|
||||
signal_conditions=signal_conditions,
|
||||
timeout=self.parent.TIMEOUT_FOR_SIGNALS,
|
||||
check_stopped=True,
|
||||
all_signals=False,
|
||||
):
|
||||
raise EigerTimeoutError(
|
||||
f"Failed to arm the acquisition. Detector state {signal_conditions[0][0]}"
|
||||
)
|
||||
|
||||
def finished(self, timeout: int = 5) -> None:
|
||||
"""Check if acquisition is finished."""
|
||||
with self._lock:
|
||||
signal_conditions = [
|
||||
(
|
||||
lambda: self.parent.cam.acquire.read()[self.parent.cam.acquire.name]["value"],
|
||||
DetectorState.IDLE,
|
||||
),
|
||||
(lambda: self.std_client.get_status()["acquisition"]["state"], "FINISHED"),
|
||||
(
|
||||
lambda: self.std_client.get_status()["acquisition"]["stats"][
|
||||
"n_write_completed"
|
||||
],
|
||||
int(self.parent.scaninfo.num_points * self.parent.scaninfo.frames_per_trigger),
|
||||
),
|
||||
]
|
||||
if not self.wait_for_signals(
|
||||
signal_conditions=signal_conditions,
|
||||
timeout=timeout,
|
||||
check_stopped=True,
|
||||
all_signals=True,
|
||||
):
|
||||
raise EigerTimeoutError(
|
||||
f"Reached timeout with detector state {signal_conditions[0][0]}, std_daq state"
|
||||
f" {signal_conditions[1][0]} and received frames of {signal_conditions[2][0]} for"
|
||||
" the file writer"
|
||||
)
|
||||
self.stop_detector()
|
||||
self.stop_detector_backend()
|
||||
|
||||
|
||||
class SLSDetectorCam(Device):
|
||||
"""
|
||||
SLS Detector Camera - Eiger9M
|
||||
|
||||
Base class to map EPICS PVs to ophyd signals.
|
||||
"""
|
||||
|
||||
threshold_energy = ADCpt(EpicsSignalWithRBV, "ThresholdEnergy")
|
||||
beam_energy = ADCpt(EpicsSignalWithRBV, "BeamEnergy")
|
||||
bit_depth = ADCpt(EpicsSignalWithRBV, "BitDepth")
|
||||
num_images = ADCpt(EpicsSignalWithRBV, "NumCycles")
|
||||
num_frames = ADCpt(EpicsSignalWithRBV, "NumFrames")
|
||||
trigger_mode = ADCpt(EpicsSignalWithRBV, "TimingMode")
|
||||
trigger_software = ADCpt(EpicsSignal, "TriggerSoftware")
|
||||
acquire = ADCpt(EpicsSignal, "Acquire")
|
||||
detector_state = ADCpt(EpicsSignalRO, "DetectorState_RBV")
|
||||
|
||||
|
||||
class TriggerSource(int, enum.Enum):
|
||||
"""Trigger signals for Eiger9M detector"""
|
||||
|
||||
AUTO = 0
|
||||
TRIGGER = 1
|
||||
GATING = 2
|
||||
BURST_TRIGGER = 3
|
||||
|
||||
|
||||
class DetectorState(int, enum.Enum):
|
||||
"""Detector states for Eiger9M detector"""
|
||||
|
||||
IDLE = 0
|
||||
ERROR = 1
|
||||
WAITING = 2
|
||||
FINISHED = 3
|
||||
TRANSMITTING = 4
|
||||
RUNNING = 5
|
||||
STOPPED = 6
|
||||
STILL_WAITING = 7
|
||||
INITIALIZING = 8
|
||||
DISCONNECTED = 9
|
||||
ABORTED = 10
|
||||
|
||||
|
||||
class Eiger9McSAXS(PSIDetectorBase):
|
||||
"""
|
||||
Eiger9M detector for CSAXS
|
||||
|
||||
Parent class: PSIDetectorBase
|
||||
|
||||
class attributes:
|
||||
custom_prepare_cls (FalconSetup) : Custom detector setup class for cSAXS,
|
||||
inherits from CustomDetectorMixin
|
||||
PSIDetectorBase.set_min_readout (float) : Minimum readout time for the detector
|
||||
Various EpicsPVs for controlling the detector
|
||||
"""
|
||||
|
||||
# Specify which functions are revealed to the user in BEC client
|
||||
USER_ACCESS = []
|
||||
|
||||
# specify Setup class
|
||||
custom_prepare_cls = Eiger9MSetup
|
||||
# specify minimum readout time for detector and timeout for checks after unstage
|
||||
MIN_READOUT = 3e-3
|
||||
TIMEOUT_FOR_SIGNALS = 5
|
||||
# specify class attributes
|
||||
cam = ADCpt(SLSDetectorCam, "cam1:")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
eiger = Eiger9McSAXS(name="eiger", prefix="X12SA-ES-EIGER9M:", sim_mode=True)
|
||||
@@ -158,7 +158,7 @@ class MCSCardCSAXS(PSIDeviceBase, MCSCard):
|
||||
self.acquire_mode.set(ACQUIREMODE.MCS).wait(timeout=self._pv_timeout)
|
||||
|
||||
# Subscribe the progress signal
|
||||
self.current_channel.subscribe(self._progress_update, run=False)
|
||||
# self.current_channel.subscribe(self._progress_update, run=False)
|
||||
|
||||
# Subscribe to the mca updates
|
||||
for name in self.counter_mapping.keys():
|
||||
|
||||
318
csaxs_bec/devices/jungfraujoch/eiger.py
Normal file
318
csaxs_bec/devices/jungfraujoch/eiger.py
Normal file
@@ -0,0 +1,318 @@
|
||||
"""
|
||||
Generic integration of JungfrauJoch backend with Eiger detectors
|
||||
for the cSAXS beamline at the Swiss Light Source.
|
||||
|
||||
The WEB UI is available on http://sls-jfjoch-001:8080
|
||||
|
||||
NOTE: this may not be the best place to store this information. It should be migrated to
|
||||
beamline documentation for debugging of Eiger & JungfrauJoch.
|
||||
|
||||
The JungfrauJoch server for cSAXS runs on sls-jfjoch-001.psi.ch
|
||||
User with sufficient rights may use:
|
||||
- sudo systemctl restart jfjoch_broker
|
||||
- sudo systemctl status jfjoch_broker
|
||||
to check and/or restart the broker for the JungfrauJoch server.
|
||||
|
||||
Some extra notes for setting up the detector:
|
||||
- If the energy on JFJ is set via DetectorSettings, the variable in DatasetSettings will be ignored
|
||||
- Changes in energy may take time, good to implement logic that only resets energy if needed.
|
||||
- For the Eiger, the frame_time_us in DetectorSettings is ignored, only the frame_time_us in
|
||||
the DatasetSettings is relevant
|
||||
- The bit_depth will be adjusted automatically based on the exp_time. Here, we need to ensure
|
||||
that subsequent triggers properly
|
||||
consider the readout_time of the boards. For Jungfrau detectors, the difference between
|
||||
count_time_us and frame_time_us is the readout_time of the boards. For the Eiger, this needs
|
||||
to be taken into account during the integration.
|
||||
- beam_center and detector settings are required input arguments, thus, they may be set to wrong
|
||||
values for acquisitions to start. Please keep this in mind.
|
||||
|
||||
Hardware related notes:
|
||||
- If there is an HW issue with the detector, power cycling may help.
|
||||
- The sls_detector package is available on console on /sls/X12SA/data/gac-x12sa/erik/micromamba
|
||||
- Run: source setup_9m.sh # Be careful, this connects to the detector, so it should not be
|
||||
used during operation
|
||||
- Useful commands:
|
||||
- p highvoltage 0 or 150 (operational)
|
||||
- g highvoltage
|
||||
- # Put high voltage to 0 before power cylcing it.
|
||||
- telnet bchip500
|
||||
- cd power_control_user/
|
||||
- ./on
|
||||
- ./off
|
||||
|
||||
Further information that may be relevant for debugging:
|
||||
JungfrauJoch - one needs to connect to the jfj-server (sls-jfjoch-001)
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import os
|
||||
import time
|
||||
from typing import TYPE_CHECKING, Literal
|
||||
|
||||
import yaml
|
||||
from bec_lib.file_utils import get_full_path
|
||||
from bec_lib.logger import bec_logger
|
||||
from jfjoch_client.models.dataset_settings import DatasetSettings
|
||||
from jfjoch_client.models.detector_settings import DetectorSettings
|
||||
from jfjoch_client.models.detector_state import DetectorState
|
||||
from jfjoch_client.models.detector_timing import DetectorTiming
|
||||
from jfjoch_client.models.file_writer_format import FileWriterFormat
|
||||
from jfjoch_client.models.file_writer_settings import FileWriterSettings
|
||||
from ophyd import Component as Cpt
|
||||
from ophyd import DeviceStatus
|
||||
from ophyd_devices import FileEventSignal, PreviewSignal
|
||||
from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase
|
||||
|
||||
from csaxs_bec.devices.jungfraujoch.jungfrau_joch_client import JungfrauJochClient
|
||||
from csaxs_bec.devices.jungfraujoch.jungfraujoch_preview import JungfrauJochPreview
|
||||
|
||||
if TYPE_CHECKING: # pragma no cover
|
||||
from bec_lib.devicemanager import ScanInfo
|
||||
from bec_server.device_server.device_server import DeviceManagerDS
|
||||
from jfjoch_client.models.measurement_statistics import MeasurementStatistics
|
||||
|
||||
|
||||
logger = bec_logger.logger
|
||||
|
||||
EIGER_READOUT_TIME_US = 500e-6 # 500 microseconds in s
|
||||
|
||||
|
||||
class EigerError(Exception):
|
||||
"""Custom exception for Eiger detector errors."""
|
||||
|
||||
|
||||
class Eiger(PSIDeviceBase):
|
||||
"""
|
||||
Base integration of the Eiger1.5M and Eiger9M at cSAXS. All relevant
|
||||
"""
|
||||
|
||||
USER_ACCESS = ["detector_distance", "beam_center"]
|
||||
|
||||
file_event = Cpt(FileEventSignal, name="file_event")
|
||||
preview_image = Cpt(PreviewSignal, name="preview_image", ndim=2)
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
name: str,
|
||||
detector_name: Literal["EIGER 9M", "EIGER 8.5M (tmp)", "EIGER 1.5M"],
|
||||
host: str = "http://sls-jfjoch-001",
|
||||
port: int = 8080,
|
||||
detector_distance: float = 100.0,
|
||||
beam_center: tuple[int, int] = (0, 0),
|
||||
scan_info: ScanInfo = None,
|
||||
readout_time: float = EIGER_READOUT_TIME_US,
|
||||
device_manager=None,
|
||||
**kwargs,
|
||||
):
|
||||
"""
|
||||
Initialize the PSI Device Base class.
|
||||
|
||||
Args:
|
||||
name (str) : Name of the device
|
||||
detector_name (str): Name of the detector. Supports ["EIGER 9M", "EIGER 8.5M (tmp)", "EIGER 1.5M"]
|
||||
host (str): Hostname of the Jungfrau Joch server.
|
||||
port (int): Port of the Jungfrau Joch server.
|
||||
scan_info (ScanInfo): The scan info to use.
|
||||
device_manager (DeviceManagerDS): The device manager to use.
|
||||
**kwargs: Additional keyword arguments.
|
||||
"""
|
||||
super().__init__(name=name, scan_info=scan_info, device_manager=device_manager, **kwargs)
|
||||
self._host = f"{host}:{port}"
|
||||
self.jfj_client = JungfrauJochClient(host=self._host, parent=self)
|
||||
self.jfj_preview_client = JungfrauJochPreview(
|
||||
url="tcp://129.129.95.114:5400", cb=self.preview_image.put
|
||||
) # IP of sls-jfjoch-001.psi.ch on port 5400 for ZMQ stream
|
||||
self.device_manager = device_manager
|
||||
self.detector_name = detector_name
|
||||
self._detector_distance = detector_distance
|
||||
self._beam_center = beam_center
|
||||
self._readout_time = readout_time
|
||||
self._full_path = ""
|
||||
if self.device_manager is not None:
|
||||
self.device_manager: DeviceManagerDS
|
||||
|
||||
@property
|
||||
def detector_distance(self) -> float:
|
||||
"""The detector distance in mm."""
|
||||
return self._detector_distance
|
||||
|
||||
@detector_distance.setter
|
||||
def detector_distance(self, value: float) -> None:
|
||||
"""Set the detector distance in mm."""
|
||||
if value <= 0:
|
||||
raise ValueError("Detector distance must be a positive value.")
|
||||
self._detector_distance = value
|
||||
|
||||
@property
|
||||
def beam_center(self) -> tuple[float, float]:
|
||||
"""The beam center in pixels. (x,y)"""
|
||||
return self._beam_center
|
||||
|
||||
@beam_center.setter
|
||||
def beam_center(self, value: tuple[float, float]) -> None:
|
||||
"""Set the beam center in pixels. (x,y)"""
|
||||
self._beam_center = value
|
||||
|
||||
def on_init(self) -> None:
|
||||
"""
|
||||
Called when the device is initialized.
|
||||
|
||||
No siganls are connected at this point,
|
||||
thus should not be set here but in on_connected instead.
|
||||
"""
|
||||
|
||||
def on_connected(self) -> None:
|
||||
"""
|
||||
Called after the device is connected and its signals are connected.
|
||||
Default values for signals should be set here.
|
||||
"""
|
||||
logger.debug(f"On connected called for {self.name}")
|
||||
self.jfj_client.stop(request_timeout=3)
|
||||
# Check which detector is selected
|
||||
|
||||
# Get available detectors
|
||||
available_detectors = self.jfj_client.api.config_select_detector_get(_request_timeout=5)
|
||||
# Get current detector
|
||||
current_detector_name = ""
|
||||
if available_detectors.current_id:
|
||||
detector_selection = [
|
||||
det.description
|
||||
for det in available_detectors.detectors
|
||||
if det.id == available_detectors.current_id
|
||||
]
|
||||
current_detector_name = detector_selection[0] if detector_selection else ""
|
||||
if current_detector_name != self.detector_name:
|
||||
raise RuntimeError(
|
||||
f"Please select and initialise the detector {self.detector_name} in the WEB UI: {self._host}."
|
||||
)
|
||||
if self.jfj_client.detector_state != DetectorState.IDLE:
|
||||
raise RuntimeError(
|
||||
f"Detector {self.detector_name} is not in IDLE state, current state: {self.jfj_client.detector_state}. Please initialize the detector in the WEB UI: {self._host}."
|
||||
)
|
||||
# TODO - check again once Eiger should be initialized automatically, currently human initialization is expected
|
||||
# # Once the automation should be enabled, we may use here
|
||||
# detector_selection = [
|
||||
# det for det in available_detectors.detectors if det.id == self.detector_name
|
||||
# ]
|
||||
# if not detector_selection:
|
||||
# raise ValueError(
|
||||
# f"Detector {self.detector_name} not found in available detectors: {[det.description for det in available_detectors.detectors]}"
|
||||
# )
|
||||
# det_id = detector_selection[0].id
|
||||
# self.jfj_client.api.config_select_detector_put(
|
||||
# detector_selection=DetectorSelection(id=det_id), _request_timeout=5
|
||||
# )
|
||||
# self.jfj_client.connect_and_initialise(timeout=10)
|
||||
|
||||
# Setup Detector settings, here we may also set the energy already as this might be time consuming
|
||||
settings = DetectorSettings(frame_time_us=int(500), timing=DetectorTiming.TRIGGER)
|
||||
self.jfj_client.set_detector_settings(settings, timeout=10)
|
||||
# Set the file writer to the appropriate output for the HDF5 file
|
||||
file_writer_settings = FileWriterSettings(overwrite=True, format=FileWriterFormat.NXMXVDS)
|
||||
logger.debug(
|
||||
f"Setting writer_settings: {yaml.dump(file_writer_settings.to_dict(), indent=4)}"
|
||||
)
|
||||
self.jfj_client.api.config_file_writer_put(
|
||||
file_writer_settings=file_writer_settings, _request_timeout=10
|
||||
)
|
||||
# Start the preview client
|
||||
self.jfj_preview_client.connect()
|
||||
self.jfj_preview_client.start()
|
||||
logger.info(f"Connected to JungfrauJoch preview stream at {self.jfj_preview_client.url}")
|
||||
|
||||
def on_stage(self) -> DeviceStatus | None:
|
||||
"""
|
||||
Called while staging the device.
|
||||
|
||||
Information about the upcoming scan can be accessed from the scan_info object.
|
||||
"""
|
||||
start_time = time.time()
|
||||
scan_msg = self.scan_info.msg
|
||||
# Set acquisition parameter
|
||||
# TODO add check of mono energy, this can then also be passed to DatasetSettings
|
||||
incident_energy = 12.0
|
||||
exp_time = scan_msg.scan_parameters.get("exp_time", 0)
|
||||
if exp_time <= self._readout_time:
|
||||
raise ValueError(
|
||||
f"Receive scan request for scan {scan_msg.scan_name} with exp_time {exp_time}s, which must be larger than the readout time {self._readout_time}s of the detector {self.detector_name}."
|
||||
)
|
||||
frame_time_us = exp_time #
|
||||
ntrigger = int(scan_msg.num_points * scan_msg.scan_parameters["frames_per_trigger"])
|
||||
# Fetch file path
|
||||
self._full_path = get_full_path(scan_msg, name=f"{self.name}_master")
|
||||
self._full_path = os.path.abspath(os.path.expanduser(self._full_path))
|
||||
# Inform BEC about upcoming file event
|
||||
self.file_event.put(
|
||||
file_path=self._full_path,
|
||||
done=False,
|
||||
successful=False,
|
||||
hinted_h5_entries={"data": "entry/data/data"},
|
||||
)
|
||||
# JFJ adds _master.h5 automatically
|
||||
path = os.path.relpath(self._full_path, start="/sls/x12sa/data").removesuffix("_master.h5")
|
||||
data_settings = DatasetSettings(
|
||||
image_time_us=int(frame_time_us * 1e6), # This is currently ignored
|
||||
ntrigger=ntrigger,
|
||||
file_prefix=path,
|
||||
beam_x_pxl=int(self._beam_center[0]),
|
||||
beam_y_pxl=int(self._beam_center[1]),
|
||||
detector_distance_mm=self.detector_distance,
|
||||
incident_energy_ke_v=incident_energy,
|
||||
)
|
||||
logger.debug(f"Setting data_settings: {yaml.dump(data_settings.to_dict(), indent=4)}")
|
||||
prep_time = start_time - time.time()
|
||||
logger.debug(f"Prepared information for eiger to start acquisition in {prep_time:.2f}s")
|
||||
self.jfj_client.wait_for_idle(timeout=10, request_timeout=10) # Ensure we are in IDLE state
|
||||
self.jfj_client.start(settings=data_settings) # Takes around ~0.6s
|
||||
logger.debug(f"Wait for IDLE and start call took {time.time()-start_time-prep_time:.2f}s")
|
||||
|
||||
def on_unstage(self) -> DeviceStatus:
|
||||
"""Called while unstaging the device."""
|
||||
|
||||
def on_pre_scan(self) -> DeviceStatus:
|
||||
"""Called right before the scan starts on all devices automatically."""
|
||||
|
||||
def on_trigger(self) -> DeviceStatus:
|
||||
"""Called when the device is triggered."""
|
||||
|
||||
def _file_event_callback(self, status: DeviceStatus) -> None:
|
||||
"""Callback to update the file_event signal when the acquisition is done."""
|
||||
logger.info(f"Acquisition done callback called for {self.name} for status {status.success}")
|
||||
self.file_event.put(
|
||||
file_path=self._full_path,
|
||||
done=status.done,
|
||||
successful=status.success,
|
||||
hinted_h5_entries={"data": "entry/data/data"},
|
||||
)
|
||||
|
||||
def on_complete(self) -> DeviceStatus:
|
||||
"""Called to inquire if a device has completed a scans."""
|
||||
|
||||
def wait_for_complete():
|
||||
start_time = time.time()
|
||||
timeout = 10
|
||||
for _ in range(timeout):
|
||||
if self.jfj_client.wait_for_idle(timeout=1, request_timeout=10):
|
||||
return
|
||||
statistics: MeasurementStatistics = self.jfj_client.api.statistics_data_collection_get(
|
||||
_request_timeout=5
|
||||
)
|
||||
raise TimeoutError(
|
||||
f"Timeout after waiting for detector {self.name} to complete for {time.time()-start_time:.2f}s, measurement statistics: {yaml.dump(statistics.to_dict(), indent=4)}"
|
||||
)
|
||||
|
||||
status = self.task_handler.submit_task(wait_for_complete, run=True)
|
||||
status.add_callback(self._file_event_callback)
|
||||
self.cancel_on_stop(status)
|
||||
return status
|
||||
|
||||
def on_kickoff(self) -> DeviceStatus | None:
|
||||
"""Called to kickoff a device for a fly scan. Has to be called explicitly."""
|
||||
|
||||
def on_stop(self) -> None:
|
||||
"""Called when the device is stopped."""
|
||||
self.jfj_client.stop(
|
||||
request_timeout=0.5
|
||||
) # Call should not block more than 0.5 seconds to stop all devices...
|
||||
self.task_handler.shutdown()
|
||||
54
csaxs_bec/devices/jungfraujoch/eiger_1_5m.py
Normal file
54
csaxs_bec/devices/jungfraujoch/eiger_1_5m.py
Normal file
@@ -0,0 +1,54 @@
|
||||
"""
|
||||
Eiger 1.5M specific integration. It is based on the Eiger base integration for the JungfrauJoch backend
|
||||
which is placed in eiger_csaxs, and where code that is equivalent for the Eiger9M and Eiger1.5M is shared.
|
||||
|
||||
Please check the eiger_csaxs.py class for more details about the relevant services.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from typing import TYPE_CHECKING
|
||||
|
||||
from csaxs_bec.devices.jungfraujoch.eiger import Eiger
|
||||
|
||||
EIGER1_5M_READOUT_TIME_US = 500e-6 # 500 microseconds in s
|
||||
DETECTOR_NAME = "EIGER 1.5M"
|
||||
|
||||
|
||||
if TYPE_CHECKING: # pragma no cover
|
||||
from bec_lib.devicemanager import ScanInfo
|
||||
from bec_server.device_server.device_server import DeviceManagerDS
|
||||
|
||||
|
||||
# pylint:disable=invalid-name
|
||||
class Eiger1_5M(Eiger):
|
||||
"""
|
||||
Eiger 1.5M specific integration for the in-vaccum Eiger.
|
||||
|
||||
The logic implemented here is coupled to the DelayGenerator integration,
|
||||
repsonsible for the global triggering of all devices through a single Trigger logic.
|
||||
Please check the eiger.py class for more details about the integration of relevant backend
|
||||
services. The detector_name must be set to "EIGER 1.5M:
|
||||
"""
|
||||
|
||||
USER_ACCESS = Eiger.USER_ACCESS + [] # Add more user_access methods here.
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
name: str,
|
||||
detector_distance: float = 100.0,
|
||||
beam_center: tuple[float, float] = (0.0, 0.0),
|
||||
scan_info: ScanInfo = None,
|
||||
device_manager: DeviceManagerDS = None,
|
||||
**kwargs,
|
||||
) -> None:
|
||||
super().__init__(
|
||||
name=name,
|
||||
detector_name=DETECTOR_NAME,
|
||||
readout_time=EIGER1_5M_READOUT_TIME_US,
|
||||
detector_distance=detector_distance,
|
||||
beam_center=beam_center,
|
||||
scan_info=scan_info,
|
||||
device_manager=device_manager,
|
||||
**kwargs,
|
||||
)
|
||||
58
csaxs_bec/devices/jungfraujoch/eiger_9m.py
Normal file
58
csaxs_bec/devices/jungfraujoch/eiger_9m.py
Normal file
@@ -0,0 +1,58 @@
|
||||
"""
|
||||
Eiger 9M specific integration. It is based on the Eiger base integration for the JungfrauJoch backend
|
||||
which is placed in eiger_csaxs, and where code that is equivalent for the Eiger9M and Eiger1.5M is shared.
|
||||
|
||||
Please check the eiger_csaxs.py class for more details about the relevant services.
|
||||
|
||||
In 16bit mode, 8e7 counts/s per pixel are supported in summed up frames,
|
||||
although subframes will never have more than 12bit counts (~4000 counts per pixel in subframe).
|
||||
In 32bit mode, 2e7 counts/s per pixel are supported, for which subframes will have no
|
||||
more than 24bit counts, which means 16.7 million counts per pixel in subframes.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from typing import TYPE_CHECKING
|
||||
|
||||
from csaxs_bec.devices.jungfraujoch.eiger import Eiger
|
||||
|
||||
if TYPE_CHECKING: # pragma no cover
|
||||
from bec_lib.devicemanager import ScanInfo
|
||||
from bec_server.device_server.device_server import DeviceManagerDS
|
||||
|
||||
EIGER9M_READOUT_TIME_US = 500e-6 # 500 microseconds in s
|
||||
DETECTOR_NAME = "EIGER 8.5M (tmp)" # "EIGER 9M""
|
||||
|
||||
|
||||
# pylint:disable=invalid-name
|
||||
class Eiger9M(Eiger):
|
||||
"""
|
||||
Eiger 1.5M specific integration for the in-vaccum Eiger.
|
||||
|
||||
The logic implemented here is coupled to the DelayGenerator integration,
|
||||
repsonsible for the global triggering of all devices through a single Trigger logic.
|
||||
Please check the eiger.py class for more details about the integration of relevant backend
|
||||
services. The detector_name must be set to "EIGER 1.5M:
|
||||
"""
|
||||
|
||||
USER_ACCESS = Eiger.USER_ACCESS + [] # Add more user_access methods here.
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
name: str,
|
||||
detector_distance: float = 100.0,
|
||||
beam_center: tuple[float, float] = (0.0, 0.0),
|
||||
scan_info: ScanInfo = None,
|
||||
device_manager: DeviceManagerDS = None,
|
||||
**kwargs,
|
||||
) -> None:
|
||||
super().__init__(
|
||||
name=name,
|
||||
detector_name=DETECTOR_NAME,
|
||||
readout_time=EIGER9M_READOUT_TIME_US,
|
||||
detector_distance=detector_distance,
|
||||
beam_center=beam_center,
|
||||
scan_info=scan_info,
|
||||
device_manager=device_manager,
|
||||
**kwargs,
|
||||
)
|
||||
@@ -1,20 +1,35 @@
|
||||
import enum
|
||||
import math
|
||||
"""Module with client interface for the Jungfrau Joch detector API"""
|
||||
|
||||
import jfjoch_client
|
||||
from __future__ import annotations
|
||||
|
||||
import enum
|
||||
import time
|
||||
import traceback
|
||||
from typing import TYPE_CHECKING
|
||||
|
||||
import requests
|
||||
from bec_lib.logger import bec_logger
|
||||
from jfjoch_client.api.default_api import DefaultApi
|
||||
from jfjoch_client.api_client import ApiClient
|
||||
from jfjoch_client.configuration import Configuration
|
||||
from jfjoch_client.models.broker_status import BrokerStatus
|
||||
from jfjoch_client.models.dataset_settings import DatasetSettings
|
||||
from jfjoch_client.models.detector_settings import DetectorSettings
|
||||
|
||||
logger = bec_logger.logger
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from ophyd import Device
|
||||
|
||||
|
||||
# pylint: disable=raise-missing-from
|
||||
# pylint: disable=broad-except
|
||||
class JungfrauJochClientError(Exception):
|
||||
"""Base class for exceptions in this module."""
|
||||
|
||||
|
||||
class DetectorState(enum.StrEnum):
|
||||
"""Detector states for Jungfrau Joch detector
|
||||
['Inactive', 'Idle', 'Busy', 'Measuring', 'Pedestal', 'Error']
|
||||
"""
|
||||
class DetectorState(str, enum.Enum):
|
||||
"""Possible Detector states for Jungfrau Joch detector"""
|
||||
|
||||
INACTIVE = "Inactive"
|
||||
IDLE = "Idle"
|
||||
@@ -24,24 +39,30 @@ class DetectorState(enum.StrEnum):
|
||||
ERROR = "Error"
|
||||
|
||||
|
||||
class ResponseWaitDone(enum.IntEnum):
|
||||
"""Response state for Jungfrau Joch detector wait till done"""
|
||||
|
||||
DETECTOR_IDLE = 200
|
||||
TIMEOUT_PARAM_OUT_OF_RANGE = 400
|
||||
JUNGFRAU_ERROR = 500
|
||||
DETECTOR_INACTIVE = 502
|
||||
TIMEOUT_REACHED = 504
|
||||
|
||||
|
||||
class JungfrauJochClient:
|
||||
"""Thin wrapper around the Jungfrau Joch API client"""
|
||||
"""Thin wrapper around the Jungfrau Joch API client.
|
||||
|
||||
def __init__(self, host: str = "http://sls-jfjoch-001:8080") -> None:
|
||||
sudo systemctl restart jfjoch_broker
|
||||
sudo systemctl status jfjoch_broker
|
||||
|
||||
It looks as if the detector is not being stopped properly.
|
||||
One module remains running, how can we restart the detector?
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self, host: str = "http://sls-jfjoch-001:8080", parent: Device | None = None
|
||||
) -> None:
|
||||
self._initialised = False
|
||||
configuration = jfjoch_client.Configuration(host=host)
|
||||
api_client = jfjoch_client.ApiClient(configuration)
|
||||
self.api = jfjoch_client.DefaultApi(api_client)
|
||||
configuration = Configuration(host=host)
|
||||
api_client = ApiClient(configuration)
|
||||
self.api = DefaultApi(api_client)
|
||||
self._parent_name = parent.name if parent else self.__class__.__name__
|
||||
|
||||
@property
|
||||
def jjf_state(self) -> BrokerStatus:
|
||||
"""Get the status of JungfrauJoch"""
|
||||
response = self.api.status_get()
|
||||
return BrokerStatus(**response.to_dict())
|
||||
|
||||
@property
|
||||
def initialised(self) -> bool:
|
||||
@@ -53,101 +74,113 @@ class JungfrauJochClient:
|
||||
"""Set the connected status"""
|
||||
self._initialised = value
|
||||
|
||||
def get_jungfrau_joch_status(self) -> DetectorState:
|
||||
# TODO this is not correct, as it may be that the state in INACTIVE. Models are not in sync...
|
||||
# REMOVE all model enums as most of the validation takes place in the Pydantic models, i.e. BrokerStatus here..
|
||||
@property
|
||||
def detector_state(self) -> DetectorState:
|
||||
"""Get the status of JungfrauJoch"""
|
||||
return self.api.status_get().state
|
||||
return DetectorState(self.jjf_state.state)
|
||||
|
||||
def connect_and_initialise(self, timeout: int = 5) -> None:
|
||||
def connect_and_initialise(self, timeout: int = 10, **kwargs) -> None:
|
||||
"""Check if JungfrauJoch is connected and ready to receive commands"""
|
||||
status = self.api.status_get().state
|
||||
status = self.detector_state
|
||||
if status != DetectorState.IDLE:
|
||||
self.api.initialize_post()
|
||||
self.wait_till_done(timeout)
|
||||
self.initialised = True
|
||||
self.api.initialize_post() # This is a blocking call....
|
||||
self.wait_for_idle(timeout, request_timeout=timeout) # Blocking call
|
||||
self.initialised = True
|
||||
|
||||
def set_detector_settings(self, settings: dict | jfjoch_client.DatasetSettings) -> None:
|
||||
def set_detector_settings(self, settings: dict | DetectorSettings, timeout: int = 10) -> None:
|
||||
"""Set the detector settings. JungfrauJoch must be in IDLE, Error or Inactive state.
|
||||
Note, the full settings have to be provided, otherwise the settings will be overwritten with default values.
|
||||
|
||||
Args:
|
||||
settings (dict): dictionary of settings
|
||||
"""
|
||||
state = self.api.status_get().state
|
||||
state = self.detector_state
|
||||
if state not in [DetectorState.IDLE, DetectorState.ERROR, DetectorState.INACTIVE]:
|
||||
raise JungfrauJochClientError(
|
||||
f"Detector must be in IDLE, ERROR or INACTIVE state to set settings. Current state: {state}"
|
||||
)
|
||||
time.sleep(1) # Give the detector 1s to become IDLE, retry
|
||||
state = self.detector_state
|
||||
if state not in [DetectorState.IDLE, DetectorState.ERROR, DetectorState.INACTIVE]:
|
||||
raise JungfrauJochClientError(
|
||||
f"Error in {self._parent_name}. Detector must be in IDLE, ERROR or INACTIVE state to set settings. Current state: {state}"
|
||||
)
|
||||
|
||||
if isinstance(settings, dict):
|
||||
settings = jfjoch_client.DatasetSettings(**settings)
|
||||
self.api.config_detector_put(settings)
|
||||
settings = DetectorSettings(**settings)
|
||||
try:
|
||||
self.api.config_detector_put(detector_settings=settings, _request_timeout=timeout)
|
||||
except requests.exceptions.Timeout:
|
||||
raise TimeoutError(f"Timeout while setting detector settings for {self._parent_name}")
|
||||
except Exception:
|
||||
content = traceback.format_exc()
|
||||
raise JungfrauJochClientError(
|
||||
f"Error while setting detector settings for {self._parent_name}: {content}"
|
||||
)
|
||||
|
||||
def set_mesaurement_settings(self, settings: dict | jfjoch_client.DatasetSettings) -> None:
|
||||
"""Set the measurement settings. JungfrauJoch must be in IDLE state.
|
||||
def start(self, settings: dict | DatasetSettings, request_timeout: float = 10) -> None:
|
||||
"""Start the mesaurement. DatasetSettings must be provided, and JungfrauJoch must be in IDLE state.
|
||||
The method call is blocking and JungfrauJoch will be ready to measure after the call resolves.
|
||||
|
||||
Please check the DataSettings class for the available settings.
|
||||
The minimum required settings are:
|
||||
beam_x_pxl: StrictFloat | StrictInt,
|
||||
beam_y_pxl: StrictFloat | StrictInt,
|
||||
detector_distance_mm: float | int,
|
||||
incident_energy_keV: float | int,
|
||||
|
||||
Args:
|
||||
settings (dict): dictionary of settings
|
||||
|
||||
Please check the DataSettings class for the available settings. Minimum required settings are
|
||||
beam_x_pxl, beam_y_pxl, detector_distance_mm, incident_energy_keV.
|
||||
|
||||
"""
|
||||
state = self.api.status_get().state
|
||||
state = self.detector_state
|
||||
if state != DetectorState.IDLE:
|
||||
raise JungfrauJochClientError(
|
||||
f"Detector must be in IDLE state to set settings. Current state: {state}"
|
||||
f"Error in {self._parent_name}. Detector must be in IDLE state to set settings. Current state: {state}"
|
||||
)
|
||||
|
||||
if isinstance(settings, dict):
|
||||
settings = jfjoch_client.DatasetSettings(**settings)
|
||||
settings = DatasetSettings(**settings)
|
||||
try:
|
||||
res = self.api.start_post_with_http_info(dataset_settings=settings)
|
||||
if res.status_code != 200:
|
||||
logger.error(
|
||||
f"Error while setting measurement settings {settings}, response: {res}"
|
||||
)
|
||||
raise JungfrauJochClientError(
|
||||
f"Error while setting measurement settings {settings}, response: {res}"
|
||||
)
|
||||
except Exception as e:
|
||||
logger.error(
|
||||
f"Error while setting measurement settings {settings}. Exception raised {e}"
|
||||
self.api.start_post_with_http_info(
|
||||
dataset_settings=settings, _request_timeout=request_timeout
|
||||
)
|
||||
except requests.exceptions.Timeout:
|
||||
raise TimeoutError(
|
||||
f"TimeoutError in JungfrauJochClient for parent device {self._parent_name} for 'start' call"
|
||||
)
|
||||
except Exception:
|
||||
content = traceback.format_exc()
|
||||
raise JungfrauJochClientError(
|
||||
f"Error while setting measurement settings {settings}. Exception raised {e}"
|
||||
) from e
|
||||
f"Error in JungfrauJochClient for parent device {self._parent_name} during 'start' call: {content}"
|
||||
)
|
||||
|
||||
def wait_till_done(self, timeout: int = 5) -> None:
|
||||
"""Wait until JungfrauJoch is done.
|
||||
def stop(self, request_timeout: float = 0.5) -> None:
|
||||
"""Stop the acquisition, this only logs errors and is not raising."""
|
||||
try:
|
||||
self.api.cancel_post_with_http_info(_request_timeout=request_timeout)
|
||||
except requests.exceptions.Timeout:
|
||||
content = traceback.format_exc()
|
||||
logger.error(
|
||||
f"Timeout in JungFrauJochClient for device {self._parent_name} during stop: {content}"
|
||||
)
|
||||
except Exception:
|
||||
content = traceback.format_exc()
|
||||
logger.error(
|
||||
f"Error in JungFrauJochClient for device {self._parent_name} during stop: {content}"
|
||||
)
|
||||
|
||||
def wait_for_idle(self, timeout: int = 10, request_timeout: float | None = None) -> bool:
|
||||
"""Wait for JungfrauJoch to be in Idle state. Blocking call with timeout.
|
||||
|
||||
Args:
|
||||
timeout (int): timeout in seconds
|
||||
Returns:
|
||||
bool: True if the detector is in IDLE state, False if timeout occurred
|
||||
"""
|
||||
success = False
|
||||
if request_timeout is None:
|
||||
request_timeout = timeout
|
||||
try:
|
||||
response = self.api.wait_till_done_post_with_http_info(math.ceil(timeout / 2))
|
||||
if response.status_code != ResponseWaitDone.DETECTOR_IDLE:
|
||||
logger.info(
|
||||
f"Waitin for JungfrauJoch to be done, status: {ResponseWaitDone(response.status_code)}; response msg {response}"
|
||||
)
|
||||
response = self.api.wait_till_done_post_with_http_info(math.floor(timeout / 2))
|
||||
if response.status_code == ResponseWaitDone.DETECTOR_IDLE:
|
||||
success = True
|
||||
return
|
||||
except Exception as e:
|
||||
logger.error(f"Error while waiting for JungfrauJoch to initialise: {e}")
|
||||
raise JungfrauJochClientError(
|
||||
f"Error while waiting for JungfrauJoch to initialise: {e}"
|
||||
) from e
|
||||
else:
|
||||
if success is False:
|
||||
logger.error(
|
||||
f"Failed to initialise JungfrauJoch with status: {response.status_code}; response msg {response}"
|
||||
)
|
||||
raise JungfrauJochClientError(
|
||||
f"Failed to initialise JungfrauJoch with status: {response.status_code}; response msg {response}"
|
||||
)
|
||||
self.api.wait_till_done_post(timeout=timeout, _request_timeout=request_timeout)
|
||||
except requests.exceptions.Timeout:
|
||||
raise TimeoutError(f"HTTP request timeout in wait_for_idle for {self._parent_name}")
|
||||
except Exception:
|
||||
content = traceback.format_exc()
|
||||
logger.debug(f"Waiting for device {self._parent_name} to become IDLE: {content}")
|
||||
return False
|
||||
return True
|
||||
|
||||
96
csaxs_bec/devices/jungfraujoch/jungfraujoch_preview.py
Normal file
96
csaxs_bec/devices/jungfraujoch/jungfraujoch_preview.py
Normal file
@@ -0,0 +1,96 @@
|
||||
"""Module for the Eiger preview ZMQ stream."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import threading
|
||||
import time
|
||||
from typing import Callable
|
||||
|
||||
import numpy as np
|
||||
import zmq
|
||||
from bec_lib.logger import bec_logger
|
||||
|
||||
logger = bec_logger.logger
|
||||
|
||||
ZMQ_TOPIC_FILTER = b""
|
||||
|
||||
|
||||
class JungfrauJochPreview:
|
||||
USER_ACCESS = ["start", "stop"]
|
||||
|
||||
def __init__(self, url: str, cb: Callable):
|
||||
self.url = url
|
||||
self._socket = None
|
||||
self._shutdown_event = threading.Event()
|
||||
self._zmq_thread = None
|
||||
self._on_update_callback = cb
|
||||
|
||||
def connect(self):
|
||||
"""Connect to the JungfrauJoch PUB-SUB streaming interface
|
||||
|
||||
JungfrauJoch may reject connection for a few seconds when it restarts,
|
||||
so if it fails, wait a bit and try to connect again.
|
||||
"""
|
||||
# pylint: disable=no-member
|
||||
|
||||
context = zmq.Context()
|
||||
self._socket = context.socket(zmq.SUB)
|
||||
self._socket.setsockopt(zmq.SUBSCRIBE, ZMQ_TOPIC_FILTER)
|
||||
try:
|
||||
self._socket.connect(self.url)
|
||||
except ConnectionRefusedError:
|
||||
time.sleep(1)
|
||||
self._socket.connect(self.url)
|
||||
|
||||
def start(self):
|
||||
self._zmq_thread = threading.Thread(
|
||||
target=self._zmq_update_loop, daemon=True, name="JungfrauJoch_live_preview"
|
||||
)
|
||||
self._zmq_thread.start()
|
||||
|
||||
def stop(self):
|
||||
self._shutdown_event.set()
|
||||
if self._zmq_thread:
|
||||
self._zmq_thread.join()
|
||||
|
||||
def _zmq_update_loop(self):
|
||||
while not self._shutdown_event.is_set():
|
||||
if self._socket is None:
|
||||
self.connect()
|
||||
try:
|
||||
self._poll()
|
||||
except ValueError:
|
||||
# Happens when ZMQ partially delivers the multipart message
|
||||
pass
|
||||
except zmq.error.Again:
|
||||
# Happens when receive queue is empty
|
||||
time.sleep(0.1)
|
||||
|
||||
def _poll(self):
|
||||
"""
|
||||
Poll the ZMQ socket for new data. It will throttle the data update and
|
||||
only subscribe to the topic for a single update. This is not very nice
|
||||
but it seems like there is currently no option to set the update rate on
|
||||
the backend.
|
||||
"""
|
||||
|
||||
if self._shutdown_event.wait(0.2):
|
||||
return
|
||||
|
||||
try:
|
||||
# subscribe to the topic
|
||||
self._socket.setsockopt(zmq.SUBSCRIBE, ZMQ_TOPIC_FILTER)
|
||||
|
||||
# pylint: disable=no-member
|
||||
r = self._socket.recv_multipart(flags=zmq.NOBLOCK)
|
||||
self._parse_data(r)
|
||||
|
||||
finally:
|
||||
# Unsubscribe from the topic
|
||||
self._socket.setsockopt(zmq.UNSUBSCRIBE, ZMQ_TOPIC_FILTER)
|
||||
|
||||
def _parse_data(self, data):
|
||||
# TODO decode and parse the data
|
||||
# self._on_update_callback(data)
|
||||
pass
|
||||
BIN
frame_dump.cbor
Normal file
BIN
frame_dump.cbor
Normal file
Binary file not shown.
@@ -23,6 +23,7 @@ dependencies = [
|
||||
"pyepics",
|
||||
"pyueye", # for the IDS uEye camera
|
||||
"bec_widgets",
|
||||
"zmq",
|
||||
]
|
||||
|
||||
[project.optional-dependencies]
|
||||
|
||||
318
tests/tests_devices/test_eiger.py
Normal file
318
tests/tests_devices/test_eiger.py
Normal file
@@ -0,0 +1,318 @@
|
||||
# pylint: skip-file
|
||||
import os
|
||||
import threading
|
||||
from time import time
|
||||
from typing import TYPE_CHECKING, Generator
|
||||
from unittest import mock
|
||||
|
||||
import pytest
|
||||
from bec_lib.messages import FileMessage, ScanStatusMessage
|
||||
from jfjoch_client.models.broker_status import BrokerStatus
|
||||
from jfjoch_client.models.dataset_settings import DatasetSettings
|
||||
from jfjoch_client.models.detector_list import DetectorList
|
||||
from jfjoch_client.models.detector_list_element import DetectorListElement
|
||||
from jfjoch_client.models.detector_settings import DetectorSettings
|
||||
from jfjoch_client.models.detector_timing import DetectorTiming
|
||||
from jfjoch_client.models.file_writer_format import FileWriterFormat
|
||||
from jfjoch_client.models.file_writer_settings import FileWriterSettings
|
||||
from jfjoch_client.models.measurement_statistics import MeasurementStatistics
|
||||
from ophyd import Staged
|
||||
from ophyd_devices.utils.psi_device_base_utils import DeviceStatus
|
||||
|
||||
from csaxs_bec.devices.jungfraujoch.eiger import Eiger
|
||||
from csaxs_bec.devices.jungfraujoch.eiger_1_5m import Eiger1_5M
|
||||
from csaxs_bec.devices.jungfraujoch.eiger_9m import Eiger9M
|
||||
|
||||
if TYPE_CHECKING: # pragma no cover
|
||||
from bec_lib.messages import FileMessage
|
||||
|
||||
# @pytest.fixture(scope="function")
|
||||
# def scan_worker_mock(scan_server_mock):
|
||||
# scan_server_mock.device_manager.connector = mock.MagicMock()
|
||||
# scan_worker = ScanWorker(parent=scan_server_mock)
|
||||
# yield scan_worker
|
||||
|
||||
|
||||
@pytest.fixture(
|
||||
scope="function",
|
||||
params=[(0.1, 1, 1, "line_scan"), (0.2, 2, 2, "time_scan"), (0.5, 5, 5, "acquire")],
|
||||
)
|
||||
def mock_scan_info(request, tmpdir):
|
||||
exp_time, frames_per_trigger, num_points, scan_name = request.param
|
||||
scan_info = ScanStatusMessage(
|
||||
scan_id="test_id",
|
||||
status="open",
|
||||
scan_number=1,
|
||||
scan_parameters={
|
||||
"exp_time": exp_time,
|
||||
"frames_per_trigger": frames_per_trigger,
|
||||
"system_config": {},
|
||||
},
|
||||
info={"file_components": (f"{tmpdir}/data/S00000/S000001", "h5")},
|
||||
num_points=num_points,
|
||||
scan_name=scan_name,
|
||||
)
|
||||
yield scan_info
|
||||
|
||||
|
||||
@pytest.fixture(scope="function", params=[(1,), (2,)])
|
||||
def detector_list(request) -> Generator[DetectorList, None, None]:
|
||||
"""Fixture for the detector list."""
|
||||
current_id = request.param[0]
|
||||
detector_list = DetectorList(
|
||||
detectors=[
|
||||
DetectorListElement(
|
||||
id=1,
|
||||
description="EIGER 1.5M",
|
||||
serial_number="123456",
|
||||
base_ipv4_addr="192.168.0.1",
|
||||
udp_interface_count=1,
|
||||
nmodules=1,
|
||||
width=512,
|
||||
height=512,
|
||||
pixel_size_mm=0.1,
|
||||
readout_time_us=100,
|
||||
min_frame_time_us=1000,
|
||||
min_count_time_us=100,
|
||||
type="EIGER",
|
||||
),
|
||||
DetectorListElement(
|
||||
id=2,
|
||||
description="EIGER 8.5M (tmp)",
|
||||
serial_number="123456",
|
||||
base_ipv4_addr="192.168.0.1",
|
||||
udp_interface_count=1,
|
||||
nmodules=1,
|
||||
width=512,
|
||||
height=512,
|
||||
pixel_size_mm=0.1,
|
||||
readout_time_us=100,
|
||||
min_frame_time_us=1000,
|
||||
min_count_time_us=100,
|
||||
type="EIGER",
|
||||
),
|
||||
],
|
||||
current_id=current_id,
|
||||
)
|
||||
yield detector_list
|
||||
|
||||
|
||||
@pytest.fixture(scope="function")
|
||||
def eiger_1_5m(mock_scan_info) -> Generator[Eiger1_5M, None, None]:
|
||||
"""Fixture for the Eiger 1.5M device."""
|
||||
name = "eiger_1_5m"
|
||||
dev = Eiger1_5M(name=name, beam_center=(256, 256), detector_distance=100.0)
|
||||
dev.scan_info.msg = mock_scan_info
|
||||
yield dev
|
||||
|
||||
|
||||
@pytest.fixture(scope="function")
|
||||
def eiger_9m(mock_scan_info) -> Generator[Eiger9M, None, None]:
|
||||
"""Fixture for the Eiger 9M device.
|
||||
Currently only on_connected is different for both devices, all other methods are the same."""
|
||||
name = "eiger_9m"
|
||||
dev = Eiger9M(name=name)
|
||||
dev.scan_info.msg = mock_scan_info
|
||||
yield dev
|
||||
|
||||
|
||||
@pytest.mark.parametrize("detector_state", ["Idle", "Inactive"])
|
||||
def test_eiger_1_5m_on_connected(eiger_1_5m, detector_list, detector_state):
|
||||
"""Test the on_connected logic of the Eiger detector."""
|
||||
eiger = eiger_1_5m
|
||||
detector_id = 1
|
||||
with (
|
||||
mock.patch.object(eiger.jfj_client, "stop") as mock_jfj_client_stop,
|
||||
mock.patch.object(
|
||||
eiger.jfj_client.api, "config_select_detector_get", return_value=detector_list
|
||||
),
|
||||
mock.patch.object(
|
||||
eiger.jfj_client.api, "status_get", return_value=BrokerStatus(state=detector_state)
|
||||
),
|
||||
mock.patch.object(eiger.jfj_client, "set_detector_settings") as mock_set_det,
|
||||
mock.patch.object(eiger.jfj_client.api, "config_file_writer_put") as mock_file_writer,
|
||||
mock.patch.object(eiger, "jfj_preview_client") as mock_jfj_preview_client,
|
||||
):
|
||||
if detector_state != "Idle" or detector_list.current_id != detector_id:
|
||||
with pytest.raises(RuntimeError):
|
||||
eiger.on_connected()
|
||||
mock_jfj_client_stop.assert_called_once()
|
||||
assert mock_jfj_preview_client.call_count == 0
|
||||
else:
|
||||
eiger.on_connected()
|
||||
assert mock_set_det.call_args == mock.call(
|
||||
DetectorSettings(frame_time_us=500, timing=DetectorTiming.TRIGGER), timeout=10
|
||||
)
|
||||
assert mock_file_writer.call_args == mock.call(
|
||||
file_writer_settings=FileWriterSettings(
|
||||
overwrite=True, format=FileWriterFormat.NXMXVDS
|
||||
),
|
||||
_request_timeout=10,
|
||||
)
|
||||
mock_jfj_client_stop.assert_called_once()
|
||||
assert mock_jfj_preview_client.connect.call_count == 1
|
||||
assert mock_jfj_preview_client.start.call_count == 1
|
||||
|
||||
|
||||
@pytest.mark.parametrize("detector_state", ["Idle", "Inactive"])
|
||||
def test_eiger_9m_on_connected(eiger_9m, detector_list, detector_state):
|
||||
"""Test the on_connected logic of the Eiger detector."""
|
||||
eiger = eiger_9m
|
||||
detector_id = 2
|
||||
with (
|
||||
mock.patch.object(eiger.jfj_client, "stop") as mock_jfj_client_stop,
|
||||
mock.patch.object(
|
||||
eiger.jfj_client.api, "config_select_detector_get", return_value=detector_list
|
||||
),
|
||||
mock.patch.object(
|
||||
eiger.jfj_client.api, "status_get", return_value=BrokerStatus(state=detector_state)
|
||||
),
|
||||
mock.patch.object(eiger.jfj_client, "set_detector_settings") as mock_set_det,
|
||||
mock.patch.object(eiger.jfj_client.api, "config_file_writer_put") as mock_file_writer,
|
||||
mock.patch.object(eiger, "jfj_preview_client") as mock_jfj_preview_client,
|
||||
):
|
||||
if detector_state != "Idle" or detector_list.current_id != detector_id:
|
||||
with pytest.raises(RuntimeError):
|
||||
eiger.on_connected()
|
||||
mock_jfj_client_stop.assert_called_once()
|
||||
assert mock_jfj_preview_client.call_count == 0
|
||||
else:
|
||||
eiger.on_connected()
|
||||
assert mock_set_det.call_args == mock.call(
|
||||
DetectorSettings(frame_time_us=500, timing=DetectorTiming.TRIGGER), timeout=10
|
||||
)
|
||||
assert mock_file_writer.call_args == mock.call(
|
||||
file_writer_settings=FileWriterSettings(
|
||||
overwrite=True, format=FileWriterFormat.NXMXVDS
|
||||
),
|
||||
_request_timeout=10,
|
||||
)
|
||||
mock_jfj_client_stop.assert_called_once()
|
||||
assert mock_jfj_preview_client.connect.call_count == 1
|
||||
assert mock_jfj_preview_client.start.call_count == 1
|
||||
|
||||
|
||||
@pytest.mark.timeout(20)
|
||||
def test_eiger_on_stop(eiger_1_5m):
|
||||
"""Test the on_stop logic of the Eiger detector. This is equivalent for 9M and 1_5M."""
|
||||
eiger = eiger_1_5m
|
||||
start_event = threading.Event()
|
||||
stop_event = threading.Event()
|
||||
|
||||
def tmp_task():
|
||||
start_event.set()
|
||||
try:
|
||||
while True:
|
||||
time.sleep(0.1)
|
||||
finally:
|
||||
stop_event.set()
|
||||
|
||||
eiger.task_handler.submit_task(tmp_task, run=True)
|
||||
start_event.wait(timeout=5) # Wait for thread to start
|
||||
|
||||
with mock.patch.object(eiger.jfj_client, "stop") as mock_jfj_client_stop:
|
||||
eiger.on_stop()
|
||||
mock_jfj_client_stop.assert_called_once()
|
||||
stop_event.wait(timeout=5) # Thread should be killed from task_handler
|
||||
|
||||
|
||||
@pytest.mark.timeout(25)
|
||||
@pytest.mark.parametrize("raise_timeout", [True, False])
|
||||
def test_eiger_on_complete(eiger_1_5m, raise_timeout):
|
||||
"""Test the on_complete logic of the Eiger detector. This is equivalent for 9M and 1_5M."""
|
||||
eiger = eiger_1_5m
|
||||
|
||||
callback_completed_event = threading.Event()
|
||||
|
||||
def _callback_complete(status: DeviceStatus):
|
||||
if status.done:
|
||||
callback_completed_event.set()
|
||||
|
||||
unblock_wait_for_idle = threading.Event()
|
||||
|
||||
def mock_wait_for_idle(timeout: int, request_timeout: float):
|
||||
if unblock_wait_for_idle.wait(timeout):
|
||||
if raise_timeout:
|
||||
return False
|
||||
return True
|
||||
return False
|
||||
|
||||
with (
|
||||
mock.patch.object(eiger.jfj_client, "wait_for_idle", side_effect=mock_wait_for_idle),
|
||||
mock.patch.object(
|
||||
eiger.jfj_client.api,
|
||||
"statistics_data_collection_get",
|
||||
return_value=MeasurementStatistics(run_number=1),
|
||||
),
|
||||
):
|
||||
status = eiger.complete()
|
||||
status.add_callback(_callback_complete)
|
||||
assert status.done == False
|
||||
assert status.success == False
|
||||
assert eiger.file_event.get() is None
|
||||
unblock_wait_for_idle.set()
|
||||
if raise_timeout:
|
||||
with pytest.raises(TimeoutError):
|
||||
status.wait(timeout=10)
|
||||
else:
|
||||
status.wait(timeout=10)
|
||||
assert status.done == True
|
||||
assert status.success == False if raise_timeout else True
|
||||
|
||||
|
||||
def test_eiger_file_event_callback(eiger_1_5m, tmp_path):
|
||||
"""Test the file_event callback of the Eiger detector. This is equivalent for 9M and 1_5M."""
|
||||
eiger = eiger_1_5m
|
||||
test_file = tmp_path / "test_file.h5"
|
||||
eiger._full_path = str(test_file)
|
||||
assert eiger.file_event.get() is None
|
||||
status = DeviceStatus(device=eiger, done=True, success=True)
|
||||
eiger._file_event_callback(status)
|
||||
file_msg: FileMessage = eiger.file_event.get()
|
||||
assert file_msg.device_name == eiger.name
|
||||
assert file_msg.file_path == str(test_file)
|
||||
assert file_msg.done is True
|
||||
assert file_msg.successful is True
|
||||
assert file_msg.hinted_h5_entries == {"data": "entry/data/data"}
|
||||
status = DeviceStatus(device=eiger, done=False, success=False)
|
||||
eiger._file_event_callback(status)
|
||||
file_msg: FileMessage = eiger.file_event.get()
|
||||
assert file_msg.device_name == eiger.name
|
||||
assert file_msg.file_path == str(test_file)
|
||||
assert file_msg.done is False
|
||||
assert file_msg.successful is False
|
||||
assert file_msg.hinted_h5_entries == {"data": "entry/data/data"}
|
||||
|
||||
|
||||
def test_eiger_on_sage(eiger_1_5m):
|
||||
"""Test the on_stage and on_unstage logic of the Eiger detector. This is equivalent for 9M and 1_5M."""
|
||||
eiger = eiger_1_5m
|
||||
scan_msg = eiger.scan_info.msg
|
||||
with (
|
||||
mock.patch.object(eiger.jfj_client, "wait_for_idle", return_value=True),
|
||||
mock.patch.object(eiger.jfj_client, "start") as mock_start,
|
||||
):
|
||||
eiger.stage()
|
||||
assert (
|
||||
eiger._full_path
|
||||
== f"{scan_msg.info['file_components'][0]}_{eiger.name}_master.{scan_msg.info['file_components'][1]}"
|
||||
)
|
||||
file_msg: FileMessage = eiger.file_event.get()
|
||||
assert file_msg.file_path == eiger._full_path
|
||||
assert file_msg.done is False
|
||||
assert file_msg.successful is False
|
||||
assert file_msg.hinted_h5_entries == {"data": "entry/data/data"}
|
||||
|
||||
data_settings = DatasetSettings(
|
||||
image_time_us=int(scan_msg.scan_parameters["exp_time"] * 1e6),
|
||||
ntrigger=int(scan_msg.num_points * scan_msg.scan_parameters["frames_per_trigger"]),
|
||||
file_prefix=os.path.relpath(eiger._full_path, start="/sls/x12sa/data").removesuffix(
|
||||
"_master.h5"
|
||||
),
|
||||
beam_x_pxl=eiger.beam_center[0],
|
||||
beam_y_pxl=eiger.beam_center[1],
|
||||
detector_distance_mm=eiger.detector_distance,
|
||||
incident_energy_ke_v=12.0, # hardcoded at this moment as it is hardcoded in the Eiger implementation
|
||||
)
|
||||
assert mock_start.call_args == mock.call(settings=data_settings)
|
||||
assert eiger.staged is Staged.yes
|
||||
@@ -1,444 +0,0 @@
|
||||
# pylint: skip-file
|
||||
import threading
|
||||
from unittest import mock
|
||||
|
||||
import ophyd
|
||||
import pytest
|
||||
from bec_lib import messages
|
||||
from bec_lib.endpoints import MessageEndpoints
|
||||
from bec_server.device_server.tests.utils import DMMock
|
||||
from ophyd_devices.tests.utils import MockPV
|
||||
|
||||
from csaxs_bec.devices.epics.eiger9m_csaxs import Eiger9McSAXS
|
||||
from csaxs_bec.devices.tests_utils.utils import patch_dual_pvs
|
||||
|
||||
|
||||
@pytest.fixture(scope="function")
|
||||
def mock_det():
|
||||
name = "eiger"
|
||||
prefix = "X12SA-ES-EIGER9M:"
|
||||
dm = DMMock()
|
||||
with mock.patch.object(dm, "connector"):
|
||||
with (
|
||||
mock.patch("ophyd_devices.interfaces.base_classes.bec_device_base.FileWriter"),
|
||||
mock.patch(
|
||||
"ophyd_devices.interfaces.base_classes.psi_detector_base.PSIDetectorBase._update_service_config"
|
||||
),
|
||||
):
|
||||
with mock.patch.object(ophyd, "cl") as mock_cl:
|
||||
mock_cl.get_pv = MockPV
|
||||
mock_cl.thread_class = threading.Thread
|
||||
with mock.patch.object(Eiger9McSAXS, "_init"):
|
||||
det = Eiger9McSAXS(name=name, prefix=prefix, device_manager=dm)
|
||||
patch_dual_pvs(det)
|
||||
det.TIMEOUT_FOR_SIGNALS = 0.1
|
||||
yield det
|
||||
|
||||
|
||||
def test_init():
|
||||
"""Test the _init function:"""
|
||||
name = "eiger"
|
||||
prefix = "X12SA-ES-EIGER9M:"
|
||||
dm = DMMock()
|
||||
with mock.patch.object(dm, "connector"):
|
||||
with (
|
||||
mock.patch("ophyd_devices.interfaces.base_classes.bec_device_base.FileWriter"),
|
||||
mock.patch(
|
||||
"ophyd_devices.interfaces.base_classes.psi_detector_base.PSIDetectorBase._update_service_config"
|
||||
),
|
||||
):
|
||||
with mock.patch.object(ophyd, "cl") as mock_cl:
|
||||
mock_cl.get_pv = MockPV
|
||||
with (
|
||||
mock.patch(
|
||||
"csaxs_bec.devices.epics.eiger9m_csaxs.Eiger9MSetup.initialize_default_parameter"
|
||||
) as mock_default,
|
||||
mock.patch(
|
||||
"csaxs_bec.devices.epics.eiger9m_csaxs.Eiger9MSetup.initialize_detector"
|
||||
) as mock_init_det,
|
||||
mock.patch(
|
||||
"csaxs_bec.devices.epics.eiger9m_csaxs.Eiger9MSetup.initialize_detector_backend"
|
||||
) as mock_init_backend,
|
||||
):
|
||||
Eiger9McSAXS(name=name, prefix=prefix, device_manager=dm)
|
||||
mock_default.assert_called_once()
|
||||
mock_init_det.assert_called_once()
|
||||
mock_init_backend.assert_called_once()
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"trigger_source, detector_state, expected_exception", [(2, 1, True), (2, 0, False)]
|
||||
)
|
||||
def test_initialize_detector(mock_det, trigger_source, detector_state, expected_exception):
|
||||
"""Test the _init function:
|
||||
|
||||
This includes testing the functions:
|
||||
- _init_detector
|
||||
- _stop_det
|
||||
- _set_trigger
|
||||
--> Testing the filewriter is done in test_init_filewriter
|
||||
|
||||
Validation upon setting the correct PVs
|
||||
|
||||
"""
|
||||
mock_det.cam.detector_state._read_pv.mock_data = detector_state
|
||||
if expected_exception:
|
||||
with pytest.raises(Exception):
|
||||
mock_det.timeout = 0.1
|
||||
mock_det.custom_prepare.initialize_detector()
|
||||
else:
|
||||
mock_det.custom_prepare.initialize_detector() # call the method you want to test
|
||||
assert mock_det.cam.acquire.get() == 0
|
||||
assert mock_det.cam.detector_state.get() == detector_state
|
||||
assert mock_det.cam.trigger_mode.get() == trigger_source
|
||||
|
||||
|
||||
def test_trigger(mock_det):
|
||||
"""Test the trigger function:
|
||||
Validate that trigger calls the custom_prepare.on_trigger() function
|
||||
"""
|
||||
with mock.patch.object(mock_det.custom_prepare, "on_trigger") as mock_on_trigger:
|
||||
mock_det.trigger()
|
||||
mock_on_trigger.assert_called_once()
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"readout_time, expected_value", [(1e-3, 3e-3), (3e-3, 3e-3), (5e-3, 5e-3), (None, 3e-3)]
|
||||
)
|
||||
def test_update_readout_time(mock_det, readout_time, expected_value):
|
||||
if readout_time is None:
|
||||
mock_det.custom_prepare.update_readout_time()
|
||||
assert mock_det.readout_time == expected_value
|
||||
else:
|
||||
mock_det.scaninfo.readout_time = readout_time
|
||||
mock_det.custom_prepare.update_readout_time()
|
||||
assert mock_det.readout_time == expected_value
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"eacc, exp_url, daq_status, daq_cfg, expected_exception",
|
||||
[
|
||||
("e12345", "http://xbl-daq-29:5000", {"state": "READY"}, {"writer_user_id": 12543}, False),
|
||||
("e12345", "http://xbl-daq-29:5000", {"state": "READY"}, {"writer_user_id": 15421}, False),
|
||||
("e12345", "http://xbl-daq-29:5000", {"state": "BUSY"}, {"writer_user_id": 15421}, True),
|
||||
("e12345", "http://xbl-daq-29:5000", {"state": "READY"}, {"writer_ud": 12345}, True),
|
||||
],
|
||||
)
|
||||
def test_initialize_detector_backend(
|
||||
mock_det, eacc, exp_url, daq_status, daq_cfg, expected_exception
|
||||
):
|
||||
"""Test self.custom_prepare.initialize_detector_backend (std daq in this case)
|
||||
|
||||
This includes testing the functions:
|
||||
|
||||
- _update_service_config
|
||||
|
||||
Validation upon checking set values in mocked std_daq instance
|
||||
"""
|
||||
with mock.patch("csaxs_bec.devices.epics.eiger9m_csaxs.StdDaqClient") as mock_std_daq:
|
||||
instance = mock_std_daq.return_value
|
||||
instance.stop_writer.return_value = None
|
||||
instance.get_status.return_value = daq_status
|
||||
instance.get_config.return_value = daq_cfg
|
||||
mock_det.scaninfo.username = eacc
|
||||
# scaninfo.username.return_value = eacc
|
||||
if expected_exception:
|
||||
with pytest.raises(Exception):
|
||||
mock_det.timeout = 0.1
|
||||
mock_det.custom_prepare.initialize_detector_backend()
|
||||
else:
|
||||
mock_det.custom_prepare.initialize_detector_backend()
|
||||
|
||||
instance.stop_writer.assert_called_once()
|
||||
instance.get_status.assert_called()
|
||||
instance.set_config.assert_called_once_with(daq_cfg)
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"scaninfo, daq_status, daq_cfg, detector_state, stopped, expected_exception",
|
||||
[
|
||||
(
|
||||
{
|
||||
"eacc": "e12345",
|
||||
"num_points": 500,
|
||||
"frames_per_trigger": 1,
|
||||
"filepath": "test.h5",
|
||||
"scan_id": "123",
|
||||
"mokev": 12.4,
|
||||
},
|
||||
{"state": "READY"},
|
||||
{"writer_user_id": 12543},
|
||||
5,
|
||||
False,
|
||||
False,
|
||||
),
|
||||
(
|
||||
{
|
||||
"eacc": "e12345",
|
||||
"num_points": 500,
|
||||
"frames_per_trigger": 1,
|
||||
"filepath": "test.h5",
|
||||
"scan_id": "123",
|
||||
"mokev": 12.4,
|
||||
},
|
||||
{"state": "BUSY"},
|
||||
{"writer_user_id": 15421},
|
||||
5,
|
||||
False,
|
||||
False,
|
||||
),
|
||||
(
|
||||
{
|
||||
"eacc": "e12345",
|
||||
"num_points": 500,
|
||||
"frames_per_trigger": 1,
|
||||
"filepath": "test.h5",
|
||||
"scan_id": "123",
|
||||
"mokev": 18.4,
|
||||
},
|
||||
{"state": "READY"},
|
||||
{"writer_user_id": 12345},
|
||||
4,
|
||||
False,
|
||||
True,
|
||||
),
|
||||
],
|
||||
)
|
||||
def test_stage(
|
||||
mock_det, scaninfo, daq_status, daq_cfg, detector_state, stopped, expected_exception
|
||||
):
|
||||
with (
|
||||
mock.patch.object(mock_det.custom_prepare, "std_client") as mock_std_daq,
|
||||
mock.patch.object(
|
||||
mock_det.custom_prepare, "publish_file_location"
|
||||
) as mock_publish_file_location,
|
||||
):
|
||||
mock_std_daq.stop_writer.return_value = None
|
||||
mock_std_daq.get_status.return_value = daq_status
|
||||
mock_std_daq.get_config.return_value = daq_cfg
|
||||
mock_det.scaninfo.num_points = scaninfo["num_points"]
|
||||
mock_det.scaninfo.frames_per_trigger = scaninfo["frames_per_trigger"]
|
||||
mock_det.filewriter.compile_full_filename.return_value = scaninfo["filepath"]
|
||||
# TODO consider putting energy as variable in scaninfo
|
||||
mock_det.device_manager.add_device("mokev", value=12.4)
|
||||
mock_det.cam.beam_energy.put(scaninfo["mokev"])
|
||||
mock_det.stopped = stopped
|
||||
mock_det.cam.detector_state._read_pv.mock_data = detector_state
|
||||
with mock.patch.object(mock_det.custom_prepare, "prepare_data_backend") as mock_prep_fw:
|
||||
mock_det.filepath.set(scaninfo["filepath"]).wait()
|
||||
if expected_exception:
|
||||
with pytest.raises(Exception):
|
||||
mock_det.timeout = 0.1
|
||||
mock_det.stage()
|
||||
else:
|
||||
mock_det.stage()
|
||||
mock_prep_fw.assert_called_once()
|
||||
# Check _prep_det
|
||||
assert mock_det.cam.num_images.get() == int(
|
||||
scaninfo["num_points"] * scaninfo["frames_per_trigger"]
|
||||
)
|
||||
assert mock_det.cam.num_frames.get() == 1
|
||||
|
||||
mock_publish_file_location.assert_called_with(done=False, successful=False)
|
||||
assert mock_det.cam.acquire.get() == 1
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"scaninfo, daq_status, expected_exception",
|
||||
[
|
||||
(
|
||||
{
|
||||
"eacc": "e12345",
|
||||
"num_points": 500,
|
||||
"frames_per_trigger": 1,
|
||||
"filepath": "test.h5",
|
||||
"scan_id": "123",
|
||||
},
|
||||
{"state": "BUSY", "acquisition": {"state": "WAITING_IMAGES"}},
|
||||
False,
|
||||
),
|
||||
(
|
||||
{
|
||||
"eacc": "e12345",
|
||||
"num_points": 500,
|
||||
"frames_per_trigger": 1,
|
||||
"filepath": "test.h5",
|
||||
"scan_id": "123",
|
||||
},
|
||||
{"state": "BUSY", "acquisition": {"state": "WAITING_IMAGES"}},
|
||||
False,
|
||||
),
|
||||
(
|
||||
{
|
||||
"eacc": "e12345",
|
||||
"num_points": 500,
|
||||
"frames_per_trigger": 1,
|
||||
"filepath": "test.h5",
|
||||
"scan_id": "123",
|
||||
},
|
||||
{"state": "BUSY", "acquisition": {"state": "ERROR"}},
|
||||
True,
|
||||
),
|
||||
],
|
||||
)
|
||||
def test_prepare_detector_backend(mock_det, scaninfo, daq_status, expected_exception):
|
||||
with (
|
||||
mock.patch.object(mock_det.custom_prepare, "std_client") as mock_std_daq,
|
||||
mock.patch.object(mock_det.custom_prepare, "filepath_exists") as mock_file_path_exists,
|
||||
mock.patch.object(mock_det.custom_prepare, "stop_detector_backend") as mock_stop_backend,
|
||||
mock.patch.object(mock_det, "scaninfo"),
|
||||
):
|
||||
mock_std_daq.start_writer_async.return_value = None
|
||||
mock_std_daq.get_status.return_value = daq_status
|
||||
mock_det.filewriter.compile_full_filename.return_value = scaninfo["filepath"]
|
||||
mock_det.scaninfo.num_points = scaninfo["num_points"]
|
||||
mock_det.scaninfo.frames_per_trigger = scaninfo["frames_per_trigger"]
|
||||
|
||||
if expected_exception:
|
||||
with pytest.raises(Exception):
|
||||
mock_det.timeout = 0.1
|
||||
mock_det.custom_prepare.prepare_data_backend()
|
||||
mock_file_path_exists.assert_called_once()
|
||||
assert mock_stop_backend.call_count == 2
|
||||
|
||||
else:
|
||||
mock_det.custom_prepare.prepare_data_backend()
|
||||
mock_file_path_exists.assert_called_once()
|
||||
mock_stop_backend.assert_called_once()
|
||||
|
||||
daq_writer_call = {
|
||||
"output_file": scaninfo["filepath"],
|
||||
"n_images": int(scaninfo["num_points"] * scaninfo["frames_per_trigger"]),
|
||||
}
|
||||
mock_std_daq.start_writer_async.assert_called_with(daq_writer_call)
|
||||
|
||||
|
||||
@pytest.mark.parametrize("stopped, expected_exception", [(False, False), (True, True)])
|
||||
def test_complete(mock_det, stopped, expected_exception):
|
||||
with (
|
||||
mock.patch.object(mock_det.custom_prepare, "finished") as mock_finished,
|
||||
mock.patch.object(
|
||||
mock_det.custom_prepare, "publish_file_location"
|
||||
) as mock_publish_file_location,
|
||||
):
|
||||
mock_det.stopped = stopped
|
||||
if expected_exception:
|
||||
mock_det.complete()
|
||||
assert mock_det.stopped is True
|
||||
else:
|
||||
mock_det.complete()
|
||||
mock_finished.assert_called_once()
|
||||
mock_publish_file_location.assert_called_with(done=True, successful=True)
|
||||
assert mock_det.stopped is False
|
||||
|
||||
|
||||
def test_stop_detector_backend(mock_det):
|
||||
with mock.patch.object(mock_det.custom_prepare, "std_client") as mock_std_daq:
|
||||
mock_std_daq.stop_writer.return_value = None
|
||||
mock_det.std_client = mock_std_daq
|
||||
mock_det.custom_prepare.stop_detector_backend()
|
||||
mock_std_daq.stop_writer.assert_called_once()
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"scaninfo",
|
||||
[
|
||||
({"filepath": "test.h5", "successful": True, "done": False, "scan_id": "123"}),
|
||||
({"filepath": "test.h5", "successful": False, "done": True, "scan_id": "123"}),
|
||||
],
|
||||
)
|
||||
def test_publish_file_location(mock_det, scaninfo):
|
||||
mock_det.scaninfo.scan_id = scaninfo["scan_id"]
|
||||
mock_det.filepath.set(scaninfo["filepath"]).wait()
|
||||
mock_det.custom_prepare.publish_file_location(
|
||||
done=scaninfo["done"], successful=scaninfo["successful"]
|
||||
)
|
||||
if scaninfo["successful"] is None:
|
||||
msg = messages.FileMessage(file_path=scaninfo["filepath"], done=scaninfo["done"])
|
||||
else:
|
||||
msg = messages.FileMessage(
|
||||
file_path=scaninfo["filepath"], done=scaninfo["done"], successful=scaninfo["successful"]
|
||||
)
|
||||
expected_calls = [
|
||||
mock.call(
|
||||
MessageEndpoints.public_file(scaninfo["scan_id"], mock_det.name),
|
||||
msg,
|
||||
pipe=mock_det.connector.pipeline.return_value,
|
||||
),
|
||||
mock.call(
|
||||
MessageEndpoints.file_event(mock_det.name),
|
||||
msg,
|
||||
pipe=mock_det.connector.pipeline.return_value,
|
||||
),
|
||||
]
|
||||
assert mock_det.connector.set_and_publish.call_args_list == expected_calls
|
||||
|
||||
|
||||
def test_stop(mock_det):
|
||||
with (
|
||||
mock.patch.object(mock_det.custom_prepare, "stop_detector") as mock_stop_det,
|
||||
mock.patch.object(
|
||||
mock_det.custom_prepare, "stop_detector_backend"
|
||||
) as mock_stop_detector_backend,
|
||||
):
|
||||
mock_det.stop()
|
||||
mock_stop_det.assert_called_once()
|
||||
mock_stop_detector_backend.assert_called_once()
|
||||
assert mock_det.stopped is True
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"stopped, scaninfo, cam_state, daq_status, expected_exception",
|
||||
[
|
||||
(
|
||||
False,
|
||||
{"num_points": 500, "frames_per_trigger": 4},
|
||||
0,
|
||||
{"acquisition": {"state": "FINISHED", "stats": {"n_write_completed": 2000}}},
|
||||
False,
|
||||
),
|
||||
(
|
||||
False,
|
||||
{"num_points": 500, "frames_per_trigger": 4},
|
||||
0,
|
||||
{"acquisition": {"state": "FINISHED", "stats": {"n_write_completed": 1999}}},
|
||||
True,
|
||||
),
|
||||
(
|
||||
False,
|
||||
{"num_points": 500, "frames_per_trigger": 1},
|
||||
1,
|
||||
{"acquisition": {"state": "READY", "stats": {"n_write_completed": 500}}},
|
||||
True,
|
||||
),
|
||||
(
|
||||
False,
|
||||
{"num_points": 500, "frames_per_trigger": 1},
|
||||
0,
|
||||
{"acquisition": {"state": "FINISHED", "stats": {"n_write_completed": 500}}},
|
||||
False,
|
||||
),
|
||||
],
|
||||
)
|
||||
def test_finished(mock_det, stopped, cam_state, daq_status, scaninfo, expected_exception):
|
||||
with (
|
||||
mock.patch.object(mock_det.custom_prepare, "std_client") as mock_std_daq,
|
||||
mock.patch.object(mock_det.custom_prepare, "stop_detector_backend") as mock_stop_backend,
|
||||
mock.patch.object(mock_det.custom_prepare, "stop_detector") as mock_stop_det,
|
||||
):
|
||||
mock_std_daq.get_status.return_value = daq_status
|
||||
mock_det.cam.acquire._read_pv.mock_state = cam_state
|
||||
mock_det.scaninfo.num_points = scaninfo["num_points"]
|
||||
mock_det.scaninfo.frames_per_trigger = scaninfo["frames_per_trigger"]
|
||||
if expected_exception:
|
||||
with pytest.raises(Exception):
|
||||
mock_det.timeout = 0.1
|
||||
mock_det.custom_prepare.finished()
|
||||
assert mock_det.stopped is stopped
|
||||
else:
|
||||
mock_det.custom_prepare.finished()
|
||||
if stopped:
|
||||
assert mock_det.stopped is stopped
|
||||
|
||||
mock_stop_backend.assert_called()
|
||||
mock_stop_det.assert_called_once()
|
||||
@@ -96,11 +96,9 @@ def test_mcs_card_csaxs_on_connected(mock_mcs_csaxs):
|
||||
assert mcs.read_mode.get() == READMODE.PASSIVE
|
||||
assert mcs.acquire_mode.get() == ACQUIREMODE.MCS
|
||||
|
||||
with mock.patch.object(mcs.current_channel, "subscribe") as mock_cur_ch_subscribe:
|
||||
with mock.patch.object(mcs.counters.mca1, "subscribe") as mock_mca_subscribe:
|
||||
mcs.on_connected()
|
||||
assert mock_cur_ch_subscribe.call_args == mock.call(mcs._progress_update, run=False)
|
||||
assert mock_mca_subscribe.call_args == mock.call(mcs._on_counter_update, run=False)
|
||||
with mock.patch.object(mcs.counters.mca1, "subscribe") as mock_mca_subscribe:
|
||||
mcs.on_connected()
|
||||
assert mock_mca_subscribe.call_args == mock.call(mcs._on_counter_update, run=False)
|
||||
|
||||
|
||||
def test_mcs_card_csaxs_stage(mock_mcs_csaxs):
|
||||
|
||||
Reference in New Issue
Block a user