fill_async
Build on RHEL8 / build (push) Failing after 1m17s
Build on RHEL9 / build (push) Failing after 1m22s
Run tests using data on local RHEL8 / build (push) Failing after 1m54s

This commit is contained in:
Erik Frojdh
2026-05-24 10:02:49 +02:00
parent ee6b6dc990
commit e71bbb8288
5 changed files with 317 additions and 15 deletions
+45 -2
View File
@@ -1,12 +1,16 @@
#pragma once
#include "aare/NDArray.hpp"
#include "aare/NDView.hpp"
#include "aare/ProducerConsumerQueue.hpp"
//Lets see if we need to hide it behind a pimpl
#include <boost/histogram.hpp>
#include <cstdint>
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <cstdint>
#include <memory>
#include <mutex>
#include <thread>
#include <vector>
@@ -25,6 +29,8 @@ class PixelHistogram {
bh::axis::integer<int, bh::use_default, bh::axis::option::none_t>,
bh::axis::integer<int, bh::use_default, bh::axis::option::none_t>>;
using Hist = bh::histogram<Axes, bh::dense_storage<StorageType>>;
using AsyncQueue = ProducerConsumerQueue<NDArray<AxisType, 2>>;
int rows_;
int cols_;
int n_threads_;
@@ -47,15 +53,52 @@ class PixelHistogram {
std::atomic<bool> stop_workers_;
int work_generation_;
// Serialises calls into the synchronous fan-out (`fill`). The
// coordinator thread acquires it for the duration of each item it
// processes, and direct callers of `fill` also acquire it. Without
// this, concurrent sync + async fills would race on `current_image_`
// and `work_generation_`.
std::mutex fill_mutex_;
// Async producer/consumer pipeline. SPSC queue feeds the coordinator
// thread, which calls the synchronous `fill` one image at a time.
std::unique_ptr<AsyncQueue> async_queue_;
std::thread coordinator_;
std::atomic<bool> stop_coordinator_{false};
std::atomic<bool> coordinator_busy_{false};
std::chrono::microseconds async_wait_{100};
// Private worker thread method
void worker_loop(int thread_id);
void coordinator_loop();
int row_start(int thread_id) const;
int row_count(int thread_id) const;
public:
PixelHistogram(int rows, int cols, int n_bins, double xmin, double xmax, int n_threads = 1);
PixelHistogram(int rows, int cols, int n_bins, double xmin, double xmax,
int n_threads = 1, std::size_t max_pending = 16);
~PixelHistogram();
// Synchronous fill: blocks until the image has been merged into the
// accumulators. Safe to call concurrently with `fill_async` (calls are
// serialised through `fill_mutex_`).
void fill(const NDView<AxisType, 2> &image);
// Asynchronous fill: takes ownership of `image`, enqueues it for the
// coordinator thread, and returns. Blocks the caller only if the queue
// is full (single-producer, single-consumer queue with a sleep-poll
// backpressure loop, matching the convention in ClusterFinderMT).
void fill_async(NDArray<AxisType, 2> image);
// Wait for all queued async fills to complete. Cheap when the queue
// is already drained.
void flush() const;
// Number of items either queued or currently being processed.
std::size_t pending() const;
// Implicitly flushes pending async fills first so the snapshot is
// consistent with everything that was submitted up to the call.
NDArray<StorageType, 3> hdata() const;
NDArray<AxisType, 1> bin_centers() const;
NDArray<AxisType, 1> bin_edges() const;
+1
View File
@@ -43,3 +43,4 @@ from ._aare import apply_calibration, count_switching_pixels
from ._aare import calculate_pedestal, calculate_pedestal_float, calculate_pedestal_g0, calculate_pedestal_g0_float
from ._aare import VarClusterFinder
from ._aare import PixelHistogram
+67 -9
View File
@@ -13,10 +13,10 @@ using namespace ::aare;
void define_pixel_histogram_bindings(py::module &m) {
py::class_<PixelHistogram>(m, "PixelHistogram",
"A histogram for pixel-wise statistics")
.def(py::init<int, int, int, double, double, int>(),
.def(py::init<int, int, int, double, double, int, std::size_t>(),
R"(
Initialize a PixelHistogram.
Args:
rows: Number of rows in the detector
cols: Number of columns in the detector
@@ -24,9 +24,13 @@ void define_pixel_histogram_bindings(py::module &m) {
xmin: Minimum value for histogram range
xmax: Maximum value for histogram range
n_threads: Number of threads for parallel filling (default: 1)
max_pending: Maximum number of images that can be queued for
asynchronous filling before fill_async() applies
backpressure on the caller (default: 16)
)",
py::arg("rows"), py::arg("cols"), py::arg("n_bins"),
py::arg("xmin"), py::arg("xmax"), py::arg("n_threads") = 1)
py::arg("xmin"), py::arg("xmax"), py::arg("n_threads") = 1,
py::arg("max_pending") = std::size_t{16})
.def("fill",
[](PixelHistogram &self,
@@ -35,21 +39,75 @@ void define_pixel_histogram_bindings(py::module &m) {
self.fill(view);
},
R"(
Fill the histogram with image data.
Fill the histogram with image data (blocking).
Args:
image: A 2D numpy array of pixel values (dtype: float32)
)",
py::arg("image").noconvert())
.def("fill_async",
[](PixelHistogram &self,
py::array_t<PixelHistogram::AxisType, 0> image) {
// Copy the numpy buffer into an owned NDArray while we
// still hold the GIL so we don't depend on the array's
// backing storage outliving this call.
auto view = make_view_2d(image);
NDArray<PixelHistogram::AxisType, 2> owned(view);
// Release the GIL while enqueueing - fill_async can block
// on backpressure when the queue is full.
py::gil_scoped_release release;
self.fill_async(std::move(owned));
},
R"(
Submit an image for asynchronous filling.
The image is copied into an internal buffer before this call
returns, so the caller may mutate or free the numpy array
immediately. The actual histogram update happens on a
background thread. If the internal queue is full this call
blocks (with the GIL released) until a slot becomes available.
Args:
image: A 2D numpy array of pixel values (dtype: float32)
)",
py::arg("image").noconvert())
.def("flush",
&PixelHistogram::flush,
R"(
Block until all images submitted via fill_async() have been
merged into the accumulators. Cheap when nothing is pending.
)",
py::call_guard<py::gil_scoped_release>())
.def("pending",
&PixelHistogram::pending,
R"(
Return the number of images either waiting in the queue or
currently being processed by the background thread. Useful
for monitoring/diagnostics.
)")
.def("hdata",
[](const PixelHistogram &self) {
auto ptr = new NDArray<PixelHistogram::StorageType, 3>(self.hdata());
// hdata() implicitly flushes - release the GIL while it
// does so. Allocation/copy into the NDArray runs without
// the GIL too; only the numpy wrapping needs it.
NDArray<PixelHistogram::StorageType, 3>* ptr = nullptr;
{
py::gil_scoped_release release;
ptr = new NDArray<PixelHistogram::StorageType, 3>(self.hdata());
}
return return_image_data(ptr);
},
R"(
Get the histogram data as a numpy array.
Implicitly flushes any pending asynchronous fills before
returning, so the snapshot is consistent with everything
submitted up to this call.
Returns:
A 3D numpy array containing the histogram bins for each pixel
)")
@@ -61,7 +119,7 @@ void define_pixel_histogram_bindings(py::module &m) {
},
R"(
Get the bin centers along the value axis.
Returns:
A 1D numpy array containing the center values for each histogram bin
)")
@@ -72,7 +130,7 @@ void define_pixel_histogram_bindings(py::module &m) {
},
R"(
Get the bin edges along the value axis.
Returns:
A 1D numpy array containing the edge values for the histogram bins
)");
+82 -4
View File
@@ -5,13 +5,17 @@
#include <boost/histogram/storage_adaptor.hpp>
#include <boost/histogram/unsafe_access.hpp>
#include <cstring>
#include <exception>
#include <iostream>
#include <stdexcept>
#include <utility>
#include <vector>
namespace aare {
PixelHistogram::PixelHistogram(int rows, int cols, int n_bins, double xmin, double xmax, int n_threads):
PixelHistogram::PixelHistogram(int rows, int cols, int n_bins, double xmin, double xmax,
int n_threads, std::size_t max_pending):
rows_(rows), cols_(cols), n_threads_(n_threads), xmin_(xmin), xmax_(xmax), current_image_(nullptr),
completed_threads_(0), stop_workers_(false), work_generation_(0) {
if (rows_ < 1 || cols_ < 1 || n_bins < 1) {
@@ -20,6 +24,9 @@ PixelHistogram::PixelHistogram(int rows, int cols, int n_bins, double xmin, doub
if (n_threads < 1) {
throw std::invalid_argument("PixelHistogram requires at least one thread");
}
if (max_pending < 1) {
throw std::invalid_argument("PixelHistogram requires max_pending >= 1");
}
n_threads_ = std::min(n_threads_, rows_);
@@ -54,16 +61,29 @@ PixelHistogram::PixelHistogram(int rows, int cols, int n_bins, double xmin, doub
for (int i = 0; i < n_threads_; ++i) {
workers_.emplace_back([this, i]() { this->worker_loop(i); });
}
// Async pipeline. The PCQ holds (size - 1) usable slots, so size up by
// one to honour the requested max_pending.
async_queue_ = std::make_unique<AsyncQueue>(static_cast<std::uint32_t>(max_pending + 1));
coordinator_ = std::thread([this]() { this->coordinator_loop(); });
}
PixelHistogram::~PixelHistogram() {
// Drain any pending async fills before tearing down the worker pool.
// The coordinator's loop keeps processing while stop_coordinator_ is
// true as long as the queue is non-empty (mirrors ClusterFinderMT).
if (coordinator_.joinable()) {
stop_coordinator_ = true;
coordinator_.join();
}
// Signal all workers to stop
{
std::unique_lock<std::mutex> lock(work_mutex_);
stop_workers_ = true;
}
work_cv_.notify_all();
// Join all worker threads
for (auto& thread : workers_) {
if (thread.joinable()) {
@@ -126,6 +146,10 @@ void PixelHistogram::worker_loop(int thread_id) {
}
NDArray<PixelHistogram::StorageType, 3> PixelHistogram::hdata() const {
// Make sure any pending async fills are merged in before we snapshot
// the partial histograms. Cheap when the queue is already drained.
flush();
const auto &hist = partial_hists_.front();
const auto bins = static_cast<ssize_t>(hist.axis(0).size());
const auto cols = static_cast<ssize_t>(hist.axis(1).size());
@@ -159,6 +183,10 @@ void PixelHistogram::fill(const NDView<AxisType, 2> &image) {
throw std::invalid_argument("PixelHistogram image shape does not match constructor shape");
}
// Serialise all calls into the fan-out. fill_mutex_ is always the
// outermost lock; work_mutex_ is taken briefly inside it.
std::lock_guard<std::mutex> fill_lock(fill_mutex_);
// Reset counters and set work
{
std::unique_lock<std::mutex> lock(work_mutex_);
@@ -166,10 +194,10 @@ void PixelHistogram::fill(const NDView<AxisType, 2> &image) {
current_image_ = &image;
++work_generation_;
}
// Signal all worker threads to start
work_cv_.notify_all();
// Wait for all workers to complete
{
std::unique_lock<std::mutex> lock(work_mutex_);
@@ -178,6 +206,56 @@ void PixelHistogram::fill(const NDView<AxisType, 2> &image) {
}
}
void PixelHistogram::fill_async(NDArray<AxisType, 2> image) {
if (image.shape(0) != rows_ || image.shape(1) != cols_) {
throw std::invalid_argument("PixelHistogram image shape does not match constructor shape");
}
// SPSC backpressure: spin with a short sleep until a slot frees up.
// The std::move only consumes `image` on the iteration that succeeds
// (placement-new inside write() runs only when the slot is free).
while (!async_queue_->write(std::move(image))) {
std::this_thread::sleep_for(async_wait_);
}
}
void PixelHistogram::flush() const {
while (!async_queue_->isEmpty() || coordinator_busy_.load(std::memory_order_acquire)) {
std::this_thread::sleep_for(async_wait_);
}
}
std::size_t PixelHistogram::pending() const {
// sizeGuess() counts the items still in the queue; the coordinator
// does `read()` (which pops) before setting `coordinator_busy_`, so an
// in-flight item lives only in the busy flag.
return async_queue_->sizeGuess() +
(coordinator_busy_.load(std::memory_order_acquire) ? 1u : 0u);
}
void PixelHistogram::coordinator_loop() {
NDArray<AxisType, 2> item;
while (!stop_coordinator_.load(std::memory_order_acquire) || !async_queue_->isEmpty()) {
if (async_queue_->read(item)) {
coordinator_busy_.store(true, std::memory_order_release);
try {
fill(item.view());
} catch (const std::exception& e) {
// fill_async pre-validates shape, so this is purely
// defensive. Log to stderr and keep the coordinator alive.
std::cerr << "PixelHistogram::fill_async error: "
<< e.what() << std::endl;
} catch (...) {
std::cerr << "PixelHistogram::fill_async error: unknown"
<< std::endl;
}
coordinator_busy_.store(false, std::memory_order_release);
} else {
std::this_thread::sleep_for(async_wait_);
}
}
}
NDArray<PixelHistogram::AxisType, 1> PixelHistogram::bin_centers() const {
const auto& value_axis = partial_hists_.front().axis(0);
const auto n_bins = static_cast<ssize_t>(value_axis.size());
+122
View File
@@ -1,8 +1,10 @@
#include <catch2/catch_test_macros.hpp>
#include <catch2/generators/catch_generators.hpp>
#include <chrono>
#include <cstdint>
#include <random>
#include <thread>
#include <vector>
#include "test_config.hpp"
@@ -206,3 +208,123 @@ TEST_CASE("Random fills match a reference implementation") {
}
CHECK(all_match);
}
TEST_CASE("Async fill matches sync fill") {
// Submit a stream of random frames through fill_async and compare
// against the same frames processed by fill() on a separate histogram.
constexpr int rows = 19;
constexpr int cols = 23;
constexpr int n_bins = 16;
constexpr float xmin = -1.0f;
constexpr float xmax = 3.0f;
const int n_threads = GENERATE(1, 2, 4);
constexpr int n_frames = 32;
// Pick a small queue capacity so the producer trips the backpressure
// path at least a few times during the run.
constexpr std::size_t max_pending = 4;
PixelHistogram async_hist(rows, cols, n_bins, xmin, xmax, n_threads, max_pending);
PixelHistogram sync_hist(rows, cols, n_bins, xmin, xmax, n_threads);
std::mt19937 rng(0xA5A5A5A5);
std::uniform_real_distribution<float> dist(xmin - 0.25f, xmax + 0.25f);
for (int f = 0; f < n_frames; ++f) {
NDArray<float, 2> img({rows, cols}, 0.0f);
for (ssize_t r = 0; r < rows; ++r)
for (ssize_t c = 0; c < cols; ++c)
img(r, c) = dist(rng);
sync_hist.fill(img.view());
async_hist.fill_async(std::move(img));
}
// hdata() calls flush() internally, but exercise the explicit path too.
async_hist.flush();
CHECK(async_hist.pending() == 0);
auto a = async_hist.hdata();
auto s = sync_hist.hdata();
REQUIRE(a.shape(0) == s.shape(0));
REQUIRE(a.shape(1) == s.shape(1));
REQUIRE(a.shape(2) == s.shape(2));
bool all_match = true;
for (ssize_t r = 0; r < rows && all_match; ++r) {
for (ssize_t c = 0; c < cols && all_match; ++c) {
for (ssize_t b = 0; b < n_bins && all_match; ++b) {
if (a(r, c, b) != s(r, c, b)) {
all_match = false;
INFO("r=" << r << " c=" << c << " b=" << b
<< " async=" << a(r, c, b) << " sync=" << s(r, c, b));
CHECK(a(r, c, b) == s(r, c, b));
}
}
}
}
CHECK(all_match);
}
TEST_CASE("fill_async with mismatched shape throws") {
PixelHistogram hist(8, 8, 16, 0.0, 1.0, 2);
NDArray<float, 2> bad({4, 4}, 0.0f);
CHECK_THROWS_AS(hist.fill_async(std::move(bad)), std::invalid_argument);
}
TEST_CASE("Destructor drains pending async fills") {
// Submit more frames than the queue can hold so backpressure kicks in,
// then immediately let the histogram go out of scope and verify that
// the merged hdata() matches the reference computed sequentially.
constexpr int rows = 11;
constexpr int cols = 7;
constexpr int n_bins = 8;
constexpr float xmin = 0.0f;
constexpr float xmax = 1.0f;
constexpr int n_frames = 12;
constexpr std::size_t max_pending = 2;
std::vector<NDArray<float, 2>> frames;
frames.reserve(n_frames);
std::mt19937 rng(0xDEADBEEF);
std::uniform_real_distribution<float> dist(xmin, xmax);
for (int f = 0; f < n_frames; ++f) {
NDArray<float, 2> img({rows, cols}, 0.0f);
for (ssize_t r = 0; r < rows; ++r)
for (ssize_t c = 0; c < cols; ++c)
img(r, c) = dist(rng);
frames.push_back(std::move(img));
}
NDArray<aare::PixelHistogram::StorageType, 3> snapshot({rows, cols, n_bins}, uint16_t{0});
{
PixelHistogram hist(rows, cols, n_bins, xmin, xmax, 2, max_pending);
for (auto& img : frames) {
// Move a copy so we can also build the reference below.
NDArray<float, 2> copy({rows, cols}, 0.0f);
std::memcpy(copy.data(), img.data(), copy.total_bytes());
hist.fill_async(std::move(copy));
}
// No explicit flush(); destructor must drain.
// Capture hdata() *after* the loop but inside the scope so it
// observes everything that was submitted (hdata flushes too).
snapshot = hist.hdata();
}
PixelHistogram reference(rows, cols, n_bins, xmin, xmax, 2);
for (const auto& img : frames) reference.fill(img.view());
auto expected = reference.hdata();
bool all_match = true;
for (ssize_t r = 0; r < rows && all_match; ++r) {
for (ssize_t c = 0; c < cols && all_match; ++c) {
for (ssize_t b = 0; b < n_bins && all_match; ++b) {
if (snapshot(r, c, b) != expected(r, c, b)) {
all_match = false;
INFO("r=" << r << " c=" << c << " b=" << b
<< " got=" << snapshot(r, c, b)
<< " expected=" << expected(r, c, b));
CHECK(snapshot(r, c, b) == expected(r, c, b));
}
}
}
}
CHECK(all_match);
}