refactor: change type hints

This commit is contained in:
2026-02-13 13:15:21 +01:00
parent 32b5294343
commit f85f8b0447
2 changed files with 34 additions and 29 deletions

View File

@@ -27,7 +27,7 @@ import os
import threading
import uuid
from enum import StrEnum
from typing import TYPE_CHECKING, Any, Callable, TypeAlias, Union
from typing import TYPE_CHECKING, Any, Callable, TypeAlias, TypedDict
import pandablocks.commands as pbc
from bec_lib import bec_logger
@@ -138,16 +138,21 @@ class PandaState(StrEnum):
# pylint: disable=invalid-name
LITERAL_PANDA_COMMANDS: TypeAlias = Union[
pbc.Raw,
pbc.Arm,
pbc.Disarm,
pbc.GetChanges,
pbc.GetBlockInfo,
pbc.GetFieldInfo,
pbc.GetPcapBitsLabels,
]
LITERAL_PANDA_DATA: TypeAlias = Union[ReadyData, StartData, FrameData, EndData, Data]
LITERAL_PANDA_COMMANDS: TypeAlias = (
pbc.Raw
| pbc.Arm
| pbc.Disarm
| pbc.GetChanges
| pbc.GetBlockInfo
| pbc.GetFieldInfo
| pbc.GetPcapBitsLabels
)
LITERAL_PANDA_DATA: TypeAlias = ReadyData | StartData | FrameData | EndData | Data
class DataCallback(TypedDict):
callback: Callable[[LITERAL_PANDA_DATA], None]
data_type: PandaState
class PandaBox(PSIDeviceBase):
@@ -197,10 +202,10 @@ class PandaBox(PSIDeviceBase):
self._panda_state: PandaState | str = PandaState.DISARMED.value
# Status callback management
self._status_callbacks: dict[uuid.UUID, dict[str, Any]] = {}
self._status_callbacks: dict[str, dict[str, Any]] = {}
# Data callbacks management
self._data_callbacks: dict[uuid.UUID, Callable[[LITERAL_PANDA_DATA], None]] = {}
self._data_callbacks: dict[str, DataCallback] = {}
# Thread to receive data from the PandaBox
self.data_thread: threading.Thread = threading.Thread(
@@ -226,7 +231,7 @@ class PandaBox(PSIDeviceBase):
### Public API methods ###
##########################
def send_raw(self, raw_command: Union[str, list[str]]) -> Any:
def send_raw(self, raw_command: str | list[str]) -> Any:
"""
Send a raw command to the PandaBox. This can be used to set for example
values on PandaBox block fields directly, e.g. 'BITS.B=1' to set the BITS.B field to 1.
@@ -245,6 +250,8 @@ class PandaBox(PSIDeviceBase):
- ['PULSE1.DELAY.UNITS=s', PULSE1.DELAY=0, PULSE1.WIDTH.UNITS=s, PULSE1.WIDTH=0.001] to set multiple fields at once
- '*CAPTURE?' to inspect which signals have been configured for capture (PCAP?) TODO to check
"""
if isinstance(raw_command, str):
raw_command = [raw_command]
return self._send_command(pbc.Raw(raw_command))
def add_status_callback(
@@ -310,7 +317,7 @@ class PandaBox(PSIDeviceBase):
def add_data_callback(
self,
callback: Callable[[LITERAL_PANDA_DATA], None],
data_type: PandaState = PandaState.FRAME.value,
data_type: PandaState = PandaState.FRAME,
) -> str:
"""
Register a data callback to be called whenever new data is received from the PandaBox.
@@ -406,19 +413,19 @@ class PandaBox(PSIDeviceBase):
for data in client.data(scaled=False):
if isinstance(data, ReadyData):
self._run_status_callbacks(PandaState.READY)
self._run_data_callbacks(data, PandaState.READY.value)
self._run_data_callbacks(data, PandaState.READY)
elif isinstance(data, StartData):
self._run_status_callbacks(PandaState.START)
self._run_data_callbacks(data, PandaState.START.value)
self._run_data_callbacks(data, PandaState.START)
elif isinstance(data, FrameData):
self._run_status_callbacks(PandaState.FRAME)
self._run_data_callbacks(data, PandaState.FRAME.value)
self._run_data_callbacks(data, PandaState.FRAME)
elif isinstance(data, EndData):
self._run_status_callbacks(PandaState.END)
self._run_data_callbacks(data, PandaState.END.value)
self._run_data_callbacks(data, PandaState.END)
break # Exit data readout loop
finally:
@@ -441,7 +448,7 @@ class PandaBox(PSIDeviceBase):
# As DISARMED is not triggered by a data message, we manually run data callbacks for it here
# and run it with an empty Data() object following the base class for data message responses
# of the pandablocks library.
self._run_data_callbacks(Data(), PandaState.DISARMED.value)
self._run_data_callbacks(Data(), PandaState.DISARMED)
def _run_status_callbacks(self, event: PandaState) -> None:
"""
@@ -522,7 +529,7 @@ class PandaBox(PSIDeviceBase):
raise e from e
super().on_connected()
self.data_thread.start()
self.add_data_callback(data_type=PandaState.FRAME.value, callback=self._receive_frame_data)
self.add_data_callback(data_type=PandaState.FRAME, callback=self._receive_frame_data)
def _receive_frame_data(self, data: FrameData) -> None:
logger.info(f"Received frame data with signals {data}")

View File

@@ -79,6 +79,9 @@ class SignalInfo(BaseModel):
)
_SignalsTypes = list[tuple[str, str | Kind]] | list[str] | str | None
class BECMessageSignal(Signal):
"""
Custom signal class that accepts BECMessage objects as values.
@@ -98,12 +101,7 @@ class BECMessageSignal(Signal):
role: Literal["main", "preview", "diagnostic", "file event", "progress"] = "main",
acquisition_group: Literal["baseline", "monitored"] | str | None = None,
enabled: bool = True,
signals: (
Callable[[], list[str]]
| Callable[[], list[tuple[str, str | Kind]]]
| list[tuple[str, str | Kind] | str]
| None
) = None,
signals: _SignalsTypes | Callable[[], _SignalsTypes] = None,
signal_metadata: dict | None = None,
**kwargs,
):
@@ -140,7 +138,7 @@ class BECMessageSignal(Signal):
self._bec_message_type = bec_message_type
def _unify_signals(
self, signals: Callable[[], list[str]] | list[tuple[str, str | Kind] | str] | str | None
self, signals: _SignalsTypes | Callable[[], _SignalsTypes]
) -> list[tuple[str, int]]:
"""
Unify the signals list to a list of tuples with signal name and kind.
@@ -151,7 +149,7 @@ class BECMessageSignal(Signal):
Returns:
list[tuple[str, str]]: The unified list of signals.
"""
if isinstance(signals, Callable):
if callable(signals):
signals = signals()
if signals is None:
return [(self.name, Kind.hinted.value)] # Default to signal name with hinted kind