wip - rt flyer
Some checks failed
CI for csaxs_bec / test (push) Failing after 11m42s
CI for csaxs_bec / test (pull_request) Failing after 12m8s

This commit is contained in:
2025-11-25 17:45:34 +01:00
parent 3a41c2130a
commit 28da69a38c
3 changed files with 213 additions and 161 deletions

View File

@@ -17,7 +17,7 @@ umv = builtins.__dict__.get("umv")
umvr = builtins.__dict__.get("umvr")
if TYPE_CHECKING:
from bec_ipython_client.plugins.flomni import Flomni
from bec_ipython_client.plugins.flomni.flomni import Flomni
class XrayEyeAlign:

View File

@@ -1,14 +1,16 @@
from __future__ import annotations
import threading
import time
from typing import List
from typing import TYPE_CHECKING, Literal
import numpy as np
from bec_lib import bec_logger, messages
from bec_lib.endpoints import MessageEndpoints
from bec_lib.logger import bec_logger
from ophyd import Component as Cpt
from ophyd import Device, PositionerBase, Signal
from ophyd import Device, DeviceStatus, PositionerBase, Signal
from ophyd.status import wait as status_wait
from ophyd.utils import LimitError
from ophyd_devices.utils.bec_signals import AsyncSignal
from ophyd_devices.utils.controller import Controller, threadlocked
from ophyd_devices.utils.socket import SocketIO, raise_if_disconnected
from prettytable import PrettyTable
@@ -23,6 +25,9 @@ from csaxs_bec.devices.omny.rt.rt_ophyd import (
retry_once,
)
if TYPE_CHECKING:
from bec_server.device_server.devices.devicemanager import DeviceManagerDS
logger = bec_logger.logger
@@ -270,8 +275,7 @@ class RtFlomniController(Controller):
self.laser_update_tracker_info()
if self.tracker_info["enabled_z"] and self.tracker_info["enabled_y"]:
return True
else:
return False
return False
def laser_tracker_on(self):
if not self.laser_tracker_check_enabled():
@@ -383,66 +387,61 @@ class RtFlomniController(Controller):
val = float(self.socket_put_and_receive(f"j{axis_number}").strip())
return val
def laser_tracker_check_signalstrength(self):
def laser_tracker_check_signalstrength(self) -> Literal["ok", "low", "toolow", "disabled"]:
"""
Check the signal strength of the laser tracker interferometer.
Returns:
str: "ok" if signal is above low limit, "low" if signal is below low limit but above min limit,
"toolow" if signal is below min limit, "disabled" if tracker is not enabled.
"""
if not self.laser_tracker_check_enabled():
returnval = "disabled"
else:
returnval = "ok"
self.laser_tracker_wait_on_target()
return "disabled"
signal = self.read_ssi_interferometer(1)
rtx = self.get_device_manager().devices.rtx
min_signal = rtx.user_parameter.get("min_signal")
low_signal = rtx.user_parameter.get("low_signal")
print(f"low signal: {low_signal}")
print(f"min signal: {min_signal}")
print(f"signal: {signal}")
returnval = "ok"
self.laser_tracker_wait_on_target()
signal = self.read_ssi_interferometer(1)
rtx = self.get_device_manager().devices.rtx
min_signal = rtx.user_parameter.get("min_signal")
low_signal = rtx.user_parameter.get("low_signal")
print(f"low signal: {low_signal}")
print(f"min signal: {min_signal}")
print(f"signal: {signal}")
if signal < min_signal:
time.sleep(1)
if signal < min_signal:
time.sleep(1)
if signal < min_signal:
print(
f"\x1b[91mThe signal of the tracker {signal} is below the minimum required signal of {min_signal}. Readjustment requred!\x1b[0m"
)
returnval = "toolow"
# raise RtError("The interferometer signal of tracker is too low.")
elif signal < low_signal:
print(
f"\x1b[91mThe signal of the tracker {signal} is below the warning limit of {low_signal}. Readjustment recommended!\x1b[0m"
f"\x1b[91mThe signal of the tracker {signal} is below the minimum required signal of {min_signal}. Readjustment requred!\x1b[0m"
)
returnval = "low"
returnval = "toolow"
# raise RtError("The interferometer signal of tracker is too low.")
elif signal < low_signal:
print(
f"\x1b[91mThe signal of the tracker {signal} is below the warning limit of {low_signal}. Readjustment recommended!\x1b[0m"
)
returnval = "low"
return returnval
def show_signal_strength_interferometer(self):
"""
Show the signal strength of all four axes of the interferometer.
"""
t = PrettyTable()
t.title = f"Interferometer signal strength"
t.title = "Interferometer signal strength"
t.field_names = ["Axis", "Value"]
for i in range(4):
t.add_row([i, self.read_ssi_interferometer(i)])
print(t)
def _get_signals_from_table(self, return_table) -> dict:
self.average_stdeviations_x_st_fzp += float(return_table[4])
self.average_stdeviations_y_st_fzp += float(return_table[7])
signals = {
"target_x": {"value": float(return_table[2])},
"average_x_st_fzp": {"value": float(return_table[3])},
"stdev_x_st_fzp": {"value": float(return_table[4])},
"target_y": {"value": float(return_table[5])},
"average_y_st_fzp": {"value": float(return_table[6])},
"stdev_y_st_fzp": {"value": float(return_table[7])},
"average_rotz": {"value": float(return_table[8])},
"stdev_rotz": {"value": float(return_table[9])},
"average_stdeviations_x_st_fzp": {
"value": self.average_stdeviations_x_st_fzp / (int(return_table[0]) + 1)
},
"average_stdeviations_y_st_fzp": {
"value": self.average_stdeviations_y_st_fzp / (int(return_table[0]) + 1)
},
}
return signals
@threadlocked
def start_scan(self):
"""
Start the scan. Before starting the scan, it is checked that the feedback loop is running.
Furthermore, it is checked that at least one target position is planned.
Raises:
RtError: Raised if feedback loop is not running or if no target positions are planned.
"""
if not self.feedback_is_running():
logger.error(
"Cannot start scan because feedback loop is not running or there is an"
@@ -453,18 +452,23 @@ class RtFlomniController(Controller):
" interferometer error."
)
# here exception
(mode, number_of_positions_planned, current_position_in_scan) = self.get_scan_status()
(_mode, number_of_positions_planned, _current_position_in_scan) = self.get_scan_status()
if number_of_positions_planned == 0:
logger.error("Cannot start scan because no target positions are planned.")
raise RtError("Cannot start scan because no target positions are planned.")
# hier exception
# start a point-by-point scan (for cont scan in flomni it would be "sa")
self.socket_put_and_receive("sd")
@retry_once
@threadlocked
def get_scan_status(self):
def get_scan_status(self) -> tuple[int, int, int]:
"""
Get the current scan status of the controller.
Returns:
tuple: A tuple containing the mode, number of positions planned, and current position in scan.
"""
return_table = (self.socket_put_and_receive("sr")).split(",")
if len(return_table) != 3:
raise RtCommunicationError(
@@ -480,94 +484,15 @@ class RtFlomniController(Controller):
current_position_in_scan = int(float(return_table[2]))
return (mode, number_of_positions_planned, current_position_in_scan)
def get_device_manager(self):
def get_device_manager(self) -> DeviceManagerDS:
"""
Helper function to get the device manager from one of the axes.
"""
for axis in self._axis:
if hasattr(axis, "device_manager") and axis.device_manager:
return axis.device_manager
if hasattr(axis, "device_manager") and axis.device_manager: # type: ignore
return axis.device_manager # type: ignore
raise BECConfigError("Could not access the device_manager")
def read_positions_from_sampler(self):
# this was for reading after the scan completed
number_of_samples_to_read = 1 # self.get_scan_status()[1] #number of valid samples, will be updated upon first data read
read_counter = 0
self.average_stdeviations_x_st_fzp = 0
self.average_stdeviations_y_st_fzp = 0
self.average_lamni_angle = 0
mode, number_of_positions_planned, current_position_in_scan = self.get_scan_status()
# if not (mode==2 or mode==3):
# error
self.get_device_manager().connector.set(
MessageEndpoints.device_status("rt_scan"),
messages.DeviceStatusMessage(
device="rt_scan", status=1, metadata=self.readout_metadata
),
)
# while scan is running
while mode > 0:
# logger.info(f"Current scan position {current_position_in_scan} out of {number_of_positions_planned}")
mode, number_of_positions_planned, current_position_in_scan = self.get_scan_status()
time.sleep(0.01)
if current_position_in_scan > 5:
while current_position_in_scan > read_counter + 1:
return_table = (self.socket_put_and_receive(f"r{read_counter}")).split(",")
# logger.info(f"{return_table}")
logger.info(f"Read {read_counter} out of {number_of_positions_planned}")
read_counter = read_counter + 1
signals = self._get_signals_from_table(return_table)
self.publish_device_data(signals=signals, point_id=int(return_table[0]))
time.sleep(0.05)
# read the last samples even though scan is finished already
while number_of_positions_planned > read_counter:
return_table = (self.socket_put_and_receive(f"r{read_counter}")).split(",")
logger.info(f"Read {read_counter} out of {number_of_positions_planned}")
# logger.info(f"{return_table}")
read_counter = read_counter + 1
signals = self._get_signals_from_table(return_table)
self.publish_device_data(signals=signals, point_id=int(return_table[0]))
self.get_device_manager().connector.set(
MessageEndpoints.device_status("rt_scan"),
messages.DeviceStatusMessage(
device="rt_scan", status=0, metadata=self.readout_metadata
),
)
logger.info(
"Flomni statistics: Average of all standard deviations: x"
f" {self.average_stdeviations_x_st_fzp/read_counter*1000:.1f}, y"
f" {self.average_stdeviations_y_st_fzp/read_counter*1000:.1f}"
)
def publish_device_data(self, signals, point_id):
self.get_device_manager().connector.set_and_publish(
MessageEndpoints.device_read("rt_flomni"),
messages.DeviceMessage(
signals=signals, metadata={"point_id": point_id, **self.readout_metadata}
),
)
def start_readout(self):
readout = threading.Thread(target=self.read_positions_from_sampler)
readout.start()
def kickoff(self, metadata):
self.readout_metadata = metadata
while not self._min_scan_buffer_reached:
time.sleep(0.001)
self.start_scan()
time.sleep(0.1)
self.start_readout()
class RtFlomniReadbackSignal(RtReadbackSignal):
@retry_once
@@ -803,9 +728,6 @@ class RtFlomniMotor(Device, PositionerBase):
else:
raise TypeError(f"Expected value of type int but received {type(val)}")
def kickoff(self, metadata, **kwargs) -> None:
self.controller.kickoff(metadata)
@property
def egu(self):
"""The engineering units (EGU) for positions"""
@@ -817,6 +739,146 @@ class RtFlomniMotor(Device, PositionerBase):
return super().stop(success=success)
class RtFlomniFlyer(Device):
USER_ACCESS = ["controller"]
data = Cpt(AsyncSignal, "data", ndim=1, async_update={"type": "add", "max_shape": [None]})
def __init__(
self,
prefix="",
*,
name,
kind=None,
read_attrs=None,
configuration_attrs=None,
parent=None,
host="mpc2844.psi.ch",
port=2222,
socket_cls=SocketIO,
device_manager=None,
**kwargs,
):
super().__init__(
prefix=prefix,
name=name,
kind=kind,
read_attrs=read_attrs,
configuration_attrs=configuration_attrs,
parent=parent,
host=host,
port=port,
socket_cls=socket_cls,
device_manager=device_manager,
**kwargs,
)
self.shutdown_event = threading.Event()
self.controller = RtFlomniController(
socket_cls=socket_cls, socket_host=host, socket_port=port
)
self.average_stdeviations_x_st_fzp = 0
self.average_stdeviations_y_st_fzp = 0
self.average_lamni_angle = 0
def read_positions_from_sampler(self, status: DeviceStatus):
"""
Read the positions from the sampler and update the data signal.
This function runs in a separate thread and continuously checks the
scan status.
Args:
status (DeviceStatus): The status object to update when the readout is complete.
"""
read_counter = 0
self.average_stdeviations_x_st_fzp = 0
self.average_stdeviations_y_st_fzp = 0
self.average_lamni_angle = 0
mode, number_of_positions_planned, current_position_in_scan = (
self.controller.get_scan_status()
)
# while scan is running
while mode > 0 and not self.shutdown_event.wait(0.01):
# logger.info(f"Current scan position {current_position_in_scan} out of {number_of_positions_planned}")
mode, number_of_positions_planned, current_position_in_scan = (
self.controller.get_scan_status()
)
if current_position_in_scan > 5:
while current_position_in_scan > read_counter + 1:
return_table = (self.socket_put_and_receive(f"r{read_counter}")).split(",")
logger.info(f"Read {read_counter} out of {number_of_positions_planned}")
read_counter = read_counter + 1
signals = self._get_signals_from_table(return_table)
self.data.set(signals)
if self.shutdown_event.wait(0.05):
logger.info("Shutdown event set, stopping readout.")
# if we are here, the shutdown_event is set. We can exit the readout loop.
status.set_finished()
return
# read the last samples even though scan is finished already
while number_of_positions_planned > read_counter and not self.shutdown_event.is_set():
return_table = (self.socket_put_and_receive(f"r{read_counter}")).split(",")
logger.info(f"Read {read_counter} out of {number_of_positions_planned}")
read_counter = read_counter + 1
signals = self._get_signals_from_table(return_table)
self.data.set(signals)
# NOTE: No need to set the status to failed if the shutdown_event is set.
# The stop() method will take care of that.
status.set_finished()
logger.info(
"Flomni statistics: Average of all standard deviations: x"
f" {self.average_stdeviations_x_st_fzp/read_counter*1000:.1f}, y"
f" {self.average_stdeviations_y_st_fzp/read_counter*1000:.1f}"
)
def _get_signals_from_table(self, return_table) -> dict:
self.average_stdeviations_x_st_fzp += float(return_table[4])
self.average_stdeviations_y_st_fzp += float(return_table[7])
signals = {
"target_x": {"value": float(return_table[2])},
"average_x_st_fzp": {"value": float(return_table[3])},
"stdev_x_st_fzp": {"value": float(return_table[4])},
"target_y": {"value": float(return_table[5])},
"average_y_st_fzp": {"value": float(return_table[6])},
"stdev_y_st_fzp": {"value": float(return_table[7])},
"average_rotz": {"value": float(return_table[8])},
"stdev_rotz": {"value": float(return_table[9])},
"average_stdeviations_x_st_fzp": {
"value": self.average_stdeviations_x_st_fzp / (int(return_table[0]) + 1)
},
"average_stdeviations_y_st_fzp": {
"value": self.average_stdeviations_y_st_fzp / (int(return_table[0]) + 1)
},
}
return signals
def start_readout(self, status: DeviceStatus):
readout = threading.Thread(target=self.read_positions_from_sampler, args=(status,))
readout.start()
def kickoff(self):
self.shutdown_event.clear()
while not self._min_scan_buffer_reached and not self.shutdown_event.wait(0.001):
...
self.start_scan()
self.shutdown_event.wait(0.1)
def complete(self) -> DeviceStatus:
"""Wait until the flyer is done."""
status = DeviceStatus(self)
self.start_readout(status)
return status
def stop(self, *, success=False):
self.shutdown_event.set()
return super().stop(success=success)
if __name__ == "__main__":
rtcontroller = RtFlomniController(
socket_cls=SocketIO, socket_host="mpc2844.psi.ch", socket_port=2222, device_manager=None

View File

@@ -169,12 +169,16 @@ class FlomniFermatScan(SyncFlyScanBase):
tracker_signal_status = yield from self.stubs.send_rpc_and_wait(
"rtx", "controller.laser_tracker_check_signalstrength"
)
#self.device_manager.connector.send_client_info(tracker_signal_status)
# self.device_manager.connector.send_client_info(tracker_signal_status)
if tracker_signal_status == "low":
self.device_manager.connector.raise_alarm(
severity=0,
alarm_type="LaserTrackerSignalStrength",
source={"device": "rtx", "reason": "low signal strength", "method": "_prepare_setup_part2"},
source={
"device": "rtx",
"reason": "low signal strength",
"method": "_prepare_setup_part2",
},
metadata={},
msg="Signal strength of the laser tracker is low, sufficient to continue. Realignment recommended!",
)
@@ -276,26 +280,12 @@ class FlomniFermatScan(SyncFlyScanBase):
return np.array(positions)
def scan_core(self):
# use a device message to receive the scan number and
# scan ID before sending the message to the device server
yield from self.stubs.kickoff(device="rtx")
while True:
yield from self.stubs.kickoff(device="rt_flyer")
status = yield from self.stubs.complete(device="rt_flyer", wait=False)
while not status.done:
yield from self.stubs.read(group="monitored")
status = self.connector.get(MessageEndpoints.device_status("rt_scan"))
if status:
status_id = status.content.get("status", 1)
request_id = status.metadata.get("RID")
if status_id == 0 and self.metadata.get("RID") == request_id:
break
if status_id == 2 and self.metadata.get("RID") == request_id:
raise ScanAbortion(
"An error occured during the flomni readout:"
f" {status.metadata.get('error')}"
)
time.sleep(1)
logger.debug("reading monitors")
# yield from self.device_rpc("rtx", "controller.kickoff")
def move_to_start(self):
"""return to the start position"""