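// File listing metadata (carried over from the repository file viewer):
//   Path:     Jungfraujoch/writer/StreamWriter.cpp
//   Modified: 2025-03-02 13:15:28 +01:00
//   Size:     201 lines, 7.3 KiB, C++
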
// SPDX-FileCopyrightText: 2024 Filip Leonarski, Paul Scherrer Institute <filip.leonarski@psi.ch>
// SPDX-License-Identifier: GPL-3.0-only
#include "../common/JFJochException.h"
#include "StreamWriter.h"
#include "HDF5Writer.h"
/// Builds a stream writer and connects its image puller to the image source.
///
/// @param in_logger            Logger used for all messages emitted by this object.
/// @param zmq_addr             Address the image puller connects to.
/// @param repub_address        Forwarded to the image puller constructor.
/// @param in_file_done_address Stored as-is; when non-empty it is handed to
///                             HDF5Writer::SetupFinalizedFileSocket() for every dataset.
/// @param rcv_watermark        Forwarded to the image puller constructor.
/// @param repub_watermark      Forwarded to the image puller constructor.
StreamWriter::StreamWriter(Logger &in_logger,
                           const std::string &zmq_addr,
                           const std::string &repub_address,
                           const std::string &in_file_done_address,
                           const std::optional<int32_t> &rcv_watermark,
                           const std::optional<int32_t> &repub_watermark)
    : image_puller(repub_address, rcv_watermark, repub_watermark),
      logger(in_logger),
      file_done_address(in_file_done_address),
      socket_number(0),
      run_number(0)
{
    // The puller is connected right away; success is reported only afterwards.
    image_puller.Connect(zmq_addr);
    logger.Info("Connected via ZMQ to {}", zmq_addr);
}
void StreamWriter::CollectImages(std::vector<HDF5DataFileStatistics> &v) {
bool run = WaitForImage();
while (run && !image_puller_output.cbor->start_message) {
if (image_puller_output.cbor->msg_type == CBORImageType::IMAGE)
logger.Warning("Missing meaningful image while waiting for START");
run = WaitForImage();
}
if (!run)
return;
logger.Info("Starting writing for dataset {} of {} images",
image_puller_output.cbor->start_message->file_prefix,
image_puller_output.cbor->start_message->number_of_images);
state = StreamWriterState::Started;
uint64_t max_image_number = 0;
processed_images = 0;
processed_image_size = 0;
file_prefix = image_puller_output.cbor->start_message->file_prefix;
run_number = image_puller_output.cbor->start_message->run_number;
run_name = image_puller_output.cbor->start_message->run_name;
HDF5Writer writer(*image_puller_output.cbor->start_message);
if (!file_done_address.empty())
writer.SetupFinalizedFileSocket(file_done_address);
socket_number = 0;
if (image_puller_output.cbor->start_message->socket_number)
socket_number = image_puller_output.cbor->start_message->socket_number.value();
auto writer_notification_zmq_addr = image_puller_output.cbor->start_message->writer_notification_zmq_addr;
bool first_image = true;
run = WaitForImage();
while (run && image_puller_output.cbor->calibration) {
try {
writer.Write(*image_puller_output.cbor->calibration);
} catch (const JFJochException &e) {
logger.ErrorException(e);
}
run = WaitForImage();
}
while (run && image_puller_output.cbor->data_message) {
if (first_image) {
state = StreamWriterState::Receiving;
start_time = std::chrono::system_clock::now();
first_image = false;
}
writer.Write(*image_puller_output.cbor->data_message);
if (max_image_number < image_puller_output.cbor->data_message->number + 1)
max_image_number = image_puller_output.cbor->data_message->number + 1;
logger.Info("Processed image {}",image_puller_output.cbor->data_message->number);
processed_images++;
processed_image_size += image_puller_output.cbor->data_message->image.size;
run = WaitForImage();
}
bool data_collection_seq_ok = false; // Data collection finished with a proper end message
if (run && image_puller_output.cbor->end_message) {
end_time = std::chrono::system_clock::now();
if ((image_puller_output.cbor->end_message->max_image_number == 0) && (max_image_number > 0))
image_puller_output.cbor->end_message->max_image_number = max_image_number;
writer.Write(*image_puller_output.cbor->end_message);
data_collection_seq_ok = true;
}
v = writer.Finalize();
state = data_collection_seq_ok ? StreamWriterState::Idle : StreamWriterState::Error;
NotifyReceiverOnFinalizedWrite(writer_notification_zmq_addr, data_collection_seq_ok);
}
/// Logs the cancellation request and asks the image puller to abort.
/// NOTE(review): presumably this makes a pending WaitForImage() stop delivering
/// messages so that CollectImages() winds down — confirm against the puller.
void StreamWriter::Cancel() {
    logger.Info("Cancel requested");
    image_puller.Abort();
}
/// Executes one write task: collects a dataset, then gathers and logs the statistics.
///
/// A JFJochException escaping CollectImages() is logged and swallowed, so the
/// statistics are returned even for a failed collection.
///
/// @return Per-file statistics filled in by CollectImages() (left as default-constructed
///         if collection failed before finalization) together with the writer statistics.
StreamWriterOutput StreamWriter::Run() {
    StreamWriterOutput ret;

    try {
        CollectImages(ret.data_file_stats);
    } catch (const JFJochException &e) {
        // A failure while collecting aborts the current dataset; the error is only logged.
        // NOTE(review): on this path `state` keeps the last value set inside
        // CollectImages() and no receiver notification is sent — confirm this is intended.
        logger.ErrorException(e);
    }

    ret.stats = GetStatistics();

    logger.Info("Write task done. Images = {} Throughput = {:.0f} MB/s Frame rate = {:.0f} Hz max occupation of queues (C/O/R) {}/{}/{}",
                ret.stats.processed_images, ret.stats.performance_MBs, ret.stats.performance_Hz,
                ret.stats.puller_stats.cbor_fifo_max_util,
                ret.stats.puller_stats.outside_fifo_max_util,
                ret.stats.puller_stats.repub_fifo_max_util);

    return ret;
}
/// Fetches the next message from the image puller into `image_puller_output`.
///
/// @return true if a message with a non-null CBOR payload was obtained; false if the
///         payload is null or the puller threw a JFJochException (which is logged).
bool StreamWriter::WaitForImage() {
    bool got_message = false;

    try {
        image_puller_output = image_puller.WaitForImage();
        got_message = (image_puller_output.cbor != nullptr);
    } catch (const JFJochException &e) {
        logger.ErrorException(e);
    }

    return got_message;
}
/// Returns a snapshot of the writer's counters, identification fields and throughput.
///
/// Throughput figures are reported only once at least one image has been processed
/// and the writer is past the Started state. In the Idle state the interval runs from
/// the first image to the END message; in any other state it runs from the first image
/// to the current time. Both rates stay at 0 when the measured interval is not positive.
///
/// @return Filled StreamWriterStatistics, including the image puller's own statistics.
StreamWriterStatistics StreamWriter::GetStatistics() const {
    float perf_MBs = 0.0f, perf_Hz = 0.0f;

    if ((state != StreamWriterState::Started) && (processed_images > 0)) {
        int64_t time_us;
        if (state == StreamWriterState::Idle)
            time_us = std::chrono::duration_cast<std::chrono::microseconds>(end_time - start_time).count();
        else
            time_us = std::chrono::duration_cast<std::chrono::microseconds>(std::chrono::system_clock::now() - start_time).count();

        // system_clock is not monotonic and has finite resolution, so the interval can be
        // zero or negative; skip the division then rather than report inf or negative rates.
        if (time_us > 0) {
            // MByte/s ==> Byte/us
            perf_MBs = static_cast<float>(processed_image_size) / static_cast<float>(time_us);
            perf_Hz = static_cast<float>(processed_images) * 1e6f / static_cast<float>(time_us);
        }
    }

    return {
        .processed_images = processed_images,
        .performance_MBs = perf_MBs,
        .performance_Hz = perf_Hz,
        .file_prefix = file_prefix,
        .run_name = run_name,
        .run_number = run_number,
        .socket_number = socket_number,
        .state = state,
        .puller_stats = image_puller.GetStatistics()
    };
}
/// Sends a JSON summary of the finished write task to the given ZMQ address.
///
/// Nothing is sent when notifications are disabled through DebugSkipWriteNotification()
/// or when the address is empty. The message carries the keys "ok", "processed_images",
/// "socket_number", "run_number", "run_name" and "performance_MBs". A JFJochException
/// raised while sending is logged and not propagated.
///
/// @param detector_update_zmq_addr Address to connect the push socket to; empty disables sending.
/// @param ok                       Whether the dataset ended with a proper END message.
void StreamWriter::NotifyReceiverOnFinalizedWrite(const std::string &detector_update_zmq_addr,
                                                  bool ok) {
    if (debug_skip_write_notification) {
        logger.Info("StreamWriter: Skipping notification");
        return;
    }

    if (detector_update_zmq_addr.empty())
        return;

    auto stats = GetStatistics();

    nlohmann::json j;
    j["ok"] = ok;
    j["processed_images"] = processed_images.load();
    // Each key is written exactly once; socket_number comes from the statistics snapshot.
    j["socket_number"] = stats.socket_number;
    j["run_number"] = stats.run_number;
    j["run_name"] = stats.run_name;
    j["performance_MBs"] = stats.performance_MBs;

    try {
        logger.Info("Sending notification to {}", detector_update_zmq_addr);
        ZMQSocket s(ZMQSocketType::Push);
        // One-second send timeout is configured before connecting.
        s.SendTimeout(std::chrono::seconds(1));
        s.Connect(detector_update_zmq_addr);
        s.Send(j.dump());
    } catch (const JFJochException &e) {
        logger.ErrorException(e);
    }
}
/// Debug switch: when set to true, NotifyReceiverOnFinalizedWrite() only logs a
/// message and sends nothing.
///
/// @param input New value of the skip flag.
void StreamWriter::DebugSkipWriteNotification(bool input) {
    debug_skip_write_notification = input;
}