mirror of
https://github.com/paulscherrerinstitute/sf_daq_buffer.git
synced 2026-05-06 06:24:13 +02:00
Split ringbuffer write to reserve and commit
Since the introduction of compression, it is no longer practival to do a manual memcopy int he ringbuffer write method. In order to save a memory copy, the ringbuffer does not copy the buffer itself, but the compression is done directly to the ringbuffer slot.
This commit is contained in:
+10
-20
@@ -64,7 +64,7 @@ void RingBuffer::initialize(size_t slot_size)
|
||||
#endif
|
||||
}
|
||||
|
||||
void RingBuffer::write(shared_ptr<FrameMetadata> frame_metadata, const char* data)
|
||||
char* RingBuffer::reserve(shared_ptr<FrameMetadata> frame_metadata)
|
||||
{
|
||||
if (!ring_buffer_initialized) {
|
||||
initialize(frame_metadata->frame_bytes_size);
|
||||
@@ -77,7 +77,7 @@ void RingBuffer::write(shared_ptr<FrameMetadata> frame_metadata, const char* dat
|
||||
using namespace date;
|
||||
using namespace chrono;
|
||||
error_message << "[" << system_clock::now() << "]";
|
||||
error_message << "[RingBuffer::write] Received frame index ";
|
||||
error_message << "[RingBuffer::reserve] Received frame index ";
|
||||
error_message << frame_metadata->frame_index;
|
||||
error_message << " that is too large for ring buffer slot. ";
|
||||
error_message << "Slot size " << slot_size << ", but frame bytes size ";
|
||||
@@ -99,7 +99,7 @@ void RingBuffer::write(shared_ptr<FrameMetadata> frame_metadata, const char* dat
|
||||
using namespace date;
|
||||
using namespace chrono;
|
||||
cout << "[" << system_clock::now() << "]";
|
||||
cout << "[RingBuafer::write] Ring buffer slot ";
|
||||
cout << "[RingBuafer::reserve] Ring buffer slot ";
|
||||
cout << frame_metadata->buffer_slot_index;
|
||||
cout << " reserved for frame_index ";
|
||||
cout << frame_metadata->frame_index << endl;
|
||||
@@ -114,7 +114,7 @@ void RingBuffer::write(shared_ptr<FrameMetadata> frame_metadata, const char* dat
|
||||
using namespace date;
|
||||
using namespace chrono;
|
||||
error_message << "[" << system_clock::now() << "]";
|
||||
error_message << "[RingBuffer::write] Ring buffer is full.";
|
||||
error_message << "[RingBuffer::reserve] Ring buffer is full.";
|
||||
error_message << " Collision at write_index = " << write_index << endl;
|
||||
|
||||
throw runtime_error(error_message.str());
|
||||
@@ -122,24 +122,14 @@ void RingBuffer::write(shared_ptr<FrameMetadata> frame_metadata, const char* dat
|
||||
}
|
||||
|
||||
// The slot is already reserved, no need for synchronization.
|
||||
char* slot_memory_address = get_buffer_slot_address(frame_metadata->buffer_slot_index);
|
||||
memcpy(slot_memory_address, data, frame_metadata->frame_bytes_size);
|
||||
return get_buffer_slot_address(frame_metadata->buffer_slot_index);
|
||||
}
|
||||
|
||||
#ifdef DEBUG_OUTPUT
|
||||
using namespace date;
|
||||
using namespace chrono;
|
||||
cout << "[" << system_clock::now() << "]";
|
||||
cout << "[RingBuffer::write] Copied " << frame_metadata->frame_bytes_size;
|
||||
cout << " frame bytes to buffer_slot_index ";
|
||||
cout << frame_metadata->buffer_slot_index << endl;
|
||||
#endif
|
||||
void RingBuffer::commit(shared_ptr<FrameMetadata> frame_metadata)
|
||||
{
|
||||
lock_guard<mutex> lock(frame_metadata_queue_mutex);
|
||||
|
||||
// Add metadata header to the inter-thread communication queue.
|
||||
{
|
||||
lock_guard<mutex> lock(frame_metadata_queue_mutex);
|
||||
|
||||
frame_metadata_queue.push_back(frame_metadata);
|
||||
}
|
||||
frame_metadata_queue.push_back(frame_metadata);
|
||||
|
||||
#ifdef DEBUG_OUTPUT
|
||||
using namespace date;
|
||||
|
||||
@@ -52,7 +52,8 @@ class RingBuffer
|
||||
virtual ~RingBuffer();
|
||||
void initialize(size_t slot_size);
|
||||
|
||||
void write(const std::shared_ptr<FrameMetadata> metadata, const char* data);
|
||||
char* reserve(std::shared_ptr<FrameMetadata> metadata);
|
||||
void commit(std::shared_ptr<FrameMetadata> metadata);
|
||||
std::pair<std::shared_ptr<FrameMetadata>, char*> read();
|
||||
void release(size_t buffer_slot_index);
|
||||
bool is_empty();
|
||||
|
||||
Reference in New Issue
Block a user