diff --git a/sf-stream/src/LiveRecvModule.cpp b/sf-stream/src/LiveRecvModule.cpp index a1f5b6f..ba4d072 100644 --- a/sf-stream/src/LiveRecvModule.cpp +++ b/sf-stream/src/LiveRecvModule.cpp @@ -6,6 +6,7 @@ #include "buffer_config.hpp" using namespace std; +using namespace chrono; using namespace core_buffer; LiveRecvModule::LiveRecvModule( @@ -131,36 +132,15 @@ void LiveRecvModule::receive_thread() sockets[i] = connect_socket(i); } - auto slot_id = queue_.reserve(); - if (slot_id == -1) throw runtime_error("This cannot really happen"); - - auto metadata = queue_.get_metadata_buffer(slot_id); - auto data = queue_.get_data_buffer(slot_id); - - // First buffer load for alignment. - for (size_t i_module = 0; i_module < n_modules_; i_module++) { - auto& module_metadata = metadata->module[i_module]; - - recv_single_module( - sockets[i_module], - &module_metadata, - data + (MODULE_N_BYTES * i_module)); - } - - align_modules(sockets, metadata, data); - - queue_.commit(); - + int slot_id; while(is_receiving_.load(memory_order_relaxed)) { - auto slot_id = queue_.reserve(); - if (slot_id == -1){ - this_thread::sleep_for(chrono::milliseconds(5)); - continue; + while ((slot_id == queue_.reserve()) == -1) { + this_thread::sleep_for(milliseconds(RB_READ_RETRY_INTERVAL_MS)); } - metadata = queue_.get_metadata_buffer(slot_id); - data = queue_.get_data_buffer(slot_id); + auto metadata = queue_.get_metadata_buffer(slot_id); + auto data = queue_.get_data_buffer(slot_id); uint64_t frame_pulse_id; bool sync_needed = false; @@ -174,7 +154,6 @@ void LiveRecvModule::receive_thread() if (i_module == 0) { frame_pulse_id = module_metadata.pulse_id; - } else if (frame_pulse_id != module_metadata.pulse_id) { sync_needed = true; }