diff --git a/csaxs_bec/bec_ipython_client/plugins/LamNI/__init__.py b/csaxs_bec/bec_ipython_client/plugins/LamNI/__init__.py
index 286f52a..5642f93 100644
--- a/csaxs_bec/bec_ipython_client/plugins/LamNI/__init__.py
+++ b/csaxs_bec/bec_ipython_client/plugins/LamNI/__init__.py
@@ -1,2 +1,6 @@
-from .load_additional_correction import lamni_read_additional_correction
-from .x_ray_eye_align import DataDrivenLamNI, LamNI, MagLamNI, XrayEyeAlign
+from .alignment import XrayEyeAlign
+from .lamni import LamNI
+from .lamni_optics_mixin import LamNIInitError, LaMNIInitStages, LamNIOpticsMixin
+__all__ = [
+ "LamNI", "XrayEyeAlign", "LamNIInitError", "LaMNIInitStages", "LamNIOpticsMixin"
+]
\ No newline at end of file
diff --git a/csaxs_bec/bec_ipython_client/plugins/LamNI/alignment.py b/csaxs_bec/bec_ipython_client/plugins/LamNI/alignment.py
new file mode 100644
index 0000000..9049562
--- /dev/null
+++ b/csaxs_bec/bec_ipython_client/plugins/LamNI/alignment.py
@@ -0,0 +1,461 @@
+import builtins
+import time
+from collections import defaultdict
+
+import numpy as np
+from bec_lib import bec_logger
+from typeguard import typechecked
+
+from csaxs_bec.bec_ipython_client.plugins.cSAXS import epics_get, epics_put, fshopen
+
+logger = bec_logger.logger
+
+if builtins.__dict__.get("bec") is not None:
+ bec = builtins.__dict__.get("bec")
+ dev = builtins.__dict__.get("dev")
+ umv = builtins.__dict__.get("umv")
+ umvr = builtins.__dict__.get("umvr")
+
+
+class XrayEyeAlign:
+ # pixel calibration, multiply to get mm
+ # PIXEL_CALIBRATION = 0.2/209 #.2 with binning
+ PIXEL_CALIBRATION = 0.2 / 218 # .2 with binning
+
+ def __init__(self, client, lamni) -> None:
+ self.client = client
+ self.lamni = lamni
+ self.device_manager = client.device_manager
+ self.scans = client.scans
+ self.alignment_values = defaultdict(list)
+ self._reset_init_values()
+ self.corr_pos_x = []
+ self.corr_pos_y = []
+ self.corr_angle = []
+ self.corr_pos_x_2 = []
+ self.corr_pos_y_2 = []
+ self.corr_angle_2 = []
+
+ # ------------------------------------------------------------------
+ # Correction reset
+ # ------------------------------------------------------------------
+
+ def reset_correction(self):
+ self.corr_pos_x = []
+ self.corr_pos_y = []
+ self.corr_angle = []
+
+ def reset_correction_2(self):
+ self.corr_pos_x_2 = []
+ self.corr_pos_y_2 = []
+ self.corr_angle_2 = []
+
+ def reset_xray_eye_correction(self):
+ self.client.delete_global_var("tomo_fit_xray_eye")
+
+ # ------------------------------------------------------------------
+ # FOV offset properties
+ # ------------------------------------------------------------------
+
+ @property
+ def tomo_fovx_offset(self):
+ val = self.client.get_global_var("tomo_fov_offset")
+ if val is None:
+ return 0.0
+ return val[0] / 1000
+
+ @tomo_fovx_offset.setter
+ @typechecked
+ def tomo_fovx_offset(self, val: float):
+ val_old = self.client.get_global_var("tomo_fov_offset")
+ if val_old is None:
+ val_old = [0.0, 0.0]
+ self.client.set_global_var("tomo_fov_offset", [val * 1000, val_old[1]])
+
+ @property
+ def tomo_fovy_offset(self):
+ val = self.client.get_global_var("tomo_fov_offset")
+ if val is None:
+ return 0.0
+ return val[1] / 1000
+
+ @tomo_fovy_offset.setter
+ @typechecked
+ def tomo_fovy_offset(self, val: float):
+ val_old = self.client.get_global_var("tomo_fov_offset")
+ if val_old is None:
+ val_old = [0.0, 0.0]
+ self.client.set_global_var("tomo_fov_offset", [val_old[0], val * 1000])
+
+ # ------------------------------------------------------------------
+ # Internal helpers
+ # ------------------------------------------------------------------
+
+ def _reset_init_values(self):
+ self.shift_xy = [0, 0]
+ self._xray_fov_xy = [0, 0]
+
+ def _disable_rt_feedback(self):
+ self.device_manager.devices.rtx.controller.feedback_disable()
+
+ def _enable_rt_feedback(self):
+ self.device_manager.devices.rtx.controller.feedback_enable_with_reset()
+
+ def tomo_rotate(self, val: float):
+ # pylint: disable=undefined-variable
+ umv(self.device_manager.devices.lsamrot, val)
+
+ def get_tomo_angle(self):
+ return self.device_manager.devices.lsamrot.readback.read()["lsamrot"]["value"]
+
+ # ------------------------------------------------------------------
+ # X-ray eye camera control
+ # ------------------------------------------------------------------
+
+ def save_frame(self):
+ epics_put("XOMNYI-XEYE-SAVFRAME:0", 1)
+
+ def update_frame(self):
+ epics_put("XOMNYI-XEYE-ACQDONE:0", 0)
+ # start live
+ epics_put("XOMNYI-XEYE-ACQ:0", 1)
+ # wait for start live
+ while epics_get("XOMNYI-XEYE-ACQDONE:0") == 0:
+ time.sleep(0.5)
+ print("waiting for live view to start...")
+ fshopen()
+
+ epics_put("XOMNYI-XEYE-ACQDONE:0", 0)
+
+ while epics_get("XOMNYI-XEYE-ACQDONE:0") == 0:
+ print("waiting for new frame...")
+ time.sleep(0.5)
+
+ time.sleep(0.5)
+ # stop live view
+ epics_put("XOMNYI-XEYE-ACQ:0", 0)
+ time.sleep(1)
+ print("got new frame")
+
+ def update_fov(self, k: int):
+ self._xray_fov_xy[0] = max(epics_get(f"XOMNYI-XEYE-XWIDTH_X:{k}"), self._xray_fov_xy[0])
+ self._xray_fov_xy[1] = max(0, self._xray_fov_xy[0])
+
+ @property
+ def movement_buttons_enabled(self):
+ return [epics_get("XOMNYI-XEYE-ENAMVX:0"), epics_get("XOMNYI-XEYE-ENAMVY:0")]
+
+ @movement_buttons_enabled.setter
+ def movement_buttons_enabled(self, enabled: bool):
+ enabled = int(enabled)
+ epics_put("XOMNYI-XEYE-ENAMVX:0", enabled)
+ epics_put("XOMNYI-XEYE-ENAMVY:0", enabled)
+
+ def send_message(self, msg: str):
+ epics_put("XOMNYI-XEYE-MESSAGE:0.DESC", msg)
+
+ # ------------------------------------------------------------------
+ # Alignment procedure
+ # ------------------------------------------------------------------
+
+ def align(self):
+ self._reset_init_values()
+ self.reset_correction()
+ self.reset_correction_2()
+
+ self._disable_rt_feedback()
+ epics_put("XOMNYI-XEYE-PIXELSIZE:0", self.PIXEL_CALIBRATION)
+ self._enable_rt_feedback()
+
+ self.movement_buttons_enabled = False
+ epics_put("XOMNYI-XEYE-ACQ:0", 0)
+ self.send_message("please wait...")
+ epics_put("XOMNYI-XEYE-SAMPLENAME:0.DESC", "Let us LAMNI...")
+
+ self._disable_rt_feedback()
+ k = 0
+
+ self.lamni.lfzp_in()
+ self.update_frame()
+
+ self.movement_buttons_enabled = False
+ epics_put("XOMNYI-XEYE-SUBMIT:0", 0)
+ epics_put("XOMNYI-XEYE-STEP:0", 0)
+ self.send_message("Submit center value of FZP.")
+
+ while True:
+ if epics_get("XOMNYI-XEYE-SUBMIT:0") == 1:
+ val_x = epics_get(f"XOMNYI-XEYE-XVAL_X:{k}") * self.PIXEL_CALIBRATION # in mm
+ val_y = epics_get(f"XOMNYI-XEYE-YVAL_Y:{k}") * self.PIXEL_CALIBRATION # in mm
+ self.alignment_values[k] = [val_x, val_y]
+ print(
+ f"Clicked position {k}: x {self.alignment_values[k][0]}, y"
+ f" {self.alignment_values[k][1]}"
+ )
+
+ if k == 0: # received center value of FZP
+ self.send_message("please wait ...")
+ self.lamni.loptics_out()
+ epics_put("XOMNYI-XEYE-SUBMIT:0", -1)
+ self.movement_buttons_enabled = False
+ print("Moving sample in, FZP out")
+
+ self._disable_rt_feedback()
+ time.sleep(0.3)
+ self._enable_rt_feedback()
+ time.sleep(0.3)
+
+ self.update_frame()
+ self.send_message("Go and find the sample")
+ epics_put("XOMNYI-XEYE-SUBMIT:0", 0)
+ self.movement_buttons_enabled = True
+
+ elif k == 1: # received sample center value at samrot 0
+ msg = (
+ f"Base shift values from movement are x {self.shift_xy[0]}, y"
+ f" {self.shift_xy[1]}"
+ )
+ print(msg)
+ logger.info(msg)
+ self.shift_xy[0] += (
+ self.alignment_values[0][0] - self.alignment_values[1][0]
+ ) * 1000
+ self.shift_xy[1] += (
+ self.alignment_values[1][1] - self.alignment_values[0][1]
+ ) * 1000
+ print(
+ "Base shift values from movement and clicked position are x"
+ f" {self.shift_xy[0]}, y {self.shift_xy[1]}"
+ )
+
+ self.scans.lamni_move_to_scan_center(
+ self.shift_xy[0] / 1000, self.shift_xy[1] / 1000, self.get_tomo_angle()
+ ).wait()
+
+ self.send_message("please wait ...")
+ epics_put("XOMNYI-XEYE-SUBMIT:0", -1)
+ self.movement_buttons_enabled = False
+ time.sleep(1)
+
+ self.scans.lamni_move_to_scan_center(
+ self.shift_xy[0] / 1000, self.shift_xy[1] / 1000, self.get_tomo_angle()
+ ).wait()
+
+ epics_put("XOMNYI-XEYE-ANGLE:0", self.get_tomo_angle())
+ self.update_frame()
+ self.send_message("Submit sample center and FOV (0 deg)")
+ epics_put("XOMNYI-XEYE-SUBMIT:0", 0)
+ self.update_fov(k)
+
+ elif 1 < k < 10: # received sample center value at samrot 0 ... 315
+ self.send_message("please wait ...")
+ epics_put("XOMNYI-XEYE-SUBMIT:0", -1)
+
+ self._disable_rt_feedback()
+ self.tomo_rotate((k - 1) * 45 - 45 / 2)
+ self.scans.lamni_move_to_scan_center(
+ self.shift_xy[0] / 1000, self.shift_xy[1] / 1000, self.get_tomo_angle()
+ ).wait()
+ self._disable_rt_feedback()
+ self.tomo_rotate((k - 1) * 45)
+ self.scans.lamni_move_to_scan_center(
+ self.shift_xy[0] / 1000, self.shift_xy[1] / 1000, self.get_tomo_angle()
+ ).wait()
+
+ epics_put("XOMNYI-XEYE-ANGLE:0", self.get_tomo_angle())
+ self.update_frame()
+ self.send_message("Submit sample center")
+ epics_put("XOMNYI-XEYE-SUBMIT:0", 0)
+ epics_put("XOMNYI-XEYE-ENAMVX:0", 1)
+ self.update_fov(k)
+
+ elif k == 10: # received sample center value at samrot 270, done
+ self.send_message("done...")
+ epics_put("XOMNYI-XEYE-SUBMIT:0", -1)
+ self.movement_buttons_enabled = False
+ self.update_fov(k)
+ break
+
+ k += 1
+ epics_put("XOMNYI-XEYE-STEP:0", k)
+
+ if k < 2:
+ _xrayeyalignmvx = epics_get("XOMNYI-XEYE-MVX:0")
+ _xrayeyalignmvy = epics_get("XOMNYI-XEYE-MVY:0")
+ if _xrayeyalignmvx != 0 or _xrayeyalignmvy != 0:
+ self.shift_xy[0] = self.shift_xy[0] + _xrayeyalignmvx
+ self.shift_xy[1] = self.shift_xy[1] + _xrayeyalignmvy
+ self.scans.lamni_move_to_scan_center(
+ self.shift_xy[0] / 1000, self.shift_xy[1] / 1000, self.get_tomo_angle()
+ ).wait()
+ print(
+ f"Current center horizontal {self.shift_xy[0]} vertical {self.shift_xy[1]}"
+ )
+ epics_put("XOMNYI-XEYE-MVY:0", 0)
+ epics_put("XOMNYI-XEYE-MVX:0", 0)
+ self.update_frame()
+
+ time.sleep(0.2)
+
+ self.write_output()
+ fovx = self._xray_fov_xy[0] * self.PIXEL_CALIBRATION * 1000 / 2
+ fovy = self._xray_fov_xy[1] * self.PIXEL_CALIBRATION * 1000 / 2
+ print(
+ f"The largest field of view from the xrayeyealign was \nfovx = {fovx:.0f} microns,"
+ f" fovy = {fovy:.0f} microns"
+ )
+ print("Use matlab routine to fit the current alignment...")
+ print(
+ "This additional shift is applied to the base shift values\n which are x"
+ f" {self.shift_xy[0]}, y {self.shift_xy[1]}"
+ )
+
+ self._disable_rt_feedback()
+ self.tomo_rotate(0)
+
+ print(
+ "\n\nNEXT LOAD ALIGNMENT PARAMETERS\nby running"
+ " lamni.align.read_xray_eye_correction()\n"
+ )
+
+ self.client.set_global_var("tomo_fov_offset", self.shift_xy)
+
+ # ------------------------------------------------------------------
+ # Alignment output
+ # ------------------------------------------------------------------
+
+ def write_output(self):
+ import os
+ with open(
+ os.path.expanduser("~/Data10/specES1/internal/xrayeye_alignmentvalues"), "w"
+ ) as alignment_values_file:
+ alignment_values_file.write("angle\thorizontal\tvertical\n")
+ for k in range(2, 11):
+ fovx_offset = (self.alignment_values[0][0] - self.alignment_values[k][0]) * 1000
+ fovy_offset = (self.alignment_values[k][1] - self.alignment_values[0][1]) * 1000
+ print(
+ f"Writing to file new alignment: number {k}, value x {fovx_offset}, y"
+ f" {fovy_offset}"
+ )
+ alignment_values_file.write(f"{(k-2)*45}\t{fovx_offset}\t{fovy_offset}\n")
+
+ # ------------------------------------------------------------------
+ # X-ray eye sinusoidal correction (loaded from MATLAB fit files)
+ # ------------------------------------------------------------------
+
+ def read_xray_eye_correction(self, dir_path=None):
+ import os
+ if dir_path is None:
+ dir_path = os.path.expanduser("~/Data10/specES1/internal/")
+ tomo_fit_xray_eye = np.zeros((2, 3))
+ for i, axis in enumerate(["x", "y"]):
+ for j, coeff in enumerate(["A", "B", "C"]):
+ with open(os.path.join(dir_path, f"ptychotomoalign_{coeff}{axis}.txt"), "r") as f:
+ tomo_fit_xray_eye[i][j] = f.readline()
+
+ self.client.set_global_var("tomo_fit_xray_eye", tomo_fit_xray_eye.tolist())
+ # x amp, phase, offset, y amp, phase, offset
+ # 0 0 0 1 0 2 1 0 1 1 1 2
+ print("New alignment parameters loaded from X-ray eye")
+ print(
+ f"X Amplitude {tomo_fit_xray_eye[0][0]}, "
+ f"X Phase {tomo_fit_xray_eye[0][1]}, "
+ f"X Offset {tomo_fit_xray_eye[0][2]}, "
+ f"Y Amplitude {tomo_fit_xray_eye[1][0]}, "
+ f"Y Phase {tomo_fit_xray_eye[1][1]}, "
+ f"Y Offset {tomo_fit_xray_eye[1][2]}"
+ )
+
+ def lamni_compute_additional_correction_xeye_mu(self, angle):
+ """Compute sinusoidal correction from the X-ray eye fit for the given angle."""
+ tomo_fit_xray_eye = self.client.get_global_var("tomo_fit_xray_eye")
+ if tomo_fit_xray_eye is None:
+ print("Not applying any additional correction. No x-ray eye data available.\n")
+ return (0, 0)
+
+ # x amp, phase, offset, y amp, phase, offset
+ # 0 0 0 1 0 2 1 0 1 1 1 2
+ correction_x = (
+ tomo_fit_xray_eye[0][0] * np.sin(np.radians(angle) + tomo_fit_xray_eye[0][1])
+ + tomo_fit_xray_eye[0][2]
+ ) / 1000
+ correction_y = (
+ tomo_fit_xray_eye[1][0] * np.sin(np.radians(angle) + tomo_fit_xray_eye[1][1])
+ + tomo_fit_xray_eye[1][2]
+ ) / 1000
+
+ print(f"Xeye correction x {correction_x}, y {correction_y} for angle {angle}\n")
+ return (correction_x, correction_y)
+
+ # ------------------------------------------------------------------
+ # Additional lookup-table corrections (iteration 1 and 2)
+ # ------------------------------------------------------------------
+
+ def read_additional_correction(self, correction_file: str):
+ self.corr_pos_x, self.corr_pos_y, self.corr_angle = self._read_correction_file_xy(
+ correction_file
+ )
+
+ def read_additional_correction_2(self, correction_file: str):
+ self.corr_pos_x_2, self.corr_pos_y_2, self.corr_angle_2 = self._read_correction_file_xy(
+ correction_file
+ )
+
+ def _read_correction_file_xy(self, correction_file: str):
+ """Parse a correction file that contains corr_pos_x, corr_pos_y and corr_angle entries."""
+ with open(correction_file, "r") as f:
+ num_elements = f.readline()
+ int_num_elements = int(num_elements.split(" ")[2])
+ print(int_num_elements)
+ corr_pos_x = []
+ corr_pos_y = []
+ corr_angle = []
+ for j in range(0, int_num_elements * 3):
+ line = f.readline()
+ value = line.split(" ")[2]
+ name = line.split(" ")[0].split("[")[0]
+ if name == "corr_pos_x":
+ corr_pos_x.append(float(value) / 1000)
+ elif name == "corr_pos_y":
+ corr_pos_y.append(float(value) / 1000)
+ elif name == "corr_angle":
+ corr_angle.append(float(value))
+ return corr_pos_x, corr_pos_y, corr_angle
+
+ def compute_additional_correction(self, angle):
+ return self._compute_correction_xy(
+ angle, self.corr_pos_x, self.corr_pos_y, self.corr_angle, label="1"
+ )
+
+ def compute_additional_correction_2(self, angle):
+ return self._compute_correction_xy(
+ angle, self.corr_pos_x_2, self.corr_pos_y_2, self.corr_angle_2, label="2"
+ )
+
+ def _compute_correction_xy(self, angle, corr_pos_x, corr_pos_y, corr_angle, label=""):
+ """Find the correction for the closest angle in the lookup table."""
+ if not corr_pos_x:
+ print(f"Not applying additional correction {label}. No data available.\n")
+ return (0, 0)
+
+ shift_x = corr_pos_x[0]
+ shift_y = corr_pos_y[0]
+ angledelta = np.fabs(corr_angle[0] - angle)
+
+ for j in range(1, len(corr_pos_x)):
+ newangledelta = np.fabs(corr_angle[j] - angle)
+ if newangledelta < angledelta:
+ shift_x = corr_pos_x[j]
+ shift_y = corr_pos_y[j]
+ angledelta = newangledelta
+
+ if shift_x == 0 and angle < corr_angle[0]:
+ shift_x = corr_pos_x[0]
+ shift_y = corr_pos_y[0]
+
+ if shift_x == 0 and angle > corr_angle[-1]:
+ shift_x = corr_pos_x[-1]
+ shift_y = corr_pos_y[-1]
+
+ print(f"Additional correction shifts {label}: {shift_x} {shift_y}")
+ return (shift_x, shift_y)
diff --git a/csaxs_bec/bec_ipython_client/plugins/LamNI/extra_tomo.py b/csaxs_bec/bec_ipython_client/plugins/LamNI/extra_tomo.py
new file mode 100644
index 0000000..5eac3a4
--- /dev/null
+++ b/csaxs_bec/bec_ipython_client/plugins/LamNI/extra_tomo.py
@@ -0,0 +1,211 @@
+"""
+extra_tomo.py
+=============
+Specialist LamNI subclasses for specific experimental configurations.
+Import explicitly when needed, e.g.:
+
+ from csaxs_bec...extra_tomo import MagLamNI
+ from csaxs_bec...extra_tomo import DataDrivenLamNI
+"""
+
+import builtins
+import datetime
+import os
+import time
+
+import h5py
+import numpy as np
+from bec_lib import bec_logger
+from bec_lib.alarm_handler import AlarmBase
+
+from .lamni import LamNI
+
+logger = bec_logger.logger
+
+if builtins.__dict__.get("bec") is not None:
+ bec = builtins.__dict__.get("bec")
+ dev = builtins.__dict__.get("dev")
+ umv = builtins.__dict__.get("umv")
+ scans = builtins.__dict__.get("scans")
+
+
+class MagLamNI(LamNI):
+ """LamNI subclass for magnetic experiments (XMCD).
+
+ Adds a slow rotation helper and allows injection of a custom
+ per-angle callback via the ``lamni_at_each_angle`` builtin.
+ """
+
+ def sub_tomo_scan(self, subtomo_number, start_angle=None):
+ super().sub_tomo_scan(subtomo_number, start_angle)
+ # self.rotate_slowly(0)
+
+ def rotate_slowly(self, angle, step_size=20):
+ """Rotate to target angle in small steps to avoid mechanical stress."""
+ current_angle = dev.lsamrot.read(cached=True)["value"]
+ steps = int(np.ceil(np.abs(current_angle - angle) / step_size)) + 1
+ for target_angle in np.linspace(current_angle, angle, steps, endpoint=True):
+ umv(dev.lsamrot, target_angle)
+ scans.lamni_move_to_scan_center(
+ self.align.tomo_fovx_offset, self.align.tomo_fovy_offset, target_angle
+ )
+
+ def _at_each_angle(self, angle: float) -> None:
+ if "lamni_at_each_angle" in builtins.__dict__:
+ # pylint: disable=undefined-variable
+ lamni_at_each_angle(self, angle)
+ return
+
+ self.tomo_scan_projection(angle)
+ self.tomo_reconstruct()
+
+
+class DataDrivenLamNI(LamNI):
+ """LamNI subclass that reads per-projection scan parameters from an HDF5 file.
+
+ Instead of a fixed FOV and step size for the whole tomogram, each
+ projection can have individual values for step size, loptz position
+ and lateral shifts, as specified in a datadriven_params.h5 file.
+ """
+
+ def __init__(self, client):
+ super().__init__(client)
+ self.tomo_data = {}
+
+ def tomo_scan(
+ self,
+ subtomo_start=1,
+ start_index=None,
+ fname="~/Data10/data_driven_config/datadriven_params.h5",
+ ):
+ """Start a data-driven tomo scan.
+
+ Args:
+ subtomo_start (int): Unused; kept for API compatibility. Use start_index to resume.
+ start_index (int, optional): Skip projections before this index. Defaults to None.
+ fname (str): Path to the HDF5 parameter file. Defaults to the standard location.
+ """
+ bec = builtins.__dict__.get("bec")
+ scans = builtins.__dict__.get("scans")
+
+ fname = os.path.expanduser(fname)
+ if not os.path.exists(fname):
+ raise FileNotFoundError(f"Could not find datadriven params file in {fname}.")
+
+ content = f"Loading tomo parameters from {fname}."
+ logger.warning(content)
+ msg = bec.logbook.LogbookMessage()
+ msg.add_text(content).add_tag(["Data_driven_file", "BEC"])
+ self.client.logbook.send_logbook_message(msg)
+
+ self._update_tomo_data_from_file(fname)
+ self._current_special_angles = self.special_angles.copy()
+
+ if subtomo_start == 1 and start_index is None:
+ self.tomo_id = self.add_sample_database(
+ self.sample_name,
+ str(datetime.date.today()),
+ bec.active_account.decode(),
+ bec.queue.next_scan_number,
+ "lamni",
+ "test additional info",
+ "BEC",
+ )
+ self.write_pdf_report()
+
+ with scans.dataset_id_on_hold:
+ self.sub_tomo_data_driven(start_index)
+
+ def sub_tomo_scan(self, subtomo_number=None, start_angle=None):
+ raise NotImplementedError(
+ "Cannot run sub_tomo_scan with DataDrivenLamNI. "
+ "Use lamni.tomo_scan(start_index= Beamline checks failed at"
+ f" {str(datetime.datetime.now())}: {''.join(self._check_msgs)} Operation resumed at"
+ f" {str(datetime.datetime.now())}.
")).add_tag( + ["BEC", "tomo_parameters", f"dataset_id_{dataset_id}", "LamNI", self.sample_name] + ) + self.client.logbook.send_logbook_message(msg) + diff --git a/csaxs_bec/bec_ipython_client/plugins/LamNI/lamni_optics_mixin.py b/csaxs_bec/bec_ipython_client/plugins/LamNI/lamni_optics_mixin.py index f70385c..3ac50e5 100644 --- a/csaxs_bec/bec_ipython_client/plugins/LamNI/lamni_optics_mixin.py +++ b/csaxs_bec/bec_ipython_client/plugins/LamNI/lamni_optics_mixin.py @@ -8,8 +8,6 @@ from rich.table import Table from csaxs_bec.bec_ipython_client.plugins.cSAXS import epics_put, fshclose from csaxs_bec.bec_ipython_client.plugins.omny.omny_general_tools import OMNYTools - -# import builtins to avoid linter errors dev = builtins.__dict__.get("dev") umv = builtins.__dict__.get("umv") bec = builtins.__dict__.get("bec") @@ -19,7 +17,9 @@ class LamNIInitError(Exception): pass -class LaMNIInitStagesMixin: +class LaMNIInitStages: + """Handles hardware initialization and referencing of LamNI stages.""" + def __init__(self, client): super().__init__() self.client = client @@ -28,14 +28,11 @@ class LaMNIInitStagesMixin: def lamni_init_stages(self): if self.OMNYTools.yesno("Start initialization of LamNI stages. OK?"): - print("staring...") + print("starting...") dev.lsamrot.enabled = True else: return - - - if self.check_all_axes_of_lamni_referenced(): if self.OMNYTools.yesno("All axes are referenced. Continue anyways?"): print("ok then...") @@ -44,7 +41,6 @@ class LaMNIInitStagesMixin: axis_id_lsamrot = dev.lsamrot._config["deviceConfig"].get("axis_Id") if dev.lsamrot.controller.get_motor_limit_switch(axis_id_lsamrot)[1] == False: - if self.OMNYTools.yesno("The rotation stage will be moved to one limit"): print("starting...") else: @@ -56,7 +52,9 @@ class LaMNIInitStagesMixin: print("The controller will be disabled in bec. To enable dev.lsamrot.enabled=True") return - if self.OMNYTools.yesno("Init of loptz. Can the stage move to the upstream limit without collision?"): + if self.OMNYTools.yesno( + "Init of loptz. Can the stage move to the upstream limit without collision?" + ): print("ok then...") else: return @@ -81,13 +79,14 @@ class LaMNIInitStagesMixin: self.drive_axis_to_limit(dev.lsamy, "reverse") self.find_reference_mark(dev.lsamy) - # the dual encoder requires the reference mark to pass on both encoders print("Referencing lsamrot") self.drive_axis_to_limit(dev.lsamrot, "reverse") time.sleep(0.1) self.find_reference_mark(dev.lsamrot) - if self.OMNYTools.yesno("Init of leye. Can the stage move to -x limit without collision?"): + if self.OMNYTools.yesno( + "Init of leye. Can the stage move to -x limit without collision?" + ): print("starting...") else: return @@ -97,15 +96,6 @@ class LaMNIInitStagesMixin: print("Referencing leyey") self.drive_axis_to_limit(dev.leyey, "forward") - # set_lm lsamx 6 14 - # set_lm lsamy 6 14 - # set_lm lsamrot -3 362 - # set_lm loptx -1 -0.2 - # set_lm lopty 3.0 3.6 - # set_lm loptz 82 87 - # set_lm leyex 0 25 - # set_lm leyey 0.5 50 - print("Init of Smaract stages") dev.losax.controller.find_reference_mark(2, 0, 1000, 1) time.sleep(1) @@ -113,15 +103,6 @@ class LaMNIInitStagesMixin: time.sleep(1) dev.losax.controller.find_reference_mark(1, 0, 1000, 1) time.sleep(1) - # dev.losax.controller.find_reference_mark(3, 1, 1000, 1) - # time.sleep(1) - # dev.losax.controller.find_reference_mark(4, 1, 1000, 1) - # time.sleep(1) - - # set_lm losax -1.5 0.25 - # set_lm losay -2.5 4.1 - # set_lm losaz -4.1 -0.5 - # set_lm lcsy -1.5 5 self._align_setup() @@ -178,6 +159,8 @@ class LaMNIInitStagesMixin: class LamNIOpticsMixin: + """Optics movement methods: FZP, OSA, central stop and X-ray eye.""" + @staticmethod def _get_user_param_safe(device, var): param = dev[device].user_parameter @@ -192,13 +175,11 @@ class LamNIOpticsMixin: umv(dev.leyey, leyey_out) epics_put("XOMNYI-XEYE-ACQ:0", 2) - # move rotation stage to zero to avoid problems with wires umv(dev.lsamrot, 0) umv(dev.dttrz, 5854, dev.fttrz, 2395) def leye_in(self): bec.queue.next_dataset_number += 1 - # move rotation stage to zero to avoid problems with wires umv(dev.lsamrot, 0) umv(dev.dttrz, 6419.677, dev.fttrz, 2959.979) while True: @@ -215,15 +196,10 @@ class LamNIOpticsMixin: def _lfzp_in(self): loptx_in = self._get_user_param_safe("loptx", "in") lopty_in = self._get_user_param_safe("lopty", "in") - umv( - dev.loptx, loptx_in, dev.lopty, lopty_in - ) # for 7.2567 keV and 150 mu, 60 nm fzp, loptz 83.6000 for propagation 1.4 mm + umv(dev.loptx, loptx_in, dev.lopty, lopty_in) def lfzp_in(self): - """ - move in the lamni zone plate. - This will disable rt feedback, move the FZP and re-enabled the feedback. - """ + """Move in the LamNI zone plate, disabling/re-enabling RT feedback around the move.""" if "rtx" in dev and dev.rtx.enabled: dev.rtx.controller.feedback_disable() @@ -233,18 +209,15 @@ class LamNIOpticsMixin: dev.rtx.controller.feedback_enable_with_reset() def loptics_in(self): - """ - Move in the lamni optics, including the FZP and the OSA. - """ + """Move in the LamNI optics (FZP + OSA).""" self.lfzp_in() self.losa_in() def loptics_out(self): - """Move out the lamni optics""" + """Move out the LamNI optics.""" if "rtx" in dev and dev.rtx.enabled: dev.rtx.controller.feedback_disable() - # self.lcs_out() self.losa_out() loptx_out = self._get_user_param_safe("loptx", "out") lopty_out = self._get_user_param_safe("lopty", "out") @@ -255,28 +228,17 @@ class LamNIOpticsMixin: dev.rtx.controller.feedback_enable_with_reset() def lcs_in(self): - # umv lcsx -1.852 lcsy -0.095 pass def lcs_out(self): umv(dev.lcsy, 3) def losa_in(self): - # 6.2 keV, 170 um FZP - # umv(dev.losax, -1.4450000, dev.losay, -0.1800) - # umv(dev.losaz, -1) - # 6.7, 170 - # umv(dev.losax, -1.4850, dev.losay, -0.1930) - # umv(dev.losaz, 1.0000) - # 7.2, 150 losax_in = self._get_user_param_safe("losax", "in") losay_in = self._get_user_param_safe("losay", "in") losaz_in = self._get_user_param_safe("losaz", "in") umv(dev.losax, losax_in, dev.losay, losay_in) umv(dev.losaz, losaz_in) - # 11 kev - # umv(dev.losax, -1.161000, dev.losay, -0.196) - # umv(dev.losaz, 1.0000) def losa_out(self): losay_out = self._get_user_param_safe("losay", "out") @@ -285,11 +247,10 @@ class LamNIOpticsMixin: umv(dev.losay, losay_out) def lfzp_info(self, mokev_val=-1): - if mokev_val == -1: try: mokev_val = dev.mokev.readback.get() - except: + except Exception: print( "Device mokev does not exist. You can specify the energy in keV as an argument instead." ) @@ -324,10 +285,6 @@ class LamNIOpticsMixin: ) console.print(table) - - print("OSA Information:") - # print(f"Current losaz %.1f\n", A[losaz]) - # print("The OSA will collide with the sample plane at %.1f\n\n", 89.3-A[loptz]) print( "The numbers presented here are for a sample in the plane of the lamni sample holder.\n" ) diff --git a/csaxs_bec/bec_ipython_client/plugins/LamNI/load_additional_correction.py b/csaxs_bec/bec_ipython_client/plugins/LamNI/load_additional_correction.py deleted file mode 100644 index 40e966c..0000000 --- a/csaxs_bec/bec_ipython_client/plugins/LamNI/load_additional_correction.py +++ /dev/null @@ -1,22 +0,0 @@ -def lamni_read_additional_correction(): - # "additional_correction_shift" - # [0][] x , [1][] y, [2][] angle, [3][0] number of elements - - with open("correction_lamni_um_S01405_.txt", "r") as f: - num_elements = f.readline() - int_num_elements = int(num_elements.split(" ")[2]) - print(int_num_elements) - corr_pos_x = [] - corr_pos_y = [] - corr_angle = [] - for j in range(0, int_num_elements * 3): - line = f.readline() - value = line.split(" ")[2] - name = line.split(" ")[0].split("[")[0] - if name == "corr_pos_x": - corr_pos_x.append(value) - elif name == "corr_pos_y": - corr_pos_y.append(value) - elif name == "corr_angle": - corr_angle.append(value) - return (corr_pos_x, corr_pos_y, corr_angle, num_elements) diff --git a/csaxs_bec/bec_ipython_client/plugins/LamNI/x_ray_eye_align.py b/csaxs_bec/bec_ipython_client/plugins/LamNI/x_ray_eye_align.py deleted file mode 100644 index 88c0957..0000000 --- a/csaxs_bec/bec_ipython_client/plugins/LamNI/x_ray_eye_align.py +++ /dev/null @@ -1,1732 +0,0 @@ -import builtins -import datetime -import os -import subprocess -import threading -import time -from collections import defaultdict -from pathlib import Path - -import h5py -import numpy as np -from bec_lib import bec_logger -from bec_lib.alarm_handler import AlarmBase -from bec_lib.pdf_writer import PDFWriter -from typeguard import typechecked - -from csaxs_bec.bec_ipython_client.plugins.cSAXS import epics_get, epics_put, fshopen -from csaxs_bec.bec_ipython_client.plugins.omny.omny_general_tools import OMNYTools - -from .lamni_optics_mixin import LaMNIInitStagesMixin, LamNIOpticsMixin - -logger = bec_logger.logger - - -if builtins.__dict__.get("bec") is not None: - bec = builtins.__dict__.get("bec") - dev = builtins.__dict__.get("dev") - umv = builtins.__dict__.get("umv") - umvr = builtins.__dict__.get("umvr") - -class XrayEyeAlign: - # pixel calibration, multiply to get mm - # PIXEL_CALIBRATION = 0.2/209 #.2 with binning - PIXEL_CALIBRATION = 0.2 / 218 # .2 with binning - - def __init__(self, client, lamni) -> None: - self.client = client - self.lamni = lamni - self.device_manager = client.device_manager - self.scans = client.scans - # self.xeye = self.device_manager.devices.xeye - self.alignment_values = defaultdict(list) - self._reset_init_values() - self.corr_pos_x = [] - self.corr_pos_y = [] - self.corr_angle = [] - self.corr_pos_x_2 = [] - self.corr_pos_y_2 = [] - self.corr_angle_2 = [] - - def reset_correction(self): - self.corr_pos_x = [] - self.corr_pos_y = [] - self.corr_angle = [] - - def reset_correction_2(self): - self.corr_pos_x_2 = [] - self.corr_pos_y_2 = [] - self.corr_angle_2 = [] - - def reset_xray_eye_correction(self): - self.client.delete_global_var("tomo_fit_xray_eye") - - @property - def tomo_fovx_offset(self): - val = self.client.get_global_var("tomo_fov_offset") - if val is None: - return 0.0 - return val[0] / 1000 - - @tomo_fovx_offset.setter - @typechecked - def tomo_fovx_offset(self, val: float): - val_old = self.client.get_global_var("tomo_fov_offset") - if val_old is None: - val_old = [0.0, 0.0] - self.client.set_global_var("tomo_fov_offset", [val * 1000, val_old[1]]) - - @property - def tomo_fovy_offset(self): - val = self.client.get_global_var("tomo_fov_offset") - if val is None: - return 0.0 - return val[1] / 1000 - - @tomo_fovy_offset.setter - @typechecked - def tomo_fovy_offset(self, val: float): - val_old = self.client.get_global_var("tomo_fov_offset") - if val_old is None: - val_old = [0.0, 0.0] - self.client.set_global_var("tomo_fov_offset", [val_old[0], val * 1000]) - - def _reset_init_values(self): - self.shift_xy = [0, 0] - self._xray_fov_xy = [0, 0] - - def save_frame(self): - epics_put("XOMNYI-XEYE-SAVFRAME:0", 1) - - def update_frame(self): - epics_put("XOMNYI-XEYE-ACQDONE:0", 0) - # start live - epics_put("XOMNYI-XEYE-ACQ:0", 1) - # wait for start live - while epics_get("XOMNYI-XEYE-ACQDONE:0") == 0: - time.sleep(0.5) - print("waiting for live view to start...") - fshopen() - - epics_put("XOMNYI-XEYE-ACQDONE:0", 0) - - while epics_get("XOMNYI-XEYE-ACQDONE:0") == 0: - print("waiting for new frame...") - time.sleep(0.5) - - time.sleep(0.5) - # stop live view - epics_put("XOMNYI-XEYE-ACQ:0", 0) - time.sleep(1) - # fshclose - print("got new frame") - - def _disable_rt_feedback(self): - self.device_manager.devices.rtx.controller.feedback_disable() - - def _enable_rt_feedback(self): - self.device_manager.devices.rtx.controller.feedback_enable_with_reset() - - def tomo_rotate(self, val: float): - # pylint: disable=undefined-variable - umv(self.device_manager.devices.lsamrot, val) - - def get_tomo_angle(self): - return self.device_manager.devices.lsamrot.readback.read()["lsamrot"]["value"] - - def update_fov(self, k: int): - self._xray_fov_xy[0] = max(epics_get(f"XOMNYI-XEYE-XWIDTH_X:{k}"), self._xray_fov_xy[0]) - self._xray_fov_xy[1] = max(0, self._xray_fov_xy[0]) - - @property - def movement_buttons_enabled(self): - return [epics_get("XOMNYI-XEYE-ENAMVX:0"), epics_get("XOMNYI-XEYE-ENAMVY:0")] - - @movement_buttons_enabled.setter - def movement_buttons_enabled(self, enabled: bool): - enabled = int(enabled) - epics_put("XOMNYI-XEYE-ENAMVX:0", enabled) - epics_put("XOMNYI-XEYE-ENAMVY:0", enabled) - - def send_message(self, msg: str): - epics_put("XOMNYI-XEYE-MESSAGE:0.DESC", msg) - - def align(self): - # reset shift xy and fov params - self._reset_init_values() - self.reset_correction() - self.reset_correction_2() - - # this makes sure we are in a defined state - self._disable_rt_feedback() - - epics_put("XOMNYI-XEYE-PIXELSIZE:0", self.PIXEL_CALIBRATION) - - self._enable_rt_feedback() - - # initialize - # disable movement buttons - self.movement_buttons_enabled = False - - epics_put("XOMNYI-XEYE-ACQ:0", 0) - self.send_message("please wait...") - - # put sample name - epics_put("XOMNYI-XEYE-SAMPLENAME:0.DESC", "Let us LAMNI...") - - # first step - self._disable_rt_feedback() - k = 0 - - # move zone plate in, eye in to get beam position - self.lamni.lfzp_in() - - self.update_frame() - - # enable submit buttons - self.movement_buttons_enabled = False - epics_put("XOMNYI-XEYE-SUBMIT:0", 0) - epics_put("XOMNYI-XEYE-STEP:0", 0) - self.send_message("Submit center value of FZP.") - - while True: - if epics_get("XOMNYI-XEYE-SUBMIT:0") == 1: - val_x = epics_get(f"XOMNYI-XEYE-XVAL_X:{k}") * self.PIXEL_CALIBRATION # in mm - val_y = epics_get(f"XOMNYI-XEYE-YVAL_Y:{k}") * self.PIXEL_CALIBRATION # in mm - self.alignment_values[k] = [val_x, val_y] - print( - f"Clicked position {k}: x {self.alignment_values[k][0]}, y" - f" {self.alignment_values[k][1]}" - ) - - if k == 0: # received center value of FZP - self.send_message("please wait ...") - # perform movement: FZP out, Sample in - self.lamni.loptics_out() - epics_put("XOMNYI-XEYE-SUBMIT:0", -1) # disable submit button - self.movement_buttons_enabled = False - print("Moving sample in, FZP out") - - self._disable_rt_feedback() - time.sleep(0.3) - self._enable_rt_feedback() - time.sleep(0.3) - - # zero is now at the center - self.update_frame() - self.send_message("Go and find the sample") - epics_put("XOMNYI-XEYE-SUBMIT:0", 0) - self.movement_buttons_enabled = True - - elif ( - k == 1 - ): # received sample center value at samroy 0 ie the final base shift values - msg = ( - f"Base shift values from movement are x {self.shift_xy[0]}, y" - f" {self.shift_xy[1]}" - ) - print(msg) - logger.info(msg) - self.shift_xy[0] += ( - self.alignment_values[0][0] - self.alignment_values[1][0] - ) * 1000 - self.shift_xy[1] += ( - self.alignment_values[1][1] - self.alignment_values[0][1] - ) * 1000 - print( - "Base shift values from movement and clicked position are x" - f" {self.shift_xy[0]}, y {self.shift_xy[1]}" - ) - - self.scans.lamni_move_to_scan_center( - self.shift_xy[0] / 1000, self.shift_xy[1] / 1000, self.get_tomo_angle() - ).wait() - - self.send_message("please wait ...") - epics_put("XOMNYI-XEYE-SUBMIT:0", -1) # disable submit button - self.movement_buttons_enabled = False - time.sleep(1) - - self.scans.lamni_move_to_scan_center( - self.shift_xy[0] / 1000, self.shift_xy[1] / 1000, self.get_tomo_angle() - ).wait() - - epics_put("XOMNYI-XEYE-ANGLE:0", self.get_tomo_angle()) - self.update_frame() - self.send_message("Submit sample center and FOV (0 deg)") - epics_put("XOMNYI-XEYE-SUBMIT:0", 0) - self.update_fov(k) - - elif 1 < k < 10: # received sample center value at samroy 0 ... 315 - self.send_message("please wait ...") - epics_put("XOMNYI-XEYE-SUBMIT:0", -1) # disable submit button - - # we swtich feedback off before rotating to not have it on and off again later for smooth operation - self._disable_rt_feedback() - self.tomo_rotate((k - 1) * 45 - 45 / 2) - self.scans.lamni_move_to_scan_center( - self.shift_xy[0] / 1000, self.shift_xy[1] / 1000, self.get_tomo_angle() - ).wait() - self._disable_rt_feedback() - self.tomo_rotate((k - 1) * 45) - self.scans.lamni_move_to_scan_center( - self.shift_xy[0] / 1000, self.shift_xy[1] / 1000, self.get_tomo_angle() - ).wait() - - epics_put("XOMNYI-XEYE-ANGLE:0", self.get_tomo_angle()) - self.update_frame() - self.send_message("Submit sample center") - epics_put("XOMNYI-XEYE-SUBMIT:0", 0) - epics_put("XOMNYI-XEYE-ENAMVX:0", 1) - self.update_fov(k) - - elif k == 10: # received sample center value at samroy 270 and done - self.send_message("done...") - epics_put("XOMNYI-XEYE-SUBMIT:0", -1) # disable submit button - self.movement_buttons_enabled = False - self.update_fov(k) - break - - k += 1 - epics_put("XOMNYI-XEYE-STEP:0", k) - if k < 2: - # allow movements, store movements to calculate center - _xrayeyalignmvx = epics_get("XOMNYI-XEYE-MVX:0") - _xrayeyalignmvy = epics_get("XOMNYI-XEYE-MVY:0") - if _xrayeyalignmvx != 0 or _xrayeyalignmvy != 0: - self.shift_xy[0] = self.shift_xy[0] + _xrayeyalignmvx - self.shift_xy[1] = self.shift_xy[1] + _xrayeyalignmvy - self.scans.lamni_move_to_scan_center( - self.shift_xy[0] / 1000, self.shift_xy[1] / 1000, self.get_tomo_angle() - ).wait() - print( - f"Current center horizontal {self.shift_xy[0]} vertical {self.shift_xy[1]}" - ) - epics_put("XOMNYI-XEYE-MVY:0", 0) - epics_put("XOMNYI-XEYE-MVX:0", 0) - self.update_frame() - - time.sleep(0.2) - - self.write_output() - fovx = self._xray_fov_xy[0] * self.PIXEL_CALIBRATION * 1000 / 2 - fovy = self._xray_fov_xy[1] * self.PIXEL_CALIBRATION * 1000 / 2 - print( - f"The largest field of view from the xrayeyealign was \nfovx = {fovx:.0f} microns, fovy" - f" = {fovy:.0f} microns" - ) - print("Use matlab routine to fit the current alignment...") - - print( - "This additional shift is applied to the base shift values\n which are x" - f" {self.shift_xy[0]}, y {self.shift_xy[1]}" - ) - - self._disable_rt_feedback() - - self.tomo_rotate(0) - - print( - "\n\nNEXT LOAD ALIGNMENT PARAMETERS\nby running" - " lamni.align.read_xray_eye_correction()\n" - ) - - self.client.set_global_var("tomo_fov_offset", self.shift_xy) - - def write_output(self): - with open( - os.path.expanduser("~/Data10/specES1/internal/xrayeye_alignmentvalues"), "w" - ) as alignment_values_file: - alignment_values_file.write("angle\thorizontal\tvertical\n") - for k in range(2, 11): - fovx_offset = (self.alignment_values[0][0] - self.alignment_values[k][0]) * 1000 - fovy_offset = (self.alignment_values[k][1] - self.alignment_values[0][1]) * 1000 - print( - f"Writing to file new alignment: number {k}, value x {fovx_offset}, y" - f" {fovy_offset}" - ) - alignment_values_file.write(f"{(k-2)*45}\t{fovx_offset}\t{fovy_offset}\n") - - def read_xray_eye_correction(self, dir_path=os.path.expanduser("~/Data10/specES1/internal/")): - tomo_fit_xray_eye = np.zeros((2, 3)) - with open(os.path.join(dir_path, "ptychotomoalign_Ax.txt"), "r") as file: - tomo_fit_xray_eye[0][0] = file.readline() - - with open(os.path.join(dir_path, "ptychotomoalign_Bx.txt"), "r") as file: - tomo_fit_xray_eye[0][1] = file.readline() - - with open(os.path.join(dir_path, "ptychotomoalign_Cx.txt"), "r") as file: - tomo_fit_xray_eye[0][2] = file.readline() - - with open(os.path.join(dir_path, "ptychotomoalign_Ay.txt"), "r") as file: - tomo_fit_xray_eye[1][0] = file.readline() - - with open(os.path.join(dir_path, "ptychotomoalign_By.txt"), "r") as file: - tomo_fit_xray_eye[1][1] = file.readline() - - with open(os.path.join(dir_path, "ptychotomoalign_Cy.txt"), "r") as file: - tomo_fit_xray_eye[1][2] = file.readline() - - self.client.set_global_var("tomo_fit_xray_eye", tomo_fit_xray_eye.tolist()) - # x amp, phase, offset, y amp, phase, offset - # 0 0 0 1 0 2 1 0 1 1 1 2 - - print("New alignment parameters loaded from X-ray eye") - print( - f"X Amplitude {tomo_fit_xray_eye[0][0]}," - f"X Phase {tomo_fit_xray_eye[0][1]}, " - f"X Offset {tomo_fit_xray_eye[0][2]}," - f"Y Amplitude {tomo_fit_xray_eye[1][0]}," - f"Y Phase {tomo_fit_xray_eye[1][1]}," - f"Y Offset {tomo_fit_xray_eye[1][2]}" - ) - - def lamni_compute_additional_correction_xeye_mu(self, angle): - tomo_fit_xray_eye = self.client.get_global_var("tomo_fit_xray_eye") - if tomo_fit_xray_eye is None: - print("Not applying any additional correction. No x-ray eye data available.\n") - return (0, 0) - - # x amp, phase, offset, y amp, phase, offset - # 0 0 0 1 0 2 1 0 1 1 1 2 - correction_x = ( - tomo_fit_xray_eye[0][0] * np.sin(np.radians(angle) + tomo_fit_xray_eye[0][1]) - + tomo_fit_xray_eye[0][2] - ) / 1000 - correction_y = ( - tomo_fit_xray_eye[1][0] * np.sin(np.radians(angle) + tomo_fit_xray_eye[1][1]) - + tomo_fit_xray_eye[1][2] - ) / 1000 - - print(f"Xeye correction x {correction_x}, y {correction_y} for angle {angle}\n") - return (correction_x, correction_y) - - def compute_additional_correction(self, angle): - if not self.corr_pos_x: - print("Not applying any additional correction. No data available.\n") - return (0, 0) - - # find index of closest angle - for j, _ in enumerate(self.corr_pos_x): - newangledelta = np.fabs(self.corr_angle[j] - angle) - if j == 0: - angledelta = newangledelta - additional_correction_shift_x = self.corr_pos_x[j] - additional_correction_shift_y = self.corr_pos_y[j] - continue - - if newangledelta < angledelta: - additional_correction_shift_x = self.corr_pos_x[j] - additional_correction_shift_y = self.corr_pos_y[j] - angledelta = newangledelta - - if additional_correction_shift_x == 0 and angle < self.corr_angle[0]: - additional_correction_shift_x = self.corr_pos_x[0] - additional_correction_shift_y = self.corr_pos_y[0] - - if additional_correction_shift_x == 0 and angle > self.corr_angle[-1]: - additional_correction_shift_x = self.corr_pos_x[-1] - additional_correction_shift_y = self.corr_pos_y[-1] - print( - "Additional correction shifts:" - f" {additional_correction_shift_x} {additional_correction_shift_y}" - ) - return (additional_correction_shift_x, additional_correction_shift_y) - - def read_additional_correction(self, correction_file: str): - with open(correction_file, "r") as f: - num_elements = f.readline() - int_num_elements = int(num_elements.split(" ")[2]) - print(int_num_elements) - corr_pos_x = [] - corr_pos_y = [] - corr_angle = [] - for j in range(0, int_num_elements * 3): - line = f.readline() - value = line.split(" ")[2] - name = line.split(" ")[0].split("[")[0] - if name == "corr_pos_x": - corr_pos_x.append(float(value) / 1000) - elif name == "corr_pos_y": - corr_pos_y.append(float(value) / 1000) - elif name == "corr_angle": - corr_angle.append(float(value)) - self.corr_pos_x = corr_pos_x - self.corr_pos_y = corr_pos_y - self.corr_angle = corr_angle - return - - def compute_additional_correction_2(self, angle): - if not self.corr_pos_x_2: - print("Not applying any additional secondary correction. No data available.\n") - return (0, 0) - - # find index of closest angle - for j, _ in enumerate(self.corr_pos_x_2): - newangledelta = np.fabs(self.corr_angle_2[j] - angle) - if j == 0: - angledelta = newangledelta - additional_correction_shift_x = self.corr_pos_x_2[j] - additional_correction_shift_y = self.corr_pos_y_2[j] - continue - - if newangledelta < angledelta: - additional_correction_shift_x = self.corr_pos_x_2[j] - additional_correction_shift_y = self.corr_pos_y_2[j] - angledelta = newangledelta - - if additional_correction_shift_x == 0 and angle < self.corr_angle_2[0]: - additional_correction_shift_x = self.corr_pos_x_2[0] - additional_correction_shift_y = self.corr_pos_y_2[0] - - if additional_correction_shift_x == 0 and angle > self.corr_angle_2[-1]: - additional_correction_shift_x = self.corr_pos_x_2[-1] - additional_correction_shift_y = self.corr_pos_y_2[-1] - print( - "Additional correction shifts 2:" - f" {additional_correction_shift_x} {additional_correction_shift_y}" - ) - return (additional_correction_shift_x, additional_correction_shift_y) - - def read_additional_correction_2(self, correction_file: str): - with open(correction_file, "r") as f: - num_elements = f.readline() - int_num_elements = int(num_elements.split(" ")[2]) - print(int_num_elements) - corr_pos_x = [] - corr_pos_y = [] - corr_angle = [] - for j in range(0, int_num_elements * 3): - line = f.readline() - value = line.split(" ")[2] - name = line.split(" ")[0].split("[")[0] - if name == "corr_pos_x": - corr_pos_x.append(float(value) / 1000) - elif name == "corr_pos_y": - corr_pos_y.append(float(value) / 1000) - elif name == "corr_angle": - corr_angle.append(float(value)) - self.corr_pos_x_2 = corr_pos_x - self.corr_pos_y_2 = corr_pos_y - self.corr_angle_2 = corr_angle - return - - -class LamNI(LamNIOpticsMixin): - def __init__(self, client): - super().__init__() - self.client = client - self.device_manager = client.device_manager - self.align = XrayEyeAlign(client, self) - self.init = LaMNIInitStagesMixin(client) - self.check_shutter = True - self.check_light_available = True - self.check_fofb = True - self._check_msgs = [] - self.tomo_id = -1 - self.special_angles = [] - self.special_angle_repeats = 20 - self.special_angle_tolerance = 20 - self._current_special_angles = [] - self._beam_is_okay = True - self._stop_beam_check_event = None - self.beam_check_thread = None - self.OMNYTools = OMNYTools(self.client) - # Progress tracking - self.progress = {} - self.progress["tomo_type"] = "Equally spaced sub-tomograms" - self.progress["subtomo"] = 0 - self.progress["subtomo_projection"] = 0 - self.progress["subtomo_total_projections"] = 1 - self.progress["projection"] = 0 - self.progress["total_projections"] = 1 - self.progress["angle"] = 0 - - def get_beamline_checks_enabled(self): - print( - f"Shutter: {self.check_shutter}\nFOFB: {self.check_fofb}\nLight available:" - f" {self.check_light_available}" - ) - - @property - def beamline_checks_enabled(self): - return { - "shutter": self.check_shutter, - "fofb": self.check_fofb, - "light available": self.check_light_available, - } - - @beamline_checks_enabled.setter - def beamline_checks_enabled(self, val: bool): - self.check_shutter = val - self.check_light_available = val - self.check_fofb = val - self.get_beamline_checks_enabled() - - def set_special_angles(self, angles: list, repeats: int = 20, tolerance: float = 0.5): - """Set the special angles for a tomo - - Args: - angles (list): List of special angles - repeats (int, optional): Number of repeats at a special angle. Defaults to 20. - tolerance (float, optional): Number of repeats at a special angle. Defaults to 0.5. - - """ - self.special_angles = angles - self.special_angle_repeats = repeats - self.special_angle_tolerance = tolerance - - def remove_special_angles(self): - """Remove the special angles and set the number of repeats to 1""" - self.special_angles = [] - self.special_angle_repeats = 1 - - @property - def tomo_shellstep(self): - val = self.client.get_global_var("tomo_shellstep") - if val is None: - return 1 - return val - - @tomo_shellstep.setter - def tomo_shellstep(self, val: float): - self.client.set_global_var("tomo_shellstep", val) - - @property - def tomo_circfov(self): - val = self.client.get_global_var("tomo_circfov") - if val is None: - return 0.0 - return val - - @tomo_circfov.setter - def tomo_circfov(self, val: float): - self.client.set_global_var("tomo_circfov", val) - - @property - def tomo_type(self): - val = self.client.get_global_var("tomo_type") - if val is None: - return 1 - return val - - @tomo_type.setter - def tomo_type(self, val: float): - if val == 1: - # equally spaced tomography with 8 sub tomograms - self.client.set_global_var("tomo_type", val) - elif val == 2: - # golden ratio tomography (sorted bunches) - self.client.set_global_var("tomo_type", val) - elif val == 3: - # equally spaced tomography with starting angles shifted by golden ratio - self.client.set_global_var("tomo_type", val) - else: - raise ValueError("Unknown tomo_type.") - - @property - def tomo_countingtime(self): - val = self.client.get_global_var("tomo_countingtime") - if val is None: - return 0.1 - return val - - @tomo_countingtime.setter - def tomo_countingtime(self, val: float): - self.client.set_global_var("tomo_countingtime", val) - - @property - def manual_shift_x(self): - val = self.client.get_global_var("manual_shift_x") - if val is None: - return 0.0 - return val - - @manual_shift_x.setter - def manual_shift_x(self, val: float): - self.client.set_global_var("manual_shift_x", val) - - @property - def manual_shift_y(self): - val = self.client.get_global_var("manual_shift_y") - if val is None: - return 0.0 - return val - - @manual_shift_y.setter - def manual_shift_y(self, val: float): - self.client.set_global_var("manual_shift_y", val) - - @property - def lamni_piezo_range_x(self): - val = self.client.get_global_var("lamni_piezo_range_x") - if val is None: - return 20 - return val - - @lamni_piezo_range_x.setter - def lamni_piezo_range_x(self, val: float): - if dev.rtx.user_parameter and dev.rtx.user_parameter.get("large_range_scan", True): - self.client.set_global_var("lamni_piezo_range_x", val) - return - if val > 80: - raise ValueError("Piezo range cannot be larger than 80 um.") - self.client.set_global_var("lamni_piezo_range_x", val) - - @property - def lamni_piezo_range_y(self): - val = self.client.get_global_var("lamni_piezo_range_y") - if val is None: - return 20 - return val - - @lamni_piezo_range_y.setter - def lamni_piezo_range_y(self, val: float): - if dev.rtx.user_parameter and dev.rtx.user_parameter.get("large_range_scan", True): - self.client.set_global_var("lamni_piezo_range_y", val) - return - if val > 80: - raise ValueError("Piezo range cannot be larger than 80 um.") - self.client.set_global_var("lamni_piezo_range_y", val) - - @property - def corridor_size(self): - val = self.client.get_global_var("corridor_size") - if val is None: - val = -1 - return val - - @corridor_size.setter - def corridor_size(self, val: float): - self.client.set_global_var("corridor_size", val) - - @property - def lamni_stitch_x(self): - val = self.client.get_global_var("lamni_stitch_x") - if val is None: - return 0 - return val - - @lamni_stitch_x.setter - @typechecked - def lamni_stitch_x(self, val: int): - self.client.set_global_var("lamni_stitch_x", val) - - @property - def lamni_stitch_y(self): - val = self.client.get_global_var("lamni_stitch_y") - if val is None: - return 0 - return val - - @lamni_stitch_y.setter - @typechecked - def lamni_stitch_y(self, val: int): - self.client.set_global_var("lamni_stitch_y", val) - - @property - def ptycho_reconstruct_foldername(self): - val = self.client.get_global_var("ptycho_reconstruct_foldername") - if val is None: - return "ptycho_reconstruct" - return val - - @ptycho_reconstruct_foldername.setter - def ptycho_reconstruct_foldername(self, val: str): - self.client.set_global_var("ptycho_reconstruct_foldername", val) - - @property - def tomo_angle_stepsize(self): - val = self.client.get_global_var("tomo_angle_stepsize") - if val is None: - return 10.0 - return val - - @tomo_angle_stepsize.setter - def tomo_angle_stepsize(self, val: float): - self.client.set_global_var("tomo_angle_stepsize", val) - - @property - def tomo_stitch_overlap(self): - val = self.client.get_global_var("tomo_stitch_overlap") - if val is None: - return 0.2 - return val - - @tomo_stitch_overlap.setter - def tomo_stitch_overlap(self, val: float): - self.client.set_global_var("tomo_stitch_overlap", val) - - @property - def golden_max_number_of_projections(self): - val = self.client.get_global_var("golden_max_number_of_projections") - if val is None: - return 1000.0 - return val - - @golden_max_number_of_projections.setter - def golden_max_number_of_projections(self, val: float): - self.client.set_global_var("golden_max_number_of_projections", val) - - @property - def golden_ratio_bunch_size(self): - val = self.client.get_global_var("golden_ratio_bunch_size") - if val is None: - return 20 - return val - - @golden_ratio_bunch_size.setter - def golden_ratio_bunch_size(self, val: float): - if val < 20: - raise ValueError("golden_ratio_bunch_size must be at least 20.") - self.client.set_global_var("golden_ratio_bunch_size", val) - - @property - def golden_projections_at_0_deg_for_damage_estimation(self): - val = self.client.get_global_var("golden_projections_at_0_deg_for_damage_estimation") - if val is None: - return 0 - return val - - @golden_projections_at_0_deg_for_damage_estimation.setter - def golden_projections_at_0_deg_for_damage_estimation(self, val: float): - self.client.set_global_var("golden_projections_at_0_deg_for_damage_estimation", val) - - @property - def sample_name(self): - val = self.client.get_global_var("sample_name") - if val is None: - return "bec_test_sample" - return val - - @sample_name.setter - @typechecked - def sample_name(self, val: str): - self.client.set_global_var("sample_name", val) - - def write_to_spec_log(self, content): - try: - with open( - os.path.expanduser( - "~/Data10/specES1/log-files/specES1_started_2022_11_30_1313.log" - ), - "a", - ) as log_file: - log_file.write(content) - except Exception: - logger.warning("Failed to write to spec log file (omny web page).") - - def write_to_scilog(self, content, tags: list = None): - try: - if tags is not None: - tags.append("BEC") - else: - tags = ["BEC"] - msg = bec.logbook.LogbookMessage() - msg.add_text(content).add_tag(tags) - self.client.logbook.send_logbook_message(msg) - except Exception: - logger.warning("Failed to write to scilog.") - - def tomo_scan_projection(self, angle: float): - scans = builtins.__dict__.get("scans") - - additional_correction = self.align.compute_additional_correction(angle) - additional_correction_2 = self.align.compute_additional_correction_2(angle) - correction_xeye_mu = self.align.lamni_compute_additional_correction_xeye_mu(angle) - - self._current_scan_list = [] - - for stitch_x in range(-self.lamni_stitch_x, self.lamni_stitch_x + 1): - for stitch_y in range(-self.lamni_stitch_y, self.lamni_stitch_y + 1): - # pylint: disable=undefined-variable - self._current_scan_list.append(bec.queue.next_scan_number) - logger.info( - f"scans.lamni_fermat_scan(fov_size=[{self.lamni_piezo_range_x},{self.lamni_piezo_range_y}]," - f" step={self.tomo_shellstep}, stitch_x={0}, stitch_y={0}," - f" stitch_overlap={1},center_x={self.align.tomo_fovx_offset}," - f" center_y={self.align.tomo_fovy_offset}," - f" shift_x={self.manual_shift_x+correction_xeye_mu[0]-additional_correction[0]-additional_correction_2[0]}," - f" shift_y={self.manual_shift_y+correction_xeye_mu[1]-additional_correction[1]-additional_correction_2[1]}," - f" fov_circular={self.tomo_circfov}, angle={angle}, scan_type='fly')" - ) - log_message = ( - f"{str(datetime.datetime.now())}: LamNI scan projection at angle {angle}, scan" - f" number {bec.queue.next_scan_number}.\n" - ) - self.write_to_spec_log(log_message) - # self.write_to_scilog(log_message, ["BEC_scans", self.sample_name]) - corridor_size = self.corridor_size if self.corridor_size > 0 else None - scans.lamni_fermat_scan( - fov_size=[self.lamni_piezo_range_x, self.lamni_piezo_range_y], - step=self.tomo_shellstep, - stitch_x=stitch_x, - stitch_y=stitch_y, - stitch_overlap=self.tomo_stitch_overlap, - center_x=self.align.tomo_fovx_offset, - center_y=self.align.tomo_fovy_offset, - shift_x=( - self.manual_shift_x - + correction_xeye_mu[0] - - additional_correction[0] - - additional_correction_2[0] - ), - shift_y=( - self.manual_shift_y - + correction_xeye_mu[1] - - additional_correction[1] - - additional_correction_2[1] - ), - fov_circular=self.tomo_circfov, - angle=angle, - scan_type="fly", - exp_time=self.tomo_countingtime, - optim_trajectory_corridor=corridor_size, - ) - - def _run_beamline_checks(self): - msgs = [] - dev = builtins.__dict__.get("dev") - try: - if self.check_shutter: - shutter_val = dev.x12sa_es1_shutter_status.read(cached=True) - if shutter_val["value"].lower() != "open": - self._beam_is_okay = False - msgs.append("Check beam failed: Shutter is closed.") - if self.check_light_available: - machine_status = dev.sls_machine_status.read(cached=True) - if machine_status["value"] not in ["Light Available", "Light-Available"]: - self._beam_is_okay = False - msgs.append("Check beam failed: Light not available.") - if self.check_fofb: - fast_orbit_feedback = dev.sls_fast_orbit_feedback.read(cached=True) - if fast_orbit_feedback["value"] != "running": - self._beam_is_okay = False - msgs.append("Check beam failed: Fast orbit feedback is not running.") - except Exception: - logger.warning("Failed to check beam.") - return msgs - - def _check_beam(self): - while not self._stop_beam_check_event.is_set(): - self._check_msgs = self._run_beamline_checks() - - if not self._beam_is_okay: - self._stop_beam_check_event.set() - time.sleep(1) - - def _start_beam_check(self): - self._beam_is_okay = True - self._stop_beam_check_event = threading.Event() - - self.beam_check_thread = threading.Thread(target=self._check_beam, daemon=True) - self.beam_check_thread.start() - - def _was_beam_okay(self): - self._stop_beam_check_event.set() - self.beam_check_thread.join() - return self._beam_is_okay - - def _print_beamline_checks(self): - for msg in self._check_msgs: - logger.warning(msg) - - def _wait_for_beamline_checks(self): - self._print_beamline_checks() - try: - msg = bec.logbook.LogbookMessage() - msg.add_text( - "
Beamline checks failed at" - f" {str(datetime.datetime.now())}: {''.join(self._check_msgs)}
" - ).add_tag(["BEC", "beam_check"]) - self.client.logbook.send_logbook_message(msg) - except Exception: - logger.warning("Failed to send update to SciLog.") - - while True: - self._beam_is_okay = True - self._check_msgs = self._run_beamline_checks() - if self._beam_is_okay: - break - self._print_beamline_checks() - time.sleep(1) - - try: - msg = bec.logbook.LogbookMessage() - msg.add_text( - "Operation resumed at" - f" {str(datetime.datetime.now())}.
" - ).add_tag(["BEC", "beam_check"]) - self.client.logbook.send_logbook_message(msg) - except Exception: - logger.warning("Failed to send update to SciLog.") - - def rt_off(self): - dev.rtx.enabled = False - dev.rty.enabled = False - - def rt_on(self): - dev.rtx.enabled = True - dev.rty.enabled = True - if dev.rtx.enabled == True: - print("rt is enabled") - else: - print("failed to enable rt") - - def feedback_enable_with_reset(self): - self.device_manager.devices.rtx.controller.feedback_enable_with_reset() - self.feedback_status() - - def feedback_enable_without_reset(self): - self.device_manager.devices.rtx.controller.feedback_enable_without_reset() - self.feedback_status() - - def feedback_disable(self): - self.device_manager.devices.rtx.controller.feedback_disable() - self.feedback_status() - - def feedback_disable_and_reset_angle(self): - self.device_manager.devices.rtx.controller.feedback_disable_and_even_reset_lamni_angle_interferometer() - self.feedback_status() - - def feedback_status(self): - if self.device_manager.devices.rtx.controller.feedback_is_running(): - print("The rt feedback is \x1b[92mrunning\x1b[0m.") - else: - print("The rt feedback is \x1b[91mNOT\x1b[0m running.") - - def show_interferometer_positions(self): - self.device_manager.devices.rtx.controller.show_interferometer_positions() - - def show_signal_strength(self): - self.device_manager.devices.rtx.controller.show_signal_strength_interferometer() - - def show_analog_signals(self): - return self.device_manager.devices.rtx.controller.show_analog_signals() - - def add_sample_database( - self, samplename, date, eaccount, scan_number, setup, sample_additional_info, user - ): - """Add a sample to the omny sample database. This also retrieves the tomo id.""" - subprocess.run( - "wget --user=omny --password=samples -q -O /tmp/currsamplesnr.txt" - f" 'https://omny.web.psi.ch/samples/newmeasurement.php?sample={samplename}&date={date}&eaccount={eaccount}&scannr={scan_number}&setup={setup}&additional={sample_additional_info}&user={user}'", - shell=True, - ) - with open("/tmp/currsamplesnr.txt") as tomo_number_file: - tomo_number = int(tomo_number_file.read()) - return tomo_number - - def _at_each_angle(self, angle: float) -> None: - self.tomo_scan_projection(angle) - self.tomo_reconstruct() - - ### XMCD ### - # 2 projections, 1 for each polarization state - # cp() - # self.tomo_scan_projection(angle) - # self.tomo_reconstruct() - # cm() - # self.tomo_scan_projection(angle) - # self.tomo_reconstruct() - - def _write_subtomo_to_scilog(self, subtomo_number): - """Write subtomo start information to scilog.""" - bec = builtins.__dict__.get("bec") - if self.tomo_id > 0: - tags = ["BEC_subtomo", self.sample_name, f"tomo_id_{self.tomo_id}"] - else: - tags = ["BEC_subtomo", self.sample_name] - self.write_to_scilog( - f"Starting subtomo: {subtomo_number}. First scan number: {bec.queue.next_scan_number}.", - tags, - ) - - def sub_tomo_scan(self, subtomo_number, start_angle=None): - """start a subtomo""" - self._write_subtomo_to_scilog(subtomo_number) - - if start_angle is None: - if subtomo_number == 1: - start_angle = 0 - elif subtomo_number == 2: - start_angle = self.tomo_angle_stepsize / 8.0 * 4 - elif subtomo_number == 3: - start_angle = self.tomo_angle_stepsize / 8.0 * 2 - elif subtomo_number == 4: - start_angle = self.tomo_angle_stepsize / 8.0 * 6 - elif subtomo_number == 5: - start_angle = self.tomo_angle_stepsize / 8.0 * 1 - elif subtomo_number == 6: - start_angle = self.tomo_angle_stepsize / 8.0 * 5 - elif subtomo_number == 7: - start_angle = self.tomo_angle_stepsize / 8.0 * 3 - elif subtomo_number == 8: - start_angle = self.tomo_angle_stepsize / 8.0 * 7 - - # _tomo_shift_angles (potential global variable) - _tomo_shift_angles = 0 - angle_end = start_angle + 360 - angles = np.linspace( - start_angle + _tomo_shift_angles, - angle_end, - num=int(360 / self.tomo_angle_stepsize) + 1, - endpoint=True, - ) - - # reverse even sub-tomograms - if not (subtomo_number % 2): - angles = np.flip(angles) - for angle in angles: - self.progress["tomo_type"] = "Equally spaced sub-tomograms" - self.progress["subtomo"] = subtomo_number - self.progress["subtomo_projection"] = np.where(angles == angle)[0][0] - self.progress["subtomo_total_projections"] = 360 / self.tomo_angle_stepsize - self.progress["projection"] = (subtomo_number - 1) * self.progress[ - "subtomo_total_projections" - ] + self.progress["subtomo_projection"] - self.progress["total_projections"] = 360 / self.tomo_angle_stepsize * 8 - self.progress["angle"] = angle - self._tomo_scan_at_angle(angle, subtomo_number) - - def _print_progress(self): - print("\x1b[95mProgress report:") - print(f"Tomo type: ....................... {self.progress['tomo_type']}") - print(f"Projection: ...................... {self.progress['projection']}") - print(f"Total projections expected ....... {self.progress['total_projections']}") - print(f"Angle: ........................... {self.progress['angle']}") - print(f"Current subtomo: ................. {self.progress['subtomo']}") - print(f"Current projection within subtomo: {self.progress['subtomo_projection']}\x1b[0m") - - def _tomo_scan_at_angle(self, angle, subtomo_number): - successful = False - error_caught = False - if 0 <= angle < 360.05: - print(f"Starting LamNI scan for angle {angle} in subtomo {subtomo_number}") - self._print_progress() - while not successful: - self._start_beam_check() - if not self.special_angles: - self._current_special_angles = [] - if self._current_special_angles: - next_special_angle = self._current_special_angles[0] - if np.isclose(angle, next_special_angle, atol=0.5): - self._current_special_angles.pop(0) - num_repeats = self.special_angle_repeats - else: - num_repeats = 1 - try: - start_scan_number = bec.queue.next_scan_number - for i in range(num_repeats): - self._at_each_angle(angle) - error_caught = False - except AlarmBase as exc: - if exc.alarm_type == "TimeoutError": - bec.queue.request_queue_reset() - time.sleep(2) - error_caught = True - else: - num_repeats = 1 - try: - start_scan_number = bec.queue.next_scan_number - for i in range(num_repeats): - self._at_each_angle(angle) - error_caught = False - except AlarmBase as exc: - if exc.alarm_type == "TimeoutError": - bec.queue.request_queue_reset() - time.sleep(2) - error_caught = True - else: - raise exc - - if self._was_beam_okay() and not error_caught: - successful = True - else: - self._wait_for_beamline_checks() - end_scan_number = bec.queue.next_scan_number - for scan_nr in range(start_scan_number, end_scan_number): - self._write_tomo_scan_number(scan_nr, angle, subtomo_number) - - if self._was_beam_okay() and not error_caught: - successful = True - else: - self._wait_for_beamline_checks() - - def _write_tomo_scan_number(self, scan_number: int, angle: float, subtomo_number: int) -> None: - tomo_scan_numbers_file = os.path.expanduser( - "~/Data10/specES1/dat-files/tomography_scannumbers.txt" - ) - with open(tomo_scan_numbers_file, "a+") as out_file: - # pylint: disable=undefined-variable - out_file.write( - f"{scan_number} {angle} {dev.lsamrot.read()['lsamrot']['value']:.3f} {self.tomo_id} {subtomo_number} {0} {'lamni'}\n" - ) - - def _golden(self, ii, howmany_sorted, maxangle=360, reverse=False): - """Return the ii-th golden ratio angle within sorted bunches of size howmany_sorted, - and its subtomo number. Operates over maxangle degrees (360 for LamNI).""" - golden = [] - for iji in range( - (ii - (ii % howmany_sorted)), (ii - (ii % howmany_sorted)) + howmany_sorted, 1 - ): - golden.append( - ((iji * maxangle * (1 + pow(5, 0.5)) / 2) * 1000 % (maxangle * 1000)) / 1000 - ) - golden.sort() - subtomo_number = int(ii / howmany_sorted) + 1 - if reverse and not subtomo_number % 2: - golden.reverse() - return (golden[ii % howmany_sorted], subtomo_number) - - def _golden_equally_spaced(self, ii, number_of_projections_per_subtomo, maxangle=360, reverse=True, verbose=False): - """Return angles for equally spaced tomography with sub-tomogram starting angles - shifted according to the golden ratio. Operates over maxangle degrees (360 for LamNI). - ii is the projection number starting at 0.""" - angular_step = maxangle / number_of_projections_per_subtomo - subtomo_number = int(((ii) * angular_step) / maxangle) + 1 - start_angle = self._golden(subtomo_number - 1, 1, angular_step)[0] - projection_number_of_subtomo = ( - ii - (subtomo_number - 1) * number_of_projections_per_subtomo - ) - - if reverse: - if subtomo_number % 2: - angle = start_angle + projection_number_of_subtomo * angular_step - else: - angle = ( - start_angle - + (number_of_projections_per_subtomo - 1) * angular_step - - projection_number_of_subtomo * angular_step - ) - else: - angle = start_angle + projection_number_of_subtomo * angular_step - - if verbose: - print( - f"Equally spaced golden ratio tomography.\n" - f"Angular step: {angular_step}\n" - f"Subtomo Number: {subtomo_number}\n" - f"Angle: {angle}" - ) - - return angle, subtomo_number - - def tomo_scan(self, subtomo_start=1, start_angle=None, projection_number=None): - """Start a tomo scan. - - Args: - subtomo_start (int): For tomo_type 1, the sub-tomogram number to start from. Defaults to 1. - start_angle (float, optional): Override the starting angle of the first sub-tomogram. Defaults to None. - projection_number (int, optional): For tomo_types 2 and 3, resume from this projection index. Defaults to None. - """ - bec = builtins.__dict__.get("bec") - scans = builtins.__dict__.get("scans") - self._current_special_angles = self.special_angles.copy() - - # Register a new tomo scan in the database and write the PDF report - # only when starting fresh (not resuming mid-scan) - if ( - (self.tomo_type == 1 and subtomo_start == 1 and start_angle is None) - or (self.tomo_type == 2 and projection_number is None) - or (self.tomo_type == 3 and projection_number is None) - ): - # pylint: disable=undefined-variable - self.tomo_id = self.add_sample_database( - self.sample_name, - str(datetime.date.today()), - bec.active_account.decode(), - bec.queue.next_scan_number, - "lamni", - "test additional info", - "BEC", - ) - self.write_pdf_report() - - with scans.dataset_id_on_hold: - if self.tomo_type == 1: - # 8 equally spaced sub-tomograms over 360 degrees - self.progress["tomo_type"] = "Equally spaced sub-tomograms" - for ii in range(subtomo_start, 9): - self.sub_tomo_scan(ii, start_angle=start_angle) - start_angle = None - - elif self.tomo_type == 2: - # Golden ratio tomography (sorted bunches) over 360 degrees - self.progress["tomo_type"] = "Golden ratio tomography" - previous_subtomo_number = -1 - ii = 0 if projection_number is None else projection_number - while True: - angle, subtomo_number = self._golden( - ii, self.golden_ratio_bunch_size, maxangle=360, reverse=True - ) - if previous_subtomo_number != subtomo_number: - self._write_subtomo_to_scilog(subtomo_number) - if ( - subtomo_number % 2 == 1 - and ii > 10 - and self.golden_projections_at_0_deg_for_damage_estimation == 1 - ): - self._tomo_scan_at_angle(0, subtomo_number) - previous_subtomo_number = subtomo_number - - self.progress["subtomo"] = subtomo_number - self.progress["projection"] = ii - self.progress["angle"] = angle - if self.golden_ratio_bunch_size > 0: - self.progress["subtomo_total_projections"] = self.golden_ratio_bunch_size - self.progress["subtomo_projection"] = ( - ii - (subtomo_number - 1) * self.golden_ratio_bunch_size - ) - else: - self.progress["subtomo_total_projections"] = 0 - self.progress["subtomo_projection"] = 0 - self.progress["total_projections"] = ( - self.golden_max_number_of_projections - if self.golden_max_number_of_projections > 0 - else 0 - ) - - self._tomo_scan_at_angle(angle, subtomo_number) - ii += 1 - if ( - self.golden_max_number_of_projections > 0 - and ii > self.golden_max_number_of_projections - ): - print( - f"Golden ratio tomography stopped automatically after the requested" - f" {self.golden_max_number_of_projections} projections." - ) - break - - elif self.tomo_type == 3: - # Equally spaced tomography with golden ratio starting angles over 360 degrees - self.progress["tomo_type"] = "Equally spaced, golden ratio starting angles" - previous_subtomo_number = -1 - ii = 0 if projection_number is None else projection_number - while True: - angle, subtomo_number = self._golden_equally_spaced( - ii, int(360 / self.tomo_angle_stepsize), maxangle=360, reverse=True - ) - if previous_subtomo_number != subtomo_number: - self._write_subtomo_to_scilog(subtomo_number) - if ( - subtomo_number % 2 == 1 - and ii > 10 - and self.golden_projections_at_0_deg_for_damage_estimation == 1 - ): - self._tomo_scan_at_angle(0, subtomo_number) - previous_subtomo_number = subtomo_number - - self.progress["subtomo"] = subtomo_number - self.progress["projection"] = ii - self.progress["angle"] = angle - self.progress["subtomo_total_projections"] = 360 / self.tomo_angle_stepsize - self.progress["subtomo_projection"] = ( - ii - (subtomo_number - 1) * self.progress["subtomo_total_projections"] - ) - self.progress["total_projections"] = ( - self.golden_max_number_of_projections - if self.golden_max_number_of_projections > 0 - else 0 - ) - - self._tomo_scan_at_angle(angle, subtomo_number) - ii += 1 - if ( - self.golden_max_number_of_projections > 0 - and ii > self.golden_max_number_of_projections - ): - print( - f"Golden ratio tomography stopped automatically after the requested" - f" {self.golden_max_number_of_projections} projections." - ) - break - else: - raise ValueError(f"Unknown tomo_type: {self.tomo_type}.") - - def tomo_parameters(self): - """print and update the tomo parameters""" - print("Current settings:") - print(f"Counting time")).add_tag(
- ["BEC", "tomo_parameters", f"dataset_id_{dataset_id}", "LamNI", self.sample_name]
- )
- self.client.logbook.send_logbook_message(msg)
-
-
-class MagLamNI(LamNI):
- def sub_tomo_scan(self, subtomo_number, start_angle=None):
- super().sub_tomo_scan(subtomo_number, start_angle)
- # self.rotate_slowly(0)
-
- def rotate_slowly(self, angle, step_size=20):
- current_angle = dev.lsamrot.read(cached=True)["value"]
- steps = int(np.ceil(np.abs(current_angle - angle) / step_size)) + 1
- for target_angle in np.linspace(current_angle, angle, steps, endpoint=True):
- umv(dev.lsamrot, target_angle)
- scans.lamni_move_to_scan_center(
- self.align.tomo_fovx_offset, self.align.tomo_fovy_offset, target_angle
- )
-
- def _at_each_angle(self, angle: float) -> None:
- if "lamni_at_each_angle" in builtins.__dict__:
- lamni_at_each_angle(self, angle)
- return
-
- self.tomo_scan_projection(angle)
- self.tomo_reconstruct()
-
- # # cm()
- # # umv(dev.ppth,15.1762) #11.567 keV
- # for ii in range(2):
- # self.tomo_scan_projection(angle)
- # self.tomo_reconstruct()
- # # cp()
- # # umv(dev.ppth,15.1827) #11.567 keV
- # for ii in range(2):
- # self.tomo_scan_projection(angle)
- # self.tomo_reconstruct()
-
-
-class DataDrivenLamNI(LamNI):
- def __init__(self, client):
- super().__init__(client)
- self.tomo_data = {}
-
- def tomo_scan(
- self,
- subtomo_start=1,
- start_index=None,
- fname="~/Data10/data_driven_config/datadriven_params.h5",
- ):
- """start a tomo scan"""
- bec = builtins.__dict__.get("bec")
- scans = builtins.__dict__.get("scans")
-
- fname = os.path.expanduser(fname)
-
- if not os.path.exists(fname):
- raise FileNotFoundError(f"Could not find datadriven params file in {fname}.")
- content = f"Loading tomo parameters from {fname}."
- logger.warning(content)
- tags = ["Data_driven_file", "BEC"]
- msg = bec.logbook.LogbookMessage()
- msg.add_text(content).add_tag(tags)
- self.client.logbook.send_logbook_message(msg)
- self._update_tomo_data_from_file(fname)
-
- self._current_special_angles = self.special_angles.copy()
-
- if subtomo_start == 1 and start_index is None:
- # pylint: disable=undefined-variable
- self.tomo_id = self.add_sample_database(
- self.sample_name,
- str(datetime.date.today()),
- bec.active_account.decode(),
- bec.queue.next_scan_number,
- "lamni",
- "test additional info",
- "BEC",
- )
- self.write_pdf_report()
- with scans.dataset_id_on_hold:
- self.sub_tomo_data_driven(start_index)
-
- def sub_tomo_scan(self):
- raise NotImplementedError(
- "Cannot run sub_tomo_scan with data-driven LamNI. Please use"
- " lamni.tomo_scan(subtomo_start=