From 7899765e5fe7c1bbfcd19b4f2475858d831a6897 Mon Sep 17 00:00:00 2001 From: Filip Leonarski Date: Thu, 16 Apr 2026 14:19:42 +0200 Subject: [PATCH 01/28] CBOR: Fix receiver_buf_in_sending encoding --- frame_serialize/CBORStream2Serializer.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/frame_serialize/CBORStream2Serializer.cpp b/frame_serialize/CBORStream2Serializer.cpp index de0abf60..1fc8902d 100644 --- a/frame_serialize/CBORStream2Serializer.cpp +++ b/frame_serialize/CBORStream2Serializer.cpp @@ -717,7 +717,7 @@ void CBORStream2Serializer::SerializeImageInternal(CborEncoder &mapEncoder, cons CBOR_ENC(mapEncoder, "jf_info", message.jf_info); CBOR_ENC(mapEncoder, "receiver_aq_dev_delay", message.receiver_aq_dev_delay); CBOR_ENC(mapEncoder, "receiver_free_send_buf", message.receiver_buf_available); - CBOR_ENC(mapEncoder, "receiver_buf_in_sending", message.receiver_buf_in_preparation); + CBOR_ENC(mapEncoder, "receiver_buf_in_sending", message.receiver_buf_in_sending); CBOR_ENC(mapEncoder, "receiver_buf_in_preparation", message.receiver_buf_in_preparation); CBOR_ENC(mapEncoder, "storage_cell", message.storage_cell); CBOR_ENC(mapEncoder, "saturated_pixel_count", message.saturated_pixel_count); -- 2.52.0 From 5a830d7af6a20dfb9af6dff4fccd882257624c39 Mon Sep 17 00:00:00 2001 From: leonarski_f Date: Thu, 16 Apr 2026 19:40:27 +0200 Subject: [PATCH 02/28] IndexerThreadPool: IndexerThreadPool return value, not promise + if no spots or no indexing algorithm, don't queue --- image_analysis/IndexAndRefine.cpp | 2 +- image_analysis/RotationIndexer.cpp | 2 +- image_analysis/indexing/IndexerThreadPool.cpp | 14 ++++++++++---- image_analysis/indexing/IndexerThreadPool.h | 4 +++- 4 files changed, 15 insertions(+), 7 deletions(-) diff --git a/image_analysis/IndexAndRefine.cpp b/image_analysis/IndexAndRefine.cpp index fd19adbb..45f0e47c 100644 --- a/image_analysis/IndexAndRefine.cpp +++ b/image_analysis/IndexAndRefine.cpp @@ -56,7 +56,7 @@ IndexAndRefine::IndexingOutcome IndexAndRefine::DetermineLatticeAndSymmetry(Data recip.push_back(i.ReciprocalCoord(geom_)); } - auto indexer_result = indexer_->Run(experiment, recip).get(); + auto indexer_result = indexer_->Run(experiment, recip); msg.indexing_time_s = indexer_result.indexing_time_s; if (indexer_result.lattice.empty()) diff --git a/image_analysis/RotationIndexer.cpp b/image_analysis/RotationIndexer.cpp index df4d09f1..cd7a04ea 100644 --- a/image_analysis/RotationIndexer.cpp +++ b/image_analysis/RotationIndexer.cpp @@ -72,7 +72,7 @@ void RotationIndexer::TryIndex() { coords_sel = coords_; } - auto indexer_result = indexer_.Run(experiment, coords_sel).get(); + auto indexer_result = indexer_.Run(experiment, coords_sel); if (!indexer_result.lattice.empty()) { // Find lattice type search_result_ = LatticeSearch(indexer_result.lattice[0]); diff --git a/image_analysis/indexing/IndexerThreadPool.cpp b/image_analysis/indexing/IndexerThreadPool.cpp index ee6ff3ff..d8efeb2e 100644 --- a/image_analysis/indexing/IndexerThreadPool.cpp +++ b/image_analysis/indexing/IndexerThreadPool.cpp @@ -14,7 +14,9 @@ #endif IndexerThreadPool::IndexerThreadPool(const IndexingSettings &settings, const NUMAHWPolicy &numa_policy) - : stop(false), workers_ready(settings.GetIndexingThreads()) { + : workers_ready(settings.GetIndexingThreads()), + viable_cell_min_spots(settings.GetViableCellMinSpots()), + stop(false) { for (size_t i = 0; i < settings.GetIndexingThreads(); ++i) workers.emplace_back([this, i, numa_policy, settings] { Worker(i, numa_policy, settings); }); workers_ready.wait(); @@ -47,8 +49,12 @@ IndexerThreadPool::~IndexerThreadPool() { { } } -std::future IndexerThreadPool::Run(const DiffractionExperiment &experiment, - const std::vector& recip) { +IndexerResult IndexerThreadPool::Run(const DiffractionExperiment &experiment, const std::vector &recip) { + if (experiment.GetIndexingAlgorithm() == IndexingAlgorithmEnum::None) + return IndexerResult{.lattice = {}, .indexing_time_s = 0}; + if (recip.size() < viable_cell_min_spots) + return IndexerResult{.lattice = {}, .indexing_time_s = 0}; + // Create a promise/future pair auto promise = std::make_shared >(); std::future result = promise->get_future(); { @@ -64,7 +70,7 @@ std::future IndexerThreadPool::Run(const DiffractionExperiment &e } cond.notify_one(); - return result; + return result.get(); } void IndexerThreadPool::Worker(int32_t threadIndex, const NUMAHWPolicy &numa_policy, const IndexingSettings &settings) { diff --git a/image_analysis/indexing/IndexerThreadPool.h b/image_analysis/indexing/IndexerThreadPool.h index c2055aac..f4c42d75 100644 --- a/image_analysis/indexing/IndexerThreadPool.h +++ b/image_analysis/indexing/IndexerThreadPool.h @@ -38,13 +38,15 @@ class IndexerThreadPool { std::queue taskQueue; std::latch workers_ready; + const int64_t viable_cell_min_spots; + bool stop; void Worker(int32_t threadIndex, const NUMAHWPolicy &numa_policy, const IndexingSettings& settings); public: IndexerThreadPool(const IndexingSettings& settings, const NUMAHWPolicy &numa_policy = NUMAHWPolicy()); ~IndexerThreadPool(); - std::future Run(const DiffractionExperiment& experiment, const std::vector& recip); + IndexerResult Run(const DiffractionExperiment& experiment, const std::vector& recip); }; -- 2.52.0 From 31a1b90bb90167c7a04daf4282bcccbd5978d978 Mon Sep 17 00:00:00 2001 From: leonarski_f Date: Thu, 16 Apr 2026 19:40:58 +0200 Subject: [PATCH 03/28] Indexer: Fix viable cell min spots meaning to be more correct --- image_analysis/indexing/FFBIDXIndexer.cpp | 2 +- image_analysis/indexing/FFTIndexer.cpp | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/image_analysis/indexing/FFBIDXIndexer.cpp b/image_analysis/indexing/FFBIDXIndexer.cpp index 731efa36..6079826c 100644 --- a/image_analysis/indexing/FFBIDXIndexer.cpp +++ b/image_analysis/indexing/FFBIDXIndexer.cpp @@ -24,7 +24,7 @@ std::vector FFBIDXIndexer::RunInternal(const std::vector if (nspots > coord.size()) nspots = coord.size(); - if (nspots <= viable_cell_min_spots) + if (nspots < viable_cell_min_spots) return ret; assert(nspots <= MAX_SPOT_COUNT); diff --git a/image_analysis/indexing/FFTIndexer.cpp b/image_analysis/indexing/FFTIndexer.cpp index d4c8d848..b68d56b8 100644 --- a/image_analysis/indexing/FFTIndexer.cpp +++ b/image_analysis/indexing/FFTIndexer.cpp @@ -201,7 +201,7 @@ std::vector FFTIndexer::RunInternal(const std::vector &co if (nspots > coord.size()) nspots = coord.size(); - if (nspots <= viable_cell_min_spots) + if (nspots < viable_cell_min_spots) return {}; assert(nspots <= FFT_MAX_SPOTS); -- 2.52.0 From 6aa671c78c4f19e2bd0d30aeda3104e792d011af Mon Sep 17 00:00:00 2001 From: leonarski_f Date: Thu, 16 Apr 2026 19:53:12 +0200 Subject: [PATCH 04/28] IndexerThreadPool: Handle Auto indexer type --- image_analysis/indexing/IndexerThreadPool.cpp | 29 +++++++++++++++---- 1 file changed, 24 insertions(+), 5 deletions(-) diff --git a/image_analysis/indexing/IndexerThreadPool.cpp b/image_analysis/indexing/IndexerThreadPool.cpp index d8efeb2e..668ae067 100644 --- a/image_analysis/indexing/IndexerThreadPool.cpp +++ b/image_analysis/indexing/IndexerThreadPool.cpp @@ -3,6 +3,7 @@ #include "IndexerThreadPool.h" #include "../common/CUDAWrapper.h" +#include "../common/Logger.h" #ifdef JFJOCH_USE_CUDA #include "FFBIDXIndexer.h" @@ -84,6 +85,8 @@ void IndexerThreadPool::Worker(int32_t threadIndex, const NUMAHWPolicy &numa_pol #else numa_policy.Bind(threadIndex); #endif + } catch (const std::exception &e) { + spdlog::error("Failed to bind thread to NUMA node: {}", e.what()); } catch (...) { // NUMA policy errors are not critical and should be ignored for the time being. } @@ -101,24 +104,37 @@ void IndexerThreadPool::Worker(int32_t threadIndex, const NUMAHWPolicy &numa_pol || settings.GetAlgorithm() == IndexingAlgorithmEnum::FFBIDX) ffbidx_indexer = std::make_unique(); } + } catch (const std::exception &e) { + spdlog::error("Failed to initialize GPU indexer: {}", e.what()); + failed_start = true; } catch (...) { + spdlog::error("Failed to initialize GPU indexer"); failed_start = true; } #endif #ifdef JFJOCH_USE_FFTW - if ((settings.GetAlgorithm() == IndexingAlgorithmEnum::Auto && (get_gpu_count() == 0)) - || settings.GetAlgorithm() == IndexingAlgorithmEnum::FFTW) - fftw_indexer = std::make_unique(settings); + try { + if ((settings.GetAlgorithm() == IndexingAlgorithmEnum::Auto && (get_gpu_count() == 0)) + || settings.GetAlgorithm() == IndexingAlgorithmEnum::FFTW) + fftw_indexer = std::make_unique(settings); + } catch (const std::exception &e) { + spdlog::error("Failed to initialize FFTW indexer: {}", e.what()); + failed_start = true; + } catch (...) { + spdlog::error("Failed to initialize FFTW indexer"); + failed_start = true; + } #endif workers_ready.count_down(); while (true) { - TaskPackage task; { + TaskPackage task; + { std::unique_lock lock(m); // Add a timeout to the wait to ensure we can exit even if no notification - cond.wait_for(lock, std::chrono::seconds(1), [this] { + cond.wait_for(lock, std::chrono::milliseconds(50), [this] { return stop || !taskQueue.empty(); }); @@ -145,6 +161,9 @@ void IndexerThreadPool::Worker(int32_t threadIndex, const NUMAHWPolicy &numa_pol indexer = ffbidx_indexer.get(); } else if (algorithm == IndexingAlgorithmEnum::FFTW && fftw_indexer) { indexer = fftw_indexer.get(); + } else if (algorithm == IndexingAlgorithmEnum::Auto) { + throw JFJochException(JFJochExceptionCategory::InputParameterInvalid, + "Internal error: Invalid indexing algorithm provided"); } if (indexer) { -- 2.52.0 From 801a50830ac0b7f9e3a898b9fbeb2e502d1cac0d Mon Sep 17 00:00:00 2001 From: leonarski_f Date: Thu, 16 Apr 2026 22:35:32 +0200 Subject: [PATCH 05/28] IndexerThreadPool: New version - requests are dropped, if no available workers (good for real-time, not good for offline processing...need smarter fix) --- image_analysis/indexing/IndexerThreadPool.cpp | 272 ++++++++++-------- image_analysis/indexing/IndexerThreadPool.h | 41 +-- receiver/JFJochReceiverService.cpp | 2 +- 3 files changed, 181 insertions(+), 134 deletions(-) diff --git a/image_analysis/indexing/IndexerThreadPool.cpp b/image_analysis/indexing/IndexerThreadPool.cpp index 668ae067..3d57769f 100644 --- a/image_analysis/indexing/IndexerThreadPool.cpp +++ b/image_analysis/indexing/IndexerThreadPool.cpp @@ -14,76 +14,24 @@ #include "FFTIndexerCPU.h" #endif -IndexerThreadPool::IndexerThreadPool(const IndexingSettings &settings, const NUMAHWPolicy &numa_policy) - : workers_ready(settings.GetIndexingThreads()), - viable_cell_min_spots(settings.GetViableCellMinSpots()), - stop(false) { - for (size_t i = 0; i < settings.GetIndexingThreads(); ++i) - workers.emplace_back([this, i, numa_policy, settings] { Worker(i, numa_policy, settings); }); - workers_ready.wait(); - - if (failed_start) { - { - std::unique_lock lock(m); - stop = true; - } - cond.notify_all(); - - for (std::thread &worker: workers) { - if (worker.joinable()) - worker.join(); - } - throw JFJochException(JFJochExceptionCategory::GPUCUDAError, - "Cannot configure indexer (likely too many threads, not enough memory)"); +IndexerThread::IndexerThread(const IndexingSettings &settings, int threadid) { + std::unique_lock lock(m); + state = TaskState::STARTING; + worker_thread = std::thread(&IndexerThread::Worker, this, std::cref(settings), threadid); + c_running.wait(lock, [this] { return state != TaskState::STARTING; }); + if (state == TaskState::ERROR) { + worker_thread.join(); + throw JFJochException(JFJochExceptionCategory::InputParameterInvalid, + "Indexer thread initialization failed"); } } -IndexerThreadPool::~IndexerThreadPool() { { - std::unique_lock lock(m); - stop = true; - } - cond.notify_all(); - - for (std::thread &worker: workers) { - if (worker.joinable()) - worker.join(); - } -} - -IndexerResult IndexerThreadPool::Run(const DiffractionExperiment &experiment, const std::vector &recip) { - if (experiment.GetIndexingAlgorithm() == IndexingAlgorithmEnum::None) - return IndexerResult{.lattice = {}, .indexing_time_s = 0}; - if (recip.size() < viable_cell_min_spots) - return IndexerResult{.lattice = {}, .indexing_time_s = 0}; - - // Create a promise/future pair - auto promise = std::make_shared >(); - std::future result = promise->get_future(); { - std::unique_lock lock(m); - - // Don't allow enqueueing after stopping the pool - if (stop) { - throw std::runtime_error("Cannot enqueue on stopped thread pool"); - } - - // Create a task package with the data message and coordinates - taskQueue.emplace(TaskPackage{promise, &experiment, &recip}); - } - - cond.notify_one(); - return result.get(); -} - -void IndexerThreadPool::Worker(int32_t threadIndex, const NUMAHWPolicy &numa_policy, const IndexingSettings &settings) { +void IndexerThread::Worker(const IndexingSettings &settings, int threadid) { try { #ifdef JFJOCH_USE_CUDA auto gpu_count = get_gpu_count(); if (gpu_count > 0) - NUMAHWPolicy::SelectGPUAndItsNUMA(threadIndex % gpu_count); - else - numa_policy.Bind(threadIndex); -#else - numa_policy.Bind(threadIndex); + NUMAHWPolicy::SelectGPUAndItsNUMA(threadid % gpu_count); #endif } catch (const std::exception &e) { spdlog::error("Failed to bind thread to NUMA node: {}", e.what()); @@ -106,10 +54,20 @@ void IndexerThreadPool::Worker(int32_t threadIndex, const NUMAHWPolicy &numa_pol } } catch (const std::exception &e) { spdlog::error("Failed to initialize GPU indexer: {}", e.what()); - failed_start = true; + { + std::unique_lock lock(m); + state = TaskState::ERROR; + } + c_running.notify_all(); + return; } catch (...) { spdlog::error("Failed to initialize GPU indexer"); - failed_start = true; + { + std::unique_lock lock(m); + state = TaskState::ERROR; + } + c_running.notify_all(); + return; } #endif #ifdef JFJOCH_USE_FFTW @@ -119,65 +77,145 @@ void IndexerThreadPool::Worker(int32_t threadIndex, const NUMAHWPolicy &numa_pol fftw_indexer = std::make_unique(settings); } catch (const std::exception &e) { spdlog::error("Failed to initialize FFTW indexer: {}", e.what()); - failed_start = true; - } catch (...) { - spdlog::error("Failed to initialize FFTW indexer"); - failed_start = true; - } -#endif - - workers_ready.count_down(); - - while (true) { - TaskPackage task; { std::unique_lock lock(m); - - // Add a timeout to the wait to ensure we can exit even if no notification - cond.wait_for(lock, std::chrono::milliseconds(50), [this] { - return stop || !taskQueue.empty(); - }); - - // Check for exit conditions - if (stop && taskQueue.empty()) - return; // Exit cleanly - - if (!taskQueue.empty()) { - task = std::move(taskQueue.front()); - taskQueue.pop(); - } else { - continue; // No tasks, go back to waiting - } + state = TaskState::ERROR; } - try { - IndexerResult result; + c_running.notify_all(); + return; + } catch (...) { + spdlog::error("Failed to initialize FFTW indexer"); + { + std::unique_lock lock(m); + state = TaskState::ERROR; + } + c_running.notify_all(); + return; + } +#endif + { + std::unique_lock lock(m); + state = TaskState::IDLE; + } + c_running.notify_all(); - auto algorithm = task.experiment->GetIndexingAlgorithm(); - Indexer *indexer = nullptr; + while (true) { + std::unique_ptr input; + // Look for task + handle stop + { + std::unique_lock lock(m); + c_start.wait(lock, [this] { return stop || state == TaskState::READY; }); + if (stop) + return; + input = std::move(task_input); + } + if (input) { + std::unique_ptr tmp_result; + try { + auto algorithm = input->experiment.GetIndexingAlgorithm(); + Indexer *indexer = nullptr; - if (algorithm == IndexingAlgorithmEnum::FFT && fft_indexer) { - indexer = fft_indexer.get(); - } else if (algorithm == IndexingAlgorithmEnum::FFBIDX && ffbidx_indexer) { - indexer = ffbidx_indexer.get(); - } else if (algorithm == IndexingAlgorithmEnum::FFTW && fftw_indexer) { - indexer = fftw_indexer.get(); - } else if (algorithm == IndexingAlgorithmEnum::Auto) { - throw JFJochException(JFJochExceptionCategory::InputParameterInvalid, - "Internal error: Invalid indexing algorithm provided"); + if (algorithm == IndexingAlgorithmEnum::FFT && fft_indexer) { + indexer = fft_indexer.get(); + } else if (algorithm == IndexingAlgorithmEnum::FFBIDX && ffbidx_indexer) { + indexer = ffbidx_indexer.get(); + } else if (algorithm == IndexingAlgorithmEnum::FFTW && fftw_indexer) { + indexer = fftw_indexer.get(); + } else if (algorithm == IndexingAlgorithmEnum::Auto) { + throw JFJochException(JFJochExceptionCategory::InputParameterInvalid, + "Internal error: Invalid indexing algorithm provided"); + } + + if (indexer) { + indexer->Setup(input->experiment); + tmp_result = std::make_unique(indexer->Run(input->recip)); + } + } catch (std::exception &e) { + tmp_result = nullptr; + spdlog::error("Indexer thread {} failed: {}", threadid, e.what()); } - - if (indexer) { - indexer->Setup(*task.experiment); - result = indexer->Run(*task.recip); + { + std::unique_lock lock(m); + state = TaskState::COMPLETED; + result = std::move(tmp_result); + c_done.notify_all(); } - - // Set the result via the promise - if (task.promise) { - task.promise->set_value(result); - } - } catch (std::exception &e) { - if (task.promise) - task.promise->set_exception(std::current_exception()); } } } + +void IndexerThread::Finalize() { + { + std::unique_lock lock(m); + stop = true; + } + c_start.notify_one(); + if (worker_thread.joinable()) + worker_thread.join(); +} + +std::unique_ptr IndexerThread::Run(const DiffractionExperiment &experiment, + const std::vector &recip) { + std::unique_ptr tmp_result; + { + std::unique_lock lock(m); + if (stop) + return nullptr; + if (state != TaskState::IDLE) + return nullptr; + task_input = std::make_unique(std::cref(experiment), std::cref(recip)); + state = TaskState::READY; + } + c_start.notify_one(); + { + std::unique_lock lock(m); + c_done.wait(lock, [this] { return state == TaskState::COMPLETED; }); + tmp_result = std::move(result); + state = TaskState::IDLE; + } + return std::move(tmp_result); +} + +IndexerThread::~IndexerThread() { + Finalize(); +} + +IndexerThreadPool::IndexerThreadPool(const IndexingSettings &settings) + : worker_busy(settings.GetIndexingThreads(), 0), + viable_cell_min_spots(settings.GetViableCellMinSpots()) { + for (size_t i = 0; i < settings.GetIndexingThreads(); ++i) + tasks.emplace_back(std::make_unique(std::cref(settings), i)); +} + + +IndexerResult IndexerThreadPool::Run(const DiffractionExperiment &experiment, const std::vector &recip) { + if (experiment.GetIndexingAlgorithm() == IndexingAlgorithmEnum::None) + return IndexerResult{.lattice = {}, .indexing_time_s = 0}; + if (recip.size() < viable_cell_min_spots) + return IndexerResult{.lattice = {}, .indexing_time_s = 0}; + + // Check if there is available worker + int task = -1; + { + std::unique_lock lock(m); + for (int i = 0; i < tasks.size(); i++) { + if (worker_busy[i] == 0) { + task = i; + worker_busy[i] = 1; + break; + } + } + } + + std::unique_ptr result; + if (task >= 0) { + result = tasks[task]->Run(experiment, recip); + { + std::unique_lock lock(m); + worker_busy[task] = 0; + } + } + if (result) + return *result; + return IndexerResult{.lattice = {}, .indexing_time_s = 0}; +} diff --git a/image_analysis/indexing/IndexerThreadPool.h b/image_analysis/indexing/IndexerThreadPool.h index f4c42d75..efccc369 100644 --- a/image_analysis/indexing/IndexerThreadPool.h +++ b/image_analysis/indexing/IndexerThreadPool.h @@ -21,31 +21,40 @@ #include "../common/NUMAHWPolicy.h" #include "Indexer.h" -class IndexerThreadPool { - std::atomic failed_start = false; - struct TaskPackage { - std::shared_ptr> promise; - const DiffractionExperiment *experiment; - const std::vector *recip; +class IndexerThread { + struct TaskInput { + const DiffractionExperiment &experiment; + const std::vector &recip; }; - std::vector workers; - + bool stop = false; + enum class TaskState {STARTING, IDLE, READY, COMPLETED, ERROR} state = TaskState::STARTING; std::mutex m; - std::condition_variable cond; - std::queue taskQueue; - std::latch workers_ready; + std::condition_variable c_running; + std::condition_variable c_start; + std::condition_variable c_done; + std::unique_ptr result = nullptr; + std::unique_ptr task_input = nullptr; + std::thread worker_thread; + void Worker(const IndexingSettings& settings, int threadid); +public: + IndexerThread(const IndexingSettings& settings, int threadid); + ~IndexerThread(); + std::unique_ptr Run(const DiffractionExperiment &experiment, const std::vector &recip); + void Finalize(); +}; + +class IndexerThreadPool { + std::mutex m; + std::vector worker_busy; + std::vector> tasks; const int64_t viable_cell_min_spots; - bool stop; - void Worker(int32_t threadIndex, const NUMAHWPolicy &numa_policy, const IndexingSettings& settings); public: - IndexerThreadPool(const IndexingSettings& settings, const NUMAHWPolicy &numa_policy = NUMAHWPolicy()); - ~IndexerThreadPool(); - + IndexerThreadPool(const IndexingSettings& settings); IndexerResult Run(const DiffractionExperiment& experiment, const std::vector& recip); }; diff --git a/receiver/JFJochReceiverService.cpp b/receiver/JFJochReceiverService.cpp index 79151fae..e44655f3 100644 --- a/receiver/JFJochReceiverService.cpp +++ b/receiver/JFJochReceiverService.cpp @@ -329,7 +329,7 @@ JFJochReceiverService &JFJochReceiverService::Indexing(const IndexingSettings &i if (input.GetAlgorithm() != IndexingAlgorithmEnum::None) { logger.Info("Creating indexing thread pool..."); - indexer_thread_pool = std::make_unique(input, numa_policy); + indexer_thread_pool = std::make_unique(input); logger.Info(" ... done"); } return *this; -- 2.52.0 From 1b06ecfad68c125ea82067d0e7c3d35113a231c0 Mon Sep 17 00:00:00 2001 From: leonarski_f Date: Thu, 16 Apr 2026 22:45:44 +0200 Subject: [PATCH 06/28] IndexerThreadPool: Minor fixes --- image_analysis/indexing/IndexerThreadPool.cpp | 45 +++++++++++++------ image_analysis/indexing/IndexerThreadPool.h | 9 ++-- receiver/JFJochReceiverService.cpp | 2 +- 3 files changed, 37 insertions(+), 19 deletions(-) diff --git a/image_analysis/indexing/IndexerThreadPool.cpp b/image_analysis/indexing/IndexerThreadPool.cpp index 3d57769f..7da37f55 100644 --- a/image_analysis/indexing/IndexerThreadPool.cpp +++ b/image_analysis/indexing/IndexerThreadPool.cpp @@ -180,13 +180,33 @@ IndexerThread::~IndexerThread() { Finalize(); } -IndexerThreadPool::IndexerThreadPool(const IndexingSettings &settings) +IndexerThreadPool::IndexerThreadPool(const IndexingSettings &settings, bool blocking) : worker_busy(settings.GetIndexingThreads(), 0), - viable_cell_min_spots(settings.GetViableCellMinSpots()) { + worker_free_count(settings.GetIndexingThreads()), + viable_cell_min_spots(settings.GetViableCellMinSpots()), + blocking(blocking) { for (size_t i = 0; i < settings.GetIndexingThreads(); ++i) tasks.emplace_back(std::make_unique(std::cref(settings), i)); } +int IndexerThreadPool::GetFreeWorker() { + std::unique_lock lock(m); + + if (tasks.size() == 0) + return -1; + + if (blocking) + c.wait(lock, [this] { return worker_free_count > 0; }); + + for (int i = 0; i < tasks.size(); i++) { + if (worker_busy[i] == 0) { + worker_busy[i] = 1; + worker_free_count--; + return i; + } + } + return -1; +} IndexerResult IndexerThreadPool::Run(const DiffractionExperiment &experiment, const std::vector &recip) { if (experiment.GetIndexingAlgorithm() == IndexingAlgorithmEnum::None) @@ -195,25 +215,22 @@ IndexerResult IndexerThreadPool::Run(const DiffractionExperiment &experiment, co return IndexerResult{.lattice = {}, .indexing_time_s = 0}; // Check if there is available worker - int task = -1; - { - std::unique_lock lock(m); - for (int i = 0; i < tasks.size(); i++) { - if (worker_busy[i] == 0) { - task = i; - worker_busy[i] = 1; - break; - } - } - } + const int task = GetFreeWorker(); std::unique_ptr result; if (task >= 0) { - result = tasks[task]->Run(experiment, recip); + try { + result = tasks[task]->Run(experiment, recip); + } catch (const std::exception &e) { + spdlog::error("Indexer thread failed: {}", e.what()); + result = nullptr; + } { std::unique_lock lock(m); worker_busy[task] = 0; + worker_free_count++; } + c.notify_one(); } if (result) return *result; diff --git a/image_analysis/indexing/IndexerThreadPool.h b/image_analysis/indexing/IndexerThreadPool.h index efccc369..7f5fa978 100644 --- a/image_analysis/indexing/IndexerThreadPool.h +++ b/image_analysis/indexing/IndexerThreadPool.h @@ -21,8 +21,6 @@ #include "../common/NUMAHWPolicy.h" #include "Indexer.h" - - class IndexerThread { struct TaskInput { const DiffractionExperiment &experiment; @@ -49,12 +47,15 @@ public: class IndexerThreadPool { std::mutex m; + std::condition_variable c; std::vector worker_busy; + size_t worker_free_count; std::vector> tasks; const int64_t viable_cell_min_spots; - + const bool blocking; + int GetFreeWorker(); public: - IndexerThreadPool(const IndexingSettings& settings); + IndexerThreadPool(const IndexingSettings& settings, bool blocking = true); IndexerResult Run(const DiffractionExperiment& experiment, const std::vector& recip); }; diff --git a/receiver/JFJochReceiverService.cpp b/receiver/JFJochReceiverService.cpp index e44655f3..a320cbe3 100644 --- a/receiver/JFJochReceiverService.cpp +++ b/receiver/JFJochReceiverService.cpp @@ -329,7 +329,7 @@ JFJochReceiverService &JFJochReceiverService::Indexing(const IndexingSettings &i if (input.GetAlgorithm() != IndexingAlgorithmEnum::None) { logger.Info("Creating indexing thread pool..."); - indexer_thread_pool = std::make_unique(input); + indexer_thread_pool = std::make_unique(input, false); logger.Info(" ... done"); } return *this; -- 2.52.0 From e4fbaa44408e7c15284c9ce687c7b45f9d763e4e Mon Sep 17 00:00:00 2001 From: leonarski_f Date: Thu, 16 Apr 2026 22:56:15 +0200 Subject: [PATCH 07/28] OpenAPI: Add blocking indexing thread pool option --- broker/OpenAPIConvert.cpp | 2 ++ broker/gen/model/Indexing_settings.cpp | 31 ++++++++++++++++++- broker/gen/model/Indexing_settings.h | 9 ++++++ broker/jfjoch_api.yaml | 7 +++++ broker/redoc-static.html | 13 +++++--- common/IndexingSettings.cpp | 9 ++++++ common/IndexingSettings.h | 3 ++ docs/python_client/docs/IndexingSettings.md | 1 + frontend/package-lock.json | 4 +-- frontend/src/components/IndexingSettings.tsx | 18 +++++++++-- .../src/openapi/models/indexing_settings.ts | 7 +++++ image_analysis/indexing/IndexerThreadPool.cpp | 4 +-- image_analysis/indexing/IndexerThreadPool.h | 2 +- receiver/JFJochReceiverService.cpp | 2 +- 14 files changed, 98 insertions(+), 14 deletions(-) diff --git a/broker/OpenAPIConvert.cpp b/broker/OpenAPIConvert.cpp index 495529a2..6cfb7aa7 100644 --- a/broker/OpenAPIConvert.cpp +++ b/broker/OpenAPIConvert.cpp @@ -952,6 +952,7 @@ IndexingSettings Convert(const org::openapitools::server::model::Indexing_settin ret.RotationIndexing(input.isRotationIndexing()); ret.RotationIndexingAngularStride_deg(input.getRotationIndexingAngularStrideDeg()); ret.RotationIndexingMinAngularRange_deg(input.getRotationIndexingMinAngularRangeDeg()); + ret.BlockingBehavior(input.isBlocking()); return ret; } @@ -969,6 +970,7 @@ org::openapitools::server::model::Indexing_settings Convert(const IndexingSettin ret.setRotationIndexing(input.GetRotationIndexing()); ret.setRotationIndexingAngularStrideDeg(input.GetRotationIndexingAngularStride_deg()); ret.setRotationIndexingMinAngularRangeDeg(input.GetRotationIndexingMinAngularRange_deg()); + ret.setBlocking(input.GetBlockingBehavior()); org::openapitools::server::model::Geom_refinement_algorithm refinement; switch (input.GetGeomRefinementAlgorithm()) { diff --git a/broker/gen/model/Indexing_settings.cpp b/broker/gen/model/Indexing_settings.cpp index 8de04fb1..a797c2aa 100644 --- a/broker/gen/model/Indexing_settings.cpp +++ b/broker/gen/model/Indexing_settings.cpp @@ -33,6 +33,8 @@ Indexing_settings::Indexing_settings() m_Rotation_indexing = false; m_Rotation_indexing_min_angular_range_deg = 20.0f; m_Rotation_indexing_angular_stride_deg = 0.5f; + m_Blocking = false; + m_BlockingIsSet = false; } @@ -225,7 +227,7 @@ bool Indexing_settings::validate(std::stringstream& msg, const std::string& path } } - + return success; } @@ -274,8 +276,11 @@ bool Indexing_settings::operator==(const Indexing_settings& rhs) const && (getRotationIndexingAngularStrideDeg() == rhs.getRotationIndexingAngularStrideDeg()) + && + ((!blockingIsSet() && !rhs.blockingIsSet()) || (blockingIsSet() && rhs.blockingIsSet() && isBlocking() == rhs.isBlocking())) + ; } @@ -301,6 +306,8 @@ void to_json(nlohmann::json& j, const Indexing_settings& o) j["rotation_indexing"] = o.m_Rotation_indexing; j["rotation_indexing_min_angular_range_deg"] = o.m_Rotation_indexing_min_angular_range_deg; j["rotation_indexing_angular_stride_deg"] = o.m_Rotation_indexing_angular_stride_deg; + if(o.blockingIsSet()) + j["blocking"] = o.m_Blocking; } @@ -320,6 +327,11 @@ void from_json(const nlohmann::json& j, Indexing_settings& o) j.at("rotation_indexing").get_to(o.m_Rotation_indexing); j.at("rotation_indexing_min_angular_range_deg").get_to(o.m_Rotation_indexing_min_angular_range_deg); j.at("rotation_indexing_angular_stride_deg").get_to(o.m_Rotation_indexing_angular_stride_deg); + if(j.find("blocking") != j.end()) + { + j.at("blocking").get_to(o.m_Blocking); + o.m_BlockingIsSet = true; + } } @@ -435,6 +447,23 @@ void Indexing_settings::setRotationIndexingAngularStrideDeg(float const value) { m_Rotation_indexing_angular_stride_deg = value; } +bool Indexing_settings::isBlocking() const +{ + return m_Blocking; +} +void Indexing_settings::setBlocking(bool const value) +{ + m_Blocking = value; + m_BlockingIsSet = true; +} +bool Indexing_settings::blockingIsSet() const +{ + return m_BlockingIsSet; +} +void Indexing_settings::unsetBlocking() +{ + m_BlockingIsSet = false; +} } // namespace org::openapitools::server::model diff --git a/broker/gen/model/Indexing_settings.h b/broker/gen/model/Indexing_settings.h index 41673aaa..68ca3bb8 100644 --- a/broker/gen/model/Indexing_settings.h +++ b/broker/gen/model/Indexing_settings.h @@ -129,6 +129,13 @@ public: /// float getRotationIndexingAngularStrideDeg() const; void setRotationIndexingAngularStrideDeg(float const value); + /// + /// Indexing in Jungfraujoch goes with a dedicated thread pool. If set to false, the thread pool is non-blocking, i.e. if there are no threads available, image indexing will be skipped. If set to true, the thread pool will block until a thread is available. + /// + bool isBlocking() const; + void setBlocking(bool const value); + bool blockingIsSet() const; + void unsetBlocking(); friend void to_json(nlohmann::json& j, const Indexing_settings& o); friend void from_json(const nlohmann::json& j, Indexing_settings& o); @@ -161,6 +168,8 @@ protected: float m_Rotation_indexing_angular_stride_deg; + bool m_Blocking; + bool m_BlockingIsSet; }; diff --git a/broker/jfjoch_api.yaml b/broker/jfjoch_api.yaml index 50c2d28c..883c0410 100644 --- a/broker/jfjoch_api.yaml +++ b/broker/jfjoch_api.yaml @@ -1917,6 +1917,13 @@ components: format: float default: 0.5 minimum: 0 + blocking: + type: boolean + default: false + description: | + Indexing in Jungfraujoch goes with a dedicated thread pool. + If set to false, the thread pool is non-blocking, i.e. if there are no threads available, image indexing will be skipped. + If set to true, the thread pool will block until a thread is available. instrument_metadata: type: object description: "Metadata for a measurement instrument" diff --git a/broker/redoc-static.html b/broker/redoc-static.html index 9dfe8c63..79a96aa0 100644 --- a/broker/redoc-static.html +++ b/broker/redoc-static.html @@ -582,14 +582,17 @@ This option is using non-linear least squares optimization to find unit cell and viable_cell_min_spots
required
integer <int64> >= 5
Default: 10

Minimum number of indexed spots required for a cell to be considered viable

index_ice_rings
required
boolean
Default: false

Include spots marked as ice rings in the indexing run. If dataset_settings doesn't have detect_ice_rings on, this option will have no effect on processing.

-
rotation_indexing
required
boolean
Default: false
rotation_indexing_min_angular_range_deg
required
number <float> >= 1
Default: 20
rotation_indexing_angular_stride_deg
required
number <float> >= 0
Default: 0.5

Responses

rotation_indexing
required
boolean
Default: false
rotation_indexing_min_angular_range_deg
required
number <float> >= 1
Default: 20
rotation_indexing_angular_stride_deg
required
number <float> >= 0
Default: 0.5
blocking
boolean
Default: false

Indexing in Jungfraujoch goes with a dedicated thread pool. +If set to false, the thread pool is non-blocking, i.e. if there are no threads available, image indexing will be skipped. +If set to true, the thread pool will block until a thread is available.

+

Responses

Request samples

Content type
application/json
{
  • "algorithm": "FFBIDX",
  • "fft_max_unit_cell_A": 250,
  • "fft_min_unit_cell_A": 10,
  • "fft_high_resolution_A": 2,
  • "fft_num_vectors": 16384,
  • "tolerance": 0.5,
  • "thread_count": 1,
  • "geom_refinement_algorithm": "BeamCenter",
  • "unit_cell_dist_tolerance": 0.05,
  • "viable_cell_min_spots": 10,
  • "index_ice_rings": false,
  • "rotation_indexing": false,
  • "rotation_indexing_min_angular_range_deg": 20,
  • "rotation_indexing_angular_stride_deg": 0.5
}

Response samples

Content type
application/json
{
  • "msg": "Detector in wrong state",
  • "reason": "WrongDAQState"
}

Get indexing configuration

Can be done anytime

+
http://localhost:5232/config/indexing

Request samples

Content type
application/json
{
  • "algorithm": "FFBIDX",
  • "fft_max_unit_cell_A": 250,
  • "fft_min_unit_cell_A": 10,
  • "fft_high_resolution_A": 2,
  • "fft_num_vectors": 16384,
  • "tolerance": 0.5,
  • "thread_count": 1,
  • "geom_refinement_algorithm": "BeamCenter",
  • "unit_cell_dist_tolerance": 0.05,
  • "viable_cell_min_spots": 10,
  • "index_ice_rings": false,
  • "rotation_indexing": false,
  • "rotation_indexing_min_angular_range_deg": 20,
  • "rotation_indexing_angular_stride_deg": 0.5,
  • "blocking": false
}

Response samples

Content type
application/json
{
  • "msg": "Detector in wrong state",
  • "reason": "WrongDAQState"
}

Get indexing configuration

Can be done anytime

Responses

Response samples

Content type
application/json
{
  • "algorithm": "FFBIDX",
  • "fft_max_unit_cell_A": 250,
  • "fft_min_unit_cell_A": 10,
  • "fft_high_resolution_A": 2,
  • "fft_num_vectors": 16384,
  • "tolerance": 0.5,
  • "thread_count": 1,
  • "geom_refinement_algorithm": "BeamCenter",
  • "unit_cell_dist_tolerance": 0.05,
  • "viable_cell_min_spots": 10,
  • "index_ice_rings": false,
  • "rotation_indexing": false,
  • "rotation_indexing_min_angular_range_deg": 20,
  • "rotation_indexing_angular_stride_deg": 0.5
}

Change file writer settings

This can only be done when detector is Idle, Error or Inactive states.

+
http://localhost:5232/config/indexing

Response samples

Content type
application/json
{
  • "algorithm": "FFBIDX",
  • "fft_max_unit_cell_A": 250,
  • "fft_min_unit_cell_A": 10,
  • "fft_high_resolution_A": 2,
  • "fft_num_vectors": 16384,
  • "tolerance": 0.5,
  • "thread_count": 1,
  • "geom_refinement_algorithm": "BeamCenter",
  • "unit_cell_dist_tolerance": 0.05,
  • "viable_cell_min_spots": 10,
  • "index_ice_rings": false,
  • "rotation_indexing": false,
  • "rotation_indexing_min_angular_range_deg": 20,
  • "rotation_indexing_angular_stride_deg": 0.5,
  • "blocking": false
}

Change file writer settings

This can only be done when detector is Idle, Error or Inactive states.

Request Body schema: application/json
overwrite
boolean
Default: false

Inform jfjoch_write to overwrite existing files. Otherwise files would be saved with .h5.{timestamp}.tmp suffix.

format
string (file_writer_format)
Default: "NXmxLegacy"
Enum: "NXmxOnlyData" "NXmxLegacy" "NXmxVDS" "NXmxIntegrated" "CBF" "TIFF" "NoFileWritten"

NoFileWritten - no files are written at all NXmxOnlyData - only data files are written, no master file @@ -796,7 +799,7 @@ This can only be done when detector is Idle, Error or

Request samples

Content type
application/json
{
  • "box": {
    },
  • "circle": {
    },
  • "azim": {
    }
}

Response samples

Content type
application/json
{
  • "msg": "Detector in wrong state",
  • "reason": "WrongDAQState"
}

Get general statistics

Responses

Response samples

Content type
application/json
{
  • "detector": {
    },
  • "detector_list": {
    },
  • "detector_settings": {
    },
  • "image_format_settings": {
    },
  • "instrument_metadata": {
    },
  • "file_writer_settings": {
    },
  • "data_processing_settings": {
    },
  • "measurement": {
    },
  • "broker": {
    },
  • "fpga": [
    ],
  • "calibration": [
    ],
  • "zeromq_preview": {
    },
  • "zeromq_metadata": {
    },
  • "dark_mask": {
    },
  • "pixel_mask": {
    },
  • "roi": {
    },
  • "az_int": {
    },
  • "buffer": {
    },
  • "indexing": {
    },
  • "image_pusher": {
    }
}

Get data collection statistics

Results of the last data collection

+
http://localhost:5232/statistics

Response samples

Content type
application/json
{
  • "detector": {
    },
  • "detector_list": {
    },
  • "detector_settings": {
    },
  • "image_format_settings": {
    },
  • "instrument_metadata": {
    },
  • "file_writer_settings": {
    },
  • "data_processing_settings": {
    },
  • "measurement": {
    },
  • "broker": {
    },
  • "fpga": [
    ],
  • "calibration": [
    ],
  • "zeromq_preview": {
    },
  • "zeromq_metadata": {
    },
  • "dark_mask": {
    },
  • "pixel_mask": {
    },
  • "roi": {
    },
  • "az_int": {
    },
  • "buffer": {
    },
  • "indexing": {
    },
  • "image_pusher": {
    }
}

Get data collection statistics

Results of the last data collection

Responses