From b1e83014078be801f8f11d4ac8c0584717a3321b Mon Sep 17 00:00:00 2001 From: Andrej Babic Date: Wed, 22 Apr 2020 14:01:27 +0200 Subject: [PATCH] Reverted back to backpressure implementation --- sf-buffer/src/sf_replay.cpp | 30 ++++++++++-------------------- sf-writer/sf_h5_writer.cpp | 19 +++---------------- 2 files changed, 13 insertions(+), 36 deletions(-) diff --git a/sf-buffer/src/sf_replay.cpp b/sf-buffer/src/sf_replay.cpp index 377ea4c..a03f233 100644 --- a/sf-buffer/src/sf_replay.cpp +++ b/sf-buffer/src/sf_replay.cpp @@ -46,28 +46,19 @@ int main (int argc, char *argv[]) { start_pulse_id, stop_pulse_id); auto ctx = zmq_ctx_new(); + auto socket = zmq_socket(ctx, ZMQ_PUSH); - - - int status = 0; - int sndhwm = 1; - status += zmq_setsockopt(socket, ZMQ_SNDHWM, &sndhwm, sizeof(sndhwm)); + if (zmq_setsockopt(socket, ZMQ_SNDHWM, &sndhwm, sizeof(sndhwm)) != 0) { + throw runtime_error(strerror (errno)); + }; int linger_ms = 0; - status += zmq_setsockopt(socket, ZMQ_LINGER, &linger_ms, sizeof(linger_ms)); - - //status += zmq_setsockopt(socket, ZMQ_SNDTIMEO, 1000); - - if (status != 0) { + if (zmq_setsockopt(socket, ZMQ_LINGER, &linger_ms, sizeof(linger_ms))) { throw runtime_error(strerror (errno)); } - - //TODO: Use ipc? if (zmq_connect(socket, "ipc://writer") != 0) { throw runtime_error(strerror (errno)); } - //TODO: Use ipc? - auto meta_socket = zmq_socket(ctx, ZMQ_SUB); if (zmq_connect(meta_socket, "ipc://metadata") != 0) { @@ -76,12 +67,6 @@ int main (int argc, char *argv[]) { if (zmq_setsockopt(meta_socket, ZMQ_SUBSCRIBE, "", 0) != 0) { throw runtime_error(strerror (errno)); } - while (true) { - cout << "receiving " << endl; - uint64_t response; - zmq_recv(meta_socket, &response, sizeof(response), 0); - cout << "Done!! " << response << endl; - } for (const auto& suffix:path_suffixes) { metadata_buffer->start_pulse_id = suffix.start_pulse_id; @@ -138,6 +123,11 @@ int main (int argc, char *argv[]) { (char*) (image_buffer.get() + (i_frame * 512 * 1024)), 512 * 1024 * 2, 0); + + if ((i_frame > 0) && (i_frame%100 == 0)) { + // Wait for the sync message. + zmq_recv(meta_socket, nullptr, 0, 0); + } } } diff --git a/sf-writer/sf_h5_writer.cpp b/sf-writer/sf_h5_writer.cpp index d820926..f3ba308 100644 --- a/sf-writer/sf_h5_writer.cpp +++ b/sf-writer/sf_h5_writer.cpp @@ -36,39 +36,23 @@ int main (int argc, char *argv[]) zmq_ctx_set (ctx, ZMQ_IO_THREADS, 16); auto socket = zmq_socket(ctx, ZMQ_PULL); - - int rcvhwm = 1000; if (zmq_setsockopt(socket, ZMQ_RCVHWM, &rcvhwm, sizeof(rcvhwm)) != 0) { throw runtime_error(strerror (errno)); } - int linger = 0; if (zmq_setsockopt(socket, ZMQ_LINGER, &linger, sizeof(linger)) != 0) { throw runtime_error(strerror (errno)); } - if (zmq_bind(socket, "ipc://writer") != 0) { throw runtime_error(strerror (errno)); } - - auto meta_socket = zmq_socket(ctx, ZMQ_PUB); if (zmq_bind(meta_socket, "ipc://metadata") != 0) { throw runtime_error(strerror (errno)); } - while(true) { - string test = "test"; - auto c_test = test.c_str(); - zmq_send(meta_socket, c_test, strlen(c_test), 0); - cout << "sent test" << endl; - this_thread::sleep_for(chrono::milliseconds(100)); - } - - - uint64_t test = 12; zmq_send(meta_socket, &test, sizeof(test),0); @@ -112,6 +96,9 @@ int main (int argc, char *argv[]) size_t n_in_progress_frames = received_counter.size(); cout << "n frames in progress " << n_in_progress_frames << endl; + if (n_in_progress_frames == 0) { + zmq_send(meta_socket, "", strlen(""), 0); + } } zmq_close(socket);