From f883d9c298077be4b39041ad7fb864e0f5e32eaa Mon Sep 17 00:00:00 2001 From: Andrej Babic Date: Mon, 21 Sep 2020 11:01:23 +0200 Subject: [PATCH] Cleanup project from files moved to sf_broker --- CMakeLists.txt | 1 + core-broker/core-broker/__init__.py | 0 core-broker/core-broker/broker_client.py | 71 ------------ core-broker/core-broker/broker_debugger.py | 70 ------------ epics-writer/Readme.md | 34 ------ epics-writer/conda-recipe/build.sh | 3 - epics-writer/conda-recipe/meta.yaml | 20 ---- epics-writer/epics_writer/__init__.py | 0 epics-writer/epics_writer/start.py | 99 ----------------- epics-writer/epics_writer/writer.py | 120 --------------------- epics-writer/setup.py | 15 --- epics-writer/tests/__init__.py | 0 epics-writer/tests/channels.txt | 6 -- epics-writer/tests/test_download_data.py | 34 ------ 14 files changed, 1 insertion(+), 472 deletions(-) delete mode 100644 core-broker/core-broker/__init__.py delete mode 100644 core-broker/core-broker/broker_client.py delete mode 100644 core-broker/core-broker/broker_debugger.py delete mode 100644 epics-writer/Readme.md delete mode 100644 epics-writer/conda-recipe/build.sh delete mode 100644 epics-writer/conda-recipe/meta.yaml delete mode 100644 epics-writer/epics_writer/__init__.py delete mode 100644 epics-writer/epics_writer/start.py delete mode 100644 epics-writer/epics_writer/writer.py delete mode 100644 epics-writer/setup.py delete mode 100644 epics-writer/tests/__init__.py delete mode 100644 epics-writer/tests/channels.txt delete mode 100644 epics-writer/tests/test_download_data.py diff --git a/CMakeLists.txt b/CMakeLists.txt index d480f91..f67c4f0 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -30,6 +30,7 @@ add_subdirectory( add_subdirectory("core-buffer") add_subdirectory("sf-buffer") +add_subdirectory("sf-buffer-writer") add_subdirectory("sf-stream") add_subdirectory("sf-writer") #add_subdirectory("jf-live-writer") diff --git a/core-broker/core-broker/__init__.py b/core-broker/core-broker/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/core-broker/core-broker/broker_client.py b/core-broker/core-broker/broker_client.py deleted file mode 100644 index 2b676fe..0000000 --- a/core-broker/core-broker/broker_client.py +++ /dev/null @@ -1,71 +0,0 @@ -import json - -from pika import BlockingConnection, ConnectionParameters, BasicProperties - - -class BrokerClient(object): - - REQUEST_EXCHANGE = "request" - STATUS_EXCHANGE = "status" - DEFAULT_BROKER_URL = "127.0.0.1" - - def __init__(self, broker_url=DEFAULT_BROKER_URL): - self.connection = BlockingConnection(ConnectionParameters(broker_url)) - self.channel = self.connection.channel() - - self.channel.exchange_declare(exchange=self.REQUEST_EXCHANGE, - exchange_type="topic") - - self.channel.exchange_declare(exchange=self.STATUS_EXCHANGE, - exchange_type="fanout") - - def close(self): - self.connection.close() - - def request_write(self, - output_prefix, - metadata=None, - detectors=None, - bsread_channels=None, - epics_pvs=None): - - routing_key = "." - - if detectors: - for detector in detectors: - routing_key += detector + "." - - if bsread_channels: - routing_key += "bsread" + "." - - if epics_pvs: - routing_key += "epics" + "." - - body_bytes = json.dumps({ - "output_prefix": output_prefix, - "metadata": metadata, - "detectors": detectors, - "bsread_channels": bsread_channels, - "epics_pvs": epics_pvs - }).encode() - - self.channel.basic_publish(exchange=self.REQUEST_EXCHANGE, - routing_key=routing_key, - body=body_bytes) - - status_header = { - "action": "write_request", - "source": "BrokerClient", - "routing_key": routing_key - } - - self.channel.basic_publish(exchange=self.STATUS_EXCHANGE, - properties=BasicProperties( - headers=status_header), - routing_key="", - body=body_bytes) - - -broker = BrokerClient() -broker.request_write("/tmp/test", epics_pvs=["test"]) -broker.close() diff --git a/core-broker/core-broker/broker_debugger.py b/core-broker/core-broker/broker_debugger.py deleted file mode 100644 index 52b504b..0000000 --- a/core-broker/core-broker/broker_debugger.py +++ /dev/null @@ -1,70 +0,0 @@ -from datetime import datetime -import json - -from pika import BlockingConnection, ConnectionParameters - -DEFAULT_BROKER_URL = "127.0.0.1" -STATUS_EXCHANGE = "status" - -COLOR_END_MARKER = '\x1b[0m' - - -def get_color_for_action(action): - - color_mapping = { - "write_request": "\x1b[34;1m", - "write_start": "\x1b[1;33;1m", - "write_finished": "\x1b[1;32;1m" - } - - return color_mapping.get(action, "") - - -def on_status(channel, method_frame, header_frame, body): - header = header_frame.headers - request = json.loads(body.decode()) - - action = header["action"] - source = header["source"] - - action_output = get_color_for_action(action) + action + COLOR_END_MARKER - time_output = datetime.utcnow().strftime("%Y%m%d-%H:%M:%S.%f") - - print("[%s] %s %s" % (time_output, action_output, source)) - print(request) - - -def connect_to_broker(broker_url): - connection = BlockingConnection(ConnectionParameters(broker_url)) - channel = connection.channel() - - channel.exchange_declare(exchange=STATUS_EXCHANGE, - exchange_type="fanout") - queue = channel.queue_declare(queue="", exclusive=True).method.queue - channel.queue_bind(queue=queue, - exchange=STATUS_EXCHANGE) - - channel.basic_consume(queue, on_status) - - try: - channel.start_consuming() - except KeyboardInterrupt: - channel.stop_consuming() - - -def main(): - import argparse - parser = argparse.ArgumentParser( - description="Connect and listen to broker events.") - - parser.add_argument('--broker_url', dest='broker_url', - default=DEFAULT_BROKER_URL, - help='RabbitMQ broker URL') - - args = parser.parse_args() - - connect_to_broker(broker_url=args.broker_url) - - -if __name__ == '__main__': - main() diff --git a/epics-writer/Readme.md b/epics-writer/Readme.md deleted file mode 100644 index 5f6ab51..0000000 --- a/epics-writer/Readme.md +++ /dev/null @@ -1,34 +0,0 @@ -# Overview - -Simple server to dump Epics Channel Access data to an HDF5 file. -The server gets an http callback from the Broker whenever there was an acquisition. - - -__Note: THIS IS/WAS A FRIDAY AFTERNOON HACK TO MAKE THE SWISSFEL DAQ WORK__ - - -The format of the request is as follows: -``` -{ - 'range': { - 'startPulseId': 100, - 'endPulseId': 120 - }, - - 'parameters': { - 'general/created': 'test', - 'general/user': 'tester', - 'general/process': 'test_process', - 'general/instrument': 'mac', - 'output_file': '/bla/test.h5'} -} - -``` - -Right now this server needs to run on the same server than the - -# Testing - -```bash -curl -XPUT -d '{"range":{"startPulseId": 7281433214, "endPulseId": 7281489688}, "parameters":{"output_file":"test.h5"}}' http://localhost:10200/notify -``` diff --git a/epics-writer/conda-recipe/build.sh b/epics-writer/conda-recipe/build.sh deleted file mode 100644 index d7a34f9..0000000 --- a/epics-writer/conda-recipe/build.sh +++ /dev/null @@ -1,3 +0,0 @@ -#!/usr/bin/env bash - -$PYTHON setup.py install # Python command to install the script diff --git a/epics-writer/conda-recipe/meta.yaml b/epics-writer/conda-recipe/meta.yaml deleted file mode 100644 index 8b464c1..0000000 --- a/epics-writer/conda-recipe/meta.yaml +++ /dev/null @@ -1,20 +0,0 @@ -package: - name: epics-writer - version: 0.0.1 - -source: - path: .. - -build: - noarch: python - entry_points: - - epics-writer = epics_writer.start:main - -requirements: - build: - - python - run: - - python - - data_api >=0.7.6 - - requests - - pika diff --git a/epics-writer/epics_writer/__init__.py b/epics-writer/epics_writer/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/epics-writer/epics_writer/start.py b/epics-writer/epics_writer/start.py deleted file mode 100644 index 3647d6d..0000000 --- a/epics-writer/epics_writer/start.py +++ /dev/null @@ -1,99 +0,0 @@ -import json -from pika import BlockingConnection, ConnectionParameters, BasicProperties - -from epics_writer.writer import write_epics_pvs - - -DEFAULT_BROKER_URL = "127.0.0.1" -REQUEST_EXCHANGE = "request" -STATUS_EXCHANGE = "status" -QUEUE_NAME = "epics" -OUTPUT_FILE_SUFFIX = ".PVCHANNELS.h5" - - -def update_status(channel, body, action, file, message=None): - status_header = { - "action": action, - "source": "epics_writer", - "routing_key": QUEUE_NAME, - "file": file, - "message": message - } - - channel.basic_publish(exchange=STATUS_EXCHANGE, - properties=BasicProperties( - headers=status_header), - routing_key="", - body=body) - - -def on_message(channel, method_frame, header_frame, body): - - output_file = None - - try: - request = json.loads(body.decode()) - output_prefix = request["output_prefix"] - start_pulse_id = 1 - stop_pulse_id = 10 - metadata = request["metadata"] - epics_pvs = request["epics_pvs"] - - output_file = output_prefix + OUTPUT_FILE_SUFFIX - - update_status(channel, body, "write_start", output_file) - - write_epics_pvs(output_file=output_file, - start_pulse_id=start_pulse_id, - stop_pulse_id=stop_pulse_id, - metadata=metadata, - epics_pvs=epics_pvs) - - except Exception as e: - channel.basic_reject(delivery_tag=method_frame.delivery_tag, - requeue=False) - - update_status(channel, body, "write_rejected", output_file, str(e)) - - else: - channel.basic_ack(delivery_tag=method_frame.delivery_tag) - - update_status(channel, body, "write_finished", output_file) - - -def connect_to_broker(broker_url): - connection = BlockingConnection(ConnectionParameters(broker_url)) - channel = connection.channel() - - channel.exchange_declare(exchange=STATUS_EXCHANGE, - exchange_type="fanout") - channel.exchange_declare(exchange=REQUEST_EXCHANGE, - exchange_type="topic") - - channel.queue_declare(queue=QUEUE_NAME, auto_delete=True) - channel.queue_bind(queue=QUEUE_NAME, - exchange=REQUEST_EXCHANGE, - routing_key="*.%s.*" % QUEUE_NAME) - channel.basic_qos(prefetch_count=1) - channel.basic_consume(QUEUE_NAME, on_message) - - try: - channel.start_consuming() - except KeyboardInterrupt: - channel.stop_consuming() - - -def main(): - import argparse - parser = argparse.ArgumentParser(description='Epics HDF5 writer') - parser.add_argument('--broker_url', dest='broker_url', - default=DEFAULT_BROKER_URL, - help='RabbitMQ broker URL') - - args = parser.parse_args() - - connect_to_broker(broker_url=args.broker_url) - - -if __name__ == '__main__': - main() diff --git a/epics-writer/epics_writer/writer.py b/epics-writer/epics_writer/writer.py deleted file mode 100644 index fe9bc06..0000000 --- a/epics-writer/epics_writer/writer.py +++ /dev/null @@ -1,120 +0,0 @@ -import datetime -import time -import logging -import requests -import data_api - -logger = logging.getLogger(__name__) - -DATA_API_QUERY_URL = "https://data-api.psi.ch/sf/query" - - -def write_epics_pvs(output_file, start_pulse_id, stop_pulse_id, metadata, epics_pvs): - - start_date = get_pulse_id_date_mapping(start_pulse_id) - stop_date = get_pulse_id_date_mapping(stop_pulse_id) - - data = get_data(epics_pvs, start=start_date, stop=stop_date) - # TODO: Merge metadata to data. - - if data: - logger.info("Persist data to hdf5 file") - data_api.to_hdf5(data, output_file, overwrite=True, compression=None, shuffle=False) - else: - logger.error("No data retrieved") - open(output_file + "_NO_DATA", 'a').close() - - -def get_data(channel_list, start=None, stop=None, base_url=None): - logger.info("Requesting range %s to %s for channels: " % (start, stop, channel_list)) - - query = {"range": {"startDate": datetime.datetime.isoformat(start), - "endDate": datetime.datetime.isoformat(stop), - "startExpansion": True}, - "channels": channel_list, - "fields": ["pulseId", "globalSeconds", "globalDate", "value", - "eventCount"]} - logger.debug(query) - - response = requests.post(DATA_API_QUERY_URL, json=query) - - # Check for successful return of data - if response.status_code != 200: - logger.info("Data retrievali failed, sleep for another time and try") - - itry = 0 - while itry < 5: - itry += 1 - time.sleep(60) - response = requests.post(DATA_API_QUERY_URL, json=query) - if response.status_code == 200: - break - - logger.info("Data retrieval failed, post attempt %d" % itry) - - if response.status_code != 200: - raise RuntimeError("Unable to retrieve data from server: ", response) - - logger.info("Data retieval is successful") - - data = response.json() - - return data_api.client._build_pandas_data_frame(data, index_field="globalDate") - - -def get_pulse_id_date_mapping(pulse_id): - # See https://jira.psi.ch/browse/ATEST-897 for more details ... - logger.info("Retrieve pulse-id/date mapping for pulse_id %s" % pulse_id) - - try: - - query = {"range": {"startPulseId": 0, - "endPulseId": pulse_id}, - "limit": 1, - "ordering": "desc", - "channels": ["SIN-CVME-TIFGUN-EVR0:BUNCH-1-OK"], - "fields": ["pulseId", "globalDate"]} - - for c in range(10): - - response = requests.post("https://data-api.psi.ch/sf/query", json=query) - - # Check for successful return of data - if response.status_code != 200: - raise RuntimeError("Unable to retrieve data from server: ", response) - - data = response.json() - - if len(data[0]["data"]) == 0 or not "pulseId" in data[0]["data"][0]: - raise RuntimeError( - "Didn't get good responce from data_api : %s " % data) - - if not pulse_id == data[0]["data"][0]["pulseId"]: - logger.info("retrieval failed") - if c == 0: - ref_date = data[0]["data"][0]["globalDate"] - ref_date = dateutil.parser.parse(ref_date) - - now_date = datetime.datetime.now() - now_date = pytz.timezone('Europe/Zurich').localize( - now_date) - - check_date = ref_date + datetime.timedelta( - seconds=24) # 20 seconds should be enough - delta_date = check_date - now_date - - s = delta_date.seconds - logger.info("retry in " + str(s) + " seconds ") - if not s <= 0: - time.sleep(s) - continue - - raise RuntimeError('Unable to retrieve mapping') - - date = data[0]["data"][0]["globalDate"] - date = dateutil.parser.parse(date) - dates.append(date) - break - - except Exception as e: - raise RuntimeError('Unable to retrieve pulse_id date mapping') from e diff --git a/epics-writer/setup.py b/epics-writer/setup.py deleted file mode 100644 index d4358df..0000000 --- a/epics-writer/setup.py +++ /dev/null @@ -1,15 +0,0 @@ -from setuptools import setup - -setup( - name="cadump", - version="0.0.12", - author="Paul Scherrer Institute", - author_email="daq@psi.ch", - description="Interface to dump data from archiver/databuffer", - packages=["cadump"], - entry_points={ - 'console_scripts': [ - 'cadump_server = cadump.cadump:main', - ], - } -) diff --git a/epics-writer/tests/__init__.py b/epics-writer/tests/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/epics-writer/tests/channels.txt b/epics-writer/tests/channels.txt deleted file mode 100644 index 63868d2..0000000 --- a/epics-writer/tests/channels.txt +++ /dev/null @@ -1,6 +0,0 @@ -S10-CPCL-VM1MGC:LOAD - -ONE - TWO -# Comment that need to be removed -THREE # one more comment at the end that need to be removed diff --git a/epics-writer/tests/test_download_data.py b/epics-writer/tests/test_download_data.py deleted file mode 100644 index 87a7194..0000000 --- a/epics-writer/tests/test_download_data.py +++ /dev/null @@ -1,34 +0,0 @@ -import unittest -from unittest import TestCase -from cadump import cadump -import logging - -class TestDownloadData(TestCase): - - def test_download_data(self): - config = { - 'range': { - 'startPulseId': 9618913001, - 'endPulseId': 9618923000 - }, - - 'parameters': { - 'general/created': 'test', - 'general/user': 'tester', - 'general/process': 'test_process', - 'general/instrument': 'mac', - 'output_file': 'test.h5'} # this is usually the full path - } - - cadump.base_url = "https://data-api.psi.ch/sf" - cadump.download_data(config) - # self.fail() - - def test_read_channels(self): - channels = cadump.read_channels("channels.txt") - logging.info(channels) - self.assertEqual(len(channels), 4) - - -if __name__ == '__main__': - unittest.main()