From b484b45b88ff033971e0766b5cbcd40990853693 Mon Sep 17 00:00:00 2001 From: Dmitry Ozerov Date: Fri, 1 May 2020 14:20:16 +0200 Subject: [PATCH 1/4] working version of sf_stream (sends to streamvis assembled frame) --- scripts/JF07-replay-worker.sh | 2 +- sf-buffer/src/sf_stream.cpp | 75 ++++++++++++++++++++--------------- 2 files changed, 44 insertions(+), 33 deletions(-) diff --git a/scripts/JF07-replay-worker.sh b/scripts/JF07-replay-worker.sh index 2a60dbe..6d77992 100644 --- a/scripts/JF07-replay-worker.sh +++ b/scripts/JF07-replay-worker.sh @@ -14,7 +14,7 @@ coreAssociated=(17 17 17 17 17 17 17 17 18 18 18 18 18 18 18 18 19 19 19 19 19 1 latest_file=`cat /gpfs/photonics/swissfel/buffer/JF07T32V01/M00/LATEST` last_pulse_id=`basename ${latest_file} | sed 's/.h5//'` #first_pulse_id=$((${last_pulse_id}-100000)) -first_pulse_id=$((${last_pulse_id}-1000)) +first_pulse_id=$((${last_pulse_id}-100000)) echo "First/last pulse_id : ${first_pulse_id} ${last_pulse_id}" diff --git a/sf-buffer/src/sf_stream.cpp b/sf-buffer/src/sf_stream.cpp index dd3ccb7..1303692 100644 --- a/sf-buffer/src/sf_stream.cpp +++ b/sf-buffer/src/sf_stream.cpp @@ -96,7 +96,7 @@ void receive_replay( if (frame_meta_buffer->pulse_id[i_buffer] != module_meta_buffer->pulse_id) { frame_meta_buffer->is_good_frame[i_buffer] = false; - throw runtime_error("Unexpected pulse_id received."); + //throw runtime_error("Unexpected pulse_id received."); } if (frame_meta_buffer->frame_index[i_buffer] != @@ -214,6 +214,7 @@ int main (int argc, char *argv[]) auto start_time = chrono::steady_clock::now(); Json::Value header; + Json::StreamWriterBuilder builder; while (true) { @@ -247,42 +248,52 @@ int main (int argc, char *argv[]) header["daq_rec"] = metadata->daq_rec[i_buffer]; header["pulse_id"] = (Json::Value::UInt64)metadata->pulse_id[i_buffer]; - string text_header = Json::FastWriter().write(header); - zmq_send(socket_streamvis, - text_header, - text_header.size(), - ZMQ_SNDMORE); + //this needs to be re-read from external source + header["pedestal_file"] = "/sf/bernina/data/p17534/res/JF_pedestals/pedestal_20200423_1018.JF07T32V01.res.h5"; + header["gain_file"] = "/sf/bernina/config/jungfrau/gainMaps/JF07T32V01/gains.h5"; + + header["number_frames_expected"] = 10000; + header["run_name"] = to_string(uint64_t(metadata->pulse_id[i_buffer]/10000)*10000); + + // detector name should come as parameter to sf_stream + header["detector_name"] = "JF07T32V01"; + + header["htype"] = "array-1.0"; + header["type"] = "uint16"; + + int send_streamvis = 0; + if ( reduction_factor_streamvis > 1 ) { + send_streamvis = rand() % reduction_factor_streamvis; + } + if ( send_streamvis == 0 ) { + header["shape"][0] = 16384; + header["shape"][1] = 1024; + } else{ + header["shape"][0] = 2; + header["shape"][1] = 2; + } + + string text_header = Json::writeString(builder, header); zmq_send(socket_streamvis, - (char*)data_empty, - 8, - 0); + text_header.c_str(), + text_header.size(), + ZMQ_SNDMORE); + + if ( send_streamvis == 0 ) { + zmq_send(socket_streamvis, + (char*)data, + core_buffer::MODULE_N_BYTES*n_modules, + 0); + } else { + zmq_send(socket_streamvis, + (char*)data_empty, + 8, + 0); + } } - //int send_streamvis = 0; - //if ( reduction_factor_streamvis > 1 ) { - // send_streamvis = rand() % reduction_factor_streamvis; - //} - //send_streamvis = 0; - //zmq_send(socket_streamvis, - // &metadata, - // sizeof(DetectorFrame), - // ZMQ_SNDMORE); - - //if ( send_streamvis == 0 ) { - // zmq_send(socket_streamvis, - // (char*)data, - // MODULE_N_BYTES*n_modules, - // 0); - //} else { - // zmq_send(socket_streamvis, - // (char*)data_empty, - // 8, - // 0); - //} - - queue.release(); // TODO: Some poor statistics. From 7e7593717cc9bdb3cce339b380365e80503fa5ce Mon Sep 17 00:00:00 2001 From: babic_a Date: Mon, 4 May 2020 09:05:13 +0200 Subject: [PATCH 2/4] move sf_replay to other numa nodes and make a quasi online stream (one hour replay) to test --- scripts/JF07-replay-worker.sh | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/scripts/JF07-replay-worker.sh b/scripts/JF07-replay-worker.sh index 6d77992..aacf7c0 100644 --- a/scripts/JF07-replay-worker.sh +++ b/scripts/JF07-replay-worker.sh @@ -8,20 +8,19 @@ fi M=$1 -#8 replay workers per core, last writer worker occupies 4 -coreAssociated=(17 17 17 17 17 17 17 17 18 18 18 18 18 18 18 18 19 19 19 19 19 19 19 19 20 20 20 20 20 20 20 20 21,22,23,24) +#8 replay workers per core, last (stream to visualisation) worker occupies 4 +coreAssociated=(20 20 20 20 20 20 20 20 21 21 21 21 21 21 21 21 22 22 22 22 22 22 22 22 23 23 23 23 23 23 23 23 24,25,26,27) latest_file=`cat /gpfs/photonics/swissfel/buffer/JF07T32V01/M00/LATEST` last_pulse_id=`basename ${latest_file} | sed 's/.h5//'` -#first_pulse_id=$((${last_pulse_id}-100000)) -first_pulse_id=$((${last_pulse_id}-100000)) +first_pulse_id=$((${last_pulse_id}-360000)) echo "First/last pulse_id : ${first_pulse_id} ${last_pulse_id}" if [ ${M} == 32 ] then # taskset -c ${coreAssociated[10#${M}]} /usr/bin/sf_writer /gpfs/photonics/swissfel/buffer/test.${first_pulse_id}-${last_pulse_id}.h5 ${first_pulse_id} ${last_pulse_id} - taskset -c ${coreAssociated[10#${M}]} /usr/bin/sf_stream tcp://129.129.241.42:9007 30 tcp://129.129.241.42:9107 30 + taskset -c ${coreAssociated[10#${M}]} /usr/bin/sf_stream tcp://129.129.241.42:9007 30 tcp://192.168.30.29:9107 30 else taskset -c ${coreAssociated[10#${M}]} /usr/bin/sf_replay /gpfs/photonics/swissfel/buffer/JF07T32V01 M${M} ${M} ${first_pulse_id} ${last_pulse_id} fi From 5221d53f36f92715029dc47252becc412a7e0e31 Mon Sep 17 00:00:00 2001 From: babic_a Date: Mon, 4 May 2020 09:07:27 +0200 Subject: [PATCH 3/4] in case of missing completely frame, don't raise exception --- sf-buffer/src/sf_replay.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sf-buffer/src/sf_replay.cpp b/sf-buffer/src/sf_replay.cpp index 3be2e80..1ac22a6 100644 --- a/sf-buffer/src/sf_replay.cpp +++ b/sf-buffer/src/sf_replay.cpp @@ -155,7 +155,7 @@ void sf_replay ( return; } - if (current_pulse_id != module_frame.pulse_id) { + if (current_pulse_id != module_frame.pulse_id and module_frame.pulse_id != 0) { stringstream err_msg; using namespace date; From 5f8b8898876dc1a0dc4b1d21f25c12da9cdefeb6 Mon Sep 17 00:00:00 2001 From: babic_a Date: Mon, 4 May 2020 09:08:24 +0200 Subject: [PATCH 4/4] added stream for live analysis to test --- sf-buffer/src/sf_stream.cpp | 33 +++++++++++++++++++++++++++++++++ 1 file changed, 33 insertions(+) diff --git a/sf-buffer/src/sf_stream.cpp b/sf-buffer/src/sf_stream.cpp index 1303692..099777f 100644 --- a/sf-buffer/src/sf_stream.cpp +++ b/sf-buffer/src/sf_stream.cpp @@ -292,6 +292,39 @@ int main (int argc, char *argv[]) 0); } + //same for live analysis + int send_live_analysis = 0; + if ( reduction_factor_live_analysis > 1 ) { + send_live_analysis = rand() % reduction_factor_live_analysis; + } + if ( send_live_analysis == 0 ) { + header["shape"][0] = 16384; + header["shape"][1] = 1024; + } else{ + header["shape"][0] = 2; + header["shape"][1] = 2; + } + + text_header = Json::writeString(builder, header); + + zmq_send(socket_live, + text_header.c_str(), + text_header.size(), + ZMQ_SNDMORE); + + if ( send_live_analysis == 0 ) { + zmq_send(socket_live, + (char*)data, + core_buffer::MODULE_N_BYTES*n_modules, + 0); + } else { + zmq_send(socket_live, + (char*)data_empty, + 8, + 0); + } + + } queue.release();