Files
slic/tests/test_scanbackend.py
tligui_y 2c7b11248b
Run CI Tests / test (push) Successful in 2m19s
Upload files to "tests"
2026-01-21 19:16:28 +01:00

1824 lines
50 KiB
Python

import pytest
from pathlib import Path
import os
from slic.core.scanner.scanbackend import (
ScanBackend,
is_sfdaq, is_only_sfdaq,
print_all_current_values, get_all_current_values,
set_all_target_values_and_wait, set_all_target_values,
wait_for_all, stop_all,
)
from slic.core.acquisition import SFAcquisition
from slic.core.acquisition.fakeacquisition import FakeAcquisition as _BaseFakeAcquisition
from slic.core.adjustable.dummyadjustable import DummyAdjustable
from slic.core.task import DAQTask
# FakeAcquisition
class FakeAcquisition(_BaseFakeAcquisition):
def __init__(self, instrument, pgroup):
super().__init__(instrument, pgroup)
self.call_count = 0
self.acquire_called = False
self.last_filename = None
self.last_data_base_dir = None
self.last_channels = None
self.last_n_pulses = None
self.last_wait = None
def acquire(self, filename=None, data_base_dir=None, detectors=None, channels=None, pvs=None, scan_info=None, n_pulses=100, n_repeat=1, is_scan_step=False, wait=True):
self.call_count += 1
self.acquire_called = True
self.last_filename = filename
self.last_data_base_dir = data_base_dir
self.last_channels = channels
self.last_n_pulses = n_pulses
self.last_wait = wait
return super().acquire(filename, data_base_dir, detectors, channels, pvs, scan_info, n_pulses, n_repeat, is_scan_step, wait)
def reset(self):
self.call_count = 0
self.acquire_called = False
self.last_filename = None
self.last_data_base_dir = None
self.last_channels = None
self.last_n_pulses = None
self.last_wait = None
# DummyCondition
class DummyCondition:
def __init__(self, repeats=0):
self.repeats = repeats
self._stopped = False
def wants_repeat(self):
self.repeats -= 1
return self.repeats >= 0
def stop(self):
self._stopped = True
# DummySensor
class DummySensor:
counter = 0
def __init__(self, name=None):
DummySensor.counter += 1
self.name = name or f"sensor_{DummySensor.counter}"
self.started = False
self.stopped = False
self._cache = {}
def start(self):
self.started = True
def stop(self):
self.stopped = True
def get(self):
return 3.14
# DummyRemotePlot
class DummyRemotePlot:
def __init__(self, fail=False):
self.fail = fail
self.created = False
self.appended = False
self.last_data = None
self.last_filename = None
def new_plot(self, filename, cfg):
self.created = True
if self.fail:
raise ConnectionRefusedError
def append_data(self, filename, data):
self.appended = True
self.last_data = data
self.last_filename = filename
if self.fail:
raise ConnectionRefusedError
# NonSFDAQAcquisition
class NonSFDAQAcquisition:
def __init__(self, name="NonSFDAQ", default_dir=None):
self.name = name
self.default_dir = default_dir
self.call_count = 0
self.last_filename = None
self.last_n_pulses = None
self.filenames = []
def acquire(self, filename=None, n_pulses=100, **_kwargs):
self.call_count += 1
self.last_filename = filename
self.last_n_pulses = n_pulses
def fake_acquire_func():
return [f"{filename}_nonsfdaq_{self.call_count}.h5"]
task = DAQTask(fake_acquire_func)
self.filenames.extend([f"{filename}_nonsfdaq_{self.call_count}.h5"])
return task
def __repr__(self):
return f"NonSFDAQAcquisition({self.name})"
# is_sfdaq
def test_is_sfdaq_and_only_sfdaq():
class MockConfig:
pgroup = None
class MockClient:
config = MockConfig()
class MockSFAcquisition(SFAcquisition):
def __init__(self, instrument, pgroup):
self.client = MockClient()
self.instrument = instrument
self._pgroup = pgroup
s1, s2 = MockSFAcquisition("test_instrument", "test_pgroup"), MockSFAcquisition("test_instrument", "test_pgroup")
f1 = FakeAcquisition("test_instrument", "test_pgroup")
random_obj = object()
assert is_sfdaq(s1)
assert is_sfdaq(f1)
assert not is_sfdaq(random_obj)
assert is_only_sfdaq([s1, s2])
assert is_only_sfdaq([f1, s1])
assert not is_only_sfdaq([s1, random_obj])
# get_filename
def test_get_filename(tmp_path):
adjs = [DummyAdjustable(ID="AX", name="AX")]
acqs = [FakeAcquisition("test_instrument", "test_pgroup")]
sb1 = ScanBackend(
adjs, [[1]], acqs, "scanfile",
detectors=[], channels=[], pvs=[],
n_pulses=1, data_base_dir="data", scan_info_dir=tmp_path,
make_scan_sub_dir=False, condition=None,
return_to_initial_values=True, n_repeat=1,
sensor=None, remote_plot=None
)
f1 = sb1.get_filename(7)
assert f1.endswith("scanfile_step0007")
assert os.path.basename(f1).startswith("scanfile")
sb2 = ScanBackend(
adjs, [[1]], acqs, "scanfile",
detectors=[], channels=[], pvs=[],
n_pulses=1, data_base_dir="data", scan_info_dir=tmp_path,
make_scan_sub_dir=True, condition=None,
return_to_initial_values=True, n_repeat=1,
sensor=None, remote_plot=None
)
f2 = sb2.get_filename(3)
expected_sub = os.path.join("scanfile", "scanfile_step0003")
assert f2.endswith(expected_sub)
sb3 = ScanBackend(
adjs, [[1]], acqs, "/tmp/path/to/custom_name",
detectors=[], channels=[], pvs=[],
n_pulses=1, data_base_dir="data", scan_info_dir=tmp_path,
make_scan_sub_dir=True, condition=None,
return_to_initial_values=True, n_repeat=1,
sensor=None, remote_plot=None
)
f3 = sb3.get_filename(1)
assert f3.endswith(os.path.join("custom_name", "custom_name_step0001"))
# create_output_dirs
def test_create_output_dirs(tmp_path):
class MockConfig:
pgroup = None
class MockClient:
config = MockConfig()
class MockSFAcquisition(SFAcquisition):
def __init__(self, instrument, pgroup):
self.client = MockClient()
self.instrument = instrument
self._pgroup = pgroup
adjs = [DummyAdjustable(ID="A1")]
sfdaq_acq = MockSFAcquisition("test_instrument", "test_pgroup")
sb = ScanBackend(
adjs, [[1]], [sfdaq_acq],
filename="scan_sfdaq",
detectors=[], channels=[], pvs=[],
n_pulses=1, data_base_dir="data", scan_info_dir=tmp_path,
make_scan_sub_dir=True, condition=None,
return_to_initial_values=True, n_repeat=1,
sensor=None, remote_plot=None
)
sb.create_output_dirs()
for root, dirs, files in os.walk(tmp_path):
assert not dirs
fake_acq = FakeAcquisition("test_instrument", "test_pgroup")
fake_acq.default_dir = str(tmp_path / "fake_default")
sb = ScanBackend(
adjs, [[1]], [fake_acq],
filename="scan_fake",
detectors=[], channels=[], pvs=[],
n_pulses=1, data_base_dir="data", scan_info_dir=tmp_path,
make_scan_sub_dir=False, condition=None,
return_to_initial_values=True, n_repeat=1,
sensor=None, remote_plot=None
)
sb.create_output_dirs()
expected_data_dir = os.path.join(fake_acq.default_dir, sb.data_base_dir)
assert not os.path.exists(expected_data_dir)
# store_initial_values
def test_store_and_change_initial_values_restores_correctly(tmp_path):
adjs = [
DummyAdjustable(ID="A", initial_value=9, process_time=0),
DummyAdjustable(ID="B", initial_value=8, process_time=0)
]
sb = ScanBackend(
adjs, [[1]], [FakeAcquisition("test_instrument", "test_pgroup")],
filename="fn", detectors=[], channels=[], pvs=[],
n_pulses=1, data_base_dir="data", scan_info_dir=tmp_path,
make_scan_sub_dir=True, condition=None,
return_to_initial_values=True, n_repeat=1,
sensor=None, remote_plot=None
)
sb.store_initial_values()
initial_values = [a.get_current_value() for a in adjs]
assert initial_values == [9, 8]
assert sb.initial_values == [9, 8]
for a, new_val in zip(adjs, [100, 200]):
a.set_target_value(new_val)
changed_values = [a.get_current_value() for a in adjs]
assert changed_values == [100, 200]
sb.change_to_initial_values()
restored_values = [a.get_current_value() for a in adjs]
assert restored_values == [9, 8]
# acquire_all
def test_acquire_all_with_fake_acquisitions(tmp_path):
adjs = [DummyAdjustable(name="A", ID="A")]
fake_acq1 = FakeAcquisition("test_instrument", "test_pgroup")
fake_acq2 = FakeAcquisition("test_instrument", "test_pgroup")
acqs = [fake_acq1, fake_acq2]
sb = ScanBackend(
adjs, [[1]], acqs, "test_scan",
["detector1"], ["bs_channel1"], ["pv1"], 3,
"data", tmp_path, True, None, True, 1, None, None
)
filenames = sb.acquire_all("test_filename")
assert hasattr(sb, 'current_tasks')
assert len(sb.current_tasks) == 2
assert all(t.status == "done" for t in sb.current_tasks)
for acq in acqs:
assert acq.acquire_called
assert "test_scan" in acq.last_filename or acq.last_filename == "test_scan"
assert acq.last_data_base_dir == "data"
assert acq.last_channels == ["bs_channel1"]
assert acq.last_n_pulses == 3
assert acq.last_wait == False
assert len(filenames) >= 2
assert all(isinstance(fname, str) for fname in filenames)
assert all(len(fname) > 0 for fname in filenames)
sb.stop()
assert not sb.running
for t in sb.current_tasks:
assert t.status == "done"
# do_step
def test_do_step_with_fake_acquisitions(tmp_path):
adjs = [DummyAdjustable(name="motor1", ID="M1"), DummyAdjustable(name="motor2", ID="M2")]
fake_acq = FakeAcquisition("test_instrument", "test_pgroup")
sb = ScanBackend(
adjs, [[1, 2], [3, 4]], [fake_acq], "test_scan",
[], ["bs_channel"], [], 1,
"data", tmp_path, True, None, True, 1, None, None
)
step_values = [1, 3]
n_step = 0
sb.do_step(n_step, step_values)
current_values = [adj.get_current_value() for adj in adjs]
assert current_values == step_values
assert len(sb.scan_info_sfdaq.values) > 0
assert len(sb.scan_info_sfdaq.readbacks) > 0
assert fake_acq.last_filename == "test_scan"
assert len(sb.scan_info.values) > 0
assert len(sb.scan_info.readbacks) > 0
def test_do_step_with_sensor_and_remote_plot(tmp_path):
adjs = [DummyAdjustable(name="motor", ID="M1")]
fake_acq = FakeAcquisition("test_instrument", "test_pgroup")
dummy_sensor = DummySensor()
dummy_remote_plot = DummyRemotePlot()
sb = ScanBackend(
adjs, [[1, 2, 3]], [fake_acq], "test_scan",
[], ["bs_channel"], [], 1,
"data", tmp_path, True, None, True, 1, dummy_sensor, dummy_remote_plot
)
step_values = [2]
n_step = 1
sb.do_step(n_step, step_values)
assert dummy_sensor.started
assert dummy_sensor.stopped
assert dummy_remote_plot.appended
x_value = adjs[0].get_current_value()
y_value = dummy_sensor.get()
expected_data = (float(x_value), float(y_value))
assert dummy_remote_plot.last_data == expected_data
def test_do_step_multiple_steps(tmp_path):
adjs = [DummyAdjustable(name="motor", ID="M1")]
fake_acq = FakeAcquisition("test_instrument", "test_pgroup")
sb = ScanBackend(
adjs, [[1, 2, 3]], [fake_acq], "test_scan",
[], ["bs_channel"], [], 1,
"data", tmp_path, True, None, True, 1, None, None
)
step_sequences = [
(0, [1]),
(1, [2]),
(2, [3])
]
for n_step, step_values in step_sequences:
fake_acq.reset()
sb.do_step(n_step, step_values)
assert adjs[0].get_current_value() == step_values[0]
assert "test_scan" in fake_acq.last_filename
assert len(sb.scan_info.values) == n_step + 1
# do_checked_step
def test_do_checked_step_with_condition_repeats(tmp_path):
adjs = [DummyAdjustable(name="motor", ID="M1")]
fake_acq = FakeAcquisition("test_instrument", "test_pgroup")
condition = DummyCondition(repeats=2)
sb = ScanBackend(
adjs, [[1, 2]], [fake_acq], "test_scan",
[], ["bs_channel"], [], 1,
"data", tmp_path, True, condition, True, 1, None, None
)
step_values = [1]
n_step = 0
do_step_call_count = 0
original_do_step = sb.do_step
def mock_do_step(*args, **kwargs):
nonlocal do_step_call_count
do_step_call_count += 1
return original_do_step(*args, **kwargs)
sb.do_step = mock_do_step
sb.running = True
sb.do_checked_step(n_step, step_values)
assert do_step_call_count == 2
assert condition.repeats == -1
# _make_summary
def test_make_summary_and_repr(tmp_path):
adjs = [DummyAdjustable(name="A", ID="A")]
sb = ScanBackend(
adjs, [[1, 2]], [FakeAcquisition("test_instrument", "test_pgroup")],
"fn", [], [], [], 2,
"data", tmp_path, True, None, True, 2, None, None
)
s = sb._make_summary()
assert "record" in s and "pulse" in s
assert isinstance(repr(sb), str)
def test_make_summary_single_repeat(tmp_path):
adjs = [DummyAdjustable(name="motor1", ID="M1"), DummyAdjustable(name="motor2", ID="M2")]
fake_acq = FakeAcquisition("test_instrument", "test_pgroup")
sb = ScanBackend(
adjs, [[1, 2], [3, 4]], [fake_acq], "test_scan",
[], ["bs_channel"], [], 5,
"data", tmp_path, True, None, True, 1,
None, None
)
summary = sb._make_summary()
assert "perform the following scan" in summary
assert "motor1" in summary
assert "motor2" in summary
assert "5 pulses" in summary
assert "test_scan" in summary
assert "FakeAcquisition" in summary
def test_make_summary_multiple_repeats(tmp_path):
adjs = [DummyAdjustable(name="motor", ID="M1")]
fake_acq = FakeAcquisition("test_instrument", "test_pgroup")
sb = ScanBackend(
adjs, [[1]], [fake_acq], "multi_scan",
[], ["bs_channel"], [], 1,
"data", tmp_path, True, None, True, 3,
None, None
)
summary = sb._make_summary()
assert "repeat the following scan 3 times" in summary
assert "1 pulse" in summary
assert "multi_scan" in summary
# scan_loop
def test_scan_loop_fake_only(tmp_path, capsys):
adjs = [
DummyAdjustable(name="A", ID="A", initial_value=0),
DummyAdjustable(name="B", ID="B", initial_value=0),
]
fake = FakeAcquisition("test_instrument", "test_pgroup")
values = [[1, 10], [2, 20], [3, 30]]
sb = ScanBackend(
adjs, values, [fake], "scan1",
[], ["ch"], [], 1,
"data", tmp_path, False,
condition=None, return_to_initial_values=True, n_repeat=1,
sensor=None, remote_plot=None
)
sb.running = True
sb.scan_loop()
out = capsys.readouterr().out
assert "Scan step 1 of 3" in out
assert "Scan step 2 of 3" in out
assert "Scan step 3 of 3" in out
assert "All scan steps done" in out
assert fake.call_count == 3
assert adjs[0].get_current_value() == 3
assert adjs[1].get_current_value() == 30
# repeated_scan_loop
def test_repeated_scan_loop_fake_only(tmp_path, capsys):
adjs = [DummyAdjustable(name="A", ID="A")]
fake = FakeAcquisition("test_instrument", "test_pgroup")
values = [[1], [2]]
sb = ScanBackend(
adjs, values, [fake], "rscan",
[], ["ch"], [], 1,
"data", tmp_path, False,
condition=None,
return_to_initial_values=True, n_repeat=3,
sensor=None, remote_plot=None
)
sb.running = True
sb.repeated_scan_loop()
out = capsys.readouterr().out
assert "Repetition 1 of 3" in out
assert "Repetition 2 of 3" in out
assert "Repetition 3 of 3" in out
assert fake.call_count == 6
assert sb.filename == "rscan"
# Full scan end-to-end
# Tests complete scan execution with multiple adjustables and steps
# Verifies all values, readbacks, filenames, and return to initial
def test_full_multidimensional_scan_end_to_end(tmp_path):
d = 4
n_steps = 5
adjs = [
DummyAdjustable(name=f"M{i}", ID=f"ID{i}", initial_value=0, process_time=0)
for i in range(d)
]
values = [
list(range(t, t + d))
for t in range(1, n_steps + 1)
]
fake = FakeAcquisition("test_instrument", "test_pgroup")
fake.default_dir = str(tmp_path / "fake_data")
sb = ScanBackend(
adjs, values, [fake],
filename="multidim_test",
detectors=[], channels=["ch"], pvs=[],
n_pulses=3,
data_base_dir="data",
scan_info_dir=tmp_path,
make_scan_sub_dir=True,
condition=None,
return_to_initial_values=True,
n_repeat=1,
sensor=None,
remote_plot=None
)
sb.run()
assert fake.call_count == n_steps
assert [a.get_current_value() for a in adjs] == [0] * d
assert len(sb.scan_info.values) == n_steps
assert len(sb.scan_info_sfdaq.values) == n_steps
for i in range(n_steps):
step_values = sb.scan_info.values[i]
assert step_values == values[i]
base = sb.filename
filebase = os.path.basename(base)
for i in range(n_steps):
expected = os.path.join(base, filebase + f"_step{i:04d}")
assert expected.endswith(f"{filebase}_step{i:04d}")
data_root = tmp_path / "fake_data" / "data"
assert not data_root.exists()
expected_subfolder = data_root / "multidim_test"
assert not expected_subfolder.exists()
assert all(t.status == "done" for t in sb.current_tasks)
all_files = []
for t in sb.current_tasks:
assert len(t.filenames) > 0
all_files += t.filenames
assert len(all_files) >= 1
for i in range(n_steps):
target = values[i]
readback = sb.scan_info_sfdaq.readbacks[i]
assert readback == target
assert sb.scan_info.values[0] == values[0]
assert sb.scan_info.values[-1] == values[-1]
def test_scanND_relative_positions_only(tmp_path):
d = 5
initial_values = [10 * (i + 1) for i in range(d)]
adjustables = [
DummyAdjustable(name=f"M{i}", ID=f"ID{i}", initial_value=initial_values[i], process_time=0)
for i in range(d)
]
positions_per_dim = [list(range(-1, 2))] * d
fake = FakeAcquisition("test_instrument", "test_pgroup")
sb = ScanBackend(
adjustables,
values=positions_per_dim,
acquisitions=[fake],
filename="relND",
detectors=[], channels=["ch"], pvs=[],
n_pulses=1,
data_base_dir="data",
scan_info_dir=tmp_path,
make_scan_sub_dir=False,
condition=None,
return_to_initial_values=True,
n_repeat=1,
sensor=None, remote_plot=None
)
offset_values = [
[p + initial_values[i] for p in [-1, 0, 1]]
for i in range(d)
]
sb.values = offset_values
for i in range(d):
expected = [
initial_values[i] - 1,
initial_values[i],
initial_values[i] + 1,
]
assert sb.values[i] == expected
assert len(sb.values) == d
assert all(len(axis) == 3 for axis in sb.values)
# Utility functions
def test_print_current_values_displays_correct_output(capsys):
class Obj:
def __init__(self, adjustables):
self.adjustables = adjustables
def print_current_values(self):
print_all_current_values(self.adjustables)
adjs = [
DummyAdjustable(ID="A1", name="MotorA", initial_value=10),
DummyAdjustable(ID="B2", name="MotorB", initial_value=20),
]
obj = Obj(adjs)
obj.print_current_values()
captured = capsys.readouterr().out
assert "Current values" in captured
assert "A1" in captured and "B2" in captured
assert "10" in captured and "20" in captured
def test_get_all_current_values_returns_correct_list():
adjs = [
DummyAdjustable(ID="M1", name="M1", initial_value=5),
DummyAdjustable(ID="M2", name="M2", initial_value=15),
]
values = get_all_current_values(adjs)
assert values == [5, 15]
def test_set_all_target_values_and_wait_full_chain():
adjs = [
DummyAdjustable(ID="1", initial_value=0, process_time=0),
DummyAdjustable(ID="2", initial_value=0, process_time=0),
]
set_all_target_values_and_wait(adjs, [10, 20])
values = [a.get_current_value() for a in adjs]
assert values == [10, 20]
tasks = set_all_target_values(adjs, [15, 25])
assert all(hasattr(t, "wait") for t in tasks)
wait_for_all(tasks)
values = [a.get_current_value() for a in adjs]
assert values == [15, 25]
def test_wait_for_all_calls_wait_on_all_tasks(monkeypatch):
called = []
class DummyTask:
def __init__(self, name):
self.name = name
def wait(self):
called.append(self.name)
tasks = [DummyTask("t1"), DummyTask("t2"), DummyTask("t3")]
wait_for_all(tasks)
assert called == ["t1", "t2", "t3"]
def test_stop_all_calls_stop_and_handles_exceptions(capsys):
called = []
class WorkingTask:
def __init__(self, name):
self.name = name
self.stopped = False
def stop(self):
self.stopped = True
called.append(self.name)
class FailingTask:
def stop(self):
raise RuntimeError("boom")
tasks = [WorkingTask("T1"), FailingTask(), WorkingTask("T2")]
stop_all(tasks)
assert all(t.stopped for t in tasks if isinstance(t, WorkingTask))
assert called == ["T1", "T2"]
out = capsys.readouterr().out
assert "Stopping caused" in out
assert "boom" in out
# Edge cases
def test_run_with_exception_handling(tmp_path, capsys):
adjs = [DummyAdjustable(name="M", ID="M")]
fake = FakeAcquisition("test_instrument", "test_pgroup")
sb = ScanBackend(
adjs, [[1], [2]], [fake], "error_scan",
[], ["ch"], [], 1,
"data", tmp_path, False, None, False, 1, None, None
)
original_do_step = sb.do_step
def failing_do_step(*args, **kwargs):
if args[0] == 1:
raise ValueError("Intentional test error")
return original_do_step(*args, **kwargs)
sb.do_step = failing_do_step
sb.run()
out = capsys.readouterr().out
assert "Stopping because of:" in out
assert "Intentional test error" in out
assert "Stopped current DAQ tasks:" in out
assert not sb.running
def test_scan_interrupted_midway(tmp_path, capsys):
adjs = [DummyAdjustable(name="M", ID="M")]
fake = FakeAcquisition("test_instrument", "test_pgroup")
sb = ScanBackend(
adjs, [[1], [2], [3], [4], [5]], [fake], "interrupted_scan",
[], ["ch"], [], 1,
"data", tmp_path, False, None, True, 1, None, None
)
original_do_step = sb.do_step
def stopping_do_step(*args, **kwargs):
result = original_do_step(*args, **kwargs)
if args[0] == 1:
sb.running = False
return result
sb.do_step = stopping_do_step
sb.run()
out = capsys.readouterr().out
assert "Stopped during scan step" in out
assert "of 5" in out
assert len(sb.scan_info.values) == 2
def test_remote_plot_connection_refused_on_new_plot(tmp_path):
adjs = [DummyAdjustable(name="M", ID="M")]
fake = FakeAcquisition("test_instrument", "test_pgroup")
sensor = DummySensor()
remote_plot = DummyRemotePlot(fail=True)
sb = ScanBackend(
adjs, [[1]], [fake], "plot_fail_scan",
[], ["ch"], [], 1,
"data", tmp_path, False, None, True, 1, sensor, remote_plot
)
sb.run()
assert remote_plot.created
assert remote_plot.appended
def test_remote_plot_connection_refused_on_append_data(tmp_path):
adjs = [DummyAdjustable(name="M", ID="M")]
fake = FakeAcquisition("test_instrument", "test_pgroup")
sensor = DummySensor()
class SelectiveFailPlot(DummyRemotePlot):
def __init__(self):
super().__init__(fail=False)
self.append_fail = True
def append_data(self, filename, data):
self.appended = True
self.last_data = data
self.last_filename = filename
if self.append_fail:
raise ConnectionRefusedError("Connection refused")
remote_plot = SelectiveFailPlot()
sb = ScanBackend(
adjs, [[1]], [fake], "append_fail_scan",
[], ["ch"], [], 1,
"data", tmp_path, False, None, True, 1, sensor, remote_plot
)
sb.run()
assert remote_plot.created
assert remote_plot.appended
def test_step_info_callable(tmp_path):
adjs = [DummyAdjustable(name="M", ID="M")]
fake = FakeAcquisition("test_instrument", "test_pgroup")
call_count = [0]
def step_info_func():
call_count[0] += 1
return {"call": call_count[0], "timestamp": "2024-01-01"}
sb = ScanBackend(
adjs, [[1], [2]], [fake], "callable_info_scan",
[], ["ch"], [], 1,
"data", tmp_path, False, None, True, 1, None, None
)
sb.running = True
sb.scan_loop(step_info=step_info_func)
assert call_count[0] == 4
assert len(sb.scan_info.info) == 2
assert sb.scan_info.info[0] == {"call": 2, "timestamp": "2024-01-01"}
assert sb.scan_info.info[1] == {"call": 4, "timestamp": "2024-01-01"}
def test_repeated_scan_with_very_large_n_repeat(tmp_path, capsys):
adjs = [DummyAdjustable(name="M", ID="M")]
fake = FakeAcquisition("test_instrument", "test_pgroup")
sb = ScanBackend(
adjs, [[1], [2]], [fake], "many_repeat_scan",
[], ["ch"], [], 1,
"data", tmp_path, False, None, True, 10, None, None
)
sb.run()
assert fake.call_count == 20
out = capsys.readouterr().out
assert "Repetition 1 of 10" in out
assert "Repetition 10 of 10" in out
def test_repeated_scan_loop_stops_when_running_false(tmp_path):
adjs = [DummyAdjustable(name="M", ID="M")]
fake = FakeAcquisition("test_instrument", "test_pgroup")
sb = ScanBackend(
adjs, [[1]], [fake], "stop_repeat_scan",
[], ["ch"], [], 1,
"data", tmp_path, False, None, True, 100, None, None
)
original_scan_loop = sb.scan_loop
rep_count = [0]
def counting_scan_loop(*args, **kwargs):
result = original_scan_loop(*args, **kwargs)
rep_count[0] += 1
if rep_count[0] >= 2:
sb.running = False
return result
sb.scan_loop = counting_scan_loop
sb.running = True
sb.repeated_scan_loop()
assert rep_count[0] == 2
assert fake.call_count == 2
def test_sfdaq_with_spreadsheet_logging(tmp_path):
adjs = [DummyAdjustable(name="M1", ID="M1"), DummyAdjustable(name="M2", ID="M2")]
logged_data = []
class MockSpreadsheet:
def add(self, run_number, filename, n_pulses, scanned_adjs, scan_values):
logged_data.append({
"run_number": run_number,
"filename": filename,
"n_pulses": n_pulses,
"scanned_adjs": scanned_adjs,
"scan_values": scan_values
})
fake = FakeAcquisition("test_instrument", "test_pgroup")
fake.spreadsheet = MockSpreadsheet()
sb = ScanBackend(
adjs, [[1, 2], [3, 4]], [fake], "spreadsheet_scan",
[], ["ch"], [], 5,
"data", tmp_path, False, None, True, 1, None, None
)
sb.scan_loop()
assert len(logged_data) == 1
log = logged_data[0]
assert log["run_number"] == 1
assert log["filename"] == "spreadsheet_scan"
assert log["n_pulses"] == 5
assert log["scanned_adjs"] == adjs
assert log["scan_values"] == [[1, 2], [3, 4]]
def test_scan_loop_with_single_value(tmp_path):
adjs = [DummyAdjustable(name="M", ID="M")]
fake = FakeAcquisition("test_instrument", "test_pgroup")
sb = ScanBackend(
adjs, [[42]], [fake], "single_scan",
[], ["ch"], [], 1,
"data", tmp_path, False, None, True, 1, None, None
)
sb.running = True
sb.scan_loop()
assert len(sb.scan_info.values) == 1
assert sb.scan_info.values[0] == [42]
assert fake.call_count == 1
def test_scan_loop_advances_run_number_per_acquisition(tmp_path, capsys):
adjs = [DummyAdjustable(name="M", ID="M")]
fake1 = FakeAcquisition("test_instrument", "test_pgroup")
fake2 = FakeAcquisition("test_instrument", "test_pgroup")
sb = ScanBackend(
adjs, [[1]], [fake1, fake2], "multi_acq_scan",
[], ["ch"], [], 1,
"data", tmp_path, False, None, True, 1, None, None
)
sb.scan_loop()
out = capsys.readouterr().out
assert "Advanced run number to 1 for" in out
assert out.count("Advanced run number to") == 2
def test_return_to_initial_values_false(tmp_path):
adjs = [DummyAdjustable(ID="M", initial_value=100, process_time=0)]
fake = FakeAcquisition("test_instrument", "test_pgroup")
sb = ScanBackend(
adjs, [[1], [2], [3]], [fake], "no_return_scan",
[], ["ch"], [], 1,
"data", tmp_path, False, None, False, 1, None, None
)
sb.run()
assert adjs[0].get_current_value() == 3
assert adjs[0].get_current_value() != 100
# N-dimensional grid scans
def test_true_2D_grid_scan(tmp_path):
N = 2
adjs = [DummyAdjustable(name=f"Adj{i}", ID=f"Adj{i}") for i in range(N)]
x_vals = [0, 1, 2]
y_vals = [10, 20]
values = [[x, y] for x in x_vals for y in y_vals]
fake = FakeAcquisition("test_instrument", "test_pgroup")
sb = ScanBackend(
adjs, values, [fake], "grid_2d_scan",
[], ["ch"], [], 1,
"data", tmp_path, False, None, True, 1, None, None
)
sb.running = True
sb.scan_loop()
assert len(sb.scan_info.values) == 6
assert fake.call_count == 6
assert sb.scan_info.values[0] == [0, 10]
assert sb.scan_info.values[1] == [0, 20]
assert sb.scan_info.values[2] == [1, 10]
assert sb.scan_info.values[3] == [1, 20]
assert sb.scan_info.values[4] == [2, 10]
assert sb.scan_info.values[5] == [2, 20]
def test_true_3D_grid_scan(tmp_path):
N = 3
adjs = [DummyAdjustable(name=f"Adj{i}", ID=f"Adj{i}") for i in range(N)]
x_vals = [0, 1]
y_vals = [10, 20]
z_vals = [100, 200, 300]
values = [[x, y, z] for x in x_vals for y in y_vals for z in z_vals]
fake = FakeAcquisition("test_instrument", "test_pgroup")
sb = ScanBackend(
adjs, values, [fake], "grid_3d_scan",
[], ["ch"], [], 1,
"data", tmp_path, False, None, True, 1, None, None
)
sb.running = True
sb.scan_loop()
assert len(sb.scan_info.values) == 12
assert fake.call_count == 12
assert sb.scan_info.values[0] == [0, 10, 100]
assert sb.scan_info.values[1] == [0, 10, 200]
assert sb.scan_info.values[2] == [0, 10, 300]
assert sb.scan_info.values[3] == [0, 20, 100]
assert sb.scan_info.values[10] == [1, 20, 200]
assert sb.scan_info.values[11] == [1, 20, 300]
def test_true_4D_grid_scan(tmp_path):
N = 4
adjs = [DummyAdjustable(name=f"Adj{i}", ID=f"Adj{i}") for i in range(N)]
dim_vals = [[0, 1], [10, 11], [100, 101], [1000, 1001]]
values = []
for v0 in dim_vals[0]:
for v1 in dim_vals[1]:
for v2 in dim_vals[2]:
for v3 in dim_vals[3]:
values.append([v0, v1, v2, v3])
fake = FakeAcquisition("test_instrument", "test_pgroup")
sb = ScanBackend(
adjs, values, [fake], "grid_4d_scan",
[], ["ch"], [], 1,
"data", tmp_path, False, None, True, 1, None, None
)
sb.running = True
sb.scan_loop()
assert len(sb.scan_info.values) == 16
assert fake.call_count == 16
assert sb.scan_info.values[0] == [0, 10, 100, 1000]
assert sb.scan_info.values[15] == [1, 11, 101, 1001]
def test_parametrized_ND_grid_scan_generator(tmp_path):
import itertools
def generate_nd_grid(dim_values_list):
return [list(combo) for combo in itertools.product(*dim_values_list)]
N = 2
dim_values = [[1, 2, 3], [100, 200]]
adjs = [DummyAdjustable(name=f"A{i}", ID=f"A{i}") for i in range(N)]
values = generate_nd_grid(dim_values)
fake = FakeAcquisition("test_instrument", "test_pgroup")
sb = ScanBackend(
adjs, values, [fake], "nd_gen_2d",
[], ["ch"], [], 1,
"data", tmp_path, False, None, True, 1, None, None
)
sb.running = True
sb.scan_loop()
assert len(sb.scan_info.values) == 6
assert sb.scan_info.values[0] == [1, 100]
assert sb.scan_info.values[5] == [3, 200]
N = 3
dim_values = [[1, 2], [10, 20, 30], [100]]
adjs = [DummyAdjustable(name=f"A{i}", ID=f"A{i}") for i in range(N)]
values = generate_nd_grid(dim_values)
fake = FakeAcquisition("test_instrument", "test_pgroup")
sb = ScanBackend(
adjs, values, [fake], "nd_gen_3d",
[], ["ch"], [], 1,
"data", tmp_path, False, None, True, 1, None, None
)
sb.running = True
sb.scan_loop()
assert len(sb.scan_info.values) == 6
assert sb.scan_info.values[0] == [1, 10, 100]
assert sb.scan_info.values[5] == [2, 30, 100]
N = 5
dim_values = [[1, 2], [10], [100, 200], [1000], [10000, 20000, 30000]]
adjs = [DummyAdjustable(name=f"A{i}", ID=f"A{i}") for i in range(N)]
values = generate_nd_grid(dim_values)
fake = FakeAcquisition("test_instrument", "test_pgroup")
sb = ScanBackend(
adjs, values, [fake], "nd_gen_5d",
[], ["ch"], [], 1,
"data", tmp_path, False, None, True, 1, None, None
)
sb.running = True
sb.scan_loop()
assert len(sb.scan_info.values) == 12
assert sb.scan_info.values[0] == [1, 10, 100, 1000, 10000]
assert sb.scan_info.values[11] == [2, 10, 200, 1000, 30000]
def test_ND_grid_with_readbacks_verification(tmp_path):
import itertools
N = 3
dim_values = [[0, 1], [10, 20], [100, 200, 300]]
adjs = [DummyAdjustable(name=f"Motor{i}", ID=f"Motor{i}") for i in range(N)]
values = [list(combo) for combo in itertools.product(*dim_values)]
fake = FakeAcquisition("test_instrument", "test_pgroup")
sb = ScanBackend(
adjs, values, [fake], "grid_readbacks",
[], ["ch"], [], 1,
"data", tmp_path, False, None, True, 1, None, None
)
sb.running = True
sb.scan_loop()
assert len(sb.scan_info.readbacks) == 12
for i, expected_vals in enumerate(values):
assert sb.scan_info.readbacks[i] == expected_vals
assert sb.scan_info.values[0] == [0, 10, 100]
assert sb.scan_info.readbacks[0] == [0, 10, 100]
assert sb.scan_info.values[11] == [1, 20, 300]
assert sb.scan_info.readbacks[11] == [1, 20, 300]
def test_large_ND_grid_scan(tmp_path):
N = 3
dim_values = [[0, 1, 2], [10, 20, 30], [100, 200, 300]]
import itertools
adjs = [DummyAdjustable(name=f"Axis{i}", ID=f"Axis{i}") for i in range(N)]
values = [list(combo) for combo in itertools.product(*dim_values)]
fake = FakeAcquisition("test_instrument", "test_pgroup")
sb = ScanBackend(
adjs, values, [fake], "large_grid_3d",
[], ["ch"], [], 1,
"data", tmp_path, False, None, True, 1, None, None
)
sb.running = True
sb.scan_loop()
assert len(sb.scan_info.values) == 27
assert fake.call_count == 27
assert sb.scan_info.values[0] == [0, 10, 100]
assert sb.scan_info.values[8] == [0, 30, 300]
assert sb.scan_info.values[26] == [2, 30, 300]
def test_ND_grid_interrupted_midway(tmp_path):
N = 3
dim_values = [[0, 1, 2], [10, 20], [100, 200, 300]]
import itertools
adjs = [DummyAdjustable(name=f"M{i}", ID=f"M{i}") for i in range(N)]
values = [list(combo) for combo in itertools.product(*dim_values)]
fake = FakeAcquisition("test_instrument", "test_pgroup")
sb = ScanBackend(
adjs, values, [fake], "interrupted_grid",
[], ["ch"], [], 1,
"data", tmp_path, False, None, True, 1, None, None
)
original_do_step = sb.do_step
step_count = [0]
def counting_do_step(*args, **kwargs):
result = original_do_step(*args, **kwargs)
step_count[0] += 1
if step_count[0] >= 5:
sb.running = False
return result
sb.do_step = counting_do_step
sb.running = True
sb.scan_loop()
assert len(sb.scan_info.values) == 5
assert fake.call_count == 5
assert step_count[0] == 5
# Non-SFDAQ acquisitions
def test_nonsfdaq_acquisition_only(tmp_path):
adjs = [DummyAdjustable(name="M", ID="M")]
nonsfdaq = NonSFDAQAcquisition(name="TestNonSFDAQ", default_dir=str(tmp_path))
sb = ScanBackend(
adjs, [[1], [2]], [nonsfdaq], "nonsfdaq_scan",
[], ["ch"], [], 1,
"data", tmp_path, False, None, True, 1, None, None
)
assert sb.filename != "nonsfdaq_scan"
assert sb.filename_sfdaq == "nonsfdaq_scan"
sb.running = True
sb.scan_loop()
assert nonsfdaq.call_count == 2
assert len(sb.scan_info.values) == 2
assert sb.scan_info.values[0] == [1]
assert sb.scan_info.values[1] == [2]
def test_nonsfdaq_with_make_scan_sub_dir(tmp_path):
adjs = [DummyAdjustable(name="M", ID="M")]
default_dir = tmp_path / "nonsfdaq_default"
default_dir.mkdir()
nonsfdaq = NonSFDAQAcquisition(name="TestNonSFDAQ", default_dir=str(default_dir))
sb = ScanBackend(
adjs, [[1]], [nonsfdaq], "subdir_scan",
[], ["ch"], [], 1,
"data", tmp_path, True, None, True, 1, None, None
)
sb.create_output_dirs()
expected_subdir = default_dir / "data" / sb.filename
assert expected_subdir.exists()
def test_nonsfdaq_acquire_all_different_params(tmp_path):
adjs = [DummyAdjustable(name="M", ID="M")]
nonsfdaq = NonSFDAQAcquisition(name="TestNonSFDAQ", default_dir=str(tmp_path))
sb = ScanBackend(
adjs, [[1]], [nonsfdaq], "param_test",
[], ["ch"], [], 5,
"data", tmp_path, False, None, True, 1, None, None
)
filename = sb.get_filename(0)
sb.acquire_all(filename)
assert nonsfdaq.last_filename == filename
assert nonsfdaq.last_n_pulses == 5
def test_mixed_sfdaq_and_nonsfdaq_acquisitions(tmp_path):
adjs = [DummyAdjustable(name="M", ID="M")]
fake = FakeAcquisition("test_instrument", "test_pgroup")
nonsfdaq = NonSFDAQAcquisition(name="Mixed", default_dir=str(tmp_path))
sb = ScanBackend(
adjs, [[1], [2]], [fake, nonsfdaq], "mixed_scan",
[], ["ch"], [], 1,
"data", tmp_path, False, None, True, 1, None, None
)
assert sb.filename != "mixed_scan"
sb.running = True
sb.scan_loop()
assert fake.call_count == 2
assert nonsfdaq.call_count == 2
def test_nonsfdaq_with_default_dir_none(tmp_path):
adjs = [DummyAdjustable(name="M", ID="M")]
nonsfdaq = NonSFDAQAcquisition(name="NoDir", default_dir=None)
sb = ScanBackend(
adjs, [[1]], [nonsfdaq], "nodir_scan",
[], ["ch"], [], 1,
"data", tmp_path, True, None, True, 1, None, None
)
sb.create_output_dirs()
sb.running = True
sb.scan_loop()
assert nonsfdaq.call_count == 1
# return_to_initial_values=None
def test_return_to_initial_values_none_user_says_yes(tmp_path, monkeypatch):
adjs = [DummyAdjustable(ID="M", initial_value=100, process_time=0)]
fake = FakeAcquisition("test_instrument", "test_pgroup")
sb = ScanBackend(
adjs, [[1], [2]], [fake], "interactive_scan",
[], ["ch"], [], 1,
"data", tmp_path, False, None, None, 1, None, None
)
user_response = [True]
def mock_ask_yes_no(prompt):
return user_response[0]
monkeypatch.setattr("slic.core.scanner.scanbackend.ask_Yes_no", mock_ask_yes_no)
sb.run()
assert adjs[0].get_current_value() == 100
def test_return_to_initial_values_none_user_says_no(tmp_path, monkeypatch):
adjs = [DummyAdjustable(ID="M", initial_value=100, process_time=0)]
fake = FakeAcquisition("test_instrument", "test_pgroup")
sb = ScanBackend(
adjs, [[1], [2]], [fake], "interactive_scan",
[], ["ch"], [], 1,
"data", tmp_path, False, None, None, 1, None, None
)
def mock_ask_yes_no(prompt):
return False
monkeypatch.setattr("slic.core.scanner.scanbackend.ask_Yes_no", mock_ask_yes_no)
sb.run()
assert adjs[0].get_current_value() == 2
# n_repeat=None (infinite)
def test_n_repeat_none_infinite_with_early_stop(tmp_path):
adjs = [DummyAdjustable(name="M", ID="M")]
fake = FakeAcquisition("test_instrument", "test_pgroup")
sb = ScanBackend(
adjs, [[1]], [fake], "infinite_scan",
[], ["ch"], [], 1,
"data", tmp_path, False, None, True, None, None, None
)
original_scan_loop = sb.scan_loop
rep_count = [0]
def counting_scan_loop(*args, **kwargs):
result = original_scan_loop(*args, **kwargs)
rep_count[0] += 1
if rep_count[0] >= 3:
sb.running = False
return result
sb.scan_loop = counting_scan_loop
sb.running = True
sb.repeated_scan_loop()
assert rep_count[0] == 3
assert fake.call_count == 3
def test_n_repeat_none_printable_summary(tmp_path, capsys):
adjs = [DummyAdjustable(name="M", ID="M")]
fake = FakeAcquisition("test_instrument", "test_pgroup")
sb = ScanBackend(
adjs, [[1]], [fake], "infinite_scan",
[], ["ch"], [], 1,
"data", tmp_path, False, None, True, None, None, None
)
summary = sb._make_summary()
assert "repeat the following scan forever" in summary
# KeyboardInterrupt
def test_keyboard_interrupt_handling(tmp_path, capsys):
adjs = [DummyAdjustable(name="M", ID="M")]
fake = FakeAcquisition("test_instrument", "test_pgroup")
sb = ScanBackend(
adjs, [[1], [2], [3]], [fake], "keyboard_scan",
[], ["ch"], [], 1,
"data", tmp_path, False, None, True, 1, None, None
)
original_do_step = sb.do_step
step_count = [0]
def interrupt_do_step(*args, **kwargs):
step_count[0] += 1
if step_count[0] >= 2:
raise KeyboardInterrupt
return original_do_step(*args, **kwargs)
sb.do_step = interrupt_do_step
sb.run()
assert step_count[0] == 2
assert sb.running == False
out = capsys.readouterr().out
assert "Stopped current DAQ tasks:" in out
# sensor without remote_plot (BUG)
def test_sensor_without_remote_plot(tmp_path):
adjs = [DummyAdjustable(name="M", ID="M")]
fake = FakeAcquisition("test_instrument", "test_pgroup")
sensor = DummySensor(name="TestSensor")
sb = ScanBackend(
adjs, [[1], [2]], [fake], "sensor_only_scan",
[], ["ch"], [], 1,
"data", tmp_path, False, None, True, 1, sensor, None
)
sb.run()
assert sensor.started
assert sensor.stopped
assert fake.call_count == 2
# condition with n_repeat>1
def test_condition_with_multiple_repeats(tmp_path):
adjs = [DummyAdjustable(name="M", ID="M")]
fake = FakeAcquisition("test_instrument", "test_pgroup")
condition = DummyCondition(repeats=6)
sb = ScanBackend(
adjs, [[1]], [fake], "condition_repeat_scan",
[], ["ch"], [], 1,
"data", tmp_path, False, condition, True, 3, None, None
)
sb.run()
assert fake.call_count == 6
# condition.stop()
def test_stop_calls_condition_stop(tmp_path):
adjs = [DummyAdjustable(name="M", ID="M")]
fake = FakeAcquisition("test_instrument", "test_pgroup")
condition = DummyCondition(repeats=0)
sb = ScanBackend(
adjs, [[1]], [fake], "stop_condition_scan",
[], ["ch"], [], 1,
"data", tmp_path, False, condition, True, 1, None, None
)
assert not condition._stopped
sb.stop()
assert condition._stopped
# step_info as static dict
def test_step_info_as_static_dict(tmp_path):
adjs = [DummyAdjustable(name="M", ID="M")]
fake = FakeAcquisition("test_instrument", "test_pgroup")
sb = ScanBackend(
adjs, [[1], [2]], [fake], "static_info_scan",
[], ["ch"], [], 1,
"data", tmp_path, False, None, True, 1, None, None
)
static_info = {"experiment": "test", "user": "testuser"}
sb.running = True
sb.scan_loop(step_info=static_info)
assert len(sb.scan_info.info) == 2
assert sb.scan_info.info[0] == static_info
assert sb.scan_info.info[1] == static_info
# Empty values (BUG)
def test_scan_with_empty_values_list(tmp_path):
adjs = [DummyAdjustable(name="M", ID="M")]
fake = FakeAcquisition("test_instrument", "test_pgroup")
sb = ScanBackend(
adjs, [], [fake], "empty_scan",
[], ["ch"], [], 1,
"data", tmp_path, False, None, True, 1, None, None
)
sb.running = True
sb.scan_loop()
assert len(sb.scan_info.values) == 0
assert fake.call_count == 0
def test_scan_with_empty_acquisitions_list(tmp_path):
adjs = [DummyAdjustable(name="M", ID="M")]
sb = ScanBackend(
adjs, [[1], [2]], [], "no_acq_scan",
[], ["ch"], [], 1,
"data", tmp_path, False, None, True, 1, None, None
)
sb.running = True
sb.scan_loop()
assert len(sb.scan_info.values) == 2
def test_scan_with_very_large_istep(tmp_path):
adjs = [DummyAdjustable(name="M", ID="M")]
fake = FakeAcquisition("test_instrument", "test_pgroup")
sb = ScanBackend(
adjs, [[1]], [fake], "large_step_scan",
[], ["ch"], [], 1,
"data", tmp_path, True, None, True, 1, None, None
)
filename = sb.get_filename(12345)
assert "step12345" in filename
def test_condition_with_minimum_repeats(tmp_path):
adjs = [DummyAdjustable(name="M", ID="M")]
fake = FakeAcquisition("test_instrument", "test_pgroup")
condition = DummyCondition(repeats=1)
sb = ScanBackend(
adjs, [[1]], [fake], "min_repeat_condition",
[], ["ch"], [], 1,
"data", tmp_path, False, condition, True, 1, None, None
)
sb.running = True
sb.scan_loop()
assert fake.call_count == 1
def test_remote_plot_without_sensor(tmp_path):
adjs = [DummyAdjustable(name="M", ID="M")]
fake = FakeAcquisition("test_instrument", "test_pgroup")
remote_plot = DummyRemotePlot()
sb = ScanBackend(
adjs, [[1], [2]], [fake], "plot_no_sensor_scan",
[], ["ch"], [], 1,
"data", tmp_path, False, None, True, 1, None, remote_plot
)
sb.run()
assert not remote_plot.created
assert fake.call_count == 2
def test_n_repeat_with_n_pulses_formatting(tmp_path):
adjs = [DummyAdjustable(name="M", ID="M")]
fake = FakeAcquisition("test_instrument", "test_pgroup")
sb1 = ScanBackend(
adjs, [[1]], [fake], "scan1",
[], ["ch"], [], 1,
"data", tmp_path, False, None, True, 1, None, None
)
summary1 = sb1._make_summary()
assert "1 pulse" in summary1
sb2 = ScanBackend(
adjs, [[1]], [fake], "scan2",
[], ["ch"], [], 5,
"data", tmp_path, False, None, True, 1, None, None
)
summary2 = sb2._make_summary()
assert "5 pulses" in summary2
def test_scan_stopped_before_first_iteration(tmp_path):
adjs = [DummyAdjustable(name="M", ID="M")]
fake = FakeAcquisition("test_instrument", "test_pgroup")
sb = ScanBackend(
adjs, [[1], [2], [3]], [fake], "stop_early_scan",
[], ["ch"], [], 1,
"data", tmp_path, False, None, True, 1, None, None
)
sb.running = False
sb.scan_loop()
assert len(sb.scan_info.values) == 0
assert fake.call_count == 0
# Complete scan verification
# Verifies ALL values, readbacks, acquisitions, and return to initial
def test_complete_scan_values_readbacks_and_return_to_initial(tmp_path):
adj1 = DummyAdjustable(name="Motor1", ID="M1", initial_value=0, process_time=0)
adj2 = DummyAdjustable(name="Motor2", ID="M2", initial_value=0, process_time=0)
adjs = [adj1, adj2]
scan_values = [
[10, 100],
[20, 200],
[30, 300]
]
fake = FakeAcquisition("test_instrument", "test_pgroup")
sb = ScanBackend(
adjs, scan_values, [fake], "complete_scan",
[], ["ch"], [], 1,
"data", tmp_path, False, None, True, 1, None, None
)
initial_before = [adj1.get_current_value(), adj2.get_current_value()]
assert initial_before == [0, 0], "Initial values should be [0, 0]"
sb.run()
assert fake.call_count == 3, f"Expected 3 acquisitions, got {fake.call_count}"
assert len(sb.scan_info.values) == 3, "Should have 3 recorded values"
assert sb.scan_info.values[0] == [10, 100], f"Step 0: expected [10, 100], got {sb.scan_info.values[0]}"
assert sb.scan_info.values[1] == [20, 200], f"Step 1: expected [20, 200], got {sb.scan_info.values[1]}"
assert sb.scan_info.values[2] == [30, 300], f"Step 2: expected [30, 300], got {sb.scan_info.values[2]}"
assert len(sb.scan_info.readbacks) == 3, "Should have 3 readback records"
assert sb.scan_info.readbacks[0] == [10, 100], f"Readback 0: expected [10, 100], got {sb.scan_info.readbacks[0]}"
assert sb.scan_info.readbacks[1] == [20, 200], f"Readback 1: expected [20, 200], got {sb.scan_info.readbacks[1]}"
assert sb.scan_info.readbacks[2] == [30, 300], f"Readback 2: expected [30, 300], got {sb.scan_info.readbacks[2]}"
final_values = [adj1.get_current_value(), adj2.get_current_value()]
assert final_values == [0, 0], f"Should return to [0, 0], but got {final_values}"
assert sb.initial_values == [0, 0], f"Stored initial values should be [0, 0], got {sb.initial_values}"
def test_scan_without_return_to_initial_stays_at_last_value(tmp_path):
adj1 = DummyAdjustable(name="M1", ID="M1", initial_value=100, process_time=0)
adj2 = DummyAdjustable(name="M2", ID="M2", initial_value=200, process_time=0)
scan_values = [
[10, 20],
[30, 40],
[50, 60]
]
fake = FakeAcquisition("test_instrument", "test_pgroup")
sb = ScanBackend(
[adj1, adj2], scan_values, [fake], "no_return_scan",
[], ["ch"], [], 1,
"data", tmp_path, False, None, False, 1, None, None
)
assert adj1.get_current_value() == 100
assert adj2.get_current_value() == 200
sb.run()
assert adj1.get_current_value() == 50, f"M1 should be at 50, got {adj1.get_current_value()}"
assert adj2.get_current_value() == 60, f"M2 should be at 60, got {adj2.get_current_value()}"
def test_acquisition_parameters_are_correct(tmp_path):
adjs = [DummyAdjustable(name="M", ID="M")]
fake = FakeAcquisition("test_instrument", "test_pgroup")
scan_values = [[1], [2], [3]]
sb = ScanBackend(
adjs, scan_values, [fake], "param_check_scan",
[], ["test_channel"], [], 5,
"data", tmp_path, False, None, True, 1, None, None
)
sb.run()
assert fake.call_count == 3
assert fake.last_n_pulses == 5, f"Expected n_pulses=5, got {fake.last_n_pulses}"
assert fake.last_channels == ["test_channel"], f"Expected channels=['test_channel'], got {fake.last_channels}"