From 7234764d71e2b8e4a0b0445ae7be89fb3b5b2e34 Mon Sep 17 00:00:00 2001 From: Andrej Babic Date: Thu, 26 Mar 2020 16:55:47 +0100 Subject: [PATCH] Improve RingBuffer --- core-writer/include/RingBuffer.hpp | 23 ++-- core-writer/src/RingBuffer.cpp | 196 ++++++++++++++--------------- 2 files changed, 110 insertions(+), 109 deletions(-) diff --git a/core-writer/include/RingBuffer.hpp b/core-writer/include/RingBuffer.hpp index 6081e88..03dc0bd 100644 --- a/core-writer/include/RingBuffer.hpp +++ b/core-writer/include/RingBuffer.hpp @@ -30,20 +30,20 @@ struct FrameMetadata class RingBuffer { // Initialized in constructor. - size_t n_slots = 0; - std::vector ringbuffer_slots; + size_t n_slots_ = 0; + std::vector ringbuffer_slots_; // Set in initialize(). - size_t slot_size = 0; - size_t buffer_size = 0; - char* frame_data_buffer = NULL; - size_t write_index = 0; - size_t buffer_used_slots = 0; - bool ring_buffer_initialized = false; + size_t slot_size_ = 0; + size_t buffer_size_ = 0; + char* frame_data_buffer_ = NULL; + size_t write_index_ = 0; + size_t buffer_used_slots_ = 0; + bool ring_buffer_initialized_ = false; - std::list< std::shared_ptr > frame_metadata_queue; - std::mutex frame_metadata_queue_mutex; - std::mutex ringbuffer_slots_mutex; + std::list< std::shared_ptr > frame_metadata_queue_; + std::mutex frame_metadata_queue_mutex_; + std::mutex ringbuffer_slots_mutex_; char* get_buffer_slot_address(size_t buffer_slot_index); @@ -56,6 +56,7 @@ class RingBuffer void commit(std::shared_ptr metadata); std::pair, char*> read(); void release(size_t buffer_slot_index); + bool is_empty(); void clear(); size_t get_slot_size(); diff --git a/core-writer/src/RingBuffer.cpp b/core-writer/src/RingBuffer.cpp index a92f4de..42627ee 100644 --- a/core-writer/src/RingBuffer.cpp +++ b/core-writer/src/RingBuffer.cpp @@ -1,6 +1,4 @@ #include -#include -#include #include #include @@ -8,7 +6,9 @@ using namespace std; -RingBuffer::RingBuffer(size_t n_slots) : n_slots(n_slots), ringbuffer_slots(n_slots, 0) +RingBuffer::RingBuffer(size_t n_slots) : + n_slots_(n_slots), + ringbuffer_slots_(n_slots, 0) { #ifdef DEBUG_OUTPUT using namespace date; @@ -21,24 +21,24 @@ RingBuffer::RingBuffer(size_t n_slots) : n_slots(n_slots), ringbuffer_slots(n_sl RingBuffer::~RingBuffer() { - // If the frame buffer is allocated, free it. - if (frame_data_buffer != NULL) { - free(frame_data_buffer); - frame_data_buffer = NULL; + if (frame_data_buffer_ != NULL) { + free(frame_data_buffer_); + frame_data_buffer_ = NULL; } } void RingBuffer::initialize(size_t slot_size) { - if (frame_data_buffer) { - stringstream error_message; + if (frame_data_buffer_) { + stringstream err_msg; + using namespace date; using namespace chrono; - error_message << "[" << system_clock::now() << "]"; - error_message << "[RingBuffer::initialize] Ring buffer"; - error_message << " already initialized." << endl; + err_msg << "[" << system_clock::now() << "]"; + err_msg << "[RingBuffer::initialize]"; + err_msg << " Ring buffer already initialized." << endl; - throw runtime_error(error_message.str()); + throw runtime_error(err_msg.str()); } #ifdef DEBUG_OUTPUT @@ -49,51 +49,51 @@ void RingBuffer::initialize(size_t slot_size) cout << " with slot_size " << slot_size << endl; #endif - this->write_index = 0; - this->slot_size = slot_size; - this->buffer_size = slot_size * n_slots; - this->frame_data_buffer = new char[buffer_size]; - this->buffer_used_slots = 0; - this->ring_buffer_initialized = true; + this->write_index_ = 0; + this->slot_size_ = slot_size; + this->buffer_size_ = slot_size * n_slots_; + this->frame_data_buffer_ = new char[buffer_size_]; + this->buffer_used_slots_ = 0; + this->ring_buffer_initialized_ = true; #ifdef DEBUG_OUTPUT using namespace date; using namespace chrono; cout << "[" << system_clock::now() << "]"; - cout << "[RingBuffer::initialize] Total buffer_size " << buffer_size << endl; + cout << "[RingBuffer::initialize] "; + cout << " Total buffer_size " << buffer_size_ << endl; #endif } char* RingBuffer::reserve(shared_ptr frame_metadata) { - if (!ring_buffer_initialized) { + if (!ring_buffer_initialized_) { initialize(frame_metadata->frame_bytes_size); } - // All images must fit in the ring buffer slot. - if (frame_metadata->frame_bytes_size > slot_size) { - stringstream error_message; + if (frame_metadata->frame_bytes_size > slot_size_) { + stringstream err_msg; using namespace date; using namespace chrono; - error_message << "[" << system_clock::now() << "]"; - error_message << "[RingBuffer::reserve] Received frame index "; - error_message << frame_metadata->frame_index; - error_message << " that is too large for ring buffer slot. "; - error_message << "Slot size " << slot_size << ", but frame bytes size "; - error_message << frame_metadata->frame_bytes_size << endl; + err_msg << "[" << system_clock::now() << "]"; + err_msg << "[RingBuffer::reserve] Received frame index "; + err_msg << frame_metadata->frame_index; + err_msg << " that is too large for ring buffer slot. "; + err_msg << "Slot size " << slot_size_ << ", but frame bytes size "; + err_msg << frame_metadata->frame_bytes_size << endl; - throw runtime_error(error_message.str()); + throw runtime_error(err_msg.str()); } // Check and reserve slot in the buffer. { - lock_guard lock(ringbuffer_slots_mutex); + lock_guard lock(ringbuffer_slots_mutex_); - if (!ringbuffer_slots[write_index]) { - ringbuffer_slots[write_index] = 1; + if (!ringbuffer_slots_[write_index_]) { + ringbuffer_slots_[write_index_] = 1; - frame_metadata->buffer_slot_index = write_index; + frame_metadata->buffer_slot_index = write_index_; #ifdef DEBUG_OUTPUT using namespace date; @@ -105,19 +105,19 @@ char* RingBuffer::reserve(shared_ptr frame_metadata) cout << frame_metadata->frame_index << endl; #endif - write_index = (write_index + 1) % n_slots; - buffer_used_slots++; + write_index_ = (write_index_ + 1) % n_slots_; + buffer_used_slots_++; } else { - stringstream error_message; + stringstream err_msg; using namespace date; using namespace chrono; - error_message << "[" << system_clock::now() << "]"; - error_message << "[RingBuffer::reserve] Ring buffer is full."; - error_message << " Collision at write_index = " << write_index << endl; + err_msg << "[" << system_clock::now() << "]"; + err_msg << "[RingBuffer::reserve] Ring buffer is full."; + err_msg << " Collision at write_index = " << write_index_ << endl; - throw runtime_error(error_message.str()); + throw runtime_error(err_msg.str()); } } @@ -127,34 +127,36 @@ char* RingBuffer::reserve(shared_ptr frame_metadata) void RingBuffer::commit(shared_ptr frame_metadata) { - lock_guard lock(frame_metadata_queue_mutex); + lock_guard lock(frame_metadata_queue_mutex_); - frame_metadata_queue.push_back(frame_metadata); + frame_metadata_queue_.push_back(frame_metadata); #ifdef DEBUG_OUTPUT using namespace date; - cout << "[" << std::chrono::system_clock::now() << "]"; - cout << "[RingBuffer::write] Metadata for frame_index "; - cout << frame_metadata->frame_index << " added to metadata queue." << endl; + using namespace chrono; + cout << "[" << system_clock::now() << "]"; + cout << "[RingBuffer::commit] Metadata for frame_index "; + cout << frame_metadata->frame_index; + cout << " added to metadata queue." << endl; #endif } char* RingBuffer::get_buffer_slot_address(size_t buffer_slot_index) { - char* slot_memory_address = frame_data_buffer + (buffer_slot_index * slot_size); + char* slot_memory_address = + frame_data_buffer_ + (buffer_slot_index * slot_size_); // Check if the memory address is valid. - if (slot_memory_address > frame_data_buffer + buffer_size) { - stringstream error_message; + if (slot_memory_address > frame_data_buffer_ + buffer_size_) { + stringstream err_msg; using namespace date; using namespace chrono; - error_message << "[" << system_clock::now() << "]"; - error_message << "[RingBuffer::get_buffer_slot_address] Calculated"; - error_message << " ring buffer address is out of bound for buffer_slot_index "; - error_message << buffer_slot_index << endl; + err_msg << "[" << system_clock::now() << "]"; + err_msg << "[RingBuffer::get_buffer_slot_address]"; + err_msg << " Ring buffer address out of range." << endl; - throw runtime_error(error_message.str()); + throw runtime_error(err_msg.str()); } return slot_memory_address; @@ -165,15 +167,15 @@ pair, char*> RingBuffer::read() shared_ptr frame_metadata; { - lock_guard lock(frame_metadata_queue_mutex); + lock_guard lock(frame_metadata_queue_mutex_); - // A NULL char* indicates that there are no available data in the ring buffer. - if (frame_metadata_queue.empty()) { + // A NULL char* means no waiting data in the ring buffer. + if (frame_metadata_queue_.empty()) { return {NULL, NULL}; } - frame_metadata = frame_metadata_queue.front(); - frame_metadata_queue.pop_front(); + frame_metadata = frame_metadata_queue_.front(); + frame_metadata_queue_.pop_front(); } #ifdef DEBUG_OUTPUT @@ -186,85 +188,83 @@ pair, char*> RingBuffer::read() // Check if the references ring buffer slot is valid. { - lock_guard lock(ringbuffer_slots_mutex); + lock_guard lock(ringbuffer_slots_mutex_); - if (!ringbuffer_slots[frame_metadata->buffer_slot_index]) { - stringstream error_message; + if (!ringbuffer_slots_[frame_metadata->buffer_slot_index]) { + stringstream err_msg; using namespace date; using namespace chrono; - error_message << "[" << system_clock::now() << "]"; - error_message << "[RingBuffer::read] Ring buffer slot"; - error_message << " referenced in message header "; - error_message << frame_metadata->buffer_slot_index << " is empty." << endl; + err_msg << "[" << system_clock::now() << "]"; + err_msg << "[RingBuffer::read] Ring buffer slot"; + err_msg << " referenced in message header "; + err_msg << frame_metadata->buffer_slot_index << " is empty."; + err_msg << endl; - throw runtime_error(error_message.str()); + throw runtime_error(err_msg.str()); } } - - char* slot_memory_address = get_buffer_slot_address(frame_metadata->buffer_slot_index); - return {frame_metadata, slot_memory_address}; + return {frame_metadata, + get_buffer_slot_address(frame_metadata->buffer_slot_index)}; } void RingBuffer::release(size_t buffer_slot_index) { - // Cannot release a slot index that is out of range. - if (buffer_slot_index >= n_slots) { - stringstream error_message; + if (buffer_slot_index >= n_slots_) { + stringstream err_msg; using namespace date; using namespace chrono; - error_message << "[" << system_clock::now() << "]"; - error_message << "[RingBuffer::release] Slot index to release "; - error_message << buffer_slot_index << " is out of range."; - error_message << " Ring buffer n_slots = " << n_slots << endl; + err_msg << "[" << system_clock::now() << "]"; + err_msg << "[RingBuffer::release] Slot index "; + err_msg << buffer_slot_index << " is out of range."; + err_msg << " Ring buffer n_slots = " << n_slots_ << endl; - throw runtime_error(error_message.str()); + throw runtime_error(err_msg.str()); } { - lock_guard lock(ringbuffer_slots_mutex); + lock_guard lock(ringbuffer_slots_mutex_); - if (ringbuffer_slots[buffer_slot_index]) { - ringbuffer_slots[buffer_slot_index] = 0; + if (ringbuffer_slots_[buffer_slot_index]) { + ringbuffer_slots_[buffer_slot_index] = 0; - buffer_used_slots--; + buffer_used_slots_--; } else { - stringstream error_message; + stringstream err_msg; using namespace date; using namespace chrono; - error_message << "[" << system_clock::now() << "]"; - error_message << "[RingBuffer::release] Cannot release empty"; - error_message << " ring buffer slot " << buffer_slot_index << endl; + err_msg << "[" << system_clock::now() << "]"; + err_msg << "[RingBuffer::release] Cannot release empty"; + err_msg << " ring buffer slot " << buffer_slot_index << endl; - throw runtime_error(error_message.str()); + throw runtime_error(err_msg.str()); } } } bool RingBuffer::is_empty() { - lock_guard lock(ringbuffer_slots_mutex); + lock_guard lock(ringbuffer_slots_mutex_); - return buffer_used_slots == 0; + return buffer_used_slots_ == 0; } void RingBuffer::clear() { - lock_guard lock_slots(ringbuffer_slots_mutex); - lock_guard lock_metadata(frame_metadata_queue_mutex); + lock_guard lock_slots(ringbuffer_slots_mutex_); + lock_guard lock_metadata(frame_metadata_queue_mutex_); - write_index = 0; - buffer_used_slots = 0; - ringbuffer_slots = vector(n_slots, 0); - frame_metadata_queue.clear(); + write_index_ = 0; + buffer_used_slots_ = 0; + ringbuffer_slots_ = vector(n_slots_, 0); + frame_metadata_queue_.clear(); } size_t RingBuffer::get_slot_size() { - return slot_size; + return slot_size_; } -