fix: Cleanup after tests at the beamline

This commit is contained in:
2026-01-26 12:31:27 +01:00
parent 57613488d0
commit 0fb85d785b
3 changed files with 159 additions and 52 deletions

View File

@@ -28,7 +28,7 @@ Some extra notes for setting up the detector:
Hardware related notes:
- If there is an HW issue with the detector, power cycling may help.
- The sls_detector package is available on console on /sls/X12SA/data/gac-x12sa/erik/micromamba
- The sls_detector package is available on console on /sls/x12sa/applications/erik/micromamba
- Run: source setup_9m.sh # Be careful, this connects to the detector, so it should not be
used during operation
- Useful commands:
@@ -120,8 +120,9 @@ class Eiger(PSIDeviceBase):
super().__init__(name=name, scan_info=scan_info, device_manager=device_manager, **kwargs)
self._host = f"{host}:{port}"
self.jfj_client = JungfrauJochClient(host=self._host, parent=self)
# NOTE: fetch this information from JungfrauJochClient during on_connected!
self.jfj_preview_client = JungfrauJochPreview(
url="tcp://129.129.95.114:5400", cb=self.preview_image.put
url="tcp://129.129.95.114:5400", cb=self._preview_callback
) # IP of sls-jfjoch-001.psi.ch on port 5400 for ZMQ stream
self.device_manager = device_manager
self.detector_name = detector_name
@@ -132,6 +133,15 @@ class Eiger(PSIDeviceBase):
if self.device_manager is not None:
self.device_manager: DeviceManagerDS
def _preview_callback(self, message: dict) -> None:
if message.get("type", "") == "image":
data = message.get("data", {}).get("default", None)
if data is None:
logger.error(f"Received image message on device {self.name} without data.")
return
logger.info(f"Received preview image on device {self.name}")
self.preview_image.put(data)
@property
def detector_distance(self) -> float:
"""The detector distance in mm."""
@@ -173,7 +183,7 @@ class Eiger(PSIDeviceBase):
# Get available detectors
available_detectors = self.jfj_client.api.config_select_detector_get(_request_timeout=5)
logger.info(f"Available detectors {available_detectors}")
logger.debug(f"Available detectors {available_detectors}")
# Get current detector
current_detector_name = ""
if available_detectors.current_id is not None:
@@ -191,6 +201,7 @@ class Eiger(PSIDeviceBase):
raise RuntimeError(
f"Detector {self.detector_name} is not in IDLE state, current state: {self.jfj_client.detector_state}. Please initialize the detector in the WEB UI: {self._host}."
)
# TODO - check again once Eiger should be initialized automatically, currently human initialization is expected
# # Once the automation should be enabled, we may use here
# detector_selection = [
@@ -266,11 +277,12 @@ class Eiger(PSIDeviceBase):
incident_energy_ke_v=incident_energy,
)
logger.info(f"Setting data_settings: {yaml.dump(data_settings.to_dict(), indent=4)}")
prep_time = start_time - time.time()
logger.debug(f"Prepared information for eiger to start acquisition in {prep_time:.2f}s")
prep_time = time.time()
self.jfj_client.wait_for_idle(timeout=10, request_timeout=10) # Ensure we are in IDLE state
self.jfj_client.start(settings=data_settings) # Takes around ~0.6s
logger.debug(f"Wait for IDLE and start call took {time.time()-start_time-prep_time:.2f}s")
logger.info(
f"On stage done for device {self.name} after {time.time()-start_time:.2f}s, with {time.time()-prep_time:.2f}s spent with communication to JungfrauJoch."
)
def on_unstage(self) -> DeviceStatus:
"""Called while unstaging the device."""
@@ -283,7 +295,9 @@ class Eiger(PSIDeviceBase):
def _file_event_callback(self, status: DeviceStatus) -> None:
"""Callback to update the file_event signal when the acquisition is done."""
logger.info(f"Acquisition done callback called for {self.name} for status {status.success}")
logger.debug(
f"Acquisition done callback called for {self.name} for status {status.success}"
)
self.file_event.put(
file_path=self._full_path,
done=status.done,
@@ -297,15 +311,20 @@ class Eiger(PSIDeviceBase):
def wait_for_complete():
start_time = time.time()
timeout = 20
for ii in range(timeout):
logger.info(f"Running loop with timeout {time.time() - start_time}.")
if self.jfj_client.wait_for_idle(timeout=1, request_timeout=10):
for _ in range(timeout):
if self.jfj_client.wait_for_idle(
timeout=1, request_timeout=10, raise_on_timeout=False
):
logger.info(f"Device {self.name} completed acquisition.")
return
logger.info(
f"Device {self.name} running loop to wait for complete, time elapsed: {time.time() - start_time}."
)
statistics: MeasurementStatistics = self.jfj_client.api.statistics_data_collection_get(
_request_timeout=5
)
raise TimeoutError(
f"Timeout after waiting for detector {self.name} to complete for {time.time()-start_time:.2f}s, measurement statistics: {yaml.dump(statistics.to_dict(), indent=4)}"
f"Timeout after waiting for device {self.name} to complete for {time.time()-start_time:.2f}s, measurement statistics: {yaml.dump(statistics.to_dict(), indent=4)}"
)
status = self.task_handler.submit_task(wait_for_complete, run=True)

View File

@@ -3,6 +3,7 @@
from __future__ import annotations
import enum
import threading
import time
import traceback
from typing import TYPE_CHECKING
@@ -152,20 +153,29 @@ class JungfrauJochClient:
def stop(self, request_timeout: float = 0.5) -> None:
"""Stop the acquisition, this only logs errors and is not raising."""
try:
self.api.cancel_post_with_http_info(_request_timeout=request_timeout)
except requests.exceptions.Timeout:
content = traceback.format_exc()
logger.error(
f"Timeout in JungFrauJochClient for device {self._parent_name} during stop: {content}"
)
except Exception:
content = traceback.format_exc()
logger.error(
f"Error in JungFrauJochClient for device {self._parent_name} during stop: {content}"
)
def wait_for_idle(self, timeout: int = 10, request_timeout: float | None = None) -> bool:
def _stop_call(self):
try:
self.api.cancel_post_with_http_info() # (_request_timeout=request_timeout)
except requests.exceptions.Timeout:
content = traceback.format_exc()
logger.error(
f"Timeout in JungFrauJochClient for device {self._parent_name} during stop: {content}"
)
except Exception:
content = traceback.format_exc()
logger.error(
f"Error in JungFrauJochClient for device {self._parent_name} during stop: {content}"
)
thread = threading.Thread(
target=_stop_call, daemon=True, args=(self,), name="stop_jungfraujoch_thread"
)
thread.start()
def wait_for_idle(
self, timeout: int = 10, request_timeout: float | None = None, raise_on_timeout: bool = True
) -> bool:
"""Wait for JungfrauJoch to be in Idle state. Blocking call with timeout.
Args:
@@ -178,9 +188,17 @@ class JungfrauJochClient:
try:
self.api.wait_till_done_post(timeout=timeout, _request_timeout=request_timeout)
except requests.exceptions.Timeout:
raise TimeoutError(f"HTTP request timeout in wait_for_idle for {self._parent_name}")
except Exception:
if raise_on_timeout:
raise TimeoutError(
f"HTTP request timeout in wait_for_idle for {self._parent_name}."
)
return False
except Exception as exc:
content = traceback.format_exc()
logger.debug(f"Waiting for device {self._parent_name} to become IDLE: {content}")
if raise_on_timeout:
raise JungfrauJochClientError(
f"Error in wait_for_idle for {self._parent_name}: {content}"
) from exc
return False
return True

View File

@@ -1,26 +1,39 @@
"""Module for the Eiger preview ZMQ stream."""
"""
Module for the JungfrauJoch preview ZMQ stream for the Eiger detector at cSAXS.
The Preview client is implemented for the JungfrauJoch ZMQ PUB-SUB interface, and
should be independent of the EIGER detector type.
The client connects to the ZMQ PUB-SUB preview stream and calls a user provided callback
function with the decompressed messages received from the stream. The callback needs to be
able to deal with the different message types sent by the JungfrauJoch server ("start",
"image", "end") as described in the JungfrauJoch ZEROMQ preview stream documentation.
(https://jungfraujoch.readthedocs.io/en/latest/ZEROMQ_STREAM.html#preview-stream).
"""
from __future__ import annotations
import json
import threading
import time
from typing import Callable
import cbor2
import numpy as np
import zmq
from bec_lib.logger import bec_logger
from dectris.compression import decompress
logger = bec_logger.logger
ZMQ_TOPIC_FILTER = b""
import cbor2
import numpy as np
from dectris.compression import decompress
###############################
###### CBOR TAG DECODERS ######
###############################
# Dectris specific CBOR tags and decoders for Jungfrau data
# Reference:
# https://github.com/dectris/documentation/blob/main/stream_v2/examples/client.py
def decode_multi_dim_array(tag, column_major):
def decode_multi_dim_array(tag: cbor2.CBORTag, column_major: bool = False):
"""Decode a multi-dimensional array from a CBOR tag."""
dimensions, contents = tag.value
if isinstance(contents, list):
array = np.empty((len(contents),), dtype=object)
@@ -32,17 +45,23 @@ def decode_multi_dim_array(tag, column_major):
return array.reshape(dimensions, order="F" if column_major else "C")
def decode_typed_array(tag, dtype):
def decode_typed_array(tag: cbor2.CBORTag, dtype: str):
"""Decode a typed array from a CBOR tag."""
if not isinstance(tag.value, bytes):
raise cbor2.CBORDecodeValueError("expected byte string in typed array")
return np.frombuffer(tag.value, dtype=dtype)
def decode_dectris_compression(tag):
def decode_dectris_compression(tag: cbor2.CBORTag):
"""Decode a Dectris compressed array from a CBOR tag."""
algorithm, elem_size, encoded = tag.value
return decompress(encoded, algorithm, elem_size=elem_size)
#########################################
#### Dectris CBOR TAG Extensions ########
#########################################
tag_decoders = {
40: lambda tag: decode_multi_dim_array(tag, column_major=False),
64: lambda tag: decode_typed_array(tag, dtype="u1"),
@@ -73,12 +92,42 @@ tag_decoders = {
}
def tag_hook(decoder, tag):
def tag_hook(tag: int):
"""Get the decoder for Dectris specific CBOR tags. tag must be in tag_decoders."""
tag_decoder = tag_decoders.get(tag.tag)
return tag_decoder(tag) if tag_decoder else tag
######################
#### ZMQ Settings ####
######################
ZMQ_TOPIC_FILTER = b"" # Subscribe to all topics
ZMQ_CONFLATE_SETTING = 1 # Keep only the most recent message
ZMQ_RCVHWM_SETTING = 1 # Set high water mark to 1, this configures the max number of queue messages
#################################
#### Jungfrau Preview Client ####
#################################
class JungfrauJochPreview:
"""
Preview client for the JungfrauJoch ZMQ preview stream. The client is started with
a URL to receive the data from the JungfrauJoch PUB-SUB preview interface, and a
callback function that is called with messages received from the preview stream.
The callback needs to be able to deal with the different message types sent
by the JungfrauJoch server ("start", "image", "end") as described in the
JungfrauJoch ZEROMQ preview stream documentation. Messages are dictionary dumps.
(https://jungfraujoch.readthedocs.io/en/latest/ZEROMQ_STREAM.html#preview-stream).
Args:
url (str): ZMQ PUB-SUB preview stream URL.
cb (Callable): Callback function called with messages received from the stream.
"""
USER_ACCESS = ["start", "stop"]
def __init__(self, url: str, cb: Callable):
@@ -89,16 +138,18 @@ class JungfrauJochPreview:
self._on_update_callback = cb
def connect(self):
"""Connect to the JungfrauJoch PUB-SUB streaming interface
JungfrauJoch may reject connection for a few seconds when it restarts,
so if it fails, wait a bit and try to connect again.
"""
Connect to the JungfrauJoch PUB-SUB streaming interface. If the connection is refused
it will reattempt a second time after a one second delay.
"""
# pylint: disable=no-member
context = zmq.Context()
self._socket = context.socket(zmq.SUB)
self._socket.setsockopt(zmq.CONFLATE, ZMQ_CONFLATE_SETTING)
self._socket.setsockopt(zmq.SUBSCRIBE, ZMQ_TOPIC_FILTER)
self._socket.setsockopt(zmq.RCVHWM, ZMQ_RCVHWM_SETTING)
try:
self._socket.connect(self.url)
except ConnectionRefusedError:
@@ -106,17 +157,20 @@ class JungfrauJochPreview:
self._socket.connect(self.url)
def start(self):
"""Start the ZMQ update loop in a background thread."""
self._zmq_thread = threading.Thread(
target=self._zmq_update_loop, daemon=True, name="JungfrauJoch_live_preview"
)
self._zmq_thread.start()
def stop(self):
"""Stop the ZMQ update loop and wait for the thread to finish."""
self._shutdown_event.set()
if self._zmq_thread:
self._zmq_thread.join()
def _zmq_update_loop(self):
"""Zmq update loop with polling for new data. The loop runs at maximum 10 Hz."""
while not self._shutdown_event.is_set():
if self._socket is None:
self.connect()
@@ -127,17 +181,22 @@ class JungfrauJochPreview:
pass
except zmq.error.Again:
# Happens when receive queue is empty
time.sleep(0.1)
time.sleep(0.1) # NOTE: Change sleep time to control polling rate
def _poll(self):
"""
Poll the ZMQ socket for new data. It will throttle the data update and
only subscribe to the topic for a single update. This is not very nice
but it seems like there is currently no option to set the update rate on
the backend.
Poll the ZMQ socket for new data. We are currently subscribing and unsubscribing
for each poll loop to avoid receiving too many messages. The zmq update loop is
running with a maximum of 10 Hz, and we wait an additional 0.2 seconds here to
avoid busy waiting. Therefore, the maximum data rate is around 3 Hz.
NOTE:
This can be optimized in the future as JungfrauJoch supports controlling the update
rate from the server side. We still safeguard here to avoid that the device-server
becomes overloaded with messages.
"""
if self._shutdown_event.wait(0.2):
if self._shutdown_event.wait(0.2): # NOTE: Change to adjust throttle rate in polling.
return
try:
@@ -152,8 +211,19 @@ class JungfrauJochPreview:
# Unsubscribe from the topic
self._socket.setsockopt(zmq.UNSUBSCRIBE, ZMQ_TOPIC_FILTER)
def _parse_data(self, data):
message = cbor2.loads(data, tag_hook=tag_hook)
# Parse message to data, call callback with data
for cb in self._on_update_callback:
cb(message)
def _parse_data(self, bytes_list: list[bytes]):
"""
Parse the received ZMQ data from the JungfrauJoch preview stream.
We will call the _on_update_callback with the decompressed messages.
The callback needs to be able to deal with the different message types sent
by the JungfrauJoch server ("start", "image", "end") as described in the
JungfrauJoch ZEROMQ preview stream documentation. Messages are dictionary dumps.
(https://jungfraujoch.readthedocs.io/en/latest/ZEROMQ_STREAM.html#preview-stream).
Args:
bytes_list (list[bytes]): List of byte messages received from ZMQ recv_multipart.
"""
for byte_msg in bytes_list:
msg = cbor2.loads(byte_msg, tag_hook=tag_hook)
self._on_update_callback(msg)