// SPDX-FileCopyrightText: 2024 Filip Leonarski, Paul Scherrer Institute // SPDX-License-Identifier: GPL-3.0-only #include "FileWriter.h" #include #include "MakeDirectory.h" #include "../common/CheckPath.h" #include "../common/Logger.h" #include "../common/JFJochException.h" #include "../preview/JFJochTIFF.h" #include "JFJochDecompress.h" FileWriter::FileWriter(const StartMessage &request) : start_message(request) { if (start_message.file_format) format = start_message.file_format.value(); // defailt if (start_message.images_per_file <= 0) start_message.images_per_file = default_images_per_file; CheckPath(start_message.file_prefix); MakeDirectory(start_message.file_prefix); if (start_message.write_master_file && start_message.write_master_file.value()) { switch (format) { case FileWriterFormat::NXmxLegacy: case FileWriterFormat::NXmxVDS: case FileWriterFormat::NXmxIntegrated: CreateHDF5MasterFile(request); break; case FileWriterFormat::CBF: cbf_writer = std::make_unique(start_message); break; default: // Do nothing break; } } } void FileWriter::Write(const DataMessage &msg) { switch (format) { case FileWriterFormat::DataOnly: case FileWriterFormat::NXmxLegacy: case FileWriterFormat::NXmxVDS: case FileWriterFormat::NXmxIntegrated: WriteHDF5(msg); break; case FileWriterFormat::CBF: cbf_writer->WriteImage(msg); break; case FileWriterFormat::TIFF: WriteTIFF(msg); break; case FileWriterFormat::NoFile: // Do nothing break; } } void FileWriter::WriteTIFF(const DataMessage &msg) { const std::string file_name = fmt::format("{:s}{:06d}.tiff", start_message.file_prefix, msg.number); WriteTIFFToFile(file_name,msg.image); } void FileWriter::WriteHDF5(const DataMessage& msg) { std::lock_guard lock(hdf5_mutex); if (msg.image.GetCompressedSize() == 0) return; if (msg.number < 0) throw JFJochException(JFJochExceptionCategory::ArrayOutOfBounds, "No support for negative images"); const uint64_t file_number = (start_message.images_per_file == 0) ? 0 : msg.number / start_message.images_per_file; const uint64_t image_number = (start_message.images_per_file == 0) ? msg.number : msg.number % start_message.images_per_file; if (closed_files.contains(file_number)) return; if (files.size() <= file_number) files.resize(file_number + 1); if (!files[file_number]) { files[file_number] = std::make_unique(start_message, file_number); if (format == FileWriterFormat::NXmxIntegrated && master_file) files[file_number]->CreateFile(msg, master_file->GetFile()); } // If the file is already broken from a previous write, fail fast and stop // touching it. Mark as closed so we don't try again. if (files[file_number]->IsBroken()) { files[file_number].reset(); closed_files.insert(file_number); throw JFJochException(JFJochExceptionCategory::FileWriteError, "Data file " + std::to_string(file_number) + " is broken; aborting writes"); } try { files[file_number]->Write(msg, image_number); } catch (...) { // Don't drop the file yet — we still want to attempt a clean Close() // (which itself is broken-aware) during Finalize, so the .tmp file // gets unlinked and we don't publish a corrupt .h5 . throw; } if (files[file_number]->GetNumImages() == start_message.images_per_file) { CloseFile(file_number); } else { CloseOldFiles(static_cast(msg.number)); } } void FileWriter::CloseFile(uint64_t file_number) { if (file_number >= files.size()) return; if (!files[file_number]) return; if (closed_files.contains(file_number)) return; try { AddStats(files[file_number]->Close()); } catch (...) { // Even if Close() failed, mark the slot as closed and drop the handle. // Re-throw so StreamWriter goes to error state. files[file_number].reset(); closed_files.insert(file_number); throw; } files[file_number].reset(); closed_files.insert(file_number); } void FileWriter::CloseOldFiles(uint64_t current_image_number) { if (start_message.images_per_file == 0) return; for (uint64_t f = 0; f < files.size(); ++f) { if (!files[f] || closed_files.contains(f)) continue; const uint64_t file_end_image = (f + 1) * start_message.images_per_file - 1; if (current_image_number > file_end_image + close_file_lag_images) { CloseFile(f); } } } std::vector FileWriter::Finalize() { std::lock_guard lock(hdf5_mutex); std::exception_ptr first_err; for (uint64_t f = 0; f < files.size(); ++f) { if (files[f] && !closed_files.contains(f)) { try { CloseFile(f); } catch (...) { if (!first_err) first_err = std::current_exception(); } } } // Master is finalized via WriteHDF5(EndMessage); here we only release it. if (master_file) master_file.reset(); if (first_err) std::rethrow_exception(first_err); return stats; } void FileWriter::AddStats(const std::optional& s) { if (!s) return; stats.push_back(*s); if (finalized_file_socket) { nlohmann::json j; j["filename"] = s->filename; j["nimages"] = s->total_images; j["file_number"] = s->file_number; j["detector_distance_m"] = start_message.detector_distance; j["beam_x_pxl"] = start_message.beam_center_x; j["beam_y_pxl"] = start_message.beam_center_y; j["pixel_size_m"] = start_message.pixel_size_x; j["detector_width_pxl"] = start_message.image_size_x; j["detector_height_pxl"] = start_message.image_size_y; j["incident_energy_eV"] = start_message.incident_energy; j["saturation"] = start_message.saturation_value; j["sample_name"] = start_message.sample_name; j["run_number"] = start_message.run_number; j["run_name"] = start_message.run_name; if (!start_message.experiment_group.empty()) j["experiment_group"] = start_message.experiment_group; if (start_message.unit_cell) { j["unit_cell"]["a"] = start_message.unit_cell->a; j["unit_cell"]["b"] = start_message.unit_cell->b; j["unit_cell"]["c"] = start_message.unit_cell->c; j["unit_cell"]["alpha"] = start_message.unit_cell->alpha; j["unit_cell"]["beta"] = start_message.unit_cell->beta; j["unit_cell"]["gamma"] = start_message.unit_cell->gamma; } if (start_message.space_group_number) j["space_group_number"] = start_message.space_group_number.value(); if (start_message.error_value) j["underload"] = start_message.error_value.value(); j["user_data"] = start_message.user_data; finalized_file_socket->Send(j.dump()); } } void FileWriter::SetupFinalizedFileSocket(const std::string &addr) { finalized_file_socket = std::make_unique(ZMQSocketType::Pub); finalized_file_socket->Bind(addr); } std::optional FileWriter::GetZMQAddr() { if (finalized_file_socket) { return finalized_file_socket->GetEndpointName(); } else return {}; } void FileWriter::CreateHDF5MasterFile(const StartMessage &msg) { std::lock_guard lock(hdf5_mutex); master_file = std::make_unique(msg); } void FileWriter::WriteHDF5(const CompressedImage &msg) { if (master_file) { std::lock_guard lock(hdf5_mutex); if (master_file->IsBroken()) { spdlog::warn("Calibration {} not written: master file is broken", msg.GetChannel()); return; } try { master_file->WriteCalibration(msg); } catch (const JFJochException &e) { spdlog::error("Calibration {} not written {}", msg.GetChannel(), e.what()); // master is now marked broken inside WriteCalibration; future calls will short-circuit } } } void FileWriter::WriteHDF5(const EndMessage &msg) { if (master_file) { std::lock_guard lock(hdf5_mutex); master_file->Finalize(msg); } }