Compare commits
9 Commits
fix/fj_int
...
fixes/lamn
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
54f1f42332 | ||
|
|
48df15f35c | ||
|
|
6f60bd4b2b | ||
| 5d97913956 | |||
| 93384b87e0 | |||
| 9a249363fd | |||
| f925a7c1db | |||
| 5811e445fe | |||
| 16ea7f410e |
@@ -6,6 +6,8 @@ from rich.console import Console
|
||||
from rich.table import Table
|
||||
|
||||
from csaxs_bec.bec_ipython_client.plugins.cSAXS import epics_put, fshclose
|
||||
from csaxs_bec.bec_ipython_client.plugins.omny.omny_general_tools import OMNYTools
|
||||
|
||||
|
||||
# import builtins to avoid linter errors
|
||||
dev = builtins.__dict__.get("dev")
|
||||
@@ -18,25 +20,32 @@ class LamNIInitError(Exception):
|
||||
|
||||
|
||||
class LaMNIInitStagesMixin:
|
||||
def __init__(self, client):
|
||||
super().__init__()
|
||||
self.client = client
|
||||
self.OMNYTools = OMNYTools(self.client)
|
||||
|
||||
def lamni_init_stages(self):
|
||||
user_input = input("Starting initialization of LamNI stages. OK? [y/n]")
|
||||
if user_input == "y":
|
||||
|
||||
if self.OMNYTools.yesno("Start initialization of LamNI stages. OK?"):
|
||||
print("staring...")
|
||||
dev.lsamrot.enabled = True
|
||||
else:
|
||||
return
|
||||
|
||||
|
||||
|
||||
|
||||
if self.check_all_axes_of_lamni_referenced():
|
||||
user_input = input("Continue anyways? [y/n]")
|
||||
if user_input == "y":
|
||||
if self.OMNYTools.yesno("All axes are referenced. Continue anyways?"):
|
||||
print("ok then...")
|
||||
else:
|
||||
return
|
||||
|
||||
axis_id_lsamrot = dev.lsamrot._config["deviceConfig"].get("axis_Id")
|
||||
if dev.lsamrot.controller.get_motor_limit_switch(axis_id_lsamrot)[1] == False:
|
||||
user_input = input("The rotation stage will be moved to one limit [y/n]")
|
||||
if user_input == "y":
|
||||
|
||||
if self.OMNYTools.yesno("The rotation stage will be moved to one limit"):
|
||||
print("starting...")
|
||||
else:
|
||||
return
|
||||
@@ -47,10 +56,7 @@ class LaMNIInitStagesMixin:
|
||||
print("The controller will be disabled in bec. To enable dev.lsamrot.enabled=True")
|
||||
return
|
||||
|
||||
user_input = input(
|
||||
"Init of loptz. Can the stage move to the upstream limit without collision?? [y/n]"
|
||||
)
|
||||
if user_input == "y":
|
||||
if self.OMNYTools.yesno("Init of loptz. Can the stage move to the upstream limit without collision?"):
|
||||
print("ok then...")
|
||||
else:
|
||||
return
|
||||
@@ -81,8 +87,7 @@ class LaMNIInitStagesMixin:
|
||||
time.sleep(0.1)
|
||||
self.find_reference_mark(dev.lsamrot)
|
||||
|
||||
user_input = input("Init of leye. Can the stage move to -x limit without collision? [y/n]")
|
||||
if user_input == "y":
|
||||
if self.OMNYTools.yesno("Init of leye. Can the stage move to -x limit without collision?"):
|
||||
print("starting...")
|
||||
else:
|
||||
return
|
||||
@@ -134,8 +139,7 @@ class LaMNIInitStagesMixin:
|
||||
return ord(axis_id.lower()) - 97
|
||||
|
||||
def _align_setup(self):
|
||||
user_input = input("Start moving stages to default initial positions? [y/n]")
|
||||
if user_input == "y":
|
||||
if self.OMNYTools.yesno("Start moving stages to default initial positions?"):
|
||||
print("Start moving stages...")
|
||||
else:
|
||||
print("Stopping.")
|
||||
|
||||
@@ -15,13 +15,19 @@ from bec_lib.pdf_writer import PDFWriter
|
||||
from typeguard import typechecked
|
||||
|
||||
from csaxs_bec.bec_ipython_client.plugins.cSAXS import epics_get, epics_put, fshopen
|
||||
from csaxs_bec.bec_ipython_client.plugins.omny.omny_general_tools import OMNYTools
|
||||
|
||||
from .lamni_optics_mixin import LaMNIInitStagesMixin, LamNIOpticsMixin
|
||||
|
||||
logger = bec_logger.logger
|
||||
bec = builtins.__dict__.get("bec")
|
||||
|
||||
|
||||
if builtins.__dict__.get("bec") is not None:
|
||||
bec = builtins.__dict__.get("bec")
|
||||
dev = builtins.__dict__.get("dev")
|
||||
umv = builtins.__dict__.get("umv")
|
||||
umvr = builtins.__dict__.get("umvr")
|
||||
|
||||
class XrayEyeAlign:
|
||||
# pixel calibration, multiply to get mm
|
||||
# PIXEL_CALIBRATION = 0.2/209 #.2 with binning
|
||||
@@ -510,8 +516,9 @@ class LamNI(LamNIOpticsMixin):
|
||||
def __init__(self, client):
|
||||
super().__init__()
|
||||
self.client = client
|
||||
self.device_manager = client.device_manager
|
||||
self.align = XrayEyeAlign(client, self)
|
||||
self.init = LaMNIInitStagesMixin()
|
||||
self.init = LaMNIInitStagesMixin(client)
|
||||
self.check_shutter = True
|
||||
self.check_light_available = True
|
||||
self.check_fofb = True
|
||||
@@ -524,6 +531,16 @@ class LamNI(LamNIOpticsMixin):
|
||||
self._beam_is_okay = True
|
||||
self._stop_beam_check_event = None
|
||||
self.beam_check_thread = None
|
||||
self.OMNYTools = OMNYTools(self.client)
|
||||
# Progress tracking
|
||||
self.progress = {}
|
||||
self.progress["tomo_type"] = "Equally spaced sub-tomograms"
|
||||
self.progress["subtomo"] = 0
|
||||
self.progress["subtomo_projection"] = 0
|
||||
self.progress["subtomo_total_projections"] = 1
|
||||
self.progress["projection"] = 0
|
||||
self.progress["total_projections"] = 1
|
||||
self.progress["angle"] = 0
|
||||
|
||||
def get_beamline_checks_enabled(self):
|
||||
print(
|
||||
@@ -598,12 +615,12 @@ class LamNI(LamNIOpticsMixin):
|
||||
if val == 1:
|
||||
# equally spaced tomography with 8 sub tomograms
|
||||
self.client.set_global_var("tomo_type", val)
|
||||
# elif val == 2:
|
||||
# # golden ratio tomography (sorted bunches)
|
||||
# self.client.set_global_var("tomo_type", val)
|
||||
# elif val == 3:
|
||||
# # equally spaced tomography with starting angles shifted by golden ratio
|
||||
# self.client.set_global_var("tomo_type", val)
|
||||
elif val == 2:
|
||||
# golden ratio tomography (sorted bunches)
|
||||
self.client.set_global_var("tomo_type", val)
|
||||
elif val == 3:
|
||||
# equally spaced tomography with starting angles shifted by golden ratio
|
||||
self.client.set_global_var("tomo_type", val)
|
||||
else:
|
||||
raise ValueError("Unknown tomo_type.")
|
||||
|
||||
@@ -740,6 +757,41 @@ class LamNI(LamNIOpticsMixin):
|
||||
def tomo_stitch_overlap(self, val: float):
|
||||
self.client.set_global_var("tomo_stitch_overlap", val)
|
||||
|
||||
@property
|
||||
def golden_max_number_of_projections(self):
|
||||
val = self.client.get_global_var("golden_max_number_of_projections")
|
||||
if val is None:
|
||||
return 1000.0
|
||||
return val
|
||||
|
||||
@golden_max_number_of_projections.setter
|
||||
def golden_max_number_of_projections(self, val: float):
|
||||
self.client.set_global_var("golden_max_number_of_projections", val)
|
||||
|
||||
@property
|
||||
def golden_ratio_bunch_size(self):
|
||||
val = self.client.get_global_var("golden_ratio_bunch_size")
|
||||
if val is None:
|
||||
return 20
|
||||
return val
|
||||
|
||||
@golden_ratio_bunch_size.setter
|
||||
def golden_ratio_bunch_size(self, val: float):
|
||||
if val < 20:
|
||||
raise ValueError("golden_ratio_bunch_size must be at least 20.")
|
||||
self.client.set_global_var("golden_ratio_bunch_size", val)
|
||||
|
||||
@property
|
||||
def golden_projections_at_0_deg_for_damage_estimation(self):
|
||||
val = self.client.get_global_var("golden_projections_at_0_deg_for_damage_estimation")
|
||||
if val is None:
|
||||
return 0
|
||||
return val
|
||||
|
||||
@golden_projections_at_0_deg_for_damage_estimation.setter
|
||||
def golden_projections_at_0_deg_for_damage_estimation(self, val: float):
|
||||
self.client.set_global_var("golden_projections_at_0_deg_for_damage_estimation", val)
|
||||
|
||||
@property
|
||||
def sample_name(self):
|
||||
val = self.client.get_global_var("sample_name")
|
||||
@@ -909,6 +961,49 @@ class LamNI(LamNIOpticsMixin):
|
||||
except Exception:
|
||||
logger.warning("Failed to send update to SciLog.")
|
||||
|
||||
def rt_off(self):
|
||||
dev.rtx.enabled = False
|
||||
dev.rty.enabled = False
|
||||
|
||||
def rt_on(self):
|
||||
dev.rtx.enabled = True
|
||||
dev.rty.enabled = True
|
||||
if dev.rtx.enabled == True:
|
||||
print("rt is enabled")
|
||||
else:
|
||||
print("failed to enable rt")
|
||||
|
||||
def feedback_enable_with_reset(self):
|
||||
self.device_manager.devices.rtx.controller.feedback_enable_with_reset()
|
||||
self.feedback_status()
|
||||
|
||||
def feedback_enable_without_reset(self):
|
||||
self.device_manager.devices.rtx.controller.feedback_enable_without_reset()
|
||||
self.feedback_status()
|
||||
|
||||
def feedback_disable(self):
|
||||
self.device_manager.devices.rtx.controller.feedback_disable()
|
||||
self.feedback_status()
|
||||
|
||||
def feedback_disable_and_reset_angle(self):
|
||||
self.device_manager.devices.rtx.controller.feedback_disable_and_even_reset_lamni_angle_interferometer()
|
||||
self.feedback_status()
|
||||
|
||||
def feedback_status(self):
|
||||
if self.device_manager.devices.rtx.controller.feedback_is_running():
|
||||
print("The rt feedback is \x1b[92mrunning\x1b[0m.")
|
||||
else:
|
||||
print("The rt feedback is \x1b[91mNOT\x1b[0m running.")
|
||||
|
||||
def show_interferometer_positions(self):
|
||||
self.device_manager.devices.rtx.controller.show_interferometer_positions()
|
||||
|
||||
def show_signal_strength(self):
|
||||
self.device_manager.devices.rtx.controller.show_signal_strength_interferometer()
|
||||
|
||||
def show_analog_signals(self):
|
||||
return self.device_manager.devices.rtx.controller.show_analog_signals()
|
||||
|
||||
def add_sample_database(
|
||||
self, samplename, date, eaccount, scan_number, setup, sample_additional_info, user
|
||||
):
|
||||
@@ -935,9 +1030,8 @@ class LamNI(LamNIOpticsMixin):
|
||||
# self.tomo_scan_projection(angle)
|
||||
# self.tomo_reconstruct()
|
||||
|
||||
def sub_tomo_scan(self, subtomo_number, start_angle=None):
|
||||
"""start a subtomo"""
|
||||
dev = builtins.__dict__.get("dev")
|
||||
def _write_subtomo_to_scilog(self, subtomo_number):
|
||||
"""Write subtomo start information to scilog."""
|
||||
bec = builtins.__dict__.get("bec")
|
||||
if self.tomo_id > 0:
|
||||
tags = ["BEC_subtomo", self.sample_name, f"tomo_id_{self.tomo_id}"]
|
||||
@@ -948,6 +1042,10 @@ class LamNI(LamNIOpticsMixin):
|
||||
tags,
|
||||
)
|
||||
|
||||
def sub_tomo_scan(self, subtomo_number, start_angle=None):
|
||||
"""start a subtomo"""
|
||||
self._write_subtomo_to_scilog(subtomo_number)
|
||||
|
||||
if start_angle is None:
|
||||
if subtomo_number == 1:
|
||||
start_angle = 0
|
||||
@@ -980,13 +1078,14 @@ class LamNI(LamNIOpticsMixin):
|
||||
if not (subtomo_number % 2):
|
||||
angles = np.flip(angles)
|
||||
for angle in angles:
|
||||
self.progress["tomo_type"] = "Equally spaced sub-tomograms"
|
||||
self.progress["subtomo"] = subtomo_number
|
||||
self.progress["subtomo_projection"] = angles.index(angle)
|
||||
self.progress["subtomo_total_projections"] = 180 / self.tomo_angle_stepsize
|
||||
self.progress["subtomo_projection"] = np.where(angles == angle)[0][0]
|
||||
self.progress["subtomo_total_projections"] = 360 / self.tomo_angle_stepsize
|
||||
self.progress["projection"] = (subtomo_number - 1) * self.progress[
|
||||
"subtomo_total_projections"
|
||||
] + self.progress["subtomo_projection"]
|
||||
self.progress["total_projections"] = 180 / self.tomo_angle_stepsize * 8
|
||||
self.progress["total_projections"] = 360 / self.tomo_angle_stepsize * 8
|
||||
self.progress["angle"] = angle
|
||||
self._tomo_scan_at_angle(angle, subtomo_number)
|
||||
|
||||
@@ -1049,6 +1148,11 @@ class LamNI(LamNIOpticsMixin):
|
||||
for scan_nr in range(start_scan_number, end_scan_number):
|
||||
self._write_tomo_scan_number(scan_nr, angle, subtomo_number)
|
||||
|
||||
if self._was_beam_okay() and not error_caught:
|
||||
successful = True
|
||||
else:
|
||||
self._wait_for_beamline_checks()
|
||||
|
||||
def _write_tomo_scan_number(self, scan_number: int, angle: float, subtomo_number: int) -> None:
|
||||
tomo_scan_numbers_file = os.path.expanduser(
|
||||
"~/Data10/specES1/dat-files/tomography_scannumbers.txt"
|
||||
@@ -1059,13 +1163,74 @@ class LamNI(LamNIOpticsMixin):
|
||||
f"{scan_number} {angle} {dev.lsamrot.read()['lsamrot']['value']:.3f} {self.tomo_id} {subtomo_number} {0} {'lamni'}\n"
|
||||
)
|
||||
|
||||
def tomo_scan(self, subtomo_start=1, start_angle=None):
|
||||
"""start a tomo scan"""
|
||||
def _golden(self, ii, howmany_sorted, maxangle=360, reverse=False):
|
||||
"""Return the ii-th golden ratio angle within sorted bunches of size howmany_sorted,
|
||||
and its subtomo number. Operates over maxangle degrees (360 for LamNI)."""
|
||||
golden = []
|
||||
for iji in range(
|
||||
(ii - (ii % howmany_sorted)), (ii - (ii % howmany_sorted)) + howmany_sorted, 1
|
||||
):
|
||||
golden.append(
|
||||
((iji * maxangle * (1 + pow(5, 0.5)) / 2) * 1000 % (maxangle * 1000)) / 1000
|
||||
)
|
||||
golden.sort()
|
||||
subtomo_number = int(ii / howmany_sorted) + 1
|
||||
if reverse and not subtomo_number % 2:
|
||||
golden.reverse()
|
||||
return (golden[ii % howmany_sorted], subtomo_number)
|
||||
|
||||
def _golden_equally_spaced(self, ii, number_of_projections_per_subtomo, maxangle=360, reverse=True, verbose=False):
|
||||
"""Return angles for equally spaced tomography with sub-tomogram starting angles
|
||||
shifted according to the golden ratio. Operates over maxangle degrees (360 for LamNI).
|
||||
ii is the projection number starting at 0."""
|
||||
angular_step = maxangle / number_of_projections_per_subtomo
|
||||
subtomo_number = int(((ii) * angular_step) / maxangle) + 1
|
||||
start_angle = self._golden(subtomo_number - 1, 1, angular_step)[0]
|
||||
projection_number_of_subtomo = (
|
||||
ii - (subtomo_number - 1) * number_of_projections_per_subtomo
|
||||
)
|
||||
|
||||
if reverse:
|
||||
if subtomo_number % 2:
|
||||
angle = start_angle + projection_number_of_subtomo * angular_step
|
||||
else:
|
||||
angle = (
|
||||
start_angle
|
||||
+ (number_of_projections_per_subtomo - 1) * angular_step
|
||||
- projection_number_of_subtomo * angular_step
|
||||
)
|
||||
else:
|
||||
angle = start_angle + projection_number_of_subtomo * angular_step
|
||||
|
||||
if verbose:
|
||||
print(
|
||||
f"Equally spaced golden ratio tomography.\n"
|
||||
f"Angular step: {angular_step}\n"
|
||||
f"Subtomo Number: {subtomo_number}\n"
|
||||
f"Angle: {angle}"
|
||||
)
|
||||
|
||||
return angle, subtomo_number
|
||||
|
||||
def tomo_scan(self, subtomo_start=1, start_angle=None, projection_number=None):
|
||||
"""Start a tomo scan.
|
||||
|
||||
Args:
|
||||
subtomo_start (int): For tomo_type 1, the sub-tomogram number to start from. Defaults to 1.
|
||||
start_angle (float, optional): Override the starting angle of the first sub-tomogram. Defaults to None.
|
||||
projection_number (int, optional): For tomo_types 2 and 3, resume from this projection index. Defaults to None.
|
||||
"""
|
||||
bec = builtins.__dict__.get("bec")
|
||||
scans = builtins.__dict__.get("scans")
|
||||
self._current_special_angles = self.special_angles.copy()
|
||||
|
||||
if self.tomo_type == 1 and subtomo_start == 1 and start_angle is None:
|
||||
# Register a new tomo scan in the database and write the PDF report
|
||||
# only when starting fresh (not resuming mid-scan)
|
||||
if (
|
||||
(self.tomo_type == 1 and subtomo_start == 1 and start_angle is None)
|
||||
or (self.tomo_type == 2 and projection_number is None)
|
||||
or (self.tomo_type == 3 and projection_number is None)
|
||||
):
|
||||
# pylint: disable=undefined-variable
|
||||
self.tomo_id = self.add_sample_database(
|
||||
self.sample_name,
|
||||
@@ -1077,10 +1242,108 @@ class LamNI(LamNIOpticsMixin):
|
||||
"BEC",
|
||||
)
|
||||
self.write_pdf_report()
|
||||
|
||||
with scans.dataset_id_on_hold:
|
||||
for ii in range(subtomo_start, 9):
|
||||
self.sub_tomo_scan(ii, start_angle=start_angle)
|
||||
start_angle = None
|
||||
if self.tomo_type == 1:
|
||||
# 8 equally spaced sub-tomograms over 360 degrees
|
||||
self.progress["tomo_type"] = "Equally spaced sub-tomograms"
|
||||
for ii in range(subtomo_start, 9):
|
||||
self.sub_tomo_scan(ii, start_angle=start_angle)
|
||||
start_angle = None
|
||||
|
||||
elif self.tomo_type == 2:
|
||||
# Golden ratio tomography (sorted bunches) over 360 degrees
|
||||
self.progress["tomo_type"] = "Golden ratio tomography"
|
||||
previous_subtomo_number = -1
|
||||
ii = 0 if projection_number is None else projection_number
|
||||
while True:
|
||||
angle, subtomo_number = self._golden(
|
||||
ii, self.golden_ratio_bunch_size, maxangle=360, reverse=True
|
||||
)
|
||||
if previous_subtomo_number != subtomo_number:
|
||||
self._write_subtomo_to_scilog(subtomo_number)
|
||||
if (
|
||||
subtomo_number % 2 == 1
|
||||
and ii > 10
|
||||
and self.golden_projections_at_0_deg_for_damage_estimation == 1
|
||||
):
|
||||
self._tomo_scan_at_angle(0, subtomo_number)
|
||||
previous_subtomo_number = subtomo_number
|
||||
|
||||
self.progress["subtomo"] = subtomo_number
|
||||
self.progress["projection"] = ii
|
||||
self.progress["angle"] = angle
|
||||
if self.golden_ratio_bunch_size > 0:
|
||||
self.progress["subtomo_total_projections"] = self.golden_ratio_bunch_size
|
||||
self.progress["subtomo_projection"] = (
|
||||
ii - (subtomo_number - 1) * self.golden_ratio_bunch_size
|
||||
)
|
||||
else:
|
||||
self.progress["subtomo_total_projections"] = 0
|
||||
self.progress["subtomo_projection"] = 0
|
||||
self.progress["total_projections"] = (
|
||||
self.golden_max_number_of_projections
|
||||
if self.golden_max_number_of_projections > 0
|
||||
else 0
|
||||
)
|
||||
|
||||
self._tomo_scan_at_angle(angle, subtomo_number)
|
||||
ii += 1
|
||||
if (
|
||||
self.golden_max_number_of_projections > 0
|
||||
and ii > self.golden_max_number_of_projections
|
||||
):
|
||||
print(
|
||||
f"Golden ratio tomography stopped automatically after the requested"
|
||||
f" {self.golden_max_number_of_projections} projections."
|
||||
)
|
||||
break
|
||||
|
||||
elif self.tomo_type == 3:
|
||||
# Equally spaced tomography with golden ratio starting angles over 360 degrees
|
||||
self.progress["tomo_type"] = "Equally spaced, golden ratio starting angles"
|
||||
previous_subtomo_number = -1
|
||||
ii = 0 if projection_number is None else projection_number
|
||||
while True:
|
||||
angle, subtomo_number = self._golden_equally_spaced(
|
||||
ii, int(360 / self.tomo_angle_stepsize), maxangle=360, reverse=True
|
||||
)
|
||||
if previous_subtomo_number != subtomo_number:
|
||||
self._write_subtomo_to_scilog(subtomo_number)
|
||||
if (
|
||||
subtomo_number % 2 == 1
|
||||
and ii > 10
|
||||
and self.golden_projections_at_0_deg_for_damage_estimation == 1
|
||||
):
|
||||
self._tomo_scan_at_angle(0, subtomo_number)
|
||||
previous_subtomo_number = subtomo_number
|
||||
|
||||
self.progress["subtomo"] = subtomo_number
|
||||
self.progress["projection"] = ii
|
||||
self.progress["angle"] = angle
|
||||
self.progress["subtomo_total_projections"] = 360 / self.tomo_angle_stepsize
|
||||
self.progress["subtomo_projection"] = (
|
||||
ii - (subtomo_number - 1) * self.progress["subtomo_total_projections"]
|
||||
)
|
||||
self.progress["total_projections"] = (
|
||||
self.golden_max_number_of_projections
|
||||
if self.golden_max_number_of_projections > 0
|
||||
else 0
|
||||
)
|
||||
|
||||
self._tomo_scan_at_angle(angle, subtomo_number)
|
||||
ii += 1
|
||||
if (
|
||||
self.golden_max_number_of_projections > 0
|
||||
and ii > self.golden_max_number_of_projections
|
||||
):
|
||||
print(
|
||||
f"Golden ratio tomography stopped automatically after the requested"
|
||||
f" {self.golden_max_number_of_projections} projections."
|
||||
)
|
||||
break
|
||||
else:
|
||||
raise ValueError(f"Unknown tomo_type: {self.tomo_type}.")
|
||||
|
||||
def tomo_parameters(self):
|
||||
"""print and update the tomo parameters"""
|
||||
@@ -1103,9 +1366,37 @@ class LamNI(LamNIOpticsMixin):
|
||||
print(f" _tomo_fovy_offset <mm> = {self.align.tomo_fovy_offset}")
|
||||
print(f" _manual_shift_x <mm> = {self.manual_shift_x}")
|
||||
print(f" _manual_shift_y <mm> = {self.manual_shift_y}")
|
||||
print(f"Angular step within sub-tomogram: {self.tomo_angle_stepsize} degrees")
|
||||
print(f"Resulting in number of projections: {360/self.tomo_angle_stepsize*8}")
|
||||
print(f"Sample name: {self.sample_name}\n")
|
||||
print("")
|
||||
if self.tomo_type == 1:
|
||||
print("\x1b[1mTomo type 1:\x1b[0m 8 equally spaced sub-tomograms")
|
||||
print(f"Angular step within sub-tomogram: {self.tomo_angle_stepsize} degrees")
|
||||
print(f"Resulting in number of projections: {360/self.tomo_angle_stepsize*8}")
|
||||
elif self.tomo_type == 2:
|
||||
print("\x1b[1mTomo type 2:\x1b[0m Golden ratio tomography")
|
||||
print(f"Sorted in bunches of: {self.golden_ratio_bunch_size}")
|
||||
if self.golden_max_number_of_projections > 0:
|
||||
print(f"Ending after {self.golden_max_number_of_projections} projections.")
|
||||
else:
|
||||
print("Ending by manual interruption.")
|
||||
if self.golden_projections_at_0_deg_for_damage_estimation == 1:
|
||||
print(
|
||||
"Repeating projections at 0 degrees at the beginning of every second subtomogram."
|
||||
)
|
||||
elif self.tomo_type == 3:
|
||||
print(
|
||||
"\x1b[1mTomo type 3:\x1b[0m Equally spaced tomography, golden ratio starting angle"
|
||||
)
|
||||
print(f"Angular step within sub-tomogram: {self.tomo_angle_stepsize} degrees")
|
||||
print(f"Number of projections per sub-tomogram: {360/self.tomo_angle_stepsize}")
|
||||
if self.golden_max_number_of_projections > 0:
|
||||
print(f"Ending after {self.golden_max_number_of_projections} projections.")
|
||||
else:
|
||||
print("Ending by manual interruption.")
|
||||
if self.golden_projections_at_0_deg_for_damage_estimation == 1:
|
||||
print(
|
||||
"Repeating projections at 0 degrees at the beginning of every second subtomogram."
|
||||
)
|
||||
print(f"\nSample name: {self.sample_name}\n")
|
||||
|
||||
user_input = input("Are these parameters correctly set for your scan? ")
|
||||
if user_input == "y":
|
||||
@@ -1125,13 +1416,61 @@ class LamNI(LamNIOpticsMixin):
|
||||
self.ptycho_reconstruct_foldername = self._get_val(
|
||||
"Reconstruction queue ", self.ptycho_reconstruct_foldername, str
|
||||
)
|
||||
tomo_numberofprojections = self._get_val(
|
||||
"Number of projections", 360 / self.tomo_angle_stepsize * 8, int
|
||||
)
|
||||
|
||||
print(f"The angular step will be {360/tomo_numberofprojections}")
|
||||
self.tomo_angle_stepsize = 360 / tomo_numberofprojections * 8
|
||||
print(f"The angular step in a subtomogram it will be {self.tomo_angle_stepsize}")
|
||||
print("Tomography type:")
|
||||
print(" 1: 8 equally spaced sub-tomograms (360 deg)")
|
||||
print(" 2: Golden ratio tomography")
|
||||
print(" 3: Equally spaced tomography, golden ratio starting angle")
|
||||
self.tomo_type = self._get_val("Tomography type", self.tomo_type, int)
|
||||
|
||||
if self.tomo_type == 1:
|
||||
tomo_numberofprojections = self._get_val(
|
||||
"Number of projections", 360 / self.tomo_angle_stepsize * 8, int
|
||||
)
|
||||
print(f"The angular step will be {360/tomo_numberofprojections}")
|
||||
self.tomo_angle_stepsize = 360 / tomo_numberofprojections * 8
|
||||
print(f"The angular step in a subtomogram it will be {self.tomo_angle_stepsize}")
|
||||
|
||||
elif self.tomo_type == 2:
|
||||
while True:
|
||||
bunch_size = self._get_val(
|
||||
"Number of projections sorted per bunch (minimum 20)",
|
||||
self.golden_ratio_bunch_size,
|
||||
int,
|
||||
)
|
||||
if bunch_size >= 20:
|
||||
self.golden_ratio_bunch_size = bunch_size
|
||||
break
|
||||
print("Bunch size must be at least 20. Please try again.")
|
||||
self.golden_max_number_of_projections = self._get_val(
|
||||
"Stop after number of projections (0 for endless)",
|
||||
self.golden_max_number_of_projections,
|
||||
int,
|
||||
)
|
||||
self.golden_projections_at_0_deg_for_damage_estimation = self._get_val(
|
||||
"Repeat projections at 0 deg every second subtomo 1/0?",
|
||||
self.golden_projections_at_0_deg_for_damage_estimation,
|
||||
int,
|
||||
)
|
||||
|
||||
elif self.tomo_type == 3:
|
||||
numprj = self._get_val(
|
||||
"Number of projections per sub-tomogram",
|
||||
int(360 / self.tomo_angle_stepsize),
|
||||
int,
|
||||
)
|
||||
self.tomo_angle_stepsize = 360 / numprj
|
||||
self.golden_max_number_of_projections = self._get_val(
|
||||
"Stop after number of projections (0 for endless)",
|
||||
self.golden_max_number_of_projections,
|
||||
int,
|
||||
)
|
||||
self.golden_projections_at_0_deg_for_damage_estimation = self._get_val(
|
||||
"Repeat projections at 0 deg every second subtomo 1/0?",
|
||||
self.golden_projections_at_0_deg_for_damage_estimation,
|
||||
int,
|
||||
)
|
||||
|
||||
self.sample_name = self._get_val("sample name", self.sample_name, str)
|
||||
|
||||
@staticmethod
|
||||
@@ -1197,6 +1536,7 @@ class LamNI(LamNIOpticsMixin):
|
||||
(
|
||||
f"{'Angular step within sub-tomogram:':<{padding}}{self.tomo_angle_stepsize:>{padding}.2f}\n"
|
||||
),
|
||||
f"{'Tomo type:':<{padding}}{self.tomo_type:>{padding}}\n",
|
||||
]
|
||||
content = "".join(content)
|
||||
user_target = os.path.expanduser(f"~/Data10/documentation/tomo_scan_ID_{self.tomo_id}.pdf")
|
||||
@@ -1389,3 +1729,4 @@ class DataDrivenLamNI(LamNI):
|
||||
shapes.append(data.shape)
|
||||
if len(set(shapes)) > 1:
|
||||
raise ValueError(f"Tomo data file has entries of inconsistent lengths: {shapes}.")
|
||||
|
||||
@@ -35,32 +35,32 @@ class FlomniInitError(Exception):
|
||||
class FlomniError(Exception):
|
||||
pass
|
||||
|
||||
class FlomniTools:
|
||||
def yesno(self, message: str, default="none", autoconfirm=0) -> bool:
|
||||
if autoconfirm and default == "y":
|
||||
self.printgreen(message + " Automatically confirming default: yes")
|
||||
return True
|
||||
elif autoconfirm and default == "n":
|
||||
self.printgreen(message + " Automatically confirming default: no")
|
||||
return False
|
||||
if default == "y":
|
||||
message_ending = " [Y]/n? "
|
||||
elif default == "n":
|
||||
message_ending = " y/[N]? "
|
||||
else:
|
||||
message_ending = " y/n? "
|
||||
while True:
|
||||
user_input = input(self.OKBLUE + message + message_ending + self.ENDC)
|
||||
if (
|
||||
user_input == "Y" or user_input == "y" or user_input == "yes" or user_input == "Yes"
|
||||
) or (default == "y" and user_input == ""):
|
||||
return True
|
||||
if (
|
||||
user_input == "N" or user_input == "n" or user_input == "no" or user_input == "No"
|
||||
) or (default == "n" and user_input == ""):
|
||||
return False
|
||||
else:
|
||||
print("Please expicitely confirm y or n.")
|
||||
# class FlomniTools:
|
||||
# def yesno(self, message: str, default="none", autoconfirm=0) -> bool:
|
||||
# if autoconfirm and default == "y":
|
||||
# self.printgreen(message + " Automatically confirming default: yes")
|
||||
# return True
|
||||
# elif autoconfirm and default == "n":
|
||||
# self.printgreen(message + " Automatically confirming default: no")
|
||||
# return False
|
||||
# if default == "y":
|
||||
# message_ending = " [Y]/n? "
|
||||
# elif default == "n":
|
||||
# message_ending = " y/[N]? "
|
||||
# else:
|
||||
# message_ending = " y/n? "
|
||||
# while True:
|
||||
# user_input = input(self.OKBLUE + message + message_ending + self.ENDC)
|
||||
# if (
|
||||
# user_input == "Y" or user_input == "y" or user_input == "yes" or user_input == "Yes"
|
||||
# ) or (default == "y" and user_input == ""):
|
||||
# return True
|
||||
# if (
|
||||
# user_input == "N" or user_input == "n" or user_input == "no" or user_input == "No"
|
||||
# ) or (default == "n" and user_input == ""):
|
||||
# return False
|
||||
# else:
|
||||
# print("Please expicitely confirm y or n.")
|
||||
|
||||
|
||||
|
||||
|
||||
@@ -4,6 +4,7 @@ This module contains the base class for Galil controllers as well as the signals
|
||||
|
||||
import functools
|
||||
import time
|
||||
from typing import Any
|
||||
|
||||
from bec_lib import bec_logger
|
||||
from ophyd.utils import ReadOnlyError
|
||||
@@ -347,7 +348,7 @@ class GalilSignalBase(SocketSignal):
|
||||
def __init__(self, signal_name, **kwargs):
|
||||
self.signal_name = signal_name
|
||||
super().__init__(**kwargs)
|
||||
self.controller = self.parent.controller
|
||||
self.controller = self.root.controller if hasattr(self.root, "controller") else None
|
||||
|
||||
|
||||
class GalilSignalRO(GalilSignalBase):
|
||||
|
||||
@@ -6,24 +6,26 @@ Link to the Galil RIO vendor page:
|
||||
https://www.galil.com/plcs/remote-io/rio-471xx
|
||||
|
||||
This module provides the GalilRIOController for communication with the RIO controller
|
||||
over TCP/IP. It also provides a device integration that interfaces to these
|
||||
8 analog channels.
|
||||
over TCP/IP. It also provides a device integration that interfaces to its
|
||||
8 analog channels, and 16 digital output channels. Some PLCs may have 24 digital output channels,
|
||||
which can be easily supported by changing the _NUM_DIGITAL_OUTPUT_CHANNELS variable.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import time
|
||||
from typing import TYPE_CHECKING
|
||||
from typing import TYPE_CHECKING, Literal
|
||||
|
||||
from bec_lib.logger import bec_logger
|
||||
from ophyd import Component as Cpt
|
||||
from ophyd import DynamicDeviceComponent as DDC
|
||||
from ophyd import Kind
|
||||
from ophyd.utils import ReadOnlyError
|
||||
from ophyd_devices import PSIDeviceBase
|
||||
from ophyd_devices.utils.controller import Controller, threadlocked
|
||||
from ophyd_devices.utils.socket import SocketIO
|
||||
|
||||
from csaxs_bec.devices.omny.galil.galil_ophyd import (
|
||||
GalilCommunicationError,
|
||||
GalilSignalRO,
|
||||
GalilSignalBase,
|
||||
retry_once,
|
||||
)
|
||||
|
||||
@@ -35,15 +37,11 @@ logger = bec_logger.logger
|
||||
|
||||
|
||||
class GalilRIOController(Controller):
|
||||
"""
|
||||
Controller Class for Galil RIO controller communication.
|
||||
|
||||
Multiple controllers are in use at the cSAXS beamline:
|
||||
- 129.129.98.64 (port 23)
|
||||
"""
|
||||
"""Controller Class for Galil RIO controller communication."""
|
||||
|
||||
@threadlocked
|
||||
def socket_put(self, val: str) -> None:
|
||||
"""Socker put method."""
|
||||
self.sock.put(f"{val}\r".encode())
|
||||
|
||||
@retry_once
|
||||
@@ -64,21 +62,95 @@ class GalilRIOController(Controller):
|
||||
)
|
||||
|
||||
|
||||
class GalilRIOSignalRO(GalilSignalRO):
|
||||
class GalilRIOAnalogSignalRO(GalilSignalBase):
|
||||
"""
|
||||
Read-only Signal for reading a single analog input channel from the Galil RIO controller.
|
||||
It always read all 8 analog channels at once, and updates the reabacks of all channels.
|
||||
New readbacks are only fetched from the controller if the last readback is older than
|
||||
_READ_TIMEOUT seconds, otherwise the last cached readback is returned to reduce network traffic.
|
||||
Signal for reading analog input channels of the Galil RIO controller. This signal is read-only, so
|
||||
the set method raises a ReadOnlyError. The get method retrieves the values of all analog
|
||||
channels in a single socket command. The readback values of all channels are updated based
|
||||
on the response, and subscriptions are run for all channels. Readings are cached as implemented
|
||||
in the SocketSignal class, so that multiple reads of the same channel within an update cycle do
|
||||
not result in multiple socket calls.
|
||||
|
||||
Args:
|
||||
signal_name (str): Name of the signal.
|
||||
channel (int): Analog channel number (0-7).
|
||||
parent (GalilRIO): Parent GalilRIO device.
|
||||
signal_name (str): The name of the signal, e.g. "ch0", "ch1", ..., "ch7"
|
||||
channel (int): The channel number corresponding to the signal, e.g. 0 for "ch0", 1 for "ch1", ...
|
||||
parent (GalilRIO): The parent device instance that this signal belongs to.
|
||||
"""
|
||||
|
||||
_NUM_ANALOG_CHANNELS = 8
|
||||
_READ_TIMEOUT = 0.1 # seconds
|
||||
|
||||
def __init__(self, signal_name: str, channel: int, parent: GalilRIO, **kwargs):
|
||||
super().__init__(signal_name=signal_name, parent=parent, **kwargs)
|
||||
self._channel = channel
|
||||
self._metadata["connected"] = False
|
||||
self._metadata["write_access"] = False
|
||||
|
||||
def _socket_set(self, val):
|
||||
"""Read-only signal, so set method raises an error."""
|
||||
raise ReadOnlyError(f"Signal {self.name} is read-only.")
|
||||
|
||||
def _socket_get(self) -> float:
|
||||
"""Get command for the readback signal"""
|
||||
cmd = "MG@" + ", @".join([f"AN[{ii}]" for ii in range(self._NUM_ANALOG_CHANNELS)])
|
||||
ret = self.controller.socket_put_and_receive(cmd)
|
||||
values = [float(val) for val in ret.strip().split(" ")]
|
||||
# Run updates for all channels. This also updates the _readback and metadata timestamp
|
||||
# value of this channel.
|
||||
self._update_all_channels(values)
|
||||
return self._readback
|
||||
|
||||
# pylint: disable=protected-access
|
||||
def _update_all_channels(self, values: list[float]) -> None:
|
||||
"""
|
||||
Method to receive a list of readback values for channels 0 to 7. Updates for each channel idx
|
||||
are applied to the corresponding GalilRIOAnalogSignalRO signal with matching attr_name "ch{idx}".
|
||||
|
||||
We also update the _last_readback attribute of each of the signals, to avoid multiple socket calls,
|
||||
but rather use the cached value of the combined reading for all channels.
|
||||
|
||||
Args:
|
||||
values (list[float]): List of new readback values for all channels, where the
|
||||
index corresponds to the channel number (0-7).
|
||||
"""
|
||||
updates: dict[str, tuple[float, float]] = {} # attr_name -> (new_val, old_val)
|
||||
# Update all readbacks first
|
||||
for walk in self.parent.walk_signals():
|
||||
if isinstance(walk.item, GalilRIOAnalogSignalRO):
|
||||
idx = int(walk.item.attr_name[-1])
|
||||
if 0 <= idx < len(values):
|
||||
old_val = walk.item._readback
|
||||
new_val = values[idx]
|
||||
walk.item._metadata["timestamp"] = self._last_readback
|
||||
walk.item._last_readback = self._last_readback
|
||||
walk.item._readback = new_val
|
||||
if (
|
||||
idx != self._channel
|
||||
): # Only run subscriptions on other channels, not on itself
|
||||
# as this is handled by the SocketSignal and we want to avoid running multiple
|
||||
# subscriptions for the same channel update
|
||||
updates[walk.item.attr_name] = (new_val, old_val)
|
||||
else:
|
||||
logger.warning(
|
||||
f"Received {len(values)} values but found channel index {idx} in signal {walk.item.name}. Skipping update for this signal."
|
||||
)
|
||||
|
||||
# Run subscriptions after all readbacks have been updated
|
||||
# on all channels except the one that triggered the update
|
||||
for walk in self.parent.walk_signals():
|
||||
if walk.item.attr_name in updates:
|
||||
new_val, old_val = updates[walk.item.attr_name]
|
||||
walk.item._run_subs(
|
||||
sub_type=walk.item.SUB_VALUE,
|
||||
old_value=old_val,
|
||||
value=new_val,
|
||||
timestamp=self._last_readback,
|
||||
)
|
||||
|
||||
|
||||
class GalilRIODigitalOutSignal(GalilSignalBase):
|
||||
"""Signal for controlling digital outputs of the Galil RIO controller."""
|
||||
|
||||
_NUM_DIGITAL_OUTPUT_CHANNELS = 16
|
||||
|
||||
def __init__(self, signal_name: str, channel: int, parent: GalilRIO, **kwargs):
|
||||
super().__init__(signal_name, parent=parent, **kwargs)
|
||||
@@ -87,81 +159,83 @@ class GalilRIOSignalRO(GalilSignalRO):
|
||||
|
||||
def _socket_get(self) -> float:
|
||||
"""Get command for the readback signal"""
|
||||
cmd = "MG@" + ",@".join([f"AN[{ii}]" for ii in range(self._NUM_ANALOG_CHANNELS)])
|
||||
cmd = f"MG@OUT[{self._channel}]"
|
||||
ret = self.controller.socket_put_and_receive(cmd)
|
||||
values = [float(val) for val in ret.strip().split(" ")]
|
||||
# This updates all channels' readbacks, including self._readback
|
||||
self._update_all_channels(values)
|
||||
self._readback = float(ret.strip())
|
||||
return self._readback
|
||||
|
||||
def get(self):
|
||||
"""Get current analog channel values from the Galil RIO controller."""
|
||||
# If the last readback has happend more than _READ_TIMEOUT seconds ago, read all channels again
|
||||
if time.monotonic() - self.parent.last_readback > self._READ_TIMEOUT:
|
||||
self._readback = self._socket_get()
|
||||
return self._readback
|
||||
def _socket_set(self, val: Literal[0, 1]) -> None:
|
||||
"""Set command for the digital output signal. Value should be 0 or 1."""
|
||||
|
||||
# pylint: disable=protected-access
|
||||
def _update_all_channels(self, values: list[float]) -> None:
|
||||
"""
|
||||
Update all analog channel readbacks based on the provided list of values.
|
||||
List of values must be in order from an_ch0 to an_ch7.
|
||||
if val not in (0, 1):
|
||||
raise ValueError("Digital output value must be 0 or 1.")
|
||||
cmd = f"SB{self._channel}" if val == 1 else f"CB{self._channel}"
|
||||
self.controller.socket_put_confirmed(cmd)
|
||||
|
||||
We first have to update the _last_readback timestamp of the GalilRIO parent device.
|
||||
Then we update all readbacks of all an_ch channels, before we run any subscriptions.
|
||||
This ensures that all readbacks are updated before any subscriptions are run, which
|
||||
may themselves read other channels.
|
||||
|
||||
Args:
|
||||
values (list[float]): List of 8 float values corresponding to the analog channels.
|
||||
They must be in order from an_ch0 to an_ch7.
|
||||
"""
|
||||
timestamp = time.time()
|
||||
# Update parent's last readback before running subscriptions!!
|
||||
self.parent._last_readback = time.monotonic()
|
||||
updates: dict[str, tuple[float, float]] = {} # attr_name -> (new_val, old_val)
|
||||
# Update all readbacks first
|
||||
for walk in self.parent.walk_signals():
|
||||
if walk.item.attr_name.startswith("an_ch"):
|
||||
idx = int(walk.item.attr_name[-1])
|
||||
if 0 <= idx < len(values):
|
||||
old_val = walk.item._readback
|
||||
new_val = values[idx]
|
||||
walk.item._metadata["timestamp"] = timestamp
|
||||
walk.item._readback = new_val
|
||||
updates[walk.item.attr_name] = (new_val, old_val)
|
||||
def _create_analog_channels(num_channels: int) -> dict[str, tuple]:
|
||||
"""
|
||||
Helper method to create a dictionary of analog channel definitions for the DynamicDeviceComponent.
|
||||
|
||||
# Run subscriptions after all readbacks have been updated
|
||||
for walk in self.parent.walk_signals():
|
||||
if walk.item.attr_name in updates:
|
||||
new_val, old_val = updates[walk.item.attr_name]
|
||||
walk.item._run_subs(
|
||||
sub_type=walk.item.SUB_VALUE,
|
||||
old_value=old_val,
|
||||
value=new_val,
|
||||
timestamp=timestamp,
|
||||
)
|
||||
Args:
|
||||
num_channels (int): The number of analog channels to create.
|
||||
"""
|
||||
an_channels = {}
|
||||
for i in range(0, num_channels):
|
||||
an_channels[f"ch{i}"] = (
|
||||
GalilRIOAnalogSignalRO,
|
||||
f"ch{i}",
|
||||
{"kind": Kind.normal, "notify_bec": True, "channel": i, "doc": f"Analog channel {i}."},
|
||||
)
|
||||
return an_channels
|
||||
|
||||
|
||||
def _create_digital_output_channels(num_channels: int) -> dict[str, tuple]:
|
||||
"""
|
||||
Helper method to create a dictionary of digital output channel definitions for the DynamicDeviceComponent.
|
||||
|
||||
Args:
|
||||
num_channels (int): The number of digital output channels to create.
|
||||
"""
|
||||
di_out_channels = {}
|
||||
for i in range(0, num_channels):
|
||||
di_out_channels[f"ch{i}"] = (
|
||||
GalilRIODigitalOutSignal,
|
||||
f"ch{i}",
|
||||
{
|
||||
"kind": Kind.config,
|
||||
"notify_bec": True,
|
||||
"channel": i,
|
||||
"doc": f"Digital output channel {i}.",
|
||||
},
|
||||
)
|
||||
return di_out_channels
|
||||
|
||||
|
||||
class GalilRIO(PSIDeviceBase):
|
||||
"""
|
||||
Galil RIO controller integration with 8 analog input channels. To implement the device,
|
||||
please provide the appropriate host and port (default port is 23).
|
||||
Galil RIO controller integration with 16 digital output channels and 8 analog input channels.
|
||||
The default port for the controller is 23.
|
||||
|
||||
Args:
|
||||
host (str): Hostname or IP address of the Galil RIO controller.
|
||||
port (int, optional): Port number for the TCP/IP connection. Defaults to 23.
|
||||
socket_cls (type[SocketIO], optional): Socket class to use for communication. Defaults to SocketIO.
|
||||
scan_info (ScanInfo, optional): ScanInfo object for the device.
|
||||
device_manager (DeviceManagerDS): The device manager instance that manages this device.
|
||||
**kwargs: Additional keyword arguments passed to the PSIDeviceBase constructor.
|
||||
"""
|
||||
|
||||
SUB_CONNECTION_CHANGE = "connection_change"
|
||||
|
||||
an_ch0 = Cpt(GalilRIOSignalRO, signal_name="an_ch0", channel=0, doc="Analog input channel 0")
|
||||
an_ch1 = Cpt(GalilRIOSignalRO, signal_name="an_ch1", channel=1, doc="Analog input channel 1")
|
||||
an_ch2 = Cpt(GalilRIOSignalRO, signal_name="an_ch2", channel=2, doc="Analog input channel 2")
|
||||
an_ch3 = Cpt(GalilRIOSignalRO, signal_name="an_ch3", channel=3, doc="Analog input channel 3")
|
||||
an_ch4 = Cpt(GalilRIOSignalRO, signal_name="an_ch4", channel=4, doc="Analog input channel 4")
|
||||
an_ch5 = Cpt(GalilRIOSignalRO, signal_name="an_ch5", channel=5, doc="Analog input channel 5")
|
||||
an_ch6 = Cpt(GalilRIOSignalRO, signal_name="an_ch6", channel=6, doc="Analog input channel 6")
|
||||
an_ch7 = Cpt(GalilRIOSignalRO, signal_name="an_ch7", channel=7, doc="Analog input channel 7")
|
||||
#############################
|
||||
### Analog input channels ###
|
||||
#############################
|
||||
|
||||
analog_in = DDC(_create_analog_channels(GalilRIOAnalogSignalRO._NUM_ANALOG_CHANNELS))
|
||||
digital_out = DDC(
|
||||
_create_digital_output_channels(GalilRIODigitalOutSignal._NUM_DIGITAL_OUTPUT_CHANNELS)
|
||||
)
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
@@ -177,19 +251,18 @@ class GalilRIO(PSIDeviceBase):
|
||||
if port is None:
|
||||
port = 23 # Default port for Galil RIO controller
|
||||
self.controller = GalilRIOController(
|
||||
socket_cls=socket_cls, socket_host=host, socket_port=port, device_manager=device_manager
|
||||
name=f"GalilRIOController_{name}",
|
||||
socket_cls=socket_cls,
|
||||
socket_host=host,
|
||||
socket_port=port,
|
||||
device_manager=device_manager,
|
||||
)
|
||||
self._last_readback: float = time.monotonic()
|
||||
self._readback_metadata: dict[str, float] = {"last_readback": 0.0}
|
||||
super().__init__(name=name, device_manager=device_manager, scan_info=scan_info, **kwargs)
|
||||
self.controller.subscribe(
|
||||
self._update_connection_state, event_type=self.SUB_CONNECTION_CHANGE
|
||||
)
|
||||
|
||||
@property
|
||||
def last_readback(self) -> float:
|
||||
"""Return the time of the last readback from the controller."""
|
||||
return self._last_readback
|
||||
|
||||
# pylint: disable=arguments-differ
|
||||
def wait_for_connection(self, timeout: float = 30.0) -> None:
|
||||
"""Wait for the RIO controller to be connected within timeout period."""
|
||||
@@ -207,7 +280,7 @@ class GalilRIO(PSIDeviceBase):
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
HOST_NAME = "129.129.98.64"
|
||||
HOST_NAME = "129.129.122.14"
|
||||
from bec_server.device_server.tests.utils import DMMock
|
||||
|
||||
dm = DMMock()
|
||||
|
||||
@@ -25,6 +25,34 @@ logger = bec_logger.logger
|
||||
|
||||
|
||||
class LamniGalilController(GalilController):
|
||||
|
||||
|
||||
# ============================================================
|
||||
# Error status
|
||||
# ============================================================
|
||||
|
||||
caperr_bits = {
|
||||
0x01: "Cap1 outside expected left-stop range (early check)",
|
||||
0x02: "Cap2 outside expected left-stop range (early check)",
|
||||
0x04: "Cap1 too low during pressure-off check (near right boundary)",
|
||||
0x08: "Cap2 too low during pressure-off check (near right boundary)",
|
||||
0x10: "Cap1 exceeded allowed left-stop boundary during movement",
|
||||
0x20: "Cap2 exceeded allowed left-stop boundary during movement (disabled in code)",
|
||||
0x40: "Cap1 did not respond to test movement",
|
||||
0x80: "Cap2 did not respond to test movement"
|
||||
}
|
||||
|
||||
allaxrer_table = {
|
||||
1: "Not all axes referenced after reference search",
|
||||
2: "Pressure-loss emergency stop (pressure 14/15 active while motor C off)",
|
||||
3: "Unexpected pressure OFF while soft-limits not yet set",
|
||||
4: "Pressure valve mismatch (OUT13=0 but IN13=1)",
|
||||
5: "Capacitive sensor boundary violations (caperr > 0)",
|
||||
6: "Emergency Stop triggered (IN[5]=0)",
|
||||
7: "Following error detected on one or more axes"
|
||||
}
|
||||
|
||||
|
||||
USER_ACCESS = [
|
||||
"describe",
|
||||
"show_running_threads",
|
||||
@@ -37,6 +65,8 @@ class LamniGalilController(GalilController):
|
||||
"get_motor_limit_switch",
|
||||
"is_motor_on",
|
||||
"all_axes_referenced",
|
||||
"lamni_lights_off",
|
||||
"lamni_lights_on"
|
||||
]
|
||||
|
||||
def show_status_other(self):
|
||||
@@ -60,6 +90,47 @@ class LamniGalilController(GalilController):
|
||||
print("There is air pressure at the outer rotation radial.")
|
||||
swver = float(self.socket_put_and_receive("MGswver"))
|
||||
print(f"Lgalil LAMNI firmware version {swver:2.0f}.")
|
||||
allaxref = int(float(self.socket_put_and_receive("MGallaxref")))
|
||||
print(f"Error statuts:")
|
||||
if allaxref == 1:
|
||||
print(f"Allaxref = 1, all OK.")
|
||||
else:
|
||||
print(f"Allaxref = {allaxref}. Not all axes are referenced or error introduced preventing motion.")
|
||||
allaxrer = int(float(self.socket_put_and_receive("MGallaxrer")))
|
||||
print("\nallaxrer =", allaxrer)
|
||||
print(self.decode_allaxrer(allaxrer))
|
||||
caperr = int(float(self.socket_put_and_receive("MGcaperr")))
|
||||
print("\nDecoding caperr =", caperr)
|
||||
self.visualize_caperr(caperr)
|
||||
|
||||
def decode_allaxrer(self, code: int) -> str:
|
||||
"""Return human-readable meaning of allaxrer code."""
|
||||
return self.allaxrer_table.get(code, "Unknown allaxrer code")
|
||||
|
||||
def visualize_caperr(self, mask: int):
|
||||
"""Pretty-print a bitmask visualization for caperr."""
|
||||
print("\n=== CAPERR BITMASK VISUALIZER ===")
|
||||
print(f"Raw value: {mask} (0x{mask:02X})")
|
||||
print("----------------------------------\n")
|
||||
|
||||
print("Bit | Hex | Active | Meaning")
|
||||
print("----------------------------------")
|
||||
|
||||
for bit, meaning in self.caperr_bits.items():
|
||||
active = "YES" if mask & bit else "no"
|
||||
print(f"{bit:3d} | 0x{bit:02X} | {active:6} | {meaning}")
|
||||
|
||||
print("\nActive flags:")
|
||||
active_flags = [meaning for bit, meaning in self.caperr_bits.items() if mask & bit]
|
||||
|
||||
if active_flags:
|
||||
for f in active_flags:
|
||||
print(" ✓", f)
|
||||
else:
|
||||
print(" (none)")
|
||||
|
||||
print("\n==================================\n")
|
||||
|
||||
|
||||
def lamni_lights_off(self):
|
||||
self.socket_put_confirmed("SB1")
|
||||
@@ -93,7 +164,7 @@ class LamniGalilReadbackSignal(GalilSignalRO):
|
||||
val = super().read()
|
||||
if self.parent.axis_Id_numeric == 2:
|
||||
try:
|
||||
rt = self.parent.device_manager.devices[self.parent.rtx]
|
||||
rt = self.parent.device_manager.devices[self.parent.rt]
|
||||
if rt.enabled:
|
||||
rt.obj.controller.set_rotation_angle(val[self.parent.name]["value"])
|
||||
except KeyError:
|
||||
@@ -147,7 +218,7 @@ class LamniGalilMotor(Device, PositionerBase):
|
||||
raise BECConfigError(
|
||||
"device_mapping has been specified but the device_manager cannot be accessed."
|
||||
)
|
||||
self.rt = self.device_mapping.get("rt")
|
||||
self.rt = self.device_mapping.get("rt", "rtx")
|
||||
|
||||
super().__init__(
|
||||
prefix,
|
||||
|
||||
@@ -11,6 +11,7 @@ from ophyd.status import wait as status_wait
|
||||
from ophyd.utils import LimitError, ReadOnlyError
|
||||
from ophyd_devices.utils.controller import Controller, threadlocked
|
||||
from ophyd_devices.utils.socket import SocketIO, SocketSignal, raise_if_disconnected
|
||||
from prettytable import PrettyTable
|
||||
|
||||
from csaxs_bec.devices.omny.rt.rt_ophyd import RtCommunicationError, RtError
|
||||
|
||||
@@ -51,6 +52,7 @@ class RtLamniController(Controller):
|
||||
_axes_per_controller = 3
|
||||
USER_ACCESS = [
|
||||
"socket_put_and_receive",
|
||||
"socket_put",
|
||||
"set_rotation_angle",
|
||||
"feedback_disable",
|
||||
"feedback_enable_without_reset",
|
||||
@@ -62,6 +64,11 @@ class RtLamniController(Controller):
|
||||
"_set_axis_velocity_maximum_speed",
|
||||
"_position_sampling_single_read",
|
||||
"_position_sampling_single_reset_and_start_sampling",
|
||||
"show_signal_strength_interferometer",
|
||||
"show_interferometer_positions",
|
||||
"show_analog_signals",
|
||||
"show_feedback_status",
|
||||
|
||||
]
|
||||
|
||||
def __init__(
|
||||
@@ -208,8 +215,9 @@ class RtLamniController(Controller):
|
||||
|
||||
@threadlocked
|
||||
def start_scan(self):
|
||||
interferometer_feedback_not_running = int((self.socket_put_and_receive("J2")).split(",")[0])
|
||||
if interferometer_feedback_not_running == 1:
|
||||
# interferometer_feedback_not_running = int((self.socket_put_and_receive("J2")).split(",")[0])
|
||||
# if interferometer_feedback_not_running == 1:
|
||||
if not self.feedback_is_running():
|
||||
logger.error(
|
||||
"Cannot start scan because feedback loop is not running or there is an interferometer error."
|
||||
)
|
||||
@@ -270,6 +278,44 @@ class RtLamniController(Controller):
|
||||
"average_lamni_angle": {"value": self.average_lamni_angle / (int(return_table[0]) + 1)},
|
||||
}
|
||||
return signals
|
||||
|
||||
def feedback_is_running(self) -> bool:
|
||||
status = int(float((self.socket_put_and_receive("J2")).split(",")[0]))
|
||||
return status == 0 # 0 means running, 1 means error/disabled
|
||||
|
||||
def show_feedback_status(self):
|
||||
if self.feedback_is_running():
|
||||
print("Loop is running, no error on interferometer.")
|
||||
else:
|
||||
print("Loop is not running, either it is turned off or an interferometer error occurred.")
|
||||
|
||||
|
||||
def show_analog_signals(self) -> dict:
|
||||
self.socket_put("As") # start sampling
|
||||
time.sleep(0.01)
|
||||
return_table = (self.socket_put_and_receive("Ar")).split(",")
|
||||
|
||||
number_of_samples = int(float(return_table[0]))
|
||||
signals = {
|
||||
"number_of_samples": number_of_samples,
|
||||
"piezo_0": float(return_table[1]),
|
||||
"piezo_1": float(return_table[2]),
|
||||
"cap_0": float(return_table[3]),
|
||||
"cap_1": float(return_table[4]),
|
||||
"cap_2": float(return_table[5]),
|
||||
"cap_3": float(return_table[6]),
|
||||
"cap_4": float(return_table[7]),
|
||||
}
|
||||
|
||||
t = PrettyTable()
|
||||
t.title = f"LamNI Analog Signals ({number_of_samples} samples)"
|
||||
t.field_names = ["Signal", "Value"]
|
||||
for key, val in signals.items():
|
||||
if key != "number_of_samples":
|
||||
t.add_row([key, f"{val:.4f}"])
|
||||
print(t)
|
||||
|
||||
return
|
||||
|
||||
def read_positions_from_sampler(self):
|
||||
# this was for reading after the scan completed
|
||||
@@ -347,6 +393,48 @@ class RtLamniController(Controller):
|
||||
)
|
||||
return bool(return_table[0])
|
||||
|
||||
def show_signal_strength_interferometer(self):
|
||||
# trigger SSI averaging before reading
|
||||
self.socket_put("J3")
|
||||
time.sleep(0.05)
|
||||
return_table = (self.socket_put_and_receive("J2")).split(",")
|
||||
ssi_0 = float(return_table[1])
|
||||
ssi_1 = float(return_table[2])
|
||||
|
||||
return_table_angle = (self.socket_put_and_receive("J7")).split(",")
|
||||
angle_running = bool(int(float(return_table_angle[0])))
|
||||
angle_position = float(return_table_angle[1])
|
||||
angle_signal = float(return_table_angle[2])
|
||||
|
||||
t = PrettyTable()
|
||||
t.title = "Interferometer signal strength"
|
||||
t.field_names = ["Axis", "Description", "Value", "Running"]
|
||||
t.add_row([0, "ST FZP horizontal", ssi_0, "-"])
|
||||
t.add_row([1, "ST FZP vertical", ssi_1, "-"])
|
||||
t.add_row([2, "Angle interferometer", angle_signal, angle_running])
|
||||
print(t)
|
||||
|
||||
if angle_running:
|
||||
print(f"Angle interferometer position: {angle_position:.4f} um")
|
||||
else:
|
||||
print("Warning: angle interferometer is not running.")
|
||||
|
||||
def show_interferometer_positions(self) -> dict:
|
||||
return_table = (self.socket_put_and_receive("J4")).split(",")
|
||||
loop_status = bool(int(float(return_table[0])))
|
||||
pos_y = float(return_table[1])
|
||||
pos_x = float(return_table[2])
|
||||
|
||||
t = PrettyTable()
|
||||
t.title = "LamNI Interferometer Positions"
|
||||
t.field_names = ["Axis", "Description", "Position (um)"]
|
||||
t.add_row([0, "X", f"{pos_x:.4f}"])
|
||||
t.add_row([1, "Y", f"{pos_y:.4f}"])
|
||||
print(t)
|
||||
print(f"Feedback loop running: {loop_status}")
|
||||
|
||||
return {"x": pos_x, "y": pos_y, "loop_running": loop_status}
|
||||
|
||||
def feedback_enable_with_reset(self):
|
||||
if not self.feedback_status_angle_lamni():
|
||||
self.feedback_disable_and_even_reset_lamni_angle_interferometer()
|
||||
|
||||
@@ -1,14 +0,0 @@
|
||||
"""
|
||||
Conftest runs for all tests in this directory and subdirectories. Thereby, we know for
|
||||
certain that the SocketSignal.READBACK_TIMEOUT is set to 0 for all tests, which prevents
|
||||
hanging tests when a readback is attempted on a non-connected socket.
|
||||
"""
|
||||
|
||||
# conftest.py
|
||||
import pytest
|
||||
from ophyd_devices.utils.socket import SocketSignal
|
||||
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
def patch_socket_timeout(monkeypatch):
|
||||
monkeypatch.setattr(SocketSignal, "READBACK_TIMEOUT", 0.0)
|
||||
@@ -2,6 +2,7 @@ from unittest import mock
|
||||
|
||||
import pytest
|
||||
from ophyd_devices.tests.utils import SocketMock
|
||||
from ophyd_devices.utils.socket import SocketSignal
|
||||
|
||||
from csaxs_bec.devices.omny.galil.fupr_ophyd import FuprGalilController, FuprGalilMotor
|
||||
|
||||
@@ -17,6 +18,11 @@ def fsamroy(dm_with_devices):
|
||||
socket_cls=SocketMock,
|
||||
device_manager=dm_with_devices,
|
||||
)
|
||||
for walk in fsamroy_motor.walk_signals():
|
||||
if isinstance(walk.item, SocketSignal):
|
||||
walk.item._readback_timeout = (
|
||||
0.0 # Set the readback timeout to 0 to avoid waiting during tests
|
||||
)
|
||||
fsamroy_motor.controller.on()
|
||||
assert isinstance(fsamroy_motor.controller, FuprGalilController)
|
||||
yield fsamroy_motor
|
||||
|
||||
@@ -9,7 +9,11 @@ from ophyd_devices.tests.utils import SocketMock
|
||||
from csaxs_bec.devices.npoint.npoint import NPointAxis, NPointController
|
||||
from csaxs_bec.devices.omny.galil.fgalil_ophyd import FlomniGalilController, FlomniGalilMotor
|
||||
from csaxs_bec.devices.omny.galil.fupr_ophyd import FuprGalilController, FuprGalilMotor
|
||||
from csaxs_bec.devices.omny.galil.galil_rio import GalilRIO, GalilRIOController, GalilRIOSignalRO
|
||||
from csaxs_bec.devices.omny.galil.galil_rio import (
|
||||
GalilRIO,
|
||||
GalilRIOAnalogSignalRO,
|
||||
GalilRIOController,
|
||||
)
|
||||
from csaxs_bec.devices.omny.galil.lgalil_ophyd import LamniGalilController, LamniGalilMotor
|
||||
from csaxs_bec.devices.omny.galil.ogalil_ophyd import OMNYGalilController, OMNYGalilMotor
|
||||
from csaxs_bec.devices.omny.galil.sgalil_ophyd import GalilController, SGalilMotor
|
||||
@@ -272,26 +276,27 @@ def test_galil_rio_signal_read(galil_rio):
|
||||
## Test read of all channels
|
||||
###########
|
||||
|
||||
assert galil_rio.an_ch0._READ_TIMEOUT == 0.1 # Default read timeout of 100ms
|
||||
assert galil_rio.analog_in.ch0._readback_timeout == 0.1 # Default read timeout of 100ms
|
||||
# Mock the socket to return specific values
|
||||
galil_rio.controller.sock.buffer_recv = [b" 1.234 2.345 3.456 4.567 5.678 6.789 7.890 8.901"]
|
||||
galil_rio._last_readback = 0 # Force read from controller
|
||||
|
||||
analog_bufffer = b" 1.234 2.345 3.456 4.567 5.678 6.789 7.890 8.901\r\n"
|
||||
galil_rio.controller.sock.buffer_recv = [] # Clear any existing buffer
|
||||
galil_rio.controller.sock.buffer_recv.append(analog_bufffer)
|
||||
read_values = galil_rio.read()
|
||||
assert len(read_values) == 8 # 8 channels
|
||||
|
||||
expected_values = {
|
||||
galil_rio.an_ch0.name: {"value": 1.234},
|
||||
galil_rio.an_ch1.name: {"value": 2.345},
|
||||
galil_rio.an_ch2.name: {"value": 3.456},
|
||||
galil_rio.an_ch3.name: {"value": 4.567},
|
||||
galil_rio.an_ch4.name: {"value": 5.678},
|
||||
galil_rio.an_ch5.name: {"value": 6.789},
|
||||
galil_rio.an_ch6.name: {"value": 7.890},
|
||||
galil_rio.an_ch7.name: {"value": 8.901},
|
||||
galil_rio.analog_in.ch0.name: {"value": 1.234},
|
||||
galil_rio.analog_in.ch1.name: {"value": 2.345},
|
||||
galil_rio.analog_in.ch2.name: {"value": 3.456},
|
||||
galil_rio.analog_in.ch3.name: {"value": 4.567},
|
||||
galil_rio.analog_in.ch4.name: {"value": 5.678},
|
||||
galil_rio.analog_in.ch5.name: {"value": 6.789},
|
||||
galil_rio.analog_in.ch6.name: {"value": 7.890},
|
||||
galil_rio.analog_in.ch7.name: {"value": 8.901},
|
||||
}
|
||||
# All timestamps should be the same
|
||||
assert all(
|
||||
ret["timestamp"] == read_values[galil_rio.an_ch0.name]["timestamp"]
|
||||
ret["timestamp"] == read_values[galil_rio.analog_in.ch0.name]["timestamp"]
|
||||
for signal_name, ret in read_values.items()
|
||||
)
|
||||
# Check values
|
||||
@@ -301,7 +306,7 @@ def test_galil_rio_signal_read(galil_rio):
|
||||
|
||||
# Check communication command to socker
|
||||
assert galil_rio.controller.sock.buffer_put == [
|
||||
b"MG@AN[0],@AN[1],@AN[2],@AN[3],@AN[4],@AN[5],@AN[6],@AN[7]\r"
|
||||
b"MG@AN[0], @AN[1], @AN[2], @AN[3], @AN[4], @AN[5], @AN[6], @AN[7]\r"
|
||||
]
|
||||
|
||||
###########
|
||||
@@ -313,11 +318,11 @@ def test_galil_rio_signal_read(galil_rio):
|
||||
|
||||
def value_callback(value, old_value, **kwargs):
|
||||
obj = kwargs.get("obj")
|
||||
galil = obj.parent
|
||||
galil = obj.parent.parent
|
||||
readback = galil.read()
|
||||
value_callback_buffer.append(readback)
|
||||
|
||||
galil_rio.an_ch0.subscribe(value_callback, run=False)
|
||||
galil_rio.analog_in.ch0.subscribe(value_callback, run=False)
|
||||
galil_rio.controller.sock.buffer_recv = [b" 2.5 2.6 2.7 2.8 2.9 3.0 3.1 3.2"]
|
||||
expected_values = [2.5, 2.6, 2.7, 2.8, 2.9, 3.0, 3.1, 3.2]
|
||||
|
||||
@@ -327,13 +332,15 @@ def test_galil_rio_signal_read(galil_rio):
|
||||
|
||||
# Should have used the cached value
|
||||
for walk in galil_rio.walk_signals():
|
||||
walk.item._READ_TIMEOUT = 10 # Make sure cached read is used
|
||||
ret = galil_rio.an_ch0.read()
|
||||
walk.item._readback_timeout = 10 # Make sure cached read is used
|
||||
ret = galil_rio.analog_in.ch0.read()
|
||||
|
||||
# Should not trigger callback since value did not change
|
||||
assert np.isclose(ret[galil_rio.an_ch0.name]["value"], 1.234)
|
||||
assert np.isclose(ret[galil_rio.analog_in.ch0.name]["value"], 1.234)
|
||||
# Same timestamp as for another channel as this is cached read
|
||||
assert np.isclose(ret[galil_rio.an_ch0.name]["timestamp"], galil_rio.an_ch7.timestamp)
|
||||
assert np.isclose(
|
||||
ret[galil_rio.analog_in.ch0.name]["timestamp"], galil_rio.analog_in.ch7.timestamp
|
||||
)
|
||||
assert len(value_callback_buffer) == 0
|
||||
|
||||
##################
|
||||
@@ -341,10 +348,10 @@ def test_galil_rio_signal_read(galil_rio):
|
||||
##################
|
||||
|
||||
# Now force a read from the controller
|
||||
galil_rio._last_readback = 0 # Force read from controller
|
||||
ret = galil_rio.an_ch0.read()
|
||||
galil_rio.analog_in.ch0._last_readback = 0 # Force read from controller
|
||||
ret = galil_rio.analog_in.ch0.read()
|
||||
|
||||
assert np.isclose(ret[galil_rio.an_ch0.name]["value"], 2.5)
|
||||
assert np.isclose(ret[galil_rio.analog_in.ch0.name]["value"], 2.5)
|
||||
|
||||
# Check callback invocation, but only 1 callback even with galil_rio.read() call in callback
|
||||
assert len(value_callback_buffer) == 1
|
||||
@@ -352,7 +359,45 @@ def test_galil_rio_signal_read(galil_rio):
|
||||
assert np.isclose(values, expected_values).all()
|
||||
assert all(
|
||||
[
|
||||
value["timestamp"] == value_callback_buffer[0][galil_rio.an_ch0.name]["timestamp"]
|
||||
value["timestamp"]
|
||||
== value_callback_buffer[0][galil_rio.analog_in.ch0.name]["timestamp"]
|
||||
for value in value_callback_buffer[0].values()
|
||||
]
|
||||
)
|
||||
|
||||
|
||||
def test_galil_rio_digital_out_signal(galil_rio):
|
||||
"""
|
||||
Test that the Galil RIO digital output signal can be set correctly.
|
||||
"""
|
||||
## Test Read from digital output channels
|
||||
buffer_receive = []
|
||||
excepted_put_buffer = []
|
||||
for ii in range(galil_rio.digital_out.ch0._NUM_DIGITAL_OUTPUT_CHANNELS):
|
||||
cmd = f"MG@OUT[{ii}]\r".encode()
|
||||
excepted_put_buffer.append(cmd)
|
||||
recv = " 1.000".encode()
|
||||
buffer_receive.append(recv)
|
||||
|
||||
galil_rio.controller.sock.buffer_recv = buffer_receive # Mock response for readback
|
||||
|
||||
digital_read = galil_rio.read_configuration() # Read to populate readback values
|
||||
|
||||
for walk in galil_rio.digital_out.walk_signals():
|
||||
assert np.isclose(digital_read[walk.item.name]["value"], 1.0)
|
||||
|
||||
assert galil_rio.controller.sock.buffer_put == excepted_put_buffer
|
||||
|
||||
# Test writing to digital output channels
|
||||
galil_rio.controller.sock.buffer_put = [] # Clear buffer put
|
||||
galil_rio.controller.sock.buffer_recv = [b":"] # Mock response for readback
|
||||
|
||||
# Set digital output channel 0 to high
|
||||
galil_rio.digital_out.ch0.put(1)
|
||||
assert galil_rio.controller.sock.buffer_put == [b"SB0\r"]
|
||||
|
||||
# Set digital output channel 0 to low
|
||||
galil_rio.controller.sock.buffer_put = [] # Clear buffer put
|
||||
galil_rio.controller.sock.buffer_recv = [b":"] # Mock response for readback
|
||||
galil_rio.digital_out.ch0.put(0)
|
||||
assert galil_rio.controller.sock.buffer_put == [b"CB0\r"]
|
||||
|
||||
Reference in New Issue
Block a user