feat(ClusterFinderCUDA): async submit_batch/collect API
Build on RHEL8 / build (push) Successful in 3m16s
Build on RHEL9 / build (push) Successful in 3m26s
Run tests using data on local RHEL8 / build (push) Successful in 9m42s

- Eliminate the ~200–300 µs inter-batch idle gap by allowing two batches
to be in-flight simultaneously:
  - submit_batch() enqueues H2D+kernel+D2H without blocking
  - collect() syncs via cudaEventSynchronize (not
  cudaStreamSynchronize) so a queued second batch runs uninterrupted.

- Two ping-pong output slots (NUM_SLOTS=2) with per-slot pinned buffers
and cudaEventDisableTiming sync events.
- find_clusters_batched() keeps its direct implementation.

* Measured: 0.026 -> 0.022 ms/frame (~18%).
This commit is contained in:
kferjaoui
2026-05-28 16:23:37 +02:00
parent 4c66802980
commit 5922c73c07
5 changed files with 597 additions and 95 deletions
+67 -1
View File
@@ -31,6 +31,10 @@ void define_ClusterFinderCUDA(py::module &m, const std::string &typestr) {
using ContigArr =
py::array_t<uint16_t, py::array::c_style | py::array::forcecast>;
// Opaque batch handle returned by submit_batch() and consumed by collect()
py::class_<typename CF::BatchToken>(m,
(class_name + "_BatchToken").c_str());
py::class_<CF>(m, class_name.c_str())
.def(py::init<Shape<2>, float, size_t, int>(), py::arg("image_size"),
py::arg("n_sigma") = 5.0f,
@@ -91,6 +95,44 @@ void define_ClusterFinderCUDA(py::module &m, const std::string &typestr) {
list of ClusterVector, one per input frame. The input array is
converted to C-contiguous uint16 if needed.)")
.def(
"submit_batch",
[](CF &self, ContigArr frames, uint64_t first_frame) {
auto view = make_view_3d(frames);
return self.submit_batch(view, first_frame);
},
py::arg("frames"), py::arg("first_frame") = 0,
py::call_guard<py::gil_scoped_release>(),
R"(Enqueue one batch of frames onto the GPU without waiting for
completion. Returns a BatchToken that must be passed to collect()
to retrieve results and release the slot.
At most 2 batches can be in flight simultaneously. The intended
usage pattern to eliminate inter-batch GPU idle time is:
tok = cf.submit_batch(buf_a, first_frame=0)
for start in range(BATCH_SIZE, N, BATCH_SIZE):
buf_b[:n] = data[start:start+n] # fill next buffer
next_tok = cf.submit_batch(buf_b, first_frame=start)
results += cf.collect(tok) # GPU runs buf_b
tok = next_tok
buf_a, buf_b = buf_b, buf_a # swap
results += cf.collect(tok) # drain last batch
Two separate input buffers must be used (one per in-flight batch)
so that filling the next buffer does not corrupt the ongoing H2D
transfer for the current batch.)")
.def(
"collect",
[](CF &self, typename CF::BatchToken token) {
return self.collect(token);
},
py::arg("token"), py::call_guard<py::gil_scoped_release>(),
R"(Wait for a previously submitted batch and return its results as
a list of ClusterVector, one per input frame. Releases the batch
slot so it can be reused by the next submit_batch() call.)")
.def("avg_kernel_time_ms", &CF::avg_kernel_time_ms,
R"(Average kernel execution time per frame in milliseconds,
excluding PCIe transfers.)")
@@ -117,7 +159,31 @@ void define_ClusterFinderCUDA(py::module &m, const std::string &typestr) {
Call unregister_input_buffer() when done.)")
.def("unregister_input_buffer", &CF::unregister_input_buffer,
"Release the previously pinned input buffer.");
"Release the previously pinned input buffer.")
.def(
"pin_buffer",
[](CF & /*self*/, py::array arr) {
auto info = arr.request();
CUDA_CHECK(
cudaHostRegister(info.ptr,
static_cast<size_t>(info.size) *
static_cast<size_t>(info.itemsize),
cudaHostRegisterDefault));
},
R"(Pin an arbitrary numpy array as a locked host buffer for DMA-speed
transfers. Unlike register_input_buffer(), does not unpin a
previously registered buffer use this to pin multiple buffers
simultaneously (e.g. the two alternating buffers in an async
pipeline). Call unpin_buffer() on each array when done.)")
.def(
"unpin_buffer",
[](CF & /*self*/, py::array arr) {
auto info = arr.request();
CUDA_CHECK(cudaHostUnregister(info.ptr));
},
"Release a buffer previously pinned with pin_buffer().");
}
} // namespace aare
File diff suppressed because one or more lines are too long