From cccba2f771415b2c95ee74b6c143b616fa7f1ff5 Mon Sep 17 00:00:00 2001 From: Andrej Babic Date: Wed, 22 Apr 2020 12:33:05 +0200 Subject: [PATCH] Add simple back pressure mechanism --- sf-buffer/src/sf_replay.cpp | 10 ++++++++++ sf-writer/sf_h5_writer.cpp | 26 +++++++++++++++++++++++--- 2 files changed, 33 insertions(+), 3 deletions(-) diff --git a/sf-buffer/src/sf_replay.cpp b/sf-buffer/src/sf_replay.cpp index 4e3f7ba..960840f 100644 --- a/sf-buffer/src/sf_replay.cpp +++ b/sf-buffer/src/sf_replay.cpp @@ -47,12 +47,17 @@ int main (int argc, char *argv[]) { auto ctx = zmq_ctx_new(); auto socket = zmq_socket(ctx, ZMQ_PUSH); + auto more_socket = zmq_socket(ctx, ZMQ_SUB); //TODO: Use ipc? if (zmq_connect(socket, "tcp://localhost:50000") != 0) { throw runtime_error(strerror (errno)); } + if (zmq_connect(more_socket, "tcp://localhost:50001") != 0) { + throw runtime_error(strerror (errno)); + } + int status = 0; int sndhwm = 1000; @@ -121,6 +126,11 @@ int main (int argc, char *argv[]) { (char*) (image_buffer.get() + (i_frame * 512 * 1024)), 512 * 1024 * 2, 0); + + if ((i_frame>0) && (i_frame % 100 == 0)) { + // Waiting to send more. + zmq_recv(more_socket, nullptr, 0, 0); + } } } diff --git a/sf-writer/sf_h5_writer.cpp b/sf-writer/sf_h5_writer.cpp index e0cbe59..a261252 100644 --- a/sf-writer/sf_h5_writer.cpp +++ b/sf-writer/sf_h5_writer.cpp @@ -34,12 +34,17 @@ int main (int argc, char *argv[]) zmq_ctx_set (ctx, ZMQ_IO_THREADS, 16); auto socket = zmq_socket(ctx, ZMQ_PULL); + auto more_socket = zmq_socket(ctx, ZMQ_PUB); //TODO: Use ipc? if (zmq_bind(socket, "tcp://127.0.0.1:50000") != 0) { throw runtime_error(strerror (errno)); } + if (zmq_bind(more_socket, "tcp://127.0.0.1:50001") != 0) { + throw runtime_error(strerror (errno)); + } + int rcvhwm = 10000; if (zmq_setsockopt(socket, ZMQ_RCVHWM, &rcvhwm, sizeof(rcvhwm)) != 0) { throw runtime_error(strerror (errno)); @@ -53,6 +58,7 @@ int main (int argc, char *argv[]) auto metadata_buffer = make_unique(); auto image_buffer = make_unique(512 * 1024); + auto received_counter = unordered_map(); while (true) { auto n_bytes_metadata = zmq_recv( @@ -76,9 +82,23 @@ int main (int argc, char *argv[]) throw runtime_error("Unexpected number of bytes in image."); } - cout << "Received " << metadata_buffer->pulse_id; - cout << " from " << metadata_buffer->module_id; - cout << endl; + if (received_counter.find(metadata_buffer->pulse_id) == + received_counter.end()) { + received_counter.insert({metadata_buffer->pulse_id, 31}); + } else { + received_counter[metadata_buffer->pulse_id]--; + + if (received_counter[metadata_buffer->pulse_id] == 0) { + received_counter.erase(metadata_buffer->pulse_id); + } + } + + size_t n_in_progress_frames = received_counter.size(); + cout << "n frames in progress " << n_in_progress_frames << endl; + + if (n_in_progress_frames == 0) { + zmq_send(more_socket, nullptr, 0, 0); + } } zmq_close(socket);