Debugging and refactoring

This commit is contained in:
2018-01-10 16:38:18 +01:00
parent 1acf59a08a
commit ba2fc91c74
5 changed files with 62 additions and 36 deletions
+23 -29
View File
@@ -1,7 +1,5 @@
#include "RingBuffer.hpp"
#include <stdexcept>
#include <chrono>
#include <thread>
#include <sstream>
#include <cstring>
#include <iostream>
@@ -135,43 +133,39 @@ pair<FrameMetadata, char*> RingBuffer::read()
{
FrameMetadata frame_metadata;
while (1) {
frame_metadata_queue_mutex.lock();
frame_metadata_queue_mutex.lock();
if (frame_metadata_queue.empty()) {
frame_metadata_queue_mutex.unlock();
if (frame_metadata_queue.empty()) {
frame_metadata_queue_mutex.unlock();
return {frame_metadata, NULL};
}
continue;
frame_metadata = frame_metadata_queue.front();
frame_metadata_queue.pop_front();
} else {
frame_metadata = frame_metadata_queue.front();
frame_metadata_queue.pop_front();
frame_metadata_queue_mutex.unlock();
frame_metadata_queue_mutex.unlock();
#ifdef DEBUG_OUTPUT
cout << "[RingBuffer::read] Received metadata for frame_index " << frame_metadata.frame_index << endl;
#endif
#ifdef DEBUG_OUTPUT
cout << "[RingBuffer::read] Received metadata for frame_index " << frame_metadata.frame_index << endl;
#endif
}
// Check if the references ring buffer slot is valid.
ringbuffer_slots_mutex.lock();
// Check if the references ring buffer slot is valid.
ringbuffer_slots_mutex.lock();
if (!ringbuffer_slots[frame_metadata.buffer_slot_index]) {
stringstream error_message;
error_message << "Ring buffer slot referenced in message header " << frame_metadata.buffer_slot_index << " is empty." << endl;
if (!ringbuffer_slots[frame_metadata.buffer_slot_index]) {
stringstream error_message;
error_message << "Ring buffer slot referenced in message header " << frame_metadata.buffer_slot_index << " is empty." << endl;
throw runtime_error(error_message.str());
}
throw runtime_error(error_message.str());
}
ringbuffer_slots_mutex.unlock();
ringbuffer_slots_mutex.unlock();
// Memory address of frame in buffer.
char* slot_memory_address = get_buffer_slot_address(frame_metadata.buffer_slot_index);
return {frame_metadata, slot_memory_address};
};
// Memory address of frame in buffer.
char* slot_memory_address = get_buffer_slot_address(frame_metadata.buffer_slot_index);
return {frame_metadata, slot_memory_address};
}
void RingBuffer::release(size_t buffer_slot_index) {
+2 -1
View File
@@ -2,7 +2,8 @@
namespace config {
// Receiver config.
int n_io_threads = 1;
int zmq_n_io_threads = 1;
int zmq_receive_timeout = 100;
// Ring buffer config.
size_t ring_buffer_n_slots = 100;
+2 -1
View File
@@ -6,7 +6,8 @@
namespace config
{
extern int n_io_threads;
extern int zmq_n_io_threads;
extern int zmq_receive_timeout;
extern size_t ring_buffer_n_slots;
extern uint32_t ring_buffer_read_retry_interval;
+22 -5
View File
@@ -2,6 +2,8 @@
#include <zmq.hpp>
#include <cstdlib>
#include <thread>
#include <chrono>
#include <thread>
#include "rapidjson/document.h"
#include "config.hpp"
@@ -18,6 +20,12 @@ void write(WriterManager *manager, RingBuffer *ring_buffer, string output_file)
// Run until the running flag is set or the ring_buffer is empty.
while(manager->is_running() || !ring_buffer->is_empty()) {
if (ring_buffer->is_empty()) {
this_thread::sleep_for(std::chrono::milliseconds(config::ring_buffer_read_retry_interval));
continue;
}
pair<FrameMetadata, char*> received_data = ring_buffer->read();
writer.write_data(received_data.first.frame_index,
@@ -37,10 +45,11 @@ void write(WriterManager *manager, RingBuffer *ring_buffer, string output_file)
#endif
}
void receive(WriterManager *manager, RingBuffer *ring_buffer, string connect_address, int n_io_threads=1)
void receive(WriterManager *manager, RingBuffer *ring_buffer, string connect_address, int n_io_threads=1, int receive_timeout=-1)
{
zmq::context_t context(n_io_threads);
zmq::socket_t receiver(context, ZMQ_PULL);
receiver.setsockopt(ZMQ_RCVTIMEO, receive_timeout);
receiver.connect(connect_address);
zmq::message_t message_data;
@@ -50,7 +59,9 @@ void receive(WriterManager *manager, RingBuffer *ring_buffer, string connect_add
while (manager->is_running()) {
// Get the message header.
receiver.recv(&message_data);
if (!receiver.recv(&message_data)){
continue;
}
// Parse JSON header.
char* header = static_cast<char*>(message_data.data());
@@ -80,8 +91,9 @@ void receive(WriterManager *manager, RingBuffer *ring_buffer, string connect_add
void run_writer(string connect_address, string output_file, uint64_t n_images, uint16_t rest_port){
size_t n_slots = config::n_slots;
int n_io_threads = config::n_io_threads;
size_t n_slots = config::ring_buffer_n_slots;
int n_io_threads = config::zmq_n_io_threads;
int receive_timeout = config::zmq_receive_timeout;
WriterManager manager(n_images);
RingBuffer ring_buffer(n_slots);
@@ -92,14 +104,19 @@ void run_writer(string connect_address, string output_file, uint64_t n_images, u
cout << " and output_file " << output_file;
cout << " and n_slots " << n_slots;
cout << " and n_io_threads " << n_io_threads;
cout << " and receive_timeout " << receive_timeout;
cout << endl;
#endif
thread receiver_thread(receive, &manager, &ring_buffer, connect_address, n_io_threads);
thread receiver_thread(receive, &manager, &ring_buffer, connect_address, n_io_threads, receive_timeout);
thread writer_thread(write, &manager, &ring_buffer, output_file);
start_rest_api(manager, rest_port);
#ifdef DEBUG_OUTPUT
cout << "[h5_zmq_writer::run_writer] Rest API stopped." << endl;
#endif
receiver_thread.join();
writer_thread.join();