diff --git a/sf-stream/include/ZmqPulseSyncReceiver.hpp b/sf-stream/include/ZmqPulseSyncReceiver.hpp index aa00035..624de34 100644 --- a/sf-stream/include/ZmqPulseSyncReceiver.hpp +++ b/sf-stream/include/ZmqPulseSyncReceiver.hpp @@ -8,6 +8,11 @@ #include "formats.hpp" +struct PulseAndSync { + const uint64_t pulse_id; + const uint32_t n_lost_pulses; +}; + class ZmqPulseSyncReceiver { void* ctx_; @@ -22,7 +27,7 @@ public: const int n_modules); ~ZmqPulseSyncReceiver(); - uint64_t get_next_pulse_id() const; + PulseAndSync get_next_pulse_id() const; }; diff --git a/sf-stream/src/ZmqPulseSyncReceiver.cpp b/sf-stream/src/ZmqPulseSyncReceiver.cpp index f309389..ead65ee 100644 --- a/sf-stream/src/ZmqPulseSyncReceiver.cpp +++ b/sf-stream/src/ZmqPulseSyncReceiver.cpp @@ -38,7 +38,7 @@ ZmqPulseSyncReceiver::~ZmqPulseSyncReceiver() } } -uint64_t ZmqPulseSyncReceiver::get_next_pulse_id() const +PulseAndSync ZmqPulseSyncReceiver::get_next_pulse_id() const { uint64_t pulses[n_modules_]; @@ -52,12 +52,12 @@ uint64_t ZmqPulseSyncReceiver::get_next_pulse_id() const } if (modules_in_sync) { - return pulses[0]; + return {pulses[0], 0}; } + // How many pulses we lost in total to get the next pulse_id. + uint32_t n_lost_pulses = 0; for (int i_sync=0; i_sync < SYNC_RETRY_LIMIT; i_sync++) { - cout << "Sync attempt " << i_sync << endl; - uint64_t min_pulse_id = numeric_limits::max();; uint64_t max_pulse_id = 0; @@ -83,18 +83,26 @@ uint64_t ZmqPulseSyncReceiver::get_next_pulse_id() const } modules_in_sync = true; + // Max pulses we lost in this sync attempt. + uint32_t i_sync_lost_pulses = 0; for (int i = 0; i < n_modules_; i++) { + // How many pulses we lost for this specific module. + uint32_t i_module_lost_pulses = 0; while (pulses[i] < max_pulse_id) { zmq_recv(sockets_[i], &pulses[i], sizeof(uint64_t), 0); + i_module_lost_pulses++; } + i_sync_lost_pulses = max(i_sync_lost_pulses, i_module_lost_pulses); + if (pulses[i] != max_pulse_id) { modules_in_sync = false; } } + n_lost_pulses += i_sync_lost_pulses; if (modules_in_sync) { - return pulses[0]; + return {pulses[0], n_lost_pulses}; } }