From 86be688dc4ccacb8a76d7837b05a571720a4190b Mon Sep 17 00:00:00 2001 From: Andrej Babic Date: Tue, 28 Apr 2020 21:43:50 +0200 Subject: [PATCH] Implement fast queue --- core-buffer/include/FastQueue.hpp | 38 +++++++++++ core-buffer/src/FastQueue.cpp | 102 ++++++++++++++++++++++++++++++ 2 files changed, 140 insertions(+) create mode 100644 core-buffer/include/FastQueue.hpp create mode 100644 core-buffer/src/FastQueue.cpp diff --git a/core-buffer/include/FastQueue.hpp b/core-buffer/include/FastQueue.hpp new file mode 100644 index 0000000..84a9139 --- /dev/null +++ b/core-buffer/include/FastQueue.hpp @@ -0,0 +1,38 @@ +#ifndef FASTQUEUE_HPP +#define FASTQUEUE_HPP + +#include +#include +#include + +template +class FastQueue { + const size_t slot_n_bytes_; + const size_t n_slots_; + char* buffer_; + std::atomic_int* buffer_status_; + + uint16_t write_slot_id_; + uint16_t read_slot_id_; + +public: + + int SLOT_EMPTY=0; + int SLOT_RESERVED=1; + int SLOT_READY=1; + + FastQueue(const size_t slot_data_n_bytes, const uint16_t n_slots); + virtual ~FastQueue(); + + T* get_metadata_buffer(const int slot_id); + char* get_data_buffer(const int slot_id); + + int reserve(); + void commit(); + + int read(); + void release(); +}; + + +#endif //FASTQUEUE_HPP diff --git a/core-buffer/src/FastQueue.cpp b/core-buffer/src/FastQueue.cpp new file mode 100644 index 0000000..4ce18b6 --- /dev/null +++ b/core-buffer/src/FastQueue.cpp @@ -0,0 +1,102 @@ +#include +#include +#include "FastQueue.hpp" + +using namespace std; + +template +FastQueue::FastQueue( + const size_t slot_data_n_bytes, + const uint16_t n_slots) : + slot_n_bytes_(slot_data_n_bytes + sizeof(T)), + n_slots_(n_slots) +{ + buffer_ = new char[slot_n_bytes_ * n_slots_]; + buffer_status_ = new atomic_int[n_slots]; + + // TODO: Are atomic variables initialized? + for (size_t i=0; i < n_slots_; i++) { + buffer_status_[i] = 0; + } + + write_slot_id_ = 0; + read_slot_id_ = 0; +} + +template +FastQueue::~FastQueue() +{ + delete[] buffer_; + delete[] buffer_status_; +} + +template +T* FastQueue::get_metadata_buffer(const int slot_id) +{ + return (T*)(buffer_ + (slot_id * slot_n_bytes_)); +} + +template +char* FastQueue::get_data_buffer(const int slot_id) +{ + return (char*)(buffer_ + (slot_id * slot_n_bytes_) + sizeof(T)); +} + +template +int FastQueue::reserve() +{ + // If (buffer_status==SLOT_EMPTY) buffer_status=SLOT_RESERVED. + bool slot_reserved = + buffer_status_[write_slot_id_].compare_exchange_strong( + SLOT_EMPTY, SLOT_RESERVED); + + if (!slot_reserved) { + return -1; + } + + return write_slot_id_; +} + +template +void FastQueue::commit() +{ + // If (buffer_status==SLOT_RESERVED) buffer_status=SLOT_READY. + bool slot_ready = + buffer_status_[write_slot_id_].compare_exchange_strong( + SLOT_RESERVED, SLOT_READY); + + if (!slot_ready) { + throw runtime_error("Slot should be reserved first."); + } + + write_slot_id_++; + write_slot_id_ %= n_slots_; +} + +template +int FastQueue::read() +{ + if (buffer_status_[read_slot_id_] != SLOT_READY) { + return -1; + } + + return read_slot_id_; +} + +template +void FastQueue::release() +{ + // If (buffer_status==SLOT_RESERVED) buffer_status=SLOT_READY. + bool slot_empty = + buffer_status_[read_slot_id_].compare_exchange_strong( + SLOT_READY, SLOT_EMPTY); + + if (!slot_empty) { + throw runtime_error("Slot should be ready first."); + } + + read_slot_id_++; + read_slot_id_ %= n_slots_; +} + +template class FastQueue; \ No newline at end of file