mirror of
https://github.com/bec-project/ophyd_devices.git
synced 2026-02-20 17:28:42 +01:00
fix(bec_signals): update signal metadata when updating its components
This commit is contained in:
@@ -129,7 +129,7 @@ class BECMessageSignal(Signal):
|
||||
self.enabled = enabled
|
||||
self.acquisition_group = acquisition_group
|
||||
self.signals = self._unify_signals(signals)
|
||||
self.signal_metadata = signal_metadata
|
||||
self.signal_metadata = signal_metadata or {}
|
||||
self._bec_message_type = bec_message_type
|
||||
|
||||
def _unify_signals(
|
||||
@@ -540,8 +540,6 @@ class PreviewSignal(BECMessageSignal):
|
||||
transpose (bool): Whether to transpose the data for visualization.
|
||||
value (DeviceMonitorMessage | dict | None): The initial value of the signal. Defaults to None.
|
||||
"""
|
||||
self.num_rotation_90 = num_rotation_90
|
||||
self.transpose = transpose
|
||||
kwargs.pop("kind", None)
|
||||
super().__init__(
|
||||
name=name,
|
||||
@@ -556,6 +554,24 @@ class PreviewSignal(BECMessageSignal):
|
||||
**kwargs,
|
||||
)
|
||||
|
||||
@property
|
||||
def num_rotation_90(self) -> Literal[0, 1, 2, 3]:
|
||||
"""Get the number of 90 degree counter-clockwise rotations applied to the data."""
|
||||
return self.signal_metadata["num_rotation_90"]
|
||||
|
||||
@num_rotation_90.setter
|
||||
def num_rotation_90(self, value: Literal[0, 1, 2, 3]) -> None:
|
||||
self.signal_metadata["num_rotation_90"] = value
|
||||
|
||||
@property
|
||||
def transpose(self) -> bool:
|
||||
"""Get whether the data is transposed."""
|
||||
return self.signal_metadata["transpose"]
|
||||
|
||||
@transpose.setter
|
||||
def transpose(self, value: bool) -> None:
|
||||
self.signal_metadata["transpose"] = value
|
||||
|
||||
def _process_data(self, value: np.ndarray) -> np.ndarray:
|
||||
if self.ndim == 1:
|
||||
return value
|
||||
@@ -961,3 +977,12 @@ class AsyncSignal(DynamicSignal):
|
||||
status = DeviceStatus(device=self)
|
||||
status.set_finished()
|
||||
return status
|
||||
|
||||
@property
|
||||
def max_size(self) -> int:
|
||||
"""Get the maximum size of the signal buffer."""
|
||||
return self.signal_metadata["max_size"]
|
||||
|
||||
@max_size.setter
|
||||
def max_size(self, value: int) -> None:
|
||||
self.signal_metadata["max_size"] = value
|
||||
|
||||
@@ -251,7 +251,7 @@ def test_utils_bec_message_signal():
|
||||
"enabled": True,
|
||||
"rpc_access": False,
|
||||
"signals": [("bec_message_signal", 5)],
|
||||
"signal_metadata": None,
|
||||
"signal_metadata": {},
|
||||
"acquisition_group": None,
|
||||
},
|
||||
}
|
||||
@@ -298,7 +298,7 @@ def test_utils_dynamic_signal():
|
||||
"enabled": True,
|
||||
"rpc_access": False,
|
||||
"signals": [("sig1", 1), ("sig2", 1)],
|
||||
"signal_metadata": None,
|
||||
"signal_metadata": {},
|
||||
"acquisition_group": None,
|
||||
},
|
||||
}
|
||||
@@ -442,7 +442,7 @@ def test_utils_file_event_signal():
|
||||
"enabled": True,
|
||||
"rpc_access": False,
|
||||
"signals": [("file_event_signal", 5)],
|
||||
"signal_metadata": None,
|
||||
"signal_metadata": {},
|
||||
"acquisition_group": None,
|
||||
},
|
||||
}
|
||||
@@ -623,7 +623,7 @@ def test_utils_progress_signal():
|
||||
"enabled": True,
|
||||
"rpc_access": False,
|
||||
"signals": [("progress_signal", 5)],
|
||||
"signal_metadata": None,
|
||||
"signal_metadata": {},
|
||||
"acquisition_group": None,
|
||||
},
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user