348 lines
13 KiB
C++
348 lines
13 KiB
C++
// SPDX-FileCopyrightText: 2024 Filip Leonarski, Paul Scherrer Institute <filip.leonarski@psi.ch>
|
|
// SPDX-License-Identifier: GPL-3.0-only
|
|
|
|
#include <iostream>
|
|
#include <vector>
|
|
#include <string>
|
|
#include <unistd.h>
|
|
#include <future>
|
|
#include <mutex>
|
|
#include <atomic>
|
|
#include <chrono>
|
|
|
|
#include "../reader/JFJochHDF5Reader.h"
|
|
#include "../common/Logger.h"
|
|
#include "../common/DiffractionExperiment.h"
|
|
#include "../common/PixelMask.h"
|
|
#include "../common/AzimuthalIntegration.h"
|
|
#include "../common/time_utc.h"
|
|
#include "../common/print_license.h"
|
|
#include "../image_analysis/MXAnalysisWithoutFPGA.h"
|
|
#include "../image_analysis/indexing/IndexerFactory.h"
|
|
#include "../writer/FileWriter.h"
|
|
#include "../image_analysis/IndexAndRefine.h"
|
|
#include "../receiver/JFJochReceiverPlots.h"
|
|
|
|
void print_usage(Logger &logger) {
|
|
logger.Info("Usage ./jfjoch_analysis {<options>} <input.h5>");
|
|
logger.Info("Options:");
|
|
logger.Info(" -o<txt> Output file prefix (default: output)");
|
|
logger.Info(" -N<num> Number of threads (default: 1)");
|
|
logger.Info(" -s<num> Start image number (default: 0)");
|
|
logger.Info(" -e<num> End image number (default: all)");
|
|
logger.Info(" -v Verbose output");
|
|
}
|
|
|
|
int main(int argc, char **argv) {
|
|
print_license("jfjoch_analysis");
|
|
|
|
Logger logger("jfjoch_analysis");
|
|
|
|
std::string input_file;
|
|
std::string output_prefix = "output";
|
|
int nthreads = 1;
|
|
int start_image = 0;
|
|
int end_image = -1; // -1 indicates process until end
|
|
bool verbose = false;
|
|
bool rotation_index = false;
|
|
|
|
if (argc == 1) {
|
|
print_usage(logger);
|
|
exit(EXIT_FAILURE);
|
|
}
|
|
|
|
int opt;
|
|
while ((opt = getopt(argc, argv, "o:N:s:e:vR")) != -1) {
|
|
switch (opt) {
|
|
case 'o':
|
|
output_prefix = optarg;
|
|
break;
|
|
case 'N':
|
|
nthreads = atoi(optarg);
|
|
break;
|
|
case 's':
|
|
start_image = atoi(optarg);
|
|
break;
|
|
case 'e':
|
|
end_image = atoi(optarg);
|
|
break;
|
|
case 'v':
|
|
verbose = true;
|
|
break;
|
|
case 'R':
|
|
rotation_index = true;
|
|
break;
|
|
default:
|
|
print_usage(logger);
|
|
exit(EXIT_FAILURE);
|
|
}
|
|
}
|
|
|
|
if (optind != argc - 1) {
|
|
logger.Error("Input file not specified");
|
|
print_usage(logger);
|
|
exit(EXIT_FAILURE);
|
|
}
|
|
|
|
input_file = argv[optind];
|
|
logger.Verbose(verbose);
|
|
|
|
// 1. Read Input File
|
|
JFJochHDF5Reader reader;
|
|
try {
|
|
reader.ReadFile(input_file);
|
|
} catch (const std::exception &e) {
|
|
logger.Error("Error reading input file: {}", e.what());
|
|
exit(EXIT_FAILURE);
|
|
}
|
|
|
|
const auto dataset = reader.GetDataset();
|
|
if (!dataset) {
|
|
logger.Error("No experiment dataset found in the input file");
|
|
exit(EXIT_FAILURE);
|
|
}
|
|
|
|
logger.Info("Loaded dataset from {}", input_file);
|
|
|
|
// 2. Setup Experiment & Components
|
|
DiffractionExperiment experiment(dataset->experiment);
|
|
experiment.FilePrefix(output_prefix);
|
|
experiment.Mode(DetectorMode::Standard); // Ensure full image analysis
|
|
experiment.PixelSigned(false);
|
|
experiment.OverwriteExistingFiles(true);
|
|
|
|
// Configure Indexing
|
|
IndexingSettings indexing_settings;
|
|
indexing_settings.Algorithm(IndexingAlgorithmEnum::FFT);
|
|
indexing_settings.RotationIndexing(rotation_index);
|
|
indexing_settings.GeomRefinementAlgorithm(GeomRefinementAlgorithmEnum::BeamCenter);
|
|
experiment.ImportIndexingSettings(indexing_settings);
|
|
|
|
SpotFindingSettings spot_settings;
|
|
spot_settings.enable = true;
|
|
spot_settings.indexing = true;
|
|
|
|
// Initialize Analysis Components
|
|
PixelMask pixel_mask = dataset->pixel_mask;
|
|
|
|
// If dataset has a mask you wish to use, you might need to load it into pixel_mask here
|
|
// e.g. pixel_mask.LoadUserMask(dataset->pixel_mask, ...);
|
|
|
|
AzimuthalIntegration mapping(experiment, pixel_mask);
|
|
IndexerThreadPool indexer_pool(experiment.GetIndexingSettings());
|
|
|
|
// Statistics collector
|
|
JFJochReceiverPlots plots;
|
|
plots.Setup(experiment, mapping);
|
|
|
|
// 3. Setup FileWriter
|
|
StartMessage start_message;
|
|
experiment.FillMessage(start_message);
|
|
start_message.arm_date = dataset->arm_date; // Use original arm date
|
|
start_message.az_int_bin_to_q = mapping.GetBinToQ();
|
|
start_message.az_int_q_bin_count = mapping.GetQBinCount();
|
|
if (mapping.GetAzimuthalBinCount() > 1)
|
|
start_message.az_int_bin_to_phi = mapping.GetBinToPhi();
|
|
|
|
start_message.pixel_mask["default"] = pixel_mask.GetMask(experiment);
|
|
start_message.max_spot_count = experiment.GetMaxSpotCount();
|
|
|
|
std::unique_ptr<FileWriter> writer;
|
|
try {
|
|
writer = std::make_unique<FileWriter>(start_message);
|
|
} catch (const std::exception &e) {
|
|
logger.Error("Failed to initialize file writer: {}", e.what());
|
|
exit(EXIT_FAILURE);
|
|
}
|
|
|
|
// 4. Processing Setup
|
|
int total_images_in_file = reader.GetNumberOfImages();
|
|
if (end_image < 0 || end_image > total_images_in_file)
|
|
end_image = total_images_in_file;
|
|
|
|
int images_to_process = end_image - start_image;
|
|
|
|
if (images_to_process <= 0) {
|
|
logger.Warning("No images to process (Start: {}, End: {}, Total: {})", start_image, end_image, total_images_in_file);
|
|
return 0;
|
|
}
|
|
|
|
logger.Info("Starting analysis of {} images (range {}-{}) using {} threads",
|
|
images_to_process, start_image, end_image, nthreads);
|
|
|
|
std::mutex reader_mutex;
|
|
std::mutex plots_mutex; // Protect shared plot/stats updates if they aren't thread-safe
|
|
|
|
std::atomic<int> processed_count = 0;
|
|
std::atomic<uint64_t> total_uncompressed_bytes = 0;
|
|
std::atomic<uint64_t> max_image_number_sent = 0;
|
|
|
|
// Mimic JFJochReceiver lattice handling (IndexAndRefine handles the logic per thread,
|
|
// but we need a central accumulator or use the pool's functionality if IndexAndRefine wraps it)
|
|
// Here we will use per-thread IndexAndRefine which uses the shared thread pool.
|
|
|
|
auto start_time = std::chrono::steady_clock::now();
|
|
|
|
auto worker = [&](int thread_id) {
|
|
// Thread-local analysis resources
|
|
IndexAndRefine indexer(experiment, &indexer_pool);
|
|
MXAnalysisWithoutFPGA analysis(experiment, mapping, pixel_mask, indexer);
|
|
|
|
std::vector<uint8_t> decomp_buffer;
|
|
AzimuthalIntegrationProfile profile(mapping);
|
|
|
|
while (true) {
|
|
int current_idx_offset = processed_count.fetch_add(1);
|
|
int image_idx = start_image + current_idx_offset;
|
|
|
|
if (image_idx >= end_image) break;
|
|
|
|
// Load Image
|
|
std::shared_ptr<JFJochReaderImage> img;
|
|
{
|
|
std::lock_guard<std::mutex> lock(reader_mutex);
|
|
try {
|
|
img = reader.LoadImage(image_idx);
|
|
} catch (const std::exception& e) {
|
|
logger.Error("Failed to load image {}: {}", image_idx, e.what());
|
|
continue;
|
|
}
|
|
}
|
|
|
|
if (!img) continue;
|
|
|
|
DataMessage msg;
|
|
|
|
msg.image = img->ImageData().image;
|
|
msg.number = img->ImageData().number;
|
|
msg.error_pixel_count = img->ImageData().error_pixel_count;
|
|
msg.image_collection_efficiency = img->ImageData().image_collection_efficiency;
|
|
msg.storage_cell = img->ImageData().storage_cell;
|
|
msg.user_data = img->ImageData().user_data;
|
|
|
|
total_uncompressed_bytes += msg.image.GetUncompressedSize();
|
|
|
|
auto image_start_time = std::chrono::high_resolution_clock::now();
|
|
|
|
// Analyze
|
|
try {
|
|
analysis.Analyze(msg, decomp_buffer, profile, spot_settings);
|
|
} catch (const std::exception& e) {
|
|
logger.Error("Error analyzing image {}: {}", image_idx, e.what());
|
|
continue;
|
|
}
|
|
|
|
auto image_end_time = std::chrono::high_resolution_clock::now();
|
|
std::chrono::duration<float> image_duration = image_end_time - image_start_time;
|
|
|
|
// Mimic DataMessage stats from Receiver
|
|
msg.processing_time_s = image_duration.count();
|
|
msg.original_number = msg.number;
|
|
msg.run_number = experiment.GetRunNumber();
|
|
msg.run_name = experiment.GetRunName();
|
|
// msg.receiver_free_send_buf <- Not relevant for file analysis
|
|
// msg.receiver_aq_dev_delay <- Not relevant for file analysis
|
|
|
|
// Update Plots/Stats (Thread safe update needed)
|
|
{
|
|
std::lock_guard<std::mutex> lock(plots_mutex);
|
|
plots.Add(msg, profile);
|
|
}
|
|
|
|
// Write Result
|
|
writer->Write(msg);
|
|
|
|
// Update max sent tracking
|
|
uint64_t current_max = max_image_number_sent.load();
|
|
while (static_cast<uint64_t>(msg.number) > current_max) {
|
|
if (max_image_number_sent.compare_exchange_weak(current_max, static_cast<uint64_t>(msg.number)))
|
|
break;
|
|
}
|
|
|
|
// Progress log
|
|
if (current_idx_offset > 0 && current_idx_offset % 100 == 0)
|
|
logger.Info("Processed {} / {} images", current_idx_offset, images_to_process);
|
|
}
|
|
|
|
// Finalize per-thread indexing (if any per-thread aggregation is needed, though pool handles most)
|
|
// IndexAndRefine doesn't have a per-thread finalize that returns a lattice,
|
|
// the main lattice determination is usually done on the aggregated results in the pool or main thread
|
|
};
|
|
|
|
// Launch threads
|
|
std::vector<std::future<void>> futures;
|
|
futures.reserve(nthreads);
|
|
for (int i = 0; i < nthreads; ++i) {
|
|
futures.push_back(std::async(std::launch::async, worker, i));
|
|
}
|
|
|
|
// Wait for completion
|
|
for (auto &f : futures) {
|
|
f.get();
|
|
}
|
|
|
|
auto end_time = std::chrono::steady_clock::now();
|
|
|
|
// 5. Finalize Statistics and Write EndMessage
|
|
EndMessage end_msg;
|
|
end_msg.max_image_number = max_image_number_sent;
|
|
end_msg.images_collected_count = images_to_process;
|
|
end_msg.images_sent_to_write_count = images_to_process;
|
|
end_msg.end_date = time_UTC(std::chrono::system_clock::now());
|
|
end_msg.run_number = experiment.GetRunNumber();
|
|
end_msg.run_name = experiment.GetRunName();
|
|
|
|
// Gather statistics from plots
|
|
end_msg.bkg_estimate = plots.GetBkgEstimate();
|
|
end_msg.indexing_rate = plots.GetIndexingRate();
|
|
end_msg.az_int_result["dataset"] = plots.GetAzIntProfile();
|
|
|
|
// Finalize Indexing (Global) to get rotation lattice
|
|
// We create a temporary IndexAndRefine to call Finalize() which aggregates pool results
|
|
IndexAndRefine global_indexer(experiment, &indexer_pool);
|
|
const auto rotation_indexer_ret = global_indexer.Finalize();
|
|
|
|
if (rotation_indexer_ret.has_value()) {
|
|
end_msg.rotation_lattice = rotation_indexer_ret->lattice;
|
|
end_msg.rotation_lattice_type = LatticeMessage{
|
|
.centering = rotation_indexer_ret->search_result.centering,
|
|
.niggli_class = rotation_indexer_ret->search_result.niggli_class,
|
|
.crystal_system = rotation_indexer_ret->search_result.system
|
|
};
|
|
logger.Info("Rotation Indexing found lattice");
|
|
}
|
|
|
|
// Write End Message
|
|
writer->WriteHDF5(end_msg);
|
|
auto stats = writer->Finalize();
|
|
|
|
// 6. Report Statistics to Console
|
|
double processing_time = std::chrono::duration<double>(end_time - start_time).count();
|
|
double throughput_MBs = static_cast<double>(total_uncompressed_bytes) / (processing_time * 1e6);
|
|
double frame_rate = static_cast<double>(images_to_process) / processing_time;
|
|
|
|
logger.Info("");
|
|
logger.Info("Processing time: {:.2f} s", processing_time);
|
|
logger.Info("Frame rate: {:.2f} Hz", frame_rate);
|
|
logger.Info("Total throughput:{:.2f} MB/s", throughput_MBs);
|
|
logger.Info("Images written: {}", stats.size());
|
|
|
|
// Print extended stats similar to Receiver
|
|
if (!end_msg.indexing_rate.has_value()) {
|
|
logger.Info("Indexing rate: {:.2f}%", end_msg.indexing_rate.value() * 100.0);
|
|
}
|
|
if (rotation_indexer_ret.has_value()) {
|
|
auto uc = rotation_indexer_ret->lattice.GetUnitCell();
|
|
logger.Info("Lattice: a={:.2f} b={:.2f} c={:.2f} alpha={:.2f} beta={:.2f} gamma={:.2f}",
|
|
uc.a, uc.b, uc.c, uc.alpha, uc.beta, uc.gamma);
|
|
}
|
|
|
|
if (stats.empty()) {
|
|
logger.Info("Analysis finished with warnings (no files written)");
|
|
exit(EXIT_FAILURE);
|
|
} else {
|
|
logger.Info("Analysis successfully finished");
|
|
exit(EXIT_SUCCESS);
|
|
}
|
|
}
|