feat(alarms): report scan id and scan number on alarms

This commit is contained in:
2025-06-30 14:00:10 +02:00
committed by Klaus Wakonig
parent d0406aee24
commit ccfad6551a
8 changed files with 204 additions and 17 deletions
@@ -334,6 +334,34 @@ class DeviceServer(RPCMixin, BECService):
if dev not in self.device_manager.devices:
raise InvalidDeviceError(f"There is no device with the name {dev}.")
def _get_metadata_for_alarm(
self, instruction: messages.DeviceInstructionMessage | None = None
) -> dict:
"""
Get the metadata for the current scan. This is used to add the scan ID and scan number to alarms.
Returns:
dict: Metadata dictionary with scan ID and scan number.
"""
metadata = {}
if instruction is not None:
metadata.update(instruction.metadata)
if not self.device_manager:
return metadata
if not self.device_manager.scan_info:
return metadata
msg = self.device_manager.scan_info.msg
if not msg:
return metadata
scan_id_instruction = metadata.get("scan_id")
if msg.scan_id == scan_id_instruction and msg.scan_number is not None:
metadata["scan_number"] = msg.scan_number
return metadata
def handle_device_instructions(self, msg: messages.DeviceInstructionMessage) -> None:
"""Parse a device instruction message and handle the requested action. Action
types are set, read, rpc, kickoff or trigger.
@@ -386,7 +414,7 @@ class DeviceServer(RPCMixin, BECService):
source=instructions.content,
msg=content,
alarm_type=limit_error.__class__.__name__,
metadata=instructions.metadata,
metadata=self._get_metadata_for_alarm(msg),
)
except Exception as exc: # pylint: disable=broad-except
content = traceback.format_exc()
@@ -402,7 +430,7 @@ class DeviceServer(RPCMixin, BECService):
source=instructions.content,
msg=content,
alarm_type=exc.__class__.__name__,
metadata=instructions.metadata,
metadata=self._get_metadata_for_alarm(msg),
)
@staticmethod
@@ -657,7 +685,7 @@ class DeviceServer(RPCMixin, BECService):
alarm_type="Warning",
source={"device": device, "method": method},
msg=f"Failed to run {method} on device {device}.",
metadata={},
metadata=self._get_metadata_for_alarm(),
)
device_root = device.split(".")[0]
ds_dev = self.device_manager.devices.get(device_root)
@@ -148,7 +148,7 @@ class AsyncWriter(threading.Thread):
alarm_type="AsyncWriterError",
source={"file_path": self.file_path},
msg=f"Error writing async data file {self.file_path}: {content}",
metadata={},
metadata={"scan_id": self.scan_id, "scan_number": self.scan_number},
)
def send_file_message(self, done: bool, successful: bool) -> None:
@@ -232,7 +232,7 @@ class AsyncWriter(threading.Thread):
alarm_type="ValueError",
source={"device": device_name},
msg=f"Unknown key: {key}. Data will not be written.",
metadata={},
metadata={"scan_id": self.scan_id, "scan_number": self.scan_number},
)
if write_replace:
@@ -276,7 +276,7 @@ class AsyncWriter(threading.Thread):
alarm_type="ValueError",
source={"device": signal_group.name},
msg=f"Unknown async update type: {update_type}. Data will not be written.",
metadata={},
metadata={"scan_id": self.scan_id, "scan_number": self.scan_number},
)
def _write_value_add(self, async_update: dict, signal_group: h5py.Group, value: Any):
@@ -333,7 +333,7 @@ class AsyncWriter(threading.Thread):
alarm_type="ValueError",
source={"device": signal_group.name},
msg=f"Data for {signal_group.name} exceeds the defined max_shape {max_shape}. Data will not be written.",
metadata={},
metadata={"scan_id": self.scan_id, "scan_number": self.scan_number},
)
return
signal_group["value"].resize((current_shape[0] + value.shape[0], *current_shape[1:]))
@@ -358,7 +358,7 @@ class AsyncWriter(threading.Thread):
alarm_type="ValueError",
source={"device": signal_group.name},
msg=f"Invalid max_shape for async update type 'add_slice': {max_shape}. max_shape cannot exceed two dimensions. Data will not be written.",
metadata={},
metadata={"scan_id": self.scan_id, "scan_number": self.scan_number},
)
return
@@ -386,7 +386,7 @@ class AsyncWriter(threading.Thread):
alarm_type="ValueError",
source={"device": signal_group.name, "slice": row_index},
msg=f"Data for {signal_group.name} exceeds the defined max_shape {max_shape}. Data will be truncated.",
metadata={},
metadata={"scan_id": self.scan_id, "scan_number": self.scan_number},
)
value = value[:, : max_shape[1]]
signal_group.create_dataset("value", data=value, maxshape=max_shape)
@@ -412,7 +412,7 @@ class AsyncWriter(threading.Thread):
alarm_type="ValueError",
source={"device": signal_group.name, "slice": row_index},
msg=f"Added data slice for {signal_group.name} exceeds the defined max_shape {max_shape}. Data will be truncated.",
metadata={},
metadata={"scan_id": self.scan_id, "scan_number": self.scan_number},
)
value = value[: max_shape[1] - col_index]
signal_group["value"].resize((row_index + 1, max_shape[1]))
@@ -824,6 +824,20 @@ class RequestBlockQueue:
self.scan_queue.queue_manager.parent.dataset_number += 1
return
def _get_metadata_for_alarm(self):
"""get the metadata for the alarm"""
metadata = {}
if self.active_rb is None:
return metadata
if self.active_rb.scan is None:
return metadata
if self.active_rb.scan_id is not None:
metadata["scan_id"] = self.active_rb.scan_id
if self.active_rb.scan_number is not None:
metadata["scan_number"] = self.active_rb.scan_number
return metadata
def __iter__(self):
return self
@@ -849,7 +863,7 @@ class RequestBlockQueue:
source=self.active_rb.msg.content,
msg=limit_error.args[0],
alarm_type=limit_error.__class__.__name__,
metadata={},
metadata=self._get_metadata_for_alarm(),
)
self.instruction_queue.stopped = True
raise ScanAbortion from limit_error
@@ -862,7 +876,7 @@ class RequestBlockQueue:
source=self.active_rb.msg.content,
msg=content,
alarm_type=exc.__class__.__name__,
metadata={},
metadata=self._get_metadata_for_alarm(),
)
raise ScanAbortion from exc
@@ -347,6 +347,24 @@ class ScanWorker(threading.Thread):
if set(readback.get("devices", [])) & set(instr_device):
instr.metadata["response"] = True
def _get_metadata_for_alarm(self) -> dict:
"""
Get metadata for the alarm to be raised in case of an error.
This includes the scan ID and scan number if available.
Returns:
dict: Metadata dictionary with scan ID and scan number.
"""
metadata = {}
if not self.current_scan_info:
return metadata
if self.current_scan_info.get("scan_id"):
metadata["scan_id"] = self.current_scan_info["scan_id"]
if self.current_scan_info.get("scan_number"):
metadata["scan_number"] = self.current_scan_info["scan_number"]
return metadata
def _process_instructions(self, queue: InstructionQueueItem) -> None:
"""
Process scan instructions and send DeviceInstructions to OPAAS.
@@ -397,7 +415,7 @@ class ScanWorker(threading.Thread):
source={"ScanWorker": "_process_instructions"},
msg=content,
alarm_type=exc_return_to_start.__class__.__name__,
metadata={},
metadata=self._get_metadata_for_alarm(),
)
raise ScanAbortion from exc
raise ScanAbortion from exc
@@ -409,7 +427,7 @@ class ScanWorker(threading.Thread):
source={"ScanWorker": "_process_instructions"},
msg=content,
alarm_type=exc.__class__.__name__,
metadata={},
metadata=self._get_metadata_for_alarm(),
)
raise ScanAbortion from exc
queue.is_active = False
@@ -522,7 +540,7 @@ class ScanWorker(threading.Thread):
source={"ScanWorker": "run"},
msg=content,
alarm_type=exc.__class__.__name__,
metadata={},
metadata=self._get_metadata_for_alarm(),
)
if self.queue_name in self.parent.queue_manager.queues:
self.parent.queue_manager.queues[self.queue_name].abort()
+6 -2
View File
@@ -541,6 +541,10 @@ class ScanBase(RequestBase, PathOptimizerMixin):
# Their done status was not checked nor were they waited for
# While this is not an error, it is a warning that the scan
# might not have completed as expected.
metadata = {"scan_id": self.scan_id}
if self.scan_number is not None:
metadata["scan_number"] = self.scan_number
unchecked_status_objects = self.stubs.get_remaining_status_objects(
exclude_done=False, exclude_checked=True
)
@@ -550,7 +554,7 @@ class ScanBase(RequestBase, PathOptimizerMixin):
source={"Scan": self.scan_name},
msg=f"Scan completed with unchecked status objects: {unchecked_status_objects}. Use .wait() or .done within the scan to check their status.",
alarm_type="ScanCleanupWarning",
metadata={},
metadata=metadata,
)
# Check if there are any remaining status objects that are not done.
@@ -564,7 +568,7 @@ class ScanBase(RequestBase, PathOptimizerMixin):
source={"Scan": self.scan_name},
msg=f"Scan completed with remaining status objects: {remaining_status_objects}",
alarm_type="ScanCleanupWarning",
metadata={},
metadata=metadata,
)
for obj in remaining_status_objects:
obj.wait()
@@ -808,3 +808,75 @@ def test_stage_timeout_unstage_device(device_server_mock, instr):
# Change the mock to return the resolved unstage status + unstage the device
mock_unstage.side_effect = callback
device_server._stage_device(instr, timeout_on_unstage=0.1)
@pytest.mark.parametrize(
"instr",
[
messages.DeviceInstructionMessage(
device="samx",
action="unstage",
parameter={},
metadata={"stream": "primary", "device_instr_id": "diid"},
),
messages.DeviceInstructionMessage(
device="test_device", action="kickoff", parameter={}, metadata={}
),
],
)
def test_get_metadata_for_alarm(device_server_mock, instr):
device_server = device_server_mock
metadata = device_server._get_metadata_for_alarm(instr)
assert metadata == instr.metadata
def test_get_metadata_for_alarm_no_device_manager(device_server_mock):
device_server = device_server_mock
instr = messages.DeviceInstructionMessage(
device="test_device", action="kickoff", parameter={}, metadata={}
)
device_server.device_manager = None
metadata = device_server._get_metadata_for_alarm(instr)
assert metadata == instr.metadata
def test_get_metadata_for_alarm_no_scan_info(device_server_mock):
device_server = device_server_mock
instr = messages.DeviceInstructionMessage(
device="test_device", action="kickoff", parameter={}, metadata={}
)
device_server.device_manager.scan_info = None
metadata = device_server._get_metadata_for_alarm(instr)
assert metadata == instr.metadata
def test_get_metadata_for_alarm_no_scan_info_msg(device_server_mock):
device_server = device_server_mock
instr = messages.DeviceInstructionMessage(
device="test_device", action="kickoff", parameter={}, metadata={}
)
device_server.device_manager.scan_info.msg = None
metadata = device_server._get_metadata_for_alarm(instr)
assert metadata == instr.metadata
@pytest.mark.parametrize(
"msg",
[
messages.ScanStatusMessage(
scan_id="12345", scan_number=1, status="open", info={}, metadata={}
),
messages.ScanStatusMessage(
scan_id="12345", scan_number=1, status="open", info={}, metadata={}
),
],
)
def test_get_metadata_for_alarm_with_scan_info_msg(device_server_mock, msg):
device_server = device_server_mock
instr = messages.DeviceInstructionMessage(
device="test_device", action="kickoff", parameter={}, metadata={"scan_id": "12345"}
)
device_server.device_manager.scan_info.msg = msg
metadata = device_server._get_metadata_for_alarm(instr)
assert metadata["scan_id"] == msg.scan_id
assert metadata["scan_number"] == msg.scan_number
@@ -7,6 +7,7 @@ from bec_lib import messages
from bec_lib.alarm_handler import Alarms
from bec_lib.endpoints import MessageEndpoints
from bec_lib.redis_connector import MessageObject
from bec_server.scan_server.errors import LimitError, ScanAbortion
from bec_server.scan_server.scan_assembler import ScanAssembler
from bec_server.scan_server.scan_queue import (
InstructionQueueItem,
@@ -704,6 +705,39 @@ def test_pull_request_block_empyt_rb():
rbqs.assert_not_called()
@pytest.fixture(params=[LimitError, ScanAbortion])
def request_block_queue_error(request):
req_block_queue = RequestBlockQueue(mock.MagicMock(), mock.MagicMock())
req_block_queue.active_rb = mock.MagicMock()
req_block_queue.active_rb.instructions.__next__.side_effect = request.param("Test error")
return req_block_queue, request.param
@pytest.mark.parametrize(
"scan,scan_id,scan_number,metadata",
[
(None, None, None, {}),
(mock.MagicMock(), "scan_id", 1, {"scan_id": "scan_id", "scan_number": 1}),
],
)
def test_request_block_queue_raises_alarm_on_error(
request_block_queue_error, scan, scan_id, scan_number, metadata
):
req_block_queue, exc = request_block_queue_error
req_block_queue.active_rb.scan = scan
req_block_queue.active_rb.scan_id = scan_id
req_block_queue.active_rb.scan_number = scan_number
with pytest.raises(ScanAbortion):
next(req_block_queue)
req_block_queue.scan_queue.queue_manager.connector.raise_alarm.assert_called_once_with(
severity=Alarms.MAJOR,
source=req_block_queue.active_rb.msg.content,
msg=mock.ANY,
alarm_type=exc.__name__,
metadata=metadata,
)
def test_queue_manager_get_active_scan_id(queuemanager_mock):
queue_manager = queuemanager_mock()
msg = messages.ScanQueueMessage(
@@ -774,3 +774,20 @@ def test_worker_get_file_base_path(
worker.parent._service_config.service_config["file_writer"][
"base_path"
] = file_writer_base_path_orig
@pytest.mark.parametrize(
"scan_info, out",
[
(None, {}),
({}, {}),
({"scan_id": "12345"}, {"scan_id": "12345"}),
({"scan_number": 1}, {"scan_number": 1}),
({"scan_id": "12345", "scan_number": 1}, {"scan_id": "12345", "scan_number": 1}),
],
)
def test_worker_get_metadata_for_alarm(scan_worker_mock, scan_info, out):
worker = scan_worker_mock
worker.current_scan_info = scan_info
metadata = worker._get_metadata_for_alarm()
assert metadata == out