mirror of
https://github.com/paulscherrerinstitute/sf_daq_buffer.git
synced 2026-04-30 14:22:23 +02:00
initial commit
This commit is contained in:
@@ -0,0 +1,4 @@
|
||||
__pycache__
|
||||
.idea
|
||||
.pytest_cache
|
||||
*.h5
|
||||
@@ -0,0 +1,29 @@
|
||||
# Overview
|
||||
Simple server to dump Epics Channel Access data to an HDF5 file.
|
||||
The server gets an HTTP callback from the Broker whenever there is an acquisition.
|
||||
|
||||
The format of the request is as follows:
|
||||
```
|
||||
{
|
||||
'range': {
|
||||
'startPulseId': 100,
|
||||
'endPulseId': 120
|
||||
},
|
||||
|
||||
'parameters': {
|
||||
'general/created': 'test',
|
||||
'general/user': 'tester',
|
||||
'general/process': 'test_process',
|
||||
'general/instrument': 'mac',
|
||||
'output_file': '/bla/test.h5'}
|
||||
}
|
||||
|
||||
```
|
||||
|
||||
Right now this server needs to run on the same server as the Broker.
|
||||
|
||||
# Testing
|
||||
|
||||
```bash
|
||||
curl -XPUT -d '{"range":{"startPulseId": 7281433214, "endPulseId": 7281489688}, "parameters":{"output_file":"test.h5"}}' http://localhost:10200/notify
|
||||
```
|
||||
@@ -0,0 +1,88 @@
|
||||
import json
import logging
import os

from bottle import route, run, request, abort

import data_api
|
||||
|
||||
# This is how the notification look like
|
||||
# {
|
||||
# 'range': {
|
||||
# 'startPulseId': 100,
|
||||
# 'endPulseId': 120
|
||||
# },
|
||||
#
|
||||
# 'parameters': {
|
||||
# 'general/created': 'test',
|
||||
# 'general/user': 'tester',
|
||||
# 'general/process': 'test_process',
|
||||
# 'general/instrument': 'mac',
|
||||
# 'output_file': '/bla/test.h5'} # this is usually the full path
|
||||
# }
|
||||
|
||||
channel_list = ["S10-CPCL-VM1MGC:LOAD"] # specified channel is only for test purposes
|
||||
|
||||
|
||||
@route('/notify', method='PUT')
def put_document():
    """HTTP endpoint the Broker calls after an acquisition.

    Expects a JSON body with a ``range`` (startPulseId/endPulseId) and
    ``parameters`` (incl. ``output_file``) — see the module comment above.
    Replies 400 when the body is empty; any failure during the dump is
    logged but does not kill the server.
    """
    data = request.body.read()
    if not data:
        abort(400, 'No data received')

    try:
        download_data(json.loads(data))
    except Exception:
        # Boundary handler: keep serving, but record the full traceback.
        # (Passing the exception as a second positional arg to
        # logging.warning() would be treated as a %-format argument and
        # the message has no placeholder, so the detail was lost.)
        logging.exception("Download data failed")
|
||||
|
||||
|
||||
def download_data(config):
    """Dump the configured Channel Access channels to an HDF5 file.

    Parameters
    ----------
    config : dict
        Broker notification: ``config["range"]`` holds
        ``startPulseId``/``endPulseId`` and ``config["parameters"]``
        holds ``output_file`` (usually a full path).

    The data is written next to the requested file with ``_CA`` inserted
    before the extension (``/bla/test.h5`` -> ``/bla/test_CA.h5``).
    """
    logging.info("Dump data to hdf5 ...")

    start_pulse = config["range"]["startPulseId"]
    end_pulse = config["range"]["endPulseId"]

    # Map the pulse-id range onto wall-clock dates for the data API.
    start_date, end_date = data_api.get_global_date([start_pulse, end_pulse])
    # Was a bare debug print(); go through logging like the rest of the module.
    logging.info("Date range: %s - %s", start_date, end_date)

    # Insert _CA before the extension.  splitext replaces the original
    # filename[:-3] slicing, which silently assumed a 3-character ".h5"
    # suffix (identical result for ".h5", correct for anything else).
    filename = config["parameters"]["output_file"]
    base, ext = os.path.splitext(filename)
    new_filename = base + "_CA" + ext

    logging.info("Retrieving data")
    data = data_api.get_data(channel_list, start=start_date, end=end_date)
    logging.info("Persist data to hdf5 file")
    data_api.to_hdf5(data, new_filename, overwrite=True, compression=None, shuffle=False)
|
||||
|
||||
|
||||
def read_channels(filename):
    """Parse a channel-list file: one channel name per line.

    Blank lines are skipped and leading/trailing whitespace is stripped
    from every entry.
    """
    with open(filename) as channel_file:
        raw_lines = channel_file.readlines()

    stripped = (raw.strip() for raw in raw_lines)
    return [entry for entry in stripped if entry]
|
||||
|
||||
|
||||
def main():
    """Entry point: parse CLI options, load the channel list, start serving."""
    import argparse
    parser = argparse.ArgumentParser(description='Channel Access archiver dump to hdf5')
    parser.add_argument('--channels', dest='channel_list', default="tests/channels.txt",
                        help='channels to dump')

    args = parser.parse_args()
    # Was a bare debug print(); use logging like the rest of the module.
    logging.info("Channel list file: %s", args.channel_list)

    # Replace the module-level test default so download_data() picks up
    # the configured channels.
    global channel_list
    channel_list = read_channels(args.channel_list)
    logging.info("Using channel list: %s", " ".join(channel_list))

    run(host='localhost', port=10200)
|
||||
|
||||
|
||||
# Started directly (not imported): launch the notification server.
if __name__ == '__main__':
    main()
|
||||
@@ -0,0 +1,17 @@
|
||||
{% set data = load_setup_py_data() %}
|
||||
|
||||
package:
|
||||
name: cadump
|
||||
version: {{ data.get('version') }}
|
||||
|
||||
build:
|
||||
entry_points:
|
||||
- cadump_server = cadump.cadump:main
|
||||
|
||||
requirements:
|
||||
build:
|
||||
- python
|
||||
- data_api
|
||||
run:
    - python
    - data_api
    # NOTE(review): cadump imports `bottle` at runtime but it is not
    # listed here — confirm and add `- bottle` to the run requirements.
|
||||
@@ -0,0 +1,25 @@
|
||||
import os
|
||||
from setuptools import setup, find_packages
|
||||
|
||||
|
||||
# Utility function to read the README file.
|
||||
# Used for the long_description. It's nice, because now 1) we have a top level
|
||||
# README file and 2) it's easier to type in the README file than to put a raw
|
||||
# string in below ...
|
||||
def read(fname):
    """Return the contents of *fname*, resolved relative to this file.

    Used for ``long_description`` so the README stays the single source
    of the package description.
    """
    # Context manager closes the handle deterministically; the original
    # open() leaked it (ResourceWarning under -W error).
    with open(os.path.join(os.path.dirname(__file__), fname)) as f:
        return f.read()
|
||||
|
||||
|
||||
# Package metadata.  NOTE: the conda recipe reads `version` from here
# via load_setup_py_data(), so keep it in sync with releases.
setup(
    name="cadump",
    version="0.0.1",
    author="Paul Scherrer Institute",
    author_email="daq@psi.ch",
    description=("Interface to dump data from archiver/databuffer"),
    license="GPLv3",
    keywords="",
    url="",
    packages=find_packages(),
    # README is loaded through the read() helper defined above.
    long_description=read('Readme.md'),
)
|
||||
@@ -0,0 +1,5 @@
|
||||
S10-CPCL-VM1MGC:LOAD
|
||||
|
||||
ONE
|
||||
TWO
|
||||
THREE
|
||||
@@ -0,0 +1,35 @@
|
||||
import unittest
|
||||
from unittest import TestCase
|
||||
from cadump import cadump
|
||||
import logging
|
||||
|
||||
class TestDownloadData(TestCase):
    """Exercise the public helpers of the cadump module."""

    def test_download_data(self):
        # Mirrors the notification payload the Broker sends to /notify.
        notification = {
            'range': {
                'startPulseId': 7281433214,
                'endPulseId': 7281489688,
            },

            'parameters': {
                'general/created': 'test',
                'general/user': 'tester',
                'general/process': 'test_process',
                'general/instrument': 'mac',
                'output_file': 'test.h5',  # this is usually the full path
            },
        }

        cadump.download_data(notification)
        # self.fail()

    def test_read_channels(self):
        parsed = cadump.read_channels("channels.txt")
        logging.info(parsed)
        self.assertEqual(3, len(parsed))
|
||||
|
||||
|
||||
|
||||
|
||||
# Allow running this test file directly with `python <file>`.
if __name__ == '__main__':
    unittest.main()
|
||||
Reference in New Issue
Block a user