From 8a839d8e592c100913eb1426add8b3843e524d55 Mon Sep 17 00:00:00 2001 From: Andrej Babic Date: Wed, 22 Apr 2020 15:48:38 +0200 Subject: [PATCH] Try multi polling instead of single merge --- sf-buffer/src/sf_replay.cpp | 11 ---- sf-writer/sf_h5_writer.cpp | 118 +++++++++++++++++------------------- 2 files changed, 55 insertions(+), 74 deletions(-) diff --git a/sf-buffer/src/sf_replay.cpp b/sf-buffer/src/sf_replay.cpp index 385ba67..4a8a3a7 100644 --- a/sf-buffer/src/sf_replay.cpp +++ b/sf-buffer/src/sf_replay.cpp @@ -57,14 +57,6 @@ int main (int argc, char *argv[]) { throw runtime_error(strerror (errno)); } - auto meta_socket = zmq_socket(ctx, ZMQ_SUB); - if (zmq_connect(meta_socket, "ipc://metadata") != 0) { - throw runtime_error(strerror (errno)); - } - if (zmq_setsockopt(meta_socket, ZMQ_SUBSCRIBE, "", 0) != 0) { - throw runtime_error(strerror (errno)); - } - for (const auto& suffix:path_suffixes) { metadata_buffer->start_pulse_id = suffix.start_pulse_id; metadata_buffer->stop_pulse_id = suffix.stop_pulse_id; @@ -181,9 +173,6 @@ int main (int argc, char *argv[]) { H5::PredType::NATIVE_UINT16, H5::DataSpace::ALL, meta_space); } - - // Wait for sync. - zmq_recv(meta_socket, nullptr, 0, 0); } input_file.close(); diff --git a/sf-writer/sf_h5_writer.cpp b/sf-writer/sf_h5_writer.cpp index 3c8cf94..2aed9cb 100644 --- a/sf-writer/sf_h5_writer.cpp +++ b/sf-writer/sf_h5_writer.cpp @@ -8,6 +8,7 @@ #include #include #include +#include using namespace std; @@ -30,86 +31,77 @@ int main (int argc, char *argv[]) uint64_t start_pulse_id = (uint64_t) atoll(argv[2]); uint64_t stop_pulse_id = (uint64_t) atoll(argv[3]); - size_t n_modules = 32; - auto ctx = zmq_ctx_new(); zmq_ctx_set (ctx, ZMQ_IO_THREADS, 16); - auto socket = zmq_socket(ctx, ZMQ_PULL); - int rcvhwm = 1000; - if (zmq_setsockopt(socket, ZMQ_RCVHWM, &rcvhwm, sizeof(rcvhwm)) != 0) { - throw runtime_error(strerror (errno)); - } - int linger = 0; - if (zmq_setsockopt(socket, ZMQ_LINGER, &linger, sizeof(linger)) != 0) { - throw runtime_error(strerror (errno)); - } - if (zmq_bind(socket, "ipc://writer") != 0) { - throw runtime_error(strerror (errno)); + size_t n_modules = 32; + void* sockets[n_modules]; + + for (size_t i=0; i(); - auto image_buffer = make_unique(512 * 1024); - unordered_map received_counter; - - size_t n_frames_left = 32*BufferUtils::STREAM_BLOCK_SIZE; while (true) { - auto n_bytes_metadata = zmq_recv( - socket, - metadata_buffer.get(), - sizeof(ModuleFrame), - 0); + uint64_t pulse_id = 0; - if (n_bytes_metadata != sizeof(ModuleFrame)) { - throw runtime_error("Unexpected number of bytes in metadata."); - } + for (size_t i=0; ipulse_id) == - received_counter.end()) { - received_counter.insert({metadata_buffer->pulse_id, 31}); - } else { - received_counter[metadata_buffer->pulse_id]--; - - if (received_counter[metadata_buffer->pulse_id] == 0) { - received_counter.erase(metadata_buffer->pulse_id); - } - } - - if (n_frames_left == 0) { - cout << "Batch finished." << endl; - for(auto& data:received_counter) { - cout << data.first << ": " << data.second << endl; + if (i == 0) { + pulse_id = metadata_buffer->pulse_id; + } + + if (pulse_id != metadata_buffer->pulse_id) { + cout << "Module " << i << " pulse " << metadata_buffer->pulse_id; + cout << " instead of " << pulse_id << endl; + } + + if (n_bytes_metadata != sizeof(ModuleFrame)) { + throw runtime_error("Unexpected number of bytes in metadata."); + } + + auto n_bytes_image = zmq_recv( + sockets[i], + image_buffer.get(), + 512 * 1024 * 2, + 0); + + if (n_bytes_image != 512 * 1024 * 2) { + cout << "n_bytes_image " << n_bytes_image << endl; + throw runtime_error("Unexpected number of bytes in image."); } - zmq_send(meta_socket, "", strlen(""), 0); - n_frames_left = 32*BufferUtils::STREAM_BLOCK_SIZE; } } - zmq_close(socket); + for (size_t i=0; i