diff --git a/bec_widgets/widgets/utility/logpanel/logpanel.py b/bec_widgets/widgets/utility/logpanel/logpanel.py index 6027d732..ad272a99 100644 --- a/bec_widgets/widgets/utility/logpanel/logpanel.py +++ b/bec_widgets/widgets/utility/logpanel/logpanel.py @@ -11,10 +11,10 @@ from re import Pattern from typing import TYPE_CHECKING, Literal from bec_lib.client import BECClient -from bec_lib.connector import ConnectorBase from bec_lib.endpoints import MessageEndpoints from bec_lib.logger import LogLevel, bec_logger from bec_lib.messages import LogMessage, StatusMessage +from pyqtgraph import SignalProxy from PySide6.QtCore import QObject from qtpy.QtCore import QDateTime, Qt, Signal from qtpy.QtGui import QFont @@ -81,11 +81,11 @@ class BecLogsQueue(BECConnector, QObject): parent: QObject | None, maxlen: int = 1000, line_formatter: LineFormatter = noop_format, + **kwargs, ) -> None: - super().__init__(parent=parent) + super().__init__(parent=parent, **kwargs) self._timestamp_start: QDateTime | None = None self._timestamp_end: QDateTime | None = None - self._conn = self.client.connector self._max_length = maxlen self._data: deque[LogMessage] = deque([], self._max_length) self._display_queue: deque[str] = deque([], self._max_length) @@ -95,25 +95,23 @@ class BecLogsQueue(BECConnector, QObject): self._set_formatter_and_update_filter(line_formatter) # instance attribute still accessible after c++ object is deleted, so the callback can be unregistered self._callback = lambda *args: self._process_incoming_log_msg(*args) - self._conn.register([MessageEndpoints.log()], None, self._callback) - self.destroyed.connect(self.unsub_from_redis) + self.bec_dispatcher.connect_slot(self._callback, MessageEndpoints.log()) - def unsub_from_redis(self, *_): + def cleanup(self, *_): """Stop listening to the Redis log stream""" - self._conn.unregister([MessageEndpoints.log()], None, self._callback) + self.bec_dispatcher.disconnect_slot(self._callback, [MessageEndpoints.log()]) - def _process_incoming_log_msg(self, msg: dict): + @SafeSlot(verify_sender=True) + def _process_incoming_log_msg(self, msg: dict, _metadata: dict): try: - _msg: LogMessage | None = msg.get("data", None) - if _msg is None or not isinstance(_msg, LogMessage): - return + _msg = LogMessage(**msg) self._data.append(_msg) if self.filter is None or self.filter(_msg): self._display_queue.append(self._line_formatter(_msg)) self.new_message.emit() except Exception as e: if "Internal C++ object (BecLogsQueue) already deleted." in e.args: - self.unsub_from_redis() + self.bec_dispatcher.disconnect_slot(self._callback, [MessageEndpoints.log()]) return logger.warning(f"Error in LogPanel incoming message callback: {e}") @@ -211,7 +209,7 @@ class BecLogsQueue(BECConnector, QObject): """Fetch all available messages from Redis""" self._data = deque( item["data"] - for item in self._conn.xread( + for item in self.bec_dispatcher.client.connector.xread( MessageEndpoints.log().endpoint, from_start=True, count=self._max_length ) ) @@ -405,7 +403,6 @@ class LogPanel(TextBox): """Displays a log panel""" ICON_NAME = "terminal" - _new_messages = Signal() service_list_update = Signal(dict, set) def __init__( @@ -422,7 +419,9 @@ class LogPanel(TextBox): self._log_manager = BecLogsQueue( parent=self, line_formatter=partial(simple_color_format, colors=self._colors) ) - self._log_manager.new_message.connect(self._new_messages) + self._proxy_update = SignalProxy( + self._log_manager.new_message, rateLimit=1, slot=self._on_append + ) self.toolbar = LogPanelToolbar(parent=self) self.toolbar_area = QScrollArea() @@ -438,7 +437,6 @@ class LogPanel(TextBox): self.toolbar.search_textbox.returnPressed.connect(self._on_re_update) self.toolbar.regex_enabled.checkStateChanged.connect(self._on_re_update) self.toolbar.filter_level_dropdown.currentTextChanged.connect(self._set_level_filter) - self._new_messages.connect(self._on_append) self.toolbar.timerange_button.clicked.connect(self._choose_datetime) self._service_status.services_update.connect(self._update_service_list) @@ -490,10 +488,10 @@ class LogPanel(TextBox): self.set_html_text(self._log_manager.display_all()) self._cursor_to_end() - @SafeSlot() - def _on_append(self): - self._cursor_to_end() + @SafeSlot(verify_sender=True) + def _on_append(self, *_): self.text_box_text_edit.insertHtml(self._log_manager.format_new()) + self._cursor_to_end() @SafeSlot() def _on_clear(self): @@ -536,9 +534,8 @@ class LogPanel(TextBox): def cleanup(self): self._service_status.cleanup() - self._log_manager.new_message.disconnect() - self._new_messages.disconnect() - self._log_manager.unsub_from_redis() + self._log_manager.cleanup() + self._log_manager.deleteLater() super().cleanup() diff --git a/tests/unit_tests/test_logpanel.py b/tests/unit_tests/test_logpanel.py index 43a931a0..b9d20731 100644 --- a/tests/unit_tests/test_logpanel.py +++ b/tests/unit_tests/test_logpanel.py @@ -66,7 +66,6 @@ def log_panel(qtbot, mocked_client: MagicMock): qtbot.addWidget(widget) qtbot.waitExposed(widget) yield widget - widget.cleanup() def test_log_panel_init(log_panel: LogPanel): @@ -98,14 +97,13 @@ def test_logpanel_output(qtbot, log_panel: LogPanel): return len(log_panel._log_manager._display_queue) == 0 next_text = "datetime | error | test log message" + msg = LogMessage( + metadata={}, + log_type="error", + log_msg={"text": next_text, "record": {}, "service_name": "ScanServer"}, + ) log_panel._log_manager._process_incoming_log_msg( - { - "data": LogMessage( - metadata={}, - log_type="error", - log_msg={"text": next_text, "record": {}, "service_name": "ScanServer"}, - ) - } + msg.content, msg.metadata, _override_slot_params={"verify_sender": False} ) qtbot.waitUntil(display_queue_empty, timeout=5000) @@ -142,16 +140,22 @@ def test_timestamp_filter(log_panel: LogPanel): def test_error_handling_in_callback(log_panel: LogPanel): log_panel._log_manager.new_message = MagicMock() - cbs = (lambda: log_panel._log_manager._process_incoming_log_msg, {}) with patch("bec_widgets.widgets.utility.logpanel.logpanel.logger") as logger: # generally errors should be logged log_panel._log_manager.new_message.emit = MagicMock( side_effect=ValueError("Something went wrong") ) - log_panel.client.connector._handle_message( - msg=StreamMessage( - msg={"data": LogMessage(log_type="debug", log_msg="message")}, callbacks=[cbs] - ) + msg = LogMessage( + metadata={}, + log_type="debug", + log_msg={ + "text": "datetime | debug | test log message", + "record": {"time": {"timestamp": 123456789.000}}, + "service_name": "ScanServer", + }, + ) + log_panel._log_manager._process_incoming_log_msg( + msg.content, msg.metadata, _override_slot_params={"verify_sender": False} ) logger.warning.assert_called_once() @@ -159,9 +163,7 @@ def test_error_handling_in_callback(log_panel: LogPanel): log_panel._log_manager.new_message.emit = MagicMock( side_effect=RuntimeError("Internal C++ object (BecLogsQueue) already deleted.") ) - log_panel.client.connector._handle_message( - msg=StreamMessage( - msg={"data": LogMessage(log_type="debug", log_msg="message")}, callbacks=[cbs] - ) + log_panel._log_manager._process_incoming_log_msg( + msg.content, msg.metadata, _override_slot_params={"verify_sender": False} ) logger.warning.assert_called_once()