Files
Jungfraujoch/tools/jfjoch_process.cpp
Filip Leonarski 1ab257af6c
All checks were successful
Build Packages / build:rpm (ubuntu2404_nocuda) (push) Successful in 12m8s
Build Packages / build:rpm (rocky8_nocuda) (push) Successful in 12m57s
Build Packages / build:rpm (ubuntu2204_nocuda) (push) Successful in 12m55s
Build Packages / build:rpm (rocky8_sls9) (push) Successful in 12m0s
Build Packages / build:rpm (rocky9_nocuda) (push) Successful in 13m30s
Build Packages / Generate python client (push) Successful in 20s
Build Packages / Unit tests (push) Has been skipped
Build Packages / Create release (push) Has been skipped
Build Packages / Build documentation (push) Successful in 39s
Build Packages / build:rpm (rocky8) (push) Successful in 9m23s
Build Packages / build:rpm (rocky9_sls9) (push) Successful in 10m33s
Build Packages / build:rpm (ubuntu2404) (push) Successful in 8m2s
Build Packages / build:rpm (ubuntu2204) (push) Successful in 8m42s
Build Packages / build:rpm (rocky9) (push) Successful in 9m38s
v1.0.0-rc.125 (#32)
This is an UNSTABLE release. This version adds scalign and merging. These are experimental at the moment, and should not be used for production analysis.
If things go wrong with analysis, it is better to revert to 1.0.0-rc.124.

* jfjoch_broker: Improve logic on switching on/off spot finding
* jfjoch_broker: Increase maximum spot count for FFBIDX to 65536
* jfjoch_broker: Increase default maximum unit cell for FFT to 500 A (could have performance impact, TBD)
* jfjoch_process: Add scalign and merging functionality - program is experimental at the moment and should not be used for production analysis
* jfjoch_viewer: Display partiality and reciprocal Lorentz-polarization correction for each reflection
* jfjoch_writer: Save more information about each reflection

Reviewed-on: #32
Co-authored-by: Filip Leonarski <filip.leonarski@psi.ch>
Co-committed-by: Filip Leonarski <filip.leonarski@psi.ch>
2026-02-18 16:17:21 +01:00

613 lines
25 KiB
C++

// SPDX-FileCopyrightText: 2024 Filip Leonarski, Paul Scherrer Institute <filip.leonarski@psi.ch>
// SPDX-License-Identifier: GPL-3.0-only
#include <iostream>
#include <vector>
#include <string>
#include <unistd.h>
#include <future>
#include <mutex>
#include <atomic>
#include <chrono>
#include <fstream>
#include "../reader/JFJochHDF5Reader.h"
#include "../common/Logger.h"
#include "../common/DiffractionExperiment.h"
#include "../common/PixelMask.h"
#include "../common/AzimuthalIntegration.h"
#include "../common/time_utc.h"
#include "../common/print_license.h"
#include "../image_analysis/MXAnalysisWithoutFPGA.h"
#include "../image_analysis/indexing/IndexerFactory.h"
#include "../writer/FileWriter.h"
#include "../image_analysis/IndexAndRefine.h"
#include "../receiver/JFJochReceiverPlots.h"
#include "../compression/JFJochCompressor.h"
#include "../image_analysis/scale_merge/FrenchWilson.h"
#include "../image_analysis/WriteMmcif.h" // (actually include at top of file)
void print_usage(Logger &logger) {
logger.Info("Usage ./jfjoch_analysis {<options>} <input.h5>");
logger.Info("Options:");
logger.Info(" -o<txt> Output file prefix (default: output)");
logger.Info(" -N<num> Number of threads (default: 1)");
logger.Info(" -s<num> Start image number (default: 0)");
logger.Info(" -e<num> End image number (default: all)");
logger.Info(" -v Verbose output");
logger.Info(" -R[num] Rotation indexing (optional: min angular range deg)");
logger.Info(" -F Use FFT indexing algorithm (default: Auto)");
logger.Info(" -x No least-square beam center refinement");
logger.Info(" -d<num> High resolution limit for spot finding (default: 1.5)");
logger.Info(" -S<num> Space group number");
logger.Info(" -M Scale and merge (refine mosaicity) and write scaled.hkl + image.dat");
logger.Info(" -L Use log-scaling residual");
logger.Info(" -m<txt> Mosaicity refinement none|fixed|image (default: image)");
logger.Info(" -A Anomalous mode (don't merge Friedel pairs)");
}
int main(int argc, char **argv) {
RegisterHDF5Filter();
print_license("jfjoch_analysis");
Logger logger("jfjoch_analysis");
std::string input_file;
std::string output_prefix = "output";
int nthreads = 1;
int start_image = 0;
int end_image = -1; // -1 indicates process until end
bool verbose = false;
bool rotation_indexing = false;
std::optional<float> rotation_indexing_range;
bool use_fft = false;
bool refine_beam_center = true;
bool run_scaling = false;
bool anomalous_mode = false;
std::optional<int> space_group_number;
bool log_residual = false;
enum class MosaicityRefinementMode { None, Fixed, Image };
MosaicityRefinementMode mosaicity_refinement_mode = MosaicityRefinementMode::Image;
float d_high = 1.5;
if (argc == 1) {
print_usage(logger);
exit(EXIT_FAILURE);
}
int opt;
while ((opt = getopt(argc, argv, "o:N:s:e:vR::Fxd:S:MLm:A")) != -1) {
switch (opt) {
case 'o':
output_prefix = optarg;
break;
case 'N':
nthreads = atoi(optarg);
break;
case 's':
start_image = atoi(optarg);
break;
case 'e':
end_image = atoi(optarg);
break;
case 'v':
verbose = true;
break;
case 'R':
rotation_indexing = true;
if (optarg) rotation_indexing_range = atof(optarg);
break;
case 'F':
use_fft = true;
break;
case 'x':
refine_beam_center = false;
break;
case 'd':
d_high = atof(optarg);
break;
case 'S':
space_group_number = atoi(optarg);
break;
case 'M':
run_scaling = true;
break;
case 'L':
log_residual = true;
break;
case 'A':
anomalous_mode = true;
break;
case 'm':
if (strcmp(optarg, "none") == 0)
mosaicity_refinement_mode = MosaicityRefinementMode::None;
else if (strcmp(optarg, "fixed") == 0)
mosaicity_refinement_mode = MosaicityRefinementMode::Fixed;
else if (strcmp(optarg, "image") == 0)
mosaicity_refinement_mode = MosaicityRefinementMode::Image;
else {
logger.Error("Invalid mosaicity refinement mode: {}", optarg);
print_usage(logger);
exit(EXIT_FAILURE);
}
break;
default:
print_usage(logger);
exit(EXIT_FAILURE);
}
}
if (optind != argc - 1) {
logger.Error("Input file not specified");
print_usage(logger);
exit(EXIT_FAILURE);
}
input_file = argv[optind];
logger.Verbose(verbose);
// Validate space group number early
const gemmi::SpaceGroup *space_group = nullptr;
if (space_group_number.has_value()) {
space_group = gemmi::find_spacegroup_by_number(space_group_number.value());
if (!space_group) {
logger.Error("Unknown space group number {}", space_group_number.value());
exit(EXIT_FAILURE);
}
logger.Info("Using space group {} (number {})", space_group->hm, space_group_number.value());
}
// 1. Read Input File
JFJochHDF5Reader reader;
try {
reader.ReadFile(input_file);
} catch (const std::exception &e) {
logger.Error("Error reading input file: {}", e.what());
exit(EXIT_FAILURE);
}
const auto dataset = reader.GetDataset();
if (!dataset) {
logger.Error("No experiment dataset found in the input file");
exit(EXIT_FAILURE);
}
logger.Info("Loaded dataset from {}", input_file);
// 2. Setup Experiment & Components
DiffractionExperiment experiment(dataset->experiment);
experiment.BitDepthImage(32).Compression(CompressionAlgorithm::BSHUF_LZ4);
experiment.FilePrefix(output_prefix);
experiment.Mode(DetectorMode::Standard); // Ensure full image analysis
experiment.PixelSigned(true);
experiment.OverwriteExistingFiles(true);
experiment.PolarizationFactor(0.99);
// Configure Indexing
IndexingSettings indexing_settings;
if (use_fft)
indexing_settings.Algorithm(IndexingAlgorithmEnum::FFT);
else
indexing_settings.Algorithm(IndexingAlgorithmEnum::Auto);
indexing_settings.RotationIndexing(rotation_indexing);
if (rotation_indexing_range.has_value())
indexing_settings.RotationIndexingMinAngularRange_deg(rotation_indexing_range.value());
if (refine_beam_center)
indexing_settings.GeomRefinementAlgorithm(GeomRefinementAlgorithmEnum::BeamCenter);
else
indexing_settings.GeomRefinementAlgorithm(GeomRefinementAlgorithmEnum::None);
experiment.ImportIndexingSettings(indexing_settings);
SpotFindingSettings spot_settings;
spot_settings.enable = true;
spot_settings.indexing = true;
spot_settings.high_resolution_limit = d_high;
// Initialize Analysis Components
PixelMask pixel_mask = dataset->pixel_mask;
// If dataset has a mask you wish to use, you might need to load it into pixel_mask here
// e.g. pixel_mask.LoadUserMask(dataset->pixel_mask, ...);
AzimuthalIntegration mapping(experiment, pixel_mask);
IndexerThreadPool indexer_pool(experiment.GetIndexingSettings());
// Statistics collector
JFJochReceiverPlots plots;
plots.Setup(experiment, mapping);
// 3. Setup FileWriter
StartMessage start_message;
experiment.FillMessage(start_message);
start_message.arm_date = dataset->arm_date; // Use original arm date
start_message.az_int_bin_to_q = mapping.GetBinToQ();
start_message.az_int_q_bin_count = mapping.GetQBinCount();
if (mapping.GetAzimuthalBinCount() > 1)
start_message.az_int_bin_to_phi = mapping.GetBinToPhi();
start_message.pixel_mask["default"] = pixel_mask.GetMask(experiment);
start_message.max_spot_count = experiment.GetMaxSpotCount();
std::unique_ptr<FileWriter> writer;
try {
writer = std::make_unique<FileWriter>(start_message);
} catch (const std::exception &e) {
logger.Error("Failed to initialize file writer: {}", e.what());
exit(EXIT_FAILURE);
}
// 4. Processing Setup
int total_images_in_file = reader.GetNumberOfImages();
if (end_image < 0 || end_image > total_images_in_file)
end_image = total_images_in_file;
int images_to_process = end_image - start_image;
if (images_to_process <= 0) {
logger.Warning("No images to process (Start: {}, End: {}, Total: {})", start_image, end_image,
total_images_in_file);
return 0;
}
logger.Info("Starting analysis of {} images (range {}-{}) using {} threads",
images_to_process, start_image, end_image, nthreads);
std::mutex reader_mutex;
std::mutex plots_mutex; // Protect shared plot/stats updates if they aren't thread-safe
std::atomic<int> processed_count = 0;
std::atomic<uint64_t> total_uncompressed_bytes = 0;
std::atomic<uint64_t> max_image_number_sent = 0;
// Mimic JFJochReceiver lattice handling (IndexAndRefine handles the logic per thread,
// but we need a central accumulator or use the pool's functionality if IndexAndRefine wraps it)
// Here we will use per-thread IndexAndRefine which uses the shared thread pool.
auto start_time = std::chrono::steady_clock::now();
IndexAndRefine indexer(experiment, &indexer_pool);
auto worker = [&](int thread_id) {
JFJochBitShuffleCompressor compressor(experiment.GetCompressionAlgorithm());
std::vector<uint8_t> compressed_buffer;
compressed_buffer.resize(MaxCompressedSize(experiment.GetCompressionAlgorithm(),
experiment.GetPixelsNum(),
experiment.GetByteDepthImage()));
// Thread-local analysis resources
MXAnalysisWithoutFPGA analysis(experiment, mapping, pixel_mask, indexer);
std::vector<uint8_t> decomp_buffer;
AzimuthalIntegrationProfile profile(mapping);
while (true) {
int current_idx_offset = processed_count.fetch_add(1);
int image_idx = start_image + current_idx_offset;
if (image_idx >= end_image) break;
// Load Image
std::shared_ptr<JFJochReaderImage> img; {
std::lock_guard<std::mutex> lock(reader_mutex);
try {
img = reader.LoadImage(image_idx);
} catch (const std::exception &e) {
logger.Error("Failed to load image {}: {}", image_idx, e.what());
continue;
}
}
if (!img) continue;
DataMessage msg;
msg.image = img->ImageData().image;
msg.number = img->ImageData().number;
msg.error_pixel_count = img->ImageData().error_pixel_count;
msg.image_collection_efficiency = img->ImageData().image_collection_efficiency;
msg.storage_cell = img->ImageData().storage_cell;
msg.user_data = img->ImageData().user_data;
total_uncompressed_bytes += msg.image.GetUncompressedSize();
auto image_start_time = std::chrono::high_resolution_clock::now();
// Analyze
try {
analysis.Analyze(msg, decomp_buffer, profile, spot_settings);
} catch (const std::exception &e) {
logger.Error("Error analyzing image {}: {}", image_idx, e.what());
continue;
}
auto image_end_time = std::chrono::high_resolution_clock::now();
std::chrono::duration<float> image_duration = image_end_time - image_start_time;
auto size = compressor.Compress(compressed_buffer.data(),
img->Image().data(),
experiment.GetPixelsNum(),
sizeof(int32_t));
msg.image = CompressedImage(compressed_buffer.data(),
size, experiment.GetXPixelsNum(),
experiment.GetYPixelsNum(),
CompressedImageMode::Int32,
experiment.GetCompressionAlgorithm());
// Mimic DataMessage stats from Receiver
msg.processing_time_s = image_duration.count();
msg.original_number = msg.number;
msg.run_number = experiment.GetRunNumber();
msg.run_name = experiment.GetRunName();
// msg.receiver_free_send_buf <- Not relevant for file analysis
// msg.receiver_aq_dev_delay <- Not relevant for file analysis
// Update Plots/Stats (Thread safe update needed)
{
std::lock_guard<std::mutex> lock(plots_mutex);
plots.Add(msg, profile);
// Write Result
writer->Write(msg);
}
// Update max sent tracking
uint64_t current_max = max_image_number_sent.load();
while (static_cast<uint64_t>(msg.number) > current_max) {
if (max_image_number_sent.compare_exchange_weak(current_max, static_cast<uint64_t>(msg.number)))
break;
}
// Progress log
if (current_idx_offset > 0 && current_idx_offset % 100 == 0)
logger.Info("Processed {} / {} images", current_idx_offset, images_to_process);
}
// Finalize per-thread indexing (if any per-thread aggregation is needed, though pool handles most)
// IndexAndRefine doesn't have a per-thread finalize that returns a lattice,
// the main lattice determination is usually done on the aggregated results in the pool or main thread
};
// Launch threads
std::vector<std::future<void> > futures;
futures.reserve(nthreads);
for (int i = 0; i < nthreads; ++i) {
futures.push_back(std::async(std::launch::async, worker, i));
}
// Wait for completion
for (auto &f: futures) {
f.get();
}
auto end_time = std::chrono::steady_clock::now();
// 5. Finalize Statistics and Write EndMessage
EndMessage end_msg;
end_msg.max_image_number = max_image_number_sent;
end_msg.images_collected_count = images_to_process;
end_msg.images_sent_to_write_count = images_to_process;
end_msg.end_date = time_UTC(std::chrono::system_clock::now());
end_msg.run_number = experiment.GetRunNumber();
end_msg.run_name = experiment.GetRunName();
// Gather statistics from plots
end_msg.bkg_estimate = plots.GetBkgEstimate();
end_msg.indexing_rate = plots.GetIndexingRate();
end_msg.az_int_result["dataset"] = plots.GetAzIntProfile();
// Finalize Indexing (Global) to get rotation lattice
// We create a temporary IndexAndRefine to call Finalize() which aggregates pool results
const auto rotation_indexer_ret = indexer.Finalize();
if (rotation_indexer_ret.has_value()) {
end_msg.rotation_lattice = rotation_indexer_ret->lattice;
end_msg.rotation_lattice_type = LatticeMessage{
.centering = rotation_indexer_ret->search_result.centering,
.niggli_class = rotation_indexer_ret->search_result.niggli_class,
.crystal_system = rotation_indexer_ret->search_result.system
};
logger.Info("Rotation Indexing found lattice");
}
// --- Optional: run scaling (mosaicity refinement) on accumulated reflections ---
if (run_scaling) {
logger.Info("Running scaling (mosaicity refinement) ...");
ScaleMergeOptions scale_opts;
scale_opts.refine_mosaicity = true;
scale_opts.max_num_iterations = 500;
scale_opts.max_solver_time_s = 240.0; // generous cutoff for now
scale_opts.log_scaling_residual = log_residual;
scale_opts.merge_friedel = !anomalous_mode;
switch (mosaicity_refinement_mode) {
case MosaicityRefinementMode::None:
scale_opts.refine_mosaicity = false;
break;
case MosaicityRefinementMode::Fixed:
scale_opts.refine_mosaicity = true;
scale_opts.per_image_mosaicity = false;
break;
case MosaicityRefinementMode::Image:
scale_opts.refine_mosaicity = true;
scale_opts.per_image_mosaicity = true;
break;
}
if (space_group)
scale_opts.space_group = *space_group;
else
scale_opts.space_group = experiment.GetGemmiSpaceGroup();
auto scale_start = std::chrono::steady_clock::now();
auto scale_result = indexer.ScaleRotationData(scale_opts);
auto scale_end = std::chrono::steady_clock::now();
double scale_time = std::chrono::duration<double>(scale_end - scale_start).count();
if (scale_result) {
// ... existing code ...
logger.Info("Scaling completed in {:.2f} s ({} unique reflections, {} images)",
scale_time,
scale_result->merged.size(),
scale_result->image_ids.size());
// Print resolution-shell statistics table
{
const auto& stats = scale_result->statistics;
logger.Info("");
logger.Info(" {:>8s} {:>8s} {:>8s} {:>8s} {:>8s} {:>10s}",
"d_min", "N_obs", "N_uniq", "Rmeas", "<I/sig>", "Complete");
logger.Info(" {:->8s} {:->8s} {:->8s} {:->8s} {:->8s} {:->10s}",
"", "", "", "", "", "");
for (const auto& sh : stats.shells) {
if (sh.unique_reflections == 0)
continue;
std::string compl_str = (sh.completeness > 0.0)
? fmt::format("{:8.1f}%", sh.completeness * 100.0)
: " N/A";
logger.Info(" {:8.2f} {:8d} {:8d} {:8.3f}% {:8.1f} {:>10s}",
sh.d_min, sh.total_observations, sh.unique_reflections,
sh.rmeas * 100, sh.mean_i_over_sigma, compl_str);
}
// Overall
{
const auto& ov = stats.overall;
logger.Info(" {:->8s} {:->8s} {:->8s} {:->8s} {:->8s} {:->10s}",
"", "", "", "", "", "");
std::string compl_str = (ov.completeness > 0.0)
? fmt::format("{:8.1f}%", ov.completeness * 100.0)
: " N/A";
logger.Info(" {:>8s} {:8d} {:8d} {:8.3f}% {:8.1f} {:>10s}",
"Overall", ov.total_observations, ov.unique_reflections,
ov.rmeas * 100, ov.mean_i_over_sigma, compl_str);
}
logger.Info("");
}
// Write image.dat (image_id mosaicity_deg K)
{
const std::string img_path = output_prefix + "_image.dat";
std::ofstream img_file(img_path);
if (!img_file) {
logger.Error("Cannot open {} for writing", img_path);
} else {
img_file << "# image_id mosaicity_deg K\n";
for (size_t i = 0; i < scale_result->image_ids.size(); ++i) {
img_file << scale_result->image_ids[i] << " "
<< scale_result->mosaicity_deg[i] << " "
<< scale_result->image_scale_g[i] << "\n";
}
img_file.close();
logger.Info("Wrote {} image records to {}", scale_result->image_ids.size(), img_path);
}
}
// --- French-Wilson: convert I → F ---
{
FrenchWilsonOptions fw_opts;
fw_opts.acentric = true; // typical for MX
fw_opts.num_shells = 20;
auto fw = FrenchWilson(scale_result->merged, fw_opts);
{
{
// Write scaled.hkl (h k l I sigma)
const std::string hkl_path = output_prefix + "_amplitudes.hkl";
std::ofstream hkl_file(hkl_path);
if (!hkl_file) {
logger.Error("Cannot open {} for writing", hkl_path);
} else {
for (const auto &r: fw) {
hkl_file << r.h << " " << r.k << " " << r.l << " "
<< r.F << " " << r.sigmaF << " "
<< r.I << " " << r.sigmaI
<< "\n";
}
hkl_file.close();
logger.Info("Wrote {} reflections to {}", scale_result->merged.size(), hkl_path);
}
}
MmcifMetadata cif_meta;
// Unit cell — from rotation indexing result or experiment setting
if (rotation_indexer_ret.has_value()) {
cif_meta.unit_cell = rotation_indexer_ret->lattice.GetUnitCell();
} else if (experiment.GetUnitCell().has_value()) {
cif_meta.unit_cell = experiment.GetUnitCell().value();
}
// Space group
if (auto sg = experiment.GetGemmiSpaceGroup(); sg.has_value()) {
cif_meta.space_group_name = sg->hm;
cif_meta.space_group_number = sg->number;
} else if (space_group) {
cif_meta.space_group_name = space_group->hm;
cif_meta.space_group_number = space_group->number;
}
// Detector & experiment info
cif_meta.detector_name = experiment.GetDetectorDescription();
cif_meta.wavelength_A = experiment.GetWavelength_A();
cif_meta.detector_distance_mm = experiment.GetDetectorDistance_mm();
cif_meta.sample_temperature_K = experiment.GetSampleTemperature_K();
cif_meta.sample_name = experiment.GetSampleName();
cif_meta.data_block_name = output_prefix;
cif_meta.beamline = experiment.GetInstrumentName();
cif_meta.source = experiment.GetSourceName();
const std::string cif_path = output_prefix + "_amplitudes.cif";
try {
WriteMmcifReflections(cif_path, fw, cif_meta);
logger.Info("Wrote mmCIF reflections to {}", cif_path);
} catch (const std::exception &e) {
logger.Error("Failed to write mmCIF: {}", e.what());
}
}
}
} else {
logger.Warning("Scaling skipped — too few reflections accumulated (need >= 20)");
logger.Info("Scaling wall-clock time: {:.2f} s", scale_time);
}
}
// Write End Message
writer->WriteHDF5(end_msg);
auto stats = writer->Finalize();
// 6. Report Statistics to Console
double processing_time = std::chrono::duration<double>(end_time - start_time).count();
double throughput_MBs = static_cast<double>(total_uncompressed_bytes) / (processing_time * 1e6);
double frame_rate = static_cast<double>(images_to_process) / processing_time;
logger.Info("");
logger.Info("Processing time: {:.2f} s", processing_time);
logger.Info("Frame rate: {:.2f} Hz", frame_rate);
logger.Info("Total throughput:{:.2f} MB/s", throughput_MBs);
logger.Info("Images written: {}", stats.size());
// Print extended stats similar to Receiver
if (!end_msg.indexing_rate.has_value()) {
logger.Info("Indexing rate: {:.2f}%", end_msg.indexing_rate.value() * 100.0);
}
if (rotation_indexer_ret.has_value()) {
auto uc = rotation_indexer_ret->lattice.GetUnitCell();
logger.Info("Lattice: a={:.2f} b={:.2f} c={:.2f} alpha={:.2f} beta={:.2f} gamma={:.2f}",
uc.a, uc.b, uc.c, uc.alpha, uc.beta, uc.gamma);
}
if (stats.empty()) {
logger.Info("Analysis finished with warnings (no files written)");
exit(EXIT_FAILURE);
} else {
logger.Info("Analysis successfully finished");
exit(EXIT_SUCCESS);
}
}