bec/scan_server/scan_plugins/LamNIFermatScan.py
2022-07-20 16:51:05 +02:00

347 lines
14 KiB
Python

"""
SCAN PLUGINS
All new scans should be derived from ScanBase. ScanBase provides various methods that can be customized and overriden
but they are executed in a specific order:
- self.initialize # initialize the class if needed
- self.read_scan_motors # used to retrieve the start position (and the relative position shift if needed)
- self.prepare_positions # prepare the positions for the scan. The preparation is split into multiple sub fuctions:
- self._calculate_positions # calculate the positions
- self._set_positions_offset # apply the previously retrieved scan position shift (if needed)
- self._check_limits # tests to ensure the limits won't be reached
- self.open_scan # send an open_scan message including the scan name, the number of points and the scan motor names
- self.stage # stage all devices for the upcoming acquisiton
- self.run_baseline_readings # read all devices to get a baseline for the upcoming scan
- self.scan_core # run a loop over all position
- self._at_each_point(ind, pos) # called at each position with the current index and the target positions as arguments
- self.finalize # clean up the scan, e.g. move back to the start position; wait everything to finish
- self.unstage # unstage all devices that have been staged before
- self.cleanup # send a close scan message and perform additional cleanups if needed
"""
import time
import uuid
import numpy as np
from bec_utils import BECMessage, MessageEndpoints, bec_logger
from scan_server.scans import ScanBase
import matplotlib.pyplot as plt
MOVEMENT_SCALE_X = np.sin(np.radians(15)) * np.cos(np.radians(30))
MOVEMENT_SCALE_Y = np.cos(np.radians(15))
logger = bec_logger.logger
def lamni_to_stage_coordinates(x: float, y: float) -> tuple:
"""convert from lamni coordinates to stage coordinates"""
y_stage = y / MOVEMENT_SCALE_Y
x_stage = 2 * (x - y_stage * MOVEMENT_SCALE_X)
return (x_stage, y_stage)
def lamni_from_stage_coordinates(x_stage: float, y_stage: float) -> tuple:
"""convert to lamni coordinates from stage coordinates"""
x = x_stage * 0.5 + y_stage * MOVEMENT_SCALE_X
y = y_stage * MOVEMENT_SCALE_Y
return (x, y)
class LamNIFermatScan(ScanBase):
scan_name = "lamni_fermat_scan"
scan_report_hint = "table"
required_kwargs = ["fov_size", "exp_time", "step", "angle"]
arg_input = []
arg_bundle_size = None
def __init__(self, *args, parameter=None, **kwargs):
"""
A LamNI scan following Fermat's spiral.
Kwargs:
shift_x: extra shift in x. The shift will not be rotated. (default 0).
shift_y: extra shift in y. The shift will not be rotated. (default 0).
center_x: center position in x at 0 deg. (optional)
center_y: center position in y at 0 deg. (optional)
angle: rotation angle (will rotate first)
Returns:
Examples:
>>> scans.lamni_fermat_scan(fov_size=[20], step=0.5, exp_time=0.1)
>>> scans.lamni_fermat_scan(fov_size=[20, 25], center_x=20, step=0.5, exp_time=0.1)
"""
super().__init__(parameter=parameter, **kwargs)
self.axis = []
scan_kwargs = parameter.get("kwargs", {})
self.fov_size = scan_kwargs.get("fov_size")
if len(self.fov_size) == 1:
self.fov_size *= 2 # if we only have one argument, let's assume it's a square
self.step = scan_kwargs.get("step", 0.1)
self.center_x = scan_kwargs.get("center_x", 0)
self.center_y = scan_kwargs.get("center_y", 0)
self.shift_x = scan_kwargs.get("shift_x", 0)
self.shift_y = scan_kwargs.get("shift_y", 0)
self.angle = scan_kwargs.get("angle", 0)
def initialize(self):
self.scan_motors = ["rtx", "rty"]
def prepare_positions(self):
self._calculate_positions()
# self._sort_positions()
# self.shift()
self.num_pos = len(self.positions)
def _lamni_check_pos_in_piezo_range(self, x, y) -> bool:
# this function checks if positions are reachable in a scan
# these x y intererometer positions are not shifted to the scan center
# so its purpose is to see if the position is reachable by the
# rotated piezo stage. For a scan these positions have to be shifted to
# the current scan center before starting the scan
stage_x, stage_y = lamni_to_stage_coordinates(x, y)
# piezo stage is currently rotated to stage_angle_deg in degrees
# rotate positions to the piezo stage system
alpha = (self.angle - 300 + 30.5) / 180 * np.pi
stage_x_rot = np.cos(alpha) * stage_x + np.sin(alpha) * stage_y
stage_y_rot = -np.sin(alpha) * stage_x + np.cos(alpha) * stage_y
_lamni_piezo_range = 20
return np.abs(stage_x_rot) <= (_lamni_piezo_range / 2) and np.abs(stage_y_rot) <= (
_lamni_piezo_range / 2
)
def _prepare_setup(self):
yield from self.device_rpc("rtx", "controller.clear_trajectory_generator")
yield from self.lamni_rotation(self.angle)
yield from self.lamni_new_scan_center_interferometer(self.center_x, self.center_y)
yield from self._plot_target_pos()
# yield from self._transfer_positions_to_LamNI()
# start HW scan p _rt_put_and_receive(sprintf("sd"))
# time.sleep(30)
def _plot_target_pos(self):
plt.plot(self.positions)
plt.show()
def _transfer_positions_to_LamNI(self):
for pos in self.positions:
yield from self.device_rpc("rtx", f"controller.add_pos_to_scan", (pos[0], pos[1]))
def _calculate_positions(self):
self.positions = self.get_lamni_fermat_spiral_pos(
-np.abs(self.fov_size[0] / 2),
np.abs(self.fov_size[0] / 2),
-np.abs(self.fov_size[1] / 2),
np.abs(self.fov_size[1] / 2),
step=self.step,
spiral_type=0,
center=False,
)
def get_lamni_fermat_spiral_pos(
self, m1_start, m1_stop, m2_start, m2_stop, step=1, spiral_type=0, center=False
):
"""[summary]
Args:
m1_start (float): start position motor 1
m1_stop (float): end position motor 1
m2_start (float): start position motor 2
m2_stop (float): end position motor 2
step (float, optional): Step size. Defaults to 1.
spiral_type (float, optional): Angular offset in radians that determines the shape of the spiral.
A spiral with spiral_type=2 is the same as spiral_type=0. Defaults to 0.
center (bool, optional): Add a center point. Defaults to False.
Raises:
TypeError: [description]
TypeError: [description]
TypeError: [description]
Returns:
[type]: [description]
Yields:
[type]: [description]
"""
positions = []
phi = 2 * np.pi * ((1 + np.sqrt(5)) / 2.0) + spiral_type * np.pi
start = int(not center)
length_axis1 = np.abs(m1_stop - m1_start)
length_axis2 = np.abs(m2_stop - m2_start)
n_max = int(length_axis1 * length_axis2 * 2)
for ii in range(start, n_max):
radius = step * 0.57 * np.sqrt(ii)
if abs(radius * np.sin(ii * phi)) > length_axis1 / 2:
continue
if abs(radius * np.cos(ii * phi)) > length_axis2 / 2:
continue
x = radius * np.sin(ii * phi)
y = radius * np.cos(ii * phi)
if self._lamni_check_pos_in_piezo_range(x, y):
positions.extend([(x + self.center_x * 1000, y + self.center_y * 1000)])
# for testing we just shift by center_i and prepare also the setup to center_i
return np.array(positions)
def lamni_rotation(self, angle):
# get last setpoint (cannot be based on pos get because they will deviate slightly)
lsamrot_current_setpoint = yield from self.device_rpc("lsamrot", "user_setpoint.get")
if angle == lsamrot_current_setpoint:
logger.info("No rotation required")
else:
logger.info("Rotating to requested angle")
yield from self._move_and_wait_devices(["lsamrot"], [angle])
def lamni_new_scan_center_interferometer(self, x, y):
"""move to new scan center. xy in mm"""
lsamx_center = 8.866
lsamy_center = 10.18
# could first check if feedback is enabled
yield from self.device_rpc("rtx", "controller.feedback_disable")
time.sleep(0.05)
rtx_current = yield from self.device_rpc("rtx", "readback.get")
rty_current = yield from self.device_rpc("rty", "readback.get")
lsamx_current = yield from self.device_rpc("lsamx", "readback.get")
lsamy_current = yield from self.device_rpc("lsamy", "readback.get")
x_stage, y_stage = lamni_to_stage_coordinates(x, y)
x_center_expect, y_center_expect = lamni_from_stage_coordinates(
lsamx_current - lsamx_center, lsamy_current - lsamy_center
)
# in microns
x_drift = x_center_expect * 1000 - rtx_current
y_drift = y_center_expect * 1000 - rty_current
logger.info(f"Current uncompensated drift of setup is x={x_drift:.3f}, y={y_drift:.3f}")
move_x = x_stage + lsamx_center + lamni_to_stage_coordinates(x_drift, y_drift)[0] / 1000
move_y = y_stage + lsamy_center + lamni_to_stage_coordinates(x_drift, y_drift)[1] / 1000
coarse_move_req_x = np.abs(lsamx_current - move_x)
coarse_move_req_y = np.abs(lsamy_current - move_y)
if (
np.abs(y_drift) > 150
or np.abs(x_drift) > 150
or (coarse_move_req_y < 0.003 and coarse_move_req_x < 0.003)
):
logger.info("No drift correction.")
else:
logger.info(
f"Compensating {[val/1000 for val in lamni_to_stage_coordinates(x_drift,y_drift)]}"
)
yield from self._move_and_wait_devices(["lsamx", "lsamy"], [move_x, move_y])
time.sleep(0.01)
rtx_current = yield from self.device_rpc("rtx", "readback.get")
rty_current = yield from self.device_rpc("rty", "readback.get")
logger.info(f"New scan center interferometer {rtx_current:.3f}, {rty_current:.3f} microns")
# second iteration
x_center_expect, y_center_expect = lamni_from_stage_coordinates(x_stage, y_stage)
# in microns
x_drift2 = x_center_expect * 1000 - rtx_current
y_drift2 = y_center_expect * 1000 - rty_current
logger.info(
f"Uncompensated drift of setup after first iteration is x={x_drift2:.3f}, y={y_drift2:.3f}"
)
if np.abs(x_drift2) > 5 or np.abs(y_drift2) > 5:
logger.info(
f"Compensating second iteration {[val/1000 for val in lamni_to_stage_coordinates(x_drift2,y_drift2)]}"
)
move_x = (
x_stage
+ lsamx_center
+ lamni_to_stage_coordinates(x_drift, y_drift)[0] / 1000
+ lamni_to_stage_coordinates(x_drift2, y_drift2)[0] / 1000
)
move_y = (
y_stage
+ lsamy_center
+ lamni_to_stage_coordinates(x_drift, y_drift)[1] / 1000
+ lamni_to_stage_coordinates(x_drift2, y_drift2)[1] / 1000
)
yield from self._move_and_wait_devices(["lsamx", "lsamy"], [move_x, move_y])
time.sleep(0.01)
rtx_current = yield from self.device_rpc("rtx", "readback.get")
rty_current = yield from self.device_rpc("rty", "readback.get")
logger.info(
f"New scan center interferometer after second iteration {rtx_current:.3f}, {rty_current:.3f} microns"
)
x_drift2 = x_center_expect * 1000 - rtx_current
y_drift2 = y_center_expect * 1000 - rty_current
logger.info(
f"Uncompensated drift of setup after second iteration is x={x_drift2:.3f}, y={y_drift2:.3f}"
)
else:
logger.info("No second iteration required")
yield from self.device_rpc("rtx", "controller.feedback_enable_without_reset")
def _move_and_wait_devices(self, devices, pos):
if not isinstance(pos, list) and not isinstance(pos, np.ndarray):
pos = [pos]
for ind, val in enumerate(devices):
yield self.device_msg(
device=val,
action="set",
parameter={
"value": pos[ind],
"group": "scan_motor",
"wait_group": "scan_motor",
},
)
yield self.device_msg(
device=devices,
action="wait",
parameter={
"type": "move",
"group": "scan_motor",
"wait_group": "scan_motor",
},
)
def open_scan(self):
yield self.device_msg(
device=None,
action="open_scan",
parameter={
"primary": self.scan_motors,
"num_points": self.num_pos,
"scan_name": self.scan_name,
"scan_type": "step",
},
)
def run(self):
self.initialize()
yield from self.read_scan_motors()
self.prepare_positions()
yield from self._prepare_setup()
yield from self.open_scan()
yield from self.stage()
yield from self.run_baseline_reading()
yield from self.scan_core()
yield from self.finalize()
yield from self.unstage()
yield from self.cleanup()