Rewrote the multithreading to fix frame loss
Some checks failed
Build on RHEL8 / build (push) Failing after 3m21s
Build on RHEL9 / build (push) Failing after 3m32s

This commit is contained in:
2026-01-06 14:09:51 +01:00
parent c035491b63
commit 055feb2290
5 changed files with 316 additions and 71 deletions

View File

@@ -0,0 +1,59 @@
#pragma once
#include <atomic>
#include <condition_variable>
#include <cstddef>
#include <cstdint>
#include <deque>
#include <memory>
#include <mutex>
#include <stdexcept>
#include <thread>
#include <utility>
#include <vector>

#include "aare/ClusterFinder.hpp"
#include "aare/NDArray.hpp"
#include "aare/logger.hpp"
/**
 * @brief Bounded FIFO queue guarded by a mutex, with blocking push and pop.
 *
 * push() waits while the queue already holds `max_size` items and pop() waits
 * while it is empty, so an item is never dropped because the other side is
 * slow. close() marks the queue as finished and wakes every waiting thread:
 * items queued before close() can still be popped; once the queue is drained
 * pop() throws, and push() on a closed queue throws.
 *
 * @note A capacity of 0 leaves no room for any item, so every push() blocks
 * until close() is called.
 *
 * @tparam T element type; it is moved into and out of the queue.
 */
template <typename T>
class BlockingQueue {
    mutable std::mutex mtx; // mutable so the const observers can lock it
    std::condition_variable cv_push; // signalled when space becomes available
    std::condition_variable cv_pop;  // signalled when an item becomes available
    std::deque<T> queue;
    std::size_t max_size;
    bool closed = false;

  public:
    /**
     * @param capacity maximum number of items held before push() blocks
     */
    BlockingQueue(std::size_t capacity) : max_size(capacity) {}

    /**
     * @brief Append an item, blocking while the queue is full.
     * @throws std::runtime_error if the queue is (or becomes) closed
     */
    void push(T item) {
        std::unique_lock<std::mutex> lock(mtx);
        // `closed` is part of the predicate so close() can release a producer
        // that is stuck waiting for space.
        cv_push.wait(lock,
                     [this] { return closed || queue.size() < max_size; });
        if (closed) {
            throw std::runtime_error("BlockingQueue::push: queue is closed");
        }
        queue.push_back(std::move(item));
        cv_pop.notify_one();
    }

    /**
     * @brief Remove and return the oldest item, blocking while the queue is
     * empty.
     * @throws std::runtime_error if the queue is closed and has been drained
     */
    T pop() {
        std::unique_lock<std::mutex> lock(mtx);
        // `closed` is part of the predicate so close() can release a consumer
        // that is stuck waiting for data.
        cv_pop.wait(lock, [this] { return closed || !queue.empty(); });
        if (queue.empty()) {
            // Only reachable when closed: nothing will ever arrive.
            throw std::runtime_error(
                "BlockingQueue::pop: queue is closed and empty");
        }
        T item = std::move(queue.front());
        queue.pop_front();
        cv_push.notify_one();
        return item;
    }

    /**
     * @brief Mark the queue as finished and wake all blocked producers and
     * consumers.
     */
    void close() {
        std::lock_guard<std::mutex> lock(mtx);
        closed = true;
        cv_pop.notify_all();
        cv_push.notify_all();
    }

    /**
     * @brief Snapshot of whether the queue currently holds no items. The
     * answer may be stale as soon as the lock is released.
     */
    bool empty() const {
        std::lock_guard<std::mutex> lock(mtx);
        return queue.empty();
    }

    // Aliases using the ProducerConsumerQueue method names.

    /// Same as push(); the item is moved, not copied, into the queue.
    void write(T item) { push(std::move(item)); }

    /// Same as empty().
    bool isEmpty() const { return empty(); }

    /**
     * @brief Same as pop().
     * @warning Despite the name this returns the element by value and removes
     * it. Calling frontPtr() and then popFront() consumes two elements.
     */
    T frontPtr() { return pop(); }

    /// Same as pop().
    T popFront() { return pop(); }
};

View File

@@ -5,13 +5,15 @@
#include "aare/ClusterFinderMT.hpp"
#include "aare/ClusterVector.hpp"
#include "aare/ProducerConsumerQueue.hpp"
#include "aare/BlockingQueue.hpp"
namespace aare {
template <typename ClusterType,
typename = std::enable_if_t<is_cluster_v<ClusterType>>>
class ClusterCollector {
ProducerConsumerQueue<ClusterVector<ClusterType>> *m_source;
// ProducerConsumerQueue<ClusterVector<ClusterType>> *m_source;
BlockingQueue<ClusterVector<ClusterType>> *m_source;
std::atomic<bool> m_stop_requested{false};
std::atomic<bool> m_stopped{true};
std::chrono::milliseconds m_default_wait{1};
@@ -19,19 +21,47 @@ class ClusterCollector {
std::vector<ClusterVector<ClusterType>> m_clusters;
void process() {
// m_stopped = false;
// fmt::print("ClusterCollector started\n");
// // while (!m_stop_requested || !m_source->isEmpty()) {
// while (true) {
// if (clusters.frame_number() == -1)
// break;
// ClusterVector<ClusterType> clusters = m_source->pop();
// m_clusters.push_back(std::move(clusters));
// // if (ClusterVector<ClusterType> *clusters = m_source->frontPtr();
// // clusters != nullptr) {
// // m_clusters.push_back(std::move(*clusters));
// // m_source->popFront();
// // } else {
// // std::this_thread::sleep_for(m_default_wait);
// // }
// }
// fmt::print("ClusterCollector stopped\n");
// m_stopped = true;
m_stopped = false;
fmt::print("ClusterCollector started\n");
while (!m_stop_requested || !m_source->isEmpty()) {
if (ClusterVector<ClusterType> *clusters = m_source->frontPtr();
clusters != nullptr) {
m_clusters.push_back(std::move(*clusters));
m_source->popFront();
} else {
std::this_thread::sleep_for(m_default_wait);
while (true) {
// pop blocks until there is data
ClusterVector<ClusterType> clusters = m_source->pop();
// POISON DETECTION
if (clusters.frame_number() == -1) {
fmt::print("ClusterCollector received poison frame, stopping\n");
break; // exit loop cleanly
}
// NORMAL DATA: store or process
m_clusters.push_back(std::move(clusters));
}
fmt::print("ClusterCollector stopped\n");
m_stopped = true;
}
public:

View File

@@ -6,13 +6,15 @@
#include "aare/ClusterFinderMT.hpp"
#include "aare/ClusterVector.hpp"
#include "aare/ProducerConsumerQueue.hpp"
#include "aare/BlockingQueue.hpp"
namespace aare {
template <typename ClusterType,
typename = std::enable_if_t<is_cluster_v<ClusterType>>>
class ClusterFileSink {
ProducerConsumerQueue<ClusterVector<ClusterType>> *m_source;
// ProducerConsumerQueue<ClusterVector<ClusterType>> *m_source;
BlockingQueue<ClusterVector<ClusterType>> *m_source;
std::atomic<bool> m_stop_requested{false};
std::atomic<bool> m_stopped{true};
std::chrono::milliseconds m_default_wait{1};
@@ -20,29 +22,60 @@ class ClusterFileSink {
std::ofstream m_file;
void process() {
m_stopped = false;
// m_stopped = false;
// LOG(logDEBUG) << "ClusterFileSink started";
// while (!m_stop_requested || !m_source->isEmpty()) {
// // if (ClusterVector<ClusterType> *clusters = m_source->pop(); m_source->frontPtr();
// // clusters != nullptr) {
// {
// ClusterVector<ClusterType> clusters = m_source->pop();
// // Write clusters to file
// int32_t frame_number =
// clusters->frame_number(); // TODO! Should we store frame
// // number already as int?
// uint32_t num_clusters = clusters.size();
// if (frame_number >= 9910 && frame_number <= 9930)
// std::cout << "prcoess: frame_number = " << frame_number << std::endl;
// m_file.write(reinterpret_cast<const char *>(&frame_number),
// sizeof(frame_number));
// m_file.write(reinterpret_cast<const char *>(&num_clusters),
// sizeof(num_clusters));
// m_file.write(reinterpret_cast<const char *>(clusters.data()),
// clusters.size() * clusters.item_size());
// m_source->popFront();
// }
// // else {
// // std::this_thread::sleep_for(m_default_wait);
// // }
// }
// LOG(logDEBUG) << "ClusterFileSink stopped";
// m_stopped = true;
LOG(logDEBUG) << "ClusterFileSink started";
while (!m_stop_requested || !m_source->isEmpty()) {
if (ClusterVector<ClusterType> *clusters = m_source->frontPtr();
clusters != nullptr) {
// Write clusters to file
int32_t frame_number =
clusters->frame_number(); // TODO! Should we store frame
// number already as int?
uint32_t num_clusters = clusters->size();
m_file.write(reinterpret_cast<const char *>(&frame_number),
sizeof(frame_number));
m_file.write(reinterpret_cast<const char *>(&num_clusters),
sizeof(num_clusters));
m_file.write(reinterpret_cast<const char *>(clusters->data()),
clusters->size() * clusters->item_size());
m_source->popFront();
} else {
std::this_thread::sleep_for(m_default_wait);
while (true) {
ClusterVector<ClusterType> clusters = m_source->pop(); // blocks
// POISON PILL CHECK
if (clusters.frame_number() == -1) {
LOG(logDEBUG) << "ClusterFileSink received poison pill";
break;
}
int32_t frame_number = clusters.frame_number();
uint32_t num_clusters = clusters.size();
m_file.write(reinterpret_cast<const char*>(&frame_number),
sizeof(frame_number));
m_file.write(reinterpret_cast<const char*>(&num_clusters),
sizeof(num_clusters));
m_file.write(reinterpret_cast<const char*>(clusters.data()),
clusters.size() * clusters.item_size());
}
LOG(logDEBUG) << "ClusterFileSink stopped";
m_stopped = true;
}
public:

View File

@@ -87,6 +87,11 @@ class ClusterFinder {
// even amount of pixels around the center
int has_center_pixel_y = ClusterSizeY % 2;
// if (frame_number >= 8000 && frame_number <= 9000)
// // std::cout << "find_clusters: frame_number = " << frame_number << std::endl;
// std::cout << frame_number << std::endl;
m_clusters.set_frame_number(frame_number);
for (int iy = 0; iy < frame.shape(0); iy++) {
for (int ix = 0; ix < frame.shape(1); ix++) {

View File

@@ -4,10 +4,14 @@
#include <memory>
#include <thread>
#include <vector>
#include <condition_variable>
#include <mutex>
#include <deque>
#include "aare/ClusterFinder.hpp"
#include "aare/NDArray.hpp"
#include "aare/ProducerConsumerQueue.hpp"
#include "aare/BlockingQueue.hpp"
#include "aare/logger.hpp"
namespace aare {
@@ -23,6 +27,14 @@ struct FrameWrapper {
NDArray<uint16_t, 2> data;
};
/**
 * @brief Build the sentinel frame that tells a processing thread to stop.
 *
 * The sentinel is identified by its frame number, UINT64_MAX; the frame type
 * and the default-constructed data are placeholders. A real frame numbered
 * UINT64_MAX would therefore be indistinguishable from the sentinel.
 *
 * Declared `inline` rather than `static`: this appears to live in a header,
 * where a `static` free function gets a separate internal-linkage copy (and
 * an unused-function warning) in every translation unit that includes it.
 */
inline FrameWrapper make_poison_frame() {
    return FrameWrapper{FrameType::DATA, UINT64_MAX, NDArray<uint16_t, 2>()};
}
/**
 * @brief Tell whether a frame is the stop sentinel.
 *
 * Declared `inline` rather than `static`: this appears to live in a header,
 * where a `static` free function gets a separate internal-linkage copy (and
 * an unused-function warning) in every translation unit that includes it.
 *
 * @param f frame to inspect
 * @return true when the frame number equals UINT64_MAX
 */
inline bool is_poison(const FrameWrapper &f) noexcept {
    return f.frame_number == UINT64_MAX;
}
/**
* @brief ClusterFinderMT is a multi-threaded version of ClusterFinder. It uses
* a producer-consumer queue to distribute the frames to the threads. The
@@ -37,11 +49,15 @@ class ClusterFinderMT {
protected:
using CT = typename ClusterType::value_type;
size_t m_current_thread{0};
// size_t m_current_thread{0};
std::atomic<size_t> m_current_thread{0};
size_t m_n_threads{0};
using Finder = ClusterFinder<ClusterType, FRAME_TYPE, PEDESTAL_TYPE>;
using InputQueue = ProducerConsumerQueue<FrameWrapper>;
using OutputQueue = ProducerConsumerQueue<ClusterVector<ClusterType>>;
// using InputQueue = ProducerConsumerQueue<FrameWrapper>;
// using OutputQueue = ProducerConsumerQueue<ClusterVector<ClusterType>>;
using InputQueue = BlockingQueue<FrameWrapper>;
using OutputQueue = BlockingQueue<ClusterVector<ClusterType>>;
std::vector<std::unique_ptr<InputQueue>> m_input_queues;
std::vector<std::unique_ptr<OutputQueue>> m_output_queues;
@@ -50,41 +66,72 @@ class ClusterFinderMT {
std::vector<std::unique_ptr<Finder>> m_cluster_finders;
std::vector<std::thread> m_threads;
std::thread m_collect_thread;
std::chrono::milliseconds m_default_wait{1};
private:
std::atomic<bool> m_stop_requested{false};
std::atomic<bool> m_processing_threads_stopped{true};
/**
 * @brief Create the sentinel cluster vector that marks the end of the stream.
 * @return a default-constructed ClusterVector with its frame number set to -1,
 * the value the consumers compare against to detect the sentinel
 */
static ClusterVector<ClusterType> make_poison_cluster() {
    ClusterVector<ClusterType> sentinel;
    sentinel.set_frame_number(-1); // -1 is the stop marker
    return sentinel;
}
/**
* @brief Function called by the processing threads. It reads the frames
* from the input queue and processes them.
*/
void process(int thread_id) {
// auto cf = m_cluster_finders[thread_id].get();
// auto q = m_input_queues[thread_id].get();
// bool realloc_same_capacity = true;
// while (!m_stop_requested || !q->isEmpty()) {
// if (FrameWrapper *frame = q->frontPtr(); frame != nullptr) {
// switch (frame->type) {
// case FrameType::DATA:
// cf->find_clusters(frame->data.view(), frame->frame_number);
// m_output_queues[thread_id]->write(
// cf->steal_clusters(realloc_same_capacity));
// break;
// case FrameType::PEDESTAL:
// m_cluster_finders[thread_id]->push_pedestal_frame(
// frame->data.view());
// break;
// }
// // frame is processed now discard it
// m_input_queues[thread_id]->popFront();
// } else {
// std::this_thread::sleep_for(m_default_wait);
// }
// }
auto cf = m_cluster_finders[thread_id].get();
auto q = m_input_queues[thread_id].get();
bool realloc_same_capacity = true;
auto q = m_input_queues[thread_id].get();
while (!m_stop_requested || !q->isEmpty()) {
if (FrameWrapper *frame = q->frontPtr(); frame != nullptr) {
while (true) {
FrameWrapper frame = q->pop(); // blocks
switch (frame->type) {
if (is_poison(frame))
break;
switch (frame.type) {
case FrameType::DATA:
cf->find_clusters(frame->data.view(), frame->frame_number);
m_output_queues[thread_id]->write(
cf->steal_clusters(realloc_same_capacity));
cf->find_clusters(frame.data.view(), frame.frame_number);
m_output_queues[thread_id]->push(cf->steal_clusters());
break;
case FrameType::PEDESTAL:
m_cluster_finders[thread_id]->push_pedestal_frame(
frame->data.view());
cf->push_pedestal_frame(frame.data.view());
break;
}
// frame is processed now discard it
m_input_queues[thread_id]->popFront();
} else {
std::this_thread::sleep_for(m_default_wait);
}
}
}
@@ -94,20 +141,66 @@ class ClusterFinderMT {
* the sink
*/
void collect() {
bool empty = true;
while (!m_stop_requested || !empty || !m_processing_threads_stopped) {
empty = true;
for (auto &queue : m_output_queues) {
if (!queue->isEmpty()) {
// std::ofstream frame_log("/mnt/datapool/JMulvey/Data_Analysis/aare_testing/Read_Frame_Bug/test2.txt");
while (!m_sink.write(std::move(*queue->frontPtr()))) {
std::this_thread::sleep_for(m_default_wait);
// bool empty = true;
// while (!m_stop_requested || !all_output_queues_empty() || !all_input_queues_empty()) {
// // while (!m_stop_requested || !empty || !m_processing_threads_stopped) {
// empty = true;
// for (auto &queue : m_output_queues) {
// if (!queue->isEmpty()) {
// // auto item = std::move(*queue->frontPtr()); //For Debug
// // while (!m_sink.write(item)) {
// // std::this_thread::sleep_for(m_default_wait);
// // }
// // frame_log << item.frame_number() << '\n'; //For Debug
// // queue->popFront();
// // empty = false;
// auto& item = *queue->frontPtr(); // use reference
// while (!m_sink.write(std::move(item))) {
// std::this_thread::sleep_for(m_default_wait);
// }
// frame_log << item.frame_number() << '\n'; // log frame number
// queue->popFront();
// empty = false;
// }
// }
// }
// frame_log.close();
std::ofstream frame_log("/mnt/datapool/JMulvey/Data_Analysis/aare_testing/Read_Frame_Bug/test2.txt");
size_t poison_count = 0;
while (true) {
for (auto& queue : m_output_queues) {
auto item = queue->pop(); // BLOCKS
if (item.frame_number() == -1) {
poison_count++;
if (poison_count == m_n_threads) {
// all workers finished
m_sink.push(make_poison_cluster());
return;
}
queue->popFront();
empty = false;
continue;
}
m_sink.push(std::move(item));
frame_log << item.frame_number() << '\n';
}
}
frame_log.close();
}
public:
@@ -150,7 +243,8 @@ class ClusterFinderMT {
* @warning You need to empty this queue otherwise the cluster finder will
* wait forever
*/
ProducerConsumerQueue<ClusterVector<ClusterType>> *sink() {
BlockingQueue<ClusterVector<ClusterType>> *sink() {
//ProducerConsumerQueue<ClusterVector<ClusterType>> *sink() {
return &m_sink;
}
@@ -173,14 +267,31 @@ class ClusterFinderMT {
* @brief Stop all processing threads
*/
void stop() {
m_stop_requested = true;
// m_stop_requested = true;
for (auto &thread : m_threads) {
thread.join();
}
// for (auto &thread : m_threads) {
// thread.join();
// }
// m_threads.clear();
// m_processing_threads_stopped = true;
// m_collect_thread.join();
// 1. Send poison to ALL worker input queues
for (auto& q : m_input_queues)
q->push(make_poison_frame());
// 2. Wait for worker threads
for (auto& t : m_threads)
t.join();
m_threads.clear();
m_processing_threads_stopped = true;
// 3. Send poison clusters from workers to collector
for (auto& q : m_output_queues)
q->push(make_poison_cluster());
// 4. Wait for collector
m_collect_thread.join();
}
@@ -212,9 +323,10 @@ class ClusterFinderMT {
NDArray(frame)}; // TODO! copies the data!
for (auto &queue : m_input_queues) {
while (!queue->write(fw)) {
std::this_thread::sleep_for(m_default_wait);
}
queue->push(fw);
// while (!queue->write(fw)) {
// std::this_thread::sleep_for(m_default_wait);
// }
}
}
@@ -224,12 +336,18 @@ class ClusterFinderMT {
* @note Blocks until there is room if the queue is full.
*/
void find_clusters(NDView<FRAME_TYPE, 2> frame, uint64_t frame_number = 0) {
FrameWrapper fw{FrameType::DATA, frame_number,
NDArray(frame)}; // TODO! copies the data!
while (!m_input_queues[m_current_thread % m_n_threads]->write(fw)) {
std::this_thread::sleep_for(m_default_wait);
}
m_current_thread++;
// FrameWrapper fw{FrameType::DATA, frame_number,
// NDArray(frame)}; // TODO! copies the data!
// size_t thread_idx = m_current_thread.fetch_add(1) % m_n_threads;
// while (!m_input_queues[thread_idx]->write(fw)) {
// // while (!m_input_queues[m_current_thread % m_n_threads]->write(fw)) {
// std::this_thread::sleep_for(m_default_wait);
// }
// // m_current_thread++;
FrameWrapper fw{FrameType::DATA, frame_number, NDArray(frame)};
size_t thread_idx = m_current_thread.fetch_add(1) % m_n_threads;
m_input_queues[thread_idx]->push(std::move(fw)); // blocks if full
}
void clear_pedestal() {