feat(flomni): add 360° tomo range and single-point acquire mode plus few fixes #228

Merged
holler merged 2 commits from feat/flomni360 into main 2026-06-20 14:03:05 +02:00
@@ -1236,6 +1236,7 @@ class _ProgressProxy:
"tomo_type": 0,
"tomo_start_time": None,
"estimated_remaining_time": None,
"estimated_finish_time": None,
"heartbeat": None,
}
@@ -1578,6 +1579,22 @@ class Flomni(
def tomo_stitch_overlap(self, val: float):
self.client.set_global_var("tomo_stitch_overlap", val)
@property
def tomo_angle_range(self):
"""Total angular sweep in degrees for tomo_type 1 (equally spaced
sub-tomograms), inclusive of the upper bound. Either 180 (default,
original behaviour) or 360."""
val = self.client.get_global_var("tomo_angle_range")
if val is None:
return 180
return val
@tomo_angle_range.setter
def tomo_angle_range(self, val: float):
if val not in (180, 360):
raise ValueError("tomo_angle_range must be 180 or 360 degrees.")
self.client.set_global_var("tomo_angle_range", val)
@property
def golden_projections_at_0_deg_for_damage_estimation(self):
val = self.client.get_global_var("golden_projections_at_0_deg_for_damage_estimation")
@@ -1600,6 +1617,39 @@ class Flomni(
def golden_ratio_bunch_size(self, val: float):
self.client.set_global_var("golden_ratio_bunch_size", val)
@property
def frames_per_trigger(self):
"""Number of burst frames acquired per point/projection. Used both
by scans.flomni_fermat_scan (via tomo_scan_projection) and by
tomo_acquire_at_angle (scans.acquire)."""
val = self.client.get_global_var("frames_per_trigger")
if val is None:
return 1
return val
@frames_per_trigger.setter
def frames_per_trigger(self, val: int):
if isinstance(val, bool) or not isinstance(val, int):
raise ValueError("frames_per_trigger must be a positive integer.")
if val <= 0:
raise ValueError("frames_per_trigger must be a positive integer.")
self.client.set_global_var("frames_per_trigger", val)
@property
def single_point_instead_of_fermat_scan(self):
"""If True, tomo_scan acquires a single point (or burst) at each
angle via scans.acquire instead of running scans.flomni_fermat_scan.
Applies to all tomo_types, since it only changes how a given angle
is acquired, not which angles are visited."""
val = self.client.get_global_var("single_point_instead_of_fermat_scan")
if val is None:
return False
return val
@single_point_instead_of_fermat_scan.setter
def single_point_instead_of_fermat_scan(self, val: bool):
self.client.set_global_var("single_point_instead_of_fermat_scan", val)
@property
def sample_name(self):
return self.sample_get_name(0)
@@ -1729,14 +1779,14 @@ class Flomni(
start = start_angle + _tomo_shift_angles
if subtomo_number % 2: # odd = forward
max_allowed_angle = 180.05 + self.tomo_angle_stepsize
proposed_end = start + 180
max_allowed_angle = self.tomo_angle_range + 0.05 + self.tomo_angle_stepsize
proposed_end = start + self.tomo_angle_range
angle_end = min(proposed_end, max_allowed_angle)
span = angle_end - start
else: # even = reverse
min_allowed_angle = 0
proposed_end = start - 180
proposed_end = start - self.tomo_angle_range
angle_end = max(proposed_end, min_allowed_angle)
span = start - angle_end
@@ -1747,8 +1797,8 @@ class Flomni(
if subtomo_number % 2: # odd subtomos → forward direction
# clamp end angle to max allowed
max_allowed_angle = 180.05 + self.tomo_angle_stepsize
proposed_end = start + 180
max_allowed_angle = self.tomo_angle_range + 0.05 + self.tomo_angle_stepsize
proposed_end = start + self.tomo_angle_range
angle_end = min(proposed_end, max_allowed_angle)
angles = np.linspace(start, angle_end, num=N, endpoint=True)
@@ -1756,7 +1806,7 @@ class Flomni(
else: # even subtomos → reverse direction
# go FROM start_angle down toward 0
min_allowed_angle = 0
proposed_end = start - 180
proposed_end = start - self.tomo_angle_range
angle_end = max(proposed_end, min_allowed_angle)
angles = np.linspace(start, angle_end, num=N, endpoint=True)
@@ -1778,18 +1828,22 @@ class Flomni(
if subtomo_number % 2: # odd = forward direction
self._subtomo_offset = round(sa / step)
else: # even = reverse direction
self._subtomo_offset = round((180 - sa) / step)
self._subtomo_offset = round((self.tomo_angle_range - sa) / step)
# progress index must always increase
self.progress["subtomo_projection"] = self._subtomo_offset + i
# ------------------------------------------------------------
# existing progress fields
self.progress["subtomo_total_projections"] = int(180 / self.tomo_angle_stepsize)
self.progress["subtomo_total_projections"] = int(
self.tomo_angle_range / self.tomo_angle_stepsize
)
self.progress["projection"] = (subtomo_number - 1) * self.progress[
"subtomo_total_projections"
] + self.progress["subtomo_projection"]
self.progress["total_projections"] = 180 / self.tomo_angle_stepsize * 8
self.progress["total_projections"] = (
self.tomo_angle_range / self.tomo_angle_stepsize
) * 8
self.progress["angle"] = angle
# finally do the scan at this angle
@@ -1798,7 +1852,7 @@ class Flomni(
@scan_repeat(max_repeats=10, default=True)
def _tomo_scan_at_angle(self, angle, subtomo_number):
if 0 <= angle < 180.05:
if 0 <= angle < self.tomo_angle_range + 0.05:
self.progress["heartbeat"] = datetime.datetime.now().isoformat()
print(f"Starting flOMNI scan for angle {angle} in subtomo {subtomo_number}")
self._print_progress()
@@ -1837,6 +1891,10 @@ class Flomni(
bec = builtins.__dict__.get("bec")
scans = builtins.__dict__.get("scans")
bec.builtin_actors.scan_interlock.trigger_setting = "restart_scan"
bec.builtin_actors.scan_interlock.enabled = True
self._current_special_angles = self.special_angles.copy()
# a new tomo scan was started
if (
@@ -1861,6 +1919,11 @@ class Flomni(
self.tomo_id = 0
self.write_pdf_report()
self.progress["tomo_start_time"] = datetime.datetime.now().isoformat()
# reset stale estimates from any previous scan, otherwise the GUI
# would keep showing a leftover ETA from before this scan has
# accumulated enough projections to compute a fresh one
self.progress["estimated_remaining_time"] = None
self.progress["estimated_finish_time"] = None
with scans.dataset_id_on_hold:
if self.tomo_type == 1:
@@ -1986,15 +2049,18 @@ class Flomni(
projection = self.progress["projection"]
total = self.progress["total_projections"]
if start_str is not None and total > 0 and projection > 9:
elapsed = (
datetime.datetime.now() - datetime.datetime.fromisoformat(start_str)
).total_seconds()
now = datetime.datetime.now()
elapsed = (now - datetime.datetime.fromisoformat(start_str)).total_seconds()
rate = projection / elapsed # projections per second
remaining_s = (total - projection) / rate
self.progress["estimated_remaining_time"] = remaining_s
eta_str = self._format_duration(remaining_s)
finish_dt = now + datetime.timedelta(seconds=remaining_s)
self.progress["estimated_finish_time"] = finish_dt.isoformat()
finish_str = finish_dt.strftime("%Y-%m-%d %H:%M:%S")
else:
eta_str = "N/A"
finish_str = "N/A"
# ----------------------------------------------------------------------
print("\x1b[95mProgress report:")
print(f"Tomo type: ....................... {self.progress['tomo_type']}")
@@ -2003,7 +2069,8 @@ class Flomni(
print(f"Angle: ........................... {self.progress['angle']}")
print(f"Current subtomo: ................. {self.progress['subtomo']}")
print(f"Current projection within subtomo: {self.progress['subtomo_projection']}")
print(f"Estimated remaining time: ........ {eta_str}\x1b[0m")
print(f"Estimated remaining time: ........ {eta_str}")
print(f"Estimated finish time: ........... {finish_str}\x1b[0m")
self._flomnigui_update_progress()
def add_sample_database(
@@ -2026,6 +2093,10 @@ class Flomni(
flomni_at_each_angle(self, angle)
return
if self.single_point_instead_of_fermat_scan:
self.tomo_acquire_at_angle(angle)
return
self.tomo_scan_projection(angle)
def _golden(self, ii, howmany_sorted, maxangle, reverse=False):
@@ -2109,6 +2180,7 @@ class Flomni(
offsets[1]
- self.compute_additional_correction_y(angle)
- self.compute_additional_correction_y_2(angle)
+ self.manual_shift_y
)
sum_offset_z = offsets[2]
@@ -2141,6 +2213,7 @@ class Flomni(
zshift=sum_offset_z,
angle=angle,
exp_time=self.tomo_countingtime,
frames_per_trigger=self.frames_per_trigger,
)
if self.corridor_size > 0:
@@ -2150,6 +2223,68 @@ class Flomni(
self.tomo_reconstruct()
def tomo_acquire_at_angle(self, angle: float, frames_per_trigger: int | None = None):
"""
Move fsamroy to `angle`, then move rtx/rty/rtz to the alignment-corrected
scan center (same alignment-offset logic as tomo_scan_projection, but
without stitching), and acquire a single frame or a burst via
scans.acquire instead of running a fermat scan.
This mirrors the positioning sequence used internally by
flomni_fermat_scan (rotation, then rtx/rty/rtz with laser-tracker
on/check/move-to-region), but executes it as plain blocking
client-side calls, since this runs in the BEC client, not on the
scan server.
Args:
angle (float): rotation angle [deg] to move fsamroy to.
frames_per_trigger (int, optional): number of burst frames for
this acquisition. Defaults to self.frames_per_trigger.
"""
scans = builtins.__dict__.get("scans")
# --- rotation ---
fsamroy_current_setpoint = dev.fsamroy.user_setpoint.get()
if angle != fsamroy_current_setpoint:
umv(dev.fsamroy, angle)
else:
print("No rotation required")
# --- alignment offset (same as tomo_scan_projection, no stitching) ---
offsets = self.get_alignment_offset(angle)
sum_offset_x = offsets[0]
sum_offset_y = (
offsets[1]
- self.compute_additional_correction_y(angle)
- self.compute_additional_correction_y_2(angle)
+ self.manual_shift_y
)
sum_offset_z = offsets[2]
# --- positioning + laser tracker, mirroring
# flomni_fermat_scan._prepare_setup_part2 ---
dev.rtx.controller.laser_tracker_on()
umv(dev.rtx, sum_offset_x, dev.rty, sum_offset_y, dev.rtz, sum_offset_z)
tracker_signal = dev.rtx.controller.laser_tracker_check_signalstrength()
# checks that the fsamx coarse stage is at a position that leaves
# sufficient piezo range on the fine (rtx) stage
dev.rtx.controller.move_samx_to_scan_region(sum_offset_x)
if tracker_signal == "low":
logger.warning(
"Signal strength of the laser tracker is low. Realignment recommended!"
)
elif tracker_signal == "toolow":
raise FlomniError(
"Signal strength of the laser tracker is too low for scanning. Realignment required!"
)
# --- acquire ---
n_frames = (
frames_per_trigger if frames_per_trigger is not None else self.frames_per_trigger
)
scans.acquire(exp_time=self.tomo_countingtime, frames_per_trigger=n_frames)
def tomo_parameters(self):
"""print and update the tomo parameters"""
print("Current settings:")
@@ -2159,12 +2294,19 @@ class Flomni(
print(f"Stitching number x,y = {self.stitch_x}, {self.stitch_y}")
print(f"Stitching overlap = {self.tomo_stitch_overlap}")
print(f"Reconstruction queue name = {self.ptycho_reconstruct_foldername}")
print(f" _manual_shift_y <mm> = {self.manual_shift_y}")
print(f" _manual_shift_y <um> = {self.manual_shift_y}")
print(f"Frames per trigger (burst) = {self.frames_per_trigger}")
print(f"Single point instead of fermat = {self.single_point_instead_of_fermat_scan}")
print("")
if self.tomo_type == 1:
print("\x1b[1mTomo type 1:\x1b[0m 8 equally spaced sub-tomograms")
print(f"Total number of projections: {180/self.tomo_angle_stepsize*8}")
print(f"Angular range = {self.tomo_angle_range} degrees")
print(f"Total number of projections: {(self.tomo_angle_range/self.tomo_angle_stepsize)*8}")
print(f"Angular step within sub-tomogram: {self.tomo_angle_stepsize} degrees")
print(
"Angular step of the final (combined) tomogram:"
f" {self.tomo_angle_range/((self.tomo_angle_range/self.tomo_angle_stepsize)*8)} degrees"
)
if self.tomo_type == 2:
print("\x1b[1mTomo type 2:\x1b[0m Golden ratio tomography")
print(f"Sorted in bunches of: {self.golden_ratio_bunch_size}")
@@ -2199,11 +2341,38 @@ class Flomni(
self.tomo_shellstep = self._get_val("<step size> um", self.tomo_shellstep, float)
self.fovx = self._get_val("<FOV X (max 200)> um", self.fovx, float)
self.fovy = self._get_val("<FOV Y (max 100)> um", self.fovy, float)
self.stitch_x = self._get_val("<stitch X>", self.stitch_x, int)
self.stitch_y = self._get_val("<stitch Y>", self.stitch_y, int)
if self.single_point_instead_of_fermat_scan:
print(
"Stitching is disabled while single point instead of fermat scan is"
" active; stitch X/Y forced to 0."
)
self.stitch_x = 0
self.stitch_y = 0
else:
self.stitch_x = self._get_val("<stitch X>", self.stitch_x, int)
self.stitch_y = self._get_val("<stitch Y>", self.stitch_y, int)
self.ptycho_reconstruct_foldername = self._get_val(
"Reconstruction queue ", self.ptycho_reconstruct_foldername, str
)
self.frames_per_trigger = self._get_val(
"Frames per trigger (burst)", self.frames_per_trigger, int
)
self.single_point_instead_of_fermat_scan = bool(
self._get_val(
"Single point instead of fermat scan (acquire at angle) 1/0?",
int(self.single_point_instead_of_fermat_scan),
int,
)
)
if self.single_point_instead_of_fermat_scan and (
self.stitch_x != 0 or self.stitch_y != 0
):
print(
"Stitching is not supported with single point instead of fermat scan;"
" stitch X/Y forced to 0."
)
self.stitch_x = 0
self.stitch_y = 0
print("Tomography type:")
print(" 1: 8 equally spaced sub-tomograms")
@@ -2212,12 +2381,22 @@ class Flomni(
self.tomo_type = self._get_val("Tomography type", self.tomo_type, int)
if self.tomo_type == 1:
tomo_numberofprojections = self._get_val(
"Total number of projections", 180 / self.tomo_angle_stepsize * 8, int
self.tomo_angle_range = self._get_val(
"Angular range (180 or 360)", self.tomo_angle_range, int
)
tomo_numberofprojections = self._get_val(
"Total number of projections",
(self.tomo_angle_range / self.tomo_angle_stepsize) * 8,
int,
)
self.tomo_angle_stepsize = (self.tomo_angle_range / tomo_numberofprojections) * 8
print(
f"The angular step within a sub-tomogram will be {self.tomo_angle_stepsize} degrees"
)
print(
"The angular step of the final (combined) tomogram will be"
f" {self.tomo_angle_range / tomo_numberofprojections} degrees"
)
print(f"The angular step will be {180/tomo_numberofprojections}")
self.tomo_angle_stepsize = 180 / tomo_numberofprojections * 8
print(f"The angular step in a subtomogram it will be {self.tomo_angle_stepsize}")
if self.tomo_type == 2:
self.golden_ratio_bunch_size = self._get_val(
@@ -2298,9 +2477,9 @@ class Flomni(
f"{'Dataset ID:':<{padding}}{dataset_id:>{padding}}\n",
f"{'Sample Info:':<{padding}}{'Sample Info':>{padding}}\n",
f"{'e-account:':<{padding}}{str(account):>{padding}}\n",
f"{'Number of projections:':<{padding}}{int(180 / self.tomo_angle_stepsize * 8):>{padding}}\n",
f"{'Number of projections:':<{padding}}{int((self.tomo_angle_range / self.tomo_angle_stepsize) * 8):>{padding}}\n",
f"{'First scan number:':<{padding}}{self.client.queue.next_scan_number:>{padding}}\n",
f"{'Last scan number approx.:':<{padding}}{self.client.queue.next_scan_number + int(180 / self.tomo_angle_stepsize * 8) + 10:>{padding}}\n",
f"{'Last scan number approx.:':<{padding}}{self.client.queue.next_scan_number + int((self.tomo_angle_range / self.tomo_angle_stepsize) * 8) + 10:>{padding}}\n",
f"{'Current photon energy:':<{padding}}To be implemented\n",
# f"{'Current photon energy:':<{padding}}{dev.mokev.read()['mokev']['value']:>{padding}.4f}\n",
f"{'Exposure time:':<{padding}}{self.tomo_countingtime:>{padding}.2f}\n",