fix: alarm handler signature for flomni scan
All checks were successful
CI for csaxs_bec / test (pull_request) Successful in 2m19s
CI for csaxs_bec / test (push) Successful in 1m55s

This commit was merged in pull request #165.
This commit is contained in:
2026-03-13 09:34:32 +01:00
parent 00c45b2bcf
commit 4b04e7a35d

View File

@@ -24,9 +24,11 @@ import time
import numpy as np
from bec_lib import bec_logger, messages
from bec_lib.alarm_handler import Alarms
from bec_lib.endpoints import MessageEndpoints
from bec_server.scan_server.errors import ScanAbortion
from bec_server.scan_server.scans import SyncFlyScanBase
from csaxs_bec.devices.epics.delay_generator_csaxs.delay_generator_csaxs import TRIGGERSOURCE
logger = bec_logger.logger
@@ -52,7 +54,7 @@ class FlomniFermatScan(SyncFlyScanBase):
angle: float = None,
corridor_size: float = 3,
parameter: dict = None,
frames_per_trigger:int=1,
frames_per_trigger: int = 1,
**kwargs,
):
"""
@@ -76,7 +78,9 @@ class FlomniFermatScan(SyncFlyScanBase):
>>> scans.flomni_fermat_scan(fovx=20, fovy=25, cenx=0.02, ceny=0, zshift=0, angle=0, step=0.5, exp_time=0.01, frames_per_trigger=1)
"""
super().__init__(parameter=parameter, exp_time=exp_time, frames_per_trigger=frames_per_trigger, **kwargs)
super().__init__(
parameter=parameter, exp_time=exp_time, frames_per_trigger=frames_per_trigger, **kwargs
)
self.show_live_table = False
self.axis = []
self.fovx = fovx
@@ -153,9 +157,11 @@ class FlomniFermatScan(SyncFlyScanBase):
yield from self.stubs.send_rpc_and_wait("rty", "set", self.positions[0][1])
def _prepare_setup_part2(self):
# Prepare DDG1 to use
yield from self.stubs.send_rpc_and_wait("ddg1", "set_trigger", TRIGGERSOURCE.EXT_RISING_EDGE.value)
# Prepare DDG1 to use
yield from self.stubs.send_rpc_and_wait(
"ddg1", "set_trigger", TRIGGERSOURCE.EXT_RISING_EDGE.value
)
if self.flomni_rotation_status:
self.flomni_rotation_status.wait()
@@ -174,15 +180,15 @@ class FlomniFermatScan(SyncFlyScanBase):
tracker_signal_status = yield from self.stubs.send_rpc_and_wait(
"rtx", "controller.laser_tracker_check_signalstrength"
)
#self.device_manager.connector.send_client_info(tracker_signal_status)
# self.device_manager.connector.send_client_info(tracker_signal_status)
if tracker_signal_status == "low":
self.device_manager.connector.raise_alarm(
severity=0,
alarm_type="LaserTrackerSignalStrength",
source={"device": "rtx", "reason": "low signal strength", "method": "_prepare_setup_part2"},
metadata={},
msg="Signal strength of the laser tracker is low, sufficient to continue. Realignment recommended!",
error_info = messages.ErrorInfo(
error_message="Signal strength of the laser tracker is low, but sufficient to continue. Realignment recommended!",
compact_error_message="Low signal strength of the laser tracker. Realignment recommended!",
exception_type="LaserTrackerSignalStrengthLow",
device="rtx",
)
self.device_manager.connector.raise_alarm(severity=Alarms.WARNING, info=error_info)
elif tracker_signal_status == "toolow":
raise ScanAbortion(
"Signal strength of the laser tracker is too low for scanning. Realignment required!"
@@ -313,7 +319,9 @@ class FlomniFermatScan(SyncFlyScanBase):
logger.warning("No positions found to return to start")
def cleanup(self):
yield from self.stubs.send_rpc_and_wait("ddg1", "set_trigger", TRIGGERSOURCE.SINGLE_SHOT.value)
yield from self.stubs.send_rpc_and_wait(
"ddg1", "set_trigger", TRIGGERSOURCE.SINGLE_SHOT.value
)
yield from super().cleanup()
def run(self):