1
0
mirror of https://github.com/bec-project/bec_widgets.git synced 2026-03-04 16:02:51 +01:00

feat(image): modernization of image widget

This commit is contained in:
2025-12-02 13:31:03 +01:00
parent 91ff057054
commit 7c769d3522
9 changed files with 1287 additions and 402 deletions

View File

@@ -2502,16 +2502,30 @@ class Image(RPCBase):
@property
@rpc_call
def monitor(self) -> "str":
def device_name(self) -> "str":
"""
The name of the monitor to use for the image.
The name of the device to monitor for image data.
"""
@monitor.setter
@device_name.setter
@rpc_call
def monitor(self) -> "str":
def device_name(self) -> "str":
"""
The name of the monitor to use for the image.
The name of the device to monitor for image data.
"""
@property
@rpc_call
def device_entry(self) -> "str":
"""
The signal/entry name to monitor on the device.
"""
@device_entry.setter
@rpc_call
def device_entry(self) -> "str":
"""
The signal/entry name to monitor on the device.
"""
@rpc_call
@@ -2617,8 +2631,8 @@ class Image(RPCBase):
@rpc_call
def image(
self,
monitor: "str | tuple | None" = None,
monitor_type: "Literal['auto', '1d', '2d']" = "auto",
device_name: "str | None" = None,
device_entry: "str | None" = None,
color_map: "str | None" = None,
color_bar: "Literal['simple', 'full'] | None" = None,
vrange: "tuple[int, int] | None" = None,
@@ -2627,14 +2641,14 @@ class Image(RPCBase):
Set the image source and update the image.
Args:
monitor(str|tuple|None): The name of the monitor to use for the image, or a tuple of (device, signal) for preview signals. If None or empty string, the current monitor will be disconnected.
monitor_type(str): The type of monitor to use. Options are "1d", "2d", or "auto".
device_name(str|None): The name of the device to monitor. If None or empty string, the current monitor will be disconnected.
device_entry(str|None): The signal/entry name to monitor on the device.
color_map(str): The color map to use for the image.
color_bar(str): The type of color bar to use. Options are "simple" or "full".
vrange(tuple): The range of values to use for the color map.
Returns:
ImageItem: The image object.
ImageItem: The image object, or None if connection failed.
"""
@property

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,253 @@
from qtpy.QtWidgets import QHBoxLayout, QSizePolicy, QWidget
from bec_widgets.utils.toolbars.actions import WidgetAction
from bec_widgets.utils.toolbars.bundles import ToolbarBundle, ToolbarComponents
from bec_widgets.utils.toolbars.connections import BundleConnection
from bec_widgets.widgets.control.device_input.device_combobox.device_combobox import DeviceComboBox
from bec_widgets.widgets.control.device_input.signal_combobox.signal_combobox import SignalComboBox
class DeviceSelection(QWidget):
"""Device and signal selection widget for image toolbar."""
def __init__(self, parent=None, client=None):
super().__init__(parent=parent)
self.client = client
self.supported_signals = [
"PreviewSignal",
"AsyncSignal",
"AsyncMultiSignal",
"DynamicSignal",
]
# Create device combobox with signal class filter
# This will only show devices that have signals matching the supported signal classes
self.device_combo_box = DeviceComboBox(
parent=self, client=self.client, signal_class_filter=self.supported_signals
)
self.device_combo_box.setToolTip("Select Device")
self.device_combo_box.setEditable(True)
# Set expanding size policy so it grows with available space
self.device_combo_box.setSizePolicy(QSizePolicy.Policy.Expanding, QSizePolicy.Policy.Fixed)
# Configure SignalComboBox to filter by PreviewSignal and supported async signals
# Also filter by ndim (1D and 2D only) for Image widget
self.signal_combo_box = SignalComboBox(
parent=self,
client=self.client,
signal_class_filter=[
"PreviewSignal",
"AsyncSignal",
"AsyncMultiSignal",
"DynamicSignal",
],
ndim_filter=[1, 2], # Only show 1D and 2D signals for Image widget
store_signal_config=True,
require_device=True,
)
self.signal_combo_box.setToolTip("Select Signal")
self.signal_combo_box.setEditable(True)
# Set expanding size policy so it grows with available space
self.signal_combo_box.setSizePolicy(QSizePolicy.Policy.Expanding, QSizePolicy.Policy.Fixed)
# Connect comboboxes together
self.device_combo_box.currentTextChanged.connect(self.signal_combo_box.set_device)
self.device_combo_box.device_reset.connect(self.signal_combo_box.reset_selection)
# Simple horizontal layout with stretch to fill space
layout = QHBoxLayout(self)
layout.setContentsMargins(0, 0, 0, 0)
layout.setSpacing(2)
layout.addWidget(self.device_combo_box, stretch=1)
layout.addWidget(self.signal_combo_box, stretch=1)
def set_device_and_signal(self, device_name: str | None, device_entry: str | None) -> None:
"""Set the displayed device and signal without emitting selection signals."""
device_name = device_name or ""
device_entry = device_entry or ""
self.device_combo_box.blockSignals(True)
self.signal_combo_box.blockSignals(True)
try:
if device_name:
# Set device in device_combo_box
index = self.device_combo_box.findText(device_name)
if index >= 0:
self.device_combo_box.setCurrentIndex(index)
else:
# Device not found in list, but still set it
self.device_combo_box.setCurrentText(device_name)
# Only update signal combobox device filter if it's actually changing
# This prevents redundant repopulation which can cause duplicates !!!!
current_device = getattr(self.signal_combo_box, "_device", None)
if current_device != device_name:
self.signal_combo_box.set_device(device_name)
# Sync signal combobox selection
if device_entry:
# Try to find the signal by component_name (which is what's displayed)
found = False
for i in range(self.signal_combo_box.count()):
text = self.signal_combo_box.itemText(i)
config_data = self.signal_combo_box.itemData(i)
# Check if this matches our signal
if config_data:
component_name = config_data.get("component_name", "")
if text == component_name or text == device_entry:
self.signal_combo_box.setCurrentIndex(i)
found = True
break
if not found:
# Fallback: try to match the device_entry directly
index = self.signal_combo_box.findText(device_entry)
if index >= 0:
self.signal_combo_box.setCurrentIndex(index)
else:
# No device set, clear selections
self.device_combo_box.setCurrentText("")
self.signal_combo_box.reset_selection()
finally:
# Always unblock signals
self.device_combo_box.blockSignals(False)
self.signal_combo_box.blockSignals(False)
def set_connection_status(self, status: str, message: str | None = None) -> None:
tooltip = f"Connection status: {status}"
if message:
tooltip = f"{tooltip}\n{message}"
self.device_combo_box.setToolTip(tooltip)
self.signal_combo_box.setToolTip(tooltip)
if not self.device_combo_box.is_valid_input or not self.signal_combo_box.is_valid_input:
return
if status == "error":
style = "border: 1px solid orange;"
else:
style = "border: 1px solid transparent;"
self.device_combo_box.setStyleSheet(style)
self.signal_combo_box.setStyleSheet(style)
def cleanup(self):
"""Clean up the widget resources."""
self.device_combo_box.close()
self.device_combo_box.deleteLater()
self.signal_combo_box.close()
self.signal_combo_box.deleteLater()
def device_selection_bundle(components: ToolbarComponents, client=None) -> ToolbarBundle:
"""
Creates a device selection toolbar bundle for Image widget.
Includes a resizable splitter after the device selection. All subsequent bundles'
actions will appear compactly after the splitter with no gaps.
Args:
components (ToolbarComponents): The components to be added to the bundle.
client: The BEC client instance.
Returns:
ToolbarBundle: The device selection toolbar bundle.
"""
device_selection_widget = DeviceSelection(parent=components.toolbar, client=client)
components.add_safe(
"device_selection", WidgetAction(widget=device_selection_widget, adjust_size=False)
)
bundle = ToolbarBundle("device_selection", components)
bundle.add_action("device_selection")
bundle.add_splitter(
name="device_selection_splitter",
target_widget=device_selection_widget,
min_width=210,
max_width=600,
)
return bundle
class DeviceSelectionConnection(BundleConnection):
"""
Connection helper for the device selection bundle.
"""
def __init__(self, components: ToolbarComponents, target_widget=None):
super().__init__(parent=components.toolbar)
self.bundle_name = "device_selection"
self.components = components
self.target_widget = target_widget
self._connected = False
self.register_property_sync("device_name", self._sync_from_device_name)
self.register_property_sync("device_entry", self._sync_from_device_entry)
self.register_property_sync("connection_status", self._sync_connection_status)
self.register_property_sync("connection_error", self._sync_connection_status)
def _widget(self) -> DeviceSelection:
return self.components.get_action("device_selection").widget
def connect(self):
if self._connected:
return
widget = self._widget()
widget.device_combo_box.device_selected.connect(
self.target_widget.on_device_selection_changed
)
widget.signal_combo_box.device_signal_changed.connect(
self.target_widget.on_device_selection_changed
)
self.connect_property_sync(self.target_widget)
self._connected = True
def disconnect(self):
if not self._connected:
return
widget = self._widget()
widget.device_combo_box.device_selected.disconnect(
self.target_widget.on_device_selection_changed
)
widget.signal_combo_box.device_signal_changed.disconnect(
self.target_widget.on_device_selection_changed
)
self.disconnect_property_sync(self.target_widget)
self._connected = False
widget.cleanup()
def _sync_from_device_name(self, _):
try:
widget = self._widget()
except Exception:
return
widget.set_device_and_signal(
self.target_widget.device_name, self.target_widget.device_entry
)
self.target_widget._sync_device_entry_from_toolbar()
def _sync_from_device_entry(self, _):
try:
widget = self._widget()
except Exception:
return
widget.set_device_and_signal(
self.target_widget.device_name, self.target_widget.device_entry
)
def _sync_connection_status(self, _):
try:
widget = self._widget()
except Exception:
return
widget.set_connection_status(
self.target_widget._config.connection_status,
self.target_widget._config.connection_error,
)

View File

@@ -32,7 +32,7 @@ dock_area = gui.new()
img_widget = dock_area.new().new(gui.available_widgets.Image)
# Add an ImageWidget to the BECFigure for a 2D detector
img_widget.image(monitor='eiger', monitor_type='2d')
img_widget.image(device_name='eiger', device_entry='preview')
img_widget.title = "Camera Image - Eiger Detector"
```
@@ -46,7 +46,7 @@ dock_area = gui.new()
img_widget = dock_area.new().new(gui.available_widgets.Image)
# Add an ImageWidget to the BECFigure for a 2D detector
img_widget.image(monitor='waveform', monitor_type='1d')
img_widget.image(device_name='waveform', device_entry='data')
img_widget.title = "Line Detector Data"
# Optional: Set the color map and value range
@@ -84,7 +84,7 @@ The Image Widget can be configured for different detectors by specifying the cor
```python
# For a 2D camera detector
img_widget = fig.image(monitor='eiger', monitor_type='2d')
img_widget = fig.image(device_name='eiger', device_entry='preview')
img_widget.set_title("Eiger Camera Image")
```
@@ -92,7 +92,7 @@ img_widget.set_title("Eiger Camera Image")
```python
# For a 1D line detector
img_widget = fig.image(monitor='waveform', monitor_type='1d')
img_widget = fig.image(device_name='waveform', device_entry='data')
img_widget.set_title("Line Detector Data")
```

View File

@@ -59,7 +59,7 @@ def test_rpc_add_dock_with_plots_e2e(qtbot, bec_client_lib, connected_client_gui
mm.map("samx", "samy")
curve = wf.plot(x_name="samx", y_name="bpm4i")
im_item = im.image("eiger")
im_item = im.image(device_name="eiger", device_entry="preview")
assert curve.__class__.__name__ == "RPCReference"
assert curve.__class__ == RPCReference

View File

@@ -42,7 +42,7 @@ def test_rpc_plotting_shortcuts_init_configs(qtbot, connected_client_gui_obj):
c3 = wf.plot(y=[1, 2, 3], x=[1, 2, 3])
assert c3.object_name == "Curve_0"
im.image(monitor="eiger")
im.image(device_name="eiger", device_entry="preview")
mm.map(x_name="samx", y_name="samy")
sw.plot(x_name="samx", y_name="samy", z_name="bpm4a")
mw.plot(monitor="waveform")
@@ -166,14 +166,14 @@ def test_rpc_image(qtbot, bec_client_lib, connected_client_gui_obj):
scans = client.scans
im = dock_area.new("Image")
im.image(monitor="eiger")
im.image(device_name="eiger", device_entry="preview")
status = scans.line_scan(dev.samx, -5, 5, steps=10, exp_time=0.05, relative=False)
status.wait()
last_image_device = client.connector.get_last(MessageEndpoints.device_monitor_2d("eiger"))[
"data"
].data
last_image_device = client.connector.get_last(
MessageEndpoints.device_preview("eiger", "preview")
)["data"].data
last_image_plot = im.main_image.get_data()
# check plotted data

View File

@@ -15,7 +15,7 @@ def test_rpc_reference_objects(connected_client_gui_obj):
plt.plot(x_name="samx", y_name="bpm4i")
im = dock_area.new("Image")
im.image("eiger")
im.image(device_name="eiger", device_entry="preview")
motor_map = dock_area.new("MotorMap")
motor_map.map("samx", "samy")
plt_z = dock_area.new("Waveform")
@@ -23,7 +23,8 @@ def test_rpc_reference_objects(connected_client_gui_obj):
assert len(plt_z.curves) == 1
assert len(plt.curves) == 1
assert im.monitor == "eiger"
assert im.device_name == "eiger"
assert im.device_entry == "preview"
assert isinstance(im.main_image, RPCReference)
image_item = gui._ipython_registry.get(im.main_image._gui_id, None)

View File

@@ -16,6 +16,7 @@ from typing import TYPE_CHECKING
import numpy as np
import pytest
from bec_lib.endpoints import MessageEndpoints
from bec_widgets.cli.rpc.rpc_base import RPCBase, RPCReference
@@ -233,7 +234,7 @@ def test_widgets_e2e_image(qtbot, connected_client_gui_obj, random_generator_fro
scans = bec.scans
dev = bec.device_manager.devices
# Test rpc calls
img = widget.image(dev.eiger)
img = widget.image(device_name=dev.eiger.name, device_entry="preview")
assert img.get_data() is None
# Run a scan and plot the image
s = scans.line_scan(dev.samx, -3, 3, steps=50, exp_time=0.01, relative=False)
@@ -247,13 +248,13 @@ def test_widgets_e2e_image(qtbot, connected_client_gui_obj, random_generator_fro
qtbot.waitUntil(_wait_for_scan_in_history, timeout=7000)
# Check that last image is equivalent to data in Redis
last_img = bec.device_monitor.get_data(
dev.eiger, count=1
) # Get last image from Redis monitor 2D endpoint
last_img = bec.connector.get_last(MessageEndpoints.device_preview("eiger", "preview"))[
"data"
].data
assert np.allclose(img.get_data(), last_img)
# Now add a device with a preview signal
img = widget.image(["eiger", "preview"])
img = widget.image(device_name="eiger", device_entry="preview")
s = scans.line_scan(dev.samx, -3, 3, steps=50, exp_time=0.01, relative=False)
s.wait()

View File

@@ -1,6 +1,7 @@
import numpy as np
import pyqtgraph as pg
import pytest
from bec_lib.endpoints import MessageEndpoints
from qtpy.QtCore import QPointF
from bec_widgets.widgets.plots.image.image import Image
@@ -12,6 +13,23 @@ from tests.unit_tests.conftest import create_widget
##################################################
def _set_signal_config(
client,
device_name: str,
signal_name: str,
signal_class: str,
ndim: int,
obj_name: str | None = None,
):
device = client.device_manager.devices[device_name]
device._info["signals"][signal_name] = {
"obj_name": obj_name or signal_name,
"signal_class": signal_class,
"component_name": signal_name,
"describe": {"signal_info": {"ndim": ndim}},
}
def test_initialization_defaults(qtbot, mocked_client):
bec_image_view = create_widget(qtbot, Image, client=mocked_client)
assert bec_image_view.color_map == "plasma"
@@ -114,32 +132,35 @@ def test_enable_colorbar_with_vrange(qtbot, mocked_client, colorbar_type):
##############################################
# Previewsignal update mechanism
# Device/signal update mechanism
def test_image_setup_preview_signal_1d(qtbot, mocked_client, monkeypatch):
def test_image_setup_preview_signal_1d(qtbot, mocked_client):
"""
Ensure that calling .image() with a (device, signal, config) tuple representing
a 1D PreviewSignal connects using the 1D path and updates correctly.
Ensure that calling .image() with a 1D PreviewSignal connects using the 1D path
and updates correctly.
"""
import numpy as np
view = create_widget(qtbot, Image, client=mocked_client)
signal_config = {
"obj_name": "waveform1d_img",
"signal_class": "PreviewSignal",
"describe": {"signal_info": {"ndim": 1}},
}
_set_signal_config(
mocked_client,
"waveform1d",
"img",
signal_class="PreviewSignal",
ndim=1,
obj_name="waveform1d_img",
)
# Set the image monitor to the preview signal
view.image(monitor=("waveform1d", "img", signal_config))
view.image(device_name="waveform1d", device_entry="img")
# Subscriptions should indicate 1D preview connection
sub = view.subscriptions["main"]
assert sub.source == "device_monitor_1d"
assert sub.monitor_type == "1d"
assert sub.monitor == ("waveform1d", "img", signal_config)
assert view.device_name == "waveform1d"
assert view.device_entry == "img"
# Simulate a waveform update from the dispatcher
waveform = np.arange(25, dtype=float)
@@ -148,29 +169,32 @@ def test_image_setup_preview_signal_1d(qtbot, mocked_client, monkeypatch):
np.testing.assert_array_equal(view.main_image.raw_data[0], waveform)
def test_image_setup_preview_signal_2d(qtbot, mocked_client, monkeypatch):
def test_image_setup_preview_signal_2d(qtbot, mocked_client):
"""
Ensure that calling .image() with a (device, signal, config) tuple representing
a 2D PreviewSignal connects using the 2D path and updates correctly.
Ensure that calling .image() with a 2D PreviewSignal connects using the 2D path
and updates correctly.
"""
import numpy as np
view = create_widget(qtbot, Image, client=mocked_client)
signal_config = {
"obj_name": "eiger_img2d",
"signal_class": "PreviewSignal",
"describe": {"signal_info": {"ndim": 2}},
}
_set_signal_config(
mocked_client,
"eiger",
"img2d",
signal_class="PreviewSignal",
ndim=2,
obj_name="eiger_img2d",
)
# Set the image monitor to the preview signal
view.image(monitor=("eiger", "img2d", signal_config))
view.image(device_name="eiger", device_entry="img2d")
# Subscriptions should indicate 2D preview connection
sub = view.subscriptions["main"]
assert sub.source == "device_monitor_2d"
assert sub.monitor_type == "2d"
assert sub.monitor == ("eiger", "img2d", signal_config)
assert view.device_name == "eiger"
assert view.device_entry == "img2d"
# Simulate a 2D image update
test_data = np.arange(16, dtype=float).reshape(4, 4)
@@ -178,38 +202,197 @@ def test_image_setup_preview_signal_2d(qtbot, mocked_client, monkeypatch):
np.testing.assert_array_equal(view.main_image.image, test_data)
def test_preview_signals_skip_0d_entries(qtbot, mocked_client, monkeypatch):
"""
Preview/async combobox should omit 0D signals.
"""
view = create_widget(qtbot, Image, client=mocked_client)
def fake_get(signal_class_filter):
signal_classes = (
signal_class_filter
if isinstance(signal_class_filter, (list, tuple, set))
else [signal_class_filter]
)
if "PreviewSignal" in signal_classes:
return [
(
"eiger",
"sig0d",
{
"obj_name": "sig0d",
"signal_class": "PreviewSignal",
"describe": {"signal_info": {"ndim": 0}},
},
),
(
"eiger",
"sig2d",
{
"obj_name": "sig2d",
"signal_class": "PreviewSignal",
"describe": {"signal_info": {"ndim": 2}},
},
),
]
return []
monkeypatch.setattr(view.client.device_manager, "get_bec_signals", fake_get)
device_selection = view.toolbar.components.get_action("device_selection").widget
device_selection.signal_combo_box.set_device("eiger")
device_selection.signal_combo_box.update_signals_from_signal_classes()
texts = [
device_selection.signal_combo_box.itemText(i)
for i in range(device_selection.signal_combo_box.count())
]
assert "sig0d" not in texts
assert "sig2d" in texts
def test_image_async_signal_uses_obj_name(qtbot, mocked_client, monkeypatch):
"""
Verify async signals use obj_name for endpoints/payloads and reconnect with scan_id.
"""
view = create_widget(qtbot, Image, client=mocked_client)
_set_signal_config(
mocked_client, "eiger", "img", signal_class="AsyncSignal", ndim=1, obj_name="async_obj"
)
view.image(device_name="eiger", device_entry="img")
assert view.subscriptions["main"].async_signal_name == "async_obj"
assert view.async_update is True
# Prepare scan ids and capture dispatcher calls
view.old_scan_id = "old_scan"
view.scan_id = "new_scan"
connected = []
disconnected = []
monkeypatch.setattr(
view.bec_dispatcher,
"connect_slot",
lambda slot, endpoint, from_start=False, cb_info=None: connected.append(
(slot, endpoint, from_start, cb_info)
),
)
monkeypatch.setattr(
view.bec_dispatcher,
"disconnect_slot",
lambda slot, endpoint: disconnected.append((slot, endpoint)),
)
view._setup_async_image(view.scan_id)
expected_new = MessageEndpoints.device_async_signal("new_scan", "eiger", "async_obj")
expected_old = MessageEndpoints.device_async_signal("old_scan", "eiger", "async_obj")
assert any(ep == expected_new for _, ep, _, _ in connected)
assert any(ep == expected_old for _, ep in disconnected)
# Payload extraction should use obj_name
payload = np.array([1, 2, 3])
msg = {"signals": {"async_obj": {"value": payload}}}
assert np.array_equal(view._get_payload_data(msg), payload)
def test_disconnect_clears_async_state(qtbot, mocked_client, monkeypatch):
view = create_widget(qtbot, Image, client=mocked_client)
_set_signal_config(
mocked_client, "eiger", "img", signal_class="AsyncSignal", ndim=2, obj_name="async_obj"
)
view.image(device_name="eiger", device_entry="img")
view.scan_id = "scan_x"
view.old_scan_id = "scan_y"
view.subscriptions["main"].async_signal_name = "async_obj"
# Avoid touching real dispatcher
monkeypatch.setattr(view.bec_dispatcher, "disconnect_slot", lambda *args, **kwargs: None)
view.disconnect_monitor(device_name="eiger", device_entry="img")
assert view.subscriptions["main"].async_signal_name is None
assert view.async_update is False
##############################################
# Device monitor endpoint update mechanism
# Connection guardrails
def test_image_setup_image_2d(qtbot, mocked_client):
bec_image_view = create_widget(qtbot, Image, client=mocked_client)
bec_image_view.image(monitor="eiger", monitor_type="2d")
assert bec_image_view.monitor == "eiger"
assert bec_image_view.subscriptions["main"].source == "device_monitor_2d"
assert bec_image_view.subscriptions["main"].monitor_type == "2d"
assert bec_image_view.main_image.raw_data is None
assert bec_image_view.main_image.image is None
def test_image_setup_rejects_unsupported_signal_class(qtbot, mocked_client):
view = create_widget(qtbot, Image, client=mocked_client)
_set_signal_config(mocked_client, "eiger", "img", signal_class="Signal", ndim=2)
view.image(device_name="eiger", device_entry="img")
assert view.subscriptions["main"].source is None
assert view.subscriptions["main"].monitor_type is None
assert view.async_update is False
def test_image_setup_image_1d(qtbot, mocked_client):
bec_image_view = create_widget(qtbot, Image, client=mocked_client)
bec_image_view.image(monitor="eiger", monitor_type="1d")
assert bec_image_view.monitor == "eiger"
assert bec_image_view.subscriptions["main"].source == "device_monitor_1d"
assert bec_image_view.subscriptions["main"].monitor_type == "1d"
assert bec_image_view.main_image.raw_data is None
assert bec_image_view.main_image.image is None
def test_image_disconnects_with_missing_entry(qtbot, mocked_client):
view = create_widget(qtbot, Image, client=mocked_client)
_set_signal_config(mocked_client, "eiger", "img", signal_class="PreviewSignal", ndim=2)
view.image(device_name="eiger", device_entry="img")
assert view.device_name == "eiger"
assert view.device_entry == "img"
view.image(device_name="eiger", device_entry=None)
assert view.device_name == ""
assert view.device_entry == ""
def test_image_setup_image_auto(qtbot, mocked_client):
bec_image_view = create_widget(qtbot, Image, client=mocked_client)
bec_image_view.image(monitor="eiger", monitor_type="auto")
assert bec_image_view.monitor == "eiger"
assert bec_image_view.subscriptions["main"].source == "auto"
assert bec_image_view.subscriptions["main"].monitor_type == "auto"
assert bec_image_view.main_image.raw_data is None
assert bec_image_view.main_image.image is None
def test_handle_scan_change_clears_buffers_and_resets_crosshair(qtbot, mocked_client, monkeypatch):
view = create_widget(qtbot, Image, client=mocked_client)
view.scan_id = "scan_1"
view.main_image.buffer = [np.array([1.0, 2.0])]
view.main_image.max_len = 2
clear_called = []
monkeypatch.setattr(view.main_image, "clear", lambda: clear_called.append(True))
reset_called = []
if view.crosshair is not None:
monkeypatch.setattr(view.crosshair, "reset", lambda: reset_called.append(True))
view._handle_scan_change("scan_2")
assert view.old_scan_id == "scan_1"
assert view.scan_id == "scan_2"
assert clear_called == [True]
assert view.main_image.buffer == []
assert view.main_image.max_len == 0
if view.crosshair is not None:
assert reset_called == [True]
def test_handle_scan_change_reconnects_async(qtbot, mocked_client, monkeypatch):
view = create_widget(qtbot, Image, client=mocked_client)
view.scan_id = "scan_1"
view.async_update = True
called = []
monkeypatch.setattr(view, "_setup_async_image", lambda scan_id: called.append(scan_id))
view._handle_scan_change("scan_2")
assert called == ["scan_2"]
def test_handle_scan_change_same_scan_noop(qtbot, mocked_client, monkeypatch):
view = create_widget(qtbot, Image, client=mocked_client)
view.scan_id = "scan_1"
view.main_image.buffer = [np.array([1.0])]
view.main_image.max_len = 1
clear_called = []
monkeypatch.setattr(view.main_image, "clear", lambda: clear_called.append(True))
view._handle_scan_change("scan_1")
assert view.scan_id == "scan_1"
assert clear_called == []
assert view.main_image.buffer == [np.array([1.0])]
assert view.main_image.max_len == 1
def test_image_data_update_2d(qtbot, mocked_client):
@@ -245,8 +428,7 @@ def test_toolbar_actions_presence(qtbot, mocked_client):
assert bec_image_view.toolbar.components.exists("image_autorange")
assert bec_image_view.toolbar.components.exists("lock_aspect_ratio")
assert bec_image_view.toolbar.components.exists("image_processing_fft")
assert bec_image_view.toolbar.components.exists("image_device_combo")
assert bec_image_view.toolbar.components.exists("image_dim_combo")
assert bec_image_view.toolbar.components.exists("device_selection")
def test_auto_emit_syncs_image_toolbar_actions(qtbot, mocked_client):
@@ -327,13 +509,40 @@ def test_setting_vrange_with_colorbar(qtbot, mocked_client, colorbar_type):
###################################
def test_setup_image_from_toolbar(qtbot, mocked_client):
def test_setup_image_from_toolbar(qtbot, mocked_client, monkeypatch):
bec_image_view = create_widget(qtbot, Image, client=mocked_client)
bec_image_view.device_combo_box.setCurrentText("eiger")
bec_image_view.dim_combo_box.setCurrentText("2d")
_set_signal_config(mocked_client, "eiger", "img", signal_class="PreviewSignal", ndim=2)
monkeypatch.setattr(
mocked_client.device_manager,
"get_bec_signals",
lambda signal_class_filter: (
[
(
"eiger",
"img",
{
"obj_name": "img",
"signal_class": "PreviewSignal",
"describe": {"signal_info": {"ndim": 2}},
},
)
]
if "PreviewSignal" in (signal_class_filter or [])
else []
),
)
assert bec_image_view.monitor == "eiger"
device_selection = bec_image_view.toolbar.components.get_action("device_selection").widget
device_selection.device_combo_box.update_devices_from_filters()
device_selection.device_combo_box.setCurrentText("eiger")
device_selection.signal_combo_box.setCurrentText("img")
bec_image_view.on_device_selection_changed(None)
qtbot.wait(200)
assert bec_image_view.device_name == "eiger"
assert bec_image_view.device_entry == "img"
assert bec_image_view.subscriptions["main"].source == "device_monitor_2d"
assert bec_image_view.subscriptions["main"].monitor_type == "2d"
assert bec_image_view.main_image.raw_data is None
@@ -598,90 +807,59 @@ def test_roi_plot_data_from_image(qtbot, mocked_client):
##############################################
# MonitorSelectionToolbarBundle specific tests
# Device selection toolbar sync
##############################################
def test_monitor_selection_reverse_device_items(qtbot, mocked_client):
"""
Verify that _reverse_device_items correctly reverses the order of items in the
device combobox while preserving the current selection.
"""
def test_device_selection_syncs_from_properties(qtbot, mocked_client, monkeypatch):
view = create_widget(qtbot, Image, client=mocked_client)
combo = view.device_combo_box
# Replace existing items with a deterministic list
combo.clear()
combo.addItem("samx", 1)
combo.addItem("samy", 2)
combo.addItem("samz", 3)
combo.setCurrentText("samy")
# Reverse the items
view._reverse_device_items()
# Order should be reversed and selection preserved
assert [combo.itemText(i) for i in range(combo.count())] == ["samz", "samy", "samx"]
assert combo.currentText() == "samy"
def test_monitor_selection_populate_preview_signals(qtbot, mocked_client, monkeypatch):
"""
Verify that _populate_preview_signals adds previewsignal devices to the combobox
with the correct userData.
"""
view = create_widget(qtbot, Image, client=mocked_client)
# Provide a deterministic fake device_manager with get_bec_signals
class _FakeDM:
def get_bec_signals(self, _filter):
return [
("eiger", "img", {"obj_name": "eiger_img"}),
("async_device", "img2", {"obj_name": "async_device_img2"}),
_set_signal_config(mocked_client, "eiger", "img2d", signal_class="PreviewSignal", ndim=2)
monkeypatch.setattr(
view.client.device_manager,
"get_bec_signals",
lambda signal_class_filter: (
[
(
"eiger",
"img2d",
{
"obj_name": "img2d",
"signal_class": "PreviewSignal",
"describe": {"signal_info": {"ndim": 2}},
},
)
]
if "PreviewSignal" in (signal_class_filter or [])
else []
),
)
monkeypatch.setattr(view.client, "device_manager", _FakeDM())
view.device_name = "eiger"
view.device_entry = "img2d"
initial_count = view.device_combo_box.count()
qtbot.wait(200) # Allow signal processing
view._populate_preview_signals()
# Two new entries should have been added
assert view.device_combo_box.count() == initial_count + 2
# The first newly added item should carry tuple userData describing the device/signal
data = view.device_combo_box.itemData(initial_count)
assert isinstance(data, tuple) and data[0] == "eiger"
device_selection = view.toolbar.components.get_action("device_selection").widget
qtbot.waitUntil(
lambda: device_selection.device_combo_box.currentText() == "eiger"
and device_selection.signal_combo_box.currentText() == "img2d",
timeout=1000,
)
def test_monitor_selection_adjust_and_connect(qtbot, mocked_client, monkeypatch):
"""
Verify that _adjust_and_connect performs the full set-up:
- fills the combobox with preview signals,
- reverses their order,
- and resets the currentText to an empty string.
"""
def test_device_entry_syncs_from_toolbar(qtbot, mocked_client):
view = create_widget(qtbot, Image, client=mocked_client)
_set_signal_config(mocked_client, "eiger", "img_a", signal_class="PreviewSignal", ndim=2)
_set_signal_config(mocked_client, "eiger", "img_b", signal_class="PreviewSignal", ndim=2)
# Deterministic fake device_manager
class _FakeDM:
def get_bec_signals(self, _filter):
return [("eiger", "img", {"obj_name": "eiger_img"})]
view.device_name = "eiger"
view.device_entry = "img_a"
monkeypatch.setattr(view.client, "device_manager", _FakeDM())
device_selection = view.toolbar.components.get_action("device_selection").widget
device_selection.signal_combo_box.blockSignals(True)
device_selection.signal_combo_box.setCurrentText("img_b")
device_selection.signal_combo_box.blockSignals(False)
combo = view.device_combo_box
# Start from a clean state
combo.clear()
combo.addItem("", None)
combo.setCurrentText("")
view._sync_device_entry_from_toolbar()
# Execute the method under test
view._adjust_and_connect()
# Expect exactly two items: preview label followed by the empty default
assert combo.count() == 2
# Because of the reversal, the preview label comes first
assert combo.itemText(0) == "eiger_img"
# Current selection remains empty
assert combo.currentText() == ""
assert view.device_entry == "img_b"