Compare commits

...

11 Commits

Author SHA1 Message Date
gac-x12sa
d7290e3942 wip work at beamline 2025-04-23 10:51:14 +02:00
gac-x12sa
730926f5b3 wip jfj integration 2025-03-25 16:30:01 +01:00
gac-x12sa
18215a05b5 wip jfj, implemented first logic for computing triggers and pulse widths 2025-03-24 17:39:26 +01:00
gac-x12sa
5cd93fc5aa wip moving readout from class attribute to constant 2025-03-24 17:38:01 +01:00
gac-x12sa
2b4a13ebc2 wip eiger, improve stage procedure for BEC core scans 2025-03-24 08:23:18 +01:00
gac-x12sa
ee8fa8b962 wip fixing jfj client, improve start procedure 2025-03-24 08:23:18 +01:00
gac-x12sa
b281e458f9 wip small fixes of ddg csaxs for readout times 2025-03-24 08:23:17 +01:00
gac-x12sa
48bd7f73a8 refactor: delay generator ready for step scanning 2025-03-21 15:12:35 +01:00
gac-x12sa
b806487c54 wip: draft from working at the beamline 2025-02-26 15:45:22 +01:00
53dca4dc6f wip, fix enums for pytest 3.10 2025-02-26 11:28:04 +01:00
ccf8bb8474 refactor: client refactoring, adding tests for jfj_client models 2025-02-26 11:22:14 +01:00
6 changed files with 1418 additions and 314 deletions

View File

@@ -0,0 +1,64 @@
eiger9m:
description: Eiger9m HPC area detector 9M with JungfrauJoch backend
deviceClass: csaxs_bec.devices.jungfraujoch.eiger_jfj.Eiger9MCSAXS
deviceConfig:
host: "http://sls-jfjoch-001"
port: 8080
deviceTags:
- cSAXS
- eiger9m
onFailure: buffer
enabled: true
readoutPriority: async
softwareTrigger: false
ddg_jfj:
description: DelayGenerator for triggering all detectors
deviceClass: csaxs_bec.devices.epics.delay_generator_csaxs.DelayGeneratorcSAXS
deviceConfig:
prefix: 'X12SA-CPCL-DDG3:'
# ddg_config:
# delay_burst: 40.e-3
# delta_width: 0
# additional_triggers: 0
# polarity:
# - 1 # T0 -> DDG MCS
# - 0 # eiger
# - 1 # falcon
# - 1
# - 1
# amplitude: 4.5
# offset: 0
# thres_trig_level: 2.5
# set_high_on_exposure: False
# set_high_on_stage: False
deviceTags:
- cSAXS
- ddg_detectors
onFailure: buffer
enabled: true
readoutPriority: async
softwareTrigger: True
# Two test devices from the simulation
samx:
readoutPriority: baseline
deviceClass: ophyd_devices.SimPositioner
deviceConfig:
delay: 1
limits:
- -50
- 50
tolerance: 0.01
update_frequency: 400
deviceTags:
- user motors
enabled: true
readOnly: false
bpm4i:
readoutPriority: monitored
deviceClass: ophyd_devices.SimMonitor
deviceConfig:
deviceTags:
- beamline
enabled: true
readOnly: false

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,221 @@
"""Eiger detector for cSAXS beamline at the Swiss Light Source.
16bit mode supports 8e7 counts/s per pixel,
you will never have more than 12bit subframes, which means 4000 counts per subframe.
32bit mode supports 2e7 counts/s per pixel,
you will never have more than 24bit subframe, which means 16.7 million counts per subframe.
"""
import enum
from typing import TYPE_CHECKING
from bec_lib.devicemanager import ScanInfo
from bec_lib.logger import bec_logger
from jfjoch_client.models.dataset_settings import DatasetSettings
from jfjoch_client.models.detector_settings import DetectorSettings
from jfjoch_client.models.detector_timing import DetectorTiming
from ophyd import DeviceStatus
from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase
from csaxs_bec.devices.jungfraujoch.jungfrau_joch_client import JungfrauJochClient
from csaxs_bec.devices.jungfraujoch.readout_constants import EIGER9M_READOUT_TIME_32BIT
if TYPE_CHECKING: # pragma no cover
from bec_lib.devicemanager import ScanInfo
from bec_server.device_server.device_server import DeviceManagerDS
logger = bec_logger.logger
class EigerCSAXSBitDepth(int, enum.Enum):
"""Bit depth for EIGER detector at cSAXS beamline."""
BIT_DEPTH_16 = 16
BIT_DEPTH_32 = 32
class Eiger9MCSAXS(PSIDeviceBase):
"""
-----------
JungfrauJoch - one needs to connect to the jfj-server (sls-jfjoch-001)
Relevant commands for debugging:
sudo systemctl restart jfjoch_broker
sudo systemctl status jfjoch_broker
Some additional notes:
------------
- If energy on JFJ is set via DetectorSettings, the one in DatasetSettings will be ignored.
- One can set this initially, and then set it to none in DetectorSettings such that any update in DatasetSettings will be considered.
- IMPORTANT: Any change in energy will be detector. It will be best to have a check ourselves with a certain tolerance of ~ % to not constantly update the energy.
- in 'gating' mode, frame_time_us and count_time_us are not used.
- The image_time_us of the DatasetSettings and the frame_time_us of the DetectorSettings need to be the same.
- The difference between frame_time_us and count_time_us is the readout time.
- 16bit and 32bit, when do we switch?
- If switching is desired, the readout time needs to be adapted as a function of internal exposure time, and bit depth.
This can be up to ~400us for 32bit, and long exposures. This needs to be discussed!
- 16bit mode supports 8e7 counts/s per pixel,
It needs to be se to None.
------------
Eiger - if power cycling is needed. Use a combination of commands that connect to the chip, and the conda package.
The package is available via:
cd /sls/X12SA/data/gac-x12sa/erik/micromamba
source setup_9m.sh
------------
Nice to set high voltage low first, from conda package (sls_detector_package)
p highvoltage 0 or 150 (operational)
g highvoltage
# Put high voltage to 0 before power cylcing it.
telnet bchip500
cd power_control_user/
./on
./off
"""
########################################
# Beamline Specific Implementations #
########################################
USER_ACCESS = ["jfj_client"]
def __init__(
self,
name: str,
host: str = "http://sls-jfjoch-001",
port: int = 8080,
scan_info: ScanInfo = None,
device_manager=None,
**kwargs,
):
"""
Initialize the PSI Device Base class.
Args:
name (str) : Name of the device
scan_info (ScanInfo): The scan info to use.
"""
super().__init__(name=name, scan_info=scan_info, **kwargs)
self._host = f"{host}:{port}"
self.jfj_client = JungfrauJochClient(host=self._host, parent=self)
self.device_manager = device_manager
if self.device_manager is not None:
self.device_manager: DeviceManagerDS
self._bit_depth = 16
self.frame_time = 500e-6 # 500us, will be ignored in DetectorTiming.Gated
self.count_time = 300e-6 # 480us, will be ignored in DetectorTiming.Gated
# If not gated, frame_time and count_time will be used and logic has to be adjusted
self._timing = DetectorTiming.GATED
def on_init(self) -> None:
"""
Called when the device is initialized.
No siganls are connected at this point,
thus should not be set here but in on_connected instead.
"""
def on_connected(self) -> None:
"""
Called after the device is connected and its signals are connected.
Default values for signals should be set here.
"""
# Stop first in case it was in an uncertain state (i.e. measuring)
logger.info(f"On connected for {self.name}")
self.jfj_client.stop()
# Try to connect, needs to be in Inactive or Error state
self.jfj_client.connect_and_initialise(timeout=5)
# Set energy threshold for EIGER detector
threshold_ke_v = 6.200 # Grab this from mono energy pseudo device
# Energy threshold provided in DetectorSettings, than it is ignored in DatasetSettings
# This sets the energy threshold for the EIGER detector
settings = DetectorSettings(
frame_time_us=int(self.frame_time * 1e6),
count_time_us=int(self.count_time * 1e6),
eiger_bit_depth=self._bit_depth,
eiger_threshold_ke_v=threshold_ke_v,
timing=self._timing,
)
self.jfj_client.set_detector_settings(settings)
# Second call is needed to ensure that eiger_threshold_ke_v is set to None
# if not, DatasetSettings for eiger_threshold_ke_v will be ignored
# settings = DetectorSettings(
# frame_time_us=int(self.frame_time * 1e6),
# count_time_us=int(self.count_time * 1e6),
# eiger_bit_depth=self._bit_depth,
# timing=self._timing,
# )
# self.jfj_client.set_detector_settings(settings)
def on_stage(self) -> DeviceStatus | None:
"""
Called while staging the device.
Information about the upcoming scan can be accessed from the scan_info object.
"""
# Delay generator ddg_jfj needs to be activate
ddg = self.device_manager.devices.get("ddg_jfj", None)
if ddg is None:
logger.warning("ddg_jfj not found in device manager")
raise ValueError("ddg_jfj not found in device manager")
ntrigger = ddg.compute_num_trigger()
if self.scan_info.msg.scan_type == "step":
# Energy threshold provided in DetectorSettings, than it is ignored in DatasetSettings
print()
data_settings = DatasetSettings(
image_time_us=int(self.frame_time * 1e6), # this is frame_time
ntrigger=ntrigger,
beam_x_pxl=0,
beam_y_pxl=0,
detector_distance_mm=100,
incident_energy_ke_v=10.00,
# file_prefix = full_path_to_file,
)
# status = self.task_handler.submit_task(
# self.jfj_client.start, task_args=(data_settings,), run=True
# )
# return status
self.jfj_client.start(settings=data_settings)
# This method computes trigger_pulse_width, ntriggers and bit_depth
# trigger_pulse_width -> image_time in s (image_time_us)
# ntriggers -> number of images per trigger
# bit_depth -> 16 or 32
def on_unstage(self) -> DeviceStatus | None:
"""Called while unstaging the device."""
def on_pre_scan(self) -> DeviceStatus | None:
"""Called right before the scan starts on all devices automatically."""
def on_trigger(self) -> DeviceStatus | None:
"""Called when the device is triggered."""
def on_complete(self) -> DeviceStatus | None:
"""Called to inquire if a device has completed a scans."""
def wait_for_complete():
timeout = 10
for _ in range(timeout):
try:
self.jfj_client.wait_till_done(timeout=1)
except TimeoutError:
continue
except Exception as e:
raise ValueError(f"Error in complete for {self.name}, exception: {e}") from e
else:
break
status = self.task_handler.submit_task(wait_for_complete, run=True)
return status
def on_kickoff(self) -> DeviceStatus | None:
"""Called to kickoff a device for a fly scan. Has to be called explicitly."""
def on_stop(self) -> None:
"""Called when the device is stopped."""
self.jfj_client.stop()
self.task_handler.shutdown()

View File

@@ -1,8 +1,18 @@
"""Module with client interface for the Jungfrau Joch detector API"""
import enum
import math
import time
import jfjoch_client
from bec_lib.logger import bec_logger
from jfjoch_client.api.default_api import DefaultApi
from jfjoch_client.api_client import ApiClient
from jfjoch_client.api_response import ApiResponse
from jfjoch_client.configuration import Configuration
from jfjoch_client.models.broker_status import BrokerStatus
from jfjoch_client.models.dataset_settings import DatasetSettings
from jfjoch_client.models.detector_settings import DetectorSettings
from ophyd import Device
logger = bec_logger.logger
@@ -11,10 +21,8 @@ class JungfrauJochClientError(Exception):
"""Base class for exceptions in this module."""
class DetectorState(enum.StrEnum):
"""Detector states for Jungfrau Joch detector
['Inactive', 'Idle', 'Busy', 'Measuring', 'Pedestal', 'Error']
"""
class DetectorState(str, enum.Enum):
"""Possible Detector states for Jungfrau Joch detector"""
INACTIVE = "Inactive"
IDLE = "Idle"
@@ -24,24 +32,46 @@ class DetectorState(enum.StrEnum):
ERROR = "Error"
class ResponseWaitDone(enum.IntEnum):
"""Response state for Jungfrau Joch detector wait till done"""
class ResponseWaitDone(int, enum.Enum):
"""Response state for Jungfrau Joch detector wait till done call"""
DETECTOR_IDLE = 200
TIMEOUT_PARAM_OUT_OF_RANGE = 400
TIMEOUT_PARAM_OUT_OF_BOUNDS = 400
JUNGFRAU_ERROR = 500
DETECTOR_INACTIVE = 502
TIMEOUT_REACHED = 504
class JungfrauJochClient:
"""Thin wrapper around the Jungfrau Joch API client"""
class ResponseCancelDone(int, enum.Enum):
"""HTTP Response for cancel post"""
def __init__(self, host: str = "http://sls-jfjoch-001:8080") -> None:
CANCEL_SENT_TO_FPGA = 200
class JungfrauJochClient:
"""Thin wrapper around the Jungfrau Joch API client.
sudo systemctl restart jfjoch_broker
sudo systemctl status jfjoch_broker
It looks as if the detector is not being stopped properly.
One module remains running, how can we restart the detector?
"""
def __init__(
self, host: str = "http://sls-jfjoch-001:8080", parent: Device | None = None
) -> None:
self._initialised = False
configuration = jfjoch_client.Configuration(host=host)
api_client = jfjoch_client.ApiClient(configuration)
self.api = jfjoch_client.DefaultApi(api_client)
configuration = Configuration(host=host)
api_client = ApiClient(configuration)
self.api = DefaultApi(api_client)
self._parent_name = parent.name if parent else self.__class__.__name__
@property
def jjf_state(self) -> BrokerStatus:
"""Get the status of JungfrauJoch"""
response = self.api.status_get()
return BrokerStatus(**response.to_dict())
@property
def initialised(self) -> bool:
@@ -53,19 +83,19 @@ class JungfrauJochClient:
"""Set the connected status"""
self._initialised = value
def get_jungfrau_joch_status(self) -> DetectorState:
def get_detector_state(self) -> DetectorState:
"""Get the status of JungfrauJoch"""
return self.api.status_get().state
return DetectorState(self.jjf_state.state)
def connect_and_initialise(self, timeout: int = 5) -> None:
"""Check if JungfrauJoch is connected and ready to receive commands"""
status = self.api.status_get().state
status = self.get_detector_state()
if status != DetectorState.IDLE:
self.api.initialize_post()
self.wait_till_done(timeout)
self.initialised = True
self.wait_till_done(timeout) # Blocking call
self.initialised = True
def set_detector_settings(self, settings: dict | jfjoch_client.DatasetSettings) -> None:
def set_detector_settings(self, settings: dict | DetectorSettings) -> None:
"""Set the detector settings. JungfrauJoch must be in IDLE, Error or Inactive state.
Note, the full settings have to be provided, otherwise the settings will be overwritten with default values.
@@ -73,81 +103,91 @@ class JungfrauJochClient:
settings (dict): dictionary of settings
"""
state = self.api.status_get().state
if state not in [DetectorState.IDLE, DetectorState.ERROR, DetectorState.INACTIVE]:
time.sleep(1) # This can be improved.... #TODO
state = self.api.status_get().state
if state not in [DetectorState.IDLE, DetectorState.ERROR, DetectorState.INACTIVE]:
raise JungfrauJochClientError(
f"Detector must be in IDLE, ERROR or INACTIVE state to set settings. Current state: {state}"
f"Error in {self._parent_name}. Detector must be in IDLE, ERROR or INACTIVE state to set settings. Current state: {state}"
)
if isinstance(settings, dict):
settings = jfjoch_client.DatasetSettings(**settings)
self.api.config_detector_put(settings)
settings = DetectorSettings(**settings)
self.api.config_detector_put(detector_settings=settings)
# Check with Filip if this call is blocking! also check if put_with_http is better
def set_mesaurement_settings(self, settings: dict | jfjoch_client.DatasetSettings) -> None:
"""Set the measurement settings. JungfrauJoch must be in IDLE state.
def start(self, settings: dict | DatasetSettings) -> None:
"""Start the mesaurement. DatasetSettings must be provided, and JungfrauJoch must be in IDLE state.
The method call is blocking and JungfrauJoch will be ready to measure after the call resolves.
Please check the DataSettings class for the available settings.
The minimum required settings are:
beam_x_pxl: StrictFloat | StrictInt,
beam_y_pxl: StrictFloat | StrictInt,
detector_distance_mm: float | int,
incident_energy_keV: float | int,
Args:
settings (dict): dictionary of settings
Please check the DataSettings class for the available settings. Minimum required settings are
beam_x_pxl, beam_y_pxl, detector_distance_mm, incident_energy_keV.
"""
state = self.api.status_get().state
state = self.get_detector_state()
if state != DetectorState.IDLE:
raise JungfrauJochClientError(
f"Detector must be in IDLE state to set settings. Current state: {state}"
f"Error in {self._parent_name}. Detector must be in IDLE state to set settings. Current state: {state}"
)
if isinstance(settings, dict):
settings = jfjoch_client.DatasetSettings(**settings)
settings = DatasetSettings(**settings)
try:
res = self.api.start_post_with_http_info(dataset_settings=settings)
res: ApiResponse = self.api.start_post_with_http_info(dataset_settings=settings)
if res.status_code != 200:
logger.error(
f"Error while setting measurement settings {settings}, response: {res}"
)
raise JungfrauJochClientError(
f"Error while setting measurement settings {settings}, response: {res}"
)
response = f"Error in {self._parent_name}, while setting measurement settings {settings}, response: {res}"
raise JungfrauJochClientError(response)
except JungfrauJochClientError as e:
logger.error(e)
raise e
except Exception as e:
logger.error(
f"Error while setting measurement settings {settings}. Exception raised {e}"
)
raise JungfrauJochClientError(
f"Error while setting measurement settings {settings}. Exception raised {e}"
) from e
response = f"Error in {self._parent_name}, while setting measurement settings {settings}, exception: {e}"
logger.error(response)
raise JungfrauJochClientError(response) from e
def stop(self) -> None:
"""Stop the acquisition"""
try:
res: ApiResponse = self.api.cancel_post_with_http_info() # Should we use a timeout?
if res.status_code != ResponseCancelDone.CANCEL_SENT_TO_FPGA:
response = f"Error in device {self._parent_name} while stopping the measurement. API Response: {res}"
raise JungfrauJochClientError(response)
except JungfrauJochClientError as e:
raise e
except Exception as exc:
raise JungfrauJochClientError from exc
def wait_till_done(self, timeout: int = 5) -> None:
"""Wait until JungfrauJoch is done.
"""Wait for JungfrauJoch to be in Idle state. Blocking call with timeout.
Args:
timeout (int): timeout in seconds
"""
success = False
try:
response = self.api.wait_till_done_post_with_http_info(math.ceil(timeout / 2))
if response.status_code != ResponseWaitDone.DETECTOR_IDLE:
logger.info(
f"Waitin for JungfrauJoch to be done, status: {ResponseWaitDone(response.status_code)}; response msg {response}"
)
response = self.api.wait_till_done_post_with_http_info(math.floor(timeout / 2))
if response.status_code == ResponseWaitDone.DETECTOR_IDLE:
success = True
return
logger.info(
f"Waiting for device {self._parent_name}, jungfrau joch to become IDLE, "
f"status: {ResponseWaitDone(response.status_code)}; response msg {response}"
)
response = self.api.wait_till_done_post_with_http_info(math.floor(timeout / 2))
if response.status_code == ResponseWaitDone.DETECTOR_IDLE:
return
except Exception as e:
logger.error(f"Error while waiting for JungfrauJoch to initialise: {e}")
logger.error(
f"Error in device {self._parent_name} while waiting for JungfrauJoch to initialise: {e}"
)
raise JungfrauJochClientError(
f"Error while waiting for JungfrauJoch to initialise: {e}"
f"Error in device {self._parent_name} while waiting for JungfrauJoch to initialise. Exception: {e}"
) from e
else:
if success is False:
logger.error(
f"Failed to initialise JungfrauJoch with status: {response.status_code}; response msg {response}"
)
raise JungfrauJochClientError(
f"Failed to initialise JungfrauJoch with status: {response.status_code}; response msg {response}"
)
# If the response is IDLE, this part is never reached. We will raise a TimeoutError.
msg = (
f"TimeoutError in device {self._parent_name}, failed to initialise JungfrauJoch with status:"
f"{response.status_code}; response msg {response}"
)
logger.error(msg)
raise TimeoutError(msg)

View File

@@ -0,0 +1,4 @@
"""Readout constants for all relevant detectors at cSAXS beamline."""
# -> should 20e-6, 20us : parallel vs nonparallel, exact values to be checked
EIGER9M_READOUT_TIME_32BIT = 100e-6 # s

View File

@@ -0,0 +1,54 @@
import pytest
from jfjoch_client.api_response import ApiResponse
from jfjoch_client.models.broker_status import BrokerStatus
from jfjoch_client.models.dataset_settings import DatasetSettings
from jfjoch_client.models.detector_settings import DetectorSettings
from csaxs_bec.devices.jungfraujoch.jungfrau_joch_client import DetectorState, ResponseWaitDone
def test_jungfrau_joch_client_models_broker_status():
"""Test BrokerStatus model from JJF client"""
# Test broker status model
broker_status = BrokerStatus
assert "state" in broker_status.model_fields
assert "progress" in broker_status.model_fields
# Test that all DetectorStates are valid BrokerStatus states. This will not raise if
for state in DetectorState:
broker_status = BrokerStatus(state=state.value)
# Test an invalid state
with pytest.raises(ValueError):
broker_status = BrokerStatus(state="wrong")
def test_jungfrau_joch_client_models_dataset_settings():
"""Test DatasetSettings model from JJF client"""
# Test detector state model
settings = {
"beam_x_pxl": 0,
"beam_y_pxl": 0,
"detector_distance_mm": 100,
"incident_energy_keV": 10.00,
}
# Try creating DatasetSettings object with minimal required settigns
dataset_settings = DatasetSettings(**settings)
# Test that image_time_ns and ntrigger are still available
settings["image_time_us"] = 1000
settings["ntrigger"] = 100
dataset_settings = DatasetSettings(**settings)
def test_jungfrau_joch_client_models_api_response():
"""Test APIResponse model from JJF client.
We can only check that all http status code responses are valid.
"""
# Check if all ResponseWaitDone http status codes are valid for the APIResponse model
for state in ResponseWaitDone:
response = ApiResponse(status_code=state.value, data="", headers=None, raw_data=b"")
def test_jungfrau_joch_client_models_detector_settigns():
"""Test DetectorSettings model from JJF client"""
# Must be initialized with frame_time_us
settings = {"frame_time_us": 450}
DetectorSettings(**settings) # type:ignore