Files
aare/python/tests/ClusterFinderCUDA.ipynb
kferjaoui 5922c73c07
Build on RHEL8 / build (push) Successful in 3m16s
Build on RHEL9 / build (push) Successful in 3m26s
Run tests using data on local RHEL8 / build (push) Successful in 9m42s
feat(ClusterFinderCUDA): async submit_batch/collect API
- Eliminate the ~200–300 µs inter-batch idle gap by allowing two batches
to be in-flight simultaneously:
  - submit_batch() enqueues H2D+kernel+D2H without blocking
  - collect() syncs via cudaEventSynchronize (not
  cudaStreamSynchronize) so a queued second batch runs uninterrupted.

- Two ping-pong output slots (NUM_SLOTS=2) with per-slot pinned buffers
and cudaEventDisableTiming sync events.
- find_clusters_batched() keeps its direct implementation.

* Measured: 0.026 -> 0.022 ms/frame (~18%).
2026-05-28 16:23:37 +02:00

95 KiB
Raw Permalink Blame History

In [1]:
import sys; sys.path.append('/home/ferjao_k/aare/build')

from pathlib import Path
import matplotlib.pyplot as plt
from mpl_toolkits.axes_grid1 import make_axes_locatable
import numpy as np
import boost_histogram as bh
import time

from aare import File, ClusterFinder, ClusterFinderMT, ClusterCollector, ClusterFinderCUDA
In [2]:
# Helpers
N_BINS = 200
def make_hist(clusters):
    h = bh.Histogram(bh.axis.Regular(N_BINS, -2, 4000))
    h.fill(clusters.sum())
    return h

def make_hist_from_batch(result_list):
    h = bh.Histogram(bh.axis.Regular(N_BINS, -2, 4000))
    energies = [np.asarray(cv.sum()).ravel() for cv in result_list if cv.size > 0]
    if energies:
        h.fill(np.concatenate(energies))
    return h
In [3]:
base = Path('/mnt/sls_det_storage/matterhorn_data/aare_test_data/')
f = File(base / 'Moench03new/cu_half_speed_master_4.json')

n_frames_pd = 1000
N           = 50000 #88999
cluster_size = (9, 9)
rows = f.rows
cols = f.cols
image_size   = (rows, cols)
capacity     = 50_000 #3_000_000

print(f'Image size:       {image_size}')
print(f'Pedestal frames:  {n_frames_pd}')
print(f'Data frames:      {N}')
Image size:       (400, 400)
Pedestal frames:  1000
Data frames:      50000
In [4]:
f.total_frames
Out[4]:
500000

Pedestal (both finders trained on identical frames)

  • Modify the boolean SERIAL to choose between the sequential CPU version (ClusterFinder) and its multi-threaded homologue (ClusterFinderMT)
In [5]:
SERIAL = True
In [6]:
if(SERIAL):
    cf_cpu  = ClusterFinder(image_size, cluster_size, capacity=capacity)
else:
    cf_cpu  = ClusterFinderMT(image_size, cluster_size, capacity=capacity, n_threads=48)
    sink = ClusterCollector(cf_cpu)
In [7]:
# Runs the destructor under the hood in case cf_cuda has already been constructed
# del cf_cuda
cf_cuda = None
In [8]:
N_STREAMS  = 5
cf_cuda = ClusterFinderCUDA(image_size, 
                            cluster_size, 
                            n_sigma=7, 
                            max_clusters_per_frame=1500,
                            n_streams=N_STREAMS) 
In [9]:
cf_async = ClusterFinderCUDA(image_size,
                             cluster_size,
                             n_sigma=7,
                             max_clusters_per_frame=1500,
                             n_streams=N_STREAMS)
In [10]:
t0 = time.perf_counter()
for _ in range(n_frames_pd):
    img = f.read_frame()
    cf_cpu.push_pedestal_frame(img.copy())
    cf_cuda.push_pedestal_frame(img.copy())
    cf_async.push_pedestal_frame(img.copy())
print(f'Pedestal ({n_frames_pd} frames): {time.perf_counter() - t0:.3f}s')
Pedestal (1000 frames): 0.659s

Read all data frames into memory (I/O out of the timing loop)

In [11]:
f.seek(n_frames_pd)
t0 = time.perf_counter()
data = f.read_n(N)
t_io = time.perf_counter() - t0
print(f'Reading {N} frames:        {t_io:.3f}s  ({N/t_io:.0f} FPS, '
      f'{f.bytes_per_frame * N / 1024**2 / t_io:.3f} GB/s)')
Reading 50000 frames:        2.294s  (21800 FPS, 6652.882 GB/s)

CPU clustering

In [12]:
from tqdm import tqdm
In [13]:
t0 = time.perf_counter()
for frame in tqdm(data):
    cf_cpu.find_clusters(frame)
t_cpu = time.perf_counter() - t0

if(SERIAL):
    clusters_cpu = cf_cpu.steal_clusters(realloc_same_capacity=False)
    n_clusters_cpu = clusters_cpu.size
    
    hist_cpu  = make_hist(clusters_cpu)
else:
    cf_cpu.stop()
    sink.stop()
    
    clusters_cpu = sink.steal_clusters() #cf_cpu.steal_clusters(realloc_same_capacity=False)
    
    hist_cpu = bh.Histogram(bh.axis.Regular(N_BINS, -2, 4000))
    n_clusters_cpu = 0
    for cv in clusters_cpu:
        hist_cpu.fill(cv.sum())
        n_clusters_cpu += cv.size
        
print(f'CPU clustering:          {t_cpu:.3f}s ({N/t_cpu:.0f} FPS, '
      f'{n_clusters_cpu} clusters, {n_clusters_cpu/N:.2f}/frame)')
100%|█████████████████████████████████████████████████████████████████████████████████████████████████| 50000/50000 [07:46<00:00, 107.08it/s]
CPU clustering:          466.937s (107 FPS, 50982358 clusters, 1019.65/frame)

CUDA clustering

In [14]:
BATCHED = True
BATCH_SIZE = 3000
In [42]:
if(BATCHED):
    # Before warmup, pin the fixed size buffer
    batch_buffer = np.empty((BATCH_SIZE, rows, cols), dtype=np.uint16)
    cf_cuda.register_input_buffer(batch_buffer)       # fixed ~610 MB for 2000 frames

    # Warmup: first kernel launch pays CUDA context + pedestal H2D upload cost
    _ = cf_cuda.find_clusters_batched(data[0:BATCH_SIZE], first_frame=0)
    
    clusters_cuda_per_frame = []

    cf_cuda.reset_timers()
    t0 = time.perf_counter()
    for start in range(0, N, BATCH_SIZE):
        stop = min(start + BATCH_SIZE, N)
        batch_buffer[:stop-start] = data[start:stop]  # CPU memcpy into pinned buf
        clusters_cuda_per_frame.extend(
            cf_cuda.find_clusters_batched(batch_buffer[:stop-start], first_frame=start)
        )
    t_cuda = time.perf_counter() - t0

    cf_cuda.unregister_input_buffer()  # release when done with this dataset

    kernel_ms = cf_cuda.avg_kernel_time_ms()
    
    n_clusters_cuda = sum(cv.size for cv in clusters_cuda_per_frame)

    hist_cuda = make_hist_from_batch(clusters_cuda_per_frame)
    
else:   
    # Simpler: (non-batched) per-frame run on non-pinned data
    cf_cuda.find_clusters(data[0])
    _ = cf_cuda.steal_clusters(realloc_same_capacity=False)

    cf_cuda.reset_timers()
    t0 = time.perf_counter()

    n_clusters_cuda = 0
    hist_cuda = None

    # steal the clusters as we go rather than at the end of the  dataset 
    # which might trigger an std::bad_alloc...
    for idx, frame in enumerate(data):
        cf_cuda.find_clusters(frame)
        clusters_frame = cf_cuda.steal_clusters(realloc_same_capacity=True)

        n_clusters_cuda += clusters_frame.size

        h = make_hist(clusters_frame)
        hist_cuda = h if hist_cuda is None else hist_cuda + h
        
    t_cuda = time.perf_counter() - t0
    kernel_ms = cf_cuda.avg_kernel_time_ms()
In [43]:
cluster_size
Out[43]:
(9, 9)
In [44]:
print(f'CPU clustering:          {t_cpu:.3f}s ({N/t_cpu:.0f} FPS, '
      f'{n_clusters_cpu} clusters, {n_clusters_cpu/N:.2f}/frame)')
CPU clustering:          466.937s (107 FPS, 50982358 clusters, 1019.65/frame)
In [45]:
print(f'CUDA clustering:          {t_cuda:.3f}s  ({N/t_cuda:.0f} FPS, '
      f'{n_clusters_cuda} clusters, {n_clusters_cuda/N:.2f}/frame)')
print(f'  Kernel only:            {kernel_ms:.3f} ms/frame')
print(f'  PCIe + overhead:        {t_cuda*1000/N - kernel_ms:.3f} ms/frame')
print(f'Speedup (CPU / CUDA):     {t_cpu / t_cuda:.2f}×')
CUDA clustering:          3.684s  (13571 FPS, 50577611 clusters, 1011.55/frame)
  Kernel only:            0.029 ms/frame
  PCIe + overhead:        0.045 ms/frame
Speedup (CPU / CUDA):     126.73×

CUDA Clustering (async pipeline)

In [46]:
BATCH_SIZE = 3000
In [47]:
# Two alternating batch buffers — buf[cur] is in flight on the GPU while
# buf[nxt] is being filled by the CPU memcpy.
# Both are pinned simultaneously for DMA-speed H2D (~22 GB/s).
buf = [np.empty((BATCH_SIZE, rows, cols), dtype=np.uint16) for _ in range(2)]
cf_async.pin_buffer(buf[0])
cf_async.pin_buffer(buf[1])

clusters_async = []

# Warmup: first call pays CUDA context + pedestal upload cost
buf[0][:BATCH_SIZE] = data[:BATCH_SIZE]
_ = cf_async.find_clusters_batched(buf[0], first_frame=0)

cf_async.reset_timers()
t0 = time.perf_counter()

# Prime: fill buf[0] with the first real batch and submit
cur = 0
n0 = min(BATCH_SIZE, N)
buf[cur][:n0] = data[:n0]
tok = cf_async.submit_batch(buf[cur][:n0], first_frame=0)

starts = list(range(BATCH_SIZE, N, BATCH_SIZE))
for start in starts:
    nxt  = 1 - cur
    stop = min(start + BATCH_SIZE, N)
    n    = stop - start

    # Fill next buffer while GPU processes current batch
    buf[nxt][:n] = data[start:stop]

    # Enqueue next batch — GPU now has both batches queued back-to-back
    next_tok = cf_async.submit_batch(buf[nxt][:n], first_frame=start)

    # Drain previous batch (GPU runs next_tok concurrently)
    clusters_async.extend(cf_async.collect(tok))

    tok = next_tok
    cur = nxt

# Drain final batch
clusters_async.extend(cf_async.collect(tok))

t_async = time.perf_counter() - t0

cf_async.unpin_buffer(buf[0])
cf_async.unpin_buffer(buf[1])

kernel_ms_async = cf_async.avg_kernel_time_ms()
n_clusters_async = sum(cv.size for cv in clusters_async)
hist_async = make_hist_from_batch(clusters_async)

print(f'CUDA(async pipeline):    {t_async:.3f}s  ({N/t_async:.0f} FPS, '
      f'{n_clusters_async} clusters, {n_clusters_async/N:.2f}/frame)')
print(f'  Kernel only:           {kernel_ms_async:.3f} ms/frame')
print(f'  PCIe + overhead:       {t_async*1000/N - kernel_ms_async:.3f} ms/frame')
if 't_cuda' in dir():
    print(f'Speedup vs batched:      {t_cuda / t_async:.2f}×')
CUDA(async pipeline):    3.384s  (14774 FPS, 50577640 clusters, 1011.55/frame)
  Kernel only:           0.031 ms/frame
  PCIe + overhead:       0.036 ms/frame
Speedup vs batched:      1.09×

Agreement check:

  • Cluster counts should match closely.
  • However, as the CUDA CF updates the pedestal once per frame rather than per-pixel, a small divergence after the first few frames is expected.
In [48]:
diff = abs(n_clusters_cpu - n_clusters_cuda)
rel  = diff / max(n_clusters_cpu, 1)
print(f'Cluster count diff:       {diff} ({rel:.2%})')
Cluster count diff:       404747 (0.79%)

Plots

In [49]:
print(len(hist_cpu.values()), len(hist_cpu.axes[0].edges))
print(len(hist_cuda.values()), len(hist_cuda.axes[0].edges))
200 201
200 201
In [50]:
fig, (ax_spec, ax_ratio) = plt.subplots(
    2, 1, figsize=(8, 6), sharex=True,
    gridspec_kw={'height_ratios': [3, 1]}
)

edges = hist_cpu.axes[0].edges
cpu_vals = hist_cpu.values()
cuda_vals = hist_cuda.values()
async_vals = hist_async.values()

ax_spec.stairs(cpu_vals, edges, label=f'CPU  ({n_clusters_cpu} clusters)')
ax_spec.stairs(cuda_vals, edges, label=f'CUDA ({n_clusters_cuda} clusters)', linestyle='--')
ax_spec.stairs(async_vals, edges, label=f'CUDA_async ({n_clusters_async} clusters)', linestyle='-.')
ax_spec.set_ylabel('Counts')
ax_spec.set_title('Cluster energy spectrum: CPU vs CUDA')
ax_spec.legend()
ax_spec.grid(alpha=0.2)

with np.errstate(divide='ignore', invalid='ignore'):
    ratio = np.where(cpu_vals > 0, cuda_vals / cpu_vals, np.nan)

ax_ratio.stairs(ratio, edges, color='k')
ax_ratio.axhline(1.0, color='gray', linewidth=0.5)
ax_ratio.set_ylabel('CUDA / CPU')
ax_ratio.set_xlabel('Energy [ADU]')
ax_ratio.set_ylim(0.5, 2.0)
ax_ratio.grid(alpha=0.3)

plt.tight_layout()
plt.show()
No description has been provided for this image