mirror of
https://github.com/paulscherrerinstitute/sf_daq_buffer.git
synced 2026-05-01 13:22:24 +02:00
Improve RingBuffer
This commit is contained in:
@@ -30,20 +30,20 @@ struct FrameMetadata
|
||||
class RingBuffer
|
||||
{
|
||||
// Initialized in constructor.
|
||||
size_t n_slots = 0;
|
||||
std::vector<bool> ringbuffer_slots;
|
||||
size_t n_slots_ = 0;
|
||||
std::vector<bool> ringbuffer_slots_;
|
||||
|
||||
// Set in initialize().
|
||||
size_t slot_size = 0;
|
||||
size_t buffer_size = 0;
|
||||
char* frame_data_buffer = NULL;
|
||||
size_t write_index = 0;
|
||||
size_t buffer_used_slots = 0;
|
||||
bool ring_buffer_initialized = false;
|
||||
size_t slot_size_ = 0;
|
||||
size_t buffer_size_ = 0;
|
||||
char* frame_data_buffer_ = NULL;
|
||||
size_t write_index_ = 0;
|
||||
size_t buffer_used_slots_ = 0;
|
||||
bool ring_buffer_initialized_ = false;
|
||||
|
||||
std::list< std::shared_ptr<FrameMetadata> > frame_metadata_queue;
|
||||
std::mutex frame_metadata_queue_mutex;
|
||||
std::mutex ringbuffer_slots_mutex;
|
||||
std::list< std::shared_ptr<FrameMetadata> > frame_metadata_queue_;
|
||||
std::mutex frame_metadata_queue_mutex_;
|
||||
std::mutex ringbuffer_slots_mutex_;
|
||||
|
||||
char* get_buffer_slot_address(size_t buffer_slot_index);
|
||||
|
||||
@@ -56,6 +56,7 @@ class RingBuffer
|
||||
void commit(std::shared_ptr<FrameMetadata> metadata);
|
||||
std::pair<std::shared_ptr<FrameMetadata>, char*> read();
|
||||
void release(size_t buffer_slot_index);
|
||||
|
||||
bool is_empty();
|
||||
void clear();
|
||||
size_t get_slot_size();
|
||||
|
||||
@@ -1,6 +1,4 @@
|
||||
#include <stdexcept>
|
||||
#include <sstream>
|
||||
#include <cstring>
|
||||
#include <iostream>
|
||||
#include <cstddef>
|
||||
|
||||
@@ -8,7 +6,9 @@
|
||||
|
||||
using namespace std;
|
||||
|
||||
RingBuffer::RingBuffer(size_t n_slots) : n_slots(n_slots), ringbuffer_slots(n_slots, 0)
|
||||
RingBuffer::RingBuffer(size_t n_slots) :
|
||||
n_slots_(n_slots),
|
||||
ringbuffer_slots_(n_slots, 0)
|
||||
{
|
||||
#ifdef DEBUG_OUTPUT
|
||||
using namespace date;
|
||||
@@ -21,24 +21,24 @@ RingBuffer::RingBuffer(size_t n_slots) : n_slots(n_slots), ringbuffer_slots(n_sl
|
||||
|
||||
RingBuffer::~RingBuffer()
|
||||
{
|
||||
// If the frame buffer is allocated, free it.
|
||||
if (frame_data_buffer != NULL) {
|
||||
free(frame_data_buffer);
|
||||
frame_data_buffer = NULL;
|
||||
if (frame_data_buffer_ != NULL) {
|
||||
free(frame_data_buffer_);
|
||||
frame_data_buffer_ = NULL;
|
||||
}
|
||||
}
|
||||
|
||||
void RingBuffer::initialize(size_t slot_size)
|
||||
{
|
||||
if (frame_data_buffer) {
|
||||
stringstream error_message;
|
||||
if (frame_data_buffer_) {
|
||||
stringstream err_msg;
|
||||
|
||||
using namespace date;
|
||||
using namespace chrono;
|
||||
error_message << "[" << system_clock::now() << "]";
|
||||
error_message << "[RingBuffer::initialize] Ring buffer";
|
||||
error_message << " already initialized." << endl;
|
||||
err_msg << "[" << system_clock::now() << "]";
|
||||
err_msg << "[RingBuffer::initialize]";
|
||||
err_msg << " Ring buffer already initialized." << endl;
|
||||
|
||||
throw runtime_error(error_message.str());
|
||||
throw runtime_error(err_msg.str());
|
||||
}
|
||||
|
||||
#ifdef DEBUG_OUTPUT
|
||||
@@ -49,51 +49,51 @@ void RingBuffer::initialize(size_t slot_size)
|
||||
cout << " with slot_size " << slot_size << endl;
|
||||
#endif
|
||||
|
||||
this->write_index = 0;
|
||||
this->slot_size = slot_size;
|
||||
this->buffer_size = slot_size * n_slots;
|
||||
this->frame_data_buffer = new char[buffer_size];
|
||||
this->buffer_used_slots = 0;
|
||||
this->ring_buffer_initialized = true;
|
||||
this->write_index_ = 0;
|
||||
this->slot_size_ = slot_size;
|
||||
this->buffer_size_ = slot_size * n_slots_;
|
||||
this->frame_data_buffer_ = new char[buffer_size_];
|
||||
this->buffer_used_slots_ = 0;
|
||||
this->ring_buffer_initialized_ = true;
|
||||
|
||||
#ifdef DEBUG_OUTPUT
|
||||
using namespace date;
|
||||
using namespace chrono;
|
||||
cout << "[" << system_clock::now() << "]";
|
||||
cout << "[RingBuffer::initialize] Total buffer_size " << buffer_size << endl;
|
||||
cout << "[RingBuffer::initialize] ";
|
||||
cout << " Total buffer_size " << buffer_size_ << endl;
|
||||
#endif
|
||||
}
|
||||
|
||||
char* RingBuffer::reserve(shared_ptr<FrameMetadata> frame_metadata)
|
||||
{
|
||||
if (!ring_buffer_initialized) {
|
||||
if (!ring_buffer_initialized_) {
|
||||
initialize(frame_metadata->frame_bytes_size);
|
||||
}
|
||||
|
||||
// All images must fit in the ring buffer slot.
|
||||
if (frame_metadata->frame_bytes_size > slot_size) {
|
||||
stringstream error_message;
|
||||
if (frame_metadata->frame_bytes_size > slot_size_) {
|
||||
stringstream err_msg;
|
||||
|
||||
using namespace date;
|
||||
using namespace chrono;
|
||||
error_message << "[" << system_clock::now() << "]";
|
||||
error_message << "[RingBuffer::reserve] Received frame index ";
|
||||
error_message << frame_metadata->frame_index;
|
||||
error_message << " that is too large for ring buffer slot. ";
|
||||
error_message << "Slot size " << slot_size << ", but frame bytes size ";
|
||||
error_message << frame_metadata->frame_bytes_size << endl;
|
||||
err_msg << "[" << system_clock::now() << "]";
|
||||
err_msg << "[RingBuffer::reserve] Received frame index ";
|
||||
err_msg << frame_metadata->frame_index;
|
||||
err_msg << " that is too large for ring buffer slot. ";
|
||||
err_msg << "Slot size " << slot_size_ << ", but frame bytes size ";
|
||||
err_msg << frame_metadata->frame_bytes_size << endl;
|
||||
|
||||
throw runtime_error(error_message.str());
|
||||
throw runtime_error(err_msg.str());
|
||||
}
|
||||
|
||||
// Check and reserve slot in the buffer.
|
||||
{
|
||||
lock_guard<mutex> lock(ringbuffer_slots_mutex);
|
||||
lock_guard<mutex> lock(ringbuffer_slots_mutex_);
|
||||
|
||||
if (!ringbuffer_slots[write_index]) {
|
||||
ringbuffer_slots[write_index] = 1;
|
||||
if (!ringbuffer_slots_[write_index_]) {
|
||||
ringbuffer_slots_[write_index_] = 1;
|
||||
|
||||
frame_metadata->buffer_slot_index = write_index;
|
||||
frame_metadata->buffer_slot_index = write_index_;
|
||||
|
||||
#ifdef DEBUG_OUTPUT
|
||||
using namespace date;
|
||||
@@ -105,19 +105,19 @@ char* RingBuffer::reserve(shared_ptr<FrameMetadata> frame_metadata)
|
||||
cout << frame_metadata->frame_index << endl;
|
||||
#endif
|
||||
|
||||
write_index = (write_index + 1) % n_slots;
|
||||
buffer_used_slots++;
|
||||
write_index_ = (write_index_ + 1) % n_slots_;
|
||||
buffer_used_slots_++;
|
||||
|
||||
} else {
|
||||
stringstream error_message;
|
||||
stringstream err_msg;
|
||||
|
||||
using namespace date;
|
||||
using namespace chrono;
|
||||
error_message << "[" << system_clock::now() << "]";
|
||||
error_message << "[RingBuffer::reserve] Ring buffer is full.";
|
||||
error_message << " Collision at write_index = " << write_index << endl;
|
||||
err_msg << "[" << system_clock::now() << "]";
|
||||
err_msg << "[RingBuffer::reserve] Ring buffer is full.";
|
||||
err_msg << " Collision at write_index = " << write_index_ << endl;
|
||||
|
||||
throw runtime_error(error_message.str());
|
||||
throw runtime_error(err_msg.str());
|
||||
}
|
||||
}
|
||||
|
||||
@@ -127,34 +127,36 @@ char* RingBuffer::reserve(shared_ptr<FrameMetadata> frame_metadata)
|
||||
|
||||
void RingBuffer::commit(shared_ptr<FrameMetadata> frame_metadata)
|
||||
{
|
||||
lock_guard<mutex> lock(frame_metadata_queue_mutex);
|
||||
lock_guard<mutex> lock(frame_metadata_queue_mutex_);
|
||||
|
||||
frame_metadata_queue.push_back(frame_metadata);
|
||||
frame_metadata_queue_.push_back(frame_metadata);
|
||||
|
||||
#ifdef DEBUG_OUTPUT
|
||||
using namespace date;
|
||||
cout << "[" << std::chrono::system_clock::now() << "]";
|
||||
cout << "[RingBuffer::write] Metadata for frame_index ";
|
||||
cout << frame_metadata->frame_index << " added to metadata queue." << endl;
|
||||
using namespace chrono;
|
||||
cout << "[" << system_clock::now() << "]";
|
||||
cout << "[RingBuffer::commit] Metadata for frame_index ";
|
||||
cout << frame_metadata->frame_index;
|
||||
cout << " added to metadata queue." << endl;
|
||||
#endif
|
||||
}
|
||||
|
||||
char* RingBuffer::get_buffer_slot_address(size_t buffer_slot_index)
|
||||
{
|
||||
char* slot_memory_address = frame_data_buffer + (buffer_slot_index * slot_size);
|
||||
char* slot_memory_address =
|
||||
frame_data_buffer_ + (buffer_slot_index * slot_size_);
|
||||
|
||||
// Check if the memory address is valid.
|
||||
if (slot_memory_address > frame_data_buffer + buffer_size) {
|
||||
stringstream error_message;
|
||||
if (slot_memory_address > frame_data_buffer_ + buffer_size_) {
|
||||
stringstream err_msg;
|
||||
|
||||
using namespace date;
|
||||
using namespace chrono;
|
||||
error_message << "[" << system_clock::now() << "]";
|
||||
error_message << "[RingBuffer::get_buffer_slot_address] Calculated";
|
||||
error_message << " ring buffer address is out of bound for buffer_slot_index ";
|
||||
error_message << buffer_slot_index << endl;
|
||||
err_msg << "[" << system_clock::now() << "]";
|
||||
err_msg << "[RingBuffer::get_buffer_slot_address]";
|
||||
err_msg << " Ring buffer address out of range." << endl;
|
||||
|
||||
throw runtime_error(error_message.str());
|
||||
throw runtime_error(err_msg.str());
|
||||
}
|
||||
|
||||
return slot_memory_address;
|
||||
@@ -165,15 +167,15 @@ pair<shared_ptr<FrameMetadata>, char*> RingBuffer::read()
|
||||
shared_ptr<FrameMetadata> frame_metadata;
|
||||
|
||||
{
|
||||
lock_guard<mutex> lock(frame_metadata_queue_mutex);
|
||||
lock_guard<mutex> lock(frame_metadata_queue_mutex_);
|
||||
|
||||
// A NULL char* indicates that there are no available data in the ring buffer.
|
||||
if (frame_metadata_queue.empty()) {
|
||||
// A NULL char* means no waiting data in the ring buffer.
|
||||
if (frame_metadata_queue_.empty()) {
|
||||
return {NULL, NULL};
|
||||
}
|
||||
|
||||
frame_metadata = frame_metadata_queue.front();
|
||||
frame_metadata_queue.pop_front();
|
||||
frame_metadata = frame_metadata_queue_.front();
|
||||
frame_metadata_queue_.pop_front();
|
||||
}
|
||||
|
||||
#ifdef DEBUG_OUTPUT
|
||||
@@ -186,85 +188,83 @@ pair<shared_ptr<FrameMetadata>, char*> RingBuffer::read()
|
||||
|
||||
// Check if the references ring buffer slot is valid.
|
||||
{
|
||||
lock_guard<mutex> lock(ringbuffer_slots_mutex);
|
||||
lock_guard<mutex> lock(ringbuffer_slots_mutex_);
|
||||
|
||||
if (!ringbuffer_slots[frame_metadata->buffer_slot_index]) {
|
||||
stringstream error_message;
|
||||
if (!ringbuffer_slots_[frame_metadata->buffer_slot_index]) {
|
||||
stringstream err_msg;
|
||||
|
||||
using namespace date;
|
||||
using namespace chrono;
|
||||
error_message << "[" << system_clock::now() << "]";
|
||||
error_message << "[RingBuffer::read] Ring buffer slot";
|
||||
error_message << " referenced in message header ";
|
||||
error_message << frame_metadata->buffer_slot_index << " is empty." << endl;
|
||||
err_msg << "[" << system_clock::now() << "]";
|
||||
err_msg << "[RingBuffer::read] Ring buffer slot";
|
||||
err_msg << " referenced in message header ";
|
||||
err_msg << frame_metadata->buffer_slot_index << " is empty.";
|
||||
err_msg << endl;
|
||||
|
||||
throw runtime_error(error_message.str());
|
||||
throw runtime_error(err_msg.str());
|
||||
}
|
||||
}
|
||||
|
||||
char* slot_memory_address = get_buffer_slot_address(frame_metadata->buffer_slot_index);
|
||||
|
||||
return {frame_metadata, slot_memory_address};
|
||||
return {frame_metadata,
|
||||
get_buffer_slot_address(frame_metadata->buffer_slot_index)};
|
||||
}
|
||||
|
||||
void RingBuffer::release(size_t buffer_slot_index)
|
||||
{
|
||||
// Cannot release a slot index that is out of range.
|
||||
if (buffer_slot_index >= n_slots) {
|
||||
stringstream error_message;
|
||||
if (buffer_slot_index >= n_slots_) {
|
||||
stringstream err_msg;
|
||||
|
||||
using namespace date;
|
||||
using namespace chrono;
|
||||
error_message << "[" << system_clock::now() << "]";
|
||||
error_message << "[RingBuffer::release] Slot index to release ";
|
||||
error_message << buffer_slot_index << " is out of range.";
|
||||
error_message << " Ring buffer n_slots = " << n_slots << endl;
|
||||
err_msg << "[" << system_clock::now() << "]";
|
||||
err_msg << "[RingBuffer::release] Slot index ";
|
||||
err_msg << buffer_slot_index << " is out of range.";
|
||||
err_msg << " Ring buffer n_slots = " << n_slots_ << endl;
|
||||
|
||||
throw runtime_error(error_message.str());
|
||||
throw runtime_error(err_msg.str());
|
||||
}
|
||||
|
||||
{
|
||||
lock_guard<mutex> lock(ringbuffer_slots_mutex);
|
||||
lock_guard<mutex> lock(ringbuffer_slots_mutex_);
|
||||
|
||||
if (ringbuffer_slots[buffer_slot_index]) {
|
||||
ringbuffer_slots[buffer_slot_index] = 0;
|
||||
if (ringbuffer_slots_[buffer_slot_index]) {
|
||||
ringbuffer_slots_[buffer_slot_index] = 0;
|
||||
|
||||
buffer_used_slots--;
|
||||
buffer_used_slots_--;
|
||||
|
||||
} else {
|
||||
stringstream error_message;
|
||||
stringstream err_msg;
|
||||
|
||||
using namespace date;
|
||||
using namespace chrono;
|
||||
error_message << "[" << system_clock::now() << "]";
|
||||
error_message << "[RingBuffer::release] Cannot release empty";
|
||||
error_message << " ring buffer slot " << buffer_slot_index << endl;
|
||||
err_msg << "[" << system_clock::now() << "]";
|
||||
err_msg << "[RingBuffer::release] Cannot release empty";
|
||||
err_msg << " ring buffer slot " << buffer_slot_index << endl;
|
||||
|
||||
throw runtime_error(error_message.str());
|
||||
throw runtime_error(err_msg.str());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
bool RingBuffer::is_empty()
|
||||
{
|
||||
lock_guard<mutex> lock(ringbuffer_slots_mutex);
|
||||
lock_guard<mutex> lock(ringbuffer_slots_mutex_);
|
||||
|
||||
return buffer_used_slots == 0;
|
||||
return buffer_used_slots_ == 0;
|
||||
}
|
||||
|
||||
void RingBuffer::clear()
|
||||
{
|
||||
lock_guard<mutex> lock_slots(ringbuffer_slots_mutex);
|
||||
lock_guard<mutex> lock_metadata(frame_metadata_queue_mutex);
|
||||
lock_guard<mutex> lock_slots(ringbuffer_slots_mutex_);
|
||||
lock_guard<mutex> lock_metadata(frame_metadata_queue_mutex_);
|
||||
|
||||
write_index = 0;
|
||||
buffer_used_slots = 0;
|
||||
ringbuffer_slots = vector<bool>(n_slots, 0);
|
||||
frame_metadata_queue.clear();
|
||||
write_index_ = 0;
|
||||
buffer_used_slots_ = 0;
|
||||
ringbuffer_slots_ = vector<bool>(n_slots_, 0);
|
||||
frame_metadata_queue_.clear();
|
||||
}
|
||||
|
||||
size_t RingBuffer::get_slot_size()
|
||||
{
|
||||
return slot_size;
|
||||
return slot_size_;
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user