Add 'cadump/' from commit '89228a57f2936f8cf2e6fb9c3a37c337c5b4d011'

git-subtree-dir: cadump
git-subtree-mainline: c99ec49917
git-subtree-split: 89228a57f2
This commit is contained in:
2020-08-11 10:59:17 +02:00
12 changed files with 535 additions and 0 deletions
+5
View File
@@ -0,0 +1,5 @@
build
__pycache__
.idea
.pytest_cache
*.h5
+34
View File
@@ -0,0 +1,34 @@
# Overview
Simple server to dump Epics Channel Access data to an HDF5 file.
The server gets an http callback from the Broker whenever there was an acquisition.
__Note: THIS IS/WAS A FRIDAY AFTERNOON HACK TO MAKE THE SWISSFEL DAQ WORK__
The format of the request is as follows:
```
{
'range': {
'startPulseId': 100,
'endPulseId': 120
},
'parameters': {
'general/created': 'test',
'general/user': 'tester',
'general/process': 'test_process',
'general/instrument': 'mac',
'output_file': '/bla/test.h5'}
}
```
Right now this server needs to run on the same server than the
# Testing
```bash
curl -XPUT -d '{"range":{"startPulseId": 7281433214, "endPulseId": 7281489688}, "parameters":{"output_file":"test.h5"}}' http://localhost:10200/notify
```
+23
View File
@@ -0,0 +1,23 @@
# For a quick start check out our HTTP Requests collection (Tools|HTTP Client|Open HTTP Requests Collection) or
# paste cURL into the file and request will be converted to HTTP Request format.
#
# Following HTTP Request Live Templates are available:
# * 'gtrp' and 'gtr' create a GET request with or without query parameters;
# * 'ptr' and 'ptrp' create a POST request with a simple or parameter-like body;
# * 'mptr' and 'fptr' create a POST request to submit a form with a text or file field (multipart/form-data);
# curl -XPUT -d '{"range":{"startPulseId": 11281433214, "endPulseId": 7281489688}, "parameters":{"output_file":"test.h5"}}' http://localhost:10200/notify
PUT http://localhost:10200/notify
Content-Type: application/json
{"range":{"startPulseId": 11876043548, "endPulseId": 11876044548}, "parameters":{"output_file":"test.h5"}}
###
PUT http://localhost:10200/notify
Content-Type: application/json
{"range":{"startPulseId": 11876043548, "endPulseId": 11876044548}, "parameters":{"output_file":"test.h5"}, "channels": ["ABC"]}
###
+179
View File
@@ -0,0 +1,179 @@
SARES11-VPIG124-360:PRESSURE
SARES11-VMFR124-350:PRESSURE
SARES11-VMFR125-600:PRESSURE
SARES11-VMCP125-610:PRESSURE
SARES11-VMFR125-400:PRESSURE
SARES11-VMCP125-410:PRESSURE
SARES11-VMCP125-510:PRESSURE
SARES11-EVSP-SAMPLE:SET_PRES
SARES11-EVSP-010:SAMPLE
SARES11-EVSP-010:INTERMEDIATE
SARES11-EVSP-010:DIFFERENT
SARES11-EVGG-A010:PRESSURE
SARES11-EVGG-B010:PRESSURE
SARES11-VMDI125-400:PRESSURE
SARES11-VMDI125-500:PRESSURE
SARES11-EVGG-C010:PRESSURE
SARES11-EVGV-C010:PLC_OPEN
SARES11-EVGV-D010:PLC_OPEN
SARES11-EVGV-E010:PLC_OPEN
SARES11-EVGV-E020:PLC_OPEN
SARES11-EVGV-F010:PLC_OPEN
SLAAR11-LMOT-M421:MOT.RBV
SLAAR11-LMOT-M422:MOT.RBV
SLAAR11-LMOT-M423:MOT.RBV
SLAAR11-LMOT-M431:MOT.RBV
SLAAR11-LMOT-M432:MOT.RBV
SLAAR11-LMOT-M441:MOT.RBV
SLAAR11-LMOT-M442:MOT.RBV
SLAAR11-LMOT-M444:MOT.RBV
SLAAR11-LENG-R452:VAL_GET
SLAAR11-LMOT-M451:MOTOR_1.RBV
SLAAR11-LDIO-LAS6891:SET_BO02
SLAAR11-LPSYS-ESA:TRANS_OPEN
SAROP11-PALMK118:RAVE
SAROP11-PALMK118:LIVE
SLAAR11-L-LAMCALC2:MOTOR_POS_SMAR_SC
SLAAR11-LENG-R452:VAL_GET
SARES11-XFOC125:LIN1:MOTRBV
SARES11-XICM125:ROX1:MOTRBV
SARES11-XICM125:TRX1:MOTRBV
SARES11-XICM125:ROY1:MOTRBV
SARES11-XMI125-C4P1:EXPOSURE
SARES11-XMI125:FOCUS.RBV
SARES11-XMI125:ZOOM.RBV
SARES11-XILL125:LIGHTLEVEL_SP
SARES11-XMI125:ROY1:MOTRBV
SARES11-XMI125:ROZ1:MOTRBV
SARES11-XOTA125:W_X.RBV
SARES11-XOTA125:W_Y.RBV
SARES11-XOTA125:W_Z.RBV
SARES11-XOTA125:W_RX.RBV
SARES11-XOTA125:W_RY.RBV
SARES11-XOTA125:W_RZ.RBV
SARES11-XOTA125:RAIL.RBV
SARES11-XOTA125:MOTOR_Y1.RBV
SARES11-XOTA125:MOTOR_Y2.RBV
SARES11-XOTA125:MOTOR_Y3.RBV
SARES11-XOTA125:MOTOR_X1.RBV
SARES11-XOTA125:MOTOR_Z1.RBV
SARES11-XOTA125:MOTOR_Z2.RBV
SARES11-XOTA125:MODE_SP
SARES11-XIZO125:TRX1:MOTRBV
SARES11-XIZO125:TRX2:MOTRBV
SARES11-XIZO125:TRY1:MOTRBV
SARES11-XIZO125:TRY2:MOTRBV
SARES10-XHPLC125:01:FLOW
SARES10-XHPLC125:01:PRESSURE
SARES10-XHPLC125:02:FLOW
SARES10-XHPLC125:02:PRESSURE
SARES11-XSAM125:MOTOR_X1.RBV
SARES11-XSAM125:MOTOR_Y1.RBV
SARES11-XSAM125:MOTOR_Z1.RBV
SARES11-XCRY125:CRY_1.RBV
SARES11-XCRY125:CRY_2.RBV
SARES11-XCRY125:TRZ11:MOTRBV
SARES11-XCRY125:ROY11:MOTRBV
SARES11-XCRY125:ROX11:MOTRBV
SARES11-XCRY125:TRZ21:MOTRBV
SARES11-XCRY125:ROY21:MOTRBV
SARES11-XCRY125:ROX21:MOTRBV
SARES11-XCRY125:TRZ12:MOTRBV
SARES11-XCRY125:ROY12:MOTRBV
SARES11-XCRY125:ROX12:MOTRBV
SARES11-XCRY125:TRZ22:MOTRBV
SARES11-XCRY125:ROY22:MOTRBV
SARES11-XCRY125:ROX22:MOTRBV
SARBD01-DBPM040:Q1
SAROP11-ARAMIS:ENERGY
SAROP11-ARAMIS:MODE
SARFE10-PBPG050:PHOTON-ENERGY-PER-PULSE-AVG
SGE-OP2E-ARAMIS:ESA_PH2E_Y1
SGE-OP2E-ARAMIS:ESA_PH2E_Y2
SGE-OP2E-ARAMIS:ESA_PH2E_X1
SGE-OP2E-ARAMIS:ESA_PH2E_X2
SARCL02-DCOL290:ENE
SARBD01-DBPM040:Q1
SARBD01-MBND100:P-READ
SARUN03-UIND030:K_SET
SARUN04-UIND030:K_SET
SARUN05-UIND030:K_SET
SARUN06-UIND030:K_SET
SARUN07-UIND030:K_SET
SARUN08-UIND030:K_SET
SARUN09-UIND030:K_SET
SARUN10-UIND030:K_SET
SARUN11-UIND030:K_SET
SARUN12-UIND030:K_SET
SARUN13-UIND030:K_SET
SARUN14-UIND030:K_SET
SIN-TIMAST-TMA:Beam-Appl-Freq-RB
SIN-TIMAST-TMA:Evt-20-Delay-RB
SIN-TIMAST-TMA:Evt-20-Freq-I
SIN-TIMAST-TMA:Evt-50-Delay-RB
SIN-TIMAST-TMA:Evt-50-Freq-I
SIN-TIMAST-TMA:Evt-29-Delay-RB
SIN-TIMAST-TMA:Evt-29-Freq-I
SIN-TIMAST-TMA:Evt-24-Delay-RB
SIN-TIMAST-TMA:Evt-24-Freq-I
SARFE10-OATT053:TRANS_SP
SARFE10-OATT053:TRANS_RB
SARFE10-OATT053:TRANS3EDHARM_RB
SARFE10-OATT053:ENERGY
SAROP11-OATT120:TRANS_SP
SAROP11-OATT120:TRANS_RB
SAROP11-OATT120:TRANS3EDHARM_RB
SAROP11-OATT120:ENERGY
SAROP11-OPPI110:MOTOR_X1.RBV
SAROP11-OPPI110:MOTOR_Y1.RBV
SAROP11-OPPI110:PRESET_SP
SGE-CPCW-71-EVR0:FrontUnivOut15-Ena-SP
SGE-CPCW-71-EVR0:FrontUnivOut15-Src-SP
SARFE10-OAPU044:MOTOR_X
SARFE10-OAPU044:MOTOR_Y
SARFE10-OAPU044:MOTOR_W
SARFE10-OAPU044:MOTOR_H
SARFE10-OAPU044:MOTOR_X.RBV
SARFE10-OAPU044:MOTOR_Y.RBV
SARFE10-OAPU044:MOTOR_W.RBV
SARFE10-OAPU044:MOTOR_H.RBV
SARFE10-OAPU044:MOTOR_AX1.RBV
SARFE10-OAPU044:MOTOR_AX2.RBV
SARFE10-OAPU044:MOTOR_AY1.RBV
SARFE10-OAPU044:MOTOR_AY2.RBV
SARFE10-OAPU044:MOTOR_BX1.RBV
SARFE10-OAPU044:MOTOR_BX2.RBV
SARFE10-OAPU044:MOTOR_BY1.RBV
SARFE10-OAPU044:MOTOR_BY2.RBV
SAROP11-OAPU104:MOTOR_X
SAROP11-OAPU104:MOTOR_Y
SAROP11-OAPU104:MOTOR_W
SAROP11-OAPU104:MOTOR_H
SAROP11-OAPU120:MOTOR_X
SAROP11-OAPU120:MOTOR_Y
SAROP11-OAPU120:MOTOR_W
SAROP11-OAPU120:MOTOR_H
SLAAR01-TLSY-EPL:JITTERMON
View File
+217
View File
@@ -0,0 +1,217 @@
from bottle import route, run, request, abort
import json
import data_api
import data_api.client
import requests
import dateutil.parser
import pytz
import datetime
import time
import re
import logging
logger = logging.getLogger("logger")
# This is how the notification look like
# {
# 'range': {
# 'startPulseId': 100,
# 'endPulseId': 120
# },
#
# 'parameters': {
# 'general/created': 'test',
# 'general/user': 'tester',
# 'general/process': 'test_process',
# 'general/instrument': 'mac',
# 'output_file': '/bla/test.h5'} # this is usually the full path
# }
channel_list = ["S10-CPCL-VM1MGC:LOAD"] # specified channel is only for test purposes
base_url = ""
@route('/notify', method='PUT')
def put_document():
data = request.body.read()
if not data:
abort(400, 'No data received')
try:
download_data(json.loads(data))
except Exception as e:
logger.exception("Download data failed")
def download_data(config):
logger.info("Dump data to hdf5 ...")
start_pulse = config["range"]["startPulseId"]
end_pulse = config["range"]["endPulseId"]
# Overwrite the channels list if specified in the request
channels = channel_list
if "channels" in config:
channels = config["channels"]
logger.info("Retrieve data for channels: %s" % channels)
if "retrieval_url" in config:
new_base_url = config["retrieval_url"]
else:
new_base_url = base_url
logger.info("Retrieve pulse-id / data mapping for pulse ids")
start_date, end_date = get_pulse_id_date_mapping([start_pulse, end_pulse])
filename = config["parameters"]["output_file"]
if filename != "/dev/null":
new_filename = filename
else:
new_filename = None
logger.info("Retrieving data for interval start: " + str(start_date) + " end: " + str(end_date) + " . From " + new_base_url)
data = get_data(channels, start=start_date, end=end_date, base_url=new_base_url)
if len(data) < 1:
logger.error("No data retrieved")
open(new_filename+"_NO_DATA", 'a').close()
else:
if new_filename:
logger.info("Persist data to hdf5 file")
data_api.to_hdf5(data, new_filename, overwrite=True, compression=None, shuffle=False)
def read_channels(filename):
with open(filename) as f:
lines = f.readlines()
channels = []
for line in lines:
line = re.sub(r'\w*#.*', "", line)
line = line.strip()
if line: # if not empty line
channels.append(line) # remove all leading and trailing spaces
return channels
def get_data(channel_list, start=None, end=None, base_url=None):
query = {"range": {"startDate": datetime.datetime.isoformat(start),
"endDate": datetime.datetime.isoformat(end),
"startExpansion": True},
"channels": channel_list,
"fields": ["pulseId", "globalSeconds", "globalDate", "value", "eventCount"]}
logger.info(query)
response = requests.post(base_url + '/query', json=query)
# Check for successful return of data
if response.status_code != 200:
logger.info("Data retrievali failed, sleep for another time and try")
itry = 0
while itry < 5:
itry += 1
time.sleep(60)
response = requests.post(base_url + '/query', json=query)
if response.status_code == 200:
break
logger.info("Data retrieval failed, post attempt %d" % itry)
if response.status_code != 200:
raise RuntimeError("Unable to retrieve data from server: ", response)
logger.info("Data retieval is successful")
data = response.json()
return data_api.client._build_pandas_data_frame(data, index_field="globalDate")
def get_pulse_id_date_mapping(pulse_ids):
# See https://jira.psi.ch/browse/ATEST-897 for more details ...
try:
dates = []
for pulse_id in pulse_ids:
query = {"range": {"startPulseId": 0,
"endPulseId": pulse_id},
"limit": 1,
"ordering": "desc",
"channels": ["SIN-CVME-TIFGUN-EVR0:BUNCH-1-OK"],
"fields": ["pulseId", "globalDate"]}
for c in range(10):
logger.info("Retrieve mapping for pulse_id %d" % pulse_id)
# Query server
response = requests.post("https://data-api.psi.ch/sf/query", json=query)
# Check for successful return of data
if response.status_code != 200:
raise RuntimeError("Unable to retrieve data from server: ", response)
data = response.json()
if len(data[0]["data"]) == 0 or not "pulseId" in data[0]["data"][0]:
raise RuntimeError("Didn't get good responce from data_api : %s " % data)
if not pulse_id == data[0]["data"][0]["pulseId"]:
logger.info("retrieval failed")
if c == 0:
ref_date = data[0]["data"][0]["globalDate"]
ref_date = dateutil.parser.parse(ref_date)
now_date = datetime.datetime.now()
now_date = pytz.timezone('Europe/Zurich').localize(now_date)
check_date = ref_date+datetime.timedelta(seconds=24) # 20 seconds should be enough
delta_date = check_date - now_date
s = delta_date.seconds
logger.info("retry in " + str(s) + " seconds ")
if not s <= 0:
time.sleep(s)
continue
raise RuntimeError('Unable to retrieve mapping')
date = data[0]["data"][0]["globalDate"]
date = dateutil.parser.parse(date)
dates.append(date)
break
return dates
except Exception:
raise RuntimeError('Unable to retrieve mapping')
def main():
import argparse
parser = argparse.ArgumentParser(description='Channel Access archiver dump to hdf5')
parser.add_argument('--channels', dest='channel_list', default="tests/channels.txt", help='channels to dump')
parser.add_argument('--url', dest='url', default=None, help='base url to retrieve data from')
args = parser.parse_args()
print(args.channel_list)
global channel_list
channel_list = read_channels(args.channel_list)
logger.info("Using channel list: " + " ".join(channel_list))
global base_url
base_url = args.url
logger.info("Using base url: " + str(base_url))
run(host='localhost', port=10200)
if __name__ == '__main__':
main()
+3
View File
@@ -0,0 +1,3 @@
#!/usr/bin/env bash
$PYTHON setup.py install # Python command to install the script
+19
View File
@@ -0,0 +1,19 @@
package:
name: cadump
version: 0.0.12
source:
path: ..
build:
noarch: python
entry_points:
- cadump_server = cadump.cadump:main
requirements:
build:
- python
- data_api >=0.7.6
run:
- python
- data_api >=0.7.6
+15
View File
@@ -0,0 +1,15 @@
from setuptools import setup
setup(
name="cadump",
version="0.0.12",
author="Paul Scherrer Institute",
author_email="daq@psi.ch",
description="Interface to dump data from archiver/databuffer",
packages=["cadump"],
entry_points={
'console_scripts': [
'cadump_server = cadump.cadump:main',
],
}
)
View File
+6
View File
@@ -0,0 +1,6 @@
S10-CPCL-VM1MGC:LOAD
ONE
TWO
# Comment that need to be removed
THREE # one more comment at the end that need to be removed
+34
View File
@@ -0,0 +1,34 @@
import unittest
from unittest import TestCase
from cadump import cadump
import logging
class TestDownloadData(TestCase):
def test_download_data(self):
config = {
'range': {
'startPulseId': 9618913001,
'endPulseId': 9618923000
},
'parameters': {
'general/created': 'test',
'general/user': 'tester',
'general/process': 'test_process',
'general/instrument': 'mac',
'output_file': 'test.h5'} # this is usually the full path
}
cadump.base_url = "https://data-api.psi.ch/sf"
cadump.download_data(config)
# self.fail()
def test_read_channels(self):
channels = cadump.read_channels("channels.txt")
logging.info(channels)
self.assertEqual(len(channels), 4)
if __name__ == '__main__':
unittest.main()