mirror of
https://github.com/paulscherrerinstitute/sf_daq_buffer.git
synced 2026-05-07 16:04:12 +02:00
Lockguards in RingBuffer
This commit is contained in:
+47
-50
@@ -68,35 +68,35 @@ void RingBuffer::write(FrameMetadata &frame_metadata, const char* data)
|
||||
}
|
||||
|
||||
// Check and reserve slot in the buffer.
|
||||
ringbuffer_slots_mutex.lock();
|
||||
{
|
||||
lock_guard<mutex> lock(ringbuffer_slots_mutex);
|
||||
|
||||
if (!ringbuffer_slots[write_index]) {
|
||||
ringbuffer_slots[write_index] = 1;
|
||||
|
||||
// Set the write index in the FrameMetadata object.
|
||||
frame_metadata.buffer_slot_index = write_index;
|
||||
if (!ringbuffer_slots[write_index]) {
|
||||
ringbuffer_slots[write_index] = 1;
|
||||
|
||||
// Set the write index in the FrameMetadata object.
|
||||
frame_metadata.buffer_slot_index = write_index;
|
||||
|
||||
#ifdef DEBUG_OUTPUT
|
||||
cout << "[RingBuffer::write] Ring buffer slot " << frame_metadata.buffer_slot_index << " reserved for frame_index ";
|
||||
cout << frame_metadata.frame_index << endl;
|
||||
#endif
|
||||
#ifdef DEBUG_OUTPUT
|
||||
cout << "[RingBuffer::write] Ring buffer slot " << frame_metadata.buffer_slot_index << " reserved for frame_index ";
|
||||
cout << frame_metadata.frame_index << endl;
|
||||
#endif
|
||||
|
||||
// Increase and wrap the write index around if needed.
|
||||
write_index = (write_index + 1) % n_slots;
|
||||
// Increase and wrap the write index around if needed.
|
||||
write_index = (write_index + 1) % n_slots;
|
||||
|
||||
// Keep track of the number of used slots.
|
||||
buffer_used_slots++;
|
||||
// Keep track of the number of used slots.
|
||||
buffer_used_slots++;
|
||||
|
||||
} else {
|
||||
stringstream error_message;
|
||||
error_message << "[RingBuffer::write] Ring buffer is full. Collision at write_index = " << write_index << endl;
|
||||
} else {
|
||||
stringstream error_message;
|
||||
error_message << "[RingBuffer::write] Ring buffer is full. Collision at write_index = " << write_index << endl;
|
||||
|
||||
throw runtime_error(error_message.str());
|
||||
throw runtime_error(error_message.str());
|
||||
}
|
||||
}
|
||||
|
||||
ringbuffer_slots_mutex.unlock();
|
||||
|
||||
// Write to the buffer. The slot is already reserved.
|
||||
// The slot is already reserved, no need for synchronization.
|
||||
char* slot_memory_address = get_buffer_slot_address(frame_metadata.buffer_slot_index);
|
||||
memcpy(slot_memory_address, data, frame_metadata.frame_bytes_size);
|
||||
|
||||
@@ -104,13 +104,13 @@ void RingBuffer::write(FrameMetadata &frame_metadata, const char* data)
|
||||
cout << "[RingBuffer::write] Copied " << frame_metadata.frame_bytes_size << " frame bytes to buffer_slot_index ";
|
||||
cout << frame_metadata.buffer_slot_index << endl;
|
||||
#endif
|
||||
|
||||
frame_metadata_queue_mutex.lock();
|
||||
|
||||
// Send the metadata header to writing process.
|
||||
frame_metadata_queue.push_back(frame_metadata);
|
||||
// Add metadata header to the inter-thread communication queue.
|
||||
{
|
||||
lock_guard<mutex> lock(frame_metadata_queue_mutex);
|
||||
|
||||
frame_metadata_queue_mutex.unlock();
|
||||
frame_metadata_queue.push_back(frame_metadata);
|
||||
}
|
||||
|
||||
#ifdef DEBUG_OUTPUT
|
||||
cout << "[RingBuffer::write] Metadata for frame_index " << frame_metadata.frame_index << " added to metadata queue." << endl;
|
||||
@@ -142,37 +142,37 @@ pair<FrameMetadata, char*> RingBuffer::read()
|
||||
{
|
||||
FrameMetadata frame_metadata;
|
||||
|
||||
frame_metadata_queue_mutex.lock();
|
||||
// Read data from the metadata queue.
|
||||
{
|
||||
lock_guard<mutex> lock(frame_metadata_queue_mutex);
|
||||
|
||||
if (frame_metadata_queue.empty()) {
|
||||
frame_metadata_queue_mutex.unlock();
|
||||
// A NULL char* indicates that there are no available data in the ring buffer.
|
||||
if (frame_metadata_queue.empty()) {
|
||||
return {frame_metadata, NULL};
|
||||
}
|
||||
|
||||
return {frame_metadata, NULL};
|
||||
frame_metadata = frame_metadata_queue.front();
|
||||
frame_metadata_queue.pop_front();
|
||||
}
|
||||
|
||||
frame_metadata = frame_metadata_queue.front();
|
||||
frame_metadata_queue.pop_front();
|
||||
|
||||
frame_metadata_queue_mutex.unlock();
|
||||
|
||||
#ifdef DEBUG_OUTPUT
|
||||
cout << "[RingBuffer::read] Received metadata for frame_index " << frame_metadata.frame_index << endl;
|
||||
#endif
|
||||
|
||||
// Check if the references ring buffer slot is valid.
|
||||
ringbuffer_slots_mutex.lock();
|
||||
{
|
||||
lock_guard<mutex> lock(ringbuffer_slots_mutex);
|
||||
|
||||
if (!ringbuffer_slots[frame_metadata.buffer_slot_index]) {
|
||||
stringstream error_message;
|
||||
error_message << "[RingBuffer::read] Ring buffer slot referenced in message header ";
|
||||
error_message << frame_metadata.buffer_slot_index << " is empty." << endl;
|
||||
if (!ringbuffer_slots[frame_metadata.buffer_slot_index]) {
|
||||
stringstream error_message;
|
||||
error_message << "[RingBuffer::read] Ring buffer slot referenced in message header ";
|
||||
error_message << frame_metadata.buffer_slot_index << " is empty." << endl;
|
||||
|
||||
throw runtime_error(error_message.str());
|
||||
throw runtime_error(error_message.str());
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
ringbuffer_slots_mutex.unlock();
|
||||
|
||||
// Memory address of frame in buffer.
|
||||
char* slot_memory_address = get_buffer_slot_address(frame_metadata.buffer_slot_index);
|
||||
|
||||
return {frame_metadata, slot_memory_address};
|
||||
@@ -210,10 +210,7 @@ void RingBuffer::release(size_t buffer_slot_index)
|
||||
|
||||
bool RingBuffer::is_empty()
|
||||
{
|
||||
ringbuffer_slots_mutex.lock();
|
||||
|
||||
bool is_empty = buffer_used_slots == 0;
|
||||
|
||||
ringbuffer_slots_mutex.unlock();
|
||||
return is_empty;
|
||||
lock_guard<mutex> lock(ringbuffer_slots_mutex);
|
||||
|
||||
return buffer_used_slots == 0;
|
||||
}
|
||||
Reference in New Issue
Block a user