20901579af
Build Packages / Unit tests (push) Waiting to run
Build Packages / build:rpm (ubuntu2204_nocuda) (push) Successful in 12m15s
Build Packages / build:rpm (ubuntu2404_nocuda) (push) Successful in 14m59s
Build Packages / build:rpm (rocky8_nocuda) (push) Successful in 16m41s
Build Packages / build:rpm (rocky9_nocuda) (push) Successful in 17m11s
Build Packages / build:rpm (rocky8_sls9) (push) Successful in 17m24s
Build Packages / build:rpm (rocky8) (push) Successful in 17m55s
Build Packages / build:rpm (rocky9_sls9) (push) Successful in 18m38s
Build Packages / build:rpm (rocky9) (push) Successful in 18m47s
Build Packages / Generate python client (push) Successful in 1m47s
Build Packages / Build documentation (push) Successful in 2m16s
Build Packages / Create release (push) Has been skipped
Build Packages / build:rpm (ubuntu2204) (push) Successful in 9m31s
Build Packages / build:rpm (ubuntu2404) (push) Successful in 9m14s
Build Packages / XDS test (JFJoch plugin) (push) Successful in 12m19s
Build Packages / XDS test (neggia plugin) (push) Successful in 11m57s
Build Packages / DIALS test (push) Successful in 14m14s
Build Packages / XDS test (durin plugin) (push) Successful in 13m43s
Build Packages / build:rpm (rocky8_nocuda) (pull_request) Successful in 11m44s
Build Packages / build:rpm (rocky9_nocuda) (pull_request) Successful in 12m35s
Build Packages / build:rpm (ubuntu2204_nocuda) (pull_request) Successful in 10m7s
Build Packages / build:rpm (ubuntu2404_nocuda) (pull_request) Successful in 8m42s
Build Packages / build:rpm (rocky8_sls9) (pull_request) Successful in 10m21s
Build Packages / build:rpm (rocky8) (pull_request) Successful in 12m5s
Build Packages / build:rpm (rocky9_sls9) (pull_request) Successful in 12m41s
Build Packages / build:rpm (ubuntu2404) (pull_request) Successful in 11m52s
Build Packages / build:rpm (ubuntu2204) (pull_request) Successful in 12m7s
Build Packages / Generate python client (pull_request) Successful in 25s
Build Packages / build:rpm (rocky9) (pull_request) Successful in 13m56s
Build Packages / Create release (pull_request) Has been skipped
Build Packages / Build documentation (pull_request) Successful in 55s
Build Packages / XDS test (durin plugin) (pull_request) Successful in 8m4s
Build Packages / DIALS test (pull_request) Successful in 12m3s
Build Packages / XDS test (neggia plugin) (pull_request) Successful in 7m2s
Build Packages / XDS test (JFJoch plugin) (pull_request) Successful in 7m50s
Build Packages / Unit tests (pull_request) Successful in 58m24s
240 lines
8.1 KiB
C++
240 lines
8.1 KiB
C++
// SPDX-FileCopyrightText: 2025 Filip Leonarski, Paul Scherrer Institute <filip.leonarski@psi.ch>
|
|
// SPDX-License-Identifier: GPL-3.0-only
|
|
|
|
#include "IndexerThreadPool.h"
|
|
#include "../common/CUDAWrapper.h"
|
|
#include "../common/Logger.h"
|
|
|
|
#ifdef JFJOCH_USE_CUDA
|
|
#include "FFBIDXIndexer.h"
|
|
#include "FFTIndexerGPU.h"
|
|
#endif
|
|
|
|
#ifdef JFJOCH_USE_FFTW
|
|
#include "FFTIndexerCPU.h"
|
|
#endif
|
|
|
|
// Starts the worker thread and blocks until it has left the STARTING state.
// If the worker reports TaskState::ERROR, the thread is joined and a
// JFJochException is thrown, so a constructed object always has a worker
// that completed its initialization.
IndexerThread::IndexerThread(const IndexingSettings &settings, int threadid) {
    std::unique_lock<std::mutex> guard(m);

    // The state is set before the thread is launched, while the mutex is held,
    // so the worker cannot publish its outcome before we begin waiting.
    state = TaskState::STARTING;
    worker_thread = std::thread(&IndexerThread::Worker, this, std::cref(settings), threadid);

    while (state == TaskState::STARTING)
        c_running.wait(guard);

    if (state != TaskState::ERROR)
        return;

    // The worker returns right after reporting the error; reap it before throwing.
    worker_thread.join();
    throw JFJochException(JFJochExceptionCategory::InputParameterInvalid,
                          "Indexer thread initialization failed");
}
|
|
|
|
void IndexerThread::Worker(const IndexingSettings &settings, int threadid) {
|
|
try {
|
|
#ifdef JFJOCH_USE_CUDA
|
|
auto gpu_count = get_gpu_count();
|
|
if (gpu_count > 0)
|
|
NUMAHWPolicy::SelectGPUAndItsNUMA(threadid % gpu_count);
|
|
#endif
|
|
} catch (const std::exception &e) {
|
|
spdlog::error("Failed to bind thread to NUMA node: {}", e.what());
|
|
} catch (...) {
|
|
// NUMA policy errors are not critical and should be ignored for the time being.
|
|
}
|
|
|
|
std::unique_ptr<Indexer> fft_indexer, ffbidx_indexer, fftw_indexer;
|
|
|
|
#ifdef JFJOCH_USE_CUDA
|
|
try {
|
|
if (get_gpu_count() > 0) {
|
|
if (settings.GetAlgorithm() == IndexingAlgorithmEnum::Auto
|
|
|| settings.GetAlgorithm() == IndexingAlgorithmEnum::FFT)
|
|
fft_indexer = std::make_unique<FFTIndexerGPU>(settings);
|
|
|
|
if (settings.GetAlgorithm() == IndexingAlgorithmEnum::Auto
|
|
|| settings.GetAlgorithm() == IndexingAlgorithmEnum::FFBIDX)
|
|
ffbidx_indexer = std::make_unique<FFBIDXIndexer>();
|
|
}
|
|
} catch (const std::exception &e) {
|
|
spdlog::error("Failed to initialize GPU indexer: {}", e.what());
|
|
{
|
|
std::unique_lock<std::mutex> lock(m);
|
|
state = TaskState::ERROR;
|
|
}
|
|
c_running.notify_all();
|
|
return;
|
|
} catch (...) {
|
|
spdlog::error("Failed to initialize GPU indexer");
|
|
{
|
|
std::unique_lock<std::mutex> lock(m);
|
|
state = TaskState::ERROR;
|
|
}
|
|
c_running.notify_all();
|
|
return;
|
|
}
|
|
#endif
|
|
#ifdef JFJOCH_USE_FFTW
|
|
try {
|
|
if ((settings.GetAlgorithm() == IndexingAlgorithmEnum::Auto && (get_gpu_count() == 0))
|
|
|| settings.GetAlgorithm() == IndexingAlgorithmEnum::FFTW)
|
|
fftw_indexer = std::make_unique<FFTIndexerCPU>(settings);
|
|
} catch (const std::exception &e) {
|
|
spdlog::error("Failed to initialize FFTW indexer: {}", e.what());
|
|
{
|
|
std::unique_lock<std::mutex> lock(m);
|
|
state = TaskState::ERROR;
|
|
}
|
|
c_running.notify_all();
|
|
return;
|
|
} catch (...) {
|
|
spdlog::error("Failed to initialize FFTW indexer");
|
|
{
|
|
std::unique_lock<std::mutex> lock(m);
|
|
state = TaskState::ERROR;
|
|
}
|
|
c_running.notify_all();
|
|
return;
|
|
}
|
|
#endif
|
|
{
|
|
std::unique_lock<std::mutex> lock(m);
|
|
state = TaskState::IDLE;
|
|
}
|
|
c_running.notify_all();
|
|
|
|
while (true) {
|
|
std::unique_ptr<TaskInput> input;
|
|
// Look for task + handle stop
|
|
{
|
|
std::unique_lock<std::mutex> lock(m);
|
|
c_start.wait(lock, [this] { return stop || state == TaskState::READY; });
|
|
if (stop && (state != TaskState::READY))
|
|
return;
|
|
state = TaskState::RUNNING;
|
|
input = std::move(task_input);
|
|
}
|
|
if (input) {
|
|
std::unique_ptr<IndexerResult> tmp_result;
|
|
try {
|
|
auto algorithm = input->experiment.GetIndexingAlgorithm();
|
|
Indexer *indexer = nullptr;
|
|
|
|
if (algorithm == IndexingAlgorithmEnum::FFT && fft_indexer) {
|
|
indexer = fft_indexer.get();
|
|
} else if (algorithm == IndexingAlgorithmEnum::FFBIDX && ffbidx_indexer) {
|
|
indexer = ffbidx_indexer.get();
|
|
} else if (algorithm == IndexingAlgorithmEnum::FFTW && fftw_indexer) {
|
|
indexer = fftw_indexer.get();
|
|
} else if (algorithm == IndexingAlgorithmEnum::Auto) {
|
|
throw JFJochException(JFJochExceptionCategory::InputParameterInvalid,
|
|
"Internal error: Invalid indexing algorithm provided");
|
|
}
|
|
|
|
if (indexer) {
|
|
indexer->Setup(input->experiment);
|
|
tmp_result = std::make_unique<IndexerResult>(indexer->Run(input->recip));
|
|
}
|
|
} catch (std::exception &e) {
|
|
tmp_result = nullptr;
|
|
spdlog::error("Indexer thread {} failed: {}", threadid, e.what());
|
|
}
|
|
{
|
|
std::unique_lock<std::mutex> lock(m);
|
|
state = TaskState::COMPLETED;
|
|
result = std::move(tmp_result);
|
|
}
|
|
c_done.notify_all();
|
|
}
|
|
}
|
|
}
|
|
|
|
void IndexerThread::Finalize() {
|
|
{
|
|
std::unique_lock<std::mutex> lock(m);
|
|
stop = true;
|
|
}
|
|
c_start.notify_all();
|
|
if (worker_thread.joinable())
|
|
worker_thread.join();
|
|
}
|
|
|
|
std::unique_ptr<IndexerResult> IndexerThread::Run(const DiffractionExperiment &experiment,
|
|
const std::vector<Coord> &recip) {
|
|
std::unique_ptr<IndexerResult> tmp_result;
|
|
{
|
|
std::unique_lock<std::mutex> lock(m);
|
|
if (stop)
|
|
return nullptr;
|
|
if (state != TaskState::IDLE)
|
|
return nullptr;
|
|
task_input = std::make_unique<TaskInput>(std::cref(experiment), std::cref(recip));
|
|
state = TaskState::READY;
|
|
}
|
|
c_start.notify_one();
|
|
{
|
|
std::unique_lock<std::mutex> lock(m);
|
|
c_done.wait(lock, [this] { return state == TaskState::COMPLETED; });
|
|
tmp_result = std::move(result);
|
|
state = TaskState::IDLE;
|
|
}
|
|
return tmp_result;
|
|
}
|
|
|
|
// Stops and joins the worker thread via Finalize().
IndexerThread::~IndexerThread() { Finalize(); }
|
|
|
|
// Builds the pool from `settings`: one busy flag (initially 0) and one
// IndexerThread per configured indexing thread, with the free-worker counter
// starting at the configured thread count. Also stores the minimum spot count
// and the blocking behaviour read from `settings`.
IndexerThreadPool::IndexerThreadPool(const IndexingSettings &settings)
    : worker_busy(settings.GetIndexingThreads(), 0),
      worker_free_count(settings.GetIndexingThreads()),
      viable_cell_min_spots(settings.GetViableCellMinSpots()),
      blocking(settings.GetBlockingBehavior()) {
    size_t index = 0;
    while (index < settings.GetIndexingThreads()) {
        // The loop index is passed to each worker as its thread id.
        tasks.emplace_back(std::make_unique<IndexerThread>(std::cref(settings), index));
        ++index;
    }
}
|
|
|
|
int IndexerThreadPool::GetFreeWorker() {
|
|
std::unique_lock<std::mutex> lock(m);
|
|
|
|
if (tasks.size() == 0)
|
|
return -1;
|
|
|
|
if (blocking)
|
|
c.wait(lock, [this] { return worker_free_count > 0; });
|
|
|
|
for (int i = 0; i < tasks.size(); i++) {
|
|
if (worker_busy[i] == 0) {
|
|
worker_busy[i] = 1;
|
|
worker_free_count--;
|
|
return i;
|
|
}
|
|
}
|
|
return -1;
|
|
}
|
|
|
|
// Indexes `recip` on a free worker and returns the result by value.
//
// An empty result (empty lattice, zero indexing time) is returned when:
//   - the experiment's indexing algorithm is None,
//   - fewer than viable_cell_min_spots entries are provided,
//   - no worker could be reserved,
//   - the worker returned no result or threw a std::exception (which is logged).
// A reserved worker is always released and one waiter on `c` is notified.
IndexerResult IndexerThreadPool::Run(const DiffractionExperiment &experiment, const std::vector<Coord> &recip) {
    const auto empty_result = [] {
        return IndexerResult{.lattice = {}, .indexing_time_s = 0};
    };

    if (experiment.GetIndexingAlgorithm() == IndexingAlgorithmEnum::None
        || recip.size() < viable_cell_min_spots)
        return empty_result();

    // Check if there is available worker
    const int worker_id = GetFreeWorker();
    if (worker_id < 0)
        return empty_result();

    std::unique_ptr<IndexerResult> outcome;
    try {
        outcome = tasks[worker_id]->Run(experiment, recip);
    } catch (const std::exception &e) {
        spdlog::error("Indexer thread failed: {}", e.what());
        outcome.reset();
    }

    // Give the worker back to the pool before returning the result.
    {
        std::unique_lock<std::mutex> guard(m);
        worker_busy[worker_id] = 0;
        worker_free_count++;
    }
    c.notify_one();

    if (!outcome)
        return empty_result();
    return *outcome;
}
|