From 85660a92602f36b8b12172aa9fefb302f8198929 Mon Sep 17 00:00:00 2001 From: Andrej Babic Date: Fri, 1 May 2020 12:41:39 +0200 Subject: [PATCH 01/23] Added new sf_live stub --- sf-buffer/src/sf_live.cpp | 125 ++++++++------------------------------ 1 file changed, 27 insertions(+), 98 deletions(-) diff --git a/sf-buffer/src/sf_live.cpp b/sf-buffer/src/sf_live.cpp index d2c4592..a319390 100644 --- a/sf-buffer/src/sf_live.cpp +++ b/sf-buffer/src/sf_live.cpp @@ -7,17 +7,11 @@ #include #include #include "date.h" +#include "LiveH5Reader.hpp" using namespace std; using namespace core_buffer; -struct FileBufferMetadata { - uint64_t pulse_id[REPLAY_READ_BLOCK_SIZE]; - uint64_t frame_index[REPLAY_READ_BLOCK_SIZE]; - uint32_t daq_rec[REPLAY_READ_BLOCK_SIZE]; - uint16_t n_received_packets[REPLAY_READ_BLOCK_SIZE]; -}; - void load_data_from_file ( FileBufferMetadata* metadata_buffer, char* image_buffer, @@ -84,107 +78,44 @@ void sf_live ( void* socket, const string& device, const string& channel_name, - const uint16_t source_id, - const uint64_t start_pulse_id) + const uint16_t source_id) { - auto metadata_buffer = make_unique(); - auto image_buffer = make_unique( - LIVE_READ_BLOCK_SIZE * MODULE_N_PIXELS); + auto metadata_buffer = make_unique(); + auto image_buffer = make_unique(MODULE_N_PIXELS); - auto latest_filename = ""; + const auto current_filename = device + "/" + channel_name + "/CURRENT"; - uint64_t base_pulse_id = start_pulse_id / core_buffer::FILE_MOD; - base_pulse_id *= core_buffer::FILE_MOD; + LiveH5Reader reader(current_filename, source_id); - size_t current_pulse_id = base_pulse_id; - string filename_base = device + "/" + channel_name + "/"; + auto current_pulse_id = reader.get_latest_pulse_id(); - for (const auto& filename_suffix:path_suffixes) { + while (true) { - string filename = filename_base + filename_suffix.path; + reader.get_frame_metadata(current_pulse_id, metadata_buffer.get()); + + zmq_send(socket, + (char*)(metadata_buffer.get()), + sizeof(ModuleFrame), + ZMQ_SNDMORE); + + reader.get_frame_data(current_pulse_id, image_buffer.get()); + + zmq_send(socket, + (char*)(image_buffer.get()), + MODULE_N_BYTES, + 0); #ifdef DEBUG_OUTPUT using namespace date; using namespace chrono; cout << "[" << system_clock::now() << "]"; - cout << "[sf_replay::sf_replay]"; - - cout << " Reading from filename " << filename << endl; + cout << "[sf_live::sf_live]"; + cout << " Sent pulse_id "; + cout << current_pulse_id << endl; #endif - for (size_t file_index_offset=0; - file_index_offset < FILE_MOD; - file_index_offset += REPLAY_READ_BLOCK_SIZE) - { - auto start_time = chrono::steady_clock::now(); - - load_data_from_file( - metadata_buffer.get(), - (char*)(image_buffer.get()), - filename, - file_index_offset); - - auto end_time = chrono::steady_clock::now(); - auto ms_duration = chrono::duration_cast( - end_time-start_time).count(); - - cout << "sf_replay:batch_read_ms " << ms_duration << endl; - - for (size_t i_frame=0; i_frame < REPLAY_READ_BLOCK_SIZE; i_frame++) { - - ModuleFrame module_frame = { - metadata_buffer->pulse_id[i_frame], - metadata_buffer->frame_index[i_frame], - metadata_buffer->daq_rec[i_frame], - metadata_buffer->n_received_packets[i_frame], - source_id - }; - - if (current_pulse_id < start_pulse_id) { - current_pulse_id++; - continue; - } - - if (current_pulse_id != module_frame.pulse_id) { - stringstream err_msg; - - using namespace date; - using namespace chrono; - err_msg << "[" << system_clock::now() << "]"; - err_msg << "[sf_live::sf_live]"; - err_msg << " Read unexpected pulse_id. "; - err_msg << " Expected " << current_pulse_id; - err_msg << " received " << module_frame.pulse_id; - err_msg << endl; - - throw runtime_error(err_msg.str()); - } - - zmq_send(socket, - &module_frame, - sizeof(ModuleFrame), - ZMQ_SNDMORE); - - auto buff_offset = i_frame * MODULE_N_PIXELS; - zmq_send(socket, - (char*)(image_buffer.get() + buff_offset), - MODULE_N_BYTES, - 0); - - #ifdef DEBUG_OUTPUT - using namespace date; - using namespace chrono; - - cout << "[" << system_clock::now() << "]"; - cout << "[sf_replay::sf_replay]"; - cout << " Sent pulse_id "; - cout << current_pulse_id << endl; - #endif - - current_pulse_id++; - } - } + current_pulse_id++; } } @@ -192,8 +123,7 @@ int main (int argc, char *argv[]) { if (argc != 6) { cout << endl; - cout << "Usage: sf_live [device]"; - cout << " [channel_name] [source_id] [start_pulse_id]"; + cout << "Usage: sf_live [device] [channel_name] [source_id]"; cout << endl; cout << "\tdevice: Name of detector." << endl; cout << "\tchannel_name: M00-M31 for JF16M." << endl; @@ -206,7 +136,6 @@ int main (int argc, char *argv[]) { const string device = string(argv[1]); const string channel_name = string(argv[2]); const uint16_t source_id = (uint16_t) atoi(argv[3]); - const uint64_t start_pulse_id = (uint64_t) atoll(argv[4]); stringstream ipc_stream; ipc_stream << "ipc://sf-live-" << (int)source_id; @@ -241,7 +170,7 @@ int main (int argc, char *argv[]) { if (zmq_connect(socket, ipc_address.c_str()) != 0) throw runtime_error(strerror (errno)); - sf_live(socket, device, channel_name, source_id, start_pulse_id); + sf_live(socket, device, channel_name, source_id); zmq_close(socket); zmq_ctx_destroy(ctx); From 6df2457dd7b62f6abac0c89868a0da07a4ef7e6d Mon Sep 17 00:00:00 2001 From: Andrej Babic Date: Fri, 1 May 2020 12:41:50 +0200 Subject: [PATCH 02/23] Stubs for LiveH5Reader --- core-buffer/include/LiveH5Reader.hpp | 9 +++++++++ core-buffer/src/LiveH5Reader.cpp | 1 + 2 files changed, 10 insertions(+) create mode 100644 core-buffer/include/LiveH5Reader.hpp create mode 100644 core-buffer/src/LiveH5Reader.cpp diff --git a/core-buffer/include/LiveH5Reader.hpp b/core-buffer/include/LiveH5Reader.hpp new file mode 100644 index 0000000..16362ad --- /dev/null +++ b/core-buffer/include/LiveH5Reader.hpp @@ -0,0 +1,9 @@ +#ifndef SF_DAQ_BUFFER_LIVEH5READER_HPP +#define SF_DAQ_BUFFER_LIVEH5READER_HPP + +class LiveH5Reader { + +}; + + +#endif //SF_DAQ_BUFFER_LIVEH5READER_HPP diff --git a/core-buffer/src/LiveH5Reader.cpp b/core-buffer/src/LiveH5Reader.cpp new file mode 100644 index 0000000..a8e8826 --- /dev/null +++ b/core-buffer/src/LiveH5Reader.cpp @@ -0,0 +1 @@ +#include "LiveH5Reader.hpp" From 2f95c599bacb5374c80d435427cb89efe17db831 Mon Sep 17 00:00:00 2001 From: Andrej Babic Date: Fri, 1 May 2020 12:52:22 +0200 Subject: [PATCH 03/23] Adjust interface for LiveH5Reader --- core-buffer/include/LiveH5Reader.hpp | 15 ++++++ sf-buffer/src/sf_live.cpp | 78 ++-------------------------- 2 files changed, 20 insertions(+), 73 deletions(-) diff --git a/core-buffer/include/LiveH5Reader.hpp b/core-buffer/include/LiveH5Reader.hpp index 16362ad..9dda91e 100644 --- a/core-buffer/include/LiveH5Reader.hpp +++ b/core-buffer/include/LiveH5Reader.hpp @@ -1,8 +1,23 @@ #ifndef SF_DAQ_BUFFER_LIVEH5READER_HPP #define SF_DAQ_BUFFER_LIVEH5READER_HPP +#include +#include "jungfrau.hpp" + class LiveH5Reader { + const std::string current_filename_; + const uint16_t source_id_; + +public: + LiveH5Reader( + const std::string& device, + const std::string& channel_name, + const uint16_t source_id); + + uint64_t get_latest_pulse_id(); + ModuleFrame* read_frame_metadata(uint64_t pulse_id); + char* read_frame_data(uint64_t pulse_id); }; diff --git a/sf-buffer/src/sf_live.cpp b/sf-buffer/src/sf_live.cpp index a319390..805001c 100644 --- a/sf-buffer/src/sf_live.cpp +++ b/sf-buffer/src/sf_live.cpp @@ -12,96 +12,28 @@ using namespace std; using namespace core_buffer; -void load_data_from_file ( - FileBufferMetadata* metadata_buffer, - char* image_buffer, - const string &filename, - const size_t start_index) -{ - - hsize_t b_image_dim[3] = {REPLAY_READ_BLOCK_SIZE, 512, 1024}; - H5::DataSpace b_i_space (3, b_image_dim); - hsize_t b_i_count[] = {REPLAY_READ_BLOCK_SIZE, 512, 1024}; - hsize_t b_i_start[] = {0, 0, 0}; - b_i_space.selectHyperslab(H5S_SELECT_SET, b_i_count, b_i_start); - - hsize_t f_image_dim[3] = {FILE_MOD, 512, 1024}; - H5::DataSpace f_i_space (3, f_image_dim); - hsize_t f_i_count[] = {REPLAY_READ_BLOCK_SIZE, 512, 1024}; - hsize_t f_i_start[] = {start_index, 0, 0}; - f_i_space.selectHyperslab(H5S_SELECT_SET, f_i_count, f_i_start); - - hsize_t b_metadata_dim[2] = {REPLAY_READ_BLOCK_SIZE, 1}; - H5::DataSpace b_m_space (2, b_metadata_dim); - hsize_t b_m_count[] = {REPLAY_READ_BLOCK_SIZE, 1}; - hsize_t b_m_start[] = {0, 0}; - b_m_space.selectHyperslab(H5S_SELECT_SET, b_m_count, b_m_start); - - hsize_t f_metadata_dim[2] = {FILE_MOD, 1}; - H5::DataSpace f_m_space (2, f_metadata_dim); - hsize_t f_m_count[] = {REPLAY_READ_BLOCK_SIZE, 1}; - hsize_t f_m_start[] = {start_index, 0}; - f_m_space.selectHyperslab(H5S_SELECT_SET, f_m_count, f_m_start); - - H5::H5File input_file(filename, H5F_ACC_RDONLY); - - auto image_dataset = input_file.openDataSet("image"); - image_dataset.read( - image_buffer, H5::PredType::NATIVE_UINT16, - b_i_space, f_i_space); - - auto pulse_id_dataset = input_file.openDataSet("pulse_id"); - pulse_id_dataset.read( - metadata_buffer->pulse_id, H5::PredType::NATIVE_UINT64, - b_m_space, f_m_space); - - auto frame_id_dataset = input_file.openDataSet("frame_id"); - frame_id_dataset.read( - metadata_buffer->frame_index, H5::PredType::NATIVE_UINT64, - b_m_space, f_m_space); - - auto daq_rec_dataset = input_file.openDataSet("daq_rec"); - daq_rec_dataset.read( - metadata_buffer->daq_rec, H5::PredType::NATIVE_UINT32, - b_m_space, f_m_space); - - auto received_packets_dataset = - input_file.openDataSet("received_packets"); - received_packets_dataset.read( - metadata_buffer->n_received_packets, H5::PredType::NATIVE_UINT16, - b_m_space, f_m_space); - - input_file.close(); -} - void sf_live ( void* socket, const string& device, const string& channel_name, const uint16_t source_id) { - auto metadata_buffer = make_unique(); - auto image_buffer = make_unique(MODULE_N_PIXELS); - - const auto current_filename = device + "/" + channel_name + "/CURRENT"; - - LiveH5Reader reader(current_filename, source_id); + LiveH5Reader reader(device, channel_name, source_id); auto current_pulse_id = reader.get_latest_pulse_id(); - while (true) { - reader.get_frame_metadata(current_pulse_id, metadata_buffer.get()); + auto metadata = reader.read_frame_metadata(current_pulse_id); zmq_send(socket, - (char*)(metadata_buffer.get()), + (char*) metadata, sizeof(ModuleFrame), ZMQ_SNDMORE); - reader.get_frame_data(current_pulse_id, image_buffer.get()); + auto data = reader.read_frame_data(current_pulse_id); zmq_send(socket, - (char*)(image_buffer.get()), + data, MODULE_N_BYTES, 0); From 0810b67f5aa21e7fbd4a072b94e8e3eda211d9a1 Mon Sep 17 00:00:00 2001 From: Andrej Babic Date: Fri, 1 May 2020 12:52:50 +0200 Subject: [PATCH 04/23] Remove useless imports --- sf-buffer/src/sf_live.cpp | 3 --- 1 file changed, 3 deletions(-) diff --git a/sf-buffer/src/sf_live.cpp b/sf-buffer/src/sf_live.cpp index 805001c..be0be1a 100644 --- a/sf-buffer/src/sf_live.cpp +++ b/sf-buffer/src/sf_live.cpp @@ -1,10 +1,7 @@ #include -#include #include "jungfrau.hpp" -#include "BufferUtils.hpp" #include "zmq.h" #include "buffer_config.hpp" -#include #include #include "date.h" #include "LiveH5Reader.hpp" From b71a2af0ed12f81b1cebb51b827ed6afe33f7c03 Mon Sep 17 00:00:00 2001 From: Andrej Babic Date: Fri, 1 May 2020 13:05:37 +0200 Subject: [PATCH 05/23] Made interface more simple for sf_live --- core-buffer/include/LiveH5Reader.hpp | 17 +++++++++++++++-- sf-buffer/src/sf_live.cpp | 6 ++++-- 2 files changed, 19 insertions(+), 4 deletions(-) diff --git a/core-buffer/include/LiveH5Reader.hpp b/core-buffer/include/LiveH5Reader.hpp index 9dda91e..809fbde 100644 --- a/core-buffer/include/LiveH5Reader.hpp +++ b/core-buffer/include/LiveH5Reader.hpp @@ -2,12 +2,23 @@ #define SF_DAQ_BUFFER_LIVEH5READER_HPP #include +#include #include "jungfrau.hpp" +#include "buffer_config.hpp" class LiveH5Reader { + struct LiveBufferMetadata { + uint64_t pulse_id[core_buffer::FILE_MOD]; + uint64_t frame_index[core_buffer::FILE_MOD]; + uint32_t daq_rec[core_buffer::FILE_MOD]; + uint16_t n_received_packets[core_buffer::FILE_MOD]; + }; + const std::string current_filename_; const uint16_t source_id_; + std::unique_ptr metadata_buffer_; + std::unique_ptr data_buffer_; public: LiveH5Reader( @@ -16,8 +27,10 @@ public: const uint16_t source_id); uint64_t get_latest_pulse_id(); - ModuleFrame* read_frame_metadata(uint64_t pulse_id); - char* read_frame_data(uint64_t pulse_id); + void load_pulse_id(uint64_t pulse_id); + + ModuleFrame* get_metadata(); + char* get_data(); }; diff --git a/sf-buffer/src/sf_live.cpp b/sf-buffer/src/sf_live.cpp index be0be1a..2f69bff 100644 --- a/sf-buffer/src/sf_live.cpp +++ b/sf-buffer/src/sf_live.cpp @@ -20,14 +20,16 @@ void sf_live ( auto current_pulse_id = reader.get_latest_pulse_id(); while (true) { - auto metadata = reader.read_frame_metadata(current_pulse_id); + reader.load_pulse_id(current_pulse_id); + + auto metadata = reader.get_metadata(); zmq_send(socket, (char*) metadata, sizeof(ModuleFrame), ZMQ_SNDMORE); - auto data = reader.read_frame_data(current_pulse_id); + auto data = reader.get_data(); zmq_send(socket, data, From 7418cd361304a50b3311834644a4503ebe8f9d64 Mon Sep 17 00:00:00 2001 From: Andrej Babic Date: Fri, 1 May 2020 13:09:09 +0200 Subject: [PATCH 06/23] Improve interface for caching data from file --- core-buffer/include/LiveH5Reader.hpp | 12 +++--------- sf-buffer/src/sf_live.cpp | 2 +- 2 files changed, 4 insertions(+), 10 deletions(-) diff --git a/core-buffer/include/LiveH5Reader.hpp b/core-buffer/include/LiveH5Reader.hpp index 809fbde..d3c31c8 100644 --- a/core-buffer/include/LiveH5Reader.hpp +++ b/core-buffer/include/LiveH5Reader.hpp @@ -8,16 +8,10 @@ class LiveH5Reader { - struct LiveBufferMetadata { - uint64_t pulse_id[core_buffer::FILE_MOD]; - uint64_t frame_index[core_buffer::FILE_MOD]; - uint32_t daq_rec[core_buffer::FILE_MOD]; - uint16_t n_received_packets[core_buffer::FILE_MOD]; - }; - const std::string current_filename_; const uint16_t source_id_; - std::unique_ptr metadata_buffer_; + + std::unique_ptr pulse_id_buffer_; std::unique_ptr data_buffer_; public: @@ -29,7 +23,7 @@ public: uint64_t get_latest_pulse_id(); void load_pulse_id(uint64_t pulse_id); - ModuleFrame* get_metadata(); + ModuleFrame get_metadata(); char* get_data(); }; diff --git a/sf-buffer/src/sf_live.cpp b/sf-buffer/src/sf_live.cpp index 2f69bff..2f0b6c1 100644 --- a/sf-buffer/src/sf_live.cpp +++ b/sf-buffer/src/sf_live.cpp @@ -25,7 +25,7 @@ void sf_live ( auto metadata = reader.get_metadata(); zmq_send(socket, - (char*) metadata, + &metadata, sizeof(ModuleFrame), ZMQ_SNDMORE); From 6dd2fbbb1ca2cd08f8250b565d6d2d618977da78 Mon Sep 17 00:00:00 2001 From: Andrej Babic Date: Fri, 1 May 2020 13:10:02 +0200 Subject: [PATCH 07/23] Remove unused import --- core-buffer/src/FastH5Writer.cpp | 2 -- 1 file changed, 2 deletions(-) diff --git a/core-buffer/src/FastH5Writer.cpp b/core-buffer/src/FastH5Writer.cpp index 8c80af4..04405c1 100644 --- a/core-buffer/src/FastH5Writer.cpp +++ b/core-buffer/src/FastH5Writer.cpp @@ -1,10 +1,8 @@ #include #include "FastH5Writer.hpp" -#include "date.h" #include #include #include -#include extern "C" { From 8cf8d3e7d2991fef8b1452458a2093869b18deae Mon Sep 17 00:00:00 2001 From: Andrej Babic Date: Fri, 1 May 2020 13:56:08 +0200 Subject: [PATCH 08/23] Close file at end --- sf-buffer/src/sf_live.cpp | 2 ++ 1 file changed, 2 insertions(+) diff --git a/sf-buffer/src/sf_live.cpp b/sf-buffer/src/sf_live.cpp index 2f0b6c1..6d7cb4e 100644 --- a/sf-buffer/src/sf_live.cpp +++ b/sf-buffer/src/sf_live.cpp @@ -48,6 +48,8 @@ void sf_live ( current_pulse_id++; } + + reader.close_file(); } int main (int argc, char *argv[]) { From 6253e53a19d8a3f057b6a174192f3e1b0bf47ddb Mon Sep 17 00:00:00 2001 From: Andrej Babic Date: Fri, 1 May 2020 13:56:24 +0200 Subject: [PATCH 09/23] Add partial implementation of LiveH5Reader --- core-buffer/include/LiveH5Reader.hpp | 16 +++++ core-buffer/src/LiveH5Reader.cpp | 97 ++++++++++++++++++++++++++++ 2 files changed, 113 insertions(+) diff --git a/core-buffer/include/LiveH5Reader.hpp b/core-buffer/include/LiveH5Reader.hpp index d3c31c8..ebbe69f 100644 --- a/core-buffer/include/LiveH5Reader.hpp +++ b/core-buffer/include/LiveH5Reader.hpp @@ -5,6 +5,7 @@ #include #include "jungfrau.hpp" #include "buffer_config.hpp" +#include class LiveH5Reader { @@ -14,17 +15,32 @@ class LiveH5Reader { std::unique_ptr pulse_id_buffer_; std::unique_ptr data_buffer_; + uint64_t current_file_max_pulse_id_; + H5::H5File file_; + + H5::DataSet image_dataset_; + H5::DataSet pulse_id_dataset_; + H5::DataSet frame_index_dataset_; + H5::DataSet daq_rec_dataset_; + H5::DataSet n_received_packets_dataset_; + + void open_file(); + public: LiveH5Reader( const std::string& device, const std::string& channel_name, const uint16_t source_id); + ~LiveH5Reader(); + uint64_t get_latest_pulse_id(); void load_pulse_id(uint64_t pulse_id); ModuleFrame get_metadata(); char* get_data(); + + void close_file(); }; diff --git a/core-buffer/src/LiveH5Reader.cpp b/core-buffer/src/LiveH5Reader.cpp index a8e8826..a5955c9 100644 --- a/core-buffer/src/LiveH5Reader.cpp +++ b/core-buffer/src/LiveH5Reader.cpp @@ -1 +1,98 @@ #include "LiveH5Reader.hpp" +#include "BufferUtils.hpp" + +using namespace std; +using namespace core_buffer; + +LiveH5Reader::LiveH5Reader( + const std::string& device, + const std::string& channel_name, + const uint16_t source_id): + current_filename_(device + "/" + channel_name + "/CURRENT"), + source_id_(source_id), + pulse_id_buffer_(make_unique(FILE_MOD)), + data_buffer_(make_unique(MODULE_N_PIXELS)) +{ + auto filename = BufferUtils::get_latest_file(current_filename_); + file_ = H5::H5File(filename, H5F_ACC_RDONLY | H5F_ACC_SWMR_READ); + + uint64_t base_pulse_id = start_pulse_id / core_buffer::FILE_MOD; + base_pulse_id *= core_buffer::FILE_MOD; + + current_file_max_pulse_id_ = + + image_dataset_ = input_file.openDataSet("image"); + pulse_id_dataset_ = input_file.openDataSet("pulse_id"); + frame_index_dataset_ = input_file.openDataSet("frame_id"); + daq_rec_dataset_ = input_file.openDataSet("daq_rec"); + n_received_packets_dataset_ = input_file.openDataSet("received_packets"); + +} + +LiveH5Reader::~LiveH5Reader() { + close_file(); +} + + + +//void load_data_from_file ( +// FileBufferMetadata* metadata_buffer, +// char* image_buffer, +// const string &filename, +// const size_t start_index) +//{ +// +// hsize_t b_image_dim[3] = {REPLAY_READ_BLOCK_SIZE, 512, 1024}; +// H5::DataSpace b_i_space (3, b_image_dim); +// hsize_t b_i_count[] = {REPLAY_READ_BLOCK_SIZE, 512, 1024}; +// hsize_t b_i_start[] = {0, 0, 0}; +// b_i_space.selectHyperslab(H5S_SELECT_SET, b_i_count, b_i_start); +// +// hsize_t f_image_dim[3] = {FILE_MOD, 512, 1024}; +// H5::DataSpace f_i_space (3, f_image_dim); +// hsize_t f_i_count[] = {REPLAY_READ_BLOCK_SIZE, 512, 1024}; +// hsize_t f_i_start[] = {start_index, 0, 0}; +// f_i_space.selectHyperslab(H5S_SELECT_SET, f_i_count, f_i_start); +// +// hsize_t b_metadata_dim[2] = {REPLAY_READ_BLOCK_SIZE, 1}; +// H5::DataSpace b_m_space (2, b_metadata_dim); +// hsize_t b_m_count[] = {REPLAY_READ_BLOCK_SIZE, 1}; +// hsize_t b_m_start[] = {0, 0}; +// b_m_space.selectHyperslab(H5S_SELECT_SET, b_m_count, b_m_start); +// +// hsize_t f_metadata_dim[2] = {FILE_MOD, 1}; +// H5::DataSpace f_m_space (2, f_metadata_dim); +// hsize_t f_m_count[] = {REPLAY_READ_BLOCK_SIZE, 1}; +// hsize_t f_m_start[] = {start_index, 0}; +// f_m_space.selectHyperslab(H5S_SELECT_SET, f_m_count, f_m_start); +// +// H5::H5File input_file(filename, H5F_ACC_RDONLY); +// +// auto image_dataset = input_file.openDataSet("image"); +// image_dataset.read( +// image_buffer, H5::PredType::NATIVE_UINT16, +// b_i_space, f_i_space); +// +// auto pulse_id_dataset = input_file.openDataSet("pulse_id"); +// pulse_id_dataset.read( +// metadata_buffer->pulse_id, H5::PredType::NATIVE_UINT64, +// b_m_space, f_m_space); +// +// auto frame_id_dataset = input_file.openDataSet("frame_id"); +// frame_id_dataset.read( +// metadata_buffer->frame_index, H5::PredType::NATIVE_UINT64, +// b_m_space, f_m_space); +// +// auto daq_rec_dataset = input_file.openDataSet("daq_rec"); +// daq_rec_dataset.read( +// metadata_buffer->daq_rec, H5::PredType::NATIVE_UINT32, +// b_m_space, f_m_space); +// +// auto received_packets_dataset = +// input_file.openDataSet("received_packets"); +// received_packets_dataset.read( +// metadata_buffer->n_received_packets, H5::PredType::NATIVE_UINT16, +// b_m_space, f_m_space); +// +// input_file.close(); +//} \ No newline at end of file From 7e7593717cc9bdb3cce339b380365e80503fa5ce Mon Sep 17 00:00:00 2001 From: babic_a Date: Mon, 4 May 2020 09:05:13 +0200 Subject: [PATCH 10/23] move sf_replay to other numa nodes and make a quasi online stream (one hour replay) to test --- scripts/JF07-replay-worker.sh | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/scripts/JF07-replay-worker.sh b/scripts/JF07-replay-worker.sh index 6d77992..aacf7c0 100644 --- a/scripts/JF07-replay-worker.sh +++ b/scripts/JF07-replay-worker.sh @@ -8,20 +8,19 @@ fi M=$1 -#8 replay workers per core, last writer worker occupies 4 -coreAssociated=(17 17 17 17 17 17 17 17 18 18 18 18 18 18 18 18 19 19 19 19 19 19 19 19 20 20 20 20 20 20 20 20 21,22,23,24) +#8 replay workers per core, last (stream to visualisation) worker occupies 4 +coreAssociated=(20 20 20 20 20 20 20 20 21 21 21 21 21 21 21 21 22 22 22 22 22 22 22 22 23 23 23 23 23 23 23 23 24,25,26,27) latest_file=`cat /gpfs/photonics/swissfel/buffer/JF07T32V01/M00/LATEST` last_pulse_id=`basename ${latest_file} | sed 's/.h5//'` -#first_pulse_id=$((${last_pulse_id}-100000)) -first_pulse_id=$((${last_pulse_id}-100000)) +first_pulse_id=$((${last_pulse_id}-360000)) echo "First/last pulse_id : ${first_pulse_id} ${last_pulse_id}" if [ ${M} == 32 ] then # taskset -c ${coreAssociated[10#${M}]} /usr/bin/sf_writer /gpfs/photonics/swissfel/buffer/test.${first_pulse_id}-${last_pulse_id}.h5 ${first_pulse_id} ${last_pulse_id} - taskset -c ${coreAssociated[10#${M}]} /usr/bin/sf_stream tcp://129.129.241.42:9007 30 tcp://129.129.241.42:9107 30 + taskset -c ${coreAssociated[10#${M}]} /usr/bin/sf_stream tcp://129.129.241.42:9007 30 tcp://192.168.30.29:9107 30 else taskset -c ${coreAssociated[10#${M}]} /usr/bin/sf_replay /gpfs/photonics/swissfel/buffer/JF07T32V01 M${M} ${M} ${first_pulse_id} ${last_pulse_id} fi From 5221d53f36f92715029dc47252becc412a7e0e31 Mon Sep 17 00:00:00 2001 From: babic_a Date: Mon, 4 May 2020 09:07:27 +0200 Subject: [PATCH 11/23] in case of missing completely frame, don't raise exception --- sf-buffer/src/sf_replay.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sf-buffer/src/sf_replay.cpp b/sf-buffer/src/sf_replay.cpp index 3be2e80..1ac22a6 100644 --- a/sf-buffer/src/sf_replay.cpp +++ b/sf-buffer/src/sf_replay.cpp @@ -155,7 +155,7 @@ void sf_replay ( return; } - if (current_pulse_id != module_frame.pulse_id) { + if (current_pulse_id != module_frame.pulse_id and module_frame.pulse_id != 0) { stringstream err_msg; using namespace date; From 5f8b8898876dc1a0dc4b1d21f25c12da9cdefeb6 Mon Sep 17 00:00:00 2001 From: babic_a Date: Mon, 4 May 2020 09:08:24 +0200 Subject: [PATCH 12/23] added stream for live analysis to test --- sf-buffer/src/sf_stream.cpp | 33 +++++++++++++++++++++++++++++++++ 1 file changed, 33 insertions(+) diff --git a/sf-buffer/src/sf_stream.cpp b/sf-buffer/src/sf_stream.cpp index 1303692..099777f 100644 --- a/sf-buffer/src/sf_stream.cpp +++ b/sf-buffer/src/sf_stream.cpp @@ -292,6 +292,39 @@ int main (int argc, char *argv[]) 0); } + //same for live analysis + int send_live_analysis = 0; + if ( reduction_factor_live_analysis > 1 ) { + send_live_analysis = rand() % reduction_factor_live_analysis; + } + if ( send_live_analysis == 0 ) { + header["shape"][0] = 16384; + header["shape"][1] = 1024; + } else{ + header["shape"][0] = 2; + header["shape"][1] = 2; + } + + text_header = Json::writeString(builder, header); + + zmq_send(socket_live, + text_header.c_str(), + text_header.size(), + ZMQ_SNDMORE); + + if ( send_live_analysis == 0 ) { + zmq_send(socket_live, + (char*)data, + core_buffer::MODULE_N_BYTES*n_modules, + 0); + } else { + zmq_send(socket_live, + (char*)data_empty, + 8, + 0); + } + + } queue.release(); From 8148802325c07caa9e7502f1d3a6e2cb639b67f8 Mon Sep 17 00:00:00 2001 From: Andrej Babic Date: Mon, 4 May 2020 09:16:25 +0200 Subject: [PATCH 13/23] Add new parameter for buffer --- core-buffer/include/buffer_config.hpp | 3 +++ 1 file changed, 3 insertions(+) diff --git a/core-buffer/include/buffer_config.hpp b/core-buffer/include/buffer_config.hpp index b9a8349..01c0602 100644 --- a/core-buffer/include/buffer_config.hpp +++ b/core-buffer/include/buffer_config.hpp @@ -31,6 +31,9 @@ namespace core_buffer { // Microseconds timeout for UDP recv. const int BUFFER_UDP_US_TIMEOUT = 10 * 1000; + // Output queue length for buffer live stream. + const int BUFFER_LIVE_SEND_HWM = 10; + // ZMQ threads for receiving data from sf_replay. const int WRITER_ZMQ_IO_THREADS = 2; From c001faac9a200fa4a1841f4d2ff1c2eaffaf814f Mon Sep 17 00:00:00 2001 From: Andrej Babic Date: Mon, 4 May 2020 09:16:36 +0200 Subject: [PATCH 14/23] Comment out not yet working code --- core-buffer/src/LiveH5Reader.cpp | 26 +++++++++++++------------- 1 file changed, 13 insertions(+), 13 deletions(-) diff --git a/core-buffer/src/LiveH5Reader.cpp b/core-buffer/src/LiveH5Reader.cpp index a5955c9..e5ef1d8 100644 --- a/core-buffer/src/LiveH5Reader.cpp +++ b/core-buffer/src/LiveH5Reader.cpp @@ -13,19 +13,19 @@ LiveH5Reader::LiveH5Reader( pulse_id_buffer_(make_unique(FILE_MOD)), data_buffer_(make_unique(MODULE_N_PIXELS)) { - auto filename = BufferUtils::get_latest_file(current_filename_); - file_ = H5::H5File(filename, H5F_ACC_RDONLY | H5F_ACC_SWMR_READ); - - uint64_t base_pulse_id = start_pulse_id / core_buffer::FILE_MOD; - base_pulse_id *= core_buffer::FILE_MOD; - - current_file_max_pulse_id_ = - - image_dataset_ = input_file.openDataSet("image"); - pulse_id_dataset_ = input_file.openDataSet("pulse_id"); - frame_index_dataset_ = input_file.openDataSet("frame_id"); - daq_rec_dataset_ = input_file.openDataSet("daq_rec"); - n_received_packets_dataset_ = input_file.openDataSet("received_packets"); +// auto filename = BufferUtils::get_latest_file(current_filename_); +// file_ = H5::H5File(filename, H5F_ACC_RDONLY | H5F_ACC_SWMR_READ); +// +// uint64_t base_pulse_id = start_pulse_id / core_buffer::FILE_MOD; +// base_pulse_id *= core_buffer::FILE_MOD; +// +// current_file_max_pulse_id_ = +// +// image_dataset_ = input_file.openDataSet("image"); +// pulse_id_dataset_ = input_file.openDataSet("pulse_id"); +// frame_index_dataset_ = input_file.openDataSet("frame_id"); +// daq_rec_dataset_ = input_file.openDataSet("daq_rec"); +// n_received_packets_dataset_ = input_file.openDataSet("received_packets"); } From 487efb3f017b835edc7806146414bf6257966736 Mon Sep 17 00:00:00 2001 From: Andrej Babic Date: Mon, 4 May 2020 09:16:49 +0200 Subject: [PATCH 15/23] Extend sf_buffer with streaming capabilities --- sf-buffer/src/sf_buffer.cpp | 41 ++++++++++++++++++++++++++++++++++++- 1 file changed, 40 insertions(+), 1 deletion(-) diff --git a/sf-buffer/src/sf_buffer.cpp b/sf-buffer/src/sf_buffer.cpp index 5515f9f..8914968 100644 --- a/sf-buffer/src/sf_buffer.cpp +++ b/sf-buffer/src/sf_buffer.cpp @@ -3,7 +3,7 @@ #include #include #include - +#include "zmq.h" #include "buffer_config.hpp" #include "jungfrau.hpp" @@ -16,10 +16,12 @@ int main (int argc, char *argv[]) { if (argc != 4) { cout << endl; cout << "Usage: sf_buffer [device_name] [udp_port] [root_folder]"; + cout << "[source_id]"; cout << endl; cout << "\tdevice_name: Name to write to disk."; cout << "\tudp_port: UDP port to connect to." << endl; cout << "\troot_folder: FS root folder." << endl; + cout << "\tsource_id: ID of the source for live stream." << endl; cout << endl; exit(-1); @@ -28,6 +30,25 @@ int main (int argc, char *argv[]) { string device_name = string(argv[1]); int udp_port = atoi(argv[2]); string root_folder = string(argv[3]); + int source_id = atoi(argv[2]); + + stringstream ipc_stream; + ipc_stream << "ipc://sf-live-" << source_id; + const auto ipc_address = ipc_stream.str(); + + auto ctx = zmq_ctx_new(); + auto socket = zmq_socket(ctx, ZMQ_PUB); + + const int sndhwm = BUFFER_LIVE_SEND_HWM; + if (zmq_setsockopt(socket, ZMQ_SNDHWM, &sndhwm, sizeof(sndhwm)) != 0) + throw runtime_error(strerror (errno)); + + const int linger_ms = 0; + if (zmq_setsockopt(socket, ZMQ_LINGER, &linger_ms, sizeof(linger_ms)) != 0) + throw runtime_error(strerror (errno)); + + if (zmq_connect(socket, ipc_address.c_str()) != 0) + throw runtime_error(strerror (errno)); RingBuffer ring_buffer(BUFFER_RB_SIZE); @@ -78,6 +99,24 @@ int main (int argc, char *argv[]) { "received_packets", &(data.first->n_recv_packets)); + ModuleFrame metadata = { + metadata.pulse_id, + metadata.frame_index, + metadata.daq_rec, + metadata.n_received_packets, + (uint16_t) source_id + }; + + zmq_send(socket, + &metadata, + sizeof(ModuleFrame), + ZMQ_SNDMORE); + + zmq_send(socket, + data.second, + MODULE_N_BYTES, + 0); + ring_buffer.release(data.first->buffer_slot_index); // TODO: Make real statistics, please. From e7ffad0592d7766fb09f77b5f5f201f22813ce99 Mon Sep 17 00:00:00 2001 From: Andrej Babic Date: Mon, 4 May 2020 09:18:44 +0200 Subject: [PATCH 16/23] SF buffer should bind --- sf-buffer/src/sf_buffer.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sf-buffer/src/sf_buffer.cpp b/sf-buffer/src/sf_buffer.cpp index 8914968..2a9f443 100644 --- a/sf-buffer/src/sf_buffer.cpp +++ b/sf-buffer/src/sf_buffer.cpp @@ -47,7 +47,7 @@ int main (int argc, char *argv[]) { if (zmq_setsockopt(socket, ZMQ_LINGER, &linger_ms, sizeof(linger_ms)) != 0) throw runtime_error(strerror (errno)); - if (zmq_connect(socket, ipc_address.c_str()) != 0) + if (zmq_bind(socket, ipc_address.c_str()) != 0) throw runtime_error(strerror (errno)); RingBuffer ring_buffer(BUFFER_RB_SIZE); From 6bc82e437ad2d036c058c21662b6a56d695fc5d1 Mon Sep 17 00:00:00 2001 From: Andrej Babic Date: Mon, 4 May 2020 09:53:07 +0200 Subject: [PATCH 17/23] Add module frame to FastQueue --- core-buffer/src/FastQueue.cpp | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/core-buffer/src/FastQueue.cpp b/core-buffer/src/FastQueue.cpp index 3e9765f..e39ecc4 100644 --- a/core-buffer/src/FastQueue.cpp +++ b/core-buffer/src/FastQueue.cpp @@ -1,5 +1,6 @@ #include #include +#include #include "FastQueue.hpp" using namespace std; @@ -102,4 +103,5 @@ void FastQueue::release() read_slot_id_ %= n_slots_; } -template class FastQueue; \ No newline at end of file +template class FastQueue; +template class FastQueue; \ No newline at end of file From b436ba12da629efbed53611a7bd155eceffe1105 Mon Sep 17 00:00:00 2001 From: Andrej Babic Date: Mon, 4 May 2020 09:53:27 +0200 Subject: [PATCH 18/23] Add parameters for new sf_buffer implementation --- core-buffer/include/buffer_config.hpp | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/core-buffer/include/buffer_config.hpp b/core-buffer/include/buffer_config.hpp index 01c0602..1b9c3a8 100644 --- a/core-buffer/include/buffer_config.hpp +++ b/core-buffer/include/buffer_config.hpp @@ -26,7 +26,10 @@ namespace core_buffer { const size_t REPLAY_READ_BLOCK_SIZE = 100; // Size of sf_buffer RB in elements. - const size_t BUFFER_RB_SIZE = 1000; + const size_t BUFFER_INTERNAL_QUEUE_SIZE = 1000; + + // Time to sleep before retrying to read the queue. + const size_t BUFFER_QUEUE_RETRY_MS = 10; // Microseconds timeout for UDP recv. const int BUFFER_UDP_US_TIMEOUT = 10 * 1000; From bf62169fb90a5bfa1e6b24866be0c74b4da32c80 Mon Sep 17 00:00:00 2001 From: Andrej Babic Date: Mon, 4 May 2020 09:56:22 +0200 Subject: [PATCH 19/23] Change sf_buffer to new fast queue --- sf-buffer/src/sf_buffer.cpp | 44 +++++++++++++++++-------------------- 1 file changed, 20 insertions(+), 24 deletions(-) diff --git a/sf-buffer/src/sf_buffer.cpp b/sf-buffer/src/sf_buffer.cpp index 2a9f443..4bd6f0a 100644 --- a/sf-buffer/src/sf_buffer.cpp +++ b/sf-buffer/src/sf_buffer.cpp @@ -3,6 +3,7 @@ #include #include #include +#include #include "zmq.h" #include "buffer_config.hpp" #include "jungfrau.hpp" @@ -50,9 +51,9 @@ int main (int argc, char *argv[]) { if (zmq_bind(socket, ipc_address.c_str()) != 0) throw runtime_error(strerror (errno)); - RingBuffer ring_buffer(BUFFER_RB_SIZE); + FastQueue queue(MODULE_N_BYTES, BUFFER_INTERNAL_QUEUE_SIZE); - UdpRecvModule udp_module(ring_buffer); + UdpRecvModule udp_module(queue); udp_module.start_recv(udp_port, JUNGFRAU_DATA_BYTES_PER_FRAME); uint64_t stats_counter(0); @@ -70,61 +71,56 @@ int main (int argc, char *argv[]) { writer.add_scalar_metadata("received_packets"); while (true) { - auto data = ring_buffer.read(); + auto slot_id = queue.read(); - if (data.first == nullptr) { - this_thread::sleep_for(chrono::milliseconds(10)); + if (slot_id == -1){ + this_thread::sleep_for(chrono::milliseconds(BUFFER_QUEUE_RETRY_MS)); continue; } - auto pulse_id = data.first->pulse_id; + ModuleFrame* metadata = queue.get_metadata_buffer(slot_id); + char* data = queue.get_data_buffer(slot_id); + + auto pulse_id = metadata->pulse_id; writer.set_pulse_id(pulse_id); - writer.write_data(data.second); + writer.write_data(data); // TODO: Combine all this into 1 struct. writer.write_scalar_metadata( - "pulse_id", &(data.first->pulse_id)); + "pulse_id", &(metadata->pulse_id)); writer.write_scalar_metadata( "frame_id", - &(data.first->frame_index)); + &(metadata->frame_index)); writer.write_scalar_metadata( "daq_rec", - &(data.first->daq_rec)); + &(metadata->daq_rec)); writer.write_scalar_metadata( "received_packets", - &(data.first->n_recv_packets)); - - ModuleFrame metadata = { - metadata.pulse_id, - metadata.frame_index, - metadata.daq_rec, - metadata.n_received_packets, - (uint16_t) source_id - }; + &(metadata->n_received_packets)); zmq_send(socket, - &metadata, + metadata, sizeof(ModuleFrame), ZMQ_SNDMORE); zmq_send(socket, - data.second, + data, MODULE_N_BYTES, 0); - ring_buffer.release(data.first->buffer_slot_index); + queue.release(); // TODO: Make real statistics, please. stats_counter++; - if (data.first->n_recv_packets < JUNGFRAU_N_PACKETS_PER_FRAME) { + if (metadata->n_received_packets < JUNGFRAU_N_PACKETS_PER_FRAME) { n_missed_packets += - JUNGFRAU_N_PACKETS_PER_FRAME - data.first->n_recv_packets; + JUNGFRAU_N_PACKETS_PER_FRAME - metadata->n_received_packets; } if (last_pulse_id>0) { From a2e609c47b88cdf3ebc2629173007e8d1239a376 Mon Sep 17 00:00:00 2001 From: Andrej Babic Date: Mon, 4 May 2020 10:13:26 +0200 Subject: [PATCH 20/23] Update UDP Module interface --- sf-buffer/src/sf_buffer.cpp | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/sf-buffer/src/sf_buffer.cpp b/sf-buffer/src/sf_buffer.cpp index 4bd6f0a..04da883 100644 --- a/sf-buffer/src/sf_buffer.cpp +++ b/sf-buffer/src/sf_buffer.cpp @@ -53,8 +53,7 @@ int main (int argc, char *argv[]) { FastQueue queue(MODULE_N_BYTES, BUFFER_INTERNAL_QUEUE_SIZE); - UdpRecvModule udp_module(queue); - udp_module.start_recv(udp_port, JUNGFRAU_DATA_BYTES_PER_FRAME); + UdpRecvModule udp_module(queue, udp_port); uint64_t stats_counter(0); uint64_t n_missed_packets = 0; From 2d279435fc196a8d980f5269e350272cbd80464e Mon Sep 17 00:00:00 2001 From: Andrej Babic Date: Mon, 4 May 2020 10:13:48 +0200 Subject: [PATCH 21/23] Adapt UdpRecvModule interface --- core-buffer/include/UdpRecvModule.hpp | 19 ++++++------------- 1 file changed, 6 insertions(+), 13 deletions(-) diff --git a/core-buffer/include/UdpRecvModule.hpp b/core-buffer/include/UdpRecvModule.hpp index e884cc0..723d35c 100644 --- a/core-buffer/include/UdpRecvModule.hpp +++ b/core-buffer/include/UdpRecvModule.hpp @@ -2,30 +2,23 @@ #define UDPRECVMODULE_HPP #include "RingBuffer.hpp" +#include "FastQueue.hpp" +#include "jungfrau.hpp" #include class UdpRecvModule { - RingBuffer& ring_buffer_; - - std::atomic_bool is_receiving_; + FastQueue& queue_; std::thread receiving_thread_; + std::atomic_bool is_receiving_; protected: - void receive_thread( - const uint16_t udp_port, - const size_t frame_size); + void receive_thread(const uint16_t udp_port); public: - UdpRecvModule(RingBuffer& ring_buffer); - + UdpRecvModule(FastQueue& queue, const uint16_t udp_port); virtual ~UdpRecvModule(); - void start_recv( - const uint16_t udp_port, - const size_t frame_n_bytes); - void stop_recv(); - bool is_receiving(); }; From 7841a70698be77d4af472a096eac2ba35ef2bed7 Mon Sep 17 00:00:00 2001 From: Andrej Babic Date: Mon, 4 May 2020 10:17:36 +0200 Subject: [PATCH 22/23] Started cleanup of UdpRecvModule --- core-buffer/src/UdpRecvModule.cpp | 85 ++++++++----------------------- 1 file changed, 20 insertions(+), 65 deletions(-) diff --git a/core-buffer/src/UdpRecvModule.cpp b/core-buffer/src/UdpRecvModule.cpp index c0514af..c4f2934 100644 --- a/core-buffer/src/UdpRecvModule.cpp +++ b/core-buffer/src/UdpRecvModule.cpp @@ -5,103 +5,60 @@ using namespace std; -UdpRecvModule::UdpRecvModule(RingBuffer& ring_buffer) : - ring_buffer_(ring_buffer), - is_receiving_(false) +UdpRecvModule::UdpRecvModule( + FastQueue& queue, + const uint16_t udp_port) : + queue_(queue), + is_receiving_(true) { - -} - -UdpRecvModule::~UdpRecvModule() -{ - stop_recv(); -} - -void UdpRecvModule::start_recv( - const uint16_t udp_port, - const size_t frame_n_bytes) -{ - if (is_receiving_ == true) { - std::stringstream err_msg; - - using namespace date; - using namespace chrono; - err_msg << "[" << system_clock::now() << "]"; - err_msg << "[UdpRecvModule::start_recv]"; - err_msg << " Receivers already running." << endl; - - throw runtime_error(err_msg.str()); - } - #ifdef DEBUG_OUTPUT using namespace date; using namespace chrono; cout << "[" << system_clock::now() << "]"; - cout << "[UdpRecvModule::start_recv]"; + cout << "[UdpRecvModule::UdpRecvModule]"; cout << " Starting with "; cout << "udp_port " << udp_port << endl; #endif - is_receiving_ = true; - - if (receiving_thread_.joinable()) { - receiving_thread_.join(); - } - receiving_thread_ = thread( &UdpRecvModule::receive_thread, this, - udp_port, - frame_n_bytes); + udp_port); } -void UdpRecvModule::stop_recv() +UdpRecvModule::~UdpRecvModule() { -#ifdef DEBUG_OUTPUT - using namespace date; - using namespace chrono; - cout << "[" << system_clock::now() << "]"; - cout << "UdpRecvModule::stop_recv"; - cout << " Stop receiving." << endl; -#endif - is_receiving_ = false; - - if (receiving_thread_.joinable()) { - receiving_thread_.join(); - } + receiving_thread_.join(); } -void UdpRecvModule::receive_thread( - const uint16_t udp_port, - const size_t frame_size) + +void UdpRecvModule::receive_thread(const uint16_t udp_port) { try { - ring_buffer_.initialize(frame_size); - UdpReceiver udp_receiver; udp_receiver.bind(udp_port); - auto metadata = make_shared(); - metadata->frame_bytes_size = JUNGFRAU_DATA_BYTES_PER_FRAME; - metadata->pulse_id = 0; - metadata->n_recv_packets = 0; + ModuleFrame* module_frame; + module_frame->pulse_id = 0; + module_frame->n_received_packets = 0; - char* frame_buffer = ring_buffer_.reserve(metadata); - if (frame_buffer == nullptr) { + jungfrau_packet packet_buffer; + + auto slot_id = queue_.reserve(); + + if (slot_id == -1) { stringstream err_msg; using namespace date; using namespace chrono; err_msg << "[" << system_clock::now() << "]"; err_msg << "[UdpRecvModule::receive_thread]"; - err_msg << " Ring buffer is full."; + err_msg << " Queue is full."; err_msg << endl; throw runtime_error(err_msg.str()); } - jungfrau_packet packet_buffer; - while (is_receiving_.load(memory_order_relaxed)) { if (!udp_receiver.receive( @@ -110,8 +67,6 @@ void UdpRecvModule::receive_thread( continue; } - auto* frame_metadata = metadata.get(); - // TODO: Horrible. Breake it down into methods. // First packet for this frame. From a6d065a285692e29da8f0414d3f6b389a3e86407 Mon Sep 17 00:00:00 2001 From: Andrej Babic Date: Mon, 4 May 2020 12:17:54 +0200 Subject: [PATCH 23/23] Make UdpReceiver quicker and more simple --- core-buffer/include/UdpRecvModule.hpp | 8 ++ core-buffer/src/UdpRecvModule.cpp | 109 +++++++++----------------- 2 files changed, 45 insertions(+), 72 deletions(-) diff --git a/core-buffer/include/UdpRecvModule.hpp b/core-buffer/include/UdpRecvModule.hpp index 723d35c..36e9afc 100644 --- a/core-buffer/include/UdpRecvModule.hpp +++ b/core-buffer/include/UdpRecvModule.hpp @@ -12,6 +12,14 @@ class UdpRecvModule { std::thread receiving_thread_; std::atomic_bool is_receiving_; + inline void init_frame( + ModuleFrame* frame_metadata, + jungfrau_packet& packet_buffer); + + inline void reserve_next_frame_buffers( + ModuleFrame*& frame_metadata, + char*& frame_buffer); + protected: void receive_thread(const uint16_t udp_port); diff --git a/core-buffer/src/UdpRecvModule.cpp b/core-buffer/src/UdpRecvModule.cpp index c4f2934..f633f29 100644 --- a/core-buffer/src/UdpRecvModule.cpp +++ b/core-buffer/src/UdpRecvModule.cpp @@ -31,6 +31,30 @@ UdpRecvModule::~UdpRecvModule() receiving_thread_.join(); } +inline void UdpRecvModule::init_frame ( + ModuleFrame* frame_metadata, + jungfrau_packet& packet_buffer) +{ + frame_metadata->frame_index = packet_buffer.framenum; + frame_metadata->pulse_id = packet_buffer.bunchid; + frame_metadata->daq_rec = packet_buffer.debug; +} + +inline void UdpRecvModule::reserve_next_frame_buffers( + ModuleFrame*& frame_metadata, + char*& frame_buffer) +{ + int slot_id; + if ((slot_id = queue_.reserve()) == -1) + throw runtime_error("Queue is full."); + + frame_metadata = queue_.get_metadata_buffer(slot_id); + frame_metadata->pulse_id=0; + frame_metadata->n_received_packets=0; + + frame_buffer = queue_.get_data_buffer(slot_id); + memset(frame_buffer, 0, JUNGFRAU_DATA_BYTES_PER_FRAME); +} void UdpRecvModule::receive_thread(const uint16_t udp_port) { @@ -38,27 +62,12 @@ void UdpRecvModule::receive_thread(const uint16_t udp_port) UdpReceiver udp_receiver; udp_receiver.bind(udp_port); - ModuleFrame* module_frame; - module_frame->pulse_id = 0; - module_frame->n_received_packets = 0; + ModuleFrame* frame_metadata; + char* frame_buffer; + reserve_next_frame_buffers(frame_metadata, frame_buffer); jungfrau_packet packet_buffer; - auto slot_id = queue_.reserve(); - - if (slot_id == -1) { - stringstream err_msg; - - using namespace date; - using namespace chrono; - err_msg << "[" << system_clock::now() << "]"; - err_msg << "[UdpRecvModule::receive_thread]"; - err_msg << " Queue is full."; - err_msg << endl; - - throw runtime_error(err_msg.str()); - } - while (is_receiving_.load(memory_order_relaxed)) { if (!udp_receiver.receive( @@ -67,41 +76,16 @@ void UdpRecvModule::receive_thread(const uint16_t udp_port) continue; } - // TODO: Horrible. Breake it down into methods. - // First packet for this frame. if (frame_metadata->pulse_id == 0) { - frame_metadata->frame_index = packet_buffer.framenum; - frame_metadata->pulse_id = packet_buffer.bunchid; - frame_metadata->daq_rec = packet_buffer.debug; - // Packet from new frame, while we lost the last packet of - // previous frame. + init_frame(frame_metadata, packet_buffer); + + // Happens if the last packet from the previous frame gets lost. } else if (frame_metadata->pulse_id != packet_buffer.bunchid) { - ring_buffer_.commit(metadata); + queue_.commit(); + reserve_next_frame_buffers(frame_metadata, frame_buffer); - metadata = make_shared(); - metadata->frame_bytes_size = JUNGFRAU_DATA_BYTES_PER_FRAME; - metadata->pulse_id = 0; - metadata->n_recv_packets = 0; - - frame_buffer = ring_buffer_.reserve(metadata); - if (frame_buffer == nullptr) { - stringstream err_msg; - - using namespace date; - using namespace chrono; - err_msg << "[" << system_clock::now() << "]"; - err_msg << "[UdpRecvModule::receive_thread]"; - err_msg << " Ring buffer is full."; - err_msg << endl; - - throw runtime_error(err_msg.str()); - } - memset(frame_buffer, 0, JUNGFRAU_DATA_BYTES_PER_FRAME); - - frame_metadata->frame_index = packet_buffer.framenum; - frame_metadata->pulse_id = packet_buffer.bunchid; - frame_metadata->daq_rec = packet_buffer.debug; + init_frame(frame_metadata, packet_buffer); } size_t frame_buffer_offset = @@ -112,32 +96,13 @@ void UdpRecvModule::receive_thread(const uint16_t udp_port) packet_buffer.data, JUNGFRAU_DATA_BYTES_PER_PACKET); - frame_metadata->n_recv_packets++; + frame_metadata->n_received_packets++; - // Frame finished with last packet. + // Last frame packet received. Frame finished. if (packet_buffer.packetnum == JUNGFRAU_N_PACKETS_PER_FRAME-1) { - ring_buffer_.commit(metadata); - - metadata = make_shared(); - metadata->frame_bytes_size = JUNGFRAU_DATA_BYTES_PER_FRAME; - metadata->pulse_id = 0; - metadata->n_recv_packets = 0; - - frame_buffer = ring_buffer_.reserve(metadata); - if (frame_buffer == nullptr) { - stringstream err_msg; - - using namespace date; - using namespace chrono; - err_msg << "[" << system_clock::now() << "]"; - err_msg << "[UdpRecvModule::receive_thread]"; - err_msg << " Ring buffer is full."; - err_msg << endl; - - throw runtime_error(err_msg.str()); - } - memset(frame_buffer, 0, JUNGFRAU_DATA_BYTES_PER_FRAME); + queue_.commit(); + reserve_next_frame_buffers(frame_metadata, frame_buffer); } }