diff --git a/cadump/.gitignore b/cadump/.gitignore new file mode 100644 index 0000000..3ed1302 --- /dev/null +++ b/cadump/.gitignore @@ -0,0 +1,5 @@ +build +__pycache__ +.idea +.pytest_cache +*.h5 diff --git a/cadump/Readme.md b/cadump/Readme.md new file mode 100644 index 0000000..5f6ab51 --- /dev/null +++ b/cadump/Readme.md @@ -0,0 +1,34 @@ +# Overview + +Simple server to dump Epics Channel Access data to an HDF5 file. +The server gets an http callback from the Broker whenever there was an acquisition. + + +__Note: THIS IS/WAS A FRIDAY AFTERNOON HACK TO MAKE THE SWISSFEL DAQ WORK__ + + +The format of the request is as follows: +``` +{ + 'range': { + 'startPulseId': 100, + 'endPulseId': 120 + }, + + 'parameters': { + 'general/created': 'test', + 'general/user': 'tester', + 'general/process': 'test_process', + 'general/instrument': 'mac', + 'output_file': '/bla/test.h5'} +} + +``` + +Right now this server needs to run on the same server than the + +# Testing + +```bash +curl -XPUT -d '{"range":{"startPulseId": 7281433214, "endPulseId": 7281489688}, "parameters":{"output_file":"test.h5"}}' http://localhost:10200/notify +``` diff --git a/cadump/api.http b/cadump/api.http new file mode 100644 index 0000000..0fd0d2e --- /dev/null +++ b/cadump/api.http @@ -0,0 +1,23 @@ +# For a quick start check out our HTTP Requests collection (Tools|HTTP Client|Open HTTP Requests Collection) or +# paste cURL into the file and request will be converted to HTTP Request format. +# +# Following HTTP Request Live Templates are available: +# * 'gtrp' and 'gtr' create a GET request with or without query parameters; +# * 'ptr' and 'ptrp' create a POST request with a simple or parameter-like body; +# * 'mptr' and 'fptr' create a POST request to submit a form with a text or file field (multipart/form-data); + +# curl -XPUT -d '{"range":{"startPulseId": 11281433214, "endPulseId": 7281489688}, "parameters":{"output_file":"test.h5"}}' http://localhost:10200/notify +PUT http://localhost:10200/notify +Content-Type: application/json + +{"range":{"startPulseId": 11876043548, "endPulseId": 11876044548}, "parameters":{"output_file":"test.h5"}} + +### + +PUT http://localhost:10200/notify +Content-Type: application/json + +{"range":{"startPulseId": 11876043548, "endPulseId": 11876044548}, "parameters":{"output_file":"test.h5"}, "channels": ["ABC"]} + +### + diff --git a/cadump/ca_default_channel_list b/cadump/ca_default_channel_list new file mode 100644 index 0000000..cd23afe --- /dev/null +++ b/cadump/ca_default_channel_list @@ -0,0 +1,179 @@ +SARES11-VPIG124-360:PRESSURE +SARES11-VMFR124-350:PRESSURE + +SARES11-VMFR125-600:PRESSURE +SARES11-VMCP125-610:PRESSURE +SARES11-VMFR125-400:PRESSURE +SARES11-VMCP125-410:PRESSURE +SARES11-VMCP125-510:PRESSURE +SARES11-EVSP-SAMPLE:SET_PRES +SARES11-EVSP-010:SAMPLE +SARES11-EVSP-010:INTERMEDIATE +SARES11-EVSP-010:DIFFERENT + + +SARES11-EVGG-A010:PRESSURE +SARES11-EVGG-B010:PRESSURE +SARES11-VMDI125-400:PRESSURE +SARES11-VMDI125-500:PRESSURE +SARES11-EVGG-C010:PRESSURE +SARES11-EVGV-C010:PLC_OPEN +SARES11-EVGV-D010:PLC_OPEN +SARES11-EVGV-E010:PLC_OPEN +SARES11-EVGV-E020:PLC_OPEN +SARES11-EVGV-F010:PLC_OPEN + +SLAAR11-LMOT-M421:MOT.RBV +SLAAR11-LMOT-M422:MOT.RBV +SLAAR11-LMOT-M423:MOT.RBV + +SLAAR11-LMOT-M431:MOT.RBV +SLAAR11-LMOT-M432:MOT.RBV +SLAAR11-LMOT-M441:MOT.RBV +SLAAR11-LMOT-M442:MOT.RBV +SLAAR11-LMOT-M444:MOT.RBV +SLAAR11-LENG-R452:VAL_GET +SLAAR11-LMOT-M451:MOTOR_1.RBV + +SLAAR11-LDIO-LAS6891:SET_BO02 +SLAAR11-LPSYS-ESA:TRANS_OPEN + +SAROP11-PALMK118:RAVE +SAROP11-PALMK118:LIVE +SLAAR11-L-LAMCALC2:MOTOR_POS_SMAR_SC +SLAAR11-LENG-R452:VAL_GET +SARES11-XFOC125:LIN1:MOTRBV +SARES11-XICM125:ROX1:MOTRBV +SARES11-XICM125:TRX1:MOTRBV +SARES11-XICM125:ROY1:MOTRBV + +SARES11-XMI125-C4P1:EXPOSURE +SARES11-XMI125:FOCUS.RBV +SARES11-XMI125:ZOOM.RBV +SARES11-XILL125:LIGHTLEVEL_SP +SARES11-XMI125:ROY1:MOTRBV +SARES11-XMI125:ROZ1:MOTRBV + +SARES11-XOTA125:W_X.RBV +SARES11-XOTA125:W_Y.RBV +SARES11-XOTA125:W_Z.RBV +SARES11-XOTA125:W_RX.RBV +SARES11-XOTA125:W_RY.RBV +SARES11-XOTA125:W_RZ.RBV +SARES11-XOTA125:RAIL.RBV +SARES11-XOTA125:MOTOR_Y1.RBV +SARES11-XOTA125:MOTOR_Y2.RBV +SARES11-XOTA125:MOTOR_Y3.RBV +SARES11-XOTA125:MOTOR_X1.RBV +SARES11-XOTA125:MOTOR_Z1.RBV +SARES11-XOTA125:MOTOR_Z2.RBV +SARES11-XOTA125:MODE_SP + +SARES11-XIZO125:TRX1:MOTRBV +SARES11-XIZO125:TRX2:MOTRBV +SARES11-XIZO125:TRY1:MOTRBV +SARES11-XIZO125:TRY2:MOTRBV + +SARES10-XHPLC125:01:FLOW +SARES10-XHPLC125:01:PRESSURE + +SARES10-XHPLC125:02:FLOW +SARES10-XHPLC125:02:PRESSURE + +SARES11-XSAM125:MOTOR_X1.RBV +SARES11-XSAM125:MOTOR_Y1.RBV +SARES11-XSAM125:MOTOR_Z1.RBV + +SARES11-XCRY125:CRY_1.RBV +SARES11-XCRY125:CRY_2.RBV +SARES11-XCRY125:TRZ11:MOTRBV +SARES11-XCRY125:ROY11:MOTRBV +SARES11-XCRY125:ROX11:MOTRBV +SARES11-XCRY125:TRZ21:MOTRBV +SARES11-XCRY125:ROY21:MOTRBV +SARES11-XCRY125:ROX21:MOTRBV +SARES11-XCRY125:TRZ12:MOTRBV +SARES11-XCRY125:ROY12:MOTRBV +SARES11-XCRY125:ROX12:MOTRBV +SARES11-XCRY125:TRZ22:MOTRBV +SARES11-XCRY125:ROY22:MOTRBV +SARES11-XCRY125:ROX22:MOTRBV + +SARBD01-DBPM040:Q1 +SAROP11-ARAMIS:ENERGY +SAROP11-ARAMIS:MODE +SARFE10-PBPG050:PHOTON-ENERGY-PER-PULSE-AVG +SGE-OP2E-ARAMIS:ESA_PH2E_Y1 +SGE-OP2E-ARAMIS:ESA_PH2E_Y2 +SGE-OP2E-ARAMIS:ESA_PH2E_X1 +SGE-OP2E-ARAMIS:ESA_PH2E_X2 +SARCL02-DCOL290:ENE +SARBD01-DBPM040:Q1 +SARBD01-MBND100:P-READ +SARUN03-UIND030:K_SET +SARUN04-UIND030:K_SET +SARUN05-UIND030:K_SET +SARUN06-UIND030:K_SET +SARUN07-UIND030:K_SET +SARUN08-UIND030:K_SET +SARUN09-UIND030:K_SET +SARUN10-UIND030:K_SET +SARUN11-UIND030:K_SET +SARUN12-UIND030:K_SET +SARUN13-UIND030:K_SET +SARUN14-UIND030:K_SET + +SIN-TIMAST-TMA:Beam-Appl-Freq-RB +SIN-TIMAST-TMA:Evt-20-Delay-RB +SIN-TIMAST-TMA:Evt-20-Freq-I +SIN-TIMAST-TMA:Evt-50-Delay-RB +SIN-TIMAST-TMA:Evt-50-Freq-I +SIN-TIMAST-TMA:Evt-29-Delay-RB +SIN-TIMAST-TMA:Evt-29-Freq-I +SIN-TIMAST-TMA:Evt-24-Delay-RB +SIN-TIMAST-TMA:Evt-24-Freq-I + +SARFE10-OATT053:TRANS_SP +SARFE10-OATT053:TRANS_RB +SARFE10-OATT053:TRANS3EDHARM_RB +SARFE10-OATT053:ENERGY +SAROP11-OATT120:TRANS_SP +SAROP11-OATT120:TRANS_RB +SAROP11-OATT120:TRANS3EDHARM_RB +SAROP11-OATT120:ENERGY + +SAROP11-OPPI110:MOTOR_X1.RBV +SAROP11-OPPI110:MOTOR_Y1.RBV +SAROP11-OPPI110:PRESET_SP +SGE-CPCW-71-EVR0:FrontUnivOut15-Ena-SP +SGE-CPCW-71-EVR0:FrontUnivOut15-Src-SP + +SARFE10-OAPU044:MOTOR_X +SARFE10-OAPU044:MOTOR_Y +SARFE10-OAPU044:MOTOR_W +SARFE10-OAPU044:MOTOR_H +SARFE10-OAPU044:MOTOR_X.RBV +SARFE10-OAPU044:MOTOR_Y.RBV +SARFE10-OAPU044:MOTOR_W.RBV +SARFE10-OAPU044:MOTOR_H.RBV +SARFE10-OAPU044:MOTOR_AX1.RBV +SARFE10-OAPU044:MOTOR_AX2.RBV +SARFE10-OAPU044:MOTOR_AY1.RBV +SARFE10-OAPU044:MOTOR_AY2.RBV +SARFE10-OAPU044:MOTOR_BX1.RBV +SARFE10-OAPU044:MOTOR_BX2.RBV +SARFE10-OAPU044:MOTOR_BY1.RBV +SARFE10-OAPU044:MOTOR_BY2.RBV + +SAROP11-OAPU104:MOTOR_X +SAROP11-OAPU104:MOTOR_Y +SAROP11-OAPU104:MOTOR_W +SAROP11-OAPU104:MOTOR_H + +SAROP11-OAPU120:MOTOR_X +SAROP11-OAPU120:MOTOR_Y +SAROP11-OAPU120:MOTOR_W +SAROP11-OAPU120:MOTOR_H + +SLAAR01-TLSY-EPL:JITTERMON + diff --git a/cadump/cadump/__init__.py b/cadump/cadump/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/cadump/cadump/cadump.py b/cadump/cadump/cadump.py new file mode 100644 index 0000000..612aa39 --- /dev/null +++ b/cadump/cadump/cadump.py @@ -0,0 +1,217 @@ +from bottle import route, run, request, abort +import json + +import data_api +import data_api.client +import requests +import dateutil.parser +import pytz +import datetime +import time +import re + +import logging +logger = logging.getLogger("logger") + +# This is how the notification look like +# { +# 'range': { +# 'startPulseId': 100, +# 'endPulseId': 120 +# }, +# +# 'parameters': { +# 'general/created': 'test', +# 'general/user': 'tester', +# 'general/process': 'test_process', +# 'general/instrument': 'mac', +# 'output_file': '/bla/test.h5'} # this is usually the full path +# } + +channel_list = ["S10-CPCL-VM1MGC:LOAD"] # specified channel is only for test purposes +base_url = "" + + +@route('/notify', method='PUT') +def put_document(): + data = request.body.read() + if not data: + abort(400, 'No data received') + + try: + download_data(json.loads(data)) + except Exception as e: + logger.exception("Download data failed") + + +def download_data(config): + + logger.info("Dump data to hdf5 ...") + + start_pulse = config["range"]["startPulseId"] + end_pulse = config["range"]["endPulseId"] + + # Overwrite the channels list if specified in the request + channels = channel_list + if "channels" in config: + channels = config["channels"] + + logger.info("Retrieve data for channels: %s" % channels) + + if "retrieval_url" in config: + new_base_url = config["retrieval_url"] + else: + new_base_url = base_url + + logger.info("Retrieve pulse-id / data mapping for pulse ids") + start_date, end_date = get_pulse_id_date_mapping([start_pulse, end_pulse]) + + filename = config["parameters"]["output_file"] + if filename != "/dev/null": + new_filename = filename + else: + new_filename = None + + logger.info("Retrieving data for interval start: " + str(start_date) + " end: " + str(end_date) + " . From " + new_base_url) + data = get_data(channels, start=start_date, end=end_date, base_url=new_base_url) + + if len(data) < 1: + logger.error("No data retrieved") + open(new_filename+"_NO_DATA", 'a').close() + + else: + if new_filename: + logger.info("Persist data to hdf5 file") + data_api.to_hdf5(data, new_filename, overwrite=True, compression=None, shuffle=False) + + +def read_channels(filename): + with open(filename) as f: + lines = f.readlines() + + channels = [] + for line in lines: + line = re.sub(r'\w*#.*', "", line) + line = line.strip() + if line: # if not empty line + channels.append(line) # remove all leading and trailing spaces + + return channels + + +def get_data(channel_list, start=None, end=None, base_url=None): + query = {"range": {"startDate": datetime.datetime.isoformat(start), + "endDate": datetime.datetime.isoformat(end), + "startExpansion": True}, + "channels": channel_list, + "fields": ["pulseId", "globalSeconds", "globalDate", "value", "eventCount"]} + + logger.info(query) + + response = requests.post(base_url + '/query', json=query) + + # Check for successful return of data + if response.status_code != 200: + logger.info("Data retrievali failed, sleep for another time and try") + + itry = 0 + while itry < 5: + itry += 1 + time.sleep(60) + response = requests.post(base_url + '/query', json=query) + if response.status_code == 200: + break + logger.info("Data retrieval failed, post attempt %d" % itry) + + if response.status_code != 200: + raise RuntimeError("Unable to retrieve data from server: ", response) + + logger.info("Data retieval is successful") + + data = response.json() + + return data_api.client._build_pandas_data_frame(data, index_field="globalDate") + + +def get_pulse_id_date_mapping(pulse_ids): + + # See https://jira.psi.ch/browse/ATEST-897 for more details ... + + try: + dates = [] + for pulse_id in pulse_ids: + + query = {"range": {"startPulseId": 0, + "endPulseId": pulse_id}, + "limit": 1, + "ordering": "desc", + "channels": ["SIN-CVME-TIFGUN-EVR0:BUNCH-1-OK"], + "fields": ["pulseId", "globalDate"]} + + for c in range(10): + + logger.info("Retrieve mapping for pulse_id %d" % pulse_id) + # Query server + response = requests.post("https://data-api.psi.ch/sf/query", json=query) + + # Check for successful return of data + if response.status_code != 200: + raise RuntimeError("Unable to retrieve data from server: ", response) + + data = response.json() + + if len(data[0]["data"]) == 0 or not "pulseId" in data[0]["data"][0]: + raise RuntimeError("Didn't get good responce from data_api : %s " % data) + + if not pulse_id == data[0]["data"][0]["pulseId"]: + logger.info("retrieval failed") + if c == 0: + ref_date = data[0]["data"][0]["globalDate"] + ref_date = dateutil.parser.parse(ref_date) + + now_date = datetime.datetime.now() + now_date = pytz.timezone('Europe/Zurich').localize(now_date) + + check_date = ref_date+datetime.timedelta(seconds=24) # 20 seconds should be enough + delta_date = check_date - now_date + + s = delta_date.seconds + logger.info("retry in " + str(s) + " seconds ") + if not s <= 0: + time.sleep(s) + continue + + raise RuntimeError('Unable to retrieve mapping') + + date = data[0]["data"][0]["globalDate"] + date = dateutil.parser.parse(date) + dates.append(date) + break + + return dates + except Exception: + raise RuntimeError('Unable to retrieve mapping') + + +def main(): + import argparse + parser = argparse.ArgumentParser(description='Channel Access archiver dump to hdf5') + parser.add_argument('--channels', dest='channel_list', default="tests/channels.txt", help='channels to dump') + parser.add_argument('--url', dest='url', default=None, help='base url to retrieve data from') + + args = parser.parse_args() + print(args.channel_list) + + global channel_list + channel_list = read_channels(args.channel_list) + logger.info("Using channel list: " + " ".join(channel_list)) + + global base_url + base_url = args.url + logger.info("Using base url: " + str(base_url)) + + run(host='localhost', port=10200) + + +if __name__ == '__main__': + main() diff --git a/cadump/conda-recipe/build.sh b/cadump/conda-recipe/build.sh new file mode 100644 index 0000000..d7a34f9 --- /dev/null +++ b/cadump/conda-recipe/build.sh @@ -0,0 +1,3 @@ +#!/usr/bin/env bash + +$PYTHON setup.py install # Python command to install the script diff --git a/cadump/conda-recipe/meta.yaml b/cadump/conda-recipe/meta.yaml new file mode 100644 index 0000000..e46d052 --- /dev/null +++ b/cadump/conda-recipe/meta.yaml @@ -0,0 +1,19 @@ +package: + name: cadump + version: 0.0.12 + +source: + path: .. + +build: + noarch: python + entry_points: + - cadump_server = cadump.cadump:main + +requirements: + build: + - python + - data_api >=0.7.6 + run: + - python + - data_api >=0.7.6 diff --git a/cadump/setup.py b/cadump/setup.py new file mode 100644 index 0000000..d4358df --- /dev/null +++ b/cadump/setup.py @@ -0,0 +1,15 @@ +from setuptools import setup + +setup( + name="cadump", + version="0.0.12", + author="Paul Scherrer Institute", + author_email="daq@psi.ch", + description="Interface to dump data from archiver/databuffer", + packages=["cadump"], + entry_points={ + 'console_scripts': [ + 'cadump_server = cadump.cadump:main', + ], + } +) diff --git a/cadump/tests/__init__.py b/cadump/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/cadump/tests/channels.txt b/cadump/tests/channels.txt new file mode 100644 index 0000000..63868d2 --- /dev/null +++ b/cadump/tests/channels.txt @@ -0,0 +1,6 @@ +S10-CPCL-VM1MGC:LOAD + +ONE + TWO +# Comment that need to be removed +THREE # one more comment at the end that need to be removed diff --git a/cadump/tests/test_download_data.py b/cadump/tests/test_download_data.py new file mode 100644 index 0000000..87a7194 --- /dev/null +++ b/cadump/tests/test_download_data.py @@ -0,0 +1,34 @@ +import unittest +from unittest import TestCase +from cadump import cadump +import logging + +class TestDownloadData(TestCase): + + def test_download_data(self): + config = { + 'range': { + 'startPulseId': 9618913001, + 'endPulseId': 9618923000 + }, + + 'parameters': { + 'general/created': 'test', + 'general/user': 'tester', + 'general/process': 'test_process', + 'general/instrument': 'mac', + 'output_file': 'test.h5'} # this is usually the full path + } + + cadump.base_url = "https://data-api.psi.ch/sf" + cadump.download_data(config) + # self.fail() + + def test_read_channels(self): + channels = cadump.read_channels("channels.txt") + logging.info(channels) + self.assertEqual(len(channels), 4) + + +if __name__ == '__main__': + unittest.main()