From f7a9e4eab1f9ac6bb40f14773a4a005355c64487 Mon Sep 17 00:00:00 2001 From: Filip Leonarski Date: Mon, 4 May 2026 11:44:46 +0200 Subject: [PATCH] StreamWriter: Opus-provided improvements to improve resilience for errors --- writer/FileWriter.cpp | 50 ++++++++++++++-- writer/HDF5DataFile.cpp | 124 +++++++++++++++++++++++++++------------- writer/HDF5DataFile.h | 2 + writer/HDF5NXmx.cpp | 115 ++++++++++++++++++++++--------------- writer/HDF5NXmx.h | 2 + writer/HDF5Objects.cpp | 33 ++++++++--- writer/StreamWriter.cpp | 60 ++++++++++++++----- 7 files changed, 277 insertions(+), 109 deletions(-) diff --git a/writer/FileWriter.cpp b/writer/FileWriter.cpp index c7503786..1aa1651f 100644 --- a/writer/FileWriter.cpp +++ b/writer/FileWriter.cpp @@ -86,7 +86,24 @@ void FileWriter::WriteHDF5(const DataMessage& msg) { if (format == FileWriterFormat::NXmxIntegrated && master_file) files[file_number]->CreateFile(msg, master_file->GetFile()); } - files[file_number]->Write(msg, image_number); + + // If the file is already broken from a previous write, fail fast and stop + // touching it. Mark as closed so we don't try again. + if (files[file_number]->IsBroken()) { + files[file_number].reset(); + closed_files.insert(file_number); + throw JFJochException(JFJochExceptionCategory::FileWriteError, + "Data file " + std::to_string(file_number) + " is broken; aborting writes"); + } + + try { + files[file_number]->Write(msg, image_number); + } catch (...) { + // Don't drop the file yet — we still want to attempt a clean Close() + // (which itself is broken-aware) during Finalize, so the .tmp file + // gets unlinked and we don't publish a corrupt .h5 . + throw; + } if (files[file_number]->GetNumImages() == start_message.images_per_file) { CloseFile(file_number); @@ -103,7 +120,15 @@ void FileWriter::CloseFile(uint64_t file_number) { if (closed_files.contains(file_number)) return; - AddStats(files[file_number]->Close()); + try { + AddStats(files[file_number]->Close()); + } catch (...) { + // Even if Close() failed, mark the slot as closed and drop the handle. + // Re-throw so StreamWriter goes to error state. + files[file_number].reset(); + closed_files.insert(file_number); + throw; + } files[file_number].reset(); closed_files.insert(file_number); } @@ -126,12 +151,23 @@ void FileWriter::CloseOldFiles(uint64_t current_image_number) { std::vector FileWriter::Finalize() { std::lock_guard lock(hdf5_mutex); + std::exception_ptr first_err; for (uint64_t f = 0; f < files.size(); ++f) { - if (files[f] && !closed_files.contains(f)) - CloseFile(f); + if (files[f] && !closed_files.contains(f)) { + try { + CloseFile(f); + } catch (...) { + if (!first_err) first_err = std::current_exception(); + } + } } + // Master is finalized via WriteHDF5(EndMessage); here we only release it. if (master_file) master_file.reset(); + + if (first_err) + std::rethrow_exception(first_err); + return stats; } @@ -199,14 +235,20 @@ void FileWriter::CreateHDF5MasterFile(const StartMessage &msg) { void FileWriter::WriteHDF5(const CompressedImage &msg) { if (master_file) { std::lock_guard lock(hdf5_mutex); + if (master_file->IsBroken()) { + spdlog::warn("Calibration {} not written: master file is broken", msg.GetChannel()); + return; + } try { master_file->WriteCalibration(msg); } catch (const JFJochException &e) { spdlog::error("Calibration {} not written {}", msg.GetChannel(), e.what()); + // master is now marked broken inside WriteCalibration; future calls will short-circuit } } } + void FileWriter::WriteHDF5(const EndMessage &msg) { if (master_file) { std::lock_guard lock(hdf5_mutex); diff --git a/writer/HDF5DataFile.cpp b/writer/HDF5DataFile.cpp index 49626a9f..bdd8e1eb 100644 --- a/writer/HDF5DataFile.cpp +++ b/writer/HDF5DataFile.cpp @@ -60,27 +60,66 @@ std::optional HDF5DataFile::Close() { if (!data_file) return {}; - HDF5Group group_exp(*data_file, "/entry/detector"); - group_exp.NXClass("NXcollection"); - - group_exp.SaveVector("timestamp", timestamp); - group_exp.SaveVector("exptime", exptime); - group_exp.SaveVector("number", number); - - for (auto &p: plugins) - p->WriteFinal(*data_file); - - if (data_set) { - data_set->SetExtent({max_image_number + 1, ypixel, xpixel}); - data_set - ->Attr("image_nr_low", (int32_t) (image_low + 1)) - .Attr("image_nr_high", (int32_t) (image_low + 1 + max_image_number)); - data_set->Close(); - data_set.reset(); + // If a prior write already failed, do not call ANY further HDF5 routines on + // this file (per HDF Forum guidance: behavior after an I/O error is undefined, + // and a subsequent H5Fclose can segfault). Just drop the handles and unlink + // the tmp file. Do NOT rename to the final name. + if (broken) { + if (data_set) data_set.reset(); + if (data_set_image_number) data_set_image_number.reset(); + data_file.reset(); + if (manage_file) { + std::error_code ec; + std::filesystem::remove(tmp_filename, ec); + } + closed = true; + return {}; } - if (manage_file ) { - data_file->Close(); + try { + HDF5Group group_exp(*data_file, "/entry/detector"); + group_exp.NXClass("NXcollection"); + + group_exp.SaveVector("timestamp", timestamp); + group_exp.SaveVector("exptime", exptime); + group_exp.SaveVector("number", number); + + for (auto &p: plugins) + p->WriteFinal(*data_file); + + if (data_set) { + data_set->SetExtent({max_image_number + 1, ypixel, xpixel}); + data_set + ->Attr("image_nr_low", (int32_t) (image_low + 1)) + .Attr("image_nr_high", (int32_t) (image_low + 1 + max_image_number)); + data_set->Close(); + data_set.reset(); + } + } catch (...) { + // Anything during finalize failed (most likely ENOSPC). Mark broken, + // drop handles without further HDF5 calls, remove tmp, propagate. + broken = true; + if (data_set) data_set.reset(); + data_file.reset(); + if (manage_file) { + std::error_code ec; + std::filesystem::remove(tmp_filename, ec); + } + closed = true; + throw; + } + + if (manage_file) { + try { + data_file->Close(); + } catch (...) { + broken = true; + data_file.reset(); + std::error_code ec; + std::filesystem::remove(tmp_filename, ec); + closed = true; + throw; + } data_file.reset(); if (std::filesystem::exists(filename) && !overwrite) @@ -102,7 +141,6 @@ std::optional HDF5DataFile::Close() { ret.total_images = nimages; ret.filename = filename; ret.file_number = file_number + 1; - return ret; } @@ -116,10 +154,8 @@ HDF5DataFile::~HDF5DataFile() { std::error_code ec; std::filesystem::remove(tmp_filename, ec); } - } catch (const std::exception &e) { - std::cerr << "HDF5DataFile::~HDF5DataFile: " << e.what() << std::endl; } catch (...) { - std::cerr << "HDF5DataFile::~HDF5DataFile: Unknown error " << std::endl; + // Never throw from destructor; HDF5 may already be in a bad state } } } @@ -168,6 +204,9 @@ void HDF5DataFile::Write(const DataMessage &msg, uint64_t image_number) { if (closed) throw JFJochException(JFJochExceptionCategory::FileWriteError, "Trying to write to already closed file"); + if (broken) + throw JFJochException(JFJochExceptionCategory::FileWriteError, + "Trying to write to file that previously failed"); if (image_number >= images_per_file) throw JFJochException(JFJochExceptionCategory::FileWriteError, "Image number out of bounds"); @@ -177,23 +216,30 @@ void HDF5DataFile::Write(const DataMessage &msg, uint64_t image_number) { CreateFile(msg, std::make_shared(tmp_filename)); } - if (new_file || (static_cast(image_number) > max_image_number)) { - max_image_number = image_number; - timestamp.resize(max_image_number + 1); - exptime.resize(max_image_number + 1); - number.resize(max_image_number + 1); - new_file = false; + try { + if (new_file || (static_cast(image_number) > max_image_number)) { + max_image_number = image_number; + timestamp.resize(max_image_number + 1); + exptime.resize(max_image_number + 1); + number.resize(max_image_number + 1); + new_file = false; + } + + nimages++; + data_set->WriteDirectChunk(msg.image.GetCompressed(), msg.image.GetCompressedSize(), + {image_number, 0, 0}); + + for (auto &p: plugins) + p->Write(msg, image_number); + + timestamp[image_number] = msg.timestamp; + exptime[image_number] = msg.exptime; + number[image_number] = (msg.original_number) ? msg.original_number.value() : msg.number; + } catch (...) { + // Sticky failure: do not call into HDF5 again for this file. + broken = true; + throw; } - - nimages++; - data_set->WriteDirectChunk(msg.image.GetCompressed(), msg.image.GetCompressedSize(), {image_number, 0, 0}); - - for (auto &p: plugins) - p->Write(msg, image_number); - - timestamp[image_number] = msg.timestamp; - exptime[image_number] = msg.exptime; - number[image_number] = (msg.original_number) ? msg.original_number.value() : msg.number; } size_t HDF5DataFile::GetNumImages() const { diff --git a/writer/HDF5DataFile.h b/writer/HDF5DataFile.h index 1346a333..c3aa5607 100644 --- a/writer/HDF5DataFile.h +++ b/writer/HDF5DataFile.h @@ -44,6 +44,7 @@ class HDF5DataFile { int32_t image_low; bool closed = false; + bool broken = false; bool overwrite = false; int64_t file_number; @@ -55,6 +56,7 @@ public: std::optional Close(); void Write(const DataMessage& msg, uint64_t image_number); size_t GetNumImages() const; + bool IsBroken() const { return broken; } void CreateFile(const DataMessage& msg, std::shared_ptr data_file, bool integrated = false); }; diff --git a/writer/HDF5NXmx.cpp b/writer/HDF5NXmx.cpp index 64efcb90..8602e43b 100644 --- a/writer/HDF5NXmx.cpp +++ b/writer/HDF5NXmx.cpp @@ -648,14 +648,19 @@ void NXmx::Attenuator(const StartMessage &start) { } void NXmx::WriteCalibration(const CompressedImage &image) { - if (!hdf5_file) + if (broken || !hdf5_file) throw JFJochException(JFJochExceptionCategory::FileWriteError, "HDF5 file already closed"); - if (!calibration_group_created) { - calibration_group_created = true; - HDF5Group(*hdf5_file, "/entry/instrument/detector/calibration").NXClass("NXcollection"); + try { + if (!calibration_group_created) { + calibration_group_created = true; + HDF5Group(*hdf5_file, "/entry/instrument/detector/calibration").NXClass("NXcollection"); + } + SaveCBORImage("/entry/instrument/detector/calibration/" + image.GetChannel(), image); + } catch (...) { + broken = true; + throw; } - SaveCBORImage("/entry/instrument/detector/calibration/" + image.GetChannel(), image); } void NXmx::SaveCBORImage(const std::string &hdf5_path, const CompressedImage &image) { @@ -714,52 +719,70 @@ void NXmx::ADUHistogram(const EndMessage &end) { void NXmx::Finalize(const EndMessage &end) { if (!hdf5_file) throw JFJochException(JFJochExceptionCategory::FileWriteError, "HDF5 file already closed"); - if (end.end_date) { - hdf5_file->Attr("file_time", end.end_date.value()); - hdf5_file->SaveScalar("/entry/end_time", end.end_date.value()); - hdf5_file->SaveScalar("/entry/end_time_estimated", end.end_date.value()); - } else { - std::string time_now = time_UTC(std::chrono::system_clock::now()); - hdf5_file->Attr("file_time", time_now); - hdf5_file->SaveScalar("/entry/end_time", time_now); - hdf5_file->SaveScalar("/entry/end_time_estimated", time_now); + + if (broken) { + // Don't touch HDF5 anymore. Drop handle, unlink tmp, refuse to rename. + hdf5_file.reset(); + std::error_code ec; + std::filesystem::remove(tmp_filename, ec); + throw JFJochException(JFJochExceptionCategory::FileWriteError, + "HDF5 master file is broken (previous I/O error)"); } - Detector(start_message, end); - Sample(start_message, end); - AzimuthalIntegration(start_message, end); - ADUHistogram(end); + try { + if (end.end_date) { + hdf5_file->Attr("file_time", end.end_date.value()); + hdf5_file->SaveScalar("/entry/end_time", end.end_date.value()); + hdf5_file->SaveScalar("/entry/end_time_estimated", end.end_date.value()); + } else { + std::string time_now = time_UTC(std::chrono::system_clock::now()); + hdf5_file->Attr("file_time", time_now); + hdf5_file->SaveScalar("/entry/end_time", time_now); + hdf5_file->SaveScalar("/entry/end_time_estimated", time_now); + } - switch (start_message.file_format.value_or(FileWriterFormat::NXmxLegacy)) { - case FileWriterFormat::NXmxLegacy: - LinkToData(start_message, end); - break; - case FileWriterFormat::NXmxVDS: - LinkToData_VDS(start_message, end); - break; - case FileWriterFormat::NXmxIntegrated: - default: - break; + Detector(start_message, end); + Sample(start_message, end); + AzimuthalIntegration(start_message, end); + ADUHistogram(end); + + switch (start_message.file_format.value_or(FileWriterFormat::NXmxLegacy)) { + case FileWriterFormat::NXmxLegacy: + LinkToData(start_message, end); + break; + case FileWriterFormat::NXmxVDS: + LinkToData_VDS(start_message, end); + break; + case FileWriterFormat::NXmxIntegrated: + default: + break; + } + + if (end.rotation_lattice) + SaveVector(*hdf5_file, "/entry/MX/rotationLatticeIndexed", end.rotation_lattice->GetVector()) + ->Units("Angstrom"); + + if (end.rotation_lattice_type) + SaveScalar(*hdf5_file, "/entry/MX/rotationLatticeNiggliClass", end.rotation_lattice_type->niggli_class); + + if (end.indexing_rate) + SaveScalar(*hdf5_file, "/entry/MX/imageIndexedMean", end.indexing_rate.value()); + if (end.bkg_estimate) + SaveScalar(*hdf5_file, "/entry/MX/bkgEstimateMean", end.bkg_estimate.value()); + + if (!end.scale_factor.empty()) + SaveVector(*hdf5_file, "/entry/MX/imageScaleFactor", end.scale_factor); + + hdf5_file->Close(); + } catch (...) { + broken = true; + // Drop the file id without further HDF5 calls; remove tmp. + hdf5_file.reset(); + std::error_code ec; + std::filesystem::remove(tmp_filename, ec); + throw; } - if (end.rotation_lattice) - SaveVector(*hdf5_file, "/entry/MX/rotationLatticeIndexed", end.rotation_lattice->GetVector()) - ->Units("Angstrom"); - - if (end.rotation_lattice_type) - SaveScalar(*hdf5_file, "/entry/MX/rotationLatticeNiggliClass", end.rotation_lattice_type->niggli_class); - - if (end.indexing_rate) { - SaveScalar(*hdf5_file, "/entry/MX/imageIndexedMean", end.indexing_rate.value()); - } - if (end.bkg_estimate) { - SaveScalar(*hdf5_file, "/entry/MX/bkgEstimateMean", end.bkg_estimate.value()); - } - - if (!end.scale_factor.empty()) - SaveVector(*hdf5_file, "/entry/MX/imageScaleFactor", end.scale_factor); - - hdf5_file->Close(); hdf5_file.reset(); if (std::filesystem::exists(filename) && !overwrite) diff --git a/writer/HDF5NXmx.h b/writer/HDF5NXmx.h index a02bfe2b..003a94c8 100644 --- a/writer/HDF5NXmx.h +++ b/writer/HDF5NXmx.h @@ -19,6 +19,7 @@ class NXmx { std::string tmp_filename; bool overwrite = false; bool calibration_group_created = false; + bool broken = false; void LinkToData(const StartMessage &start, const EndMessage &end); void LinkToData_VDS(const StartMessage &start, const EndMessage &end); @@ -58,6 +59,7 @@ public: NXmx& operator=(const NXmx &other) = delete; void Finalize(const EndMessage &end); void WriteCalibration(const CompressedImage &image); + bool IsBroken() const { return broken; } std::shared_ptr GetFile(); }; diff --git a/writer/HDF5Objects.cpp b/writer/HDF5Objects.cpp index 17414001..454d0d5f 100644 --- a/writer/HDF5Objects.cpp +++ b/writer/HDF5Objects.cpp @@ -727,12 +727,24 @@ void HDF5File::Close() { if (id < 0) return; - if (H5Fflush(id, H5F_SCOPE_GLOBAL) < 0) - throw JFJochException(JFJochExceptionCategory::HDF5, "Failed to flush HDF5 file"); - - if (H5Fclose(id) < 0) - throw JFJochException(JFJochExceptionCategory::HDF5, "Failed to close HDF5 file"); + // Invalidate first; if anything below fails (e.g. ENOSPC) we must NOT + // leave a live id behind for the destructor or later code to touch. + const hid_t local_id = id; id = -1; + + herr_t flush_err = 0; + H5E_BEGIN_TRY { + flush_err = H5Fflush(local_id, H5F_SCOPE_GLOBAL); + } H5E_END_TRY; + + herr_t close_err = 0; + H5E_BEGIN_TRY { + close_err = H5Fclose(local_id); + } H5E_END_TRY; + + if (flush_err < 0 || close_err < 0) + throw JFJochException(JFJochExceptionCategory::HDF5, + "Failed to flush/close HDF5 file (likely no space left on device)"); } HDF5File::~HDF5File() { @@ -848,9 +860,16 @@ void HDF5DataSet::Close() { if (id < 0) return; - if (H5Dclose(id) < 0) - throw JFJochException(JFJochExceptionCategory::HDF5, "Cannot close HDF5 dataset"); + const hid_t local_id = id; id = -1; + + herr_t err = 0; + H5E_BEGIN_TRY { + err = H5Dclose(local_id); + } H5E_END_TRY; + + if (err < 0) + throw JFJochException(JFJochExceptionCategory::HDF5, "Cannot close HDF5 dataset"); } HDF5DataSet::~HDF5DataSet() { diff --git a/writer/StreamWriter.cpp b/writer/StreamWriter.cpp index 247aab01..faa0ab50 100644 --- a/writer/StreamWriter.cpp +++ b/writer/StreamWriter.cpp @@ -24,10 +24,22 @@ void StreamWriter::NotifyTcpAck(TCPFrameType ack_for, bool ok, bool fatal, TCPAc if (!image_puller.SupportsAck()) return; + // Send only one fatal DATA ack per data collection; subsequent in-flight + // DATA frames just get a quiet non-fatal NACK. + bool send_fatal = fatal; + if (ack_for == TCPFrameType::DATA) { + if (fatal && tcp_data_fatal_sent) { + send_fatal = false; + ok = false; + } else if (fatal) { + tcp_data_fatal_sent = true; + } + } + PullerAckMessage ack; ack.ack_for = ack_for; ack.ok = ok; - ack.fatal = fatal; + ack.fatal = send_fatal; ack.error_code = code; ack.error_text = error_text; ack.run_number = run_number; @@ -105,7 +117,7 @@ void StreamWriter::ProcessCalibrationImage() { } void StreamWriter::ProcessDataImage() { - switch (state) { + switch (state) { case StreamWriterState::Idle: logger.Warning("Missing meaningful image while waiting for START"); mute_data_msg_in_idle = true; @@ -113,7 +125,7 @@ void StreamWriter::ProcessDataImage() { case StreamWriterState::Started: start_time = std::chrono::system_clock::now(); state = StreamWriterState::Receiving; - // Follow through to receiving - no brake! + // Follow through to receiving - no break! case StreamWriterState::Receiving: try { if (verbose) @@ -129,29 +141,44 @@ void StreamWriter::ProcessDataImage() { if (verbose) logger.Info("Written"); NotifyTcpAck(TCPFrameType::DATA, true, false, TCPAckCode::None); - } catch (const JFJochException &e) { - logger.ErrorException(e); - logger.Warning("Error writing image - switching to error state"); + } catch (const std::exception &e) { + logger.Error("Error writing image: {}", e.what()); + logger.Warning("Switching to error state; subsequent DATA frames will be NACKed silently"); + const bool first_fatal = !tcp_data_fatal_sent; state = StreamWriterState::Error; err = e.what(); - NotifyTcpAck(TCPFrameType::DATA, false, true, TCPAckCode::DataWriteFailed, err); + // Heuristic mapping ENOSPC -> NoSpaceLeft + TCPAckCode code = TCPAckCode::DataWriteFailed; + std::string what = e.what(); + if (what.find("no space") != std::string::npos || + what.find("No space") != std::string::npos || + what.find("ENOSPC") != std::string::npos) + code = TCPAckCode::NoSpaceLeft; + NotifyTcpAck(TCPFrameType::DATA, false, first_fatal, code, err); } break; case StreamWriterState::Error: - // Error state => Wait till end only + // Quiet non-fatal NACK so the producer knows this image wasn't written + NotifyTcpAck(TCPFrameType::DATA, false, false, TCPAckCode::DataWriteFailed, err); + break; case StreamWriterState::Finalized: break; } } void StreamWriter::ProcessEndMessage() { - // Ignore end message when idle state! if (state == StreamWriterState::Idle || state == StreamWriterState::Finalized) return; if (verbose) logger.Info("Received end message"); + // Important: close DATA files BEFORE finalizing the master, so that the + // master's links/VDS reference renamed final filenames (and so that we + // don't keep writing to a master if data-file finalize is going to fail + // anyway on ENOSPC). + FinalizeDataCollection(); + if (state != StreamWriterState::Error) { try { if ((image_puller_output.cbor->end_message->max_image_number == 0) && (max_image_number > 0)) @@ -165,9 +192,12 @@ void StreamWriter::ProcessEndMessage() { } } - FinalizeDataCollection(); + // Drop file_writer either way; the master tmp is unlinked in NXmx::~NXmx + // / NXmx::Finalize on the error path. + file_writer.reset(); const bool error_state = (state == StreamWriterState::Error); + state = StreamWriterState::Finalized; NotifyReceiverOnFinalizedWrite(writer_notification_zmq_addr); NotifyTcpAck(TCPFrameType::END, !error_state, error_state, @@ -181,18 +211,22 @@ void StreamWriter::FinalizeDataCollection() { if (file_writer && (state != StreamWriterState::Error)) { try { hdf5_data_file_statistics = file_writer->Finalize(); - } catch (JFJochException &e) { + } catch (const std::exception &e) { state = StreamWriterState::Error; err = e.what(); logger.ErrorException(e); logger.Error("Error finalizing writing - switching to error state"); } + } else if (file_writer && state == StreamWriterState::Error) { + // Best-effort cleanup: still call Finalize so per-file Close() runs and + // unlinks the tmp files even when broken. + try { (void) file_writer->Finalize(); } catch (...) {} + hdf5_data_file_statistics.clear(); } else { hdf5_data_file_statistics.clear(); } - file_writer.reset(); logger.Info("Data writing finished"); - state = StreamWriterState::Finalized; + // Don't reset file_writer here: ProcessEndMessage still needs it for the master. } void StreamWriter::CollectImages() {