From 40b8d91c6db17775b9b341dd276a4495843814cd Mon Sep 17 00:00:00 2001 From: Dmitry Ozerov Date: Wed, 6 May 2020 22:47:05 +0200 Subject: [PATCH] sf_stream sending to streamvis/live analysis --- sf-buffer/src/sf_stream.cpp | 237 ++++++++++++++++++------------------ 1 file changed, 116 insertions(+), 121 deletions(-) diff --git a/sf-buffer/src/sf_stream.cpp b/sf-buffer/src/sf_stream.cpp index f30640b..9aecb44 100644 --- a/sf-buffer/src/sf_stream.cpp +++ b/sf-buffer/src/sf_stream.cpp @@ -12,6 +12,7 @@ #include #include #include "date.h" +#include using namespace std; using namespace core_buffer; @@ -49,19 +50,19 @@ int main (int argc, char *argv[]) LiveRecvModule recv_module(queue, n_modules, ctx, ipc_prefix); -// // 0mq sockets to streamvis and live analysis -// void *socket_streamvis = zmq_socket(ctx, ZMQ_PUB); -// if (zmq_bind(socket_streamvis, streamvis_address.c_str()) != 0) { -// throw runtime_error(strerror(errno)); -// } -// void *socket_live = zmq_socket(ctx, ZMQ_PUB); -// if (zmq_bind(socket_live, live_analysis_address.c_str()) != 0) { -// throw runtime_error(strerror(errno)); -// } -// -// uint16_t data_empty [] = { 0, 0, 0, 0}; -// Json::Value header; -// Json::StreamWriterBuilder builder; + // 0mq sockets to streamvis and live analysis + void *socket_streamvis = zmq_socket(ctx, ZMQ_PUB); + if (zmq_bind(socket_streamvis, streamvis_address.c_str()) != 0) { + throw runtime_error(strerror(errno)); + } + void *socket_live = zmq_socket(ctx, ZMQ_PUB); + if (zmq_bind(socket_live, live_analysis_address.c_str()) != 0) { + throw runtime_error(strerror(errno)); + } + + uint16_t data_empty [] = { 0, 0, 0, 0}; + Json::Value header; + Json::StreamWriterBuilder builder; // TODO: Remove stats trash. int stats_counter = 0; @@ -88,119 +89,113 @@ int main (int argc, char *argv[]) auto read_us_duration = chrono::duration_cast( read_end_time-start_time).count(); + uint64_t pulse_id = 0; + uint64_t frame_index = 0; + uint64_t daq_rec = 0; + bool is_good_frame = true; + for (size_t i_module = 0; i_module < n_modules; i_module++) { // TODO: Place this tests in the appropriate spot. -// if (frame_meta_buffer->pulse_id[i_buffer] != -// module_meta_buffer->pulse_id) { -// frame_meta_buffer->is_good_frame[i_buffer] = false; -// //throw runtime_error("Unexpected pulse_id received."); -// } -// -// if (frame_meta_buffer->frame_index[i_buffer] != -// module_meta_buffer->frame_index) { -// frame_meta_buffer->is_good_frame[i_buffer] = false; -// } -// -// if (frame_meta_buffer->daq_rec[i_buffer] != -// module_meta_buffer->daq_rec) { -// frame_meta_buffer->is_good_frame[i_buffer] = false; -// } -// -// if ( module_meta_buffer->n_received_packets != 128 ) { -// frame_meta_buffer->is_good_frame[i_buffer] = false; -// } + auto& module_metadata = metadata->module[i_module]; + if (i_module == 0) { + pulse_id = module_metadata.pulse_id; + frame_index = module_metadata.frame_index; + daq_rec = module_metadata.daq_rec; + + if ( module_metadata.n_received_packets != 128 ) is_good_frame = false; + } else { + if (module_metadata.pulse_id != pulse_id) is_good_frame = false; + + if (module_metadata.frame_index != frame_index) is_good_frame = false; + + if (module_metadata.daq_rec != daq_rec) is_good_frame = false; + + if (module_metadata.n_received_packets != 128 ) is_good_frame = false; + } + } //Here we need to send to streamvis and live analysis metadata(probably need to operate still on them) and data(not every frame) -// for ( size_t i_buffer=0; i_bufferpulse_id[i_buffer*n_modules+i_module] << " "; -// //} -// //cout << endl; -// //cout << metadata->is_good_frame[i_buffer] << " " << metadata->pulse_id[i_buffer] << " " << metadata->frame_index[i_buffer] << " " << metadata->daq_rec[i_buffer] << " " << metadata->n_received_packets[i_buffer] << " " << endl; -// header["frame"] = (Json::Value::UInt64)metadata->frame_index[i_buffer]; -// header["is_good_frame"] = metadata->is_good_frame[i_buffer]; -// header["daq_rec"] = metadata->daq_rec[i_buffer]; -// header["pulse_id"] = (Json::Value::UInt64)metadata->pulse_id[i_buffer]; -// -// //this needs to be re-read from external source -// header["pedestal_file"] = "/sf/bernina/data/p17534/res/JF_pedestals/pedestal_20200423_1018.JF07T32V01.res.h5"; -// header["gain_file"] = "/sf/bernina/config/jungfrau/gainMaps/JF07T32V01/gains.h5"; -// -// header["number_frames_expected"] = 10000; -// header["run_name"] = to_string(uint64_t(metadata->pulse_id[i_buffer]/10000)*10000); -// -// // detector name should come as parameter to sf_stream -// header["detector_name"] = "JF07T32V01"; -// -// header["htype"] = "array-1.0"; -// header["type"] = "uint16"; -// -// int send_streamvis = 0; -// if ( reduction_factor_streamvis > 1 ) { -// send_streamvis = rand() % reduction_factor_streamvis; -// } -// if ( send_streamvis == 0 ) { -// header["shape"][0] = 16384; -// header["shape"][1] = 1024; -// } else{ -// header["shape"][0] = 2; -// header["shape"][1] = 2; -// } -// -// string text_header = Json::writeString(builder, header); -// -// zmq_send(socket_streamvis, -// text_header.c_str(), -// text_header.size(), -// ZMQ_SNDMORE); -// -// if ( send_streamvis == 0 ) { -// zmq_send(socket_streamvis, -// (char*)data, -// core_buffer::MODULE_N_BYTES*n_modules, -// 0); -// } else { -// zmq_send(socket_streamvis, -// (char*)data_empty, -// 8, -// 0); -// } -// -// //same for live analysis -// int send_live_analysis = 0; -// if ( reduction_factor_live_analysis > 1 ) { -// send_live_analysis = rand() % reduction_factor_live_analysis; -// } -// if ( send_live_analysis == 0 ) { -// header["shape"][0] = 16384; -// header["shape"][1] = 1024; -// } else{ -// header["shape"][0] = 2; -// header["shape"][1] = 2; -// } -// -// text_header = Json::writeString(builder, header); -// -// zmq_send(socket_live, -// text_header.c_str(), -// text_header.size(), -// ZMQ_SNDMORE); -// -// if ( send_live_analysis == 0 ) { -// zmq_send(socket_live, -// (char*)data, -// core_buffer::MODULE_N_BYTES*n_modules, -// 0); -// } else { -// zmq_send(socket_live, -// (char*)data_empty, -// 8, -// 0); -// } -// -// -// } + header["frame"] = (Json::Value::UInt64)frame_index; + header["is_good_frame"] = is_good_frame; + header["daq_rec"] = (Json::Value::UInt64)daq_rec; + header["pulse_id"] = (Json::Value::UInt64)pulse_id; + + //this needs to be re-read from external source + header["pedestal_file"] = "/sf/bernina/data/p17534/res/JF_pedestals/pedestal_20200423_1018.JF07T32V01.res.h5"; + header["gain_file"] = "/sf/bernina/config/jungfrau/gainMaps/JF07T32V01/gains.h5"; + + header["number_frames_expected"] = 10000; + header["run_name"] = to_string(uint64_t(pulse_id/10000)*10000); + + // detector name should come as parameter to sf_stream + header["detector_name"] = "JF07T32V01"; + + header["htype"] = "array-1.0"; + header["type"] = "uint16"; + + int send_streamvis = 0; + if ( reduction_factor_streamvis > 1 ) { + send_streamvis = rand() % reduction_factor_streamvis; + } + if ( send_streamvis == 0 ) { + header["shape"][0] = 16384; + header["shape"][1] = 1024; + } else{ + header["shape"][0] = 2; + header["shape"][1] = 2; + } + + string text_header = Json::writeString(builder, header); + + zmq_send(socket_streamvis, + text_header.c_str(), + text_header.size(), + ZMQ_SNDMORE); + + if ( send_streamvis == 0 ) { + zmq_send(socket_streamvis, + (char*)data, + core_buffer::MODULE_N_BYTES*n_modules, + 0); + } else { + zmq_send(socket_streamvis, + (char*)data_empty, + 8, + 0); + } + + //same for live analysis + int send_live_analysis = 0; + if ( reduction_factor_live_analysis > 1 ) { + send_live_analysis = rand() % reduction_factor_live_analysis; + } + if ( send_live_analysis == 0 ) { + header["shape"][0] = 16384; + header["shape"][1] = 1024; + } else{ + header["shape"][0] = 2; + header["shape"][1] = 2; + } + + text_header = Json::writeString(builder, header); + + zmq_send(socket_live, + text_header.c_str(), + text_header.size(), + ZMQ_SNDMORE); + + if ( send_live_analysis == 0 ) { + zmq_send(socket_live, + (char*)data, + core_buffer::MODULE_N_BYTES*n_modules, + 0); + } else { + zmq_send(socket_live, + (char*)data_empty, + 8, + 0); + } queue.release();