diff --git a/CMakeLists.txt b/CMakeLists.txt index caa8990..7eac5ec 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -406,7 +406,6 @@ set(SourceFiles ${CMAKE_CURRENT_SOURCE_DIR}/src/defs.cpp ${CMAKE_CURRENT_SOURCE_DIR}/src/ROIGeometry.cpp ${CMAKE_CURRENT_SOURCE_DIR}/src/DetectorGeometry.cpp - ${CMAKE_CURRENT_SOURCE_DIR}/src/hist/PixelHistogram.cpp ${CMAKE_CURRENT_SOURCE_DIR}/src/hist/PedestalTrackingPixelHistogram.cpp ${CMAKE_CURRENT_SOURCE_DIR}/src/Dtype.cpp ${CMAKE_CURRENT_SOURCE_DIR}/src/File.cpp diff --git a/include/aare/hist/PixelHistogram.hpp b/include/aare/hist/PixelHistogram.hpp index 140af40..f4e0812 100644 --- a/include/aare/hist/PixelHistogram.hpp +++ b/include/aare/hist/PixelHistogram.hpp @@ -4,22 +4,23 @@ #include "aare/ProducerConsumerQueue.hpp" #include "aare/hist/PixelHistogramImpl.hpp" +#include #include #include #include #include #include +#include #include #include +#include #include +#include #include namespace aare { +template class PixelHistogram { - public: - using StorageType = uint16_t; - using AxisType = float; - private: using Hist = PixelHistogramImpl; using AsyncQueue = ProducerConsumerQueue>; @@ -86,4 +87,256 @@ class PixelHistogram { NDArray bin_edges() const; }; +template +PixelHistogram::PixelHistogram(int rows, int cols, + int n_bins, double xmin, + double xmax, + int n_threads, + std::size_t max_pending) + : rows_(rows), cols_(cols), n_threads_(n_threads), xmin_(xmin), xmax_(xmax), + current_image_(nullptr), completed_threads_(0), stop_workers_(false), + work_generation_(0) { + if (rows_ < 1 || cols_ < 1 || n_bins < 1) { + throw std::invalid_argument( + "PixelHistogram requires positive rows, cols and bins"); + } + if (n_threads < 1) { + throw std::invalid_argument( + "PixelHistogram requires at least one thread"); + } + if (max_pending < 1) { + throw std::invalid_argument("PixelHistogram requires max_pending >= 1"); + } + + n_threads_ = std::min(n_threads_, rows_); + + // Build a balanced row partition. With base = rows_ / n_threads_ and + // extra = rows_ % n_threads_, the first `extra` threads get base + 1 + // rows each and the rest get `base` rows. This avoids the + // ceil(rows/n_threads) scheme leaving trailing threads with zero or + // negative row counts (e.g. rows=17, n_threads=8). + row_offsets_.resize(n_threads_ + 1); + const int base = rows_ / n_threads_; + const int extra = rows_ % n_threads_; + int offset = 0; + for (int i = 0; i < n_threads_; ++i) { + row_offsets_[i] = offset; + offset += base + (i < extra ? 1 : 0); + } + row_offsets_[n_threads_] = offset; // == rows_ by construction + + // Initialize partial histograms for each thread + partial_hists_.reserve(n_threads_); + for (int i = 0; i < n_threads_; ++i) { + const auto local_rows = row_count(i); + partial_hists_.emplace_back(local_rows, cols, n_bins, + static_cast(xmin), + static_cast(xmax)); + } + + // Spawn worker threads + for (int i = 0; i < n_threads_; ++i) { + workers_.emplace_back([this, i]() { this->worker_loop(i); }); + } + + // Async pipeline. The PCQ holds (size - 1) usable slots, so size up by + // one to honour the requested max_pending. + async_queue_ = std::make_unique( + static_cast(max_pending + 1)); + coordinator_ = std::thread([this]() { this->coordinator_loop(); }); +} + +template +PixelHistogram::~PixelHistogram() { + // Drain any pending async fills before tearing down the worker pool. + // The coordinator's loop keeps processing while stop_coordinator_ is + // true as long as the queue is non-empty (mirrors ClusterFinderMT). + if (coordinator_.joinable()) { + stop_coordinator_ = true; + coordinator_.join(); + } + + // Signal all workers to stop + { + std::unique_lock lock(work_mutex_); + stop_workers_ = true; + } + work_cv_.notify_all(); + + // Join all worker threads + for (auto &thread : workers_) { + if (thread.joinable()) { + thread.join(); + } + } +} + +template +int PixelHistogram::row_start(int thread_id) const { + return row_offsets_[thread_id]; +} + +template +int PixelHistogram::row_count(int thread_id) const { + return row_offsets_[thread_id + 1] - row_offsets_[thread_id]; +} + +template +void PixelHistogram::worker_loop(int thread_id) { + int last_generation = 0; + + while (true) { + std::unique_lock lock(work_mutex_); + work_cv_.wait(lock, [this, last_generation]() { + return work_generation_ != last_generation || stop_workers_; + }); + + if (stop_workers_) { + break; + } + + // Get work assignment + const NDView &image = *current_image_; + const int generation = work_generation_; + const int first_row = row_start(thread_id); + const int local_rows = row_count(thread_id); + + lock.unlock(); + + // Do the work: fill this thread's partial histogram. The + // [xmin, xmax) range gate lives inside PixelHistogramImpl::fill. + auto &my_hist = partial_hists_[thread_id]; + const auto cols = image.shape(1); + for (int local_row = 0; local_row < local_rows; ++local_row) { + const auto row = static_cast(first_row + local_row); + for (ssize_t col = 0; col < cols; ++col) { + const auto val = image(row, col); + my_hist.fill_unchecked(local_row, static_cast(col), val); + } + } + + // Signal completion + { + std::unique_lock done_lock(work_mutex_); + last_generation = generation; + completed_threads_++; + if (completed_threads_ == n_threads_) { + done_cv_.notify_one(); + } + } + } +} + +template +NDArray PixelHistogram::values() const { + // Make sure any pending async fills are merged in before we snapshot + // the partial histograms. Cheap when the queue is already drained. + flush(); + + const auto first_shard_view = partial_hists_.front().view(); + const auto cols = static_cast(first_shard_view.shape(1)); + const auto bins = static_cast(first_shard_view.shape(2)); + const auto rows = static_cast(rows_); + + NDArray data({rows, cols, bins}); + + // Each thread owns a disjoint, contiguous range of rows. The shard's + // dense storage layout [local_row][col][bin] is identical to the slice + // [first_row .. first_row + local_rows)[col][bin] of `data`, so the + // merge is just one bulk copy per thread; no per-element accumulation + // and no upfront zeroing of `data` is needed. + const size_t pixel_stride = static_cast(cols) * bins; + for (int t = 0; t < n_threads_; ++t) { + const auto first_row = static_cast(row_start(t)); + const auto local_rows = static_cast(row_count(t)); + if (local_rows == 0) + continue; + + const auto shard_view = partial_hists_[t].view(); + std::memcpy(data.data() + first_row * pixel_stride, shard_view.data(), + local_rows * pixel_stride * sizeof(StorageType)); + } + + return data; +} + +template +void PixelHistogram::dispatch( + const NDView &image) { + // Called only by the coordinator thread on images already shape-checked + // by fill_async, so there is no need to re-validate or to serialise + // against other callers. + + // Reset counters and set work + { + std::unique_lock lock(work_mutex_); + completed_threads_ = 0; + current_image_ = ℑ + ++work_generation_; + } + + // Signal all worker threads to start + work_cv_.notify_all(); + + // Wait for all workers to complete + { + std::unique_lock lock(work_mutex_); + done_cv_.wait(lock, + [this]() { return completed_threads_ == n_threads_; }); + current_image_ = nullptr; // Clear work assignment + } +} + +template +void PixelHistogram::fill_async( + NDArray image) { + if (image.shape(0) != rows_ || image.shape(1) != cols_) { + throw std::invalid_argument( + "PixelHistogram image shape does not match constructor shape"); + } + + // SPSC backpressure: spin with a short sleep until a slot frees up. + // The std::move only consumes `image` on the iteration that succeeds + // (placement-new inside write() runs only when the slot is free). + while (!async_queue_->write(std::move(image))) { + std::this_thread::sleep_for(async_wait_); + } +} + +template +void PixelHistogram::flush() const { + while (!async_queue_->isEmpty() || + coordinator_busy_.load(std::memory_order_acquire)) { + std::this_thread::sleep_for(async_wait_); + } +} + +template +void PixelHistogram::coordinator_loop() { + NDArray item; + while (!stop_coordinator_.load(std::memory_order_acquire) || + !async_queue_->isEmpty()) { + if (async_queue_->read(item)) { + coordinator_busy_.store(true, std::memory_order_release); + dispatch(item.view()); + coordinator_busy_.store(false, std::memory_order_release); + } else { + std::this_thread::sleep_for(async_wait_); + } + } +} + +template +NDArray +PixelHistogram::bin_centers() const { + // All shards share the same value-axis configuration, so any one will + // do; pick the first. + return partial_hists_.front().bin_centers(); +} + +template +NDArray PixelHistogram::bin_edges() const { + return partial_hists_.front().bin_edges(); +} + } // namespace aare diff --git a/python/src/bind_PixelHistogram.hpp b/python/src/bind_PixelHistogram.hpp index 4b24f7f..a9f6093 100644 --- a/python/src/bind_PixelHistogram.hpp +++ b/python/src/bind_PixelHistogram.hpp @@ -11,8 +11,10 @@ namespace py = pybind11; using namespace ::aare; void define_pixel_histogram_bindings(py::module &m) { - py::class_(m, "PixelHistogram", - "A histogram for pixel-wise statistics") + using PixelHistogramF32U16 = PixelHistogram; + + py::class_(m, "PixelHistogram", + "A histogram for pixel-wise statistics") .def(py::init(), R"( Initialize a PixelHistogram. @@ -34,13 +36,12 @@ void define_pixel_histogram_bindings(py::module &m) { .def( "fill_async", - [](PixelHistogram &self, - py::array_t image) { + [](PixelHistogramF32U16 &self, py::array_t image) { // Copy the numpy buffer into an owned NDArray while we // still hold the GIL so we don't depend on the array's // backing storage outliving this call. auto view = make_view_2d(image); - NDArray owned(view); + NDArray owned(view); // Release the GIL while enqueueing - fill_async can block // on backpressure when the queue is full. py::gil_scoped_release release; @@ -60,7 +61,7 @@ void define_pixel_histogram_bindings(py::module &m) { )", py::arg("image").noconvert()) - .def("flush", &PixelHistogram::flush, + .def("flush", &PixelHistogramF32U16::flush, R"( Block until all images submitted via fill_async() have been merged into the accumulators. Cheap when nothing is pending. @@ -69,15 +70,14 @@ void define_pixel_histogram_bindings(py::module &m) { .def( "values", - [](const PixelHistogram &self) { + [](const PixelHistogramF32U16 &self) { // values() implicitly flushes - release the GIL while it // does so. Allocation/copy into the NDArray runs without // the GIL too; only the numpy wrapping needs it. - NDArray *ptr = nullptr; + NDArray *ptr = nullptr; { py::gil_scoped_release release; - ptr = new NDArray( - self.values()); + ptr = new NDArray(self.values()); } return return_image_data(ptr); }, @@ -94,9 +94,8 @@ void define_pixel_histogram_bindings(py::module &m) { .def( "bin_centers", - [](const PixelHistogram &self) { - auto ptr = new NDArray( - self.bin_centers()); + [](const PixelHistogramF32U16 &self) { + auto ptr = new NDArray(self.bin_centers()); return return_image_data(ptr); }, R"( @@ -107,9 +106,8 @@ void define_pixel_histogram_bindings(py::module &m) { )") .def( "bin_edges", - [](const PixelHistogram &self) { - auto ptr = - new NDArray(self.bin_edges()); + [](const PixelHistogramF32U16 &self) { + auto ptr = new NDArray(self.bin_edges()); return return_image_data(ptr); }, R"( diff --git a/src/hist/PixelHistogram.cpp b/src/hist/PixelHistogram.cpp deleted file mode 100644 index 5e8baec..0000000 --- a/src/hist/PixelHistogram.cpp +++ /dev/null @@ -1,246 +0,0 @@ -#include "aare/hist/PixelHistogram.hpp" - -#include -#include -#include -#include -#include - -namespace aare { - -PixelHistogram::PixelHistogram(int rows, int cols, int n_bins, double xmin, - double xmax, int n_threads, - std::size_t max_pending) - : rows_(rows), cols_(cols), n_threads_(n_threads), xmin_(xmin), xmax_(xmax), - current_image_(nullptr), completed_threads_(0), stop_workers_(false), - work_generation_(0) { - if (rows_ < 1 || cols_ < 1 || n_bins < 1) { - throw std::invalid_argument( - "PixelHistogram requires positive rows, cols and bins"); - } - if (n_threads < 1) { - throw std::invalid_argument( - "PixelHistogram requires at least one thread"); - } - if (max_pending < 1) { - throw std::invalid_argument("PixelHistogram requires max_pending >= 1"); - } - - n_threads_ = std::min(n_threads_, rows_); - - // Build a balanced row partition. With base = rows_ / n_threads_ and - // extra = rows_ % n_threads_, the first `extra` threads get base + 1 - // rows each and the rest get `base` rows. This avoids the - // ceil(rows/n_threads) scheme leaving trailing threads with zero or - // negative row counts (e.g. rows=17, n_threads=8). - row_offsets_.resize(n_threads_ + 1); - const int base = rows_ / n_threads_; - const int extra = rows_ % n_threads_; - int offset = 0; - for (int i = 0; i < n_threads_; ++i) { - row_offsets_[i] = offset; - offset += base + (i < extra ? 1 : 0); - } - row_offsets_[n_threads_] = offset; // == rows_ by construction - - // Initialize partial histograms for each thread - partial_hists_.reserve(n_threads_); - for (int i = 0; i < n_threads_; ++i) { - const auto local_rows = row_count(i); - partial_hists_.emplace_back(local_rows, cols, n_bins, - static_cast(xmin), - static_cast(xmax)); - } - - // Spawn worker threads - for (int i = 0; i < n_threads_; ++i) { - workers_.emplace_back([this, i]() { this->worker_loop(i); }); - } - - // Async pipeline. The PCQ holds (size - 1) usable slots, so size up by - // one to honour the requested max_pending. - async_queue_ = std::make_unique( - static_cast(max_pending + 1)); - coordinator_ = std::thread([this]() { this->coordinator_loop(); }); -} - -PixelHistogram::~PixelHistogram() { - // Drain any pending async fills before tearing down the worker pool. - // The coordinator's loop keeps processing while stop_coordinator_ is - // true as long as the queue is non-empty (mirrors ClusterFinderMT). - if (coordinator_.joinable()) { - stop_coordinator_ = true; - coordinator_.join(); - } - - // Signal all workers to stop - { - std::unique_lock lock(work_mutex_); - stop_workers_ = true; - } - work_cv_.notify_all(); - - // Join all worker threads - for (auto &thread : workers_) { - if (thread.joinable()) { - thread.join(); - } - } -} - -int PixelHistogram::row_start(int thread_id) const { - return row_offsets_[thread_id]; -} - -int PixelHistogram::row_count(int thread_id) const { - return row_offsets_[thread_id + 1] - row_offsets_[thread_id]; -} - -void PixelHistogram::worker_loop(int thread_id) { - int last_generation = 0; - - while (true) { - std::unique_lock lock(work_mutex_); - work_cv_.wait(lock, [this, last_generation]() { - return work_generation_ != last_generation || stop_workers_; - }); - - if (stop_workers_) { - break; - } - - // Get work assignment - const NDView &image = *current_image_; - const int generation = work_generation_; - const int first_row = row_start(thread_id); - const int local_rows = row_count(thread_id); - - lock.unlock(); - - // Do the work: fill this thread's partial histogram. The - // [xmin, xmax) range gate lives inside PixelHistogramImpl::fill. - auto &my_hist = partial_hists_[thread_id]; - const auto cols = image.shape(1); - for (int local_row = 0; local_row < local_rows; ++local_row) { - const auto row = static_cast(first_row + local_row); - for (ssize_t col = 0; col < cols; ++col) { - const auto val = image(row, col); - my_hist.fill_unchecked(local_row, static_cast(col), val); - } - } - - // Signal completion - { - std::unique_lock done_lock(work_mutex_); - last_generation = generation; - completed_threads_++; - if (completed_threads_ == n_threads_) { - done_cv_.notify_one(); - } - } - } -} - -NDArray PixelHistogram::values() const { - // Make sure any pending async fills are merged in before we snapshot - // the partial histograms. Cheap when the queue is already drained. - flush(); - - const auto first_shard_view = partial_hists_.front().view(); - const auto cols = static_cast(first_shard_view.shape(1)); - const auto bins = static_cast(first_shard_view.shape(2)); - const auto rows = static_cast(rows_); - - NDArray data({rows, cols, bins}); - - // Each thread owns a disjoint, contiguous range of rows. The shard's - // dense storage layout [local_row][col][bin] is identical to the slice - // [first_row .. first_row + local_rows)[col][bin] of `data`, so the - // merge is just one bulk copy per thread; no per-element accumulation - // and no upfront zeroing of `data` is needed. - const size_t pixel_stride = static_cast(cols) * bins; - for (int t = 0; t < n_threads_; ++t) { - const auto first_row = static_cast(row_start(t)); - const auto local_rows = static_cast(row_count(t)); - if (local_rows == 0) - continue; - - const auto shard_view = partial_hists_[t].view(); - std::memcpy(data.data() + first_row * pixel_stride, shard_view.data(), - local_rows * pixel_stride * sizeof(StorageType)); - } - - return data; -} - -void PixelHistogram::dispatch(const NDView &image) { - // Called only by the coordinator thread on images already shape-checked - // by fill_async, so there is no need to re-validate or to serialise - // against other callers. - - // Reset counters and set work - { - std::unique_lock lock(work_mutex_); - completed_threads_ = 0; - current_image_ = ℑ - ++work_generation_; - } - - // Signal all worker threads to start - work_cv_.notify_all(); - - // Wait for all workers to complete - { - std::unique_lock lock(work_mutex_); - done_cv_.wait(lock, - [this]() { return completed_threads_ == n_threads_; }); - current_image_ = nullptr; // Clear work assignment - } -} - -void PixelHistogram::fill_async(NDArray image) { - if (image.shape(0) != rows_ || image.shape(1) != cols_) { - throw std::invalid_argument( - "PixelHistogram image shape does not match constructor shape"); - } - - // SPSC backpressure: spin with a short sleep until a slot frees up. - // The std::move only consumes `image` on the iteration that succeeds - // (placement-new inside write() runs only when the slot is free). - while (!async_queue_->write(std::move(image))) { - std::this_thread::sleep_for(async_wait_); - } -} - -void PixelHistogram::flush() const { - while (!async_queue_->isEmpty() || - coordinator_busy_.load(std::memory_order_acquire)) { - std::this_thread::sleep_for(async_wait_); - } -} - -void PixelHistogram::coordinator_loop() { - NDArray item; - while (!stop_coordinator_.load(std::memory_order_acquire) || - !async_queue_->isEmpty()) { - if (async_queue_->read(item)) { - coordinator_busy_.store(true, std::memory_order_release); - dispatch(item.view()); - coordinator_busy_.store(false, std::memory_order_release); - } else { - std::this_thread::sleep_for(async_wait_); - } - } -} - -NDArray PixelHistogram::bin_centers() const { - // All shards share the same value-axis configuration, so any one will - // do; pick the first. - return partial_hists_.front().bin_centers(); -} - -NDArray PixelHistogram::bin_edges() const { - return partial_hists_.front().bin_edges(); -} - -} // namespace aare diff --git a/src/hist/PixelHistogram.test.cpp b/src/hist/PixelHistogram.test.cpp index 135c730..6c230e3 100644 --- a/src/hist/PixelHistogram.test.cpp +++ b/src/hist/PixelHistogram.test.cpp @@ -5,6 +5,7 @@ #include #include #include +#include #include #include "test_config.hpp" @@ -20,7 +21,7 @@ namespace { // The synchronous fill() has been removed; fill_async() is the only entry // point. This helper submits one frame and blocks until it has been merged // so the tests can keep their straightforward, ordered expectations. -void fill_blocking(PixelHistogram &hist, const NDView &image) { +void fill_blocking(PixelHistogram<> &hist, const NDView &image) { NDArray owned(image); hist.fill_async(std::move(owned)); hist.flush(); @@ -309,8 +310,7 @@ TEST_CASE("Destructor drains pending async fills") { frames.push_back(std::move(img)); } - NDArray snapshot({rows, cols, n_bins}, - uint16_t{0}); + NDArray snapshot({rows, cols, n_bins}, uint16_t{0}); { PixelHistogram hist(rows, cols, n_bins, xmin, xmax, 2, max_pending); for (auto &img : frames) { @@ -346,3 +346,27 @@ TEST_CASE("Destructor drains pending async fills") { } CHECK(all_match); } + +TEST_CASE("PixelHistogram supports custom storage and axis types") { + PixelHistogram hist(2, 2, 4, 0.0, 4.0, 1); + NDArray image({2, 2}, -1.0); + + image(0, 0) = 0.25; + image(0, 1) = 1.25; + image(1, 0) = 2.25; + image(1, 1) = 3.25; + + hist.fill_async(std::move(image)); + hist.flush(); + + auto values = hist.values(); + STATIC_REQUIRE(std::is_same_v); + CHECK(values(0, 0, 0) == uint32_t{1}); + CHECK(values(0, 1, 1) == uint32_t{1}); + CHECK(values(1, 0, 2) == uint32_t{1}); + CHECK(values(1, 1, 3) == uint32_t{1}); + + auto centers = hist.bin_centers(); + STATIC_REQUIRE(std::is_same_v); + CHECK(centers.shape(0) == 4); +}