Files
apocalypse/tests/res/test_send_redo_msg.py
woznic_n 177d8777c1
All checks were successful
Run apocalypse tests / Explore-Gitea-Actions (push) Successful in 10s
first commit
2025-09-08 18:53:14 +02:00

138 lines
4.5 KiB
Python

# pylint:disable=missing-function-docstring
import copy
import json
from unittest.mock import MagicMock, Mock, mock_open, patch
import pytest
from apocalypse import broker_config
from res.send_redo_msg import prepare_msg, send_msg
@pytest.fixture
def mock_pika():
with (
patch("res.send_redo_msg.ConnectionParameters") as mock_params,
patch("res.send_redo_msg.BlockingConnection") as mock_connection
):
mock_channel = Mock()
mock_queue = Mock()
mock_queue.method.queue = ""
mock_channel.queue_declare.return_value = mock_queue
mock_channel_context = Mock()
mock_channel_context.__enter__ = Mock(return_value=mock_channel)
mock_channel_context.__exit__ = Mock()
mock_connection_instance = Mock()
mock_connection_instance.channel = Mock(return_value=mock_channel_context)
mock_connection.return_value = Mock()
mock_connection.return_value.__enter__ = Mock(return_value=mock_connection_instance)
mock_connection.return_value.__exit__ = Mock()
yield {
"params": mock_params,
"connection": mock_connection_instance,
"connection_class": mock_connection,
"channel": mock_channel
}
@pytest.fixture
def mock_args():
args = MagicMock()
args.paths = []
args.files_meta = []
args.files = []
args.run = ["0045", "0100"]
args.acq = ["0001", "0002"]
args.file_suffix = "test"
args.broker_url = "foo"
return args
@pytest.fixture
def mock_dict():
mock_data = {
"pgroup": "p20000",
"start_pulseid": 1,
"stop_pulseid": 100,
"endstation": "test",
"request_time": " "
}
return mock_data
@patch("res.send_redo_msg.BlockingConnection")
@patch("res.send_redo_msg.ConnectionParameters")
def test_send_msg__successful_connection(mock_params, mock_connection_class, mock_pika, mock_args):
mock_connection_class.return_value = mock_pika["connection_class"].return_value
send_msg(mock_args)
mock_params.assert_called_once_with(mock_args.broker_url)
mock_connection_class.assert_called_once_with(mock_params.return_value)
mock_pika["connection_class"].return_value.__enter__.assert_called_once()
mock_pika["channel"].exchange_declare.assert_called_once_with(
exchange=broker_config.STATUS_EXCHANGE, exchange_type="fanout"
)
mock_pika["channel"].queue_declare.assert_called_once_with(queue="", exclusive=True)
mock_pika["channel"].queue_bind.assert_called_once_with(
queue="", exchange=broker_config.STATUS_EXCHANGE
)
def test_prepare_msg_check_dict(mock_dict):
mock_channel = Mock()
pth = "/pth/to/file"
prepare_msg(mock_channel, mock_dict, pth)
call_args = mock_channel.basic_publish.call_args[1]
body_dict = json.loads(call_args["body"].decode())
start_id = mock_dict["start_pulseid"]
stop_id = mock_dict["stop_pulseid"]
endstation = mock_dict["endstation"]
pgroup = mock_dict["pgroup"]
request_time = mock_dict["request_time"]
metadata = {
"general/created": request_time,
"general/instrument": endstation,
"general/process": "res.send_redo_msg",
"general/user": pgroup[1:6]
}
assert body_dict == {
"writer_type": "apocalypse",
"start_pulseid": start_id,
"stop_pulseid": stop_id,
"pgroup": pgroup,
"request_time": request_time,
"endstation": endstation,
"metadata": metadata,
"output_file": pth
}
assert call_args["routing_key"] == ""
@patch("res.send_redo_msg.BlockingConnection")
@patch("res.send_redo_msg.ConnectionParameters")
@patch("res.send_redo_msg.sleep", return_value=0)
def test_send_msg_check_files(
mock_sleep, mock_params, mock_connection_class, mock_pika, mock_args, mock_dict
):
mock_connection_class.return_value = mock_pika["connection_class"].return_value
mock_args.files_meta = ["acq1.json", "acq2.json"]
mock_args.files = [["acq1.h5"], ["acq2.h5"]]
mock_dict2 = copy.deepcopy(mock_dict)
mock_dict2["beamline"] = mock_dict2["endstation"]
m = mock_open(read_data=json.dumps(mock_dict2))
with patch("builtins.open", m), patch("res.send_redo_msg.prepare_msg") as mock_prepare_msg:
send_msg(mock_args)
assert mock_prepare_msg.call_count == 2
mock_prepare_msg.assert_any_call(mock_pika["channel"], mock_dict, "acq1.h5")
mock_prepare_msg.assert_any_call(mock_pika["channel"], mock_dict, "acq2.h5")