1824 lines
50 KiB
Python
1824 lines
50 KiB
Python
import pytest
|
|
from pathlib import Path
|
|
import os
|
|
|
|
from slic.core.scanner.scanbackend import (
|
|
ScanBackend,
|
|
is_sfdaq, is_only_sfdaq,
|
|
print_all_current_values, get_all_current_values,
|
|
set_all_target_values_and_wait, set_all_target_values,
|
|
wait_for_all, stop_all,
|
|
)
|
|
|
|
from slic.core.acquisition import SFAcquisition
|
|
from slic.core.acquisition.fakeacquisition import FakeAcquisition as _BaseFakeAcquisition
|
|
from slic.core.adjustable.dummyadjustable import DummyAdjustable
|
|
from slic.core.task import DAQTask
|
|
|
|
|
|
# FakeAcquisition
|
|
|
|
class FakeAcquisition(_BaseFakeAcquisition):
|
|
|
|
def __init__(self, instrument, pgroup):
|
|
super().__init__(instrument, pgroup)
|
|
self.call_count = 0
|
|
self.acquire_called = False
|
|
self.last_filename = None
|
|
self.last_data_base_dir = None
|
|
self.last_channels = None
|
|
self.last_n_pulses = None
|
|
self.last_wait = None
|
|
|
|
def acquire(self, filename=None, data_base_dir=None, detectors=None, channels=None, pvs=None, scan_info=None, n_pulses=100, n_repeat=1, is_scan_step=False, wait=True):
|
|
self.call_count += 1
|
|
self.acquire_called = True
|
|
self.last_filename = filename
|
|
self.last_data_base_dir = data_base_dir
|
|
self.last_channels = channels
|
|
self.last_n_pulses = n_pulses
|
|
self.last_wait = wait
|
|
return super().acquire(filename, data_base_dir, detectors, channels, pvs, scan_info, n_pulses, n_repeat, is_scan_step, wait)
|
|
|
|
def reset(self):
|
|
self.call_count = 0
|
|
self.acquire_called = False
|
|
self.last_filename = None
|
|
self.last_data_base_dir = None
|
|
self.last_channels = None
|
|
self.last_n_pulses = None
|
|
self.last_wait = None
|
|
|
|
|
|
# DummyCondition
|
|
|
|
class DummyCondition:
|
|
def __init__(self, repeats=0):
|
|
self.repeats = repeats
|
|
self._stopped = False
|
|
def wants_repeat(self):
|
|
self.repeats -= 1
|
|
return self.repeats >= 0
|
|
def stop(self):
|
|
self._stopped = True
|
|
|
|
|
|
# DummySensor
|
|
|
|
class DummySensor:
|
|
counter = 0
|
|
def __init__(self, name=None):
|
|
DummySensor.counter += 1
|
|
self.name = name or f"sensor_{DummySensor.counter}"
|
|
self.started = False
|
|
self.stopped = False
|
|
self._cache = {}
|
|
def start(self):
|
|
self.started = True
|
|
def stop(self):
|
|
self.stopped = True
|
|
def get(self):
|
|
return 3.14
|
|
|
|
|
|
# DummyRemotePlot
|
|
|
|
class DummyRemotePlot:
|
|
def __init__(self, fail=False):
|
|
self.fail = fail
|
|
self.created = False
|
|
self.appended = False
|
|
self.last_data = None
|
|
self.last_filename = None
|
|
|
|
def new_plot(self, filename, cfg):
|
|
self.created = True
|
|
if self.fail:
|
|
raise ConnectionRefusedError
|
|
|
|
def append_data(self, filename, data):
|
|
self.appended = True
|
|
self.last_data = data
|
|
self.last_filename = filename
|
|
if self.fail:
|
|
raise ConnectionRefusedError
|
|
|
|
|
|
# NonSFDAQAcquisition
|
|
|
|
class NonSFDAQAcquisition:
|
|
|
|
def __init__(self, name="NonSFDAQ", default_dir=None):
|
|
self.name = name
|
|
self.default_dir = default_dir
|
|
self.call_count = 0
|
|
self.last_filename = None
|
|
self.last_n_pulses = None
|
|
self.filenames = []
|
|
|
|
def acquire(self, filename=None, n_pulses=100, **_kwargs):
|
|
self.call_count += 1
|
|
self.last_filename = filename
|
|
self.last_n_pulses = n_pulses
|
|
|
|
def fake_acquire_func():
|
|
return [f"{filename}_nonsfdaq_{self.call_count}.h5"]
|
|
|
|
task = DAQTask(fake_acquire_func)
|
|
self.filenames.extend([f"{filename}_nonsfdaq_{self.call_count}.h5"])
|
|
return task
|
|
|
|
def __repr__(self):
|
|
return f"NonSFDAQAcquisition({self.name})"
|
|
|
|
|
|
# is_sfdaq
|
|
|
|
def test_is_sfdaq_and_only_sfdaq():
|
|
class MockConfig:
|
|
pgroup = None
|
|
|
|
class MockClient:
|
|
config = MockConfig()
|
|
|
|
class MockSFAcquisition(SFAcquisition):
|
|
def __init__(self, instrument, pgroup):
|
|
self.client = MockClient()
|
|
self.instrument = instrument
|
|
self._pgroup = pgroup
|
|
|
|
s1, s2 = MockSFAcquisition("test_instrument", "test_pgroup"), MockSFAcquisition("test_instrument", "test_pgroup")
|
|
f1 = FakeAcquisition("test_instrument", "test_pgroup")
|
|
random_obj = object()
|
|
|
|
assert is_sfdaq(s1)
|
|
assert is_sfdaq(f1)
|
|
assert not is_sfdaq(random_obj)
|
|
|
|
assert is_only_sfdaq([s1, s2])
|
|
assert is_only_sfdaq([f1, s1])
|
|
assert not is_only_sfdaq([s1, random_obj])
|
|
|
|
|
|
# get_filename
|
|
|
|
def test_get_filename(tmp_path):
|
|
adjs = [DummyAdjustable(ID="AX", name="AX")]
|
|
acqs = [FakeAcquisition("test_instrument", "test_pgroup")]
|
|
|
|
sb1 = ScanBackend(
|
|
adjs, [[1]], acqs, "scanfile",
|
|
detectors=[], channels=[], pvs=[],
|
|
n_pulses=1, data_base_dir="data", scan_info_dir=tmp_path,
|
|
make_scan_sub_dir=False, condition=None,
|
|
return_to_initial_values=True, n_repeat=1,
|
|
sensor=None, remote_plot=None
|
|
)
|
|
f1 = sb1.get_filename(7)
|
|
assert f1.endswith("scanfile_step0007")
|
|
assert os.path.basename(f1).startswith("scanfile")
|
|
|
|
sb2 = ScanBackend(
|
|
adjs, [[1]], acqs, "scanfile",
|
|
detectors=[], channels=[], pvs=[],
|
|
n_pulses=1, data_base_dir="data", scan_info_dir=tmp_path,
|
|
make_scan_sub_dir=True, condition=None,
|
|
return_to_initial_values=True, n_repeat=1,
|
|
sensor=None, remote_plot=None
|
|
)
|
|
f2 = sb2.get_filename(3)
|
|
expected_sub = os.path.join("scanfile", "scanfile_step0003")
|
|
assert f2.endswith(expected_sub)
|
|
|
|
sb3 = ScanBackend(
|
|
adjs, [[1]], acqs, "/tmp/path/to/custom_name",
|
|
detectors=[], channels=[], pvs=[],
|
|
n_pulses=1, data_base_dir="data", scan_info_dir=tmp_path,
|
|
make_scan_sub_dir=True, condition=None,
|
|
return_to_initial_values=True, n_repeat=1,
|
|
sensor=None, remote_plot=None
|
|
)
|
|
f3 = sb3.get_filename(1)
|
|
assert f3.endswith(os.path.join("custom_name", "custom_name_step0001"))
|
|
|
|
|
|
# create_output_dirs
|
|
|
|
def test_create_output_dirs(tmp_path):
|
|
class MockConfig:
|
|
pgroup = None
|
|
|
|
class MockClient:
|
|
config = MockConfig()
|
|
|
|
class MockSFAcquisition(SFAcquisition):
|
|
def __init__(self, instrument, pgroup):
|
|
self.client = MockClient()
|
|
self.instrument = instrument
|
|
self._pgroup = pgroup
|
|
|
|
adjs = [DummyAdjustable(ID="A1")]
|
|
|
|
sfdaq_acq = MockSFAcquisition("test_instrument", "test_pgroup")
|
|
sb = ScanBackend(
|
|
adjs, [[1]], [sfdaq_acq],
|
|
filename="scan_sfdaq",
|
|
detectors=[], channels=[], pvs=[],
|
|
n_pulses=1, data_base_dir="data", scan_info_dir=tmp_path,
|
|
make_scan_sub_dir=True, condition=None,
|
|
return_to_initial_values=True, n_repeat=1,
|
|
sensor=None, remote_plot=None
|
|
)
|
|
|
|
sb.create_output_dirs()
|
|
for root, dirs, files in os.walk(tmp_path):
|
|
assert not dirs
|
|
|
|
fake_acq = FakeAcquisition("test_instrument", "test_pgroup")
|
|
fake_acq.default_dir = str(tmp_path / "fake_default")
|
|
sb = ScanBackend(
|
|
adjs, [[1]], [fake_acq],
|
|
filename="scan_fake",
|
|
detectors=[], channels=[], pvs=[],
|
|
n_pulses=1, data_base_dir="data", scan_info_dir=tmp_path,
|
|
make_scan_sub_dir=False, condition=None,
|
|
return_to_initial_values=True, n_repeat=1,
|
|
sensor=None, remote_plot=None
|
|
)
|
|
|
|
sb.create_output_dirs()
|
|
expected_data_dir = os.path.join(fake_acq.default_dir, sb.data_base_dir)
|
|
assert not os.path.exists(expected_data_dir)
|
|
|
|
|
|
# store_initial_values
|
|
|
|
def test_store_and_change_initial_values_restores_correctly(tmp_path):
|
|
adjs = [
|
|
DummyAdjustable(ID="A", initial_value=9, process_time=0),
|
|
DummyAdjustable(ID="B", initial_value=8, process_time=0)
|
|
]
|
|
|
|
sb = ScanBackend(
|
|
adjs, [[1]], [FakeAcquisition("test_instrument", "test_pgroup")],
|
|
filename="fn", detectors=[], channels=[], pvs=[],
|
|
n_pulses=1, data_base_dir="data", scan_info_dir=tmp_path,
|
|
make_scan_sub_dir=True, condition=None,
|
|
return_to_initial_values=True, n_repeat=1,
|
|
sensor=None, remote_plot=None
|
|
)
|
|
|
|
sb.store_initial_values()
|
|
initial_values = [a.get_current_value() for a in adjs]
|
|
assert initial_values == [9, 8]
|
|
assert sb.initial_values == [9, 8]
|
|
|
|
for a, new_val in zip(adjs, [100, 200]):
|
|
a.set_target_value(new_val)
|
|
|
|
changed_values = [a.get_current_value() for a in adjs]
|
|
assert changed_values == [100, 200]
|
|
|
|
sb.change_to_initial_values()
|
|
|
|
restored_values = [a.get_current_value() for a in adjs]
|
|
assert restored_values == [9, 8]
|
|
|
|
|
|
# acquire_all
|
|
|
|
def test_acquire_all_with_fake_acquisitions(tmp_path):
|
|
adjs = [DummyAdjustable(name="A", ID="A")]
|
|
|
|
fake_acq1 = FakeAcquisition("test_instrument", "test_pgroup")
|
|
fake_acq2 = FakeAcquisition("test_instrument", "test_pgroup")
|
|
acqs = [fake_acq1, fake_acq2]
|
|
|
|
sb = ScanBackend(
|
|
adjs, [[1]], acqs, "test_scan",
|
|
["detector1"], ["bs_channel1"], ["pv1"], 3,
|
|
"data", tmp_path, True, None, True, 1, None, None
|
|
)
|
|
|
|
filenames = sb.acquire_all("test_filename")
|
|
|
|
assert hasattr(sb, 'current_tasks')
|
|
assert len(sb.current_tasks) == 2
|
|
|
|
assert all(t.status == "done" for t in sb.current_tasks)
|
|
|
|
for acq in acqs:
|
|
assert acq.acquire_called
|
|
assert "test_scan" in acq.last_filename or acq.last_filename == "test_scan"
|
|
assert acq.last_data_base_dir == "data"
|
|
assert acq.last_channels == ["bs_channel1"]
|
|
assert acq.last_n_pulses == 3
|
|
assert acq.last_wait == False
|
|
|
|
assert len(filenames) >= 2
|
|
assert all(isinstance(fname, str) for fname in filenames)
|
|
assert all(len(fname) > 0 for fname in filenames)
|
|
|
|
sb.stop()
|
|
assert not sb.running
|
|
for t in sb.current_tasks:
|
|
assert t.status == "done"
|
|
|
|
|
|
# do_step
|
|
|
|
def test_do_step_with_fake_acquisitions(tmp_path):
|
|
adjs = [DummyAdjustable(name="motor1", ID="M1"), DummyAdjustable(name="motor2", ID="M2")]
|
|
|
|
fake_acq = FakeAcquisition("test_instrument", "test_pgroup")
|
|
|
|
sb = ScanBackend(
|
|
adjs, [[1, 2], [3, 4]], [fake_acq], "test_scan",
|
|
[], ["bs_channel"], [], 1,
|
|
"data", tmp_path, True, None, True, 1, None, None
|
|
)
|
|
|
|
step_values = [1, 3]
|
|
n_step = 0
|
|
|
|
sb.do_step(n_step, step_values)
|
|
|
|
current_values = [adj.get_current_value() for adj in adjs]
|
|
assert current_values == step_values
|
|
|
|
assert len(sb.scan_info_sfdaq.values) > 0
|
|
assert len(sb.scan_info_sfdaq.readbacks) > 0
|
|
|
|
assert fake_acq.last_filename == "test_scan"
|
|
|
|
assert len(sb.scan_info.values) > 0
|
|
assert len(sb.scan_info.readbacks) > 0
|
|
|
|
|
|
def test_do_step_with_sensor_and_remote_plot(tmp_path):
|
|
adjs = [DummyAdjustable(name="motor", ID="M1")]
|
|
|
|
fake_acq = FakeAcquisition("test_instrument", "test_pgroup")
|
|
dummy_sensor = DummySensor()
|
|
dummy_remote_plot = DummyRemotePlot()
|
|
|
|
sb = ScanBackend(
|
|
adjs, [[1, 2, 3]], [fake_acq], "test_scan",
|
|
[], ["bs_channel"], [], 1,
|
|
"data", tmp_path, True, None, True, 1, dummy_sensor, dummy_remote_plot
|
|
)
|
|
|
|
step_values = [2]
|
|
n_step = 1
|
|
|
|
sb.do_step(n_step, step_values)
|
|
|
|
assert dummy_sensor.started
|
|
assert dummy_sensor.stopped
|
|
|
|
assert dummy_remote_plot.appended
|
|
|
|
x_value = adjs[0].get_current_value()
|
|
y_value = dummy_sensor.get()
|
|
expected_data = (float(x_value), float(y_value))
|
|
assert dummy_remote_plot.last_data == expected_data
|
|
|
|
|
|
def test_do_step_multiple_steps(tmp_path):
|
|
adjs = [DummyAdjustable(name="motor", ID="M1")]
|
|
|
|
fake_acq = FakeAcquisition("test_instrument", "test_pgroup")
|
|
|
|
sb = ScanBackend(
|
|
adjs, [[1, 2, 3]], [fake_acq], "test_scan",
|
|
[], ["bs_channel"], [], 1,
|
|
"data", tmp_path, True, None, True, 1, None, None
|
|
)
|
|
|
|
step_sequences = [
|
|
(0, [1]),
|
|
(1, [2]),
|
|
(2, [3])
|
|
]
|
|
|
|
for n_step, step_values in step_sequences:
|
|
fake_acq.reset()
|
|
|
|
sb.do_step(n_step, step_values)
|
|
|
|
assert adjs[0].get_current_value() == step_values[0]
|
|
|
|
assert "test_scan" in fake_acq.last_filename
|
|
|
|
assert len(sb.scan_info.values) == n_step + 1
|
|
|
|
|
|
# do_checked_step
|
|
|
|
def test_do_checked_step_with_condition_repeats(tmp_path):
|
|
adjs = [DummyAdjustable(name="motor", ID="M1")]
|
|
fake_acq = FakeAcquisition("test_instrument", "test_pgroup")
|
|
|
|
condition = DummyCondition(repeats=2)
|
|
|
|
sb = ScanBackend(
|
|
adjs, [[1, 2]], [fake_acq], "test_scan",
|
|
[], ["bs_channel"], [], 1,
|
|
"data", tmp_path, True, condition, True, 1, None, None
|
|
)
|
|
|
|
step_values = [1]
|
|
n_step = 0
|
|
|
|
do_step_call_count = 0
|
|
original_do_step = sb.do_step
|
|
|
|
def mock_do_step(*args, **kwargs):
|
|
nonlocal do_step_call_count
|
|
do_step_call_count += 1
|
|
return original_do_step(*args, **kwargs)
|
|
|
|
sb.do_step = mock_do_step
|
|
sb.running = True
|
|
|
|
sb.do_checked_step(n_step, step_values)
|
|
|
|
assert do_step_call_count == 2
|
|
|
|
assert condition.repeats == -1
|
|
|
|
|
|
# _make_summary
|
|
|
|
def test_make_summary_and_repr(tmp_path):
|
|
adjs = [DummyAdjustable(name="A", ID="A")]
|
|
sb = ScanBackend(
|
|
adjs, [[1, 2]], [FakeAcquisition("test_instrument", "test_pgroup")],
|
|
"fn", [], [], [], 2,
|
|
"data", tmp_path, True, None, True, 2, None, None
|
|
)
|
|
s = sb._make_summary()
|
|
assert "record" in s and "pulse" in s
|
|
assert isinstance(repr(sb), str)
|
|
|
|
|
|
def test_make_summary_single_repeat(tmp_path):
|
|
adjs = [DummyAdjustable(name="motor1", ID="M1"), DummyAdjustable(name="motor2", ID="M2")]
|
|
fake_acq = FakeAcquisition("test_instrument", "test_pgroup")
|
|
|
|
sb = ScanBackend(
|
|
adjs, [[1, 2], [3, 4]], [fake_acq], "test_scan",
|
|
[], ["bs_channel"], [], 5,
|
|
"data", tmp_path, True, None, True, 1,
|
|
None, None
|
|
)
|
|
|
|
summary = sb._make_summary()
|
|
|
|
assert "perform the following scan" in summary
|
|
|
|
assert "motor1" in summary
|
|
assert "motor2" in summary
|
|
|
|
assert "5 pulses" in summary
|
|
|
|
assert "test_scan" in summary
|
|
|
|
assert "FakeAcquisition" in summary
|
|
|
|
|
|
def test_make_summary_multiple_repeats(tmp_path):
|
|
adjs = [DummyAdjustable(name="motor", ID="M1")]
|
|
fake_acq = FakeAcquisition("test_instrument", "test_pgroup")
|
|
|
|
sb = ScanBackend(
|
|
adjs, [[1]], [fake_acq], "multi_scan",
|
|
[], ["bs_channel"], [], 1,
|
|
"data", tmp_path, True, None, True, 3,
|
|
None, None
|
|
)
|
|
|
|
summary = sb._make_summary()
|
|
|
|
assert "repeat the following scan 3 times" in summary
|
|
|
|
assert "1 pulse" in summary
|
|
|
|
assert "multi_scan" in summary
|
|
|
|
|
|
# scan_loop
|
|
|
|
def test_scan_loop_fake_only(tmp_path, capsys):
|
|
adjs = [
|
|
DummyAdjustable(name="A", ID="A", initial_value=0),
|
|
DummyAdjustable(name="B", ID="B", initial_value=0),
|
|
]
|
|
|
|
fake = FakeAcquisition("test_instrument", "test_pgroup")
|
|
|
|
values = [[1, 10], [2, 20], [3, 30]]
|
|
|
|
sb = ScanBackend(
|
|
adjs, values, [fake], "scan1",
|
|
[], ["ch"], [], 1,
|
|
"data", tmp_path, False,
|
|
condition=None, return_to_initial_values=True, n_repeat=1,
|
|
sensor=None, remote_plot=None
|
|
)
|
|
|
|
sb.running = True
|
|
sb.scan_loop()
|
|
|
|
out = capsys.readouterr().out
|
|
|
|
assert "Scan step 1 of 3" in out
|
|
assert "Scan step 2 of 3" in out
|
|
assert "Scan step 3 of 3" in out
|
|
assert "All scan steps done" in out
|
|
|
|
assert fake.call_count == 3
|
|
|
|
assert adjs[0].get_current_value() == 3
|
|
assert adjs[1].get_current_value() == 30
|
|
|
|
|
|
# repeated_scan_loop
|
|
|
|
def test_repeated_scan_loop_fake_only(tmp_path, capsys):
|
|
adjs = [DummyAdjustable(name="A", ID="A")]
|
|
fake = FakeAcquisition("test_instrument", "test_pgroup")
|
|
|
|
values = [[1], [2]]
|
|
|
|
sb = ScanBackend(
|
|
adjs, values, [fake], "rscan",
|
|
[], ["ch"], [], 1,
|
|
"data", tmp_path, False,
|
|
condition=None,
|
|
return_to_initial_values=True, n_repeat=3,
|
|
sensor=None, remote_plot=None
|
|
)
|
|
|
|
sb.running = True
|
|
sb.repeated_scan_loop()
|
|
|
|
out = capsys.readouterr().out
|
|
|
|
assert "Repetition 1 of 3" in out
|
|
assert "Repetition 2 of 3" in out
|
|
assert "Repetition 3 of 3" in out
|
|
|
|
assert fake.call_count == 6
|
|
|
|
assert sb.filename == "rscan"
|
|
|
|
|
|
# Full scan end-to-end
|
|
# Tests complete scan execution with multiple adjustables and steps
|
|
# Verifies all values, readbacks, filenames, and return to initial
|
|
|
|
def test_full_multidimensional_scan_end_to_end(tmp_path):
|
|
d = 4
|
|
n_steps = 5
|
|
|
|
adjs = [
|
|
DummyAdjustable(name=f"M{i}", ID=f"ID{i}", initial_value=0, process_time=0)
|
|
for i in range(d)
|
|
]
|
|
|
|
values = [
|
|
list(range(t, t + d))
|
|
for t in range(1, n_steps + 1)
|
|
]
|
|
|
|
fake = FakeAcquisition("test_instrument", "test_pgroup")
|
|
fake.default_dir = str(tmp_path / "fake_data")
|
|
|
|
sb = ScanBackend(
|
|
adjs, values, [fake],
|
|
filename="multidim_test",
|
|
detectors=[], channels=["ch"], pvs=[],
|
|
n_pulses=3,
|
|
data_base_dir="data",
|
|
scan_info_dir=tmp_path,
|
|
make_scan_sub_dir=True,
|
|
condition=None,
|
|
return_to_initial_values=True,
|
|
n_repeat=1,
|
|
sensor=None,
|
|
remote_plot=None
|
|
)
|
|
|
|
sb.run()
|
|
|
|
assert fake.call_count == n_steps
|
|
|
|
assert [a.get_current_value() for a in adjs] == [0] * d
|
|
|
|
assert len(sb.scan_info.values) == n_steps
|
|
assert len(sb.scan_info_sfdaq.values) == n_steps
|
|
|
|
for i in range(n_steps):
|
|
step_values = sb.scan_info.values[i]
|
|
assert step_values == values[i]
|
|
|
|
base = sb.filename
|
|
filebase = os.path.basename(base)
|
|
for i in range(n_steps):
|
|
expected = os.path.join(base, filebase + f"_step{i:04d}")
|
|
assert expected.endswith(f"{filebase}_step{i:04d}")
|
|
|
|
data_root = tmp_path / "fake_data" / "data"
|
|
assert not data_root.exists()
|
|
|
|
expected_subfolder = data_root / "multidim_test"
|
|
assert not expected_subfolder.exists()
|
|
|
|
assert all(t.status == "done" for t in sb.current_tasks)
|
|
|
|
all_files = []
|
|
for t in sb.current_tasks:
|
|
assert len(t.filenames) > 0
|
|
all_files += t.filenames
|
|
|
|
assert len(all_files) >= 1
|
|
|
|
for i in range(n_steps):
|
|
target = values[i]
|
|
readback = sb.scan_info_sfdaq.readbacks[i]
|
|
assert readback == target
|
|
|
|
assert sb.scan_info.values[0] == values[0]
|
|
assert sb.scan_info.values[-1] == values[-1]
|
|
|
|
|
|
def test_scanND_relative_positions_only(tmp_path):
|
|
d = 5
|
|
|
|
initial_values = [10 * (i + 1) for i in range(d)]
|
|
adjustables = [
|
|
DummyAdjustable(name=f"M{i}", ID=f"ID{i}", initial_value=initial_values[i], process_time=0)
|
|
for i in range(d)
|
|
]
|
|
|
|
positions_per_dim = [list(range(-1, 2))] * d
|
|
|
|
fake = FakeAcquisition("test_instrument", "test_pgroup")
|
|
|
|
sb = ScanBackend(
|
|
adjustables,
|
|
values=positions_per_dim,
|
|
acquisitions=[fake],
|
|
filename="relND",
|
|
detectors=[], channels=["ch"], pvs=[],
|
|
n_pulses=1,
|
|
data_base_dir="data",
|
|
scan_info_dir=tmp_path,
|
|
make_scan_sub_dir=False,
|
|
condition=None,
|
|
return_to_initial_values=True,
|
|
n_repeat=1,
|
|
sensor=None, remote_plot=None
|
|
)
|
|
|
|
offset_values = [
|
|
[p + initial_values[i] for p in [-1, 0, 1]]
|
|
for i in range(d)
|
|
]
|
|
|
|
sb.values = offset_values
|
|
|
|
for i in range(d):
|
|
expected = [
|
|
initial_values[i] - 1,
|
|
initial_values[i],
|
|
initial_values[i] + 1,
|
|
]
|
|
assert sb.values[i] == expected
|
|
|
|
assert len(sb.values) == d
|
|
|
|
assert all(len(axis) == 3 for axis in sb.values)
|
|
|
|
|
|
# Utility functions
|
|
|
|
def test_print_current_values_displays_correct_output(capsys):
|
|
class Obj:
|
|
def __init__(self, adjustables):
|
|
self.adjustables = adjustables
|
|
|
|
def print_current_values(self):
|
|
print_all_current_values(self.adjustables)
|
|
|
|
adjs = [
|
|
DummyAdjustable(ID="A1", name="MotorA", initial_value=10),
|
|
DummyAdjustable(ID="B2", name="MotorB", initial_value=20),
|
|
]
|
|
|
|
obj = Obj(adjs)
|
|
obj.print_current_values()
|
|
|
|
captured = capsys.readouterr().out
|
|
|
|
assert "Current values" in captured
|
|
assert "A1" in captured and "B2" in captured
|
|
assert "10" in captured and "20" in captured
|
|
|
|
|
|
def test_get_all_current_values_returns_correct_list():
|
|
adjs = [
|
|
DummyAdjustable(ID="M1", name="M1", initial_value=5),
|
|
DummyAdjustable(ID="M2", name="M2", initial_value=15),
|
|
]
|
|
values = get_all_current_values(adjs)
|
|
assert values == [5, 15]
|
|
|
|
|
|
def test_set_all_target_values_and_wait_full_chain():
|
|
adjs = [
|
|
DummyAdjustable(ID="1", initial_value=0, process_time=0),
|
|
DummyAdjustable(ID="2", initial_value=0, process_time=0),
|
|
]
|
|
|
|
set_all_target_values_and_wait(adjs, [10, 20])
|
|
|
|
values = [a.get_current_value() for a in adjs]
|
|
assert values == [10, 20]
|
|
|
|
tasks = set_all_target_values(adjs, [15, 25])
|
|
assert all(hasattr(t, "wait") for t in tasks)
|
|
|
|
wait_for_all(tasks)
|
|
|
|
values = [a.get_current_value() for a in adjs]
|
|
assert values == [15, 25]
|
|
|
|
|
|
def test_wait_for_all_calls_wait_on_all_tasks(monkeypatch):
|
|
called = []
|
|
|
|
class DummyTask:
|
|
def __init__(self, name):
|
|
self.name = name
|
|
def wait(self):
|
|
called.append(self.name)
|
|
|
|
tasks = [DummyTask("t1"), DummyTask("t2"), DummyTask("t3")]
|
|
|
|
wait_for_all(tasks)
|
|
|
|
assert called == ["t1", "t2", "t3"]
|
|
|
|
|
|
def test_stop_all_calls_stop_and_handles_exceptions(capsys):
|
|
called = []
|
|
|
|
class WorkingTask:
|
|
def __init__(self, name):
|
|
self.name = name
|
|
self.stopped = False
|
|
def stop(self):
|
|
self.stopped = True
|
|
called.append(self.name)
|
|
|
|
class FailingTask:
|
|
def stop(self):
|
|
raise RuntimeError("boom")
|
|
|
|
tasks = [WorkingTask("T1"), FailingTask(), WorkingTask("T2")]
|
|
|
|
stop_all(tasks)
|
|
|
|
assert all(t.stopped for t in tasks if isinstance(t, WorkingTask))
|
|
|
|
assert called == ["T1", "T2"]
|
|
|
|
out = capsys.readouterr().out
|
|
assert "Stopping caused" in out
|
|
assert "boom" in out
|
|
|
|
|
|
# Edge cases
|
|
|
|
def test_run_with_exception_handling(tmp_path, capsys):
|
|
adjs = [DummyAdjustable(name="M", ID="M")]
|
|
fake = FakeAcquisition("test_instrument", "test_pgroup")
|
|
|
|
sb = ScanBackend(
|
|
adjs, [[1], [2]], [fake], "error_scan",
|
|
[], ["ch"], [], 1,
|
|
"data", tmp_path, False, None, False, 1, None, None
|
|
)
|
|
|
|
original_do_step = sb.do_step
|
|
def failing_do_step(*args, **kwargs):
|
|
if args[0] == 1:
|
|
raise ValueError("Intentional test error")
|
|
return original_do_step(*args, **kwargs)
|
|
|
|
sb.do_step = failing_do_step
|
|
|
|
sb.run()
|
|
|
|
out = capsys.readouterr().out
|
|
assert "Stopping because of:" in out
|
|
assert "Intentional test error" in out
|
|
assert "Stopped current DAQ tasks:" in out
|
|
|
|
assert not sb.running
|
|
|
|
|
|
def test_scan_interrupted_midway(tmp_path, capsys):
|
|
adjs = [DummyAdjustable(name="M", ID="M")]
|
|
fake = FakeAcquisition("test_instrument", "test_pgroup")
|
|
|
|
sb = ScanBackend(
|
|
adjs, [[1], [2], [3], [4], [5]], [fake], "interrupted_scan",
|
|
[], ["ch"], [], 1,
|
|
"data", tmp_path, False, None, True, 1, None, None
|
|
)
|
|
|
|
original_do_step = sb.do_step
|
|
def stopping_do_step(*args, **kwargs):
|
|
result = original_do_step(*args, **kwargs)
|
|
if args[0] == 1:
|
|
sb.running = False
|
|
return result
|
|
|
|
sb.do_step = stopping_do_step
|
|
|
|
sb.run()
|
|
|
|
out = capsys.readouterr().out
|
|
assert "Stopped during scan step" in out
|
|
assert "of 5" in out
|
|
|
|
assert len(sb.scan_info.values) == 2
|
|
|
|
|
|
def test_remote_plot_connection_refused_on_new_plot(tmp_path):
|
|
adjs = [DummyAdjustable(name="M", ID="M")]
|
|
fake = FakeAcquisition("test_instrument", "test_pgroup")
|
|
sensor = DummySensor()
|
|
remote_plot = DummyRemotePlot(fail=True)
|
|
|
|
sb = ScanBackend(
|
|
adjs, [[1]], [fake], "plot_fail_scan",
|
|
[], ["ch"], [], 1,
|
|
"data", tmp_path, False, None, True, 1, sensor, remote_plot
|
|
)
|
|
|
|
sb.run()
|
|
|
|
assert remote_plot.created
|
|
assert remote_plot.appended
|
|
|
|
|
|
def test_remote_plot_connection_refused_on_append_data(tmp_path):
|
|
adjs = [DummyAdjustable(name="M", ID="M")]
|
|
fake = FakeAcquisition("test_instrument", "test_pgroup")
|
|
sensor = DummySensor()
|
|
|
|
class SelectiveFailPlot(DummyRemotePlot):
|
|
def __init__(self):
|
|
super().__init__(fail=False)
|
|
self.append_fail = True
|
|
|
|
def append_data(self, filename, data):
|
|
self.appended = True
|
|
self.last_data = data
|
|
self.last_filename = filename
|
|
if self.append_fail:
|
|
raise ConnectionRefusedError("Connection refused")
|
|
|
|
remote_plot = SelectiveFailPlot()
|
|
|
|
sb = ScanBackend(
|
|
adjs, [[1]], [fake], "append_fail_scan",
|
|
[], ["ch"], [], 1,
|
|
"data", tmp_path, False, None, True, 1, sensor, remote_plot
|
|
)
|
|
|
|
sb.run()
|
|
|
|
assert remote_plot.created
|
|
assert remote_plot.appended
|
|
|
|
|
|
def test_step_info_callable(tmp_path):
|
|
adjs = [DummyAdjustable(name="M", ID="M")]
|
|
fake = FakeAcquisition("test_instrument", "test_pgroup")
|
|
|
|
call_count = [0]
|
|
def step_info_func():
|
|
call_count[0] += 1
|
|
return {"call": call_count[0], "timestamp": "2024-01-01"}
|
|
|
|
sb = ScanBackend(
|
|
adjs, [[1], [2]], [fake], "callable_info_scan",
|
|
[], ["ch"], [], 1,
|
|
"data", tmp_path, False, None, True, 1, None, None
|
|
)
|
|
|
|
sb.running = True
|
|
sb.scan_loop(step_info=step_info_func)
|
|
|
|
assert call_count[0] == 4
|
|
|
|
assert len(sb.scan_info.info) == 2
|
|
assert sb.scan_info.info[0] == {"call": 2, "timestamp": "2024-01-01"}
|
|
assert sb.scan_info.info[1] == {"call": 4, "timestamp": "2024-01-01"}
|
|
|
|
|
|
def test_repeated_scan_with_very_large_n_repeat(tmp_path, capsys):
|
|
adjs = [DummyAdjustable(name="M", ID="M")]
|
|
fake = FakeAcquisition("test_instrument", "test_pgroup")
|
|
|
|
sb = ScanBackend(
|
|
adjs, [[1], [2]], [fake], "many_repeat_scan",
|
|
[], ["ch"], [], 1,
|
|
"data", tmp_path, False, None, True, 10, None, None
|
|
)
|
|
|
|
sb.run()
|
|
|
|
assert fake.call_count == 20
|
|
|
|
out = capsys.readouterr().out
|
|
assert "Repetition 1 of 10" in out
|
|
assert "Repetition 10 of 10" in out
|
|
|
|
|
|
def test_repeated_scan_loop_stops_when_running_false(tmp_path):
|
|
adjs = [DummyAdjustable(name="M", ID="M")]
|
|
fake = FakeAcquisition("test_instrument", "test_pgroup")
|
|
|
|
sb = ScanBackend(
|
|
adjs, [[1]], [fake], "stop_repeat_scan",
|
|
[], ["ch"], [], 1,
|
|
"data", tmp_path, False, None, True, 100, None, None
|
|
)
|
|
|
|
original_scan_loop = sb.scan_loop
|
|
rep_count = [0]
|
|
def counting_scan_loop(*args, **kwargs):
|
|
result = original_scan_loop(*args, **kwargs)
|
|
rep_count[0] += 1
|
|
if rep_count[0] >= 2:
|
|
sb.running = False
|
|
return result
|
|
|
|
sb.scan_loop = counting_scan_loop
|
|
|
|
sb.running = True
|
|
sb.repeated_scan_loop()
|
|
|
|
assert rep_count[0] == 2
|
|
assert fake.call_count == 2
|
|
|
|
|
|
def test_sfdaq_with_spreadsheet_logging(tmp_path):
|
|
adjs = [DummyAdjustable(name="M1", ID="M1"), DummyAdjustable(name="M2", ID="M2")]
|
|
|
|
logged_data = []
|
|
class MockSpreadsheet:
|
|
def add(self, run_number, filename, n_pulses, scanned_adjs, scan_values):
|
|
logged_data.append({
|
|
"run_number": run_number,
|
|
"filename": filename,
|
|
"n_pulses": n_pulses,
|
|
"scanned_adjs": scanned_adjs,
|
|
"scan_values": scan_values
|
|
})
|
|
|
|
fake = FakeAcquisition("test_instrument", "test_pgroup")
|
|
fake.spreadsheet = MockSpreadsheet()
|
|
|
|
sb = ScanBackend(
|
|
adjs, [[1, 2], [3, 4]], [fake], "spreadsheet_scan",
|
|
[], ["ch"], [], 5,
|
|
"data", tmp_path, False, None, True, 1, None, None
|
|
)
|
|
|
|
sb.scan_loop()
|
|
|
|
assert len(logged_data) == 1
|
|
log = logged_data[0]
|
|
assert log["run_number"] == 1
|
|
assert log["filename"] == "spreadsheet_scan"
|
|
assert log["n_pulses"] == 5
|
|
assert log["scanned_adjs"] == adjs
|
|
assert log["scan_values"] == [[1, 2], [3, 4]]
|
|
|
|
|
|
def test_scan_loop_with_single_value(tmp_path):
|
|
adjs = [DummyAdjustable(name="M", ID="M")]
|
|
fake = FakeAcquisition("test_instrument", "test_pgroup")
|
|
|
|
sb = ScanBackend(
|
|
adjs, [[42]], [fake], "single_scan",
|
|
[], ["ch"], [], 1,
|
|
"data", tmp_path, False, None, True, 1, None, None
|
|
)
|
|
|
|
sb.running = True
|
|
sb.scan_loop()
|
|
|
|
assert len(sb.scan_info.values) == 1
|
|
assert sb.scan_info.values[0] == [42]
|
|
assert fake.call_count == 1
|
|
|
|
|
|
def test_scan_loop_advances_run_number_per_acquisition(tmp_path, capsys):
|
|
adjs = [DummyAdjustable(name="M", ID="M")]
|
|
fake1 = FakeAcquisition("test_instrument", "test_pgroup")
|
|
fake2 = FakeAcquisition("test_instrument", "test_pgroup")
|
|
|
|
sb = ScanBackend(
|
|
adjs, [[1]], [fake1, fake2], "multi_acq_scan",
|
|
[], ["ch"], [], 1,
|
|
"data", tmp_path, False, None, True, 1, None, None
|
|
)
|
|
|
|
sb.scan_loop()
|
|
|
|
out = capsys.readouterr().out
|
|
assert "Advanced run number to 1 for" in out
|
|
assert out.count("Advanced run number to") == 2
|
|
|
|
|
|
def test_return_to_initial_values_false(tmp_path):
|
|
adjs = [DummyAdjustable(ID="M", initial_value=100, process_time=0)]
|
|
fake = FakeAcquisition("test_instrument", "test_pgroup")
|
|
|
|
sb = ScanBackend(
|
|
adjs, [[1], [2], [3]], [fake], "no_return_scan",
|
|
[], ["ch"], [], 1,
|
|
"data", tmp_path, False, None, False, 1, None, None
|
|
)
|
|
|
|
sb.run()
|
|
|
|
assert adjs[0].get_current_value() == 3
|
|
assert adjs[0].get_current_value() != 100
|
|
|
|
|
|
# N-dimensional grid scans
|
|
|
|
def test_true_2D_grid_scan(tmp_path):
|
|
N = 2
|
|
adjs = [DummyAdjustable(name=f"Adj{i}", ID=f"Adj{i}") for i in range(N)]
|
|
|
|
x_vals = [0, 1, 2]
|
|
y_vals = [10, 20]
|
|
|
|
values = [[x, y] for x in x_vals for y in y_vals]
|
|
|
|
fake = FakeAcquisition("test_instrument", "test_pgroup")
|
|
|
|
sb = ScanBackend(
|
|
adjs, values, [fake], "grid_2d_scan",
|
|
[], ["ch"], [], 1,
|
|
"data", tmp_path, False, None, True, 1, None, None
|
|
)
|
|
|
|
sb.running = True
|
|
sb.scan_loop()
|
|
|
|
assert len(sb.scan_info.values) == 6
|
|
assert fake.call_count == 6
|
|
|
|
assert sb.scan_info.values[0] == [0, 10]
|
|
assert sb.scan_info.values[1] == [0, 20]
|
|
assert sb.scan_info.values[2] == [1, 10]
|
|
assert sb.scan_info.values[3] == [1, 20]
|
|
assert sb.scan_info.values[4] == [2, 10]
|
|
assert sb.scan_info.values[5] == [2, 20]
|
|
|
|
|
|
def test_true_3D_grid_scan(tmp_path):
|
|
N = 3
|
|
adjs = [DummyAdjustable(name=f"Adj{i}", ID=f"Adj{i}") for i in range(N)]
|
|
|
|
x_vals = [0, 1]
|
|
y_vals = [10, 20]
|
|
z_vals = [100, 200, 300]
|
|
|
|
values = [[x, y, z] for x in x_vals for y in y_vals for z in z_vals]
|
|
|
|
fake = FakeAcquisition("test_instrument", "test_pgroup")
|
|
|
|
sb = ScanBackend(
|
|
adjs, values, [fake], "grid_3d_scan",
|
|
[], ["ch"], [], 1,
|
|
"data", tmp_path, False, None, True, 1, None, None
|
|
)
|
|
|
|
sb.running = True
|
|
sb.scan_loop()
|
|
|
|
assert len(sb.scan_info.values) == 12
|
|
assert fake.call_count == 12
|
|
|
|
assert sb.scan_info.values[0] == [0, 10, 100]
|
|
assert sb.scan_info.values[1] == [0, 10, 200]
|
|
assert sb.scan_info.values[2] == [0, 10, 300]
|
|
assert sb.scan_info.values[3] == [0, 20, 100]
|
|
assert sb.scan_info.values[10] == [1, 20, 200]
|
|
assert sb.scan_info.values[11] == [1, 20, 300]
|
|
|
|
|
|
def test_true_4D_grid_scan(tmp_path):
|
|
N = 4
|
|
adjs = [DummyAdjustable(name=f"Adj{i}", ID=f"Adj{i}") for i in range(N)]
|
|
|
|
dim_vals = [[0, 1], [10, 11], [100, 101], [1000, 1001]]
|
|
|
|
values = []
|
|
for v0 in dim_vals[0]:
|
|
for v1 in dim_vals[1]:
|
|
for v2 in dim_vals[2]:
|
|
for v3 in dim_vals[3]:
|
|
values.append([v0, v1, v2, v3])
|
|
|
|
fake = FakeAcquisition("test_instrument", "test_pgroup")
|
|
|
|
sb = ScanBackend(
|
|
adjs, values, [fake], "grid_4d_scan",
|
|
[], ["ch"], [], 1,
|
|
"data", tmp_path, False, None, True, 1, None, None
|
|
)
|
|
|
|
sb.running = True
|
|
sb.scan_loop()
|
|
|
|
assert len(sb.scan_info.values) == 16
|
|
assert fake.call_count == 16
|
|
|
|
assert sb.scan_info.values[0] == [0, 10, 100, 1000]
|
|
assert sb.scan_info.values[15] == [1, 11, 101, 1001]
|
|
|
|
|
|
def test_parametrized_ND_grid_scan_generator(tmp_path):
|
|
import itertools
|
|
|
|
def generate_nd_grid(dim_values_list):
|
|
return [list(combo) for combo in itertools.product(*dim_values_list)]
|
|
|
|
N = 2
|
|
dim_values = [[1, 2, 3], [100, 200]]
|
|
adjs = [DummyAdjustable(name=f"A{i}", ID=f"A{i}") for i in range(N)]
|
|
values = generate_nd_grid(dim_values)
|
|
|
|
fake = FakeAcquisition("test_instrument", "test_pgroup")
|
|
sb = ScanBackend(
|
|
adjs, values, [fake], "nd_gen_2d",
|
|
[], ["ch"], [], 1,
|
|
"data", tmp_path, False, None, True, 1, None, None
|
|
)
|
|
sb.running = True
|
|
sb.scan_loop()
|
|
|
|
assert len(sb.scan_info.values) == 6
|
|
assert sb.scan_info.values[0] == [1, 100]
|
|
assert sb.scan_info.values[5] == [3, 200]
|
|
|
|
N = 3
|
|
dim_values = [[1, 2], [10, 20, 30], [100]]
|
|
adjs = [DummyAdjustable(name=f"A{i}", ID=f"A{i}") for i in range(N)]
|
|
values = generate_nd_grid(dim_values)
|
|
|
|
fake = FakeAcquisition("test_instrument", "test_pgroup")
|
|
sb = ScanBackend(
|
|
adjs, values, [fake], "nd_gen_3d",
|
|
[], ["ch"], [], 1,
|
|
"data", tmp_path, False, None, True, 1, None, None
|
|
)
|
|
sb.running = True
|
|
sb.scan_loop()
|
|
|
|
assert len(sb.scan_info.values) == 6
|
|
assert sb.scan_info.values[0] == [1, 10, 100]
|
|
assert sb.scan_info.values[5] == [2, 30, 100]
|
|
|
|
N = 5
|
|
dim_values = [[1, 2], [10], [100, 200], [1000], [10000, 20000, 30000]]
|
|
adjs = [DummyAdjustable(name=f"A{i}", ID=f"A{i}") for i in range(N)]
|
|
values = generate_nd_grid(dim_values)
|
|
|
|
fake = FakeAcquisition("test_instrument", "test_pgroup")
|
|
sb = ScanBackend(
|
|
adjs, values, [fake], "nd_gen_5d",
|
|
[], ["ch"], [], 1,
|
|
"data", tmp_path, False, None, True, 1, None, None
|
|
)
|
|
sb.running = True
|
|
sb.scan_loop()
|
|
|
|
assert len(sb.scan_info.values) == 12
|
|
assert sb.scan_info.values[0] == [1, 10, 100, 1000, 10000]
|
|
assert sb.scan_info.values[11] == [2, 10, 200, 1000, 30000]
|
|
|
|
|
|
def test_ND_grid_with_readbacks_verification(tmp_path):
|
|
import itertools
|
|
|
|
N = 3
|
|
dim_values = [[0, 1], [10, 20], [100, 200, 300]]
|
|
adjs = [DummyAdjustable(name=f"Motor{i}", ID=f"Motor{i}") for i in range(N)]
|
|
values = [list(combo) for combo in itertools.product(*dim_values)]
|
|
|
|
fake = FakeAcquisition("test_instrument", "test_pgroup")
|
|
|
|
sb = ScanBackend(
|
|
adjs, values, [fake], "grid_readbacks",
|
|
[], ["ch"], [], 1,
|
|
"data", tmp_path, False, None, True, 1, None, None
|
|
)
|
|
|
|
sb.running = True
|
|
sb.scan_loop()
|
|
|
|
assert len(sb.scan_info.readbacks) == 12
|
|
|
|
for i, expected_vals in enumerate(values):
|
|
assert sb.scan_info.readbacks[i] == expected_vals
|
|
|
|
assert sb.scan_info.values[0] == [0, 10, 100]
|
|
assert sb.scan_info.readbacks[0] == [0, 10, 100]
|
|
|
|
assert sb.scan_info.values[11] == [1, 20, 300]
|
|
assert sb.scan_info.readbacks[11] == [1, 20, 300]
|
|
|
|
|
|
def test_large_ND_grid_scan(tmp_path):
|
|
N = 3
|
|
dim_values = [[0, 1, 2], [10, 20, 30], [100, 200, 300]]
|
|
|
|
import itertools
|
|
adjs = [DummyAdjustable(name=f"Axis{i}", ID=f"Axis{i}") for i in range(N)]
|
|
values = [list(combo) for combo in itertools.product(*dim_values)]
|
|
|
|
fake = FakeAcquisition("test_instrument", "test_pgroup")
|
|
|
|
sb = ScanBackend(
|
|
adjs, values, [fake], "large_grid_3d",
|
|
[], ["ch"], [], 1,
|
|
"data", tmp_path, False, None, True, 1, None, None
|
|
)
|
|
|
|
sb.running = True
|
|
sb.scan_loop()
|
|
|
|
assert len(sb.scan_info.values) == 27
|
|
assert fake.call_count == 27
|
|
|
|
assert sb.scan_info.values[0] == [0, 10, 100]
|
|
assert sb.scan_info.values[8] == [0, 30, 300]
|
|
assert sb.scan_info.values[26] == [2, 30, 300]
|
|
|
|
|
|
def test_ND_grid_interrupted_midway(tmp_path):
|
|
N = 3
|
|
dim_values = [[0, 1, 2], [10, 20], [100, 200, 300]]
|
|
|
|
import itertools
|
|
adjs = [DummyAdjustable(name=f"M{i}", ID=f"M{i}") for i in range(N)]
|
|
values = [list(combo) for combo in itertools.product(*dim_values)]
|
|
|
|
fake = FakeAcquisition("test_instrument", "test_pgroup")
|
|
|
|
sb = ScanBackend(
|
|
adjs, values, [fake], "interrupted_grid",
|
|
[], ["ch"], [], 1,
|
|
"data", tmp_path, False, None, True, 1, None, None
|
|
)
|
|
|
|
original_do_step = sb.do_step
|
|
step_count = [0]
|
|
def counting_do_step(*args, **kwargs):
|
|
result = original_do_step(*args, **kwargs)
|
|
step_count[0] += 1
|
|
if step_count[0] >= 5:
|
|
sb.running = False
|
|
return result
|
|
|
|
sb.do_step = counting_do_step
|
|
sb.running = True
|
|
sb.scan_loop()
|
|
|
|
assert len(sb.scan_info.values) == 5
|
|
assert fake.call_count == 5
|
|
assert step_count[0] == 5
|
|
|
|
|
|
# Non-SFDAQ acquisitions
|
|
|
|
def test_nonsfdaq_acquisition_only(tmp_path):
|
|
adjs = [DummyAdjustable(name="M", ID="M")]
|
|
nonsfdaq = NonSFDAQAcquisition(name="TestNonSFDAQ", default_dir=str(tmp_path))
|
|
|
|
sb = ScanBackend(
|
|
adjs, [[1], [2]], [nonsfdaq], "nonsfdaq_scan",
|
|
[], ["ch"], [], 1,
|
|
"data", tmp_path, False, None, True, 1, None, None
|
|
)
|
|
|
|
assert sb.filename != "nonsfdaq_scan"
|
|
assert sb.filename_sfdaq == "nonsfdaq_scan"
|
|
|
|
sb.running = True
|
|
sb.scan_loop()
|
|
|
|
assert nonsfdaq.call_count == 2
|
|
|
|
assert len(sb.scan_info.values) == 2
|
|
assert sb.scan_info.values[0] == [1]
|
|
assert sb.scan_info.values[1] == [2]
|
|
|
|
|
|
def test_nonsfdaq_with_make_scan_sub_dir(tmp_path):
|
|
adjs = [DummyAdjustable(name="M", ID="M")]
|
|
default_dir = tmp_path / "nonsfdaq_default"
|
|
default_dir.mkdir()
|
|
|
|
nonsfdaq = NonSFDAQAcquisition(name="TestNonSFDAQ", default_dir=str(default_dir))
|
|
|
|
sb = ScanBackend(
|
|
adjs, [[1]], [nonsfdaq], "subdir_scan",
|
|
[], ["ch"], [], 1,
|
|
"data", tmp_path, True, None, True, 1, None, None
|
|
)
|
|
|
|
sb.create_output_dirs()
|
|
|
|
expected_subdir = default_dir / "data" / sb.filename
|
|
assert expected_subdir.exists()
|
|
|
|
|
|
def test_nonsfdaq_acquire_all_different_params(tmp_path):
|
|
adjs = [DummyAdjustable(name="M", ID="M")]
|
|
nonsfdaq = NonSFDAQAcquisition(name="TestNonSFDAQ", default_dir=str(tmp_path))
|
|
|
|
sb = ScanBackend(
|
|
adjs, [[1]], [nonsfdaq], "param_test",
|
|
[], ["ch"], [], 5,
|
|
"data", tmp_path, False, None, True, 1, None, None
|
|
)
|
|
|
|
filename = sb.get_filename(0)
|
|
sb.acquire_all(filename)
|
|
|
|
assert nonsfdaq.last_filename == filename
|
|
assert nonsfdaq.last_n_pulses == 5
|
|
|
|
|
|
def test_mixed_sfdaq_and_nonsfdaq_acquisitions(tmp_path):
|
|
adjs = [DummyAdjustable(name="M", ID="M")]
|
|
|
|
fake = FakeAcquisition("test_instrument", "test_pgroup")
|
|
nonsfdaq = NonSFDAQAcquisition(name="Mixed", default_dir=str(tmp_path))
|
|
|
|
sb = ScanBackend(
|
|
adjs, [[1], [2]], [fake, nonsfdaq], "mixed_scan",
|
|
[], ["ch"], [], 1,
|
|
"data", tmp_path, False, None, True, 1, None, None
|
|
)
|
|
|
|
assert sb.filename != "mixed_scan"
|
|
|
|
sb.running = True
|
|
sb.scan_loop()
|
|
|
|
assert fake.call_count == 2
|
|
assert nonsfdaq.call_count == 2
|
|
|
|
|
|
def test_nonsfdaq_with_default_dir_none(tmp_path):
|
|
adjs = [DummyAdjustable(name="M", ID="M")]
|
|
nonsfdaq = NonSFDAQAcquisition(name="NoDir", default_dir=None)
|
|
|
|
sb = ScanBackend(
|
|
adjs, [[1]], [nonsfdaq], "nodir_scan",
|
|
[], ["ch"], [], 1,
|
|
"data", tmp_path, True, None, True, 1, None, None
|
|
)
|
|
|
|
sb.create_output_dirs()
|
|
|
|
sb.running = True
|
|
sb.scan_loop()
|
|
assert nonsfdaq.call_count == 1
|
|
|
|
|
|
# return_to_initial_values=None
|
|
|
|
def test_return_to_initial_values_none_user_says_yes(tmp_path, monkeypatch):
|
|
adjs = [DummyAdjustable(ID="M", initial_value=100, process_time=0)]
|
|
fake = FakeAcquisition("test_instrument", "test_pgroup")
|
|
|
|
sb = ScanBackend(
|
|
adjs, [[1], [2]], [fake], "interactive_scan",
|
|
[], ["ch"], [], 1,
|
|
"data", tmp_path, False, None, None, 1, None, None
|
|
)
|
|
|
|
user_response = [True]
|
|
def mock_ask_yes_no(prompt):
|
|
return user_response[0]
|
|
|
|
monkeypatch.setattr("slic.core.scanner.scanbackend.ask_Yes_no", mock_ask_yes_no)
|
|
|
|
sb.run()
|
|
|
|
assert adjs[0].get_current_value() == 100
|
|
|
|
|
|
def test_return_to_initial_values_none_user_says_no(tmp_path, monkeypatch):
|
|
adjs = [DummyAdjustable(ID="M", initial_value=100, process_time=0)]
|
|
fake = FakeAcquisition("test_instrument", "test_pgroup")
|
|
|
|
sb = ScanBackend(
|
|
adjs, [[1], [2]], [fake], "interactive_scan",
|
|
[], ["ch"], [], 1,
|
|
"data", tmp_path, False, None, None, 1, None, None
|
|
)
|
|
|
|
def mock_ask_yes_no(prompt):
|
|
return False
|
|
|
|
monkeypatch.setattr("slic.core.scanner.scanbackend.ask_Yes_no", mock_ask_yes_no)
|
|
|
|
sb.run()
|
|
|
|
assert adjs[0].get_current_value() == 2
|
|
|
|
|
|
# n_repeat=None (infinite)
|
|
|
|
def test_n_repeat_none_infinite_with_early_stop(tmp_path):
|
|
adjs = [DummyAdjustable(name="M", ID="M")]
|
|
fake = FakeAcquisition("test_instrument", "test_pgroup")
|
|
|
|
sb = ScanBackend(
|
|
adjs, [[1]], [fake], "infinite_scan",
|
|
[], ["ch"], [], 1,
|
|
"data", tmp_path, False, None, True, None, None, None
|
|
)
|
|
|
|
original_scan_loop = sb.scan_loop
|
|
rep_count = [0]
|
|
def counting_scan_loop(*args, **kwargs):
|
|
result = original_scan_loop(*args, **kwargs)
|
|
rep_count[0] += 1
|
|
if rep_count[0] >= 3:
|
|
sb.running = False
|
|
return result
|
|
|
|
sb.scan_loop = counting_scan_loop
|
|
sb.running = True
|
|
sb.repeated_scan_loop()
|
|
|
|
assert rep_count[0] == 3
|
|
assert fake.call_count == 3
|
|
|
|
|
|
def test_n_repeat_none_printable_summary(tmp_path, capsys):
|
|
adjs = [DummyAdjustable(name="M", ID="M")]
|
|
fake = FakeAcquisition("test_instrument", "test_pgroup")
|
|
|
|
sb = ScanBackend(
|
|
adjs, [[1]], [fake], "infinite_scan",
|
|
[], ["ch"], [], 1,
|
|
"data", tmp_path, False, None, True, None, None, None
|
|
)
|
|
|
|
summary = sb._make_summary()
|
|
|
|
assert "repeat the following scan forever" in summary
|
|
|
|
|
|
# KeyboardInterrupt
|
|
|
|
def test_keyboard_interrupt_handling(tmp_path, capsys):
|
|
adjs = [DummyAdjustable(name="M", ID="M")]
|
|
fake = FakeAcquisition("test_instrument", "test_pgroup")
|
|
|
|
sb = ScanBackend(
|
|
adjs, [[1], [2], [3]], [fake], "keyboard_scan",
|
|
[], ["ch"], [], 1,
|
|
"data", tmp_path, False, None, True, 1, None, None
|
|
)
|
|
|
|
original_do_step = sb.do_step
|
|
step_count = [0]
|
|
def interrupt_do_step(*args, **kwargs):
|
|
step_count[0] += 1
|
|
if step_count[0] >= 2:
|
|
raise KeyboardInterrupt
|
|
return original_do_step(*args, **kwargs)
|
|
|
|
sb.do_step = interrupt_do_step
|
|
sb.run()
|
|
|
|
assert step_count[0] == 2
|
|
assert sb.running == False
|
|
|
|
out = capsys.readouterr().out
|
|
assert "Stopped current DAQ tasks:" in out
|
|
|
|
|
|
# sensor without remote_plot (BUG)
|
|
|
|
def test_sensor_without_remote_plot(tmp_path):
|
|
adjs = [DummyAdjustable(name="M", ID="M")]
|
|
fake = FakeAcquisition("test_instrument", "test_pgroup")
|
|
sensor = DummySensor(name="TestSensor")
|
|
|
|
sb = ScanBackend(
|
|
adjs, [[1], [2]], [fake], "sensor_only_scan",
|
|
[], ["ch"], [], 1,
|
|
"data", tmp_path, False, None, True, 1, sensor, None
|
|
)
|
|
|
|
sb.run()
|
|
|
|
assert sensor.started
|
|
assert sensor.stopped
|
|
assert fake.call_count == 2
|
|
|
|
|
|
# condition with n_repeat>1
|
|
|
|
def test_condition_with_multiple_repeats(tmp_path):
|
|
adjs = [DummyAdjustable(name="M", ID="M")]
|
|
fake = FakeAcquisition("test_instrument", "test_pgroup")
|
|
condition = DummyCondition(repeats=6)
|
|
|
|
sb = ScanBackend(
|
|
adjs, [[1]], [fake], "condition_repeat_scan",
|
|
[], ["ch"], [], 1,
|
|
"data", tmp_path, False, condition, True, 3, None, None
|
|
)
|
|
|
|
sb.run()
|
|
|
|
assert fake.call_count == 6
|
|
|
|
|
|
# condition.stop()
|
|
|
|
def test_stop_calls_condition_stop(tmp_path):
|
|
adjs = [DummyAdjustable(name="M", ID="M")]
|
|
fake = FakeAcquisition("test_instrument", "test_pgroup")
|
|
condition = DummyCondition(repeats=0)
|
|
|
|
sb = ScanBackend(
|
|
adjs, [[1]], [fake], "stop_condition_scan",
|
|
[], ["ch"], [], 1,
|
|
"data", tmp_path, False, condition, True, 1, None, None
|
|
)
|
|
|
|
assert not condition._stopped
|
|
sb.stop()
|
|
assert condition._stopped
|
|
|
|
|
|
# step_info as static dict
|
|
|
|
def test_step_info_as_static_dict(tmp_path):
|
|
adjs = [DummyAdjustable(name="M", ID="M")]
|
|
fake = FakeAcquisition("test_instrument", "test_pgroup")
|
|
|
|
sb = ScanBackend(
|
|
adjs, [[1], [2]], [fake], "static_info_scan",
|
|
[], ["ch"], [], 1,
|
|
"data", tmp_path, False, None, True, 1, None, None
|
|
)
|
|
|
|
static_info = {"experiment": "test", "user": "testuser"}
|
|
|
|
sb.running = True
|
|
sb.scan_loop(step_info=static_info)
|
|
|
|
assert len(sb.scan_info.info) == 2
|
|
assert sb.scan_info.info[0] == static_info
|
|
assert sb.scan_info.info[1] == static_info
|
|
|
|
|
|
# Empty values (BUG)
|
|
|
|
def test_scan_with_empty_values_list(tmp_path):
|
|
adjs = [DummyAdjustable(name="M", ID="M")]
|
|
fake = FakeAcquisition("test_instrument", "test_pgroup")
|
|
|
|
sb = ScanBackend(
|
|
adjs, [], [fake], "empty_scan",
|
|
[], ["ch"], [], 1,
|
|
"data", tmp_path, False, None, True, 1, None, None
|
|
)
|
|
|
|
sb.running = True
|
|
|
|
sb.scan_loop()
|
|
|
|
assert len(sb.scan_info.values) == 0
|
|
assert fake.call_count == 0
|
|
|
|
|
|
def test_scan_with_empty_acquisitions_list(tmp_path):
|
|
adjs = [DummyAdjustable(name="M", ID="M")]
|
|
|
|
sb = ScanBackend(
|
|
adjs, [[1], [2]], [], "no_acq_scan",
|
|
[], ["ch"], [], 1,
|
|
"data", tmp_path, False, None, True, 1, None, None
|
|
)
|
|
|
|
sb.running = True
|
|
sb.scan_loop()
|
|
|
|
assert len(sb.scan_info.values) == 2
|
|
|
|
|
|
def test_scan_with_very_large_istep(tmp_path):
|
|
adjs = [DummyAdjustable(name="M", ID="M")]
|
|
fake = FakeAcquisition("test_instrument", "test_pgroup")
|
|
|
|
sb = ScanBackend(
|
|
adjs, [[1]], [fake], "large_step_scan",
|
|
[], ["ch"], [], 1,
|
|
"data", tmp_path, True, None, True, 1, None, None
|
|
)
|
|
|
|
filename = sb.get_filename(12345)
|
|
|
|
assert "step12345" in filename
|
|
|
|
|
|
def test_condition_with_minimum_repeats(tmp_path):
|
|
adjs = [DummyAdjustable(name="M", ID="M")]
|
|
fake = FakeAcquisition("test_instrument", "test_pgroup")
|
|
condition = DummyCondition(repeats=1)
|
|
|
|
sb = ScanBackend(
|
|
adjs, [[1]], [fake], "min_repeat_condition",
|
|
[], ["ch"], [], 1,
|
|
"data", tmp_path, False, condition, True, 1, None, None
|
|
)
|
|
|
|
sb.running = True
|
|
sb.scan_loop()
|
|
|
|
assert fake.call_count == 1
|
|
|
|
|
|
def test_remote_plot_without_sensor(tmp_path):
|
|
adjs = [DummyAdjustable(name="M", ID="M")]
|
|
fake = FakeAcquisition("test_instrument", "test_pgroup")
|
|
remote_plot = DummyRemotePlot()
|
|
|
|
sb = ScanBackend(
|
|
adjs, [[1], [2]], [fake], "plot_no_sensor_scan",
|
|
[], ["ch"], [], 1,
|
|
"data", tmp_path, False, None, True, 1, None, remote_plot
|
|
)
|
|
|
|
sb.run()
|
|
|
|
assert not remote_plot.created
|
|
assert fake.call_count == 2
|
|
|
|
|
|
def test_n_repeat_with_n_pulses_formatting(tmp_path):
|
|
adjs = [DummyAdjustable(name="M", ID="M")]
|
|
fake = FakeAcquisition("test_instrument", "test_pgroup")
|
|
|
|
sb1 = ScanBackend(
|
|
adjs, [[1]], [fake], "scan1",
|
|
[], ["ch"], [], 1,
|
|
"data", tmp_path, False, None, True, 1, None, None
|
|
)
|
|
summary1 = sb1._make_summary()
|
|
assert "1 pulse" in summary1
|
|
|
|
sb2 = ScanBackend(
|
|
adjs, [[1]], [fake], "scan2",
|
|
[], ["ch"], [], 5,
|
|
"data", tmp_path, False, None, True, 1, None, None
|
|
)
|
|
summary2 = sb2._make_summary()
|
|
assert "5 pulses" in summary2
|
|
|
|
|
|
def test_scan_stopped_before_first_iteration(tmp_path):
|
|
adjs = [DummyAdjustable(name="M", ID="M")]
|
|
fake = FakeAcquisition("test_instrument", "test_pgroup")
|
|
|
|
sb = ScanBackend(
|
|
adjs, [[1], [2], [3]], [fake], "stop_early_scan",
|
|
[], ["ch"], [], 1,
|
|
"data", tmp_path, False, None, True, 1, None, None
|
|
)
|
|
|
|
sb.running = False
|
|
sb.scan_loop()
|
|
|
|
assert len(sb.scan_info.values) == 0
|
|
assert fake.call_count == 0
|
|
|
|
|
|
# Complete scan verification
|
|
# Verifies ALL values, readbacks, acquisitions, and return to initial
|
|
|
|
def test_complete_scan_values_readbacks_and_return_to_initial(tmp_path):
|
|
adj1 = DummyAdjustable(name="Motor1", ID="M1", initial_value=0, process_time=0)
|
|
adj2 = DummyAdjustable(name="Motor2", ID="M2", initial_value=0, process_time=0)
|
|
adjs = [adj1, adj2]
|
|
|
|
scan_values = [
|
|
[10, 100],
|
|
[20, 200],
|
|
[30, 300]
|
|
]
|
|
|
|
fake = FakeAcquisition("test_instrument", "test_pgroup")
|
|
|
|
sb = ScanBackend(
|
|
adjs, scan_values, [fake], "complete_scan",
|
|
[], ["ch"], [], 1,
|
|
"data", tmp_path, False, None, True, 1, None, None
|
|
)
|
|
|
|
initial_before = [adj1.get_current_value(), adj2.get_current_value()]
|
|
assert initial_before == [0, 0], "Initial values should be [0, 0]"
|
|
|
|
sb.run()
|
|
|
|
assert fake.call_count == 3, f"Expected 3 acquisitions, got {fake.call_count}"
|
|
|
|
assert len(sb.scan_info.values) == 3, "Should have 3 recorded values"
|
|
assert sb.scan_info.values[0] == [10, 100], f"Step 0: expected [10, 100], got {sb.scan_info.values[0]}"
|
|
assert sb.scan_info.values[1] == [20, 200], f"Step 1: expected [20, 200], got {sb.scan_info.values[1]}"
|
|
assert sb.scan_info.values[2] == [30, 300], f"Step 2: expected [30, 300], got {sb.scan_info.values[2]}"
|
|
|
|
assert len(sb.scan_info.readbacks) == 3, "Should have 3 readback records"
|
|
assert sb.scan_info.readbacks[0] == [10, 100], f"Readback 0: expected [10, 100], got {sb.scan_info.readbacks[0]}"
|
|
assert sb.scan_info.readbacks[1] == [20, 200], f"Readback 1: expected [20, 200], got {sb.scan_info.readbacks[1]}"
|
|
assert sb.scan_info.readbacks[2] == [30, 300], f"Readback 2: expected [30, 300], got {sb.scan_info.readbacks[2]}"
|
|
|
|
final_values = [adj1.get_current_value(), adj2.get_current_value()]
|
|
assert final_values == [0, 0], f"Should return to [0, 0], but got {final_values}"
|
|
|
|
assert sb.initial_values == [0, 0], f"Stored initial values should be [0, 0], got {sb.initial_values}"
|
|
|
|
|
|
def test_scan_without_return_to_initial_stays_at_last_value(tmp_path):
|
|
adj1 = DummyAdjustable(name="M1", ID="M1", initial_value=100, process_time=0)
|
|
adj2 = DummyAdjustable(name="M2", ID="M2", initial_value=200, process_time=0)
|
|
|
|
scan_values = [
|
|
[10, 20],
|
|
[30, 40],
|
|
[50, 60]
|
|
]
|
|
|
|
fake = FakeAcquisition("test_instrument", "test_pgroup")
|
|
|
|
sb = ScanBackend(
|
|
[adj1, adj2], scan_values, [fake], "no_return_scan",
|
|
[], ["ch"], [], 1,
|
|
"data", tmp_path, False, None, False, 1, None, None
|
|
)
|
|
|
|
assert adj1.get_current_value() == 100
|
|
assert adj2.get_current_value() == 200
|
|
|
|
sb.run()
|
|
|
|
assert adj1.get_current_value() == 50, f"M1 should be at 50, got {adj1.get_current_value()}"
|
|
assert adj2.get_current_value() == 60, f"M2 should be at 60, got {adj2.get_current_value()}"
|
|
|
|
|
|
def test_acquisition_parameters_are_correct(tmp_path):
|
|
adjs = [DummyAdjustable(name="M", ID="M")]
|
|
fake = FakeAcquisition("test_instrument", "test_pgroup")
|
|
|
|
scan_values = [[1], [2], [3]]
|
|
|
|
sb = ScanBackend(
|
|
adjs, scan_values, [fake], "param_check_scan",
|
|
[], ["test_channel"], [], 5,
|
|
"data", tmp_path, False, None, True, 1, None, None
|
|
)
|
|
|
|
sb.run()
|
|
|
|
assert fake.call_count == 3
|
|
|
|
assert fake.last_n_pulses == 5, f"Expected n_pulses=5, got {fake.last_n_pulses}"
|
|
|
|
assert fake.last_channels == ["test_channel"], f"Expected channels=['test_channel'], got {fake.last_channels}"
|