0
0
mirror of https://github.com/bec-project/bec_widgets.git synced 2025-07-14 11:41:49 +02:00

refactor: fix bec_lib imports

This commit is contained in:
2023-11-13 08:29:07 +01:00
parent b38e942acb
commit 5ec2b08e34
13 changed files with 31 additions and 35 deletions

View File

@ -3,9 +3,8 @@ import itertools
import os
from typing import Callable
from bec_lib import BECClient
from bec_lib.core import BECMessage, ServiceConfig
from bec_lib.core.redis_connector import RedisConsumerThreaded
from bec_lib import BECClient, messages, ServiceConfig
from bec_lib.redis_connector import RedisConsumerThreaded
from PyQt5.QtCore import QObject, pyqtSignal
# Adding a new pyqt signal requres a class factory, as they must be part of the class definition
@ -45,13 +44,13 @@ class _BECDispatcher(QObject):
Args:
slot (Callable): A slot method/function that accepts two inputs: content and metadata of
the corresponding pub/sub message
topic (str): A topic that can typically be acquired via bec_lib.core.MessageEndpoints
topic (str): A topic that can typically be acquired via bec_lib.MessageEndpoints
"""
# create new connection for topic if it doesn't exist
if topic not in self._connections:
def cb(msg):
msg = BECMessage.MessageReader.loads(msg.value)
msg = messages.MessageReader.loads(msg.value)
# TODO: this can could be replaced with a simple
# self._connections[topic].signal.emit(msg.content, msg.metadata)
# once all dispatcher.connect_slot calls are made with a single topic only
@ -76,7 +75,7 @@ class _BECDispatcher(QObject):
Args:
slot (Callable): A slot to be disconnected
topic (str): A corresponding topic that can typically be acquired via
bec_lib.core.MessageEndpoints
bec_lib.MessageEndpoints
"""
if topic not in self._connections:
return

View File

@ -19,7 +19,7 @@ from pyqtgraph import mkBrush, mkPen
from pyqtgraph.Qt import QtCore, uic
from pyqtgraph.Qt import QtWidgets
from bec_lib.core import MessageEndpoints
from bec_lib import MessageEndpoints
from bec_widgets.qt_utils import Crosshair, Colors

View File

@ -8,7 +8,7 @@ from PyQt5.QtWidgets import (
QWidget,
)
from bec_lib.core import MessageEndpoints, BECMessage
from bec_lib import MessageEndpoints, messages
class StreamApp(QWidget):
@ -102,7 +102,7 @@ class StreamApp(QWidget):
@staticmethod
def _streamer_cb(msg, *, parent, **_kwargs) -> None:
msgMCS = BECMessage.DeviceMessage.loads(msg.value)
msgMCS = messages.DeviceMessage.loads(msg.value)
print(msgMCS)
row = msgMCS.content["signals"][parent.sub_device]
metadata = msgMCS.metadata
@ -123,7 +123,7 @@ class StreamApp(QWidget):
def _device_cv(msg, *, parent, **_kwargs) -> None:
print("Getting ScanID")
msgDEV = BECMessage.ScanStatusMessage.loads(msg.value)
msgDEV = messages.ScanStatusMessage.loads(msg.value)
current_scanID = msgDEV.content["scanID"]
@ -143,7 +143,7 @@ class StreamApp(QWidget):
if __name__ == "__main__":
import argparse
from bec_lib.core import RedisConnector
from bec_lib import RedisConnector
parser = argparse.ArgumentParser(description="Stream App.")
parser.add_argument("--port", type=str, default="pc15543:6379", help="Port for RedisConnector")

View File

@ -1,4 +1,4 @@
from bec_lib.core import BECMessage, MessageEndpoints, RedisConnector
from bec_lib import messages, MessageEndpoints, RedisConnector
import time
connector = RedisConnector("localhost:6379")
@ -15,7 +15,7 @@ metadata.update(
)
for ii in range(20):
data = {"mca1": [1, 2, 3, 4, 5, 6, 7, 8, 9, 10], "mca2": [10, 9, 8, 7, 6, 5, 4, 3, 2, 1]}
msg = BECMessage.DeviceMessage(
msg = messages.DeviceMessage(
signals=data,
metadata=metadata,
).dumps()

View File

@ -24,7 +24,7 @@ from PyQt5.QtWidgets import QMessageBox
from PyQt5.QtWidgets import QShortcut
from pyqtgraph.Qt import QtWidgets, uic, QtCore
from bec_lib.core import MessageEndpoints, BECMessage
from bec_lib import MessageEndpoints, messages
from bec_widgets.qt_utils import DoubleValidationDelegate
@ -1295,7 +1295,7 @@ class MotorControl(QThread):
@staticmethod
def _device_status_callback_motors(msg, *, parent, **_kwargs) -> None:
deviceMSG = BECMessage.DeviceMessage.loads(msg.value)
deviceMSG = messages.DeviceMessage.loads(msg.value)
if parent.motor_x.name in deviceMSG.content["signals"]:
parent.current_x = deviceMSG.content["signals"][parent.motor_x.name]["value"]
elif parent.motor_y.name in deviceMSG.content["signals"]:
@ -1307,9 +1307,7 @@ if __name__ == "__main__":
import yaml
import argparse
from bec_lib import BECClient
from bec_lib.core import ServiceConfig
from bec_lib import BECClient, ServiceConfig
parser = argparse.ArgumentParser(description="Motor App")

View File

@ -3,7 +3,7 @@ import os
import numpy as np
import PyQt5.QtWidgets
import pyqtgraph as pg
from bec_lib.core import MessageEndpoints
from bec_lib import MessageEndpoints
from PyQt5.QtCore import pyqtSignal, pyqtSlot
from PyQt5.QtWidgets import QApplication, QTableWidgetItem, QWidget
from pyqtgraph import mkBrush, mkColor, mkPen

View File

@ -7,8 +7,8 @@ from typing import Any
import numpy as np
import pyqtgraph
import pyqtgraph as pg
from bec_lib.core import BECMessage, MessageEndpoints
from bec_lib.core.redis_connector import MessageObject, RedisConnector
from bec_lib import messages, MessageEndpoints
from bec_lib.redis_connector import MessageObject, RedisConnector
from PyQt5.QtCore import pyqtSlot
from PyQt5.QtWidgets import QCheckBox, QTableWidgetItem
from pyqtgraph import mkBrush, mkColor, mkPen
@ -210,7 +210,7 @@ class StreamPlot(QtWidgets.QWidget):
np.where(self.plotter_data_x[0] < 10 ** region[1])[0][-1],
]
}
msg = BECMessage.DeviceMessage(signals=return_dict).dumps()
msg = messages.DeviceMessage(signals=return_dict).dumps()
self.producer.set_and_publish("px_stream/gui_event", msg=msg)
self.roi_signal.emit(region)
@ -268,7 +268,7 @@ class StreamPlot(QtWidgets.QWidget):
continue
endpoint = f"px_stream/projection_{self._current_proj}/data"
msgs = self.client.producer.lrange(topic=endpoint, start=-1, end=-1)
data = [BECMessage.DeviceMessage.loads(msg) for msg in msgs]
data = [messages.DeviceMessage.loads(msg) for msg in msgs]
if not data:
continue
with np.errstate(divide="ignore", invalid="ignore"):
@ -293,7 +293,7 @@ class StreamPlot(QtWidgets.QWidget):
proj_nr = content["signals"]["proj_nr"]
endpoint = f"px_stream/projection_{proj_nr}/metadata"
msg_raw = self.client.producer.get(topic=endpoint)
msg = BECMessage.DeviceMessage.loads(msg_raw)
msg = messages.DeviceMessage.loads(msg_raw)
self._current_q = msg.content["signals"]["q"]
self._current_norm = msg.content["signals"]["norm_sum"]
self._current_metadata = msg.content["signals"]["metadata"]

View File

@ -2,8 +2,8 @@ from threading import RLock
import numpy as np
import pyqtgraph as pg
from bec_lib.core import MessageEndpoints
from bec_lib.core.logger import bec_logger
from bec_lib import MessageEndpoints
from bec_lib.logger import bec_logger
from PyQt5.QtCore import pyqtProperty, pyqtSlot
from bec_widgets.bec_dispatcher import bec_dispatcher

View File

@ -2,8 +2,8 @@ import itertools
from threading import RLock
import pyqtgraph as pg
from bec_lib.core import MessageEndpoints
from bec_lib.core.logger import bec_logger
from bec_lib import MessageEndpoints
from bec_lib.logger import bec_logger
from PyQt5.QtCore import pyqtProperty, pyqtSlot
from bec_widgets.bec_dispatcher import bec_dispatcher

View File

@ -1,7 +1,7 @@
import os
import pyqtgraph as pg
from bec_lib.core import MessageEndpoints
from bec_lib import MessageEndpoints
from PyQt5 import QtCore
from PyQt5.QtCore import pyqtSignal, pyqtSlot
from PyQt5.QtWidgets import QApplication, QTableWidgetItem, QWidget

View File

@ -20,7 +20,7 @@ from PyQt5.QtWidgets import (
QHeaderView,
)
from bec_lib.core import MessageEndpoints
from bec_lib import MessageEndpoints
from bec_widgets.qt_utils.widget_io import WidgetIO

View File

@ -1,8 +1,8 @@
from unittest.mock import Mock
import pytest
from bec_lib.core.BECMessage import ScanMessage
from bec_lib.core.connector import MessageObject
from bec_lib.messages import ScanMessage
from bec_lib.connector import MessageObject
# TODO: find a better way to mock singletons
from bec_widgets.bec_dispatcher import _BECDispatcher

View File

@ -2,11 +2,10 @@ from unittest import mock
import numpy as np
import pytest
from bec_lib.core import BECMessage
from bec_lib import messages, RedisConnector
from pytestqt import qtbot
import threading
from bec_lib.core import RedisConnector
from bec_widgets.examples.stream_plot.stream_plot import StreamPlot
@ -144,7 +143,7 @@ def test_on_dap_update(qtbot, stream_app):
# }
#
# # Assume the RedisConnector client would return this data when new_proj is called
# mock_message = mock.MagicMock(spec=BECMessage.DeviceMessage)
# mock_message = mock.MagicMock(spec=messages.DeviceMessage)
# mock_message.__getitem__.side_effect = lambda key: mock_data[key]
# stream_app.client.producer.get = mock.MagicMock(return_value=mock_message.dumps())
#