mirror of
https://github.com/paulscherrerinstitute/sf_daq_buffer.git
synced 2026-04-23 11:50:44 +02:00
Add clear method to RingBuffer
Since the writer will be running all the time, if a user calls stop, the state of the RingBuffer needs to be cleared without re-creating the object.
This commit is contained in:
+74
-38
@@ -12,8 +12,10 @@ RingBuffer::RingBuffer(size_t n_slots) : n_slots(n_slots), ringbuffer_slots(n_sl
|
||||
{
|
||||
#ifdef DEBUG_OUTPUT
|
||||
using namespace date;
|
||||
cout << "[" << std::chrono::system_clock::now() << "]";
|
||||
cout << "[RingBuffer::RingBuffer] Creating ring buffer with n_slots " << n_slots << endl;
|
||||
using namespace chrono;
|
||||
error_message << "[" << system_clock::now() << "]";
|
||||
cout << "[RingBuffer::RingBuffer] Creating ring buffer";
|
||||
cout << " with n_slots " << n_slots << endl;
|
||||
#endif
|
||||
}
|
||||
|
||||
@@ -28,20 +30,23 @@ RingBuffer::~RingBuffer()
|
||||
|
||||
void RingBuffer::initialize(size_t slot_size)
|
||||
{
|
||||
// Check if the ring buffer is already initialized.
|
||||
if (frame_data_buffer) {
|
||||
stringstream error_message;
|
||||
using namespace date;
|
||||
error_message << "[" << std::chrono::system_clock::now() << "]";
|
||||
error_message << "[RingBuffer::initialize] Ring buffer already initialized." << endl;
|
||||
using namespace chrono;
|
||||
error_message << "[" << system_clock::now() << "]";
|
||||
error_message << "[RingBuffer::initialize] Ring buffer";
|
||||
error_message << already initialized." << endl;
|
||||
|
||||
throw runtime_error(error_message.str());
|
||||
}
|
||||
|
||||
#ifdef DEBUG_OUTPUT
|
||||
using namespace date;
|
||||
cout << "[" << std::chrono::system_clock::now() << "]";
|
||||
cout << "[RingBuffer::initialize] Initializing ring buffer with slot_size " << slot_size << endl;
|
||||
using namespace chrono;
|
||||
error_message << "[" << system_clock::now() << "]";
|
||||
cout << "[RingBuffer::initialize] Initializing ring buffer";
|
||||
cout << " with slot_size " << slot_size << endl;
|
||||
#endif
|
||||
|
||||
this->write_index = 0;
|
||||
@@ -53,26 +58,30 @@ void RingBuffer::initialize(size_t slot_size)
|
||||
|
||||
#ifdef DEBUG_OUTPUT
|
||||
using namespace date;
|
||||
cout << "[" << std::chrono::system_clock::now() << "]";
|
||||
using namespace chrono;
|
||||
error_message << "[" << system_clock::now() << "]";
|
||||
cout << "[RingBuffer::initialize] Total buffer_size " << buffer_size << endl;
|
||||
#endif
|
||||
}
|
||||
|
||||
void RingBuffer::write(shared_ptr<FrameMetadata> frame_metadata, const char* data)
|
||||
{
|
||||
// Initialize the buffer on the first write.
|
||||
if (!ring_buffer_initialized) {
|
||||
initialize(frame_metadata->frame_bytes_size);
|
||||
}
|
||||
|
||||
// All images must fit in the ring buffer.
|
||||
// All images must fit in the ring buffer slot.
|
||||
if (frame_metadata->frame_bytes_size > slot_size) {
|
||||
stringstream error_message;
|
||||
|
||||
using namespace date;
|
||||
error_message << "[" << std::chrono::system_clock::now() << "]";
|
||||
error_message << "[RingBuffer::write] Received frame index "<< frame_metadata->frame_index;
|
||||
using namespace chrono;
|
||||
error_message << "[" << system_clock::now() << "]";
|
||||
error_message << "[RingBuffer::write] Received frame index ";
|
||||
error_message << frame_metadata->frame_index;
|
||||
error_message << " that is too large for ring buffer slot. ";
|
||||
error_message << "Slot size " << slot_size << ", but frame bytes size " << frame_metadata->frame_bytes_size << endl;
|
||||
error_message << "Slot size " << slot_size << ", but frame bytes size ";
|
||||
error_message << frame_metadata->frame_bytes_size << endl;
|
||||
|
||||
throw runtime_error(error_message.str());
|
||||
}
|
||||
@@ -84,27 +93,29 @@ void RingBuffer::write(shared_ptr<FrameMetadata> frame_metadata, const char* dat
|
||||
if (!ringbuffer_slots[write_index]) {
|
||||
ringbuffer_slots[write_index] = 1;
|
||||
|
||||
// Set the write index in the FrameMetadata object.
|
||||
frame_metadata->buffer_slot_index = write_index;
|
||||
|
||||
#ifdef DEBUG_OUTPUT
|
||||
using namespace date;
|
||||
cout << "[" << std::chrono::system_clock::now() << "]";
|
||||
cout << "[RingBuffer::write] Ring buffer slot " << frame_metadata->buffer_slot_index << " reserved for frame_index ";
|
||||
using namespace chrono;
|
||||
error_message << "[" << system_clock::now() << "]";
|
||||
cout << "[RingBuafer::write] Ring buffer slot ";
|
||||
cout << frame_metadata->buffer_slot_index;
|
||||
cout << " reserved for frame_index ";
|
||||
cout << frame_metadata->frame_index << endl;
|
||||
#endif
|
||||
|
||||
// Increase and wrap the write index around if needed.
|
||||
write_index = (write_index + 1) % n_slots;
|
||||
|
||||
// Keep track of the number of used slots.
|
||||
buffer_used_slots++;
|
||||
|
||||
} else {
|
||||
stringstream error_message;
|
||||
|
||||
using namespace date;
|
||||
error_message << "[" << std::chrono::system_clock::now() << "]";
|
||||
error_message << "[RingBuffer::write] Ring buffer is full. Collision at write_index = " << write_index << endl;
|
||||
using namespace chrono;
|
||||
error_message << "[" << system_clock::now() << "]";
|
||||
error_message << "[RingBuffer::write] Ring buffer is full.";
|
||||
error_message << " Collision at write_index = " << write_index << endl;
|
||||
|
||||
throw runtime_error(error_message.str());
|
||||
}
|
||||
@@ -116,8 +127,10 @@ void RingBuffer::write(shared_ptr<FrameMetadata> frame_metadata, const char* dat
|
||||
|
||||
#ifdef DEBUG_OUTPUT
|
||||
using namespace date;
|
||||
cout << "[" << std::chrono::system_clock::now() << "]";
|
||||
cout << "[RingBuffer::write] Copied " << frame_metadata->frame_bytes_size << " frame bytes to buffer_slot_index ";
|
||||
using namespace chrono;
|
||||
error_message << "[" << system_clock::now() << "]";
|
||||
cout << "[RingBuffer::write] Copied " << frame_metadata->frame_bytes_size;
|
||||
cout << " frame bytes to buffer_slot_index ";
|
||||
cout << frame_metadata->buffer_slot_index << endl;
|
||||
#endif
|
||||
|
||||
@@ -131,7 +144,8 @@ void RingBuffer::write(shared_ptr<FrameMetadata> frame_metadata, const char* dat
|
||||
#ifdef DEBUG_OUTPUT
|
||||
using namespace date;
|
||||
cout << "[" << std::chrono::system_clock::now() << "]";
|
||||
cout << "[RingBuffer::write] Metadata for frame_index " << frame_metadata->frame_index << " added to metadata queue." << endl;
|
||||
cout << "[RingBuffer::write] Metadata for frame_index ";
|
||||
cout << frame_metadata->frame_index << " added to metadata queue." << endl;
|
||||
#endif
|
||||
}
|
||||
|
||||
@@ -142,9 +156,12 @@ char* RingBuffer::get_buffer_slot_address(size_t buffer_slot_index)
|
||||
// Check if the memory address is valid.
|
||||
if (slot_memory_address > frame_data_buffer + buffer_size) {
|
||||
stringstream error_message;
|
||||
|
||||
using namespace date;
|
||||
error_message << "[" << std::chrono::system_clock::now() << "]";
|
||||
error_message << "[RingBuffer::get_buffer_slot_address] Calculated ring buffer address is out of bound for buffer_slot_index ";
|
||||
using namespace chrono;
|
||||
error_message << "[" << system_clock::now() << "]";
|
||||
error_message << "[RingBuffer::get_buffer_slot_address] Calculated";
|
||||
error_message << " ring buffer address is out of bound for buffer_slot_index ";
|
||||
error_message << buffer_slot_index << endl;
|
||||
|
||||
throw runtime_error(error_message.str());
|
||||
@@ -157,7 +174,6 @@ pair<shared_ptr<FrameMetadata>, char*> RingBuffer::read()
|
||||
{
|
||||
shared_ptr<FrameMetadata> frame_metadata;
|
||||
|
||||
// Read data from the metadata queue.
|
||||
{
|
||||
lock_guard<mutex> lock(frame_metadata_queue_mutex);
|
||||
|
||||
@@ -172,8 +188,10 @@ pair<shared_ptr<FrameMetadata>, char*> RingBuffer::read()
|
||||
|
||||
#ifdef DEBUG_OUTPUT
|
||||
using namespace date;
|
||||
cout << "[" << std::chrono::system_clock::now() << "]";
|
||||
cout << "[RingBuffer::read] Received metadata for frame_index " << frame_metadata->frame_index << endl;
|
||||
using namespace chrono;
|
||||
error_message << "[" << system_clock::now() << "]";
|
||||
cout << "[RingBuffer::read] Received metadata for frame_index ";
|
||||
cout << frame_metadata->frame_index << endl;
|
||||
#endif
|
||||
|
||||
// Check if the references ring buffer slot is valid.
|
||||
@@ -182,9 +200,12 @@ pair<shared_ptr<FrameMetadata>, char*> RingBuffer::read()
|
||||
|
||||
if (!ringbuffer_slots[frame_metadata->buffer_slot_index]) {
|
||||
stringstream error_message;
|
||||
|
||||
using namespace date;
|
||||
error_message << "[" << std::chrono::system_clock::now() << "]";
|
||||
error_message << "[RingBuffer::read] Ring buffer slot referenced in message header ";
|
||||
using namespace chrono;
|
||||
error_message << "[" << system_clock::now() << "]";
|
||||
error_message << "[RingBuffer::read] Ring buffer slot";
|
||||
error_message << " referenced in message header ";
|
||||
error_message << frame_metadata->buffer_slot_index << " is empty." << endl;
|
||||
|
||||
throw runtime_error(error_message.str());
|
||||
@@ -201,29 +222,33 @@ void RingBuffer::release(size_t buffer_slot_index)
|
||||
// Cannot release a slot index that is out of range.
|
||||
if (buffer_slot_index >= n_slots) {
|
||||
stringstream error_message;
|
||||
|
||||
using namespace date;
|
||||
error_message << "[" << std::chrono::system_clock::now() << "]";
|
||||
error_message << "[RingBuffer::release] Slot index to release " << buffer_slot_index;
|
||||
error_message << " is out of range. Ring buffer n_slots = " << n_slots << endl;
|
||||
using namespace chrono;
|
||||
error_message << "[" << system_clock::now() << "]";
|
||||
error_message << "[RingBuffer::release] Slot index to release ";
|
||||
error_message << buffer_slot_index << " is out of range.";
|
||||
error_message << " Ring buffer n_slots = " << n_slots << endl;
|
||||
|
||||
throw runtime_error(error_message.str());
|
||||
}
|
||||
|
||||
// Release the buffer slot.
|
||||
{
|
||||
lock_guard<mutex> lock(ringbuffer_slots_mutex);
|
||||
|
||||
if (ringbuffer_slots[buffer_slot_index]) {
|
||||
ringbuffer_slots[buffer_slot_index] = 0;
|
||||
|
||||
// Keep track of the number of used slots.
|
||||
buffer_used_slots--;
|
||||
|
||||
} else {
|
||||
stringstream error_message;
|
||||
|
||||
using namespace date;
|
||||
error_message << "[" << std::chrono::system_clock::now() << "]";
|
||||
error_message << "[RingBuffer::release] Cannot release empty ring buffer slot " << buffer_slot_index << endl;
|
||||
using namespace chrono;
|
||||
error_message << "[" << system_clock::now() << "]";
|
||||
error_message << "[RingBuffer::release] Cannot release empty";
|
||||
error_message << " ring buffer slot " << buffer_slot_index << endl;
|
||||
|
||||
throw runtime_error(error_message.str());
|
||||
}
|
||||
@@ -236,3 +261,14 @@ bool RingBuffer::is_empty()
|
||||
|
||||
return buffer_used_slots == 0;
|
||||
}
|
||||
|
||||
bool RingBuffer::clear()
|
||||
{
|
||||
lock_guard<mutex> lock(ringbuffer_slots_mutex);
|
||||
lock_guard<mutex> lock(frame_metadata_queue_mutex);
|
||||
|
||||
write_index = 0;
|
||||
buffer_used_slots == 0;
|
||||
ringbuffer_slots = vector<bool>(n_slots, 0);
|
||||
frame_metadata_queue.clear();
|
||||
}
|
||||
|
||||
@@ -56,6 +56,7 @@ class RingBuffer
|
||||
std::pair<std::shared_ptr<FrameMetadata>, char*> read();
|
||||
void release(size_t buffer_slot_index);
|
||||
bool is_empty();
|
||||
void clear();
|
||||
};
|
||||
|
||||
#endif
|
||||
#endif
|
||||
|
||||
Reference in New Issue
Block a user