1
0
mirror of https://github.com/bec-project/bec_widgets.git synced 2026-01-01 19:41:18 +01:00
Files
bec_widgets/bec_widgets/tests/utils.py

286 lines
8.8 KiB
Python

from unittest.mock import MagicMock
from bec_lib.device import Device as BECDevice
from bec_lib.device import Positioner as BECPositioner
from bec_lib.device import ReadoutPriority
from bec_lib.devicemanager import DeviceContainer
class FakeDevice(BECDevice):
"""Fake minimal positioner class for testing."""
def __init__(self, name, enabled=True, readout_priority=ReadoutPriority.MONITORED):
super().__init__(name=name)
self._enabled = enabled
self.signals = {self.name: {"value": 1.0}}
self.description = {self.name: {"source": self.name, "dtype": "number", "shape": []}}
self._readout_priority = readout_priority
self._config = {
"readoutPriority": "baseline",
"deviceClass": "ophyd.Device",
"deviceConfig": {},
"deviceTags": {"user device"},
"enabled": enabled,
"readOnly": False,
"name": self.name,
}
@property
def readout_priority(self):
return self._readout_priority
@readout_priority.setter
def readout_priority(self, value):
self._readout_priority = value
@property
def limits(self) -> tuple[float, float]:
return self._limits
@limits.setter
def limits(self, value: tuple[float, float]):
self._limits = value
def __contains__(self, item):
return item == self.name
@property
def _hints(self):
return [self.name]
def set_value(self, fake_value: float = 1.0) -> None:
"""
Setup fake value for device readout
Args:
fake_value(float): Desired fake value
"""
self.signals[self.name]["value"] = fake_value
def describe(self) -> dict:
"""
Get the description of the device
Returns:
dict: Description of the device
"""
return self.description
class FakePositioner(BECPositioner):
def __init__(
self,
name,
enabled=True,
limits=None,
read_value=1.0,
readout_priority=ReadoutPriority.MONITORED,
):
super().__init__(name=name)
# self.limits = limits if limits is not None else [0.0, 0.0]
self.read_value = read_value
self.setpoint_value = read_value
self.motor_is_moving_value = 0
self._enabled = enabled
self._limits = limits
self._readout_priority = readout_priority
self.signals = {self.name: {"value": 1.0}}
self.description = {self.name: {"source": self.name, "dtype": "number", "shape": []}}
self._config = {
"readoutPriority": "baseline",
"deviceClass": "ophyd_devices.SimPositioner",
"deviceConfig": {"delay": 1, "tolerance": 0.01, "update_frequency": 400},
"deviceTags": {"user motors"},
"enabled": enabled,
"readOnly": False,
"name": self.name,
}
self._info = {
"signals": {
"readback": {
"kind_str": "hinted",
"component_name": "readback",
"obj_name": self.name,
}, # hinted
"setpoint": {
"kind_str": "normal",
"component_name": "setpoint",
"obj_name": f"{self.name}_setpoint",
}, # normal
"velocity": {
"kind_str": "config",
"component_name": "velocity",
"obj_name": f"{self.name}_velocity",
}, # config
}
}
self.signals = {
self.name: {"value": self.read_value},
f"{self.name}_setpoint": {"value": self.setpoint_value},
f"{self.name}_motor_is_moving": {"value": self.motor_is_moving_value},
}
@property
def readout_priority(self):
return self._readout_priority
@readout_priority.setter
def readout_priority(self, value):
self._readout_priority = value
@property
def enabled(self) -> bool:
return self._enabled
@enabled.setter
def enabled(self, value: bool):
self._enabled = value
@property
def limits(self) -> tuple[float, float]:
return self._limits
@limits.setter
def limits(self, value: tuple[float, float]):
self._limits = value
def __contains__(self, item):
return item == self.name
@property
def _hints(self):
return [self.name]
def set_value(self, fake_value: float = 1.0) -> None:
"""
Setup fake value for device readout
Args:
fake_value(float): Desired fake value
"""
self.read_value = fake_value
def describe(self) -> dict:
"""
Get the description of the device
Returns:
dict: Description of the device
"""
return self.description
@property
def precision(self):
return 3
def set_read_value(self, value):
self.read_value = value
def read(self, cached=False):
return self.signals
def set_limits(self, limits):
self.limits = limits
def move(self, value, relative=False):
"""Simulates moving the device to a new position."""
if relative:
self.read_value += value
else:
self.read_value = value
# Respect the limits
self.read_value = max(min(self.read_value, self.limits[1]), self.limits[0])
@property
def readback(self):
return MagicMock(get=MagicMock(return_value=self.read_value))
class Positioner(FakePositioner):
"""just placeholder for testing embedded isinstance check in DeviceCombobox"""
def __init__(self, name="test", limits=None, read_value=1.0, enabled=True):
super().__init__(name, limits=limits, read_value=read_value, enabled=enabled)
class Device(FakeDevice):
"""just placeholder for testing embedded isinstance check in DeviceCombobox"""
def __init__(self, name, enabled=True):
super().__init__(name, enabled)
class DMMock:
def __init__(self):
self.devices = DeviceContainer()
self.enabled_devices = [device for device in self.devices if device.enabled]
def add_devices(self, devices: list):
"""
Add devices to the DeviceContainer.
Args:
devices (list): List of device instances to add.
"""
for device in devices:
self.devices[device.name] = device
def get_bec_signals(self, signal_class_name: str):
"""
Emulate DeviceManager.get_bec_signals for unit-tests.
For “AsyncSignal” we list every device whose readout_priority is
ReadoutPriority.ASYNC and build a minimal tuple
(device_name, signal_name, signal_info_dict) that matches the real
API shape used by Waveform._check_async_signal_found.
"""
signals: list[tuple[str, str, dict]] = []
if signal_class_name != "AsyncSignal":
return signals
for device in self.devices.values():
if getattr(device, "readout_priority", None) == ReadoutPriority.ASYNC:
device_name = device.name
signal_name = device.name # primary signal in our mocks
signal_info = {
"component_name": signal_name,
"obj_name": signal_name,
"kind_str": "hinted",
"signal_class": signal_class_name,
"metadata": {
"connected": True,
"precision": None,
"read_access": True,
"timestamp": 0.0,
"write_access": True,
},
}
signals.append((device_name, signal_name, signal_info))
return signals
DEVICES = [
FakePositioner("samx", limits=[-10, 10], read_value=2.0),
FakePositioner("samy", limits=[-5, 5], read_value=3.0),
FakePositioner("samz", limits=[-8, 8], read_value=4.0),
FakePositioner("aptrx", limits=None, read_value=4.0),
FakePositioner("aptry", limits=None, read_value=5.0),
FakeDevice("gauss_bpm"),
FakeDevice("gauss_adc1"),
FakeDevice("gauss_adc2"),
FakeDevice("gauss_adc3"),
FakeDevice("bpm4i"),
FakeDevice("bpm3a"),
FakeDevice("bpm3i"),
FakeDevice("eiger", readout_priority=ReadoutPriority.ASYNC),
FakeDevice("waveform1d"),
FakeDevice("async_device", readout_priority=ReadoutPriority.ASYNC),
Positioner("test", limits=[-10, 10], read_value=2.0),
Device("test_device"),
]
def check_remote_data_size(widget, plot_name, num_elements):
"""
Check if the remote data has the correct number of elements.
Used in the qtbot.waitUntil function.
"""
return len(widget.get_all_data()[plot_name]["x"]) == num_elements