mirror of
https://github.com/paulscherrerinstitute/sf_daq_broker.git
synced 2026-05-22 05:40:54 +02:00
Add async implementation of Pika worker
This commit is contained in:
@@ -3,6 +3,8 @@ import json
|
||||
import logging
|
||||
|
||||
from datetime import datetime
|
||||
from functools import partial
|
||||
from threading import Thread
|
||||
from time import time, sleep
|
||||
from pika import BlockingConnection, ConnectionParameters, BasicProperties
|
||||
|
||||
@@ -117,7 +119,20 @@ def update_status(channel, body, action, file, message=None):
|
||||
body=body)
|
||||
|
||||
|
||||
def on_broker_message(channel, method_frame, header_frame, body):
|
||||
def confirm_request(channel, method_frame, body, output_file):
|
||||
channel.basic_ack(delivery_tag=method_frame.delivery_tag)
|
||||
|
||||
update_status(channel, body, "write_finished", output_file)
|
||||
|
||||
|
||||
def reject_request(channel, method_frame, body, output_file, e):
|
||||
channel.basic_reject(delivery_tag=method_frame.delivery_tag,
|
||||
requeue=False)
|
||||
|
||||
update_status(channel, body, "write_rejected", output_file, str(e))
|
||||
|
||||
|
||||
def on_broker_message(channel, method_frame, header_frame, body, connection):
|
||||
|
||||
output_file = None
|
||||
|
||||
@@ -127,21 +142,27 @@ def on_broker_message(channel, method_frame, header_frame, body):
|
||||
output_file = request["output_file"]
|
||||
update_status(channel, body, "write_start", output_file)
|
||||
|
||||
process_request(request)
|
||||
def process_async():
|
||||
try:
|
||||
process_request(request)
|
||||
|
||||
except Exception as ex:
|
||||
_logger.exception("Error while trying to write a requested data.")
|
||||
|
||||
reject_request_f = partial(reject_request, channel, method_frame, body, output_file, ex)
|
||||
connection.add_callback_threadsafe(reject_request_f)
|
||||
|
||||
else:
|
||||
confirm_request_f = partial(confirm_request, channel, method_frame, body, output_file)
|
||||
connection.add_callback_threadsafe(confirm_request_f)
|
||||
|
||||
thread = Thread(target=process_async)
|
||||
thread.daemon = True
|
||||
thread.start()
|
||||
|
||||
except Exception as e:
|
||||
|
||||
_logger.exception("Error while trying to write a requested data.")
|
||||
|
||||
channel.basic_reject(delivery_tag=method_frame.delivery_tag,
|
||||
requeue=False)
|
||||
|
||||
update_status(channel, body, "write_rejected", output_file, str(e))
|
||||
|
||||
else:
|
||||
channel.basic_ack(delivery_tag=method_frame.delivery_tag)
|
||||
|
||||
update_status(channel, body, "write_finished", output_file)
|
||||
reject_request(channel, method_frame, body, output_file, e)
|
||||
|
||||
|
||||
def start_service(broker_url):
|
||||
@@ -160,7 +181,9 @@ def start_service(broker_url):
|
||||
routing_key="*.%s.*" % broker_config.BSREAD_QUEUE)
|
||||
|
||||
channel.basic_qos(prefetch_count=1)
|
||||
channel.basic_consume(broker_config.BSREAD_QUEUE, on_broker_message)
|
||||
|
||||
on_broker_message_f = partial(on_broker_message, connection=connection)
|
||||
channel.basic_consume(broker_config.BSREAD_QUEUE, on_broker_message_f)
|
||||
|
||||
try:
|
||||
channel.start_consuming()
|
||||
|
||||
Reference in New Issue
Block a user