jfjoch_process: First iteration
This commit is contained in:
@@ -43,4 +43,8 @@ ADD_EXECUTABLE(jfjoch_extract_hkl jfjoch_extract_hkl.cpp)
|
||||
TARGET_LINK_LIBRARIES(jfjoch_extract_hkl JFJochReader)
|
||||
INSTALL(TARGETS jfjoch_extract_hkl RUNTIME COMPONENT viewer)
|
||||
|
||||
ADD_EXECUTABLE(jfjoch_process jfjoch_process.cpp)
|
||||
TARGET_LINK_LIBRARIES(jfjoch_process JFJochReader JFJochImageAnalysis JFJochWriter JFJochReceiver)
|
||||
INSTALL(TARGETS jfjoch_process RUNTIME COMPONENT viewer)
|
||||
|
||||
ADD_SUBDIRECTORY(xbflash.qspi)
|
||||
|
||||
328
tools/jfjoch_process.cpp
Normal file
328
tools/jfjoch_process.cpp
Normal file
@@ -0,0 +1,328 @@
|
||||
// SPDX-FileCopyrightText: 2024 Filip Leonarski, Paul Scherrer Institute <filip.leonarski@psi.ch>
|
||||
// SPDX-License-Identifier: GPL-3.0-only
|
||||
|
||||
#include <iostream>
|
||||
#include <vector>
|
||||
#include <string>
|
||||
#include <unistd.h>
|
||||
#include <future>
|
||||
#include <mutex>
|
||||
#include <atomic>
|
||||
#include <chrono>
|
||||
|
||||
#include "../reader/JFJochHDF5Reader.h"
|
||||
#include "../common/Logger.h"
|
||||
#include "../common/DiffractionExperiment.h"
|
||||
#include "../common/PixelMask.h"
|
||||
#include "../common/AzimuthalIntegration.h"
|
||||
#include "../common/time_utc.h"
|
||||
#include "../common/print_license.h"
|
||||
#include "../image_analysis/MXAnalysisWithoutFPGA.h"
|
||||
#include "../image_analysis/indexing/IndexerFactory.h"
|
||||
#include "../writer/FileWriter.h"
|
||||
#include "../image_analysis/IndexAndRefine.h"
|
||||
#include "../receiver/JFJochReceiverPlots.h"
|
||||
|
||||
void print_usage(Logger &logger) {
|
||||
logger.Info("Usage ./jfjoch_analysis {<options>} <input.h5>");
|
||||
logger.Info("Options:");
|
||||
logger.Info(" -o<txt> Output file prefix (default: output)");
|
||||
logger.Info(" -N<num> Number of threads (default: 1)");
|
||||
logger.Info(" -s<num> Start image number (default: 0)");
|
||||
logger.Info(" -e<num> End image number (default: all)");
|
||||
logger.Info(" -v Verbose output");
|
||||
}
|
||||
|
||||
int main(int argc, char **argv) {
|
||||
print_license("jfjoch_analysis");
|
||||
|
||||
Logger logger("jfjoch_analysis");
|
||||
|
||||
std::string input_file;
|
||||
std::string output_prefix = "output";
|
||||
int nthreads = 1;
|
||||
int start_image = 0;
|
||||
int end_image = -1; // -1 indicates process until end
|
||||
bool verbose = false;
|
||||
|
||||
if (argc == 1) {
|
||||
print_usage(logger);
|
||||
exit(EXIT_FAILURE);
|
||||
}
|
||||
|
||||
int opt;
|
||||
while ((opt = getopt(argc, argv, "o:N:s:e:v")) != -1) {
|
||||
switch (opt) {
|
||||
case 'o':
|
||||
output_prefix = optarg;
|
||||
break;
|
||||
case 'N':
|
||||
nthreads = atoi(optarg);
|
||||
break;
|
||||
case 's':
|
||||
start_image = atoi(optarg);
|
||||
break;
|
||||
case 'e':
|
||||
end_image = atoi(optarg);
|
||||
break;
|
||||
case 'v':
|
||||
verbose = true;
|
||||
break;
|
||||
default:
|
||||
print_usage(logger);
|
||||
exit(EXIT_FAILURE);
|
||||
}
|
||||
}
|
||||
|
||||
if (optind != argc - 1) {
|
||||
logger.Error("Input file not specified");
|
||||
print_usage(logger);
|
||||
exit(EXIT_FAILURE);
|
||||
}
|
||||
|
||||
input_file = argv[optind];
|
||||
logger.Verbose(verbose);
|
||||
|
||||
// 1. Read Input File
|
||||
JFJochHDF5Reader reader;
|
||||
try {
|
||||
reader.ReadFile(input_file);
|
||||
} catch (const std::exception &e) {
|
||||
logger.Error("Error reading input file: {}", e.what());
|
||||
exit(EXIT_FAILURE);
|
||||
}
|
||||
|
||||
const auto dataset = reader.GetDataset();
|
||||
if (!dataset) {
|
||||
logger.Error("No experiment dataset found in the input file");
|
||||
exit(EXIT_FAILURE);
|
||||
}
|
||||
|
||||
logger.Info("Loaded dataset from {}", input_file);
|
||||
|
||||
// 2. Setup Experiment & Components
|
||||
DiffractionExperiment experiment(dataset->experiment);
|
||||
experiment.FilePrefix(output_prefix);
|
||||
experiment.Mode(DetectorMode::Standard); // Ensure full image analysis
|
||||
|
||||
// Configure Indexing
|
||||
IndexingSettings indexing_settings;
|
||||
indexing_settings.Algorithm(IndexingAlgorithmEnum::FFT);
|
||||
indexing_settings.RotationIndexing(true);
|
||||
indexing_settings.GeomRefinementAlgorithm(GeomRefinementAlgorithmEnum::BeamCenter);
|
||||
experiment.ImportIndexingSettings(indexing_settings);
|
||||
|
||||
SpotFindingSettings spot_settings;
|
||||
spot_settings.enable = true;
|
||||
spot_settings.indexing = true;
|
||||
|
||||
// Initialize Analysis Components
|
||||
// Note: dataset->pixel_mask might need conversion depending on type, assuming it fits constructor
|
||||
PixelMask pixel_mask(experiment);
|
||||
// If dataset has a mask you wish to use, you might need to load it into pixel_mask here
|
||||
// e.g. pixel_mask.LoadUserMask(dataset->pixel_mask, ...);
|
||||
|
||||
AzimuthalIntegration mapping(experiment, pixel_mask);
|
||||
IndexerThreadPool indexer_pool(experiment.GetIndexingSettings());
|
||||
|
||||
// Statistics collector
|
||||
JFJochReceiverPlots plots;
|
||||
plots.Setup(experiment, mapping);
|
||||
|
||||
// 3. Setup FileWriter
|
||||
StartMessage start_message;
|
||||
experiment.FillMessage(start_message);
|
||||
start_message.arm_date = dataset->arm_date; // Use original arm date
|
||||
start_message.az_int_bin_to_q = mapping.GetBinToQ();
|
||||
start_message.az_int_q_bin_count = mapping.GetQBinCount();
|
||||
|
||||
std::unique_ptr<FileWriter> writer;
|
||||
try {
|
||||
writer = std::make_unique<FileWriter>(start_message);
|
||||
} catch (const std::exception &e) {
|
||||
logger.Error("Failed to initialize file writer: {}", e.what());
|
||||
exit(EXIT_FAILURE);
|
||||
}
|
||||
|
||||
// 4. Processing Setup
|
||||
int total_images_in_file = reader.GetNumberOfImages();
|
||||
if (end_image < 0 || end_image > total_images_in_file)
|
||||
end_image = total_images_in_file;
|
||||
|
||||
int images_to_process = end_image - start_image;
|
||||
|
||||
if (images_to_process <= 0) {
|
||||
logger.Warning("No images to process (Start: {}, End: {}, Total: {})", start_image, end_image, total_images_in_file);
|
||||
return 0;
|
||||
}
|
||||
|
||||
logger.Info("Starting analysis of {} images (range {}-{}) using {} threads",
|
||||
images_to_process, start_image, end_image, nthreads);
|
||||
|
||||
std::mutex reader_mutex;
|
||||
std::mutex plots_mutex; // Protect shared plot/stats updates if they aren't thread-safe
|
||||
|
||||
std::atomic<int> processed_count = 0;
|
||||
std::atomic<uint64_t> total_uncompressed_bytes = 0;
|
||||
std::atomic<uint64_t> max_image_number_sent = 0;
|
||||
|
||||
// Mimic JFJochReceiver lattice handling (IndexAndRefine handles the logic per thread,
|
||||
// but we need a central accumulator or use the pool's functionality if IndexAndRefine wraps it)
|
||||
// Here we will use per-thread IndexAndRefine which uses the shared thread pool.
|
||||
|
||||
auto start_time = std::chrono::steady_clock::now();
|
||||
|
||||
auto worker = [&](int thread_id) {
|
||||
// Thread-local analysis resources
|
||||
IndexAndRefine indexer(experiment, &indexer_pool);
|
||||
MXAnalysisWithoutFPGA analysis(experiment, mapping, pixel_mask, indexer);
|
||||
|
||||
std::vector<uint8_t> decomp_buffer;
|
||||
AzimuthalIntegrationProfile profile(mapping);
|
||||
|
||||
while (true) {
|
||||
int current_idx_offset = processed_count.fetch_add(1);
|
||||
int image_idx = start_image + current_idx_offset;
|
||||
|
||||
if (image_idx >= end_image) break;
|
||||
|
||||
// Load Image
|
||||
std::shared_ptr<JFJochReaderImage> img;
|
||||
{
|
||||
std::lock_guard<std::mutex> lock(reader_mutex);
|
||||
try {
|
||||
img = reader.LoadImage(image_idx);
|
||||
} catch (const std::exception& e) {
|
||||
logger.Error("Failed to load image {}: {}", image_idx, e.what());
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
if (!img) continue;
|
||||
|
||||
DataMessage msg = img->ImageData();
|
||||
total_uncompressed_bytes += msg.image.GetUncompressedSize();
|
||||
|
||||
auto image_start_time = std::chrono::high_resolution_clock::now();
|
||||
|
||||
// Analyze
|
||||
try {
|
||||
analysis.Analyze(msg, decomp_buffer, profile, spot_settings);
|
||||
} catch (const std::exception& e) {
|
||||
logger.Error("Error analyzing image {}: {}", image_idx, e.what());
|
||||
continue;
|
||||
}
|
||||
|
||||
auto image_end_time = std::chrono::high_resolution_clock::now();
|
||||
std::chrono::duration<float> image_duration = image_end_time - image_start_time;
|
||||
|
||||
// Mimic DataMessage stats from Receiver
|
||||
msg.processing_time_s = image_duration.count();
|
||||
msg.original_number = msg.number;
|
||||
msg.run_number = experiment.GetRunNumber();
|
||||
msg.run_name = experiment.GetRunName();
|
||||
// msg.receiver_free_send_buf <- Not relevant for file analysis
|
||||
// msg.receiver_aq_dev_delay <- Not relevant for file analysis
|
||||
|
||||
// Update Plots/Stats (Thread safe update needed)
|
||||
{
|
||||
std::lock_guard<std::mutex> lock(plots_mutex);
|
||||
plots.Add(msg, profile);
|
||||
}
|
||||
|
||||
// Write Result
|
||||
writer->Write(msg);
|
||||
|
||||
// Update max sent tracking
|
||||
uint64_t current_max = max_image_number_sent.load();
|
||||
while (static_cast<uint64_t>(msg.number) > current_max) {
|
||||
if (max_image_number_sent.compare_exchange_weak(current_max, static_cast<uint64_t>(msg.number)))
|
||||
break;
|
||||
}
|
||||
|
||||
// Progress log
|
||||
if (current_idx_offset > 0 && current_idx_offset % 100 == 0)
|
||||
logger.Info("Processed {} / {} images", current_idx_offset, images_to_process);
|
||||
}
|
||||
|
||||
// Finalize per-thread indexing (if any per-thread aggregation is needed, though pool handles most)
|
||||
// IndexAndRefine doesn't have a per-thread finalize that returns a lattice,
|
||||
// the main lattice determination is usually done on the aggregated results in the pool or main thread
|
||||
};
|
||||
|
||||
// Launch threads
|
||||
std::vector<std::future<void>> futures;
|
||||
futures.reserve(nthreads);
|
||||
for (int i = 0; i < nthreads; ++i) {
|
||||
futures.push_back(std::async(std::launch::async, worker, i));
|
||||
}
|
||||
|
||||
// Wait for completion
|
||||
for (auto &f : futures) {
|
||||
f.get();
|
||||
}
|
||||
|
||||
auto end_time = std::chrono::steady_clock::now();
|
||||
|
||||
// 5. Finalize Statistics and Write EndMessage
|
||||
EndMessage end_msg;
|
||||
end_msg.max_image_number = max_image_number_sent;
|
||||
end_msg.images_collected_count = images_to_process;
|
||||
end_msg.images_sent_to_write_count = images_to_process;
|
||||
end_msg.end_date = time_UTC(std::chrono::system_clock::now());
|
||||
end_msg.run_number = experiment.GetRunNumber();
|
||||
end_msg.run_name = experiment.GetRunName();
|
||||
|
||||
// Gather statistics from plots
|
||||
end_msg.bkg_estimate = plots.GetBkgEstimate();
|
||||
end_msg.indexing_rate = plots.GetIndexingRate();
|
||||
end_msg.az_int_result["dataset"] = plots.GetAzIntProfile();
|
||||
|
||||
// Finalize Indexing (Global) to get rotation lattice
|
||||
// We create a temporary IndexAndRefine to call Finalize() which aggregates pool results
|
||||
IndexAndRefine global_indexer(experiment, &indexer_pool);
|
||||
const auto rotation_indexer_ret = global_indexer.Finalize();
|
||||
|
||||
if (rotation_indexer_ret.has_value()) {
|
||||
end_msg.rotation_lattice = rotation_indexer_ret->lattice;
|
||||
end_msg.rotation_lattice_type = LatticeMessage{
|
||||
.centering = rotation_indexer_ret->search_result.centering,
|
||||
.niggli_class = rotation_indexer_ret->search_result.niggli_class,
|
||||
.crystal_system = rotation_indexer_ret->search_result.system
|
||||
};
|
||||
logger.Info("Rotation Indexing found lattice");
|
||||
}
|
||||
|
||||
// Write End Message
|
||||
writer->WriteHDF5(end_msg);
|
||||
auto stats = writer->Finalize();
|
||||
|
||||
// 6. Report Statistics to Console
|
||||
double processing_time = std::chrono::duration<double>(end_time - start_time).count();
|
||||
double throughput_MBs = static_cast<double>(total_uncompressed_bytes) / (processing_time * 1e6);
|
||||
double frame_rate = static_cast<double>(images_to_process) / processing_time;
|
||||
|
||||
logger.Info("");
|
||||
logger.Info("Processing time: {:.2f} s", processing_time);
|
||||
logger.Info("Frame rate: {:.2f} Hz", frame_rate);
|
||||
logger.Info("Total throughput:{:.2f} MB/s", throughput_MBs);
|
||||
logger.Info("Images written: {}", stats.size());
|
||||
|
||||
// Print extended stats similar to Receiver
|
||||
if (!end_msg.indexing_rate.has_value()) {
|
||||
logger.Info("Indexing rate: {:.2f}%", end_msg.indexing_rate.value() * 100.0);
|
||||
}
|
||||
if (rotation_indexer_ret.has_value()) {
|
||||
auto uc = rotation_indexer_ret->lattice.GetUnitCell();
|
||||
logger.Info("Lattice: a={:.2f} b={:.2f} c={:.2f} alpha={:.2f} beta={:.2f} gamma={:.2f}",
|
||||
uc.a, uc.b, uc.c, uc.alpha, uc.beta, uc.gamma);
|
||||
}
|
||||
|
||||
if (stats.empty()) {
|
||||
logger.Info("Analysis finished with warnings (no files written)");
|
||||
exit(EXIT_FAILURE);
|
||||
} else {
|
||||
logger.Info("Analysis successfully finished");
|
||||
exit(EXIT_SUCCESS);
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user