From afa2eb48fda4b8b8975146c9cecc9526b528b23e Mon Sep 17 00:00:00 2001 From: Andrej Babic Date: Wed, 12 Aug 2020 13:47:22 +0200 Subject: [PATCH] Improve broker wrapper script --- epics-writer/epics-writer/start.py | 23 +++++++++++------------ 1 file changed, 11 insertions(+), 12 deletions(-) diff --git a/epics-writer/epics-writer/start.py b/epics-writer/epics-writer/start.py index ddcff27..de84e51 100644 --- a/epics-writer/epics-writer/start.py +++ b/epics-writer/epics-writer/start.py @@ -2,14 +2,14 @@ import json from pika import BlockingConnection, ConnectionParameters, BasicProperties +DEFAULT_BROKER_URL = "127.0.0.1" REQUEST_EXCHANGE = "request" STATUS_EXCHANGE = "status" QUEUE_NAME = "epics" -OUTPUT_FILE_SUFFIX = ".epics.h5" +OUTPUT_FILE_SUFFIX = ".PVCHANNELS.h5" def update_status(channel, body, action, file, message=None): - status_header = { "action": action, "source": "epics-writer", @@ -27,6 +27,8 @@ def update_status(channel, body, action, file, message=None): def on_message(channel, method_frame, header_frame, body): + output_file = None + try: request = json.loads(body.decode()) output_prefix = request["output_prefix"] @@ -39,14 +41,17 @@ def on_message(channel, method_frame, header_frame, body): update_status(channel, body, "write_start", output_file) + # TODO: Call the actual writing. + except Exception as e: channel.basic_reject(delivery_tag=method_frame.delivery_tag, requeue=False) - update_status(channel, body, "write_rejected", str(e)) + update_status(channel, body, "write_rejected", output_file, str(e)) else: channel.basic_ack(delivery_tag=method_frame.delivery_tag) + update_status(channel, body, "write_finished", output_file) @@ -54,18 +59,15 @@ def connect_to_broker(broker_url): connection = BlockingConnection(ConnectionParameters(broker_url)) channel = connection.channel() - channel.exchange_declare(exchange=REQUEST_EXCHANGE, - exchange_type="topic") - channel.exchange_declare(exchange=STATUS_EXCHANGE, exchange_type="fanout") + channel.exchange_declare(exchange=REQUEST_EXCHANGE, + exchange_type="topic") channel.queue_declare(queue=QUEUE_NAME, auto_delete=True) channel.queue_bind(queue=QUEUE_NAME, exchange=REQUEST_EXCHANGE, - routing_key="*.%s.*" % QUEUE_NAME, - auto_detele=True) - + routing_key="*.%s.*" % QUEUE_NAME) channel.basic_qos(prefetch_count=1) channel.basic_consume(QUEUE_NAME, on_message) @@ -75,9 +77,6 @@ def connect_to_broker(broker_url): channel.stop_consuming() -DEFAULT_BROKER_URL = "127.0.0.1" - - def main(): import argparse parser = argparse.ArgumentParser(description='Epics HDF5 writer')