mirror of
https://github.com/paulscherrerinstitute/sf_daq_buffer.git
synced 2026-04-27 12:44:51 +02:00
Add simple re-try sync mechanism
This commit is contained in:
@@ -1,8 +1,5 @@
|
||||
namespace sync_config
|
||||
{
|
||||
// If the modules are offset more than 1000 pulses, crush.
|
||||
const uint64_t PULSE_OFFSET_LIMIT = 100;
|
||||
|
||||
// Number of times we try to re-sync in case of failure.
|
||||
const int SYNC_RETRY_LIMIT = 3;
|
||||
|
||||
|
||||
@@ -39,88 +39,41 @@ ZmqPulseSyncReceiver::~ZmqPulseSyncReceiver()
|
||||
|
||||
PulseAndSync ZmqPulseSyncReceiver::get_next_pulse_id() const
|
||||
{
|
||||
uint64_t pulses[n_modules_];
|
||||
|
||||
bool modules_in_sync = true;
|
||||
for (int i = 0; i < n_modules_; i++) {
|
||||
zmq_recv(sockets_[i], &pulses[i], sizeof(uint64_t), 0);
|
||||
if (pulses[0] != pulses[i]) {
|
||||
modules_in_sync = false;
|
||||
}
|
||||
}
|
||||
|
||||
if (modules_in_sync) {
|
||||
#ifdef DEBUG_OUTPUT
|
||||
using namespace date;
|
||||
cout << " [" << std::chrono::system_clock::now();
|
||||
cout << "] [ZmqPulseSyncReceiver::get_next_pulse_id] ";
|
||||
cout << "] (modules_in_sync) Frame index:" << pulses[0];
|
||||
cout << endl;
|
||||
#endif
|
||||
return {pulses[0], 0};
|
||||
}
|
||||
|
||||
// How many pulses we lost in total to get the next pulse_id.
|
||||
uint32_t n_lost_pulses = 0;
|
||||
for (int i_sync=0; i_sync < SYNC_RETRY_LIMIT; i_sync++) {
|
||||
uint64_t min_pulse_id = numeric_limits<uint64_t>::max();;
|
||||
uint64_t max_pulse_id = 0;
|
||||
uint64_t ids[n_modules_];
|
||||
|
||||
for (uint32_t i_sync=0; i_sync < SYNC_RETRY_LIMIT; i_sync++) {
|
||||
bool modules_in_sync = true;
|
||||
for (int i = 0; i < n_modules_; i++) {
|
||||
min_pulse_id = min(min_pulse_id, pulses[i]);
|
||||
max_pulse_id = max(max_pulse_id, pulses[i]);
|
||||
}
|
||||
|
||||
auto max_diff = max_pulse_id - min_pulse_id;
|
||||
if (max_diff > PULSE_OFFSET_LIMIT) {
|
||||
stringstream err_msg;
|
||||
err_msg << "[ZmqPulseSyncReceiver::get_next_pulse_id]";
|
||||
err_msg << " PULSE_OFFSET_LIMIT exceeded.";
|
||||
err_msg << " max_diff=" << max_diff << " pulses.";
|
||||
zmq_recv(sockets_[i], &ids[i], sizeof(uint64_t), 0);
|
||||
|
||||
for (int i = 0; i < n_modules_; i++) {
|
||||
err_msg << " (module " << i << ", ";
|
||||
err_msg << pulses[i] << "),";
|
||||
}
|
||||
err_msg << endl;
|
||||
|
||||
throw runtime_error(err_msg.str());
|
||||
}
|
||||
|
||||
modules_in_sync = true;
|
||||
// Max pulses we lost in this sync attempt.
|
||||
uint32_t i_sync_lost_pulses = 0;
|
||||
for (int i = 0; i < n_modules_; i++) {
|
||||
// How many pulses we lost for this specific module.
|
||||
uint32_t i_module_lost_pulses = 0;
|
||||
while (pulses[i] < max_pulse_id) {
|
||||
zmq_recv(sockets_[i], &pulses[i], sizeof(uint64_t), 0);
|
||||
i_module_lost_pulses++;
|
||||
}
|
||||
|
||||
i_sync_lost_pulses = max(i_sync_lost_pulses, i_module_lost_pulses);
|
||||
|
||||
if (pulses[i] != max_pulse_id) {
|
||||
if (ids[0] != ids[i]) {
|
||||
modules_in_sync = false;
|
||||
}
|
||||
}
|
||||
n_lost_pulses += i_sync_lost_pulses;
|
||||
|
||||
if (modules_in_sync) {
|
||||
#ifdef DEBUG_OUTPUT
|
||||
using namespace date;
|
||||
cout << " [" << std::chrono::system_clock::now();
|
||||
cout << "] [ZmqPulseSyncReceiver::get_next_pulse_id]";
|
||||
cout << " modules_in_sync false";
|
||||
cout << endl;
|
||||
#endif
|
||||
return {pulses[0], n_lost_pulses};
|
||||
return {ids[0], i_sync};
|
||||
}
|
||||
|
||||
#ifdef DEBUG_OUTPUT
|
||||
using namespace date;
|
||||
cout << "[" << std::chrono::system_clock::now() << "]";
|
||||
cout << " [ZmqPulseSyncReceiver::get_next_pulse_id]";
|
||||
cout << " Modules out of sync:" << endl;
|
||||
for (int i=0; i < n_modules_; i++) {
|
||||
cout << " module" << i << ":" << ids[i];
|
||||
}
|
||||
cout << endl;
|
||||
#endif
|
||||
}
|
||||
|
||||
stringstream err_msg;
|
||||
err_msg << "[ZmqLiveReceiver::get_next_pulse_id]";
|
||||
err_msg << " SYNC_RETRY_LIMIT exceeded.";
|
||||
err_msg << "[ZmqPulseSyncReceiver::get_next_pulse_id]";
|
||||
err_msg << " SYNC_RETRY_LIMIT exceeded. State:";
|
||||
for (int i=0; i < n_modules_; i++) {
|
||||
err_msg << " module" << i << ":" << ids[i];
|
||||
}
|
||||
err_msg << endl;
|
||||
|
||||
throw runtime_error(err_msg.str());
|
||||
|
||||
Reference in New Issue
Block a user