Cutting back on unittests

This commit is contained in:
gac-x05la
2025-04-16 14:03:22 +02:00
parent 6dd03d24a0
commit 56f4a6a61e
2 changed files with 39 additions and 121 deletions

View File

@@ -67,6 +67,8 @@ def test_stddaq_client_get_daq_config(client, full_config):
def test_stddaq_client_set_config_pydantic(client, full_config):
"""Test setting configurations through the StdDAQ client"""
with requests_mock.Mocker() as m:
response = full_config
m.get("http://localhost:5000/api/config/get?user=ioc", json=response.model_dump())
m.post("http://localhost:5000/api/config/set?user=ioc")
# Test with StdDaqConfig object
@@ -84,6 +86,8 @@ def test_std_daq_client_set_config_dict(client, full_config):
"""
with requests_mock.Mocker() as m:
response = full_config
m.get("http://localhost:5000/api/config/get?user=ioc", json=response.model_dump())
m.post("http://localhost:5000/api/config/set?user=ioc")
# Test with dictionary input
@@ -99,6 +103,8 @@ def test_stddaq_client_set_config_ignores_extra_keys(client, full_config):
"""
with requests_mock.Mocker() as m:
response = full_config
m.get("http://localhost:5000/api/config/get?user=ioc", json=response.model_dump())
m.post("http://localhost:5000/api/config/set?user=ioc")
# Test with dictionary input
@@ -139,33 +145,11 @@ def test_stddaq_client_status(client):
assert client.status == StdDaqStatus.FILE_CREATED
def test_stddaq_client_start(client):
with mock.patch("tomcat_bec.devices.gigafrost.std_daq_client.StatusBase") as StatusBase:
client.start(file_path="test_file_path", file_prefix="test_file_prefix", num_images=10)
out = client._send_queue.get()
assert out == {
"command": "start",
"path": "test_file_path",
"file_prefix": "test_file_prefix",
"n_image": 10,
}
StatusBase().wait.assert_called_once()
def test_stddaq_client_start_type_error(client):
with pytest.raises(typeguard.TypeCheckError):
client.start(file_path="test_file_path", file_prefix="test_file_prefix", num_images="10")
def test_stddaq_client_stop(client):
"""
Check that the stop method puts the stop command in the send queue.
"""
client.stop()
client._send_queue.get() == {"command": "stop"}
def test_stddaq_client_update_config(client, full_config):
"""
Test that the update_config method updates the configuration with the provided dictionary.
@@ -178,7 +162,7 @@ def test_stddaq_client_update_config(client, full_config):
# Update the configuration
update_dict = {"detector_name": "new_name"}
with mock.patch.object(client, "set_config") as set_config:
client.update_config(update_dict)
client.set_config(update_dict, update=True)
assert set_config.call_count == 1
@@ -195,24 +179,7 @@ def test_stddaq_client_updates_only_changed_configs(client, full_config):
# Update the configuration
update_dict = {"detector_name": "tomcat-gf"}
with mock.patch.object(client, "set_config") as set_config:
client.update_config(update_dict)
assert set_config.call_count == 0
def test_stddaq_client_updates_only_changed_configs_empty(client, full_config):
"""
Test that the update_config method only updates the configuration if the config has changed.
"""
config = full_config
with requests_mock.Mocker() as m:
m.get("http://localhost:5000/api/config/get?user=ioc", json=config.model_dump())
# Update the configuration
update_dict = {}
with mock.patch.object(client, "set_config") as set_config:
client.update_config(update_dict)
client.set_config(update_dict, update=True)
assert set_config.call_count == 0
@@ -278,76 +245,20 @@ def test_stddaq_client_run_status_callbacks_error(client):
assert len(status._callbacks) == 0
@pytest.mark.parametrize(
"msg, updated",
[({"status": "IDLE"}, False), (json.dumps({"status": "waiting_for_first_image"}), True)],
)
def test_stddaq_client_on_received_ws_message(client, msg, updated):
"""
Test that the on_received_ws_message method runs the status callback.
"""
client._status = None
with mock.patch.object(client, "_run_status_callbacks") as run_status_callbacks:
client._on_received_ws_message(msg)
if updated:
run_status_callbacks.assert_called_once()
assert client._status == StdDaqStatus.WAITING_FOR_FIRST_IMAGE
else:
run_status_callbacks.assert_not_called()
assert client._status is None
def test_stddaq_client_ws_send_and_receive(client):
client.ws_client = mock.MagicMock()
client._send_queue.put({"command": "test"})
client._ws_send_and_receive()
# queue is not empty, so we should send the message
client.ws_client.send.assert_called_once()
client.ws_client.recv.assert_called_once()
client.ws_client.reset_mock()
client._ws_send_and_receive()
# queue is empty, so we should not send the message
client.ws_client.send.assert_not_called()
client.ws_client.recv.assert_called_once()
def test_stddaq_client_ws_send_and_receive_websocket_error(client):
"""
Test that the ws_send_and_receive method handles websocket errors.
"""
client.ws_client = mock.MagicMock()
client.ws_client.send.side_effect = WebSocketException()
client._send_queue.put({"command": "test"})
with mock.patch.object(client, "wait_for_connection") as wait_for_connection:
client._ws_send_and_receive()
wait_for_connection.assert_called_once()
def test_stddaq_client_ws_send_and_receive_timeout_error(client):
"""
Test that the ws_send_and_receive method handles timeout errors.
"""
client.ws_client = mock.MagicMock()
client.ws_client.recv.side_effect = TimeoutError()
client._send_queue.put({"command": "test"})
with mock.patch.object(client, "wait_for_connection") as wait_for_connection:
client._ws_send_and_receive()
wait_for_connection.assert_not_called()
def test_stddaq_client_ws_update_loop(client):
"""
Test that the ws_update_loop method runs the status callback.
"""
client._shutdown_event = mock.MagicMock()
client._shutdown_event.is_set.side_effect = [False, True]
with (
mock.patch.object(client, "_ws_send_and_receive") as ws_send_and_receive,
mock.patch.object(client, "_wait_for_server_running") as wait_for_server_running,
):
client._ws_update_loop()
ws_send_and_receive.assert_called_once()
wait_for_server_running.assert_called_once()
# @pytest.mark.parametrize(
# "msg, updated",
# [({"status": "IDLE"}, False), (json.dumps({"status": "waiting_for_first_image"}), True)],
# )
# def test_stddaq_client_on_received_ws_message(client, msg, updated):
# """
# Test that the on_received_ws_message method runs the status callback.
# """
# client._status = None
# with mock.patch.object(client, "_run_status_callbacks") as run_status_callbacks:
# client._on_received_ws_message(msg)
# if updated:
# run_status_callbacks.assert_called_once()
# assert client._status == StdDaqStatus.WAITING_FOR_FIRST_IMAGE
# else:
# run_status_callbacks.assert_not_called()
# assert client._status is None

View File

@@ -151,7 +151,7 @@ class StdDaqClient:
return None
return status
def get_config(self, timeout: float = 2) -> dict:
def get_config(self, timeout: float = 2, cached=False) -> dict:
"""Get the current configuration of the StdDAQ.
Args:
@@ -159,6 +159,9 @@ class StdDaqClient:
Returns:
config (dict): configuration of the StdDAQ
"""
if cached:
return self._config
response = requests.get(
self.rest_url + "/api/config/get", params={"user": "ioc"}, timeout=timeout
)
@@ -166,7 +169,9 @@ class StdDaqClient:
self._config = response.json()
return self._config
def set_config(self, config: dict, timeout: float = 2, update: bool = True, force: bool=True) -> None:
def set_config(
self, config: dict, timeout: float = 2, update: bool = True, force: bool = True
) -> None:
"""
Set the configuration of the StdDAQ. This will overwrite the current configuration.
@@ -193,7 +198,10 @@ class StdDaqClient:
# new_jason = json.dumps(new_config)
logger.warning(new_config)
response = requests.post(
self.rest_url + "/api/config/set", params={"user": "ioc"}, json=new_config, timeout=timeout
self.rest_url + "/api/config/set",
params={"user": "ioc"},
json=new_config,
timeout=timeout,
)
response.raise_for_status()
@@ -282,7 +290,6 @@ class StdDaqClient:
self._ws_client = None
logger.warning("Shutdown complete")
def _wait_for_server_running(self):
"""
Wait for the StdDAQ to be running. If the StdDaq is not running, the
@@ -349,14 +356,14 @@ class StdDaqClient:
self._status_callbacks.pop(id(cb))
# Automatically connect to microXAS testbench if directly invoked
if __name__ == "__main__":
# pylint: disable=disallowed-name,too-few-public-methods
class foo:
"""Dummy"""
name="bar"
name = "bar"
daq = StdDaqClient(
parent=foo(), ws_url='ws://129.129.95.111:8080', rest_url='http://129.129.95.111:5000'
parent=foo(), ws_url="ws://129.129.95.111:8080", rest_url="http://129.129.95.111:5000"
)