mirror of
https://github.com/bec-project/bec_widgets.git
synced 2025-07-14 11:41:49 +02:00
feat: monitor.py can access custom data send through redis
This commit is contained in:
@ -237,6 +237,35 @@ CONFIG_SIMPLE = {
|
||||
],
|
||||
}
|
||||
|
||||
CONFIG_REDIS = {
|
||||
"plot_settings": {
|
||||
"background_color": "white",
|
||||
"axis_width": 2,
|
||||
"num_columns": 5,
|
||||
"colormap": "plasma",
|
||||
"scan_types": False,
|
||||
},
|
||||
"plot_data": [
|
||||
{
|
||||
"plot_name": "BPM4i plots vs samx",
|
||||
"x_label": "Motor Y",
|
||||
"y_label": "bpm4i",
|
||||
"sources": [
|
||||
{
|
||||
"type": "scan_segment",
|
||||
"signals": {"x": [{"name": "samx"}], "y": [{"name": "gauss_bpm"}]},
|
||||
},
|
||||
{
|
||||
"type": "redis",
|
||||
"endpoint": "public/gui/data/6cd5ea3f-a9a9-4736-b4ed-74ab9edfb996",
|
||||
"update": "append",
|
||||
"signals": {"x": [{"name": "x_default_tag"}], "y": [{"name": "y_default_tag"}]},
|
||||
},
|
||||
],
|
||||
}
|
||||
],
|
||||
}
|
||||
|
||||
|
||||
class BECMonitor(pg.GraphicsLayoutWidget):
|
||||
update_signal = pyqtSignal()
|
||||
@ -270,6 +299,7 @@ class BECMonitor(pg.GraphicsLayoutWidget):
|
||||
bec_dispatcher.connect_slot(
|
||||
self.on_instruction, MessageEndpoints.gui_instructions(self.gui_id)
|
||||
)
|
||||
bec_dispatcher.connect_slot(self.on_data_from_redis, MessageEndpoints.gui_data(self.gui_id))
|
||||
|
||||
# Current configuration
|
||||
self.config = config
|
||||
@ -292,7 +322,7 @@ class BECMonitor(pg.GraphicsLayoutWidget):
|
||||
|
||||
# Connect the update signal to the update plot method
|
||||
self.proxy_update_plot = pg.SignalProxy(
|
||||
self.update_signal, rateLimit=25, slot=self.update_plot
|
||||
self.update_signal, rateLimit=25, slot=self.update_scan_segment_plot
|
||||
)
|
||||
|
||||
# Init UI
|
||||
@ -322,19 +352,23 @@ class BECMonitor(pg.GraphicsLayoutWidget):
|
||||
# Initialize the UI
|
||||
self._init_ui(self.plot_settings["num_columns"])
|
||||
|
||||
def _init_database(self, plot_data_config: dict) -> dict:
|
||||
def _init_database(self, plot_data_config: dict, source_type_to_init=None) -> dict:
|
||||
"""
|
||||
Initializes the database for the PlotApp.
|
||||
Initializes or updates the database for the PlotApp.
|
||||
Args:
|
||||
plot_data_config(dict): Configuration settings for plots
|
||||
plot_data_config(dict): Configuration settings for plots.
|
||||
source_type_to_init(str, optional): Specific source type to initialize. If None, initialize all.
|
||||
Returns:
|
||||
dict: Database dictionary
|
||||
dict: Updated or new database dictionary.
|
||||
"""
|
||||
database = {}
|
||||
database = {} if source_type_to_init is None else self.database.copy()
|
||||
|
||||
for plot in plot_data_config:
|
||||
for source in plot["sources"]:
|
||||
source_type = source["type"]
|
||||
if source_type_to_init and source_type != source_type_to_init:
|
||||
continue # Skip if not the specified source type
|
||||
|
||||
if source_type not in database:
|
||||
database[source_type] = {}
|
||||
|
||||
@ -342,7 +376,6 @@ class BECMonitor(pg.GraphicsLayoutWidget):
|
||||
for signal in signals:
|
||||
name = signal["name"]
|
||||
entry = signal.get("entry", name)
|
||||
|
||||
if name not in database[source_type]:
|
||||
database[source_type][name] = {}
|
||||
if entry not in database[source_type][name]:
|
||||
@ -449,14 +482,9 @@ class BECMonitor(pg.GraphicsLayoutWidget):
|
||||
|
||||
def init_curves(self) -> None:
|
||||
"""
|
||||
Initialize curve data and properties, and update table row labels.
|
||||
|
||||
This method initializes a nested dictionary `self.curves_data` to store
|
||||
the curve objects for each x and y signal pair. It also updates the row labels
|
||||
in `self.tableWidget_crosshair` to include the grid position for each y-value.
|
||||
Initialize curve data and properties for each plot and data source.
|
||||
"""
|
||||
self.curves_data = {}
|
||||
row_labels = []
|
||||
|
||||
for idx, plot_config in enumerate(self.plot_data):
|
||||
plot_name = plot_config.get("plot_name", "")
|
||||
@ -464,28 +492,30 @@ class BECMonitor(pg.GraphicsLayoutWidget):
|
||||
plot.clear()
|
||||
|
||||
for source in plot_config["sources"]:
|
||||
source_type = source["type"][0]
|
||||
source_type = source["type"]
|
||||
y_signals = source["signals"].get("y", [])
|
||||
colors_ys = Colors.golden_angle_color(
|
||||
colormap=self.plot_settings["colormap"], num=len(y_signals)
|
||||
)
|
||||
|
||||
curve_list = []
|
||||
if source_type not in self.curves_data:
|
||||
self.curves_data[source_type] = {}
|
||||
if plot_name not in self.curves_data[source_type]:
|
||||
self.curves_data[source_type][plot_name] = []
|
||||
|
||||
for i, (y_signal, color) in enumerate(zip(y_signals, colors_ys)):
|
||||
y_name = y_signal["name"]
|
||||
y_entry = y_signal.get("entry", y_name)
|
||||
|
||||
curve_name = f"{y_name} ({y_entry})-{source_type.upper()}"
|
||||
curve_data = self.create_curve(curve_name, color)
|
||||
curve_list.append((y_name, y_entry, curve_data))
|
||||
plot.addItem(curve_data)
|
||||
row_labels.append(f"{y_name} ({y_entry}) - {plot_name}")
|
||||
self.curves_data[source_type][plot_name].append((y_name, y_entry, curve_data))
|
||||
|
||||
self.curves_data[plot_name] = curve_list
|
||||
|
||||
# Hook Crosshair
|
||||
if self.enable_crosshair is True:
|
||||
self.hook_crosshair()
|
||||
# Render static plot elements
|
||||
self.update_plot()
|
||||
# # Hook Crosshair #TODO enable later, currently not working
|
||||
# if self.enable_crosshair is True:
|
||||
# self.hook_crosshair()
|
||||
|
||||
def create_curve(self, curve_name: str, color: str) -> pg.PlotDataItem:
|
||||
"""
|
||||
@ -518,24 +548,36 @@ class BECMonitor(pg.GraphicsLayoutWidget):
|
||||
crosshair = Crosshair(plot, precision=3)
|
||||
self.crosshairs[plot_name] = crosshair
|
||||
|
||||
def update_plot(self) -> None:
|
||||
"""Update the plot data based on the stored data dictionary."""
|
||||
for plot_name, curve_list in self.curves_data.items():
|
||||
def update_scan_segment_plot(self):
|
||||
"""
|
||||
Update the plot with the latest scan segment data.
|
||||
"""
|
||||
self.update_plot(source_type="scan_segment")
|
||||
|
||||
def update_plot(self, source_type=None) -> None:
|
||||
"""
|
||||
Update the plot data based on the stored data dictionary.
|
||||
Only updates data for the specified source_type if provided.
|
||||
"""
|
||||
for src_type, plots in self.curves_data.items():
|
||||
if source_type and src_type != source_type:
|
||||
continue
|
||||
|
||||
for plot_name, curve_list in plots.items():
|
||||
plot_config = next(
|
||||
(pc for pc in self.plot_data if pc.get("plot_name") == plot_name), None
|
||||
)
|
||||
if not plot_config:
|
||||
continue
|
||||
|
||||
x_name, x_entry = self.extract_x_config(plot_config)
|
||||
x_name, x_entry = self.extract_x_config(plot_config, src_type)
|
||||
|
||||
for y_name, y_entry, curve in curve_list:
|
||||
data_x = self.database.get("scan_segment", {}).get(x_name, {}).get(x_entry, [])
|
||||
data_y = self.database.get("scan_segment", {}).get(y_name, {}).get(y_entry, [])
|
||||
|
||||
data_x = self.database.get(src_type, {}).get(x_name, {}).get(x_entry, [])
|
||||
data_y = self.database.get(src_type, {}).get(y_name, {}).get(y_entry, [])
|
||||
curve.setData(data_x, data_y)
|
||||
|
||||
def extract_x_config(self, plot_config: dict) -> tuple:
|
||||
def extract_x_config(self, plot_config: dict, source_type: str) -> tuple:
|
||||
"""Extract the signal configurations for x and y axes from plot_config.
|
||||
Args:
|
||||
plot_config (dict): Plot configuration.
|
||||
@ -545,11 +587,10 @@ class BECMonitor(pg.GraphicsLayoutWidget):
|
||||
x_name, x_entry = None, None
|
||||
|
||||
for source in plot_config["sources"]:
|
||||
if "x" in source["signals"]:
|
||||
if source["type"] == source_type and "x" in source["signals"]:
|
||||
x_signal = source["signals"]["x"][0]
|
||||
x_name = x_signal.get("name")
|
||||
x_entry = x_signal.get("entry")
|
||||
|
||||
x_entry = x_signal.get("entry", x_name)
|
||||
return x_name, x_entry
|
||||
|
||||
def get_config(self):
|
||||
@ -644,10 +685,26 @@ class BECMonitor(pg.GraphicsLayoutWidget):
|
||||
|
||||
return formatted_error_message
|
||||
|
||||
def flush(self) -> None:
|
||||
"""Flush the data dictionary (legacy) and recreate the database."""
|
||||
def flush(self, flush_all=False, source_type_to_flush=None) -> None:
|
||||
"""Update or reset the database to match the current configuration.
|
||||
|
||||
Args:
|
||||
flush_all (bool): If True, reset the entire database.
|
||||
source_type_to_flush (str): Specific source type to reset. Ignored if flush_all is True.
|
||||
"""
|
||||
if flush_all:
|
||||
self.database = self._init_database(self.plot_data)
|
||||
self.init_curves()
|
||||
else:
|
||||
if source_type_to_flush in self.database:
|
||||
# TODO maybe reinit the database from config again instead of cycle through names/entries
|
||||
# Reset only the specified source type
|
||||
for name in self.database[source_type_to_flush]:
|
||||
for entry in self.database[source_type_to_flush][name]:
|
||||
self.database[source_type_to_flush][name][entry] = []
|
||||
# Reset curves for the specified source type
|
||||
if source_type_to_flush in self.curves_data:
|
||||
self.init_curves()
|
||||
|
||||
@pyqtSlot(dict, dict)
|
||||
def on_scan_segment(self, msg: dict, metadata: dict):
|
||||
@ -687,7 +744,7 @@ class BECMonitor(pg.GraphicsLayoutWidget):
|
||||
if not self.scan_data:
|
||||
print(f"No data found for scanID: {self.scanID}") # TODO better error
|
||||
return
|
||||
self.flush()
|
||||
self.flush(source_type_to_flush="scan_segment")
|
||||
|
||||
self.scan_segment_update()
|
||||
|
||||
@ -706,6 +763,34 @@ class BECMonitor(pg.GraphicsLayoutWidget):
|
||||
else:
|
||||
print(f"No data found for {device_name} {entry}")
|
||||
|
||||
@pyqtSlot(dict)
|
||||
def on_data_from_redis(self, msg) -> None:
|
||||
"""
|
||||
Handle new data sent from redis.
|
||||
Args:
|
||||
msg (dict): Message received with data.
|
||||
"""
|
||||
|
||||
self._init_database(
|
||||
self.plot_data, source_type_to_init="redis"
|
||||
) # add database entry for redis dataset
|
||||
|
||||
data = msg.get("data", {})
|
||||
x_data = data.get("x", {})
|
||||
y_data = data.get("y", {})
|
||||
|
||||
# Update x data
|
||||
if x_data:
|
||||
x_tag = x_data.get("tag")
|
||||
self.database["redis"][x_tag][x_tag] = x_data["data"]
|
||||
|
||||
# Update y data
|
||||
for y_tag, y_info in y_data.items():
|
||||
self.database["redis"][y_tag][y_tag] = y_info["data"]
|
||||
|
||||
# Trigger plot update
|
||||
self.update_plot(source_type="redis")
|
||||
|
||||
|
||||
if __name__ == "__main__": # pragma: no cover
|
||||
import argparse
|
||||
@ -725,7 +810,7 @@ if __name__ == "__main__": # pragma: no cover
|
||||
# Load config from file
|
||||
config = load_yaml(args.config_file)
|
||||
else:
|
||||
config = CONFIG_SCAN_MODE
|
||||
config = CONFIG_REDIS
|
||||
|
||||
client = bec_dispatcher.client
|
||||
client.start()
|
||||
@ -736,4 +821,10 @@ if __name__ == "__main__": # pragma: no cover
|
||||
skip_validation=False,
|
||||
)
|
||||
monitor.show()
|
||||
# just to test redis data
|
||||
# redis_data = {
|
||||
# "x": {"data": [1, 2, 3], "tag": "x_default_tag"},
|
||||
# "y": {"y_default_tag": {"data": [1, 2, 3]}},
|
||||
# }
|
||||
# monitor.on_data_from_redis({"data": redis_data})
|
||||
sys.exit(app.exec())
|
||||
|
Reference in New Issue
Block a user