Files
Jungfraujoch/image_analysis/indexing/IndexerThreadPool.cpp
T
leonarski_f 1558dddbb8 indexing: make the resolved-algorithm invariant explicit in the pool
IndexerThreadPool dispatches on DiffractionExperiment::GetIndexingAlgorithm(),
which already resolves Auto to a concrete algorithm (FFTW/FFT/FFBIDX) or None;
the pool has no policy to resolve Auto itself. The worker handled a stray Auto
with a dead branch and silently produced no result when the resolved algorithm
had no matching indexer built.

- Document on GetIndexingAlgorithm() that it never returns Auto.
- Throw a clear internal error at the pool boundary if Auto ever arrives.
- In the worker, replace the dead Auto branch with a loud failure for any
  resolved algorithm that has no matching indexer (e.g. a GPU algorithm on a
  host without a GPU), instead of returning no result silently.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-18 14:25:30 +02:00

247 lines
8.7 KiB
C++

// SPDX-FileCopyrightText: 2025 Filip Leonarski, Paul Scherrer Institute <filip.leonarski@psi.ch>
// SPDX-License-Identifier: GPL-3.0-only
#include "IndexerThreadPool.h"
#include "../common/CUDAWrapper.h"
#include "../common/Logger.h"
#ifdef JFJOCH_USE_CUDA
#include "FFBIDXIndexer.h"
#include "FFTIndexerGPU.h"
#endif
#ifdef JFJOCH_USE_FFTW
#include "FFTIndexerCPU.h"
#endif
IndexerThread::IndexerThread(const IndexingSettings &settings, int threadid) {
std::unique_lock<std::mutex> lock(m);
state = TaskState::STARTING;
worker_thread = std::thread(&IndexerThread::Worker, this, std::cref(settings), threadid);
c_running.wait(lock, [this] { return state != TaskState::STARTING; });
if (state == TaskState::ERROR) {
worker_thread.join();
throw JFJochException(JFJochExceptionCategory::InputParameterInvalid,
"Indexer thread initialization failed");
}
}
void IndexerThread::Worker(const IndexingSettings &settings, int threadid) {
try {
pin_gpu();
} catch (const std::exception &e) {
spdlog::error("Failed to pin to GPU {}", e.what());
} catch (...) {
// GPU pinning errors are not critical and should be ignored for the time being.
}
std::unique_ptr<Indexer> fft_indexer, ffbidx_indexer, fftw_indexer;
#ifdef JFJOCH_USE_CUDA
try {
if (get_gpu_count() > 0) {
if (settings.GetAlgorithm() == IndexingAlgorithmEnum::Auto
|| settings.GetAlgorithm() == IndexingAlgorithmEnum::FFT)
fft_indexer = std::make_unique<FFTIndexerGPU>(settings);
if (settings.GetAlgorithm() == IndexingAlgorithmEnum::Auto
|| settings.GetAlgorithm() == IndexingAlgorithmEnum::FFBIDX)
ffbidx_indexer = std::make_unique<FFBIDXIndexer>();
}
} catch (const std::exception &e) {
spdlog::error("Failed to initialize GPU indexer: {}", e.what());
{
std::unique_lock<std::mutex> lock(m);
state = TaskState::ERROR;
}
c_running.notify_all();
return;
} catch (...) {
spdlog::error("Failed to initialize GPU indexer");
{
std::unique_lock<std::mutex> lock(m);
state = TaskState::ERROR;
}
c_running.notify_all();
return;
}
#endif
#ifdef JFJOCH_USE_FFTW
try {
if ((settings.GetAlgorithm() == IndexingAlgorithmEnum::Auto && (get_gpu_count() == 0))
|| settings.GetAlgorithm() == IndexingAlgorithmEnum::FFTW)
fftw_indexer = std::make_unique<FFTIndexerCPU>(settings);
} catch (const std::exception &e) {
spdlog::error("Failed to initialize FFTW indexer: {}", e.what());
{
std::unique_lock<std::mutex> lock(m);
state = TaskState::ERROR;
}
c_running.notify_all();
return;
} catch (...) {
spdlog::error("Failed to initialize FFTW indexer");
{
std::unique_lock<std::mutex> lock(m);
state = TaskState::ERROR;
}
c_running.notify_all();
return;
}
#endif
{
std::unique_lock<std::mutex> lock(m);
state = TaskState::IDLE;
}
c_running.notify_all();
while (true) {
std::unique_ptr<TaskInput> input;
// Look for task + handle stop
{
std::unique_lock<std::mutex> lock(m);
c_start.wait(lock, [this] { return stop || state == TaskState::READY; });
if (stop && (state != TaskState::READY))
return;
state = TaskState::RUNNING;
input = std::move(task_input);
}
if (input) {
std::unique_ptr<IndexerResult> tmp_result;
try {
auto algorithm = input->experiment.GetIndexingAlgorithm();
Indexer *indexer = nullptr;
if (algorithm == IndexingAlgorithmEnum::FFT && fft_indexer) {
indexer = fft_indexer.get();
} else if (algorithm == IndexingAlgorithmEnum::FFBIDX && ffbidx_indexer) {
indexer = ffbidx_indexer.get();
} else if (algorithm == IndexingAlgorithmEnum::FFTW && fftw_indexer) {
indexer = fftw_indexer.get();
} else {
// Algorithm is already resolved here (never Auto/None - see
// IndexerThreadPool::Run). Reaching this means the resolved algorithm
// has no matching indexer in this worker (e.g. a GPU algorithm on a
// host without a GPU) - fail loudly instead of silently not indexing.
throw JFJochException(JFJochExceptionCategory::InputParameterInvalid,
"Internal error: no indexer available for the resolved "
"indexing algorithm");
}
if (indexer) {
indexer->Setup(input->experiment);
tmp_result = std::make_unique<IndexerResult>(indexer->Run(input->recip));
}
} catch (std::exception &e) {
tmp_result = nullptr;
spdlog::error("Indexer thread {} failed: {}", threadid, e.what());
}
{
std::unique_lock<std::mutex> lock(m);
state = TaskState::COMPLETED;
result = std::move(tmp_result);
}
c_done.notify_all();
}
}
}
void IndexerThread::Finalize() {
{
std::unique_lock<std::mutex> lock(m);
stop = true;
}
c_start.notify_all();
if (worker_thread.joinable())
worker_thread.join();
}
std::unique_ptr<IndexerResult> IndexerThread::Run(const DiffractionExperiment &experiment,
const std::vector<Coord> &recip) {
std::unique_ptr<IndexerResult> tmp_result;
{
std::unique_lock<std::mutex> lock(m);
if (stop)
return nullptr;
if (state != TaskState::IDLE)
return nullptr;
task_input = std::make_unique<TaskInput>(std::cref(experiment), std::cref(recip));
state = TaskState::READY;
}
c_start.notify_one();
{
std::unique_lock<std::mutex> lock(m);
c_done.wait(lock, [this] { return state == TaskState::COMPLETED; });
tmp_result = std::move(result);
state = TaskState::IDLE;
}
return tmp_result;
}
IndexerThread::~IndexerThread() {
Finalize();
}
IndexerThreadPool::IndexerThreadPool(const IndexingSettings &settings)
: worker_busy(settings.GetIndexingThreads(), 0),
worker_free_count(settings.GetIndexingThreads()),
viable_cell_min_spots(settings.GetViableCellMinSpots()),
blocking(settings.GetBlockingBehavior()) {
for (size_t i = 0; i < settings.GetIndexingThreads(); ++i)
tasks.emplace_back(std::make_unique<IndexerThread>(std::cref(settings), i));
}
int IndexerThreadPool::GetFreeWorker() {
std::unique_lock<std::mutex> lock(m);
if (tasks.size() == 0)
return -1;
if (blocking)
c.wait(lock, [this] { return worker_free_count > 0; });
for (int i = 0; i < tasks.size(); i++) {
if (worker_busy[i] == 0) {
worker_busy[i] = 1;
worker_free_count--;
return i;
}
}
return -1;
}
IndexerResult IndexerThreadPool::Run(const DiffractionExperiment &experiment, const std::vector<Coord> &recip) {
const auto algorithm = experiment.GetIndexingAlgorithm();
if (algorithm == IndexingAlgorithmEnum::None)
return IndexerResult{.lattice = {}, .indexing_time_s = 0, .executed = false};
// GetIndexingAlgorithm() must already have resolved Auto to a concrete algorithm;
// the pool has no policy to resolve it, so Auto here is an upstream contract bug.
if (algorithm == IndexingAlgorithmEnum::Auto)
throw JFJochException(JFJochExceptionCategory::InputParameterInvalid,
"Internal error: indexing algorithm must be resolved (not Auto) "
"before reaching the indexer pool");
// Check if there is available worker
const int task = GetFreeWorker();
std::unique_ptr<IndexerResult> result;
if (task >= 0) {
try {
result = tasks[task]->Run(experiment, recip);
} catch (const std::exception &e) {
spdlog::error("Indexer thread failed: {}", e.what());
result = nullptr;
}
{
std::unique_lock<std::mutex> lock(m);
worker_busy[task] = 0;
worker_free_count++;
}
c.notify_one();
}
if (result)
return *result;
return IndexerResult{.lattice = {}, .indexing_time_s = 0};
}