Files
Jungfraujoch/tools/jfjoch_process.cpp

511 lines
20 KiB
C++

// SPDX-FileCopyrightText: 2024 Filip Leonarski, Paul Scherrer Institute <filip.leonarski@psi.ch>
// SPDX-License-Identifier: GPL-3.0-only
#include <iostream>
#include <vector>
#include <string>
#include <unistd.h>
#include <future>
#include <mutex>
#include <atomic>
#include <chrono>
#include <fstream>
#include "../reader/JFJochHDF5Reader.h"
#include "../common/Logger.h"
#include "../common/DiffractionExperiment.h"
#include "../common/PixelMask.h"
#include "../common/AzimuthalIntegration.h"
#include "../common/time_utc.h"
#include "../common/print_license.h"
#include "../image_analysis/MXAnalysisWithoutFPGA.h"
#include "../image_analysis/indexing/IndexerFactory.h"
#include "../writer/FileWriter.h"
#include "../image_analysis/IndexAndRefine.h"
#include "../receiver/JFJochReceiverPlots.h"
#include "../compression/JFJochCompressor.h"
#include "../image_analysis/scale_merge/FrenchWilson.h"
void print_usage(Logger &logger) {
logger.Info("Usage ./jfjoch_analysis {<options>} <input.h5>");
logger.Info("Options:");
logger.Info(" -o<txt> Output file prefix (default: output)");
logger.Info(" -N<num> Number of threads (default: 1)");
logger.Info(" -s<num> Start image number (default: 0)");
logger.Info(" -e<num> End image number (default: all)");
logger.Info(" -v Verbose output");
logger.Info(" -R[num] Rotation indexing (optional: min angular range deg)");
logger.Info(" -F Use FFT indexing algorithm (default: Auto)");
logger.Info(" -x No least-square beam center refinement");
logger.Info(" -d<num> High resolution limit for spot finding (default: 1.5)");
logger.Info(" -S<num> Space group number");
logger.Info(" -M Scale and merge (refine mosaicity) and write scaled.hkl + image.dat");
}
int main(int argc, char **argv) {
RegisterHDF5Filter();
print_license("jfjoch_analysis");
Logger logger("jfjoch_analysis");
std::string input_file;
std::string output_prefix = "output";
int nthreads = 1;
int start_image = 0;
int end_image = -1; // -1 indicates process until end
bool verbose = false;
bool rotation_indexing = false;
std::optional<float> rotation_indexing_range;
bool use_fft = false;
bool refine_beam_center = true;
bool run_scaling = false;
std::optional<int> space_group_number;
float d_high = 1.5;
if (argc == 1) {
print_usage(logger);
exit(EXIT_FAILURE);
}
int opt;
while ((opt = getopt(argc, argv, "o:N:s:e:vR::Fxd:S:M")) != -1) {
switch (opt) {
case 'o':
output_prefix = optarg;
break;
case 'N':
nthreads = atoi(optarg);
break;
case 's':
start_image = atoi(optarg);
break;
case 'e':
end_image = atoi(optarg);
break;
case 'v':
verbose = true;
break;
case 'R':
rotation_indexing = true;
if (optarg) rotation_indexing_range = atof(optarg);
break;
case 'F':
use_fft = true;
break;
case 'x':
refine_beam_center = false;
break;
case 'd':
d_high = atof(optarg);
break;
case 'S':
space_group_number = atoi(optarg);
break;
case 'M':
run_scaling = true;
break;
default:
print_usage(logger);
exit(EXIT_FAILURE);
}
}
if (optind != argc - 1) {
logger.Error("Input file not specified");
print_usage(logger);
exit(EXIT_FAILURE);
}
input_file = argv[optind];
logger.Verbose(verbose);
// Validate space group number early
const gemmi::SpaceGroup* space_group = nullptr;
if (space_group_number.has_value()) {
space_group = gemmi::find_spacegroup_by_number(space_group_number.value());
if (!space_group) {
logger.Error("Unknown space group number {}", space_group_number.value());
exit(EXIT_FAILURE);
}
logger.Info("Using space group {} (number {})", space_group->hm, space_group_number.value());
}
// 1. Read Input File
JFJochHDF5Reader reader;
try {
reader.ReadFile(input_file);
} catch (const std::exception &e) {
logger.Error("Error reading input file: {}", e.what());
exit(EXIT_FAILURE);
}
const auto dataset = reader.GetDataset();
if (!dataset) {
logger.Error("No experiment dataset found in the input file");
exit(EXIT_FAILURE);
}
logger.Info("Loaded dataset from {}", input_file);
// 2. Setup Experiment & Components
DiffractionExperiment experiment(dataset->experiment);
experiment.BitDepthImage(32).Compression(CompressionAlgorithm::BSHUF_LZ4);
experiment.FilePrefix(output_prefix);
experiment.Mode(DetectorMode::Standard); // Ensure full image analysis
experiment.PixelSigned(true);
experiment.OverwriteExistingFiles(true);
// Configure Indexing
IndexingSettings indexing_settings;
if (use_fft)
indexing_settings.Algorithm(IndexingAlgorithmEnum::FFT);
else
indexing_settings.Algorithm(IndexingAlgorithmEnum::Auto);
indexing_settings.RotationIndexing(rotation_indexing);
if (rotation_indexing_range.has_value())
indexing_settings.RotationIndexingMinAngularRange_deg(rotation_indexing_range.value());
if (refine_beam_center)
indexing_settings.GeomRefinementAlgorithm(GeomRefinementAlgorithmEnum::BeamCenter);
else
indexing_settings.GeomRefinementAlgorithm(GeomRefinementAlgorithmEnum::None);
experiment.ImportIndexingSettings(indexing_settings);
SpotFindingSettings spot_settings;
spot_settings.enable = true;
spot_settings.indexing = true;
spot_settings.high_resolution_limit = d_high;
// Initialize Analysis Components
PixelMask pixel_mask = dataset->pixel_mask;
// If dataset has a mask you wish to use, you might need to load it into pixel_mask here
// e.g. pixel_mask.LoadUserMask(dataset->pixel_mask, ...);
AzimuthalIntegration mapping(experiment, pixel_mask);
IndexerThreadPool indexer_pool(experiment.GetIndexingSettings());
// Statistics collector
JFJochReceiverPlots plots;
plots.Setup(experiment, mapping);
// 3. Setup FileWriter
StartMessage start_message;
experiment.FillMessage(start_message);
start_message.arm_date = dataset->arm_date; // Use original arm date
start_message.az_int_bin_to_q = mapping.GetBinToQ();
start_message.az_int_q_bin_count = mapping.GetQBinCount();
if (mapping.GetAzimuthalBinCount() > 1)
start_message.az_int_bin_to_phi = mapping.GetBinToPhi();
start_message.pixel_mask["default"] = pixel_mask.GetMask(experiment);
start_message.max_spot_count = experiment.GetMaxSpotCount();
std::unique_ptr<FileWriter> writer;
try {
writer = std::make_unique<FileWriter>(start_message);
} catch (const std::exception &e) {
logger.Error("Failed to initialize file writer: {}", e.what());
exit(EXIT_FAILURE);
}
// 4. Processing Setup
int total_images_in_file = reader.GetNumberOfImages();
if (end_image < 0 || end_image > total_images_in_file)
end_image = total_images_in_file;
int images_to_process = end_image - start_image;
if (images_to_process <= 0) {
logger.Warning("No images to process (Start: {}, End: {}, Total: {})", start_image, end_image, total_images_in_file);
return 0;
}
logger.Info("Starting analysis of {} images (range {}-{}) using {} threads",
images_to_process, start_image, end_image, nthreads);
std::mutex reader_mutex;
std::mutex plots_mutex; // Protect shared plot/stats updates if they aren't thread-safe
std::atomic<int> processed_count = 0;
std::atomic<uint64_t> total_uncompressed_bytes = 0;
std::atomic<uint64_t> max_image_number_sent = 0;
// Mimic JFJochReceiver lattice handling (IndexAndRefine handles the logic per thread,
// but we need a central accumulator or use the pool's functionality if IndexAndRefine wraps it)
// Here we will use per-thread IndexAndRefine which uses the shared thread pool.
auto start_time = std::chrono::steady_clock::now();
IndexAndRefine indexer(experiment, &indexer_pool);
auto worker = [&](int thread_id) {
JFJochBitShuffleCompressor compressor(experiment.GetCompressionAlgorithm());
std::vector<uint8_t> compressed_buffer;
compressed_buffer.resize(MaxCompressedSize(experiment.GetCompressionAlgorithm(),
experiment.GetPixelsNum(),
experiment.GetByteDepthImage()));
// Thread-local analysis resources
MXAnalysisWithoutFPGA analysis(experiment, mapping, pixel_mask, indexer);
std::vector<uint8_t> decomp_buffer;
AzimuthalIntegrationProfile profile(mapping);
while (true) {
int current_idx_offset = processed_count.fetch_add(1);
int image_idx = start_image + current_idx_offset;
if (image_idx >= end_image) break;
// Load Image
std::shared_ptr<JFJochReaderImage> img;
{
std::lock_guard<std::mutex> lock(reader_mutex);
try {
img = reader.LoadImage(image_idx);
} catch (const std::exception& e) {
logger.Error("Failed to load image {}: {}", image_idx, e.what());
continue;
}
}
if (!img) continue;
DataMessage msg;
msg.image = img->ImageData().image;
msg.number = img->ImageData().number;
msg.error_pixel_count = img->ImageData().error_pixel_count;
msg.image_collection_efficiency = img->ImageData().image_collection_efficiency;
msg.storage_cell = img->ImageData().storage_cell;
msg.user_data = img->ImageData().user_data;
total_uncompressed_bytes += msg.image.GetUncompressedSize();
auto image_start_time = std::chrono::high_resolution_clock::now();
// Analyze
try {
analysis.Analyze(msg, decomp_buffer, profile, spot_settings);
} catch (const std::exception& e) {
logger.Error("Error analyzing image {}: {}", image_idx, e.what());
continue;
}
auto image_end_time = std::chrono::high_resolution_clock::now();
std::chrono::duration<float> image_duration = image_end_time - image_start_time;
auto size = compressor.Compress(compressed_buffer.data(),
img->Image().data(),
experiment.GetPixelsNum(),
sizeof(int32_t));
msg.image = CompressedImage(compressed_buffer.data(),
size, experiment.GetXPixelsNum(),
experiment.GetYPixelsNum(),
CompressedImageMode::Int32,
experiment.GetCompressionAlgorithm());
// Mimic DataMessage stats from Receiver
msg.processing_time_s = image_duration.count();
msg.original_number = msg.number;
msg.run_number = experiment.GetRunNumber();
msg.run_name = experiment.GetRunName();
// msg.receiver_free_send_buf <- Not relevant for file analysis
// msg.receiver_aq_dev_delay <- Not relevant for file analysis
// Update Plots/Stats (Thread safe update needed)
{
std::lock_guard<std::mutex> lock(plots_mutex);
plots.Add(msg, profile);
// Write Result
writer->Write(msg);
}
// Update max sent tracking
uint64_t current_max = max_image_number_sent.load();
while (static_cast<uint64_t>(msg.number) > current_max) {
if (max_image_number_sent.compare_exchange_weak(current_max, static_cast<uint64_t>(msg.number)))
break;
}
// Progress log
if (current_idx_offset > 0 && current_idx_offset % 100 == 0)
logger.Info("Processed {} / {} images", current_idx_offset, images_to_process);
}
// Finalize per-thread indexing (if any per-thread aggregation is needed, though pool handles most)
// IndexAndRefine doesn't have a per-thread finalize that returns a lattice,
// the main lattice determination is usually done on the aggregated results in the pool or main thread
};
// Launch threads
std::vector<std::future<void>> futures;
futures.reserve(nthreads);
for (int i = 0; i < nthreads; ++i) {
futures.push_back(std::async(std::launch::async, worker, i));
}
// Wait for completion
for (auto &f : futures) {
f.get();
}
auto end_time = std::chrono::steady_clock::now();
// 5. Finalize Statistics and Write EndMessage
EndMessage end_msg;
end_msg.max_image_number = max_image_number_sent;
end_msg.images_collected_count = images_to_process;
end_msg.images_sent_to_write_count = images_to_process;
end_msg.end_date = time_UTC(std::chrono::system_clock::now());
end_msg.run_number = experiment.GetRunNumber();
end_msg.run_name = experiment.GetRunName();
// Gather statistics from plots
end_msg.bkg_estimate = plots.GetBkgEstimate();
end_msg.indexing_rate = plots.GetIndexingRate();
end_msg.az_int_result["dataset"] = plots.GetAzIntProfile();
// Finalize Indexing (Global) to get rotation lattice
// We create a temporary IndexAndRefine to call Finalize() which aggregates pool results
IndexAndRefine global_indexer(experiment, &indexer_pool);
const auto rotation_indexer_ret = global_indexer.Finalize();
if (rotation_indexer_ret.has_value()) {
end_msg.rotation_lattice = rotation_indexer_ret->lattice;
end_msg.rotation_lattice_type = LatticeMessage{
.centering = rotation_indexer_ret->search_result.centering,
.niggli_class = rotation_indexer_ret->search_result.niggli_class,
.crystal_system = rotation_indexer_ret->search_result.system
};
logger.Info("Rotation Indexing found lattice");
}
// --- Optional: run scaling (mosaicity refinement) on accumulated reflections ---
if (run_scaling) {
logger.Info("Running scaling (mosaicity refinement) ...");
ScaleMergeOptions scale_opts;
scale_opts.refine_mosaicity = true;
scale_opts.max_num_iterations = 500;
scale_opts.max_solver_time_s = 240.0; // generous cutoff for now
if (space_group)
scale_opts.space_group = *space_group;
else
scale_opts.space_group = experiment.GetGemmiSpaceGroup();
auto scale_start = std::chrono::steady_clock::now();
auto scale_result = indexer.ScaleRotationData(scale_opts);
auto scale_end = std::chrono::steady_clock::now();
double scale_time = std::chrono::duration<double>(scale_end - scale_start).count();
if (scale_result) {
logger.Info("Scaling completed in {:.2f} s (GoF² = {:.4f}, {} unique reflections, {} images)",
scale_time, scale_result->gof2,
scale_result->merged.size(), scale_result->image_ids.size());
// Write scaled.hkl (h k l I sigma)
{
const std::string hkl_path = output_prefix + "_scaled.hkl";
std::ofstream hkl_file(hkl_path);
if (!hkl_file) {
logger.Error("Cannot open {} for writing", hkl_path);
} else {
hkl_file << "# h k l I sigma\n";
for (const auto& r : scale_result->merged) {
hkl_file << r.h << " " << r.k << " " << r.l << " "
<< r.I << " " << r.sigma << "\n";
}
hkl_file.close();
logger.Info("Wrote {} reflections to {}", scale_result->merged.size(), hkl_path);
}
}
// Write image.dat (image_id mosaicity_deg K)
{
const std::string img_path = output_prefix + "_image.dat";
std::ofstream img_file(img_path);
if (!img_file) {
logger.Error("Cannot open {} for writing", img_path);
} else {
img_file << "# image_id mosaicity_deg K\n";
for (size_t i = 0; i < scale_result->image_ids.size(); ++i) {
img_file << scale_result->image_ids[i] << " "
<< scale_result->mosaicity_deg[i] << " "
<< scale_result->image_scale_g[i] << "\n";
}
img_file.close();
logger.Info("Wrote {} image records to {}", scale_result->image_ids.size(), img_path);
}
}
// --- French-Wilson: convert I → F ---
{
FrenchWilsonOptions fw_opts;
fw_opts.acentric = true; // typical for MX
fw_opts.num_shells = 20;
auto fw = FrenchWilson(scale_result->merged, fw_opts);
const std::string fw_path = output_prefix + "_amplitudes.hkl";
std::ofstream fw_file(fw_path);
if (!fw_file) {
logger.Error("Cannot open {} for writing", fw_path);
} else {
fw_file << "# h k l F sigmaF I_fw sigmaI\n";
for (const auto& r : fw) {
fw_file << r.h << " " << r.k << " " << r.l << " "
<< r.F << " " << r.sigmaF << " "
<< r.I << " " << r.sigmaI << "\n";
}
fw_file.close();
logger.Info("French-Wilson: wrote {} amplitudes to {}", fw.size(), fw_path);
}
}
} else {
logger.Warning("Scaling skipped — too few reflections accumulated (need >= 20)");
logger.Info("Scaling wall-clock time: {:.2f} s", scale_time);
}
}
// Write End Message
writer->WriteHDF5(end_msg);
auto stats = writer->Finalize();
// 6. Report Statistics to Console
double processing_time = std::chrono::duration<double>(end_time - start_time).count();
double throughput_MBs = static_cast<double>(total_uncompressed_bytes) / (processing_time * 1e6);
double frame_rate = static_cast<double>(images_to_process) / processing_time;
logger.Info("");
logger.Info("Processing time: {:.2f} s", processing_time);
logger.Info("Frame rate: {:.2f} Hz", frame_rate);
logger.Info("Total throughput:{:.2f} MB/s", throughput_MBs);
logger.Info("Images written: {}", stats.size());
// Print extended stats similar to Receiver
if (!end_msg.indexing_rate.has_value()) {
logger.Info("Indexing rate: {:.2f}%", end_msg.indexing_rate.value() * 100.0);
}
if (rotation_indexer_ret.has_value()) {
auto uc = rotation_indexer_ret->lattice.GetUnitCell();
logger.Info("Lattice: a={:.2f} b={:.2f} c={:.2f} alpha={:.2f} beta={:.2f} gamma={:.2f}",
uc.a, uc.b, uc.c, uc.alpha, uc.beta, uc.gamma);
}
if (stats.empty()) {
logger.Info("Analysis finished with warnings (no files written)");
exit(EXIT_FAILURE);
} else {
logger.Info("Analysis successfully finished");
exit(EXIT_SUCCESS);
}
}