diff --git a/csaxs_bec/bec_ipython_client/plugins/LamNI/alignment.py b/csaxs_bec/bec_ipython_client/plugins/LamNI/alignment.py new file mode 100644 index 0000000..9049562 --- /dev/null +++ b/csaxs_bec/bec_ipython_client/plugins/LamNI/alignment.py @@ -0,0 +1,461 @@ +import builtins +import time +from collections import defaultdict + +import numpy as np +from bec_lib import bec_logger +from typeguard import typechecked + +from csaxs_bec.bec_ipython_client.plugins.cSAXS import epics_get, epics_put, fshopen + +logger = bec_logger.logger + +if builtins.__dict__.get("bec") is not None: + bec = builtins.__dict__.get("bec") + dev = builtins.__dict__.get("dev") + umv = builtins.__dict__.get("umv") + umvr = builtins.__dict__.get("umvr") + + +class XrayEyeAlign: + # pixel calibration, multiply to get mm + # PIXEL_CALIBRATION = 0.2/209 #.2 with binning + PIXEL_CALIBRATION = 0.2 / 218 # .2 with binning + + def __init__(self, client, lamni) -> None: + self.client = client + self.lamni = lamni + self.device_manager = client.device_manager + self.scans = client.scans + self.alignment_values = defaultdict(list) + self._reset_init_values() + self.corr_pos_x = [] + self.corr_pos_y = [] + self.corr_angle = [] + self.corr_pos_x_2 = [] + self.corr_pos_y_2 = [] + self.corr_angle_2 = [] + + # ------------------------------------------------------------------ + # Correction reset + # ------------------------------------------------------------------ + + def reset_correction(self): + self.corr_pos_x = [] + self.corr_pos_y = [] + self.corr_angle = [] + + def reset_correction_2(self): + self.corr_pos_x_2 = [] + self.corr_pos_y_2 = [] + self.corr_angle_2 = [] + + def reset_xray_eye_correction(self): + self.client.delete_global_var("tomo_fit_xray_eye") + + # ------------------------------------------------------------------ + # FOV offset properties + # ------------------------------------------------------------------ + + @property + def tomo_fovx_offset(self): + val = self.client.get_global_var("tomo_fov_offset") + if val is None: + return 0.0 + return val[0] / 1000 + + @tomo_fovx_offset.setter + @typechecked + def tomo_fovx_offset(self, val: float): + val_old = self.client.get_global_var("tomo_fov_offset") + if val_old is None: + val_old = [0.0, 0.0] + self.client.set_global_var("tomo_fov_offset", [val * 1000, val_old[1]]) + + @property + def tomo_fovy_offset(self): + val = self.client.get_global_var("tomo_fov_offset") + if val is None: + return 0.0 + return val[1] / 1000 + + @tomo_fovy_offset.setter + @typechecked + def tomo_fovy_offset(self, val: float): + val_old = self.client.get_global_var("tomo_fov_offset") + if val_old is None: + val_old = [0.0, 0.0] + self.client.set_global_var("tomo_fov_offset", [val_old[0], val * 1000]) + + # ------------------------------------------------------------------ + # Internal helpers + # ------------------------------------------------------------------ + + def _reset_init_values(self): + self.shift_xy = [0, 0] + self._xray_fov_xy = [0, 0] + + def _disable_rt_feedback(self): + self.device_manager.devices.rtx.controller.feedback_disable() + + def _enable_rt_feedback(self): + self.device_manager.devices.rtx.controller.feedback_enable_with_reset() + + def tomo_rotate(self, val: float): + # pylint: disable=undefined-variable + umv(self.device_manager.devices.lsamrot, val) + + def get_tomo_angle(self): + return self.device_manager.devices.lsamrot.readback.read()["lsamrot"]["value"] + + # ------------------------------------------------------------------ + # X-ray eye camera control + # ------------------------------------------------------------------ + + def save_frame(self): + epics_put("XOMNYI-XEYE-SAVFRAME:0", 1) + + def update_frame(self): + epics_put("XOMNYI-XEYE-ACQDONE:0", 0) + # start live + epics_put("XOMNYI-XEYE-ACQ:0", 1) + # wait for start live + while epics_get("XOMNYI-XEYE-ACQDONE:0") == 0: + time.sleep(0.5) + print("waiting for live view to start...") + fshopen() + + epics_put("XOMNYI-XEYE-ACQDONE:0", 0) + + while epics_get("XOMNYI-XEYE-ACQDONE:0") == 0: + print("waiting for new frame...") + time.sleep(0.5) + + time.sleep(0.5) + # stop live view + epics_put("XOMNYI-XEYE-ACQ:0", 0) + time.sleep(1) + print("got new frame") + + def update_fov(self, k: int): + self._xray_fov_xy[0] = max(epics_get(f"XOMNYI-XEYE-XWIDTH_X:{k}"), self._xray_fov_xy[0]) + self._xray_fov_xy[1] = max(0, self._xray_fov_xy[0]) + + @property + def movement_buttons_enabled(self): + return [epics_get("XOMNYI-XEYE-ENAMVX:0"), epics_get("XOMNYI-XEYE-ENAMVY:0")] + + @movement_buttons_enabled.setter + def movement_buttons_enabled(self, enabled: bool): + enabled = int(enabled) + epics_put("XOMNYI-XEYE-ENAMVX:0", enabled) + epics_put("XOMNYI-XEYE-ENAMVY:0", enabled) + + def send_message(self, msg: str): + epics_put("XOMNYI-XEYE-MESSAGE:0.DESC", msg) + + # ------------------------------------------------------------------ + # Alignment procedure + # ------------------------------------------------------------------ + + def align(self): + self._reset_init_values() + self.reset_correction() + self.reset_correction_2() + + self._disable_rt_feedback() + epics_put("XOMNYI-XEYE-PIXELSIZE:0", self.PIXEL_CALIBRATION) + self._enable_rt_feedback() + + self.movement_buttons_enabled = False + epics_put("XOMNYI-XEYE-ACQ:0", 0) + self.send_message("please wait...") + epics_put("XOMNYI-XEYE-SAMPLENAME:0.DESC", "Let us LAMNI...") + + self._disable_rt_feedback() + k = 0 + + self.lamni.lfzp_in() + self.update_frame() + + self.movement_buttons_enabled = False + epics_put("XOMNYI-XEYE-SUBMIT:0", 0) + epics_put("XOMNYI-XEYE-STEP:0", 0) + self.send_message("Submit center value of FZP.") + + while True: + if epics_get("XOMNYI-XEYE-SUBMIT:0") == 1: + val_x = epics_get(f"XOMNYI-XEYE-XVAL_X:{k}") * self.PIXEL_CALIBRATION # in mm + val_y = epics_get(f"XOMNYI-XEYE-YVAL_Y:{k}") * self.PIXEL_CALIBRATION # in mm + self.alignment_values[k] = [val_x, val_y] + print( + f"Clicked position {k}: x {self.alignment_values[k][0]}, y" + f" {self.alignment_values[k][1]}" + ) + + if k == 0: # received center value of FZP + self.send_message("please wait ...") + self.lamni.loptics_out() + epics_put("XOMNYI-XEYE-SUBMIT:0", -1) + self.movement_buttons_enabled = False + print("Moving sample in, FZP out") + + self._disable_rt_feedback() + time.sleep(0.3) + self._enable_rt_feedback() + time.sleep(0.3) + + self.update_frame() + self.send_message("Go and find the sample") + epics_put("XOMNYI-XEYE-SUBMIT:0", 0) + self.movement_buttons_enabled = True + + elif k == 1: # received sample center value at samrot 0 + msg = ( + f"Base shift values from movement are x {self.shift_xy[0]}, y" + f" {self.shift_xy[1]}" + ) + print(msg) + logger.info(msg) + self.shift_xy[0] += ( + self.alignment_values[0][0] - self.alignment_values[1][0] + ) * 1000 + self.shift_xy[1] += ( + self.alignment_values[1][1] - self.alignment_values[0][1] + ) * 1000 + print( + "Base shift values from movement and clicked position are x" + f" {self.shift_xy[0]}, y {self.shift_xy[1]}" + ) + + self.scans.lamni_move_to_scan_center( + self.shift_xy[0] / 1000, self.shift_xy[1] / 1000, self.get_tomo_angle() + ).wait() + + self.send_message("please wait ...") + epics_put("XOMNYI-XEYE-SUBMIT:0", -1) + self.movement_buttons_enabled = False + time.sleep(1) + + self.scans.lamni_move_to_scan_center( + self.shift_xy[0] / 1000, self.shift_xy[1] / 1000, self.get_tomo_angle() + ).wait() + + epics_put("XOMNYI-XEYE-ANGLE:0", self.get_tomo_angle()) + self.update_frame() + self.send_message("Submit sample center and FOV (0 deg)") + epics_put("XOMNYI-XEYE-SUBMIT:0", 0) + self.update_fov(k) + + elif 1 < k < 10: # received sample center value at samrot 0 ... 315 + self.send_message("please wait ...") + epics_put("XOMNYI-XEYE-SUBMIT:0", -1) + + self._disable_rt_feedback() + self.tomo_rotate((k - 1) * 45 - 45 / 2) + self.scans.lamni_move_to_scan_center( + self.shift_xy[0] / 1000, self.shift_xy[1] / 1000, self.get_tomo_angle() + ).wait() + self._disable_rt_feedback() + self.tomo_rotate((k - 1) * 45) + self.scans.lamni_move_to_scan_center( + self.shift_xy[0] / 1000, self.shift_xy[1] / 1000, self.get_tomo_angle() + ).wait() + + epics_put("XOMNYI-XEYE-ANGLE:0", self.get_tomo_angle()) + self.update_frame() + self.send_message("Submit sample center") + epics_put("XOMNYI-XEYE-SUBMIT:0", 0) + epics_put("XOMNYI-XEYE-ENAMVX:0", 1) + self.update_fov(k) + + elif k == 10: # received sample center value at samrot 270, done + self.send_message("done...") + epics_put("XOMNYI-XEYE-SUBMIT:0", -1) + self.movement_buttons_enabled = False + self.update_fov(k) + break + + k += 1 + epics_put("XOMNYI-XEYE-STEP:0", k) + + if k < 2: + _xrayeyalignmvx = epics_get("XOMNYI-XEYE-MVX:0") + _xrayeyalignmvy = epics_get("XOMNYI-XEYE-MVY:0") + if _xrayeyalignmvx != 0 or _xrayeyalignmvy != 0: + self.shift_xy[0] = self.shift_xy[0] + _xrayeyalignmvx + self.shift_xy[1] = self.shift_xy[1] + _xrayeyalignmvy + self.scans.lamni_move_to_scan_center( + self.shift_xy[0] / 1000, self.shift_xy[1] / 1000, self.get_tomo_angle() + ).wait() + print( + f"Current center horizontal {self.shift_xy[0]} vertical {self.shift_xy[1]}" + ) + epics_put("XOMNYI-XEYE-MVY:0", 0) + epics_put("XOMNYI-XEYE-MVX:0", 0) + self.update_frame() + + time.sleep(0.2) + + self.write_output() + fovx = self._xray_fov_xy[0] * self.PIXEL_CALIBRATION * 1000 / 2 + fovy = self._xray_fov_xy[1] * self.PIXEL_CALIBRATION * 1000 / 2 + print( + f"The largest field of view from the xrayeyealign was \nfovx = {fovx:.0f} microns," + f" fovy = {fovy:.0f} microns" + ) + print("Use matlab routine to fit the current alignment...") + print( + "This additional shift is applied to the base shift values\n which are x" + f" {self.shift_xy[0]}, y {self.shift_xy[1]}" + ) + + self._disable_rt_feedback() + self.tomo_rotate(0) + + print( + "\n\nNEXT LOAD ALIGNMENT PARAMETERS\nby running" + " lamni.align.read_xray_eye_correction()\n" + ) + + self.client.set_global_var("tomo_fov_offset", self.shift_xy) + + # ------------------------------------------------------------------ + # Alignment output + # ------------------------------------------------------------------ + + def write_output(self): + import os + with open( + os.path.expanduser("~/Data10/specES1/internal/xrayeye_alignmentvalues"), "w" + ) as alignment_values_file: + alignment_values_file.write("angle\thorizontal\tvertical\n") + for k in range(2, 11): + fovx_offset = (self.alignment_values[0][0] - self.alignment_values[k][0]) * 1000 + fovy_offset = (self.alignment_values[k][1] - self.alignment_values[0][1]) * 1000 + print( + f"Writing to file new alignment: number {k}, value x {fovx_offset}, y" + f" {fovy_offset}" + ) + alignment_values_file.write(f"{(k-2)*45}\t{fovx_offset}\t{fovy_offset}\n") + + # ------------------------------------------------------------------ + # X-ray eye sinusoidal correction (loaded from MATLAB fit files) + # ------------------------------------------------------------------ + + def read_xray_eye_correction(self, dir_path=None): + import os + if dir_path is None: + dir_path = os.path.expanduser("~/Data10/specES1/internal/") + tomo_fit_xray_eye = np.zeros((2, 3)) + for i, axis in enumerate(["x", "y"]): + for j, coeff in enumerate(["A", "B", "C"]): + with open(os.path.join(dir_path, f"ptychotomoalign_{coeff}{axis}.txt"), "r") as f: + tomo_fit_xray_eye[i][j] = f.readline() + + self.client.set_global_var("tomo_fit_xray_eye", tomo_fit_xray_eye.tolist()) + # x amp, phase, offset, y amp, phase, offset + # 0 0 0 1 0 2 1 0 1 1 1 2 + print("New alignment parameters loaded from X-ray eye") + print( + f"X Amplitude {tomo_fit_xray_eye[0][0]}, " + f"X Phase {tomo_fit_xray_eye[0][1]}, " + f"X Offset {tomo_fit_xray_eye[0][2]}, " + f"Y Amplitude {tomo_fit_xray_eye[1][0]}, " + f"Y Phase {tomo_fit_xray_eye[1][1]}, " + f"Y Offset {tomo_fit_xray_eye[1][2]}" + ) + + def lamni_compute_additional_correction_xeye_mu(self, angle): + """Compute sinusoidal correction from the X-ray eye fit for the given angle.""" + tomo_fit_xray_eye = self.client.get_global_var("tomo_fit_xray_eye") + if tomo_fit_xray_eye is None: + print("Not applying any additional correction. No x-ray eye data available.\n") + return (0, 0) + + # x amp, phase, offset, y amp, phase, offset + # 0 0 0 1 0 2 1 0 1 1 1 2 + correction_x = ( + tomo_fit_xray_eye[0][0] * np.sin(np.radians(angle) + tomo_fit_xray_eye[0][1]) + + tomo_fit_xray_eye[0][2] + ) / 1000 + correction_y = ( + tomo_fit_xray_eye[1][0] * np.sin(np.radians(angle) + tomo_fit_xray_eye[1][1]) + + tomo_fit_xray_eye[1][2] + ) / 1000 + + print(f"Xeye correction x {correction_x}, y {correction_y} for angle {angle}\n") + return (correction_x, correction_y) + + # ------------------------------------------------------------------ + # Additional lookup-table corrections (iteration 1 and 2) + # ------------------------------------------------------------------ + + def read_additional_correction(self, correction_file: str): + self.corr_pos_x, self.corr_pos_y, self.corr_angle = self._read_correction_file_xy( + correction_file + ) + + def read_additional_correction_2(self, correction_file: str): + self.corr_pos_x_2, self.corr_pos_y_2, self.corr_angle_2 = self._read_correction_file_xy( + correction_file + ) + + def _read_correction_file_xy(self, correction_file: str): + """Parse a correction file that contains corr_pos_x, corr_pos_y and corr_angle entries.""" + with open(correction_file, "r") as f: + num_elements = f.readline() + int_num_elements = int(num_elements.split(" ")[2]) + print(int_num_elements) + corr_pos_x = [] + corr_pos_y = [] + corr_angle = [] + for j in range(0, int_num_elements * 3): + line = f.readline() + value = line.split(" ")[2] + name = line.split(" ")[0].split("[")[0] + if name == "corr_pos_x": + corr_pos_x.append(float(value) / 1000) + elif name == "corr_pos_y": + corr_pos_y.append(float(value) / 1000) + elif name == "corr_angle": + corr_angle.append(float(value)) + return corr_pos_x, corr_pos_y, corr_angle + + def compute_additional_correction(self, angle): + return self._compute_correction_xy( + angle, self.corr_pos_x, self.corr_pos_y, self.corr_angle, label="1" + ) + + def compute_additional_correction_2(self, angle): + return self._compute_correction_xy( + angle, self.corr_pos_x_2, self.corr_pos_y_2, self.corr_angle_2, label="2" + ) + + def _compute_correction_xy(self, angle, corr_pos_x, corr_pos_y, corr_angle, label=""): + """Find the correction for the closest angle in the lookup table.""" + if not corr_pos_x: + print(f"Not applying additional correction {label}. No data available.\n") + return (0, 0) + + shift_x = corr_pos_x[0] + shift_y = corr_pos_y[0] + angledelta = np.fabs(corr_angle[0] - angle) + + for j in range(1, len(corr_pos_x)): + newangledelta = np.fabs(corr_angle[j] - angle) + if newangledelta < angledelta: + shift_x = corr_pos_x[j] + shift_y = corr_pos_y[j] + angledelta = newangledelta + + if shift_x == 0 and angle < corr_angle[0]: + shift_x = corr_pos_x[0] + shift_y = corr_pos_y[0] + + if shift_x == 0 and angle > corr_angle[-1]: + shift_x = corr_pos_x[-1] + shift_y = corr_pos_y[-1] + + print(f"Additional correction shifts {label}: {shift_x} {shift_y}") + return (shift_x, shift_y) diff --git a/csaxs_bec/bec_ipython_client/plugins/LamNI/extra_tomo.py b/csaxs_bec/bec_ipython_client/plugins/LamNI/extra_tomo.py new file mode 100644 index 0000000..5eac3a4 --- /dev/null +++ b/csaxs_bec/bec_ipython_client/plugins/LamNI/extra_tomo.py @@ -0,0 +1,211 @@ +""" +extra_tomo.py +============= +Specialist LamNI subclasses for specific experimental configurations. +Import explicitly when needed, e.g.: + + from csaxs_bec...extra_tomo import MagLamNI + from csaxs_bec...extra_tomo import DataDrivenLamNI +""" + +import builtins +import datetime +import os +import time + +import h5py +import numpy as np +from bec_lib import bec_logger +from bec_lib.alarm_handler import AlarmBase + +from .lamni import LamNI + +logger = bec_logger.logger + +if builtins.__dict__.get("bec") is not None: + bec = builtins.__dict__.get("bec") + dev = builtins.__dict__.get("dev") + umv = builtins.__dict__.get("umv") + scans = builtins.__dict__.get("scans") + + +class MagLamNI(LamNI): + """LamNI subclass for magnetic experiments (XMCD). + + Adds a slow rotation helper and allows injection of a custom + per-angle callback via the ``lamni_at_each_angle`` builtin. + """ + + def sub_tomo_scan(self, subtomo_number, start_angle=None): + super().sub_tomo_scan(subtomo_number, start_angle) + # self.rotate_slowly(0) + + def rotate_slowly(self, angle, step_size=20): + """Rotate to target angle in small steps to avoid mechanical stress.""" + current_angle = dev.lsamrot.read(cached=True)["value"] + steps = int(np.ceil(np.abs(current_angle - angle) / step_size)) + 1 + for target_angle in np.linspace(current_angle, angle, steps, endpoint=True): + umv(dev.lsamrot, target_angle) + scans.lamni_move_to_scan_center( + self.align.tomo_fovx_offset, self.align.tomo_fovy_offset, target_angle + ) + + def _at_each_angle(self, angle: float) -> None: + if "lamni_at_each_angle" in builtins.__dict__: + # pylint: disable=undefined-variable + lamni_at_each_angle(self, angle) + return + + self.tomo_scan_projection(angle) + self.tomo_reconstruct() + + +class DataDrivenLamNI(LamNI): + """LamNI subclass that reads per-projection scan parameters from an HDF5 file. + + Instead of a fixed FOV and step size for the whole tomogram, each + projection can have individual values for step size, loptz position + and lateral shifts, as specified in a datadriven_params.h5 file. + """ + + def __init__(self, client): + super().__init__(client) + self.tomo_data = {} + + def tomo_scan( + self, + subtomo_start=1, + start_index=None, + fname="~/Data10/data_driven_config/datadriven_params.h5", + ): + """Start a data-driven tomo scan. + + Args: + subtomo_start (int): Unused; kept for API compatibility. Use start_index to resume. + start_index (int, optional): Skip projections before this index. Defaults to None. + fname (str): Path to the HDF5 parameter file. Defaults to the standard location. + """ + bec = builtins.__dict__.get("bec") + scans = builtins.__dict__.get("scans") + + fname = os.path.expanduser(fname) + if not os.path.exists(fname): + raise FileNotFoundError(f"Could not find datadriven params file in {fname}.") + + content = f"Loading tomo parameters from {fname}." + logger.warning(content) + msg = bec.logbook.LogbookMessage() + msg.add_text(content).add_tag(["Data_driven_file", "BEC"]) + self.client.logbook.send_logbook_message(msg) + + self._update_tomo_data_from_file(fname) + self._current_special_angles = self.special_angles.copy() + + if subtomo_start == 1 and start_index is None: + self.tomo_id = self.add_sample_database( + self.sample_name, + str(datetime.date.today()), + bec.active_account.decode(), + bec.queue.next_scan_number, + "lamni", + "test additional info", + "BEC", + ) + self.write_pdf_report() + + with scans.dataset_id_on_hold: + self.sub_tomo_data_driven(start_index) + + def sub_tomo_scan(self, subtomo_number=None, start_angle=None): + raise NotImplementedError( + "Cannot run sub_tomo_scan with DataDrivenLamNI. " + "Use lamni.tomo_scan(start_index=) to resume instead." + ) + + def _at_each_angle( + self, angle=None, stepsize=None, loptz_pos=None, manual_shift_x=0, manual_shift_y=0 + ): + self.manual_shift_x = manual_shift_x + self.manual_shift_y = manual_shift_y + self.tomo_shellstep = stepsize + if loptz_pos is not None: + dev.rtx.controller.feedback_disable() + umv(dev.loptz, loptz_pos) + super()._at_each_angle(angle=angle) + + def sub_tomo_data_driven(self, start_index=None): + """Iterate over all projections defined in the loaded HDF5 parameter file.""" + for scan_index, scan_data in enumerate(zip(*self.tomo_data.values())): + if start_index and scan_index < start_index: + continue + ( + angle, + stepsize, + loptz_pos, + propagation_distance, + manual_shift_x, + manual_shift_y, + subtomo_number, + ) = scan_data + bec.metadata.update( + {key: float(val) for key, val in zip(self.tomo_data.keys(), scan_data)} + ) + successful = False + error_caught = False + if 0 <= angle < 360.05: + print(f"Starting LamNI scan for angle {angle}") + while not successful: + self._start_beam_check() + if not self.special_angles: + self._current_special_angles = [] + if self._current_special_angles: + next_special_angle = self._current_special_angles[0] + if np.isclose(angle, next_special_angle, atol=0.5): + self._current_special_angles.pop(0) + num_repeats = self.special_angle_repeats + else: + num_repeats = 1 + try: + start_scan_number = bec.queue.next_scan_number + for i in range(num_repeats): + self._at_each_angle( + float(angle), + stepsize=float(stepsize), + loptz_pos=float(loptz_pos), + manual_shift_x=float(manual_shift_x), + manual_shift_y=float(manual_shift_y), + ) + error_caught = False + except AlarmBase as exc: + if exc.alarm_type == "TimeoutError": + bec.queue.request_queue_reset() + time.sleep(2) + error_caught = True + else: + raise exc + + if self._was_beam_okay() and not error_caught: + successful = True + else: + self._wait_for_beamline_checks() + + end_scan_number = bec.queue.next_scan_number + for scan_nr in range(start_scan_number, end_scan_number): + self._write_tomo_scan_number(scan_nr, angle, subtomo_number) + + def _update_tomo_data_from_file(self, fname: str) -> None: + """Load projection parameters from the HDF5 file into self.tomo_data.""" + with h5py.File(fname, "r") as file: + self.tomo_data["theta"] = np.array([*file["theta"]]).flatten() + self.tomo_data["stepsize"] = np.array([*file["stepsize"]]).flatten() + self.tomo_data["loptz"] = np.array([*file["loptz"]]).flatten() + self.tomo_data["propagation_distance"] = np.array( + [*file["relative_propagation_distance"]] + ).flatten() + self.tomo_data["manual_shift_x"] = np.array([*file["manual_shift_x"]]).flatten() + self.tomo_data["manual_shift_y"] = np.array([*file["manual_shift_y"]]).flatten() + self.tomo_data["subtomo_id"] = np.array([*file["subtomo_id"]]).flatten() + + shapes = [data.shape for data in self.tomo_data.values()] + if len(set(shapes)) > 1: + raise ValueError(f"Tomo data file has entries of inconsistent lengths: {shapes}.") diff --git a/csaxs_bec/bec_ipython_client/plugins/LamNI/lamni.py b/csaxs_bec/bec_ipython_client/plugins/LamNI/lamni.py new file mode 100644 index 0000000..536d0e3 --- /dev/null +++ b/csaxs_bec/bec_ipython_client/plugins/LamNI/lamni.py @@ -0,0 +1,1015 @@ +import builtins +import datetime +import os +import subprocess +import threading +import time +from pathlib import Path + +import numpy as np +from bec_lib import bec_logger +from bec_lib.alarm_handler import AlarmBase +from bec_lib.pdf_writer import PDFWriter +from typeguard import typechecked + +from csaxs_bec.bec_ipython_client.plugins.omny.omny_general_tools import OMNYTools + +from .alignment import XrayEyeAlign +from .lamni_optics_mixin import LaMNIInitStages, LamNIOpticsMixin + +logger = bec_logger.logger + +if builtins.__dict__.get("bec") is not None: + bec = builtins.__dict__.get("bec") + dev = builtins.__dict__.get("dev") + umv = builtins.__dict__.get("umv") + umvr = builtins.__dict__.get("umvr") + + +class LamNI(LamNIOpticsMixin): + def __init__(self, client): + super().__init__() + self.client = client + self.device_manager = client.device_manager + self.align = XrayEyeAlign(client, self) + self.init = LaMNIInitStages(client) + self.check_shutter = True + self.check_light_available = True + self.check_fofb = True + self._check_msgs = [] + self.tomo_id = -1 + self.special_angles = [] + self.special_angle_repeats = 20 + self.special_angle_tolerance = 20 + self._current_special_angles = [] + self._beam_is_okay = True + self._stop_beam_check_event = None + self.beam_check_thread = None + self.OMNYTools = OMNYTools(self.client) + # Progress tracking + self.progress = {} + self.progress["tomo_type"] = "Equally spaced sub-tomograms" + self.progress["subtomo"] = 0 + self.progress["subtomo_projection"] = 0 + self.progress["subtomo_total_projections"] = 1 + self.progress["projection"] = 0 + self.progress["total_projections"] = 1 + self.progress["angle"] = 0 + + # ------------------------------------------------------------------ + # Beamline checks + # ------------------------------------------------------------------ + + def get_beamline_checks_enabled(self): + print( + f"Shutter: {self.check_shutter}\nFOFB: {self.check_fofb}\nLight available:" + f" {self.check_light_available}" + ) + + @property + def beamline_checks_enabled(self): + return { + "shutter": self.check_shutter, + "fofb": self.check_fofb, + "light available": self.check_light_available, + } + + @beamline_checks_enabled.setter + def beamline_checks_enabled(self, val: bool): + self.check_shutter = val + self.check_light_available = val + self.check_fofb = val + self.get_beamline_checks_enabled() + + def _run_beamline_checks(self): + msgs = [] + dev = builtins.__dict__.get("dev") + try: + if self.check_shutter: + shutter_val = dev.x12sa_es1_shutter_status.read(cached=True) + if shutter_val["value"].lower() != "open": + self._beam_is_okay = False + msgs.append("Check beam failed: Shutter is closed.") + if self.check_light_available: + machine_status = dev.sls_machine_status.read(cached=True) + if machine_status["value"] not in ["Light Available", "Light-Available"]: + self._beam_is_okay = False + msgs.append("Check beam failed: Light not available.") + if self.check_fofb: + fast_orbit_feedback = dev.sls_fast_orbit_feedback.read(cached=True) + if fast_orbit_feedback["value"] != "running": + self._beam_is_okay = False + msgs.append("Check beam failed: Fast orbit feedback is not running.") + except Exception: + logger.warning("Failed to check beam.") + return msgs + + def _check_beam(self): + while not self._stop_beam_check_event.is_set(): + self._check_msgs = self._run_beamline_checks() + if not self._beam_is_okay: + self._stop_beam_check_event.set() + time.sleep(1) + + def _start_beam_check(self): + self._beam_is_okay = True + self._stop_beam_check_event = threading.Event() + self.beam_check_thread = threading.Thread(target=self._check_beam, daemon=True) + self.beam_check_thread.start() + + def _was_beam_okay(self): + self._stop_beam_check_event.set() + self.beam_check_thread.join() + return self._beam_is_okay + + def _print_beamline_checks(self): + for msg in self._check_msgs: + logger.warning(msg) + + def _wait_for_beamline_checks(self): + self._print_beamline_checks() + try: + msg = bec.logbook.LogbookMessage() + msg.add_text( + "

Beamline checks failed at" + f" {str(datetime.datetime.now())}: {''.join(self._check_msgs)}

" + ).add_tag(["BEC", "beam_check"]) + self.client.logbook.send_logbook_message(msg) + except Exception: + logger.warning("Failed to send update to SciLog.") + + while True: + self._beam_is_okay = True + self._check_msgs = self._run_beamline_checks() + if self._beam_is_okay: + break + self._print_beamline_checks() + time.sleep(1) + + try: + msg = bec.logbook.LogbookMessage() + msg.add_text( + "

Operation resumed at" + f" {str(datetime.datetime.now())}.

" + ).add_tag(["BEC", "beam_check"]) + self.client.logbook.send_logbook_message(msg) + except Exception: + logger.warning("Failed to send update to SciLog.") + + # ------------------------------------------------------------------ + # Special angles + # ------------------------------------------------------------------ + + def set_special_angles(self, angles: list, repeats: int = 20, tolerance: float = 0.5): + """Set the special angles for a tomo. + + Args: + angles (list): List of special angles. + repeats (int, optional): Number of repeats at a special angle. Defaults to 20. + tolerance (float, optional): Angle tolerance in degrees. Defaults to 0.5. + """ + self.special_angles = angles + self.special_angle_repeats = repeats + self.special_angle_tolerance = tolerance + + def remove_special_angles(self): + """Remove the special angles and reset repeats to 1.""" + self.special_angles = [] + self.special_angle_repeats = 1 + + # ------------------------------------------------------------------ + # RT feedback / interferometer helpers + # ------------------------------------------------------------------ + + def rt_off(self): + dev.rtx.enabled = False + dev.rty.enabled = False + + def rt_on(self): + dev.rtx.enabled = True + dev.rty.enabled = True + if dev.rtx.enabled == True: + print("rt is enabled") + else: + print("failed to enable rt") + + def feedback_enable_with_reset(self): + self.device_manager.devices.rtx.controller.feedback_enable_with_reset() + self.feedback_status() + + def feedback_enable_without_reset(self): + self.device_manager.devices.rtx.controller.feedback_enable_without_reset() + self.feedback_status() + + def feedback_disable(self): + self.device_manager.devices.rtx.controller.feedback_disable() + self.feedback_status() + + def feedback_disable_and_reset_angle(self): + self.device_manager.devices.rtx.controller.feedback_disable_and_even_reset_lamni_angle_interferometer() + self.feedback_status() + + def feedback_status(self): + if self.device_manager.devices.rtx.controller.feedback_is_running(): + print("The rt feedback is \x1b[92mrunning\x1b[0m.") + else: + print("The rt feedback is \x1b[91mNOT\x1b[0m running.") + + def show_interferometer_positions(self): + self.device_manager.devices.rtx.controller.show_interferometer_positions() + + def show_signal_strength(self): + self.device_manager.devices.rtx.controller.show_signal_strength_interferometer() + + def show_analog_signals(self): + return self.device_manager.devices.rtx.controller.show_analog_signals() + + # ------------------------------------------------------------------ + # Global parameters (backed by BEC global vars) + # ------------------------------------------------------------------ + + @property + def tomo_shellstep(self): + val = self.client.get_global_var("tomo_shellstep") + if val is None: + return 1 + return val + + @tomo_shellstep.setter + def tomo_shellstep(self, val: float): + self.client.set_global_var("tomo_shellstep", val) + + @property + def tomo_circfov(self): + val = self.client.get_global_var("tomo_circfov") + if val is None: + return 0.0 + return val + + @tomo_circfov.setter + def tomo_circfov(self, val: float): + self.client.set_global_var("tomo_circfov", val) + + @property + def tomo_type(self): + val = self.client.get_global_var("tomo_type") + if val is None: + return 1 + return val + + @tomo_type.setter + def tomo_type(self, val: int): + if val not in (1, 2, 3): + raise ValueError("Unknown tomo_type. Must be 1, 2 or 3.") + self.client.set_global_var("tomo_type", val) + + @property + def tomo_countingtime(self): + val = self.client.get_global_var("tomo_countingtime") + if val is None: + return 0.1 + return val + + @tomo_countingtime.setter + def tomo_countingtime(self, val: float): + self.client.set_global_var("tomo_countingtime", val) + + @property + def manual_shift_x(self): + val = self.client.get_global_var("manual_shift_x") + if val is None: + return 0.0 + return val + + @manual_shift_x.setter + def manual_shift_x(self, val: float): + self.client.set_global_var("manual_shift_x", val) + + @property + def manual_shift_y(self): + val = self.client.get_global_var("manual_shift_y") + if val is None: + return 0.0 + return val + + @manual_shift_y.setter + def manual_shift_y(self, val: float): + self.client.set_global_var("manual_shift_y", val) + + @property + def lamni_piezo_range_x(self): + val = self.client.get_global_var("lamni_piezo_range_x") + if val is None: + return 20 + return val + + @lamni_piezo_range_x.setter + def lamni_piezo_range_x(self, val: float): + if dev.rtx.user_parameter and dev.rtx.user_parameter.get("large_range_scan", True): + self.client.set_global_var("lamni_piezo_range_x", val) + return + if val > 80: + raise ValueError("Piezo range cannot be larger than 80 um.") + self.client.set_global_var("lamni_piezo_range_x", val) + + @property + def lamni_piezo_range_y(self): + val = self.client.get_global_var("lamni_piezo_range_y") + if val is None: + return 20 + return val + + @lamni_piezo_range_y.setter + def lamni_piezo_range_y(self, val: float): + if dev.rtx.user_parameter and dev.rtx.user_parameter.get("large_range_scan", True): + self.client.set_global_var("lamni_piezo_range_y", val) + return + if val > 80: + raise ValueError("Piezo range cannot be larger than 80 um.") + self.client.set_global_var("lamni_piezo_range_y", val) + + @property + def corridor_size(self): + val = self.client.get_global_var("corridor_size") + if val is None: + val = -1 + return val + + @corridor_size.setter + def corridor_size(self, val: float): + self.client.set_global_var("corridor_size", val) + + @property + def lamni_stitch_x(self): + val = self.client.get_global_var("lamni_stitch_x") + if val is None: + return 0 + return val + + @lamni_stitch_x.setter + @typechecked + def lamni_stitch_x(self, val: int): + self.client.set_global_var("lamni_stitch_x", val) + + @property + def lamni_stitch_y(self): + val = self.client.get_global_var("lamni_stitch_y") + if val is None: + return 0 + return val + + @lamni_stitch_y.setter + @typechecked + def lamni_stitch_y(self, val: int): + self.client.set_global_var("lamni_stitch_y", val) + + @property + def ptycho_reconstruct_foldername(self): + val = self.client.get_global_var("ptycho_reconstruct_foldername") + if val is None: + return "ptycho_reconstruct" + return val + + @ptycho_reconstruct_foldername.setter + def ptycho_reconstruct_foldername(self, val: str): + self.client.set_global_var("ptycho_reconstruct_foldername", val) + + @property + def tomo_angle_stepsize(self): + val = self.client.get_global_var("tomo_angle_stepsize") + if val is None: + return 10.0 + return val + + @tomo_angle_stepsize.setter + def tomo_angle_stepsize(self, val: float): + self.client.set_global_var("tomo_angle_stepsize", val) + + @property + def tomo_stitch_overlap(self): + val = self.client.get_global_var("tomo_stitch_overlap") + if val is None: + return 0.2 + return val + + @tomo_stitch_overlap.setter + def tomo_stitch_overlap(self, val: float): + self.client.set_global_var("tomo_stitch_overlap", val) + + @property + def golden_max_number_of_projections(self): + val = self.client.get_global_var("golden_max_number_of_projections") + if val is None: + return 1000.0 + return val + + @golden_max_number_of_projections.setter + def golden_max_number_of_projections(self, val: float): + self.client.set_global_var("golden_max_number_of_projections", val) + + @property + def golden_ratio_bunch_size(self): + val = self.client.get_global_var("golden_ratio_bunch_size") + if val is None: + return 20 + return val + + @golden_ratio_bunch_size.setter + def golden_ratio_bunch_size(self, val: int): + if val < 20: + raise ValueError("golden_ratio_bunch_size must be at least 20.") + self.client.set_global_var("golden_ratio_bunch_size", val) + + @property + def golden_projections_at_0_deg_for_damage_estimation(self): + val = self.client.get_global_var("golden_projections_at_0_deg_for_damage_estimation") + if val is None: + return 0 + return val + + @golden_projections_at_0_deg_for_damage_estimation.setter + def golden_projections_at_0_deg_for_damage_estimation(self, val: int): + self.client.set_global_var("golden_projections_at_0_deg_for_damage_estimation", val) + + @property + def sample_name(self): + val = self.client.get_global_var("sample_name") + if val is None: + return "bec_test_sample" + return val + + @sample_name.setter + @typechecked + def sample_name(self, val: str): + self.client.set_global_var("sample_name", val) + + # ------------------------------------------------------------------ + # Logging helpers + # ------------------------------------------------------------------ + + def write_to_spec_log(self, content): + try: + with open( + os.path.expanduser( + "~/Data10/specES1/log-files/specES1_started_2022_11_30_1313.log" + ), + "a", + ) as log_file: + log_file.write(content) + except Exception: + logger.warning("Failed to write to spec log file (omny web page).") + + def write_to_scilog(self, content, tags: list = None): + try: + if tags is not None: + tags.append("BEC") + else: + tags = ["BEC"] + msg = bec.logbook.LogbookMessage() + msg.add_text(content).add_tag(tags) + self.client.logbook.send_logbook_message(msg) + except Exception: + logger.warning("Failed to write to scilog.") + + def _write_subtomo_to_scilog(self, subtomo_number): + bec = builtins.__dict__.get("bec") + if self.tomo_id > 0: + tags = ["BEC_subtomo", self.sample_name, f"tomo_id_{self.tomo_id}"] + else: + tags = ["BEC_subtomo", self.sample_name] + self.write_to_scilog( + f"Starting subtomo: {subtomo_number}. First scan number: {bec.queue.next_scan_number}.", + tags, + ) + + def _write_tomo_scan_number(self, scan_number: int, angle: float, subtomo_number: int) -> None: + tomo_scan_numbers_file = os.path.expanduser( + "~/Data10/specES1/dat-files/tomography_scannumbers.txt" + ) + with open(tomo_scan_numbers_file, "a+") as out_file: + out_file.write( + f"{scan_number} {angle} {dev.lsamrot.read()['lsamrot']['value']:.3f}" + f" {self.tomo_id} {subtomo_number} {0} {'lamni'}\n" + ) + + # ------------------------------------------------------------------ + # Sample database + # ------------------------------------------------------------------ + + def add_sample_database( + self, samplename, date, eaccount, scan_number, setup, sample_additional_info, user + ): + """Add a sample to the OMNY sample database and retrieve the tomo id.""" + subprocess.run( + "wget --user=omny --password=samples -q -O /tmp/currsamplesnr.txt" + f" 'https://omny.web.psi.ch/samples/newmeasurement.php?sample={samplename}" + f"&date={date}&eaccount={eaccount}&scannr={scan_number}&setup={setup}" + f"&additional={sample_additional_info}&user={user}'", + shell=True, + ) + with open("/tmp/currsamplesnr.txt") as tomo_number_file: + tomo_number = int(tomo_number_file.read()) + return tomo_number + + # ------------------------------------------------------------------ + # Scan projection + # ------------------------------------------------------------------ + + def tomo_scan_projection(self, angle: float): + scans = builtins.__dict__.get("scans") + + additional_correction = self.align.compute_additional_correction(angle) + additional_correction_2 = self.align.compute_additional_correction_2(angle) + correction_xeye_mu = self.align.lamni_compute_additional_correction_xeye_mu(angle) + + self._current_scan_list = [] + + for stitch_x in range(-self.lamni_stitch_x, self.lamni_stitch_x + 1): + for stitch_y in range(-self.lamni_stitch_y, self.lamni_stitch_y + 1): + # pylint: disable=undefined-variable + self._current_scan_list.append(bec.queue.next_scan_number) + log_message = ( + f"{str(datetime.datetime.now())}: LamNI scan projection at angle {angle}," + f" scan number {bec.queue.next_scan_number}.\n" + ) + self.write_to_spec_log(log_message) + corridor_size = self.corridor_size if self.corridor_size > 0 else None + scans.lamni_fermat_scan( + fov_size=[self.lamni_piezo_range_x, self.lamni_piezo_range_y], + step=self.tomo_shellstep, + stitch_x=stitch_x, + stitch_y=stitch_y, + stitch_overlap=self.tomo_stitch_overlap, + center_x=self.align.tomo_fovx_offset, + center_y=self.align.tomo_fovy_offset, + shift_x=( + self.manual_shift_x + + correction_xeye_mu[0] + - additional_correction[0] + - additional_correction_2[0] + ), + shift_y=( + self.manual_shift_y + + correction_xeye_mu[1] + - additional_correction[1] + - additional_correction_2[1] + ), + fov_circular=self.tomo_circfov, + angle=angle, + scan_type="fly", + exp_time=self.tomo_countingtime, + optim_trajectory_corridor=corridor_size, + ) + + def tomo_reconstruct(self, base_path="~/Data10/specES1"): + """Write the tomo reconstruct file for the reconstruction queue.""" + bec = builtins.__dict__.get("bec") + base_path = os.path.expanduser(base_path) + ptycho_queue_path = Path(os.path.join(base_path, self.ptycho_reconstruct_foldername)) + ptycho_queue_path.mkdir(parents=True, exist_ok=True) + + last_scan_number = bec.queue.next_scan_number - 1 + ptycho_queue_file = os.path.abspath( + os.path.join(ptycho_queue_path, f"scan_{last_scan_number:05d}.dat") + ) + with open(ptycho_queue_file, "w") as queue_file: + scans = " ".join([str(scan) for scan in self._current_scan_list]) + queue_file.write(f"p.scan_number {scans}\n") + queue_file.write("p.check_nextscan_started 1\n") + + def _at_each_angle(self, angle: float) -> None: + self.tomo_scan_projection(angle) + self.tomo_reconstruct() + + # ------------------------------------------------------------------ + # Progress reporting + # ------------------------------------------------------------------ + + def _print_progress(self): + print("\x1b[95mProgress report:") + print(f"Tomo type: ....................... {self.progress['tomo_type']}") + print(f"Projection: ...................... {self.progress['projection']}") + print(f"Total projections expected ....... {self.progress['total_projections']}") + print(f"Angle: ........................... {self.progress['angle']}") + print(f"Current subtomo: ................. {self.progress['subtomo']}") + print(f"Current projection within subtomo: {self.progress['subtomo_projection']}\x1b[0m") + + # ------------------------------------------------------------------ + # Tomo scan orchestration + # ------------------------------------------------------------------ + + def sub_tomo_scan(self, subtomo_number, start_angle=None): + """Perform one sub-tomogram (tomo_type 1 only).""" + self._write_subtomo_to_scilog(subtomo_number) + + if start_angle is None: + offsets = {1: 0, 2: 4, 3: 2, 4: 6, 5: 1, 6: 5, 7: 3, 8: 7} + start_angle = self.tomo_angle_stepsize / 8.0 * offsets[subtomo_number] + + angle_end = start_angle + 360 + angles = np.linspace( + start_angle, + angle_end, + num=int(360 / self.tomo_angle_stepsize) + 1, + endpoint=True, + ) + + if not (subtomo_number % 2): + angles = np.flip(angles) + + for angle in angles: + self.progress["tomo_type"] = "Equally spaced sub-tomograms" + self.progress["subtomo"] = subtomo_number + self.progress["subtomo_projection"] = np.where(angles == angle)[0][0] + self.progress["subtomo_total_projections"] = 360 / self.tomo_angle_stepsize + self.progress["projection"] = ( + (subtomo_number - 1) * self.progress["subtomo_total_projections"] + + self.progress["subtomo_projection"] + ) + self.progress["total_projections"] = 360 / self.tomo_angle_stepsize * 8 + self.progress["angle"] = angle + self._tomo_scan_at_angle(angle, subtomo_number) + + def _tomo_scan_at_angle(self, angle, subtomo_number): + successful = False + error_caught = False + if 0 <= angle < 360.05: + print(f"Starting LamNI scan for angle {angle} in subtomo {subtomo_number}") + self._print_progress() + while not successful: + self._start_beam_check() + if not self.special_angles: + self._current_special_angles = [] + if self._current_special_angles: + next_special_angle = self._current_special_angles[0] + if np.isclose(angle, next_special_angle, atol=0.5): + self._current_special_angles.pop(0) + num_repeats = self.special_angle_repeats + else: + num_repeats = 1 + try: + start_scan_number = bec.queue.next_scan_number + for i in range(num_repeats): + self._at_each_angle(angle) + error_caught = False + except AlarmBase as exc: + if exc.alarm_type == "TimeoutError": + bec.queue.request_queue_reset() + time.sleep(2) + error_caught = True + else: + raise exc + + end_scan_number = bec.queue.next_scan_number + for scan_nr in range(start_scan_number, end_scan_number): + self._write_tomo_scan_number(scan_nr, angle, subtomo_number) + + if self._was_beam_okay() and not error_caught: + successful = True + else: + self._wait_for_beamline_checks() + + def _golden(self, ii, howmany_sorted, maxangle=360, reverse=False): + """Return the ii-th golden ratio angle within sorted bunches and its subtomo number.""" + golden = [] + for iji in range( + (ii - (ii % howmany_sorted)), (ii - (ii % howmany_sorted)) + howmany_sorted, 1 + ): + golden.append( + ((iji * maxangle * (1 + pow(5, 0.5)) / 2) * 1000 % (maxangle * 1000)) / 1000 + ) + golden.sort() + subtomo_number = int(ii / howmany_sorted) + 1 + if reverse and not subtomo_number % 2: + golden.reverse() + return (golden[ii % howmany_sorted], subtomo_number) + + def _golden_equally_spaced( + self, ii, number_of_projections_per_subtomo, maxangle=360, reverse=True, verbose=False + ): + """Return angles for equally spaced tomography with golden ratio sub-tomogram starting angles.""" + angular_step = maxangle / number_of_projections_per_subtomo + subtomo_number = int((ii * angular_step) / maxangle) + 1 + start_angle = self._golden(subtomo_number - 1, 1, angular_step)[0] + projection_number_of_subtomo = ( + ii - (subtomo_number - 1) * number_of_projections_per_subtomo + ) + + if reverse: + if subtomo_number % 2: + angle = start_angle + projection_number_of_subtomo * angular_step + else: + angle = ( + start_angle + + (number_of_projections_per_subtomo - 1) * angular_step + - projection_number_of_subtomo * angular_step + ) + else: + angle = start_angle + projection_number_of_subtomo * angular_step + + if verbose: + print( + f"Equally spaced golden ratio tomography.\n" + f"Angular step: {angular_step}\n" + f"Subtomo Number: {subtomo_number}\n" + f"Angle: {angle}" + ) + return angle, subtomo_number + + def tomo_scan(self, subtomo_start=1, start_angle=None, projection_number=None): + """Start a tomo scan. + + Args: + subtomo_start (int): For tomo_type 1, the sub-tomogram to start from. Defaults to 1. + start_angle (float, optional): Override starting angle of the first sub-tomogram. + projection_number (int, optional): For tomo_types 2 and 3, resume from this index. + """ + bec = builtins.__dict__.get("bec") + scans = builtins.__dict__.get("scans") + self._current_special_angles = self.special_angles.copy() + + if ( + (self.tomo_type == 1 and subtomo_start == 1 and start_angle is None) + or (self.tomo_type == 2 and projection_number is None) + or (self.tomo_type == 3 and projection_number is None) + ): + self.tomo_id = self.add_sample_database( + self.sample_name, + str(datetime.date.today()), + bec.active_account.decode(), + bec.queue.next_scan_number, + "lamni", + "test additional info", + "BEC", + ) + self.write_pdf_report() + + with scans.dataset_id_on_hold: + if self.tomo_type == 1: + self.progress["tomo_type"] = "Equally spaced sub-tomograms" + for ii in range(subtomo_start, 9): + self.sub_tomo_scan(ii, start_angle=start_angle) + start_angle = None + + elif self.tomo_type == 2: + self.progress["tomo_type"] = "Golden ratio tomography" + previous_subtomo_number = -1 + ii = 0 if projection_number is None else projection_number + while True: + angle, subtomo_number = self._golden( + ii, self.golden_ratio_bunch_size, maxangle=360, reverse=True + ) + if previous_subtomo_number != subtomo_number: + self._write_subtomo_to_scilog(subtomo_number) + if ( + subtomo_number % 2 == 1 + and ii > 10 + and self.golden_projections_at_0_deg_for_damage_estimation == 1 + ): + self._tomo_scan_at_angle(0, subtomo_number) + previous_subtomo_number = subtomo_number + + self.progress["subtomo"] = subtomo_number + self.progress["projection"] = ii + self.progress["angle"] = angle + self.progress["subtomo_total_projections"] = self.golden_ratio_bunch_size + self.progress["subtomo_projection"] = ( + ii - (subtomo_number - 1) * self.golden_ratio_bunch_size + ) + self.progress["total_projections"] = self.golden_max_number_of_projections + + self._tomo_scan_at_angle(angle, subtomo_number) + ii += 1 + if ( + self.golden_max_number_of_projections > 0 + and ii > self.golden_max_number_of_projections + ): + print( + f"Golden ratio tomography stopped after" + f" {self.golden_max_number_of_projections} projections." + ) + break + + elif self.tomo_type == 3: + self.progress["tomo_type"] = "Equally spaced, golden ratio starting angles" + previous_subtomo_number = -1 + ii = 0 if projection_number is None else projection_number + while True: + angle, subtomo_number = self._golden_equally_spaced( + ii, int(360 / self.tomo_angle_stepsize), maxangle=360, reverse=True + ) + if previous_subtomo_number != subtomo_number: + self._write_subtomo_to_scilog(subtomo_number) + if ( + subtomo_number % 2 == 1 + and ii > 10 + and self.golden_projections_at_0_deg_for_damage_estimation == 1 + ): + self._tomo_scan_at_angle(0, subtomo_number) + previous_subtomo_number = subtomo_number + + self.progress["subtomo"] = subtomo_number + self.progress["projection"] = ii + self.progress["angle"] = angle + self.progress["subtomo_total_projections"] = 360 / self.tomo_angle_stepsize + self.progress["subtomo_projection"] = ( + ii - (subtomo_number - 1) * self.progress["subtomo_total_projections"] + ) + self.progress["total_projections"] = self.golden_max_number_of_projections + + self._tomo_scan_at_angle(angle, subtomo_number) + ii += 1 + if ( + self.golden_max_number_of_projections > 0 + and ii > self.golden_max_number_of_projections + ): + print( + f"Golden ratio tomography stopped after" + f" {self.golden_max_number_of_projections} projections." + ) + break + else: + raise ValueError(f"Unknown tomo_type: {self.tomo_type}.") + + # ------------------------------------------------------------------ + # Parameter display and interactive update + # ------------------------------------------------------------------ + + def tomo_parameters(self): + """Print and interactively update the tomo parameters.""" + print("Current settings:") + print(f"Counting time = {self.tomo_countingtime} s") + print(f"Stepsize microns = {self.tomo_shellstep}") + print( + f"Piezo range (max 80) = {self.lamni_piezo_range_x}," + f" {self.lamni_piezo_range_y}" + ) + print(f"Stitching number x,y = {self.lamni_stitch_x}, {self.lamni_stitch_y}") + print(f"Stitching overlap = {self.tomo_stitch_overlap}") + print(f"Circular FOV diam = {self.tomo_circfov}") + print(f"Reconstruction queue name = {self.ptycho_reconstruct_foldername}") + print("FOV offset rotates to find the ROI; manual shift moves the rotation center.") + print(f" _tomo_fovx_offset = {self.align.tomo_fovx_offset}") + print(f" _tomo_fovy_offset = {self.align.tomo_fovy_offset}") + print(f" _manual_shift_x = {self.manual_shift_x}") + print(f" _manual_shift_y = {self.manual_shift_y}") + print("") + if self.tomo_type == 1: + print("\x1b[1mTomo type 1:\x1b[0m 8 equally spaced sub-tomograms (360 deg)") + print(f"Angular step within sub-tomogram: {self.tomo_angle_stepsize} degrees") + print(f"Resulting in number of projections: {360/self.tomo_angle_stepsize*8}") + elif self.tomo_type == 2: + print("\x1b[1mTomo type 2:\x1b[0m Golden ratio tomography") + print(f"Sorted in bunches of: {self.golden_ratio_bunch_size}") + if self.golden_max_number_of_projections > 0: + print(f"Ending after {self.golden_max_number_of_projections} projections.") + else: + print("Ending by manual interruption.") + if self.golden_projections_at_0_deg_for_damage_estimation == 1: + print("Repeating projections at 0 deg at start of every second subtomogram.") + elif self.tomo_type == 3: + print("\x1b[1mTomo type 3:\x1b[0m Equally spaced, golden ratio starting angles") + print(f"Angular step within sub-tomogram: {self.tomo_angle_stepsize} degrees") + print(f"Number of projections per sub-tomogram: {360/self.tomo_angle_stepsize}") + if self.golden_max_number_of_projections > 0: + print(f"Ending after {self.golden_max_number_of_projections} projections.") + else: + print("Ending by manual interruption.") + if self.golden_projections_at_0_deg_for_damage_estimation == 1: + print("Repeating projections at 0 deg at start of every second subtomogram.") + print(f"\nSample name: {self.sample_name}\n") + + user_input = input("Are these parameters correctly set for your scan? ") + if user_input == "y": + print("good then") + return + + self.tomo_countingtime = self._get_val(" s", self.tomo_countingtime, float) + self.tomo_shellstep = self._get_val(" um", self.tomo_shellstep, float) + self.lamni_piezo_range_x = self._get_val( + " um", self.lamni_piezo_range_x, float + ) + self.lamni_piezo_range_y = self._get_val( + " um", self.lamni_piezo_range_y, float + ) + self.lamni_stitch_x = self._get_val("", self.lamni_stitch_x, int) + self.lamni_stitch_y = self._get_val("", self.lamni_stitch_y, int) + self.tomo_circfov = self._get_val(" um", self.tomo_circfov, float) + self.ptycho_reconstruct_foldername = self._get_val( + "Reconstruction queue", self.ptycho_reconstruct_foldername, str + ) + + print("Tomography type:") + print(" 1: 8 equally spaced sub-tomograms (360 deg)") + print(" 2: Golden ratio tomography") + print(" 3: Equally spaced tomography, golden ratio starting angle") + self.tomo_type = self._get_val("Tomography type", self.tomo_type, int) + + if self.tomo_type == 1: + tomo_numberofprojections = self._get_val( + "Number of projections", 360 / self.tomo_angle_stepsize * 8, int + ) + self.tomo_angle_stepsize = 360 / tomo_numberofprojections * 8 + print(f"Angular step in a subtomogram: {self.tomo_angle_stepsize}") + + elif self.tomo_type == 2: + while True: + bunch_size = self._get_val( + "Number of projections sorted per bunch (minimum 20)", + self.golden_ratio_bunch_size, + int, + ) + if bunch_size >= 20: + self.golden_ratio_bunch_size = bunch_size + break + print("Bunch size must be at least 20. Please try again.") + self.golden_max_number_of_projections = self._get_val( + "Stop after number of projections (0 for endless)", + self.golden_max_number_of_projections, + int, + ) + self.golden_projections_at_0_deg_for_damage_estimation = self._get_val( + "Repeat projections at 0 deg every second subtomo 1/0?", + self.golden_projections_at_0_deg_for_damage_estimation, + int, + ) + + elif self.tomo_type == 3: + numprj = self._get_val( + "Number of projections per sub-tomogram", + int(360 / self.tomo_angle_stepsize), + int, + ) + self.tomo_angle_stepsize = 360 / numprj + self.golden_max_number_of_projections = self._get_val( + "Stop after number of projections (0 for endless)", + self.golden_max_number_of_projections, + int, + ) + self.golden_projections_at_0_deg_for_damage_estimation = self._get_val( + "Repeat projections at 0 deg every second subtomo 1/0?", + self.golden_projections_at_0_deg_for_damage_estimation, + int, + ) + + self.sample_name = self._get_val("sample name", self.sample_name, str) + + @staticmethod + def _get_val(msg: str, default_value, data_type): + return data_type(input(f"{msg} ({default_value}): ") or default_value) + + # ------------------------------------------------------------------ + # PDF report + # ------------------------------------------------------------------ + + def write_pdf_report(self): + """Create and write the PDF report with current LamNI settings.""" + dev = builtins.__dict__.get("dev") + header = ( + " \n" * 3 + + " ::: ::: ::: ::: :::: ::: ::::::::::: \n" + + " :+: :+: :+: :+:+: :+:+: :+:+: :+: :+: \n" + + " +:+ +:+ +:+ +:+ +:+:+ +:+ :+:+:+ +:+ +:+ \n" + + " +#+ +#++:++#++: +#+ +:+ +#+ +#+ +:+ +#+ +#+ \n" + + " +#+ +#+ +#+ +#+ +#+ +#+ +#+#+# +#+ \n" + + " #+# #+# #+# #+# #+# #+# #+#+# #+# \n" + + " ########## ### ### ### ### ### #### ########### \n" + ) + padding = 20 + piezo_range = f"{self.lamni_piezo_range_x:.2f}/{self.lamni_piezo_range_y:.2f}" + stitching = f"{self.lamni_stitch_x:.2f}/{self.lamni_stitch_y:.2f}" + dataset_id = str(self.client.queue.next_dataset_number) + content = [ + f"{'Sample Name:':<{padding}}{self.sample_name:>{padding}}\n", + f"{'Measurement ID:':<{padding}}{str(self.tomo_id):>{padding}}\n", + f"{'Dataset ID:':<{padding}}{dataset_id:>{padding}}\n", + f"{'Sample Info:':<{padding}}{'Sample Info':>{padding}}\n", + f"{'e-account:':<{padding}}{str(self.client.username):>{padding}}\n", + f"{'Number of projections:':<{padding}}{int(360 / self.tomo_angle_stepsize * 8):>{padding}}\n", + f"{'First scan number:':<{padding}}{self.client.queue.next_scan_number:>{padding}}\n", + f"{'Last scan number approx.:':<{padding}}{self.client.queue.next_scan_number + int(360 / self.tomo_angle_stepsize * 8) + 10:>{padding}}\n", + f"{'Current photon energy:':<{padding}}{dev.mokev.read(cached=True)['value']:>{padding}.4f}\n", + f"{'Exposure time:':<{padding}}{self.tomo_countingtime:>{padding}.2f}\n", + f"{'Fermat spiral step size:':<{padding}}{self.tomo_shellstep:>{padding}.2f}\n", + f"{'Piezo range (FOV sample plane):':<{padding}}{piezo_range:>{padding}}\n", + f"{'Restriction to circular FOV:':<{padding}}{self.tomo_circfov:>{padding}.2f}\n", + f"{'Stitching:':<{padding}}{stitching:>{padding}}\n", + f"{'Number of individual sub-tomograms:':<{padding}}{8:>{padding}}\n", + f"{'Angular step within sub-tomogram:':<{padding}}{self.tomo_angle_stepsize:>{padding}.2f}\n", + f"{'Tomo type:':<{padding}}{self.tomo_type:>{padding}}\n", + ] + content = "".join(content) + user_target = os.path.expanduser(f"~/Data10/documentation/tomo_scan_ID_{self.tomo_id}.pdf") + with PDFWriter(user_target) as file: + file.write(header) + file.write(content) + subprocess.run( + "xterm /work/sls/spec/local/XOMNY/bin/upload/upload_last_pon.sh &", shell=True + ) + msg = bec.logbook.LogbookMessage() + logo_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "LamNI_logo.png") + msg.add_file(logo_path).add_text("".join(content).replace("\n", "

")).add_tag( + ["BEC", "tomo_parameters", f"dataset_id_{dataset_id}", "LamNI", self.sample_name] + ) + self.client.logbook.send_logbook_message(msg) + diff --git a/csaxs_bec/bec_ipython_client/plugins/LamNI/lamni_optics_mixin (1).py b/csaxs_bec/bec_ipython_client/plugins/LamNI/lamni_optics_mixin (1).py new file mode 100644 index 0000000..3ac50e5 --- /dev/null +++ b/csaxs_bec/bec_ipython_client/plugins/LamNI/lamni_optics_mixin (1).py @@ -0,0 +1,290 @@ +import builtins +import time + +from rich import box +from rich.console import Console +from rich.table import Table + +from csaxs_bec.bec_ipython_client.plugins.cSAXS import epics_put, fshclose +from csaxs_bec.bec_ipython_client.plugins.omny.omny_general_tools import OMNYTools + +dev = builtins.__dict__.get("dev") +umv = builtins.__dict__.get("umv") +bec = builtins.__dict__.get("bec") + + +class LamNIInitError(Exception): + pass + + +class LaMNIInitStages: + """Handles hardware initialization and referencing of LamNI stages.""" + + def __init__(self, client): + super().__init__() + self.client = client + self.OMNYTools = OMNYTools(self.client) + + def lamni_init_stages(self): + + if self.OMNYTools.yesno("Start initialization of LamNI stages. OK?"): + print("starting...") + dev.lsamrot.enabled = True + else: + return + + if self.check_all_axes_of_lamni_referenced(): + if self.OMNYTools.yesno("All axes are referenced. Continue anyways?"): + print("ok then...") + else: + return + + axis_id_lsamrot = dev.lsamrot._config["deviceConfig"].get("axis_Id") + if dev.lsamrot.controller.get_motor_limit_switch(axis_id_lsamrot)[1] == False: + if self.OMNYTools.yesno("The rotation stage will be moved to one limit"): + print("starting...") + else: + return + + self.drive_axis_to_limit(dev.lsamrot, "forward") + dev.lsamrot.enabled = False + print("Now hard reboot the controller and run the initialization routine again.") + print("The controller will be disabled in bec. To enable dev.lsamrot.enabled=True") + return + + if self.OMNYTools.yesno( + "Init of loptz. Can the stage move to the upstream limit without collision?" + ): + print("ok then...") + else: + return + + print("Referencing loptz") + self.drive_axis_to_limit(dev.loptz, "forward") + self.find_reference_mark(dev.loptz) + + print("Referencing loptx") + self.drive_axis_to_limit(dev.loptx, "reverse") + self.find_reference_mark(dev.loptx) + + print("Referencing lopty") + self.drive_axis_to_limit(dev.lopty, "forward") + self.find_reference_mark(dev.lopty) + + print("Referencing lsamx") + self.drive_axis_to_limit(dev.lsamx, "forward") + self.find_reference_mark(dev.lsamx) + + print("Referencing lsamy") + self.drive_axis_to_limit(dev.lsamy, "reverse") + self.find_reference_mark(dev.lsamy) + + print("Referencing lsamrot") + self.drive_axis_to_limit(dev.lsamrot, "reverse") + time.sleep(0.1) + self.find_reference_mark(dev.lsamrot) + + if self.OMNYTools.yesno( + "Init of leye. Can the stage move to -x limit without collision?" + ): + print("starting...") + else: + return + + print("Referencing leyex") + self.drive_axis_to_limit(dev.leyex, "forward") + print("Referencing leyey") + self.drive_axis_to_limit(dev.leyey, "forward") + + print("Init of Smaract stages") + dev.losax.controller.find_reference_mark(2, 0, 1000, 1) + time.sleep(1) + dev.losax.controller.find_reference_mark(0, 0, 1000, 1) + time.sleep(1) + dev.losax.controller.find_reference_mark(1, 0, 1000, 1) + time.sleep(1) + + self._align_setup() + + def find_reference_mark(self, device): + axis_id = device._config["deviceConfig"].get("axis_Id") + axis_id_numeric = self.axis_id_to_numeric(axis_id) + device.controller.find_reference(axis_id_numeric) + + def drive_axis_to_limit(self, device, direction): + axis_id = device._config["deviceConfig"].get("axis_Id") + axis_id_numeric = self.axis_id_to_numeric(axis_id) + device.controller.drive_axis_to_limit(axis_id_numeric, direction) + + def axis_id_to_numeric(self, axis_id) -> int: + return ord(axis_id.lower()) - 97 + + def _align_setup(self): + if self.OMNYTools.yesno("Start moving stages to default initial positions?"): + print("Start moving stages...") + else: + print("Stopping.") + return + + lsamx_center = dev.lsamx.user_parameter.get("center") + if lsamx_center is None: + raise LamNIInitError( + "Could not find a lsamx center position. Please check your device config." + ) + lsamy_center = dev.lsamy.user_parameter.get("center") + if lsamy_center is None: + raise LamNIInitError( + "Could not find a lsamy center position. Please check your device config." + ) + umv(dev.lsamx, lsamx_center, dev.lsamy, lsamy_center, dev.loptx, -0.3, dev.lopty, 0) + umv(dev.losax, -1) + umv(dev.loptz, 82.25) + umv(dev.lsamrot, -1) + umv(dev.lsamrot, 0) + + time.sleep(2) + dev.rtx.controller.feedback_disable_and_even_reset_lamni_angle_interferometer() + + def check_all_axes_of_lamni_referenced(self): + if ( + dev.losax.controller.axis_is_referenced(0) + & dev.losax.controller.axis_is_referenced(1) + & dev.losax.controller.axis_is_referenced(2) + & dev.lsamx.controller.all_axes_referenced() + ): + print("All axes of LamNI are referenced.") + return True + else: + return False + + +class LamNIOpticsMixin: + """Optics movement methods: FZP, OSA, central stop and X-ray eye.""" + + @staticmethod + def _get_user_param_safe(device, var): + param = dev[device].user_parameter + if not param or param.get(var) is None: + raise ValueError(f"Device {device} has no user parameter definition for {var}.") + return param.get(var) + + def leye_out(self): + self.loptics_in() + fshclose() + leyey_out = self._get_user_param_safe("leyey", "out") + umv(dev.leyey, leyey_out) + + epics_put("XOMNYI-XEYE-ACQ:0", 2) + umv(dev.lsamrot, 0) + umv(dev.dttrz, 5854, dev.fttrz, 2395) + + def leye_in(self): + bec.queue.next_dataset_number += 1 + umv(dev.lsamrot, 0) + umv(dev.dttrz, 6419.677, dev.fttrz, 2959.979) + while True: + moved_out = (input("Did the flight tube move out? (Y/n)") or "y").lower() + if moved_out == "y": + break + if moved_out == "n": + return + leyex_in = self._get_user_param_safe("leyex", "in") + leyey_in = self._get_user_param_safe("leyey", "in") + umv(dev.leyex, leyex_in, dev.leyey, leyey_in) + self.align.update_frame() + + def _lfzp_in(self): + loptx_in = self._get_user_param_safe("loptx", "in") + lopty_in = self._get_user_param_safe("lopty", "in") + umv(dev.loptx, loptx_in, dev.lopty, lopty_in) + + def lfzp_in(self): + """Move in the LamNI zone plate, disabling/re-enabling RT feedback around the move.""" + if "rtx" in dev and dev.rtx.enabled: + dev.rtx.controller.feedback_disable() + + self._lfzp_in() + + if "rtx" in dev and dev.rtx.enabled: + dev.rtx.controller.feedback_enable_with_reset() + + def loptics_in(self): + """Move in the LamNI optics (FZP + OSA).""" + self.lfzp_in() + self.losa_in() + + def loptics_out(self): + """Move out the LamNI optics.""" + if "rtx" in dev and dev.rtx.enabled: + dev.rtx.controller.feedback_disable() + + self.losa_out() + loptx_out = self._get_user_param_safe("loptx", "out") + lopty_out = self._get_user_param_safe("lopty", "out") + umv(dev.loptx, loptx_out, dev.lopty, lopty_out) + + if "rtx" in dev and dev.rtx.enabled: + time.sleep(1) + dev.rtx.controller.feedback_enable_with_reset() + + def lcs_in(self): + pass + + def lcs_out(self): + umv(dev.lcsy, 3) + + def losa_in(self): + losax_in = self._get_user_param_safe("losax", "in") + losay_in = self._get_user_param_safe("losay", "in") + losaz_in = self._get_user_param_safe("losaz", "in") + umv(dev.losax, losax_in, dev.losay, losay_in) + umv(dev.losaz, losaz_in) + + def losa_out(self): + losay_out = self._get_user_param_safe("losay", "out") + losaz_out = self._get_user_param_safe("losaz", "out") + umv(dev.losaz, losaz_out) + umv(dev.losay, losay_out) + + def lfzp_info(self, mokev_val=-1): + if mokev_val == -1: + try: + mokev_val = dev.mokev.readback.get() + except Exception: + print( + "Device mokev does not exist. You can specify the energy in keV as an argument instead." + ) + return + loptz_val = dev.loptz.read()["loptz"]["value"] + distance = -loptz_val + 85.6 + 52 + print(f"The sample is in a distance of {distance:.1f} mm from the FZP.") + + diameters = [80e-6, 100e-6, 120e-6, 150e-6, 170e-6, 200e-6, 220e-6, 250e-6] + + console = Console() + table = Table( + title=f"At the current energy of {mokev_val:.4f} keV we have following options:", + box=box.SQUARE, + ) + table.add_column("Diameter", justify="center") + table.add_column("Focal distance", justify="center") + table.add_column("Current beam size", justify="center") + + wavelength = 1.2398e-9 / mokev_val + + for diameter in diameters: + outermost_zonewidth = 60e-9 + focal_distance = diameter * outermost_zonewidth / wavelength + beam_size = ( + -diameter / (focal_distance * 1000) * (focal_distance * 1000 - distance) * 1e6 + ) + table.add_row( + f"{diameter*1e6:.2f} microns", + f"{focal_distance:.2f} mm", + f"{beam_size:.2f} microns", + ) + + console.print(table) + print( + "The numbers presented here are for a sample in the plane of the lamni sample holder.\n" + )