From aefe95920c706faf1ba15ca8119b96d2236b7620 Mon Sep 17 00:00:00 2001 From: Filip Leonarski Date: Mon, 10 Apr 2023 20:35:33 +0200 Subject: [PATCH] DataMessage: use it in ZMQImagePuller, StreamWriter, HDF5DataFile, HDF5Writer --- frame_serialize/JFJochFrameDeserializer.cpp | 2 + frame_serialize/JFJochFrameSerializer.cpp | 4 +- frame_serialize/StartMessage.h | 2 + receiver/JFJochReceiver.cpp | 10 ++- tests/CBORTest.cpp | 4 +- tests/HDF5WritingTest.cpp | 36 ++++++++--- tests/ZMQImagePusherTest.cpp | 18 +++--- tools/HDF5DatasetWriteTest.cpp | 10 ++- writer/HDF5DataFile.cpp | 68 ++++++++++----------- writer/HDF5DataFile.h | 8 +-- writer/HDF5Writer.cpp | 10 +-- writer/HDF5Writer.h | 4 +- writer/StreamWriter.cpp | 5 +- writer/ZMQImagePuller.cpp | 18 +----- writer/ZMQImagePuller.h | 4 +- 15 files changed, 111 insertions(+), 92 deletions(-) diff --git a/frame_serialize/JFJochFrameDeserializer.cpp b/frame_serialize/JFJochFrameDeserializer.cpp index 4f9e1137..b7b1f711 100644 --- a/frame_serialize/JFJochFrameDeserializer.cpp +++ b/frame_serialize/JFJochFrameDeserializer.cpp @@ -434,6 +434,8 @@ void JFJochFrameDeserializer::ProcessUserDataElement(CborValue &value) { start_message.pixel_signed = GetCBORBool(map_value); else if (key == "min_value") start_message.min_value = GetCBORInt(map_value); + else if (key == "rad_int_bin_number") + start_message.rad_int_bin_number = GetCBORInt(map_value); else if (key == "storage_cell_number") start_message.storage_cell_number = GetCBORUInt(map_value); else if (key == "compression_algorithm") { diff --git a/frame_serialize/JFJochFrameSerializer.cpp b/frame_serialize/JFJochFrameSerializer.cpp index e66e324b..d05efab8 100644 --- a/frame_serialize/JFJochFrameSerializer.cpp +++ b/frame_serialize/JFJochFrameSerializer.cpp @@ -245,7 +245,7 @@ inline void CBOR_ENC_USER_DATA(CborEncoder &encoder, const StartMessage& message CborEncoder mapEncoder; cborErr(cbor_encode_text_stringz(&encoder, "user_data")); - cborErr(cbor_encoder_create_map(&encoder, &mapEncoder, 16)); + cborErr(cbor_encoder_create_map(&encoder, &mapEncoder, 17)); CBOR_ENC(mapEncoder, "file_prefix", message.file_prefix); CBOR_ENC(mapEncoder, "sample_name", message.sample_name); @@ -276,7 +276,7 @@ inline void CBOR_ENC_USER_DATA(CborEncoder &encoder, const StartMessage& message CBOR_ENC(mapEncoder, "source_name_short", message.source_name_short); CBOR_ENC(mapEncoder, "instrument_name", message.instrument_name); CBOR_ENC(mapEncoder, "instrument_name_short", message.instrument_name_short); - + CBOR_ENC(mapEncoder, "rad_int_bin_number", message.rad_int_bin_number); cborErr(cbor_encoder_close_container(&encoder, &mapEncoder)); } diff --git a/frame_serialize/StartMessage.h b/frame_serialize/StartMessage.h index ea25d4c9..f8129b08 100644 --- a/frame_serialize/StartMessage.h +++ b/frame_serialize/StartMessage.h @@ -74,6 +74,8 @@ struct StartMessage { std::string source_name_short; std::string instrument_name; std::string instrument_name_short; + + uint64_t rad_int_bin_number; }; #endif //JUNGFRAUJOCH_STARTMESSAGE_H diff --git a/receiver/JFJochReceiver.cpp b/receiver/JFJochReceiver.cpp index 0be888e0..965aba6c 100644 --- a/receiver/JFJochReceiver.cpp +++ b/receiver/JFJochReceiver.cpp @@ -137,9 +137,15 @@ JFJochReceiver::JFJochReceiver(const JFJochProtoBuf::ReceiverInput &settings, StartMessage message{}; experiment.FillMessage(message); message.arm_date = time_UTC(std::chrono::system_clock::now()); - if (calib) { + + if (calib) message.pixel_mask["sc0"] = calib->CalculateNexusMask(experiment, 0); - } + + if (rad_int_mapping) + message.rad_int_bin_number = rad_int_mapping->GetBinNumber(); + else + message.rad_int_bin_number = 0; + image_pusher.StartDataCollection(message); } diff --git a/tests/CBORTest.cpp b/tests/CBORTest.cpp index 2992c953..1fef64b5 100644 --- a/tests/CBORTest.cpp +++ b/tests/CBORTest.cpp @@ -53,7 +53,8 @@ TEST_CASE("CBORSerialize_Start", "[CBOR]") { .source_name = "Swiss Light Source", .source_name_short = "SLS", .instrument_name = "X06SA", - .instrument_name_short = "PXIII" + .instrument_name_short = "PXIII", + .rad_int_bin_number = 35 }; message.pixel_mask["sc0"] = std::vector(456*457, 15); @@ -106,6 +107,7 @@ TEST_CASE("CBORSerialize_Start", "[CBOR]") { CHECK(output_message.source_name_short == message.source_name_short); CHECK(output_message.instrument_name == message.instrument_name); CHECK(output_message.instrument_name_short == message.instrument_name_short); + CHECK(output_message.rad_int_bin_number == message.rad_int_bin_number); for (int i = 0; i < 3; i++) CHECK(output_message.detector_translation[i] == message.detector_translation[i]); diff --git a/tests/HDF5WritingTest.cpp b/tests/HDF5WritingTest.cpp index 47547314..d6df70d2 100644 --- a/tests/HDF5WritingTest.cpp +++ b/tests/HDF5WritingTest.cpp @@ -150,8 +150,13 @@ TEST_CASE("HDF5Writer", "[HDF5][Full]") { std::vector image(x.GetPixelsNum()); for (int i = 0; i < x.GetImageNum(); i++) { - REQUIRE_NOTHROW(file_set.Write((const char *) image.data(), x.GetPixelsNum() * x.GetPixelDepth(), - spots, i)); + DataMessage message{}; + message.image.data = (uint8_t *) image.data(); + message.image.size = x.GetPixelsNum() * x.GetPixelDepth(); + message.spots = spots; + message.number = i; + + REQUIRE_NOTHROW(file_set.Write(message)); } } // No leftover HDF5 objects @@ -176,8 +181,13 @@ TEST_CASE("HDF5Writer_Spots", "[HDF5][Full]") { std::vector image(x.GetPixelsNum()); for (int i = 0; i < x.GetImageNum(); i++) { - REQUIRE_NOTHROW(file_set.Write((const char *) image.data(), x.GetPixelsNum() * x.GetPixelDepth(), - spots, i)); + DataMessage message{}; + message.image.data = (uint8_t *) image.data(); + message.image.size = x.GetPixelsNum() * x.GetPixelDepth(); + message.spots = spots; + message.number = i; + + REQUIRE_NOTHROW(file_set.Write(message)); } } // No leftover HDF5 objects @@ -204,8 +214,13 @@ TEST_CASE("HDF5Writer_VDS", "[HDF5][Full]") { for (int i = 0; i < x.GetImageNum(); i++) { for (auto &j: image) j = i; - REQUIRE_NOTHROW(writer.Write((const char *) image.data(), x.GetPixelsNum() * x.GetPixelDepth(), - spots, i)); + DataMessage message{}; + message.image.data = (uint8_t *) image.data(); + message.image.size = x.GetPixelsNum() * x.GetPixelDepth(); + message.spots = spots; + message.number = i; + + REQUIRE_NOTHROW(writer.Write(message)); } REQUIRE_NOTHROW(HDF5Metadata::NXmx(start_message, end_message)); @@ -256,8 +271,13 @@ TEST_CASE("HDF5Writer_VDS_missing", "[HDF5][Full]") { for (int i = 0; i < x.GetImageNum() - 1; i++) { for (auto &j: image) j = i; - REQUIRE_NOTHROW(writer.Write((const char *) image.data(), x.GetPixelsNum() * x.GetPixelDepth(), - spots, i)); + DataMessage message{}; + message.image.data = (uint8_t *) image.data(); + message.image.size = x.GetPixelsNum() * x.GetPixelDepth(); + message.spots = spots; + message.number = i; + + REQUIRE_NOTHROW(writer.Write(message)); } REQUIRE_NOTHROW(HDF5Metadata::NXmx(start_message, end_message)); diff --git a/tests/ZMQImagePusherTest.cpp b/tests/ZMQImagePusherTest.cpp index 9c95ca4f..94071b25 100644 --- a/tests/ZMQImagePusherTest.cpp +++ b/tests/ZMQImagePusherTest.cpp @@ -23,23 +23,23 @@ void test_puller(ZMQImagePuller *puller, puller->WaitForImage(); while (puller->GetFrameType() != JFJochFrameDeserializer::Type::END) { if (puller->GetFrameType() == JFJochFrameDeserializer::Type::IMAGE) { - - if ((nwriter > 1) && (puller->GetImageNumber() % nwriter != writer_id)) + auto image = puller->GetDataMessage(); + if ((nwriter > 1) && (image.number % nwriter != writer_id)) diff_split[writer_id]++; - auto image_array = puller->GetImage(); - if (image_array.size != x.GetPixelsNum() * sizeof(uint16_t)) + + if (image.image.size != x.GetPixelsNum() * sizeof(uint16_t)) diff_size[writer_id]++; - else if (memcmp(image_array.data, image1.data() + puller->GetImageNumber() * x.GetPixelsNum(), + else if (memcmp(image.image.data, image1.data() + image.number * x.GetPixelsNum(), x.GetPixelsNum() * sizeof(uint16_t)) != 0) diff_content[writer_id]++; - if (image_array.xpixel != RAW_MODULE_COLS) + if (image.image.xpixel != RAW_MODULE_COLS) diff_content[writer_id]++; - if (image_array.ypixel != RAW_MODULE_LINES) + if (image.image.ypixel != RAW_MODULE_LINES) diff_content[writer_id]++; - if (image_array.pixel_depth_bytes != 2) + if (image.image.pixel_depth_bytes != 2) diff_content[writer_id]++; - if (image_array.algorithm != CompressionAlgorithm::NO_COMPRESSION) + if (image.image.algorithm != CompressionAlgorithm::NO_COMPRESSION) diff_content[writer_id]++; nimages[writer_id]++; } diff --git a/tools/HDF5DatasetWriteTest.cpp b/tools/HDF5DatasetWriteTest.cpp index b13f87dd..3bcdbc70 100644 --- a/tools/HDF5DatasetWriteTest.cpp +++ b/tools/HDF5DatasetWriteTest.cpp @@ -114,7 +114,15 @@ int main(int argc, char **argv) { int64_t total_image_size = 0; for (int i = 0; i < nimages_out; i++) { std::this_thread::sleep_until(start_time + i * period_us); - fileset->Write(output[i % nimages].data(), output_size[i % nimages], spots, i); + + DataMessage message{}; + message.image.data = (uint8_t *) output[i % nimages].data(); + message.image.size = output_size[i % nimages]; + message.spots = spots; + message.number = i; + + fileset->Write(message); + total_image_size += output_size[i % nimages]; } diff --git a/writer/HDF5DataFile.cpp b/writer/HDF5DataFile.cpp index dc46a3ba..e0f1199c 100644 --- a/writer/HDF5DataFile.cpp +++ b/writer/HDF5DataFile.cpp @@ -1,8 +1,6 @@ // Copyright (2019-2022) Paul Scherrer Institute // SPDX-License-Identifier: GPL-3.0-or-later -#include - #include "HDF5DataFile.h" #include "../compression/JFJochCompressor.h" @@ -36,6 +34,7 @@ HDF5DataFile::~HDF5DataFile() { group.SaveVector("peakXPosRaw", spot_x, dims, CompressionAlgorithm::BSHUF_LZ4); group.SaveVector("peakYPosRaw", spot_y, dims, CompressionAlgorithm::BSHUF_LZ4); group.SaveVector("peakTotalIntensity", spot_intensity, dims, CompressionAlgorithm::BSHUF_LZ4); + group.SaveVector("indexingResult", indexing_result); } data_file.reset(); @@ -48,46 +47,43 @@ void HDF5DataFile::CreateFile() { HDF5DataSpace data_space({1, ypixel, xpixel},{H5S_UNLIMITED, ypixel, xpixel}); data_set = std::make_unique(*data_file, "/entry/data/data", data_type, data_space, dcpl); + + spot_count.resize(1); + spot_x.resize(max_spots); + spot_y.resize(max_spots); + spot_intensity.resize(max_spots); + indexing_result.resize(1); } -void HDF5DataFile::WriteImage(const void *data, size_t data_size, uint64_t image_number) { - if (image_number > max_image_number) { - max_image_number = image_number; - data_set->SetExtent({max_image_number+1, ypixel, xpixel}); - } - nimages++; - data_set->WriteDirectChunk(data, data_size, {image_number, 0, 0}); -} - -void HDF5DataFile::WriteSpots(const std::vector &spots, uint64_t image_number) { - std::unique_lock ul(spot_mutex); - - size_t cnt = std::min(spots.size(), max_spots); - if (spot_count.size() < image_number + 1) { - spot_count.resize(image_number + 1); - spot_x.resize(max_spots * (image_number + 1)); - spot_y.resize(max_spots * (image_number + 1)); - spot_intensity.resize(max_spots * (image_number + 1)); - } - - spot_count[image_number] = cnt; - - for (int i = 0; i < cnt; i++) { - spot_x[max_spots * image_number + i] = spots[i].x; - spot_y[max_spots * image_number + i] = spots[i].y; - spot_intensity[max_spots * image_number + i] = spots[i].intensity; - } -} - -void HDF5DataFile::Write(const void *data, size_t data_size, const std::vector &spots, - uint64_t image_number) { +void HDF5DataFile::Write(const DataMessage &msg, uint64_t image_number) { std::lock_guard lock(hdf5_mutex); if (!data_file) CreateFile(); - WriteImage(data, data_size, image_number); - if (!spots.empty()) - WriteSpots(spots, image_number); + if (image_number > max_image_number) { + max_image_number = image_number; + data_set->SetExtent({max_image_number+1, ypixel, xpixel}); + spot_count.resize(max_image_number + 1); + spot_x.resize(max_spots * (max_image_number + 1)); + spot_y.resize(max_spots * (max_image_number + 1)); + spot_intensity.resize(max_spots * (max_image_number + 1)); + indexing_result.resize(max_image_number + 1); + } + + nimages++; + data_set->WriteDirectChunk(msg.image.data, msg.image.size, {image_number, 0, 0}); + + size_t cnt = std::min(msg.spots.size(), max_spots); + + spot_count[image_number] = cnt; + + for (int i = 0; i < cnt; i++) { + spot_x[max_spots * image_number + i] = msg.spots[i].x; + spot_y[max_spots * image_number + i] = msg.spots[i].y; + spot_intensity[max_spots * image_number + i] = msg.spots[i].intensity; + } + + indexing_result[image_number] = msg.indexing_result; } size_t HDF5DataFile::GetMaxImageNumber() const { diff --git a/writer/HDF5DataFile.h b/writer/HDF5DataFile.h index a2785bac..09431d8f 100644 --- a/writer/HDF5DataFile.h +++ b/writer/HDF5DataFile.h @@ -10,6 +10,7 @@ #include "HDF5Objects.h" #include "../common/SpotToSave.h" +#include "../frame_serialize/ImageMessage.h" class HDF5DataFile { std::string filename; @@ -27,22 +28,21 @@ class HDF5DataFile { HDF5DataType data_type; std::string image_units; - std::mutex spot_mutex; std::vector spot_x; std::vector spot_y; std::vector spot_intensity; std::vector spot_count; const size_t max_spots; + std::vector indexing_result; + void CreateFile(); - void WriteImage(const void *data, size_t data_size, uint64_t image_number ); - void WriteSpots(const std::vector & spots, uint64_t image_number); public: HDF5DataFile(const std::string& name, int64_t width, int64_t height, int64_t pixel_depth_byte, bool is_signed, CompressionAlgorithm compression = CompressionAlgorithm::BSHUF_LZ4, size_t max_spots = 0); ~HDF5DataFile(); - void Write(const void *data, size_t data_size, const std::vector & spots, uint64_t image_number); + void Write(const DataMessage& msg, uint64_t image_number); size_t GetMaxImageNumber() const; size_t GetNumImages() const; std::string GetFilename() const; diff --git a/writer/HDF5Writer.cpp b/writer/HDF5Writer.cpp index 6129af59..a0d8a9a1 100644 --- a/writer/HDF5Writer.cpp +++ b/writer/HDF5Writer.cpp @@ -16,15 +16,15 @@ HDF5Writer::HDF5Writer(const StartMessage &request) request.max_spot_count); } -void HDF5Writer::Write(const void *data, size_t data_size, const std::vector& spots, int64_t image_number) { - if (image_number < 0) +void HDF5Writer::Write(const DataMessage& message) { + if (message.number < 0) throw JFJochException(JFJochExceptionCategory::ArrayOutOfBounds, "No support for negative images"); if (files.empty()) throw JFJochException(JFJochExceptionCategory::ArrayOutOfBounds, "No file to write"); - size_t file_number = image_number % files.size(); + size_t file_number = message.number % files.size(); // Ignore zero size images - if (data_size > 0) - files[file_number]->Write(data, data_size, spots, image_number / files.size()); + if (message.image.size > 0) + files[file_number]->Write(message, message.number / files.size()); } diff --git a/writer/HDF5Writer.h b/writer/HDF5Writer.h index f070c19c..10631081 100644 --- a/writer/HDF5Writer.h +++ b/writer/HDF5Writer.h @@ -8,13 +8,13 @@ #include "HDF5DataFile.h" #include "../frame_serialize/StartMessage.h" +#include "../frame_serialize/ImageMessage.h" class HDF5Writer { std::vector > files; public: explicit HDF5Writer(const StartMessage &request); - void Write(const void *data, size_t data_size, const std::vector& spots, - int64_t image_number); + void Write(const DataMessage& msg); }; #endif //JUNGFRAUJOCH_HDF5WRITER_H diff --git a/writer/StreamWriter.cpp b/writer/StreamWriter.cpp index 3f80190e..6c430a83 100644 --- a/writer/StreamWriter.cpp +++ b/writer/StreamWriter.cpp @@ -30,9 +30,8 @@ void StreamWriter::CollectImages() { HDF5Writer writer(start_message); image_puller.WaitForImage(); while (image_puller.GetFrameType() == JFJochFrameDeserializer::Type::IMAGE) { - auto image_array = image_puller.GetImage(); - writer.Write(image_array.data, image_array.size, - image_puller.GetSpots(), image_puller.GetImageNumber()); + auto image_array = image_puller.GetDataMessage(); + writer.Write(image_array); image_puller.WaitForImage(); } } diff --git a/writer/ZMQImagePuller.cpp b/writer/ZMQImagePuller.cpp index 8c27cf70..b783ce94 100644 --- a/writer/ZMQImagePuller.cpp +++ b/writer/ZMQImagePuller.cpp @@ -52,23 +52,9 @@ void ZMQImagePuller::WaitForImage() { } } -CBORImage ZMQImagePuller::GetImage() const { +const DataMessage &ZMQImagePuller::GetDataMessage() const { if (deserialized_image_message) - return deserialized_image_message->image; - else - throw JFJochException(JFJochExceptionCategory::InputParameterInvalid, "Image message not received so far"); -} - -int64_t ZMQImagePuller::GetImageNumber() const { - if (deserialized_image_message) - return deserialized_image_message->number; - else - throw JFJochException(JFJochExceptionCategory::InputParameterInvalid, "Image message not received so far"); -} - -const std::vector& ZMQImagePuller::GetSpots() const { - if (deserialized_image_message) - return deserialized_image_message->spots; + return *deserialized_image_message; else throw JFJochException(JFJochExceptionCategory::InputParameterInvalid, "Image message not received so far"); } diff --git a/writer/ZMQImagePuller.h b/writer/ZMQImagePuller.h index 4503d989..6ad5851a 100644 --- a/writer/ZMQImagePuller.h +++ b/writer/ZMQImagePuller.h @@ -43,9 +43,7 @@ public: void Abort(); void WaitForImage(); - [[nodiscard]] CBORImage GetImage() const; - [[nodiscard]] int64_t GetImageNumber() const; - [[nodiscard]] const std::vector& GetSpots() const; + const DataMessage &GetDataMessage() const; [[nodiscard]] JFJochFrameDeserializer::Type GetFrameType() const; [[nodiscard]] ZMQImagePullerStatistics GetStatistics(); [[nodiscard]] StartMessage GetStartMessage() const;