mirror of
https://github.com/bec-project/bec_widgets.git
synced 2026-03-05 00:12:49 +01:00
fix(heatmap): interpolation thread is killed only on exit, logger for dandling thread
This commit is contained in:
@@ -128,6 +128,12 @@ class _StepInterpolationWorker(QObject):
|
||||
def __init__(self, parent: QObject | None = None):
|
||||
super().__init__(parent=parent)
|
||||
self._active_request: _InterpolationRequest | None = None
|
||||
self._processing = False
|
||||
|
||||
@property
|
||||
def is_processing(self) -> bool:
|
||||
"""Return whether the worker is currently processing a request."""
|
||||
return self._processing
|
||||
|
||||
@SafeSlot(object, int)
|
||||
def process(self, request: _InterpolationRequest, data_version: int):
|
||||
@@ -139,6 +145,7 @@ class _StepInterpolationWorker(QObject):
|
||||
data_version(int): The data version for the request.
|
||||
"""
|
||||
self._active_request = request
|
||||
self._processing = True
|
||||
try:
|
||||
image, transform = Heatmap.compute_step_scan_image(
|
||||
x_data=np.asarray(request.x_data, dtype=float),
|
||||
@@ -150,7 +157,9 @@ class _StepInterpolationWorker(QObject):
|
||||
except Exception as exc: # pragma: no cover - defensive
|
||||
logger.warning(f"Step-scan interpolation failed with: {exc}")
|
||||
self.failed.emit(str(exc), data_version, request.scan_id)
|
||||
self._processing = False
|
||||
return
|
||||
self._processing = False
|
||||
self.finished.emit(image, transform, data_version, request.scan_id)
|
||||
|
||||
|
||||
@@ -682,7 +691,7 @@ class Heatmap(ImageBase):
|
||||
oversampling_factor=self._image_config.oversampling_factor,
|
||||
)
|
||||
|
||||
if self._interpolation_thread is not None and self._interpolation_thread.isRunning():
|
||||
if self._interpolation_worker is not None and self._interpolation_worker.is_processing:
|
||||
self._pending_interpolation_request = request
|
||||
return
|
||||
|
||||
@@ -721,16 +730,10 @@ class Heatmap(ImageBase):
|
||||
self._apply_image_update(img, transform)
|
||||
else:
|
||||
logger.info("Discarding outdated interpolation result.")
|
||||
if self._interpolation_thread is not None and self._interpolation_thread.isRunning():
|
||||
self._interpolation_thread.quit()
|
||||
self._interpolation_thread.wait()
|
||||
self._maybe_start_pending_interpolation()
|
||||
|
||||
def _on_interpolation_failed(self, error: str, data_version: int, scan_id: str):
|
||||
logger.warning(f"Interpolation failed for scan {scan_id} (version {data_version}): {error}")
|
||||
if self._interpolation_thread is not None and self._interpolation_thread.isRunning():
|
||||
self._interpolation_thread.quit()
|
||||
self._interpolation_thread.wait()
|
||||
self._maybe_start_pending_interpolation()
|
||||
|
||||
def _finish_interpolation_thread(self):
|
||||
@@ -738,17 +741,21 @@ class Heatmap(ImageBase):
|
||||
if self._interpolation_worker is not None:
|
||||
try:
|
||||
self.interpolation_requested.disconnect(self._interpolation_worker.process)
|
||||
except (TypeError, RuntimeError):
|
||||
# Defensive: disconnect may fail if already disconnected or during shutdown.
|
||||
except (TypeError, RuntimeError) as ext:
|
||||
logger.warning(f"Processing thread already disconnected: {ext}")
|
||||
pass
|
||||
self._interpolation_worker.deleteLater()
|
||||
self._interpolation_worker = None
|
||||
if self._interpolation_thread is not None:
|
||||
if self._interpolation_thread.isRunning():
|
||||
self._interpolation_thread.quit()
|
||||
self._interpolation_thread.wait()
|
||||
if not self._interpolation_thread.wait(3000): # 3s timeout
|
||||
logger.error(
|
||||
f"Interpolation thread of widget {self.gui_id} did not stop within timeout 3s; leaving it dangling."
|
||||
)
|
||||
self._interpolation_thread.deleteLater()
|
||||
self._interpolation_thread = None
|
||||
logger.info(f"Interpolation thread finished of widget {self.gui_id}")
|
||||
|
||||
def _maybe_start_pending_interpolation(self):
|
||||
if self._pending_interpolation_request is None:
|
||||
@@ -756,6 +763,8 @@ class Heatmap(ImageBase):
|
||||
if self._pending_interpolation_request.scan_id != self.scan_id:
|
||||
self._pending_interpolation_request = None
|
||||
return
|
||||
if self._interpolation_worker is not None and self._interpolation_worker.is_processing:
|
||||
return
|
||||
|
||||
pending = self._pending_interpolation_request
|
||||
self._pending_interpolation_request = None
|
||||
|
||||
@@ -4,8 +4,8 @@ import numpy as np
|
||||
import pytest
|
||||
from bec_lib import messages
|
||||
from bec_lib.scan_history import ScanHistory
|
||||
from qtpy.QtGui import QTransform
|
||||
from qtpy.QtCore import QPointF
|
||||
from qtpy.QtGui import QTransform
|
||||
|
||||
from bec_widgets.widgets.plots.heatmap.heatmap import (
|
||||
Heatmap,
|
||||
@@ -560,8 +560,9 @@ def test_pending_request_queueing_and_start(heatmap_widget):
|
||||
metadata={},
|
||||
info={"positions": [[0, 0], [1, 1], [2, 2], [3, 3]]},
|
||||
)
|
||||
heatmap_widget._interpolation_thread = mock.MagicMock()
|
||||
heatmap_widget._interpolation_thread.isRunning.return_value = True
|
||||
# Simulate an active worker processing a job so new requests are queued.
|
||||
heatmap_widget._interpolation_worker = mock.MagicMock()
|
||||
heatmap_widget._interpolation_worker.is_processing = True
|
||||
|
||||
with mock.patch.object(heatmap_widget, "_start_step_scan_interpolation") as start_mock:
|
||||
heatmap_widget._request_step_scan_interpolation(
|
||||
@@ -573,7 +574,7 @@ def test_pending_request_queueing_and_start(heatmap_widget):
|
||||
assert heatmap_widget._pending_interpolation_request is not None
|
||||
|
||||
# Now simulate worker finished and thread cleaned up
|
||||
heatmap_widget._interpolation_thread = None
|
||||
heatmap_widget._interpolation_worker.is_processing = False
|
||||
pending = heatmap_widget._pending_interpolation_request
|
||||
heatmap_widget._pending_interpolation_request = pending
|
||||
heatmap_widget._maybe_start_pending_interpolation()
|
||||
|
||||
Reference in New Issue
Block a user