From bb89f2b6412bcd062d88e7570cff338f39b6e0de Mon Sep 17 00:00:00 2001 From: Filip Leonarski Date: Sun, 11 Jan 2026 15:06:27 +0100 Subject: [PATCH] jfjoch_process: First iteration --- tools/CMakeLists.txt | 4 + tools/jfjoch_process.cpp | 328 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 332 insertions(+) create mode 100644 tools/jfjoch_process.cpp diff --git a/tools/CMakeLists.txt b/tools/CMakeLists.txt index eeb04752..424ec962 100644 --- a/tools/CMakeLists.txt +++ b/tools/CMakeLists.txt @@ -43,4 +43,8 @@ ADD_EXECUTABLE(jfjoch_extract_hkl jfjoch_extract_hkl.cpp) TARGET_LINK_LIBRARIES(jfjoch_extract_hkl JFJochReader) INSTALL(TARGETS jfjoch_extract_hkl RUNTIME COMPONENT viewer) +ADD_EXECUTABLE(jfjoch_process jfjoch_process.cpp) +TARGET_LINK_LIBRARIES(jfjoch_process JFJochReader JFJochImageAnalysis JFJochWriter JFJochReceiver) +INSTALL(TARGETS jfjoch_process RUNTIME COMPONENT viewer) + ADD_SUBDIRECTORY(xbflash.qspi) diff --git a/tools/jfjoch_process.cpp b/tools/jfjoch_process.cpp new file mode 100644 index 00000000..0b891fd3 --- /dev/null +++ b/tools/jfjoch_process.cpp @@ -0,0 +1,328 @@ +// SPDX-FileCopyrightText: 2024 Filip Leonarski, Paul Scherrer Institute +// SPDX-License-Identifier: GPL-3.0-only + +#include +#include +#include +#include +#include +#include +#include +#include + +#include "../reader/JFJochHDF5Reader.h" +#include "../common/Logger.h" +#include "../common/DiffractionExperiment.h" +#include "../common/PixelMask.h" +#include "../common/AzimuthalIntegration.h" +#include "../common/time_utc.h" +#include "../common/print_license.h" +#include "../image_analysis/MXAnalysisWithoutFPGA.h" +#include "../image_analysis/indexing/IndexerFactory.h" +#include "../writer/FileWriter.h" +#include "../image_analysis/IndexAndRefine.h" +#include "../receiver/JFJochReceiverPlots.h" + +void print_usage(Logger &logger) { + logger.Info("Usage ./jfjoch_analysis {} "); + logger.Info("Options:"); + logger.Info(" -o Output file prefix (default: output)"); + logger.Info(" -N Number of threads (default: 1)"); + logger.Info(" -s Start image number (default: 0)"); + logger.Info(" -e End image number (default: all)"); + logger.Info(" -v Verbose output"); +} + +int main(int argc, char **argv) { + print_license("jfjoch_analysis"); + + Logger logger("jfjoch_analysis"); + + std::string input_file; + std::string output_prefix = "output"; + int nthreads = 1; + int start_image = 0; + int end_image = -1; // -1 indicates process until end + bool verbose = false; + + if (argc == 1) { + print_usage(logger); + exit(EXIT_FAILURE); + } + + int opt; + while ((opt = getopt(argc, argv, "o:N:s:e:v")) != -1) { + switch (opt) { + case 'o': + output_prefix = optarg; + break; + case 'N': + nthreads = atoi(optarg); + break; + case 's': + start_image = atoi(optarg); + break; + case 'e': + end_image = atoi(optarg); + break; + case 'v': + verbose = true; + break; + default: + print_usage(logger); + exit(EXIT_FAILURE); + } + } + + if (optind != argc - 1) { + logger.Error("Input file not specified"); + print_usage(logger); + exit(EXIT_FAILURE); + } + + input_file = argv[optind]; + logger.Verbose(verbose); + + // 1. Read Input File + JFJochHDF5Reader reader; + try { + reader.ReadFile(input_file); + } catch (const std::exception &e) { + logger.Error("Error reading input file: {}", e.what()); + exit(EXIT_FAILURE); + } + + const auto dataset = reader.GetDataset(); + if (!dataset) { + logger.Error("No experiment dataset found in the input file"); + exit(EXIT_FAILURE); + } + + logger.Info("Loaded dataset from {}", input_file); + + // 2. Setup Experiment & Components + DiffractionExperiment experiment(dataset->experiment); + experiment.FilePrefix(output_prefix); + experiment.Mode(DetectorMode::Standard); // Ensure full image analysis + + // Configure Indexing + IndexingSettings indexing_settings; + indexing_settings.Algorithm(IndexingAlgorithmEnum::FFT); + indexing_settings.RotationIndexing(true); + indexing_settings.GeomRefinementAlgorithm(GeomRefinementAlgorithmEnum::BeamCenter); + experiment.ImportIndexingSettings(indexing_settings); + + SpotFindingSettings spot_settings; + spot_settings.enable = true; + spot_settings.indexing = true; + + // Initialize Analysis Components + // Note: dataset->pixel_mask might need conversion depending on type, assuming it fits constructor + PixelMask pixel_mask(experiment); + // If dataset has a mask you wish to use, you might need to load it into pixel_mask here + // e.g. pixel_mask.LoadUserMask(dataset->pixel_mask, ...); + + AzimuthalIntegration mapping(experiment, pixel_mask); + IndexerThreadPool indexer_pool(experiment.GetIndexingSettings()); + + // Statistics collector + JFJochReceiverPlots plots; + plots.Setup(experiment, mapping); + + // 3. Setup FileWriter + StartMessage start_message; + experiment.FillMessage(start_message); + start_message.arm_date = dataset->arm_date; // Use original arm date + start_message.az_int_bin_to_q = mapping.GetBinToQ(); + start_message.az_int_q_bin_count = mapping.GetQBinCount(); + + std::unique_ptr writer; + try { + writer = std::make_unique(start_message); + } catch (const std::exception &e) { + logger.Error("Failed to initialize file writer: {}", e.what()); + exit(EXIT_FAILURE); + } + + // 4. Processing Setup + int total_images_in_file = reader.GetNumberOfImages(); + if (end_image < 0 || end_image > total_images_in_file) + end_image = total_images_in_file; + + int images_to_process = end_image - start_image; + + if (images_to_process <= 0) { + logger.Warning("No images to process (Start: {}, End: {}, Total: {})", start_image, end_image, total_images_in_file); + return 0; + } + + logger.Info("Starting analysis of {} images (range {}-{}) using {} threads", + images_to_process, start_image, end_image, nthreads); + + std::mutex reader_mutex; + std::mutex plots_mutex; // Protect shared plot/stats updates if they aren't thread-safe + + std::atomic processed_count = 0; + std::atomic total_uncompressed_bytes = 0; + std::atomic max_image_number_sent = 0; + + // Mimic JFJochReceiver lattice handling (IndexAndRefine handles the logic per thread, + // but we need a central accumulator or use the pool's functionality if IndexAndRefine wraps it) + // Here we will use per-thread IndexAndRefine which uses the shared thread pool. + + auto start_time = std::chrono::steady_clock::now(); + + auto worker = [&](int thread_id) { + // Thread-local analysis resources + IndexAndRefine indexer(experiment, &indexer_pool); + MXAnalysisWithoutFPGA analysis(experiment, mapping, pixel_mask, indexer); + + std::vector decomp_buffer; + AzimuthalIntegrationProfile profile(mapping); + + while (true) { + int current_idx_offset = processed_count.fetch_add(1); + int image_idx = start_image + current_idx_offset; + + if (image_idx >= end_image) break; + + // Load Image + std::shared_ptr img; + { + std::lock_guard lock(reader_mutex); + try { + img = reader.LoadImage(image_idx); + } catch (const std::exception& e) { + logger.Error("Failed to load image {}: {}", image_idx, e.what()); + continue; + } + } + + if (!img) continue; + + DataMessage msg = img->ImageData(); + total_uncompressed_bytes += msg.image.GetUncompressedSize(); + + auto image_start_time = std::chrono::high_resolution_clock::now(); + + // Analyze + try { + analysis.Analyze(msg, decomp_buffer, profile, spot_settings); + } catch (const std::exception& e) { + logger.Error("Error analyzing image {}: {}", image_idx, e.what()); + continue; + } + + auto image_end_time = std::chrono::high_resolution_clock::now(); + std::chrono::duration image_duration = image_end_time - image_start_time; + + // Mimic DataMessage stats from Receiver + msg.processing_time_s = image_duration.count(); + msg.original_number = msg.number; + msg.run_number = experiment.GetRunNumber(); + msg.run_name = experiment.GetRunName(); + // msg.receiver_free_send_buf <- Not relevant for file analysis + // msg.receiver_aq_dev_delay <- Not relevant for file analysis + + // Update Plots/Stats (Thread safe update needed) + { + std::lock_guard lock(plots_mutex); + plots.Add(msg, profile); + } + + // Write Result + writer->Write(msg); + + // Update max sent tracking + uint64_t current_max = max_image_number_sent.load(); + while (static_cast(msg.number) > current_max) { + if (max_image_number_sent.compare_exchange_weak(current_max, static_cast(msg.number))) + break; + } + + // Progress log + if (current_idx_offset > 0 && current_idx_offset % 100 == 0) + logger.Info("Processed {} / {} images", current_idx_offset, images_to_process); + } + + // Finalize per-thread indexing (if any per-thread aggregation is needed, though pool handles most) + // IndexAndRefine doesn't have a per-thread finalize that returns a lattice, + // the main lattice determination is usually done on the aggregated results in the pool or main thread + }; + + // Launch threads + std::vector> futures; + futures.reserve(nthreads); + for (int i = 0; i < nthreads; ++i) { + futures.push_back(std::async(std::launch::async, worker, i)); + } + + // Wait for completion + for (auto &f : futures) { + f.get(); + } + + auto end_time = std::chrono::steady_clock::now(); + + // 5. Finalize Statistics and Write EndMessage + EndMessage end_msg; + end_msg.max_image_number = max_image_number_sent; + end_msg.images_collected_count = images_to_process; + end_msg.images_sent_to_write_count = images_to_process; + end_msg.end_date = time_UTC(std::chrono::system_clock::now()); + end_msg.run_number = experiment.GetRunNumber(); + end_msg.run_name = experiment.GetRunName(); + + // Gather statistics from plots + end_msg.bkg_estimate = plots.GetBkgEstimate(); + end_msg.indexing_rate = plots.GetIndexingRate(); + end_msg.az_int_result["dataset"] = plots.GetAzIntProfile(); + + // Finalize Indexing (Global) to get rotation lattice + // We create a temporary IndexAndRefine to call Finalize() which aggregates pool results + IndexAndRefine global_indexer(experiment, &indexer_pool); + const auto rotation_indexer_ret = global_indexer.Finalize(); + + if (rotation_indexer_ret.has_value()) { + end_msg.rotation_lattice = rotation_indexer_ret->lattice; + end_msg.rotation_lattice_type = LatticeMessage{ + .centering = rotation_indexer_ret->search_result.centering, + .niggli_class = rotation_indexer_ret->search_result.niggli_class, + .crystal_system = rotation_indexer_ret->search_result.system + }; + logger.Info("Rotation Indexing found lattice"); + } + + // Write End Message + writer->WriteHDF5(end_msg); + auto stats = writer->Finalize(); + + // 6. Report Statistics to Console + double processing_time = std::chrono::duration(end_time - start_time).count(); + double throughput_MBs = static_cast(total_uncompressed_bytes) / (processing_time * 1e6); + double frame_rate = static_cast(images_to_process) / processing_time; + + logger.Info(""); + logger.Info("Processing time: {:.2f} s", processing_time); + logger.Info("Frame rate: {:.2f} Hz", frame_rate); + logger.Info("Total throughput:{:.2f} MB/s", throughput_MBs); + logger.Info("Images written: {}", stats.size()); + + // Print extended stats similar to Receiver + if (!end_msg.indexing_rate.has_value()) { + logger.Info("Indexing rate: {:.2f}%", end_msg.indexing_rate.value() * 100.0); + } + if (rotation_indexer_ret.has_value()) { + auto uc = rotation_indexer_ret->lattice.GetUnitCell(); + logger.Info("Lattice: a={:.2f} b={:.2f} c={:.2f} alpha={:.2f} beta={:.2f} gamma={:.2f}", + uc.a, uc.b, uc.c, uc.alpha, uc.beta, uc.gamma); + } + + if (stats.empty()) { + logger.Info("Analysis finished with warnings (no files written)"); + exit(EXIT_FAILURE); + } else { + logger.Info("Analysis successfully finished"); + exit(EXIT_SUCCESS); + } +}