diff --git a/grum/cli.py b/grum/cli.py index fb2d9b9..09f19dc 100644 --- a/grum/cli.py +++ b/grum/cli.py @@ -7,6 +7,8 @@ from . import ctrl_c, theme from .mainwin import MainWindow from .mdi import MDIWindowMode +from bec_utils import RedisConnector + def main(): app = QApplication(sys.argv) diff --git a/grum/mainwin.py b/grum/mainwin.py index a2e4fd9..c46af16 100644 --- a/grum/mainwin.py +++ b/grum/mainwin.py @@ -13,6 +13,9 @@ from .rpc import RPCServerThread from .shortcut import shortcut from .webview import WebView +from bec_utils import RedisConnector +from .plot_recon import plot_recon, BECConnector + class MainWindow(QMainWindow): @@ -21,6 +24,14 @@ class MainWindow(QMainWindow): def __init__(self, *args, title="grum", host="localhost", port=8000, offline=False, add_examples=False, window_mode=MDIWindowMode.MULTI, **kwargs): super().__init__(*args, **kwargs) + + print("initializing redis-connector") + redis_url = "129.129.122.75:6379" # for LamNI + redis_connector = RedisConnector(redis_url) + connector = BECConnector(redis_connector) + data_access = DataAccess(connector) + + if offline: title = f"{title} (offline)" @@ -91,6 +102,8 @@ class MainWindow(QMainWindow): self.sig_make_new_plot.connect(self.on_make_new_plot) + plot_recon() + def keyPressEvent(self, event): if event.key() == Qt.Key_F1: diff --git a/grum/plot_recon.py b/grum/plot_recon.py new file mode 100644 index 0000000..bea9814 --- /dev/null +++ b/grum/plot_recon.py @@ -0,0 +1,214 @@ +from os import listdir +from os.path import isfile, join + + +from abc import ABC, abstractmethod +from collections import defaultdict, deque +from bec_utils import BECMessage, MessageEndpoints +from PyQt5.QtCore import QObject, pyqtSignal + +pyqtWrapperType = type(QObject) + + +class FinalMeta(pyqtWrapperType, ABC): + pass + + +class ConnectorBase(QObject, metaclass=FinalMeta): + new_monitor_value = pyqtSignal(str) + msg_update = pyqtSignal(BECMessage.ScanStatusMessage) + msg_queue_update = pyqtSignal(BECMessage.ScanQueueMessage) + msg_scan_update = pyqtSignal(list) + storage_device = defaultdict(lambda: deque(maxlen=5000)) + current_devices = [] + + @abstractmethod + def start_scan_sub(self): + """ """ + + @abstractmethod + def start_monitor_sub(self): + """ """ + + @abstractmethod + def start_scan_msg_sub(self): + """ """ + + @abstractmethod + def start_queue_msg_sub(self): + """ """ + + +class BECConnector(ConnectorBase): + new_monitor_value = pyqtSignal(str) + msg_update = pyqtSignal(BECMessage.ScanStatusMessage) + msg_scan_update = pyqtSignal(list) + msg_queue_update = pyqtSignal(BECMessage.ScanQueueMessage) + + def __init__(self, redis_connector) -> None: + super().__init__() + self.connector = redis_connector + self.producer = self.connector.producer() + self._scan_sub = None + self._monitor_sub = None + self._scan_msg_sub = None + self._queue_msg_sub = None + self.storage_device = defaultdict(lambda: deque(maxlen=5000)) + + def start_scan_sub(self): + self._scan_sub = self.connector.consumer( + MessageEndpoints.scan_status(), cb=self.scan_status_update, parent=self + ) + self._scan_sub.start() + + def start_scan_msg_sub(self): + self._scan_msg_sub = self.connector.consumer( + MessageEndpoints.scan_segment(), cb=self.scan_msg_update, parent=self + ) + self._scan_msg_sub.start() + + def start_queue_msg_sub(self): + self._queue_msg_sub = self.connector.consumer( + MessageEndpoints.scan_queue_request(), cb=self.queue_msg_update, parent=self + ) + self._queue_msg_sub.start() + + @staticmethod + def queue_msg_update(msg, parent): + print("queue_msg_update", msg) + + msg = BECMessage.ScanQueueMessage.loads(msg.value) + parent.msg_queue_update.emit(msg) + + @staticmethod + def scan_msg_update(msg, parent): + print("scan_msg_update", msg) + msg = BECMessage.ScanMessage.loads(msg.value) + parent.msg_scan_update.emit(msg) + + @staticmethod + def scan_status_update(msg, parent): + print("scan_status_update", msg) + msg = BECMessage.ScanStatusMessage.loads(msg.value) + parent.msg_update.emit(msg) + + def start_monitor_sub(self): + self._monitor_sub = self.connector.consumer( + pattern=MessageEndpoints.device_readback("*"), + cb=self.monitor_update, + parent=self, + ) + self._monitor_sub.start() + + @staticmethod + def monitor_update(msg, parent): + dev = msg.topic.decode().split(MessageEndpoints._device_readback + "/")[-1].split(":sub")[0] + msg = BECMessage.DeviceMessage.loads(msg.value) + parent.append_device_reading(msg, dev) + + def append_device_reading(self, msg: BECMessage.DeviceMessage, device_name: str): + signals = msg.content["signals"] + self.storage_device[device_name].append(signals) + self.new_monitor_value.emit(device_name) + + +from collections import deque +from PyQt5.QtCore import QObject, pyqtSignal +from live_display.bec_connector import ConnectorBase +from bec_utils import BECMessage + + +class DataAccess(QObject): + new_mon_value = pyqtSignal(str) + used_devices_updated = pyqtSignal(bool) + new_scan_started = pyqtSignal(bool) + new_scan_msg = pyqtSignal(BECMessage.BECMessage) + msg_queue_update = pyqtSignal(BECMessage.BECMessage) + new_position_umv = pyqtSignal(bool) + + def __init__(self, connector: ConnectorBase): + super().__init__() + self.connector = connector + self.start_all_subscriptions(self.connector) + self.device_storage = self.connector.storage_device + + self.all_device_names = self.get_all_device_names() + self.used_devices = [] + self.scan_nr = "" + self.positions = None + self.used_motors = [] + self.connect_pyqtSignals() + + + + def start_all_subscriptions(self, conn): + conn.start_scan_sub() + conn.start_monitor_sub() + conn.start_scan_msg_sub() + conn.start_queue_msg_sub() + + def on_msg_update(self, msg: BECMessage.ScanStatusMessage): + """ + This function is run whenever there is a new message. + Args: + msg (BECMessage.ScanStatusMessage): A scan status message + """ + if msg.content["status"] == "open": + self.get_scan_info(msg) + # self.reset_plot_data() + self.new_scan_started.emit(True) + + # def on_msg_scan_update(self, message): + # """ + # This fcn runs whenever there is a new message from the device, could be more than one scan at the time. + # Args: + # message (_type_): _description_ + # """ + # if message[0].content["point_id"] == 0: + # self.update_used_devices(message) + # self.update_all_device_names() + + # for scan_segment_mess in message: + # self.new_scan_msg.emit(scan_segment_mess) + + + def get_scan_info(self, msg): + print( + "--------------------------------------------------------------------------------------------" + ) + print("New scan started") + self.used_motors = msg.content["info"]["primary"] + self.scan_nr = msg.content["info"]["scan_number"] + self.positions = msg.content["info"]["positions"] + print( + "--------------------------------------------------------------------------------------------" + ) + + def connect_pyqtSignals(self): + # self.new_mon_value.connect(lambda: self.monitor_plot_update(self.monitored_device)) + self.connector.new_monitor_value.connect(self.on_new_monitor_value) + self.connector.msg_update.connect(self.on_msg_update) + self.connector.msg_scan_update.connect(self.on_msg_scan_update) + self.connector.msg_queue_update.connect(self.on_msg_queue_update) + + +def get_files(): + + mypath = "/home/stalbe_j/grum" + return [f for f in listdir(mypath) if isfile(join(mypath, f))] + + +def plot_recon(): + files = get_files() + recon_files = [file for file in files if file.endswith("h5")] + for file in recon_files: + print(file) + print(file.keys()) + + # i = 1 + # while True: + + # print("plot_recon") + # self.new_plot("recon"+str(i), {'xs':[1*i,2*1,3*1], 'ys':[3*i,4*i,5*i]}) + # time.sleep(10) + # i+=1 \ No newline at end of file diff --git a/mytestfile.hdf5 b/mytestfile.hdf5 new file mode 100644 index 0000000..3afd443 Binary files /dev/null and b/mytestfile.hdf5 differ diff --git a/setup.py b/setup.py index 8369a89..94f449b 100644 --- a/setup.py +++ b/setup.py @@ -1,6 +1,16 @@ +import os +import pathlib +import subprocess + from setuptools import setup +CURRENT_PATH = pathlib.Path(__file__).parent.resolve() + +bec_utils = os.path.join(os.getenv("BEC_PATH", f"{CURRENT_PATH}/../bec"), "bec_utils") + if __name__ == "__main__": + subprocess.run(f"pip install -e {bec_utils}", shell=True, check=True) + setup( install_requires=["pyqt5", "pyqtgraph", "h5py", "PyQtWebEngine"], entry_points={"console_scripts": ["grum=grum:main"]},