0
0
mirror of https://github.com/bec-project/bec_widgets.git synced 2025-07-14 11:41:49 +02:00

refactor: monitor.py data for scan segment are only accessed through queue.scan_storage

This commit is contained in:
wyzula-jan
2023-12-13 10:30:58 +01:00
parent 26c07c3205
commit c3f2ad45c3

View File

@ -247,8 +247,6 @@ class BECMonitor(pg.GraphicsLayoutWidget):
enable_crosshair: bool = True, enable_crosshair: bool = True,
gui_id=None, gui_id=None,
skip_validation: bool = False, skip_validation: bool = False,
legacy_scan_segment: bool = True,
scan_storage_access: bool = False,
): ):
super(BECMonitor, self).__init__(parent=parent) super(BECMonitor, self).__init__(parent=parent)
@ -273,9 +271,7 @@ class BECMonitor(pg.GraphicsLayoutWidget):
# Current configuration # Current configuration
self.config = config self.config = config
self.legacy_scan_segment = legacy_scan_segment
self.skip_validation = skip_validation self.skip_validation = skip_validation
self.scan_storage_access = scan_storage_access
# Enable crosshair # Enable crosshair
self.enable_crosshair = enable_crosshair self.enable_crosshair = enable_crosshair
@ -399,12 +395,9 @@ class BECMonitor(pg.GraphicsLayoutWidget):
last_row_cols -= 1 last_row_cols -= 1
plot_name = plot_config.get("plot_name", "") plot_name = plot_config.get("plot_name", "")
if self.legacy_scan_segment is True:
x_label = plot_config["x"].get("label", "") x_label = plot_config.get("x_label", "")
y_label = plot_config["y"].get("label", "") y_label = plot_config.get("y_label", "")
else:
x_label = plot_config.get("x_label", "")
y_label = plot_config.get("y_label", "")
plot = self.addPlot(row=row, col=col, colspan=colspan, title=plot_name) plot = self.addPlot(row=row, col=col, colspan=colspan, title=plot_name)
plot.setLabel("bottom", x_label) plot.setLabel("bottom", x_label)
@ -416,10 +409,7 @@ class BECMonitor(pg.GraphicsLayoutWidget):
self.grid_coordinates.append((row, col)) self.grid_coordinates.append((row, col))
# Initialize curves # Initialize curves
if self.legacy_scan_segment is True: self.init_curves()
self.legacy_init_curves()
else:
self.init_curves()
def _set_plot_colors(self, plot: pg.PlotItem, plot_settings: dict) -> None: def _set_plot_colors(self, plot: pg.PlotItem, plot_settings: dict) -> None:
""" """
@ -508,56 +498,6 @@ class BECMonitor(pg.GraphicsLayoutWidget):
if self.enable_crosshair is True: if self.enable_crosshair is True:
self.hook_crosshair() self.hook_crosshair()
def legacy_init_curves(self) -> None:
"""
Initialize curve data and properties, and update table row labels.
This method initializes a nested dictionary `self.curves_data` to store
the curve objects for each x and y signal pair. It also updates the row labels
in `self.tableWidget_crosshair` to include the grid position for each y-value.
"""
self.curves_data = {}
row_labels = []
for idx, plot_config in enumerate(self.plot_data):
plot_name = plot_config.get("plot_name", "")
plot = self.plots[plot_name]
plot.clear()
y_configs = plot_config["y"]["signals"]
colors_ys = Colors.golden_angle_color(
colormap=self.plot_settings["colormap"], num=len(y_configs)
)
curve_list = []
for i, (y_config, color) in enumerate(zip(y_configs, colors_ys)):
y_name = y_config["name"]
y_entry = y_config["entry"]
user_color = self.user_colors.get((plot_name, y_name, y_entry), None)
color_to_use = user_color if user_color else color
pen_curve = mkPen(color=color_to_use, width=2, style=QtCore.Qt.DashLine)
brush_curve = mkBrush(color=color_to_use)
curve_data = pg.PlotDataItem(
symbolSize=5,
symbolBrush=brush_curve,
pen=pen_curve,
skipFiniteCheck=True,
name=f"{y_name} ({y_entry})",
)
curve_list.append((y_name, y_entry, curve_data))
plot.addItem(curve_data)
row_labels.append(f"{y_name} ({y_entry}) - {plot_name}")
self.curves_data[plot_name] = curve_list
# Hook Crosshair
if self.enable_crosshair == True:
self.hook_crosshair()
def hook_crosshair(self) -> None: def hook_crosshair(self) -> None:
"""Hook the crosshair to all plots.""" """Hook the crosshair to all plots."""
# TODO can be extended to hook crosshair signal for mouse move/clicked # TODO can be extended to hook crosshair signal for mouse move/clicked
@ -567,13 +507,6 @@ class BECMonitor(pg.GraphicsLayoutWidget):
self.crosshairs[plot_name] = crosshair self.crosshairs[plot_name] = crosshair
def update_plot(self) -> None: def update_plot(self) -> None:
"""Update the plot data based on the stored data dictionary."""
if self.legacy_scan_segment is True:
self.legacy_update_plot()
else:
self.source_update_plot()
def source_update_plot(self):
"""Update the plot data based on the stored data dictionary.""" """Update the plot data based on the stored data dictionary."""
for plot_name, curve_list in self.curves_data.items(): for plot_name, curve_list in self.curves_data.items():
plot_config = next( plot_config = next(
@ -591,7 +524,9 @@ class BECMonitor(pg.GraphicsLayoutWidget):
curve.setData(data_x, data_y) curve.setData(data_x, data_y)
def extract_signal_configurations(self, plot_config): def extract_signal_configurations(
self, plot_config: dict
): # TODO can be probably removed when validation is up
"""Extract the signal configurations for x and y axes from plot_config.""" """Extract the signal configurations for x and y axes from plot_config."""
x_name, x_entry, y_configurations = None, None, [] x_name, x_entry, y_configurations = None, None, []
@ -599,31 +534,15 @@ class BECMonitor(pg.GraphicsLayoutWidget):
if "x" in source["signals"]: if "x" in source["signals"]:
x_signal = source["signals"]["x"][0] x_signal = source["signals"]["x"][0]
x_name = x_signal.get("name", "") x_name = x_signal.get("name", "")
x_entry = x_signal.get("entry", x_name) x_entry = x_signal.get(
"entry", x_name
) # todo can be removed when validation is up again
if "y" in source["signals"]: if "y" in source["signals"]:
y_configurations.extend(source["signals"]["y"]) y_configurations.extend(source["signals"]["y"])
return x_name, x_entry, y_configurations return x_name, x_entry, y_configurations
def legacy_update_plot(self) -> None:
"""Legacy version of how data are update from on_scan_segment.
Update the plot data based on the stored data dictionary."""
for plot_name, curve_list in self.curves_data.items():
for y_name, y_entry, curve in curve_list:
x_config = next(
(pc["x"] for pc in self.plot_data if pc.get("plot_name") == plot_name), {}
)
x_signal_config = x_config["signals"][0]
x_name = x_signal_config.get("name", "")
x_entry = x_signal_config.get("entry", x_name)
key = (x_name, x_entry, y_name, y_entry)
data_x = self.data.get(key, {}).get("x", [])
data_y = self.data.get(key, {}).get("y", [])
curve.setData(data_x, data_y)
def get_config(self): def get_config(self):
"""Return the current configuration settings.""" """Return the current configuration settings."""
return self.config return self.config
@ -752,43 +671,21 @@ class BECMonitor(pg.GraphicsLayoutWidget):
self._init_ui(self.plot_settings["num_columns"]) self._init_ui(self.plot_settings["num_columns"])
self.scanID = current_scanID self.scanID = current_scanID
self.scan_data = self.queue.scan_storage.find_scan_by_ID(self.scanID)
if not self.scan_data:
print(f"No data found for scanID: {self.scanID}") # TODO better error
return
self.flush() self.flush()
if self.legacy_scan_segment is True: self.scan_segment_update()
self.legacy_scan_segment_update(msg)
elif self.scan_storage_access is True:
self.update_from_scan_storage(current_scanID)
else:
self.scan_segment_update(msg)
self.update_signal.emit() self.update_signal.emit()
def scan_segment_update(self, msg: dict): def scan_segment_update(self):
"""
Update the database based on the scan segment message.
Args:
msg (dict): Message received with scan data.
"""
# Append new data to the database based on the incoming message
for device_name, device_entries in self.database.get("scan_segment", {}).items():
for entry, data_list in device_entries.items():
data_value = msg["data"].get(device_name, {}).get(entry, {}).get("value", None)
if data_value is not None:
data_list.append(data_value)
def update_from_scan_storage(self, scanID: str):
""" """
Update the database with data from scan storage based on the provided scanID. Update the database with data from scan storage based on the provided scanID.
Args:
scanID (str): The scan ID used to find the relevant scan data.
""" """
scan_data = self.queue.scan_storage.find_scan_by_ID(scanID).data scan_data = self.scan_data.data
if not scan_data:
print(f"No data found for scanID: {scanID}")
return
for device_name, device_entries in self.database.get("scan_segment", {}).items(): for device_name, device_entries in self.database.get("scan_segment", {}).items():
for entry in device_entries.keys(): for entry in device_entries.keys():
dataset = scan_data[device_name][entry].val dataset = scan_data[device_name][entry].val
@ -797,35 +694,6 @@ class BECMonitor(pg.GraphicsLayoutWidget):
else: else:
print(f"No data found for {device_name} {entry}") print(f"No data found for {device_name} {entry}")
def legacy_scan_segment_update(self, msg: dict):
"""
Legacy method to handle scan segments appending each line from scan message.
Args:
msg(dict): Message received with scan data.
"""
for plot_config in self.plot_data:
x_config = plot_config["x"]
x_signal_config = x_config["signals"][0] # There is exactly 1 config for x signals
x_name = x_signal_config.get("name", "")
x_entry = x_signal_config.get("entry", [])
y_configs = plot_config["y"]["signals"]
for y_config in y_configs:
y_name = y_config.get("name", "")
y_entry = y_config.get("entry", [])
key = (x_name, x_entry, y_name, y_entry)
data_x = msg["data"].get(x_name, {}).get(x_entry, {}).get("value", None)
data_y = msg["data"].get(y_name, {}).get(y_entry, {}).get("value", None)
if data_x is not None:
self.data.setdefault(key, {}).setdefault("x", []).append(data_x)
if data_y is not None:
self.data.setdefault(key, {}).setdefault("y", []).append(data_y)
if __name__ == "__main__": # pragma: no cover if __name__ == "__main__": # pragma: no cover
import argparse import argparse
@ -856,8 +724,6 @@ if __name__ == "__main__": # pragma: no cover
config=config, config=config,
gui_id=args.id, gui_id=args.id,
skip_validation=True, skip_validation=True,
legacy_scan_segment=False,
scan_storage_access=False,
) )
monitor.show() monitor.show()
sys.exit(app.exec()) sys.exit(app.exec())