First working broker example

This commit is contained in:
2020-08-11 13:53:35 +02:00
parent cfb99ff17f
commit f4bd6a15f1
3 changed files with 57 additions and 53 deletions
@@ -1,6 +1,3 @@
import json
import pika
import data_api
import data_api.client
import requests
@@ -9,7 +6,7 @@ import time
import logging
from pika import BlockingConnection, ConnectionParameters
logger = logging.getLogger("logger")
@@ -154,53 +151,4 @@ def get_pulse_id_date_mapping(pulse_ids):
raise RuntimeError('Unable to retrieve mapping')
def on_message(channel, method_frame, header_frame, body):
print(method_frame.delivery_tag)
print(body)
channel.basic_ack(delivery_tag=method_frame.delivery_tag)
def connect_to_broker(broker_url, exchange_name, queue_name):
connection = BlockingConnection(ConnectionParameters(broker_url))
channel = connection.channel()
channel.queue_declare(queue=queue_name, )
channel.queue_bind(queue=queue_name,
exchange=exchange_name,
routing_key=QUEUE_NAME)
channel.basic_consume(queue_name, on_message)
try:
channel.start_consuming()
except KeyboardInterrupt:
channel.stop_consuming()
BROKER_URL = "localhost"
EXCHANGE_NAME = "request"
QUEUE_NAME = "epics"
def main():
import argparse
parser = argparse.ArgumentParser(description='Epics HDF5 writer')
parser.add_argument('--broker_url', dest='broker_url',
default=BROKER_URL,
help='RabbitMQ broker URL')
parser.add_argument('--exchange_name', dest='exchange_name',
default=EXCHANGE_NAME,
help='Name of the request exchange.')
parser.add_argument('--queue_name', dest='queue_name',
default=QUEUE_NAME,
help='Name of the queue to connect to')
args = parser.parse_args()
connect_to_broker(broker_url=args.broker_url,
exchange_name=args.exchange_name,
queue_name=args.queue_name)
if __name__ == '__main__':
main()
+56
View File
@@ -0,0 +1,56 @@
from pika import BlockingConnection, ConnectionParameters
def on_message(channel, method_frame, header_frame, body):
print(method_frame.delivery_tag)
print(body)
channel.basic_ack(delivery_tag=method_frame.delivery_tag)
def connect_to_broker(broker_url, exchange_name, queue_name):
connection = BlockingConnection(ConnectionParameters(broker_url))
channel = connection.channel()
channel.exchange_declare(exchange=exchange_name,
exchange_type="topic")
channel.queue_declare(queue=queue_name)
channel.queue_bind(queue=queue_name,
exchange=exchange_name,
routing_key=QUEUE_NAME)
channel.basic_consume(queue_name, on_message)
try:
channel.start_consuming()
except KeyboardInterrupt:
channel.stop_consuming()
BROKER_URL = "127.0.0.1"
EXCHANGE_NAME = "request"
QUEUE_NAME = "epics"
def main():
import argparse
parser = argparse.ArgumentParser(description='Epics HDF5 writer')
parser.add_argument('--broker_url', dest='broker_url',
default=BROKER_URL,
help='RabbitMQ broker URL')
parser.add_argument('--exchange_name', dest='exchange_name',
default=EXCHANGE_NAME,
help='Name of the request exchange.')
parser.add_argument('--queue_name', dest='queue_name',
default=QUEUE_NAME,
help='Name of the queue to connect to')
args = parser.parse_args()
connect_to_broker(broker_url=args.broker_url,
exchange_name=args.exchange_name,
queue_name=args.queue_name)
if __name__ == '__main__':
main()