AARE
Data analysis library for PSI hybrid detectors
Loading...
Searching...
No Matches
ProducerConsumerQueue.hpp
Go to the documentation of this file.
1/*
2 * Copyright (c) Meta Platforms, Inc. and affiliates.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17// @author Bo Hu (bhu@fb.com)
18// @author Jordan DeLong (delong.j@fb.com)
19
20// Changes made by PSD Detector Group:
21// Copied: Line 34 constexpr std::size_t hardware_destructive_interference_size = 128; from folly/lang/Align.h
22// Changed extension to .hpp
23
24#pragma once
25
26#include <atomic>
27#include <cassert>
28#include <cstdlib>
29#include <memory>
30#include <stdexcept>
31#include <type_traits>
32#include <utility>
33
// Assumed cache-line size (in bytes) used to keep the producer-side and
// consumer-side indices of the queue on separate cache lines.
constexpr std::size_t hardware_destructive_interference_size = 128;

namespace folly {

/*
 * ProducerConsumerQueue is a one producer and one consumer queue
 * without locks.
 *
 * Storage is a fixed-size ring buffer of raw memory obtained with
 * std::malloc; elements are constructed in place by write() and destroyed
 * by read() / popFront() / the destructor. One slot is always kept free to
 * distinguish "full" from "empty", so at most (size - 1) elements can be
 * stored at a time.
 *
 * write() / isFull() are meant for the single producer thread;
 * read() / frontPtr() / popFront() / isEmpty() for the single consumer
 * thread.
 */
template <class T> struct ProducerConsumerQueue {
    using value_type = T;

    // The queue owns raw storage and atomics: it is neither copyable nor
    // assignable.
    ProducerConsumerQueue(const ProducerConsumerQueue &) = delete;
    ProducerConsumerQueue &operator=(const ProducerConsumerQueue &) = delete;

    // size must be >= 2.
    //
    // Also, note that the number of usable slots in the queue at any
    // given time is actually (size-1), so if you start with an empty queue,
    // isFull() will return true after size-1 insertions.
    //
    // Throws std::invalid_argument if size < 2 and std::bad_alloc if the
    // storage cannot be allocated. Both checks happen before any memory is
    // owned, so nothing leaks when the constructor throws.
    explicit ProducerConsumerQueue(uint32_t size)
        : size_(size), records_(allocateRecords(size)), readIndex_(0), writeIndex_(0) {}

    // We need to destruct anything that may still exist in our queue.
    // (No real synchronization needed at destructor time: only one
    // thread can be doing this.)
    ~ProducerConsumerQueue() {
        if (!std::is_trivially_destructible<T>::value) {
            std::size_t readIndex = readIndex_;
            const std::size_t endIndex = writeIndex_;
            while (readIndex != endIndex) {
                records_[readIndex].~T();
                if (++readIndex == size_) {
                    readIndex = 0;
                }
            }
        }

        std::free(records_);
    }

    // Construct a new element at the back of the queue from recordArgs.
    // Returns true on success, false (without touching recordArgs) if the
    // queue is full.
    template <class... Args> bool write(Args &&...recordArgs) {
        auto const currentWrite = writeIndex_.load(std::memory_order_relaxed);
        auto nextRecord = currentWrite + 1;
        if (nextRecord == size_) {
            nextRecord = 0;
        }
        // acquire: pairs with the consumer's release store of readIndex_, so
        // the slot we are about to reuse has already been destroyed.
        if (nextRecord != readIndex_.load(std::memory_order_acquire)) {
            new (&records_[currentWrite]) T(std::forward<Args>(recordArgs)...);
            // release: publishes the fully constructed element.
            writeIndex_.store(nextRecord, std::memory_order_release);
            return true;
        }

        // queue is full
        return false;
    }

    // move (or copy) the value at the front of the queue to given variable.
    // Returns false and leaves record untouched if the queue is empty.
    bool read(T &record) {
        auto const currentRead = readIndex_.load(std::memory_order_relaxed);
        if (currentRead == writeIndex_.load(std::memory_order_acquire)) {
            // queue is empty
            return false;
        }

        auto nextRecord = currentRead + 1;
        if (nextRecord == size_) {
            nextRecord = 0;
        }
        record = std::move(records_[currentRead]);
        records_[currentRead].~T();
        // release: hands the now-destroyed slot back to the producer.
        readIndex_.store(nextRecord, std::memory_order_release);
        return true;
    }

    // pointer to the value at the front of the queue (for use in-place) or
    // nullptr if empty.
    T *frontPtr() {
        auto const currentRead = readIndex_.load(std::memory_order_relaxed);
        if (currentRead == writeIndex_.load(std::memory_order_acquire)) {
            // queue is empty
            return nullptr;
        }
        return &records_[currentRead];
    }

    // Destroy the element at the front of the queue.
    // queue must not be empty (checked only by assert).
    void popFront() {
        auto const currentRead = readIndex_.load(std::memory_order_relaxed);
        assert(currentRead != writeIndex_.load(std::memory_order_acquire));

        auto nextRecord = currentRead + 1;
        if (nextRecord == size_) {
            nextRecord = 0;
        }
        records_[currentRead].~T();
        readIndex_.store(nextRecord, std::memory_order_release);
    }

    // true if the read and write indices currently coincide.
    bool isEmpty() const {
        return readIndex_.load(std::memory_order_acquire) == writeIndex_.load(std::memory_order_acquire);
    }

    // true if advancing the write index would make it collide with the read
    // index, i.e. all (size-1) usable slots are occupied.
    bool isFull() const {
        auto nextRecord = writeIndex_.load(std::memory_order_acquire) + 1;
        if (nextRecord == size_) {
            nextRecord = 0;
        }
        if (nextRecord != readIndex_.load(std::memory_order_acquire)) {
            return false;
        }
        // queue is full
        return true;
    }

    // * If called by consumer, then true size may be more (because producer may
    //   be adding items concurrently).
    // * If called by producer, then true size may be less (because consumer may
    //   be removing items concurrently).
    // * It is undefined to call this from any other thread.
    std::size_t sizeGuess() const {
        auto const writeIdx = writeIndex_.load(std::memory_order_acquire);
        auto const readIdx = readIndex_.load(std::memory_order_acquire);
        // Stay in unsigned arithmetic: a signed intermediate would overflow
        // for queues with 2^31 or more slots.
        if (writeIdx >= readIdx) {
            return writeIdx - readIdx;
        }
        // The write index has wrapped around the end of the ring.
        return size_ - (readIdx - writeIdx);
    }

    // maximum number of items in the queue.
    std::size_t capacity() const { return size_ - 1; }

  private:
    using AtomicIndex = std::atomic<unsigned int>;

    // Validate the requested slot count and allocate uninitialized storage
    // for it. Runs before any member owns memory.
    static T *allocateRecords(uint32_t size) {
        if (size < 2) {
            throw std::invalid_argument("ProducerConsumerQueue size must be >= 2");
        }
        // Guard sizeof(T) * size against wrapping on targets with a narrow size_t.
        if (size > static_cast<std::size_t>(-1) / sizeof(T)) {
            throw std::bad_alloc();
        }
        T *const storage = static_cast<T *>(std::malloc(sizeof(T) * size));
        if (!storage) {
            throw std::bad_alloc();
        }
        return storage;
    }

    // Leading padding so the fields below do not share a cache line with
    // whatever precedes this object.
    char pad0_[hardware_destructive_interference_size];
    const uint32_t size_;
    T *const records_;

    // Consumer-owned and producer-owned indices, each on its own cache line.
    alignas(hardware_destructive_interference_size) AtomicIndex readIndex_;
    alignas(hardware_destructive_interference_size) AtomicIndex writeIndex_;

    // Trailing padding so writeIndex_ does not share a cache line with
    // whatever follows this object.
    char pad1_[hardware_destructive_interference_size - sizeof(AtomicIndex)];
};

} // namespace folly
constexpr std::size_t hardware_destructive_interference_size
Definition ProducerConsumerQueue.hpp:34
Definition ProducerConsumerQueue.hpp:35
Definition ProducerConsumerQueue.hpp:41
char pad1_[hardware_destructive_interference_size - sizeof(AtomicIndex)]
Definition ProducerConsumerQueue.hpp:178
T * frontPtr()
Definition ProducerConsumerQueue.hpp:114
bool read(T &record)
Definition ProducerConsumerQueue.hpp:95
ProducerConsumerQueue(uint32_t size)
Definition ProducerConsumerQueue.hpp:52
AtomicIndex readIndex_
Definition ProducerConsumerQueue.hpp:175
AtomicIndex writeIndex_
Definition ProducerConsumerQueue.hpp:176
bool isEmpty() const
Definition ProducerConsumerQueue.hpp:136
T *const records_
Definition ProducerConsumerQueue.hpp:173
void popFront()
Definition ProducerConsumerQueue.hpp:124
char pad0_[hardware_destructive_interference_size]
Definition ProducerConsumerQueue.hpp:171
size_t capacity() const
Definition ProducerConsumerQueue.hpp:166
T value_type
Definition ProducerConsumerQueue.hpp:42
ProducerConsumerQueue & operator=(const ProducerConsumerQueue &)=delete
const uint32_t size_
Definition ProducerConsumerQueue.hpp:172
bool isFull() const
Definition ProducerConsumerQueue.hpp:140
size_t sizeGuess() const
Definition ProducerConsumerQueue.hpp:157
~ProducerConsumerQueue()
Definition ProducerConsumerQueue.hpp:60
ProducerConsumerQueue(const ProducerConsumerQueue &)=delete
bool write(Args &&...recordArgs)
Definition ProducerConsumerQueue.hpp:78
std::atomic< unsigned int > AtomicIndex
Definition ProducerConsumerQueue.hpp:169