#include #include #include #include "RingBuffer.hpp" using namespace std; template RingBuffer::RingBuffer(size_t n_slots) : n_slots_(n_slots), ringbuffer_slots_(n_slots, 0) { #ifdef DEBUG_OUTPUT using namespace date; using namespace chrono; cout << "[" << system_clock::now() << "]"; cout << "[RingBuffer::RingBuffer]"; cout << " Creating ring buffer"; cout << " with n_slots " << n_slots << endl; #endif } template RingBuffer::~RingBuffer() { if (frame_data_buffer_ != NULL) { free(frame_data_buffer_); frame_data_buffer_ = NULL; } } template void RingBuffer::initialize(const size_t requested_slot_size) { if (is_initialized()) { return; } lock_guard lock(ringbuffer_slots_mutex_); if (initialized_) { if (requested_slot_size > slot_size_) { stringstream err_msg; using namespace date; using namespace chrono; err_msg << "[" << system_clock::now() << "]"; err_msg << "[RingBuffer::initialize]"; err_msg << " Already initialized with smaller slot_size "; err_msg << slot_size_ << " than requested_slot_size "; err_msg << requested_slot_size << endl; throw runtime_error(err_msg.str()); } return; } write_index_ = 0; slot_size_ = requested_slot_size; buffer_size_ = slot_size_ * n_slots_; frame_data_buffer_ = new char[buffer_size_]; buffer_used_slots_ = 0; initialized_ = true; #ifdef DEBUG_OUTPUT using namespace date; using namespace chrono; cout << "[" << system_clock::now() << "]"; cout << "[RingBuffer::initialize]"; cout << " Ringbuffer initialized"; cout << " with slot_size " << slot_size_; cout << " and n_slots " << n_slots_; cout << " and buffer_size " << buffer_size_ << endl; #endif } template char* RingBuffer::reserve(shared_ptr frame_metadata) { if (!is_initialized()) { stringstream err_msg; using namespace date; using namespace chrono; err_msg << "[" << system_clock::now() << "]"; err_msg << "[RingBuffer::reserve]"; err_msg << " Ringbuffer not initialized."; throw runtime_error(err_msg.str()); } if (frame_metadata->frame_bytes_size > slot_size_) { stringstream err_msg; using namespace date; using namespace chrono; err_msg << "[" << system_clock::now() << "]"; err_msg << "[RingBuffer::reserve]"; err_msg << " Received frame index " << frame_metadata->frame_index; err_msg << " that is too large for ring buffer slot."; err_msg << " Slot size " << slot_size_ << ", but frame bytes size "; err_msg << frame_metadata->frame_bytes_size << endl; throw runtime_error(err_msg.str()); } // Check and reserve slot in the buffer. { lock_guard lock(ringbuffer_slots_mutex_); if (!ringbuffer_slots_[write_index_]) { ringbuffer_slots_[write_index_] = true; frame_metadata->buffer_slot_index = write_index_; #ifdef DEBUG_OUTPUT using namespace date; using namespace chrono; cout << "[" << system_clock::now() << "]"; cout << "[RingBuafer::reserve]"; cout << " Ring buffer slot "; cout << frame_metadata->buffer_slot_index; cout << " reserved for frame_index "; cout << frame_metadata->frame_index << endl; #endif write_index_ = (write_index_ + 1) % n_slots_; buffer_used_slots_++; } else { return nullptr; } } return get_buffer_slot_address(frame_metadata->buffer_slot_index); } template void RingBuffer::commit(shared_ptr frame_metadata) { lock_guard lock(frame_metadata_queue_mutex_); frame_metadata_queue_.push_back(frame_metadata); #ifdef DEBUG_OUTPUT using namespace date; using namespace chrono; cout << "[" << system_clock::now() << "]"; cout << "[RingBuffer::commit]"; cout << " Metadata for frame_index " << frame_metadata->frame_index; cout << " added to metadata queue." << endl; #endif } template char* RingBuffer::get_buffer_slot_address(size_t buffer_slot_index) { char* slot_memory_address = frame_data_buffer_ + (buffer_slot_index * slot_size_); // Check if the memory address is valid. if (slot_memory_address > frame_data_buffer_ + buffer_size_) { stringstream err_msg; using namespace date; using namespace chrono; err_msg << "[" << system_clock::now() << "]"; err_msg << "[RingBuffer::get_buffer_slot_address]"; err_msg << " Ring buffer address out of range." << endl; throw runtime_error(err_msg.str()); } return slot_memory_address; } template pair, char*> RingBuffer::read() { shared_ptr frame_metadata; { lock_guard lock(frame_metadata_queue_mutex_); // A NULL char* means no waiting data in the ring buffer. if (frame_metadata_queue_.empty()) { return {NULL, NULL}; } frame_metadata = frame_metadata_queue_.front(); frame_metadata_queue_.pop_front(); } #ifdef DEBUG_OUTPUT using namespace date; using namespace chrono; cout << "[" << system_clock::now() << "]"; cout << "[RingBuffer::read]"; cout << " Received metadata for frame_index "; cout << frame_metadata->frame_index << endl; #endif // Check if the references ring buffer slot is valid. { lock_guard lock(ringbuffer_slots_mutex_); if (!ringbuffer_slots_[frame_metadata->buffer_slot_index]) { stringstream err_msg; using namespace date; using namespace chrono; err_msg << "[" << system_clock::now() << "]"; err_msg << "[RingBuffer::read]"; err_msg << " Ring buffer slot"; err_msg << " referenced in message header "; err_msg << frame_metadata->buffer_slot_index << " is empty."; err_msg << endl; throw runtime_error(err_msg.str()); } } return {frame_metadata, get_buffer_slot_address(frame_metadata->buffer_slot_index)}; } template void RingBuffer::release(size_t buffer_slot_index) { if (buffer_slot_index >= n_slots_) { stringstream err_msg; using namespace date; using namespace chrono; err_msg << "[" << system_clock::now() << "]"; err_msg << "[RingBuffer::release]"; err_msg << " Slot index " << buffer_slot_index << " is out of range."; err_msg << " Ring buffer n_slots = " << n_slots_ << endl; throw runtime_error(err_msg.str()); } { lock_guard lock(ringbuffer_slots_mutex_); if (ringbuffer_slots_[buffer_slot_index]) { ringbuffer_slots_[buffer_slot_index] = false; buffer_used_slots_--; } else { stringstream err_msg; using namespace date; using namespace chrono; err_msg << "[" << system_clock::now() << "]"; err_msg << "[RingBuffer::release]"; err_msg << " Cannot release empty ring buffer slot "; err_msg << buffer_slot_index << endl; throw runtime_error(err_msg.str()); } } } template bool RingBuffer::is_empty() { lock_guard lock(ringbuffer_slots_mutex_); return buffer_used_slots_ == 0; } template bool RingBuffer::is_initialized() { return initialized_.load(memory_order_relaxed); } template void RingBuffer::clear() { lock_guard lock_slots(ringbuffer_slots_mutex_); lock_guard lock_metadata(frame_metadata_queue_mutex_); write_index_ = 0; buffer_used_slots_ = 0; ringbuffer_slots_ = vector(n_slots_, 0); frame_metadata_queue_.clear(); } template size_t RingBuffer::get_slot_size() { return slot_size_; } template class RingBuffer; template class RingBuffer;