Add resync capabilities to sf_streamer

This commit is contained in:
2020-05-08 10:33:25 +02:00
parent ea909d1e7e
commit 3c6805a8e6
2 changed files with 61 additions and 23 deletions
+6
View File
@@ -4,6 +4,7 @@
#include "FastQueue.hpp"
#include <thread>
#include "jungfrau.hpp"
#include <vector>
class LiveRecvModule {
@@ -25,6 +26,11 @@ public:
void* connect_socket(size_t module_id);
void recv_single_module(void* socket, ModuleFrame* metadata, char* data);
void receive_thread(const size_t n_modules);
uint64_t align_modules(
const std::vector<void*>& sockets,
ModuleFrameBuffer *metadata,
char *data);
};
+55 -23
View File
@@ -96,22 +96,54 @@ void LiveRecvModule::recv_single_module(
}
}
uint64_t LiveRecvModule::align_modules(
const vector<void*>& sockets, ModuleFrameBuffer *metadata, char *data)
{
uint64_t max_pulse_id = 0;
// First pass - determine current max_pulse_id.
for (size_t i_module = 0; i_module < n_modules_; i_module++) {
auto& module_metadata = metadata->module[i_module];
max_pulse_id = max(max_pulse_id, module_metadata.pulse_id);
}
// Second pass - align all receivers to max_pulse_id.
for (size_t i_module = 0; i_module < n_modules_; i_module++) {
auto& module_metadata = metadata->module[i_module];
size_t diff_to_max = max_pulse_id - module_metadata.pulse_id;
for (size_t i = 0; i < diff_to_max; i++) {
recv_single_module(
sockets[i_module],
&module_metadata,
data + (MODULE_N_BYTES * i_module));
}
if (module_metadata.pulse_id != max_pulse_id) {
throw runtime_error("Cannot align pulse_ids.");
}
}
return max_pulse_id;
}
void LiveRecvModule::receive_thread(const size_t n_modules)
{
try {
void *sockets[n_modules];
vector<void*> sockets(n_modules);
for (size_t i = 0; i < n_modules; i++) {
sockets[i] = connect_socket(i);
}
uint64_t current_pulse_id = 0;
auto slot_id = queue_.reserve();
if (slot_id == -1) throw runtime_error("This cannot really happen");
auto metadata = queue_.get_metadata_buffer(slot_id);
auto data = queue_.get_data_buffer(slot_id);
// First pass - determine current max pulse id.
// First buffer load for alignment.
for (size_t i_module = 0; i_module < n_modules; i_module++) {
auto& module_metadata = metadata->module[i_module];
@@ -119,26 +151,9 @@ void LiveRecvModule::receive_thread(const size_t n_modules)
sockets[i_module],
&module_metadata,
data + (MODULE_N_BYTES * i_module));
current_pulse_id = max(current_pulse_id, module_metadata.pulse_id);
}
// Second pass - align all receivers to the max pulse_id.
for (size_t i_module = 0; i_module < n_modules; i_module++) {
auto& module_metadata = metadata->module[i_module];
size_t diff_to_max = current_pulse_id - module_metadata.pulse_id;
for (size_t i = 0; i < diff_to_max; i++) {
recv_single_module(
sockets[i_module],
&module_metadata,
data + (MODULE_N_BYTES * i_module));
}
if (module_metadata.pulse_id != current_pulse_id) {
throw runtime_error("Cannot align pulse_ids.");
}
}
auto current_pulse_id = align_modules(sockets, metadata, data);
queue_.commit();
current_pulse_id++;
@@ -154,6 +169,7 @@ void LiveRecvModule::receive_thread(const size_t n_modules)
metadata = queue_.get_metadata_buffer(slot_id);
data = queue_.get_data_buffer(slot_id);
bool sync_needed = false;
for (size_t i_module = 0; i_module < n_modules; i_module++) {
auto& module_metadata = metadata->module[i_module];
@@ -162,11 +178,27 @@ void LiveRecvModule::receive_thread(const size_t n_modules)
&module_metadata,
data + (MODULE_N_BYTES * i_module));
if (current_pulse_id != module_metadata.pulse_id) {
throw runtime_error("Modules out of sync.");
if (module_metadata.pulse_id != current_pulse_id) {
sync_needed = true;
}
}
if (sync_needed) {
auto start_time = chrono::steady_clock::now();
auto new_pulse_id = align_modules(sockets, metadata, data);
auto lost_pulses = new_pulse_id - current_pulse_id;
current_pulse_id = new_pulse_id;
auto end_time = chrono::steady_clock::now();
auto us_duration = chrono::duration_cast<chrono::microseconds>(
end_time-start_time).count();
cout << "sf_stream:sync_lost_pulses " << lost_pulses;
cout << " sf_stream::sync_us " << us_duration;
cout << endl;
}
queue_.commit();
current_pulse_id++;
}