// File: Jungfraujoch/image_analysis/indexing/IndexerThreadPool.cpp (C++, 239 lines, 8.0 KiB)

// SPDX-FileCopyrightText: 2025 Filip Leonarski, Paul Scherrer Institute <filip.leonarski@psi.ch>
// SPDX-License-Identifier: GPL-3.0-only
#include "IndexerThreadPool.h"
#include "../common/CUDAWrapper.h"
#include "../common/Logger.h"
#ifdef JFJOCH_USE_CUDA
#include "FFBIDXIndexer.h"
#include "FFTIndexerGPU.h"
#endif
#ifdef JFJOCH_USE_FFTW
#include "FFTIndexerCPU.h"
#endif
// Starts the worker thread and waits until it reports that start-up finished.
//
// settings - forwarded by reference to Worker(); this constructor does not
//            return until the worker has left the STARTING state.
// threadid - forwarded to Worker().
//
// Throws JFJochException (InputParameterInvalid) when the worker reports
// TaskState::ERROR; the worker thread is joined before throwing.
IndexerThread::IndexerThread(const IndexingSettings &settings, int threadid) {
    std::unique_lock<std::mutex> startup_lock(m);
    state = TaskState::STARTING;
    worker_thread = std::thread(&IndexerThread::Worker, this, std::cref(settings), threadid);

    // The worker changes `state` under the same mutex and signals c_running.
    while (state == TaskState::STARTING)
        c_running.wait(startup_lock);

    if (state != TaskState::ERROR)
        return;

    worker_thread.join();
    throw JFJochException(JFJochExceptionCategory::InputParameterInvalid,
                          "Indexer thread initialization failed");
}
// Worker thread body.
//
// Phase 1 (start-up): optionally binds the thread to a GPU/NUMA node and
// creates the indexers enabled at compile time and selected by `settings`.
// The outcome is published through `state` (IDLE on success, ERROR on failure)
// and signalled on c_running.
//
// Phase 2 (service loop): waits on c_start for either `stop` or a READY task,
// runs the matching indexer, then publishes the result with state COMPLETED
// and signals c_done. Any exception raised while processing a task is logged
// and reported as a null result, so the thread never terminates the process
// and a caller waiting for COMPLETED is always woken.
//
// settings - only used during start-up.
// threadid - used for GPU selection (threadid % gpu_count) and in log messages.
void IndexerThread::Worker(const IndexingSettings &settings, int threadid) {
    // Sets the lifecycle state under the mutex and wakes waiters on c_running.
    const auto publish_state = [this](TaskState new_state) {
        {
            std::unique_lock<std::mutex> lock(m);
            state = new_state;
        }
        c_running.notify_all();
    };

    try {
#ifdef JFJOCH_USE_CUDA
        auto gpu_count = get_gpu_count();
        if (gpu_count > 0)
            NUMAHWPolicy::SelectGPUAndItsNUMA(threadid % gpu_count);
#endif
    } catch (const std::exception &e) {
        spdlog::error("Failed to bind thread to NUMA node: {}", e.what());
    } catch (...) {
        // NUMA policy errors are not critical and should be ignored for the time being.
    }

    std::unique_ptr<Indexer> fft_indexer, ffbidx_indexer, fftw_indexer;

#ifdef JFJOCH_USE_CUDA
    try {
        if (get_gpu_count() > 0) {
            if (settings.GetAlgorithm() == IndexingAlgorithmEnum::Auto
                || settings.GetAlgorithm() == IndexingAlgorithmEnum::FFT)
                fft_indexer = std::make_unique<FFTIndexerGPU>(settings);
            if (settings.GetAlgorithm() == IndexingAlgorithmEnum::Auto
                || settings.GetAlgorithm() == IndexingAlgorithmEnum::FFBIDX)
                ffbidx_indexer = std::make_unique<FFBIDXIndexer>();
        }
    } catch (const std::exception &e) {
        spdlog::error("Failed to initialize GPU indexer: {}", e.what());
        publish_state(TaskState::ERROR);
        return;
    } catch (...) {
        spdlog::error("Failed to initialize GPU indexer");
        publish_state(TaskState::ERROR);
        return;
    }
#endif

#ifdef JFJOCH_USE_FFTW
    try {
        if ((settings.GetAlgorithm() == IndexingAlgorithmEnum::Auto && (get_gpu_count() == 0))
            || settings.GetAlgorithm() == IndexingAlgorithmEnum::FFTW)
            fftw_indexer = std::make_unique<FFTIndexerCPU>(settings);
    } catch (const std::exception &e) {
        spdlog::error("Failed to initialize FFTW indexer: {}", e.what());
        publish_state(TaskState::ERROR);
        return;
    } catch (...) {
        spdlog::error("Failed to initialize FFTW indexer");
        publish_state(TaskState::ERROR);
        return;
    }
#endif

    // Start-up done: release the constructor waiting on c_running.
    publish_state(TaskState::IDLE);

    while (true) {
        std::unique_ptr<TaskInput> input;

        // Look for task + handle stop
        {
            std::unique_lock<std::mutex> lock(m);
            c_start.wait(lock, [this] { return stop || state == TaskState::READY; });
            if (stop)
                return;
            input = std::move(task_input);
        }

        if (input) {
            std::unique_ptr<IndexerResult> tmp_result;
            try {
                auto algorithm = input->experiment.GetIndexingAlgorithm();
                Indexer *indexer = nullptr;

                // A requested algorithm whose indexer was not created leaves
                // `indexer` null, which yields a null result below.
                if (algorithm == IndexingAlgorithmEnum::FFT && fft_indexer) {
                    indexer = fft_indexer.get();
                } else if (algorithm == IndexingAlgorithmEnum::FFBIDX && ffbidx_indexer) {
                    indexer = ffbidx_indexer.get();
                } else if (algorithm == IndexingAlgorithmEnum::FFTW && fftw_indexer) {
                    indexer = fftw_indexer.get();
                } else if (algorithm == IndexingAlgorithmEnum::Auto) {
                    throw JFJochException(JFJochExceptionCategory::InputParameterInvalid,
                                          "Internal error: Invalid indexing algorithm provided");
                }

                if (indexer) {
                    indexer->Setup(input->experiment);
                    tmp_result = std::make_unique<IndexerResult>(indexer->Run(input->recip));
                }
            } catch (const std::exception &e) {
                tmp_result = nullptr;
                spdlog::error("Indexer thread {} failed: {}", threadid, e.what());
            } catch (...) {
                // An exception leaving the thread function would call
                // std::terminate; report it as a failed task instead.
                tmp_result = nullptr;
                spdlog::error("Indexer thread {} failed: unknown exception", threadid);
            }

            // Always publish COMPLETED (even with a null result) so that the
            // caller blocked on c_done is released.
            {
                std::unique_lock<std::mutex> lock(m);
                state = TaskState::COMPLETED;
                result = std::move(tmp_result);
                c_done.notify_all();
            }
        }
    }
}
// Requests the worker loop to stop and joins the worker thread if it is
// still joinable.
void IndexerThread::Finalize() {
    {
        // `stop` is read by the worker under the same mutex.
        std::lock_guard<std::mutex> guard(m);
        stop = true;
    }
    // Wake the worker in case it is waiting on c_start for a task.
    c_start.notify_one();

    if (!worker_thread.joinable())
        return;
    worker_thread.join();
}
std::unique_ptr<IndexerResult> IndexerThread::Run(const DiffractionExperiment &experiment,
const std::vector<Coord> &recip) {
std::unique_ptr<IndexerResult> tmp_result;
{
std::unique_lock<std::mutex> lock(m);
if (stop)
return nullptr;
if (state != TaskState::IDLE)
return nullptr;
task_input = std::make_unique<TaskInput>(std::cref(experiment), std::cref(recip));
state = TaskState::READY;
}
c_start.notify_one();
{
std::unique_lock<std::mutex> lock(m);
c_done.wait(lock, [this] { return state == TaskState::COMPLETED; });
tmp_result = std::move(result);
state = TaskState::IDLE;
}
return std::move(tmp_result);
}
// Stops the worker loop and joins the worker thread (see Finalize()) before
// the members used by that thread are destroyed.
IndexerThread::~IndexerThread() {
    this->Finalize();
}
// Builds the pool: one IndexerThread per settings.GetIndexingThreads(), all
// initially marked free. Each thread receives its index in the pool as its
// thread id.
//
// An exception thrown by an IndexerThread constructor propagates to the caller.
IndexerThreadPool::IndexerThreadPool(const IndexingSettings &settings)
    : worker_busy(settings.GetIndexingThreads(), 0),
      worker_free_count(settings.GetIndexingThreads()),
      viable_cell_min_spots(settings.GetViableCellMinSpots()),
      blocking(settings.GetBlockingBehavior()) {
    size_t thread_index = 0;
    while (thread_index < settings.GetIndexingThreads()) {
        tasks.emplace_back(std::make_unique<IndexerThread>(std::cref(settings), thread_index));
        ++thread_index;
    }
}
// Reserves a free worker and returns its index in `tasks`.
//
// In blocking mode the call waits on `c` until worker_free_count > 0.
// In non-blocking mode it returns immediately.
//
// Returns the index of the worker now marked busy, or -1 when the pool has
// no workers or no worker is free.
int IndexerThreadPool::GetFreeWorker() {
    std::unique_lock<std::mutex> lock(m);

    if (tasks.empty())
        return -1;

    if (blocking)
        c.wait(lock, [this] { return worker_free_count > 0; });

    // Unsigned index avoids a signed/unsigned comparison against size().
    for (size_t i = 0; i < tasks.size(); ++i) {
        if (worker_busy[i] == 0) {
            worker_busy[i] = 1;
            worker_free_count--;
            return static_cast<int>(i);
        }
    }
    return -1;
}
// Indexes one set of reciprocal-space coordinates on a free worker thread.
//
// Returns an empty result (no lattice, zero indexing time) when
//   - the experiment's indexing algorithm is None,
//   - fewer than viable_cell_min_spots coordinates are provided,
//   - no worker could be reserved, or
//   - the worker failed or produced no result.
// Otherwise returns the worker's result.
IndexerResult IndexerThreadPool::Run(const DiffractionExperiment &experiment, const std::vector<Coord> &recip) {
    if (experiment.GetIndexingAlgorithm() == IndexingAlgorithmEnum::None)
        return IndexerResult{.lattice = {}, .indexing_time_s = 0};

    if (recip.size() < viable_cell_min_spots)
        return IndexerResult{.lattice = {}, .indexing_time_s = 0};

    // Check if there is available worker
    const int task = GetFreeWorker();

    std::unique_ptr<IndexerResult> result;
    if (task >= 0) {
        try {
            result = tasks[task]->Run(experiment, recip);
        } catch (const std::exception &e) {
            spdlog::error("Indexer thread failed: {}", e.what());
            result = nullptr;
        } catch (...) {
            // Swallow unknown exceptions as well, so the worker slot below is
            // always handed back to the pool.
            spdlog::error("Indexer thread failed: unknown exception");
            result = nullptr;
        }

        {
            std::unique_lock<std::mutex> lock(m);
            worker_busy[task] = 0;
            worker_free_count++;
        }
        c.notify_one();
    }

    if (result)
        return std::move(*result); // the owned result dies here; move instead of copying

    return IndexerResult{.lattice = {}, .indexing_time_s = 0};
}