feat(flomni): add functionality to save reference and alignment images as HDF5 files #238

Merged
wakonig_k merged 4 commits from feature/save_ref_images into main 2026-07-01 11:22:56 +02:00
14 changed files with 113 additions and 153 deletions
@@ -5,9 +5,11 @@ import subprocess
import time
from pathlib import Path
import h5py
import numpy as np
from bec_lib import bec_logger
from bec_lib.alarm_handler import AlarmBase
from bec_lib.endpoints import MessageEndpoints
from bec_lib.pdf_writer import PDFWriter
from bec_lib.scan_repeat import scan_repeat
from typeguard import typechecked
@@ -612,6 +614,8 @@ class FlomniSampleTransferMixin:
print("The unmount process started.")
self.transfer_step = 0
time.sleep(1)
while True:
in_progress = bool(
@@ -619,7 +623,7 @@ class FlomniSampleTransferMixin:
)
if not in_progress:
break
self.ftransfer_confirm()
self.ftransfer_confirm(step_name="get")
time.sleep(1)
self.ftransfer_controller_disable_mount_mode()
self.ensure_gripper_up()
@@ -662,13 +666,15 @@ class FlomniSampleTransferMixin:
print("The mount process started.")
time.sleep(1)
self.transfer_step = 0
while True:
in_progress = bool(
float(dev.ftransy.controller.socket_put_and_receive("MG mntprgs").strip())
)
if not in_progress:
break
self.ftransfer_confirm()
self.ftransfer_confirm(step_name="put")
time.sleep(1)
self.ftransfer_controller_disable_mount_mode()
self.ensure_gripper_up()
@@ -796,19 +802,35 @@ class FlomniSampleTransferMixin:
)
return in_mount_mode
def ftransfer_confirm(self):
def ftransfer_confirm(self, step_name: str = ""):
confirm = int(float(dev.ftransy.controller.socket_put_and_receive("MG confirm").strip()))
if confirm != -1:
return
self.transfer_step += 1
if self.OMNYTools.yesno("All OK? Continue?", "y"):
print("OK. continue.")
data = self.client.connector.get_last(
MessageEndpoints.device_preview("cam_flomni_gripper", "preview")
)["data"].data
self.save_reference_image(data, file_suffix=f"{step_name}_{self.transfer_step}")
dev.ftransy.controller.socket_put_confirmed("confirm=1")
else:
print("Stopping.")
raise FlomniError("User abort sample transfer.")
def save_reference_image(self, image: np.ndarray, file_suffix: str = ""):
suffix = f"_{file_suffix}_" or "_"
# Save the reference image to a file
timestamp = time.strftime("%Y%m%d_%H%M%S")
file = os.path.expanduser(
f"~/data/raw/logs/sample_transfer_images/sample_transfer_reference_image{suffix}{timestamp}.h5"
)
os.makedirs(os.path.dirname(file), exist_ok=True)
with h5py.File(file, "w") as f:
f.create_dataset("reference_image", data=image)
def ftransfer_gripper_is_open(self) -> bool:
status = bool(float(dev.ftransy.controller.socket_put_and_receive("MG @OUT[9]").strip()))
return status
@@ -2599,13 +2621,10 @@ class Flomni(
if self.tomo_type == 1:
print("\x1b[1mTomo type 1:\x1b[0m 8 equally spaced sub-tomograms")
print(f"Angular range = {self.tomo_angle_range} degrees")
# N, step, total_projections all come from the same helper
# sub_tomo_scan() effectively uses internally - see
# _tomo_type1_actual_grid() for why this can't just read
# self.tomo_angle_stepsize directly.
_, achievable_step, total_projections = self._tomo_type1_actual_grid()
print(f"Total number of projections: {total_projections}")
print(f"Angular step within sub-tomogram: {achievable_step:.3f} degrees")
print(
f"Total number of projections: {(self.tomo_angle_range/self.tomo_angle_stepsize)*8}"
)
print(f"Angular step within sub-tomogram: {self.tomo_angle_stepsize} degrees")
print(
"Angular step of the final (combined) tomogram:"
f" {self.tomo_angle_range / total_projections:.3f} degrees"
@@ -5,9 +5,10 @@ import os
import time
from typing import TYPE_CHECKING
import h5py
import numpy as np
from bec_lib import bec_logger
from bec_lib.endpoints import MessageEndpoints
# from csaxs_bec.bec_ipython_client.plugins.cSAXS import epics_get, epics_put, fshopen, fshclose
@@ -42,6 +43,7 @@ class XrayEyeAlign:
self.device_manager = client.device_manager
self.scans = client.scans
self.alignment_values = {}
self.alignment_images = []
# Deliberately NOT calling self.flomni.reset_correction() /
# reset_tomo_alignment_fit() here: XrayEyeAlign is constructed every
# time Flomni() is instantiated (i.e. every new client session, not
@@ -69,6 +71,14 @@ class XrayEyeAlign:
# fit point instead of triggering another height correction.
self._height_centered = False
def _save_alignment_data(self, file_path: str):
os.makedirs(os.path.dirname(file_path), exist_ok=True)
with h5py.File(os.path.expanduser(file_path), "w") as f:
f.create_dataset(
"alignment_values", data=np.array(list(self.alignment_values.values()))
)
f.create_dataset("alignment_images", data=np.array(self.alignment_images))
def update_frame(self, keep_shutter_open=False):
if self.flomni._flomnigui_check_attribute_not_exists("xeyegui"):
self.flomni.flomnigui_show_xeyealign()
@@ -81,6 +91,8 @@ class XrayEyeAlign:
dev.fsh.fshopen()
time.sleep(1)
# store the image
self.alignment_images.append(dev.cam_xeye.get_last_image())
# stop live view
if not keep_shutter_open:
self.gui.on_live_view_enabled(False)
@@ -135,6 +147,7 @@ class XrayEyeAlign:
"This routine can be called with paramter keep_shutter_open=True to keep the shutter always open"
)
self.send_message("Getting things ready. Please wait...")
self.alignment_images = []
self.gui.enable_submit_button(False)
@@ -355,10 +368,14 @@ class XrayEyeAlign:
umv(dev.rtx, 0)
print("You are ready to remove the xray eye and start ptychography scans.")
print("Fine alignment: flomni.tomo_parameters() , then flomni.tomo_alignment_scan()")
print("After that, run the fit in Matlab and load the new fit flomni.read_alignment_offset()")
print(
"After that, run the fit in Matlab and load the new fit flomni.read_alignment_offset()"
)
def write_output(self):
file = os.path.expanduser("~/data/raw/logs/xrayeye_alignmentvalues")
file = os.path.expanduser("~/data/raw/logs/xrayeye_alignmentvalues/xrayeye_alignmentvalues")
timestamp = time.strftime("%Y%m%d_%H%M%S")
self._save_alignment_data(file + f"_image_data_{timestamp}.h5")
if not os.path.exists(file):
os.makedirs(os.path.dirname(file), exist_ok=True)
+2 -2
View File
@@ -7,7 +7,7 @@ from ophyd import Component as Cpt
from ophyd import Device, PositionerBase, Signal
from ophyd.status import wait as status_wait
from ophyd.utils import LimitError
from ophyd_devices.utils.controller import threadlocked
from ophyd_devices.utils.controller import retry_once, threadlocked
from ophyd_devices.utils.socket import SocketIO, raise_if_disconnected
from csaxs_bec.devices.omny.galil.galil_ophyd import (
@@ -19,7 +19,6 @@ from csaxs_bec.devices.omny.galil.galil_ophyd import (
GalilMotorResolution,
GalilSetpointSignal,
GalilSignalRO,
retry_once,
)
logger = bec_logger.logger
@@ -41,6 +40,7 @@ class FlomniGalilController(GalilController):
"all_axes_referenced",
"lights_off",
"lights_on",
"print_command_history",
]
def is_axis_moving(self, axis_Id, axis_Id_numeric) -> bool:
+1 -2
View File
@@ -7,7 +7,7 @@ from ophyd import Component as Cpt
from ophyd import Device, PositionerBase, Signal
from ophyd.status import wait as status_wait
from ophyd.utils import LimitError
from ophyd_devices.utils.controller import threadlocked
from ophyd_devices.utils.controller import retry_once, threadlocked
from ophyd_devices.utils.socket import SocketIO, raise_if_disconnected
from csaxs_bec.devices.omny.galil.galil_ophyd import (
@@ -19,7 +19,6 @@ from csaxs_bec.devices.omny.galil.galil_ophyd import (
GalilMotorResolution,
GalilSetpointSignal,
GalilSignalRO,
retry_once,
)
logger = bec_logger.logger
+3 -15
View File
@@ -8,7 +8,7 @@ from typing import Any
from bec_lib import bec_logger
from ophyd.utils import ReadOnlyError
from ophyd_devices.utils.controller import Controller, threadlocked
from ophyd_devices.utils.controller import Controller, retry_once, threadlocked
from ophyd_devices.utils.socket import SocketSignal
from prettytable import PrettyTable
@@ -27,20 +27,6 @@ class BECConfigError(Exception):
pass
def retry_once(fcn):
"""Decorator to rerun a function in case a Galil communication error was raised. This may happen if the buffer was not empty."""
@functools.wraps(fcn)
def wrapper(self, *args, **kwargs):
try:
val = fcn(self, *args, **kwargs)
except (GalilCommunicationError, GalilError):
val = fcn(self, *args, **kwargs)
return val
return wrapper
class GalilController(Controller):
"""
Base class for Galil controllers. This class provides the basic functionality for Galil controllers and should be subclassed for specific devices.
@@ -59,6 +45,7 @@ class GalilController(Controller):
"is_motor_on",
"is_thread_active",
"all_axes_referenced",
"print_command_history",
]
OKBLUE = "\033[94m"
@@ -70,6 +57,7 @@ class GalilController(Controller):
@threadlocked
def socket_put(self, val: str) -> None:
self.command_history.append(f"[PUT]: {val}")
self.sock.put(f"{val}\r".encode())
@retry_once
+3 -6
View File
@@ -21,14 +21,10 @@ from ophyd import DynamicDeviceComponent as DDC
from ophyd import Kind
from ophyd.utils import ReadOnlyError
from ophyd_devices import PSIDeviceBase
from ophyd_devices.utils.controller import Controller, threadlocked
from ophyd_devices.utils.controller import Controller, retry_once, threadlocked
from ophyd_devices.utils.socket import SocketIO
from csaxs_bec.devices.omny.galil.galil_ophyd import (
GalilCommunicationError,
GalilSignalBase,
retry_once,
)
from csaxs_bec.devices.omny.galil.galil_ophyd import GalilCommunicationError, GalilSignalBase
if TYPE_CHECKING: # pragma: no cover
from bec_lib.devicemanager import ScanInfo
@@ -43,6 +39,7 @@ class GalilRIOController(Controller):
@threadlocked
def socket_put(self, val: str) -> None:
"""Socker put method."""
self.command_history.append(f"[PUT]: {val}")
self.sock.put(f"{val}\r".encode())
@retry_once
+19 -13
View File
@@ -7,7 +7,7 @@ from ophyd import Component as Cpt
from ophyd import Device, PositionerBase, Signal
from ophyd.status import wait as status_wait
from ophyd.utils import LimitError
from ophyd_devices.utils.controller import threadlocked
from ophyd_devices.utils.controller import retry_once, threadlocked
from ophyd_devices.utils.socket import SocketIO, raise_if_disconnected
from csaxs_bec.devices.omny.galil.galil_ophyd import (
@@ -17,7 +17,6 @@ from csaxs_bec.devices.omny.galil.galil_ophyd import (
GalilMotorIsMoving,
GalilSetpointSignal,
GalilSignalRO,
retry_once,
)
logger = bec_logger.logger
@@ -29,15 +28,19 @@ class GalilMotorResolution(GalilSignalRO):
def _socket_get(self):
if self.parent.axis_Id_numeric < 6:
return float(
self.controller.socket_put_and_receive(f"MG encpermm[{self.parent.axis_Id_numeric}]")
self.controller.socket_put_and_receive(
f"MG encpermm[{self.parent.axis_Id_numeric}]"
)
)
else:
return float(
self.controller.socket_put_and_receive(f"MG stppermm[{self.parent.axis_Id_numeric}]")
self.controller.socket_put_and_receive(
f"MG stppermm[{self.parent.axis_Id_numeric}]"
)
)
class LamniGalilController(GalilController):
class LamniGalilController(GalilController):
# ============================================================
# Error status
@@ -51,7 +54,7 @@ class LamniGalilController(GalilController):
0x10: "Cap1 exceeded allowed left-stop boundary during movement",
0x20: "Cap2 exceeded allowed left-stop boundary during movement (disabled in code)",
0x40: "Cap1 did not respond to test movement",
0x80: "Cap2 did not respond to test movement"
0x80: "Cap2 did not respond to test movement",
}
allaxrer_table = {
@@ -61,10 +64,9 @@ class LamniGalilController(GalilController):
4: "Pressure valve mismatch (OUT13=0 but IN13=1)",
5: "Capacitive sensor boundary violations (caperr > 0)",
6: "Emergency Stop triggered (IN[5]=0)",
7: "Following error detected on one or more axes"
7: "Following error detected on one or more axes",
}
USER_ACCESS = [
"describe",
"show_running_threads",
@@ -78,7 +80,8 @@ class LamniGalilController(GalilController):
"is_motor_on",
"all_axes_referenced",
"lamni_lights_off",
"lamni_lights_on"
"lamni_lights_on",
"print_command_history",
]
def show_status_other(self):
@@ -107,7 +110,9 @@ class LamniGalilController(GalilController):
if allaxref == 1:
print(f"Allaxref = 1, all OK.")
else:
print(f"Allaxref = {allaxref}. Not all axes are referenced or error introduced preventing motion.")
print(
f"Allaxref = {allaxref}. Not all axes are referenced or error introduced preventing motion."
)
allaxrer = int(float(self.socket_put_and_receive("MGallaxrer")))
print("\nallaxrer =", allaxrer)
print(self.decode_allaxrer(allaxrer))
@@ -143,7 +148,6 @@ class LamniGalilController(GalilController):
print("\n==================================\n")
def lamni_lights_off(self):
self.socket_put_confirmed("SB1")
@@ -170,14 +174,16 @@ class LamniGalilReadbackSignal(GalilSignalRO):
current_pos = float(self.controller.socket_put_and_receive(f"TP{self.parent.axis_Id}"))
current_pos *= self.parent.sign
encoder_resolution = self.parent.motor_resolution.get()
logger.info(f"Read galil encoder position of axis {self.parent.axis_Id_numeric} to be TP {current_pos} with resolution {encoder_resolution}")
logger.info(
f"Read galil encoder position of axis {self.parent.axis_Id_numeric} to be TP {current_pos} with resolution {encoder_resolution}"
)
return current_pos / encoder_resolution
else:
current_pos = float(self.controller.socket_put_and_receive(f"TD{self.parent.axis_Id}"))
current_pos *= self.parent.sign
step_mm = self.parent.motor_resolution.get()
return current_pos / step_mm
def read(self):
self._metadata["timestamp"] = time.time()
val = super().read()
+2 -17
View File
@@ -10,15 +10,13 @@ from ophyd import Component as Cpt
from ophyd import Device, PositionerBase, Signal
from ophyd.status import wait as status_wait
from ophyd.utils import LimitError
from ophyd_devices.utils.controller import threadlocked
from ophyd_devices.utils.controller import retry_once, threadlocked
from ophyd_devices.utils.socket import SocketIO, raise_if_disconnected
from csaxs_bec.devices.omny.galil.galil_ophyd import (
BECConfigError,
GalilAxesReferenced,
GalilCommunicationError,
GalilController,
GalilError,
GalilMotorIsMoving,
GalilSetpointSignal,
GalilSignalRO,
@@ -27,20 +25,6 @@ from csaxs_bec.devices.omny.galil.galil_ophyd import (
logger = bec_logger.logger
def retry_once(fcn):
"""Decorator to rerun a function in case a Galil communication error was raised. This may happen if the buffer was not empty."""
@functools.wraps(fcn)
def wrapper(self, *args, **kwargs):
try:
val = fcn(self, *args, **kwargs)
except (GalilCommunicationError, GalilError):
val = fcn(self, *args, **kwargs)
return val
return wrapper
class GalilMotorResolution(GalilSignalRO):
@retry_once
@threadlocked
@@ -130,6 +114,7 @@ class OMNYGalilController(GalilController):
"get_motor_limit_switch",
"is_motor_on",
"all_axes_referenced",
"print_command_history",
"_ogalil_switchsocket",
"_ogalil_switchsocket_switch_all_on",
"_ogalil_switchsocket_status",
+7 -17
View File
@@ -8,7 +8,7 @@ from ophyd import Component as Cpt
from ophyd import Device, DeviceStatus, PositionerBase, Signal
from ophyd.status import wait as status_wait
from ophyd.utils import LimitError, ReadOnlyError
from ophyd_devices.utils.controller import Controller, threadlocked
from ophyd_devices.utils.controller import Controller, retry_once, threadlocked
from ophyd_devices.utils.socket import SocketIO, SocketSignal, raise_if_disconnected
from prettytable import PrettyTable
@@ -27,20 +27,6 @@ class BECConfigError(Exception):
pass
def retry_once(fcn):
"""Decorator to rerun a function in case a Galil communication error was raised. This may happen if the buffer was not empty."""
@functools.wraps(fcn)
def wrapper(self, *args, **kwargs):
try:
val = fcn(self, *args, **kwargs)
except (GalilCommunicationError, GalilError):
val = fcn(self, *args, **kwargs)
return val
return wrapper
class GalilController(Controller):
USER_ACCESS = [
"describe",
@@ -51,6 +37,7 @@ class GalilController(Controller):
"sgalil_reference",
"fly_grid_scan",
"read_encoder_position",
"print_command_history",
]
_axes_per_controller = 8
@@ -84,19 +71,22 @@ class GalilController(Controller):
@threadlocked
def socket_put(self, val: str) -> None:
time.sleep(0.01)
self.command_history.append(f"[PUT]: {val}")
self.sock.put(f"{val}\r".encode())
@threadlocked
def socket_get(self) -> str:
time.sleep(0.01)
return self.sock.receive().decode()
response = self.sock.receive().decode()
self.command_history.append(f"[GET]: {response}")
return response
@retry_once
@threadlocked
def socket_put_and_receive(self, val: str, remove_trailing_chars=True) -> str:
self.socket_put(val)
if remove_trailing_chars:
return self._remove_trailing_characters(self.sock.receive().decode())
return self._remove_trailing_characters(self.socket_get())
return self.socket_get()
@retry_once
+1 -2
View File
@@ -8,7 +8,7 @@ from ophyd import Device, PositionerBase, Signal
from ophyd.status import wait as status_wait
from ophyd.utils import LimitError
from ophyd_devices import AsyncMultiSignal, DeviceStatus, ProgressSignal
from ophyd_devices.utils.controller import Controller, threadlocked
from ophyd_devices.utils.controller import Controller, retry_once, threadlocked
from ophyd_devices.utils.socket import SocketIO, raise_if_disconnected
from prettytable import PrettyTable
@@ -18,7 +18,6 @@ from csaxs_bec.devices.omny.rt.rt_ophyd import (
RtReadbackSignal,
RtSetpointSignal,
RtSignalRO,
retry_once,
)
logger = bec_logger.logger
+19 -32
View File
@@ -9,7 +9,7 @@ from ophyd import Device, DeviceStatus, PositionerBase, Signal
from ophyd.status import wait as status_wait
from ophyd.utils import LimitError, ReadOnlyError
from ophyd_devices.utils.bec_signals import AsyncMultiSignal, ProgressSignal
from ophyd_devices.utils.controller import Controller, threadlocked
from ophyd_devices.utils.controller import Controller, retry_once, threadlocked
from ophyd_devices.utils.socket import SocketIO, SocketSignal, raise_if_disconnected
from prettytable import PrettyTable
@@ -30,20 +30,6 @@ class BECConfigError(Exception):
pass
def retry_once(fcn):
"""Decorator to rerun a function in case a CommunicationError was raised. This may happen if the buffer was not empty."""
@functools.wraps(fcn)
def wrapper(self, *args, **kwargs):
try:
val = fcn(self, *args, **kwargs)
except (RtLamniCommunicationError, RtLamniError):
val = fcn(self, *args, **kwargs)
return val
return wrapper
class RtLamniController(Controller):
"""
RT-Lamni controller class for all rt devices.
@@ -133,7 +119,7 @@ class RtLamniController(Controller):
@threadlocked
def _position_sampling_single_read(self):
(number_of_samples, sum0, sum0_2, sum1, sum1_2, sum2, sum2_2) = self.socket_put_and_receive(
number_of_samples, sum0, sum0_2, sum1, sum1_2, sum2, sum2_2 = self.socket_put_and_receive(
f"Sr"
).split(",")
avg_x = float(sum1) / int(number_of_samples)
@@ -187,7 +173,7 @@ class RtLamniController(Controller):
parent._min_scan_buffer_reached = False
for pos_index, pos in enumerate(positions):
cmd = f"s{pos[0]:.05f},{pos[1]:.05f},0"
parent.socket_put_and_receive(cmd)
parent.socket_put_and_receive(cmd)
if pos_index > 100:
parent._min_scan_buffer_reached = True
parent._min_scan_buffer_reached = True
@@ -224,7 +210,7 @@ class RtLamniController(Controller):
"Cannot start scan because feedback loop is not running or there is an interferometer error."
)
# here exception
(mode, number_of_positions_planned, current_position_in_scan) = self.get_scan_status()
mode, number_of_positions_planned, current_position_in_scan = self.get_scan_status()
if number_of_positions_planned == 0:
logger.error("Cannot start scan because no target positions are planned.")
@@ -241,24 +227,25 @@ class RtLamniController(Controller):
if self.feedback_is_running():
print("Loop is running, no error on interferometer.")
else:
print("Loop is not running, either it is turned off or an interferometer error occurred.")
print(
"Loop is not running, either it is turned off or an interferometer error occurred."
)
def show_analog_signals(self) -> dict:
self.socket_put("As") # start sampling
time.sleep(0.01)
return_table = (self.socket_put_and_receive("Ar")).split(",")
number_of_samples = int(float(return_table[0]))
signals = {
"number_of_samples": number_of_samples,
"piezo_0": float(return_table[1]),
"piezo_1": float(return_table[2]),
"cap_0": float(return_table[3]),
"cap_1": float(return_table[4]),
"cap_2": float(return_table[5]),
"cap_3": float(return_table[6]),
"cap_4": float(return_table[7]),
"number_of_samples": number_of_samples,
"piezo_0": float(return_table[1]),
"piezo_1": float(return_table[2]),
"cap_0": float(return_table[3]),
"cap_1": float(return_table[4]),
"cap_2": float(return_table[5]),
"cap_3": float(return_table[6]),
"cap_4": float(return_table[7]),
}
t = PrettyTable()
@@ -294,8 +281,8 @@ class RtLamniController(Controller):
t = PrettyTable()
t.title = "Interferometer signal strength"
t.field_names = ["Axis", "Description", "Value", "Running"]
t.add_row([0, "ST FZP horizontal", ssi_0, "-"])
t.add_row([1, "ST FZP vertical", ssi_1, "-"])
t.add_row([0, "ST FZP horizontal", ssi_0, "-"])
t.add_row([1, "ST FZP vertical", ssi_1, "-"])
t.add_row([2, "Angle interferometer", angle_signal, angle_running])
print(t)
@@ -318,7 +305,7 @@ class RtLamniController(Controller):
print(t)
print(f"Feedback loop running: {loop_status}")
return {"x": pos_x, "y": pos_y, "loop_running": loop_status}
return {"x": pos_x, "y": pos_y, "loop_running": loop_status}
def feedback_enable_with_reset(self):
if not self.feedback_status_angle_lamni():
+2 -3
View File
@@ -10,7 +10,7 @@ from ophyd import Device, DeviceStatus, PositionerBase, Signal
from ophyd.status import wait as status_wait
from ophyd.utils import LimitError
from ophyd_devices import AsyncMultiSignal, ProgressSignal
from ophyd_devices.utils.controller import Controller, threadlocked
from ophyd_devices.utils.controller import Controller, retry_once, threadlocked
from ophyd_devices.utils.socket import SocketIO, raise_if_disconnected
from prettytable import PrettyTable
@@ -27,7 +27,6 @@ from csaxs_bec.devices.omny.rt.rt_ophyd import (
RtReadbackSignal,
RtSetpointSignal,
RtSignalRO,
retry_once,
)
logger = bec_logger.logger
@@ -853,7 +852,7 @@ class RtOMNYController(Controller):
" interferometer error."
)
# here exception
(mode, number_of_positions_planned, current_position_in_scan) = self.get_scan_status()
mode, number_of_positions_planned, current_position_in_scan = self.get_scan_status()
if number_of_positions_planned == 0:
logger.error("Cannot start scan because no target positions are planned.")
+2 -16
View File
@@ -1,5 +1,5 @@
"""
This module contains base signals for RT devices. Controller and motors are implemented in the
This module contains base signals for RT devices. Controller and motors are implemented in the
bespoke modules such as `rt_flomni_ophyd.py` or `rt_lamni_ophyd.py`.
"""
@@ -8,7 +8,7 @@ import time
from bec_lib import bec_logger
from ophyd.utils import ReadOnlyError
from ophyd_devices.utils.controller import ControllerCommunicationError, threadlocked
from ophyd_devices.utils.controller import ControllerCommunicationError, retry_once, threadlocked
from ophyd_devices.utils.socket import SocketSignal
logger = bec_logger.logger
@@ -26,20 +26,6 @@ class BECConfigError(Exception):
pass
def retry_once(fcn):
"""Decorator to rerun a function in case a CommunicationError was raised. This may happen if the buffer was not empty."""
@functools.wraps(fcn)
def wrapper(self, *args, **kwargs):
try:
val = fcn(self, *args, **kwargs)
except (RtCommunicationError, RtError):
val = fcn(self, *args, **kwargs)
return val
return wrapper
class RtSignalBase(SocketSignal):
def __init__(self, signal_name, **kwargs):
self.signal_name = signal_name
@@ -6,7 +6,7 @@ import os
import time
import numpy as np
from ophyd_devices.utils.controller import Controller, axis_checked, threadlocked
from ophyd_devices.utils.controller import Controller, axis_checked, retry_once, threadlocked
from prettytable import PrettyTable
from typeguard import typechecked
@@ -20,20 +20,6 @@ class SmaractCommunicationMode(enum.Enum):
ASYNC = 1
def retry_once(fcn):
"""Decorator to rerun a function in case a SmaractCommunicationError was raised. This may happen if the buffer was not empty."""
@functools.wraps(fcn)
def wrapper(self, *args, **kwargs):
try:
val = fcn(self, *args, **kwargs)
except (SmaractCommunicationError, SmaractErrorCode):
val = fcn(self, *args, **kwargs)
return val
return wrapper
class SmaractChannelStatus(enum.Enum):
STOPPED = 0
STEPPING = 1
@@ -82,6 +68,7 @@ class SmaractController(Controller):
"all_axes_referenced",
"set_closed_loop_move_speed",
"is_axis_moving",
"print_command_history",
]
def __init__(
@@ -113,6 +100,7 @@ class SmaractController(Controller):
@threadlocked
def socket_put(self, val: str):
self.command_history.append(f"[PUT]: {val}")
self.sock.put(f":{val}\n".encode())
@threadlocked