fix: move lamni fly scan to dedicated flyer device
CI for csaxs_bec / test (push) Successful in 2m1s
CI for csaxs_bec / test (pull_request) Successful in 2m37s

This commit is contained in:
2026-04-14 13:02:00 +02:00
parent fa799b1864
commit af90396274
4 changed files with 267 additions and 190 deletions
+28 -19
View File
@@ -14,7 +14,7 @@ leyex:
port: 8081
sign: -1
deviceTags:
- lamni
- lamni
enabled: true
onFailure: buffer
readOnly: false
@@ -34,7 +34,7 @@ leyey:
port: 8081
sign: -1
deviceTags:
- lamni
- lamni
enabled: true
onFailure: buffer
readOnly: false
@@ -55,7 +55,7 @@ loptx:
port: 8081
sign: 1
deviceTags:
- lamni
- lamni
enabled: true
onFailure: buffer
readOnly: false
@@ -76,7 +76,7 @@ lopty:
port: 8081
sign: 1
deviceTags:
- lamni
- lamni
enabled: true
onFailure: buffer
readOnly: false
@@ -97,7 +97,7 @@ loptz:
port: 8081
sign: -1
deviceTags:
- lamni
- lamni
enabled: true
onFailure: buffer
readOnly: false
@@ -115,7 +115,7 @@ lsamrot:
port: 8081
sign: 1
deviceTags:
- lamni
- lamni
enabled: true
onFailure: buffer
readOnly: false
@@ -133,7 +133,7 @@ lsamx:
port: 8081
sign: -1
deviceTags:
- lamni
- lamni
enabled: true
onFailure: buffer
readOnly: false
@@ -153,7 +153,7 @@ lsamy:
port: 8081
sign: 1
deviceTags:
- lamni
- lamni
enabled: true
onFailure: buffer
readOnly: false
@@ -162,7 +162,6 @@ lsamy:
userParameter:
center: 10.041
############################################################
################ LamNI Smaract motors ######################
############################################################
@@ -179,7 +178,7 @@ losax:
port: 8085
sign: -1
deviceTags:
- lamni
- lamni
enabled: true
onFailure: buffer
readOnly: false
@@ -199,7 +198,7 @@ losay:
port: 8085
sign: -1
deviceTags:
- lamni
- lamni
enabled: true
onFailure: buffer
readOnly: false
@@ -220,7 +219,7 @@ losaz:
port: 8085
sign: 1
deviceTags:
- lamni
- lamni
enabled: true
onFailure: buffer
readOnly: false
@@ -242,12 +241,12 @@ rtx:
host: mpc2680.psi.ch
labels: rtx
limits:
- 0
- 0
- 0
- 0
port: 3333
sign: 1
deviceTags:
- lamni
- lamni
readoutPriority: baseline
connectionTimeout: 20
enabled: true
@@ -260,17 +259,27 @@ rty:
host: mpc2680.psi.ch
labels: rty
limits:
- 0
- 0
- 0
- 0
port: 3333
sign: 1
deviceTags:
- lamni
- lamni
readoutPriority: baseline
connectionTimeout: 20
enabled: true
readOnly: False
rt_positions:
deviceClass: csaxs_bec.devices.omny.rt.rt_lamni_ophyd.RtLamniFlyer
deviceConfig:
host: mpc2680.psi.ch
port: 3333
readoutPriority: async
connectionTimeout: 20
enabled: true
readOnly: False
############################################################
######################### Cameras ##########################
############################################################
@@ -287,4 +296,4 @@ cam_xeye:
enabled: true
onFailure: buffer
readOnly: false
readoutPriority: async
readoutPriority: async
+1 -6
View File
@@ -518,14 +518,11 @@ class RtFlomniSetpointSignal(RtSetpointSignal):
tracker_status = self.parent.controller.laser_tracker_check_signalstrength()
if tracker_status == "toolow":
print(
"The interferometer signal is too low for movements. Realignment required."
)
print("The interferometer signal is too low for movements. Realignment required.")
raise RtError(
"The interferometer signal is too low for movements. Realignment required."
)
self.set_with_feedback_disabled(val)
def set_with_feedback_disabled(self, val):
@@ -783,7 +780,6 @@ class RtFlomniFlyer(Device):
)
self.average_stdeviations_x_st_fzp = 0
self.average_stdeviations_y_st_fzp = 0
self.average_lamni_angle = 0
self.readout_thread = None
self.scan_done_event = threading.Event()
self.scan_done_event.set()
@@ -800,7 +796,6 @@ class RtFlomniFlyer(Device):
read_counter = 0
self.average_stdeviations_x_st_fzp = 0
self.average_stdeviations_y_st_fzp = 0
self.average_lamni_angle = 0
mode, number_of_positions_planned, current_position_in_scan = (
self.controller.get_scan_status()
+217 -143
View File
@@ -3,12 +3,12 @@ import threading
import time
import numpy as np
from bec_lib import bec_logger, messages
from bec_lib.endpoints import MessageEndpoints
from bec_lib import bec_logger
from ophyd import Component as Cpt
from ophyd import Device, PositionerBase, Signal
from ophyd import Device, DeviceStatus, PositionerBase, Signal
from ophyd.status import wait as status_wait
from ophyd.utils import LimitError, ReadOnlyError
from ophyd_devices.utils.bec_signals import AsyncMultiSignal, ProgressSignal
from ophyd_devices.utils.controller import Controller, threadlocked
from ophyd_devices.utils.socket import SocketIO, SocketSignal, raise_if_disconnected
from prettytable import PrettyTable
@@ -215,7 +215,7 @@ class RtLamniController(Controller):
def start_scan(self):
# interferometer_feedback_not_running = int((self.socket_put_and_receive("J2")).split(",")[0])
# if interferometer_feedback_not_running == 1:
if not self.feedback_is_running():
if not self.feedback_is_running():
logger.error(
"Cannot start scan because feedback loop is not running or there is an interferometer error."
)
@@ -232,51 +232,6 @@ class RtLamniController(Controller):
# start a point-by-point scan (for cont scan in flomni it would be "sa")
self.socket_put_and_receive("sd")
def start_readout(self):
readout = threading.Thread(target=self.read_positions_from_sampler)
readout.start()
def kickoff(self, metadata):
self.readout_metadata = metadata
while not self._min_scan_buffer_reached:
time.sleep(0.001)
self.start_scan()
time.sleep(0.1)
self.start_readout()
def _get_signals_from_table(self, return_table) -> dict:
self.average_stdeviations_x_st_fzp += float(return_table[5])
self.average_stdeviations_y_st_fzp += float(return_table[8])
self.average_lamni_angle += float(return_table[19])
signals = {
"target_x": {"value": float(return_table[3])},
"average_x_st_fzp": {"value": float(return_table[4])},
"stdev_x_st_fzp": {"value": float(return_table[5])},
"target_y": {"value": float(return_table[6])},
"average_y_st_fzp": {"value": float(return_table[7])},
"stdev_y_st_fzp": {"value": float(return_table[8])},
"average_cap1": {"value": float(return_table[9])},
"stdev_cap1": {"value": float(return_table[10])},
"average_cap2": {"value": float(return_table[11])},
"stdev_cap2": {"value": float(return_table[12])},
"average_cap3": {"value": float(return_table[13])},
"stdev_cap3": {"value": float(return_table[14])},
"average_cap4": {"value": float(return_table[15])},
"stdev_cap4": {"value": float(return_table[16])},
"average_cap5": {"value": float(return_table[17])},
"stdev_cap5": {"value": float(return_table[18])},
"average_angle_interf_ST": {"value": float(return_table[19])},
"stdev_angle_interf_ST": {"value": float(return_table[20])},
"average_stdeviations_x_st_fzp": {
"value": self.average_stdeviations_x_st_fzp / (int(return_table[0]) + 1)
},
"average_stdeviations_y_st_fzp": {
"value": self.average_stdeviations_y_st_fzp / (int(return_table[0]) + 1)
},
"average_lamni_angle": {"value": self.average_lamni_angle / (int(return_table[0]) + 1)},
}
return signals
def feedback_is_running(self) -> bool:
status = int(float((self.socket_put_and_receive("J2")).split(",")[0]))
return status == 0 # 0 means running, 1 means error/disabled
@@ -285,24 +240,25 @@ class RtLamniController(Controller):
if self.feedback_is_running():
print("Loop is running, no error on interferometer.")
else:
print("Loop is not running, either it is turned off or an interferometer error occurred.")
print(
"Loop is not running, either it is turned off or an interferometer error occurred."
)
def show_analog_signals(self) -> dict:
self.socket_put("As") # start sampling
time.sleep(0.01)
return_table = (self.socket_put_and_receive("Ar")).split(",")
number_of_samples = int(float(return_table[0]))
signals = {
"number_of_samples": number_of_samples,
"piezo_0": float(return_table[1]),
"piezo_1": float(return_table[2]),
"cap_0": float(return_table[3]),
"cap_1": float(return_table[4]),
"cap_2": float(return_table[5]),
"cap_3": float(return_table[6]),
"cap_4": float(return_table[7]),
"number_of_samples": number_of_samples,
"piezo_0": float(return_table[1]),
"piezo_1": float(return_table[2]),
"cap_0": float(return_table[3]),
"cap_1": float(return_table[4]),
"cap_2": float(return_table[5]),
"cap_3": float(return_table[6]),
"cap_4": float(return_table[7]),
}
t = PrettyTable()
@@ -315,75 +271,6 @@ class RtLamniController(Controller):
return
def read_positions_from_sampler(self):
# this was for reading after the scan completed
number_of_samples_to_read = 1 # self.get_scan_status()[1] #number of valid samples, will be updated upon first data read
read_counter = 0
previous_point_in_scan = 0
self.average_stdeviations_x_st_fzp = 0
self.average_stdeviations_y_st_fzp = 0
self.average_lamni_angle = 0
mode, number_of_positions_planned, current_position_in_scan = self.get_scan_status()
# if not (mode==2 or mode==3):
# error
self.device_manager.connector.set(
MessageEndpoints.device_status("rt_scan"),
messages.DeviceStatusMessage(
device="rt_scan", status=1, metadata=self.readout_metadata
),
)
# while scan is running
while mode > 0:
# logger.info(f"Current scan position {current_position_in_scan} out of {number_of_positions_planned}")
mode, number_of_positions_planned, current_position_in_scan = self.get_scan_status()
time.sleep(0.01)
if current_position_in_scan > 5:
while current_position_in_scan > read_counter + 1:
return_table = (self.socket_put_and_receive(f"r{read_counter}")).split(",")
# logger.info(f"{return_table}")
logger.info(f"Read {read_counter} out of {number_of_positions_planned}")
read_counter = read_counter + 1
signals = self._get_signals_from_table(return_table)
self.publish_device_data(signals=signals, point_id=int(return_table[0]))
time.sleep(0.05)
# read the last samples even though scan is finished already
while number_of_positions_planned > read_counter:
return_table = (self.socket_put_and_receive(f"r{read_counter}")).split(",")
logger.info(f"Read {read_counter} out of {number_of_positions_planned}")
# logger.info(f"{return_table}")
read_counter = read_counter + 1
signals = self._get_signals_from_table(return_table)
self.publish_device_data(signals=signals, point_id=int(return_table[0]))
self.device_manager.connector.set(
MessageEndpoints.device_status("rt_scan"),
messages.DeviceStatusMessage(
device="rt_scan", status=0, metadata=self.readout_metadata
),
)
logger.info(
f"LamNI statistics: Average of all standard deviations: x {self.average_stdeviations_x_st_fzp/number_of_samples_to_read}, y {self.average_stdeviations_y_st_fzp/number_of_samples_to_read}, angle {self.average_lamni_angle/number_of_samples_to_read}."
)
def publish_device_data(self, signals, point_id):
self.device_manager.connector.set_and_publish(
MessageEndpoints.device_read("rt_lamni"),
messages.DeviceMessage(
signals=signals, metadata={"point_id": point_id, **self.readout_metadata}
),
)
def feedback_status_angle_lamni(self) -> bool:
return_table = (self.socket_put_and_receive("J7")).split(",")
logger.debug(
@@ -407,8 +294,8 @@ class RtLamniController(Controller):
t = PrettyTable()
t.title = "Interferometer signal strength"
t.field_names = ["Axis", "Description", "Value", "Running"]
t.add_row([0, "ST FZP horizontal", ssi_0, "-"])
t.add_row([1, "ST FZP vertical", ssi_1, "-"])
t.add_row([0, "ST FZP horizontal", ssi_0, "-"])
t.add_row([1, "ST FZP vertical", ssi_1, "-"])
t.add_row([2, "Angle interferometer", angle_signal, angle_running])
print(t)
@@ -431,7 +318,7 @@ class RtLamniController(Controller):
print(t)
print(f"Feedback loop running: {loop_status}")
return {"x": pos_x, "y": pos_y, "loop_running": loop_status}
return {"x": pos_x, "y": pos_y, "loop_running": loop_status}
def feedback_enable_with_reset(self):
if not self.feedback_status_angle_lamni():
@@ -792,27 +679,214 @@ class RtLamniMotor(Device, PositionerBase):
else:
raise TypeError(f"Expected value of type int but received {type(val)}")
def kickoff(self, metadata, **kwargs) -> None:
self.controller.kickoff(metadata)
@property
def egu(self):
"""The engineering units (EGU) for positions"""
return "um"
# how is this used later?
def stage(self) -> list[object]:
return super().stage()
def unstage(self) -> list[object]:
return super().unstage()
def stop(self, *, success=False):
self.controller.stop_all_axes()
return super().stop(success=success)
class RtLamniFlyer(Device):
USER_ACCESS = ["controller"]
data = Cpt(
AsyncMultiSignal,
name="data",
signals=[
"target_x",
"average_x_st_fzp",
"stdev_x_st_fzp",
"target_y",
"average_y_st_fzp",
"stdev_y_st_fzp",
"average_cap1",
"stdev_cap1",
"average_cap2",
"stdev_cap2",
"average_cap3",
"stdev_cap3",
"average_cap4",
"stdev_cap4",
"average_cap5",
"stdev_cap5",
"average_angle_interf_ST",
"stdev_angle_interf_ST",
"average_stdeviations_x_st_fzp",
"average_stdeviations_y_st_fzp",
"average_lamni_angle",
],
ndim=1,
async_update={"type": "add", "max_shape": [None]},
max_size=1000,
)
progress = Cpt(ProgressSignal, doc="ProgressSignal indicating of the device during the scan.")
def __init__(
self,
prefix="",
*,
name,
kind=None,
read_attrs=None,
configuration_attrs=None,
parent=None,
host="mpc2680.psi.ch",
port=3333,
socket_cls=SocketIO,
device_manager=None,
**kwargs,
):
super().__init__(prefix=prefix, name=name, parent=parent, **kwargs)
self.shutdown_event = threading.Event()
self.controller = RtLamniController(
socket_cls=socket_cls, socket_host=host, socket_port=port, device_manager=device_manager
)
self.average_stdeviations_x_st_fzp = 0
self.average_stdeviations_y_st_fzp = 0
self.average_lamni_angle = 0
self.readout_thread = None
self.scan_done_event = threading.Event()
self.scan_done_event.set()
def stage(self):
self.shutdown_event.clear()
self.scan_done_event.set()
return super().stage()
def start_readout(self, status: DeviceStatus):
self.readout_thread = threading.Thread(
target=self.read_positions_from_sampler, args=(status,)
)
self.readout_thread.start()
def kickoff(self) -> DeviceStatus:
self.shutdown_event.clear()
self.scan_done_event.clear()
while not self.controller._min_scan_buffer_reached and not self.shutdown_event.wait(0.001):
...
self.controller.start_scan()
self.shutdown_event.wait(0.1)
status = DeviceStatus(self)
status.set_finished()
return status
def complete(self) -> DeviceStatus:
"""Wait until the flyer is done."""
if self.scan_done_event.is_set():
# if the scan_done_event is already set, we can return a finished status immediately
status = DeviceStatus(self)
status.set_finished()
return status
status = DeviceStatus(self)
self.start_readout(status)
status.add_callback(lambda *args, **kwargs: self.scan_done_event.set())
return status
def stop(self, *, success=False):
self.shutdown_event.set()
self.scan_done_event.set()
if self.readout_thread is not None:
self.readout_thread.join()
return super().stop(success=success)
def read_positions_from_sampler(self, status: DeviceStatus):
"""
Read the positions from the sampler and update the data signal.
This function runs in a separate thread and continuously checks the
scan status.
Args:
status (DeviceStatus): The status object to update when the readout is complete.
"""
read_counter = 0
self.average_stdeviations_x_st_fzp = 0
self.average_stdeviations_y_st_fzp = 0
self.average_lamni_angle = 0
mode, number_of_positions_planned, current_position_in_scan = (
self.controller.get_scan_status()
)
# while scan is running
while mode > 0 and not self.shutdown_event.wait(0.01):
# logger.info(f"Current scan position {current_position_in_scan} out of {number_of_positions_planned}")
mode, number_of_positions_planned, current_position_in_scan = (
self.controller.get_scan_status()
)
if current_position_in_scan > 5:
while current_position_in_scan > read_counter + 1:
return_table = (
self.controller.socket_put_and_receive(f"r{read_counter}")
).split(",")
logger.info(f"Read {read_counter} out of {number_of_positions_planned}")
self.progress.put(
value=read_counter, max_value=number_of_positions_planned, done=False
)
read_counter = read_counter + 1
signals = self._get_signals_from_table(return_table)
self.data.set(signals)
if self.shutdown_event.wait(0.05):
logger.info("Shutdown event set, stopping readout.")
# if we are here, the shutdown_event is set. We can exit the readout loop.
status.set_finished()
return
# read the last samples even though scan is finished already
while number_of_positions_planned > read_counter and not self.shutdown_event.is_set():
return_table = (self.controller.socket_put_and_receive(f"r{read_counter}")).split(",")
logger.info(f"Read {read_counter} out of {number_of_positions_planned}")
self.progress.put(value=read_counter, max_value=number_of_positions_planned, done=False)
read_counter = read_counter + 1
signals = self._get_signals_from_table(return_table)
self.data.set(signals)
# NOTE: No need to set the status to failed if the shutdown_event is set.
# The stop() method will take care of that.
status.set_finished()
self.progress.put(value=read_counter, max_value=number_of_positions_planned, done=True)
logger.info(
f"LamNI statistics: Average of all standard deviations: x {self.average_stdeviations_x_st_fzp}, y {self.average_stdeviations_y_st_fzp}, angle {self.average_lamni_angle}."
)
def _get_signals_from_table(self, return_table) -> dict:
self.average_stdeviations_x_st_fzp += float(return_table[5])
self.average_stdeviations_y_st_fzp += float(return_table[8])
self.average_lamni_angle += float(return_table[19])
signals = {
"target_x": {"value": float(return_table[3])},
"average_x_st_fzp": {"value": float(return_table[4])},
"stdev_x_st_fzp": {"value": float(return_table[5])},
"target_y": {"value": float(return_table[6])},
"average_y_st_fzp": {"value": float(return_table[7])},
"stdev_y_st_fzp": {"value": float(return_table[8])},
"average_cap1": {"value": float(return_table[9])},
"stdev_cap1": {"value": float(return_table[10])},
"average_cap2": {"value": float(return_table[11])},
"stdev_cap2": {"value": float(return_table[12])},
"average_cap3": {"value": float(return_table[13])},
"stdev_cap3": {"value": float(return_table[14])},
"average_cap4": {"value": float(return_table[15])},
"stdev_cap4": {"value": float(return_table[16])},
"average_cap5": {"value": float(return_table[17])},
"stdev_cap5": {"value": float(return_table[18])},
"average_angle_interf_ST": {"value": float(return_table[19])},
"stdev_angle_interf_ST": {"value": float(return_table[20])},
"average_stdeviations_x_st_fzp": {
"value": self.average_stdeviations_x_st_fzp / (int(return_table[0]) + 1)
},
"average_stdeviations_y_st_fzp": {
"value": self.average_stdeviations_y_st_fzp / (int(return_table[0]) + 1)
},
"average_lamni_angle": {"value": self.average_lamni_angle / (int(return_table[0]) + 1)},
}
return signals
if __name__ == "__main__": # pragma: no cover
mock = False
if not mock:
+21 -22
View File
@@ -169,8 +169,8 @@ class LamNIMixin:
self.device_manager.devices.lsamx.read_only = True
self.device_manager.devices.lsamy.read_only = True
#update angle readback before start of the scan
# update angle readback before start of the scan
yield from self.stubs.send_rpc_and_wait("lsamrot", "readback.get")
yield from self.stubs.send_rpc_and_wait("rtx", "controller.feedback_enable_without_reset")
@@ -213,7 +213,14 @@ class LamNIFermatScan(ScanBase, LamNIMixin):
arg_input = {}
arg_bundle_size = {"bundle": len(arg_input), "min": None, "max": None}
def __init__(self, *args, parameter: dict = None, frames_per_trigger:int=1, exp_time:float=0,**kwargs):
def __init__(
self,
*args,
parameter: dict = None,
frames_per_trigger: int = 1,
exp_time: float = 0,
**kwargs,
):
"""
A LamNI scan following Fermat's spiral.
@@ -236,7 +243,9 @@ class LamNIFermatScan(ScanBase, LamNIMixin):
>>> scans.lamni_fermat_scan(fov_size=[20, 25], center_x=0.02, center_y=0, shift_x=0, shift_y=0, angle=0, step=0.5, fov_circular=0, exp_time=0.1, frames_per_trigger=1)
"""
super().__init__(parameter=parameter, frames_per_trigger=frames_per_trigger, exp_time=exp_time,**kwargs)
super().__init__(
parameter=parameter, frames_per_trigger=frames_per_trigger, exp_time=exp_time, **kwargs
)
self.axis = []
scan_kwargs = parameter.get("kwargs", {})
self.fov_size = scan_kwargs.get("fov_size")
@@ -455,27 +464,17 @@ class LamNIFermatScan(ScanBase, LamNIMixin):
yield from self._at_each_point(ind, pos)
self.burst_index = 0
elif self.scan_type == "fly":
# use a device message to receive the scan number and
# scan ID before sending the message to the device server
yield from self.stubs.kickoff(device="rtx")
while True:
yield from self.stubs.read(group="monitored")
msg = self.device_manager.connector.get(MessageEndpoints.device_status("rt_scan"))
if msg:
status = msg
status_id = status.content.get("status", 1)
request_id = status.metadata.get("RID")
if status_id == 0 and self.metadata.get("RID") == request_id:
break
if status_id == 2 and self.metadata.get("RID") == request_id:
raise ScanAbortion(
"An error occured during the LamNI readout:"
f" {status.metadata.get('error')}"
)
# fly scan mode
yield from self.stubs.kickoff(device="rt_positions")
# start the readout loop of the flyer
status = yield from self.stubs.complete(device="rt_positions", wait=False)
while not status.done:
yield from self.stubs.read(group="monitored", point_id=self.point_id)
self.point_id += 1
time.sleep(1)
logger.debug("reading monitors")
# yield from self.device_rpc("rtx", "controller.kickoff")
def run(self):
self.initialize()