fix: lamni scan adjustments

This commit is contained in:
x01dc
2026-04-14 18:31:50 +02:00
committed by appel_c
parent 31d9ba3c69
commit 3a249557e9
+24 -20
View File
@@ -26,7 +26,7 @@ import numpy as np
from bec_lib import bec_logger
from bec_lib.endpoints import MessageEndpoints
from bec_server.scan_server.errors import ScanAbortion
from bec_server.scan_server.scans import RequestBase, ScanArgType, ScanBase
from bec_server.scan_server.scans import AsyncFlyScanBase, RequestBase, ScanArgType
MOVEMENT_SCALE_X = np.sin(np.radians(15)) * np.cos(np.radians(30))
MOVEMENT_SCALE_Y = np.cos(np.radians(15))
@@ -205,10 +205,9 @@ class LamNIMoveToScanCenter(RequestBase, LamNIMixin):
yield from self.lamni_new_scan_center_interferometer(center_x, center_y)
class LamNIFermatScan(ScanBase, LamNIMixin):
class LamNIFermatScan(AsyncFlyScanBase, LamNIMixin):
scan_name = "lamni_fermat_scan"
scan_report_hint = "table"
scan_type = "step"
scan_type = "fly"
required_kwargs = ["fov_size", "exp_time", "step", "angle"]
arg_input = {}
arg_bundle_size = {"bundle": len(arg_input), "min": None, "max": None}
@@ -258,7 +257,16 @@ class LamNIFermatScan(ScanBase, LamNIMixin):
self.optim_trajectory_corridor = scan_kwargs.get("optim_trajectory_corridor")
def initialize(self):
self.scan_motors = ["rtx", "rty"]
self.scan_motors = []
self.update_readout_priority()
def scan_report_instructions(self):
"""Scan report instructions for the progress bar"""
yield from self.stubs.scan_report_instruction({"device_progress": ["rt_positions"]})
@property
def monitor_sync(self) -> str:
return "rt_positions"
def _optimize_trajectory(self):
self.positions = self.optimize_corridor(
@@ -449,29 +457,25 @@ class LamNIFermatScan(ScanBase, LamNIMixin):
yield from self.stubs.set(device="lsamrot", value=angle)
def scan_core(self):
if self.scan_type == "step":
for ind, pos in self._get_position():
for self.burst_index in range(self.burst_at_each_point):
yield from self._at_each_point(ind, pos)
self.burst_index = 0
elif self.scan_type == "fly":
# fly scan mode
yield from self.stubs.kickoff(device="rt_positions")
# fly scan mode
yield from self.stubs.kickoff(device="rt_positions")
# start the readout loop of the flyer
status = yield from self.stubs.complete(device="rt_positions", wait=False)
# start the readout loop of the flyer
status = yield from self.stubs.complete(device="rt_positions", wait=False)
while not status.done:
yield from self.stubs.read(group="monitored", point_id=self.point_id)
self.point_id += 1
time.sleep(1)
logger.debug("reading monitors")
while not status.done:
yield from self.stubs.read(group="monitored", point_id=self.point_id)
self.point_id += 1
time.sleep(1)
logger.debug("reading monitors")
def run(self):
self.initialize()
yield from self.read_scan_motors()
self.prepare_positions()
yield from self._prepare_setup()
yield from self.scan_report_instructions()
yield from self.open_scan()
yield from self.stage()
yield from self.run_baseline_reading()