feat(eiger): add jfj integration

This commit is contained in:
2025-08-11 11:24:20 +02:00
parent 189141a047
commit 02ac2e19cf
10 changed files with 663 additions and 86 deletions

View File

@@ -42,3 +42,29 @@ ids_cam:
enabled: true
readoutPriority: async
softwareTrigger: True
eiger_1_5:
description: Eiger 1.5M in-vacuum detector
deviceClass: csaxs_bec.devices.jungfraujoch.eiger_1_5m.Eiger1_5M
deviceConfig:
detector_distance: 100
beam_center: [0, 0]
onFailure: raise
enabled: true
readoutPriority: async
softwareTrigger: False
samx:
readoutPriority: baseline
deviceClass: ophyd_devices.SimPositioner
deviceConfig:
delay: 1
limits:
- -50
- 50
tolerance: 0.01
update_frequency: 400
deviceTags:
- user motors
enabled: true
readOnly: false

View File

@@ -33,6 +33,7 @@ from __future__ import annotations
import threading
import time
import traceback
from typing import TYPE_CHECKING
from bec_lib.logger import bec_logger
@@ -160,6 +161,9 @@ class DDG1(PSIDeviceBase, DelayGeneratorCSAXS):
# f = e + 1us
# e has refernce to d, f has reference to e
self.set_delay_pairs(channel="ef", delay=0, width=1e-6)
time.sleep(
0.2
) # After staging, make sure that the DDG HW has some time to process changes properly.
def _prepare_mcs_on_trigger(self, mcs: MCSCardCSAXS) -> None:
"""Prepare the MCS card for the next trigger.
@@ -188,7 +192,13 @@ class DDG1(PSIDeviceBase, DelayGeneratorCSAXS):
while (
self._poll_thread_run_event.is_set() and not self._poll_thread_kill_event.is_set()
):
self._poll_loop()
try:
self._poll_loop()
except Exception: # pylint: disable=broad-except
content = traceback.format_exc()
logger.error(
f"Exception in polling loop thread, polling continues...\n Error content:\n{content}"
)
self._poll_thread_poll_loop_done.set()
@@ -201,13 +211,17 @@ class DDG1(PSIDeviceBase, DelayGeneratorCSAXS):
to be necessary to avoid missing events.
"""
self.state.proc_status.put(1, use_complete=True)
time.sleep(0.02) # 20ms delay for processing, important for not missing events
if self._poll_thread_run_event.is_set() and not self._poll_thread_kill_event.is_set():
if (
self._poll_thread_run_event.wait(timeout=0.02)
and not self._poll_thread_kill_event.is_set()
):
return
self.state.event_status.get(use_monitor=False)
if self._poll_thread_run_event.is_set() and not self._poll_thread_kill_event.is_set():
if (
self._poll_thread_run_event.wait(timeout=0.02)
and not self._poll_thread_kill_event.is_set()
):
return
time.sleep(0.02) # 20ms delay for processing, important for not missing events
def _start_polling(self) -> None:
"""Start the polling loop in the background thread."""
@@ -253,6 +267,9 @@ class DDG1(PSIDeviceBase, DelayGeneratorCSAXS):
If we don't then subsequent triggers may reach the DDG too early, and will be ignored. To
avoid this, we've added the option to specify a delay via add_delay, default here is 50ms.
"""
# Keep sleep here for software trigger mode as 20ms delay between subsequent commands
# to the HW are necessary to avoid crashes and missing events.
time.sleep(0.02)
# Stop polling, poll once manually to ensure that the register is clean
self._stop_polling()
self._poll_thread_poll_loop_done.wait(timeout=1)

View File

@@ -158,7 +158,7 @@ class MCSCardCSAXS(PSIDeviceBase, MCSCard):
self.acquire_mode.set(ACQUIREMODE.MCS).wait(timeout=self._pv_timeout)
# Subscribe the progress signal
self.current_channel.subscribe(self._progress_update, run=False)
# self.current_channel.subscribe(self._progress_update, run=False)
# Subscribe to the mca updates
for name in self.counter_mapping.keys():

View File

@@ -0,0 +1,284 @@
"""
Generic integration of JungfrauJoch backend with Eiger detectors
for the cSAXS beamline at the Swiss Light Source.
The WEB UI is available on http://sls-jfjoch-001:8080
NOTE: this may not be the best place to store this information. It should be migrated to
beamline documentation for debugging of Eiger & JungfrauJoch.
The JungfrauJoch server for cSAXS runs on sls-jfjoch-001.psi.ch
User with sufficient rights may use:
- sudo systemctl restart jfjoch_broker
- sudo systemctl status jfjoch_broker
to check and/or restart the broker for the JungfrauJoch server.
Some extra notes for setting up the detector:
- If the energy on JFJ is set via DetectorSettings, the variable in DatasetSettings will be ignored
- Changes in energy may take time, good to implement logic that only resets energy if needed.
- For the Eiger, the frame_time_us in DetectorSettings is ignored, only the frame_time_us in
the DatasetSettings is relevant
- The bit_depth will be adjusted automatically based on the exp_time. Here, we need to ensure
that subsequent triggers properly
consider the readout_time of the boards. For Jungfrau detectors, the difference between
count_time_us and frame_time_us is the readout_time of the boards. For the Eiger, this needs
to be taken into account during the integration.
- beam_center and detector settings are required input arguments, thus, they may be set to wrong
values for acquisitions to start. Please keep this in mind.
Hardware related notes:
- If there is an HW issue with the detector, power cycling may help.
- The sls_detector package is available on console on /sls/X12SA/data/gac-x12sa/erik/micromamba
- Run: source setup_9m.sh # Be careful, this connects to the detector, so it should not be
used during operation
- Useful commands:
- p highvoltage 0 or 150 (operational)
- g highvoltage
- # Put high voltage to 0 before power cylcing it.
- telnet bchip500
- cd power_control_user/
- ./on
- ./off
Further information that may be relevant for debugging:
JungfrauJoch - one needs to connect to the jfj-server (sls-jfjoch-001)
"""
from __future__ import annotations
import os
import time
import traceback
from typing import TYPE_CHECKING, Literal
import requests
import yaml
from bec_lib.file_utils import get_full_path
from bec_lib.logger import bec_logger
from jfjoch_client.models.dataset_settings import DatasetSettings
from jfjoch_client.models.detector_selection import DetectorSelection
from jfjoch_client.models.detector_settings import DetectorSettings
from jfjoch_client.models.detector_timing import DetectorTiming
from ophyd import DeviceStatus
from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase
from csaxs_bec.devices.jungfraujoch.jungfrau_joch_client import (
JungfrauJochClient,
JungfrauJochClientError,
)
if TYPE_CHECKING: # pragma no cover
from bec_lib.devicemanager import ScanInfo
from bec_server.device_server.device_server import DeviceManagerDS
from jfjoch_client.models.detector_list_element import DetectorListElement
logger = bec_logger.logger
EIGER_READOUT_TIME_US = 500e-6 # 500 microseconds in s
class EigerError(Exception):
"""Custom exception for Eiger detector errors."""
class Eiger(PSIDeviceBase):
"""
Base integration of the Eiger1.5M and Eiger9M at cSAXS. All relevant
"""
USER_ACCESS = ["detector_distance", "beam_center"]
def __init__(
self,
name: str,
detector_name: Literal["EIGER 9M", "EIGER 8.5M (tmp)", "EIGER 1.5M"],
host: str = "http://sls-jfjoch-001",
port: int = 8080,
detector_distance: float = 100.0,
beam_center: tuple[int, int] = (0, 0),
scan_info: ScanInfo = None,
readout_time: float = EIGER_READOUT_TIME_US,
device_manager=None,
**kwargs,
):
"""
Initialize the PSI Device Base class.
Args:
name (str) : Name of the device
detector_name (str): Name of the detector. Supports ["EIGER 9M", "EIGER 8.5M (tmp)", "EIGER 1.5M"]
host (str): Hostname of the Jungfrau Joch server.
port (int): Port of the Jungfrau Joch server.
scan_info (ScanInfo): The scan info to use.
device_manager (DeviceManagerDS): The device manager to use.
**kwargs: Additional keyword arguments.
"""
super().__init__(name=name, scan_info=scan_info, device_manager=device_manager, **kwargs)
self._host = f"{host}:{port}"
self.jfj_client = JungfrauJochClient(host=self._host, parent=self)
self.device_manager = device_manager
self.detector_name = detector_name
self._detector_distance = detector_distance
self._beam_center = beam_center
self._readout_time = readout_time
self._full_path = ""
if self.device_manager is not None:
self.device_manager: DeviceManagerDS
@property
def detector_distance(self) -> float:
"""The detector distance in mm."""
return self._detector_distance
@detector_distance.setter
def detector_distance(self, value: float) -> None:
"""Set the detector distance in mm."""
if value <= 0:
raise ValueError("Detector distance must be a positive value.")
self._detector_distance = value
@property
def beam_center(self) -> tuple[float, float]:
"""The beam center in pixels. (x,y)"""
return self._beam_center
@beam_center.setter
def beam_center(self, value: tuple[float, float]) -> None:
"""Set the beam center in pixels. (x,y)"""
self._beam_center = value
def on_init(self) -> None:
"""
Called when the device is initialized.
No siganls are connected at this point,
thus should not be set here but in on_connected instead.
"""
def on_connected(self) -> None:
"""
Called after the device is connected and its signals are connected.
Default values for signals should be set here.
"""
logger.debug(f"On connected called for {self.name}")
self.jfj_client.stop(request_timeout=3)
# If stop failed, it will not raise. So the next calls may raise instead.. TODO logic could be discussed
# Get available detectors
available_detectors = self.jfj_client.api.config_select_detector_get(_request_timeout=5)
detector_info = [
det for det in available_detectors.detectors if det.description == self.detector_name
]
# If the specified detector is not found, or multiple definitions for that name are available, something is wrong.
if len(detector_info) != 1:
raise ValueError(
f"Detector {self.detector_name} not found in available detectors: {available_detectors}"
)
detector_info: DetectorListElement = detector_info[0]
try:
self.jfj_client.api.config_select_detector_put(
DetectorSelection(id=detector_info.id), _request_timeout=10
)
except requests.exceptions.Timeout:
raise TimeoutError(f"Timeout while selecting detector {self.detector_name}")
except Exception:
content = traceback.format_exc()
raise EigerError(f"Error while selecting detector {self.detector_name}: {content}")
# Try to connect, needs to be in Inactive or Error state
self.jfj_client.connect_and_initialise(timeout=10, _request_timeout=10)
# This sets the energy threshold for the EIGER detector
settings = DetectorSettings(frame_time_us=int(500), timing=DetectorTiming.TRIGGER)
logger.debug(f"Setting detector_settings: {yaml.dump(settings.to_dict(), indent=4)}")
self.jfj_client.set_detector_settings(settings, timeout=10)
def on_stage(self) -> DeviceStatus | None:
"""
Called while staging the device.
Information about the upcoming scan can be accessed from the scan_info object.
"""
start_time = time.time()
scan_msg = self.scan_info.msg
# TODO add mono energy logic, if energy changed more than ~1-2%, adapt thresholds!
# Add logic to set the detector energy threshold from mono_energy pv
# dev.mono_energy from device_manager
incident_energy = 12.0
exp_time = scan_msg.scan_parameters.get("exp_time", 0)
if exp_time <= self._readout_time:
raise ValueError(
f"Receive scan request for scan {scan_msg.scan_name} with exp_time {exp_time}s, which must be larger than the readout time {self._readout_time}s of the detector {self.detector_name}."
)
frame_time_us = exp_time # - self._readout_time # Needs to be checked if needed
# convert frame time to us
# settings = DetectorSettings(frame_time_us=int(frame_time_us * 1e6))
# self.jfj_client.set_detector_settings(settings, timeout=10)
# Set acquisition parameter
ntrigger = int(scan_msg.num_points * scan_msg.scan_parameters["frames_per_trigger"])
# Fetch file path
self._full_path = get_full_path(
scan_msg, name=self.name
) # We can discuss if this should be used with name..
# Get Path from Broker
# Fetch basepath from JFJ broker config
path = os.path.relpath(self._full_path, start="/sls/x12sa/data")
data_settings = DatasetSettings(
image_time_us=int(frame_time_us * 1e6), # This is currently ignored
ntrigger=ntrigger,
file_prefix=path,
beam_x_pxl=int(self._beam_center[0]),
beam_y_pxl=int(self._beam_center[1]),
detector_distance_mm=self.detector_distance,
incident_energy_ke_v=incident_energy,
)
logger.debug(f"Setting data_settings: {yaml.dump(data_settings.to_dict(), indent=4)}")
prep_time = start_time - time.time()
logger.info(f"Prepared information for eiger to start acquisition in {prep_time:.2f}s")
self.jfj_client.start(settings=data_settings)
start_call_returns = time.time() - start_time - prep_time
logger.info(f"Start Rest call from JFJ took {start_call_returns:.2f}s")
def on_unstage(self) -> DeviceStatus:
"""Called while unstaging the device."""
def on_pre_scan(self) -> DeviceStatus:
"""Called right before the scan starts on all devices automatically."""
def on_trigger(self) -> DeviceStatus:
"""Called when the device is triggered."""
def on_complete(self) -> DeviceStatus:
"""Called to inquire if a device has completed a scans."""
def wait_for_complete():
timeout = 10
for _ in range(timeout):
try:
self.jfj_client.wait_till_done(timeout=1, _request_timeout=5)
except (
JungfrauJochClientError
): # Means that timeout was triggered, and not _request_timeout
continue
except TimeoutError: # Timeout exception from wait_till_done
content = traceback.format_exc()
raise TimeoutError(f"Timeout for request during complete call: {content}")
except Exception as e: # This should actually never occur..
raise ValueError(f"Error in complete for {self.name}, exception: {e}") from e
else:
break
status = self.task_handler.submit_task(wait_for_complete, run=True)
self.cancel_on_stop(status)
return status
def on_kickoff(self) -> DeviceStatus | None:
"""Called to kickoff a device for a fly scan. Has to be called explicitly."""
def on_stop(self) -> None:
"""Called when the device is stopped."""
self.jfj_client.stop(
request_timeout=0.5
) # Call should not block more than 0.5 seconds to stop all devices...
self.task_handler.shutdown()

View File

@@ -0,0 +1,54 @@
"""
Eiger 1.5M specific integration. It is based on the Eiger base integration for the JungfrauJoch backend
which is placed in eiger_csaxs, and where code that is equivalent for the Eiger9M and Eiger1.5M is shared.
Please check the eiger_csaxs.py class for more details about the relevant services.
"""
from __future__ import annotations
from typing import TYPE_CHECKING
from csaxs_bec.devices.jungfraujoch.eiger import Eiger
EIGER1_5M_READOUT_TIME_US = 500e-6 # 500 microseconds in s
DETECTOR_NAME = "EIGER 1.5M"
if TYPE_CHECKING: # pragma no cover
from bec_lib.devicemanager import ScanInfo
from bec_server.device_server.device_server import DeviceManagerDS
# pylint:disable=invalid-name
class Eiger1_5M(Eiger):
"""
Eiger 1.5M specific integration for the in-vaccum Eiger.
The logic implemented here is coupled to the DelayGenerator integration,
repsonsible for the global triggering of all devices through a single Trigger logic.
Please check the eiger.py class for more details about the integration of relevant backend
services. The detector_name must be set to "EIGER 1.5M:
"""
USER_ACCESS = Eiger.USER_ACCESS + [] # Add more user_access methods here.
def __init__(
self,
name: str,
detector_distance: float = 100.0,
beam_center: tuple[float, float] = (0.0, 0.0),
scan_info: ScanInfo = None,
device_manager: DeviceManagerDS = None,
**kwargs,
) -> None:
super().__init__(
name=name,
detector_name=DETECTOR_NAME,
readout_time=EIGER1_5M_READOUT_TIME_US,
detector_distance=detector_distance,
beam_center=beam_center,
scan_info=scan_info,
device_manager=device_manager,
**kwargs,
)

View File

@@ -0,0 +1,58 @@
"""
Eiger 9M specific integration. It is based on the Eiger base integration for the JungfrauJoch backend
which is placed in eiger_csaxs, and where code that is equivalent for the Eiger9M and Eiger1.5M is shared.
Please check the eiger_csaxs.py class for more details about the relevant services.
In 16bit mode, 8e7 counts/s per pixel are supported in summed up frames,
although subframes will never have more than 12bit counts (~4000 counts per pixel in subframe).
In 32bit mode, 2e7 counts/s per pixel are supported, for which subframes will have no
more than 24bit counts, which means 16.7 million counts per pixel in subframes.
"""
from __future__ import annotations
from typing import TYPE_CHECKING
from csaxs_bec.devices.jungfraujoch.eiger import Eiger
if TYPE_CHECKING: # pragma no cover
from bec_lib.devicemanager import ScanInfo
from bec_server.device_server.device_server import DeviceManagerDS
EIGER9M_READOUT_TIME_US = 500e-6 # 500 microseconds in s
DETECTOR_NAME = "EIGER 8.5M (tmp)" # "EIGER 9M""
# pylint:disable=invalid-name
class Eiger1_5M(Eiger):
"""
Eiger 1.5M specific integration for the in-vaccum Eiger.
The logic implemented here is coupled to the DelayGenerator integration,
repsonsible for the global triggering of all devices through a single Trigger logic.
Please check the eiger.py class for more details about the integration of relevant backend
services. The detector_name must be set to "EIGER 1.5M:
"""
USER_ACCESS = Eiger.USER_ACCESS + [] # Add more user_access methods here.
def __init__(
self,
name: str,
detector_distance: float = 100.0,
beam_center: tuple[float, float] = (0.0, 0.0),
scan_info: ScanInfo = None,
device_manager: DeviceManagerDS = None,
**kwargs,
) -> None:
super().__init__(
name=name,
detector_name=DETECTOR_NAME,
readout_time=EIGER9M_READOUT_TIME_US,
detector_distance=detector_distance,
beam_center=beam_center,
scan_info=scan_info,
device_manager=device_manager,
**kwargs,
)

View File

@@ -1,20 +1,36 @@
"""Module with client interface for the Jungfrau Joch detector API"""
from __future__ import annotations
import enum
import math
import time
import traceback
from typing import TYPE_CHECKING
import jfjoch_client
import requests
from bec_lib.logger import bec_logger
from jfjoch_client.api.default_api import DefaultApi
from jfjoch_client.api_client import ApiClient
from jfjoch_client.configuration import Configuration
from jfjoch_client.models.broker_status import BrokerStatus
from jfjoch_client.models.dataset_settings import DatasetSettings
from jfjoch_client.models.detector_settings import DetectorSettings
logger = bec_logger.logger
if TYPE_CHECKING:
from ophyd import Device
# pylint: disable=raise-missing-from
# pylint: disable=broad-except
class JungfrauJochClientError(Exception):
"""Base class for exceptions in this module."""
class DetectorState(enum.StrEnum):
"""Detector states for Jungfrau Joch detector
['Inactive', 'Idle', 'Busy', 'Measuring', 'Pedestal', 'Error']
"""
class DetectorState(str, enum.Enum):
"""Possible Detector states for Jungfrau Joch detector"""
INACTIVE = "Inactive"
IDLE = "Idle"
@@ -24,24 +40,30 @@ class DetectorState(enum.StrEnum):
ERROR = "Error"
class ResponseWaitDone(enum.IntEnum):
"""Response state for Jungfrau Joch detector wait till done"""
DETECTOR_IDLE = 200
TIMEOUT_PARAM_OUT_OF_RANGE = 400
JUNGFRAU_ERROR = 500
DETECTOR_INACTIVE = 502
TIMEOUT_REACHED = 504
class JungfrauJochClient:
"""Thin wrapper around the Jungfrau Joch API client"""
"""Thin wrapper around the Jungfrau Joch API client.
def __init__(self, host: str = "http://sls-jfjoch-001:8080") -> None:
sudo systemctl restart jfjoch_broker
sudo systemctl status jfjoch_broker
It looks as if the detector is not being stopped properly.
One module remains running, how can we restart the detector?
"""
def __init__(
self, host: str = "http://sls-jfjoch-001:8080", parent: Device | None = None
) -> None:
self._initialised = False
configuration = jfjoch_client.Configuration(host=host)
api_client = jfjoch_client.ApiClient(configuration)
self.api = jfjoch_client.DefaultApi(api_client)
configuration = Configuration(host=host)
api_client = ApiClient(configuration)
self.api = DefaultApi(api_client)
self._parent_name = parent.name if parent else self.__class__.__name__
@property
def jjf_state(self) -> BrokerStatus:
"""Get the status of JungfrauJoch"""
response = self.api.status_get()
return BrokerStatus(**response.to_dict())
@property
def initialised(self) -> bool:
@@ -53,101 +75,121 @@ class JungfrauJochClient:
"""Set the connected status"""
self._initialised = value
def get_jungfrau_joch_status(self) -> DetectorState:
@property
def detector_state(self) -> DetectorState:
"""Get the status of JungfrauJoch"""
return self.api.status_get().state
return DetectorState(self.jjf_state.state)
def connect_and_initialise(self, timeout: int = 5) -> None:
def connect_and_initialise(self, timeout: int = 10, **kwargs) -> None:
"""Check if JungfrauJoch is connected and ready to receive commands"""
status = self.api.status_get().state
status = self.detector_state
if status != DetectorState.IDLE:
self.api.initialize_post()
self.wait_till_done(timeout)
self.initialised = True
self.api.initialize_post() # This is a blocking call....
self.wait_till_done(timeout, **kwargs) # Blocking call
self.initialised = True
def set_detector_settings(self, settings: dict | jfjoch_client.DatasetSettings) -> None:
def set_detector_settings(self, settings: dict | DetectorSettings, timeout: int = 10) -> None:
"""Set the detector settings. JungfrauJoch must be in IDLE, Error or Inactive state.
Note, the full settings have to be provided, otherwise the settings will be overwritten with default values.
Args:
settings (dict): dictionary of settings
"""
state = self.api.status_get().state
state = self.detector_state
if state not in [DetectorState.IDLE, DetectorState.ERROR, DetectorState.INACTIVE]:
raise JungfrauJochClientError(
f"Detector must be in IDLE, ERROR or INACTIVE state to set settings. Current state: {state}"
)
time.sleep(1) # Give the detector 1s to become IDLE, retry
state = self.detector_state
if state not in [DetectorState.IDLE, DetectorState.ERROR, DetectorState.INACTIVE]:
raise JungfrauJochClientError(
f"Error in {self._parent_name}. Detector must be in IDLE, ERROR or INACTIVE state to set settings. Current state: {state}"
)
if isinstance(settings, dict):
settings = jfjoch_client.DatasetSettings(**settings)
self.api.config_detector_put(settings)
settings = DetectorSettings(**settings)
try:
self.api.config_detector_put(detector_settings=settings, _request_timeout=timeout)
except requests.exceptions.Timeout:
raise TimeoutError(f"Timeout while setting detector settings for {self._parent_name}")
except Exception:
content = traceback.format_exc()
raise JungfrauJochClientError(
f"Error while setting detector settings for {self._parent_name}: {content}"
)
def set_mesaurement_settings(self, settings: dict | jfjoch_client.DatasetSettings) -> None:
"""Set the measurement settings. JungfrauJoch must be in IDLE state.
def start(self, settings: dict | DatasetSettings, request_timeout: float = 10) -> None:
"""Start the mesaurement. DatasetSettings must be provided, and JungfrauJoch must be in IDLE state.
The method call is blocking and JungfrauJoch will be ready to measure after the call resolves.
Please check the DataSettings class for the available settings.
The minimum required settings are:
beam_x_pxl: StrictFloat | StrictInt,
beam_y_pxl: StrictFloat | StrictInt,
detector_distance_mm: float | int,
incident_energy_keV: float | int,
Args:
settings (dict): dictionary of settings
Please check the DataSettings class for the available settings. Minimum required settings are
beam_x_pxl, beam_y_pxl, detector_distance_mm, incident_energy_keV.
"""
state = self.api.status_get().state
state = self.detector_state
if state != DetectorState.IDLE:
raise JungfrauJochClientError(
f"Detector must be in IDLE state to set settings. Current state: {state}"
f"Error in {self._parent_name}. Detector must be in IDLE state to set settings. Current state: {state}"
)
if isinstance(settings, dict):
settings = jfjoch_client.DatasetSettings(**settings)
settings = DatasetSettings(**settings)
try:
res = self.api.start_post_with_http_info(dataset_settings=settings)
if res.status_code != 200:
logger.error(
f"Error while setting measurement settings {settings}, response: {res}"
)
raise JungfrauJochClientError(
f"Error while setting measurement settings {settings}, response: {res}"
)
except Exception as e:
logger.error(
f"Error while setting measurement settings {settings}. Exception raised {e}"
self.api.start_post_with_http_info(
dataset_settings=settings, _request_timeout=request_timeout
)
except requests.exceptions.Timeout:
raise TimeoutError(
f"TimeoutError in JungfrauJochClient for parent device {self._parent_name} for 'start' call"
)
except Exception:
content = traceback.format_exc()
raise JungfrauJochClientError(
f"Error while setting measurement settings {settings}. Exception raised {e}"
) from e
f"Error in JungfrauJochClient for parent device {self._parent_name} during 'start' call: {content}"
)
def wait_till_done(self, timeout: int = 5) -> None:
"""Wait until JungfrauJoch is done.
def stop(self, request_timeout: float = 0.5) -> None:
"""Stop the acquisition, this only logs errors and is not raising."""
try:
self.api.cancel_post_with_http_info(_request_timeout=request_timeout)
except requests.exceptions.Timeout:
content = traceback.format_exc()
logger.error(
f"Timeout in JungFrauJochClient for device {self._parent_name} during stop: {content}"
)
except Exception:
content = traceback.format_exc()
logger.error(
f"Error in JungFrauJochClient for device {self._parent_name} during stop: {content}"
)
def wait_till_done(self, timeout: int = 10, **kwargs) -> None:
"""Wait for JungfrauJoch to be in Idle state. Blocking call with timeout.
Args:
timeout (int): timeout in seconds
"""
success = False
try:
response = self.api.wait_till_done_post_with_http_info(math.ceil(timeout / 2))
if response.status_code != ResponseWaitDone.DETECTOR_IDLE:
logger.info(
f"Waitin for JungfrauJoch to be done, status: {ResponseWaitDone(response.status_code)}; response msg {response}"
self.api.wait_till_done_post_with_http_info(math.ceil(timeout=timeout / 2), **kwargs)
except requests.exceptions.Timeout:
raise TimeoutError(
f"Timeout in JungfrauJochClient for parent device {self._parent_name} for 'wait_till_done' call"
)
except Exception:
logger.info(
f"Waiting for device {self._parent_name}, jungfrau joch to become IDLE, retry after {timeout/2} seconds"
)
try:
self.api.wait_till_done_post_with_http_info(
timeout=math.floor(timeout / 2), **kwargs
)
response = self.api.wait_till_done_post_with_http_info(math.floor(timeout / 2))
if response.status_code == ResponseWaitDone.DETECTOR_IDLE:
success = True
return
except Exception as e:
logger.error(f"Error while waiting for JungfrauJoch to initialise: {e}")
raise JungfrauJochClientError(
f"Error while waiting for JungfrauJoch to initialise: {e}"
) from e
else:
if success is False:
logger.error(
f"Failed to initialise JungfrauJoch with status: {response.status_code}; response msg {response}"
except requests.exceptions.Timeout:
raise TimeoutError(
f"Timeout in JungfrauJochClient for parent device {self._parent_name} for 'wait_till_done' call"
)
except Exception:
content = traceback.format_exc()
raise JungfrauJochClientError(
f"Failed to initialise JungfrauJoch with status: {response.status_code}; response msg {response}"
f"JungfrauJoch Error in wait_till_done post for device {self._parent_name}: {content}"
)

View File

@@ -0,0 +1,95 @@
"""Module for the Eiger preview ZMQ stream."""
from __future__ import annotations
import json
import threading
import time
from typing import Callable
import numpy as np
import zmq
from bec_lib.logger import bec_logger
logger = bec_logger.logger
ZMQ_TOPIC_FILTER = b""
class JungfrauJochPreview:
USER_ACCESS = ["start", "stop"]
def __init__(self, url: str, cb: Callable):
self.url = url
self._socket = None
self._shutdown_event = threading.Event()
self._zmq_thread = None
self._on_update_callback = cb
def connect(self):
"""Connect to the JungfrauJoch PUB-SUB streaming interface
JungfrauJoch may reject connection for a few seconds when it restarts,
so if it fails, wait a bit and try to connect again.
"""
# pylint: disable=no-member
context = zmq.Context()
self._socket = context.socket(zmq.SUB)
self._socket.setsockopt(zmq.SUBSCRIBE, ZMQ_TOPIC_FILTER)
try:
self._socket.connect(self.url)
except ConnectionRefusedError:
time.sleep(1)
self._socket.connect(self.url)
def start(self):
self._zmq_thread = threading.Thread(
target=self._zmq_update_loop, daemon=True, name="JungfrauJoch_live_preview"
)
self._zmq_thread.start()
def stop(self):
self._shutdown_event.set()
if self._zmq_thread:
self._zmq_thread.join()
def _zmq_update_loop(self):
while not self._shutdown_event.is_set():
if self._socket is None:
self.connect()
try:
self._poll()
except ValueError:
# Happens when ZMQ partially delivers the multipart message
pass
except zmq.error.Again:
# Happens when receive queue is empty
time.sleep(0.1)
def _poll(self):
"""
Poll the ZMQ socket for new data. It will throttle the data update and
only subscribe to the topic for a single update. This is not very nice
but it seems like there is currently no option to set the update rate on
the backend.
"""
if self._shutdown_event.wait(0.2):
return
try:
# subscribe to the topic
self._socket.setsockopt(zmq.SUBSCRIBE, ZMQ_TOPIC_FILTER)
# pylint: disable=no-member
r = self._socket.recv_multipart(flags=zmq.NOBLOCK)
self._parse_data(r)
finally:
# Unsubscribe from the topic
self._socket.setsockopt(zmq.UNSUBSCRIBE, ZMQ_TOPIC_FILTER)
def _parse_data(self, data):
logger.info(f"Received data of length {len(data)}")
self._on_update_callback(data)

BIN
frame_dump.cbor Normal file

Binary file not shown.

View File

@@ -23,6 +23,7 @@ dependencies = [
"pyepics",
"pyueye", # for the IDS uEye camera
"bec_widgets",
"zmq",
]
[project.optional-dependencies]