0
0
mirror of https://github.com/bec-project/bec_widgets.git synced 2025-07-14 11:41:49 +02:00

WIP Waveform signal for async and sync update adjusted

This commit is contained in:
2025-01-27 15:12:44 +01:00
parent 3cffe81d6b
commit 90a27f2b07

View File

@ -36,25 +36,15 @@ class WaveformConfig(ConnectionConfig):
class Waveform(PlotBase):
PLUGIN = True
INI_SETTING = True
ICON_NAME = "show_chart"
READOUT_PRIORITY_HANDLER = {
ReadoutPriority.ON_REQUEST: "on_request",
ReadoutPriority.BASELINE: "baseline",
ReadoutPriority.MONITORED: "monitored",
ReadoutPriority.ASYNC: "async",
ReadoutPriority.CONTINUOUS: "continuous",
}
# TODO implement signals
scan_signal_update = Signal() # TODO maybe rename to sync_signal_update
sync_signal_update = Signal()
async_signal_update = Signal()
request_dap_update = Signal()
unblock_dap_proxy = Signal()
# dap_signal_update = Signal() #TODO not needed probably
dap_params_update = Signal(dict, dict)
dap_summary_update = Signal(dict, dict)
# autorange_signal = Signal()
new_scan = Signal()
new_scan_id = Signal(str)
@ -104,24 +94,17 @@ class Waveform(PlotBase):
self.bec_dispatcher.connect_slot(self.on_scan_progress, MessageEndpoints.scan_progress())
# Curve update loop
# TODO review relevant bec_dispatcher signals
self.proxy_update_plot = pg.SignalProxy(
self.scan_signal_update, rateLimit=25, slot=self.update_sync_curves
self.proxy_update_sync = pg.SignalProxy(
self.sync_signal_update, rateLimit=25, slot=self.update_sync_curves
)
self.proxy_update_async = pg.SignalProxy(
self.async_signal_update, rateLimit=25, slot=self.update_async_curves
)
self.proxy_dap_request = BECSignalProxy(
self.request_dap_update, rateLimit=25, slot=self.request_dap, timeout=10.0
)
self.unblock_dap_proxy.connect(self.proxy_dap_request.unblock_proxy)
# TODO implement bec proxy to request dap update
# self.async_signal_update.connect(self.replot_async_curve)
# self.autorange_signal.connect(self.auto_range)
# self.proxy_dap_update = pg.SignalProxy(
# self.dap_signal_update, rateLimit=25, slot=self.update_dap_curves
# ) # TODO implement
# self.bec_dispatcher.connect_slot(
# self.async_signal_update, self.update_async_curves
# ) # TODO implement
self.scan_history(-1)
################################################################################
@ -233,12 +216,12 @@ class Waveform(PlotBase):
@x_mode.setter
def x_mode(self, value: str):
# FIXME wrong update of the label
self._x_axis_mode["name"] = value
self._switch_x_axis_item(mode=value)
# self._update_x_label_suffix() # TODO update straight away or wait for the next scan??
self.async_signal_update.emit()
self.scan_signal_update.emit()
self.request_dap_update.emit()
self.sync_signal_update.emit()
self.plot_item.enableAutoRange(x=True)
@SafeProperty(str)
@ -320,8 +303,7 @@ class Waveform(PlotBase):
y_entry: str | None = None,
color: str | None = None,
label: str | None = None,
validate: bool = True, # TODO global vs local validation rules
dap: str | None = None, # TODO add dap custom curve wrapper
dap: str | None = None,
**kwargs,
) -> Curve:
# TODO review the docstring
@ -342,7 +324,6 @@ class Waveform(PlotBase):
y_entry(str): The name of the entry for the y-axis.
color(str): The color of the curve.
label(str): The label of the curve.
validate(bool): If True, validate the device names and entries.
dap(str): The dap model to use for the curve, only available for sync devices. If not specified, none will be added.
Returns:
@ -381,9 +362,6 @@ class Waveform(PlotBase):
x_data = arg1[:, 0]
y_data = arg1[:, 1]
# if y_name is None:
# raise ValueError("y_name must be provided.")
# If y_name is set => device data
if y_name is not None and x_data is None and y_data is None:
source = "device"
@ -432,7 +410,6 @@ class Waveform(PlotBase):
################################################################################
# Curve Management Methods
# TODO has to be tested
def add_dap_curve(
self, device_label: str, dap_name: str, color: str | None = None, **kwargs
) -> Curve:
@ -490,9 +467,6 @@ class Waveform(PlotBase):
# 4) Create the DAP curve config using `_add_curve(...)`
dap_curve = self._add_curve(config=config)
# 5) Immediately request a DAP update (this can trigger the pipeline)
self.request_dap_update.emit()
return dap_curve
def _add_curve(
@ -544,17 +518,14 @@ class Waveform(PlotBase):
curve.setData(x_data, y_data)
# If device => schedule BEC updates
# self._categorise_device_curves() #TODO has to be cathergorised upon creation at least with the latest scan item
if config.source == "device":
if self.scan_item is None:
self.scan_history(-1)
# self.async_signal_update.emit() #TODO not needed probably
self.update_async_curves()
self.update_sync_curves()
# self.scan_signal_update.emit() #TODO not needed probably
self.async_signal_update.emit()
self.sync_signal_update.emit()
if config.source == "dap":
self.setup_dap_for_scan()
self.request_dap_update.emit()
self.request_dap() # Request DAP update directly without blocking proxy
return curve
@ -645,7 +616,8 @@ class Waveform(PlotBase):
self._curve_clean_up(curve)
else:
raise IndexError(f"Curve order {N} out of range.") # TODO can be logged
logger.error(f"Curve order {N} out of range.")
raise IndexError(f"Curve order {N} out of range.")
def _curve_clean_up(self, curve: Curve):
"""
@ -680,20 +652,6 @@ class Waveform(PlotBase):
return True
return False
# TODO extend and implement
def _get_device_readout_priority(self, name: str):
"""
Get the type of device from the entry_validator.
Args:
name(str): Name of the device.
entry(str): Entry of the device.
Returns:
str: Type of the device.
"""
return self.READOUT_PRIORITY_HANDLER[self.dev[name].readout_priority]
def _find_curve_by_label(self, label: str) -> Curve | None:
"""
Find a curve by its label.
@ -712,8 +670,6 @@ class Waveform(PlotBase):
################################################################################
# BEC Update Methods
################################################################################
# TODO here will go bec related update slots
@SafeSlot(dict, dict)
def on_scan_status(self, msg: dict, meta: dict):
"""
@ -725,10 +681,7 @@ class Waveform(PlotBase):
meta(dict): The message metadata.
"""
current_scan_id = msg.get("scan_id", None)
print(f"Current scan id: {current_scan_id}") # TODO change to logger
# readout_priority = msg.get("readout_priority", None)
# print(readout_priority)#TODO consider removing readout priority here
if current_scan_id is None: # or readout_priority is None:
if current_scan_id is None:
return
if current_scan_id != self.scan_id:
@ -745,7 +698,7 @@ class Waveform(PlotBase):
# First trigger to sync and async data
if self._mode == "sync":
self.scan_signal_update.emit()
self.sync_signal_update.emit()
logger.info("Scan status: Sync mode")
elif self._mode == "async":
for curve in self._async_curves:
@ -753,7 +706,7 @@ class Waveform(PlotBase):
self.async_signal_update.emit()
logger.info("Scan status: Async mode")
else:
self.scan_signal_update.emit()
self.sync_signal_update.emit()
for curve in self._async_curves:
self._setup_async_curve(curve)
self.async_signal_update.emit()
@ -770,7 +723,7 @@ class Waveform(PlotBase):
msg(dict): The message content.
meta(dict): The message metadata.
"""
self.scan_signal_update.emit()
self.sync_signal_update.emit()
def _fetch_scan_data_and_access(self):
"""
@ -790,12 +743,8 @@ class Waveform(PlotBase):
return self.scan_item.live_data, "val"
else:
# Historical
# If we haven't cached the read data, do it now
# TODO looks like bug in BEC has to be discussed, so far only sync devices are accessible by history
# async_devices = self.scan_item.readout_groups.async_devices.read()
sync_devices = self.scan_item.readout_groups.monitored_devices.read()
# all_devices = {**async_devices, **sync_devices}
return (sync_devices, "value")
scan_devices = self.scan_item.devices
return (scan_devices, "value")
# @SafeSlot() #TODO from some reason TypeError: Waveform.update_sync_curves() takes 1 positional argument but 2 were given
def update_sync_curves(self):
@ -806,7 +755,12 @@ class Waveform(PlotBase):
for curve in self._sync_curves:
device_name = curve.config.signal.name
device_entry = curve.config.signal.entry
device_data = data.get(device_name, {}).get(device_entry, {}).get(access_key, None)
if access_key == "val":
device_data = data.get(device_name, {}).get(device_entry, {}).get(access_key, None)
else:
device_data = (
data.get(device_name, {}).get(device_entry, {}).read().get("value", None)
)
x_data = self._get_x_data(device_name, device_entry)
# TODO check logic for x data
if len(data) == 0: # case if the data is empty because motor is not scanned
@ -815,10 +769,9 @@ class Waveform(PlotBase):
curve.setData(x_data, device_data)
if device_data is not None and x_data is None:
curve.setData(device_data)
self.request_dap_update.emit() # TODO enable later
self.request_dap_update.emit()
def update_async_curves(self):
# TODO SO FAR DO NOT WORK, ASYNC not accessible from bec history due to lazy dict
"""
Manually load data for asynchronous device curves (in history scenario)
or re-check in live data if needed. For live scanning, typically real-time
@ -832,7 +785,12 @@ class Waveform(PlotBase):
for curve in self._async_curves:
device_name = curve.config.signal.name
device_entry = curve.config.signal.entry
device_data = data.get(device_name, {}).get(device_entry, {}).get(access_key, None)
if access_key == "val":
device_data = data.get(device_name, {}).get(device_entry, {}).get(access_key, None)
else:
device_data = (
data.get(device_name, {}).get(device_entry, {}).read().get("value", None)
)
x_data = self._get_x_data(device_name, device_entry)
@ -842,6 +800,7 @@ class Waveform(PlotBase):
curve.setData(x_data, device_data)
else:
curve.setData(device_data)
self.request_dap_update.emit()
def _setup_async_curve(self, curve: Curve):
name = curve.config.signal.name
@ -857,7 +816,7 @@ class Waveform(PlotBase):
MessageEndpoints.device_async_readback(self.scan_id, name),
from_start=True,
)
print(f"Setup async curve {name}") # TODO change to logger
logger.info(f"Setup async curve {name}")
@SafeSlot(dict, dict)
def on_async_readback(self, msg, metadata):
@ -914,6 +873,7 @@ class Waveform(PlotBase):
# @SafeSlot() #FIXME type error
def request_dap(self):
"""Request new fit for data"""
for dap_curve in self._dap_curves:
parent_label = getattr(dap_curve.config, "parent_label", None)
if not parent_label:
@ -934,18 +894,12 @@ class Waveform(PlotBase):
x_min = None
x_max = None
print(f"x_min: {x_min}, x_max: {x_max}")
# TODO implement DAP logic
msg = messages.DAPRequestMessage(
dap_cls="LmfitService1D",
dap_type="on_demand",
config={
"args": [],
"kwargs": {
"data_x": x_data,
"data_y": y_data,
}, # TODO add xmin,xmax as before -> so far do not work
"kwargs": {"data_x": x_data, "data_y": y_data},
"class_args": model._plugin_info["class_args"],
"class_kwargs": model._plugin_info["class_kwargs"],
"curve_label": dap_curve.name(),
@ -957,7 +911,6 @@ class Waveform(PlotBase):
@SafeSlot(dict, dict)
def update_dap_curves(self, msg, metadata):
"""Callback for DAP response message."""
print("Update DAP curves") # TODO change to logger
self.unblock_dap_proxy.emit()
msg_config = msg.get("dap_request", None).content.get("config", {})
@ -1005,7 +958,10 @@ class Waveform(PlotBase):
if x_entry is None:
x_entry = self.entry_validator.validate_signal(x_name, None)
# if the motor was not scanned, an empty list is returned and curves are not updated
x_data = data.get(x_name, {}).get(x_entry, {}).get(access_key, [])
if access_key == "val":
x_data = data.get(x_name, {}).get(x_entry, {}).get(access_key, None)
else:
x_data = data.get(x_name, {}).get(x_entry, {}).read().get("value", None)
new_suffix = f" [custom: {x_name}-{x_entry}]"
# 2 User wants timestamp
@ -1030,7 +986,6 @@ class Waveform(PlotBase):
self._update_x_label_suffix(new_suffix)
# 4.2 If there are sync curves, use the first device from the scan report
else:
# x_name = self.scan_item.status_message.info["scan_report_devices"][0] #TODO remove old access pattern
try:
x_name = self._ensure_str_list(
self.scan_item.metadata["bec"]["scan_report_devices"]
@ -1038,7 +993,10 @@ class Waveform(PlotBase):
except:
x_name = self.scan_item.status_message.info["scan_report_devices"][0]
x_entry = self.entry_validator.validate_signal(x_name, None)
x_data = data.get(x_name, {}).get(x_entry, {}).get(access_key, None)
if access_key == "val":
x_data = data.get(x_name, {}).get(x_entry, {}).get(access_key, None)
else:
x_data = data.get(x_name, {}).get(x_entry, {}).read().get("value", None)
new_suffix = f" [auto: {x_name}-{x_entry}]"
self._update_x_label_suffix(new_suffix)
return x_data
@ -1079,7 +1037,7 @@ class Waveform(PlotBase):
- "best_effort": Use the best effort signal.
- Custom signal name of device from BEC.
"""
print(f'Switching x-axis mode to "{mode}"') # TODO change to logger
logger.info(f'Switching x-axis mode to "{mode}"')
date_axis = pg.graphicsItems.DateAxisItem.DateAxisItem(orientation="bottom")
default_axis = pg.AxisItem(orientation="bottom")
if mode == "timestamp":
@ -1087,7 +1045,6 @@ class Waveform(PlotBase):
else:
self.plot_item.setAxisItems({"bottom": default_axis})
# TODO remove readout priority from the method
def _categorise_device_curves(self) -> str:
"""
Categorise the device curves into sync and async based on the readout priority.
@ -1168,14 +1125,9 @@ class Waveform(PlotBase):
self._categorise_device_curves()
# TODO refresh plotting data
self.setup_dap_for_scan()
self.update_sync_curves()
self.update_async_curves()
# self.scan_signal_update.emit()
# self.async_signal_update.emit()
self.request_dap_update.emit() # TODO enable later
self.request_dap()
self.sync_signal_update.emit()
self.async_signal_update.emit()
################################################################################
# Utility Methods
@ -1245,7 +1197,6 @@ if __name__ == "__main__":
widget = Waveform()
widget.show()
widget.plot(y_name="bpm4i", y_entry="bpm4i", dap="GaussianModel")
# widget.plot("bullshit")
# widget.plot(y_name="bpm4i", y_entry="bpm4i")
# widget.plot(y_name="bpm3a", y_entry="bpm3a")
sys.exit(app.exec_())