From 298db2cab5aadde927bf9f4ce328bfc0e0c2591c Mon Sep 17 00:00:00 2001 From: Andrej Babic Date: Tue, 6 Jul 2021 09:22:59 +0200 Subject: [PATCH] Add simple re-try sync mechanism --- std-udp-sync/include/sync_config.hpp | 3 - std-udp-sync/src/ZmqPulseSyncReceiver.cpp | 91 ++++++----------------- 2 files changed, 22 insertions(+), 72 deletions(-) diff --git a/std-udp-sync/include/sync_config.hpp b/std-udp-sync/include/sync_config.hpp index ff8f500..253b4e2 100644 --- a/std-udp-sync/include/sync_config.hpp +++ b/std-udp-sync/include/sync_config.hpp @@ -1,8 +1,5 @@ namespace sync_config { - // If the modules are offset more than 1000 pulses, crush. - const uint64_t PULSE_OFFSET_LIMIT = 100; - // Number of times we try to re-sync in case of failure. const int SYNC_RETRY_LIMIT = 3; diff --git a/std-udp-sync/src/ZmqPulseSyncReceiver.cpp b/std-udp-sync/src/ZmqPulseSyncReceiver.cpp index c8a87cb..63682bd 100644 --- a/std-udp-sync/src/ZmqPulseSyncReceiver.cpp +++ b/std-udp-sync/src/ZmqPulseSyncReceiver.cpp @@ -39,88 +39,41 @@ ZmqPulseSyncReceiver::~ZmqPulseSyncReceiver() PulseAndSync ZmqPulseSyncReceiver::get_next_pulse_id() const { - uint64_t pulses[n_modules_]; - - bool modules_in_sync = true; - for (int i = 0; i < n_modules_; i++) { - zmq_recv(sockets_[i], &pulses[i], sizeof(uint64_t), 0); - if (pulses[0] != pulses[i]) { - modules_in_sync = false; - } - } - - if (modules_in_sync) { - #ifdef DEBUG_OUTPUT - using namespace date; - cout << " [" << std::chrono::system_clock::now(); - cout << "] [ZmqPulseSyncReceiver::get_next_pulse_id] "; - cout << "] (modules_in_sync) Frame index:" << pulses[0]; - cout << endl; - #endif - return {pulses[0], 0}; - } - - // How many pulses we lost in total to get the next pulse_id. - uint32_t n_lost_pulses = 0; - for (int i_sync=0; i_sync < SYNC_RETRY_LIMIT; i_sync++) { - uint64_t min_pulse_id = numeric_limits::max();; - uint64_t max_pulse_id = 0; + uint64_t ids[n_modules_]; + for (uint32_t i_sync=0; i_sync < SYNC_RETRY_LIMIT; i_sync++) { + bool modules_in_sync = true; for (int i = 0; i < n_modules_; i++) { - min_pulse_id = min(min_pulse_id, pulses[i]); - max_pulse_id = max(max_pulse_id, pulses[i]); - } - auto max_diff = max_pulse_id - min_pulse_id; - if (max_diff > PULSE_OFFSET_LIMIT) { - stringstream err_msg; - err_msg << "[ZmqPulseSyncReceiver::get_next_pulse_id]"; - err_msg << " PULSE_OFFSET_LIMIT exceeded."; - err_msg << " max_diff=" << max_diff << " pulses."; + zmq_recv(sockets_[i], &ids[i], sizeof(uint64_t), 0); - for (int i = 0; i < n_modules_; i++) { - err_msg << " (module " << i << ", "; - err_msg << pulses[i] << "),"; - } - err_msg << endl; - - throw runtime_error(err_msg.str()); - } - - modules_in_sync = true; - // Max pulses we lost in this sync attempt. - uint32_t i_sync_lost_pulses = 0; - for (int i = 0; i < n_modules_; i++) { - // How many pulses we lost for this specific module. - uint32_t i_module_lost_pulses = 0; - while (pulses[i] < max_pulse_id) { - zmq_recv(sockets_[i], &pulses[i], sizeof(uint64_t), 0); - i_module_lost_pulses++; - } - - i_sync_lost_pulses = max(i_sync_lost_pulses, i_module_lost_pulses); - - if (pulses[i] != max_pulse_id) { + if (ids[0] != ids[i]) { modules_in_sync = false; } } - n_lost_pulses += i_sync_lost_pulses; if (modules_in_sync) { - #ifdef DEBUG_OUTPUT - using namespace date; - cout << " [" << std::chrono::system_clock::now(); - cout << "] [ZmqPulseSyncReceiver::get_next_pulse_id]"; - cout << " modules_in_sync false"; - cout << endl; - #endif - return {pulses[0], n_lost_pulses}; + return {ids[0], i_sync}; } + + #ifdef DEBUG_OUTPUT + using namespace date; + cout << "[" << std::chrono::system_clock::now() << "]"; + cout << " [ZmqPulseSyncReceiver::get_next_pulse_id]"; + cout << " Modules out of sync:" << endl; + for (int i=0; i < n_modules_; i++) { + cout << " module" << i << ":" << ids[i]; + } + cout << endl; + #endif } stringstream err_msg; - err_msg << "[ZmqLiveReceiver::get_next_pulse_id]"; - err_msg << " SYNC_RETRY_LIMIT exceeded."; + err_msg << "[ZmqPulseSyncReceiver::get_next_pulse_id]"; + err_msg << " SYNC_RETRY_LIMIT exceeded. State:"; + for (int i=0; i < n_modules_; i++) { + err_msg << " module" << i << ":" << ids[i]; + } err_msg << endl; throw runtime_error(err_msg.str());