All checks were successful
Run apocalypse tests / Explore-Gitea-Actions (push) Successful in 10s
138 lines
4.5 KiB
Python
138 lines
4.5 KiB
Python
# pylint:disable=missing-function-docstring
|
|
import copy
|
|
import json
|
|
from unittest.mock import MagicMock, Mock, mock_open, patch
|
|
|
|
import pytest
|
|
|
|
from apocalypse import broker_config
|
|
from res.send_redo_msg import prepare_msg, send_msg
|
|
|
|
|
|
@pytest.fixture
|
|
def mock_pika():
|
|
with (
|
|
patch("res.send_redo_msg.ConnectionParameters") as mock_params,
|
|
patch("res.send_redo_msg.BlockingConnection") as mock_connection
|
|
):
|
|
|
|
mock_channel = Mock()
|
|
mock_queue = Mock()
|
|
mock_queue.method.queue = ""
|
|
mock_channel.queue_declare.return_value = mock_queue
|
|
|
|
mock_channel_context = Mock()
|
|
mock_channel_context.__enter__ = Mock(return_value=mock_channel)
|
|
mock_channel_context.__exit__ = Mock()
|
|
|
|
mock_connection_instance = Mock()
|
|
mock_connection_instance.channel = Mock(return_value=mock_channel_context)
|
|
|
|
mock_connection.return_value = Mock()
|
|
mock_connection.return_value.__enter__ = Mock(return_value=mock_connection_instance)
|
|
mock_connection.return_value.__exit__ = Mock()
|
|
|
|
yield {
|
|
"params": mock_params,
|
|
"connection": mock_connection_instance,
|
|
"connection_class": mock_connection,
|
|
"channel": mock_channel
|
|
}
|
|
|
|
|
|
@pytest.fixture
|
|
def mock_args():
|
|
args = MagicMock()
|
|
args.paths = []
|
|
args.files_meta = []
|
|
args.files = []
|
|
args.run = ["0045", "0100"]
|
|
args.acq = ["0001", "0002"]
|
|
args.file_suffix = "test"
|
|
args.broker_url = "foo"
|
|
return args
|
|
|
|
|
|
@pytest.fixture
|
|
def mock_dict():
|
|
mock_data = {
|
|
"pgroup": "p20000",
|
|
"start_pulseid": 1,
|
|
"stop_pulseid": 100,
|
|
"endstation": "test",
|
|
"request_time": " "
|
|
}
|
|
return mock_data
|
|
|
|
|
|
@patch("res.send_redo_msg.BlockingConnection")
|
|
@patch("res.send_redo_msg.ConnectionParameters")
|
|
def test_send_msg__successful_connection(mock_params, mock_connection_class, mock_pika, mock_args):
|
|
|
|
mock_connection_class.return_value = mock_pika["connection_class"].return_value
|
|
|
|
send_msg(mock_args)
|
|
mock_params.assert_called_once_with(mock_args.broker_url)
|
|
mock_connection_class.assert_called_once_with(mock_params.return_value)
|
|
mock_pika["connection_class"].return_value.__enter__.assert_called_once()
|
|
mock_pika["channel"].exchange_declare.assert_called_once_with(
|
|
exchange=broker_config.STATUS_EXCHANGE, exchange_type="fanout"
|
|
)
|
|
mock_pika["channel"].queue_declare.assert_called_once_with(queue="", exclusive=True)
|
|
mock_pika["channel"].queue_bind.assert_called_once_with(
|
|
queue="", exchange=broker_config.STATUS_EXCHANGE
|
|
)
|
|
|
|
|
|
def test_prepare_msg_check_dict(mock_dict):
|
|
mock_channel = Mock()
|
|
pth = "/pth/to/file"
|
|
prepare_msg(mock_channel, mock_dict, pth)
|
|
call_args = mock_channel.basic_publish.call_args[1]
|
|
body_dict = json.loads(call_args["body"].decode())
|
|
start_id = mock_dict["start_pulseid"]
|
|
stop_id = mock_dict["stop_pulseid"]
|
|
endstation = mock_dict["endstation"]
|
|
pgroup = mock_dict["pgroup"]
|
|
request_time = mock_dict["request_time"]
|
|
metadata = {
|
|
"general/created": request_time,
|
|
"general/instrument": endstation,
|
|
"general/process": "res.send_redo_msg",
|
|
"general/user": pgroup[1:6]
|
|
}
|
|
assert body_dict == {
|
|
"writer_type": "apocalypse",
|
|
"start_pulseid": start_id,
|
|
"stop_pulseid": stop_id,
|
|
"pgroup": pgroup,
|
|
"request_time": request_time,
|
|
"endstation": endstation,
|
|
"metadata": metadata,
|
|
"output_file": pth
|
|
}
|
|
assert call_args["routing_key"] == ""
|
|
|
|
|
|
@patch("res.send_redo_msg.BlockingConnection")
|
|
@patch("res.send_redo_msg.ConnectionParameters")
|
|
@patch("res.send_redo_msg.sleep", return_value=0)
|
|
def test_send_msg_check_files(
|
|
mock_sleep, mock_params, mock_connection_class, mock_pika, mock_args, mock_dict
|
|
):
|
|
|
|
mock_connection_class.return_value = mock_pika["connection_class"].return_value
|
|
|
|
mock_args.files_meta = ["acq1.json", "acq2.json"]
|
|
mock_args.files = [["acq1.h5"], ["acq2.h5"]]
|
|
mock_dict2 = copy.deepcopy(mock_dict)
|
|
mock_dict2["beamline"] = mock_dict2["endstation"]
|
|
m = mock_open(read_data=json.dumps(mock_dict2))
|
|
|
|
with patch("builtins.open", m), patch("res.send_redo_msg.prepare_msg") as mock_prepare_msg:
|
|
send_msg(mock_args)
|
|
|
|
assert mock_prepare_msg.call_count == 2
|
|
mock_prepare_msg.assert_any_call(mock_pika["channel"], mock_dict, "acq1.h5")
|
|
mock_prepare_msg.assert_any_call(mock_pika["channel"], mock_dict, "acq2.h5")
|