From f40fe32317849d4bc81083fad61520f0e6edcce5 Mon Sep 17 00:00:00 2001 From: wakonig_k Date: Mon, 16 Mar 2026 15:18:52 +0100 Subject: [PATCH] fix: flomni async readout --- csaxs_bec/device_configs/ptycho_flomni.yaml | 10 + csaxs_bec/devices/omny/rt/rt_flomni_ophyd.py | 291 ++++++++++++------- csaxs_bec/scans/flomni_fermat_scan.py | 44 ++- 3 files changed, 211 insertions(+), 134 deletions(-) diff --git a/csaxs_bec/device_configs/ptycho_flomni.yaml b/csaxs_bec/device_configs/ptycho_flomni.yaml index ebc3a7d..060a93a 100644 --- a/csaxs_bec/device_configs/ptycho_flomni.yaml +++ b/csaxs_bec/device_configs/ptycho_flomni.yaml @@ -395,6 +395,16 @@ rtz: readoutPriority: on_request connectionTimeout: 20 +rt_flyer: + deviceClass: csaxs_bec.devices.omny.rt.rt_flomni_ophyd.RtFlomniFlyer + deviceConfig: + host: mpc2844.psi.ch + port: 2222 + readoutPriority: async + connectionTimeout: 20 + enabled: true + readOnly: False + ############################################################ ####################### Cameras ############################ ############################################################ diff --git a/csaxs_bec/devices/omny/rt/rt_flomni_ophyd.py b/csaxs_bec/devices/omny/rt/rt_flomni_ophyd.py index 36f8574..def31da 100644 --- a/csaxs_bec/devices/omny/rt/rt_flomni_ophyd.py +++ b/csaxs_bec/devices/omny/rt/rt_flomni_ophyd.py @@ -1,20 +1,18 @@ import threading import time -from typing import List import numpy as np -from bec_lib import bec_logger, messages -from bec_lib.endpoints import MessageEndpoints +from bec_lib import bec_logger from ophyd import Component as Cpt from ophyd import Device, PositionerBase, Signal from ophyd.status import wait as status_wait from ophyd.utils import LimitError +from ophyd_devices import AsyncMultiSignal, DeviceStatus, ProgressSignal from ophyd_devices.utils.controller import Controller, threadlocked from ophyd_devices.utils.socket import SocketIO, raise_if_disconnected from prettytable import PrettyTable from csaxs_bec.devices.omny.rt.rt_ophyd import ( - BECConfigError, RtCommunicationError, RtError, RtReadbackSignal, @@ -432,27 +430,6 @@ class RtFlomniController(Controller): t.add_row([i, self.read_ssi_interferometer(i)]) print(t) - def _get_signals_from_table(self, return_table) -> dict: - self.average_stdeviations_x_st_fzp += float(return_table[4]) - self.average_stdeviations_y_st_fzp += float(return_table[7]) - signals = { - "target_x": {"value": float(return_table[2])}, - "average_x_st_fzp": {"value": float(return_table[3])}, - "stdev_x_st_fzp": {"value": float(return_table[4])}, - "target_y": {"value": float(return_table[5])}, - "average_y_st_fzp": {"value": float(return_table[6])}, - "stdev_y_st_fzp": {"value": float(return_table[7])}, - "average_rotz": {"value": float(return_table[8])}, - "stdev_rotz": {"value": float(return_table[9])}, - "average_stdeviations_x_st_fzp": { - "value": self.average_stdeviations_x_st_fzp / (int(return_table[0]) + 1) - }, - "average_stdeviations_y_st_fzp": { - "value": self.average_stdeviations_y_st_fzp / (int(return_table[0]) + 1) - }, - } - return signals - @threadlocked def start_scan(self): if not self.feedback_is_running(): @@ -492,91 +469,6 @@ class RtFlomniController(Controller): current_position_in_scan = int(float(return_table[2])) return (mode, number_of_positions_planned, current_position_in_scan) - def read_positions_from_sampler(self): - # this was for reading after the scan completed - number_of_samples_to_read = 1 # self.get_scan_status()[1] #number of valid samples, will be updated upon first data read - - read_counter = 0 - - self.average_stdeviations_x_st_fzp = 0 - self.average_stdeviations_y_st_fzp = 0 - self.average_lamni_angle = 0 - - mode, number_of_positions_planned, current_position_in_scan = self.get_scan_status() - - # if not (mode==2 or mode==3): - # error - self.device_manager.connector.set( - MessageEndpoints.device_status("rt_scan"), - messages.DeviceStatusMessage( - device="rt_scan", status=1, metadata=self.readout_metadata - ), - ) - # while scan is running - while mode > 0: - - # TODO here?: scan abortion if no progress in scan *raise error - - # logger.info(f"Current scan position {current_position_in_scan} out of {number_of_positions_planned}") - mode, number_of_positions_planned, current_position_in_scan = self.get_scan_status() - time.sleep(0.01) - if current_position_in_scan > 5: - while current_position_in_scan > read_counter + 1: - return_table = (self.socket_put_and_receive(f"r{read_counter}")).split(",") - # logger.info(f"{return_table}") - logger.info(f"Read {read_counter} out of {number_of_positions_planned}") - - read_counter = read_counter + 1 - - signals = self._get_signals_from_table(return_table) - - self.publish_device_data(signals=signals, point_id=int(return_table[0])) - - time.sleep(0.05) - - # read the last samples even though scan is finished already - while number_of_positions_planned > read_counter: - return_table = (self.socket_put_and_receive(f"r{read_counter}")).split(",") - logger.info(f"Read {read_counter} out of {number_of_positions_planned}") - # logger.info(f"{return_table}") - read_counter = read_counter + 1 - - signals = self._get_signals_from_table(return_table) - self.publish_device_data(signals=signals, point_id=int(return_table[0])) - - self.device_manager.connector.set( - MessageEndpoints.device_status("rt_scan"), - messages.DeviceStatusMessage( - device="rt_scan", status=0, metadata=self.readout_metadata - ), - ) - - logger.info( - "Flomni statistics: Average of all standard deviations: x" - f" {self.average_stdeviations_x_st_fzp/read_counter*1000:.1f}, y" - f" {self.average_stdeviations_y_st_fzp/read_counter*1000:.1f}" - ) - - def publish_device_data(self, signals, point_id): - self.device_manager.connector.set_and_publish( - MessageEndpoints.device_read("rt_flomni"), - messages.DeviceMessage( - signals=signals, metadata={"point_id": point_id, **self.readout_metadata} - ), - ) - - def start_readout(self): - readout = threading.Thread(target=self.read_positions_from_sampler) - readout.start() - - def kickoff(self, metadata): - self.readout_metadata = metadata - while not self._min_scan_buffer_reached: - time.sleep(0.001) - self.start_scan() - time.sleep(0.1) - self.start_readout() - class RtFlomniReadbackSignal(RtReadbackSignal): @retry_once @@ -844,6 +736,185 @@ class RtFlomniMotor(Device, PositionerBase): return super().stop(success=success) +class RtFlomniFlyer(Device): + USER_ACCESS = ["controller"] + data = Cpt( + AsyncMultiSignal, + name="data", + signals=[ + "target_x", + "average_x_st_fzp", + "stdev_x_st_fzp", + "target_y", + "average_y_st_fzp", + "stdev_y_st_fzp", + "average_rotz", + "stdev_rotz", + "average_stdeviations_x_st_fzp", + "average_stdeviations_y_st_fzp", + ], + ndim=1, + async_update={"type": "add", "max_shape": [None]}, + max_size=1000, + ) + progress = Cpt( + ProgressSignal, doc="ProgressSignal indicating the progress of the device during a scan." + ) + + def __init__( + self, + prefix="", + *, + name, + kind=None, + read_attrs=None, + configuration_attrs=None, + parent=None, + host="mpc2844.psi.ch", + port=2222, + socket_cls=SocketIO, + device_manager=None, + **kwargs, + ): + super().__init__(prefix=prefix, name=name, parent=parent, **kwargs) + self.shutdown_event = threading.Event() + self.controller = RtFlomniController( + socket_cls=socket_cls, socket_host=host, socket_port=port, device_manager=device_manager + ) + self.average_stdeviations_x_st_fzp = 0 + self.average_stdeviations_y_st_fzp = 0 + self.average_lamni_angle = 0 + self.readout_thread = None + self.scan_done_event = threading.Event() + self.scan_done_event.set() + + def read_positions_from_sampler(self, status: DeviceStatus): + """ + Read the positions from the sampler and update the data signal. + This function runs in a separate thread and continuously checks the + scan status. + + Args: + status (DeviceStatus): The status object to update when the readout is complete. + """ + read_counter = 0 + self.average_stdeviations_x_st_fzp = 0 + self.average_stdeviations_y_st_fzp = 0 + self.average_lamni_angle = 0 + + mode, number_of_positions_planned, current_position_in_scan = ( + self.controller.get_scan_status() + ) + + # while scan is running + while mode > 0 and not self.shutdown_event.wait(0.01): + # logger.info(f"Current scan position {current_position_in_scan} out of {number_of_positions_planned}") + mode, number_of_positions_planned, current_position_in_scan = ( + self.controller.get_scan_status() + ) + if current_position_in_scan > 5: + while current_position_in_scan > read_counter + 1: + return_table = ( + self.controller.socket_put_and_receive(f"r{read_counter}") + ).split(",") + logger.info(f"Read {read_counter} out of {number_of_positions_planned}") + self.progress.put( + value=read_counter, max_value=number_of_positions_planned, done=False + ) + read_counter = read_counter + 1 + signals = self._get_signals_from_table(return_table) + self.data.set(signals) + + if self.shutdown_event.wait(0.05): + logger.info("Shutdown event set, stopping readout.") + # if we are here, the shutdown_event is set. We can exit the readout loop. + status.set_finished() + return + + # read the last samples even though scan is finished already + while number_of_positions_planned > read_counter and not self.shutdown_event.is_set(): + return_table = (self.controller.socket_put_and_receive(f"r{read_counter}")).split(",") + logger.info(f"Read {read_counter} out of {number_of_positions_planned}") + self.progress.put(value=read_counter, max_value=number_of_positions_planned, done=False) + read_counter = read_counter + 1 + + signals = self._get_signals_from_table(return_table) + self.data.set(signals) + + # NOTE: No need to set the status to failed if the shutdown_event is set. + # The stop() method will take care of that. + status.set_finished() + self.progress.put(value=read_counter, max_value=number_of_positions_planned, done=True) + + logger.info( + "Flomni statistics: Average of all standard deviations: x" + f" {self.average_stdeviations_x_st_fzp/read_counter*1000:.1f}, y" + f" {self.average_stdeviations_y_st_fzp/read_counter*1000:.1f}" + ) + + def _get_signals_from_table(self, return_table) -> dict: + self.average_stdeviations_x_st_fzp += float(return_table[4]) + self.average_stdeviations_y_st_fzp += float(return_table[7]) + signals = { + "target_x": {"value": float(return_table[2])}, + "average_x_st_fzp": {"value": float(return_table[3])}, + "stdev_x_st_fzp": {"value": float(return_table[4])}, + "target_y": {"value": float(return_table[5])}, + "average_y_st_fzp": {"value": float(return_table[6])}, + "stdev_y_st_fzp": {"value": float(return_table[7])}, + "average_rotz": {"value": float(return_table[8])}, + "stdev_rotz": {"value": float(return_table[9])}, + "average_stdeviations_x_st_fzp": { + "value": self.average_stdeviations_x_st_fzp / (int(return_table[0]) + 1) + }, + "average_stdeviations_y_st_fzp": { + "value": self.average_stdeviations_y_st_fzp / (int(return_table[0]) + 1) + }, + } + return signals + + def stage(self): + self.shutdown_event.clear() + self.scan_done_event.set() + return super().stage() + + def start_readout(self, status: DeviceStatus): + self.readout_thread = threading.Thread( + target=self.read_positions_from_sampler, args=(status,) + ) + self.readout_thread.start() + + def kickoff(self) -> DeviceStatus: + self.shutdown_event.clear() + self.scan_done_event.clear() + while not self.controller._min_scan_buffer_reached and not self.shutdown_event.wait(0.001): + ... + self.controller.start_scan() + self.shutdown_event.wait(0.1) + status = DeviceStatus(self) + status.set_finished() + return status + + def complete(self) -> DeviceStatus: + """Wait until the flyer is done.""" + if self.scan_done_event.is_set(): + # if the scan_done_event is already set, we can return a finished status immediately + status = DeviceStatus(self) + status.set_finished() + return status + status = DeviceStatus(self) + self.start_readout(status) + status.add_callback(lambda *args, **kwargs: self.scan_done_event.set()) + return status + + def stop(self, *, success=False): + self.shutdown_event.set() + self.scan_done_event.set() + if self.readout_thread is not None: + self.readout_thread.join() + return super().stop(success=success) + + if __name__ == "__main__": rtcontroller = RtFlomniController( socket_cls=SocketIO, socket_host="mpc2844.psi.ch", socket_port=2222, device_manager=None diff --git a/csaxs_bec/scans/flomni_fermat_scan.py b/csaxs_bec/scans/flomni_fermat_scan.py index 5e616b7..3a7c060 100644 --- a/csaxs_bec/scans/flomni_fermat_scan.py +++ b/csaxs_bec/scans/flomni_fermat_scan.py @@ -27,20 +27,19 @@ from bec_lib import bec_logger, messages from bec_lib.alarm_handler import Alarms from bec_lib.endpoints import MessageEndpoints from bec_server.scan_server.errors import ScanAbortion -from bec_server.scan_server.scans import SyncFlyScanBase +from bec_server.scan_server.scans import AsyncFlyScanBase from csaxs_bec.devices.epics.delay_generator_csaxs.delay_generator_csaxs import TRIGGERSOURCE logger = bec_logger.logger -class FlomniFermatScan(SyncFlyScanBase): +class FlomniFermatScan(AsyncFlyScanBase): scan_name = "flomni_fermat_scan" scan_type = "fly" required_kwargs = ["fovx", "fovy", "exp_time", "step", "angle"] arg_input = {} arg_bundle_size = {"bundle": len(arg_input), "min": None, "max": None} - use_scan_progress_report = True def __init__( self, @@ -104,6 +103,14 @@ class FlomniFermatScan(SyncFlyScanBase): self.zshift = -100 self.flomni_rotation_status = None + def scan_report_instructions(self): + """Scan report instructions for the progress bar""" + yield from self.stubs.scan_report_instruction({"device_progress": ["rt_flyer"]}) + + @property + def monitor_sync(self) -> str: + return "rt_flyer" + def initialize(self): self.scan_motors = [] self.update_readout_priority() @@ -113,10 +120,6 @@ class FlomniFermatScan(SyncFlyScanBase): self.positions, corridor_size=self.optim_trajectory_corridor ) - @property - def monitor_sync(self): - return "rt_flomni" - def reverse_trajectory(self): """ Reverse the trajectory. Every other scan should be reversed to @@ -290,26 +293,18 @@ class FlomniFermatScan(SyncFlyScanBase): return np.array(positions) def scan_core(self): - # use a device message to receive the scan number and - # scan ID before sending the message to the device server - yield from self.stubs.kickoff(device="rtx") - while True: - yield from self.stubs.read(group="monitored") - status = self.connector.get(MessageEndpoints.device_status("rt_scan")) - if status: - status_id = status.content.get("status", 1) - request_id = status.metadata.get("RID") - if status_id == 0 and self.metadata.get("RID") == request_id: - break - if status_id == 2 and self.metadata.get("RID") == request_id: - raise ScanAbortion( - "An error occured during the flomni readout:" - f" {status.metadata.get('error')}" - ) + # send off the flyer + yield from self.stubs.kickoff(device="rt_flyer") + # start the readout loop of the flyer + status = yield from self.stubs.complete(device="rt_flyer", wait=False) + + # read the monitors until the flyer is done + while not status.done: + yield from self.stubs.read(group="monitored", point_id=self.point_id) + self.point_id += 1 time.sleep(1) logger.debug("reading monitors") - # yield from self.device_rpc("rtx", "controller.kickoff") def move_to_start(self): """return to the start position""" @@ -336,6 +331,7 @@ class FlomniFermatScan(SyncFlyScanBase): yield from self.read_scan_motors() self.prepare_positions() yield from self._prepare_setup() + yield from self.scan_report_instructions() yield from self.open_scan() yield from self.stage() yield from self.run_baseline_reading()