mirror of
https://github.com/bec-project/bec_widgets.git
synced 2025-07-14 11:41:49 +02:00
fix: monitor_config_validator.py changed to check .describe() instead of signals
This commit is contained in:
@ -43,13 +43,8 @@ class Signal(BaseModel):
|
|||||||
|
|
||||||
device = devices[name] # get the device to check if it has signals
|
device = devices[name] # get the device to check if it has signals
|
||||||
|
|
||||||
# Check if device have signals
|
# Get device description
|
||||||
if not hasattr(device, "signals"):
|
description = device.describe()
|
||||||
raise PydanticCustomError(
|
|
||||||
"no_device_signals",
|
|
||||||
'Device "{wrong_value}" does not have "signals" defined. Check device configuration.',
|
|
||||||
{"wrong_value": name},
|
|
||||||
)
|
|
||||||
|
|
||||||
# Validate 'entry'
|
# Validate 'entry'
|
||||||
entry = values.get("entry")
|
entry = values.get("entry")
|
||||||
@ -57,7 +52,7 @@ class Signal(BaseModel):
|
|||||||
# Set entry based on hints if not provided
|
# Set entry based on hints if not provided
|
||||||
if entry is None:
|
if entry is None:
|
||||||
entry = next(iter(device._hints), name) if hasattr(device, "_hints") else name
|
entry = next(iter(device._hints), name) if hasattr(device, "_hints") else name
|
||||||
if entry not in device.signals:
|
if entry not in description:
|
||||||
raise PydanticCustomError(
|
raise PydanticCustomError(
|
||||||
"no_entry_for_device",
|
"no_entry_for_device",
|
||||||
'Entry "{wrong_value}" not found in device "{device_name}" signals',
|
'Entry "{wrong_value}" not found in device "{device_name}" signals',
|
||||||
@ -117,6 +112,20 @@ class SourceSegmentValidator(BaseModel):
|
|||||||
signals: AxisSignal
|
signals: AxisSignal
|
||||||
|
|
||||||
|
|
||||||
|
class SourceRedisValidator(BaseModel):
|
||||||
|
"""Scan Segment source validator
|
||||||
|
Attributes:
|
||||||
|
type (str): type of source - scan_segment
|
||||||
|
endpoint (str): Endpoint reference in redis.
|
||||||
|
update (str): Update type.
|
||||||
|
"""
|
||||||
|
|
||||||
|
type: Literal["redis"]
|
||||||
|
endpoint: str
|
||||||
|
update: str
|
||||||
|
signals: dict
|
||||||
|
|
||||||
|
|
||||||
class Source(BaseModel): # TODO decide if it should stay for general Source validation
|
class Source(BaseModel): # TODO decide if it should stay for general Source validation
|
||||||
"""
|
"""
|
||||||
General source validation, includes all Optional arguments of all other sources.
|
General source validation, includes all Optional arguments of all other sources.
|
||||||
@ -126,9 +135,9 @@ class Source(BaseModel): # TODO decide if it should stay for general Source val
|
|||||||
signals (Optional[AxisSignal]): Signal for the source.
|
signals (Optional[AxisSignal]): Signal for the source.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
type: Literal["scan_segment", "history"]
|
type: Literal["scan_segment", "history", "redis"]
|
||||||
scanID: Optional[str] = None
|
scanID: Optional[str] = None
|
||||||
signals: Optional[AxisSignal] = None
|
signals: Optional[dict] = None
|
||||||
|
|
||||||
|
|
||||||
class PlotConfig(BaseModel):
|
class PlotConfig(BaseModel):
|
||||||
@ -153,14 +162,17 @@ class PlotConfig(BaseModel):
|
|||||||
"""Validate the sources of the plot configuration, based on the type of source."""
|
"""Validate the sources of the plot configuration, based on the type of source."""
|
||||||
validated_sources = []
|
validated_sources = []
|
||||||
for source in values:
|
for source in values:
|
||||||
|
# Check if source type is supported
|
||||||
Source(**source)
|
Source(**source)
|
||||||
source_type = source.get("type", None)
|
source_type = source.get("type", None)
|
||||||
|
|
||||||
# Check if source type is supported
|
# Validate source based on type
|
||||||
if source_type == "scan_segment":
|
if source_type == "scan_segment":
|
||||||
validated_sources.append(SourceSegmentValidator(**source))
|
validated_sources.append(SourceSegmentValidator(**source))
|
||||||
elif source_type == "history":
|
elif source_type == "history":
|
||||||
validated_sources.append(SourceHistoryValidator(**source))
|
validated_sources.append(SourceHistoryValidator(**source))
|
||||||
|
elif source_type == "redis":
|
||||||
|
validated_sources.append(SourceRedisValidator(**source))
|
||||||
return validated_sources
|
return validated_sources
|
||||||
|
|
||||||
|
|
||||||
|
@ -22,6 +22,7 @@ class FakeDevice:
|
|||||||
self.name = name
|
self.name = name
|
||||||
self.enabled = enabled
|
self.enabled = enabled
|
||||||
self.signals = {self.name: {"value": 1.0}}
|
self.signals = {self.name: {"value": 1.0}}
|
||||||
|
self.description = {self.name: {"source": self.name}}
|
||||||
|
|
||||||
def __contains__(self, item):
|
def __contains__(self, item):
|
||||||
return item == self.name
|
return item == self.name
|
||||||
@ -38,6 +39,14 @@ class FakeDevice:
|
|||||||
"""
|
"""
|
||||||
self.signals[self.name]["value"] = fake_value
|
self.signals[self.name]["value"] = fake_value
|
||||||
|
|
||||||
|
def describe(self) -> dict:
|
||||||
|
"""
|
||||||
|
Get the description of the device
|
||||||
|
Returns:
|
||||||
|
dict: Description of the device
|
||||||
|
"""
|
||||||
|
return self.description
|
||||||
|
|
||||||
|
|
||||||
def get_mocked_device(device_name: str):
|
def get_mocked_device(device_name: str):
|
||||||
"""
|
"""
|
||||||
@ -54,11 +63,6 @@ def mocked_client():
|
|||||||
device_names = ["samx", "gauss_bpm", "gauss_adc1", "gauss_adc2", "gauss_adc3", "bpm4i"]
|
device_names = ["samx", "gauss_bpm", "gauss_adc1", "gauss_adc2", "gauss_adc3", "bpm4i"]
|
||||||
mocked_devices = {name: get_mocked_device(name) for name in device_names}
|
mocked_devices = {name: get_mocked_device(name) for name in device_names}
|
||||||
|
|
||||||
# Adding a device with empty signals for validation tests
|
|
||||||
no_signal_device = FakeDevice(name="no_signal_device")
|
|
||||||
del no_signal_device.signals # Simulate a device with no signals
|
|
||||||
mocked_devices["no_signal_device"] = no_signal_device
|
|
||||||
|
|
||||||
# Create a MagicMock object
|
# Create a MagicMock object
|
||||||
client = MagicMock()
|
client = MagicMock()
|
||||||
|
|
||||||
|
@ -34,16 +34,6 @@ def test_signal_validation_name_not_in_bec(setup_devices):
|
|||||||
assert 'Device "non_existent_device" not found in current BEC session' in str(excinfo.value)
|
assert 'Device "non_existent_device" not found in current BEC session' in str(excinfo.value)
|
||||||
|
|
||||||
|
|
||||||
def test_signal_validation_device_has_no_signals(setup_devices):
|
|
||||||
with pytest.raises(ValidationError) as excinfo:
|
|
||||||
Signal(name="no_signal_device")
|
|
||||||
|
|
||||||
errors = excinfo.value.errors()
|
|
||||||
assert len(errors) == 1
|
|
||||||
assert errors[0]["type"] == "no_device_signals"
|
|
||||||
assert 'Device "no_signal_device" does not have "signals" defined' in errors[0]["msg"]
|
|
||||||
|
|
||||||
|
|
||||||
def test_signal_validation_entry_not_in_device(setup_devices):
|
def test_signal_validation_entry_not_in_device(setup_devices):
|
||||||
with pytest.raises(ValidationError) as excinfo:
|
with pytest.raises(ValidationError) as excinfo:
|
||||||
Signal(name="samx", entry="non_existent_entry")
|
Signal(name="samx", entry="non_existent_entry")
|
||||||
@ -103,3 +93,17 @@ def test_plot_config_history_source_type(setup_devices):
|
|||||||
assert len(plot_config.sources) == 1
|
assert len(plot_config.sources) == 1
|
||||||
assert plot_config.sources[0].type == "history"
|
assert plot_config.sources[0].type == "history"
|
||||||
assert plot_config.sources[0].scanID == "valid_scan_id"
|
assert plot_config.sources[0].scanID == "valid_scan_id"
|
||||||
|
|
||||||
|
|
||||||
|
def test_plot_config_redis_source_type(setup_devices):
|
||||||
|
history_source = {
|
||||||
|
"type": "redis",
|
||||||
|
"endpoint": "valid_endpoint",
|
||||||
|
"update": "append",
|
||||||
|
"signals": {"x": [{"name": "samx"}], "y": [{"name": "samx"}]},
|
||||||
|
}
|
||||||
|
|
||||||
|
plot_config = PlotConfig(sources=[history_source])
|
||||||
|
|
||||||
|
assert len(plot_config.sources) == 1
|
||||||
|
assert plot_config.sources[0].type == "redis"
|
||||||
|
Reference in New Issue
Block a user