mirror of
https://github.com/paulscherrerinstitute/sf_daq_buffer.git
synced 2026-05-06 12:24:13 +02:00
Cleanup project from files moved to sf_broker
This commit is contained in:
@@ -30,6 +30,7 @@ add_subdirectory(
|
||||
|
||||
add_subdirectory("core-buffer")
|
||||
add_subdirectory("sf-buffer")
|
||||
add_subdirectory("sf-buffer-writer")
|
||||
add_subdirectory("sf-stream")
|
||||
add_subdirectory("sf-writer")
|
||||
#add_subdirectory("jf-live-writer")
|
||||
|
||||
@@ -1,71 +0,0 @@
|
||||
import json
|
||||
|
||||
from pika import BlockingConnection, ConnectionParameters, BasicProperties
|
||||
|
||||
|
||||
class BrokerClient(object):
|
||||
|
||||
REQUEST_EXCHANGE = "request"
|
||||
STATUS_EXCHANGE = "status"
|
||||
DEFAULT_BROKER_URL = "127.0.0.1"
|
||||
|
||||
def __init__(self, broker_url=DEFAULT_BROKER_URL):
|
||||
self.connection = BlockingConnection(ConnectionParameters(broker_url))
|
||||
self.channel = self.connection.channel()
|
||||
|
||||
self.channel.exchange_declare(exchange=self.REQUEST_EXCHANGE,
|
||||
exchange_type="topic")
|
||||
|
||||
self.channel.exchange_declare(exchange=self.STATUS_EXCHANGE,
|
||||
exchange_type="fanout")
|
||||
|
||||
def close(self):
|
||||
self.connection.close()
|
||||
|
||||
def request_write(self,
|
||||
output_prefix,
|
||||
metadata=None,
|
||||
detectors=None,
|
||||
bsread_channels=None,
|
||||
epics_pvs=None):
|
||||
|
||||
routing_key = "."
|
||||
|
||||
if detectors:
|
||||
for detector in detectors:
|
||||
routing_key += detector + "."
|
||||
|
||||
if bsread_channels:
|
||||
routing_key += "bsread" + "."
|
||||
|
||||
if epics_pvs:
|
||||
routing_key += "epics" + "."
|
||||
|
||||
body_bytes = json.dumps({
|
||||
"output_prefix": output_prefix,
|
||||
"metadata": metadata,
|
||||
"detectors": detectors,
|
||||
"bsread_channels": bsread_channels,
|
||||
"epics_pvs": epics_pvs
|
||||
}).encode()
|
||||
|
||||
self.channel.basic_publish(exchange=self.REQUEST_EXCHANGE,
|
||||
routing_key=routing_key,
|
||||
body=body_bytes)
|
||||
|
||||
status_header = {
|
||||
"action": "write_request",
|
||||
"source": "BrokerClient",
|
||||
"routing_key": routing_key
|
||||
}
|
||||
|
||||
self.channel.basic_publish(exchange=self.STATUS_EXCHANGE,
|
||||
properties=BasicProperties(
|
||||
headers=status_header),
|
||||
routing_key="",
|
||||
body=body_bytes)
|
||||
|
||||
|
||||
broker = BrokerClient()
|
||||
broker.request_write("/tmp/test", epics_pvs=["test"])
|
||||
broker.close()
|
||||
@@ -1,70 +0,0 @@
|
||||
from datetime import datetime
|
||||
import json
|
||||
|
||||
from pika import BlockingConnection, ConnectionParameters
|
||||
|
||||
DEFAULT_BROKER_URL = "127.0.0.1"
|
||||
STATUS_EXCHANGE = "status"
|
||||
|
||||
COLOR_END_MARKER = '\x1b[0m'
|
||||
|
||||
|
||||
def get_color_for_action(action):
|
||||
|
||||
color_mapping = {
|
||||
"write_request": "\x1b[34;1m",
|
||||
"write_start": "\x1b[1;33;1m",
|
||||
"write_finished": "\x1b[1;32;1m"
|
||||
}
|
||||
|
||||
return color_mapping.get(action, "")
|
||||
|
||||
|
||||
def on_status(channel, method_frame, header_frame, body):
|
||||
header = header_frame.headers
|
||||
request = json.loads(body.decode())
|
||||
|
||||
action = header["action"]
|
||||
source = header["source"]
|
||||
|
||||
action_output = get_color_for_action(action) + action + COLOR_END_MARKER
|
||||
time_output = datetime.utcnow().strftime("%Y%m%d-%H:%M:%S.%f")
|
||||
|
||||
print("[%s] %s %s" % (time_output, action_output, source))
|
||||
print(request)
|
||||
|
||||
|
||||
def connect_to_broker(broker_url):
|
||||
connection = BlockingConnection(ConnectionParameters(broker_url))
|
||||
channel = connection.channel()
|
||||
|
||||
channel.exchange_declare(exchange=STATUS_EXCHANGE,
|
||||
exchange_type="fanout")
|
||||
queue = channel.queue_declare(queue="", exclusive=True).method.queue
|
||||
channel.queue_bind(queue=queue,
|
||||
exchange=STATUS_EXCHANGE)
|
||||
|
||||
channel.basic_consume(queue, on_status)
|
||||
|
||||
try:
|
||||
channel.start_consuming()
|
||||
except KeyboardInterrupt:
|
||||
channel.stop_consuming()
|
||||
|
||||
|
||||
def main():
|
||||
import argparse
|
||||
parser = argparse.ArgumentParser(
|
||||
description="Connect and listen to broker events.")
|
||||
|
||||
parser.add_argument('--broker_url', dest='broker_url',
|
||||
default=DEFAULT_BROKER_URL,
|
||||
help='RabbitMQ broker URL')
|
||||
|
||||
args = parser.parse_args()
|
||||
|
||||
connect_to_broker(broker_url=args.broker_url)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
||||
@@ -1,34 +0,0 @@
|
||||
# Overview
|
||||
|
||||
Simple server to dump Epics Channel Access data to an HDF5 file.
|
||||
The server gets an http callback from the Broker whenever there was an acquisition.
|
||||
|
||||
|
||||
__Note: THIS IS/WAS A FRIDAY AFTERNOON HACK TO MAKE THE SWISSFEL DAQ WORK__
|
||||
|
||||
|
||||
The format of the request is as follows:
|
||||
```
|
||||
{
|
||||
'range': {
|
||||
'startPulseId': 100,
|
||||
'endPulseId': 120
|
||||
},
|
||||
|
||||
'parameters': {
|
||||
'general/created': 'test',
|
||||
'general/user': 'tester',
|
||||
'general/process': 'test_process',
|
||||
'general/instrument': 'mac',
|
||||
'output_file': '/bla/test.h5'}
|
||||
}
|
||||
|
||||
```
|
||||
|
||||
Right now this server needs to run on the same server than the
|
||||
|
||||
# Testing
|
||||
|
||||
```bash
|
||||
curl -XPUT -d '{"range":{"startPulseId": 7281433214, "endPulseId": 7281489688}, "parameters":{"output_file":"test.h5"}}' http://localhost:10200/notify
|
||||
```
|
||||
@@ -1,3 +0,0 @@
|
||||
#!/usr/bin/env bash
|
||||
|
||||
$PYTHON setup.py install # Python command to install the script
|
||||
@@ -1,20 +0,0 @@
|
||||
package:
|
||||
name: epics-writer
|
||||
version: 0.0.1
|
||||
|
||||
source:
|
||||
path: ..
|
||||
|
||||
build:
|
||||
noarch: python
|
||||
entry_points:
|
||||
- epics-writer = epics_writer.start:main
|
||||
|
||||
requirements:
|
||||
build:
|
||||
- python
|
||||
run:
|
||||
- python
|
||||
- data_api >=0.7.6
|
||||
- requests
|
||||
- pika
|
||||
@@ -1,99 +0,0 @@
|
||||
import json
|
||||
from pika import BlockingConnection, ConnectionParameters, BasicProperties
|
||||
|
||||
from epics_writer.writer import write_epics_pvs
|
||||
|
||||
|
||||
DEFAULT_BROKER_URL = "127.0.0.1"
|
||||
REQUEST_EXCHANGE = "request"
|
||||
STATUS_EXCHANGE = "status"
|
||||
QUEUE_NAME = "epics"
|
||||
OUTPUT_FILE_SUFFIX = ".PVCHANNELS.h5"
|
||||
|
||||
|
||||
def update_status(channel, body, action, file, message=None):
|
||||
status_header = {
|
||||
"action": action,
|
||||
"source": "epics_writer",
|
||||
"routing_key": QUEUE_NAME,
|
||||
"file": file,
|
||||
"message": message
|
||||
}
|
||||
|
||||
channel.basic_publish(exchange=STATUS_EXCHANGE,
|
||||
properties=BasicProperties(
|
||||
headers=status_header),
|
||||
routing_key="",
|
||||
body=body)
|
||||
|
||||
|
||||
def on_message(channel, method_frame, header_frame, body):
|
||||
|
||||
output_file = None
|
||||
|
||||
try:
|
||||
request = json.loads(body.decode())
|
||||
output_prefix = request["output_prefix"]
|
||||
start_pulse_id = 1
|
||||
stop_pulse_id = 10
|
||||
metadata = request["metadata"]
|
||||
epics_pvs = request["epics_pvs"]
|
||||
|
||||
output_file = output_prefix + OUTPUT_FILE_SUFFIX
|
||||
|
||||
update_status(channel, body, "write_start", output_file)
|
||||
|
||||
write_epics_pvs(output_file=output_file,
|
||||
start_pulse_id=start_pulse_id,
|
||||
stop_pulse_id=stop_pulse_id,
|
||||
metadata=metadata,
|
||||
epics_pvs=epics_pvs)
|
||||
|
||||
except Exception as e:
|
||||
channel.basic_reject(delivery_tag=method_frame.delivery_tag,
|
||||
requeue=False)
|
||||
|
||||
update_status(channel, body, "write_rejected", output_file, str(e))
|
||||
|
||||
else:
|
||||
channel.basic_ack(delivery_tag=method_frame.delivery_tag)
|
||||
|
||||
update_status(channel, body, "write_finished", output_file)
|
||||
|
||||
|
||||
def connect_to_broker(broker_url):
|
||||
connection = BlockingConnection(ConnectionParameters(broker_url))
|
||||
channel = connection.channel()
|
||||
|
||||
channel.exchange_declare(exchange=STATUS_EXCHANGE,
|
||||
exchange_type="fanout")
|
||||
channel.exchange_declare(exchange=REQUEST_EXCHANGE,
|
||||
exchange_type="topic")
|
||||
|
||||
channel.queue_declare(queue=QUEUE_NAME, auto_delete=True)
|
||||
channel.queue_bind(queue=QUEUE_NAME,
|
||||
exchange=REQUEST_EXCHANGE,
|
||||
routing_key="*.%s.*" % QUEUE_NAME)
|
||||
channel.basic_qos(prefetch_count=1)
|
||||
channel.basic_consume(QUEUE_NAME, on_message)
|
||||
|
||||
try:
|
||||
channel.start_consuming()
|
||||
except KeyboardInterrupt:
|
||||
channel.stop_consuming()
|
||||
|
||||
|
||||
def main():
|
||||
import argparse
|
||||
parser = argparse.ArgumentParser(description='Epics HDF5 writer')
|
||||
parser.add_argument('--broker_url', dest='broker_url',
|
||||
default=DEFAULT_BROKER_URL,
|
||||
help='RabbitMQ broker URL')
|
||||
|
||||
args = parser.parse_args()
|
||||
|
||||
connect_to_broker(broker_url=args.broker_url)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
||||
@@ -1,120 +0,0 @@
|
||||
import datetime
|
||||
import time
|
||||
import logging
|
||||
import requests
|
||||
import data_api
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
DATA_API_QUERY_URL = "https://data-api.psi.ch/sf/query"
|
||||
|
||||
|
||||
def write_epics_pvs(output_file, start_pulse_id, stop_pulse_id, metadata, epics_pvs):
|
||||
|
||||
start_date = get_pulse_id_date_mapping(start_pulse_id)
|
||||
stop_date = get_pulse_id_date_mapping(stop_pulse_id)
|
||||
|
||||
data = get_data(epics_pvs, start=start_date, stop=stop_date)
|
||||
# TODO: Merge metadata to data.
|
||||
|
||||
if data:
|
||||
logger.info("Persist data to hdf5 file")
|
||||
data_api.to_hdf5(data, output_file, overwrite=True, compression=None, shuffle=False)
|
||||
else:
|
||||
logger.error("No data retrieved")
|
||||
open(output_file + "_NO_DATA", 'a').close()
|
||||
|
||||
|
||||
def get_data(channel_list, start=None, stop=None, base_url=None):
|
||||
logger.info("Requesting range %s to %s for channels: " % (start, stop, channel_list))
|
||||
|
||||
query = {"range": {"startDate": datetime.datetime.isoformat(start),
|
||||
"endDate": datetime.datetime.isoformat(stop),
|
||||
"startExpansion": True},
|
||||
"channels": channel_list,
|
||||
"fields": ["pulseId", "globalSeconds", "globalDate", "value",
|
||||
"eventCount"]}
|
||||
logger.debug(query)
|
||||
|
||||
response = requests.post(DATA_API_QUERY_URL, json=query)
|
||||
|
||||
# Check for successful return of data
|
||||
if response.status_code != 200:
|
||||
logger.info("Data retrievali failed, sleep for another time and try")
|
||||
|
||||
itry = 0
|
||||
while itry < 5:
|
||||
itry += 1
|
||||
time.sleep(60)
|
||||
response = requests.post(DATA_API_QUERY_URL, json=query)
|
||||
if response.status_code == 200:
|
||||
break
|
||||
|
||||
logger.info("Data retrieval failed, post attempt %d" % itry)
|
||||
|
||||
if response.status_code != 200:
|
||||
raise RuntimeError("Unable to retrieve data from server: ", response)
|
||||
|
||||
logger.info("Data retieval is successful")
|
||||
|
||||
data = response.json()
|
||||
|
||||
return data_api.client._build_pandas_data_frame(data, index_field="globalDate")
|
||||
|
||||
|
||||
def get_pulse_id_date_mapping(pulse_id):
|
||||
# See https://jira.psi.ch/browse/ATEST-897 for more details ...
|
||||
logger.info("Retrieve pulse-id/date mapping for pulse_id %s" % pulse_id)
|
||||
|
||||
try:
|
||||
|
||||
query = {"range": {"startPulseId": 0,
|
||||
"endPulseId": pulse_id},
|
||||
"limit": 1,
|
||||
"ordering": "desc",
|
||||
"channels": ["SIN-CVME-TIFGUN-EVR0:BUNCH-1-OK"],
|
||||
"fields": ["pulseId", "globalDate"]}
|
||||
|
||||
for c in range(10):
|
||||
|
||||
response = requests.post("https://data-api.psi.ch/sf/query", json=query)
|
||||
|
||||
# Check for successful return of data
|
||||
if response.status_code != 200:
|
||||
raise RuntimeError("Unable to retrieve data from server: ", response)
|
||||
|
||||
data = response.json()
|
||||
|
||||
if len(data[0]["data"]) == 0 or not "pulseId" in data[0]["data"][0]:
|
||||
raise RuntimeError(
|
||||
"Didn't get good responce from data_api : %s " % data)
|
||||
|
||||
if not pulse_id == data[0]["data"][0]["pulseId"]:
|
||||
logger.info("retrieval failed")
|
||||
if c == 0:
|
||||
ref_date = data[0]["data"][0]["globalDate"]
|
||||
ref_date = dateutil.parser.parse(ref_date)
|
||||
|
||||
now_date = datetime.datetime.now()
|
||||
now_date = pytz.timezone('Europe/Zurich').localize(
|
||||
now_date)
|
||||
|
||||
check_date = ref_date + datetime.timedelta(
|
||||
seconds=24) # 20 seconds should be enough
|
||||
delta_date = check_date - now_date
|
||||
|
||||
s = delta_date.seconds
|
||||
logger.info("retry in " + str(s) + " seconds ")
|
||||
if not s <= 0:
|
||||
time.sleep(s)
|
||||
continue
|
||||
|
||||
raise RuntimeError('Unable to retrieve mapping')
|
||||
|
||||
date = data[0]["data"][0]["globalDate"]
|
||||
date = dateutil.parser.parse(date)
|
||||
dates.append(date)
|
||||
break
|
||||
|
||||
except Exception as e:
|
||||
raise RuntimeError('Unable to retrieve pulse_id date mapping') from e
|
||||
@@ -1,15 +0,0 @@
|
||||
from setuptools import setup
|
||||
|
||||
setup(
|
||||
name="cadump",
|
||||
version="0.0.12",
|
||||
author="Paul Scherrer Institute",
|
||||
author_email="daq@psi.ch",
|
||||
description="Interface to dump data from archiver/databuffer",
|
||||
packages=["cadump"],
|
||||
entry_points={
|
||||
'console_scripts': [
|
||||
'cadump_server = cadump.cadump:main',
|
||||
],
|
||||
}
|
||||
)
|
||||
@@ -1,6 +0,0 @@
|
||||
S10-CPCL-VM1MGC:LOAD
|
||||
|
||||
ONE
|
||||
TWO
|
||||
# Comment that need to be removed
|
||||
THREE # one more comment at the end that need to be removed
|
||||
@@ -1,34 +0,0 @@
|
||||
import unittest
|
||||
from unittest import TestCase
|
||||
from cadump import cadump
|
||||
import logging
|
||||
|
||||
class TestDownloadData(TestCase):
|
||||
|
||||
def test_download_data(self):
|
||||
config = {
|
||||
'range': {
|
||||
'startPulseId': 9618913001,
|
||||
'endPulseId': 9618923000
|
||||
},
|
||||
|
||||
'parameters': {
|
||||
'general/created': 'test',
|
||||
'general/user': 'tester',
|
||||
'general/process': 'test_process',
|
||||
'general/instrument': 'mac',
|
||||
'output_file': 'test.h5'} # this is usually the full path
|
||||
}
|
||||
|
||||
cadump.base_url = "https://data-api.psi.ch/sf"
|
||||
cadump.download_data(config)
|
||||
# self.fail()
|
||||
|
||||
def test_read_channels(self):
|
||||
channels = cadump.read_channels("channels.txt")
|
||||
logging.info(channels)
|
||||
self.assertEqual(len(channels), 4)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
||||
Reference in New Issue
Block a user