diff --git a/core-buffer/CMakeLists.txt b/core-buffer/CMakeLists.txt index 5db21bf..f8e3b5c 100644 --- a/core-buffer/CMakeLists.txt +++ b/core-buffer/CMakeLists.txt @@ -5,11 +5,10 @@ file(GLOB SOURCES add_library(core-buffer-lib STATIC ${SOURCES}) target_include_directories(core-buffer-lib PUBLIC include/) -target_include_directories(core-buffer-lib PUBLIC external/) -if(CMAKE_BUILD_TYPE STREQUAL "Debug") - target_compile_definitions(core-buffer-lib PRIVATE DEBUG_OUTPUT) -endif() +#if(CMAKE_BUILD_TYPE STREQUAL "Debug") +# target_compile_definitions(core-buffer-lib PRIVATE DEBUG_OUTPUT) +#endif() #add_executable(sf-replay src/replay/sf_replay.cpp) #set_target_properties(sf-replay PROPERTIES OUTPUT_NAME sf_replay) diff --git a/core-buffer/include/FastQueue.hpp b/core-buffer/include/FastQueue.hpp new file mode 100644 index 0000000..4103c55 --- /dev/null +++ b/core-buffer/include/FastQueue.hpp @@ -0,0 +1,40 @@ +#ifndef FASTQUEUE_HPP +#define FASTQUEUE_HPP + +#include +#include +#include + +template +class FastQueue { + const size_t slot_n_bytes_; + const size_t n_slots_; + char* buffer_; + std::atomic_int* buffer_status_; + + uint16_t write_slot_id_; + uint16_t read_slot_id_; + +public: + + enum SLOT_STATUS { + EMPTY=0, + RESERVED=1, + READY=2 + }; + + FastQueue(const size_t slot_data_n_bytes, const uint16_t n_slots); + virtual ~FastQueue(); + + T* get_metadata_buffer(const int slot_id); + char* get_data_buffer(const int slot_id); + + int reserve(); + void commit(); + + int read(); + void release(); +}; + + +#endif //FASTQUEUE_HPP diff --git a/core-buffer/include/formats.hpp b/core-buffer/include/formats.hpp new file mode 100644 index 0000000..9c83b88 --- /dev/null +++ b/core-buffer/include/formats.hpp @@ -0,0 +1,25 @@ +#ifndef SF_DAQ_BUFFER_FORMATS_HPP +#define SF_DAQ_BUFFER_FORMATS_HPP + +#include "buffer_config.hpp" + +struct ImageMetadataBuffer +{ + uint64_t pulse_id[core_buffer::WRITER_DATA_CACHE_N_IMAGES]; + uint64_t frame_index[core_buffer::WRITER_DATA_CACHE_N_IMAGES]; + uint32_t daq_rec[core_buffer::WRITER_DATA_CACHE_N_IMAGES]; + uint8_t is_good_frame[core_buffer::WRITER_DATA_CACHE_N_IMAGES]; + uint64_t data_n_bytes[core_buffer::WRITER_DATA_CACHE_N_IMAGES]; + uint16_t n_pulses_in_buffer; +}; + +struct ImageMetadata +{ + uint64_t pulse_id; + uint64_t frame_index; + uint32_t daq_rec; + uint8_t is_good_frame; + uint64_t data_n_bytes; +}; + +#endif //SF_DAQ_BUFFER_FORMATS_HPP diff --git a/core-buffer/src/FastQueue.cpp b/core-buffer/src/FastQueue.cpp new file mode 100644 index 0000000..73ea5c5 --- /dev/null +++ b/core-buffer/src/FastQueue.cpp @@ -0,0 +1,109 @@ +#include +#include +#include +#include "FastQueue.hpp" + +using namespace std; + +template +FastQueue::FastQueue( + const size_t slot_data_n_bytes, + const uint16_t n_slots) : + slot_n_bytes_(slot_data_n_bytes + sizeof(T)), + n_slots_(n_slots) +{ + buffer_ = new char[slot_n_bytes_ * n_slots_]; + buffer_status_ = new atomic_int[n_slots]; + + // TODO: Are atomic variables initialized? + for (size_t i=0; i < n_slots_; i++) { + buffer_status_[i] = 0; + } + + write_slot_id_ = 0; + read_slot_id_ = 0; +} + +template +FastQueue::~FastQueue() +{ + delete[] buffer_; + delete[] buffer_status_; +} + +template +T* FastQueue::get_metadata_buffer(const int slot_id) +{ + return (T*)(buffer_ + (slot_id * slot_n_bytes_)); +} + +template +char* FastQueue::get_data_buffer(const int slot_id) +{ + return (char*)(buffer_ + (slot_id * slot_n_bytes_) + sizeof(T)); +} + +template +int FastQueue::reserve() +{ + int expected = SLOT_STATUS::EMPTY; + // If (buffer_status==SLOT_EMPTY) buffer_status=SLOT_RESERVED. + bool slot_reserved = + buffer_status_[write_slot_id_].compare_exchange_strong( + expected, SLOT_STATUS::RESERVED); + + if (!slot_reserved) { + return -1; + } + + return write_slot_id_; +} + +template +void FastQueue::commit() +{ + int expected = SLOT_STATUS::RESERVED; + // If (buffer_status==SLOT_RESERVED) buffer_status=SLOT_READY. + bool slot_ready = + buffer_status_[write_slot_id_].compare_exchange_strong( + expected, SLOT_STATUS::READY); + + if (!slot_ready) { + throw runtime_error("Slot should be reserved first."); + } + + write_slot_id_++; + write_slot_id_ %= n_slots_; +} + +template +int FastQueue::read() +{ + if (buffer_status_[read_slot_id_] != SLOT_STATUS::READY) { + return -1; + } + + return read_slot_id_; +} + +template +void FastQueue::release() +{ + int expected = SLOT_STATUS::READY; + // If (buffer_status==SLOT_RESERVED) buffer_status=SLOT_READY. + bool slot_empty = + buffer_status_[read_slot_id_].compare_exchange_strong( + expected, SLOT_STATUS::EMPTY); + + if (!slot_empty) { + throw runtime_error("Slot should be ready first."); + } + + read_slot_id_++; + read_slot_id_ %= n_slots_; +} + +template class FastQueue; +template class FastQueue; +template class FastQueue; +template class FastQueue; diff --git a/core-buffer/test/main.cpp b/core-buffer/test/main.cpp index e379d99..9dd8358 100644 --- a/core-buffer/test/main.cpp +++ b/core-buffer/test/main.cpp @@ -1,6 +1,7 @@ #include "gtest/gtest.h" #include "test_buffer_utils.cpp" #include "test_bitshuffle.cpp" +#include "test_FastQueue.cpp" using namespace std; diff --git a/core-buffer/test/test_FastQueue.cpp b/core-buffer/test/test_FastQueue.cpp new file mode 100644 index 0000000..0d31142 --- /dev/null +++ b/core-buffer/test/test_FastQueue.cpp @@ -0,0 +1,148 @@ +#include "FastQueue.hpp" +#include "formats.hpp" +#include "gtest/gtest.h" + +using namespace core_buffer; + +TEST(FastQueue, basic_interaction) +{ + size_t n_slots = 5; + size_t slot_data_n_bytes = MODULE_N_BYTES * 2; + FastQueue queue(slot_data_n_bytes, n_slots); + int slot_id; + + // The queue at the beginning should be empty. + ASSERT_EQ(queue.read(), -1); + // Cannot commit a slot until you reserve it. + ASSERT_THROW(queue.commit(), runtime_error); + // Cannot release a slot until its ready. + ASSERT_THROW(queue.release(), runtime_error); + + // Reserve a slot. + slot_id = queue.reserve(); + ASSERT_NE(slot_id, -1); + // But you cannot reserve 2 slots at once. + ASSERT_EQ(queue.reserve(), -1); + // And cannot read this slot until its committed. + ASSERT_EQ(queue.read(), -1); + + auto detector_frame = queue.get_metadata_buffer(slot_id); + char* meta_ptr = (char*) detector_frame; + char* data_ptr = (char*) queue.get_data_buffer(slot_id); + + queue.commit(); + + slot_id = queue.read(); + // Once the slot is committed we should be able to read it. + ASSERT_NE(slot_id, -1); + // You can read the same slot multiple times. + ASSERT_NE(queue.read(), -1); + // The 2 buffers should match the committed slot. + ASSERT_EQ(meta_ptr, (char*)(queue.get_metadata_buffer(slot_id))); + ASSERT_EQ(data_ptr, (char*)(queue.get_data_buffer(slot_id))); + + queue.release(); +} + +TEST(FastQueue, queue_full) +{ + size_t n_slots = 5; + size_t slot_data_n_bytes = MODULE_N_BYTES * 2; + FastQueue queue(slot_data_n_bytes, n_slots); + + // There is nothing to be read in the queue. + ASSERT_EQ(queue.read(), -1); + + for (size_t i=0; i queue(slot_data_n_bytes, n_slots); + + int write_slot_id = queue.reserve(); + + auto w_metadata = queue.get_metadata_buffer(write_slot_id); + w_metadata->pulse_id = 1; + w_metadata->frame_index = 2; + w_metadata->daq_rec = 3; + w_metadata->is_good_frame = 4; + + auto w_data = (uint16_t*)(queue.get_data_buffer(write_slot_id)); + for (size_t i=0; ipulse_id, + r_metadata->pulse_id); + EXPECT_EQ(w_metadata->frame_index, + r_metadata->frame_index); + EXPECT_EQ(w_metadata->daq_rec, + r_metadata->daq_rec); + EXPECT_EQ(w_metadata->is_good_frame, + r_metadata->is_good_frame); + + auto r_data = (uint16_t*)(queue.get_data_buffer(read_slot_id)); + for (size_t i=0; i queue( + n_modules * MODULE_N_BYTES, + WRITER_FASTQUEUE_N_SLOTS); + + ModuleFrame frame; + + auto slot_id = queue.reserve(); + auto metadata = queue.get_metadata_buffer(slot_id); + + for (int i_module=0; i_modulemodule[i_module]; + + frame.pulse_id = i_module; + frame.frame_index = i_module; + frame.daq_rec = i_module; + frame.n_received_packets = i_module; + frame.module_id = i_module; + + ModuleFrame* p_metadata = &module_metadata; + + memcpy(p_metadata, &frame, sizeof(ModuleFrame)); + } + + for (int i_module=0; i_modulemodule[i_module]; + + ASSERT_EQ(module_metadata.pulse_id, i_module); + ASSERT_EQ(module_metadata.frame_index, i_module); + ASSERT_EQ(module_metadata.daq_rec, i_module); + ASSERT_EQ(module_metadata.n_received_packets, i_module); + ASSERT_EQ(module_metadata.module_id, i_module); + } +} \ No newline at end of file