diff --git a/cadump/cadump/cadump.py b/cadump/cadump/cadump.py index a7da9d4..a9e2dbf 100644 --- a/cadump/cadump/cadump.py +++ b/cadump/cadump/cadump.py @@ -1,17 +1,19 @@ import json +import pika import data_api import data_api.client import requests -import dateutil.parser -import pytz import datetime import time -import re import logging + +from pika import BlockingConnection, ConnectionParameters + logger = logging.getLogger("logger") + # This is how the notification look like # { # 'range': { @@ -29,24 +31,26 @@ logger = logging.getLogger("logger") def download_data(start_pulse, end_pulse, channels): - logger.info("Dump data to hdf5 ...") logger.info("Retrieve data for channels: %s" % channels) logger.info("Retrieve pulse-id / data mapping for pulse ids") start_date, end_date = get_pulse_id_date_mapping([start_pulse, end_pulse]) - logger.info("Retrieving data for interval start: " + str(start_date) + " end: " + str(end_date) + " . From " + new_base_url) - data = get_data(channels, start=start_date, end=end_date, base_url=new_base_url) + logger.info("Retrieving data for interval start: " + str( + start_date) + " end: " + str(end_date) + " . From " + new_base_url) + data = get_data(channels, start=start_date, end=end_date, + base_url=new_base_url) if len(data) < 1: logger.error("No data retrieved") - open(new_filename+"_NO_DATA", 'a').close() + open(new_filename + "_NO_DATA", 'a').close() else: if new_filename: logger.info("Persist data to hdf5 file") - data_api.to_hdf5(data, new_filename, overwrite=True, compression=None, shuffle=False) + data_api.to_hdf5(data, new_filename, overwrite=True, + compression=None, shuffle=False) def get_data(channel_list, start=None, end=None, base_url=None): @@ -54,7 +58,8 @@ def get_data(channel_list, start=None, end=None, base_url=None): "endDate": datetime.datetime.isoformat(end), "startExpansion": True}, "channels": channel_list, - "fields": ["pulseId", "globalSeconds", "globalDate", "value", "eventCount"]} + "fields": ["pulseId", "globalSeconds", "globalDate", "value", + "eventCount"]} logger.info(query) @@ -80,11 +85,11 @@ def get_data(channel_list, start=None, end=None, base_url=None): data = response.json() - return data_api.client._build_pandas_data_frame(data, index_field="globalDate") + return data_api.client._build_pandas_data_frame(data, + index_field="globalDate") def get_pulse_id_date_mapping(pulse_ids): - # See https://jira.psi.ch/browse/ATEST-897 for more details ... try: @@ -102,16 +107,20 @@ def get_pulse_id_date_mapping(pulse_ids): logger.info("Retrieve mapping for pulse_id %d" % pulse_id) # Query server - response = requests.post("https://data-api.psi.ch/sf/query", json=query) + response = requests.post("https://data-api.psi.ch/sf/query", + json=query) # Check for successful return of data if response.status_code != 200: - raise RuntimeError("Unable to retrieve data from server: ", response) + raise RuntimeError("Unable to retrieve data from server: ", + response) data = response.json() - if len(data[0]["data"]) == 0 or not "pulseId" in data[0]["data"][0]: - raise RuntimeError("Didn't get good responce from data_api : %s " % data) + if len(data[0]["data"]) == 0 or not "pulseId" in \ + data[0]["data"][0]: + raise RuntimeError( + "Didn't get good responce from data_api : %s " % data) if not pulse_id == data[0]["data"][0]["pulseId"]: logger.info("retrieval failed") @@ -120,9 +129,11 @@ def get_pulse_id_date_mapping(pulse_ids): ref_date = dateutil.parser.parse(ref_date) now_date = datetime.datetime.now() - now_date = pytz.timezone('Europe/Zurich').localize(now_date) + now_date = pytz.timezone('Europe/Zurich').localize( + now_date) - check_date = ref_date+datetime.timedelta(seconds=24) # 20 seconds should be enough + check_date = ref_date + datetime.timedelta( + seconds=24) # 20 seconds should be enough delta_date = check_date - now_date s = delta_date.seconds @@ -143,14 +154,54 @@ def get_pulse_id_date_mapping(pulse_ids): raise RuntimeError('Unable to retrieve mapping') +def on_message(channel, method_frame, header_frame, body): + print(method_frame.delivery_tag) + print(body) + channel.basic_ack(delivery_tag=method_frame.delivery_tag) + + +def connect_to_broker(broker_url, exchange_name, queue_name): + connection = BlockingConnection(ConnectionParameters(broker_url)) + channel = connection.channel() + + channel.queue_declare(queue=queue_name, ) + channel.queue_bind(queue=queue_name, + exchange=exchange_name, + routing_key=QUEUE_NAME) + + channel.basic_consume(queue_name, on_message) + + try: + channel.start_consuming() + except KeyboardInterrupt: + channel.stop_consuming() + + +BROKER_URL = "localhost" +EXCHANGE_NAME = "request" +QUEUE_NAME = "epics" + + def main(): import argparse - parser = argparse.ArgumentParser(description='Channel Access archiver dump to hdf5') - parser.add_argument('--channels', dest='channel_list', default="tests/channels.txt", help='channels to dump') - parser.add_argument('--url', dest='url', default=None, help='base url to retrieve data from') + parser = argparse.ArgumentParser( + description='Channel Access archiver dump to hdf5') + parser.add_argument('--broker_url', dest='broker_url', + default=BROKER_URL, + help='RabbitMQ broker URL') + parser.add_argument('--exchange_name', dest='exchange_name', + default=EXCHANGE_NAME, + help='Name of the request exchange.') + parser.add_argument('--queue_name', dest='queue_name', + default=QUEUE_NAME, + help='Name of the queue to connect to') args = parser.parse_args() + connect_to_broker(broker_url=args.broker_url, + exchange_name=args.exchange_name, + queue_name=args.queue_name) + if __name__ == '__main__': main()