0
0
mirror of https://github.com/bec-project/bec_widgets.git synced 2025-07-14 11:41:49 +02:00

WIP sync curves updates

This commit is contained in:
2025-01-18 17:29:11 +01:00
parent 3f758e4b08
commit 52dfbf357a
2 changed files with 212 additions and 58 deletions

View File

@ -17,28 +17,35 @@ if TYPE_CHECKING:
logger = bec_logger.logger
class SignalData(BaseModel):
"""The data configuration of a signal in the 1D waveform widget for x and y axis."""
# TODO maybe whole SignalData class is not needed
# class SignalData(BaseModel):
# """The data configuration of a signal in the 1D waveform widget for x and y axis."""
#
# name: str
# entry: str
# unit: Optional[str] = None # todo implement later
# modifier: Optional[str] = None # todo implement later
# limits: Optional[list[float]] = None # todo implement later
# model_config: dict = {"validate_assignment": True}
# noinspection PyDataclass
class DeviceSignal(BaseModel):
"""The configuration of a signal in the 1D waveform widget."""
name: str
entry: str
unit: Optional[str] = None # todo implement later
modifier: Optional[str] = None # todo implement later
limits: Optional[list[float]] = None # todo implement later
model_config: dict = {"validate_assignment": True}
class Signal(BaseModel):
"""The configuration of a signal in the 1D waveform widget."""
source: Optional[str] = None # TODO probably not needed
x: Optional[SignalData] = None # TODO maybe not needed
y: SignalData
z: Optional[SignalData] = None # TODO maybe not needed
dap: Optional[str] = None # TODO utilize differently than in past
# TODO decide which parts will be still needed
# source: Optional[str] = None # TODO probably not needed
# x: Optional[SignalData] = None # TODO maybe not needed
# y: SignalData
# z: Optional[SignalData] = None # TODO maybe not needed
# dap: Optional[str] = None # TODO utilize differently than in past
model_config: dict = {"validate_assignment": True}
# noinspection PyDataclass
class CurveConfig(ConnectionConfig):
parent_id: Optional[str] = Field(None, description="The parent plot of the curve.")
label: Optional[str] = Field(None, description="The label of the curve.")
@ -52,19 +59,19 @@ class CurveConfig(ConnectionConfig):
pen_style: Optional[Literal["solid", "dash", "dot", "dashdot"]] = Field(
"solid", description="The style of the pen of the curve."
)
source: Optional[str] = Field(
None, description="The source of the curve."
source: Literal["device", "dap", "custom"] = Field(
"custom", description="The source of the curve."
) # TODO not needed probably
signals: Optional[Signal] = Field(None, description="The signal of the curve.")
color_map_z: Optional[str] = Field(
"magma", description="The colormap of the curves z gradient.", validate_default=True
)
signal: Optional[DeviceSignal] = Field(None, description="The signal of the curve.")
# color_map_z: Optional[str] = Field(
# "magma", description="The colormap of the curves z gradient.", validate_default=True
# ) #TODO remove, the gradient curves wil be separate
model_config: dict = {"validate_assignment": True}
_validate_color_map_z = field_validator("color_map_z")(
Colors.validate_color_map
) # TODO not needed probably
# _validate_color_map_z = field_validator("color_map_z")(
# Colors.validate_color_map
# ) # TODO not needed probably
_validate_color = field_validator("color")(Colors.validate_color)
_validate_symbol_color = field_validator("symbol_color")(Colors.validate_color)

View File

@ -19,9 +19,10 @@ from bec_widgets.qt_utils.error_popups import SafeProperty, SafeSlot
from bec_widgets.utils import ConnectionConfig
from bec_widgets.utils.colors import set_theme, Colors
from bec_widgets.widgets.plots_next_gen.plot_base import PlotBase
from bec_widgets.widgets.plots_next_gen.waveform.curve import Curve, CurveConfig
from bec_widgets.widgets.plots_next_gen.waveform.curve import Curve, CurveConfig, DeviceSignal
# noinspection PyDataclass
class WaveformConfig(ConnectionConfig):
color_palette: Optional[str] = Field(
"magma", description="The color palette of the figure widget.", validate_default=True
@ -47,7 +48,7 @@ class Waveform(PlotBase):
}
# TODO implement signals
scan_signal_update = Signal()
scan_signal_update = Signal() # TODO maybe rename to async_signal_update
async_signal_update = Signal()
# dap_params_update = Signal(dict, dict)
# dap_summary_update = Signal(dict, dict)
@ -73,9 +74,11 @@ class Waveform(PlotBase):
self.setObjectName("Waveform")
# Curve data
self._curves_data = defaultdict(dict) # TODO needed can be 'device', 'custom','dap'
self._curves_by_class = defaultdict(dict) # TODO needed can be 'device', 'custom','dap'
self._sync_curves = []
self._async_curves = []
self._curves = self.plot_item.curves
self._mode: Literal["sync, async"] = (
self._mode: Literal["sync", "async", "mixed"] = (
"sync" # TODO mode probably not needed as well, both wil be allowed
)
@ -83,7 +86,7 @@ class Waveform(PlotBase):
self.old_scan_id = None
self.scan_id = None
self.scan_item = None
self.current_sources = {"sync": [], "async": []}
self.current_sources = {"sync": [], "async": []} # TODO maybe not needed
self.x_mode = "auto" # TODO maybe default could be 'best_effort'
self._x_axis_mode = {
"name": None,
@ -94,9 +97,9 @@ class Waveform(PlotBase):
# TODO review relevant bec_dispatcher signals
# Scan segment update proxy
# self.proxy_update_plot = pg.SignalProxy(
# self.scan_signal_update, rateLimit=25, slot=self._update_scan_curves
# )
self.proxy_update_plot = pg.SignalProxy(
self.scan_signal_update, rateLimit=25, slot=self.update_sync_curves
)
# self.proxy_update_dap = pg.SignalProxy(
# self.scan_signal_update, rateLimit=25, slot=self.refresh_dap
# )
@ -104,16 +107,28 @@ class Waveform(PlotBase):
# self.autorange_signal.connect(self.auto_range)
# self.bec_dispatcher.connect_slot(self.on_scan_segment, MessageEndpoints.scan_segment())
self.bec_dispatcher.connect_slot(
self.on_scan_segment, MessageEndpoints.scan_segment()
) # TODO probably not needed
# self.bec_dispatcher.connect_slot(
# self.on_scan_segment, MessageEndpoints.scan_segment()
# ) # TODO probably not needed
# Scan status update loop
self.bec_dispatcher.connect_slot(self.on_scan_status, MessageEndpoints.scan_status())
self.bec_dispatcher.connect_slot(self.on_scan_progress, MessageEndpoints.scan_progress())
# Curve update loop
# self.proxy_scan_update = pg.SignalProxy(
# self.scan_signal_update, rateLimit=25, slot=self.update_sync_curves
# ) # TODO implement
# self.proxy_dap_update = pg.SignalProxy(
# self.dap_signal_update, rateLimit=25, slot=self.update_dap_curves
# ) # TODO implement
# self.bec_dispatcher.connect_slot(
# self.async_signal_update, self.update_async_curves
# ) # TODO implement
# TODO test curves
self.plot([1, 2, 3, 4, 5, 6, 7, 8, 9, 10], label="test_curve")
self.plot([1, 2, 3, 4, 5, 6, 7, 8, 9, 10], label="test_curve2")
# self.plot([1, 2, 3, 4, 5, 6, 7, 8, 9, 10], label="test_curve")
# self.plot([1, 2, 3, 4, 5, 6, 7, 8, 9, 10], label="test_curve2")
################################################################################
# Widget Specific Properties
@ -137,6 +152,7 @@ class Waveform(PlotBase):
except Exception:
return
# TODO for loading and setting json rpc_register has to be double checked
@SafeProperty(str)
def curve_json(self) -> str:
"""
@ -200,7 +216,7 @@ class Waveform(PlotBase):
y(list | np.ndarray): Custom y data to plot.
x(list | np.ndarray): Custom y data to plot.
x_name(str): Name of the x signal.
- "best_effort": Use the best effort signal.
- "auto": Use the best effort signal.
- "timestamp": Use the timestamp signal.
- "index": Use the index signal.
- Custom signal name of device from BEC.
@ -218,27 +234,34 @@ class Waveform(PlotBase):
Returns:
Curve: The curve object.
"""
# 1. Custom curve logic
if x is not None and y is not None:
return self.add_curve_custom(x=x, y=y, label=label, color=color, **kwargs)
return self._add_curve_custom(x=x, y=y, label=label, color=color, **kwargs)
if isinstance(arg1, str):
y_name = arg1
elif isinstance(arg1, list):
if isinstance(y, list):
return self.add_curve_custom(x=arg1, y=y, label=label, color=color, **kwargs)
return self._add_curve_custom(x=arg1, y=y, label=label, color=color, **kwargs)
if y is None:
x = np.arange(len(arg1))
return self.add_curve_custom(x=x, y=arg1, label=label, color=color, **kwargs)
return self._add_curve_custom(x=x, y=arg1, label=label, color=color, **kwargs)
elif isinstance(arg1, np.ndarray) and y is None:
if arg1.ndim == 1:
x = np.arange(arg1.size)
return self.add_curve_custom(x=x, y=arg1, label=label, color=color, **kwargs)
return self._add_curve_custom(x=x, y=arg1, label=label, color=color, **kwargs)
if arg1.ndim == 2:
x = arg1[:, 0]
y = arg1[:, 1]
return self.add_curve_custom(x=x, y=y, label=label, color=color, **kwargs)
return self._add_curve_custom(x=x, y=y, label=label, color=color, **kwargs)
if y_name is None:
raise ValueError("y_name must be provided.") # TODO provide logger
# 2. BEC device curve logic
self._add_device_curve(y_name, y_entry) # TODO change y_name and y_entry
# TODO implement x_mode change if putted by user
# FIXME figure out dap logic adding
# TODO implement the plot method
@ -247,9 +270,37 @@ class Waveform(PlotBase):
################################################################################
# TODO implement curve management methods
# TODO for loading and setting json rpc_register has to be double checked
def _add_device_curve(self, device_name: str, device_signal: str):
"""Add BEC Device curve, can be sync(monitored device) or async device."""
# TODO implement signal fetch from BEC if not provided
def add_curve_custom(
# Setup identifiers
source = "device"
curve_id = f"{device_name}-{device_signal}"
# Check if curve already exists
curve_exits = self._check_curve_id(curve_id)
if curve_exits:
raise ValueError(
f"Curve with ID '{curve_id}' already exists in widget '{self.gui_id}'."
) # TODO change to logger
# TODO do device check with BEC if it is loaded
# Create curve by config
color = self._generate_color_from_palette() # TODO check the refresh logic of this
curve_config = CurveConfig(
widget_class="BECCurve",
parent_id=self.gui_id,
label=curve_id,
color=color,
source=source,
signal=DeviceSignal(name=device_name, entry=device_signal),
)
self._add_curve_object(name=curve_id, source=source, config=curve_config)
# TODO consolidate with adding curve object
def _add_curve_custom(
self,
x: list | np.ndarray,
y: list | np.ndarray,
@ -295,7 +346,7 @@ class Waveform(PlotBase):
parent_id=self.gui_id,
label=curve_id,
color=color,
source="custom", # TODO probably not needed
source="custom",
**kwargs,
)
curve = self._add_curve_object(
@ -322,8 +373,22 @@ class Waveform(PlotBase):
Returns:
BECCurve: The curve object.
"""
# curve_exits = self._check_curve_id(config.label)
# if curve_exits:
# raise ValueError(
# f"Curve with ID '{config.label}' already exists in widget '{self.gui_id}'."
# ) # TODO change to logger
#
# color = (
# color
# or Colors.golden_angle_color(
# colormap="magma", # FIXME Config do not have color_palette anymore
# num=max(10, len(self.plot_item.curves) + 1),
# format="HEX",
# )[len(self.plot_item.curves)]
# )
curve = Curve(config=config, name=name, parent_item=self)
self._curves_data[source][name] = curve
self._curves_by_class[source][name] = curve
self.plot_item.addItem(curve)
# self.config.curves[name] = curve.config #TODO will be changed
if data is not None:
@ -331,6 +396,7 @@ class Waveform(PlotBase):
# self.set_legend_label_size() #TODO will be changed
return curve
# TODO decide if needed
def _add_curve(
self,
name: str,
@ -342,6 +408,13 @@ class Waveform(PlotBase):
return curve
def _generate_color_from_palette(self) -> str:
# TODO think about refreshing all colors during this
color = Colors.golden_angle_color(
colormap=self.color_palette, num=max(10, len(self.plot_item.curves) + 1), format="HEX"
)[len(self.plot_item.curves)]
return color
def _remove_curve_by_source(self, source: str):
# TODO consider if this is needed
pass
@ -407,11 +480,13 @@ class Waveform(PlotBase):
# TODO probably not needed
print(f"Scan segment: {msg}")
@SafeSlot(dict)
def on_scan_status(self, msg: dict):
@SafeSlot(dict, dict)
def on_scan_status(self, msg: dict, meta: dict):
print(f"Scan status: {msg}")
print(f"Scan status meta: {meta}")
current_scan_id = msg.get("scan_id", None)
if current_scan_id is None:
readout_priority = msg.get("readout_priority", None)
if current_scan_id is None or readout_priority is None:
return
if current_scan_id != self.scan_id:
@ -424,14 +499,85 @@ class Waveform(PlotBase):
self.scan_id = current_scan_id
self.scan_item = self.queue.scan_storage.find_scan_by_ID(self.scan_id)
# First trigger to sync and async data
self.scan_signal_update.emit()
self.async_signal_update.emit() # TODO decide if needed here actually, maybe should be for setup
self._mode = self._categorise_device_curves(readout_priority)
@SafeSlot(dict)
def on_scan_progress(self, msg: dict):
# First trigger to sync and async data
if self._mode == "sync":
self.scan_signal_update.emit()
elif self._mode == "async":
# TODO sync should be setup to new scan id
self.async_signal_update.emit()
else:
self.scan_signal_update.emit()
self.async_signal_update.emit()
# TODO scan progress update loop triggering curve updates
@SafeSlot(dict, dict)
def on_scan_progress(self, msg: dict, meta: dict, *args, **kwargs):
print(f"Scan progress: {msg}")
data = msg
print(f"Scan progress meta: {meta}")
self.scan_signal_update.emit()
def update_sync_curves(self):
print("Updating sync curves")
try:
data = (
self.scan_item.live_data
if hasattr(self.scan_item, "live_data") # backward compatibility
else self.scan_item.data
)
except AttributeError:
return
for curve in self._sync_curves:
device_name = curve.config.signal.name
device_entry = curve.config.signal.entry
device_data = data[device_name][device_entry].val
x_name = self.scan_item.status_message.info["scan_report_devices"][0]
# TODO logic for x_entry
x_entry = x_name
x_data = data[x_name][x_entry].val
curve.setData(x_data, device_data)
def _categorise_device_curves(self, readout_priority: dict) -> str:
# Reset sync/async curve lists
self._async_curves = []
self._sync_curves = []
found_async = False
found_sync = False
mode = "sync"
readout_priority_async = readout_priority.get("async", [])
readout_priority_sync = readout_priority.get("monitored", [])
# Iterate over all curves
for curve_id, curve in self._curves_by_class["device"].items():
dev_name = curve.config.signal.name
if dev_name in readout_priority_async:
self._async_curves.append(curve)
found_async = True
elif dev_name in readout_priority_sync:
self._sync_curves.append(curve)
found_sync = True
else:
print(
f"Device {dev_name} not found in readout priority list."
) # TODO change to logger
# Determine mode of the scan
if found_async and found_sync:
mode = "mixed"
print(
f"Found both async and sync devices in the scan. X-axis integrity cannot be guaranteed."
) # TODO change to logger
# TODO do some prompt to user to decide which mode to use
elif found_async:
mode = "async"
elif found_sync:
mode = "sync"
else:
mode = "sync"
return mode
################################################################################
# Export Methods
@ -447,5 +593,6 @@ if __name__ == "__main__":
set_theme("dark")
widget = Waveform()
widget.show()
widget.plot(y_name="bpm4i", y_entry="bpm4i")
widget.plot(y_name="bpm3a", y_entry="bpm3a")
sys.exit(app.exec_())