# pylint:disable=missing-function-docstring import io import json from pathlib import Path from unittest.mock import MagicMock, Mock, patch import pytest from apocalypse import broker_config from apocalypse.arghandler import get_apo_input from apocalypse.send_done import parse_apo_meta, send_msg, setup_conn @pytest.fixture def mock_apo_input(): mock = Mock(spec=get_apo_input) mock.script_file = "test_script.py" mock.endstation = "test_endstation" mock.slurm_job = Path("/test/slurm_job") mock.send_done_pth = Path("/test/done") mock.header = { "action": "write_finished", "file": "/path/to/run0001/data/acq0001.h5", "routing_key": "" } mock.request = { "writer_type": "test_writer", "output_file": "/path/to/run0001/data/acq0001.h5", "start_pulse_id": 0, "stop_pulse_id": 100, "metadata": { "general/instrument": "test_endstation", "general/user": "p12345" } } mock.log_file = "/pth/to/somewhere" mock.template_file = Path("/test/template.jinja") mock.broker_url = "test-broker" mock.slurm_job_date = "_%Y_%m_%d" mock.writer_type = "test_writer" mock.slurm_params = "-c 5, --mem=0, -p 'athos', " mock.slurm_params_other = [" --nice=100, --account='foo''"] mock.pgroup = "p12345" mock.job_name = "$run/$acq/$script_file" return mock @pytest.fixture def mock_pika(): with ( patch("apocalypse.send_done.ConnectionParameters") as mock_params, patch("apocalypse.send_done.BlockingConnection") as mock_connection ): mock_channel = Mock() mock_queue = Mock() mock_queue.method.queue = "" mock_channel.queue_declare.return_value = mock_queue mock_connection_instance = Mock() mock_connection_instance.channel.return_value = mock_channel mock_connection.return_value = mock_connection_instance yield { "params": mock_params, "connection": mock_connection_instance, "connection_class": mock_connection, "channel": mock_channel } @patch("apocalypse.send_done.BlockingConnection") @patch("apocalypse.send_done.ConnectionParameters") def test_setup_conn_successful_connection(mock_params, mock_connection_class, mock_pika): broker_url = "foo" mock_connection_class.return_value = mock_pika["connection_class"].return_value channel, _conn = setup_conn(broker_url) mock_params.assert_called_once_with(broker_url) mock_connection_class.assert_called_once_with(mock_params.return_value) mock_pika["connection_class"].return_value.channel.assert_called_once() mock_pika["channel"].exchange_declare.assert_called_once_with( exchange=broker_config.STATUS_EXCHANGE, exchange_type="fanout" ) mock_pika["channel"].queue_declare.assert_called_once_with(queue="", exclusive=True) mock_pika["channel"].queue_bind.assert_called_once_with( queue="", exchange=broker_config.STATUS_EXCHANGE ) assert channel == mock_pika["channel"] @pytest.mark.parametrize( "error_method, error_message", [("connection_class", "Connection failed"), ("connection.channel", "Channel creation failed"), ("channel.exchange_declare", "Exchange declaration failed")] ) def test_setup_conn_errors(mock_pika, error_method, error_message): broker_url = "foo" if error_method == "connection_class": mock_pika["connection_class"].side_effect = Exception(error_message) elif error_method == "connection.channel": mock_pika["connection"].channel.side_effect = Exception(error_message) elif error_method == "channel.exchange_declare": mock_pika["channel"].exchange_declare.side_effect = Exception(error_message) with pytest.raises(Exception, match=error_message): setup_conn(broker_url) def test_send_msg_properties_req_suc(mock_apo_input): mock_channel = Mock() send_msg(mock_channel, mock_apo_input, "request_success", "d45678f-a1234e-5c6", 5) call_args = mock_channel.basic_publish.call_args[1] assert call_args["exchange"] == broker_config.STATUS_EXCHANGE properties = call_args["properties"] assert properties.correlation_id == "d45678f-a1234e-5c6" assert properties.headers == { "action": "request_success", "source": "apocalypse", "message": None } def test_send_msg_properties_req_st(mock_apo_input): mock_channel = Mock() send_msg(mock_channel, mock_apo_input, "request_start", "d45678f-a1234e-5c6", 5) call_args = mock_channel.basic_publish.call_args[1] assert call_args["exchange"] == broker_config.STATUS_EXCHANGE properties = call_args["properties"] assert properties.correlation_id == "d45678f-a1234e-5c6" assert properties.headers == { "action": "request_start", "source": "apocalypse", "message": None, "file": mock_apo_input.header["file"], "routing_key": "" } def test_send_msg_body_req_suc_no_file(mock_apo_input): mock_channel = Mock() send_msg(mock_channel, mock_apo_input, "request_success", "d45678f-a1234e-5c6", 5) call_args = mock_channel.basic_publish.call_args[1] body_dict = json.loads(call_args["body"].decode()) assert body_dict == { "writer_type": "processing", "metadata": { "general/instrument": "test_endstation", "general/user": "p12345" }, "apo_meta": { 'acq_number': '0001', 'run_number': '0001', "submit_tstamp": 5 } } assert call_args["routing_key"] == "" def test_send_msg_body_req_st(mock_apo_input): mock_channel = Mock() send_msg(mock_channel, mock_apo_input, "request_start", "d45678f-a1234e-5c6", 5) call_args = mock_channel.basic_publish.call_args[1] body_dict = json.loads(call_args["body"].decode()) processor = mock_apo_input.script_file arguments = mock_apo_input.__dict__ start_id = mock_apo_input.request["start_pulse_id"] stop_id = mock_apo_input.request["stop_pulse_id"] log_file = mock_apo_input.log_file metadata = mock_apo_input.request["metadata"] assert body_dict == { "writer_type": "processing", "processor": processor, "arguments": str(arguments), "start_pulse_id": start_id, "stop_pulse_id": stop_id, "log_file": log_file, "metadata": metadata, "apo_meta": { 'acq_number': '0001', 'run_number': '0001', "submit_tstamp": 5 } } assert call_args["routing_key"] == "" def test_send_msg_body_req_succ(mock_apo_input): mock_channel = Mock() mock_path = MagicMock(spec=Path) mock_path.parent.glob.return_value = [MagicMock(spec=Path)] mock_meta = { "key": "value", "num": 2 } with ( patch("apocalypse.send_done.Path", return_value=mock_path), patch("apocalypse.send_done.parse_apo_meta", return_value=mock_meta) ): send_msg(mock_channel, mock_apo_input, "request_success", "d45678f-a1234e-5c6", 5) call_args = mock_channel.basic_publish.call_args[1] body_dict = json.loads(call_args["body"].decode()) assert body_dict == { "writer_type": "processing", "metadata": { "general/instrument": "test_endstation", "general/user": "p12345" }, "apo_meta": { 'acq_number': '0001', 'run_number': '0001', "submit_tstamp": 5, 'key': 'value', 'num': 2 } } assert call_args["routing_key"] == "" def test_send_msg_body_req_succ_double_meta(mock_apo_input): mock_channel = Mock() mock_path = MagicMock(spec=Path) mock_path.parent.glob.return_value = ["pth1", "pth2"] mock_meta = { "key": "value", "num": 2 } with ( patch("apocalypse.send_done.Path", return_value=mock_path), patch("apocalypse.send_done.parse_apo_meta", return_value=mock_meta), patch("apocalypse.send_done.logger") as mock_logger ): send_msg(mock_channel, mock_apo_input, "request_success", "d45678f-a1234e-5c6", 5) call_args = mock_channel.basic_publish.call_args[1] body_dict = json.loads(call_args["body"].decode()) assert body_dict == { "writer_type": "processing", "metadata": { "general/instrument": "test_endstation", "general/user": "p12345" }, "apo_meta": { 'acq_number': '0001', 'run_number': '0001', "submit_tstamp": 5, 'key': 'value', 'num': 2 } } assert call_args["routing_key"] == "" mock_logger.debug.assert_called_with( "there were more than one meta files ['pth1', 'pth2'], took 1st one" ) @pytest.mark.parametrize( "content, expected, suffix", [('{"key": "value", "num": 2}', { "key": "value", "num": 2 }, ".json"), ("key: value\nnum: 2", { "key": "value", "num": 2 }, ".yaml"), ("plain meta txt", { "table": { "message": "plain meta txt" } }, ".txt")] ) def test_parse_apo_meta(content, expected, suffix): string_io = io.StringIO(content) mock_path = MagicMock(spec=Path) mock_path.suffix = suffix with patch("builtins.open", return_value=string_io): result = parse_apo_meta(mock_path) assert result == expected def test_parse_apo_meta_raise_err(): string_io = io.StringIO('"key": "value", "num": 2') mock_path = MagicMock(spec=Path) mock_path.suffix = ".json" with patch("builtins.open", return_value=string_io): result = parse_apo_meta(mock_path) assert result == { "table": { "message": '"key": "value", "num": 2' } }