From af903962749b6063be7bfd248e2bbde152af83ff Mon Sep 17 00:00:00 2001 From: wakonig_k Date: Tue, 14 Apr 2026 13:02:00 +0200 Subject: [PATCH] fix: move lamni fly scan to dedicated flyer device --- csaxs_bec/device_configs/ptycho_lamni.yaml | 47 ++- csaxs_bec/devices/omny/rt/rt_flomni_ophyd.py | 7 +- csaxs_bec/devices/omny/rt/rt_lamni_ophyd.py | 360 +++++++++++-------- csaxs_bec/scans/LamNIFermatScan.py | 43 ++- 4 files changed, 267 insertions(+), 190 deletions(-) diff --git a/csaxs_bec/device_configs/ptycho_lamni.yaml b/csaxs_bec/device_configs/ptycho_lamni.yaml index 63ccf96..7de6c86 100644 --- a/csaxs_bec/device_configs/ptycho_lamni.yaml +++ b/csaxs_bec/device_configs/ptycho_lamni.yaml @@ -14,7 +14,7 @@ leyex: port: 8081 sign: -1 deviceTags: - - lamni + - lamni enabled: true onFailure: buffer readOnly: false @@ -34,7 +34,7 @@ leyey: port: 8081 sign: -1 deviceTags: - - lamni + - lamni enabled: true onFailure: buffer readOnly: false @@ -55,7 +55,7 @@ loptx: port: 8081 sign: 1 deviceTags: - - lamni + - lamni enabled: true onFailure: buffer readOnly: false @@ -76,7 +76,7 @@ lopty: port: 8081 sign: 1 deviceTags: - - lamni + - lamni enabled: true onFailure: buffer readOnly: false @@ -97,7 +97,7 @@ loptz: port: 8081 sign: -1 deviceTags: - - lamni + - lamni enabled: true onFailure: buffer readOnly: false @@ -115,7 +115,7 @@ lsamrot: port: 8081 sign: 1 deviceTags: - - lamni + - lamni enabled: true onFailure: buffer readOnly: false @@ -133,7 +133,7 @@ lsamx: port: 8081 sign: -1 deviceTags: - - lamni + - lamni enabled: true onFailure: buffer readOnly: false @@ -153,7 +153,7 @@ lsamy: port: 8081 sign: 1 deviceTags: - - lamni + - lamni enabled: true onFailure: buffer readOnly: false @@ -162,7 +162,6 @@ lsamy: userParameter: center: 10.041 - ############################################################ ################ LamNI Smaract motors ###################### ############################################################ @@ -179,7 +178,7 @@ losax: port: 8085 sign: -1 deviceTags: - - lamni + - lamni enabled: true onFailure: buffer readOnly: false @@ -199,7 +198,7 @@ losay: port: 8085 sign: -1 deviceTags: - - lamni + - lamni enabled: true onFailure: buffer readOnly: false @@ -220,7 +219,7 @@ losaz: port: 8085 sign: 1 deviceTags: - - lamni + - lamni enabled: true onFailure: buffer readOnly: false @@ -242,12 +241,12 @@ rtx: host: mpc2680.psi.ch labels: rtx limits: - - 0 - - 0 + - 0 + - 0 port: 3333 sign: 1 deviceTags: - - lamni + - lamni readoutPriority: baseline connectionTimeout: 20 enabled: true @@ -260,17 +259,27 @@ rty: host: mpc2680.psi.ch labels: rty limits: - - 0 - - 0 + - 0 + - 0 port: 3333 sign: 1 deviceTags: - - lamni + - lamni readoutPriority: baseline connectionTimeout: 20 enabled: true readOnly: False +rt_positions: + deviceClass: csaxs_bec.devices.omny.rt.rt_lamni_ophyd.RtLamniFlyer + deviceConfig: + host: mpc2680.psi.ch + port: 3333 + readoutPriority: async + connectionTimeout: 20 + enabled: true + readOnly: False + ############################################################ ######################### Cameras ########################## ############################################################ @@ -287,4 +296,4 @@ cam_xeye: enabled: true onFailure: buffer readOnly: false - readoutPriority: async \ No newline at end of file + readoutPriority: async diff --git a/csaxs_bec/devices/omny/rt/rt_flomni_ophyd.py b/csaxs_bec/devices/omny/rt/rt_flomni_ophyd.py index def31da..c7823d3 100644 --- a/csaxs_bec/devices/omny/rt/rt_flomni_ophyd.py +++ b/csaxs_bec/devices/omny/rt/rt_flomni_ophyd.py @@ -518,14 +518,11 @@ class RtFlomniSetpointSignal(RtSetpointSignal): tracker_status = self.parent.controller.laser_tracker_check_signalstrength() if tracker_status == "toolow": - print( - "The interferometer signal is too low for movements. Realignment required." - ) + print("The interferometer signal is too low for movements. Realignment required.") raise RtError( "The interferometer signal is too low for movements. Realignment required." ) - self.set_with_feedback_disabled(val) def set_with_feedback_disabled(self, val): @@ -783,7 +780,6 @@ class RtFlomniFlyer(Device): ) self.average_stdeviations_x_st_fzp = 0 self.average_stdeviations_y_st_fzp = 0 - self.average_lamni_angle = 0 self.readout_thread = None self.scan_done_event = threading.Event() self.scan_done_event.set() @@ -800,7 +796,6 @@ class RtFlomniFlyer(Device): read_counter = 0 self.average_stdeviations_x_st_fzp = 0 self.average_stdeviations_y_st_fzp = 0 - self.average_lamni_angle = 0 mode, number_of_positions_planned, current_position_in_scan = ( self.controller.get_scan_status() diff --git a/csaxs_bec/devices/omny/rt/rt_lamni_ophyd.py b/csaxs_bec/devices/omny/rt/rt_lamni_ophyd.py index 3e3df3c..2ad0099 100644 --- a/csaxs_bec/devices/omny/rt/rt_lamni_ophyd.py +++ b/csaxs_bec/devices/omny/rt/rt_lamni_ophyd.py @@ -3,12 +3,12 @@ import threading import time import numpy as np -from bec_lib import bec_logger, messages -from bec_lib.endpoints import MessageEndpoints +from bec_lib import bec_logger from ophyd import Component as Cpt -from ophyd import Device, PositionerBase, Signal +from ophyd import Device, DeviceStatus, PositionerBase, Signal from ophyd.status import wait as status_wait from ophyd.utils import LimitError, ReadOnlyError +from ophyd_devices.utils.bec_signals import AsyncMultiSignal, ProgressSignal from ophyd_devices.utils.controller import Controller, threadlocked from ophyd_devices.utils.socket import SocketIO, SocketSignal, raise_if_disconnected from prettytable import PrettyTable @@ -215,7 +215,7 @@ class RtLamniController(Controller): def start_scan(self): # interferometer_feedback_not_running = int((self.socket_put_and_receive("J2")).split(",")[0]) # if interferometer_feedback_not_running == 1: - if not self.feedback_is_running(): + if not self.feedback_is_running(): logger.error( "Cannot start scan because feedback loop is not running or there is an interferometer error." ) @@ -232,51 +232,6 @@ class RtLamniController(Controller): # start a point-by-point scan (for cont scan in flomni it would be "sa") self.socket_put_and_receive("sd") - def start_readout(self): - readout = threading.Thread(target=self.read_positions_from_sampler) - readout.start() - - def kickoff(self, metadata): - self.readout_metadata = metadata - while not self._min_scan_buffer_reached: - time.sleep(0.001) - self.start_scan() - time.sleep(0.1) - self.start_readout() - - def _get_signals_from_table(self, return_table) -> dict: - self.average_stdeviations_x_st_fzp += float(return_table[5]) - self.average_stdeviations_y_st_fzp += float(return_table[8]) - self.average_lamni_angle += float(return_table[19]) - signals = { - "target_x": {"value": float(return_table[3])}, - "average_x_st_fzp": {"value": float(return_table[4])}, - "stdev_x_st_fzp": {"value": float(return_table[5])}, - "target_y": {"value": float(return_table[6])}, - "average_y_st_fzp": {"value": float(return_table[7])}, - "stdev_y_st_fzp": {"value": float(return_table[8])}, - "average_cap1": {"value": float(return_table[9])}, - "stdev_cap1": {"value": float(return_table[10])}, - "average_cap2": {"value": float(return_table[11])}, - "stdev_cap2": {"value": float(return_table[12])}, - "average_cap3": {"value": float(return_table[13])}, - "stdev_cap3": {"value": float(return_table[14])}, - "average_cap4": {"value": float(return_table[15])}, - "stdev_cap4": {"value": float(return_table[16])}, - "average_cap5": {"value": float(return_table[17])}, - "stdev_cap5": {"value": float(return_table[18])}, - "average_angle_interf_ST": {"value": float(return_table[19])}, - "stdev_angle_interf_ST": {"value": float(return_table[20])}, - "average_stdeviations_x_st_fzp": { - "value": self.average_stdeviations_x_st_fzp / (int(return_table[0]) + 1) - }, - "average_stdeviations_y_st_fzp": { - "value": self.average_stdeviations_y_st_fzp / (int(return_table[0]) + 1) - }, - "average_lamni_angle": {"value": self.average_lamni_angle / (int(return_table[0]) + 1)}, - } - return signals - def feedback_is_running(self) -> bool: status = int(float((self.socket_put_and_receive("J2")).split(",")[0])) return status == 0 # 0 means running, 1 means error/disabled @@ -285,24 +240,25 @@ class RtLamniController(Controller): if self.feedback_is_running(): print("Loop is running, no error on interferometer.") else: - print("Loop is not running, either it is turned off or an interferometer error occurred.") - + print( + "Loop is not running, either it is turned off or an interferometer error occurred." + ) def show_analog_signals(self) -> dict: self.socket_put("As") # start sampling time.sleep(0.01) return_table = (self.socket_put_and_receive("Ar")).split(",") - + number_of_samples = int(float(return_table[0])) signals = { - "number_of_samples": number_of_samples, - "piezo_0": float(return_table[1]), - "piezo_1": float(return_table[2]), - "cap_0": float(return_table[3]), - "cap_1": float(return_table[4]), - "cap_2": float(return_table[5]), - "cap_3": float(return_table[6]), - "cap_4": float(return_table[7]), + "number_of_samples": number_of_samples, + "piezo_0": float(return_table[1]), + "piezo_1": float(return_table[2]), + "cap_0": float(return_table[3]), + "cap_1": float(return_table[4]), + "cap_2": float(return_table[5]), + "cap_3": float(return_table[6]), + "cap_4": float(return_table[7]), } t = PrettyTable() @@ -315,75 +271,6 @@ class RtLamniController(Controller): return - def read_positions_from_sampler(self): - # this was for reading after the scan completed - number_of_samples_to_read = 1 # self.get_scan_status()[1] #number of valid samples, will be updated upon first data read - - read_counter = 0 - previous_point_in_scan = 0 - - self.average_stdeviations_x_st_fzp = 0 - self.average_stdeviations_y_st_fzp = 0 - self.average_lamni_angle = 0 - - mode, number_of_positions_planned, current_position_in_scan = self.get_scan_status() - - # if not (mode==2 or mode==3): - # error - self.device_manager.connector.set( - MessageEndpoints.device_status("rt_scan"), - messages.DeviceStatusMessage( - device="rt_scan", status=1, metadata=self.readout_metadata - ), - ) - # while scan is running - while mode > 0: - # logger.info(f"Current scan position {current_position_in_scan} out of {number_of_positions_planned}") - mode, number_of_positions_planned, current_position_in_scan = self.get_scan_status() - time.sleep(0.01) - if current_position_in_scan > 5: - while current_position_in_scan > read_counter + 1: - return_table = (self.socket_put_and_receive(f"r{read_counter}")).split(",") - # logger.info(f"{return_table}") - logger.info(f"Read {read_counter} out of {number_of_positions_planned}") - - read_counter = read_counter + 1 - - signals = self._get_signals_from_table(return_table) - - self.publish_device_data(signals=signals, point_id=int(return_table[0])) - - time.sleep(0.05) - - # read the last samples even though scan is finished already - while number_of_positions_planned > read_counter: - return_table = (self.socket_put_and_receive(f"r{read_counter}")).split(",") - logger.info(f"Read {read_counter} out of {number_of_positions_planned}") - # logger.info(f"{return_table}") - read_counter = read_counter + 1 - - signals = self._get_signals_from_table(return_table) - self.publish_device_data(signals=signals, point_id=int(return_table[0])) - - self.device_manager.connector.set( - MessageEndpoints.device_status("rt_scan"), - messages.DeviceStatusMessage( - device="rt_scan", status=0, metadata=self.readout_metadata - ), - ) - - logger.info( - f"LamNI statistics: Average of all standard deviations: x {self.average_stdeviations_x_st_fzp/number_of_samples_to_read}, y {self.average_stdeviations_y_st_fzp/number_of_samples_to_read}, angle {self.average_lamni_angle/number_of_samples_to_read}." - ) - - def publish_device_data(self, signals, point_id): - self.device_manager.connector.set_and_publish( - MessageEndpoints.device_read("rt_lamni"), - messages.DeviceMessage( - signals=signals, metadata={"point_id": point_id, **self.readout_metadata} - ), - ) - def feedback_status_angle_lamni(self) -> bool: return_table = (self.socket_put_and_receive("J7")).split(",") logger.debug( @@ -407,8 +294,8 @@ class RtLamniController(Controller): t = PrettyTable() t.title = "Interferometer signal strength" t.field_names = ["Axis", "Description", "Value", "Running"] - t.add_row([0, "ST FZP horizontal", ssi_0, "-"]) - t.add_row([1, "ST FZP vertical", ssi_1, "-"]) + t.add_row([0, "ST FZP horizontal", ssi_0, "-"]) + t.add_row([1, "ST FZP vertical", ssi_1, "-"]) t.add_row([2, "Angle interferometer", angle_signal, angle_running]) print(t) @@ -431,7 +318,7 @@ class RtLamniController(Controller): print(t) print(f"Feedback loop running: {loop_status}") - return {"x": pos_x, "y": pos_y, "loop_running": loop_status} + return {"x": pos_x, "y": pos_y, "loop_running": loop_status} def feedback_enable_with_reset(self): if not self.feedback_status_angle_lamni(): @@ -792,27 +679,214 @@ class RtLamniMotor(Device, PositionerBase): else: raise TypeError(f"Expected value of type int but received {type(val)}") - def kickoff(self, metadata, **kwargs) -> None: - self.controller.kickoff(metadata) - @property def egu(self): """The engineering units (EGU) for positions""" return "um" - # how is this used later? - - def stage(self) -> list[object]: - return super().stage() - - def unstage(self) -> list[object]: - return super().unstage() - def stop(self, *, success=False): self.controller.stop_all_axes() return super().stop(success=success) +class RtLamniFlyer(Device): + USER_ACCESS = ["controller"] + data = Cpt( + AsyncMultiSignal, + name="data", + signals=[ + "target_x", + "average_x_st_fzp", + "stdev_x_st_fzp", + "target_y", + "average_y_st_fzp", + "stdev_y_st_fzp", + "average_cap1", + "stdev_cap1", + "average_cap2", + "stdev_cap2", + "average_cap3", + "stdev_cap3", + "average_cap4", + "stdev_cap4", + "average_cap5", + "stdev_cap5", + "average_angle_interf_ST", + "stdev_angle_interf_ST", + "average_stdeviations_x_st_fzp", + "average_stdeviations_y_st_fzp", + "average_lamni_angle", + ], + ndim=1, + async_update={"type": "add", "max_shape": [None]}, + max_size=1000, + ) + progress = Cpt(ProgressSignal, doc="ProgressSignal indicating of the device during the scan.") + + def __init__( + self, + prefix="", + *, + name, + kind=None, + read_attrs=None, + configuration_attrs=None, + parent=None, + host="mpc2680.psi.ch", + port=3333, + socket_cls=SocketIO, + device_manager=None, + **kwargs, + ): + super().__init__(prefix=prefix, name=name, parent=parent, **kwargs) + self.shutdown_event = threading.Event() + self.controller = RtLamniController( + socket_cls=socket_cls, socket_host=host, socket_port=port, device_manager=device_manager + ) + self.average_stdeviations_x_st_fzp = 0 + self.average_stdeviations_y_st_fzp = 0 + self.average_lamni_angle = 0 + self.readout_thread = None + self.scan_done_event = threading.Event() + self.scan_done_event.set() + + def stage(self): + self.shutdown_event.clear() + self.scan_done_event.set() + return super().stage() + + def start_readout(self, status: DeviceStatus): + self.readout_thread = threading.Thread( + target=self.read_positions_from_sampler, args=(status,) + ) + self.readout_thread.start() + + def kickoff(self) -> DeviceStatus: + self.shutdown_event.clear() + self.scan_done_event.clear() + while not self.controller._min_scan_buffer_reached and not self.shutdown_event.wait(0.001): + ... + self.controller.start_scan() + self.shutdown_event.wait(0.1) + status = DeviceStatus(self) + status.set_finished() + return status + + def complete(self) -> DeviceStatus: + """Wait until the flyer is done.""" + if self.scan_done_event.is_set(): + # if the scan_done_event is already set, we can return a finished status immediately + status = DeviceStatus(self) + status.set_finished() + return status + status = DeviceStatus(self) + self.start_readout(status) + status.add_callback(lambda *args, **kwargs: self.scan_done_event.set()) + return status + + def stop(self, *, success=False): + self.shutdown_event.set() + self.scan_done_event.set() + if self.readout_thread is not None: + self.readout_thread.join() + return super().stop(success=success) + + def read_positions_from_sampler(self, status: DeviceStatus): + """ + Read the positions from the sampler and update the data signal. + This function runs in a separate thread and continuously checks the + scan status. + + Args: + status (DeviceStatus): The status object to update when the readout is complete. + """ + read_counter = 0 + self.average_stdeviations_x_st_fzp = 0 + self.average_stdeviations_y_st_fzp = 0 + self.average_lamni_angle = 0 + + mode, number_of_positions_planned, current_position_in_scan = ( + self.controller.get_scan_status() + ) + + # while scan is running + while mode > 0 and not self.shutdown_event.wait(0.01): + # logger.info(f"Current scan position {current_position_in_scan} out of {number_of_positions_planned}") + mode, number_of_positions_planned, current_position_in_scan = ( + self.controller.get_scan_status() + ) + if current_position_in_scan > 5: + while current_position_in_scan > read_counter + 1: + return_table = ( + self.controller.socket_put_and_receive(f"r{read_counter}") + ).split(",") + logger.info(f"Read {read_counter} out of {number_of_positions_planned}") + self.progress.put( + value=read_counter, max_value=number_of_positions_planned, done=False + ) + read_counter = read_counter + 1 + signals = self._get_signals_from_table(return_table) + self.data.set(signals) + + if self.shutdown_event.wait(0.05): + logger.info("Shutdown event set, stopping readout.") + # if we are here, the shutdown_event is set. We can exit the readout loop. + status.set_finished() + return + + # read the last samples even though scan is finished already + while number_of_positions_planned > read_counter and not self.shutdown_event.is_set(): + return_table = (self.controller.socket_put_and_receive(f"r{read_counter}")).split(",") + logger.info(f"Read {read_counter} out of {number_of_positions_planned}") + self.progress.put(value=read_counter, max_value=number_of_positions_planned, done=False) + read_counter = read_counter + 1 + + signals = self._get_signals_from_table(return_table) + self.data.set(signals) + + # NOTE: No need to set the status to failed if the shutdown_event is set. + # The stop() method will take care of that. + status.set_finished() + self.progress.put(value=read_counter, max_value=number_of_positions_planned, done=True) + + logger.info( + f"LamNI statistics: Average of all standard deviations: x {self.average_stdeviations_x_st_fzp}, y {self.average_stdeviations_y_st_fzp}, angle {self.average_lamni_angle}." + ) + + def _get_signals_from_table(self, return_table) -> dict: + self.average_stdeviations_x_st_fzp += float(return_table[5]) + self.average_stdeviations_y_st_fzp += float(return_table[8]) + self.average_lamni_angle += float(return_table[19]) + signals = { + "target_x": {"value": float(return_table[3])}, + "average_x_st_fzp": {"value": float(return_table[4])}, + "stdev_x_st_fzp": {"value": float(return_table[5])}, + "target_y": {"value": float(return_table[6])}, + "average_y_st_fzp": {"value": float(return_table[7])}, + "stdev_y_st_fzp": {"value": float(return_table[8])}, + "average_cap1": {"value": float(return_table[9])}, + "stdev_cap1": {"value": float(return_table[10])}, + "average_cap2": {"value": float(return_table[11])}, + "stdev_cap2": {"value": float(return_table[12])}, + "average_cap3": {"value": float(return_table[13])}, + "stdev_cap3": {"value": float(return_table[14])}, + "average_cap4": {"value": float(return_table[15])}, + "stdev_cap4": {"value": float(return_table[16])}, + "average_cap5": {"value": float(return_table[17])}, + "stdev_cap5": {"value": float(return_table[18])}, + "average_angle_interf_ST": {"value": float(return_table[19])}, + "stdev_angle_interf_ST": {"value": float(return_table[20])}, + "average_stdeviations_x_st_fzp": { + "value": self.average_stdeviations_x_st_fzp / (int(return_table[0]) + 1) + }, + "average_stdeviations_y_st_fzp": { + "value": self.average_stdeviations_y_st_fzp / (int(return_table[0]) + 1) + }, + "average_lamni_angle": {"value": self.average_lamni_angle / (int(return_table[0]) + 1)}, + } + return signals + + if __name__ == "__main__": # pragma: no cover mock = False if not mock: diff --git a/csaxs_bec/scans/LamNIFermatScan.py b/csaxs_bec/scans/LamNIFermatScan.py index 98982c2..9ffcf79 100644 --- a/csaxs_bec/scans/LamNIFermatScan.py +++ b/csaxs_bec/scans/LamNIFermatScan.py @@ -169,8 +169,8 @@ class LamNIMixin: self.device_manager.devices.lsamx.read_only = True self.device_manager.devices.lsamy.read_only = True - - #update angle readback before start of the scan + + # update angle readback before start of the scan yield from self.stubs.send_rpc_and_wait("lsamrot", "readback.get") yield from self.stubs.send_rpc_and_wait("rtx", "controller.feedback_enable_without_reset") @@ -213,7 +213,14 @@ class LamNIFermatScan(ScanBase, LamNIMixin): arg_input = {} arg_bundle_size = {"bundle": len(arg_input), "min": None, "max": None} - def __init__(self, *args, parameter: dict = None, frames_per_trigger:int=1, exp_time:float=0,**kwargs): + def __init__( + self, + *args, + parameter: dict = None, + frames_per_trigger: int = 1, + exp_time: float = 0, + **kwargs, + ): """ A LamNI scan following Fermat's spiral. @@ -236,7 +243,9 @@ class LamNIFermatScan(ScanBase, LamNIMixin): >>> scans.lamni_fermat_scan(fov_size=[20, 25], center_x=0.02, center_y=0, shift_x=0, shift_y=0, angle=0, step=0.5, fov_circular=0, exp_time=0.1, frames_per_trigger=1) """ - super().__init__(parameter=parameter, frames_per_trigger=frames_per_trigger, exp_time=exp_time,**kwargs) + super().__init__( + parameter=parameter, frames_per_trigger=frames_per_trigger, exp_time=exp_time, **kwargs + ) self.axis = [] scan_kwargs = parameter.get("kwargs", {}) self.fov_size = scan_kwargs.get("fov_size") @@ -455,27 +464,17 @@ class LamNIFermatScan(ScanBase, LamNIMixin): yield from self._at_each_point(ind, pos) self.burst_index = 0 elif self.scan_type == "fly": - # use a device message to receive the scan number and - # scan ID before sending the message to the device server - yield from self.stubs.kickoff(device="rtx") - while True: - yield from self.stubs.read(group="monitored") - msg = self.device_manager.connector.get(MessageEndpoints.device_status("rt_scan")) - if msg: - status = msg - status_id = status.content.get("status", 1) - request_id = status.metadata.get("RID") - if status_id == 0 and self.metadata.get("RID") == request_id: - break - if status_id == 2 and self.metadata.get("RID") == request_id: - raise ScanAbortion( - "An error occured during the LamNI readout:" - f" {status.metadata.get('error')}" - ) + # fly scan mode + yield from self.stubs.kickoff(device="rt_positions") + # start the readout loop of the flyer + status = yield from self.stubs.complete(device="rt_positions", wait=False) + + while not status.done: + yield from self.stubs.read(group="monitored", point_id=self.point_id) + self.point_id += 1 time.sleep(1) logger.debug("reading monitors") - # yield from self.device_rpc("rtx", "controller.kickoff") def run(self): self.initialize()