Refactor ClusterFinderCUDA
Build on RHEL9 / build (push) Successful in 3m22s
Build on RHEL8 / build (push) Successful in 3m28s
Run tests using data on local RHEL8 / build (push) Successful in 3m37s

Rework the multi-stream pipeline to eliminate per-frame sync barriers and
fix the D2H staging architecture.

Sync reduction:
- Replace one cudaStreamSynchronize per frame with one per stream per batch,
  cutting synchronisation calls from O(n_frames x n_streams) to O(n_streams)
- Introduce a unified per-frame D2H output layout [uint32_t count | clusters[max]]
  stored in a single class-level lazy-allocated pinned pool (h_output_pinned),
  replacing the per-stream separate cluster/count device buffers
- Move CUDA event pool from per-stream fixed-size to per-frame-slot lazy-allocated,
  enabling correct kernel timing across any batch size

Pinned H2D without CPU-side copy:
- Add register_input_buffer(ptr, bytes) / unregister_input_buffer() wrapping
  cudaHostRegister so callers can pin their existing batch buffer once; all
  find_clusters_batched() slices then transfer at DMA speed (~22 GB/s) instead
  of ~15 GB/s for pageable, with no extra memcpy or WC-memory penalty

Result (RTX 4090, 400x400 uint16, 3x3 clusters, batch=2000, 5 streams):
  Before: ~34 µs/frame  ->  After: ~28 µs/frame  (−18 %)
This commit is contained in:
kferjaoui
2026-05-18 16:30:13 +02:00
parent 5cfc888567
commit 6a12e3de24
8 changed files with 1497 additions and 290 deletions
+42 -20
View File
@@ -8,8 +8,8 @@
#include <cstdint>
#include <pybind11/pybind11.h>
// #include <pybind11/stl.h>
#include <pybind11/stl_bind.h>
#include <pybind11/stl.h>
// #include <pybind11/stl_bind.h>
namespace py = pybind11;
using pd_type = double;
@@ -28,18 +28,20 @@ void define_ClusterFinderCUDA(py::module &m, const std::string &typestr) {
using ClusterType = Cluster<T, ClusterSizeX, ClusterSizeY, CoordType>;
using CF = ClusterFinderCUDA<ClusterType, uint16_t, pd_type>;
using ContigArr =
py::array_t<uint16_t, py::array::c_style | py::array::forcecast>;
py::class_<CF>(m, class_name.c_str())
.def(py::init<Shape<2>, pd_type, size_t, int>(), py::arg("image_size"),
py::arg("n_sigma") = 5.0, py::arg("capacity") = 1'000'000,
py::arg("n_streams") = 1)
.def(py::init<Shape<2>, float, size_t, int>(), py::arg("image_size"),
py::arg("n_sigma") = 5.0f,
py::arg("max_clusters_per_frame") = 2048, py::arg("n_streams") = 4)
.def_property(
"nSigma", &CF::get_nSigma, &CF::set_nSigma,
R"(Number of sigma above the pedestal to consider a photon during cluster finding.)")
.def("push_pedestal_frame",
[](CF &self, py::array_t<uint16_t> frame) {
[](CF &self, ContigArr frame) {
auto view = make_view_2d(frame);
self.push_pedestal_frame(view);
})
@@ -63,39 +65,59 @@ void define_ClusterFinderCUDA(py::module &m, const std::string &typestr) {
.def(
"steal_clusters",
[](CF &self, bool realloc_same_capacity) {
ClusterVector<ClusterType> clusters =
self.steal_clusters(realloc_same_capacity);
return clusters;
return std::move(self.steal_clusters(realloc_same_capacity));
},
py::arg("realloc_same_capacity") = false)
py::arg("realloc_same_capacity") = true)
.def(
"find_clusters",
[](CF &self, py::array_t<uint16_t> frame, uint64_t frame_number) {
[](CF &self, ContigArr frame, uint64_t frame_number) {
auto view = make_view_2d(frame);
self.find_clusters(view, frame_number);
},
py::arg("frame"), py::arg("frame_number") = 0)
py::arg("frame"), py::arg("frame_number") = 0,
py::call_guard<py::gil_scoped_release>())
.def(
"find_clusters_batched",
[](CF &self, py::array_t<uint16_t> frames, uint64_t first_frame) {
// frames is expected as a 3D numpy array (n_frames, nrows,
// ncols)
[](CF &self, ContigArr frames, uint64_t first_frame) {
auto view = make_view_3d(frames);
return self.find_clusters_batched(view, first_frame);
},
py::arg("frames"), py::arg("first_frame") = 0,
R"(Process a 3D array of frames (n_frames, nrows, ncols) in parallel
across the configured CUDA streams. Returns a list of ClusterVector, one per
input frame.)")
py::call_guard<py::gil_scoped_release>(),
R"(Process a 3D array of frames (n_frames, nrows, ncols) using
n_streams CUDA streams for H2D/kernel/D2H pipelining. Returns a
list of ClusterVector, one per input frame. The input array is
converted to C-contiguous uint16 if needed.)")
.def("avg_kernel_time_ms", &CF::avg_kernel_time_ms,
R"(Average kernel execution time per frame in milliseconds,
excluding PCIe transfers. Use wall_time - avg_kernel_time to estimate transfer overhead.)")
excluding PCIe transfers.)")
.def("reset_timers", &CF::reset_timers,
R"(Reset the internal kernel timing counters.)");
R"(Reset the internal kernel timing counters.)")
.def(
"register_input_buffer",
[](CF &self, py::array arr) {
auto info = arr.request();
self.register_input_buffer(
info.ptr, static_cast<size_t>(info.size) *
static_cast<size_t>(info.itemsize));
},
R"(Pin a numpy array as a locked host buffer so that
find_clusters_batched transfers it at full DMA bandwidth
(~22 GB/s) instead of going through the CUDA driver's
internal staging (~15 GB/s for pageable memory).
Call once before the processing loop with the full data
array. Slices of that array passed to find_clusters_batched
lie within the registered region and benefit automatically.
Call unregister_input_buffer() when done.)")
.def("unregister_input_buffer", &CF::unregister_input_buffer,
"Release the previously pinned input buffer.");
}
} // namespace aare