feat(scan_bundler): added scan progress

This commit is contained in:
wakonig_k 2024-05-15 16:32:58 +02:00
parent 9f7002e0c6
commit 27befe9666
4 changed files with 41 additions and 1 deletions

View File

@ -519,6 +519,22 @@ class MessageEndpoints:
message_op=MessageOp.SET_PUBLISH,
)
@staticmethod
def scan_progress() -> EndpointInfo:
"""
Endpoint for scan progress. This endpoint is used to publish the scan progress using
a messages.ProgressMessage message.
Returns:
EndpointInfo: Endpoint for scan progress.
"""
endpoint = "scans/scan_progress"
return EndpointInfo(
endpoint=endpoint,
message_type=messages.ProgressMessage,
message_op=MessageOp.SET_PUBLISH,
)
@staticmethod
def available_scans() -> EndpointInfo:
"""

View File

@ -42,6 +42,21 @@ class BECEmitter(EmitterBase):
MessageEndpoints.scan_segment(),
MessageEndpoints.public_scan_segment(scan_id=scan_id, point_id=point_id),
)
self._update_scan_progress(scan_id, point_id)
def _update_scan_progress(self, scan_id: str, point_id: int, done=False) -> None:
info = self.scan_bundler.sync_storage[scan_id]["info"]
msg = messages.ProgressMessage(
value=point_id + 1,
max_value=info.get("num_points", point_id + 1),
done=done,
metadata={
"scan_id": scan_id,
"RID": info.get("RID", ""),
"queue_id": info.get("queue_id", ""),
},
)
self.scan_bundler.connector.set_and_publish(MessageEndpoints.scan_progress(), msg)
def _send_baseline(self, scan_id: str) -> None:
sb = self.scan_bundler
@ -57,3 +72,9 @@ class BECEmitter(EmitterBase):
)
sb.connector.set_and_publish(MessageEndpoints.scan_baseline(), msg, pipe=pipe)
pipe.execute()
def on_scan_status_update(self, status_msg: messages.ScanStatusMessage):
if status_msg.status == "open":
return
num_points = status_msg.info.get("num_points", 0) - 1
self._update_scan_progress(status_msg.scan_id, num_points, done=True)

View File

@ -62,6 +62,9 @@ class EmitterBase:
def on_cleanup(self, scan_id: str):
pass
def on_scan_status_update(self, status_msg: messages.ScanStatusMessage):
pass
def shutdown(self):
if self._buffered_connector_thread:
self._buffered_publisher_stop_event.set()

View File

@ -98,6 +98,7 @@ class ScanBundler(BECService):
self._initialize_scan_container(msg)
if scan_id not in self.scan_id_history:
self.scan_id_history.append(scan_id)
self.run_emitter("on_scan_status_update", msg)
if msg.content.get("status") != "open":
self._scan_status_modification(msg)
@ -106,7 +107,6 @@ class ScanBundler(BECService):
if status not in ["closed", "aborted", "paused", "halted"]:
logger.error(f"Unknown scan status {status}")
return
scan_id = msg.content.get("scan_id")
if not scan_id:
logger.warning(f"Received scan status update without scan_id: {msg}")