From 13e55edfc89e3cad5c0a54ac0407e6d59e37996e Mon Sep 17 00:00:00 2001 From: Andrej Babic Date: Wed, 12 Aug 2020 11:34:43 +0200 Subject: [PATCH] Implement consumer queue boilerplate --- epics-writer/epics-writer/start.py | 84 ++++++++++++++++++++---------- 1 file changed, 56 insertions(+), 28 deletions(-) diff --git a/epics-writer/epics-writer/start.py b/epics-writer/epics-writer/start.py index 4bcadcb..ddcff27 100644 --- a/epics-writer/epics-writer/start.py +++ b/epics-writer/epics-writer/start.py @@ -1,35 +1,73 @@ import json -from pika import BlockingConnection, ConnectionParameters +from pika import BlockingConnection, ConnectionParameters, BasicProperties + +REQUEST_EXCHANGE = "request" +STATUS_EXCHANGE = "status" +QUEUE_NAME = "epics" +OUTPUT_FILE_SUFFIX = ".epics.h5" + + +def update_status(channel, body, action, file, message=None): + + status_header = { + "action": action, + "source": "epics-writer", + "routing_key": QUEUE_NAME, + "file": file, + "message": message + } + + channel.basic_publish(exchange=STATUS_EXCHANGE, + properties=BasicProperties( + headers=status_header), + routing_key="", + body=body) def on_message(channel, method_frame, header_frame, body): - request = json.loads(body.decode()) - epics_pvs = request.get("epics_pvs") - if not epics_pvs: + try: + request = json.loads(body.decode()) + output_prefix = request["output_prefix"] + start_pulse_id = 1 + stop_pulse_id = 10 + metadata = request["metadata"] + epics_pvs = request["epics_pvs"] + + output_file = output_prefix + OUTPUT_FILE_SUFFIX + + update_status(channel, body, "write_start", output_file) + + except Exception as e: channel.basic_reject(delivery_tag=method_frame.delivery_tag, requeue=False) - # TODO: Send to status exchange. + update_status(channel, body, "write_rejected", str(e)) + + else: + channel.basic_ack(delivery_tag=method_frame.delivery_tag) + update_status(channel, body, "write_finished", output_file) - channel.basic_ack(delivery_tag=method_frame.delivery_tag) - - -def connect_to_broker(broker_url, exchange_name, queue_name): +def connect_to_broker(broker_url): connection = BlockingConnection(ConnectionParameters(broker_url)) channel = connection.channel() - channel.exchange_declare(exchange=exchange_name, + channel.exchange_declare(exchange=REQUEST_EXCHANGE, exchange_type="topic") - channel.queue_declare(queue=queue_name, auto_delete=True) - channel.queue_bind(queue=queue_name, - exchange=exchange_name, - routing_key="*.%s.*" % queue_name) + channel.exchange_declare(exchange=STATUS_EXCHANGE, + exchange_type="fanout") - channel.basic_consume(queue_name, on_message) + channel.queue_declare(queue=QUEUE_NAME, auto_delete=True) + channel.queue_bind(queue=QUEUE_NAME, + exchange=REQUEST_EXCHANGE, + routing_key="*.%s.*" % QUEUE_NAME, + auto_detele=True) + + channel.basic_qos(prefetch_count=1) + channel.basic_consume(QUEUE_NAME, on_message) try: channel.start_consuming() @@ -37,29 +75,19 @@ def connect_to_broker(broker_url, exchange_name, queue_name): channel.stop_consuming() -BROKER_URL = "127.0.0.1" -EXCHANGE_NAME = "request" -QUEUE_NAME = "epics" +DEFAULT_BROKER_URL = "127.0.0.1" def main(): import argparse parser = argparse.ArgumentParser(description='Epics HDF5 writer') parser.add_argument('--broker_url', dest='broker_url', - default=BROKER_URL, + default=DEFAULT_BROKER_URL, help='RabbitMQ broker URL') - parser.add_argument('--exchange_name', dest='exchange_name', - default=EXCHANGE_NAME, - help='Name of the request exchange.') - parser.add_argument('--queue_name', dest='queue_name', - default=QUEUE_NAME, - help='Name of the queue to connect to') args = parser.parse_args() - connect_to_broker(broker_url=args.broker_url, - exchange_name=args.exchange_name, - queue_name=args.queue_name) + connect_to_broker(broker_url=args.broker_url) if __name__ == '__main__':