Compare commits

..

16 Commits

Author SHA1 Message Date
x01dc
725eed17ed finalized flomni temp and humidity display
All checks were successful
CI for csaxs_bec / test (pull_request) Successful in 1m16s
CI for csaxs_bec / test (push) Successful in 1m15s
2025-10-14 11:41:31 +02:00
x01dc
1d9fb39c0e initial version flomni temp humidity 2025-10-14 11:41:31 +02:00
x01dc
f1dd299fad gui tools running 2025-10-14 11:41:31 +02:00
1fcb213336 ci: pull from github instead of gitea
All checks were successful
CI for csaxs_bec / test (pull_request) Successful in 1m23s
CI for csaxs_bec / test (push) Successful in 1m22s
2025-10-08 16:33:34 +02:00
x01dc
b08c7bf44b removed debug info
Some checks failed
CI for csaxs_bec / test (push) Failing after 34s
CI for csaxs_bec / test (pull_request) Failing after 35s
2025-10-08 16:23:38 +02:00
x01dc
d16f6b703c tested gui tools 2025-10-08 16:23:38 +02:00
x01dc
f526d5cc05 gui tools running 2025-10-08 16:23:38 +02:00
gac-x01dc
8f7914b978 - added camera viewer device
- fixed some issues in flomni sample
storage device
2025-10-08 16:22:13 +02:00
gac-x01dc
6c65d5546c fixed issue with flomni storage not updating and start using OMNYTools for yesno 2025-10-08 16:22:13 +02:00
2bb6667f30 test(eiger): cleanup and add tests
Some checks failed
CI for csaxs_bec / test (push) Failing after 34s
2025-09-17 17:40:43 +02:00
e3f337e7c3 fix: deprecate old eiger9m_csaxs integration 2025-09-17 17:40:43 +02:00
419d15dcdb fix(mcs-card): fix mcs card test for on_connected 2025-09-17 17:40:43 +02:00
84537eafde refactor(eiger): cleanup, remove auto initialization 2025-09-17 17:40:43 +02:00
6a864c9bc6 refactor(jungfrau-joch-client): Improve wait_till_idle method 2025-09-17 17:40:43 +02:00
afbf7adff5 refactor(eiger): add file event signal 2025-09-17 17:40:43 +02:00
dd304f2f3b feat(eiger): add jfj integration 2025-09-17 17:40:43 +02:00
25 changed files with 1627 additions and 1028 deletions

View File

@@ -47,25 +47,13 @@ jobs:
python-version: "${{ inputs.PYTHON_VERSION || '3.11' }}"
- name: Checkout BEC Core
uses: actions/checkout@v4
with:
repository: bec/bec
ref: "${{ inputs.BEC_CORE_BRANCH || 'main' }}"
path: ./bec
run: git clone --depth 1 --branch "${{ inputs.BEC_CORE_BRANCH || 'main' }}" https://github.com/bec-project/bec.git ./bec
- name: Checkout Ophyd Devices
uses: actions/checkout@v4
with:
repository: bec/ophyd_devices
ref: "${{ inputs.OPHYD_DEVICES_BRANCH || 'main' }}"
path: ./ophyd_devices
run: git clone --depth 1 --branch "${{ inputs.OPHYD_DEVICES_BRANCH || 'main' }}" https://github.com/bec-project/ophyd_devices.git ./ophyd_devices
- name: Checkout BEC Widgets
uses: actions/checkout@v4
with:
repository: bec/bec_widgets
ref: "${{ inputs.BEC_WIDGETS_BRANCH || 'main' }}"
path: ./bec_widgets
run: git clone --depth 1 --branch "${{ inputs.BEC_WIDGETS_BRANCH || 'main' }}" https://github.com/bec-project/bec_widgets.git ./bec_widgets
- name: Checkout BEC Plugin Repository
uses: actions/checkout@v4

View File

@@ -14,6 +14,8 @@ from typeguard import typechecked
from csaxs_bec.bec_ipython_client.plugins.cSAXS import cSAXSBeamlineChecks
from csaxs_bec.bec_ipython_client.plugins.flomni.flomni_optics_mixin import FlomniOpticsMixin
from csaxs_bec.bec_ipython_client.plugins.flomni.x_ray_eye_align import XrayEyeAlign
from csaxs_bec.bec_ipython_client.plugins.flomni.gui_tools import flomniGuiTools
from csaxs_bec.bec_ipython_client.plugins.omny.omny_general_tools import OMNYTools
logger = bec_logger.logger
@@ -24,27 +26,57 @@ if builtins.__dict__.get("bec") is not None:
umvr = builtins.__dict__.get("umvr")
class FlomniToolsError(Exception):
pass
class FlomniInitError(Exception):
pass
class FlomniError(Exception):
pass
class FlomniTools:
def yesno(self, message: str, default="none", autoconfirm=0) -> bool:
if autoconfirm and default == "y":
self.printgreen(message + " Automatically confirming default: yes")
return True
elif autoconfirm and default == "n":
self.printgreen(message + " Automatically confirming default: no")
return False
if default == "y":
message_ending = " [Y]/n? "
elif default == "n":
message_ending = " y/[N]? "
else:
message_ending = " y/n? "
while True:
user_input = input(self.OKBLUE + message + message_ending + self.ENDC)
if (
user_input == "Y" or user_input == "y" or user_input == "yes" or user_input == "Yes"
) or (default == "y" and user_input == ""):
return True
if (
user_input == "N" or user_input == "n" or user_input == "no" or user_input == "No"
) or (default == "n" and user_input == ""):
return False
else:
print("Please expicitely confirm y or n.")
class FlomniInitStagesMixin:
def flomni_init_stages(self):
user_input = input("Starting initialization of flOMNI stages. OK? [y/n]")
if user_input == "y":
if self.OMNYTools.yesno("Starting initialization of flOMNI stages. OK?"):
print("staring...")
else:
return
if self.check_all_axes_of_fomni_referenced():
user_input = input("Continue anyways? [y/n]")
if user_input == "y":
if self.OMNYTools.yesno("All axes are referenced. Continue anyways?"):
print("ok then...")
else:
return
@@ -74,10 +106,8 @@ class FlomniInitStagesMixin:
dev.feyex.limits = [-30, -1]
print("done")
user_input = input(
"Init of foptz. Can the stage move to the upstream limit without collision? [y/n]"
)
if user_input == "y":
if self.OMNYTools.yesno("Init of foptz. Can the stage move to the upstream limit without collision?"):
print("good then")
else:
return
@@ -131,10 +161,7 @@ class FlomniInitStagesMixin:
dev.fsamy.limits = [2, 3.1]
print("done")
user_input = input(
"Init of tracking stages. Did you remove the outer laser flight tubes? [y/n]"
)
if user_input == "y":
if self.OMNYTools.yesno("Init of tracking stages. Did you remove the outer laser flight tubes?"):
print("good then")
else:
print("Stopping.")
@@ -150,8 +177,7 @@ class FlomniInitStagesMixin:
dev.ftrackz.limits = [4.5, 5.5]
print("done")
user_input = input("Init of sample stage. Is the piezo at about 0 deg? [y/n]")
if user_input == "y":
if self.OMNYTools.yesno("Init of sample stage. Is the piezo at about 0 deg?"):
print("good then")
else:
print("Stopping.")
@@ -168,11 +194,7 @@ class FlomniInitStagesMixin:
print("done")
print("Initializing UPR stage.")
user_input = input(
"To ensure that the end switches work, please check that they are currently not pushed."
" Is everything okay? [y/n]"
)
if user_input == "y":
if self.OMNYTools.yesno("To ensure that the end switches work, please check that they are currently not pushed. Is everything okay?"):
print("good then")
else:
print("Stopping.")
@@ -193,8 +215,7 @@ class FlomniInitStagesMixin:
time.sleep(1)
continue
break
user_input = input("Shall I start the index search? [y/n]")
if user_input == "y":
if self.OMNYTools.yesno("Shall I start the index search?"):
print("good then. Starting index search.")
else:
print("Stopping.")
@@ -213,11 +234,7 @@ class FlomniInitStagesMixin:
dev.fsamroy.limits = [-5, 365]
print("done")
user_input = input(
"Init of foptx. Can the stage move to the positive limit without collision? Attention:"
" tracker flight tube! [y/n]"
)
if user_input == "y":
if self.OMNYTools.yesno("Init of foptx. Can the stage move to the positive limit without collision? Attention: tracker flight tube!"):
print("good then")
else:
print("Stopping.")
@@ -241,8 +258,7 @@ class FlomniInitStagesMixin:
continue
break
user_input = input("Start limit switch search of fopty? [y/n]")
if user_input == "y":
if self.OMNYTools.yesno("Start limit switch search of fopty?"):
print("good then")
else:
print("Stopping.")
@@ -275,8 +291,7 @@ class FlomniInitStagesMixin:
return False
def set_limits(self):
user_input = input("Set default limits for flOMNI? [y/n]")
if user_input == "y":
if self.OMNYTools.yesno("Set default limits for flOMNI?"):
print("setting limits...")
else:
print("Stopping.")
@@ -303,8 +318,7 @@ class FlomniInitStagesMixin:
dev.ftrackz.limits = [4.5, 5.5]
def _align_setup(self):
user_input = input("Start moving stages to default initial positions? [y/n]")
if user_input == "y":
if self.OMNYTools.yesno("Start moving stages to default initial positions?", "y"):
print("Start moving stages...")
else:
print("Stopping.")
@@ -397,7 +411,8 @@ class FlomniSampleTransferMixin:
raise FlomniError("Ftray is not at the 'IN' position. Aborting.")
def ftransfer_flomni_stage_in(self):
sample_in_position = bool(float(dev.flomni_samples.sample_placed.sample0.get()))
sample_in_position = dev.flomni_samples.is_sample_slot_used(0)
#bool(float(dev.flomni_samples.sample_placed.sample0.get()))
if not sample_in_position:
raise FlomniError("There is no sample in the sample stage. Aborting.")
self.reset_correction()
@@ -410,6 +425,8 @@ class FlomniSampleTransferMixin:
umv(dev.fsamx, fsamx_in)
dev.fsamx.limits = [fsamx_in - 0.4, fsamx_in + 0.4]
self.flomnigui_idle()
def laser_tracker_show_all(self):
dev.rtx.controller.laser_tracker_show_all()
@@ -449,6 +466,10 @@ class FlomniSampleTransferMixin:
self.device_manager.devices.fsamx.controller.lights_on()
def ftransfer_flomni_stage_out(self):
self.flomnigui_show_cameras()
target_pos = -162
if np.isclose(dev.fsamx.readback.get(), target_pos, 0.01):
return
@@ -496,22 +517,20 @@ class FlomniSampleTransferMixin:
self.check_tray_in()
self.check_sensor_connected()
sample_in_gripper = bool(float(dev.flomni_samples.sample_in_gripper.get()))
sample_in_gripper = dev.flomni_samples.is_sample_in_gripper()
if sample_in_gripper:
raise FlomniError(
"The gripper does carry a sample. Cannot proceed getting another sample."
)
sample_signal = getattr(dev.flomni_samples.sample_placed, f"sample{position}")
sample_in_position = bool(float(sample_signal.get()))
sample_in_position = dev.flomni_samples.is_sample_slot_used(position)
if not sample_in_position:
raise FlomniError(f"The planned pick position [{position}] does not have a sample.")
user_input = input(
"Please confirm that there is currently no sample in the gripper. It would be dropped!"
" [y/n]"
)
if user_input == "y":
self.flomnigui_show_cameras()
if self.OMNYTools.yesno("Please confirm that there is currently no sample in the gripper. It would be dropped!", "y"):
print("good then")
else:
print("Stopping.")
@@ -555,12 +574,12 @@ class FlomniSampleTransferMixin:
self.check_tray_in()
self.check_sensor_connected()
sample_in_gripper = bool(float(dev.flomni_samples.sample_in_gripper.get()))
sample_in_gripper = dev.flomni_samples.is_sample_in_gripper()
#bool(float(dev.flomni_samples.sample_in_gripper.get()))
if not sample_in_gripper:
raise FlomniError("The gripper does not carry a sample.")
sample_signal = getattr(dev.flomni_samples.sample_placed, f"sample{position}")
sample_in_position = bool(float(sample_signal.get()))
sample_in_position = dev.flomni_samples.is_sample_slot_used(position)
if sample_in_position:
raise FlomniError(f"The planned put position [{position}] already has a sample.")
@@ -593,8 +612,9 @@ class FlomniSampleTransferMixin:
self.flomni_modify_storage_non_interactive(100, 0, "-")
self.flomni_modify_storage_non_interactive(position, 1, sample_name)
# TODO: flomni_stage_in if position == 0
# bec.queue.next_dataset_number += 1
if position == 0:
self.ftransfer_flomni_stage_in()
bec.queue.next_dataset_number += 1
def sample_get_name(self, position: int = 0) -> str:
"""
@@ -605,36 +625,51 @@ class FlomniSampleTransferMixin:
def ftransfer_sample_change(self, new_sample_position: int):
self.check_tray_in()
sample_in_gripper = dev.flomni_samples.sample_in_gripper.get()
# sample_in_gripper = dev.flomni_samples.sample_in_gripper.get()
sample_in_gripper = dev.flomni_samples.is_sample_in_gripper()
if sample_in_gripper:
raise FlomniError("There is already a sample in the gripper. Aborting.")
self.check_position_is_valid(new_sample_position)
sample_placed = getattr(
dev.flomni_samples.sample_placed, f"sample{new_sample_position}"
).get()
if new_sample_position == 0:
raise FlomniError("The new sample to place cannot be the sample in the sample stage. Aborting.")
# sample_placed = getattr(
# dev.flomni_samples.sample_placed, f"sample{new_sample_position}"
# ).get()
sample_placed = dev.flomni_samples.is_sample_slot_used(new_sample_position)
if not sample_placed:
raise FlomniError(
f"There is currently no sample in position [{new_sample_position}]. Aborting."
)
sample_in_sample_stage = dev.flomni_samples.sample_placed.sample0.get()
# sample_in_sample_stage = dev.flomni_samples.sample_placed.sample0.get()
sample_in_sample_stage = dev.flomni_samples.is_sample_slot_used(0)
if sample_in_sample_stage:
# find a new home for the sample...
empty_slots = []
for name, val in dev.flomni_samples.read().items():
if "flomni_samples_sample_placed_sample" not in name:
continue
if val.get("value") == 0:
empty_slots.append(int(name.split("flomni_samples_sample_placed_sample")[1]))
# for name, val in dev.flomni_samples.read().items():
# if "flomni_samples_sample_placed_sample" not in name:
# continue
# if val.get("value") == 0:
# empty_slots.append(int(name.split("flomni_samples_sample_placed_sample")[1]))
for j in range(1,20):
if not dev.flomni_samples.is_sample_slot_used(j):
empty_slots.append(j)
if not empty_slots:
raise FlomniError("There are no empty slots available. Aborting.")
print(f"The following slots are empty: {empty_slots}.")
while True:
user_input = input(f"Where shall I put the sample? Default: [{empty_slots[0]}]")
user_input = input(f"Where shall I put the sample? Default: [{empty_slots[0]}] ")
if user_input.strip() == "":
# No entry: use default
user_input = empty_slots[0]
break
try:
user_input = int(user_input)
if user_input not in empty_slots:
@@ -700,20 +735,20 @@ class FlomniSampleTransferMixin:
if confirm != -1:
return
user_input = input("All OK? Continue? [y/n]")
if user_input == "y":
if self.OMNYTools.yesno("All OK? Continue?", "y"):
print("good then")
dev.ftransy.controller.socket_put_confirmed("confirm=1")
else:
print("Stopping.")
return
exit
def ftransfer_gripper_is_open(self) -> bool:
status = bool(float(dev.ftransy.controller.socket_put_and_receive("MG @OUT[9]").strip()))
return status
def ftransfer_gripper_open(self):
sample_in_gripper = dev.flomni_samples.sample_in_gripper.get()
sample_in_gripper = dev.flomni_samples.is_sample_in_gripper()
#dev.flomni_samples.sample_in_gripper.get()
if sample_in_gripper:
raise FlomniError(
"Cannot open gripper. There is still a sample in the gripper! Aborting."
@@ -733,11 +768,8 @@ class FlomniSampleTransferMixin:
fsamx_pos = dev.fsamx.readback.get()
if position == 0 and fsamx_pos > -160:
user_input = input(
"May the flomni stage be moved out for the sample change? Feedback will be disabled"
" and alignment will be lost! [y/n]"
)
if user_input == "y":
if self.OMNYTools.yesno("May the flomni stage be moved out for the sample change? Feedback will be disabled and alignment will be lost!", "y"):
print("good then")
self.ftransfer_flomni_stage_out()
else:
@@ -1106,6 +1138,7 @@ class Flomni(
FlomniAlignmentMixin,
FlomniOpticsMixin,
cSAXSBeamlineChecks,
flomniGuiTools
):
def __init__(self, client):
super().__init__()
@@ -1128,13 +1161,20 @@ class Flomni(
self.corr_pos_y_2 = []
self.corr_angle_y_2 = []
self.progress = {}
self.progress["subtomo"] = 0
self.progress["subtomo_projection"] = 0
self.progress["subtomo_total_projections"] = 1
self.progress["projection"] = 0
self.progress["total_projections"] = 1
self.progress["angle"] = 0
self.progress["tomo_type"] = 0
self.OMNYTools = OMNYTools(self.client)
self.align = XrayEyeAlign(self.client, self)
self.set_client(client)
def start_x_ray_eye_alignment(self):
user_input = input(
"Starting Xrayeye alignment. Deleting any potential existing alignment for this sample. [Y/n]"
)
if user_input == "y" or user_input == "":
if self.OMNYTools.yesno("Starting Xrayeye alignment. Deleting any potential existing alignment for this sample.", "y"):
self.align = XrayEyeAlign(self.client, self)
try:
self.align.align()
@@ -1570,6 +1610,9 @@ class Flomni(
def tomo_scan(self, subtomo_start=1, start_angle=None, projection_number=None):
"""start a tomo scan"""
self.flomnigui_show_progress()
bec = builtins.__dict__.get("bec")
scans = builtins.__dict__.get("scans")
self._current_special_angles = self.special_angles.copy()
@@ -1706,6 +1749,7 @@ class Flomni(
print(f"Angle: ........................... {self.progress['angle']}")
print(f"Current subtomo: ................. {self.progress['subtomo']}")
print(f"Current projection within subtomo: {self.progress['subtomo_projection']}\x1b[0m")
self._flomnigui_update_progress()
def add_sample_database(
self, samplename, date, eaccount, scan_number, setup, sample_additional_info, user
@@ -1795,7 +1839,7 @@ class Flomni(
def _write_tomo_scan_number(self, scan_number: int, angle: float, subtomo_number: int) -> None:
tomo_scan_numbers_file = os.path.expanduser(
"~/Data10/specES1/dat-files/tomography_scannumbers.txt"
"~/tomography_scannumbers.txt"
)
with open(tomo_scan_numbers_file, "a+") as out_file:
# pylint: disable=undefined-variable
@@ -1894,8 +1938,8 @@ class Flomni(
)
print(f"\nSample name: {self.sample_name}\n")
user_input = input("Are these parameters correctly set for your scan? [Y/n]")
if user_input == "y" or user_input == "":
if self.OMNYTools.yesno("Are these parameters correctly set for your scan?", "y"):
print("... excellent!")
else:
self.tomo_countingtime = self._get_val("<ctime> s", self.tomo_countingtime, float)

View File

@@ -0,0 +1,150 @@
import builtins
from bec_widgets.cli.client import BECDockArea
# from csaxs_bec.bec_ipython_client.plugins.cSAXS import epics_get, epics_put, fshopen, fshclose
if builtins.__dict__.get("bec") is not None:
bec = builtins.__dict__.get("bec")
dev = builtins.__dict__.get("dev")
umv = builtins.__dict__.get("umv")
umvr = builtins.__dict__.get("umvr")
class flomniGuiToolsError(Exception):
pass
class flomniGuiTools:
def __init__(self):
self.gui_window = None
self.camera_gripper_image = None
self.camera_overview_image = None
self.progressbar = None
self.text_box = None
self.idle_text_box = None
def set_client(self, client):
self.client = client
self.gui = self.client.gui
def flomnigui_show_gui(self):
if "flomni" in self.gui.windows:
self.gui.flomni.show()
else:
self.gui.new("flomni")
def flomnigui_stop_gui(self):
self.gui.flomni.hide()
def flomnigui_show_cameras(self):
self.flomnigui_show_gui()
if self.camera_gripper_image is None or self.camera_overview_image is None:
self.flomnigui_remove_all_docks()
self.camera_gripper_image = self.gui.flomni.new("camera_gripper").new("Image")
if self._flomnicam_check_device_exists(dev.cam_flomni_gripper):
self.camera_gripper_image.image(("cam_flomni_gripper", "preview"))
self.camera_gripper_image.lock_aspect_ratio = True
self.camera_gripper_image.enable_fps_monitor = True
self.camera_gripper_image.enable_toolbar = False
self.camera_gripper_image.outer_axes = False
self.camera_gripper_image.inner_axes = False
dev.cam_flomni_gripper.start_live_mode()
else:
print("Cannot open camera_gripper. Device does not exist.")
self.camera_overview_image = self.gui.flomni.new("camera_overview").new("Image")
if self._flomnicam_check_device_exists(dev.cam_flomni_overview):
self.camera_overview_image.image(("cam_flomni_overview", "preview"))
self.camera_overview_image.lock_aspect_ratio = True
self.camera_overview_image.enable_fps_monitor = True
self.camera_overview_image.enable_toolbar = False
self.camera_overview_image.outer_axes = False
self.camera_overview_image.inner_axes = False
dev.cam_flomni_overview.start_live_mode()
else:
print("Cannot open camera_overview. Device does not exist.")
def flomnigui_remove_all_docks(self):
dev.cam_flomni_overview.stop_live_mode()
dev.cam_flomni_gripper.stop_live_mode()
self.gui.flomni.delete_all()
self.camera_gripper_image = None
self.camera_overview_image = None
self.progressbar = None
self.text_box = None
self.idle_text_box = None
def flomnigui_idle(self):
self.flomnigui_show_gui()
if self.idle_text_box is None:
self.flomnigui_remove_all_docks()
self.idle_text_box = self.gui.flomni.new("idle_textbox").new("TextBox")
text = (
"<pre>"
+ " ,---.,--. ,-----. ,--. ,--.,--. ,--.,--. \n"
+ "/ .-'| |' .-. '| `.' || ,'.| || | \n"
+ "| `-,| || | | || |'.'| || |' ' || | \n"
+ "| .-'| |' '-' '| | | || | ` || | \n"
+ "`--' `--' `-----' `--' `--'`--' `--'`--' \n"
+ "</pre>"
)
self.idle_text_box.set_html_text(text)
def _flomnicam_check_device_exists(self, device):
try:
device
except:
return False
else:
return True
def flomnigui_show_progress(self):
self.flomnigui_show_gui()
if self.progressbar is None:
self.flomnigui_remove_all_docks()
# Add a new dock with a RingProgressBar widget
self.progressbar = self.gui.flomni.new("progressbar").new("RingProgressBar")
# Customize the size of the progress ring
self.progressbar.set_line_widths(20)
# Disable automatic updates and manually set the self.progressbar value
self.progressbar.enable_auto_updates(False)
# Set precision for the self.progressbar display
self.progressbar.set_precision(1) # Display self.progressbar with one decimal places
# Setting multiple rigns with different values
self.progressbar.set_number_of_bars(3)
self.progressbar.rings[0].set_update("manual")
self.progressbar.rings[1].set_update("manual")
self.progressbar.rings[2].set_update("scan")
# Set the values of the rings to 50, 75, and 25 from outer to inner ring
# self.progressbar.set_value([50, 75])
# Add a new dock with a TextBox widget
self.text_box = self.gui.flomni.new(name="progress_text").new("TextBox")
self._flomnigui_update_progress()
def _flomnigui_update_progress(self):
if self.progressbar is not None:
progress = self.progress["projection"] / self.progress["total_projections"] * 100
subtomo_progress = (
self.progress["subtomo_projection"]
/ self.progress["subtomo_total_projections"]
* 100
)
self.progressbar.set_value([progress, subtomo_progress, 0])
text = f"Progress report:\n Tomo type: ....................... {self.progress['tomo_type']}\n Projection: ...................... {self.progress['projection']:.0f}\n Total projections expected ....... {self.progress['total_projections']}\n Angle: ........................... {self.progress['angle']}\n Current subtomo: ................. {self.progress['subtomo']}\n Current projection within subtomo: {self.progress['subtomo_projection']}\n Total projections per subtomo: ... {self.progress['subtomo_total_projections']}"
self.text_box.set_plain_text(text)
if __name__ == "__main__":
from bec_lib.client import BECClient
from bec_widgets.cli.client_utils import BECGuiClient
client = BECClient()
client.start()
client.gui = BECGuiClient()
flomni_gui = flomniGuiTools(client)
flomni_gui.flomnigui_show_gui()
flomni_gui.flomnigui_show_progress()

View File

@@ -42,3 +42,14 @@ ids_cam:
enabled: true
readoutPriority: async
softwareTrigger: True
eiger_1_5:
description: Eiger 1.5M in-vacuum detector
deviceClass: csaxs_bec.devices.jungfraujoch.eiger_1_5m.Eiger1_5M
deviceConfig:
detector_distance: 100
beam_center: [0, 0]
onFailure: raise
enabled: true
readoutPriority: async
softwareTrigger: False

View File

@@ -213,6 +213,8 @@ ftransy:
onFailure: buffer
readOnly: false
readoutPriority: baseline
userParameter:
sensor_voltage: -2.4
ftransz:
description: Sample transer Z
deviceClass: csaxs_bec.devices.omny.galil.fgalil_ophyd.FlomniGalilMotor
@@ -333,8 +335,8 @@ rtx:
readOnly: false
readoutPriority: on_request
userParameter:
low_signal: 11000
min_signal: 10000
low_signal: 10000
min_signal: 9000
rt_pid_voltage: -0.06219
rty:
description: flomni rt
@@ -362,3 +364,57 @@ rtz:
onFailure: buffer
readOnly: false
readoutPriority: on_request
############################################################
####################### Cameras ############################
############################################################
cam_flomni_gripper:
description: Camera sample changer
deviceClass: csaxs_bec.devices.omny.webcam_viewer.WebcamViewer
deviceConfig:
url: http://flomnicamserver:5000/video_high
num_rotation_90: 3
transpose: false
enabled: true
onFailure: buffer
readOnly: false
readoutPriority: on_request
cam_flomni_overview:
description: Camera flomni overview
deviceClass: csaxs_bec.devices.omny.webcam_viewer.WebcamViewer
deviceConfig:
url: http://flomnicamserver:5001/video_high
num_rotation_90: 3
transpose: false
enabled: true
onFailure: buffer
readOnly: false
readoutPriority: on_request
# cam_flomni_xeye:
# description: Camera flOMNI Xray eye ID101
# deviceClass: csaxs_bec.devices.ids_cameras.ids_camera.IDSCamera
# deviceConfig:
# camera_ID: 101
# bits_per_pixel: 24
# channels: 3
# m_n_colormode: 1
# enabled: true
# onFailure: buffer
# readOnly: false
# readoutPriority: async
# ############################################################
# ################### flOMNI temperatures ####################
# ############################################################
flomni_temphum:
description: flOMNI Temperatures and humidity
deviceClass: csaxs_bec.devices.omny.flomni_temp_and_humidity.FlomniTempHum
deviceConfig: {}
enabled: true
onFailure: buffer
readOnly: false
readoutPriority: baseline

View File

@@ -33,6 +33,7 @@ from __future__ import annotations
import threading
import time
import traceback
from typing import TYPE_CHECKING
from bec_lib.logger import bec_logger
@@ -160,6 +161,9 @@ class DDG1(PSIDeviceBase, DelayGeneratorCSAXS):
# f = e + 1us
# e has refernce to d, f has reference to e
self.set_delay_pairs(channel="ef", delay=0, width=1e-6)
time.sleep(
0.2
) # After staging, make sure that the DDG HW has some time to process changes properly.
def _prepare_mcs_on_trigger(self, mcs: MCSCardCSAXS) -> None:
"""Prepare the MCS card for the next trigger.
@@ -188,7 +192,13 @@ class DDG1(PSIDeviceBase, DelayGeneratorCSAXS):
while (
self._poll_thread_run_event.is_set() and not self._poll_thread_kill_event.is_set()
):
self._poll_loop()
try:
self._poll_loop()
except Exception: # pylint: disable=broad-except
content = traceback.format_exc()
logger.error(
f"Exception in polling loop thread, polling continues...\n Error content:\n{content}"
)
self._poll_thread_poll_loop_done.set()
@@ -199,13 +209,17 @@ class DDG1(PSIDeviceBase, DelayGeneratorCSAXS):
The 20ms sleep was added to ensure that the event status is not polled too frequently,
and to give the device time to process the previous command. This was found empirically
to be necessary to avoid missing events.
IMPORTANT: Do not remove sleeps or try to optimize this logic. This seems to be a
fragile balance between polling frequency and device processing time. Also in between
start/stop of polling. Please also consider that there is a sleep in on_trigger and
that this might also be necessary to avoid that HW becomes unavailable/unstable.
"""
self.state.proc_status.put(1, use_complete=True)
time.sleep(0.02) # 20ms delay for processing, important for not missing events
if self._poll_thread_run_event.is_set() and not self._poll_thread_kill_event.is_set():
if self._poll_thread_kill_event.is_set() or not self._poll_thread_run_event.is_set():
return
self.state.event_status.get(use_monitor=False)
if self._poll_thread_run_event.is_set() and not self._poll_thread_kill_event.is_set():
if self._poll_thread_kill_event.is_set() or not self._poll_thread_run_event.is_set():
return
time.sleep(0.02) # 20ms delay for processing, important for not missing events
@@ -256,6 +270,9 @@ class DDG1(PSIDeviceBase, DelayGeneratorCSAXS):
# Stop polling, poll once manually to ensure that the register is clean
self._stop_polling()
self._poll_thread_poll_loop_done.wait(timeout=1)
# IMPORTANT: Keep this sleep setting, as it is necessary to avoid that the HW
# becomes unresponsive. This was found empirically and seems to be necessary
time.sleep(0.02)
# Prepare the MCS card for the next software trigger
mcs = self.device_manager.devices.get("mcs", None)

View File

@@ -1,381 +0,0 @@
import enum
import os
import threading
import time
from typing import Any
import numpy as np
from bec_lib.logger import bec_logger
from ophyd import ADComponent as ADCpt
from ophyd import Device, EpicsSignal, EpicsSignalRO, EpicsSignalWithRBV
from ophyd_devices.interfaces.base_classes.psi_detector_base import (
CustomDetectorMixin,
PSIDetectorBase,
)
from std_daq_client import StdDaqClient
logger = bec_logger.logger
class EigerError(Exception):
"""Base class for exceptions in this module."""
class EigerTimeoutError(EigerError):
"""Raised when the Eiger does not respond in time."""
class Eiger9MSetup(CustomDetectorMixin):
"""Eiger setup class
Parent class: CustomDetectorMixin
"""
def __init__(self, *args, parent: Device = None, **kwargs) -> None:
super().__init__(*args, parent=parent, **kwargs)
self.std_rest_server_url = (
kwargs["file_writer_url"] if "file_writer_url" in kwargs else "http://xbl-daq-29:5000"
)
self.std_client = None
self._lock = threading.RLock()
def on_init(self) -> None:
"""Initialize the detector"""
self.initialize_default_parameter()
self.initialize_detector()
self.initialize_detector_backend()
def initialize_detector(self) -> None:
"""Initialize detector"""
self.stop_detector()
self.parent.cam.trigger_mode.put(TriggerSource.GATING)
def initialize_default_parameter(self) -> None:
"""Set default parameters for Eiger9M detector"""
self.update_readout_time()
def update_readout_time(self) -> None:
"""Set readout time for Eiger9M detector"""
readout_time = (
self.parent.scaninfo.readout_time
if hasattr(self.parent.scaninfo, "readout_time")
else self.parent.MIN_READOUT
)
self.parent.readout_time = max(readout_time, self.parent.MIN_READOUT)
def initialize_detector_backend(self) -> None:
"""Initialize detector backend"""
self.std_client = StdDaqClient(url_base=self.std_rest_server_url)
self.std_client.stop_writer()
eacc = self.parent.scaninfo.username
self.update_std_cfg("writer_user_id", int(eacc.strip(" e")))
signal_conditions = [(lambda: self.std_client.get_status()["state"], "READY")]
if not self.wait_for_signals(
signal_conditions=signal_conditions,
timeout=self.parent.TIMEOUT_FOR_SIGNALS,
all_signals=True,
):
raise EigerTimeoutError(
f"Std client not in READY state, returns: {self.std_client.get_status()}"
)
def update_std_cfg(self, cfg_key: str, value: Any) -> None:
"""
Update std_daq config
Checks that the new value matches the type of the former entry.
Args:
cfg_key (str) : config key of value to be updated
value (Any) : value to be updated for the specified key
Raises:
Raises EigerError if the key was not in the config before and if the new value does not match the type of the old value
"""
cfg = self.std_client.get_config()
old_value = cfg.get(cfg_key)
if old_value is None:
raise EigerError(
f"Tried to change entry for key {cfg_key} in std_config that does not exist"
)
if not isinstance(value, type(old_value)):
raise EigerError(
f"Type of new value {type(value)}:{value} does not match old value"
f" {type(old_value)}:{old_value}"
)
cfg.update({cfg_key: value})
logger.debug(cfg)
self.std_client.set_config(cfg)
logger.debug(f"Updated std_daq config for key {cfg_key} from {old_value} to {value}")
def on_stage(self) -> None:
"""Prepare the detector for scan"""
self.prepare_detector()
self.prepare_data_backend()
self.publish_file_location(done=False, successful=False)
self.arm_acquisition()
def prepare_detector(self) -> None:
"""Prepare detector for scan"""
self.set_detector_threshold()
self.set_acquisition_params()
self.parent.cam.trigger_mode.put(TriggerSource.GATING)
def set_detector_threshold(self) -> None:
"""
Set the detector threshold
The function sets the detector threshold automatically to 1/2 of the beam energy.
"""
mokev = self.parent.device_manager.devices.mokev.obj.read()[
self.parent.device_manager.devices.mokev.name
]["value"]
factor = 1
unit = getattr(self.parent.cam.threshold_energy, "units", None)
if unit is not None and unit == "eV":
factor = 1000
setpoint = int(mokev * factor)
energy = self.parent.cam.beam_energy.read()[self.parent.cam.beam_energy.name]["value"]
if setpoint != energy:
self.parent.cam.beam_energy.set(setpoint)
threshold = self.parent.cam.threshold_energy.read()[self.parent.cam.threshold_energy.name][
"value"
]
if not np.isclose(setpoint / 2, threshold, rtol=0.05):
self.parent.cam.threshold_energy.set(setpoint / 2)
def set_acquisition_params(self) -> None:
"""Set acquisition parameters for the detector"""
self.parent.cam.num_images.put(
int(self.parent.scaninfo.num_points * self.parent.scaninfo.frames_per_trigger)
)
self.parent.cam.num_frames.put(1)
self.update_readout_time()
def prepare_data_backend(self) -> None:
"""Prepare the data backend for the scan"""
self.parent.filepath.set(
self.parent.filewriter.compile_full_filename(f"{self.parent.name}.h5")
).wait()
self.filepath_exists(self.parent.filepath.get())
self.stop_detector_backend()
try:
self.std_client.start_writer_async(
{
"output_file": self.parent.filepath.get(),
"n_images": int(
self.parent.scaninfo.num_points * self.parent.scaninfo.frames_per_trigger
),
}
)
except Exception as exc:
time.sleep(5)
if self.std_client.get_status()["state"] == "READY":
raise EigerTimeoutError(f"Timeout of start_writer_async with {exc}") from exc
signal_conditions = [
(lambda: self.std_client.get_status()["acquisition"]["state"], "WAITING_IMAGES")
]
if not self.wait_for_signals(
signal_conditions=signal_conditions,
timeout=self.parent.TIMEOUT_FOR_SIGNALS,
check_stopped=False,
all_signals=True,
):
raise EigerTimeoutError(
"Timeout of 5s reached for std_daq start_writer_async with std_daq client status"
f" {self.std_client.get_status()}"
)
def on_unstage(self) -> None:
"""Unstage the detector"""
pass
def on_complete(self) -> None:
"""Complete the detector"""
self.finished(timeout=self.parent.TIMEOUT_FOR_SIGNALS)
self.publish_file_location(done=True, successful=True)
def on_stop(self) -> None:
"""Stop the detector"""
self.stop_detector()
self.stop_detector_backend()
def stop_detector(self) -> None:
"""Stop the detector"""
# Stop detector
self.parent.cam.acquire.put(0)
signal_conditions = [
(
lambda: self.parent.cam.detector_state.read()[self.parent.cam.detector_state.name][
"value"
],
DetectorState.IDLE,
)
]
if not self.wait_for_signals(
signal_conditions=signal_conditions,
timeout=self.parent.TIMEOUT_FOR_SIGNALS - self.parent.TIMEOUT_FOR_SIGNALS // 2,
check_stopped=True,
all_signals=False,
):
# Retry stop detector and wait for remaining time
self.parent.cam.acquire.put(0)
if not self.wait_for_signals(
signal_conditions=signal_conditions,
timeout=self.parent.TIMEOUT_FOR_SIGNALS - self.parent.TIMEOUT_FOR_SIGNALS // 2,
check_stopped=True,
all_signals=False,
):
raise EigerTimeoutError(
f"Failed to stop detector, detector state {signal_conditions[0][0]}"
)
def stop_detector_backend(self) -> None:
"""Close file writer"""
self.std_client.stop_writer()
def filepath_exists(self, filepath: str) -> None:
"""Check if filepath exists"""
signal_conditions = [(lambda: os.path.exists(os.path.dirname(filepath)), True)]
if not self.wait_for_signals(
signal_conditions=signal_conditions,
timeout=self.parent.TIMEOUT_FOR_SIGNALS,
check_stopped=False,
all_signals=True,
):
raise EigerError(f"Timeout of 3s reached for filepath {filepath}")
def arm_acquisition(self) -> None:
"""Arm Eiger detector for acquisition"""
self.parent.cam.acquire.put(1)
signal_conditions = [
(
lambda: self.parent.cam.detector_state.read()[self.parent.cam.detector_state.name][
"value"
],
DetectorState.RUNNING,
)
]
if not self.wait_for_signals(
signal_conditions=signal_conditions,
timeout=self.parent.TIMEOUT_FOR_SIGNALS,
check_stopped=True,
all_signals=False,
):
raise EigerTimeoutError(
f"Failed to arm the acquisition. Detector state {signal_conditions[0][0]}"
)
def finished(self, timeout: int = 5) -> None:
"""Check if acquisition is finished."""
with self._lock:
signal_conditions = [
(
lambda: self.parent.cam.acquire.read()[self.parent.cam.acquire.name]["value"],
DetectorState.IDLE,
),
(lambda: self.std_client.get_status()["acquisition"]["state"], "FINISHED"),
(
lambda: self.std_client.get_status()["acquisition"]["stats"][
"n_write_completed"
],
int(self.parent.scaninfo.num_points * self.parent.scaninfo.frames_per_trigger),
),
]
if not self.wait_for_signals(
signal_conditions=signal_conditions,
timeout=timeout,
check_stopped=True,
all_signals=True,
):
raise EigerTimeoutError(
f"Reached timeout with detector state {signal_conditions[0][0]}, std_daq state"
f" {signal_conditions[1][0]} and received frames of {signal_conditions[2][0]} for"
" the file writer"
)
self.stop_detector()
self.stop_detector_backend()
class SLSDetectorCam(Device):
"""
SLS Detector Camera - Eiger9M
Base class to map EPICS PVs to ophyd signals.
"""
threshold_energy = ADCpt(EpicsSignalWithRBV, "ThresholdEnergy")
beam_energy = ADCpt(EpicsSignalWithRBV, "BeamEnergy")
bit_depth = ADCpt(EpicsSignalWithRBV, "BitDepth")
num_images = ADCpt(EpicsSignalWithRBV, "NumCycles")
num_frames = ADCpt(EpicsSignalWithRBV, "NumFrames")
trigger_mode = ADCpt(EpicsSignalWithRBV, "TimingMode")
trigger_software = ADCpt(EpicsSignal, "TriggerSoftware")
acquire = ADCpt(EpicsSignal, "Acquire")
detector_state = ADCpt(EpicsSignalRO, "DetectorState_RBV")
class TriggerSource(int, enum.Enum):
"""Trigger signals for Eiger9M detector"""
AUTO = 0
TRIGGER = 1
GATING = 2
BURST_TRIGGER = 3
class DetectorState(int, enum.Enum):
"""Detector states for Eiger9M detector"""
IDLE = 0
ERROR = 1
WAITING = 2
FINISHED = 3
TRANSMITTING = 4
RUNNING = 5
STOPPED = 6
STILL_WAITING = 7
INITIALIZING = 8
DISCONNECTED = 9
ABORTED = 10
class Eiger9McSAXS(PSIDetectorBase):
"""
Eiger9M detector for CSAXS
Parent class: PSIDetectorBase
class attributes:
custom_prepare_cls (FalconSetup) : Custom detector setup class for cSAXS,
inherits from CustomDetectorMixin
PSIDetectorBase.set_min_readout (float) : Minimum readout time for the detector
Various EpicsPVs for controlling the detector
"""
# Specify which functions are revealed to the user in BEC client
USER_ACCESS = []
# specify Setup class
custom_prepare_cls = Eiger9MSetup
# specify minimum readout time for detector and timeout for checks after unstage
MIN_READOUT = 3e-3
TIMEOUT_FOR_SIGNALS = 5
# specify class attributes
cam = ADCpt(SLSDetectorCam, "cam1:")
if __name__ == "__main__":
eiger = Eiger9McSAXS(name="eiger", prefix="X12SA-ES-EIGER9M:", sim_mode=True)

View File

@@ -158,7 +158,7 @@ class MCSCardCSAXS(PSIDeviceBase, MCSCard):
self.acquire_mode.set(ACQUIREMODE.MCS).wait(timeout=self._pv_timeout)
# Subscribe the progress signal
self.current_channel.subscribe(self._progress_update, run=False)
# self.current_channel.subscribe(self._progress_update, run=False)
# Subscribe to the mca updates
for name in self.counter_mapping.keys():

View File

@@ -0,0 +1,318 @@
"""
Generic integration of JungfrauJoch backend with Eiger detectors
for the cSAXS beamline at the Swiss Light Source.
The WEB UI is available on http://sls-jfjoch-001:8080
NOTE: this may not be the best place to store this information. It should be migrated to
beamline documentation for debugging of Eiger & JungfrauJoch.
The JungfrauJoch server for cSAXS runs on sls-jfjoch-001.psi.ch
User with sufficient rights may use:
- sudo systemctl restart jfjoch_broker
- sudo systemctl status jfjoch_broker
to check and/or restart the broker for the JungfrauJoch server.
Some extra notes for setting up the detector:
- If the energy on JFJ is set via DetectorSettings, the variable in DatasetSettings will be ignored
- Changes in energy may take time, good to implement logic that only resets energy if needed.
- For the Eiger, the frame_time_us in DetectorSettings is ignored, only the frame_time_us in
the DatasetSettings is relevant
- The bit_depth will be adjusted automatically based on the exp_time. Here, we need to ensure
that subsequent triggers properly
consider the readout_time of the boards. For Jungfrau detectors, the difference between
count_time_us and frame_time_us is the readout_time of the boards. For the Eiger, this needs
to be taken into account during the integration.
- beam_center and detector settings are required input arguments, thus, they may be set to wrong
values for acquisitions to start. Please keep this in mind.
Hardware related notes:
- If there is an HW issue with the detector, power cycling may help.
- The sls_detector package is available on console on /sls/X12SA/data/gac-x12sa/erik/micromamba
- Run: source setup_9m.sh # Be careful, this connects to the detector, so it should not be
used during operation
- Useful commands:
- p highvoltage 0 or 150 (operational)
- g highvoltage
- # Put high voltage to 0 before power cylcing it.
- telnet bchip500
- cd power_control_user/
- ./on
- ./off
Further information that may be relevant for debugging:
JungfrauJoch - one needs to connect to the jfj-server (sls-jfjoch-001)
"""
from __future__ import annotations
import os
import time
from typing import TYPE_CHECKING, Literal
import yaml
from bec_lib.file_utils import get_full_path
from bec_lib.logger import bec_logger
from jfjoch_client.models.dataset_settings import DatasetSettings
from jfjoch_client.models.detector_settings import DetectorSettings
from jfjoch_client.models.detector_state import DetectorState
from jfjoch_client.models.detector_timing import DetectorTiming
from jfjoch_client.models.file_writer_format import FileWriterFormat
from jfjoch_client.models.file_writer_settings import FileWriterSettings
from ophyd import Component as Cpt
from ophyd import DeviceStatus
from ophyd_devices import FileEventSignal, PreviewSignal
from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase
from csaxs_bec.devices.jungfraujoch.jungfrau_joch_client import JungfrauJochClient
from csaxs_bec.devices.jungfraujoch.jungfraujoch_preview import JungfrauJochPreview
if TYPE_CHECKING: # pragma no cover
from bec_lib.devicemanager import ScanInfo
from bec_server.device_server.device_server import DeviceManagerDS
from jfjoch_client.models.measurement_statistics import MeasurementStatistics
logger = bec_logger.logger
EIGER_READOUT_TIME_US = 500e-6 # 500 microseconds in s
class EigerError(Exception):
"""Custom exception for Eiger detector errors."""
class Eiger(PSIDeviceBase):
"""
Base integration of the Eiger1.5M and Eiger9M at cSAXS. All relevant
"""
USER_ACCESS = ["detector_distance", "beam_center"]
file_event = Cpt(FileEventSignal, name="file_event")
preview_image = Cpt(PreviewSignal, name="preview_image", ndim=2)
def __init__(
self,
name: str,
detector_name: Literal["EIGER 9M", "EIGER 8.5M (tmp)", "EIGER 1.5M"],
host: str = "http://sls-jfjoch-001",
port: int = 8080,
detector_distance: float = 100.0,
beam_center: tuple[int, int] = (0, 0),
scan_info: ScanInfo = None,
readout_time: float = EIGER_READOUT_TIME_US,
device_manager=None,
**kwargs,
):
"""
Initialize the PSI Device Base class.
Args:
name (str) : Name of the device
detector_name (str): Name of the detector. Supports ["EIGER 9M", "EIGER 8.5M (tmp)", "EIGER 1.5M"]
host (str): Hostname of the Jungfrau Joch server.
port (int): Port of the Jungfrau Joch server.
scan_info (ScanInfo): The scan info to use.
device_manager (DeviceManagerDS): The device manager to use.
**kwargs: Additional keyword arguments.
"""
super().__init__(name=name, scan_info=scan_info, device_manager=device_manager, **kwargs)
self._host = f"{host}:{port}"
self.jfj_client = JungfrauJochClient(host=self._host, parent=self)
self.jfj_preview_client = JungfrauJochPreview(
url="tcp://129.129.95.114:5400", cb=self.preview_image.put
) # IP of sls-jfjoch-001.psi.ch on port 5400 for ZMQ stream
self.device_manager = device_manager
self.detector_name = detector_name
self._detector_distance = detector_distance
self._beam_center = beam_center
self._readout_time = readout_time
self._full_path = ""
if self.device_manager is not None:
self.device_manager: DeviceManagerDS
@property
def detector_distance(self) -> float:
"""The detector distance in mm."""
return self._detector_distance
@detector_distance.setter
def detector_distance(self, value: float) -> None:
"""Set the detector distance in mm."""
if value <= 0:
raise ValueError("Detector distance must be a positive value.")
self._detector_distance = value
@property
def beam_center(self) -> tuple[float, float]:
"""The beam center in pixels. (x,y)"""
return self._beam_center
@beam_center.setter
def beam_center(self, value: tuple[float, float]) -> None:
"""Set the beam center in pixels. (x,y)"""
self._beam_center = value
def on_init(self) -> None:
"""
Called when the device is initialized.
No siganls are connected at this point,
thus should not be set here but in on_connected instead.
"""
def on_connected(self) -> None:
"""
Called after the device is connected and its signals are connected.
Default values for signals should be set here.
"""
logger.debug(f"On connected called for {self.name}")
self.jfj_client.stop(request_timeout=3)
# Check which detector is selected
# Get available detectors
available_detectors = self.jfj_client.api.config_select_detector_get(_request_timeout=5)
# Get current detector
current_detector_name = ""
if available_detectors.current_id:
detector_selection = [
det.description
for det in available_detectors.detectors
if det.id == available_detectors.current_id
]
current_detector_name = detector_selection[0] if detector_selection else ""
if current_detector_name != self.detector_name:
raise RuntimeError(
f"Please select and initialise the detector {self.detector_name} in the WEB UI: {self._host}."
)
if self.jfj_client.detector_state != DetectorState.IDLE:
raise RuntimeError(
f"Detector {self.detector_name} is not in IDLE state, current state: {self.jfj_client.detector_state}. Please initialize the detector in the WEB UI: {self._host}."
)
# TODO - check again once Eiger should be initialized automatically, currently human initialization is expected
# # Once the automation should be enabled, we may use here
# detector_selection = [
# det for det in available_detectors.detectors if det.id == self.detector_name
# ]
# if not detector_selection:
# raise ValueError(
# f"Detector {self.detector_name} not found in available detectors: {[det.description for det in available_detectors.detectors]}"
# )
# det_id = detector_selection[0].id
# self.jfj_client.api.config_select_detector_put(
# detector_selection=DetectorSelection(id=det_id), _request_timeout=5
# )
# self.jfj_client.connect_and_initialise(timeout=10)
# Setup Detector settings, here we may also set the energy already as this might be time consuming
settings = DetectorSettings(frame_time_us=int(500), timing=DetectorTiming.TRIGGER)
self.jfj_client.set_detector_settings(settings, timeout=10)
# Set the file writer to the appropriate output for the HDF5 file
file_writer_settings = FileWriterSettings(overwrite=True, format=FileWriterFormat.NXMXVDS)
logger.debug(
f"Setting writer_settings: {yaml.dump(file_writer_settings.to_dict(), indent=4)}"
)
self.jfj_client.api.config_file_writer_put(
file_writer_settings=file_writer_settings, _request_timeout=10
)
# Start the preview client
self.jfj_preview_client.connect()
self.jfj_preview_client.start()
logger.info(f"Connected to JungfrauJoch preview stream at {self.jfj_preview_client.url}")
def on_stage(self) -> DeviceStatus | None:
"""
Called while staging the device.
Information about the upcoming scan can be accessed from the scan_info object.
"""
start_time = time.time()
scan_msg = self.scan_info.msg
# Set acquisition parameter
# TODO add check of mono energy, this can then also be passed to DatasetSettings
incident_energy = 12.0
exp_time = scan_msg.scan_parameters.get("exp_time", 0)
if exp_time <= self._readout_time:
raise ValueError(
f"Receive scan request for scan {scan_msg.scan_name} with exp_time {exp_time}s, which must be larger than the readout time {self._readout_time}s of the detector {self.detector_name}."
)
frame_time_us = exp_time #
ntrigger = int(scan_msg.num_points * scan_msg.scan_parameters["frames_per_trigger"])
# Fetch file path
self._full_path = get_full_path(scan_msg, name=f"{self.name}_master")
self._full_path = os.path.abspath(os.path.expanduser(self._full_path))
# Inform BEC about upcoming file event
self.file_event.put(
file_path=self._full_path,
done=False,
successful=False,
hinted_h5_entries={"data": "entry/data/data"},
)
# JFJ adds _master.h5 automatically
path = os.path.relpath(self._full_path, start="/sls/x12sa/data").removesuffix("_master.h5")
data_settings = DatasetSettings(
image_time_us=int(frame_time_us * 1e6), # This is currently ignored
ntrigger=ntrigger,
file_prefix=path,
beam_x_pxl=int(self._beam_center[0]),
beam_y_pxl=int(self._beam_center[1]),
detector_distance_mm=self.detector_distance,
incident_energy_ke_v=incident_energy,
)
logger.debug(f"Setting data_settings: {yaml.dump(data_settings.to_dict(), indent=4)}")
prep_time = start_time - time.time()
logger.debug(f"Prepared information for eiger to start acquisition in {prep_time:.2f}s")
self.jfj_client.wait_for_idle(timeout=10, request_timeout=10) # Ensure we are in IDLE state
self.jfj_client.start(settings=data_settings) # Takes around ~0.6s
logger.debug(f"Wait for IDLE and start call took {time.time()-start_time-prep_time:.2f}s")
def on_unstage(self) -> DeviceStatus:
"""Called while unstaging the device."""
def on_pre_scan(self) -> DeviceStatus:
"""Called right before the scan starts on all devices automatically."""
def on_trigger(self) -> DeviceStatus:
"""Called when the device is triggered."""
def _file_event_callback(self, status: DeviceStatus) -> None:
"""Callback to update the file_event signal when the acquisition is done."""
logger.info(f"Acquisition done callback called for {self.name} for status {status.success}")
self.file_event.put(
file_path=self._full_path,
done=status.done,
successful=status.success,
hinted_h5_entries={"data": "entry/data/data"},
)
def on_complete(self) -> DeviceStatus:
"""Called to inquire if a device has completed a scans."""
def wait_for_complete():
start_time = time.time()
timeout = 10
for _ in range(timeout):
if self.jfj_client.wait_for_idle(timeout=1, request_timeout=10):
return
statistics: MeasurementStatistics = self.jfj_client.api.statistics_data_collection_get(
_request_timeout=5
)
raise TimeoutError(
f"Timeout after waiting for detector {self.name} to complete for {time.time()-start_time:.2f}s, measurement statistics: {yaml.dump(statistics.to_dict(), indent=4)}"
)
status = self.task_handler.submit_task(wait_for_complete, run=True)
status.add_callback(self._file_event_callback)
self.cancel_on_stop(status)
return status
def on_kickoff(self) -> DeviceStatus | None:
"""Called to kickoff a device for a fly scan. Has to be called explicitly."""
def on_stop(self) -> None:
"""Called when the device is stopped."""
self.jfj_client.stop(
request_timeout=0.5
) # Call should not block more than 0.5 seconds to stop all devices...
self.task_handler.shutdown()

View File

@@ -0,0 +1,54 @@
"""
Eiger 1.5M specific integration. It is based on the Eiger base integration for the JungfrauJoch backend
which is placed in eiger_csaxs, and where code that is equivalent for the Eiger9M and Eiger1.5M is shared.
Please check the eiger_csaxs.py class for more details about the relevant services.
"""
from __future__ import annotations
from typing import TYPE_CHECKING
from csaxs_bec.devices.jungfraujoch.eiger import Eiger
EIGER1_5M_READOUT_TIME_US = 500e-6 # 500 microseconds in s
DETECTOR_NAME = "EIGER 1.5M"
if TYPE_CHECKING: # pragma no cover
from bec_lib.devicemanager import ScanInfo
from bec_server.device_server.device_server import DeviceManagerDS
# pylint:disable=invalid-name
class Eiger1_5M(Eiger):
"""
Eiger 1.5M specific integration for the in-vaccum Eiger.
The logic implemented here is coupled to the DelayGenerator integration,
repsonsible for the global triggering of all devices through a single Trigger logic.
Please check the eiger.py class for more details about the integration of relevant backend
services. The detector_name must be set to "EIGER 1.5M:
"""
USER_ACCESS = Eiger.USER_ACCESS + [] # Add more user_access methods here.
def __init__(
self,
name: str,
detector_distance: float = 100.0,
beam_center: tuple[float, float] = (0.0, 0.0),
scan_info: ScanInfo = None,
device_manager: DeviceManagerDS = None,
**kwargs,
) -> None:
super().__init__(
name=name,
detector_name=DETECTOR_NAME,
readout_time=EIGER1_5M_READOUT_TIME_US,
detector_distance=detector_distance,
beam_center=beam_center,
scan_info=scan_info,
device_manager=device_manager,
**kwargs,
)

View File

@@ -0,0 +1,58 @@
"""
Eiger 9M specific integration. It is based on the Eiger base integration for the JungfrauJoch backend
which is placed in eiger_csaxs, and where code that is equivalent for the Eiger9M and Eiger1.5M is shared.
Please check the eiger_csaxs.py class for more details about the relevant services.
In 16bit mode, 8e7 counts/s per pixel are supported in summed up frames,
although subframes will never have more than 12bit counts (~4000 counts per pixel in subframe).
In 32bit mode, 2e7 counts/s per pixel are supported, for which subframes will have no
more than 24bit counts, which means 16.7 million counts per pixel in subframes.
"""
from __future__ import annotations
from typing import TYPE_CHECKING
from csaxs_bec.devices.jungfraujoch.eiger import Eiger
if TYPE_CHECKING: # pragma no cover
from bec_lib.devicemanager import ScanInfo
from bec_server.device_server.device_server import DeviceManagerDS
EIGER9M_READOUT_TIME_US = 500e-6 # 500 microseconds in s
DETECTOR_NAME = "EIGER 8.5M (tmp)" # "EIGER 9M""
# pylint:disable=invalid-name
class Eiger9M(Eiger):
"""
Eiger 1.5M specific integration for the in-vaccum Eiger.
The logic implemented here is coupled to the DelayGenerator integration,
repsonsible for the global triggering of all devices through a single Trigger logic.
Please check the eiger.py class for more details about the integration of relevant backend
services. The detector_name must be set to "EIGER 1.5M:
"""
USER_ACCESS = Eiger.USER_ACCESS + [] # Add more user_access methods here.
def __init__(
self,
name: str,
detector_distance: float = 100.0,
beam_center: tuple[float, float] = (0.0, 0.0),
scan_info: ScanInfo = None,
device_manager: DeviceManagerDS = None,
**kwargs,
) -> None:
super().__init__(
name=name,
detector_name=DETECTOR_NAME,
readout_time=EIGER9M_READOUT_TIME_US,
detector_distance=detector_distance,
beam_center=beam_center,
scan_info=scan_info,
device_manager=device_manager,
**kwargs,
)

View File

@@ -1,20 +1,35 @@
import enum
import math
"""Module with client interface for the Jungfrau Joch detector API"""
import jfjoch_client
from __future__ import annotations
import enum
import time
import traceback
from typing import TYPE_CHECKING
import requests
from bec_lib.logger import bec_logger
from jfjoch_client.api.default_api import DefaultApi
from jfjoch_client.api_client import ApiClient
from jfjoch_client.configuration import Configuration
from jfjoch_client.models.broker_status import BrokerStatus
from jfjoch_client.models.dataset_settings import DatasetSettings
from jfjoch_client.models.detector_settings import DetectorSettings
logger = bec_logger.logger
if TYPE_CHECKING:
from ophyd import Device
# pylint: disable=raise-missing-from
# pylint: disable=broad-except
class JungfrauJochClientError(Exception):
"""Base class for exceptions in this module."""
class DetectorState(enum.StrEnum):
"""Detector states for Jungfrau Joch detector
['Inactive', 'Idle', 'Busy', 'Measuring', 'Pedestal', 'Error']
"""
class DetectorState(str, enum.Enum):
"""Possible Detector states for Jungfrau Joch detector"""
INACTIVE = "Inactive"
IDLE = "Idle"
@@ -24,24 +39,30 @@ class DetectorState(enum.StrEnum):
ERROR = "Error"
class ResponseWaitDone(enum.IntEnum):
"""Response state for Jungfrau Joch detector wait till done"""
DETECTOR_IDLE = 200
TIMEOUT_PARAM_OUT_OF_RANGE = 400
JUNGFRAU_ERROR = 500
DETECTOR_INACTIVE = 502
TIMEOUT_REACHED = 504
class JungfrauJochClient:
"""Thin wrapper around the Jungfrau Joch API client"""
"""Thin wrapper around the Jungfrau Joch API client.
def __init__(self, host: str = "http://sls-jfjoch-001:8080") -> None:
sudo systemctl restart jfjoch_broker
sudo systemctl status jfjoch_broker
It looks as if the detector is not being stopped properly.
One module remains running, how can we restart the detector?
"""
def __init__(
self, host: str = "http://sls-jfjoch-001:8080", parent: Device | None = None
) -> None:
self._initialised = False
configuration = jfjoch_client.Configuration(host=host)
api_client = jfjoch_client.ApiClient(configuration)
self.api = jfjoch_client.DefaultApi(api_client)
configuration = Configuration(host=host)
api_client = ApiClient(configuration)
self.api = DefaultApi(api_client)
self._parent_name = parent.name if parent else self.__class__.__name__
@property
def jjf_state(self) -> BrokerStatus:
"""Get the status of JungfrauJoch"""
response = self.api.status_get()
return BrokerStatus(**response.to_dict())
@property
def initialised(self) -> bool:
@@ -53,101 +74,113 @@ class JungfrauJochClient:
"""Set the connected status"""
self._initialised = value
def get_jungfrau_joch_status(self) -> DetectorState:
# TODO this is not correct, as it may be that the state in INACTIVE. Models are not in sync...
# REMOVE all model enums as most of the validation takes place in the Pydantic models, i.e. BrokerStatus here..
@property
def detector_state(self) -> DetectorState:
"""Get the status of JungfrauJoch"""
return self.api.status_get().state
return DetectorState(self.jjf_state.state)
def connect_and_initialise(self, timeout: int = 5) -> None:
def connect_and_initialise(self, timeout: int = 10, **kwargs) -> None:
"""Check if JungfrauJoch is connected and ready to receive commands"""
status = self.api.status_get().state
status = self.detector_state
if status != DetectorState.IDLE:
self.api.initialize_post()
self.wait_till_done(timeout)
self.initialised = True
self.api.initialize_post() # This is a blocking call....
self.wait_for_idle(timeout, request_timeout=timeout) # Blocking call
self.initialised = True
def set_detector_settings(self, settings: dict | jfjoch_client.DatasetSettings) -> None:
def set_detector_settings(self, settings: dict | DetectorSettings, timeout: int = 10) -> None:
"""Set the detector settings. JungfrauJoch must be in IDLE, Error or Inactive state.
Note, the full settings have to be provided, otherwise the settings will be overwritten with default values.
Args:
settings (dict): dictionary of settings
"""
state = self.api.status_get().state
state = self.detector_state
if state not in [DetectorState.IDLE, DetectorState.ERROR, DetectorState.INACTIVE]:
raise JungfrauJochClientError(
f"Detector must be in IDLE, ERROR or INACTIVE state to set settings. Current state: {state}"
)
time.sleep(1) # Give the detector 1s to become IDLE, retry
state = self.detector_state
if state not in [DetectorState.IDLE, DetectorState.ERROR, DetectorState.INACTIVE]:
raise JungfrauJochClientError(
f"Error in {self._parent_name}. Detector must be in IDLE, ERROR or INACTIVE state to set settings. Current state: {state}"
)
if isinstance(settings, dict):
settings = jfjoch_client.DatasetSettings(**settings)
self.api.config_detector_put(settings)
settings = DetectorSettings(**settings)
try:
self.api.config_detector_put(detector_settings=settings, _request_timeout=timeout)
except requests.exceptions.Timeout:
raise TimeoutError(f"Timeout while setting detector settings for {self._parent_name}")
except Exception:
content = traceback.format_exc()
raise JungfrauJochClientError(
f"Error while setting detector settings for {self._parent_name}: {content}"
)
def set_mesaurement_settings(self, settings: dict | jfjoch_client.DatasetSettings) -> None:
"""Set the measurement settings. JungfrauJoch must be in IDLE state.
def start(self, settings: dict | DatasetSettings, request_timeout: float = 10) -> None:
"""Start the mesaurement. DatasetSettings must be provided, and JungfrauJoch must be in IDLE state.
The method call is blocking and JungfrauJoch will be ready to measure after the call resolves.
Please check the DataSettings class for the available settings.
The minimum required settings are:
beam_x_pxl: StrictFloat | StrictInt,
beam_y_pxl: StrictFloat | StrictInt,
detector_distance_mm: float | int,
incident_energy_keV: float | int,
Args:
settings (dict): dictionary of settings
Please check the DataSettings class for the available settings. Minimum required settings are
beam_x_pxl, beam_y_pxl, detector_distance_mm, incident_energy_keV.
"""
state = self.api.status_get().state
state = self.detector_state
if state != DetectorState.IDLE:
raise JungfrauJochClientError(
f"Detector must be in IDLE state to set settings. Current state: {state}"
f"Error in {self._parent_name}. Detector must be in IDLE state to set settings. Current state: {state}"
)
if isinstance(settings, dict):
settings = jfjoch_client.DatasetSettings(**settings)
settings = DatasetSettings(**settings)
try:
res = self.api.start_post_with_http_info(dataset_settings=settings)
if res.status_code != 200:
logger.error(
f"Error while setting measurement settings {settings}, response: {res}"
)
raise JungfrauJochClientError(
f"Error while setting measurement settings {settings}, response: {res}"
)
except Exception as e:
logger.error(
f"Error while setting measurement settings {settings}. Exception raised {e}"
self.api.start_post_with_http_info(
dataset_settings=settings, _request_timeout=request_timeout
)
except requests.exceptions.Timeout:
raise TimeoutError(
f"TimeoutError in JungfrauJochClient for parent device {self._parent_name} for 'start' call"
)
except Exception:
content = traceback.format_exc()
raise JungfrauJochClientError(
f"Error while setting measurement settings {settings}. Exception raised {e}"
) from e
f"Error in JungfrauJochClient for parent device {self._parent_name} during 'start' call: {content}"
)
def wait_till_done(self, timeout: int = 5) -> None:
"""Wait until JungfrauJoch is done.
def stop(self, request_timeout: float = 0.5) -> None:
"""Stop the acquisition, this only logs errors and is not raising."""
try:
self.api.cancel_post_with_http_info(_request_timeout=request_timeout)
except requests.exceptions.Timeout:
content = traceback.format_exc()
logger.error(
f"Timeout in JungFrauJochClient for device {self._parent_name} during stop: {content}"
)
except Exception:
content = traceback.format_exc()
logger.error(
f"Error in JungFrauJochClient for device {self._parent_name} during stop: {content}"
)
def wait_for_idle(self, timeout: int = 10, request_timeout: float | None = None) -> bool:
"""Wait for JungfrauJoch to be in Idle state. Blocking call with timeout.
Args:
timeout (int): timeout in seconds
Returns:
bool: True if the detector is in IDLE state, False if timeout occurred
"""
success = False
if request_timeout is None:
request_timeout = timeout
try:
response = self.api.wait_till_done_post_with_http_info(math.ceil(timeout / 2))
if response.status_code != ResponseWaitDone.DETECTOR_IDLE:
logger.info(
f"Waitin for JungfrauJoch to be done, status: {ResponseWaitDone(response.status_code)}; response msg {response}"
)
response = self.api.wait_till_done_post_with_http_info(math.floor(timeout / 2))
if response.status_code == ResponseWaitDone.DETECTOR_IDLE:
success = True
return
except Exception as e:
logger.error(f"Error while waiting for JungfrauJoch to initialise: {e}")
raise JungfrauJochClientError(
f"Error while waiting for JungfrauJoch to initialise: {e}"
) from e
else:
if success is False:
logger.error(
f"Failed to initialise JungfrauJoch with status: {response.status_code}; response msg {response}"
)
raise JungfrauJochClientError(
f"Failed to initialise JungfrauJoch with status: {response.status_code}; response msg {response}"
)
self.api.wait_till_done_post(timeout=timeout, _request_timeout=request_timeout)
except requests.exceptions.Timeout:
raise TimeoutError(f"HTTP request timeout in wait_for_idle for {self._parent_name}")
except Exception:
content = traceback.format_exc()
logger.debug(f"Waiting for device {self._parent_name} to become IDLE: {content}")
return False
return True

View File

@@ -0,0 +1,96 @@
"""Module for the Eiger preview ZMQ stream."""
from __future__ import annotations
import json
import threading
import time
from typing import Callable
import numpy as np
import zmq
from bec_lib.logger import bec_logger
logger = bec_logger.logger
ZMQ_TOPIC_FILTER = b""
class JungfrauJochPreview:
USER_ACCESS = ["start", "stop"]
def __init__(self, url: str, cb: Callable):
self.url = url
self._socket = None
self._shutdown_event = threading.Event()
self._zmq_thread = None
self._on_update_callback = cb
def connect(self):
"""Connect to the JungfrauJoch PUB-SUB streaming interface
JungfrauJoch may reject connection for a few seconds when it restarts,
so if it fails, wait a bit and try to connect again.
"""
# pylint: disable=no-member
context = zmq.Context()
self._socket = context.socket(zmq.SUB)
self._socket.setsockopt(zmq.SUBSCRIBE, ZMQ_TOPIC_FILTER)
try:
self._socket.connect(self.url)
except ConnectionRefusedError:
time.sleep(1)
self._socket.connect(self.url)
def start(self):
self._zmq_thread = threading.Thread(
target=self._zmq_update_loop, daemon=True, name="JungfrauJoch_live_preview"
)
self._zmq_thread.start()
def stop(self):
self._shutdown_event.set()
if self._zmq_thread:
self._zmq_thread.join()
def _zmq_update_loop(self):
while not self._shutdown_event.is_set():
if self._socket is None:
self.connect()
try:
self._poll()
except ValueError:
# Happens when ZMQ partially delivers the multipart message
pass
except zmq.error.Again:
# Happens when receive queue is empty
time.sleep(0.1)
def _poll(self):
"""
Poll the ZMQ socket for new data. It will throttle the data update and
only subscribe to the topic for a single update. This is not very nice
but it seems like there is currently no option to set the update rate on
the backend.
"""
if self._shutdown_event.wait(0.2):
return
try:
# subscribe to the topic
self._socket.setsockopt(zmq.SUBSCRIBE, ZMQ_TOPIC_FILTER)
# pylint: disable=no-member
r = self._socket.recv_multipart(flags=zmq.NOBLOCK)
self._parse_data(r)
finally:
# Unsubscribe from the topic
self._socket.setsockopt(zmq.UNSUBSCRIBE, ZMQ_TOPIC_FILTER)
def _parse_data(self, data):
# TODO decode and parse the data
# self._on_update_callback(data)
pass

View File

@@ -24,24 +24,25 @@ class FlomniSampleStorage(Device):
SUB_VALUE = "value"
_default_sub = SUB_VALUE
sample_placed = {
f"sample{i}": (EpicsSignal, f"XOMNY-SAMPLE_DB_flomni{i}:GET", {}) for i in range(21)
f"sample{i}": (EpicsSignal, f"XOMNY-SAMPLE_DB_flomni{i}:GET", {"auto_monitor": True}) for i in range(21)
}
sample_placed = Dcpt(sample_placed)
sample_names = {
f"sample{i}": (EpicsSignal, f"XOMNY-SAMPLE_DB_flomni{i}:GET.DESC", {"string": True})
f"sample{i}": (EpicsSignal, f"XOMNY-SAMPLE_DB_flomni{i}:GET.DESC", {"string": True, "auto_monitor": True})
for i in range(21)
}
sample_names = Dcpt(sample_names)
sample_in_gripper = Cpt(
EpicsSignal, name="sample_in_gripper", read_pv="XOMNY-SAMPLE_DB_flomni100:GET"
EpicsSignal, name="sample_in_gripper", read_pv="XOMNY-SAMPLE_DB_flomni100:GET", auto_monitor=True
)
sample_in_gripper_name = Cpt(
EpicsSignal,
name="sample_in_gripper_name",
read_pv="XOMNY-SAMPLE_DB_flomni100:GET.DESC",
string=True,
auto_monitor=True
)
def __init__(self, prefix="", *, name, **kwargs):

View File

@@ -0,0 +1,208 @@
import time
import datetime
from ophyd import Component as Cpt
from ophyd import Device
from ophyd import DynamicDeviceComponent as Dcpt
from ophyd import EpicsSignal
from prettytable import FRAME, PrettyTable
import numpy as np
class FlomniTempHumError(Exception):
pass
class FlomniTempHum(Device):
USER_ACCESS = [
"show_all",
"help",
]
SUB_VALUE = "value"
_default_sub = SUB_VALUE
temperature_mirror = Cpt(
EpicsSignal, name="temperature_mirror", read_pv="XOMNI-TEMPHUM-MIRROR:0.VAL"
)
temperature_mirrorset_set = Cpt(
EpicsSignal, name="temperature_mirrorset_set", read_pv="XOMNI-TEMPHUM-MIRRORSET_SET:0.VAL"
)
temperature_mirrorset_rb = Cpt(
EpicsSignal, name="temperature_mirrorset_rb", read_pv="XOMNI-TEMPHUM-MIRRORSET_RB:0.VAL"
)
temperature_osa = Cpt(
EpicsSignal, name="temperature_osa", read_pv="XOMNI-TEMPHUM-OSA:0.VAL"
)
temperature_osaset_set = Cpt(
EpicsSignal, name="temperature_osaset_set", read_pv="XOMNI-TEMPHUM-OSASET_SET:0.VAL"
)
temperature_osaset_rb = Cpt(
EpicsSignal, name="temperature_osaset_rb", read_pv="XOMNI-TEMPHUM-OSASET_RB:0.VAL"
)
omegactrl_alive = Cpt(
EpicsSignal, name="omegactrl_alive", read_pv="XOMNI-TEMPHUM-OMEGACTRL-ALIVE:0.VAL"
)
galilctrl_alive = Cpt(
EpicsSignal, name="galilctrl_alive", read_pv="XOMNI-TEMPHUM-GALILCTRL-ALIVE:0.VAL"
)
temperature_heater = Cpt(
EpicsSignal, name="temperature_heater", read_pv="XOMNI-TEMPHUM-HEATER:0.VAL"
)
temperature_heaterset_set = Cpt(
EpicsSignal, name="temperature_heaterset_set", read_pv="XOMNI-TEMPHUM-HEATERSET_SET:0.VAL"
)
temperature_heaterset_rb = Cpt(
EpicsSignal, name="temperature_heaterset_rb", read_pv="XOMNI-TEMPHUM-HEATERSET_RB:0.VAL"
)
temperature_heaterhousing = Cpt(
EpicsSignal, name="temperature_heaterhousing", read_pv="XOMNI-TEMPHUM-HEATERHOUSE:0.VAL"
)
temperature_heaterhousing_alarm = Cpt(
EpicsSignal, name="temperature_heaterhousing_alarm", read_pv="XOMNI-TEMPHUM-HEATERHOUSEALARM:0.VAL"
)
temperature_heater_enabled = Cpt(
EpicsSignal, name="temperature_heater_enabled", read_pv="XOMNI-TEMPHUM-HEAT_EN:0.VAL"
)
temperature_heater_enabled = Cpt(
EpicsSignal, name="temperature_heater_enabled", read_pv="XOMNI-TEMPHUM-HEAT_EN:0.VAL"
)
###### GALIL CONTROLLER
humidity_sensor1 = Cpt(
EpicsSignal, name="humidity_sensor1", read_pv="XOMNI-TEMPHUM-HUM1:0.VAL"
)
humidity_sensor2 = Cpt(
EpicsSignal, name="humidity_sensor2", read_pv="XOMNI-TEMPHUM-HUM2:0.VAL"
)
humidity_sensor1_temperature = Cpt(
EpicsSignal, name="humidity_sensor1_temperature", read_pv="XOMNI-TEMPHUM-TEMP1:0.VAL"
)
humidity_sensor2_temperature = Cpt(
EpicsSignal, name="humidity_sensor2_temperature", read_pv="XOMNI-TEMPHUM-TEMP2:0.VAL"
)
humidity_sensor1_err = Cpt(
EpicsSignal, name="humidity_sensor1_err", read_pv="XOMNI-TEMPHUM-ERR1:0.VAL"
)
humidity_sensor2_err = Cpt(
EpicsSignal, name="humidity_sensor2_err", read_pv="XOMNI-TEMPHUM-ERR2:0.VAL"
)
flow = Cpt(
EpicsSignal, name="flow", read_pv="XOMNI-TEMPHUM-FLOW:0.VAL"
)
flowset = Cpt(
EpicsSignal, name="flowset", read_pv="XOMNI-TEMPHUM-FLOWSET:0.VAL"
)
flowset_set = Cpt(
EpicsSignal, name="flowset_set", read_pv="XOMNI-TEMPHUM-FLOWSETSET:0.VAL"
)
humidityset = Cpt(
EpicsSignal, name="humidityset", read_pv="XOMNI-TEMPHUM-HUMSET:0.VAL"
)
humidityset_set = Cpt(
EpicsSignal, name="humidityset_set", read_pv="XOMNI-TEMPHUM-HUMSETSET:0.VAL"
)
suction = Cpt(
EpicsSignal, name="suction", read_pv="XOMNI-TEMPHUM-SUCTION:0.VAL"
)
valvedry = Cpt(
EpicsSignal, name="valvedry", read_pv="XOMNI-TEMPHUM-VALVEDRY:0.VAL"
)
valvewet = Cpt(
EpicsSignal, name="valvewet", read_pv="XOMNI-TEMPHUM-VALVEWET:0.VAL"
)
setuptemp = Cpt(
EpicsSignal, name="setuptemp", read_pv="XOMNI-TEMPHUM-SETUPTEMP:0.VAL"
)
def omega_controller_running(self):
time_diff = np.fabs(float(self.omegactrl_alive.get()) - time.time())
if time_diff > 120:
return False
else:
return True
def galil_controller_running(self):
time_diff = np.fabs(float(self.galilctrl_alive.get()) - time.time())
if time_diff > 120:
return False
else:
return True
def __init__(self, prefix="", *, name, **kwargs):
super().__init__(prefix, name=name, **kwargs)
self.temperature_mirror.subscribe(self._emit_value, run=False)
def _emit_value(self, **kwargs):
timestamp = kwargs.pop("timestamp", time.time())
self._run_subs(sub_type=self.SUB_VALUE, timestamp=timestamp, obj=self)
def show_all(self):
print("=== flOMNI Temperature & Humidity Overview ===")
print("")
print("Temperatures:")
print(f" Mirror: {float(self.temperature_mirror.get()):7.2f} °C")
print(f" Mirror Setpoint (RB): {float(self.temperature_mirrorset_rb.get()):7.2f} °C")
print(f" OSA: {float(self.temperature_osa.get()):7.2f} °C")
print(f" OSA Setpoint (RB): {float(self.temperature_osaset_rb.get()):7.2f} °C")
print(f" Heater: {float(self.temperature_heater.get()):7.2f} °C")
print(f" Heater Setpoint (RB): {float(self.temperature_heaterset_rb.get()):7.2f} °C")
print(f" Heater Enabled: {float(self.temperature_heater_enabled.get()):.0f}")
print(f" Heater Housing: {float(self.temperature_heaterhousing.get()):7.2f} °C")
print(f" Heater Housing Alarm: {float(self.temperature_heaterhousing_alarm.get()):.0f}")
print("")
print("Humidity Sensors:")
print(f" Sensor 1 Humidity: {float(self.humidity_sensor1.get()):7.2f} %RH")
print(f" Sensor 1 Temperature: {float(self.humidity_sensor1_temperature.get()):7.2f} °C")
print(f" Sensor 1 Error: {float(self.humidity_sensor1_err.get()):.0f}")
print(f" Sensor 2 Humidity: {float(self.humidity_sensor2.get()):7.2f} %RH")
print(f" Sensor 2 Temperature: {float(self.humidity_sensor2_temperature.get()):7.2f} °C")
print(f" Sensor 2 Error: {float(self.humidity_sensor2_err.get()):.0f}")
print(f" Humidity Setpoint: {float(self.humidityset.get()):7.2f} %RH")
print("")
print("Flow Control:")
print(f" Flow: {float(self.flow.get()):7.2f} sccm")
print(f" Flow Setpoint (RB): {float(self.flowset.get()):7.2f} sccm")
print("")
print("Suction:")
print(f" Suction: {float(self.suction.get()):7.2f}")
print("")
print("Valves:")
print(f" Dry Valve: {float(self.valvedry.get()):.0f}")
print(f" Wet Valve: {float(self.valvewet.get()):.0f}")
print("")
print("Controller Heartbeats:")
print(f" OMEGA Controller Alive: {self.omega_controller_running()}")
print(f" GALIL Controller Alive: {self.galil_controller_running()}")
print("==============================================")
def help(self):
print("Help for flOMNI temperature and humidity control system:")
print("Available methods:")
print(" show_all() - display all current values")

View File

@@ -37,84 +37,86 @@ class OMNYSampleStorage(Device):
_default_sub = SUB_VALUE
sample_shuttle_A_placed = {
f"sample{i}": (EpicsSignal, f"XOMNY-SAMPLE_DB_shuttle_A:{i}", {}) for i in range(1, 7)
f"sample{i}": (EpicsSignal, f"XOMNY-SAMPLE_DB_shuttle_A:{i}", {"auto_monitor": True}) for i in range(1, 7)
}
sample_shuttle_A_placed = Dcpt(sample_shuttle_A_placed)
sample_shuttle_B_placed = {
f"sample{i}": (EpicsSignal, f"XOMNY-SAMPLE_DB_shuttle_B:{i}", {}) for i in range(1, 7)
f"sample{i}": (EpicsSignal, f"XOMNY-SAMPLE_DB_shuttle_B:{i}", {"auto_monitor": True}) for i in range(1, 7)
}
sample_shuttle_B_placed = Dcpt(sample_shuttle_B_placed)
sample_shuttle_C_placed = {
f"sample{i}": (EpicsSignal, f"XOMNY-SAMPLE_DB_shuttle_C:{i}", {}) for i in range(1, 7)
f"sample{i}": (EpicsSignal, f"XOMNY-SAMPLE_DB_shuttle_C:{i}", {"auto_monitor": True}) for i in range(1, 7)
}
sample_shuttle_C_placed = Dcpt(sample_shuttle_C_placed)
sample_shuttle_C_placed = {
f"sample{i}": (EpicsSignal, f"XOMNY-SAMPLE_DB_shuttle_C:{i}", {}) for i in range(1, 7)
f"sample{i}": (EpicsSignal, f"XOMNY-SAMPLE_DB_shuttle_C:{i}", {"auto_monitor": True}) for i in range(1, 7)
}
sample_shuttle_C_placed = Dcpt(sample_shuttle_C_placed)
parking_placed = {
f"parking{i}": (EpicsSignal, f"XOMNY-SAMPLE_DB_parking:{i}", {}) for i in range(1, 7)
f"parking{i}": (EpicsSignal, f"XOMNY-SAMPLE_DB_parking:{i}", {"auto_monitor": True}) for i in range(1, 7)
}
parking_placed = Dcpt(parking_placed)
sample_placed = {
f"sample{i}": (EpicsSignal, f"XOMNY-SAMPLE_DB_omny:{i}", {})
f"sample{i}": (EpicsSignal, f"XOMNY-SAMPLE_DB_omny:{i}", {"auto_monitor": True})
for i in [10, 11, 12, 13, 14, 32, 33, 34, 100, 101]
}
sample_placed = Dcpt(sample_placed)
sample_shuttle_A_names = {
f"sample{i}": (EpicsSignal, f"XOMNY-SAMPLE_DB_shuttle_A:{i}.DESC", {"string": True})
f"sample{i}": (EpicsSignal, f"XOMNY-SAMPLE_DB_shuttle_A:{i}.DESC", {"string": True, "auto_monitor": True})
for i in range(1, 7)
}
sample_shuttle_A_names = Dcpt(sample_shuttle_A_names)
sample_shuttle_B_names = {
f"sample{i}": (EpicsSignal, f"XOMNY-SAMPLE_DB_shuttle_B:{i}.DESC", {"string": True})
f"sample{i}": (EpicsSignal, f"XOMNY-SAMPLE_DB_shuttle_B:{i}.DESC", {"string": True, "auto_monitor": True})
for i in range(1, 7)
}
sample_shuttle_B_names = Dcpt(sample_shuttle_B_names)
sample_shuttle_C_names = {
f"sample{i}": (EpicsSignal, f"XOMNY-SAMPLE_DB_shuttle_C:{i}.DESC", {"string": True})
f"sample{i}": (EpicsSignal, f"XOMNY-SAMPLE_DB_shuttle_C:{i}.DESC", {"string": True, "auto_monitor": True})
for i in range(1, 7)
}
sample_shuttle_C_names = Dcpt(sample_shuttle_C_names)
parking_names = {
f"parking{i}": (EpicsSignal, f"XOMNY-SAMPLE_DB_parking:{i}.DESC", {"string": True})
f"parking{i}": (EpicsSignal, f"XOMNY-SAMPLE_DB_parking:{i}.DESC", {"string": True, "auto_monitor": True})
for i in range(1, 7)
}
parking_names = Dcpt(parking_names)
sample_names = {
f"sample{i}": (EpicsSignal, f"XOMNY-SAMPLE_DB_omny:{i}.DESC", {"string": True})
f"sample{i}": (EpicsSignal, f"XOMNY-SAMPLE_DB_omny:{i}.DESC", {"string": True, "auto_monitor": True})
for i in [10, 11, 12, 13, 14, 32, 33, 34, 100, 101]
}
sample_names = Dcpt(sample_names)
sample_in_gripper = Cpt(
EpicsSignal, name="sample_in_gripper", read_pv="XOMNY-SAMPLE_DB_omny:110.VAL"
EpicsSignal, name="sample_in_gripper", read_pv="XOMNY-SAMPLE_DB_omny:110.VAL", auto_monitor=True
)
sample_in_gripper_name = Cpt(
EpicsSignal,
name="sample_in_gripper_name",
read_pv="XOMNY-SAMPLE_DB_omny:110.DESC",
string=True,
auto_monitor=True
)
sample_in_samplestage = Cpt(
EpicsSignal, name="sample_in_samplestage", read_pv="XOMNY-SAMPLE_DB_omny:0.VAL"
EpicsSignal, name="sample_in_samplestage", read_pv="XOMNY-SAMPLE_DB_omny:0.VAL", auto_monitor=True
)
sample_in_samplestage_name = Cpt(
EpicsSignal,
name="sample_in_samplestage_name",
read_pv="XOMNY-SAMPLE_DB_omny:0.DESC",
string=True,
auto_monitor=True
)
def __init__(self, prefix="", *, name, **kwargs):

View File

@@ -392,6 +392,9 @@ class RtFlomniController(Controller):
rtx = self.get_device_manager().devices.rtx
min_signal = rtx.user_parameter.get("min_signal")
low_signal = rtx.user_parameter.get("low_signal")
print(f"low signal: {low_signal}")
print(f"min signal: {min_signal}")
print(f"signal: {signal}")
if signal < min_signal:
time.sleep(1)
if signal < min_signal:

View File

@@ -0,0 +1,65 @@
import requests
import threading
import cv2
import numpy as np
from ophyd import Device, Component as Cpt
from ophyd_devices import PreviewSignal
import traceback
from bec_lib.logger import bec_logger
logger = bec_logger.logger
class WebcamViewer(Device):
USER_ACCESS = ["start_live_mode", "stop_live_mode"]
preview = Cpt(PreviewSignal, ndim=2, num_rotation_90=0, transpose=False)
def __init__(self, url:str, name:str, num_rotation_90=0, transpose=False, **kwargs) -> None:
super().__init__(name=name, **kwargs)
self.url = url
self._connection = None
self._update_thread = None
self._buffer = b""
self._shutdown_event = threading.Event()
self.preview.num_rotation_90 = num_rotation_90
self.preview.transpose = transpose
def start_live_mode(self) -> None:
if self._connection is not None:
return
self._update_thread = threading.Thread(target=self._update_loop, daemon=True)
self._update_thread.start()
def _update_loop(self) -> None:
while not self._shutdown_event.is_set():
try:
self._connection = requests.get(self.url, stream=True)
for chunk in self._connection.iter_content(chunk_size=1024):
self._buffer += chunk
start = self._buffer.find(b'\xff\xd8') # JPEG start
end = self._buffer.find(b'\xff\xd9') # JPEG end
if start == -1 or end == -1:
continue
jpg = self._buffer[start:end+2]
self._buffer = self._buffer[end+2:]
image = cv2.imdecode(np.frombuffer(jpg, np.uint8), cv2.IMREAD_COLOR)
if image is not None:
image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
self.preview.put(image)
except Exception as exc:
content = traceback.format_exc()
logger.error(f"Image update loop failed: {content}")
def stop_live_mode(self) -> None:
if self._connection is None:
return
self._shutdown_event.set()
if self._connection is not None:
self._connection.close()
self._connection = None
if self._update_thread is not None:
self._update_thread.join()
self._update_thread = None
self._shutdown_event.clear()

View File

@@ -33,11 +33,11 @@ logger = bec_logger.logger
class FlomniFermatScan(SyncFlyScanBase):
scan_name = "flomni_fermat_scan"
scan_report_hint = "table"
scan_type = "fly"
required_kwargs = ["fovx", "fovy", "exp_time", "step", "angle"]
arg_input = {}
arg_bundle_size = {"bundle": len(arg_input), "min": None, "max": None}
use_scan_progress_report = True
def __init__(
self,
@@ -74,6 +74,7 @@ class FlomniFermatScan(SyncFlyScanBase):
"""
super().__init__(parameter=parameter, **kwargs)
self.show_live_table = False
self.axis = []
self.fovx = fovx
self.fovy = fovy
@@ -168,6 +169,7 @@ class FlomniFermatScan(SyncFlyScanBase):
tracker_signal_status = yield from self.stubs.send_rpc_and_wait(
"rtx", "controller.laser_tracker_check_signalstrength"
)
#self.device_manager.connector.send_client_info(tracker_signal_status)
if tracker_signal_status == "low":
self.device_manager.connector.raise_alarm(
severity=0,

BIN
frame_dump.cbor Normal file

Binary file not shown.

View File

@@ -23,6 +23,8 @@ dependencies = [
"pyepics",
"pyueye", # for the IDS uEye camera
"bec_widgets",
"zmq",
"opencv-python",
]
[project.optional-dependencies]

View File

View File

@@ -0,0 +1,318 @@
# pylint: skip-file
import os
import threading
from time import time
from typing import TYPE_CHECKING, Generator
from unittest import mock
import pytest
from bec_lib.messages import FileMessage, ScanStatusMessage
from jfjoch_client.models.broker_status import BrokerStatus
from jfjoch_client.models.dataset_settings import DatasetSettings
from jfjoch_client.models.detector_list import DetectorList
from jfjoch_client.models.detector_list_element import DetectorListElement
from jfjoch_client.models.detector_settings import DetectorSettings
from jfjoch_client.models.detector_timing import DetectorTiming
from jfjoch_client.models.file_writer_format import FileWriterFormat
from jfjoch_client.models.file_writer_settings import FileWriterSettings
from jfjoch_client.models.measurement_statistics import MeasurementStatistics
from ophyd import Staged
from ophyd_devices.utils.psi_device_base_utils import DeviceStatus
from csaxs_bec.devices.jungfraujoch.eiger import Eiger
from csaxs_bec.devices.jungfraujoch.eiger_1_5m import Eiger1_5M
from csaxs_bec.devices.jungfraujoch.eiger_9m import Eiger9M
if TYPE_CHECKING: # pragma no cover
from bec_lib.messages import FileMessage
# @pytest.fixture(scope="function")
# def scan_worker_mock(scan_server_mock):
# scan_server_mock.device_manager.connector = mock.MagicMock()
# scan_worker = ScanWorker(parent=scan_server_mock)
# yield scan_worker
@pytest.fixture(
scope="function",
params=[(0.1, 1, 1, "line_scan"), (0.2, 2, 2, "time_scan"), (0.5, 5, 5, "acquire")],
)
def mock_scan_info(request, tmpdir):
exp_time, frames_per_trigger, num_points, scan_name = request.param
scan_info = ScanStatusMessage(
scan_id="test_id",
status="open",
scan_number=1,
scan_parameters={
"exp_time": exp_time,
"frames_per_trigger": frames_per_trigger,
"system_config": {},
},
info={"file_components": (f"{tmpdir}/data/S00000/S000001", "h5")},
num_points=num_points,
scan_name=scan_name,
)
yield scan_info
@pytest.fixture(scope="function", params=[(1,), (2,)])
def detector_list(request) -> Generator[DetectorList, None, None]:
"""Fixture for the detector list."""
current_id = request.param[0]
detector_list = DetectorList(
detectors=[
DetectorListElement(
id=1,
description="EIGER 1.5M",
serial_number="123456",
base_ipv4_addr="192.168.0.1",
udp_interface_count=1,
nmodules=1,
width=512,
height=512,
pixel_size_mm=0.1,
readout_time_us=100,
min_frame_time_us=1000,
min_count_time_us=100,
type="EIGER",
),
DetectorListElement(
id=2,
description="EIGER 8.5M (tmp)",
serial_number="123456",
base_ipv4_addr="192.168.0.1",
udp_interface_count=1,
nmodules=1,
width=512,
height=512,
pixel_size_mm=0.1,
readout_time_us=100,
min_frame_time_us=1000,
min_count_time_us=100,
type="EIGER",
),
],
current_id=current_id,
)
yield detector_list
@pytest.fixture(scope="function")
def eiger_1_5m(mock_scan_info) -> Generator[Eiger1_5M, None, None]:
"""Fixture for the Eiger 1.5M device."""
name = "eiger_1_5m"
dev = Eiger1_5M(name=name, beam_center=(256, 256), detector_distance=100.0)
dev.scan_info.msg = mock_scan_info
yield dev
@pytest.fixture(scope="function")
def eiger_9m(mock_scan_info) -> Generator[Eiger9M, None, None]:
"""Fixture for the Eiger 9M device.
Currently only on_connected is different for both devices, all other methods are the same."""
name = "eiger_9m"
dev = Eiger9M(name=name)
dev.scan_info.msg = mock_scan_info
yield dev
@pytest.mark.parametrize("detector_state", ["Idle", "Inactive"])
def test_eiger_1_5m_on_connected(eiger_1_5m, detector_list, detector_state):
"""Test the on_connected logic of the Eiger detector."""
eiger = eiger_1_5m
detector_id = 1
with (
mock.patch.object(eiger.jfj_client, "stop") as mock_jfj_client_stop,
mock.patch.object(
eiger.jfj_client.api, "config_select_detector_get", return_value=detector_list
),
mock.patch.object(
eiger.jfj_client.api, "status_get", return_value=BrokerStatus(state=detector_state)
),
mock.patch.object(eiger.jfj_client, "set_detector_settings") as mock_set_det,
mock.patch.object(eiger.jfj_client.api, "config_file_writer_put") as mock_file_writer,
mock.patch.object(eiger, "jfj_preview_client") as mock_jfj_preview_client,
):
if detector_state != "Idle" or detector_list.current_id != detector_id:
with pytest.raises(RuntimeError):
eiger.on_connected()
mock_jfj_client_stop.assert_called_once()
assert mock_jfj_preview_client.call_count == 0
else:
eiger.on_connected()
assert mock_set_det.call_args == mock.call(
DetectorSettings(frame_time_us=500, timing=DetectorTiming.TRIGGER), timeout=10
)
assert mock_file_writer.call_args == mock.call(
file_writer_settings=FileWriterSettings(
overwrite=True, format=FileWriterFormat.NXMXVDS
),
_request_timeout=10,
)
mock_jfj_client_stop.assert_called_once()
assert mock_jfj_preview_client.connect.call_count == 1
assert mock_jfj_preview_client.start.call_count == 1
@pytest.mark.parametrize("detector_state", ["Idle", "Inactive"])
def test_eiger_9m_on_connected(eiger_9m, detector_list, detector_state):
"""Test the on_connected logic of the Eiger detector."""
eiger = eiger_9m
detector_id = 2
with (
mock.patch.object(eiger.jfj_client, "stop") as mock_jfj_client_stop,
mock.patch.object(
eiger.jfj_client.api, "config_select_detector_get", return_value=detector_list
),
mock.patch.object(
eiger.jfj_client.api, "status_get", return_value=BrokerStatus(state=detector_state)
),
mock.patch.object(eiger.jfj_client, "set_detector_settings") as mock_set_det,
mock.patch.object(eiger.jfj_client.api, "config_file_writer_put") as mock_file_writer,
mock.patch.object(eiger, "jfj_preview_client") as mock_jfj_preview_client,
):
if detector_state != "Idle" or detector_list.current_id != detector_id:
with pytest.raises(RuntimeError):
eiger.on_connected()
mock_jfj_client_stop.assert_called_once()
assert mock_jfj_preview_client.call_count == 0
else:
eiger.on_connected()
assert mock_set_det.call_args == mock.call(
DetectorSettings(frame_time_us=500, timing=DetectorTiming.TRIGGER), timeout=10
)
assert mock_file_writer.call_args == mock.call(
file_writer_settings=FileWriterSettings(
overwrite=True, format=FileWriterFormat.NXMXVDS
),
_request_timeout=10,
)
mock_jfj_client_stop.assert_called_once()
assert mock_jfj_preview_client.connect.call_count == 1
assert mock_jfj_preview_client.start.call_count == 1
@pytest.mark.timeout(20)
def test_eiger_on_stop(eiger_1_5m):
"""Test the on_stop logic of the Eiger detector. This is equivalent for 9M and 1_5M."""
eiger = eiger_1_5m
start_event = threading.Event()
stop_event = threading.Event()
def tmp_task():
start_event.set()
try:
while True:
time.sleep(0.1)
finally:
stop_event.set()
eiger.task_handler.submit_task(tmp_task, run=True)
start_event.wait(timeout=5) # Wait for thread to start
with mock.patch.object(eiger.jfj_client, "stop") as mock_jfj_client_stop:
eiger.on_stop()
mock_jfj_client_stop.assert_called_once()
stop_event.wait(timeout=5) # Thread should be killed from task_handler
@pytest.mark.timeout(25)
@pytest.mark.parametrize("raise_timeout", [True, False])
def test_eiger_on_complete(eiger_1_5m, raise_timeout):
"""Test the on_complete logic of the Eiger detector. This is equivalent for 9M and 1_5M."""
eiger = eiger_1_5m
callback_completed_event = threading.Event()
def _callback_complete(status: DeviceStatus):
if status.done:
callback_completed_event.set()
unblock_wait_for_idle = threading.Event()
def mock_wait_for_idle(timeout: int, request_timeout: float):
if unblock_wait_for_idle.wait(timeout):
if raise_timeout:
return False
return True
return False
with (
mock.patch.object(eiger.jfj_client, "wait_for_idle", side_effect=mock_wait_for_idle),
mock.patch.object(
eiger.jfj_client.api,
"statistics_data_collection_get",
return_value=MeasurementStatistics(run_number=1),
),
):
status = eiger.complete()
status.add_callback(_callback_complete)
assert status.done == False
assert status.success == False
assert eiger.file_event.get() is None
unblock_wait_for_idle.set()
if raise_timeout:
with pytest.raises(TimeoutError):
status.wait(timeout=10)
else:
status.wait(timeout=10)
assert status.done == True
assert status.success == False if raise_timeout else True
def test_eiger_file_event_callback(eiger_1_5m, tmp_path):
"""Test the file_event callback of the Eiger detector. This is equivalent for 9M and 1_5M."""
eiger = eiger_1_5m
test_file = tmp_path / "test_file.h5"
eiger._full_path = str(test_file)
assert eiger.file_event.get() is None
status = DeviceStatus(device=eiger, done=True, success=True)
eiger._file_event_callback(status)
file_msg: FileMessage = eiger.file_event.get()
assert file_msg.device_name == eiger.name
assert file_msg.file_path == str(test_file)
assert file_msg.done is True
assert file_msg.successful is True
assert file_msg.hinted_h5_entries == {"data": "entry/data/data"}
status = DeviceStatus(device=eiger, done=False, success=False)
eiger._file_event_callback(status)
file_msg: FileMessage = eiger.file_event.get()
assert file_msg.device_name == eiger.name
assert file_msg.file_path == str(test_file)
assert file_msg.done is False
assert file_msg.successful is False
assert file_msg.hinted_h5_entries == {"data": "entry/data/data"}
def test_eiger_on_sage(eiger_1_5m):
"""Test the on_stage and on_unstage logic of the Eiger detector. This is equivalent for 9M and 1_5M."""
eiger = eiger_1_5m
scan_msg = eiger.scan_info.msg
with (
mock.patch.object(eiger.jfj_client, "wait_for_idle", return_value=True),
mock.patch.object(eiger.jfj_client, "start") as mock_start,
):
eiger.stage()
assert (
eiger._full_path
== f"{scan_msg.info['file_components'][0]}_{eiger.name}_master.{scan_msg.info['file_components'][1]}"
)
file_msg: FileMessage = eiger.file_event.get()
assert file_msg.file_path == eiger._full_path
assert file_msg.done is False
assert file_msg.successful is False
assert file_msg.hinted_h5_entries == {"data": "entry/data/data"}
data_settings = DatasetSettings(
image_time_us=int(scan_msg.scan_parameters["exp_time"] * 1e6),
ntrigger=int(scan_msg.num_points * scan_msg.scan_parameters["frames_per_trigger"]),
file_prefix=os.path.relpath(eiger._full_path, start="/sls/x12sa/data").removesuffix(
"_master.h5"
),
beam_x_pxl=eiger.beam_center[0],
beam_y_pxl=eiger.beam_center[1],
detector_distance_mm=eiger.detector_distance,
incident_energy_ke_v=12.0, # hardcoded at this moment as it is hardcoded in the Eiger implementation
)
assert mock_start.call_args == mock.call(settings=data_settings)
assert eiger.staged is Staged.yes

View File

@@ -1,444 +0,0 @@
# pylint: skip-file
import threading
from unittest import mock
import ophyd
import pytest
from bec_lib import messages
from bec_lib.endpoints import MessageEndpoints
from bec_server.device_server.tests.utils import DMMock
from ophyd_devices.tests.utils import MockPV
from csaxs_bec.devices.epics.eiger9m_csaxs import Eiger9McSAXS
from csaxs_bec.devices.tests_utils.utils import patch_dual_pvs
@pytest.fixture(scope="function")
def mock_det():
name = "eiger"
prefix = "X12SA-ES-EIGER9M:"
dm = DMMock()
with mock.patch.object(dm, "connector"):
with (
mock.patch("ophyd_devices.interfaces.base_classes.bec_device_base.FileWriter"),
mock.patch(
"ophyd_devices.interfaces.base_classes.psi_detector_base.PSIDetectorBase._update_service_config"
),
):
with mock.patch.object(ophyd, "cl") as mock_cl:
mock_cl.get_pv = MockPV
mock_cl.thread_class = threading.Thread
with mock.patch.object(Eiger9McSAXS, "_init"):
det = Eiger9McSAXS(name=name, prefix=prefix, device_manager=dm)
patch_dual_pvs(det)
det.TIMEOUT_FOR_SIGNALS = 0.1
yield det
def test_init():
"""Test the _init function:"""
name = "eiger"
prefix = "X12SA-ES-EIGER9M:"
dm = DMMock()
with mock.patch.object(dm, "connector"):
with (
mock.patch("ophyd_devices.interfaces.base_classes.bec_device_base.FileWriter"),
mock.patch(
"ophyd_devices.interfaces.base_classes.psi_detector_base.PSIDetectorBase._update_service_config"
),
):
with mock.patch.object(ophyd, "cl") as mock_cl:
mock_cl.get_pv = MockPV
with (
mock.patch(
"csaxs_bec.devices.epics.eiger9m_csaxs.Eiger9MSetup.initialize_default_parameter"
) as mock_default,
mock.patch(
"csaxs_bec.devices.epics.eiger9m_csaxs.Eiger9MSetup.initialize_detector"
) as mock_init_det,
mock.patch(
"csaxs_bec.devices.epics.eiger9m_csaxs.Eiger9MSetup.initialize_detector_backend"
) as mock_init_backend,
):
Eiger9McSAXS(name=name, prefix=prefix, device_manager=dm)
mock_default.assert_called_once()
mock_init_det.assert_called_once()
mock_init_backend.assert_called_once()
@pytest.mark.parametrize(
"trigger_source, detector_state, expected_exception", [(2, 1, True), (2, 0, False)]
)
def test_initialize_detector(mock_det, trigger_source, detector_state, expected_exception):
"""Test the _init function:
This includes testing the functions:
- _init_detector
- _stop_det
- _set_trigger
--> Testing the filewriter is done in test_init_filewriter
Validation upon setting the correct PVs
"""
mock_det.cam.detector_state._read_pv.mock_data = detector_state
if expected_exception:
with pytest.raises(Exception):
mock_det.timeout = 0.1
mock_det.custom_prepare.initialize_detector()
else:
mock_det.custom_prepare.initialize_detector() # call the method you want to test
assert mock_det.cam.acquire.get() == 0
assert mock_det.cam.detector_state.get() == detector_state
assert mock_det.cam.trigger_mode.get() == trigger_source
def test_trigger(mock_det):
"""Test the trigger function:
Validate that trigger calls the custom_prepare.on_trigger() function
"""
with mock.patch.object(mock_det.custom_prepare, "on_trigger") as mock_on_trigger:
mock_det.trigger()
mock_on_trigger.assert_called_once()
@pytest.mark.parametrize(
"readout_time, expected_value", [(1e-3, 3e-3), (3e-3, 3e-3), (5e-3, 5e-3), (None, 3e-3)]
)
def test_update_readout_time(mock_det, readout_time, expected_value):
if readout_time is None:
mock_det.custom_prepare.update_readout_time()
assert mock_det.readout_time == expected_value
else:
mock_det.scaninfo.readout_time = readout_time
mock_det.custom_prepare.update_readout_time()
assert mock_det.readout_time == expected_value
@pytest.mark.parametrize(
"eacc, exp_url, daq_status, daq_cfg, expected_exception",
[
("e12345", "http://xbl-daq-29:5000", {"state": "READY"}, {"writer_user_id": 12543}, False),
("e12345", "http://xbl-daq-29:5000", {"state": "READY"}, {"writer_user_id": 15421}, False),
("e12345", "http://xbl-daq-29:5000", {"state": "BUSY"}, {"writer_user_id": 15421}, True),
("e12345", "http://xbl-daq-29:5000", {"state": "READY"}, {"writer_ud": 12345}, True),
],
)
def test_initialize_detector_backend(
mock_det, eacc, exp_url, daq_status, daq_cfg, expected_exception
):
"""Test self.custom_prepare.initialize_detector_backend (std daq in this case)
This includes testing the functions:
- _update_service_config
Validation upon checking set values in mocked std_daq instance
"""
with mock.patch("csaxs_bec.devices.epics.eiger9m_csaxs.StdDaqClient") as mock_std_daq:
instance = mock_std_daq.return_value
instance.stop_writer.return_value = None
instance.get_status.return_value = daq_status
instance.get_config.return_value = daq_cfg
mock_det.scaninfo.username = eacc
# scaninfo.username.return_value = eacc
if expected_exception:
with pytest.raises(Exception):
mock_det.timeout = 0.1
mock_det.custom_prepare.initialize_detector_backend()
else:
mock_det.custom_prepare.initialize_detector_backend()
instance.stop_writer.assert_called_once()
instance.get_status.assert_called()
instance.set_config.assert_called_once_with(daq_cfg)
@pytest.mark.parametrize(
"scaninfo, daq_status, daq_cfg, detector_state, stopped, expected_exception",
[
(
{
"eacc": "e12345",
"num_points": 500,
"frames_per_trigger": 1,
"filepath": "test.h5",
"scan_id": "123",
"mokev": 12.4,
},
{"state": "READY"},
{"writer_user_id": 12543},
5,
False,
False,
),
(
{
"eacc": "e12345",
"num_points": 500,
"frames_per_trigger": 1,
"filepath": "test.h5",
"scan_id": "123",
"mokev": 12.4,
},
{"state": "BUSY"},
{"writer_user_id": 15421},
5,
False,
False,
),
(
{
"eacc": "e12345",
"num_points": 500,
"frames_per_trigger": 1,
"filepath": "test.h5",
"scan_id": "123",
"mokev": 18.4,
},
{"state": "READY"},
{"writer_user_id": 12345},
4,
False,
True,
),
],
)
def test_stage(
mock_det, scaninfo, daq_status, daq_cfg, detector_state, stopped, expected_exception
):
with (
mock.patch.object(mock_det.custom_prepare, "std_client") as mock_std_daq,
mock.patch.object(
mock_det.custom_prepare, "publish_file_location"
) as mock_publish_file_location,
):
mock_std_daq.stop_writer.return_value = None
mock_std_daq.get_status.return_value = daq_status
mock_std_daq.get_config.return_value = daq_cfg
mock_det.scaninfo.num_points = scaninfo["num_points"]
mock_det.scaninfo.frames_per_trigger = scaninfo["frames_per_trigger"]
mock_det.filewriter.compile_full_filename.return_value = scaninfo["filepath"]
# TODO consider putting energy as variable in scaninfo
mock_det.device_manager.add_device("mokev", value=12.4)
mock_det.cam.beam_energy.put(scaninfo["mokev"])
mock_det.stopped = stopped
mock_det.cam.detector_state._read_pv.mock_data = detector_state
with mock.patch.object(mock_det.custom_prepare, "prepare_data_backend") as mock_prep_fw:
mock_det.filepath.set(scaninfo["filepath"]).wait()
if expected_exception:
with pytest.raises(Exception):
mock_det.timeout = 0.1
mock_det.stage()
else:
mock_det.stage()
mock_prep_fw.assert_called_once()
# Check _prep_det
assert mock_det.cam.num_images.get() == int(
scaninfo["num_points"] * scaninfo["frames_per_trigger"]
)
assert mock_det.cam.num_frames.get() == 1
mock_publish_file_location.assert_called_with(done=False, successful=False)
assert mock_det.cam.acquire.get() == 1
@pytest.mark.parametrize(
"scaninfo, daq_status, expected_exception",
[
(
{
"eacc": "e12345",
"num_points": 500,
"frames_per_trigger": 1,
"filepath": "test.h5",
"scan_id": "123",
},
{"state": "BUSY", "acquisition": {"state": "WAITING_IMAGES"}},
False,
),
(
{
"eacc": "e12345",
"num_points": 500,
"frames_per_trigger": 1,
"filepath": "test.h5",
"scan_id": "123",
},
{"state": "BUSY", "acquisition": {"state": "WAITING_IMAGES"}},
False,
),
(
{
"eacc": "e12345",
"num_points": 500,
"frames_per_trigger": 1,
"filepath": "test.h5",
"scan_id": "123",
},
{"state": "BUSY", "acquisition": {"state": "ERROR"}},
True,
),
],
)
def test_prepare_detector_backend(mock_det, scaninfo, daq_status, expected_exception):
with (
mock.patch.object(mock_det.custom_prepare, "std_client") as mock_std_daq,
mock.patch.object(mock_det.custom_prepare, "filepath_exists") as mock_file_path_exists,
mock.patch.object(mock_det.custom_prepare, "stop_detector_backend") as mock_stop_backend,
mock.patch.object(mock_det, "scaninfo"),
):
mock_std_daq.start_writer_async.return_value = None
mock_std_daq.get_status.return_value = daq_status
mock_det.filewriter.compile_full_filename.return_value = scaninfo["filepath"]
mock_det.scaninfo.num_points = scaninfo["num_points"]
mock_det.scaninfo.frames_per_trigger = scaninfo["frames_per_trigger"]
if expected_exception:
with pytest.raises(Exception):
mock_det.timeout = 0.1
mock_det.custom_prepare.prepare_data_backend()
mock_file_path_exists.assert_called_once()
assert mock_stop_backend.call_count == 2
else:
mock_det.custom_prepare.prepare_data_backend()
mock_file_path_exists.assert_called_once()
mock_stop_backend.assert_called_once()
daq_writer_call = {
"output_file": scaninfo["filepath"],
"n_images": int(scaninfo["num_points"] * scaninfo["frames_per_trigger"]),
}
mock_std_daq.start_writer_async.assert_called_with(daq_writer_call)
@pytest.mark.parametrize("stopped, expected_exception", [(False, False), (True, True)])
def test_complete(mock_det, stopped, expected_exception):
with (
mock.patch.object(mock_det.custom_prepare, "finished") as mock_finished,
mock.patch.object(
mock_det.custom_prepare, "publish_file_location"
) as mock_publish_file_location,
):
mock_det.stopped = stopped
if expected_exception:
mock_det.complete()
assert mock_det.stopped is True
else:
mock_det.complete()
mock_finished.assert_called_once()
mock_publish_file_location.assert_called_with(done=True, successful=True)
assert mock_det.stopped is False
def test_stop_detector_backend(mock_det):
with mock.patch.object(mock_det.custom_prepare, "std_client") as mock_std_daq:
mock_std_daq.stop_writer.return_value = None
mock_det.std_client = mock_std_daq
mock_det.custom_prepare.stop_detector_backend()
mock_std_daq.stop_writer.assert_called_once()
@pytest.mark.parametrize(
"scaninfo",
[
({"filepath": "test.h5", "successful": True, "done": False, "scan_id": "123"}),
({"filepath": "test.h5", "successful": False, "done": True, "scan_id": "123"}),
],
)
def test_publish_file_location(mock_det, scaninfo):
mock_det.scaninfo.scan_id = scaninfo["scan_id"]
mock_det.filepath.set(scaninfo["filepath"]).wait()
mock_det.custom_prepare.publish_file_location(
done=scaninfo["done"], successful=scaninfo["successful"]
)
if scaninfo["successful"] is None:
msg = messages.FileMessage(file_path=scaninfo["filepath"], done=scaninfo["done"])
else:
msg = messages.FileMessage(
file_path=scaninfo["filepath"], done=scaninfo["done"], successful=scaninfo["successful"]
)
expected_calls = [
mock.call(
MessageEndpoints.public_file(scaninfo["scan_id"], mock_det.name),
msg,
pipe=mock_det.connector.pipeline.return_value,
),
mock.call(
MessageEndpoints.file_event(mock_det.name),
msg,
pipe=mock_det.connector.pipeline.return_value,
),
]
assert mock_det.connector.set_and_publish.call_args_list == expected_calls
def test_stop(mock_det):
with (
mock.patch.object(mock_det.custom_prepare, "stop_detector") as mock_stop_det,
mock.patch.object(
mock_det.custom_prepare, "stop_detector_backend"
) as mock_stop_detector_backend,
):
mock_det.stop()
mock_stop_det.assert_called_once()
mock_stop_detector_backend.assert_called_once()
assert mock_det.stopped is True
@pytest.mark.parametrize(
"stopped, scaninfo, cam_state, daq_status, expected_exception",
[
(
False,
{"num_points": 500, "frames_per_trigger": 4},
0,
{"acquisition": {"state": "FINISHED", "stats": {"n_write_completed": 2000}}},
False,
),
(
False,
{"num_points": 500, "frames_per_trigger": 4},
0,
{"acquisition": {"state": "FINISHED", "stats": {"n_write_completed": 1999}}},
True,
),
(
False,
{"num_points": 500, "frames_per_trigger": 1},
1,
{"acquisition": {"state": "READY", "stats": {"n_write_completed": 500}}},
True,
),
(
False,
{"num_points": 500, "frames_per_trigger": 1},
0,
{"acquisition": {"state": "FINISHED", "stats": {"n_write_completed": 500}}},
False,
),
],
)
def test_finished(mock_det, stopped, cam_state, daq_status, scaninfo, expected_exception):
with (
mock.patch.object(mock_det.custom_prepare, "std_client") as mock_std_daq,
mock.patch.object(mock_det.custom_prepare, "stop_detector_backend") as mock_stop_backend,
mock.patch.object(mock_det.custom_prepare, "stop_detector") as mock_stop_det,
):
mock_std_daq.get_status.return_value = daq_status
mock_det.cam.acquire._read_pv.mock_state = cam_state
mock_det.scaninfo.num_points = scaninfo["num_points"]
mock_det.scaninfo.frames_per_trigger = scaninfo["frames_per_trigger"]
if expected_exception:
with pytest.raises(Exception):
mock_det.timeout = 0.1
mock_det.custom_prepare.finished()
assert mock_det.stopped is stopped
else:
mock_det.custom_prepare.finished()
if stopped:
assert mock_det.stopped is stopped
mock_stop_backend.assert_called()
mock_stop_det.assert_called_once()

View File

@@ -96,11 +96,9 @@ def test_mcs_card_csaxs_on_connected(mock_mcs_csaxs):
assert mcs.read_mode.get() == READMODE.PASSIVE
assert mcs.acquire_mode.get() == ACQUIREMODE.MCS
with mock.patch.object(mcs.current_channel, "subscribe") as mock_cur_ch_subscribe:
with mock.patch.object(mcs.counters.mca1, "subscribe") as mock_mca_subscribe:
mcs.on_connected()
assert mock_cur_ch_subscribe.call_args == mock.call(mcs._progress_update, run=False)
assert mock_mca_subscribe.call_args == mock.call(mcs._on_counter_update, run=False)
with mock.patch.object(mcs.counters.mca1, "subscribe") as mock_mca_subscribe:
mcs.on_connected()
assert mock_mca_subscribe.call_args == mock.call(mcs._on_counter_update, run=False)
def test_mcs_card_csaxs_stage(mock_mcs_csaxs):