mirror of
https://github.com/paulscherrerinstitute/sf_daq_buffer.git
synced 2026-04-21 15:44:37 +02:00
sf_stream sending to streamvis/live analysis
This commit is contained in:
+116
-121
@@ -12,6 +12,7 @@
|
||||
#include <zmq.h>
|
||||
#include <LiveRecvModule.hpp>
|
||||
#include "date.h"
|
||||
#include <jsoncpp/json/json.h>
|
||||
|
||||
using namespace std;
|
||||
using namespace core_buffer;
|
||||
@@ -49,19 +50,19 @@ int main (int argc, char *argv[])
|
||||
|
||||
LiveRecvModule recv_module(queue, n_modules, ctx, ipc_prefix);
|
||||
|
||||
// // 0mq sockets to streamvis and live analysis
|
||||
// void *socket_streamvis = zmq_socket(ctx, ZMQ_PUB);
|
||||
// if (zmq_bind(socket_streamvis, streamvis_address.c_str()) != 0) {
|
||||
// throw runtime_error(strerror(errno));
|
||||
// }
|
||||
// void *socket_live = zmq_socket(ctx, ZMQ_PUB);
|
||||
// if (zmq_bind(socket_live, live_analysis_address.c_str()) != 0) {
|
||||
// throw runtime_error(strerror(errno));
|
||||
// }
|
||||
//
|
||||
// uint16_t data_empty [] = { 0, 0, 0, 0};
|
||||
// Json::Value header;
|
||||
// Json::StreamWriterBuilder builder;
|
||||
// 0mq sockets to streamvis and live analysis
|
||||
void *socket_streamvis = zmq_socket(ctx, ZMQ_PUB);
|
||||
if (zmq_bind(socket_streamvis, streamvis_address.c_str()) != 0) {
|
||||
throw runtime_error(strerror(errno));
|
||||
}
|
||||
void *socket_live = zmq_socket(ctx, ZMQ_PUB);
|
||||
if (zmq_bind(socket_live, live_analysis_address.c_str()) != 0) {
|
||||
throw runtime_error(strerror(errno));
|
||||
}
|
||||
|
||||
uint16_t data_empty [] = { 0, 0, 0, 0};
|
||||
Json::Value header;
|
||||
Json::StreamWriterBuilder builder;
|
||||
|
||||
// TODO: Remove stats trash.
|
||||
int stats_counter = 0;
|
||||
@@ -88,119 +89,113 @@ int main (int argc, char *argv[])
|
||||
auto read_us_duration = chrono::duration_cast<chrono::microseconds>(
|
||||
read_end_time-start_time).count();
|
||||
|
||||
uint64_t pulse_id = 0;
|
||||
uint64_t frame_index = 0;
|
||||
uint64_t daq_rec = 0;
|
||||
bool is_good_frame = true;
|
||||
|
||||
for (size_t i_module = 0; i_module < n_modules; i_module++) {
|
||||
// TODO: Place this tests in the appropriate spot.
|
||||
// if (frame_meta_buffer->pulse_id[i_buffer] !=
|
||||
// module_meta_buffer->pulse_id) {
|
||||
// frame_meta_buffer->is_good_frame[i_buffer] = false;
|
||||
// //throw runtime_error("Unexpected pulse_id received.");
|
||||
// }
|
||||
//
|
||||
// if (frame_meta_buffer->frame_index[i_buffer] !=
|
||||
// module_meta_buffer->frame_index) {
|
||||
// frame_meta_buffer->is_good_frame[i_buffer] = false;
|
||||
// }
|
||||
//
|
||||
// if (frame_meta_buffer->daq_rec[i_buffer] !=
|
||||
// module_meta_buffer->daq_rec) {
|
||||
// frame_meta_buffer->is_good_frame[i_buffer] = false;
|
||||
// }
|
||||
//
|
||||
// if ( module_meta_buffer->n_received_packets != 128 ) {
|
||||
// frame_meta_buffer->is_good_frame[i_buffer] = false;
|
||||
// }
|
||||
auto& module_metadata = metadata->module[i_module];
|
||||
if (i_module == 0) {
|
||||
pulse_id = module_metadata.pulse_id;
|
||||
frame_index = module_metadata.frame_index;
|
||||
daq_rec = module_metadata.daq_rec;
|
||||
|
||||
if ( module_metadata.n_received_packets != 128 ) is_good_frame = false;
|
||||
} else {
|
||||
if (module_metadata.pulse_id != pulse_id) is_good_frame = false;
|
||||
|
||||
if (module_metadata.frame_index != frame_index) is_good_frame = false;
|
||||
|
||||
if (module_metadata.daq_rec != daq_rec) is_good_frame = false;
|
||||
|
||||
if (module_metadata.n_received_packets != 128 ) is_good_frame = false;
|
||||
}
|
||||
}
|
||||
|
||||
//Here we need to send to streamvis and live analysis metadata(probably need to operate still on them) and data(not every frame)
|
||||
|
||||
// for ( size_t i_buffer=0; i_buffer<WRITER_N_FRAMES_BUFFER_live; i_buffer++) {
|
||||
// //for (size_t i_module = 0; i_module < n_modules; i_module++) {
|
||||
// // cout << metadata->pulse_id[i_buffer*n_modules+i_module] << " ";
|
||||
// //}
|
||||
// //cout << endl;
|
||||
// //cout << metadata->is_good_frame[i_buffer] << " " << metadata->pulse_id[i_buffer] << " " << metadata->frame_index[i_buffer] << " " << metadata->daq_rec[i_buffer] << " " << metadata->n_received_packets[i_buffer] << " " << endl;
|
||||
// header["frame"] = (Json::Value::UInt64)metadata->frame_index[i_buffer];
|
||||
// header["is_good_frame"] = metadata->is_good_frame[i_buffer];
|
||||
// header["daq_rec"] = metadata->daq_rec[i_buffer];
|
||||
// header["pulse_id"] = (Json::Value::UInt64)metadata->pulse_id[i_buffer];
|
||||
//
|
||||
// //this needs to be re-read from external source
|
||||
// header["pedestal_file"] = "/sf/bernina/data/p17534/res/JF_pedestals/pedestal_20200423_1018.JF07T32V01.res.h5";
|
||||
// header["gain_file"] = "/sf/bernina/config/jungfrau/gainMaps/JF07T32V01/gains.h5";
|
||||
//
|
||||
// header["number_frames_expected"] = 10000;
|
||||
// header["run_name"] = to_string(uint64_t(metadata->pulse_id[i_buffer]/10000)*10000);
|
||||
//
|
||||
// // detector name should come as parameter to sf_stream
|
||||
// header["detector_name"] = "JF07T32V01";
|
||||
//
|
||||
// header["htype"] = "array-1.0";
|
||||
// header["type"] = "uint16";
|
||||
//
|
||||
// int send_streamvis = 0;
|
||||
// if ( reduction_factor_streamvis > 1 ) {
|
||||
// send_streamvis = rand() % reduction_factor_streamvis;
|
||||
// }
|
||||
// if ( send_streamvis == 0 ) {
|
||||
// header["shape"][0] = 16384;
|
||||
// header["shape"][1] = 1024;
|
||||
// } else{
|
||||
// header["shape"][0] = 2;
|
||||
// header["shape"][1] = 2;
|
||||
// }
|
||||
//
|
||||
// string text_header = Json::writeString(builder, header);
|
||||
//
|
||||
// zmq_send(socket_streamvis,
|
||||
// text_header.c_str(),
|
||||
// text_header.size(),
|
||||
// ZMQ_SNDMORE);
|
||||
//
|
||||
// if ( send_streamvis == 0 ) {
|
||||
// zmq_send(socket_streamvis,
|
||||
// (char*)data,
|
||||
// core_buffer::MODULE_N_BYTES*n_modules,
|
||||
// 0);
|
||||
// } else {
|
||||
// zmq_send(socket_streamvis,
|
||||
// (char*)data_empty,
|
||||
// 8,
|
||||
// 0);
|
||||
// }
|
||||
//
|
||||
// //same for live analysis
|
||||
// int send_live_analysis = 0;
|
||||
// if ( reduction_factor_live_analysis > 1 ) {
|
||||
// send_live_analysis = rand() % reduction_factor_live_analysis;
|
||||
// }
|
||||
// if ( send_live_analysis == 0 ) {
|
||||
// header["shape"][0] = 16384;
|
||||
// header["shape"][1] = 1024;
|
||||
// } else{
|
||||
// header["shape"][0] = 2;
|
||||
// header["shape"][1] = 2;
|
||||
// }
|
||||
//
|
||||
// text_header = Json::writeString(builder, header);
|
||||
//
|
||||
// zmq_send(socket_live,
|
||||
// text_header.c_str(),
|
||||
// text_header.size(),
|
||||
// ZMQ_SNDMORE);
|
||||
//
|
||||
// if ( send_live_analysis == 0 ) {
|
||||
// zmq_send(socket_live,
|
||||
// (char*)data,
|
||||
// core_buffer::MODULE_N_BYTES*n_modules,
|
||||
// 0);
|
||||
// } else {
|
||||
// zmq_send(socket_live,
|
||||
// (char*)data_empty,
|
||||
// 8,
|
||||
// 0);
|
||||
// }
|
||||
//
|
||||
//
|
||||
// }
|
||||
header["frame"] = (Json::Value::UInt64)frame_index;
|
||||
header["is_good_frame"] = is_good_frame;
|
||||
header["daq_rec"] = (Json::Value::UInt64)daq_rec;
|
||||
header["pulse_id"] = (Json::Value::UInt64)pulse_id;
|
||||
|
||||
//this needs to be re-read from external source
|
||||
header["pedestal_file"] = "/sf/bernina/data/p17534/res/JF_pedestals/pedestal_20200423_1018.JF07T32V01.res.h5";
|
||||
header["gain_file"] = "/sf/bernina/config/jungfrau/gainMaps/JF07T32V01/gains.h5";
|
||||
|
||||
header["number_frames_expected"] = 10000;
|
||||
header["run_name"] = to_string(uint64_t(pulse_id/10000)*10000);
|
||||
|
||||
// detector name should come as parameter to sf_stream
|
||||
header["detector_name"] = "JF07T32V01";
|
||||
|
||||
header["htype"] = "array-1.0";
|
||||
header["type"] = "uint16";
|
||||
|
||||
int send_streamvis = 0;
|
||||
if ( reduction_factor_streamvis > 1 ) {
|
||||
send_streamvis = rand() % reduction_factor_streamvis;
|
||||
}
|
||||
if ( send_streamvis == 0 ) {
|
||||
header["shape"][0] = 16384;
|
||||
header["shape"][1] = 1024;
|
||||
} else{
|
||||
header["shape"][0] = 2;
|
||||
header["shape"][1] = 2;
|
||||
}
|
||||
|
||||
string text_header = Json::writeString(builder, header);
|
||||
|
||||
zmq_send(socket_streamvis,
|
||||
text_header.c_str(),
|
||||
text_header.size(),
|
||||
ZMQ_SNDMORE);
|
||||
|
||||
if ( send_streamvis == 0 ) {
|
||||
zmq_send(socket_streamvis,
|
||||
(char*)data,
|
||||
core_buffer::MODULE_N_BYTES*n_modules,
|
||||
0);
|
||||
} else {
|
||||
zmq_send(socket_streamvis,
|
||||
(char*)data_empty,
|
||||
8,
|
||||
0);
|
||||
}
|
||||
|
||||
//same for live analysis
|
||||
int send_live_analysis = 0;
|
||||
if ( reduction_factor_live_analysis > 1 ) {
|
||||
send_live_analysis = rand() % reduction_factor_live_analysis;
|
||||
}
|
||||
if ( send_live_analysis == 0 ) {
|
||||
header["shape"][0] = 16384;
|
||||
header["shape"][1] = 1024;
|
||||
} else{
|
||||
header["shape"][0] = 2;
|
||||
header["shape"][1] = 2;
|
||||
}
|
||||
|
||||
text_header = Json::writeString(builder, header);
|
||||
|
||||
zmq_send(socket_live,
|
||||
text_header.c_str(),
|
||||
text_header.size(),
|
||||
ZMQ_SNDMORE);
|
||||
|
||||
if ( send_live_analysis == 0 ) {
|
||||
zmq_send(socket_live,
|
||||
(char*)data,
|
||||
core_buffer::MODULE_N_BYTES*n_modules,
|
||||
0);
|
||||
} else {
|
||||
zmq_send(socket_live,
|
||||
(char*)data_empty,
|
||||
8,
|
||||
0);
|
||||
}
|
||||
|
||||
queue.release();
|
||||
|
||||
|
||||
Reference in New Issue
Block a user