Jungfrau Joch integration #133

Merged
appel_c merged 10 commits from fix/fj_integration into main 2026-02-16 14:14:10 +01:00
9 changed files with 598 additions and 165 deletions

View File

@@ -9,6 +9,17 @@ eiger_1_5:
readoutPriority: async
softwareTrigger: False
eiger_9:
description: Eiger 9M detector
deviceClass: csaxs_bec.devices.jungfraujoch.eiger_9m.Eiger9M
deviceConfig:
detector_distance: 100
beam_center: [0, 0]
onFailure: raise
enabled: true
readoutPriority: async
softwareTrigger: False
ids_cam:
description: IDS camera for live image acquisition
deviceClass: csaxs_bec.devices.ids_cameras.IDSCamera

View File

@@ -317,10 +317,14 @@ class MCSCardCSAXS(PSIDeviceBase, MCSCard):
old_value: Previous value of the signal.
value: New value of the signal.
"""
scan_done = bool(value == self._num_total_triggers)
self.progress.put(value=value, max_value=self._num_total_triggers, done=scan_done)
if scan_done:
self._scan_done_event.set()
try:
scan_done = bool(value == self._num_total_triggers)
self.progress.put(value=value, max_value=self._num_total_triggers, done=scan_done)
if scan_done:
self._scan_done_event.set()
except Exception:
content = traceback.format_exc()
logger.info(f"Device {self.name} error: {content}")
def on_stage(self) -> None:
"""

View File

@@ -0,0 +1,48 @@
# Overview
Integration module for Eiger detectors at the cSAXS beamline with JungfrauJoch backend.
There are currently two supported Eiger detectors:
- EIGER 1.5M
- EIGER 9M
This module provides a base integration for both detectors. A short list of useful
information is also provided below.
## JungfrauJoch Service
The JungfrauJoch WEB UI is available on http://sls-jfjoch-001:8080. This is an interface
to the broker which runs on sls-jfjoch-001.psi.ch. The writer service runs on
xbl-daq-34.psi.ch. Permissions to get access to these machines and run systemctl or
journalctl commands can be requested with the Infrastructure and Services group in AWI.
Beamline scientists need to check if they have the necessary permissions to connect
to these machines and run the commands below.
Useful commands for the broker service on sls-jfjoch-001.psi.ch:
- sudo systemctl status jfjoch_broker # Check status
- sudo systemctl start jfjoch_broker # Start service
- sudo systemctl stop jfjoch_broker # Stop service
- sudo systemctl restart jfjoch_broker # Restart service
For the writer service on xbl-daq-34.psi.ch:
- sudo journalctl -u jfjoch_writer -f # streams live logs
- sudo systemctl status jfjoch_writer # Check status
- sudo systemctl start jfjoch_writer # Start service
- sudo systemctl stop jfjoch_writer # Stop service
- sudo systemctl restart jfjoch_writer # Restart service
More information about the JungfrauJoch and API client can be found at: (https://jungfraujoch.readthedocs.io/en/latest/index.html)
### JungfrauJoch API Client
A thin wrapper for the JungfrauJoch API client is provided in the [jungfrau_joch_client](./jungfrau_joch_client.py).
Details about the specific integration are provided in the code.
## Eiger implementation
The Eiger detector integration is provided in the [eiger.py](./eiger.py) module. It provides a base integration for both Eiger 1.5M and Eiger 9M detectors.
Logic specific to each detector is implemented in the respective modules:
- [eiger_1_5m.py](./eiger_1_5m.py)
- [eiger_9m.py](./eiger_9m.py)
With the current implementation, the detector initialization should be done by a beamline scientist through the JungfrauJoch WEB UI by choosing the
appropriate detector (1.5M or 9M) before loading the device config with BEC. BEC will check upon connecting if the selected detector matches the expected one.
A preview stream for images is also provided which is forwarded and accessible through the `preview_image` signal.
For more specific details, please check the code documentation.

View File

@@ -1,34 +1,23 @@
"""
Generic integration of JungfrauJoch backend with Eiger detectors
for the cSAXS beamline at the Swiss Light Source.
The WEB UI is available on http://sls-jfjoch-001:8080
Integration module for Eiger detectors at the cSAXS beamline with JungfrauJoch backend.
NOTE: this may not be the best place to store this information. It should be migrated to
beamline documentation for debugging of Eiger & JungfrauJoch.
A few notes on setup and operation of the Eiger detectors through the JungfrauJoch broker:
The JungfrauJoch server for cSAXS runs on sls-jfjoch-001.psi.ch
User with sufficient rights may use:
- sudo systemctl restart jfjoch_broker
- sudo systemctl status jfjoch_broker
to check and/or restart the broker for the JungfrauJoch server.
Some extra notes for setting up the detector:
- If the energy on JFJ is set via DetectorSettings, the variable in DatasetSettings will be ignored
- Changes in energy may take time, good to implement logic that only resets energy if needed.
- For the Eiger, the frame_time_us in DetectorSettings is ignored, only the frame_time_us in
the DatasetSettings is relevant
- The bit_depth will be adjusted automatically based on the exp_time. Here, we need to ensure
that subsequent triggers properly
consider the readout_time of the boards. For Jungfrau detectors, the difference between
count_time_us and frame_time_us is the readout_time of the boards. For the Eiger, this needs
to be taken into account during the integration.
that subsequent triggers properly consider the readout_time of the boards. For the Eiger detectors
at cSAXS, a readout time of 20us is configured through the JungfrauJoch deployment config. This
setting is sufficiently large for the detectors if they run in parallel mode.
- beam_center and detector settings are required input arguments, thus, they may be set to wrong
values for acquisitions to start. Please keep this in mind.
Hardware related notes:
- If there is an HW issue with the detector, power cycling may help.
- The sls_detector package is available on console on /sls/X12SA/data/gac-x12sa/erik/micromamba
- The sls_detector package is available on console on /sls/x12sa/applications/erik/micromamba
- Run: source setup_9m.sh # Be careful, this connects to the detector, so it should not be
used during operation
- Useful commands:
@@ -39,9 +28,6 @@ Hardware related notes:
- cd power_control_user/
- ./on
- ./off
Further information that may be relevant for debugging:
JungfrauJoch - one needs to connect to the jfj-server (sls-jfjoch-001)
"""
from __future__ import annotations
@@ -84,10 +70,19 @@ class EigerError(Exception):
class Eiger(PSIDeviceBase):
"""
Base integration of the Eiger1.5M and Eiger9M at cSAXS. All relevant
Base integration of the Eiger1.5M and Eiger9M at cSAXS.
Args:
name (str) : Name of the device
detector_name (str): Name of the detector. Supports ["EIGER 9M", "EIGER 8.5M (tmp)", "EIGER 1.5M"]
host (str): Hostname of the Jungfrau Joch server.
port (int): Port of the Jungfrau Joch server.
scan_info (ScanInfo): The scan info to use.
device_manager (DeviceManagerDS): The device manager to use.
**kwargs: Additional keyword arguments.
"""
USER_ACCESS = ["detector_distance", "beam_center"]
USER_ACCESS = ["set_detector_distance", "set_beam_center"]
file_event = Cpt(FileEventSignal, name="file_event")
preview_image = Cpt(PreviewSignal, name="preview_image", ndim=2)
@@ -105,23 +100,12 @@ class Eiger(PSIDeviceBase):
device_manager=None,
**kwargs,
):
"""
Initialize the PSI Device Base class.
Args:
name (str) : Name of the device
detector_name (str): Name of the detector. Supports ["EIGER 9M", "EIGER 8.5M (tmp)", "EIGER 1.5M"]
host (str): Hostname of the Jungfrau Joch server.
port (int): Port of the Jungfrau Joch server.
scan_info (ScanInfo): The scan info to use.
device_manager (DeviceManagerDS): The device manager to use.
**kwargs: Additional keyword arguments.
"""
super().__init__(name=name, scan_info=scan_info, device_manager=device_manager, **kwargs)
self._host = f"{host}:{port}"
self.jfj_client = JungfrauJochClient(host=self._host, parent=self)
# NOTE: fetch this information from JungfrauJochClient during on_connected!
self.jfj_preview_client = JungfrauJochPreview(
url="tcp://129.129.95.114:5400", cb=self.preview_image.put
url="tcp://129.129.95.114:5400", cb=self._preview_callback
) # IP of sls-jfjoch-001.psi.ch on port 5400 for ZMQ stream
self.device_manager = device_manager
self.detector_name = detector_name
@@ -129,53 +113,102 @@ class Eiger(PSIDeviceBase):
self._beam_center = beam_center
self._readout_time = readout_time
self._full_path = ""
self._num_triggers = 0
self._wait_for_on_complete = 20 # seconds
if self.device_manager is not None:
self.device_manager: DeviceManagerDS
def _preview_callback(self, message: dict) -> None:
"""
Callback method for handling preview messages as received from the JungfrauJoch preview stream.
These messages are dictionary dumps as described in the JFJ ZMQ preview stream documentation.
(https://jungfraujoch.readthedocs.io/en/latest/ZEROMQ_STREAM.html#preview-stream).
Args:
message (dict): The message received from the preview stream.
"""
if message.get("type", "") == "image":
data = message.get("data", {}).get("default", None)
if data is None:
logger.error(f"Received image message on device {self.name} without data.")
return
logger.info(f"Received preview image on device {self.name}")
self.preview_image.put(data)
# pylint: disable=missing-function-docstring
@property
def detector_distance(self) -> float:
"""The detector distance in mm."""
return self._detector_distance
@detector_distance.setter
def detector_distance(self, value: float) -> None:
"""Set the detector distance in mm."""
if value <= 0:
raise ValueError("Detector distance must be a positive value.")
self._detector_distance = value
def set_detector_distance(self, distance: float) -> None:
"""
Set the detector distance in mm.
Args:
distance (float): The detector distance in mm.
"""
self.detector_distance = distance
# pylint: disable=missing-function-docstring
@property
def beam_center(self) -> tuple[float, float]:
"""The beam center in pixels. (x,y)"""
return self._beam_center
@beam_center.setter
def beam_center(self, value: tuple[float, float]) -> None:
"""Set the beam center in pixels. (x,y)"""
if any(coord < 0 for coord in value):
raise ValueError("Beam center coordinates must be non-negative.")
self._beam_center = value
def on_init(self) -> None:
def set_beam_center(self, x: float, y: float) -> None:
"""
Called when the device is initialized.
Set the beam center coordinates in pixels.
No siganls are connected at this point,
thus should not be set here but in on_connected instead.
Args:
x (float): The x coordinate of the beam center in pixels.
y (float): The y coordinate of the beam center in pixels.
"""
self.beam_center = (x, y)
def on_init(self) -> None:
"""Hook called during device initialization."""
# pylint: disable=arguments-differ
def wait_for_connection(self, timeout: float = 10) -> None:
"""
Wait for the device to be connected to the JungfrauJoch backend.
Args:
timeout (float): Timeout in seconds to wait for the connection.
"""
self.jfj_client.api.status_get(_request_timeout=timeout) # If connected, this responds
def on_connected(self) -> None:
"""
Hook called after the device is connected to through the device server.
Called after the device is connected and its signals are connected.
Default values for signals should be set here.
Default values for signals should be set here. Currently, the detector needs to be
initialised manually through the WEB UI of JungfrauJoch. Once agreed upon, the automated
initialisation can be re-enabled here (code commented below).
"""
start_time = time.time()
logger.debug(f"On connected called for {self.name}")
self.jfj_client.stop(request_timeout=3)
# Check which detector is selected
# Get available detectors
available_detectors = self.jfj_client.api.config_select_detector_get(_request_timeout=5)
logger.debug(f"Available detectors {available_detectors}")
# Get current detector
current_detector_name = ""
if available_detectors.current_id:
if available_detectors.current_id is not None:
detector_selection = [
det.description
for det in available_detectors.detectors
@@ -190,8 +223,9 @@ class Eiger(PSIDeviceBase):
raise RuntimeError(
f"Detector {self.detector_name} is not in IDLE state, current state: {self.jfj_client.detector_state}. Please initialize the detector in the WEB UI: {self._host}."
)
# TODO - check again once Eiger should be initialized automatically, currently human initialization is expected
# # Once the automation should be enabled, we may use here
# TODO - Currently the initialisation of the detector is done manually through the WEB UI. Once adjusted
# this can be automated here again.
# detector_selection = [
# det for det in available_detectors.detectors if det.id == self.detector_name
# ]
@@ -207,41 +241,51 @@ class Eiger(PSIDeviceBase):
# Setup Detector settings, here we may also set the energy already as this might be time consuming
settings = DetectorSettings(frame_time_us=int(500), timing=DetectorTiming.TRIGGER)
self.jfj_client.set_detector_settings(settings, timeout=10)
self.jfj_client.set_detector_settings(settings, timeout=5)
# Set the file writer to the appropriate output for the HDF5 file
file_writer_settings = FileWriterSettings(overwrite=True, format=FileWriterFormat.NXMXVDS)
logger.debug(
f"Setting writer_settings: {yaml.dump(file_writer_settings.to_dict(), indent=4)}"
)
# Setup the file writer settings
self.jfj_client.api.config_file_writer_put(
file_writer_settings=file_writer_settings, _request_timeout=10
)
# Start the preview client
self.jfj_preview_client.connect()
self.jfj_preview_client.start()
logger.info(f"Connected to JungfrauJoch preview stream at {self.jfj_preview_client.url}")
logger.info(
f"Device {self.name} initialized after {time.time()-start_time:.2f}s. Preview stream connected on url: {self.jfj_preview_client.url}"
)
def on_stage(self) -> DeviceStatus | None:
"""
Called while staging the device.
Information about the upcoming scan can be accessed from the scan_info object.
Hook called when staging the device. Information about the upcoming scan can be accessed from the scan_info object.
scan_msg = self.scan_info.msg
"""
start_time = time.time()
scan_msg = self.scan_info.msg
# Set acquisition parameter
# TODO add check of mono energy, this can then also be passed to DatasetSettings
# TODO: Check mono energy from device in BEC
# Setting incident energy in keV
incident_energy = 12.0
# Setting up exp_time and num_triggers acquisition parameter
exp_time = scan_msg.scan_parameters.get("exp_time", 0)
if exp_time <= self._readout_time:
if exp_time <= self._readout_time: # Exp_time must be at least the readout time
raise ValueError(
f"Receive scan request for scan {scan_msg.scan_name} with exp_time {exp_time}s, which must be larger than the readout time {self._readout_time}s of the detector {self.detector_name}."
f"Value error on device {self.name}: Exposure time {exp_time}s is less than readout time {self._readout_time}s."
)
frame_time_us = exp_time #
ntrigger = int(scan_msg.num_points * scan_msg.scan_parameters["frames_per_trigger"])
# Fetch file path
self._num_triggers = int(
scan_msg.num_points * scan_msg.scan_parameters["frames_per_trigger"]
)
# Setting up the full path for file writing
self._full_path = get_full_path(scan_msg, name=f"{self.name}_master")
self._full_path = os.path.abspath(os.path.expanduser(self._full_path))
# Inform BEC about upcoming file event
self.file_event.put(
file_path=self._full_path,
@@ -249,11 +293,14 @@ class Eiger(PSIDeviceBase):
successful=False,
hinted_h5_entries={"data": "entry/data/data"},
)
# JFJ adds _master.h5 automatically
path = os.path.relpath(self._full_path, start="/sls/x12sa/data").removesuffix("_master.h5")
# Create dataset settings for API call.
data_settings = DatasetSettings(
image_time_us=int(frame_time_us * 1e6), # This is currently ignored
ntrigger=ntrigger,
image_time_us=int(exp_time * 1e6),
ntrigger=self._num_triggers,
file_prefix=path,
beam_x_pxl=int(self._beam_center[0]),
beam_y_pxl=int(self._beam_center[1]),
@@ -261,11 +308,15 @@ class Eiger(PSIDeviceBase):
incident_energy_ke_v=incident_energy,
)
logger.debug(f"Setting data_settings: {yaml.dump(data_settings.to_dict(), indent=4)}")
prep_time = start_time - time.time()
logger.debug(f"Prepared information for eiger to start acquisition in {prep_time:.2f}s")
self.jfj_client.wait_for_idle(timeout=10, request_timeout=10) # Ensure we are in IDLE state
prep_time = time.time()
self.jfj_client.wait_for_idle(timeout=10) # Ensure we are in IDLE state
self.jfj_client.start(settings=data_settings) # Takes around ~0.6s
logger.debug(f"Wait for IDLE and start call took {time.time()-start_time-prep_time:.2f}s")
# Time the stage process
logger.info(
f"Device {self.name} staged for scan. Time spent {time.time()-start_time:.2f}s,"
f" with {time.time()-prep_time:.2f}s spent with communication to JungfrauJoch."
)
def on_unstage(self) -> DeviceStatus:
"""Called while unstaging the device."""
@@ -278,7 +329,9 @@ class Eiger(PSIDeviceBase):
def _file_event_callback(self, status: DeviceStatus) -> None:
"""Callback to update the file_event signal when the acquisition is done."""
logger.info(f"Acquisition done callback called for {self.name} for status {status.success}")
logger.debug(
f"File event callback on complete status for device {self.name}: done={status.done}, successful={status.success}"
)
self.file_event.put(
file_path=self._full_path,
done=status.done,
@@ -287,19 +340,44 @@ class Eiger(PSIDeviceBase):
)
def on_complete(self) -> DeviceStatus:
"""Called to inquire if a device has completed a scans."""
"""
Called at the end of the scan. The method should implement an asynchronous wait for the
device to complete the acquisition. A callback to update the file_event signal is
attached that resolves the file event when the acquisition is done.
Returns:
DeviceStatus: The status object representing the completion of the acquisition.
"""
def wait_for_complete():
start_time = time.time()
timeout = 10
for _ in range(timeout):
if self.jfj_client.wait_for_idle(timeout=1, request_timeout=10):
# NOTE: This adjust the time (s) that should be waited for completion of the scan.
timeout = self._wait_for_on_complete
while time.time() - start_time < timeout:
if self.jfj_client.wait_for_idle(timeout=1, raise_on_timeout=False):
# TODO: Once available, add check for
statistics: MeasurementStatistics = (
self.jfj_client.api.statistics_data_collection_get(_request_timeout=5)
)
if statistics.images_collected < self._num_triggers:
raise EigerError(
f"Device {self.name} acquisition incomplete. "
f"Expected {self._num_triggers} triggers, "
f"but only {statistics.images_collected} were collected."
)
return
logger.info(
f"Waiting for device {self.name} to finish complete, time elapsed: "
f"{time.time() - start_time}."
)
statistics: MeasurementStatistics = self.jfj_client.api.statistics_data_collection_get(
_request_timeout=5
)
broker_status = self.jfj_client.jfj_status
raise TimeoutError(
f"Timeout after waiting for detector {self.name} to complete for {time.time()-start_time:.2f}s, measurement statistics: {yaml.dump(statistics.to_dict(), indent=4)}"
f"Timeout after waiting for device {self.name} to complete for {time.time()-start_time:.2f}s \n \n"
f"Broker status: \n{yaml.dump(broker_status.to_dict(), indent=4)} \n \n"
f"Measurement statistics: \n{yaml.dump(statistics.to_dict(), indent=4)}"
)
status = self.task_handler.submit_task(wait_for_complete, run=True)
@@ -312,7 +390,11 @@ class Eiger(PSIDeviceBase):
def on_stop(self) -> None:
"""Called when the device is stopped."""
self.jfj_client.stop(
request_timeout=0.5
) # Call should not block more than 0.5 seconds to stop all devices...
self.jfj_client.stop(request_timeout=0.5)
self.task_handler.shutdown()
def on_destroy(self):
"""Called when the device is destroyed."""
self.jfj_preview_client.stop()
self.on_stop()
return super().on_destroy()

View File

@@ -21,18 +21,18 @@ if TYPE_CHECKING: # pragma no cover
from bec_server.device_server.device_server import DeviceManagerDS
EIGER9M_READOUT_TIME_US = 500e-6 # 500 microseconds in s
DETECTOR_NAME = "EIGER 8.5M (tmp)" # "EIGER 9M""
DETECTOR_NAME = "EIGER 9M" # "EIGER 9M""
# pylint:disable=invalid-name
class Eiger9M(Eiger):
"""
Eiger 1.5M specific integration for the in-vaccum Eiger.
EIGER 9M specific integration for the in-vaccum Eiger.
The logic implemented here is coupled to the DelayGenerator integration,
repsonsible for the global triggering of all devices through a single Trigger logic.
Please check the eiger.py class for more details about the integration of relevant backend
services. The detector_name must be set to "EIGER 1.5M:
services. The detector_name must be set to "EIGER 9M":
"""
USER_ACCESS = Eiger.USER_ACCESS + [] # Add more user_access methods here.

View File

@@ -1,13 +1,15 @@
"""Module with client interface for the Jungfrau Joch detector API"""
"""Module with a thin client wrapper around the Jungfrau Joch detector API"""
from __future__ import annotations
import enum
import threading
import time
import traceback
from typing import TYPE_CHECKING
import requests
import yaml
from bec_lib.logger import bec_logger
from jfjoch_client.api.default_api import DefaultApi
from jfjoch_client.api_client import ApiClient
@@ -18,7 +20,7 @@ from jfjoch_client.models.detector_settings import DetectorSettings
logger = bec_logger.logger
if TYPE_CHECKING:
if TYPE_CHECKING: # pragma: no cover
from ophyd import Device
@@ -29,7 +31,10 @@ class JungfrauJochClientError(Exception):
class DetectorState(str, enum.Enum):
"""Possible Detector states for Jungfrau Joch detector"""
"""
Enum states of the BrokerStatus state. The pydantic model validates in runtime,
thus we keep the possible states here for a convenient overview and access.
"""
INACTIVE = "Inactive"
IDLE = "Idle"
@@ -40,13 +45,15 @@ class DetectorState(str, enum.Enum):
class JungfrauJochClient:
"""Thin wrapper around the Jungfrau Joch API client.
"""
Jungfrau Joch API client wrapper. It provides a thin wrapper methods around the API client,
that allow to connect, initialise, wait for state changes, set settings, start and stop
acquisitions.
sudo systemctl restart jfjoch_broker
sudo systemctl status jfjoch_broker
It looks as if the detector is not being stopped properly.
One module remains running, how can we restart the detector?
Args:
host (str): Hostname of the Jungfrau Joch broker service.
Default is "http://sls-jfjoch-001:8080"
parent (Device, optional): Parent ophyd device, used for logging purposes.
"""
def __init__(
@@ -59,50 +66,63 @@ class JungfrauJochClient:
self._parent_name = parent.name if parent else self.__class__.__name__
@property
def jjf_state(self) -> BrokerStatus:
"""Get the status of JungfrauJoch"""
def jfj_status(self) -> BrokerStatus:
"""Broker status of JungfrauJoch."""
response = self.api.status_get()
return BrokerStatus(**response.to_dict())
# pylint: disable=missing-function-docstring
@property
def initialised(self) -> bool:
"""Check if jfj is connected and ready to receive commands"""
return self._initialised
@initialised.setter
def initialised(self, value: bool) -> None:
"""Set the connected status"""
self._initialised = value
# TODO this is not correct, as it may be that the state in INACTIVE. Models are not in sync...
# REMOVE all model enums as most of the validation takes place in the Pydantic models, i.e. BrokerStatus here..
# pylint: disable=missing-function-docstring
@property
def detector_state(self) -> DetectorState:
"""Get the status of JungfrauJoch"""
return DetectorState(self.jjf_state.state)
return DetectorState(self.jfj_status.state)
def connect_and_initialise(self, timeout: int = 10, **kwargs) -> None:
"""Check if JungfrauJoch is connected and ready to receive commands"""
def connect_and_initialise(self, timeout: int = 10) -> None:
"""
Connect and initialise the JungfrauJoch detector. The detector must be in
IDLE state to become initialised. This is a blocking call, the timeout parameter
will be passed to the HTTP requests timeout method of the wait_for_idle method.
Args:
timeout (int): Timeout in seconds for the initialisation and waiting for IDLE state.
"""
status = self.detector_state
# TODO: #135 Check if the detector has to be in INACTIVE state before initialisation
if status != DetectorState.IDLE:
self.api.initialize_post() # This is a blocking call....
self.wait_for_idle(timeout, request_timeout=timeout) # Blocking call
self.api.initialize_post()
self.wait_for_idle(timeout)
self.initialised = True
def set_detector_settings(self, settings: dict | DetectorSettings, timeout: int = 10) -> None:
"""Set the detector settings. JungfrauJoch must be in IDLE, Error or Inactive state.
Note, the full settings have to be provided, otherwise the settings will be overwritten with default values.
"""
Set the detector settings. The state of JungfrauJoch must be in IDLE,
Error or Inactive state. Please note: a full set of setttings has to be provided,
otherwise the settings will be overwritten with default values.
Args:
settings (dict): dictionary of settings
timeout (int): Timeout in seconds for the HTTP request to set the settings.
"""
state = self.detector_state
if state not in [DetectorState.IDLE, DetectorState.ERROR, DetectorState.INACTIVE]:
logger.info(
f"JungfrauJoch backend fo device {self._parent_name} is not in IDLE state,"
" waiting 1s before retrying..."
)
time.sleep(1) # Give the detector 1s to become IDLE, retry
state = self.detector_state
if state not in [DetectorState.IDLE, DetectorState.ERROR, DetectorState.INACTIVE]:
raise JungfrauJochClientError(
f"Error in {self._parent_name}. Detector must be in IDLE, ERROR or INACTIVE state to set settings. Current state: {state}"
f"Error on {self._parent_name}. Detector must be in IDLE, ERROR or INACTIVE"
" state to set settings. Current state: {state}"
)
if isinstance(settings, dict):
@@ -110,28 +130,36 @@ class JungfrauJochClient:
try:
self.api.config_detector_put(detector_settings=settings, _request_timeout=timeout)
except requests.exceptions.Timeout:
raise TimeoutError(f"Timeout while setting detector settings for {self._parent_name}")
raise TimeoutError(
f"Timeout on device {self._parent_name} while setting detector settings:\n "
f"{yaml.dump(settings, indent=4)}."
)
except Exception:
content = traceback.format_exc()
logger.error(
f"Error on device {self._parent_name} while setting detector settings:\n "
f"{yaml.dump(settings, indent=4)}. Error traceback: {content}"
)
raise JungfrauJochClientError(
f"Error while setting detector settings for {self._parent_name}: {content}"
f"Error on device {self._parent_name} while setting detector settings:\n "
f"{yaml.dump(settings, indent=4)}. Full traceback: {content}."
)
def start(self, settings: dict | DatasetSettings, request_timeout: float = 10) -> None:
"""Start the mesaurement. DatasetSettings must be provided, and JungfrauJoch must be in IDLE state.
The method call is blocking and JungfrauJoch will be ready to measure after the call resolves.
"""
Start the acquisition with the provided dataset settings.
The detector must be in IDLE state. Settings must always provide a full set of
parameters, missing parameters will be set to default values.
Args:
settings (dict): dictionary of settings
Please check the DataSettings class for the available settings. Minimum required settings are
beam_x_pxl, beam_y_pxl, detector_distance_mm, incident_energy_keV.
settings (dict | DatasetSettings): Dataset settings to start the acquisition with.
request_timeout (float): Timeout in sec for the HTTP request to start the acquisition.
"""
state = self.detector_state
if state != DetectorState.IDLE:
raise JungfrauJochClientError(
f"Error in {self._parent_name}. Detector must be in IDLE state to set settings. Current state: {state}"
f"Error on device {self._parent_name}. "
f"Detector must be in IDLE state to start acquisition. Current state: {state}"
)
if isinstance(settings, dict):
@@ -141,46 +169,80 @@ class JungfrauJochClient:
dataset_settings=settings, _request_timeout=request_timeout
)
except requests.exceptions.Timeout:
content = traceback.format_exc()
logger.error(
f"Timeout error after {request_timeout} seconds on device {self._parent_name} "
f"during 'start' call with dataset settings: {yaml.dump(settings, indent=4)}. \n"
f"Traceback: {content}"
)
raise TimeoutError(
f"TimeoutError in JungfrauJochClient for parent device {self._parent_name} for 'start' call"
f"Timeout error after {request_timeout} seconds on device {self._parent_name} "
f"during 'start' call with dataset settings: {yaml.dump(settings, indent=4)}."
)
except Exception:
content = traceback.format_exc()
logger.error(
f"Error on device {self._parent_name} during 'start' post with dataset settings: \n"
f"{yaml.dump(settings, indent=4)}. \nTraceback: {content}"
)
raise JungfrauJochClientError(
f"Error in JungfrauJochClient for parent device {self._parent_name} during 'start' call: {content}"
f"Error on device {self._parent_name} during 'start' post with dataset settings: \n"
f"{yaml.dump(settings, indent=4)}. \nTraceback: {content}."
)
def stop(self, request_timeout: float = 0.5) -> None:
"""Stop the acquisition, this only logs errors and is not raising."""
try:
self.api.cancel_post_with_http_info(_request_timeout=request_timeout)
except requests.exceptions.Timeout:
content = traceback.format_exc()
logger.error(
f"Timeout in JungFrauJochClient for device {self._parent_name} during stop: {content}"
)
except Exception:
content = traceback.format_exc()
logger.error(
f"Error in JungFrauJochClient for device {self._parent_name} during stop: {content}"
)
def wait_for_idle(self, timeout: int = 10, request_timeout: float | None = None) -> bool:
"""Wait for JungfrauJoch to be in Idle state. Blocking call with timeout.
def _stop_call(self):
try:
self.api.cancel_post_with_http_info() # (_request_timeout=request_timeout)
except requests.exceptions.Timeout:
content = traceback.format_exc()
logger.error(
f"Timeout error after {request_timeout} seconds on device {self._parent_name} "
f"during stop: {content}"
)
except Exception:
content = traceback.format_exc()
logger.error(f"Error on device {self._parent_name} during stop: {content}")
thread = threading.Thread(
target=_stop_call, daemon=True, args=(self,), name="stop_jungfraujoch_thread"
)
thread.start()
def wait_for_idle(self, timeout: int = 10, raise_on_timeout: bool = True) -> bool:
"""
Method to wait until the detector is in IDLE state. This is a blocking call with a
timeout that can be specified. The additional parameter raise_on_timeout can be used to
raise an exception on timeout instead of returning boolean True/False.
Args:
timeout (int): timeout in seconds
raise_on_timeout (bool): If True, raises an exception on timeout. Default is True.
Returns:
bool: True if the detector is in IDLE state, False if timeout occurred
"""
if request_timeout is None:
request_timeout = timeout
try:
self.api.wait_till_done_post(timeout=timeout, _request_timeout=request_timeout)
self.api.wait_till_done_post(timeout=timeout, _request_timeout=timeout)
except requests.exceptions.Timeout:
raise TimeoutError(f"HTTP request timeout in wait_for_idle for {self._parent_name}")
except Exception:
content = traceback.format_exc()
logger.debug(f"Waiting for device {self._parent_name} to become IDLE: {content}")
logger.info(
f"Timeout after {timeout} seconds on device {self._parent_name} in wait_for_idle: {content}"
)
if raise_on_timeout:
raise TimeoutError(
f"Timeout after {timeout} seconds on device {self._parent_name} in wait_for_idle."
)
return False
except Exception as exc:
content = traceback.format_exc()
logger.info(
f"Error on device {self._parent_name} in wait_for_idle. Full traceback: {content}"
)
if raise_on_timeout:
raise JungfrauJochClientError(
f"Error on device {self._parent_name} in wait_for_idle: {content}"
) from exc
return False
return True

View File

@@ -1,22 +1,136 @@
"""Module for the Eiger preview ZMQ stream."""
"""
Module for the JungfrauJoch preview ZMQ stream for the Eiger detector at cSAXS.
The Preview client is implemented for the JungfrauJoch ZMQ PUB-SUB interface, and
should be independent of the EIGER detector type.
The client connects to the ZMQ PUB-SUB preview stream and calls a user provided callback
function with the decompressed messages received from the stream. The callback needs to be
able to deal with the different message types sent by the JungfrauJoch server ("start",
"image", "end") as described in the JungfrauJoch ZEROMQ preview stream documentation.
(https://jungfraujoch.readthedocs.io/en/latest/ZEROMQ_STREAM.html#preview-stream).
"""
from __future__ import annotations
import json
import threading
import time
from typing import Callable
import cbor2
import numpy as np
import zmq
from bec_lib.logger import bec_logger
from dectris.compression import decompress
logger = bec_logger.logger
ZMQ_TOPIC_FILTER = b""
###############################
###### CBOR TAG DECODERS ######
###############################
# Dectris specific CBOR tags and decoders for Jungfrau data
# Reference:
# https://github.com/dectris/documentation/blob/main/stream_v2/examples/client.py
def decode_multi_dim_array(tag: cbor2.CBORTag, column_major: bool = False):
"""Decode a multi-dimensional array from a CBOR tag."""
dimensions, contents = tag.value
if isinstance(contents, list):
array = np.empty((len(contents),), dtype=object)
array[:] = contents
elif isinstance(contents, (np.ndarray, np.generic)):
array = contents
else:
raise cbor2.CBORDecodeValueError("expected array or typed array")
return array.reshape(dimensions, order="F" if column_major else "C")
def decode_typed_array(tag: cbor2.CBORTag, dtype: str):
"""Decode a typed array from a CBOR tag."""
if not isinstance(tag.value, bytes):
raise cbor2.CBORDecodeValueError("expected byte string in typed array")
return np.frombuffer(tag.value, dtype=dtype)
def decode_dectris_compression(tag: cbor2.CBORTag):
"""Decode a Dectris compressed array from a CBOR tag."""
algorithm, elem_size, encoded = tag.value
return decompress(encoded, algorithm, elem_size=elem_size)
#########################################
#### Dectris CBOR TAG Extensions ########
#########################################
# Mapping of various additional CBOR tags from Dectris to decoder functions
tag_decoders = {
40: lambda tag: decode_multi_dim_array(tag, column_major=False),
64: lambda tag: decode_typed_array(tag, dtype="u1"),
65: lambda tag: decode_typed_array(tag, dtype=">u2"),
66: lambda tag: decode_typed_array(tag, dtype=">u4"),
67: lambda tag: decode_typed_array(tag, dtype=">u8"),
68: lambda tag: decode_typed_array(tag, dtype="u1"),
69: lambda tag: decode_typed_array(tag, dtype="<u2"),
70: lambda tag: decode_typed_array(tag, dtype="<u4"),
71: lambda tag: decode_typed_array(tag, dtype="<u8"),
72: lambda tag: decode_typed_array(tag, dtype="i1"),
73: lambda tag: decode_typed_array(tag, dtype=">i2"),
74: lambda tag: decode_typed_array(tag, dtype=">i4"),
75: lambda tag: decode_typed_array(tag, dtype=">i8"),
77: lambda tag: decode_typed_array(tag, dtype="<i2"),
78: lambda tag: decode_typed_array(tag, dtype="<i4"),
79: lambda tag: decode_typed_array(tag, dtype="<i8"),
80: lambda tag: decode_typed_array(tag, dtype=">f2"),
81: lambda tag: decode_typed_array(tag, dtype=">f4"),
82: lambda tag: decode_typed_array(tag, dtype=">f8"),
83: lambda tag: decode_typed_array(tag, dtype=">f16"),
84: lambda tag: decode_typed_array(tag, dtype="<f2"),
85: lambda tag: decode_typed_array(tag, dtype="<f4"),
86: lambda tag: decode_typed_array(tag, dtype="<f8"),
87: lambda tag: decode_typed_array(tag, dtype="<f16"),
1040: lambda tag: decode_multi_dim_array(tag, column_major=True),
56500: lambda tag: decode_dectris_compression(tag), # pylint: disable=unnecessary-lambda
}
def tag_hook(decoder, tag: int):
"""
Tag hook for the cbor2.loads method. Both arguments "decoder" and "tag" mus be present.
We use the tag to choose the respective decoder from the tag_decoders registry if available.
"""
tag_decoder = tag_decoders.get(tag.tag)
return tag_decoder(tag) if tag_decoder else tag
######################
#### ZMQ Settings ####
######################
ZMQ_TOPIC_FILTER = b"" # Subscribe to all topics
ZMQ_CONFLATE_SETTING = 1 # Keep only the most recent message
ZMQ_RCVHWM_SETTING = 1 # Set high water mark to 1, this configures the max number of queue messages
#################################
#### Jungfrau Preview Client ####
#################################
class JungfrauJochPreview:
"""
Preview client for the JungfrauJoch ZMQ preview stream. The client is started with
a URL to receive the data from the JungfrauJoch PUB-SUB preview interface, and a
callback function that is called with messages received from the preview stream.
The callback needs to be able to deal with the different message types sent
by the JungfrauJoch server ("start", "image", "end") as described in the
JungfrauJoch ZEROMQ preview stream documentation. Messages are dictionary dumps.
(https://jungfraujoch.readthedocs.io/en/latest/ZEROMQ_STREAM.html#preview-stream).
Args:
url (str): ZMQ PUB-SUB preview stream URL.
cb (Callable): Callback function called with messages received from the stream.
"""
USER_ACCESS = ["start", "stop"]
def __init__(self, url: str, cb: Callable):
@@ -27,16 +141,18 @@ class JungfrauJochPreview:
self._on_update_callback = cb
def connect(self):
"""Connect to the JungfrauJoch PUB-SUB streaming interface
JungfrauJoch may reject connection for a few seconds when it restarts,
so if it fails, wait a bit and try to connect again.
"""
Connect to the JungfrauJoch PUB-SUB streaming interface. If the connection is refused
it will reattempt a second time after a one second delay.
"""
# pylint: disable=no-member
context = zmq.Context()
self._socket = context.socket(zmq.SUB)
self._socket.setsockopt(zmq.CONFLATE, ZMQ_CONFLATE_SETTING)
self._socket.setsockopt(zmq.SUBSCRIBE, ZMQ_TOPIC_FILTER)
self._socket.setsockopt(zmq.RCVHWM, ZMQ_RCVHWM_SETTING)
try:
self._socket.connect(self.url)
except ConnectionRefusedError:
@@ -44,17 +160,26 @@ class JungfrauJochPreview:
self._socket.connect(self.url)
def start(self):
"""Start the ZMQ update loop in a background thread."""
self._zmq_thread = threading.Thread(
target=self._zmq_update_loop, daemon=True, name="JungfrauJoch_live_preview"
)
self._zmq_thread.start()
def stop(self):
"""Stop the ZMQ update loop and wait for the thread to finish."""
self._shutdown_event.set()
if self._zmq_thread:
self._zmq_thread.join()
self._zmq_thread.join(timeout=1.0)
def _zmq_update_loop(self):
def _zmq_update_loop(self, poll_interval: float = 0.2):
"""
ZMQ update loop running in a background thread. The polling is throttled by
the poll_interval parameter.
Args:
poll_interval (float): Time in seconds to wait between polling attempts.
"""
while not self._shutdown_event.is_set():
if self._socket is None:
self.connect()
@@ -64,18 +189,21 @@ class JungfrauJochPreview:
# Happens when ZMQ partially delivers the multipart message
pass
except zmq.error.Again:
# Happens when receive queue is empty
time.sleep(0.1)
logger.debug(
f"ZMQ Again exception, receive queue is empty for JFJ preview at {self.url}."
)
finally:
# We throttle the polling to avoid heavy load on the device server
time.sleep(poll_interval)
def _poll(self):
"""
Poll the ZMQ socket for new data. It will throttle the data update and
only subscribe to the topic for a single update. This is not very nice
but it seems like there is currently no option to set the update rate on
the backend.
Poll the ZMQ socket for new data. We are currently subscribing and unsubscribing
for each poll loop to avoid receiving too many messages. Throttling of the update
loop is handled in the _zmq_update_loop method.
"""
if self._shutdown_event.wait(0.2):
if self._shutdown_event.is_set():
return
try:
@@ -90,7 +218,19 @@ class JungfrauJochPreview:
# Unsubscribe from the topic
self._socket.setsockopt(zmq.UNSUBSCRIBE, ZMQ_TOPIC_FILTER)
def _parse_data(self, data):
# TODO decode and parse the data
# self._on_update_callback(data)
pass
def _parse_data(self, bytes_list: list[bytes]):
"""
Parse the received ZMQ data from the JungfrauJoch preview stream.
We will call the _on_update_callback with the decompressed messages as a dictionary.
The callback needs to be able to deal with the different message types sent
by the JungfrauJoch server ("start", "image", "end") as described in the
JungfrauJoch ZEROMQ preview stream documentation. Messages are dictionary dumps.
(https://jungfraujoch.readthedocs.io/en/latest/ZEROMQ_STREAM.html#preview-stream).
Args:
bytes_list (list[bytes]): List of byte messages received from ZMQ recv_multipart.
"""
for byte_msg in bytes_list:
msg = cbor2.loads(byte_msg, tag_hook=tag_hook)
self._on_update_callback(msg)

View File

@@ -25,6 +25,8 @@ dependencies = [
"bec_widgets",
"zmq",
"opencv-python",
"dectris-compression", # for JFJ preview stream decompression
"cbor2",
]
[project.optional-dependencies]

View File

@@ -5,6 +5,7 @@ from time import time
from typing import TYPE_CHECKING, Generator
from unittest import mock
import numpy as np
import pytest
from bec_lib.messages import FileMessage, ScanStatusMessage
from jfjoch_client.models.broker_status import BrokerStatus
@@ -78,7 +79,7 @@ def detector_list(request) -> Generator[DetectorList, None, None]:
),
DetectorListElement(
id=2,
description="EIGER 8.5M (tmp)",
description="EIGER 9M",
serial_number="123456",
base_ipv4_addr="192.168.0.1",
udp_interface_count=1,
@@ -103,7 +104,11 @@ def eiger_1_5m(mock_scan_info) -> Generator[Eiger1_5M, None, None]:
name = "eiger_1_5m"
dev = Eiger1_5M(name=name, beam_center=(256, 256), detector_distance=100.0)
dev.scan_info.msg = mock_scan_info
yield dev
try:
yield dev
finally:
if dev._destroyed is False:
dev.destroy()
@pytest.fixture(scope="function")
@@ -113,7 +118,19 @@ def eiger_9m(mock_scan_info) -> Generator[Eiger9M, None, None]:
name = "eiger_9m"
dev = Eiger9M(name=name)
dev.scan_info.msg = mock_scan_info
yield dev
try:
yield dev
finally:
if dev._destroyed is False:
dev.destroy()
def test_eiger_wait_for_connection(eiger_1_5m, eiger_9m):
"""Test the wait_for_connection metho is calling status_get on the JFJ API client."""
for eiger in (eiger_1_5m, eiger_9m):
with mock.patch.object(eiger.jfj_client.api, "status_get") as mock_status_get:
eiger.wait_for_connection(timeout=1)
mock_status_get.assert_called_once_with(_request_timeout=1)
@pytest.mark.parametrize("detector_state", ["Idle", "Inactive"])
@@ -141,7 +158,7 @@ def test_eiger_1_5m_on_connected(eiger_1_5m, detector_list, detector_state):
else:
eiger.on_connected()
assert mock_set_det.call_args == mock.call(
DetectorSettings(frame_time_us=500, timing=DetectorTiming.TRIGGER), timeout=10
DetectorSettings(frame_time_us=500, timing=DetectorTiming.TRIGGER), timeout=5
)
assert mock_file_writer.call_args == mock.call(
file_writer_settings=FileWriterSettings(
@@ -179,7 +196,7 @@ def test_eiger_9m_on_connected(eiger_9m, detector_list, detector_state):
else:
eiger.on_connected()
assert mock_set_det.call_args == mock.call(
DetectorSettings(frame_time_us=500, timing=DetectorTiming.TRIGGER), timeout=10
DetectorSettings(frame_time_us=500, timing=DetectorTiming.TRIGGER), timeout=5
)
assert mock_file_writer.call_args == mock.call(
file_writer_settings=FileWriterSettings(
@@ -216,11 +233,39 @@ def test_eiger_on_stop(eiger_1_5m):
stop_event.wait(timeout=5) # Thread should be killed from task_handler
def test_eiger_on_destroy(eiger_1_5m):
"""Test the on_destroy logic of the Eiger detector. This is equivalent for 9M and 1_5M."""
eiger = eiger_1_5m
start_event = threading.Event()
stop_event = threading.Event()
def tmp_task():
start_event.set()
try:
while True:
time.sleep(0.1)
finally:
stop_event.set()
eiger.task_handler.submit_task(tmp_task)
start_event.wait(timeout=5)
with (
mock.patch.object(eiger.jfj_preview_client, "stop") as mock_jfj_preview_client_stop,
mock.patch.object(eiger.jfj_client, "stop") as mock_jfj_client_stop,
):
eiger.on_destroy()
mock_jfj_preview_client_stop.assert_called_once()
mock_jfj_client_stop.assert_called_once()
stop_event.wait(timeout=5)
@pytest.mark.timeout(25)
@pytest.mark.parametrize("raise_timeout", [True, False])
def test_eiger_on_complete(eiger_1_5m, raise_timeout):
"""Test the on_complete logic of the Eiger detector. This is equivalent for 9M and 1_5M."""
eiger = eiger_1_5m
eiger._wait_for_on_complete = 1 # reduce wait time for testing
callback_completed_event = threading.Event()
@@ -230,7 +275,7 @@ def test_eiger_on_complete(eiger_1_5m, raise_timeout):
unblock_wait_for_idle = threading.Event()
def mock_wait_for_idle(timeout: int, request_timeout: float):
def mock_wait_for_idle(timeout: float, raise_on_timeout: bool) -> bool:
if unblock_wait_for_idle.wait(timeout):
if raise_timeout:
return False
@@ -238,11 +283,18 @@ def test_eiger_on_complete(eiger_1_5m, raise_timeout):
return False
with (
mock.patch.object(
eiger.jfj_client.api, "status_get", return_value=BrokerStatus(state="Idle")
),
mock.patch.object(eiger.jfj_client, "wait_for_idle", side_effect=mock_wait_for_idle),
mock.patch.object(
eiger.jfj_client.api,
"statistics_data_collection_get",
return_value=MeasurementStatistics(run_number=1),
return_value=MeasurementStatistics(
run_number=1,
images_collected=eiger.scan_info.msg.num_points
* eiger.scan_info.msg.scan_parameters["frames_per_trigger"],
),
),
):
status = eiger.complete()
@@ -284,7 +336,7 @@ def test_eiger_file_event_callback(eiger_1_5m, tmp_path):
assert file_msg.hinted_h5_entries == {"data": "entry/data/data"}
def test_eiger_on_sage(eiger_1_5m):
def test_eiger_on_stage(eiger_1_5m):
"""Test the on_stage and on_unstage logic of the Eiger detector. This is equivalent for 9M and 1_5M."""
eiger = eiger_1_5m
scan_msg = eiger.scan_info.msg
@@ -316,3 +368,35 @@ def test_eiger_on_sage(eiger_1_5m):
)
assert mock_start.call_args == mock.call(settings=data_settings)
assert eiger.staged is Staged.yes
def test_eiger_set_det_distance_test_beam_center(eiger_1_5m):
"""Test the set_detector_distance and set_beam_center methods. Equivalent for 9M and 1_5M."""
eiger = eiger_1_5m
old_distance = eiger.detector_distance
new_distance = old_distance + 100
old_beam_center = eiger.beam_center
new_beam_center = (old_beam_center[0] + 20, old_beam_center[1] + 50)
eiger.set_detector_distance(new_distance)
assert eiger.detector_distance == new_distance
eiger.set_beam_center(x=new_beam_center[0], y=new_beam_center[1])
assert eiger.beam_center == new_beam_center
with pytest.raises(ValueError):
eiger.set_beam_center(x=-10, y=100) # Cannot set negative beam center
with pytest.raises(ValueError):
eiger.detector_distance = -50 # Cannot set negative detector distance
def test_eiger_preview_callback(eiger_1_5m):
"""Preview callback test for the Eiger detector. This is equivalent for 9M and 1_5M."""
eiger = eiger_1_5m
# NOTE: I don't find models for the CBOR messages used by JFJ, currently using a dummay dict.
# Please adjust once the proper model is found.
for msg_type in ["start", "end", "image", "calibration", "metadata"]:
msg = {"type": msg_type, "data": {"default": np.array([[1, 2], [3, 4]])}}
with mock.patch.object(eiger.preview_image, "put") as mock_preview_put:
eiger._preview_callback(msg)
if msg_type == "image":
mock_preview_put.assert_called_once_with(msg["data"]["default"])
else:
mock_preview_put.assert_not_called()