mirror of
https://github.com/paulscherrerinstitute/sf_daq_buffer.git
synced 2026-05-01 15:52:24 +02:00
Adding RabbitMQ broker
This commit is contained in:
+71
-20
@@ -1,17 +1,19 @@
|
||||
import json
|
||||
|
||||
import pika
|
||||
import data_api
|
||||
import data_api.client
|
||||
import requests
|
||||
import dateutil.parser
|
||||
import pytz
|
||||
import datetime
|
||||
import time
|
||||
import re
|
||||
|
||||
import logging
|
||||
|
||||
from pika import BlockingConnection, ConnectionParameters
|
||||
|
||||
logger = logging.getLogger("logger")
|
||||
|
||||
|
||||
# This is how the notification look like
|
||||
# {
|
||||
# 'range': {
|
||||
@@ -29,24 +31,26 @@ logger = logging.getLogger("logger")
|
||||
|
||||
|
||||
def download_data(start_pulse, end_pulse, channels):
|
||||
|
||||
logger.info("Dump data to hdf5 ...")
|
||||
logger.info("Retrieve data for channels: %s" % channels)
|
||||
|
||||
logger.info("Retrieve pulse-id / data mapping for pulse ids")
|
||||
start_date, end_date = get_pulse_id_date_mapping([start_pulse, end_pulse])
|
||||
|
||||
logger.info("Retrieving data for interval start: " + str(start_date) + " end: " + str(end_date) + " . From " + new_base_url)
|
||||
data = get_data(channels, start=start_date, end=end_date, base_url=new_base_url)
|
||||
logger.info("Retrieving data for interval start: " + str(
|
||||
start_date) + " end: " + str(end_date) + " . From " + new_base_url)
|
||||
data = get_data(channels, start=start_date, end=end_date,
|
||||
base_url=new_base_url)
|
||||
|
||||
if len(data) < 1:
|
||||
logger.error("No data retrieved")
|
||||
open(new_filename+"_NO_DATA", 'a').close()
|
||||
open(new_filename + "_NO_DATA", 'a').close()
|
||||
|
||||
else:
|
||||
if new_filename:
|
||||
logger.info("Persist data to hdf5 file")
|
||||
data_api.to_hdf5(data, new_filename, overwrite=True, compression=None, shuffle=False)
|
||||
data_api.to_hdf5(data, new_filename, overwrite=True,
|
||||
compression=None, shuffle=False)
|
||||
|
||||
|
||||
def get_data(channel_list, start=None, end=None, base_url=None):
|
||||
@@ -54,7 +58,8 @@ def get_data(channel_list, start=None, end=None, base_url=None):
|
||||
"endDate": datetime.datetime.isoformat(end),
|
||||
"startExpansion": True},
|
||||
"channels": channel_list,
|
||||
"fields": ["pulseId", "globalSeconds", "globalDate", "value", "eventCount"]}
|
||||
"fields": ["pulseId", "globalSeconds", "globalDate", "value",
|
||||
"eventCount"]}
|
||||
|
||||
logger.info(query)
|
||||
|
||||
@@ -80,11 +85,11 @@ def get_data(channel_list, start=None, end=None, base_url=None):
|
||||
|
||||
data = response.json()
|
||||
|
||||
return data_api.client._build_pandas_data_frame(data, index_field="globalDate")
|
||||
return data_api.client._build_pandas_data_frame(data,
|
||||
index_field="globalDate")
|
||||
|
||||
|
||||
def get_pulse_id_date_mapping(pulse_ids):
|
||||
|
||||
# See https://jira.psi.ch/browse/ATEST-897 for more details ...
|
||||
|
||||
try:
|
||||
@@ -102,16 +107,20 @@ def get_pulse_id_date_mapping(pulse_ids):
|
||||
|
||||
logger.info("Retrieve mapping for pulse_id %d" % pulse_id)
|
||||
# Query server
|
||||
response = requests.post("https://data-api.psi.ch/sf/query", json=query)
|
||||
response = requests.post("https://data-api.psi.ch/sf/query",
|
||||
json=query)
|
||||
|
||||
# Check for successful return of data
|
||||
if response.status_code != 200:
|
||||
raise RuntimeError("Unable to retrieve data from server: ", response)
|
||||
raise RuntimeError("Unable to retrieve data from server: ",
|
||||
response)
|
||||
|
||||
data = response.json()
|
||||
|
||||
if len(data[0]["data"]) == 0 or not "pulseId" in data[0]["data"][0]:
|
||||
raise RuntimeError("Didn't get good responce from data_api : %s " % data)
|
||||
if len(data[0]["data"]) == 0 or not "pulseId" in \
|
||||
data[0]["data"][0]:
|
||||
raise RuntimeError(
|
||||
"Didn't get good responce from data_api : %s " % data)
|
||||
|
||||
if not pulse_id == data[0]["data"][0]["pulseId"]:
|
||||
logger.info("retrieval failed")
|
||||
@@ -120,9 +129,11 @@ def get_pulse_id_date_mapping(pulse_ids):
|
||||
ref_date = dateutil.parser.parse(ref_date)
|
||||
|
||||
now_date = datetime.datetime.now()
|
||||
now_date = pytz.timezone('Europe/Zurich').localize(now_date)
|
||||
now_date = pytz.timezone('Europe/Zurich').localize(
|
||||
now_date)
|
||||
|
||||
check_date = ref_date+datetime.timedelta(seconds=24) # 20 seconds should be enough
|
||||
check_date = ref_date + datetime.timedelta(
|
||||
seconds=24) # 20 seconds should be enough
|
||||
delta_date = check_date - now_date
|
||||
|
||||
s = delta_date.seconds
|
||||
@@ -143,14 +154,54 @@ def get_pulse_id_date_mapping(pulse_ids):
|
||||
raise RuntimeError('Unable to retrieve mapping')
|
||||
|
||||
|
||||
def on_message(channel, method_frame, header_frame, body):
|
||||
print(method_frame.delivery_tag)
|
||||
print(body)
|
||||
channel.basic_ack(delivery_tag=method_frame.delivery_tag)
|
||||
|
||||
|
||||
def connect_to_broker(broker_url, exchange_name, queue_name):
|
||||
connection = BlockingConnection(ConnectionParameters(broker_url))
|
||||
channel = connection.channel()
|
||||
|
||||
channel.queue_declare(queue=queue_name, )
|
||||
channel.queue_bind(queue=queue_name,
|
||||
exchange=exchange_name,
|
||||
routing_key=QUEUE_NAME)
|
||||
|
||||
channel.basic_consume(queue_name, on_message)
|
||||
|
||||
try:
|
||||
channel.start_consuming()
|
||||
except KeyboardInterrupt:
|
||||
channel.stop_consuming()
|
||||
|
||||
|
||||
BROKER_URL = "localhost"
|
||||
EXCHANGE_NAME = "request"
|
||||
QUEUE_NAME = "epics"
|
||||
|
||||
|
||||
def main():
|
||||
import argparse
|
||||
parser = argparse.ArgumentParser(description='Channel Access archiver dump to hdf5')
|
||||
parser.add_argument('--channels', dest='channel_list', default="tests/channels.txt", help='channels to dump')
|
||||
parser.add_argument('--url', dest='url', default=None, help='base url to retrieve data from')
|
||||
parser = argparse.ArgumentParser(
|
||||
description='Channel Access archiver dump to hdf5')
|
||||
parser.add_argument('--broker_url', dest='broker_url',
|
||||
default=BROKER_URL,
|
||||
help='RabbitMQ broker URL')
|
||||
parser.add_argument('--exchange_name', dest='exchange_name',
|
||||
default=EXCHANGE_NAME,
|
||||
help='Name of the request exchange.')
|
||||
parser.add_argument('--queue_name', dest='queue_name',
|
||||
default=QUEUE_NAME,
|
||||
help='Name of the queue to connect to')
|
||||
|
||||
args = parser.parse_args()
|
||||
|
||||
connect_to_broker(broker_url=args.broker_url,
|
||||
exchange_name=args.exchange_name,
|
||||
queue_name=args.queue_name)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
||||
|
||||
Reference in New Issue
Block a user