added Queue and Fifo

This commit is contained in:
Erik Frojdh 2024-03-20 11:54:57 +01:00
parent 4da9bc0813
commit b096cf8dd3
5 changed files with 463 additions and 0 deletions

View File

@ -16,6 +16,8 @@ set_property(TARGET core PROPERTY POSITION_INDEPENDENT_CODE ON)
if(AARE_TESTS)
set(TestSources
${CMAKE_CURRENT_SOURCE_DIR}/src/defs.test.cpp
${CMAKE_CURRENT_SOURCE_DIR}/src/ProducerConsumerQueue.test.cpp
${CMAKE_CURRENT_SOURCE_DIR}/src/CircularFifo.test.cpp
)
target_sources(tests PRIVATE ${TestSources} )
target_link_libraries(tests PRIVATE core)

View File

@ -0,0 +1,112 @@
#pragma once
#include <chrono>
#include <fmt/color.h>
#include <fmt/format.h>
#include <memory>
#include <thread>
#include "aare/ProducerConsumerQueue.hpp"
namespace aare {
template <class ItemType> class CircularFifo {
uint32_t fifo_size;
folly::ProducerConsumerQueue<ItemType> free_slots;
folly::ProducerConsumerQueue<ItemType> filled_slots;
public:
CircularFifo() : CircularFifo(100){};
CircularFifo(uint32_t size)
: fifo_size(size), free_slots(size+1), filled_slots(size+1) {
// TODO! how do we deal with alignment for writing? alignas???
// Do we give the user a chance to provide memory locations?
// Templated allocator?
for (size_t i = 0; i < fifo_size; ++i) {
free_slots.write(ItemType{});
}
}
bool next(){
//TODO! avoid default constructing ItemType
ItemType it;
if(!filled_slots.read(it))
return false;
if(!free_slots.write(std::move(it)))
return false;
return true;
}
~CircularFifo() {}
using value_type = ItemType;
auto numFilledSlots() const noexcept { return filled_slots.sizeGuess(); }
auto numFreeSlots() const noexcept { return free_slots.sizeGuess(); }
auto isFull() const noexcept { return filled_slots.isFull(); }
ItemType pop_free() {
ItemType v;
while (!free_slots.read(v))
;
return std::move(v);
// return v;
}
bool try_pop_free(ItemType& v){
return free_slots.read(v);
}
ItemType pop_value(std::chrono::nanoseconds wait,
std::atomic<bool> &stopped) {
ItemType v;
while (!filled_slots.read(v) && !stopped) {
std::this_thread::sleep_for(wait);
}
return std::move(v);
}
ItemType pop_value() {
ItemType v;
while (!filled_slots.read(v))
;
return std::move(v);
}
ItemType* frontPtr(){
return filled_slots.frontPtr();
}
// TODO! Add function to move item from filled to free to be used
// with the frontPtr function
template <class... Args>
void push_value(Args&&... recordArgs) {
while (!filled_slots.write(std::forward<Args>(recordArgs)...))
;
}
template <class... Args>
bool try_push_value(Args&&... recordArgs) {
return filled_slots.write(std::forward<Args>(recordArgs)...);
}
template <class... Args>
void push_free(Args&&... recordArgs) {
while (!free_slots.write(std::forward<Args>(recordArgs)...))
;
}
template <class... Args>
bool try_push_free(Args&&... recordArgs) {
return free_slots.write(std::forward<Args>(recordArgs)...);
}
};
} // namespace aare

View File

@ -0,0 +1,188 @@
/*
* Copyright (c) Meta Platforms, Inc. and affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
// @author Bo Hu (bhu@fb.com)
// @author Jordan DeLong (delong.j@fb.com)
// Changes made by PSD Detector Group:
// Copied: Line 34 constexpr std::size_t hardware_destructive_interference_size = 128; from folly/lang/Align.h
// Changed extension to .hpp
#pragma once
#include <atomic>
#include <cassert>
#include <cstdlib>
#include <memory>
#include <stdexcept>
#include <type_traits>
#include <utility>
constexpr std::size_t hardware_destructive_interference_size = 128;
namespace folly {
/*
* ProducerConsumerQueue is a one producer and one consumer queue
* without locks.
*/
template <class T>
struct ProducerConsumerQueue {
typedef T value_type;
ProducerConsumerQueue(const ProducerConsumerQueue&) = delete;
ProducerConsumerQueue& operator=(const ProducerConsumerQueue&) = delete;
// size must be >= 2.
//
// Also, note that the number of usable slots in the queue at any
// given time is actually (size-1), so if you start with an empty queue,
// isFull() will return true after size-1 insertions.
explicit ProducerConsumerQueue(uint32_t size)
: size_(size),
records_(static_cast<T*>(std::malloc(sizeof(T) * size))),
readIndex_(0),
writeIndex_(0) {
assert(size >= 2);
if (!records_) {
throw std::bad_alloc();
}
}
~ProducerConsumerQueue() {
// We need to destruct anything that may still exist in our queue.
// (No real synchronization needed at destructor time: only one
// thread can be doing this.)
if (!std::is_trivially_destructible<T>::value) {
size_t readIndex = readIndex_;
size_t endIndex = writeIndex_;
while (readIndex != endIndex) {
records_[readIndex].~T();
if (++readIndex == size_) {
readIndex = 0;
}
}
}
std::free(records_);
}
template <class... Args>
bool write(Args&&... recordArgs) {
auto const currentWrite = writeIndex_.load(std::memory_order_relaxed);
auto nextRecord = currentWrite + 1;
if (nextRecord == size_) {
nextRecord = 0;
}
if (nextRecord != readIndex_.load(std::memory_order_acquire)) {
new (&records_[currentWrite]) T(std::forward<Args>(recordArgs)...);
writeIndex_.store(nextRecord, std::memory_order_release);
return true;
}
// queue is full
return false;
}
// move (or copy) the value at the front of the queue to given variable
bool read(T& record) {
auto const currentRead = readIndex_.load(std::memory_order_relaxed);
if (currentRead == writeIndex_.load(std::memory_order_acquire)) {
// queue is empty
return false;
}
auto nextRecord = currentRead + 1;
if (nextRecord == size_) {
nextRecord = 0;
}
record = std::move(records_[currentRead]);
records_[currentRead].~T();
readIndex_.store(nextRecord, std::memory_order_release);
return true;
}
// pointer to the value at the front of the queue (for use in-place) or
// nullptr if empty.
T* frontPtr() {
auto const currentRead = readIndex_.load(std::memory_order_relaxed);
if (currentRead == writeIndex_.load(std::memory_order_acquire)) {
// queue is empty
return nullptr;
}
return &records_[currentRead];
}
// queue must not be empty
void popFront() {
auto const currentRead = readIndex_.load(std::memory_order_relaxed);
assert(currentRead != writeIndex_.load(std::memory_order_acquire));
auto nextRecord = currentRead + 1;
if (nextRecord == size_) {
nextRecord = 0;
}
records_[currentRead].~T();
readIndex_.store(nextRecord, std::memory_order_release);
}
bool isEmpty() const {
return readIndex_.load(std::memory_order_acquire) ==
writeIndex_.load(std::memory_order_acquire);
}
bool isFull() const {
auto nextRecord = writeIndex_.load(std::memory_order_acquire) + 1;
if (nextRecord == size_) {
nextRecord = 0;
}
if (nextRecord != readIndex_.load(std::memory_order_acquire)) {
return false;
}
// queue is full
return true;
}
// * If called by consumer, then true size may be more (because producer may
// be adding items concurrently).
// * If called by producer, then true size may be less (because consumer may
// be removing items concurrently).
// * It is undefined to call this from any other thread.
size_t sizeGuess() const {
int ret = writeIndex_.load(std::memory_order_acquire) -
readIndex_.load(std::memory_order_acquire);
if (ret < 0) {
ret += size_;
}
return ret;
}
// maximum number of items in the queue.
size_t capacity() const { return size_ - 1; }
private:
using AtomicIndex = std::atomic<unsigned int>;
char pad0_[hardware_destructive_interference_size];
const uint32_t size_;
T* const records_;
alignas(hardware_destructive_interference_size) AtomicIndex readIndex_;
alignas(hardware_destructive_interference_size) AtomicIndex writeIndex_;
char pad1_[hardware_destructive_interference_size - sizeof(AtomicIndex)];
};
} // namespace folly

View File

@ -0,0 +1,116 @@
#include <catch2/catch_all.hpp>
#include "aare/CircularFifo.hpp"
using aare::CircularFifo;
// Only for testing. To make sure we can avoid copy constructor
// and copy assignment
struct MoveOnlyInt {
int value{};
MoveOnlyInt() = default;
MoveOnlyInt(int i) : value(i){};
MoveOnlyInt(const MoveOnlyInt &) = delete;
MoveOnlyInt &operator=(const MoveOnlyInt &) = delete;
MoveOnlyInt(MoveOnlyInt &&other) { std::swap(value, other.value); }
MoveOnlyInt &operator=(MoveOnlyInt &&other) {
std::swap(value, other.value);
return *this;
}
bool operator==(int other) const { return value == other; }
};
TEST_CASE("CircularFifo can be default constructed") { CircularFifo<MoveOnlyInt> f; }
TEST_CASE("Newly constructed fifo has the right size") {
size_t size = 17;
CircularFifo<MoveOnlyInt> f(size);
CHECK(f.numFreeSlots() == size);
CHECK(f.numFilledSlots() == 0);
}
TEST_CASE("Can fit size number of objects") {
size_t size = 8;
size_t numPushedItems = 0;
CircularFifo<MoveOnlyInt> f(size);
for (size_t i = 0; i < size; ++i) {
MoveOnlyInt a;
bool popped = f.try_pop_free(a);
CHECK(popped);
if (popped) {
a.value = i;
bool pushed = f.try_push_value(std::move(a));
CHECK(pushed);
if (pushed)
numPushedItems++;
}
}
CHECK(f.numFreeSlots() == 0);
CHECK(f.numFilledSlots() == size);
CHECK(numPushedItems == size);
}
TEST_CASE("Push move only type") {
CircularFifo<MoveOnlyInt> f;
f.push_value(5);
}
TEST_CASE("Push pop") {
CircularFifo<MoveOnlyInt> f;
f.push_value(MoveOnlyInt(1));
auto a = f.pop_value();
CHECK(a == 1);
}
TEST_CASE("Pop free and then push") {
CircularFifo<MoveOnlyInt> f;
auto a = f.pop_free();
a.value = 5;
f.push_value(std::move(a)); // Explicit move since we can't copy
auto b = f.pop_value();
CHECK(a == 0); // Moved from value
CHECK(b == 5); // Original value
}
TEST_CASE("Skip the first value") {
CircularFifo<MoveOnlyInt> f;
for (int i = 0; i != 10; ++i) {
auto a = f.pop_free();
a.value = i + 1;
f.push_value(std::move(a)); // Explicit move since we can't copy
}
auto b = f.pop_value();
CHECK(b == 1);
f.next();
auto c = f.pop_value();
CHECK(c == 3);
}
TEST_CASE("Use in place and move to free") {
size_t size = 18;
CircularFifo<MoveOnlyInt> f(size);
//Push 10 values to the fifo
for (int i = 0; i != 10; ++i) {
auto a = f.pop_free();
a.value = i + 1;
f.push_value(std::move(a)); // Explicit move since we can't copy
}
auto b = f.frontPtr();
CHECK(*b == 1);
CHECK(f.numFilledSlots() == 10);
CHECK(f.numFreeSlots() == size-10);
f.next();
auto c = f.frontPtr();
CHECK(*c == 2);
CHECK(f.numFilledSlots() == 9);
CHECK(f.numFreeSlots() == size-9);
}

View File

@ -0,0 +1,45 @@
#include <catch2/catch_all.hpp>
#include "aare/ProducerConsumerQueue.hpp"
// using arve::SimpleQueue;
TEST_CASE("push pop"){
folly::ProducerConsumerQueue<int> q(5);
int a = 3;
int b = 8;
CHECK(q.sizeGuess() == 0);
CHECK(q.write(a));
CHECK(q.sizeGuess() == 1);
CHECK(q.write(b));
CHECK(q.sizeGuess() == 2);
int c = 0;
CHECK(q.read(c));
CHECK(c == 3);
CHECK(q.sizeGuess() == 1);
CHECK(q.read(c));
CHECK(c == 8);
CHECK(q.sizeGuess() == 0);
}
TEST_CASE("Cannot push to a full queue"){
folly::ProducerConsumerQueue<int> q(3);
int a = 3;
int b = 4;
int c = 0;
CHECK(q.write(a));
CHECK(q.write(b));
CHECK_FALSE(q.write(a));
//values are still ok
CHECK(q.read(c));
CHECK(c == 3);
CHECK(q.read(c));
CHECK(c == 4);
}
TEST_CASE("Cannot pop from an empty queue"){
folly::ProducerConsumerQueue<int> q(2);
int a=0;
CHECK_FALSE(q.read(a));
}