Merge remote-tracking branch 'origin/master'

This commit is contained in:
2020-05-07 09:53:33 +02:00
3 changed files with 135 additions and 121 deletions
+14
View File
@@ -0,0 +1,14 @@
[Unit]
Description=stream service (to streamvis and live analysis) of JF07
[Service]
PermissionsStartOnly=true
Type=idle
User=root
ExecStart=/usr/bin/sh /home/writer/git/sf_daq_buffer/scripts/JF07-stream.sh
TimeoutStartSec=10
RestartSec=10
[Install]
WantedBy=multi-user.target
+5
View File
@@ -0,0 +1,5 @@
#!/bin/bash
coreAssociated="20,21,22,23"
taskset -c ${coreAssociated} /usr/bin/sf_stream tcp://129.129.241.42:9007 25 tcp://192.168.30.29:9107 10
+116 -121
View File
@@ -12,6 +12,7 @@
#include <zmq.h>
#include <LiveRecvModule.hpp>
#include "date.h"
#include <jsoncpp/json/json.h>
using namespace std;
using namespace core_buffer;
@@ -49,19 +50,19 @@ int main (int argc, char *argv[])
LiveRecvModule recv_module(queue, n_modules, ctx, ipc_prefix);
// // 0mq sockets to streamvis and live analysis
// void *socket_streamvis = zmq_socket(ctx, ZMQ_PUB);
// if (zmq_bind(socket_streamvis, streamvis_address.c_str()) != 0) {
// throw runtime_error(strerror(errno));
// }
// void *socket_live = zmq_socket(ctx, ZMQ_PUB);
// if (zmq_bind(socket_live, live_analysis_address.c_str()) != 0) {
// throw runtime_error(strerror(errno));
// }
//
// uint16_t data_empty [] = { 0, 0, 0, 0};
// Json::Value header;
// Json::StreamWriterBuilder builder;
// 0mq sockets to streamvis and live analysis
void *socket_streamvis = zmq_socket(ctx, ZMQ_PUB);
if (zmq_bind(socket_streamvis, streamvis_address.c_str()) != 0) {
throw runtime_error(strerror(errno));
}
void *socket_live = zmq_socket(ctx, ZMQ_PUB);
if (zmq_bind(socket_live, live_analysis_address.c_str()) != 0) {
throw runtime_error(strerror(errno));
}
uint16_t data_empty [] = { 0, 0, 0, 0};
Json::Value header;
Json::StreamWriterBuilder builder;
// TODO: Remove stats trash.
int stats_counter = 0;
@@ -88,119 +89,113 @@ int main (int argc, char *argv[])
auto read_us_duration = chrono::duration_cast<chrono::microseconds>(
read_end_time-start_time).count();
uint64_t pulse_id = 0;
uint64_t frame_index = 0;
uint64_t daq_rec = 0;
bool is_good_frame = true;
for (size_t i_module = 0; i_module < n_modules; i_module++) {
// TODO: Place this tests in the appropriate spot.
// if (frame_meta_buffer->pulse_id[i_buffer] !=
// module_meta_buffer->pulse_id) {
// frame_meta_buffer->is_good_frame[i_buffer] = false;
// //throw runtime_error("Unexpected pulse_id received.");
// }
//
// if (frame_meta_buffer->frame_index[i_buffer] !=
// module_meta_buffer->frame_index) {
// frame_meta_buffer->is_good_frame[i_buffer] = false;
// }
//
// if (frame_meta_buffer->daq_rec[i_buffer] !=
// module_meta_buffer->daq_rec) {
// frame_meta_buffer->is_good_frame[i_buffer] = false;
// }
//
// if ( module_meta_buffer->n_received_packets != 128 ) {
// frame_meta_buffer->is_good_frame[i_buffer] = false;
// }
auto& module_metadata = metadata->module[i_module];
if (i_module == 0) {
pulse_id = module_metadata.pulse_id;
frame_index = module_metadata.frame_index;
daq_rec = module_metadata.daq_rec;
if ( module_metadata.n_received_packets != 128 ) is_good_frame = false;
} else {
if (module_metadata.pulse_id != pulse_id) is_good_frame = false;
if (module_metadata.frame_index != frame_index) is_good_frame = false;
if (module_metadata.daq_rec != daq_rec) is_good_frame = false;
if (module_metadata.n_received_packets != 128 ) is_good_frame = false;
}
}
//Here we need to send to streamvis and live analysis metadata(probably need to operate still on them) and data(not every frame)
// for ( size_t i_buffer=0; i_buffer<WRITER_N_FRAMES_BUFFER_live; i_buffer++) {
// //for (size_t i_module = 0; i_module < n_modules; i_module++) {
// // cout << metadata->pulse_id[i_buffer*n_modules+i_module] << " ";
// //}
// //cout << endl;
// //cout << metadata->is_good_frame[i_buffer] << " " << metadata->pulse_id[i_buffer] << " " << metadata->frame_index[i_buffer] << " " << metadata->daq_rec[i_buffer] << " " << metadata->n_received_packets[i_buffer] << " " << endl;
// header["frame"] = (Json::Value::UInt64)metadata->frame_index[i_buffer];
// header["is_good_frame"] = metadata->is_good_frame[i_buffer];
// header["daq_rec"] = metadata->daq_rec[i_buffer];
// header["pulse_id"] = (Json::Value::UInt64)metadata->pulse_id[i_buffer];
//
// //this needs to be re-read from external source
// header["pedestal_file"] = "/sf/bernina/data/p17534/res/JF_pedestals/pedestal_20200423_1018.JF07T32V01.res.h5";
// header["gain_file"] = "/sf/bernina/config/jungfrau/gainMaps/JF07T32V01/gains.h5";
//
// header["number_frames_expected"] = 10000;
// header["run_name"] = to_string(uint64_t(metadata->pulse_id[i_buffer]/10000)*10000);
//
// // detector name should come as parameter to sf_stream
// header["detector_name"] = "JF07T32V01";
//
// header["htype"] = "array-1.0";
// header["type"] = "uint16";
//
// int send_streamvis = 0;
// if ( reduction_factor_streamvis > 1 ) {
// send_streamvis = rand() % reduction_factor_streamvis;
// }
// if ( send_streamvis == 0 ) {
// header["shape"][0] = 16384;
// header["shape"][1] = 1024;
// } else{
// header["shape"][0] = 2;
// header["shape"][1] = 2;
// }
//
// string text_header = Json::writeString(builder, header);
//
// zmq_send(socket_streamvis,
// text_header.c_str(),
// text_header.size(),
// ZMQ_SNDMORE);
//
// if ( send_streamvis == 0 ) {
// zmq_send(socket_streamvis,
// (char*)data,
// core_buffer::MODULE_N_BYTES*n_modules,
// 0);
// } else {
// zmq_send(socket_streamvis,
// (char*)data_empty,
// 8,
// 0);
// }
//
// //same for live analysis
// int send_live_analysis = 0;
// if ( reduction_factor_live_analysis > 1 ) {
// send_live_analysis = rand() % reduction_factor_live_analysis;
// }
// if ( send_live_analysis == 0 ) {
// header["shape"][0] = 16384;
// header["shape"][1] = 1024;
// } else{
// header["shape"][0] = 2;
// header["shape"][1] = 2;
// }
//
// text_header = Json::writeString(builder, header);
//
// zmq_send(socket_live,
// text_header.c_str(),
// text_header.size(),
// ZMQ_SNDMORE);
//
// if ( send_live_analysis == 0 ) {
// zmq_send(socket_live,
// (char*)data,
// core_buffer::MODULE_N_BYTES*n_modules,
// 0);
// } else {
// zmq_send(socket_live,
// (char*)data_empty,
// 8,
// 0);
// }
//
//
// }
header["frame"] = (Json::Value::UInt64)frame_index;
header["is_good_frame"] = is_good_frame;
header["daq_rec"] = (Json::Value::UInt64)daq_rec;
header["pulse_id"] = (Json::Value::UInt64)pulse_id;
//this needs to be re-read from external source
header["pedestal_file"] = "/sf/bernina/data/p17534/res/JF_pedestals/pedestal_20200423_1018.JF07T32V01.res.h5";
header["gain_file"] = "/sf/bernina/config/jungfrau/gainMaps/JF07T32V01/gains.h5";
header["number_frames_expected"] = 10000;
header["run_name"] = to_string(uint64_t(pulse_id/10000)*10000);
// detector name should come as parameter to sf_stream
header["detector_name"] = "JF07T32V01";
header["htype"] = "array-1.0";
header["type"] = "uint16";
int send_streamvis = 0;
if ( reduction_factor_streamvis > 1 ) {
send_streamvis = rand() % reduction_factor_streamvis;
}
if ( send_streamvis == 0 ) {
header["shape"][0] = 16384;
header["shape"][1] = 1024;
} else{
header["shape"][0] = 2;
header["shape"][1] = 2;
}
string text_header = Json::writeString(builder, header);
zmq_send(socket_streamvis,
text_header.c_str(),
text_header.size(),
ZMQ_SNDMORE);
if ( send_streamvis == 0 ) {
zmq_send(socket_streamvis,
(char*)data,
core_buffer::MODULE_N_BYTES*n_modules,
0);
} else {
zmq_send(socket_streamvis,
(char*)data_empty,
8,
0);
}
//same for live analysis
int send_live_analysis = 0;
if ( reduction_factor_live_analysis > 1 ) {
send_live_analysis = rand() % reduction_factor_live_analysis;
}
if ( send_live_analysis == 0 ) {
header["shape"][0] = 16384;
header["shape"][1] = 1024;
} else{
header["shape"][0] = 2;
header["shape"][1] = 2;
}
text_header = Json::writeString(builder, header);
zmq_send(socket_live,
text_header.c_str(),
text_header.size(),
ZMQ_SNDMORE);
if ( send_live_analysis == 0 ) {
zmq_send(socket_live,
(char*)data,
core_buffer::MODULE_N_BYTES*n_modules,
0);
} else {
zmq_send(socket_live,
(char*)data_empty,
8,
0);
}
queue.release();