feat: add JungFrauJoch client draf and test

This commit is contained in:
2024-12-05 09:27:20 +01:00
parent e19ce733a2
commit 8a7a005b56
4 changed files with 216 additions and 4 deletions

View File

@@ -1,7 +1,7 @@
from bec_lib import bec_logger
from ophyd import Component, DeviceStatus, Kind
from ophyd_devices.devices.delay_generator_645 import DelayGenerator, TriggerSource
from ophyd_devices.interfaces.base_classes.psi_device_base import CustomPrepare, PSIDeviceBase
from ophyd_devices.interfaces.base_classes.bec_device_base import BECDeviceBase, CustomPrepare
from ophyd_devices.sim.sim_signals import SetableSignal
from ophyd_devices.utils import bec_utils
@@ -12,7 +12,7 @@ class DelayGeneratorcSAXSError(Exception):
"""Exception raised for errors."""
class DDGSetup(CustomPrepare):
class DDGSetup(CustomPrepare["DelayGeneratorcSAXS"]):
"""
Custom Prepare class with hooks for beamline specific logic for the DG645 at CSAXS
"""
@@ -108,7 +108,7 @@ class DDGSetup(CustomPrepare):
self.parent.trigger_shot.put(1)
class DelayGeneratorcSAXS(PSIDeviceBase, DelayGenerator):
class DelayGeneratorcSAXS(BECDeviceBase, DelayGenerator):
"""
DG645 delay generator at cSAXS (multiple can be in use depending on the setup)

View File

@@ -0,0 +1,153 @@
import enum
import math
import jfjoch_client
from bec_lib.logger import bec_logger
logger = bec_logger.logger
class JungfrauJochClientError(Exception):
"""Base class for exceptions in this module."""
class DetectorState(enum.StrEnum):
"""Detector states for Jungfrau Joch detector
['Inactive', 'Idle', 'Busy', 'Measuring', 'Pedestal', 'Error']
"""
INACTIVE = "Inactive"
IDLE = "Idle"
BUSY = "Busy"
MEASURING = "Measuring"
PEDESTAL = "Pedestal"
ERROR = "Error"
class ResponseWaitDone(enum.IntEnum):
"""Response state for Jungfrau Joch detector wait till done"""
DETECTOR_IDLE = 200
TIMEOUT_PARAM_OUT_OF_RANGE = 400
JUNGFRAU_ERROR = 500
DETECTOR_INACTIVE = 502
TIMEOUT_REACHED = 504
class JungfrauJochClient:
"""Thin wrapper around the Jungfrau Joch API client"""
def __init__(self, host: str = "http://sls-jfjoch-001:8080") -> None:
self._initialised = False
configuration = jfjoch_client.Configuration(host=host)
api_client = jfjoch_client.ApiClient(configuration)
self.api = jfjoch_client.DefaultApi(api_client)
@property
def initialised(self) -> bool:
"""Check if jfj is connected and ready to receive commands"""
return self._initialised
@initialised.setter
def initialised(self, value: bool) -> None:
"""Set the connected status"""
self._initialised = value
def get_jungfrau_joch_status(self) -> DetectorState:
"""Get the status of JungfrauJoch"""
return self.api.status_get().state
def connect_and_initialise(self, timeout: int = 5) -> None:
"""Check if JungfrauJoch is connected and ready to receive commands"""
status = self.api.status_get().state
if status != DetectorState.IDLE:
self.api.initialize_post()
self.wait_till_done(timeout)
self.initialised = True
def set_detector_settings(self, settings: dict | jfjoch_client.DatasetSettings) -> None:
"""Set the detector settings. JungfrauJoch must be in IDLE, Error or Inactive state.
Note, the full settings have to be provided, otherwise the settings will be overwritten with default values.
Args:
settings (dict): dictionary of settings
"""
state = self.api.status_get().state
if state not in [DetectorState.IDLE, DetectorState.ERROR, DetectorState.INACTIVE]:
raise JungfrauJochClientError(
f"Detector must be in IDLE, ERROR or INACTIVE state to set settings. Current state: {state}"
)
if isinstance(settings, dict):
settings = jfjoch_client.DatasetSettings(**settings)
self.api.config_detector_put(settings)
def set_mesaurement_settings(self, settings: dict | jfjoch_client.DatasetSettings) -> None:
"""Set the measurement settings. JungfrauJoch must be in IDLE state.
The method call is blocking and JungfrauJoch will be ready to measure after the call resolves.
Please check the DataSettings class for the available settings.
The minimum required settings are:
beam_x_pxl: StrictFloat | StrictInt,
beam_y_pxl: StrictFloat | StrictInt,
detector_distance_mm: float | int,
incident_energy_keV: float | int,
Args:
settings (dict): dictionary of settings
"""
state = self.api.status_get().state
if state != DetectorState.IDLE:
raise JungfrauJochClientError(
f"Detector must be in IDLE state to set settings. Current state: {state}"
)
if isinstance(settings, dict):
settings = jfjoch_client.DatasetSettings(**settings)
try:
res = self.api.start_post_with_http_info(dataset_settings=settings)
if res.status_code != 200:
logger.error(
f"Error while setting measurement settings {settings}, response: {res}"
)
raise JungfrauJochClientError(
f"Error while setting measurement settings {settings}, response: {res}"
)
except Exception as e:
logger.error(
f"Error while setting measurement settings {settings}. Exception raised {e}"
)
raise JungfrauJochClientError(
f"Error while setting measurement settings {settings}. Exception raised {e}"
) from e
def wait_till_done(self, timeout: int = 5) -> None:
"""Wait until JungfrauJoch is done.
Args:
timeout (int): timeout in seconds
"""
success = False
try:
response = self.api.wait_till_done_post_with_http_info(math.ceil(timeout / 2))
if response.status_code != ResponseWaitDone.DETECTOR_IDLE:
logger.info(
f"Waitin for JungfrauJoch to be done, status: {ResponseWaitDone(response.status_code)}; response msg {response}"
)
response = self.api.wait_till_done_post_with_http_info(math.floor(timeout / 2))
if response.status_code == ResponseWaitDone.DETECTOR_IDLE:
success = True
return
except Exception as e:
logger.error(f"Error while waiting for JungfrauJoch to initialise: {e}")
raise JungfrauJochClientError(
f"Error while waiting for JungfrauJoch to initialise: {e}"
) from e
else:
if success is False:
logger.error(
f"Failed to initialise JungfrauJoch with status: {response.status_code}; response msg {response}"
)
raise JungfrauJochClientError(
f"Failed to initialise JungfrauJoch with status: {response.status_code}; response msg {response}"
)

View File

@@ -1,5 +1,6 @@
from .flomni_fermat_scan import FlomniFermatScan
from .omny_fermat_scan import OMNYFermatScan
from .jungfrau_joch_scan import JungfrauJochTestScan
from .LamNIFermatScan import LamNIFermatScan, LamNIMoveToScanCenter
from .omny_fermat_scan import OMNYFermatScan
from .owis_grid import OwisGrid
from .sgalil_grid import SgalilGrid

View File

@@ -0,0 +1,58 @@
""" Module with JungfrauJochTestScan class. """
from bec_lib import bec_logger
from bec_server.scan_server.scans import AsyncFlyScanBase, ScanAbortion
logger = bec_logger.logger
class JungfrauJochTestScan(AsyncFlyScanBase):
"""Owis-based grid scan."""
scan_name = "jjf_test"
# scan_report_hint = "device_progress"
required_kwargs = ["points", "exp_time", "readout_time"]
arg_input = {}
arg_bundle_size = {"bundle": len(arg_input), "min": None, "max": None}
gui_config = {
"Acquisition Parameters": ["num_points", "cycles"],
"Exposure Parameters": ["exp_time", "readout_time"],
}
def __init__(
self, num_points: int, exp_time: float, readout_time: float, cycles: int = 1, **kwargs
):
"""
JungfrauJoch Test scan.
Args:
device (DeviceBase) : The device to be triggered, currently only for delaygenerator csaxs
num_points (int) : Number of points per burst
exp_time (float) : exposure time.
readout_time (float): readout time of detector
cycles (int) : number of cycles, default is 1
Example:
scans.jjf_test(points = 100, exp_time= 1e-3, readout_time=1e-3, cycles = 2)
"""
if readout_time <= 0:
raise ScanAbortion(f"Readout time must be larger than 0, provided value {readout_time}")
super().__init__(exp_time=exp_time, readout_time=readout_time, **kwargs)
self.device = "ddg"
self.num_points = num_points
self.cycles = cycles
self.primary_readout_cycle = 0.2
def scan_core(self):
logger.info(f"Starting with Scan Core")
total_exposure = self.num_points * (self.exp_time + self.readout_time)
for i in range(self.cycles):
logger.info(f"Beginning cycle {i} of {self.cycles}")
status = yield from self.stubs.trigger(min_wait=total_exposure, wait=False)
yield from self.stubs.read(group="monitored", point_id=self.point_id, wait=True)
self.point_id += 1
status.wait()
logger.info(f"Finished cycle {i} of {self.cycles}")
logger.info(f"Finished scan")
self.num_pos = self.point_id