omny fermat scan

This commit is contained in:
Holler Mirko
2024-10-02 10:35:24 +02:00
committed by wakonig_k
parent 86e71ca255
commit d7308a8b81
4 changed files with 343 additions and 1 deletions

View File

@@ -7,6 +7,8 @@ from rich.console import Console
from rich.table import Table
from typeguard import typechecked
from bec_lib import bec_logger
logger = bec_logger.logger
class OMNYAlignmentError(Exception):

View File

@@ -72,7 +72,6 @@ rtx:
userParameter:
low_signal: 11000
min_signal: 10000
rt_pid_voltage: -0.06219
rty:
description: OMNY rt
deviceClass: csaxs_bec.devices.omny.rt.rt_omny_ophyd.RtOMNYMotor

View File

@@ -1,4 +1,5 @@
from .flomni_fermat_scan import FlomniFermatScan
from .omny_fermat_scan import OMNYFermatScan
from .LamNIFermatScan import LamNIFermatScan, LamNIMoveToScanCenter
from .owis_grid import OwisGrid
from .sgalil_grid import SgalilGrid

View File

@@ -0,0 +1,340 @@
"""
SCAN PLUGINS
All new scans should be derived from ScanBase. ScanBase provides various methods that can be customized and overriden
but they are executed in a specific order:
- self.initialize # initialize the class if needed
- self.read_scan_motors # used to retrieve the start position (and the relative position shift if needed)
- self.prepare_positions # prepare the positions for the scan. The preparation is split into multiple sub fuctions:
- self._calculate_positions # calculate the positions
- self._set_positions_offset # apply the previously retrieved scan position shift (if needed)
- self._check_limits # tests to ensure the limits won't be reached
- self.open_scan # send an open_scan message including the scan name, the number of points and the scan motor names
- self.stage # stage all devices for the upcoming acquisiton
- self.run_baseline_readings # read all devices to get a baseline for the upcoming scan
- self.scan_core # run a loop over all position
- self._at_each_point(ind, pos) # called at each position with the current index and the target positions as arguments
- self.finalize # clean up the scan, e.g. move back to the start position; wait everything to finish
- self.unstage # unstage all devices that have been staged before
- self.cleanup # send a close scan message and perform additional cleanups if needed
"""
import time
import numpy as np
from bec_lib import bec_logger, messages
from bec_lib.endpoints import MessageEndpoints
from bec_server.scan_server.errors import ScanAbortion
from bec_server.scan_server.scans import SyncFlyScanBase
logger = bec_logger.logger
class OMNYFermatScan(SyncFlyScanBase):
scan_name = "omny_fermat_scan"
scan_report_hint = "table"
scan_type = "fly"
required_kwargs = ["fov_size", "exp_time", "step", "angle"]
arg_input = {}
arg_bundle_size = {"bundle": len(arg_input), "min": None, "max": None}
def __init__(
self,
fovx: float,
fovy: float,
cenx: float,
ceny: float,
exp_time: float,
step: float,
zshift: float,
angle: float = None,
corridor_size: float = 3,
parameter: dict = None,
**kwargs,
):
"""
An OMNY scan following Fermat's spiral.
Args:
fovx(float) [um]: Fov in the piezo plane (i.e. piezo range). Max 200 um
fovy(float) [um]: Fov in the piezo plane (i.e. piezo range). Max 100 um
cenx(float) [mm]: center position in x.
ceny(float) [mm]: center position in y.
exp_time(float) [s]: exposure time
step(float) [um]: stepsize
zshift(float) [um]: shift in z
angle(float) [deg]: rotation angle (will rotate first)
corridor_size(float) [um]: corridor size for the corridor optimization. Default 3 um
Returns:
Examples:
>>> scans.omny_fermat_scan(fovx=20, fovy=25, cenx=0.02, ceny=0, zshift=0, angle=0, step=0.5, exp_time=0.01)
"""
super().__init__(parameter=parameter, **kwargs)
self.axis = []
self.fovx = fovx
self.fovy = fovy
self.cenx = cenx
self.ceny = ceny
self.exp_time = exp_time
self.step = step
self.zshift = zshift
self.angle = angle
self.optim_trajectory = "corridor"
self.optim_trajectory_corridor = corridor_size
if self.fovy > 100:
raise ScanAbortion("The FOV in y must be smaller than 100 um.")
if self.fovx > 200:
raise ScanAbortion("The FOV in x must be smaller than 200 um.")
if self.zshift > 100:
logger.warning("The zshift is larger than 100 um. It will be limited to 100 um.")
self.zshift = 100
if self.zshift < -100:
logger.warning("The zshift is smaller than -100 um. It will be limited to -100 um.")
self.zshift = -100
def initialize(self):
self.scan_motors = []
self.update_readout_priority()
def _optimize_trajectory(self):
self.positions = self.optimize_corridor(
self.positions, corridor_size=self.optim_trajectory_corridor
)
@property
def monitor_sync(self):
return "rt_omny"
def reverse_trajectory(self):
"""
Reverse the trajectory. Every other scan should be reversed to
shorten the movement time. In order to keep the last state, even if the
server is restarted, the state is stored in a global variable in redis.
"""
producer = self.device_manager.producer
msg = producer.get(MessageEndpoints.global_vars("reverse_omny_trajectory"))
if msg:
val = msg.content.get("value", False)
else:
val = False
producer.set(
MessageEndpoints.global_vars("reverse_omny_trajectory"),
messages.VariableMessage(value=(not val)),
)
return val
def prepare_positions(self):
self._calculate_positions()
self._optimize_trajectory()
flip_axes = self.reverse_trajectory()
if flip_axes:
self.positions = np.flipud(self.positions)
self.num_pos = len(self.positions)
self._check_min_positions()
def _check_min_positions(self):
if self.num_pos < 20:
raise ScanAbortion(
f"The number of positions must exceed 20. Currently: {self.num_pos}."
)
def _prepare_setup(self):
yield from self.stubs.send_rpc_and_wait("rtx", "controller.clear_trajectory_generator")
yield from self.omny_rotation(self.angle)
yield from self.stubs.send_rpc_and_wait("rty", "set", self.positions[0][1])
def _prepare_setup_part2(self):
yield from self.stubs.wait(wait_type="move", device="osamroy", wait_group="omny_rotation")
yield from self.stubs.set(
device="rtx", value=self.positions[0][0], wait_group="prepare_setup_part2"
)
yield from self.stubs.set(
device="rtz", value=self.positions[0][2], wait_group="prepare_setup_part2"
)
yield from self.stubs.send_rpc_and_wait("rtx", "controller.laser_tracker_on")
yield from self.stubs.wait(
wait_type="move", device=["rtx", "rtz"], wait_group="prepare_setup_part2"
)
yield from self._transfer_positions_to_omny()
yield from self.omny_osamx_to_scan_center()
def _get_user_param_safe(self, device, var):
param = dev[device].user_parameter
if not param or param.get(var) is None:
raise ValueError(f"Device {device} has no user parameter definition for {var}.")
return param.get(var)
def omny_osamx_to_scan_center(self):
# get last setpoint
osamroy_current_setpoint = yield from self.stubs.send_rpc_and_wait(
"osamroy", "user_setpoint.get"
)
omny_samx_in = self._get_user_param_safe("osamx","in")
if np.fabs(osamroy_current_setpoint-(omny_samx_in+self.cenx/1000)) > 0.025:
logger.info("Moving osamx to scan center")
self.set_device_enabled("osamx", True)
dev.osamx.controller.socket_put_confirmed("axspeed[0]=100")
yield from self.stubs.set(device="osamx", value=self.cenx/1000, wait_group="osamx_mv")
yield from self.stubs.wait(wait_type="move", device="osamx", wait_group="osamx_mv")
self.set_device_enabled("osamx", False)
time.sleep(4)
yield from self.stubs.send_rpc_and_wait("rtx", "controller.laser_tracker_on")
# Todo: auto align tracking at beginning of each scan. currently this is implemented in the client only
# this part is probably just ok with wait on target, which should also re align the tracking in OMNY!
def omny_rotation(self, angle):
# get last setpoint (cannot be based on pos get because they will deviate slightly)
osamroy_current_setpoint = yield from self.stubs.send_rpc_and_wait(
"osamroy", "user_setpoint.get"
)
if angle == osamroy_current_setpoint:
logger.info("No rotation required")
else:
logger.info("Rotating to requested angle")
yield from self.stubs.scan_report_instruction(
{
"readback": {
"RID": self.metadata["RID"],
"devices": ["osamroy"],
"start": [osamroy_current_setpoint],
"end": [angle],
}
}
)
yield from self.stubs.set(device="osamroy", value=angle, wait_group="omny_rotation")
def _transfer_positions_to_omny(self):
yield from self.stubs.send_rpc_and_wait(
"rtx", "controller.add_pos_to_scan", self.positions.tolist()
)
def _calculate_positions(self):
self.positions = self.get_omny_fermat_spiral_pos(
-np.abs(self.fovx / 2),
np.abs(self.fovx / 2),
-np.abs(self.fovy / 2),
np.abs(self.fovy / 2),
step=self.step,
spiral_type=0,
center=False,
)
def get_omny_fermat_spiral_pos(
self, m1_start, m1_stop, m2_start, m2_stop, step=1, spiral_type=0, center=False
):
"""
Calculate positions for a Fermat spiral scan.
Args:
m1_start(float): start position in m1
m1_stop(float): stop position in m1
m2_start(float): start position in m2
m2_stop(float): stop position in m2
step(float): stepsize
spiral_type(int): 0 for traditional Fermat spiral
center(bool): whether to include the center position
Returns:
positions(array): positions
"""
positions = []
phi = 2 * np.pi * ((1 + np.sqrt(5)) / 2.0) + spiral_type * np.pi
start = int(not center)
length_axis1 = np.abs(m1_stop - m1_start)
length_axis2 = np.abs(m2_stop - m2_start)
n_max = int(length_axis1 * length_axis2 * 3.2 / step / step)
z_pos = self.zshift
for ii in range(start, n_max):
radius = step * 0.57 * np.sqrt(ii)
# FOV is restructed below at check pos in range
if abs(radius * np.sin(ii * phi)) > length_axis1 / 2:
continue
if abs(radius * np.cos(ii * phi)) > length_axis2 / 2:
continue
x = radius * np.sin(ii * phi)
y = radius * np.cos(ii * phi)
positions.append([x + self.cenx, y + self.ceny, z_pos])
left_lower_corner = [
min(m1_start, m1_stop) + self.cenx,
min(m2_start, m2_stop) + self.ceny,
z_pos,
]
right_upper_corner = [
max(m1_start, m1_stop) + self.cenx,
max(m2_start, m2_stop) + self.ceny,
z_pos,
]
positions.append(left_lower_corner)
positions.append(right_upper_corner)
return np.array(positions)
def scan_core(self):
# use a device message to receive the scan number and
# scan ID before sending the message to the device server
yield from self.stubs.kickoff(device="rtx")
while True:
yield from self.stubs.read_and_wait(group="primary", wait_group="readout_primary")
status = self.device_manager.producer.get(MessageEndpoints.device_status("rt_scan"))
if status:
status_id = status.content.get("status", 1)
request_id = status.metadata.get("RID")
if status_id == 0 and self.metadata.get("RID") == request_id:
break
if status_id == 2 and self.metadata.get("RID") == request_id:
raise ScanAbortion(
"An error occured during the omny readout:"
f" {status.metadata.get('error')}"
)
time.sleep(1)
logger.debug("reading monitors")
# yield from self.device_rpc("rtx", "controller.kickoff")
def return_to_start(self):
"""return to the start position"""
# in omny, we need to move to the start position of the next scan
if isinstance(self.positions, np.ndarray) and len(self.positions[-1]) == 3:
yield from self.stubs.set(
device="rtx", value=self.positions[-1][0], wait_group="scan_motor"
)
yield from self.stubs.set(
device="rty", value=self.positions[-1][1], wait_group="scan_motor"
)
yield from self.stubs.set(
device="rtz", value=self.positions[-1][2], wait_group="scan_motor"
)
yield from self.stubs.wait(
wait_type="move", device=["rtx", "rty", "rtz"], wait_group="scan_motor"
)
return
logger.warning("No positions found to return to start")
def run(self):
self.initialize()
yield from self.read_scan_motors()
self.prepare_positions()
yield from self._prepare_setup()
yield from self.open_scan()
yield from self.stage()
yield from self.run_baseline_reading()
yield from self._prepare_setup_part2()
yield from self.scan_core()
yield from self.finalize()
yield from self.unstage()
yield from self.cleanup()