mirror of
https://github.com/slsdetectorgroup/aare.git
synced 2026-06-27 21:09:41 +02:00
5922c73c07
- Eliminate the ~200–300 µs inter-batch idle gap by allowing two batches to be in-flight simultaneously: - submit_batch() enqueues H2D+kernel+D2H without blocking - collect() syncs via cudaEventSynchronize (not cudaStreamSynchronize) so a queued second batch runs uninterrupted. - Two ping-pong output slots (NUM_SLOTS=2) with per-slot pinned buffers and cudaEventDisableTiming sync events. - find_clusters_batched() keeps its direct implementation. * Measured: 0.026 -> 0.022 ms/frame (~18%).
95 KiB
95 KiB
In [1]:
import sys; sys.path.append('/home/ferjao_k/aare/build') from pathlib import Path import matplotlib.pyplot as plt from mpl_toolkits.axes_grid1 import make_axes_locatable import numpy as np import boost_histogram as bh import time from aare import File, ClusterFinder, ClusterFinderMT, ClusterCollector, ClusterFinderCUDA
In [2]:
# Helpers N_BINS = 200 def make_hist(clusters): h = bh.Histogram(bh.axis.Regular(N_BINS, -2, 4000)) h.fill(clusters.sum()) return h def make_hist_from_batch(result_list): h = bh.Histogram(bh.axis.Regular(N_BINS, -2, 4000)) energies = [np.asarray(cv.sum()).ravel() for cv in result_list if cv.size > 0] if energies: h.fill(np.concatenate(energies)) return h
In [3]:
base = Path('/mnt/sls_det_storage/matterhorn_data/aare_test_data/') f = File(base / 'Moench03new/cu_half_speed_master_4.json') n_frames_pd = 1000 N = 50000 #88999 cluster_size = (9, 9) rows = f.rows cols = f.cols image_size = (rows, cols) capacity = 50_000 #3_000_000 print(f'Image size: {image_size}') print(f'Pedestal frames: {n_frames_pd}') print(f'Data frames: {N}')
Image size: (400, 400) Pedestal frames: 1000 Data frames: 50000
In [4]:
f.total_frames
Out[4]:
500000
Pedestal (both finders trained on identical frames)¶
- Modify the boolean
SERIALto choose between the sequential CPU version (ClusterFinder) and its multi-threaded homologue (ClusterFinderMT)
In [5]:
SERIAL = True
In [6]:
if(SERIAL): cf_cpu = ClusterFinder(image_size, cluster_size, capacity=capacity) else: cf_cpu = ClusterFinderMT(image_size, cluster_size, capacity=capacity, n_threads=48) sink = ClusterCollector(cf_cpu)
In [7]:
# Runs the destructor under the hood in case cf_cuda has already been constructed # del cf_cuda cf_cuda = None
In [8]:
N_STREAMS = 5 cf_cuda = ClusterFinderCUDA(image_size, cluster_size, n_sigma=7, max_clusters_per_frame=1500, n_streams=N_STREAMS)
In [9]:
cf_async = ClusterFinderCUDA(image_size, cluster_size, n_sigma=7, max_clusters_per_frame=1500, n_streams=N_STREAMS)
In [10]:
t0 = time.perf_counter() for _ in range(n_frames_pd): img = f.read_frame() cf_cpu.push_pedestal_frame(img.copy()) cf_cuda.push_pedestal_frame(img.copy()) cf_async.push_pedestal_frame(img.copy()) print(f'Pedestal ({n_frames_pd} frames): {time.perf_counter() - t0:.3f}s')
Pedestal (1000 frames): 0.659s
Read all data frames into memory (I/O out of the timing loop)¶
In [11]:
f.seek(n_frames_pd) t0 = time.perf_counter() data = f.read_n(N) t_io = time.perf_counter() - t0 print(f'Reading {N} frames: {t_io:.3f}s ({N/t_io:.0f} FPS, ' f'{f.bytes_per_frame * N / 1024**2 / t_io:.3f} GB/s)')
Reading 50000 frames: 2.294s (21800 FPS, 6652.882 GB/s)
CPU clustering¶
In [12]:
from tqdm import tqdm
In [13]:
t0 = time.perf_counter() for frame in tqdm(data): cf_cpu.find_clusters(frame) t_cpu = time.perf_counter() - t0 if(SERIAL): clusters_cpu = cf_cpu.steal_clusters(realloc_same_capacity=False) n_clusters_cpu = clusters_cpu.size hist_cpu = make_hist(clusters_cpu) else: cf_cpu.stop() sink.stop() clusters_cpu = sink.steal_clusters() #cf_cpu.steal_clusters(realloc_same_capacity=False) hist_cpu = bh.Histogram(bh.axis.Regular(N_BINS, -2, 4000)) n_clusters_cpu = 0 for cv in clusters_cpu: hist_cpu.fill(cv.sum()) n_clusters_cpu += cv.size print(f'CPU clustering: {t_cpu:.3f}s ({N/t_cpu:.0f} FPS, ' f'{n_clusters_cpu} clusters, {n_clusters_cpu/N:.2f}/frame)')
100%|█████████████████████████████████████████████████████████████████████████████████████████████████| 50000/50000 [07:46<00:00, 107.08it/s]
CPU clustering: 466.937s (107 FPS, 50982358 clusters, 1019.65/frame)
CUDA clustering¶
In [14]:
BATCHED = True BATCH_SIZE = 3000
In [42]:
if(BATCHED): # Before warmup, pin the fixed size buffer batch_buffer = np.empty((BATCH_SIZE, rows, cols), dtype=np.uint16) cf_cuda.register_input_buffer(batch_buffer) # fixed ~610 MB for 2000 frames # Warmup: first kernel launch pays CUDA context + pedestal H2D upload cost _ = cf_cuda.find_clusters_batched(data[0:BATCH_SIZE], first_frame=0) clusters_cuda_per_frame = [] cf_cuda.reset_timers() t0 = time.perf_counter() for start in range(0, N, BATCH_SIZE): stop = min(start + BATCH_SIZE, N) batch_buffer[:stop-start] = data[start:stop] # CPU memcpy into pinned buf clusters_cuda_per_frame.extend( cf_cuda.find_clusters_batched(batch_buffer[:stop-start], first_frame=start) ) t_cuda = time.perf_counter() - t0 cf_cuda.unregister_input_buffer() # release when done with this dataset kernel_ms = cf_cuda.avg_kernel_time_ms() n_clusters_cuda = sum(cv.size for cv in clusters_cuda_per_frame) hist_cuda = make_hist_from_batch(clusters_cuda_per_frame) else: # Simpler: (non-batched) per-frame run on non-pinned data cf_cuda.find_clusters(data[0]) _ = cf_cuda.steal_clusters(realloc_same_capacity=False) cf_cuda.reset_timers() t0 = time.perf_counter() n_clusters_cuda = 0 hist_cuda = None # steal the clusters as we go rather than at the end of the dataset # which might trigger an std::bad_alloc... for idx, frame in enumerate(data): cf_cuda.find_clusters(frame) clusters_frame = cf_cuda.steal_clusters(realloc_same_capacity=True) n_clusters_cuda += clusters_frame.size h = make_hist(clusters_frame) hist_cuda = h if hist_cuda is None else hist_cuda + h t_cuda = time.perf_counter() - t0 kernel_ms = cf_cuda.avg_kernel_time_ms()
In [43]:
cluster_size
Out[43]:
(9, 9)
In [44]:
print(f'CPU clustering: {t_cpu:.3f}s ({N/t_cpu:.0f} FPS, ' f'{n_clusters_cpu} clusters, {n_clusters_cpu/N:.2f}/frame)')
CPU clustering: 466.937s (107 FPS, 50982358 clusters, 1019.65/frame)
In [45]:
print(f'CUDA clustering: {t_cuda:.3f}s ({N/t_cuda:.0f} FPS, ' f'{n_clusters_cuda} clusters, {n_clusters_cuda/N:.2f}/frame)') print(f' Kernel only: {kernel_ms:.3f} ms/frame') print(f' PCIe + overhead: {t_cuda*1000/N - kernel_ms:.3f} ms/frame') print(f'Speedup (CPU / CUDA): {t_cpu / t_cuda:.2f}×')
CUDA clustering: 3.684s (13571 FPS, 50577611 clusters, 1011.55/frame)
Kernel only: 0.029 ms/frame
PCIe + overhead: 0.045 ms/frame
Speedup (CPU / CUDA): 126.73×
CUDA Clustering (async pipeline)¶
In [46]:
BATCH_SIZE = 3000
In [47]:
# Two alternating batch buffers — buf[cur] is in flight on the GPU while # buf[nxt] is being filled by the CPU memcpy. # Both are pinned simultaneously for DMA-speed H2D (~22 GB/s). buf = [np.empty((BATCH_SIZE, rows, cols), dtype=np.uint16) for _ in range(2)] cf_async.pin_buffer(buf[0]) cf_async.pin_buffer(buf[1]) clusters_async = [] # Warmup: first call pays CUDA context + pedestal upload cost buf[0][:BATCH_SIZE] = data[:BATCH_SIZE] _ = cf_async.find_clusters_batched(buf[0], first_frame=0) cf_async.reset_timers() t0 = time.perf_counter() # Prime: fill buf[0] with the first real batch and submit cur = 0 n0 = min(BATCH_SIZE, N) buf[cur][:n0] = data[:n0] tok = cf_async.submit_batch(buf[cur][:n0], first_frame=0) starts = list(range(BATCH_SIZE, N, BATCH_SIZE)) for start in starts: nxt = 1 - cur stop = min(start + BATCH_SIZE, N) n = stop - start # Fill next buffer while GPU processes current batch buf[nxt][:n] = data[start:stop] # Enqueue next batch — GPU now has both batches queued back-to-back next_tok = cf_async.submit_batch(buf[nxt][:n], first_frame=start) # Drain previous batch (GPU runs next_tok concurrently) clusters_async.extend(cf_async.collect(tok)) tok = next_tok cur = nxt # Drain final batch clusters_async.extend(cf_async.collect(tok)) t_async = time.perf_counter() - t0 cf_async.unpin_buffer(buf[0]) cf_async.unpin_buffer(buf[1]) kernel_ms_async = cf_async.avg_kernel_time_ms() n_clusters_async = sum(cv.size for cv in clusters_async) hist_async = make_hist_from_batch(clusters_async) print(f'CUDA(async pipeline): {t_async:.3f}s ({N/t_async:.0f} FPS, ' f'{n_clusters_async} clusters, {n_clusters_async/N:.2f}/frame)') print(f' Kernel only: {kernel_ms_async:.3f} ms/frame') print(f' PCIe + overhead: {t_async*1000/N - kernel_ms_async:.3f} ms/frame') if 't_cuda' in dir(): print(f'Speedup vs batched: {t_cuda / t_async:.2f}×')
CUDA(async pipeline): 3.384s (14774 FPS, 50577640 clusters, 1011.55/frame)
Kernel only: 0.031 ms/frame
PCIe + overhead: 0.036 ms/frame
Speedup vs batched: 1.09×
Agreement check:¶
- Cluster counts should match closely.
- However, as the CUDA CF updates the pedestal once per frame rather than per-pixel, a small divergence after the first few frames is expected.
In [48]:
diff = abs(n_clusters_cpu - n_clusters_cuda) rel = diff / max(n_clusters_cpu, 1) print(f'Cluster count diff: {diff} ({rel:.2%})')
Cluster count diff: 404747 (0.79%)
Plots¶
In [49]:
print(len(hist_cpu.values()), len(hist_cpu.axes[0].edges)) print(len(hist_cuda.values()), len(hist_cuda.axes[0].edges))
200 201 200 201
In [50]:
fig, (ax_spec, ax_ratio) = plt.subplots( 2, 1, figsize=(8, 6), sharex=True, gridspec_kw={'height_ratios': [3, 1]} ) edges = hist_cpu.axes[0].edges cpu_vals = hist_cpu.values() cuda_vals = hist_cuda.values() async_vals = hist_async.values() ax_spec.stairs(cpu_vals, edges, label=f'CPU ({n_clusters_cpu} clusters)') ax_spec.stairs(cuda_vals, edges, label=f'CUDA ({n_clusters_cuda} clusters)', linestyle='--') ax_spec.stairs(async_vals, edges, label=f'CUDA_async ({n_clusters_async} clusters)', linestyle='-.') ax_spec.set_ylabel('Counts') ax_spec.set_title('Cluster energy spectrum: CPU vs CUDA') ax_spec.legend() ax_spec.grid(alpha=0.2) with np.errstate(divide='ignore', invalid='ignore'): ratio = np.where(cpu_vals > 0, cuda_vals / cpu_vals, np.nan) ax_ratio.stairs(ratio, edges, color='k') ax_ratio.axhline(1.0, color='gray', linewidth=0.5) ax_ratio.set_ylabel('CUDA / CPU') ax_ratio.set_xlabel('Energy [ADU]') ax_ratio.set_ylim(0.5, 2.0) ax_ratio.grid(alpha=0.3) plt.tight_layout() plt.show()