From fada8a7ec63b5ba1b7f5076dcf01189c59ff0abe Mon Sep 17 00:00:00 2001 From: Andrej Babic Date: Mon, 11 May 2020 12:34:30 +0200 Subject: [PATCH] Make IPC address a parameter in buffer config --- core-buffer/include/buffer_config.hpp | 2 ++ core-buffer/test/test_LiveRecvModule.cpp | 8 +++----- sf-buffer/src/sf_buffer.cpp | 3 +-- sf-buffer/src/sf_live.cpp | 2 +- sf-buffer/src/sf_stream.cpp | 3 +-- 5 files changed, 8 insertions(+), 10 deletions(-) diff --git a/core-buffer/include/buffer_config.hpp b/core-buffer/include/buffer_config.hpp index 7f42752..e727976 100644 --- a/core-buffer/include/buffer_config.hpp +++ b/core-buffer/include/buffer_config.hpp @@ -40,6 +40,8 @@ namespace core_buffer { // HWM for live stream from buffer. const int BUFFER_ZMQ_SNDHWM = 100; + const std::string BUFFER_LIVE_IPC_URL = "ipc:///tmp/sf-live-"; + // N of IO threads to receive data from modules. const int STREAM_ZMQ_IO_THREADS = 4; diff --git a/core-buffer/test/test_LiveRecvModule.cpp b/core-buffer/test/test_LiveRecvModule.cpp index 7665167..326f5d4 100644 --- a/core-buffer/test/test_LiveRecvModule.cpp +++ b/core-buffer/test/test_LiveRecvModule.cpp @@ -8,19 +8,17 @@ using namespace core_buffer; TEST(LiveRecvModule, basic_interaction) { auto ctx = zmq_ctx_new(); - string ipc_prefix = "ipc:///tmp/sf-live-"; size_t n_modules = 32; size_t n_slots = 5; FastQueue queue(MODULE_N_BYTES * n_modules, n_slots); - LiveRecvModule recv_module(queue, n_modules, ctx, ipc_prefix); + LiveRecvModule recv_module(queue, n_modules, ctx, BUFFER_LIVE_IPC_URL); this_thread::sleep_for(chrono::milliseconds(100)); zmq_ctx_destroy(ctx); } TEST(LiveRecvModule, transfer_test) { auto ctx = zmq_ctx_new(); - string ipc_prefix = "ipc:///tmp/sf-live-"; size_t n_modules = 32; size_t n_slots = 5; @@ -37,7 +35,7 @@ TEST(LiveRecvModule, transfer_test) { } stringstream ipc_addr; - ipc_addr << ipc_prefix << i; + ipc_addr << BUFFER_LIVE_IPC_URL << i; const auto ipc = ipc_addr.str(); if (zmq_bind(sockets[i], ipc.c_str()) != 0) { @@ -45,7 +43,7 @@ TEST(LiveRecvModule, transfer_test) { } } - LiveRecvModule recv_module(queue, n_modules, ctx, ipc_prefix); + LiveRecvModule recv_module(queue, n_modules, ctx, BUFFER_LIVE_IPC_URL); // Nothing should be committed, queue, should be empty. ASSERT_EQ(queue.read(), -1); diff --git a/sf-buffer/src/sf_buffer.cpp b/sf-buffer/src/sf_buffer.cpp index df88c63..7fff24b 100644 --- a/sf-buffer/src/sf_buffer.cpp +++ b/sf-buffer/src/sf_buffer.cpp @@ -33,8 +33,7 @@ int main (int argc, char *argv[]) { int source_id = atoi(argv[4]); stringstream ipc_stream; - // TODO: Move this into config. - ipc_stream << "ipc:///tmp/sf-live-" << source_id; + ipc_stream << BUFFER_LIVE_IPC_URL << source_id; const auto ipc_address = ipc_stream.str(); auto ctx = zmq_ctx_new(); diff --git a/sf-buffer/src/sf_live.cpp b/sf-buffer/src/sf_live.cpp index 8b829a0..9cdd191 100644 --- a/sf-buffer/src/sf_live.cpp +++ b/sf-buffer/src/sf_live.cpp @@ -71,7 +71,7 @@ int main (int argc, char *argv[]) { const uint16_t source_id = (uint16_t) atoi(argv[3]); stringstream ipc_stream; - ipc_stream << "ipc:///tmp/sf-live-" << (int)source_id; + ipc_stream << BUFFER_LIVE_IPC_URL << (int)source_id; const auto ipc_address = ipc_stream.str(); #ifdef DEBUG_OUTPUT diff --git a/sf-buffer/src/sf_stream.cpp b/sf-buffer/src/sf_stream.cpp index 15ce4ec..ce6eeba 100644 --- a/sf-buffer/src/sf_stream.cpp +++ b/sf-buffer/src/sf_stream.cpp @@ -46,9 +46,8 @@ int main (int argc, char *argv[]) auto ctx = zmq_ctx_new(); zmq_ctx_set (ctx, ZMQ_IO_THREADS, STREAM_ZMQ_IO_THREADS); - string ipc_prefix = "ipc:///tmp/sf-live-"; - LiveRecvModule recv_module(queue, n_modules, ctx, ipc_prefix); + LiveRecvModule recv_module(queue, n_modules, ctx, BUFFER_LIVE_IPC_URL); // 0mq sockets to streamvis and live analysis void *socket_streamvis = zmq_socket(ctx, ZMQ_PUB);