Files
tomcat_bec/tests/tests_devices/test_stddaq_client.py
gac-x05la ae85d179f5 refactor: std daq integration
Major refactor of the std daq integration for the PCO Edge camera and Gigafrost camera.
New live processing capabilities have been added, and the code has been cleaned up for better maintainability.
2025-06-16 16:59:08 +02:00

362 lines
12 KiB
Python

import json
from unittest import mock
import pytest
import requests
import requests_mock
import typeguard
from ophyd import StatusBase
from websockets import WebSocketException
from tomcat_bec.devices.std_daq.std_daq_client import (
StdDaqClient,
StdDaqConfig,
StdDaqError,
StdDaqStatus,
)
@pytest.fixture
def client():
    """
    Provide a StdDaqClient attached to a mocked parent device.

    Both the websocket URL and the REST URL point at ``http://localhost:5000``.
    Once the consuming test has finished, the client's ``shutdown`` method is called.
    """
    base_url = "http://localhost:5000"
    daq_client = StdDaqClient(parent=mock.MagicMock(), ws_url=base_url, rest_url=base_url)
    yield daq_client
    # Teardown: runs after the test that requested the fixture.
    daq_client.shutdown()
@pytest.fixture
def full_config():
    """
    Return a populated StdDaqConfig for a gigafrost detector named ``tomcat-gf``.

    The configuration declares three periodic live-stream endpoints.
    """
    live_streams = {
        "tcp://129.129.95.111:20000": {"type": "periodic", "config": [1, 5]},
        "tcp://129.129.95.111:20001": {"type": "periodic", "config": [1, 5]},
        "tcp://129.129.95.38:20000": {"type": "periodic", "config": [1, 1]},
    }
    return StdDaqConfig(
        detector_name="tomcat-gf",
        detector_type="gigafrost",
        n_modules=8,
        bit_depth=16,
        image_pixel_height=2016,
        image_pixel_width=2016,
        start_udp_port=2000,
        writer_user_id=18600,
        max_number_of_forwarders_spawned=8,
        use_all_forwarders=True,
        module_sync_queue_size=4096,
        number_of_writers=12,
        module_positions={},
        ram_buffer_gb=150,
        delay_filter_timeout=10,
        live_stream_configs=live_streams,
    )
def test_stddaq_client(client):
    """The ``client`` fixture must yield an object rather than ``None``."""
    assert client is not None
def test_stddaq_client_get_daq_config(client, full_config):
    """
    ``get_config`` returns the JSON served by the config endpoint and stores the
    matching configuration object in ``_config``.
    """
    expected_payload = full_config.model_dump(exclude_defaults=True)
    with requests_mock.Mocker() as mocker:
        mocker.get("http://localhost:5000/api/config/get?user=ioc", json=expected_payload)
        result = client.get_config()

    # The return value is simply the JSON response ...
    assert result == expected_payload
    # ... and the client keeps a configuration equal to the one that was served.
    assert client._config == full_config
def test_stddaq_client_set_config_pydantic(client, full_config):
    """
    ``set_config`` accepts a StdDaqConfig object; the posted JSON body equals the
    config dumped with ``exclude_defaults=True``.
    """
    with (
        requests_mock.Mocker() as mocker,
        mock.patch.object(client, "_pre_restart"),
        mock.patch.object(client, "_post_restart"),
    ):
        mocker.post("http://localhost:5000/api/config/set?user=ioc")
        client.set_config(full_config)

    # Inspect the most recent request seen by the mocked endpoint.
    posted_body = mocker.last_request.json()
    assert posted_body == full_config.model_dump(exclude_defaults=True)
def test_std_daq_client_set_config_dict(client, full_config):
    """
    ``set_config`` accepts a plain dictionary; the posted JSON body equals the
    config dumped with ``exclude_defaults=True``.
    """
    as_dict = full_config.model_dump()
    with (
        requests_mock.Mocker() as mocker,
        mock.patch.object(client, "_pre_restart"),
        mock.patch.object(client, "_post_restart"),
    ):
        mocker.post("http://localhost:5000/api/config/set?user=ioc")
        client.set_config(as_dict)

    posted_body = mocker.last_request.json()
    assert posted_body == full_config.model_dump(exclude_defaults=True)
def test_stddaq_client_set_config_ignores_extra_keys(client, full_config):
    """
    A key that is not part of the configuration does not show up in the JSON
    body posted by ``set_config``.
    """
    # Dictionary input carrying one additional, unknown entry.
    with_extra = {**full_config.model_dump(), "extra_key": "extra_value"}
    with (
        requests_mock.Mocker() as mocker,
        mock.patch.object(client, "_pre_restart"),
        mock.patch.object(client, "_post_restart"),
    ):
        mocker.post("http://localhost:5000/api/config/set?user=ioc")
        client.set_config(with_extra)

    posted_body = mocker.last_request.json()
    assert posted_body == full_config.model_dump(exclude_defaults=True)
def test_stddaq_client_set_config_error(client, full_config):
    """
    A 500 response from the config endpoint makes ``set_config`` raise
    ``requests.exceptions.HTTPError``.
    """
    with (
        requests_mock.Mocker() as mocker,
        mock.patch.object(client, "_pre_restart"),
        mock.patch.object(client, "_post_restart"),
    ):
        mocker.post("http://localhost:5000/api/config/set?user=ioc", status_code=500)
        with pytest.raises(requests.exceptions.HTTPError):
            client.set_config(full_config)
def test_stddaq_client_get_config_cached(client, full_config):
    """
    With ``cached=True``, ``get_config`` hands back the configuration stored in ``_config``.
    """
    # Seed the client with an already known configuration.
    client._config = full_config

    cached = client.get_config(cached=True)
    assert cached == full_config
def test_stddaq_client_status(client):
    """The ``status`` attribute reports the value stored in ``_status``."""
    expected = StdDaqStatus.FILE_CREATED
    client._status = expected
    assert client.status == expected
def test_stddaq_client_start(client):
    """
    ``start`` puts a start command on the send queue and waits once on the
    (mocked) status object.
    """
    status_target = "tomcat_bec.devices.std_daq.std_daq_client.StatusBase"
    with mock.patch(status_target) as status_cls:
        client.start(file_path="test_file_path", file_prefix="test_file_prefix", num_images=10)
        queued = client._send_queue.get()

    expected_command = {
        "command": "start",
        "path": "test_file_path",
        "file_prefix": "test_file_prefix",
        "n_image": 10,
    }
    assert queued == expected_command
    # Every call of the patched class yields the same instance mock.
    status_cls.return_value.wait.assert_called_once()
def test_stddaq_client_start_type_error(client):
    """Passing ``num_images`` as a string makes ``start`` raise ``typeguard.TypeCheckError``."""
    bad_arguments = {
        "file_path": "test_file_path",
        "file_prefix": "test_file_prefix",
        "num_images": "10",
    }
    with pytest.raises(typeguard.TypeCheckError):
        client.start(**bad_arguments)
def test_stddaq_client_stop(client):
    """
    Check that the stop method puts the stop command in the send queue.
    """
    client.stop()
    # The comparison has to be asserted: as a bare expression its result is
    # discarded and the test could never fail on a wrong or malformed command.
    # NOTE(review): ``get()`` is called without a timeout; if ``_send_queue`` is a
    # ``queue.Queue`` and nothing was queued, this would block instead of failing.
    assert client._send_queue.get() == {"command": "stop"}
def test_stddaq_client_update_config(client, full_config):
    """
    ``update_config`` calls ``set_config`` once when the update changes a value
    of the configuration served by the endpoint.
    """
    changes = {"detector_name": "new_name"}
    with (
        requests_mock.Mocker() as mocker,
        mock.patch.object(client, "set_config") as patched_set_config,
    ):
        mocker.get("http://localhost:5000/api/config/get?user=ioc", json=full_config.model_dump())
        client.update_config(changes)

    patched_set_config.assert_called_once()
def test_stddaq_client_updates_only_changed_configs(client, full_config):
    """
    ``update_config`` does not call ``set_config`` when the update repeats the
    value the served configuration already has.
    """
    # Same detector name as in the configuration returned by the endpoint.
    unchanged = {"detector_name": "tomcat-gf"}
    with (
        requests_mock.Mocker() as mocker,
        mock.patch.object(client, "set_config") as patched_set_config,
    ):
        mocker.get("http://localhost:5000/api/config/get?user=ioc", json=full_config.model_dump())
        client.update_config(unchanged)

    patched_set_config.assert_not_called()
def test_stddaq_client_updates_only_changed_configs_empty(client, full_config):
    """
    ``update_config`` does not call ``set_config`` when it is given an empty dictionary.
    """
    nothing_to_change = {}
    with (
        requests_mock.Mocker() as mocker,
        mock.patch.object(client, "set_config") as patched_set_config,
    ):
        mocker.get("http://localhost:5000/api/config/get?user=ioc", json=full_config.model_dump())
        client.update_config(nothing_to_change)

    patched_set_config.assert_not_called()
def test_stddaq_client_pre_restart(client):
    """
    ``_pre_restart`` closes the websocket client exactly once.
    """
    # Pretend the websocket loop has already reached its idle state.
    client._ws_idle_event.set()
    client.ws_client = mock.MagicMock()

    client._pre_restart()

    assert client.ws_client.close.call_count == 1
def test_stddaq_client_post_restart(client):
    """
    ``_post_restart`` waits for the connection once and leaves the
    ``_daq_is_running`` event set.
    """
    with mock.patch.object(client, "wait_for_connection") as patched_wait:
        client._post_restart()

    assert patched_wait.call_count == 1
    assert client._daq_is_running.is_set()
def test_stddaq_client_reset(client):
    """
    ``reset`` calls ``get_config`` and ``set_config`` exactly once each.
    """
    patch_get = mock.patch.object(client, "get_config")
    patch_set = mock.patch.object(client, "set_config")
    with patch_get as patched_get, patch_set as patched_set:
        client.reset()

    assert patched_get.call_count == 1
    assert patched_set.call_count == 1
def test_stddaq_client_run_status_callbacks(client):
    """
    A status registered with the current client status in its ``success`` list
    can be waited on without error once ``_run_status_callbacks`` has run.
    """
    reached_state = StdDaqStatus.FILE_CREATED
    pending = StatusBase()
    client.add_status_callback(pending, success=[reached_state], error=[])

    client._status = reached_state
    client._run_status_callbacks()

    pending.wait()
    assert len(pending._callbacks) == 0
def test_stddaq_client_run_status_callbacks_error(client):
    """
    A status registered with the current client status in its ``error`` list
    raises ``StdDaqError`` on ``wait`` once ``_run_status_callbacks`` has run.
    """
    failure_state = StdDaqStatus.FILE_CREATED
    pending = StatusBase()
    client.add_status_callback(pending, success=[], error=[failure_state])

    client._status = failure_state
    client._run_status_callbacks()

    with pytest.raises(StdDaqError):
        pending.wait()

    # Checked outside the ``raises`` block so that the assertion is actually reached.
    assert len(pending._callbacks) == 0
@pytest.mark.parametrize(
    "msg, updated",
    [
        ({"status": "IDLE"}, False),
        (json.dumps({"status": "waiting_for_first_image"}), True),
    ],
)
def test_stddaq_client_on_received_ws_message(client, msg, updated):
    """
    ``_on_received_ws_message`` updates ``_status`` and runs the status callbacks
    for the JSON string case; for the dict case both are expected to stay untouched.
    """
    client._status = None
    with mock.patch.object(client, "_run_status_callbacks") as patched_callbacks:
        client._on_received_ws_message(msg)

    if not updated:
        patched_callbacks.assert_not_called()
        assert client._status is None
        return

    patched_callbacks.assert_called_once()
    assert client._status == StdDaqStatus.WAITING_FOR_FIRST_IMAGE
def test_stddaq_client_ws_send_and_receive(client):
    """
    ``_ws_send_and_receive`` sends only when a message is queued, and calls
    ``recv`` once per invocation either way.
    """
    client.ws_client = mock.MagicMock()
    client._send_queue.put({"command": "test"})

    # First pass: one queued message, so it is sent and a reply is read.
    client._ws_send_and_receive()
    assert client.ws_client.send.call_count == 1
    assert client.ws_client.recv.call_count == 1

    # Second pass: the queue is empty now, so nothing is sent but recv still runs.
    client.ws_client.reset_mock()
    client._ws_send_and_receive()
    assert client.ws_client.send.call_count == 0
    assert client.ws_client.recv.call_count == 1
def test_stddaq_client_ws_send_and_receive_websocket_error(client):
    """
    A ``WebSocketException`` raised by ``send`` makes ``_ws_send_and_receive``
    call ``wait_for_connection`` once.
    """
    client.ws_client = mock.MagicMock()
    client.ws_client.send.side_effect = WebSocketException()
    client._send_queue.put({"command": "test"})

    with mock.patch.object(client, "wait_for_connection") as patched_wait:
        client._ws_send_and_receive()

    assert patched_wait.call_count == 1
def test_stddaq_client_ws_send_and_receive_timeout_error(client):
    """
    A ``TimeoutError`` raised by ``recv`` does not make ``_ws_send_and_receive``
    call ``wait_for_connection``.
    """
    client.ws_client = mock.MagicMock()
    client.ws_client.recv.side_effect = TimeoutError()
    client._send_queue.put({"command": "test"})

    with mock.patch.object(client, "wait_for_connection") as patched_wait:
        client._ws_send_and_receive()

    assert patched_wait.call_count == 0
def test_stddaq_client_ws_update_loop(client):
    """
    With a shutdown event that reports "not set" once and "set" afterwards,
    ``_ws_update_loop`` calls ``_ws_send_and_receive`` and
    ``_wait_for_server_running`` exactly once each.
    """
    client._shutdown_event = mock.MagicMock()
    # First check lets the loop proceed, the second one reports shutdown.
    client._shutdown_event.is_set.side_effect = [False, True]

    patch_exchange = mock.patch.object(client, "_ws_send_and_receive")
    patch_server_wait = mock.patch.object(client, "_wait_for_server_running")
    with patch_exchange as patched_exchange, patch_server_wait as patched_server_wait:
        client._ws_update_loop()

    assert patched_exchange.call_count == 1
    assert patched_server_wait.call_count == 1