diff --git a/core-buffer/include/BufferUtils.hpp b/core-buffer/include/BufferUtils.hpp index 37f9f62..5216a2d 100644 --- a/core-buffer/include/BufferUtils.hpp +++ b/core-buffer/include/BufferUtils.hpp @@ -18,6 +18,11 @@ namespace BufferUtils const std::string& filename_to_write); void create_destination_folder(const std::string& output_file); + + void* bind_socket( + void* ctx, const std::string& detector_name, const int source_id); + void* connect_socket( + void* ctx, const std::string& detector_name, const int source_id); } #endif //BUFFER_UTILS_HPP diff --git a/core-buffer/include/buffer_config.hpp b/core-buffer/include/buffer_config.hpp index 2f1f662..73c97b5 100644 --- a/core-buffer/include/buffer_config.hpp +++ b/core-buffer/include/buffer_config.hpp @@ -38,6 +38,8 @@ namespace buffer_config { const int BUFFER_UDP_US_TIMEOUT = 2 * 1000; // HWM for live stream from buffer. const int BUFFER_ZMQ_SNDHWM = 100; + // HWM for live stream from buffer. + const int BUFFER_ZMQ_RCVHWM = 100; // IPC address of the live stream. const std::string BUFFER_LIVE_IPC_URL = "ipc:///tmp/sf-live-"; // Number of image slots in ram buffer - 10 seconds should be enough diff --git a/core-buffer/src/BufferUtils.cpp b/core-buffer/src/BufferUtils.cpp index c2a60a4..a7346aa 100644 --- a/core-buffer/src/BufferUtils.cpp +++ b/core-buffer/src/BufferUtils.cpp @@ -2,8 +2,10 @@ #include #include +#include using namespace std; +using namespace buffer_config; string BufferUtils::get_filename( std::string detector_folder, @@ -60,4 +62,63 @@ void BufferUtils::create_destination_folder(const string& output_file) string create_folder_command("mkdir -p " + output_folder); system(create_folder_command.c_str()); } -} \ No newline at end of file +} + +void* BufferUtils::connect_socket( + void* ctx, const string& detector_name, const int source_id) +{ + string ipc_address = BUFFER_LIVE_IPC_URL + + detector_name + "-" + + to_string(source_id); + + void* socket = zmq_socket(ctx, ZMQ_SUB); + if (socket == nullptr) { + throw runtime_error(zmq_strerror(errno)); + } + + int rcvhwm = BUFFER_ZMQ_RCVHWM; + if (zmq_setsockopt(socket, ZMQ_RCVHWM, &rcvhwm, sizeof(rcvhwm)) != 0) { + throw runtime_error(zmq_strerror(errno)); + } + + int linger = 0; + if (zmq_setsockopt(socket, ZMQ_LINGER, &linger, sizeof(linger)) != 0) { + throw runtime_error(zmq_strerror(errno)); + } + + if (zmq_connect(socket, ipc_address.c_str()) != 0) { + throw runtime_error(zmq_strerror(errno)); + } + + if (zmq_setsockopt(socket, ZMQ_SUBSCRIBE, "", 0) != 0) { + throw runtime_error(zmq_strerror(errno)); + } + + return socket; +} + +void* BufferUtils::bind_socket( + void* ctx, const string& detector_name, const int source_id) +{ + string ipc_address = BUFFER_LIVE_IPC_URL + + detector_name + "-" + + to_string(source_id); + + void* socket = zmq_socket(ctx, ZMQ_PUB); + + const int sndhwm = BUFFER_ZMQ_SNDHWM; + if (zmq_setsockopt(socket, ZMQ_SNDHWM, &sndhwm, sizeof(sndhwm)) != 0) { + throw runtime_error(zmq_strerror(errno)); + } + + const int linger = 0; + if (zmq_setsockopt(socket, ZMQ_LINGER, &linger, sizeof(linger)) != 0) { + throw runtime_error(zmq_strerror(errno)); + } + + if (zmq_bind(socket, ipc_address.c_str()) != 0) { + throw runtime_error(zmq_strerror(errno)); + } + + return socket; +}