mirror of
https://github.com/bec-project/bec_widgets.git
synced 2025-12-30 10:41:18 +01:00
fix(ringprogressbar): various fixes and improvements
This commit is contained in:
@@ -12,8 +12,8 @@ from bec_widgets.utils import BECConnector, ConnectionConfig
|
||||
|
||||
|
||||
class ProgressbarConnections(BaseModel):
|
||||
slot: Literal["on_scan_progress", "on_device_readback"] = None
|
||||
endpoint: EndpointInfo | str = None
|
||||
slot: Literal["on_scan_progress", "on_device_readback", None] = None
|
||||
endpoint: EndpointInfo | str | None = None
|
||||
model_config: dict = {"validate_assignment": True}
|
||||
|
||||
@field_validator("endpoint")
|
||||
@@ -222,9 +222,10 @@ class Ring(BECConnector, QObject):
|
||||
device(str): Device name for the device readback mode, only used when mode is "device"
|
||||
"""
|
||||
if mode == "manual":
|
||||
self.bec_dispatcher.disconnect_slot(
|
||||
getattr(self, self.config.connections.slot), self.config.connections.endpoint
|
||||
)
|
||||
if self.config.connections.slot is not None:
|
||||
self.bec_dispatcher.disconnect_slot(
|
||||
getattr(self, self.config.connections.slot), self.config.connections.endpoint
|
||||
)
|
||||
self.config.connections.slot = None
|
||||
self.config.connections.endpoint = None
|
||||
elif mode == "scan":
|
||||
|
||||
@@ -22,13 +22,9 @@ class RingProgressBarConfig(ConnectionConfig):
|
||||
color_map: Optional[str] = Field(
|
||||
"plasma", description="Color scheme for the progress bars.", validate_default=True
|
||||
)
|
||||
min_number_of_bars: int | None = Field(
|
||||
1, description="Minimum number of progress bars to display."
|
||||
)
|
||||
max_number_of_bars: int | None = Field(
|
||||
10, description="Maximum number of progress bars to display."
|
||||
)
|
||||
num_bars: int | None = Field(1, description="Number of progress bars to display.")
|
||||
min_number_of_bars: int = Field(1, description="Minimum number of progress bars to display.")
|
||||
max_number_of_bars: int = Field(10, description="Maximum number of progress bars to display.")
|
||||
num_bars: int = Field(1, description="Number of progress bars to display.")
|
||||
gap: int | None = Field(20, description="Gap between progress bars.")
|
||||
auto_updates: bool | None = Field(
|
||||
True, description="Enable or disable updates based on scan queue status."
|
||||
@@ -242,7 +238,7 @@ class RingProgressBar(BECWidget, QWidget):
|
||||
for i, ring in enumerate(self._rings):
|
||||
ring.config.index = i
|
||||
|
||||
def set_precision(self, precision: int, bar_index: int = None):
|
||||
def set_precision(self, precision: int, bar_index: int | None = None):
|
||||
"""
|
||||
Set the precision for the progress bars. If bar_index is not provide, the precision will be set for all progress bars.
|
||||
|
||||
@@ -271,9 +267,9 @@ class RingProgressBar(BECWidget, QWidget):
|
||||
min_values(int|float | list[float]): Minimum value(s) for the progress bars. If multiple progress bars are displayed, provide a list of minimum values for each progress bar.
|
||||
max_values(int|float | list[float]): Maximum value(s) for the progress bars. If multiple progress bars are displayed, provide a list of maximum values for each progress bar.
|
||||
"""
|
||||
if isinstance(min_values, int) or isinstance(min_values, float):
|
||||
if isinstance(min_values, (int, float)):
|
||||
min_values = [min_values]
|
||||
if isinstance(max_values, int) or isinstance(max_values, float):
|
||||
if isinstance(max_values, (int, float)):
|
||||
max_values = [max_values]
|
||||
min_values = self._adjust_list_to_bars(min_values)
|
||||
max_values = self._adjust_list_to_bars(max_values)
|
||||
@@ -441,14 +437,10 @@ class RingProgressBar(BECWidget, QWidget):
|
||||
Returns:
|
||||
Ring: Ring object.
|
||||
"""
|
||||
found_ring = None
|
||||
for ring in self._rings:
|
||||
if ring.config.index == index:
|
||||
found_ring = ring
|
||||
break
|
||||
if found_ring is None:
|
||||
raise ValueError(f"Ring with index {index} not found.")
|
||||
return found_ring
|
||||
return ring
|
||||
raise ValueError(f"Ring with index {index} not found.")
|
||||
|
||||
def enable_auto_updates(self, enable: bool = True):
|
||||
"""
|
||||
@@ -485,29 +477,30 @@ class RingProgressBar(BECWidget, QWidget):
|
||||
primary_queue = msg.get("queue").get("primary")
|
||||
info = primary_queue.get("info", None)
|
||||
|
||||
if info:
|
||||
active_request_block = info[0].get("active_request_block", None)
|
||||
if active_request_block:
|
||||
report_instructions = active_request_block.get("report_instructions", None)
|
||||
if report_instructions:
|
||||
instruction_type = list(report_instructions[0].keys())[0]
|
||||
if instruction_type == "scan_progress":
|
||||
self._hook_scan_progress(ring_index=0)
|
||||
elif instruction_type == "readback":
|
||||
devices = report_instructions[0].get("readback").get("devices")
|
||||
start = report_instructions[0].get("readback").get("start")
|
||||
end = report_instructions[0].get("readback").get("end")
|
||||
if self.config.num_bars != len(devices):
|
||||
self.set_number_of_bars(len(devices))
|
||||
for index, device in enumerate(devices):
|
||||
self._hook_readback(index, device, start[index], end[index])
|
||||
else:
|
||||
logger.error(f"{instruction_type} not supported yet.")
|
||||
if not info:
|
||||
return
|
||||
active_request_block = info[0].get("active_request_block", None)
|
||||
if not active_request_block:
|
||||
return
|
||||
report_instructions = active_request_block.get("report_instructions", None)
|
||||
if not report_instructions:
|
||||
return
|
||||
|
||||
# elif instruction_type == "device_progress":
|
||||
# print("hook device_progress")
|
||||
instruction_type = list(report_instructions[0].keys())[0]
|
||||
if instruction_type == "scan_progress":
|
||||
self._hook_scan_progress(ring_index=0)
|
||||
elif instruction_type == "readback":
|
||||
devices = report_instructions[0].get("readback").get("devices")
|
||||
start = report_instructions[0].get("readback").get("start")
|
||||
end = report_instructions[0].get("readback").get("end")
|
||||
if self.config.num_bars != len(devices):
|
||||
self.set_number_of_bars(len(devices))
|
||||
for index, device in enumerate(devices):
|
||||
self._hook_readback(index, device, start[index], end[index])
|
||||
else:
|
||||
logger.error(f"{instruction_type} not supported yet.")
|
||||
|
||||
def _hook_scan_progress(self, ring_index: int = None):
|
||||
def _hook_scan_progress(self, ring_index: int | None = None):
|
||||
"""
|
||||
Hook the scan progress to the progress bars.
|
||||
|
||||
@@ -521,8 +514,7 @@ class RingProgressBar(BECWidget, QWidget):
|
||||
|
||||
if ring.config.connections.slot == "on_scan_progress":
|
||||
return
|
||||
else:
|
||||
ring.set_connections("on_scan_progress", MessageEndpoints.scan_progress())
|
||||
ring.set_connections("on_scan_progress", MessageEndpoints.scan_progress())
|
||||
|
||||
def _hook_readback(self, bar_index: int, device: str, min: float | int, max: float | int):
|
||||
"""
|
||||
@@ -576,6 +568,8 @@ class RingProgressBar(BECWidget, QWidget):
|
||||
return bar_index
|
||||
|
||||
def paintEvent(self, event):
|
||||
if not self._rings:
|
||||
return
|
||||
painter = QtGui.QPainter(self)
|
||||
painter.setRenderHint(QtGui.QPainter.Antialiasing)
|
||||
size = min(self.width(), self.height())
|
||||
@@ -628,9 +622,8 @@ class RingProgressBar(BECWidget, QWidget):
|
||||
return QSize(10, 10)
|
||||
ring_widths = [self.config.rings[i].line_width for i in range(self.config.num_bars)]
|
||||
total_width = sum(ring_widths) + self.config.gap * (self.config.num_bars - 1)
|
||||
diameter = total_width * 2
|
||||
if diameter < 50:
|
||||
diameter = 50
|
||||
diameter = max(total_width * 2, 50)
|
||||
|
||||
return QSize(diameter, diameter)
|
||||
|
||||
def sizeHint(self):
|
||||
|
||||
Reference in New Issue
Block a user