Major refactor of the std daq integration for the PCO Edge camera and Gigafrost camera. New live processing capabilities have been added, and the code has been cleaned up for better maintainability.
362 lines
12 KiB
Python
362 lines
12 KiB
Python
import json
|
|
from unittest import mock
|
|
|
|
import pytest
|
|
import requests
|
|
import requests_mock
|
|
import typeguard
|
|
from ophyd import StatusBase
|
|
from websockets import WebSocketException
|
|
|
|
from tomcat_bec.devices.std_daq.std_daq_client import (
|
|
StdDaqClient,
|
|
StdDaqConfig,
|
|
StdDaqError,
|
|
StdDaqStatus,
|
|
)
|
|
|
|
|
|
@pytest.fixture
def client():
    """
    Provide a StdDaqClient built around a mocked parent device.

    The websocket and REST URLs both point at ``http://localhost:5000``.
    Once the test has finished, ``shutdown`` is called on the client.
    """
    daq_client = StdDaqClient(
        parent=mock.MagicMock(),
        ws_url="http://localhost:5000",
        rest_url="http://localhost:5000",
    )
    yield daq_client
    daq_client.shutdown()
|
|
|
|
|
|
@pytest.fixture
def full_config():
    """
    Return a StdDaqConfig for the "tomcat-gf" gigafrost detector.

    The configuration carries three periodic live-stream endpoints and an
    empty ``module_positions`` mapping.
    """
    live_streams = {
        "tcp://129.129.95.111:20000": {"type": "periodic", "config": [1, 5]},
        "tcp://129.129.95.111:20001": {"type": "periodic", "config": [1, 5]},
        "tcp://129.129.95.38:20000": {"type": "periodic", "config": [1, 1]},
    }
    return StdDaqConfig(
        detector_name="tomcat-gf",
        detector_type="gigafrost",
        n_modules=8,
        bit_depth=16,
        image_pixel_height=2016,
        image_pixel_width=2016,
        start_udp_port=2000,
        writer_user_id=18600,
        max_number_of_forwarders_spawned=8,
        use_all_forwarders=True,
        module_sync_queue_size=4096,
        number_of_writers=12,
        module_positions={},
        ram_buffer_gb=150,
        delay_filter_timeout=10,
        live_stream_configs=live_streams,
    )
|
|
|
|
|
|
def test_stddaq_client(client):
    """
    Smoke test: the ``client`` fixture yields a constructed client object.
    """
    assert client is not None
|
|
|
|
|
|
def test_stddaq_client_get_daq_config(client, full_config):
    """
    Test that get_config returns the REST payload and stores the config on the client.
    """
    expected_payload = full_config.model_dump(exclude_defaults=True)

    with requests_mock.Mocker() as rest:
        rest.get("http://localhost:5000/api/config/get?user=ioc", json=expected_payload)
        fetched = client.get_config()

    # The returned value is simply the JSON body served by the REST endpoint
    assert fetched == expected_payload

    # The client keeps a configuration equal to the one that was served
    assert client._config == full_config
|
|
|
|
|
|
def test_stddaq_client_set_config_pydantic(client, full_config):
    """
    Test that set_config accepts a StdDaqConfig object and posts it to the REST endpoint.
    """
    with requests_mock.Mocker() as rest:
        rest.post("http://localhost:5000/api/config/set?user=ioc")

        # The restart hooks are patched out so only the REST call is exercised
        with (
            mock.patch.object(client, "_pre_restart"),
            mock.patch.object(client, "_post_restart"),
        ):
            client.set_config(full_config)

        # The body of the last request must match the dumped configuration
        posted = rest.last_request.json()
        assert posted == full_config.model_dump(exclude_defaults=True)
|
|
|
|
|
|
def test_std_daq_client_set_config_dict(client, full_config):
    """
    Test that set_config accepts a plain dictionary.

    The posted body is expected to equal the configuration dumped with
    ``exclude_defaults=True``, although the input is the full dump.
    """
    as_dict = full_config.model_dump()

    with requests_mock.Mocker() as rest:
        rest.post("http://localhost:5000/api/config/set?user=ioc")

        # The restart hooks are patched out so only the REST call is exercised
        with (
            mock.patch.object(client, "_pre_restart"),
            mock.patch.object(client, "_post_restart"),
        ):
            client.set_config(as_dict)

        posted = rest.last_request.json()
        assert posted == full_config.model_dump(exclude_defaults=True)
|
|
|
|
|
|
def test_stddaq_client_set_config_ignores_extra_keys(client, full_config):
    """
    Test that an unknown key in the input dictionary does not reach the posted body.
    """
    # Full dump of the configuration plus one key the config does not define
    padded_dict = {**full_config.model_dump(), "extra_key": "extra_value"}

    with requests_mock.Mocker() as rest:
        rest.post("http://localhost:5000/api/config/set?user=ioc")

        with (
            mock.patch.object(client, "_pre_restart"),
            mock.patch.object(client, "_post_restart"),
        ):
            client.set_config(padded_dict)

        posted = rest.last_request.json()
        assert posted == full_config.model_dump(exclude_defaults=True)
|
|
|
|
|
|
def test_stddaq_client_set_config_error(client, full_config):
    """
    Test that a 500 response from the REST endpoint makes set_config raise HTTPError.
    """
    with requests_mock.Mocker() as rest:
        rest.post("http://localhost:5000/api/config/set?user=ioc", status_code=500)

        with (
            mock.patch.object(client, "_pre_restart"),
            mock.patch.object(client, "_post_restart"),
            pytest.raises(requests.exceptions.HTTPError),
        ):
            client.set_config(full_config)
|
|
|
|
|
|
def test_stddaq_client_get_config_cached(client, full_config):
    """
    Test that get_config(cached=True) hands back the configuration stored on the client.
    """
    # Seed the cache directly
    client._config = full_config

    cached_config = client.get_config(cached=True)
    assert cached_config == full_config
|
|
|
|
|
|
def test_stddaq_client_status(client):
    """
    Test that the ``status`` attribute reflects the private ``_status`` value.
    """
    expected = StdDaqStatus.FILE_CREATED
    client._status = expected
    assert client.status == expected
|
|
|
|
|
|
def test_stddaq_client_start(client):
    """
    Test that start queues the start command and waits on the created status object.
    """
    status_target = "tomcat_bec.devices.std_daq.std_daq_client.StatusBase"
    expected_command = {
        "command": "start",
        "path": "test_file_path",
        "file_prefix": "test_file_prefix",
        "n_image": 10,
    }

    with mock.patch(status_target) as status_cls:
        client.start(file_path="test_file_path", file_prefix="test_file_prefix", num_images=10)

        queued = client._send_queue.get()
        assert queued == expected_command

        # Calling the patched class again yields the same mock instance
        status_cls().wait.assert_called_once()
|
|
|
|
|
|
def test_stddaq_client_start_type_error(client):
    """
    Test that passing ``num_images`` as a string raises a typeguard TypeCheckError.
    """
    bad_arguments = {
        "file_path": "test_file_path",
        "file_prefix": "test_file_prefix",
        "num_images": "10",
    }
    with pytest.raises(typeguard.TypeCheckError):
        client.start(**bad_arguments)
|
|
|
|
|
|
def test_stddaq_client_stop(client):
    """
    Check that the stop method puts the stop command in the send queue.
    """
    client.stop()
    # The comparison has to be asserted: a bare ``==`` expression is evaluated
    # and thrown away, so the test could never fail.
    assert client._send_queue.get() == {"command": "stop"}
|
|
|
|
|
|
def test_stddaq_client_update_config(client, full_config):
    """
    Test that update_config calls set_config once when a value differs from the served config.
    """
    # "new_name" differs from the detector name served by the mocked endpoint
    changes = {"detector_name": "new_name"}

    with requests_mock.Mocker() as rest:
        rest.get("http://localhost:5000/api/config/get?user=ioc", json=full_config.model_dump())

        with mock.patch.object(client, "set_config") as patched_set_config:
            client.update_config(changes)

            assert patched_set_config.call_count == 1
|
|
|
|
|
|
def test_stddaq_client_updates_only_changed_configs(client, full_config):
    """
    Test that update_config does not call set_config when the update matches the served config.
    """
    # "tomcat-gf" is already the detector name in the served configuration
    changes = {"detector_name": "tomcat-gf"}

    with requests_mock.Mocker() as rest:
        rest.get("http://localhost:5000/api/config/get?user=ioc", json=full_config.model_dump())

        with mock.patch.object(client, "set_config") as patched_set_config:
            client.update_config(changes)

            assert patched_set_config.call_count == 0
|
|
|
|
|
|
def test_stddaq_client_updates_only_changed_configs_empty(client, full_config):
    """
    Test that update_config does not call set_config when given an empty dictionary.
    """
    changes = {}

    with requests_mock.Mocker() as rest:
        rest.get("http://localhost:5000/api/config/get?user=ioc", json=full_config.model_dump())

        with mock.patch.object(client, "set_config") as patched_set_config:
            client.update_config(changes)

            assert patched_set_config.call_count == 0
|
|
|
|
|
|
def test_stddaq_client_pre_restart(client):
    """
    Test that the pre_restart method closes the websocket client exactly once.
    """
    # Mark the websocket loop as idle up front
    # (presumably _pre_restart waits on this event -- confirm in the client)
    client._ws_idle_event.set()
    client.ws_client = mock.MagicMock()

    client._pre_restart()

    client.ws_client.close.assert_called_once()
|
|
|
|
|
|
def test_stddaq_client_post_restart(client):
    """
    Test that the post_restart method waits for the connection and sets ``_daq_is_running``.
    """
    with mock.patch.object(client, "wait_for_connection") as patched_wait:
        client._post_restart()

        patched_wait.assert_called_once()
        assert client._daq_is_running.is_set()
|
|
|
|
|
|
def test_stddaq_client_reset(client):
    """
    Test that reset calls get_config and set_config exactly once each.
    """
    with mock.patch.object(client, "get_config") as patched_get:
        with mock.patch.object(client, "set_config") as patched_set:
            client.reset()

            patched_get.assert_called_once()
            patched_set.assert_called_once()
|
|
|
|
|
|
def test_stddaq_client_run_status_callbacks(client):
    """
    Test that a status registered for a success state completes when that state is reached.
    """
    pending = StatusBase()
    client.add_status_callback(pending, success=[StdDaqStatus.FILE_CREATED], error=[])

    # Put the client into the success state and process the registered callbacks
    client._status = StdDaqStatus.FILE_CREATED
    client._run_status_callbacks()

    # wait() returns without raising once the status has finished successfully
    pending.wait()

    assert len(pending._callbacks) == 0
|
|
|
|
|
|
def test_stddaq_client_run_status_callbacks_error(client):
    """
    Test that a status registered for an error state fails with StdDaqError when that state is reached.
    """
    pending = StatusBase()
    client.add_status_callback(pending, success=[], error=[StdDaqStatus.FILE_CREATED])

    # Put the client into the error state and process the registered callbacks
    client._status = StdDaqStatus.FILE_CREATED
    client._run_status_callbacks()

    # Waiting on the failed status re-raises the error
    with pytest.raises(StdDaqError):
        pending.wait()

    assert len(pending._callbacks) == 0
|
|
|
|
|
|
@pytest.mark.parametrize(
    "msg, updated",
    [
        ({"status": "IDLE"}, False),
        (json.dumps({"status": "waiting_for_first_image"}), True),
    ],
)
def test_stddaq_client_on_received_ws_message(client, msg, updated):
    """
    Test how _on_received_ws_message reacts to the two parametrized messages.

    The JSON-encoded string is expected to update the status and trigger the
    status callbacks; the plain dict is expected to leave the status untouched.
    """
    client._status = None

    with mock.patch.object(client, "_run_status_callbacks") as patched_callbacks:
        client._on_received_ws_message(msg)

        if not updated:
            patched_callbacks.assert_not_called()
            assert client._status is None
            return

        patched_callbacks.assert_called_once()
        assert client._status == StdDaqStatus.WAITING_FOR_FIRST_IMAGE
|
|
|
|
|
|
def test_stddaq_client_ws_send_and_receive(client):
    """
    Test that _ws_send_and_receive sends only when a command is queued and always receives.
    """
    client.ws_client = mock.MagicMock()
    client._send_queue.put({"command": "test"})

    # First pass: one command is queued, so it is sent before receiving
    client._ws_send_and_receive()
    client.ws_client.send.assert_called_once()
    client.ws_client.recv.assert_called_once()

    client.ws_client.reset_mock()

    # Second pass: the queue is empty now, so nothing is sent but recv still runs
    client._ws_send_and_receive()
    client.ws_client.send.assert_not_called()
    client.ws_client.recv.assert_called_once()
|
|
|
|
|
|
def test_stddaq_client_ws_send_and_receive_websocket_error(client):
    """
    Test that a WebSocketException raised while sending leads to one wait_for_connection call.
    """
    client.ws_client = mock.MagicMock()
    client.ws_client.send.side_effect = WebSocketException()
    client._send_queue.put({"command": "test"})

    with mock.patch.object(client, "wait_for_connection") as patched_wait:
        client._ws_send_and_receive()

        patched_wait.assert_called_once()
|
|
|
|
|
|
def test_stddaq_client_ws_send_and_receive_timeout_error(client):
    """
    Test that a TimeoutError raised while receiving does not lead to a wait_for_connection call.
    """
    client.ws_client = mock.MagicMock()
    client.ws_client.recv.side_effect = TimeoutError()
    client._send_queue.put({"command": "test"})

    with mock.patch.object(client, "wait_for_connection") as patched_wait:
        client._ws_send_and_receive()

        patched_wait.assert_not_called()
|
|
|
|
|
|
def test_stddaq_client_ws_update_loop(client):
    """
    Test that the ws_update_loop method performs a single pass before shutdown is signalled.

    The shutdown event is mocked to report "not set" once and "set" afterwards;
    both the server-running wait and the send/receive step must run exactly once.
    """
    client._shutdown_event = mock.MagicMock()
    client._shutdown_event.is_set.side_effect = [False, True]

    with mock.patch.object(client, "_ws_send_and_receive") as patched_send_recv:
        with mock.patch.object(client, "_wait_for_server_running") as patched_wait_server:
            client._ws_update_loop()

            patched_send_recv.assert_called_once()
            patched_wait_server.assert_called_once()
|