// SPDX-FileCopyrightText: 2025 Filip Leonarski, Paul Scherrer Institute // SPDX-License-Identifier: GPL-3.0-only #include "IndexerThreadPool.h" #include "../common/CUDAWrapper.h" #include "../common/Logger.h" #ifdef JFJOCH_USE_CUDA #include "FFBIDXIndexer.h" #include "FFTIndexerGPU.h" #endif #ifdef JFJOCH_USE_FFTW #include "FFTIndexerCPU.h" #endif IndexerThread::IndexerThread(const IndexingSettings &settings, int threadid) { std::unique_lock lock(m); state = TaskState::STARTING; worker_thread = std::thread(&IndexerThread::Worker, this, std::cref(settings), threadid); c_running.wait(lock, [this] { return state != TaskState::STARTING; }); if (state == TaskState::ERROR) { worker_thread.join(); throw JFJochException(JFJochExceptionCategory::InputParameterInvalid, "Indexer thread initialization failed"); } } void IndexerThread::Worker(const IndexingSettings &settings, int threadid) { try { #ifdef JFJOCH_USE_CUDA auto gpu_count = get_gpu_count(); if (gpu_count > 0) NUMAHWPolicy::SelectGPUAndItsNUMA(threadid % gpu_count); #endif } catch (const std::exception &e) { spdlog::error("Failed to bind thread to NUMA node: {}", e.what()); } catch (...) { // NUMA policy errors are not critical and should be ignored for the time being. } std::unique_ptr fft_indexer, ffbidx_indexer, fftw_indexer; #ifdef JFJOCH_USE_CUDA try { if (get_gpu_count() > 0) { if (settings.GetAlgorithm() == IndexingAlgorithmEnum::Auto || settings.GetAlgorithm() == IndexingAlgorithmEnum::FFT) fft_indexer = std::make_unique(settings); if (settings.GetAlgorithm() == IndexingAlgorithmEnum::Auto || settings.GetAlgorithm() == IndexingAlgorithmEnum::FFBIDX) ffbidx_indexer = std::make_unique(); } } catch (const std::exception &e) { spdlog::error("Failed to initialize GPU indexer: {}", e.what()); { std::unique_lock lock(m); state = TaskState::ERROR; } c_running.notify_all(); return; } catch (...) { spdlog::error("Failed to initialize GPU indexer"); { std::unique_lock lock(m); state = TaskState::ERROR; } c_running.notify_all(); return; } #endif #ifdef JFJOCH_USE_FFTW try { if ((settings.GetAlgorithm() == IndexingAlgorithmEnum::Auto && (get_gpu_count() == 0)) || settings.GetAlgorithm() == IndexingAlgorithmEnum::FFTW) fftw_indexer = std::make_unique(settings); } catch (const std::exception &e) { spdlog::error("Failed to initialize FFTW indexer: {}", e.what()); { std::unique_lock lock(m); state = TaskState::ERROR; } c_running.notify_all(); return; } catch (...) { spdlog::error("Failed to initialize FFTW indexer"); { std::unique_lock lock(m); state = TaskState::ERROR; } c_running.notify_all(); return; } #endif { std::unique_lock lock(m); state = TaskState::IDLE; } c_running.notify_all(); while (true) { std::unique_ptr input; // Look for task + handle stop { std::unique_lock lock(m); c_start.wait(lock, [this] { return stop || state == TaskState::READY; }); if (stop && (state != TaskState::READY)) return; state = TaskState::RUNNING; input = std::move(task_input); } if (input) { std::unique_ptr tmp_result; try { auto algorithm = input->experiment.GetIndexingAlgorithm(); Indexer *indexer = nullptr; if (algorithm == IndexingAlgorithmEnum::FFT && fft_indexer) { indexer = fft_indexer.get(); } else if (algorithm == IndexingAlgorithmEnum::FFBIDX && ffbidx_indexer) { indexer = ffbidx_indexer.get(); } else if (algorithm == IndexingAlgorithmEnum::FFTW && fftw_indexer) { indexer = fftw_indexer.get(); } else if (algorithm == IndexingAlgorithmEnum::Auto) { throw JFJochException(JFJochExceptionCategory::InputParameterInvalid, "Internal error: Invalid indexing algorithm provided"); } if (indexer) { indexer->Setup(input->experiment); tmp_result = std::make_unique(indexer->Run(input->recip)); } } catch (std::exception &e) { tmp_result = nullptr; spdlog::error("Indexer thread {} failed: {}", threadid, e.what()); } { std::unique_lock lock(m); state = TaskState::COMPLETED; result = std::move(tmp_result); } c_done.notify_all(); } } } void IndexerThread::Finalize() { { std::unique_lock lock(m); stop = true; } c_start.notify_all(); if (worker_thread.joinable()) worker_thread.join(); } std::unique_ptr IndexerThread::Run(const DiffractionExperiment &experiment, const std::vector &recip) { std::unique_ptr tmp_result; { std::unique_lock lock(m); if (stop) return nullptr; if (state != TaskState::IDLE) return nullptr; task_input = std::make_unique(std::cref(experiment), std::cref(recip)); state = TaskState::READY; } c_start.notify_one(); { std::unique_lock lock(m); c_done.wait(lock, [this] { return state == TaskState::COMPLETED; }); tmp_result = std::move(result); state = TaskState::IDLE; } return tmp_result; } IndexerThread::~IndexerThread() { Finalize(); } IndexerThreadPool::IndexerThreadPool(const IndexingSettings &settings) : worker_busy(settings.GetIndexingThreads(), 0), worker_free_count(settings.GetIndexingThreads()), viable_cell_min_spots(settings.GetViableCellMinSpots()), blocking(settings.GetBlockingBehavior()) { for (size_t i = 0; i < settings.GetIndexingThreads(); ++i) tasks.emplace_back(std::make_unique(std::cref(settings), i)); } int IndexerThreadPool::GetFreeWorker() { std::unique_lock lock(m); if (tasks.size() == 0) return -1; if (blocking) c.wait(lock, [this] { return worker_free_count > 0; }); for (int i = 0; i < tasks.size(); i++) { if (worker_busy[i] == 0) { worker_busy[i] = 1; worker_free_count--; return i; } } return -1; } IndexerResult IndexerThreadPool::Run(const DiffractionExperiment &experiment, const std::vector &recip) { if (experiment.GetIndexingAlgorithm() == IndexingAlgorithmEnum::None) return IndexerResult{.lattice = {}, .indexing_time_s = 0}; // Check if there is available worker const int task = GetFreeWorker(); std::unique_ptr result; if (task >= 0) { try { result = tasks[task]->Run(experiment, recip); } catch (const std::exception &e) { spdlog::error("Indexer thread failed: {}", e.what()); result = nullptr; } { std::unique_lock lock(m); worker_busy[task] = 0; worker_free_count++; } c.notify_one(); } if (result) return *result; return IndexerResult{.lattice = {}, .indexing_time_s = 0}; }