diff --git a/scripts/JF07-replay-worker.sh b/scripts/JF07-replay-worker.sh index 2a60dbe..aacf7c0 100644 --- a/scripts/JF07-replay-worker.sh +++ b/scripts/JF07-replay-worker.sh @@ -8,20 +8,19 @@ fi M=$1 -#8 replay workers per core, last writer worker occupies 4 -coreAssociated=(17 17 17 17 17 17 17 17 18 18 18 18 18 18 18 18 19 19 19 19 19 19 19 19 20 20 20 20 20 20 20 20 21,22,23,24) +#8 replay workers per core, last (stream to visualisation) worker occupies 4 +coreAssociated=(20 20 20 20 20 20 20 20 21 21 21 21 21 21 21 21 22 22 22 22 22 22 22 22 23 23 23 23 23 23 23 23 24,25,26,27) latest_file=`cat /gpfs/photonics/swissfel/buffer/JF07T32V01/M00/LATEST` last_pulse_id=`basename ${latest_file} | sed 's/.h5//'` -#first_pulse_id=$((${last_pulse_id}-100000)) -first_pulse_id=$((${last_pulse_id}-1000)) +first_pulse_id=$((${last_pulse_id}-360000)) echo "First/last pulse_id : ${first_pulse_id} ${last_pulse_id}" if [ ${M} == 32 ] then # taskset -c ${coreAssociated[10#${M}]} /usr/bin/sf_writer /gpfs/photonics/swissfel/buffer/test.${first_pulse_id}-${last_pulse_id}.h5 ${first_pulse_id} ${last_pulse_id} - taskset -c ${coreAssociated[10#${M}]} /usr/bin/sf_stream tcp://129.129.241.42:9007 30 tcp://129.129.241.42:9107 30 + taskset -c ${coreAssociated[10#${M}]} /usr/bin/sf_stream tcp://129.129.241.42:9007 30 tcp://192.168.30.29:9107 30 else taskset -c ${coreAssociated[10#${M}]} /usr/bin/sf_replay /gpfs/photonics/swissfel/buffer/JF07T32V01 M${M} ${M} ${first_pulse_id} ${last_pulse_id} fi diff --git a/sf-buffer/src/sf_replay.cpp b/sf-buffer/src/sf_replay.cpp index 3be2e80..1ac22a6 100644 --- a/sf-buffer/src/sf_replay.cpp +++ b/sf-buffer/src/sf_replay.cpp @@ -155,7 +155,7 @@ void sf_replay ( return; } - if (current_pulse_id != module_frame.pulse_id) { + if (current_pulse_id != module_frame.pulse_id and module_frame.pulse_id != 0) { stringstream err_msg; using namespace date; diff --git a/sf-buffer/src/sf_stream.cpp b/sf-buffer/src/sf_stream.cpp index dd3ccb7..099777f 100644 --- a/sf-buffer/src/sf_stream.cpp +++ b/sf-buffer/src/sf_stream.cpp @@ -96,7 +96,7 @@ void receive_replay( if (frame_meta_buffer->pulse_id[i_buffer] != module_meta_buffer->pulse_id) { frame_meta_buffer->is_good_frame[i_buffer] = false; - throw runtime_error("Unexpected pulse_id received."); + //throw runtime_error("Unexpected pulse_id received."); } if (frame_meta_buffer->frame_index[i_buffer] != @@ -214,6 +214,7 @@ int main (int argc, char *argv[]) auto start_time = chrono::steady_clock::now(); Json::Value header; + Json::StreamWriterBuilder builder; while (true) { @@ -247,42 +248,85 @@ int main (int argc, char *argv[]) header["daq_rec"] = metadata->daq_rec[i_buffer]; header["pulse_id"] = (Json::Value::UInt64)metadata->pulse_id[i_buffer]; - string text_header = Json::FastWriter().write(header); + //this needs to be re-read from external source + header["pedestal_file"] = "/sf/bernina/data/p17534/res/JF_pedestals/pedestal_20200423_1018.JF07T32V01.res.h5"; + header["gain_file"] = "/sf/bernina/config/jungfrau/gainMaps/JF07T32V01/gains.h5"; + + header["number_frames_expected"] = 10000; + header["run_name"] = to_string(uint64_t(metadata->pulse_id[i_buffer]/10000)*10000); + + // detector name should come as parameter to sf_stream + header["detector_name"] = "JF07T32V01"; + + header["htype"] = "array-1.0"; + header["type"] = "uint16"; + + int send_streamvis = 0; + if ( reduction_factor_streamvis > 1 ) { + send_streamvis = rand() % reduction_factor_streamvis; + } + if ( send_streamvis == 0 ) { + header["shape"][0] = 16384; + header["shape"][1] = 1024; + } else{ + header["shape"][0] = 2; + header["shape"][1] = 2; + } + + string text_header = Json::writeString(builder, header); + zmq_send(socket_streamvis, - text_header, + text_header.c_str(), + text_header.size(), + ZMQ_SNDMORE); + + if ( send_streamvis == 0 ) { + zmq_send(socket_streamvis, + (char*)data, + core_buffer::MODULE_N_BYTES*n_modules, + 0); + } else { + zmq_send(socket_streamvis, + (char*)data_empty, + 8, + 0); + } + + //same for live analysis + int send_live_analysis = 0; + if ( reduction_factor_live_analysis > 1 ) { + send_live_analysis = rand() % reduction_factor_live_analysis; + } + if ( send_live_analysis == 0 ) { + header["shape"][0] = 16384; + header["shape"][1] = 1024; + } else{ + header["shape"][0] = 2; + header["shape"][1] = 2; + } + + text_header = Json::writeString(builder, header); + + zmq_send(socket_live, + text_header.c_str(), text_header.size(), ZMQ_SNDMORE); - zmq_send(socket_streamvis, - (char*)data_empty, - 8, - 0); + if ( send_live_analysis == 0 ) { + zmq_send(socket_live, + (char*)data, + core_buffer::MODULE_N_BYTES*n_modules, + 0); + } else { + zmq_send(socket_live, + (char*)data_empty, + 8, + 0); + } + } - //int send_streamvis = 0; - //if ( reduction_factor_streamvis > 1 ) { - // send_streamvis = rand() % reduction_factor_streamvis; - //} - //send_streamvis = 0; - //zmq_send(socket_streamvis, - // &metadata, - // sizeof(DetectorFrame), - // ZMQ_SNDMORE); - - //if ( send_streamvis == 0 ) { - // zmq_send(socket_streamvis, - // (char*)data, - // MODULE_N_BYTES*n_modules, - // 0); - //} else { - // zmq_send(socket_streamvis, - // (char*)data_empty, - // 8, - // 0); - //} - - queue.release(); // TODO: Some poor statistics.