v1.0.0-rc.81
This commit is contained in:
@@ -6,11 +6,15 @@
|
||||
|
||||
#ifdef JFJOCH_USE_CUDA
|
||||
#include "FFBIDXIndexer.h"
|
||||
#include "FFTIndexer.h"
|
||||
#include "FFTIndexerGPU.h"
|
||||
#endif
|
||||
|
||||
IndexerThreadPool::IndexerThreadPool(const NUMAHWPolicy &numa_policy, const IndexingSettings& settings)
|
||||
: stop(false), workers_ready(settings.GetIndexingThreads()) {
|
||||
#ifdef JFJOCH_USE_FFTW
|
||||
#include "FFTIndexerCPU.h"
|
||||
#endif
|
||||
|
||||
IndexerThreadPool::IndexerThreadPool(const IndexingSettings &settings, const NUMAHWPolicy &numa_policy)
|
||||
: stop(false), workers_ready(settings.GetIndexingThreads()) {
|
||||
for (size_t i = 0; i < settings.GetIndexingThreads(); ++i)
|
||||
workers.emplace_back([this, i, numa_policy, settings] { Worker(i, numa_policy, settings); });
|
||||
workers_ready.wait();
|
||||
@@ -22,7 +26,7 @@ IndexerThreadPool::IndexerThreadPool(const NUMAHWPolicy &numa_policy, const Inde
|
||||
}
|
||||
cond.notify_all();
|
||||
|
||||
for (std::thread &worker : workers) {
|
||||
for (std::thread &worker: workers) {
|
||||
if (worker.joinable())
|
||||
worker.join();
|
||||
}
|
||||
@@ -31,27 +35,23 @@ IndexerThreadPool::IndexerThreadPool(const NUMAHWPolicy &numa_policy, const Inde
|
||||
}
|
||||
}
|
||||
|
||||
IndexerThreadPool::~IndexerThreadPool() {
|
||||
{
|
||||
IndexerThreadPool::~IndexerThreadPool() { {
|
||||
std::unique_lock<std::mutex> lock(m);
|
||||
stop = true;
|
||||
}
|
||||
cond.notify_all();
|
||||
|
||||
for (std::thread &worker : workers) {
|
||||
for (std::thread &worker: workers) {
|
||||
if (worker.joinable())
|
||||
worker.join();
|
||||
}
|
||||
}
|
||||
|
||||
std::future<std::optional<CrystalLattice>> IndexerThreadPool::Run(const DiffractionExperiment& experiment,
|
||||
DataMessage& message) {
|
||||
|
||||
std::future<std::optional<CrystalLattice> > IndexerThreadPool::Run(const DiffractionExperiment &experiment,
|
||||
DataMessage &message) {
|
||||
// Create a promise/future pair
|
||||
auto promise = std::make_shared<std::promise<std::optional<CrystalLattice>>>();
|
||||
std::future<std::optional<CrystalLattice>> result = promise->get_future();
|
||||
|
||||
{
|
||||
auto promise = std::make_shared<std::promise<std::optional<CrystalLattice> > >();
|
||||
std::future<std::optional<CrystalLattice> > result = promise->get_future(); {
|
||||
std::unique_lock<std::mutex> lock(m);
|
||||
|
||||
// Don't allow enqueueing after stopping the pool
|
||||
@@ -67,31 +67,40 @@ std::future<std::optional<CrystalLattice>> IndexerThreadPool::Run(const Diffract
|
||||
return result;
|
||||
}
|
||||
|
||||
void IndexerThreadPool::Worker(size_t threadIndex, const NUMAHWPolicy &numa_policy, const IndexingSettings& settings) {
|
||||
void IndexerThreadPool::Worker(size_t threadIndex, const NUMAHWPolicy &numa_policy, const IndexingSettings &settings) {
|
||||
try {
|
||||
numa_policy.Bind(threadIndex);
|
||||
} catch (...) {
|
||||
// NUMA policy errors are not critical and should be ignored for the time being.
|
||||
}
|
||||
|
||||
std::unique_ptr<Indexer> fft_indexer, ffbidx_indexer;
|
||||
std::unique_ptr<Indexer> fft_indexer, ffbidx_indexer, fftw_indexer;
|
||||
|
||||
#ifdef JFJOCH_USE_CUDA
|
||||
try {
|
||||
if (get_gpu_count() > 0) {
|
||||
fft_indexer = std::make_unique<FFTIndexer>(settings);
|
||||
ffbidx_indexer = std::make_unique<FFBIDXIndexer>();
|
||||
if (settings.GetAlgorithm() == IndexingAlgorithmEnum::Auto
|
||||
|| settings.GetAlgorithm() == IndexingAlgorithmEnum::FFT)
|
||||
fft_indexer = std::make_unique<FFTIndexerGPU>(settings);
|
||||
|
||||
if (settings.GetAlgorithm() == IndexingAlgorithmEnum::Auto
|
||||
|| settings.GetAlgorithm() == IndexingAlgorithmEnum::FFBIDX)
|
||||
ffbidx_indexer = std::make_unique<FFBIDXIndexer>();
|
||||
}
|
||||
} catch (...) {
|
||||
failed_start = true;
|
||||
}
|
||||
#endif
|
||||
#ifdef JFJOCH_USE_FFTW
|
||||
if ((settings.GetAlgorithm() == IndexingAlgorithmEnum::Auto && (get_gpu_count() == 0))
|
||||
|| settings.GetAlgorithm() == IndexingAlgorithmEnum::FFTW)
|
||||
fftw_indexer = std::make_unique<FFTIndexerCPU>(settings);
|
||||
#endif
|
||||
|
||||
workers_ready.count_down();
|
||||
|
||||
while (true) {
|
||||
TaskPackage task;
|
||||
|
||||
{
|
||||
TaskPackage task; {
|
||||
std::unique_lock<std::mutex> lock(m);
|
||||
|
||||
// Add a timeout to the wait to ensure we can exit even if no notification
|
||||
@@ -101,13 +110,13 @@ void IndexerThreadPool::Worker(size_t threadIndex, const NUMAHWPolicy &numa_poli
|
||||
|
||||
// Check for exit conditions
|
||||
if (stop && taskQueue.empty())
|
||||
return; // Exit cleanly
|
||||
return; // Exit cleanly
|
||||
|
||||
if (!taskQueue.empty()) {
|
||||
task = std::move(taskQueue.front());
|
||||
taskQueue.pop();
|
||||
} else {
|
||||
continue; // No tasks, go back to waiting
|
||||
continue; // No tasks, go back to waiting
|
||||
}
|
||||
}
|
||||
try {
|
||||
@@ -120,6 +129,8 @@ void IndexerThreadPool::Worker(size_t threadIndex, const NUMAHWPolicy &numa_poli
|
||||
indexer = fft_indexer.get();
|
||||
} else if (algorithm == IndexingAlgorithmEnum::FFBIDX && ffbidx_indexer) {
|
||||
indexer = ffbidx_indexer.get();
|
||||
} else if (algorithm == IndexingAlgorithmEnum::FFTW && fftw_indexer) {
|
||||
indexer = fftw_indexer.get();
|
||||
}
|
||||
|
||||
if (indexer) {
|
||||
|
||||
Reference in New Issue
Block a user