diff --git a/sf-stream/include/ZmqLiveSender.hpp b/sf-stream/include/ZmqLiveSender.hpp new file mode 100644 index 0000000..6db77fa --- /dev/null +++ b/sf-stream/include/ZmqLiveSender.hpp @@ -0,0 +1,43 @@ +#ifndef SF_DAQ_BUFFER_ZMQLIVESENDER_HPP +#define SF_DAQ_BUFFER_ZMQLIVESENDER_HPP + +#include +#include +#include +#include +#include +#include + +#include "formats.hpp" + + +struct LiveStreamConfig { + const std::string streamvis_address; + const int reduction_factor_streamvis; + const std::string live_analysis_address; + const int reduction_factor_live_analysis; + const std::string PEDE_FILENAME; + const std::string GAIN_FILENAME; + const std::string DETECTOR_NAME; + const int n_modules; +}; + +LiveStreamConfig read_json_config(const std::string filename); + +class ZmqLiveSender { + const void* ctx_; + const LiveStreamConfig config_; + + void* socket_streamvis_; + void* socket_live_; + +public: + ZmqLiveSender(void* ctx, + const LiveStreamConfig& config); + ~ZmqLiveSender(); + + void send(const ModuleFrameBuffer* meta, const char* data); +}; + + +#endif //SF_DAQ_BUFFER_ZMQLIVESENDER_HPP diff --git a/sf-stream/include/stream_config.hpp b/sf-stream/include/stream_config.hpp index beea625..8d8b18b 100644 --- a/sf-stream/include/stream_config.hpp +++ b/sf-stream/include/stream_config.hpp @@ -1,7 +1,7 @@ namespace stream_config { // N of IO threads to receive data from modules. - const int STREAM_ZMQ_IO_THREADS = 4; + const int STREAM_ZMQ_IO_THREADS = 2; // How long should the RECV queue be. const size_t STREAM_RCVHWM = 100; // Size of buffer between the receiving and sending part. diff --git a/sf-stream/src/FastQueue.cpp b/sf-stream/src/FastQueue.cpp index 1a67664..e8f5987 100644 --- a/sf-stream/src/FastQueue.cpp +++ b/sf-stream/src/FastQueue.cpp @@ -72,7 +72,7 @@ void FastQueue::commit() } write_slot_id_++; - write_slot_id_ %= n_slots_; + write_slot_id_ = write_slot_id_ % n_slots_; } template @@ -99,7 +99,7 @@ void FastQueue::release() } read_slot_id_++; - read_slot_id_ %= n_slots_; + read_slot_id_ = read_slot_id_ % n_slots_; } template class FastQueue; diff --git a/sf-stream/src/ZmqLiveReceiver.cpp b/sf-stream/src/ZmqLiveReceiver.cpp index 7a943c9..f8811e6 100644 --- a/sf-stream/src/ZmqLiveReceiver.cpp +++ b/sf-stream/src/ZmqLiveReceiver.cpp @@ -16,10 +16,10 @@ using namespace stream_config; ZmqLiveReceiver::ZmqLiveReceiver( const size_t n_modules, - void *ctx, + void* ctx, const std::string &ipc_prefix) : n_modules_(n_modules), - ctx_(ctx_), + ctx_(ctx), ipc_prefix_(ipc_prefix), sockets_(n_modules) { diff --git a/sf-stream/src/ZmqLiveSender.cpp b/sf-stream/src/ZmqLiveSender.cpp new file mode 100644 index 0000000..6538f64 --- /dev/null +++ b/sf-stream/src/ZmqLiveSender.cpp @@ -0,0 +1,201 @@ +#include "ZmqLiveSender.hpp" + +#include "zmq.h" +#include + +using namespace std; + +LiveStreamConfig read_json_config(const std::string filename) +{ + std::ifstream ifs(filename); + rapidjson::IStreamWrapper isw(ifs); + rapidjson::Document config_parameters; + config_parameters.ParseStream(isw); + + return { + config_parameters["streamvis_stream"].GetString(), + config_parameters["streamvis_rate"].GetInt(), + config_parameters["live_stream"].GetString(), + config_parameters["live_rate"].GetInt(), + config_parameters["pedestal_file"].GetString(), + config_parameters["gain_file"].GetString(), + config_parameters["detector_name"].GetString(), + config_parameters["n_modules"].GetInt() + }; +} + +ZmqLiveSender::ZmqLiveSender( + void* ctx, + const LiveStreamConfig& config) : + ctx_(ctx), + config_(config) +{ + // TODO: Set also LINGER and SNDHWM. + + // 0mq sockets to streamvis and live analysis + socket_streamvis_ = zmq_socket(ctx, ZMQ_PUB); + if (zmq_bind(socket_streamvis_, config.streamvis_address.c_str()) != 0) { + throw runtime_error(zmq_strerror(errno)); + } + + socket_live_ = zmq_socket(ctx, ZMQ_PUB); + if (zmq_bind(socket_live_, config.live_analysis_address.c_str()) != 0) { + throw runtime_error(zmq_strerror(errno)); + } +} + +ZmqLiveSender::~ZmqLiveSender() +{ + zmq_close(socket_streamvis_); + zmq_close(socket_live_); +} + +void ZmqLiveSender::send(const ModuleFrameBuffer *meta, const char *data) +{ + uint16_t data_empty [] = { 0, 0, 0, 0}; + + rapidjson::Document header(rapidjson::kObjectType); + auto& header_alloc = header.GetAllocator(); + string text_header; + + uint64_t pulse_id = 0; + uint64_t frame_index = 0; + uint64_t daq_rec = 0; + bool is_good_frame = true; + + for (size_t i_module = 0; i_module < config_.n_modules; i_module++) { + // TODO: Place this tests in the appropriate spot. + auto& module_metadata = meta->module[i_module]; + if (i_module == 0) { + pulse_id = module_metadata.pulse_id; + frame_index = module_metadata.frame_index; + daq_rec = module_metadata.daq_rec; + + if (module_metadata.n_recv_packets != 128 ) is_good_frame = false; + } else { + if (module_metadata.pulse_id != pulse_id) is_good_frame = false; + + if (module_metadata.frame_index != frame_index) is_good_frame = false; + + if (module_metadata.daq_rec != daq_rec) is_good_frame = false; + + if (module_metadata.n_recv_packets != 128 ) is_good_frame = false; + } + } + + // TODO: Here we need to send to streamvis and live analysis metadata(probably need to operate still on them) and data(not every frame) + + header.AddMember("frame", frame_index, header_alloc); + header.AddMember("is_good_frame", is_good_frame, header_alloc); + header.AddMember("daq_rec", daq_rec, header_alloc); + header.AddMember("pulse_id", pulse_id, header_alloc); + + rapidjson::Value pedestal_file; + pedestal_file.SetString(config_.PEDE_FILENAME.c_str(), header_alloc); + header.AddMember("pedestal_file", pedestal_file, header_alloc); + + rapidjson::Value gain_file; + gain_file.SetString(config_.GAIN_FILENAME.c_str(), header_alloc); + header.AddMember("gain_file", gain_file, header_alloc); + + header.AddMember("number_frames_expected", 10000, header_alloc); + + rapidjson::Value run_name; + run_name.SetString( + to_string(uint64_t(pulse_id/10000)*10000).c_str(), + header_alloc); + header.AddMember("run_name", run_name, header_alloc); + + rapidjson::Value detector_name; + detector_name.SetString(config_.DETECTOR_NAME.c_str(), header_alloc); + header.AddMember("detector_name", detector_name, header_alloc); + + header.AddMember("htype", "array-1.0", header_alloc); + header.AddMember("type", "uint16", header_alloc); + + // To be retrieved and filled with correct values down. + auto shape_value = rapidjson::Value(rapidjson::kArrayType); + shape_value.PushBack((uint64_t)0, header_alloc); + shape_value.PushBack((uint64_t)0, header_alloc); + header.AddMember("shape", shape_value, header_alloc); + + int send_streamvis = 0; + if ( config_.reduction_factor_streamvis > 1 ) { + send_streamvis = rand() % config_.reduction_factor_streamvis; + } + if ( send_streamvis == 0 ) { + auto& shape = header["shape"]; + shape[0] = config_.n_modules*512; + shape[1] = 1024; + } else{ + auto& shape = header["shape"]; + shape[0] = 2; + shape[1] = 2; + } + + { + rapidjson::StringBuffer buffer; + rapidjson::Writer writer(buffer); + header.Accept(writer); + + text_header = buffer.GetString(); + } + + zmq_send(socket_streamvis_, + text_header.c_str(), + text_header.size(), + ZMQ_SNDMORE); + + if ( send_streamvis == 0 ) { + zmq_send(socket_streamvis_, + (char*)data, + buffer_config::MODULE_N_BYTES * config_.n_modules, + 0); + } else { + zmq_send(socket_streamvis_, + (char*)data_empty, + 8, + 0); + } + + //same for live analysis + int send_live_analysis = 0; + if ( config_.reduction_factor_live_analysis > 1 ) { + send_live_analysis = rand() % config_.reduction_factor_live_analysis; + } + if ( send_live_analysis == 0 ) { + auto& shape = header["shape"]; + shape[0] = config_.n_modules*512; + shape[1] = 1024; + } else{ + auto& shape = header["shape"]; + shape[0] = 2; + shape[1] = 2; + } + + { + rapidjson::StringBuffer buffer; + rapidjson::Writer writer(buffer); + header.Accept(writer); + + text_header = buffer.GetString(); + } + + zmq_send(socket_live_, + text_header.c_str(), + text_header.size(), + ZMQ_SNDMORE); + + if ( send_live_analysis == 0 ) { + zmq_send(socket_live_, + (char*)data, + buffer_config::MODULE_N_BYTES * config_.n_modules, + 0); + } else { + zmq_send(socket_live_, + (char*)data_empty, + 8, + 0); + } +} + diff --git a/sf-stream/src/main.cpp b/sf-stream/src/main.cpp index 83d3f9d..78e798d 100644 --- a/sf-stream/src/main.cpp +++ b/sf-stream/src/main.cpp @@ -1,22 +1,16 @@ #include -#include #include -#include #include #include #include -#include "rapidjson/istreamwrapper.h" -#include -#include "rapidjson/document.h" -#include "rapidjson/stringbuffer.h" -#include "rapidjson/writer.h" -#include "FastQueue.hpp" #include "LiveRecvModule.hpp" #include "buffer_config.hpp" #include "stream_config.hpp" +#include "ZmqLiveSender.hpp" using namespace std; +using namespace chrono; using namespace buffer_config; using namespace stream_config; @@ -27,239 +21,75 @@ int main (int argc, char *argv[]) cout << "Usage: sf_stream "; cout << " [config_json_file]"; cout << endl; - cout << "\tconfig_json_file: json file with the configuration parameters(detector name, number of modules, pedestal and gain files" << endl; + cout << "\tconfig_json_file: json file with the configuration " + "parameters(detector name, number of modules, pedestal and " + "gain files" << endl; cout << endl; exit(-1); } - string config_json_file = string(argv[1]); - - ifstream ifs(config_json_file); - rapidjson::IStreamWrapper isw(ifs); - rapidjson::Document config_parameters; - config_parameters.ParseStream(isw); + auto config = read_json_config(string(argv[1])); + string RECV_IPC_URL = BUFFER_LIVE_IPC_URL + config.DETECTOR_NAME + "-"; - string streamvis_address = config_parameters["streamvis_stream"].GetString(); - int reduction_factor_streamvis = config_parameters["streamvis_rate"].GetInt(); - string live_analysis_address = config_parameters["live_stream"].GetString(); - int reduction_factor_live_analysis = config_parameters["live_rate"].GetInt(); - - const string PEDE_FILENAME = config_parameters["pedestal_file"].GetString(); - const string GAIN_FILENAME = config_parameters["gain_file"].GetString(); - const string DETECTOR_NAME = config_parameters["detector_name"].GetString(); - size_t n_modules = config_parameters["n_modules"].GetInt(); - - FastQueue queue( - n_modules * MODULE_N_BYTES, STREAM_FASTQUEUE_SLOTS); + ModuleFrameBuffer* meta = new ModuleFrameBuffer(); + char* data = new char[config.n_modules * MODULE_N_BYTES]; auto ctx = zmq_ctx_new(); zmq_ctx_set (ctx, ZMQ_IO_THREADS, STREAM_ZMQ_IO_THREADS); - const string LIVE_IPC_URL = BUFFER_LIVE_IPC_URL+DETECTOR_NAME + "-"; - ZmqLiveReceiver receiver(n_modules, ctx, LIVE_IPC_URL); - - LiveRecvModule recv_module(queue, receiver); - - // 0mq sockets to streamvis and live analysis - void *socket_streamvis = zmq_socket(ctx, ZMQ_PUB); - if (zmq_bind(socket_streamvis, streamvis_address.c_str()) != 0) { - throw runtime_error(strerror(errno)); - } - void *socket_live = zmq_socket(ctx, ZMQ_PUB); - if (zmq_bind(socket_live, live_analysis_address.c_str()) != 0) { - throw runtime_error(strerror(errno)); - } - - uint16_t data_empty [] = { 0, 0, 0, 0}; + ZmqLiveReceiver receiver(config.n_modules, ctx, RECV_IPC_URL); + ZmqLiveSender sender(ctx, config); // TODO: Remove stats trash. int stats_counter = 0; - size_t read_total_us = 0; size_t read_max_us = 0; + size_t send_total_us = 0; + size_t send_max_us = 0; while (true) { - rapidjson::Document header(rapidjson::kObjectType); - auto& header_alloc = header.GetAllocator(); - string text_header; + auto start_time = steady_clock::now(); - auto start_time = chrono::steady_clock::now(); + auto n_lost_pulses = receiver.get_next_image(meta, data); - auto slot_id = queue.read(); - - if(slot_id == -1) { - this_thread::sleep_for(chrono::milliseconds( - buffer_config::RB_READ_RETRY_INTERVAL_MS)); - continue; + if (n_lost_pulses > 0) { + cout << "sf_stream:sync_lost_pulses " << n_lost_pulses << endl; } - auto metadata = queue.get_metadata_buffer(slot_id); - auto data = queue.get_data_buffer(slot_id); + auto end_time = steady_clock::now(); + size_t read_us_duration = duration_cast( + end_time - start_time).count(); - auto read_end_time = chrono::steady_clock::now(); - auto read_us_duration = chrono::duration_cast( - read_end_time-start_time).count(); + start_time = steady_clock::now(); - uint64_t pulse_id = 0; - uint64_t frame_index = 0; - uint64_t daq_rec = 0; - bool is_good_frame = true; + sender.send(meta, data); - for (size_t i_module = 0; i_module < n_modules; i_module++) { - // TODO: Place this tests in the appropriate spot. - auto& module_metadata = metadata->module[i_module]; - if (i_module == 0) { - pulse_id = module_metadata.pulse_id; - frame_index = module_metadata.frame_index; - daq_rec = module_metadata.daq_rec; - - if (module_metadata.n_recv_packets != 128 ) is_good_frame = false; - } else { - if (module_metadata.pulse_id != pulse_id) is_good_frame = false; - - if (module_metadata.frame_index != frame_index) is_good_frame = false; - - if (module_metadata.daq_rec != daq_rec) is_good_frame = false; - - if (module_metadata.n_recv_packets != 128 ) is_good_frame = false; - } - } - - // TODO: Here we need to send to streamvis and live analysis metadata(probably need to operate still on them) and data(not every frame) - - header.AddMember("frame", frame_index, header_alloc); - header.AddMember("is_good_frame", is_good_frame, header_alloc); - header.AddMember("daq_rec", daq_rec, header_alloc); - header.AddMember("pulse_id", pulse_id, header_alloc); - - rapidjson::Value pedestal_file; - pedestal_file.SetString(PEDE_FILENAME.c_str(), header_alloc); - header.AddMember("pedestal_file", pedestal_file, header_alloc); - - rapidjson::Value gain_file; - gain_file.SetString(GAIN_FILENAME.c_str(), header_alloc); - header.AddMember("gain_file", gain_file, header_alloc); - - header.AddMember("number_frames_expected", 10000, header_alloc); - - rapidjson::Value run_name; - run_name.SetString( - to_string(uint64_t(pulse_id/10000)*10000).c_str(), - header_alloc); - header.AddMember("run_name", run_name, header_alloc); - - rapidjson::Value detector_name; - detector_name.SetString(DETECTOR_NAME.c_str(), header_alloc); - header.AddMember("detector_name", detector_name, header_alloc); - - header.AddMember("htype", "array-1.0", header_alloc); - header.AddMember("type", "uint16", header_alloc); - - // To be retrieved and filled with correct values down. - auto shape_value = rapidjson::Value(rapidjson::kArrayType); - shape_value.PushBack((uint64_t)0, header_alloc); - shape_value.PushBack((uint64_t)0, header_alloc); - header.AddMember("shape", shape_value, header_alloc); - - int send_streamvis = 0; - if ( reduction_factor_streamvis > 1 ) { - send_streamvis = rand() % reduction_factor_streamvis; - } - if ( send_streamvis == 0 ) { - auto& shape = header["shape"]; - shape[0] = n_modules*512; - shape[1] = 1024; - } else{ - auto& shape = header["shape"]; - shape[0] = 2; - shape[1] = 2; - } - - { - rapidjson::StringBuffer buffer; - rapidjson::Writer writer(buffer); - header.Accept(writer); - - text_header = buffer.GetString(); - } - - zmq_send(socket_streamvis, - text_header.c_str(), - text_header.size(), - ZMQ_SNDMORE); - - if ( send_streamvis == 0 ) { - zmq_send(socket_streamvis, - (char*)data, - buffer_config::MODULE_N_BYTES * n_modules, - 0); - } else { - zmq_send(socket_streamvis, - (char*)data_empty, - 8, - 0); - } - - //same for live analysis - int send_live_analysis = 0; - if ( reduction_factor_live_analysis > 1 ) { - send_live_analysis = rand() % reduction_factor_live_analysis; - } - if ( send_live_analysis == 0 ) { - auto& shape = header["shape"]; - shape[0] = n_modules*512; - shape[1] = 1024; - } else{ - auto& shape = header["shape"]; - shape[0] = 2; - shape[1] = 2; - } - - { - rapidjson::StringBuffer buffer; - rapidjson::Writer writer(buffer); - header.Accept(writer); - - text_header = buffer.GetString(); - } - - zmq_send(socket_live, - text_header.c_str(), - text_header.size(), - ZMQ_SNDMORE); - - if ( send_live_analysis == 0 ) { - zmq_send(socket_live, - (char*)data, - buffer_config::MODULE_N_BYTES * n_modules, - 0); - } else { - zmq_send(socket_live, - (char*)data_empty, - 8, - 0); - } - - queue.release(); + end_time = steady_clock::now(); + size_t send_us_duration = duration_cast( + end_time - start_time).count(); // TODO: Some poor statistics. stats_counter++; read_total_us += read_us_duration; + send_total_us += send_us_duration; - if (read_us_duration > read_max_us) { - read_max_us = read_us_duration; - } + read_max_us = max(read_max_us, read_us_duration); + send_max_us = max(send_max_us, send_us_duration); if (stats_counter == STATS_MODULO) { cout << "sf_stream:read_us " << read_total_us / STATS_MODULO; cout << " sf_stream:read_max_us " << read_max_us; + cout << " sf_stream:send_us " << send_total_us / STATS_MODULO; + cout << " sf_stream:send_max_us " << send_max_us; cout << endl; stats_counter = 0; read_total_us = 0; read_max_us = 0; + send_total_us = 0; + send_max_us = 0; } } }