refactor: export from bec repo

This commit is contained in:
2022-12-09 11:36:05 +01:00
parent 172d6fe570
commit a8eae094eb
16 changed files with 4545 additions and 0 deletions

8
.gitignore vendored Normal file
View File

@@ -0,0 +1,8 @@
**/venv
**/.idea
*.log
**/__pycache__
.DS_Store
**/out
**/.vscode
**/.pytest_cache

BIN
bec_client/.DS_Store vendored Normal file

Binary file not shown.

245
bec_client/hli/spec_hli.py Normal file
View File

@@ -0,0 +1,245 @@
from bec_client.scan_manager import ScanReport
from bec_utils.devicemanager import Device
# pylint:disable=undefined-variable
# pylint: disable=too-many-arguments
def dscan(
motor1: Device, m1_from: float, m1_to: float, steps: int, exp_time: float, **kwargs
) -> ScanReport:
"""Relative line scan with one device.
Args:
motor1 (Device): Device that should be scanned.
m1_from (float): Start position relative to the current position.
m1_to (float): End position relative to the current position.
steps (int): Number of steps.
exp_time (float): Exposure time.
Returns:
ScanReport: Status object.
Examples:
>>> dscan(dev.motor1, -5, 5, 10, 0.1)
"""
return scans.line_scan(
motor1, m1_from, m1_to, steps=steps, exp_time=exp_time, relative=True, **kwargs
)
def d2scan(
motor1: Device,
m1_from: float,
m1_to: float,
motor2: Device,
m2_from: float,
m2_to: float,
steps: int,
exp_time: float,
**kwargs
) -> ScanReport:
"""Relative line scan with two devices.
Args:
motor1 (Device): First device that should be scanned.
m1_from (float): Start position of the first device relative to its current position.
m1_to (float): End position of the first device relative to its current position.
motor2 (Device): Second device that should be scanned.
m2_from (float): Start position of the second device relative to its current position.
m2_to (float): End position of the second device relative to its current position.
steps (int): Number of steps.
exp_time (float): Exposure time
Returns:
ScanReport: Status object.
Examples:
>>> d2scan(dev.motor1, -5, 5, dev.motor2, -8, 8, 10, 0.1)
"""
return scans.line_scan(
motor1,
m1_from,
m1_to,
motor2,
m2_from,
m2_to,
steps=steps,
exp_time=exp_time,
relative=True,
**kwargs
)
def ascan(motor1, m1_from, m1_to, steps, exp_time, **kwargs):
"""Absolute line scan with one device.
Args:
motor1 (Device): Device that should be scanned.
m1_from (float): Start position.
m1_to (float): End position.
steps (int): Number of steps.
exp_time (float): Exposure time.
Returns:
ScanReport: Status object.
Examples:
>>> ascan(dev.motor1, -5, 5, 10, 0.1)
"""
return scans.line_scan(
motor1, m1_from, m1_to, steps=steps, exp_time=exp_time, relative=False, **kwargs
)
def a2scan(motor1, m1_from, m1_to, motor2, m2_from, m2_to, steps, exp_time, **kwargs):
"""Absolute line scan with two devices.
Args:
motor1 (Device): First device that should be scanned.
m1_from (float): Start position of the first device.
m1_to (float): End position of the first device.
motor2 (Device): Second device that should be scanned.
m2_from (float): Start position of the second device.
m2_to (float): End position of the second device.
steps (int): Number of steps.
exp_time (float): Exposure time
Returns:
ScanReport: Status object.
Examples:
>>> a2scan(dev.motor1, -5, 5, dev.motor2, -8, 8, 10, 0.1)
"""
return scans.line_scan(
motor1,
m1_from,
m1_to,
motor2,
m2_from,
m2_to,
steps=steps,
exp_time=exp_time,
relative=False,
**kwargs
)
def dmesh(motor1, m1_from, m1_to, m1_steps, motor2, m2_from, m2_to, m2_steps, exp_time, **kwargs):
"""Relative mesh scan (grid scan) with two devices.
Args:
motor1 (Device): First device that should be scanned.
m1_from (float): Start position of the first device relative to its current position.
m1_to (float): End position of the first device relative to its current position.
m1_steps (int): Number of steps for motor1.
motor2 (Device): Second device that should be scanned.
m2_from (float): Start position of the second device relative to its current position.
m2_to (float): End position of the second device relative to its current position.
m2_steps (int): Number of steps for motor2.
exp_time (float): Exposure time
Returns:
ScanReport: Status object.
Examples:
>>> dmesh(dev.motor1, -5, 5, 10, dev.motor2, -8, 8, 10, 0.1)
"""
return scans.grid_scan(
motor1,
m1_from,
m1_to,
m1_steps,
motor2,
m2_from,
m2_to,
m2_steps,
exp_time=exp_time,
relative=True,
)
def amesh(motor1, m1_from, m1_to, m1_steps, motor2, m2_from, m2_to, m2_steps, exp_time, **kwargs):
"""Absolute mesh scan (grid scan) with two devices.
Args:
motor1 (Device): First device that should be scanned.
m1_from (float): Start position of the first device.
m1_to (float): End position of the first device.
m1_steps (int): Number of steps for motor1.
motor2 (Device): Second device that should be scanned.
m2_from (float): Start position of the second device.
m2_to (float): End position of the second device.
m2_steps (int): Number of steps for motor2.
exp_time (float): Exposure time
Returns:
ScanReport: Status object.
Examples:
>>> amesh(dev.motor1, -5, 5, 10, dev.motor2, -8, 8, 10, 0.1)
"""
return scans.grid_scan(
motor1,
m1_from,
m1_to,
m1_steps,
motor2,
m2_from,
m2_to,
m2_steps,
exp_time=exp_time,
relative=False,
)
def umv(*args) -> ScanReport:
"""Updated absolute move (i.e. blocking) for one or more devices.
Returns:
ScanReport: Status object.
Examples:
>>> umv(dev.samx, 1)
>>> umv(dev.samx, 1, dev.samy, 2)
"""
return scans.umv(*args, relative=False)
def umvr(*args) -> ScanReport:
"""Updated relative move (i.e. blocking) for one or more devices.
Returns:
ScanReport: Status object.
Examples:
>>> umvr(dev.samx, 1)
>>> umvr(dev.samx, 1, dev.samy, 2)
"""
return scans.umv(*args, relative=True)
def mv(*args) -> ScanReport:
"""Absolute move for one or more devices.
Returns:
ScanReport: Status object.
Examples:
>>> mv(dev.samx, 1)
>>> mv(dev.samx, 1, dev.samy, 2)
"""
return scans.mv(*args, relative=False)
def mvr(*args) -> ScanReport:
"""Relative move for one or more devices.
Returns:
ScanReport: Status object.
Examples:
>>> mvr(dev.samx, 1)
>>> mvr(dev.samx, 1, dev.samy, 2)
"""
return scans.mv(*args, relative=True)

BIN
bec_client/plugins/.DS_Store vendored Normal file

Binary file not shown.

Binary file not shown.

After

Width:  |  Height:  |  Size: 48 KiB

View File

@@ -0,0 +1,2 @@
from .load_additional_correction import lamni_read_additional_correction
from .x_ray_eye_align import LamNI, XrayEyeAlign

View File

@@ -0,0 +1,269 @@
def bl_show_all '{
local gap
printf("beamline status at %s:\n",date())
if (!_bl_hall_temperature_ok()) {
bl_hall_temperature
printf("\n")
}
if (_bl_sls_status_unusual()) {
bl_sls_status
} else {
bl_ring_current
}
bl_chk_beam _show
printf("\n")
printf("U19 ID gap : ",gap)
gap = _id_get_gap_mm()
if (gap >= 8) {
text_bf
}
printf("%.3f mm\n",gap)
text_non_bf
if (!_id_loss_rate_ok()) {
id_loss_rate
}
bl_shutter_status
if (_bl_cvd_filter_open()) {
text_bf
printf("CVD diamond filter : open / out\n")
text_non_bf
}
if (!_bl_xbox_valve_es1_open()) {
bl_xbox_valve_es1 _show
}
if (_bl_ln2_non_standard()) {
text_bf
printf("\nNon standard liquid nitrogen cooling-warning parameters occur. Please report this to your local contact.\n")
text_non_bf
printf("The macro bl_ln2_warn can be used to control this e-mail warning feature.\n")
bl_ln2_warn "show"
printf("\n")
}
printf("\n")
bl_flight_tube_pressure
printf("\n")
bl_attended _show
_bl_check_alarm_records(1,1)
printf("\n")
bl_op_msg
}'
def _bl_hall_temperature_ok() '{
local temp_ok
local stat
temp_ok = 1
# EH T02 average temperature
stat = epics_get("ILUUL-02AV:TEMP")
if ((stat < 23.0) || (stat > 26.0)) {
temp_ok = 0
}
# EH T02 temperature at T0204 axis 16
stat = epics_get("ILUUL-0200-EB104:TEMP")
if ((stat < 23.0) || (stat > 26.0)) {
temp_ok = 0
}
# EH T02 temperature at T0205 axis 18
stat = epics_get("ILUUL-0200-EB105:TEMP")
if ((stat < 23.0) || (stat > 26.0)) {
temp_ok = 0
}
return (temp_ok)
}'
# ----------------------------------------------------------------------
def bl_hall_temperature '{
local stat
stat = epics_get("ILUUL-02AV:TEMP")
printf("hall T02 average temperature : ")
if ((stat < 23.0) || (stat > 25.0)) {
text_bf
}
printf("%.2f deg.C\n",stat)
text_non_bf
stat = epics_get("ILUUL-0200-EB104:TEMP")
printf("hall temperature at T0204 axis 16 : ")
if ((stat < 23) || (stat > 25)) {
text_bf
}
printf("%.2f deg.C\n",stat)
text_non_bf
stat = epics_get("ILUUL-0200-EB105:TEMP")
printf("hall temperature at T0205 axis 18 : ")
if ((stat < 23) || (stat > 25)) {
text_bf
}
printf("%.2f deg.C\n",stat)
text_non_bf
# stat = epics_get("ILUUL-0300-EB102:TEMP")
# printf("EH T03 temperature at T0302 axis 21: ")
# if ((stat < 23) || (stat > 25)) {
# text_bf
# }
# printf("%.2f deg.C\n",stat)
# text_non_bf
}'
def _bl_sls_status_unusual() '{
local unusual
local stat
unusual = 0
stat = epics_get("X12SA-SR-VAC:SETPOINT")
if (stat != "OK") {
unusual = 1
}
stat = epics_get("ACOAU-ACCU:OP-MODE.VAL")
if ((stat != "Light Available") && (stat != "Light-Available")) {
unusual = 1
}
stat = epics_get("ALIRF-GUN:INJ-MODE")
if (stat != "TOP-UP") {
unusual = 1
}
# current threshold
stat = epics_get("ALIRF-GUN:CUR-LOWLIM")
if (stat < 350) {
unusual = 1
}
# current deadband
stat = epics_get("ALIRF-GUN:CUR-DBAND")
if (stat > 2) {
unusual = 1
}
# orbit feedback mode
stat = epics_get("ARIDI-BPM:OFB-MODE")
if (stat != "fast") {
unusual = 1
}
# fast orbit feedback
stat = epics_get("ARIDI-BPM:FOFBSTATUS-G")
if (stat != "running") {
unusual = 1
}
return(unusual)
}'
def bl_sls_status '{
local stat
stat = epics_get("ACOAU-ACCU:OP-MODE.VAL")
printf("SLS status : ")
if ((stat != "Light Available") && (stat != "Light-Available")) {
text_bf
}
printf("%s\n",stat)
text_non_bf
stat = epics_get("ALIRF-GUN:INJ-MODE")
printf("SLS injection mode : ")
if (stat != "TOP-UP") {
text_bf
}
printf("%s\n",stat)
text_non_bf
stat = epics_get("ALIRF-GUN:CUR-LOWLIM")
printf("SLS current threshold : ")
if (stat < 350) {
text_bf
}
printf("%7.3f\n",stat)
text_non_bf
stat = epics_get("ALIRF-GUN:CUR-DBAND")
printf("SLS current deadband : ")
if (stat > 2) {
text_bf
}
printf("%7.3f\n",stat)
text_non_bf
stat = epics_get("ACORF-FILL:PAT-SELECT")
printf("SLS filling pattern : ")
printf("%s\n",stat)
bl_ring_current
stat = epics_get("ARIDI-PCT:TAU-HOUR")
printf("SLS filling life time : ")
printf("%.2f h\n",stat)
stat = epics_get("ARIDI-BPM:OFB-MODE")
printf("orbit feedback mode : ")
if (stat != "fast") {
text_bf
}
printf("%s\n",stat)
text_non_bf
stat = epics_get("ARIDI-BPM:FOFBSTATUS-G")
printf("fast orbit feedback : ")
if (stat != "running") {
text_bf
}
printf("%s\n",stat)
text_non_bf
}'
def _bl_get_ring_current() '{
return epics_get("ARIDI-PCT:CURRENT")
}'
# ----------------------------------------------------------------------
def _bl_no_ring_current() '{
# set an arbitrary current limit of 100mA as no-beam limit
if (_bl_get_ring_current() < 100) {
return 1
} else {
return 0
}
}'
# ----------------------------------------------------------------------
def bl_ring_current '{
local curr
curr = _bl_get_ring_current()
if (curr < 300) {
text_bf
}
printf("SLS ring current : %.3f mA\n",curr)
text_non_bf
}'

View File

@@ -0,0 +1,154 @@
import time
from rich.console import Console
from rich.table import Table
from rich import box
from bec_client.plugins.cSAXS import fshclose
class LamNIOpticsMixin:
@staticmethod
def _get_user_param_safe(device, var):
param = dev[device].user_parameter
if not param or param.get(var) is None:
raise ValueError(f"Device {device} has no user parameter definition for {var}.")
return param.get(var)
def leyey_out(self):
self.loptics_in()
fshclose()
leyey_out = self._get_user_param_safe("leyey", "out")
umv(dev.leyey, leyey_out)
epics_put("XOMNYI-XEYE-ACQ:0", 2)
umv(dev.dttrz, 5830, dev.fttrz, 3338)
def leye_in(self):
bec.queue.next_dataset_number += 1
umv(dev.dttrz, 5830 + 600, dev.fttrz, 3338 + 600)
while True:
moved_out = (input("Did the flight tube move out? (Y/n)") or "y").lower()
if moved_out == "y":
break
if moved_out == "n":
return
leyex_in = self._get_user_param_safe("leyex", "in")
leyey_in = self._get_user_param_safe("leyey", "in")
umv(dev.leyex, leyex_in, dev.leyey, leyey_in)
self.align.update_frame()
def _lfzp_in(self):
loptx_in = self._get_user_param_safe("loptx", "in")
lopty_in = self._get_user_param_safe("lopty", "in")
umv(
dev.loptx, loptx_in, dev.lopty, lopty_in
) # for 7.2567 keV and 150 mu, 60 nm fzp, loptz 83.6000 for propagation 1.4 mm
def lfzp_in(self):
"""
move in the lamni zone plate.
This will disable rt feedback, move the FZP and re-enabled the feedback.
"""
if "rtx" in dev and dev.rtx.enabled:
dev.rtx.feedback_disable()
self._lfzp_in()
if "rtx" in dev and dev.rtx.enabled:
dev.rtx.feedback_enable_with_reset()
def loptics_in(self):
"""
Move in the lamni optics, including the FZP and the OSA.
"""
self.lfzp_in()
self.losa_in()
def loptics_out(self):
"""Move out the lamni optics"""
if "rtx" in dev and dev.rtx.enabled:
dev.rtx.feedback_disable()
# self.lcs_out()
self.losa_out()
loptx_out = self._get_user_param_safe("loptx", "out")
lopty_out = self._get_user_param_safe("lopty", "out")
umv(dev.loptx, loptx_out, dev.lopty, lopty_out)
if "rtx" in dev and dev.rtx.enabled:
time.sleep(1)
dev.rtx.feedback_enable_with_reset()
def lcs_in(self):
# umv lcsx -1.852 lcsy -0.095
pass
def lcs_out(self):
umv(dev.lcsy, 3)
def losa_in(self):
# 6.2 keV, 170 um FZP
# umv(dev.losax, -1.4450000, dev.losay, -0.1800)
# umv(dev.losaz, -1)
# 6.7, 170
# umv(dev.losax, -1.4850, dev.losay, -0.1930)
# umv(dev.losaz, 1.0000)
# 7.2, 150
losax_in = self._get_user_param_safe("losax", "in")
losay_in = self._get_user_param_safe("losay", "in")
losaz_in = self._get_user_param_safe("losaz", "in")
umv(dev.losax, losax_in, dev.losay, losay_in)
umv(dev.losaz, losaz_in)
# 11 kev
# umv(dev.losax, -1.161000, dev.losay, -0.196)
# umv(dev.losaz, 1.0000)
def losa_out(self):
losay_out = self._get_user_param_safe("losay", "out")
losaz_out = self._get_user_param_safe("losaz", "out")
umv(dev.losaz, losaz_out)
umv(dev.losay, losay_out)
def lfzp_info(self):
loptz_val = dev.loptz.read()["loptz"]["value"]
distance = -loptz_val + 85.6 + 52
print(f"The sample is in a distance of {distance:.1f} mm from the FZP.")
diameters = []
diameters[0] = 80e-6
diameters[1] = 100e-6
diameters[2] = 120e-6
diameters[3] = 150e-6
diameters[4] = 170e-6
diameters[5] = 200e-6
diameters[6] = 220e-6
diameters[7] = 250e-6
mokev_val = dev.mokev.read()["mokev"]["value"]
console = Console()
table = Table(
title=f"At the current energy of {mokev_val:.4f} keV we have following options:",
box=box.SQUARE,
)
table.add_column("Diameter", justify="center")
table.add_column("Focal distance", justify="center")
table.add_column("Current beam size", justify="center")
wavelength = 1.2398e-9 / mokev_val
for diameter in diameters:
outermost_zonewidth = 60e-9
focal_distance = diameter * outermost_zonewidth / wavelength
beam_size = (
-diameter / (focal_distance * 1000) * (focal_distance * 1000 - distance) * 1e6
)
table.add_row(f"{diameter}", f"{focal_distance:.2f} mm", f"{beam_size:.2f} microns")
console.print(table)
print("OSA Information:")
# print(f"Current losaz %.1f\n", A[losaz])
# print("The OSA will collide with the sample plane at %.1f\n\n", 89.3-A[loptz])
print(
"The numbers presented here are for a sample in the plane of the lamni sample holder.\n"
)

View File

@@ -0,0 +1,23 @@
def lamni_read_additional_correction():
# "additional_correction_shift"
# [0][] x , [1][] y, [2][] angle, [3][0] number of elements
import numpy as np
with open("correction_lamni_um_S01405_.txt", "r") as f:
num_elements = f.readline()
int_num_elements = int(num_elements.split(" ")[2])
print(int_num_elements)
corr_pos_x = []
corr_pos_y = []
corr_angle = []
for j in range(0, int_num_elements * 3):
line = f.readline()
value = line.split(" ")[2]
name = line.split(" ")[0].split("[")[0]
if name == "corr_pos_x":
corr_pos_x.append(value)
elif name == "corr_pos_y":
corr_pos_y.append(value)
elif name == "corr_angle":
corr_angle.append(value)
return (corr_pos_x, corr_pos_y, corr_angle, num_elements)

View File

@@ -0,0 +1,888 @@
import builtins
import datetime
import math
import os
import subprocess
import threading
import time
from collections import defaultdict
from pathlib import Path
import numpy as np
from bec_utils import bec_logger
from bec_utils.logbook_connector import LogbookMessage
from bec_utils.pdf_writer import PDFWriter
from typeguard import typechecked
from bec_client.plugins.cSAXS import epics_get, epics_put, fshclose, fshopen
from .lamni_optics_mixin import LamNIOpticsMixin
logger = bec_logger.logger
class XrayEyeAlign:
# pixel calibration, multiply to get mm
# PIXEL_CALIBRATION = 0.2/209 #.2 with binning
PIXEL_CALIBRATION = 0.2 / 218 # .2 with binning
def __init__(self, client, lamni) -> None:
self.client = client
self.lamni = lamni
self.device_manager = client.device_manager
self.scans = client.scans
self.xeye = self.device_manager.devices.xeye
self.alignment_values = defaultdict(list)
self.shift_xy = [0, 0]
self._xray_fov_xy = [0, 0]
def save_frame(self):
epics_put("XOMNYI-XEYE-SAVFRAME:0", 1)
def update_frame(self):
epics_put("XOMNYI-XEYE-ACQDONE:0", 0)
# start live
epics_put("XOMNYI-XEYE-ACQ:0", 1)
# wait for start live
while epics_get("XOMNYI-XEYE-ACQDONE:0") == 0:
time.sleep(0.5)
print("waiting for live view to start...")
fshopen()
epics_put("XOMNYI-XEYE-ACQDONE:0", 0)
while epics_get("XOMNYI-XEYE-ACQDONE:0") == 0:
print("waiting for new frame...")
time.sleep(0.5)
time.sleep(0.5)
# stop live view
epics_put("XOMNYI-XEYE-ACQ:0", 0)
time.sleep(1)
# fshclose
print("got new frame")
def _disable_rt_feedback(self):
self.device_manager.devices.rtx.controller.feedback_disable()
def _enable_rt_feedback(self):
self.device_manager.devices.rtx.controller.feedback_enable_with_reset()
def tomo_rotate(self, val: float):
# pylint: disable=undefined-variable
umv(self.device_manager.devices.lsamrot, val)
def get_tomo_angle(self):
return self.device_manager.devices.lsamrot.readback.read()["lsamrot"]["value"]
def update_fov(self, k: int):
self._xray_fov_xy[0] = max(epics_get(f"XOMNYI-XEYE-XWIDTH_X:{k}"), self._xray_fov_xy[0])
self._xray_fov_xy[1] = max(0, self._xray_fov_xy[0])
@property
def movement_buttons_enabled(self):
return [epics_get("XOMNYI-XEYE-ENAMVX:0"), epics_get("XOMNYI-XEYE-ENAMVY:0")]
@movement_buttons_enabled.setter
def movement_buttons_enabled(self, enabled: bool):
enabled = int(enabled)
epics_put("XOMNYI-XEYE-ENAMVX:0", enabled)
epics_put("XOMNYI-XEYE-ENAMVY:0", enabled)
def send_message(self, msg: str):
epics_put("XOMNYI-XEYE-MESSAGE:0.DESC", msg)
def align(self):
# this makes sure we are in a defined state
self._disable_rt_feedback()
epics_put("XOMNYI-XEYE-PIXELSIZE:0", self.PIXEL_CALIBRATION)
self._enable_rt_feedback()
# initialize
# disable movement buttons
self.movement_buttons_enabled = False
epics_put("XOMNYI-XEYE-ACQ:0", 0)
self.send_message("please wait...")
# put sample name
epics_put("XOMNYI-XEYE-SAMPLENAME:0.DESC", "Let us LAMNI...")
# first step
self._disable_rt_feedback()
k = 0
# move zone plate in, eye in to get beam position
self.lamni.lfzp_in()
self.update_frame()
# enable submit buttons
self.movement_buttons_enabled = False
epics_put("XOMNYI-XEYE-SUBMIT:0", 0)
epics_put("XOMNYI-XEYE-STEP:0", 0)
self.send_message("Submit center value of FZP.")
while True:
if epics_get("XOMNYI-XEYE-SUBMIT:0") == 1:
val_x = epics_get(f"XOMNYI-XEYE-XVAL_X:{k}") * self.PIXEL_CALIBRATION # in mm
val_y = epics_get(f"XOMNYI-XEYE-YVAL_Y:{k}") * self.PIXEL_CALIBRATION # in mm
self.alignment_values[k] = [val_x, val_y]
print(
f"Clicked position {k}: x {self.alignment_values[k][0]}, y {self.alignment_values[k][1]}"
)
if k == 0: # received center value of FZP
self.send_message("please wait ...")
# perform movement: FZP out, Sample in
self.lamni.loptics_out()
epics_put("XOMNYI-XEYE-SUBMIT:0", -1) # disable submit button
self.movement_buttons_enabled = False
print("Moving sample in, FZP out")
self._disable_rt_feedback()
time.sleep(0.3)
self._enable_rt_feedback()
time.sleep(0.3)
# zero is now at the center
self.update_frame()
self.send_message("Go and find the sample")
epics_put("XOMNYI-XEYE-SUBMIT:0", 0)
self.movement_buttons_enabled = True
elif (
k == 1
): # received sample center value at samroy 0 ie the final base shift values
print(
f"Base shift values from movement are x {self.shift_xy[0]}, y {self.shift_xy[1]}"
)
self.shift_xy[0] += (
self.alignment_values[0][0] - self.alignment_values[1][0]
) * 1000
self.shift_xy[1] += (
self.alignment_values[1][1] - self.alignment_values[0][1]
) * 1000
print(
f"Base shift values from movement and clicked position are x {self.shift_xy[0]}, y {self.shift_xy[1]}"
)
self.scans.lamni_move_to_scan_center(
self.shift_xy[0] / 1000, self.shift_xy[1] / 1000, self.get_tomo_angle()
).wait()
self.send_message("please wait ...")
epics_put("XOMNYI-XEYE-SUBMIT:0", -1) # disable submit button
self.movement_buttons_enabled = False
time.sleep(1)
self.scans.lamni_move_to_scan_center(
self.shift_xy[0] / 1000, self.shift_xy[1] / 1000, self.get_tomo_angle()
).wait()
epics_put("XOMNYI-XEYE-ANGLE:0", self.get_tomo_angle())
self.update_frame()
self.send_message("Submit sample center and FOV (0 deg)")
epics_put("XOMNYI-XEYE-SUBMIT:0", 0)
self.update_fov(k)
elif 1 < k < 10: # received sample center value at samroy 0 ... 315
self.send_message("please wait ...")
epics_put("XOMNYI-XEYE-SUBMIT:0", -1) # disable submit button
# we swtich feedback off before rotating to not have it on and off again later for smooth operation
self._disable_rt_feedback()
self.tomo_rotate((k - 1) * 45 - 45 / 2)
self.scans.lamni_move_to_scan_center(
self.shift_xy[0] / 1000, self.shift_xy[1] / 1000, self.get_tomo_angle()
).wait()
self._disable_rt_feedback()
self.tomo_rotate((k - 1) * 45)
self.scans.lamni_move_to_scan_center(
self.shift_xy[0] / 1000, self.shift_xy[1] / 1000, self.get_tomo_angle()
).wait()
epics_put("XOMNYI-XEYE-ANGLE:0", self.get_tomo_angle())
self.update_frame()
self.send_message("Submit sample center")
epics_put("XOMNYI-XEYE-SUBMIT:0", 0)
epics_put("XOMNYI-XEYE-ENAMVX:0", 1)
self.update_fov(k)
elif k == 10: # received sample center value at samroy 270 and done
self.send_message("done...")
epics_put("XOMNYI-XEYE-SUBMIT:0", -1) # disable submit button
self.movement_buttons_enabled = False
self.update_fov(k)
break
k += 1
epics_put("XOMNYI-XEYE-STEP:0", k)
if k < 2:
# allow movements, store movements to calculate center
_xrayeyalignmvx = epics_get("XOMNYI-XEYE-MVX:0")
_xrayeyalignmvy = epics_get("XOMNYI-XEYE-MVY:0")
if _xrayeyalignmvx != 0 or _xrayeyalignmvy != 0:
self.shift_xy[0] = self.shift_xy[0] + _xrayeyalignmvx
self.shift_xy[1] = self.shift_xy[1] + _xrayeyalignmvy
self.scans.lamni_move_to_scan_center(
self.shift_xy[0] / 1000, self.shift_xy[1] / 1000, self.get_tomo_angle()
).wait()
print(
f"Current center horizontal {self.shift_xy[0]} vertical {self.shift_xy[1]}"
)
epics_put("XOMNYI-XEYE-MVY:0", 0)
epics_put("XOMNYI-XEYE-MVX:0", 0)
self.update_frame()
time.sleep(0.2)
self.write_output()
fovx = self._xray_fov_xy[0] * self.PIXEL_CALIBRATION * 1000 / 2
fovy = self._xray_fov_xy[1] * self.PIXEL_CALIBRATION * 1000 / 2
print(
f"The largest field of view from the xrayeyealign was \nfovx = {fovx:.0f} microns, fovy = {fovy:.0f} microns"
)
print("Use matlab routine to fit the current alignment...")
print(
f"This additional shift is applied to the base shift values\n which are x {self.shift_xy[0]}, y {self.shift_xy[1]}"
)
self._disable_rt_feedback()
self.tomo_rotate(0)
print(
"\n\nNEXT LOAD ALIGNMENT PARAMETERS\nby running lamni.align.read_alignment_parameters()\n"
)
self.client.set_global_var("tomo_fov_offset", self.shift_xy)
def write_output(self):
with open(
os.path.expanduser("~/Data10/specES1/internal/xrayeye_alignmentvalues"), "w"
) as alignment_values_file:
alignment_values_file.write(f"angle\thorizontal\tvertical\n")
for k in range(2, 11):
fovx_offset = (self.alignment_values[0][0] - self.alignment_values[k][0]) * 1000
fovy_offset = (self.alignment_values[k][1] - self.alignment_values[0][1]) * 1000
print(
f"Writing to file new alignment: number {k}, value x {fovx_offset}, y {fovy_offset}"
)
alignment_values_file.write(f"{(k-2)*45}\t{fovx_offset}\t{fovy_offset}\n")
def read_alignment_parameters(self, dir_path=os.path.expanduser("~/Data10/specES1/internal/")):
tomo_fit_xray_eye = np.zeros((2, 3))
with open(os.path.join(dir_path, "ptychotomoalign_Ax.txt"), "r") as file:
tomo_fit_xray_eye[0][0] = file.readline()
with open(os.path.join(dir_path, "ptychotomoalign_Bx.txt"), "r") as file:
tomo_fit_xray_eye[0][1] = file.readline()
with open(os.path.join(dir_path, "ptychotomoalign_Cx.txt"), "r") as file:
tomo_fit_xray_eye[0][2] = file.readline()
with open(os.path.join(dir_path, "ptychotomoalign_Ay.txt"), "r") as file:
tomo_fit_xray_eye[1][0] = file.readline()
with open(os.path.join(dir_path, "ptychotomoalign_By.txt"), "r") as file:
tomo_fit_xray_eye[1][1] = file.readline()
with open(os.path.join(dir_path, "ptychotomoalign_Cy.txt"), "r") as file:
tomo_fit_xray_eye[1][2] = file.readline()
self.client.set_global_var("tomo_fit_xray_eye", tomo_fit_xray_eye.tolist())
# x amp, phase, offset, y amp, phase, offset
# 0 0 0 1 0 2 1 0 1 1 1 2
print("New alignment parameters loaded from X-ray eye")
print(
f"X Amplitude {tomo_fit_xray_eye[0][0]},"
f"X Phase {tomo_fit_xray_eye[0][1]}, "
f"X Offset {tomo_fit_xray_eye[0][2]},"
f"Y Amplitude {tomo_fit_xray_eye[1][0]},"
f"Y Phase {tomo_fit_xray_eye[1][1]},"
f"Y Offset {tomo_fit_xray_eye[1][2]}"
)
class LamNI(LamNIOpticsMixin):
def __init__(self, client):
super().__init__()
self.client = client
self.align = XrayEyeAlign(client, self)
self.corr_pos_x = []
self.corr_pos_y = []
self.corr_angle = []
self.check_shutter = True
self.check_light_available = True
self.check_fofb = True
self._check_msgs = []
self.tomo_id = None
self._beam_is_okay = True
self._stop_beam_check_event = None
self.beam_check_thread = None
def get_beamline_checks_enabled(self):
print(
f"Shutter: {self.check_shutter}\nFOFB: {self.check_fofb}\nLight available: {self.check_light_available}"
)
def set_beamline_checks_enabled(self, val: bool):
self.check_shutter = val
self.check_light_available = val
self.check_fofb = val
self.get_beamline_checks_enabled()
@property
def tomo_fovx_offset(self):
val = self.client.get_global_var("tomo_fov_offset")
if val is None:
return 0.0
return val[0] / 1000
@tomo_fovx_offset.setter
def tomo_fovx_offset(self, val: float):
self.client.set_global_var("tomo_fov_offset", val)
@property
def tomo_fovy_offset(self):
val = self.client.get_global_var("tomo_fov_offset")
if val is None:
return 0.0
return val[1] / 1000
@tomo_fovy_offset.setter
def tomo_fovy_offset(self, val: float):
self.client.set_global_var("tomo_fov_offset", val)
@property
def tomo_shellstep(self):
val = self.client.get_global_var("tomo_shellstep")
if val is None:
return 1
return val
@tomo_shellstep.setter
def tomo_shellstep(self, val: float):
self.client.set_global_var("tomo_shellstep", val)
@property
def tomo_circfov(self):
val = self.client.get_global_var("tomo_circfov")
if val is None:
return 0.0
return val
@tomo_circfov.setter
def tomo_circfov(self, val: float):
self.client.set_global_var("tomo_circfov", val)
@property
def tomo_countingtime(self):
val = self.client.get_global_var("tomo_countingtime")
if val is None:
return 0.1
return val
@tomo_countingtime.setter
def tomo_countingtime(self, val: float):
self.client.set_global_var("tomo_countingtime", val)
@property
def manual_shift_x(self):
val = self.client.get_global_var("manual_shift_x")
if val is None:
return 0.0
return val
@manual_shift_x.setter
def manual_shift_x(self, val: float):
self.client.set_global_var("manual_shift_x", val)
@property
def manual_shift_y(self):
val = self.client.get_global_var("manual_shift_y")
if val is None:
return 0.0
return val
@manual_shift_y.setter
def manual_shift_y(self, val: float):
self.client.set_global_var("manual_shift_y", val)
@property
def lamni_piezo_range_x(self):
val = self.client.get_global_var("lamni_piezo_range_x")
if val is None:
return 20
return val
@lamni_piezo_range_x.setter
def lamni_piezo_range_x(self, val: float):
if val > 80:
raise ValueError("Piezo range cannot be larger than 80 um.")
self.client.set_global_var("lamni_piezo_range_x", val)
@property
def lamni_piezo_range_y(self):
val = self.client.get_global_var("lamni_piezo_range_y")
if val is None:
return 20
return val
@lamni_piezo_range_y.setter
def lamni_piezo_range_y(self, val: float):
if val > 80:
raise ValueError("Piezo range cannot be larger than 80 um.")
self.client.set_global_var("lamni_piezo_range_y", val)
@property
def lamni_stitch_x(self):
val = self.client.get_global_var("lamni_stitch_x")
if val is None:
return 0
return val
@lamni_stitch_x.setter
@typechecked
def lamni_stitch_x(self, val: int):
self.client.set_global_var("lamni_stitch_x", val)
@property
def lamni_stitch_y(self):
val = self.client.get_global_var("lamni_stitch_y")
if val is None:
return 0
return val
@lamni_stitch_y.setter
@typechecked
def lamni_stitch_y(self, val: int):
self.client.set_global_var("lamni_stitch_y", val)
@property
def ptycho_reconstruct_foldername(self):
val = self.client.get_global_var("ptycho_reconstruct_foldername")
if val is None:
return "ptycho_reconstruct"
return val
@ptycho_reconstruct_foldername.setter
def ptycho_reconstruct_foldername(self, val: str):
self.client.set_global_var("ptycho_reconstruct_foldername", val)
@property
def tomo_angle_stepsize(self):
val = self.client.get_global_var("tomo_angle_stepsize")
if val is None:
return 10.0
return val
@tomo_angle_stepsize.setter
def tomo_angle_stepsize(self, val: float):
self.client.set_global_var("tomo_angle_stepsize", val)
@property
def tomo_stitch_overlap(self):
val = self.client.get_global_var("tomo_stitch_overlap")
if val is None:
return 0.2
return val
@tomo_stitch_overlap.setter
def tomo_stitch_overlap(self, val: float):
self.client.set_global_var("tomo_stitch_overlap", val)
def tomo_scan_projection(self, angle: float):
scans = builtins.__dict__.get("scans")
bec = builtins.__dict__.get("bec")
additional_correction = self.compute_additional_correction(angle)
correction_xeye_mu = self.lamni_compute_additional_correction_xeye_mu(angle)
self._current_scan_list = []
for stitch_x in range(-self.lamni_stitch_x, self.lamni_stitch_x + 1):
for stitch_y in range(-self.lamni_stitch_y, self.lamni_stitch_y + 1):
# pylint: disable=undefined-variable
self._current_scan_list.append(bec.queue.next_scan_number)
logger.info(
f"scans.lamni_fermat_scan(fov_size=[{self.lamni_piezo_range_x},{self.lamni_piezo_range_y}], step={self.tomo_shellstep}, stitch_x={0}, stitch_y={0}, stitch_overlap={1},"
f"center_x={self.tomo_fovx_offset}, center_y={self.tomo_fovy_offset}, "
f"shift_x={self.manual_shift_x+correction_xeye_mu[0]-additional_correction[0]}, "
f"shift_y={self.manual_shift_y+correction_xeye_mu[1]-additional_correction[1]}, "
f"fov_circular={self.tomo_circfov}, angle={angle}, scan_type='fly')"
)
scans.lamni_fermat_scan(
fov_size=[self.lamni_piezo_range_x, self.lamni_piezo_range_y],
step=self.tomo_shellstep,
stitch_x=stitch_x,
stitch_y=stitch_y,
stitch_overlap=self.tomo_stitch_overlap,
center_x=self.tomo_fovx_offset,
center_y=self.tomo_fovy_offset,
shift_x=(
self.manual_shift_x + correction_xeye_mu[0] - additional_correction[0]
),
shift_y=(
self.manual_shift_y + correction_xeye_mu[1] - additional_correction[1]
),
fov_circular=self.tomo_circfov,
angle=angle,
scan_type="fly",
exp_time=self.tomo_countingtime,
)
def lamni_compute_additional_correction_xeye_mu(self, angle):
tomo_fit_xray_eye = self.client.get_global_var("tomo_fit_xray_eye")
if tomo_fit_xray_eye is None:
print("Not applying any additional correction. No x-ray eye data available.\n")
return (0, 0)
# x amp, phase, offset, y amp, phase, offset
# 0 0 0 1 0 2 1 0 1 1 1 2
correction_x = (
tomo_fit_xray_eye[0][0] * math.sin(math.radians(angle) + tomo_fit_xray_eye[0][1])
+ tomo_fit_xray_eye[0][2]
) / 1000
correction_y = (
tomo_fit_xray_eye[1][0] * math.sin(math.radians(angle) + tomo_fit_xray_eye[1][1])
+ tomo_fit_xray_eye[1][2]
) / 1000
print(f"Xeye correction x {correction_x}, y {correction_y} for angle {angle}\n")
return (correction_x, correction_y)
def compute_additional_correction(self, angle):
if not self.corr_pos_x:
print("Not applying any additional correction. No data available.\n")
return (0, 0)
# find index of closest angle
for j, _ in enumerate(self.corr_pos_x):
newangledelta = math.fabs(self.corr_angle[j] - angle)
if j == 0:
angledelta = newangledelta
additional_correction_shift_x = self.corr_pos_x[j]
additional_correction_shift_y = self.corr_pos_y[j]
continue
if newangledelta < angledelta:
additional_correction_shift_x = self.corr_pos_x[j]
additional_correction_shift_y = self.corr_pos_y[j]
angledelta = newangledelta
if additional_correction_shift_x == 0 and angle < self.corr_angle[0]:
additional_correction_shift_x = self.corr_pos_x[0]
additional_correction_shift_y = self.corr_pos_y[0]
if additional_correction_shift_x == 0 and angle > self.corr_angle[-1]:
additional_correction_shift_x = self.corr_pos_x[-1]
additional_correction_shift_y = self.corr_pos_y[-1]
logger.info(
f"Additional correction shifts: {additional_correction_shift_x} {additional_correction_shift_y}"
)
return (additional_correction_shift_x, additional_correction_shift_y)
def lamni_read_additional_correction(self, correction_file: str):
with open(correction_file, "r") as f:
num_elements = f.readline()
int_num_elements = int(num_elements.split(" ")[2])
print(int_num_elements)
corr_pos_x = []
corr_pos_y = []
corr_angle = []
for j in range(0, int_num_elements * 3):
line = f.readline()
value = line.split(" ")[2]
name = line.split(" ")[0].split("[")[0]
if name == "corr_pos_x":
corr_pos_x.append(float(value) / 1000)
elif name == "corr_pos_y":
corr_pos_y.append(float(value) / 1000)
elif name == "corr_angle":
corr_angle.append(float(value))
self.corr_pos_x = corr_pos_x
self.corr_pos_y = corr_pos_y
self.corr_angle = corr_angle
return
def _run_beamline_checks(self):
msgs = []
dev = builtins.__dict__.get("dev")
try:
if self.check_shutter:
shutter_val = dev.x12sa_es1_shutter_status.read(cached=True)
if shutter_val["value"].lower() != "open":
self._beam_is_okay = False
msgs.append("Check beam failed: Shutter is closed.")
if self.check_light_available:
machine_status = dev.sls_machine_status.read(cached=True)
if machine_status["value"] not in ["Light Available", "Light-Available"]:
self._beam_is_okay = False
msgs.append("Check beam failed: Light not available.")
if self.check_fofb:
fast_orbit_feedback = dev.sls_fast_orbit_feedback.read(cached=True)
if fast_orbit_feedback["value"] != "running":
self._beam_is_okay = False
msgs.append("Check beam failed: Fast orbit feedback is not running.")
except Exception:
logger.warning("Failed to check beam.")
return msgs
def _check_beam(self):
while not self._stop_beam_check_event.is_set():
self._check_msgs = self._run_beamline_checks()
if not self._beam_is_okay:
self._stop_beam_check_event.set()
time.sleep(1)
def _start_beam_check(self):
self._beam_is_okay = True
self._stop_beam_check_event = threading.Event()
self.beam_check_thread = threading.Thread(target=self._check_beam, daemon=True)
self.beam_check_thread.start()
def _was_beam_okay(self):
self._stop_beam_check_event.set()
self.beam_check_thread.join()
return self._beam_is_okay
def _print_beamline_checks(self):
for msg in self._check_msgs:
logger.warning(msg)
def _wait_for_beamline_checks(self):
self._print_beamline_checks()
try:
msg = LogbookMessage(self.client.logbook)
msg.add_text(
f"<p><mark class='pen-red'><strong>Beamline checks failed at {str(datetime.datetime.now())}: {''.join(self._check_msgs)}</strong></mark></p>"
).add_tag(["BEC", "beam_check"])
self.client.logbook.send_logbook_message(msg)
except Exception:
logger.warning("Failed to send update to SciLog.")
while True:
self._beam_is_okay = True
self._check_msgs = self._run_beamline_checks()
if self._beam_is_okay:
break
self._print_beamline_checks()
time.sleep(1)
try:
msg = LogbookMessage(self.client.logbook)
msg.add_text(
f"<p><mark class='pen-red'><strong>Operation resumed at {str(datetime.datetime.now())}.</strong></mark></p>"
).add_tag(["BEC", "beam_check"])
self.client.logbook.send_logbook_message(msg)
except Exception:
logger.warning("Failed to send update to SciLog.")
def add_sample_database(
self, samplename, date, eaccount, scan_number, setup, sample_additional_info, user
):
"""Add a sample to the omny sample database. This also retrieves the tomo id."""
subprocess.run(
f"wget --user=omny --password=samples -q -O /tmp/currsamplesnr.txt 'https://omny.web.psi.ch/samples/newmeasurement.php?sample={samplename}&date={date}&eaccount={eaccount}&scannr={scan_number}&setup={setup}&additional={sample_additional_info}&user={user}'",
shell=True,
)
with open("/tmp/currsamplesnr.txt") as tomo_number_file:
tomo_number = int(tomo_number_file.read())
return tomo_number
def sub_tomo_scan(self, subtomo_number, start_angle=None):
"""start a subtomo"""
dev = builtins.__dict__.get("dev")
bec = builtins.__dict__.get("bec")
if start_angle is None:
if subtomo_number == 1:
start_angle = 0
elif subtomo_number == 2:
start_angle = self.tomo_angle_stepsize / 8.0 * 4
elif subtomo_number == 3:
start_angle = self.tomo_angle_stepsize / 8.0 * 2
elif subtomo_number == 4:
start_angle = self.tomo_angle_stepsize / 8.0 * 6
elif subtomo_number == 5:
start_angle = self.tomo_angle_stepsize / 8.0 * 1
elif subtomo_number == 6:
start_angle = self.tomo_angle_stepsize / 8.0 * 5
elif subtomo_number == 7:
start_angle = self.tomo_angle_stepsize / 8.0 * 3
elif subtomo_number == 8:
start_angle = self.tomo_angle_stepsize / 8.0 * 7
# _tomo_shift_angles (potential global variable)
_tomo_shift_angles = 0
angle_end = start_angle + 360
for angle in np.linspace(
start_angle + _tomo_shift_angles,
angle_end,
num=int(360 / self.tomo_angle_stepsize) + 1,
endpoint=True,
):
successful = False
if 0 <= angle < 360.05:
print(f"Starting LamNI scan for angle {angle}")
while not successful:
self._start_beam_check()
self.tomo_scan_projection(angle)
if self._was_beam_okay():
successful = True
else:
self._wait_for_beamline_checks()
self.tomo_reconstruct()
tomo_id = 0
with open(
os.path.expanduser("~/Data10/specES1/dat-files/tomography_scannumbers.txt"),
"a+",
) as out_file:
# pylint: disable=undefined-variable
out_file.write(
f"{bec.queue.next_scan_number-1} {angle} {dev.lsamrot.read()['lsamrot']['value']} {self.tomo_id} {subtomo_number} {0} {'lamni'}\n"
)
def tomo_scan(self, subtomo_start=1, start_angle=None):
"""start a tomo scan"""
bec = builtins.__dict__.get("bec")
if subtomo_start == 1 and start_angle is None:
# pylint: disable=undefined-variable
self.tomo_id = self.add_sample_database(
"bec_test_sample",
str(datetime.date.today()),
"e20588",
bec.queue.next_scan_number,
"lamni",
"test additional info",
"BEC",
)
for ii in range(subtomo_start, 9):
self.sub_tomo_scan(ii, start_angle=start_angle)
start_angle = None
def tomo_parameters(self):
"""print and update the tomo parameters"""
print("Current settings:")
print(f"Counting time <ctime> = {self.tomo_countingtime} s")
print(f"Stepsize microns <step> = {self.tomo_shellstep}")
print(
f"Piezo range (max 80) <microns> = {self.lamni_piezo_range_x}, {self.lamni_piezo_range_y}"
)
print(f"Stitching number x,y = {self.lamni_stitch_x}, {self.lamni_stitch_y}")
print(f"Stitching overlap = {self.tomo_stitch_overlap}")
print(f"Circuilar FOV diam <microns> = {self.tomo_circfov}")
print(f"Reconstruction queue name = {self.ptycho_reconstruct_foldername}")
print(
"For information, fov offset is rotating and finding the ROI, manual shift moves rotation center"
)
print(f" _tomo_fovx_offset <mm> = {self.tomo_fovx_offset}")
print(f" _tomo_fovy_offset <mm> = {self.tomo_fovy_offset}")
print(f" _manual_shift_x <mm> = {self.manual_shift_x}")
print(f" _manual_shift_y <mm> = {self.manual_shift_y}")
print(f"Angular step within sub-tomogram: {self.tomo_angle_stepsize} degrees")
print(f"Resulting in number of projections: {360/self.tomo_angle_stepsize*8}\n")
user_input = input("Are these parameters correctly set for your scan? ")
if user_input == "y":
print("good then")
else:
self.tomo_countingtime = self._get_val("<ctime> s", self.tomo_countingtime, float)
self.tomo_shellstep = self._get_val("<step size> um", self.tomo_shellstep, float)
self.lamni_piezo_range_x = self._get_val(
"<piezo range X (max 80)> um", self.lamni_piezo_range_x, float
)
self.lamni_piezo_range_y = self._get_val(
"<piezo range Y (max 80)> um", self.lamni_piezo_range_y, float
)
self.lamni_stitch_x = self._get_val("<stitch X>", self.lamni_stitch_x, int)
self.lamni_stitch_y = self._get_val("<stitch Y>", self.lamni_stitch_y, int)
self.tomo_circfov = self._get_val("<circular FOV> um", self.tomo_circfov, float)
self.ptycho_reconstruct_foldername = self._get_val(
"Reconstruction queue ", self.ptycho_reconstruct_foldername, str
)
tomo_numberofprojections = self._get_val(
"Number of projections", 360 / self.tomo_angle_stepsize * 8, int
)
print(f"The angular step will be {360/tomo_numberofprojections}")
self.tomo_angle_stepsize = 360 / tomo_numberofprojections * 8
print(f"The angular step in a subtomogram it will be {self.tomo_angle_stepsize}")
@staticmethod
def _get_val(msg: str, default_value, data_type):
return data_type(input(f"{msg} ({default_value}): ") or default_value)
def tomo_reconstruct(self, base_path="~/Data10/specES1"):
"""write the tomo reconstruct file for the reconstruction queue"""
bec = builtins.__dict__.get("bec")
base_path = os.path.expanduser(base_path)
ptycho_queue_path = Path(os.path.join(base_path, self.ptycho_reconstruct_foldername))
ptycho_queue_path.mkdir(parents=True, exist_ok=True)
# pylint: disable=undefined-variable
last_scan_number = bec.queue.next_scan_number - 1
ptycho_queue_file = os.path.abspath(
os.path.join(ptycho_queue_path, f"scan_{last_scan_number:05d}.dat")
)
with open(ptycho_queue_file, "w") as queue_file:
scans = " ".join([str(scan) for scan in self._current_scan_list])
queue_file.write(f"p.scan_number {scans}\n")
queue_file.write(f"p.check_nextscan_started 1\n")
def write_pdf_report(self):
"""create and write the pdf report with the current LamNI settings"""
dev = builtins.__dict__.get("dev")
header = (
" \n" * 3
+ " ::: ::: ::: ::: :::: ::: ::::::::::: \n"
+ " :+: :+: :+: :+:+: :+:+: :+:+: :+: :+: \n"
+ " +:+ +:+ +:+ +:+ +:+:+ +:+ :+:+:+ +:+ +:+ \n"
+ " +#+ +#++:++#++: +#+ +:+ +#+ +#+ +:+ +#+ +#+ \n"
+ " +#+ +#+ +#+ +#+ +#+ +#+ +#+#+# +#+ \n"
+ " #+# #+# #+# #+# #+# #+# #+#+# #+# \n"
+ " ########## ### ### ### ### ### #### ########### \n"
)
padding = 20
piezo_range = f"{self.lamni_piezo_range_x:.2f}/{self.lamni_piezo_range_y:.2f}"
stitching = f"{self.lamni_stitch_x:.2f}/{self.lamni_stitch_y:.2f}"
dataset_id = str(self.client.queue.next_dataset_number)
# pylint: disable=undefined-variable
content = (
f"{'Sample Name:':<{padding}}{'test':>{padding}}\n",
f"{'Measurement ID:':<{padding}}{str(self.tomo_id):>{padding}}\n",
f"{'Dataset ID:':<{padding}}{dataset_id:>{padding}}\n",
f"{'Sample Info:':<{padding}}{'Sample Info':>{padding}}\n",
f"{'e-account:':<{padding}}{str(self.client.username):>{padding}}\n",
f"{'Number of projections:':<{padding}}{int(360 / self.tomo_angle_stepsize * 8):>{padding}}\n",
f"{'First scan number:':<{padding}}{self.client.queue.next_scan_number:>{padding}}\n",
f"{'Last scan number approx.:':<{padding}}{self.client.queue.next_scan_number + int(360 / self.tomo_angle_stepsize * 8) + 10:>{padding}}\n",
f"{'Current photon energy:':<{padding}}{dev.mokev.read(cached=True)['mokev']['value']:>{padding}.4f}\n",
f"{'Exposure time:':<{padding}}{self.tomo_countingtime:>{padding}.2f}\n",
f"{'Fermat spiral step size:':<{padding}}{self.tomo_shellstep:>{padding}.2f}\n",
f"{'Piezo range (FOV sample plane):':<{padding}}{piezo_range>{padding}}\n",
f"{'Restriction to circular FOV:':<{padding}}{self.tomo_circfov:>{padding}.2f}\n",
f"{'Stitching:':<{padding}}{stitching:>{padding}}\n",
f"{'Number of individual sub-tomograms:':<{padding}}{8:>{padding}}\n",
f"{'Angular step within sub-tomogram:':<{padding}}{self.tomo_angle_stepsize:>{padding}.2f}\n",
)
with PDFWriter(os.path.expanduser("~/Data10/")) as file:
file.write(header)
file.write(content)
msg = LogbookMessage(self.client.logbook)
logo_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "lamni_logo.png")
msg.add_file(logo_path).add_text("".join(content).replace("\n", "</p><p>")).add_tag(
["BEC", "tomo_parameters", f"dataset_id_{dataset_id}", "LamNI"]
)
self.client.logbook.send_logbook_message(msg)

View File

@@ -0,0 +1 @@
from .cSAXS_beamline import fshopen, fshclose, fshstatus, epics_get, epics_put

View File

@@ -0,0 +1,28 @@
import epics
def epics_put(channel, value):
epics.caput(channel, value)
def epics_get(channel):
return epics.caget(channel)
def fshon():
pass
def fshopen():
"""open the fast shutter"""
epics_put("X12SA-ES1-TTL:OUT_01", 1)
def fshclose():
"""close the fast shutter"""
epics_put("X12SA-ES1-TTL:OUT_01", 0)
def fshstatus():
"""show the fast shutter status"""
return epics_get("X12SA-ES1-TTL:OUT_01")

BIN
scan_server/.DS_Store vendored Normal file

Binary file not shown.

View File

@@ -0,0 +1,451 @@
"""
SCAN PLUGINS
All new scans should be derived from ScanBase. ScanBase provides various methods that can be customized and overriden
but they are executed in a specific order:
- self.initialize # initialize the class if needed
- self.read_scan_motors # used to retrieve the start position (and the relative position shift if needed)
- self.prepare_positions # prepare the positions for the scan. The preparation is split into multiple sub fuctions:
- self._calculate_positions # calculate the positions
- self._set_positions_offset # apply the previously retrieved scan position shift (if needed)
- self._check_limits # tests to ensure the limits won't be reached
- self.open_scan # send an open_scan message including the scan name, the number of points and the scan motor names
- self.stage # stage all devices for the upcoming acquisiton
- self.run_baseline_readings # read all devices to get a baseline for the upcoming scan
- self.scan_core # run a loop over all position
- self._at_each_point(ind, pos) # called at each position with the current index and the target positions as arguments
- self.finalize # clean up the scan, e.g. move back to the start position; wait everything to finish
- self.unstage # unstage all devices that have been staged before
- self.cleanup # send a close scan message and perform additional cleanups if needed
"""
import time
import numpy as np
from bec_utils import BECMessage, MessageEndpoints, bec_logger
from scan_server.scans import RequestBase, ScanArgType, ScanBase
MOVEMENT_SCALE_X = np.sin(np.radians(15)) * np.cos(np.radians(30))
MOVEMENT_SCALE_Y = np.cos(np.radians(15))
logger = bec_logger.logger
def lamni_to_stage_coordinates(x: float, y: float) -> tuple:
"""convert from lamni coordinates to stage coordinates"""
y_stage = y / MOVEMENT_SCALE_Y
x_stage = 2 * (x - y_stage * MOVEMENT_SCALE_X)
return (x_stage, y_stage)
def lamni_from_stage_coordinates(x_stage: float, y_stage: float) -> tuple:
"""convert to lamni coordinates from stage coordinates"""
x = x_stage * 0.5 + y_stage * MOVEMENT_SCALE_X
y = y_stage * MOVEMENT_SCALE_Y
return (x, y)
class LamNIMixin:
def _lamni_compute_scan_center(self, x, y, angle_deg):
# assuming a scan point was found at interferometer x,y at zero degrees
# this function computes the new interferometer coordinates of this spot
# at a different rotation angle based on the lamni geometry
alpha = angle_deg / 180 * np.pi
stage_x, stage_y = lamni_to_stage_coordinates(x, y)
stage_x_rot = np.cos(alpha) * stage_x - np.sin(alpha) * stage_y
stage_y_rot = np.sin(alpha) * stage_x + np.cos(alpha) * stage_y
return lamni_from_stage_coordinates(stage_x_rot, stage_y_rot)
def lamni_new_scan_center_interferometer(self, x, y):
"""move to new scan center. xy in mm"""
lsamx_user_params = self.device_manager.devices.lsamx.user_parameter
if lsamx_user_params is None or lsamx_user_params.get("center") is None:
raise RuntimeError("lsamx center is not defined")
lsamy_user_params = self.device_manager.devices.lsamy.user_parameter
if lsamy_user_params is None or lsamy_user_params.get("center") is None:
raise RuntimeError("lsamy center is not defined")
lsamx_center = lsamx_user_params.get("center")
lsamy_center = lsamy_user_params.get("center")
# could first check if feedback is enabled
yield from self.stubs.send_rpc_and_wait("rtx", "controller.feedback_disable")
time.sleep(0.05)
rtx_current = yield from self.stubs.send_rpc_and_wait("rtx", "readback.get")
rty_current = yield from self.stubs.send_rpc_and_wait("rty", "readback.get")
lsamx_current = yield from self.stubs.send_rpc_and_wait("lsamx", "readback.get")
lsamy_current = yield from self.stubs.send_rpc_and_wait("lsamy", "readback.get")
x_stage, y_stage = lamni_to_stage_coordinates(x, y)
x_center_expect, y_center_expect = lamni_from_stage_coordinates(
lsamx_current - lsamx_center, lsamy_current - lsamy_center
)
# in microns
x_drift = x_center_expect * 1000 - rtx_current
y_drift = y_center_expect * 1000 - rty_current
logger.info(f"Current uncompensated drift of setup is x={x_drift:.3f}, y={y_drift:.3f}")
move_x = x_stage + lsamx_center + lamni_to_stage_coordinates(x_drift, y_drift)[0] / 1000
move_y = y_stage + lsamy_center + lamni_to_stage_coordinates(x_drift, y_drift)[1] / 1000
coarse_move_req_x = np.abs(lsamx_current - move_x)
coarse_move_req_y = np.abs(lsamy_current - move_y)
self.device_manager.devices.lsamx.enabled_set = True
self.device_manager.devices.lsamy.enabled_set = True
if (
np.abs(y_drift) > 150
or np.abs(x_drift) > 150
or (coarse_move_req_y < 0.003 and coarse_move_req_x < 0.003)
):
logger.info("No drift correction.")
else:
logger.info(
f"Compensating {[val/1000 for val in lamni_to_stage_coordinates(x_drift,y_drift)]}"
)
yield from self.stubs.set_and_wait(
device=["lsamx", "lsamy"], positions=[move_x, move_y]
)
time.sleep(0.01)
rtx_current = yield from self.stubs.send_rpc_and_wait("rtx", "readback.get")
rty_current = yield from self.stubs.send_rpc_and_wait("rty", "readback.get")
logger.info(f"New scan center interferometer {rtx_current:.3f}, {rty_current:.3f} microns")
# second iteration
x_center_expect, y_center_expect = lamni_from_stage_coordinates(x_stage, y_stage)
# in microns
x_drift2 = x_center_expect * 1000 - rtx_current
y_drift2 = y_center_expect * 1000 - rty_current
logger.info(
f"Uncompensated drift of setup after first iteration is x={x_drift2:.3f}, y={y_drift2:.3f}"
)
if np.abs(x_drift2) > 5 or np.abs(y_drift2) > 5:
logger.info(
f"Compensating second iteration {[val/1000 for val in lamni_to_stage_coordinates(x_drift2,y_drift2)]}"
)
move_x = (
x_stage
+ lsamx_center
+ lamni_to_stage_coordinates(x_drift, y_drift)[0] / 1000
+ lamni_to_stage_coordinates(x_drift2, y_drift2)[0] / 1000
)
move_y = (
y_stage
+ lsamy_center
+ lamni_to_stage_coordinates(x_drift, y_drift)[1] / 1000
+ lamni_to_stage_coordinates(x_drift2, y_drift2)[1] / 1000
)
yield from self.stubs.set_and_wait(
device=["lsamx", "lsamy"], positions=[move_x, move_y]
)
time.sleep(0.01)
rtx_current = yield from self.stubs.send_rpc_and_wait("rtx", "readback.get")
rty_current = yield from self.stubs.send_rpc_and_wait("rty", "readback.get")
logger.info(
f"New scan center interferometer after second iteration {rtx_current:.3f}, {rty_current:.3f} microns"
)
x_drift2 = x_center_expect * 1000 - rtx_current
y_drift2 = y_center_expect * 1000 - rty_current
logger.info(
f"Uncompensated drift of setup after second iteration is x={x_drift2:.3f}, y={y_drift2:.3f}"
)
else:
logger.info("No second iteration required")
self.device_manager.devices.lsamx.enabled_set = False
self.device_manager.devices.lsamy.enabled_set = False
yield from self.stubs.send_rpc_and_wait("rtx", "controller.feedback_enable_without_reset")
class LamNIMoveToScanCenter(RequestBase, LamNIMixin):
scan_name = "lamni_move_to_scan_center"
scan_report_hint = None
scan_type = "step"
required_kwargs = []
arg_input = [ScanArgType.FLOAT, ScanArgType.FLOAT, ScanArgType.FLOAT]
arg_bundle_size = None
def __init__(self, *args, parameter=None, **kwargs):
"""
Move LamNI to a new scan center.
Args:
*args: shift x, shift y, tomo angle in deg
Examples:
>>> scans.lamni_move_to_scan_center(1.2, 2.8, 12.5)
"""
super().__init__(parameter=parameter, **kwargs)
def run(self):
center_x, center_y = self._lamni_compute_scan_center(*self.caller_args)
yield from self.lamni_new_scan_center_interferometer(center_x, center_y)
class LamNIFermatScan(ScanBase, LamNIMixin):
scan_name = "lamni_fermat_scan"
scan_report_hint = "table"
scan_type = "step"
required_kwargs = ["fov_size", "exp_time", "step", "angle"]
arg_input = []
arg_bundle_size = None
def __init__(self, *args, parameter=None, **kwargs):
"""
A LamNI scan following Fermat's spiral.
Kwargs:
fov_size [um]: Fov in the piezo plane (i.e. piezo range). Max 80 um
step [um]: stepsize
shift_x/y [mm]: extra shift in x/y. The shift is directly applied to the scan. It will not be auto rotated. (default 0).
center_x/center_y [mm]: center position in x/y at 0 deg. This shift is rotated
using the geometry of LamNI
It is determined by the first 'click' in the x-ray eye alignemnt procedure
angle [deg]: rotation angle (will rotate first)
scan_type: fly (i.e. HW triggered step in case of LamNI) or step
stitch_x/y: shift scan to adjacent stitch region
fov_circular [um]: generate a circular field of view in the sample plane. This is an additional cropping to fov_size.
stitch_overlap [um]: overlap of the stitched regions
Returns:
Examples:
>>> scans.lamni_fermat_scan(fov_size=[20], step=0.5, exp_time=0.1)
>>> scans.lamni_fermat_scan(fov_size=[20, 25], center_x=0.02, center_y=0, shift_x=0, shift_y=0, angle=0, step=0.5, fov_circular=0, exp_time=0.1)
"""
super().__init__(parameter=parameter, **kwargs)
self.axis = []
scan_kwargs = parameter.get("kwargs", {})
self.fov_size = scan_kwargs.get("fov_size")
if len(self.fov_size) == 1:
self.fov_size *= 2 # if we only have one argument, let's assume it's a square
self.step = scan_kwargs.get("step", 0.1)
self.center_x = scan_kwargs.get("center_x", 0)
self.center_y = scan_kwargs.get("center_y", 0)
self.shift_x = scan_kwargs.get("shift_x", 0)
self.shift_y = scan_kwargs.get("shift_y", 0)
self.angle = scan_kwargs.get("angle", 0)
self.scan_type = scan_kwargs.get("scan_type", "fly")
self.stitch_x = scan_kwargs.get("stitch_x", 0)
self.stitch_y = scan_kwargs.get("stitch_y", 0)
self.fov_circular = scan_kwargs.get("fov_circular", 0)
self.stitch_overlap = scan_kwargs.get("stitch_overlap", 1)
# self.keep_plot = scan_kwargs.get("keep_plot", 0)
# self.optim_trajectory = scan_kwargs.get("optim_trajectory", "corridor")
def initialize(self):
self.scan_motors = ["rtx", "rty"]
def prepare_positions(self):
self._calculate_positions()
self._optimize_trajectory()
# self._sort_positions()
self.num_pos = len(self.positions)
def _lamni_check_pos_in_fov_range_and_circ_fov(self, x, y) -> bool:
# this function checks if positions are reachable in a scan
# these x y intererometer positions are not shifted to the scan center
# so its purpose is to see if the position is reachable by the
# rotated piezo stage. For a scan these positions have to be shifted to
# the current scan center before starting the scan
stage_x, stage_y = lamni_to_stage_coordinates(x, y)
stage_x_with_stitch, stage_y_with_stitch = self._lamni_compute_stitch_center(
self.stitch_x, self.stitch_y, self.angle
)
stage_x_with_stitch, stage_y_with_stitch = lamni_to_stage_coordinates(
stage_x_with_stitch, stage_y_with_stitch
)
# piezo stage is currently rotated to stage_angle_deg in degrees
# rotate positions to the piezo stage system
alpha = (self.angle - 300 + 30.5) / 180 * np.pi
stage_x_rot = np.cos(alpha) * stage_x + np.sin(alpha) * stage_y
stage_y_rot = -np.sin(alpha) * stage_x + np.cos(alpha) * stage_y
stage_x_rot_with_stitch = (
np.cos(alpha) * stage_x_with_stitch + np.sin(alpha) * stage_y_with_stitch
)
stage_y_rot_with_stitch = (
-np.sin(alpha) * stage_x_with_stitch + np.cos(alpha) * stage_y_with_stitch
)
return (
np.abs(stage_x_rot) <= (self.fov_size[1] / 2)
and np.abs(stage_y_rot) <= (self.fov_size[0] / 2)
and (
self.fov_circular == 0
or (
np.power((stage_x_rot_with_stitch + stage_x_rot), 2)
+ np.power((stage_y_rot_with_stitch + stage_y_rot), 2)
)
<= pow((self.fov_circular / 2), 2)
)
)
def _prepare_setup(self):
yield from self.stubs.send_rpc_and_wait("rtx", "controller.clear_trajectory_generator")
yield from self.lamni_rotation(self.angle)
total_shift_x, total_shift_y = self._compute_total_shift()
yield from self.lamni_new_scan_center_interferometer(total_shift_x, total_shift_y)
# self._plot_target_pos()
if self.scan_type == "fly":
yield from self._transfer_positions_to_LamNI()
# def _plot_target_pos(self):
# # return
# plt.plot(self.positions[:, 0], self.positions[:, 1], alpha=0.2)
# plt.scatter(self.positions[:, 0], self.positions[:, 1])
# plt.savefig("mygraph.png")
# if not self.keep_plot:
# plt.clf()
# # plt.show()
def _transfer_positions_to_LamNI(self):
yield from self.stubs.send_rpc_and_wait(
"rtx", "controller.add_pos_to_scan", (self.positions.tolist(),)
)
def _calculate_positions(self):
self.positions = self.get_lamni_fermat_spiral_pos(
-np.abs(self.fov_size[0] / 2),
np.abs(self.fov_size[0] / 2),
-np.abs(self.fov_size[1] / 2),
np.abs(self.fov_size[1] / 2),
step=self.step,
spiral_type=0,
center=False,
)
def _lamni_compute_stitch_center(self, xcount, ycount, angle_deg):
alpha = angle_deg / 180 * np.pi
stage_x = xcount * (self.fov_size[0] - self.stitch_overlap)
stage_y = ycount * (self.fov_size[1] - self.stitch_overlap)
x_rot = np.cos(alpha) * stage_x - np.sin(alpha) * stage_y
y_rot = np.sin(alpha) * stage_x + np.cos(alpha) * stage_y
return lamni_from_stage_coordinates(x_rot, y_rot)
def _compute_total_shift(self):
_shfitx, _shfity = self._lamni_compute_scan_center(self.center_x, self.center_y, self.angle)
x_stitch_shift, y_stitch_shift = self._lamni_compute_stitch_center(
self.stitch_x, self.stitch_y, self.angle
)
logger.info(
f"Total shift [mm] {_shfitx+x_stitch_shift/1000+self.shift_x}, {_shfity+y_stitch_shift/1000+self.shift_y}"
)
return (
_shfitx + x_stitch_shift / 1000 + self.shift_x,
_shfity + y_stitch_shift / 1000 + self.shift_y,
)
def get_lamni_fermat_spiral_pos(
self, m1_start, m1_stop, m2_start, m2_stop, step=1, spiral_type=0, center=False
):
"""[summary]
Args:
m1_start (float): start position motor 1
m1_stop (float): end position motor 1
m2_start (float): start position motor 2
m2_stop (float): end position motor 2
step (float, optional): Step size. Defaults to 1.
spiral_type (float, optional): Angular offset in radians that determines the shape of the spiral.
A spiral with spiral_type=2 is the same as spiral_type=0. Defaults to 0.
center (bool, optional): Add a center point. Defaults to False.
Raises:
TypeError: [description]
TypeError: [description]
TypeError: [description]
Returns:
[type]: [description]
Yields:
[type]: [description]
"""
positions = []
phi = 2 * np.pi * ((1 + np.sqrt(5)) / 2.0) + spiral_type * np.pi
start = int(not center)
length_axis1 = np.abs(m1_stop - m1_start)
length_axis2 = np.abs(m2_stop - m2_start)
n_max = int(length_axis1 * length_axis2 * 3.2 / step / step)
total_shift_x, total_shift_y = self._compute_total_shift()
for ii in range(start, n_max):
radius = step * 0.57 * np.sqrt(ii)
# FOV is restructed below at check pos in range
# if abs(radius * np.sin(ii * phi)) > length_axis1 / 2:
# continue
# if abs(radius * np.cos(ii * phi)) > length_axis2 / 2:
# continue
x = radius * np.sin(ii * phi)
y = radius * np.cos(ii * phi)
if self._lamni_check_pos_in_fov_range_and_circ_fov(x, y):
positions.extend([(x + total_shift_x * 1000, y + total_shift_y * 1000)])
# for testing we just shift by center_i and prepare also the setup to center_i
return np.array(positions)
def lamni_rotation(self, angle):
# get last setpoint (cannot be based on pos get because they will deviate slightly)
lsamrot_current_setpoint = yield from self.stubs.send_rpc_and_wait(
"lsamrot", "user_setpoint.get"
)
if angle == lsamrot_current_setpoint:
logger.info("No rotation required")
else:
logger.info("Rotating to requested angle")
yield from self.stubs.set_and_wait(device=["lsamrot"], positions=[angle])
def scan_core(self):
if self.scan_type == "step":
for ind, pos in self._get_position():
for self.burst_index in range(self.burst_at_each_point):
yield from self._at_each_point(ind, pos)
self.burst_index = 0
elif self.scan_type == "fly":
# use a device message to receive the scan number and
# scan ID before sending the message to the device server
yield from self.stubs.kickoff(device="rtx")
while True:
yield from self.stubs.read_and_wait(group="primary", wait_group="readout_primary")
msg = self.device_manager.producer.get(MessageEndpoints.device_status("rt_scan"))
if msg:
status = BECMessage.DeviceStatusMessage.loads(msg)
if status.content.get("status", 1) == 0 and self.metadata.get(
"RID"
) == status.metadata.get("RID"):
break
time.sleep(1)
logger.debug("reading monitors")
# yield from self.device_rpc("rtx", "controller.kickoff")
def run(self):
self.initialize()
yield from self.read_scan_motors()
self.prepare_positions()
yield from self._prepare_setup()
yield from self.open_scan()
yield from self.stage()
yield from self.run_baseline_reading()
yield from self.scan_core()
yield from self.finalize()
yield from self.unstage()
yield from self.cleanup()

150
scibec/lamni_config.py Normal file
View File

@@ -0,0 +1,150 @@
import yaml
from .config import DemoConfig, X12SAConfig
class LamNIConfig(DemoConfig, X12SAConfig):
def run(self):
# self.write_galil_motors()
# self.write_rt_motors()
# self.write_smaract_motors()
# self.write_eiger1p5m()
self.write_x12sa_status()
self.write_sls_status()
self.load_csaxs_config()
# self.write_sim_user_motors()
# self.write_sim_beamline_motors()
# self.write_sim_beamline_monitors()
def write_galil_motors(self):
lamni_galil_motors = [
("lsamx", "A", -1, 0.5, {"center": 8.768000}),
("lsamy", "B", 1, 0.5, {"center": 10.041000}),
("lsamrot", "C", 1, 0.5, {}),
("loptz", "D", -1, 0.5, {}),
("loptx", "E", 1, 0.5, {"in": -0.8380, "out": -0.699}),
("lopty", "F", 1, 0.5, {"in": 3.3540, "out": 3.53}),
("leyex", "G", -1, 0.001, {"in": 14.117000}),
("leyey", "H", -1, 0.001, {"in": 48.069000, "out": 0.5}),
]
out = {}
for m in lamni_galil_motors:
out[m[0]] = dict(
{
"status": {"enabled": True, "enabled_set": True},
"deviceClass": "GalilMotor",
"deviceConfig": {
"axis_Id": m[1],
"name": m[0],
"labels": m[0],
"host": "mpc2680.psi.ch",
"port": 8081,
"sign": m[2],
"limits": [0, 0],
"tolerance": m[3],
"device_access": True,
"device_mapping": {"rt": "rtx"},
},
"acquisitionConfig": {
"schedule": "sync",
"acquisitionGroup": "motor",
"readoutPriority": "baseline",
},
"deviceTags": ["lamni"],
}
)
if m[4]:
out[m[0]]["userParameter"] = m[4]
self.write_section(out, "LamNI Galil motors")
def write_rt_motors(self):
lamni_rt_motors = [
("rtx", "A", 1),
("rty", "B", 1),
]
out = dict()
for m in lamni_rt_motors:
out[m[0]] = dict(
{
"status": {"enabled": True, "enabled_set": True},
"deviceClass": "RtLamniMotor",
"deviceConfig": {
"axis_Id": m[1],
"name": m[0],
"labels": m[0],
"host": "mpc2680.psi.ch",
"port": 3333,
"limits": [0, 0],
"sign": m[2],
"device_access": True,
},
"acquisitionConfig": {
"schedule": "sync",
"acquisitionGroup": "motor",
"readoutPriority": "baseline",
},
"deviceTags": ["lamni"],
}
)
self.write_section(out, "LamNI RT")
def write_smaract_motors(self):
lamni_smaract_motors = [
("losax", "A", -1, {"in": -0.848000}),
("losay", "B", -1, {"in": 0.135000, "out": 3.8}),
("losaz", "C", 1, {"in": -1, "out": -3}),
("lcsx", "D", -1, {}),
("lcsy", "E", -1, {}),
]
out = dict()
for m in lamni_smaract_motors:
out[m[0]] = dict(
{
"status": {"enabled": True, "enabled_set": True},
"deviceClass": "SmaractMotor",
"deviceConfig": {
"axis_Id": m[1],
"name": m[0],
"labels": m[0],
"host": "mpc2680.psi.ch",
"port": 8085,
"limits": [0, 0],
"sign": m[2],
"tolerance": 0.05,
},
"acquisitionConfig": {
"schedule": "sync",
"acquisitionGroup": "motor",
"readoutPriority": "baseline",
},
"deviceTags": ["lamni"],
}
)
if m[3]:
out[m[0]]["userParameter"] = m[3]
self.write_section(out, "LamNI SmarAct motors")
def write_eiger1p5m(self):
out = {
"eiger1p5m": {
"description": "Eiger 1.5M in vacuum detector, in-house developed, PSI",
"status": {"enabled": True, "enabled_set": True},
"deviceClass": "Eiger1p5MDetector",
"deviceConfig": {"device_access": True, "name": "eiger1p5m"},
"acquisitionConfig": {
"schedule": "sync",
"acquisitionGroup": "detector",
"readoutPriority": "monitored",
},
"deviceTags": ["detector"],
}
}
self.write_section(out, "LamNI Eiger 1.5M in vacuum")
def load_csaxs_config(self):
CONFIG_PATH = "./init_scibec/configs/test_config_cSAXS.yaml"
content = {}
with open(CONFIG_PATH, "r") as csaxs_config_file:
content = yaml.safe_load(csaxs_config_file.read())
self.write_section(content, "Default cSAXS config")

File diff suppressed because it is too large Load Diff