Merge branch 'ram_buffer'

# Conflicts:
#	CMakeLists.txt
#	jf-assembler/test/main.cpp
#	jf-live-writer/CMakeLists.txt
#	jf-live-writer/include/live_writer_config.hpp
#	jf-live-writer/src/main.cpp
#	jf-live-writer/test/CMakeLists.txt
#	jf-live-writer/test/main.cpp
#	scripts/JF01-buffer-worker.sh
#	scripts/JF02-buffer-worker.sh
#	scripts/JF06-buffer-worker.sh
#	scripts/JF06_4M-buffer-worker.sh
#	sf-stream/include/stream_config.hpp
#	sf-stream/src/ZmqLiveSender.cpp
This commit is contained in:
2021-02-26 12:32:36 +01:00
76 changed files with 2241 additions and 620 deletions
+62
View File
@@ -0,0 +1,62 @@
#include "StreamStats.hpp"
#include <iostream>
using namespace std;
using namespace chrono;
StreamStats::StreamStats(
const std::string &detector_name,
const std::string &stream_name,
const size_t stats_modulo) :
detector_name_(detector_name),
stream_name_(stream_name),
stats_modulo_(stats_modulo)
{
reset_counters();
}
void StreamStats::reset_counters()
{
image_counter_ = 0;
n_corrupted_images_ = 0;
stats_interval_start_ = steady_clock::now();
}
void StreamStats::record_stats(
const ImageMetadata &meta)
{
image_counter_++;
if (!meta.is_good_image) {
n_corrupted_images_++;
}
if (image_counter_ == stats_modulo_) {
print_stats();
reset_counters();
}
}
void StreamStats::print_stats()
{
auto interval_ms_duration = duration_cast<milliseconds>(
steady_clock::now()-stats_interval_start_).count();
// * 1000 because milliseconds, + 250 because of truncation.
int rep_rate = ((image_counter_ * 1000) + 250) / interval_ms_duration;
uint64_t timestamp = time_point_cast<nanoseconds>(
system_clock::now()).time_since_epoch().count();
// Output in InfluxDB line protocol
cout << "sf_stream";
cout << ",detector_name=" << detector_name_;
cout << ",stream_name=" << stream_name_;
cout << " ";
cout << "n_processed_images=" << image_counter_ << "i";
cout << ",n_corrupted_images=" << n_corrupted_images_ << "i";
cout << ",repetition_rate=" << rep_rate << "i";
cout << " ";
cout << timestamp;
cout << endl;
}
-171
View File
@@ -1,171 +0,0 @@
#include "ZmqLiveReceiver.hpp"
#include <zmq.h>
#include <stdexcept>
#include <sstream>
#include <chrono>
#include "buffer_config.hpp"
#include "stream_config.hpp"
using namespace std;
using namespace chrono;
using namespace buffer_config;
using namespace stream_config;
ZmqLiveReceiver::ZmqLiveReceiver(
const size_t n_modules,
void* ctx,
const std::string &ipc_prefix) :
n_modules_(n_modules),
ctx_(ctx),
ipc_prefix_(ipc_prefix),
sockets_(n_modules)
{
for (size_t i = 0; i < n_modules_; i++) {
sockets_[i] = connect_socket(i);
}
}
ZmqLiveReceiver::~ZmqLiveReceiver()
{
for (auto& socket:sockets_) {
zmq_close(socket);
}
}
void* ZmqLiveReceiver::connect_socket(size_t module_id)
{
void* socket = zmq_socket(ctx_, ZMQ_SUB);
if (socket == nullptr) {
throw runtime_error(zmq_strerror(errno));
}
int rcvhwm = STREAM_RCVHWM;
if (zmq_setsockopt(socket, ZMQ_RCVHWM, &rcvhwm, sizeof(rcvhwm)) != 0) {
throw runtime_error(zmq_strerror(errno));
}
int linger = 0;
if (zmq_setsockopt(socket, ZMQ_LINGER, &linger, sizeof(linger)) != 0) {
throw runtime_error(zmq_strerror(errno));
}
stringstream ipc_addr;
ipc_addr << ipc_prefix_ << module_id;
const auto ipc = ipc_addr.str();
if (zmq_connect(socket, ipc.c_str()) != 0) {
throw runtime_error(zmq_strerror(errno));
}
if (zmq_setsockopt(socket, ZMQ_SUBSCRIBE, "", 0) != 0) {
throw runtime_error(zmq_strerror(errno));
}
return socket;
}
void ZmqLiveReceiver::recv_single_module(
void* socket, ModuleFrame* meta, char* data)
{
auto n_bytes_meta = zmq_recv(socket, meta, sizeof(ModuleFrame), 0);
if (n_bytes_meta == -1) {
throw runtime_error(zmq_strerror(errno));
}
if (n_bytes_meta != sizeof(ModuleFrame)) {
throw runtime_error("Stream header of wrong size.");
}
if (meta->pulse_id == 0) {
throw runtime_error("Received invalid pulse_id=0.");
}
auto n_bytes_frame = zmq_recv(socket, data, MODULE_N_BYTES, 0);
if (n_bytes_frame == -1) {
throw runtime_error(zmq_strerror(errno));
}
if (n_bytes_frame != MODULE_N_BYTES) {
throw runtime_error("Stream data of wrong size.");
}
}
uint64_t ZmqLiveReceiver::align_modules(ModuleFrameBuffer *meta, char *data)
{
uint64_t max_pulse_id = 0;
uint64_t min_pulse_id = numeric_limits<uint64_t>::max();
// First pass - determine current min and max pulse_id.
for (size_t i_module = 0; i_module < n_modules_; i_module++) {
auto& module_meta = meta->module[i_module];
min_pulse_id = min(min_pulse_id, module_meta.pulse_id);
max_pulse_id = max(max_pulse_id, module_meta.pulse_id);
}
auto max_diff = max_pulse_id - min_pulse_id;
if (max_diff > PULSE_OFFSET_LIMIT) {
stringstream err_msg;
err_msg << "[ZmqLiveReceiver::align_modules]";
err_msg << " PULSE_OFFSET_LIMIT exceeded.";
err_msg << " Modules out of sync for " << max_diff << " pulses.";
for (size_t i_module = 0; i_module < n_modules_; i_module++) {
auto& module_meta = meta->module[i_module];
err_msg << " (" << module_meta.module_id << ", ";
err_msg << module_meta.pulse_id << "),";
}
err_msg << endl;
throw runtime_error(err_msg.str());
}
// Second pass - align all receivers to max_pulse_id.
for (size_t i_module = 0; i_module < n_modules_; i_module++) {
auto& module_meta = meta->module[i_module];
while (module_meta.pulse_id < max_pulse_id) {
recv_single_module(
sockets_[i_module],
&module_meta,
data + (MODULE_N_BYTES * i_module));
}
if (module_meta.pulse_id != max_pulse_id) {
throw runtime_error("Cannot align pulse_ids.");
}
}
return max_pulse_id - min_pulse_id;
}
uint64_t ZmqLiveReceiver::get_next_image(ModuleFrameBuffer* meta, char* data)
{
uint64_t frame_pulse_id;
bool sync_needed = false;
for (size_t i_module = 0; i_module < n_modules_; i_module++) {
auto& module_meta = meta->module[i_module];
char* buffer = data + (MODULE_N_BYTES * i_module);
recv_single_module(sockets_[i_module], &module_meta, buffer);
if (i_module == 0) {
frame_pulse_id = module_meta.pulse_id;
} else if (frame_pulse_id != module_meta.pulse_id) {
sync_needed = true;
}
}
if (sync_needed) {
auto lost_pulses = align_modules(meta, data);
return lost_pulses;
}
return 0;
}
+11 -57
View File
@@ -3,35 +3,18 @@
#include "zmq.h"
#include <stdexcept>
#include <rapidjson/document.h>
#include <rapidjson/stringbuffer.h>
#include <rapidjson/writer.h>
#include <iostream>
//
using namespace std;
using namespace stream_config;
LiveStreamConfig read_json_config(const std::string filename)
{
std::ifstream ifs(filename);
rapidjson::IStreamWrapper isw(ifs);
rapidjson::Document config_parameters;
config_parameters.ParseStream(isw);
return {
config_parameters["streamvis_stream"].GetString(),
config_parameters["streamvis_rate"].GetInt(),
config_parameters["live_stream"].GetString(),
config_parameters["live_rate"].GetInt(),
config_parameters["pedestal_file"].GetString(),
config_parameters["gain_file"].GetString(),
config_parameters["detector_name"].GetString(),
config_parameters["n_modules"].GetInt(),
"tcp://127.0.0.1:51234"
};
}
ZmqLiveSender::ZmqLiveSender(
void* ctx,
const LiveStreamConfig& config) :
const BufferUtils::DetectorConfig& config) :
ctx_(ctx),
config_(config)
{
@@ -69,7 +52,7 @@ ZmqLiveSender::~ZmqLiveSender()
zmq_close(socket_live_);
}
void ZmqLiveSender::send(const ModuleFrameBuffer *meta, const char *data)
void ZmqLiveSender::send(const ImageMetadata& meta, const char *data)
{
uint16_t data_empty [] = { 0, 0, 0, 0};
@@ -77,40 +60,11 @@ void ZmqLiveSender::send(const ModuleFrameBuffer *meta, const char *data)
auto& header_alloc = header.GetAllocator();
string text_header;
uint64_t pulse_id = 0;
uint64_t frame_index = 0;
uint64_t daq_rec = 0;
bool is_good_frame = true;
for (size_t i_module = 0; i_module < config_.n_modules; i_module++) {
// TODO: Place this tests in the appropriate spot.
auto& module_metadata = meta->module[i_module];
if (i_module == 0) {
pulse_id = module_metadata.pulse_id;
frame_index = module_metadata.frame_index;
daq_rec = module_metadata.daq_rec;
if (module_metadata.n_recv_packets != 128 ) is_good_frame = false;
} else {
if (module_metadata.pulse_id != pulse_id) is_good_frame = false;
if (module_metadata.frame_index != frame_index) is_good_frame = false;
if (module_metadata.daq_rec != daq_rec) is_good_frame = false;
if (module_metadata.n_recv_packets != 128 ) is_good_frame = false;
}
if (pulse_id % 10000 == 0 && is_good_frame != true) {
cout << "Frame is not good " << pulse_id << " module : " << i_module << " frame_index(0) : " << frame_index << " frame_index : " << module_metadata.frame_index << endl;
}
}
// TODO: Here we need to send to streamvis and live analysis metadata(probably need to operate still on them) and data(not every frame)
header.AddMember("frame", frame_index, header_alloc);
header.AddMember("is_good_frame", is_good_frame, header_alloc);
header.AddMember("daq_rec", daq_rec, header_alloc);
header.AddMember("pulse_id", pulse_id, header_alloc);
header.AddMember("frame", meta.frame_index, header_alloc);
header.AddMember("is_good_frame", meta.is_good_image, header_alloc);
header.AddMember("daq_rec", meta.daq_rec, header_alloc);
header.AddMember("pulse_id", meta.pulse_id, header_alloc);
rapidjson::Value pedestal_file;
pedestal_file.SetString(config_.PEDE_FILENAME.c_str(), header_alloc);
@@ -124,12 +78,12 @@ void ZmqLiveSender::send(const ModuleFrameBuffer *meta, const char *data)
rapidjson::Value run_name;
run_name.SetString(
to_string(uint64_t(pulse_id/10000)*10000).c_str(),
to_string(uint64_t(meta.pulse_id/10000)*10000).c_str(),
header_alloc);
header.AddMember("run_name", run_name, header_alloc);
rapidjson::Value detector_name;
detector_name.SetString(config_.DETECTOR_NAME.c_str(), header_alloc);
detector_name.SetString(config_.detector_name.c_str(), header_alloc);
header.AddMember("detector_name", detector_name, header_alloc);
header.AddMember("htype", "array-1.0", header_alloc);
+115
View File
@@ -0,0 +1,115 @@
#include "ZmqPulseSyncReceiver.hpp"
#include "BufferUtils.hpp"
#include <zmq.h>
#include <stdexcept>
#include <sstream>
#include <chrono>
#include <algorithm>
#include <iostream>
#include "stream_config.hpp"
using namespace std;
using namespace chrono;
using namespace buffer_config;
using namespace stream_config;
ZmqPulseSyncReceiver::ZmqPulseSyncReceiver(
void * ctx,
const string& detector_name,
const int n_modules) :
ctx_(ctx),
n_modules_(n_modules)
{
sockets_.reserve(n_modules_);
for (int i=0; i<n_modules_; i++) {
sockets_.push_back(
BufferUtils::connect_socket(ctx_, detector_name, to_string(i)));
}
}
ZmqPulseSyncReceiver::~ZmqPulseSyncReceiver()
{
for (auto& socket:sockets_) {
zmq_close(socket);
}
}
PulseAndSync ZmqPulseSyncReceiver::get_next_pulse_id() const
{
uint64_t pulses[n_modules_];
bool modules_in_sync = true;
for (int i = 0; i < n_modules_; i++) {
zmq_recv(sockets_[i], &pulses[i], sizeof(uint64_t), 0);
if (pulses[0] != pulses[i]) {
modules_in_sync = false;
}
}
if (modules_in_sync) {
return {pulses[0], 0};
}
// How many pulses we lost in total to get the next pulse_id.
uint32_t n_lost_pulses = 0;
for (int i_sync=0; i_sync < SYNC_RETRY_LIMIT; i_sync++) {
uint64_t min_pulse_id = numeric_limits<uint64_t>::max();;
uint64_t max_pulse_id = 0;
for (int i = 0; i < n_modules_; i++) {
min_pulse_id = min(min_pulse_id, pulses[i]);
max_pulse_id = max(max_pulse_id, pulses[i]);
}
auto max_diff = max_pulse_id - min_pulse_id;
if (max_diff > PULSE_OFFSET_LIMIT) {
stringstream err_msg;
err_msg << "[ZmqPulseSyncReceiver::get_next_pulse_id]";
err_msg << " PULSE_OFFSET_LIMIT exceeded.";
err_msg << " max_diff=" << max_diff << " pulses.";
for (int i = 0; i < n_modules_; i++) {
err_msg << " (module " << i << ", ";
err_msg << pulses[i] << "),";
}
err_msg << endl;
throw runtime_error(err_msg.str());
}
modules_in_sync = true;
// Max pulses we lost in this sync attempt.
uint32_t i_sync_lost_pulses = 0;
for (int i = 0; i < n_modules_; i++) {
// How many pulses we lost for this specific module.
uint32_t i_module_lost_pulses = 0;
while (pulses[i] < max_pulse_id) {
zmq_recv(sockets_[i], &pulses[i], sizeof(uint64_t), 0);
i_module_lost_pulses++;
}
i_sync_lost_pulses = max(i_sync_lost_pulses, i_module_lost_pulses);
if (pulses[i] != max_pulse_id) {
modules_in_sync = false;
}
}
n_lost_pulses += i_sync_lost_pulses;
if (modules_in_sync) {
return {pulses[0], n_lost_pulses};
}
}
stringstream err_msg;
err_msg << "[ZmqLiveReceiver::get_next_pulse_id]";
err_msg << " SYNC_RETRY_LIMIT exceeded.";
err_msg << endl;
throw runtime_error(err_msg.str());
}
+19 -65
View File
@@ -1,95 +1,49 @@
#include <iostream>
#include <string>
#include <chrono>
#include <cstring>
#include <zmq.h>
#include <RamBuffer.hpp>
#include <BufferUtils.hpp>
#include <StreamStats.hpp>
#include "buffer_config.hpp"
#include "stream_config.hpp"
#include "ZmqLiveSender.hpp"
#include "ZmqLiveReceiver.hpp"
using namespace std;
using namespace chrono;
using namespace buffer_config;
using namespace stream_config;
int main (int argc, char *argv[])
{
if (argc != 2) {
if (argc != 3) {
cout << endl;
cout << "Usage: sf_stream ";
cout << " [config_json_file]";
cout << endl;
cout << "\tconfig_json_file: json file with the configuration "
"parameters(detector name, number of modules, pedestal and "
"gain files" << endl;
cout << "Usage: sf_stream [detector_json_filename]"
" [stream_name]" << endl;
cout << "\tdetector_json_filename: detector config file path." << endl;
cout << endl;
exit(-1);
}
auto config = read_json_config(string(argv[1]));
string RECV_IPC_URL = BUFFER_LIVE_IPC_URL + config.DETECTOR_NAME + "-";
ModuleFrameBuffer* meta = new ModuleFrameBuffer();
char* data = new char[config.n_modules * MODULE_N_BYTES];
const auto stream_name = string(argv[2]);
// TODO: Add stream_name to config reading - multiple stream definitions.
auto config = BufferUtils::read_json_config(string(argv[1]));
auto ctx = zmq_ctx_new();
zmq_ctx_set (ctx, ZMQ_IO_THREADS, STREAM_ZMQ_IO_THREADS);
zmq_ctx_set(ctx, ZMQ_IO_THREADS, STREAM_ZMQ_IO_THREADS);
auto receiver = BufferUtils::connect_socket(
ctx, config.detector_name, "assembler");
ZmqLiveReceiver receiver(config.n_modules, ctx, RECV_IPC_URL);
RamBuffer ram_buffer(config.detector_name, config.n_modules);
StreamStats stats(config.detector_name, stream_name, STREAM_STATS_MODULO);
ZmqLiveSender sender(ctx, config);
// TODO: Remove stats trash.
int stats_counter = 0;
size_t read_total_us = 0;
size_t read_max_us = 0;
size_t send_total_us = 0;
size_t send_max_us = 0;
ImageMetadata meta;
while (true) {
auto start_time = steady_clock::now();
auto n_lost_pulses = receiver.get_next_image(meta, data);
if (n_lost_pulses > 0) {
cout << "sf_stream:sync_lost_pulses " << n_lost_pulses << endl;
}
auto end_time = steady_clock::now();
size_t read_us_duration = duration_cast<microseconds>(
end_time - start_time).count();
start_time = steady_clock::now();
zmq_recv(receiver, &meta, sizeof(meta), 0);
char* data = ram_buffer.read_image(meta.pulse_id);
sender.send(meta, data);
end_time = steady_clock::now();
size_t send_us_duration = duration_cast<microseconds>(
end_time - start_time).count();
// TODO: Some poor statistics.
stats_counter++;
read_total_us += read_us_duration;
send_total_us += send_us_duration;
read_max_us = max(read_max_us, read_us_duration);
send_max_us = max(send_max_us, send_us_duration);
if (stats_counter == STATS_MODULO) {
cout << "sf_stream:read_us " << read_total_us / STATS_MODULO;
cout << " sf_stream:read_max_us " << read_max_us;
cout << " sf_stream:send_us " << send_total_us / STATS_MODULO;
cout << " sf_stream:send_max_us " << send_max_us;
cout << endl;
stats_counter = 0;
read_total_us = 0;
read_max_us = 0;
send_total_us = 0;
send_max_us = 0;
}
stats.record_stats(meta);
}
}