From 28da69a38c792ea76caacae09c6a26f9e17274a0 Mon Sep 17 00:00:00 2001 From: wakonig_k Date: Tue, 25 Nov 2025 17:45:34 +0100 Subject: [PATCH] wip - rt flyer --- .../plugins/flomni/x_ray_eye_align.py | 2 +- csaxs_bec/devices/omny/rt/rt_flomni_ophyd.py | 344 +++++++++++------- csaxs_bec/scans/flomni_fermat_scan.py | 28 +- 3 files changed, 213 insertions(+), 161 deletions(-) diff --git a/csaxs_bec/bec_ipython_client/plugins/flomni/x_ray_eye_align.py b/csaxs_bec/bec_ipython_client/plugins/flomni/x_ray_eye_align.py index f156aad..1c16d7a 100644 --- a/csaxs_bec/bec_ipython_client/plugins/flomni/x_ray_eye_align.py +++ b/csaxs_bec/bec_ipython_client/plugins/flomni/x_ray_eye_align.py @@ -17,7 +17,7 @@ umv = builtins.__dict__.get("umv") umvr = builtins.__dict__.get("umvr") if TYPE_CHECKING: - from bec_ipython_client.plugins.flomni import Flomni + from bec_ipython_client.plugins.flomni.flomni import Flomni class XrayEyeAlign: diff --git a/csaxs_bec/devices/omny/rt/rt_flomni_ophyd.py b/csaxs_bec/devices/omny/rt/rt_flomni_ophyd.py index a689eb9..7ea5496 100644 --- a/csaxs_bec/devices/omny/rt/rt_flomni_ophyd.py +++ b/csaxs_bec/devices/omny/rt/rt_flomni_ophyd.py @@ -1,14 +1,16 @@ +from __future__ import annotations + import threading import time -from typing import List +from typing import TYPE_CHECKING, Literal import numpy as np -from bec_lib import bec_logger, messages -from bec_lib.endpoints import MessageEndpoints +from bec_lib.logger import bec_logger from ophyd import Component as Cpt -from ophyd import Device, PositionerBase, Signal +from ophyd import Device, DeviceStatus, PositionerBase, Signal from ophyd.status import wait as status_wait from ophyd.utils import LimitError +from ophyd_devices.utils.bec_signals import AsyncSignal from ophyd_devices.utils.controller import Controller, threadlocked from ophyd_devices.utils.socket import SocketIO, raise_if_disconnected from prettytable import PrettyTable @@ -23,6 +25,9 @@ from csaxs_bec.devices.omny.rt.rt_ophyd import ( retry_once, ) +if TYPE_CHECKING: + from bec_server.device_server.devices.devicemanager import DeviceManagerDS + logger = bec_logger.logger @@ -270,8 +275,7 @@ class RtFlomniController(Controller): self.laser_update_tracker_info() if self.tracker_info["enabled_z"] and self.tracker_info["enabled_y"]: return True - else: - return False + return False def laser_tracker_on(self): if not self.laser_tracker_check_enabled(): @@ -383,66 +387,61 @@ class RtFlomniController(Controller): val = float(self.socket_put_and_receive(f"j{axis_number}").strip()) return val - def laser_tracker_check_signalstrength(self): + def laser_tracker_check_signalstrength(self) -> Literal["ok", "low", "toolow", "disabled"]: + """ + Check the signal strength of the laser tracker interferometer. + Returns: + str: "ok" if signal is above low limit, "low" if signal is below low limit but above min limit, + "toolow" if signal is below min limit, "disabled" if tracker is not enabled. + """ if not self.laser_tracker_check_enabled(): - returnval = "disabled" - else: - returnval = "ok" - self.laser_tracker_wait_on_target() + return "disabled" - signal = self.read_ssi_interferometer(1) - rtx = self.get_device_manager().devices.rtx - min_signal = rtx.user_parameter.get("min_signal") - low_signal = rtx.user_parameter.get("low_signal") - print(f"low signal: {low_signal}") - print(f"min signal: {min_signal}") - print(f"signal: {signal}") + returnval = "ok" + self.laser_tracker_wait_on_target() + + signal = self.read_ssi_interferometer(1) + rtx = self.get_device_manager().devices.rtx + min_signal = rtx.user_parameter.get("min_signal") + low_signal = rtx.user_parameter.get("low_signal") + print(f"low signal: {low_signal}") + print(f"min signal: {min_signal}") + print(f"signal: {signal}") + if signal < min_signal: + time.sleep(1) if signal < min_signal: - time.sleep(1) - if signal < min_signal: - print( - f"\x1b[91mThe signal of the tracker {signal} is below the minimum required signal of {min_signal}. Readjustment requred!\x1b[0m" - ) - returnval = "toolow" - # raise RtError("The interferometer signal of tracker is too low.") - elif signal < low_signal: print( - f"\x1b[91mThe signal of the tracker {signal} is below the warning limit of {low_signal}. Readjustment recommended!\x1b[0m" + f"\x1b[91mThe signal of the tracker {signal} is below the minimum required signal of {min_signal}. Readjustment requred!\x1b[0m" ) - returnval = "low" + returnval = "toolow" + # raise RtError("The interferometer signal of tracker is too low.") + elif signal < low_signal: + print( + f"\x1b[91mThe signal of the tracker {signal} is below the warning limit of {low_signal}. Readjustment recommended!\x1b[0m" + ) + returnval = "low" return returnval def show_signal_strength_interferometer(self): + """ + Show the signal strength of all four axes of the interferometer. + """ t = PrettyTable() - t.title = f"Interferometer signal strength" + t.title = "Interferometer signal strength" t.field_names = ["Axis", "Value"] for i in range(4): t.add_row([i, self.read_ssi_interferometer(i)]) print(t) - def _get_signals_from_table(self, return_table) -> dict: - self.average_stdeviations_x_st_fzp += float(return_table[4]) - self.average_stdeviations_y_st_fzp += float(return_table[7]) - signals = { - "target_x": {"value": float(return_table[2])}, - "average_x_st_fzp": {"value": float(return_table[3])}, - "stdev_x_st_fzp": {"value": float(return_table[4])}, - "target_y": {"value": float(return_table[5])}, - "average_y_st_fzp": {"value": float(return_table[6])}, - "stdev_y_st_fzp": {"value": float(return_table[7])}, - "average_rotz": {"value": float(return_table[8])}, - "stdev_rotz": {"value": float(return_table[9])}, - "average_stdeviations_x_st_fzp": { - "value": self.average_stdeviations_x_st_fzp / (int(return_table[0]) + 1) - }, - "average_stdeviations_y_st_fzp": { - "value": self.average_stdeviations_y_st_fzp / (int(return_table[0]) + 1) - }, - } - return signals - @threadlocked def start_scan(self): + """ + Start the scan. Before starting the scan, it is checked that the feedback loop is running. + Furthermore, it is checked that at least one target position is planned. + + Raises: + RtError: Raised if feedback loop is not running or if no target positions are planned. + """ if not self.feedback_is_running(): logger.error( "Cannot start scan because feedback loop is not running or there is an" @@ -453,18 +452,23 @@ class RtFlomniController(Controller): " interferometer error." ) # here exception - (mode, number_of_positions_planned, current_position_in_scan) = self.get_scan_status() + (_mode, number_of_positions_planned, _current_position_in_scan) = self.get_scan_status() if number_of_positions_planned == 0: logger.error("Cannot start scan because no target positions are planned.") raise RtError("Cannot start scan because no target positions are planned.") - # hier exception # start a point-by-point scan (for cont scan in flomni it would be "sa") self.socket_put_and_receive("sd") @retry_once @threadlocked - def get_scan_status(self): + def get_scan_status(self) -> tuple[int, int, int]: + """ + Get the current scan status of the controller. + + Returns: + tuple: A tuple containing the mode, number of positions planned, and current position in scan. + """ return_table = (self.socket_put_and_receive("sr")).split(",") if len(return_table) != 3: raise RtCommunicationError( @@ -480,94 +484,15 @@ class RtFlomniController(Controller): current_position_in_scan = int(float(return_table[2])) return (mode, number_of_positions_planned, current_position_in_scan) - def get_device_manager(self): + def get_device_manager(self) -> DeviceManagerDS: + """ + Helper function to get the device manager from one of the axes. + """ for axis in self._axis: - if hasattr(axis, "device_manager") and axis.device_manager: - return axis.device_manager + if hasattr(axis, "device_manager") and axis.device_manager: # type: ignore + return axis.device_manager # type: ignore raise BECConfigError("Could not access the device_manager") - def read_positions_from_sampler(self): - # this was for reading after the scan completed - number_of_samples_to_read = 1 # self.get_scan_status()[1] #number of valid samples, will be updated upon first data read - - read_counter = 0 - - self.average_stdeviations_x_st_fzp = 0 - self.average_stdeviations_y_st_fzp = 0 - self.average_lamni_angle = 0 - - mode, number_of_positions_planned, current_position_in_scan = self.get_scan_status() - - # if not (mode==2 or mode==3): - # error - self.get_device_manager().connector.set( - MessageEndpoints.device_status("rt_scan"), - messages.DeviceStatusMessage( - device="rt_scan", status=1, metadata=self.readout_metadata - ), - ) - # while scan is running - while mode > 0: - # logger.info(f"Current scan position {current_position_in_scan} out of {number_of_positions_planned}") - mode, number_of_positions_planned, current_position_in_scan = self.get_scan_status() - time.sleep(0.01) - if current_position_in_scan > 5: - while current_position_in_scan > read_counter + 1: - return_table = (self.socket_put_and_receive(f"r{read_counter}")).split(",") - # logger.info(f"{return_table}") - logger.info(f"Read {read_counter} out of {number_of_positions_planned}") - - read_counter = read_counter + 1 - - signals = self._get_signals_from_table(return_table) - - self.publish_device_data(signals=signals, point_id=int(return_table[0])) - - time.sleep(0.05) - - # read the last samples even though scan is finished already - while number_of_positions_planned > read_counter: - return_table = (self.socket_put_and_receive(f"r{read_counter}")).split(",") - logger.info(f"Read {read_counter} out of {number_of_positions_planned}") - # logger.info(f"{return_table}") - read_counter = read_counter + 1 - - signals = self._get_signals_from_table(return_table) - self.publish_device_data(signals=signals, point_id=int(return_table[0])) - - self.get_device_manager().connector.set( - MessageEndpoints.device_status("rt_scan"), - messages.DeviceStatusMessage( - device="rt_scan", status=0, metadata=self.readout_metadata - ), - ) - - logger.info( - "Flomni statistics: Average of all standard deviations: x" - f" {self.average_stdeviations_x_st_fzp/read_counter*1000:.1f}, y" - f" {self.average_stdeviations_y_st_fzp/read_counter*1000:.1f}" - ) - - def publish_device_data(self, signals, point_id): - self.get_device_manager().connector.set_and_publish( - MessageEndpoints.device_read("rt_flomni"), - messages.DeviceMessage( - signals=signals, metadata={"point_id": point_id, **self.readout_metadata} - ), - ) - - def start_readout(self): - readout = threading.Thread(target=self.read_positions_from_sampler) - readout.start() - - def kickoff(self, metadata): - self.readout_metadata = metadata - while not self._min_scan_buffer_reached: - time.sleep(0.001) - self.start_scan() - time.sleep(0.1) - self.start_readout() - class RtFlomniReadbackSignal(RtReadbackSignal): @retry_once @@ -803,9 +728,6 @@ class RtFlomniMotor(Device, PositionerBase): else: raise TypeError(f"Expected value of type int but received {type(val)}") - def kickoff(self, metadata, **kwargs) -> None: - self.controller.kickoff(metadata) - @property def egu(self): """The engineering units (EGU) for positions""" @@ -817,6 +739,146 @@ class RtFlomniMotor(Device, PositionerBase): return super().stop(success=success) +class RtFlomniFlyer(Device): + USER_ACCESS = ["controller"] + data = Cpt(AsyncSignal, "data", ndim=1, async_update={"type": "add", "max_shape": [None]}) + + def __init__( + self, + prefix="", + *, + name, + kind=None, + read_attrs=None, + configuration_attrs=None, + parent=None, + host="mpc2844.psi.ch", + port=2222, + socket_cls=SocketIO, + device_manager=None, + **kwargs, + ): + super().__init__( + prefix=prefix, + name=name, + kind=kind, + read_attrs=read_attrs, + configuration_attrs=configuration_attrs, + parent=parent, + host=host, + port=port, + socket_cls=socket_cls, + device_manager=device_manager, + **kwargs, + ) + self.shutdown_event = threading.Event() + self.controller = RtFlomniController( + socket_cls=socket_cls, socket_host=host, socket_port=port + ) + self.average_stdeviations_x_st_fzp = 0 + self.average_stdeviations_y_st_fzp = 0 + self.average_lamni_angle = 0 + + def read_positions_from_sampler(self, status: DeviceStatus): + """ + Read the positions from the sampler and update the data signal. + This function runs in a separate thread and continuously checks the + scan status. + + Args: + status (DeviceStatus): The status object to update when the readout is complete. + """ + read_counter = 0 + self.average_stdeviations_x_st_fzp = 0 + self.average_stdeviations_y_st_fzp = 0 + self.average_lamni_angle = 0 + + mode, number_of_positions_planned, current_position_in_scan = ( + self.controller.get_scan_status() + ) + + # while scan is running + while mode > 0 and not self.shutdown_event.wait(0.01): + # logger.info(f"Current scan position {current_position_in_scan} out of {number_of_positions_planned}") + mode, number_of_positions_planned, current_position_in_scan = ( + self.controller.get_scan_status() + ) + if current_position_in_scan > 5: + while current_position_in_scan > read_counter + 1: + return_table = (self.socket_put_and_receive(f"r{read_counter}")).split(",") + logger.info(f"Read {read_counter} out of {number_of_positions_planned}") + read_counter = read_counter + 1 + signals = self._get_signals_from_table(return_table) + self.data.set(signals) + + if self.shutdown_event.wait(0.05): + logger.info("Shutdown event set, stopping readout.") + # if we are here, the shutdown_event is set. We can exit the readout loop. + status.set_finished() + return + + # read the last samples even though scan is finished already + while number_of_positions_planned > read_counter and not self.shutdown_event.is_set(): + return_table = (self.socket_put_and_receive(f"r{read_counter}")).split(",") + logger.info(f"Read {read_counter} out of {number_of_positions_planned}") + read_counter = read_counter + 1 + + signals = self._get_signals_from_table(return_table) + self.data.set(signals) + + # NOTE: No need to set the status to failed if the shutdown_event is set. + # The stop() method will take care of that. + status.set_finished() + + logger.info( + "Flomni statistics: Average of all standard deviations: x" + f" {self.average_stdeviations_x_st_fzp/read_counter*1000:.1f}, y" + f" {self.average_stdeviations_y_st_fzp/read_counter*1000:.1f}" + ) + + def _get_signals_from_table(self, return_table) -> dict: + self.average_stdeviations_x_st_fzp += float(return_table[4]) + self.average_stdeviations_y_st_fzp += float(return_table[7]) + signals = { + "target_x": {"value": float(return_table[2])}, + "average_x_st_fzp": {"value": float(return_table[3])}, + "stdev_x_st_fzp": {"value": float(return_table[4])}, + "target_y": {"value": float(return_table[5])}, + "average_y_st_fzp": {"value": float(return_table[6])}, + "stdev_y_st_fzp": {"value": float(return_table[7])}, + "average_rotz": {"value": float(return_table[8])}, + "stdev_rotz": {"value": float(return_table[9])}, + "average_stdeviations_x_st_fzp": { + "value": self.average_stdeviations_x_st_fzp / (int(return_table[0]) + 1) + }, + "average_stdeviations_y_st_fzp": { + "value": self.average_stdeviations_y_st_fzp / (int(return_table[0]) + 1) + }, + } + return signals + + def start_readout(self, status: DeviceStatus): + readout = threading.Thread(target=self.read_positions_from_sampler, args=(status,)) + readout.start() + + def kickoff(self): + self.shutdown_event.clear() + while not self._min_scan_buffer_reached and not self.shutdown_event.wait(0.001): + ... + self.start_scan() + self.shutdown_event.wait(0.1) + + def complete(self) -> DeviceStatus: + """Wait until the flyer is done.""" + status = DeviceStatus(self) + self.start_readout(status) + return status + + def stop(self, *, success=False): + self.shutdown_event.set() + return super().stop(success=success) + + if __name__ == "__main__": rtcontroller = RtFlomniController( socket_cls=SocketIO, socket_host="mpc2844.psi.ch", socket_port=2222, device_manager=None diff --git a/csaxs_bec/scans/flomni_fermat_scan.py b/csaxs_bec/scans/flomni_fermat_scan.py index 045ef1e..c507f02 100644 --- a/csaxs_bec/scans/flomni_fermat_scan.py +++ b/csaxs_bec/scans/flomni_fermat_scan.py @@ -169,12 +169,16 @@ class FlomniFermatScan(SyncFlyScanBase): tracker_signal_status = yield from self.stubs.send_rpc_and_wait( "rtx", "controller.laser_tracker_check_signalstrength" ) - #self.device_manager.connector.send_client_info(tracker_signal_status) + # self.device_manager.connector.send_client_info(tracker_signal_status) if tracker_signal_status == "low": self.device_manager.connector.raise_alarm( severity=0, alarm_type="LaserTrackerSignalStrength", - source={"device": "rtx", "reason": "low signal strength", "method": "_prepare_setup_part2"}, + source={ + "device": "rtx", + "reason": "low signal strength", + "method": "_prepare_setup_part2", + }, metadata={}, msg="Signal strength of the laser tracker is low, sufficient to continue. Realignment recommended!", ) @@ -276,26 +280,12 @@ class FlomniFermatScan(SyncFlyScanBase): return np.array(positions) def scan_core(self): - # use a device message to receive the scan number and - # scan ID before sending the message to the device server - yield from self.stubs.kickoff(device="rtx") - while True: + yield from self.stubs.kickoff(device="rt_flyer") + status = yield from self.stubs.complete(device="rt_flyer", wait=False) + while not status.done: yield from self.stubs.read(group="monitored") - status = self.connector.get(MessageEndpoints.device_status("rt_scan")) - if status: - status_id = status.content.get("status", 1) - request_id = status.metadata.get("RID") - if status_id == 0 and self.metadata.get("RID") == request_id: - break - if status_id == 2 and self.metadata.get("RID") == request_id: - raise ScanAbortion( - "An error occured during the flomni readout:" - f" {status.metadata.get('error')}" - ) - time.sleep(1) logger.debug("reading monitors") - # yield from self.device_rpc("rtx", "controller.kickoff") def move_to_start(self): """return to the start position"""