Merge remote-tracking branch 'origin/master' into compression

This commit is contained in:
2020-05-04 09:25:57 +02:00
3 changed files with 79 additions and 36 deletions
+4 -5
View File
@@ -8,20 +8,19 @@ fi
M=$1
#8 replay workers per core, last writer worker occupies 4
coreAssociated=(17 17 17 17 17 17 17 17 18 18 18 18 18 18 18 18 19 19 19 19 19 19 19 19 20 20 20 20 20 20 20 20 21,22,23,24)
#8 replay workers per core, last (stream to visualisation) worker occupies 4
coreAssociated=(20 20 20 20 20 20 20 20 21 21 21 21 21 21 21 21 22 22 22 22 22 22 22 22 23 23 23 23 23 23 23 23 24,25,26,27)
latest_file=`cat /gpfs/photonics/swissfel/buffer/JF07T32V01/M00/LATEST`
last_pulse_id=`basename ${latest_file} | sed 's/.h5//'`
#first_pulse_id=$((${last_pulse_id}-100000))
first_pulse_id=$((${last_pulse_id}-1000))
first_pulse_id=$((${last_pulse_id}-360000))
echo "First/last pulse_id : ${first_pulse_id} ${last_pulse_id}"
if [ ${M} == 32 ]
then
# taskset -c ${coreAssociated[10#${M}]} /usr/bin/sf_writer /gpfs/photonics/swissfel/buffer/test.${first_pulse_id}-${last_pulse_id}.h5 ${first_pulse_id} ${last_pulse_id}
taskset -c ${coreAssociated[10#${M}]} /usr/bin/sf_stream tcp://129.129.241.42:9007 30 tcp://129.129.241.42:9107 30
taskset -c ${coreAssociated[10#${M}]} /usr/bin/sf_stream tcp://129.129.241.42:9007 30 tcp://192.168.30.29:9107 30
else
taskset -c ${coreAssociated[10#${M}]} /usr/bin/sf_replay /gpfs/photonics/swissfel/buffer/JF07T32V01 M${M} ${M} ${first_pulse_id} ${last_pulse_id}
fi
+1 -1
View File
@@ -155,7 +155,7 @@ void sf_replay (
return;
}
if (current_pulse_id != module_frame.pulse_id) {
if (current_pulse_id != module_frame.pulse_id and module_frame.pulse_id != 0) {
stringstream err_msg;
using namespace date;
+74 -30
View File
@@ -96,7 +96,7 @@ void receive_replay(
if (frame_meta_buffer->pulse_id[i_buffer] !=
module_meta_buffer->pulse_id) {
frame_meta_buffer->is_good_frame[i_buffer] = false;
throw runtime_error("Unexpected pulse_id received.");
//throw runtime_error("Unexpected pulse_id received.");
}
if (frame_meta_buffer->frame_index[i_buffer] !=
@@ -214,6 +214,7 @@ int main (int argc, char *argv[])
auto start_time = chrono::steady_clock::now();
Json::Value header;
Json::StreamWriterBuilder builder;
while (true) {
@@ -247,42 +248,85 @@ int main (int argc, char *argv[])
header["daq_rec"] = metadata->daq_rec[i_buffer];
header["pulse_id"] = (Json::Value::UInt64)metadata->pulse_id[i_buffer];
string text_header = Json::FastWriter().write(header);
//this needs to be re-read from external source
header["pedestal_file"] = "/sf/bernina/data/p17534/res/JF_pedestals/pedestal_20200423_1018.JF07T32V01.res.h5";
header["gain_file"] = "/sf/bernina/config/jungfrau/gainMaps/JF07T32V01/gains.h5";
header["number_frames_expected"] = 10000;
header["run_name"] = to_string(uint64_t(metadata->pulse_id[i_buffer]/10000)*10000);
// detector name should come as parameter to sf_stream
header["detector_name"] = "JF07T32V01";
header["htype"] = "array-1.0";
header["type"] = "uint16";
int send_streamvis = 0;
if ( reduction_factor_streamvis > 1 ) {
send_streamvis = rand() % reduction_factor_streamvis;
}
if ( send_streamvis == 0 ) {
header["shape"][0] = 16384;
header["shape"][1] = 1024;
} else{
header["shape"][0] = 2;
header["shape"][1] = 2;
}
string text_header = Json::writeString(builder, header);
zmq_send(socket_streamvis,
text_header,
text_header.c_str(),
text_header.size(),
ZMQ_SNDMORE);
if ( send_streamvis == 0 ) {
zmq_send(socket_streamvis,
(char*)data,
core_buffer::MODULE_N_BYTES*n_modules,
0);
} else {
zmq_send(socket_streamvis,
(char*)data_empty,
8,
0);
}
//same for live analysis
int send_live_analysis = 0;
if ( reduction_factor_live_analysis > 1 ) {
send_live_analysis = rand() % reduction_factor_live_analysis;
}
if ( send_live_analysis == 0 ) {
header["shape"][0] = 16384;
header["shape"][1] = 1024;
} else{
header["shape"][0] = 2;
header["shape"][1] = 2;
}
text_header = Json::writeString(builder, header);
zmq_send(socket_live,
text_header.c_str(),
text_header.size(),
ZMQ_SNDMORE);
zmq_send(socket_streamvis,
(char*)data_empty,
8,
0);
if ( send_live_analysis == 0 ) {
zmq_send(socket_live,
(char*)data,
core_buffer::MODULE_N_BYTES*n_modules,
0);
} else {
zmq_send(socket_live,
(char*)data_empty,
8,
0);
}
}
//int send_streamvis = 0;
//if ( reduction_factor_streamvis > 1 ) {
// send_streamvis = rand() % reduction_factor_streamvis;
//}
//send_streamvis = 0;
//zmq_send(socket_streamvis,
// &metadata,
// sizeof(DetectorFrame),
// ZMQ_SNDMORE);
//if ( send_streamvis == 0 ) {
// zmq_send(socket_streamvis,
// (char*)data,
// MODULE_N_BYTES*n_modules,
// 0);
//} else {
// zmq_send(socket_streamvis,
// (char*)data_empty,
// 8,
// 0);
//}
queue.release();
// TODO: Some poor statistics.