mirror of
https://github.com/paulscherrerinstitute/sf_daq_buffer.git
synced 2026-04-29 00:52:21 +02:00
Move FastQueue to sf_stream
This commit is contained in:
@@ -1,40 +0,0 @@
|
||||
#ifndef FASTQUEUE_HPP
|
||||
#define FASTQUEUE_HPP
|
||||
|
||||
#include <cstddef>
|
||||
#include <cstdint>
|
||||
#include <atomic>
|
||||
|
||||
template <class T>
|
||||
class FastQueue {
|
||||
const size_t slot_n_bytes_;
|
||||
const size_t n_slots_;
|
||||
char* buffer_;
|
||||
std::atomic_int* buffer_status_;
|
||||
|
||||
std::atomic_uint16_t write_slot_id_;
|
||||
std::atomic_uint16_t read_slot_id_;
|
||||
|
||||
public:
|
||||
|
||||
enum SLOT_STATUS {
|
||||
EMPTY=0,
|
||||
RESERVED=1,
|
||||
READY=2
|
||||
};
|
||||
|
||||
FastQueue(const size_t slot_data_n_bytes, const uint16_t n_slots);
|
||||
virtual ~FastQueue();
|
||||
|
||||
T* get_metadata_buffer(const int slot_id);
|
||||
char* get_data_buffer(const int slot_id);
|
||||
|
||||
int reserve();
|
||||
void commit();
|
||||
|
||||
int read();
|
||||
void release();
|
||||
};
|
||||
|
||||
|
||||
#endif //FASTQUEUE_HPP
|
||||
@@ -1,107 +0,0 @@
|
||||
#include <stdexcept>
|
||||
#include <formats.hpp>
|
||||
#include <jungfrau.hpp>
|
||||
#include "FastQueue.hpp"
|
||||
|
||||
using namespace std;
|
||||
|
||||
template <class T>
|
||||
FastQueue<T>::FastQueue(
|
||||
const size_t slot_data_n_bytes,
|
||||
const uint16_t n_slots) :
|
||||
slot_n_bytes_(slot_data_n_bytes + sizeof(T)),
|
||||
n_slots_(n_slots)
|
||||
{
|
||||
buffer_ = new char[slot_n_bytes_ * n_slots_];
|
||||
buffer_status_ = new atomic_int[n_slots];
|
||||
|
||||
// TODO: Are atomic variables initialized?
|
||||
for (size_t i=0; i < n_slots_; i++) {
|
||||
buffer_status_[i] = 0;
|
||||
}
|
||||
|
||||
write_slot_id_ = 0;
|
||||
read_slot_id_ = 0;
|
||||
}
|
||||
|
||||
template <class T>
|
||||
FastQueue<T>::~FastQueue()
|
||||
{
|
||||
delete[] buffer_;
|
||||
delete[] buffer_status_;
|
||||
}
|
||||
|
||||
template<class T>
|
||||
T* FastQueue<T>::get_metadata_buffer(const int slot_id)
|
||||
{
|
||||
return (T*)(buffer_ + (slot_id * slot_n_bytes_));
|
||||
}
|
||||
|
||||
template<class T>
|
||||
char* FastQueue<T>::get_data_buffer(const int slot_id)
|
||||
{
|
||||
return (char*)(buffer_ + (slot_id * slot_n_bytes_) + sizeof(T));
|
||||
}
|
||||
|
||||
template<class T>
|
||||
int FastQueue<T>::reserve()
|
||||
{
|
||||
int expected = SLOT_STATUS::EMPTY;
|
||||
// If (buffer_status==SLOT_EMPTY) buffer_status=SLOT_RESERVED.
|
||||
bool slot_reserved =
|
||||
buffer_status_[write_slot_id_].compare_exchange_strong(
|
||||
expected, SLOT_STATUS::RESERVED);
|
||||
|
||||
if (!slot_reserved) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
return write_slot_id_;
|
||||
}
|
||||
|
||||
template<class T>
|
||||
void FastQueue<T>::commit()
|
||||
{
|
||||
int expected = SLOT_STATUS::RESERVED;
|
||||
// If (buffer_status==SLOT_RESERVED) buffer_status=SLOT_READY.
|
||||
bool slot_ready =
|
||||
buffer_status_[write_slot_id_].compare_exchange_strong(
|
||||
expected, SLOT_STATUS::READY);
|
||||
|
||||
if (!slot_ready) {
|
||||
throw runtime_error("Slot should be reserved first.");
|
||||
}
|
||||
|
||||
write_slot_id_++;
|
||||
write_slot_id_ = write_slot_id_ % n_slots_;
|
||||
}
|
||||
|
||||
template<class T>
|
||||
int FastQueue<T>::read()
|
||||
{
|
||||
if (buffer_status_[read_slot_id_] != SLOT_STATUS::READY) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
return read_slot_id_;
|
||||
}
|
||||
|
||||
template<class T>
|
||||
void FastQueue<T>::release()
|
||||
{
|
||||
int expected = SLOT_STATUS::READY;
|
||||
// If (buffer_status==SLOT_RESERVED) buffer_status=SLOT_READY.
|
||||
bool slot_empty =
|
||||
buffer_status_[read_slot_id_].compare_exchange_strong(
|
||||
expected, SLOT_STATUS::EMPTY);
|
||||
|
||||
if (!slot_empty) {
|
||||
throw runtime_error("Slot should be ready first.");
|
||||
}
|
||||
|
||||
read_slot_id_++;
|
||||
read_slot_id_ = read_slot_id_ % n_slots_;
|
||||
}
|
||||
|
||||
template class FastQueue<ModuleFrame>;
|
||||
template class FastQueue<ModuleFrameBuffer>;
|
||||
@@ -1,7 +1,6 @@
|
||||
#include "gtest/gtest.h"
|
||||
#include "test_buffer_utils.cpp"
|
||||
#include "test_bitshuffle.cpp"
|
||||
#include "test_FastQueue.cpp"
|
||||
|
||||
using namespace std;
|
||||
|
||||
|
||||
@@ -1,150 +0,0 @@
|
||||
#include "FastQueue.hpp"
|
||||
#include "formats.hpp"
|
||||
#include "gtest/gtest.h"
|
||||
|
||||
using namespace buffer_config;
|
||||
|
||||
TEST(FastQueue, basic_interaction)
|
||||
{
|
||||
size_t n_slots = 5;
|
||||
size_t slot_data_n_bytes = MODULE_N_BYTES * 2;
|
||||
FastQueue<ImageMetadata> queue(slot_data_n_bytes, n_slots);
|
||||
int slot_id;
|
||||
|
||||
// The queue at the beginning should be empty.
|
||||
ASSERT_EQ(queue.read(), -1);
|
||||
// Cannot commit a slot until you reserve it.
|
||||
ASSERT_THROW(queue.commit(), runtime_error);
|
||||
// Cannot release a slot until its ready.
|
||||
ASSERT_THROW(queue.release(), runtime_error);
|
||||
|
||||
// Reserve a slot.
|
||||
slot_id = queue.reserve();
|
||||
ASSERT_NE(slot_id, -1);
|
||||
// But you cannot reserve 2 slots at once.
|
||||
ASSERT_EQ(queue.reserve(), -1);
|
||||
// And cannot read this slot until its committed.
|
||||
ASSERT_EQ(queue.read(), -1);
|
||||
|
||||
auto detector_frame = queue.get_metadata_buffer(slot_id);
|
||||
char* meta_ptr = (char*) detector_frame;
|
||||
char* data_ptr = (char*) queue.get_data_buffer(slot_id);
|
||||
|
||||
queue.commit();
|
||||
|
||||
slot_id = queue.read();
|
||||
// Once the slot is committed we should be able to read it.
|
||||
ASSERT_NE(slot_id, -1);
|
||||
// You can read the same slot multiple times.
|
||||
ASSERT_NE(queue.read(), -1);
|
||||
// The 2 buffers should match the committed slot.
|
||||
ASSERT_EQ(meta_ptr, (char*)(queue.get_metadata_buffer(slot_id)));
|
||||
ASSERT_EQ(data_ptr, (char*)(queue.get_data_buffer(slot_id)));
|
||||
|
||||
queue.release();
|
||||
}
|
||||
|
||||
TEST(FastQueue, queue_full)
|
||||
{
|
||||
size_t n_slots = 5;
|
||||
size_t slot_data_n_bytes = MODULE_N_BYTES * 2;
|
||||
FastQueue<ImageMetadata> queue(slot_data_n_bytes, n_slots);
|
||||
|
||||
// There is nothing to be read in the queue.
|
||||
ASSERT_EQ(queue.read(), -1);
|
||||
|
||||
for (size_t i=0; i<n_slots; i++) {
|
||||
// Business as usual here, we still have slots left.
|
||||
ASSERT_NE(queue.reserve(), -1);
|
||||
queue.commit();
|
||||
}
|
||||
|
||||
// There are no more slots available.
|
||||
ASSERT_EQ(queue.reserve(), -1);
|
||||
// We now read the first slot.
|
||||
ASSERT_EQ(queue.read(), 0);
|
||||
// But until we release it we cannot re-use it.
|
||||
ASSERT_EQ(queue.reserve(), -1);
|
||||
|
||||
queue.release();
|
||||
// After the release, the first slot is again ready for writing.
|
||||
ASSERT_EQ(queue.reserve(), 0);
|
||||
}
|
||||
|
||||
TEST(FastQueue, data_transfer)
|
||||
{
|
||||
size_t n_slots = 5;
|
||||
size_t slot_data_n_bytes = MODULE_N_BYTES * 2;
|
||||
FastQueue<ImageMetadata> queue(slot_data_n_bytes, n_slots);
|
||||
|
||||
int write_slot_id = queue.reserve();
|
||||
|
||||
auto w_metadata = queue.get_metadata_buffer(write_slot_id);
|
||||
w_metadata->pulse_id = 1;
|
||||
w_metadata->frame_index = 2;
|
||||
w_metadata->daq_rec = 3;
|
||||
w_metadata->is_good_frame = 4;
|
||||
|
||||
auto w_data = (uint16_t*)(queue.get_data_buffer(write_slot_id));
|
||||
for (size_t i=0; i<MODULE_N_PIXELS; i++) {
|
||||
w_data[i] = (uint16_t) i;
|
||||
}
|
||||
|
||||
queue.commit();
|
||||
|
||||
auto read_slot_id = queue.read();
|
||||
|
||||
auto r_metadata = queue.get_metadata_buffer(read_slot_id);
|
||||
EXPECT_EQ(w_metadata->pulse_id,
|
||||
r_metadata->pulse_id);
|
||||
EXPECT_EQ(w_metadata->frame_index,
|
||||
r_metadata->frame_index);
|
||||
EXPECT_EQ(w_metadata->daq_rec,
|
||||
r_metadata->daq_rec);
|
||||
EXPECT_EQ(w_metadata->is_good_frame,
|
||||
r_metadata->is_good_frame);
|
||||
|
||||
auto r_data = (uint16_t*)(queue.get_data_buffer(read_slot_id));
|
||||
for (size_t i=0; i<MODULE_N_PIXELS; i++) {
|
||||
ASSERT_EQ(r_data[i], (uint16_t) i);
|
||||
}
|
||||
}
|
||||
|
||||
TEST(FaseQueue, array_parameter)
|
||||
{
|
||||
size_t n_modules = 32;
|
||||
FastQueue<ModuleFrameBuffer> queue(
|
||||
n_modules * MODULE_N_BYTES,
|
||||
WRITER_FASTQUEUE_N_SLOTS);
|
||||
|
||||
ModuleFrame frame;
|
||||
|
||||
auto slot_id = queue.reserve();
|
||||
auto metadata = queue.get_metadata_buffer(slot_id);
|
||||
|
||||
for (int i_module=0; i_module<n_modules; i_module++) {
|
||||
auto& module_metadata = metadata->module[i_module];
|
||||
|
||||
frame.pulse_id = i_module;
|
||||
frame.frame_index = i_module;
|
||||
frame.daq_rec = i_module;
|
||||
frame.n_recv_packets = i_module;
|
||||
frame.module_id = i_module;
|
||||
|
||||
ModuleFrame* p_metadata = &module_metadata;
|
||||
|
||||
memcpy(p_metadata, &frame, sizeof(ModuleFrame));
|
||||
}
|
||||
|
||||
for (int i_module=0; i_module<n_modules; i_module++) {
|
||||
auto& module_metadata = metadata->module[i_module];
|
||||
|
||||
ASSERT_EQ(module_metadata.pulse_id, i_module);
|
||||
ASSERT_EQ(module_metadata.frame_index, i_module);
|
||||
ASSERT_EQ(module_metadata.daq_rec, i_module);
|
||||
ASSERT_EQ(module_metadata.n_recv_packets, i_module);
|
||||
ASSERT_EQ(module_metadata.module_id, i_module);
|
||||
}
|
||||
}
|
||||
|
||||
// TODO: Test with payload of zero (metadata only).
|
||||
Reference in New Issue
Block a user