6aa671c78c
Build Packages / build:rpm (rocky8_nocuda) (push) Successful in 12m53s
Build Packages / build:rpm (ubuntu2204_nocuda) (push) Successful in 14m33s
Build Packages / build:rpm (ubuntu2404_nocuda) (push) Successful in 15m8s
Build Packages / build:rpm (rocky9_nocuda) (push) Successful in 15m54s
Build Packages / build:rpm (rocky8_sls9) (push) Successful in 17m47s
Build Packages / build:rpm (rocky8) (push) Successful in 17m43s
Build Packages / build:rpm (rocky9) (push) Successful in 18m36s
Build Packages / build:rpm (rocky9_sls9) (push) Successful in 18m40s
Build Packages / Generate python client (push) Successful in 2m5s
Build Packages / Build documentation (push) Successful in 2m12s
Build Packages / Create release (push) Has been skipped
Build Packages / build:rpm (ubuntu2204) (push) Successful in 10m0s
Build Packages / build:rpm (ubuntu2404) (push) Successful in 11m39s
Build Packages / XDS test (durin plugin) (push) Successful in 10m19s
Build Packages / XDS test (neggia plugin) (push) Successful in 9m8s
Build Packages / XDS test (JFJoch plugin) (push) Successful in 9m46s
Build Packages / DIALS test (push) Successful in 12m58s
Build Packages / Unit tests (push) Successful in 57m47s
184 lines
6.3 KiB
C++
184 lines
6.3 KiB
C++
// SPDX-FileCopyrightText: 2025 Filip Leonarski, Paul Scherrer Institute <filip.leonarski@psi.ch>
// SPDX-License-Identifier: GPL-3.0-only
|
|
|
|
#include "IndexerThreadPool.h"
|
|
#include "../common/CUDAWrapper.h"
|
|
#include "../common/Logger.h"
|
|
|
|
#ifdef JFJOCH_USE_CUDA
|
|
#include "FFBIDXIndexer.h"
|
|
#include "FFTIndexerGPU.h"
|
|
#endif
|
|
|
|
#ifdef JFJOCH_USE_FFTW
|
|
#include "FFTIndexerCPU.h"
|
|
#endif
|
|
|
|
/// Start the indexing worker threads and wait until all of them finished their start-up phase.
///
/// One worker is spawned per `settings.GetIndexingThreads()`. Each worker receives its own copy of
/// `numa_policy` and `settings` (captured by value in the thread lambda). The constructor then blocks
/// on `workers_ready` until every worker has counted down.
///
/// @param settings    indexing settings; provides thread count and minimum spot count
/// @param numa_policy NUMA policy handed to every worker
/// @throws JFJochException (category GPUCUDAError) if any worker set `failed_start` during start-up
/// @throws whatever `workers.emplace_back` throws if a thread cannot be created; workers that were
///         already started are stopped and joined first
IndexerThreadPool::IndexerThreadPool(const IndexingSettings &settings, const NUMAHWPolicy &numa_policy)
    : workers_ready(settings.GetIndexingThreads()),
      viable_cell_min_spots(settings.GetViableCellMinSpots()),
      stop(false) {
    // Ask every worker started so far to exit and wait for it.
    // Must run before the constructor is left by an exception: the destructor is not invoked for a
    // partially constructed object, and destroying a joinable std::thread terminates the process.
    auto shutdown_workers = [this] {
        {
            std::unique_lock<std::mutex> lock(m);
            stop = true;
        }
        cond.notify_all();

        for (std::thread &worker: workers) {
            if (worker.joinable())
                worker.join();
        }
    };

    try {
        for (size_t i = 0; i < settings.GetIndexingThreads(); ++i)
            workers.emplace_back([this, i, numa_policy, settings] { Worker(i, numa_policy, settings); });
    } catch (...) {
        // Thread creation failed part-way; do not leave joinable threads behind.
        shutdown_workers();
        throw;
    }

    workers_ready.wait();

    // NOTE(review): failed_start is written by the worker threads; confirm in the header that it is
    // declared in a way that makes those concurrent writes well-defined (e.g. atomic).
    if (failed_start) {
        shutdown_workers();
        throw JFJochException(JFJochExceptionCategory::GPUCUDAError,
                              "Cannot configure indexer (likely too many threads, not enough memory)");
    }
}
|
|
|
|
/// Stop the pool: raise the `stop` flag under the mutex, wake all workers and join them.
IndexerThreadPool::~IndexerThreadPool() {
    {
        // The flag is changed while holding the mutex so that waiting workers observe it reliably
        std::lock_guard<std::mutex> guard(m);
        stop = true;
    }
    cond.notify_all();

    for (auto &thread_handle: workers) {
        if (!thread_handle.joinable())
            continue;
        thread_handle.join();
    }
}
|
|
|
|
/// Submit one indexing task to the pool and block until a worker has produced the result.
///
/// Returns an empty result (no lattice, zero indexing time) without queuing anything when the
/// experiment's indexing algorithm is `None` or when fewer than `viable_cell_min_spots` spots are given.
///
/// @param experiment experiment description; passed to the worker by pointer
/// @param recip      spot coordinates; passed to the worker by pointer
/// @return result delivered by the worker through the promise
/// @throws std::runtime_error if the pool has already been stopped; exceptions stored in the
///         promise by the worker are rethrown by `get()`
IndexerResult IndexerThreadPool::Run(const DiffractionExperiment &experiment, const std::vector<Coord> &recip) {
    // Short-circuit: the spot count is only inspected when indexing is enabled
    const bool nothing_to_do = experiment.GetIndexingAlgorithm() == IndexingAlgorithmEnum::None
                               || recip.size() < viable_cell_min_spots;
    if (nothing_to_do)
        return IndexerResult{.lattice = {}, .indexing_time_s = 0};

    // Promise is shared with the worker, future stays with the caller
    auto task_promise = std::make_shared<std::promise<IndexerResult> >();
    std::future<IndexerResult> pending = task_promise->get_future();

    {
        std::unique_lock<std::mutex> lock(m);

        // Enqueueing after the pool was stopped is not allowed
        if (stop)
            throw std::runtime_error("Cannot enqueue on stopped thread pool");

        // Only pointers are queued; both objects outlive the task because this call blocks below
        taskQueue.emplace(TaskPackage{task_promise, &experiment, &recip});
    }

    cond.notify_one();
    return pending.get();
}
|
|
|
|
void IndexerThreadPool::Worker(int32_t threadIndex, const NUMAHWPolicy &numa_policy, const IndexingSettings &settings) {
|
|
try {
|
|
#ifdef JFJOCH_USE_CUDA
|
|
auto gpu_count = get_gpu_count();
|
|
if (gpu_count > 0)
|
|
NUMAHWPolicy::SelectGPUAndItsNUMA(threadIndex % gpu_count);
|
|
else
|
|
numa_policy.Bind(threadIndex);
|
|
#else
|
|
numa_policy.Bind(threadIndex);
|
|
#endif
|
|
} catch (const std::exception &e) {
|
|
spdlog::error("Failed to bind thread to NUMA node: {}", e.what());
|
|
} catch (...) {
|
|
// NUMA policy errors are not critical and should be ignored for the time being.
|
|
}
|
|
|
|
std::unique_ptr<Indexer> fft_indexer, ffbidx_indexer, fftw_indexer;
|
|
|
|
#ifdef JFJOCH_USE_CUDA
|
|
try {
|
|
if (get_gpu_count() > 0) {
|
|
if (settings.GetAlgorithm() == IndexingAlgorithmEnum::Auto
|
|
|| settings.GetAlgorithm() == IndexingAlgorithmEnum::FFT)
|
|
fft_indexer = std::make_unique<FFTIndexerGPU>(settings);
|
|
|
|
if (settings.GetAlgorithm() == IndexingAlgorithmEnum::Auto
|
|
|| settings.GetAlgorithm() == IndexingAlgorithmEnum::FFBIDX)
|
|
ffbidx_indexer = std::make_unique<FFBIDXIndexer>();
|
|
}
|
|
} catch (const std::exception &e) {
|
|
spdlog::error("Failed to initialize GPU indexer: {}", e.what());
|
|
failed_start = true;
|
|
} catch (...) {
|
|
spdlog::error("Failed to initialize GPU indexer");
|
|
failed_start = true;
|
|
}
|
|
#endif
|
|
#ifdef JFJOCH_USE_FFTW
|
|
try {
|
|
if ((settings.GetAlgorithm() == IndexingAlgorithmEnum::Auto && (get_gpu_count() == 0))
|
|
|| settings.GetAlgorithm() == IndexingAlgorithmEnum::FFTW)
|
|
fftw_indexer = std::make_unique<FFTIndexerCPU>(settings);
|
|
} catch (const std::exception &e) {
|
|
spdlog::error("Failed to initialize FFTW indexer: {}", e.what());
|
|
failed_start = true;
|
|
} catch (...) {
|
|
spdlog::error("Failed to initialize FFTW indexer");
|
|
failed_start = true;
|
|
}
|
|
#endif
|
|
|
|
workers_ready.count_down();
|
|
|
|
while (true) {
|
|
TaskPackage task;
|
|
{
|
|
std::unique_lock<std::mutex> lock(m);
|
|
|
|
// Add a timeout to the wait to ensure we can exit even if no notification
|
|
cond.wait_for(lock, std::chrono::milliseconds(50), [this] {
|
|
return stop || !taskQueue.empty();
|
|
});
|
|
|
|
// Check for exit conditions
|
|
if (stop && taskQueue.empty())
|
|
return; // Exit cleanly
|
|
|
|
if (!taskQueue.empty()) {
|
|
task = std::move(taskQueue.front());
|
|
taskQueue.pop();
|
|
} else {
|
|
continue; // No tasks, go back to waiting
|
|
}
|
|
}
|
|
try {
|
|
IndexerResult result;
|
|
|
|
auto algorithm = task.experiment->GetIndexingAlgorithm();
|
|
Indexer *indexer = nullptr;
|
|
|
|
if (algorithm == IndexingAlgorithmEnum::FFT && fft_indexer) {
|
|
indexer = fft_indexer.get();
|
|
} else if (algorithm == IndexingAlgorithmEnum::FFBIDX && ffbidx_indexer) {
|
|
indexer = ffbidx_indexer.get();
|
|
} else if (algorithm == IndexingAlgorithmEnum::FFTW && fftw_indexer) {
|
|
indexer = fftw_indexer.get();
|
|
} else if (algorithm == IndexingAlgorithmEnum::Auto) {
|
|
throw JFJochException(JFJochExceptionCategory::InputParameterInvalid,
|
|
"Internal error: Invalid indexing algorithm provided");
|
|
}
|
|
|
|
if (indexer) {
|
|
indexer->Setup(*task.experiment);
|
|
result = indexer->Run(*task.recip);
|
|
}
|
|
|
|
// Set the result via the promise
|
|
if (task.promise) {
|
|
task.promise->set_value(result);
|
|
}
|
|
} catch (std::exception &e) {
|
|
if (task.promise)
|
|
task.promise->set_exception(std::current_exception());
|
|
}
|
|
}
|
|
}
|