// Copyright (2019-2023) Paul Scherrer Institute #include "../common/JFJochException.h" #include "StreamWriter.h" #include "HDF5Writer.h" StreamWriter::StreamWriter(Logger &in_logger, const std::string &zmq_addr, const std::string &repub_address, const std::string &in_file_done_address) : image_puller(repub_address), logger(in_logger), file_done_address(in_file_done_address), socket_number(0), run_number(0) { image_puller.Connect(zmq_addr); logger.Info("Connected via ZMQ to {}", zmq_addr); } void StreamWriter::CollectImages(std::vector &v) { bool run = WaitForImage(); while (run && !image_puller_output.cbor->start_message) { if (image_puller_output.cbor->msg_type == CBORImageType::IMAGE) logger.Warning("Missing meaningful image while waiting for START"); run = WaitForImage(); } if (!run) return; logger.Info("Starting writing for dataset {} of {} images", image_puller_output.cbor->start_message->file_prefix, image_puller_output.cbor->start_message->number_of_images); state = StreamWriterState::Started; uint64_t max_image_number = 0; processed_images = 0; processed_image_size = 0; file_prefix = image_puller_output.cbor->start_message->file_prefix; run_number = image_puller_output.cbor->start_message->run_number; run_name = image_puller_output.cbor->start_message->run_name; HDF5Writer writer(*image_puller_output.cbor->start_message); if (!file_done_address.empty()) writer.SetupFinalizedFileSocket(file_done_address); socket_number = 0; if (image_puller_output.cbor->start_message->socket_number) socket_number = image_puller_output.cbor->start_message->socket_number.value(); auto writer_notification_zmq_addr = image_puller_output.cbor->start_message->writer_notification_zmq_addr; bool first_image = true; run = WaitForImage(); while (run && image_puller_output.cbor->calibration) { try { writer.Write(*image_puller_output.cbor->calibration); } catch (const JFJochException &e) { logger.ErrorException(e); } run = WaitForImage(); } while (run && image_puller_output.cbor->data_message) { if (first_image) { state = StreamWriterState::Receiving; start_time = std::chrono::system_clock::now(); first_image = false; } writer.Write(*image_puller_output.cbor->data_message); if (max_image_number < image_puller_output.cbor->data_message->number + 1) max_image_number = image_puller_output.cbor->data_message->number + 1; processed_images++; processed_image_size += image_puller_output.cbor->data_message->image.size; run = WaitForImage(); } bool data_collection_seq_ok = false; // Data collection finished with a proper end message if (run && image_puller_output.cbor->end_message) { end_time = std::chrono::system_clock::now(); if ((image_puller_output.cbor->end_message->max_image_number == 0) && (max_image_number > 0)) image_puller_output.cbor->end_message->max_image_number = max_image_number; writer.Write(*image_puller_output.cbor->end_message); data_collection_seq_ok = true; } v = writer.Finalize(); state = data_collection_seq_ok ? StreamWriterState::Idle : StreamWriterState::Error; NotifyReceiverOnFinalizedWrite(writer_notification_zmq_addr, data_collection_seq_ok); } void StreamWriter::Cancel() { logger.Info("Cancel requested"); image_puller.Abort(); } StreamWriterOutput StreamWriter::Run() { StreamWriterOutput ret; try { CollectImages(ret.data_file_stats); } catch (JFJochException &e) { // Error during collecting images will skip to end data collection // End data collection will consume all images till the end logger.ErrorException(e); } ret.image_puller_stats = GetStatistics(); logger.Info("Write task done. Images = {} Throughput = {:.0f} MB/s Frame rate = {:.0f} Hz", ret.image_puller_stats.processed_images, ret.image_puller_stats.performance_MBs, ret.image_puller_stats.performance_Hz); return ret; } bool StreamWriter::WaitForImage() { try { image_puller_output = image_puller.WaitForImage(); return (image_puller_output.cbor != nullptr); } catch (const JFJochException &e) { logger.ErrorException(e); return false; } } StreamWriterStatistics StreamWriter::GetStatistics() const { float perf_MBs = 0.0f, perf_Hz = 0.0f; if ((state != StreamWriterState::Started) && (processed_images > 0)) { int64_t time_us; if (state == StreamWriterState::Idle) time_us = std::chrono::duration_cast(end_time - start_time).count(); else time_us = std::chrono::duration_cast(std::chrono::system_clock::now() - start_time).count(); // MByte/s ==> Byte/us perf_MBs = static_cast(processed_image_size) / static_cast(time_us); perf_Hz = static_cast(processed_images) * 1e6f / static_cast(time_us); } return { .processed_images = processed_images, .performance_MBs = perf_MBs, .performance_Hz = perf_Hz, .file_prefix = file_prefix, .run_name = run_name, .run_number = run_number, .socket_number = socket_number, .state = state }; } void StreamWriter::NotifyReceiverOnFinalizedWrite(const std::string &detector_update_zmq_addr, bool ok) { if (debug_skip_write_notification) { logger.Info("StreamWriter: Skipping notification"); return; } if (detector_update_zmq_addr.empty()) return; nlohmann::json j; auto stats = GetStatistics(); j["socket_number"] = socket_number; j["ok"] = ok; j["processed_images"] = processed_images.load(); j["socket_number"] = stats.socket_number; j["run_number"] = stats.run_number; j["run_name"] = stats.run_name; j["performance_MBs"] = stats.performance_MBs; try { logger.Info("Sending notification to {}", detector_update_zmq_addr); ZMQSocket s(ZMQSocketType::Push); s.SendTimeout(std::chrono::seconds(1)); s.Connect(detector_update_zmq_addr); s.Send(j.dump()); } catch (const JFJochException &e) { logger.ErrorException(e); } } void StreamWriter::DebugSkipWriteNotification(bool input) { debug_skip_write_notification = input; }