mirror of
https://github.com/bec-project/bec_widgets.git
synced 2025-07-13 19:21:50 +02:00
refactor: renamed scanID to scan_id
This commit is contained in:
@ -1,8 +1,9 @@
|
||||
# This file was automatically generated by generate_cli.py
|
||||
|
||||
from bec_widgets.cli.client_utils import rpc_call, RPCBase, BECFigureClientMixin
|
||||
from typing import Literal, Optional, overload
|
||||
|
||||
from bec_widgets.cli.client_utils import BECFigureClientMixin, RPCBase, rpc_call
|
||||
|
||||
|
||||
class BECPlotBase(RPCBase):
|
||||
@rpc_call
|
||||
@ -194,12 +195,12 @@ class BECWaveform1D(RPCBase):
|
||||
"""
|
||||
|
||||
@rpc_call
|
||||
def scan_history(self, scan_index: "int" = None, scanID: "str" = None):
|
||||
def scan_history(self, scan_index: "int" = None, scan_id: "str" = None):
|
||||
"""
|
||||
Update the scan curves with the data from the scan storage.
|
||||
Provide only one of scanID or scan_index.
|
||||
Provide only one of scan_id or scan_index.
|
||||
Args:
|
||||
scanID(str, optional): ScanID of the scan to be updated. Defaults to None.
|
||||
scan_id(str, optional): ScanID of the scan to be updated. Defaults to None.
|
||||
scan_index(int, optional): Index of the scan to be updated. Defaults to None.
|
||||
"""
|
||||
|
||||
|
@ -8,12 +8,12 @@ import uuid
|
||||
from functools import wraps
|
||||
from typing import TYPE_CHECKING
|
||||
|
||||
from qtpy.QtCore import QCoreApplication
|
||||
|
||||
import bec_widgets.cli.client as client
|
||||
from bec_lib import MessageEndpoints, messages
|
||||
from bec_lib.connector import MessageObject
|
||||
from bec_lib.device import DeviceBase
|
||||
from qtpy.QtCore import QCoreApplication
|
||||
|
||||
import bec_widgets.cli.client as client
|
||||
from bec_widgets.utils.bec_dispatcher import BECDispatcher
|
||||
|
||||
if TYPE_CHECKING:
|
||||
@ -59,7 +59,7 @@ def update_script(figure: BECFigure, msg):
|
||||
"""
|
||||
info = msg.info
|
||||
status = msg.status
|
||||
scan_id = msg.scanID
|
||||
scan_id = msg.scan_id
|
||||
scan_number = info.get("scan_number", 0)
|
||||
scan_name = info.get("scan_name", "Unknown")
|
||||
scan_report_devices = info.get("scan_report_devices", [])
|
||||
|
@ -1,19 +1,15 @@
|
||||
# import simulation_progress as SP
|
||||
import numpy as np
|
||||
import pyqtgraph as pg
|
||||
from qtpy.QtCore import Signal as pyqtSignal, Slot as pyqtSlot
|
||||
from qtpy.QtWidgets import (
|
||||
QApplication,
|
||||
QVBoxLayout,
|
||||
QWidget,
|
||||
)
|
||||
|
||||
from bec_lib import MessageEndpoints, messages
|
||||
from qtpy.QtCore import Signal as pyqtSignal
|
||||
from qtpy.QtCore import Slot as pyqtSlot
|
||||
from qtpy.QtWidgets import QApplication, QVBoxLayout, QWidget
|
||||
|
||||
|
||||
class StreamApp(QWidget):
|
||||
update_signal = pyqtSignal()
|
||||
new_scanID = pyqtSignal(str)
|
||||
new_scan_id = pyqtSignal(str)
|
||||
|
||||
def __init__(self, device, sub_device):
|
||||
super().__init__()
|
||||
@ -23,7 +19,7 @@ class StreamApp(QWidget):
|
||||
self.setWindowTitle("MCA readout")
|
||||
|
||||
self.data = None
|
||||
self.scanID = None
|
||||
self.scan_id = None
|
||||
self.stream_consumer = None
|
||||
|
||||
self.device = device
|
||||
@ -33,7 +29,7 @@ class StreamApp(QWidget):
|
||||
|
||||
# self.start_device_consumer(self.device) # for simulation
|
||||
|
||||
self.new_scanID.connect(self.create_new_stream_consumer)
|
||||
self.new_scan_id.connect(self.create_new_stream_consumer)
|
||||
self.update_signal.connect(self.plot_new)
|
||||
|
||||
def init_ui(self):
|
||||
@ -64,17 +60,17 @@ class StreamApp(QWidget):
|
||||
# self.glw.addItem(self.hist)
|
||||
|
||||
@pyqtSlot(str)
|
||||
def create_new_stream_consumer(self, scanID: str):
|
||||
print(f"Creating new stream consumer for scanID: {scanID}")
|
||||
def create_new_stream_consumer(self, scan_id: str):
|
||||
print(f"Creating new stream consumer for scan_id: {scan_id}")
|
||||
|
||||
self.connect_stream_consumer(scanID, self.device)
|
||||
self.connect_stream_consumer(scan_id, self.device)
|
||||
|
||||
def connect_stream_consumer(self, scanID, device):
|
||||
def connect_stream_consumer(self, scan_id, device):
|
||||
if self.stream_consumer is not None:
|
||||
self.stream_consumer.shutdown()
|
||||
|
||||
self.stream_consumer = connector.stream_consumer(
|
||||
topics=MessageEndpoints.device_async_readback(scanID=scanID, device=device),
|
||||
topics=MessageEndpoints.device_async_readback(scan_id=scan_id, device=device),
|
||||
cb=self._streamer_cb,
|
||||
parent=self,
|
||||
)
|
||||
@ -125,24 +121,25 @@ class StreamApp(QWidget):
|
||||
|
||||
msgDEV = msg.value
|
||||
|
||||
current_scanID = msgDEV.content["scanID"]
|
||||
current_scan_id = msgDEV.content["scan_id"]
|
||||
|
||||
if parent.scanID is None:
|
||||
parent.scanID = current_scanID
|
||||
parent.new_scanID.emit(current_scanID)
|
||||
print(f"New scanID: {current_scanID}")
|
||||
if parent.scan_id is None:
|
||||
parent.scan_id = current_scan_id
|
||||
parent.new_scan_id.emit(current_scan_id)
|
||||
print(f"New scan_id: {current_scan_id}")
|
||||
|
||||
if current_scanID != parent.scanID:
|
||||
parent.scanID = current_scanID
|
||||
if current_scan_id != parent.scan_id:
|
||||
parent.scan_id = current_scan_id
|
||||
# parent.data = None
|
||||
# parent.imageItem.clear()
|
||||
parent.new_scanID.emit(current_scanID)
|
||||
parent.new_scan_id.emit(current_scan_id)
|
||||
|
||||
print(f"New scanID: {current_scanID}")
|
||||
print(f"New scan_id: {current_scan_id}")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
import argparse
|
||||
|
||||
from bec_lib import RedisConnector
|
||||
|
||||
parser = argparse.ArgumentParser(description="Stream App.")
|
||||
|
@ -1,31 +1,26 @@
|
||||
from bec_lib import messages, MessageEndpoints, RedisConnector
|
||||
import time
|
||||
|
||||
from bec_lib import MessageEndpoints, RedisConnector, messages
|
||||
|
||||
connector = RedisConnector("localhost:6379")
|
||||
producer = connector.producer()
|
||||
metadata = {}
|
||||
|
||||
scanID = "ScanID1"
|
||||
scan_id = "ScanID1"
|
||||
|
||||
metadata.update(
|
||||
{
|
||||
"scanID": scanID, # this will be different for each scan
|
||||
"async_update": "append",
|
||||
}
|
||||
{"scan_id": scan_id, "async_update": "append"} # this will be different for each scan
|
||||
)
|
||||
for ii in range(20):
|
||||
data = {"mca1": [1, 2, 3, 4, 5, 6, 7, 8, 9, 10], "mca2": [10, 9, 8, 7, 6, 5, 4, 3, 2, 1]}
|
||||
msg = messages.DeviceMessage(
|
||||
signals=data,
|
||||
metadata=metadata,
|
||||
).dumps()
|
||||
msg = messages.DeviceMessage(signals=data, metadata=metadata).dumps()
|
||||
|
||||
# producer.send(topic=MessageEndpoints.device_status(device="mca"), msg=msg)
|
||||
|
||||
producer.xadd(
|
||||
topic=MessageEndpoints.device_async_readback(
|
||||
scanID=scanID, device="mca"
|
||||
), # scanID will be different for each scan
|
||||
scan_id=scan_id, device="mca"
|
||||
), # scan_id will be different for each scan
|
||||
msg={"data": msg}, # TODO should be msg_dict
|
||||
expire=1800,
|
||||
)
|
||||
|
@ -883,7 +883,7 @@
|
||||
</column>
|
||||
<column>
|
||||
<property name="text">
|
||||
<string>scanID</string>
|
||||
<string>scan_id</string>
|
||||
</property>
|
||||
</column>
|
||||
<column>
|
||||
|
@ -1,6 +1,6 @@
|
||||
from typing import Optional, Union, Literal
|
||||
from typing import Literal, Optional, Union
|
||||
|
||||
from pydantic import BaseModel, Field, field_validator, model_validator, ValidationError
|
||||
from pydantic import BaseModel, Field, ValidationError, field_validator, model_validator
|
||||
from pydantic_core import PydanticCustomError
|
||||
|
||||
|
||||
@ -92,12 +92,12 @@ class SourceHistoryValidator(BaseModel):
|
||||
"""History source validator
|
||||
Attributes:
|
||||
type (str): type of source - history
|
||||
scanID (str): Scan ID for history source.
|
||||
scan_id (str): Scan ID for history source.
|
||||
signals (list): Signal for the source.
|
||||
"""
|
||||
|
||||
type: Literal["history"]
|
||||
scanID: str # TODO can be validated if it is a valid scanID
|
||||
scan_id: str # TODO can be validated if it is a valid scan_id
|
||||
signals: AxisSignal
|
||||
|
||||
|
||||
@ -131,12 +131,12 @@ class Source(BaseModel): # TODO decide if it should stay for general Source val
|
||||
General source validation, includes all Optional arguments of all other sources.
|
||||
Attributes:
|
||||
type (list): type of source (scan_segment, history)
|
||||
scanID (Optional[str]): Scan ID for history source.
|
||||
scan_id (Optional[str]): Scan ID for history source.
|
||||
signals (Optional[AxisSignal]): Signal for the source.
|
||||
"""
|
||||
|
||||
type: Literal["scan_segment", "history", "redis"]
|
||||
scanID: Optional[str] = None
|
||||
scan_id: Optional[str] = None
|
||||
signals: Optional[dict] = None
|
||||
|
||||
|
||||
|
@ -11,8 +11,8 @@ from qtpy.QtCore import Slot as pyqtSlot
|
||||
from qtpy.QtWidgets import QApplication, QMessageBox
|
||||
|
||||
from bec_widgets.utils import Colors, Crosshair, yaml_dialog
|
||||
from bec_widgets.validation import MonitorConfigValidator
|
||||
from bec_widgets.utils.bec_dispatcher import BECDispatcher
|
||||
from bec_widgets.validation import MonitorConfigValidator
|
||||
|
||||
# just for demonstration purposes if script run directly
|
||||
CONFIG_SCAN_MODE = {
|
||||
@ -59,10 +59,7 @@ CONFIG_SCAN_MODE = {
|
||||
"sources": [
|
||||
{
|
||||
"type": "scan_segment",
|
||||
"signals": {
|
||||
"x": [{"name": "samy"}],
|
||||
"y": [{"name": "bpm4i"}],
|
||||
},
|
||||
"signals": {"x": [{"name": "samy"}], "y": [{"name": "bpm4i"}]},
|
||||
}
|
||||
],
|
||||
},
|
||||
@ -137,7 +134,7 @@ CONFIG_WRONG = {
|
||||
},
|
||||
{
|
||||
"type": "history",
|
||||
"scanID": "<scanID>",
|
||||
"scan_id": "<scan_id>",
|
||||
"signals": {
|
||||
"x": [{"name": "samy"}],
|
||||
"y": [{"name": "bpm4i", "entry": "bpm4i"}],
|
||||
@ -170,11 +167,8 @@ CONFIG_WRONG = {
|
||||
{
|
||||
"signals": {
|
||||
"x": [{"name": "samx", "entry": "samx"}],
|
||||
"y": [
|
||||
{"name": "samx"},
|
||||
{"name": "samy", "entry": "samx"},
|
||||
],
|
||||
},
|
||||
"y": [{"name": "samx"}, {"name": "samy", "entry": "samx"}],
|
||||
}
|
||||
}
|
||||
],
|
||||
},
|
||||
@ -315,7 +309,7 @@ class BECMonitor(pg.GraphicsLayoutWidget):
|
||||
self.plots = None
|
||||
self.curves_data = None
|
||||
self.grid_coordinates = None
|
||||
self.scanID = None
|
||||
self.scan_id = None
|
||||
|
||||
# TODO make colors accessible to users
|
||||
self.user_colors = {} # key: (plot_name, y_name, y_entry), value: color
|
||||
@ -352,7 +346,7 @@ class BECMonitor(pg.GraphicsLayoutWidget):
|
||||
# Initialize the UI
|
||||
self._init_ui(self.plot_settings["num_columns"])
|
||||
|
||||
if self.scanID is not None:
|
||||
if self.scan_id is not None:
|
||||
self.replot_last_scan()
|
||||
|
||||
def _init_database(self, plot_data_config: dict, source_type_to_init=None) -> dict:
|
||||
@ -729,11 +723,11 @@ class BECMonitor(pg.GraphicsLayoutWidget):
|
||||
msg (dict): Message received with scan data.
|
||||
metadata (dict): Metadata of the scan.
|
||||
"""
|
||||
current_scanID = msg.get("scanID", None)
|
||||
if current_scanID is None:
|
||||
current_scan_id = msg.get("scan_id", None)
|
||||
if current_scan_id is None:
|
||||
return
|
||||
|
||||
if current_scanID != self.scanID:
|
||||
if current_scan_id != self.scan_id:
|
||||
if self.scan_types is False:
|
||||
self.plot_data = self.plot_data_config
|
||||
elif self.scan_types is True:
|
||||
@ -753,10 +747,10 @@ class BECMonitor(pg.GraphicsLayoutWidget):
|
||||
# Init UI
|
||||
self._init_ui(self.plot_settings["num_columns"])
|
||||
|
||||
self.scanID = current_scanID
|
||||
self.scan_data = self.queue.scan_storage.find_scan_by_ID(self.scanID)
|
||||
self.scan_id = current_scan_id
|
||||
self.scan_data = self.queue.scan_storage.find_scan_by_ID(self.scan_id)
|
||||
if not self.scan_data:
|
||||
print(f"No data found for scanID: {self.scanID}") # TODO better error
|
||||
print(f"No data found for scan_id: {self.scan_id}") # TODO better error
|
||||
return
|
||||
self.flush(source_type_to_flush="scan_segment")
|
||||
|
||||
@ -766,7 +760,7 @@ class BECMonitor(pg.GraphicsLayoutWidget):
|
||||
|
||||
def scan_segment_update(self):
|
||||
"""
|
||||
Update the database with data from scan storage based on the provided scanID.
|
||||
Update the database with data from scan storage based on the provided scan_id.
|
||||
"""
|
||||
scan_data = self.scan_data.data
|
||||
for device_name, device_entries in self.database.get("scan_segment", {}).items():
|
||||
@ -840,11 +834,7 @@ if __name__ == "__main__": # pragma: no cover
|
||||
client = BECDispatcher().client
|
||||
client.start()
|
||||
app = QApplication(sys.argv)
|
||||
monitor = BECMonitor(
|
||||
config=config,
|
||||
gui_id=args.id,
|
||||
skip_validation=False,
|
||||
)
|
||||
monitor = BECMonitor(config=config, gui_id=args.id, skip_validation=False)
|
||||
monitor.show()
|
||||
# just to test redis data
|
||||
# redis_data = {
|
||||
|
@ -4,20 +4,16 @@ from collections import defaultdict
|
||||
|
||||
import numpy as np
|
||||
import pyqtgraph as pg
|
||||
from bec_lib import MessageEndpoints
|
||||
from qtpy.QtCore import Signal as pyqtSignal
|
||||
from qtpy.QtCore import Slot as pyqtSlot
|
||||
from qtpy.QtWidgets import QApplication
|
||||
from qtpy.QtWidgets import QVBoxLayout, QWidget
|
||||
from qtpy.QtWidgets import QApplication, QVBoxLayout, QWidget
|
||||
|
||||
from bec_lib import MessageEndpoints
|
||||
from bec_widgets.utils import yaml_dialog
|
||||
from bec_widgets.utils.bec_dispatcher import BECDispatcher
|
||||
|
||||
CONFIG_DEFAULT = {
|
||||
"plot_settings": {
|
||||
"colormap": "CET-L4",
|
||||
"num_columns": 1,
|
||||
},
|
||||
"plot_settings": {"colormap": "CET-L4", "num_columns": 1},
|
||||
"waveform2D": [
|
||||
{
|
||||
"plot_name": "Waveform 2D Scatter (1)",
|
||||
@ -97,7 +93,7 @@ class BECMonitor2DScatter(QWidget):
|
||||
self.plots = None
|
||||
self.curves_data = None
|
||||
self.grid_coordinates = None
|
||||
self.scanID = None
|
||||
self.scan_id = None
|
||||
|
||||
# Connect the update signal to the update plot method
|
||||
self.proxy_update_plot = pg.SignalProxy(
|
||||
@ -275,15 +271,15 @@ class BECMonitor2DScatter(QWidget):
|
||||
"""
|
||||
|
||||
# TODO check if this is correct
|
||||
current_scanID = msg.get("scanID", None)
|
||||
if current_scanID is None:
|
||||
current_scan_id = msg.get("scan_id", None)
|
||||
if current_scan_id is None:
|
||||
return
|
||||
|
||||
if current_scanID != self.scanID:
|
||||
self.scanID = current_scanID
|
||||
self.scan_data = self.queue.scan_storage.find_scan_by_ID(self.scanID)
|
||||
if current_scan_id != self.scan_id:
|
||||
self.scan_id = current_scan_id
|
||||
self.scan_data = self.queue.scan_storage.find_scan_by_ID(self.scan_id)
|
||||
if not self.scan_data:
|
||||
print(f"No data found for scanID: {self.scanID}") # TODO better error
|
||||
print(f"No data found for scan_id: {self.scan_id}") # TODO better error
|
||||
return
|
||||
self.flush()
|
||||
|
||||
@ -373,10 +369,6 @@ if __name__ == "__main__": # pragma: no cover
|
||||
client = BECDispatcher().client
|
||||
client.start()
|
||||
app = QApplication(sys.argv)
|
||||
monitor = BECMonitor2DScatter(
|
||||
config=config,
|
||||
gui_id=args.id,
|
||||
skip_validation=True,
|
||||
)
|
||||
monitor = BECMonitor2DScatter(config=config, gui_id=args.id, skip_validation=True)
|
||||
monitor.show()
|
||||
sys.exit(app.exec())
|
||||
|
@ -1,20 +1,20 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from collections import defaultdict
|
||||
from typing import Literal, Optional, Any
|
||||
from typing import Any, Literal, Optional
|
||||
|
||||
import numpy as np
|
||||
import pyqtgraph as pg
|
||||
from pydantic import Field, BaseModel, ValidationError
|
||||
from bec_lib import MessageEndpoints
|
||||
from bec_lib.scan_data import ScanData
|
||||
from pydantic import BaseModel, Field, ValidationError
|
||||
from pyqtgraph import mkBrush
|
||||
from qtpy import QtCore
|
||||
from qtpy.QtCore import Signal as pyqtSignal
|
||||
from qtpy.QtCore import Slot as pyqtSlot
|
||||
from qtpy.QtWidgets import QWidget
|
||||
|
||||
from bec_lib import MessageEndpoints
|
||||
from bec_lib.scan_data import ScanData
|
||||
from bec_widgets.utils import Colors, ConnectionConfig, BECConnector, EntryValidator
|
||||
from bec_widgets.utils import BECConnector, Colors, ConnectionConfig, EntryValidator
|
||||
from bec_widgets.widgets.plots import BECPlotBase, WidgetConfig
|
||||
|
||||
|
||||
@ -254,7 +254,7 @@ class BECWaveform1D(BECPlotBase):
|
||||
)
|
||||
|
||||
self._curves_data = defaultdict(dict)
|
||||
self.scanID = None
|
||||
self.scan_id = None
|
||||
|
||||
# Scan segment update proxy
|
||||
self.proxy_update_plot = pg.SignalProxy(
|
||||
@ -630,14 +630,14 @@ class BECWaveform1D(BECPlotBase):
|
||||
msg (dict): Message received with scan data.
|
||||
metadata (dict): Metadata of the scan.
|
||||
"""
|
||||
current_scanID = msg.get("scanID", None)
|
||||
if current_scanID is None:
|
||||
current_scan_id = msg.get("scan_id", None)
|
||||
if current_scan_id is None:
|
||||
return
|
||||
|
||||
if current_scanID != self.scanID:
|
||||
self.scanID = current_scanID
|
||||
if current_scan_id != self.scan_id:
|
||||
self.scan_id = current_scan_id
|
||||
self.scan_segment_data = self.queue.scan_storage.find_scan_by_ID(
|
||||
self.scanID
|
||||
self.scan_id
|
||||
) # TODO do scan access through BECFigure
|
||||
|
||||
self.scan_signal_update.emit()
|
||||
@ -667,23 +667,23 @@ class BECWaveform1D(BECPlotBase):
|
||||
|
||||
curve.setData(data_x, data_y)
|
||||
|
||||
def scan_history(self, scan_index: int = None, scanID: str = None):
|
||||
def scan_history(self, scan_index: int = None, scan_id: str = None):
|
||||
"""
|
||||
Update the scan curves with the data from the scan storage.
|
||||
Provide only one of scanID or scan_index.
|
||||
Provide only one of scan_id or scan_index.
|
||||
Args:
|
||||
scanID(str, optional): ScanID of the scan to be updated. Defaults to None.
|
||||
scan_id(str, optional): ScanID of the scan to be updated. Defaults to None.
|
||||
scan_index(int, optional): Index of the scan to be updated. Defaults to None.
|
||||
"""
|
||||
if scan_index is not None and scanID is not None:
|
||||
raise ValueError("Only one of scanID or scan_index can be provided.")
|
||||
if scan_index is not None and scan_id is not None:
|
||||
raise ValueError("Only one of scan_id or scan_index can be provided.")
|
||||
|
||||
if scan_index is not None:
|
||||
self.scanID = self.queue.scan_storage.storage[scan_index].scanID
|
||||
data = self.queue.scan_storage.find_scan_by_ID(self.scanID).data
|
||||
elif scanID is not None:
|
||||
self.scanID = scanID
|
||||
data = self.queue.scan_storage.find_scan_by_ID(self.scanID).data
|
||||
self.scan_id = self.queue.scan_storage.storage[scan_index].scan_id
|
||||
data = self.queue.scan_storage.find_scan_by_ID(self.scan_id).data
|
||||
elif scan_id is not None:
|
||||
self.scan_id = scan_id
|
||||
data = self.queue.scan_storage.find_scan_by_ID(self.scan_id).data
|
||||
|
||||
self._update_scan_curves(data)
|
||||
|
||||
|
@ -2,11 +2,10 @@
|
||||
from unittest.mock import Mock
|
||||
|
||||
import pytest
|
||||
from bec_lib.messages import ScanMessage
|
||||
from bec_lib.connector import MessageObject
|
||||
from bec_lib.messages import ScanMessage
|
||||
|
||||
|
||||
msg = MessageObject(topic="", value=ScanMessage(point_id=0, scanID=0, data={}))
|
||||
msg = MessageObject(topic="", value=ScanMessage(point_id=0, scan_id=0, data={}))
|
||||
|
||||
|
||||
@pytest.fixture(name="consumer")
|
||||
@ -206,7 +205,7 @@ def test_connect_one_slot_multiple_topics_single_callback(bec_dispatcher, consum
|
||||
# Simulate messages being published on each topic
|
||||
for topic in topics:
|
||||
msg_with_topic = MessageObject(
|
||||
topic=topic, value=ScanMessage(point_id=0, scanID=0, data={})
|
||||
topic=topic, value=ScanMessage(point_id=0, scan_id=0, data={})
|
||||
)
|
||||
consumer.register.call_args.kwargs["cb"](msg_with_topic)
|
||||
|
||||
|
@ -1,9 +1,9 @@
|
||||
# pylint: disable = no-name-in-module,missing-class-docstring, missing-module-docstring
|
||||
import os
|
||||
import yaml
|
||||
from unittest.mock import MagicMock
|
||||
|
||||
import pytest
|
||||
from unittest.mock import MagicMock
|
||||
import yaml
|
||||
|
||||
from bec_widgets.widgets import BECMonitor
|
||||
|
||||
@ -126,12 +126,7 @@ def test_on_config_update(monitor, config_initial, config_update):
|
||||
@pytest.mark.parametrize(
|
||||
"config_name, expected_num_columns, expected_plot_names, expected_coordinates",
|
||||
[
|
||||
(
|
||||
"config_device",
|
||||
1,
|
||||
["BPM4i plots vs samx", "Gauss plots vs samx"],
|
||||
[(0, 0), (1, 0)],
|
||||
),
|
||||
("config_device", 1, ["BPM4i plots vs samx", "Gauss plots vs samx"], [(0, 0), (1, 0)]),
|
||||
(
|
||||
"config_scan",
|
||||
3,
|
||||
@ -186,7 +181,7 @@ msg_1 = {
|
||||
"gauss_adc1": {"gauss_adc1": {"value": 8}},
|
||||
"gauss_adc2": {"gauss_adc2": {"value": 9}},
|
||||
},
|
||||
"scanID": 1,
|
||||
"scan_id": 1,
|
||||
}
|
||||
metadata_grid = {"scan_name": "grid_scan"}
|
||||
metadata_line = {"scan_name": "line_scan"}
|
||||
@ -195,7 +190,7 @@ metadata_line = {"scan_name": "line_scan"}
|
||||
@pytest.mark.parametrize(
|
||||
"config_name, msg, metadata, expected_data",
|
||||
[
|
||||
# case: msg does not have 'scanID'
|
||||
# case: msg does not have 'scan_id'
|
||||
(
|
||||
"config_device",
|
||||
{"data": {}},
|
||||
|
@ -1,17 +1,14 @@
|
||||
# pylint: disable=missing-module-docstring, missing-function-docstring
|
||||
from collections import defaultdict
|
||||
from unittest.mock import MagicMock
|
||||
|
||||
import pytest
|
||||
from unittest.mock import MagicMock
|
||||
from qtpy import QtGui
|
||||
|
||||
from bec_widgets.widgets import BECMonitor2DScatter
|
||||
|
||||
CONFIG_DEFAULT = {
|
||||
"plot_settings": {
|
||||
"colormap": "CET-L4",
|
||||
"num_columns": 1,
|
||||
},
|
||||
"plot_settings": {"colormap": "CET-L4", "num_columns": 1},
|
||||
"waveform2D": [
|
||||
{
|
||||
"plot_name": "Waveform 2D Scatter (1)",
|
||||
@ -37,10 +34,7 @@ CONFIG_DEFAULT = {
|
||||
}
|
||||
|
||||
CONFIG_ONE_PLOT = {
|
||||
"plot_settings": {
|
||||
"colormap": "CET-L4",
|
||||
"num_columns": 1,
|
||||
},
|
||||
"plot_settings": {"colormap": "CET-L4", "num_columns": 1},
|
||||
"waveform2D": [
|
||||
{
|
||||
"plot_name": "Waveform 2D Scatter (1)",
|
||||
@ -51,7 +45,7 @@ CONFIG_ONE_PLOT = {
|
||||
"y": [{"name": "aptry", "entry": "aptry"}],
|
||||
"z": [{"name": "gauss_bpm", "entry": "gauss_bpm"}],
|
||||
},
|
||||
},
|
||||
}
|
||||
],
|
||||
}
|
||||
|
||||
@ -65,13 +59,7 @@ def monitor_2Dscatter(qtbot):
|
||||
yield widget
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"config, number_of_plots",
|
||||
[
|
||||
(CONFIG_DEFAULT, 2),
|
||||
(CONFIG_ONE_PLOT, 1),
|
||||
],
|
||||
)
|
||||
@pytest.mark.parametrize("config, number_of_plots", [(CONFIG_DEFAULT, 2), (CONFIG_ONE_PLOT, 1)])
|
||||
def test_initialization(monitor_2Dscatter, config, number_of_plots):
|
||||
config_load = config
|
||||
monitor_2Dscatter.on_config_update(config_load)
|
||||
@ -81,13 +69,7 @@ def test_initialization(monitor_2Dscatter, config, number_of_plots):
|
||||
assert len(monitor_2Dscatter.plot_data) == number_of_plots
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"config ",
|
||||
[
|
||||
(CONFIG_DEFAULT),
|
||||
(CONFIG_ONE_PLOT),
|
||||
],
|
||||
)
|
||||
@pytest.mark.parametrize("config ", [(CONFIG_DEFAULT), (CONFIG_ONE_PLOT)])
|
||||
def test_database_initialization(monitor_2Dscatter, config):
|
||||
monitor_2Dscatter.on_config_update(config)
|
||||
# Check if the database is a defaultdict
|
||||
@ -108,13 +90,7 @@ def test_database_initialization(monitor_2Dscatter, config):
|
||||
assert isinstance(monitor_2Dscatter.database[plot_name][axis][signal_name], list)
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"config ",
|
||||
[
|
||||
(CONFIG_DEFAULT),
|
||||
(CONFIG_ONE_PLOT),
|
||||
],
|
||||
)
|
||||
@pytest.mark.parametrize("config ", [(CONFIG_DEFAULT), (CONFIG_ONE_PLOT)])
|
||||
def test_ui_initialization(monitor_2Dscatter, config):
|
||||
monitor_2Dscatter.on_config_update(config)
|
||||
assert len(monitor_2Dscatter.plots) == len(config["waveform2D"])
|
||||
@ -133,7 +109,7 @@ def simulate_scan_data(monitor, x_value, y_value, z_value):
|
||||
"samy": {"samy": {"value": y_value}},
|
||||
"gauss_bpm": {"gauss_bpm": {"value": z_value}},
|
||||
},
|
||||
"scanID": 1,
|
||||
"scan_id": 1,
|
||||
}
|
||||
monitor.on_scan_segment(msg, {})
|
||||
|
||||
|
@ -1,11 +1,12 @@
|
||||
# pylint: disable = no-name-in-module,missing-class-docstring, missing-module-docstring
|
||||
import pytest
|
||||
from pydantic import ValidationError
|
||||
|
||||
from bec_widgets.validation.monitor_config_validator import (
|
||||
MonitorConfigValidator,
|
||||
Signal,
|
||||
AxisSignal,
|
||||
MonitorConfigValidator,
|
||||
PlotConfig,
|
||||
Signal,
|
||||
)
|
||||
|
||||
from .test_bec_monitor import mocked_client
|
||||
@ -84,7 +85,7 @@ def test_plot_config_no_source_type_provided(setup_devices):
|
||||
def test_plot_config_history_source_type(setup_devices):
|
||||
history_source = {
|
||||
"type": "history",
|
||||
"scanID": "valid_scan_id",
|
||||
"scan_id": "valid_scan_id",
|
||||
"signals": {"x": [{"name": "samx"}], "y": [{"name": "samx"}]},
|
||||
}
|
||||
|
||||
@ -92,7 +93,7 @@ def test_plot_config_history_source_type(setup_devices):
|
||||
|
||||
assert len(plot_config.sources) == 1
|
||||
assert plot_config.sources[0].type == "history"
|
||||
assert plot_config.sources[0].scanID == "valid_scan_id"
|
||||
assert plot_config.sources[0].scan_id == "valid_scan_id"
|
||||
|
||||
|
||||
def test_plot_config_redis_source_type(setup_devices):
|
||||
|
@ -4,7 +4,8 @@ from unittest.mock import MagicMock
|
||||
import numpy as np
|
||||
import pytest
|
||||
|
||||
from bec_widgets.widgets.plots.waveform1d import SignalData, Signal, CurveConfig
|
||||
from bec_widgets.widgets.plots.waveform1d import CurveConfig, Signal, SignalData
|
||||
|
||||
from .client_mocks import mocked_client
|
||||
from .test_bec_figure import bec_figure
|
||||
|
||||
@ -365,7 +366,7 @@ def test_scan_update(bec_figure, qtbot):
|
||||
"gauss_adc1": {"gauss_adc1": {"value": 8}},
|
||||
"gauss_adc2": {"gauss_adc2": {"value": 9}},
|
||||
},
|
||||
"scanID": 1,
|
||||
"scan_id": 1,
|
||||
}
|
||||
# Mock scan_storage.find_scan_by_ID
|
||||
mock_scan_data_waveform = MagicMock()
|
||||
@ -400,8 +401,8 @@ def test_scan_history_with_val_access(bec_figure, qtbot):
|
||||
mock_scan_storage.find_scan_by_ID.return_value = MagicMock(data=mock_scan_data)
|
||||
w1.queue.scan_storage = mock_scan_storage
|
||||
|
||||
fake_scanID = "fake_scanID"
|
||||
w1.scan_history(scanID=fake_scanID)
|
||||
fake_scan_id = "fake_scan_id"
|
||||
w1.scan_history(scan_id=fake_scan_id)
|
||||
|
||||
qtbot.wait(500)
|
||||
|
||||
|
Reference in New Issue
Block a user