Compare commits

...

9 Commits

Author SHA1 Message Date
x01dc
54f1f42332 added tomo type 2 and 3, for golden ratio
Some checks failed
CI for csaxs_bec / test (pull_request) Successful in 1m37s
CI for csaxs_bec / test (push) Has been cancelled
2026-02-21 11:52:32 +01:00
x01dc
48df15f35c added rt controller commands to lamni namespace
All checks were successful
CI for csaxs_bec / test (push) Successful in 1m38s
2026-02-21 11:38:34 +01:00
x01dc
6f60bd4b2b first commit, getting lamni running and adding some missing features in controller rt and galil
All checks were successful
CI for csaxs_bec / test (push) Successful in 1m45s
2026-02-20 14:39:22 +01:00
5d97913956 refactor(galil-rio): Improve docstring and general integration
All checks were successful
CI for csaxs_bec / test (pull_request) Successful in 1m31s
CI for csaxs_bec / test (push) Successful in 1m32s
2026-02-16 17:48:35 +01:00
93384b87e0 fix(readback-timeout): fix monkeypatch for readback timeout; closes #140
Some checks failed
CI for csaxs_bec / test (push) Failing after 1m29s
CI for csaxs_bec / test (pull_request) Failing after 1m29s
2026-02-16 17:38:10 +01:00
9a249363fd refactor(galil-rio): fix socket-signal cached readings 2026-02-16 17:38:10 +01:00
f925a7c1db fix(galilsignalbase): fix root access for controller from parent.
Some checks failed
CI for csaxs_bec / test (push) Failing after 1m29s
CI for csaxs_bec / test (pull_request) Failing after 1m33s
2026-02-16 14:57:02 +01:00
5811e445fe tests: fix tests after refactoring 2026-02-16 14:57:02 +01:00
16ea7f410e feat(galil-rio): Add di_out channels to GalilRIO 2026-02-16 14:57:02 +01:00
10 changed files with 814 additions and 199 deletions

View File

@@ -6,6 +6,8 @@ from rich.console import Console
from rich.table import Table
from csaxs_bec.bec_ipython_client.plugins.cSAXS import epics_put, fshclose
from csaxs_bec.bec_ipython_client.plugins.omny.omny_general_tools import OMNYTools
# import builtins to avoid linter errors
dev = builtins.__dict__.get("dev")
@@ -18,25 +20,32 @@ class LamNIInitError(Exception):
class LaMNIInitStagesMixin:
def __init__(self, client):
super().__init__()
self.client = client
self.OMNYTools = OMNYTools(self.client)
def lamni_init_stages(self):
user_input = input("Starting initialization of LamNI stages. OK? [y/n]")
if user_input == "y":
if self.OMNYTools.yesno("Start initialization of LamNI stages. OK?"):
print("staring...")
dev.lsamrot.enabled = True
else:
return
if self.check_all_axes_of_lamni_referenced():
user_input = input("Continue anyways? [y/n]")
if user_input == "y":
if self.OMNYTools.yesno("All axes are referenced. Continue anyways?"):
print("ok then...")
else:
return
axis_id_lsamrot = dev.lsamrot._config["deviceConfig"].get("axis_Id")
if dev.lsamrot.controller.get_motor_limit_switch(axis_id_lsamrot)[1] == False:
user_input = input("The rotation stage will be moved to one limit [y/n]")
if user_input == "y":
if self.OMNYTools.yesno("The rotation stage will be moved to one limit"):
print("starting...")
else:
return
@@ -47,10 +56,7 @@ class LaMNIInitStagesMixin:
print("The controller will be disabled in bec. To enable dev.lsamrot.enabled=True")
return
user_input = input(
"Init of loptz. Can the stage move to the upstream limit without collision?? [y/n]"
)
if user_input == "y":
if self.OMNYTools.yesno("Init of loptz. Can the stage move to the upstream limit without collision?"):
print("ok then...")
else:
return
@@ -81,8 +87,7 @@ class LaMNIInitStagesMixin:
time.sleep(0.1)
self.find_reference_mark(dev.lsamrot)
user_input = input("Init of leye. Can the stage move to -x limit without collision? [y/n]")
if user_input == "y":
if self.OMNYTools.yesno("Init of leye. Can the stage move to -x limit without collision?"):
print("starting...")
else:
return
@@ -134,8 +139,7 @@ class LaMNIInitStagesMixin:
return ord(axis_id.lower()) - 97
def _align_setup(self):
user_input = input("Start moving stages to default initial positions? [y/n]")
if user_input == "y":
if self.OMNYTools.yesno("Start moving stages to default initial positions?"):
print("Start moving stages...")
else:
print("Stopping.")

View File

@@ -15,13 +15,19 @@ from bec_lib.pdf_writer import PDFWriter
from typeguard import typechecked
from csaxs_bec.bec_ipython_client.plugins.cSAXS import epics_get, epics_put, fshopen
from csaxs_bec.bec_ipython_client.plugins.omny.omny_general_tools import OMNYTools
from .lamni_optics_mixin import LaMNIInitStagesMixin, LamNIOpticsMixin
logger = bec_logger.logger
bec = builtins.__dict__.get("bec")
if builtins.__dict__.get("bec") is not None:
bec = builtins.__dict__.get("bec")
dev = builtins.__dict__.get("dev")
umv = builtins.__dict__.get("umv")
umvr = builtins.__dict__.get("umvr")
class XrayEyeAlign:
# pixel calibration, multiply to get mm
# PIXEL_CALIBRATION = 0.2/209 #.2 with binning
@@ -510,8 +516,9 @@ class LamNI(LamNIOpticsMixin):
def __init__(self, client):
super().__init__()
self.client = client
self.device_manager = client.device_manager
self.align = XrayEyeAlign(client, self)
self.init = LaMNIInitStagesMixin()
self.init = LaMNIInitStagesMixin(client)
self.check_shutter = True
self.check_light_available = True
self.check_fofb = True
@@ -524,6 +531,16 @@ class LamNI(LamNIOpticsMixin):
self._beam_is_okay = True
self._stop_beam_check_event = None
self.beam_check_thread = None
self.OMNYTools = OMNYTools(self.client)
# Progress tracking
self.progress = {}
self.progress["tomo_type"] = "Equally spaced sub-tomograms"
self.progress["subtomo"] = 0
self.progress["subtomo_projection"] = 0
self.progress["subtomo_total_projections"] = 1
self.progress["projection"] = 0
self.progress["total_projections"] = 1
self.progress["angle"] = 0
def get_beamline_checks_enabled(self):
print(
@@ -598,12 +615,12 @@ class LamNI(LamNIOpticsMixin):
if val == 1:
# equally spaced tomography with 8 sub tomograms
self.client.set_global_var("tomo_type", val)
# elif val == 2:
# # golden ratio tomography (sorted bunches)
# self.client.set_global_var("tomo_type", val)
# elif val == 3:
# # equally spaced tomography with starting angles shifted by golden ratio
# self.client.set_global_var("tomo_type", val)
elif val == 2:
# golden ratio tomography (sorted bunches)
self.client.set_global_var("tomo_type", val)
elif val == 3:
# equally spaced tomography with starting angles shifted by golden ratio
self.client.set_global_var("tomo_type", val)
else:
raise ValueError("Unknown tomo_type.")
@@ -740,6 +757,41 @@ class LamNI(LamNIOpticsMixin):
def tomo_stitch_overlap(self, val: float):
self.client.set_global_var("tomo_stitch_overlap", val)
@property
def golden_max_number_of_projections(self):
val = self.client.get_global_var("golden_max_number_of_projections")
if val is None:
return 1000.0
return val
@golden_max_number_of_projections.setter
def golden_max_number_of_projections(self, val: float):
self.client.set_global_var("golden_max_number_of_projections", val)
@property
def golden_ratio_bunch_size(self):
val = self.client.get_global_var("golden_ratio_bunch_size")
if val is None:
return 20
return val
@golden_ratio_bunch_size.setter
def golden_ratio_bunch_size(self, val: float):
if val < 20:
raise ValueError("golden_ratio_bunch_size must be at least 20.")
self.client.set_global_var("golden_ratio_bunch_size", val)
@property
def golden_projections_at_0_deg_for_damage_estimation(self):
val = self.client.get_global_var("golden_projections_at_0_deg_for_damage_estimation")
if val is None:
return 0
return val
@golden_projections_at_0_deg_for_damage_estimation.setter
def golden_projections_at_0_deg_for_damage_estimation(self, val: float):
self.client.set_global_var("golden_projections_at_0_deg_for_damage_estimation", val)
@property
def sample_name(self):
val = self.client.get_global_var("sample_name")
@@ -909,6 +961,49 @@ class LamNI(LamNIOpticsMixin):
except Exception:
logger.warning("Failed to send update to SciLog.")
def rt_off(self):
dev.rtx.enabled = False
dev.rty.enabled = False
def rt_on(self):
dev.rtx.enabled = True
dev.rty.enabled = True
if dev.rtx.enabled == True:
print("rt is enabled")
else:
print("failed to enable rt")
def feedback_enable_with_reset(self):
self.device_manager.devices.rtx.controller.feedback_enable_with_reset()
self.feedback_status()
def feedback_enable_without_reset(self):
self.device_manager.devices.rtx.controller.feedback_enable_without_reset()
self.feedback_status()
def feedback_disable(self):
self.device_manager.devices.rtx.controller.feedback_disable()
self.feedback_status()
def feedback_disable_and_reset_angle(self):
self.device_manager.devices.rtx.controller.feedback_disable_and_even_reset_lamni_angle_interferometer()
self.feedback_status()
def feedback_status(self):
if self.device_manager.devices.rtx.controller.feedback_is_running():
print("The rt feedback is \x1b[92mrunning\x1b[0m.")
else:
print("The rt feedback is \x1b[91mNOT\x1b[0m running.")
def show_interferometer_positions(self):
self.device_manager.devices.rtx.controller.show_interferometer_positions()
def show_signal_strength(self):
self.device_manager.devices.rtx.controller.show_signal_strength_interferometer()
def show_analog_signals(self):
return self.device_manager.devices.rtx.controller.show_analog_signals()
def add_sample_database(
self, samplename, date, eaccount, scan_number, setup, sample_additional_info, user
):
@@ -935,9 +1030,8 @@ class LamNI(LamNIOpticsMixin):
# self.tomo_scan_projection(angle)
# self.tomo_reconstruct()
def sub_tomo_scan(self, subtomo_number, start_angle=None):
"""start a subtomo"""
dev = builtins.__dict__.get("dev")
def _write_subtomo_to_scilog(self, subtomo_number):
"""Write subtomo start information to scilog."""
bec = builtins.__dict__.get("bec")
if self.tomo_id > 0:
tags = ["BEC_subtomo", self.sample_name, f"tomo_id_{self.tomo_id}"]
@@ -948,6 +1042,10 @@ class LamNI(LamNIOpticsMixin):
tags,
)
def sub_tomo_scan(self, subtomo_number, start_angle=None):
"""start a subtomo"""
self._write_subtomo_to_scilog(subtomo_number)
if start_angle is None:
if subtomo_number == 1:
start_angle = 0
@@ -980,13 +1078,14 @@ class LamNI(LamNIOpticsMixin):
if not (subtomo_number % 2):
angles = np.flip(angles)
for angle in angles:
self.progress["tomo_type"] = "Equally spaced sub-tomograms"
self.progress["subtomo"] = subtomo_number
self.progress["subtomo_projection"] = angles.index(angle)
self.progress["subtomo_total_projections"] = 180 / self.tomo_angle_stepsize
self.progress["subtomo_projection"] = np.where(angles == angle)[0][0]
self.progress["subtomo_total_projections"] = 360 / self.tomo_angle_stepsize
self.progress["projection"] = (subtomo_number - 1) * self.progress[
"subtomo_total_projections"
] + self.progress["subtomo_projection"]
self.progress["total_projections"] = 180 / self.tomo_angle_stepsize * 8
self.progress["total_projections"] = 360 / self.tomo_angle_stepsize * 8
self.progress["angle"] = angle
self._tomo_scan_at_angle(angle, subtomo_number)
@@ -1049,6 +1148,11 @@ class LamNI(LamNIOpticsMixin):
for scan_nr in range(start_scan_number, end_scan_number):
self._write_tomo_scan_number(scan_nr, angle, subtomo_number)
if self._was_beam_okay() and not error_caught:
successful = True
else:
self._wait_for_beamline_checks()
def _write_tomo_scan_number(self, scan_number: int, angle: float, subtomo_number: int) -> None:
tomo_scan_numbers_file = os.path.expanduser(
"~/Data10/specES1/dat-files/tomography_scannumbers.txt"
@@ -1059,13 +1163,74 @@ class LamNI(LamNIOpticsMixin):
f"{scan_number} {angle} {dev.lsamrot.read()['lsamrot']['value']:.3f} {self.tomo_id} {subtomo_number} {0} {'lamni'}\n"
)
def tomo_scan(self, subtomo_start=1, start_angle=None):
"""start a tomo scan"""
def _golden(self, ii, howmany_sorted, maxangle=360, reverse=False):
"""Return the ii-th golden ratio angle within sorted bunches of size howmany_sorted,
and its subtomo number. Operates over maxangle degrees (360 for LamNI)."""
golden = []
for iji in range(
(ii - (ii % howmany_sorted)), (ii - (ii % howmany_sorted)) + howmany_sorted, 1
):
golden.append(
((iji * maxangle * (1 + pow(5, 0.5)) / 2) * 1000 % (maxangle * 1000)) / 1000
)
golden.sort()
subtomo_number = int(ii / howmany_sorted) + 1
if reverse and not subtomo_number % 2:
golden.reverse()
return (golden[ii % howmany_sorted], subtomo_number)
def _golden_equally_spaced(self, ii, number_of_projections_per_subtomo, maxangle=360, reverse=True, verbose=False):
"""Return angles for equally spaced tomography with sub-tomogram starting angles
shifted according to the golden ratio. Operates over maxangle degrees (360 for LamNI).
ii is the projection number starting at 0."""
angular_step = maxangle / number_of_projections_per_subtomo
subtomo_number = int(((ii) * angular_step) / maxangle) + 1
start_angle = self._golden(subtomo_number - 1, 1, angular_step)[0]
projection_number_of_subtomo = (
ii - (subtomo_number - 1) * number_of_projections_per_subtomo
)
if reverse:
if subtomo_number % 2:
angle = start_angle + projection_number_of_subtomo * angular_step
else:
angle = (
start_angle
+ (number_of_projections_per_subtomo - 1) * angular_step
- projection_number_of_subtomo * angular_step
)
else:
angle = start_angle + projection_number_of_subtomo * angular_step
if verbose:
print(
f"Equally spaced golden ratio tomography.\n"
f"Angular step: {angular_step}\n"
f"Subtomo Number: {subtomo_number}\n"
f"Angle: {angle}"
)
return angle, subtomo_number
def tomo_scan(self, subtomo_start=1, start_angle=None, projection_number=None):
"""Start a tomo scan.
Args:
subtomo_start (int): For tomo_type 1, the sub-tomogram number to start from. Defaults to 1.
start_angle (float, optional): Override the starting angle of the first sub-tomogram. Defaults to None.
projection_number (int, optional): For tomo_types 2 and 3, resume from this projection index. Defaults to None.
"""
bec = builtins.__dict__.get("bec")
scans = builtins.__dict__.get("scans")
self._current_special_angles = self.special_angles.copy()
if self.tomo_type == 1 and subtomo_start == 1 and start_angle is None:
# Register a new tomo scan in the database and write the PDF report
# only when starting fresh (not resuming mid-scan)
if (
(self.tomo_type == 1 and subtomo_start == 1 and start_angle is None)
or (self.tomo_type == 2 and projection_number is None)
or (self.tomo_type == 3 and projection_number is None)
):
# pylint: disable=undefined-variable
self.tomo_id = self.add_sample_database(
self.sample_name,
@@ -1077,10 +1242,108 @@ class LamNI(LamNIOpticsMixin):
"BEC",
)
self.write_pdf_report()
with scans.dataset_id_on_hold:
for ii in range(subtomo_start, 9):
self.sub_tomo_scan(ii, start_angle=start_angle)
start_angle = None
if self.tomo_type == 1:
# 8 equally spaced sub-tomograms over 360 degrees
self.progress["tomo_type"] = "Equally spaced sub-tomograms"
for ii in range(subtomo_start, 9):
self.sub_tomo_scan(ii, start_angle=start_angle)
start_angle = None
elif self.tomo_type == 2:
# Golden ratio tomography (sorted bunches) over 360 degrees
self.progress["tomo_type"] = "Golden ratio tomography"
previous_subtomo_number = -1
ii = 0 if projection_number is None else projection_number
while True:
angle, subtomo_number = self._golden(
ii, self.golden_ratio_bunch_size, maxangle=360, reverse=True
)
if previous_subtomo_number != subtomo_number:
self._write_subtomo_to_scilog(subtomo_number)
if (
subtomo_number % 2 == 1
and ii > 10
and self.golden_projections_at_0_deg_for_damage_estimation == 1
):
self._tomo_scan_at_angle(0, subtomo_number)
previous_subtomo_number = subtomo_number
self.progress["subtomo"] = subtomo_number
self.progress["projection"] = ii
self.progress["angle"] = angle
if self.golden_ratio_bunch_size > 0:
self.progress["subtomo_total_projections"] = self.golden_ratio_bunch_size
self.progress["subtomo_projection"] = (
ii - (subtomo_number - 1) * self.golden_ratio_bunch_size
)
else:
self.progress["subtomo_total_projections"] = 0
self.progress["subtomo_projection"] = 0
self.progress["total_projections"] = (
self.golden_max_number_of_projections
if self.golden_max_number_of_projections > 0
else 0
)
self._tomo_scan_at_angle(angle, subtomo_number)
ii += 1
if (
self.golden_max_number_of_projections > 0
and ii > self.golden_max_number_of_projections
):
print(
f"Golden ratio tomography stopped automatically after the requested"
f" {self.golden_max_number_of_projections} projections."
)
break
elif self.tomo_type == 3:
# Equally spaced tomography with golden ratio starting angles over 360 degrees
self.progress["tomo_type"] = "Equally spaced, golden ratio starting angles"
previous_subtomo_number = -1
ii = 0 if projection_number is None else projection_number
while True:
angle, subtomo_number = self._golden_equally_spaced(
ii, int(360 / self.tomo_angle_stepsize), maxangle=360, reverse=True
)
if previous_subtomo_number != subtomo_number:
self._write_subtomo_to_scilog(subtomo_number)
if (
subtomo_number % 2 == 1
and ii > 10
and self.golden_projections_at_0_deg_for_damage_estimation == 1
):
self._tomo_scan_at_angle(0, subtomo_number)
previous_subtomo_number = subtomo_number
self.progress["subtomo"] = subtomo_number
self.progress["projection"] = ii
self.progress["angle"] = angle
self.progress["subtomo_total_projections"] = 360 / self.tomo_angle_stepsize
self.progress["subtomo_projection"] = (
ii - (subtomo_number - 1) * self.progress["subtomo_total_projections"]
)
self.progress["total_projections"] = (
self.golden_max_number_of_projections
if self.golden_max_number_of_projections > 0
else 0
)
self._tomo_scan_at_angle(angle, subtomo_number)
ii += 1
if (
self.golden_max_number_of_projections > 0
and ii > self.golden_max_number_of_projections
):
print(
f"Golden ratio tomography stopped automatically after the requested"
f" {self.golden_max_number_of_projections} projections."
)
break
else:
raise ValueError(f"Unknown tomo_type: {self.tomo_type}.")
def tomo_parameters(self):
"""print and update the tomo parameters"""
@@ -1103,9 +1366,37 @@ class LamNI(LamNIOpticsMixin):
print(f" _tomo_fovy_offset <mm> = {self.align.tomo_fovy_offset}")
print(f" _manual_shift_x <mm> = {self.manual_shift_x}")
print(f" _manual_shift_y <mm> = {self.manual_shift_y}")
print(f"Angular step within sub-tomogram: {self.tomo_angle_stepsize} degrees")
print(f"Resulting in number of projections: {360/self.tomo_angle_stepsize*8}")
print(f"Sample name: {self.sample_name}\n")
print("")
if self.tomo_type == 1:
print("\x1b[1mTomo type 1:\x1b[0m 8 equally spaced sub-tomograms")
print(f"Angular step within sub-tomogram: {self.tomo_angle_stepsize} degrees")
print(f"Resulting in number of projections: {360/self.tomo_angle_stepsize*8}")
elif self.tomo_type == 2:
print("\x1b[1mTomo type 2:\x1b[0m Golden ratio tomography")
print(f"Sorted in bunches of: {self.golden_ratio_bunch_size}")
if self.golden_max_number_of_projections > 0:
print(f"Ending after {self.golden_max_number_of_projections} projections.")
else:
print("Ending by manual interruption.")
if self.golden_projections_at_0_deg_for_damage_estimation == 1:
print(
"Repeating projections at 0 degrees at the beginning of every second subtomogram."
)
elif self.tomo_type == 3:
print(
"\x1b[1mTomo type 3:\x1b[0m Equally spaced tomography, golden ratio starting angle"
)
print(f"Angular step within sub-tomogram: {self.tomo_angle_stepsize} degrees")
print(f"Number of projections per sub-tomogram: {360/self.tomo_angle_stepsize}")
if self.golden_max_number_of_projections > 0:
print(f"Ending after {self.golden_max_number_of_projections} projections.")
else:
print("Ending by manual interruption.")
if self.golden_projections_at_0_deg_for_damage_estimation == 1:
print(
"Repeating projections at 0 degrees at the beginning of every second subtomogram."
)
print(f"\nSample name: {self.sample_name}\n")
user_input = input("Are these parameters correctly set for your scan? ")
if user_input == "y":
@@ -1125,13 +1416,61 @@ class LamNI(LamNIOpticsMixin):
self.ptycho_reconstruct_foldername = self._get_val(
"Reconstruction queue ", self.ptycho_reconstruct_foldername, str
)
tomo_numberofprojections = self._get_val(
"Number of projections", 360 / self.tomo_angle_stepsize * 8, int
)
print(f"The angular step will be {360/tomo_numberofprojections}")
self.tomo_angle_stepsize = 360 / tomo_numberofprojections * 8
print(f"The angular step in a subtomogram it will be {self.tomo_angle_stepsize}")
print("Tomography type:")
print(" 1: 8 equally spaced sub-tomograms (360 deg)")
print(" 2: Golden ratio tomography")
print(" 3: Equally spaced tomography, golden ratio starting angle")
self.tomo_type = self._get_val("Tomography type", self.tomo_type, int)
if self.tomo_type == 1:
tomo_numberofprojections = self._get_val(
"Number of projections", 360 / self.tomo_angle_stepsize * 8, int
)
print(f"The angular step will be {360/tomo_numberofprojections}")
self.tomo_angle_stepsize = 360 / tomo_numberofprojections * 8
print(f"The angular step in a subtomogram it will be {self.tomo_angle_stepsize}")
elif self.tomo_type == 2:
while True:
bunch_size = self._get_val(
"Number of projections sorted per bunch (minimum 20)",
self.golden_ratio_bunch_size,
int,
)
if bunch_size >= 20:
self.golden_ratio_bunch_size = bunch_size
break
print("Bunch size must be at least 20. Please try again.")
self.golden_max_number_of_projections = self._get_val(
"Stop after number of projections (0 for endless)",
self.golden_max_number_of_projections,
int,
)
self.golden_projections_at_0_deg_for_damage_estimation = self._get_val(
"Repeat projections at 0 deg every second subtomo 1/0?",
self.golden_projections_at_0_deg_for_damage_estimation,
int,
)
elif self.tomo_type == 3:
numprj = self._get_val(
"Number of projections per sub-tomogram",
int(360 / self.tomo_angle_stepsize),
int,
)
self.tomo_angle_stepsize = 360 / numprj
self.golden_max_number_of_projections = self._get_val(
"Stop after number of projections (0 for endless)",
self.golden_max_number_of_projections,
int,
)
self.golden_projections_at_0_deg_for_damage_estimation = self._get_val(
"Repeat projections at 0 deg every second subtomo 1/0?",
self.golden_projections_at_0_deg_for_damage_estimation,
int,
)
self.sample_name = self._get_val("sample name", self.sample_name, str)
@staticmethod
@@ -1197,6 +1536,7 @@ class LamNI(LamNIOpticsMixin):
(
f"{'Angular step within sub-tomogram:':<{padding}}{self.tomo_angle_stepsize:>{padding}.2f}\n"
),
f"{'Tomo type:':<{padding}}{self.tomo_type:>{padding}}\n",
]
content = "".join(content)
user_target = os.path.expanduser(f"~/Data10/documentation/tomo_scan_ID_{self.tomo_id}.pdf")
@@ -1389,3 +1729,4 @@ class DataDrivenLamNI(LamNI):
shapes.append(data.shape)
if len(set(shapes)) > 1:
raise ValueError(f"Tomo data file has entries of inconsistent lengths: {shapes}.")

View File

@@ -35,32 +35,32 @@ class FlomniInitError(Exception):
class FlomniError(Exception):
pass
class FlomniTools:
def yesno(self, message: str, default="none", autoconfirm=0) -> bool:
if autoconfirm and default == "y":
self.printgreen(message + " Automatically confirming default: yes")
return True
elif autoconfirm and default == "n":
self.printgreen(message + " Automatically confirming default: no")
return False
if default == "y":
message_ending = " [Y]/n? "
elif default == "n":
message_ending = " y/[N]? "
else:
message_ending = " y/n? "
while True:
user_input = input(self.OKBLUE + message + message_ending + self.ENDC)
if (
user_input == "Y" or user_input == "y" or user_input == "yes" or user_input == "Yes"
) or (default == "y" and user_input == ""):
return True
if (
user_input == "N" or user_input == "n" or user_input == "no" or user_input == "No"
) or (default == "n" and user_input == ""):
return False
else:
print("Please expicitely confirm y or n.")
# class FlomniTools:
# def yesno(self, message: str, default="none", autoconfirm=0) -> bool:
# if autoconfirm and default == "y":
# self.printgreen(message + " Automatically confirming default: yes")
# return True
# elif autoconfirm and default == "n":
# self.printgreen(message + " Automatically confirming default: no")
# return False
# if default == "y":
# message_ending = " [Y]/n? "
# elif default == "n":
# message_ending = " y/[N]? "
# else:
# message_ending = " y/n? "
# while True:
# user_input = input(self.OKBLUE + message + message_ending + self.ENDC)
# if (
# user_input == "Y" or user_input == "y" or user_input == "yes" or user_input == "Yes"
# ) or (default == "y" and user_input == ""):
# return True
# if (
# user_input == "N" or user_input == "n" or user_input == "no" or user_input == "No"
# ) or (default == "n" and user_input == ""):
# return False
# else:
# print("Please expicitely confirm y or n.")

View File

@@ -4,6 +4,7 @@ This module contains the base class for Galil controllers as well as the signals
import functools
import time
from typing import Any
from bec_lib import bec_logger
from ophyd.utils import ReadOnlyError
@@ -347,7 +348,7 @@ class GalilSignalBase(SocketSignal):
def __init__(self, signal_name, **kwargs):
self.signal_name = signal_name
super().__init__(**kwargs)
self.controller = self.parent.controller
self.controller = self.root.controller if hasattr(self.root, "controller") else None
class GalilSignalRO(GalilSignalBase):

View File

@@ -6,24 +6,26 @@ Link to the Galil RIO vendor page:
https://www.galil.com/plcs/remote-io/rio-471xx
This module provides the GalilRIOController for communication with the RIO controller
over TCP/IP. It also provides a device integration that interfaces to these
8 analog channels.
over TCP/IP. It also provides a device integration that interfaces to its
8 analog channels, and 16 digital output channels. Some PLCs may have 24 digital output channels,
which can be easily supported by changing the _NUM_DIGITAL_OUTPUT_CHANNELS variable.
"""
from __future__ import annotations
import time
from typing import TYPE_CHECKING
from typing import TYPE_CHECKING, Literal
from bec_lib.logger import bec_logger
from ophyd import Component as Cpt
from ophyd import DynamicDeviceComponent as DDC
from ophyd import Kind
from ophyd.utils import ReadOnlyError
from ophyd_devices import PSIDeviceBase
from ophyd_devices.utils.controller import Controller, threadlocked
from ophyd_devices.utils.socket import SocketIO
from csaxs_bec.devices.omny.galil.galil_ophyd import (
GalilCommunicationError,
GalilSignalRO,
GalilSignalBase,
retry_once,
)
@@ -35,15 +37,11 @@ logger = bec_logger.logger
class GalilRIOController(Controller):
"""
Controller Class for Galil RIO controller communication.
Multiple controllers are in use at the cSAXS beamline:
- 129.129.98.64 (port 23)
"""
"""Controller Class for Galil RIO controller communication."""
@threadlocked
def socket_put(self, val: str) -> None:
"""Socker put method."""
self.sock.put(f"{val}\r".encode())
@retry_once
@@ -64,21 +62,95 @@ class GalilRIOController(Controller):
)
class GalilRIOSignalRO(GalilSignalRO):
class GalilRIOAnalogSignalRO(GalilSignalBase):
"""
Read-only Signal for reading a single analog input channel from the Galil RIO controller.
It always read all 8 analog channels at once, and updates the reabacks of all channels.
New readbacks are only fetched from the controller if the last readback is older than
_READ_TIMEOUT seconds, otherwise the last cached readback is returned to reduce network traffic.
Signal for reading analog input channels of the Galil RIO controller. This signal is read-only, so
the set method raises a ReadOnlyError. The get method retrieves the values of all analog
channels in a single socket command. The readback values of all channels are updated based
on the response, and subscriptions are run for all channels. Readings are cached as implemented
in the SocketSignal class, so that multiple reads of the same channel within an update cycle do
not result in multiple socket calls.
Args:
signal_name (str): Name of the signal.
channel (int): Analog channel number (0-7).
parent (GalilRIO): Parent GalilRIO device.
signal_name (str): The name of the signal, e.g. "ch0", "ch1", ..., "ch7"
channel (int): The channel number corresponding to the signal, e.g. 0 for "ch0", 1 for "ch1", ...
parent (GalilRIO): The parent device instance that this signal belongs to.
"""
_NUM_ANALOG_CHANNELS = 8
_READ_TIMEOUT = 0.1 # seconds
def __init__(self, signal_name: str, channel: int, parent: GalilRIO, **kwargs):
super().__init__(signal_name=signal_name, parent=parent, **kwargs)
self._channel = channel
self._metadata["connected"] = False
self._metadata["write_access"] = False
def _socket_set(self, val):
"""Read-only signal, so set method raises an error."""
raise ReadOnlyError(f"Signal {self.name} is read-only.")
def _socket_get(self) -> float:
"""Get command for the readback signal"""
cmd = "MG@" + ", @".join([f"AN[{ii}]" for ii in range(self._NUM_ANALOG_CHANNELS)])
ret = self.controller.socket_put_and_receive(cmd)
values = [float(val) for val in ret.strip().split(" ")]
# Run updates for all channels. This also updates the _readback and metadata timestamp
# value of this channel.
self._update_all_channels(values)
return self._readback
# pylint: disable=protected-access
def _update_all_channels(self, values: list[float]) -> None:
"""
Method to receive a list of readback values for channels 0 to 7. Updates for each channel idx
are applied to the corresponding GalilRIOAnalogSignalRO signal with matching attr_name "ch{idx}".
We also update the _last_readback attribute of each of the signals, to avoid multiple socket calls,
but rather use the cached value of the combined reading for all channels.
Args:
values (list[float]): List of new readback values for all channels, where the
index corresponds to the channel number (0-7).
"""
updates: dict[str, tuple[float, float]] = {} # attr_name -> (new_val, old_val)
# Update all readbacks first
for walk in self.parent.walk_signals():
if isinstance(walk.item, GalilRIOAnalogSignalRO):
idx = int(walk.item.attr_name[-1])
if 0 <= idx < len(values):
old_val = walk.item._readback
new_val = values[idx]
walk.item._metadata["timestamp"] = self._last_readback
walk.item._last_readback = self._last_readback
walk.item._readback = new_val
if (
idx != self._channel
): # Only run subscriptions on other channels, not on itself
# as this is handled by the SocketSignal and we want to avoid running multiple
# subscriptions for the same channel update
updates[walk.item.attr_name] = (new_val, old_val)
else:
logger.warning(
f"Received {len(values)} values but found channel index {idx} in signal {walk.item.name}. Skipping update for this signal."
)
# Run subscriptions after all readbacks have been updated
# on all channels except the one that triggered the update
for walk in self.parent.walk_signals():
if walk.item.attr_name in updates:
new_val, old_val = updates[walk.item.attr_name]
walk.item._run_subs(
sub_type=walk.item.SUB_VALUE,
old_value=old_val,
value=new_val,
timestamp=self._last_readback,
)
class GalilRIODigitalOutSignal(GalilSignalBase):
"""Signal for controlling digital outputs of the Galil RIO controller."""
_NUM_DIGITAL_OUTPUT_CHANNELS = 16
def __init__(self, signal_name: str, channel: int, parent: GalilRIO, **kwargs):
super().__init__(signal_name, parent=parent, **kwargs)
@@ -87,81 +159,83 @@ class GalilRIOSignalRO(GalilSignalRO):
def _socket_get(self) -> float:
"""Get command for the readback signal"""
cmd = "MG@" + ",@".join([f"AN[{ii}]" for ii in range(self._NUM_ANALOG_CHANNELS)])
cmd = f"MG@OUT[{self._channel}]"
ret = self.controller.socket_put_and_receive(cmd)
values = [float(val) for val in ret.strip().split(" ")]
# This updates all channels' readbacks, including self._readback
self._update_all_channels(values)
self._readback = float(ret.strip())
return self._readback
def get(self):
"""Get current analog channel values from the Galil RIO controller."""
# If the last readback has happend more than _READ_TIMEOUT seconds ago, read all channels again
if time.monotonic() - self.parent.last_readback > self._READ_TIMEOUT:
self._readback = self._socket_get()
return self._readback
def _socket_set(self, val: Literal[0, 1]) -> None:
"""Set command for the digital output signal. Value should be 0 or 1."""
# pylint: disable=protected-access
def _update_all_channels(self, values: list[float]) -> None:
"""
Update all analog channel readbacks based on the provided list of values.
List of values must be in order from an_ch0 to an_ch7.
if val not in (0, 1):
raise ValueError("Digital output value must be 0 or 1.")
cmd = f"SB{self._channel}" if val == 1 else f"CB{self._channel}"
self.controller.socket_put_confirmed(cmd)
We first have to update the _last_readback timestamp of the GalilRIO parent device.
Then we update all readbacks of all an_ch channels, before we run any subscriptions.
This ensures that all readbacks are updated before any subscriptions are run, which
may themselves read other channels.
Args:
values (list[float]): List of 8 float values corresponding to the analog channels.
They must be in order from an_ch0 to an_ch7.
"""
timestamp = time.time()
# Update parent's last readback before running subscriptions!!
self.parent._last_readback = time.monotonic()
updates: dict[str, tuple[float, float]] = {} # attr_name -> (new_val, old_val)
# Update all readbacks first
for walk in self.parent.walk_signals():
if walk.item.attr_name.startswith("an_ch"):
idx = int(walk.item.attr_name[-1])
if 0 <= idx < len(values):
old_val = walk.item._readback
new_val = values[idx]
walk.item._metadata["timestamp"] = timestamp
walk.item._readback = new_val
updates[walk.item.attr_name] = (new_val, old_val)
def _create_analog_channels(num_channels: int) -> dict[str, tuple]:
"""
Helper method to create a dictionary of analog channel definitions for the DynamicDeviceComponent.
# Run subscriptions after all readbacks have been updated
for walk in self.parent.walk_signals():
if walk.item.attr_name in updates:
new_val, old_val = updates[walk.item.attr_name]
walk.item._run_subs(
sub_type=walk.item.SUB_VALUE,
old_value=old_val,
value=new_val,
timestamp=timestamp,
)
Args:
num_channels (int): The number of analog channels to create.
"""
an_channels = {}
for i in range(0, num_channels):
an_channels[f"ch{i}"] = (
GalilRIOAnalogSignalRO,
f"ch{i}",
{"kind": Kind.normal, "notify_bec": True, "channel": i, "doc": f"Analog channel {i}."},
)
return an_channels
def _create_digital_output_channels(num_channels: int) -> dict[str, tuple]:
"""
Helper method to create a dictionary of digital output channel definitions for the DynamicDeviceComponent.
Args:
num_channels (int): The number of digital output channels to create.
"""
di_out_channels = {}
for i in range(0, num_channels):
di_out_channels[f"ch{i}"] = (
GalilRIODigitalOutSignal,
f"ch{i}",
{
"kind": Kind.config,
"notify_bec": True,
"channel": i,
"doc": f"Digital output channel {i}.",
},
)
return di_out_channels
class GalilRIO(PSIDeviceBase):
"""
Galil RIO controller integration with 8 analog input channels. To implement the device,
please provide the appropriate host and port (default port is 23).
Galil RIO controller integration with 16 digital output channels and 8 analog input channels.
The default port for the controller is 23.
Args:
host (str): Hostname or IP address of the Galil RIO controller.
port (int, optional): Port number for the TCP/IP connection. Defaults to 23.
socket_cls (type[SocketIO], optional): Socket class to use for communication. Defaults to SocketIO.
scan_info (ScanInfo, optional): ScanInfo object for the device.
device_manager (DeviceManagerDS): The device manager instance that manages this device.
**kwargs: Additional keyword arguments passed to the PSIDeviceBase constructor.
"""
SUB_CONNECTION_CHANGE = "connection_change"
an_ch0 = Cpt(GalilRIOSignalRO, signal_name="an_ch0", channel=0, doc="Analog input channel 0")
an_ch1 = Cpt(GalilRIOSignalRO, signal_name="an_ch1", channel=1, doc="Analog input channel 1")
an_ch2 = Cpt(GalilRIOSignalRO, signal_name="an_ch2", channel=2, doc="Analog input channel 2")
an_ch3 = Cpt(GalilRIOSignalRO, signal_name="an_ch3", channel=3, doc="Analog input channel 3")
an_ch4 = Cpt(GalilRIOSignalRO, signal_name="an_ch4", channel=4, doc="Analog input channel 4")
an_ch5 = Cpt(GalilRIOSignalRO, signal_name="an_ch5", channel=5, doc="Analog input channel 5")
an_ch6 = Cpt(GalilRIOSignalRO, signal_name="an_ch6", channel=6, doc="Analog input channel 6")
an_ch7 = Cpt(GalilRIOSignalRO, signal_name="an_ch7", channel=7, doc="Analog input channel 7")
#############################
### Analog input channels ###
#############################
analog_in = DDC(_create_analog_channels(GalilRIOAnalogSignalRO._NUM_ANALOG_CHANNELS))
digital_out = DDC(
_create_digital_output_channels(GalilRIODigitalOutSignal._NUM_DIGITAL_OUTPUT_CHANNELS)
)
def __init__(
self,
@@ -177,19 +251,18 @@ class GalilRIO(PSIDeviceBase):
if port is None:
port = 23 # Default port for Galil RIO controller
self.controller = GalilRIOController(
socket_cls=socket_cls, socket_host=host, socket_port=port, device_manager=device_manager
name=f"GalilRIOController_{name}",
socket_cls=socket_cls,
socket_host=host,
socket_port=port,
device_manager=device_manager,
)
self._last_readback: float = time.monotonic()
self._readback_metadata: dict[str, float] = {"last_readback": 0.0}
super().__init__(name=name, device_manager=device_manager, scan_info=scan_info, **kwargs)
self.controller.subscribe(
self._update_connection_state, event_type=self.SUB_CONNECTION_CHANGE
)
@property
def last_readback(self) -> float:
"""Return the time of the last readback from the controller."""
return self._last_readback
# pylint: disable=arguments-differ
def wait_for_connection(self, timeout: float = 30.0) -> None:
"""Wait for the RIO controller to be connected within timeout period."""
@@ -207,7 +280,7 @@ class GalilRIO(PSIDeviceBase):
if __name__ == "__main__":
HOST_NAME = "129.129.98.64"
HOST_NAME = "129.129.122.14"
from bec_server.device_server.tests.utils import DMMock
dm = DMMock()

View File

@@ -25,6 +25,34 @@ logger = bec_logger.logger
class LamniGalilController(GalilController):
# ============================================================
# Error status
# ============================================================
caperr_bits = {
0x01: "Cap1 outside expected left-stop range (early check)",
0x02: "Cap2 outside expected left-stop range (early check)",
0x04: "Cap1 too low during pressure-off check (near right boundary)",
0x08: "Cap2 too low during pressure-off check (near right boundary)",
0x10: "Cap1 exceeded allowed left-stop boundary during movement",
0x20: "Cap2 exceeded allowed left-stop boundary during movement (disabled in code)",
0x40: "Cap1 did not respond to test movement",
0x80: "Cap2 did not respond to test movement"
}
allaxrer_table = {
1: "Not all axes referenced after reference search",
2: "Pressure-loss emergency stop (pressure 14/15 active while motor C off)",
3: "Unexpected pressure OFF while soft-limits not yet set",
4: "Pressure valve mismatch (OUT13=0 but IN13=1)",
5: "Capacitive sensor boundary violations (caperr > 0)",
6: "Emergency Stop triggered (IN[5]=0)",
7: "Following error detected on one or more axes"
}
USER_ACCESS = [
"describe",
"show_running_threads",
@@ -37,6 +65,8 @@ class LamniGalilController(GalilController):
"get_motor_limit_switch",
"is_motor_on",
"all_axes_referenced",
"lamni_lights_off",
"lamni_lights_on"
]
def show_status_other(self):
@@ -60,6 +90,47 @@ class LamniGalilController(GalilController):
print("There is air pressure at the outer rotation radial.")
swver = float(self.socket_put_and_receive("MGswver"))
print(f"Lgalil LAMNI firmware version {swver:2.0f}.")
allaxref = int(float(self.socket_put_and_receive("MGallaxref")))
print(f"Error statuts:")
if allaxref == 1:
print(f"Allaxref = 1, all OK.")
else:
print(f"Allaxref = {allaxref}. Not all axes are referenced or error introduced preventing motion.")
allaxrer = int(float(self.socket_put_and_receive("MGallaxrer")))
print("\nallaxrer =", allaxrer)
print(self.decode_allaxrer(allaxrer))
caperr = int(float(self.socket_put_and_receive("MGcaperr")))
print("\nDecoding caperr =", caperr)
self.visualize_caperr(caperr)
def decode_allaxrer(self, code: int) -> str:
"""Return human-readable meaning of allaxrer code."""
return self.allaxrer_table.get(code, "Unknown allaxrer code")
def visualize_caperr(self, mask: int):
"""Pretty-print a bitmask visualization for caperr."""
print("\n=== CAPERR BITMASK VISUALIZER ===")
print(f"Raw value: {mask} (0x{mask:02X})")
print("----------------------------------\n")
print("Bit | Hex | Active | Meaning")
print("----------------------------------")
for bit, meaning in self.caperr_bits.items():
active = "YES" if mask & bit else "no"
print(f"{bit:3d} | 0x{bit:02X} | {active:6} | {meaning}")
print("\nActive flags:")
active_flags = [meaning for bit, meaning in self.caperr_bits.items() if mask & bit]
if active_flags:
for f in active_flags:
print("", f)
else:
print(" (none)")
print("\n==================================\n")
def lamni_lights_off(self):
self.socket_put_confirmed("SB1")
@@ -93,7 +164,7 @@ class LamniGalilReadbackSignal(GalilSignalRO):
val = super().read()
if self.parent.axis_Id_numeric == 2:
try:
rt = self.parent.device_manager.devices[self.parent.rtx]
rt = self.parent.device_manager.devices[self.parent.rt]
if rt.enabled:
rt.obj.controller.set_rotation_angle(val[self.parent.name]["value"])
except KeyError:
@@ -147,7 +218,7 @@ class LamniGalilMotor(Device, PositionerBase):
raise BECConfigError(
"device_mapping has been specified but the device_manager cannot be accessed."
)
self.rt = self.device_mapping.get("rt")
self.rt = self.device_mapping.get("rt", "rtx")
super().__init__(
prefix,

View File

@@ -11,6 +11,7 @@ from ophyd.status import wait as status_wait
from ophyd.utils import LimitError, ReadOnlyError
from ophyd_devices.utils.controller import Controller, threadlocked
from ophyd_devices.utils.socket import SocketIO, SocketSignal, raise_if_disconnected
from prettytable import PrettyTable
from csaxs_bec.devices.omny.rt.rt_ophyd import RtCommunicationError, RtError
@@ -51,6 +52,7 @@ class RtLamniController(Controller):
_axes_per_controller = 3
USER_ACCESS = [
"socket_put_and_receive",
"socket_put",
"set_rotation_angle",
"feedback_disable",
"feedback_enable_without_reset",
@@ -62,6 +64,11 @@ class RtLamniController(Controller):
"_set_axis_velocity_maximum_speed",
"_position_sampling_single_read",
"_position_sampling_single_reset_and_start_sampling",
"show_signal_strength_interferometer",
"show_interferometer_positions",
"show_analog_signals",
"show_feedback_status",
]
def __init__(
@@ -208,8 +215,9 @@ class RtLamniController(Controller):
@threadlocked
def start_scan(self):
interferometer_feedback_not_running = int((self.socket_put_and_receive("J2")).split(",")[0])
if interferometer_feedback_not_running == 1:
# interferometer_feedback_not_running = int((self.socket_put_and_receive("J2")).split(",")[0])
# if interferometer_feedback_not_running == 1:
if not self.feedback_is_running():
logger.error(
"Cannot start scan because feedback loop is not running or there is an interferometer error."
)
@@ -270,6 +278,44 @@ class RtLamniController(Controller):
"average_lamni_angle": {"value": self.average_lamni_angle / (int(return_table[0]) + 1)},
}
return signals
def feedback_is_running(self) -> bool:
status = int(float((self.socket_put_and_receive("J2")).split(",")[0]))
return status == 0 # 0 means running, 1 means error/disabled
def show_feedback_status(self):
if self.feedback_is_running():
print("Loop is running, no error on interferometer.")
else:
print("Loop is not running, either it is turned off or an interferometer error occurred.")
def show_analog_signals(self) -> dict:
self.socket_put("As") # start sampling
time.sleep(0.01)
return_table = (self.socket_put_and_receive("Ar")).split(",")
number_of_samples = int(float(return_table[0]))
signals = {
"number_of_samples": number_of_samples,
"piezo_0": float(return_table[1]),
"piezo_1": float(return_table[2]),
"cap_0": float(return_table[3]),
"cap_1": float(return_table[4]),
"cap_2": float(return_table[5]),
"cap_3": float(return_table[6]),
"cap_4": float(return_table[7]),
}
t = PrettyTable()
t.title = f"LamNI Analog Signals ({number_of_samples} samples)"
t.field_names = ["Signal", "Value"]
for key, val in signals.items():
if key != "number_of_samples":
t.add_row([key, f"{val:.4f}"])
print(t)
return
def read_positions_from_sampler(self):
# this was for reading after the scan completed
@@ -347,6 +393,48 @@ class RtLamniController(Controller):
)
return bool(return_table[0])
def show_signal_strength_interferometer(self):
# trigger SSI averaging before reading
self.socket_put("J3")
time.sleep(0.05)
return_table = (self.socket_put_and_receive("J2")).split(",")
ssi_0 = float(return_table[1])
ssi_1 = float(return_table[2])
return_table_angle = (self.socket_put_and_receive("J7")).split(",")
angle_running = bool(int(float(return_table_angle[0])))
angle_position = float(return_table_angle[1])
angle_signal = float(return_table_angle[2])
t = PrettyTable()
t.title = "Interferometer signal strength"
t.field_names = ["Axis", "Description", "Value", "Running"]
t.add_row([0, "ST FZP horizontal", ssi_0, "-"])
t.add_row([1, "ST FZP vertical", ssi_1, "-"])
t.add_row([2, "Angle interferometer", angle_signal, angle_running])
print(t)
if angle_running:
print(f"Angle interferometer position: {angle_position:.4f} um")
else:
print("Warning: angle interferometer is not running.")
def show_interferometer_positions(self) -> dict:
return_table = (self.socket_put_and_receive("J4")).split(",")
loop_status = bool(int(float(return_table[0])))
pos_y = float(return_table[1])
pos_x = float(return_table[2])
t = PrettyTable()
t.title = "LamNI Interferometer Positions"
t.field_names = ["Axis", "Description", "Position (um)"]
t.add_row([0, "X", f"{pos_x:.4f}"])
t.add_row([1, "Y", f"{pos_y:.4f}"])
print(t)
print(f"Feedback loop running: {loop_status}")
return {"x": pos_x, "y": pos_y, "loop_running": loop_status}
def feedback_enable_with_reset(self):
if not self.feedback_status_angle_lamni():
self.feedback_disable_and_even_reset_lamni_angle_interferometer()

View File

@@ -1,14 +0,0 @@
"""
Conftest runs for all tests in this directory and subdirectories. Thereby, we know for
certain that the SocketSignal.READBACK_TIMEOUT is set to 0 for all tests, which prevents
hanging tests when a readback is attempted on a non-connected socket.
"""
# conftest.py
import pytest
from ophyd_devices.utils.socket import SocketSignal
@pytest.fixture(autouse=True)
def patch_socket_timeout(monkeypatch):
monkeypatch.setattr(SocketSignal, "READBACK_TIMEOUT", 0.0)

View File

@@ -2,6 +2,7 @@ from unittest import mock
import pytest
from ophyd_devices.tests.utils import SocketMock
from ophyd_devices.utils.socket import SocketSignal
from csaxs_bec.devices.omny.galil.fupr_ophyd import FuprGalilController, FuprGalilMotor
@@ -17,6 +18,11 @@ def fsamroy(dm_with_devices):
socket_cls=SocketMock,
device_manager=dm_with_devices,
)
for walk in fsamroy_motor.walk_signals():
if isinstance(walk.item, SocketSignal):
walk.item._readback_timeout = (
0.0 # Set the readback timeout to 0 to avoid waiting during tests
)
fsamroy_motor.controller.on()
assert isinstance(fsamroy_motor.controller, FuprGalilController)
yield fsamroy_motor

View File

@@ -9,7 +9,11 @@ from ophyd_devices.tests.utils import SocketMock
from csaxs_bec.devices.npoint.npoint import NPointAxis, NPointController
from csaxs_bec.devices.omny.galil.fgalil_ophyd import FlomniGalilController, FlomniGalilMotor
from csaxs_bec.devices.omny.galil.fupr_ophyd import FuprGalilController, FuprGalilMotor
from csaxs_bec.devices.omny.galil.galil_rio import GalilRIO, GalilRIOController, GalilRIOSignalRO
from csaxs_bec.devices.omny.galil.galil_rio import (
GalilRIO,
GalilRIOAnalogSignalRO,
GalilRIOController,
)
from csaxs_bec.devices.omny.galil.lgalil_ophyd import LamniGalilController, LamniGalilMotor
from csaxs_bec.devices.omny.galil.ogalil_ophyd import OMNYGalilController, OMNYGalilMotor
from csaxs_bec.devices.omny.galil.sgalil_ophyd import GalilController, SGalilMotor
@@ -272,26 +276,27 @@ def test_galil_rio_signal_read(galil_rio):
## Test read of all channels
###########
assert galil_rio.an_ch0._READ_TIMEOUT == 0.1 # Default read timeout of 100ms
assert galil_rio.analog_in.ch0._readback_timeout == 0.1 # Default read timeout of 100ms
# Mock the socket to return specific values
galil_rio.controller.sock.buffer_recv = [b" 1.234 2.345 3.456 4.567 5.678 6.789 7.890 8.901"]
galil_rio._last_readback = 0 # Force read from controller
analog_bufffer = b" 1.234 2.345 3.456 4.567 5.678 6.789 7.890 8.901\r\n"
galil_rio.controller.sock.buffer_recv = [] # Clear any existing buffer
galil_rio.controller.sock.buffer_recv.append(analog_bufffer)
read_values = galil_rio.read()
assert len(read_values) == 8 # 8 channels
expected_values = {
galil_rio.an_ch0.name: {"value": 1.234},
galil_rio.an_ch1.name: {"value": 2.345},
galil_rio.an_ch2.name: {"value": 3.456},
galil_rio.an_ch3.name: {"value": 4.567},
galil_rio.an_ch4.name: {"value": 5.678},
galil_rio.an_ch5.name: {"value": 6.789},
galil_rio.an_ch6.name: {"value": 7.890},
galil_rio.an_ch7.name: {"value": 8.901},
galil_rio.analog_in.ch0.name: {"value": 1.234},
galil_rio.analog_in.ch1.name: {"value": 2.345},
galil_rio.analog_in.ch2.name: {"value": 3.456},
galil_rio.analog_in.ch3.name: {"value": 4.567},
galil_rio.analog_in.ch4.name: {"value": 5.678},
galil_rio.analog_in.ch5.name: {"value": 6.789},
galil_rio.analog_in.ch6.name: {"value": 7.890},
galil_rio.analog_in.ch7.name: {"value": 8.901},
}
# All timestamps should be the same
assert all(
ret["timestamp"] == read_values[galil_rio.an_ch0.name]["timestamp"]
ret["timestamp"] == read_values[galil_rio.analog_in.ch0.name]["timestamp"]
for signal_name, ret in read_values.items()
)
# Check values
@@ -301,7 +306,7 @@ def test_galil_rio_signal_read(galil_rio):
# Check communication command to socker
assert galil_rio.controller.sock.buffer_put == [
b"MG@AN[0],@AN[1],@AN[2],@AN[3],@AN[4],@AN[5],@AN[6],@AN[7]\r"
b"MG@AN[0], @AN[1], @AN[2], @AN[3], @AN[4], @AN[5], @AN[6], @AN[7]\r"
]
###########
@@ -313,11 +318,11 @@ def test_galil_rio_signal_read(galil_rio):
def value_callback(value, old_value, **kwargs):
obj = kwargs.get("obj")
galil = obj.parent
galil = obj.parent.parent
readback = galil.read()
value_callback_buffer.append(readback)
galil_rio.an_ch0.subscribe(value_callback, run=False)
galil_rio.analog_in.ch0.subscribe(value_callback, run=False)
galil_rio.controller.sock.buffer_recv = [b" 2.5 2.6 2.7 2.8 2.9 3.0 3.1 3.2"]
expected_values = [2.5, 2.6, 2.7, 2.8, 2.9, 3.0, 3.1, 3.2]
@@ -327,13 +332,15 @@ def test_galil_rio_signal_read(galil_rio):
# Should have used the cached value
for walk in galil_rio.walk_signals():
walk.item._READ_TIMEOUT = 10 # Make sure cached read is used
ret = galil_rio.an_ch0.read()
walk.item._readback_timeout = 10 # Make sure cached read is used
ret = galil_rio.analog_in.ch0.read()
# Should not trigger callback since value did not change
assert np.isclose(ret[galil_rio.an_ch0.name]["value"], 1.234)
assert np.isclose(ret[galil_rio.analog_in.ch0.name]["value"], 1.234)
# Same timestamp as for another channel as this is cached read
assert np.isclose(ret[galil_rio.an_ch0.name]["timestamp"], galil_rio.an_ch7.timestamp)
assert np.isclose(
ret[galil_rio.analog_in.ch0.name]["timestamp"], galil_rio.analog_in.ch7.timestamp
)
assert len(value_callback_buffer) == 0
##################
@@ -341,10 +348,10 @@ def test_galil_rio_signal_read(galil_rio):
##################
# Now force a read from the controller
galil_rio._last_readback = 0 # Force read from controller
ret = galil_rio.an_ch0.read()
galil_rio.analog_in.ch0._last_readback = 0 # Force read from controller
ret = galil_rio.analog_in.ch0.read()
assert np.isclose(ret[galil_rio.an_ch0.name]["value"], 2.5)
assert np.isclose(ret[galil_rio.analog_in.ch0.name]["value"], 2.5)
# Check callback invocation, but only 1 callback even with galil_rio.read() call in callback
assert len(value_callback_buffer) == 1
@@ -352,7 +359,45 @@ def test_galil_rio_signal_read(galil_rio):
assert np.isclose(values, expected_values).all()
assert all(
[
value["timestamp"] == value_callback_buffer[0][galil_rio.an_ch0.name]["timestamp"]
value["timestamp"]
== value_callback_buffer[0][galil_rio.analog_in.ch0.name]["timestamp"]
for value in value_callback_buffer[0].values()
]
)
def test_galil_rio_digital_out_signal(galil_rio):
"""
Test that the Galil RIO digital output signal can be set correctly.
"""
## Test Read from digital output channels
buffer_receive = []
excepted_put_buffer = []
for ii in range(galil_rio.digital_out.ch0._NUM_DIGITAL_OUTPUT_CHANNELS):
cmd = f"MG@OUT[{ii}]\r".encode()
excepted_put_buffer.append(cmd)
recv = " 1.000".encode()
buffer_receive.append(recv)
galil_rio.controller.sock.buffer_recv = buffer_receive # Mock response for readback
digital_read = galil_rio.read_configuration() # Read to populate readback values
for walk in galil_rio.digital_out.walk_signals():
assert np.isclose(digital_read[walk.item.name]["value"], 1.0)
assert galil_rio.controller.sock.buffer_put == excepted_put_buffer
# Test writing to digital output channels
galil_rio.controller.sock.buffer_put = [] # Clear buffer put
galil_rio.controller.sock.buffer_recv = [b":"] # Mock response for readback
# Set digital output channel 0 to high
galil_rio.digital_out.ch0.put(1)
assert galil_rio.controller.sock.buffer_put == [b"SB0\r"]
# Set digital output channel 0 to low
galil_rio.controller.sock.buffer_put = [] # Clear buffer put
galil_rio.controller.sock.buffer_recv = [b":"] # Mock response for readback
galil_rio.digital_out.ch0.put(0)
assert galil_rio.controller.sock.buffer_put == [b"CB0\r"]