diff --git a/sf-stream/src/main.cpp b/sf-stream/src/main.cpp index 83d3f9d..2a48505 100644 --- a/sf-stream/src/main.cpp +++ b/sf-stream/src/main.cpp @@ -1,22 +1,17 @@ #include -#include #include -#include #include #include #include -#include "rapidjson/istreamwrapper.h" -#include -#include "rapidjson/document.h" -#include "rapidjson/stringbuffer.h" -#include "rapidjson/writer.h" #include "FastQueue.hpp" #include "LiveRecvModule.hpp" #include "buffer_config.hpp" #include "stream_config.hpp" +#include "ZmqLiveSender.hpp" using namespace std; +using namespace chrono; using namespace buffer_config; using namespace stream_config; @@ -27,239 +22,81 @@ int main (int argc, char *argv[]) cout << "Usage: sf_stream "; cout << " [config_json_file]"; cout << endl; - cout << "\tconfig_json_file: json file with the configuration parameters(detector name, number of modules, pedestal and gain files" << endl; + cout << "\tconfig_json_file: json file with the configuration " + "parameters(detector name, number of modules, pedestal and " + "gain files" << endl; cout << endl; exit(-1); } - string config_json_file = string(argv[1]); - - ifstream ifs(config_json_file); - rapidjson::IStreamWrapper isw(ifs); - rapidjson::Document config_parameters; - config_parameters.ParseStream(isw); - - string streamvis_address = config_parameters["streamvis_stream"].GetString(); - int reduction_factor_streamvis = config_parameters["streamvis_rate"].GetInt(); - string live_analysis_address = config_parameters["live_stream"].GetString(); - int reduction_factor_live_analysis = config_parameters["live_rate"].GetInt(); - - const string PEDE_FILENAME = config_parameters["pedestal_file"].GetString(); - const string GAIN_FILENAME = config_parameters["gain_file"].GetString(); - const string DETECTOR_NAME = config_parameters["detector_name"].GetString(); - size_t n_modules = config_parameters["n_modules"].GetInt(); + auto config = read_json_config(string(argv[1])); + string RECV_IPC_URL = BUFFER_LIVE_IPC_URL + config.DETECTOR_NAME + "-"; FastQueue queue( - n_modules * MODULE_N_BYTES, STREAM_FASTQUEUE_SLOTS); + config.n_modules * MODULE_N_BYTES, STREAM_FASTQUEUE_SLOTS); auto ctx = zmq_ctx_new(); zmq_ctx_set (ctx, ZMQ_IO_THREADS, STREAM_ZMQ_IO_THREADS); - const string LIVE_IPC_URL = BUFFER_LIVE_IPC_URL+DETECTOR_NAME + "-"; - ZmqLiveReceiver receiver(n_modules, ctx, LIVE_IPC_URL); - + ZmqLiveReceiver receiver(config.n_modules, ctx, RECV_IPC_URL); LiveRecvModule recv_module(queue, receiver); - // 0mq sockets to streamvis and live analysis - void *socket_streamvis = zmq_socket(ctx, ZMQ_PUB); - if (zmq_bind(socket_streamvis, streamvis_address.c_str()) != 0) { - throw runtime_error(strerror(errno)); - } - void *socket_live = zmq_socket(ctx, ZMQ_PUB); - if (zmq_bind(socket_live, live_analysis_address.c_str()) != 0) { - throw runtime_error(strerror(errno)); - } - - uint16_t data_empty [] = { 0, 0, 0, 0}; + ZmqLiveSender sender(ctx, config); // TODO: Remove stats trash. int stats_counter = 0; - size_t read_total_us = 0; size_t read_max_us = 0; + size_t send_total_us = 0; + size_t send_max_us = 0; while (true) { - rapidjson::Document header(rapidjson::kObjectType); - auto& header_alloc = header.GetAllocator(); - string text_header; + auto start_time = steady_clock::now(); - auto start_time = chrono::steady_clock::now(); - - auto slot_id = queue.read(); - - if(slot_id == -1) { - this_thread::sleep_for(chrono::milliseconds( - buffer_config::RB_READ_RETRY_INTERVAL_MS)); - continue; + int slot_id; + while((slot_id = queue.read()) == -1) { + this_thread::sleep_for(milliseconds(RB_READ_RETRY_INTERVAL_MS)); } - auto metadata = queue.get_metadata_buffer(slot_id); + auto meta = queue.get_metadata_buffer(slot_id); auto data = queue.get_data_buffer(slot_id); - auto read_end_time = chrono::steady_clock::now(); - auto read_us_duration = chrono::duration_cast( - read_end_time-start_time).count(); + auto end_time = steady_clock::now(); + size_t read_us_duration = duration_cast( + end_time - start_time).count(); - uint64_t pulse_id = 0; - uint64_t frame_index = 0; - uint64_t daq_rec = 0; - bool is_good_frame = true; + start_time = steady_clock::now(); - for (size_t i_module = 0; i_module < n_modules; i_module++) { - // TODO: Place this tests in the appropriate spot. - auto& module_metadata = metadata->module[i_module]; - if (i_module == 0) { - pulse_id = module_metadata.pulse_id; - frame_index = module_metadata.frame_index; - daq_rec = module_metadata.daq_rec; + sender.send(meta, data); - if (module_metadata.n_recv_packets != 128 ) is_good_frame = false; - } else { - if (module_metadata.pulse_id != pulse_id) is_good_frame = false; - - if (module_metadata.frame_index != frame_index) is_good_frame = false; - - if (module_metadata.daq_rec != daq_rec) is_good_frame = false; - - if (module_metadata.n_recv_packets != 128 ) is_good_frame = false; - } - } - - // TODO: Here we need to send to streamvis and live analysis metadata(probably need to operate still on them) and data(not every frame) - - header.AddMember("frame", frame_index, header_alloc); - header.AddMember("is_good_frame", is_good_frame, header_alloc); - header.AddMember("daq_rec", daq_rec, header_alloc); - header.AddMember("pulse_id", pulse_id, header_alloc); - - rapidjson::Value pedestal_file; - pedestal_file.SetString(PEDE_FILENAME.c_str(), header_alloc); - header.AddMember("pedestal_file", pedestal_file, header_alloc); - - rapidjson::Value gain_file; - gain_file.SetString(GAIN_FILENAME.c_str(), header_alloc); - header.AddMember("gain_file", gain_file, header_alloc); - - header.AddMember("number_frames_expected", 10000, header_alloc); - - rapidjson::Value run_name; - run_name.SetString( - to_string(uint64_t(pulse_id/10000)*10000).c_str(), - header_alloc); - header.AddMember("run_name", run_name, header_alloc); - - rapidjson::Value detector_name; - detector_name.SetString(DETECTOR_NAME.c_str(), header_alloc); - header.AddMember("detector_name", detector_name, header_alloc); - - header.AddMember("htype", "array-1.0", header_alloc); - header.AddMember("type", "uint16", header_alloc); - - // To be retrieved and filled with correct values down. - auto shape_value = rapidjson::Value(rapidjson::kArrayType); - shape_value.PushBack((uint64_t)0, header_alloc); - shape_value.PushBack((uint64_t)0, header_alloc); - header.AddMember("shape", shape_value, header_alloc); - - int send_streamvis = 0; - if ( reduction_factor_streamvis > 1 ) { - send_streamvis = rand() % reduction_factor_streamvis; - } - if ( send_streamvis == 0 ) { - auto& shape = header["shape"]; - shape[0] = n_modules*512; - shape[1] = 1024; - } else{ - auto& shape = header["shape"]; - shape[0] = 2; - shape[1] = 2; - } - - { - rapidjson::StringBuffer buffer; - rapidjson::Writer writer(buffer); - header.Accept(writer); - - text_header = buffer.GetString(); - } - - zmq_send(socket_streamvis, - text_header.c_str(), - text_header.size(), - ZMQ_SNDMORE); - - if ( send_streamvis == 0 ) { - zmq_send(socket_streamvis, - (char*)data, - buffer_config::MODULE_N_BYTES * n_modules, - 0); - } else { - zmq_send(socket_streamvis, - (char*)data_empty, - 8, - 0); - } - - //same for live analysis - int send_live_analysis = 0; - if ( reduction_factor_live_analysis > 1 ) { - send_live_analysis = rand() % reduction_factor_live_analysis; - } - if ( send_live_analysis == 0 ) { - auto& shape = header["shape"]; - shape[0] = n_modules*512; - shape[1] = 1024; - } else{ - auto& shape = header["shape"]; - shape[0] = 2; - shape[1] = 2; - } - - { - rapidjson::StringBuffer buffer; - rapidjson::Writer writer(buffer); - header.Accept(writer); - - text_header = buffer.GetString(); - } - - zmq_send(socket_live, - text_header.c_str(), - text_header.size(), - ZMQ_SNDMORE); - - if ( send_live_analysis == 0 ) { - zmq_send(socket_live, - (char*)data, - buffer_config::MODULE_N_BYTES * n_modules, - 0); - } else { - zmq_send(socket_live, - (char*)data_empty, - 8, - 0); - } + end_time = steady_clock::now(); + size_t send_us_duration = duration_cast( + end_time - start_time).count(); queue.release(); // TODO: Some poor statistics. stats_counter++; read_total_us += read_us_duration; + send_total_us += send_us_duration; - if (read_us_duration > read_max_us) { - read_max_us = read_us_duration; - } + read_max_us = max(read_max_us, read_us_duration); + send_max_us = max(send_max_us, send_us_duration); if (stats_counter == STATS_MODULO) { cout << "sf_stream:read_us " << read_total_us / STATS_MODULO; cout << " sf_stream:read_max_us " << read_max_us; + cout << "sf_stream:send_us " << send_total_us / STATS_MODULO; + cout << " sf_stream:send_max_us " << send_max_us; cout << endl; stats_counter = 0; read_total_us = 0; read_max_us = 0; + send_total_us = 0; + send_max_us = 0; } } }