diff --git a/src/RingBuffer.cpp b/src/RingBuffer.cpp index 31913d8..a196606 100644 --- a/src/RingBuffer.cpp +++ b/src/RingBuffer.cpp @@ -1,7 +1,5 @@ #include "RingBuffer.hpp" #include -#include -#include #include #include #include @@ -135,43 +133,39 @@ pair RingBuffer::read() { FrameMetadata frame_metadata; - while (1) { + frame_metadata_queue_mutex.lock(); - frame_metadata_queue_mutex.lock(); + if (frame_metadata_queue.empty()) { + frame_metadata_queue_mutex.unlock(); - if (frame_metadata_queue.empty()) { - frame_metadata_queue_mutex.unlock(); + return {frame_metadata, NULL}; + } - continue; + frame_metadata = frame_metadata_queue.front(); + frame_metadata_queue.pop_front(); - } else { - frame_metadata = frame_metadata_queue.front(); - frame_metadata_queue.pop_front(); + frame_metadata_queue_mutex.unlock(); - frame_metadata_queue_mutex.unlock(); + #ifdef DEBUG_OUTPUT + cout << "[RingBuffer::read] Received metadata for frame_index " << frame_metadata.frame_index << endl; + #endif - #ifdef DEBUG_OUTPUT - cout << "[RingBuffer::read] Received metadata for frame_index " << frame_metadata.frame_index << endl; - #endif - } + // Check if the references ring buffer slot is valid. + ringbuffer_slots_mutex.lock(); - // Check if the references ring buffer slot is valid. - ringbuffer_slots_mutex.lock(); + if (!ringbuffer_slots[frame_metadata.buffer_slot_index]) { + stringstream error_message; + error_message << "Ring buffer slot referenced in message header " << frame_metadata.buffer_slot_index << " is empty." << endl; - if (!ringbuffer_slots[frame_metadata.buffer_slot_index]) { - stringstream error_message; - error_message << "Ring buffer slot referenced in message header " << frame_metadata.buffer_slot_index << " is empty." << endl; + throw runtime_error(error_message.str()); + } - throw runtime_error(error_message.str()); - } + ringbuffer_slots_mutex.unlock(); - ringbuffer_slots_mutex.unlock(); - - // Memory address of frame in buffer. - char* slot_memory_address = get_buffer_slot_address(frame_metadata.buffer_slot_index); - - return {frame_metadata, slot_memory_address}; - }; + // Memory address of frame in buffer. + char* slot_memory_address = get_buffer_slot_address(frame_metadata.buffer_slot_index); + + return {frame_metadata, slot_memory_address}; } void RingBuffer::release(size_t buffer_slot_index) { diff --git a/src/config.cpp b/src/config.cpp index ce2dadd..9afbb99 100644 --- a/src/config.cpp +++ b/src/config.cpp @@ -2,7 +2,8 @@ namespace config { // Receiver config. - int n_io_threads = 1; + int zmq_n_io_threads = 1; + int zmq_receive_timeout = 100; // Ring buffer config. size_t ring_buffer_n_slots = 100; diff --git a/src/config.hpp b/src/config.hpp index a0ce1d9..823827d 100644 --- a/src/config.hpp +++ b/src/config.hpp @@ -6,7 +6,8 @@ namespace config { - extern int n_io_threads; + extern int zmq_n_io_threads; + extern int zmq_receive_timeout; extern size_t ring_buffer_n_slots; extern uint32_t ring_buffer_read_retry_interval; diff --git a/src/h5_zmq_writer.cpp b/src/h5_zmq_writer.cpp index 608d9e8..5f5fee6 100644 --- a/src/h5_zmq_writer.cpp +++ b/src/h5_zmq_writer.cpp @@ -2,6 +2,8 @@ #include #include #include +#include +#include #include "rapidjson/document.h" #include "config.hpp" @@ -18,6 +20,12 @@ void write(WriterManager *manager, RingBuffer *ring_buffer, string output_file) // Run until the running flag is set or the ring_buffer is empty. while(manager->is_running() || !ring_buffer->is_empty()) { + + if (ring_buffer->is_empty()) { + this_thread::sleep_for(std::chrono::milliseconds(config::ring_buffer_read_retry_interval)); + continue; + } + pair received_data = ring_buffer->read(); writer.write_data(received_data.first.frame_index, @@ -37,10 +45,11 @@ void write(WriterManager *manager, RingBuffer *ring_buffer, string output_file) #endif } -void receive(WriterManager *manager, RingBuffer *ring_buffer, string connect_address, int n_io_threads=1) +void receive(WriterManager *manager, RingBuffer *ring_buffer, string connect_address, int n_io_threads=1, int receive_timeout=-1) { zmq::context_t context(n_io_threads); zmq::socket_t receiver(context, ZMQ_PULL); + receiver.setsockopt(ZMQ_RCVTIMEO, receive_timeout); receiver.connect(connect_address); zmq::message_t message_data; @@ -50,7 +59,9 @@ void receive(WriterManager *manager, RingBuffer *ring_buffer, string connect_add while (manager->is_running()) { // Get the message header. - receiver.recv(&message_data); + if (!receiver.recv(&message_data)){ + continue; + } // Parse JSON header. char* header = static_cast(message_data.data()); @@ -80,8 +91,9 @@ void receive(WriterManager *manager, RingBuffer *ring_buffer, string connect_add void run_writer(string connect_address, string output_file, uint64_t n_images, uint16_t rest_port){ - size_t n_slots = config::n_slots; - int n_io_threads = config::n_io_threads; + size_t n_slots = config::ring_buffer_n_slots; + int n_io_threads = config::zmq_n_io_threads; + int receive_timeout = config::zmq_receive_timeout; WriterManager manager(n_images); RingBuffer ring_buffer(n_slots); @@ -92,14 +104,19 @@ void run_writer(string connect_address, string output_file, uint64_t n_images, u cout << " and output_file " << output_file; cout << " and n_slots " << n_slots; cout << " and n_io_threads " << n_io_threads; + cout << " and receive_timeout " << receive_timeout; cout << endl; #endif - thread receiver_thread(receive, &manager, &ring_buffer, connect_address, n_io_threads); + thread receiver_thread(receive, &manager, &ring_buffer, connect_address, n_io_threads, receive_timeout); thread writer_thread(write, &manager, &ring_buffer, output_file); start_rest_api(manager, rest_port); + #ifdef DEBUG_OUTPUT + cout << "[h5_zmq_writer::run_writer] Rest API stopped." << endl; + #endif + receiver_thread.join(); writer_thread.join(); diff --git a/test/test_RingBuffer.cpp b/test/test_RingBuffer.cpp new file mode 100644 index 0000000..10ceda4 --- /dev/null +++ b/test/test_RingBuffer.cpp @@ -0,0 +1,13 @@ +#include "gtest/gtest.h" +#include "RingBuffer.hpp" + +TEST(RingBufferTest, simple_operation) +{ + + EXPECT_EQ(1000, 1000); +} + +int main(int argc, char **argv) { + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} \ No newline at end of file