mirror of
https://github.com/bec-project/ophyd_devices.git
synced 2026-06-07 18:38:40 +02:00
test(panda-box): Add tests for data readout loop and context manager
This commit is contained in:
+128
-1
@@ -1,12 +1,22 @@
|
||||
# skip-file
|
||||
import socket
|
||||
from unittest import mock
|
||||
|
||||
import numpy as np
|
||||
import pytest
|
||||
from ophyd import Staged
|
||||
from pandablocks.connections import NeedMoreDataError
|
||||
|
||||
from ophyd_devices import StatusBase
|
||||
from ophyd_devices.devices.panda_box.panda_box import FrameData, PandaBox, PandaState
|
||||
from ophyd_devices.devices.panda_box.panda_box import (
|
||||
EndData,
|
||||
FrameData,
|
||||
PandaBox,
|
||||
PandaBoxDataConnection,
|
||||
PandaState,
|
||||
ReadyData,
|
||||
StartData,
|
||||
)
|
||||
from ophyd_devices.devices.panda_box.utils import (
|
||||
PANDA_AVAIL_PCAP_BLOCKS,
|
||||
PANDA_AVAIL_PCAP_CAPTURE_FIELDS,
|
||||
@@ -336,3 +346,120 @@ def test_panda_on_pre_scan(panda_box):
|
||||
status.wait(timeout=1)
|
||||
assert status.done, "Status should be done"
|
||||
assert status.success, "Status should be successful"
|
||||
|
||||
|
||||
def test_data_loop():
|
||||
"""Test that the data loop correctly handles receiving data and exiting on EndData."""
|
||||
# Mock the exact class lookup used inside PandaBox._run_data_readout
|
||||
with mock.patch(
|
||||
"ophyd_devices.devices.panda_box.panda_box.PandaBoxDataConnection"
|
||||
) as mock_data_connection_cls:
|
||||
panda_box = PandaBox(
|
||||
name="panda_box", host="localhost", signal_alias={"FMC_IN.VAL1.Value": "my_signal_1"}
|
||||
)
|
||||
|
||||
mock_data_connection = mock.MagicMock()
|
||||
mock_socket_connection = mock.MagicMock()
|
||||
mock_data_connection_cls.return_value.__enter__.return_value = (
|
||||
mock_socket_connection,
|
||||
mock_data_connection,
|
||||
)
|
||||
|
||||
recv_items = [b"chunk1", b"chunk2", socket.timeout(), b"chunk3", b"chunk4", b"chunk5"]
|
||||
|
||||
def recv_side_effect(_buffer_size):
|
||||
item = recv_items.pop(0)
|
||||
if isinstance(item, Exception):
|
||||
raise item
|
||||
return item
|
||||
|
||||
def receive_bytes_side_effect(chunk):
|
||||
if chunk == b"chunk1":
|
||||
return [
|
||||
StartData(
|
||||
fields=["FMC_IN.VAL1.Value", "COUNTER2.OUT.Value"],
|
||||
missed=0,
|
||||
process="Scaled",
|
||||
format="Framed",
|
||||
sample_bytes=16,
|
||||
arm_time=None,
|
||||
start_time=None,
|
||||
hw_time_offset_ns=None,
|
||||
)
|
||||
]
|
||||
if chunk == b"chunk2":
|
||||
return [ReadyData()]
|
||||
if chunk == b"chunk3":
|
||||
return [
|
||||
FrameData(
|
||||
np.array(
|
||||
[(3, 4)],
|
||||
dtype=[("FMC_IN.VAL1.Value", "<f8"), ("COUNTER2.OUT.Value", "<f8")],
|
||||
)
|
||||
)
|
||||
]
|
||||
if chunk == b"chunk4":
|
||||
raise NeedMoreDataError()
|
||||
if chunk == b"chunk5":
|
||||
return [EndData(samples=200, reason="Ok")]
|
||||
return []
|
||||
|
||||
mock_socket_connection.recv.side_effect = recv_side_effect
|
||||
mock_data_connection.receive_bytes.side_effect = receive_bytes_side_effect
|
||||
|
||||
status_start = StatusBase(obj=panda_box)
|
||||
status_ready = StatusBase(obj=panda_box)
|
||||
status_frame = StatusBase(obj=panda_box)
|
||||
status_end = StatusBase(obj=panda_box)
|
||||
status_disarmed = StatusBase(obj=panda_box)
|
||||
|
||||
for status, expected_state in [
|
||||
(status_start, PandaState.START),
|
||||
(status_ready, PandaState.READY),
|
||||
(status_frame, PandaState.FRAME),
|
||||
(status_end, PandaState.END),
|
||||
(status_disarmed, PandaState.DISARMED),
|
||||
]:
|
||||
panda_box.add_status_callback(
|
||||
status=status, success=[expected_state], failure=[], check_directly=False
|
||||
)
|
||||
|
||||
assert panda_box.panda_state == PandaState.DISARMED, "Initial state should be DISARMED"
|
||||
|
||||
with (
|
||||
mock.patch.object(panda_box.data, "put"),
|
||||
mock.patch.object(panda_box, "_disarm") as mock_disarm,
|
||||
):
|
||||
panda_box.data_thread_run_event.set()
|
||||
panda_box._run_data_readout()
|
||||
|
||||
for status in [status_start, status_ready, status_frame, status_end, status_disarmed]:
|
||||
assert status.done, f"{status} should be done"
|
||||
assert status.success, f"{status} should be successful"
|
||||
|
||||
mock_data_connection_cls.assert_called_once_with(
|
||||
"localhost", scaled=True, socket_timeout=0.1
|
||||
)
|
||||
mock_disarm.assert_called_once(), "_disarm should be called once after receiving EndData"
|
||||
|
||||
assert (
|
||||
not panda_box.data_thread_run_event.is_set()
|
||||
), "Data thread run event should be unset after receiving EndData"
|
||||
|
||||
|
||||
def test_pandabox_data_connection():
|
||||
"""Test that PandaBoxDataConnection correctly connects and receives data."""
|
||||
with mock.patch("socket.socket") as mock_socket_cls:
|
||||
mock_socket = mock.MagicMock()
|
||||
mock_socket_cls.return_value = mock_socket
|
||||
|
||||
with PandaBoxDataConnection("localhost", scaled=True, socket_timeout=0.1) as (
|
||||
data_socket,
|
||||
data_connection,
|
||||
):
|
||||
mock_socket.connect.assert_called_once_with(("localhost", 8889))
|
||||
expected_connect_command = "XML FRAMED SCALED\n" # Connect command for scaled data
|
||||
mock_socket.sendall.assert_called_once_with(expected_connect_command.encode())
|
||||
|
||||
# Simulate exiting the context manager, which should close the socket
|
||||
mock_socket.close.assert_called_once()
|
||||
|
||||
Reference in New Issue
Block a user