diff --git a/core/CMakeLists.txt b/core/CMakeLists.txt index 58dbd38..333742d 100644 --- a/core/CMakeLists.txt +++ b/core/CMakeLists.txt @@ -16,6 +16,8 @@ set_property(TARGET core PROPERTY POSITION_INDEPENDENT_CODE ON) if(AARE_TESTS) set(TestSources ${CMAKE_CURRENT_SOURCE_DIR}/src/defs.test.cpp + ${CMAKE_CURRENT_SOURCE_DIR}/src/ProducerConsumerQueue.test.cpp + ${CMAKE_CURRENT_SOURCE_DIR}/src/CircularFifo.test.cpp ) target_sources(tests PRIVATE ${TestSources} ) target_link_libraries(tests PRIVATE core) diff --git a/core/include/aare/CircularFifo.hpp b/core/include/aare/CircularFifo.hpp new file mode 100644 index 0000000..9c01b9a --- /dev/null +++ b/core/include/aare/CircularFifo.hpp @@ -0,0 +1,112 @@ +#pragma once + +#include +#include +#include +#include +#include + +#include "aare/ProducerConsumerQueue.hpp" + +namespace aare { + +template class CircularFifo { + uint32_t fifo_size; + folly::ProducerConsumerQueue free_slots; + folly::ProducerConsumerQueue filled_slots; + + public: + CircularFifo() : CircularFifo(100){}; + CircularFifo(uint32_t size) + : fifo_size(size), free_slots(size+1), filled_slots(size+1) { + + // TODO! how do we deal with alignment for writing? alignas??? + // Do we give the user a chance to provide memory locations? + // Templated allocator? + for (size_t i = 0; i < fifo_size; ++i) { + free_slots.write(ItemType{}); + } + } + + bool next(){ + //TODO! avoid default constructing ItemType + ItemType it; + if(!filled_slots.read(it)) + return false; + if(!free_slots.write(std::move(it))) + return false; + return true; + + } + + ~CircularFifo() {} + + using value_type = ItemType; + + auto numFilledSlots() const noexcept { return filled_slots.sizeGuess(); } + auto numFreeSlots() const noexcept { return free_slots.sizeGuess(); } + auto isFull() const noexcept { return filled_slots.isFull(); } + +ItemType pop_free() { + ItemType v; + while (!free_slots.read(v)) + ; + return std::move(v); + // return v; +} + +bool try_pop_free(ItemType& v){ + return free_slots.read(v); +} + +ItemType pop_value(std::chrono::nanoseconds wait, + std::atomic &stopped) { + ItemType v; + while (!filled_slots.read(v) && !stopped) { + std::this_thread::sleep_for(wait); + } + return std::move(v); +} + +ItemType pop_value() { + ItemType v; + while (!filled_slots.read(v)) + ; + return std::move(v); +} + +ItemType* frontPtr(){ + return filled_slots.frontPtr(); +} + +// TODO! Add function to move item from filled to free to be used +// with the frontPtr function + + +template +void push_value(Args&&... recordArgs) { + while (!filled_slots.write(std::forward(recordArgs)...)) + ; +} + +template +bool try_push_value(Args&&... recordArgs) { + return filled_slots.write(std::forward(recordArgs)...); + +} + +template +void push_free(Args&&... recordArgs) { + while (!free_slots.write(std::forward(recordArgs)...)) + ; +} + +template +bool try_push_free(Args&&... recordArgs) { + return free_slots.write(std::forward(recordArgs)...); + +} + +}; + +} // namespace aare \ No newline at end of file diff --git a/core/include/aare/ProducerConsumerQueue.hpp b/core/include/aare/ProducerConsumerQueue.hpp new file mode 100644 index 0000000..b10c327 --- /dev/null +++ b/core/include/aare/ProducerConsumerQueue.hpp @@ -0,0 +1,188 @@ +/* + * Copyright (c) Meta Platforms, Inc. and affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +// @author Bo Hu (bhu@fb.com) +// @author Jordan DeLong (delong.j@fb.com) + +// Changes made by PSD Detector Group: +// Copied: Line 34 constexpr std::size_t hardware_destructive_interference_size = 128; from folly/lang/Align.h +// Changed extension to .hpp + +#pragma once + +#include +#include +#include +#include +#include +#include +#include + +constexpr std::size_t hardware_destructive_interference_size = 128; +namespace folly { + +/* + * ProducerConsumerQueue is a one producer and one consumer queue + * without locks. + */ +template +struct ProducerConsumerQueue { + typedef T value_type; + + ProducerConsumerQueue(const ProducerConsumerQueue&) = delete; + ProducerConsumerQueue& operator=(const ProducerConsumerQueue&) = delete; + + // size must be >= 2. + // + // Also, note that the number of usable slots in the queue at any + // given time is actually (size-1), so if you start with an empty queue, + // isFull() will return true after size-1 insertions. + explicit ProducerConsumerQueue(uint32_t size) + : size_(size), + records_(static_cast(std::malloc(sizeof(T) * size))), + readIndex_(0), + writeIndex_(0) { + assert(size >= 2); + if (!records_) { + throw std::bad_alloc(); + } + } + + ~ProducerConsumerQueue() { + // We need to destruct anything that may still exist in our queue. + // (No real synchronization needed at destructor time: only one + // thread can be doing this.) + if (!std::is_trivially_destructible::value) { + size_t readIndex = readIndex_; + size_t endIndex = writeIndex_; + while (readIndex != endIndex) { + records_[readIndex].~T(); + if (++readIndex == size_) { + readIndex = 0; + } + } + } + + std::free(records_); + } + + template + bool write(Args&&... recordArgs) { + auto const currentWrite = writeIndex_.load(std::memory_order_relaxed); + auto nextRecord = currentWrite + 1; + if (nextRecord == size_) { + nextRecord = 0; + } + if (nextRecord != readIndex_.load(std::memory_order_acquire)) { + new (&records_[currentWrite]) T(std::forward(recordArgs)...); + writeIndex_.store(nextRecord, std::memory_order_release); + return true; + } + + // queue is full + return false; + } + + // move (or copy) the value at the front of the queue to given variable + bool read(T& record) { + auto const currentRead = readIndex_.load(std::memory_order_relaxed); + if (currentRead == writeIndex_.load(std::memory_order_acquire)) { + // queue is empty + return false; + } + + auto nextRecord = currentRead + 1; + if (nextRecord == size_) { + nextRecord = 0; + } + record = std::move(records_[currentRead]); + records_[currentRead].~T(); + readIndex_.store(nextRecord, std::memory_order_release); + return true; + } + + // pointer to the value at the front of the queue (for use in-place) or + // nullptr if empty. + T* frontPtr() { + auto const currentRead = readIndex_.load(std::memory_order_relaxed); + if (currentRead == writeIndex_.load(std::memory_order_acquire)) { + // queue is empty + return nullptr; + } + return &records_[currentRead]; + } + + // queue must not be empty + void popFront() { + auto const currentRead = readIndex_.load(std::memory_order_relaxed); + assert(currentRead != writeIndex_.load(std::memory_order_acquire)); + + auto nextRecord = currentRead + 1; + if (nextRecord == size_) { + nextRecord = 0; + } + records_[currentRead].~T(); + readIndex_.store(nextRecord, std::memory_order_release); + } + + bool isEmpty() const { + return readIndex_.load(std::memory_order_acquire) == + writeIndex_.load(std::memory_order_acquire); + } + + bool isFull() const { + auto nextRecord = writeIndex_.load(std::memory_order_acquire) + 1; + if (nextRecord == size_) { + nextRecord = 0; + } + if (nextRecord != readIndex_.load(std::memory_order_acquire)) { + return false; + } + // queue is full + return true; + } + + // * If called by consumer, then true size may be more (because producer may + // be adding items concurrently). + // * If called by producer, then true size may be less (because consumer may + // be removing items concurrently). + // * It is undefined to call this from any other thread. + size_t sizeGuess() const { + int ret = writeIndex_.load(std::memory_order_acquire) - + readIndex_.load(std::memory_order_acquire); + if (ret < 0) { + ret += size_; + } + return ret; + } + + // maximum number of items in the queue. + size_t capacity() const { return size_ - 1; } + + private: + using AtomicIndex = std::atomic; + + char pad0_[hardware_destructive_interference_size]; + const uint32_t size_; + T* const records_; + + alignas(hardware_destructive_interference_size) AtomicIndex readIndex_; + alignas(hardware_destructive_interference_size) AtomicIndex writeIndex_; + + char pad1_[hardware_destructive_interference_size - sizeof(AtomicIndex)]; +}; + +} // namespace folly \ No newline at end of file diff --git a/core/src/CircularFifo.test.cpp b/core/src/CircularFifo.test.cpp new file mode 100644 index 0000000..973142a --- /dev/null +++ b/core/src/CircularFifo.test.cpp @@ -0,0 +1,116 @@ +#include + +#include "aare/CircularFifo.hpp" + +using aare::CircularFifo; + +// Only for testing. To make sure we can avoid copy constructor +// and copy assignment + +struct MoveOnlyInt { + int value{}; + + MoveOnlyInt() = default; + MoveOnlyInt(int i) : value(i){}; + MoveOnlyInt(const MoveOnlyInt &) = delete; + MoveOnlyInt &operator=(const MoveOnlyInt &) = delete; + MoveOnlyInt(MoveOnlyInt &&other) { std::swap(value, other.value); } + MoveOnlyInt &operator=(MoveOnlyInt &&other) { + std::swap(value, other.value); + return *this; + } + bool operator==(int other) const { return value == other; } +}; + +TEST_CASE("CircularFifo can be default constructed") { CircularFifo f; } + +TEST_CASE("Newly constructed fifo has the right size") { + size_t size = 17; + CircularFifo f(size); + CHECK(f.numFreeSlots() == size); + CHECK(f.numFilledSlots() == 0); +} + +TEST_CASE("Can fit size number of objects") { + size_t size = 8; + size_t numPushedItems = 0; + CircularFifo f(size); + for (size_t i = 0; i < size; ++i) { + MoveOnlyInt a; + bool popped = f.try_pop_free(a); + CHECK(popped); + if (popped) { + a.value = i; + bool pushed = f.try_push_value(std::move(a)); + CHECK(pushed); + if (pushed) + numPushedItems++; + } + } + CHECK(f.numFreeSlots() == 0); + CHECK(f.numFilledSlots() == size); + CHECK(numPushedItems == size); +} + +TEST_CASE("Push move only type") { + CircularFifo f; + f.push_value(5); +} + +TEST_CASE("Push pop") { + CircularFifo f; + f.push_value(MoveOnlyInt(1)); + + auto a = f.pop_value(); + CHECK(a == 1); +} + +TEST_CASE("Pop free and then push") { + CircularFifo f; + + auto a = f.pop_free(); + a.value = 5; + f.push_value(std::move(a)); // Explicit move since we can't copy + auto b = f.pop_value(); + + CHECK(a == 0); // Moved from value + CHECK(b == 5); // Original value +} + +TEST_CASE("Skip the first value") { + CircularFifo f; + + for (int i = 0; i != 10; ++i) { + auto a = f.pop_free(); + a.value = i + 1; + f.push_value(std::move(a)); // Explicit move since we can't copy + } + + auto b = f.pop_value(); + CHECK(b == 1); + f.next(); + auto c = f.pop_value(); + CHECK(c == 3); +} + +TEST_CASE("Use in place and move to free") { + size_t size = 18; + CircularFifo f(size); + + //Push 10 values to the fifo + for (int i = 0; i != 10; ++i) { + auto a = f.pop_free(); + a.value = i + 1; + f.push_value(std::move(a)); // Explicit move since we can't copy + } + + auto b = f.frontPtr(); + CHECK(*b == 1); + CHECK(f.numFilledSlots() == 10); + CHECK(f.numFreeSlots() == size-10); + f.next(); + auto c = f.frontPtr(); + CHECK(*c == 2); + CHECK(f.numFilledSlots() == 9); + CHECK(f.numFreeSlots() == size-9); +} diff --git a/core/src/ProducerConsumerQueue.test.cpp b/core/src/ProducerConsumerQueue.test.cpp new file mode 100644 index 0000000..a89ebad --- /dev/null +++ b/core/src/ProducerConsumerQueue.test.cpp @@ -0,0 +1,45 @@ +#include +#include "aare/ProducerConsumerQueue.hpp" + +// using arve::SimpleQueue; +TEST_CASE("push pop"){ + + folly::ProducerConsumerQueue q(5); + int a = 3; + int b = 8; + CHECK(q.sizeGuess() == 0); + CHECK(q.write(a)); + CHECK(q.sizeGuess() == 1); + CHECK(q.write(b)); + CHECK(q.sizeGuess() == 2); + int c = 0; + + CHECK(q.read(c)); + CHECK(c == 3); + CHECK(q.sizeGuess() == 1); + CHECK(q.read(c)); + CHECK(c == 8); + CHECK(q.sizeGuess() == 0); +} + +TEST_CASE("Cannot push to a full queue"){ + folly::ProducerConsumerQueue q(3); + int a = 3; + int b = 4; + int c = 0; + CHECK(q.write(a)); + CHECK(q.write(b)); + CHECK_FALSE(q.write(a)); + + //values are still ok + CHECK(q.read(c)); + CHECK(c == 3); + CHECK(q.read(c)); + CHECK(c == 4); +} + +TEST_CASE("Cannot pop from an empty queue"){ + folly::ProducerConsumerQueue q(2); + int a=0; + CHECK_FALSE(q.read(a)); +} \ No newline at end of file