Compare commits
6 Commits
fixes/lamn
...
main
| Author | SHA1 | Date | |
|---|---|---|---|
| bc31c00e1f | |||
|
|
38671f074e | ||
|
|
92e39a5f75 | ||
|
|
22c48115a4 | ||
| 2a7448526b | |||
|
|
a5825307e5 |
@@ -1,2 +1,6 @@
|
||||
from .load_additional_correction import lamni_read_additional_correction
|
||||
from .x_ray_eye_align import DataDrivenLamNI, LamNI, MagLamNI, XrayEyeAlign
|
||||
from .alignment import XrayEyeAlign
|
||||
from .lamni import LamNI
|
||||
from .lamni_optics_mixin import LamNIInitError, LaMNIInitStages, LamNIOpticsMixin
|
||||
__all__ = [
|
||||
"LamNI", "XrayEyeAlign", "LamNIInitError", "LaMNIInitStages", "LamNIOpticsMixin"
|
||||
]
|
||||
461
csaxs_bec/bec_ipython_client/plugins/LamNI/alignment.py
Normal file
461
csaxs_bec/bec_ipython_client/plugins/LamNI/alignment.py
Normal file
@@ -0,0 +1,461 @@
|
||||
import builtins
|
||||
import time
|
||||
from collections import defaultdict
|
||||
|
||||
import numpy as np
|
||||
from bec_lib import bec_logger
|
||||
from typeguard import typechecked
|
||||
|
||||
from csaxs_bec.bec_ipython_client.plugins.cSAXS import epics_get, epics_put, fshopen
|
||||
|
||||
logger = bec_logger.logger
|
||||
|
||||
if builtins.__dict__.get("bec") is not None:
|
||||
bec = builtins.__dict__.get("bec")
|
||||
dev = builtins.__dict__.get("dev")
|
||||
umv = builtins.__dict__.get("umv")
|
||||
umvr = builtins.__dict__.get("umvr")
|
||||
|
||||
|
||||
class XrayEyeAlign:
|
||||
# pixel calibration, multiply to get mm
|
||||
# PIXEL_CALIBRATION = 0.2/209 #.2 with binning
|
||||
PIXEL_CALIBRATION = 0.2 / 218 # .2 with binning
|
||||
|
||||
def __init__(self, client, lamni) -> None:
|
||||
self.client = client
|
||||
self.lamni = lamni
|
||||
self.device_manager = client.device_manager
|
||||
self.scans = client.scans
|
||||
self.alignment_values = defaultdict(list)
|
||||
self._reset_init_values()
|
||||
self.corr_pos_x = []
|
||||
self.corr_pos_y = []
|
||||
self.corr_angle = []
|
||||
self.corr_pos_x_2 = []
|
||||
self.corr_pos_y_2 = []
|
||||
self.corr_angle_2 = []
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# Correction reset
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
def reset_correction(self):
|
||||
self.corr_pos_x = []
|
||||
self.corr_pos_y = []
|
||||
self.corr_angle = []
|
||||
|
||||
def reset_correction_2(self):
|
||||
self.corr_pos_x_2 = []
|
||||
self.corr_pos_y_2 = []
|
||||
self.corr_angle_2 = []
|
||||
|
||||
def reset_xray_eye_correction(self):
|
||||
self.client.delete_global_var("tomo_fit_xray_eye")
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# FOV offset properties
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
@property
|
||||
def tomo_fovx_offset(self):
|
||||
val = self.client.get_global_var("tomo_fov_offset")
|
||||
if val is None:
|
||||
return 0.0
|
||||
return val[0] / 1000
|
||||
|
||||
@tomo_fovx_offset.setter
|
||||
@typechecked
|
||||
def tomo_fovx_offset(self, val: float):
|
||||
val_old = self.client.get_global_var("tomo_fov_offset")
|
||||
if val_old is None:
|
||||
val_old = [0.0, 0.0]
|
||||
self.client.set_global_var("tomo_fov_offset", [val * 1000, val_old[1]])
|
||||
|
||||
@property
|
||||
def tomo_fovy_offset(self):
|
||||
val = self.client.get_global_var("tomo_fov_offset")
|
||||
if val is None:
|
||||
return 0.0
|
||||
return val[1] / 1000
|
||||
|
||||
@tomo_fovy_offset.setter
|
||||
@typechecked
|
||||
def tomo_fovy_offset(self, val: float):
|
||||
val_old = self.client.get_global_var("tomo_fov_offset")
|
||||
if val_old is None:
|
||||
val_old = [0.0, 0.0]
|
||||
self.client.set_global_var("tomo_fov_offset", [val_old[0], val * 1000])
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# Internal helpers
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
def _reset_init_values(self):
|
||||
self.shift_xy = [0, 0]
|
||||
self._xray_fov_xy = [0, 0]
|
||||
|
||||
def _disable_rt_feedback(self):
|
||||
self.device_manager.devices.rtx.controller.feedback_disable()
|
||||
|
||||
def _enable_rt_feedback(self):
|
||||
self.device_manager.devices.rtx.controller.feedback_enable_with_reset()
|
||||
|
||||
def tomo_rotate(self, val: float):
|
||||
# pylint: disable=undefined-variable
|
||||
umv(self.device_manager.devices.lsamrot, val)
|
||||
|
||||
def get_tomo_angle(self):
|
||||
return self.device_manager.devices.lsamrot.readback.read()["lsamrot"]["value"]
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# X-ray eye camera control
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
def save_frame(self):
|
||||
epics_put("XOMNYI-XEYE-SAVFRAME:0", 1)
|
||||
|
||||
def update_frame(self):
|
||||
epics_put("XOMNYI-XEYE-ACQDONE:0", 0)
|
||||
# start live
|
||||
epics_put("XOMNYI-XEYE-ACQ:0", 1)
|
||||
# wait for start live
|
||||
while epics_get("XOMNYI-XEYE-ACQDONE:0") == 0:
|
||||
time.sleep(0.5)
|
||||
print("waiting for live view to start...")
|
||||
fshopen()
|
||||
|
||||
epics_put("XOMNYI-XEYE-ACQDONE:0", 0)
|
||||
|
||||
while epics_get("XOMNYI-XEYE-ACQDONE:0") == 0:
|
||||
print("waiting for new frame...")
|
||||
time.sleep(0.5)
|
||||
|
||||
time.sleep(0.5)
|
||||
# stop live view
|
||||
epics_put("XOMNYI-XEYE-ACQ:0", 0)
|
||||
time.sleep(1)
|
||||
print("got new frame")
|
||||
|
||||
def update_fov(self, k: int):
|
||||
self._xray_fov_xy[0] = max(epics_get(f"XOMNYI-XEYE-XWIDTH_X:{k}"), self._xray_fov_xy[0])
|
||||
self._xray_fov_xy[1] = max(0, self._xray_fov_xy[0])
|
||||
|
||||
@property
|
||||
def movement_buttons_enabled(self):
|
||||
return [epics_get("XOMNYI-XEYE-ENAMVX:0"), epics_get("XOMNYI-XEYE-ENAMVY:0")]
|
||||
|
||||
@movement_buttons_enabled.setter
|
||||
def movement_buttons_enabled(self, enabled: bool):
|
||||
enabled = int(enabled)
|
||||
epics_put("XOMNYI-XEYE-ENAMVX:0", enabled)
|
||||
epics_put("XOMNYI-XEYE-ENAMVY:0", enabled)
|
||||
|
||||
def send_message(self, msg: str):
|
||||
epics_put("XOMNYI-XEYE-MESSAGE:0.DESC", msg)
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# Alignment procedure
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
def align(self):
|
||||
self._reset_init_values()
|
||||
self.reset_correction()
|
||||
self.reset_correction_2()
|
||||
|
||||
self._disable_rt_feedback()
|
||||
epics_put("XOMNYI-XEYE-PIXELSIZE:0", self.PIXEL_CALIBRATION)
|
||||
self._enable_rt_feedback()
|
||||
|
||||
self.movement_buttons_enabled = False
|
||||
epics_put("XOMNYI-XEYE-ACQ:0", 0)
|
||||
self.send_message("please wait...")
|
||||
epics_put("XOMNYI-XEYE-SAMPLENAME:0.DESC", "Let us LAMNI...")
|
||||
|
||||
self._disable_rt_feedback()
|
||||
k = 0
|
||||
|
||||
self.lamni.lfzp_in()
|
||||
self.update_frame()
|
||||
|
||||
self.movement_buttons_enabled = False
|
||||
epics_put("XOMNYI-XEYE-SUBMIT:0", 0)
|
||||
epics_put("XOMNYI-XEYE-STEP:0", 0)
|
||||
self.send_message("Submit center value of FZP.")
|
||||
|
||||
while True:
|
||||
if epics_get("XOMNYI-XEYE-SUBMIT:0") == 1:
|
||||
val_x = epics_get(f"XOMNYI-XEYE-XVAL_X:{k}") * self.PIXEL_CALIBRATION # in mm
|
||||
val_y = epics_get(f"XOMNYI-XEYE-YVAL_Y:{k}") * self.PIXEL_CALIBRATION # in mm
|
||||
self.alignment_values[k] = [val_x, val_y]
|
||||
print(
|
||||
f"Clicked position {k}: x {self.alignment_values[k][0]}, y"
|
||||
f" {self.alignment_values[k][1]}"
|
||||
)
|
||||
|
||||
if k == 0: # received center value of FZP
|
||||
self.send_message("please wait ...")
|
||||
self.lamni.loptics_out()
|
||||
epics_put("XOMNYI-XEYE-SUBMIT:0", -1)
|
||||
self.movement_buttons_enabled = False
|
||||
print("Moving sample in, FZP out")
|
||||
|
||||
self._disable_rt_feedback()
|
||||
time.sleep(0.3)
|
||||
self._enable_rt_feedback()
|
||||
time.sleep(0.3)
|
||||
|
||||
self.update_frame()
|
||||
self.send_message("Go and find the sample")
|
||||
epics_put("XOMNYI-XEYE-SUBMIT:0", 0)
|
||||
self.movement_buttons_enabled = True
|
||||
|
||||
elif k == 1: # received sample center value at samrot 0
|
||||
msg = (
|
||||
f"Base shift values from movement are x {self.shift_xy[0]}, y"
|
||||
f" {self.shift_xy[1]}"
|
||||
)
|
||||
print(msg)
|
||||
logger.info(msg)
|
||||
self.shift_xy[0] += (
|
||||
self.alignment_values[0][0] - self.alignment_values[1][0]
|
||||
) * 1000
|
||||
self.shift_xy[1] += (
|
||||
self.alignment_values[1][1] - self.alignment_values[0][1]
|
||||
) * 1000
|
||||
print(
|
||||
"Base shift values from movement and clicked position are x"
|
||||
f" {self.shift_xy[0]}, y {self.shift_xy[1]}"
|
||||
)
|
||||
|
||||
self.scans.lamni_move_to_scan_center(
|
||||
self.shift_xy[0] / 1000, self.shift_xy[1] / 1000, self.get_tomo_angle()
|
||||
).wait()
|
||||
|
||||
self.send_message("please wait ...")
|
||||
epics_put("XOMNYI-XEYE-SUBMIT:0", -1)
|
||||
self.movement_buttons_enabled = False
|
||||
time.sleep(1)
|
||||
|
||||
self.scans.lamni_move_to_scan_center(
|
||||
self.shift_xy[0] / 1000, self.shift_xy[1] / 1000, self.get_tomo_angle()
|
||||
).wait()
|
||||
|
||||
epics_put("XOMNYI-XEYE-ANGLE:0", self.get_tomo_angle())
|
||||
self.update_frame()
|
||||
self.send_message("Submit sample center and FOV (0 deg)")
|
||||
epics_put("XOMNYI-XEYE-SUBMIT:0", 0)
|
||||
self.update_fov(k)
|
||||
|
||||
elif 1 < k < 10: # received sample center value at samrot 0 ... 315
|
||||
self.send_message("please wait ...")
|
||||
epics_put("XOMNYI-XEYE-SUBMIT:0", -1)
|
||||
|
||||
self._disable_rt_feedback()
|
||||
self.tomo_rotate((k - 1) * 45 - 45 / 2)
|
||||
self.scans.lamni_move_to_scan_center(
|
||||
self.shift_xy[0] / 1000, self.shift_xy[1] / 1000, self.get_tomo_angle()
|
||||
).wait()
|
||||
self._disable_rt_feedback()
|
||||
self.tomo_rotate((k - 1) * 45)
|
||||
self.scans.lamni_move_to_scan_center(
|
||||
self.shift_xy[0] / 1000, self.shift_xy[1] / 1000, self.get_tomo_angle()
|
||||
).wait()
|
||||
|
||||
epics_put("XOMNYI-XEYE-ANGLE:0", self.get_tomo_angle())
|
||||
self.update_frame()
|
||||
self.send_message("Submit sample center")
|
||||
epics_put("XOMNYI-XEYE-SUBMIT:0", 0)
|
||||
epics_put("XOMNYI-XEYE-ENAMVX:0", 1)
|
||||
self.update_fov(k)
|
||||
|
||||
elif k == 10: # received sample center value at samrot 270, done
|
||||
self.send_message("done...")
|
||||
epics_put("XOMNYI-XEYE-SUBMIT:0", -1)
|
||||
self.movement_buttons_enabled = False
|
||||
self.update_fov(k)
|
||||
break
|
||||
|
||||
k += 1
|
||||
epics_put("XOMNYI-XEYE-STEP:0", k)
|
||||
|
||||
if k < 2:
|
||||
_xrayeyalignmvx = epics_get("XOMNYI-XEYE-MVX:0")
|
||||
_xrayeyalignmvy = epics_get("XOMNYI-XEYE-MVY:0")
|
||||
if _xrayeyalignmvx != 0 or _xrayeyalignmvy != 0:
|
||||
self.shift_xy[0] = self.shift_xy[0] + _xrayeyalignmvx
|
||||
self.shift_xy[1] = self.shift_xy[1] + _xrayeyalignmvy
|
||||
self.scans.lamni_move_to_scan_center(
|
||||
self.shift_xy[0] / 1000, self.shift_xy[1] / 1000, self.get_tomo_angle()
|
||||
).wait()
|
||||
print(
|
||||
f"Current center horizontal {self.shift_xy[0]} vertical {self.shift_xy[1]}"
|
||||
)
|
||||
epics_put("XOMNYI-XEYE-MVY:0", 0)
|
||||
epics_put("XOMNYI-XEYE-MVX:0", 0)
|
||||
self.update_frame()
|
||||
|
||||
time.sleep(0.2)
|
||||
|
||||
self.write_output()
|
||||
fovx = self._xray_fov_xy[0] * self.PIXEL_CALIBRATION * 1000 / 2
|
||||
fovy = self._xray_fov_xy[1] * self.PIXEL_CALIBRATION * 1000 / 2
|
||||
print(
|
||||
f"The largest field of view from the xrayeyealign was \nfovx = {fovx:.0f} microns,"
|
||||
f" fovy = {fovy:.0f} microns"
|
||||
)
|
||||
print("Use matlab routine to fit the current alignment...")
|
||||
print(
|
||||
"This additional shift is applied to the base shift values\n which are x"
|
||||
f" {self.shift_xy[0]}, y {self.shift_xy[1]}"
|
||||
)
|
||||
|
||||
self._disable_rt_feedback()
|
||||
self.tomo_rotate(0)
|
||||
|
||||
print(
|
||||
"\n\nNEXT LOAD ALIGNMENT PARAMETERS\nby running"
|
||||
" lamni.align.read_xray_eye_correction()\n"
|
||||
)
|
||||
|
||||
self.client.set_global_var("tomo_fov_offset", self.shift_xy)
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# Alignment output
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
def write_output(self):
|
||||
import os
|
||||
with open(
|
||||
os.path.expanduser("~/Data10/specES1/internal/xrayeye_alignmentvalues"), "w"
|
||||
) as alignment_values_file:
|
||||
alignment_values_file.write("angle\thorizontal\tvertical\n")
|
||||
for k in range(2, 11):
|
||||
fovx_offset = (self.alignment_values[0][0] - self.alignment_values[k][0]) * 1000
|
||||
fovy_offset = (self.alignment_values[k][1] - self.alignment_values[0][1]) * 1000
|
||||
print(
|
||||
f"Writing to file new alignment: number {k}, value x {fovx_offset}, y"
|
||||
f" {fovy_offset}"
|
||||
)
|
||||
alignment_values_file.write(f"{(k-2)*45}\t{fovx_offset}\t{fovy_offset}\n")
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# X-ray eye sinusoidal correction (loaded from MATLAB fit files)
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
def read_xray_eye_correction(self, dir_path=None):
|
||||
import os
|
||||
if dir_path is None:
|
||||
dir_path = os.path.expanduser("~/Data10/specES1/internal/")
|
||||
tomo_fit_xray_eye = np.zeros((2, 3))
|
||||
for i, axis in enumerate(["x", "y"]):
|
||||
for j, coeff in enumerate(["A", "B", "C"]):
|
||||
with open(os.path.join(dir_path, f"ptychotomoalign_{coeff}{axis}.txt"), "r") as f:
|
||||
tomo_fit_xray_eye[i][j] = f.readline()
|
||||
|
||||
self.client.set_global_var("tomo_fit_xray_eye", tomo_fit_xray_eye.tolist())
|
||||
# x amp, phase, offset, y amp, phase, offset
|
||||
# 0 0 0 1 0 2 1 0 1 1 1 2
|
||||
print("New alignment parameters loaded from X-ray eye")
|
||||
print(
|
||||
f"X Amplitude {tomo_fit_xray_eye[0][0]}, "
|
||||
f"X Phase {tomo_fit_xray_eye[0][1]}, "
|
||||
f"X Offset {tomo_fit_xray_eye[0][2]}, "
|
||||
f"Y Amplitude {tomo_fit_xray_eye[1][0]}, "
|
||||
f"Y Phase {tomo_fit_xray_eye[1][1]}, "
|
||||
f"Y Offset {tomo_fit_xray_eye[1][2]}"
|
||||
)
|
||||
|
||||
def lamni_compute_additional_correction_xeye_mu(self, angle):
|
||||
"""Compute sinusoidal correction from the X-ray eye fit for the given angle."""
|
||||
tomo_fit_xray_eye = self.client.get_global_var("tomo_fit_xray_eye")
|
||||
if tomo_fit_xray_eye is None:
|
||||
print("Not applying any additional correction. No x-ray eye data available.\n")
|
||||
return (0, 0)
|
||||
|
||||
# x amp, phase, offset, y amp, phase, offset
|
||||
# 0 0 0 1 0 2 1 0 1 1 1 2
|
||||
correction_x = (
|
||||
tomo_fit_xray_eye[0][0] * np.sin(np.radians(angle) + tomo_fit_xray_eye[0][1])
|
||||
+ tomo_fit_xray_eye[0][2]
|
||||
) / 1000
|
||||
correction_y = (
|
||||
tomo_fit_xray_eye[1][0] * np.sin(np.radians(angle) + tomo_fit_xray_eye[1][1])
|
||||
+ tomo_fit_xray_eye[1][2]
|
||||
) / 1000
|
||||
|
||||
print(f"Xeye correction x {correction_x}, y {correction_y} for angle {angle}\n")
|
||||
return (correction_x, correction_y)
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# Additional lookup-table corrections (iteration 1 and 2)
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
def read_additional_correction(self, correction_file: str):
|
||||
self.corr_pos_x, self.corr_pos_y, self.corr_angle = self._read_correction_file_xy(
|
||||
correction_file
|
||||
)
|
||||
|
||||
def read_additional_correction_2(self, correction_file: str):
|
||||
self.corr_pos_x_2, self.corr_pos_y_2, self.corr_angle_2 = self._read_correction_file_xy(
|
||||
correction_file
|
||||
)
|
||||
|
||||
def _read_correction_file_xy(self, correction_file: str):
|
||||
"""Parse a correction file that contains corr_pos_x, corr_pos_y and corr_angle entries."""
|
||||
with open(correction_file, "r") as f:
|
||||
num_elements = f.readline()
|
||||
int_num_elements = int(num_elements.split(" ")[2])
|
||||
print(int_num_elements)
|
||||
corr_pos_x = []
|
||||
corr_pos_y = []
|
||||
corr_angle = []
|
||||
for j in range(0, int_num_elements * 3):
|
||||
line = f.readline()
|
||||
value = line.split(" ")[2]
|
||||
name = line.split(" ")[0].split("[")[0]
|
||||
if name == "corr_pos_x":
|
||||
corr_pos_x.append(float(value) / 1000)
|
||||
elif name == "corr_pos_y":
|
||||
corr_pos_y.append(float(value) / 1000)
|
||||
elif name == "corr_angle":
|
||||
corr_angle.append(float(value))
|
||||
return corr_pos_x, corr_pos_y, corr_angle
|
||||
|
||||
def compute_additional_correction(self, angle):
|
||||
return self._compute_correction_xy(
|
||||
angle, self.corr_pos_x, self.corr_pos_y, self.corr_angle, label="1"
|
||||
)
|
||||
|
||||
def compute_additional_correction_2(self, angle):
|
||||
return self._compute_correction_xy(
|
||||
angle, self.corr_pos_x_2, self.corr_pos_y_2, self.corr_angle_2, label="2"
|
||||
)
|
||||
|
||||
def _compute_correction_xy(self, angle, corr_pos_x, corr_pos_y, corr_angle, label=""):
|
||||
"""Find the correction for the closest angle in the lookup table."""
|
||||
if not corr_pos_x:
|
||||
print(f"Not applying additional correction {label}. No data available.\n")
|
||||
return (0, 0)
|
||||
|
||||
shift_x = corr_pos_x[0]
|
||||
shift_y = corr_pos_y[0]
|
||||
angledelta = np.fabs(corr_angle[0] - angle)
|
||||
|
||||
for j in range(1, len(corr_pos_x)):
|
||||
newangledelta = np.fabs(corr_angle[j] - angle)
|
||||
if newangledelta < angledelta:
|
||||
shift_x = corr_pos_x[j]
|
||||
shift_y = corr_pos_y[j]
|
||||
angledelta = newangledelta
|
||||
|
||||
if shift_x == 0 and angle < corr_angle[0]:
|
||||
shift_x = corr_pos_x[0]
|
||||
shift_y = corr_pos_y[0]
|
||||
|
||||
if shift_x == 0 and angle > corr_angle[-1]:
|
||||
shift_x = corr_pos_x[-1]
|
||||
shift_y = corr_pos_y[-1]
|
||||
|
||||
print(f"Additional correction shifts {label}: {shift_x} {shift_y}")
|
||||
return (shift_x, shift_y)
|
||||
211
csaxs_bec/bec_ipython_client/plugins/LamNI/extra_tomo.py
Normal file
211
csaxs_bec/bec_ipython_client/plugins/LamNI/extra_tomo.py
Normal file
@@ -0,0 +1,211 @@
|
||||
"""
|
||||
extra_tomo.py
|
||||
=============
|
||||
Specialist LamNI subclasses for specific experimental configurations.
|
||||
Import explicitly when needed, e.g.:
|
||||
|
||||
from csaxs_bec...extra_tomo import MagLamNI
|
||||
from csaxs_bec...extra_tomo import DataDrivenLamNI
|
||||
"""
|
||||
|
||||
import builtins
|
||||
import datetime
|
||||
import os
|
||||
import time
|
||||
|
||||
import h5py
|
||||
import numpy as np
|
||||
from bec_lib import bec_logger
|
||||
from bec_lib.alarm_handler import AlarmBase
|
||||
|
||||
from .lamni import LamNI
|
||||
|
||||
logger = bec_logger.logger
|
||||
|
||||
if builtins.__dict__.get("bec") is not None:
|
||||
bec = builtins.__dict__.get("bec")
|
||||
dev = builtins.__dict__.get("dev")
|
||||
umv = builtins.__dict__.get("umv")
|
||||
scans = builtins.__dict__.get("scans")
|
||||
|
||||
|
||||
class MagLamNI(LamNI):
|
||||
"""LamNI subclass for magnetic experiments (XMCD).
|
||||
|
||||
Adds a slow rotation helper and allows injection of a custom
|
||||
per-angle callback via the ``lamni_at_each_angle`` builtin.
|
||||
"""
|
||||
|
||||
def sub_tomo_scan(self, subtomo_number, start_angle=None):
|
||||
super().sub_tomo_scan(subtomo_number, start_angle)
|
||||
# self.rotate_slowly(0)
|
||||
|
||||
def rotate_slowly(self, angle, step_size=20):
|
||||
"""Rotate to target angle in small steps to avoid mechanical stress."""
|
||||
current_angle = dev.lsamrot.read(cached=True)["value"]
|
||||
steps = int(np.ceil(np.abs(current_angle - angle) / step_size)) + 1
|
||||
for target_angle in np.linspace(current_angle, angle, steps, endpoint=True):
|
||||
umv(dev.lsamrot, target_angle)
|
||||
scans.lamni_move_to_scan_center(
|
||||
self.align.tomo_fovx_offset, self.align.tomo_fovy_offset, target_angle
|
||||
)
|
||||
|
||||
def _at_each_angle(self, angle: float) -> None:
|
||||
if "lamni_at_each_angle" in builtins.__dict__:
|
||||
# pylint: disable=undefined-variable
|
||||
lamni_at_each_angle(self, angle)
|
||||
return
|
||||
|
||||
self.tomo_scan_projection(angle)
|
||||
self.tomo_reconstruct()
|
||||
|
||||
|
||||
class DataDrivenLamNI(LamNI):
|
||||
"""LamNI subclass that reads per-projection scan parameters from an HDF5 file.
|
||||
|
||||
Instead of a fixed FOV and step size for the whole tomogram, each
|
||||
projection can have individual values for step size, loptz position
|
||||
and lateral shifts, as specified in a datadriven_params.h5 file.
|
||||
"""
|
||||
|
||||
def __init__(self, client):
|
||||
super().__init__(client)
|
||||
self.tomo_data = {}
|
||||
|
||||
def tomo_scan(
|
||||
self,
|
||||
subtomo_start=1,
|
||||
start_index=None,
|
||||
fname="~/Data10/data_driven_config/datadriven_params.h5",
|
||||
):
|
||||
"""Start a data-driven tomo scan.
|
||||
|
||||
Args:
|
||||
subtomo_start (int): Unused; kept for API compatibility. Use start_index to resume.
|
||||
start_index (int, optional): Skip projections before this index. Defaults to None.
|
||||
fname (str): Path to the HDF5 parameter file. Defaults to the standard location.
|
||||
"""
|
||||
bec = builtins.__dict__.get("bec")
|
||||
scans = builtins.__dict__.get("scans")
|
||||
|
||||
fname = os.path.expanduser(fname)
|
||||
if not os.path.exists(fname):
|
||||
raise FileNotFoundError(f"Could not find datadriven params file in {fname}.")
|
||||
|
||||
content = f"Loading tomo parameters from {fname}."
|
||||
logger.warning(content)
|
||||
msg = bec.logbook.LogbookMessage()
|
||||
msg.add_text(content).add_tag(["Data_driven_file", "BEC"])
|
||||
self.client.logbook.send_logbook_message(msg)
|
||||
|
||||
self._update_tomo_data_from_file(fname)
|
||||
self._current_special_angles = self.special_angles.copy()
|
||||
|
||||
if subtomo_start == 1 and start_index is None:
|
||||
self.tomo_id = self.add_sample_database(
|
||||
self.sample_name,
|
||||
str(datetime.date.today()),
|
||||
bec.active_account.decode(),
|
||||
bec.queue.next_scan_number,
|
||||
"lamni",
|
||||
"test additional info",
|
||||
"BEC",
|
||||
)
|
||||
self.write_pdf_report()
|
||||
|
||||
with scans.dataset_id_on_hold:
|
||||
self.sub_tomo_data_driven(start_index)
|
||||
|
||||
def sub_tomo_scan(self, subtomo_number=None, start_angle=None):
|
||||
raise NotImplementedError(
|
||||
"Cannot run sub_tomo_scan with DataDrivenLamNI. "
|
||||
"Use lamni.tomo_scan(start_index=<N>) to resume instead."
|
||||
)
|
||||
|
||||
def _at_each_angle(
|
||||
self, angle=None, stepsize=None, loptz_pos=None, manual_shift_x=0, manual_shift_y=0
|
||||
):
|
||||
self.manual_shift_x = manual_shift_x
|
||||
self.manual_shift_y = manual_shift_y
|
||||
self.tomo_shellstep = stepsize
|
||||
if loptz_pos is not None:
|
||||
dev.rtx.controller.feedback_disable()
|
||||
umv(dev.loptz, loptz_pos)
|
||||
super()._at_each_angle(angle=angle)
|
||||
|
||||
def sub_tomo_data_driven(self, start_index=None):
|
||||
"""Iterate over all projections defined in the loaded HDF5 parameter file."""
|
||||
for scan_index, scan_data in enumerate(zip(*self.tomo_data.values())):
|
||||
if start_index and scan_index < start_index:
|
||||
continue
|
||||
(
|
||||
angle,
|
||||
stepsize,
|
||||
loptz_pos,
|
||||
propagation_distance,
|
||||
manual_shift_x,
|
||||
manual_shift_y,
|
||||
subtomo_number,
|
||||
) = scan_data
|
||||
bec.metadata.update(
|
||||
{key: float(val) for key, val in zip(self.tomo_data.keys(), scan_data)}
|
||||
)
|
||||
successful = False
|
||||
error_caught = False
|
||||
if 0 <= angle < 360.05:
|
||||
print(f"Starting LamNI scan for angle {angle}")
|
||||
while not successful:
|
||||
self._start_beam_check()
|
||||
if not self.special_angles:
|
||||
self._current_special_angles = []
|
||||
if self._current_special_angles:
|
||||
next_special_angle = self._current_special_angles[0]
|
||||
if np.isclose(angle, next_special_angle, atol=0.5):
|
||||
self._current_special_angles.pop(0)
|
||||
num_repeats = self.special_angle_repeats
|
||||
else:
|
||||
num_repeats = 1
|
||||
try:
|
||||
start_scan_number = bec.queue.next_scan_number
|
||||
for i in range(num_repeats):
|
||||
self._at_each_angle(
|
||||
float(angle),
|
||||
stepsize=float(stepsize),
|
||||
loptz_pos=float(loptz_pos),
|
||||
manual_shift_x=float(manual_shift_x),
|
||||
manual_shift_y=float(manual_shift_y),
|
||||
)
|
||||
error_caught = False
|
||||
except AlarmBase as exc:
|
||||
if exc.alarm_type == "TimeoutError":
|
||||
bec.queue.request_queue_reset()
|
||||
time.sleep(2)
|
||||
error_caught = True
|
||||
else:
|
||||
raise exc
|
||||
|
||||
if self._was_beam_okay() and not error_caught:
|
||||
successful = True
|
||||
else:
|
||||
self._wait_for_beamline_checks()
|
||||
|
||||
end_scan_number = bec.queue.next_scan_number
|
||||
for scan_nr in range(start_scan_number, end_scan_number):
|
||||
self._write_tomo_scan_number(scan_nr, angle, subtomo_number)
|
||||
|
||||
def _update_tomo_data_from_file(self, fname: str) -> None:
|
||||
"""Load projection parameters from the HDF5 file into self.tomo_data."""
|
||||
with h5py.File(fname, "r") as file:
|
||||
self.tomo_data["theta"] = np.array([*file["theta"]]).flatten()
|
||||
self.tomo_data["stepsize"] = np.array([*file["stepsize"]]).flatten()
|
||||
self.tomo_data["loptz"] = np.array([*file["loptz"]]).flatten()
|
||||
self.tomo_data["propagation_distance"] = np.array(
|
||||
[*file["relative_propagation_distance"]]
|
||||
).flatten()
|
||||
self.tomo_data["manual_shift_x"] = np.array([*file["manual_shift_x"]]).flatten()
|
||||
self.tomo_data["manual_shift_y"] = np.array([*file["manual_shift_y"]]).flatten()
|
||||
self.tomo_data["subtomo_id"] = np.array([*file["subtomo_id"]]).flatten()
|
||||
|
||||
shapes = [data.shape for data in self.tomo_data.values()]
|
||||
if len(set(shapes)) > 1:
|
||||
raise ValueError(f"Tomo data file has entries of inconsistent lengths: {shapes}.")
|
||||
1013
csaxs_bec/bec_ipython_client/plugins/LamNI/lamni.py
Normal file
1013
csaxs_bec/bec_ipython_client/plugins/LamNI/lamni.py
Normal file
File diff suppressed because it is too large
Load Diff
@@ -8,8 +8,6 @@ from rich.table import Table
|
||||
from csaxs_bec.bec_ipython_client.plugins.cSAXS import epics_put, fshclose
|
||||
from csaxs_bec.bec_ipython_client.plugins.omny.omny_general_tools import OMNYTools
|
||||
|
||||
|
||||
# import builtins to avoid linter errors
|
||||
dev = builtins.__dict__.get("dev")
|
||||
umv = builtins.__dict__.get("umv")
|
||||
bec = builtins.__dict__.get("bec")
|
||||
@@ -19,7 +17,9 @@ class LamNIInitError(Exception):
|
||||
pass
|
||||
|
||||
|
||||
class LaMNIInitStagesMixin:
|
||||
class LaMNIInitStages:
|
||||
"""Handles hardware initialization and referencing of LamNI stages."""
|
||||
|
||||
def __init__(self, client):
|
||||
super().__init__()
|
||||
self.client = client
|
||||
@@ -28,14 +28,11 @@ class LaMNIInitStagesMixin:
|
||||
def lamni_init_stages(self):
|
||||
|
||||
if self.OMNYTools.yesno("Start initialization of LamNI stages. OK?"):
|
||||
print("staring...")
|
||||
print("starting...")
|
||||
dev.lsamrot.enabled = True
|
||||
else:
|
||||
return
|
||||
|
||||
|
||||
|
||||
|
||||
if self.check_all_axes_of_lamni_referenced():
|
||||
if self.OMNYTools.yesno("All axes are referenced. Continue anyways?"):
|
||||
print("ok then...")
|
||||
@@ -44,7 +41,6 @@ class LaMNIInitStagesMixin:
|
||||
|
||||
axis_id_lsamrot = dev.lsamrot._config["deviceConfig"].get("axis_Id")
|
||||
if dev.lsamrot.controller.get_motor_limit_switch(axis_id_lsamrot)[1] == False:
|
||||
|
||||
if self.OMNYTools.yesno("The rotation stage will be moved to one limit"):
|
||||
print("starting...")
|
||||
else:
|
||||
@@ -56,7 +52,9 @@ class LaMNIInitStagesMixin:
|
||||
print("The controller will be disabled in bec. To enable dev.lsamrot.enabled=True")
|
||||
return
|
||||
|
||||
if self.OMNYTools.yesno("Init of loptz. Can the stage move to the upstream limit without collision?"):
|
||||
if self.OMNYTools.yesno(
|
||||
"Init of loptz. Can the stage move to the upstream limit without collision?"
|
||||
):
|
||||
print("ok then...")
|
||||
else:
|
||||
return
|
||||
@@ -81,13 +79,14 @@ class LaMNIInitStagesMixin:
|
||||
self.drive_axis_to_limit(dev.lsamy, "reverse")
|
||||
self.find_reference_mark(dev.lsamy)
|
||||
|
||||
# the dual encoder requires the reference mark to pass on both encoders
|
||||
print("Referencing lsamrot")
|
||||
self.drive_axis_to_limit(dev.lsamrot, "reverse")
|
||||
time.sleep(0.1)
|
||||
self.find_reference_mark(dev.lsamrot)
|
||||
|
||||
if self.OMNYTools.yesno("Init of leye. Can the stage move to -x limit without collision?"):
|
||||
if self.OMNYTools.yesno(
|
||||
"Init of leye. Can the stage move to -x limit without collision?"
|
||||
):
|
||||
print("starting...")
|
||||
else:
|
||||
return
|
||||
@@ -97,15 +96,6 @@ class LaMNIInitStagesMixin:
|
||||
print("Referencing leyey")
|
||||
self.drive_axis_to_limit(dev.leyey, "forward")
|
||||
|
||||
# set_lm lsamx 6 14
|
||||
# set_lm lsamy 6 14
|
||||
# set_lm lsamrot -3 362
|
||||
# set_lm loptx -1 -0.2
|
||||
# set_lm lopty 3.0 3.6
|
||||
# set_lm loptz 82 87
|
||||
# set_lm leyex 0 25
|
||||
# set_lm leyey 0.5 50
|
||||
|
||||
print("Init of Smaract stages")
|
||||
dev.losax.controller.find_reference_mark(2, 0, 1000, 1)
|
||||
time.sleep(1)
|
||||
@@ -113,15 +103,6 @@ class LaMNIInitStagesMixin:
|
||||
time.sleep(1)
|
||||
dev.losax.controller.find_reference_mark(1, 0, 1000, 1)
|
||||
time.sleep(1)
|
||||
# dev.losax.controller.find_reference_mark(3, 1, 1000, 1)
|
||||
# time.sleep(1)
|
||||
# dev.losax.controller.find_reference_mark(4, 1, 1000, 1)
|
||||
# time.sleep(1)
|
||||
|
||||
# set_lm losax -1.5 0.25
|
||||
# set_lm losay -2.5 4.1
|
||||
# set_lm losaz -4.1 -0.5
|
||||
# set_lm lcsy -1.5 5
|
||||
|
||||
self._align_setup()
|
||||
|
||||
@@ -178,6 +159,8 @@ class LaMNIInitStagesMixin:
|
||||
|
||||
|
||||
class LamNIOpticsMixin:
|
||||
"""Optics movement methods: FZP, OSA, central stop and X-ray eye."""
|
||||
|
||||
@staticmethod
|
||||
def _get_user_param_safe(device, var):
|
||||
param = dev[device].user_parameter
|
||||
@@ -192,13 +175,11 @@ class LamNIOpticsMixin:
|
||||
umv(dev.leyey, leyey_out)
|
||||
|
||||
epics_put("XOMNYI-XEYE-ACQ:0", 2)
|
||||
# move rotation stage to zero to avoid problems with wires
|
||||
umv(dev.lsamrot, 0)
|
||||
umv(dev.dttrz, 5854, dev.fttrz, 2395)
|
||||
|
||||
def leye_in(self):
|
||||
bec.queue.next_dataset_number += 1
|
||||
# move rotation stage to zero to avoid problems with wires
|
||||
umv(dev.lsamrot, 0)
|
||||
umv(dev.dttrz, 6419.677, dev.fttrz, 2959.979)
|
||||
while True:
|
||||
@@ -215,15 +196,10 @@ class LamNIOpticsMixin:
|
||||
def _lfzp_in(self):
|
||||
loptx_in = self._get_user_param_safe("loptx", "in")
|
||||
lopty_in = self._get_user_param_safe("lopty", "in")
|
||||
umv(
|
||||
dev.loptx, loptx_in, dev.lopty, lopty_in
|
||||
) # for 7.2567 keV and 150 mu, 60 nm fzp, loptz 83.6000 for propagation 1.4 mm
|
||||
umv(dev.loptx, loptx_in, dev.lopty, lopty_in)
|
||||
|
||||
def lfzp_in(self):
|
||||
"""
|
||||
move in the lamni zone plate.
|
||||
This will disable rt feedback, move the FZP and re-enabled the feedback.
|
||||
"""
|
||||
"""Move in the LamNI zone plate, disabling/re-enabling RT feedback around the move."""
|
||||
if "rtx" in dev and dev.rtx.enabled:
|
||||
dev.rtx.controller.feedback_disable()
|
||||
|
||||
@@ -233,18 +209,15 @@ class LamNIOpticsMixin:
|
||||
dev.rtx.controller.feedback_enable_with_reset()
|
||||
|
||||
def loptics_in(self):
|
||||
"""
|
||||
Move in the lamni optics, including the FZP and the OSA.
|
||||
"""
|
||||
"""Move in the LamNI optics (FZP + OSA)."""
|
||||
self.lfzp_in()
|
||||
self.losa_in()
|
||||
|
||||
def loptics_out(self):
|
||||
"""Move out the lamni optics"""
|
||||
"""Move out the LamNI optics."""
|
||||
if "rtx" in dev and dev.rtx.enabled:
|
||||
dev.rtx.controller.feedback_disable()
|
||||
|
||||
# self.lcs_out()
|
||||
self.losa_out()
|
||||
loptx_out = self._get_user_param_safe("loptx", "out")
|
||||
lopty_out = self._get_user_param_safe("lopty", "out")
|
||||
@@ -255,28 +228,17 @@ class LamNIOpticsMixin:
|
||||
dev.rtx.controller.feedback_enable_with_reset()
|
||||
|
||||
def lcs_in(self):
|
||||
# umv lcsx -1.852 lcsy -0.095
|
||||
pass
|
||||
|
||||
def lcs_out(self):
|
||||
umv(dev.lcsy, 3)
|
||||
|
||||
def losa_in(self):
|
||||
# 6.2 keV, 170 um FZP
|
||||
# umv(dev.losax, -1.4450000, dev.losay, -0.1800)
|
||||
# umv(dev.losaz, -1)
|
||||
# 6.7, 170
|
||||
# umv(dev.losax, -1.4850, dev.losay, -0.1930)
|
||||
# umv(dev.losaz, 1.0000)
|
||||
# 7.2, 150
|
||||
losax_in = self._get_user_param_safe("losax", "in")
|
||||
losay_in = self._get_user_param_safe("losay", "in")
|
||||
losaz_in = self._get_user_param_safe("losaz", "in")
|
||||
umv(dev.losax, losax_in, dev.losay, losay_in)
|
||||
umv(dev.losaz, losaz_in)
|
||||
# 11 kev
|
||||
# umv(dev.losax, -1.161000, dev.losay, -0.196)
|
||||
# umv(dev.losaz, 1.0000)
|
||||
|
||||
def losa_out(self):
|
||||
losay_out = self._get_user_param_safe("losay", "out")
|
||||
@@ -285,11 +247,10 @@ class LamNIOpticsMixin:
|
||||
umv(dev.losay, losay_out)
|
||||
|
||||
def lfzp_info(self, mokev_val=-1):
|
||||
|
||||
if mokev_val == -1:
|
||||
try:
|
||||
mokev_val = dev.mokev.readback.get()
|
||||
except:
|
||||
except Exception:
|
||||
print(
|
||||
"Device mokev does not exist. You can specify the energy in keV as an argument instead."
|
||||
)
|
||||
@@ -324,10 +285,6 @@ class LamNIOpticsMixin:
|
||||
)
|
||||
|
||||
console.print(table)
|
||||
|
||||
print("OSA Information:")
|
||||
# print(f"Current losaz %.1f\n", A[losaz])
|
||||
# print("The OSA will collide with the sample plane at %.1f\n\n", 89.3-A[loptz])
|
||||
print(
|
||||
"The numbers presented here are for a sample in the plane of the lamni sample holder.\n"
|
||||
)
|
||||
|
||||
@@ -1,22 +0,0 @@
|
||||
def lamni_read_additional_correction():
|
||||
# "additional_correction_shift"
|
||||
# [0][] x , [1][] y, [2][] angle, [3][0] number of elements
|
||||
|
||||
with open("correction_lamni_um_S01405_.txt", "r") as f:
|
||||
num_elements = f.readline()
|
||||
int_num_elements = int(num_elements.split(" ")[2])
|
||||
print(int_num_elements)
|
||||
corr_pos_x = []
|
||||
corr_pos_y = []
|
||||
corr_angle = []
|
||||
for j in range(0, int_num_elements * 3):
|
||||
line = f.readline()
|
||||
value = line.split(" ")[2]
|
||||
name = line.split(" ")[0].split("[")[0]
|
||||
if name == "corr_pos_x":
|
||||
corr_pos_x.append(value)
|
||||
elif name == "corr_pos_y":
|
||||
corr_pos_y.append(value)
|
||||
elif name == "corr_angle":
|
||||
corr_angle.append(value)
|
||||
return (corr_pos_x, corr_pos_y, corr_angle, num_elements)
|
||||
File diff suppressed because it is too large
Load Diff
@@ -65,10 +65,8 @@ class RtLamniController(Controller):
|
||||
"_position_sampling_single_read",
|
||||
"_position_sampling_single_reset_and_start_sampling",
|
||||
"show_signal_strength_interferometer",
|
||||
"show_interferometer_positions",
|
||||
"show_analog_signals",
|
||||
"show_feedback_status",
|
||||
|
||||
]
|
||||
|
||||
def __init__(
|
||||
|
||||
@@ -35,16 +35,16 @@ def test_save_frame(bec_client_mock):
|
||||
lamni = LamNI(client)
|
||||
align = XrayEyeAlign(client, lamni)
|
||||
with mock.patch(
|
||||
"csaxs_bec.bec_ipython_client.plugins.LamNI.x_ray_eye_align.epics_put"
|
||||
"csaxs_bec.bec_ipython_client.plugins.LamNI.alignment.epics_put"
|
||||
) as epics_put_mock:
|
||||
align.save_frame()
|
||||
epics_put_mock.assert_called_once_with("XOMNYI-XEYE-SAVFRAME:0", 1)
|
||||
|
||||
|
||||
def test_update_frame(bec_client_mock):
|
||||
epics_put = "csaxs_bec.bec_ipython_client.plugins.LamNI.x_ray_eye_align.epics_put"
|
||||
epics_get = "csaxs_bec.bec_ipython_client.plugins.LamNI.x_ray_eye_align.epics_get"
|
||||
fshopen = "csaxs_bec.bec_ipython_client.plugins.LamNI.x_ray_eye_align.fshopen"
|
||||
epics_put = "csaxs_bec.bec_ipython_client.plugins.LamNI.alignment.epics_put"
|
||||
epics_get = "csaxs_bec.bec_ipython_client.plugins.LamNI.alignment.epics_get"
|
||||
fshopen = "csaxs_bec.bec_ipython_client.plugins.LamNI.alignment.fshopen"
|
||||
client = bec_client_mock
|
||||
client.device_manager.devices.xeye = DeviceBase(
|
||||
name="xeye",
|
||||
|
||||
Reference in New Issue
Block a user