From dbd7697e01a377d3ab421803394fe57d0d28aec4 Mon Sep 17 00:00:00 2001 From: Dmitry Ozerov Date: Mon, 4 May 2020 09:05:13 +0200 Subject: [PATCH 1/3] move sf_replay to other numa nodes and make a quasi online stream (one hour replay) to test --- scripts/JF07-replay-worker.sh | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/scripts/JF07-replay-worker.sh b/scripts/JF07-replay-worker.sh index 6d77992..aacf7c0 100644 --- a/scripts/JF07-replay-worker.sh +++ b/scripts/JF07-replay-worker.sh @@ -8,20 +8,19 @@ fi M=$1 -#8 replay workers per core, last writer worker occupies 4 -coreAssociated=(17 17 17 17 17 17 17 17 18 18 18 18 18 18 18 18 19 19 19 19 19 19 19 19 20 20 20 20 20 20 20 20 21,22,23,24) +#8 replay workers per core, last (stream to visualisation) worker occupies 4 +coreAssociated=(20 20 20 20 20 20 20 20 21 21 21 21 21 21 21 21 22 22 22 22 22 22 22 22 23 23 23 23 23 23 23 23 24,25,26,27) latest_file=`cat /gpfs/photonics/swissfel/buffer/JF07T32V01/M00/LATEST` last_pulse_id=`basename ${latest_file} | sed 's/.h5//'` -#first_pulse_id=$((${last_pulse_id}-100000)) -first_pulse_id=$((${last_pulse_id}-100000)) +first_pulse_id=$((${last_pulse_id}-360000)) echo "First/last pulse_id : ${first_pulse_id} ${last_pulse_id}" if [ ${M} == 32 ] then # taskset -c ${coreAssociated[10#${M}]} /usr/bin/sf_writer /gpfs/photonics/swissfel/buffer/test.${first_pulse_id}-${last_pulse_id}.h5 ${first_pulse_id} ${last_pulse_id} - taskset -c ${coreAssociated[10#${M}]} /usr/bin/sf_stream tcp://129.129.241.42:9007 30 tcp://129.129.241.42:9107 30 + taskset -c ${coreAssociated[10#${M}]} /usr/bin/sf_stream tcp://129.129.241.42:9007 30 tcp://192.168.30.29:9107 30 else taskset -c ${coreAssociated[10#${M}]} /usr/bin/sf_replay /gpfs/photonics/swissfel/buffer/JF07T32V01 M${M} ${M} ${first_pulse_id} ${last_pulse_id} fi From b5803a3f43bae2ce4ea91d4da94b43c79c87c1bb Mon Sep 17 00:00:00 2001 From: Dmitry Ozerov Date: Mon, 4 May 2020 09:07:27 +0200 Subject: [PATCH 2/3] in case of missing completely frame, don't raise exception --- sf-buffer/src/sf_replay.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sf-buffer/src/sf_replay.cpp b/sf-buffer/src/sf_replay.cpp index 3be2e80..1ac22a6 100644 --- a/sf-buffer/src/sf_replay.cpp +++ b/sf-buffer/src/sf_replay.cpp @@ -155,7 +155,7 @@ void sf_replay ( return; } - if (current_pulse_id != module_frame.pulse_id) { + if (current_pulse_id != module_frame.pulse_id and module_frame.pulse_id != 0) { stringstream err_msg; using namespace date; From 8ce25dd84696c7bd1cc0a41a6dd3011a2fa15dfa Mon Sep 17 00:00:00 2001 From: Dmitry Ozerov Date: Mon, 4 May 2020 09:08:24 +0200 Subject: [PATCH 3/3] added stream for live analysis to test --- sf-buffer/src/sf_stream.cpp | 33 +++++++++++++++++++++++++++++++++ 1 file changed, 33 insertions(+) diff --git a/sf-buffer/src/sf_stream.cpp b/sf-buffer/src/sf_stream.cpp index 1303692..099777f 100644 --- a/sf-buffer/src/sf_stream.cpp +++ b/sf-buffer/src/sf_stream.cpp @@ -292,6 +292,39 @@ int main (int argc, char *argv[]) 0); } + //same for live analysis + int send_live_analysis = 0; + if ( reduction_factor_live_analysis > 1 ) { + send_live_analysis = rand() % reduction_factor_live_analysis; + } + if ( send_live_analysis == 0 ) { + header["shape"][0] = 16384; + header["shape"][1] = 1024; + } else{ + header["shape"][0] = 2; + header["shape"][1] = 2; + } + + text_header = Json::writeString(builder, header); + + zmq_send(socket_live, + text_header.c_str(), + text_header.size(), + ZMQ_SNDMORE); + + if ( send_live_analysis == 0 ) { + zmq_send(socket_live, + (char*)data, + core_buffer::MODULE_N_BYTES*n_modules, + 0); + } else { + zmq_send(socket_live, + (char*)data_empty, + 8, + 0); + } + + } queue.release();