templated PixelHistogram

This commit is contained in:
Erik Fröjdh
2026-06-05 14:51:31 +02:00
parent e7cad1a6e7
commit e4840600ae
5 changed files with 298 additions and 270 deletions
-1
View File
@@ -406,7 +406,6 @@ set(SourceFiles
${CMAKE_CURRENT_SOURCE_DIR}/src/defs.cpp
${CMAKE_CURRENT_SOURCE_DIR}/src/ROIGeometry.cpp
${CMAKE_CURRENT_SOURCE_DIR}/src/DetectorGeometry.cpp
${CMAKE_CURRENT_SOURCE_DIR}/src/hist/PixelHistogram.cpp
${CMAKE_CURRENT_SOURCE_DIR}/src/hist/PedestalTrackingPixelHistogram.cpp
${CMAKE_CURRENT_SOURCE_DIR}/src/Dtype.cpp
${CMAKE_CURRENT_SOURCE_DIR}/src/File.cpp
+257 -4
View File
@@ -4,22 +4,23 @@
#include "aare/ProducerConsumerQueue.hpp"
#include "aare/hist/PixelHistogramImpl.hpp"
#include <algorithm>
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <memory>
#include <mutex>
#include <stdexcept>
#include <thread>
#include <utility>
#include <vector>
namespace aare {
template <typename StorageType = uint16_t, typename AxisType = float>
class PixelHistogram {
public:
using StorageType = uint16_t;
using AxisType = float;
private:
using Hist = PixelHistogramImpl<AxisType, StorageType>;
using AsyncQueue = ProducerConsumerQueue<NDArray<AxisType, 2>>;
@@ -86,4 +87,256 @@ class PixelHistogram {
NDArray<AxisType, 1> bin_edges() const;
};
template <typename StorageType, typename AxisType>
PixelHistogram<StorageType, AxisType>::PixelHistogram(int rows, int cols,
int n_bins, double xmin,
double xmax,
int n_threads,
std::size_t max_pending)
: rows_(rows), cols_(cols), n_threads_(n_threads), xmin_(xmin), xmax_(xmax),
current_image_(nullptr), completed_threads_(0), stop_workers_(false),
work_generation_(0) {
if (rows_ < 1 || cols_ < 1 || n_bins < 1) {
throw std::invalid_argument(
"PixelHistogram requires positive rows, cols and bins");
}
if (n_threads < 1) {
throw std::invalid_argument(
"PixelHistogram requires at least one thread");
}
if (max_pending < 1) {
throw std::invalid_argument("PixelHistogram requires max_pending >= 1");
}
n_threads_ = std::min(n_threads_, rows_);
// Build a balanced row partition. With base = rows_ / n_threads_ and
// extra = rows_ % n_threads_, the first `extra` threads get base + 1
// rows each and the rest get `base` rows. This avoids the
// ceil(rows/n_threads) scheme leaving trailing threads with zero or
// negative row counts (e.g. rows=17, n_threads=8).
row_offsets_.resize(n_threads_ + 1);
const int base = rows_ / n_threads_;
const int extra = rows_ % n_threads_;
int offset = 0;
for (int i = 0; i < n_threads_; ++i) {
row_offsets_[i] = offset;
offset += base + (i < extra ? 1 : 0);
}
row_offsets_[n_threads_] = offset; // == rows_ by construction
// Initialize partial histograms for each thread
partial_hists_.reserve(n_threads_);
for (int i = 0; i < n_threads_; ++i) {
const auto local_rows = row_count(i);
partial_hists_.emplace_back(local_rows, cols, n_bins,
static_cast<AxisType>(xmin),
static_cast<AxisType>(xmax));
}
// Spawn worker threads
for (int i = 0; i < n_threads_; ++i) {
workers_.emplace_back([this, i]() { this->worker_loop(i); });
}
// Async pipeline. The PCQ holds (size - 1) usable slots, so size up by
// one to honour the requested max_pending.
async_queue_ = std::make_unique<AsyncQueue>(
static_cast<std::uint32_t>(max_pending + 1));
coordinator_ = std::thread([this]() { this->coordinator_loop(); });
}
template <typename StorageType, typename AxisType>
PixelHistogram<StorageType, AxisType>::~PixelHistogram() {
// Drain any pending async fills before tearing down the worker pool.
// The coordinator's loop keeps processing while stop_coordinator_ is
// true as long as the queue is non-empty (mirrors ClusterFinderMT).
if (coordinator_.joinable()) {
stop_coordinator_ = true;
coordinator_.join();
}
// Signal all workers to stop
{
std::unique_lock<std::mutex> lock(work_mutex_);
stop_workers_ = true;
}
work_cv_.notify_all();
// Join all worker threads
for (auto &thread : workers_) {
if (thread.joinable()) {
thread.join();
}
}
}
template <typename StorageType, typename AxisType>
int PixelHistogram<StorageType, AxisType>::row_start(int thread_id) const {
return row_offsets_[thread_id];
}
template <typename StorageType, typename AxisType>
int PixelHistogram<StorageType, AxisType>::row_count(int thread_id) const {
return row_offsets_[thread_id + 1] - row_offsets_[thread_id];
}
template <typename StorageType, typename AxisType>
void PixelHistogram<StorageType, AxisType>::worker_loop(int thread_id) {
int last_generation = 0;
while (true) {
std::unique_lock<std::mutex> lock(work_mutex_);
work_cv_.wait(lock, [this, last_generation]() {
return work_generation_ != last_generation || stop_workers_;
});
if (stop_workers_) {
break;
}
// Get work assignment
const NDView<AxisType, 2> &image = *current_image_;
const int generation = work_generation_;
const int first_row = row_start(thread_id);
const int local_rows = row_count(thread_id);
lock.unlock();
// Do the work: fill this thread's partial histogram. The
// [xmin, xmax) range gate lives inside PixelHistogramImpl::fill.
auto &my_hist = partial_hists_[thread_id];
const auto cols = image.shape(1);
for (int local_row = 0; local_row < local_rows; ++local_row) {
const auto row = static_cast<ssize_t>(first_row + local_row);
for (ssize_t col = 0; col < cols; ++col) {
const auto val = image(row, col);
my_hist.fill_unchecked(local_row, static_cast<int>(col), val);
}
}
// Signal completion
{
std::unique_lock<std::mutex> done_lock(work_mutex_);
last_generation = generation;
completed_threads_++;
if (completed_threads_ == n_threads_) {
done_cv_.notify_one();
}
}
}
}
template <typename StorageType, typename AxisType>
NDArray<StorageType, 3> PixelHistogram<StorageType, AxisType>::values() const {
// Make sure any pending async fills are merged in before we snapshot
// the partial histograms. Cheap when the queue is already drained.
flush();
const auto first_shard_view = partial_hists_.front().view();
const auto cols = static_cast<ssize_t>(first_shard_view.shape(1));
const auto bins = static_cast<ssize_t>(first_shard_view.shape(2));
const auto rows = static_cast<ssize_t>(rows_);
NDArray<StorageType, 3> data({rows, cols, bins});
// Each thread owns a disjoint, contiguous range of rows. The shard's
// dense storage layout [local_row][col][bin] is identical to the slice
// [first_row .. first_row + local_rows)[col][bin] of `data`, so the
// merge is just one bulk copy per thread; no per-element accumulation
// and no upfront zeroing of `data` is needed.
const size_t pixel_stride = static_cast<size_t>(cols) * bins;
for (int t = 0; t < n_threads_; ++t) {
const auto first_row = static_cast<size_t>(row_start(t));
const auto local_rows = static_cast<size_t>(row_count(t));
if (local_rows == 0)
continue;
const auto shard_view = partial_hists_[t].view();
std::memcpy(data.data() + first_row * pixel_stride, shard_view.data(),
local_rows * pixel_stride * sizeof(StorageType));
}
return data;
}
template <typename StorageType, typename AxisType>
void PixelHistogram<StorageType, AxisType>::dispatch(
const NDView<AxisType, 2> &image) {
// Called only by the coordinator thread on images already shape-checked
// by fill_async, so there is no need to re-validate or to serialise
// against other callers.
// Reset counters and set work
{
std::unique_lock<std::mutex> lock(work_mutex_);
completed_threads_ = 0;
current_image_ = &image;
++work_generation_;
}
// Signal all worker threads to start
work_cv_.notify_all();
// Wait for all workers to complete
{
std::unique_lock<std::mutex> lock(work_mutex_);
done_cv_.wait(lock,
[this]() { return completed_threads_ == n_threads_; });
current_image_ = nullptr; // Clear work assignment
}
}
template <typename StorageType, typename AxisType>
void PixelHistogram<StorageType, AxisType>::fill_async(
NDArray<AxisType, 2> image) {
if (image.shape(0) != rows_ || image.shape(1) != cols_) {
throw std::invalid_argument(
"PixelHistogram image shape does not match constructor shape");
}
// SPSC backpressure: spin with a short sleep until a slot frees up.
// The std::move only consumes `image` on the iteration that succeeds
// (placement-new inside write() runs only when the slot is free).
while (!async_queue_->write(std::move(image))) {
std::this_thread::sleep_for(async_wait_);
}
}
template <typename StorageType, typename AxisType>
void PixelHistogram<StorageType, AxisType>::flush() const {
while (!async_queue_->isEmpty() ||
coordinator_busy_.load(std::memory_order_acquire)) {
std::this_thread::sleep_for(async_wait_);
}
}
template <typename StorageType, typename AxisType>
void PixelHistogram<StorageType, AxisType>::coordinator_loop() {
NDArray<AxisType, 2> item;
while (!stop_coordinator_.load(std::memory_order_acquire) ||
!async_queue_->isEmpty()) {
if (async_queue_->read(item)) {
coordinator_busy_.store(true, std::memory_order_release);
dispatch(item.view());
coordinator_busy_.store(false, std::memory_order_release);
} else {
std::this_thread::sleep_for(async_wait_);
}
}
}
template <typename StorageType, typename AxisType>
NDArray<AxisType, 1>
PixelHistogram<StorageType, AxisType>::bin_centers() const {
// All shards share the same value-axis configuration, so any one will
// do; pick the first.
return partial_hists_.front().bin_centers();
}
template <typename StorageType, typename AxisType>
NDArray<AxisType, 1> PixelHistogram<StorageType, AxisType>::bin_edges() const {
return partial_hists_.front().bin_edges();
}
} // namespace aare
+14 -16
View File
@@ -11,8 +11,10 @@ namespace py = pybind11;
using namespace ::aare;
void define_pixel_histogram_bindings(py::module &m) {
py::class_<PixelHistogram>(m, "PixelHistogram",
"A histogram for pixel-wise statistics")
using PixelHistogramF32U16 = PixelHistogram<uint16_t, float>;
py::class_<PixelHistogramF32U16>(m, "PixelHistogram",
"A histogram for pixel-wise statistics")
.def(py::init<int, int, int, double, double, int, std::size_t>(),
R"(
Initialize a PixelHistogram.
@@ -34,13 +36,12 @@ void define_pixel_histogram_bindings(py::module &m) {
.def(
"fill_async",
[](PixelHistogram &self,
py::array_t<PixelHistogram::AxisType, 0> image) {
[](PixelHistogramF32U16 &self, py::array_t<float, 0> image) {
// Copy the numpy buffer into an owned NDArray while we
// still hold the GIL so we don't depend on the array's
// backing storage outliving this call.
auto view = make_view_2d(image);
NDArray<PixelHistogram::AxisType, 2> owned(view);
NDArray<float, 2> owned(view);
// Release the GIL while enqueueing - fill_async can block
// on backpressure when the queue is full.
py::gil_scoped_release release;
@@ -60,7 +61,7 @@ void define_pixel_histogram_bindings(py::module &m) {
)",
py::arg("image").noconvert())
.def("flush", &PixelHistogram::flush,
.def("flush", &PixelHistogramF32U16::flush,
R"(
Block until all images submitted via fill_async() have been
merged into the accumulators. Cheap when nothing is pending.
@@ -69,15 +70,14 @@ void define_pixel_histogram_bindings(py::module &m) {
.def(
"values",
[](const PixelHistogram &self) {
[](const PixelHistogramF32U16 &self) {
// values() implicitly flushes - release the GIL while it
// does so. Allocation/copy into the NDArray runs without
// the GIL too; only the numpy wrapping needs it.
NDArray<PixelHistogram::StorageType, 3> *ptr = nullptr;
NDArray<uint16_t, 3> *ptr = nullptr;
{
py::gil_scoped_release release;
ptr = new NDArray<PixelHistogram::StorageType, 3>(
self.values());
ptr = new NDArray<uint16_t, 3>(self.values());
}
return return_image_data(ptr);
},
@@ -94,9 +94,8 @@ void define_pixel_histogram_bindings(py::module &m) {
.def(
"bin_centers",
[](const PixelHistogram &self) {
auto ptr = new NDArray<PixelHistogram::AxisType, 1>(
self.bin_centers());
[](const PixelHistogramF32U16 &self) {
auto ptr = new NDArray<float, 1>(self.bin_centers());
return return_image_data(ptr);
},
R"(
@@ -107,9 +106,8 @@ void define_pixel_histogram_bindings(py::module &m) {
)")
.def(
"bin_edges",
[](const PixelHistogram &self) {
auto ptr =
new NDArray<PixelHistogram::AxisType, 1>(self.bin_edges());
[](const PixelHistogramF32U16 &self) {
auto ptr = new NDArray<float, 1>(self.bin_edges());
return return_image_data(ptr);
},
R"(
-246
View File
@@ -1,246 +0,0 @@
#include "aare/hist/PixelHistogram.hpp"
#include <algorithm>
#include <cstring>
#include <stdexcept>
#include <utility>
#include <vector>
namespace aare {
PixelHistogram::PixelHistogram(int rows, int cols, int n_bins, double xmin,
double xmax, int n_threads,
std::size_t max_pending)
: rows_(rows), cols_(cols), n_threads_(n_threads), xmin_(xmin), xmax_(xmax),
current_image_(nullptr), completed_threads_(0), stop_workers_(false),
work_generation_(0) {
if (rows_ < 1 || cols_ < 1 || n_bins < 1) {
throw std::invalid_argument(
"PixelHistogram requires positive rows, cols and bins");
}
if (n_threads < 1) {
throw std::invalid_argument(
"PixelHistogram requires at least one thread");
}
if (max_pending < 1) {
throw std::invalid_argument("PixelHistogram requires max_pending >= 1");
}
n_threads_ = std::min(n_threads_, rows_);
// Build a balanced row partition. With base = rows_ / n_threads_ and
// extra = rows_ % n_threads_, the first `extra` threads get base + 1
// rows each and the rest get `base` rows. This avoids the
// ceil(rows/n_threads) scheme leaving trailing threads with zero or
// negative row counts (e.g. rows=17, n_threads=8).
row_offsets_.resize(n_threads_ + 1);
const int base = rows_ / n_threads_;
const int extra = rows_ % n_threads_;
int offset = 0;
for (int i = 0; i < n_threads_; ++i) {
row_offsets_[i] = offset;
offset += base + (i < extra ? 1 : 0);
}
row_offsets_[n_threads_] = offset; // == rows_ by construction
// Initialize partial histograms for each thread
partial_hists_.reserve(n_threads_);
for (int i = 0; i < n_threads_; ++i) {
const auto local_rows = row_count(i);
partial_hists_.emplace_back(local_rows, cols, n_bins,
static_cast<AxisType>(xmin),
static_cast<AxisType>(xmax));
}
// Spawn worker threads
for (int i = 0; i < n_threads_; ++i) {
workers_.emplace_back([this, i]() { this->worker_loop(i); });
}
// Async pipeline. The PCQ holds (size - 1) usable slots, so size up by
// one to honour the requested max_pending.
async_queue_ = std::make_unique<AsyncQueue>(
static_cast<std::uint32_t>(max_pending + 1));
coordinator_ = std::thread([this]() { this->coordinator_loop(); });
}
PixelHistogram::~PixelHistogram() {
// Drain any pending async fills before tearing down the worker pool.
// The coordinator's loop keeps processing while stop_coordinator_ is
// true as long as the queue is non-empty (mirrors ClusterFinderMT).
if (coordinator_.joinable()) {
stop_coordinator_ = true;
coordinator_.join();
}
// Signal all workers to stop
{
std::unique_lock<std::mutex> lock(work_mutex_);
stop_workers_ = true;
}
work_cv_.notify_all();
// Join all worker threads
for (auto &thread : workers_) {
if (thread.joinable()) {
thread.join();
}
}
}
int PixelHistogram::row_start(int thread_id) const {
return row_offsets_[thread_id];
}
int PixelHistogram::row_count(int thread_id) const {
return row_offsets_[thread_id + 1] - row_offsets_[thread_id];
}
void PixelHistogram::worker_loop(int thread_id) {
int last_generation = 0;
while (true) {
std::unique_lock<std::mutex> lock(work_mutex_);
work_cv_.wait(lock, [this, last_generation]() {
return work_generation_ != last_generation || stop_workers_;
});
if (stop_workers_) {
break;
}
// Get work assignment
const NDView<AxisType, 2> &image = *current_image_;
const int generation = work_generation_;
const int first_row = row_start(thread_id);
const int local_rows = row_count(thread_id);
lock.unlock();
// Do the work: fill this thread's partial histogram. The
// [xmin, xmax) range gate lives inside PixelHistogramImpl::fill.
auto &my_hist = partial_hists_[thread_id];
const auto cols = image.shape(1);
for (int local_row = 0; local_row < local_rows; ++local_row) {
const auto row = static_cast<ssize_t>(first_row + local_row);
for (ssize_t col = 0; col < cols; ++col) {
const auto val = image(row, col);
my_hist.fill_unchecked(local_row, static_cast<int>(col), val);
}
}
// Signal completion
{
std::unique_lock<std::mutex> done_lock(work_mutex_);
last_generation = generation;
completed_threads_++;
if (completed_threads_ == n_threads_) {
done_cv_.notify_one();
}
}
}
}
NDArray<PixelHistogram::StorageType, 3> PixelHistogram::values() const {
// Make sure any pending async fills are merged in before we snapshot
// the partial histograms. Cheap when the queue is already drained.
flush();
const auto first_shard_view = partial_hists_.front().view();
const auto cols = static_cast<ssize_t>(first_shard_view.shape(1));
const auto bins = static_cast<ssize_t>(first_shard_view.shape(2));
const auto rows = static_cast<ssize_t>(rows_);
NDArray<StorageType, 3> data({rows, cols, bins});
// Each thread owns a disjoint, contiguous range of rows. The shard's
// dense storage layout [local_row][col][bin] is identical to the slice
// [first_row .. first_row + local_rows)[col][bin] of `data`, so the
// merge is just one bulk copy per thread; no per-element accumulation
// and no upfront zeroing of `data` is needed.
const size_t pixel_stride = static_cast<size_t>(cols) * bins;
for (int t = 0; t < n_threads_; ++t) {
const auto first_row = static_cast<size_t>(row_start(t));
const auto local_rows = static_cast<size_t>(row_count(t));
if (local_rows == 0)
continue;
const auto shard_view = partial_hists_[t].view();
std::memcpy(data.data() + first_row * pixel_stride, shard_view.data(),
local_rows * pixel_stride * sizeof(StorageType));
}
return data;
}
void PixelHistogram::dispatch(const NDView<AxisType, 2> &image) {
// Called only by the coordinator thread on images already shape-checked
// by fill_async, so there is no need to re-validate or to serialise
// against other callers.
// Reset counters and set work
{
std::unique_lock<std::mutex> lock(work_mutex_);
completed_threads_ = 0;
current_image_ = &image;
++work_generation_;
}
// Signal all worker threads to start
work_cv_.notify_all();
// Wait for all workers to complete
{
std::unique_lock<std::mutex> lock(work_mutex_);
done_cv_.wait(lock,
[this]() { return completed_threads_ == n_threads_; });
current_image_ = nullptr; // Clear work assignment
}
}
void PixelHistogram::fill_async(NDArray<AxisType, 2> image) {
if (image.shape(0) != rows_ || image.shape(1) != cols_) {
throw std::invalid_argument(
"PixelHistogram image shape does not match constructor shape");
}
// SPSC backpressure: spin with a short sleep until a slot frees up.
// The std::move only consumes `image` on the iteration that succeeds
// (placement-new inside write() runs only when the slot is free).
while (!async_queue_->write(std::move(image))) {
std::this_thread::sleep_for(async_wait_);
}
}
void PixelHistogram::flush() const {
while (!async_queue_->isEmpty() ||
coordinator_busy_.load(std::memory_order_acquire)) {
std::this_thread::sleep_for(async_wait_);
}
}
void PixelHistogram::coordinator_loop() {
NDArray<AxisType, 2> item;
while (!stop_coordinator_.load(std::memory_order_acquire) ||
!async_queue_->isEmpty()) {
if (async_queue_->read(item)) {
coordinator_busy_.store(true, std::memory_order_release);
dispatch(item.view());
coordinator_busy_.store(false, std::memory_order_release);
} else {
std::this_thread::sleep_for(async_wait_);
}
}
}
NDArray<PixelHistogram::AxisType, 1> PixelHistogram::bin_centers() const {
// All shards share the same value-axis configuration, so any one will
// do; pick the first.
return partial_hists_.front().bin_centers();
}
NDArray<PixelHistogram::AxisType, 1> PixelHistogram::bin_edges() const {
return partial_hists_.front().bin_edges();
}
} // namespace aare
+27 -3
View File
@@ -5,6 +5,7 @@
#include <cstdint>
#include <random>
#include <thread>
#include <type_traits>
#include <vector>
#include "test_config.hpp"
@@ -20,7 +21,7 @@ namespace {
// The synchronous fill() has been removed; fill_async() is the only entry
// point. This helper submits one frame and blocks until it has been merged
// so the tests can keep their straightforward, ordered expectations.
void fill_blocking(PixelHistogram &hist, const NDView<float, 2> &image) {
void fill_blocking(PixelHistogram<> &hist, const NDView<float, 2> &image) {
NDArray<float, 2> owned(image);
hist.fill_async(std::move(owned));
hist.flush();
@@ -309,8 +310,7 @@ TEST_CASE("Destructor drains pending async fills") {
frames.push_back(std::move(img));
}
NDArray<aare::PixelHistogram::StorageType, 3> snapshot({rows, cols, n_bins},
uint16_t{0});
NDArray<uint16_t, 3> snapshot({rows, cols, n_bins}, uint16_t{0});
{
PixelHistogram hist(rows, cols, n_bins, xmin, xmax, 2, max_pending);
for (auto &img : frames) {
@@ -346,3 +346,27 @@ TEST_CASE("Destructor drains pending async fills") {
}
CHECK(all_match);
}
TEST_CASE("PixelHistogram supports custom storage and axis types") {
PixelHistogram<uint32_t, double> hist(2, 2, 4, 0.0, 4.0, 1);
NDArray<double, 2> image({2, 2}, -1.0);
image(0, 0) = 0.25;
image(0, 1) = 1.25;
image(1, 0) = 2.25;
image(1, 1) = 3.25;
hist.fill_async(std::move(image));
hist.flush();
auto values = hist.values();
STATIC_REQUIRE(std::is_same_v<decltype(values(0, 0, 0)), uint32_t &>);
CHECK(values(0, 0, 0) == uint32_t{1});
CHECK(values(0, 1, 1) == uint32_t{1});
CHECK(values(1, 0, 2) == uint32_t{1});
CHECK(values(1, 1, 3) == uint32_t{1});
auto centers = hist.bin_centers();
STATIC_REQUIRE(std::is_same_v<decltype(centers(0)), double &>);
CHECK(centers.shape(0) == 4);
}