mirror of
https://github.com/paulscherrerinstitute/sf_daq_buffer.git
synced 2026-05-13 07:35:35 +02:00
Fixing queue logic
This commit is contained in:
@@ -1,9 +1,19 @@
|
||||
import json
|
||||
|
||||
from pika import BlockingConnection, ConnectionParameters
|
||||
|
||||
|
||||
def on_message(channel, method_frame, header_frame, body):
|
||||
print(method_frame.delivery_tag)
|
||||
print(body)
|
||||
request = json.loads(body.decode())
|
||||
epics_pvs = request.get("epics_pvs")
|
||||
|
||||
if not epics_pvs:
|
||||
channel.basic_reject(delivery_tag=method_frame.delivery_tag,
|
||||
requeue=False)
|
||||
|
||||
# TODO: Send to status exchange.
|
||||
|
||||
|
||||
channel.basic_ack(delivery_tag=method_frame.delivery_tag)
|
||||
|
||||
|
||||
@@ -14,10 +24,10 @@ def connect_to_broker(broker_url, exchange_name, queue_name):
|
||||
channel.exchange_declare(exchange=exchange_name,
|
||||
exchange_type="topic")
|
||||
|
||||
channel.queue_declare(queue=queue_name)
|
||||
channel.queue_declare(queue=queue_name, auto_delete=True)
|
||||
channel.queue_bind(queue=queue_name,
|
||||
exchange=exchange_name,
|
||||
routing_key=QUEUE_NAME)
|
||||
routing_key="*.%s.*" % queue_name)
|
||||
|
||||
channel.basic_consume(queue_name, on_message)
|
||||
|
||||
|
||||
Reference in New Issue
Block a user