// SPDX-FileCopyrightText: 2024 Filip Leonarski, Paul Scherrer Institute // SPDX-License-Identifier: GPL-3.0-only #include #include #include #include #include #include #include #include #include #include "../reader/JFJochHDF5Reader.h" #include "../common/Logger.h" #include "../common/DiffractionExperiment.h" #include "../common/PixelMask.h" #include "../common/AzimuthalIntegration.h" #include "../common/time_utc.h" #include "../common/print_license.h" #include "../image_analysis/MXAnalysisWithoutFPGA.h" #include "../image_analysis/indexing/IndexerFactory.h" #include "../writer/FileWriter.h" #include "../image_analysis/IndexAndRefine.h" #include "../receiver/JFJochReceiverPlots.h" #include "../compression/JFJochCompressor.h" #include "../image_analysis/scale_merge/FrenchWilson.h" #include "../image_analysis/WriteMmcif.h" // (actually include at top of file) void print_usage(Logger &logger) { logger.Info("Usage ./jfjoch_analysis {} "); logger.Info("Options:"); logger.Info(" -o Output file prefix (default: output)"); logger.Info(" -N Number of threads (default: 1)"); logger.Info(" -s Start image number (default: 0)"); logger.Info(" -e End image number (default: all)"); logger.Info(" -v Verbose output"); logger.Info(" -R[num] Rotation indexing (optional: min angular range deg)"); logger.Info(" -F Use FFT indexing algorithm (default: Auto)"); logger.Info(" -x No least-square beam center refinement"); logger.Info(" -d High resolution limit for spot finding (default: 1.5)"); logger.Info(" -S Space group number"); logger.Info(" -M Scale and merge (refine mosaicity) and write scaled.hkl + image.dat"); logger.Info(" -L Use log-scaling residual"); logger.Info(" -m Mosaicity refinement none|fixed|image (default: image)"); logger.Info(" -A Anomalous mode (don't merge Friedel pairs)"); } int main(int argc, char **argv) { RegisterHDF5Filter(); print_license("jfjoch_analysis"); Logger logger("jfjoch_analysis"); std::string input_file; std::string output_prefix = "output"; int nthreads = 1; int start_image = 0; int end_image = -1; // -1 indicates process until end bool verbose = false; bool rotation_indexing = false; std::optional rotation_indexing_range; bool use_fft = false; bool refine_beam_center = true; bool run_scaling = false; bool anomalous_mode = false; std::optional space_group_number; bool log_residual = false; enum class MosaicityRefinementMode { None, Fixed, Image }; MosaicityRefinementMode mosaicity_refinement_mode = MosaicityRefinementMode::Image; float d_high = 1.5; if (argc == 1) { print_usage(logger); exit(EXIT_FAILURE); } int opt; while ((opt = getopt(argc, argv, "o:N:s:e:vR::Fxd:S:MLm:A")) != -1) { switch (opt) { case 'o': output_prefix = optarg; break; case 'N': nthreads = atoi(optarg); break; case 's': start_image = atoi(optarg); break; case 'e': end_image = atoi(optarg); break; case 'v': verbose = true; break; case 'R': rotation_indexing = true; if (optarg) rotation_indexing_range = atof(optarg); break; case 'F': use_fft = true; break; case 'x': refine_beam_center = false; break; case 'd': d_high = atof(optarg); break; case 'S': space_group_number = atoi(optarg); break; case 'M': run_scaling = true; break; case 'L': log_residual = true; break; case 'A': anomalous_mode = true; break; case 'm': if (strcmp(optarg, "none") == 0) mosaicity_refinement_mode = MosaicityRefinementMode::None; else if (strcmp(optarg, "fixed") == 0) mosaicity_refinement_mode = MosaicityRefinementMode::Fixed; else if (strcmp(optarg, "image") == 0) mosaicity_refinement_mode = MosaicityRefinementMode::Image; else { logger.Error("Invalid mosaicity refinement mode: {}", optarg); print_usage(logger); exit(EXIT_FAILURE); } break; default: print_usage(logger); exit(EXIT_FAILURE); } } if (optind != argc - 1) { logger.Error("Input file not specified"); print_usage(logger); exit(EXIT_FAILURE); } input_file = argv[optind]; logger.Verbose(verbose); // Validate space group number early const gemmi::SpaceGroup *space_group = nullptr; if (space_group_number.has_value()) { space_group = gemmi::find_spacegroup_by_number(space_group_number.value()); if (!space_group) { logger.Error("Unknown space group number {}", space_group_number.value()); exit(EXIT_FAILURE); } logger.Info("Using space group {} (number {})", space_group->hm, space_group_number.value()); } // 1. Read Input File JFJochHDF5Reader reader; try { reader.ReadFile(input_file); } catch (const std::exception &e) { logger.Error("Error reading input file: {}", e.what()); exit(EXIT_FAILURE); } const auto dataset = reader.GetDataset(); if (!dataset) { logger.Error("No experiment dataset found in the input file"); exit(EXIT_FAILURE); } logger.Info("Loaded dataset from {}", input_file); // 2. Setup Experiment & Components DiffractionExperiment experiment(dataset->experiment); experiment.BitDepthImage(32).Compression(CompressionAlgorithm::BSHUF_LZ4); experiment.FilePrefix(output_prefix); experiment.Mode(DetectorMode::Standard); // Ensure full image analysis experiment.PixelSigned(true); experiment.OverwriteExistingFiles(true); experiment.PolarizationFactor(0.99); // Configure Indexing IndexingSettings indexing_settings; if (use_fft) indexing_settings.Algorithm(IndexingAlgorithmEnum::FFT); else indexing_settings.Algorithm(IndexingAlgorithmEnum::Auto); indexing_settings.RotationIndexing(rotation_indexing); if (rotation_indexing_range.has_value()) indexing_settings.RotationIndexingMinAngularRange_deg(rotation_indexing_range.value()); if (refine_beam_center) indexing_settings.GeomRefinementAlgorithm(GeomRefinementAlgorithmEnum::BeamCenter); else indexing_settings.GeomRefinementAlgorithm(GeomRefinementAlgorithmEnum::None); experiment.ImportIndexingSettings(indexing_settings); SpotFindingSettings spot_settings; spot_settings.enable = true; spot_settings.indexing = true; spot_settings.high_resolution_limit = d_high; // Initialize Analysis Components PixelMask pixel_mask = dataset->pixel_mask; // If dataset has a mask you wish to use, you might need to load it into pixel_mask here // e.g. pixel_mask.LoadUserMask(dataset->pixel_mask, ...); AzimuthalIntegration mapping(experiment, pixel_mask); IndexerThreadPool indexer_pool(experiment.GetIndexingSettings()); // Statistics collector JFJochReceiverPlots plots; plots.Setup(experiment, mapping); // 3. Setup FileWriter StartMessage start_message; experiment.FillMessage(start_message); start_message.arm_date = dataset->arm_date; // Use original arm date start_message.az_int_bin_to_q = mapping.GetBinToQ(); start_message.az_int_q_bin_count = mapping.GetQBinCount(); if (mapping.GetAzimuthalBinCount() > 1) start_message.az_int_bin_to_phi = mapping.GetBinToPhi(); start_message.pixel_mask["default"] = pixel_mask.GetMask(experiment); start_message.max_spot_count = experiment.GetMaxSpotCount(); std::unique_ptr writer; try { writer = std::make_unique(start_message); } catch (const std::exception &e) { logger.Error("Failed to initialize file writer: {}", e.what()); exit(EXIT_FAILURE); } // 4. Processing Setup int total_images_in_file = reader.GetNumberOfImages(); if (end_image < 0 || end_image > total_images_in_file) end_image = total_images_in_file; int images_to_process = end_image - start_image; if (images_to_process <= 0) { logger.Warning("No images to process (Start: {}, End: {}, Total: {})", start_image, end_image, total_images_in_file); return 0; } logger.Info("Starting analysis of {} images (range {}-{}) using {} threads", images_to_process, start_image, end_image, nthreads); std::mutex reader_mutex; std::mutex plots_mutex; // Protect shared plot/stats updates if they aren't thread-safe std::atomic processed_count = 0; std::atomic total_uncompressed_bytes = 0; std::atomic max_image_number_sent = 0; // Mimic JFJochReceiver lattice handling (IndexAndRefine handles the logic per thread, // but we need a central accumulator or use the pool's functionality if IndexAndRefine wraps it) // Here we will use per-thread IndexAndRefine which uses the shared thread pool. auto start_time = std::chrono::steady_clock::now(); IndexAndRefine indexer(experiment, &indexer_pool); auto worker = [&](int thread_id) { JFJochBitShuffleCompressor compressor(experiment.GetCompressionAlgorithm()); std::vector compressed_buffer; compressed_buffer.resize(MaxCompressedSize(experiment.GetCompressionAlgorithm(), experiment.GetPixelsNum(), experiment.GetByteDepthImage())); // Thread-local analysis resources MXAnalysisWithoutFPGA analysis(experiment, mapping, pixel_mask, indexer); std::vector decomp_buffer; AzimuthalIntegrationProfile profile(mapping); while (true) { int current_idx_offset = processed_count.fetch_add(1); int image_idx = start_image + current_idx_offset; if (image_idx >= end_image) break; // Load Image std::shared_ptr img; { std::lock_guard lock(reader_mutex); try { img = reader.LoadImage(image_idx); } catch (const std::exception &e) { logger.Error("Failed to load image {}: {}", image_idx, e.what()); continue; } } if (!img) continue; DataMessage msg; msg.image = img->ImageData().image; msg.number = img->ImageData().number; msg.error_pixel_count = img->ImageData().error_pixel_count; msg.image_collection_efficiency = img->ImageData().image_collection_efficiency; msg.storage_cell = img->ImageData().storage_cell; msg.user_data = img->ImageData().user_data; total_uncompressed_bytes += msg.image.GetUncompressedSize(); auto image_start_time = std::chrono::high_resolution_clock::now(); // Analyze try { analysis.Analyze(msg, decomp_buffer, profile, spot_settings); } catch (const std::exception &e) { logger.Error("Error analyzing image {}: {}", image_idx, e.what()); continue; } auto image_end_time = std::chrono::high_resolution_clock::now(); std::chrono::duration image_duration = image_end_time - image_start_time; auto size = compressor.Compress(compressed_buffer.data(), img->Image().data(), experiment.GetPixelsNum(), sizeof(int32_t)); msg.image = CompressedImage(compressed_buffer.data(), size, experiment.GetXPixelsNum(), experiment.GetYPixelsNum(), CompressedImageMode::Int32, experiment.GetCompressionAlgorithm()); // Mimic DataMessage stats from Receiver msg.processing_time_s = image_duration.count(); msg.original_number = msg.number; msg.run_number = experiment.GetRunNumber(); msg.run_name = experiment.GetRunName(); // msg.receiver_free_send_buf <- Not relevant for file analysis // msg.receiver_aq_dev_delay <- Not relevant for file analysis // Update Plots/Stats (Thread safe update needed) { std::lock_guard lock(plots_mutex); plots.Add(msg, profile); // Write Result writer->Write(msg); } // Update max sent tracking uint64_t current_max = max_image_number_sent.load(); while (static_cast(msg.number) > current_max) { if (max_image_number_sent.compare_exchange_weak(current_max, static_cast(msg.number))) break; } // Progress log if (current_idx_offset > 0 && current_idx_offset % 100 == 0) logger.Info("Processed {} / {} images", current_idx_offset, images_to_process); } // Finalize per-thread indexing (if any per-thread aggregation is needed, though pool handles most) // IndexAndRefine doesn't have a per-thread finalize that returns a lattice, // the main lattice determination is usually done on the aggregated results in the pool or main thread }; // Launch threads std::vector > futures; futures.reserve(nthreads); for (int i = 0; i < nthreads; ++i) { futures.push_back(std::async(std::launch::async, worker, i)); } // Wait for completion for (auto &f: futures) { f.get(); } auto end_time = std::chrono::steady_clock::now(); // 5. Finalize Statistics and Write EndMessage EndMessage end_msg; end_msg.max_image_number = max_image_number_sent; end_msg.images_collected_count = images_to_process; end_msg.images_sent_to_write_count = images_to_process; end_msg.end_date = time_UTC(std::chrono::system_clock::now()); end_msg.run_number = experiment.GetRunNumber(); end_msg.run_name = experiment.GetRunName(); // Gather statistics from plots end_msg.bkg_estimate = plots.GetBkgEstimate(); end_msg.indexing_rate = plots.GetIndexingRate(); end_msg.az_int_result["dataset"] = plots.GetAzIntProfile(); // Finalize Indexing (Global) to get rotation lattice // We create a temporary IndexAndRefine to call Finalize() which aggregates pool results const auto rotation_indexer_ret = indexer.Finalize(); if (rotation_indexer_ret.has_value()) { end_msg.rotation_lattice = rotation_indexer_ret->lattice; end_msg.rotation_lattice_type = LatticeMessage{ .centering = rotation_indexer_ret->search_result.centering, .niggli_class = rotation_indexer_ret->search_result.niggli_class, .crystal_system = rotation_indexer_ret->search_result.system }; logger.Info("Rotation Indexing found lattice"); } // --- Optional: run scaling (mosaicity refinement) on accumulated reflections --- if (run_scaling) { logger.Info("Running scaling (mosaicity refinement) ..."); ScaleMergeOptions scale_opts; scale_opts.refine_mosaicity = true; scale_opts.max_num_iterations = 500; scale_opts.max_solver_time_s = 240.0; // generous cutoff for now scale_opts.log_scaling_residual = log_residual; scale_opts.merge_friedel = !anomalous_mode; switch (mosaicity_refinement_mode) { case MosaicityRefinementMode::None: scale_opts.refine_mosaicity = false; break; case MosaicityRefinementMode::Fixed: scale_opts.refine_mosaicity = true; scale_opts.per_image_mosaicity = false; break; case MosaicityRefinementMode::Image: scale_opts.refine_mosaicity = true; scale_opts.per_image_mosaicity = true; break; } if (space_group) scale_opts.space_group = *space_group; else scale_opts.space_group = experiment.GetGemmiSpaceGroup(); auto scale_start = std::chrono::steady_clock::now(); auto scale_result = indexer.ScaleRotationData(scale_opts); auto scale_end = std::chrono::steady_clock::now(); double scale_time = std::chrono::duration(scale_end - scale_start).count(); if (scale_result) { // ... existing code ... logger.Info("Scaling completed in {:.2f} s ({} unique reflections, {} images)", scale_time, scale_result->merged.size(), scale_result->image_ids.size()); // Print resolution-shell statistics table { const auto& stats = scale_result->statistics; logger.Info(""); logger.Info(" {:>8s} {:>8s} {:>8s} {:>8s} {:>8s} {:>10s}", "d_min", "N_obs", "N_uniq", "Rmeas", "", "Complete"); logger.Info(" {:->8s} {:->8s} {:->8s} {:->8s} {:->8s} {:->10s}", "", "", "", "", "", ""); for (const auto& sh : stats.shells) { if (sh.unique_reflections == 0) continue; std::string compl_str = (sh.completeness > 0.0) ? fmt::format("{:8.1f}%", sh.completeness * 100.0) : " N/A"; logger.Info(" {:8.2f} {:8d} {:8d} {:8.3f}% {:8.1f} {:>10s}", sh.d_min, sh.total_observations, sh.unique_reflections, sh.rmeas * 100, sh.mean_i_over_sigma, compl_str); } // Overall { const auto& ov = stats.overall; logger.Info(" {:->8s} {:->8s} {:->8s} {:->8s} {:->8s} {:->10s}", "", "", "", "", "", ""); std::string compl_str = (ov.completeness > 0.0) ? fmt::format("{:8.1f}%", ov.completeness * 100.0) : " N/A"; logger.Info(" {:>8s} {:8d} {:8d} {:8.3f}% {:8.1f} {:>10s}", "Overall", ov.total_observations, ov.unique_reflections, ov.rmeas * 100, ov.mean_i_over_sigma, compl_str); } logger.Info(""); } // Write image.dat (image_id mosaicity_deg K) { const std::string img_path = output_prefix + "_image.dat"; std::ofstream img_file(img_path); if (!img_file) { logger.Error("Cannot open {} for writing", img_path); } else { img_file << "# image_id mosaicity_deg K\n"; for (size_t i = 0; i < scale_result->image_ids.size(); ++i) { img_file << scale_result->image_ids[i] << " " << scale_result->mosaicity_deg[i] << " " << scale_result->image_scale_g[i] << "\n"; } img_file.close(); logger.Info("Wrote {} image records to {}", scale_result->image_ids.size(), img_path); } } // --- French-Wilson: convert I → F --- { FrenchWilsonOptions fw_opts; fw_opts.acentric = true; // typical for MX fw_opts.num_shells = 20; auto fw = FrenchWilson(scale_result->merged, fw_opts); { { // Write scaled.hkl (h k l I sigma) const std::string hkl_path = output_prefix + "_amplitudes.hkl"; std::ofstream hkl_file(hkl_path); if (!hkl_file) { logger.Error("Cannot open {} for writing", hkl_path); } else { for (const auto &r: fw) { hkl_file << r.h << " " << r.k << " " << r.l << " " << r.F << " " << r.sigmaF << " " << r.I << " " << r.sigmaI << "\n"; } hkl_file.close(); logger.Info("Wrote {} reflections to {}", scale_result->merged.size(), hkl_path); } } MmcifMetadata cif_meta; // Unit cell — from rotation indexing result or experiment setting if (rotation_indexer_ret.has_value()) { cif_meta.unit_cell = rotation_indexer_ret->lattice.GetUnitCell(); } else if (experiment.GetUnitCell().has_value()) { cif_meta.unit_cell = experiment.GetUnitCell().value(); } // Space group if (auto sg = experiment.GetGemmiSpaceGroup(); sg.has_value()) { cif_meta.space_group_name = sg->hm; cif_meta.space_group_number = sg->number; } else if (space_group) { cif_meta.space_group_name = space_group->hm; cif_meta.space_group_number = space_group->number; } // Detector & experiment info cif_meta.detector_name = experiment.GetDetectorDescription(); cif_meta.wavelength_A = experiment.GetWavelength_A(); cif_meta.detector_distance_mm = experiment.GetDetectorDistance_mm(); cif_meta.sample_temperature_K = experiment.GetSampleTemperature_K(); cif_meta.sample_name = experiment.GetSampleName(); cif_meta.data_block_name = output_prefix; cif_meta.beamline = experiment.GetInstrumentName(); cif_meta.source = experiment.GetSourceName(); const std::string cif_path = output_prefix + "_amplitudes.cif"; try { WriteMmcifReflections(cif_path, fw, cif_meta); logger.Info("Wrote mmCIF reflections to {}", cif_path); } catch (const std::exception &e) { logger.Error("Failed to write mmCIF: {}", e.what()); } } } } else { logger.Warning("Scaling skipped — too few reflections accumulated (need >= 20)"); logger.Info("Scaling wall-clock time: {:.2f} s", scale_time); } } // Write End Message writer->WriteHDF5(end_msg); auto stats = writer->Finalize(); // 6. Report Statistics to Console double processing_time = std::chrono::duration(end_time - start_time).count(); double throughput_MBs = static_cast(total_uncompressed_bytes) / (processing_time * 1e6); double frame_rate = static_cast(images_to_process) / processing_time; logger.Info(""); logger.Info("Processing time: {:.2f} s", processing_time); logger.Info("Frame rate: {:.2f} Hz", frame_rate); logger.Info("Total throughput:{:.2f} MB/s", throughput_MBs); logger.Info("Images written: {}", stats.size()); // Print extended stats similar to Receiver if (!end_msg.indexing_rate.has_value()) { logger.Info("Indexing rate: {:.2f}%", end_msg.indexing_rate.value() * 100.0); } if (rotation_indexer_ret.has_value()) { auto vec0 = rotation_indexer_ret->lattice.Vec0(); auto vec1 = rotation_indexer_ret->lattice.Vec1(); auto vec2 = rotation_indexer_ret->lattice.Vec2(); auto uc = rotation_indexer_ret->lattice.GetUnitCell(); logger.Info("Lattice vec0: {:.3f}, {:.3f}, {:.3f}", vec0.x, vec0.y, vec0.z); logger.Info("Lattice vec1: {:.3f}, {:.3f}, {:.3f}", vec1.x, vec1.y, vec1.z); logger.Info("Lattice vec2: {:.3f}, {:.3f}, {:.3f}", vec2.x, vec2.y, vec2.z); logger.Info("Lattice: a={:.2f} b={:.2f} c={:.2f} alpha={:.2f} beta={:.2f} gamma={:.2f}", uc.a, uc.b, uc.c, uc.alpha, uc.beta, uc.gamma); } if (stats.empty()) { logger.Info("Analysis finished with warnings (no files written)"); exit(EXIT_FAILURE); } else { logger.Info("Analysis successfully finished"); exit(EXIT_SUCCESS); } }