// SPDX-FileCopyrightText: 2024 Filip Leonarski, Paul Scherrer Institute // SPDX-License-Identifier: GPL-3.0-only #include "../common/JFJochException.h" #include "StreamWriter.h" #include #include "FileWriter.h" StreamWriter::StreamWriter(Logger &in_logger, ImagePuller &in_image_puller, std::string in_file_done_address, bool in_verbose) : verbose(in_verbose), image_puller(in_image_puller), logger(in_logger), file_done_address(std::move(in_file_done_address)), socket_number(0), run_number(0), max_image_number(0) { } void StreamWriter::ProcessStartMessage() { if (state == StreamWriterState::Finalized) return; // Should not happen (?) if (state != StreamWriterState::Idle) FinalizeDataCollection(); err = ""; max_image_number = 0; processed_images = 0; processed_image_size = 0; file_prefix = image_puller_output.cbor->start_message->file_prefix; run_number = image_puller_output.cbor->start_message->run_number; run_name = image_puller_output.cbor->start_message->run_name; socket_number = 0; if (image_puller_output.cbor->start_message->socket_number) socket_number = image_puller_output.cbor->start_message->socket_number.value(); writer_notification_zmq_addr = image_puller_output.cbor->start_message->writer_notification_zmq_addr; try { file_writer = std::make_unique(*image_puller_output.cbor->start_message); if (!file_done_address.empty()) file_writer->SetupFinalizedFileSocket(file_done_address); logger.Info("Starting writing for dataset {} of {} images", image_puller_output.cbor->start_message->file_prefix, image_puller_output.cbor->start_message->number_of_images); state = StreamWriterState::Started; } catch (const JFJochException &e) { logger.ErrorException(e); logger.Error("Error writing start message - switching to error state"); state = StreamWriterState::Error; err = e.what(); } } void StreamWriter::ProcessCalibrationImage() { switch (state) { case StreamWriterState::Started: try { file_writer->WriteHDF5(*image_puller_output.cbor->calibration); } catch (const std::exception &e) { logger.Warning(e.what()); logger.Warning("Error during writing calibration data - skipping"); } break; case StreamWriterState::Receiving: logger.Warning("Unexpected calibration message"); break; case StreamWriterState::Error: case StreamWriterState::Idle: case StreamWriterState::Finalized: break; } } void StreamWriter::ProcessDataImage() { switch (state) { case StreamWriterState::Idle: logger.Warning("Missing meaningful image while waiting for START"); mute_data_msg_in_idle = true; break; case StreamWriterState::Started: start_time = std::chrono::system_clock::now(); state = StreamWriterState::Receiving; // Follow through to receiving - no brake! case StreamWriterState::Receiving: try { if (verbose) logger.Info("Received data message {}", image_puller_output.cbor->data_message->number); file_writer->Write(*image_puller_output.cbor->data_message); if (max_image_number < image_puller_output.cbor->data_message->number + 1) max_image_number = image_puller_output.cbor->data_message->number + 1; processed_images++; processed_image_size += image_puller_output.cbor->data_message->image.GetCompressedSize(); if (verbose) logger.Info("Written"); } catch (const JFJochException &e) { logger.ErrorException(e); logger.Warning("Error writing image - switching to error state"); state = StreamWriterState::Error; err = e.what(); } break; case StreamWriterState::Error: case StreamWriterState::Finalized: break; } } void StreamWriter::ProcessEndMessage() { // Ignore end message when idle state! if (state == StreamWriterState::Idle || state == StreamWriterState::Finalized) return; if (verbose) logger.Info("Received end message"); if (state != StreamWriterState::Error) { try { if ((image_puller_output.cbor->end_message->max_image_number == 0) && (max_image_number > 0)) image_puller_output.cbor->end_message->max_image_number = max_image_number; file_writer->WriteHDF5(*image_puller_output.cbor->end_message); } catch (const JFJochException &e) { logger.ErrorException(e); logger.Error("Error writing end message - switching to error state"); state = StreamWriterState::Error; err = e.what(); } } FinalizeDataCollection(); } void StreamWriter::FinalizeDataCollection() { end_time = std::chrono::system_clock::now(); if (file_writer && (state != StreamWriterState::Error)) { try { hdf5_data_file_statistics = file_writer->Finalize(); } catch (JFJochException &e) { state = StreamWriterState::Error; err = e.what(); logger.ErrorException(e); logger.Error("Error finalizing writing - switching to error state"); } } else { hdf5_data_file_statistics.clear(); } file_writer.reset(); NotifyReceiverOnFinalizedWrite(writer_notification_zmq_addr); logger.Info("Data writing finished"); state = StreamWriterState::Finalized; } void StreamWriter::CollectImages() { state = StreamWriterState::Idle; mute_data_msg_in_idle = false; bool run = true; while (run && state != StreamWriterState::Finalized) { run = WaitForImage(); if (image_puller_output.cbor->start_message) ProcessStartMessage(); else if (image_puller_output.cbor->calibration) ProcessCalibrationImage(); else if (image_puller_output.cbor->data_message) ProcessDataImage(); else if (image_puller_output.cbor->end_message) ProcessEndMessage(); else logger.Warning("Unknown message type"); } } void StreamWriter::Cancel() { logger.Info("Cancel requested"); abort = true; } StreamWriterOutput StreamWriter::Run() { hdf5_data_file_statistics.clear(); try { CollectImages(); } catch (std::exception &e) { // Error during collecting images will skip to end data collection // End data collection will consume all images till the end logger.ErrorException(e); logger.Error("Exception not properly handled by CollectImages()"); } StreamWriterOutput ret; ret.data_file_stats = hdf5_data_file_statistics; ret.stats = GetStatistics(); logger.Info("Write task done. Images = {} Throughput = {:.0f} MB/s Frame rate = {:.0f} Hz max occupation of FIFO {}", ret.stats.processed_images, ret.stats.performance_MBs, ret.stats.performance_Hz, ret.stats.max_puller_fifo_utilization); return ret; } bool StreamWriter::WaitForImage() { try { std::optional ret; while (!ret && !abort) ret = image_puller.PollImage(); if (ret.has_value()) image_puller_output = ret.value(); return ret.has_value(); } catch (const JFJochException &e) { logger.ErrorException(e); return false; } } StreamWriterStatistics StreamWriter::GetStatistics() const { float perf_MBs = 0.0f, perf_Hz = 0.0f; if ((state != StreamWriterState::Started) && (processed_images > 0)) { int64_t time_us; if (state == StreamWriterState::Idle) time_us = std::chrono::duration_cast(end_time - start_time).count(); else time_us = std::chrono::duration_cast(std::chrono::system_clock::now() - start_time).count(); // MByte/s ==> Byte/us perf_MBs = static_cast(processed_image_size) / static_cast(time_us); perf_Hz = static_cast(processed_images) * 1e6f / static_cast(time_us); } return { .processed_images = processed_images, .performance_MBs = perf_MBs, .performance_Hz = perf_Hz, .file_prefix = file_prefix, .run_name = run_name, .run_number = run_number, .socket_number = socket_number, .state = state, .max_puller_fifo_utilization = image_puller.GetMaxFifoUtilization() }; } void StreamWriter::NotifyReceiverOnFinalizedWrite(const std::string &detector_update_zmq_addr) { if (debug_skip_write_notification) { logger.Info("StreamWriter: Skipping notification"); return; } if (detector_update_zmq_addr.empty()) return; nlohmann::json j; auto stats = GetStatistics(); j["socket_number"] = socket_number; j["processed_images"] = processed_images.load(); j["socket_number"] = stats.socket_number; j["run_number"] = stats.run_number; j["run_name"] = stats.run_name; j["performance_MBs"] = stats.performance_MBs; if (state == StreamWriterState::Error) { j["ok"] = false; j["error"] = err; } else j["ok"] = true; try { logger.Info("Sending notification to {}", detector_update_zmq_addr); ZMQSocket s(ZMQSocketType::Push); s.SendTimeout(std::chrono::seconds(1)); s.Connect(detector_update_zmq_addr); s.Send(j.dump()); } catch (const JFJochException &e) { logger.ErrorException(e); logger.Error("Error sending notification to detector update socket"); } } void StreamWriter::DebugSkipWriteNotification(bool input) { debug_skip_write_notification = input; }