Compare commits
5 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| ea9ba06927 | |||
| 6f3f807aa4 | |||
| 6e833d593e | |||
| 2f57b8d16c | |||
| beeb090a2f |
+46
-4
@@ -86,7 +86,24 @@ void FileWriter::WriteHDF5(const DataMessage& msg) {
|
||||
if (format == FileWriterFormat::NXmxIntegrated && master_file)
|
||||
files[file_number]->CreateFile(msg, master_file->GetFile());
|
||||
}
|
||||
files[file_number]->Write(msg, image_number);
|
||||
|
||||
// If the file is already broken from a previous write, fail fast and stop
|
||||
// touching it. Mark as closed so we don't try again.
|
||||
if (files[file_number]->IsBroken()) {
|
||||
files[file_number].reset();
|
||||
closed_files.insert(file_number);
|
||||
throw JFJochException(JFJochExceptionCategory::FileWriteError,
|
||||
"Data file " + std::to_string(file_number) + " is broken; aborting writes");
|
||||
}
|
||||
|
||||
try {
|
||||
files[file_number]->Write(msg, image_number);
|
||||
} catch (...) {
|
||||
// Don't drop the file yet — we still want to attempt a clean Close()
|
||||
// (which itself is broken-aware) during Finalize, so the .tmp file
|
||||
// gets unlinked and we don't publish a corrupt .h5 .
|
||||
throw;
|
||||
}
|
||||
|
||||
if (files[file_number]->GetNumImages() == start_message.images_per_file) {
|
||||
CloseFile(file_number);
|
||||
@@ -103,7 +120,15 @@ void FileWriter::CloseFile(uint64_t file_number) {
|
||||
if (closed_files.contains(file_number))
|
||||
return;
|
||||
|
||||
AddStats(files[file_number]->Close());
|
||||
try {
|
||||
AddStats(files[file_number]->Close());
|
||||
} catch (...) {
|
||||
// Even if Close() failed, mark the slot as closed and drop the handle.
|
||||
// Re-throw so StreamWriter goes to error state.
|
||||
files[file_number].reset();
|
||||
closed_files.insert(file_number);
|
||||
throw;
|
||||
}
|
||||
files[file_number].reset();
|
||||
closed_files.insert(file_number);
|
||||
}
|
||||
@@ -126,12 +151,23 @@ void FileWriter::CloseOldFiles(uint64_t current_image_number) {
|
||||
std::vector<HDF5DataFileStatistics> FileWriter::Finalize() {
|
||||
std::lock_guard<std::mutex> lock(hdf5_mutex);
|
||||
|
||||
std::exception_ptr first_err;
|
||||
for (uint64_t f = 0; f < files.size(); ++f) {
|
||||
if (files[f] && !closed_files.contains(f))
|
||||
CloseFile(f);
|
||||
if (files[f] && !closed_files.contains(f)) {
|
||||
try {
|
||||
CloseFile(f);
|
||||
} catch (...) {
|
||||
if (!first_err) first_err = std::current_exception();
|
||||
}
|
||||
}
|
||||
}
|
||||
// Master is finalized via WriteHDF5(EndMessage); here we only release it.
|
||||
if (master_file)
|
||||
master_file.reset();
|
||||
|
||||
if (first_err)
|
||||
std::rethrow_exception(first_err);
|
||||
|
||||
return stats;
|
||||
}
|
||||
|
||||
@@ -199,14 +235,20 @@ void FileWriter::CreateHDF5MasterFile(const StartMessage &msg) {
|
||||
void FileWriter::WriteHDF5(const CompressedImage &msg) {
|
||||
if (master_file) {
|
||||
std::lock_guard<std::mutex> lock(hdf5_mutex);
|
||||
if (master_file->IsBroken()) {
|
||||
spdlog::warn("Calibration {} not written: master file is broken", msg.GetChannel());
|
||||
return;
|
||||
}
|
||||
try {
|
||||
master_file->WriteCalibration(msg);
|
||||
} catch (const JFJochException &e) {
|
||||
spdlog::error("Calibration {} not written {}", msg.GetChannel(), e.what());
|
||||
// master is now marked broken inside WriteCalibration; future calls will short-circuit
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
void FileWriter::WriteHDF5(const EndMessage &msg) {
|
||||
if (master_file) {
|
||||
std::lock_guard<std::mutex> lock(hdf5_mutex);
|
||||
|
||||
+105
-39
@@ -60,27 +60,79 @@ std::optional<HDF5DataFileStatistics> HDF5DataFile::Close() {
|
||||
if (!data_file)
|
||||
return {};
|
||||
|
||||
HDF5Group group_exp(*data_file, "/entry/detector");
|
||||
group_exp.NXClass("NXcollection");
|
||||
|
||||
group_exp.SaveVector("timestamp", timestamp);
|
||||
group_exp.SaveVector("exptime", exptime);
|
||||
group_exp.SaveVector("number", number);
|
||||
|
||||
for (auto &p: plugins)
|
||||
p->WriteFinal(*data_file);
|
||||
|
||||
if (data_set) {
|
||||
data_set->SetExtent({max_image_number + 1, ypixel, xpixel});
|
||||
data_set
|
||||
->Attr("image_nr_low", (int32_t) (image_low + 1))
|
||||
.Attr("image_nr_high", (int32_t) (image_low + 1 + max_image_number));
|
||||
data_set.reset();
|
||||
// If a prior write already failed, do not call ANY further HDF5 routines on
|
||||
// this file (per HDF Forum guidance: behavior after an I/O error is undefined,
|
||||
// and a subsequent H5Fclose can segfault). Just drop the handles and unlink
|
||||
// the tmp file. Do NOT rename to the final name.
|
||||
if (broken) {
|
||||
if (data_set) data_set.reset();
|
||||
if (data_set_image_number) data_set_image_number.reset();
|
||||
data_file.reset();
|
||||
if (manage_file) {
|
||||
std::error_code ec;
|
||||
std::filesystem::remove(tmp_filename, ec);
|
||||
}
|
||||
closed = true;
|
||||
return {};
|
||||
}
|
||||
data_file.reset();
|
||||
|
||||
if (manage_file && (!std::filesystem::exists(filename.c_str()) || overwrite))
|
||||
std::rename(tmp_filename.c_str(), filename.c_str());
|
||||
try {
|
||||
HDF5Group group_exp(*data_file, "/entry/detector");
|
||||
group_exp.NXClass("NXcollection");
|
||||
|
||||
group_exp.SaveVector("timestamp", timestamp);
|
||||
group_exp.SaveVector("exptime", exptime);
|
||||
group_exp.SaveVector("number", number);
|
||||
|
||||
for (auto &p: plugins)
|
||||
p->WriteFinal(*data_file);
|
||||
|
||||
if (data_set) {
|
||||
data_set->SetExtent({max_image_number + 1, ypixel, xpixel});
|
||||
data_set
|
||||
->Attr("image_nr_low", (int32_t) (image_low + 1))
|
||||
.Attr("image_nr_high", (int32_t) (image_low + 1 + max_image_number));
|
||||
data_set->Close();
|
||||
data_set.reset();
|
||||
}
|
||||
} catch (...) {
|
||||
// Anything during finalize failed (most likely ENOSPC). Mark broken,
|
||||
// drop handles without further HDF5 calls, remove tmp, propagate.
|
||||
broken = true;
|
||||
if (data_set) data_set.reset();
|
||||
data_file.reset();
|
||||
if (manage_file) {
|
||||
std::error_code ec;
|
||||
std::filesystem::remove(tmp_filename, ec);
|
||||
}
|
||||
closed = true;
|
||||
throw;
|
||||
}
|
||||
|
||||
if (manage_file) {
|
||||
try {
|
||||
data_file->Close();
|
||||
} catch (...) {
|
||||
broken = true;
|
||||
data_file.reset();
|
||||
std::error_code ec;
|
||||
std::filesystem::remove(tmp_filename, ec);
|
||||
closed = true;
|
||||
throw;
|
||||
}
|
||||
data_file.reset();
|
||||
|
||||
if (std::filesystem::exists(filename) && !overwrite)
|
||||
throw JFJochException(JFJochExceptionCategory::FileWriteError, "File already exists");
|
||||
std::error_code ec;
|
||||
std::filesystem::rename(tmp_filename, filename, ec);
|
||||
if (ec)
|
||||
throw JFJochException(JFJochExceptionCategory::FileWriteError,
|
||||
"Cannot rename temporary HDF5 file " + tmp_filename +
|
||||
" to " + filename + ": " + ec.message());
|
||||
} else {
|
||||
data_file.reset();
|
||||
}
|
||||
|
||||
closed = true;
|
||||
|
||||
@@ -95,11 +147,15 @@ std::optional<HDF5DataFileStatistics> HDF5DataFile::Close() {
|
||||
HDF5DataFile::~HDF5DataFile() {
|
||||
if (data_file) {
|
||||
try {
|
||||
Close();
|
||||
} catch (const std::exception &e) {
|
||||
std::cerr << "HDF5DataFile::~HDF5DataFile: " << e.what() << std::endl;
|
||||
data_set.reset();
|
||||
data_set_image_number.reset();
|
||||
data_file.reset();
|
||||
if (manage_file) {
|
||||
std::error_code ec;
|
||||
std::filesystem::remove(tmp_filename, ec);
|
||||
}
|
||||
} catch (...) {
|
||||
std::cerr << "HDF5DataFile::~HDF5DataFile: Unknown error " << std::endl;
|
||||
// Never throw from destructor; HDF5 may already be in a bad state
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -148,6 +204,9 @@ void HDF5DataFile::Write(const DataMessage &msg, uint64_t image_number) {
|
||||
if (closed)
|
||||
throw JFJochException(JFJochExceptionCategory::FileWriteError,
|
||||
"Trying to write to already closed file");
|
||||
if (broken)
|
||||
throw JFJochException(JFJochExceptionCategory::FileWriteError,
|
||||
"Trying to write to file that previously failed");
|
||||
if (image_number >= images_per_file)
|
||||
throw JFJochException(JFJochExceptionCategory::FileWriteError,
|
||||
"Image number out of bounds");
|
||||
@@ -157,23 +216,30 @@ void HDF5DataFile::Write(const DataMessage &msg, uint64_t image_number) {
|
||||
CreateFile(msg, std::make_shared<HDF5File>(tmp_filename));
|
||||
}
|
||||
|
||||
if (new_file || (static_cast<int64_t>(image_number) > max_image_number)) {
|
||||
max_image_number = image_number;
|
||||
timestamp.resize(max_image_number + 1);
|
||||
exptime.resize(max_image_number + 1);
|
||||
number.resize(max_image_number + 1);
|
||||
new_file = false;
|
||||
try {
|
||||
if (new_file || (static_cast<int64_t>(image_number) > max_image_number)) {
|
||||
max_image_number = image_number;
|
||||
timestamp.resize(max_image_number + 1);
|
||||
exptime.resize(max_image_number + 1);
|
||||
number.resize(max_image_number + 1);
|
||||
new_file = false;
|
||||
}
|
||||
|
||||
nimages++;
|
||||
data_set->WriteDirectChunk(msg.image.GetCompressed(), msg.image.GetCompressedSize(),
|
||||
{image_number, 0, 0});
|
||||
|
||||
for (auto &p: plugins)
|
||||
p->Write(msg, image_number);
|
||||
|
||||
timestamp[image_number] = msg.timestamp;
|
||||
exptime[image_number] = msg.exptime;
|
||||
number[image_number] = (msg.original_number) ? msg.original_number.value() : msg.number;
|
||||
} catch (...) {
|
||||
// Sticky failure: do not call into HDF5 again for this file.
|
||||
broken = true;
|
||||
throw;
|
||||
}
|
||||
|
||||
nimages++;
|
||||
data_set->WriteDirectChunk(msg.image.GetCompressed(), msg.image.GetCompressedSize(), {image_number, 0, 0});
|
||||
|
||||
for (auto &p: plugins)
|
||||
p->Write(msg, image_number);
|
||||
|
||||
timestamp[image_number] = msg.timestamp;
|
||||
exptime[image_number] = msg.exptime;
|
||||
number[image_number] = (msg.original_number) ? msg.original_number.value() : msg.number;
|
||||
}
|
||||
|
||||
size_t HDF5DataFile::GetNumImages() const {
|
||||
|
||||
@@ -44,6 +44,7 @@ class HDF5DataFile {
|
||||
int32_t image_low;
|
||||
|
||||
bool closed = false;
|
||||
bool broken = false;
|
||||
|
||||
bool overwrite = false;
|
||||
int64_t file_number;
|
||||
@@ -55,6 +56,7 @@ public:
|
||||
std::optional<HDF5DataFileStatistics> Close();
|
||||
void Write(const DataMessage& msg, uint64_t image_number);
|
||||
size_t GetNumImages() const;
|
||||
bool IsBroken() const { return broken; }
|
||||
|
||||
void CreateFile(const DataMessage& msg, std::shared_ptr<HDF5File> data_file, bool integrated = false);
|
||||
};
|
||||
|
||||
+91
-43
@@ -54,8 +54,13 @@ NXmx::NXmx(const StartMessage &start)
|
||||
}
|
||||
|
||||
NXmx::~NXmx() {
|
||||
if (!std::filesystem::exists(filename.c_str()) || overwrite)
|
||||
std::rename(tmp_filename.c_str(), filename.c_str());
|
||||
try {
|
||||
if (hdf5_file) {
|
||||
hdf5_file.reset();
|
||||
std::error_code ec;
|
||||
std::filesystem::remove(tmp_filename, ec);
|
||||
}
|
||||
} catch (...) {}
|
||||
}
|
||||
|
||||
|
||||
@@ -630,11 +635,19 @@ void NXmx::Attenuator(const StartMessage &start) {
|
||||
}
|
||||
|
||||
void NXmx::WriteCalibration(const CompressedImage &image) {
|
||||
if (!calibration_group_created) {
|
||||
calibration_group_created = true;
|
||||
HDF5Group(*hdf5_file, "/entry/instrument/detector/calibration").NXClass("NXcollection");
|
||||
if (broken || !hdf5_file)
|
||||
throw JFJochException(JFJochExceptionCategory::FileWriteError, "HDF5 file already closed");
|
||||
|
||||
try {
|
||||
if (!calibration_group_created) {
|
||||
calibration_group_created = true;
|
||||
HDF5Group(*hdf5_file, "/entry/instrument/detector/calibration").NXClass("NXcollection");
|
||||
}
|
||||
SaveCBORImage("/entry/instrument/detector/calibration/" + image.GetChannel(), image);
|
||||
} catch (...) {
|
||||
broken = true;
|
||||
throw;
|
||||
}
|
||||
SaveCBORImage("/entry/instrument/detector/calibration/" + image.GetChannel(), image);
|
||||
}
|
||||
|
||||
void NXmx::SaveCBORImage(const std::string &hdf5_path, const CompressedImage &image) {
|
||||
@@ -655,6 +668,8 @@ void NXmx::SaveCBORImage(const std::string &hdf5_path, const CompressedImage &im
|
||||
dataset->Write(data_type, image.GetCompressed());
|
||||
else
|
||||
dataset->WriteDirectChunk(image.GetCompressed(), image.GetCompressedSize(), {0, 0});
|
||||
|
||||
dataset->Close();
|
||||
}
|
||||
|
||||
void NXmx::AzimuthalIntegration(const StartMessage &start, const EndMessage &end) {
|
||||
@@ -689,50 +704,83 @@ void NXmx::ADUHistogram(const EndMessage &end) {
|
||||
}
|
||||
|
||||
void NXmx::Finalize(const EndMessage &end) {
|
||||
if (end.end_date) {
|
||||
hdf5_file->Attr("file_time", end.end_date.value());
|
||||
hdf5_file->SaveScalar("/entry/end_time", end.end_date.value());
|
||||
hdf5_file->SaveScalar("/entry/end_time_estimated", end.end_date.value());
|
||||
} else {
|
||||
std::string time_now = time_UTC(std::chrono::system_clock::now());
|
||||
hdf5_file->Attr("file_time", time_now);
|
||||
hdf5_file->SaveScalar("/entry/end_time", time_now);
|
||||
hdf5_file->SaveScalar("/entry/end_time_estimated", time_now);
|
||||
if (!hdf5_file)
|
||||
throw JFJochException(JFJochExceptionCategory::FileWriteError, "HDF5 file already closed");
|
||||
|
||||
if (broken) {
|
||||
// Don't touch HDF5 anymore. Drop handle, unlink tmp, refuse to rename.
|
||||
hdf5_file.reset();
|
||||
std::error_code ec;
|
||||
std::filesystem::remove(tmp_filename, ec);
|
||||
throw JFJochException(JFJochExceptionCategory::FileWriteError,
|
||||
"HDF5 master file is broken (previous I/O error)");
|
||||
}
|
||||
|
||||
Detector(start_message, end);
|
||||
Sample(start_message, end);
|
||||
AzimuthalIntegration(start_message, end);
|
||||
ADUHistogram(end);
|
||||
try {
|
||||
if (end.end_date) {
|
||||
hdf5_file->Attr("file_time", end.end_date.value());
|
||||
hdf5_file->SaveScalar("/entry/end_time", end.end_date.value());
|
||||
hdf5_file->SaveScalar("/entry/end_time_estimated", end.end_date.value());
|
||||
} else {
|
||||
std::string time_now = time_UTC(std::chrono::system_clock::now());
|
||||
hdf5_file->Attr("file_time", time_now);
|
||||
hdf5_file->SaveScalar("/entry/end_time", time_now);
|
||||
hdf5_file->SaveScalar("/entry/end_time_estimated", time_now);
|
||||
}
|
||||
|
||||
switch (start_message.file_format.value_or(FileWriterFormat::NXmxLegacy)) {
|
||||
case FileWriterFormat::NXmxLegacy:
|
||||
LinkToData(start_message, end);
|
||||
break;
|
||||
case FileWriterFormat::NXmxVDS:
|
||||
LinkToData_VDS(start_message, end);
|
||||
break;
|
||||
case FileWriterFormat::NXmxIntegrated:
|
||||
default:
|
||||
break;
|
||||
Detector(start_message, end);
|
||||
Sample(start_message, end);
|
||||
AzimuthalIntegration(start_message, end);
|
||||
ADUHistogram(end);
|
||||
|
||||
switch (start_message.file_format.value_or(FileWriterFormat::NXmxLegacy)) {
|
||||
case FileWriterFormat::NXmxLegacy:
|
||||
LinkToData(start_message, end);
|
||||
break;
|
||||
case FileWriterFormat::NXmxVDS:
|
||||
LinkToData_VDS(start_message, end);
|
||||
break;
|
||||
case FileWriterFormat::NXmxIntegrated:
|
||||
default:
|
||||
break;
|
||||
}
|
||||
|
||||
if (end.rotation_lattice)
|
||||
SaveVector(*hdf5_file, "/entry/MX/rotationLatticeIndexed", end.rotation_lattice->GetVector())
|
||||
->Units("Angstrom");
|
||||
|
||||
if (end.rotation_lattice_type)
|
||||
SaveScalar(*hdf5_file, "/entry/MX/rotationLatticeNiggliClass", end.rotation_lattice_type->niggli_class);
|
||||
|
||||
if (end.indexing_rate)
|
||||
SaveScalar(*hdf5_file, "/entry/MX/imageIndexedMean", end.indexing_rate.value());
|
||||
if (end.bkg_estimate)
|
||||
SaveScalar(*hdf5_file, "/entry/MX/bkgEstimateMean", end.bkg_estimate.value());
|
||||
|
||||
if (!end.scale_factor.empty())
|
||||
SaveVector(*hdf5_file, "/entry/MX/imageScaleFactor", end.scale_factor);
|
||||
|
||||
hdf5_file->Close();
|
||||
} catch (...) {
|
||||
broken = true;
|
||||
// Drop the file id without further HDF5 calls; remove tmp.
|
||||
hdf5_file.reset();
|
||||
std::error_code ec;
|
||||
std::filesystem::remove(tmp_filename, ec);
|
||||
throw;
|
||||
}
|
||||
|
||||
if (end.rotation_lattice)
|
||||
SaveVector(*hdf5_file, "/entry/MX/rotationLatticeIndexed", end.rotation_lattice->GetVector())
|
||||
->Units("Angstrom");
|
||||
hdf5_file.reset();
|
||||
|
||||
if (end.rotation_lattice_type)
|
||||
SaveScalar(*hdf5_file, "/entry/MX/rotationLatticeNiggliClass", end.rotation_lattice_type->niggli_class);
|
||||
if (std::filesystem::exists(filename) && !overwrite)
|
||||
throw JFJochException(JFJochExceptionCategory::FileWriteError, "File already exists");
|
||||
|
||||
if (end.indexing_rate) {
|
||||
SaveScalar(*hdf5_file, "/entry/MX/imageIndexedMean", end.indexing_rate.value());
|
||||
}
|
||||
if (end.bkg_estimate) {
|
||||
SaveScalar(*hdf5_file, "/entry/MX/bkgEstimateMean", end.bkg_estimate.value());
|
||||
}
|
||||
|
||||
if (!end.scale_factor.empty())
|
||||
SaveVector(*hdf5_file, "/entry/MX/imageScaleFactor", end.scale_factor);
|
||||
std::error_code ec;
|
||||
std::filesystem::rename(tmp_filename, filename, ec);
|
||||
if (ec)
|
||||
throw JFJochException(JFJochExceptionCategory::FileWriteError,
|
||||
"Cannot rename temporary HDF5 master file " + tmp_filename +
|
||||
" to " + filename + ": " + ec.message());
|
||||
}
|
||||
|
||||
void NXmx::UserData(const StartMessage &start) {
|
||||
|
||||
@@ -19,6 +19,7 @@ class NXmx {
|
||||
std::string tmp_filename;
|
||||
bool overwrite = false;
|
||||
bool calibration_group_created = false;
|
||||
bool broken = false;
|
||||
|
||||
void LinkToData(const StartMessage &start, const EndMessage &end);
|
||||
void LinkToData_VDS(const StartMessage &start, const EndMessage &end);
|
||||
@@ -58,6 +59,7 @@ public:
|
||||
NXmx& operator=(const NXmx &other) = delete;
|
||||
void Finalize(const EndMessage &end);
|
||||
void WriteCalibration(const CompressedImage &image);
|
||||
bool IsBroken() const { return broken; }
|
||||
|
||||
std::shared_ptr<HDF5File> GetFile();
|
||||
};
|
||||
|
||||
+86
-8
@@ -90,7 +90,12 @@ void HDF5DataSpace::SelectHyperslabWithStride(const std::vector<hsize_t> &start,
|
||||
}
|
||||
|
||||
HDF5DataSpace::~HDF5DataSpace() {
|
||||
if (id >= 0) H5Sclose(id);
|
||||
if (id >= 0) {
|
||||
H5E_BEGIN_TRY {
|
||||
H5Sclose(id);
|
||||
} H5E_END_TRY;
|
||||
id = -1;
|
||||
}
|
||||
}
|
||||
|
||||
uint8_t HDF5DataSpace::GetNumOfDimensions() const {
|
||||
@@ -204,7 +209,12 @@ HDF5DataType::HDF5DataType(const HDF5DataSet &data_set) :HDF5Id() {
|
||||
}
|
||||
|
||||
HDF5DataType::~HDF5DataType() {
|
||||
if (id >= 0) H5Tclose(id);
|
||||
if (id >= 0) {
|
||||
H5E_BEGIN_TRY {
|
||||
H5Tclose(id);
|
||||
} H5E_END_TRY;
|
||||
id = -1;
|
||||
}
|
||||
}
|
||||
|
||||
size_t HDF5DataType::GetElemSize() const {
|
||||
@@ -251,7 +261,12 @@ HDF5Dcpl::HDF5Dcpl(const HDF5DataSet &data_set) : HDF5Id() {
|
||||
}
|
||||
|
||||
HDF5Dcpl::~HDF5Dcpl() {
|
||||
if (id >= 0) H5Pclose(id);
|
||||
if (id >= 0) {
|
||||
H5E_BEGIN_TRY {
|
||||
H5Pclose(id);
|
||||
} H5E_END_TRY;
|
||||
id = -1;
|
||||
}
|
||||
}
|
||||
|
||||
void HDF5Dcpl::SetChunking(const std::vector<hsize_t> &dims) {
|
||||
@@ -308,7 +323,12 @@ HDF5Fapl::HDF5Fapl() : HDF5Id() {
|
||||
}
|
||||
|
||||
HDF5Fapl::~HDF5Fapl() {
|
||||
H5Pclose(id);
|
||||
if (id >= 0) {
|
||||
H5E_BEGIN_TRY {
|
||||
H5Pclose(id);
|
||||
} H5E_END_TRY;
|
||||
id = -1;
|
||||
}
|
||||
}
|
||||
|
||||
void HDF5Fapl::SetVersionTo1p10orNewer() {
|
||||
@@ -685,7 +705,12 @@ HDF5Group::HDF5Group(const HDF5Object& parent, const char *name) : HDF5Object()
|
||||
}
|
||||
|
||||
HDF5Group::~HDF5Group() {
|
||||
H5Gclose(id);
|
||||
if (id >= 0) {
|
||||
H5E_BEGIN_TRY {
|
||||
H5Gclose(id);
|
||||
} H5E_END_TRY;
|
||||
id = -1;
|
||||
}
|
||||
}
|
||||
|
||||
HDF5File::HDF5File(const std::string& filename, bool v1_10) : HDF5Object() {
|
||||
@@ -698,8 +723,37 @@ HDF5File::HDF5File(const std::string& filename, bool v1_10) : HDF5Object() {
|
||||
throw JFJochException(JFJochExceptionCategory::HDF5, "Cannot open/create data HDF5 file " + filename);
|
||||
}
|
||||
|
||||
void HDF5File::Close() {
|
||||
if (id < 0)
|
||||
return;
|
||||
|
||||
// Invalidate first; if anything below fails (e.g. ENOSPC) we must NOT
|
||||
// leave a live id behind for the destructor or later code to touch.
|
||||
const hid_t local_id = id;
|
||||
id = -1;
|
||||
|
||||
herr_t flush_err = 0;
|
||||
H5E_BEGIN_TRY {
|
||||
flush_err = H5Fflush(local_id, H5F_SCOPE_GLOBAL);
|
||||
} H5E_END_TRY;
|
||||
|
||||
herr_t close_err = 0;
|
||||
H5E_BEGIN_TRY {
|
||||
close_err = H5Fclose(local_id);
|
||||
} H5E_END_TRY;
|
||||
|
||||
if (flush_err < 0 || close_err < 0)
|
||||
throw JFJochException(JFJochExceptionCategory::HDF5,
|
||||
"Failed to flush/close HDF5 file (likely no space left on device)");
|
||||
}
|
||||
|
||||
HDF5File::~HDF5File() {
|
||||
if (id >= 0) H5Fclose(id);
|
||||
if (id >= 0) {
|
||||
H5E_BEGIN_TRY {
|
||||
H5Fclose(id);
|
||||
} H5E_END_TRY;
|
||||
id = -1;
|
||||
}
|
||||
}
|
||||
|
||||
void HDF5File::Delete(const std::string& path) {
|
||||
@@ -713,7 +767,10 @@ HDF5ReadOnlyFile::HDF5ReadOnlyFile(const std::string &filename) {
|
||||
}
|
||||
|
||||
HDF5ReadOnlyFile::~HDF5ReadOnlyFile() {
|
||||
if (id >= 0) H5Fclose(id);
|
||||
if (id >= 0) {
|
||||
H5E_BEGIN_TRY {H5Fclose(id); } H5E_END_TRY;
|
||||
id = -1;
|
||||
}
|
||||
}
|
||||
|
||||
HDF5DataSet::HDF5DataSet(const HDF5Object &parent, const std::string &name, const HDF5DataType &data_type,
|
||||
@@ -799,8 +856,29 @@ std::string HDF5DataSet::ReadString() const {
|
||||
return buffer;
|
||||
}
|
||||
|
||||
void HDF5DataSet::Close() {
|
||||
if (id < 0)
|
||||
return;
|
||||
|
||||
const hid_t local_id = id;
|
||||
id = -1;
|
||||
|
||||
herr_t err = 0;
|
||||
H5E_BEGIN_TRY {
|
||||
err = H5Dclose(local_id);
|
||||
} H5E_END_TRY;
|
||||
|
||||
if (err < 0)
|
||||
throw JFJochException(JFJochExceptionCategory::HDF5, "Cannot close HDF5 dataset");
|
||||
}
|
||||
|
||||
HDF5DataSet::~HDF5DataSet() {
|
||||
if (id >= 0) H5Dclose(id);
|
||||
if (id >= 0) {
|
||||
H5E_BEGIN_TRY {
|
||||
H5Dclose(id);
|
||||
} H5E_END_TRY;
|
||||
id = -1;
|
||||
}
|
||||
}
|
||||
|
||||
void HDF5DataSet::ReadDirectChunk(std::vector<uint8_t> &val, const std::vector<hsize_t> &offset) {
|
||||
|
||||
@@ -169,6 +169,7 @@ public:
|
||||
explicit HDF5File(const std::string& filename, bool v1_10 = false);
|
||||
~HDF5File();
|
||||
void Delete(const std::string& path);
|
||||
void Close();
|
||||
};
|
||||
|
||||
class HDF5ReadOnlyFile : public HDF5Object {
|
||||
@@ -321,6 +322,7 @@ public:
|
||||
}
|
||||
|
||||
std::string ReadString() const;
|
||||
void Close();
|
||||
};
|
||||
|
||||
inline std::unique_ptr<HDF5DataSet> SaveScalar(const HDF5Object& parent, const std::string &name, const char* val) {
|
||||
|
||||
+47
-13
@@ -24,10 +24,22 @@ void StreamWriter::NotifyTcpAck(TCPFrameType ack_for, bool ok, bool fatal, TCPAc
|
||||
if (!image_puller.SupportsAck())
|
||||
return;
|
||||
|
||||
// Send only one fatal DATA ack per data collection; subsequent in-flight
|
||||
// DATA frames just get a quiet non-fatal NACK.
|
||||
bool send_fatal = fatal;
|
||||
if (ack_for == TCPFrameType::DATA) {
|
||||
if (fatal && tcp_data_fatal_sent) {
|
||||
send_fatal = false;
|
||||
ok = false;
|
||||
} else if (fatal) {
|
||||
tcp_data_fatal_sent = true;
|
||||
}
|
||||
}
|
||||
|
||||
PullerAckMessage ack;
|
||||
ack.ack_for = ack_for;
|
||||
ack.ok = ok;
|
||||
ack.fatal = fatal;
|
||||
ack.fatal = send_fatal;
|
||||
ack.error_code = code;
|
||||
ack.error_text = error_text;
|
||||
ack.run_number = run_number;
|
||||
@@ -105,7 +117,7 @@ void StreamWriter::ProcessCalibrationImage() {
|
||||
}
|
||||
|
||||
void StreamWriter::ProcessDataImage() {
|
||||
switch (state) {
|
||||
switch (state) {
|
||||
case StreamWriterState::Idle:
|
||||
logger.Warning("Missing meaningful image while waiting for START");
|
||||
mute_data_msg_in_idle = true;
|
||||
@@ -113,7 +125,7 @@ void StreamWriter::ProcessDataImage() {
|
||||
case StreamWriterState::Started:
|
||||
start_time = std::chrono::system_clock::now();
|
||||
state = StreamWriterState::Receiving;
|
||||
// Follow through to receiving - no brake!
|
||||
// Follow through to receiving - no break!
|
||||
case StreamWriterState::Receiving:
|
||||
try {
|
||||
if (verbose)
|
||||
@@ -129,29 +141,44 @@ void StreamWriter::ProcessDataImage() {
|
||||
if (verbose)
|
||||
logger.Info("Written");
|
||||
NotifyTcpAck(TCPFrameType::DATA, true, false, TCPAckCode::None);
|
||||
} catch (const JFJochException &e) {
|
||||
logger.ErrorException(e);
|
||||
logger.Warning("Error writing image - switching to error state");
|
||||
} catch (const std::exception &e) {
|
||||
logger.Error("Error writing image: {}", e.what());
|
||||
logger.Warning("Switching to error state; subsequent DATA frames will be NACKed silently");
|
||||
const bool first_fatal = !tcp_data_fatal_sent;
|
||||
state = StreamWriterState::Error;
|
||||
err = e.what();
|
||||
NotifyTcpAck(TCPFrameType::DATA, false, true, TCPAckCode::DataWriteFailed, err);
|
||||
// Heuristic mapping ENOSPC -> NoSpaceLeft
|
||||
TCPAckCode code = TCPAckCode::DataWriteFailed;
|
||||
std::string what = e.what();
|
||||
if (what.find("no space") != std::string::npos ||
|
||||
what.find("No space") != std::string::npos ||
|
||||
what.find("ENOSPC") != std::string::npos)
|
||||
code = TCPAckCode::NoSpaceLeft;
|
||||
NotifyTcpAck(TCPFrameType::DATA, false, first_fatal, code, err);
|
||||
}
|
||||
break;
|
||||
case StreamWriterState::Error:
|
||||
// Error state => Wait till end only
|
||||
// Quiet non-fatal NACK so the producer knows this image wasn't written
|
||||
NotifyTcpAck(TCPFrameType::DATA, false, false, TCPAckCode::DataWriteFailed, err);
|
||||
break;
|
||||
case StreamWriterState::Finalized:
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
void StreamWriter::ProcessEndMessage() {
|
||||
// Ignore end message when idle state!
|
||||
if (state == StreamWriterState::Idle || state == StreamWriterState::Finalized)
|
||||
return;
|
||||
|
||||
if (verbose)
|
||||
logger.Info("Received end message");
|
||||
|
||||
// Important: close DATA files BEFORE finalizing the master, so that the
|
||||
// master's links/VDS reference renamed final filenames (and so that we
|
||||
// don't keep writing to a master if data-file finalize is going to fail
|
||||
// anyway on ENOSPC).
|
||||
FinalizeDataCollection();
|
||||
|
||||
if (state != StreamWriterState::Error) {
|
||||
try {
|
||||
if ((image_puller_output.cbor->end_message->max_image_number == 0) && (max_image_number > 0))
|
||||
@@ -165,9 +192,12 @@ void StreamWriter::ProcessEndMessage() {
|
||||
}
|
||||
}
|
||||
|
||||
FinalizeDataCollection();
|
||||
// Drop file_writer either way; the master tmp is unlinked in NXmx::~NXmx
|
||||
// / NXmx::Finalize on the error path.
|
||||
file_writer.reset();
|
||||
|
||||
const bool error_state = (state == StreamWriterState::Error);
|
||||
state = StreamWriterState::Finalized;
|
||||
|
||||
NotifyReceiverOnFinalizedWrite(writer_notification_zmq_addr);
|
||||
NotifyTcpAck(TCPFrameType::END, !error_state, error_state,
|
||||
@@ -181,18 +211,22 @@ void StreamWriter::FinalizeDataCollection() {
|
||||
if (file_writer && (state != StreamWriterState::Error)) {
|
||||
try {
|
||||
hdf5_data_file_statistics = file_writer->Finalize();
|
||||
} catch (JFJochException &e) {
|
||||
} catch (const std::exception &e) {
|
||||
state = StreamWriterState::Error;
|
||||
err = e.what();
|
||||
logger.ErrorException(e);
|
||||
logger.Error("Error finalizing writing - switching to error state");
|
||||
}
|
||||
} else if (file_writer && state == StreamWriterState::Error) {
|
||||
// Best-effort cleanup: still call Finalize so per-file Close() runs and
|
||||
// unlinks the tmp files even when broken.
|
||||
try { (void) file_writer->Finalize(); } catch (...) {}
|
||||
hdf5_data_file_statistics.clear();
|
||||
} else {
|
||||
hdf5_data_file_statistics.clear();
|
||||
}
|
||||
file_writer.reset();
|
||||
logger.Info("Data writing finished");
|
||||
state = StreamWriterState::Finalized;
|
||||
// Don't reset file_writer here: ProcessEndMessage still needs it for the master.
|
||||
}
|
||||
|
||||
void StreamWriter::CollectImages() {
|
||||
|
||||
Reference in New Issue
Block a user