saving of images tested
CI for csaxs_bec / test (push) Successful in 1m31s
CI for csaxs_bec / test (pull_request) Successful in 1m31s

This commit is contained in:
x12sa
2026-07-01 11:19:51 +02:00
committed by wakonig_k
co-authored by wakonig_k
parent 46a1909296
commit 0fd5c26332
2 changed files with 23 additions and 10 deletions
@@ -9,6 +9,7 @@ import h5py
import numpy as np
from bec_lib import bec_logger
from bec_lib.alarm_handler import AlarmBase
from bec_lib.endpoints import MessageEndpoints
from bec_lib.pdf_writer import PDFWriter
from bec_lib.scan_repeat import scan_repeat
from typeguard import typechecked
@@ -613,6 +614,8 @@ class FlomniSampleTransferMixin:
print("The unmount process started.")
self.transfer_step = 0
time.sleep(1)
while True:
in_progress = bool(
@@ -620,7 +623,7 @@ class FlomniSampleTransferMixin:
)
if not in_progress:
break
self.ftransfer_confirm()
self.ftransfer_confirm(step_name="get")
time.sleep(1)
self.ftransfer_controller_disable_mount_mode()
self.ensure_gripper_up()
@@ -663,13 +666,15 @@ class FlomniSampleTransferMixin:
print("The mount process started.")
time.sleep(1)
self.transfer_step = 0
while True:
in_progress = bool(
float(dev.ftransy.controller.socket_put_and_receive("MG mntprgs").strip())
)
if not in_progress:
break
self.ftransfer_confirm()
self.ftransfer_confirm(step_name="put")
time.sleep(1)
self.ftransfer_controller_disable_mount_mode()
self.ensure_gripper_up()
@@ -797,26 +802,32 @@ class FlomniSampleTransferMixin:
)
return in_mount_mode
def ftransfer_confirm(self):
def ftransfer_confirm(self, step_name: str = ""):
confirm = int(float(dev.ftransy.controller.socket_put_and_receive("MG confirm").strip()))
if confirm != -1:
return
self.transfer_step += 1
if self.OMNYTools.yesno("All OK? Continue?", "y"):
print("OK. continue.")
reference_image = dev.cam_flomni_gripper.image.get()
self.save_reference_image(reference_image)
data = self.client.connector.get_last(
MessageEndpoints.device_preview("cam_flomni_gripper", "preview")
)["data"].data
self.save_reference_image(data, file_suffix=f"{step_name}_{self.transfer_step}")
dev.ftransy.controller.socket_put_confirmed("confirm=1")
else:
print("Stopping.")
raise FlomniError("User abort sample transfer.")
def save_reference_image(self, image):
def save_reference_image(self, image: np.ndarray, file_suffix: str = ""):
suffix = f"_{file_suffix}_" or "_"
# Save the reference image to a file
timestamp = time.strftime("%Y%m%d_%H%M%S")
file = os.path.expanduser(f"~/data/raw/logs/sample_transfer_reference_image_{timestamp}.h5")
file = os.path.expanduser(
f"~/data/raw/logs/sample_transfer_images/sample_transfer_reference_image{suffix}{timestamp}.h5"
)
os.makedirs(os.path.dirname(file), exist_ok=True)
with h5py.File(file, "w") as f:
f.create_dataset("reference_image", data=image)
@@ -78,6 +78,7 @@ class XrayEyeAlign:
# self.alignment_images.append(msg["data"].data)
def _save_alignment_data(self, file_path: str):
os.makedirs(os.path.dirname(file_path), exist_ok=True)
with h5py.File(os.path.expanduser(file_path), "w") as f:
f.create_dataset(
"alignment_values", data=np.array(list(self.alignment_values.values()))
@@ -96,6 +97,8 @@ class XrayEyeAlign:
dev.fsh.fshopen()
time.sleep(1)
# store the image
self.alignment_images.append(dev.cam_xeye.get_last_image())
# stop live view
if not keep_shutter_open:
self.gui.on_live_view_enabled(False)
@@ -220,7 +223,6 @@ class XrayEyeAlign:
print(f"Current rtx position {rtx_position}")
self.alignment_values[k] -= rtx_position
print(f"Corrected position {k}: x {self.alignment_values[k]}")
self.alignment_images.append(dev.cam_xeye.image.get())
# reset submit channel
dev.omny_xray_gui.submit.set(0)
@@ -377,7 +379,7 @@ class XrayEyeAlign:
)
def write_output(self):
file = os.path.expanduser("~/data/raw/logs/xrayeye_alignmentvalues")
file = os.path.expanduser("~/data/raw/logs/xrayeye_alignmentvalues/xrayeye_alignmentvalues")
timestamp = time.strftime("%Y%m%d_%H%M%S")
self._save_alignment_data(file + f"_image_data_{timestamp}.h5")
if not os.path.exists(file):