Files
Jungfraujoch/tools/jfjoch_process.cpp
T

819 lines
33 KiB
C++

// SPDX-FileCopyrightText: 2024 Filip Leonarski, Paul Scherrer Institute <filip.leonarski@psi.ch>
// SPDX-License-Identifier: GPL-3.0-only
#include <getopt.h>
#include <unistd.h>
#include <algorithm>
#include <atomic>
#include <cctype>
#include <chrono>
#include <cmath>
#include <cstdint>
#include <cstdlib>
#include <cstring>
#include <fstream>
#include <future>
#include <iostream>
#include <memory>
#include <mutex>
#include <optional>
#include <sstream>
#include <stdexcept>
#include <string>
#include <vector>
#include "../reader/JFJochHDF5Reader.h"
#include "../common/Logger.h"
#include "../common/DiffractionExperiment.h"
#include "../common/PixelMask.h"
#include "../common/AzimuthalIntegrationMapping.h"
#include "../common/time_utc.h"
#include "../common/print_license.h"
#include "../image_analysis/MXAnalysisWithoutFPGA.h"
#include "../image_analysis/indexing/IndexerFactory.h"
#include "../writer/FileWriter.h"
#include "../image_analysis/IndexAndRefine.h"
#include "../receiver/JFJochReceiverPlots.h"
#include "../compression/JFJochCompressor.h"
#include "../image_analysis/LoadFCalcFromMtz.h"
#include "../image_analysis/scale_merge/Merge.h"
#include "../image_analysis/scale_merge/SearchSpaceGroup.h"
#include "../image_analysis/WriteReflections.h"
#include "../image_analysis/UpdateReflectionResolution.h"
/// Prints the command-line synopsis and every option accepted by main() to stdout.
///
/// Keep this text in sync with `long_options`, the short-option string used in main(),
/// and the default values initialised at the top of main().
void print_usage() {
    std::cout << "Usage ./jfjoch_analysis {<options>} <input.h5>" << '\n';
    std::cout << "Options:" << '\n';
    std::cout << " -o, --output-prefix <txt> Output file prefix (default: output)" << '\n';
    std::cout << " -N, --threads <num> Number of threads (default: 1)" << '\n';
    std::cout << " -s, --start-image <num> Start image number (default: 0)" << '\n';
    std::cout << " -e, --end-image <num> End image number (default: all)" << '\n';
    std::cout << " -t, --stride <num> Image stride (default: 1)" << '\n';
    std::cout << " -v, --verbose Verbose output" << '\n';
    std::cout << '\n';
    std::cout << " Spot finding" << '\n';
    std::cout << " --spot-sigma <num> Noise sigma level for spot finding (default: 3.0)" << '\n';
    std::cout << " --spot-threshold <num> Photon count threshold for spot finding (default: 10)" << '\n';
    std::cout << " --spot-high-resolution <num> High resolution limit for spot finding (default: 1.5)" << '\n';
    std::cout << " --max-spots <num> Max spot count (default: 250)" << '\n';
    std::cout << '\n';
    std::cout << " Indexing" << '\n';
    std::cout << " -R, --rotation-indexing[=num] Rotation indexing (optional: min angular range deg)" << '\n';
    std::cout << " -X, --indexing-algorithm <txt> Indexing algorithm (FFBIDX|FFT|FFTW|Auto|None) (default: Auto)" << '\n';
    // -F exists only as a short option (see short_opts in main()).
    std::cout << " -F Shortcut for --indexing-algorithm FFT" << '\n';
    std::cout << " -S, --space-group <num> Space group number - used for both indexing and scaling" << '\n';
    std::cout << " -C, --unit-cell <cell> Fix reference unit cell: \"a,b,c,alpha,beta,gamma\"" << '\n';
    std::cout << " -r, --refine <txt> Geometry refinement algorithm (none|orientation|beam_and_lattice) (default: beam_and_lattice)" << '\n';
    std::cout << '\n';
    std::cout << " Scaling and merging" << '\n';
    std::cout << " -M, --scale-merge Scale and merge (refine mosaicity) and write scaled.hkl + image.dat" << '\n';
    // main() also accepts "postref" for -P.
    std::cout << " -P, --partiality <txt> Partiality refinement fixed|rot|unity|postref (default: fixed)" << '\n';
    std::cout << " -A, --anomalous Anomalous mode (don't merge Friedel pairs)" << '\n';
    std::cout << " -B, --refine-bfactor Refine per image B-factor" << '\n';
    std::cout << " -w, --wedge[=num] Refine image wedge during scaling with starting wedge value" << '\n';
    // This limit feeds ScalingSettings::HighResolutionLimit_A, not spot finding.
    std::cout << " --scaling-high-resolution <num> High resolution limit for scaling and merging (default: no limit)" << '\n';
    std::cout << " --min-partiality <num> Minimum partiality to accept reflection (default: 0.02)" << '\n';
    std::cout << " --min-image-cc <num> Per-image CC limit in percent (default: no limit)" << '\n';
    std::cout << " --scaling-iterations <num> Number of scaling iterations with no reference data (default: 3)" << '\n';
    std::cout << " --scaling-output <txt> Output format for scaling results mtz|cif|txt (default: mtz)" << '\n';
    std::cout << " -z, --reference-mtz <file> Reference MTZ file" << '\n';
    // Flush once at the end instead of after every line.
    std::cout << std::flush;
}
/// getopt_long() return values for options that only have a long form.
/// They start at 1000 so they can never collide with a single-character short option.
enum {
    OPT_SPOT_SIGMA = 1000,
    OPT_SPOT_THRESHOLD,
    OPT_SPOT_RESOLUTION,
    OPT_MAX_SPOTS,
    OPT_MIN_PARTIALITY,
    OPT_MIN_IMAGE_CC,
    OPT_SCALING_ITERATIONS,
    OPT_SCALING_HIGH_RESOLUTION,
    OPT_SCALING_OUTPUT
};

/// Long-option table passed to getopt_long().
///
/// Entries whose `val` is a character must agree with the short-option string in main().
/// The table is read-only (getopt_long takes `const option *`) and ends with the all-zero
/// sentinel entry that getopt_long requires.
static const option long_options[] = {
    // General
    {"verbose", no_argument, nullptr, 'v'},
    {"output-prefix", required_argument, nullptr, 'o'},
    {"threads", required_argument, nullptr, 'N'},
    {"start-image", required_argument, nullptr, 's'},
    {"end-image", required_argument, nullptr, 'e'},
    {"stride", required_argument, nullptr, 't'},
    // Indexing
    {"rotation-indexing", optional_argument, nullptr, 'R'},
    {"indexing-algorithm", required_argument, nullptr, 'X'},
    {"unit-cell", required_argument, nullptr, 'C'},
    // Scaling and merging
    {"reference-mtz", required_argument, nullptr, 'z'},
    {"space-group", required_argument, nullptr, 'S'},
    {"partiality", required_argument, nullptr, 'P'},
    {"anomalous", no_argument, nullptr, 'A'},
    {"refine-bfactor", no_argument, nullptr, 'B'},
    {"wedge", optional_argument, nullptr, 'w'},
    {"scale-merge", no_argument, nullptr, 'M'},
    {"refine", required_argument, nullptr, 'r'},
    // Long-only options
    {"spot-sigma", required_argument, nullptr, OPT_SPOT_SIGMA},
    {"spot-threshold", required_argument, nullptr, OPT_SPOT_THRESHOLD},
    {"spot-high-resolution", required_argument, nullptr, OPT_SPOT_RESOLUTION},
    {"max-spots", required_argument, nullptr, OPT_MAX_SPOTS},
    {"min-partiality", required_argument, nullptr, OPT_MIN_PARTIALITY},
    {"min-image-cc", required_argument, nullptr, OPT_MIN_IMAGE_CC},
    {"scaling-iterations", required_argument, nullptr, OPT_SCALING_ITERATIONS},
    {"scaling-high-resolution", required_argument, nullptr, OPT_SCALING_HIGH_RESOLUTION},
    {"scaling-output", required_argument, nullptr, OPT_SCALING_OUTPUT},
    {nullptr, 0, nullptr, 0}
};
/// Removes leading and trailing whitespace (as classified by std::isspace in the
/// current C locale) from @p t, modifying it in place without allocating a new string.
void trim_in_place(std::string &t) {
    // Cast through unsigned char: passing a negative char to std::isspace is undefined.
    const auto is_space = [](char c) { return std::isspace(static_cast<unsigned char>(c)) != 0; };
    t.erase(t.begin(), std::find_if_not(t.begin(), t.end(), is_space));
    // base() of the reverse iterator points one past the last non-space character.
    t.erase(std::find_if_not(t.rbegin(), t.rend(), is_space).base(), t.end());
}
std::optional<UnitCell> parse_unit_cell_arg(const char *arg) {
if (!arg)
return std::nullopt;
std::string s(arg);
trim_in_place(s);
if (s.size() >= 2 && ((s.front() == '"' && s.back() == '"') || (s.front() == '\'' && s.back() == '\''))) {
s = s.substr(1, s.size() - 2);
trim_in_place(s);
}
std::vector<std::string> parts;
parts.reserve(6);
size_t start = 0;
while (true) {
size_t pos = s.find(',', start);
if (pos == std::string::npos) {
parts.push_back(s.substr(start));
break;
}
parts.push_back(s.substr(start, pos - start));
start = pos + 1;
}
if (parts.size() != 6)
return std::nullopt;
auto parse_float_strict = [](const std::string &t, float &out) -> bool {
try {
size_t idx = 0;
out = std::stof(t, &idx);
return idx == t.size();
} catch (...) {
return false;
}
};
UnitCell uc{};
if (!parse_float_strict(parts[0], uc.a)) return std::nullopt;
if (!parse_float_strict(parts[1], uc.b)) return std::nullopt;
if (!parse_float_strict(parts[2], uc.c)) return std::nullopt;
if (!parse_float_strict(parts[3], uc.alpha)) return std::nullopt;
if (!parse_float_strict(parts[4], uc.beta)) return std::nullopt;
if (!parse_float_strict(parts[5], uc.gamma)) return std::nullopt;
return uc;
};
/// Entry point of the offline processing tool.
///
/// Steps performed below:
///  1. parse command-line options (see print_usage());
///  2. read the input HDF5 file with JFJochHDF5Reader;
///  3. configure a DiffractionExperiment from the dataset plus the options;
///  4. analyse the selected images on `nthreads` worker threads, passing each result to
///     FileWriter and JFJochReceiverPlots;
///  5. if requested (or if a reference MTZ is given), scale/merge, run a space-group search
///     when no space group is configured, and write merged reflections;
///  6. write the end message and print timing statistics.
///
/// The images analysed are start_image, start_image + stride, ... (all < end_image). Each one
/// receives a sequential output number 0 .. images_to_process - 1.
int main(int argc, char **argv) {
    // Echo the full command line so the log is self-describing.
    for (int i = 0; i < argc; i++) {
        std::cout << argv[i] << " ";
    }
    std::cout << std::endl << std::endl;

    RegisterHDF5Filter();
    print_license("jfjoch_analysis");
    Logger logger("jfjoch_analysis");

    // ---- Option values and their defaults (documented in print_usage()) ----
    std::string input_file;
    std::string output_prefix = "output";
    int nthreads = 1;
    int start_image = 0;
    int end_image = -1; // -1 indicates process until end
    int image_stride = 1;
    bool verbose = false;
    bool rotation_indexing = false;
    std::optional<float> rotation_indexing_range;
    bool run_scaling = false;
    bool anomalous_mode = false;
    std::optional<int64_t> space_group_number;
    std::optional<UnitCell> fixed_reference_unit_cell;
    std::optional<int64_t> max_spot_count_override;
    float sigma_spot_finding = 3.0;
    int64_t photon_count_threshold_spot_finding = 10;
    bool refine_bfactor = false;
    bool refine_wedge = false;
    std::optional<double> wedge_for_scaling;
    std::string ref_mtz;
    double min_partiality = 0.02;
    double min_image_cc = 0.0;
    int64_t scaling_iter = 3;
    IndexingAlgorithmEnum indexing_algorithm = IndexingAlgorithmEnum::Auto;
    GeomRefinementAlgorithmEnum refinement_algorithm = GeomRefinementAlgorithmEnum::BeamCenter;
    IntensityFormat intensity_format = IntensityFormat::MTZ;
    PartialityModel partiality_model = PartialityModel::Fixed;
    float d_min_spot_finding = 1.5;
    std::optional<float> d_min_scale_merge;

    if (argc == 1) {
        print_usage();
        exit(EXIT_FAILURE);
    }

    // ---- Command-line parsing ----
    int opt;
    int option_index = 0;
    const char *short_opts = "vo:N:s:e:t:R::X:C:z:FABw::S:MP:r:";
    try {
        while ((opt = getopt_long(argc, argv, short_opts, long_options, &option_index)) != -1) {
            switch (opt) {
                case 'o':
                    output_prefix = optarg;
                    break;
                case 'v':
                    verbose = true;
                    break;
                case 'N':
                    nthreads = atoi(optarg);
                    break;
                case 's':
                    start_image = atoi(optarg);
                    break;
                case 'e':
                    end_image = atoi(optarg);
                    break;
                case 't':
                    image_stride = atoi(optarg);
                    break;
                case 'R':
                    rotation_indexing = true;
                    if (optarg) rotation_indexing_range = atof(optarg);
                    break;
                case 'X': {
                    std::string alg = optarg ? optarg : "";
                    std::transform(alg.begin(), alg.end(), alg.begin(),
                                   [](unsigned char c) { return static_cast<char>(std::tolower(c)); });
                    if (alg == "ffbidx")
                        indexing_algorithm = IndexingAlgorithmEnum::FFBIDX;
                    else if (alg == "fft")
                        indexing_algorithm = IndexingAlgorithmEnum::FFT;
                    else if (alg == "fftw")
                        indexing_algorithm = IndexingAlgorithmEnum::FFTW;
                    else if (alg == "auto")
                        indexing_algorithm = IndexingAlgorithmEnum::Auto;
                    else if (alg == "none")
                        indexing_algorithm = IndexingAlgorithmEnum::None;
                    else {
                        logger.Error("Invalid indexing algorithm: {}", alg);
                        print_usage();
                        exit(EXIT_FAILURE);
                    }
                    break;
                }
                case 'r': {
                    std::string alg = optarg ? optarg : "";
                    std::transform(alg.begin(), alg.end(), alg.begin(),
                                   [](unsigned char c) { return static_cast<char>(std::tolower(c)); });
                    if (alg == "none")
                        refinement_algorithm = GeomRefinementAlgorithmEnum::None;
                    else if (alg == "beam_and_lattice")
                        refinement_algorithm = GeomRefinementAlgorithmEnum::BeamCenter;
                    else if (alg == "orientation")
                        refinement_algorithm = GeomRefinementAlgorithmEnum::OrientationOnly;
                    else {
                        logger.Error("Invalid geom refinement algorithm: {}", alg);
                        print_usage();
                        exit(EXIT_FAILURE);
                    }
                    break;
                }
                case 'C': {
                    auto uc = parse_unit_cell_arg(optarg);
                    if (!uc.has_value()) {
                        logger.Error(
                            "Invalid unit cell. Expected: \"a,b,c,alpha,beta,gamma\" (6 floats, comma-separated, no spaces). Got: {}",
                            optarg ? optarg : "<null>");
                        print_usage();
                        exit(EXIT_FAILURE);
                    }
                    fixed_reference_unit_cell = uc;
                    logger.Info(
                        "Fixed reference unit cell set: a={:.3f} b={:.3f} c={:.3f} alpha={:.3f} beta={:.3f} gamma={:.3f}",
                        uc->a, uc->b, uc->c, uc->alpha, uc->beta, uc->gamma);
                    break;
                }
                case 'z':
                    ref_mtz = optarg;
                    break;
                case 'F':
                    indexing_algorithm = IndexingAlgorithmEnum::FFT;
                    break;
                case 'A':
                    anomalous_mode = true;
                    break;
                case 'B':
                    refine_bfactor = true;
                    break;
                case 'w':
                    refine_wedge = true;
                    if (optarg)
                        wedge_for_scaling = std::stod(optarg);
                    break;
                case 'S':
                    space_group_number = atoi(optarg);
                    break;
                case 'P':
                    if (strcmp(optarg, "unity") == 0)
                        partiality_model = PartialityModel::Unity;
                    else if (strcmp(optarg, "fixed") == 0)
                        partiality_model = PartialityModel::Fixed;
                    else if (strcmp(optarg, "rot") == 0)
                        partiality_model = PartialityModel::Rotation;
                    else if (strcmp(optarg, "postref") == 0)
                        partiality_model = PartialityModel::Postrefinement;
                    else {
                        logger.Error("Invalid partiality mode: {}", optarg);
                        print_usage();
                        exit(EXIT_FAILURE);
                    }
                    break;
                case OPT_SPOT_SIGMA:
                    sigma_spot_finding = atof(optarg);
                    logger.Info("Noise threshold level for spot finding set to {:.2f} sigma", sigma_spot_finding);
                    break;
                case OPT_SPOT_THRESHOLD:
                    photon_count_threshold_spot_finding = atoi(optarg);
                    logger.Info("Photon-count threshold level for spot finding set to {:d}",
                                photon_count_threshold_spot_finding);
                    break;
                case OPT_SPOT_RESOLUTION:
                    d_min_spot_finding = atof(optarg);
                    logger.Info("High resolution limit for spot finding set to {:.2f} A", d_min_spot_finding);
                    break;
                case OPT_MAX_SPOTS:
                    max_spot_count_override = atoll(optarg);
                    logger.Info("Max spot count overridden to {}", max_spot_count_override.value());
                    break;
                case 'M':
                    run_scaling = true;
                    break;
                case OPT_MIN_PARTIALITY:
                    min_partiality = std::stod(optarg);
                    break;
                case OPT_MIN_IMAGE_CC:
                    min_image_cc = std::stod(optarg);
                    break;
                case OPT_SCALING_HIGH_RESOLUTION:
                    d_min_scale_merge = atof(optarg);
                    break;
                case OPT_SCALING_OUTPUT:
                    if (strcmp(optarg, "mtz") == 0) {
                        intensity_format = IntensityFormat::MTZ;
                    } else if (strcmp(optarg, "cif") == 0) {
                        intensity_format = IntensityFormat::mmCIF;
                    } else if (strcmp(optarg, "txt") == 0) {
                        intensity_format = IntensityFormat::Text;
                    } else {
                        logger.Error("Invalid intensity format: {}", optarg);
                        exit(EXIT_FAILURE);
                    }
                    break;
                case OPT_SCALING_ITERATIONS:
                    scaling_iter = atoi(optarg);
                    if (scaling_iter <= 0) {
                        logger.Error("Invalid scaling iteration count: {}", scaling_iter);
                        exit(EXIT_FAILURE);
                    }
                    break;
                default:
                    print_usage();
                    exit(EXIT_FAILURE);
            }
        }
    } catch (const std::exception &e) {
        // std::stod (used for -w, --min-partiality, --min-image-cc) throws std::invalid_argument /
        // std::out_of_range on malformed numbers. Report the bad value instead of letting the
        // exception escape main() and terminate the program.
        logger.Error("Invalid numeric value for option: {} ({})", optarg ? optarg : "<null>", e.what());
        print_usage();
        exit(EXIT_FAILURE);
    }

    // Exactly one positional argument (the input file) must remain after the options.
    if (optind >= argc) {
        logger.Error("Input file not specified");
        print_usage();
        exit(EXIT_FAILURE);
    }
    if (optind != argc - 1) {
        logger.Error("Exactly one input file must be specified");
        print_usage();
        exit(EXIT_FAILURE);
    }
    input_file = argv[optind];
    logger.Verbose(verbose);

    // Validate numeric ranges before doing any work.
    if (nthreads < 1) {
        logger.Error("Number of threads must be at least 1");
        exit(EXIT_FAILURE);
    }
    if (start_image < 0) {
        logger.Error("Start image cannot be negative");
        exit(EXIT_FAILURE);
    }
    // A zero stride previously passed a "< 0" check and reached a division by zero below.
    if (image_stride <= 0) {
        logger.Error("Image stride must be positive");
        exit(EXIT_FAILURE);
    }

    // Validate space group number early
    const gemmi::SpaceGroup *space_group = nullptr;
    if (space_group_number.has_value()) {
        space_group = gemmi::find_spacegroup_by_number(space_group_number.value());
        if (!space_group) {
            logger.Error("Unknown space group number {}", space_group_number.value());
            exit(EXIT_FAILURE);
        }
        logger.Info("Using space group {} (number {})", space_group->hm, space_group_number.value());
    }

    // 1. Read Input File
    JFJochHDF5Reader reader;
    try {
        reader.ReadFile(input_file);
    } catch (const std::exception &e) {
        logger.Error("Error reading input file: {}", e.what());
        exit(EXIT_FAILURE);
    }
    const auto dataset = reader.GetDataset();
    if (!dataset) {
        logger.Error("No experiment dataset found in the input file");
        exit(EXIT_FAILURE);
    }
    logger.Info("Loaded dataset from {}", input_file);

    std::vector<MergedReflection> reference_data;
    if (!ref_mtz.empty()) {
        reference_data = LoadFCalcFromMtz(ref_mtz);
        logger.Info("Loaded {} reflections from {} MTZ file", reference_data.size(), ref_mtz);
    }

    uint64_t total_images_in_file = reader.GetNumberOfImages();
    if (end_image < 0 || static_cast<uint64_t>(end_image) > total_images_in_file)
        end_image = static_cast<int>(total_images_in_file);

    // The workers visit start_image + k * image_stride for every k with that index < end_image,
    // i.e. ceil((end_image - start_image) / image_stride) images. Floor division here would
    // undercount by one whenever the range is not a multiple of the stride. The arithmetic
    // is done in 64 bits so that a huge stride cannot overflow.
    const int64_t image_span = static_cast<int64_t>(end_image) - start_image;
    const int images_to_process =
            image_span > 0 ? static_cast<int>((image_span + image_stride - 1) / image_stride) : 0;
    if (images_to_process <= 0) {
        logger.Warning("No images to process (Start: {}, End: {} Stride: {}, Total: {})", start_image, end_image,
                       image_stride, total_images_in_file);
        return 0;
    }
    logger.Info("Starting analysis of {} images (range {}-{}) using {} threads",
                images_to_process, start_image, end_image, nthreads);

    // 2. Setup Experiment & Components
    DiffractionExperiment experiment(dataset->experiment);
    experiment.BitDepthImage(32).Compression(CompressionAlgorithm::BSHUF_LZ4);
    experiment.FilePrefix(output_prefix);
    experiment.Mode(DetectorMode::Standard); // Ensure full image analysis
    experiment.PixelSigned(true);
    experiment.OverwriteExistingFiles(true);
    experiment.PolarizationFactor(0.99);
    experiment.SetFileWriterFormat(FileWriterFormat::NXmxLegacy);
    experiment.SpaceGroupNumber(space_group_number);
    experiment.ImagesPerTrigger(images_to_process);
    experiment.NumTriggers(1);
    if (fixed_reference_unit_cell.has_value())
        experiment.SetUnitCell(*fixed_reference_unit_cell);
    // The override was already logged while parsing the option.
    if (max_spot_count_override.has_value())
        experiment.MaxSpotCount(max_spot_count_override.value());

    // Configure Indexing
    IndexingSettings indexing_settings;
    indexing_settings.Algorithm(indexing_algorithm);
    indexing_settings.RotationIndexing(rotation_indexing);
    if (rotation_indexing_range.has_value())
        indexing_settings.RotationIndexingMinAngularRange_deg(rotation_indexing_range.value());
    indexing_settings.GeomRefinementAlgorithm(refinement_algorithm);
    experiment.ImportIndexingSettings(indexing_settings);

    ScalingSettings scaling_settings;
    scaling_settings.SetPartialityModel(partiality_model);
    if (d_min_scale_merge)
        scaling_settings.HighResolutionLimit_A(d_min_scale_merge.value());
    scaling_settings.MergeFriedel(!anomalous_mode);
    scaling_settings.RefineB(refine_bfactor);
    scaling_settings.RefineRotationWedge(refine_wedge);
    if (wedge_for_scaling.has_value())
        scaling_settings.RotationWedgeForScaling(wedge_for_scaling);
    scaling_settings.MinPartiality(min_partiality);
    scaling_settings.MinCCForImage(min_image_cc);
    scaling_settings.FileFormat(intensity_format);
    experiment.ImportScalingSettings(scaling_settings);

    // The high-resolution limit is applied unconditionally. The former additional
    // "if (d_min_spot_finding > 0)" assignment of the same value was redundant.
    SpotFindingSettings spot_settings;
    spot_settings.enable = true;
    spot_settings.indexing = true;
    spot_settings.high_resolution_limit = d_min_spot_finding;
    spot_settings.signal_to_noise_threshold = sigma_spot_finding;
    spot_settings.photon_count_threshold = photon_count_threshold_spot_finding;

    // Initialize Analysis Components, using the pixel mask stored in the input dataset.
    PixelMask pixel_mask = dataset->pixel_mask;
    AzimuthalIntegrationMapping mapping(experiment, pixel_mask);
    IndexerThreadPool indexer_pool(experiment.GetIndexingSettings());

    // Statistics collector
    JFJochReceiverPlots plots;
    plots.Setup(experiment, mapping);

    StartMessage start_message;
    experiment.FillMessage(start_message);
    start_message.arm_date = dataset->arm_date; // Use original arm date
    start_message.az_int_bin_to_q = mapping.GetBinToQ();
    start_message.az_int_q_bin_count = mapping.GetQBinCount();
    if (mapping.GetAzimuthalBinCount() > 1)
        start_message.az_int_bin_to_phi = mapping.GetBinToPhi();
    start_message.pixel_mask["default"] = pixel_mask.GetMask(experiment);
    start_message.max_spot_count = experiment.GetMaxSpotCount();
    start_message.master_suffix = "process";
    start_message.file_format = FileWriterFormat::NXmxIntegrated;
    start_message.write_master_file = true;
    start_message.write_images = false;
    // NOTE(review): with image_stride > 1, confirm that GetHDF5DataSource(first, count)
    // describes the strided image selection rather than a contiguous range.
    start_message.hdf5_source_data = reader.GetHDF5DataSource(start_image, images_to_process);

    std::unique_ptr<FileWriter> writer;
    try {
        if (!output_prefix.empty())
            writer = std::make_unique<FileWriter>(start_message);
    } catch (const std::exception &e) {
        logger.Error("Failed to initialize file writer: {}", e.what());
        exit(EXIT_FAILURE);
    }

    // Next sequential output number (0 .. images_to_process - 1) to hand out to a worker.
    std::atomic<int> next_sequence_number = 0;
    std::atomic<uint64_t> total_uncompressed_bytes = 0;

    // A single IndexAndRefine instance, backed by indexer_pool, is passed to every worker's
    // MXAnalysisWithoutFPGA. Its accumulated results are read after all workers finish
    // (Finalize, GetConsensusUnitCell, GetIntegrationOutcome).
    auto start_time = std::chrono::steady_clock::now();
    IndexAndRefine indexer(experiment, &indexer_pool);
    if (!reference_data.empty())
        indexer.ReferenceIntensities(reference_data);

    std::atomic<int> finished_count = 0;

    auto worker = [&](int /*thread_id*/) {
        // Thread-local analysis resources
        MXAnalysisWithoutFPGA analysis(experiment, mapping, pixel_mask, indexer);
        AzimuthalIntegrationProfile profile(mapping);

        while (true) {
            // fetch_add hands each sequence number to exactly one thread.
            const int sequence_number = next_sequence_number.fetch_add(1);
            if (sequence_number >= images_to_process) break;
            // Always < end_image because images_to_process is a ceiling division.
            const int image_idx = start_image + sequence_number * image_stride;

            // Load Image
            std::shared_ptr<JFJochReaderRawImage> img;
            try {
                img = reader.GetRawImage(image_idx);
            } catch (const std::exception &e) {
                logger.Error("Failed to load image {}: {}", image_idx, e.what());
                continue;
            }
            if (!img) continue;

            DataMessage msg{};
            msg.image = img->image;
            // Output numbering is dense (0, 1, 2, ...) regardless of stride. The source
            // image index is kept separately in original_number.
            msg.number = sequence_number;
            msg.original_number = image_idx;
            if (dataset->efficiency.size() > static_cast<size_t>(image_idx))
                msg.image_collection_efficiency = dataset->efficiency[image_idx];

            total_uncompressed_bytes += msg.image.GetUncompressedSize();

            auto image_start_time = std::chrono::high_resolution_clock::now();
            // Analyze
            try {
                analysis.Analyze(msg, profile, spot_settings);
            } catch (const std::exception &e) {
                logger.Error("Error analyzing image {}: {}", image_idx, e.what());
                continue;
            }
            auto image_end_time = std::chrono::high_resolution_clock::now();
            std::chrono::duration<float> image_duration = image_end_time - image_start_time;

            msg.processing_time_s = image_duration.count();
            msg.run_number = experiment.GetRunNumber();
            msg.run_name = experiment.GetRunName();

            plots.Add(msg, profile);

            // Write Result
            if (writer)
                writer->Write(msg);

            finished_count.fetch_add(1);

            // Progress log
            if (sequence_number > 0 && sequence_number % 100 == 0) {
                std::optional<float> indexing_rate = plots.GetIndexingRate();
                const auto now = std::chrono::steady_clock::now();
                const double elapsed_s = std::chrono::duration<double>(now - start_time).count();
                const int processed_images = finished_count.load();
                const double frame_rate_hz = (elapsed_s > 0.0) ? (processed_images / elapsed_s) : 0.0;
                if (indexing_rate.has_value()) {
                    logger.Info("Processed {} / {} images ({:.2f} Hz, indexing rate {:.1f}%)",
                                sequence_number, images_to_process,
                                frame_rate_hz,
                                indexing_rate.value() * 100.0f);
                } else {
                    logger.Info("Processed {} / {} images ({:.2f} Hz, indexing rate N/A)",
                                sequence_number, images_to_process,
                                frame_rate_hz);
                }
            }
        }
        // No per-thread finalisation: results are gathered from the shared `indexer` after all workers join.
    };

    // Launch threads
    std::vector<std::future<void> > futures;
    futures.reserve(nthreads);
    for (int i = 0; i < nthreads; ++i) {
        futures.push_back(std::async(std::launch::async, worker, i));
    }

    // Wait for completion (f.get() rethrows any exception that escaped a worker)
    for (auto &f: futures) {
        f.get();
    }

    auto end_time = std::chrono::steady_clock::now();

    // 5. Finalize Statistics and Write EndMessage
    EndMessage end_msg;
    end_msg.max_image_number = images_to_process;
    end_msg.images_collected_count = images_to_process;
    end_msg.images_sent_to_write_count = images_to_process;
    end_msg.end_date = time_UTC(std::chrono::system_clock::now());
    end_msg.run_number = experiment.GetRunNumber();
    end_msg.run_name = experiment.GetRunName();

    // Gather statistics from plots
    end_msg.bkg_estimate = plots.GetBkgEstimate();
    end_msg.indexing_rate = plots.GetIndexingRate();
    end_msg.az_int_result["dataset"] = plots.GetAzIntProfile();

    // Finalize the IndexAndRefine instance shared by all workers. It returns a rotation
    // lattice only when one was found.
    const auto rotation_indexer_ret = indexer.Finalize();
    if (rotation_indexer_ret.has_value()) {
        end_msg.rotation_lattice = rotation_indexer_ret->lattice;
        end_msg.rotation_lattice_type = LatticeMessage{
            .centering = rotation_indexer_ret->search_result.centering,
            .niggli_class = rotation_indexer_ret->search_result.niggli_class,
            .crystal_system = rotation_indexer_ret->search_result.system
        };
        logger.Info("Rotation Indexing found lattice");
    }

    auto consensus_start_time = std::chrono::steady_clock::now();
    const auto consensus_cell = indexer.GetConsensusUnitCell();
    auto consensus_end_time = std::chrono::steady_clock::now();
    auto consensus_duration = std::chrono::duration<double>(consensus_end_time - consensus_start_time).count();

    if (consensus_cell) {
        logger.Info("Consensus unit cell found in {:.2f} ms", consensus_duration * 1e3);
        logger.Info("UC: a={:.2f} b={:.2f} c={:.2f} alpha={:.2f} beta={:.2f} gamma={:.2f}",
                    consensus_cell->a, consensus_cell->b, consensus_cell->c,
                    consensus_cell->alpha, consensus_cell->beta, consensus_cell->gamma);
    } else
        logger.Info("Consensus unit cell not found - calculation took {:.2f} ms", consensus_duration * 1e3);

    end_msg.unit_cell = consensus_cell;

    if (run_scaling || !reference_data.empty()) {
        logger.Info("Running scaling (mosaicity refinement) ...");

        if (reference_data.empty()) {
            // If reference data are given, there is live scaling (no need to go again)
            ScalingResult scale_result(0);
            auto scale_start = std::chrono::steady_clock::now();
            for (int i = 0; i < scaling_iter; i++) {
                auto iter_start = std::chrono::steady_clock::now();
                auto merge_result = MergeAll(experiment, indexer.GetIntegrationOutcome(), false);
                scale_result = indexer.ScaleAllImages(merge_result);
                scale_result.SaveToFile(output_prefix + "_iter" + std::to_string(i) + "_scale.dat");
                auto iter_end = std::chrono::steady_clock::now();
                double iter_time = std::chrono::duration<double>(iter_end - iter_start).count();
                logger.Info("Scaling iteration {} took {:.3f} seconds", i, iter_time);
            }
            end_msg.image_scale_factor = scale_result.image_scale_g;
            end_msg.image_scale_cc = scale_result.image_cc;
            end_msg.image_scale_mosaicity = scale_result.mosaicity_deg;
            end_msg.image_scale_b_factor = scale_result.image_bfactor_Ang2;

            auto scale_end = std::chrono::steady_clock::now();
            double scale_time = std::chrono::duration<double>(scale_end - scale_start).count();
            logger.Info("Scaling completed in {:.2f} s", scale_time);
        }

        auto merge_start = std::chrono::steady_clock::now();
        MergeOnTheFly merge_engine(experiment);
        if (consensus_cell.has_value())
            merge_engine.ReferenceCell(*consensus_cell);
        for (auto &i : indexer.GetIntegrationOutcome())
            merge_engine.AddImage(i);
        auto merged_reflections = merge_engine.ExportReflections();
        auto merged_statistics = merge_engine.MergeStats(merged_reflections, indexer.GetIntegrationOutcome(), reference_data);
        auto merge_end = std::chrono::steady_clock::now();
        double merge_time = std::chrono::duration<double>(merge_end - merge_start).count();
        logger.Info("Merge completed in {:.2f} s ({} unique reflections)", merge_time, merged_reflections.size());

        // No space group configured: search for one in the merged data.
        if (!experiment.GetGemmiSpaceGroup().has_value()) {
            logger.Info("Searching for space group from P1-merged reflections ...");
            SearchSpaceGroupOptions sg_opts;
            sg_opts.crystal_system.reset();
            sg_opts.centering = '\0';
            sg_opts.merge_friedel = experiment.GetScalingSettings().GetMergeFriedel();
            sg_opts.d_min_limit_A = experiment.GetScalingSettings().GetHighResolutionLimit_A().value_or(0.0);
            sg_opts.min_i_over_sigma = 0.0;
            sg_opts.min_operator_cc = 0.80;
            sg_opts.min_pairs_per_operator = 20;
            sg_opts.min_total_compared = 100;
            sg_opts.test_systematic_absences = true;

            const auto sg_search = SearchSpaceGroup(merged_reflections, sg_opts);
            std::cout << std::endl << SearchSpaceGroupResultToText(sg_search) << std::endl << std::endl;
        }
        std::cout << merged_statistics;

        if (consensus_cell && !output_prefix.empty())
            WriteReflections(merged_reflections, *consensus_cell, experiment, output_prefix);
    }

    // Write End Message
    if (writer) {
        writer->WriteHDF5(end_msg);
        [[maybe_unused]] auto stats = writer->Finalize();
    }

    // 6. Report Statistics to Console
    double processing_time = std::chrono::duration<double>(end_time - start_time).count();
    double throughput_MBs = static_cast<double>(total_uncompressed_bytes) / (processing_time * 1e6);
    double frame_rate = static_cast<double>(images_to_process) / processing_time;

    std::cout << fmt::format("Processing time: {:.2f} s", processing_time) << std::endl;
    std::cout << fmt::format("Frame rate: {:.2f} Hz", frame_rate) << std::endl;
    std::cout << fmt::format("Total throughput:{:.2f} MB/s", throughput_MBs) << std::endl;

    // Print extended stats similar to Receiver
    if (end_msg.indexing_rate.has_value()) {
        std::cout << fmt::format("Indexing rate: {:.2f}%", end_msg.indexing_rate.value() * 100.0) << std::endl;
    }

    auto image_mean_time = plots.GetMeanProcessingTime();
    std::cout << fmt::format(
                "Per-image time: (mean; milliseconds): decompress {:.2f} preprocess {:.2f} azint {:.2f} spot finding {:.2f} indexing {:.2f} refinement {:.2f} indexing analysis {:.2f} prediction {:.2f} integration {:.2f} scaling {:.2f} total {:.2f}",
                image_mean_time.compression * 1e3,
                image_mean_time.preprocessing * 1e3,
                image_mean_time.azint * 1e3,
                image_mean_time.spot_finding * 1e3,
                image_mean_time.indexing * 1e3,
                image_mean_time.refinement * 1e3,
                image_mean_time.indexing_analysis * 1e3,
                image_mean_time.bragg_prediction * 1e3,
                image_mean_time.integration * 1e3,
                image_mean_time.image_scale * 1e3,
                image_mean_time.processing * 1e3)
            << std::endl;
}