From 76bde34d97a38548137c2a828e37463566672cef Mon Sep 17 00:00:00 2001 From: Andrej Babic Date: Wed, 22 Apr 2020 12:48:48 +0200 Subject: [PATCH] Trying ipc for transport protocol --- sf-buffer/src/sf_replay.cpp | 23 ++++++----------------- sf-writer/sf_h5_writer.cpp | 26 +++++++++----------------- 2 files changed, 15 insertions(+), 34 deletions(-) diff --git a/sf-buffer/src/sf_replay.cpp b/sf-buffer/src/sf_replay.cpp index 2ed0db8..3524b28 100644 --- a/sf-buffer/src/sf_replay.cpp +++ b/sf-buffer/src/sf_replay.cpp @@ -47,20 +47,10 @@ int main (int argc, char *argv[]) { auto ctx = zmq_ctx_new(); auto socket = zmq_socket(ctx, ZMQ_PUSH); - auto more_socket = zmq_socket(ctx, ZMQ_SUB); - - //TODO: Use ipc? - if (zmq_connect(socket, "tcp://localhost:50000") != 0) { - throw runtime_error(strerror (errno)); - } - - if (zmq_connect(more_socket, "tcp://localhost:50001") != 0) { - throw runtime_error(strerror (errno)); - } int status = 0; - int sndhwm = 1000; + int sndhwm = 100; status += zmq_setsockopt(socket, ZMQ_SNDHWM, &sndhwm, sizeof(sndhwm)); int linger_ms = 0; status += zmq_setsockopt(socket, ZMQ_LINGER, &linger_ms, sizeof(linger_ms)); @@ -71,6 +61,11 @@ int main (int argc, char *argv[]) { throw runtime_error(strerror (errno)); } + //TODO: Use ipc? + if (zmq_connect(socket, "ipc://writer") != 0) { + throw runtime_error(strerror (errno)); + } + for (const auto& suffix:path_suffixes) { metadata_buffer->start_pulse_id = suffix.start_pulse_id; metadata_buffer->stop_pulse_id = suffix.stop_pulse_id; @@ -126,12 +121,6 @@ int main (int argc, char *argv[]) { (char*) (image_buffer.get() + (i_frame * 512 * 1024)), 512 * 1024 * 2, 0); - - if ((i_frame>0) && (i_frame % 100 == 0)) { - // Waiting to send more. - uint64_t test = 0; - zmq_recv(more_socket, &test, sizeof(test), 0); - } } } diff --git a/sf-writer/sf_h5_writer.cpp b/sf-writer/sf_h5_writer.cpp index 73e64aa..b20012d 100644 --- a/sf-writer/sf_h5_writer.cpp +++ b/sf-writer/sf_h5_writer.cpp @@ -35,18 +35,8 @@ int main (int argc, char *argv[]) zmq_ctx_set (ctx, ZMQ_IO_THREADS, 16); auto socket = zmq_socket(ctx, ZMQ_PULL); - auto more_socket = zmq_socket(ctx, ZMQ_PUB); - //TODO: Use ipc? - if (zmq_bind(socket, "tcp://127.0.0.1:50000") != 0) { - throw runtime_error(strerror (errno)); - } - - if (zmq_bind(more_socket, "tcp://127.0.0.1:50001") != 0) { - throw runtime_error(strerror (errno)); - } - - int rcvhwm = 10000; + int rcvhwm = 1000; if (zmq_setsockopt(socket, ZMQ_RCVHWM, &rcvhwm, sizeof(rcvhwm)) != 0) { throw runtime_error(strerror (errno)); } @@ -56,6 +46,14 @@ int main (int argc, char *argv[]) throw runtime_error(strerror (errno)); } + //TODO: Use ipc? + if (zmq_bind(socket, "ipc://writer") != 0) { + throw runtime_error(strerror (errno)); + } + + + + auto metadata_buffer = make_unique(); auto image_buffer = make_unique(512 * 1024); @@ -96,12 +94,6 @@ int main (int argc, char *argv[]) size_t n_in_progress_frames = received_counter.size(); cout << "n frames in progress " << n_in_progress_frames << endl; - - if (n_in_progress_frames == 0) { - uint64_t test = 0; - zmq_send(more_socket, &test, sizeof(test), 0); - cout << "SENT!!!" << endl; - } } zmq_close(socket);