Compare commits

...

3 Commits

Author SHA1 Message Date
Khalil Ferjaoui
e412e72e5b Add test script to ensure no leaks in PC queue
All checks were successful
Build on RHEL8 / build (push) Successful in 3m15s
Build on RHEL9 / build (push) Successful in 3m34s
2025-09-15 10:15:42 +02:00
Erik Fröjdh
9008393955 Merge branch 'main' into fix/spsc-move-assign-leak
All checks were successful
Build on RHEL8 / build (push) Successful in 3m13s
Build on RHEL9 / build (push) Successful in 3m21s
2025-09-12 08:47:05 +02:00
Khalil Ferjaoui
a8b87e6b53 Fix: make ProducerConsumerQueue move-safe 2025-09-04 17:55:08 +02:00
3 changed files with 153 additions and 6 deletions

View File

@@ -45,19 +45,63 @@ template <class T> struct ProducerConsumerQueue {
ProducerConsumerQueue(const ProducerConsumerQueue &) = delete;
ProducerConsumerQueue &operator=(const ProducerConsumerQueue &) = delete;
ProducerConsumerQueue(ProducerConsumerQueue &&other) {
// ProducerConsumerQueue(ProducerConsumerQueue &&other) {
// size_ = other.size_;
// records_ = other.records_;
// other.records_ = nullptr;
// readIndex_ = other.readIndex_.load(std::memory_order_acquire);
// writeIndex_ = other.writeIndex_.load(std::memory_order_acquire);
// }
ProducerConsumerQueue(ProducerConsumerQueue&& other) noexcept {
size_ = other.size_;
records_ = other.records_;
readIndex_.store(other.readIndex_.load(std::memory_order_acquire),
std::memory_order_relaxed);
writeIndex_.store(other.writeIndex_.load(std::memory_order_acquire),
std::memory_order_relaxed);
other.records_ = nullptr;
readIndex_ = other.readIndex_.load(std::memory_order_acquire);
writeIndex_ = other.writeIndex_.load(std::memory_order_acquire);
other.size_ = 0;
other.readIndex_.store(0, std::memory_order_relaxed);
other.writeIndex_.store(0, std::memory_order_relaxed);
}
ProducerConsumerQueue &operator=(ProducerConsumerQueue &&other) {
// ProducerConsumerQueue &operator=(ProducerConsumerQueue &&other) {
// size_ = other.size_;
// records_ = other.records_;
// other.records_ = nullptr;
// readIndex_ = other.readIndex_.load(std::memory_order_acquire);
// writeIndex_ = other.writeIndex_.load(std::memory_order_acquire);
// return *this;
// }
ProducerConsumerQueue& operator=(ProducerConsumerQueue&& other) {
if (this == &other) return *this;
//Destroy existing elements and free old storage
if (records_ && !std::is_trivially_destructible<T>::value) {
size_t r = readIndex_.load(std::memory_order_relaxed);
size_t w = writeIndex_.load(std::memory_order_relaxed);
while (r != w) {
records_[r].~T();
if (++r == size_) r = 0;
}
}
std::free(records_);
//Steal other's state
size_ = other.size_;
records_ = other.records_;
readIndex_.store( other.readIndex_.load(std::memory_order_acquire), std::memory_order_relaxed );
writeIndex_.store( other.writeIndex_.load(std::memory_order_acquire), std::memory_order_relaxed );
//leave 'other' empty and harmless
other.records_ = nullptr;
readIndex_ = other.readIndex_.load(std::memory_order_acquire);
writeIndex_ = other.writeIndex_.load(std::memory_order_acquire);
other.size_ = 0;
other.readIndex_.store(0, std::memory_order_relaxed);
other.writeIndex_.store(0, std::memory_order_relaxed);
return *this;
}

20
src/Makefile Normal file
View File

@@ -0,0 +1,20 @@
# Makefile
CXX := g++
CXXFLAGS := -std=c++17 -O0 -g
INCLUDE := -I../include
SRC := ProducerConsumerQueue.test.cpp
BIN := test_pcq
.PHONY: all clean run
all: $(BIN)
$(BIN): $(SRC)
$(CXX) $(CXXFLAGS) $(INCLUDE) $< -o $@
run: $(BIN)
./$(BIN)
clean:
$(RM) $(BIN)

View File

@@ -0,0 +1,83 @@
#include <iostream>
#include <atomic>
#include <string>
#include <vector>
#include "aare/ProducerConsumerQueue.hpp"
struct Tracker {
static std::atomic<int> ctors;
static std::atomic<int> dtors;
static std::atomic<int> moves;
static std::atomic<int> live;
std::string tag;
std::vector<int> buf;
Tracker() = delete;
explicit Tracker(int id)
: tag("T" + std::to_string(id)), buf(1 << 18, id)
{
++ctors; ++live;
}
Tracker(Tracker&& other) noexcept
: tag(std::move(other.tag)), buf(std::move(other.buf))
{
++moves;
++ctors;
++live;
}
Tracker& operator=(Tracker&&) = delete;
Tracker(const Tracker&) = delete;
Tracker& operator=(const Tracker&) = delete;
~Tracker()
{
++dtors; --live;
}
};
std::atomic<int> Tracker::ctors{0};
std::atomic<int> Tracker::dtors{0};
std::atomic<int> Tracker::moves{0};
std::atomic<int> Tracker::live{0};
int main() {
using Queue = aare::ProducerConsumerQueue<Tracker>;
// Scope make sure destructors have ran before we check the counters.
{
Queue q1(8);
Queue q2(8);
for (int i = 0; i < 3; ++i) q2.write(Tracker(100 + i));
for (int i = 0; i < 5; ++i) q1.write(Tracker(200 + i));
q2 = std::move(q1);
Tracker tmp(9999);
if (auto* p = q2.frontPtr())
{
(void)p;
}
}
std::cout << "ctors=" << Tracker::ctors.load()
<< " dtors=" << Tracker::dtors.load()
<< " moves=" << Tracker::moves.load()
<< " live=" << Tracker::live.load()
<< "\n";
bool ok = (Tracker::ctors.load() == Tracker::dtors.load()) && (Tracker::live.load() == 0);
if (!ok)
{
std::cerr << "Leak or skipped destructors detected (move-assignment bug)\n";
return 1;
}
std::cout << "No leaks; move-assignment cleans up correctly\n";
return 0;
}