All checks were successful
Run apocalypse tests / Explore-Gitea-Actions (push) Successful in 10s
300 lines
10 KiB
Python
300 lines
10 KiB
Python
# pylint:disable=missing-function-docstring
|
|
import io
|
|
import json
|
|
from pathlib import Path
|
|
from unittest.mock import MagicMock, Mock, patch
|
|
|
|
import pytest
|
|
|
|
from apocalypse import broker_config
|
|
from apocalypse.arghandler import get_apo_input
|
|
from apocalypse.send_done import parse_apo_meta, send_msg, setup_conn
|
|
|
|
|
|
@pytest.fixture
|
|
def mock_apo_input():
|
|
mock = Mock(spec=get_apo_input)
|
|
mock.script_file = "test_script.py"
|
|
mock.endstation = "test_endstation"
|
|
mock.slurm_job = Path("/test/slurm_job")
|
|
mock.send_done_pth = Path("/test/done")
|
|
mock.header = {
|
|
"action": "write_finished",
|
|
"file": "/path/to/run0001/data/acq0001.h5",
|
|
"routing_key": ""
|
|
}
|
|
mock.request = {
|
|
"writer_type": "test_writer",
|
|
"output_file": "/path/to/run0001/data/acq0001.h5",
|
|
"start_pulse_id": 0,
|
|
"stop_pulse_id": 100,
|
|
"metadata": {
|
|
"general/instrument": "test_endstation",
|
|
"general/user": "p12345"
|
|
}
|
|
}
|
|
mock.log_file = "/pth/to/somewhere"
|
|
mock.template_file = Path("/test/template.jinja")
|
|
mock.broker_url = "test-broker"
|
|
mock.slurm_job_date = "_%Y_%m_%d"
|
|
mock.writer_type = "test_writer"
|
|
mock.slurm_params = "-c 5, --mem=0, -p 'athos', "
|
|
mock.slurm_params_other = [" --nice=100, --account='foo''"]
|
|
mock.pgroup = "p12345"
|
|
mock.job_name = "$run/$acq/$script_file"
|
|
return mock
|
|
|
|
|
|
@pytest.fixture
|
|
def mock_pika():
|
|
with (
|
|
patch("apocalypse.send_done.ConnectionParameters") as mock_params,
|
|
patch("apocalypse.send_done.BlockingConnection") as mock_connection
|
|
):
|
|
mock_channel = Mock()
|
|
mock_queue = Mock()
|
|
mock_queue.method.queue = ""
|
|
mock_channel.queue_declare.return_value = mock_queue
|
|
|
|
mock_connection_instance = Mock()
|
|
mock_connection_instance.channel.return_value = mock_channel
|
|
mock_connection.return_value = mock_connection_instance
|
|
|
|
yield {
|
|
"params": mock_params,
|
|
"connection": mock_connection_instance,
|
|
"connection_class": mock_connection,
|
|
"channel": mock_channel
|
|
}
|
|
|
|
|
|
@patch("apocalypse.send_done.BlockingConnection")
|
|
@patch("apocalypse.send_done.ConnectionParameters")
|
|
def test_setup_conn_successful_connection(mock_params, mock_connection_class, mock_pika):
|
|
broker_url = "foo"
|
|
mock_connection_class.return_value = mock_pika["connection_class"].return_value
|
|
channel, _conn = setup_conn(broker_url)
|
|
mock_params.assert_called_once_with(broker_url)
|
|
mock_connection_class.assert_called_once_with(mock_params.return_value)
|
|
mock_pika["connection_class"].return_value.channel.assert_called_once()
|
|
mock_pika["channel"].exchange_declare.assert_called_once_with(
|
|
exchange=broker_config.STATUS_EXCHANGE, exchange_type="fanout"
|
|
)
|
|
mock_pika["channel"].queue_declare.assert_called_once_with(queue="", exclusive=True)
|
|
mock_pika["channel"].queue_bind.assert_called_once_with(
|
|
queue="", exchange=broker_config.STATUS_EXCHANGE
|
|
)
|
|
assert channel == mock_pika["channel"]
|
|
|
|
|
|
@pytest.mark.parametrize(
|
|
"error_method, error_message",
|
|
[("connection_class", "Connection failed"), ("connection.channel", "Channel creation failed"),
|
|
("channel.exchange_declare", "Exchange declaration failed")]
|
|
)
|
|
def test_setup_conn_errors(mock_pika, error_method, error_message):
|
|
broker_url = "foo"
|
|
|
|
if error_method == "connection_class":
|
|
mock_pika["connection_class"].side_effect = Exception(error_message)
|
|
elif error_method == "connection.channel":
|
|
mock_pika["connection"].channel.side_effect = Exception(error_message)
|
|
elif error_method == "channel.exchange_declare":
|
|
mock_pika["channel"].exchange_declare.side_effect = Exception(error_message)
|
|
|
|
with pytest.raises(Exception, match=error_message):
|
|
setup_conn(broker_url)
|
|
|
|
|
|
def test_send_msg_properties_req_suc(mock_apo_input):
|
|
mock_channel = Mock()
|
|
send_msg(mock_channel, mock_apo_input, "request_success", "d45678f-a1234e-5c6", 5)
|
|
call_args = mock_channel.basic_publish.call_args[1]
|
|
|
|
assert call_args["exchange"] == broker_config.STATUS_EXCHANGE
|
|
properties = call_args["properties"]
|
|
assert properties.correlation_id == "d45678f-a1234e-5c6"
|
|
assert properties.headers == {
|
|
"action": "request_success",
|
|
"source": "apocalypse",
|
|
"message": None
|
|
}
|
|
|
|
|
|
def test_send_msg_properties_req_st(mock_apo_input):
|
|
mock_channel = Mock()
|
|
send_msg(mock_channel, mock_apo_input, "request_start", "d45678f-a1234e-5c6", 5)
|
|
call_args = mock_channel.basic_publish.call_args[1]
|
|
|
|
assert call_args["exchange"] == broker_config.STATUS_EXCHANGE
|
|
properties = call_args["properties"]
|
|
assert properties.correlation_id == "d45678f-a1234e-5c6"
|
|
assert properties.headers == {
|
|
"action": "request_start",
|
|
"source": "apocalypse",
|
|
"message": None,
|
|
"file": mock_apo_input.header["file"],
|
|
"routing_key": ""
|
|
}
|
|
|
|
|
|
def test_send_msg_body_req_suc_no_file(mock_apo_input):
|
|
mock_channel = Mock()
|
|
send_msg(mock_channel, mock_apo_input, "request_success", "d45678f-a1234e-5c6", 5)
|
|
call_args = mock_channel.basic_publish.call_args[1]
|
|
|
|
body_dict = json.loads(call_args["body"].decode())
|
|
assert body_dict == {
|
|
"writer_type": "processing",
|
|
"metadata": {
|
|
"general/instrument": "test_endstation",
|
|
"general/user": "p12345"
|
|
},
|
|
"apo_meta": {
|
|
'acq_number': '0001',
|
|
'run_number': '0001',
|
|
"submit_tstamp": 5
|
|
}
|
|
}
|
|
assert call_args["routing_key"] == ""
|
|
|
|
|
|
def test_send_msg_body_req_st(mock_apo_input):
|
|
mock_channel = Mock()
|
|
send_msg(mock_channel, mock_apo_input, "request_start", "d45678f-a1234e-5c6", 5)
|
|
call_args = mock_channel.basic_publish.call_args[1]
|
|
|
|
body_dict = json.loads(call_args["body"].decode())
|
|
processor = mock_apo_input.script_file
|
|
arguments = mock_apo_input.__dict__
|
|
start_id = mock_apo_input.request["start_pulse_id"]
|
|
stop_id = mock_apo_input.request["stop_pulse_id"]
|
|
log_file = mock_apo_input.log_file
|
|
metadata = mock_apo_input.request["metadata"]
|
|
assert body_dict == {
|
|
"writer_type": "processing",
|
|
"processor": processor,
|
|
"arguments": str(arguments),
|
|
"start_pulse_id": start_id,
|
|
"stop_pulse_id": stop_id,
|
|
"log_file": log_file,
|
|
"metadata": metadata,
|
|
"apo_meta": {
|
|
'acq_number': '0001',
|
|
'run_number': '0001',
|
|
"submit_tstamp": 5
|
|
}
|
|
}
|
|
assert call_args["routing_key"] == ""
|
|
|
|
|
|
def test_send_msg_body_req_succ(mock_apo_input):
|
|
mock_channel = Mock()
|
|
mock_path = MagicMock(spec=Path)
|
|
mock_path.parent.glob.return_value = [MagicMock(spec=Path)]
|
|
mock_meta = {
|
|
"key": "value",
|
|
"num": 2
|
|
}
|
|
|
|
with (
|
|
patch("apocalypse.send_done.Path", return_value=mock_path),
|
|
patch("apocalypse.send_done.parse_apo_meta", return_value=mock_meta)
|
|
):
|
|
send_msg(mock_channel, mock_apo_input, "request_success", "d45678f-a1234e-5c6", 5)
|
|
call_args = mock_channel.basic_publish.call_args[1]
|
|
body_dict = json.loads(call_args["body"].decode())
|
|
|
|
assert body_dict == {
|
|
"writer_type": "processing",
|
|
"metadata": {
|
|
"general/instrument": "test_endstation",
|
|
"general/user": "p12345"
|
|
},
|
|
"apo_meta": {
|
|
'acq_number': '0001',
|
|
'run_number': '0001',
|
|
"submit_tstamp": 5,
|
|
'key': 'value',
|
|
'num': 2
|
|
}
|
|
}
|
|
assert call_args["routing_key"] == ""
|
|
|
|
|
|
def test_send_msg_body_req_succ_double_meta(mock_apo_input):
|
|
mock_channel = Mock()
|
|
mock_path = MagicMock(spec=Path)
|
|
mock_path.parent.glob.return_value = ["pth1", "pth2"]
|
|
mock_meta = {
|
|
"key": "value",
|
|
"num": 2
|
|
}
|
|
|
|
with (
|
|
patch("apocalypse.send_done.Path", return_value=mock_path),
|
|
patch("apocalypse.send_done.parse_apo_meta", return_value=mock_meta),
|
|
patch("apocalypse.send_done.logger") as mock_logger
|
|
):
|
|
send_msg(mock_channel, mock_apo_input, "request_success", "d45678f-a1234e-5c6", 5)
|
|
call_args = mock_channel.basic_publish.call_args[1]
|
|
body_dict = json.loads(call_args["body"].decode())
|
|
|
|
assert body_dict == {
|
|
"writer_type": "processing",
|
|
"metadata": {
|
|
"general/instrument": "test_endstation",
|
|
"general/user": "p12345"
|
|
},
|
|
"apo_meta": {
|
|
'acq_number': '0001',
|
|
'run_number': '0001',
|
|
"submit_tstamp": 5,
|
|
'key': 'value',
|
|
'num': 2
|
|
}
|
|
}
|
|
assert call_args["routing_key"] == ""
|
|
mock_logger.debug.assert_called_with(
|
|
"there were more than one meta files ['pth1', 'pth2'], took 1st one"
|
|
)
|
|
|
|
|
|
@pytest.mark.parametrize(
|
|
"content, expected, suffix",
|
|
[('{"key": "value", "num": 2}', {
|
|
"key": "value",
|
|
"num": 2
|
|
}, ".json"), ("key: value\nnum: 2", {
|
|
"key": "value",
|
|
"num": 2
|
|
}, ".yaml"), ("plain meta txt", {
|
|
"table": {
|
|
"message": "plain meta txt"
|
|
}
|
|
}, ".txt")]
|
|
)
|
|
def test_parse_apo_meta(content, expected, suffix):
|
|
string_io = io.StringIO(content)
|
|
mock_path = MagicMock(spec=Path)
|
|
mock_path.suffix = suffix
|
|
|
|
with patch("builtins.open", return_value=string_io):
|
|
result = parse_apo_meta(mock_path)
|
|
|
|
assert result == expected
|
|
|
|
|
|
def test_parse_apo_meta_raise_err():
|
|
string_io = io.StringIO('"key": "value", "num": 2')
|
|
mock_path = MagicMock(spec=Path)
|
|
mock_path.suffix = ".json"
|
|
|
|
with patch("builtins.open", return_value=string_io):
|
|
result = parse_apo_meta(mock_path)
|
|
assert result == {
|
|
"table": {
|
|
"message": '"key": "value", "num": 2'
|
|
}
|
|
}
|