Files
apocalypse/tests/apo/test_send_msg.py
woznic_n 11b3c38038
All checks were successful
Run apocalypse tests / Explore-Gitea-Actions (push) Successful in 10s
bugfix
2025-09-12 17:48:01 +02:00

300 lines
10 KiB
Python

# pylint:disable=missing-function-docstring
import io
import json
from pathlib import Path
from unittest.mock import MagicMock, Mock, patch
import pytest
from apocalypse import broker_config
from apocalypse.arghandler import get_apo_input
from apocalypse.send_done import parse_apo_meta, send_msg, setup_conn
@pytest.fixture
def mock_apo_input():
    """Return a ``Mock`` spec'd on ``get_apo_input`` and filled with fixed test values.

    The header ``file`` and the request ``output_file`` refer to the same
    ``run0001``/``acq0001`` path; the tests in this module compare published
    messages against the values assigned here.
    """
    data_file = "/path/to/run0001/data/acq0001.h5"
    header = {
        "action": "write_finished",
        "file": data_file,
        "routing_key": ""
    }
    request = {
        "writer_type": "test_writer",
        "output_file": data_file,
        "start_pulse_id": 0,
        "stop_pulse_id": 100,
        "metadata": {
            "general/instrument": "test_endstation",
            "general/user": "p12345"
        }
    }

    apo_input = Mock(spec=get_apo_input)
    # Attribute order is kept as-is because one test compares str(__dict__).
    apo_input.script_file = "test_script.py"
    apo_input.endstation = "test_endstation"
    apo_input.slurm_job = Path("/test/slurm_job")
    apo_input.send_done_pth = Path("/test/done")
    apo_input.header = header
    apo_input.request = request
    apo_input.log_file = "/pth/to/somewhere"
    apo_input.template_file = Path("/test/template.jinja")
    apo_input.broker_url = "test-broker"
    apo_input.slurm_job_date = "_%Y_%m_%d"
    apo_input.writer_type = "test_writer"
    apo_input.slurm_params = "-c 5, --mem=0, -p 'athos', "
    apo_input.slurm_params_other = [" --nice=100, --account='foo''"]
    apo_input.pgroup = "p12345"
    apo_input.job_name = "$run/$acq/$script_file"
    return apo_input
@pytest.fixture
def mock_pika():
    """Patch the connection classes in ``apocalypse.send_done`` and yield the mocks.

    ``ConnectionParameters`` and ``BlockingConnection`` are replaced for the
    duration of the test. The patched ``BlockingConnection`` returns a
    connection mock whose ``channel()`` returns a channel mock, and that
    channel's ``queue_declare()`` reports an empty queue name.

    Yields a dict with the keys ``params``, ``connection``,
    ``connection_class`` and ``channel``.
    """
    with (
        patch("apocalypse.send_done.ConnectionParameters") as params_cls,
        patch("apocalypse.send_done.BlockingConnection") as connection_cls
    ):
        declared_queue = Mock()
        declared_queue.method.queue = ""

        channel = Mock()
        channel.queue_declare.return_value = declared_queue

        connection = Mock()
        connection.channel.return_value = channel
        connection_cls.return_value = connection

        # Yield inside the ``with`` so both patches stay active while the test runs.
        yield {
            "params": params_cls,
            "connection": connection,
            "connection_class": connection_cls,
            "channel": channel
        }
@patch("apocalypse.send_done.BlockingConnection")
@patch("apocalypse.send_done.ConnectionParameters")
def test_setup_conn_successful_connection(mock_params, mock_connection_class, mock_pika):
    """Check the calls ``setup_conn`` makes on the connection and channel mocks.

    The decorators patch the same two targets as the ``mock_pika`` fixture, so
    the decorator-provided ``BlockingConnection`` mock is wired to return the
    fixture's connection mock before ``setup_conn`` is called.
    """
    url = "foo"
    fake_connection = mock_pika["connection"]
    fake_channel = mock_pika["channel"]
    mock_connection_class.return_value = fake_connection

    channel, _conn = setup_conn(url)

    # Connection is built from parameters created out of the broker URL.
    mock_params.assert_called_once_with(url)
    mock_connection_class.assert_called_once_with(mock_params.return_value)
    fake_connection.channel.assert_called_once()

    # Exchange, exclusive queue and binding are each declared exactly once.
    fake_channel.exchange_declare.assert_called_once_with(
        exchange=broker_config.STATUS_EXCHANGE, exchange_type="fanout"
    )
    fake_channel.queue_declare.assert_called_once_with(queue="", exclusive=True)
    fake_channel.queue_bind.assert_called_once_with(
        queue="", exchange=broker_config.STATUS_EXCHANGE
    )
    assert channel == fake_channel
@pytest.mark.parametrize(
    "error_method, error_message",
    [
        ("connection_class", "Connection failed"),
        ("connection.channel", "Channel creation failed"),
        ("channel.exchange_declare", "Exchange declaration failed"),
    ]
)
def test_setup_conn_errors(mock_pika, error_method, error_message):
    """Check that an exception raised by one of the mocked calls leaves ``setup_conn``.

    ``error_method`` selects which mock is made to raise; the raised message
    must match ``error_message``.
    """
    failure_points = {
        "connection_class": mock_pika["connection_class"],
        "connection.channel": mock_pika["connection"].channel,
        "channel.exchange_declare": mock_pika["channel"].exchange_declare,
    }
    failure_points[error_method].side_effect = Exception(error_message)

    with pytest.raises(Exception, match=error_message):
        setup_conn("foo")
def test_send_msg_properties_req_suc(mock_apo_input):
    """Check exchange, correlation id and headers published for ``request_success``."""
    correlation_id = "d45678f-a1234e-5c6"
    channel = Mock()

    send_msg(channel, mock_apo_input, "request_success", correlation_id, 5)

    publish_kwargs = channel.basic_publish.call_args.kwargs
    props = publish_kwargs["properties"]
    expected_headers = {
        "action": "request_success",
        "source": "apocalypse",
        "message": None
    }
    assert publish_kwargs["exchange"] == broker_config.STATUS_EXCHANGE
    assert props.correlation_id == correlation_id
    assert props.headers == expected_headers
def test_send_msg_properties_req_st(mock_apo_input):
    """Check exchange, correlation id and headers published for ``request_start``.

    For this action the expected headers additionally carry the ``file`` from
    the input header and an empty ``routing_key``.
    """
    correlation_id = "d45678f-a1234e-5c6"
    channel = Mock()

    send_msg(channel, mock_apo_input, "request_start", correlation_id, 5)

    publish_kwargs = channel.basic_publish.call_args.kwargs
    props = publish_kwargs["properties"]
    expected_headers = {
        "action": "request_start",
        "source": "apocalypse",
        "message": None,
        "file": mock_apo_input.header["file"],
        "routing_key": ""
    }
    assert publish_kwargs["exchange"] == broker_config.STATUS_EXCHANGE
    assert props.correlation_id == correlation_id
    assert props.headers == expected_headers
def test_send_msg_body_req_suc_no_file(mock_apo_input):
    """Check the ``request_success`` body when nothing is patched in for a meta file.

    ``apo_meta`` is then expected to hold only the acquisition number, the run
    number and the submit timestamp passed to ``send_msg``.
    """
    channel = Mock()

    send_msg(channel, mock_apo_input, "request_success", "d45678f-a1234e-5c6", 5)

    publish_kwargs = channel.basic_publish.call_args.kwargs
    published_body = json.loads(publish_kwargs["body"].decode())
    expected_body = {
        "writer_type": "processing",
        "metadata": {
            "general/instrument": "test_endstation",
            "general/user": "p12345"
        },
        "apo_meta": {
            "acq_number": "0001",
            "run_number": "0001",
            "submit_tstamp": 5
        }
    }
    assert published_body == expected_body
    assert publish_kwargs["routing_key"] == ""
def test_send_msg_body_req_st(mock_apo_input):
    """Check the JSON body published for ``request_start``.

    The expected body is assembled from the fixture itself: script file,
    the string form of the input's ``__dict__``, pulse id range, log file
    and request metadata, plus the ``apo_meta`` section.
    """
    channel = Mock()

    send_msg(channel, mock_apo_input, "request_start", "d45678f-a1234e-5c6", 5)

    publish_kwargs = channel.basic_publish.call_args.kwargs
    published_body = json.loads(publish_kwargs["body"].decode())

    request = mock_apo_input.request
    expected_body = {
        "writer_type": "processing",
        "processor": mock_apo_input.script_file,
        "arguments": str(mock_apo_input.__dict__),
        "start_pulse_id": request["start_pulse_id"],
        "stop_pulse_id": request["stop_pulse_id"],
        "log_file": mock_apo_input.log_file,
        "metadata": request["metadata"],
        "apo_meta": {
            "acq_number": "0001",
            "run_number": "0001",
            "submit_tstamp": 5
        }
    }
    assert published_body == expected_body
    assert publish_kwargs["routing_key"] == ""
def test_send_msg_body_req_succ(mock_apo_input):
    """Check that parsed meta content is merged into ``apo_meta`` on ``request_success``.

    ``Path`` in ``apocalypse.send_done`` is patched so that the parent
    directory glob yields exactly one entry, and ``parse_apo_meta`` is patched
    to return a fixed dict.
    """
    channel = Mock()
    fake_path = MagicMock(spec=Path)
    fake_path.parent.glob.return_value = [MagicMock(spec=Path)]
    parsed_meta = {
        "key": "value",
        "num": 2
    }

    with (
        patch("apocalypse.send_done.Path", return_value=fake_path),
        patch("apocalypse.send_done.parse_apo_meta", return_value=parsed_meta)
    ):
        send_msg(channel, mock_apo_input, "request_success", "d45678f-a1234e-5c6", 5)

    publish_kwargs = channel.basic_publish.call_args.kwargs
    published_body = json.loads(publish_kwargs["body"].decode())
    expected_body = {
        "writer_type": "processing",
        "metadata": {
            "general/instrument": "test_endstation",
            "general/user": "p12345"
        },
        "apo_meta": {
            "acq_number": "0001",
            "run_number": "0001",
            "submit_tstamp": 5,
            "key": "value",
            "num": 2
        }
    }
    assert published_body == expected_body
    assert publish_kwargs["routing_key"] == ""
def test_send_msg_body_req_succ_double_meta(mock_apo_input):
    """Check body and debug log when the meta file glob yields two entries.

    The body is expected to match the single-meta-file case, and the module
    logger must have been called with a debug message naming both paths.
    """
    channel = Mock()
    fake_path = MagicMock(spec=Path)
    fake_path.parent.glob.return_value = ["pth1", "pth2"]
    parsed_meta = {
        "key": "value",
        "num": 2
    }

    with (
        patch("apocalypse.send_done.Path", return_value=fake_path),
        patch("apocalypse.send_done.parse_apo_meta", return_value=parsed_meta),
        patch("apocalypse.send_done.logger") as fake_logger
    ):
        send_msg(channel, mock_apo_input, "request_success", "d45678f-a1234e-5c6", 5)

    publish_kwargs = channel.basic_publish.call_args.kwargs
    published_body = json.loads(publish_kwargs["body"].decode())
    expected_body = {
        "writer_type": "processing",
        "metadata": {
            "general/instrument": "test_endstation",
            "general/user": "p12345"
        },
        "apo_meta": {
            "acq_number": "0001",
            "run_number": "0001",
            "submit_tstamp": 5,
            "key": "value",
            "num": 2
        }
    }
    assert published_body == expected_body
    assert publish_kwargs["routing_key"] == ""
    fake_logger.debug.assert_called_with(
        "there were more than one meta files ['pth1', 'pth2'], took 1st one"
    )
@pytest.mark.parametrize(
    "content, expected, suffix",
    [
        ('{"key": "value", "num": 2}', {"key": "value", "num": 2}, ".json"),
        ("key: value\nnum: 2", {"key": "value", "num": 2}, ".yaml"),
        ("plain meta txt", {"table": {"message": "plain meta txt"}}, ".txt"),
    ]
)
def test_parse_apo_meta(content, expected, suffix):
    """Check ``parse_apo_meta`` for JSON, YAML and plain-text file content.

    ``builtins.open`` is patched to hand back an in-memory stream, so only the
    mocked path's ``suffix`` and the stream content vary between cases.
    """
    fake_path = MagicMock(spec=Path)
    fake_path.suffix = suffix
    stream = io.StringIO(content)

    with patch("builtins.open", return_value=stream):
        parsed = parse_apo_meta(fake_path)

    assert parsed == expected
def test_parse_apo_meta_raise_err():
    """Check the result for a ``.json`` path whose content is not a valid JSON document.

    The raw text is expected back under ``table``/``message`` rather than an
    exception reaching the caller.
    """
    malformed = '"key": "value", "num": 2'
    fake_path = MagicMock(spec=Path)
    fake_path.suffix = ".json"

    with patch("builtins.open", return_value=io.StringIO(malformed)):
        parsed = parse_apo_meta(fake_path)

    assert parsed == {"table": {"message": malformed}}