mirror of
https://github.com/bec-project/bec_widgets.git
synced 2025-07-14 03:31:50 +02:00
fix: move log panel to bec connector and add rate limiter
This commit is contained in:
@ -11,10 +11,10 @@ from re import Pattern
|
|||||||
from typing import TYPE_CHECKING, Literal
|
from typing import TYPE_CHECKING, Literal
|
||||||
|
|
||||||
from bec_lib.client import BECClient
|
from bec_lib.client import BECClient
|
||||||
from bec_lib.connector import ConnectorBase
|
|
||||||
from bec_lib.endpoints import MessageEndpoints
|
from bec_lib.endpoints import MessageEndpoints
|
||||||
from bec_lib.logger import LogLevel, bec_logger
|
from bec_lib.logger import LogLevel, bec_logger
|
||||||
from bec_lib.messages import LogMessage, StatusMessage
|
from bec_lib.messages import LogMessage, StatusMessage
|
||||||
|
from pyqtgraph import SignalProxy
|
||||||
from PySide6.QtCore import QObject
|
from PySide6.QtCore import QObject
|
||||||
from qtpy.QtCore import QDateTime, Qt, Signal
|
from qtpy.QtCore import QDateTime, Qt, Signal
|
||||||
from qtpy.QtGui import QFont
|
from qtpy.QtGui import QFont
|
||||||
@ -81,11 +81,11 @@ class BecLogsQueue(BECConnector, QObject):
|
|||||||
parent: QObject | None,
|
parent: QObject | None,
|
||||||
maxlen: int = 1000,
|
maxlen: int = 1000,
|
||||||
line_formatter: LineFormatter = noop_format,
|
line_formatter: LineFormatter = noop_format,
|
||||||
|
**kwargs,
|
||||||
) -> None:
|
) -> None:
|
||||||
super().__init__(parent=parent)
|
super().__init__(parent=parent, **kwargs)
|
||||||
self._timestamp_start: QDateTime | None = None
|
self._timestamp_start: QDateTime | None = None
|
||||||
self._timestamp_end: QDateTime | None = None
|
self._timestamp_end: QDateTime | None = None
|
||||||
self._conn = self.client.connector
|
|
||||||
self._max_length = maxlen
|
self._max_length = maxlen
|
||||||
self._data: deque[LogMessage] = deque([], self._max_length)
|
self._data: deque[LogMessage] = deque([], self._max_length)
|
||||||
self._display_queue: deque[str] = deque([], self._max_length)
|
self._display_queue: deque[str] = deque([], self._max_length)
|
||||||
@ -95,25 +95,23 @@ class BecLogsQueue(BECConnector, QObject):
|
|||||||
self._set_formatter_and_update_filter(line_formatter)
|
self._set_formatter_and_update_filter(line_formatter)
|
||||||
# instance attribute still accessible after c++ object is deleted, so the callback can be unregistered
|
# instance attribute still accessible after c++ object is deleted, so the callback can be unregistered
|
||||||
self._callback = lambda *args: self._process_incoming_log_msg(*args)
|
self._callback = lambda *args: self._process_incoming_log_msg(*args)
|
||||||
self._conn.register([MessageEndpoints.log()], None, self._callback)
|
self.bec_dispatcher.connect_slot(self._callback, MessageEndpoints.log())
|
||||||
self.destroyed.connect(self.unsub_from_redis)
|
|
||||||
|
|
||||||
def unsub_from_redis(self, *_):
|
def cleanup(self, *_):
|
||||||
"""Stop listening to the Redis log stream"""
|
"""Stop listening to the Redis log stream"""
|
||||||
self._conn.unregister([MessageEndpoints.log()], None, self._callback)
|
self.bec_dispatcher.disconnect_slot(self._callback, [MessageEndpoints.log()])
|
||||||
|
|
||||||
def _process_incoming_log_msg(self, msg: dict):
|
@SafeSlot(verify_sender=True)
|
||||||
|
def _process_incoming_log_msg(self, msg: dict, _metadata: dict):
|
||||||
try:
|
try:
|
||||||
_msg: LogMessage | None = msg.get("data", None)
|
_msg = LogMessage(**msg)
|
||||||
if _msg is None or not isinstance(_msg, LogMessage):
|
|
||||||
return
|
|
||||||
self._data.append(_msg)
|
self._data.append(_msg)
|
||||||
if self.filter is None or self.filter(_msg):
|
if self.filter is None or self.filter(_msg):
|
||||||
self._display_queue.append(self._line_formatter(_msg))
|
self._display_queue.append(self._line_formatter(_msg))
|
||||||
self.new_message.emit()
|
self.new_message.emit()
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
if "Internal C++ object (BecLogsQueue) already deleted." in e.args:
|
if "Internal C++ object (BecLogsQueue) already deleted." in e.args:
|
||||||
self.unsub_from_redis()
|
self.bec_dispatcher.disconnect_slot(self._callback, [MessageEndpoints.log()])
|
||||||
return
|
return
|
||||||
logger.warning(f"Error in LogPanel incoming message callback: {e}")
|
logger.warning(f"Error in LogPanel incoming message callback: {e}")
|
||||||
|
|
||||||
@ -211,7 +209,7 @@ class BecLogsQueue(BECConnector, QObject):
|
|||||||
"""Fetch all available messages from Redis"""
|
"""Fetch all available messages from Redis"""
|
||||||
self._data = deque(
|
self._data = deque(
|
||||||
item["data"]
|
item["data"]
|
||||||
for item in self._conn.xread(
|
for item in self.bec_dispatcher.client.connector.xread(
|
||||||
MessageEndpoints.log().endpoint, from_start=True, count=self._max_length
|
MessageEndpoints.log().endpoint, from_start=True, count=self._max_length
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
@ -405,7 +403,6 @@ class LogPanel(TextBox):
|
|||||||
"""Displays a log panel"""
|
"""Displays a log panel"""
|
||||||
|
|
||||||
ICON_NAME = "terminal"
|
ICON_NAME = "terminal"
|
||||||
_new_messages = Signal()
|
|
||||||
service_list_update = Signal(dict, set)
|
service_list_update = Signal(dict, set)
|
||||||
|
|
||||||
def __init__(
|
def __init__(
|
||||||
@ -422,7 +419,9 @@ class LogPanel(TextBox):
|
|||||||
self._log_manager = BecLogsQueue(
|
self._log_manager = BecLogsQueue(
|
||||||
parent=self, line_formatter=partial(simple_color_format, colors=self._colors)
|
parent=self, line_formatter=partial(simple_color_format, colors=self._colors)
|
||||||
)
|
)
|
||||||
self._log_manager.new_message.connect(self._new_messages)
|
self._proxy_update = SignalProxy(
|
||||||
|
self._log_manager.new_message, rateLimit=1, slot=self._on_append
|
||||||
|
)
|
||||||
|
|
||||||
self.toolbar = LogPanelToolbar(parent=self)
|
self.toolbar = LogPanelToolbar(parent=self)
|
||||||
self.toolbar_area = QScrollArea()
|
self.toolbar_area = QScrollArea()
|
||||||
@ -438,7 +437,6 @@ class LogPanel(TextBox):
|
|||||||
self.toolbar.search_textbox.returnPressed.connect(self._on_re_update)
|
self.toolbar.search_textbox.returnPressed.connect(self._on_re_update)
|
||||||
self.toolbar.regex_enabled.checkStateChanged.connect(self._on_re_update)
|
self.toolbar.regex_enabled.checkStateChanged.connect(self._on_re_update)
|
||||||
self.toolbar.filter_level_dropdown.currentTextChanged.connect(self._set_level_filter)
|
self.toolbar.filter_level_dropdown.currentTextChanged.connect(self._set_level_filter)
|
||||||
self._new_messages.connect(self._on_append)
|
|
||||||
|
|
||||||
self.toolbar.timerange_button.clicked.connect(self._choose_datetime)
|
self.toolbar.timerange_button.clicked.connect(self._choose_datetime)
|
||||||
self._service_status.services_update.connect(self._update_service_list)
|
self._service_status.services_update.connect(self._update_service_list)
|
||||||
@ -490,10 +488,10 @@ class LogPanel(TextBox):
|
|||||||
self.set_html_text(self._log_manager.display_all())
|
self.set_html_text(self._log_manager.display_all())
|
||||||
self._cursor_to_end()
|
self._cursor_to_end()
|
||||||
|
|
||||||
@SafeSlot()
|
@SafeSlot(verify_sender=True)
|
||||||
def _on_append(self):
|
def _on_append(self, *_):
|
||||||
self._cursor_to_end()
|
|
||||||
self.text_box_text_edit.insertHtml(self._log_manager.format_new())
|
self.text_box_text_edit.insertHtml(self._log_manager.format_new())
|
||||||
|
self._cursor_to_end()
|
||||||
|
|
||||||
@SafeSlot()
|
@SafeSlot()
|
||||||
def _on_clear(self):
|
def _on_clear(self):
|
||||||
@ -536,9 +534,8 @@ class LogPanel(TextBox):
|
|||||||
|
|
||||||
def cleanup(self):
|
def cleanup(self):
|
||||||
self._service_status.cleanup()
|
self._service_status.cleanup()
|
||||||
self._log_manager.new_message.disconnect()
|
self._log_manager.cleanup()
|
||||||
self._new_messages.disconnect()
|
self._log_manager.deleteLater()
|
||||||
self._log_manager.unsub_from_redis()
|
|
||||||
super().cleanup()
|
super().cleanup()
|
||||||
|
|
||||||
|
|
||||||
|
@ -66,7 +66,6 @@ def log_panel(qtbot, mocked_client: MagicMock):
|
|||||||
qtbot.addWidget(widget)
|
qtbot.addWidget(widget)
|
||||||
qtbot.waitExposed(widget)
|
qtbot.waitExposed(widget)
|
||||||
yield widget
|
yield widget
|
||||||
widget.cleanup()
|
|
||||||
|
|
||||||
|
|
||||||
def test_log_panel_init(log_panel: LogPanel):
|
def test_log_panel_init(log_panel: LogPanel):
|
||||||
@ -98,14 +97,13 @@ def test_logpanel_output(qtbot, log_panel: LogPanel):
|
|||||||
return len(log_panel._log_manager._display_queue) == 0
|
return len(log_panel._log_manager._display_queue) == 0
|
||||||
|
|
||||||
next_text = "datetime | error | test log message"
|
next_text = "datetime | error | test log message"
|
||||||
|
msg = LogMessage(
|
||||||
|
metadata={},
|
||||||
|
log_type="error",
|
||||||
|
log_msg={"text": next_text, "record": {}, "service_name": "ScanServer"},
|
||||||
|
)
|
||||||
log_panel._log_manager._process_incoming_log_msg(
|
log_panel._log_manager._process_incoming_log_msg(
|
||||||
{
|
msg.content, msg.metadata, _override_slot_params={"verify_sender": False}
|
||||||
"data": LogMessage(
|
|
||||||
metadata={},
|
|
||||||
log_type="error",
|
|
||||||
log_msg={"text": next_text, "record": {}, "service_name": "ScanServer"},
|
|
||||||
)
|
|
||||||
}
|
|
||||||
)
|
)
|
||||||
|
|
||||||
qtbot.waitUntil(display_queue_empty, timeout=5000)
|
qtbot.waitUntil(display_queue_empty, timeout=5000)
|
||||||
@ -142,16 +140,22 @@ def test_timestamp_filter(log_panel: LogPanel):
|
|||||||
def test_error_handling_in_callback(log_panel: LogPanel):
|
def test_error_handling_in_callback(log_panel: LogPanel):
|
||||||
log_panel._log_manager.new_message = MagicMock()
|
log_panel._log_manager.new_message = MagicMock()
|
||||||
|
|
||||||
cbs = (lambda: log_panel._log_manager._process_incoming_log_msg, {})
|
|
||||||
with patch("bec_widgets.widgets.utility.logpanel.logpanel.logger") as logger:
|
with patch("bec_widgets.widgets.utility.logpanel.logpanel.logger") as logger:
|
||||||
# generally errors should be logged
|
# generally errors should be logged
|
||||||
log_panel._log_manager.new_message.emit = MagicMock(
|
log_panel._log_manager.new_message.emit = MagicMock(
|
||||||
side_effect=ValueError("Something went wrong")
|
side_effect=ValueError("Something went wrong")
|
||||||
)
|
)
|
||||||
log_panel.client.connector._handle_message(
|
msg = LogMessage(
|
||||||
msg=StreamMessage(
|
metadata={},
|
||||||
msg={"data": LogMessage(log_type="debug", log_msg="message")}, callbacks=[cbs]
|
log_type="debug",
|
||||||
)
|
log_msg={
|
||||||
|
"text": "datetime | debug | test log message",
|
||||||
|
"record": {"time": {"timestamp": 123456789.000}},
|
||||||
|
"service_name": "ScanServer",
|
||||||
|
},
|
||||||
|
)
|
||||||
|
log_panel._log_manager._process_incoming_log_msg(
|
||||||
|
msg.content, msg.metadata, _override_slot_params={"verify_sender": False}
|
||||||
)
|
)
|
||||||
logger.warning.assert_called_once()
|
logger.warning.assert_called_once()
|
||||||
|
|
||||||
@ -159,9 +163,7 @@ def test_error_handling_in_callback(log_panel: LogPanel):
|
|||||||
log_panel._log_manager.new_message.emit = MagicMock(
|
log_panel._log_manager.new_message.emit = MagicMock(
|
||||||
side_effect=RuntimeError("Internal C++ object (BecLogsQueue) already deleted.")
|
side_effect=RuntimeError("Internal C++ object (BecLogsQueue) already deleted.")
|
||||||
)
|
)
|
||||||
log_panel.client.connector._handle_message(
|
log_panel._log_manager._process_incoming_log_msg(
|
||||||
msg=StreamMessage(
|
msg.content, msg.metadata, _override_slot_params={"verify_sender": False}
|
||||||
msg={"data": LogMessage(log_type="debug", log_msg="message")}, callbacks=[cbs]
|
|
||||||
)
|
|
||||||
)
|
)
|
||||||
logger.warning.assert_called_once()
|
logger.warning.assert_called_once()
|
||||||
|
Reference in New Issue
Block a user