diff --git a/sf_daq_broker/writer/start.py b/sf_daq_broker/writer/start.py index d2c0164..cadc927 100644 --- a/sf_daq_broker/writer/start.py +++ b/sf_daq_broker/writer/start.py @@ -3,6 +3,8 @@ import json import logging from datetime import datetime +from functools import partial +from threading import Thread from time import time, sleep from pika import BlockingConnection, ConnectionParameters, BasicProperties @@ -117,7 +119,20 @@ def update_status(channel, body, action, file, message=None): body=body) -def on_broker_message(channel, method_frame, header_frame, body): +def confirm_request(channel, method_frame, body, output_file): + channel.basic_ack(delivery_tag=method_frame.delivery_tag) + + update_status(channel, body, "write_finished", output_file) + + +def reject_request(channel, method_frame, body, output_file, e): + channel.basic_reject(delivery_tag=method_frame.delivery_tag, + requeue=False) + + update_status(channel, body, "write_rejected", output_file, str(e)) + + +def on_broker_message(channel, method_frame, header_frame, body, connection): output_file = None @@ -127,21 +142,27 @@ def on_broker_message(channel, method_frame, header_frame, body): output_file = request["output_file"] update_status(channel, body, "write_start", output_file) - process_request(request) + def process_async(): + try: + process_request(request) + + except Exception as ex: + _logger.exception("Error while trying to write a requested data.") + + reject_request_f = partial(reject_request, channel, method_frame, body, output_file, ex) + connection.add_callback_threadsafe(reject_request_f) + + else: + confirm_request_f = partial(confirm_request, channel, method_frame, body, output_file) + connection.add_callback_threadsafe(confirm_request_f) + + thread = Thread(target=process_async) + thread.daemon = True + thread.start() except Exception as e: - _logger.exception("Error while trying to write a requested data.") - - channel.basic_reject(delivery_tag=method_frame.delivery_tag, - requeue=False) - - update_status(channel, body, "write_rejected", output_file, str(e)) - - else: - channel.basic_ack(delivery_tag=method_frame.delivery_tag) - - update_status(channel, body, "write_finished", output_file) + reject_request(channel, method_frame, body, output_file, e) def start_service(broker_url): @@ -160,7 +181,9 @@ def start_service(broker_url): routing_key="*.%s.*" % broker_config.BSREAD_QUEUE) channel.basic_qos(prefetch_count=1) - channel.basic_consume(broker_config.BSREAD_QUEUE, on_broker_message) + + on_broker_message_f = partial(on_broker_message, connection=connection) + channel.basic_consume(broker_config.BSREAD_QUEUE, on_broker_message_f) try: channel.start_consuming()