Compare commits
9 Commits
feat/add_p
...
main
| Author | SHA1 | Date | |
|---|---|---|---|
| bc31c00e1f | |||
|
|
38671f074e | ||
|
|
92e39a5f75 | ||
|
|
22c48115a4 | ||
| 2a7448526b | |||
|
|
a5825307e5 | ||
|
|
54f1f42332 | ||
|
|
48df15f35c | ||
|
|
6f60bd4b2b |
@@ -1,2 +1,6 @@
|
||||
from .load_additional_correction import lamni_read_additional_correction
|
||||
from .x_ray_eye_align import DataDrivenLamNI, LamNI, MagLamNI, XrayEyeAlign
|
||||
from .alignment import XrayEyeAlign
|
||||
from .lamni import LamNI
|
||||
from .lamni_optics_mixin import LamNIInitError, LaMNIInitStages, LamNIOpticsMixin
|
||||
__all__ = [
|
||||
"LamNI", "XrayEyeAlign", "LamNIInitError", "LaMNIInitStages", "LamNIOpticsMixin"
|
||||
]
|
||||
461
csaxs_bec/bec_ipython_client/plugins/LamNI/alignment.py
Normal file
461
csaxs_bec/bec_ipython_client/plugins/LamNI/alignment.py
Normal file
@@ -0,0 +1,461 @@
|
||||
import builtins
|
||||
import time
|
||||
from collections import defaultdict
|
||||
|
||||
import numpy as np
|
||||
from bec_lib import bec_logger
|
||||
from typeguard import typechecked
|
||||
|
||||
from csaxs_bec.bec_ipython_client.plugins.cSAXS import epics_get, epics_put, fshopen
|
||||
|
||||
logger = bec_logger.logger
|
||||
|
||||
if builtins.__dict__.get("bec") is not None:
|
||||
bec = builtins.__dict__.get("bec")
|
||||
dev = builtins.__dict__.get("dev")
|
||||
umv = builtins.__dict__.get("umv")
|
||||
umvr = builtins.__dict__.get("umvr")
|
||||
|
||||
|
||||
class XrayEyeAlign:
|
||||
# pixel calibration, multiply to get mm
|
||||
# PIXEL_CALIBRATION = 0.2/209 #.2 with binning
|
||||
PIXEL_CALIBRATION = 0.2 / 218 # .2 with binning
|
||||
|
||||
def __init__(self, client, lamni) -> None:
|
||||
self.client = client
|
||||
self.lamni = lamni
|
||||
self.device_manager = client.device_manager
|
||||
self.scans = client.scans
|
||||
self.alignment_values = defaultdict(list)
|
||||
self._reset_init_values()
|
||||
self.corr_pos_x = []
|
||||
self.corr_pos_y = []
|
||||
self.corr_angle = []
|
||||
self.corr_pos_x_2 = []
|
||||
self.corr_pos_y_2 = []
|
||||
self.corr_angle_2 = []
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# Correction reset
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
def reset_correction(self):
|
||||
self.corr_pos_x = []
|
||||
self.corr_pos_y = []
|
||||
self.corr_angle = []
|
||||
|
||||
def reset_correction_2(self):
|
||||
self.corr_pos_x_2 = []
|
||||
self.corr_pos_y_2 = []
|
||||
self.corr_angle_2 = []
|
||||
|
||||
def reset_xray_eye_correction(self):
|
||||
self.client.delete_global_var("tomo_fit_xray_eye")
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# FOV offset properties
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
@property
|
||||
def tomo_fovx_offset(self):
|
||||
val = self.client.get_global_var("tomo_fov_offset")
|
||||
if val is None:
|
||||
return 0.0
|
||||
return val[0] / 1000
|
||||
|
||||
@tomo_fovx_offset.setter
|
||||
@typechecked
|
||||
def tomo_fovx_offset(self, val: float):
|
||||
val_old = self.client.get_global_var("tomo_fov_offset")
|
||||
if val_old is None:
|
||||
val_old = [0.0, 0.0]
|
||||
self.client.set_global_var("tomo_fov_offset", [val * 1000, val_old[1]])
|
||||
|
||||
@property
|
||||
def tomo_fovy_offset(self):
|
||||
val = self.client.get_global_var("tomo_fov_offset")
|
||||
if val is None:
|
||||
return 0.0
|
||||
return val[1] / 1000
|
||||
|
||||
@tomo_fovy_offset.setter
|
||||
@typechecked
|
||||
def tomo_fovy_offset(self, val: float):
|
||||
val_old = self.client.get_global_var("tomo_fov_offset")
|
||||
if val_old is None:
|
||||
val_old = [0.0, 0.0]
|
||||
self.client.set_global_var("tomo_fov_offset", [val_old[0], val * 1000])
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# Internal helpers
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
def _reset_init_values(self):
|
||||
self.shift_xy = [0, 0]
|
||||
self._xray_fov_xy = [0, 0]
|
||||
|
||||
def _disable_rt_feedback(self):
|
||||
self.device_manager.devices.rtx.controller.feedback_disable()
|
||||
|
||||
def _enable_rt_feedback(self):
|
||||
self.device_manager.devices.rtx.controller.feedback_enable_with_reset()
|
||||
|
||||
def tomo_rotate(self, val: float):
|
||||
# pylint: disable=undefined-variable
|
||||
umv(self.device_manager.devices.lsamrot, val)
|
||||
|
||||
def get_tomo_angle(self):
|
||||
return self.device_manager.devices.lsamrot.readback.read()["lsamrot"]["value"]
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# X-ray eye camera control
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
def save_frame(self):
|
||||
epics_put("XOMNYI-XEYE-SAVFRAME:0", 1)
|
||||
|
||||
def update_frame(self):
|
||||
epics_put("XOMNYI-XEYE-ACQDONE:0", 0)
|
||||
# start live
|
||||
epics_put("XOMNYI-XEYE-ACQ:0", 1)
|
||||
# wait for start live
|
||||
while epics_get("XOMNYI-XEYE-ACQDONE:0") == 0:
|
||||
time.sleep(0.5)
|
||||
print("waiting for live view to start...")
|
||||
fshopen()
|
||||
|
||||
epics_put("XOMNYI-XEYE-ACQDONE:0", 0)
|
||||
|
||||
while epics_get("XOMNYI-XEYE-ACQDONE:0") == 0:
|
||||
print("waiting for new frame...")
|
||||
time.sleep(0.5)
|
||||
|
||||
time.sleep(0.5)
|
||||
# stop live view
|
||||
epics_put("XOMNYI-XEYE-ACQ:0", 0)
|
||||
time.sleep(1)
|
||||
print("got new frame")
|
||||
|
||||
def update_fov(self, k: int):
|
||||
self._xray_fov_xy[0] = max(epics_get(f"XOMNYI-XEYE-XWIDTH_X:{k}"), self._xray_fov_xy[0])
|
||||
self._xray_fov_xy[1] = max(0, self._xray_fov_xy[0])
|
||||
|
||||
@property
|
||||
def movement_buttons_enabled(self):
|
||||
return [epics_get("XOMNYI-XEYE-ENAMVX:0"), epics_get("XOMNYI-XEYE-ENAMVY:0")]
|
||||
|
||||
@movement_buttons_enabled.setter
|
||||
def movement_buttons_enabled(self, enabled: bool):
|
||||
enabled = int(enabled)
|
||||
epics_put("XOMNYI-XEYE-ENAMVX:0", enabled)
|
||||
epics_put("XOMNYI-XEYE-ENAMVY:0", enabled)
|
||||
|
||||
def send_message(self, msg: str):
|
||||
epics_put("XOMNYI-XEYE-MESSAGE:0.DESC", msg)
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# Alignment procedure
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
def align(self):
|
||||
self._reset_init_values()
|
||||
self.reset_correction()
|
||||
self.reset_correction_2()
|
||||
|
||||
self._disable_rt_feedback()
|
||||
epics_put("XOMNYI-XEYE-PIXELSIZE:0", self.PIXEL_CALIBRATION)
|
||||
self._enable_rt_feedback()
|
||||
|
||||
self.movement_buttons_enabled = False
|
||||
epics_put("XOMNYI-XEYE-ACQ:0", 0)
|
||||
self.send_message("please wait...")
|
||||
epics_put("XOMNYI-XEYE-SAMPLENAME:0.DESC", "Let us LAMNI...")
|
||||
|
||||
self._disable_rt_feedback()
|
||||
k = 0
|
||||
|
||||
self.lamni.lfzp_in()
|
||||
self.update_frame()
|
||||
|
||||
self.movement_buttons_enabled = False
|
||||
epics_put("XOMNYI-XEYE-SUBMIT:0", 0)
|
||||
epics_put("XOMNYI-XEYE-STEP:0", 0)
|
||||
self.send_message("Submit center value of FZP.")
|
||||
|
||||
while True:
|
||||
if epics_get("XOMNYI-XEYE-SUBMIT:0") == 1:
|
||||
val_x = epics_get(f"XOMNYI-XEYE-XVAL_X:{k}") * self.PIXEL_CALIBRATION # in mm
|
||||
val_y = epics_get(f"XOMNYI-XEYE-YVAL_Y:{k}") * self.PIXEL_CALIBRATION # in mm
|
||||
self.alignment_values[k] = [val_x, val_y]
|
||||
print(
|
||||
f"Clicked position {k}: x {self.alignment_values[k][0]}, y"
|
||||
f" {self.alignment_values[k][1]}"
|
||||
)
|
||||
|
||||
if k == 0: # received center value of FZP
|
||||
self.send_message("please wait ...")
|
||||
self.lamni.loptics_out()
|
||||
epics_put("XOMNYI-XEYE-SUBMIT:0", -1)
|
||||
self.movement_buttons_enabled = False
|
||||
print("Moving sample in, FZP out")
|
||||
|
||||
self._disable_rt_feedback()
|
||||
time.sleep(0.3)
|
||||
self._enable_rt_feedback()
|
||||
time.sleep(0.3)
|
||||
|
||||
self.update_frame()
|
||||
self.send_message("Go and find the sample")
|
||||
epics_put("XOMNYI-XEYE-SUBMIT:0", 0)
|
||||
self.movement_buttons_enabled = True
|
||||
|
||||
elif k == 1: # received sample center value at samrot 0
|
||||
msg = (
|
||||
f"Base shift values from movement are x {self.shift_xy[0]}, y"
|
||||
f" {self.shift_xy[1]}"
|
||||
)
|
||||
print(msg)
|
||||
logger.info(msg)
|
||||
self.shift_xy[0] += (
|
||||
self.alignment_values[0][0] - self.alignment_values[1][0]
|
||||
) * 1000
|
||||
self.shift_xy[1] += (
|
||||
self.alignment_values[1][1] - self.alignment_values[0][1]
|
||||
) * 1000
|
||||
print(
|
||||
"Base shift values from movement and clicked position are x"
|
||||
f" {self.shift_xy[0]}, y {self.shift_xy[1]}"
|
||||
)
|
||||
|
||||
self.scans.lamni_move_to_scan_center(
|
||||
self.shift_xy[0] / 1000, self.shift_xy[1] / 1000, self.get_tomo_angle()
|
||||
).wait()
|
||||
|
||||
self.send_message("please wait ...")
|
||||
epics_put("XOMNYI-XEYE-SUBMIT:0", -1)
|
||||
self.movement_buttons_enabled = False
|
||||
time.sleep(1)
|
||||
|
||||
self.scans.lamni_move_to_scan_center(
|
||||
self.shift_xy[0] / 1000, self.shift_xy[1] / 1000, self.get_tomo_angle()
|
||||
).wait()
|
||||
|
||||
epics_put("XOMNYI-XEYE-ANGLE:0", self.get_tomo_angle())
|
||||
self.update_frame()
|
||||
self.send_message("Submit sample center and FOV (0 deg)")
|
||||
epics_put("XOMNYI-XEYE-SUBMIT:0", 0)
|
||||
self.update_fov(k)
|
||||
|
||||
elif 1 < k < 10: # received sample center value at samrot 0 ... 315
|
||||
self.send_message("please wait ...")
|
||||
epics_put("XOMNYI-XEYE-SUBMIT:0", -1)
|
||||
|
||||
self._disable_rt_feedback()
|
||||
self.tomo_rotate((k - 1) * 45 - 45 / 2)
|
||||
self.scans.lamni_move_to_scan_center(
|
||||
self.shift_xy[0] / 1000, self.shift_xy[1] / 1000, self.get_tomo_angle()
|
||||
).wait()
|
||||
self._disable_rt_feedback()
|
||||
self.tomo_rotate((k - 1) * 45)
|
||||
self.scans.lamni_move_to_scan_center(
|
||||
self.shift_xy[0] / 1000, self.shift_xy[1] / 1000, self.get_tomo_angle()
|
||||
).wait()
|
||||
|
||||
epics_put("XOMNYI-XEYE-ANGLE:0", self.get_tomo_angle())
|
||||
self.update_frame()
|
||||
self.send_message("Submit sample center")
|
||||
epics_put("XOMNYI-XEYE-SUBMIT:0", 0)
|
||||
epics_put("XOMNYI-XEYE-ENAMVX:0", 1)
|
||||
self.update_fov(k)
|
||||
|
||||
elif k == 10: # received sample center value at samrot 270, done
|
||||
self.send_message("done...")
|
||||
epics_put("XOMNYI-XEYE-SUBMIT:0", -1)
|
||||
self.movement_buttons_enabled = False
|
||||
self.update_fov(k)
|
||||
break
|
||||
|
||||
k += 1
|
||||
epics_put("XOMNYI-XEYE-STEP:0", k)
|
||||
|
||||
if k < 2:
|
||||
_xrayeyalignmvx = epics_get("XOMNYI-XEYE-MVX:0")
|
||||
_xrayeyalignmvy = epics_get("XOMNYI-XEYE-MVY:0")
|
||||
if _xrayeyalignmvx != 0 or _xrayeyalignmvy != 0:
|
||||
self.shift_xy[0] = self.shift_xy[0] + _xrayeyalignmvx
|
||||
self.shift_xy[1] = self.shift_xy[1] + _xrayeyalignmvy
|
||||
self.scans.lamni_move_to_scan_center(
|
||||
self.shift_xy[0] / 1000, self.shift_xy[1] / 1000, self.get_tomo_angle()
|
||||
).wait()
|
||||
print(
|
||||
f"Current center horizontal {self.shift_xy[0]} vertical {self.shift_xy[1]}"
|
||||
)
|
||||
epics_put("XOMNYI-XEYE-MVY:0", 0)
|
||||
epics_put("XOMNYI-XEYE-MVX:0", 0)
|
||||
self.update_frame()
|
||||
|
||||
time.sleep(0.2)
|
||||
|
||||
self.write_output()
|
||||
fovx = self._xray_fov_xy[0] * self.PIXEL_CALIBRATION * 1000 / 2
|
||||
fovy = self._xray_fov_xy[1] * self.PIXEL_CALIBRATION * 1000 / 2
|
||||
print(
|
||||
f"The largest field of view from the xrayeyealign was \nfovx = {fovx:.0f} microns,"
|
||||
f" fovy = {fovy:.0f} microns"
|
||||
)
|
||||
print("Use matlab routine to fit the current alignment...")
|
||||
print(
|
||||
"This additional shift is applied to the base shift values\n which are x"
|
||||
f" {self.shift_xy[0]}, y {self.shift_xy[1]}"
|
||||
)
|
||||
|
||||
self._disable_rt_feedback()
|
||||
self.tomo_rotate(0)
|
||||
|
||||
print(
|
||||
"\n\nNEXT LOAD ALIGNMENT PARAMETERS\nby running"
|
||||
" lamni.align.read_xray_eye_correction()\n"
|
||||
)
|
||||
|
||||
self.client.set_global_var("tomo_fov_offset", self.shift_xy)
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# Alignment output
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
def write_output(self):
|
||||
import os
|
||||
with open(
|
||||
os.path.expanduser("~/Data10/specES1/internal/xrayeye_alignmentvalues"), "w"
|
||||
) as alignment_values_file:
|
||||
alignment_values_file.write("angle\thorizontal\tvertical\n")
|
||||
for k in range(2, 11):
|
||||
fovx_offset = (self.alignment_values[0][0] - self.alignment_values[k][0]) * 1000
|
||||
fovy_offset = (self.alignment_values[k][1] - self.alignment_values[0][1]) * 1000
|
||||
print(
|
||||
f"Writing to file new alignment: number {k}, value x {fovx_offset}, y"
|
||||
f" {fovy_offset}"
|
||||
)
|
||||
alignment_values_file.write(f"{(k-2)*45}\t{fovx_offset}\t{fovy_offset}\n")
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# X-ray eye sinusoidal correction (loaded from MATLAB fit files)
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
def read_xray_eye_correction(self, dir_path=None):
|
||||
import os
|
||||
if dir_path is None:
|
||||
dir_path = os.path.expanduser("~/Data10/specES1/internal/")
|
||||
tomo_fit_xray_eye = np.zeros((2, 3))
|
||||
for i, axis in enumerate(["x", "y"]):
|
||||
for j, coeff in enumerate(["A", "B", "C"]):
|
||||
with open(os.path.join(dir_path, f"ptychotomoalign_{coeff}{axis}.txt"), "r") as f:
|
||||
tomo_fit_xray_eye[i][j] = f.readline()
|
||||
|
||||
self.client.set_global_var("tomo_fit_xray_eye", tomo_fit_xray_eye.tolist())
|
||||
# x amp, phase, offset, y amp, phase, offset
|
||||
# 0 0 0 1 0 2 1 0 1 1 1 2
|
||||
print("New alignment parameters loaded from X-ray eye")
|
||||
print(
|
||||
f"X Amplitude {tomo_fit_xray_eye[0][0]}, "
|
||||
f"X Phase {tomo_fit_xray_eye[0][1]}, "
|
||||
f"X Offset {tomo_fit_xray_eye[0][2]}, "
|
||||
f"Y Amplitude {tomo_fit_xray_eye[1][0]}, "
|
||||
f"Y Phase {tomo_fit_xray_eye[1][1]}, "
|
||||
f"Y Offset {tomo_fit_xray_eye[1][2]}"
|
||||
)
|
||||
|
||||
def lamni_compute_additional_correction_xeye_mu(self, angle):
|
||||
"""Compute sinusoidal correction from the X-ray eye fit for the given angle."""
|
||||
tomo_fit_xray_eye = self.client.get_global_var("tomo_fit_xray_eye")
|
||||
if tomo_fit_xray_eye is None:
|
||||
print("Not applying any additional correction. No x-ray eye data available.\n")
|
||||
return (0, 0)
|
||||
|
||||
# x amp, phase, offset, y amp, phase, offset
|
||||
# 0 0 0 1 0 2 1 0 1 1 1 2
|
||||
correction_x = (
|
||||
tomo_fit_xray_eye[0][0] * np.sin(np.radians(angle) + tomo_fit_xray_eye[0][1])
|
||||
+ tomo_fit_xray_eye[0][2]
|
||||
) / 1000
|
||||
correction_y = (
|
||||
tomo_fit_xray_eye[1][0] * np.sin(np.radians(angle) + tomo_fit_xray_eye[1][1])
|
||||
+ tomo_fit_xray_eye[1][2]
|
||||
) / 1000
|
||||
|
||||
print(f"Xeye correction x {correction_x}, y {correction_y} for angle {angle}\n")
|
||||
return (correction_x, correction_y)
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# Additional lookup-table corrections (iteration 1 and 2)
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
def read_additional_correction(self, correction_file: str):
|
||||
self.corr_pos_x, self.corr_pos_y, self.corr_angle = self._read_correction_file_xy(
|
||||
correction_file
|
||||
)
|
||||
|
||||
def read_additional_correction_2(self, correction_file: str):
|
||||
self.corr_pos_x_2, self.corr_pos_y_2, self.corr_angle_2 = self._read_correction_file_xy(
|
||||
correction_file
|
||||
)
|
||||
|
||||
def _read_correction_file_xy(self, correction_file: str):
|
||||
"""Parse a correction file that contains corr_pos_x, corr_pos_y and corr_angle entries."""
|
||||
with open(correction_file, "r") as f:
|
||||
num_elements = f.readline()
|
||||
int_num_elements = int(num_elements.split(" ")[2])
|
||||
print(int_num_elements)
|
||||
corr_pos_x = []
|
||||
corr_pos_y = []
|
||||
corr_angle = []
|
||||
for j in range(0, int_num_elements * 3):
|
||||
line = f.readline()
|
||||
value = line.split(" ")[2]
|
||||
name = line.split(" ")[0].split("[")[0]
|
||||
if name == "corr_pos_x":
|
||||
corr_pos_x.append(float(value) / 1000)
|
||||
elif name == "corr_pos_y":
|
||||
corr_pos_y.append(float(value) / 1000)
|
||||
elif name == "corr_angle":
|
||||
corr_angle.append(float(value))
|
||||
return corr_pos_x, corr_pos_y, corr_angle
|
||||
|
||||
def compute_additional_correction(self, angle):
|
||||
return self._compute_correction_xy(
|
||||
angle, self.corr_pos_x, self.corr_pos_y, self.corr_angle, label="1"
|
||||
)
|
||||
|
||||
def compute_additional_correction_2(self, angle):
|
||||
return self._compute_correction_xy(
|
||||
angle, self.corr_pos_x_2, self.corr_pos_y_2, self.corr_angle_2, label="2"
|
||||
)
|
||||
|
||||
def _compute_correction_xy(self, angle, corr_pos_x, corr_pos_y, corr_angle, label=""):
|
||||
"""Find the correction for the closest angle in the lookup table."""
|
||||
if not corr_pos_x:
|
||||
print(f"Not applying additional correction {label}. No data available.\n")
|
||||
return (0, 0)
|
||||
|
||||
shift_x = corr_pos_x[0]
|
||||
shift_y = corr_pos_y[0]
|
||||
angledelta = np.fabs(corr_angle[0] - angle)
|
||||
|
||||
for j in range(1, len(corr_pos_x)):
|
||||
newangledelta = np.fabs(corr_angle[j] - angle)
|
||||
if newangledelta < angledelta:
|
||||
shift_x = corr_pos_x[j]
|
||||
shift_y = corr_pos_y[j]
|
||||
angledelta = newangledelta
|
||||
|
||||
if shift_x == 0 and angle < corr_angle[0]:
|
||||
shift_x = corr_pos_x[0]
|
||||
shift_y = corr_pos_y[0]
|
||||
|
||||
if shift_x == 0 and angle > corr_angle[-1]:
|
||||
shift_x = corr_pos_x[-1]
|
||||
shift_y = corr_pos_y[-1]
|
||||
|
||||
print(f"Additional correction shifts {label}: {shift_x} {shift_y}")
|
||||
return (shift_x, shift_y)
|
||||
211
csaxs_bec/bec_ipython_client/plugins/LamNI/extra_tomo.py
Normal file
211
csaxs_bec/bec_ipython_client/plugins/LamNI/extra_tomo.py
Normal file
@@ -0,0 +1,211 @@
|
||||
"""
|
||||
extra_tomo.py
|
||||
=============
|
||||
Specialist LamNI subclasses for specific experimental configurations.
|
||||
Import explicitly when needed, e.g.:
|
||||
|
||||
from csaxs_bec...extra_tomo import MagLamNI
|
||||
from csaxs_bec...extra_tomo import DataDrivenLamNI
|
||||
"""
|
||||
|
||||
import builtins
|
||||
import datetime
|
||||
import os
|
||||
import time
|
||||
|
||||
import h5py
|
||||
import numpy as np
|
||||
from bec_lib import bec_logger
|
||||
from bec_lib.alarm_handler import AlarmBase
|
||||
|
||||
from .lamni import LamNI
|
||||
|
||||
logger = bec_logger.logger
|
||||
|
||||
if builtins.__dict__.get("bec") is not None:
|
||||
bec = builtins.__dict__.get("bec")
|
||||
dev = builtins.__dict__.get("dev")
|
||||
umv = builtins.__dict__.get("umv")
|
||||
scans = builtins.__dict__.get("scans")
|
||||
|
||||
|
||||
class MagLamNI(LamNI):
|
||||
"""LamNI subclass for magnetic experiments (XMCD).
|
||||
|
||||
Adds a slow rotation helper and allows injection of a custom
|
||||
per-angle callback via the ``lamni_at_each_angle`` builtin.
|
||||
"""
|
||||
|
||||
def sub_tomo_scan(self, subtomo_number, start_angle=None):
|
||||
super().sub_tomo_scan(subtomo_number, start_angle)
|
||||
# self.rotate_slowly(0)
|
||||
|
||||
def rotate_slowly(self, angle, step_size=20):
|
||||
"""Rotate to target angle in small steps to avoid mechanical stress."""
|
||||
current_angle = dev.lsamrot.read(cached=True)["value"]
|
||||
steps = int(np.ceil(np.abs(current_angle - angle) / step_size)) + 1
|
||||
for target_angle in np.linspace(current_angle, angle, steps, endpoint=True):
|
||||
umv(dev.lsamrot, target_angle)
|
||||
scans.lamni_move_to_scan_center(
|
||||
self.align.tomo_fovx_offset, self.align.tomo_fovy_offset, target_angle
|
||||
)
|
||||
|
||||
def _at_each_angle(self, angle: float) -> None:
|
||||
if "lamni_at_each_angle" in builtins.__dict__:
|
||||
# pylint: disable=undefined-variable
|
||||
lamni_at_each_angle(self, angle)
|
||||
return
|
||||
|
||||
self.tomo_scan_projection(angle)
|
||||
self.tomo_reconstruct()
|
||||
|
||||
|
||||
class DataDrivenLamNI(LamNI):
|
||||
"""LamNI subclass that reads per-projection scan parameters from an HDF5 file.
|
||||
|
||||
Instead of a fixed FOV and step size for the whole tomogram, each
|
||||
projection can have individual values for step size, loptz position
|
||||
and lateral shifts, as specified in a datadriven_params.h5 file.
|
||||
"""
|
||||
|
||||
def __init__(self, client):
|
||||
super().__init__(client)
|
||||
self.tomo_data = {}
|
||||
|
||||
def tomo_scan(
|
||||
self,
|
||||
subtomo_start=1,
|
||||
start_index=None,
|
||||
fname="~/Data10/data_driven_config/datadriven_params.h5",
|
||||
):
|
||||
"""Start a data-driven tomo scan.
|
||||
|
||||
Args:
|
||||
subtomo_start (int): Unused; kept for API compatibility. Use start_index to resume.
|
||||
start_index (int, optional): Skip projections before this index. Defaults to None.
|
||||
fname (str): Path to the HDF5 parameter file. Defaults to the standard location.
|
||||
"""
|
||||
bec = builtins.__dict__.get("bec")
|
||||
scans = builtins.__dict__.get("scans")
|
||||
|
||||
fname = os.path.expanduser(fname)
|
||||
if not os.path.exists(fname):
|
||||
raise FileNotFoundError(f"Could not find datadriven params file in {fname}.")
|
||||
|
||||
content = f"Loading tomo parameters from {fname}."
|
||||
logger.warning(content)
|
||||
msg = bec.logbook.LogbookMessage()
|
||||
msg.add_text(content).add_tag(["Data_driven_file", "BEC"])
|
||||
self.client.logbook.send_logbook_message(msg)
|
||||
|
||||
self._update_tomo_data_from_file(fname)
|
||||
self._current_special_angles = self.special_angles.copy()
|
||||
|
||||
if subtomo_start == 1 and start_index is None:
|
||||
self.tomo_id = self.add_sample_database(
|
||||
self.sample_name,
|
||||
str(datetime.date.today()),
|
||||
bec.active_account.decode(),
|
||||
bec.queue.next_scan_number,
|
||||
"lamni",
|
||||
"test additional info",
|
||||
"BEC",
|
||||
)
|
||||
self.write_pdf_report()
|
||||
|
||||
with scans.dataset_id_on_hold:
|
||||
self.sub_tomo_data_driven(start_index)
|
||||
|
||||
def sub_tomo_scan(self, subtomo_number=None, start_angle=None):
|
||||
raise NotImplementedError(
|
||||
"Cannot run sub_tomo_scan with DataDrivenLamNI. "
|
||||
"Use lamni.tomo_scan(start_index=<N>) to resume instead."
|
||||
)
|
||||
|
||||
def _at_each_angle(
|
||||
self, angle=None, stepsize=None, loptz_pos=None, manual_shift_x=0, manual_shift_y=0
|
||||
):
|
||||
self.manual_shift_x = manual_shift_x
|
||||
self.manual_shift_y = manual_shift_y
|
||||
self.tomo_shellstep = stepsize
|
||||
if loptz_pos is not None:
|
||||
dev.rtx.controller.feedback_disable()
|
||||
umv(dev.loptz, loptz_pos)
|
||||
super()._at_each_angle(angle=angle)
|
||||
|
||||
def sub_tomo_data_driven(self, start_index=None):
|
||||
"""Iterate over all projections defined in the loaded HDF5 parameter file."""
|
||||
for scan_index, scan_data in enumerate(zip(*self.tomo_data.values())):
|
||||
if start_index and scan_index < start_index:
|
||||
continue
|
||||
(
|
||||
angle,
|
||||
stepsize,
|
||||
loptz_pos,
|
||||
propagation_distance,
|
||||
manual_shift_x,
|
||||
manual_shift_y,
|
||||
subtomo_number,
|
||||
) = scan_data
|
||||
bec.metadata.update(
|
||||
{key: float(val) for key, val in zip(self.tomo_data.keys(), scan_data)}
|
||||
)
|
||||
successful = False
|
||||
error_caught = False
|
||||
if 0 <= angle < 360.05:
|
||||
print(f"Starting LamNI scan for angle {angle}")
|
||||
while not successful:
|
||||
self._start_beam_check()
|
||||
if not self.special_angles:
|
||||
self._current_special_angles = []
|
||||
if self._current_special_angles:
|
||||
next_special_angle = self._current_special_angles[0]
|
||||
if np.isclose(angle, next_special_angle, atol=0.5):
|
||||
self._current_special_angles.pop(0)
|
||||
num_repeats = self.special_angle_repeats
|
||||
else:
|
||||
num_repeats = 1
|
||||
try:
|
||||
start_scan_number = bec.queue.next_scan_number
|
||||
for i in range(num_repeats):
|
||||
self._at_each_angle(
|
||||
float(angle),
|
||||
stepsize=float(stepsize),
|
||||
loptz_pos=float(loptz_pos),
|
||||
manual_shift_x=float(manual_shift_x),
|
||||
manual_shift_y=float(manual_shift_y),
|
||||
)
|
||||
error_caught = False
|
||||
except AlarmBase as exc:
|
||||
if exc.alarm_type == "TimeoutError":
|
||||
bec.queue.request_queue_reset()
|
||||
time.sleep(2)
|
||||
error_caught = True
|
||||
else:
|
||||
raise exc
|
||||
|
||||
if self._was_beam_okay() and not error_caught:
|
||||
successful = True
|
||||
else:
|
||||
self._wait_for_beamline_checks()
|
||||
|
||||
end_scan_number = bec.queue.next_scan_number
|
||||
for scan_nr in range(start_scan_number, end_scan_number):
|
||||
self._write_tomo_scan_number(scan_nr, angle, subtomo_number)
|
||||
|
||||
def _update_tomo_data_from_file(self, fname: str) -> None:
|
||||
"""Load projection parameters from the HDF5 file into self.tomo_data."""
|
||||
with h5py.File(fname, "r") as file:
|
||||
self.tomo_data["theta"] = np.array([*file["theta"]]).flatten()
|
||||
self.tomo_data["stepsize"] = np.array([*file["stepsize"]]).flatten()
|
||||
self.tomo_data["loptz"] = np.array([*file["loptz"]]).flatten()
|
||||
self.tomo_data["propagation_distance"] = np.array(
|
||||
[*file["relative_propagation_distance"]]
|
||||
).flatten()
|
||||
self.tomo_data["manual_shift_x"] = np.array([*file["manual_shift_x"]]).flatten()
|
||||
self.tomo_data["manual_shift_y"] = np.array([*file["manual_shift_y"]]).flatten()
|
||||
self.tomo_data["subtomo_id"] = np.array([*file["subtomo_id"]]).flatten()
|
||||
|
||||
shapes = [data.shape for data in self.tomo_data.values()]
|
||||
if len(set(shapes)) > 1:
|
||||
raise ValueError(f"Tomo data file has entries of inconsistent lengths: {shapes}.")
|
||||
1013
csaxs_bec/bec_ipython_client/plugins/LamNI/lamni.py
Normal file
1013
csaxs_bec/bec_ipython_client/plugins/LamNI/lamni.py
Normal file
File diff suppressed because it is too large
Load Diff
@@ -6,39 +6,42 @@ from rich.console import Console
|
||||
from rich.table import Table
|
||||
|
||||
from csaxs_bec.bec_ipython_client.plugins.cSAXS import epics_put, fshclose
|
||||
from csaxs_bec.bec_ipython_client.plugins.omny.omny_general_tools import OMNYTools
|
||||
|
||||
# import builtins to avoid linter errors
|
||||
dev = builtins.__dict__.get("dev")
|
||||
umv = builtins.__dict__.get("umv")
|
||||
bec = builtins.__dict__.get("bec")
|
||||
scans = builtins.__dict__.get("scans")
|
||||
|
||||
def umv(*args):
|
||||
return scans.umv(*args, relative=False)
|
||||
|
||||
class LamNIInitError(Exception):
|
||||
pass
|
||||
|
||||
|
||||
class LaMNIInitStagesMixin:
|
||||
class LaMNIInitStages:
|
||||
"""Handles hardware initialization and referencing of LamNI stages."""
|
||||
|
||||
def __init__(self, client):
|
||||
super().__init__()
|
||||
self.client = client
|
||||
self.OMNYTools = OMNYTools(self.client)
|
||||
|
||||
def lamni_init_stages(self):
|
||||
user_input = input("Starting initialization of LamNI stages. OK? [y/n]")
|
||||
if user_input == "y":
|
||||
print("staring...")
|
||||
|
||||
if self.OMNYTools.yesno("Start initialization of LamNI stages. OK?"):
|
||||
print("starting...")
|
||||
dev.lsamrot.enabled = True
|
||||
else:
|
||||
return
|
||||
|
||||
if self.check_all_axes_of_lamni_referenced():
|
||||
user_input = input("Continue anyways? [y/n]")
|
||||
if user_input == "y":
|
||||
if self.OMNYTools.yesno("All axes are referenced. Continue anyways?"):
|
||||
print("ok then...")
|
||||
else:
|
||||
return
|
||||
|
||||
axis_id_lsamrot = dev.lsamrot._config["deviceConfig"].get("axis_Id")
|
||||
if dev.lsamrot.controller.get_motor_limit_switch(axis_id_lsamrot)[1] == False:
|
||||
user_input = input("The rotation stage will be moved to one limit [y/n]")
|
||||
if user_input == "y":
|
||||
if self.OMNYTools.yesno("The rotation stage will be moved to one limit"):
|
||||
print("starting...")
|
||||
else:
|
||||
return
|
||||
@@ -49,10 +52,9 @@ class LaMNIInitStagesMixin:
|
||||
print("The controller will be disabled in bec. To enable dev.lsamrot.enabled=True")
|
||||
return
|
||||
|
||||
user_input = input(
|
||||
"Init of loptz. Can the stage move to the upstream limit without collision?? [y/n]"
|
||||
)
|
||||
if user_input == "y":
|
||||
if self.OMNYTools.yesno(
|
||||
"Init of loptz. Can the stage move to the upstream limit without collision?"
|
||||
):
|
||||
print("ok then...")
|
||||
else:
|
||||
return
|
||||
@@ -77,14 +79,14 @@ class LaMNIInitStagesMixin:
|
||||
self.drive_axis_to_limit(dev.lsamy, "reverse")
|
||||
self.find_reference_mark(dev.lsamy)
|
||||
|
||||
# the dual encoder requires the reference mark to pass on both encoders
|
||||
print("Referencing lsamrot")
|
||||
self.drive_axis_to_limit(dev.lsamrot, "reverse")
|
||||
time.sleep(0.1)
|
||||
self.find_reference_mark(dev.lsamrot)
|
||||
|
||||
user_input = input("Init of leye. Can the stage move to -x limit without collision? [y/n]")
|
||||
if user_input == "y":
|
||||
if self.OMNYTools.yesno(
|
||||
"Init of leye. Can the stage move to -x limit without collision?"
|
||||
):
|
||||
print("starting...")
|
||||
else:
|
||||
return
|
||||
@@ -94,15 +96,6 @@ class LaMNIInitStagesMixin:
|
||||
print("Referencing leyey")
|
||||
self.drive_axis_to_limit(dev.leyey, "forward")
|
||||
|
||||
# set_lm lsamx 6 14
|
||||
# set_lm lsamy 6 14
|
||||
# set_lm lsamrot -3 362
|
||||
# set_lm loptx -1 -0.2
|
||||
# set_lm lopty 3.0 3.6
|
||||
# set_lm loptz 82 87
|
||||
# set_lm leyex 0 25
|
||||
# set_lm leyey 0.5 50
|
||||
|
||||
print("Init of Smaract stages")
|
||||
dev.losax.controller.find_reference_mark(2, 0, 1000, 1)
|
||||
time.sleep(1)
|
||||
@@ -110,15 +103,6 @@ class LaMNIInitStagesMixin:
|
||||
time.sleep(1)
|
||||
dev.losax.controller.find_reference_mark(1, 0, 1000, 1)
|
||||
time.sleep(1)
|
||||
# dev.losax.controller.find_reference_mark(3, 1, 1000, 1)
|
||||
# time.sleep(1)
|
||||
# dev.losax.controller.find_reference_mark(4, 1, 1000, 1)
|
||||
# time.sleep(1)
|
||||
|
||||
# set_lm losax -1.5 0.25
|
||||
# set_lm losay -2.5 4.1
|
||||
# set_lm losaz -4.1 -0.5
|
||||
# set_lm lcsy -1.5 5
|
||||
|
||||
self._align_setup()
|
||||
|
||||
@@ -136,8 +120,7 @@ class LaMNIInitStagesMixin:
|
||||
return ord(axis_id.lower()) - 97
|
||||
|
||||
def _align_setup(self):
|
||||
user_input = input("Start moving stages to default initial positions? [y/n]")
|
||||
if user_input == "y":
|
||||
if self.OMNYTools.yesno("Start moving stages to default initial positions?"):
|
||||
print("Start moving stages...")
|
||||
else:
|
||||
print("Stopping.")
|
||||
@@ -176,6 +159,8 @@ class LaMNIInitStagesMixin:
|
||||
|
||||
|
||||
class LamNIOpticsMixin:
|
||||
"""Optics movement methods: FZP, OSA, central stop and X-ray eye."""
|
||||
|
||||
@staticmethod
|
||||
def _get_user_param_safe(device, var):
|
||||
param = dev[device].user_parameter
|
||||
@@ -190,13 +175,11 @@ class LamNIOpticsMixin:
|
||||
umv(dev.leyey, leyey_out)
|
||||
|
||||
epics_put("XOMNYI-XEYE-ACQ:0", 2)
|
||||
# move rotation stage to zero to avoid problems with wires
|
||||
umv(dev.lsamrot, 0)
|
||||
umv(dev.dttrz, 5854, dev.fttrz, 2395)
|
||||
|
||||
def leye_in(self):
|
||||
bec.queue.next_dataset_number += 1
|
||||
# move rotation stage to zero to avoid problems with wires
|
||||
umv(dev.lsamrot, 0)
|
||||
umv(dev.dttrz, 6419.677, dev.fttrz, 2959.979)
|
||||
while True:
|
||||
@@ -213,15 +196,10 @@ class LamNIOpticsMixin:
|
||||
def _lfzp_in(self):
|
||||
loptx_in = self._get_user_param_safe("loptx", "in")
|
||||
lopty_in = self._get_user_param_safe("lopty", "in")
|
||||
umv(
|
||||
dev.loptx, loptx_in, dev.lopty, lopty_in
|
||||
) # for 7.2567 keV and 150 mu, 60 nm fzp, loptz 83.6000 for propagation 1.4 mm
|
||||
umv(dev.loptx, loptx_in, dev.lopty, lopty_in)
|
||||
|
||||
def lfzp_in(self):
|
||||
"""
|
||||
move in the lamni zone plate.
|
||||
This will disable rt feedback, move the FZP and re-enabled the feedback.
|
||||
"""
|
||||
"""Move in the LamNI zone plate, disabling/re-enabling RT feedback around the move."""
|
||||
if "rtx" in dev and dev.rtx.enabled:
|
||||
dev.rtx.controller.feedback_disable()
|
||||
|
||||
@@ -231,18 +209,15 @@ class LamNIOpticsMixin:
|
||||
dev.rtx.controller.feedback_enable_with_reset()
|
||||
|
||||
def loptics_in(self):
|
||||
"""
|
||||
Move in the lamni optics, including the FZP and the OSA.
|
||||
"""
|
||||
"""Move in the LamNI optics (FZP + OSA)."""
|
||||
self.lfzp_in()
|
||||
self.losa_in()
|
||||
|
||||
def loptics_out(self):
|
||||
"""Move out the lamni optics"""
|
||||
"""Move out the LamNI optics."""
|
||||
if "rtx" in dev and dev.rtx.enabled:
|
||||
dev.rtx.controller.feedback_disable()
|
||||
|
||||
# self.lcs_out()
|
||||
self.losa_out()
|
||||
loptx_out = self._get_user_param_safe("loptx", "out")
|
||||
lopty_out = self._get_user_param_safe("lopty", "out")
|
||||
@@ -253,28 +228,17 @@ class LamNIOpticsMixin:
|
||||
dev.rtx.controller.feedback_enable_with_reset()
|
||||
|
||||
def lcs_in(self):
|
||||
# umv lcsx -1.852 lcsy -0.095
|
||||
pass
|
||||
|
||||
def lcs_out(self):
|
||||
umv(dev.lcsy, 3)
|
||||
|
||||
def losa_in(self):
|
||||
# 6.2 keV, 170 um FZP
|
||||
# umv(dev.losax, -1.4450000, dev.losay, -0.1800)
|
||||
# umv(dev.losaz, -1)
|
||||
# 6.7, 170
|
||||
# umv(dev.losax, -1.4850, dev.losay, -0.1930)
|
||||
# umv(dev.losaz, 1.0000)
|
||||
# 7.2, 150
|
||||
losax_in = self._get_user_param_safe("losax", "in")
|
||||
losay_in = self._get_user_param_safe("losay", "in")
|
||||
losaz_in = self._get_user_param_safe("losaz", "in")
|
||||
umv(dev.losax, losax_in, dev.losay, losay_in)
|
||||
umv(dev.losaz, losaz_in)
|
||||
# 11 kev
|
||||
# umv(dev.losax, -1.161000, dev.losay, -0.196)
|
||||
# umv(dev.losaz, 1.0000)
|
||||
|
||||
def losa_out(self):
|
||||
losay_out = self._get_user_param_safe("losay", "out")
|
||||
@@ -283,11 +247,10 @@ class LamNIOpticsMixin:
|
||||
umv(dev.losay, losay_out)
|
||||
|
||||
def lfzp_info(self, mokev_val=-1):
|
||||
|
||||
if mokev_val == -1:
|
||||
try:
|
||||
mokev_val = dev.mokev.readback.get()
|
||||
except:
|
||||
except Exception:
|
||||
print(
|
||||
"Device mokev does not exist. You can specify the energy in keV as an argument instead."
|
||||
)
|
||||
@@ -322,10 +285,6 @@ class LamNIOpticsMixin:
|
||||
)
|
||||
|
||||
console.print(table)
|
||||
|
||||
print("OSA Information:")
|
||||
# print(f"Current losaz %.1f\n", A[losaz])
|
||||
# print("The OSA will collide with the sample plane at %.1f\n\n", 89.3-A[loptz])
|
||||
print(
|
||||
"The numbers presented here are for a sample in the plane of the lamni sample holder.\n"
|
||||
)
|
||||
|
||||
@@ -1,22 +0,0 @@
|
||||
def lamni_read_additional_correction():
|
||||
# "additional_correction_shift"
|
||||
# [0][] x , [1][] y, [2][] angle, [3][0] number of elements
|
||||
|
||||
with open("correction_lamni_um_S01405_.txt", "r") as f:
|
||||
num_elements = f.readline()
|
||||
int_num_elements = int(num_elements.split(" ")[2])
|
||||
print(int_num_elements)
|
||||
corr_pos_x = []
|
||||
corr_pos_y = []
|
||||
corr_angle = []
|
||||
for j in range(0, int_num_elements * 3):
|
||||
line = f.readline()
|
||||
value = line.split(" ")[2]
|
||||
name = line.split(" ")[0].split("[")[0]
|
||||
if name == "corr_pos_x":
|
||||
corr_pos_x.append(value)
|
||||
elif name == "corr_pos_y":
|
||||
corr_pos_y.append(value)
|
||||
elif name == "corr_angle":
|
||||
corr_angle.append(value)
|
||||
return (corr_pos_x, corr_pos_y, corr_angle, num_elements)
|
||||
File diff suppressed because it is too large
Load Diff
@@ -1,443 +0,0 @@
|
||||
"""
|
||||
csaxs_dlpca200.py
|
||||
=================
|
||||
BEC control script for FEMTO DLPCA-200 Variable Gain Low Noise Current Amplifiers
|
||||
connected to Galil RIO digital outputs.
|
||||
|
||||
DLPCA-200 Remote Control (datasheet page 4)
|
||||
-------------------------------------------
|
||||
Sub-D pin → function:
|
||||
Pin 10 → gain LSB (digital out channel, index 0 in bit-tuple)
|
||||
Pin 11 → gain MID (digital out channel, index 1 in bit-tuple)
|
||||
Pin 12 → gain MSB (digital out channel, index 2 in bit-tuple)
|
||||
Pin 13 → coupling LOW = AC, HIGH = DC
|
||||
Pin 14 → speed mode HIGH = low noise (Pin14=1), LOW = high speed (Pin14=0)
|
||||
|
||||
Gain truth table (MSB, MID, LSB):
|
||||
0,0,0 → low-noise: 1e3 high-speed: 1e5
|
||||
0,0,1 → low-noise: 1e4 high-speed: 1e6
|
||||
0,1,0 → low-noise: 1e5 high-speed: 1e7
|
||||
0,1,1 → low-noise: 1e6 high-speed: 1e8
|
||||
1,0,0 → low-noise: 1e7 high-speed: 1e9
|
||||
1,0,1 → low-noise: 1e8 high-speed: 1e10
|
||||
1,1,0 → low-noise: 1e9 high-speed: 1e11
|
||||
|
||||
Strategy: prefer low-noise mode (1e3–1e9). For 1e10 and 1e11,
|
||||
automatically fall back to high-speed mode.
|
||||
|
||||
Device wiring example (galilrioesxbox):
|
||||
bpm4: Pin10→ch0, Pin11→ch1, Pin12→ch2, Pin13→ch3, Pin14→ch4
|
||||
bim: Pin10→ch6, Pin11→ch7, Pin12→ch8, Pin13→ch9, Pin14→ch10
|
||||
|
||||
Usage examples
|
||||
--------------
|
||||
csaxs_amp = cSAXSDLPCA200(client)
|
||||
|
||||
csaxs_amp.set_gain("bpm4", 1e7) # low-noise if possible
|
||||
csaxs_amp.set_gain("bim", 1e10) # auto high-speed
|
||||
csaxs_amp.set_coupling("bpm4", "DC")
|
||||
csaxs_amp.set_coupling("bim", "AC")
|
||||
csaxs_amp.info("bpm4") # print current settings
|
||||
csaxs_amp.info_all() # print all configured amplifiers
|
||||
"""
|
||||
|
||||
import builtins
|
||||
|
||||
from bec_lib import bec_logger
|
||||
|
||||
logger = bec_logger.logger
|
||||
|
||||
bec = builtins.__dict__.get("bec")
|
||||
dev = builtins.__dict__.get("dev")
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Amplifier registry
|
||||
# ---------------------------------------------------------------------------
|
||||
# Each entry describes one DLPCA-200 amplifier connected to a Galil RIO.
|
||||
#
|
||||
# Keys inside "channels":
|
||||
# gain_lsb → digital output channel number wired to DLPCA-200 Pin 10
|
||||
# gain_mid → digital output channel number wired to DLPCA-200 Pin 11
|
||||
# gain_msb → digital output channel number wired to DLPCA-200 Pin 12
|
||||
# coupling → digital output channel number wired to DLPCA-200 Pin 13
|
||||
# speed_mode → digital output channel number wired to DLPCA-200 Pin 14
|
||||
#
|
||||
# To add a new amplifier, simply extend this dict.
|
||||
# ---------------------------------------------------------------------------
|
||||
DLPCA200_AMPLIFIER_CONFIG: dict[str, dict] = {
|
||||
"bpm4": {
|
||||
"rio_device": "galilrioesxbox",
|
||||
"description": "Beam Position Monitor 4 current amplifier",
|
||||
"channels": {
|
||||
"gain_lsb": 0, # Pin 10 → Galil ch0
|
||||
"gain_mid": 1, # Pin 11 → Galil ch1
|
||||
"gain_msb": 2, # Pin 12 → Galil ch2
|
||||
"coupling": 3, # Pin 13 → Galil ch3
|
||||
"speed_mode": 4, # Pin 14 → Galil ch4
|
||||
},
|
||||
},
|
||||
"bim": {
|
||||
"rio_device": "galilrioesxbox",
|
||||
"description": "Beam Intensity Monitor current amplifier",
|
||||
"channels": {
|
||||
"gain_lsb": 6, # Pin 10 → Galil ch6
|
||||
"gain_mid": 7, # Pin 11 → Galil ch7
|
||||
"gain_msb": 8, # Pin 12 → Galil ch8
|
||||
"coupling": 9, # Pin 13 → Galil ch9
|
||||
"speed_mode": 10, # Pin 14 → Galil ch10
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# DLPCA-200 gain encoding tables
|
||||
# ---------------------------------------------------------------------------
|
||||
# (msb, mid, lsb) → gain in V/A
|
||||
_GAIN_BITS_LOW_NOISE: dict[tuple, int] = {
|
||||
(0, 0, 0): int(1e3),
|
||||
(0, 0, 1): int(1e4),
|
||||
(0, 1, 0): int(1e5),
|
||||
(0, 1, 1): int(1e6),
|
||||
(1, 0, 0): int(1e7),
|
||||
(1, 0, 1): int(1e8),
|
||||
(1, 1, 0): int(1e9),
|
||||
}
|
||||
|
||||
_GAIN_BITS_HIGH_SPEED: dict[tuple, int] = {
|
||||
(0, 0, 0): int(1e5),
|
||||
(0, 0, 1): int(1e6),
|
||||
(0, 1, 0): int(1e7),
|
||||
(0, 1, 1): int(1e8),
|
||||
(1, 0, 0): int(1e9),
|
||||
(1, 0, 1): int(1e10),
|
||||
(1, 1, 0): int(1e11),
|
||||
}
|
||||
|
||||
# Inverse maps: gain → (msb, mid, lsb, low_noise_flag)
|
||||
# low_noise_flag: True = Pin14 HIGH, False = Pin14 LOW
|
||||
_GAIN_TO_BITS: dict[int, tuple] = {}
|
||||
for _bits, _gain in _GAIN_BITS_LOW_NOISE.items():
|
||||
_GAIN_TO_BITS[_gain] = (*_bits, True)
|
||||
for _bits, _gain in _GAIN_BITS_HIGH_SPEED.items():
|
||||
if _gain not in _GAIN_TO_BITS: # low-noise takes priority
|
||||
_GAIN_TO_BITS[_gain] = (*_bits, False)
|
||||
|
||||
VALID_GAINS = sorted(_GAIN_TO_BITS.keys())
|
||||
|
||||
|
||||
class cSAXSDLPCA200Error(Exception):
|
||||
pass
|
||||
|
||||
|
||||
class cSAXSDLPCA200:
|
||||
"""
|
||||
Control class for FEMTO DLPCA-200 current amplifiers connected via Galil RIO
|
||||
digital outputs in a BEC environment.
|
||||
|
||||
Supports:
|
||||
- Forward control: set_gain(), set_coupling()
|
||||
- Readback reporting: info(), info_all(), read_settings()
|
||||
- Robust error handling and logging following cSAXS conventions.
|
||||
"""
|
||||
|
||||
TAG = "[DLPCA200]"
|
||||
|
||||
def __init__(self, client, config: dict | None = None) -> None:
|
||||
"""
|
||||
Parameters
|
||||
----------
|
||||
client : BEC client object (passed through for future use)
|
||||
config : optional override for DLPCA200_AMPLIFIER_CONFIG.
|
||||
Falls back to the module-level dict if not provided.
|
||||
"""
|
||||
self.client = client
|
||||
self._config: dict[str, dict] = config if config is not None else DLPCA200_AMPLIFIER_CONFIG
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# Internal helpers
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
def _require_dev(self) -> None:
|
||||
if dev is None:
|
||||
raise cSAXSDLPCA200Error(
|
||||
f"{self.TAG} BEC 'dev' namespace is not available in this session."
|
||||
)
|
||||
|
||||
def _get_cfg(self, amp_name: str) -> dict:
|
||||
"""Return config dict for a named amplifier, raising on unknown names."""
|
||||
if amp_name not in self._config:
|
||||
known = ", ".join(sorted(self._config.keys()))
|
||||
raise cSAXSDLPCA200Error(
|
||||
f"{self.TAG} Unknown amplifier '{amp_name}'. Known: [{known}]"
|
||||
)
|
||||
return self._config[amp_name]
|
||||
|
||||
def _get_rio(self, amp_name: str):
|
||||
"""Return the live RIO device object for a given amplifier."""
|
||||
self._require_dev()
|
||||
cfg = self._get_cfg(amp_name)
|
||||
rio_name = cfg["rio_device"]
|
||||
try:
|
||||
rio = getattr(dev, rio_name)
|
||||
except AttributeError:
|
||||
raise cSAXSDLPCA200Error(
|
||||
f"{self.TAG} RIO device '{rio_name}' not found in BEC 'dev'."
|
||||
)
|
||||
return rio
|
||||
|
||||
def _dout_get(self, rio, ch: int) -> int:
|
||||
"""Read one digital output channel (returns 0 or 1)."""
|
||||
attr = getattr(rio.digital_out, f"ch{ch}")
|
||||
val = attr.get()
|
||||
return int(val)
|
||||
|
||||
def _dout_set(self, rio, ch: int, value: bool) -> None:
|
||||
"""Write one digital output channel (True=HIGH=1, False=LOW=0)."""
|
||||
attr = getattr(rio.digital_out, f"ch{ch}")
|
||||
attr.set(value)
|
||||
|
||||
def _read_gain_bits(self, amp_name: str) -> tuple[int, int, int, int]:
|
||||
"""
|
||||
Read current gain bit-state from hardware.
|
||||
|
||||
Returns
|
||||
-------
|
||||
(msb, mid, lsb, speed_mode)
|
||||
speed_mode: 1 = low-noise (Pin14=HIGH), 0 = high-speed (Pin14=LOW)
|
||||
"""
|
||||
rio = self._get_rio(amp_name)
|
||||
ch = self._get_cfg(amp_name)["channels"]
|
||||
msb = self._dout_get(rio, ch["gain_msb"])
|
||||
mid = self._dout_get(rio, ch["gain_mid"])
|
||||
lsb = self._dout_get(rio, ch["gain_lsb"])
|
||||
speed_mode = self._dout_get(rio, ch["speed_mode"])
|
||||
return msb, mid, lsb, speed_mode
|
||||
|
||||
def _decode_gain(self, msb: int, mid: int, lsb: int, speed_mode: int) -> int | None:
|
||||
"""
|
||||
Decode hardware bit-state into gain value (V/A).
|
||||
|
||||
speed_mode=1 → low-noise table, speed_mode=0 → high-speed table.
|
||||
Returns None if the bit combination is not in the table.
|
||||
"""
|
||||
bits = (msb, mid, lsb)
|
||||
if speed_mode:
|
||||
return _GAIN_BITS_LOW_NOISE.get(bits)
|
||||
else:
|
||||
return _GAIN_BITS_HIGH_SPEED.get(bits)
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# Public API – control
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
def set_gain(self, amp_name: str, gain: float, force_high_speed: bool = False) -> None:
|
||||
"""
|
||||
Set the transimpedance gain of a DLPCA-200 amplifier.
|
||||
|
||||
The method automatically selects low-noise mode (Pin14=HIGH) whenever
|
||||
the requested gain is achievable in low-noise mode (1e3 – 1e9 V/A).
|
||||
For gains of 1e10 and 1e11 V/A, high-speed mode is used automatically.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
amp_name : str
|
||||
Amplifier name as defined in DLPCA200_AMPLIFIER_CONFIG (e.g. "bpm4").
|
||||
gain : float or int
|
||||
Target gain in V/A. Must be one of:
|
||||
1e3, 1e4, 1e5, 1e6, 1e7, 1e8, 1e9, 1e10, 1e11.
|
||||
force_high_speed : bool, optional
|
||||
If True, force high-speed (low-noise=False) mode even for gains
|
||||
below 1e10. Default: False (prefer low-noise).
|
||||
|
||||
Examples
|
||||
--------
|
||||
csaxs_amp.set_gain("bpm4", 1e7) # low-noise mode (automatic)
|
||||
csaxs_amp.set_gain("bim", 1e10) # high-speed mode (automatic)
|
||||
csaxs_amp.set_gain("bpm4", 1e7, force_high_speed=True) # override to high-speed
|
||||
"""
|
||||
gain_int = int(gain)
|
||||
if gain_int not in _GAIN_TO_BITS:
|
||||
valid_str = ", ".join(f"1e{int(round(__import__('math').log10(g)))}" for g in VALID_GAINS)
|
||||
raise cSAXSDLPCA200Error(
|
||||
f"{self.TAG} Invalid gain {gain:.2e} V/A for '{amp_name}'. "
|
||||
f"Valid values: {valid_str}"
|
||||
)
|
||||
|
||||
msb, mid, lsb, low_noise_preferred = _GAIN_TO_BITS[gain_int]
|
||||
|
||||
# Apply force_high_speed override
|
||||
if force_high_speed and low_noise_preferred:
|
||||
# Check if this gain is achievable in high-speed mode
|
||||
hs_entry = next(
|
||||
(bits for bits, g in _GAIN_BITS_HIGH_SPEED.items() if g == gain_int), None
|
||||
)
|
||||
if hs_entry is None:
|
||||
raise cSAXSDLPCA200Error(
|
||||
f"{self.TAG} Gain {gain:.2e} V/A is not achievable in high-speed mode "
|
||||
f"for '{amp_name}'."
|
||||
)
|
||||
msb, mid, lsb = hs_entry
|
||||
low_noise_preferred = False
|
||||
|
||||
use_low_noise = low_noise_preferred and not force_high_speed
|
||||
|
||||
try:
|
||||
rio = self._get_rio(amp_name)
|
||||
ch = self._get_cfg(amp_name)["channels"]
|
||||
|
||||
self._dout_set(rio, ch["gain_msb"], bool(msb))
|
||||
self._dout_set(rio, ch["gain_mid"], bool(mid))
|
||||
self._dout_set(rio, ch["gain_lsb"], bool(lsb))
|
||||
self._dout_set(rio, ch["speed_mode"], use_low_noise) # True=low-noise
|
||||
|
||||
mode_str = "low-noise" if use_low_noise else "high-speed"
|
||||
logger.info(
|
||||
f"{self.TAG} [{amp_name}] gain set to {gain_int:.2e} V/A "
|
||||
f"({mode_str} mode, bits MSB={msb} MID={mid} LSB={lsb})"
|
||||
)
|
||||
print(
|
||||
f"{amp_name}: gain → {gain_int:.2e} V/A [{mode_str}] "
|
||||
f"(bits: MSB={msb} MID={mid} LSB={lsb})"
|
||||
)
|
||||
|
||||
except cSAXSDLPCA200Error:
|
||||
raise
|
||||
except Exception as exc:
|
||||
raise cSAXSDLPCA200Error(
|
||||
f"{self.TAG} Failed to set gain on '{amp_name}': {exc}"
|
||||
) from exc
|
||||
|
||||
def set_coupling(self, amp_name: str, coupling: str) -> None:
|
||||
"""
|
||||
Set AC or DC coupling on a DLPCA-200 amplifier.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
amp_name : str
|
||||
Amplifier name (e.g. "bpm4", "bim").
|
||||
coupling : str
|
||||
"AC" or "DC" (case-insensitive).
|
||||
DC → Pin13 HIGH, AC → Pin13 LOW.
|
||||
|
||||
Examples
|
||||
--------
|
||||
csaxs_amp.set_coupling("bpm4", "DC")
|
||||
csaxs_amp.set_coupling("bim", "AC")
|
||||
"""
|
||||
coupling_upper = coupling.strip().upper()
|
||||
if coupling_upper not in ("AC", "DC"):
|
||||
raise cSAXSDLPCA200Error(
|
||||
f"{self.TAG} Invalid coupling '{coupling}' for '{amp_name}'. "
|
||||
f"Use 'AC' or 'DC'."
|
||||
)
|
||||
|
||||
pin13_high = coupling_upper == "DC"
|
||||
|
||||
try:
|
||||
rio = self._get_rio(amp_name)
|
||||
ch = self._get_cfg(amp_name)["channels"]
|
||||
self._dout_set(rio, ch["coupling"], pin13_high)
|
||||
|
||||
logger.info(f"{self.TAG} [{amp_name}] coupling set to {coupling_upper}")
|
||||
print(f"{amp_name}: coupling → {coupling_upper}")
|
||||
|
||||
except cSAXSDLPCA200Error:
|
||||
raise
|
||||
except Exception as exc:
|
||||
raise cSAXSDLPCA200Error(
|
||||
f"{self.TAG} Failed to set coupling on '{amp_name}': {exc}"
|
||||
) from exc
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# Public API – readback / reporting
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
def read_settings(self, amp_name: str) -> dict:
|
||||
"""
|
||||
Read back the current settings from hardware digital outputs.
|
||||
|
||||
Returns
|
||||
-------
|
||||
dict with keys:
|
||||
"amp_name" : str
|
||||
"gain" : int or None – gain in V/A (None if unknown bit pattern)
|
||||
"mode" : str – "low-noise" or "high-speed"
|
||||
"coupling" : str – "AC" or "DC"
|
||||
"bits" : dict – raw bit values {msb, mid, lsb, speed_mode, coupling}
|
||||
"""
|
||||
rio = self._get_rio(amp_name)
|
||||
ch = self._get_cfg(amp_name)["channels"]
|
||||
|
||||
msb = self._dout_get(rio, ch["gain_msb"])
|
||||
mid = self._dout_get(rio, ch["gain_mid"])
|
||||
lsb = self._dout_get(rio, ch["gain_lsb"])
|
||||
speed_mode = self._dout_get(rio, ch["speed_mode"])
|
||||
coupling_bit = self._dout_get(rio, ch["coupling"])
|
||||
|
||||
gain = self._decode_gain(msb, mid, lsb, speed_mode)
|
||||
mode = "low-noise" if speed_mode else "high-speed"
|
||||
coupling = "DC" if coupling_bit else "AC"
|
||||
|
||||
return {
|
||||
"amp_name": amp_name,
|
||||
"gain": gain,
|
||||
"mode": mode,
|
||||
"coupling": coupling,
|
||||
"bits": {
|
||||
"msb": msb,
|
||||
"mid": mid,
|
||||
"lsb": lsb,
|
||||
"speed_mode": speed_mode,
|
||||
"coupling": coupling_bit,
|
||||
},
|
||||
}
|
||||
|
||||
def info(self, amp_name: str) -> None:
|
||||
"""
|
||||
Print a plain summary of the current settings for one amplifier.
|
||||
|
||||
Example output
|
||||
--------------
|
||||
Amplifier : bpm4
|
||||
Description : Beam Position Monitor 4 current amplifier
|
||||
RIO device : galilrioesxbox
|
||||
Gain : 1.00e+07 V/A
|
||||
Mode : low-noise
|
||||
Coupling : DC
|
||||
Raw bits : MSB=1 MID=0 LSB=0 speed=1 coup=1
|
||||
"""
|
||||
cfg = self._get_cfg(amp_name)
|
||||
|
||||
try:
|
||||
s = self.read_settings(amp_name)
|
||||
except Exception as exc:
|
||||
print(f"{self.TAG} [{amp_name}] Could not read settings: {exc}")
|
||||
return
|
||||
|
||||
gain_str = (
|
||||
f"{s['gain']:.2e} V/A" if s["gain"] is not None else "UNKNOWN (invalid bit pattern)"
|
||||
)
|
||||
bits = s["bits"]
|
||||
|
||||
print(f" {'Amplifier':<12}: {amp_name}")
|
||||
print(f" {'Description':<12}: {cfg.get('description', '')}")
|
||||
print(f" {'RIO device':<12}: {cfg['rio_device']}")
|
||||
print(f" {'Gain':<12}: {gain_str}")
|
||||
print(f" {'Mode':<12}: {s['mode']}")
|
||||
print(f" {'Coupling':<12}: {s['coupling']}")
|
||||
print(f" {'Raw bits':<12}: MSB={bits['msb']} MID={bits['mid']} LSB={bits['lsb']} speed={bits['speed_mode']} coup={bits['coupling']}")
|
||||
|
||||
def info_all(self) -> None:
|
||||
"""
|
||||
Print a plain summary for ALL configured amplifiers.
|
||||
"""
|
||||
print("\nDLPCA-200 Amplifier Status Report")
|
||||
print("-" * 40)
|
||||
for amp_name in sorted(self._config.keys()):
|
||||
self.info(amp_name)
|
||||
print()
|
||||
|
||||
def list_amplifiers(self) -> list[str]:
|
||||
"""Return sorted list of configured amplifier names."""
|
||||
return sorted(self._config.keys())
|
||||
@@ -41,10 +41,8 @@ import builtins
|
||||
if builtins.__dict__.get("bec") is not None:
|
||||
bec = builtins.__dict__.get("bec")
|
||||
dev = builtins.__dict__.get("dev")
|
||||
scans = builtins.__dict__.get("scans")
|
||||
|
||||
def umv(*args):
|
||||
return scans.umv(*args, relative=False)
|
||||
umv = builtins.__dict__.get("umv")
|
||||
umvr = builtins.__dict__.get("umvr")
|
||||
|
||||
class cSAXSFilterTransmission:
|
||||
"""
|
||||
|
||||
@@ -8,14 +8,11 @@ from bec_lib import bec_logger
|
||||
logger = bec_logger.logger
|
||||
|
||||
# Pull BEC globals if present
|
||||
if builtins.__dict__.get("bec") is not None:
|
||||
bec = builtins.__dict__.get("bec")
|
||||
dev = builtins.__dict__.get("dev")
|
||||
scans = builtins.__dict__.get("scans")
|
||||
bec = builtins.__dict__.get("bec")
|
||||
dev = builtins.__dict__.get("dev")
|
||||
umv = builtins.__dict__.get("umv")
|
||||
umvr = builtins.__dict__.get("umvr")
|
||||
|
||||
def umv(*args):
|
||||
return scans.umv(*args, relative=False)
|
||||
|
||||
|
||||
class cSAXSInitSmaractStagesError(Exception):
|
||||
pass
|
||||
@@ -386,6 +383,7 @@ class cSAXSInitSmaractStages:
|
||||
if not self._yesno("Proceed with the motions listed above?", "y"):
|
||||
logger.info("[cSAXS] Motion to initial position aborted by user.")
|
||||
return
|
||||
|
||||
# --- Execution phase (SIMULTANEOUS MOTION) ---
|
||||
if umv is None:
|
||||
logger.error("[cSAXS] 'umv' is not available in this session.")
|
||||
|
||||
@@ -22,10 +22,8 @@ logger = bec_logger.logger
|
||||
if builtins.__dict__.get("bec") is not None:
|
||||
bec = builtins.__dict__.get("bec")
|
||||
dev = builtins.__dict__.get("dev")
|
||||
scans = builtins.__dict__.get("scans")
|
||||
|
||||
def umv(*args):
|
||||
return scans.umv(*args, relative=False)
|
||||
umv = builtins.__dict__.get("umv")
|
||||
umvr = builtins.__dict__.get("umvr")
|
||||
|
||||
|
||||
class FlomniToolsError(Exception):
|
||||
@@ -37,32 +35,32 @@ class FlomniInitError(Exception):
|
||||
class FlomniError(Exception):
|
||||
pass
|
||||
|
||||
class FlomniTools:
|
||||
def yesno(self, message: str, default="none", autoconfirm=0) -> bool:
|
||||
if autoconfirm and default == "y":
|
||||
self.printgreen(message + " Automatically confirming default: yes")
|
||||
return True
|
||||
elif autoconfirm and default == "n":
|
||||
self.printgreen(message + " Automatically confirming default: no")
|
||||
return False
|
||||
if default == "y":
|
||||
message_ending = " [Y]/n? "
|
||||
elif default == "n":
|
||||
message_ending = " y/[N]? "
|
||||
else:
|
||||
message_ending = " y/n? "
|
||||
while True:
|
||||
user_input = input(self.OKBLUE + message + message_ending + self.ENDC)
|
||||
if (
|
||||
user_input == "Y" or user_input == "y" or user_input == "yes" or user_input == "Yes"
|
||||
) or (default == "y" and user_input == ""):
|
||||
return True
|
||||
if (
|
||||
user_input == "N" or user_input == "n" or user_input == "no" or user_input == "No"
|
||||
) or (default == "n" and user_input == ""):
|
||||
return False
|
||||
else:
|
||||
print("Please expicitely confirm y or n.")
|
||||
# class FlomniTools:
|
||||
# def yesno(self, message: str, default="none", autoconfirm=0) -> bool:
|
||||
# if autoconfirm and default == "y":
|
||||
# self.printgreen(message + " Automatically confirming default: yes")
|
||||
# return True
|
||||
# elif autoconfirm and default == "n":
|
||||
# self.printgreen(message + " Automatically confirming default: no")
|
||||
# return False
|
||||
# if default == "y":
|
||||
# message_ending = " [Y]/n? "
|
||||
# elif default == "n":
|
||||
# message_ending = " y/[N]? "
|
||||
# else:
|
||||
# message_ending = " y/n? "
|
||||
# while True:
|
||||
# user_input = input(self.OKBLUE + message + message_ending + self.ENDC)
|
||||
# if (
|
||||
# user_input == "Y" or user_input == "y" or user_input == "yes" or user_input == "Yes"
|
||||
# ) or (default == "y" and user_input == ""):
|
||||
# return True
|
||||
# if (
|
||||
# user_input == "N" or user_input == "n" or user_input == "no" or user_input == "No"
|
||||
# ) or (default == "n" and user_input == ""):
|
||||
# return False
|
||||
# else:
|
||||
# print("Please expicitely confirm y or n.")
|
||||
|
||||
|
||||
|
||||
|
||||
@@ -7,10 +7,8 @@ from bec_widgets.cli.client import BECDockArea
|
||||
if builtins.__dict__.get("bec") is not None:
|
||||
bec = builtins.__dict__.get("bec")
|
||||
dev = builtins.__dict__.get("dev")
|
||||
scans = builtins.__dict__.get("scans")
|
||||
|
||||
def umv(*args):
|
||||
return scans.umv(*args, relative=False)
|
||||
umv = builtins.__dict__.get("umv")
|
||||
umvr = builtins.__dict__.get("umvr")
|
||||
|
||||
|
||||
class flomniGuiToolsError(Exception):
|
||||
|
||||
@@ -13,10 +13,8 @@ logger = bec_logger.logger
|
||||
# import builtins to avoid linter errors
|
||||
bec = builtins.__dict__.get("bec")
|
||||
dev = builtins.__dict__.get("dev")
|
||||
scans = builtins.__dict__.get("scans")
|
||||
|
||||
def umv(*args):
|
||||
return scans.umv(*args, relative=False)
|
||||
umv = builtins.__dict__.get("umv")
|
||||
umvr = builtins.__dict__.get("umvr")
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from bec_ipython_client.plugins.flomni import Flomni
|
||||
|
||||
@@ -7,10 +7,8 @@ from bec_widgets.cli.client import BECDockArea
|
||||
if builtins.__dict__.get("bec") is not None:
|
||||
bec = builtins.__dict__.get("bec")
|
||||
dev = builtins.__dict__.get("dev")
|
||||
scans = builtins.__dict__.get("scans")
|
||||
|
||||
def umv(*args):
|
||||
return scans.umv(*args, relative=False)
|
||||
umv = builtins.__dict__.get("umv")
|
||||
umvr = builtins.__dict__.get("umvr")
|
||||
|
||||
|
||||
class OMNYGuiToolsError(Exception):
|
||||
|
||||
@@ -27,10 +27,9 @@ logger = bec_logger.logger
|
||||
if builtins.__dict__.get("bec") is not None:
|
||||
bec = builtins.__dict__.get("bec")
|
||||
dev = builtins.__dict__.get("dev")
|
||||
scans = builtins.__dict__.get("scans")
|
||||
umv = builtins.__dict__.get("umv")
|
||||
umvr = builtins.__dict__.get("umvr")
|
||||
|
||||
def umv(*args):
|
||||
return scans.umv(*args, relative=False)
|
||||
|
||||
class OMNYInitError(Exception):
|
||||
pass
|
||||
|
||||
@@ -16,10 +16,8 @@ from rich.table import Table
|
||||
if builtins.__dict__.get("bec") is not None:
|
||||
bec = builtins.__dict__.get("bec")
|
||||
dev = builtins.__dict__.get("dev")
|
||||
scans = builtins.__dict__.get("scans")
|
||||
|
||||
def umv(*args):
|
||||
return scans.umv(*args, relative=False)
|
||||
umv = builtins.__dict__.get("umv")
|
||||
umvr = builtins.__dict__.get("umvr")
|
||||
|
||||
|
||||
class OMNYToolsError(Exception):
|
||||
|
||||
@@ -16,10 +16,8 @@ from csaxs_bec.bec_ipython_client.plugins.cSAXS import epics_get, epics_put, fsh
|
||||
if builtins.__dict__.get("bec") is not None:
|
||||
bec = builtins.__dict__.get("bec")
|
||||
dev = builtins.__dict__.get("dev")
|
||||
scans = builtins.__dict__.get("scans")
|
||||
|
||||
def umv(*args):
|
||||
return scans.umv(*args, relative=False)
|
||||
umv = builtins.__dict__.get("umv")
|
||||
umvr = builtins.__dict__.get("umvr")
|
||||
|
||||
|
||||
class OMNYTransferError(Exception):
|
||||
|
||||
@@ -13,10 +13,8 @@ logger = bec_logger.logger
|
||||
# import builtins to avoid linter errors
|
||||
bec = builtins.__dict__.get("bec")
|
||||
dev = builtins.__dict__.get("dev")
|
||||
scans = builtins.__dict__.get("scans")
|
||||
|
||||
def umv(*args):
|
||||
return scans.umv(*args, relative=False)
|
||||
umv = builtins.__dict__.get("umv")
|
||||
umvr = builtins.__dict__.get("umvr")
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from bec_ipython_client.plugins.omny import OMNY
|
||||
|
||||
@@ -30,74 +30,29 @@ logger = bec_logger.logger
|
||||
|
||||
logger.info("Using the cSAXS startup script.")
|
||||
|
||||
# pylint: disable=import-error
|
||||
_args = _main_dict["args"]
|
||||
|
||||
_session_name = "cSAXS"
|
||||
if _args.session.lower() == "lamni":
|
||||
from csaxs_bec.bec_ipython_client.plugins.cSAXS import *
|
||||
from csaxs_bec.bec_ipython_client.plugins.LamNI import *
|
||||
|
||||
_session_name = "LamNI"
|
||||
lamni = LamNI(bec)
|
||||
logger.success("LamNI session loaded.")
|
||||
|
||||
elif _args.session.lower() == "csaxs":
|
||||
print("Loading cSAXS session")
|
||||
from csaxs_bec.bec_ipython_client.plugins.cSAXS import *
|
||||
|
||||
logger.success("cSAXS session loaded.")
|
||||
|
||||
from csaxs_bec.bec_ipython_client.plugins.tool_box.debug_tools import DebugTools
|
||||
|
||||
debug = DebugTools()
|
||||
logger.success("Debug tools loaded. Use 'debug' to access them.")
|
||||
|
||||
# pylint: disable=import-error
|
||||
_args = _main_dict["args"]
|
||||
|
||||
_session_name = "cSAXS"
|
||||
|
||||
print("Loading cSAXS session")
|
||||
from csaxs_bec.bec_ipython_client.plugins.cSAXS.cSAXS import cSAXS
|
||||
csaxs = cSAXS(bec)
|
||||
logger.success("cSAXS session loaded.")
|
||||
|
||||
|
||||
if _args.session.lower() == "lamni":
|
||||
from csaxs_bec.bec_ipython_client.plugins.LamNI import LamNI
|
||||
|
||||
_session_name = "LamNI"
|
||||
lamni = LamNI(bec)
|
||||
logger.success("LamNI session loaded.")
|
||||
print(r"""
|
||||
██████╗ ███████╗ ██████╗ ██╗ █████╗ ███╗ ███╗███╗ ██╗██╗
|
||||
██╔══██╗██╔════╝██╔════╝ ██║ ██╔══██╗████╗ ████║████╗ ██║██║
|
||||
██████╔╝█████╗ ██║ ██║ ███████║██╔████╔██║██╔██╗ ██║██║
|
||||
██╔══██╗██╔══╝ ██║ ██║ ██╔══██║██║╚██╔╝██║██║╚██╗██║██║
|
||||
██████╔╝███████╗╚██████╗ ███████╗██║ ██║██║ ╚═╝ ██║██║ ╚████║██║
|
||||
╚═════╝ ╚══════╝ ╚═════╝ ╚══════╝╚═╝ ╚═╝╚═╝ ╚═╝╚═╝ ╚═══╝╚═╝
|
||||
|
||||
B E C L a m N I
|
||||
""")
|
||||
|
||||
elif _args.session.lower() == "omny":
|
||||
from csaxs_bec.bec_ipython_client.plugins.flomni import OMNY
|
||||
|
||||
_session_name = "OMNY"
|
||||
omny = OMNY(bec)
|
||||
logger.success("OMNY session loaded.")
|
||||
print(r"""
|
||||
██████╗ ███████╗ ██████╗ ██████╗ ███╗ ███╗███╗ ██╗██╗ ██╗
|
||||
██╔══██╗██╔════╝██╔════╝ ██╔═══██╗████╗ ████║████╗ ██║╚██╗ ██╔╝
|
||||
██████╔╝█████╗ ██║ ██║ ██║██╔████╔██║██╔██╗ ██║ ╚████╔╝
|
||||
██╔══██╗██╔══╝ ██║ ██║ ██║██║╚██╔╝██║██║╚██╗██║ ╚██╔╝
|
||||
██████╔╝███████╗╚██████╗ ╚██████╔╝██║ ╚═╝ ██║██║ ╚████║ ██║
|
||||
╚═════╝ ╚══════╝ ╚═════╝ ╚═════╝ ╚═╝ ╚═╝╚═╝ ╚═══╝ ╚═╝
|
||||
|
||||
B E C O M N Y
|
||||
""")
|
||||
|
||||
elif _args.session.lower() == "flomni":
|
||||
from csaxs_bec.bec_ipython_client.plugins.flomni import Flomni
|
||||
|
||||
_session_name = "flomni"
|
||||
flomni = Flomni(bec)
|
||||
logger.success("flomni session loaded.")
|
||||
print(r"""
|
||||
██████╗ ███████╗ ██████╗ ███████╗██╗ ██████╗ ███╗ ███╗███╗ ██╗██╗
|
||||
██╔══██╗██╔════╝██╔════╝ ██╔════╝██║ ██╔═══██╗████╗ ████║████╗ ██║██║
|
||||
██████╔╝█████╗ ██║ █████╗ ██║ ██║ ██║██╔████╔██║██╔██╗ ██║██║
|
||||
██╔══██╗██╔══╝ ██║ ██╔══╝ ██║ ██║ ██║██║╚██╔╝██║██║╚██╗██║██║
|
||||
██████╔╝███████╗╚██████╗ ██║ ███████╗╚██████╔╝██║ ╚═╝ ██║██║ ╚████║██║
|
||||
╚═════╝ ╚══════╝ ╚═════╝ ╚═╝ ╚══════╝ ╚═════╝ ╚═╝ ╚═╝╚═╝ ╚═══╝╚═╝
|
||||
|
||||
B E C f l O M N I
|
||||
""")
|
||||
|
||||
|
||||
# SETUP BEAMLINE INFO
|
||||
from bec_ipython_client.plugins.SLS.sls_info import OperatorInfo, SLSInfo
|
||||
|
||||
@@ -9,27 +9,27 @@ eiger_1_5:
|
||||
readoutPriority: async
|
||||
softwareTrigger: False
|
||||
|
||||
# eiger_9:
|
||||
# description: Eiger 9M detector
|
||||
# deviceClass: csaxs_bec.devices.jungfraujoch.eiger_9m.Eiger9M
|
||||
# deviceConfig:
|
||||
# detector_distance: 100
|
||||
# beam_center: [0, 0]
|
||||
# onFailure: raise
|
||||
# enabled: true
|
||||
# readoutPriority: async
|
||||
# softwareTrigger: False
|
||||
eiger_9:
|
||||
description: Eiger 9M detector
|
||||
deviceClass: csaxs_bec.devices.jungfraujoch.eiger_9m.Eiger9M
|
||||
deviceConfig:
|
||||
detector_distance: 100
|
||||
beam_center: [0, 0]
|
||||
onFailure: raise
|
||||
enabled: true
|
||||
readoutPriority: async
|
||||
softwareTrigger: False
|
||||
|
||||
# ids_cam:
|
||||
# description: IDS camera for live image acquisition
|
||||
# deviceClass: csaxs_bec.devices.ids_cameras.IDSCamera
|
||||
# deviceConfig:
|
||||
# camera_id: 201
|
||||
# bits_per_pixel: 24
|
||||
# m_n_colormode: 1
|
||||
# live_mode: True
|
||||
# onFailure: raise
|
||||
# enabled: true
|
||||
# readoutPriority: async
|
||||
# softwareTrigger: True
|
||||
ids_cam:
|
||||
description: IDS camera for live image acquisition
|
||||
deviceClass: csaxs_bec.devices.ids_cameras.IDSCamera
|
||||
deviceConfig:
|
||||
camera_id: 201
|
||||
bits_per_pixel: 24
|
||||
m_n_colormode: 1
|
||||
live_mode: True
|
||||
onFailure: raise
|
||||
enabled: true
|
||||
readoutPriority: async
|
||||
softwareTrigger: True
|
||||
|
||||
|
||||
@@ -1,25 +0,0 @@
|
||||
############################################################
|
||||
##################### EPS ##################################
|
||||
############################################################
|
||||
x12saEPS:
|
||||
description: X12SA EPS info and control
|
||||
deviceClass: csaxs_bec.devices.epics.eps.EPS
|
||||
deviceConfig: {}
|
||||
enabled: true
|
||||
onFailure: buffer
|
||||
readOnly: false
|
||||
readoutPriority: baseline
|
||||
############################################################
|
||||
##################### GalilRIO #############################
|
||||
############################################################
|
||||
|
||||
galilrioesxbox:
|
||||
description: Galil RIO for remote gain switching and slow reading ES XBox
|
||||
deviceClass: csaxs_bec.devices.omny.galil.galil_rio.GalilRIO
|
||||
deviceConfig:
|
||||
host: galilrioesxbox.psi.ch
|
||||
enabled: true
|
||||
onFailure: raise
|
||||
readOnly: false
|
||||
readoutPriority: baseline
|
||||
connectionTimeout: 20
|
||||
@@ -1,11 +1,11 @@
|
||||
# This is the main configuration file that is
|
||||
# commented or uncommented according to the type of experiment
|
||||
|
||||
# optics:
|
||||
# - !include ./bl_optics_hutch.yaml
|
||||
optics:
|
||||
- !include ./bl_optics_hutch.yaml
|
||||
|
||||
# frontend:
|
||||
# - !include ./bl_frontend.yaml
|
||||
frontend:
|
||||
- !include ./bl_frontend.yaml
|
||||
|
||||
endstation:
|
||||
- !include ./bl_endstation.yaml
|
||||
@@ -16,8 +16,8 @@ detectors:
|
||||
#sastt:
|
||||
# - !include ./sastt.yaml
|
||||
|
||||
flomni:
|
||||
- !include ./ptycho_flomni.yaml
|
||||
#flomni:
|
||||
# - !include ./ptycho_flomni.yaml
|
||||
|
||||
#omny:
|
||||
# - !include ./ptycho_omny.yaml
|
||||
|
||||
@@ -471,7 +471,7 @@ omnyfsh:
|
||||
#################### GUI Signals ###########################
|
||||
############################################################
|
||||
omny_xray_gui:
|
||||
description: Gui signals
|
||||
description: Gui Epics signals
|
||||
deviceClass: csaxs_bec.devices.omny.xray_epics_gui.OMNYXRayEpicsGUI
|
||||
deviceConfig: {}
|
||||
enabled: true
|
||||
@@ -486,25 +486,4 @@ calculated_signal:
|
||||
compute_method: "def just_rand():\n return 42"
|
||||
enabled: true
|
||||
readOnly: false
|
||||
readoutPriority: baseline
|
||||
|
||||
############################################################
|
||||
#################### OMNY Pandabox #########################
|
||||
############################################################
|
||||
omny_panda:
|
||||
readoutPriority: async
|
||||
deviceClass: csaxs_bec.devices.panda_box.panda_box_omny.PandaBoxOMNY
|
||||
deviceConfig:
|
||||
host: omny-panda.psi.ch
|
||||
signal_alias:
|
||||
FMC_IN.VAL1.Min: cap_voltage_fzp_y_min
|
||||
FMC_IN.VAL1.Max: cap_voltage_fzp_y_max
|
||||
FMC_IN.VAL1.Mean: cap_voltage_fzp_y_mean
|
||||
FMC_IN.VAL2.Min: cap_voltage_fzp_x_min
|
||||
FMC_IN.VAL2.Max: cap_voltage_fzp_x_max
|
||||
FMC_IN.VAL2.Mean: cap_voltage_fzp_x_mean
|
||||
deviceTags:
|
||||
- detector
|
||||
enabled: true
|
||||
readOnly: false
|
||||
softwareTrigger: false
|
||||
readoutPriority: baseline
|
||||
@@ -25,6 +25,34 @@ logger = bec_logger.logger
|
||||
|
||||
|
||||
class LamniGalilController(GalilController):
|
||||
|
||||
|
||||
# ============================================================
|
||||
# Error status
|
||||
# ============================================================
|
||||
|
||||
caperr_bits = {
|
||||
0x01: "Cap1 outside expected left-stop range (early check)",
|
||||
0x02: "Cap2 outside expected left-stop range (early check)",
|
||||
0x04: "Cap1 too low during pressure-off check (near right boundary)",
|
||||
0x08: "Cap2 too low during pressure-off check (near right boundary)",
|
||||
0x10: "Cap1 exceeded allowed left-stop boundary during movement",
|
||||
0x20: "Cap2 exceeded allowed left-stop boundary during movement (disabled in code)",
|
||||
0x40: "Cap1 did not respond to test movement",
|
||||
0x80: "Cap2 did not respond to test movement"
|
||||
}
|
||||
|
||||
allaxrer_table = {
|
||||
1: "Not all axes referenced after reference search",
|
||||
2: "Pressure-loss emergency stop (pressure 14/15 active while motor C off)",
|
||||
3: "Unexpected pressure OFF while soft-limits not yet set",
|
||||
4: "Pressure valve mismatch (OUT13=0 but IN13=1)",
|
||||
5: "Capacitive sensor boundary violations (caperr > 0)",
|
||||
6: "Emergency Stop triggered (IN[5]=0)",
|
||||
7: "Following error detected on one or more axes"
|
||||
}
|
||||
|
||||
|
||||
USER_ACCESS = [
|
||||
"describe",
|
||||
"show_running_threads",
|
||||
@@ -37,6 +65,8 @@ class LamniGalilController(GalilController):
|
||||
"get_motor_limit_switch",
|
||||
"is_motor_on",
|
||||
"all_axes_referenced",
|
||||
"lamni_lights_off",
|
||||
"lamni_lights_on"
|
||||
]
|
||||
|
||||
def show_status_other(self):
|
||||
@@ -60,6 +90,47 @@ class LamniGalilController(GalilController):
|
||||
print("There is air pressure at the outer rotation radial.")
|
||||
swver = float(self.socket_put_and_receive("MGswver"))
|
||||
print(f"Lgalil LAMNI firmware version {swver:2.0f}.")
|
||||
allaxref = int(float(self.socket_put_and_receive("MGallaxref")))
|
||||
print(f"Error statuts:")
|
||||
if allaxref == 1:
|
||||
print(f"Allaxref = 1, all OK.")
|
||||
else:
|
||||
print(f"Allaxref = {allaxref}. Not all axes are referenced or error introduced preventing motion.")
|
||||
allaxrer = int(float(self.socket_put_and_receive("MGallaxrer")))
|
||||
print("\nallaxrer =", allaxrer)
|
||||
print(self.decode_allaxrer(allaxrer))
|
||||
caperr = int(float(self.socket_put_and_receive("MGcaperr")))
|
||||
print("\nDecoding caperr =", caperr)
|
||||
self.visualize_caperr(caperr)
|
||||
|
||||
def decode_allaxrer(self, code: int) -> str:
|
||||
"""Return human-readable meaning of allaxrer code."""
|
||||
return self.allaxrer_table.get(code, "Unknown allaxrer code")
|
||||
|
||||
def visualize_caperr(self, mask: int):
|
||||
"""Pretty-print a bitmask visualization for caperr."""
|
||||
print("\n=== CAPERR BITMASK VISUALIZER ===")
|
||||
print(f"Raw value: {mask} (0x{mask:02X})")
|
||||
print("----------------------------------\n")
|
||||
|
||||
print("Bit | Hex | Active | Meaning")
|
||||
print("----------------------------------")
|
||||
|
||||
for bit, meaning in self.caperr_bits.items():
|
||||
active = "YES" if mask & bit else "no"
|
||||
print(f"{bit:3d} | 0x{bit:02X} | {active:6} | {meaning}")
|
||||
|
||||
print("\nActive flags:")
|
||||
active_flags = [meaning for bit, meaning in self.caperr_bits.items() if mask & bit]
|
||||
|
||||
if active_flags:
|
||||
for f in active_flags:
|
||||
print(" ✓", f)
|
||||
else:
|
||||
print(" (none)")
|
||||
|
||||
print("\n==================================\n")
|
||||
|
||||
|
||||
def lamni_lights_off(self):
|
||||
self.socket_put_confirmed("SB1")
|
||||
@@ -93,7 +164,7 @@ class LamniGalilReadbackSignal(GalilSignalRO):
|
||||
val = super().read()
|
||||
if self.parent.axis_Id_numeric == 2:
|
||||
try:
|
||||
rt = self.parent.device_manager.devices[self.parent.rtx]
|
||||
rt = self.parent.device_manager.devices[self.parent.rt]
|
||||
if rt.enabled:
|
||||
rt.obj.controller.set_rotation_angle(val[self.parent.name]["value"])
|
||||
except KeyError:
|
||||
@@ -147,7 +218,7 @@ class LamniGalilMotor(Device, PositionerBase):
|
||||
raise BECConfigError(
|
||||
"device_mapping has been specified but the device_manager cannot be accessed."
|
||||
)
|
||||
self.rt = self.device_mapping.get("rt")
|
||||
self.rt = self.device_mapping.get("rt", "rtx")
|
||||
|
||||
super().__init__(
|
||||
prefix,
|
||||
|
||||
@@ -11,6 +11,7 @@ from ophyd.status import wait as status_wait
|
||||
from ophyd.utils import LimitError, ReadOnlyError
|
||||
from ophyd_devices.utils.controller import Controller, threadlocked
|
||||
from ophyd_devices.utils.socket import SocketIO, SocketSignal, raise_if_disconnected
|
||||
from prettytable import PrettyTable
|
||||
|
||||
from csaxs_bec.devices.omny.rt.rt_ophyd import RtCommunicationError, RtError
|
||||
|
||||
@@ -51,6 +52,7 @@ class RtLamniController(Controller):
|
||||
_axes_per_controller = 3
|
||||
USER_ACCESS = [
|
||||
"socket_put_and_receive",
|
||||
"socket_put",
|
||||
"set_rotation_angle",
|
||||
"feedback_disable",
|
||||
"feedback_enable_without_reset",
|
||||
@@ -62,6 +64,9 @@ class RtLamniController(Controller):
|
||||
"_set_axis_velocity_maximum_speed",
|
||||
"_position_sampling_single_read",
|
||||
"_position_sampling_single_reset_and_start_sampling",
|
||||
"show_signal_strength_interferometer",
|
||||
"show_analog_signals",
|
||||
"show_feedback_status",
|
||||
]
|
||||
|
||||
def __init__(
|
||||
@@ -208,8 +213,9 @@ class RtLamniController(Controller):
|
||||
|
||||
@threadlocked
|
||||
def start_scan(self):
|
||||
interferometer_feedback_not_running = int((self.socket_put_and_receive("J2")).split(",")[0])
|
||||
if interferometer_feedback_not_running == 1:
|
||||
# interferometer_feedback_not_running = int((self.socket_put_and_receive("J2")).split(",")[0])
|
||||
# if interferometer_feedback_not_running == 1:
|
||||
if not self.feedback_is_running():
|
||||
logger.error(
|
||||
"Cannot start scan because feedback loop is not running or there is an interferometer error."
|
||||
)
|
||||
@@ -270,6 +276,44 @@ class RtLamniController(Controller):
|
||||
"average_lamni_angle": {"value": self.average_lamni_angle / (int(return_table[0]) + 1)},
|
||||
}
|
||||
return signals
|
||||
|
||||
def feedback_is_running(self) -> bool:
|
||||
status = int(float((self.socket_put_and_receive("J2")).split(",")[0]))
|
||||
return status == 0 # 0 means running, 1 means error/disabled
|
||||
|
||||
def show_feedback_status(self):
|
||||
if self.feedback_is_running():
|
||||
print("Loop is running, no error on interferometer.")
|
||||
else:
|
||||
print("Loop is not running, either it is turned off or an interferometer error occurred.")
|
||||
|
||||
|
||||
def show_analog_signals(self) -> dict:
|
||||
self.socket_put("As") # start sampling
|
||||
time.sleep(0.01)
|
||||
return_table = (self.socket_put_and_receive("Ar")).split(",")
|
||||
|
||||
number_of_samples = int(float(return_table[0]))
|
||||
signals = {
|
||||
"number_of_samples": number_of_samples,
|
||||
"piezo_0": float(return_table[1]),
|
||||
"piezo_1": float(return_table[2]),
|
||||
"cap_0": float(return_table[3]),
|
||||
"cap_1": float(return_table[4]),
|
||||
"cap_2": float(return_table[5]),
|
||||
"cap_3": float(return_table[6]),
|
||||
"cap_4": float(return_table[7]),
|
||||
}
|
||||
|
||||
t = PrettyTable()
|
||||
t.title = f"LamNI Analog Signals ({number_of_samples} samples)"
|
||||
t.field_names = ["Signal", "Value"]
|
||||
for key, val in signals.items():
|
||||
if key != "number_of_samples":
|
||||
t.add_row([key, f"{val:.4f}"])
|
||||
print(t)
|
||||
|
||||
return
|
||||
|
||||
def read_positions_from_sampler(self):
|
||||
# this was for reading after the scan completed
|
||||
@@ -347,6 +391,48 @@ class RtLamniController(Controller):
|
||||
)
|
||||
return bool(return_table[0])
|
||||
|
||||
def show_signal_strength_interferometer(self):
|
||||
# trigger SSI averaging before reading
|
||||
self.socket_put("J3")
|
||||
time.sleep(0.05)
|
||||
return_table = (self.socket_put_and_receive("J2")).split(",")
|
||||
ssi_0 = float(return_table[1])
|
||||
ssi_1 = float(return_table[2])
|
||||
|
||||
return_table_angle = (self.socket_put_and_receive("J7")).split(",")
|
||||
angle_running = bool(int(float(return_table_angle[0])))
|
||||
angle_position = float(return_table_angle[1])
|
||||
angle_signal = float(return_table_angle[2])
|
||||
|
||||
t = PrettyTable()
|
||||
t.title = "Interferometer signal strength"
|
||||
t.field_names = ["Axis", "Description", "Value", "Running"]
|
||||
t.add_row([0, "ST FZP horizontal", ssi_0, "-"])
|
||||
t.add_row([1, "ST FZP vertical", ssi_1, "-"])
|
||||
t.add_row([2, "Angle interferometer", angle_signal, angle_running])
|
||||
print(t)
|
||||
|
||||
if angle_running:
|
||||
print(f"Angle interferometer position: {angle_position:.4f} um")
|
||||
else:
|
||||
print("Warning: angle interferometer is not running.")
|
||||
|
||||
def show_interferometer_positions(self) -> dict:
|
||||
return_table = (self.socket_put_and_receive("J4")).split(",")
|
||||
loop_status = bool(int(float(return_table[0])))
|
||||
pos_y = float(return_table[1])
|
||||
pos_x = float(return_table[2])
|
||||
|
||||
t = PrettyTable()
|
||||
t.title = "LamNI Interferometer Positions"
|
||||
t.field_names = ["Axis", "Description", "Position (um)"]
|
||||
t.add_row([0, "X", f"{pos_x:.4f}"])
|
||||
t.add_row([1, "Y", f"{pos_y:.4f}"])
|
||||
print(t)
|
||||
print(f"Feedback loop running: {loop_status}")
|
||||
|
||||
return {"x": pos_x, "y": pos_y, "loop_running": loop_status}
|
||||
|
||||
def feedback_enable_with_reset(self):
|
||||
if not self.feedback_status_angle_lamni():
|
||||
self.feedback_disable_and_even_reset_lamni_angle_interferometer()
|
||||
|
||||
@@ -1,101 +0,0 @@
|
||||
"""Module to integrate the PandaBox for cSAXS measurements."""
|
||||
|
||||
import time
|
||||
|
||||
from bec_lib.logger import bec_logger
|
||||
from ophyd_devices import AsyncMultiSignal, StatusBase
|
||||
from ophyd_devices.devices.panda_box.panda_box import PandaBox, PandaState
|
||||
from pandablocks.responses import FrameData
|
||||
|
||||
logger = bec_logger.logger
|
||||
|
||||
|
||||
class PandaBoxCSAXS(PandaBox):
|
||||
|
||||
def on_init(self):
|
||||
super().on_init()
|
||||
self._acquisition_group = "burst"
|
||||
self._timeout_on_completed = 10
|
||||
|
||||
def on_stage(self):
|
||||
# TODO, adjust as seen fit.
|
||||
# Adjust the acquisition group based on scan parameters if needed
|
||||
if self.scan_info.msg.scan_type == "fly":
|
||||
self._acquisition_group = "fly"
|
||||
elif self.scan_info.msg.scan_type == "step":
|
||||
if self.scan_info.msg.scan_parameters["frames_per_trigger"] == 1:
|
||||
self._acquisition_group = "monitored"
|
||||
else:
|
||||
self._acquisition_group = "burst"
|
||||
|
||||
def on_complete(self):
|
||||
"""On complete is called after the scan is complete. We need to wait for the capture to complete before we can disarm the PandaBox."""
|
||||
|
||||
def _check_capture_complete():
|
||||
captured = 0
|
||||
start_time = time.monotonic()
|
||||
try:
|
||||
expected_points = int(self.scan_info.msg.num_points * self.scan_info.msg.scan_parameters.get("frames_per_trigger",1))
|
||||
while captured < expected_points:
|
||||
logger.info(
|
||||
f"Run with captured {captured} and expected points : {expected_points}."
|
||||
)
|
||||
ret = self.send_raw("*PCAP.CAPTURED?")
|
||||
captured = int(ret[0].split("=")[-1])
|
||||
time.sleep(0.01)
|
||||
if (time.monotonic() - start_time) > self._timeout_on_completed:
|
||||
raise TimeoutError(f"Pandabox {self.name} did not complete after {self._timeout_on_completed} with points captured {captured}/{expected_points}")
|
||||
finally:
|
||||
self._disarm()
|
||||
|
||||
_check_capture_complete()
|
||||
|
||||
# NOTE: This utility class allows to submit a blocking function to a thread and return a status object
|
||||
# that can be awaited for. This allows for asynchronous waiting for the capture to complete without blocking
|
||||
# the main duty cycle of the device server. The device server knows how to handle the status object (future)
|
||||
# and will wait for it to complete.
|
||||
# status = self.task_handler.submit_task(_check_capture_complete, run=True)
|
||||
|
||||
# status_panda_state = StatusBase(obj=self)
|
||||
# self.add_status_callback(
|
||||
# status, success=[PandaState.END, PandaState.DISARMED], failure=[PandaState.READY]
|
||||
# )
|
||||
# ret_status = status & status_panda_state
|
||||
# self.cancel_on_stop(ret_status)
|
||||
# return ret_status
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
import time
|
||||
|
||||
panda = PandaBoxCSAXS(
|
||||
name="omny_panda",
|
||||
host="omny-panda.psi.ch",
|
||||
signal_alias={
|
||||
"FMC_IN.VAL2.Value": "alias",
|
||||
"FMC_IN.VAL1.Min": "alias2",
|
||||
"FMC_IN.VAL1.Max": "alias3",
|
||||
"FMC_IN.VAL1.Mean": "alias4",
|
||||
},
|
||||
)
|
||||
panda.on_connected()
|
||||
status = StatusBase(obj=panda)
|
||||
panda.add_status_callback(
|
||||
status=status, success=[PandaState.DISARMED], failure=[PandaState.READY]
|
||||
)
|
||||
panda.stop()
|
||||
status.wait(timeout=2)
|
||||
panda.unstage()
|
||||
logger.info(f"Panda connected")
|
||||
ret = panda.stage()
|
||||
logger.info(f"Panda staged")
|
||||
ret = panda.pre_scan()
|
||||
ret.wait(timeout=5)
|
||||
logger.info(f"Panda pre scan done")
|
||||
time.sleep(5)
|
||||
panda.stop()
|
||||
st = panda.complete()
|
||||
st.wait(timeout=5)
|
||||
logger.info(f"Measurement completed")
|
||||
panda.unstage()
|
||||
logger.info(f"Panda Unstaged")
|
||||
@@ -1,102 +0,0 @@
|
||||
"""Module to integrate the PandaBox for cSAXS measurements."""
|
||||
|
||||
import time
|
||||
|
||||
from bec_lib.logger import bec_logger
|
||||
from ophyd_devices import AsyncMultiSignal, StatusBase
|
||||
from ophyd_devices.devices.panda_box.panda_box import PandaBox, PandaState
|
||||
from pandablocks.responses import FrameData
|
||||
|
||||
logger = bec_logger.logger
|
||||
|
||||
|
||||
class PandaBoxOMNY(PandaBox):
|
||||
|
||||
def on_init(self):
|
||||
super().on_init()
|
||||
self._acquisition_group = "burst"
|
||||
self._timeout_on_completed = 10
|
||||
|
||||
def on_stage(self):
|
||||
|
||||
# TODO, adjust as seen fit.
|
||||
# Adjust the acquisition group based on scan parameters if needed
|
||||
if self.scan_info.msg.scan_type == "fly":
|
||||
self._acquisition_group = "fly"
|
||||
elif self.scan_info.msg.scan_type == "step":
|
||||
if self.scan_info.msg.scan_parameters["frames_per_trigger"] == 1:
|
||||
self._acquisition_group = "monitored"
|
||||
else:
|
||||
self._acquisition_group = "burst"
|
||||
|
||||
def on_complete(self):
|
||||
"""On complete is called after the scan is complete. We need to wait for the capture to complete before we can disarm the PandaBox."""
|
||||
|
||||
def _check_capture_complete():
|
||||
captured = 0
|
||||
start_time = time.monotonic()
|
||||
try:
|
||||
expected_points = int(self.scan_info.msg.num_points * self.scan_info.msg.scan_parameters.get("frames_per_trigger",1))
|
||||
while captured < expected_points:
|
||||
logger.info(
|
||||
f"Run with captured {captured} and expected points : {expected_points}."
|
||||
)
|
||||
ret = self.send_raw("*PCAP.CAPTURED?")
|
||||
captured = int(ret[0].split("=")[-1])
|
||||
time.sleep(0.01)
|
||||
if (time.monotonic() - start_time) > self._timeout_on_completed:
|
||||
raise TimeoutError(f"Pandabox {self.name} did not complete after {self._timeout_on_completed} with points captured {captured}/{expected_points}")
|
||||
finally:
|
||||
self._disarm()
|
||||
|
||||
_check_capture_complete()
|
||||
|
||||
# NOTE: This utility class allows to submit a blocking function to a thread and return a status object
|
||||
# that can be awaited for. This allows for asynchronous waiting for the capture to complete without blocking
|
||||
# the main duty cycle of the device server. The device server knows how to handle the status object (future)
|
||||
# and will wait for it to complete.
|
||||
# status = self.task_handler.submit_task(_check_capture_complete, run=True)
|
||||
|
||||
# status_panda_state = StatusBase(obj=self)
|
||||
# self.add_status_callback(
|
||||
# status, success=[PandaState.END, PandaState.DISARMED], failure=[PandaState.READY]
|
||||
# )
|
||||
# ret_status = status & status_panda_state
|
||||
# self.cancel_on_stop(ret_status)
|
||||
# return ret_status
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
import time
|
||||
|
||||
panda = PandaBoxCSAXS(
|
||||
name="omny_panda",
|
||||
host="omny-panda.psi.ch",
|
||||
signal_alias={
|
||||
"FMC_IN.VAL2.Value": "alias",
|
||||
"FMC_IN.VAL1.Min": "alias2",
|
||||
"FMC_IN.VAL1.Max": "alias3",
|
||||
"FMC_IN.VAL1.Mean": "alias4",
|
||||
},
|
||||
)
|
||||
panda.on_connected()
|
||||
status = StatusBase(obj=panda)
|
||||
panda.add_status_callback(
|
||||
status=status, success=[PandaState.DISARMED], failure=[PandaState.READY]
|
||||
)
|
||||
panda.stop()
|
||||
status.wait(timeout=2)
|
||||
panda.unstage()
|
||||
logger.info(f"Panda connected")
|
||||
ret = panda.stage()
|
||||
logger.info(f"Panda staged")
|
||||
ret = panda.pre_scan()
|
||||
ret.wait(timeout=5)
|
||||
logger.info(f"Panda pre scan done")
|
||||
time.sleep(5)
|
||||
panda.stop()
|
||||
st = panda.complete()
|
||||
st.wait(timeout=5)
|
||||
logger.info(f"Measurement completed")
|
||||
panda.unstage()
|
||||
logger.info(f"Panda Unstaged")
|
||||
@@ -210,7 +210,7 @@ class LamNIFermatScan(ScanBase, LamNIMixin):
|
||||
arg_input = {}
|
||||
arg_bundle_size = {"bundle": len(arg_input), "min": None, "max": None}
|
||||
|
||||
def __init__(self, *args, parameter: dict = None, frames_per_trigger:int=1, exp_time:float=0,**kwargs):
|
||||
def __init__(self, *args, parameter: dict = None, **kwargs):
|
||||
"""
|
||||
A LamNI scan following Fermat's spiral.
|
||||
|
||||
@@ -230,10 +230,10 @@ class LamNIFermatScan(ScanBase, LamNIMixin):
|
||||
|
||||
Examples:
|
||||
>>> scans.lamni_fermat_scan(fov_size=[20], step=0.5, exp_time=0.1)
|
||||
>>> scans.lamni_fermat_scan(fov_size=[20, 25], center_x=0.02, center_y=0, shift_x=0, shift_y=0, angle=0, step=0.5, fov_circular=0, exp_time=0.1, frames_per_trigger=1)
|
||||
>>> scans.lamni_fermat_scan(fov_size=[20, 25], center_x=0.02, center_y=0, shift_x=0, shift_y=0, angle=0, step=0.5, fov_circular=0, exp_time=0.1)
|
||||
"""
|
||||
|
||||
super().__init__(parameter=parameter, frames_per_trigger=frames_per_trigger, exp_time=exp_time,**kwargs)
|
||||
super().__init__(parameter=parameter, **kwargs)
|
||||
self.axis = []
|
||||
scan_kwargs = parameter.get("kwargs", {})
|
||||
self.fov_size = scan_kwargs.get("fov_size")
|
||||
@@ -482,7 +482,6 @@ class LamNIFermatScan(ScanBase, LamNIMixin):
|
||||
yield from self.open_scan()
|
||||
yield from self.stage()
|
||||
yield from self.run_baseline_reading()
|
||||
yield from self.pre_scan()
|
||||
yield from self.scan_core()
|
||||
yield from self.finalize()
|
||||
yield from self.unstage()
|
||||
|
||||
@@ -52,7 +52,6 @@ class FlomniFermatScan(SyncFlyScanBase):
|
||||
angle: float = None,
|
||||
corridor_size: float = 3,
|
||||
parameter: dict = None,
|
||||
frames_per_trigger:int=1,
|
||||
**kwargs,
|
||||
):
|
||||
"""
|
||||
@@ -63,8 +62,7 @@ class FlomniFermatScan(SyncFlyScanBase):
|
||||
fovy(float) [um]: Fov in the piezo plane (i.e. piezo range). Max 100 um
|
||||
cenx(float) [um]: center position in x.
|
||||
ceny(float) [um]: center position in y.
|
||||
exp_time(float) [s]: exposure time per burst frame
|
||||
frames_per_trigger(int) : Number of burst frames per point
|
||||
exp_time(float) [s]: exposure time
|
||||
step(float) [um]: stepsize
|
||||
zshift(float) [um]: shift in z
|
||||
angle(float) [deg]: rotation angle (will rotate first)
|
||||
@@ -73,10 +71,10 @@ class FlomniFermatScan(SyncFlyScanBase):
|
||||
Returns:
|
||||
|
||||
Examples:
|
||||
>>> scans.flomni_fermat_scan(fovx=20, fovy=25, cenx=0.02, ceny=0, zshift=0, angle=0, step=0.5, exp_time=0.01, frames_per_trigger=1)
|
||||
>>> scans.flomni_fermat_scan(fovx=20, fovy=25, cenx=0.02, ceny=0, zshift=0, angle=0, step=0.5, exp_time=0.01)
|
||||
"""
|
||||
|
||||
super().__init__(parameter=parameter, exp_time=exp_time, frames_per_trigger=frames_per_trigger, **kwargs)
|
||||
super().__init__(parameter=parameter, exp_time=exp_time, **kwargs)
|
||||
self.show_live_table = False
|
||||
self.axis = []
|
||||
self.fovx = fovx
|
||||
@@ -325,7 +323,6 @@ class FlomniFermatScan(SyncFlyScanBase):
|
||||
yield from self.stage()
|
||||
yield from self.run_baseline_reading()
|
||||
yield from self._prepare_setup_part2()
|
||||
yield from self.pre_scan()
|
||||
yield from self.scan_core()
|
||||
yield from self.finalize()
|
||||
yield from self.unstage()
|
||||
|
||||
@@ -51,7 +51,6 @@ class OMNYFermatScan(SyncFlyScanBase):
|
||||
angle: float = None,
|
||||
corridor_size: float = 3,
|
||||
parameter: dict = None,
|
||||
frames_per_trigger:int=1,
|
||||
**kwargs,
|
||||
):
|
||||
"""
|
||||
@@ -63,7 +62,6 @@ class OMNYFermatScan(SyncFlyScanBase):
|
||||
cenx(float) [um]: center position in x.
|
||||
ceny(float) [um]: center position in y.
|
||||
exp_time(float) [s]: exposure time
|
||||
frames_per_trigger:int: Number of burst frames per trigger, defaults to 1.
|
||||
step(float) [um]: stepsize
|
||||
zshift(float) [um]: shift in z
|
||||
angle(float) [deg]: rotation angle (will rotate first)
|
||||
@@ -75,7 +73,7 @@ class OMNYFermatScan(SyncFlyScanBase):
|
||||
>>> scans.omny_fermat_scan(fovx=20, fovy=25, cenx=10, ceny=0, zshift=0, angle=0, step=2, exp_time=0.01)
|
||||
"""
|
||||
|
||||
super().__init__(parameter=parameter, exp_time=exp_time, frames_per_trigger=frames_per_trigger, **kwargs)
|
||||
super().__init__(parameter=parameter, **kwargs)
|
||||
self.axis = []
|
||||
self.fovx = fovx
|
||||
self.fovy = fovy
|
||||
@@ -301,7 +299,6 @@ class OMNYFermatScan(SyncFlyScanBase):
|
||||
yield from self.stage()
|
||||
yield from self.run_baseline_reading()
|
||||
yield from self._prepare_setup_part2()
|
||||
yield from self.pre_scan()
|
||||
yield from self.scan_core()
|
||||
yield from self.finalize()
|
||||
yield from self.unstage()
|
||||
|
||||
@@ -193,15 +193,14 @@ The basic scan function can be called by `scans.flomni_fermat_scan()` and offers
|
||||
| fovy (float) | Fov in the piezo plane (i.e. piezo range). Max 100 um |
|
||||
| cenx (float) | center position in x |
|
||||
| ceny (float) | center position in y |
|
||||
| exp_time (float) | exposure time per frame |
|
||||
| frames_per_trigger(int) | Number of burst frames per position |
|
||||
| exp_time (float) | exposure time |
|
||||
| step (float) | stepsize |
|
||||
| zshift (float) | shift in z |
|
||||
| angle (float) | rotation angle (will rotate first) |
|
||||
| corridor_size (float) | corridor size for the corridor optimization. Default 3 um |
|
||||
|
||||
Example:
|
||||
`scans.flomni_fermat_scan(fovx=20, fovy=25, cenx=0.02, ceny=0, zshift=0, angle=0, step=0.5, exp_time=0.01, frames_per_trigger=1)`
|
||||
`scans.flomni_fermat_scan(fovx=20, fovy=25, cenx=0.02, ceny=0, zshift=0, angle=0, step=0.5, exp_time=0.01)`
|
||||
|
||||
#### Overview of the alignment steps
|
||||
|
||||
|
||||
@@ -327,15 +327,14 @@ The basic scan function can be called by `scans.omny_fermat_scan()` and offers a
|
||||
| fovy (float) | Fov in the piezo plane (i.e. piezo range). Max 100 um |
|
||||
| cenx (float) | center position in x |
|
||||
| ceny (float) | center position in y |
|
||||
| exp_time (float) | exposure time per frame |
|
||||
| frames_per_trigger(int) | Number of burst frames per position |
|
||||
| exp_time (float) | exposure time |
|
||||
| step (float) | stepsize |
|
||||
| zshift (float) | shift in z |
|
||||
| angle (float) | rotation angle (will rotate first) |
|
||||
| corridor_size (float) | corridor size for the corridor optimization. Default 3 um |
|
||||
|
||||
Example:
|
||||
`scans.omny_fermat_scan(fovx=20, fovy=25, cenx=0.02, ceny=0, zshift=0, angle=0, step=0.5, exp_time=0.01, frames_per_trigger=1)`
|
||||
`scans.omny_fermat_scan(fovx=20, fovy=25, cenx=0.02, ceny=0, zshift=0, angle=0, step=0.5, exp_time=0.01)`
|
||||
|
||||
#### Overview of the alignment steps
|
||||
|
||||
|
||||
@@ -35,16 +35,16 @@ def test_save_frame(bec_client_mock):
|
||||
lamni = LamNI(client)
|
||||
align = XrayEyeAlign(client, lamni)
|
||||
with mock.patch(
|
||||
"csaxs_bec.bec_ipython_client.plugins.LamNI.x_ray_eye_align.epics_put"
|
||||
"csaxs_bec.bec_ipython_client.plugins.LamNI.alignment.epics_put"
|
||||
) as epics_put_mock:
|
||||
align.save_frame()
|
||||
epics_put_mock.assert_called_once_with("XOMNYI-XEYE-SAVFRAME:0", 1)
|
||||
|
||||
|
||||
def test_update_frame(bec_client_mock):
|
||||
epics_put = "csaxs_bec.bec_ipython_client.plugins.LamNI.x_ray_eye_align.epics_put"
|
||||
epics_get = "csaxs_bec.bec_ipython_client.plugins.LamNI.x_ray_eye_align.epics_get"
|
||||
fshopen = "csaxs_bec.bec_ipython_client.plugins.LamNI.x_ray_eye_align.fshopen"
|
||||
epics_put = "csaxs_bec.bec_ipython_client.plugins.LamNI.alignment.epics_put"
|
||||
epics_get = "csaxs_bec.bec_ipython_client.plugins.LamNI.alignment.epics_get"
|
||||
fshopen = "csaxs_bec.bec_ipython_client.plugins.LamNI.alignment.fshopen"
|
||||
client = bec_client_mock
|
||||
client.device_manager.devices.xeye = DeviceBase(
|
||||
name="xeye",
|
||||
|
||||
Reference in New Issue
Block a user