// Copyright (2019-2023) Paul Scherrer Institute #include "../common/JFJochException.h" #include "StreamWriter.h" #include "HDF5Writer.h" #include "HDF5NXmx.h" #include "MakeDirectory.h" StreamWriter::StreamWriter(ZMQContext &context, Logger &in_logger, const std::string &zmq_addr, const std::string& repub_address) : image_puller(context, repub_address), logger(in_logger) { image_puller.Connect(zmq_addr); logger.Info("Connected via ZMQ to {}", zmq_addr); } void StreamWriter::CollectImages(std::vector &v) { bool run = true; while (run && (image_puller.GetFrameType() != CBORStream2Deserializer::Type::START)) { if (image_puller.GetFrameType() == CBORStream2Deserializer::Type::IMAGE) logger.Warning("Missing meaningful image while waiting for START"); run = WaitForImage(); } if (!run) return; StartMessage start_message = image_puller.GetStartMessage(); logger.Info("Starting writing for dataset {} of {} images", start_message.file_prefix, start_message.number_of_images); state = StreamWriterState::Started; uint64_t max_image_number = 0; processed_images = 0; processed_image_size = 0; file_prefix = start_message.file_prefix; CheckPath(start_message.file_prefix); MakeDirectory(start_message.file_prefix); HDF5Writer writer(start_message); bool first_image = true; run = WaitForImage(); while (run && (image_puller.GetFrameType() == CBORStream2Deserializer::Type::IMAGE)) { if (first_image) { state = StreamWriterState::Receiving; start_time = std::chrono::system_clock::now(); first_image = false; } auto image_array = image_puller.GetDataMessage(); writer.Write(image_array); if (max_image_number < image_array.number + 1) max_image_number = image_array.number + 1; if (start_message.pixel_bit_depth == 0) start_message.pixel_bit_depth = image_array.image.pixel_depth_bytes * 8; processed_images++; processed_image_size += image_array.image.size; run = WaitForImage(); } if (image_puller.GetFrameType() == CBORStream2Deserializer::Type::END) { EndMessage end_message = image_puller.GetEndMessage(); end_time = std::chrono::system_clock::now(); if ((end_message.max_image_number == 0) && (max_image_number > 0)) end_message.max_image_number = max_image_number; if (!end_message.write_master_file || end_message.write_master_file.value()) HDF5Metadata::NXmx(start_message, end_message); state = StreamWriterState::Idle; } writer.GetStatistics(v); } void StreamWriter::Cancel() { logger.Info("Cancel requested"); image_puller.Abort(); } StreamWriterOutput StreamWriter::Run() { StreamWriterOutput ret; try { CollectImages(ret.data_file_stats); } catch (JFJochException &e) { // Error during collecting images will skip to end data collection // End data collection will consume all images till the end logger.ErrorException(e); } ret.image_puller_stats = GetStatistics(); logger.Info("Write task done. Images = {} Throughput = {:.0f} MB/s Frame rate = {:.0f} Hz", ret.image_puller_stats.processed_images, ret.image_puller_stats.performance_MBs, ret.image_puller_stats.performance_Hz); return ret; } bool StreamWriter::WaitForImage() { try { return image_puller.WaitForImage(); } catch (const JFJochException &e) { logger.ErrorException(e); return false; } } StreamWriterStatistics StreamWriter::GetStatistics() const { float perf_MBs = 0.0f, perf_Hz = 0.0f; if ((state != StreamWriterState::Started) && (processed_images > 0)) { int64_t time_us; if (state == StreamWriterState::Idle) time_us = std::chrono::duration_cast(end_time - start_time).count(); else time_us = std::chrono::duration_cast(std::chrono::system_clock::now() - start_time).count(); // MByte/s ==> Byte/us perf_MBs = static_cast(processed_image_size) / static_cast(time_us); perf_Hz = static_cast(processed_images) * 1e6f / static_cast(time_us); } return { .processed_images = processed_images, .performance_MBs = perf_MBs, .performance_Hz = perf_Hz, .file_prefix = file_prefix, .state = state }; }