diff --git a/include/aare/PixelHistogram.hpp b/include/aare/PixelHistogram.hpp index ae72f15..3a3560e 100644 --- a/include/aare/PixelHistogram.hpp +++ b/include/aare/PixelHistogram.hpp @@ -1,12 +1,16 @@ #pragma once #include "aare/NDArray.hpp" #include "aare/NDView.hpp" +#include "aare/ProducerConsumerQueue.hpp" //Lets see if we need to hide it behind a pimpl #include -#include #include +#include #include +#include +#include +#include #include #include #include @@ -25,6 +29,8 @@ class PixelHistogram { bh::axis::integer, bh::axis::integer>; using Hist = bh::histogram>; + using AsyncQueue = ProducerConsumerQueue>; + int rows_; int cols_; int n_threads_; @@ -47,15 +53,52 @@ class PixelHistogram { std::atomic stop_workers_; int work_generation_; + // Serialises calls into the synchronous fan-out (`fill`). The + // coordinator thread acquires it for the duration of each item it + // processes, and direct callers of `fill` also acquire it. Without + // this, concurrent sync + async fills would race on `current_image_` + // and `work_generation_`. + std::mutex fill_mutex_; + + // Async producer/consumer pipeline. SPSC queue feeds the coordinator + // thread, which calls the synchronous `fill` one image at a time. + std::unique_ptr async_queue_; + std::thread coordinator_; + std::atomic stop_coordinator_{false}; + std::atomic coordinator_busy_{false}; + std::chrono::microseconds async_wait_{100}; + // Private worker thread method void worker_loop(int thread_id); + void coordinator_loop(); int row_start(int thread_id) const; int row_count(int thread_id) const; public: - PixelHistogram(int rows, int cols, int n_bins, double xmin, double xmax, int n_threads = 1); + PixelHistogram(int rows, int cols, int n_bins, double xmin, double xmax, + int n_threads = 1, std::size_t max_pending = 16); ~PixelHistogram(); + + // Synchronous fill: blocks until the image has been merged into the + // accumulators. Safe to call concurrently with `fill_async` (calls are + // serialised through `fill_mutex_`). void fill(const NDView &image); + + // Asynchronous fill: takes ownership of `image`, enqueues it for the + // coordinator thread, and returns. Blocks the caller only if the queue + // is full (single-producer, single-consumer queue with a sleep-poll + // backpressure loop, matching the convention in ClusterFinderMT). + void fill_async(NDArray image); + + // Wait for all queued async fills to complete. Cheap when the queue + // is already drained. + void flush() const; + + // Number of items either queued or currently being processed. + std::size_t pending() const; + + // Implicitly flushes pending async fills first so the snapshot is + // consistent with everything that was submitted up to the call. NDArray hdata() const; NDArray bin_centers() const; NDArray bin_edges() const; diff --git a/python/aare/__init__.py b/python/aare/__init__.py index f4bddda..cadeb01 100644 --- a/python/aare/__init__.py +++ b/python/aare/__init__.py @@ -43,3 +43,4 @@ from ._aare import apply_calibration, count_switching_pixels from ._aare import calculate_pedestal, calculate_pedestal_float, calculate_pedestal_g0, calculate_pedestal_g0_float from ._aare import VarClusterFinder +from ._aare import PixelHistogram diff --git a/python/src/bind_PixelHistogram.hpp b/python/src/bind_PixelHistogram.hpp index cdbf6aa..cf87e7a 100644 --- a/python/src/bind_PixelHistogram.hpp +++ b/python/src/bind_PixelHistogram.hpp @@ -13,10 +13,10 @@ using namespace ::aare; void define_pixel_histogram_bindings(py::module &m) { py::class_(m, "PixelHistogram", "A histogram for pixel-wise statistics") - .def(py::init(), + .def(py::init(), R"( Initialize a PixelHistogram. - + Args: rows: Number of rows in the detector cols: Number of columns in the detector @@ -24,9 +24,13 @@ void define_pixel_histogram_bindings(py::module &m) { xmin: Minimum value for histogram range xmax: Maximum value for histogram range n_threads: Number of threads for parallel filling (default: 1) + max_pending: Maximum number of images that can be queued for + asynchronous filling before fill_async() applies + backpressure on the caller (default: 16) )", py::arg("rows"), py::arg("cols"), py::arg("n_bins"), - py::arg("xmin"), py::arg("xmax"), py::arg("n_threads") = 1) + py::arg("xmin"), py::arg("xmax"), py::arg("n_threads") = 1, + py::arg("max_pending") = std::size_t{16}) .def("fill", [](PixelHistogram &self, @@ -35,21 +39,75 @@ void define_pixel_histogram_bindings(py::module &m) { self.fill(view); }, R"( - Fill the histogram with image data. - + Fill the histogram with image data (blocking). + Args: image: A 2D numpy array of pixel values (dtype: float32) )", py::arg("image").noconvert()) + .def("fill_async", + [](PixelHistogram &self, + py::array_t image) { + // Copy the numpy buffer into an owned NDArray while we + // still hold the GIL so we don't depend on the array's + // backing storage outliving this call. + auto view = make_view_2d(image); + NDArray owned(view); + // Release the GIL while enqueueing - fill_async can block + // on backpressure when the queue is full. + py::gil_scoped_release release; + self.fill_async(std::move(owned)); + }, + R"( + Submit an image for asynchronous filling. + + The image is copied into an internal buffer before this call + returns, so the caller may mutate or free the numpy array + immediately. The actual histogram update happens on a + background thread. If the internal queue is full this call + blocks (with the GIL released) until a slot becomes available. + + Args: + image: A 2D numpy array of pixel values (dtype: float32) + )", + py::arg("image").noconvert()) + + .def("flush", + &PixelHistogram::flush, + R"( + Block until all images submitted via fill_async() have been + merged into the accumulators. Cheap when nothing is pending. + )", + py::call_guard()) + + .def("pending", + &PixelHistogram::pending, + R"( + Return the number of images either waiting in the queue or + currently being processed by the background thread. Useful + for monitoring/diagnostics. + )") + .def("hdata", [](const PixelHistogram &self) { - auto ptr = new NDArray(self.hdata()); + // hdata() implicitly flushes - release the GIL while it + // does so. Allocation/copy into the NDArray runs without + // the GIL too; only the numpy wrapping needs it. + NDArray* ptr = nullptr; + { + py::gil_scoped_release release; + ptr = new NDArray(self.hdata()); + } return return_image_data(ptr); }, R"( Get the histogram data as a numpy array. - + + Implicitly flushes any pending asynchronous fills before + returning, so the snapshot is consistent with everything + submitted up to this call. + Returns: A 3D numpy array containing the histogram bins for each pixel )") @@ -61,7 +119,7 @@ void define_pixel_histogram_bindings(py::module &m) { }, R"( Get the bin centers along the value axis. - + Returns: A 1D numpy array containing the center values for each histogram bin )") @@ -72,7 +130,7 @@ void define_pixel_histogram_bindings(py::module &m) { }, R"( Get the bin edges along the value axis. - + Returns: A 1D numpy array containing the edge values for the histogram bins )"); diff --git a/src/PixelHistogram.cpp b/src/PixelHistogram.cpp index 228a225..a0aadfd 100644 --- a/src/PixelHistogram.cpp +++ b/src/PixelHistogram.cpp @@ -5,13 +5,17 @@ #include #include #include +#include +#include #include +#include #include namespace aare { -PixelHistogram::PixelHistogram(int rows, int cols, int n_bins, double xmin, double xmax, int n_threads): +PixelHistogram::PixelHistogram(int rows, int cols, int n_bins, double xmin, double xmax, + int n_threads, std::size_t max_pending): rows_(rows), cols_(cols), n_threads_(n_threads), xmin_(xmin), xmax_(xmax), current_image_(nullptr), completed_threads_(0), stop_workers_(false), work_generation_(0) { if (rows_ < 1 || cols_ < 1 || n_bins < 1) { @@ -20,6 +24,9 @@ PixelHistogram::PixelHistogram(int rows, int cols, int n_bins, double xmin, doub if (n_threads < 1) { throw std::invalid_argument("PixelHistogram requires at least one thread"); } + if (max_pending < 1) { + throw std::invalid_argument("PixelHistogram requires max_pending >= 1"); + } n_threads_ = std::min(n_threads_, rows_); @@ -54,16 +61,29 @@ PixelHistogram::PixelHistogram(int rows, int cols, int n_bins, double xmin, doub for (int i = 0; i < n_threads_; ++i) { workers_.emplace_back([this, i]() { this->worker_loop(i); }); } + + // Async pipeline. The PCQ holds (size - 1) usable slots, so size up by + // one to honour the requested max_pending. + async_queue_ = std::make_unique(static_cast(max_pending + 1)); + coordinator_ = std::thread([this]() { this->coordinator_loop(); }); } PixelHistogram::~PixelHistogram() { + // Drain any pending async fills before tearing down the worker pool. + // The coordinator's loop keeps processing while stop_coordinator_ is + // true as long as the queue is non-empty (mirrors ClusterFinderMT). + if (coordinator_.joinable()) { + stop_coordinator_ = true; + coordinator_.join(); + } + // Signal all workers to stop { std::unique_lock lock(work_mutex_); stop_workers_ = true; } work_cv_.notify_all(); - + // Join all worker threads for (auto& thread : workers_) { if (thread.joinable()) { @@ -126,6 +146,10 @@ void PixelHistogram::worker_loop(int thread_id) { } NDArray PixelHistogram::hdata() const { + // Make sure any pending async fills are merged in before we snapshot + // the partial histograms. Cheap when the queue is already drained. + flush(); + const auto &hist = partial_hists_.front(); const auto bins = static_cast(hist.axis(0).size()); const auto cols = static_cast(hist.axis(1).size()); @@ -159,6 +183,10 @@ void PixelHistogram::fill(const NDView &image) { throw std::invalid_argument("PixelHistogram image shape does not match constructor shape"); } + // Serialise all calls into the fan-out. fill_mutex_ is always the + // outermost lock; work_mutex_ is taken briefly inside it. + std::lock_guard fill_lock(fill_mutex_); + // Reset counters and set work { std::unique_lock lock(work_mutex_); @@ -166,10 +194,10 @@ void PixelHistogram::fill(const NDView &image) { current_image_ = ℑ ++work_generation_; } - + // Signal all worker threads to start work_cv_.notify_all(); - + // Wait for all workers to complete { std::unique_lock lock(work_mutex_); @@ -178,6 +206,56 @@ void PixelHistogram::fill(const NDView &image) { } } +void PixelHistogram::fill_async(NDArray image) { + if (image.shape(0) != rows_ || image.shape(1) != cols_) { + throw std::invalid_argument("PixelHistogram image shape does not match constructor shape"); + } + + // SPSC backpressure: spin with a short sleep until a slot frees up. + // The std::move only consumes `image` on the iteration that succeeds + // (placement-new inside write() runs only when the slot is free). + while (!async_queue_->write(std::move(image))) { + std::this_thread::sleep_for(async_wait_); + } +} + +void PixelHistogram::flush() const { + while (!async_queue_->isEmpty() || coordinator_busy_.load(std::memory_order_acquire)) { + std::this_thread::sleep_for(async_wait_); + } +} + +std::size_t PixelHistogram::pending() const { + // sizeGuess() counts the items still in the queue; the coordinator + // does `read()` (which pops) before setting `coordinator_busy_`, so an + // in-flight item lives only in the busy flag. + return async_queue_->sizeGuess() + + (coordinator_busy_.load(std::memory_order_acquire) ? 1u : 0u); +} + +void PixelHistogram::coordinator_loop() { + NDArray item; + while (!stop_coordinator_.load(std::memory_order_acquire) || !async_queue_->isEmpty()) { + if (async_queue_->read(item)) { + coordinator_busy_.store(true, std::memory_order_release); + try { + fill(item.view()); + } catch (const std::exception& e) { + // fill_async pre-validates shape, so this is purely + // defensive. Log to stderr and keep the coordinator alive. + std::cerr << "PixelHistogram::fill_async error: " + << e.what() << std::endl; + } catch (...) { + std::cerr << "PixelHistogram::fill_async error: unknown" + << std::endl; + } + coordinator_busy_.store(false, std::memory_order_release); + } else { + std::this_thread::sleep_for(async_wait_); + } + } +} + NDArray PixelHistogram::bin_centers() const { const auto& value_axis = partial_hists_.front().axis(0); const auto n_bins = static_cast(value_axis.size()); diff --git a/src/PixelHistogram.test.cpp b/src/PixelHistogram.test.cpp index f79dfb7..1d8fdbe 100644 --- a/src/PixelHistogram.test.cpp +++ b/src/PixelHistogram.test.cpp @@ -1,8 +1,10 @@ #include #include +#include #include #include +#include #include #include "test_config.hpp" @@ -206,3 +208,123 @@ TEST_CASE("Random fills match a reference implementation") { } CHECK(all_match); } + +TEST_CASE("Async fill matches sync fill") { + // Submit a stream of random frames through fill_async and compare + // against the same frames processed by fill() on a separate histogram. + constexpr int rows = 19; + constexpr int cols = 23; + constexpr int n_bins = 16; + constexpr float xmin = -1.0f; + constexpr float xmax = 3.0f; + const int n_threads = GENERATE(1, 2, 4); + constexpr int n_frames = 32; + // Pick a small queue capacity so the producer trips the backpressure + // path at least a few times during the run. + constexpr std::size_t max_pending = 4; + + PixelHistogram async_hist(rows, cols, n_bins, xmin, xmax, n_threads, max_pending); + PixelHistogram sync_hist(rows, cols, n_bins, xmin, xmax, n_threads); + + std::mt19937 rng(0xA5A5A5A5); + std::uniform_real_distribution dist(xmin - 0.25f, xmax + 0.25f); + + for (int f = 0; f < n_frames; ++f) { + NDArray img({rows, cols}, 0.0f); + for (ssize_t r = 0; r < rows; ++r) + for (ssize_t c = 0; c < cols; ++c) + img(r, c) = dist(rng); + sync_hist.fill(img.view()); + async_hist.fill_async(std::move(img)); + } + // hdata() calls flush() internally, but exercise the explicit path too. + async_hist.flush(); + CHECK(async_hist.pending() == 0); + + auto a = async_hist.hdata(); + auto s = sync_hist.hdata(); + REQUIRE(a.shape(0) == s.shape(0)); + REQUIRE(a.shape(1) == s.shape(1)); + REQUIRE(a.shape(2) == s.shape(2)); + + bool all_match = true; + for (ssize_t r = 0; r < rows && all_match; ++r) { + for (ssize_t c = 0; c < cols && all_match; ++c) { + for (ssize_t b = 0; b < n_bins && all_match; ++b) { + if (a(r, c, b) != s(r, c, b)) { + all_match = false; + INFO("r=" << r << " c=" << c << " b=" << b + << " async=" << a(r, c, b) << " sync=" << s(r, c, b)); + CHECK(a(r, c, b) == s(r, c, b)); + } + } + } + } + CHECK(all_match); +} + +TEST_CASE("fill_async with mismatched shape throws") { + PixelHistogram hist(8, 8, 16, 0.0, 1.0, 2); + NDArray bad({4, 4}, 0.0f); + CHECK_THROWS_AS(hist.fill_async(std::move(bad)), std::invalid_argument); +} + +TEST_CASE("Destructor drains pending async fills") { + // Submit more frames than the queue can hold so backpressure kicks in, + // then immediately let the histogram go out of scope and verify that + // the merged hdata() matches the reference computed sequentially. + constexpr int rows = 11; + constexpr int cols = 7; + constexpr int n_bins = 8; + constexpr float xmin = 0.0f; + constexpr float xmax = 1.0f; + constexpr int n_frames = 12; + constexpr std::size_t max_pending = 2; + + std::vector> frames; + frames.reserve(n_frames); + std::mt19937 rng(0xDEADBEEF); + std::uniform_real_distribution dist(xmin, xmax); + for (int f = 0; f < n_frames; ++f) { + NDArray img({rows, cols}, 0.0f); + for (ssize_t r = 0; r < rows; ++r) + for (ssize_t c = 0; c < cols; ++c) + img(r, c) = dist(rng); + frames.push_back(std::move(img)); + } + + NDArray snapshot({rows, cols, n_bins}, uint16_t{0}); + { + PixelHistogram hist(rows, cols, n_bins, xmin, xmax, 2, max_pending); + for (auto& img : frames) { + // Move a copy so we can also build the reference below. + NDArray copy({rows, cols}, 0.0f); + std::memcpy(copy.data(), img.data(), copy.total_bytes()); + hist.fill_async(std::move(copy)); + } + // No explicit flush(); destructor must drain. + // Capture hdata() *after* the loop but inside the scope so it + // observes everything that was submitted (hdata flushes too). + snapshot = hist.hdata(); + } + + PixelHistogram reference(rows, cols, n_bins, xmin, xmax, 2); + for (const auto& img : frames) reference.fill(img.view()); + auto expected = reference.hdata(); + + bool all_match = true; + for (ssize_t r = 0; r < rows && all_match; ++r) { + for (ssize_t c = 0; c < cols && all_match; ++c) { + for (ssize_t b = 0; b < n_bins && all_match; ++b) { + if (snapshot(r, c, b) != expected(r, c, b)) { + all_match = false; + INFO("r=" << r << " c=" << c << " b=" << b + << " got=" << snapshot(r, c, b) + << " expected=" << expected(r, c, b)); + CHECK(snapshot(r, c, b) == expected(r, c, b)); + } + } + } + } + CHECK(all_match); +}