// Copyright (2019-2023) Paul Scherrer Institute #include "HDF5Writer.h" #include #include "MakeDirectory.h" #include "../common/CheckPath.h" #include "../common/Logger.h" #include "../common/JFJochException.h" HDF5Writer::HDF5Writer(const StartMessage &request) : start_message(request) { CheckPath(start_message.file_prefix); MakeDirectory(start_message.file_prefix); if (start_message.write_master_file && start_message.write_master_file.value()) { std::lock_guard lock(hdf5_mutex); master_file = std::make_unique(request); } } void HDF5Writer::Write(const DataMessage& message) { std::lock_guard lock(hdf5_mutex); if (message.image.size == 0) return; if (message.number < 0) throw JFJochException(JFJochExceptionCategory::ArrayOutOfBounds, "No support for negative images"); uint64_t file_number = 0; size_t image_number = message.number; if (start_message.images_per_file > 0) { file_number = message.number / start_message.images_per_file; image_number = message.number % start_message.images_per_file; } if (files.size() <= file_number) files.resize(file_number + 1); if (!files[file_number]) files[file_number] = std::make_unique(start_message, file_number); // Ignore zero size images if (message.image.size > 0) files[file_number]->Write(message, image_number); if (files[file_number]->GetNumImages() == start_message.images_per_file) AddStats(files[file_number]->Close()); } std::vector HDF5Writer::Finalize() { if (master_file) { std::lock_guard lock(hdf5_mutex); master_file.reset(); } for (auto &f: files) { if (f) AddStats(f->Close()); } return stats; } void HDF5Writer::AddStats(const std::optional& s) { if (!s) return; stats.push_back(*s); if (finalized_file_socket) { nlohmann::json j; j["filename"] = s->filename; j["nimages"] = s->total_images; j["file_number"] = s->file_number; j["detector_distance_m"] = start_message.detector_distance; j["beam_x_pxl"] = start_message.beam_center_x; j["beam_y_pxl"] = start_message.beam_center_y; j["pixel_size_m"] = start_message.pixel_size_x; j["detector_width_pxl"] = start_message.image_size_x; j["detector_height_pxl"] = start_message.image_size_y; j["incident_energy_eV"] = start_message.incident_energy; j["saturation"] = start_message.saturation_value; j["sample_name"] = start_message.sample_name; j["run_number"] = start_message.run_number; j["run_name"] = start_message.run_name; if (!start_message.experiment_group.empty()) j["experiment_group"] = start_message.experiment_group; if (start_message.unit_cell) { j["unit_cell"]["a"] = start_message.unit_cell->a; j["unit_cell"]["b"] = start_message.unit_cell->b; j["unit_cell"]["c"] = start_message.unit_cell->c; j["unit_cell"]["alpha"] = start_message.unit_cell->alpha; j["unit_cell"]["beta"] = start_message.unit_cell->beta; j["unit_cell"]["gamma"] = start_message.unit_cell->gamma; } if (start_message.space_group_number > 0) j["space_group_number"] = start_message.space_group_number; if (start_message.error_value) j["underload"] = start_message.error_value.value(); j["user_data"] = start_message.user_data; finalized_file_socket->Send(j.dump()); } } void HDF5Writer::SetupFinalizedFileSocket(const std::string &addr) { finalized_file_socket = std::make_unique(ZMQSocketType::Pub); finalized_file_socket->Bind(addr); } std::optional HDF5Writer::GetZMQAddr() { if (finalized_file_socket) { return finalized_file_socket->GetEndpointName(); } else return {}; } void HDF5Writer::Write(const CompressedImage &msg) { if (master_file) { std::lock_guard lock(hdf5_mutex); try { master_file->WriteCalibration(msg); } catch (const JFJochException &e) { spdlog::error("Calibration {} not written {}", msg.channel, e.what()); } } } void HDF5Writer::Write(const EndMessage &msg) { if (master_file) { std::lock_guard lock(hdf5_mutex); master_file->Finalize(msg); } }