From 69d13fc3aa65c4802609f81d48c8b205610bc9f4 Mon Sep 17 00:00:00 2001 From: Andrej Babic Date: Tue, 9 Jun 2020 14:03:12 +0200 Subject: [PATCH 1/9] Extract stream sending logic to class --- sf-stream/src/main.cpp | 227 ++++++----------------------------------- 1 file changed, 32 insertions(+), 195 deletions(-) diff --git a/sf-stream/src/main.cpp b/sf-stream/src/main.cpp index 83d3f9d..2a48505 100644 --- a/sf-stream/src/main.cpp +++ b/sf-stream/src/main.cpp @@ -1,22 +1,17 @@ #include -#include #include -#include #include #include #include -#include "rapidjson/istreamwrapper.h" -#include -#include "rapidjson/document.h" -#include "rapidjson/stringbuffer.h" -#include "rapidjson/writer.h" #include "FastQueue.hpp" #include "LiveRecvModule.hpp" #include "buffer_config.hpp" #include "stream_config.hpp" +#include "ZmqLiveSender.hpp" using namespace std; +using namespace chrono; using namespace buffer_config; using namespace stream_config; @@ -27,239 +22,81 @@ int main (int argc, char *argv[]) cout << "Usage: sf_stream "; cout << " [config_json_file]"; cout << endl; - cout << "\tconfig_json_file: json file with the configuration parameters(detector name, number of modules, pedestal and gain files" << endl; + cout << "\tconfig_json_file: json file with the configuration " + "parameters(detector name, number of modules, pedestal and " + "gain files" << endl; cout << endl; exit(-1); } - string config_json_file = string(argv[1]); - - ifstream ifs(config_json_file); - rapidjson::IStreamWrapper isw(ifs); - rapidjson::Document config_parameters; - config_parameters.ParseStream(isw); - - string streamvis_address = config_parameters["streamvis_stream"].GetString(); - int reduction_factor_streamvis = config_parameters["streamvis_rate"].GetInt(); - string live_analysis_address = config_parameters["live_stream"].GetString(); - int reduction_factor_live_analysis = config_parameters["live_rate"].GetInt(); - - const string PEDE_FILENAME = config_parameters["pedestal_file"].GetString(); - const string GAIN_FILENAME = config_parameters["gain_file"].GetString(); - const string DETECTOR_NAME = config_parameters["detector_name"].GetString(); - size_t n_modules = config_parameters["n_modules"].GetInt(); + auto config = read_json_config(string(argv[1])); + string RECV_IPC_URL = BUFFER_LIVE_IPC_URL + config.DETECTOR_NAME + "-"; FastQueue queue( - n_modules * MODULE_N_BYTES, STREAM_FASTQUEUE_SLOTS); + config.n_modules * MODULE_N_BYTES, STREAM_FASTQUEUE_SLOTS); auto ctx = zmq_ctx_new(); zmq_ctx_set (ctx, ZMQ_IO_THREADS, STREAM_ZMQ_IO_THREADS); - const string LIVE_IPC_URL = BUFFER_LIVE_IPC_URL+DETECTOR_NAME + "-"; - ZmqLiveReceiver receiver(n_modules, ctx, LIVE_IPC_URL); - + ZmqLiveReceiver receiver(config.n_modules, ctx, RECV_IPC_URL); LiveRecvModule recv_module(queue, receiver); - // 0mq sockets to streamvis and live analysis - void *socket_streamvis = zmq_socket(ctx, ZMQ_PUB); - if (zmq_bind(socket_streamvis, streamvis_address.c_str()) != 0) { - throw runtime_error(strerror(errno)); - } - void *socket_live = zmq_socket(ctx, ZMQ_PUB); - if (zmq_bind(socket_live, live_analysis_address.c_str()) != 0) { - throw runtime_error(strerror(errno)); - } - - uint16_t data_empty [] = { 0, 0, 0, 0}; + ZmqLiveSender sender(ctx, config); // TODO: Remove stats trash. int stats_counter = 0; - size_t read_total_us = 0; size_t read_max_us = 0; + size_t send_total_us = 0; + size_t send_max_us = 0; while (true) { - rapidjson::Document header(rapidjson::kObjectType); - auto& header_alloc = header.GetAllocator(); - string text_header; + auto start_time = steady_clock::now(); - auto start_time = chrono::steady_clock::now(); - - auto slot_id = queue.read(); - - if(slot_id == -1) { - this_thread::sleep_for(chrono::milliseconds( - buffer_config::RB_READ_RETRY_INTERVAL_MS)); - continue; + int slot_id; + while((slot_id = queue.read()) == -1) { + this_thread::sleep_for(milliseconds(RB_READ_RETRY_INTERVAL_MS)); } - auto metadata = queue.get_metadata_buffer(slot_id); + auto meta = queue.get_metadata_buffer(slot_id); auto data = queue.get_data_buffer(slot_id); - auto read_end_time = chrono::steady_clock::now(); - auto read_us_duration = chrono::duration_cast( - read_end_time-start_time).count(); + auto end_time = steady_clock::now(); + size_t read_us_duration = duration_cast( + end_time - start_time).count(); - uint64_t pulse_id = 0; - uint64_t frame_index = 0; - uint64_t daq_rec = 0; - bool is_good_frame = true; + start_time = steady_clock::now(); - for (size_t i_module = 0; i_module < n_modules; i_module++) { - // TODO: Place this tests in the appropriate spot. - auto& module_metadata = metadata->module[i_module]; - if (i_module == 0) { - pulse_id = module_metadata.pulse_id; - frame_index = module_metadata.frame_index; - daq_rec = module_metadata.daq_rec; + sender.send(meta, data); - if (module_metadata.n_recv_packets != 128 ) is_good_frame = false; - } else { - if (module_metadata.pulse_id != pulse_id) is_good_frame = false; - - if (module_metadata.frame_index != frame_index) is_good_frame = false; - - if (module_metadata.daq_rec != daq_rec) is_good_frame = false; - - if (module_metadata.n_recv_packets != 128 ) is_good_frame = false; - } - } - - // TODO: Here we need to send to streamvis and live analysis metadata(probably need to operate still on them) and data(not every frame) - - header.AddMember("frame", frame_index, header_alloc); - header.AddMember("is_good_frame", is_good_frame, header_alloc); - header.AddMember("daq_rec", daq_rec, header_alloc); - header.AddMember("pulse_id", pulse_id, header_alloc); - - rapidjson::Value pedestal_file; - pedestal_file.SetString(PEDE_FILENAME.c_str(), header_alloc); - header.AddMember("pedestal_file", pedestal_file, header_alloc); - - rapidjson::Value gain_file; - gain_file.SetString(GAIN_FILENAME.c_str(), header_alloc); - header.AddMember("gain_file", gain_file, header_alloc); - - header.AddMember("number_frames_expected", 10000, header_alloc); - - rapidjson::Value run_name; - run_name.SetString( - to_string(uint64_t(pulse_id/10000)*10000).c_str(), - header_alloc); - header.AddMember("run_name", run_name, header_alloc); - - rapidjson::Value detector_name; - detector_name.SetString(DETECTOR_NAME.c_str(), header_alloc); - header.AddMember("detector_name", detector_name, header_alloc); - - header.AddMember("htype", "array-1.0", header_alloc); - header.AddMember("type", "uint16", header_alloc); - - // To be retrieved and filled with correct values down. - auto shape_value = rapidjson::Value(rapidjson::kArrayType); - shape_value.PushBack((uint64_t)0, header_alloc); - shape_value.PushBack((uint64_t)0, header_alloc); - header.AddMember("shape", shape_value, header_alloc); - - int send_streamvis = 0; - if ( reduction_factor_streamvis > 1 ) { - send_streamvis = rand() % reduction_factor_streamvis; - } - if ( send_streamvis == 0 ) { - auto& shape = header["shape"]; - shape[0] = n_modules*512; - shape[1] = 1024; - } else{ - auto& shape = header["shape"]; - shape[0] = 2; - shape[1] = 2; - } - - { - rapidjson::StringBuffer buffer; - rapidjson::Writer writer(buffer); - header.Accept(writer); - - text_header = buffer.GetString(); - } - - zmq_send(socket_streamvis, - text_header.c_str(), - text_header.size(), - ZMQ_SNDMORE); - - if ( send_streamvis == 0 ) { - zmq_send(socket_streamvis, - (char*)data, - buffer_config::MODULE_N_BYTES * n_modules, - 0); - } else { - zmq_send(socket_streamvis, - (char*)data_empty, - 8, - 0); - } - - //same for live analysis - int send_live_analysis = 0; - if ( reduction_factor_live_analysis > 1 ) { - send_live_analysis = rand() % reduction_factor_live_analysis; - } - if ( send_live_analysis == 0 ) { - auto& shape = header["shape"]; - shape[0] = n_modules*512; - shape[1] = 1024; - } else{ - auto& shape = header["shape"]; - shape[0] = 2; - shape[1] = 2; - } - - { - rapidjson::StringBuffer buffer; - rapidjson::Writer writer(buffer); - header.Accept(writer); - - text_header = buffer.GetString(); - } - - zmq_send(socket_live, - text_header.c_str(), - text_header.size(), - ZMQ_SNDMORE); - - if ( send_live_analysis == 0 ) { - zmq_send(socket_live, - (char*)data, - buffer_config::MODULE_N_BYTES * n_modules, - 0); - } else { - zmq_send(socket_live, - (char*)data_empty, - 8, - 0); - } + end_time = steady_clock::now(); + size_t send_us_duration = duration_cast( + end_time - start_time).count(); queue.release(); // TODO: Some poor statistics. stats_counter++; read_total_us += read_us_duration; + send_total_us += send_us_duration; - if (read_us_duration > read_max_us) { - read_max_us = read_us_duration; - } + read_max_us = max(read_max_us, read_us_duration); + send_max_us = max(send_max_us, send_us_duration); if (stats_counter == STATS_MODULO) { cout << "sf_stream:read_us " << read_total_us / STATS_MODULO; cout << " sf_stream:read_max_us " << read_max_us; + cout << "sf_stream:send_us " << send_total_us / STATS_MODULO; + cout << " sf_stream:send_max_us " << send_max_us; cout << endl; stats_counter = 0; read_total_us = 0; read_max_us = 0; + send_total_us = 0; + send_max_us = 0; } } } From 3a50aca2f193961453872044217805b16b4e2f42 Mon Sep 17 00:00:00 2001 From: Andrej Babic Date: Tue, 9 Jun 2020 14:03:31 +0200 Subject: [PATCH 2/9] Fix typo --- sf-stream/src/ZmqLiveReceiver.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sf-stream/src/ZmqLiveReceiver.cpp b/sf-stream/src/ZmqLiveReceiver.cpp index 7a943c9..f8811e6 100644 --- a/sf-stream/src/ZmqLiveReceiver.cpp +++ b/sf-stream/src/ZmqLiveReceiver.cpp @@ -16,10 +16,10 @@ using namespace stream_config; ZmqLiveReceiver::ZmqLiveReceiver( const size_t n_modules, - void *ctx, + void* ctx, const std::string &ipc_prefix) : n_modules_(n_modules), - ctx_(ctx_), + ctx_(ctx), ipc_prefix_(ipc_prefix), sockets_(n_modules) { From cdef4018625885975279064eede096d685481930 Mon Sep 17 00:00:00 2001 From: Andrej Babic Date: Tue, 9 Jun 2020 14:03:42 +0200 Subject: [PATCH 3/9] Extract sender from main --- sf-stream/src/ZmqLiveSender.cpp | 182 ++++++++++++++++++++++++++++++++ sf-stream/src/ZmqLiveSender.hpp | 60 +++++++++++ 2 files changed, 242 insertions(+) create mode 100644 sf-stream/src/ZmqLiveSender.cpp create mode 100644 sf-stream/src/ZmqLiveSender.hpp diff --git a/sf-stream/src/ZmqLiveSender.cpp b/sf-stream/src/ZmqLiveSender.cpp new file mode 100644 index 0000000..9ff32be --- /dev/null +++ b/sf-stream/src/ZmqLiveSender.cpp @@ -0,0 +1,182 @@ +#include "ZmqLiveSender.hpp" + +#include "zmq.h" +#include + +using namespace std; + +ZmqLiveSender::ZmqLiveSender( + void* ctx, + const LiveStreamConfig& config) : + ctx_(ctx), + config_(config) +{ + // TODO: Set also LINGER and SNDHWM. + + // 0mq sockets to streamvis and live analysis + socket_streamvis_ = zmq_socket(ctx, ZMQ_PUB); + if (zmq_bind(socket_streamvis_, config.streamvis_address.c_str()) != 0) { + throw runtime_error(zmq_strerror(errno)); + } + + socket_live_ = zmq_socket(ctx, ZMQ_PUB); + if (zmq_bind(socket_live_, config.live_analysis_address.c_str()) != 0) { + throw runtime_error(zmq_strerror(errno)); + } +} + +ZmqLiveSender::~ZmqLiveSender() +{ + zmq_close(socket_streamvis_); + zmq_close(socket_live_); +} + +void ZmqLiveSender::send(const ModuleFrameBuffer *meta, const char *data) +{ + uint16_t data_empty [] = { 0, 0, 0, 0}; + + rapidjson::Document header(rapidjson::kObjectType); + auto& header_alloc = header.GetAllocator(); + string text_header; + + uint64_t pulse_id = 0; + uint64_t frame_index = 0; + uint64_t daq_rec = 0; + bool is_good_frame = true; + + for (size_t i_module = 0; i_module < n_modules; i_module++) { + // TODO: Place this tests in the appropriate spot. + auto& module_metadata = metadata->module[i_module]; + if (i_module == 0) { + pulse_id = module_metadata.pulse_id; + frame_index = module_metadata.frame_index; + daq_rec = module_metadata.daq_rec; + + if (module_metadata.n_recv_packets != 128 ) is_good_frame = false; + } else { + if (module_metadata.pulse_id != pulse_id) is_good_frame = false; + + if (module_metadata.frame_index != frame_index) is_good_frame = false; + + if (module_metadata.daq_rec != daq_rec) is_good_frame = false; + + if (module_metadata.n_recv_packets != 128 ) is_good_frame = false; + } + } + + // TODO: Here we need to send to streamvis and live analysis metadata(probably need to operate still on them) and data(not every frame) + + header.AddMember("frame", frame_index, header_alloc); + header.AddMember("is_good_frame", is_good_frame, header_alloc); + header.AddMember("daq_rec", daq_rec, header_alloc); + header.AddMember("pulse_id", pulse_id, header_alloc); + + rapidjson::Value pedestal_file; + pedestal_file.SetString(PEDE_FILENAME.c_str(), header_alloc); + header.AddMember("pedestal_file", pedestal_file, header_alloc); + + rapidjson::Value gain_file; + gain_file.SetString(GAIN_FILENAME.c_str(), header_alloc); + header.AddMember("gain_file", gain_file, header_alloc); + + header.AddMember("number_frames_expected", 10000, header_alloc); + + rapidjson::Value run_name; + run_name.SetString( + to_string(uint64_t(pulse_id/10000)*10000).c_str(), + header_alloc); + header.AddMember("run_name", run_name, header_alloc); + + rapidjson::Value detector_name; + detector_name.SetString(DETECTOR_NAME.c_str(), header_alloc); + header.AddMember("detector_name", detector_name, header_alloc); + + header.AddMember("htype", "array-1.0", header_alloc); + header.AddMember("type", "uint16", header_alloc); + + // To be retrieved and filled with correct values down. + auto shape_value = rapidjson::Value(rapidjson::kArrayType); + shape_value.PushBack((uint64_t)0, header_alloc); + shape_value.PushBack((uint64_t)0, header_alloc); + header.AddMember("shape", shape_value, header_alloc); + + int send_streamvis = 0; + if ( reduction_factor_streamvis > 1 ) { + send_streamvis = rand() % reduction_factor_streamvis; + } + if ( send_streamvis == 0 ) { + auto& shape = header["shape"]; + shape[0] = n_modules*512; + shape[1] = 1024; + } else{ + auto& shape = header["shape"]; + shape[0] = 2; + shape[1] = 2; + } + + { + rapidjson::StringBuffer buffer; + rapidjson::Writer writer(buffer); + header.Accept(writer); + + text_header = buffer.GetString(); + } + + zmq_send(socket_streamvis, + text_header.c_str(), + text_header.size(), + ZMQ_SNDMORE); + + if ( send_streamvis == 0 ) { + zmq_send(socket_streamvis, + (char*)data, + buffer_config::MODULE_N_BYTES * n_modules, + 0); + } else { + zmq_send(socket_streamvis, + (char*)data_empty, + 8, + 0); + } + + //same for live analysis + int send_live_analysis = 0; + if ( reduction_factor_live_analysis > 1 ) { + send_live_analysis = rand() % reduction_factor_live_analysis; + } + if ( send_live_analysis == 0 ) { + auto& shape = header["shape"]; + shape[0] = n_modules*512; + shape[1] = 1024; + } else{ + auto& shape = header["shape"]; + shape[0] = 2; + shape[1] = 2; + } + + { + rapidjson::StringBuffer buffer; + rapidjson::Writer writer(buffer); + header.Accept(writer); + + text_header = buffer.GetString(); + } + + zmq_send(socket_live, + text_header.c_str(), + text_header.size(), + ZMQ_SNDMORE); + + if ( send_live_analysis == 0 ) { + zmq_send(socket_live, + (char*)data, + buffer_config::MODULE_N_BYTES * n_modules, + 0); + } else { + zmq_send(socket_live, + (char*)data_empty, + 8, + 0); + } +} + diff --git a/sf-stream/src/ZmqLiveSender.hpp b/sf-stream/src/ZmqLiveSender.hpp new file mode 100644 index 0000000..ba7d0a1 --- /dev/null +++ b/sf-stream/src/ZmqLiveSender.hpp @@ -0,0 +1,60 @@ +#ifndef SF_DAQ_BUFFER_ZMQLIVESENDER_HPP +#define SF_DAQ_BUFFER_ZMQLIVESENDER_HPP + +#include +#include +#include +#include +#include +#include + +#include "formats.hpp" + + +struct LiveStreamConfig { + const std::string streamvis_address; + const int reduction_factor_streamvis; + const std::string live_analysis_address; + const int reduction_factor_live_analysis; + const std::string PEDE_FILENAME; + const std::string GAIN_FILENAME; + const std::string DETECTOR_NAME; + const int n_modules; +}; + +LiveStreamConfig read_json_config(const std::string filename) +{ + std::ifstream ifs(filename); + rapidjson::IStreamWrapper isw(ifs); + rapidjson::Document config_parameters; + config_parameters.ParseStream(isw); + + return { + config_parameters["streamvis_stream"].GetString(), + config_parameters["streamvis_rate"].GetInt(), + config_parameters["live_stream"].GetString(), + config_parameters["live_rate"].GetInt(), + config_parameters["pedestal_file"].GetString(), + config_parameters["gain_file"].GetString(), + config_parameters["detector_name"].GetString(), + config_parameters["n_modules"].GetInt() + }; +} + +class ZmqLiveSender { + const void* ctx_; + const LiveStreamConfig config_; + + void* socket_streamvis_; + void* socket_live_; + +public: + ZmqLiveSender(void* ctx, + const LiveStreamConfig& config); + ~ZmqLiveSender(); + + void send(const ModuleFrameBuffer* meta, const char* data); +}; + + +#endif //SF_DAQ_BUFFER_ZMQLIVESENDER_HPP From c541da141bb971079062b3c461e79254fc591025 Mon Sep 17 00:00:00 2001 From: Andrej Babic Date: Tue, 9 Jun 2020 14:10:19 +0200 Subject: [PATCH 4/9] Fix changes once the class was extracted --- sf-stream/src/ZmqLiveSender.cpp | 38 ++++++++++++++++----------------- 1 file changed, 19 insertions(+), 19 deletions(-) diff --git a/sf-stream/src/ZmqLiveSender.cpp b/sf-stream/src/ZmqLiveSender.cpp index 9ff32be..bdf7c28 100644 --- a/sf-stream/src/ZmqLiveSender.cpp +++ b/sf-stream/src/ZmqLiveSender.cpp @@ -44,9 +44,9 @@ void ZmqLiveSender::send(const ModuleFrameBuffer *meta, const char *data) uint64_t daq_rec = 0; bool is_good_frame = true; - for (size_t i_module = 0; i_module < n_modules; i_module++) { + for (size_t i_module = 0; i_module < config_.n_modules; i_module++) { // TODO: Place this tests in the appropriate spot. - auto& module_metadata = metadata->module[i_module]; + auto& module_metadata = meta->module[i_module]; if (i_module == 0) { pulse_id = module_metadata.pulse_id; frame_index = module_metadata.frame_index; @@ -72,11 +72,11 @@ void ZmqLiveSender::send(const ModuleFrameBuffer *meta, const char *data) header.AddMember("pulse_id", pulse_id, header_alloc); rapidjson::Value pedestal_file; - pedestal_file.SetString(PEDE_FILENAME.c_str(), header_alloc); + pedestal_file.SetString(config_.PEDE_FILENAME.c_str(), header_alloc); header.AddMember("pedestal_file", pedestal_file, header_alloc); rapidjson::Value gain_file; - gain_file.SetString(GAIN_FILENAME.c_str(), header_alloc); + gain_file.SetString(config_.GAIN_FILENAME.c_str(), header_alloc); header.AddMember("gain_file", gain_file, header_alloc); header.AddMember("number_frames_expected", 10000, header_alloc); @@ -88,7 +88,7 @@ void ZmqLiveSender::send(const ModuleFrameBuffer *meta, const char *data) header.AddMember("run_name", run_name, header_alloc); rapidjson::Value detector_name; - detector_name.SetString(DETECTOR_NAME.c_str(), header_alloc); + detector_name.SetString(config_.DETECTOR_NAME.c_str(), header_alloc); header.AddMember("detector_name", detector_name, header_alloc); header.AddMember("htype", "array-1.0", header_alloc); @@ -101,12 +101,12 @@ void ZmqLiveSender::send(const ModuleFrameBuffer *meta, const char *data) header.AddMember("shape", shape_value, header_alloc); int send_streamvis = 0; - if ( reduction_factor_streamvis > 1 ) { - send_streamvis = rand() % reduction_factor_streamvis; + if ( config_.reduction_factor_streamvis > 1 ) { + send_streamvis = rand() % config_.reduction_factor_streamvis; } if ( send_streamvis == 0 ) { auto& shape = header["shape"]; - shape[0] = n_modules*512; + shape[0] = config_.n_modules*512; shape[1] = 1024; } else{ auto& shape = header["shape"]; @@ -122,18 +122,18 @@ void ZmqLiveSender::send(const ModuleFrameBuffer *meta, const char *data) text_header = buffer.GetString(); } - zmq_send(socket_streamvis, + zmq_send(socket_streamvis_, text_header.c_str(), text_header.size(), ZMQ_SNDMORE); if ( send_streamvis == 0 ) { - zmq_send(socket_streamvis, + zmq_send(socket_streamvis_, (char*)data, - buffer_config::MODULE_N_BYTES * n_modules, + buffer_config::MODULE_N_BYTES * config_.n_modules, 0); } else { - zmq_send(socket_streamvis, + zmq_send(socket_streamvis_, (char*)data_empty, 8, 0); @@ -141,12 +141,12 @@ void ZmqLiveSender::send(const ModuleFrameBuffer *meta, const char *data) //same for live analysis int send_live_analysis = 0; - if ( reduction_factor_live_analysis > 1 ) { - send_live_analysis = rand() % reduction_factor_live_analysis; + if ( config_.reduction_factor_live_analysis > 1 ) { + send_live_analysis = rand() % config_.reduction_factor_live_analysis; } if ( send_live_analysis == 0 ) { auto& shape = header["shape"]; - shape[0] = n_modules*512; + shape[0] = config_.n_modules*512; shape[1] = 1024; } else{ auto& shape = header["shape"]; @@ -162,18 +162,18 @@ void ZmqLiveSender::send(const ModuleFrameBuffer *meta, const char *data) text_header = buffer.GetString(); } - zmq_send(socket_live, + zmq_send(socket_live_, text_header.c_str(), text_header.size(), ZMQ_SNDMORE); if ( send_live_analysis == 0 ) { - zmq_send(socket_live, + zmq_send(socket_live_, (char*)data, - buffer_config::MODULE_N_BYTES * n_modules, + buffer_config::MODULE_N_BYTES * config_.n_modules, 0); } else { - zmq_send(socket_live, + zmq_send(socket_live_, (char*)data_empty, 8, 0); From 337c3d54629f9ba74cdeedee04fa4b5fc817b5a2 Mon Sep 17 00:00:00 2001 From: Andrej Babic Date: Tue, 9 Jun 2020 14:14:12 +0200 Subject: [PATCH 5/9] Modulo on atomic is not really supported --- sf-stream/src/FastQueue.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sf-stream/src/FastQueue.cpp b/sf-stream/src/FastQueue.cpp index 1a67664..e8f5987 100644 --- a/sf-stream/src/FastQueue.cpp +++ b/sf-stream/src/FastQueue.cpp @@ -72,7 +72,7 @@ void FastQueue::commit() } write_slot_id_++; - write_slot_id_ %= n_slots_; + write_slot_id_ = write_slot_id_ % n_slots_; } template @@ -99,7 +99,7 @@ void FastQueue::release() } read_slot_id_++; - read_slot_id_ %= n_slots_; + read_slot_id_ = read_slot_id_ % n_slots_; } template class FastQueue; From 4c6f75eb75292a7915cc490738d63dbca3f3109f Mon Sep 17 00:00:00 2001 From: Andrej Babic Date: Tue, 9 Jun 2020 14:18:43 +0200 Subject: [PATCH 6/9] Move hpp to correct folder --- sf-stream/{src => include}/ZmqLiveSender.hpp | 19 +------------------ sf-stream/src/ZmqLiveSender.cpp | 19 +++++++++++++++++++ 2 files changed, 20 insertions(+), 18 deletions(-) rename sf-stream/{src => include}/ZmqLiveSender.hpp (59%) diff --git a/sf-stream/src/ZmqLiveSender.hpp b/sf-stream/include/ZmqLiveSender.hpp similarity index 59% rename from sf-stream/src/ZmqLiveSender.hpp rename to sf-stream/include/ZmqLiveSender.hpp index ba7d0a1..6db77fa 100644 --- a/sf-stream/src/ZmqLiveSender.hpp +++ b/sf-stream/include/ZmqLiveSender.hpp @@ -22,24 +22,7 @@ struct LiveStreamConfig { const int n_modules; }; -LiveStreamConfig read_json_config(const std::string filename) -{ - std::ifstream ifs(filename); - rapidjson::IStreamWrapper isw(ifs); - rapidjson::Document config_parameters; - config_parameters.ParseStream(isw); - - return { - config_parameters["streamvis_stream"].GetString(), - config_parameters["streamvis_rate"].GetInt(), - config_parameters["live_stream"].GetString(), - config_parameters["live_rate"].GetInt(), - config_parameters["pedestal_file"].GetString(), - config_parameters["gain_file"].GetString(), - config_parameters["detector_name"].GetString(), - config_parameters["n_modules"].GetInt() - }; -} +LiveStreamConfig read_json_config(const std::string filename); class ZmqLiveSender { const void* ctx_; diff --git a/sf-stream/src/ZmqLiveSender.cpp b/sf-stream/src/ZmqLiveSender.cpp index bdf7c28..6538f64 100644 --- a/sf-stream/src/ZmqLiveSender.cpp +++ b/sf-stream/src/ZmqLiveSender.cpp @@ -5,6 +5,25 @@ using namespace std; +LiveStreamConfig read_json_config(const std::string filename) +{ + std::ifstream ifs(filename); + rapidjson::IStreamWrapper isw(ifs); + rapidjson::Document config_parameters; + config_parameters.ParseStream(isw); + + return { + config_parameters["streamvis_stream"].GetString(), + config_parameters["streamvis_rate"].GetInt(), + config_parameters["live_stream"].GetString(), + config_parameters["live_rate"].GetInt(), + config_parameters["pedestal_file"].GetString(), + config_parameters["gain_file"].GetString(), + config_parameters["detector_name"].GetString(), + config_parameters["n_modules"].GetInt() + }; +} + ZmqLiveSender::ZmqLiveSender( void* ctx, const LiveStreamConfig& config) : From 5638071b95b988ebaa376f05328e9c6183aeb69e Mon Sep 17 00:00:00 2001 From: Andrej Babic Date: Tue, 9 Jun 2020 14:20:12 +0200 Subject: [PATCH 7/9] Fix whitespace --- sf-stream/src/main.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sf-stream/src/main.cpp b/sf-stream/src/main.cpp index 2a48505..1a52157 100644 --- a/sf-stream/src/main.cpp +++ b/sf-stream/src/main.cpp @@ -88,7 +88,7 @@ int main (int argc, char *argv[]) if (stats_counter == STATS_MODULO) { cout << "sf_stream:read_us " << read_total_us / STATS_MODULO; cout << " sf_stream:read_max_us " << read_max_us; - cout << "sf_stream:send_us " << send_total_us / STATS_MODULO; + cout << " sf_stream:send_us " << send_total_us / STATS_MODULO; cout << " sf_stream:send_max_us " << send_max_us; cout << endl; From 9ff5c08b9b645c09ecef29d9887ebd47d382c59e Mon Sep 17 00:00:00 2001 From: Andrej Babic Date: Tue, 9 Jun 2020 14:24:36 +0200 Subject: [PATCH 8/9] Convert sf_stream to single thread --- sf-stream/src/main.cpp | 19 ++++++------------- 1 file changed, 6 insertions(+), 13 deletions(-) diff --git a/sf-stream/src/main.cpp b/sf-stream/src/main.cpp index 1a52157..cc65e45 100644 --- a/sf-stream/src/main.cpp +++ b/sf-stream/src/main.cpp @@ -4,7 +4,6 @@ #include #include -#include "FastQueue.hpp" #include "LiveRecvModule.hpp" #include "buffer_config.hpp" #include "stream_config.hpp" @@ -33,15 +32,13 @@ int main (int argc, char *argv[]) auto config = read_json_config(string(argv[1])); string RECV_IPC_URL = BUFFER_LIVE_IPC_URL + config.DETECTOR_NAME + "-"; - FastQueue queue( - config.n_modules * MODULE_N_BYTES, STREAM_FASTQUEUE_SLOTS); + ModuleFrameBuffer* meta = new ModuleFrameBuffer(); + char* data = new char[config.n_modules * MODULE_N_BYTES]; auto ctx = zmq_ctx_new(); zmq_ctx_set (ctx, ZMQ_IO_THREADS, STREAM_ZMQ_IO_THREADS); ZmqLiveReceiver receiver(config.n_modules, ctx, RECV_IPC_URL); - LiveRecvModule recv_module(queue, receiver); - ZmqLiveSender sender(ctx, config); // TODO: Remove stats trash. @@ -55,13 +52,11 @@ int main (int argc, char *argv[]) auto start_time = steady_clock::now(); - int slot_id; - while((slot_id = queue.read()) == -1) { - this_thread::sleep_for(milliseconds(RB_READ_RETRY_INTERVAL_MS)); - } + auto n_lost_pulses = receiver.get_next_image(meta, data); - auto meta = queue.get_metadata_buffer(slot_id); - auto data = queue.get_data_buffer(slot_id); + if (n_lost_pulses > 0) { + cout << "sf_stream:resync_lost_pulses " << n_lost_pulses << endl; + } auto end_time = steady_clock::now(); size_t read_us_duration = duration_cast( @@ -75,8 +70,6 @@ int main (int argc, char *argv[]) size_t send_us_duration = duration_cast( end_time - start_time).count(); - queue.release(); - // TODO: Some poor statistics. stats_counter++; read_total_us += read_us_duration; From e0c3d673cc74080ef14f3cb0095ce33c514d7fbf Mon Sep 17 00:00:00 2001 From: Andrej Babic Date: Tue, 9 Jun 2020 14:29:18 +0200 Subject: [PATCH 9/9] Reduce number of IO threads --- sf-stream/include/stream_config.hpp | 2 +- sf-stream/src/main.cpp | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/sf-stream/include/stream_config.hpp b/sf-stream/include/stream_config.hpp index beea625..8d8b18b 100644 --- a/sf-stream/include/stream_config.hpp +++ b/sf-stream/include/stream_config.hpp @@ -1,7 +1,7 @@ namespace stream_config { // N of IO threads to receive data from modules. - const int STREAM_ZMQ_IO_THREADS = 4; + const int STREAM_ZMQ_IO_THREADS = 2; // How long should the RECV queue be. const size_t STREAM_RCVHWM = 100; // Size of buffer between the receiving and sending part. diff --git a/sf-stream/src/main.cpp b/sf-stream/src/main.cpp index cc65e45..78e798d 100644 --- a/sf-stream/src/main.cpp +++ b/sf-stream/src/main.cpp @@ -55,7 +55,7 @@ int main (int argc, char *argv[]) auto n_lost_pulses = receiver.get_next_image(meta, data); if (n_lost_pulses > 0) { - cout << "sf_stream:resync_lost_pulses " << n_lost_pulses << endl; + cout << "sf_stream:sync_lost_pulses " << n_lost_pulses << endl; } auto end_time = steady_clock::now();