Compare commits
7 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| f7a9e4eab1 | |||
| 0a2c370983 | |||
| 18a9c0bcc1 | |||
| ab6f6e8f49 | |||
| 949b7597bb | |||
| 77ef33f92e | |||
| 35d394cda7 |
@@ -122,13 +122,11 @@ namespace {
|
||||
delta_phi_(r.delta_phi_deg),
|
||||
lp_(SafeInv(r.rlp, 1.0)),
|
||||
c1_(r.zeta / std::sqrt(2.0)),
|
||||
inv_2d2_(0.5 / (static_cast<double>(r.d) * static_cast<double>(r.d))),
|
||||
partiality_(r.partiality) {
|
||||
}
|
||||
|
||||
template<typename T>
|
||||
bool operator()(const T *const G,
|
||||
const T *const B,
|
||||
const T *const mosaicity,
|
||||
const T *const Itrue,
|
||||
const T *const wedge,
|
||||
@@ -142,8 +140,7 @@ namespace {
|
||||
} else
|
||||
partiality = T(1.0);
|
||||
|
||||
const T bscale = ceres::exp(-B[0] * T(inv_2d2_));
|
||||
const T Ipred = G[0] * bscale * partiality * T(lp_) * Itrue[0];
|
||||
const T Ipred = G[0] * partiality * T(lp_) * Itrue[0];
|
||||
residual[0] = (Ipred - T(Iobs_)) * T(weight_);
|
||||
return true;
|
||||
}
|
||||
@@ -153,7 +150,6 @@ namespace {
|
||||
double delta_phi_;
|
||||
double lp_;
|
||||
double c1_;
|
||||
double inv_2d2_;
|
||||
double partiality_;
|
||||
};
|
||||
|
||||
@@ -162,19 +158,16 @@ namespace {
|
||||
: Iobs_(static_cast<double>(r.I)),
|
||||
weight_(SafeInv(sigma_obs, 1.0)),
|
||||
lp_(SafeInv(r.rlp, 1.0)),
|
||||
dist_ewald_sq_(r.dist_ewald * r.dist_ewald),
|
||||
inv_2d2_(0.5 / (static_cast<double>(r.d) * static_cast<double>(r.d))) {
|
||||
dist_ewald_sq_(r.dist_ewald * r.dist_ewald) {
|
||||
}
|
||||
|
||||
template<typename T>
|
||||
bool operator()(const T *const G,
|
||||
const T *const B,
|
||||
const T *const R,
|
||||
const T *const Itrue,
|
||||
T *residual) const {
|
||||
const T partiality = ceres::exp(-T(dist_ewald_sq_) / R[0]);
|
||||
const T bscale = ceres::exp(-B[0] * T(inv_2d2_));
|
||||
const T Ipred = G[0] * bscale * partiality * T(lp_) * Itrue[0];
|
||||
const T partiality = ceres::exp(-T(dist_ewald_sq_)/R[0]);
|
||||
const T Ipred = G[0] * partiality * T(lp_) * Itrue[0];
|
||||
residual[0] = (Ipred - T(Iobs_)) * T(weight_);
|
||||
return true;
|
||||
}
|
||||
@@ -183,24 +176,20 @@ namespace {
|
||||
double weight_;
|
||||
double lp_;
|
||||
double dist_ewald_sq_;
|
||||
double inv_2d2_;
|
||||
};
|
||||
|
||||
struct IntensityFixedResidual {
|
||||
IntensityFixedResidual(const Reflection &r, double sigma_obs, double partiality)
|
||||
: Iobs_(static_cast<double>(r.I)),
|
||||
weight_(SafeInv(sigma_obs, 1.0)),
|
||||
corr_(partiality * SafeInv(r.rlp, 1.0)),
|
||||
inv_2d2_(0.5 / (static_cast<double>(r.d) * static_cast<double>(r.d))) {
|
||||
}
|
||||
corr_(partiality * SafeInv(r.rlp, 1.0))
|
||||
{}
|
||||
|
||||
template<typename T>
|
||||
bool operator()(const T *const G,
|
||||
const T *const B,
|
||||
const T *const Itrue,
|
||||
T *residual) const {
|
||||
const T bscale = ceres::exp(-B[0] * T(inv_2d2_));
|
||||
const T Ipred = T(corr_) * G[0] * bscale * Itrue[0];
|
||||
const T Ipred = T(corr_) * G[0] * Itrue[0];
|
||||
residual[0] = (Ipred - T(Iobs_)) * T(weight_);
|
||||
return true;
|
||||
}
|
||||
@@ -208,7 +197,6 @@ namespace {
|
||||
double Iobs_;
|
||||
double weight_;
|
||||
double corr_;
|
||||
double inv_2d2_;
|
||||
};
|
||||
|
||||
struct ScaleRegularizationResidual {
|
||||
@@ -242,21 +230,6 @@ namespace {
|
||||
double inv_sigma_;
|
||||
};
|
||||
|
||||
struct ValueRegularizationResidual {
|
||||
ValueRegularizationResidual(double target, double sigma)
|
||||
: target_(target), inv_sigma_(SafeInv(sigma, 1.0)) {
|
||||
}
|
||||
|
||||
template<typename T>
|
||||
bool operator()(const T *const x, T *residual) const {
|
||||
residual[0] = (x[0] - T(target_)) * T(inv_sigma_);
|
||||
return true;
|
||||
}
|
||||
|
||||
double target_;
|
||||
double inv_sigma_;
|
||||
};
|
||||
|
||||
struct ObsRef {
|
||||
const Reflection *r = nullptr;
|
||||
int img_id = 0;
|
||||
@@ -275,7 +248,6 @@ namespace {
|
||||
std::vector<double> &g,
|
||||
std::vector<double> &mosaicity,
|
||||
std::vector<double> &R_sq,
|
||||
std::vector<double> &image_bfactor,
|
||||
const std::vector<uint8_t> &image_slot_used,
|
||||
bool rotation_crystallography,
|
||||
size_t nhkl,
|
||||
@@ -310,45 +282,41 @@ namespace {
|
||||
for (const auto &o: obs) {
|
||||
switch (opt.partiality_model) {
|
||||
case ScaleMergeOptions::PartialityModel::Rotation: {
|
||||
auto *cost = new ceres::AutoDiffCostFunction<IntensityRotResidual, 1, 1, 1, 1, 1, 1>(
|
||||
auto *cost = new ceres::AutoDiffCostFunction<IntensityRotResidual, 1, 1, 1, 1, 1>(
|
||||
new IntensityRotResidual(*o.r, o.sigma, opt.wedge_deg.value_or(0.0)));
|
||||
problem.AddResidualBlock(cost,
|
||||
nullptr,
|
||||
&g[o.img_id],
|
||||
&image_bfactor[o.img_id],
|
||||
&mosaicity[o.img_id],
|
||||
&Itrue[o.hkl_slot],
|
||||
&wedge);
|
||||
}
|
||||
break;
|
||||
case ScaleMergeOptions::PartialityModel::Still: {
|
||||
auto *cost = new ceres::AutoDiffCostFunction<IntensityStillResidual, 1, 1, 1, 1, 1>(
|
||||
auto *cost = new ceres::AutoDiffCostFunction<IntensityStillResidual, 1, 1, 1, 1>(
|
||||
new IntensityStillResidual(*o.r, o.sigma));
|
||||
problem.AddResidualBlock(cost,
|
||||
nullptr,
|
||||
&g[o.img_id],
|
||||
&image_bfactor[o.img_id],
|
||||
&R_sq[o.img_id],
|
||||
&Itrue[o.hkl_slot]);
|
||||
}
|
||||
break;
|
||||
case ScaleMergeOptions::PartialityModel::Unity: {
|
||||
auto *cost = new ceres::AutoDiffCostFunction<IntensityFixedResidual, 1, 1, 1, 1>(
|
||||
auto *cost = new ceres::AutoDiffCostFunction<IntensityFixedResidual, 1, 1, 1>(
|
||||
new IntensityFixedResidual(*o.r, o.sigma, 1.0));
|
||||
problem.AddResidualBlock(cost,
|
||||
nullptr,
|
||||
&g[o.img_id],
|
||||
&image_bfactor[o.img_id],
|
||||
&Itrue[o.hkl_slot]);
|
||||
}
|
||||
break;
|
||||
case ScaleMergeOptions::PartialityModel::Fixed: {
|
||||
auto *cost = new ceres::AutoDiffCostFunction<IntensityFixedResidual, 1, 1, 1, 1>(
|
||||
auto *cost = new ceres::AutoDiffCostFunction<IntensityFixedResidual, 1, 1, 1>(
|
||||
new IntensityFixedResidual(*o.r, o.sigma, o.r->partiality));
|
||||
problem.AddResidualBlock(cost,
|
||||
nullptr,
|
||||
&g[o.img_id],
|
||||
&image_bfactor[o.img_id],
|
||||
&Itrue[o.hkl_slot]);
|
||||
}
|
||||
break;
|
||||
@@ -359,12 +327,8 @@ namespace {
|
||||
for (int i = 0; i < g.size(); ++i) {
|
||||
if (image_slot_used[i]) {
|
||||
auto *cost = new ceres::AutoDiffCostFunction<ScaleRegularizationResidual, 1, 1>(
|
||||
new ScaleRegularizationResidual(opt.scale_regularization_sigma));
|
||||
new ScaleRegularizationResidual(0.05));
|
||||
problem.AddResidualBlock(cost, nullptr, &g[i]);
|
||||
|
||||
auto *bcost = new ceres::AutoDiffCostFunction<ValueRegularizationResidual, 1, 1>(
|
||||
new ValueRegularizationResidual(0.0, opt.image_bfactor_regularization_sigma_A2));
|
||||
problem.AddResidualBlock(bcost, nullptr, &image_bfactor[i]);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -400,20 +364,13 @@ namespace {
|
||||
}
|
||||
}
|
||||
|
||||
// Scaling factors must be always positive
|
||||
for (int i = 0; i < g.size(); i++) {
|
||||
if (image_slot_used[i])
|
||||
problem.SetParameterLowerBound(&g[i], 0, 1e-12);
|
||||
}
|
||||
|
||||
for (int i = 0; i < image_bfactor.size(); ++i) {
|
||||
if (!image_slot_used[i])
|
||||
continue;
|
||||
problem.SetParameterLowerBound(&image_bfactor[i], 0, opt.image_bfactor_min_A2);
|
||||
problem.SetParameterUpperBound(&image_bfactor[i], 0, opt.image_bfactor_max_A2);
|
||||
if (!opt.refine_image_bfactor)
|
||||
problem.SetParameterBlockConstant(&image_bfactor[i]);
|
||||
}
|
||||
|
||||
// Mosaicity refinement + bounds
|
||||
if (opt.partiality_model == ScaleMergeOptions::PartialityModel::Rotation) {
|
||||
for (int i = 0; i < mosaicity.size(); ++i) {
|
||||
if (image_slot_used[i]) {
|
||||
@@ -427,9 +384,10 @@ namespace {
|
||||
problem.SetParameterLowerBound(&wedge, 0, 0.0);
|
||||
}
|
||||
|
||||
// use all available threads
|
||||
unsigned int hw = std::thread::hardware_concurrency();
|
||||
if (hw == 0)
|
||||
hw = 1;
|
||||
hw = 1; // fallback
|
||||
|
||||
ceres::Solver::Options options;
|
||||
|
||||
@@ -630,14 +588,14 @@ namespace {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void calc_obs(const ScaleMergeOptions &opt,
|
||||
std::vector<double> &g,
|
||||
std::vector<double> &mosaicity,
|
||||
std::vector<double> &R_sq,
|
||||
std::vector<double> &image_bfactor,
|
||||
const std::vector<ObsRef> &obs,
|
||||
std::vector<CorrectedObs> &corr_obs) {
|
||||
std::vector<double> &g,
|
||||
std::vector<double> &mosaicity,
|
||||
std::vector<double> &R_sq,
|
||||
const std::vector<ObsRef> &obs,
|
||||
std::vector<CorrectedObs> &corr_obs) {
|
||||
|
||||
// ---- Compute corrected observations once (used for both merging and statistics) ----
|
||||
const double half_wedge = opt.wedge_deg.value_or(0.0) / 2.0;
|
||||
|
||||
for (const auto &o: obs) {
|
||||
@@ -645,6 +603,7 @@ namespace {
|
||||
const double lp = SafeInv(static_cast<double>(r.rlp), 1.0);
|
||||
const double G_i = g[o.img_id];
|
||||
|
||||
// Compute partiality with refined mosaicity
|
||||
double partiality = 1.0;
|
||||
|
||||
switch (opt.partiality_model) {
|
||||
@@ -657,7 +616,7 @@ namespace {
|
||||
const double arg_minus = (r.delta_phi_deg - half_wedge) * c1 / mosaicity[o.img_id];
|
||||
partiality = (std::erf(arg_plus) - std::erf(arg_minus)) / 2.0;
|
||||
}
|
||||
break;
|
||||
break;
|
||||
case ScaleMergeOptions::PartialityModel::Still:
|
||||
partiality = std::exp(-r.dist_ewald * r.dist_ewald / R_sq[o.img_id]);
|
||||
break;
|
||||
@@ -668,8 +627,7 @@ namespace {
|
||||
if (partiality <= opt.min_partiality_for_merge)
|
||||
continue;
|
||||
|
||||
const double bscale = std::exp(-image_bfactor[o.img_id] / (2.0 * r.d * r.d));
|
||||
const double correction = G_i * bscale * partiality * lp;
|
||||
const double correction = G_i * partiality * lp;
|
||||
if (correction <= 0.0)
|
||||
continue;
|
||||
|
||||
@@ -767,32 +725,28 @@ ScaleMergeResult ScaleAndMergeReflectionsCeres(const std::vector<std::vector<Ref
|
||||
std::vector<double> g(n_image_slots, 1.0);
|
||||
std::vector<double> mosaicity(n_image_slots, opt.mosaicity_init_deg);
|
||||
std::vector<double> R_sq(n_image_slots, 0.001 * 0.001);
|
||||
std::vector<double> image_bfactor(n_image_slots, opt.image_bfactor_init_A2);
|
||||
|
||||
for (int i = 0; i < n_image_slots; i++) {
|
||||
if (!image_slot_used[i]) {
|
||||
mosaicity[i] = NAN;
|
||||
g[i] = NAN;
|
||||
R_sq[i] = NAN;
|
||||
image_bfactor[i] = NAN;
|
||||
} else if (opt.mosaicity_init_deg_vec.size() > i && std::isfinite(opt.mosaicity_init_deg_vec[i])) {
|
||||
mosaicity[i] = opt.mosaicity_init_deg_vec[i];
|
||||
}
|
||||
}
|
||||
|
||||
scale(opt, g, mosaicity, R_sq, image_bfactor, image_slot_used, rotation_crystallography, nhkl, obs);
|
||||
scale(opt, g, mosaicity, R_sq, image_slot_used, rotation_crystallography, nhkl, obs);
|
||||
|
||||
ScaleMergeResult out;
|
||||
|
||||
out.image_scale_g.resize(observations.size(), NAN);
|
||||
out.mosaicity_deg.resize(observations.size(), NAN);
|
||||
out.image_bfactor.resize(observations.size(), NAN);
|
||||
for (int i = 0; i < observations.size(); i++) {
|
||||
size_t img_slot = i / opt.image_cluster;
|
||||
if (image_slot_used[img_slot]) {
|
||||
out.image_scale_g[i] = g[img_slot];
|
||||
out.mosaicity_deg[i] = mosaicity[img_slot];
|
||||
out.image_bfactor[i] = image_bfactor[img_slot];
|
||||
}
|
||||
}
|
||||
|
||||
@@ -810,6 +764,7 @@ ScaleMergeResult ScaleAndMergeReflectionsCeres(const std::vector<std::vector<Ref
|
||||
out.merged[h].d = 0.0;
|
||||
}
|
||||
|
||||
// Populate d from median of observations per HKL
|
||||
{
|
||||
std::vector<std::vector<double> > per_hkl_d(nhkl);
|
||||
for (const auto &o: obs) {
|
||||
@@ -826,7 +781,7 @@ ScaleMergeResult ScaleAndMergeReflectionsCeres(const std::vector<std::vector<Ref
|
||||
}
|
||||
}
|
||||
|
||||
calc_obs(opt, g, mosaicity, R_sq, image_bfactor, obs, corr_obs);
|
||||
calc_obs(opt, g, mosaicity, R_sq, obs, corr_obs);
|
||||
merge(nhkl, out, corr_obs);
|
||||
stats(opt, nhkl, out, corr_obs);
|
||||
|
||||
|
||||
@@ -50,12 +50,6 @@ struct ScaleMergeOptions {
|
||||
|
||||
bool refine_wedge = false;
|
||||
|
||||
bool refine_image_bfactor = false;
|
||||
double image_bfactor_init_A2 = 0.0;
|
||||
double image_bfactor_min_A2 = -50.0;
|
||||
double image_bfactor_max_A2 = 100.0;
|
||||
double image_bfactor_regularization_sigma_A2 = 20.0;
|
||||
|
||||
enum class PartialityModel {Fixed, Rotation, Unity, Still} partiality_model = PartialityModel::Fixed;
|
||||
};
|
||||
|
||||
@@ -91,7 +85,7 @@ struct ScaleMergeResult {
|
||||
std::vector<MergedReflection> merged;
|
||||
std::vector<float> image_scale_g;
|
||||
std::vector<float> mosaicity_deg;
|
||||
std::vector<float> image_bfactor;
|
||||
|
||||
/// Per-shell and overall merging statistics (populated after merging)
|
||||
MergeStatistics statistics;
|
||||
};
|
||||
|
||||
@@ -37,22 +37,21 @@ void print_usage(Logger &logger) {
|
||||
logger.Info(" -s<num> Start image number (default: 0)");
|
||||
logger.Info(" -e<num> End image number (default: all)");
|
||||
logger.Info(" -v Verbose output");
|
||||
logger.Info(" -d<num> High resolution limit for spot finding (default: 1.5)");
|
||||
logger.Info(" -T<num> Noise sigma level for spot finding (default: 3.0)");
|
||||
logger.Info(" -t<num> Photon count threshold for spot finding (default: 10)");
|
||||
logger.Info(" -c<num> Max spot count (default: 250)");
|
||||
logger.Info(" -R[num] Rotation indexing (optional: min angular range deg)");
|
||||
logger.Info(" -F Use FFT indexing algorithm (shortcut for -XFFT)");
|
||||
logger.Info(" -X<txt> Indexing algorithm (FFBIDX|FFT|FFTW|Auto|None)");
|
||||
logger.Info(" -x No least-square beam center refinement");
|
||||
logger.Info(" -C<cell> Fix reference unit cell: -C\"a,b,c,alpha,beta,gamma\" (comma-separated, no spaces; quotes optional)");
|
||||
logger.Info(" -M Scale and merge (refine mosaicity) and write scaled.hkl + image.dat");
|
||||
logger.Info(" -B Refine per image B-factor in scaling");
|
||||
logger.Info(" -S<num> Space group number");
|
||||
logger.Info(" -P<txt> Partiality refinement fixed|rot|unity (default: fixed)");
|
||||
logger.Info(" -d<num> High resolution limit for spot finding (default: 1.5)");
|
||||
logger.Info(" -D<num> High resolution limit for scaling/merging (default: 0.0; no limit)");
|
||||
logger.Info(" -S<num> Space group number");
|
||||
logger.Info(" -M Scale and merge (refine mosaicity) and write scaled.hkl + image.dat");
|
||||
logger.Info(" -P<txt> Partiality refinement fixed|rot|unity (default: fixed)");
|
||||
logger.Info(" -A Anomalous mode (don't merge Friedel pairs)");
|
||||
logger.Info(" -C<cell> Fix reference unit cell: -C\"a,b,c,alpha,beta,gamma\" (comma-separated, no spaces; quotes optional)");
|
||||
logger.Info(" -c<num> Max spot count (default: 250)");
|
||||
logger.Info(" -W HDF5 file with analysis results is written");
|
||||
logger.Info(" -T<num> Noise sigma level for spot finding (default: 3.0)");
|
||||
logger.Info(" -t<num> Photon count threshold for spot finding (default: 10)");
|
||||
}
|
||||
|
||||
void trim_in_place(std::string& t) {
|
||||
@@ -138,7 +137,6 @@ int main(int argc, char **argv) {
|
||||
std::optional<int64_t> max_spot_count_override;
|
||||
float sigma_spot_finding = 3.0;
|
||||
int64_t photon_count_threshold_spot_finding = 10;
|
||||
bool refine_b_factor = false;
|
||||
|
||||
IndexingAlgorithmEnum indexing_algorithm = IndexingAlgorithmEnum::Auto;
|
||||
|
||||
@@ -153,7 +151,7 @@ int main(int argc, char **argv) {
|
||||
}
|
||||
|
||||
int opt;
|
||||
while ((opt = getopt(argc, argv, "o:N:s:e:vc:R::FX:xd:S:MP:AD:C:T:t:WB")) != -1) {
|
||||
while ((opt = getopt(argc, argv, "o:N:s:e:vc:R::FX:xd:S:MP:AD:C:T:t:W")) != -1) {
|
||||
switch (opt) {
|
||||
case 'o':
|
||||
output_prefix = optarg;
|
||||
@@ -179,9 +177,6 @@ int main(int argc, char **argv) {
|
||||
case 'c':
|
||||
max_spot_count_override = atoll(optarg);
|
||||
break;
|
||||
case 'B':
|
||||
refine_b_factor = true;
|
||||
break;
|
||||
case 'R':
|
||||
rotation_indexing = true;
|
||||
if (optarg) rotation_indexing_range = atof(optarg);
|
||||
@@ -552,7 +547,6 @@ int main(int argc, char **argv) {
|
||||
scale_opts.max_solver_time_s = 240.0; // generous cutoff for now
|
||||
scale_opts.merge_friedel = !anomalous_mode;
|
||||
scale_opts.d_min_limit_A = d_min_scale_merge.value_or(0.0);
|
||||
scale_opts.refine_image_bfactor = refine_b_factor;
|
||||
|
||||
const bool fixed_space_group = space_group || experiment.GetGemmiSpaceGroup().has_value();
|
||||
|
||||
|
||||
+46
-4
@@ -86,7 +86,24 @@ void FileWriter::WriteHDF5(const DataMessage& msg) {
|
||||
if (format == FileWriterFormat::NXmxIntegrated && master_file)
|
||||
files[file_number]->CreateFile(msg, master_file->GetFile());
|
||||
}
|
||||
files[file_number]->Write(msg, image_number);
|
||||
|
||||
// If the file is already broken from a previous write, fail fast and stop
|
||||
// touching it. Mark as closed so we don't try again.
|
||||
if (files[file_number]->IsBroken()) {
|
||||
files[file_number].reset();
|
||||
closed_files.insert(file_number);
|
||||
throw JFJochException(JFJochExceptionCategory::FileWriteError,
|
||||
"Data file " + std::to_string(file_number) + " is broken; aborting writes");
|
||||
}
|
||||
|
||||
try {
|
||||
files[file_number]->Write(msg, image_number);
|
||||
} catch (...) {
|
||||
// Don't drop the file yet — we still want to attempt a clean Close()
|
||||
// (which itself is broken-aware) during Finalize, so the .tmp file
|
||||
// gets unlinked and we don't publish a corrupt .h5 .
|
||||
throw;
|
||||
}
|
||||
|
||||
if (files[file_number]->GetNumImages() == start_message.images_per_file) {
|
||||
CloseFile(file_number);
|
||||
@@ -103,7 +120,15 @@ void FileWriter::CloseFile(uint64_t file_number) {
|
||||
if (closed_files.contains(file_number))
|
||||
return;
|
||||
|
||||
AddStats(files[file_number]->Close());
|
||||
try {
|
||||
AddStats(files[file_number]->Close());
|
||||
} catch (...) {
|
||||
// Even if Close() failed, mark the slot as closed and drop the handle.
|
||||
// Re-throw so StreamWriter goes to error state.
|
||||
files[file_number].reset();
|
||||
closed_files.insert(file_number);
|
||||
throw;
|
||||
}
|
||||
files[file_number].reset();
|
||||
closed_files.insert(file_number);
|
||||
}
|
||||
@@ -126,12 +151,23 @@ void FileWriter::CloseOldFiles(uint64_t current_image_number) {
|
||||
std::vector<HDF5DataFileStatistics> FileWriter::Finalize() {
|
||||
std::lock_guard<std::mutex> lock(hdf5_mutex);
|
||||
|
||||
std::exception_ptr first_err;
|
||||
for (uint64_t f = 0; f < files.size(); ++f) {
|
||||
if (files[f] && !closed_files.contains(f))
|
||||
CloseFile(f);
|
||||
if (files[f] && !closed_files.contains(f)) {
|
||||
try {
|
||||
CloseFile(f);
|
||||
} catch (...) {
|
||||
if (!first_err) first_err = std::current_exception();
|
||||
}
|
||||
}
|
||||
}
|
||||
// Master is finalized via WriteHDF5(EndMessage); here we only release it.
|
||||
if (master_file)
|
||||
master_file.reset();
|
||||
|
||||
if (first_err)
|
||||
std::rethrow_exception(first_err);
|
||||
|
||||
return stats;
|
||||
}
|
||||
|
||||
@@ -199,14 +235,20 @@ void FileWriter::CreateHDF5MasterFile(const StartMessage &msg) {
|
||||
void FileWriter::WriteHDF5(const CompressedImage &msg) {
|
||||
if (master_file) {
|
||||
std::lock_guard<std::mutex> lock(hdf5_mutex);
|
||||
if (master_file->IsBroken()) {
|
||||
spdlog::warn("Calibration {} not written: master file is broken", msg.GetChannel());
|
||||
return;
|
||||
}
|
||||
try {
|
||||
master_file->WriteCalibration(msg);
|
||||
} catch (const JFJochException &e) {
|
||||
spdlog::error("Calibration {} not written {}", msg.GetChannel(), e.what());
|
||||
// master is now marked broken inside WriteCalibration; future calls will short-circuit
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
void FileWriter::WriteHDF5(const EndMessage &msg) {
|
||||
if (master_file) {
|
||||
std::lock_guard<std::mutex> lock(hdf5_mutex);
|
||||
|
||||
+105
-39
@@ -60,27 +60,79 @@ std::optional<HDF5DataFileStatistics> HDF5DataFile::Close() {
|
||||
if (!data_file)
|
||||
return {};
|
||||
|
||||
HDF5Group group_exp(*data_file, "/entry/detector");
|
||||
group_exp.NXClass("NXcollection");
|
||||
|
||||
group_exp.SaveVector("timestamp", timestamp);
|
||||
group_exp.SaveVector("exptime", exptime);
|
||||
group_exp.SaveVector("number", number);
|
||||
|
||||
for (auto &p: plugins)
|
||||
p->WriteFinal(*data_file);
|
||||
|
||||
if (data_set) {
|
||||
data_set->SetExtent({max_image_number + 1, ypixel, xpixel});
|
||||
data_set
|
||||
->Attr("image_nr_low", (int32_t) (image_low + 1))
|
||||
.Attr("image_nr_high", (int32_t) (image_low + 1 + max_image_number));
|
||||
data_set.reset();
|
||||
// If a prior write already failed, do not call ANY further HDF5 routines on
|
||||
// this file (per HDF Forum guidance: behavior after an I/O error is undefined,
|
||||
// and a subsequent H5Fclose can segfault). Just drop the handles and unlink
|
||||
// the tmp file. Do NOT rename to the final name.
|
||||
if (broken) {
|
||||
if (data_set) data_set.reset();
|
||||
if (data_set_image_number) data_set_image_number.reset();
|
||||
data_file.reset();
|
||||
if (manage_file) {
|
||||
std::error_code ec;
|
||||
std::filesystem::remove(tmp_filename, ec);
|
||||
}
|
||||
closed = true;
|
||||
return {};
|
||||
}
|
||||
data_file.reset();
|
||||
|
||||
if (manage_file && (!std::filesystem::exists(filename.c_str()) || overwrite))
|
||||
std::rename(tmp_filename.c_str(), filename.c_str());
|
||||
try {
|
||||
HDF5Group group_exp(*data_file, "/entry/detector");
|
||||
group_exp.NXClass("NXcollection");
|
||||
|
||||
group_exp.SaveVector("timestamp", timestamp);
|
||||
group_exp.SaveVector("exptime", exptime);
|
||||
group_exp.SaveVector("number", number);
|
||||
|
||||
for (auto &p: plugins)
|
||||
p->WriteFinal(*data_file);
|
||||
|
||||
if (data_set) {
|
||||
data_set->SetExtent({max_image_number + 1, ypixel, xpixel});
|
||||
data_set
|
||||
->Attr("image_nr_low", (int32_t) (image_low + 1))
|
||||
.Attr("image_nr_high", (int32_t) (image_low + 1 + max_image_number));
|
||||
data_set->Close();
|
||||
data_set.reset();
|
||||
}
|
||||
} catch (...) {
|
||||
// Anything during finalize failed (most likely ENOSPC). Mark broken,
|
||||
// drop handles without further HDF5 calls, remove tmp, propagate.
|
||||
broken = true;
|
||||
if (data_set) data_set.reset();
|
||||
data_file.reset();
|
||||
if (manage_file) {
|
||||
std::error_code ec;
|
||||
std::filesystem::remove(tmp_filename, ec);
|
||||
}
|
||||
closed = true;
|
||||
throw;
|
||||
}
|
||||
|
||||
if (manage_file) {
|
||||
try {
|
||||
data_file->Close();
|
||||
} catch (...) {
|
||||
broken = true;
|
||||
data_file.reset();
|
||||
std::error_code ec;
|
||||
std::filesystem::remove(tmp_filename, ec);
|
||||
closed = true;
|
||||
throw;
|
||||
}
|
||||
data_file.reset();
|
||||
|
||||
if (std::filesystem::exists(filename) && !overwrite)
|
||||
throw JFJochException(JFJochExceptionCategory::FileWriteError, "File already exists");
|
||||
std::error_code ec;
|
||||
std::filesystem::rename(tmp_filename, filename, ec);
|
||||
if (ec)
|
||||
throw JFJochException(JFJochExceptionCategory::FileWriteError,
|
||||
"Cannot rename temporary HDF5 file " + tmp_filename +
|
||||
" to " + filename + ": " + ec.message());
|
||||
} else {
|
||||
data_file.reset();
|
||||
}
|
||||
|
||||
closed = true;
|
||||
|
||||
@@ -95,11 +147,15 @@ std::optional<HDF5DataFileStatistics> HDF5DataFile::Close() {
|
||||
HDF5DataFile::~HDF5DataFile() {
|
||||
if (data_file) {
|
||||
try {
|
||||
Close();
|
||||
} catch (const std::exception &e) {
|
||||
std::cerr << "HDF5DataFile::~HDF5DataFile: " << e.what() << std::endl;
|
||||
data_set.reset();
|
||||
data_set_image_number.reset();
|
||||
data_file.reset();
|
||||
if (manage_file) {
|
||||
std::error_code ec;
|
||||
std::filesystem::remove(tmp_filename, ec);
|
||||
}
|
||||
} catch (...) {
|
||||
std::cerr << "HDF5DataFile::~HDF5DataFile: Unknown error " << std::endl;
|
||||
// Never throw from destructor; HDF5 may already be in a bad state
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -148,6 +204,9 @@ void HDF5DataFile::Write(const DataMessage &msg, uint64_t image_number) {
|
||||
if (closed)
|
||||
throw JFJochException(JFJochExceptionCategory::FileWriteError,
|
||||
"Trying to write to already closed file");
|
||||
if (broken)
|
||||
throw JFJochException(JFJochExceptionCategory::FileWriteError,
|
||||
"Trying to write to file that previously failed");
|
||||
if (image_number >= images_per_file)
|
||||
throw JFJochException(JFJochExceptionCategory::FileWriteError,
|
||||
"Image number out of bounds");
|
||||
@@ -157,23 +216,30 @@ void HDF5DataFile::Write(const DataMessage &msg, uint64_t image_number) {
|
||||
CreateFile(msg, std::make_shared<HDF5File>(tmp_filename));
|
||||
}
|
||||
|
||||
if (new_file || (static_cast<int64_t>(image_number) > max_image_number)) {
|
||||
max_image_number = image_number;
|
||||
timestamp.resize(max_image_number + 1);
|
||||
exptime.resize(max_image_number + 1);
|
||||
number.resize(max_image_number + 1);
|
||||
new_file = false;
|
||||
try {
|
||||
if (new_file || (static_cast<int64_t>(image_number) > max_image_number)) {
|
||||
max_image_number = image_number;
|
||||
timestamp.resize(max_image_number + 1);
|
||||
exptime.resize(max_image_number + 1);
|
||||
number.resize(max_image_number + 1);
|
||||
new_file = false;
|
||||
}
|
||||
|
||||
nimages++;
|
||||
data_set->WriteDirectChunk(msg.image.GetCompressed(), msg.image.GetCompressedSize(),
|
||||
{image_number, 0, 0});
|
||||
|
||||
for (auto &p: plugins)
|
||||
p->Write(msg, image_number);
|
||||
|
||||
timestamp[image_number] = msg.timestamp;
|
||||
exptime[image_number] = msg.exptime;
|
||||
number[image_number] = (msg.original_number) ? msg.original_number.value() : msg.number;
|
||||
} catch (...) {
|
||||
// Sticky failure: do not call into HDF5 again for this file.
|
||||
broken = true;
|
||||
throw;
|
||||
}
|
||||
|
||||
nimages++;
|
||||
data_set->WriteDirectChunk(msg.image.GetCompressed(), msg.image.GetCompressedSize(), {image_number, 0, 0});
|
||||
|
||||
for (auto &p: plugins)
|
||||
p->Write(msg, image_number);
|
||||
|
||||
timestamp[image_number] = msg.timestamp;
|
||||
exptime[image_number] = msg.exptime;
|
||||
number[image_number] = (msg.original_number) ? msg.original_number.value() : msg.number;
|
||||
}
|
||||
|
||||
size_t HDF5DataFile::GetNumImages() const {
|
||||
|
||||
@@ -44,6 +44,7 @@ class HDF5DataFile {
|
||||
int32_t image_low;
|
||||
|
||||
bool closed = false;
|
||||
bool broken = false;
|
||||
|
||||
bool overwrite = false;
|
||||
int64_t file_number;
|
||||
@@ -55,6 +56,7 @@ public:
|
||||
std::optional<HDF5DataFileStatistics> Close();
|
||||
void Write(const DataMessage& msg, uint64_t image_number);
|
||||
size_t GetNumImages() const;
|
||||
bool IsBroken() const { return broken; }
|
||||
|
||||
void CreateFile(const DataMessage& msg, std::shared_ptr<HDF5File> data_file, bool integrated = false);
|
||||
};
|
||||
|
||||
+107
-46
@@ -35,9 +35,7 @@ NXmx::NXmx(const StartMessage &start)
|
||||
|
||||
MakeDirectory(filename);
|
||||
|
||||
bool v1_10 = (start.file_format == FileWriterFormat::NXmxVDS);
|
||||
|
||||
hdf5_file = std::make_shared<HDF5File>(tmp_filename, v1_10);
|
||||
hdf5_file = std::make_shared<HDF5File>(tmp_filename, true);
|
||||
hdf5_file->Attr("file_name", filename);
|
||||
hdf5_file->Attr("HDF5_Version", hdf5_version());
|
||||
HDF5Group(*hdf5_file, "/entry").NXClass("NXentry").SaveScalar("definition", "NXmx");
|
||||
@@ -54,8 +52,13 @@ NXmx::NXmx(const StartMessage &start)
|
||||
}
|
||||
|
||||
NXmx::~NXmx() {
|
||||
if (!std::filesystem::exists(filename.c_str()) || overwrite)
|
||||
std::rename(tmp_filename.c_str(), filename.c_str());
|
||||
try {
|
||||
if (hdf5_file) {
|
||||
hdf5_file.reset();
|
||||
std::error_code ec;
|
||||
std::filesystem::remove(tmp_filename, ec);
|
||||
}
|
||||
} catch (...) {}
|
||||
}
|
||||
|
||||
|
||||
@@ -373,6 +376,21 @@ void NXmx::Detector(const StartMessage &start) {
|
||||
}
|
||||
|
||||
void NXmx::Detector(const StartMessage &start, const EndMessage &end) {
|
||||
hsize_t total_images = end.max_image_number;
|
||||
hsize_t width = start.image_size_x;
|
||||
hsize_t height = start.image_size_y;
|
||||
|
||||
if (start.file_format == FileWriterFormat::NXmxLegacy) {
|
||||
auto data_dataset = VDS(start,
|
||||
"/entry/instrument/detector/data",
|
||||
{total_images, height, width},
|
||||
HDF5DataType(start.bit_depth_image / 8, start.pixel_signed));
|
||||
data_dataset->Attr("image_nr_low", (int32_t) 1)
|
||||
.Attr("image_nr_high",(int32_t) total_images);
|
||||
} else {
|
||||
hdf5_file->HardLink("/entry/data/data", "/entry/instrument/detector/data");
|
||||
}
|
||||
|
||||
if (start.images_per_trigger.has_value() && start.images_per_trigger.value() > 0) {
|
||||
SaveScalar(*hdf5_file, "/entry/instrument/detector/detectorSpecific/nimages", start.images_per_trigger.value());
|
||||
SaveScalar(*hdf5_file, "/entry/instrument/detector/detectorSpecific/ntrigger", (end.max_image_number + start.images_per_trigger.value() - 1)/ start.images_per_trigger.value());
|
||||
@@ -630,11 +648,19 @@ void NXmx::Attenuator(const StartMessage &start) {
|
||||
}
|
||||
|
||||
void NXmx::WriteCalibration(const CompressedImage &image) {
|
||||
if (!calibration_group_created) {
|
||||
calibration_group_created = true;
|
||||
HDF5Group(*hdf5_file, "/entry/instrument/detector/calibration").NXClass("NXcollection");
|
||||
if (broken || !hdf5_file)
|
||||
throw JFJochException(JFJochExceptionCategory::FileWriteError, "HDF5 file already closed");
|
||||
|
||||
try {
|
||||
if (!calibration_group_created) {
|
||||
calibration_group_created = true;
|
||||
HDF5Group(*hdf5_file, "/entry/instrument/detector/calibration").NXClass("NXcollection");
|
||||
}
|
||||
SaveCBORImage("/entry/instrument/detector/calibration/" + image.GetChannel(), image);
|
||||
} catch (...) {
|
||||
broken = true;
|
||||
throw;
|
||||
}
|
||||
SaveCBORImage("/entry/instrument/detector/calibration/" + image.GetChannel(), image);
|
||||
}
|
||||
|
||||
void NXmx::SaveCBORImage(const std::string &hdf5_path, const CompressedImage &image) {
|
||||
@@ -655,6 +681,8 @@ void NXmx::SaveCBORImage(const std::string &hdf5_path, const CompressedImage &im
|
||||
dataset->Write(data_type, image.GetCompressed());
|
||||
else
|
||||
dataset->WriteDirectChunk(image.GetCompressed(), image.GetCompressedSize(), {0, 0});
|
||||
|
||||
dataset->Close();
|
||||
}
|
||||
|
||||
void NXmx::AzimuthalIntegration(const StartMessage &start, const EndMessage &end) {
|
||||
@@ -689,50 +717,83 @@ void NXmx::ADUHistogram(const EndMessage &end) {
|
||||
}
|
||||
|
||||
void NXmx::Finalize(const EndMessage &end) {
|
||||
if (end.end_date) {
|
||||
hdf5_file->Attr("file_time", end.end_date.value());
|
||||
hdf5_file->SaveScalar("/entry/end_time", end.end_date.value());
|
||||
hdf5_file->SaveScalar("/entry/end_time_estimated", end.end_date.value());
|
||||
} else {
|
||||
std::string time_now = time_UTC(std::chrono::system_clock::now());
|
||||
hdf5_file->Attr("file_time", time_now);
|
||||
hdf5_file->SaveScalar("/entry/end_time", time_now);
|
||||
hdf5_file->SaveScalar("/entry/end_time_estimated", time_now);
|
||||
if (!hdf5_file)
|
||||
throw JFJochException(JFJochExceptionCategory::FileWriteError, "HDF5 file already closed");
|
||||
|
||||
if (broken) {
|
||||
// Don't touch HDF5 anymore. Drop handle, unlink tmp, refuse to rename.
|
||||
hdf5_file.reset();
|
||||
std::error_code ec;
|
||||
std::filesystem::remove(tmp_filename, ec);
|
||||
throw JFJochException(JFJochExceptionCategory::FileWriteError,
|
||||
"HDF5 master file is broken (previous I/O error)");
|
||||
}
|
||||
|
||||
Detector(start_message, end);
|
||||
Sample(start_message, end);
|
||||
AzimuthalIntegration(start_message, end);
|
||||
ADUHistogram(end);
|
||||
try {
|
||||
if (end.end_date) {
|
||||
hdf5_file->Attr("file_time", end.end_date.value());
|
||||
hdf5_file->SaveScalar("/entry/end_time", end.end_date.value());
|
||||
hdf5_file->SaveScalar("/entry/end_time_estimated", end.end_date.value());
|
||||
} else {
|
||||
std::string time_now = time_UTC(std::chrono::system_clock::now());
|
||||
hdf5_file->Attr("file_time", time_now);
|
||||
hdf5_file->SaveScalar("/entry/end_time", time_now);
|
||||
hdf5_file->SaveScalar("/entry/end_time_estimated", time_now);
|
||||
}
|
||||
|
||||
switch (start_message.file_format.value_or(FileWriterFormat::NXmxLegacy)) {
|
||||
case FileWriterFormat::NXmxLegacy:
|
||||
LinkToData(start_message, end);
|
||||
break;
|
||||
case FileWriterFormat::NXmxVDS:
|
||||
LinkToData_VDS(start_message, end);
|
||||
break;
|
||||
case FileWriterFormat::NXmxIntegrated:
|
||||
default:
|
||||
break;
|
||||
Detector(start_message, end);
|
||||
Sample(start_message, end);
|
||||
AzimuthalIntegration(start_message, end);
|
||||
ADUHistogram(end);
|
||||
|
||||
switch (start_message.file_format.value_or(FileWriterFormat::NXmxLegacy)) {
|
||||
case FileWriterFormat::NXmxLegacy:
|
||||
LinkToData(start_message, end);
|
||||
break;
|
||||
case FileWriterFormat::NXmxVDS:
|
||||
LinkToData_VDS(start_message, end);
|
||||
break;
|
||||
case FileWriterFormat::NXmxIntegrated:
|
||||
default:
|
||||
break;
|
||||
}
|
||||
|
||||
if (end.rotation_lattice)
|
||||
SaveVector(*hdf5_file, "/entry/MX/rotationLatticeIndexed", end.rotation_lattice->GetVector())
|
||||
->Units("Angstrom");
|
||||
|
||||
if (end.rotation_lattice_type)
|
||||
SaveScalar(*hdf5_file, "/entry/MX/rotationLatticeNiggliClass", end.rotation_lattice_type->niggli_class);
|
||||
|
||||
if (end.indexing_rate)
|
||||
SaveScalar(*hdf5_file, "/entry/MX/imageIndexedMean", end.indexing_rate.value());
|
||||
if (end.bkg_estimate)
|
||||
SaveScalar(*hdf5_file, "/entry/MX/bkgEstimateMean", end.bkg_estimate.value());
|
||||
|
||||
if (!end.scale_factor.empty())
|
||||
SaveVector(*hdf5_file, "/entry/MX/imageScaleFactor", end.scale_factor);
|
||||
|
||||
hdf5_file->Close();
|
||||
} catch (...) {
|
||||
broken = true;
|
||||
// Drop the file id without further HDF5 calls; remove tmp.
|
||||
hdf5_file.reset();
|
||||
std::error_code ec;
|
||||
std::filesystem::remove(tmp_filename, ec);
|
||||
throw;
|
||||
}
|
||||
|
||||
if (end.rotation_lattice)
|
||||
SaveVector(*hdf5_file, "/entry/MX/rotationLatticeIndexed", end.rotation_lattice->GetVector())
|
||||
->Units("Angstrom");
|
||||
hdf5_file.reset();
|
||||
|
||||
if (end.rotation_lattice_type)
|
||||
SaveScalar(*hdf5_file, "/entry/MX/rotationLatticeNiggliClass", end.rotation_lattice_type->niggli_class);
|
||||
if (std::filesystem::exists(filename) && !overwrite)
|
||||
throw JFJochException(JFJochExceptionCategory::FileWriteError, "File already exists");
|
||||
|
||||
if (end.indexing_rate) {
|
||||
SaveScalar(*hdf5_file, "/entry/MX/imageIndexedMean", end.indexing_rate.value());
|
||||
}
|
||||
if (end.bkg_estimate) {
|
||||
SaveScalar(*hdf5_file, "/entry/MX/bkgEstimateMean", end.bkg_estimate.value());
|
||||
}
|
||||
|
||||
if (!end.scale_factor.empty())
|
||||
SaveVector(*hdf5_file, "/entry/MX/imageScaleFactor", end.scale_factor);
|
||||
std::error_code ec;
|
||||
std::filesystem::rename(tmp_filename, filename, ec);
|
||||
if (ec)
|
||||
throw JFJochException(JFJochExceptionCategory::FileWriteError,
|
||||
"Cannot rename temporary HDF5 master file " + tmp_filename +
|
||||
" to " + filename + ": " + ec.message());
|
||||
}
|
||||
|
||||
void NXmx::UserData(const StartMessage &start) {
|
||||
|
||||
@@ -19,6 +19,7 @@ class NXmx {
|
||||
std::string tmp_filename;
|
||||
bool overwrite = false;
|
||||
bool calibration_group_created = false;
|
||||
bool broken = false;
|
||||
|
||||
void LinkToData(const StartMessage &start, const EndMessage &end);
|
||||
void LinkToData_VDS(const StartMessage &start, const EndMessage &end);
|
||||
@@ -58,6 +59,7 @@ public:
|
||||
NXmx& operator=(const NXmx &other) = delete;
|
||||
void Finalize(const EndMessage &end);
|
||||
void WriteCalibration(const CompressedImage &image);
|
||||
bool IsBroken() const { return broken; }
|
||||
|
||||
std::shared_ptr<HDF5File> GetFile();
|
||||
};
|
||||
|
||||
+86
-8
@@ -90,7 +90,12 @@ void HDF5DataSpace::SelectHyperslabWithStride(const std::vector<hsize_t> &start,
|
||||
}
|
||||
|
||||
HDF5DataSpace::~HDF5DataSpace() {
|
||||
if (id >= 0) H5Sclose(id);
|
||||
if (id >= 0) {
|
||||
H5E_BEGIN_TRY {
|
||||
H5Sclose(id);
|
||||
} H5E_END_TRY;
|
||||
id = -1;
|
||||
}
|
||||
}
|
||||
|
||||
uint8_t HDF5DataSpace::GetNumOfDimensions() const {
|
||||
@@ -204,7 +209,12 @@ HDF5DataType::HDF5DataType(const HDF5DataSet &data_set) :HDF5Id() {
|
||||
}
|
||||
|
||||
HDF5DataType::~HDF5DataType() {
|
||||
if (id >= 0) H5Tclose(id);
|
||||
if (id >= 0) {
|
||||
H5E_BEGIN_TRY {
|
||||
H5Tclose(id);
|
||||
} H5E_END_TRY;
|
||||
id = -1;
|
||||
}
|
||||
}
|
||||
|
||||
size_t HDF5DataType::GetElemSize() const {
|
||||
@@ -251,7 +261,12 @@ HDF5Dcpl::HDF5Dcpl(const HDF5DataSet &data_set) : HDF5Id() {
|
||||
}
|
||||
|
||||
HDF5Dcpl::~HDF5Dcpl() {
|
||||
if (id >= 0) H5Pclose(id);
|
||||
if (id >= 0) {
|
||||
H5E_BEGIN_TRY {
|
||||
H5Pclose(id);
|
||||
} H5E_END_TRY;
|
||||
id = -1;
|
||||
}
|
||||
}
|
||||
|
||||
void HDF5Dcpl::SetChunking(const std::vector<hsize_t> &dims) {
|
||||
@@ -308,7 +323,12 @@ HDF5Fapl::HDF5Fapl() : HDF5Id() {
|
||||
}
|
||||
|
||||
HDF5Fapl::~HDF5Fapl() {
|
||||
H5Pclose(id);
|
||||
if (id >= 0) {
|
||||
H5E_BEGIN_TRY {
|
||||
H5Pclose(id);
|
||||
} H5E_END_TRY;
|
||||
id = -1;
|
||||
}
|
||||
}
|
||||
|
||||
void HDF5Fapl::SetVersionTo1p10orNewer() {
|
||||
@@ -685,7 +705,12 @@ HDF5Group::HDF5Group(const HDF5Object& parent, const char *name) : HDF5Object()
|
||||
}
|
||||
|
||||
HDF5Group::~HDF5Group() {
|
||||
H5Gclose(id);
|
||||
if (id >= 0) {
|
||||
H5E_BEGIN_TRY {
|
||||
H5Gclose(id);
|
||||
} H5E_END_TRY;
|
||||
id = -1;
|
||||
}
|
||||
}
|
||||
|
||||
HDF5File::HDF5File(const std::string& filename, bool v1_10) : HDF5Object() {
|
||||
@@ -698,8 +723,37 @@ HDF5File::HDF5File(const std::string& filename, bool v1_10) : HDF5Object() {
|
||||
throw JFJochException(JFJochExceptionCategory::HDF5, "Cannot open/create data HDF5 file " + filename);
|
||||
}
|
||||
|
||||
void HDF5File::Close() {
|
||||
if (id < 0)
|
||||
return;
|
||||
|
||||
// Invalidate first; if anything below fails (e.g. ENOSPC) we must NOT
|
||||
// leave a live id behind for the destructor or later code to touch.
|
||||
const hid_t local_id = id;
|
||||
id = -1;
|
||||
|
||||
herr_t flush_err = 0;
|
||||
H5E_BEGIN_TRY {
|
||||
flush_err = H5Fflush(local_id, H5F_SCOPE_GLOBAL);
|
||||
} H5E_END_TRY;
|
||||
|
||||
herr_t close_err = 0;
|
||||
H5E_BEGIN_TRY {
|
||||
close_err = H5Fclose(local_id);
|
||||
} H5E_END_TRY;
|
||||
|
||||
if (flush_err < 0 || close_err < 0)
|
||||
throw JFJochException(JFJochExceptionCategory::HDF5,
|
||||
"Failed to flush/close HDF5 file (likely no space left on device)");
|
||||
}
|
||||
|
||||
HDF5File::~HDF5File() {
|
||||
if (id >= 0) H5Fclose(id);
|
||||
if (id >= 0) {
|
||||
H5E_BEGIN_TRY {
|
||||
H5Fclose(id);
|
||||
} H5E_END_TRY;
|
||||
id = -1;
|
||||
}
|
||||
}
|
||||
|
||||
void HDF5File::Delete(const std::string& path) {
|
||||
@@ -713,7 +767,10 @@ HDF5ReadOnlyFile::HDF5ReadOnlyFile(const std::string &filename) {
|
||||
}
|
||||
|
||||
HDF5ReadOnlyFile::~HDF5ReadOnlyFile() {
|
||||
if (id >= 0) H5Fclose(id);
|
||||
if (id >= 0) {
|
||||
H5E_BEGIN_TRY {H5Fclose(id); } H5E_END_TRY;
|
||||
id = -1;
|
||||
}
|
||||
}
|
||||
|
||||
HDF5DataSet::HDF5DataSet(const HDF5Object &parent, const std::string &name, const HDF5DataType &data_type,
|
||||
@@ -799,8 +856,29 @@ std::string HDF5DataSet::ReadString() const {
|
||||
return buffer;
|
||||
}
|
||||
|
||||
void HDF5DataSet::Close() {
|
||||
if (id < 0)
|
||||
return;
|
||||
|
||||
const hid_t local_id = id;
|
||||
id = -1;
|
||||
|
||||
herr_t err = 0;
|
||||
H5E_BEGIN_TRY {
|
||||
err = H5Dclose(local_id);
|
||||
} H5E_END_TRY;
|
||||
|
||||
if (err < 0)
|
||||
throw JFJochException(JFJochExceptionCategory::HDF5, "Cannot close HDF5 dataset");
|
||||
}
|
||||
|
||||
HDF5DataSet::~HDF5DataSet() {
|
||||
if (id >= 0) H5Dclose(id);
|
||||
if (id >= 0) {
|
||||
H5E_BEGIN_TRY {
|
||||
H5Dclose(id);
|
||||
} H5E_END_TRY;
|
||||
id = -1;
|
||||
}
|
||||
}
|
||||
|
||||
void HDF5DataSet::ReadDirectChunk(std::vector<uint8_t> &val, const std::vector<hsize_t> &offset) {
|
||||
|
||||
@@ -169,6 +169,7 @@ public:
|
||||
explicit HDF5File(const std::string& filename, bool v1_10 = false);
|
||||
~HDF5File();
|
||||
void Delete(const std::string& path);
|
||||
void Close();
|
||||
};
|
||||
|
||||
class HDF5ReadOnlyFile : public HDF5Object {
|
||||
@@ -321,6 +322,7 @@ public:
|
||||
}
|
||||
|
||||
std::string ReadString() const;
|
||||
void Close();
|
||||
};
|
||||
|
||||
inline std::unique_ptr<HDF5DataSet> SaveScalar(const HDF5Object& parent, const std::string &name, const char* val) {
|
||||
|
||||
+47
-13
@@ -24,10 +24,22 @@ void StreamWriter::NotifyTcpAck(TCPFrameType ack_for, bool ok, bool fatal, TCPAc
|
||||
if (!image_puller.SupportsAck())
|
||||
return;
|
||||
|
||||
// Send only one fatal DATA ack per data collection; subsequent in-flight
|
||||
// DATA frames just get a quiet non-fatal NACK.
|
||||
bool send_fatal = fatal;
|
||||
if (ack_for == TCPFrameType::DATA) {
|
||||
if (fatal && tcp_data_fatal_sent) {
|
||||
send_fatal = false;
|
||||
ok = false;
|
||||
} else if (fatal) {
|
||||
tcp_data_fatal_sent = true;
|
||||
}
|
||||
}
|
||||
|
||||
PullerAckMessage ack;
|
||||
ack.ack_for = ack_for;
|
||||
ack.ok = ok;
|
||||
ack.fatal = fatal;
|
||||
ack.fatal = send_fatal;
|
||||
ack.error_code = code;
|
||||
ack.error_text = error_text;
|
||||
ack.run_number = run_number;
|
||||
@@ -105,7 +117,7 @@ void StreamWriter::ProcessCalibrationImage() {
|
||||
}
|
||||
|
||||
void StreamWriter::ProcessDataImage() {
|
||||
switch (state) {
|
||||
switch (state) {
|
||||
case StreamWriterState::Idle:
|
||||
logger.Warning("Missing meaningful image while waiting for START");
|
||||
mute_data_msg_in_idle = true;
|
||||
@@ -113,7 +125,7 @@ void StreamWriter::ProcessDataImage() {
|
||||
case StreamWriterState::Started:
|
||||
start_time = std::chrono::system_clock::now();
|
||||
state = StreamWriterState::Receiving;
|
||||
// Follow through to receiving - no brake!
|
||||
// Follow through to receiving - no break!
|
||||
case StreamWriterState::Receiving:
|
||||
try {
|
||||
if (verbose)
|
||||
@@ -129,29 +141,44 @@ void StreamWriter::ProcessDataImage() {
|
||||
if (verbose)
|
||||
logger.Info("Written");
|
||||
NotifyTcpAck(TCPFrameType::DATA, true, false, TCPAckCode::None);
|
||||
} catch (const JFJochException &e) {
|
||||
logger.ErrorException(e);
|
||||
logger.Warning("Error writing image - switching to error state");
|
||||
} catch (const std::exception &e) {
|
||||
logger.Error("Error writing image: {}", e.what());
|
||||
logger.Warning("Switching to error state; subsequent DATA frames will be NACKed silently");
|
||||
const bool first_fatal = !tcp_data_fatal_sent;
|
||||
state = StreamWriterState::Error;
|
||||
err = e.what();
|
||||
NotifyTcpAck(TCPFrameType::DATA, false, true, TCPAckCode::DataWriteFailed, err);
|
||||
// Heuristic mapping ENOSPC -> NoSpaceLeft
|
||||
TCPAckCode code = TCPAckCode::DataWriteFailed;
|
||||
std::string what = e.what();
|
||||
if (what.find("no space") != std::string::npos ||
|
||||
what.find("No space") != std::string::npos ||
|
||||
what.find("ENOSPC") != std::string::npos)
|
||||
code = TCPAckCode::NoSpaceLeft;
|
||||
NotifyTcpAck(TCPFrameType::DATA, false, first_fatal, code, err);
|
||||
}
|
||||
break;
|
||||
case StreamWriterState::Error:
|
||||
// Error state => Wait till end only
|
||||
// Quiet non-fatal NACK so the producer knows this image wasn't written
|
||||
NotifyTcpAck(TCPFrameType::DATA, false, false, TCPAckCode::DataWriteFailed, err);
|
||||
break;
|
||||
case StreamWriterState::Finalized:
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
void StreamWriter::ProcessEndMessage() {
|
||||
// Ignore end message when idle state!
|
||||
if (state == StreamWriterState::Idle || state == StreamWriterState::Finalized)
|
||||
return;
|
||||
|
||||
if (verbose)
|
||||
logger.Info("Received end message");
|
||||
|
||||
// Important: close DATA files BEFORE finalizing the master, so that the
|
||||
// master's links/VDS reference renamed final filenames (and so that we
|
||||
// don't keep writing to a master if data-file finalize is going to fail
|
||||
// anyway on ENOSPC).
|
||||
FinalizeDataCollection();
|
||||
|
||||
if (state != StreamWriterState::Error) {
|
||||
try {
|
||||
if ((image_puller_output.cbor->end_message->max_image_number == 0) && (max_image_number > 0))
|
||||
@@ -165,9 +192,12 @@ void StreamWriter::ProcessEndMessage() {
|
||||
}
|
||||
}
|
||||
|
||||
FinalizeDataCollection();
|
||||
// Drop file_writer either way; the master tmp is unlinked in NXmx::~NXmx
|
||||
// / NXmx::Finalize on the error path.
|
||||
file_writer.reset();
|
||||
|
||||
const bool error_state = (state == StreamWriterState::Error);
|
||||
state = StreamWriterState::Finalized;
|
||||
|
||||
NotifyReceiverOnFinalizedWrite(writer_notification_zmq_addr);
|
||||
NotifyTcpAck(TCPFrameType::END, !error_state, error_state,
|
||||
@@ -181,18 +211,22 @@ void StreamWriter::FinalizeDataCollection() {
|
||||
if (file_writer && (state != StreamWriterState::Error)) {
|
||||
try {
|
||||
hdf5_data_file_statistics = file_writer->Finalize();
|
||||
} catch (JFJochException &e) {
|
||||
} catch (const std::exception &e) {
|
||||
state = StreamWriterState::Error;
|
||||
err = e.what();
|
||||
logger.ErrorException(e);
|
||||
logger.Error("Error finalizing writing - switching to error state");
|
||||
}
|
||||
} else if (file_writer && state == StreamWriterState::Error) {
|
||||
// Best-effort cleanup: still call Finalize so per-file Close() runs and
|
||||
// unlinks the tmp files even when broken.
|
||||
try { (void) file_writer->Finalize(); } catch (...) {}
|
||||
hdf5_data_file_statistics.clear();
|
||||
} else {
|
||||
hdf5_data_file_statistics.clear();
|
||||
}
|
||||
file_writer.reset();
|
||||
logger.Info("Data writing finished");
|
||||
state = StreamWriterState::Finalized;
|
||||
// Don't reset file_writer here: ProcessEndMessage still needs it for the master.
|
||||
}
|
||||
|
||||
void StreamWriter::CollectImages() {
|
||||
|
||||
Reference in New Issue
Block a user