mirror of
https://github.com/bec-project/ophyd_devices.git
synced 2026-04-21 05:14:35 +02:00
482 lines
19 KiB
Python
482 lines
19 KiB
Python
# skip-file
|
|
import socket
|
|
from unittest import mock
|
|
|
|
import numpy as np
|
|
import pytest
|
|
from ophyd import Staged
|
|
from pandablocks.connections import NeedMoreDataError
|
|
|
|
from ophyd_devices import StatusBase
|
|
from ophyd_devices.devices.panda_box.panda_box import (
|
|
EndData,
|
|
FrameData,
|
|
PandaBox,
|
|
PandaBoxDataConnection,
|
|
PandaState,
|
|
ReadyData,
|
|
StartData,
|
|
)
|
|
from ophyd_devices.devices.panda_box.utils import (
|
|
PANDA_AVAIL_PCAP_BLOCKS,
|
|
PANDA_AVAIL_PCAP_CAPTURE_FIELDS,
|
|
block_name_mapping,
|
|
get_pcap_capture_fields,
|
|
)
|
|
|
|
|
|
@pytest.fixture
|
|
def _signal_aliases():
|
|
return {"FMC_IN.VAL1.Value": "my_signal_1", "FMC_IN.VAL2.Mean": "my_signal_2"}
|
|
|
|
|
|
@pytest.fixture
|
|
def panda_box(_signal_aliases):
|
|
return PandaBox(name="panda_box", host="localhost", signal_alias=_signal_aliases)
|
|
|
|
|
|
def test_panda_box_init_invalid_signal_alias():
|
|
"""Test that providing invalid signal aliases raises an error."""
|
|
with pytest.raises(
|
|
ValueError, match="Invalid signal name in signal_alias: 'Invalid.Signal.Name'"
|
|
):
|
|
PandaBox(
|
|
name="panda_box",
|
|
host="localhost",
|
|
signal_alias={"FMC_IN.VAL1.Value": "Invalid.Signal.Name"},
|
|
)
|
|
|
|
|
|
def test_panda_box_init(panda_box, _signal_aliases):
|
|
"""Test initialization of PandaBox, including default signal aliases."""
|
|
assert panda_box.name == "panda_box"
|
|
assert panda_box.host == "localhost"
|
|
all_signal_names = [name for name, _ in panda_box.data.signals]
|
|
for block in PANDA_AVAIL_PCAP_BLOCKS:
|
|
for field in PANDA_AVAIL_PCAP_CAPTURE_FIELDS:
|
|
signal_name = f"{block}.{field}"
|
|
if signal_name in ("FMC_IN.VAL1.Value", "FMC_IN.VAL2.Mean"):
|
|
# These signals should be renamed
|
|
assert _signal_aliases[signal_name] in all_signal_names
|
|
continue
|
|
assert (
|
|
block_name_mapping(signal_name) in all_signal_names
|
|
), f"Missing signal: {signal_name}"
|
|
|
|
|
|
def test_panda_wait_for_connection(panda_box):
|
|
"""Test that wait_for_connection can be called without error."""
|
|
with mock.patch.object(panda_box, "send_raw") as mock_send_raw:
|
|
mock_send_raw.return_value = "OK"
|
|
panda_box.wait_for_connection(timeout=1)
|
|
mock_send_raw.assert_called_with("*IDN?")
|
|
|
|
|
|
def test_panda_on_connected(panda_box):
|
|
"""Test that on_connected sets the connected flag."""
|
|
with mock.patch.object(panda_box, "data_thread") as mock_data_thread:
|
|
mock_data_thread.is_alive.return_value = False
|
|
panda_box.on_connected()
|
|
mock_data_thread.start.assert_called_once()
|
|
assert len(panda_box._data_callbacks) == 1
|
|
cb_id = list(panda_box._data_callbacks.keys())[0]
|
|
assert panda_box._data_callbacks[cb_id]["callback"] == panda_box._receive_frame_data
|
|
assert panda_box._data_callbacks[cb_id]["data_type"] == PandaState.FRAME
|
|
|
|
# Remove callback
|
|
panda_box.remove_data_callback(cb_id)
|
|
assert len(panda_box._data_callbacks) == 0, "Data callback was not removed"
|
|
|
|
# Call on_connected again, should add the callback again
|
|
mock_data_thread.reset_mock()
|
|
mock_data_thread.is_alive.return_value = True
|
|
panda_box.on_connected()
|
|
mock_data_thread.start.assert_not_called()
|
|
|
|
|
|
def test_panda_add_status_callback(panda_box):
|
|
"""Test that add_status_callback adds proper status callbacks, and resolves them correctly."""
|
|
|
|
assert panda_box.panda_state == PandaState.DISARMED, "Initial PandaBox state should be DISARMED"
|
|
status = StatusBase(obj=panda_box)
|
|
# I. Resolve immediately, should be successful
|
|
panda_box.add_status_callback(status=status, success=PandaState.DISARMED, failure=[])
|
|
status.wait(timeout=1)
|
|
assert status.done, "Status should be done"
|
|
assert status.success, "Status should be successful"
|
|
assert len(panda_box._status_callbacks) == 0, "Status callback should never be added"
|
|
|
|
# II. Resolve immediately, but with failure state, should be unsuccessful
|
|
status = StatusBase(obj=panda_box)
|
|
panda_box.add_status_callback(status=status, success=[], failure=PandaState.DISARMED)
|
|
with pytest.raises(RuntimeError):
|
|
status.wait(timeout=1)
|
|
assert status.done, "Status should be done"
|
|
assert not status.success, "Status should be unsuccessful"
|
|
assert len(panda_box._status_callbacks) == 0, "Status callback should never be added"
|
|
|
|
# III. Resolve status in success
|
|
status = StatusBase(obj=panda_box)
|
|
panda_box.add_status_callback(status=status, success=PandaState.READY, failure=[])
|
|
assert len(panda_box._status_callbacks) == 1, "Status callback should be added"
|
|
panda_box._run_status_callbacks(PandaState.START)
|
|
assert not status.done, "Status should not be done"
|
|
assert not status.success, "Status should not be successful"
|
|
panda_box._run_status_callbacks(PandaState.READY)
|
|
status.wait(timeout=1)
|
|
assert status.done, "Status should be done"
|
|
assert status.success, "Status should be successful"
|
|
|
|
# IV. Resolve status in failure
|
|
status = StatusBase(obj=panda_box)
|
|
panda_box.add_status_callback(status=status, success=[PandaState.END], failure=PandaState.START)
|
|
panda_box._run_status_callbacks(PandaState.START)
|
|
with pytest.raises(RuntimeError):
|
|
status.wait(timeout=1)
|
|
assert status.done, "Status should be done"
|
|
assert not status.success, "Status should be unsuccessful"
|
|
|
|
|
|
def test_panda_receive_frame_data(panda_box, _signal_aliases):
|
|
"""Test that _receive_frame_data processes data and updates signals."""
|
|
# Create a mock FrameData
|
|
data = np.array(
|
|
[
|
|
(np.float64(0), np.float64(10)),
|
|
(np.float64(1), np.float64(11)),
|
|
(np.float64(2), np.float64(12)),
|
|
],
|
|
dtype=[("FMC_IN.VAL1.Value", "<f8"), ("COUNTER2.OUT.Value", "<f8")],
|
|
)
|
|
fdata = FrameData(data)
|
|
# Use on_connected to set up data callback
|
|
with mock.patch.object(panda_box, "data_thread") as mock_data_thread:
|
|
mock_data_thread.is_alive.return_value = False
|
|
panda_box.on_connected() # This will set up the data callback
|
|
mock_data_thread.start.assert_called_once()
|
|
|
|
# Run the callback
|
|
panda_box._run_data_callbacks(data=fdata, event_type=PandaState.FRAME)
|
|
|
|
# Check that the correct signals were updated
|
|
expected_data = {
|
|
f"{panda_box.data.name}_my_signal_1": {
|
|
"value": [np.float64(0), np.float64(1), np.float64(2)],
|
|
"timestamp": mock.ANY,
|
|
},
|
|
f"{panda_box.data.name}_COUNTER2_OUT_Value": {
|
|
"value": [np.float64(10), np.float64(11), np.float64(12)],
|
|
"timestamp": mock.ANY,
|
|
},
|
|
}
|
|
md = {
|
|
"async_update": {"type": "add", "max_shape": [None]},
|
|
"acquisition_group": panda_box._acquisition_group,
|
|
}
|
|
d = panda_box.data.read()
|
|
assert d[panda_box.data.name]["value"].metadata == md, "Metadata mismatch"
|
|
for key, v in expected_data.items():
|
|
assert key in d[panda_box.data.name]["value"].signals, f"Missing signal: {key}"
|
|
assert np.isclose(
|
|
d[panda_box.data.name]["value"].signals[key]["value"], v["value"]
|
|
).all(), f"Incorrect values for {key}"
|
|
assert (
|
|
"timestamp" in d[panda_box.data.name]["value"].signals[key]
|
|
), f"Missing timestamp for {key}"
|
|
|
|
assert (
|
|
len(panda_box._data_callbacks) == 1
|
|
), "Data callback should still be registered after receiving data"
|
|
|
|
|
|
def test_panda_on_stop(panda_box):
|
|
"""Test that on_stop clears the data callbacks."""
|
|
panda_box.panda_state = PandaState.READY
|
|
panda_box.data_thread_run_event.set() # Simulate that the data thread is running
|
|
with mock.patch.object(panda_box, "_disarm") as mock_disarm:
|
|
panda_box.stop()
|
|
mock_disarm.assert_called_once()
|
|
|
|
assert (
|
|
panda_box.panda_state == PandaState.DISARMED
|
|
), "PandaBox state should be DISARMED after stop"
|
|
# Should go back to DISARMED state
|
|
assert (
|
|
not panda_box.data_thread_run_event.is_set()
|
|
), "Data thread run event should be unset after stop"
|
|
|
|
|
|
def test_panda_on_destroy(panda_box):
|
|
"""Test that on_destroy clears the data callbacks."""
|
|
panda_box.destroy()
|
|
assert panda_box.data_thread_kill_event.is_set(), "Data thread kill event not set"
|
|
assert panda_box.data_thread_run_event.is_set(), "Data thread run event not set"
|
|
|
|
|
|
def test_panda_on_stage_on_unstage(panda_box):
|
|
"""Test that on_stage sets the acquisition group."""
|
|
panda_box.panda_state = PandaState.DISARMED
|
|
panda_box.stage()
|
|
assert panda_box.data_thread_run_event.is_set(), "Data thread run event not set"
|
|
assert panda_box.staged == Staged.yes
|
|
|
|
# Now we will unstage the panda box
|
|
with mock.patch.object(panda_box, "_disarm") as mock_disarm:
|
|
panda_box.unstage()
|
|
mock_disarm.assert_called_once()
|
|
assert panda_box.staged == Staged.no
|
|
assert (
|
|
not panda_box.data_thread_run_event.is_set()
|
|
), "Data thread run event should be unset after unstage"
|
|
|
|
# We call on_stage again, but this time the PandaBox state is incorrect
|
|
panda_box.panda_state = PandaState.FRAME
|
|
with pytest.raises(RuntimeError):
|
|
panda_box._stage_timeout_in_s = 0.1 # Set a short timeout for the test
|
|
panda_box.stage()
|
|
|
|
|
|
def test_panda_get_signal_names_allowed_for_capture(panda_box):
|
|
"""Test that get_signal_names_allowed_for_capture returns the correct signal names."""
|
|
return_capture = [
|
|
"!INENC1.VAL",
|
|
"!INENC2.VAL",
|
|
"!INENC3.VAL",
|
|
"!INENC4.VAL",
|
|
"!PCAP.TS_START",
|
|
"!PCAP.TS_END",
|
|
"!PCAP.TS_TRIG",
|
|
"!PCAP.GATE_DURATION",
|
|
"!PCAP.BITS0",
|
|
"!PCAP.BITS1",
|
|
"!PCAP.BITS2",
|
|
"!PCAP.BITS3",
|
|
"!CALC1.OUT",
|
|
"!CALC2.OUT",
|
|
"!COUNTER1.OUT",
|
|
"!COUNTER2.OUT",
|
|
"!COUNTER3.OUT",
|
|
"!COUNTER4.OUT",
|
|
"!COUNTER5.OUT",
|
|
"!COUNTER6.OUT",
|
|
"!COUNTER7.OUT",
|
|
"!COUNTER8.OUT",
|
|
"!FILTER1.OUT",
|
|
"!FILTER2.OUT",
|
|
"!PGEN1.OUT",
|
|
"!PGEN2.OUT",
|
|
"!FMC_IN.VAL1",
|
|
"!FMC_IN.VAL2",
|
|
"!FMC_IN.VAL3",
|
|
"!FMC_IN.VAL4",
|
|
"!FMC_IN.VAL5",
|
|
"!FMC_IN.VAL6",
|
|
"!FMC_IN.VAL7",
|
|
"!FMC_IN.VAL8",
|
|
"!SFP3_SYNC_IN.POS1",
|
|
"!SFP3_SYNC_IN.POS2",
|
|
"!SFP3_SYNC_IN.POS3",
|
|
"!SFP3_SYNC_IN.POS4",
|
|
".",
|
|
]
|
|
|
|
with mock.patch.object(panda_box, "send_raw") as mock_send_raw:
|
|
mock_send_raw.return_value = return_capture
|
|
list_of_signals = panda_box._get_signal_names_allowed_for_capture()
|
|
mock_send_raw.assert_called_once_with("*CAPTURE.*?")
|
|
assert list_of_signals == PANDA_AVAIL_PCAP_BLOCKS, "Signal names mismatch"
|
|
|
|
|
|
def test_panda_get_signal_names_configured_for_capture(panda_box):
|
|
"""Test that get_signal_names_configured_for_capture returns the correct signal names."""
|
|
return_capture = [
|
|
"!INENC1.VAL Min Max Mean",
|
|
"!INENC2.VAL Value",
|
|
"!INENC3.VAL Diff",
|
|
"!INENC4.VAL Sum",
|
|
"!PCAP.TS_START Min Max",
|
|
]
|
|
possible_signal_names = get_pcap_capture_fields()
|
|
|
|
with mock.patch.object(panda_box, "send_raw") as mock_send_raw:
|
|
mock_send_raw.return_value = return_capture
|
|
list_of_signals = panda_box._get_signal_names_configured_for_capture()
|
|
list_of_signals = [block_name_mapping(name) for name in list_of_signals]
|
|
mock_send_raw.assert_called_once_with("*CAPTURE?")
|
|
for signal in list_of_signals:
|
|
assert signal in possible_signal_names, f"Unexpected signal: {signal}"
|
|
|
|
|
|
def test_panda_pre_scan_status_callback(panda_box):
|
|
"""Test that pre_scan_status_callback sets the acquisition group."""
|
|
with mock.patch.object(panda_box, "_arm") as mock_arm:
|
|
# I. Called with status that is not done, should do nothing
|
|
status = StatusBase(obj=panda_box)
|
|
panda_box._pre_scan_status_callback(status)
|
|
mock_arm.assert_not_called() # _arm should not be called in pre_scan_status_callback
|
|
status.set_finished()
|
|
# II. Called with status that is done, should arm the PandaBox
|
|
panda_box._pre_scan_status_callback(status)
|
|
mock_arm.assert_called_once()
|
|
status = StatusBase(obj=panda_box)
|
|
status.set_exception(RuntimeError("Test error"))
|
|
# III. Called with status that is done but not successful, should do nothing
|
|
mock_arm.reset_mock()
|
|
panda_box._pre_scan_status_callback(status)
|
|
mock_arm.assert_not_called()
|
|
|
|
|
|
def test_panda_get_pcap_capture_fields():
|
|
"""Test that get_pcap_capture_fields returns the correct list of capture fields."""
|
|
expected_fields = []
|
|
for block in PANDA_AVAIL_PCAP_BLOCKS:
|
|
for field in PANDA_AVAIL_PCAP_CAPTURE_FIELDS:
|
|
expected_fields.append(block_name_mapping(f"{block}.{field}"))
|
|
actual_fields = get_pcap_capture_fields()
|
|
assert actual_fields == expected_fields, "PCAP capture fields mismatch"
|
|
|
|
|
|
def test_panda_on_pre_scan(panda_box):
|
|
"""Test that on_pre_scan adds the correct pre-scan status callback."""
|
|
# I. Resolve immediately in success
|
|
panda_box.panda_state = PandaState.READY
|
|
|
|
status = panda_box.on_pre_scan()
|
|
assert status.done, "Status should be done"
|
|
assert status.success, "Status should be successful"
|
|
|
|
# II. Resolve immediately in failure
|
|
panda_box.panda_state = PandaState.FRAME
|
|
status = panda_box.on_pre_scan()
|
|
with pytest.raises(RuntimeError):
|
|
status.wait(timeout=1)
|
|
assert status.done, "Status should be done"
|
|
assert not status.success, "Status should be unsuccessful"
|
|
|
|
# III. Resolve in callback
|
|
panda_box.panda_state = PandaState.DISARMED
|
|
status = panda_box.on_pre_scan()
|
|
assert not status.done, "Status should not be done"
|
|
assert not status.success, "Status should not be successful"
|
|
panda_box._run_status_callbacks(PandaState.READY)
|
|
status.wait(timeout=1)
|
|
assert status.done, "Status should be done"
|
|
assert status.success, "Status should be successful"
|
|
|
|
|
|
def test_data_loop():
|
|
"""Test that the data loop correctly handles receiving data and exiting on EndData."""
|
|
# Mock the exact class lookup used inside PandaBox._run_data_readout
|
|
with mock.patch(
|
|
"ophyd_devices.devices.panda_box.panda_box.PandaBoxDataConnection"
|
|
) as mock_data_connection_cls:
|
|
panda_box = PandaBox(
|
|
name="panda_box", host="localhost", signal_alias={"FMC_IN.VAL1.Value": "my_signal_1"}
|
|
)
|
|
|
|
mock_data_connection = mock.MagicMock()
|
|
mock_socket_connection = mock.MagicMock()
|
|
mock_data_connection_cls.return_value.__enter__.return_value = (
|
|
mock_socket_connection,
|
|
mock_data_connection,
|
|
)
|
|
|
|
recv_items = [b"chunk1", b"chunk2", socket.timeout(), b"chunk3", b"chunk4", b"chunk5"]
|
|
|
|
def recv_side_effect(_buffer_size):
|
|
item = recv_items.pop(0)
|
|
if isinstance(item, Exception):
|
|
raise item
|
|
return item
|
|
|
|
def receive_bytes_side_effect(chunk):
|
|
if chunk == b"chunk1":
|
|
return [
|
|
StartData(
|
|
fields=["FMC_IN.VAL1.Value", "COUNTER2.OUT.Value"],
|
|
missed=0,
|
|
process="Scaled",
|
|
format="Framed",
|
|
sample_bytes=16,
|
|
arm_time=None,
|
|
start_time=None,
|
|
hw_time_offset_ns=None,
|
|
)
|
|
]
|
|
if chunk == b"chunk2":
|
|
return [ReadyData()]
|
|
if chunk == b"chunk3":
|
|
return [
|
|
FrameData(
|
|
np.array(
|
|
[(3, 4)],
|
|
dtype=[("FMC_IN.VAL1.Value", "<f8"), ("COUNTER2.OUT.Value", "<f8")],
|
|
)
|
|
)
|
|
]
|
|
if chunk == b"chunk4":
|
|
raise NeedMoreDataError()
|
|
if chunk == b"chunk5":
|
|
return [EndData(samples=200, reason="Ok")]
|
|
return []
|
|
|
|
mock_socket_connection.recv.side_effect = recv_side_effect
|
|
mock_data_connection.receive_bytes.side_effect = receive_bytes_side_effect
|
|
|
|
status_start = StatusBase(obj=panda_box)
|
|
status_ready = StatusBase(obj=panda_box)
|
|
status_frame = StatusBase(obj=panda_box)
|
|
status_end = StatusBase(obj=panda_box)
|
|
status_disarmed = StatusBase(obj=panda_box)
|
|
|
|
for status, expected_state in [
|
|
(status_start, PandaState.START),
|
|
(status_ready, PandaState.READY),
|
|
(status_frame, PandaState.FRAME),
|
|
(status_end, PandaState.END),
|
|
(status_disarmed, PandaState.DISARMED),
|
|
]:
|
|
panda_box.add_status_callback(
|
|
status=status, success=[expected_state], failure=[], check_directly=False
|
|
)
|
|
|
|
assert panda_box.panda_state == PandaState.DISARMED, "Initial state should be DISARMED"
|
|
|
|
with (
|
|
mock.patch.object(panda_box.data, "put"),
|
|
mock.patch.object(panda_box, "_disarm") as mock_disarm,
|
|
):
|
|
panda_box.data_thread_run_event.set()
|
|
panda_box._run_data_readout()
|
|
|
|
for status in [status_start, status_ready, status_frame, status_end, status_disarmed]:
|
|
assert status.done, f"{status} should be done"
|
|
assert status.success, f"{status} should be successful"
|
|
|
|
mock_data_connection_cls.assert_called_once_with(
|
|
"localhost", scaled=True, socket_timeout=0.1
|
|
)
|
|
mock_disarm.assert_called_once(), "_disarm should be called once after receiving EndData"
|
|
|
|
assert (
|
|
not panda_box.data_thread_run_event.is_set()
|
|
), "Data thread run event should be unset after receiving EndData"
|
|
|
|
|
|
def test_pandabox_data_connection():
|
|
"""Test that PandaBoxDataConnection correctly connects and receives data."""
|
|
with mock.patch("socket.socket") as mock_socket_cls:
|
|
mock_socket = mock.MagicMock()
|
|
mock_socket_cls.return_value = mock_socket
|
|
|
|
with PandaBoxDataConnection("localhost", scaled=True, socket_timeout=0.1) as (
|
|
data_socket,
|
|
data_connection,
|
|
):
|
|
mock_socket.connect.assert_called_once_with(("localhost", 8889))
|
|
expected_connect_command = "XML FRAMED SCALED\n" # Connect command for scaled data
|
|
mock_socket.sendall.assert_called_once_with(expected_connect_command.encode())
|
|
|
|
# Simulate exiting the context manager, which should close the socket
|
|
mock_socket.close.assert_called_once()
|