mirror of
https://github.com/paulscherrerinstitute/sf_daq_buffer.git
synced 2026-05-01 01:22:23 +02:00
Improve broker wrapper script
This commit is contained in:
@@ -2,14 +2,14 @@ import json
|
||||
|
||||
from pika import BlockingConnection, ConnectionParameters, BasicProperties
|
||||
|
||||
DEFAULT_BROKER_URL = "127.0.0.1"
|
||||
REQUEST_EXCHANGE = "request"
|
||||
STATUS_EXCHANGE = "status"
|
||||
QUEUE_NAME = "epics"
|
||||
OUTPUT_FILE_SUFFIX = ".epics.h5"
|
||||
OUTPUT_FILE_SUFFIX = ".PVCHANNELS.h5"
|
||||
|
||||
|
||||
def update_status(channel, body, action, file, message=None):
|
||||
|
||||
status_header = {
|
||||
"action": action,
|
||||
"source": "epics-writer",
|
||||
@@ -27,6 +27,8 @@ def update_status(channel, body, action, file, message=None):
|
||||
|
||||
def on_message(channel, method_frame, header_frame, body):
|
||||
|
||||
output_file = None
|
||||
|
||||
try:
|
||||
request = json.loads(body.decode())
|
||||
output_prefix = request["output_prefix"]
|
||||
@@ -39,14 +41,17 @@ def on_message(channel, method_frame, header_frame, body):
|
||||
|
||||
update_status(channel, body, "write_start", output_file)
|
||||
|
||||
# TODO: Call the actual writing.
|
||||
|
||||
except Exception as e:
|
||||
channel.basic_reject(delivery_tag=method_frame.delivery_tag,
|
||||
requeue=False)
|
||||
|
||||
update_status(channel, body, "write_rejected", str(e))
|
||||
update_status(channel, body, "write_rejected", output_file, str(e))
|
||||
|
||||
else:
|
||||
channel.basic_ack(delivery_tag=method_frame.delivery_tag)
|
||||
|
||||
update_status(channel, body, "write_finished", output_file)
|
||||
|
||||
|
||||
@@ -54,18 +59,15 @@ def connect_to_broker(broker_url):
|
||||
connection = BlockingConnection(ConnectionParameters(broker_url))
|
||||
channel = connection.channel()
|
||||
|
||||
channel.exchange_declare(exchange=REQUEST_EXCHANGE,
|
||||
exchange_type="topic")
|
||||
|
||||
channel.exchange_declare(exchange=STATUS_EXCHANGE,
|
||||
exchange_type="fanout")
|
||||
channel.exchange_declare(exchange=REQUEST_EXCHANGE,
|
||||
exchange_type="topic")
|
||||
|
||||
channel.queue_declare(queue=QUEUE_NAME, auto_delete=True)
|
||||
channel.queue_bind(queue=QUEUE_NAME,
|
||||
exchange=REQUEST_EXCHANGE,
|
||||
routing_key="*.%s.*" % QUEUE_NAME,
|
||||
auto_detele=True)
|
||||
|
||||
routing_key="*.%s.*" % QUEUE_NAME)
|
||||
channel.basic_qos(prefetch_count=1)
|
||||
channel.basic_consume(QUEUE_NAME, on_message)
|
||||
|
||||
@@ -75,9 +77,6 @@ def connect_to_broker(broker_url):
|
||||
channel.stop_consuming()
|
||||
|
||||
|
||||
DEFAULT_BROKER_URL = "127.0.0.1"
|
||||
|
||||
|
||||
def main():
|
||||
import argparse
|
||||
parser = argparse.ArgumentParser(description='Epics HDF5 writer')
|
||||
|
||||
Reference in New Issue
Block a user