diff --git a/sf-stream/include/LiveRecvModule.hpp b/sf-stream/include/LiveRecvModule.hpp index d0820ef..f6d247b 100644 --- a/sf-stream/include/LiveRecvModule.hpp +++ b/sf-stream/include/LiveRecvModule.hpp @@ -21,7 +21,7 @@ class LiveRecvModule { void receive_thread(); void recv_single_module(void* socket, ModuleFrame* meta, char* data); uint64_t align_modules(const std::vector& sockets, - ModuleFrameBuffer *metadata, + ModuleFrameBuffer *meta, char *data); void stop(); diff --git a/sf-stream/src/LiveRecvModule.cpp b/sf-stream/src/LiveRecvModule.cpp index ba4d072..bd7ef05 100644 --- a/sf-stream/src/LiveRecvModule.cpp +++ b/sf-stream/src/LiveRecvModule.cpp @@ -92,34 +92,35 @@ void LiveRecvModule::recv_single_module( } uint64_t LiveRecvModule::align_modules( - const vector& sockets, ModuleFrameBuffer *metadata, char *data) + const vector& sockets, ModuleFrameBuffer *meta, char *data) { uint64_t max_pulse_id = 0; + uint64_t min_pulse_id = numeric_limits::max(); - // First pass - determine current max_pulse_id. - for (size_t i_module = 0; i_module < n_modules_; i_module++) { - auto& module_metadata = metadata->module[i_module]; - max_pulse_id = max(max_pulse_id, module_metadata.pulse_id); + // First pass - determine current min and max pulse_id. + for (auto& module_meta : meta->module) { + min_pulse_id = min(min_pulse_id, module_meta.pulse_id); + max_pulse_id = max(max_pulse_id, module_meta.pulse_id); } // Second pass - align all receivers to max_pulse_id. for (size_t i_module = 0; i_module < n_modules_; i_module++) { - auto& module_metadata = metadata->module[i_module]; + auto& module_meta = meta->module[i_module]; - size_t diff_to_max = max_pulse_id - module_metadata.pulse_id; + size_t diff_to_max = max_pulse_id - module_meta.pulse_id; for (size_t i = 0; i < diff_to_max; i++) { recv_single_module( sockets[i_module], - &module_metadata, + &module_meta, data + (MODULE_N_BYTES * i_module)); } - if (module_metadata.pulse_id != max_pulse_id) { + if (module_meta.pulse_id != max_pulse_id) { throw runtime_error("Cannot align pulse_ids."); } } - return max_pulse_id; + return max_pulse_id - min_pulse_id; } void LiveRecvModule::receive_thread() @@ -160,12 +161,12 @@ void LiveRecvModule::receive_thread() } if (sync_needed) { - auto start_time = chrono::steady_clock::now(); + auto start_time = steady_clock::now(); auto lost_pulses = align_modules(sockets, metadata, data); - auto end_time = chrono::steady_clock::now(); - auto us_duration = chrono::duration_cast( + auto end_time = steady_clock::now(); + auto us_duration = duration_cast( end_time-start_time).count(); cout << "sf_stream:sync_lost_pulses " << lost_pulses;