File diff suppressed because it is too large
Load Diff
@@ -0,0 +1,715 @@
|
||||
import pytest
|
||||
import warnings
|
||||
from types import SimpleNamespace
|
||||
|
||||
from slic.core.device import Device, SimpleDevice
|
||||
from slic.core.device.device import decide_z, read_z_from_channel, recursive_adjustables
|
||||
from slic.core.device.filtered import by_type, by_name, filtered
|
||||
from slic.core.device.auto import auto
|
||||
from slic.core.adjustable import Adjustable
|
||||
from slic.core.adjustable.dummyadjustable import DummyAdjustable
|
||||
|
||||
|
||||
# Device
|
||||
|
||||
@pytest.mark.parametrize("device_id,expected_name", [
|
||||
("DEV001", "DEV001"),
|
||||
("MOTOR_X", "MOTOR_X"),
|
||||
("dev123", "dev123"),
|
||||
("X", "X"),
|
||||
("A_B_C_D", "A_B_C_D"),
|
||||
])
|
||||
def test_device_init_name_defaults_to_id(device_id, expected_name):
|
||||
dev = Device(ID=device_id)
|
||||
assert dev.ID == device_id
|
||||
assert dev.name == expected_name
|
||||
assert dev.description is None
|
||||
|
||||
|
||||
@pytest.mark.parametrize("device_id,custom_name,description", [
|
||||
("DEV001", "Motor X", "X-axis motor controller"),
|
||||
("SENS002", "Temperature Sensor", "Beamline temp sensor"),
|
||||
("UND100", "Undulator", "Main undulator device"),
|
||||
("CAM01", "Camera 1", None),
|
||||
("STAGE", "XY Stage", ""),
|
||||
])
|
||||
def test_device_init_with_custom_attributes(device_id, custom_name, description):
|
||||
dev = Device(ID=device_id, name=custom_name, description=description)
|
||||
assert dev.ID == device_id
|
||||
assert dev.name == custom_name
|
||||
assert dev.description == description
|
||||
|
||||
|
||||
@pytest.mark.parametrize("device_id,z_value", [
|
||||
("UND100", 100),
|
||||
("DEV999", 999),
|
||||
("MOTOR123", 123),
|
||||
("UND050:EXTRA", 50),
|
||||
("ABC250", 250),
|
||||
])
|
||||
def test_device_z_undulator_from_id(device_id, z_value):
|
||||
dev = Device(ID=device_id)
|
||||
assert dev.z_undulator == z_value
|
||||
|
||||
|
||||
@pytest.mark.parametrize("device_id,explicit_z", [
|
||||
("ANYID", 42),
|
||||
("NOZINID", 0),
|
||||
("TEST", 999),
|
||||
("ABC", -10),
|
||||
("XYZ", 12345),
|
||||
])
|
||||
def test_device_z_undulator_explicit(device_id, explicit_z):
|
||||
dev = Device(ID=device_id, z_undulator=explicit_z)
|
||||
assert dev.z_undulator == explicit_z
|
||||
|
||||
|
||||
def test_device_with_adjustables():
|
||||
dev = Device(ID="TESTDEV", name="Test Device")
|
||||
dev.motor_x = DummyAdjustable(ID="mx", name="Motor X")
|
||||
dev.motor_y = DummyAdjustable(ID="my", name="Motor Y")
|
||||
|
||||
assert hasattr(dev, "motor_x")
|
||||
assert hasattr(dev, "motor_y")
|
||||
assert isinstance(dev.motor_x, Adjustable)
|
||||
assert isinstance(dev.motor_y, Adjustable)
|
||||
|
||||
|
||||
def test_device_iteration():
|
||||
dev = Device(ID="TESTDEV")
|
||||
dev.z_motor = DummyAdjustable(ID="z", name="Z")
|
||||
dev.a_motor = DummyAdjustable(ID="a", name="A")
|
||||
dev.m_motor = DummyAdjustable(ID="m", name="M")
|
||||
|
||||
items = list(dev)
|
||||
|
||||
assert len(items) == 3
|
||||
assert items[0].name == "A"
|
||||
assert items[1].name == "M"
|
||||
assert items[2].name == "Z"
|
||||
|
||||
|
||||
def test_device_repr_with_adjustables():
|
||||
dev = Device(ID="TESTDEV", name="Test Device", description="A test device")
|
||||
dev.motor = DummyAdjustable(ID="m1", name="Motor1", initial_value=42)
|
||||
|
||||
repr_str = repr(dev)
|
||||
|
||||
assert "test device" in repr_str.lower()
|
||||
assert "motor" in repr_str
|
||||
|
||||
|
||||
def test_device_repr_uses_description_then_name_then_id():
|
||||
dev1 = Device(ID="ID1", name="Name1", description="Description1")
|
||||
repr1 = repr(dev1)
|
||||
assert "Description1" in repr1
|
||||
|
||||
dev2 = Device(ID="ID2", name="Name2")
|
||||
repr2 = repr(dev2)
|
||||
assert "Name2" in repr2
|
||||
|
||||
dev3 = Device(ID="ID3")
|
||||
repr3 = repr(dev3)
|
||||
assert "ID3" in repr3
|
||||
|
||||
|
||||
def test_device_nested():
|
||||
parent = Device(ID="PARENT")
|
||||
child = Device(ID="CHILD")
|
||||
child.motor = DummyAdjustable(ID="m1", name="Motor")
|
||||
parent.subsystem = child
|
||||
|
||||
adjs = recursive_adjustables(parent)
|
||||
assert "subsystem.motor" in adjs
|
||||
assert adjs["subsystem.motor"].name == "Motor"
|
||||
|
||||
|
||||
def test_device_recursive_detection():
|
||||
dev1 = Device(ID="DEV1")
|
||||
dev2 = Device(ID="DEV2")
|
||||
|
||||
dev1.sub = dev2
|
||||
dev2.parent = dev1
|
||||
|
||||
with warnings.catch_warnings(record=True) as w:
|
||||
warnings.simplefilter("always")
|
||||
adjs = recursive_adjustables(dev1)
|
||||
|
||||
assert len(w) >= 1
|
||||
assert "Recursive Device" in str(w[0].message)
|
||||
|
||||
|
||||
# Helper Functions
|
||||
|
||||
@pytest.mark.parametrize("channel_id,expected_z", [
|
||||
("UND100", 100),
|
||||
("DEV999", 999),
|
||||
("MOTOR123", 123),
|
||||
("UND001", 1),
|
||||
("TEST000", 0),
|
||||
("XYZ250", 250),
|
||||
])
|
||||
def test_read_z_from_channel_valid(channel_id, expected_z):
|
||||
assert read_z_from_channel(channel_id) == expected_z
|
||||
|
||||
|
||||
@pytest.mark.parametrize("channel_id", [
|
||||
"NODIGITS",
|
||||
"ABC",
|
||||
"TEST",
|
||||
"XYZ",
|
||||
"MOTOR_X",
|
||||
"AB1",
|
||||
"X12",
|
||||
])
|
||||
def test_read_z_from_channel_invalid(channel_id):
|
||||
assert read_z_from_channel(channel_id) is None
|
||||
|
||||
|
||||
@pytest.mark.parametrize("channel_id,colon_part", [
|
||||
("MAIN:SUB:123", "123"),
|
||||
("CH:100", "100"),
|
||||
("A:B:C:456", "456"),
|
||||
("PREFIX:789:SUFFIX", "789"),
|
||||
])
|
||||
def test_read_z_from_channel_with_colons(channel_id, colon_part):
|
||||
z = read_z_from_channel(channel_id)
|
||||
assert z is not None or colon_part[-3:].isdigit()
|
||||
|
||||
|
||||
@pytest.mark.parametrize("channel_id,explicit_z,expected", [
|
||||
("UND100", None, 100),
|
||||
("UND100", 200, 200),
|
||||
("NODIGITS", None, None),
|
||||
("NODIGITS", 42, 42),
|
||||
("TEST123", 0, 0),
|
||||
])
|
||||
def test_decide_z(channel_id, explicit_z, expected):
|
||||
assert decide_z(channel_id, explicit_z) == expected
|
||||
|
||||
|
||||
# recursive_adjustables
|
||||
|
||||
def test_recursive_adjustables_flat():
|
||||
dev = Device(ID="FLAT")
|
||||
dev.adj1 = DummyAdjustable(ID="a1", name="Adj1")
|
||||
dev.adj2 = DummyAdjustable(ID="a2", name="Adj2")
|
||||
|
||||
adjs = recursive_adjustables(dev)
|
||||
|
||||
assert len(adjs) == 2
|
||||
assert "adj1" in adjs
|
||||
assert "adj2" in adjs
|
||||
|
||||
|
||||
def test_recursive_adjustables_nested():
|
||||
parent = Device(ID="PARENT")
|
||||
child1 = Device(ID="CHILD1")
|
||||
child2 = Device(ID="CHILD2")
|
||||
|
||||
parent.adj_top = DummyAdjustable(ID="top", name="Top")
|
||||
child1.adj_c1 = DummyAdjustable(ID="c1", name="Child1")
|
||||
child2.adj_c2 = DummyAdjustable(ID="c2", name="Child2")
|
||||
|
||||
parent.subsys1 = child1
|
||||
parent.subsys2 = child2
|
||||
|
||||
adjs = recursive_adjustables(parent)
|
||||
|
||||
assert len(adjs) == 3
|
||||
assert "adj_top" in adjs
|
||||
assert "subsys1.adj_c1" in adjs
|
||||
assert "subsys2.adj_c2" in adjs
|
||||
|
||||
|
||||
def test_recursive_adjustables_deep_nesting():
|
||||
dev1 = Device(ID="L1")
|
||||
dev2 = Device(ID="L2")
|
||||
dev3 = Device(ID="L3")
|
||||
|
||||
dev3.adj = DummyAdjustable(ID="deep", name="Deep")
|
||||
dev2.level3 = dev3
|
||||
dev1.level2 = dev2
|
||||
|
||||
adjs = recursive_adjustables(dev1)
|
||||
|
||||
assert "level2.level3.adj" in adjs
|
||||
|
||||
|
||||
def test_recursive_adjustables_ignores_non_adjustables():
|
||||
dev = Device(ID="TEST")
|
||||
dev.adjustable = DummyAdjustable(ID="adj", name="Adj")
|
||||
dev.some_string = "not an adjustable"
|
||||
dev.some_number = 42
|
||||
dev.some_list = [1, 2, 3]
|
||||
|
||||
adjs = recursive_adjustables(dev)
|
||||
|
||||
assert len(adjs) == 1
|
||||
assert "adjustable" in adjs
|
||||
assert "some_string" not in adjs
|
||||
assert "some_number" not in adjs
|
||||
|
||||
|
||||
# SimpleDevice
|
||||
|
||||
@pytest.mark.parametrize("device_id,kwargs", [
|
||||
("SIMPLE1", {"x": 10, "y": 20}),
|
||||
("SIMPLE2", {"value": 42}),
|
||||
("SIMPLE3", {"a": 1, "b": 2, "c": 3}),
|
||||
("SIMPLE4", {}),
|
||||
("SIMPLE5", {"name_attr": "test", "count": 100}),
|
||||
])
|
||||
def test_simpledevice_with_kwargs(device_id, kwargs):
|
||||
dev = SimpleDevice(ID=device_id, **kwargs)
|
||||
|
||||
assert dev.ID == device_id
|
||||
for key, value in kwargs.items():
|
||||
assert hasattr(dev, key)
|
||||
assert getattr(dev, key) == value
|
||||
|
||||
|
||||
def test_simpledevice_inherits_device():
|
||||
dev = SimpleDevice(ID="SIMPLE", name="Simple Device")
|
||||
dev.motor = DummyAdjustable(ID="m1", name="Motor")
|
||||
|
||||
assert dev.name == "Simple Device", f"BUG: Expected 'Simple Device', got '{dev.name}'"
|
||||
assert hasattr(dev, "motor")
|
||||
|
||||
items = list(dev)
|
||||
assert len(items) == 1
|
||||
|
||||
|
||||
def test_simpledevice_namespace_behavior():
|
||||
dev = SimpleDevice(ID="NS", x=10, y=20, z=30)
|
||||
|
||||
assert dev.x == 10
|
||||
assert dev.y == 20
|
||||
assert dev.z == 30
|
||||
|
||||
|
||||
def test_simpledevice_mixed_adjustables_and_data():
|
||||
dev = SimpleDevice(
|
||||
ID="MIXED",
|
||||
name="Mixed Device",
|
||||
calibration=1.5,
|
||||
offset=10
|
||||
)
|
||||
dev.motor = DummyAdjustable(ID="m1", name="Motor")
|
||||
|
||||
assert dev.calibration == 1.5
|
||||
assert dev.offset == 10
|
||||
|
||||
assert hasattr(dev, "motor")
|
||||
assert isinstance(dev.motor, Adjustable)
|
||||
|
||||
|
||||
def test_simpledevice_bug_name_and_description_ignored():
|
||||
dev = SimpleDevice(
|
||||
ID="TESTID",
|
||||
name="Custom Name Should Be Used",
|
||||
description="Custom Description Should Be Used",
|
||||
z_undulator=42
|
||||
)
|
||||
|
||||
assert dev.name == "Custom Name Should Be Used", \
|
||||
f"BUG: name parameter ignored! Expected 'Custom Name Should Be Used', got '{dev.name}'"
|
||||
assert dev.description == "Custom Description Should Be Used", \
|
||||
f"BUG: description parameter ignored! Expected 'Custom Description Should Be Used', got '{dev.description}'"
|
||||
assert dev.z_undulator == 42, \
|
||||
f"BUG: z_undulator parameter ignored! Expected 42, got {dev.z_undulator}"
|
||||
|
||||
|
||||
def test_simpledevice_bug_cascade_effect():
|
||||
fake_globals = {"adj": DummyAdjustable(ID="a1", name="Adj")}
|
||||
auto_dev = auto(fake_globals, "AUTO_ID", name="Auto Name", description="Auto Desc")
|
||||
|
||||
assert auto_dev.name == "Auto Name", \
|
||||
f"BUG CASCADE in auto(): Expected 'Auto Name', got '{auto_dev.name}'"
|
||||
assert auto_dev.description == "Auto Desc", \
|
||||
f"BUG CASCADE in auto(): Expected 'Auto Desc', got {auto_dev.description}"
|
||||
|
||||
orig_dev = Device(ID="ORIG_ID", name="Original Name", description="Original Desc")
|
||||
orig_dev.motor = DummyAdjustable(ID="m1", name="Motor")
|
||||
filtered_dev = by_name(orig_dev, "motor")
|
||||
|
||||
assert filtered_dev.name == "Original Name", \
|
||||
f"BUG CASCADE in filtered(): Expected 'Original Name', got '{filtered_dev.name}'"
|
||||
assert filtered_dev.description == "Original Desc", \
|
||||
f"BUG CASCADE in filtered(): Expected 'Original Desc', got {filtered_dev.description}"
|
||||
|
||||
|
||||
# Filtering
|
||||
|
||||
def test_filtered_by_type_single():
|
||||
dev = Device(ID="TEST")
|
||||
dev.dummy1 = DummyAdjustable(ID="d1", name="Dummy1")
|
||||
dev.dummy2 = DummyAdjustable(ID="d2", name="Dummy2")
|
||||
|
||||
filtered_dev = by_type(dev, DummyAdjustable)
|
||||
|
||||
assert filtered_dev is not None
|
||||
assert hasattr(filtered_dev, "dummy1")
|
||||
assert hasattr(filtered_dev, "dummy2")
|
||||
|
||||
|
||||
def test_filtered_by_type_excludes_other_types():
|
||||
from slic.core.adjustable.genericadjustable import GenericAdjustable
|
||||
|
||||
dev = Device(ID="TEST")
|
||||
dev.dummy = DummyAdjustable(ID="d1", name="Dummy")
|
||||
dev.generic = GenericAdjustable(
|
||||
ID="g1",
|
||||
get=lambda: 0,
|
||||
set=lambda x: None,
|
||||
name="Generic"
|
||||
)
|
||||
|
||||
filtered_dev = by_type(dev, DummyAdjustable)
|
||||
|
||||
assert filtered_dev is not None
|
||||
assert hasattr(filtered_dev, "dummy")
|
||||
assert not hasattr(filtered_dev, "generic")
|
||||
|
||||
|
||||
def test_filtered_by_type_multiple_types():
|
||||
from slic.core.adjustable.genericadjustable import GenericAdjustable
|
||||
|
||||
dev = Device(ID="TEST")
|
||||
dev.dummy = DummyAdjustable(ID="d1", name="Dummy")
|
||||
dev.generic = GenericAdjustable(
|
||||
ID="g1",
|
||||
get=lambda: 0,
|
||||
set=lambda x: None,
|
||||
name="Generic"
|
||||
)
|
||||
|
||||
filtered_dev = by_type(dev, (DummyAdjustable, GenericAdjustable))
|
||||
|
||||
assert filtered_dev is not None
|
||||
assert hasattr(filtered_dev, "dummy")
|
||||
assert hasattr(filtered_dev, "generic")
|
||||
|
||||
|
||||
@pytest.mark.parametrize("pattern,expected_attrs", [
|
||||
("motor", ["motor_x", "motor_y"]),
|
||||
("x", ["motor_x", "pos_x"]),
|
||||
("y", ["motor_y", "pos_y"]),
|
||||
("pos", ["pos_x", "pos_y"]),
|
||||
("sensor", ["temp_sensor"]),
|
||||
])
|
||||
def test_filtered_by_name(pattern, expected_attrs):
|
||||
dev = Device(ID="TEST")
|
||||
dev.motor_x = DummyAdjustable(ID="mx", name="MotorX")
|
||||
dev.motor_y = DummyAdjustable(ID="my", name="MotorY")
|
||||
dev.pos_x = DummyAdjustable(ID="px", name="PosX")
|
||||
dev.pos_y = DummyAdjustable(ID="py", name="PosY")
|
||||
dev.temp_sensor = DummyAdjustable(ID="ts", name="TempSensor")
|
||||
|
||||
filtered_dev = by_name(dev, pattern)
|
||||
|
||||
assert filtered_dev is not None
|
||||
for attr in expected_attrs:
|
||||
assert hasattr(filtered_dev, attr)
|
||||
|
||||
|
||||
def test_filtered_by_name_matches_adjustable_name():
|
||||
dev = Device(ID="TEST")
|
||||
dev.attr1 = DummyAdjustable(ID="a1", name="SpecialMotor")
|
||||
dev.attr2 = DummyAdjustable(ID="a2", name="NormalSensor")
|
||||
|
||||
filtered_dev = by_name(dev, "Motor")
|
||||
|
||||
assert filtered_dev is not None
|
||||
assert hasattr(filtered_dev, "attr1")
|
||||
assert not hasattr(filtered_dev, "attr2")
|
||||
|
||||
|
||||
def test_filtered_custom_condition():
|
||||
dev = Device(ID="TEST")
|
||||
dev.small = DummyAdjustable(ID="s1", name="Small", initial_value=5)
|
||||
dev.large = DummyAdjustable(ID="l1", name="Large", initial_value=100)
|
||||
|
||||
def condition(k, v):
|
||||
return v.get_current_value() > 50
|
||||
|
||||
filtered_dev = filtered(dev, condition)
|
||||
|
||||
assert filtered_dev is not None
|
||||
assert not hasattr(filtered_dev, "small")
|
||||
assert hasattr(filtered_dev, "large")
|
||||
|
||||
|
||||
def test_filtered_nested_devices():
|
||||
parent = Device(ID="PARENT")
|
||||
child = Device(ID="CHILD")
|
||||
|
||||
parent.adj1 = DummyAdjustable(ID="a1", name="Motor1")
|
||||
child.adj2 = DummyAdjustable(ID="a2", name="Motor2")
|
||||
child.adj3 = DummyAdjustable(ID="a3", name="Sensor1")
|
||||
parent.subsystem = child
|
||||
|
||||
filtered_dev = by_name(parent, "Motor")
|
||||
|
||||
assert filtered_dev is not None
|
||||
assert hasattr(filtered_dev, "adj1")
|
||||
assert hasattr(filtered_dev, "subsystem")
|
||||
assert hasattr(filtered_dev.subsystem, "adj2")
|
||||
assert not hasattr(filtered_dev.subsystem, "adj3")
|
||||
|
||||
|
||||
def test_filtered_prunes_empty_subdevices():
|
||||
parent = Device(ID="PARENT")
|
||||
child = Device(ID="CHILD")
|
||||
|
||||
parent.motor = DummyAdjustable(ID="m1", name="Motor")
|
||||
child.sensor = DummyAdjustable(ID="s1", name="Sensor")
|
||||
parent.subsystem = child
|
||||
|
||||
filtered_dev = by_name(parent, "Motor")
|
||||
|
||||
assert filtered_dev is not None
|
||||
assert hasattr(filtered_dev, "motor")
|
||||
assert not hasattr(filtered_dev, "subsystem")
|
||||
|
||||
|
||||
def test_filtered_returns_none_when_empty():
|
||||
dev = Device(ID="TEST")
|
||||
dev.motor = DummyAdjustable(ID="m1", name="Motor")
|
||||
|
||||
filtered_dev = by_name(dev, "NOMATCH")
|
||||
|
||||
assert filtered_dev is None
|
||||
|
||||
|
||||
def test_filtered_preserves_device_metadata():
|
||||
dev = Device(ID="ORIGINAL", name="Original Name", description="Original Desc")
|
||||
dev.motor = DummyAdjustable(ID="m1", name="Motor")
|
||||
|
||||
filtered_dev = by_name(dev, "motor")
|
||||
|
||||
assert filtered_dev.ID == "ORIGINAL"
|
||||
assert filtered_dev.name == "Original Name", \
|
||||
f"BUG: Expected 'Original Name', got '{filtered_dev.name}'"
|
||||
assert filtered_dev.description == "Original Desc", \
|
||||
f"BUG: Expected 'Original Desc', got {filtered_dev.description}"
|
||||
|
||||
|
||||
# Auto Device
|
||||
|
||||
def test_auto_creates_device_from_globals():
|
||||
fake_globals = {
|
||||
"motor_x": DummyAdjustable(ID="mx", name="MotorX"),
|
||||
"motor_y": DummyAdjustable(ID="my", name="MotorY"),
|
||||
"sensor": DummyAdjustable(ID="s1", name="Sensor"),
|
||||
"_private": DummyAdjustable(ID="p1", name="Private"),
|
||||
"not_adjustable": 42,
|
||||
}
|
||||
|
||||
dev = auto(fake_globals, "AUTO_DEV")
|
||||
|
||||
assert dev.ID == "AUTO_DEV"
|
||||
assert hasattr(dev, "motor_x")
|
||||
assert hasattr(dev, "motor_y")
|
||||
assert hasattr(dev, "sensor")
|
||||
assert not hasattr(dev, "_private")
|
||||
assert not hasattr(dev, "not_adjustable")
|
||||
|
||||
|
||||
def test_auto_ignores_private_variables():
|
||||
fake_globals = {
|
||||
"public": DummyAdjustable(ID="pub", name="Public"),
|
||||
"_private": DummyAdjustable(ID="priv", name="Private"),
|
||||
"__dunder": DummyAdjustable(ID="dun", name="Dunder"),
|
||||
}
|
||||
|
||||
dev = auto(fake_globals, "TEST")
|
||||
|
||||
assert hasattr(dev, "public")
|
||||
assert not hasattr(dev, "_private")
|
||||
assert not hasattr(dev, "__dunder")
|
||||
|
||||
|
||||
def test_auto_includes_devices():
|
||||
fake_globals = {
|
||||
"adjustable": DummyAdjustable(ID="adj", name="Adj"),
|
||||
"device": Device(ID="dev", name="Dev"),
|
||||
}
|
||||
|
||||
dev = auto(fake_globals, "AUTO")
|
||||
|
||||
assert hasattr(dev, "adjustable")
|
||||
assert hasattr(dev, "device")
|
||||
assert isinstance(dev.adjustable, Adjustable)
|
||||
assert isinstance(dev.device, Device)
|
||||
|
||||
|
||||
def test_auto_with_kwargs():
|
||||
fake_globals = {
|
||||
"motor": DummyAdjustable(ID="m1", name="Motor"),
|
||||
}
|
||||
|
||||
dev = auto(fake_globals, "AUTO", name="Custom Name", description="Custom Desc")
|
||||
|
||||
assert dev.name == "Custom Name", \
|
||||
f"BUG: Expected 'Custom Name', got '{dev.name}'"
|
||||
assert dev.description == "Custom Desc", \
|
||||
f"BUG: Expected 'Custom Desc', got {dev.description}"
|
||||
assert hasattr(dev, "motor")
|
||||
|
||||
|
||||
def test_auto_empty_globals():
|
||||
fake_globals = {
|
||||
"some_string": "text",
|
||||
"some_number": 42,
|
||||
"_hidden": DummyAdjustable(ID="h1", name="Hidden"),
|
||||
}
|
||||
|
||||
dev = auto(fake_globals, "EMPTY")
|
||||
|
||||
assert dev.ID == "EMPTY"
|
||||
assert not hasattr(dev, "some_string")
|
||||
assert not hasattr(dev, "some_number")
|
||||
|
||||
|
||||
@pytest.mark.parametrize("prefix", ["_", "__", "___"])
|
||||
def test_auto_ignores_various_underscore_prefixes(prefix):
|
||||
fake_globals = {
|
||||
f"{prefix}var": DummyAdjustable(ID="v1", name="Var"),
|
||||
"public": DummyAdjustable(ID="p1", name="Public"),
|
||||
}
|
||||
|
||||
dev = auto(fake_globals, "TEST")
|
||||
|
||||
assert hasattr(dev, "public")
|
||||
assert not hasattr(dev, f"{prefix}var")
|
||||
|
||||
|
||||
# Integration
|
||||
# Tests full hierarchy: root Device with sub-devices and adjustables
|
||||
# Verifies recursive_adjustables finds all nested adjustables
|
||||
# and by_name filters correctly through the hierarchy
|
||||
|
||||
def test_full_device_hierarchy():
|
||||
root = Device(ID="ROOT", name="Root System")
|
||||
subsys1 = Device(ID="SUB1", name="Subsystem 1")
|
||||
subsys2 = Device(ID="SUB2", name="Subsystem 2")
|
||||
|
||||
root.motor = DummyAdjustable(ID="m0", name="RootMotor")
|
||||
subsys1.motor_x = DummyAdjustable(ID="mx", name="SubMotorX")
|
||||
subsys1.sensor = DummyAdjustable(ID="s1", name="SubSensor")
|
||||
subsys2.motor_y = DummyAdjustable(ID="my", name="SubMotorY")
|
||||
|
||||
root.system1 = subsys1
|
||||
root.system2 = subsys2
|
||||
|
||||
all_adjs = recursive_adjustables(root)
|
||||
assert len(all_adjs) == 4
|
||||
assert "motor" in all_adjs
|
||||
assert "system1.motor_x" in all_adjs
|
||||
assert "system1.sensor" in all_adjs
|
||||
assert "system2.motor_y" in all_adjs
|
||||
|
||||
motor_only = by_name(root, "motor")
|
||||
assert motor_only is not None
|
||||
motor_adjs = recursive_adjustables(motor_only)
|
||||
assert len(motor_adjs) == 3
|
||||
|
||||
dummy_only = by_type(root, DummyAdjustable)
|
||||
assert dummy_only is not None
|
||||
|
||||
|
||||
# Tests that Device can contain adjustables, dicts, ints
|
||||
# Only Adjustables are returned by recursive_adjustables
|
||||
|
||||
def test_device_with_mixed_content():
|
||||
dev = Device(ID="MIXED")
|
||||
dev.motor = DummyAdjustable(ID="m1", name="Motor")
|
||||
dev.config = {"key": "value"}
|
||||
dev.counter = 42
|
||||
sub = Device(ID="SUB")
|
||||
sub.sensor = DummyAdjustable(ID="s1", name="Sensor")
|
||||
dev.subsystem = sub
|
||||
|
||||
adjs = recursive_adjustables(dev)
|
||||
assert len(adjs) == 2
|
||||
assert "motor" in adjs
|
||||
assert "subsystem.sensor" in adjs
|
||||
assert "config" not in adjs
|
||||
assert "counter" not in adjs
|
||||
|
||||
|
||||
# Tests full chain: auto() creates SimpleDevice from globals
|
||||
# then by_name filters adjustables by pattern
|
||||
|
||||
def test_simpledevice_with_auto():
|
||||
fake_globals = {
|
||||
"motor_x": DummyAdjustable(ID="mx", name="MotorX"),
|
||||
"motor_y": DummyAdjustable(ID="my", name="MotorY"),
|
||||
"sensor_temp": DummyAdjustable(ID="st", name="TempSensor"),
|
||||
"sensor_pressure": DummyAdjustable(ID="sp", name="PressureSensor"),
|
||||
}
|
||||
|
||||
dev = auto(fake_globals, "AUTO", name="Auto Device")
|
||||
|
||||
assert dev.name == "Auto Device", \
|
||||
f"BUG: Expected 'Auto Device', got '{dev.name}'"
|
||||
assert hasattr(dev, "motor_x")
|
||||
assert hasattr(dev, "sensor_temp")
|
||||
|
||||
motors = by_name(dev, "motor")
|
||||
assert motors is not None
|
||||
assert hasattr(motors, "motor_x")
|
||||
assert hasattr(motors, "motor_y")
|
||||
assert not hasattr(motors, "sensor_temp")
|
||||
|
||||
sensors = by_name(dev, "sensor")
|
||||
assert sensors is not None
|
||||
assert hasattr(sensors, "sensor_temp")
|
||||
assert hasattr(sensors, "sensor_pressure")
|
||||
assert not hasattr(sensors, "motor_x")
|
||||
|
||||
|
||||
# Tests iteration on Device with sub-devices
|
||||
# Returns all adjustables sorted by name
|
||||
|
||||
def test_device_iteration_with_nested_structure():
|
||||
parent = Device(ID="PARENT")
|
||||
child1 = Device(ID="CHILD1")
|
||||
child2 = Device(ID="CHILD2")
|
||||
|
||||
parent.adj_a = DummyAdjustable(ID="a", name="A")
|
||||
child1.adj_b = DummyAdjustable(ID="b", name="B")
|
||||
child2.adj_c = DummyAdjustable(ID="c", name="C")
|
||||
|
||||
parent.sub1 = child1
|
||||
parent.sub2 = child2
|
||||
|
||||
items = list(parent)
|
||||
assert len(items) == 3
|
||||
|
||||
names = [item.name for item in items]
|
||||
assert names == ["A", "B", "C"]
|
||||
|
||||
|
||||
@pytest.mark.parametrize("num_adjustables,num_devices", [
|
||||
(5, 0),
|
||||
(0, 3),
|
||||
(3, 2),
|
||||
(10, 5),
|
||||
(1, 1),
|
||||
])
|
||||
def test_device_scalability(num_adjustables, num_devices):
|
||||
dev = Device(ID="SCALE_TEST")
|
||||
|
||||
for i in range(num_adjustables):
|
||||
setattr(dev, f"adj_{i}", DummyAdjustable(ID=f"a{i}", name=f"Adj{i}"))
|
||||
|
||||
for i in range(num_devices):
|
||||
sub = Device(ID=f"SUB{i}")
|
||||
sub.motor = DummyAdjustable(ID=f"m{i}", name=f"Motor{i}")
|
||||
setattr(dev, f"sub_{i}", sub)
|
||||
|
||||
adjs = recursive_adjustables(dev)
|
||||
expected_total = num_adjustables + num_devices
|
||||
assert len(adjs) == expected_total
|
||||
@@ -0,0 +1,276 @@
|
||||
import pytest
|
||||
from pathlib import Path
|
||||
from slic.core.scanner.runname import (
|
||||
extract_runnumber,
|
||||
extract_runnumbers,
|
||||
RunFilenameGenerator,
|
||||
EVERYTHING
|
||||
)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def tmpdir_runs(tmp_path):
|
||||
d = tmp_path / "runs"
|
||||
d.mkdir()
|
||||
return d
|
||||
|
||||
|
||||
# extract_runnumber
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"fname,prefix,separator,expected",
|
||||
[
|
||||
("scan0004_test.json", "scan", "_", 4),
|
||||
("scan0001_alpha.txt", "scan", "_", 1),
|
||||
("run-0042_demo.csv", "run-", "_", 42),
|
||||
("data9999_final.txt", "data", "_", 9999),
|
||||
("scan5_test.json", "scan", "_", 5),
|
||||
(str(Path("/tmp/data/scan0007_exp.json")), "scan", "_", 7),
|
||||
(str(Path("/Users/yasmine_tligui/test_pv/runs/scan0123_test.json")), "scan", "_", 123),
|
||||
("scan0010_report.csv", "scan", "_", 10),
|
||||
("scan0300_analysis.yaml", "scan", "_", 300),
|
||||
("experimentscan0070_trial.txt", "experimentscan", "_", 70),
|
||||
("meas:001_test.txt", "meas:", "_", 1),
|
||||
(str(Path("a/b/c/scan0042_demo.json")), "scan", "_", 42),
|
||||
("scan1234_extra_part.json", "scan", "_", 1234),
|
||||
("scan_0042_test.json", "scan_", "_", 42),
|
||||
],
|
||||
)
|
||||
def test_extract_runnumber_valid_cases(fname, prefix, separator, expected):
|
||||
assert extract_runnumber(fname, prefix, separator) == expected
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"fname,prefix,separator",
|
||||
[
|
||||
("scan00A3_test.json", "scan", "_"),
|
||||
("file_test.json", "scan", "_"),
|
||||
("scan_test.json", "scan", "_"),
|
||||
("run#_info.txt", "run#", "#"),
|
||||
("1234_scan_test.json", "scan", "_"),
|
||||
],
|
||||
)
|
||||
def test_extract_runnumber_invalid_cases(fname, prefix, separator):
|
||||
with pytest.raises(ValueError):
|
||||
extract_runnumber(fname, prefix, separator)
|
||||
|
||||
|
||||
# extract_runnumbers
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"fnames,prefix,separator,expected",
|
||||
[
|
||||
(["scan0001_a", "scan0002_b", "scan0030_c"], "scan", "_", [1, 2, 30]),
|
||||
(["scan1_a", "scan02_b", "scan300_c"], "scan", "_", [1, 2, 300]),
|
||||
(["run-0004_x", "run-0005_y", "run-0010_z"], "run-", "_", [4, 5, 10]),
|
||||
(["data9_file", "data10_file", "data11_file"], "data", "_", [9, 10, 11]),
|
||||
(["scan001--a", "scan002--b", "scan003--c"], "scan", "--", [1, 2, 3]),
|
||||
],
|
||||
)
|
||||
def test_extract_runnumbers_valid(fnames, prefix, separator, expected):
|
||||
result = extract_runnumbers(fnames, prefix, separator)
|
||||
assert result == expected
|
||||
|
||||
|
||||
# RunFilenameGenerator
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"prefix,run_index,separator,name,suffix,expected",
|
||||
[
|
||||
("pfx", "7", "-", "abc", ".txt", "pfx7-abc"),
|
||||
("scan", "001", "_", "data", "_scan_info.json", "scan001_data"),
|
||||
("run", "42", "-", "test", ".meta", "run42-test"),
|
||||
("exp", "003", "--", "trial", ".log", "exp003--trial"),
|
||||
("data", "99", "::", "final", ".json", "data99::final"),
|
||||
("experiment_", "005", "_", "measure", ".csv", "experiment_005_measure"),
|
||||
],
|
||||
)
|
||||
def test_fill_filename_pattern(prefix, run_index, separator, name, suffix, expected):
|
||||
gen = RunFilenameGenerator(
|
||||
base_dir=".",
|
||||
prefix=prefix,
|
||||
separator=separator,
|
||||
suffix=suffix
|
||||
)
|
||||
formatted = gen._fill_filename_pattern(run_index, name)
|
||||
assert formatted == expected
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"prefix,n_digits,separator,suffix,expected",
|
||||
[
|
||||
("scan", 3, "_", "_scan_info.json", "scan" + "[0-9]"*3 + "_" + EVERYTHING + "_scan_info.json"),
|
||||
("run-", 4, "-", ".json", "run-" + "[0-9]"*4 + "-" + EVERYTHING + ".json"),
|
||||
("data", 2, "#", ".meta", "data" + "[0-9]"*2 + "#" + EVERYTHING + ".meta"),
|
||||
("exp", 1, ".", "_info.txt", "exp" + "[0-9]"*1 + "." + EVERYTHING + "_info.txt"),
|
||||
],
|
||||
)
|
||||
def test_pattern_exact_match(tmpdir_runs, prefix, n_digits, separator, suffix, expected):
|
||||
gen = RunFilenameGenerator(
|
||||
base_dir=tmpdir_runs,
|
||||
prefix=prefix,
|
||||
n_digits=n_digits,
|
||||
separator=separator,
|
||||
suffix=suffix,
|
||||
)
|
||||
pattern = gen.pattern
|
||||
assert pattern == expected
|
||||
|
||||
|
||||
def test_get_existing_runnumbers_empty(monkeypatch, tmpdir_runs):
|
||||
gen = RunFilenameGenerator(tmpdir_runs)
|
||||
monkeypatch.setattr("slic.core.scanner.runname.glob_files", lambda b, p: [])
|
||||
assert gen.get_existing_runnumbers() == []
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"filenames,prefix,separator,suffix,n_digits,expected_runnumbers",
|
||||
[
|
||||
(
|
||||
["scan0001_test_scan_info.json", "scan0002_demo_scan_info.json", "scan0005_exp_scan_info.json"],
|
||||
"scan", "_", "_scan_info.json", 4,
|
||||
[1, 2, 5]
|
||||
),
|
||||
(
|
||||
["run-0010_alpha_data.json", "run-0020_beta_data.json"],
|
||||
"run-", "_", "_data.json", 4,
|
||||
[10, 20]
|
||||
),
|
||||
(
|
||||
["data01_test.meta", "data02_test.meta", "data99_test.meta"],
|
||||
"data", "_", ".meta", 2,
|
||||
[1, 2, 99]
|
||||
),
|
||||
],
|
||||
)
|
||||
def test_get_existing_runnumbers_with_files(tmpdir_runs, filenames, prefix, separator, suffix, n_digits, expected_runnumbers):
|
||||
for fname in filenames:
|
||||
(tmpdir_runs / fname).write_text("test content")
|
||||
|
||||
gen = RunFilenameGenerator(
|
||||
base_dir=tmpdir_runs,
|
||||
prefix=prefix,
|
||||
separator=separator,
|
||||
suffix=suffix,
|
||||
n_digits=n_digits,
|
||||
)
|
||||
runnums = gen.get_existing_runnumbers()
|
||||
assert sorted(runnums) == sorted(expected_runnumbers)
|
||||
|
||||
|
||||
def test_get_existing_runnumbers_mixed_files(tmpdir_runs):
|
||||
files = [
|
||||
"scan0001_test_scan_info.json",
|
||||
"scan0002_demo_scan_info.json",
|
||||
"other_file.json",
|
||||
"random_data.txt",
|
||||
"scan_without_number.json",
|
||||
]
|
||||
for fname in files:
|
||||
(tmpdir_runs / fname).write_text("test")
|
||||
|
||||
gen = RunFilenameGenerator(tmpdir_runs)
|
||||
runnums = gen.get_existing_runnumbers()
|
||||
assert sorted(runnums) == [1, 2]
|
||||
|
||||
|
||||
def test_sequential_run_generation(tmpdir_runs):
|
||||
gen = RunFilenameGenerator(tmpdir_runs)
|
||||
|
||||
first = gen.get_next_run_filename("test")
|
||||
assert first == "scan0000_test"
|
||||
(tmpdir_runs / f"{first}_scan_info.json").write_text("data")
|
||||
|
||||
second = gen.get_next_run_filename("demo")
|
||||
assert second == "scan0001_demo"
|
||||
(tmpdir_runs / f"{second}_scan_info.json").write_text("data")
|
||||
|
||||
third = gen.get_next_run_filename("exp")
|
||||
assert third == "scan0002_exp"
|
||||
|
||||
|
||||
def test_get_next_run_filename_non_contiguous(tmpdir_runs):
|
||||
files = ["scan0001_a_scan_info.json", "scan0005_b_scan_info.json", "scan0010_c_scan_info.json"]
|
||||
for fname in files:
|
||||
(tmpdir_runs / fname).write_text("test")
|
||||
|
||||
gen = RunFilenameGenerator(tmpdir_runs)
|
||||
next_file = gen.get_next_run_filename("test")
|
||||
assert next_file == "scan0011_test"
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"n_digits,expected_format",
|
||||
[
|
||||
(1, "scan0_test"),
|
||||
(2, "scan00_test"),
|
||||
(3, "scan000_test"),
|
||||
(5, "scan00000_test"),
|
||||
(6, "scan000000_test"),
|
||||
],
|
||||
)
|
||||
def test_different_n_digits(tmpdir_runs, n_digits, expected_format):
|
||||
gen = RunFilenameGenerator(
|
||||
base_dir=tmpdir_runs,
|
||||
n_digits=n_digits,
|
||||
)
|
||||
next_file = gen.get_next_run_filename("test")
|
||||
assert next_file == expected_format
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"file_structure,search_base_dir,prefix,separator,suffix,n_digits,expected_next",
|
||||
[
|
||||
(
|
||||
{
|
||||
"experiments": ["scan0001_test_scan_info.json", "scan0002_test_scan_info.json"],
|
||||
"other_folder": ["scan0003_test_scan_info.json", "scan0004_test_scan_info.json"]
|
||||
},
|
||||
"experiments",
|
||||
"scan", "_", "_scan_info.json", 4,
|
||||
"scan0003_test"
|
||||
),
|
||||
(
|
||||
{
|
||||
"project/data": ["scan0001_test_scan_info.json"],
|
||||
"project/data/raw": ["scan0002_test_scan_info.json"],
|
||||
"project/backup": ["scan0003_test_scan_info.json"],
|
||||
"other_project": ["scan0004_test_scan_info.json"]
|
||||
},
|
||||
"project/data",
|
||||
"scan", "_", "_scan_info.json", 4,
|
||||
"scan0002_test"
|
||||
),
|
||||
(
|
||||
{
|
||||
"empty_dir": [],
|
||||
"full_dir": ["scan0001_test_scan_info.json", "scan0002_test_scan_info.json"],
|
||||
},
|
||||
"empty_dir",
|
||||
"scan", "_", "_scan_info.json", 4,
|
||||
"scan0000_test"
|
||||
),
|
||||
],
|
||||
)
|
||||
def test_get_next_run_filename(tmpdir, file_structure, search_base_dir, prefix, separator, suffix, n_digits, expected_next):
|
||||
for dir_path, filenames in file_structure.items():
|
||||
current_dir = tmpdir
|
||||
for part in dir_path.split("/"):
|
||||
current_dir = current_dir / part
|
||||
if not current_dir.exists():
|
||||
current_dir.mkdir()
|
||||
|
||||
for filename in filenames:
|
||||
(current_dir / filename).write("test")
|
||||
|
||||
actual_base_dir = tmpdir / search_base_dir
|
||||
|
||||
gen = RunFilenameGenerator(
|
||||
base_dir=str(actual_base_dir),
|
||||
prefix=prefix,
|
||||
n_digits=n_digits,
|
||||
separator=separator,
|
||||
suffix=suffix,
|
||||
)
|
||||
next_file = gen.get_next_run_filename("test")
|
||||
assert next_file == expected_next
|
||||
File diff suppressed because it is too large
Load Diff
@@ -0,0 +1,488 @@
|
||||
import pytest
|
||||
from slic.core.scanner.scaninfo import ScanInfo
|
||||
|
||||
|
||||
class DummyAdjustable:
|
||||
def __init__(self, name="adj", ID="id", units="u"):
|
||||
self.name = name
|
||||
self.ID = ID
|
||||
self.units = units
|
||||
|
||||
|
||||
# ScanInfo init
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"adjustables,values,suffix,expected_filename,expected_params",
|
||||
[
|
||||
(
|
||||
[DummyAdjustable()],
|
||||
[1, 2, 3],
|
||||
"_scan_info.json",
|
||||
"fileA_scan_info.json",
|
||||
{"name": ["adj"], "Id": ["id"], "units": ["u"]},
|
||||
),
|
||||
(
|
||||
[DummyAdjustable("motorX", "M1", "mm")],
|
||||
[10, 20],
|
||||
".meta",
|
||||
"fileB.meta",
|
||||
{"name": ["motorX"], "Id": ["M1"], "units": ["mm"]},
|
||||
),
|
||||
(
|
||||
[
|
||||
DummyAdjustable("motorX", "M1", "mm"),
|
||||
DummyAdjustable("stageY", "S2", "deg"),
|
||||
DummyAdjustable("lensZ", "L3", "cm"),
|
||||
],
|
||||
[1, 2, 3],
|
||||
"_extra.json",
|
||||
"fileC_extra.json",
|
||||
{
|
||||
"name": ["motorX", "stageY", "lensZ"],
|
||||
"Id": ["M1", "S2", "L3"],
|
||||
"units": ["mm", "deg", "cm"],
|
||||
},
|
||||
),
|
||||
],
|
||||
)
|
||||
def test_init_creates_expected_filename(tmp_path, adjustables, values, suffix, expected_filename, expected_params):
|
||||
base_dir = tmp_path
|
||||
filename_base = expected_filename.split("_")[0].split(".")[0]
|
||||
si = ScanInfo(filename_base, base_dir, adjustables, values, suffix=suffix)
|
||||
|
||||
assert si.filename.endswith(expected_filename)
|
||||
assert si.parameters == expected_params
|
||||
assert si.values == []
|
||||
assert si.readbacks == []
|
||||
assert si.files == []
|
||||
assert si.info == []
|
||||
|
||||
|
||||
def test_init_with_empty_adjustables(tmp_path):
|
||||
si = ScanInfo("empty_scan", tmp_path, [], [])
|
||||
assert si.names == []
|
||||
assert si.IDs == []
|
||||
assert si.units == []
|
||||
assert si.parameters == {"name": [], "Id": [], "units": []}
|
||||
|
||||
|
||||
class PartialAdjustable:
|
||||
def __init__(self, has_name=True, has_id=True, has_units=True):
|
||||
if has_name:
|
||||
self.name = "test_name"
|
||||
if has_id:
|
||||
self.ID = "test_id"
|
||||
if has_units:
|
||||
self.units = "test_units"
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"adjustable,expected_name,expected_id,expected_units",
|
||||
[
|
||||
(PartialAdjustable(has_name=False, has_id=True, has_units=True), "noName", "test_id", "test_units"),
|
||||
(PartialAdjustable(has_name=True, has_id=False, has_units=True), "test_name", "noID", "test_units"),
|
||||
(PartialAdjustable(has_name=True, has_id=True, has_units=False), "test_name", "test_id", "noUnits"),
|
||||
(PartialAdjustable(has_name=False, has_id=False, has_units=False), "noName", "noID", "noUnits"),
|
||||
],
|
||||
)
|
||||
def test_init_with_missing_attributes(tmp_path, adjustable, expected_name, expected_id, expected_units):
|
||||
si = ScanInfo("partial_scan", tmp_path, [adjustable], [0])
|
||||
assert si.names == [expected_name]
|
||||
assert si.IDs == [expected_id]
|
||||
assert si.units == [expected_units]
|
||||
|
||||
|
||||
# append
|
||||
|
||||
def test_append(tmp_path):
|
||||
si = ScanInfo("fileX", tmp_path, [DummyAdjustable("A", "1", "u")], [0])
|
||||
|
||||
si.append([1, 2, 3], [10, 20, 30], ["f1.dat", "f2.dat", "f3.dat"], {"note": "phase1"})
|
||||
assert si.values == [[1, 2, 3]]
|
||||
assert si.readbacks == [[10, 20, 30]]
|
||||
assert si.files == [["f1.dat", "f2.dat", "f3.dat"]]
|
||||
assert si.info == [{"note": "phase1"}]
|
||||
|
||||
si.append([4, 5], [40, 50], ["f4.dat", "f5.dat"], lambda: {"note": "auto_phase2"})
|
||||
|
||||
assert si.values == [[1, 2, 3], [4, 5]]
|
||||
assert si.readbacks == [[10, 20, 30], [40, 50]]
|
||||
assert si.files == [["f1.dat", "f2.dat", "f3.dat"], ["f4.dat", "f5.dat"]]
|
||||
assert si.info == [{"note": "phase1"}, {"note": "auto_phase2"}]
|
||||
|
||||
|
||||
def test_append_with_empty_lists(tmp_path):
|
||||
si = ScanInfo("empty", tmp_path, [DummyAdjustable()], [0])
|
||||
si.append([], [], [], {})
|
||||
|
||||
assert si.values == [[]]
|
||||
assert si.readbacks == [[]]
|
||||
assert si.files == [[]]
|
||||
assert si.info == [{}]
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"values_len,readbacks_len,files_len",
|
||||
[
|
||||
(3, 2, 1),
|
||||
(1, 3, 2),
|
||||
(5, 1, 5),
|
||||
],
|
||||
)
|
||||
def test_append_with_mismatched_list_lengths(tmp_path, values_len, readbacks_len, files_len):
|
||||
si = ScanInfo("mismatch", tmp_path, [DummyAdjustable()], [0])
|
||||
|
||||
values = list(range(values_len))
|
||||
readbacks = list(range(readbacks_len))
|
||||
files = [f"f{i}.dat" for i in range(files_len)]
|
||||
|
||||
si.append(values, readbacks, files, {"note": "mismatch"})
|
||||
|
||||
assert len(si.values[0]) == values_len
|
||||
assert len(si.readbacks[0]) == readbacks_len
|
||||
assert len(si.files[0]) == files_len
|
||||
|
||||
|
||||
def test_append_info_with_complex_nested_dict(tmp_path):
|
||||
si = ScanInfo("nested", tmp_path, [DummyAdjustable()], [0])
|
||||
|
||||
complex_info = {
|
||||
"level1": {
|
||||
"level2": {
|
||||
"level3": {
|
||||
"data": [1, 2, 3],
|
||||
"meta": {"key": "value"}
|
||||
}
|
||||
}
|
||||
},
|
||||
"list": [{"a": 1}, {"b": 2}]
|
||||
}
|
||||
|
||||
si.append([1], [1], ["f.dat"], complex_info)
|
||||
assert si.info[0] == complex_info
|
||||
|
||||
|
||||
def test_very_long_lists(tmp_path):
|
||||
si = ScanInfo("long", tmp_path, [DummyAdjustable()], [0])
|
||||
|
||||
long_values = list(range(10000))
|
||||
long_readbacks = list(range(10000))
|
||||
long_files = [f"file_{i}.dat" for i in range(10000)]
|
||||
|
||||
si.append(long_values, long_readbacks, long_files, {"note": "big"})
|
||||
|
||||
assert len(si.values[0]) == 10000
|
||||
assert len(si.readbacks[0]) == 10000
|
||||
assert len(si.files[0]) == 10000
|
||||
|
||||
|
||||
# info callable
|
||||
|
||||
def test_callable_info_that_raises_exception(tmp_path):
|
||||
si = ScanInfo("error_test", tmp_path, [DummyAdjustable()], [0])
|
||||
|
||||
def bad_info():
|
||||
raise ValueError("Intentional error")
|
||||
|
||||
with pytest.raises(ValueError, match="Intentional error"):
|
||||
si.append([1], [1], ["f.dat"], bad_info)
|
||||
|
||||
|
||||
def test_callable_info_returns_none(tmp_path):
|
||||
si = ScanInfo("none_test", tmp_path, [DummyAdjustable()], [0])
|
||||
si.append([1], [1], ["f.dat"], lambda: None)
|
||||
assert si.info == [None]
|
||||
|
||||
|
||||
def test_info_with_none_directly(tmp_path):
|
||||
si = ScanInfo("none_direct", tmp_path, [DummyAdjustable()], [0])
|
||||
si.append([1], [1], ["f.dat"], None)
|
||||
assert si.info == [None]
|
||||
|
||||
|
||||
# write and to_dict
|
||||
|
||||
def test_write_and_to_dict(tmp_path, monkeypatch):
|
||||
base_dir = tmp_path
|
||||
si = ScanInfo("scanTest", base_dir, [
|
||||
DummyAdjustable("motorX", "M1", "mm"),
|
||||
DummyAdjustable("stageY", "S2", "deg"),
|
||||
], [0], suffix="_info.json")
|
||||
|
||||
si.append([1.0, 2.0], [1.1, 2.1], ["f1.dat", "f2.dat"], {"phase": "init"})
|
||||
si.append([3.0, 4.0], [3.1, 4.1], ["f3.dat", "f4.dat"], {"phase": "end"})
|
||||
|
||||
last_call = {}
|
||||
def mock_json_save(data, filename):
|
||||
last_call['data'] = data
|
||||
last_call['filename'] = filename
|
||||
|
||||
monkeypatch.setattr('slic.core.scanner.scaninfo.json_save', mock_json_save)
|
||||
|
||||
si.write()
|
||||
|
||||
assert last_call['filename'] == si.filename
|
||||
|
||||
expected = si.to_dict()
|
||||
assert last_call['data'] == expected
|
||||
|
||||
|
||||
def test_to_dict_complete_structure(tmp_path):
|
||||
si = ScanInfo("scan_test", tmp_path, [DummyAdjustable("M", "ID1", "mm")], [0, 1, 2])
|
||||
|
||||
si.append([1.0], [1.1], ["f1.dat"], {"note": "test"})
|
||||
|
||||
result = si.to_dict()
|
||||
|
||||
expected_keys = {
|
||||
"scan_parameters",
|
||||
"scan_values_all",
|
||||
"scan_values",
|
||||
"scan_readbacks",
|
||||
"scan_files",
|
||||
"scan_info",
|
||||
}
|
||||
assert set(result.keys()) == expected_keys
|
||||
|
||||
assert result["scan_parameters"] == {"name": ["M"], "Id": ["ID1"], "units": ["mm"]}
|
||||
assert result["scan_values_all"] == [0, 1, 2]
|
||||
assert result["scan_values"] == [[1.0]]
|
||||
assert result["scan_readbacks"] == [[1.1]]
|
||||
assert result["scan_files"] == [["f1.dat"]]
|
||||
assert result["scan_info"] == [{"note": "test"}]
|
||||
|
||||
|
||||
# update
|
||||
|
||||
def test_update_integration(tmp_path, monkeypatch):
|
||||
si = ScanInfo("scanX", tmp_path, [DummyAdjustable("M", "ID", "mm")], [0], suffix=".json")
|
||||
|
||||
last_call = {}
|
||||
def mock_json_save(data, filename):
|
||||
last_call['data'] = data
|
||||
last_call['filename'] = filename
|
||||
|
||||
monkeypatch.setattr('slic.core.scanner.scaninfo.json_save', mock_json_save)
|
||||
|
||||
si.update([1, 2], [10, 20], ["f1.dat", "f2.dat"], {"phase": "start"})
|
||||
|
||||
assert si.values == [[1, 2]]
|
||||
assert si.readbacks == [[10, 20]]
|
||||
assert si.files == [["f1.dat", "f2.dat"]]
|
||||
assert si.info == [{"phase": "start"}]
|
||||
|
||||
assert last_call['filename'] == si.filename
|
||||
assert last_call['data'] == si.to_dict()
|
||||
|
||||
|
||||
# to_sfdaq_dict
|
||||
|
||||
def test_to_sfdaq_dict_filled_example(tmp_path):
|
||||
si = ScanInfo(
|
||||
filename_base="scanAlpha",
|
||||
base_dir=tmp_path,
|
||||
adjustables=[
|
||||
DummyAdjustable("motorX", "M1", "mm"),
|
||||
DummyAdjustable("stageY", "S2", "deg"),
|
||||
DummyAdjustable("lensZ", "L3", "cm"),
|
||||
],
|
||||
values=[0, 1, 2],
|
||||
suffix="_scan_info.json",
|
||||
)
|
||||
|
||||
result_empty = si.to_sfdaq_dict()
|
||||
assert result_empty["scan_values"] is None
|
||||
assert result_empty["scan_readbacks"] is None
|
||||
|
||||
si.append(
|
||||
[1.0, 2.0, 3.0],
|
||||
[1.1, 2.1, 3.1],
|
||||
["f1.dat"],
|
||||
{"note": "first run"}
|
||||
)
|
||||
|
||||
si.append(
|
||||
[4.0, 5.0, 6.0],
|
||||
[4.1, 5.1, 6.1],
|
||||
["f2.dat"],
|
||||
{"note": "second run"}
|
||||
)
|
||||
|
||||
result = si.to_sfdaq_dict()
|
||||
|
||||
expected_keys = {
|
||||
"scan_name", "name", "Id", "units",
|
||||
"offset", "conversion_factor",
|
||||
"scan_values", "scan_readbacks", "scan_readbacks_raw",
|
||||
}
|
||||
assert set(result.keys()) == expected_keys
|
||||
|
||||
assert result["scan_values"] == [4.0, 5.0, 6.0]
|
||||
assert result["scan_readbacks"] == [4.1, 5.1, 6.1]
|
||||
assert result["scan_readbacks_raw"] == [4.1, 5.1, 6.1]
|
||||
|
||||
assert result["scan_name"] == "scanAlpha"
|
||||
assert result["name"] == ["motorX", "stageY", "lensZ"]
|
||||
assert result["Id"] == ["M1", "S2", "L3"]
|
||||
assert result["units"] == ["mm", "deg", "cm"]
|
||||
assert result["offset"] == [0, 0, 0]
|
||||
assert result["conversion_factor"] == [1, 1, 1]
|
||||
|
||||
expected_dict = {
|
||||
"scan_name": "scanAlpha",
|
||||
"name": ["motorX", "stageY", "lensZ"],
|
||||
"Id": ["M1", "S2", "L3"],
|
||||
"units": ["mm", "deg", "cm"],
|
||||
"offset": [0, 0, 0],
|
||||
"conversion_factor": [1, 1, 1],
|
||||
"scan_values": [4.0, 5.0, 6.0],
|
||||
"scan_readbacks": [4.1, 5.1, 6.1],
|
||||
"scan_readbacks_raw": [4.1, 5.1, 6.1],
|
||||
}
|
||||
|
||||
assert result == expected_dict
|
||||
|
||||
|
||||
def test_to_sfdaq_dict_with_single_adjustable(tmp_path):
|
||||
si = ScanInfo("single", tmp_path, [DummyAdjustable("motor", "M1", "deg")], [0])
|
||||
si.append([5.0], [5.1], ["data.dat"], {"run": 1})
|
||||
|
||||
result = si.to_sfdaq_dict()
|
||||
|
||||
assert result["name"] == ["motor"]
|
||||
assert result["Id"] == ["M1"]
|
||||
assert result["units"] == ["deg"]
|
||||
assert result["offset"] == [0]
|
||||
assert result["conversion_factor"] == [1]
|
||||
assert result["scan_values"] == [5.0]
|
||||
assert result["scan_readbacks"] == [5.1]
|
||||
|
||||
|
||||
def test_to_sfdaq_dict_filename_with_slash(tmp_path):
|
||||
si = ScanInfo("path/to/scan", tmp_path, [DummyAdjustable()], [0])
|
||||
result = si.to_sfdaq_dict()
|
||||
assert result["scan_name"] == "path_to_scan"
|
||||
|
||||
|
||||
def test_to_sfdaq_dict_with_empty_adjustables_list(tmp_path):
|
||||
si = ScanInfo("empty_adj", tmp_path, [], [])
|
||||
result = si.to_sfdaq_dict()
|
||||
|
||||
assert result["name"] == []
|
||||
assert result["Id"] == []
|
||||
assert result["units"] == []
|
||||
assert result["offset"] == []
|
||||
assert result["conversion_factor"] == []
|
||||
assert result["scan_values"] is None
|
||||
assert result["scan_readbacks"] is None
|
||||
|
||||
|
||||
def test_append_then_to_sfdaq_returns_last_values(tmp_path):
|
||||
si = ScanInfo("last_test", tmp_path, [DummyAdjustable("X", "X1", "m")], [0])
|
||||
|
||||
si.append([10.0], [10.5], ["f1.dat"], {"n": 1})
|
||||
si.append([20.0], [20.5], ["f2.dat"], {"n": 2})
|
||||
si.append([30.0], [30.5], ["f3.dat"], {"n": 3})
|
||||
|
||||
result = si.to_sfdaq_dict()
|
||||
|
||||
assert result["scan_values"] == [30.0]
|
||||
assert result["scan_readbacks"] == [30.5]
|
||||
assert result["scan_readbacks_raw"] == [30.5]
|
||||
|
||||
|
||||
# repr
|
||||
|
||||
def test_repr(tmp_path):
|
||||
si = ScanInfo("test_scan", tmp_path, [DummyAdjustable()], [0], suffix="_info.json")
|
||||
expected_path = str(tmp_path / "test_scan_info.json")
|
||||
assert repr(si) == f"Scan info in {expected_path}"
|
||||
|
||||
|
||||
# filename edge cases
|
||||
|
||||
def test_filename_base_already_contains_suffix(tmp_path):
|
||||
si = ScanInfo("scan_scan_info.json", tmp_path, [DummyAdjustable()], [0], suffix="_scan_info.json")
|
||||
assert si.filename.endswith("scan_scan_info.json_scan_info.json")
|
||||
|
||||
|
||||
def test_filename_with_multiple_slashes(tmp_path):
|
||||
si = ScanInfo("path/to/deep/scan", tmp_path, [DummyAdjustable()], [0])
|
||||
result = si.to_sfdaq_dict()
|
||||
assert result["scan_name"] == "path_to_deep_scan"
|
||||
assert "/" not in result["scan_name"]
|
||||
|
||||
|
||||
def test_suffix_empty_string(tmp_path):
|
||||
si = ScanInfo("test", tmp_path, [DummyAdjustable()], [0], suffix="")
|
||||
assert si.filename == str(tmp_path / "test")
|
||||
assert not si.filename.endswith(".json")
|
||||
|
||||
|
||||
def test_suffix_no_extension(tmp_path):
|
||||
si = ScanInfo("test", tmp_path, [DummyAdjustable()], [0], suffix="_data")
|
||||
assert si.filename.endswith("test_data")
|
||||
|
||||
|
||||
def test_base_dir_with_trailing_slash(tmp_path):
|
||||
base_with_slash = str(tmp_path) + "/"
|
||||
si = ScanInfo("test", base_with_slash, [DummyAdjustable()], [0])
|
||||
assert "test_scan_info.json" in si.filename
|
||||
|
||||
|
||||
def test_filename_base_with_dots(tmp_path):
|
||||
si = ScanInfo("scan.v1.0.test", tmp_path, [DummyAdjustable()], [0], suffix=".json")
|
||||
assert "scan.v1.0.test.json" in si.filename
|
||||
|
||||
|
||||
def test_multiple_slashes_and_underscores_in_filename_base(tmp_path):
|
||||
si = ScanInfo("path/to_scan/with_underscores", tmp_path, [DummyAdjustable()], [0])
|
||||
result = si.to_sfdaq_dict()
|
||||
assert result["scan_name"] == "path_to_scan_with_underscores"
|
||||
|
||||
|
||||
def test_adjustable_with_special_characters_in_name(tmp_path):
|
||||
adj = DummyAdjustable("motor:X@123!", "ID#1", "µm/s²")
|
||||
si = ScanInfo("special", tmp_path, [adj], [0])
|
||||
|
||||
assert si.names == ["motor:X@123!"]
|
||||
assert si.IDs == ["ID#1"]
|
||||
assert si.units == ["µm/s²"]
|
||||
|
||||
|
||||
# Integration
|
||||
|
||||
# Tests complete workflow: init -> multiple updates -> verify final JSON structure
|
||||
# Verifies that each update() call correctly appends data and saves to JSON
|
||||
|
||||
def test_full_integration_workflow(tmp_path, monkeypatch):
|
||||
saved_data = []
|
||||
def mock_json_save(data, filename):
|
||||
saved_data.append({"data": data, "filename": filename})
|
||||
|
||||
monkeypatch.setattr('slic.core.scanner.scaninfo.json_save', mock_json_save)
|
||||
|
||||
si = ScanInfo(
|
||||
"integration_scan",
|
||||
tmp_path,
|
||||
[DummyAdjustable("motorA", "MA", "mm"), DummyAdjustable("motorB", "MB", "deg")],
|
||||
[0, 10, 20],
|
||||
suffix=".json"
|
||||
)
|
||||
|
||||
si.update([1.0, 2.0], [1.1, 2.1], ["file1.dat"], {"step": 1})
|
||||
assert len(saved_data) == 1
|
||||
|
||||
si.update([3.0, 4.0], [3.1, 4.1], ["file2.dat"], {"step": 2})
|
||||
assert len(saved_data) == 2
|
||||
|
||||
final_data = saved_data[-1]["data"]
|
||||
assert final_data["scan_parameters"] == {
|
||||
"name": ["motorA", "motorB"],
|
||||
"Id": ["MA", "MB"],
|
||||
"units": ["mm", "deg"]
|
||||
}
|
||||
assert final_data["scan_values"] == [[1.0, 2.0], [3.0, 4.0]]
|
||||
assert final_data["scan_readbacks"] == [[1.1, 2.1], [3.1, 4.1]]
|
||||
assert final_data["scan_files"] == [["file1.dat"], ["file2.dat"]]
|
||||
assert final_data["scan_info"] == [{"step": 1}, {"step": 2}]
|
||||
Reference in New Issue
Block a user