// SPDX-FileCopyrightText: 2026 Filip Leonarski, Paul Scherrer Institute // SPDX-License-Identifier: GPL-3.0-only #include "JFJochProcess.h" #include #include #include #include #include #include #include #include #include #include "../reader/JFJochHDF5Reader.h" #include "../common/Logger.h" #include "../common/AzimuthalIntegrationMapping.h" #include "../common/AzimuthalIntegrationProfile.h" #include "../common/CUDAWrapper.h" #include "../common/time_utc.h" #include "../writer/FileWriter.h" #include "../image_analysis/MXAnalysisWithoutFPGA.h" #include "../image_analysis/IndexAndRefine.h" #include "../image_analysis/indexing/IndexerThreadPool.h" #include "../image_analysis/azint/AzIntEngineCPU.h" #include "../image_analysis/image_preprocessing/ImagePreprocessorCPU.h" #include "../image_analysis/image_preprocessing/ImagePreprocessorBuffer.h" #include "../image_analysis/scale_merge/Merge.h" #include "../image_analysis/scale_merge/ScaleOnTheFly.h" #include "../image_analysis/scale_merge/ScalingResult.h" #include "../image_analysis/scale_merge/SearchSpaceGroup.h" #include "../image_analysis/scale_merge/TwinningAnalysis.h" #include "../image_analysis/scale_merge/Combine3D.h" #include "../image_analysis/WriteReflections.h" namespace { // Pick up to requested_images ordinals spread evenly across [0, images_to_process) for the // first pass of two-pass rotation indexing. std::vector select_equally_spaced_image_ordinals(int images_to_process, int requested_images) { std::vector ret; if (images_to_process <= 0 || requested_images <= 0) return ret; const int n = std::min(images_to_process, requested_images); if (n == 1) { ret.push_back(0); return ret; } std::set unique_ordinals; for (int i = 0; i < n; i++) unique_ordinals.insert(static_cast( std::llround(static_cast(i) * static_cast(images_to_process - 1) / static_cast(n - 1)))); ret.assign(unique_ordinals.begin(), unique_ordinals.end()); return ret; } // XDS-order scaling. The rot3d combine emits fulls with partiality == 1 (image_scale_corr == 1), // so they were only ever scaled as per-frame *partials* upstream - their per-frame scale is // entangled with the rocking-curve/partiality model. This refits a per-frame scale directly on // the complete reflections with the Unity model (no partiality term, G*Itrue - I_full), the way // XDS/DIALS scale 3D-integrated fulls. A pure post-correction: it updates image_scale_corr on the // fulls (1 -> 1/G) without re-combining. void ScaleFulls(const DiffractionExperiment &experiment, std::vector &fulls, int scaling_iter, size_t nthreads, Logger &logger) { DiffractionExperiment unity = experiment; ScalingSettings ss = unity.GetScalingSettings(); ss.SetPartialityModel(PartialityModel::Unity); unity.ImportScalingSettings(ss); for (int i = 0; i < scaling_iter; i++) { const auto reference = MergeAll(unity, fulls, false); ScaleOnTheFly(unity, reference).Scale(fulls, nthreads); } logger.Info("Scaled fulls (XDS order, Unity model)"); } // Smooth the per-frame scale G across frames before the rot3d combine. ScaleOnTheFly fits each // frame's G independently, so the few partials of one rocking event get inconsistent scales when // weight-summed; a centered moving average of log(G) over a small odd frame window removes that // jitter. Only G changed, so each reflection's image_scale_corr (proportional to 1/G) is rescaled // by g_old/g_smooth. Frames without a fitted G (n &outcomes, int window, Logger &logger) { const int n = static_cast(outcomes.size()); const int half = window / 2; std::vector g_smooth(n, NAN); for (int o = 0; o < n; o++) { double sum_log = 0.0; int count = 0; for (int j = std::max(0, o - half); j <= std::min(n - 1, o + half); j++) { const auto &g = outcomes[j].image_scale_g; if (g && std::isfinite(*g) && *g > 0.0) { sum_log += std::log(*g); count++; } } if (count > 0) g_smooth[o] = std::exp(sum_log / count); } size_t n_smoothed = 0; for (int o = 0; o < n; o++) { const auto &g_old = outcomes[o].image_scale_g; if (!g_old || !std::isfinite(*g_old) || *g_old <= 0.0 || !std::isfinite(g_smooth[o])) continue; const double factor = *g_old / g_smooth[o]; for (auto &r : outcomes[o].reflections) if (std::isfinite(r.image_scale_corr)) r.image_scale_corr = static_cast(r.image_scale_corr * factor); outcomes[o].image_scale_g = g_smooth[o]; n_smoothed++; } logger.Info("Smoothed per-frame scale G over a {}-frame window ({} of {} frames)", window, n_smoothed, n); } } JFJochProcess::JFJochProcess(JFJochHDF5Reader &reader, DiffractionExperiment experiment, PixelMask pixel_mask, ProcessConfig config) : reader_(reader), experiment_(std::move(experiment)), pixel_mask_(std::move(pixel_mask)), config_(std::move(config)) {} ProcessResult JFJochProcess::Run(JFJochProcessObserver *observer) { Logger logger("JFJochProcess"); ProcessResult result; const auto dataset = reader_.GetDataset(); if (!dataset) throw JFJochException(JFJochExceptionCategory::InputParameterInvalid, "No experiment dataset found in the input file"); if (config_.stride <= 0) throw JFJochException(JFJochExceptionCategory::InputParameterInvalid, "Image stride must be positive"); const auto total_images_in_file = static_cast(reader_.GetNumberOfImages()); int end_image = config_.end_image; if (end_image < 0 || end_image > total_images_in_file) end_image = total_images_in_file; const int start_image = config_.start_image; const int images_to_process = (end_image - start_image) / config_.stride; if (images_to_process <= 0) { logger.Warning("No images to process (start {}, end {}, stride {}, total {})", start_image, end_image, config_.stride, total_images_in_file); return result; } const bool full = (config_.mode == ProcessMode::FullAnalysis); const bool write_files = !config_.output_prefix.empty(); // Output/runtime invariants. Algorithm settings (indexing, scaling, integration, polarization, // space group, unit cell, ...) are configured on experiment_ by the caller. experiment_.BitDepthImage(32).PixelSigned(true); experiment_.Mode(DetectorMode::Standard); experiment_.OverwriteExistingFiles(true); experiment_.FilePrefix(config_.output_prefix.empty() ? "output" : config_.output_prefix); experiment_.SetFileWriterFormat(FileWriterFormat::NXmxLegacy); experiment_.ImagesPerTrigger(images_to_process); experiment_.NumTriggers(1); if (full) experiment_.Compression(CompressionAlgorithm::BSHUF_LZ4); // The pipeline indexes images 0..N-1 within this run; if we process a sub-range/strided // selection, shift the goniometer so local index i maps to the angle of original image // start+i*stride (keeping the per-image rotation wedge), otherwise rotation angles would be // wrong for any start_image != 0. if (const auto g = experiment_.GetGoniometer(); g.has_value() && (start_image != 0 || config_.stride != 1)) { const float incr = g->GetIncrement_deg(); GoniometerAxis shifted(g->GetName(), g->GetStart_deg() + incr * static_cast(start_image), incr * static_cast(config_.stride), g->GetAxis(), g->GetHelicalStep()); shifted.ScreeningWedge(g->GetScreeningWedge().value_or(incr)); experiment_.Goniometer(shifted); } AzimuthalIntegrationMapping mapping(experiment_, pixel_mask_); JFJochReceiverPlots plots; plots.Setup(experiment_, mapping); // Output file (NXmxIntegrated master that links back to the original images). StartMessage start_message; experiment_.FillMessage(start_message); start_message.arm_date = dataset->arm_date; start_message.az_int_bin_to_q = mapping.GetBinToQ(); start_message.az_int_bin_to_two_theta = mapping.GetBinToTwoTheta(); start_message.az_int_q_bin_count = mapping.GetQBinCount(); start_message.az_int_phi_bin_count = mapping.GetAzimuthalBinCount(); if (mapping.GetAzimuthalBinCount() > 1) start_message.az_int_bin_to_phi = mapping.GetBinToPhi(); start_message.pixel_mask["default"] = pixel_mask_.GetMask(experiment_); if (full) { start_message.rois = experiment_.ROI().ExportMetadata(); if (!experiment_.ROI().empty()) start_message.roi_map = experiment_.ExportROIMap(); start_message.max_spot_count = experiment_.GetMaxSpotCount(); } start_message.master_suffix = "process"; start_message.file_format = FileWriterFormat::NXmxIntegrated; start_message.write_master_file = true; start_message.write_images = false; start_message.hdf5_source_data = reader_.GetHDF5DataSource(start_image, images_to_process); std::unique_ptr writer; if (write_files) writer = std::make_unique(start_message); logger.Info("Processing {} images (range {}-{}, stride {}) using {} threads [{}]", images_to_process, start_image, end_image, config_.stride, config_.nthreads, full ? "full analysis" : "azimuthal integration"); if (observer) observer->OnPhase(full ? "Full analysis" : "Azimuthal integration"); // Full-analysis shared engines. std::unique_ptr indexer_pool; std::unique_ptr indexer; if (full) { indexer_pool = std::make_unique(experiment_.GetIndexingSettings()); indexer = std::make_unique(experiment_, indexer_pool.get()); if (!config_.reference_data.empty()) indexer->ReferenceIntensities(config_.reference_data); } const auto start_time = std::chrono::steady_clock::now(); // First pass of two-pass rotation indexing (full analysis only). if (full && config_.forced_rotation_lattice.has_value()) { indexer->ForceRotationIndexerLattice(*config_.forced_rotation_lattice); logger.Info("Rotation indexer lattice forced externally - skipping first pass"); } else if (full && config_.rotation_indexing && config_.two_pass_rotation) { if (observer) observer->OnPhase("Rotation indexing (first pass)"); const auto selected = select_equally_spaced_image_ordinals(images_to_process, config_.rotation_indexing_image_count); logger.Info("First-pass rotation indexing using {} images{}", selected.size(), config_.reuse_rotation_spots ? " and stored spots" : ""); for (const int ordinal: selected) { if (cancelled_) break; const int image_idx = start_image + ordinal * config_.stride; DataMessage msg{}; msg.number = ordinal; // index into the rotation indexer (0..images_to_process-1) msg.original_number = image_idx; try { if (config_.reuse_rotation_spots) { msg.spots = reader_.ReadSpots(image_idx); } else { auto img = reader_.GetRawImage(image_idx); if (!img) continue; MXAnalysisWithoutFPGA analysis(experiment_, mapping, pixel_mask_, *indexer); AzimuthalIntegrationProfile profile(mapping); auto first_pass = config_.spot_finding; first_pass.indexing = false; first_pass.quick_integration = false; msg.image = img->image; if (dataset->efficiency.size() > image_idx) msg.image_collection_efficiency = dataset->efficiency[image_idx]; analysis.Analyze(msg, profile, first_pass); } indexer->AddImageToRotationIndexer(msg); } catch (const std::exception &e) { logger.Warning("First-pass rotation indexing failed for image {}: {}", image_idx, e.what()); } } if (!cancelled_ && !indexer->FinalizeRotationIndexing().has_value()) throw JFJochException(JFJochExceptionCategory::InputParameterInvalid, "Two-pass rotation indexing failed"); if (!cancelled_) logger.Info("Two-pass rotation indexing found lattice"); } // Main per-image loop, spread over N worker threads pulling from a shared counter. HDF5 reads // are serialized by the global hdf5_mutex; the analysis runs in parallel. std::atomic next_ordinal = 0; std::atomic finished_count = 0; std::atomic total_uncompressed_bytes = 0; auto azint_worker = [&]() { std::vector decompression_buffer; ImagePreprocessorCPU preprocessor(experiment_, pixel_mask_); ImagePreprocessorBuffer buffer(experiment_.GetPixelsNum()); AzIntEngineCPU azint(mapping); AzimuthalIntegrationProfile profile(mapping); while (!cancelled_) { const int ordinal = next_ordinal.fetch_add(1); const int image_idx = start_image + ordinal * config_.stride; if (image_idx >= end_image) break; std::shared_ptr img; try { img = reader_.GetRawImage(image_idx); } catch (const std::exception &e) { logger.Error("Failed to load image {}: {}", image_idx, e.what()); continue; } if (!img) continue; DataMessage msg{}; msg.image = img->image; msg.number = ordinal; msg.original_number = image_idx; if (dataset->efficiency.size() > image_idx) msg.image_collection_efficiency = dataset->efficiency[image_idx]; total_uncompressed_bytes += msg.image.GetUncompressedSize(); const auto t0 = std::chrono::steady_clock::now(); try { const uint8_t *image_ptr = msg.image.GetUncompressedPtr(decompression_buffer); preprocessor.Analyze(buffer, image_ptr, msg.image.GetMode()); azint.Run(buffer, profile); } catch (const std::exception &e) { logger.Error("Error integrating image {}: {}", image_idx, e.what()); continue; } msg.azint_time_s = std::chrono::duration(std::chrono::steady_clock::now() - t0).count(); msg.processing_time_s = msg.azint_time_s; msg.az_int_profile = profile.GetResult(); msg.az_int_profile_count = profile.GetPixelCount(); msg.az_int_profile_std = profile.GetStd(); msg.bkg_estimate = profile.GetBkgEstimate(mapping.Settings()); msg.run_number = experiment_.GetRunNumber(); msg.run_name = experiment_.GetRunName(); plots.Add(msg, profile); if (writer) writer->Write(msg); if (observer) observer->OnImageProcessed(msg); const int done = finished_count.fetch_add(1) + 1; if (observer) observer->OnProgress(done, images_to_process); } }; auto full_worker = [&]() { pin_gpu(); // round-robin per worker thread; must precede engine construction MXAnalysisWithoutFPGA analysis(experiment_, mapping, pixel_mask_, *indexer); AzimuthalIntegrationProfile profile(mapping); while (!cancelled_) { const int ordinal = next_ordinal.fetch_add(1); const int image_idx = start_image + ordinal * config_.stride; if (image_idx >= end_image) break; std::shared_ptr img; try { img = reader_.GetRawImage(image_idx); } catch (const std::exception &e) { logger.Error("Failed to load image {}: {}", image_idx, e.what()); continue; } if (!img) continue; DataMessage msg{}; msg.image = img->image; msg.number = ordinal; msg.original_number = image_idx; if (dataset->efficiency.size() > image_idx) msg.image_collection_efficiency = dataset->efficiency[image_idx]; total_uncompressed_bytes += msg.image.GetUncompressedSize(); const auto t0 = std::chrono::steady_clock::now(); try { analysis.Analyze(msg, profile, config_.spot_finding); } catch (const std::exception &e) { logger.Error("Error analyzing image {}: {}", image_idx, e.what()); continue; } msg.processing_time_s = std::chrono::duration(std::chrono::steady_clock::now() - t0).count(); msg.run_number = experiment_.GetRunNumber(); msg.run_name = experiment_.GetRunName(); plots.Add(msg, profile); if (writer) writer->Write(msg); if (observer) observer->OnImageProcessed(msg); const int done = finished_count.fetch_add(1) + 1; if (observer) observer->OnProgress(done, images_to_process); } }; if (observer) observer->OnPhase("Processing images"); std::function worker = full ? std::function(full_worker) : std::function(azint_worker); std::vector > futures; futures.reserve(config_.nthreads); for (int i = 0; i < config_.nthreads; ++i) futures.push_back(std::async(std::launch::async, worker)); for (auto &f: futures) f.get(); const auto end_time = std::chrono::steady_clock::now(); result.cancelled = cancelled_; result.images_processed = finished_count.load(); result.processing_time_s = std::chrono::duration(end_time - start_time).count(); if (result.processing_time_s > 0.0) { result.frame_rate_hz = static_cast(result.images_processed) / result.processing_time_s; result.throughput_MBs = static_cast(total_uncompressed_bytes) / (result.processing_time_s * 1e6); } result.mean_processing_time = plots.GetMeanProcessingTime(); result.indexing_rate = plots.GetIndexingRate(); // End message (also written to the file). EndMessage end_msg; end_msg.max_image_number = result.images_processed; end_msg.images_collected_count = result.images_processed; end_msg.images_sent_to_write_count = result.images_processed; end_msg.end_date = time_UTC(std::chrono::system_clock::now()); end_msg.run_number = experiment_.GetRunNumber(); end_msg.run_name = experiment_.GetRunName(); end_msg.bkg_estimate = plots.GetBkgEstimate(); end_msg.az_int_result["dataset"] = plots.GetAzIntProfile(); end_msg.indexing_rate = result.indexing_rate; if (full && !cancelled_) { if (const auto rot = indexer->FinalizeRotationIndexing(); rot.has_value()) { end_msg.rotation_lattice = rot->lattice; end_msg.rotation_lattice_type = LatticeMessage{ .centering = rot->search_result.centering, .niggli_class = rot->search_result.niggli_class, .crystal_system = rot->search_result.system }; result.rotation_lattice_found = true; } result.consensus_cell = indexer->GetConsensusUnitCell(); end_msg.unit_cell = result.consensus_cell; } // Scaling and merging (full analysis only). if (full && !cancelled_ && result.indexing_rate.has_value() && result.indexing_rate > 0 && (config_.run_scaling || !config_.reference_data.empty())) { // Scaling/merging is a long post-pass; report each sub-step as a phase so the GUI progress // bar reflects what is happening instead of freezing on one label. Also time each phase // (logged on transition) so the bottlenecks are visible. auto t_phase = std::chrono::steady_clock::now(); std::string prev_phase; auto phase = [&](const std::string &p) { const auto now = std::chrono::steady_clock::now(); if (!prev_phase.empty()) logger.Info("[timing] {}: {:.2f} s", prev_phase, std::chrono::duration(now - t_phase).count()); t_phase = now; prev_phase = p; if (observer) observer->OnPhase(p); }; phase("Scaling and merging"); // Scale the images and merge. Factored so it can run twice: first in P1 to give the // space-group search a merged dataset, then again in the determined space group so the // scaling sees symmetry equivalents and the final statistics are in the right symmetry. // (With a user-fixed space group it simply runs once, already in that symmetry.) struct ScaleMergeResult { std::vector merged; MergeStatistics statistics; }; auto scale_and_merge = [&](const std::string &label) -> ScaleMergeResult { // ScaleOnTheFly self-scaling is only for the no-reference path; with a reference each // image is already scaled against it during the per-image pass, so we merge directly. if (config_.reference_data.empty()) { logger.Info("Running scaling ({}) ...", label); for (int i = 0; i < config_.scaling_iter; i++) { phase("Scaling images (" + label + ", iteration " + std::to_string(i + 1) + "/" + std::to_string(config_.scaling_iter) + ")"); auto merge_result = MergeAll(experiment_, indexer->GetIntegrationOutcome(), false); indexer->ScaleAllImages(merge_result); } } // -P rot3d: weight-sum each reflection's per-frame partials into one full before // merging, so the error model sees counting statistics (high ISa) instead of // rocking-curve slicing scatter. const bool rot3d = experiment_.GetScalingSettings().GetCombine3D(); if (rot3d && experiment_.GetScalingSettings().GetSmoothGWindow() > 0) { phase("Smoothing per-frame scale G"); SmoothImageScaleG(indexer->GetIntegrationOutcome(), experiment_.GetScalingSettings().GetSmoothGWindow(), logger); } std::vector combined; if (rot3d) { phase("Combining 3D partials"); combined = CombineRotationObservations(indexer->GetIntegrationOutcome(), experiment_, &logger, config_.observation_dump_path); } if (rot3d && experiment_.GetScalingSettings().GetScaleFulls()) { phase("Scaling fulls (XDS order)"); ScaleFulls(experiment_, combined, static_cast(config_.scaling_iter), config_.nthreads, logger); } const std::vector &merge_input = rot3d ? combined : indexer->GetIntegrationOutcome(); phase("Merging"); MergeOnTheFly merge_engine(experiment_); if (result.consensus_cell.has_value()) merge_engine.ReferenceCell(*result.consensus_cell); merge_engine.RefineErrorModel(merge_input); if (merge_engine.ErrorModelActive()) logger.Info("Error model: a={:.3f} b={:.3f} ISa={:.1f} chi2={:.2f}", merge_engine.ErrorModelA(), merge_engine.ErrorModelB(), merge_engine.ErrorModelB() > 0 ? 1.0 / merge_engine.ErrorModelB() : 0.0, merge_engine.ErrorModelChi2()); for (const auto &outcome: merge_input) merge_engine.AddImage(outcome); ScaleMergeResult out; out.merged = merge_engine.ExportReflections(); phase("Computing statistics"); out.statistics = merge_engine.MergeStats(out.merged, merge_input, config_.reference_data); result.error_model_isa = merge_engine.ErrorModelB() > 0 ? 1.0 / merge_engine.ErrorModelB() : 0.0; logger.Info("Merge complete ({} unique reflections, {})", out.merged.size(), label); return out; }; // First pass: P1 when searching, or directly the user-fixed space group. const auto initial_sg = experiment_.GetGemmiSpaceGroup(); auto sm = scale_and_merge(initial_sg ? initial_sg->short_name() : "P1"); std::ostringstream stats_text; if (!experiment_.GetGemmiSpaceGroup().has_value()) { SearchSpaceGroupOptions sg_opts; sg_opts.merge_friedel = experiment_.GetScalingSettings().GetMergeFriedel(); sg_opts.d_min_limit_A = experiment_.GetScalingSettings().GetHighResolutionLimit_A().value_or(0.0); // Constrain the search to subgroups of the lattice (metric) symmetry found by rotation // indexing. Centering is not constrained here - it is determined from the absences. if (end_msg.rotation_lattice_type.has_value()) sg_opts.lattice_system = end_msg.rotation_lattice_type->crystal_system; const auto sg_search = SearchSpaceGroup(sm.merged, sg_opts); stats_text << SearchSpaceGroupResultToText(sg_search) << "\n\n"; // Adopt the determined space group and re-scale/merge in it, so scaling uses symmetry // equivalents and the statistics come out in the right symmetry. P1 stands when nothing // is determined. if (sg_search.best_space_group.has_value()) { const auto &sg = *sg_search.best_space_group; logger.Info("Adopting space group {} (number {})", sg.short_name(), sg.number); experiment_.SpaceGroupNumber(sg.number); end_msg.space_group_number = sg.number; result.space_group_number = sg.number; phase("Re-scaling in space group " + sg.short_name()); sm = scale_and_merge(sg.short_name()); } } const auto twin_sg_number = experiment_.GetSpaceGroupNumber(); const gemmi::SpaceGroup *twin_sg = twin_sg_number ? gemmi::find_spacegroup_by_number(twin_sg_number.value()) : nullptr; result.twinning = AnalyzeTwinning(sm.merged, twin_sg); stats_text << TwinningAnalysisToText(result.twinning) << "\n"; stats_text << sm.statistics; result.merge_statistics_text = stats_text.str(); result.has_merge_statistics = true; result.merge_statistics = sm.statistics; result.has_reference = !config_.reference_data.empty(); if (result.consensus_cell && write_files) { phase("Writing reflections"); WriteReflections(sm.merged, *result.consensus_cell, experiment_, config_.output_prefix); // Per-image scaling table (G, B-factor, mosaicity, wedge, CC) for inspection / XDS // comparison. The offline self-scaling result is otherwise not exposed (process.h5's // per-image arrays are only filled on the online per-image path). Sourced from the // partials, which carry the first-pass per-image scale. ScalingResult(indexer->GetIntegrationOutcome()).SaveToFile(config_.output_prefix); } phase(""); // flush the last phase's timing } if (writer) { writer->WriteHDF5(end_msg); writer->Finalize(); result.written_master_path = config_.output_prefix + "_process.h5"; } if (observer) observer->OnPhase(cancelled_ ? "Cancelled" : "Done"); logger.Info("{} {} images in {:.2f} s ({:.2f} Hz)", cancelled_ ? "Cancelled after" : "Processed", result.images_processed, result.processing_time_s, result.frame_rate_hz); return result; }