diff --git a/core-buffer/include/LiveRecvModule.hpp b/core-buffer/include/LiveRecvModule.hpp index b6ea20f..27b72c0 100644 --- a/core-buffer/include/LiveRecvModule.hpp +++ b/core-buffer/include/LiveRecvModule.hpp @@ -4,6 +4,7 @@ #include "FastQueue.hpp" #include #include "jungfrau.hpp" +#include class LiveRecvModule { @@ -25,6 +26,11 @@ public: void* connect_socket(size_t module_id); void recv_single_module(void* socket, ModuleFrame* metadata, char* data); void receive_thread(const size_t n_modules); + uint64_t align_modules( + const std::vector& sockets, + ModuleFrameBuffer *metadata, + char *data); + }; diff --git a/core-buffer/src/LiveRecvModule.cpp b/core-buffer/src/LiveRecvModule.cpp index 18cacea..082ca85 100644 --- a/core-buffer/src/LiveRecvModule.cpp +++ b/core-buffer/src/LiveRecvModule.cpp @@ -96,22 +96,54 @@ void LiveRecvModule::recv_single_module( } } +uint64_t LiveRecvModule::align_modules( + const vector& sockets, ModuleFrameBuffer *metadata, char *data) +{ + uint64_t max_pulse_id = 0; + + // First pass - determine current max_pulse_id. + for (size_t i_module = 0; i_module < n_modules_; i_module++) { + auto& module_metadata = metadata->module[i_module]; + max_pulse_id = max(max_pulse_id, module_metadata.pulse_id); + } + + // Second pass - align all receivers to max_pulse_id. + for (size_t i_module = 0; i_module < n_modules_; i_module++) { + auto& module_metadata = metadata->module[i_module]; + + size_t diff_to_max = max_pulse_id - module_metadata.pulse_id; + for (size_t i = 0; i < diff_to_max; i++) { + recv_single_module( + sockets[i_module], + &module_metadata, + data + (MODULE_N_BYTES * i_module)); + } + + if (module_metadata.pulse_id != max_pulse_id) { + throw runtime_error("Cannot align pulse_ids."); + } + } + + return max_pulse_id; +} + void LiveRecvModule::receive_thread(const size_t n_modules) { try { - void *sockets[n_modules]; + vector sockets(n_modules); + for (size_t i = 0; i < n_modules; i++) { sockets[i] = connect_socket(i); } - uint64_t current_pulse_id = 0; auto slot_id = queue_.reserve(); if (slot_id == -1) throw runtime_error("This cannot really happen"); auto metadata = queue_.get_metadata_buffer(slot_id); auto data = queue_.get_data_buffer(slot_id); - // First pass - determine current max pulse id. + + // First buffer load for alignment. for (size_t i_module = 0; i_module < n_modules; i_module++) { auto& module_metadata = metadata->module[i_module]; @@ -119,26 +151,9 @@ void LiveRecvModule::receive_thread(const size_t n_modules) sockets[i_module], &module_metadata, data + (MODULE_N_BYTES * i_module)); - - current_pulse_id = max(current_pulse_id, module_metadata.pulse_id); } - // Second pass - align all receivers to the max pulse_id. - for (size_t i_module = 0; i_module < n_modules; i_module++) { - auto& module_metadata = metadata->module[i_module]; - - size_t diff_to_max = current_pulse_id - module_metadata.pulse_id; - for (size_t i = 0; i < diff_to_max; i++) { - recv_single_module( - sockets[i_module], - &module_metadata, - data + (MODULE_N_BYTES * i_module)); - } - - if (module_metadata.pulse_id != current_pulse_id) { - throw runtime_error("Cannot align pulse_ids."); - } - } + auto current_pulse_id = align_modules(sockets, metadata, data); queue_.commit(); current_pulse_id++; @@ -154,6 +169,7 @@ void LiveRecvModule::receive_thread(const size_t n_modules) metadata = queue_.get_metadata_buffer(slot_id); data = queue_.get_data_buffer(slot_id); + bool sync_needed = false; for (size_t i_module = 0; i_module < n_modules; i_module++) { auto& module_metadata = metadata->module[i_module]; @@ -162,11 +178,27 @@ void LiveRecvModule::receive_thread(const size_t n_modules) &module_metadata, data + (MODULE_N_BYTES * i_module)); - if (current_pulse_id != module_metadata.pulse_id) { - throw runtime_error("Modules out of sync."); + if (module_metadata.pulse_id != current_pulse_id) { + sync_needed = true; } } + if (sync_needed) { + auto start_time = chrono::steady_clock::now(); + + auto new_pulse_id = align_modules(sockets, metadata, data); + auto lost_pulses = new_pulse_id - current_pulse_id; + current_pulse_id = new_pulse_id; + + auto end_time = chrono::steady_clock::now(); + auto us_duration = chrono::duration_cast( + end_time-start_time).count(); + + cout << "sf_stream:sync_lost_pulses " << lost_pulses; + cout << " sf_stream::sync_us " << us_duration; + cout << endl; + } + queue_.commit(); current_pulse_id++; }