Compare commits

...

15 Commits

Author SHA1 Message Date
bc31c00e1f fix(tests): x_ray_eye_align correct imports fixed after refactor of LamNI
All checks were successful
CI for csaxs_bec / test (pull_request) Successful in 1m36s
CI for csaxs_bec / test (push) Successful in 1m36s
2026-02-23 13:25:09 +01:00
x01dc
38671f074e minor printout fix
Some checks failed
CI for csaxs_bec / test (pull_request) Failing after 1m30s
CI for csaxs_bec / test (push) Failing after 1m32s
2026-02-23 12:44:04 +01:00
x01dc
92e39a5f75 minor adjmustment 2026-02-23 12:35:56 +01:00
x01dc
22c48115a4 final refactor step prior testing
Some checks failed
CI for csaxs_bec / test (pull_request) Failing after 1m33s
CI for csaxs_bec / test (push) Failing after 1m30s
2026-02-21 13:30:24 +01:00
2a7448526b new files for refactor
Some checks failed
CI for csaxs_bec / test (push) Failing after 36s
2026-02-21 13:02:35 +01:00
x01dc
a5825307e5 refactor remove previous files
Some checks failed
CI for csaxs_bec / test (push) Failing after 35s
2026-02-21 13:00:48 +01:00
x01dc
54f1f42332 added tomo type 2 and 3, for golden ratio
Some checks failed
CI for csaxs_bec / test (pull_request) Successful in 1m37s
CI for csaxs_bec / test (push) Has been cancelled
2026-02-21 11:52:32 +01:00
x01dc
48df15f35c added rt controller commands to lamni namespace
All checks were successful
CI for csaxs_bec / test (push) Successful in 1m38s
2026-02-21 11:38:34 +01:00
x01dc
6f60bd4b2b first commit, getting lamni running and adding some missing features in controller rt and galil
All checks were successful
CI for csaxs_bec / test (push) Successful in 1m45s
2026-02-20 14:39:22 +01:00
5d97913956 refactor(galil-rio): Improve docstring and general integration
All checks were successful
CI for csaxs_bec / test (pull_request) Successful in 1m31s
CI for csaxs_bec / test (push) Successful in 1m32s
2026-02-16 17:48:35 +01:00
93384b87e0 fix(readback-timeout): fix monkeypatch for readback timeout; closes #140
Some checks failed
CI for csaxs_bec / test (push) Failing after 1m29s
CI for csaxs_bec / test (pull_request) Failing after 1m29s
2026-02-16 17:38:10 +01:00
9a249363fd refactor(galil-rio): fix socket-signal cached readings 2026-02-16 17:38:10 +01:00
f925a7c1db fix(galilsignalbase): fix root access for controller from parent.
Some checks failed
CI for csaxs_bec / test (push) Failing after 1m29s
CI for csaxs_bec / test (pull_request) Failing after 1m33s
2026-02-16 14:57:02 +01:00
5811e445fe tests: fix tests after refactoring 2026-02-16 14:57:02 +01:00
16ea7f410e feat(galil-rio): Add di_out channels to GalilRIO 2026-02-16 14:57:02 +01:00
16 changed files with 2147 additions and 1642 deletions

View File

@@ -1,2 +1,6 @@
from .load_additional_correction import lamni_read_additional_correction
from .x_ray_eye_align import DataDrivenLamNI, LamNI, MagLamNI, XrayEyeAlign
from .alignment import XrayEyeAlign
from .lamni import LamNI
from .lamni_optics_mixin import LamNIInitError, LaMNIInitStages, LamNIOpticsMixin
__all__ = [
"LamNI", "XrayEyeAlign", "LamNIInitError", "LaMNIInitStages", "LamNIOpticsMixin"
]

View File

@@ -0,0 +1,461 @@
import builtins
import time
from collections import defaultdict
import numpy as np
from bec_lib import bec_logger
from typeguard import typechecked
from csaxs_bec.bec_ipython_client.plugins.cSAXS import epics_get, epics_put, fshopen
logger = bec_logger.logger
if builtins.__dict__.get("bec") is not None:
bec = builtins.__dict__.get("bec")
dev = builtins.__dict__.get("dev")
umv = builtins.__dict__.get("umv")
umvr = builtins.__dict__.get("umvr")
class XrayEyeAlign:
# pixel calibration, multiply to get mm
# PIXEL_CALIBRATION = 0.2/209 #.2 with binning
PIXEL_CALIBRATION = 0.2 / 218 # .2 with binning
def __init__(self, client, lamni) -> None:
self.client = client
self.lamni = lamni
self.device_manager = client.device_manager
self.scans = client.scans
self.alignment_values = defaultdict(list)
self._reset_init_values()
self.corr_pos_x = []
self.corr_pos_y = []
self.corr_angle = []
self.corr_pos_x_2 = []
self.corr_pos_y_2 = []
self.corr_angle_2 = []
# ------------------------------------------------------------------
# Correction reset
# ------------------------------------------------------------------
def reset_correction(self):
self.corr_pos_x = []
self.corr_pos_y = []
self.corr_angle = []
def reset_correction_2(self):
self.corr_pos_x_2 = []
self.corr_pos_y_2 = []
self.corr_angle_2 = []
def reset_xray_eye_correction(self):
self.client.delete_global_var("tomo_fit_xray_eye")
# ------------------------------------------------------------------
# FOV offset properties
# ------------------------------------------------------------------
@property
def tomo_fovx_offset(self):
val = self.client.get_global_var("tomo_fov_offset")
if val is None:
return 0.0
return val[0] / 1000
@tomo_fovx_offset.setter
@typechecked
def tomo_fovx_offset(self, val: float):
val_old = self.client.get_global_var("tomo_fov_offset")
if val_old is None:
val_old = [0.0, 0.0]
self.client.set_global_var("tomo_fov_offset", [val * 1000, val_old[1]])
@property
def tomo_fovy_offset(self):
val = self.client.get_global_var("tomo_fov_offset")
if val is None:
return 0.0
return val[1] / 1000
@tomo_fovy_offset.setter
@typechecked
def tomo_fovy_offset(self, val: float):
val_old = self.client.get_global_var("tomo_fov_offset")
if val_old is None:
val_old = [0.0, 0.0]
self.client.set_global_var("tomo_fov_offset", [val_old[0], val * 1000])
# ------------------------------------------------------------------
# Internal helpers
# ------------------------------------------------------------------
def _reset_init_values(self):
self.shift_xy = [0, 0]
self._xray_fov_xy = [0, 0]
def _disable_rt_feedback(self):
self.device_manager.devices.rtx.controller.feedback_disable()
def _enable_rt_feedback(self):
self.device_manager.devices.rtx.controller.feedback_enable_with_reset()
def tomo_rotate(self, val: float):
# pylint: disable=undefined-variable
umv(self.device_manager.devices.lsamrot, val)
def get_tomo_angle(self):
return self.device_manager.devices.lsamrot.readback.read()["lsamrot"]["value"]
# ------------------------------------------------------------------
# X-ray eye camera control
# ------------------------------------------------------------------
def save_frame(self):
epics_put("XOMNYI-XEYE-SAVFRAME:0", 1)
def update_frame(self):
epics_put("XOMNYI-XEYE-ACQDONE:0", 0)
# start live
epics_put("XOMNYI-XEYE-ACQ:0", 1)
# wait for start live
while epics_get("XOMNYI-XEYE-ACQDONE:0") == 0:
time.sleep(0.5)
print("waiting for live view to start...")
fshopen()
epics_put("XOMNYI-XEYE-ACQDONE:0", 0)
while epics_get("XOMNYI-XEYE-ACQDONE:0") == 0:
print("waiting for new frame...")
time.sleep(0.5)
time.sleep(0.5)
# stop live view
epics_put("XOMNYI-XEYE-ACQ:0", 0)
time.sleep(1)
print("got new frame")
def update_fov(self, k: int):
self._xray_fov_xy[0] = max(epics_get(f"XOMNYI-XEYE-XWIDTH_X:{k}"), self._xray_fov_xy[0])
self._xray_fov_xy[1] = max(0, self._xray_fov_xy[0])
@property
def movement_buttons_enabled(self):
return [epics_get("XOMNYI-XEYE-ENAMVX:0"), epics_get("XOMNYI-XEYE-ENAMVY:0")]
@movement_buttons_enabled.setter
def movement_buttons_enabled(self, enabled: bool):
enabled = int(enabled)
epics_put("XOMNYI-XEYE-ENAMVX:0", enabled)
epics_put("XOMNYI-XEYE-ENAMVY:0", enabled)
def send_message(self, msg: str):
epics_put("XOMNYI-XEYE-MESSAGE:0.DESC", msg)
# ------------------------------------------------------------------
# Alignment procedure
# ------------------------------------------------------------------
def align(self):
self._reset_init_values()
self.reset_correction()
self.reset_correction_2()
self._disable_rt_feedback()
epics_put("XOMNYI-XEYE-PIXELSIZE:0", self.PIXEL_CALIBRATION)
self._enable_rt_feedback()
self.movement_buttons_enabled = False
epics_put("XOMNYI-XEYE-ACQ:0", 0)
self.send_message("please wait...")
epics_put("XOMNYI-XEYE-SAMPLENAME:0.DESC", "Let us LAMNI...")
self._disable_rt_feedback()
k = 0
self.lamni.lfzp_in()
self.update_frame()
self.movement_buttons_enabled = False
epics_put("XOMNYI-XEYE-SUBMIT:0", 0)
epics_put("XOMNYI-XEYE-STEP:0", 0)
self.send_message("Submit center value of FZP.")
while True:
if epics_get("XOMNYI-XEYE-SUBMIT:0") == 1:
val_x = epics_get(f"XOMNYI-XEYE-XVAL_X:{k}") * self.PIXEL_CALIBRATION # in mm
val_y = epics_get(f"XOMNYI-XEYE-YVAL_Y:{k}") * self.PIXEL_CALIBRATION # in mm
self.alignment_values[k] = [val_x, val_y]
print(
f"Clicked position {k}: x {self.alignment_values[k][0]}, y"
f" {self.alignment_values[k][1]}"
)
if k == 0: # received center value of FZP
self.send_message("please wait ...")
self.lamni.loptics_out()
epics_put("XOMNYI-XEYE-SUBMIT:0", -1)
self.movement_buttons_enabled = False
print("Moving sample in, FZP out")
self._disable_rt_feedback()
time.sleep(0.3)
self._enable_rt_feedback()
time.sleep(0.3)
self.update_frame()
self.send_message("Go and find the sample")
epics_put("XOMNYI-XEYE-SUBMIT:0", 0)
self.movement_buttons_enabled = True
elif k == 1: # received sample center value at samrot 0
msg = (
f"Base shift values from movement are x {self.shift_xy[0]}, y"
f" {self.shift_xy[1]}"
)
print(msg)
logger.info(msg)
self.shift_xy[0] += (
self.alignment_values[0][0] - self.alignment_values[1][0]
) * 1000
self.shift_xy[1] += (
self.alignment_values[1][1] - self.alignment_values[0][1]
) * 1000
print(
"Base shift values from movement and clicked position are x"
f" {self.shift_xy[0]}, y {self.shift_xy[1]}"
)
self.scans.lamni_move_to_scan_center(
self.shift_xy[0] / 1000, self.shift_xy[1] / 1000, self.get_tomo_angle()
).wait()
self.send_message("please wait ...")
epics_put("XOMNYI-XEYE-SUBMIT:0", -1)
self.movement_buttons_enabled = False
time.sleep(1)
self.scans.lamni_move_to_scan_center(
self.shift_xy[0] / 1000, self.shift_xy[1] / 1000, self.get_tomo_angle()
).wait()
epics_put("XOMNYI-XEYE-ANGLE:0", self.get_tomo_angle())
self.update_frame()
self.send_message("Submit sample center and FOV (0 deg)")
epics_put("XOMNYI-XEYE-SUBMIT:0", 0)
self.update_fov(k)
elif 1 < k < 10: # received sample center value at samrot 0 ... 315
self.send_message("please wait ...")
epics_put("XOMNYI-XEYE-SUBMIT:0", -1)
self._disable_rt_feedback()
self.tomo_rotate((k - 1) * 45 - 45 / 2)
self.scans.lamni_move_to_scan_center(
self.shift_xy[0] / 1000, self.shift_xy[1] / 1000, self.get_tomo_angle()
).wait()
self._disable_rt_feedback()
self.tomo_rotate((k - 1) * 45)
self.scans.lamni_move_to_scan_center(
self.shift_xy[0] / 1000, self.shift_xy[1] / 1000, self.get_tomo_angle()
).wait()
epics_put("XOMNYI-XEYE-ANGLE:0", self.get_tomo_angle())
self.update_frame()
self.send_message("Submit sample center")
epics_put("XOMNYI-XEYE-SUBMIT:0", 0)
epics_put("XOMNYI-XEYE-ENAMVX:0", 1)
self.update_fov(k)
elif k == 10: # received sample center value at samrot 270, done
self.send_message("done...")
epics_put("XOMNYI-XEYE-SUBMIT:0", -1)
self.movement_buttons_enabled = False
self.update_fov(k)
break
k += 1
epics_put("XOMNYI-XEYE-STEP:0", k)
if k < 2:
_xrayeyalignmvx = epics_get("XOMNYI-XEYE-MVX:0")
_xrayeyalignmvy = epics_get("XOMNYI-XEYE-MVY:0")
if _xrayeyalignmvx != 0 or _xrayeyalignmvy != 0:
self.shift_xy[0] = self.shift_xy[0] + _xrayeyalignmvx
self.shift_xy[1] = self.shift_xy[1] + _xrayeyalignmvy
self.scans.lamni_move_to_scan_center(
self.shift_xy[0] / 1000, self.shift_xy[1] / 1000, self.get_tomo_angle()
).wait()
print(
f"Current center horizontal {self.shift_xy[0]} vertical {self.shift_xy[1]}"
)
epics_put("XOMNYI-XEYE-MVY:0", 0)
epics_put("XOMNYI-XEYE-MVX:0", 0)
self.update_frame()
time.sleep(0.2)
self.write_output()
fovx = self._xray_fov_xy[0] * self.PIXEL_CALIBRATION * 1000 / 2
fovy = self._xray_fov_xy[1] * self.PIXEL_CALIBRATION * 1000 / 2
print(
f"The largest field of view from the xrayeyealign was \nfovx = {fovx:.0f} microns,"
f" fovy = {fovy:.0f} microns"
)
print("Use matlab routine to fit the current alignment...")
print(
"This additional shift is applied to the base shift values\n which are x"
f" {self.shift_xy[0]}, y {self.shift_xy[1]}"
)
self._disable_rt_feedback()
self.tomo_rotate(0)
print(
"\n\nNEXT LOAD ALIGNMENT PARAMETERS\nby running"
" lamni.align.read_xray_eye_correction()\n"
)
self.client.set_global_var("tomo_fov_offset", self.shift_xy)
# ------------------------------------------------------------------
# Alignment output
# ------------------------------------------------------------------
def write_output(self):
import os
with open(
os.path.expanduser("~/Data10/specES1/internal/xrayeye_alignmentvalues"), "w"
) as alignment_values_file:
alignment_values_file.write("angle\thorizontal\tvertical\n")
for k in range(2, 11):
fovx_offset = (self.alignment_values[0][0] - self.alignment_values[k][0]) * 1000
fovy_offset = (self.alignment_values[k][1] - self.alignment_values[0][1]) * 1000
print(
f"Writing to file new alignment: number {k}, value x {fovx_offset}, y"
f" {fovy_offset}"
)
alignment_values_file.write(f"{(k-2)*45}\t{fovx_offset}\t{fovy_offset}\n")
# ------------------------------------------------------------------
# X-ray eye sinusoidal correction (loaded from MATLAB fit files)
# ------------------------------------------------------------------
def read_xray_eye_correction(self, dir_path=None):
import os
if dir_path is None:
dir_path = os.path.expanduser("~/Data10/specES1/internal/")
tomo_fit_xray_eye = np.zeros((2, 3))
for i, axis in enumerate(["x", "y"]):
for j, coeff in enumerate(["A", "B", "C"]):
with open(os.path.join(dir_path, f"ptychotomoalign_{coeff}{axis}.txt"), "r") as f:
tomo_fit_xray_eye[i][j] = f.readline()
self.client.set_global_var("tomo_fit_xray_eye", tomo_fit_xray_eye.tolist())
# x amp, phase, offset, y amp, phase, offset
# 0 0 0 1 0 2 1 0 1 1 1 2
print("New alignment parameters loaded from X-ray eye")
print(
f"X Amplitude {tomo_fit_xray_eye[0][0]}, "
f"X Phase {tomo_fit_xray_eye[0][1]}, "
f"X Offset {tomo_fit_xray_eye[0][2]}, "
f"Y Amplitude {tomo_fit_xray_eye[1][0]}, "
f"Y Phase {tomo_fit_xray_eye[1][1]}, "
f"Y Offset {tomo_fit_xray_eye[1][2]}"
)
def lamni_compute_additional_correction_xeye_mu(self, angle):
"""Compute sinusoidal correction from the X-ray eye fit for the given angle."""
tomo_fit_xray_eye = self.client.get_global_var("tomo_fit_xray_eye")
if tomo_fit_xray_eye is None:
print("Not applying any additional correction. No x-ray eye data available.\n")
return (0, 0)
# x amp, phase, offset, y amp, phase, offset
# 0 0 0 1 0 2 1 0 1 1 1 2
correction_x = (
tomo_fit_xray_eye[0][0] * np.sin(np.radians(angle) + tomo_fit_xray_eye[0][1])
+ tomo_fit_xray_eye[0][2]
) / 1000
correction_y = (
tomo_fit_xray_eye[1][0] * np.sin(np.radians(angle) + tomo_fit_xray_eye[1][1])
+ tomo_fit_xray_eye[1][2]
) / 1000
print(f"Xeye correction x {correction_x}, y {correction_y} for angle {angle}\n")
return (correction_x, correction_y)
# ------------------------------------------------------------------
# Additional lookup-table corrections (iteration 1 and 2)
# ------------------------------------------------------------------
def read_additional_correction(self, correction_file: str):
self.corr_pos_x, self.corr_pos_y, self.corr_angle = self._read_correction_file_xy(
correction_file
)
def read_additional_correction_2(self, correction_file: str):
self.corr_pos_x_2, self.corr_pos_y_2, self.corr_angle_2 = self._read_correction_file_xy(
correction_file
)
def _read_correction_file_xy(self, correction_file: str):
"""Parse a correction file that contains corr_pos_x, corr_pos_y and corr_angle entries."""
with open(correction_file, "r") as f:
num_elements = f.readline()
int_num_elements = int(num_elements.split(" ")[2])
print(int_num_elements)
corr_pos_x = []
corr_pos_y = []
corr_angle = []
for j in range(0, int_num_elements * 3):
line = f.readline()
value = line.split(" ")[2]
name = line.split(" ")[0].split("[")[0]
if name == "corr_pos_x":
corr_pos_x.append(float(value) / 1000)
elif name == "corr_pos_y":
corr_pos_y.append(float(value) / 1000)
elif name == "corr_angle":
corr_angle.append(float(value))
return corr_pos_x, corr_pos_y, corr_angle
def compute_additional_correction(self, angle):
return self._compute_correction_xy(
angle, self.corr_pos_x, self.corr_pos_y, self.corr_angle, label="1"
)
def compute_additional_correction_2(self, angle):
return self._compute_correction_xy(
angle, self.corr_pos_x_2, self.corr_pos_y_2, self.corr_angle_2, label="2"
)
def _compute_correction_xy(self, angle, corr_pos_x, corr_pos_y, corr_angle, label=""):
"""Find the correction for the closest angle in the lookup table."""
if not corr_pos_x:
print(f"Not applying additional correction {label}. No data available.\n")
return (0, 0)
shift_x = corr_pos_x[0]
shift_y = corr_pos_y[0]
angledelta = np.fabs(corr_angle[0] - angle)
for j in range(1, len(corr_pos_x)):
newangledelta = np.fabs(corr_angle[j] - angle)
if newangledelta < angledelta:
shift_x = corr_pos_x[j]
shift_y = corr_pos_y[j]
angledelta = newangledelta
if shift_x == 0 and angle < corr_angle[0]:
shift_x = corr_pos_x[0]
shift_y = corr_pos_y[0]
if shift_x == 0 and angle > corr_angle[-1]:
shift_x = corr_pos_x[-1]
shift_y = corr_pos_y[-1]
print(f"Additional correction shifts {label}: {shift_x} {shift_y}")
return (shift_x, shift_y)

View File

@@ -0,0 +1,211 @@
"""
extra_tomo.py
=============
Specialist LamNI subclasses for specific experimental configurations.
Import explicitly when needed, e.g.:
from csaxs_bec...extra_tomo import MagLamNI
from csaxs_bec...extra_tomo import DataDrivenLamNI
"""
import builtins
import datetime
import os
import time
import h5py
import numpy as np
from bec_lib import bec_logger
from bec_lib.alarm_handler import AlarmBase
from .lamni import LamNI
logger = bec_logger.logger
if builtins.__dict__.get("bec") is not None:
bec = builtins.__dict__.get("bec")
dev = builtins.__dict__.get("dev")
umv = builtins.__dict__.get("umv")
scans = builtins.__dict__.get("scans")
class MagLamNI(LamNI):
"""LamNI subclass for magnetic experiments (XMCD).
Adds a slow rotation helper and allows injection of a custom
per-angle callback via the ``lamni_at_each_angle`` builtin.
"""
def sub_tomo_scan(self, subtomo_number, start_angle=None):
super().sub_tomo_scan(subtomo_number, start_angle)
# self.rotate_slowly(0)
def rotate_slowly(self, angle, step_size=20):
"""Rotate to target angle in small steps to avoid mechanical stress."""
current_angle = dev.lsamrot.read(cached=True)["value"]
steps = int(np.ceil(np.abs(current_angle - angle) / step_size)) + 1
for target_angle in np.linspace(current_angle, angle, steps, endpoint=True):
umv(dev.lsamrot, target_angle)
scans.lamni_move_to_scan_center(
self.align.tomo_fovx_offset, self.align.tomo_fovy_offset, target_angle
)
def _at_each_angle(self, angle: float) -> None:
if "lamni_at_each_angle" in builtins.__dict__:
# pylint: disable=undefined-variable
lamni_at_each_angle(self, angle)
return
self.tomo_scan_projection(angle)
self.tomo_reconstruct()
class DataDrivenLamNI(LamNI):
"""LamNI subclass that reads per-projection scan parameters from an HDF5 file.
Instead of a fixed FOV and step size for the whole tomogram, each
projection can have individual values for step size, loptz position
and lateral shifts, as specified in a datadriven_params.h5 file.
"""
def __init__(self, client):
super().__init__(client)
self.tomo_data = {}
def tomo_scan(
self,
subtomo_start=1,
start_index=None,
fname="~/Data10/data_driven_config/datadriven_params.h5",
):
"""Start a data-driven tomo scan.
Args:
subtomo_start (int): Unused; kept for API compatibility. Use start_index to resume.
start_index (int, optional): Skip projections before this index. Defaults to None.
fname (str): Path to the HDF5 parameter file. Defaults to the standard location.
"""
bec = builtins.__dict__.get("bec")
scans = builtins.__dict__.get("scans")
fname = os.path.expanduser(fname)
if not os.path.exists(fname):
raise FileNotFoundError(f"Could not find datadriven params file in {fname}.")
content = f"Loading tomo parameters from {fname}."
logger.warning(content)
msg = bec.logbook.LogbookMessage()
msg.add_text(content).add_tag(["Data_driven_file", "BEC"])
self.client.logbook.send_logbook_message(msg)
self._update_tomo_data_from_file(fname)
self._current_special_angles = self.special_angles.copy()
if subtomo_start == 1 and start_index is None:
self.tomo_id = self.add_sample_database(
self.sample_name,
str(datetime.date.today()),
bec.active_account.decode(),
bec.queue.next_scan_number,
"lamni",
"test additional info",
"BEC",
)
self.write_pdf_report()
with scans.dataset_id_on_hold:
self.sub_tomo_data_driven(start_index)
def sub_tomo_scan(self, subtomo_number=None, start_angle=None):
raise NotImplementedError(
"Cannot run sub_tomo_scan with DataDrivenLamNI. "
"Use lamni.tomo_scan(start_index=<N>) to resume instead."
)
def _at_each_angle(
self, angle=None, stepsize=None, loptz_pos=None, manual_shift_x=0, manual_shift_y=0
):
self.manual_shift_x = manual_shift_x
self.manual_shift_y = manual_shift_y
self.tomo_shellstep = stepsize
if loptz_pos is not None:
dev.rtx.controller.feedback_disable()
umv(dev.loptz, loptz_pos)
super()._at_each_angle(angle=angle)
def sub_tomo_data_driven(self, start_index=None):
"""Iterate over all projections defined in the loaded HDF5 parameter file."""
for scan_index, scan_data in enumerate(zip(*self.tomo_data.values())):
if start_index and scan_index < start_index:
continue
(
angle,
stepsize,
loptz_pos,
propagation_distance,
manual_shift_x,
manual_shift_y,
subtomo_number,
) = scan_data
bec.metadata.update(
{key: float(val) for key, val in zip(self.tomo_data.keys(), scan_data)}
)
successful = False
error_caught = False
if 0 <= angle < 360.05:
print(f"Starting LamNI scan for angle {angle}")
while not successful:
self._start_beam_check()
if not self.special_angles:
self._current_special_angles = []
if self._current_special_angles:
next_special_angle = self._current_special_angles[0]
if np.isclose(angle, next_special_angle, atol=0.5):
self._current_special_angles.pop(0)
num_repeats = self.special_angle_repeats
else:
num_repeats = 1
try:
start_scan_number = bec.queue.next_scan_number
for i in range(num_repeats):
self._at_each_angle(
float(angle),
stepsize=float(stepsize),
loptz_pos=float(loptz_pos),
manual_shift_x=float(manual_shift_x),
manual_shift_y=float(manual_shift_y),
)
error_caught = False
except AlarmBase as exc:
if exc.alarm_type == "TimeoutError":
bec.queue.request_queue_reset()
time.sleep(2)
error_caught = True
else:
raise exc
if self._was_beam_okay() and not error_caught:
successful = True
else:
self._wait_for_beamline_checks()
end_scan_number = bec.queue.next_scan_number
for scan_nr in range(start_scan_number, end_scan_number):
self._write_tomo_scan_number(scan_nr, angle, subtomo_number)
def _update_tomo_data_from_file(self, fname: str) -> None:
"""Load projection parameters from the HDF5 file into self.tomo_data."""
with h5py.File(fname, "r") as file:
self.tomo_data["theta"] = np.array([*file["theta"]]).flatten()
self.tomo_data["stepsize"] = np.array([*file["stepsize"]]).flatten()
self.tomo_data["loptz"] = np.array([*file["loptz"]]).flatten()
self.tomo_data["propagation_distance"] = np.array(
[*file["relative_propagation_distance"]]
).flatten()
self.tomo_data["manual_shift_x"] = np.array([*file["manual_shift_x"]]).flatten()
self.tomo_data["manual_shift_y"] = np.array([*file["manual_shift_y"]]).flatten()
self.tomo_data["subtomo_id"] = np.array([*file["subtomo_id"]]).flatten()
shapes = [data.shape for data in self.tomo_data.values()]
if len(set(shapes)) > 1:
raise ValueError(f"Tomo data file has entries of inconsistent lengths: {shapes}.")

File diff suppressed because it is too large Load Diff

View File

@@ -6,8 +6,8 @@ from rich.console import Console
from rich.table import Table
from csaxs_bec.bec_ipython_client.plugins.cSAXS import epics_put, fshclose
from csaxs_bec.bec_ipython_client.plugins.omny.omny_general_tools import OMNYTools
# import builtins to avoid linter errors
dev = builtins.__dict__.get("dev")
umv = builtins.__dict__.get("umv")
bec = builtins.__dict__.get("bec")
@@ -17,26 +17,31 @@ class LamNIInitError(Exception):
pass
class LaMNIInitStagesMixin:
class LaMNIInitStages:
"""Handles hardware initialization and referencing of LamNI stages."""
def __init__(self, client):
super().__init__()
self.client = client
self.OMNYTools = OMNYTools(self.client)
def lamni_init_stages(self):
user_input = input("Starting initialization of LamNI stages. OK? [y/n]")
if user_input == "y":
print("staring...")
if self.OMNYTools.yesno("Start initialization of LamNI stages. OK?"):
print("starting...")
dev.lsamrot.enabled = True
else:
return
if self.check_all_axes_of_lamni_referenced():
user_input = input("Continue anyways? [y/n]")
if user_input == "y":
if self.OMNYTools.yesno("All axes are referenced. Continue anyways?"):
print("ok then...")
else:
return
axis_id_lsamrot = dev.lsamrot._config["deviceConfig"].get("axis_Id")
if dev.lsamrot.controller.get_motor_limit_switch(axis_id_lsamrot)[1] == False:
user_input = input("The rotation stage will be moved to one limit [y/n]")
if user_input == "y":
if self.OMNYTools.yesno("The rotation stage will be moved to one limit"):
print("starting...")
else:
return
@@ -47,10 +52,9 @@ class LaMNIInitStagesMixin:
print("The controller will be disabled in bec. To enable dev.lsamrot.enabled=True")
return
user_input = input(
"Init of loptz. Can the stage move to the upstream limit without collision?? [y/n]"
)
if user_input == "y":
if self.OMNYTools.yesno(
"Init of loptz. Can the stage move to the upstream limit without collision?"
):
print("ok then...")
else:
return
@@ -75,14 +79,14 @@ class LaMNIInitStagesMixin:
self.drive_axis_to_limit(dev.lsamy, "reverse")
self.find_reference_mark(dev.lsamy)
# the dual encoder requires the reference mark to pass on both encoders
print("Referencing lsamrot")
self.drive_axis_to_limit(dev.lsamrot, "reverse")
time.sleep(0.1)
self.find_reference_mark(dev.lsamrot)
user_input = input("Init of leye. Can the stage move to -x limit without collision? [y/n]")
if user_input == "y":
if self.OMNYTools.yesno(
"Init of leye. Can the stage move to -x limit without collision?"
):
print("starting...")
else:
return
@@ -92,15 +96,6 @@ class LaMNIInitStagesMixin:
print("Referencing leyey")
self.drive_axis_to_limit(dev.leyey, "forward")
# set_lm lsamx 6 14
# set_lm lsamy 6 14
# set_lm lsamrot -3 362
# set_lm loptx -1 -0.2
# set_lm lopty 3.0 3.6
# set_lm loptz 82 87
# set_lm leyex 0 25
# set_lm leyey 0.5 50
print("Init of Smaract stages")
dev.losax.controller.find_reference_mark(2, 0, 1000, 1)
time.sleep(1)
@@ -108,15 +103,6 @@ class LaMNIInitStagesMixin:
time.sleep(1)
dev.losax.controller.find_reference_mark(1, 0, 1000, 1)
time.sleep(1)
# dev.losax.controller.find_reference_mark(3, 1, 1000, 1)
# time.sleep(1)
# dev.losax.controller.find_reference_mark(4, 1, 1000, 1)
# time.sleep(1)
# set_lm losax -1.5 0.25
# set_lm losay -2.5 4.1
# set_lm losaz -4.1 -0.5
# set_lm lcsy -1.5 5
self._align_setup()
@@ -134,8 +120,7 @@ class LaMNIInitStagesMixin:
return ord(axis_id.lower()) - 97
def _align_setup(self):
user_input = input("Start moving stages to default initial positions? [y/n]")
if user_input == "y":
if self.OMNYTools.yesno("Start moving stages to default initial positions?"):
print("Start moving stages...")
else:
print("Stopping.")
@@ -174,6 +159,8 @@ class LaMNIInitStagesMixin:
class LamNIOpticsMixin:
"""Optics movement methods: FZP, OSA, central stop and X-ray eye."""
@staticmethod
def _get_user_param_safe(device, var):
param = dev[device].user_parameter
@@ -188,13 +175,11 @@ class LamNIOpticsMixin:
umv(dev.leyey, leyey_out)
epics_put("XOMNYI-XEYE-ACQ:0", 2)
# move rotation stage to zero to avoid problems with wires
umv(dev.lsamrot, 0)
umv(dev.dttrz, 5854, dev.fttrz, 2395)
def leye_in(self):
bec.queue.next_dataset_number += 1
# move rotation stage to zero to avoid problems with wires
umv(dev.lsamrot, 0)
umv(dev.dttrz, 6419.677, dev.fttrz, 2959.979)
while True:
@@ -211,15 +196,10 @@ class LamNIOpticsMixin:
def _lfzp_in(self):
loptx_in = self._get_user_param_safe("loptx", "in")
lopty_in = self._get_user_param_safe("lopty", "in")
umv(
dev.loptx, loptx_in, dev.lopty, lopty_in
) # for 7.2567 keV and 150 mu, 60 nm fzp, loptz 83.6000 for propagation 1.4 mm
umv(dev.loptx, loptx_in, dev.lopty, lopty_in)
def lfzp_in(self):
"""
move in the lamni zone plate.
This will disable rt feedback, move the FZP and re-enabled the feedback.
"""
"""Move in the LamNI zone plate, disabling/re-enabling RT feedback around the move."""
if "rtx" in dev and dev.rtx.enabled:
dev.rtx.controller.feedback_disable()
@@ -229,18 +209,15 @@ class LamNIOpticsMixin:
dev.rtx.controller.feedback_enable_with_reset()
def loptics_in(self):
"""
Move in the lamni optics, including the FZP and the OSA.
"""
"""Move in the LamNI optics (FZP + OSA)."""
self.lfzp_in()
self.losa_in()
def loptics_out(self):
"""Move out the lamni optics"""
"""Move out the LamNI optics."""
if "rtx" in dev and dev.rtx.enabled:
dev.rtx.controller.feedback_disable()
# self.lcs_out()
self.losa_out()
loptx_out = self._get_user_param_safe("loptx", "out")
lopty_out = self._get_user_param_safe("lopty", "out")
@@ -251,28 +228,17 @@ class LamNIOpticsMixin:
dev.rtx.controller.feedback_enable_with_reset()
def lcs_in(self):
# umv lcsx -1.852 lcsy -0.095
pass
def lcs_out(self):
umv(dev.lcsy, 3)
def losa_in(self):
# 6.2 keV, 170 um FZP
# umv(dev.losax, -1.4450000, dev.losay, -0.1800)
# umv(dev.losaz, -1)
# 6.7, 170
# umv(dev.losax, -1.4850, dev.losay, -0.1930)
# umv(dev.losaz, 1.0000)
# 7.2, 150
losax_in = self._get_user_param_safe("losax", "in")
losay_in = self._get_user_param_safe("losay", "in")
losaz_in = self._get_user_param_safe("losaz", "in")
umv(dev.losax, losax_in, dev.losay, losay_in)
umv(dev.losaz, losaz_in)
# 11 kev
# umv(dev.losax, -1.161000, dev.losay, -0.196)
# umv(dev.losaz, 1.0000)
def losa_out(self):
losay_out = self._get_user_param_safe("losay", "out")
@@ -281,11 +247,10 @@ class LamNIOpticsMixin:
umv(dev.losay, losay_out)
def lfzp_info(self, mokev_val=-1):
if mokev_val == -1:
try:
mokev_val = dev.mokev.readback.get()
except:
except Exception:
print(
"Device mokev does not exist. You can specify the energy in keV as an argument instead."
)
@@ -320,10 +285,6 @@ class LamNIOpticsMixin:
)
console.print(table)
print("OSA Information:")
# print(f"Current losaz %.1f\n", A[losaz])
# print("The OSA will collide with the sample plane at %.1f\n\n", 89.3-A[loptz])
print(
"The numbers presented here are for a sample in the plane of the lamni sample holder.\n"
)

View File

@@ -1,22 +0,0 @@
def lamni_read_additional_correction():
# "additional_correction_shift"
# [0][] x , [1][] y, [2][] angle, [3][0] number of elements
with open("correction_lamni_um_S01405_.txt", "r") as f:
num_elements = f.readline()
int_num_elements = int(num_elements.split(" ")[2])
print(int_num_elements)
corr_pos_x = []
corr_pos_y = []
corr_angle = []
for j in range(0, int_num_elements * 3):
line = f.readline()
value = line.split(" ")[2]
name = line.split(" ")[0].split("[")[0]
if name == "corr_pos_x":
corr_pos_x.append(value)
elif name == "corr_pos_y":
corr_pos_y.append(value)
elif name == "corr_angle":
corr_angle.append(value)
return (corr_pos_x, corr_pos_y, corr_angle, num_elements)

File diff suppressed because it is too large Load Diff

View File

@@ -35,32 +35,32 @@ class FlomniInitError(Exception):
class FlomniError(Exception):
pass
class FlomniTools:
def yesno(self, message: str, default="none", autoconfirm=0) -> bool:
if autoconfirm and default == "y":
self.printgreen(message + " Automatically confirming default: yes")
return True
elif autoconfirm and default == "n":
self.printgreen(message + " Automatically confirming default: no")
return False
if default == "y":
message_ending = " [Y]/n? "
elif default == "n":
message_ending = " y/[N]? "
else:
message_ending = " y/n? "
while True:
user_input = input(self.OKBLUE + message + message_ending + self.ENDC)
if (
user_input == "Y" or user_input == "y" or user_input == "yes" or user_input == "Yes"
) or (default == "y" and user_input == ""):
return True
if (
user_input == "N" or user_input == "n" or user_input == "no" or user_input == "No"
) or (default == "n" and user_input == ""):
return False
else:
print("Please expicitely confirm y or n.")
# class FlomniTools:
# def yesno(self, message: str, default="none", autoconfirm=0) -> bool:
# if autoconfirm and default == "y":
# self.printgreen(message + " Automatically confirming default: yes")
# return True
# elif autoconfirm and default == "n":
# self.printgreen(message + " Automatically confirming default: no")
# return False
# if default == "y":
# message_ending = " [Y]/n? "
# elif default == "n":
# message_ending = " y/[N]? "
# else:
# message_ending = " y/n? "
# while True:
# user_input = input(self.OKBLUE + message + message_ending + self.ENDC)
# if (
# user_input == "Y" or user_input == "y" or user_input == "yes" or user_input == "Yes"
# ) or (default == "y" and user_input == ""):
# return True
# if (
# user_input == "N" or user_input == "n" or user_input == "no" or user_input == "No"
# ) or (default == "n" and user_input == ""):
# return False
# else:
# print("Please expicitely confirm y or n.")

View File

@@ -4,6 +4,7 @@ This module contains the base class for Galil controllers as well as the signals
import functools
import time
from typing import Any
from bec_lib import bec_logger
from ophyd.utils import ReadOnlyError
@@ -347,7 +348,7 @@ class GalilSignalBase(SocketSignal):
def __init__(self, signal_name, **kwargs):
self.signal_name = signal_name
super().__init__(**kwargs)
self.controller = self.parent.controller
self.controller = self.root.controller if hasattr(self.root, "controller") else None
class GalilSignalRO(GalilSignalBase):

View File

@@ -6,24 +6,26 @@ Link to the Galil RIO vendor page:
https://www.galil.com/plcs/remote-io/rio-471xx
This module provides the GalilRIOController for communication with the RIO controller
over TCP/IP. It also provides a device integration that interfaces to these
8 analog channels.
over TCP/IP. It also provides a device integration that interfaces to its
8 analog channels, and 16 digital output channels. Some PLCs may have 24 digital output channels,
which can be easily supported by changing the _NUM_DIGITAL_OUTPUT_CHANNELS variable.
"""
from __future__ import annotations
import time
from typing import TYPE_CHECKING
from typing import TYPE_CHECKING, Literal
from bec_lib.logger import bec_logger
from ophyd import Component as Cpt
from ophyd import DynamicDeviceComponent as DDC
from ophyd import Kind
from ophyd.utils import ReadOnlyError
from ophyd_devices import PSIDeviceBase
from ophyd_devices.utils.controller import Controller, threadlocked
from ophyd_devices.utils.socket import SocketIO
from csaxs_bec.devices.omny.galil.galil_ophyd import (
GalilCommunicationError,
GalilSignalRO,
GalilSignalBase,
retry_once,
)
@@ -35,15 +37,11 @@ logger = bec_logger.logger
class GalilRIOController(Controller):
"""
Controller Class for Galil RIO controller communication.
Multiple controllers are in use at the cSAXS beamline:
- 129.129.98.64 (port 23)
"""
"""Controller Class for Galil RIO controller communication."""
@threadlocked
def socket_put(self, val: str) -> None:
"""Socker put method."""
self.sock.put(f"{val}\r".encode())
@retry_once
@@ -64,21 +62,95 @@ class GalilRIOController(Controller):
)
class GalilRIOSignalRO(GalilSignalRO):
class GalilRIOAnalogSignalRO(GalilSignalBase):
"""
Read-only Signal for reading a single analog input channel from the Galil RIO controller.
It always read all 8 analog channels at once, and updates the reabacks of all channels.
New readbacks are only fetched from the controller if the last readback is older than
_READ_TIMEOUT seconds, otherwise the last cached readback is returned to reduce network traffic.
Signal for reading analog input channels of the Galil RIO controller. This signal is read-only, so
the set method raises a ReadOnlyError. The get method retrieves the values of all analog
channels in a single socket command. The readback values of all channels are updated based
on the response, and subscriptions are run for all channels. Readings are cached as implemented
in the SocketSignal class, so that multiple reads of the same channel within an update cycle do
not result in multiple socket calls.
Args:
signal_name (str): Name of the signal.
channel (int): Analog channel number (0-7).
parent (GalilRIO): Parent GalilRIO device.
signal_name (str): The name of the signal, e.g. "ch0", "ch1", ..., "ch7"
channel (int): The channel number corresponding to the signal, e.g. 0 for "ch0", 1 for "ch1", ...
parent (GalilRIO): The parent device instance that this signal belongs to.
"""
_NUM_ANALOG_CHANNELS = 8
_READ_TIMEOUT = 0.1 # seconds
def __init__(self, signal_name: str, channel: int, parent: GalilRIO, **kwargs):
super().__init__(signal_name=signal_name, parent=parent, **kwargs)
self._channel = channel
self._metadata["connected"] = False
self._metadata["write_access"] = False
def _socket_set(self, val):
"""Read-only signal, so set method raises an error."""
raise ReadOnlyError(f"Signal {self.name} is read-only.")
def _socket_get(self) -> float:
"""Get command for the readback signal"""
cmd = "MG@" + ", @".join([f"AN[{ii}]" for ii in range(self._NUM_ANALOG_CHANNELS)])
ret = self.controller.socket_put_and_receive(cmd)
values = [float(val) for val in ret.strip().split(" ")]
# Run updates for all channels. This also updates the _readback and metadata timestamp
# value of this channel.
self._update_all_channels(values)
return self._readback
# pylint: disable=protected-access
def _update_all_channels(self, values: list[float]) -> None:
"""
Method to receive a list of readback values for channels 0 to 7. Updates for each channel idx
are applied to the corresponding GalilRIOAnalogSignalRO signal with matching attr_name "ch{idx}".
We also update the _last_readback attribute of each of the signals, to avoid multiple socket calls,
but rather use the cached value of the combined reading for all channels.
Args:
values (list[float]): List of new readback values for all channels, where the
index corresponds to the channel number (0-7).
"""
updates: dict[str, tuple[float, float]] = {} # attr_name -> (new_val, old_val)
# Update all readbacks first
for walk in self.parent.walk_signals():
if isinstance(walk.item, GalilRIOAnalogSignalRO):
idx = int(walk.item.attr_name[-1])
if 0 <= idx < len(values):
old_val = walk.item._readback
new_val = values[idx]
walk.item._metadata["timestamp"] = self._last_readback
walk.item._last_readback = self._last_readback
walk.item._readback = new_val
if (
idx != self._channel
): # Only run subscriptions on other channels, not on itself
# as this is handled by the SocketSignal and we want to avoid running multiple
# subscriptions for the same channel update
updates[walk.item.attr_name] = (new_val, old_val)
else:
logger.warning(
f"Received {len(values)} values but found channel index {idx} in signal {walk.item.name}. Skipping update for this signal."
)
# Run subscriptions after all readbacks have been updated
# on all channels except the one that triggered the update
for walk in self.parent.walk_signals():
if walk.item.attr_name in updates:
new_val, old_val = updates[walk.item.attr_name]
walk.item._run_subs(
sub_type=walk.item.SUB_VALUE,
old_value=old_val,
value=new_val,
timestamp=self._last_readback,
)
class GalilRIODigitalOutSignal(GalilSignalBase):
"""Signal for controlling digital outputs of the Galil RIO controller."""
_NUM_DIGITAL_OUTPUT_CHANNELS = 16
def __init__(self, signal_name: str, channel: int, parent: GalilRIO, **kwargs):
super().__init__(signal_name, parent=parent, **kwargs)
@@ -87,81 +159,83 @@ class GalilRIOSignalRO(GalilSignalRO):
def _socket_get(self) -> float:
"""Get command for the readback signal"""
cmd = "MG@" + ",@".join([f"AN[{ii}]" for ii in range(self._NUM_ANALOG_CHANNELS)])
cmd = f"MG@OUT[{self._channel}]"
ret = self.controller.socket_put_and_receive(cmd)
values = [float(val) for val in ret.strip().split(" ")]
# This updates all channels' readbacks, including self._readback
self._update_all_channels(values)
self._readback = float(ret.strip())
return self._readback
def get(self):
"""Get current analog channel values from the Galil RIO controller."""
# If the last readback has happend more than _READ_TIMEOUT seconds ago, read all channels again
if time.monotonic() - self.parent.last_readback > self._READ_TIMEOUT:
self._readback = self._socket_get()
return self._readback
def _socket_set(self, val: Literal[0, 1]) -> None:
"""Set command for the digital output signal. Value should be 0 or 1."""
# pylint: disable=protected-access
def _update_all_channels(self, values: list[float]) -> None:
"""
Update all analog channel readbacks based on the provided list of values.
List of values must be in order from an_ch0 to an_ch7.
if val not in (0, 1):
raise ValueError("Digital output value must be 0 or 1.")
cmd = f"SB{self._channel}" if val == 1 else f"CB{self._channel}"
self.controller.socket_put_confirmed(cmd)
We first have to update the _last_readback timestamp of the GalilRIO parent device.
Then we update all readbacks of all an_ch channels, before we run any subscriptions.
This ensures that all readbacks are updated before any subscriptions are run, which
may themselves read other channels.
Args:
values (list[float]): List of 8 float values corresponding to the analog channels.
They must be in order from an_ch0 to an_ch7.
"""
timestamp = time.time()
# Update parent's last readback before running subscriptions!!
self.parent._last_readback = time.monotonic()
updates: dict[str, tuple[float, float]] = {} # attr_name -> (new_val, old_val)
# Update all readbacks first
for walk in self.parent.walk_signals():
if walk.item.attr_name.startswith("an_ch"):
idx = int(walk.item.attr_name[-1])
if 0 <= idx < len(values):
old_val = walk.item._readback
new_val = values[idx]
walk.item._metadata["timestamp"] = timestamp
walk.item._readback = new_val
updates[walk.item.attr_name] = (new_val, old_val)
def _create_analog_channels(num_channels: int) -> dict[str, tuple]:
"""
Helper method to create a dictionary of analog channel definitions for the DynamicDeviceComponent.
# Run subscriptions after all readbacks have been updated
for walk in self.parent.walk_signals():
if walk.item.attr_name in updates:
new_val, old_val = updates[walk.item.attr_name]
walk.item._run_subs(
sub_type=walk.item.SUB_VALUE,
old_value=old_val,
value=new_val,
timestamp=timestamp,
)
Args:
num_channels (int): The number of analog channels to create.
"""
an_channels = {}
for i in range(0, num_channels):
an_channels[f"ch{i}"] = (
GalilRIOAnalogSignalRO,
f"ch{i}",
{"kind": Kind.normal, "notify_bec": True, "channel": i, "doc": f"Analog channel {i}."},
)
return an_channels
def _create_digital_output_channels(num_channels: int) -> dict[str, tuple]:
"""
Helper method to create a dictionary of digital output channel definitions for the DynamicDeviceComponent.
Args:
num_channels (int): The number of digital output channels to create.
"""
di_out_channels = {}
for i in range(0, num_channels):
di_out_channels[f"ch{i}"] = (
GalilRIODigitalOutSignal,
f"ch{i}",
{
"kind": Kind.config,
"notify_bec": True,
"channel": i,
"doc": f"Digital output channel {i}.",
},
)
return di_out_channels
class GalilRIO(PSIDeviceBase):
"""
Galil RIO controller integration with 8 analog input channels. To implement the device,
please provide the appropriate host and port (default port is 23).
Galil RIO controller integration with 16 digital output channels and 8 analog input channels.
The default port for the controller is 23.
Args:
host (str): Hostname or IP address of the Galil RIO controller.
port (int, optional): Port number for the TCP/IP connection. Defaults to 23.
socket_cls (type[SocketIO], optional): Socket class to use for communication. Defaults to SocketIO.
scan_info (ScanInfo, optional): ScanInfo object for the device.
device_manager (DeviceManagerDS): The device manager instance that manages this device.
**kwargs: Additional keyword arguments passed to the PSIDeviceBase constructor.
"""
SUB_CONNECTION_CHANGE = "connection_change"
an_ch0 = Cpt(GalilRIOSignalRO, signal_name="an_ch0", channel=0, doc="Analog input channel 0")
an_ch1 = Cpt(GalilRIOSignalRO, signal_name="an_ch1", channel=1, doc="Analog input channel 1")
an_ch2 = Cpt(GalilRIOSignalRO, signal_name="an_ch2", channel=2, doc="Analog input channel 2")
an_ch3 = Cpt(GalilRIOSignalRO, signal_name="an_ch3", channel=3, doc="Analog input channel 3")
an_ch4 = Cpt(GalilRIOSignalRO, signal_name="an_ch4", channel=4, doc="Analog input channel 4")
an_ch5 = Cpt(GalilRIOSignalRO, signal_name="an_ch5", channel=5, doc="Analog input channel 5")
an_ch6 = Cpt(GalilRIOSignalRO, signal_name="an_ch6", channel=6, doc="Analog input channel 6")
an_ch7 = Cpt(GalilRIOSignalRO, signal_name="an_ch7", channel=7, doc="Analog input channel 7")
#############################
### Analog input channels ###
#############################
analog_in = DDC(_create_analog_channels(GalilRIOAnalogSignalRO._NUM_ANALOG_CHANNELS))
digital_out = DDC(
_create_digital_output_channels(GalilRIODigitalOutSignal._NUM_DIGITAL_OUTPUT_CHANNELS)
)
def __init__(
self,
@@ -177,19 +251,18 @@ class GalilRIO(PSIDeviceBase):
if port is None:
port = 23 # Default port for Galil RIO controller
self.controller = GalilRIOController(
socket_cls=socket_cls, socket_host=host, socket_port=port, device_manager=device_manager
name=f"GalilRIOController_{name}",
socket_cls=socket_cls,
socket_host=host,
socket_port=port,
device_manager=device_manager,
)
self._last_readback: float = time.monotonic()
self._readback_metadata: dict[str, float] = {"last_readback": 0.0}
super().__init__(name=name, device_manager=device_manager, scan_info=scan_info, **kwargs)
self.controller.subscribe(
self._update_connection_state, event_type=self.SUB_CONNECTION_CHANGE
)
@property
def last_readback(self) -> float:
"""Return the time of the last readback from the controller."""
return self._last_readback
# pylint: disable=arguments-differ
def wait_for_connection(self, timeout: float = 30.0) -> None:
"""Wait for the RIO controller to be connected within timeout period."""
@@ -207,7 +280,7 @@ class GalilRIO(PSIDeviceBase):
if __name__ == "__main__":
HOST_NAME = "129.129.98.64"
HOST_NAME = "129.129.122.14"
from bec_server.device_server.tests.utils import DMMock
dm = DMMock()

View File

@@ -25,6 +25,34 @@ logger = bec_logger.logger
class LamniGalilController(GalilController):
# ============================================================
# Error status
# ============================================================
caperr_bits = {
0x01: "Cap1 outside expected left-stop range (early check)",
0x02: "Cap2 outside expected left-stop range (early check)",
0x04: "Cap1 too low during pressure-off check (near right boundary)",
0x08: "Cap2 too low during pressure-off check (near right boundary)",
0x10: "Cap1 exceeded allowed left-stop boundary during movement",
0x20: "Cap2 exceeded allowed left-stop boundary during movement (disabled in code)",
0x40: "Cap1 did not respond to test movement",
0x80: "Cap2 did not respond to test movement"
}
allaxrer_table = {
1: "Not all axes referenced after reference search",
2: "Pressure-loss emergency stop (pressure 14/15 active while motor C off)",
3: "Unexpected pressure OFF while soft-limits not yet set",
4: "Pressure valve mismatch (OUT13=0 but IN13=1)",
5: "Capacitive sensor boundary violations (caperr > 0)",
6: "Emergency Stop triggered (IN[5]=0)",
7: "Following error detected on one or more axes"
}
USER_ACCESS = [
"describe",
"show_running_threads",
@@ -37,6 +65,8 @@ class LamniGalilController(GalilController):
"get_motor_limit_switch",
"is_motor_on",
"all_axes_referenced",
"lamni_lights_off",
"lamni_lights_on"
]
def show_status_other(self):
@@ -60,6 +90,47 @@ class LamniGalilController(GalilController):
print("There is air pressure at the outer rotation radial.")
swver = float(self.socket_put_and_receive("MGswver"))
print(f"Lgalil LAMNI firmware version {swver:2.0f}.")
allaxref = int(float(self.socket_put_and_receive("MGallaxref")))
print(f"Error statuts:")
if allaxref == 1:
print(f"Allaxref = 1, all OK.")
else:
print(f"Allaxref = {allaxref}. Not all axes are referenced or error introduced preventing motion.")
allaxrer = int(float(self.socket_put_and_receive("MGallaxrer")))
print("\nallaxrer =", allaxrer)
print(self.decode_allaxrer(allaxrer))
caperr = int(float(self.socket_put_and_receive("MGcaperr")))
print("\nDecoding caperr =", caperr)
self.visualize_caperr(caperr)
def decode_allaxrer(self, code: int) -> str:
"""Return human-readable meaning of allaxrer code."""
return self.allaxrer_table.get(code, "Unknown allaxrer code")
def visualize_caperr(self, mask: int):
"""Pretty-print a bitmask visualization for caperr."""
print("\n=== CAPERR BITMASK VISUALIZER ===")
print(f"Raw value: {mask} (0x{mask:02X})")
print("----------------------------------\n")
print("Bit | Hex | Active | Meaning")
print("----------------------------------")
for bit, meaning in self.caperr_bits.items():
active = "YES" if mask & bit else "no"
print(f"{bit:3d} | 0x{bit:02X} | {active:6} | {meaning}")
print("\nActive flags:")
active_flags = [meaning for bit, meaning in self.caperr_bits.items() if mask & bit]
if active_flags:
for f in active_flags:
print("", f)
else:
print(" (none)")
print("\n==================================\n")
def lamni_lights_off(self):
self.socket_put_confirmed("SB1")
@@ -93,7 +164,7 @@ class LamniGalilReadbackSignal(GalilSignalRO):
val = super().read()
if self.parent.axis_Id_numeric == 2:
try:
rt = self.parent.device_manager.devices[self.parent.rtx]
rt = self.parent.device_manager.devices[self.parent.rt]
if rt.enabled:
rt.obj.controller.set_rotation_angle(val[self.parent.name]["value"])
except KeyError:
@@ -147,7 +218,7 @@ class LamniGalilMotor(Device, PositionerBase):
raise BECConfigError(
"device_mapping has been specified but the device_manager cannot be accessed."
)
self.rt = self.device_mapping.get("rt")
self.rt = self.device_mapping.get("rt", "rtx")
super().__init__(
prefix,

View File

@@ -11,6 +11,7 @@ from ophyd.status import wait as status_wait
from ophyd.utils import LimitError, ReadOnlyError
from ophyd_devices.utils.controller import Controller, threadlocked
from ophyd_devices.utils.socket import SocketIO, SocketSignal, raise_if_disconnected
from prettytable import PrettyTable
from csaxs_bec.devices.omny.rt.rt_ophyd import RtCommunicationError, RtError
@@ -51,6 +52,7 @@ class RtLamniController(Controller):
_axes_per_controller = 3
USER_ACCESS = [
"socket_put_and_receive",
"socket_put",
"set_rotation_angle",
"feedback_disable",
"feedback_enable_without_reset",
@@ -62,6 +64,9 @@ class RtLamniController(Controller):
"_set_axis_velocity_maximum_speed",
"_position_sampling_single_read",
"_position_sampling_single_reset_and_start_sampling",
"show_signal_strength_interferometer",
"show_analog_signals",
"show_feedback_status",
]
def __init__(
@@ -208,8 +213,9 @@ class RtLamniController(Controller):
@threadlocked
def start_scan(self):
interferometer_feedback_not_running = int((self.socket_put_and_receive("J2")).split(",")[0])
if interferometer_feedback_not_running == 1:
# interferometer_feedback_not_running = int((self.socket_put_and_receive("J2")).split(",")[0])
# if interferometer_feedback_not_running == 1:
if not self.feedback_is_running():
logger.error(
"Cannot start scan because feedback loop is not running or there is an interferometer error."
)
@@ -270,6 +276,44 @@ class RtLamniController(Controller):
"average_lamni_angle": {"value": self.average_lamni_angle / (int(return_table[0]) + 1)},
}
return signals
def feedback_is_running(self) -> bool:
status = int(float((self.socket_put_and_receive("J2")).split(",")[0]))
return status == 0 # 0 means running, 1 means error/disabled
def show_feedback_status(self):
if self.feedback_is_running():
print("Loop is running, no error on interferometer.")
else:
print("Loop is not running, either it is turned off or an interferometer error occurred.")
def show_analog_signals(self) -> dict:
self.socket_put("As") # start sampling
time.sleep(0.01)
return_table = (self.socket_put_and_receive("Ar")).split(",")
number_of_samples = int(float(return_table[0]))
signals = {
"number_of_samples": number_of_samples,
"piezo_0": float(return_table[1]),
"piezo_1": float(return_table[2]),
"cap_0": float(return_table[3]),
"cap_1": float(return_table[4]),
"cap_2": float(return_table[5]),
"cap_3": float(return_table[6]),
"cap_4": float(return_table[7]),
}
t = PrettyTable()
t.title = f"LamNI Analog Signals ({number_of_samples} samples)"
t.field_names = ["Signal", "Value"]
for key, val in signals.items():
if key != "number_of_samples":
t.add_row([key, f"{val:.4f}"])
print(t)
return
def read_positions_from_sampler(self):
# this was for reading after the scan completed
@@ -347,6 +391,48 @@ class RtLamniController(Controller):
)
return bool(return_table[0])
def show_signal_strength_interferometer(self):
# trigger SSI averaging before reading
self.socket_put("J3")
time.sleep(0.05)
return_table = (self.socket_put_and_receive("J2")).split(",")
ssi_0 = float(return_table[1])
ssi_1 = float(return_table[2])
return_table_angle = (self.socket_put_and_receive("J7")).split(",")
angle_running = bool(int(float(return_table_angle[0])))
angle_position = float(return_table_angle[1])
angle_signal = float(return_table_angle[2])
t = PrettyTable()
t.title = "Interferometer signal strength"
t.field_names = ["Axis", "Description", "Value", "Running"]
t.add_row([0, "ST FZP horizontal", ssi_0, "-"])
t.add_row([1, "ST FZP vertical", ssi_1, "-"])
t.add_row([2, "Angle interferometer", angle_signal, angle_running])
print(t)
if angle_running:
print(f"Angle interferometer position: {angle_position:.4f} um")
else:
print("Warning: angle interferometer is not running.")
def show_interferometer_positions(self) -> dict:
return_table = (self.socket_put_and_receive("J4")).split(",")
loop_status = bool(int(float(return_table[0])))
pos_y = float(return_table[1])
pos_x = float(return_table[2])
t = PrettyTable()
t.title = "LamNI Interferometer Positions"
t.field_names = ["Axis", "Description", "Position (um)"]
t.add_row([0, "X", f"{pos_x:.4f}"])
t.add_row([1, "Y", f"{pos_y:.4f}"])
print(t)
print(f"Feedback loop running: {loop_status}")
return {"x": pos_x, "y": pos_y, "loop_running": loop_status}
def feedback_enable_with_reset(self):
if not self.feedback_status_angle_lamni():
self.feedback_disable_and_even_reset_lamni_angle_interferometer()

View File

@@ -1,14 +0,0 @@
"""
Conftest runs for all tests in this directory and subdirectories. Thereby, we know for
certain that the SocketSignal.READBACK_TIMEOUT is set to 0 for all tests, which prevents
hanging tests when a readback is attempted on a non-connected socket.
"""
# conftest.py
import pytest
from ophyd_devices.utils.socket import SocketSignal
@pytest.fixture(autouse=True)
def patch_socket_timeout(monkeypatch):
monkeypatch.setattr(SocketSignal, "READBACK_TIMEOUT", 0.0)

View File

@@ -35,16 +35,16 @@ def test_save_frame(bec_client_mock):
lamni = LamNI(client)
align = XrayEyeAlign(client, lamni)
with mock.patch(
"csaxs_bec.bec_ipython_client.plugins.LamNI.x_ray_eye_align.epics_put"
"csaxs_bec.bec_ipython_client.plugins.LamNI.alignment.epics_put"
) as epics_put_mock:
align.save_frame()
epics_put_mock.assert_called_once_with("XOMNYI-XEYE-SAVFRAME:0", 1)
def test_update_frame(bec_client_mock):
epics_put = "csaxs_bec.bec_ipython_client.plugins.LamNI.x_ray_eye_align.epics_put"
epics_get = "csaxs_bec.bec_ipython_client.plugins.LamNI.x_ray_eye_align.epics_get"
fshopen = "csaxs_bec.bec_ipython_client.plugins.LamNI.x_ray_eye_align.fshopen"
epics_put = "csaxs_bec.bec_ipython_client.plugins.LamNI.alignment.epics_put"
epics_get = "csaxs_bec.bec_ipython_client.plugins.LamNI.alignment.epics_get"
fshopen = "csaxs_bec.bec_ipython_client.plugins.LamNI.alignment.fshopen"
client = bec_client_mock
client.device_manager.devices.xeye = DeviceBase(
name="xeye",

View File

@@ -2,6 +2,7 @@ from unittest import mock
import pytest
from ophyd_devices.tests.utils import SocketMock
from ophyd_devices.utils.socket import SocketSignal
from csaxs_bec.devices.omny.galil.fupr_ophyd import FuprGalilController, FuprGalilMotor
@@ -17,6 +18,11 @@ def fsamroy(dm_with_devices):
socket_cls=SocketMock,
device_manager=dm_with_devices,
)
for walk in fsamroy_motor.walk_signals():
if isinstance(walk.item, SocketSignal):
walk.item._readback_timeout = (
0.0 # Set the readback timeout to 0 to avoid waiting during tests
)
fsamroy_motor.controller.on()
assert isinstance(fsamroy_motor.controller, FuprGalilController)
yield fsamroy_motor

View File

@@ -9,7 +9,11 @@ from ophyd_devices.tests.utils import SocketMock
from csaxs_bec.devices.npoint.npoint import NPointAxis, NPointController
from csaxs_bec.devices.omny.galil.fgalil_ophyd import FlomniGalilController, FlomniGalilMotor
from csaxs_bec.devices.omny.galil.fupr_ophyd import FuprGalilController, FuprGalilMotor
from csaxs_bec.devices.omny.galil.galil_rio import GalilRIO, GalilRIOController, GalilRIOSignalRO
from csaxs_bec.devices.omny.galil.galil_rio import (
GalilRIO,
GalilRIOAnalogSignalRO,
GalilRIOController,
)
from csaxs_bec.devices.omny.galil.lgalil_ophyd import LamniGalilController, LamniGalilMotor
from csaxs_bec.devices.omny.galil.ogalil_ophyd import OMNYGalilController, OMNYGalilMotor
from csaxs_bec.devices.omny.galil.sgalil_ophyd import GalilController, SGalilMotor
@@ -272,26 +276,27 @@ def test_galil_rio_signal_read(galil_rio):
## Test read of all channels
###########
assert galil_rio.an_ch0._READ_TIMEOUT == 0.1 # Default read timeout of 100ms
assert galil_rio.analog_in.ch0._readback_timeout == 0.1 # Default read timeout of 100ms
# Mock the socket to return specific values
galil_rio.controller.sock.buffer_recv = [b" 1.234 2.345 3.456 4.567 5.678 6.789 7.890 8.901"]
galil_rio._last_readback = 0 # Force read from controller
analog_bufffer = b" 1.234 2.345 3.456 4.567 5.678 6.789 7.890 8.901\r\n"
galil_rio.controller.sock.buffer_recv = [] # Clear any existing buffer
galil_rio.controller.sock.buffer_recv.append(analog_bufffer)
read_values = galil_rio.read()
assert len(read_values) == 8 # 8 channels
expected_values = {
galil_rio.an_ch0.name: {"value": 1.234},
galil_rio.an_ch1.name: {"value": 2.345},
galil_rio.an_ch2.name: {"value": 3.456},
galil_rio.an_ch3.name: {"value": 4.567},
galil_rio.an_ch4.name: {"value": 5.678},
galil_rio.an_ch5.name: {"value": 6.789},
galil_rio.an_ch6.name: {"value": 7.890},
galil_rio.an_ch7.name: {"value": 8.901},
galil_rio.analog_in.ch0.name: {"value": 1.234},
galil_rio.analog_in.ch1.name: {"value": 2.345},
galil_rio.analog_in.ch2.name: {"value": 3.456},
galil_rio.analog_in.ch3.name: {"value": 4.567},
galil_rio.analog_in.ch4.name: {"value": 5.678},
galil_rio.analog_in.ch5.name: {"value": 6.789},
galil_rio.analog_in.ch6.name: {"value": 7.890},
galil_rio.analog_in.ch7.name: {"value": 8.901},
}
# All timestamps should be the same
assert all(
ret["timestamp"] == read_values[galil_rio.an_ch0.name]["timestamp"]
ret["timestamp"] == read_values[galil_rio.analog_in.ch0.name]["timestamp"]
for signal_name, ret in read_values.items()
)
# Check values
@@ -301,7 +306,7 @@ def test_galil_rio_signal_read(galil_rio):
# Check communication command to socker
assert galil_rio.controller.sock.buffer_put == [
b"MG@AN[0],@AN[1],@AN[2],@AN[3],@AN[4],@AN[5],@AN[6],@AN[7]\r"
b"MG@AN[0], @AN[1], @AN[2], @AN[3], @AN[4], @AN[5], @AN[6], @AN[7]\r"
]
###########
@@ -313,11 +318,11 @@ def test_galil_rio_signal_read(galil_rio):
def value_callback(value, old_value, **kwargs):
obj = kwargs.get("obj")
galil = obj.parent
galil = obj.parent.parent
readback = galil.read()
value_callback_buffer.append(readback)
galil_rio.an_ch0.subscribe(value_callback, run=False)
galil_rio.analog_in.ch0.subscribe(value_callback, run=False)
galil_rio.controller.sock.buffer_recv = [b" 2.5 2.6 2.7 2.8 2.9 3.0 3.1 3.2"]
expected_values = [2.5, 2.6, 2.7, 2.8, 2.9, 3.0, 3.1, 3.2]
@@ -327,13 +332,15 @@ def test_galil_rio_signal_read(galil_rio):
# Should have used the cached value
for walk in galil_rio.walk_signals():
walk.item._READ_TIMEOUT = 10 # Make sure cached read is used
ret = galil_rio.an_ch0.read()
walk.item._readback_timeout = 10 # Make sure cached read is used
ret = galil_rio.analog_in.ch0.read()
# Should not trigger callback since value did not change
assert np.isclose(ret[galil_rio.an_ch0.name]["value"], 1.234)
assert np.isclose(ret[galil_rio.analog_in.ch0.name]["value"], 1.234)
# Same timestamp as for another channel as this is cached read
assert np.isclose(ret[galil_rio.an_ch0.name]["timestamp"], galil_rio.an_ch7.timestamp)
assert np.isclose(
ret[galil_rio.analog_in.ch0.name]["timestamp"], galil_rio.analog_in.ch7.timestamp
)
assert len(value_callback_buffer) == 0
##################
@@ -341,10 +348,10 @@ def test_galil_rio_signal_read(galil_rio):
##################
# Now force a read from the controller
galil_rio._last_readback = 0 # Force read from controller
ret = galil_rio.an_ch0.read()
galil_rio.analog_in.ch0._last_readback = 0 # Force read from controller
ret = galil_rio.analog_in.ch0.read()
assert np.isclose(ret[galil_rio.an_ch0.name]["value"], 2.5)
assert np.isclose(ret[galil_rio.analog_in.ch0.name]["value"], 2.5)
# Check callback invocation, but only 1 callback even with galil_rio.read() call in callback
assert len(value_callback_buffer) == 1
@@ -352,7 +359,45 @@ def test_galil_rio_signal_read(galil_rio):
assert np.isclose(values, expected_values).all()
assert all(
[
value["timestamp"] == value_callback_buffer[0][galil_rio.an_ch0.name]["timestamp"]
value["timestamp"]
== value_callback_buffer[0][galil_rio.analog_in.ch0.name]["timestamp"]
for value in value_callback_buffer[0].values()
]
)
def test_galil_rio_digital_out_signal(galil_rio):
"""
Test that the Galil RIO digital output signal can be set correctly.
"""
## Test Read from digital output channels
buffer_receive = []
excepted_put_buffer = []
for ii in range(galil_rio.digital_out.ch0._NUM_DIGITAL_OUTPUT_CHANNELS):
cmd = f"MG@OUT[{ii}]\r".encode()
excepted_put_buffer.append(cmd)
recv = " 1.000".encode()
buffer_receive.append(recv)
galil_rio.controller.sock.buffer_recv = buffer_receive # Mock response for readback
digital_read = galil_rio.read_configuration() # Read to populate readback values
for walk in galil_rio.digital_out.walk_signals():
assert np.isclose(digital_read[walk.item.name]["value"], 1.0)
assert galil_rio.controller.sock.buffer_put == excepted_put_buffer
# Test writing to digital output channels
galil_rio.controller.sock.buffer_put = [] # Clear buffer put
galil_rio.controller.sock.buffer_recv = [b":"] # Mock response for readback
# Set digital output channel 0 to high
galil_rio.digital_out.ch0.put(1)
assert galil_rio.controller.sock.buffer_put == [b"SB0\r"]
# Set digital output channel 0 to low
galil_rio.controller.sock.buffer_put = [] # Clear buffer put
galil_rio.controller.sock.buffer_recv = [b":"] # Mock response for readback
galil_rio.digital_out.ch0.put(0)
assert galil_rio.controller.sock.buffer_put == [b"CB0\r"]