Files
Jungfraujoch/writer/StreamWriter.cpp
leonarski_f d315506633 * Enhancements for XFEL
* Enhancements for EIGER
* Writer is more flexible and capable of handling DECTRIS data
2024-03-05 20:41:47 +01:00

117 lines
4.2 KiB
C++

// Copyright (2019-2023) Paul Scherrer Institute
#include "../common/JFJochException.h"
#include "StreamWriter.h"
#include "HDF5Writer.h"
#include "HDF5NXmx.h"
#include "MakeDirectory.h"
// Sets up the stream writer.
//   context       - ZMQ context handed to the image puller
//   in_logger     - logger used for all messages emitted by this object
//   zmq_addr      - address the image puller connects to
//   repub_address - second argument forwarded to the image puller constructor
// The connection is made right away and reported in the log.
StreamWriter::StreamWriter(ZMQContext &context,
                           Logger &in_logger,
                           const std::string &zmq_addr,
                           const std::string &repub_address)
        : image_puller(context, repub_address),
          logger(in_logger) {
    image_puller.Connect(zmq_addr);
    logger.Info("Connected via ZMQ to {}", zmq_addr);
}
void StreamWriter::CollectImages(std::vector<HDF5DataFileStatistics> &v) {
bool run = true;
while (run && (image_puller.GetFrameType() != CBORStream2Deserializer::Type::START))
run = image_puller.WaitForImage();
if (!run)
return;
StartMessage start_message = image_puller.GetStartMessage();
logger.Info("Starting writing for dataset {} of {} images", start_message.file_prefix, start_message.number_of_images);
state = StreamWriterState::Started;
uint64_t max_image_number = 0;
processed_images = 0;
processed_image_size = 0;
file_prefix = start_message.file_prefix;
CheckPath(start_message.file_prefix);
MakeDirectory(start_message.file_prefix);
HDF5Writer writer(start_message);
bool first_image = true;
run = image_puller.WaitForImage();
while (run && (image_puller.GetFrameType() == CBORStream2Deserializer::Type::IMAGE)) {
if (first_image) {
state = StreamWriterState::Receiving;
start_time = std::chrono::system_clock::now();
first_image = false;
}
auto image_array = image_puller.GetDataMessage();
writer.Write(image_array);
if (max_image_number < image_array.number + 1)
max_image_number = image_array.number + 1;
if (start_message.pixel_bit_depth == 0)
start_message.pixel_bit_depth = image_array.image.pixel_depth_bytes * 8;
processed_images++;
processed_image_size += image_array.image.size;
run = image_puller.WaitForImage();
}
if (image_puller.GetFrameType() == CBORStream2Deserializer::Type::END) {
EndMessage end_message = image_puller.GetEndMessage();
end_time = std::chrono::system_clock::now();
if ((end_message.max_image_number == 0) && (max_image_number > 0))
end_message.max_image_number = max_image_number;
if (!end_message.write_master_file || end_message.write_master_file.value())
HDF5Metadata::NXmx(start_message, end_message);
state = StreamWriterState::Idle;
}
writer.GetStatistics(v);
}
// Logs the cancel request and then calls Abort() on the image puller.
// NOTE(review): presumably this makes a pending WaitForImage() return false,
// which ends CollectImages() - confirm against the image puller implementation.
void StreamWriter::Cancel() {
    logger.Info("Cancel requested");

    image_puller.Abort();
}
// Runs a single write task: collects one dataset, then returns the per-file
// statistics together with the overall writer statistics.
StreamWriterOutput StreamWriter::Run() {
    StreamWriterOutput output;

    try {
        CollectImages(output.data_file_stats);
    } catch (JFJochException &e) {
        // A failure during collection is logged and not propagated;
        // the summary below is still produced from what was processed so far.
        logger.ErrorException(e);
    }

    output.image_puller_stats = GetStatistics();

    const auto &summary = output.image_puller_stats;
    logger.Info("Write task done. Images = {} Throughput = {:.0f} MB/s Frame rate = {:.0f} Hz",
                summary.processed_images, summary.performance_MBs, summary.performance_Hz);

    return output;
}
// Returns a snapshot of the writer: number of processed images, throughput,
// frame rate, current file prefix and state.
//
// Throughput and frame rate are reported as 0 when
//   - the state is Started (no image received yet, so no start time),
//   - no image has been processed, or
//   - the measured interval is not positive.
// In the Idle state the interval is start_time..end_time; otherwise it is
// start_time..now.
StreamWriterStatistics StreamWriter::GetStatistics() const {
    float perf_MBs = 0.0f;
    float perf_Hz = 0.0f;

    if ((state != StreamWriterState::Started) && (processed_images > 0)) {
        int64_t time_us;
        if (state == StreamWriterState::Idle)
            time_us = std::chrono::duration_cast<std::chrono::microseconds>(end_time - start_time).count();
        else
            time_us = std::chrono::duration_cast<std::chrono::microseconds>(std::chrono::system_clock::now() - start_time).count();

        // system_clock is not monotonic and has finite resolution, so the interval
        // may come out as zero or negative; dividing by it would yield inf/NaN or
        // a negative rate. Leave the rates at 0 in that case.
        if (time_us > 0) {
            const auto elapsed_us = static_cast<float>(time_us);
            // MByte/s ==> Byte/us
            perf_MBs = static_cast<float>(processed_image_size) / elapsed_us;
            perf_Hz = static_cast<float>(processed_images) * 1e6f / elapsed_us;
        }
    }

    return {
            .processed_images = processed_images,
            .performance_MBs = perf_MBs,
            .performance_Hz = perf_Hz,
            .file_prefix = file_prefix,
            .state = state
    };
}