From f5b71473d6a6beb9494a455aee1d2c3afba6bc7e Mon Sep 17 00:00:00 2001 From: wakonig_k Date: Tue, 14 Apr 2026 13:05:43 +0200 Subject: [PATCH 1/4] fix: move lamni fly scan to dedicated flyer device --- csaxs_bec/device_configs/ptycho_lamni.yaml | 10 + csaxs_bec/devices/omny/rt/rt_flomni_ophyd.py | 2 - csaxs_bec/devices/omny/rt/rt_lamni_ophyd.py | 331 +++++++++++-------- csaxs_bec/scans/LamNIFermatScan.py | 30 +- 4 files changed, 222 insertions(+), 151 deletions(-) diff --git a/csaxs_bec/device_configs/ptycho_lamni.yaml b/csaxs_bec/device_configs/ptycho_lamni.yaml index 63ccf96..59718fc 100644 --- a/csaxs_bec/device_configs/ptycho_lamni.yaml +++ b/csaxs_bec/device_configs/ptycho_lamni.yaml @@ -271,6 +271,16 @@ rty: enabled: true readOnly: False +rt_positions: + deviceClass: csaxs_bec.devices.omny.rt.rt_lamni_ophyd.RtLamniFlyer + deviceConfig: + host: mpc2680.psi.ch + port: 3333 + readoutPriority: async + connectionTimeout: 20 + enabled: true + readOnly: False + ############################################################ ######################### Cameras ########################## ############################################################ diff --git a/csaxs_bec/devices/omny/rt/rt_flomni_ophyd.py b/csaxs_bec/devices/omny/rt/rt_flomni_ophyd.py index def31da..d882f4d 100644 --- a/csaxs_bec/devices/omny/rt/rt_flomni_ophyd.py +++ b/csaxs_bec/devices/omny/rt/rt_flomni_ophyd.py @@ -783,7 +783,6 @@ class RtFlomniFlyer(Device): ) self.average_stdeviations_x_st_fzp = 0 self.average_stdeviations_y_st_fzp = 0 - self.average_lamni_angle = 0 self.readout_thread = None self.scan_done_event = threading.Event() self.scan_done_event.set() @@ -800,7 +799,6 @@ class RtFlomniFlyer(Device): read_counter = 0 self.average_stdeviations_x_st_fzp = 0 self.average_stdeviations_y_st_fzp = 0 - self.average_lamni_angle = 0 mode, number_of_positions_planned, current_position_in_scan = ( self.controller.get_scan_status() diff --git a/csaxs_bec/devices/omny/rt/rt_lamni_ophyd.py b/csaxs_bec/devices/omny/rt/rt_lamni_ophyd.py index 3e3df3c..7c50047 100644 --- a/csaxs_bec/devices/omny/rt/rt_lamni_ophyd.py +++ b/csaxs_bec/devices/omny/rt/rt_lamni_ophyd.py @@ -3,12 +3,12 @@ import threading import time import numpy as np -from bec_lib import bec_logger, messages -from bec_lib.endpoints import MessageEndpoints +from bec_lib import bec_logger from ophyd import Component as Cpt -from ophyd import Device, PositionerBase, Signal +from ophyd import Device, DeviceStatus, PositionerBase, Signal from ophyd.status import wait as status_wait from ophyd.utils import LimitError, ReadOnlyError +from ophyd_devices.utils.bec_signals import AsyncMultiSignal, ProgressSignal from ophyd_devices.utils.controller import Controller, threadlocked from ophyd_devices.utils.socket import SocketIO, SocketSignal, raise_if_disconnected from prettytable import PrettyTable @@ -215,7 +215,7 @@ class RtLamniController(Controller): def start_scan(self): # interferometer_feedback_not_running = int((self.socket_put_and_receive("J2")).split(",")[0]) # if interferometer_feedback_not_running == 1: - if not self.feedback_is_running(): + if not self.feedback_is_running(): logger.error( "Cannot start scan because feedback loop is not running or there is an interferometer error." ) @@ -232,51 +232,6 @@ class RtLamniController(Controller): # start a point-by-point scan (for cont scan in flomni it would be "sa") self.socket_put_and_receive("sd") - def start_readout(self): - readout = threading.Thread(target=self.read_positions_from_sampler) - readout.start() - - def kickoff(self, metadata): - self.readout_metadata = metadata - while not self._min_scan_buffer_reached: - time.sleep(0.001) - self.start_scan() - time.sleep(0.1) - self.start_readout() - - def _get_signals_from_table(self, return_table) -> dict: - self.average_stdeviations_x_st_fzp += float(return_table[5]) - self.average_stdeviations_y_st_fzp += float(return_table[8]) - self.average_lamni_angle += float(return_table[19]) - signals = { - "target_x": {"value": float(return_table[3])}, - "average_x_st_fzp": {"value": float(return_table[4])}, - "stdev_x_st_fzp": {"value": float(return_table[5])}, - "target_y": {"value": float(return_table[6])}, - "average_y_st_fzp": {"value": float(return_table[7])}, - "stdev_y_st_fzp": {"value": float(return_table[8])}, - "average_cap1": {"value": float(return_table[9])}, - "stdev_cap1": {"value": float(return_table[10])}, - "average_cap2": {"value": float(return_table[11])}, - "stdev_cap2": {"value": float(return_table[12])}, - "average_cap3": {"value": float(return_table[13])}, - "stdev_cap3": {"value": float(return_table[14])}, - "average_cap4": {"value": float(return_table[15])}, - "stdev_cap4": {"value": float(return_table[16])}, - "average_cap5": {"value": float(return_table[17])}, - "stdev_cap5": {"value": float(return_table[18])}, - "average_angle_interf_ST": {"value": float(return_table[19])}, - "stdev_angle_interf_ST": {"value": float(return_table[20])}, - "average_stdeviations_x_st_fzp": { - "value": self.average_stdeviations_x_st_fzp / (int(return_table[0]) + 1) - }, - "average_stdeviations_y_st_fzp": { - "value": self.average_stdeviations_y_st_fzp / (int(return_table[0]) + 1) - }, - "average_lamni_angle": {"value": self.average_lamni_angle / (int(return_table[0]) + 1)}, - } - return signals - def feedback_is_running(self) -> bool: status = int(float((self.socket_put_and_receive("J2")).split(",")[0])) return status == 0 # 0 means running, 1 means error/disabled @@ -315,75 +270,6 @@ class RtLamniController(Controller): return - def read_positions_from_sampler(self): - # this was for reading after the scan completed - number_of_samples_to_read = 1 # self.get_scan_status()[1] #number of valid samples, will be updated upon first data read - - read_counter = 0 - previous_point_in_scan = 0 - - self.average_stdeviations_x_st_fzp = 0 - self.average_stdeviations_y_st_fzp = 0 - self.average_lamni_angle = 0 - - mode, number_of_positions_planned, current_position_in_scan = self.get_scan_status() - - # if not (mode==2 or mode==3): - # error - self.device_manager.connector.set( - MessageEndpoints.device_status("rt_scan"), - messages.DeviceStatusMessage( - device="rt_scan", status=1, metadata=self.readout_metadata - ), - ) - # while scan is running - while mode > 0: - # logger.info(f"Current scan position {current_position_in_scan} out of {number_of_positions_planned}") - mode, number_of_positions_planned, current_position_in_scan = self.get_scan_status() - time.sleep(0.01) - if current_position_in_scan > 5: - while current_position_in_scan > read_counter + 1: - return_table = (self.socket_put_and_receive(f"r{read_counter}")).split(",") - # logger.info(f"{return_table}") - logger.info(f"Read {read_counter} out of {number_of_positions_planned}") - - read_counter = read_counter + 1 - - signals = self._get_signals_from_table(return_table) - - self.publish_device_data(signals=signals, point_id=int(return_table[0])) - - time.sleep(0.05) - - # read the last samples even though scan is finished already - while number_of_positions_planned > read_counter: - return_table = (self.socket_put_and_receive(f"r{read_counter}")).split(",") - logger.info(f"Read {read_counter} out of {number_of_positions_planned}") - # logger.info(f"{return_table}") - read_counter = read_counter + 1 - - signals = self._get_signals_from_table(return_table) - self.publish_device_data(signals=signals, point_id=int(return_table[0])) - - self.device_manager.connector.set( - MessageEndpoints.device_status("rt_scan"), - messages.DeviceStatusMessage( - device="rt_scan", status=0, metadata=self.readout_metadata - ), - ) - - logger.info( - f"LamNI statistics: Average of all standard deviations: x {self.average_stdeviations_x_st_fzp/number_of_samples_to_read}, y {self.average_stdeviations_y_st_fzp/number_of_samples_to_read}, angle {self.average_lamni_angle/number_of_samples_to_read}." - ) - - def publish_device_data(self, signals, point_id): - self.device_manager.connector.set_and_publish( - MessageEndpoints.device_read("rt_lamni"), - messages.DeviceMessage( - signals=signals, metadata={"point_id": point_id, **self.readout_metadata} - ), - ) - def feedback_status_angle_lamni(self) -> bool: return_table = (self.socket_put_and_receive("J7")).split(",") logger.debug( @@ -792,27 +678,214 @@ class RtLamniMotor(Device, PositionerBase): else: raise TypeError(f"Expected value of type int but received {type(val)}") - def kickoff(self, metadata, **kwargs) -> None: - self.controller.kickoff(metadata) - @property def egu(self): """The engineering units (EGU) for positions""" return "um" - # how is this used later? - - def stage(self) -> list[object]: - return super().stage() - - def unstage(self) -> list[object]: - return super().unstage() - def stop(self, *, success=False): self.controller.stop_all_axes() return super().stop(success=success) +class RtLamniFlyer(Device): + USER_ACCESS = ["controller"] + data = Cpt( + AsyncMultiSignal, + name="data", + signals=[ + "target_x", + "average_x_st_fzp", + "stdev_x_st_fzp", + "target_y", + "average_y_st_fzp", + "stdev_y_st_fzp", + "average_cap1", + "stdev_cap1", + "average_cap2", + "stdev_cap2", + "average_cap3", + "stdev_cap3", + "average_cap4", + "stdev_cap4", + "average_cap5", + "stdev_cap5", + "average_angle_interf_ST", + "stdev_angle_interf_ST", + "average_stdeviations_x_st_fzp", + "average_stdeviations_y_st_fzp", + "average_lamni_angle", + ], + ndim=1, + async_update={"type": "add", "max_shape": [None]}, + max_size=1000, + ) + progress = Cpt(ProgressSignal, doc="ProgressSignal indicating of the device during the scan.") + + def __init__( + self, + prefix="", + *, + name, + kind=None, + read_attrs=None, + configuration_attrs=None, + parent=None, + host="mpc2680.psi.ch", + port=3333, + socket_cls=SocketIO, + device_manager=None, + **kwargs, + ): + super().__init__(prefix=prefix, name=name, parent=parent, **kwargs) + self.shutdown_event = threading.Event() + self.controller = RtLamniController( + socket_cls=socket_cls, socket_host=host, socket_port=port, device_manager=device_manager + ) + self.average_stdeviations_x_st_fzp = 0 + self.average_stdeviations_y_st_fzp = 0 + self.average_lamni_angle = 0 + self.readout_thread = None + self.scan_done_event = threading.Event() + self.scan_done_event.set() + + def stage(self): + self.shutdown_event.clear() + self.scan_done_event.set() + return super().stage() + + def start_readout(self, status: DeviceStatus): + self.readout_thread = threading.Thread( + target=self.read_positions_from_sampler, args=(status,) + ) + self.readout_thread.start() + + def kickoff(self) -> DeviceStatus: + self.shutdown_event.clear() + self.scan_done_event.clear() + while not self.controller._min_scan_buffer_reached and not self.shutdown_event.wait(0.001): + ... + self.controller.start_scan() + self.shutdown_event.wait(0.1) + status = DeviceStatus(self) + status.set_finished() + return status + + def complete(self) -> DeviceStatus: + """Wait until the flyer is done.""" + if self.scan_done_event.is_set(): + # if the scan_done_event is already set, we can return a finished status immediately + status = DeviceStatus(self) + status.set_finished() + return status + status = DeviceStatus(self) + self.start_readout(status) + status.add_callback(lambda *args, **kwargs: self.scan_done_event.set()) + return status + + def stop(self, *, success=False): + self.shutdown_event.set() + self.scan_done_event.set() + if self.readout_thread is not None: + self.readout_thread.join() + return super().stop(success=success) + + def read_positions_from_sampler(self, status: DeviceStatus): + """ + Read the positions from the sampler and update the data signal. + This function runs in a separate thread and continuously checks the + scan status. + + Args: + status (DeviceStatus): The status object to update when the readout is complete. + """ + read_counter = 0 + self.average_stdeviations_x_st_fzp = 0 + self.average_stdeviations_y_st_fzp = 0 + self.average_lamni_angle = 0 + + mode, number_of_positions_planned, current_position_in_scan = ( + self.controller.get_scan_status() + ) + + # while scan is running + while mode > 0 and not self.shutdown_event.wait(0.01): + # logger.info(f"Current scan position {current_position_in_scan} out of {number_of_positions_planned}") + mode, number_of_positions_planned, current_position_in_scan = ( + self.controller.get_scan_status() + ) + if current_position_in_scan > 5: + while current_position_in_scan > read_counter + 1: + return_table = ( + self.controller.socket_put_and_receive(f"r{read_counter}") + ).split(",") + logger.info(f"Read {read_counter} out of {number_of_positions_planned}") + self.progress.put( + value=read_counter, max_value=number_of_positions_planned, done=False + ) + read_counter = read_counter + 1 + signals = self._get_signals_from_table(return_table) + self.data.set(signals) + + if self.shutdown_event.wait(0.05): + logger.info("Shutdown event set, stopping readout.") + # if we are here, the shutdown_event is set. We can exit the readout loop. + status.set_finished() + return + + # read the last samples even though scan is finished already + while number_of_positions_planned > read_counter and not self.shutdown_event.is_set(): + return_table = (self.controller.socket_put_and_receive(f"r{read_counter}")).split(",") + logger.info(f"Read {read_counter} out of {number_of_positions_planned}") + self.progress.put(value=read_counter, max_value=number_of_positions_planned, done=False) + read_counter = read_counter + 1 + + signals = self._get_signals_from_table(return_table) + self.data.set(signals) + + # NOTE: No need to set the status to failed if the shutdown_event is set. + # The stop() method will take care of that. + status.set_finished() + self.progress.put(value=read_counter, max_value=number_of_positions_planned, done=True) + + logger.info( + f"LamNI statistics: Average of all standard deviations: x {self.average_stdeviations_x_st_fzp}, y {self.average_stdeviations_y_st_fzp}, angle {self.average_lamni_angle}." + ) + + def _get_signals_from_table(self, return_table) -> dict: + self.average_stdeviations_x_st_fzp += float(return_table[5]) + self.average_stdeviations_y_st_fzp += float(return_table[8]) + self.average_lamni_angle += float(return_table[19]) + signals = { + "target_x": {"value": float(return_table[3])}, + "average_x_st_fzp": {"value": float(return_table[4])}, + "stdev_x_st_fzp": {"value": float(return_table[5])}, + "target_y": {"value": float(return_table[6])}, + "average_y_st_fzp": {"value": float(return_table[7])}, + "stdev_y_st_fzp": {"value": float(return_table[8])}, + "average_cap1": {"value": float(return_table[9])}, + "stdev_cap1": {"value": float(return_table[10])}, + "average_cap2": {"value": float(return_table[11])}, + "stdev_cap2": {"value": float(return_table[12])}, + "average_cap3": {"value": float(return_table[13])}, + "stdev_cap3": {"value": float(return_table[14])}, + "average_cap4": {"value": float(return_table[15])}, + "stdev_cap4": {"value": float(return_table[16])}, + "average_cap5": {"value": float(return_table[17])}, + "stdev_cap5": {"value": float(return_table[18])}, + "average_angle_interf_ST": {"value": float(return_table[19])}, + "stdev_angle_interf_ST": {"value": float(return_table[20])}, + "average_stdeviations_x_st_fzp": { + "value": self.average_stdeviations_x_st_fzp / (int(return_table[0]) + 1) + }, + "average_stdeviations_y_st_fzp": { + "value": self.average_stdeviations_y_st_fzp / (int(return_table[0]) + 1) + }, + "average_lamni_angle": {"value": self.average_lamni_angle / (int(return_table[0]) + 1)}, + } + return signals + + if __name__ == "__main__": # pragma: no cover mock = False if not mock: diff --git a/csaxs_bec/scans/LamNIFermatScan.py b/csaxs_bec/scans/LamNIFermatScan.py index 98982c2..06fe62c 100644 --- a/csaxs_bec/scans/LamNIFermatScan.py +++ b/csaxs_bec/scans/LamNIFermatScan.py @@ -169,8 +169,8 @@ class LamNIMixin: self.device_manager.devices.lsamx.read_only = True self.device_manager.devices.lsamy.read_only = True - - #update angle readback before start of the scan + + # update angle readback before start of the scan yield from self.stubs.send_rpc_and_wait("lsamrot", "readback.get") yield from self.stubs.send_rpc_and_wait("rtx", "controller.feedback_enable_without_reset") @@ -455,27 +455,17 @@ class LamNIFermatScan(ScanBase, LamNIMixin): yield from self._at_each_point(ind, pos) self.burst_index = 0 elif self.scan_type == "fly": - # use a device message to receive the scan number and - # scan ID before sending the message to the device server - yield from self.stubs.kickoff(device="rtx") - while True: - yield from self.stubs.read(group="monitored") - msg = self.device_manager.connector.get(MessageEndpoints.device_status("rt_scan")) - if msg: - status = msg - status_id = status.content.get("status", 1) - request_id = status.metadata.get("RID") - if status_id == 0 and self.metadata.get("RID") == request_id: - break - if status_id == 2 and self.metadata.get("RID") == request_id: - raise ScanAbortion( - "An error occured during the LamNI readout:" - f" {status.metadata.get('error')}" - ) + # fly scan mode + yield from self.stubs.kickoff(device="rt_positions") + # start the readout loop of the flyer + status = yield from self.stubs.complete(device="rt_positions", wait=False) + + while not status.done: + yield from self.stubs.read(group="monitored", point_id=self.point_id) + self.point_id += 1 time.sleep(1) logger.debug("reading monitors") - # yield from self.device_rpc("rtx", "controller.kickoff") def run(self): self.initialize() -- 2.52.0 From d8ea93a58727f96354cf3561f05a9372beb51e11 Mon Sep 17 00:00:00 2001 From: wakonig_k Date: Tue, 14 Apr 2026 14:27:59 +0200 Subject: [PATCH 2/4] fix: update omny to dedicate flyer --- csaxs_bec/device_configs/ptycho_omny.yaml | 12 +- csaxs_bec/devices/omny/rt/rt_omny_ophyd.py | 317 +++++++++++++-------- csaxs_bec/scans/omny_fermat_scan.py | 30 +- 3 files changed, 215 insertions(+), 144 deletions(-) diff --git a/csaxs_bec/device_configs/ptycho_omny.yaml b/csaxs_bec/device_configs/ptycho_omny.yaml index cd894a1..4f809a7 100755 --- a/csaxs_bec/device_configs/ptycho_omny.yaml +++ b/csaxs_bec/device_configs/ptycho_omny.yaml @@ -101,7 +101,17 @@ rtz: readOnly: false readoutPriority: on_request connectionTimeout: 20 - + +rt_positions: + deviceClass: csaxs_bec.devices.omny.rt.rt_omny_ophyd.RtOMNYFlyer + deviceConfig: + host: mpc3217.psi.ch + port: 3333 + readoutPriority: async + connectionTimeout: 20 + enabled: true + readOnly: False + # ############################################################ # ##################### OMNY samples ######################### # ############################################################ diff --git a/csaxs_bec/devices/omny/rt/rt_omny_ophyd.py b/csaxs_bec/devices/omny/rt/rt_omny_ophyd.py index 13a5fa2..13acd74 100644 --- a/csaxs_bec/devices/omny/rt/rt_omny_ophyd.py +++ b/csaxs_bec/devices/omny/rt/rt_omny_ophyd.py @@ -2,15 +2,14 @@ import builtins import socket import threading import time -from typing import List import numpy as np -from bec_lib import bec_logger, messages -from bec_lib.endpoints import MessageEndpoints +from bec_lib import bec_logger from ophyd import Component as Cpt -from ophyd import Device, PositionerBase, Signal +from ophyd import Device, DeviceStatus, PositionerBase, Signal from ophyd.status import wait as status_wait from ophyd.utils import LimitError +from ophyd_devices import AsyncMultiSignal, ProgressSignal from ophyd_devices.utils.controller import Controller, threadlocked from ophyd_devices.utils.socket import SocketIO, raise_if_disconnected from prettytable import PrettyTable @@ -23,7 +22,6 @@ if builtins.__dict__.get("bec") is not None: from csaxs_bec.devices.omny.rt.rt_ophyd import ( - BECConfigError, RtCommunicationError, RtError, RtReadbackSignal, @@ -842,35 +840,6 @@ class RtOMNYController(Controller): self.feedback_is_running() return self.ssi[channel] - def _get_signals_from_table(self, return_table) -> dict: - self.average_stdeviations_x_st_fzp += float(return_table[16]) - self.average_stdeviations_y_st_fzp += float(return_table[18]) - signals = { - "target_x": {"value": float(return_table[3])}, - "average_x_st_osa": {"value": float(return_table[4])}, - "stdev_x_st_osa": {"value": float(return_table[5])}, - "target_y": {"value": float(return_table[6])}, - "average_y_st_osa": {"value": float(return_table[7])}, - "stdev_y_st_osa": {"value": float(return_table[8])}, - "average_x_osa_fzp": {"value": float(return_table[9])}, - "stdev_x_osa_fzp": {"value": float(return_table[10])}, - "average_y_osa_fzp": {"value": float(return_table[11])}, - "stdev_y_osa_fzp": {"value": float(return_table[12])}, - "average_rotz_st": {"value": float(return_table[13])}, - "stdev_rotz_st": {"value": float(return_table[14])}, - "average_x_st_fzp": {"value": float(return_table[15])}, - "stdev_x_st_fzp": {"value": float(return_table[16])}, - "average_y_st_fzp": {"value": float(return_table[17])}, - "stdev_y_st_fzp": {"value": float(return_table[18])}, - "average_stdeviations_x_st_fzp": { - "value": self.average_stdeviations_x_st_fzp / (int(return_table[0]) + 1) - }, - "average_stdeviations_y_st_fzp": { - "value": self.average_stdeviations_y_st_fzp / (int(return_table[0]) + 1) - }, - } - return signals - @threadlocked def start_scan(self): if not self.feedback_is_running(): @@ -910,95 +879,6 @@ class RtOMNYController(Controller): current_position_in_scan = int(float(return_table[2])) return (mode, number_of_positions_planned, current_position_in_scan) - def read_positions_from_sampler(self): - # this was for reading after the scan completed - number_of_samples_to_read = 1 # self.get_scan_status()[1] #number of valid samples, will be updated upon first data read - - read_counter = 0 - - self.average_stdeviations_x_st_fzp = 0 - self.average_stdeviations_y_st_fzp = 0 - - mode, number_of_positions_planned, current_position_in_scan = self.get_scan_status() - - # if not (mode==2 or mode==3): - # error - self.device_manager.connector.set( - MessageEndpoints.device_status("rt_scan"), - messages.DeviceStatusMessage( - device="rt_scan", status=1, metadata=self.readout_metadata - ), - ) - # while scan is running - while mode > 0: - # logger.info(f"Current scan position {current_position_in_scan} out of {number_of_positions_planned}") - mode, number_of_positions_planned, current_position_in_scan = self.get_scan_status() - time.sleep(0.01) - if current_position_in_scan > 5: - while current_position_in_scan > read_counter + 1: - return_table = (self.socket_put_and_receive(f"r{read_counter}")).split(",") - # logger.info(f"{return_table}") - logger.info(f"Read {read_counter} out of {number_of_positions_planned}") - - read_counter = read_counter + 1 - - signals = self._get_signals_from_table(return_table) - - self.publish_device_data(signals=signals, point_id=int(return_table[0])) - - time.sleep(0.05) - - # read the last samples even though scan is finished already - while number_of_positions_planned > read_counter: - return_table = (self.socket_put_and_receive(f"r{read_counter}")).split(",") - logger.info(f"Read {read_counter} out of {number_of_positions_planned}") - # logger.info(f"{return_table}") - read_counter = read_counter + 1 - - signals = self._get_signals_from_table(return_table) - self.publish_device_data(signals=signals, point_id=int(return_table[0])) - - self.device_manager.connector.set( - MessageEndpoints.device_status("rt_scan"), - messages.DeviceStatusMessage( - device="rt_scan", status=0, metadata=self.readout_metadata - ), - ) - - logger.info( - "OMNY statistics: Average of all standard deviations [nm]: x" - f" {self.average_stdeviations_x_st_fzp/read_counter*1000:.1f}, y" - f" {self.average_stdeviations_y_st_fzp/read_counter*1000:.1f}." - ) - - self.device_manager.connector.send_client_info( - "OMNY statistics: Average of all standard deviations [nm]: x" - f" {self.average_stdeviations_x_st_fzp/read_counter*1000:.1f}, y" - f" {self.average_stdeviations_y_st_fzp/read_counter*1000:.1f}.", - scope="", - show_asap=True, - ) - - def publish_device_data(self, signals, point_id): - self.device_manager.connector.set_and_publish( - MessageEndpoints.device_read("rt_omny"), - messages.DeviceMessage( - signals=signals, metadata={"point_id": point_id, **self.readout_metadata} - ), - ) - - def start_readout(self): - readout = threading.Thread(target=self.read_positions_from_sampler) - readout.start() - - def kickoff(self, metadata): - self.readout_metadata = metadata - while not self._min_scan_buffer_reached: - time.sleep(0.001) - self.start_scan() - time.sleep(0.1) - self.start_readout() - class RtOMNYReadbackSignal(RtReadbackSignal): @retry_once @@ -1241,9 +1121,6 @@ class RtOMNYMotor(Device, PositionerBase): else: raise TypeError(f"Expected value of type int but received {type(val)}") - def kickoff(self, metadata, **kwargs) -> None: - self.controller.kickoff(metadata) - @property def egu(self): """The engineering units (EGU) for positions""" @@ -1255,6 +1132,194 @@ class RtOMNYMotor(Device, PositionerBase): return super().stop(success=success) +class RtOMNYFlyer(Device): + USER_ACCESS = ["controller"] + + data = Cpt( + AsyncMultiSignal, + name="data", + signals=[ + "target_x", + "average_x_st_osa", + "stdev_x_st_osa", + "target_y", + "average_y_st_osa", + "stdev_y_st_osa", + "average_x_osa_fzp", + "stdev_x_osa_fzp", + "average_y_osa_fzp", + "stdev_y_osa_fzp", + "average_rotz_st", + "stdev_rotz_st", + "average_x_st_fzp", + "stdev_x_st_fzp", + "average_y_st_fzp", + "stdev_y_st_fzp", + "average_stdeviations_x_st_fzp", + "average_stdeviations_y_st_fzp", + ], + ndim=1, + async_update={"type": "add", "max_shape": [None]}, + max_size=1000, + ) + progress = Cpt(ProgressSignal, doc="Signal to track the progress of the device during a scan") + + def __init__( + self, + prefix="", + *, + name, + kind=None, + read_attrs=None, + configuration_attrs=None, + parent=None, + host="mpc3217.psi.ch", + port=2222, + socket_cls=SocketIO, + device_manager=None, + **kwargs, + ): + super().__init__(prefix=prefix, name=name, parent=parent, **kwargs) + self.shutdown_event = threading.Event() + self.controller = RtOMNYController( + socket_cls=socket_cls, socket_host=host, socket_port=port, device_manager=device_manager + ) + self.average_stdeviations_x_st_fzp = 0 + self.average_stdeviations_y_st_fzp = 0 + self.readout_thread = None + self.scan_done_event = threading.Event() + self.scan_done_event.set() + + def stage(self): + self.shutdown_event.clear() + self.scan_done_event.set() + return super().stage() + + def start_readout(self, status: DeviceStatus): + self.readout_thread = threading.Thread( + target=self.read_positions_from_sampler, args=(status,) + ) + self.readout_thread.start() + + def kickoff(self) -> DeviceStatus: + self.shutdown_event.clear() + self.scan_done_event.clear() + while not self.controller._min_scan_buffer_reached and not self.shutdown_event.wait(0.001): + ... + self.controller.start_scan() + self.shutdown_event.wait(0.1) + status = DeviceStatus(self) + status.set_finished() + return status + + def complete(self) -> DeviceStatus: + """Wait until the flyer is done.""" + if self.scan_done_event.is_set(): + # if the scan_done_event is already set, we can return a finished status immediately + status = DeviceStatus(self) + status.set_finished() + return status + status = DeviceStatus(self) + self.start_readout(status) + status.add_callback(lambda *args, **kwargs: self.scan_done_event.set()) + return status + + def stop(self, *, success=False): + self.shutdown_event.set() + self.scan_done_event.set() + if self.readout_thread is not None: + self.readout_thread.join() + return super().stop(success=success) + + def read_positions_from_sampler(self, status: DeviceStatus): + """ + Read the positions from the sampler and update the data signal. + This function runs in a separate thread and continuously checks the + scan status. + + Args: + status (DeviceStatus): The status object to update when the readout is complete. + """ + read_counter = 0 + self.average_stdeviations_x_st_fzp = 0 + self.average_stdeviations_y_st_fzp = 0 + + mode, number_of_positions_planned, current_position_in_scan = self.get_scan_status() + while mode > 0 and not self.shutdown_event.wait(0.01): + # logger.info(f"Current scan position {current_position_in_scan} out of {number_of_positions_planned}") + mode, number_of_positions_planned, current_position_in_scan = ( + self.controller.get_scan_status() + ) + if current_position_in_scan > 5: + while current_position_in_scan > read_counter + 1: + return_table = ( + self.controller.socket_put_and_receive(f"r{read_counter}") + ).split(",") + logger.info(f"Read {read_counter} out of {number_of_positions_planned}") + self.progress.put( + value=read_counter, max_value=number_of_positions_planned, done=False + ) + read_counter = read_counter + 1 + signals = self._get_signals_from_table(return_table) + self.data.set(signals) + + if self.shutdown_event.wait(0.05): + logger.info("Shutdown event set, stopping readout.") + # if we are here, the shutdown_event is set. We can exit the readout loop. + status.set_finished() + return + + # read the last samples even though scan is finished already + while number_of_positions_planned > read_counter and not self.shutdown_event.is_set(): + return_table = (self.controller.socket_put_and_receive(f"r{read_counter}")).split(",") + logger.info(f"Read {read_counter} out of {number_of_positions_planned}") + self.progress.put(value=read_counter, max_value=number_of_positions_planned, done=False) + read_counter = read_counter + 1 + + signals = self._get_signals_from_table(return_table) + self.data.set(signals) + + # NOTE: No need to set the status to failed if the shutdown_event is set. + # The stop() method will take care of that. + status.set_finished() + self.progress.put(value=read_counter, max_value=number_of_positions_planned, done=True) + + logger.info( + "OMNY statistics: Average of all standard deviations: x" + f" {self.average_stdeviations_x_st_fzp/read_counter*1000:.1f}, y" + f" {self.average_stdeviations_y_st_fzp/read_counter*1000:.1f}" + ) + + def _get_signals_from_table(self, return_table) -> dict: + self.average_stdeviations_x_st_fzp += float(return_table[16]) + self.average_stdeviations_y_st_fzp += float(return_table[18]) + signals = { + "target_x": {"value": float(return_table[3])}, + "average_x_st_osa": {"value": float(return_table[4])}, + "stdev_x_st_osa": {"value": float(return_table[5])}, + "target_y": {"value": float(return_table[6])}, + "average_y_st_osa": {"value": float(return_table[7])}, + "stdev_y_st_osa": {"value": float(return_table[8])}, + "average_x_osa_fzp": {"value": float(return_table[9])}, + "stdev_x_osa_fzp": {"value": float(return_table[10])}, + "average_y_osa_fzp": {"value": float(return_table[11])}, + "stdev_y_osa_fzp": {"value": float(return_table[12])}, + "average_rotz_st": {"value": float(return_table[13])}, + "stdev_rotz_st": {"value": float(return_table[14])}, + "average_x_st_fzp": {"value": float(return_table[15])}, + "stdev_x_st_fzp": {"value": float(return_table[16])}, + "average_y_st_fzp": {"value": float(return_table[17])}, + "stdev_y_st_fzp": {"value": float(return_table[18])}, + "average_stdeviations_x_st_fzp": { + "value": self.average_stdeviations_x_st_fzp / (int(return_table[0]) + 1) + }, + "average_stdeviations_y_st_fzp": { + "value": self.average_stdeviations_y_st_fzp / (int(return_table[0]) + 1) + }, + } + return signals + + if __name__ == "__main__": rtcontroller = RtOMNYController( socket_cls=SocketIO, socket_host="mpc2844.psi.ch", socket_port=2222, device_manager=None diff --git a/csaxs_bec/scans/omny_fermat_scan.py b/csaxs_bec/scans/omny_fermat_scan.py index 38269b7..b2f5bcb 100644 --- a/csaxs_bec/scans/omny_fermat_scan.py +++ b/csaxs_bec/scans/omny_fermat_scan.py @@ -51,7 +51,7 @@ class OMNYFermatScan(SyncFlyScanBase): angle: float = None, corridor_size: float = 3, parameter: dict = None, - frames_per_trigger:int=1, + frames_per_trigger: int = 1, **kwargs, ): """ @@ -75,7 +75,9 @@ class OMNYFermatScan(SyncFlyScanBase): >>> scans.omny_fermat_scan(fovx=20, fovy=25, cenx=10, ceny=0, zshift=0, angle=0, step=2, exp_time=0.01) """ - super().__init__(parameter=parameter, exp_time=exp_time, frames_per_trigger=frames_per_trigger, **kwargs) + super().__init__( + parameter=parameter, exp_time=exp_time, frames_per_trigger=frames_per_trigger, **kwargs + ) self.axis = [] self.fovx = fovx self.fovy = fovy @@ -265,22 +267,16 @@ class OMNYFermatScan(SyncFlyScanBase): # scan ID before sending the message to the device server yield from self.stubs.kickoff(device="rtx") while True: - yield from self.stubs.read(group="monitored") - status = self.connector.get(MessageEndpoints.device_status("rt_scan")) - if status: - status_id = status.content.get("status", 1) - request_id = status.metadata.get("RID") - if status_id == 0 and self.metadata.get("RID") == request_id: - break - if status_id == 2 and self.metadata.get("RID") == request_id: - raise ScanAbortion( - "An error occured during the omny readout:" - f" {status.metadata.get('error')}" - ) + yield from self.stubs.kickoff(device="rt_positions") - time.sleep(1) - logger.debug("reading monitors") - # yield from self.device_rpc("rtx", "controller.kickoff") + # start the readout loop of the flyer + status = yield from self.stubs.complete(device="rt_positions", wait=False) + + while not status.done: + yield from self.stubs.read(group="monitored", point_id=self.point_id) + self.point_id += 1 + time.sleep(1) + logger.debug("reading monitors") def move_to_start(self): """return to the start position""" -- 2.52.0 From ffe52a13dd4d0138d953475e80a9f60408b61a24 Mon Sep 17 00:00:00 2001 From: x01dc Date: Tue, 14 Apr 2026 15:26:37 +0200 Subject: [PATCH 3/4] minor format fixes --- csaxs_bec/devices/omny/rt/rt_lamni_ophyd.py | 3 ++- csaxs_bec/scans/LamNIFermatScan.py | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/csaxs_bec/devices/omny/rt/rt_lamni_ophyd.py b/csaxs_bec/devices/omny/rt/rt_lamni_ophyd.py index 7c50047..53c6e4f 100644 --- a/csaxs_bec/devices/omny/rt/rt_lamni_ophyd.py +++ b/csaxs_bec/devices/omny/rt/rt_lamni_ophyd.py @@ -186,7 +186,8 @@ class RtLamniController(Controller): def send_positions(parent, positions): parent._min_scan_buffer_reached = False for pos_index, pos in enumerate(positions): - parent.socket_put_and_receive(f"s{pos[0]:.05f},{pos[1]:.05f},0") + cmd = f"s{pos[0]:.05f},{pos[1]:.05f},0" + parent.socket_put_and_receive(cmd) if pos_index > 100: parent._min_scan_buffer_reached = True parent._min_scan_buffer_reached = True diff --git a/csaxs_bec/scans/LamNIFermatScan.py b/csaxs_bec/scans/LamNIFermatScan.py index 06fe62c..3a209e5 100644 --- a/csaxs_bec/scans/LamNIFermatScan.py +++ b/csaxs_bec/scans/LamNIFermatScan.py @@ -339,7 +339,7 @@ class LamNIFermatScan(ScanBase, LamNIMixin): def _transfer_positions_to_LamNI(self): yield from self.stubs.send_rpc_and_wait( - "rtx", "controller.add_pos_to_scan", (self.positions.tolist(),) + "rtx", "controller.add_pos_to_scan", self.positions.tolist() ) def _calculate_positions(self): -- 2.52.0 From 9d368d554da334558995e27a62382963e1e052bd Mon Sep 17 00:00:00 2001 From: x01dc Date: Tue, 14 Apr 2026 15:31:06 +0200 Subject: [PATCH 4/4] format fix --- csaxs_bec/devices/omny/rt/rt_omny_ophyd.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/csaxs_bec/devices/omny/rt/rt_omny_ophyd.py b/csaxs_bec/devices/omny/rt/rt_omny_ophyd.py index 13acd74..ef0aa09 100644 --- a/csaxs_bec/devices/omny/rt/rt_omny_ophyd.py +++ b/csaxs_bec/devices/omny/rt/rt_omny_ophyd.py @@ -254,7 +254,8 @@ class RtOMNYController(Controller): parent._min_scan_buffer_reached = False start_time = time.time() for pos_index, pos in enumerate(positions): - parent.socket_put_and_receive(f"s{pos[0]:.05f},{pos[1]:.05f},{pos[2]:.05f}") + cmd = f"s{pos[0]:.05f},{pos[1]:.05f},{pos[2]:.05f}" + parent.socket_put_and_receive(cmd) if pos_index > 100: parent._min_scan_buffer_reached = True parent._min_scan_buffer_reached = True -- 2.52.0