diff --git a/CMakeLists.txt b/CMakeLists.txt index fa13be7..b04ff00 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -443,6 +443,7 @@ set(PUBLICHEADERS include/aare/FilePtr.hpp include/aare/Frame.hpp include/aare/PixelHistogram.hpp + include/aare/PedestalTrackingPixelHistogram.hpp include/aare/GainMap.hpp include/aare/ROIGeometry.hpp include/aare/DetectorGeometry.hpp @@ -468,6 +469,7 @@ set(SourceFiles ${CMAKE_CURRENT_SOURCE_DIR}/src/ROIGeometry.cpp ${CMAKE_CURRENT_SOURCE_DIR}/src/DetectorGeometry.cpp ${CMAKE_CURRENT_SOURCE_DIR}/src/PixelHistogram.cpp + ${CMAKE_CURRENT_SOURCE_DIR}/src/PedestalTrackingPixelHistogram.cpp ${CMAKE_CURRENT_SOURCE_DIR}/src/Dtype.cpp ${CMAKE_CURRENT_SOURCE_DIR}/src/File.cpp ${CMAKE_CURRENT_SOURCE_DIR}/src/FilePtr.cpp diff --git a/include/aare/Pedestal.hpp b/include/aare/Pedestal.hpp index 5753176..9d53c5e 100644 --- a/include/aare/Pedestal.hpp +++ b/include/aare/Pedestal.hpp @@ -45,6 +45,8 @@ template class Pedestal { NDArray mean() { return m_mean; } + const NDView view() const { return m_mean.view(); } + SUM_TYPE mean(const uint32_t row, const uint32_t col) const { return m_mean(row, col); } @@ -110,6 +112,24 @@ template class Pedestal { } } + template void push_with_threshold(const NDView frame, const NDView threshold) { + assert(frame.size() == m_rows * m_cols); + + // TODO! move away from m_rows, m_cols + if (frame.shape() != std::array{m_rows, m_cols}) { + throw std::runtime_error( + "Frame shape does not match pedestal shape"); + } + + for (size_t row = 0; row < m_rows; row++) { + for (size_t col = 0; col < m_cols; col++) { + if (fabs(frame(row, col)-mean(row, col)) < threshold(row, col)) { + push(row, col, frame(row, col)); + } + } + } + } + /** * Push but don't update the cached mean. Speeds up the process * when initializing the pedestal. diff --git a/include/aare/PedestalTrackingPixelHistogram.hpp b/include/aare/PedestalTrackingPixelHistogram.hpp new file mode 100644 index 0000000..505bbfd --- /dev/null +++ b/include/aare/PedestalTrackingPixelHistogram.hpp @@ -0,0 +1,205 @@ +#pragma once +#include "aare/NDArray.hpp" +#include "aare/NDView.hpp" +#include "aare/Pedestal.hpp" +#include "aare/ProducerConsumerQueue.hpp" + +// Lets see if we need to hide it behind a pimpl +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +namespace bh = boost::histogram; + +namespace aare { +// PedestalTrackingPixelHistogram histograms `frame - pedestal_mean` per +// pixel. Both the pedestal and the histogram are sharded by row range: +// thread t exclusively owns the slice of rows +// [row_offsets_[t], row_offsets_[t + 1]) of BOTH `partial_pedestals_[t]` +// and `partial_hists_[t]`, so no two threads ever touch the same memory +// while a fan-out is in flight. +// +// All four pedestal/histogram-mutating operations (`fill`, +// `push_pedestal_no_update`, `update_mean`, FillWithThreshold) are +// dispatched through the same worker pool via a `WorkKind` switch in +// `worker_loop`. They are serialised against each other by +// `fill_mutex_`, which also serialises the async coordinator's calls +// into `fill_with_threshold_`. +// +// The single async entry point is `fill_async_with_threshold`, which +// histograms the residual AND additionally pushes raw pixel samples +// whose residual is within `n_sigma_ * cached_std` of zero back into +// the per-thread pedestal shard (sigma-clipped pedestal tracking). +// Setting `n_sigma_ = 0.0` disables that pedestal-update side effect, +// recovering plain histogram-only async filling. +// +// Typical usage: +// +// for (auto& f : pedestal_frames) pth.push_pedestal_no_update(f); +// pth.update_mean(); +// for (auto& f : measurement_frames) +// pth.fill_async_with_threshold(std::move(f)); +// pth.flush(); +// auto data = pth.hdata(); +class PedestalTrackingPixelHistogram { + public: + using StorageType = uint16_t; + using AxisType = float; + using FrameType = uint16_t; + + private: + using Axes = std::tuple< + bh::axis::regular, + bh::axis::integer, + bh::axis::integer>; + using Hist = bh::histogram>; + using AsyncQueue = ProducerConsumerQueue>; + + // What kind of fan-out work the worker pool should currently do. + // Set under work_mutex_; read by worker_loop after wakeup. + enum class WorkKind { Fill, PushPedestal, UpdateMean, FillWithThreshold }; + + int rows_; + int cols_; + int n_threads_; + const AxisType xmin_; + const AxisType xmax_; + // Cumulative row offsets so that thread t owns rows + // [row_offsets_[t], row_offsets_[t + 1]) + // Length is n_threads_ + 1; partition is balanced (the first + // rows_ % n_threads_ threads get one extra row each). + std::vector row_offsets_; + // Per-thread histograms over (residual, col, local_row). + std::vector partial_hists_; + // Per-thread pedestal sized [local_rows x cols]. Indexed by the + // worker using the LOCAL row index (i.e. 0..row_count(t)-1), NOT the + // global row index. Owned exclusively by worker `t` during a + // dispatched fan-out. + std::vector> partial_pedestals_; + // Per-thread cached std, sized [local_rows x cols]. Written by worker + // thread t inside the UpdateMean case of worker_loop (after the + // shard's m_mean has been refreshed); read by worker t in the + // FillWithThreshold case. Same shard-locality contract as + // partial_pedestals_. + std::vector> partial_std_; + + // Thread pool members + std::vector workers_; + std::mutex work_mutex_; + std::condition_variable work_cv_; + std::condition_variable done_cv_; + WorkKind current_work_kind_; + const NDView *current_image_; + std::atomic completed_threads_; + std::atomic stop_workers_; + int work_generation_; + + // Serialises all fan-outs: `fill`, `push_pedestal_no_update`, + // `update_mean`, and the async coordinator's calls into + // `fill_with_threshold_`. Always the outermost lock; work_mutex_ + // is taken briefly inside it. + mutable std::mutex fill_mutex_; + + // Async producer/consumer pipeline. SPSC queue feeds the coordinator + // thread, which calls `fill_with_threshold_` one image at a time. + std::unique_ptr async_queue_; + std::thread coordinator_; + std::atomic stop_coordinator_{false}; + std::atomic coordinator_busy_{false}; + std::chrono::microseconds async_wait_{100}; + + // Sigma multiplier used as the pedestal-update gate in + // FillWithThreshold. Atomic so the setter can update it without + // taking fill_mutex_; workers do relaxed loads on each pixel. + // Setting it to 0.0 disables the pedestal update entirely. + std::atomic n_sigma_; + + // Private worker thread method + void worker_loop(int thread_id); + void coordinator_loop(); + int row_start(int thread_id) const; + int row_count(int thread_id) const; + + // Sets up the work generation, wakes workers, and waits for them. + // Caller MUST already hold `fill_mutex_`. `image` may be nullptr + // for work kinds that don't need it (e.g. UpdateMean). + void dispatch_(WorkKind kind, const NDView *image); + + // Coordinator-facing entry point: takes fill_mutex_ and dispatches + // FillWithThreshold to the worker pool. Same shape as fill(). + void fill_with_threshold_(const NDView &image); + + public: + PedestalTrackingPixelHistogram(int rows, int cols, int n_bins, + AxisType xmin, AxisType xmax, + int n_threads = 1, + std::size_t max_pending = 16, + double n_sigma = 1.0); + ~PedestalTrackingPixelHistogram(); + + // Accumulate `frame` into the running pedestal estimate without + // refreshing the cached mean (the cheap path used while + // bootstrapping the pedestal). Workers update their own shard. + // Call `update_mean()` once you're done before starting to + // `fill`/`fill_async_with_threshold`. + void push_pedestal_no_update(const NDView &frame); + + // Refresh each partial pedestal's cached per-pixel mean from its + // running sums. Serialises with all other fan-outs through + // `fill_mutex_` so worker reads of the pedestal mean cannot race. + void update_mean(); + + // Snapshot of the per-pixel pedestal mean, stitched together from + // all shards into a single `[rows x cols]` array. Implicitly + // drains pending async fills and takes `fill_mutex_` so it cannot + // tear against an in-flight `update_mean()` (which is the only + // operation that overwrites `m_mean`). + NDArray pedestal_mean() const; + + // Synchronous fill: blocks until the pedestal-subtracted residual + // for `image` has been merged into the accumulators. Safe to call + // concurrently with `fill_async_with_threshold` and the + // pedestal-update API (calls are serialised through `fill_mutex_`). + // Histogram-only - independent of `n_sigma()`. + void fill(const NDView &image); + + // Asynchronous fill with sigma-clipped pedestal tracking. Takes + // ownership of `image`, enqueues it for the coordinator thread, and + // returns. The worker pool histograms each in-range residual AND + // additionally pushes the raw pixel value into the pedestal shard + // when `abs(residual) < n_sigma() * cached_std` (per-pixel gate). + // `n_sigma() = 0.0` disables the pedestal update, recovering plain + // histogram-only async filling. Blocks the caller only if the + // queue is full (single-producer, single-consumer queue with a + // sleep-poll backpressure loop, matching the convention in + // ClusterFinderMT). + void fill_async_with_threshold(NDArray image); + + // Sigma multiplier for the pedestal-update gate in + // fill_async_with_threshold. Atomic; safe to read/write at any + // time (the new value takes effect on subsequent pixel evaluations). + double n_sigma() const; + void set_n_sigma(double n_sigma); + + // Wait for all queued async fills to complete. Cheap when the queue + // is already drained. + void flush() const; + + // Number of items either queued or currently being processed. + std::size_t pending() const; + + // Implicitly flushes pending async fills first so the snapshot is + // consistent with everything that was submitted up to the call. + NDArray hdata() const; + NDArray bin_centers() const; + NDArray bin_edges() const; +}; + +} // namespace aare diff --git a/python/aare/__init__.py b/python/aare/__init__.py index cadeb01..33c499d 100644 --- a/python/aare/__init__.py +++ b/python/aare/__init__.py @@ -43,4 +43,4 @@ from ._aare import apply_calibration, count_switching_pixels from ._aare import calculate_pedestal, calculate_pedestal_float, calculate_pedestal_g0, calculate_pedestal_g0_float from ._aare import VarClusterFinder -from ._aare import PixelHistogram +from ._aare import PixelHistogram, PedestalTrackingPixelHistogram diff --git a/python/src/bind_PedestalTrackingPixelHistogram.hpp b/python/src/bind_PedestalTrackingPixelHistogram.hpp new file mode 100644 index 0000000..79ff42c --- /dev/null +++ b/python/src/bind_PedestalTrackingPixelHistogram.hpp @@ -0,0 +1,246 @@ +// SPDX-License-Identifier: MPL-2.0 +#include "aare/PedestalTrackingPixelHistogram.hpp" +#include "np_helper.hpp" + +#include +#include +#include +#include + +namespace py = pybind11; +using namespace ::aare; + +void define_pedestal_tracking_pixel_histogram_bindings(py::module &m) { + py::class_( + m, "PedestalTrackingPixelHistogram", + "A pixel-wise histogram of frame - pedestal residuals, with a " + "per-pixel running pedestal estimate sharded across worker threads") + .def(py::init(), + R"( + Initialize a PedestalTrackingPixelHistogram. + + Args: + rows: Number of rows in the detector + cols: Number of columns in the detector + n_bins: Number of histogram bins along the residual axis + xmin: Minimum residual value (inclusive) + xmax: Maximum residual value (exclusive) + n_threads: Number of worker threads (default: 1). Each + worker owns a disjoint row slice of both the + pedestal and the histogram, so the partition + determines per-thread memory usage. + max_pending: Maximum number of frames that can be + queued for asynchronous filling before + fill_async_with_threshold() applies backpressure + on the caller (default: 16). + n_sigma: Sigma multiplier used as the gate for the + pedestal-update side effect of + fill_async_with_threshold(): a pixel sample is + pushed back into the pedestal estimate iff + ``abs(residual) < n_sigma * cached_std``. Set to + ``0.0`` to disable the pedestal update and get + histogram-only async behaviour (default: 1.0). + Also exposed live via the ``n_sigma`` property. + )", + py::arg("rows"), py::arg("cols"), py::arg("n_bins"), + py::arg("xmin"), py::arg("xmax"), py::arg("n_threads") = 1, + py::arg("max_pending") = std::size_t{16}, + py::arg("n_sigma") = 1.0) + + .def("push_pedestal_no_update", + [](PedestalTrackingPixelHistogram &self, + py::array_t + frame) { + auto view = make_view_2d(frame); + self.push_pedestal_no_update(view); + }, + R"( + Accumulate `frame` into the per-pixel running pedestal + estimate without refreshing the cached mean. + + Use repeatedly while bootstrapping the pedestal, then call + update_mean() once before starting to fill the histogram. + + Args: + frame: A 2D numpy array of raw pixel values (dtype: uint16) + )", + py::arg("frame").noconvert()) + + .def("update_mean", &PedestalTrackingPixelHistogram::update_mean, + R"( + Refresh each partial pedestal's cached per-pixel mean from + its running sums. Drains pending async fills first, then + dispatches the update to the worker pool so the writes to + each shard happen on the same thread that reads them in + fill(). + )", + py::call_guard()) + + .def("pedestal_mean", + [](const PedestalTrackingPixelHistogram &self) { + // pedestal_mean() flushes + locks + memcpys; do all of + // that without the GIL, only reacquire to wrap into a + // numpy array. + NDArray *ptr = nullptr; + { + py::gil_scoped_release release; + ptr = new NDArray(self.pedestal_mean()); + } + return return_image_data(ptr); + }, + R"( + Snapshot the per-pixel pedestal mean stitched together + from all shards. + + Returns: + A 2D numpy array (rows x cols, dtype: float64) + containing the current cached pedestal mean. + )") + + .def("fill", + [](PedestalTrackingPixelHistogram &self, + py::array_t + image) { + auto view = make_view_2d(image); + self.fill(view); + }, + R"( + Fill the histogram with image data (blocking). + + The pedestal-subtracted residual `image - pedestal_mean` + is computed per pixel inside the worker loop, so the + pedestal must already have been bootstrapped via + push_pedestal_no_update() + update_mean() for this to be + meaningful. + + Args: + image: A 2D numpy array of raw pixel values (dtype: uint16) + )", + py::arg("image").noconvert()) + + .def("fill_async_with_threshold", + [](PedestalTrackingPixelHistogram &self, + py::array_t + image) { + // Copy the numpy buffer into an owned NDArray while we + // still hold the GIL so we don't depend on the array's + // backing storage outliving this call. + auto view = make_view_2d(image); + NDArray owned( + view); + // Release the GIL while enqueueing - + // fill_async_with_threshold can block on backpressure + // when the queue is full. + py::gil_scoped_release release; + self.fill_async_with_threshold(std::move(owned)); + }, + R"( + Submit an image for asynchronous filling with sigma-clipped + pedestal tracking. + + For each pixel the worker pool: + * histograms the pedestal-subtracted residual when it + falls in ``[xmin, xmax)``, and + * additionally pushes the raw pixel value back into the + per-thread pedestal estimate when + ``abs(residual) < n_sigma * cached_std`` (the + sigma-clipped pedestal-update gate). + + The cached std is populated by ``update_mean()``, so + ``push_pedestal_no_update()`` + ``update_mean()`` must have + run at least once for the pedestal-update side effect to + fire. Setting ``n_sigma = 0`` disables the side effect and + recovers plain histogram-only async filling. + + The image is copied into an internal buffer before this call + returns, so the caller may mutate or free the numpy array + immediately. If the internal queue is full this call blocks + (with the GIL released) until a slot becomes available. + + Args: + image: A 2D numpy array of raw pixel values (dtype: uint16) + )", + py::arg("image").noconvert()) + + .def_property("n_sigma", &PedestalTrackingPixelHistogram::n_sigma, + &PedestalTrackingPixelHistogram::set_n_sigma, + R"( + Sigma multiplier used as the pedestal-update gate in + fill_async_with_threshold(). Atomic; safe to read or write + from any thread. Setting it to 0.0 disables the pedestal + update entirely. The new value takes effect on subsequent + per-pixel evaluations inside the worker pool. + )") + + .def("flush", &PedestalTrackingPixelHistogram::flush, + R"( + Block until all images submitted via + fill_async_with_threshold() have been merged into the + accumulators. Cheap when nothing is pending. + )", + py::call_guard()) + + .def("pending", &PedestalTrackingPixelHistogram::pending, + R"( + Return the number of images either waiting in the queue or + currently being processed by the background thread (i.e. + still in flight after fill_async_with_threshold()). Useful + for monitoring/diagnostics. + )") + + .def("hdata", + [](const PedestalTrackingPixelHistogram &self) { + // hdata() implicitly flushes - release the GIL while it + // does so. Allocation/copy into the NDArray runs without + // the GIL too; only the numpy wrapping needs it. + NDArray + *ptr = nullptr; + { + py::gil_scoped_release release; + ptr = new NDArray< + PedestalTrackingPixelHistogram::StorageType, 3>( + self.hdata()); + } + return return_image_data(ptr); + }, + R"( + Get the histogram data as a numpy array. + + Implicitly flushes any pending asynchronous fills before + returning, so the snapshot is consistent with everything + submitted up to this call. + + Returns: + A 3D numpy array (rows x cols x n_bins, dtype: uint16) + containing the histogram bins for each pixel. + )") + + .def("bin_centers", + [](const PedestalTrackingPixelHistogram &self) { + auto ptr = new NDArray< + PedestalTrackingPixelHistogram::AxisType, 1>( + self.bin_centers()); + return return_image_data(ptr); + }, + R"( + Get the bin centers along the residual axis. + + Returns: + A 1D numpy array (dtype: float32) of bin center values. + )") + + .def("bin_edges", + [](const PedestalTrackingPixelHistogram &self) { + auto ptr = new NDArray< + PedestalTrackingPixelHistogram::AxisType, 1>( + self.bin_edges()); + return return_image_data(ptr); + }, + R"( + Get the bin edges along the residual axis. + + Returns: + A 1D numpy array (dtype: float32) of bin edge values. + )"); +} diff --git a/python/src/module.cpp b/python/src/module.cpp index 02f4e6a..023a19e 100644 --- a/python/src/module.cpp +++ b/python/src/module.cpp @@ -12,6 +12,7 @@ #include "bind_Defs.hpp" #include "bind_Eta.hpp" #include "bind_Interpolator.hpp" +#include "bind_PedestalTrackingPixelHistogram.hpp" #include "bind_PixelHistogram.hpp" #include "bind_PixelMap.hpp" #include "bind_RawFile.hpp" @@ -66,6 +67,7 @@ PYBIND11_MODULE(_aare, m) { define_var_cluster_finder_bindings(m); define_pixel_map_bindings(m); define_pixel_histogram_bindings(m); + define_pedestal_tracking_pixel_histogram_bindings(m); define_pedestal_bindings(m, "Pedestal_d"); define_pedestal_bindings(m, "Pedestal_f"); define_fit_bindings(m); diff --git a/python/src/pedestal.hpp b/python/src/pedestal.hpp index 9bef948..f0b27e2 100644 --- a/python/src/pedestal.hpp +++ b/python/src/pedestal.hpp @@ -21,6 +21,23 @@ void define_pedestal_bindings(py::module &m, const std::string &name) { *mea = self.mean(); return return_image_data(mea); }) + .def("view", + [](py::object self_py) { + auto &self = self_py.cast &>(); + auto v = self.view(); + std::array shape{ + static_cast(v.shape(0)), + static_cast(v.shape(1))}; + std::array byte_strides{ + static_cast(v.strides()[0]) * + static_cast(sizeof(SUM_TYPE)), + static_cast(v.strides()[1]) * + static_cast(sizeof(SUM_TYPE))}; + auto arr = py::array_t(shape, byte_strides, + v.data(), self_py); + arr.attr("setflags")(py::arg("write") = false); + return arr; + }) .def("variance", [](Pedestal &self) { auto var = new NDArray{}; @@ -49,6 +66,16 @@ void define_pedestal_bindings(py::module &m, const std::string &name) { auto v = make_view_2d(f); pedestal.push(v); }) + .def( + "push_with_threshold", + [](Pedestal &pedestal, + py::array_t &f, + py::array_t &threshold) { + auto frame_view = make_view_2d(f); + auto threshold_view = make_view_2d(threshold); + pedestal.push_with_threshold(frame_view, threshold_view); + }, + py::arg("frame").noconvert(), py::arg("threshold").noconvert()) .def( "push_no_update", [](Pedestal &pedestal, diff --git a/src/PedestalTrackingPixelHistogram.cpp b/src/PedestalTrackingPixelHistogram.cpp new file mode 100644 index 0000000..208fb88 --- /dev/null +++ b/src/PedestalTrackingPixelHistogram.cpp @@ -0,0 +1,465 @@ +#include "aare/PedestalTrackingPixelHistogram.hpp" + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace aare { + +PedestalTrackingPixelHistogram::PedestalTrackingPixelHistogram( + int rows, int cols, int n_bins, AxisType xmin, AxisType xmax, int n_threads, + std::size_t max_pending, double n_sigma) + : rows_(rows), cols_(cols), n_threads_(n_threads), xmin_(xmin), xmax_(xmax), + current_work_kind_(WorkKind::Fill), current_image_(nullptr), + completed_threads_(0), stop_workers_(false), work_generation_(0), + n_sigma_(n_sigma) { + if (rows_ < 1 || cols_ < 1 || n_bins < 1) { + throw std::invalid_argument( + "PedestalTrackingPixelHistogram requires positive rows, cols and bins"); + } + if (n_threads < 1) { + throw std::invalid_argument( + "PedestalTrackingPixelHistogram requires at least one thread"); + } + if (max_pending < 1) { + throw std::invalid_argument( + "PedestalTrackingPixelHistogram requires max_pending >= 1"); + } + + n_threads_ = std::min(n_threads_, rows_); + + // Build a balanced row partition. With base = rows_ / n_threads_ and + // extra = rows_ % n_threads_, the first `extra` threads get base + 1 + // rows each and the rest get `base` rows. This avoids the + // ceil(rows/n_threads) scheme leaving trailing threads with zero or + // negative row counts (e.g. rows=17, n_threads=8). + row_offsets_.resize(n_threads_ + 1); + const int base = rows_ / n_threads_; + const int extra = rows_ % n_threads_; + int offset = 0; + for (int i = 0; i < n_threads_; ++i) { + row_offsets_[i] = offset; + offset += base + (i < extra ? 1 : 0); + } + row_offsets_[n_threads_] = offset; // == rows_ by construction + + // Initialize partial histograms, partial pedestals and the cached + // per-pixel std for each thread. All three are sized to the + // thread's row slice and indexed by local_row (0..local_rows-1), + // so the worker can address them with the same coordinates. + partial_hists_.reserve(n_threads_); + partial_pedestals_.reserve(n_threads_); + partial_std_.reserve(n_threads_); + for (int i = 0; i < n_threads_; ++i) { + const auto local_rows = row_count(i); + partial_hists_.emplace_back( + bh::axis::regular(n_bins, xmin, xmax, + "value"), + bh::axis::integer( + 0, cols, "y"), + bh::axis::integer( + 0, local_rows, "x")); + partial_pedestals_.emplace_back(static_cast(local_rows), + static_cast(cols)); + partial_std_.emplace_back(NDArray( + {static_cast(local_rows), static_cast(cols)}, + 0.0)); + } + + // Spawn worker threads + for (int i = 0; i < n_threads_; ++i) { + workers_.emplace_back([this, i]() { this->worker_loop(i); }); + } + + // Async pipeline. The PCQ holds (size - 1) usable slots, so size up by + // one to honour the requested max_pending. + async_queue_ = + std::make_unique(static_cast(max_pending + 1)); + coordinator_ = std::thread([this]() { this->coordinator_loop(); }); +} + +PedestalTrackingPixelHistogram::~PedestalTrackingPixelHistogram() { + // Drain any pending async fills before tearing down the worker pool. + // The coordinator's loop keeps processing while stop_coordinator_ is + // true as long as the queue is non-empty (mirrors ClusterFinderMT). + if (coordinator_.joinable()) { + stop_coordinator_ = true; + coordinator_.join(); + } + + // Signal all workers to stop + { + std::unique_lock lock(work_mutex_); + stop_workers_ = true; + } + work_cv_.notify_all(); + + // Join all worker threads + for (auto &thread : workers_) { + if (thread.joinable()) { + thread.join(); + } + } +} + +int PedestalTrackingPixelHistogram::row_start(int thread_id) const { + return row_offsets_[thread_id]; +} + +int PedestalTrackingPixelHistogram::row_count(int thread_id) const { + return row_offsets_[thread_id + 1] - row_offsets_[thread_id]; +} + +void PedestalTrackingPixelHistogram::dispatch_( + WorkKind kind, const NDView *image) { + // Caller must already hold fill_mutex_. Reset counters, publish the + // new work item, wake the workers, wait for completion. + { + std::unique_lock lock(work_mutex_); + completed_threads_ = 0; + current_work_kind_ = kind; + current_image_ = image; + ++work_generation_; + } + work_cv_.notify_all(); + { + std::unique_lock lock(work_mutex_); + done_cv_.wait(lock, + [this]() { return completed_threads_ == n_threads_; }); + current_image_ = nullptr; + } +} + +void PedestalTrackingPixelHistogram::push_pedestal_no_update( + const NDView &frame) { + if (frame.shape(0) != rows_ || frame.shape(1) != cols_) { + throw std::invalid_argument( + "PedestalTrackingPixelHistogram frame shape does not match " + "constructor shape"); + } + std::lock_guard fill_lock(fill_mutex_); + dispatch_(WorkKind::PushPedestal, &frame); +} + +void PedestalTrackingPixelHistogram::update_mean() { + // Drain any in-flight async fills first; the coordinator does NOT + // hold fill_mutex_ at that point, so we can grab it safely after. + flush(); + std::lock_guard fill_lock(fill_mutex_); + dispatch_(WorkKind::UpdateMean, nullptr); +} + +void PedestalTrackingPixelHistogram::worker_loop(int thread_id) { + int last_generation = 0; + + while (true) { + std::unique_lock lock(work_mutex_); + work_cv_.wait(lock, [this, last_generation]() { + return work_generation_ != last_generation || stop_workers_; + }); + + if (stop_workers_) { + break; + } + + // Snapshot the work assignment under the lock so we don't race + // against the next dispatch publishing new state. + const WorkKind kind = current_work_kind_; + const NDView *image = current_image_; + const int generation = work_generation_; + const int first_row = row_start(thread_id); + const int local_rows = row_count(thread_id); + + lock.unlock(); + + auto &my_pedestal = partial_pedestals_[thread_id]; + auto &my_hist = partial_hists_[thread_id]; + + switch (kind) { + case WorkKind::Fill: { + // Histogram the pedestal-subtracted residual for each pixel + // in this thread's row slice. `my_pedestal` is sized to the + // local row range and indexed by (local_row, col). + for (int local_row = 0; local_row < local_rows; ++local_row) { + const auto row = static_cast(first_row + local_row); + for (ssize_t col = 0; col < image->shape(1); ++col) { + const AxisType val = + static_cast((*image)(row, col)) - + static_cast(my_pedestal.mean( + static_cast(local_row), + static_cast(col))); + if (val < xmin_ || val >= xmax_) { + continue; // Skip out-of-range residuals + } + my_hist(val, col, local_row); + } + } + break; + } + case WorkKind::PushPedestal: { + // Accumulate raw frame values into this thread's pedestal + // shard. Uses the pixel-level push_no_update which only + // touches m_sum/m_sum2/m_cur_samples (no m_mean writes). + for (int local_row = 0; local_row < local_rows; ++local_row) { + const auto row = static_cast(first_row + local_row); + for (ssize_t col = 0; col < image->shape(1); ++col) { + my_pedestal.template push_no_update( + static_cast(local_row), + static_cast(col), (*image)(row, col)); + } + } + break; + } + case WorkKind::UpdateMean: { + // Recompute m_mean from the running sums. Only touches this + // thread's shard. Also refresh the cached per-pixel std so + // FillWithThreshold can read it without recomputing on the + // hot path. + my_pedestal.update_mean(); + auto &my_std = partial_std_[thread_id]; + for (int local_row = 0; local_row < local_rows; ++local_row) { + for (int col = 0; col < cols_; ++col) { + my_std(local_row, col) = static_cast( + my_pedestal.std(static_cast(local_row), + static_cast(col))); + } + } + break; + } + case WorkKind::FillWithThreshold: { + // Histogram the pedestal-subtracted residual AND, for pixels + // whose residual is consistent with noise + // (|residual| < n_sigma * cached_std), feed the raw value + // back into the pedestal shard. With n_sigma == 0 the gate + // never fires, which makes this case behave identically to + // WorkKind::Fill (modulo the per-pixel gate evaluation). + auto &my_std = partial_std_[thread_id]; + const double n_sigma = n_sigma_.load(std::memory_order_relaxed); + for (int local_row = 0; local_row < local_rows; ++local_row) { + const auto row = static_cast(first_row + local_row); + for (ssize_t col = 0; col < image->shape(1); ++col) { + const FrameType raw = (*image)(row, col); + const AxisType val = + static_cast(raw) - + static_cast(my_pedestal.mean( + static_cast(local_row), + static_cast(col))); + if (val >= xmin_ && val < xmax_) { + my_hist(val, col, local_row); + } + const double sigma = my_std(local_row, col); + if (sigma > 0.0 && + std::abs(static_cast(val)) < n_sigma * sigma) { + my_pedestal.template push( + static_cast(local_row), + static_cast(col), raw); + } + } + } + break; + } + } + + // Signal completion + { + std::unique_lock done_lock(work_mutex_); + last_generation = generation; + completed_threads_++; + if (completed_threads_ == n_threads_) { + done_cv_.notify_one(); + } + } + } +} + +NDArray +PedestalTrackingPixelHistogram::hdata() const { + // Make sure any pending async fills are merged in before we snapshot + // the partial histograms. Cheap when the queue is already drained. + flush(); + + const auto &hist = partial_hists_.front(); + const auto bins = static_cast(hist.axis(0).size()); + const auto cols = static_cast(hist.axis(1).size()); + const auto rows = static_cast(rows_); + + NDArray data({rows, cols, bins}); + + // Each thread owns a disjoint, contiguous range of rows and its dense + // storage layout [local_row][col][bin] is identical to the slice + // [first_row .. first_row + local_rows)[col][bin] of `data`. So the + // merge is just one bulk copy per thread; no per-element accumulation + // and no upfront zeroing of `data` is needed. + const size_t pixel_stride = static_cast(cols) * bins; + for (int t = 0; t < n_threads_; ++t) { + const auto &storage = bh::unsafe_access::storage(partial_hists_[t]); + const StorageType *partial_data = storage.data(); + const auto first_row = static_cast(row_start(t)); + const auto local_rows = static_cast(row_count(t)); + if (local_rows == 0) + continue; + + std::memcpy(data.data() + first_row * pixel_stride, partial_data, + local_rows * pixel_stride * sizeof(StorageType)); + } + + return data; +} + +NDArray PedestalTrackingPixelHistogram::pedestal_mean() const { + // Drain in-flight async fills and serialise with all other fan-outs + // (Fill / PushPedestal / UpdateMean). m_mean is overwritten wholesale + // by Pedestal::update_mean, so without the lock we could read torn + // rows mid-update. + flush(); + std::lock_guard lock(fill_mutex_); + + NDArray data( + {static_cast(rows_), static_cast(cols_)}); + + // Each partial pedestal stores its slice of m_mean in C-order + // [local_rows x cols], identical in layout to the corresponding + // [first_row .. first_row + local_rows)[col] slice of `data`, so + // we can copy each shard with a single memcpy. + const size_t row_stride = static_cast(cols_); + for (int t = 0; t < n_threads_; ++t) { + const auto first_row = static_cast(row_start(t)); + const auto local_rows = static_cast(row_count(t)); + if (local_rows == 0) + continue; + + const auto view = partial_pedestals_[t].view(); + std::memcpy(data.data() + first_row * row_stride, view.data(), + local_rows * row_stride * sizeof(AxisType)); + } + + return data; +} + +void PedestalTrackingPixelHistogram::fill(const NDView &image) { + if (image.shape(0) != rows_ || image.shape(1) != cols_) { + throw std::invalid_argument( + "PedestalTrackingPixelHistogram image shape does not match " + "constructor shape"); + } + std::lock_guard fill_lock(fill_mutex_); + dispatch_(WorkKind::Fill, &image); +} + +void PedestalTrackingPixelHistogram::fill_with_threshold_( + const NDView &image) { + if (image.shape(0) != rows_ || image.shape(1) != cols_) { + throw std::invalid_argument( + "PedestalTrackingPixelHistogram image shape does not match " + "constructor shape"); + } + std::lock_guard fill_lock(fill_mutex_); + dispatch_(WorkKind::FillWithThreshold, &image); +} + +void PedestalTrackingPixelHistogram::fill_async_with_threshold( + NDArray image) { + if (image.shape(0) != rows_ || image.shape(1) != cols_) { + throw std::invalid_argument( + "PedestalTrackingPixelHistogram image shape does not match " + "constructor shape"); + } + + // SPSC backpressure: spin with a short sleep until a slot frees up. + // The std::move only consumes `image` on the iteration that succeeds + // (placement-new inside write() runs only when the slot is free). + while (!async_queue_->write(std::move(image))) { + std::this_thread::sleep_for(async_wait_); + } +} + +double PedestalTrackingPixelHistogram::n_sigma() const { + return n_sigma_.load(std::memory_order_relaxed); +} + +void PedestalTrackingPixelHistogram::set_n_sigma(double n_sigma) { + n_sigma_.store(n_sigma, std::memory_order_relaxed); +} + +void PedestalTrackingPixelHistogram::flush() const { + while (!async_queue_->isEmpty() || + coordinator_busy_.load(std::memory_order_acquire)) { + std::this_thread::sleep_for(async_wait_); + } +} + +std::size_t PedestalTrackingPixelHistogram::pending() const { + // sizeGuess() counts the items still in the queue; the coordinator + // does `read()` (which pops) before setting `coordinator_busy_`, so an + // in-flight item lives only in the busy flag. + return async_queue_->sizeGuess() + + (coordinator_busy_.load(std::memory_order_acquire) ? 1u : 0u); +} + +void PedestalTrackingPixelHistogram::coordinator_loop() { + NDArray item; + while (!stop_coordinator_.load(std::memory_order_acquire) || + !async_queue_->isEmpty()) { + if (async_queue_->read(item)) { + coordinator_busy_.store(true, std::memory_order_release); + try { + fill_with_threshold_(item.view()); + } catch (const std::exception &e) { + // fill_async_with_threshold pre-validates shape, so this + // is purely defensive. Log to stderr and keep the + // coordinator alive. + std::cerr << "PedestalTrackingPixelHistogram::" + "fill_async_with_threshold error: " + << e.what() << std::endl; + } catch (...) { + std::cerr << "PedestalTrackingPixelHistogram::" + "fill_async_with_threshold error: unknown" + << std::endl; + } + coordinator_busy_.store(false, std::memory_order_release); + } else { + std::this_thread::sleep_for(async_wait_); + } + } +} + +NDArray +PedestalTrackingPixelHistogram::bin_centers() const { + const auto &value_axis = partial_hists_.front().axis(0); + const auto n_bins = static_cast(value_axis.size()); + + NDArray centers({n_bins}); + + for (ssize_t i = 0; i < n_bins; ++i) { + AxisType left = value_axis.value(i); + AxisType right = value_axis.value(i + 1); + centers(i) = (left + right) / AxisType(2.0); + } + + return centers; +} + +NDArray +PedestalTrackingPixelHistogram::bin_edges() const { + const auto &value_axis = partial_hists_.front().axis(0); + const auto n_bins = static_cast(value_axis.size()); + + NDArray edges({n_bins + 1}); + + for (ssize_t i = 0; i <= n_bins; ++i) { + edges(i) = value_axis.value(i); + } + + return edges; +} + +} // namespace aare