FileWriter: Protect agains increasing memory without bounds.
Some checks failed
Build Packages / build:rpm (ubuntu2404_nocuda) (push) Successful in 13m13s
Build Packages / build:rpm (ubuntu2204_nocuda) (push) Successful in 14m21s
Build Packages / build:rpm (rocky8_nocuda) (push) Successful in 14m25s
Build Packages / build:rpm (rocky8_sls9) (push) Successful in 14m22s
Build Packages / Generate python client (push) Successful in 15s
Build Packages / build:rpm (rocky8) (push) Successful in 14m34s
Build Packages / Create release (push) Has been skipped
Build Packages / build:rpm (rocky9_nocuda) (push) Successful in 15m5s
Build Packages / Build documentation (push) Successful in 38s
Build Packages / build:rpm (rocky9) (push) Successful in 14m59s
Build Packages / build:rpm (rocky9_sls9) (push) Successful in 15m16s
Build Packages / build:rpm (ubuntu2204) (push) Successful in 8m18s
Build Packages / build:rpm (ubuntu2404) (push) Successful in 7m30s
Build Packages / Unit tests (push) Failing after 55m4s
Some checks failed
Build Packages / build:rpm (ubuntu2404_nocuda) (push) Successful in 13m13s
Build Packages / build:rpm (ubuntu2204_nocuda) (push) Successful in 14m21s
Build Packages / build:rpm (rocky8_nocuda) (push) Successful in 14m25s
Build Packages / build:rpm (rocky8_sls9) (push) Successful in 14m22s
Build Packages / Generate python client (push) Successful in 15s
Build Packages / build:rpm (rocky8) (push) Successful in 14m34s
Build Packages / Create release (push) Has been skipped
Build Packages / build:rpm (rocky9_nocuda) (push) Successful in 15m5s
Build Packages / Build documentation (push) Successful in 38s
Build Packages / build:rpm (rocky9) (push) Successful in 14m59s
Build Packages / build:rpm (rocky9_sls9) (push) Successful in 15m16s
Build Packages / build:rpm (ubuntu2204) (push) Successful in 8m18s
Build Packages / build:rpm (ubuntu2404) (push) Successful in 7m30s
Build Packages / Unit tests (push) Failing after 55m4s
This commit is contained in:
@@ -57,6 +57,7 @@ void FileWriter::WriteTIFF(const DataMessage &msg) {
|
||||
WriteTIFFToFile(file_name,msg.image);
|
||||
}
|
||||
|
||||
|
||||
void FileWriter::WriteHDF5(const DataMessage& msg) {
|
||||
std::lock_guard<std::mutex> lock(hdf5_mutex);
|
||||
if (msg.image.GetCompressedSize() == 0)
|
||||
@@ -71,18 +72,51 @@ void FileWriter::WriteHDF5(const DataMessage& msg) {
|
||||
file_number = msg.number / start_message.images_per_file;
|
||||
image_number = msg.number % start_message.images_per_file;
|
||||
}
|
||||
|
||||
if (closed_files.contains(file_number))
|
||||
return;
|
||||
|
||||
if (files.size() <= file_number)
|
||||
files.resize(file_number + 1);
|
||||
|
||||
if (!files[file_number])
|
||||
files[file_number] = std::make_unique<HDF5DataFile>(start_message, file_number);
|
||||
|
||||
// Ignore zero size images
|
||||
if (msg.image.GetCompressedSize() > 0)
|
||||
files[file_number]->Write(msg, image_number);
|
||||
files[file_number]->Write(msg, image_number);
|
||||
|
||||
if (files[file_number]->GetNumImages() == start_message.images_per_file)
|
||||
AddStats(files[file_number]->Close());
|
||||
if (files[file_number]->GetNumImages() == start_message.images_per_file) {
|
||||
CloseFile(file_number);
|
||||
} else {
|
||||
CloseOldFiles(static_cast<uint64_t>(msg.number));
|
||||
}
|
||||
}
|
||||
|
||||
void FileWriter::CloseFile(uint64_t file_number) {
|
||||
if (file_number >= files.size())
|
||||
return;
|
||||
if (!files[file_number])
|
||||
return;
|
||||
if (closed_files.contains(file_number))
|
||||
return;
|
||||
|
||||
AddStats(files[file_number]->Close());
|
||||
files[file_number].reset();
|
||||
closed_files.insert(file_number);
|
||||
}
|
||||
|
||||
void FileWriter::CloseOldFiles(uint64_t current_image_number) {
|
||||
if (start_message.images_per_file == 0)
|
||||
return;
|
||||
|
||||
for (uint64_t f = 0; f < files.size(); ++f) {
|
||||
if (!files[f] || closed_files.contains(f))
|
||||
continue;
|
||||
|
||||
const uint64_t file_end_image = (f + 1) * start_message.images_per_file - 1;
|
||||
if (current_image_number > file_end_image + close_file_lag_images) {
|
||||
CloseFile(f);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
std::vector<HDF5DataFileStatistics> FileWriter::Finalize() {
|
||||
@@ -92,9 +126,9 @@ std::vector<HDF5DataFileStatistics> FileWriter::Finalize() {
|
||||
master_file.reset();
|
||||
}
|
||||
|
||||
for (auto &f: files) {
|
||||
if (f)
|
||||
AddStats(f->Close());
|
||||
for (uint64_t f = 0; f < files.size(); ++f) {
|
||||
if (files[f] && !closed_files.contains(f))
|
||||
CloseFile(f);
|
||||
}
|
||||
return stats;
|
||||
}
|
||||
|
||||
@@ -11,6 +11,7 @@
|
||||
#include "../common/ZMQWrappers.h"
|
||||
#include "HDF5NXmx.h"
|
||||
#include "CBFWriter.h"
|
||||
#include <unordered_set>
|
||||
|
||||
class FileWriter {
|
||||
FileWriterFormat format = FileWriterFormat::NXmxLegacy;
|
||||
@@ -21,8 +22,14 @@ class FileWriter {
|
||||
std::vector<HDF5DataFileStatistics> stats;
|
||||
std::unique_ptr<ZMQSocket> finalized_file_socket;
|
||||
std::unique_ptr<CBFWriter> cbf_writer;
|
||||
|
||||
std::unordered_set<uint64_t> closed_files;
|
||||
uint64_t close_file_lag_images = 1000;
|
||||
|
||||
void CreateHDF5MasterFile(const StartMessage& msg);
|
||||
void AddStats(const std::optional<HDF5DataFileStatistics>& s);
|
||||
void CloseFile(uint64_t file_number);
|
||||
void CloseOldFiles(uint64_t current_image_number);
|
||||
|
||||
public:
|
||||
explicit FileWriter(const StartMessage &request);
|
||||
|
||||
@@ -9,7 +9,7 @@ void HDF5DataFilePluginROI::OpenFile(HDF5File &data_file, const DataMessage &msg
|
||||
|
||||
void HDF5DataFilePluginROI::Write(const DataMessage &msg, uint64_t image_number) {
|
||||
for (const auto &r: msg.roi) {
|
||||
if (roi_data.contains(r.first)) {
|
||||
if (!roi_data.contains(r.first)) {
|
||||
roi_data[r.first].max.reserve(RESERVE_IMAGES);
|
||||
roi_data[r.first].sum.reserve(RESERVE_IMAGES);
|
||||
roi_data[r.first].sum_sq.reserve(RESERVE_IMAGES);
|
||||
|
||||
Reference in New Issue
Block a user