Compare commits
11 Commits
copier-upg
...
feat/add_e
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
d7290e3942 | ||
|
|
730926f5b3 | ||
|
|
18215a05b5 | ||
|
|
5cd93fc5aa | ||
|
|
2b4a13ebc2 | ||
|
|
ee8fa8b962 | ||
|
|
b281e458f9 | ||
|
|
48bd7f73a8 | ||
|
|
b806487c54 | ||
| 53dca4dc6f | |||
| ccf8bb8474 |
64
csaxs_bec/device_configs/jungfrau_joch_test_config.yaml
Normal file
64
csaxs_bec/device_configs/jungfrau_joch_test_config.yaml
Normal file
@@ -0,0 +1,64 @@
|
||||
eiger9m:
|
||||
description: Eiger9m HPC area detector 9M with JungfrauJoch backend
|
||||
deviceClass: csaxs_bec.devices.jungfraujoch.eiger_jfj.Eiger9MCSAXS
|
||||
deviceConfig:
|
||||
host: "http://sls-jfjoch-001"
|
||||
port: 8080
|
||||
deviceTags:
|
||||
- cSAXS
|
||||
- eiger9m
|
||||
onFailure: buffer
|
||||
enabled: true
|
||||
readoutPriority: async
|
||||
softwareTrigger: false
|
||||
ddg_jfj:
|
||||
description: DelayGenerator for triggering all detectors
|
||||
deviceClass: csaxs_bec.devices.epics.delay_generator_csaxs.DelayGeneratorcSAXS
|
||||
deviceConfig:
|
||||
prefix: 'X12SA-CPCL-DDG3:'
|
||||
# ddg_config:
|
||||
# delay_burst: 40.e-3
|
||||
# delta_width: 0
|
||||
# additional_triggers: 0
|
||||
# polarity:
|
||||
# - 1 # T0 -> DDG MCS
|
||||
# - 0 # eiger
|
||||
# - 1 # falcon
|
||||
# - 1
|
||||
# - 1
|
||||
# amplitude: 4.5
|
||||
# offset: 0
|
||||
# thres_trig_level: 2.5
|
||||
# set_high_on_exposure: False
|
||||
# set_high_on_stage: False
|
||||
deviceTags:
|
||||
- cSAXS
|
||||
- ddg_detectors
|
||||
onFailure: buffer
|
||||
enabled: true
|
||||
readoutPriority: async
|
||||
softwareTrigger: True
|
||||
|
||||
# Two test devices from the simulation
|
||||
samx:
|
||||
readoutPriority: baseline
|
||||
deviceClass: ophyd_devices.SimPositioner
|
||||
deviceConfig:
|
||||
delay: 1
|
||||
limits:
|
||||
- -50
|
||||
- 50
|
||||
tolerance: 0.01
|
||||
update_frequency: 400
|
||||
deviceTags:
|
||||
- user motors
|
||||
enabled: true
|
||||
readOnly: false
|
||||
bpm4i:
|
||||
readoutPriority: monitored
|
||||
deviceClass: ophyd_devices.SimMonitor
|
||||
deviceConfig:
|
||||
deviceTags:
|
||||
- beamline
|
||||
enabled: true
|
||||
readOnly: false
|
||||
File diff suppressed because it is too large
Load Diff
221
csaxs_bec/devices/jungfraujoch/eiger_jfj.py
Normal file
221
csaxs_bec/devices/jungfraujoch/eiger_jfj.py
Normal file
@@ -0,0 +1,221 @@
|
||||
"""Eiger detector for cSAXS beamline at the Swiss Light Source.
|
||||
|
||||
16bit mode supports 8e7 counts/s per pixel,
|
||||
you will never have more than 12bit subframes, which means 4000 counts per subframe.
|
||||
32bit mode supports 2e7 counts/s per pixel,
|
||||
you will never have more than 24bit subframe, which means 16.7 million counts per subframe.
|
||||
"""
|
||||
|
||||
import enum
|
||||
from typing import TYPE_CHECKING
|
||||
|
||||
from bec_lib.devicemanager import ScanInfo
|
||||
from bec_lib.logger import bec_logger
|
||||
from jfjoch_client.models.dataset_settings import DatasetSettings
|
||||
from jfjoch_client.models.detector_settings import DetectorSettings
|
||||
from jfjoch_client.models.detector_timing import DetectorTiming
|
||||
from ophyd import DeviceStatus
|
||||
from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase
|
||||
|
||||
from csaxs_bec.devices.jungfraujoch.jungfrau_joch_client import JungfrauJochClient
|
||||
from csaxs_bec.devices.jungfraujoch.readout_constants import EIGER9M_READOUT_TIME_32BIT
|
||||
|
||||
if TYPE_CHECKING: # pragma no cover
|
||||
from bec_lib.devicemanager import ScanInfo
|
||||
from bec_server.device_server.device_server import DeviceManagerDS
|
||||
|
||||
logger = bec_logger.logger
|
||||
|
||||
|
||||
class EigerCSAXSBitDepth(int, enum.Enum):
|
||||
"""Bit depth for EIGER detector at cSAXS beamline."""
|
||||
|
||||
BIT_DEPTH_16 = 16
|
||||
BIT_DEPTH_32 = 32
|
||||
|
||||
|
||||
class Eiger9MCSAXS(PSIDeviceBase):
|
||||
"""
|
||||
-----------
|
||||
JungfrauJoch - one needs to connect to the jfj-server (sls-jfjoch-001)
|
||||
|
||||
Relevant commands for debugging:
|
||||
sudo systemctl restart jfjoch_broker
|
||||
sudo systemctl status jfjoch_broker
|
||||
|
||||
Some additional notes:
|
||||
------------
|
||||
- If energy on JFJ is set via DetectorSettings, the one in DatasetSettings will be ignored.
|
||||
- One can set this initially, and then set it to none in DetectorSettings such that any update in DatasetSettings will be considered.
|
||||
- IMPORTANT: Any change in energy will be detector. It will be best to have a check ourselves with a certain tolerance of ~ % to not constantly update the energy.
|
||||
- in 'gating' mode, frame_time_us and count_time_us are not used.
|
||||
- The image_time_us of the DatasetSettings and the frame_time_us of the DetectorSettings need to be the same.
|
||||
- The difference between frame_time_us and count_time_us is the readout time.
|
||||
- 16bit and 32bit, when do we switch?
|
||||
- If switching is desired, the readout time needs to be adapted as a function of internal exposure time, and bit depth.
|
||||
This can be up to ~400us for 32bit, and long exposures. This needs to be discussed!
|
||||
- 16bit mode supports 8e7 counts/s per pixel,
|
||||
It needs to be se to None.
|
||||
|
||||
------------
|
||||
Eiger - if power cycling is needed. Use a combination of commands that connect to the chip, and the conda package.
|
||||
The package is available via:
|
||||
cd /sls/X12SA/data/gac-x12sa/erik/micromamba
|
||||
source setup_9m.sh
|
||||
|
||||
------------
|
||||
Nice to set high voltage low first, from conda package (sls_detector_package)
|
||||
p highvoltage 0 or 150 (operational)
|
||||
g highvoltage
|
||||
# Put high voltage to 0 before power cylcing it.
|
||||
telnet bchip500
|
||||
cd power_control_user/
|
||||
./on
|
||||
./off
|
||||
"""
|
||||
|
||||
########################################
|
||||
# Beamline Specific Implementations #
|
||||
########################################
|
||||
|
||||
USER_ACCESS = ["jfj_client"]
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
name: str,
|
||||
host: str = "http://sls-jfjoch-001",
|
||||
port: int = 8080,
|
||||
scan_info: ScanInfo = None,
|
||||
device_manager=None,
|
||||
**kwargs,
|
||||
):
|
||||
"""
|
||||
Initialize the PSI Device Base class.
|
||||
|
||||
Args:
|
||||
name (str) : Name of the device
|
||||
scan_info (ScanInfo): The scan info to use.
|
||||
"""
|
||||
super().__init__(name=name, scan_info=scan_info, **kwargs)
|
||||
self._host = f"{host}:{port}"
|
||||
self.jfj_client = JungfrauJochClient(host=self._host, parent=self)
|
||||
self.device_manager = device_manager
|
||||
if self.device_manager is not None:
|
||||
self.device_manager: DeviceManagerDS
|
||||
self._bit_depth = 16
|
||||
self.frame_time = 500e-6 # 500us, will be ignored in DetectorTiming.Gated
|
||||
self.count_time = 300e-6 # 480us, will be ignored in DetectorTiming.Gated
|
||||
# If not gated, frame_time and count_time will be used and logic has to be adjusted
|
||||
self._timing = DetectorTiming.GATED
|
||||
|
||||
def on_init(self) -> None:
|
||||
"""
|
||||
Called when the device is initialized.
|
||||
|
||||
No siganls are connected at this point,
|
||||
thus should not be set here but in on_connected instead.
|
||||
"""
|
||||
|
||||
def on_connected(self) -> None:
|
||||
"""
|
||||
Called after the device is connected and its signals are connected.
|
||||
Default values for signals should be set here.
|
||||
"""
|
||||
# Stop first in case it was in an uncertain state (i.e. measuring)
|
||||
logger.info(f"On connected for {self.name}")
|
||||
self.jfj_client.stop()
|
||||
# Try to connect, needs to be in Inactive or Error state
|
||||
self.jfj_client.connect_and_initialise(timeout=5)
|
||||
|
||||
# Set energy threshold for EIGER detector
|
||||
threshold_ke_v = 6.200 # Grab this from mono energy pseudo device
|
||||
# Energy threshold provided in DetectorSettings, than it is ignored in DatasetSettings
|
||||
|
||||
# This sets the energy threshold for the EIGER detector
|
||||
settings = DetectorSettings(
|
||||
frame_time_us=int(self.frame_time * 1e6),
|
||||
count_time_us=int(self.count_time * 1e6),
|
||||
eiger_bit_depth=self._bit_depth,
|
||||
eiger_threshold_ke_v=threshold_ke_v,
|
||||
timing=self._timing,
|
||||
)
|
||||
self.jfj_client.set_detector_settings(settings)
|
||||
# Second call is needed to ensure that eiger_threshold_ke_v is set to None
|
||||
# if not, DatasetSettings for eiger_threshold_ke_v will be ignored
|
||||
# settings = DetectorSettings(
|
||||
# frame_time_us=int(self.frame_time * 1e6),
|
||||
# count_time_us=int(self.count_time * 1e6),
|
||||
# eiger_bit_depth=self._bit_depth,
|
||||
# timing=self._timing,
|
||||
# )
|
||||
# self.jfj_client.set_detector_settings(settings)
|
||||
|
||||
def on_stage(self) -> DeviceStatus | None:
|
||||
"""
|
||||
Called while staging the device.
|
||||
|
||||
Information about the upcoming scan can be accessed from the scan_info object.
|
||||
"""
|
||||
# Delay generator ddg_jfj needs to be activate
|
||||
ddg = self.device_manager.devices.get("ddg_jfj", None)
|
||||
if ddg is None:
|
||||
logger.warning("ddg_jfj not found in device manager")
|
||||
raise ValueError("ddg_jfj not found in device manager")
|
||||
ntrigger = ddg.compute_num_trigger()
|
||||
if self.scan_info.msg.scan_type == "step":
|
||||
# Energy threshold provided in DetectorSettings, than it is ignored in DatasetSettings
|
||||
print()
|
||||
data_settings = DatasetSettings(
|
||||
image_time_us=int(self.frame_time * 1e6), # this is frame_time
|
||||
ntrigger=ntrigger,
|
||||
beam_x_pxl=0,
|
||||
beam_y_pxl=0,
|
||||
detector_distance_mm=100,
|
||||
incident_energy_ke_v=10.00,
|
||||
# file_prefix = full_path_to_file,
|
||||
)
|
||||
# status = self.task_handler.submit_task(
|
||||
# self.jfj_client.start, task_args=(data_settings,), run=True
|
||||
# )
|
||||
# return status
|
||||
self.jfj_client.start(settings=data_settings)
|
||||
|
||||
# This method computes trigger_pulse_width, ntriggers and bit_depth
|
||||
# trigger_pulse_width -> image_time in s (image_time_us)
|
||||
# ntriggers -> number of images per trigger
|
||||
# bit_depth -> 16 or 32
|
||||
|
||||
def on_unstage(self) -> DeviceStatus | None:
|
||||
"""Called while unstaging the device."""
|
||||
|
||||
def on_pre_scan(self) -> DeviceStatus | None:
|
||||
"""Called right before the scan starts on all devices automatically."""
|
||||
|
||||
def on_trigger(self) -> DeviceStatus | None:
|
||||
"""Called when the device is triggered."""
|
||||
|
||||
def on_complete(self) -> DeviceStatus | None:
|
||||
"""Called to inquire if a device has completed a scans."""
|
||||
|
||||
def wait_for_complete():
|
||||
timeout = 10
|
||||
for _ in range(timeout):
|
||||
try:
|
||||
self.jfj_client.wait_till_done(timeout=1)
|
||||
except TimeoutError:
|
||||
continue
|
||||
except Exception as e:
|
||||
raise ValueError(f"Error in complete for {self.name}, exception: {e}") from e
|
||||
else:
|
||||
break
|
||||
|
||||
status = self.task_handler.submit_task(wait_for_complete, run=True)
|
||||
return status
|
||||
|
||||
def on_kickoff(self) -> DeviceStatus | None:
|
||||
"""Called to kickoff a device for a fly scan. Has to be called explicitly."""
|
||||
|
||||
def on_stop(self) -> None:
|
||||
"""Called when the device is stopped."""
|
||||
self.jfj_client.stop()
|
||||
self.task_handler.shutdown()
|
||||
@@ -1,8 +1,18 @@
|
||||
"""Module with client interface for the Jungfrau Joch detector API"""
|
||||
|
||||
import enum
|
||||
import math
|
||||
import time
|
||||
|
||||
import jfjoch_client
|
||||
from bec_lib.logger import bec_logger
|
||||
from jfjoch_client.api.default_api import DefaultApi
|
||||
from jfjoch_client.api_client import ApiClient
|
||||
from jfjoch_client.api_response import ApiResponse
|
||||
from jfjoch_client.configuration import Configuration
|
||||
from jfjoch_client.models.broker_status import BrokerStatus
|
||||
from jfjoch_client.models.dataset_settings import DatasetSettings
|
||||
from jfjoch_client.models.detector_settings import DetectorSettings
|
||||
from ophyd import Device
|
||||
|
||||
logger = bec_logger.logger
|
||||
|
||||
@@ -11,10 +21,8 @@ class JungfrauJochClientError(Exception):
|
||||
"""Base class for exceptions in this module."""
|
||||
|
||||
|
||||
class DetectorState(enum.StrEnum):
|
||||
"""Detector states for Jungfrau Joch detector
|
||||
['Inactive', 'Idle', 'Busy', 'Measuring', 'Pedestal', 'Error']
|
||||
"""
|
||||
class DetectorState(str, enum.Enum):
|
||||
"""Possible Detector states for Jungfrau Joch detector"""
|
||||
|
||||
INACTIVE = "Inactive"
|
||||
IDLE = "Idle"
|
||||
@@ -24,24 +32,46 @@ class DetectorState(enum.StrEnum):
|
||||
ERROR = "Error"
|
||||
|
||||
|
||||
class ResponseWaitDone(enum.IntEnum):
|
||||
"""Response state for Jungfrau Joch detector wait till done"""
|
||||
class ResponseWaitDone(int, enum.Enum):
|
||||
"""Response state for Jungfrau Joch detector wait till done call"""
|
||||
|
||||
DETECTOR_IDLE = 200
|
||||
TIMEOUT_PARAM_OUT_OF_RANGE = 400
|
||||
TIMEOUT_PARAM_OUT_OF_BOUNDS = 400
|
||||
JUNGFRAU_ERROR = 500
|
||||
DETECTOR_INACTIVE = 502
|
||||
TIMEOUT_REACHED = 504
|
||||
|
||||
|
||||
class JungfrauJochClient:
|
||||
"""Thin wrapper around the Jungfrau Joch API client"""
|
||||
class ResponseCancelDone(int, enum.Enum):
|
||||
"""HTTP Response for cancel post"""
|
||||
|
||||
def __init__(self, host: str = "http://sls-jfjoch-001:8080") -> None:
|
||||
CANCEL_SENT_TO_FPGA = 200
|
||||
|
||||
|
||||
class JungfrauJochClient:
|
||||
"""Thin wrapper around the Jungfrau Joch API client.
|
||||
|
||||
sudo systemctl restart jfjoch_broker
|
||||
sudo systemctl status jfjoch_broker
|
||||
|
||||
It looks as if the detector is not being stopped properly.
|
||||
One module remains running, how can we restart the detector?
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self, host: str = "http://sls-jfjoch-001:8080", parent: Device | None = None
|
||||
) -> None:
|
||||
self._initialised = False
|
||||
configuration = jfjoch_client.Configuration(host=host)
|
||||
api_client = jfjoch_client.ApiClient(configuration)
|
||||
self.api = jfjoch_client.DefaultApi(api_client)
|
||||
configuration = Configuration(host=host)
|
||||
api_client = ApiClient(configuration)
|
||||
self.api = DefaultApi(api_client)
|
||||
self._parent_name = parent.name if parent else self.__class__.__name__
|
||||
|
||||
@property
|
||||
def jjf_state(self) -> BrokerStatus:
|
||||
"""Get the status of JungfrauJoch"""
|
||||
response = self.api.status_get()
|
||||
return BrokerStatus(**response.to_dict())
|
||||
|
||||
@property
|
||||
def initialised(self) -> bool:
|
||||
@@ -53,19 +83,19 @@ class JungfrauJochClient:
|
||||
"""Set the connected status"""
|
||||
self._initialised = value
|
||||
|
||||
def get_jungfrau_joch_status(self) -> DetectorState:
|
||||
def get_detector_state(self) -> DetectorState:
|
||||
"""Get the status of JungfrauJoch"""
|
||||
return self.api.status_get().state
|
||||
return DetectorState(self.jjf_state.state)
|
||||
|
||||
def connect_and_initialise(self, timeout: int = 5) -> None:
|
||||
"""Check if JungfrauJoch is connected and ready to receive commands"""
|
||||
status = self.api.status_get().state
|
||||
status = self.get_detector_state()
|
||||
if status != DetectorState.IDLE:
|
||||
self.api.initialize_post()
|
||||
self.wait_till_done(timeout)
|
||||
self.initialised = True
|
||||
self.wait_till_done(timeout) # Blocking call
|
||||
self.initialised = True
|
||||
|
||||
def set_detector_settings(self, settings: dict | jfjoch_client.DatasetSettings) -> None:
|
||||
def set_detector_settings(self, settings: dict | DetectorSettings) -> None:
|
||||
"""Set the detector settings. JungfrauJoch must be in IDLE, Error or Inactive state.
|
||||
Note, the full settings have to be provided, otherwise the settings will be overwritten with default values.
|
||||
|
||||
@@ -73,81 +103,91 @@ class JungfrauJochClient:
|
||||
settings (dict): dictionary of settings
|
||||
"""
|
||||
state = self.api.status_get().state
|
||||
if state not in [DetectorState.IDLE, DetectorState.ERROR, DetectorState.INACTIVE]:
|
||||
time.sleep(1) # This can be improved.... #TODO
|
||||
state = self.api.status_get().state
|
||||
if state not in [DetectorState.IDLE, DetectorState.ERROR, DetectorState.INACTIVE]:
|
||||
raise JungfrauJochClientError(
|
||||
f"Detector must be in IDLE, ERROR or INACTIVE state to set settings. Current state: {state}"
|
||||
f"Error in {self._parent_name}. Detector must be in IDLE, ERROR or INACTIVE state to set settings. Current state: {state}"
|
||||
)
|
||||
|
||||
if isinstance(settings, dict):
|
||||
settings = jfjoch_client.DatasetSettings(**settings)
|
||||
self.api.config_detector_put(settings)
|
||||
settings = DetectorSettings(**settings)
|
||||
self.api.config_detector_put(detector_settings=settings)
|
||||
# Check with Filip if this call is blocking! also check if put_with_http is better
|
||||
|
||||
def set_mesaurement_settings(self, settings: dict | jfjoch_client.DatasetSettings) -> None:
|
||||
"""Set the measurement settings. JungfrauJoch must be in IDLE state.
|
||||
def start(self, settings: dict | DatasetSettings) -> None:
|
||||
"""Start the mesaurement. DatasetSettings must be provided, and JungfrauJoch must be in IDLE state.
|
||||
The method call is blocking and JungfrauJoch will be ready to measure after the call resolves.
|
||||
|
||||
Please check the DataSettings class for the available settings.
|
||||
The minimum required settings are:
|
||||
beam_x_pxl: StrictFloat | StrictInt,
|
||||
beam_y_pxl: StrictFloat | StrictInt,
|
||||
detector_distance_mm: float | int,
|
||||
incident_energy_keV: float | int,
|
||||
|
||||
Args:
|
||||
settings (dict): dictionary of settings
|
||||
|
||||
Please check the DataSettings class for the available settings. Minimum required settings are
|
||||
beam_x_pxl, beam_y_pxl, detector_distance_mm, incident_energy_keV.
|
||||
|
||||
"""
|
||||
state = self.api.status_get().state
|
||||
state = self.get_detector_state()
|
||||
if state != DetectorState.IDLE:
|
||||
raise JungfrauJochClientError(
|
||||
f"Detector must be in IDLE state to set settings. Current state: {state}"
|
||||
f"Error in {self._parent_name}. Detector must be in IDLE state to set settings. Current state: {state}"
|
||||
)
|
||||
|
||||
if isinstance(settings, dict):
|
||||
settings = jfjoch_client.DatasetSettings(**settings)
|
||||
settings = DatasetSettings(**settings)
|
||||
try:
|
||||
res = self.api.start_post_with_http_info(dataset_settings=settings)
|
||||
res: ApiResponse = self.api.start_post_with_http_info(dataset_settings=settings)
|
||||
if res.status_code != 200:
|
||||
logger.error(
|
||||
f"Error while setting measurement settings {settings}, response: {res}"
|
||||
)
|
||||
raise JungfrauJochClientError(
|
||||
f"Error while setting measurement settings {settings}, response: {res}"
|
||||
)
|
||||
response = f"Error in {self._parent_name}, while setting measurement settings {settings}, response: {res}"
|
||||
raise JungfrauJochClientError(response)
|
||||
except JungfrauJochClientError as e:
|
||||
logger.error(e)
|
||||
raise e
|
||||
except Exception as e:
|
||||
logger.error(
|
||||
f"Error while setting measurement settings {settings}. Exception raised {e}"
|
||||
)
|
||||
raise JungfrauJochClientError(
|
||||
f"Error while setting measurement settings {settings}. Exception raised {e}"
|
||||
) from e
|
||||
response = f"Error in {self._parent_name}, while setting measurement settings {settings}, exception: {e}"
|
||||
logger.error(response)
|
||||
raise JungfrauJochClientError(response) from e
|
||||
|
||||
def stop(self) -> None:
|
||||
"""Stop the acquisition"""
|
||||
try:
|
||||
res: ApiResponse = self.api.cancel_post_with_http_info() # Should we use a timeout?
|
||||
if res.status_code != ResponseCancelDone.CANCEL_SENT_TO_FPGA:
|
||||
response = f"Error in device {self._parent_name} while stopping the measurement. API Response: {res}"
|
||||
raise JungfrauJochClientError(response)
|
||||
except JungfrauJochClientError as e:
|
||||
raise e
|
||||
except Exception as exc:
|
||||
raise JungfrauJochClientError from exc
|
||||
|
||||
def wait_till_done(self, timeout: int = 5) -> None:
|
||||
"""Wait until JungfrauJoch is done.
|
||||
"""Wait for JungfrauJoch to be in Idle state. Blocking call with timeout.
|
||||
|
||||
Args:
|
||||
timeout (int): timeout in seconds
|
||||
"""
|
||||
success = False
|
||||
try:
|
||||
response = self.api.wait_till_done_post_with_http_info(math.ceil(timeout / 2))
|
||||
if response.status_code != ResponseWaitDone.DETECTOR_IDLE:
|
||||
logger.info(
|
||||
f"Waitin for JungfrauJoch to be done, status: {ResponseWaitDone(response.status_code)}; response msg {response}"
|
||||
)
|
||||
response = self.api.wait_till_done_post_with_http_info(math.floor(timeout / 2))
|
||||
if response.status_code == ResponseWaitDone.DETECTOR_IDLE:
|
||||
success = True
|
||||
return
|
||||
logger.info(
|
||||
f"Waiting for device {self._parent_name}, jungfrau joch to become IDLE, "
|
||||
f"status: {ResponseWaitDone(response.status_code)}; response msg {response}"
|
||||
)
|
||||
response = self.api.wait_till_done_post_with_http_info(math.floor(timeout / 2))
|
||||
if response.status_code == ResponseWaitDone.DETECTOR_IDLE:
|
||||
return
|
||||
except Exception as e:
|
||||
logger.error(f"Error while waiting for JungfrauJoch to initialise: {e}")
|
||||
logger.error(
|
||||
f"Error in device {self._parent_name} while waiting for JungfrauJoch to initialise: {e}"
|
||||
)
|
||||
raise JungfrauJochClientError(
|
||||
f"Error while waiting for JungfrauJoch to initialise: {e}"
|
||||
f"Error in device {self._parent_name} while waiting for JungfrauJoch to initialise. Exception: {e}"
|
||||
) from e
|
||||
else:
|
||||
if success is False:
|
||||
logger.error(
|
||||
f"Failed to initialise JungfrauJoch with status: {response.status_code}; response msg {response}"
|
||||
)
|
||||
raise JungfrauJochClientError(
|
||||
f"Failed to initialise JungfrauJoch with status: {response.status_code}; response msg {response}"
|
||||
)
|
||||
# If the response is IDLE, this part is never reached. We will raise a TimeoutError.
|
||||
msg = (
|
||||
f"TimeoutError in device {self._parent_name}, failed to initialise JungfrauJoch with status:"
|
||||
f"{response.status_code}; response msg {response}"
|
||||
)
|
||||
logger.error(msg)
|
||||
raise TimeoutError(msg)
|
||||
|
||||
4
csaxs_bec/devices/jungfraujoch/readout_constants.py
Normal file
4
csaxs_bec/devices/jungfraujoch/readout_constants.py
Normal file
@@ -0,0 +1,4 @@
|
||||
"""Readout constants for all relevant detectors at cSAXS beamline."""
|
||||
|
||||
# -> should 20e-6, 20us : parallel vs nonparallel, exact values to be checked
|
||||
EIGER9M_READOUT_TIME_32BIT = 100e-6 # s
|
||||
54
tests/tests_devices/test_jungfrau_joch.py
Normal file
54
tests/tests_devices/test_jungfrau_joch.py
Normal file
@@ -0,0 +1,54 @@
|
||||
import pytest
|
||||
from jfjoch_client.api_response import ApiResponse
|
||||
from jfjoch_client.models.broker_status import BrokerStatus
|
||||
from jfjoch_client.models.dataset_settings import DatasetSettings
|
||||
from jfjoch_client.models.detector_settings import DetectorSettings
|
||||
|
||||
from csaxs_bec.devices.jungfraujoch.jungfrau_joch_client import DetectorState, ResponseWaitDone
|
||||
|
||||
|
||||
def test_jungfrau_joch_client_models_broker_status():
|
||||
"""Test BrokerStatus model from JJF client"""
|
||||
# Test broker status model
|
||||
broker_status = BrokerStatus
|
||||
assert "state" in broker_status.model_fields
|
||||
assert "progress" in broker_status.model_fields
|
||||
# Test that all DetectorStates are valid BrokerStatus states. This will not raise if
|
||||
for state in DetectorState:
|
||||
broker_status = BrokerStatus(state=state.value)
|
||||
# Test an invalid state
|
||||
with pytest.raises(ValueError):
|
||||
broker_status = BrokerStatus(state="wrong")
|
||||
|
||||
|
||||
def test_jungfrau_joch_client_models_dataset_settings():
|
||||
"""Test DatasetSettings model from JJF client"""
|
||||
# Test detector state model
|
||||
settings = {
|
||||
"beam_x_pxl": 0,
|
||||
"beam_y_pxl": 0,
|
||||
"detector_distance_mm": 100,
|
||||
"incident_energy_keV": 10.00,
|
||||
}
|
||||
# Try creating DatasetSettings object with minimal required settigns
|
||||
dataset_settings = DatasetSettings(**settings)
|
||||
# Test that image_time_ns and ntrigger are still available
|
||||
settings["image_time_us"] = 1000
|
||||
settings["ntrigger"] = 100
|
||||
dataset_settings = DatasetSettings(**settings)
|
||||
|
||||
|
||||
def test_jungfrau_joch_client_models_api_response():
|
||||
"""Test APIResponse model from JJF client.
|
||||
We can only check that all http status code responses are valid.
|
||||
"""
|
||||
# Check if all ResponseWaitDone http status codes are valid for the APIResponse model
|
||||
for state in ResponseWaitDone:
|
||||
response = ApiResponse(status_code=state.value, data="", headers=None, raw_data=b"")
|
||||
|
||||
|
||||
def test_jungfrau_joch_client_models_detector_settigns():
|
||||
"""Test DetectorSettings model from JJF client"""
|
||||
# Must be initialized with frame_time_us
|
||||
settings = {"frame_time_us": 450}
|
||||
DetectorSettings(**settings) # type:ignore
|
||||
Reference in New Issue
Block a user