From 055feb2290add57fed0e35fd46ddc9fe66e781c4 Mon Sep 17 00:00:00 2001 From: Jonathan Mulvey Date: Tue, 6 Jan 2026 14:09:51 +0100 Subject: [PATCH] Rewrote the multithreading to fix frame loss --- include/aare/BlockingQueue.hpp | 59 +++++++++ include/aare/ClusterCollector.hpp | 46 +++++-- include/aare/ClusterFileSink.hpp | 73 ++++++++--- include/aare/ClusterFinder.hpp | 5 + include/aare/ClusterFinderMT.hpp | 204 +++++++++++++++++++++++------- 5 files changed, 316 insertions(+), 71 deletions(-) create mode 100644 include/aare/BlockingQueue.hpp diff --git a/include/aare/BlockingQueue.hpp b/include/aare/BlockingQueue.hpp new file mode 100644 index 0000000..b75157f --- /dev/null +++ b/include/aare/BlockingQueue.hpp @@ -0,0 +1,59 @@ +#pragma once +#include +#include +#include +#include +#include +#include +#include +#include + +#include "aare/ClusterFinder.hpp" +#include "aare/NDArray.hpp" +#include "aare/logger.hpp" + +template +class BlockingQueue { + std::mutex mtx; + std::condition_variable cv_push, cv_pop; + std::deque queue; + size_t max_size; + bool closed = false; + +public: + BlockingQueue(size_t capacity) : max_size(capacity) {} + + void push(T item) { + std::unique_lock lock(mtx); + cv_push.wait(lock, [this] { return queue.size() < max_size; }); + queue.push_back(std::move(item)); + cv_pop.notify_one(); + } + + T pop() { + std::unique_lock lock(mtx); + cv_pop.wait(lock, [this] { return !queue.empty(); }); + T item = std::move(queue.front()); + queue.pop_front(); + cv_push.notify_one(); + return item; + } + + void close() { + std::lock_guard lock(mtx); + closed = true; + cv_pop.notify_all(); + cv_push.notify_all(); + } + + bool empty() { + std::lock_guard lock(mtx); + return queue.empty(); + } + + + void write(T item) {push(item);} + bool isEmpty() {return empty();} + T frontPtr() {return pop();} + T popFront() {return pop();} +}; \ No newline at end of file diff --git a/include/aare/ClusterCollector.hpp b/include/aare/ClusterCollector.hpp index ae78a8e..8812734 100644 --- a/include/aare/ClusterCollector.hpp +++ b/include/aare/ClusterCollector.hpp @@ -5,13 +5,15 @@ #include "aare/ClusterFinderMT.hpp" #include "aare/ClusterVector.hpp" #include "aare/ProducerConsumerQueue.hpp" +#include "aare/BlockingQueue.hpp" namespace aare { template >> class ClusterCollector { - ProducerConsumerQueue> *m_source; + // ProducerConsumerQueue> *m_source; + BlockingQueue> *m_source; std::atomic m_stop_requested{false}; std::atomic m_stopped{true}; std::chrono::milliseconds m_default_wait{1}; @@ -19,19 +21,47 @@ class ClusterCollector { std::vector> m_clusters; void process() { + // m_stopped = false; + // fmt::print("ClusterCollector started\n"); + // // while (!m_stop_requested || !m_source->isEmpty()) { + // while (true) { + // if (clusters.frame_number() == -1) + // break; + + // ClusterVector clusters = m_source->pop(); + // m_clusters.push_back(std::move(clusters)); + // // if (ClusterVector *clusters = m_source->frontPtr(); + // // clusters != nullptr) { + // // m_clusters.push_back(std::move(*clusters)); + // // m_source->popFront(); + // // } else { + // // std::this_thread::sleep_for(m_default_wait); + // // } + // } + // fmt::print("ClusterCollector stopped\n"); + // m_stopped = true; + + m_stopped = false; fmt::print("ClusterCollector started\n"); - while (!m_stop_requested || !m_source->isEmpty()) { - if (ClusterVector *clusters = m_source->frontPtr(); - clusters != nullptr) { - m_clusters.push_back(std::move(*clusters)); - m_source->popFront(); - } else { - std::this_thread::sleep_for(m_default_wait); + + while (true) { + // pop blocks until there is data + ClusterVector clusters = m_source->pop(); + + // POISON DETECTION + if (clusters.frame_number() == -1) { + fmt::print("ClusterCollector received poison frame, stopping\n"); + break; // exit loop cleanly } + + // NORMAL DATA: store or process + m_clusters.push_back(std::move(clusters)); } + fmt::print("ClusterCollector stopped\n"); m_stopped = true; + } public: diff --git a/include/aare/ClusterFileSink.hpp b/include/aare/ClusterFileSink.hpp index 1900774..c68e988 100644 --- a/include/aare/ClusterFileSink.hpp +++ b/include/aare/ClusterFileSink.hpp @@ -6,13 +6,15 @@ #include "aare/ClusterFinderMT.hpp" #include "aare/ClusterVector.hpp" #include "aare/ProducerConsumerQueue.hpp" +#include "aare/BlockingQueue.hpp" namespace aare { template >> class ClusterFileSink { - ProducerConsumerQueue> *m_source; + // ProducerConsumerQueue> *m_source; + BlockingQueue> *m_source; std::atomic m_stop_requested{false}; std::atomic m_stopped{true}; std::chrono::milliseconds m_default_wait{1}; @@ -20,29 +22,60 @@ class ClusterFileSink { std::ofstream m_file; void process() { - m_stopped = false; + // m_stopped = false; + // LOG(logDEBUG) << "ClusterFileSink started"; + // while (!m_stop_requested || !m_source->isEmpty()) { + // // if (ClusterVector *clusters = m_source->pop(); m_source->frontPtr(); + // // clusters != nullptr) { + // { + // ClusterVector clusters = m_source->pop(); + // // Write clusters to file + // int32_t frame_number = + // clusters->frame_number(); // TODO! Should we store frame + // // number already as int? + // uint32_t num_clusters = clusters.size(); + + // if (frame_number >= 9910 && frame_number <= 9930) + // std::cout << "prcoess: frame_number = " << frame_number << std::endl; + + // m_file.write(reinterpret_cast(&frame_number), + // sizeof(frame_number)); + // m_file.write(reinterpret_cast(&num_clusters), + // sizeof(num_clusters)); + // m_file.write(reinterpret_cast(clusters.data()), + // clusters.size() * clusters.item_size()); + // m_source->popFront(); + // } + // // else { + // // std::this_thread::sleep_for(m_default_wait); + // // } + // } + // LOG(logDEBUG) << "ClusterFileSink stopped"; + // m_stopped = true; + LOG(logDEBUG) << "ClusterFileSink started"; - while (!m_stop_requested || !m_source->isEmpty()) { - if (ClusterVector *clusters = m_source->frontPtr(); - clusters != nullptr) { - // Write clusters to file - int32_t frame_number = - clusters->frame_number(); // TODO! Should we store frame - // number already as int? - uint32_t num_clusters = clusters->size(); - m_file.write(reinterpret_cast(&frame_number), - sizeof(frame_number)); - m_file.write(reinterpret_cast(&num_clusters), - sizeof(num_clusters)); - m_file.write(reinterpret_cast(clusters->data()), - clusters->size() * clusters->item_size()); - m_source->popFront(); - } else { - std::this_thread::sleep_for(m_default_wait); + + while (true) { + ClusterVector clusters = m_source->pop(); // blocks + + // POISON PILL CHECK + if (clusters.frame_number() == -1) { + LOG(logDEBUG) << "ClusterFileSink received poison pill"; + break; } + + int32_t frame_number = clusters.frame_number(); + uint32_t num_clusters = clusters.size(); + + m_file.write(reinterpret_cast(&frame_number), + sizeof(frame_number)); + m_file.write(reinterpret_cast(&num_clusters), + sizeof(num_clusters)); + m_file.write(reinterpret_cast(clusters.data()), + clusters.size() * clusters.item_size()); } + LOG(logDEBUG) << "ClusterFileSink stopped"; - m_stopped = true; } public: diff --git a/include/aare/ClusterFinder.hpp b/include/aare/ClusterFinder.hpp index 8525132..b60a8db 100644 --- a/include/aare/ClusterFinder.hpp +++ b/include/aare/ClusterFinder.hpp @@ -87,6 +87,11 @@ class ClusterFinder { // even amount of pixels around the center int has_center_pixel_y = ClusterSizeY % 2; + // if (frame_number >= 8000 && frame_number <= 9000) + // // std::cout << "find_clusters: frame_number = " << frame_number << std::endl; + // std::cout << frame_number << std::endl; + + m_clusters.set_frame_number(frame_number); for (int iy = 0; iy < frame.shape(0); iy++) { for (int ix = 0; ix < frame.shape(1); ix++) { diff --git a/include/aare/ClusterFinderMT.hpp b/include/aare/ClusterFinderMT.hpp index cf621db..261cd2a 100644 --- a/include/aare/ClusterFinderMT.hpp +++ b/include/aare/ClusterFinderMT.hpp @@ -4,10 +4,14 @@ #include #include #include +#include +#include +#include #include "aare/ClusterFinder.hpp" #include "aare/NDArray.hpp" #include "aare/ProducerConsumerQueue.hpp" +#include "aare/BlockingQueue.hpp" #include "aare/logger.hpp" namespace aare { @@ -23,6 +27,14 @@ struct FrameWrapper { NDArray data; }; +static FrameWrapper make_poison_frame() { + return FrameWrapper{FrameType::DATA, UINT64_MAX, NDArray()}; +} + +static bool is_poison(const FrameWrapper& f) { + return f.frame_number == UINT64_MAX; +} + /** * @brief ClusterFinderMT is a multi-threaded version of ClusterFinder. It uses * a producer-consumer queue to distribute the frames to the threads. The @@ -37,11 +49,15 @@ class ClusterFinderMT { protected: using CT = typename ClusterType::value_type; - size_t m_current_thread{0}; + // size_t m_current_thread{0}; + std::atomic m_current_thread{0}; + size_t m_n_threads{0}; using Finder = ClusterFinder; - using InputQueue = ProducerConsumerQueue; - using OutputQueue = ProducerConsumerQueue>; + // using InputQueue = ProducerConsumerQueue; + // using OutputQueue = ProducerConsumerQueue>; + using InputQueue = BlockingQueue; + using OutputQueue = BlockingQueue>; std::vector> m_input_queues; std::vector> m_output_queues; @@ -50,41 +66,72 @@ class ClusterFinderMT { std::vector> m_cluster_finders; std::vector m_threads; std::thread m_collect_thread; + + std::chrono::milliseconds m_default_wait{1}; private: std::atomic m_stop_requested{false}; std::atomic m_processing_threads_stopped{true}; + static ClusterVector make_poison_cluster() { + ClusterVector v; + v.set_frame_number(-1); + return v; + } + + + /** * @brief Function called by the processing threads. It reads the frames * from the input queue and processes them. */ void process(int thread_id) { + // auto cf = m_cluster_finders[thread_id].get(); + // auto q = m_input_queues[thread_id].get(); + // bool realloc_same_capacity = true; + + // while (!m_stop_requested || !q->isEmpty()) { + // if (FrameWrapper *frame = q->frontPtr(); frame != nullptr) { + + // switch (frame->type) { + // case FrameType::DATA: + // cf->find_clusters(frame->data.view(), frame->frame_number); + // m_output_queues[thread_id]->write( + // cf->steal_clusters(realloc_same_capacity)); + // break; + + // case FrameType::PEDESTAL: + // m_cluster_finders[thread_id]->push_pedestal_frame( + // frame->data.view()); + // break; + // } + + // // frame is processed now discard it + // m_input_queues[thread_id]->popFront(); + // } else { + // std::this_thread::sleep_for(m_default_wait); + // } + // } + auto cf = m_cluster_finders[thread_id].get(); - auto q = m_input_queues[thread_id].get(); - bool realloc_same_capacity = true; + auto q = m_input_queues[thread_id].get(); - while (!m_stop_requested || !q->isEmpty()) { - if (FrameWrapper *frame = q->frontPtr(); frame != nullptr) { + while (true) { + FrameWrapper frame = q->pop(); // blocks - switch (frame->type) { + if (is_poison(frame)) + break; + + switch (frame.type) { case FrameType::DATA: - cf->find_clusters(frame->data.view(), frame->frame_number); - m_output_queues[thread_id]->write( - cf->steal_clusters(realloc_same_capacity)); + cf->find_clusters(frame.data.view(), frame.frame_number); + m_output_queues[thread_id]->push(cf->steal_clusters()); break; case FrameType::PEDESTAL: - m_cluster_finders[thread_id]->push_pedestal_frame( - frame->data.view()); + cf->push_pedestal_frame(frame.data.view()); break; - } - - // frame is processed now discard it - m_input_queues[thread_id]->popFront(); - } else { - std::this_thread::sleep_for(m_default_wait); } } } @@ -94,20 +141,66 @@ class ClusterFinderMT { * the sink */ void collect() { - bool empty = true; - while (!m_stop_requested || !empty || !m_processing_threads_stopped) { - empty = true; - for (auto &queue : m_output_queues) { - if (!queue->isEmpty()) { + // std::ofstream frame_log("/mnt/datapool/JMulvey/Data_Analysis/aare_testing/Read_Frame_Bug/test2.txt"); - while (!m_sink.write(std::move(*queue->frontPtr()))) { - std::this_thread::sleep_for(m_default_wait); + // bool empty = true; + // while (!m_stop_requested || !all_output_queues_empty() || !all_input_queues_empty()) { + // // while (!m_stop_requested || !empty || !m_processing_threads_stopped) { + // empty = true; + // for (auto &queue : m_output_queues) { + // if (!queue->isEmpty()) { + + // // auto item = std::move(*queue->frontPtr()); //For Debug + + // // while (!m_sink.write(item)) { + // // std::this_thread::sleep_for(m_default_wait); + // // } + + // // frame_log << item.frame_number() << '\n'; //For Debug + + // // queue->popFront(); + // // empty = false; + + + // auto& item = *queue->frontPtr(); // use reference + // while (!m_sink.write(std::move(item))) { + // std::this_thread::sleep_for(m_default_wait); + // } + // frame_log << item.frame_number() << '\n'; // log frame number + // queue->popFront(); + // empty = false; + + // } + // } + // } + + // frame_log.close(); + + + std::ofstream frame_log("/mnt/datapool/JMulvey/Data_Analysis/aare_testing/Read_Frame_Bug/test2.txt"); + + size_t poison_count = 0; + + while (true) { + for (auto& queue : m_output_queues) { + auto item = queue->pop(); // BLOCKS + + if (item.frame_number() == -1) { + poison_count++; + if (poison_count == m_n_threads) { + // all workers finished + m_sink.push(make_poison_cluster()); + return; } - queue->popFront(); - empty = false; + continue; } + + m_sink.push(std::move(item)); + frame_log << item.frame_number() << '\n'; } } + + frame_log.close(); } public: @@ -150,7 +243,8 @@ class ClusterFinderMT { * @warning You need to empty this queue otherwise the cluster finder will * wait forever */ - ProducerConsumerQueue> *sink() { + BlockingQueue> *sink() { + //ProducerConsumerQueue> *sink() { return &m_sink; } @@ -173,14 +267,31 @@ class ClusterFinderMT { * @brief Stop all processing threads */ void stop() { - m_stop_requested = true; + // m_stop_requested = true; - for (auto &thread : m_threads) { - thread.join(); - } + // for (auto &thread : m_threads) { + // thread.join(); + // } + // m_threads.clear(); + + // m_processing_threads_stopped = true; + // m_collect_thread.join(); + + + // 1. Send poison to ALL worker input queues + for (auto& q : m_input_queues) + q->push(make_poison_frame()); + + // 2. Wait for worker threads + for (auto& t : m_threads) + t.join(); m_threads.clear(); - m_processing_threads_stopped = true; + // 3. Send poison clusters from workers to collector + for (auto& q : m_output_queues) + q->push(make_poison_cluster()); + + // 4. Wait for collector m_collect_thread.join(); } @@ -212,9 +323,10 @@ class ClusterFinderMT { NDArray(frame)}; // TODO! copies the data! for (auto &queue : m_input_queues) { - while (!queue->write(fw)) { - std::this_thread::sleep_for(m_default_wait); - } + queue->push(fw); + // while (!queue->write(fw)) { + // std::this_thread::sleep_for(m_default_wait); + // } } } @@ -224,12 +336,18 @@ class ClusterFinderMT { * @note Spin locks with a default wait if the queue is full. */ void find_clusters(NDView frame, uint64_t frame_number = 0) { - FrameWrapper fw{FrameType::DATA, frame_number, - NDArray(frame)}; // TODO! copies the data! - while (!m_input_queues[m_current_thread % m_n_threads]->write(fw)) { - std::this_thread::sleep_for(m_default_wait); - } - m_current_thread++; + // FrameWrapper fw{FrameType::DATA, frame_number, + // NDArray(frame)}; // TODO! copies the data! + // size_t thread_idx = m_current_thread.fetch_add(1) % m_n_threads; + // while (!m_input_queues[thread_idx]->write(fw)) { + // // while (!m_input_queues[m_current_thread % m_n_threads]->write(fw)) { + // std::this_thread::sleep_for(m_default_wait); + // } + // // m_current_thread++; + + FrameWrapper fw{FrameType::DATA, frame_number, NDArray(frame)}; + size_t thread_idx = m_current_thread.fetch_add(1) % m_n_threads; + m_input_queues[thread_idx]->push(std::move(fw)); // blocks if full } void clear_pedestal() {