0
0
mirror of https://github.com/bec-project/bec_widgets.git synced 2025-07-14 11:41:49 +02:00

WIP minor cleanup

This commit is contained in:
2025-01-26 14:26:31 +01:00
parent 4b454ecdff
commit 995960e590

View File

@ -5,9 +5,7 @@ from typing import Literal, Optional
import numpy as np
import pyqtgraph as pg
from bec_lib import bec_logger
from bec_lib import messages
from bec_lib import bec_logger, messages
from bec_lib.device import ReadoutPriority
from bec_lib.endpoints import MessageEndpoints
from pydantic import Field, field_validator
@ -51,7 +49,7 @@ class Waveform(PlotBase):
async_signal_update = Signal()
request_dap_update = Signal()
unblock_dap_proxy = Signal()
dap_signal_update = Signal()
# dap_signal_update = Signal() #TODO not needed probably
dap_params_update = Signal(dict, dict)
dap_summary_update = Signal(dict, dict)
# autorange_signal = Signal()
@ -60,7 +58,7 @@ class Waveform(PlotBase):
# roi_changed = Signal(tuple)
# roi_active = Signal(bool)
# request_dap_refresh = Signal()
# request_dap_refresh = Signal() #TODO probably replaced by request_dap_update
def __init__(
self,
parent: QWidget | None = None,
@ -104,15 +102,11 @@ class Waveform(PlotBase):
self.proxy_update_plot = pg.SignalProxy(
self.scan_signal_update, rateLimit=25, slot=self.update_sync_curves
)
# TODO this is porper blocking proxy
# self.proxy_dap_request = BECSignalProxy(
# self.request_dap_update, rateLimit=25, slot=self.request_dap
# )
# self.unblock_dap_proxy.connect(self.proxy_dap_request.unblock_proxy)
self.proxy_dap_request = pg.SignalProxy(
self.request_dap_update, rateLimit=25, slot=self.request_dap
self.proxy_dap_request = BECSignalProxy(
self.request_dap_update, rateLimit=25, slot=self.request_dap, timeout=10.0
)
self.unblock_dap_proxy.connect(self.proxy_dap_request.unblock_proxy)
# TODO implement bec proxy to request dap update
# self.async_signal_update.connect(self.replot_async_curve)
# self.autorange_signal.connect(self.auto_range)
@ -136,7 +130,7 @@ class Waveform(PlotBase):
def x_mode(self, value: str):
self._x_axis_mode["name"] = value
self._switch_x_axis_item(mode=value)
# self._update_x_label_suffix() #TODO update straight away or wait for the next scan??
# self._update_x_label_suffix() # TODO update straight away or wait for the next scan??
self.async_signal_update.emit()
self.scan_signal_update.emit()
self.request_dap_update.emit()
@ -803,7 +797,6 @@ class Waveform(PlotBase):
def setup_dap_for_scan(self):
"""Setup DAP updates for the new scan."""
print(f"Setup DAP for scan {self.scan_id}") # TODO change to logger
self.bec_dispatcher.disconnect_slot(
self.update_dap_curves,
MessageEndpoints.dap_response(f"{self.old_scan_id}-{self.gui_id}"),
@ -813,7 +806,6 @@ class Waveform(PlotBase):
self.update_dap_curves,
MessageEndpoints.dap_response(f"{self.scan_id}-{self.gui_id}"),
)
print(f"DAP setup: {self.scan_id}-{self.gui_id}") # TODO change to logger
# @SafeSlot() #FIXME type error
def request_dap(self):
@ -927,7 +919,7 @@ class Waveform(PlotBase):
else:
# x_name = self.scan_item.status_message.info["scan_report_devices"][0] #TODO remove old access pattern
try:
x_name = self.ensure_str_list(
x_name = self._ensure_str_list(
self.scan_item.metadata["bec"]["scan_report_devices"]
)[0]
except:
@ -987,20 +979,9 @@ class Waveform(PlotBase):
"""
Categorise the device curves into sync and async based on the readout priority.
"""
# TODO fetch data from the scan item history
# try:
# data = self.scan_item.live_data
# data_access = "val"
# except AttributeError:
# # TODO implement history fetch
# data = self.client.history.get_by_scan_id(
# self.scan_id
# ).readout_groups.monitored_devices.read()
# data_access = "value"
if self.scan_item is None:
self.scan_history(-1)
# data, access_key = self._fetch_scan_data_and_access() #TODO check if this could be utilized
try:
readout_priority = self.scan_item.metadata["bec"]["readout_priority"]
except:
@ -1014,8 +995,8 @@ class Waveform(PlotBase):
found_sync = False
mode = "sync"
readout_priority_async = self.ensure_str_list(readout_priority.get("async", []))
readout_priority_sync = self.ensure_str_list(readout_priority.get("monitored", []))
readout_priority_async = self._ensure_str_list(readout_priority.get("async", []))
readout_priority_sync = self._ensure_str_list(readout_priority.get("monitored", []))
# Iterate over all curves
for curve in self.curves:
@ -1068,12 +1049,6 @@ class Waveform(PlotBase):
self.scan_item = self.client.history[scan_index]
metadata = self.scan_item.metadata
self.scan_id = metadata["bec"]["scan_id"]
print(f"Scan id: {self.scan_id}") # TODO change to logger
# if scan_id is None:
# logger.error(f"Scan with index {scan_index} not found.")
# return
# self.scan_id = scan_id
# self.scan_item = scan_item
else:
self.scan_id = scan_id
self.scan_item = self.client.history.get_by_scan_id(scan_id)
@ -1086,37 +1061,27 @@ class Waveform(PlotBase):
self.update_async_curves()
# self.scan_signal_update.emit()
# self.async_signal_update.emit()
# self.request_dap_update.emit() # TODO enable later
self.request_dap_update.emit() # TODO enable later
self.request_dap()
# self.readout_priority =
################################################################################
# Utility Methods
################################################################################
def ensure_str_list(self, entries):
def _ensure_str_list(self, entries: list | tuple | np.ndarray):
"""
Convert a variety of possible inputs (string, bytes, list/tuple/ndarray of either)
into a list of Python strings.
Examples of what this handles:
- 'monitor_async' -> ['monitor_async']
- b'monitor_async' -> ['monitor_async']
- [b'monitor_async', b'eiger', b'waveform']
-> ['monitor_async', 'eiger', 'waveform']
- np.array([b'monitor_async', b'eiger'], dtype='S')
-> ['monitor_async', 'eiger']
Args:
entries:
Returns:
list[str]: A list of Python strings.
"""
# If it's already a list/tuple/ndarray, we'll convert each element recursively
if isinstance(entries, (list, tuple, np.ndarray)):
return [self._to_str(e) for e in entries]
else:
# It's a single item (string or bytes or something else),
# so just wrap the single converted item into a list:
return [self._to_str(entries)]
def _to_str(self, x):
@ -1126,7 +1091,6 @@ class Waveform(PlotBase):
"""
if isinstance(x, bytes):
return x.decode("utf-8", errors="replace")
# If already a Python string, or anything else, just cast to str:
return str(x)
################################################################################