HDF5 file writer - per file analysis

This commit is contained in:
2023-04-19 23:09:26 +02:00
parent fae3cf5b42
commit f14693fe43
12 changed files with 123 additions and 96 deletions
+2
View File
@@ -322,6 +322,8 @@ void JFJochStateMachine::SetFullMeasurementOutput(JFJochProtoBuf::BrokerFullStat
for (const auto &i: output.writer()) {
writer_perf += i.performance_mbs();
images_written += i.nimages();
for (const auto &f: i.file_statistics())
*tmp.add_file_statistics() = f;
}
tmp.set_writer_performance_mbs(writer_perf);
+8
View File
@@ -286,10 +286,16 @@ message WriterInput {
int64 series_id = 2;
}
message DataFileStatistics {
string name = 1;
int64 nimages = 2;
}
message WriterOutput {
int64 nimages = 1;
float performance_MBs = 2;
float performance_Hz = 3;
repeated DataFileStatistics file_statistics = 4;
}
// Detector
@@ -455,6 +461,8 @@ message MeasurementStatistics {
int64 detector_width = 12;
int64 detector_height = 13;
int64 detector_pixel_depth = 14;
repeated DataFileStatistics file_statistics = 15;
}
message BrokerStatus {
+63 -61
View File
File diff suppressed because one or more lines are too long
+2 -1
View File
@@ -105,7 +105,7 @@ TEST_CASE("JFJochIntegrationTest_ZMQ", "[JFJochReceiver]") {
REQUIRE(statistics.detector_width() == 2068);
REQUIRE(statistics.detector_height() == 2164);
REQUIRE(statistics.detector_pixel_depth() == 2);
REQUIRE(statistics.file_statistics_size() == 2);
fpga_receiver_server->Shutdown();
writer_server->Shutdown();
}
@@ -393,6 +393,7 @@ TEST_CASE("JFJochIntegrationTest_ZMQ_3Writers", "[JFJochReceiver]") {
REQUIRE(statistics.collection_efficiency() == 1.0);
REQUIRE(statistics.images_collected() == nimages);
REQUIRE(statistics.images_written() == nimages);
REQUIRE(statistics.file_statistics_size() == 5);
fpga_receiver_server->Shutdown();
writer_server_0->Shutdown();
+6 -6
View File
@@ -101,14 +101,14 @@ void HDF5DataFile::Write(const DataMessage &msg, uint64_t image_number) {
timestamp[image_number] = msg.timestamp;
}
size_t HDF5DataFile::GetMaxImageNumber() const {
return max_image_number;
HDF5DataFileStatistics HDF5DataFile::GetStatistics() const {
HDF5DataFileStatistics ret;
ret.max_image_number = max_image_number;
ret.total_images = nimages;
ret.filename = filename;
return ret;
}
size_t HDF5DataFile::GetNumImages() const {
return nimages;
}
std::string HDF5DataFile::GetFilename() const {
return filename;
}
+7 -3
View File
@@ -12,9 +12,14 @@
#include "../common/SpotToSave.h"
#include "../frame_serialize/ImageMessage.h"
struct HDF5DataFileStatistics {
std::string filename;
uint64_t max_image_number;
uint64_t total_images;
};
class HDF5DataFile {
std::string filename;
std::string master_filename;
std::unique_ptr<HDF5File> data_file = nullptr;
std::unique_ptr<HDF5DataSet> data_set = nullptr;
@@ -47,9 +52,8 @@ public:
size_t max_spots = 0);
~HDF5DataFile();
void Write(const DataMessage& msg, uint64_t image_number);
size_t GetMaxImageNumber() const;
HDF5DataFileStatistics GetStatistics() const;
size_t GetNumImages() const;
std::string GetFilename() const;
};
#endif //HDF5DATAFILE_H
+8
View File
@@ -28,3 +28,11 @@ void HDF5Writer::Write(const DataMessage& message) {
files[file_number]->Write(message, message.number / files.size());
}
void HDF5Writer::GetStatistics(std::vector<HDF5DataFileStatistics> &v) const {
for (const auto &f: files) {
auto tmp = f->GetStatistics();
if (tmp.total_images > 0)
v.push_back(tmp);
}
}
+1
View File
@@ -15,6 +15,7 @@ class HDF5Writer {
public:
explicit HDF5Writer(const StartMessage &request);
void Write(const DataMessage& msg);
void GetStatistics(std::vector<HDF5DataFileStatistics> &v) const;
};
#endif //JUNGFRAUJOCH_HDF5WRITER_H
+9 -4
View File
@@ -34,10 +34,15 @@ grpc::Status JFJochWriterService::Stop(grpc::ServerContext *context, const JFJoc
std::shared_lock<std::shared_mutex> ul(m);
logger.Info("Stopping writer");
if (writer_future.valid()) {
ZMQImagePullerStatistics ret = writer_future.get();
response->set_nimages(ret.processed_images);
response->set_performance_mbs(ret.performance_MBs);
response->set_performance_hz(ret.performance_Hz);
auto ret = writer_future.get();
response->set_nimages(ret.image_puller_stats.processed_images);
response->set_performance_mbs(ret.image_puller_stats.performance_MBs);
response->set_performance_hz(ret.image_puller_stats.performance_Hz);
for (const auto &f: ret.data_file_stats) {
auto *tmp = response->add_file_statistics();
tmp->set_name(f.filename);
tmp->set_nimages(f.max_image_number + 1);
}
} else {
response->set_nimages(0);
response->set_performance_mbs(0);
+1 -1
View File
@@ -13,7 +13,7 @@ class JFJochWriterService final : public JFJochProtoBuf::gRPC_JFJochWriter::Serv
std::unique_ptr<StreamWriter> writer;
Logger &logger;
ZMQContext &zmq_context;
std::future<ZMQImagePullerStatistics> writer_future;
std::future<StreamWriterStatistics> writer_future;
public:
JFJochWriterService(ZMQContext &in_context, Logger &in_logger);
JFJochWriterService &BaseDirectory(const std::string& input);
+8 -17
View File
@@ -26,7 +26,7 @@ void StreamWriter::StartDataCollection() {
MakeDirectory(start_message.file_prefix);
}
void StreamWriter::CollectImages() {
void StreamWriter::CollectImages(std::vector<HDF5DataFileStatistics> &v) {
HDF5Writer writer(start_message);
image_puller.WaitForImage();
while (image_puller.GetFrameType() == JFJochFrameDeserializer::Type::IMAGE) {
@@ -34,6 +34,7 @@ void StreamWriter::CollectImages() {
writer.Write(image_array);
image_puller.WaitForImage();
}
writer.GetStatistics(v);
}
void StreamWriter::EndDataCollection() {
@@ -51,10 +52,11 @@ void StreamWriter::Abort() {
image_puller.Abort();
}
ZMQImagePullerStatistics StreamWriter::Run() {
StreamWriterStatistics StreamWriter::Run() {
StreamWriterStatistics ret;
StartDataCollection();
try {
CollectImages();
CollectImages(ret.data_file_stats);
} catch (JFJochException &e) {
// Error during collecting images will skip to end data collection
// End data collection will consume all images till the end
@@ -62,19 +64,8 @@ ZMQImagePullerStatistics StreamWriter::Run() {
}
EndDataCollection();
auto puller_stats = image_puller.GetStatistics();
ret.image_puller_stats = image_puller.GetStatistics();
logger.Info("Write task done. Images = {} Throughput = {:.0f} MB/s Frame rate = {:.0f} Hz",
puller_stats.processed_images, puller_stats.performance_MBs, puller_stats.performance_Hz);
return puller_stats;
ret.image_puller_stats.processed_images, ret.image_puller_stats.performance_MBs, ret.image_puller_stats.performance_Hz);
return ret;
}
void StreamWriter::BaseDirectory(const std::string &input) {
logger.Info("Setting base directory to " + input);
try {
std::filesystem::current_path(input);
} catch (const std::filesystem::filesystem_error& err) {
logger.Error("Cannot set base directory: " + std::string(err.what()));
throw JFJochException(JFJochExceptionCategory::FileWriteError,
"Cannot set base directory " + std::string(err.what()));
}
}
+8 -3
View File
@@ -5,6 +5,12 @@
#define JUNGFRAUJOCH_STREAMWRITER_H
#include "ZMQImagePuller.h"
#include "HDF5DataFile.h"
struct StreamWriterStatistics {
ZMQImagePullerStatistics image_puller_stats;
std::vector<HDF5DataFileStatistics> data_file_stats;
};
class StreamWriter {
ZMQImagePuller image_puller;
@@ -12,13 +18,12 @@ class StreamWriter {
StartMessage start_message;
void StartDataCollection();
void CollectImages();
void CollectImages(std::vector<HDF5DataFileStatistics> &v);
void EndDataCollection();
public:
StreamWriter(ZMQContext& context, Logger &logger, const std::string& zmq_addr);
ZMQImagePullerStatistics Run();
StreamWriterStatistics Run();
void Abort();
void BaseDirectory(const std::string& path);
};