Upload files to "tests"
Run CI Tests / test (push) Successful in 2m19s

This commit is contained in:
2026-01-21 19:16:28 +01:00
parent 1cf98a1d72
commit 2c7b11248b
5 changed files with 4329 additions and 0 deletions
File diff suppressed because it is too large Load Diff
+715
View File
@@ -0,0 +1,715 @@
import pytest
import warnings
from types import SimpleNamespace
from slic.core.device import Device, SimpleDevice
from slic.core.device.device import decide_z, read_z_from_channel, recursive_adjustables
from slic.core.device.filtered import by_type, by_name, filtered
from slic.core.device.auto import auto
from slic.core.adjustable import Adjustable
from slic.core.adjustable.dummyadjustable import DummyAdjustable
# Device
@pytest.mark.parametrize("device_id,expected_name", [
("DEV001", "DEV001"),
("MOTOR_X", "MOTOR_X"),
("dev123", "dev123"),
("X", "X"),
("A_B_C_D", "A_B_C_D"),
])
def test_device_init_name_defaults_to_id(device_id, expected_name):
dev = Device(ID=device_id)
assert dev.ID == device_id
assert dev.name == expected_name
assert dev.description is None
@pytest.mark.parametrize("device_id,custom_name,description", [
("DEV001", "Motor X", "X-axis motor controller"),
("SENS002", "Temperature Sensor", "Beamline temp sensor"),
("UND100", "Undulator", "Main undulator device"),
("CAM01", "Camera 1", None),
("STAGE", "XY Stage", ""),
])
def test_device_init_with_custom_attributes(device_id, custom_name, description):
dev = Device(ID=device_id, name=custom_name, description=description)
assert dev.ID == device_id
assert dev.name == custom_name
assert dev.description == description
@pytest.mark.parametrize("device_id,z_value", [
("UND100", 100),
("DEV999", 999),
("MOTOR123", 123),
("UND050:EXTRA", 50),
("ABC250", 250),
])
def test_device_z_undulator_from_id(device_id, z_value):
dev = Device(ID=device_id)
assert dev.z_undulator == z_value
@pytest.mark.parametrize("device_id,explicit_z", [
("ANYID", 42),
("NOZINID", 0),
("TEST", 999),
("ABC", -10),
("XYZ", 12345),
])
def test_device_z_undulator_explicit(device_id, explicit_z):
dev = Device(ID=device_id, z_undulator=explicit_z)
assert dev.z_undulator == explicit_z
def test_device_with_adjustables():
dev = Device(ID="TESTDEV", name="Test Device")
dev.motor_x = DummyAdjustable(ID="mx", name="Motor X")
dev.motor_y = DummyAdjustable(ID="my", name="Motor Y")
assert hasattr(dev, "motor_x")
assert hasattr(dev, "motor_y")
assert isinstance(dev.motor_x, Adjustable)
assert isinstance(dev.motor_y, Adjustable)
def test_device_iteration():
dev = Device(ID="TESTDEV")
dev.z_motor = DummyAdjustable(ID="z", name="Z")
dev.a_motor = DummyAdjustable(ID="a", name="A")
dev.m_motor = DummyAdjustable(ID="m", name="M")
items = list(dev)
assert len(items) == 3
assert items[0].name == "A"
assert items[1].name == "M"
assert items[2].name == "Z"
def test_device_repr_with_adjustables():
dev = Device(ID="TESTDEV", name="Test Device", description="A test device")
dev.motor = DummyAdjustable(ID="m1", name="Motor1", initial_value=42)
repr_str = repr(dev)
assert "test device" in repr_str.lower()
assert "motor" in repr_str
def test_device_repr_uses_description_then_name_then_id():
dev1 = Device(ID="ID1", name="Name1", description="Description1")
repr1 = repr(dev1)
assert "Description1" in repr1
dev2 = Device(ID="ID2", name="Name2")
repr2 = repr(dev2)
assert "Name2" in repr2
dev3 = Device(ID="ID3")
repr3 = repr(dev3)
assert "ID3" in repr3
def test_device_nested():
parent = Device(ID="PARENT")
child = Device(ID="CHILD")
child.motor = DummyAdjustable(ID="m1", name="Motor")
parent.subsystem = child
adjs = recursive_adjustables(parent)
assert "subsystem.motor" in adjs
assert adjs["subsystem.motor"].name == "Motor"
def test_device_recursive_detection():
dev1 = Device(ID="DEV1")
dev2 = Device(ID="DEV2")
dev1.sub = dev2
dev2.parent = dev1
with warnings.catch_warnings(record=True) as w:
warnings.simplefilter("always")
adjs = recursive_adjustables(dev1)
assert len(w) >= 1
assert "Recursive Device" in str(w[0].message)
# Helper Functions
@pytest.mark.parametrize("channel_id,expected_z", [
("UND100", 100),
("DEV999", 999),
("MOTOR123", 123),
("UND001", 1),
("TEST000", 0),
("XYZ250", 250),
])
def test_read_z_from_channel_valid(channel_id, expected_z):
assert read_z_from_channel(channel_id) == expected_z
@pytest.mark.parametrize("channel_id", [
"NODIGITS",
"ABC",
"TEST",
"XYZ",
"MOTOR_X",
"AB1",
"X12",
])
def test_read_z_from_channel_invalid(channel_id):
assert read_z_from_channel(channel_id) is None
@pytest.mark.parametrize("channel_id,colon_part", [
("MAIN:SUB:123", "123"),
("CH:100", "100"),
("A:B:C:456", "456"),
("PREFIX:789:SUFFIX", "789"),
])
def test_read_z_from_channel_with_colons(channel_id, colon_part):
z = read_z_from_channel(channel_id)
assert z is not None or colon_part[-3:].isdigit()
@pytest.mark.parametrize("channel_id,explicit_z,expected", [
("UND100", None, 100),
("UND100", 200, 200),
("NODIGITS", None, None),
("NODIGITS", 42, 42),
("TEST123", 0, 0),
])
def test_decide_z(channel_id, explicit_z, expected):
assert decide_z(channel_id, explicit_z) == expected
# recursive_adjustables
def test_recursive_adjustables_flat():
dev = Device(ID="FLAT")
dev.adj1 = DummyAdjustable(ID="a1", name="Adj1")
dev.adj2 = DummyAdjustable(ID="a2", name="Adj2")
adjs = recursive_adjustables(dev)
assert len(adjs) == 2
assert "adj1" in adjs
assert "adj2" in adjs
def test_recursive_adjustables_nested():
parent = Device(ID="PARENT")
child1 = Device(ID="CHILD1")
child2 = Device(ID="CHILD2")
parent.adj_top = DummyAdjustable(ID="top", name="Top")
child1.adj_c1 = DummyAdjustable(ID="c1", name="Child1")
child2.adj_c2 = DummyAdjustable(ID="c2", name="Child2")
parent.subsys1 = child1
parent.subsys2 = child2
adjs = recursive_adjustables(parent)
assert len(adjs) == 3
assert "adj_top" in adjs
assert "subsys1.adj_c1" in adjs
assert "subsys2.adj_c2" in adjs
def test_recursive_adjustables_deep_nesting():
dev1 = Device(ID="L1")
dev2 = Device(ID="L2")
dev3 = Device(ID="L3")
dev3.adj = DummyAdjustable(ID="deep", name="Deep")
dev2.level3 = dev3
dev1.level2 = dev2
adjs = recursive_adjustables(dev1)
assert "level2.level3.adj" in adjs
def test_recursive_adjustables_ignores_non_adjustables():
dev = Device(ID="TEST")
dev.adjustable = DummyAdjustable(ID="adj", name="Adj")
dev.some_string = "not an adjustable"
dev.some_number = 42
dev.some_list = [1, 2, 3]
adjs = recursive_adjustables(dev)
assert len(adjs) == 1
assert "adjustable" in adjs
assert "some_string" not in adjs
assert "some_number" not in adjs
# SimpleDevice
@pytest.mark.parametrize("device_id,kwargs", [
("SIMPLE1", {"x": 10, "y": 20}),
("SIMPLE2", {"value": 42}),
("SIMPLE3", {"a": 1, "b": 2, "c": 3}),
("SIMPLE4", {}),
("SIMPLE5", {"name_attr": "test", "count": 100}),
])
def test_simpledevice_with_kwargs(device_id, kwargs):
dev = SimpleDevice(ID=device_id, **kwargs)
assert dev.ID == device_id
for key, value in kwargs.items():
assert hasattr(dev, key)
assert getattr(dev, key) == value
def test_simpledevice_inherits_device():
dev = SimpleDevice(ID="SIMPLE", name="Simple Device")
dev.motor = DummyAdjustable(ID="m1", name="Motor")
assert dev.name == "Simple Device", f"BUG: Expected 'Simple Device', got '{dev.name}'"
assert hasattr(dev, "motor")
items = list(dev)
assert len(items) == 1
def test_simpledevice_namespace_behavior():
dev = SimpleDevice(ID="NS", x=10, y=20, z=30)
assert dev.x == 10
assert dev.y == 20
assert dev.z == 30
def test_simpledevice_mixed_adjustables_and_data():
dev = SimpleDevice(
ID="MIXED",
name="Mixed Device",
calibration=1.5,
offset=10
)
dev.motor = DummyAdjustable(ID="m1", name="Motor")
assert dev.calibration == 1.5
assert dev.offset == 10
assert hasattr(dev, "motor")
assert isinstance(dev.motor, Adjustable)
def test_simpledevice_bug_name_and_description_ignored():
dev = SimpleDevice(
ID="TESTID",
name="Custom Name Should Be Used",
description="Custom Description Should Be Used",
z_undulator=42
)
assert dev.name == "Custom Name Should Be Used", \
f"BUG: name parameter ignored! Expected 'Custom Name Should Be Used', got '{dev.name}'"
assert dev.description == "Custom Description Should Be Used", \
f"BUG: description parameter ignored! Expected 'Custom Description Should Be Used', got '{dev.description}'"
assert dev.z_undulator == 42, \
f"BUG: z_undulator parameter ignored! Expected 42, got {dev.z_undulator}"
def test_simpledevice_bug_cascade_effect():
fake_globals = {"adj": DummyAdjustable(ID="a1", name="Adj")}
auto_dev = auto(fake_globals, "AUTO_ID", name="Auto Name", description="Auto Desc")
assert auto_dev.name == "Auto Name", \
f"BUG CASCADE in auto(): Expected 'Auto Name', got '{auto_dev.name}'"
assert auto_dev.description == "Auto Desc", \
f"BUG CASCADE in auto(): Expected 'Auto Desc', got {auto_dev.description}"
orig_dev = Device(ID="ORIG_ID", name="Original Name", description="Original Desc")
orig_dev.motor = DummyAdjustable(ID="m1", name="Motor")
filtered_dev = by_name(orig_dev, "motor")
assert filtered_dev.name == "Original Name", \
f"BUG CASCADE in filtered(): Expected 'Original Name', got '{filtered_dev.name}'"
assert filtered_dev.description == "Original Desc", \
f"BUG CASCADE in filtered(): Expected 'Original Desc', got {filtered_dev.description}"
# Filtering
def test_filtered_by_type_single():
dev = Device(ID="TEST")
dev.dummy1 = DummyAdjustable(ID="d1", name="Dummy1")
dev.dummy2 = DummyAdjustable(ID="d2", name="Dummy2")
filtered_dev = by_type(dev, DummyAdjustable)
assert filtered_dev is not None
assert hasattr(filtered_dev, "dummy1")
assert hasattr(filtered_dev, "dummy2")
def test_filtered_by_type_excludes_other_types():
from slic.core.adjustable.genericadjustable import GenericAdjustable
dev = Device(ID="TEST")
dev.dummy = DummyAdjustable(ID="d1", name="Dummy")
dev.generic = GenericAdjustable(
ID="g1",
get=lambda: 0,
set=lambda x: None,
name="Generic"
)
filtered_dev = by_type(dev, DummyAdjustable)
assert filtered_dev is not None
assert hasattr(filtered_dev, "dummy")
assert not hasattr(filtered_dev, "generic")
def test_filtered_by_type_multiple_types():
from slic.core.adjustable.genericadjustable import GenericAdjustable
dev = Device(ID="TEST")
dev.dummy = DummyAdjustable(ID="d1", name="Dummy")
dev.generic = GenericAdjustable(
ID="g1",
get=lambda: 0,
set=lambda x: None,
name="Generic"
)
filtered_dev = by_type(dev, (DummyAdjustable, GenericAdjustable))
assert filtered_dev is not None
assert hasattr(filtered_dev, "dummy")
assert hasattr(filtered_dev, "generic")
@pytest.mark.parametrize("pattern,expected_attrs", [
("motor", ["motor_x", "motor_y"]),
("x", ["motor_x", "pos_x"]),
("y", ["motor_y", "pos_y"]),
("pos", ["pos_x", "pos_y"]),
("sensor", ["temp_sensor"]),
])
def test_filtered_by_name(pattern, expected_attrs):
dev = Device(ID="TEST")
dev.motor_x = DummyAdjustable(ID="mx", name="MotorX")
dev.motor_y = DummyAdjustable(ID="my", name="MotorY")
dev.pos_x = DummyAdjustable(ID="px", name="PosX")
dev.pos_y = DummyAdjustable(ID="py", name="PosY")
dev.temp_sensor = DummyAdjustable(ID="ts", name="TempSensor")
filtered_dev = by_name(dev, pattern)
assert filtered_dev is not None
for attr in expected_attrs:
assert hasattr(filtered_dev, attr)
def test_filtered_by_name_matches_adjustable_name():
dev = Device(ID="TEST")
dev.attr1 = DummyAdjustable(ID="a1", name="SpecialMotor")
dev.attr2 = DummyAdjustable(ID="a2", name="NormalSensor")
filtered_dev = by_name(dev, "Motor")
assert filtered_dev is not None
assert hasattr(filtered_dev, "attr1")
assert not hasattr(filtered_dev, "attr2")
def test_filtered_custom_condition():
dev = Device(ID="TEST")
dev.small = DummyAdjustable(ID="s1", name="Small", initial_value=5)
dev.large = DummyAdjustable(ID="l1", name="Large", initial_value=100)
def condition(k, v):
return v.get_current_value() > 50
filtered_dev = filtered(dev, condition)
assert filtered_dev is not None
assert not hasattr(filtered_dev, "small")
assert hasattr(filtered_dev, "large")
def test_filtered_nested_devices():
parent = Device(ID="PARENT")
child = Device(ID="CHILD")
parent.adj1 = DummyAdjustable(ID="a1", name="Motor1")
child.adj2 = DummyAdjustable(ID="a2", name="Motor2")
child.adj3 = DummyAdjustable(ID="a3", name="Sensor1")
parent.subsystem = child
filtered_dev = by_name(parent, "Motor")
assert filtered_dev is not None
assert hasattr(filtered_dev, "adj1")
assert hasattr(filtered_dev, "subsystem")
assert hasattr(filtered_dev.subsystem, "adj2")
assert not hasattr(filtered_dev.subsystem, "adj3")
def test_filtered_prunes_empty_subdevices():
parent = Device(ID="PARENT")
child = Device(ID="CHILD")
parent.motor = DummyAdjustable(ID="m1", name="Motor")
child.sensor = DummyAdjustable(ID="s1", name="Sensor")
parent.subsystem = child
filtered_dev = by_name(parent, "Motor")
assert filtered_dev is not None
assert hasattr(filtered_dev, "motor")
assert not hasattr(filtered_dev, "subsystem")
def test_filtered_returns_none_when_empty():
dev = Device(ID="TEST")
dev.motor = DummyAdjustable(ID="m1", name="Motor")
filtered_dev = by_name(dev, "NOMATCH")
assert filtered_dev is None
def test_filtered_preserves_device_metadata():
dev = Device(ID="ORIGINAL", name="Original Name", description="Original Desc")
dev.motor = DummyAdjustable(ID="m1", name="Motor")
filtered_dev = by_name(dev, "motor")
assert filtered_dev.ID == "ORIGINAL"
assert filtered_dev.name == "Original Name", \
f"BUG: Expected 'Original Name', got '{filtered_dev.name}'"
assert filtered_dev.description == "Original Desc", \
f"BUG: Expected 'Original Desc', got {filtered_dev.description}"
# Auto Device
def test_auto_creates_device_from_globals():
fake_globals = {
"motor_x": DummyAdjustable(ID="mx", name="MotorX"),
"motor_y": DummyAdjustable(ID="my", name="MotorY"),
"sensor": DummyAdjustable(ID="s1", name="Sensor"),
"_private": DummyAdjustable(ID="p1", name="Private"),
"not_adjustable": 42,
}
dev = auto(fake_globals, "AUTO_DEV")
assert dev.ID == "AUTO_DEV"
assert hasattr(dev, "motor_x")
assert hasattr(dev, "motor_y")
assert hasattr(dev, "sensor")
assert not hasattr(dev, "_private")
assert not hasattr(dev, "not_adjustable")
def test_auto_ignores_private_variables():
fake_globals = {
"public": DummyAdjustable(ID="pub", name="Public"),
"_private": DummyAdjustable(ID="priv", name="Private"),
"__dunder": DummyAdjustable(ID="dun", name="Dunder"),
}
dev = auto(fake_globals, "TEST")
assert hasattr(dev, "public")
assert not hasattr(dev, "_private")
assert not hasattr(dev, "__dunder")
def test_auto_includes_devices():
fake_globals = {
"adjustable": DummyAdjustable(ID="adj", name="Adj"),
"device": Device(ID="dev", name="Dev"),
}
dev = auto(fake_globals, "AUTO")
assert hasattr(dev, "adjustable")
assert hasattr(dev, "device")
assert isinstance(dev.adjustable, Adjustable)
assert isinstance(dev.device, Device)
def test_auto_with_kwargs():
fake_globals = {
"motor": DummyAdjustable(ID="m1", name="Motor"),
}
dev = auto(fake_globals, "AUTO", name="Custom Name", description="Custom Desc")
assert dev.name == "Custom Name", \
f"BUG: Expected 'Custom Name', got '{dev.name}'"
assert dev.description == "Custom Desc", \
f"BUG: Expected 'Custom Desc', got {dev.description}"
assert hasattr(dev, "motor")
def test_auto_empty_globals():
fake_globals = {
"some_string": "text",
"some_number": 42,
"_hidden": DummyAdjustable(ID="h1", name="Hidden"),
}
dev = auto(fake_globals, "EMPTY")
assert dev.ID == "EMPTY"
assert not hasattr(dev, "some_string")
assert not hasattr(dev, "some_number")
@pytest.mark.parametrize("prefix", ["_", "__", "___"])
def test_auto_ignores_various_underscore_prefixes(prefix):
fake_globals = {
f"{prefix}var": DummyAdjustable(ID="v1", name="Var"),
"public": DummyAdjustable(ID="p1", name="Public"),
}
dev = auto(fake_globals, "TEST")
assert hasattr(dev, "public")
assert not hasattr(dev, f"{prefix}var")
# Integration
# Tests full hierarchy: root Device with sub-devices and adjustables
# Verifies recursive_adjustables finds all nested adjustables
# and by_name filters correctly through the hierarchy
def test_full_device_hierarchy():
root = Device(ID="ROOT", name="Root System")
subsys1 = Device(ID="SUB1", name="Subsystem 1")
subsys2 = Device(ID="SUB2", name="Subsystem 2")
root.motor = DummyAdjustable(ID="m0", name="RootMotor")
subsys1.motor_x = DummyAdjustable(ID="mx", name="SubMotorX")
subsys1.sensor = DummyAdjustable(ID="s1", name="SubSensor")
subsys2.motor_y = DummyAdjustable(ID="my", name="SubMotorY")
root.system1 = subsys1
root.system2 = subsys2
all_adjs = recursive_adjustables(root)
assert len(all_adjs) == 4
assert "motor" in all_adjs
assert "system1.motor_x" in all_adjs
assert "system1.sensor" in all_adjs
assert "system2.motor_y" in all_adjs
motor_only = by_name(root, "motor")
assert motor_only is not None
motor_adjs = recursive_adjustables(motor_only)
assert len(motor_adjs) == 3
dummy_only = by_type(root, DummyAdjustable)
assert dummy_only is not None
# Tests that Device can contain adjustables, dicts, ints
# Only Adjustables are returned by recursive_adjustables
def test_device_with_mixed_content():
dev = Device(ID="MIXED")
dev.motor = DummyAdjustable(ID="m1", name="Motor")
dev.config = {"key": "value"}
dev.counter = 42
sub = Device(ID="SUB")
sub.sensor = DummyAdjustable(ID="s1", name="Sensor")
dev.subsystem = sub
adjs = recursive_adjustables(dev)
assert len(adjs) == 2
assert "motor" in adjs
assert "subsystem.sensor" in adjs
assert "config" not in adjs
assert "counter" not in adjs
# Tests full chain: auto() creates SimpleDevice from globals
# then by_name filters adjustables by pattern
def test_simpledevice_with_auto():
fake_globals = {
"motor_x": DummyAdjustable(ID="mx", name="MotorX"),
"motor_y": DummyAdjustable(ID="my", name="MotorY"),
"sensor_temp": DummyAdjustable(ID="st", name="TempSensor"),
"sensor_pressure": DummyAdjustable(ID="sp", name="PressureSensor"),
}
dev = auto(fake_globals, "AUTO", name="Auto Device")
assert dev.name == "Auto Device", \
f"BUG: Expected 'Auto Device', got '{dev.name}'"
assert hasattr(dev, "motor_x")
assert hasattr(dev, "sensor_temp")
motors = by_name(dev, "motor")
assert motors is not None
assert hasattr(motors, "motor_x")
assert hasattr(motors, "motor_y")
assert not hasattr(motors, "sensor_temp")
sensors = by_name(dev, "sensor")
assert sensors is not None
assert hasattr(sensors, "sensor_temp")
assert hasattr(sensors, "sensor_pressure")
assert not hasattr(sensors, "motor_x")
# Tests iteration on Device with sub-devices
# Returns all adjustables sorted by name
def test_device_iteration_with_nested_structure():
parent = Device(ID="PARENT")
child1 = Device(ID="CHILD1")
child2 = Device(ID="CHILD2")
parent.adj_a = DummyAdjustable(ID="a", name="A")
child1.adj_b = DummyAdjustable(ID="b", name="B")
child2.adj_c = DummyAdjustable(ID="c", name="C")
parent.sub1 = child1
parent.sub2 = child2
items = list(parent)
assert len(items) == 3
names = [item.name for item in items]
assert names == ["A", "B", "C"]
@pytest.mark.parametrize("num_adjustables,num_devices", [
(5, 0),
(0, 3),
(3, 2),
(10, 5),
(1, 1),
])
def test_device_scalability(num_adjustables, num_devices):
dev = Device(ID="SCALE_TEST")
for i in range(num_adjustables):
setattr(dev, f"adj_{i}", DummyAdjustable(ID=f"a{i}", name=f"Adj{i}"))
for i in range(num_devices):
sub = Device(ID=f"SUB{i}")
sub.motor = DummyAdjustable(ID=f"m{i}", name=f"Motor{i}")
setattr(dev, f"sub_{i}", sub)
adjs = recursive_adjustables(dev)
expected_total = num_adjustables + num_devices
assert len(adjs) == expected_total
+276
View File
@@ -0,0 +1,276 @@
import pytest
from pathlib import Path
from slic.core.scanner.runname import (
extract_runnumber,
extract_runnumbers,
RunFilenameGenerator,
EVERYTHING
)
@pytest.fixture
def tmpdir_runs(tmp_path):
d = tmp_path / "runs"
d.mkdir()
return d
# extract_runnumber
@pytest.mark.parametrize(
"fname,prefix,separator,expected",
[
("scan0004_test.json", "scan", "_", 4),
("scan0001_alpha.txt", "scan", "_", 1),
("run-0042_demo.csv", "run-", "_", 42),
("data9999_final.txt", "data", "_", 9999),
("scan5_test.json", "scan", "_", 5),
(str(Path("/tmp/data/scan0007_exp.json")), "scan", "_", 7),
(str(Path("/Users/yasmine_tligui/test_pv/runs/scan0123_test.json")), "scan", "_", 123),
("scan0010_report.csv", "scan", "_", 10),
("scan0300_analysis.yaml", "scan", "_", 300),
("experimentscan0070_trial.txt", "experimentscan", "_", 70),
("meas:001_test.txt", "meas:", "_", 1),
(str(Path("a/b/c/scan0042_demo.json")), "scan", "_", 42),
("scan1234_extra_part.json", "scan", "_", 1234),
("scan_0042_test.json", "scan_", "_", 42),
],
)
def test_extract_runnumber_valid_cases(fname, prefix, separator, expected):
assert extract_runnumber(fname, prefix, separator) == expected
@pytest.mark.parametrize(
"fname,prefix,separator",
[
("scan00A3_test.json", "scan", "_"),
("file_test.json", "scan", "_"),
("scan_test.json", "scan", "_"),
("run#_info.txt", "run#", "#"),
("1234_scan_test.json", "scan", "_"),
],
)
def test_extract_runnumber_invalid_cases(fname, prefix, separator):
with pytest.raises(ValueError):
extract_runnumber(fname, prefix, separator)
# extract_runnumbers
@pytest.mark.parametrize(
"fnames,prefix,separator,expected",
[
(["scan0001_a", "scan0002_b", "scan0030_c"], "scan", "_", [1, 2, 30]),
(["scan1_a", "scan02_b", "scan300_c"], "scan", "_", [1, 2, 300]),
(["run-0004_x", "run-0005_y", "run-0010_z"], "run-", "_", [4, 5, 10]),
(["data9_file", "data10_file", "data11_file"], "data", "_", [9, 10, 11]),
(["scan001--a", "scan002--b", "scan003--c"], "scan", "--", [1, 2, 3]),
],
)
def test_extract_runnumbers_valid(fnames, prefix, separator, expected):
result = extract_runnumbers(fnames, prefix, separator)
assert result == expected
# RunFilenameGenerator
@pytest.mark.parametrize(
"prefix,run_index,separator,name,suffix,expected",
[
("pfx", "7", "-", "abc", ".txt", "pfx7-abc"),
("scan", "001", "_", "data", "_scan_info.json", "scan001_data"),
("run", "42", "-", "test", ".meta", "run42-test"),
("exp", "003", "--", "trial", ".log", "exp003--trial"),
("data", "99", "::", "final", ".json", "data99::final"),
("experiment_", "005", "_", "measure", ".csv", "experiment_005_measure"),
],
)
def test_fill_filename_pattern(prefix, run_index, separator, name, suffix, expected):
gen = RunFilenameGenerator(
base_dir=".",
prefix=prefix,
separator=separator,
suffix=suffix
)
formatted = gen._fill_filename_pattern(run_index, name)
assert formatted == expected
@pytest.mark.parametrize(
"prefix,n_digits,separator,suffix,expected",
[
("scan", 3, "_", "_scan_info.json", "scan" + "[0-9]"*3 + "_" + EVERYTHING + "_scan_info.json"),
("run-", 4, "-", ".json", "run-" + "[0-9]"*4 + "-" + EVERYTHING + ".json"),
("data", 2, "#", ".meta", "data" + "[0-9]"*2 + "#" + EVERYTHING + ".meta"),
("exp", 1, ".", "_info.txt", "exp" + "[0-9]"*1 + "." + EVERYTHING + "_info.txt"),
],
)
def test_pattern_exact_match(tmpdir_runs, prefix, n_digits, separator, suffix, expected):
gen = RunFilenameGenerator(
base_dir=tmpdir_runs,
prefix=prefix,
n_digits=n_digits,
separator=separator,
suffix=suffix,
)
pattern = gen.pattern
assert pattern == expected
def test_get_existing_runnumbers_empty(monkeypatch, tmpdir_runs):
gen = RunFilenameGenerator(tmpdir_runs)
monkeypatch.setattr("slic.core.scanner.runname.glob_files", lambda b, p: [])
assert gen.get_existing_runnumbers() == []
@pytest.mark.parametrize(
"filenames,prefix,separator,suffix,n_digits,expected_runnumbers",
[
(
["scan0001_test_scan_info.json", "scan0002_demo_scan_info.json", "scan0005_exp_scan_info.json"],
"scan", "_", "_scan_info.json", 4,
[1, 2, 5]
),
(
["run-0010_alpha_data.json", "run-0020_beta_data.json"],
"run-", "_", "_data.json", 4,
[10, 20]
),
(
["data01_test.meta", "data02_test.meta", "data99_test.meta"],
"data", "_", ".meta", 2,
[1, 2, 99]
),
],
)
def test_get_existing_runnumbers_with_files(tmpdir_runs, filenames, prefix, separator, suffix, n_digits, expected_runnumbers):
for fname in filenames:
(tmpdir_runs / fname).write_text("test content")
gen = RunFilenameGenerator(
base_dir=tmpdir_runs,
prefix=prefix,
separator=separator,
suffix=suffix,
n_digits=n_digits,
)
runnums = gen.get_existing_runnumbers()
assert sorted(runnums) == sorted(expected_runnumbers)
def test_get_existing_runnumbers_mixed_files(tmpdir_runs):
files = [
"scan0001_test_scan_info.json",
"scan0002_demo_scan_info.json",
"other_file.json",
"random_data.txt",
"scan_without_number.json",
]
for fname in files:
(tmpdir_runs / fname).write_text("test")
gen = RunFilenameGenerator(tmpdir_runs)
runnums = gen.get_existing_runnumbers()
assert sorted(runnums) == [1, 2]
def test_sequential_run_generation(tmpdir_runs):
gen = RunFilenameGenerator(tmpdir_runs)
first = gen.get_next_run_filename("test")
assert first == "scan0000_test"
(tmpdir_runs / f"{first}_scan_info.json").write_text("data")
second = gen.get_next_run_filename("demo")
assert second == "scan0001_demo"
(tmpdir_runs / f"{second}_scan_info.json").write_text("data")
third = gen.get_next_run_filename("exp")
assert third == "scan0002_exp"
def test_get_next_run_filename_non_contiguous(tmpdir_runs):
files = ["scan0001_a_scan_info.json", "scan0005_b_scan_info.json", "scan0010_c_scan_info.json"]
for fname in files:
(tmpdir_runs / fname).write_text("test")
gen = RunFilenameGenerator(tmpdir_runs)
next_file = gen.get_next_run_filename("test")
assert next_file == "scan0011_test"
@pytest.mark.parametrize(
"n_digits,expected_format",
[
(1, "scan0_test"),
(2, "scan00_test"),
(3, "scan000_test"),
(5, "scan00000_test"),
(6, "scan000000_test"),
],
)
def test_different_n_digits(tmpdir_runs, n_digits, expected_format):
gen = RunFilenameGenerator(
base_dir=tmpdir_runs,
n_digits=n_digits,
)
next_file = gen.get_next_run_filename("test")
assert next_file == expected_format
@pytest.mark.parametrize(
"file_structure,search_base_dir,prefix,separator,suffix,n_digits,expected_next",
[
(
{
"experiments": ["scan0001_test_scan_info.json", "scan0002_test_scan_info.json"],
"other_folder": ["scan0003_test_scan_info.json", "scan0004_test_scan_info.json"]
},
"experiments",
"scan", "_", "_scan_info.json", 4,
"scan0003_test"
),
(
{
"project/data": ["scan0001_test_scan_info.json"],
"project/data/raw": ["scan0002_test_scan_info.json"],
"project/backup": ["scan0003_test_scan_info.json"],
"other_project": ["scan0004_test_scan_info.json"]
},
"project/data",
"scan", "_", "_scan_info.json", 4,
"scan0002_test"
),
(
{
"empty_dir": [],
"full_dir": ["scan0001_test_scan_info.json", "scan0002_test_scan_info.json"],
},
"empty_dir",
"scan", "_", "_scan_info.json", 4,
"scan0000_test"
),
],
)
def test_get_next_run_filename(tmpdir, file_structure, search_base_dir, prefix, separator, suffix, n_digits, expected_next):
for dir_path, filenames in file_structure.items():
current_dir = tmpdir
for part in dir_path.split("/"):
current_dir = current_dir / part
if not current_dir.exists():
current_dir.mkdir()
for filename in filenames:
(current_dir / filename).write("test")
actual_base_dir = tmpdir / search_base_dir
gen = RunFilenameGenerator(
base_dir=str(actual_base_dir),
prefix=prefix,
n_digits=n_digits,
separator=separator,
suffix=suffix,
)
next_file = gen.get_next_run_filename("test")
assert next_file == expected_next
File diff suppressed because it is too large Load Diff
+488
View File
@@ -0,0 +1,488 @@
import pytest
from slic.core.scanner.scaninfo import ScanInfo
class DummyAdjustable:
def __init__(self, name="adj", ID="id", units="u"):
self.name = name
self.ID = ID
self.units = units
# ScanInfo init
@pytest.mark.parametrize(
"adjustables,values,suffix,expected_filename,expected_params",
[
(
[DummyAdjustable()],
[1, 2, 3],
"_scan_info.json",
"fileA_scan_info.json",
{"name": ["adj"], "Id": ["id"], "units": ["u"]},
),
(
[DummyAdjustable("motorX", "M1", "mm")],
[10, 20],
".meta",
"fileB.meta",
{"name": ["motorX"], "Id": ["M1"], "units": ["mm"]},
),
(
[
DummyAdjustable("motorX", "M1", "mm"),
DummyAdjustable("stageY", "S2", "deg"),
DummyAdjustable("lensZ", "L3", "cm"),
],
[1, 2, 3],
"_extra.json",
"fileC_extra.json",
{
"name": ["motorX", "stageY", "lensZ"],
"Id": ["M1", "S2", "L3"],
"units": ["mm", "deg", "cm"],
},
),
],
)
def test_init_creates_expected_filename(tmp_path, adjustables, values, suffix, expected_filename, expected_params):
base_dir = tmp_path
filename_base = expected_filename.split("_")[0].split(".")[0]
si = ScanInfo(filename_base, base_dir, adjustables, values, suffix=suffix)
assert si.filename.endswith(expected_filename)
assert si.parameters == expected_params
assert si.values == []
assert si.readbacks == []
assert si.files == []
assert si.info == []
def test_init_with_empty_adjustables(tmp_path):
si = ScanInfo("empty_scan", tmp_path, [], [])
assert si.names == []
assert si.IDs == []
assert si.units == []
assert si.parameters == {"name": [], "Id": [], "units": []}
class PartialAdjustable:
def __init__(self, has_name=True, has_id=True, has_units=True):
if has_name:
self.name = "test_name"
if has_id:
self.ID = "test_id"
if has_units:
self.units = "test_units"
@pytest.mark.parametrize(
"adjustable,expected_name,expected_id,expected_units",
[
(PartialAdjustable(has_name=False, has_id=True, has_units=True), "noName", "test_id", "test_units"),
(PartialAdjustable(has_name=True, has_id=False, has_units=True), "test_name", "noID", "test_units"),
(PartialAdjustable(has_name=True, has_id=True, has_units=False), "test_name", "test_id", "noUnits"),
(PartialAdjustable(has_name=False, has_id=False, has_units=False), "noName", "noID", "noUnits"),
],
)
def test_init_with_missing_attributes(tmp_path, adjustable, expected_name, expected_id, expected_units):
si = ScanInfo("partial_scan", tmp_path, [adjustable], [0])
assert si.names == [expected_name]
assert si.IDs == [expected_id]
assert si.units == [expected_units]
# append
def test_append(tmp_path):
si = ScanInfo("fileX", tmp_path, [DummyAdjustable("A", "1", "u")], [0])
si.append([1, 2, 3], [10, 20, 30], ["f1.dat", "f2.dat", "f3.dat"], {"note": "phase1"})
assert si.values == [[1, 2, 3]]
assert si.readbacks == [[10, 20, 30]]
assert si.files == [["f1.dat", "f2.dat", "f3.dat"]]
assert si.info == [{"note": "phase1"}]
si.append([4, 5], [40, 50], ["f4.dat", "f5.dat"], lambda: {"note": "auto_phase2"})
assert si.values == [[1, 2, 3], [4, 5]]
assert si.readbacks == [[10, 20, 30], [40, 50]]
assert si.files == [["f1.dat", "f2.dat", "f3.dat"], ["f4.dat", "f5.dat"]]
assert si.info == [{"note": "phase1"}, {"note": "auto_phase2"}]
def test_append_with_empty_lists(tmp_path):
si = ScanInfo("empty", tmp_path, [DummyAdjustable()], [0])
si.append([], [], [], {})
assert si.values == [[]]
assert si.readbacks == [[]]
assert si.files == [[]]
assert si.info == [{}]
@pytest.mark.parametrize(
"values_len,readbacks_len,files_len",
[
(3, 2, 1),
(1, 3, 2),
(5, 1, 5),
],
)
def test_append_with_mismatched_list_lengths(tmp_path, values_len, readbacks_len, files_len):
si = ScanInfo("mismatch", tmp_path, [DummyAdjustable()], [0])
values = list(range(values_len))
readbacks = list(range(readbacks_len))
files = [f"f{i}.dat" for i in range(files_len)]
si.append(values, readbacks, files, {"note": "mismatch"})
assert len(si.values[0]) == values_len
assert len(si.readbacks[0]) == readbacks_len
assert len(si.files[0]) == files_len
def test_append_info_with_complex_nested_dict(tmp_path):
si = ScanInfo("nested", tmp_path, [DummyAdjustable()], [0])
complex_info = {
"level1": {
"level2": {
"level3": {
"data": [1, 2, 3],
"meta": {"key": "value"}
}
}
},
"list": [{"a": 1}, {"b": 2}]
}
si.append([1], [1], ["f.dat"], complex_info)
assert si.info[0] == complex_info
def test_very_long_lists(tmp_path):
si = ScanInfo("long", tmp_path, [DummyAdjustable()], [0])
long_values = list(range(10000))
long_readbacks = list(range(10000))
long_files = [f"file_{i}.dat" for i in range(10000)]
si.append(long_values, long_readbacks, long_files, {"note": "big"})
assert len(si.values[0]) == 10000
assert len(si.readbacks[0]) == 10000
assert len(si.files[0]) == 10000
# info callable
def test_callable_info_that_raises_exception(tmp_path):
si = ScanInfo("error_test", tmp_path, [DummyAdjustable()], [0])
def bad_info():
raise ValueError("Intentional error")
with pytest.raises(ValueError, match="Intentional error"):
si.append([1], [1], ["f.dat"], bad_info)
def test_callable_info_returns_none(tmp_path):
si = ScanInfo("none_test", tmp_path, [DummyAdjustable()], [0])
si.append([1], [1], ["f.dat"], lambda: None)
assert si.info == [None]
def test_info_with_none_directly(tmp_path):
si = ScanInfo("none_direct", tmp_path, [DummyAdjustable()], [0])
si.append([1], [1], ["f.dat"], None)
assert si.info == [None]
# write and to_dict
def test_write_and_to_dict(tmp_path, monkeypatch):
base_dir = tmp_path
si = ScanInfo("scanTest", base_dir, [
DummyAdjustable("motorX", "M1", "mm"),
DummyAdjustable("stageY", "S2", "deg"),
], [0], suffix="_info.json")
si.append([1.0, 2.0], [1.1, 2.1], ["f1.dat", "f2.dat"], {"phase": "init"})
si.append([3.0, 4.0], [3.1, 4.1], ["f3.dat", "f4.dat"], {"phase": "end"})
last_call = {}
def mock_json_save(data, filename):
last_call['data'] = data
last_call['filename'] = filename
monkeypatch.setattr('slic.core.scanner.scaninfo.json_save', mock_json_save)
si.write()
assert last_call['filename'] == si.filename
expected = si.to_dict()
assert last_call['data'] == expected
def test_to_dict_complete_structure(tmp_path):
si = ScanInfo("scan_test", tmp_path, [DummyAdjustable("M", "ID1", "mm")], [0, 1, 2])
si.append([1.0], [1.1], ["f1.dat"], {"note": "test"})
result = si.to_dict()
expected_keys = {
"scan_parameters",
"scan_values_all",
"scan_values",
"scan_readbacks",
"scan_files",
"scan_info",
}
assert set(result.keys()) == expected_keys
assert result["scan_parameters"] == {"name": ["M"], "Id": ["ID1"], "units": ["mm"]}
assert result["scan_values_all"] == [0, 1, 2]
assert result["scan_values"] == [[1.0]]
assert result["scan_readbacks"] == [[1.1]]
assert result["scan_files"] == [["f1.dat"]]
assert result["scan_info"] == [{"note": "test"}]
# update
def test_update_integration(tmp_path, monkeypatch):
si = ScanInfo("scanX", tmp_path, [DummyAdjustable("M", "ID", "mm")], [0], suffix=".json")
last_call = {}
def mock_json_save(data, filename):
last_call['data'] = data
last_call['filename'] = filename
monkeypatch.setattr('slic.core.scanner.scaninfo.json_save', mock_json_save)
si.update([1, 2], [10, 20], ["f1.dat", "f2.dat"], {"phase": "start"})
assert si.values == [[1, 2]]
assert si.readbacks == [[10, 20]]
assert si.files == [["f1.dat", "f2.dat"]]
assert si.info == [{"phase": "start"}]
assert last_call['filename'] == si.filename
assert last_call['data'] == si.to_dict()
# to_sfdaq_dict
def test_to_sfdaq_dict_filled_example(tmp_path):
si = ScanInfo(
filename_base="scanAlpha",
base_dir=tmp_path,
adjustables=[
DummyAdjustable("motorX", "M1", "mm"),
DummyAdjustable("stageY", "S2", "deg"),
DummyAdjustable("lensZ", "L3", "cm"),
],
values=[0, 1, 2],
suffix="_scan_info.json",
)
result_empty = si.to_sfdaq_dict()
assert result_empty["scan_values"] is None
assert result_empty["scan_readbacks"] is None
si.append(
[1.0, 2.0, 3.0],
[1.1, 2.1, 3.1],
["f1.dat"],
{"note": "first run"}
)
si.append(
[4.0, 5.0, 6.0],
[4.1, 5.1, 6.1],
["f2.dat"],
{"note": "second run"}
)
result = si.to_sfdaq_dict()
expected_keys = {
"scan_name", "name", "Id", "units",
"offset", "conversion_factor",
"scan_values", "scan_readbacks", "scan_readbacks_raw",
}
assert set(result.keys()) == expected_keys
assert result["scan_values"] == [4.0, 5.0, 6.0]
assert result["scan_readbacks"] == [4.1, 5.1, 6.1]
assert result["scan_readbacks_raw"] == [4.1, 5.1, 6.1]
assert result["scan_name"] == "scanAlpha"
assert result["name"] == ["motorX", "stageY", "lensZ"]
assert result["Id"] == ["M1", "S2", "L3"]
assert result["units"] == ["mm", "deg", "cm"]
assert result["offset"] == [0, 0, 0]
assert result["conversion_factor"] == [1, 1, 1]
expected_dict = {
"scan_name": "scanAlpha",
"name": ["motorX", "stageY", "lensZ"],
"Id": ["M1", "S2", "L3"],
"units": ["mm", "deg", "cm"],
"offset": [0, 0, 0],
"conversion_factor": [1, 1, 1],
"scan_values": [4.0, 5.0, 6.0],
"scan_readbacks": [4.1, 5.1, 6.1],
"scan_readbacks_raw": [4.1, 5.1, 6.1],
}
assert result == expected_dict
def test_to_sfdaq_dict_with_single_adjustable(tmp_path):
si = ScanInfo("single", tmp_path, [DummyAdjustable("motor", "M1", "deg")], [0])
si.append([5.0], [5.1], ["data.dat"], {"run": 1})
result = si.to_sfdaq_dict()
assert result["name"] == ["motor"]
assert result["Id"] == ["M1"]
assert result["units"] == ["deg"]
assert result["offset"] == [0]
assert result["conversion_factor"] == [1]
assert result["scan_values"] == [5.0]
assert result["scan_readbacks"] == [5.1]
def test_to_sfdaq_dict_filename_with_slash(tmp_path):
si = ScanInfo("path/to/scan", tmp_path, [DummyAdjustable()], [0])
result = si.to_sfdaq_dict()
assert result["scan_name"] == "path_to_scan"
def test_to_sfdaq_dict_with_empty_adjustables_list(tmp_path):
si = ScanInfo("empty_adj", tmp_path, [], [])
result = si.to_sfdaq_dict()
assert result["name"] == []
assert result["Id"] == []
assert result["units"] == []
assert result["offset"] == []
assert result["conversion_factor"] == []
assert result["scan_values"] is None
assert result["scan_readbacks"] is None
def test_append_then_to_sfdaq_returns_last_values(tmp_path):
si = ScanInfo("last_test", tmp_path, [DummyAdjustable("X", "X1", "m")], [0])
si.append([10.0], [10.5], ["f1.dat"], {"n": 1})
si.append([20.0], [20.5], ["f2.dat"], {"n": 2})
si.append([30.0], [30.5], ["f3.dat"], {"n": 3})
result = si.to_sfdaq_dict()
assert result["scan_values"] == [30.0]
assert result["scan_readbacks"] == [30.5]
assert result["scan_readbacks_raw"] == [30.5]
# repr
def test_repr(tmp_path):
si = ScanInfo("test_scan", tmp_path, [DummyAdjustable()], [0], suffix="_info.json")
expected_path = str(tmp_path / "test_scan_info.json")
assert repr(si) == f"Scan info in {expected_path}"
# filename edge cases
def test_filename_base_already_contains_suffix(tmp_path):
si = ScanInfo("scan_scan_info.json", tmp_path, [DummyAdjustable()], [0], suffix="_scan_info.json")
assert si.filename.endswith("scan_scan_info.json_scan_info.json")
def test_filename_with_multiple_slashes(tmp_path):
si = ScanInfo("path/to/deep/scan", tmp_path, [DummyAdjustable()], [0])
result = si.to_sfdaq_dict()
assert result["scan_name"] == "path_to_deep_scan"
assert "/" not in result["scan_name"]
def test_suffix_empty_string(tmp_path):
si = ScanInfo("test", tmp_path, [DummyAdjustable()], [0], suffix="")
assert si.filename == str(tmp_path / "test")
assert not si.filename.endswith(".json")
def test_suffix_no_extension(tmp_path):
si = ScanInfo("test", tmp_path, [DummyAdjustable()], [0], suffix="_data")
assert si.filename.endswith("test_data")
def test_base_dir_with_trailing_slash(tmp_path):
base_with_slash = str(tmp_path) + "/"
si = ScanInfo("test", base_with_slash, [DummyAdjustable()], [0])
assert "test_scan_info.json" in si.filename
def test_filename_base_with_dots(tmp_path):
si = ScanInfo("scan.v1.0.test", tmp_path, [DummyAdjustable()], [0], suffix=".json")
assert "scan.v1.0.test.json" in si.filename
def test_multiple_slashes_and_underscores_in_filename_base(tmp_path):
si = ScanInfo("path/to_scan/with_underscores", tmp_path, [DummyAdjustable()], [0])
result = si.to_sfdaq_dict()
assert result["scan_name"] == "path_to_scan_with_underscores"
def test_adjustable_with_special_characters_in_name(tmp_path):
adj = DummyAdjustable("motor:X@123!", "ID#1", "µm/s²")
si = ScanInfo("special", tmp_path, [adj], [0])
assert si.names == ["motor:X@123!"]
assert si.IDs == ["ID#1"]
assert si.units == ["µm/s²"]
# Integration
# Tests complete workflow: init -> multiple updates -> verify final JSON structure
# Verifies that each update() call correctly appends data and saves to JSON
def test_full_integration_workflow(tmp_path, monkeypatch):
saved_data = []
def mock_json_save(data, filename):
saved_data.append({"data": data, "filename": filename})
monkeypatch.setattr('slic.core.scanner.scaninfo.json_save', mock_json_save)
si = ScanInfo(
"integration_scan",
tmp_path,
[DummyAdjustable("motorA", "MA", "mm"), DummyAdjustable("motorB", "MB", "deg")],
[0, 10, 20],
suffix=".json"
)
si.update([1.0, 2.0], [1.1, 2.1], ["file1.dat"], {"step": 1})
assert len(saved_data) == 1
si.update([3.0, 4.0], [3.1, 4.1], ["file2.dat"], {"step": 2})
assert len(saved_data) == 2
final_data = saved_data[-1]["data"]
assert final_data["scan_parameters"] == {
"name": ["motorA", "motorB"],
"Id": ["MA", "MB"],
"units": ["mm", "deg"]
}
assert final_data["scan_values"] == [[1.0, 2.0], [3.0, 4.0]]
assert final_data["scan_readbacks"] == [[1.1, 2.1], [3.1, 4.1]]
assert final_data["scan_files"] == [["file1.dat"], ["file2.dat"]]
assert final_data["scan_info"] == [{"step": 1}, {"step": 2}]