0
0
mirror of https://github.com/bec-project/bec_widgets.git synced 2025-07-14 11:41:49 +02:00

WIP dap is updated after adding curve with plot

This commit is contained in:
2025-01-26 13:31:17 +01:00
parent 9e9399db23
commit 4b454ecdff

View File

@ -87,6 +87,7 @@ class Waveform(PlotBase):
self.old_scan_id = None
self.scan_id = None
self.scan_item = None
self.readout_priority = None
self._x_axis_mode = {
"name": "auto",
"entry": None,
@ -103,11 +104,15 @@ class Waveform(PlotBase):
self.proxy_update_plot = pg.SignalProxy(
self.scan_signal_update, rateLimit=25, slot=self.update_sync_curves
)
self.proxy_dap_request = BECSignalProxy(
# TODO this is porper blocking proxy
# self.proxy_dap_request = BECSignalProxy(
# self.request_dap_update, rateLimit=25, slot=self.request_dap
# )
# self.unblock_dap_proxy.connect(self.proxy_dap_request.unblock_proxy)
self.proxy_dap_request = pg.SignalProxy(
self.request_dap_update, rateLimit=25, slot=self.request_dap
)
self.unblock_dap_proxy.connect(self.proxy_dap_request.unblock_proxy)
# TODO implement bec proxy to request dap update
# self.async_signal_update.connect(self.replot_async_curve)
# self.autorange_signal.connect(self.auto_range)
@ -117,6 +122,7 @@ class Waveform(PlotBase):
# self.bec_dispatcher.connect_slot(
# self.async_signal_update, self.update_async_curves
# ) # TODO implement
self.scan_history(-1)
################################################################################
# Widget Specific Properties
@ -249,35 +255,35 @@ class Waveform(PlotBase):
y_data = None
# 1. Custom curve logic
if x is not None and y is not None:
# 1.1 both x and y defined
if x is not None and y is not None:
source = "custom"
x_data = np.asarray(x)
y_data = np.asarray(y)
# 1.2 If user gave only arg1 + y, interpret arg1 as x_data
elif isinstance(arg1, (list, np.ndarray)) and isinstance(y, (list, np.ndarray)):
if isinstance(arg1, str):
y_name = arg1
elif isinstance(arg1, list):
if isinstance(y, list):
source = "custom"
x_data = np.asarray(arg1)
y_data = np.asarray(y)
# 1.3 If user gave only arg1 as array => treat as y_data with x=range(len(y_data))
elif isinstance(arg1, (list, np.ndarray)) and y is None:
if y is None:
source = "custom"
arr = np.asarray(arg1)
x_data = np.arange(len(arr))
y_data = arr
# 1.4 If user gave arg1 as a 2D array => interpret columns as x_data, y_data
if isinstance(arg1, np.ndarray) and arg1.ndim == 2 and arg1.shape[1] == 2 and y is None:
elif isinstance(arg1, np.ndarray) and y is None:
if arg1.ndim == 1:
source = "custom"
x_data = np.arange(len(arg1))
y_data = arg1
if arg1.ndim == 2 and arg1.shape[1] == 2:
source = "custom"
x_data = arg1[:, 0]
y_data = arg1[:, 1]
# 2. If arg1 is a string => interpret as y_name => device data
if isinstance(arg1, str):
y_name = arg1
# if y_name is None:
# raise ValueError("y_name must be provided.")
# If y_name is set => device data
if y_name is not None and x_data is None and y_data is None:
@ -384,10 +390,10 @@ class Waveform(PlotBase):
# 4) Create the DAP curve config using `_add_curve(...)`
dap_curve = self._add_curve(config=config)
# self._dap_curves.append(dap_curve)
# 5) Immediately request a DAP update (this can trigger the pipeline)
# self.request_dap_update.emit() #FIXME implement this again when blocking proxy will have timeout limit
print(f"Added DAP curve '{dap_label}'") # TODO change to logger
# self.request_dap_update.emit() # FIXME implement this again when blocking proxy will have timeout limit
return dap_curve
@ -440,9 +446,17 @@ class Waveform(PlotBase):
curve.setData(x_data, y_data)
# If device => schedule BEC updates
# self._categorise_device_curves() #TODO has to be cathergorised upon creation at least with the latest scan item
if config.source == "device":
self.async_signal_update.emit()
self.scan_signal_update.emit()
if self.scan_item is None:
self.scan_history(-1)
# self.async_signal_update.emit() #TODO not needed probably
self.update_async_curves()
self.update_sync_curves()
# self.scan_signal_update.emit() #TODO not needed probably
if config.source == "dap":
self.setup_dap_for_scan()
self.request_dap_update.emit()
return curve
@ -459,6 +473,7 @@ class Waveform(PlotBase):
"""
curve = Curve(config=config, name=name, parent_item=self)
self.plot_item.addItem(curve)
self._categorise_device_curves()
return curve
def _generate_color_from_palette(self) -> str:
@ -612,8 +627,10 @@ class Waveform(PlotBase):
meta(dict): The message metadata.
"""
current_scan_id = msg.get("scan_id", None)
readout_priority = msg.get("readout_priority", None)
if current_scan_id is None or readout_priority is None:
print(f"Current scan id: {current_scan_id}") # TODO change to logger
# readout_priority = msg.get("readout_priority", None)
# print(readout_priority)#TODO consider removing readout priority here
if current_scan_id is None: # or readout_priority is None:
return
if current_scan_id != self.scan_id:
@ -624,25 +641,26 @@ class Waveform(PlotBase):
self.auto_range_y = True
self.old_scan_id = self.scan_id
self.scan_id = current_scan_id
self.scan_item = self.queue.scan_storage.find_scan_by_ID(self.scan_id)
self.scan_item = self.queue.scan_storage.find_scan_by_ID(self.scan_id) # live scan
self._mode = self._categorise_device_curves(readout_priority)
self._mode = self._categorise_device_curves()
# First trigger to sync and async data
if self._mode == "sync":
self.scan_signal_update.emit()
print("Sync mode") # TODO change to logger
logger.info("Scan status: Sync mode")
elif self._mode == "async":
for curve in self._async_curves:
self._setup_async_curve(curve)
self.async_signal_update.emit()
print("Async mode") # TODO change to logger
logger.info("Scan status: Async mode")
else:
self.scan_signal_update.emit()
for curve in self._async_curves:
self._setup_async_curve(curve)
self.async_signal_update.emit()
print("Mixed mode") # TODO change to logger
logger.info("Scan status: Mixed mode")
logger.warning("Mixed mode - integrity of x axis cannot be guaranteed.")
self.setup_dap_for_scan()
@SafeSlot(dict, dict)
@ -656,29 +674,76 @@ class Waveform(PlotBase):
"""
self.scan_signal_update.emit()
def _fetch_scan_data_and_access(self):
"""
Decide whether we're in a live scan or history
and return the appropriate data dict and access key.
Returns:
data_dict (dict): The data structure for the current scan.
access_key (str): Either 'val' (live) or 'value' (history).
"""
if self.scan_item is None:
# Optionally fetch the latest from history if nothing is set
self.scan_history(-1)
if hasattr(self.scan_item, "live_data"):
# Live scan
return self.scan_item.live_data, "val"
else:
# Historical
# If we haven't cached the read data, do it now
# TODO looks like bug in BEC has to be discussed, so far only sync devices are accessible by history
# async_devices = self.scan_item.readout_groups.async_devices.read()
sync_devices = self.scan_item.readout_groups.monitored_devices.read()
# all_devices = {**async_devices, **sync_devices}
return (sync_devices, "value")
# @SafeSlot() #TODO from some reason TypeError: Waveform.update_sync_curves() takes 1 positional argument but 2 were given
def update_sync_curves(self):
try:
data = (
self.scan_item.live_data
if hasattr(self.scan_item, "live_data") # backward compatibility
else self.scan_item.data
)
except AttributeError:
return
"""
Update the sync curves with the latest data from the scan.
"""
data, access_key = self._fetch_scan_data_and_access()
for curve in self._sync_curves:
device_name = curve.config.signal.name
device_entry = curve.config.signal.entry
device_data = data.get(device_name, {}).get(device_entry, {}).get("val", None)
device_data = data.get(device_name, {}).get(device_entry, {}).get(access_key, None)
x_data = self._get_x_data(device_name, device_entry)
# TODO check logic for x data
if len(data) == 0: # case if the data is empty because motor is not scanned
return
if device_data is not None and x_data is not None:
curve.setData(x_data, device_data)
if device_data is not None and x_data is None:
curve.setData(device_data)
self.request_dap_update.emit()
self.request_dap_update.emit() # TODO enable later
def update_async_curves(self):
# TODO SO FAR DO NOT WORK, ASYNC not accessible from bec history due to lazy dict
"""
Manually load data for asynchronous device curves (in history scenario)
or re-check in live data if needed. For live scanning, typically real-time
updates come from on_async_readback(). But if user is browsing history,
we must fetch the final recorded data from the scan storage.
This parallels update_sync_curves(), but for self._async_curves.
"""
data, access_key = self._fetch_scan_data_and_access()
for curve in self._async_curves:
device_name = curve.config.signal.name
device_entry = curve.config.signal.entry
device_data = data.get(device_name, {}).get(device_entry, {}).get(access_key, None)
x_data = self._get_x_data(device_name, device_entry)
# If there's actual data, set it
if device_data is not None:
if x_data is not None:
curve.setData(x_data, device_data)
else:
curve.setData(device_data)
def _setup_async_curve(self, curve: Curve):
name = curve.config.signal.name
@ -748,6 +813,7 @@ class Waveform(PlotBase):
self.update_dap_curves,
MessageEndpoints.dap_response(f"{self.scan_id}-{self.gui_id}"),
)
print(f"DAP setup: {self.scan_id}-{self.gui_id}") # TODO change to logger
# @SafeSlot() #FIXME type error
def request_dap(self):
@ -824,11 +890,7 @@ class Waveform(PlotBase):
"""
x_data = None
new_suffix = None
live_data = (
self.scan_item.live_data
if hasattr(self.scan_item, "live_data")
else self.scan_item.data
)
data, access_key = self._fetch_scan_data_and_access()
# 1 User wants custom signal
# TODO extend validation
@ -838,14 +900,14 @@ class Waveform(PlotBase):
if x_entry is None:
x_entry = self.entry_validator.validate_signal(x_name, None)
# if the motor was not scanned, an empty list is returned and curves are not updated
x_data = live_data.get(x_name, {}).get(x_entry, {}).get("val", [])
x_data = data.get(x_name, {}).get(x_entry, {}).get(access_key, [])
new_suffix = f" [custom: {x_name}-{x_entry}]"
# 2 User wants timestamp
if self._x_axis_mode["name"] == "timestamp":
print("Timestamp mode") # TODO change to logger
print(f"Device name: {device_name}, entry: {device_entry}") # TODO change to logger
timestamps = live_data[device_name][device_entry].timestamps
timestamps = data[device_name][device_entry].timestamps
x_data = timestamps
new_suffix = " [timestamp]"
@ -863,9 +925,15 @@ class Waveform(PlotBase):
self._update_x_label_suffix(new_suffix)
# 4.2 If there are sync curves, use the first device from the scan report
else:
# x_name = self.scan_item.status_message.info["scan_report_devices"][0] #TODO remove old access pattern
try:
x_name = self.ensure_str_list(
self.scan_item.metadata["bec"]["scan_report_devices"]
)[0]
except:
x_name = self.scan_item.status_message.info["scan_report_devices"][0]
x_entry = self.entry_validator.validate_signal(x_name, None)
x_data = live_data.get(x_name, {}).get(x_entry, {}).get("val", None)
x_data = data.get(x_name, {}).get(x_entry, {}).get(access_key, None)
new_suffix = f" [auto: {x_name}-{x_entry}]"
self._update_x_label_suffix(new_suffix)
return x_data
@ -914,26 +982,44 @@ class Waveform(PlotBase):
else:
self.plot_item.setAxisItems({"bottom": default_axis})
def _categorise_device_curves(self, readout_priority: dict) -> str:
# TODO remove readout priority from the method
def _categorise_device_curves(self) -> str:
"""
Categorise the device curves into sync and async based on the readout priority.
Args:
readout_priority(dict): The readout priority of the scan.
"""
# TODO fetch data from the scan item history
# try:
# data = self.scan_item.live_data
# data_access = "val"
# except AttributeError:
# # TODO implement history fetch
# data = self.client.history.get_by_scan_id(
# self.scan_id
# ).readout_groups.monitored_devices.read()
# data_access = "value"
if self.scan_item is None:
self.scan_history(-1)
# data, access_key = self._fetch_scan_data_and_access() #TODO check if this could be utilized
try:
readout_priority = self.scan_item.metadata["bec"]["readout_priority"]
except:
readout_priority = self.scan_item.status_message.info["readout_priority"]
# Reset sync/async curve lists
self._async_curves = []
self._sync_curves = []
self._dap_curves = []
self._async_curves.clear()
self._sync_curves.clear()
self._dap_curves.clear()
found_async = False
found_sync = False
mode = "sync"
readout_priority_async = readout_priority.get("async", [])
readout_priority_sync = readout_priority.get("monitored", [])
readout_priority_async = self.ensure_str_list(readout_priority.get("async", []))
readout_priority_sync = self.ensure_str_list(readout_priority.get("monitored", []))
# Iterate over all curves
for curve in self.curves:
print(curve)
# categorise dap curves firsts
if curve.config.source == "dap":
self._dap_curves.append(curve)
@ -946,9 +1032,7 @@ class Waveform(PlotBase):
self._sync_curves.append(curve)
found_sync = True
else:
print(
f"Device {dev_name} not found in readout priority list."
) # TODO change to logger
logger.warning("Device {dev_name} not found in readout priority list.")
# Determine the mode of the scan
if found_async and found_sync:
@ -956,16 +1040,95 @@ class Waveform(PlotBase):
logger.warning(
f"Found both async and sync devices in the scan. X-axis integrity cannot be guaranteed."
)
# TODO do some prompt to user to decide which mode to use
elif found_async:
mode = "async"
elif found_sync:
mode = "sync"
logger.info(f"Curve acquisition mode for scan {self.scan_id}: {mode}")
logger.info(f"Scan {self.scan_id} => mode={self._mode}")
return mode
def scan_history(self, scan_index: int = None, scan_id: str = None):
"""
Update the scan curves with the data from the scan storage.
Provide only one of scan_id or scan_index.
Args:
scan_id(str, optional): ScanID of the scan to be updated. Defaults to None.
scan_index(int, optional): Index of the scan to be updated. Defaults to None.
"""
if scan_index is not None and scan_id is not None:
raise ValueError("Only one of scan_id or scan_index can be provided.")
if scan_index is None and scan_id is None:
logger.warning(f"Neither scan_id or scan_number was provided, fetching the latest scan")
scan_index = -1
if scan_index is not None:
self.scan_item = self.client.history[scan_index]
metadata = self.scan_item.metadata
self.scan_id = metadata["bec"]["scan_id"]
print(f"Scan id: {self.scan_id}") # TODO change to logger
# if scan_id is None:
# logger.error(f"Scan with index {scan_index} not found.")
# return
# self.scan_id = scan_id
# self.scan_item = scan_item
else:
self.scan_id = scan_id
self.scan_item = self.client.history.get_by_scan_id(scan_id)
self._categorise_device_curves()
# TODO refresh plotting data
self.setup_dap_for_scan()
self.update_sync_curves()
self.update_async_curves()
# self.scan_signal_update.emit()
# self.async_signal_update.emit()
# self.request_dap_update.emit() # TODO enable later
self.request_dap()
# self.readout_priority =
################################################################################
# Utility Methods
################################################################################
def ensure_str_list(self, entries):
"""
Convert a variety of possible inputs (string, bytes, list/tuple/ndarray of either)
into a list of Python strings.
Examples of what this handles:
- 'monitor_async' -> ['monitor_async']
- b'monitor_async' -> ['monitor_async']
- [b'monitor_async', b'eiger', b'waveform']
-> ['monitor_async', 'eiger', 'waveform']
- np.array([b'monitor_async', b'eiger'], dtype='S')
-> ['monitor_async', 'eiger']
Returns:
list[str]: A list of Python strings.
"""
# If it's already a list/tuple/ndarray, we'll convert each element recursively
if isinstance(entries, (list, tuple, np.ndarray)):
return [self._to_str(e) for e in entries]
else:
# It's a single item (string or bytes or something else),
# so just wrap the single converted item into a list:
return [self._to_str(entries)]
def _to_str(self, x):
"""
Convert a single object x (which may be a Python string, bytes, or something else)
into a plain Python string.
"""
if isinstance(x, bytes):
return x.decode("utf-8", errors="replace")
# If already a Python string, or anything else, just cast to str:
return str(x)
################################################################################
# Export Methods
################################################################################
@ -980,8 +1143,8 @@ if __name__ == "__main__":
set_theme("dark")
widget = Waveform()
widget.show()
widget.plot("monitor_async")
widget.plot("bullshit")
widget.plot(y_name="bpm4i", y_entry="bpm4i", dap="GaussianModel")
# widget.plot("bullshit")
# widget.plot(y_name="bpm4i", y_entry="bpm4i")
# widget.plot(y_name="bpm3a", y_entry="bpm3a")
sys.exit(app.exec_())