feat(eiger): add jfj integration
This commit is contained in:
@@ -42,3 +42,29 @@ ids_cam:
|
||||
enabled: true
|
||||
readoutPriority: async
|
||||
softwareTrigger: True
|
||||
|
||||
eiger_1_5:
|
||||
description: Eiger 1.5M in-vacuum detector
|
||||
deviceClass: csaxs_bec.devices.jungfraujoch.eiger_1_5m.Eiger1_5M
|
||||
deviceConfig:
|
||||
detector_distance: 100
|
||||
beam_center: [0, 0]
|
||||
onFailure: raise
|
||||
enabled: true
|
||||
readoutPriority: async
|
||||
softwareTrigger: False
|
||||
|
||||
samx:
|
||||
readoutPriority: baseline
|
||||
deviceClass: ophyd_devices.SimPositioner
|
||||
deviceConfig:
|
||||
delay: 1
|
||||
limits:
|
||||
- -50
|
||||
- 50
|
||||
tolerance: 0.01
|
||||
update_frequency: 400
|
||||
deviceTags:
|
||||
- user motors
|
||||
enabled: true
|
||||
readOnly: false
|
||||
|
||||
@@ -33,6 +33,7 @@ from __future__ import annotations
|
||||
|
||||
import threading
|
||||
import time
|
||||
import traceback
|
||||
from typing import TYPE_CHECKING
|
||||
|
||||
from bec_lib.logger import bec_logger
|
||||
@@ -160,6 +161,9 @@ class DDG1(PSIDeviceBase, DelayGeneratorCSAXS):
|
||||
# f = e + 1us
|
||||
# e has refernce to d, f has reference to e
|
||||
self.set_delay_pairs(channel="ef", delay=0, width=1e-6)
|
||||
time.sleep(
|
||||
0.2
|
||||
) # After staging, make sure that the DDG HW has some time to process changes properly.
|
||||
|
||||
def _prepare_mcs_on_trigger(self, mcs: MCSCardCSAXS) -> None:
|
||||
"""Prepare the MCS card for the next trigger.
|
||||
@@ -188,7 +192,13 @@ class DDG1(PSIDeviceBase, DelayGeneratorCSAXS):
|
||||
while (
|
||||
self._poll_thread_run_event.is_set() and not self._poll_thread_kill_event.is_set()
|
||||
):
|
||||
self._poll_loop()
|
||||
try:
|
||||
self._poll_loop()
|
||||
except Exception: # pylint: disable=broad-except
|
||||
content = traceback.format_exc()
|
||||
logger.error(
|
||||
f"Exception in polling loop thread, polling continues...\n Error content:\n{content}"
|
||||
)
|
||||
|
||||
self._poll_thread_poll_loop_done.set()
|
||||
|
||||
@@ -201,13 +211,17 @@ class DDG1(PSIDeviceBase, DelayGeneratorCSAXS):
|
||||
to be necessary to avoid missing events.
|
||||
"""
|
||||
self.state.proc_status.put(1, use_complete=True)
|
||||
time.sleep(0.02) # 20ms delay for processing, important for not missing events
|
||||
if self._poll_thread_run_event.is_set() and not self._poll_thread_kill_event.is_set():
|
||||
if (
|
||||
self._poll_thread_run_event.wait(timeout=0.02)
|
||||
and not self._poll_thread_kill_event.is_set()
|
||||
):
|
||||
return
|
||||
self.state.event_status.get(use_monitor=False)
|
||||
if self._poll_thread_run_event.is_set() and not self._poll_thread_kill_event.is_set():
|
||||
if (
|
||||
self._poll_thread_run_event.wait(timeout=0.02)
|
||||
and not self._poll_thread_kill_event.is_set()
|
||||
):
|
||||
return
|
||||
time.sleep(0.02) # 20ms delay for processing, important for not missing events
|
||||
|
||||
def _start_polling(self) -> None:
|
||||
"""Start the polling loop in the background thread."""
|
||||
@@ -253,6 +267,9 @@ class DDG1(PSIDeviceBase, DelayGeneratorCSAXS):
|
||||
If we don't then subsequent triggers may reach the DDG too early, and will be ignored. To
|
||||
avoid this, we've added the option to specify a delay via add_delay, default here is 50ms.
|
||||
"""
|
||||
# Keep sleep here for software trigger mode as 20ms delay between subsequent commands
|
||||
# to the HW are necessary to avoid crashes and missing events.
|
||||
time.sleep(0.02)
|
||||
# Stop polling, poll once manually to ensure that the register is clean
|
||||
self._stop_polling()
|
||||
self._poll_thread_poll_loop_done.wait(timeout=1)
|
||||
|
||||
@@ -158,7 +158,7 @@ class MCSCardCSAXS(PSIDeviceBase, MCSCard):
|
||||
self.acquire_mode.set(ACQUIREMODE.MCS).wait(timeout=self._pv_timeout)
|
||||
|
||||
# Subscribe the progress signal
|
||||
self.current_channel.subscribe(self._progress_update, run=False)
|
||||
# self.current_channel.subscribe(self._progress_update, run=False)
|
||||
|
||||
# Subscribe to the mca updates
|
||||
for name in self.counter_mapping.keys():
|
||||
|
||||
284
csaxs_bec/devices/jungfraujoch/eiger.py
Normal file
284
csaxs_bec/devices/jungfraujoch/eiger.py
Normal file
@@ -0,0 +1,284 @@
|
||||
"""
|
||||
Generic integration of JungfrauJoch backend with Eiger detectors
|
||||
for the cSAXS beamline at the Swiss Light Source.
|
||||
|
||||
The WEB UI is available on http://sls-jfjoch-001:8080
|
||||
|
||||
NOTE: this may not be the best place to store this information. It should be migrated to
|
||||
beamline documentation for debugging of Eiger & JungfrauJoch.
|
||||
|
||||
The JungfrauJoch server for cSAXS runs on sls-jfjoch-001.psi.ch
|
||||
User with sufficient rights may use:
|
||||
- sudo systemctl restart jfjoch_broker
|
||||
- sudo systemctl status jfjoch_broker
|
||||
to check and/or restart the broker for the JungfrauJoch server.
|
||||
|
||||
Some extra notes for setting up the detector:
|
||||
- If the energy on JFJ is set via DetectorSettings, the variable in DatasetSettings will be ignored
|
||||
- Changes in energy may take time, good to implement logic that only resets energy if needed.
|
||||
- For the Eiger, the frame_time_us in DetectorSettings is ignored, only the frame_time_us in
|
||||
the DatasetSettings is relevant
|
||||
- The bit_depth will be adjusted automatically based on the exp_time. Here, we need to ensure
|
||||
that subsequent triggers properly
|
||||
consider the readout_time of the boards. For Jungfrau detectors, the difference between
|
||||
count_time_us and frame_time_us is the readout_time of the boards. For the Eiger, this needs
|
||||
to be taken into account during the integration.
|
||||
- beam_center and detector settings are required input arguments, thus, they may be set to wrong
|
||||
values for acquisitions to start. Please keep this in mind.
|
||||
|
||||
Hardware related notes:
|
||||
- If there is an HW issue with the detector, power cycling may help.
|
||||
- The sls_detector package is available on console on /sls/X12SA/data/gac-x12sa/erik/micromamba
|
||||
- Run: source setup_9m.sh # Be careful, this connects to the detector, so it should not be
|
||||
used during operation
|
||||
- Useful commands:
|
||||
- p highvoltage 0 or 150 (operational)
|
||||
- g highvoltage
|
||||
- # Put high voltage to 0 before power cylcing it.
|
||||
- telnet bchip500
|
||||
- cd power_control_user/
|
||||
- ./on
|
||||
- ./off
|
||||
|
||||
Further information that may be relevant for debugging:
|
||||
JungfrauJoch - one needs to connect to the jfj-server (sls-jfjoch-001)
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import os
|
||||
import time
|
||||
import traceback
|
||||
from typing import TYPE_CHECKING, Literal
|
||||
|
||||
import requests
|
||||
import yaml
|
||||
from bec_lib.file_utils import get_full_path
|
||||
from bec_lib.logger import bec_logger
|
||||
from jfjoch_client.models.dataset_settings import DatasetSettings
|
||||
from jfjoch_client.models.detector_selection import DetectorSelection
|
||||
from jfjoch_client.models.detector_settings import DetectorSettings
|
||||
from jfjoch_client.models.detector_timing import DetectorTiming
|
||||
from ophyd import DeviceStatus
|
||||
from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase
|
||||
|
||||
from csaxs_bec.devices.jungfraujoch.jungfrau_joch_client import (
|
||||
JungfrauJochClient,
|
||||
JungfrauJochClientError,
|
||||
)
|
||||
|
||||
if TYPE_CHECKING: # pragma no cover
|
||||
from bec_lib.devicemanager import ScanInfo
|
||||
from bec_server.device_server.device_server import DeviceManagerDS
|
||||
from jfjoch_client.models.detector_list_element import DetectorListElement
|
||||
|
||||
logger = bec_logger.logger
|
||||
|
||||
EIGER_READOUT_TIME_US = 500e-6 # 500 microseconds in s
|
||||
|
||||
|
||||
class EigerError(Exception):
|
||||
"""Custom exception for Eiger detector errors."""
|
||||
|
||||
|
||||
class Eiger(PSIDeviceBase):
|
||||
"""
|
||||
Base integration of the Eiger1.5M and Eiger9M at cSAXS. All relevant
|
||||
"""
|
||||
|
||||
USER_ACCESS = ["detector_distance", "beam_center"]
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
name: str,
|
||||
detector_name: Literal["EIGER 9M", "EIGER 8.5M (tmp)", "EIGER 1.5M"],
|
||||
host: str = "http://sls-jfjoch-001",
|
||||
port: int = 8080,
|
||||
detector_distance: float = 100.0,
|
||||
beam_center: tuple[int, int] = (0, 0),
|
||||
scan_info: ScanInfo = None,
|
||||
readout_time: float = EIGER_READOUT_TIME_US,
|
||||
device_manager=None,
|
||||
**kwargs,
|
||||
):
|
||||
"""
|
||||
Initialize the PSI Device Base class.
|
||||
|
||||
Args:
|
||||
name (str) : Name of the device
|
||||
detector_name (str): Name of the detector. Supports ["EIGER 9M", "EIGER 8.5M (tmp)", "EIGER 1.5M"]
|
||||
host (str): Hostname of the Jungfrau Joch server.
|
||||
port (int): Port of the Jungfrau Joch server.
|
||||
scan_info (ScanInfo): The scan info to use.
|
||||
device_manager (DeviceManagerDS): The device manager to use.
|
||||
**kwargs: Additional keyword arguments.
|
||||
"""
|
||||
super().__init__(name=name, scan_info=scan_info, device_manager=device_manager, **kwargs)
|
||||
self._host = f"{host}:{port}"
|
||||
self.jfj_client = JungfrauJochClient(host=self._host, parent=self)
|
||||
self.device_manager = device_manager
|
||||
self.detector_name = detector_name
|
||||
self._detector_distance = detector_distance
|
||||
self._beam_center = beam_center
|
||||
self._readout_time = readout_time
|
||||
self._full_path = ""
|
||||
if self.device_manager is not None:
|
||||
self.device_manager: DeviceManagerDS
|
||||
|
||||
@property
|
||||
def detector_distance(self) -> float:
|
||||
"""The detector distance in mm."""
|
||||
return self._detector_distance
|
||||
|
||||
@detector_distance.setter
|
||||
def detector_distance(self, value: float) -> None:
|
||||
"""Set the detector distance in mm."""
|
||||
if value <= 0:
|
||||
raise ValueError("Detector distance must be a positive value.")
|
||||
self._detector_distance = value
|
||||
|
||||
@property
|
||||
def beam_center(self) -> tuple[float, float]:
|
||||
"""The beam center in pixels. (x,y)"""
|
||||
return self._beam_center
|
||||
|
||||
@beam_center.setter
|
||||
def beam_center(self, value: tuple[float, float]) -> None:
|
||||
"""Set the beam center in pixels. (x,y)"""
|
||||
self._beam_center = value
|
||||
|
||||
def on_init(self) -> None:
|
||||
"""
|
||||
Called when the device is initialized.
|
||||
|
||||
No siganls are connected at this point,
|
||||
thus should not be set here but in on_connected instead.
|
||||
"""
|
||||
|
||||
def on_connected(self) -> None:
|
||||
"""
|
||||
Called after the device is connected and its signals are connected.
|
||||
Default values for signals should be set here.
|
||||
"""
|
||||
logger.debug(f"On connected called for {self.name}")
|
||||
self.jfj_client.stop(request_timeout=3)
|
||||
# If stop failed, it will not raise. So the next calls may raise instead.. TODO logic could be discussed
|
||||
# Get available detectors
|
||||
available_detectors = self.jfj_client.api.config_select_detector_get(_request_timeout=5)
|
||||
detector_info = [
|
||||
det for det in available_detectors.detectors if det.description == self.detector_name
|
||||
]
|
||||
# If the specified detector is not found, or multiple definitions for that name are available, something is wrong.
|
||||
if len(detector_info) != 1:
|
||||
raise ValueError(
|
||||
f"Detector {self.detector_name} not found in available detectors: {available_detectors}"
|
||||
)
|
||||
detector_info: DetectorListElement = detector_info[0]
|
||||
try:
|
||||
self.jfj_client.api.config_select_detector_put(
|
||||
DetectorSelection(id=detector_info.id), _request_timeout=10
|
||||
)
|
||||
except requests.exceptions.Timeout:
|
||||
raise TimeoutError(f"Timeout while selecting detector {self.detector_name}")
|
||||
except Exception:
|
||||
content = traceback.format_exc()
|
||||
raise EigerError(f"Error while selecting detector {self.detector_name}: {content}")
|
||||
|
||||
# Try to connect, needs to be in Inactive or Error state
|
||||
self.jfj_client.connect_and_initialise(timeout=10, _request_timeout=10)
|
||||
|
||||
# This sets the energy threshold for the EIGER detector
|
||||
settings = DetectorSettings(frame_time_us=int(500), timing=DetectorTiming.TRIGGER)
|
||||
logger.debug(f"Setting detector_settings: {yaml.dump(settings.to_dict(), indent=4)}")
|
||||
self.jfj_client.set_detector_settings(settings, timeout=10)
|
||||
|
||||
def on_stage(self) -> DeviceStatus | None:
|
||||
"""
|
||||
Called while staging the device.
|
||||
|
||||
Information about the upcoming scan can be accessed from the scan_info object.
|
||||
"""
|
||||
start_time = time.time()
|
||||
|
||||
scan_msg = self.scan_info.msg
|
||||
# TODO add mono energy logic, if energy changed more than ~1-2%, adapt thresholds!
|
||||
# Add logic to set the detector energy threshold from mono_energy pv
|
||||
# dev.mono_energy from device_manager
|
||||
incident_energy = 12.0
|
||||
|
||||
exp_time = scan_msg.scan_parameters.get("exp_time", 0)
|
||||
if exp_time <= self._readout_time:
|
||||
raise ValueError(
|
||||
f"Receive scan request for scan {scan_msg.scan_name} with exp_time {exp_time}s, which must be larger than the readout time {self._readout_time}s of the detector {self.detector_name}."
|
||||
)
|
||||
frame_time_us = exp_time # - self._readout_time # Needs to be checked if needed
|
||||
# convert frame time to us
|
||||
# settings = DetectorSettings(frame_time_us=int(frame_time_us * 1e6))
|
||||
# self.jfj_client.set_detector_settings(settings, timeout=10)
|
||||
# Set acquisition parameter
|
||||
ntrigger = int(scan_msg.num_points * scan_msg.scan_parameters["frames_per_trigger"])
|
||||
# Fetch file path
|
||||
self._full_path = get_full_path(
|
||||
scan_msg, name=self.name
|
||||
) # We can discuss if this should be used with name..
|
||||
# Get Path from Broker
|
||||
# Fetch basepath from JFJ broker config
|
||||
path = os.path.relpath(self._full_path, start="/sls/x12sa/data")
|
||||
data_settings = DatasetSettings(
|
||||
image_time_us=int(frame_time_us * 1e6), # This is currently ignored
|
||||
ntrigger=ntrigger,
|
||||
file_prefix=path,
|
||||
beam_x_pxl=int(self._beam_center[0]),
|
||||
beam_y_pxl=int(self._beam_center[1]),
|
||||
detector_distance_mm=self.detector_distance,
|
||||
incident_energy_ke_v=incident_energy,
|
||||
)
|
||||
logger.debug(f"Setting data_settings: {yaml.dump(data_settings.to_dict(), indent=4)}")
|
||||
prep_time = start_time - time.time()
|
||||
logger.info(f"Prepared information for eiger to start acquisition in {prep_time:.2f}s")
|
||||
self.jfj_client.start(settings=data_settings)
|
||||
start_call_returns = time.time() - start_time - prep_time
|
||||
logger.info(f"Start Rest call from JFJ took {start_call_returns:.2f}s")
|
||||
|
||||
def on_unstage(self) -> DeviceStatus:
|
||||
"""Called while unstaging the device."""
|
||||
|
||||
def on_pre_scan(self) -> DeviceStatus:
|
||||
"""Called right before the scan starts on all devices automatically."""
|
||||
|
||||
def on_trigger(self) -> DeviceStatus:
|
||||
"""Called when the device is triggered."""
|
||||
|
||||
def on_complete(self) -> DeviceStatus:
|
||||
"""Called to inquire if a device has completed a scans."""
|
||||
|
||||
def wait_for_complete():
|
||||
timeout = 10
|
||||
for _ in range(timeout):
|
||||
try:
|
||||
self.jfj_client.wait_till_done(timeout=1, _request_timeout=5)
|
||||
except (
|
||||
JungfrauJochClientError
|
||||
): # Means that timeout was triggered, and not _request_timeout
|
||||
continue
|
||||
except TimeoutError: # Timeout exception from wait_till_done
|
||||
content = traceback.format_exc()
|
||||
raise TimeoutError(f"Timeout for request during complete call: {content}")
|
||||
except Exception as e: # This should actually never occur..
|
||||
raise ValueError(f"Error in complete for {self.name}, exception: {e}") from e
|
||||
else:
|
||||
break
|
||||
|
||||
status = self.task_handler.submit_task(wait_for_complete, run=True)
|
||||
self.cancel_on_stop(status)
|
||||
return status
|
||||
|
||||
def on_kickoff(self) -> DeviceStatus | None:
|
||||
"""Called to kickoff a device for a fly scan. Has to be called explicitly."""
|
||||
|
||||
def on_stop(self) -> None:
|
||||
"""Called when the device is stopped."""
|
||||
self.jfj_client.stop(
|
||||
request_timeout=0.5
|
||||
) # Call should not block more than 0.5 seconds to stop all devices...
|
||||
self.task_handler.shutdown()
|
||||
54
csaxs_bec/devices/jungfraujoch/eiger_1_5m.py
Normal file
54
csaxs_bec/devices/jungfraujoch/eiger_1_5m.py
Normal file
@@ -0,0 +1,54 @@
|
||||
"""
|
||||
Eiger 1.5M specific integration. It is based on the Eiger base integration for the JungfrauJoch backend
|
||||
which is placed in eiger_csaxs, and where code that is equivalent for the Eiger9M and Eiger1.5M is shared.
|
||||
|
||||
Please check the eiger_csaxs.py class for more details about the relevant services.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from typing import TYPE_CHECKING
|
||||
|
||||
from csaxs_bec.devices.jungfraujoch.eiger import Eiger
|
||||
|
||||
EIGER1_5M_READOUT_TIME_US = 500e-6 # 500 microseconds in s
|
||||
DETECTOR_NAME = "EIGER 1.5M"
|
||||
|
||||
|
||||
if TYPE_CHECKING: # pragma no cover
|
||||
from bec_lib.devicemanager import ScanInfo
|
||||
from bec_server.device_server.device_server import DeviceManagerDS
|
||||
|
||||
|
||||
# pylint:disable=invalid-name
|
||||
class Eiger1_5M(Eiger):
|
||||
"""
|
||||
Eiger 1.5M specific integration for the in-vaccum Eiger.
|
||||
|
||||
The logic implemented here is coupled to the DelayGenerator integration,
|
||||
repsonsible for the global triggering of all devices through a single Trigger logic.
|
||||
Please check the eiger.py class for more details about the integration of relevant backend
|
||||
services. The detector_name must be set to "EIGER 1.5M:
|
||||
"""
|
||||
|
||||
USER_ACCESS = Eiger.USER_ACCESS + [] # Add more user_access methods here.
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
name: str,
|
||||
detector_distance: float = 100.0,
|
||||
beam_center: tuple[float, float] = (0.0, 0.0),
|
||||
scan_info: ScanInfo = None,
|
||||
device_manager: DeviceManagerDS = None,
|
||||
**kwargs,
|
||||
) -> None:
|
||||
super().__init__(
|
||||
name=name,
|
||||
detector_name=DETECTOR_NAME,
|
||||
readout_time=EIGER1_5M_READOUT_TIME_US,
|
||||
detector_distance=detector_distance,
|
||||
beam_center=beam_center,
|
||||
scan_info=scan_info,
|
||||
device_manager=device_manager,
|
||||
**kwargs,
|
||||
)
|
||||
58
csaxs_bec/devices/jungfraujoch/eiger_9m.py
Normal file
58
csaxs_bec/devices/jungfraujoch/eiger_9m.py
Normal file
@@ -0,0 +1,58 @@
|
||||
"""
|
||||
Eiger 9M specific integration. It is based on the Eiger base integration for the JungfrauJoch backend
|
||||
which is placed in eiger_csaxs, and where code that is equivalent for the Eiger9M and Eiger1.5M is shared.
|
||||
|
||||
Please check the eiger_csaxs.py class for more details about the relevant services.
|
||||
|
||||
In 16bit mode, 8e7 counts/s per pixel are supported in summed up frames,
|
||||
although subframes will never have more than 12bit counts (~4000 counts per pixel in subframe).
|
||||
In 32bit mode, 2e7 counts/s per pixel are supported, for which subframes will have no
|
||||
more than 24bit counts, which means 16.7 million counts per pixel in subframes.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from typing import TYPE_CHECKING
|
||||
|
||||
from csaxs_bec.devices.jungfraujoch.eiger import Eiger
|
||||
|
||||
if TYPE_CHECKING: # pragma no cover
|
||||
from bec_lib.devicemanager import ScanInfo
|
||||
from bec_server.device_server.device_server import DeviceManagerDS
|
||||
|
||||
EIGER9M_READOUT_TIME_US = 500e-6 # 500 microseconds in s
|
||||
DETECTOR_NAME = "EIGER 8.5M (tmp)" # "EIGER 9M""
|
||||
|
||||
|
||||
# pylint:disable=invalid-name
|
||||
class Eiger1_5M(Eiger):
|
||||
"""
|
||||
Eiger 1.5M specific integration for the in-vaccum Eiger.
|
||||
|
||||
The logic implemented here is coupled to the DelayGenerator integration,
|
||||
repsonsible for the global triggering of all devices through a single Trigger logic.
|
||||
Please check the eiger.py class for more details about the integration of relevant backend
|
||||
services. The detector_name must be set to "EIGER 1.5M:
|
||||
"""
|
||||
|
||||
USER_ACCESS = Eiger.USER_ACCESS + [] # Add more user_access methods here.
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
name: str,
|
||||
detector_distance: float = 100.0,
|
||||
beam_center: tuple[float, float] = (0.0, 0.0),
|
||||
scan_info: ScanInfo = None,
|
||||
device_manager: DeviceManagerDS = None,
|
||||
**kwargs,
|
||||
) -> None:
|
||||
super().__init__(
|
||||
name=name,
|
||||
detector_name=DETECTOR_NAME,
|
||||
readout_time=EIGER9M_READOUT_TIME_US,
|
||||
detector_distance=detector_distance,
|
||||
beam_center=beam_center,
|
||||
scan_info=scan_info,
|
||||
device_manager=device_manager,
|
||||
**kwargs,
|
||||
)
|
||||
@@ -1,20 +1,36 @@
|
||||
"""Module with client interface for the Jungfrau Joch detector API"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import enum
|
||||
import math
|
||||
import time
|
||||
import traceback
|
||||
from typing import TYPE_CHECKING
|
||||
|
||||
import jfjoch_client
|
||||
import requests
|
||||
from bec_lib.logger import bec_logger
|
||||
from jfjoch_client.api.default_api import DefaultApi
|
||||
from jfjoch_client.api_client import ApiClient
|
||||
from jfjoch_client.configuration import Configuration
|
||||
from jfjoch_client.models.broker_status import BrokerStatus
|
||||
from jfjoch_client.models.dataset_settings import DatasetSettings
|
||||
from jfjoch_client.models.detector_settings import DetectorSettings
|
||||
|
||||
logger = bec_logger.logger
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from ophyd import Device
|
||||
|
||||
|
||||
# pylint: disable=raise-missing-from
|
||||
# pylint: disable=broad-except
|
||||
class JungfrauJochClientError(Exception):
|
||||
"""Base class for exceptions in this module."""
|
||||
|
||||
|
||||
class DetectorState(enum.StrEnum):
|
||||
"""Detector states for Jungfrau Joch detector
|
||||
['Inactive', 'Idle', 'Busy', 'Measuring', 'Pedestal', 'Error']
|
||||
"""
|
||||
class DetectorState(str, enum.Enum):
|
||||
"""Possible Detector states for Jungfrau Joch detector"""
|
||||
|
||||
INACTIVE = "Inactive"
|
||||
IDLE = "Idle"
|
||||
@@ -24,24 +40,30 @@ class DetectorState(enum.StrEnum):
|
||||
ERROR = "Error"
|
||||
|
||||
|
||||
class ResponseWaitDone(enum.IntEnum):
|
||||
"""Response state for Jungfrau Joch detector wait till done"""
|
||||
|
||||
DETECTOR_IDLE = 200
|
||||
TIMEOUT_PARAM_OUT_OF_RANGE = 400
|
||||
JUNGFRAU_ERROR = 500
|
||||
DETECTOR_INACTIVE = 502
|
||||
TIMEOUT_REACHED = 504
|
||||
|
||||
|
||||
class JungfrauJochClient:
|
||||
"""Thin wrapper around the Jungfrau Joch API client"""
|
||||
"""Thin wrapper around the Jungfrau Joch API client.
|
||||
|
||||
def __init__(self, host: str = "http://sls-jfjoch-001:8080") -> None:
|
||||
sudo systemctl restart jfjoch_broker
|
||||
sudo systemctl status jfjoch_broker
|
||||
|
||||
It looks as if the detector is not being stopped properly.
|
||||
One module remains running, how can we restart the detector?
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self, host: str = "http://sls-jfjoch-001:8080", parent: Device | None = None
|
||||
) -> None:
|
||||
self._initialised = False
|
||||
configuration = jfjoch_client.Configuration(host=host)
|
||||
api_client = jfjoch_client.ApiClient(configuration)
|
||||
self.api = jfjoch_client.DefaultApi(api_client)
|
||||
configuration = Configuration(host=host)
|
||||
api_client = ApiClient(configuration)
|
||||
self.api = DefaultApi(api_client)
|
||||
self._parent_name = parent.name if parent else self.__class__.__name__
|
||||
|
||||
@property
|
||||
def jjf_state(self) -> BrokerStatus:
|
||||
"""Get the status of JungfrauJoch"""
|
||||
response = self.api.status_get()
|
||||
return BrokerStatus(**response.to_dict())
|
||||
|
||||
@property
|
||||
def initialised(self) -> bool:
|
||||
@@ -53,101 +75,121 @@ class JungfrauJochClient:
|
||||
"""Set the connected status"""
|
||||
self._initialised = value
|
||||
|
||||
def get_jungfrau_joch_status(self) -> DetectorState:
|
||||
@property
|
||||
def detector_state(self) -> DetectorState:
|
||||
"""Get the status of JungfrauJoch"""
|
||||
return self.api.status_get().state
|
||||
return DetectorState(self.jjf_state.state)
|
||||
|
||||
def connect_and_initialise(self, timeout: int = 5) -> None:
|
||||
def connect_and_initialise(self, timeout: int = 10, **kwargs) -> None:
|
||||
"""Check if JungfrauJoch is connected and ready to receive commands"""
|
||||
status = self.api.status_get().state
|
||||
status = self.detector_state
|
||||
if status != DetectorState.IDLE:
|
||||
self.api.initialize_post()
|
||||
self.wait_till_done(timeout)
|
||||
self.initialised = True
|
||||
self.api.initialize_post() # This is a blocking call....
|
||||
self.wait_till_done(timeout, **kwargs) # Blocking call
|
||||
self.initialised = True
|
||||
|
||||
def set_detector_settings(self, settings: dict | jfjoch_client.DatasetSettings) -> None:
|
||||
def set_detector_settings(self, settings: dict | DetectorSettings, timeout: int = 10) -> None:
|
||||
"""Set the detector settings. JungfrauJoch must be in IDLE, Error or Inactive state.
|
||||
Note, the full settings have to be provided, otherwise the settings will be overwritten with default values.
|
||||
|
||||
Args:
|
||||
settings (dict): dictionary of settings
|
||||
"""
|
||||
state = self.api.status_get().state
|
||||
state = self.detector_state
|
||||
if state not in [DetectorState.IDLE, DetectorState.ERROR, DetectorState.INACTIVE]:
|
||||
raise JungfrauJochClientError(
|
||||
f"Detector must be in IDLE, ERROR or INACTIVE state to set settings. Current state: {state}"
|
||||
)
|
||||
time.sleep(1) # Give the detector 1s to become IDLE, retry
|
||||
state = self.detector_state
|
||||
if state not in [DetectorState.IDLE, DetectorState.ERROR, DetectorState.INACTIVE]:
|
||||
raise JungfrauJochClientError(
|
||||
f"Error in {self._parent_name}. Detector must be in IDLE, ERROR or INACTIVE state to set settings. Current state: {state}"
|
||||
)
|
||||
|
||||
if isinstance(settings, dict):
|
||||
settings = jfjoch_client.DatasetSettings(**settings)
|
||||
self.api.config_detector_put(settings)
|
||||
settings = DetectorSettings(**settings)
|
||||
try:
|
||||
self.api.config_detector_put(detector_settings=settings, _request_timeout=timeout)
|
||||
except requests.exceptions.Timeout:
|
||||
raise TimeoutError(f"Timeout while setting detector settings for {self._parent_name}")
|
||||
except Exception:
|
||||
content = traceback.format_exc()
|
||||
raise JungfrauJochClientError(
|
||||
f"Error while setting detector settings for {self._parent_name}: {content}"
|
||||
)
|
||||
|
||||
def set_mesaurement_settings(self, settings: dict | jfjoch_client.DatasetSettings) -> None:
|
||||
"""Set the measurement settings. JungfrauJoch must be in IDLE state.
|
||||
def start(self, settings: dict | DatasetSettings, request_timeout: float = 10) -> None:
|
||||
"""Start the mesaurement. DatasetSettings must be provided, and JungfrauJoch must be in IDLE state.
|
||||
The method call is blocking and JungfrauJoch will be ready to measure after the call resolves.
|
||||
|
||||
Please check the DataSettings class for the available settings.
|
||||
The minimum required settings are:
|
||||
beam_x_pxl: StrictFloat | StrictInt,
|
||||
beam_y_pxl: StrictFloat | StrictInt,
|
||||
detector_distance_mm: float | int,
|
||||
incident_energy_keV: float | int,
|
||||
|
||||
Args:
|
||||
settings (dict): dictionary of settings
|
||||
|
||||
Please check the DataSettings class for the available settings. Minimum required settings are
|
||||
beam_x_pxl, beam_y_pxl, detector_distance_mm, incident_energy_keV.
|
||||
|
||||
"""
|
||||
state = self.api.status_get().state
|
||||
state = self.detector_state
|
||||
if state != DetectorState.IDLE:
|
||||
raise JungfrauJochClientError(
|
||||
f"Detector must be in IDLE state to set settings. Current state: {state}"
|
||||
f"Error in {self._parent_name}. Detector must be in IDLE state to set settings. Current state: {state}"
|
||||
)
|
||||
|
||||
if isinstance(settings, dict):
|
||||
settings = jfjoch_client.DatasetSettings(**settings)
|
||||
settings = DatasetSettings(**settings)
|
||||
try:
|
||||
res = self.api.start_post_with_http_info(dataset_settings=settings)
|
||||
if res.status_code != 200:
|
||||
logger.error(
|
||||
f"Error while setting measurement settings {settings}, response: {res}"
|
||||
)
|
||||
raise JungfrauJochClientError(
|
||||
f"Error while setting measurement settings {settings}, response: {res}"
|
||||
)
|
||||
except Exception as e:
|
||||
logger.error(
|
||||
f"Error while setting measurement settings {settings}. Exception raised {e}"
|
||||
self.api.start_post_with_http_info(
|
||||
dataset_settings=settings, _request_timeout=request_timeout
|
||||
)
|
||||
except requests.exceptions.Timeout:
|
||||
raise TimeoutError(
|
||||
f"TimeoutError in JungfrauJochClient for parent device {self._parent_name} for 'start' call"
|
||||
)
|
||||
except Exception:
|
||||
content = traceback.format_exc()
|
||||
raise JungfrauJochClientError(
|
||||
f"Error while setting measurement settings {settings}. Exception raised {e}"
|
||||
) from e
|
||||
f"Error in JungfrauJochClient for parent device {self._parent_name} during 'start' call: {content}"
|
||||
)
|
||||
|
||||
def wait_till_done(self, timeout: int = 5) -> None:
|
||||
"""Wait until JungfrauJoch is done.
|
||||
def stop(self, request_timeout: float = 0.5) -> None:
|
||||
"""Stop the acquisition, this only logs errors and is not raising."""
|
||||
try:
|
||||
self.api.cancel_post_with_http_info(_request_timeout=request_timeout)
|
||||
except requests.exceptions.Timeout:
|
||||
content = traceback.format_exc()
|
||||
logger.error(
|
||||
f"Timeout in JungFrauJochClient for device {self._parent_name} during stop: {content}"
|
||||
)
|
||||
except Exception:
|
||||
content = traceback.format_exc()
|
||||
logger.error(
|
||||
f"Error in JungFrauJochClient for device {self._parent_name} during stop: {content}"
|
||||
)
|
||||
|
||||
def wait_till_done(self, timeout: int = 10, **kwargs) -> None:
|
||||
"""Wait for JungfrauJoch to be in Idle state. Blocking call with timeout.
|
||||
|
||||
Args:
|
||||
timeout (int): timeout in seconds
|
||||
"""
|
||||
success = False
|
||||
try:
|
||||
response = self.api.wait_till_done_post_with_http_info(math.ceil(timeout / 2))
|
||||
if response.status_code != ResponseWaitDone.DETECTOR_IDLE:
|
||||
logger.info(
|
||||
f"Waitin for JungfrauJoch to be done, status: {ResponseWaitDone(response.status_code)}; response msg {response}"
|
||||
self.api.wait_till_done_post_with_http_info(math.ceil(timeout=timeout / 2), **kwargs)
|
||||
except requests.exceptions.Timeout:
|
||||
raise TimeoutError(
|
||||
f"Timeout in JungfrauJochClient for parent device {self._parent_name} for 'wait_till_done' call"
|
||||
)
|
||||
except Exception:
|
||||
logger.info(
|
||||
f"Waiting for device {self._parent_name}, jungfrau joch to become IDLE, retry after {timeout/2} seconds"
|
||||
)
|
||||
try:
|
||||
self.api.wait_till_done_post_with_http_info(
|
||||
timeout=math.floor(timeout / 2), **kwargs
|
||||
)
|
||||
response = self.api.wait_till_done_post_with_http_info(math.floor(timeout / 2))
|
||||
if response.status_code == ResponseWaitDone.DETECTOR_IDLE:
|
||||
success = True
|
||||
return
|
||||
except Exception as e:
|
||||
logger.error(f"Error while waiting for JungfrauJoch to initialise: {e}")
|
||||
raise JungfrauJochClientError(
|
||||
f"Error while waiting for JungfrauJoch to initialise: {e}"
|
||||
) from e
|
||||
else:
|
||||
if success is False:
|
||||
logger.error(
|
||||
f"Failed to initialise JungfrauJoch with status: {response.status_code}; response msg {response}"
|
||||
except requests.exceptions.Timeout:
|
||||
raise TimeoutError(
|
||||
f"Timeout in JungfrauJochClient for parent device {self._parent_name} for 'wait_till_done' call"
|
||||
)
|
||||
except Exception:
|
||||
content = traceback.format_exc()
|
||||
raise JungfrauJochClientError(
|
||||
f"Failed to initialise JungfrauJoch with status: {response.status_code}; response msg {response}"
|
||||
f"JungfrauJoch Error in wait_till_done post for device {self._parent_name}: {content}"
|
||||
)
|
||||
|
||||
95
csaxs_bec/devices/jungfraujoch/jungfraujoch_preview.py
Normal file
95
csaxs_bec/devices/jungfraujoch/jungfraujoch_preview.py
Normal file
@@ -0,0 +1,95 @@
|
||||
"""Module for the Eiger preview ZMQ stream."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import threading
|
||||
import time
|
||||
from typing import Callable
|
||||
|
||||
import numpy as np
|
||||
import zmq
|
||||
from bec_lib.logger import bec_logger
|
||||
|
||||
logger = bec_logger.logger
|
||||
|
||||
ZMQ_TOPIC_FILTER = b""
|
||||
|
||||
|
||||
class JungfrauJochPreview:
|
||||
USER_ACCESS = ["start", "stop"]
|
||||
|
||||
def __init__(self, url: str, cb: Callable):
|
||||
self.url = url
|
||||
self._socket = None
|
||||
self._shutdown_event = threading.Event()
|
||||
self._zmq_thread = None
|
||||
self._on_update_callback = cb
|
||||
|
||||
def connect(self):
|
||||
"""Connect to the JungfrauJoch PUB-SUB streaming interface
|
||||
|
||||
JungfrauJoch may reject connection for a few seconds when it restarts,
|
||||
so if it fails, wait a bit and try to connect again.
|
||||
"""
|
||||
# pylint: disable=no-member
|
||||
|
||||
context = zmq.Context()
|
||||
self._socket = context.socket(zmq.SUB)
|
||||
self._socket.setsockopt(zmq.SUBSCRIBE, ZMQ_TOPIC_FILTER)
|
||||
try:
|
||||
self._socket.connect(self.url)
|
||||
except ConnectionRefusedError:
|
||||
time.sleep(1)
|
||||
self._socket.connect(self.url)
|
||||
|
||||
def start(self):
|
||||
self._zmq_thread = threading.Thread(
|
||||
target=self._zmq_update_loop, daemon=True, name="JungfrauJoch_live_preview"
|
||||
)
|
||||
self._zmq_thread.start()
|
||||
|
||||
def stop(self):
|
||||
self._shutdown_event.set()
|
||||
if self._zmq_thread:
|
||||
self._zmq_thread.join()
|
||||
|
||||
def _zmq_update_loop(self):
|
||||
while not self._shutdown_event.is_set():
|
||||
if self._socket is None:
|
||||
self.connect()
|
||||
try:
|
||||
self._poll()
|
||||
except ValueError:
|
||||
# Happens when ZMQ partially delivers the multipart message
|
||||
pass
|
||||
except zmq.error.Again:
|
||||
# Happens when receive queue is empty
|
||||
time.sleep(0.1)
|
||||
|
||||
def _poll(self):
|
||||
"""
|
||||
Poll the ZMQ socket for new data. It will throttle the data update and
|
||||
only subscribe to the topic for a single update. This is not very nice
|
||||
but it seems like there is currently no option to set the update rate on
|
||||
the backend.
|
||||
"""
|
||||
|
||||
if self._shutdown_event.wait(0.2):
|
||||
return
|
||||
|
||||
try:
|
||||
# subscribe to the topic
|
||||
self._socket.setsockopt(zmq.SUBSCRIBE, ZMQ_TOPIC_FILTER)
|
||||
|
||||
# pylint: disable=no-member
|
||||
r = self._socket.recv_multipart(flags=zmq.NOBLOCK)
|
||||
self._parse_data(r)
|
||||
|
||||
finally:
|
||||
# Unsubscribe from the topic
|
||||
self._socket.setsockopt(zmq.UNSUBSCRIBE, ZMQ_TOPIC_FILTER)
|
||||
|
||||
def _parse_data(self, data):
|
||||
logger.info(f"Received data of length {len(data)}")
|
||||
self._on_update_callback(data)
|
||||
BIN
frame_dump.cbor
Normal file
BIN
frame_dump.cbor
Normal file
Binary file not shown.
@@ -23,6 +23,7 @@ dependencies = [
|
||||
"pyepics",
|
||||
"pyueye", # for the IDS uEye camera
|
||||
"bec_widgets",
|
||||
"zmq",
|
||||
]
|
||||
|
||||
[project.optional-dependencies]
|
||||
|
||||
Reference in New Issue
Block a user