From c3f2ad45c3a3b54597ab64de4e5105f49ef50a79 Mon Sep 17 00:00:00 2001 From: wyzula-jan <133381102+wyzula-jan@users.noreply.github.com> Date: Wed, 13 Dec 2023 10:30:58 +0100 Subject: [PATCH] refactor: monitor.py data for scan segment are only accessed through queue.scan_storage --- bec_widgets/widgets/monitor/monitor.py | 168 +++---------------------- 1 file changed, 17 insertions(+), 151 deletions(-) diff --git a/bec_widgets/widgets/monitor/monitor.py b/bec_widgets/widgets/monitor/monitor.py index b16317b6..ab2c06a1 100644 --- a/bec_widgets/widgets/monitor/monitor.py +++ b/bec_widgets/widgets/monitor/monitor.py @@ -247,8 +247,6 @@ class BECMonitor(pg.GraphicsLayoutWidget): enable_crosshair: bool = True, gui_id=None, skip_validation: bool = False, - legacy_scan_segment: bool = True, - scan_storage_access: bool = False, ): super(BECMonitor, self).__init__(parent=parent) @@ -273,9 +271,7 @@ class BECMonitor(pg.GraphicsLayoutWidget): # Current configuration self.config = config - self.legacy_scan_segment = legacy_scan_segment self.skip_validation = skip_validation - self.scan_storage_access = scan_storage_access # Enable crosshair self.enable_crosshair = enable_crosshair @@ -399,12 +395,9 @@ class BECMonitor(pg.GraphicsLayoutWidget): last_row_cols -= 1 plot_name = plot_config.get("plot_name", "") - if self.legacy_scan_segment is True: - x_label = plot_config["x"].get("label", "") - y_label = plot_config["y"].get("label", "") - else: - x_label = plot_config.get("x_label", "") - y_label = plot_config.get("y_label", "") + + x_label = plot_config.get("x_label", "") + y_label = plot_config.get("y_label", "") plot = self.addPlot(row=row, col=col, colspan=colspan, title=plot_name) plot.setLabel("bottom", x_label) @@ -416,10 +409,7 @@ class BECMonitor(pg.GraphicsLayoutWidget): self.grid_coordinates.append((row, col)) # Initialize curves - if self.legacy_scan_segment is True: - self.legacy_init_curves() - else: - self.init_curves() + self.init_curves() def _set_plot_colors(self, plot: pg.PlotItem, plot_settings: dict) -> None: """ @@ -508,56 +498,6 @@ class BECMonitor(pg.GraphicsLayoutWidget): if self.enable_crosshair is True: self.hook_crosshair() - def legacy_init_curves(self) -> None: - """ - Initialize curve data and properties, and update table row labels. - - This method initializes a nested dictionary `self.curves_data` to store - the curve objects for each x and y signal pair. It also updates the row labels - in `self.tableWidget_crosshair` to include the grid position for each y-value. - """ - self.curves_data = {} - row_labels = [] - - for idx, plot_config in enumerate(self.plot_data): - plot_name = plot_config.get("plot_name", "") - plot = self.plots[plot_name] - plot.clear() - - y_configs = plot_config["y"]["signals"] - colors_ys = Colors.golden_angle_color( - colormap=self.plot_settings["colormap"], num=len(y_configs) - ) - - curve_list = [] - for i, (y_config, color) in enumerate(zip(y_configs, colors_ys)): - y_name = y_config["name"] - y_entry = y_config["entry"] - - user_color = self.user_colors.get((plot_name, y_name, y_entry), None) - color_to_use = user_color if user_color else color - - pen_curve = mkPen(color=color_to_use, width=2, style=QtCore.Qt.DashLine) - brush_curve = mkBrush(color=color_to_use) - - curve_data = pg.PlotDataItem( - symbolSize=5, - symbolBrush=brush_curve, - pen=pen_curve, - skipFiniteCheck=True, - name=f"{y_name} ({y_entry})", - ) - - curve_list.append((y_name, y_entry, curve_data)) - plot.addItem(curve_data) - row_labels.append(f"{y_name} ({y_entry}) - {plot_name}") - - self.curves_data[plot_name] = curve_list - - # Hook Crosshair - if self.enable_crosshair == True: - self.hook_crosshair() - def hook_crosshair(self) -> None: """Hook the crosshair to all plots.""" # TODO can be extended to hook crosshair signal for mouse move/clicked @@ -567,13 +507,6 @@ class BECMonitor(pg.GraphicsLayoutWidget): self.crosshairs[plot_name] = crosshair def update_plot(self) -> None: - """Update the plot data based on the stored data dictionary.""" - if self.legacy_scan_segment is True: - self.legacy_update_plot() - else: - self.source_update_plot() - - def source_update_plot(self): """Update the plot data based on the stored data dictionary.""" for plot_name, curve_list in self.curves_data.items(): plot_config = next( @@ -591,7 +524,9 @@ class BECMonitor(pg.GraphicsLayoutWidget): curve.setData(data_x, data_y) - def extract_signal_configurations(self, plot_config): + def extract_signal_configurations( + self, plot_config: dict + ): # TODO can be probably removed when validation is up """Extract the signal configurations for x and y axes from plot_config.""" x_name, x_entry, y_configurations = None, None, [] @@ -599,31 +534,15 @@ class BECMonitor(pg.GraphicsLayoutWidget): if "x" in source["signals"]: x_signal = source["signals"]["x"][0] x_name = x_signal.get("name", "") - x_entry = x_signal.get("entry", x_name) + x_entry = x_signal.get( + "entry", x_name + ) # todo can be removed when validation is up again if "y" in source["signals"]: y_configurations.extend(source["signals"]["y"]) return x_name, x_entry, y_configurations - def legacy_update_plot(self) -> None: - """Legacy version of how data are update from on_scan_segment. - Update the plot data based on the stored data dictionary.""" - for plot_name, curve_list in self.curves_data.items(): - for y_name, y_entry, curve in curve_list: - x_config = next( - (pc["x"] for pc in self.plot_data if pc.get("plot_name") == plot_name), {} - ) - x_signal_config = x_config["signals"][0] - x_name = x_signal_config.get("name", "") - x_entry = x_signal_config.get("entry", x_name) - - key = (x_name, x_entry, y_name, y_entry) - data_x = self.data.get(key, {}).get("x", []) - data_y = self.data.get(key, {}).get("y", []) - - curve.setData(data_x, data_y) - def get_config(self): """Return the current configuration settings.""" return self.config @@ -752,43 +671,21 @@ class BECMonitor(pg.GraphicsLayoutWidget): self._init_ui(self.plot_settings["num_columns"]) self.scanID = current_scanID + self.scan_data = self.queue.scan_storage.find_scan_by_ID(self.scanID) + if not self.scan_data: + print(f"No data found for scanID: {self.scanID}") # TODO better error + return self.flush() - if self.legacy_scan_segment is True: - self.legacy_scan_segment_update(msg) - elif self.scan_storage_access is True: - self.update_from_scan_storage(current_scanID) - else: - self.scan_segment_update(msg) + self.scan_segment_update() self.update_signal.emit() - def scan_segment_update(self, msg: dict): - """ - Update the database based on the scan segment message. - - Args: - msg (dict): Message received with scan data. - """ - # Append new data to the database based on the incoming message - for device_name, device_entries in self.database.get("scan_segment", {}).items(): - for entry, data_list in device_entries.items(): - data_value = msg["data"].get(device_name, {}).get(entry, {}).get("value", None) - if data_value is not None: - data_list.append(data_value) - - def update_from_scan_storage(self, scanID: str): + def scan_segment_update(self): """ Update the database with data from scan storage based on the provided scanID. - - Args: - scanID (str): The scan ID used to find the relevant scan data. """ - scan_data = self.queue.scan_storage.find_scan_by_ID(scanID).data - if not scan_data: - print(f"No data found for scanID: {scanID}") - return - + scan_data = self.scan_data.data for device_name, device_entries in self.database.get("scan_segment", {}).items(): for entry in device_entries.keys(): dataset = scan_data[device_name][entry].val @@ -797,35 +694,6 @@ class BECMonitor(pg.GraphicsLayoutWidget): else: print(f"No data found for {device_name} {entry}") - def legacy_scan_segment_update(self, msg: dict): - """ - Legacy method to handle scan segments appending each line from scan message. - Args: - msg(dict): Message received with scan data. - """ - for plot_config in self.plot_data: - x_config = plot_config["x"] - x_signal_config = x_config["signals"][0] # There is exactly 1 config for x signals - - x_name = x_signal_config.get("name", "") - x_entry = x_signal_config.get("entry", []) - - y_configs = plot_config["y"]["signals"] - for y_config in y_configs: - y_name = y_config.get("name", "") - y_entry = y_config.get("entry", []) - - key = (x_name, x_entry, y_name, y_entry) - - data_x = msg["data"].get(x_name, {}).get(x_entry, {}).get("value", None) - data_y = msg["data"].get(y_name, {}).get(y_entry, {}).get("value", None) - - if data_x is not None: - self.data.setdefault(key, {}).setdefault("x", []).append(data_x) - - if data_y is not None: - self.data.setdefault(key, {}).setdefault("y", []).append(data_y) - if __name__ == "__main__": # pragma: no cover import argparse @@ -856,8 +724,6 @@ if __name__ == "__main__": # pragma: no cover config=config, gui_id=args.id, skip_validation=True, - legacy_scan_segment=False, - scan_storage_access=False, ) monitor.show() sys.exit(app.exec())