diff --git a/bec_widgets/widgets/monitor/monitor.py b/bec_widgets/widgets/monitor/monitor.py index 2b0796b5..1643d257 100644 --- a/bec_widgets/widgets/monitor/monitor.py +++ b/bec_widgets/widgets/monitor/monitor.py @@ -237,6 +237,35 @@ CONFIG_SIMPLE = { ], } +CONFIG_REDIS = { + "plot_settings": { + "background_color": "white", + "axis_width": 2, + "num_columns": 5, + "colormap": "plasma", + "scan_types": False, + }, + "plot_data": [ + { + "plot_name": "BPM4i plots vs samx", + "x_label": "Motor Y", + "y_label": "bpm4i", + "sources": [ + { + "type": "scan_segment", + "signals": {"x": [{"name": "samx"}], "y": [{"name": "gauss_bpm"}]}, + }, + { + "type": "redis", + "endpoint": "public/gui/data/6cd5ea3f-a9a9-4736-b4ed-74ab9edfb996", + "update": "append", + "signals": {"x": [{"name": "x_default_tag"}], "y": [{"name": "y_default_tag"}]}, + }, + ], + } + ], +} + class BECMonitor(pg.GraphicsLayoutWidget): update_signal = pyqtSignal() @@ -270,6 +299,7 @@ class BECMonitor(pg.GraphicsLayoutWidget): bec_dispatcher.connect_slot( self.on_instruction, MessageEndpoints.gui_instructions(self.gui_id) ) + bec_dispatcher.connect_slot(self.on_data_from_redis, MessageEndpoints.gui_data(self.gui_id)) # Current configuration self.config = config @@ -292,7 +322,7 @@ class BECMonitor(pg.GraphicsLayoutWidget): # Connect the update signal to the update plot method self.proxy_update_plot = pg.SignalProxy( - self.update_signal, rateLimit=25, slot=self.update_plot + self.update_signal, rateLimit=25, slot=self.update_scan_segment_plot ) # Init UI @@ -322,19 +352,23 @@ class BECMonitor(pg.GraphicsLayoutWidget): # Initialize the UI self._init_ui(self.plot_settings["num_columns"]) - def _init_database(self, plot_data_config: dict) -> dict: + def _init_database(self, plot_data_config: dict, source_type_to_init=None) -> dict: """ - Initializes the database for the PlotApp. + Initializes or updates the database for the PlotApp. Args: - plot_data_config(dict): Configuration settings for plots + plot_data_config(dict): Configuration settings for plots. + source_type_to_init(str, optional): Specific source type to initialize. If None, initialize all. Returns: - dict: Database dictionary + dict: Updated or new database dictionary. """ - database = {} + database = {} if source_type_to_init is None else self.database.copy() for plot in plot_data_config: for source in plot["sources"]: source_type = source["type"] + if source_type_to_init and source_type != source_type_to_init: + continue # Skip if not the specified source type + if source_type not in database: database[source_type] = {} @@ -342,7 +376,6 @@ class BECMonitor(pg.GraphicsLayoutWidget): for signal in signals: name = signal["name"] entry = signal.get("entry", name) - if name not in database[source_type]: database[source_type][name] = {} if entry not in database[source_type][name]: @@ -449,14 +482,9 @@ class BECMonitor(pg.GraphicsLayoutWidget): def init_curves(self) -> None: """ - Initialize curve data and properties, and update table row labels. - - This method initializes a nested dictionary `self.curves_data` to store - the curve objects for each x and y signal pair. It also updates the row labels - in `self.tableWidget_crosshair` to include the grid position for each y-value. + Initialize curve data and properties for each plot and data source. """ self.curves_data = {} - row_labels = [] for idx, plot_config in enumerate(self.plot_data): plot_name = plot_config.get("plot_name", "") @@ -464,28 +492,30 @@ class BECMonitor(pg.GraphicsLayoutWidget): plot.clear() for source in plot_config["sources"]: - source_type = source["type"][0] + source_type = source["type"] y_signals = source["signals"].get("y", []) colors_ys = Colors.golden_angle_color( colormap=self.plot_settings["colormap"], num=len(y_signals) ) - curve_list = [] + if source_type not in self.curves_data: + self.curves_data[source_type] = {} + if plot_name not in self.curves_data[source_type]: + self.curves_data[source_type][plot_name] = [] + for i, (y_signal, color) in enumerate(zip(y_signals, colors_ys)): y_name = y_signal["name"] y_entry = y_signal.get("entry", y_name) - curve_name = f"{y_name} ({y_entry})-{source_type.upper()}" curve_data = self.create_curve(curve_name, color) - curve_list.append((y_name, y_entry, curve_data)) plot.addItem(curve_data) - row_labels.append(f"{y_name} ({y_entry}) - {plot_name}") + self.curves_data[source_type][plot_name].append((y_name, y_entry, curve_data)) - self.curves_data[plot_name] = curve_list - - # Hook Crosshair - if self.enable_crosshair is True: - self.hook_crosshair() + # Render static plot elements + self.update_plot() + # # Hook Crosshair #TODO enable later, currently not working + # if self.enable_crosshair is True: + # self.hook_crosshair() def create_curve(self, curve_name: str, color: str) -> pg.PlotDataItem: """ @@ -518,24 +548,36 @@ class BECMonitor(pg.GraphicsLayoutWidget): crosshair = Crosshair(plot, precision=3) self.crosshairs[plot_name] = crosshair - def update_plot(self) -> None: - """Update the plot data based on the stored data dictionary.""" - for plot_name, curve_list in self.curves_data.items(): - plot_config = next( - (pc for pc in self.plot_data if pc.get("plot_name") == plot_name), None - ) - if not plot_config: + def update_scan_segment_plot(self): + """ + Update the plot with the latest scan segment data. + """ + self.update_plot(source_type="scan_segment") + + def update_plot(self, source_type=None) -> None: + """ + Update the plot data based on the stored data dictionary. + Only updates data for the specified source_type if provided. + """ + for src_type, plots in self.curves_data.items(): + if source_type and src_type != source_type: continue - x_name, x_entry = self.extract_x_config(plot_config) + for plot_name, curve_list in plots.items(): + plot_config = next( + (pc for pc in self.plot_data if pc.get("plot_name") == plot_name), None + ) + if not plot_config: + continue - for y_name, y_entry, curve in curve_list: - data_x = self.database.get("scan_segment", {}).get(x_name, {}).get(x_entry, []) - data_y = self.database.get("scan_segment", {}).get(y_name, {}).get(y_entry, []) + x_name, x_entry = self.extract_x_config(plot_config, src_type) - curve.setData(data_x, data_y) + for y_name, y_entry, curve in curve_list: + data_x = self.database.get(src_type, {}).get(x_name, {}).get(x_entry, []) + data_y = self.database.get(src_type, {}).get(y_name, {}).get(y_entry, []) + curve.setData(data_x, data_y) - def extract_x_config(self, plot_config: dict) -> tuple: + def extract_x_config(self, plot_config: dict, source_type: str) -> tuple: """Extract the signal configurations for x and y axes from plot_config. Args: plot_config (dict): Plot configuration. @@ -545,12 +587,11 @@ class BECMonitor(pg.GraphicsLayoutWidget): x_name, x_entry = None, None for source in plot_config["sources"]: - if "x" in source["signals"]: + if source["type"] == source_type and "x" in source["signals"]: x_signal = source["signals"]["x"][0] x_name = x_signal.get("name") - x_entry = x_signal.get("entry") - - return x_name, x_entry + x_entry = x_signal.get("entry", x_name) + return x_name, x_entry def get_config(self): """Return the current configuration settings.""" @@ -644,10 +685,26 @@ class BECMonitor(pg.GraphicsLayoutWidget): return formatted_error_message - def flush(self) -> None: - """Flush the data dictionary (legacy) and recreate the database.""" - self.database = self._init_database(self.plot_data) - self.init_curves() + def flush(self, flush_all=False, source_type_to_flush=None) -> None: + """Update or reset the database to match the current configuration. + + Args: + flush_all (bool): If True, reset the entire database. + source_type_to_flush (str): Specific source type to reset. Ignored if flush_all is True. + """ + if flush_all: + self.database = self._init_database(self.plot_data) + self.init_curves() + else: + if source_type_to_flush in self.database: + # TODO maybe reinit the database from config again instead of cycle through names/entries + # Reset only the specified source type + for name in self.database[source_type_to_flush]: + for entry in self.database[source_type_to_flush][name]: + self.database[source_type_to_flush][name][entry] = [] + # Reset curves for the specified source type + if source_type_to_flush in self.curves_data: + self.init_curves() @pyqtSlot(dict, dict) def on_scan_segment(self, msg: dict, metadata: dict): @@ -687,7 +744,7 @@ class BECMonitor(pg.GraphicsLayoutWidget): if not self.scan_data: print(f"No data found for scanID: {self.scanID}") # TODO better error return - self.flush() + self.flush(source_type_to_flush="scan_segment") self.scan_segment_update() @@ -706,6 +763,34 @@ class BECMonitor(pg.GraphicsLayoutWidget): else: print(f"No data found for {device_name} {entry}") + @pyqtSlot(dict) + def on_data_from_redis(self, msg) -> None: + """ + Handle new data sent from redis. + Args: + msg (dict): Message received with data. + """ + + self._init_database( + self.plot_data, source_type_to_init="redis" + ) # add database entry for redis dataset + + data = msg.get("data", {}) + x_data = data.get("x", {}) + y_data = data.get("y", {}) + + # Update x data + if x_data: + x_tag = x_data.get("tag") + self.database["redis"][x_tag][x_tag] = x_data["data"] + + # Update y data + for y_tag, y_info in y_data.items(): + self.database["redis"][y_tag][y_tag] = y_info["data"] + + # Trigger plot update + self.update_plot(source_type="redis") + if __name__ == "__main__": # pragma: no cover import argparse @@ -725,7 +810,7 @@ if __name__ == "__main__": # pragma: no cover # Load config from file config = load_yaml(args.config_file) else: - config = CONFIG_SCAN_MODE + config = CONFIG_REDIS client = bec_dispatcher.client client.start() @@ -736,4 +821,10 @@ if __name__ == "__main__": # pragma: no cover skip_validation=False, ) monitor.show() + # just to test redis data + # redis_data = { + # "x": {"data": [1, 2, 3], "tag": "x_default_tag"}, + # "y": {"y_default_tag": {"data": [1, 2, 3]}}, + # } + # monitor.on_data_from_redis({"data": redis_data}) sys.exit(app.exec())