186 lines
6.6 KiB
C++
186 lines
6.6 KiB
C++
// Copyright (2019-2023) Paul Scherrer Institute
|
|
|
|
#include "../common/JFJochException.h"
|
|
#include "StreamWriter.h"
|
|
#include "HDF5Writer.h"
|
|
#include "HDF5NXmx.h"
|
|
#include "MakeDirectory.h"
|
|
|
|
StreamWriter::StreamWriter(Logger &in_logger,
|
|
const std::string &zmq_addr,
|
|
const std::string &repub_address,
|
|
const std::string &in_file_done_address)
|
|
: image_puller(repub_address),
|
|
logger(in_logger),
|
|
file_done_address(in_file_done_address),
|
|
socket_number(0),
|
|
run_number(0) {
|
|
image_puller.Connect(zmq_addr);
|
|
logger.Info("Connected via ZMQ to {}", zmq_addr);
|
|
}
|
|
|
|
void StreamWriter::CollectImages(std::vector<HDF5DataFileStatistics> &v) {
|
|
bool run = WaitForImage();
|
|
|
|
while (run && !image_puller_output.cbor->start_message) {
|
|
if (image_puller_output.cbor->msg_type == CBORImageType::IMAGE)
|
|
logger.Warning("Missing meaningful image while waiting for START");
|
|
run = WaitForImage();
|
|
}
|
|
|
|
if (!run)
|
|
return;
|
|
|
|
logger.Info("Starting writing for dataset {} of {} images",
|
|
image_puller_output.cbor->start_message->file_prefix,
|
|
image_puller_output.cbor->start_message->number_of_images);
|
|
state = StreamWriterState::Started;
|
|
uint64_t max_image_number = 0;
|
|
|
|
processed_images = 0;
|
|
processed_image_size = 0;
|
|
|
|
file_prefix = image_puller_output.cbor->start_message->file_prefix;
|
|
run_number = image_puller_output.cbor->start_message->run_number;
|
|
run_name = image_puller_output.cbor->start_message->run_name;
|
|
|
|
CheckPath(image_puller_output.cbor->start_message->file_prefix);
|
|
MakeDirectory(image_puller_output.cbor->start_message->file_prefix);
|
|
HDF5Writer writer(*image_puller_output.cbor->start_message);
|
|
|
|
if (!file_done_address.empty())
|
|
writer.SetupSocket(file_done_address);
|
|
|
|
std::unique_ptr<NXmx> master_file;
|
|
if (!image_puller_output.cbor->start_message->write_master_file || image_puller_output.cbor->start_message->write_master_file.value())
|
|
master_file = std::make_unique<NXmx>(*image_puller_output.cbor->start_message);
|
|
|
|
socket_number = 0;
|
|
if (image_puller_output.cbor->start_message->socket_number)
|
|
socket_number = image_puller_output.cbor->start_message->socket_number.value();
|
|
|
|
std::string detector_update_zmq_addr = image_puller_output.cbor->start_message->writer_notification_zmq_addr;
|
|
|
|
bool first_image = true;
|
|
run = WaitForImage();
|
|
while (run && image_puller_output.cbor->calibration) {
|
|
if (master_file)
|
|
master_file->WriteCalibration(*image_puller_output.cbor->calibration);
|
|
run = WaitForImage();
|
|
}
|
|
|
|
while (run && image_puller_output.cbor->data_message) {
|
|
if (first_image) {
|
|
state = StreamWriterState::Receiving;
|
|
start_time = std::chrono::system_clock::now();
|
|
first_image = false;
|
|
}
|
|
|
|
writer.Write(*image_puller_output.cbor->data_message);
|
|
if (max_image_number < image_puller_output.cbor->data_message->number + 1)
|
|
max_image_number = image_puller_output.cbor->data_message->number + 1;
|
|
|
|
processed_images++;
|
|
processed_image_size += image_puller_output.cbor->data_message->image.size;
|
|
run = WaitForImage();
|
|
}
|
|
|
|
bool clean_end = false;
|
|
|
|
if (run && image_puller_output.cbor->end_message) {
|
|
end_time = std::chrono::system_clock::now();
|
|
|
|
if ((image_puller_output.cbor->end_message->max_image_number == 0) && (max_image_number > 0))
|
|
image_puller_output.cbor->end_message->max_image_number = max_image_number;
|
|
|
|
if (master_file)
|
|
master_file->Finalize(*image_puller_output.cbor->end_message);
|
|
|
|
clean_end = true;
|
|
}
|
|
|
|
master_file.reset();
|
|
v = writer.Finalize();
|
|
state = clean_end ? StreamWriterState::Idle : StreamWriterState::Error;
|
|
|
|
if (!detector_update_zmq_addr.empty()) {
|
|
nlohmann::json j;
|
|
|
|
auto stats = GetStatistics();
|
|
j["socket_number"] = socket_number;
|
|
j["ok"] = clean_end ? true : false;
|
|
j["processed_images"] = processed_images.load();
|
|
j["socket_number"] = stats.socket_number;
|
|
j["run_number"] = stats.run_number;
|
|
j["run_name"] = stats.run_name;
|
|
j["performance_MBs"] = stats.performance_MBs;
|
|
|
|
try {
|
|
ZMQSocket s(ZMQSocketType::Push);
|
|
s.SendTimeout(std::chrono::seconds(1));
|
|
s.Connect(detector_update_zmq_addr);
|
|
s.Send(j.dump());
|
|
} catch (const JFJochException &e) {
|
|
logger.ErrorException(e);
|
|
}
|
|
}
|
|
}
|
|
|
|
void StreamWriter::Cancel() {
|
|
logger.Info("Cancel requested");
|
|
image_puller.Abort();
|
|
}
|
|
|
|
StreamWriterOutput StreamWriter::Run() {
|
|
StreamWriterOutput ret;
|
|
try {
|
|
CollectImages(ret.data_file_stats);
|
|
} catch (JFJochException &e) {
|
|
// Error during collecting images will skip to end data collection
|
|
// End data collection will consume all images till the end
|
|
logger.ErrorException(e);
|
|
}
|
|
|
|
ret.image_puller_stats = GetStatistics();
|
|
logger.Info("Write task done. Images = {} Throughput = {:.0f} MB/s Frame rate = {:.0f} Hz",
|
|
ret.image_puller_stats.processed_images, ret.image_puller_stats.performance_MBs, ret.image_puller_stats.performance_Hz);
|
|
return ret;
|
|
}
|
|
|
|
bool StreamWriter::WaitForImage() {
|
|
try {
|
|
image_puller_output = image_puller.WaitForImage();
|
|
return (image_puller_output.cbor != nullptr);
|
|
} catch (const JFJochException &e) {
|
|
logger.ErrorException(e);
|
|
return false;
|
|
}
|
|
}
|
|
|
|
StreamWriterStatistics StreamWriter::GetStatistics() const {
|
|
float perf_MBs = 0.0f, perf_Hz = 0.0f;
|
|
|
|
if ((state != StreamWriterState::Started) && (processed_images > 0)) {
|
|
int64_t time_us;
|
|
if (state == StreamWriterState::Idle)
|
|
time_us = std::chrono::duration_cast<std::chrono::microseconds>(end_time - start_time).count();
|
|
else
|
|
time_us = std::chrono::duration_cast<std::chrono::microseconds>(std::chrono::system_clock::now() - start_time).count();
|
|
|
|
// MByte/s ==> Byte/us
|
|
perf_MBs = static_cast<float>(processed_image_size) / static_cast<float>(time_us);
|
|
perf_Hz = static_cast<float>(processed_images) * 1e6f / static_cast<float>(time_us);
|
|
}
|
|
|
|
return {
|
|
.processed_images = processed_images,
|
|
.performance_MBs = perf_MBs,
|
|
.performance_Hz = perf_Hz,
|
|
.file_prefix = file_prefix,
|
|
.run_name = run_name,
|
|
.run_number = run_number,
|
|
.socket_number = socket_number,
|
|
.state = state
|
|
};
|
|
}
|