mirror of
https://github.com/paulscherrerinstitute/sf_daq_buffer.git
synced 2026-05-04 06:54:15 +02:00
Cleanup of version files
This commit is contained in:
@@ -32,5 +32,3 @@ add_subdirectory("core-buffer")
|
||||
add_subdirectory("sf-buffer")
|
||||
add_subdirectory("sf-stream")
|
||||
add_subdirectory("sf-writer")
|
||||
#add_subdirectory("jf-live-writer")
|
||||
#add_subdirectory("jf-live-daq")
|
||||
|
||||
@@ -1,71 +0,0 @@
|
||||
import json
|
||||
|
||||
from pika import BlockingConnection, ConnectionParameters, BasicProperties
|
||||
|
||||
|
||||
class BrokerClient(object):
|
||||
|
||||
REQUEST_EXCHANGE = "request"
|
||||
STATUS_EXCHANGE = "status"
|
||||
DEFAULT_BROKER_URL = "127.0.0.1"
|
||||
|
||||
def __init__(self, broker_url=DEFAULT_BROKER_URL):
|
||||
self.connection = BlockingConnection(ConnectionParameters(broker_url))
|
||||
self.channel = self.connection.channel()
|
||||
|
||||
self.channel.exchange_declare(exchange=self.REQUEST_EXCHANGE,
|
||||
exchange_type="topic")
|
||||
|
||||
self.channel.exchange_declare(exchange=self.STATUS_EXCHANGE,
|
||||
exchange_type="fanout")
|
||||
|
||||
def close(self):
|
||||
self.connection.close()
|
||||
|
||||
def request_write(self,
|
||||
output_prefix,
|
||||
metadata=None,
|
||||
detectors=None,
|
||||
bsread_channels=None,
|
||||
epics_pvs=None):
|
||||
|
||||
routing_key = "."
|
||||
|
||||
if detectors:
|
||||
for detector in detectors:
|
||||
routing_key += detector + "."
|
||||
|
||||
if bsread_channels:
|
||||
routing_key += "bsread" + "."
|
||||
|
||||
if epics_pvs:
|
||||
routing_key += "epics" + "."
|
||||
|
||||
body_bytes = json.dumps({
|
||||
"output_prefix": output_prefix,
|
||||
"metadata": metadata,
|
||||
"detectors": detectors,
|
||||
"bsread_channels": bsread_channels,
|
||||
"epics_pvs": epics_pvs
|
||||
}).encode()
|
||||
|
||||
self.channel.basic_publish(exchange=self.REQUEST_EXCHANGE,
|
||||
routing_key=routing_key,
|
||||
body=body_bytes)
|
||||
|
||||
status_header = {
|
||||
"action": "write_request",
|
||||
"source": "BrokerClient",
|
||||
"routing_key": routing_key
|
||||
}
|
||||
|
||||
self.channel.basic_publish(exchange=self.STATUS_EXCHANGE,
|
||||
properties=BasicProperties(
|
||||
headers=status_header),
|
||||
routing_key="",
|
||||
body=body_bytes)
|
||||
|
||||
|
||||
broker = BrokerClient()
|
||||
broker.request_write("/tmp/test", epics_pvs=["test"])
|
||||
broker.close()
|
||||
@@ -1,70 +0,0 @@
|
||||
from datetime import datetime
|
||||
import json
|
||||
|
||||
from pika import BlockingConnection, ConnectionParameters
|
||||
|
||||
DEFAULT_BROKER_URL = "127.0.0.1"
|
||||
STATUS_EXCHANGE = "status"
|
||||
|
||||
COLOR_END_MARKER = '\x1b[0m'
|
||||
|
||||
|
||||
def get_color_for_action(action):
|
||||
|
||||
color_mapping = {
|
||||
"write_request": "\x1b[34;1m",
|
||||
"write_start": "\x1b[1;33;1m",
|
||||
"write_finished": "\x1b[1;32;1m"
|
||||
}
|
||||
|
||||
return color_mapping.get(action, "")
|
||||
|
||||
|
||||
def on_status(channel, method_frame, header_frame, body):
|
||||
header = header_frame.headers
|
||||
request = json.loads(body.decode())
|
||||
|
||||
action = header["action"]
|
||||
source = header["source"]
|
||||
|
||||
action_output = get_color_for_action(action) + action + COLOR_END_MARKER
|
||||
time_output = datetime.utcnow().strftime("%Y%m%d-%H:%M:%S.%f")
|
||||
|
||||
print("[%s] %s %s" % (time_output, action_output, source))
|
||||
print(request)
|
||||
|
||||
|
||||
def connect_to_broker(broker_url):
|
||||
connection = BlockingConnection(ConnectionParameters(broker_url))
|
||||
channel = connection.channel()
|
||||
|
||||
channel.exchange_declare(exchange=STATUS_EXCHANGE,
|
||||
exchange_type="fanout")
|
||||
queue = channel.queue_declare(queue="", exclusive=True).method.queue
|
||||
channel.queue_bind(queue=queue,
|
||||
exchange=STATUS_EXCHANGE)
|
||||
|
||||
channel.basic_consume(queue, on_status)
|
||||
|
||||
try:
|
||||
channel.start_consuming()
|
||||
except KeyboardInterrupt:
|
||||
channel.stop_consuming()
|
||||
|
||||
|
||||
def main():
|
||||
import argparse
|
||||
parser = argparse.ArgumentParser(
|
||||
description="Connect and listen to broker events.")
|
||||
|
||||
parser.add_argument('--broker_url', dest='broker_url',
|
||||
default=DEFAULT_BROKER_URL,
|
||||
help='RabbitMQ broker URL')
|
||||
|
||||
args = parser.parse_args()
|
||||
|
||||
connect_to_broker(broker_url=args.broker_url)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
||||
@@ -1,34 +0,0 @@
|
||||
# Overview
|
||||
|
||||
Simple server to dump Epics Channel Access data to an HDF5 file.
|
||||
The server gets an http callback from the Broker whenever there was an acquisition.
|
||||
|
||||
|
||||
__Note: THIS IS/WAS A FRIDAY AFTERNOON HACK TO MAKE THE SWISSFEL DAQ WORK__
|
||||
|
||||
|
||||
The format of the request is as follows:
|
||||
```
|
||||
{
|
||||
'range': {
|
||||
'startPulseId': 100,
|
||||
'endPulseId': 120
|
||||
},
|
||||
|
||||
'parameters': {
|
||||
'general/created': 'test',
|
||||
'general/user': 'tester',
|
||||
'general/process': 'test_process',
|
||||
'general/instrument': 'mac',
|
||||
'output_file': '/bla/test.h5'}
|
||||
}
|
||||
|
||||
```
|
||||
|
||||
Right now this server needs to run on the same server than the
|
||||
|
||||
# Testing
|
||||
|
||||
```bash
|
||||
curl -XPUT -d '{"range":{"startPulseId": 7281433214, "endPulseId": 7281489688}, "parameters":{"output_file":"test.h5"}}' http://localhost:10200/notify
|
||||
```
|
||||
@@ -1,3 +0,0 @@
|
||||
#!/usr/bin/env bash
|
||||
|
||||
$PYTHON setup.py install # Python command to install the script
|
||||
@@ -1,20 +0,0 @@
|
||||
package:
|
||||
name: epics-writer
|
||||
version: 0.0.1
|
||||
|
||||
source:
|
||||
path: ..
|
||||
|
||||
build:
|
||||
noarch: python
|
||||
entry_points:
|
||||
- epics-writer = epics_writer.start:main
|
||||
|
||||
requirements:
|
||||
build:
|
||||
- python
|
||||
run:
|
||||
- python
|
||||
- data_api >=0.7.6
|
||||
- requests
|
||||
- pika
|
||||
@@ -1,99 +0,0 @@
|
||||
import json
|
||||
from pika import BlockingConnection, ConnectionParameters, BasicProperties
|
||||
|
||||
from epics_writer.writer import write_epics_pvs
|
||||
|
||||
|
||||
DEFAULT_BROKER_URL = "127.0.0.1"
|
||||
REQUEST_EXCHANGE = "request"
|
||||
STATUS_EXCHANGE = "status"
|
||||
QUEUE_NAME = "epics"
|
||||
OUTPUT_FILE_SUFFIX = ".PVCHANNELS.h5"
|
||||
|
||||
|
||||
def update_status(channel, body, action, file, message=None):
|
||||
status_header = {
|
||||
"action": action,
|
||||
"source": "epics_writer",
|
||||
"routing_key": QUEUE_NAME,
|
||||
"file": file,
|
||||
"message": message
|
||||
}
|
||||
|
||||
channel.basic_publish(exchange=STATUS_EXCHANGE,
|
||||
properties=BasicProperties(
|
||||
headers=status_header),
|
||||
routing_key="",
|
||||
body=body)
|
||||
|
||||
|
||||
def on_message(channel, method_frame, header_frame, body):
|
||||
|
||||
output_file = None
|
||||
|
||||
try:
|
||||
request = json.loads(body.decode())
|
||||
output_prefix = request["output_prefix"]
|
||||
start_pulse_id = 1
|
||||
stop_pulse_id = 10
|
||||
metadata = request["metadata"]
|
||||
epics_pvs = request["epics_pvs"]
|
||||
|
||||
output_file = output_prefix + OUTPUT_FILE_SUFFIX
|
||||
|
||||
update_status(channel, body, "write_start", output_file)
|
||||
|
||||
write_epics_pvs(output_file=output_file,
|
||||
start_pulse_id=start_pulse_id,
|
||||
stop_pulse_id=stop_pulse_id,
|
||||
metadata=metadata,
|
||||
epics_pvs=epics_pvs)
|
||||
|
||||
except Exception as e:
|
||||
channel.basic_reject(delivery_tag=method_frame.delivery_tag,
|
||||
requeue=False)
|
||||
|
||||
update_status(channel, body, "write_rejected", output_file, str(e))
|
||||
|
||||
else:
|
||||
channel.basic_ack(delivery_tag=method_frame.delivery_tag)
|
||||
|
||||
update_status(channel, body, "write_finished", output_file)
|
||||
|
||||
|
||||
def connect_to_broker(broker_url):
|
||||
connection = BlockingConnection(ConnectionParameters(broker_url))
|
||||
channel = connection.channel()
|
||||
|
||||
channel.exchange_declare(exchange=STATUS_EXCHANGE,
|
||||
exchange_type="fanout")
|
||||
channel.exchange_declare(exchange=REQUEST_EXCHANGE,
|
||||
exchange_type="topic")
|
||||
|
||||
channel.queue_declare(queue=QUEUE_NAME, auto_delete=True)
|
||||
channel.queue_bind(queue=QUEUE_NAME,
|
||||
exchange=REQUEST_EXCHANGE,
|
||||
routing_key="*.%s.*" % QUEUE_NAME)
|
||||
channel.basic_qos(prefetch_count=1)
|
||||
channel.basic_consume(QUEUE_NAME, on_message)
|
||||
|
||||
try:
|
||||
channel.start_consuming()
|
||||
except KeyboardInterrupt:
|
||||
channel.stop_consuming()
|
||||
|
||||
|
||||
def main():
|
||||
import argparse
|
||||
parser = argparse.ArgumentParser(description='Epics HDF5 writer')
|
||||
parser.add_argument('--broker_url', dest='broker_url',
|
||||
default=DEFAULT_BROKER_URL,
|
||||
help='RabbitMQ broker URL')
|
||||
|
||||
args = parser.parse_args()
|
||||
|
||||
connect_to_broker(broker_url=args.broker_url)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
||||
@@ -1,120 +0,0 @@
|
||||
import datetime
|
||||
import time
|
||||
import logging
|
||||
import requests
|
||||
import data_api
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
DATA_API_QUERY_URL = "https://data-api.psi.ch/sf/query"
|
||||
|
||||
|
||||
def write_epics_pvs(output_file, start_pulse_id, stop_pulse_id, metadata, epics_pvs):
|
||||
|
||||
start_date = get_pulse_id_date_mapping(start_pulse_id)
|
||||
stop_date = get_pulse_id_date_mapping(stop_pulse_id)
|
||||
|
||||
data = get_data(epics_pvs, start=start_date, stop=stop_date)
|
||||
# TODO: Merge metadata to data.
|
||||
|
||||
if data:
|
||||
logger.info("Persist data to hdf5 file")
|
||||
data_api.to_hdf5(data, output_file, overwrite=True, compression=None, shuffle=False)
|
||||
else:
|
||||
logger.error("No data retrieved")
|
||||
open(output_file + "_NO_DATA", 'a').close()
|
||||
|
||||
|
||||
def get_data(channel_list, start=None, stop=None, base_url=None):
|
||||
logger.info("Requesting range %s to %s for channels: " % (start, stop, channel_list))
|
||||
|
||||
query = {"range": {"startDate": datetime.datetime.isoformat(start),
|
||||
"endDate": datetime.datetime.isoformat(stop),
|
||||
"startExpansion": True},
|
||||
"channels": channel_list,
|
||||
"fields": ["pulseId", "globalSeconds", "globalDate", "value",
|
||||
"eventCount"]}
|
||||
logger.debug(query)
|
||||
|
||||
response = requests.post(DATA_API_QUERY_URL, json=query)
|
||||
|
||||
# Check for successful return of data
|
||||
if response.status_code != 200:
|
||||
logger.info("Data retrievali failed, sleep for another time and try")
|
||||
|
||||
itry = 0
|
||||
while itry < 5:
|
||||
itry += 1
|
||||
time.sleep(60)
|
||||
response = requests.post(DATA_API_QUERY_URL, json=query)
|
||||
if response.status_code == 200:
|
||||
break
|
||||
|
||||
logger.info("Data retrieval failed, post attempt %d" % itry)
|
||||
|
||||
if response.status_code != 200:
|
||||
raise RuntimeError("Unable to retrieve data from server: ", response)
|
||||
|
||||
logger.info("Data retieval is successful")
|
||||
|
||||
data = response.json()
|
||||
|
||||
return data_api.client._build_pandas_data_frame(data, index_field="globalDate")
|
||||
|
||||
|
||||
def get_pulse_id_date_mapping(pulse_id):
|
||||
# See https://jira.psi.ch/browse/ATEST-897 for more details ...
|
||||
logger.info("Retrieve pulse-id/date mapping for pulse_id %s" % pulse_id)
|
||||
|
||||
try:
|
||||
|
||||
query = {"range": {"startPulseId": 0,
|
||||
"endPulseId": pulse_id},
|
||||
"limit": 1,
|
||||
"ordering": "desc",
|
||||
"channels": ["SIN-CVME-TIFGUN-EVR0:BUNCH-1-OK"],
|
||||
"fields": ["pulseId", "globalDate"]}
|
||||
|
||||
for c in range(10):
|
||||
|
||||
response = requests.post("https://data-api.psi.ch/sf/query", json=query)
|
||||
|
||||
# Check for successful return of data
|
||||
if response.status_code != 200:
|
||||
raise RuntimeError("Unable to retrieve data from server: ", response)
|
||||
|
||||
data = response.json()
|
||||
|
||||
if len(data[0]["data"]) == 0 or not "pulseId" in data[0]["data"][0]:
|
||||
raise RuntimeError(
|
||||
"Didn't get good responce from data_api : %s " % data)
|
||||
|
||||
if not pulse_id == data[0]["data"][0]["pulseId"]:
|
||||
logger.info("retrieval failed")
|
||||
if c == 0:
|
||||
ref_date = data[0]["data"][0]["globalDate"]
|
||||
ref_date = dateutil.parser.parse(ref_date)
|
||||
|
||||
now_date = datetime.datetime.now()
|
||||
now_date = pytz.timezone('Europe/Zurich').localize(
|
||||
now_date)
|
||||
|
||||
check_date = ref_date + datetime.timedelta(
|
||||
seconds=24) # 20 seconds should be enough
|
||||
delta_date = check_date - now_date
|
||||
|
||||
s = delta_date.seconds
|
||||
logger.info("retry in " + str(s) + " seconds ")
|
||||
if not s <= 0:
|
||||
time.sleep(s)
|
||||
continue
|
||||
|
||||
raise RuntimeError('Unable to retrieve mapping')
|
||||
|
||||
date = data[0]["data"][0]["globalDate"]
|
||||
date = dateutil.parser.parse(date)
|
||||
dates.append(date)
|
||||
break
|
||||
|
||||
except Exception as e:
|
||||
raise RuntimeError('Unable to retrieve pulse_id date mapping') from e
|
||||
@@ -1,15 +0,0 @@
|
||||
from setuptools import setup
|
||||
|
||||
setup(
|
||||
name="cadump",
|
||||
version="0.0.12",
|
||||
author="Paul Scherrer Institute",
|
||||
author_email="daq@psi.ch",
|
||||
description="Interface to dump data from archiver/databuffer",
|
||||
packages=["cadump"],
|
||||
entry_points={
|
||||
'console_scripts': [
|
||||
'cadump_server = cadump.cadump:main',
|
||||
],
|
||||
}
|
||||
)
|
||||
@@ -1,6 +0,0 @@
|
||||
S10-CPCL-VM1MGC:LOAD
|
||||
|
||||
ONE
|
||||
TWO
|
||||
# Comment that need to be removed
|
||||
THREE # one more comment at the end that need to be removed
|
||||
@@ -1,34 +0,0 @@
|
||||
import unittest
|
||||
from unittest import TestCase
|
||||
from cadump import cadump
|
||||
import logging
|
||||
|
||||
class TestDownloadData(TestCase):
|
||||
|
||||
def test_download_data(self):
|
||||
config = {
|
||||
'range': {
|
||||
'startPulseId': 9618913001,
|
||||
'endPulseId': 9618923000
|
||||
},
|
||||
|
||||
'parameters': {
|
||||
'general/created': 'test',
|
||||
'general/user': 'tester',
|
||||
'general/process': 'test_process',
|
||||
'general/instrument': 'mac',
|
||||
'output_file': 'test.h5'} # this is usually the full path
|
||||
}
|
||||
|
||||
cadump.base_url = "https://data-api.psi.ch/sf"
|
||||
cadump.download_data(config)
|
||||
# self.fail()
|
||||
|
||||
def test_read_channels(self):
|
||||
channels = cadump.read_channels("channels.txt")
|
||||
logging.info(channels)
|
||||
self.assertEqual(len(channels), 4)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
||||
@@ -1,26 +0,0 @@
|
||||
|
||||
find_package(MPI REQUIRED)
|
||||
# Because of openmpi.
|
||||
add_definitions(-DOMPI_SKIP_MPICXX)
|
||||
|
||||
file(GLOB SOURCES
|
||||
src/*.cpp)
|
||||
|
||||
add_library(jf-live-daq-lib STATIC ${SOURCES})
|
||||
target_include_directories(jf-live-daq-lib
|
||||
PUBLIC include/
|
||||
SYSTEM ${MPI_INCLUDE_PATH})
|
||||
|
||||
target_link_libraries(jf-live-daq-lib
|
||||
external
|
||||
core-buffer-lib
|
||||
${MPI_LIBRARIES})
|
||||
|
||||
add_executable(jf-live-daq src/main.cpp)
|
||||
set_target_properties(jf-live-daq PROPERTIES OUTPUT_NAME jf_live_daq)
|
||||
target_link_libraries(jf-live-daq
|
||||
jf-live-daq-lib
|
||||
)
|
||||
|
||||
enable_testing()
|
||||
add_subdirectory(test/)
|
||||
@@ -1,49 +0,0 @@
|
||||
#include <stdio.h>
|
||||
#include <mpi.h>
|
||||
|
||||
void receive()
|
||||
{
|
||||
|
||||
}
|
||||
|
||||
void assemble()
|
||||
{
|
||||
|
||||
}
|
||||
|
||||
void write()
|
||||
{
|
||||
|
||||
}
|
||||
|
||||
int main(int argc, char** argv)
|
||||
{
|
||||
// Initialize the MPI environment
|
||||
MPI_Init(NULL, NULL);
|
||||
|
||||
// Get the number of processes
|
||||
int world_size;
|
||||
MPI_Comm_size(MPI_COMM_WORLD, &world_size);
|
||||
|
||||
// Get the rank of the process
|
||||
int world_rank;
|
||||
MPI_Comm_rank(MPI_COMM_WORLD, &world_rank);
|
||||
|
||||
// Get the name of the processor
|
||||
char processor_name[MPI_MAX_PROCESSOR_NAME];
|
||||
int name_len;
|
||||
MPI_Get_processor_name(processor_name, &name_len);
|
||||
|
||||
const int n_modules = 16;
|
||||
|
||||
if (world_rank == 0) {
|
||||
assemble();
|
||||
} else if (world_rank <= n_modules) {
|
||||
receive();
|
||||
} else {
|
||||
write();
|
||||
}
|
||||
|
||||
// Finalize the MPI environment.
|
||||
MPI_Finalize();
|
||||
}
|
||||
@@ -1,10 +0,0 @@
|
||||
add_executable(jf-live-daq-tests main.cpp)
|
||||
|
||||
target_link_libraries(jf-live-daq-tests
|
||||
jf-live-daq-lib
|
||||
hdf5
|
||||
hdf5_hl
|
||||
hdf5_cpp
|
||||
zmq
|
||||
gtest
|
||||
)
|
||||
@@ -1,9 +0,0 @@
|
||||
#include "gtest/gtest.h"
|
||||
|
||||
|
||||
using namespace std;
|
||||
|
||||
int main(int argc, char **argv) {
|
||||
::testing::InitGoogleTest(&argc, argv);
|
||||
return RUN_ALL_TESTS();
|
||||
}
|
||||
@@ -1,22 +0,0 @@
|
||||
file(GLOB SOURCES
|
||||
src/*.cpp)
|
||||
|
||||
add_library(jf-live-writer-lib STATIC ${SOURCES})
|
||||
target_include_directories(jf-live-writer-lib PUBLIC include/)
|
||||
target_link_libraries(jf-live-writer-lib
|
||||
external
|
||||
core-buffer-lib)
|
||||
|
||||
add_executable(jf-live-writer src/main.cpp)
|
||||
set_target_properties(jf-live-writer PROPERTIES OUTPUT_NAME jf_live_writer)
|
||||
target_link_libraries(jf-live-writer
|
||||
jf-live-writer-lib
|
||||
sf-writer-lib
|
||||
hdf5
|
||||
hdf5_hl
|
||||
hdf5_cpp
|
||||
pthread
|
||||
)
|
||||
|
||||
enable_testing()
|
||||
add_subdirectory(test/)
|
||||
@@ -1,28 +0,0 @@
|
||||
#ifndef SF_DAQ_BUFFER_BINARYREADER_HPP
|
||||
#define SF_DAQ_BUFFER_BINARYREADER_HPP
|
||||
|
||||
|
||||
#include <formats.hpp>
|
||||
|
||||
class BinaryReader {
|
||||
|
||||
const std::string detector_folder_;
|
||||
const std::string module_name_;
|
||||
|
||||
std::string current_input_file_;
|
||||
int input_file_fd_;
|
||||
|
||||
void open_file(const std::string& filename);
|
||||
void close_current_file();
|
||||
|
||||
public:
|
||||
BinaryReader(const std::string &detector_folder,
|
||||
const std::string &module_name);
|
||||
|
||||
~BinaryReader();
|
||||
|
||||
void get_frame(const uint64_t pulse_id, BufferBinaryFormat *buffer);
|
||||
};
|
||||
|
||||
|
||||
#endif //SF_DAQ_BUFFER_BINARYREADER_HPP
|
||||
@@ -1,49 +0,0 @@
|
||||
#ifndef SFWRITER_HPP
|
||||
#define SFWRITER_HPP
|
||||
|
||||
#include <memory>
|
||||
#include <string>
|
||||
#include <H5Cpp.h>
|
||||
|
||||
#include "LiveImageAssembler.hpp"
|
||||
|
||||
const auto& H5_UINT64 = H5::PredType::NATIVE_UINT64;
|
||||
const auto& H5_UINT32 = H5::PredType::NATIVE_UINT32;
|
||||
const auto& H5_UINT16 = H5::PredType::NATIVE_UINT16;
|
||||
const auto& H5_UINT8 = H5::PredType::NATIVE_UINT8;
|
||||
|
||||
class JFH5LiveWriter {
|
||||
|
||||
const std::string detector_name_;
|
||||
const size_t n_modules_;
|
||||
const size_t n_pulses_;
|
||||
|
||||
size_t write_index_;
|
||||
|
||||
H5::H5File file_;
|
||||
H5::DataSet image_dataset_;
|
||||
|
||||
uint64_t* b_pulse_id_;
|
||||
uint64_t* b_frame_index_;
|
||||
uint32_t* b_daq_rec_;
|
||||
uint8_t* b_is_good_frame_ ;
|
||||
|
||||
void init_file(const std::string &output_file);
|
||||
void write_dataset(const std::string name,
|
||||
const void *buffer,
|
||||
const H5::PredType &type);
|
||||
void write_metadata();
|
||||
std::string get_detector_name(const std::string& detector_folder);
|
||||
|
||||
void close_file();
|
||||
|
||||
public:
|
||||
JFH5LiveWriter(const std::string& output_file,
|
||||
const std::string& detector_folder,
|
||||
const size_t n_modules,
|
||||
const size_t n_pulses);
|
||||
~JFH5LiveWriter();
|
||||
void write(const ImageMetadata* metadata, const char* data);
|
||||
};
|
||||
|
||||
#endif //SFWRITER_HPP
|
||||
@@ -1,51 +0,0 @@
|
||||
#ifndef SF_DAQ_BUFFER_LIVEIMAGEASSEMBLER_HPP
|
||||
#define SF_DAQ_BUFFER_LIVEIMAGEASSEMBLER_HPP
|
||||
|
||||
#include <atomic>
|
||||
|
||||
#include "buffer_config.hpp"
|
||||
#include "formats.hpp"
|
||||
|
||||
const uint64_t IA_EMPTY_SLOT_VALUE = 0;
|
||||
|
||||
struct ImageMetadata
|
||||
{
|
||||
uint64_t pulse_id;
|
||||
uint64_t frame_index;
|
||||
uint32_t daq_rec;
|
||||
uint8_t is_good_image;
|
||||
};
|
||||
|
||||
class LiveImageAssembler {
|
||||
const size_t n_modules_;
|
||||
const size_t image_buffer_slot_n_bytes_;
|
||||
|
||||
char* image_buffer_;
|
||||
ImageMetadata* image_meta_buffer_;
|
||||
ModuleFrame* frame_meta_buffer_;
|
||||
std::atomic_int* buffer_status_;
|
||||
std::atomic_uint64_t* buffer_pulse_id_;
|
||||
|
||||
size_t get_data_offset(const uint64_t slot_id, const int i_module);
|
||||
size_t get_frame_metadata_offset(const uint64_t slot_id, const int i_module);
|
||||
|
||||
public:
|
||||
LiveImageAssembler(const size_t n_modules);
|
||||
|
||||
virtual ~LiveImageAssembler();
|
||||
|
||||
bool is_slot_free(const uint64_t pulse_id);
|
||||
bool is_slot_full(const uint64_t pulse_id);
|
||||
|
||||
void process(const uint64_t pulse_id,
|
||||
const int i_module,
|
||||
const BufferBinaryFormat* block_buffer);
|
||||
|
||||
void free_slot(const uint64_t pulse_id);
|
||||
|
||||
ImageMetadata* get_metadata_buffer(const uint64_t pulse_id);
|
||||
char* get_data_buffer(const uint64_t pulse_id);
|
||||
};
|
||||
|
||||
|
||||
#endif //SF_DAQ_BUFFER_LIVEIMAGEASSEMBLER_HPP
|
||||
@@ -1,9 +0,0 @@
|
||||
#include <cstddef>
|
||||
|
||||
namespace live_writer_config
|
||||
{
|
||||
// MS to retry reading from the image assembler.
|
||||
const size_t ASSEMBLER_RETRY_MS = 5;
|
||||
// Number of slots in the reconstruction buffer.
|
||||
const size_t WRITER_IA_N_SLOTS = 200;
|
||||
}
|
||||
@@ -1,102 +0,0 @@
|
||||
#include "BinaryReader.hpp"
|
||||
|
||||
#include <unistd.h>
|
||||
#include <sstream>
|
||||
#include <cstring>
|
||||
#include <fcntl.h>
|
||||
|
||||
#include "BufferUtils.hpp"
|
||||
#include "buffer_config.hpp"
|
||||
|
||||
using namespace std;
|
||||
using namespace buffer_config;
|
||||
|
||||
BinaryReader::BinaryReader(
|
||||
const std::string &detector_folder,
|
||||
const std::string &module_name) :
|
||||
detector_folder_(detector_folder),
|
||||
module_name_(module_name),
|
||||
current_input_file_(""),
|
||||
input_file_fd_(-1)
|
||||
{}
|
||||
|
||||
BinaryReader::~BinaryReader()
|
||||
{
|
||||
close_current_file();
|
||||
}
|
||||
|
||||
void BinaryReader::get_frame(
|
||||
const uint64_t pulse_id, BufferBinaryFormat* buffer)
|
||||
{
|
||||
|
||||
auto current_frame_file = BufferUtils::get_filename(
|
||||
detector_folder_, module_name_, pulse_id);
|
||||
|
||||
if (current_frame_file != current_input_file_) {
|
||||
open_file(current_frame_file);
|
||||
}
|
||||
|
||||
size_t file_index = BufferUtils::get_file_frame_index(pulse_id);
|
||||
size_t n_bytes_offset = file_index * sizeof(BufferBinaryFormat);
|
||||
|
||||
auto lseek_result = lseek(input_file_fd_, n_bytes_offset, SEEK_SET);
|
||||
if (lseek_result < 0) {
|
||||
stringstream err_msg;
|
||||
|
||||
err_msg << "[BinaryReader::get_frame]";
|
||||
err_msg << " Error while lseek on file ";
|
||||
err_msg << current_input_file_ << " for n_bytes_offset ";
|
||||
err_msg << n_bytes_offset << ": " << strerror(errno) << endl;
|
||||
|
||||
throw runtime_error(err_msg.str());
|
||||
}
|
||||
|
||||
auto n_bytes = ::read(input_file_fd_, buffer, sizeof(BufferBinaryFormat));
|
||||
|
||||
if (n_bytes < sizeof(BufferBinaryFormat)) {
|
||||
stringstream err_msg;
|
||||
|
||||
err_msg << "[BinaryReader::get_block]";
|
||||
err_msg << " Error while reading from file ";
|
||||
err_msg << current_input_file_ << ": " << strerror(errno) << endl;
|
||||
|
||||
throw runtime_error(err_msg.str());
|
||||
}
|
||||
}
|
||||
|
||||
void BinaryReader::open_file(const std::string& filename)
|
||||
{
|
||||
close_current_file();
|
||||
|
||||
input_file_fd_ = open(filename.c_str(), O_RDONLY);
|
||||
|
||||
if (input_file_fd_ < 0) {
|
||||
stringstream err_msg;
|
||||
|
||||
err_msg << "[BinaryReader::open_file]";
|
||||
err_msg << " Cannot open file " << filename << ": ";
|
||||
err_msg << strerror(errno) << endl;
|
||||
|
||||
throw runtime_error(err_msg.str());
|
||||
}
|
||||
|
||||
current_input_file_ = filename;
|
||||
}
|
||||
|
||||
void BinaryReader::close_current_file()
|
||||
{
|
||||
if (input_file_fd_ != -1) {
|
||||
if (close(input_file_fd_) < 0) {
|
||||
stringstream err_msg;
|
||||
|
||||
err_msg << "[BinaryWriter::close_current_file]";
|
||||
err_msg << " Error while closing file " << current_input_file_;
|
||||
err_msg << ": " << strerror(errno) << endl;
|
||||
|
||||
throw runtime_error(err_msg.str());
|
||||
}
|
||||
|
||||
input_file_fd_ = -1;
|
||||
current_input_file_ = "";
|
||||
}
|
||||
}
|
||||
@@ -1,133 +0,0 @@
|
||||
#include "JFH5LiveWriter.hpp"
|
||||
|
||||
#include <cstring>
|
||||
#include <hdf5_hl.h>
|
||||
|
||||
|
||||
#include "buffer_config.hpp"
|
||||
|
||||
using namespace std;
|
||||
using namespace buffer_config;
|
||||
|
||||
JFH5LiveWriter::JFH5LiveWriter(const string& output_file,
|
||||
const string& detector_folder,
|
||||
const size_t n_modules,
|
||||
const size_t n_pulses) :
|
||||
detector_name_(get_detector_name(detector_folder)),
|
||||
n_modules_(n_modules),
|
||||
n_pulses_(n_pulses),
|
||||
write_index_(0)
|
||||
{
|
||||
b_pulse_id_ = new uint64_t[n_pulses_];
|
||||
b_frame_index_= new uint64_t[n_pulses_];
|
||||
b_daq_rec_ = new uint32_t[n_pulses_];
|
||||
b_is_good_frame_ = new uint8_t[n_pulses_];
|
||||
|
||||
init_file(output_file);
|
||||
}
|
||||
|
||||
void JFH5LiveWriter::init_file(const string& output_file)
|
||||
{
|
||||
file_ = H5::H5File(output_file, H5F_ACC_TRUNC);
|
||||
file_.createGroup("/data");
|
||||
file_.createGroup("/data/" + detector_name_);
|
||||
|
||||
H5::DataSpace att_space(H5S_SCALAR);
|
||||
H5::DataType data_type = H5::StrType(0, H5T_VARIABLE);
|
||||
|
||||
file_.createGroup("/general");
|
||||
auto detector_dataset = file_.createDataSet(
|
||||
"/general/detector_name", data_type ,att_space);
|
||||
|
||||
detector_dataset.write(detector_name_, data_type);
|
||||
|
||||
hsize_t image_dataset_dims[3] =
|
||||
{n_pulses_, n_modules_ * MODULE_Y_SIZE, MODULE_X_SIZE};
|
||||
|
||||
H5::DataSpace image_dataspace(3, image_dataset_dims);
|
||||
|
||||
hsize_t image_dataset_chunking[3] =
|
||||
{1, n_modules_ * MODULE_Y_SIZE, MODULE_X_SIZE};
|
||||
H5::DSetCreatPropList image_dataset_properties;
|
||||
image_dataset_properties.setChunk(3, image_dataset_chunking);
|
||||
|
||||
image_dataset_ = file_.createDataSet(
|
||||
"/data/" + detector_name_ + "/data",
|
||||
H5_UINT16,
|
||||
image_dataspace,
|
||||
image_dataset_properties);
|
||||
}
|
||||
|
||||
|
||||
std::string JFH5LiveWriter::get_detector_name(const string& detector_folder)
|
||||
{
|
||||
size_t last_separator;
|
||||
if ((last_separator = detector_folder.rfind("/")) == string::npos) {
|
||||
return detector_folder;
|
||||
}
|
||||
|
||||
return detector_folder.substr(last_separator + 1);
|
||||
}
|
||||
|
||||
JFH5LiveWriter::~JFH5LiveWriter()
|
||||
{
|
||||
close_file();
|
||||
|
||||
delete[] b_pulse_id_;
|
||||
delete[] b_frame_index_;
|
||||
delete[] b_daq_rec_;
|
||||
delete[] b_is_good_frame_;
|
||||
}
|
||||
|
||||
void JFH5LiveWriter::write_dataset(
|
||||
const string name, const void* buffer, const H5::PredType& type)
|
||||
{
|
||||
hsize_t b_m_dims[] = {n_pulses_};
|
||||
H5::DataSpace b_m_space (1, b_m_dims);
|
||||
|
||||
hsize_t f_m_dims[] = {n_pulses_, 1};
|
||||
H5::DataSpace f_m_space(2, f_m_dims);
|
||||
|
||||
auto complete_name = "/data/" + detector_name_ + "/" + name;
|
||||
auto dataset = file_.createDataSet(complete_name, type, f_m_space);
|
||||
|
||||
dataset.write(buffer, type, b_m_space, f_m_space);
|
||||
|
||||
dataset.close();
|
||||
}
|
||||
|
||||
void JFH5LiveWriter::write_metadata()
|
||||
{
|
||||
write_dataset("pulse_id", &b_pulse_id_, H5_UINT64);
|
||||
write_dataset("frame_index", &b_frame_index_, H5_UINT64);
|
||||
write_dataset("daq_rec", &b_daq_rec_, H5_UINT32);
|
||||
write_dataset("is_good_frame", &b_is_good_frame_, H5_UINT8);
|
||||
}
|
||||
|
||||
void JFH5LiveWriter::close_file()
|
||||
{
|
||||
if (file_.getId() == -1) {
|
||||
return;
|
||||
}
|
||||
|
||||
image_dataset_.close();
|
||||
|
||||
write_metadata();
|
||||
|
||||
file_.close();
|
||||
}
|
||||
|
||||
void JFH5LiveWriter::write(const ImageMetadata* metadata, const char* data)
|
||||
{
|
||||
hsize_t offset[] = {write_index_, 0, 0};
|
||||
|
||||
H5DOwrite_chunk(image_dataset_.getId(), H5P_DEFAULT, 0,
|
||||
offset, MODULE_N_BYTES * n_modules_, data);
|
||||
|
||||
b_pulse_id_[write_index_] = metadata->pulse_id;
|
||||
b_frame_index_[write_index_] = metadata->frame_index;
|
||||
b_daq_rec_[write_index_] = metadata->daq_rec;
|
||||
b_is_good_frame_[write_index_] = metadata->is_good_image;
|
||||
|
||||
write_index_++;
|
||||
}
|
||||
@@ -1,159 +0,0 @@
|
||||
#include <cstring>
|
||||
|
||||
#include "LiveImageAssembler.hpp"
|
||||
#include "buffer_config.hpp"
|
||||
#include "live_writer_config.hpp"
|
||||
|
||||
using namespace std;
|
||||
using namespace buffer_config;
|
||||
using namespace live_writer_config;
|
||||
|
||||
LiveImageAssembler::LiveImageAssembler(const size_t n_modules) :
|
||||
n_modules_(n_modules),
|
||||
image_buffer_slot_n_bytes_(MODULE_N_BYTES * n_modules_)
|
||||
{
|
||||
image_buffer_ = new char[WRITER_IA_N_SLOTS * image_buffer_slot_n_bytes_];
|
||||
image_meta_buffer_ = new ImageMetadata[WRITER_IA_N_SLOTS];
|
||||
frame_meta_buffer_ = new ModuleFrame[WRITER_IA_N_SLOTS * n_modules];
|
||||
buffer_status_ = new atomic_int[WRITER_IA_N_SLOTS];
|
||||
buffer_pulse_id_ = new atomic_uint64_t[WRITER_IA_N_SLOTS];
|
||||
|
||||
for (size_t i=0; i < WRITER_IA_N_SLOTS; i++) {
|
||||
free_slot(i);
|
||||
}
|
||||
}
|
||||
|
||||
LiveImageAssembler::~LiveImageAssembler()
|
||||
{
|
||||
delete[] image_buffer_;
|
||||
delete[] image_meta_buffer_;
|
||||
}
|
||||
|
||||
bool LiveImageAssembler::is_slot_free(const uint64_t pulse_id)
|
||||
{
|
||||
auto slot_id = pulse_id % WRITER_IA_N_SLOTS;
|
||||
|
||||
uint64_t slot_pulse_id = IA_EMPTY_SLOT_VALUE;
|
||||
if (buffer_pulse_id_[slot_id].compare_exchange_strong(
|
||||
slot_pulse_id, pulse_id)) {
|
||||
return true;
|
||||
}
|
||||
|
||||
auto is_free = buffer_status_[slot_id].load(memory_order_relaxed) > 0;
|
||||
return is_free && (slot_pulse_id == pulse_id);
|
||||
}
|
||||
|
||||
bool LiveImageAssembler::is_slot_full(const uint64_t pulse_id)
|
||||
{
|
||||
auto slot_id = pulse_id % WRITER_IA_N_SLOTS;
|
||||
return buffer_status_[slot_id].load(memory_order_relaxed) == 0;
|
||||
}
|
||||
|
||||
size_t LiveImageAssembler::get_data_offset(
|
||||
const uint64_t slot_id, const int i_module)
|
||||
{
|
||||
size_t slot_i_offset = slot_id * image_buffer_slot_n_bytes_;
|
||||
size_t module_i_offset = i_module * MODULE_N_BYTES;
|
||||
|
||||
return slot_i_offset + module_i_offset;
|
||||
}
|
||||
|
||||
size_t LiveImageAssembler::get_frame_metadata_offset(
|
||||
const uint64_t slot_id, const int i_module)
|
||||
{
|
||||
size_t slot_m_offset = slot_id * n_modules_;
|
||||
size_t module_m_offset = i_module;
|
||||
|
||||
return slot_m_offset + module_m_offset;
|
||||
}
|
||||
|
||||
void LiveImageAssembler::process(
|
||||
const uint64_t pulse_id,
|
||||
const int i_module,
|
||||
const BufferBinaryFormat* file_buffer)
|
||||
{
|
||||
const auto slot_id = pulse_id % WRITER_IA_N_SLOTS;
|
||||
|
||||
auto frame_meta_offset = get_frame_metadata_offset(slot_id, i_module);
|
||||
auto image_offset = get_data_offset(slot_id, i_module);
|
||||
|
||||
memcpy(
|
||||
&(frame_meta_buffer_[frame_meta_offset]),
|
||||
&(file_buffer->metadata),
|
||||
sizeof(file_buffer->metadata));
|
||||
|
||||
memcpy(
|
||||
image_buffer_ + image_offset,
|
||||
&(file_buffer->data[0]),
|
||||
MODULE_N_BYTES);
|
||||
|
||||
buffer_status_[slot_id].fetch_sub(1, memory_order_relaxed);
|
||||
}
|
||||
|
||||
void LiveImageAssembler::free_slot(const uint64_t pulse_id)
|
||||
{
|
||||
auto slot_id = pulse_id % WRITER_IA_N_SLOTS;
|
||||
buffer_status_[slot_id].store(n_modules_, memory_order_relaxed);
|
||||
buffer_pulse_id_[slot_id].store(IA_EMPTY_SLOT_VALUE, memory_order_relaxed);
|
||||
}
|
||||
|
||||
ImageMetadata* LiveImageAssembler::get_metadata_buffer(const uint64_t pulse_id)
|
||||
{
|
||||
const auto slot_id = pulse_id % WRITER_IA_N_SLOTS;
|
||||
|
||||
ImageMetadata& image_meta = image_meta_buffer_[slot_id];
|
||||
|
||||
auto frame_meta_offset = get_frame_metadata_offset(slot_id, 0);
|
||||
|
||||
auto is_pulse_init = false;
|
||||
image_meta.is_good_image = 1;
|
||||
image_meta.pulse_id = 0;
|
||||
|
||||
for (size_t i_module=0; i_module < n_modules_; i_module++) {
|
||||
|
||||
auto& frame_meta = frame_meta_buffer_[frame_meta_offset];
|
||||
frame_meta_offset += 1;
|
||||
|
||||
auto is_good_frame =
|
||||
frame_meta.n_recv_packets == JF_N_PACKETS_PER_FRAME;
|
||||
|
||||
if (!is_good_frame) {
|
||||
image_meta.pulse_id = 0;
|
||||
continue;
|
||||
}
|
||||
|
||||
if (!is_pulse_init) {
|
||||
image_meta.pulse_id = frame_meta.pulse_id;
|
||||
image_meta.frame_index = frame_meta.frame_index;
|
||||
image_meta.daq_rec = frame_meta.daq_rec;
|
||||
|
||||
is_pulse_init = true;
|
||||
}
|
||||
|
||||
if (image_meta.is_good_image == 1) {
|
||||
if (frame_meta.pulse_id != image_meta.pulse_id) {
|
||||
image_meta.is_good_image = 0;
|
||||
}
|
||||
|
||||
if (frame_meta.frame_index != image_meta.frame_index) {
|
||||
image_meta.is_good_image = 0;
|
||||
}
|
||||
|
||||
if (frame_meta.daq_rec != image_meta.daq_rec) {
|
||||
image_meta.is_good_image = 0;
|
||||
}
|
||||
|
||||
if (frame_meta.n_recv_packets != JF_N_PACKETS_PER_FRAME) {
|
||||
image_meta.is_good_image = 0;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return &image_meta;
|
||||
}
|
||||
|
||||
char* LiveImageAssembler::get_data_buffer(const uint64_t pulse_id)
|
||||
{
|
||||
auto slot_id = pulse_id % WRITER_IA_N_SLOTS;
|
||||
return image_buffer_ + (slot_id * image_buffer_slot_n_bytes_);
|
||||
}
|
||||
@@ -1,195 +0,0 @@
|
||||
#include <iostream>
|
||||
#include <string>
|
||||
#include <thread>
|
||||
#include <chrono>
|
||||
#include <vector>
|
||||
|
||||
#include "zmq.h"
|
||||
#include "live_writer_config.hpp"
|
||||
#include "buffer_config.hpp"
|
||||
#include "bitshuffle/bitshuffle.h"
|
||||
#include "JFH5LiveWriter.hpp"
|
||||
#include "LiveImageAssembler.hpp"
|
||||
#include "BinaryReader.hpp"
|
||||
|
||||
using namespace std;
|
||||
using namespace chrono;
|
||||
using namespace buffer_config;
|
||||
using namespace live_writer_config;
|
||||
|
||||
void read_buffer(
|
||||
const string detector_folder,
|
||||
const string module_name,
|
||||
const int i_module,
|
||||
const vector<uint64_t>& pulse_ids_to_write,
|
||||
LiveImageAssembler& image_assembler,
|
||||
void* ctx)
|
||||
{
|
||||
BinaryReader reader(detector_folder, module_name);
|
||||
auto frame_buffer = new BufferBinaryFormat();
|
||||
|
||||
void* socket = zmq_socket(ctx, ZMQ_SUB);
|
||||
if (socket == nullptr) {
|
||||
throw runtime_error(zmq_strerror(errno));
|
||||
}
|
||||
|
||||
int rcvhwm = 100;
|
||||
if (zmq_setsockopt(socket, ZMQ_RCVHWM, &rcvhwm, sizeof(rcvhwm)) != 0) {
|
||||
throw runtime_error(zmq_strerror(errno));
|
||||
}
|
||||
|
||||
int linger = 0;
|
||||
if (zmq_setsockopt(socket, ZMQ_LINGER, &linger, sizeof(linger)) != 0) {
|
||||
throw runtime_error(zmq_strerror(errno));
|
||||
}
|
||||
|
||||
// In milliseconds.
|
||||
int rcvto = 2000;
|
||||
if (zmq_setsockopt(socket, ZMQ_RCVTIMEO, &rcvto, sizeof(rcvto)) != 0 ){
|
||||
throw runtime_error(zmq_strerror(errno));
|
||||
}
|
||||
|
||||
if (zmq_connect(socket, "tcp://127.0.0.1:51234") != 0) {
|
||||
throw runtime_error(zmq_strerror(errno));
|
||||
}
|
||||
|
||||
if (zmq_setsockopt(socket, ZMQ_SUBSCRIBE, "", 0) != 0) {
|
||||
throw runtime_error(zmq_strerror(errno));
|
||||
}
|
||||
|
||||
const uint64_t PULSE_ID_DELAY = 100;
|
||||
|
||||
uint64_t live_pulse_id = pulse_ids_to_write.front();
|
||||
for (uint64_t pulse_id:pulse_ids_to_write) {
|
||||
|
||||
while(!image_assembler.is_slot_free(pulse_id)) {
|
||||
this_thread::sleep_for(chrono::milliseconds(ASSEMBLER_RETRY_MS));
|
||||
}
|
||||
|
||||
auto start_time = steady_clock::now();
|
||||
|
||||
// Enforce a delay of 1 second for writing.
|
||||
while (live_pulse_id - pulse_id < PULSE_ID_DELAY) {
|
||||
if (zmq_recv(socket, &live_pulse_id,
|
||||
sizeof(live_pulse_id), 0) == -1) {
|
||||
if (errno == EAGAIN) {
|
||||
throw runtime_error("Did not receive pulse_id in time.");
|
||||
} else {
|
||||
throw runtime_error(zmq_strerror(errno));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
reader.get_frame(pulse_id, frame_buffer);
|
||||
|
||||
auto end_time = steady_clock::now();
|
||||
uint64_t read_us_duration = duration_cast<microseconds>(
|
||||
end_time-start_time).count();
|
||||
|
||||
start_time = steady_clock::now();
|
||||
|
||||
image_assembler.process(pulse_id, i_module, frame_buffer);
|
||||
|
||||
end_time = steady_clock::now();
|
||||
uint64_t compose_us_duration = duration_cast<microseconds>(
|
||||
end_time-start_time).count();
|
||||
|
||||
cout << "sf_writer:avg_read_us ";
|
||||
cout << read_us_duration / BUFFER_BLOCK_SIZE << endl;
|
||||
cout << "sf_writer:avg_assemble_us ";
|
||||
cout << compose_us_duration / BUFFER_BLOCK_SIZE << endl;
|
||||
}
|
||||
|
||||
delete frame_buffer;
|
||||
}
|
||||
|
||||
int main (int argc, char *argv[])
|
||||
{
|
||||
if (argc != 7) {
|
||||
cout << endl;
|
||||
cout << "Usage: sf_writer [output_file] [detector_folder] [n_modules]";
|
||||
cout << " [start_pulse_id] [n_pulses] [pulse_id_step]";
|
||||
cout << endl;
|
||||
cout << "\toutput_file: Complete path to the output file." << endl;
|
||||
cout << "\tdetector_folder: Absolute path to detector buffer." << endl;
|
||||
cout << "\tn_modules: number of modules" << endl;
|
||||
cout << "\tstart_pulse_id: Start pulse_id of retrieval." << endl;
|
||||
cout << "\tn_pulses: Number of pulses to write." << endl;
|
||||
cout << "\tpulse_id_step: 1==100Hz, 2==50hz, 4==25Hz.." << endl;
|
||||
cout << endl;
|
||||
|
||||
exit(-1);
|
||||
}
|
||||
|
||||
string output_file = string(argv[1]);
|
||||
const string detector_folder = string(argv[2]);
|
||||
size_t n_modules = atoi(argv[3]);
|
||||
uint64_t start_pulse_id = (uint64_t) atoll(argv[4]);
|
||||
size_t n_pulses = (size_t) atoll(argv[5]);
|
||||
int pulse_id_step = atoi(argv[6]);
|
||||
|
||||
std::vector<uint64_t> pulse_ids_to_write;
|
||||
uint64_t i_pulse_id = start_pulse_id;
|
||||
for (size_t i=0; i<n_pulses; i++) {
|
||||
pulse_ids_to_write.push_back(i_pulse_id);
|
||||
i_pulse_id += pulse_id_step;
|
||||
}
|
||||
|
||||
LiveImageAssembler image_assembler(n_modules);
|
||||
|
||||
auto ctx = zmq_ctx_new();
|
||||
zmq_ctx_set (ctx, ZMQ_IO_THREADS, 1);
|
||||
|
||||
std::vector<std::thread> reading_threads(n_modules);
|
||||
for (size_t i_module=0; i_module<n_modules; i_module++) {
|
||||
|
||||
// TODO: Very ugly. Fix.
|
||||
string module_name = "M";
|
||||
if (i_module < 10) {
|
||||
module_name += "0";
|
||||
}
|
||||
module_name += to_string(i_module);
|
||||
|
||||
reading_threads.emplace_back(
|
||||
read_buffer,
|
||||
detector_folder,
|
||||
module_name,
|
||||
i_module,
|
||||
ref(pulse_ids_to_write),
|
||||
ref(image_assembler),
|
||||
ctx);
|
||||
}
|
||||
|
||||
JFH5LiveWriter writer(output_file, detector_folder, n_modules, n_pulses);
|
||||
|
||||
for (uint64_t pulse_id:pulse_ids_to_write) {
|
||||
|
||||
while(!image_assembler.is_slot_full(pulse_id)) {
|
||||
this_thread::sleep_for(chrono::milliseconds(ASSEMBLER_RETRY_MS));
|
||||
}
|
||||
|
||||
auto metadata = image_assembler.get_metadata_buffer(pulse_id);
|
||||
auto data = image_assembler.get_data_buffer(pulse_id);
|
||||
|
||||
auto start_time = steady_clock::now();
|
||||
|
||||
writer.write(metadata, data);
|
||||
|
||||
auto end_time = steady_clock::now();
|
||||
auto write_us_duration = duration_cast<microseconds>(
|
||||
end_time-start_time).count();
|
||||
|
||||
image_assembler.free_slot(pulse_id);
|
||||
|
||||
cout << "sf_writer:avg_write_us ";
|
||||
cout << write_us_duration / BUFFER_BLOCK_SIZE << endl;
|
||||
}
|
||||
|
||||
for (auto& reading_thread : reading_threads) {
|
||||
if (reading_thread.joinable()) {
|
||||
reading_thread.join();
|
||||
}
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
@@ -1,10 +0,0 @@
|
||||
add_executable(jf-live-writer-tests main.cpp)
|
||||
|
||||
target_link_libraries(jf-live-writer-tests
|
||||
jf-live-writer-lib
|
||||
hdf5
|
||||
hdf5_hl
|
||||
hdf5_cpp
|
||||
zmq
|
||||
gtest
|
||||
)
|
||||
@@ -1,10 +0,0 @@
|
||||
#include "gtest/gtest.h"
|
||||
|
||||
#include "test_BinaryReader.cpp"
|
||||
|
||||
using namespace std;
|
||||
|
||||
int main(int argc, char **argv) {
|
||||
::testing::InitGoogleTest(&argc, argv);
|
||||
return RUN_ALL_TESTS();
|
||||
}
|
||||
@@ -1,10 +0,0 @@
|
||||
#include <BinaryReader.hpp>
|
||||
#include "gtest/gtest.h"
|
||||
|
||||
TEST(BinaryReader, basic_interaction) {
|
||||
// TODO: Write some real tests.
|
||||
auto detector_folder = "test_device";
|
||||
auto module_name = "M1";
|
||||
BinaryReader reader(detector_folder, module_name);
|
||||
}
|
||||
|
||||
@@ -59,27 +59,6 @@ ZmqLiveSender::ZmqLiveSender(
|
||||
throw runtime_error(zmq_strerror(errno));
|
||||
}
|
||||
}
|
||||
|
||||
if (false) {
|
||||
socket_pulse_ = zmq_socket(ctx, ZMQ_PUB);
|
||||
|
||||
if (zmq_bind(socket_pulse_, config.pulse_address.c_str()) != 0) {
|
||||
throw runtime_error(zmq_strerror(errno));
|
||||
}
|
||||
|
||||
const int sndhwm = PULSE_ZMQ_SNDHWM;
|
||||
if (zmq_setsockopt(
|
||||
socket_pulse_, ZMQ_SNDHWM, &sndhwm, sizeof(sndhwm)) != 0) {
|
||||
throw runtime_error(zmq_strerror(errno));
|
||||
}
|
||||
|
||||
const int linger = 0;
|
||||
if (zmq_setsockopt(
|
||||
socket_pulse_, ZMQ_LINGER, &linger, sizeof(linger)) != 0) {
|
||||
throw runtime_error(zmq_strerror(errno));
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
ZmqLiveSender::~ZmqLiveSender()
|
||||
@@ -121,10 +100,6 @@ void ZmqLiveSender::send(const ModuleFrameBuffer *meta, const char *data)
|
||||
}
|
||||
}
|
||||
|
||||
// if(zmq_send(socket_pulse_, &pulse_id, sizeof(pulse_id), 0) == -1) {
|
||||
// throw runtime_error(zmq_strerror(errno));
|
||||
// }
|
||||
|
||||
// TODO: Here we need to send to streamvis and live analysis metadata(probably need to operate still on them) and data(not every frame)
|
||||
|
||||
header.AddMember("frame", frame_index, header_alloc);
|
||||
|
||||
Reference in New Issue
Block a user