From f1935526a7eae19412a9d6ab27f66dcaf5070e7e Mon Sep 17 00:00:00 2001 From: leonarski_f Date: Mon, 11 Dec 2023 06:49:24 +0100 Subject: [PATCH] Generalized serializer --- README.md | 1 + broker/JFJochBrokerParser.h | 2 +- broker/jfjoch_broker.cpp | 6 +- common/CMakeLists.txt | 19 +-- common/DiffractionExperiment.h | 2 +- common/FrameTransformation.cpp | 27 ++- common/FrameTransformation.h | 6 +- common/ImagePusher.h | 29 ---- common/ZMQImagePusher.h | 30 ---- common/ZMQPreviewPublisher.cpp | 30 ---- common/ZMQWrappers.h | 4 + compression/JFJochCompressor.cpp | 27 ++- compression/JFJochCompressor.h | 9 +- compression/JFJochDecompress.h | 10 +- ...alizer.cpp => CBORStream2Deserializer.cpp} | 58 +++---- ...serializer.h => CBORStream2Deserializer.h} | 10 +- ...rializer.cpp => CBORStream2Serializer.cpp} | 44 +---- ...meSerializer.h => CBORStream2Serializer.h} | 15 +- frame_serialize/CMakeLists.txt | 20 ++- {common => frame_serialize}/ImagePusher.cpp | 0 frame_serialize/ImagePusher.h | 27 +++ .../{CBORMessages.h => JFJochMessages.h} | 18 +- .../TestImagePusher.cpp | 35 +--- {common => frame_serialize}/TestImagePusher.h | 10 +- frame_serialize/ZMQBsreadImagePusher.cpp | 81 +++++++++ frame_serialize/ZMQBsreadImagePusher.h | 24 +++ frame_serialize/ZMQPreviewPublisher.cpp | 51 ++++++ .../ZMQPreviewPublisher.h | 12 +- .../ZMQStream2Pusher.cpp | 51 +++--- frame_serialize/ZMQStream2Pusher.h | 28 +++ receiver/CMakeLists.txt | 2 +- receiver/JFJochReceiver.cpp | 65 ++----- receiver/JFJochReceiver.h | 14 +- receiver/JFJochReceiverService.cpp | 3 +- receiver/JFJochReceiverTest.cpp | 4 +- tests/CBORTest.cpp | 159 ++++++------------ tests/FrameTransformationTest.cpp | 81 +++++---- tests/JFJochReceiverIntegrationTest.cpp | 2 +- tests/StreamWriterTest.cpp | 4 +- tests/ZMQImagePusherTest.cpp | 40 ++--- tests/ZMQPreviewPublisherTest.cpp | 4 +- tools/CompressionBenchmark.cpp | 9 +- tools/HDF5DatasetWriteTest.cpp | 9 +- tools/jfjoch_writer_test.cpp | 20 +-- writer/CMakeLists.txt | 2 +- writer/HDF5DataFile.h | 2 +- writer/HDF5NXmx.cpp | 2 +- writer/HDF5NXmx.h | 4 +- writer/HDF5Writer.h | 2 +- writer/StreamWriter.cpp | 6 +- writer/ZMQImagePuller.cpp | 10 +- writer/ZMQImagePuller.h | 6 +- 52 files changed, 579 insertions(+), 557 deletions(-) delete mode 100644 common/ImagePusher.h delete mode 100644 common/ZMQImagePusher.h delete mode 100644 common/ZMQPreviewPublisher.cpp rename frame_serialize/{JFJochFrameDeserializer.cpp => CBORStream2Deserializer.cpp} (94%) rename frame_serialize/{JFJochFrameDeserializer.h => CBORStream2Deserializer.h} (89%) rename frame_serialize/{JFJochFrameSerializer.cpp => CBORStream2Serializer.cpp} (94%) rename frame_serialize/{JFJochFrameSerializer.h => CBORStream2Serializer.h} (53%) rename {common => frame_serialize}/ImagePusher.cpp (100%) create mode 100644 frame_serialize/ImagePusher.h rename frame_serialize/{CBORMessages.h => JFJochMessages.h} (90%) rename {common => frame_serialize}/TestImagePusher.cpp (80%) rename {common => frame_serialize}/TestImagePusher.h (72%) create mode 100644 frame_serialize/ZMQBsreadImagePusher.cpp create mode 100644 frame_serialize/ZMQBsreadImagePusher.h create mode 100644 frame_serialize/ZMQPreviewPublisher.cpp rename {common => frame_serialize}/ZMQPreviewPublisher.h (59%) rename common/ZMQImagePusher.cpp => frame_serialize/ZMQStream2Pusher.cpp (57%) create mode 100644 frame_serialize/ZMQStream2Pusher.h diff --git a/README.md b/README.md index b066a539..a63f5837 100644 --- a/README.md +++ b/README.md @@ -32,6 +32,7 @@ Required: * HDF5 library version 1.10 or newer * ZeroMQ library * Google Remote Procedure Call (gRPC) - see notes below +* OpenSSL library Optional: * CUDA compiler version 11 or newer - image analysis features won't work without it diff --git a/broker/JFJochBrokerParser.h b/broker/JFJochBrokerParser.h index c8b625f4..2122dee6 100644 --- a/broker/JFJochBrokerParser.h +++ b/broker/JFJochBrokerParser.h @@ -7,7 +7,7 @@ #include "../common/DiffractionExperiment.h" #include "JFJochBroker.h" #include "../acquisition_device/AcquisitionDeviceGroup.h" -#include "../common/ZMQPreviewPublisher.h" +#include "../frame_serialize/ZMQPreviewPublisher.h" DetectorGeometry ParseStandardDetectorGeometry(const nlohmann::json &j); DetectorGeometry ParseCustomDetectorGeometry(const nlohmann::json &j); diff --git a/broker/jfjoch_broker.cpp b/broker/jfjoch_broker.cpp index 81a1668b..9659b070 100644 --- a/broker/jfjoch_broker.cpp +++ b/broker/jfjoch_broker.cpp @@ -10,7 +10,7 @@ #include "../grpc/gRPCServer_Template.h" #include "JFJochBrokerParser.h" -#include "../common/ZMQImagePusher.h" +#include "../frame_serialize/ZMQStream2Pusher.h" int main (int argc, char **argv) { if (argc > 3) { @@ -35,7 +35,7 @@ int main (int argc, char **argv) { } std::unique_ptr receiver; - std::unique_ptr image_pusher; + std::unique_ptr image_pusher; std::unique_ptr preview_publisher; ZMQContext context; @@ -47,7 +47,7 @@ int main (int argc, char **argv) { ParseAcquisitionDeviceGroup(input, "devices", aq_devices); if (aq_devices.size() > 0) { experiment.DataStreams(aq_devices.size()); - image_pusher = std::make_unique(context, ParseStringArray(input, "zmq_image_addr")); + image_pusher = std::make_unique(context, ParseStringArray(input, "zmq_image_addr")); receiver = std::make_unique(aq_devices, logger, *image_pusher); std::string zmq_preview_addr = ParseString(input, "zmq_preview_addr"); diff --git a/common/CMakeLists.txt b/common/CMakeLists.txt index 702f4991..9ba72c34 100644 --- a/common/CMakeLists.txt +++ b/common/CMakeLists.txt @@ -30,31 +30,24 @@ ADD_LIBRARY( CommonFunctions STATIC Definitions.h ${CMAKE_CURRENT_BINARY_DIR}/GitInfo.cpp GitInfo.h FrameTransformation.cpp FrameTransformation.h - ZMQWrappers.cpp ZMQWrappers.h ThreadSafeFIFO.h - ZMQPreviewPublisher.cpp ZMQPreviewPublisher.h - ZMQImagePusher.cpp ZMQImagePusher.h DiffractionSpot.cpp DiffractionSpot.h StatusVector.h - ImagePusher.cpp ImagePusher.h - TestImagePusher.cpp TestImagePusher.h SpotToSave.h NetworkAddressConvert.h NetworkAddressConvert.cpp to_fixed.h DetectorGeometry.cpp DetectorGeometry.h DetectorModuleGeometry.cpp DetectorModuleGeometry.h DetectorSetup.h DetectorSetup.cpp ZeroCopyReturnValue.h Histogram.h DiffractionGeometry.h - CUDAWrapper.cpp - CUDAWrapper.h - NUMAHWPolicy.cpp - NUMAHWPolicy.h - ADUHistogram.cpp - ADUHistogram.h + CUDAWrapper.cpp CUDAWrapper.h + NUMAHWPolicy.cpp NUMAHWPolicy.h + ADUHistogram.cpp ADUHistogram.h RawToConvertedGeometryCore.h Plot.h - DeviceOutput.h) + DeviceOutput.h + ZMQWrappers.cpp ZMQWrappers.h) -TARGET_LINK_LIBRARIES(CommonFunctions Compression FrameSerialize libzmq JFCalibration -lrt) +TARGET_LINK_LIBRARIES(CommonFunctions Compression JFCalibration libzmq -lrt) IF (CMAKE_CUDA_COMPILER) TARGET_SOURCES(CommonFunctions PRIVATE CUDAWrapper.cu ) diff --git a/common/DiffractionExperiment.h b/common/DiffractionExperiment.h index 0b874fdb..df30f7e7 100644 --- a/common/DiffractionExperiment.h +++ b/common/DiffractionExperiment.h @@ -12,7 +12,7 @@ #include "UnitCell.h" #include "Coord.h" #include "Definitions.h" -#include "../frame_serialize/CBORMessages.h" +#include "../frame_serialize/JFJochMessages.h" #include "DetectorSetup.h" #include "../image_analysis/DataProcessingSettings.h" diff --git a/common/FrameTransformation.cpp b/common/FrameTransformation.cpp index 1cb4c728..b17723d9 100644 --- a/common/FrameTransformation.cpp +++ b/common/FrameTransformation.cpp @@ -34,9 +34,28 @@ FrameTransformation::FrameTransformation(const DiffractionExperiment &in_experim } } -size_t FrameTransformation::SaveCompressedImage(void *output) { - return compressor.Compress((char *) output, precompression_buffer.data(), - experiment.GetPixelsNum(), pixel_depth); +CompressedImage FrameTransformation::GetCompressedImage() { + CompressedImage image{}; + if (experiment.GetCompressionAlgorithm() == CompressionAlgorithm::NO_COMPRESSION) { + image.data = (uint8_t *) precompression_buffer.data(); + image.size = experiment.GetPixelsNum() * experiment.GetPixelDepth(); + } else { + compressed_buffer.resize(MaxCompressedSize(experiment.GetCompressionAlgorithm(), + experiment.GetPixelsNum(), + experiment.GetPixelDepth())); + image.data = (uint8_t *) compressed_buffer.data(); + image.size = compressor.Compress(compressed_buffer.data(), precompression_buffer.data(), + experiment.GetPixelsNum(), experiment.GetPixelDepth()); + + } + image.xpixel = experiment.GetXPixelsNum(); + image.ypixel = experiment.GetYPixelsNum(); + image.algorithm = experiment.GetCompressionAlgorithm(); + image.pixel_depth_bytes = experiment.GetPixelDepth(); + image.pixel_is_signed = pixel_signed; + image.pixel_is_float = false; + image.channel = "default"; + return image; } void FrameTransformation::ProcessModule(const void *input, uint16_t module_number, int data_stream) { @@ -80,6 +99,6 @@ void FrameTransformation::ProcessModule(const void *input, uint16_t module_numbe } } -const void *FrameTransformation::GetPreviewImage() const { +const void *FrameTransformation::GetImage() const { return precompression_buffer.data(); } diff --git a/common/FrameTransformation.h b/common/FrameTransformation.h index cc4d8ecf..4f0b2d9e 100644 --- a/common/FrameTransformation.h +++ b/common/FrameTransformation.h @@ -5,19 +5,21 @@ #include "DiffractionExperiment.h" #include "../compression/JFJochCompressor.h" +#include "../frame_serialize/JFJochMessages.h" class FrameTransformation { const DiffractionExperiment& experiment; JFJochBitShuffleCompressor compressor; std::vector precompression_buffer; + std::vector compressed_buffer; const bool pixel_signed; const size_t pixel_depth; public: explicit FrameTransformation(const DiffractionExperiment &experiment); void ProcessModule(const void *input, uint16_t module_number, int data_stream); - size_t SaveCompressedImage(void *output); - const void *GetPreviewImage() const; + CompressedImage GetCompressedImage(); + const void *GetImage() const; }; #endif //JUNGFRAUJOCH_FRAMETRANSFORMATION_H diff --git a/common/ImagePusher.h b/common/ImagePusher.h deleted file mode 100644 index d5c27497..00000000 --- a/common/ImagePusher.h +++ /dev/null @@ -1,29 +0,0 @@ -// Copyright (2019-2023) Paul Scherrer Institute - -#ifndef JUNGFRAUJOCH_IMAGEPUSHER_H -#define JUNGFRAUJOCH_IMAGEPUSHER_H - -#include -#include - -#include "DiffractionExperiment.h" -#include "DiffractionSpot.h" -#include "../frame_serialize/JFJochFrameSerializer.h" -#include "../frame_serialize/CBORMessages.h" -#include "ZeroCopyReturnValue.h" - -void PrepareCBORImage(DataMessage& message, - const DiffractionExperiment &experiment, - void *image, size_t image_size); - -class ImagePusher { -public: - virtual void StartDataCollection(const uint8_t *image_data, size_t image_size, int64_t data_file_count) = 0; - virtual void EndDataCollection(const EndMessage& message) = 0; - virtual void SendImage(const uint8_t *image_data, size_t image_size, int64_t image_number) = 0; - virtual void SendImage(const uint8_t *image_data, size_t image_size, int64_t image_number, - ZeroCopyReturnValue *z) = 0; -}; - - -#endif //JUNGFRAUJOCH_IMAGEPUSHER_H diff --git a/common/ZMQImagePusher.h b/common/ZMQImagePusher.h deleted file mode 100644 index 4db04990..00000000 --- a/common/ZMQImagePusher.h +++ /dev/null @@ -1,30 +0,0 @@ -// Copyright (2019-2023) Paul Scherrer Institute - -#ifndef JUNGFRAUJOCH_ZMQIMAGEPUSHER_H -#define JUNGFRAUJOCH_ZMQIMAGEPUSHER_H - -#include "ImagePusher.h" -#include "ThreadSafeFIFO.h" -#include "ZMQWrappers.h" -#include "DiffractionSpot.h" -#include "../frame_serialize/JFJochFrameSerializer.h" - -class ZMQImagePusher : public ImagePusher { - std::vector> contexts; - std::vector> sockets; - int64_t file_count = 1; -public: - ZMQImagePusher(ZMQContext &context, const std::vector& addr, - int32_t send_buffer_high_watermark = -1, int32_t send_buffer_size = -1); - // High performance implementation, where each socket has dedicated ZMQ context - explicit ZMQImagePusher(const std::vector& addr, - int32_t send_buffer_high_watermark = -1, int32_t send_buffer_size = -1); - - void StartDataCollection(const uint8_t *image_data, size_t image_size, int64_t data_file_count) override; - void SendImage(const uint8_t *image_data, size_t image_size, int64_t image_number) override; - void SendImage(const uint8_t *image_data, size_t image_size, int64_t image_number, - ZeroCopyReturnValue *z) override; - void EndDataCollection(const EndMessage& message) override; -}; - -#endif //JUNGFRAUJOCH_ZMQIMAGEPUSHER_H diff --git a/common/ZMQPreviewPublisher.cpp b/common/ZMQPreviewPublisher.cpp deleted file mode 100644 index 768dc03d..00000000 --- a/common/ZMQPreviewPublisher.cpp +++ /dev/null @@ -1,30 +0,0 @@ -// Copyright (2019-2023) Paul Scherrer Institute - -#include "ZMQPreviewPublisher.h" - -ZMQPreviewPublisher::ZMQPreviewPublisher(ZMQContext& context, const std::string& addr) : - socket(context, ZMQSocketType::Pub) { - socket.SendWaterMark(2).NoLinger(); - socket.Bind(addr); -} - -void ZMQPreviewPublisher::StartDataCollection(const uint8_t *image_data, size_t image_size, int64_t preview_stride) { - { - std::unique_lock ul(m); - stride = preview_stride; - current_part = -1; - } - socket.Send(image_data, image_size); -} - -void ZMQPreviewPublisher::SendImage(const uint8_t *image_data, size_t image_size, int64_t image_number) { - { - std::unique_lock ul(m); - int64_t part = image_number / stride; - if (current_part >= part) - return; - else - current_part = part; - } - socket.Send(image_data, image_size); -} diff --git a/common/ZMQWrappers.h b/common/ZMQWrappers.h index 41597d29..bd89ce26 100644 --- a/common/ZMQWrappers.h +++ b/common/ZMQWrappers.h @@ -1,5 +1,9 @@ // Copyright (2019-2023) Paul Scherrer Institute +// Copyright (2019-2023) Paul Scherrer Institute + +// Copyright (2019-2023) Paul Scherrer Institute + #ifndef JUNGFRAUJOCH_ZMQWRAPPERS_H #define JUNGFRAUJOCH_ZMQWRAPPERS_H diff --git a/compression/JFJochCompressor.cpp b/compression/JFJochCompressor.cpp index b9efa8c0..1a99776b 100644 --- a/compression/JFJochCompressor.cpp +++ b/compression/JFJochCompressor.cpp @@ -56,7 +56,17 @@ size_t JFJochBitShuffleCompressor::CompressBlock(char *dest, const char *source, return compressed_size + 4; } -size_t JFJochBitShuffleCompressor::Compress(char *dest, const char *source, size_t nelements, size_t elem_size) { +std::vector JFJochBitShuffleCompressor::Compress(const void *source, size_t nelements, size_t elem_size) { + std::vector tmp(MaxCompressedSize(algorithm, nelements, elem_size)); + size_t tmp_size = Compress(tmp.data(), source, nelements, elem_size); + tmp.resize(tmp_size); + return tmp; +} + +size_t JFJochBitShuffleCompressor::Compress(void *dest, const void *source, size_t nelements, size_t elem_size) { + auto c_dest = (char *) dest; + auto c_source = (char *) source; + static_assert(DefaultBlockSize % BSHUF_BLOCKED_MULT == 0, "Block size must be multiple of 8"); if (algorithm == CompressionAlgorithm::NO_COMPRESSION) { @@ -65,8 +75,8 @@ size_t JFJochBitShuffleCompressor::Compress(char *dest, const char *source, size return nelements * elem_size; } - bshuf_write_uint64_BE(dest, nelements * elem_size); - bshuf_write_uint32_BE(dest + 8, DefaultBlockSize * elem_size); + bshuf_write_uint64_BE(c_dest, nelements * elem_size); + bshuf_write_uint32_BE(c_dest + 8, DefaultBlockSize * elem_size); if (tmp_space.size() < DefaultBlockSize * elem_size) tmp_space.resize(DefaultBlockSize * elem_size); @@ -75,18 +85,19 @@ size_t JFJochBitShuffleCompressor::Compress(char *dest, const char *source, size size_t reminder_size = nelements - num_full_blocks * DefaultBlockSize; size_t compressed_size = 12; + for (int i = 0; i < num_full_blocks; i++) - compressed_size += CompressBlock(dest + compressed_size, - source + i * DefaultBlockSize * elem_size, DefaultBlockSize, elem_size); + compressed_size += CompressBlock(c_dest + compressed_size, + c_source + i * DefaultBlockSize * elem_size, DefaultBlockSize, elem_size); size_t last_block_size = reminder_size - reminder_size % BSHUF_BLOCKED_MULT; if (last_block_size > 0) - compressed_size += CompressBlock(dest + compressed_size, - source + num_full_blocks * DefaultBlockSize * elem_size, last_block_size, elem_size); + compressed_size += CompressBlock(c_dest + compressed_size, + c_source + num_full_blocks * DefaultBlockSize * elem_size, last_block_size, elem_size); size_t leftover_bytes = (reminder_size % BSHUF_BLOCKED_MULT) * elem_size; if (leftover_bytes > 0) { - memcpy(dest + compressed_size, source + (num_full_blocks * DefaultBlockSize + last_block_size) * elem_size, leftover_bytes); + memcpy(c_dest + compressed_size, c_source + (num_full_blocks * DefaultBlockSize + last_block_size) * elem_size, leftover_bytes); compressed_size += leftover_bytes; } return compressed_size; diff --git a/compression/JFJochCompressor.h b/compression/JFJochCompressor.h index 7da600a7..664a206f 100644 --- a/compression/JFJochCompressor.h +++ b/compression/JFJochCompressor.h @@ -31,13 +31,10 @@ public: template std::vector Compress(const std::vector &src) { - std::vector tmp(MaxCompressedSize(algorithm, src.size(), sizeof(T))); - size_t tmp_size = Compress(tmp.data(), src); - tmp.resize(tmp_size); - return tmp; + return Compress(src.data(), src.size(), sizeof(T)); } - - size_t Compress(char *dest, const char* source, size_t nelements, size_t elem_size); + std::vector Compress(const void* source, size_t nelements, size_t elem_size); + size_t Compress(void *dest, const void* source, size_t nelements, size_t elem_size); private: char scratch[DefaultBlockSize * sizeof(uint64_t)]; }; diff --git a/compression/JFJochDecompress.h b/compression/JFJochDecompress.h index f6424992..b7f56938 100644 --- a/compression/JFJochDecompress.h +++ b/compression/JFJochDecompress.h @@ -20,10 +20,15 @@ extern "C" { template void JFJochDecompress(std::vector &output, CompressionAlgorithm algorithm, std::vector source_v, size_t nelements) { + JFJochDecompress(output, algorithm, source_v.data(), source_v.size() * sizeof(Ts), nelements); +} + +template +void JFJochDecompress(std::vector &output, CompressionAlgorithm algorithm, Ts *source_v, size_t source_size, + size_t nelements) { size_t elem_size = sizeof(Td); output.resize(nelements * elem_size); - size_t source_size = source_v.size() * sizeof(Ts); - auto source = (uint8_t *) source_v.data(); + auto source = (uint8_t *) source_v; size_t block_size; if (algorithm != CompressionAlgorithm::NO_COMPRESSION) { @@ -55,5 +60,4 @@ void JFJochDecompress(std::vector &output, CompressionAlgorithm algorithm, s } } - #endif //JUNGFRAUJOCH_JFJOCHDECOMPRESS_H diff --git a/frame_serialize/JFJochFrameDeserializer.cpp b/frame_serialize/CBORStream2Deserializer.cpp similarity index 94% rename from frame_serialize/JFJochFrameDeserializer.cpp rename to frame_serialize/CBORStream2Deserializer.cpp index cfb5b7f2..70f01b0a 100644 --- a/frame_serialize/JFJochFrameDeserializer.cpp +++ b/frame_serialize/CBORStream2Deserializer.cpp @@ -1,6 +1,6 @@ // Copyright (2019-2023) Paul Scherrer Institute -#include "JFJochFrameDeserializer.h" +#include "CBORStream2Deserializer.h" #include "tinycbor/src/cbor.h" #include "CborErr.h" #include "CborUtil.h" @@ -185,7 +185,7 @@ inline void GetCBORUInt64Array(CborValue &value, std::vector &v) { memcpy(v.data(), ptr, len); } -inline void GetCBORTypedArray(CBORImage &v, CborValue &value) { +inline void GetCBORTypedArray(CompressedImage &v, CborValue &value) { CborTag tag = GetCBORTag(value); switch (tag) { @@ -264,7 +264,7 @@ inline void GetCBORTypedArray(CBORImage &v, CborValue &value) { throw JFJochException(JFJochExceptionCategory::CBORError, "Byte string or compressed array expected"); } -void GetCBORMultidimTypedArray(CBORImage &v, CborValue &value) { +void GetCBORMultidimTypedArray(CompressedImage &v, CborValue &value) { if (GetCBORTag(value) != TagMultiDimArray) throw JFJochException(JFJochExceptionCategory::CBORError, "Multidim array expected"); @@ -294,7 +294,7 @@ bool CheckMagicNumber(CborValue &v) { } } -void JFJochFrameDeserializer::GetCBORSpots(CborValue &value) { +void CBORStream2Deserializer::GetCBORSpots(CborValue &value) { size_t array_len = GetCBORArrayLen(value); if (array_len % 4 != 0) @@ -315,7 +315,7 @@ void JFJochFrameDeserializer::GetCBORSpots(CborValue &value) { cborErr(cbor_value_leave_container(&value, &array_value)); } -void JFJochFrameDeserializer::DecodeType(CborValue &value) { +void CBORStream2Deserializer::DecodeType(CborValue &value) { if (cbor_value_at_end(&value)) throw JFJochException(JFJochExceptionCategory::CBORError, "Message empty"); auto key = GetCBORString(value); @@ -337,7 +337,7 @@ void JFJochFrameDeserializer::DecodeType(CborValue &value) { throw JFJochException(JFJochExceptionCategory::CBORError, "Unknown message type"); } -void JFJochFrameDeserializer::ProcessGoniometerMap(CborValue &value) { +void CBORStream2Deserializer::ProcessGoniometerMap(CborValue &value) { CborValue map_value; cborErr(cbor_value_enter_container(&value, &map_value)); @@ -348,7 +348,7 @@ void JFJochFrameDeserializer::ProcessGoniometerMap(CborValue &value) { cborErr(cbor_value_leave_container(&value, &map_value)); } -GoniometerAxis JFJochFrameDeserializer::ProcessGoniometer(CborValue &value) { +GoniometerAxis CBORStream2Deserializer::ProcessGoniometer(CborValue &value) { CborValue map_value; GoniometerAxis ret{}; @@ -366,7 +366,7 @@ GoniometerAxis JFJochFrameDeserializer::ProcessGoniometer(CborValue &value) { return ret; } -void JFJochFrameDeserializer::ProcessDetTranslation(CborValue &value) { +void CBORStream2Deserializer::ProcessDetTranslation(CborValue &value) { if (GetCBORArrayLen(value) != 3) throw JFJochException(JFJochExceptionCategory::CBORError, "Array with 3 floats expected"); CborValue array_value; @@ -377,7 +377,7 @@ void JFJochFrameDeserializer::ProcessDetTranslation(CborValue &value) { cborErr(cbor_value_leave_container(&value, &array_value)); } -void JFJochFrameDeserializer::ProcessImageMessageUserDataElement(CborValue &value) { +void CBORStream2Deserializer::ProcessImageMessageUserDataElement(CborValue &value) { CborValue map_value; cborErr(cbor_value_enter_container(&value, &map_value)); @@ -413,7 +413,7 @@ void JFJochFrameDeserializer::ProcessImageMessageUserDataElement(CborValue &valu cborErr(cbor_value_leave_container(&value, &map_value)); } -bool JFJochFrameDeserializer::ProcessImageMessageElement(CborValue &value) { +bool CBORStream2Deserializer::ProcessImageMessageElement(CborValue &value) { if (cbor_value_at_end(&value)) return false; else { @@ -443,12 +443,12 @@ bool JFJochFrameDeserializer::ProcessImageMessageElement(CborValue &value) { } } -void JFJochFrameDeserializer::ProcessCalibration(CborValue &value) { +void CBORStream2Deserializer::ProcessCalibration(CborValue &value) { CborValue map_value; cborErr(cbor_value_enter_container(&value, &map_value)); while (! cbor_value_at_end(&map_value)) { auto key = GetCBORString(map_value); - CBORImage image; + CompressedImage image; image.channel = key; GetCBORMultidimTypedArray(image, map_value); start_message.calibration.push_back(image); @@ -457,12 +457,12 @@ void JFJochFrameDeserializer::ProcessCalibration(CborValue &value) { } -void JFJochFrameDeserializer::ProcessPixelMaskElement(CborValue &value) { +void CBORStream2Deserializer::ProcessPixelMaskElement(CborValue &value) { CborValue map_value; cborErr(cbor_value_enter_container(&value, &map_value)); while (! cbor_value_at_end(&map_value)) { auto key = GetCBORString(map_value); - CBORImage image; + CompressedImage image; image.channel = key; GetCBORMultidimTypedArray(image, map_value); start_message.pixel_mask.push_back(image); @@ -470,7 +470,7 @@ void JFJochFrameDeserializer::ProcessPixelMaskElement(CborValue &value) { cborErr(cbor_value_leave_container(&value, &map_value)); } -void JFJochFrameDeserializer::ProcessRadIntResultElement(CborValue &value) { +void CBORStream2Deserializer::ProcessRadIntResultElement(CborValue &value) { CborValue map_value; cborErr(cbor_value_enter_container(&value, &map_value)); while (! cbor_value_at_end(&map_value)) { @@ -482,7 +482,7 @@ void JFJochFrameDeserializer::ProcessRadIntResultElement(CborValue &value) { cborErr(cbor_value_leave_container(&value, &map_value)); } -void JFJochFrameDeserializer::ProcessADUHistogramElement(CborValue &value) { +void CBORStream2Deserializer::ProcessADUHistogramElement(CborValue &value) { CborValue map_value; cborErr(cbor_value_enter_container(&value, &map_value)); while (! cbor_value_at_end(&map_value)) { @@ -494,7 +494,7 @@ void JFJochFrameDeserializer::ProcessADUHistogramElement(CborValue &value) { cborErr(cbor_value_leave_container(&value, &map_value)); } -void JFJochFrameDeserializer::ProcessChannels(CborValue &value) { +void CBORStream2Deserializer::ProcessChannels(CborValue &value) { if (!cbor_value_is_array(&value)) throw JFJochException(JFJochExceptionCategory::CBORError, "Array expected"); CborValue array_value; @@ -506,7 +506,7 @@ void JFJochFrameDeserializer::ProcessChannels(CborValue &value) { cborErr(cbor_value_leave_container(&value, &array_value)); } -void JFJochFrameDeserializer::ProcessUnitCellElement(CborValue &value) { +void CBORStream2Deserializer::ProcessUnitCellElement(CborValue &value) { CborValue map_value; cborErr(cbor_value_enter_container(&value, &map_value)); while (! cbor_value_at_end(&map_value)) { @@ -530,7 +530,7 @@ void JFJochFrameDeserializer::ProcessUnitCellElement(CborValue &value) { cborErr(cbor_value_leave_container(&value, &map_value)); } -void JFJochFrameDeserializer::ProcessStartMessageUserDataElement(CborValue &value) { +void CBORStream2Deserializer::ProcessStartMessageUserDataElement(CborValue &value) { CborValue map_value; cborErr(cbor_value_enter_container(&value, &map_value)); @@ -600,7 +600,7 @@ void JFJochFrameDeserializer::ProcessStartMessageUserDataElement(CborValue &valu cborErr(cbor_value_leave_container(&value, &map_value)); } -bool JFJochFrameDeserializer::ProcessStartMessageElement(CborValue &value) { +bool CBORStream2Deserializer::ProcessStartMessageElement(CborValue &value) { if (cbor_value_at_end(&value)) return false; else { @@ -664,7 +664,7 @@ bool JFJochFrameDeserializer::ProcessStartMessageElement(CborValue &value) { } } -void JFJochFrameDeserializer::ProcessEndMessageUserDataElement(CborValue &value) { +void CBORStream2Deserializer::ProcessEndMessageUserDataElement(CborValue &value) { CborValue map_value; cborErr(cbor_value_enter_container(&value, &map_value)); @@ -695,7 +695,7 @@ void JFJochFrameDeserializer::ProcessEndMessageUserDataElement(CborValue &value) cborErr(cbor_value_leave_container(&value, &map_value)); } -bool JFJochFrameDeserializer::ProcessEndMessageElement(CborValue &value) { +bool CBORStream2Deserializer::ProcessEndMessageElement(CborValue &value) { if (cbor_value_at_end(&value)) return false; else { @@ -714,7 +714,7 @@ bool JFJochFrameDeserializer::ProcessEndMessageElement(CborValue &value) { } } -void JFJochFrameDeserializer::ProcessImageData(CborValue &value) { +void CBORStream2Deserializer::ProcessImageData(CborValue &value) { if (!cbor_value_is_map(&value)) throw JFJochException(JFJochExceptionCategory::CBORError, "Map expected"); @@ -732,11 +732,11 @@ void JFJochFrameDeserializer::ProcessImageData(CborValue &value) { cborErr(cbor_value_leave_container(&value, &map_value)); } -void JFJochFrameDeserializer::Process(const std::vector &buffer) { +void CBORStream2Deserializer::Process(const std::vector &buffer) { Process(buffer.data(), buffer.size()); } -void JFJochFrameDeserializer::Process(const uint8_t *msg, size_t msg_size) { +void CBORStream2Deserializer::Process(const uint8_t *msg, size_t msg_size) { std::unique_lock ul(m); data_message = DataMessage(); @@ -777,22 +777,22 @@ std::unique_lock ul(m); "Serialized frame must be map in top level"); } -DataMessage JFJochFrameDeserializer::GetDataMessage() const { +DataMessage CBORStream2Deserializer::GetDataMessage() const { std::unique_lock ul(m); return data_message; } -JFJochFrameDeserializer::Type JFJochFrameDeserializer::GetType() const { +CBORStream2Deserializer::Type CBORStream2Deserializer::GetType() const { std::unique_lock ul(m); return msg_type; } -EndMessage JFJochFrameDeserializer::GetEndMessage() const { +EndMessage CBORStream2Deserializer::GetEndMessage() const { std::unique_lock ul(m); return end_message; } -StartMessage JFJochFrameDeserializer::GetStartMessage() const { +StartMessage CBORStream2Deserializer::GetStartMessage() const { std::unique_lock ul(m); return start_message; } \ No newline at end of file diff --git a/frame_serialize/JFJochFrameDeserializer.h b/frame_serialize/CBORStream2Deserializer.h similarity index 89% rename from frame_serialize/JFJochFrameDeserializer.h rename to frame_serialize/CBORStream2Deserializer.h index d1288a52..0e2c44b9 100644 --- a/frame_serialize/JFJochFrameDeserializer.h +++ b/frame_serialize/CBORStream2Deserializer.h @@ -1,7 +1,7 @@ // Copyright (2019-2023) Paul Scherrer Institute -#ifndef JUNGFRAUJOCH_JFJOCHFRAMEDESERIALIZER_H -#define JUNGFRAUJOCH_JFJOCHFRAMEDESERIALIZER_H +#ifndef JUNGFRAUJOCH_CBORSTREAM2DESERIALIZER_H +#define JUNGFRAUJOCH_CBORSTREAM2DESERIALIZER_H #include #include @@ -10,10 +10,10 @@ #include "../common/SpotToSave.h" #include "tinycbor/src/cbor.h" -#include "CBORMessages.h" +#include "JFJochMessages.h" #include -class JFJochFrameDeserializer { +class CBORStream2Deserializer { public: enum class Type {START, END, IMAGE}; private: @@ -58,4 +58,4 @@ public: }; -#endif //JUNGFRAUJOCH_JFJOCHFRAMEDESERIALIZER_H +#endif //JUNGFRAUJOCH_CBORSTREAM2DESERIALIZER_H diff --git a/frame_serialize/JFJochFrameSerializer.cpp b/frame_serialize/CBORStream2Serializer.cpp similarity index 94% rename from frame_serialize/JFJochFrameSerializer.cpp rename to frame_serialize/CBORStream2Serializer.cpp index f50871d6..c0730d7e 100644 --- a/frame_serialize/JFJochFrameSerializer.cpp +++ b/frame_serialize/CBORStream2Serializer.cpp @@ -1,6 +1,6 @@ // Copyright (2019-2023) Paul Scherrer Institute -#include "JFJochFrameSerializer.h" +#include "CBORStream2Serializer.h" #include "tinycbor/src/cbor.h" #include "CborErr.h" #include "CborUtil.h" @@ -77,7 +77,7 @@ void CBOR_ENC_COMPRESSED(CborEncoder &encoder, } } -inline void CBOR_ENC_2D_TYPED_ARRAY(CborEncoder &encoder, const CBORImage& image) { +inline void CBOR_ENC_2D_TYPED_ARRAY(CborEncoder &encoder, const CompressedImage& image) { //if ((algorithm == CompressionAlgorithm::NO_COMPRESSION) && (xpixel * ypixel != image_size / elem_size)) // throw JFJochException(JFJochExceptionCategory::CBORError, "Mismatch in array size"); @@ -182,7 +182,7 @@ inline void CBOR_ENC(CborEncoder &encoder, const char* key, const std::vector &v) { +inline void CBOR_ENC(CborEncoder &encoder, const char* key, const std::vector &v) { CborEncoder mapEncoder; cborErr(cbor_encode_text_stringz(&encoder, key)); @@ -306,40 +306,14 @@ inline void CBOR_ENC_USER_DATA(CborEncoder &encoder, const StartMessage& message cborErr(cbor_encoder_close_container(&encoder, &mapEncoder)); } -JFJochFrameSerializer::JFJochFrameSerializer(uint8_t *in_buffer, size_t buffer_size) : +CBORStream2Serializer::CBORStream2Serializer(uint8_t *in_buffer, size_t buffer_size) : buffer(in_buffer), max_buffer_size(buffer_size), curr_size(0) {} - -size_t JFJochFrameSerializer::GetImageAppendOffset() { - return curr_size + sizeof(size_t); -} - -size_t JFJochFrameSerializer::GetBufferSize() const { +size_t CBORStream2Serializer::GetBufferSize() const { return curr_size; } -size_t JFJochFrameSerializer::GetRemainingBuffer() const { - return max_buffer_size - curr_size; -} - -void JFJochFrameSerializer::AppendImage(size_t image_size) { - if (curr_size + image_size + sizeof(size_t) >= max_buffer_size) - throw JFJochException(JFJochExceptionCategory::CBORError, "No space to extend the image"); - curr_size--; - buffer[curr_size] = 0x40 | 27; - curr_size++; -#ifdef LITTLE_ENDIAN - size_t image_size_be = __builtin_bswap64(image_size); -#else - size_t image_size_be = image_size; -#endif - memcpy(buffer + curr_size, &image_size_be, sizeof(size_t)); - curr_size += sizeof(size_t); - curr_size += image_size; -} - - -void JFJochFrameSerializer::SerializeSequenceStart(const StartMessage& message) { +void CBORStream2Serializer::SerializeSequenceStart(const StartMessage& message) { CborEncoder encoder, mapEncoder; cbor_encoder_init(&encoder, buffer, max_buffer_size, 0); @@ -387,7 +361,7 @@ void JFJochFrameSerializer::SerializeSequenceStart(const StartMessage& message) curr_size = cbor_encoder_get_buffer_size(&encoder, buffer); } -void JFJochFrameSerializer::SerializeSequenceEnd(const EndMessage& message) { +void CBORStream2Serializer::SerializeSequenceEnd(const EndMessage& message) { CborEncoder encoder, mapEncoder, userDataMapEncoder; cbor_encoder_init(&encoder, buffer, max_buffer_size, 0); cborErr(cbor_encode_tag(&encoder, CborSignatureTag)); @@ -415,7 +389,7 @@ void JFJochFrameSerializer::SerializeSequenceEnd(const EndMessage& message) { curr_size = cbor_encoder_get_buffer_size(&encoder, buffer); } -void JFJochFrameSerializer::SerializeImage(const DataMessage& message) { +void CBORStream2Serializer::SerializeImage(const DataMessage& message) { CborEncoder encoder, mapEncoder, userDataMapEncoder; cbor_encoder_init(&encoder, buffer, max_buffer_size, 0); diff --git a/frame_serialize/JFJochFrameSerializer.h b/frame_serialize/CBORStream2Serializer.h similarity index 53% rename from frame_serialize/JFJochFrameSerializer.h rename to frame_serialize/CBORStream2Serializer.h index 83746add..4f58744d 100644 --- a/frame_serialize/JFJochFrameSerializer.h +++ b/frame_serialize/CBORStream2Serializer.h @@ -1,28 +1,25 @@ // Copyright (2019-2023) Paul Scherrer Institute -#ifndef JUNGFRAUJOCH_JFJOCHFRAMESERIALIZER_H -#define JUNGFRAUJOCH_JFJOCHFRAMESERIALIZER_H +#ifndef JUNGFRAUJOCH_CBORSTREAM2SERIALIZER_H +#define JUNGFRAUJOCH_CBORSTREAM2SERIALIZER_H #include #include #include #include "../common/SpotToSave.h" -#include "CBORMessages.h" +#include "JFJochMessages.h" -class JFJochFrameSerializer { +class CBORStream2Serializer { uint8_t *buffer = nullptr; size_t max_buffer_size; size_t curr_size; public: - explicit JFJochFrameSerializer(uint8_t *buffer, size_t buffer_size); - [[nodiscard]] size_t GetImageAppendOffset(); + explicit CBORStream2Serializer(uint8_t *buffer, size_t buffer_size); [[nodiscard]] size_t GetBufferSize() const; - [[nodiscard]] size_t GetRemainingBuffer() const; void SerializeSequenceStart(const StartMessage& message); void SerializeSequenceEnd(const EndMessage& message); void SerializeImage(const DataMessage& message); - void AppendImage(size_t image_size); }; -#endif //JUNGFRAUJOCH_JFJOCHFRAMESERIALIZER_H +#endif //JUNGFRAUJOCH_CBORSTREAM2SERIALIZER_H diff --git a/frame_serialize/CMakeLists.txt b/frame_serialize/CMakeLists.txt index c4d541f8..518c7175 100644 --- a/frame_serialize/CMakeLists.txt +++ b/frame_serialize/CMakeLists.txt @@ -1,6 +1,6 @@ -ADD_LIBRARY(FrameSerialize STATIC - JFJochFrameSerializer.cpp JFJochFrameSerializer.h - JFJochFrameDeserializer.cpp JFJochFrameDeserializer.h +ADD_LIBRARY(CBORStream2FrameSerialize STATIC + CBORStream2Serializer.cpp CBORStream2Serializer.h + CBORStream2Deserializer.cpp CBORStream2Deserializer.h tinycbor/src/cborparser_dup_string.c tinycbor/src/cborencoder.c tinycbor/src/cborencoder_close_container_checked.c @@ -10,4 +10,16 @@ ADD_LIBRARY(FrameSerialize STATIC tinycbor/src/cborpretty.c tinycbor/src/cborerrorstrings.c tinycbor/src/cbor.h - tinycbor/src/tinycbor-version.h CborErr.h CborUtil.h CBORMessages.h) + tinycbor/src/tinycbor-version.h CborErr.h CborUtil.h JFJochMessages.h) + +ADD_LIBRARY(ImagePusher STATIC + ImagePusher.cpp ImagePusher.h + TestImagePusher.cpp TestImagePusher.h + ZMQStream2Pusher.cpp ZMQStream2Pusher.h + ZMQPreviewPublisher.cpp ZMQPreviewPublisher.h + ZMQBsreadImagePusher.cpp + ZMQBsreadImagePusher.h) + +FIND_PACKAGE(OpenSSL REQUIRED) + +TARGET_LINK_LIBRARIES(ImagePusher CBORStream2FrameSerialize CommonFunctions Compression OpenSSL::Crypto) \ No newline at end of file diff --git a/common/ImagePusher.cpp b/frame_serialize/ImagePusher.cpp similarity index 100% rename from common/ImagePusher.cpp rename to frame_serialize/ImagePusher.cpp diff --git a/frame_serialize/ImagePusher.h b/frame_serialize/ImagePusher.h new file mode 100644 index 00000000..5aed9556 --- /dev/null +++ b/frame_serialize/ImagePusher.h @@ -0,0 +1,27 @@ +// Copyright (2019-2023) Paul Scherrer Institute + +#ifndef JUNGFRAUJOCH_IMAGEPUSHER_H +#define JUNGFRAUJOCH_IMAGEPUSHER_H + +#include +#include + +#include "../common/DiffractionExperiment.h" +#include "../common/DiffractionSpot.h" +#include "CBORStream2Serializer.h" +#include "JFJochMessages.h" +#include "../common/ZeroCopyReturnValue.h" + +void PrepareCBORImage(DataMessage& message, + const DiffractionExperiment &experiment, + void *image, size_t image_size); + +class ImagePusher { +public: + virtual void StartDataCollection(const StartMessage& message) = 0; + virtual void EndDataCollection(const EndMessage& message) = 0; + virtual void SendImage(const DataMessage& message) = 0; +}; + + +#endif //JUNGFRAUJOCH_IMAGEPUSHER_H diff --git a/frame_serialize/CBORMessages.h b/frame_serialize/JFJochMessages.h similarity index 90% rename from frame_serialize/CBORMessages.h rename to frame_serialize/JFJochMessages.h index ac86c954..b5fd9f1a 100644 --- a/frame_serialize/CBORMessages.h +++ b/frame_serialize/JFJochMessages.h @@ -1,7 +1,7 @@ // Copyright (2019-2023) Paul Scherrer Institute -#ifndef JUNGFRAUJOCH_CBORMESSAGES_H -#define JUNGFRAUJOCH_CBORMESSAGES_H +#ifndef JUNGFRAUJOCH_JFJOCHMESSAGES_H +#define JUNGFRAUJOCH_JFJOCHMESSAGES_H #include #include @@ -13,7 +13,7 @@ constexpr const uint64_t user_data_release = 1; constexpr const uint64_t user_data_magic_number = 0x52320000UL | user_data_release; -struct CBORImage { +struct CompressedImage { const uint8_t *data; size_t size; // Including compression size_t xpixel; @@ -27,7 +27,7 @@ struct CBORImage { struct DataMessage { int64_t number = INT64_MIN; - CBORImage image; + CompressedImage image; std::vector spots; std::vector rad_int_profile; uint64_t indexing_result; // 0 - not tried, 1 - tried and failed, 2 - tried and success @@ -121,18 +121,18 @@ struct StartMessage { std::vector rad_int_bin_to_q; std::vector rad_int_solid_angle_corr; - std::vector pixel_mask; - std::vector calibration; + std::vector pixel_mask; + std::vector calibration; size_t approx_size = 1024*1024; // Use function below to update approx_size - void AddPixelMask(CBORImage image) { + void AddPixelMask(CompressedImage image) { approx_size += image.size; pixel_mask.emplace_back(std::move(image)); } - void AddCalibration(CBORImage image) { + void AddCalibration(CompressedImage image) { approx_size += image.size; calibration.emplace_back(std::move(image)); } @@ -155,4 +155,4 @@ struct EndMessage { uint64_t adu_histogram_bin_width; }; -#endif //JUNGFRAUJOCH_CBORMESSAGES_H +#endif //JUNGFRAUJOCH_JFJOCHMESSAGES_H diff --git a/common/TestImagePusher.cpp b/frame_serialize/TestImagePusher.cpp similarity index 80% rename from common/TestImagePusher.cpp rename to frame_serialize/TestImagePusher.cpp index 1ae0182a..13930207 100644 --- a/common/TestImagePusher.cpp +++ b/frame_serialize/TestImagePusher.cpp @@ -2,15 +2,15 @@ #include "TestImagePusher.h" #include "../tests/FPGAUnitTest.h" -#include "JFJochCompressor.h" +#include "../compression/JFJochCompressor.h" #include "../compression/JFJochDecompress.h" -#include "../frame_serialize/JFJochFrameDeserializer.h" +#include "CBORStream2Deserializer.h" TestImagePusher::TestImagePusher(int64_t image_number) { image_id = image_number; } -void TestImagePusher::StartDataCollection(const uint8_t *image_data, size_t image_size, int64_t data_file_count) { +void TestImagePusher::StartDataCollection(const StartMessage& message) { std::unique_lock ul(m); if (is_running) @@ -28,37 +28,16 @@ void TestImagePusher::EndDataCollection(const EndMessage& message) { is_running = false; } -void TestImagePusher::SendImage(const uint8_t *image_data, size_t image_size, int64_t image_number) { +void TestImagePusher::SendImage(const DataMessage& message) { std::unique_lock ul(m); frame_counter++; - if (image_number == image_id) { - - JFJochFrameDeserializer deserializer; - deserializer.Process(image_data, image_size); - - auto image_array = deserializer.GetDataMessage(); - receiver_generated_image.resize(image_array.image.size); - memcpy(receiver_generated_image.data(), image_array.image.data, image_array.image.size); + if (message.number == image_id) { + receiver_generated_image.resize(message.image.size); + memcpy(receiver_generated_image.data(), message.image.data, message.image.size); } } -void TestImagePusher::SendImage(const uint8_t *image_data, size_t image_size, int64_t image_number, - ZeroCopyReturnValue *z) { - std::unique_lock ul(m); - - frame_counter++; - if (image_number == image_id) { - JFJochFrameDeserializer deserializer; - deserializer.Process(image_data, image_size); - - auto image_array = deserializer.GetDataMessage(); - receiver_generated_image.resize(image_array.image.size); - memcpy(receiver_generated_image.data(), image_array.image.data, image_array.image.size); - } - z->release(); -} - bool TestImagePusher::CheckSequence() const { std::unique_lock ul(m); return correct_sequence; diff --git a/common/TestImagePusher.h b/frame_serialize/TestImagePusher.h similarity index 72% rename from common/TestImagePusher.h rename to frame_serialize/TestImagePusher.h index 009f6c93..555b54e5 100644 --- a/common/TestImagePusher.h +++ b/frame_serialize/TestImagePusher.h @@ -6,8 +6,8 @@ #include #include "ImagePusher.h" -#include "Logger.h" -#include "DiffractionExperiment.h" +#include "../common/Logger.h" +#include "../common/DiffractionExperiment.h" #include "../jungfrau/JFCalibration.h" #include "../jungfrau/JFModuleGainCalibration.h" @@ -19,11 +19,9 @@ class TestImagePusher : public ImagePusher { bool is_running = false; size_t frame_counter = 0; public: - void SendImage(const uint8_t *image_data, size_t image_size, int64_t image_number) override; - void SendImage(const uint8_t *image_data, size_t image_size, int64_t image_number, - ZeroCopyReturnValue *z) override; + void SendImage(const DataMessage& message) override; explicit TestImagePusher(int64_t image_number); - void StartDataCollection(const uint8_t *image_data, size_t image_size, int64_t data_file_count) override; + void StartDataCollection(const StartMessage& message) override; void EndDataCollection(const EndMessage& message) override; bool CheckImage(const DiffractionExperiment &x, const std::vector &raw_reference_image, diff --git a/frame_serialize/ZMQBsreadImagePusher.cpp b/frame_serialize/ZMQBsreadImagePusher.cpp new file mode 100644 index 00000000..9d181ffa --- /dev/null +++ b/frame_serialize/ZMQBsreadImagePusher.cpp @@ -0,0 +1,81 @@ +// Copyright (2019-2023) Paul Scherrer Institute + +#include "ZMQBsreadImagePusher.h" +#include +#include + +ZMQBsreadImagePusher::ZMQBsreadImagePusher(const std::string &addr, int32_t send_buffer_high_watermark, + int32_t send_buffer_size) : socket(context, ZMQSocketType::Push) { + if (send_buffer_size > 0) + socket.SendBufferSize(send_buffer_size); + if (send_buffer_high_watermark > 0) + socket.SendWaterMark(send_buffer_high_watermark); + socket.Bind(addr); +} + +void ZMQBsreadImagePusher::StartDataCollection(const StartMessage &message) { + // Do nothing +} + +void ZMQBsreadImagePusher::SendImage(const DataMessage &message) { + std::unique_lock ul(m); + + timespec ts{}; + clock_gettime(CLOCK_REALTIME, &ts); + auto data_header = DataHeader(message); + auto main_header = MainHeader(message, data_header, ts); + socket.Send(main_header, true, true); + socket.Send(data_header, true, true); + socket.Send(message.image.data, message.image.size, true, true); + socket.Send(&ts, sizeof(ts), true, false); +} + +void ZMQBsreadImagePusher::EndDataCollection(const EndMessage &message) { + // Do nothing +} + +std::string ZMQBsreadImagePusher::MainHeader(const DataMessage &message, const std::string &data_header, timespec &ts) { + nlohmann::json j; + j["htype"] = "bsr_m-1.1"; + j["pulse_id"] = message.bunch_id; + j["hash"] = MD5Hash(data_header); + j["global_timestamp"]["sec"] = ts.tv_sec; + j["global_timestamp"]["ns"] = ts.tv_nsec; + return j.dump(); +} + +std::string ZMQBsreadImagePusher::MD5Hash(const std::string &s) { + std::string ret; + uint8_t hash[MD5_DIGEST_LENGTH]; + MD5(reinterpret_cast(s.data()), s.length(), hash); + for (unsigned char i : hash) { + char buf[4]; + sprintf(buf, "%02x", i); + ret.append(buf); + } + return ret; +} + +std::string ZMQBsreadImagePusher::DataHeader(const DataMessage &message) { + nlohmann::json data; + data["htype"] = "bsr_d-1.1"; + + nlohmann::json image_channel; + image_channel["name"] = "JFJOCH:IMAGE"; + if (message.image.pixel_depth_bytes == 2) + image_channel["type"] = (message.image.pixel_is_signed ? "int16" : "uint16"); + else + image_channel["type"] = (message.image.pixel_is_signed ? "int32" : "uint32"); + image_channel["shape"] = {message.image.ypixel, message.image.xpixel}; + image_channel["encoding"] = "little"; + if (message.image.algorithm == CompressionAlgorithm::BSHUF_LZ4) + image_channel["compression"] = "bitshuffle_lz4"; + else if (message.image.algorithm == CompressionAlgorithm::NO_COMPRESSION) + image_channel["compression"] = "none"; + else + throw JFJochException(JFJochExceptionCategory::InputParameterInvalid, "Compression not allowed for BSRead data"); + image_channel["modulo"] = 1; + image_channel["offset"] = 0; + data["channels"].push_back(image_channel); + return data.dump(); +} diff --git a/frame_serialize/ZMQBsreadImagePusher.h b/frame_serialize/ZMQBsreadImagePusher.h new file mode 100644 index 00000000..22f3a297 --- /dev/null +++ b/frame_serialize/ZMQBsreadImagePusher.h @@ -0,0 +1,24 @@ +// Copyright (2019-2023) Paul Scherrer Institute + +#ifndef JUNGFRAUJOCH_ZMQBSREADIMAGEPUSHER_H +#define JUNGFRAUJOCH_ZMQBSREADIMAGEPUSHER_H + +#include "ImagePusher.h" +#include "../common/ZMQWrappers.h" + +class ZMQBsreadImagePusher : public ImagePusher { + std::mutex m; + ZMQContext context; + ZMQSocket socket; + static std::string MainHeader(const DataMessage &message, const std::string &data_header, timespec &ts); + static std::string MD5Hash(const std::string& s); + static std::string DataHeader(const DataMessage &message); +public: + ZMQBsreadImagePusher(const std::string& addr, int32_t send_buffer_high_watermark = -1, int32_t send_buffer_size = -1); + void StartDataCollection(const StartMessage &message) override; + void SendImage(const DataMessage &message) override; + void EndDataCollection(const EndMessage &message) override; +}; + + +#endif //JUNGFRAUJOCH_ZMQBSREADIMAGEPUSHER_H diff --git a/frame_serialize/ZMQPreviewPublisher.cpp b/frame_serialize/ZMQPreviewPublisher.cpp new file mode 100644 index 00000000..aede7c94 --- /dev/null +++ b/frame_serialize/ZMQPreviewPublisher.cpp @@ -0,0 +1,51 @@ +// Copyright (2019-2023) Paul Scherrer Institute + +#include "ZMQPreviewPublisher.h" + +#include "CBORStream2Serializer.h" + +ZMQPreviewPublisher::ZMQPreviewPublisher(ZMQContext& context, const std::string& addr) : + socket(context, ZMQSocketType::Pub) { + socket.SendWaterMark(2).NoLinger(); + socket.Bind(addr); +} + +void ZMQPreviewPublisher::SetPreviewStride(int64_t preview_stride) { + std::unique_lock ul(m); + stride = preview_stride; +} + +void ZMQPreviewPublisher::StartDataCollection(const StartMessage& message) { + std::unique_lock ul(m); + current_part = -1; + + size_t approx_size = 1024*1024; + for (const auto &x : message.pixel_mask) + approx_size += x.size; + + for (const auto &x : message.calibration) + approx_size += x.size; + + std::vector serialization_buffer(approx_size); + CBORStream2Serializer serializer(serialization_buffer.data(), serialization_buffer.size()); + serializer.SerializeSequenceStart(message); + socket.Send(serialization_buffer.data(), serializer.GetBufferSize()); +} + +void ZMQPreviewPublisher::SendImage(const DataMessage& message) { + { + std::unique_lock ul(m); + int64_t part = message.number / stride; + if (current_part >= part) + return; + else + current_part = part; + } + size_t approx_size = message.image.size + 2*1024*1024; + std::vector serialization_buffer(approx_size); + + CBORStream2Serializer serializer(serialization_buffer.data(), serialization_buffer.size()); + serializer.SerializeImage(message); + + socket.Send(serialization_buffer.data(), serializer.GetBufferSize()); +} diff --git a/common/ZMQPreviewPublisher.h b/frame_serialize/ZMQPreviewPublisher.h similarity index 59% rename from common/ZMQPreviewPublisher.h rename to frame_serialize/ZMQPreviewPublisher.h index 335c5f05..0e9be2f2 100644 --- a/common/ZMQPreviewPublisher.h +++ b/frame_serialize/ZMQPreviewPublisher.h @@ -3,10 +3,10 @@ #ifndef JUNGFRAUJOCH_ZMQPREVIEWPUBLISHER_H #define JUNGFRAUJOCH_ZMQPREVIEWPUBLISHER_H -#include "ZMQWrappers.h" -#include "DiffractionExperiment.h" +#include "../common/ZMQWrappers.h" +#include "../common/DiffractionExperiment.h" #include "../jungfrau/JFCalibration.h" -#include "../frame_serialize/CBORMessages.h" +#include "JFJochMessages.h" class ZMQPreviewPublisher { ZMQSocket socket; @@ -17,8 +17,10 @@ class ZMQPreviewPublisher { mutable std::mutex m; public: ZMQPreviewPublisher(ZMQContext& context, const std::string& addr); - void StartDataCollection(const uint8_t *image_data, size_t image_size, int64_t preview_stride); - void SendImage(const uint8_t *image_data, size_t image_size, int64_t image_number); + void SetPreviewStride(int64_t preview_stride); + + void StartDataCollection(const StartMessage& message); + void SendImage(const DataMessage& message); }; diff --git a/common/ZMQImagePusher.cpp b/frame_serialize/ZMQStream2Pusher.cpp similarity index 57% rename from common/ZMQImagePusher.cpp rename to frame_serialize/ZMQStream2Pusher.cpp index e7564c87..7c0069d0 100644 --- a/common/ZMQImagePusher.cpp +++ b/frame_serialize/ZMQStream2Pusher.cpp @@ -1,9 +1,9 @@ // Copyright (2019-2023) Paul Scherrer Institute -#include "ZMQImagePusher.h" -#include "JFJochException.h" +#include "ZMQStream2Pusher.h" +#include "CBORStream2Serializer.h" -ZMQImagePusher::ZMQImagePusher(ZMQContext &zmq_context, const std::vector &addr, +ZMQStream2Pusher::ZMQStream2Pusher(ZMQContext &zmq_context, const std::vector &addr, int32_t send_buffer_high_watermark, int32_t send_buffer_size) { if (addr.empty()) throw JFJochException(JFJochExceptionCategory::InputParameterInvalid, @@ -20,7 +20,7 @@ ZMQImagePusher::ZMQImagePusher(ZMQContext &zmq_context, const std::vector &addr, +ZMQStream2Pusher::ZMQStream2Pusher(const std::vector &addr, int32_t send_buffer_high_watermark, int32_t send_buffer_size) { if (addr.empty()) throw JFJochException(JFJochExceptionCategory::InputParameterInvalid, @@ -39,35 +39,44 @@ ZMQImagePusher::ZMQImagePusher(const std::vector &addr, } } -void ZMQImagePusher::SendImage(const uint8_t *image_data, size_t image_size, int64_t image_number) { +void ZMQStream2Pusher::SendImage(const DataMessage& message) { if (sockets.empty()) return; - auto socket_number = (image_number % file_count) % sockets.size(); - sockets[socket_number]->Send(image_data, image_size); + auto socket_number = (message.number % file_count) % sockets.size(); + + size_t approx_size = message.image.size + 2*1024*1024; + std::vector serialization_buffer(approx_size); + + CBORStream2Serializer serializer(serialization_buffer.data(), serialization_buffer.size()); + serializer.SerializeImage(message); + + sockets[socket_number]->Send(serialization_buffer.data(), serializer.GetBufferSize()); } -void ZMQImagePusher::SendImage(const uint8_t *image_data, size_t image_size, int64_t image_number, - ZeroCopyReturnValue *z) { - if (sockets.empty()) - return; - auto socket_number = (image_number % file_count) % sockets.size(); - sockets[socket_number]->SendZeroCopy(image_data, image_size, z); -} - -void ZMQImagePusher::StartDataCollection(const uint8_t *image_data, size_t image_size, int64_t data_file_count) { - if (data_file_count < 1) +void ZMQStream2Pusher::StartDataCollection(const StartMessage& message) { + if (message.data_file_count < 1) throw JFJochException(JFJochExceptionCategory::InputParameterInvalid, "File count cannot be zero or negative"); - file_count = data_file_count; + file_count = message.data_file_count; + size_t approx_size = 1024*1024; + for (const auto &x : message.pixel_mask) + approx_size += x.size; + + for (const auto &x : message.calibration) + approx_size += x.size; + + std::vector serialization_buffer(approx_size); + CBORStream2Serializer serializer(serialization_buffer.data(), serialization_buffer.size()); + serializer.SerializeSequenceStart(message); for (const auto &s: sockets) - s->Send(image_data, image_size, true); + s->Send(serialization_buffer.data(), serializer.GetBufferSize(), true); } -void ZMQImagePusher::EndDataCollection(const EndMessage& message) { +void ZMQStream2Pusher::EndDataCollection(const EndMessage& message) { std::vector serialization_buffer(80*1024*1024); - JFJochFrameSerializer serializer(serialization_buffer.data(), serialization_buffer.size()); + CBORStream2Serializer serializer(serialization_buffer.data(), serialization_buffer.size()); EndMessage end_message = message; diff --git a/frame_serialize/ZMQStream2Pusher.h b/frame_serialize/ZMQStream2Pusher.h new file mode 100644 index 00000000..d4aa3eb8 --- /dev/null +++ b/frame_serialize/ZMQStream2Pusher.h @@ -0,0 +1,28 @@ +// Copyright (2019-2023) Paul Scherrer Institute + +#ifndef JUNGFRAUJOCH_ZMQSTREAM2PUSHER_H +#define JUNGFRAUJOCH_ZMQSTREAM2PUSHER_H + +#include "ImagePusher.h" +#include "../common/ThreadSafeFIFO.h" +#include "../common/ZMQWrappers.h" +#include "../common/DiffractionSpot.h" +#include "JFJochMessages.h" + +class ZMQStream2Pusher : public ImagePusher { + std::vector> contexts; + std::vector> sockets; + int64_t file_count = 1; +public: + ZMQStream2Pusher(ZMQContext &context, const std::vector& addr, + int32_t send_buffer_high_watermark = -1, int32_t send_buffer_size = -1); + // High performance implementation, where each socket has dedicated ZMQ context + explicit ZMQStream2Pusher(const std::vector& addr, + int32_t send_buffer_high_watermark = -1, int32_t send_buffer_size = -1); + + void StartDataCollection(const StartMessage& message) override; + void SendImage(const DataMessage& message) override; + void EndDataCollection(const EndMessage& message) override; +}; + +#endif //JUNGFRAUJOCH_ZMQSTREAM2PUSHER_H diff --git a/receiver/CMakeLists.txt b/receiver/CMakeLists.txt index f552361f..8115378f 100644 --- a/receiver/CMakeLists.txt +++ b/receiver/CMakeLists.txt @@ -3,7 +3,7 @@ ADD_LIBRARY(JFJochReceiver STATIC JFJochReceiverTest.cpp JFJochReceiverTest.h JFJochReceiver.cpp JFJochReceiver.h) -TARGET_LINK_LIBRARIES(JFJochReceiver ImageAnalysis JungfraujochAcqusitionDevice CommonFunctions HLSSimulation) +TARGET_LINK_LIBRARIES(JFJochReceiver ImagePusher ImageAnalysis JungfraujochAcqusitionDevice CommonFunctions HLSSimulation) ADD_EXECUTABLE(jfjoch_action_test jfjoch_action_test.cpp) TARGET_LINK_LIBRARIES(jfjoch_action_test JFJochReceiver) diff --git a/receiver/JFJochReceiver.cpp b/receiver/JFJochReceiver.cpp index 0085f3a9..7674c889 100644 --- a/receiver/JFJochReceiver.cpp +++ b/receiver/JFJochReceiver.cpp @@ -4,7 +4,6 @@ #include -#include "../jungfrau/JFPedestalCalc.h" #include "../image_analysis/IndexerWrapper.h" #include "../common/DiffractionGeometry.h" @@ -23,7 +22,6 @@ JFJochReceiver::JFJochReceiver(const DiffractionExperiment& in_experiment, AcquisitionDeviceGroup &in_aq_device, ImagePusher &in_image_sender, Logger &in_logger, int64_t in_forward_and_sum_nthreads, - int64_t in_send_buffer_count, ZMQPreviewPublisher* in_preview_publisher, const NUMAHWPolicy &in_numa_policy) : experiment(in_experiment), @@ -36,14 +34,11 @@ JFJochReceiver::JFJochReceiver(const DiffractionExperiment& in_experiment, ndatastreams(experiment.GetDataStreamsNum()), data_acquisition_ready(ndatastreams), frame_transformation_ready((experiment.GetImageNum() > 0) ? frame_transformation_nthreads : 0), - send_buffer_count(in_send_buffer_count), indexing_solution_per_file(experiment.GetDataFileCount()), numa_policy(in_numa_policy), adu_histogram_module(experiment.GetModulesNum()) { - send_buffer = (uint8_t *) malloc(send_buffer_size * send_buffer_count); - - try { + try { if (calibration != nullptr) { one_byte_mask = calibration->CalculateOneByteMask(experiment); } else { @@ -51,11 +46,6 @@ JFJochReceiver::JFJochReceiver(const DiffractionExperiment& in_experiment, for (auto &i: one_byte_mask) i = 1; } - for (uint32_t i = 0; i < send_buffer_count; i++) { - send_buffer_avail.Put(i); - send_buffer_zero_copy_ret_val.emplace_back(send_buffer_avail, i); - } - if (!experiment.CheckGitSha1Consistent()) logger.Warning(experiment.CheckGitSha1Msg()); @@ -118,7 +108,7 @@ JFJochReceiver::JFJochReceiver(const DiffractionExperiment& in_experiment, size_t ypixel = experiment.GetYPixelsNum(); pixel_mask = compressor.Compress(calibration->CalculateNexusMask(experiment, 0)); - message.AddPixelMask(CBORImage{ + message.AddPixelMask(CompressedImage{ .data = pixel_mask.data(), .size = pixel_mask.size(), .xpixel = (size_t) xpixel, @@ -139,7 +129,7 @@ JFJochReceiver::JFJochReceiver(const DiffractionExperiment& in_experiment, if (experiment.GetStorageCellNumber() > 1) channel += "_sc" + std::to_string(sc); - CBORImage image{ + CompressedImage image{ .data = pedestal.at(pedestal.size() - 1).data(), .size = pedestal.at(pedestal.size() - 1).size(), .xpixel = (size_t) xpixel, @@ -164,17 +154,13 @@ JFJochReceiver::JFJochReceiver(const DiffractionExperiment& in_experiment, } else message.rad_int_bin_number = 0; - std::vector serialization_buffer(message.approx_size); - JFJochFrameSerializer serializer(serialization_buffer.data(), serialization_buffer.size()); + if (preview_publisher != nullptr) { + preview_publisher->SetPreviewStride(experiment.GetPreviewStride()); + preview_publisher->StartDataCollection(message); + } - serializer.SerializeSequenceStart(message); - - if (preview_publisher != nullptr) - preview_publisher->StartDataCollection(serialization_buffer.data(), serializer.GetBufferSize(), - experiment.GetPreviewStride()); if (push_images_to_writer) - image_pusher.StartDataCollection(serialization_buffer.data(), serializer.GetBufferSize(), - experiment.GetDataFileCount()); + image_pusher.StartDataCollection(message); for (int i = 0; i < experiment.GetImageNum(); i++) images_to_go.Put(i); @@ -200,7 +186,6 @@ JFJochReceiver::JFJochReceiver(const DiffractionExperiment& in_experiment, measurement = std::async(std::launch::async, &JFJochReceiver::FinalizeMeasurement, this); } catch (...) { - free(send_buffer); throw; } } @@ -403,28 +388,15 @@ void JFJochReceiver::FrameTransformationThread() { *rad_int_profile_per_file[image_number % experiment.GetDataFileCount()] += *rad_int_profile_image; } - message.receiver_available_send_buffers = GetAvailableSendBuffers(); - - auto send_buffer_handle = send_buffer_avail.GetBlocking(); - auto ptr = send_buffer + send_buffer_size * send_buffer_handle; - JFJochFrameSerializer serializer(ptr, send_buffer_size); - PrepareCBORImage(message, experiment, nullptr, 0); - serializer.SerializeImage(message); - if (serializer.GetRemainingBuffer() < experiment.GetMaxCompressedSize()) - throw JFJochException(JFJochExceptionCategory::ArrayOutOfBounds, - "Not enough memory to save image"); - size_t image_size = transformation.SaveCompressedImage(ptr + serializer.GetImageAppendOffset()); - serializer.AppendImage(image_size); + message.image = transformation.GetCompressedImage(); if (preview_publisher && (!local_data_processing_settings.preview_indexed_only || indexed)) - preview_publisher->SendImage(ptr, serializer.GetBufferSize(), image_number); + preview_publisher->SendImage(message); if (push_images_to_writer) { - image_pusher.SendImage(ptr, serializer.GetBufferSize(), image_number, - &send_buffer_zero_copy_ret_val[send_buffer_handle]); - compressed_size += image_size; - } else - send_buffer_avail.Put(send_buffer_handle); + image_pusher.SendImage(message); + compressed_size += message.image.size; + } UpdateMaxImage(image_number); images_sent++; @@ -568,9 +540,6 @@ void JFJochReceiver::FinalizeMeasurement() { for (auto &future : data_acquisition_futures) future.get(); - for (int i = 0; i < send_buffer_count; i++) - send_buffer_avail.GetBlocking(); - RetrievePedestal(); logger.Info("Devices stopped"); @@ -597,7 +566,6 @@ void JFJochReceiver::StopReceiver() { JFJochReceiver::~JFJochReceiver() { if (measurement.valid()) measurement.get(); - free(send_buffer); } DataProcessingSettings JFJochReceiver::GetDataProcessingSettings() { @@ -627,7 +595,6 @@ Plot JFJochReceiver::GetPlots(const PlotRequest &request) { return indexing_solution_per_file.GetPlot(); case PlotType::ADUHistorgram: return adu_histogram_total.GetPlot(); - break; default: // Do nothing break; @@ -665,14 +632,10 @@ void JFJochReceiver::UpdateMaxDelay(int64_t delay) { max_delay = delay; } -float JFJochReceiver::GetAvailableSendBuffers() const { - return static_cast(send_buffer_avail.Size()) / static_cast(send_buffer_count); -} - JFJochReceiverStatus JFJochReceiver::GetStatus() const { return { .progress = GetProgress(), .indexing_rate = GetIndexingRate(), - .send_buffers_avail = GetAvailableSendBuffers() + .send_buffers_avail = 0 }; } diff --git a/receiver/JFJochReceiver.h b/receiver/JFJochReceiver.h index 70e3f7a5..b9ed9196 100644 --- a/receiver/JFJochReceiver.h +++ b/receiver/JFJochReceiver.h @@ -15,10 +15,10 @@ #include "../common/DiffractionExperiment.h" #include "../common/JFJochException.h" #include "../common/FrameTransformation.h" -#include "../common/ImagePusher.h" +#include "../frame_serialize/ImagePusher.h" #include "../common/Logger.h" #include "../common/ThreadSafeFIFO.h" -#include "../common/ZMQPreviewPublisher.h" +#include "../frame_serialize/ZMQPreviewPublisher.h" #include "../common/NUMAHWPolicy.h" #include "../common/StatusVector.h" #include "../common/Histogram.h" @@ -118,14 +118,6 @@ class JFJochReceiver { int64_t max_image_number_sent = 0; std::mutex max_image_number_sent_mutex; - const size_t send_buffer_size = experiment.GetMaxCompressedSize() + 1024*1024; - // max compressed size + 1 MiB reserve for spots and rad. integration results - const size_t send_buffer_count; - - ThreadSafeFIFO send_buffer_avail; - uint8_t *send_buffer; - std::vector send_buffer_zero_copy_ret_val; - NUMAHWPolicy numa_policy; void AcquireThread(uint16_t data_stream); @@ -142,7 +134,6 @@ public: AcquisitionDeviceGroup &acquisition_devices, ImagePusher &image_pusher, Logger &logger, int64_t forward_and_sum_nthreads, - int64_t send_buffer_count, ZMQPreviewPublisher* preview_publisher, const NUMAHWPolicy &numa_policy); ~JFJochReceiver(); @@ -160,7 +151,6 @@ public: void SetDataProcessingSettings(const DataProcessingSettings &data_processing_settings); - float GetAvailableSendBuffers() const; Plot GetPlots(const PlotRequest& request); RadialIntegrationProfiles GetRadialIntegrationProfiles(); }; diff --git a/receiver/JFJochReceiverService.cpp b/receiver/JFJochReceiverService.cpp index 7afea292..d44d5dea 100644 --- a/receiver/JFJochReceiverService.cpp +++ b/receiver/JFJochReceiverService.cpp @@ -72,8 +72,7 @@ void JFJochReceiverService::Start(const DiffractionExperiment &experiment, const receiver = std::make_unique(experiment, calibration, aq_devices, image_pusher, - logger, nthreads, send_buffer_count, - preview_publisher, numa_policy); + logger, nthreads, preview_publisher, numa_policy); try { // Don't want to stop receiver->SetDataProcessingSettings(data_processing_settings); diff --git a/receiver/JFJochReceiverTest.cpp b/receiver/JFJochReceiverTest.cpp index 4ea34305..cc3b0146 100644 --- a/receiver/JFJochReceiverTest.cpp +++ b/receiver/JFJochReceiverTest.cpp @@ -2,8 +2,8 @@ #include "JFJochReceiverTest.h" #include "JFJochReceiverService.h" -#include "../common/ZMQImagePusher.h" -#include "../common/TestImagePusher.h" +#include "../frame_serialize/ZMQStream2Pusher.h" +#include "../frame_serialize/TestImagePusher.h" #define STORAGE_CELL_FOR_TEST 11 diff --git a/tests/CBORTest.cpp b/tests/CBORTest.cpp index 96a1b121..a44b1685 100644 --- a/tests/CBORTest.cpp +++ b/tests/CBORTest.cpp @@ -2,15 +2,15 @@ #include -#include "../frame_serialize/JFJochFrameSerializer.h" -#include "../frame_serialize/JFJochFrameDeserializer.h" +#include "../frame_serialize/CBORStream2Serializer.h" +#include "../frame_serialize/CBORStream2Deserializer.h" #include "../compression/JFJochCompressor.h" #include "stream2.h" #include "../frame_serialize/CborUtil.h" TEST_CASE("CBORSerialize_Start", "[CBOR]") { std::vector buffer(8*1024*1024); - JFJochFrameSerializer serializer(buffer.data(), buffer.size()); + CBORStream2Serializer serializer(buffer.data(), buffer.size()); StartMessage message { .data_file_count = 3, @@ -65,9 +65,9 @@ TEST_CASE("CBORSerialize_Start", "[CBOR]") { REQUIRE_NOTHROW(serializer.SerializeSequenceStart(message)); - JFJochFrameDeserializer deserializer; + CBORStream2Deserializer deserializer; REQUIRE_NOTHROW(deserializer.Process(buffer.data(), serializer.GetBufferSize())); - REQUIRE(deserializer.GetType() == JFJochFrameDeserializer::Type::START); + REQUIRE(deserializer.GetType() == CBORStream2Deserializer::Type::START); StartMessage output_message; REQUIRE_NOTHROW(output_message = deserializer.GetStartMessage()); @@ -127,11 +127,11 @@ TEST_CASE("CBORSerialize_Start", "[CBOR]") { TEST_CASE("CBORSerialize_Start_PixelMask", "[CBOR]") { std::vector buffer(8*1024*1024); - JFJochFrameSerializer serializer(buffer.data(), buffer.size()); + CBORStream2Serializer serializer(buffer.data(), buffer.size()); std::vector mask(456*457, 15); - CBORImage image_mask { + CompressedImage image_mask { .data = reinterpret_cast(mask.data()), .size = 456 * 457 * sizeof(uint32_t), .xpixel = 456, @@ -148,9 +148,9 @@ TEST_CASE("CBORSerialize_Start_PixelMask", "[CBOR]") { message.AddPixelMask(image_mask); REQUIRE_NOTHROW(serializer.SerializeSequenceStart(message)); - JFJochFrameDeserializer deserializer; + CBORStream2Deserializer deserializer; REQUIRE_NOTHROW(deserializer.Process(buffer.data(), serializer.GetBufferSize())); - REQUIRE(deserializer.GetType() == JFJochFrameDeserializer::Type::START); + REQUIRE(deserializer.GetType() == CBORStream2Deserializer::Type::START); StartMessage output_message; REQUIRE_NOTHROW(output_message = deserializer.GetStartMessage()); @@ -164,7 +164,7 @@ TEST_CASE("CBORSerialize_Start_PixelMask", "[CBOR]") { TEST_CASE("CBORSerialize_Start_Calibration", "[CBOR]") { std::vector buffer(8*1024*1024); - JFJochFrameSerializer serializer(buffer.data(), buffer.size()); + CBORStream2Serializer serializer(buffer.data(), buffer.size()); std::vector calib1(256); std::vector calib2(256); @@ -174,7 +174,7 @@ TEST_CASE("CBORSerialize_Start_Calibration", "[CBOR]") { calib2[i] = i * 76.33456; } - CBORImage image1 { + CompressedImage image1 { .data = reinterpret_cast(calib1.data()), .size = 16 * 16 * sizeof(float), .xpixel = 16, @@ -186,7 +186,7 @@ TEST_CASE("CBORSerialize_Start_Calibration", "[CBOR]") { .channel = "calib1" }; - CBORImage image2 { + CompressedImage image2 { .data = reinterpret_cast(calib2.data()), .size = 16 * 16 * sizeof(float), .xpixel = 16, @@ -204,9 +204,9 @@ TEST_CASE("CBORSerialize_Start_Calibration", "[CBOR]") { message.AddCalibration(image1); REQUIRE_NOTHROW(serializer.SerializeSequenceStart(message)); - JFJochFrameDeserializer deserializer; + CBORStream2Deserializer deserializer; REQUIRE_NOTHROW(deserializer.Process(buffer.data(), serializer.GetBufferSize())); - REQUIRE(deserializer.GetType() == JFJochFrameDeserializer::Type::START); + REQUIRE(deserializer.GetType() == CBORStream2Deserializer::Type::START); StartMessage output_message; REQUIRE_NOTHROW(output_message = deserializer.GetStartMessage()); @@ -225,7 +225,7 @@ TEST_CASE("CBORSerialize_Start_Calibration", "[CBOR]") { TEST_CASE("CBORSerialize_End", "[CBOR]") { std::vector buffer(8*1024*1024); - JFJochFrameSerializer serializer(buffer.data(), buffer.size()); + CBORStream2Serializer serializer(buffer.data(), buffer.size()); EndMessage message { .number_of_images = 57789, @@ -239,9 +239,9 @@ TEST_CASE("CBORSerialize_End", "[CBOR]") { REQUIRE_NOTHROW(serializer.SerializeSequenceEnd(message)); - JFJochFrameDeserializer deserializer; + CBORStream2Deserializer deserializer; REQUIRE_NOTHROW(deserializer.Process(buffer.data(), serializer.GetBufferSize())); - REQUIRE(deserializer.GetType() == JFJochFrameDeserializer::Type::END); + REQUIRE(deserializer.GetType() == CBORStream2Deserializer::Type::END); EndMessage output_message{}; REQUIRE_NOTHROW(output_message = deserializer.GetEndMessage()); @@ -257,7 +257,7 @@ TEST_CASE("CBORSerialize_End", "[CBOR]") { TEST_CASE("CBORSerialize_End_RadIntResult", "[CBOR]") { std::vector buffer(8*1024*1024); -JFJochFrameSerializer serializer(buffer.data(), buffer.size()); +CBORStream2Serializer serializer(buffer.data(), buffer.size()); EndMessage message { .number_of_images = 57789, @@ -274,9 +274,9 @@ JFJochFrameSerializer serializer(buffer.data(), buffer.size()); REQUIRE_NOTHROW(serializer.SerializeSequenceEnd(message)); - JFJochFrameDeserializer deserializer; + CBORStream2Deserializer deserializer; REQUIRE_NOTHROW(deserializer.Process(buffer.data(), serializer.GetBufferSize())); - REQUIRE(deserializer.GetType() == JFJochFrameDeserializer::Type::END); + REQUIRE(deserializer.GetType() == CBORStream2Deserializer::Type::END); EndMessage output_message{}; REQUIRE_NOTHROW(output_message = deserializer.GetEndMessage()); @@ -289,7 +289,7 @@ JFJochFrameSerializer serializer(buffer.data(), buffer.size()); TEST_CASE("CBORSerialize_End_ADUHistogram", "[CBOR]") { std::vector buffer(8*1024*1024); - JFJochFrameSerializer serializer(buffer.data(), buffer.size()); + CBORStream2Serializer serializer(buffer.data(), buffer.size()); EndMessage message { .number_of_images = 57789, @@ -307,9 +307,9 @@ TEST_CASE("CBORSerialize_End_ADUHistogram", "[CBOR]") { REQUIRE_NOTHROW(serializer.SerializeSequenceEnd(message)); - JFJochFrameDeserializer deserializer; + CBORStream2Deserializer deserializer; REQUIRE_NOTHROW(deserializer.Process(buffer.data(), serializer.GetBufferSize())); - REQUIRE(deserializer.GetType() == JFJochFrameDeserializer::Type::END); + REQUIRE(deserializer.GetType() == CBORStream2Deserializer::Type::END); EndMessage output_message{}; REQUIRE_NOTHROW(output_message = deserializer.GetEndMessage()); @@ -324,7 +324,7 @@ TEST_CASE("CBORSerialize_End_ADUHistogram", "[CBOR]") { TEST_CASE("CBORSerialize_Image", "[CBOR]") { std::vector buffer(8*1024*1024); -JFJochFrameSerializer serializer(buffer.data(), buffer.size()); +CBORStream2Serializer serializer(buffer.data(), buffer.size()); std::vector spots; @@ -332,7 +332,7 @@ JFJochFrameSerializer serializer(buffer.data(), buffer.size()); for (int i = 0; i < test.size(); i++) test[i] = (i * 253 + 56) % 256; - CBORImage image { + CompressedImage image { .data = test.data(), .size = 1024, .xpixel = 256, @@ -362,9 +362,9 @@ JFJochFrameSerializer serializer(buffer.data(), buffer.size()); REQUIRE_NOTHROW(serializer.SerializeImage(message)); - JFJochFrameDeserializer deserializer; + CBORStream2Deserializer deserializer; REQUIRE_NOTHROW(deserializer.Process(buffer.data(), serializer.GetBufferSize())); - REQUIRE(deserializer.GetType() == JFJochFrameDeserializer::Type::IMAGE); + REQUIRE(deserializer.GetType() == CBORStream2Deserializer::Type::IMAGE); auto image_array = deserializer.GetDataMessage(); REQUIRE(image_array.image.algorithm == CompressionAlgorithm::NO_COMPRESSION); @@ -392,7 +392,7 @@ JFJochFrameSerializer serializer(buffer.data(), buffer.size()); TEST_CASE("CBORSerialize_Image_2", "[CBOR]") { std::vector buffer(8*1024*1024); -JFJochFrameSerializer serializer(buffer.data(), buffer.size()); +CBORStream2Serializer serializer(buffer.data(), buffer.size()); std::vector spots; @@ -400,7 +400,7 @@ JFJochFrameSerializer serializer(buffer.data(), buffer.size()); for (int i = 0; i < test.size(); i++) test[i] = (i * 253 + 56) % 256; - CBORImage image { + CompressedImage image { .data = test.data(), .size = 1024 * 512, .xpixel = 1024, @@ -421,9 +421,9 @@ JFJochFrameSerializer serializer(buffer.data(), buffer.size()); REQUIRE_NOTHROW(serializer.SerializeImage(message)); - JFJochFrameDeserializer deserializer; + CBORStream2Deserializer deserializer; REQUIRE_NOTHROW(deserializer.Process(buffer.data(), serializer.GetBufferSize())); - REQUIRE(deserializer.GetType() == JFJochFrameDeserializer::Type::IMAGE); + REQUIRE(deserializer.GetType() == CBORStream2Deserializer::Type::IMAGE); auto image_array = deserializer.GetDataMessage(); REQUIRE(image_array.image.algorithm == CompressionAlgorithm::NO_COMPRESSION); @@ -442,7 +442,7 @@ JFJochFrameSerializer serializer(buffer.data(), buffer.size()); TEST_CASE("CBORSerialize_Image_Float", "[CBOR]") { std::vector buffer(8*1024*1024); - JFJochFrameSerializer serializer(buffer.data(), buffer.size()); + CBORStream2Serializer serializer(buffer.data(), buffer.size()); std::vector spots; @@ -450,7 +450,7 @@ TEST_CASE("CBORSerialize_Image_Float", "[CBOR]") { for (int i = 0; i < test.size(); i++) test[i] = i * 0.1f; - CBORImage image { + CompressedImage image { .data = reinterpret_cast(test.data()), .size = 1024 * 512 * sizeof(float), .xpixel = 1024, @@ -471,9 +471,9 @@ TEST_CASE("CBORSerialize_Image_Float", "[CBOR]") { REQUIRE_NOTHROW(serializer.SerializeImage(message)); - JFJochFrameDeserializer deserializer; + CBORStream2Deserializer deserializer; REQUIRE_NOTHROW(deserializer.Process(buffer.data(), serializer.GetBufferSize())); - REQUIRE(deserializer.GetType() == JFJochFrameDeserializer::Type::IMAGE); + REQUIRE(deserializer.GetType() == CBORStream2Deserializer::Type::IMAGE); auto image_array = deserializer.GetDataMessage(); REQUIRE(image_array.image.algorithm == CompressionAlgorithm::NO_COMPRESSION); @@ -490,60 +490,9 @@ TEST_CASE("CBORSerialize_Image_Float", "[CBOR]") { REQUIRE(memcmp(image_array.image.data, test.data(), test.size() * sizeof(float)) == 0); } -TEST_CASE("CBORSerialize_Image_Append", "[CBOR]") { - std::vector buffer(8*1024*1024); -JFJochFrameSerializer serializer(buffer.data(), buffer.size()); - - std::vector spots; - - std::vector test(512*1024); - for (int i = 0; i < test.size(); i++) - test[i] = (i * 253 + 56) % 256; - - CBORImage image { - .data = nullptr, - .size = 0, - .xpixel = 1024, - .ypixel = 512, - .pixel_depth_bytes = 1, - .pixel_is_signed = false, - .pixel_is_float = false, - .algorithm = CompressionAlgorithm::NO_COMPRESSION, - .channel = "default" - }; - - DataMessage message { - .number = 480, - .image = image, - .spots = spots, - .indexing_result = 3 - }; - - REQUIRE_NOTHROW(serializer.SerializeImage(message)); - memcpy(buffer.data() + serializer.GetImageAppendOffset(), test.data(), 512*1024); - REQUIRE_NOTHROW(serializer.AppendImage(512*1024)); - - JFJochFrameDeserializer deserializer; - REQUIRE_NOTHROW(deserializer.Process(buffer.data(), serializer.GetBufferSize())); - REQUIRE(deserializer.GetType() == JFJochFrameDeserializer::Type::IMAGE); - - auto image_array = deserializer.GetDataMessage(); - REQUIRE(image_array.image.algorithm == CompressionAlgorithm::NO_COMPRESSION); - REQUIRE(image_array.image.xpixel == 1024); - REQUIRE(image_array.image.ypixel == 512); - REQUIRE(image_array.image.pixel_depth_bytes == 1); - REQUIRE(!image_array.image.pixel_is_signed); - REQUIRE(!image_array.image.pixel_is_float); - REQUIRE(image_array.image.channel == "default"); - REQUIRE(image_array.image.size == test.size()); - REQUIRE(image_array.indexing_result == message.indexing_result); - REQUIRE(image_array.number == 480); - REQUIRE(memcmp(image_array.image.data, test.data(), test.size()) == 0); -} - TEST_CASE("CBORSerialize_Image_Compressed", "[CBOR]") { std::vector buffer(8*1024*1024); -JFJochFrameSerializer serializer(buffer.data(), buffer.size()); +CBORStream2Serializer serializer(buffer.data(), buffer.size()); std::vector spots; @@ -551,7 +500,7 @@ JFJochFrameSerializer serializer(buffer.data(), buffer.size()); for (int i = 0; i < test.size(); i++) test[i] = (i * 253 + 56) % 256; - CBORImage image { + CompressedImage image { .data = test.data(), .size = 512, .xpixel = 256, @@ -571,9 +520,9 @@ JFJochFrameSerializer serializer(buffer.data(), buffer.size()); REQUIRE_NOTHROW(serializer.SerializeImage(message)); - JFJochFrameDeserializer deserializer; + CBORStream2Deserializer deserializer; REQUIRE_NOTHROW(deserializer.Process(buffer.data(), serializer.GetBufferSize())); - REQUIRE(deserializer.GetType() == JFJochFrameDeserializer::Type::IMAGE); + REQUIRE(deserializer.GetType() == CBORStream2Deserializer::Type::IMAGE); auto image_array = deserializer.GetDataMessage(); REQUIRE(image_array.image.algorithm == CompressionAlgorithm::BSHUF_LZ4); @@ -590,13 +539,13 @@ JFJochFrameSerializer serializer(buffer.data(), buffer.size()); TEST_CASE("CBORSerialize_Image_Rad_Int_Profile", "[CBOR]") { std::vector buffer(8 * 1024 * 1024); - JFJochFrameSerializer serializer(buffer.data(), buffer.size()); + CBORStream2Serializer serializer(buffer.data(), buffer.size()); std::vector test(1024); for (int i = 0; i < test.size(); i++) test[i] = (i * 253 + 56) % 256; - CBORImage image{ + CompressedImage image{ .data = test.data(), .size = 1024, .xpixel = 256, @@ -614,9 +563,9 @@ TEST_CASE("CBORSerialize_Image_Rad_Int_Profile", "[CBOR]") { REQUIRE_NOTHROW(serializer.SerializeImage(message)); - JFJochFrameDeserializer deserializer; + CBORStream2Deserializer deserializer; REQUIRE_NOTHROW(deserializer.Process(buffer.data(), serializer.GetBufferSize())); - REQUIRE(deserializer.GetType() == JFJochFrameDeserializer::Type::IMAGE); + REQUIRE(deserializer.GetType() == CBORStream2Deserializer::Type::IMAGE); auto image_array = deserializer.GetDataMessage(); REQUIRE(image_array.number == 789); @@ -628,7 +577,7 @@ TEST_CASE("CBORSerialize_Image_Rad_Int_Profile", "[CBOR]") { TEST_CASE("CBORSerialize_Image_Spots", "[CBOR]") { std::vector buffer(8*1024*1024); -JFJochFrameSerializer serializer(buffer.data(), buffer.size()); +CBORStream2Serializer serializer(buffer.data(), buffer.size()); std::vector spots; spots.push_back(SpotToSave{.x = 7, .y = 8, .intensity = 34, .indexed = false}); @@ -638,7 +587,7 @@ JFJochFrameSerializer serializer(buffer.data(), buffer.size()); for (int i = 0; i < test.size(); i++) test[i] = (i * 253 + 56) % 256; - CBORImage image { + CompressedImage image { .data = test.data(), .size = 1024, .xpixel = 256, @@ -656,9 +605,9 @@ JFJochFrameSerializer serializer(buffer.data(), buffer.size()); REQUIRE_NOTHROW(serializer.SerializeImage(message)); - JFJochFrameDeserializer deserializer; + CBORStream2Deserializer deserializer; REQUIRE_NOTHROW(deserializer.Process(buffer.data(), serializer.GetBufferSize())); - REQUIRE(deserializer.GetType() == JFJochFrameDeserializer::Type::IMAGE); + REQUIRE(deserializer.GetType() == CBORStream2Deserializer::Type::IMAGE); auto image_array = deserializer.GetDataMessage(); REQUIRE(image_array.number == 789); @@ -683,11 +632,11 @@ inline bool CmpString(const char *str1, const std::string& str2) { TEST_CASE("CBORSerialize_Start_stream2", "[CBOR]") { std::vector buffer(8*1024*1024); - JFJochFrameSerializer serializer(buffer.data(), buffer.size()); + CBORStream2Serializer serializer(buffer.data(), buffer.size()); std::vector mask(456*457, 15); - CBORImage image_mask { + CompressedImage image_mask { .data = reinterpret_cast(mask.data()), .size = 456 * 457 * sizeof(uint32_t), .xpixel = 456, @@ -783,7 +732,7 @@ TEST_CASE("CBORSerialize_Start_stream2", "[CBOR]") { TEST_CASE("CBORSerialize_End_stream2", "[CBOR]") { std::vector buffer(8*1024*1024); -JFJochFrameSerializer serializer(buffer.data(), buffer.size()); +CBORStream2Serializer serializer(buffer.data(), buffer.size()); EndMessage message { .number_of_images = 57789, @@ -813,7 +762,7 @@ JFJochFrameSerializer serializer(buffer.data(), buffer.size()); TEST_CASE("CBORSerialize_Image_compressed_stream2", "[CBOR]") { std::vector buffer(8*1024*1024); -JFJochFrameSerializer serializer(buffer.data(), buffer.size()); +CBORStream2Serializer serializer(buffer.data(), buffer.size()); std::vector spots; @@ -826,7 +775,7 @@ JFJochFrameSerializer serializer(buffer.data(), buffer.size()); size_t compressed_size = compressor.Compress(compressed_test.data(), test); - CBORImage image { + CompressedImage image { .data = compressed_test.data(), .size = compressed_size, .xpixel = 1024, @@ -870,7 +819,7 @@ JFJochFrameSerializer serializer(buffer.data(), buffer.size()); TEST_CASE("CBORSerialize_Image_uncompressed_stream2", "[CBOR]") { std::vector buffer(8*1024*1024); -JFJochFrameSerializer serializer(buffer.data(), buffer.size()); +CBORStream2Serializer serializer(buffer.data(), buffer.size()); std::vector spots; @@ -878,7 +827,7 @@ JFJochFrameSerializer serializer(buffer.data(), buffer.size()); for (int i = 0; i < test.size(); i++) test[i] = (i * 253 + 56) % 256; - CBORImage image { + CompressedImage image { .data = test.data(), .size = 1024 * 512, .xpixel = 1024, diff --git a/tests/FrameTransformationTest.cpp b/tests/FrameTransformationTest.cpp index 46386ada..9cda28e7 100644 --- a/tests/FrameTransformationTest.cpp +++ b/tests/FrameTransformationTest.cpp @@ -10,6 +10,16 @@ using namespace std::literals::chrono_literals; +inline uint32_t read_be32(const void *ptr) { + auto ptr32 = (uint32_t *) ptr; + return __builtin_bswap32(ptr32[0]); +} + +inline uint64_t read_be64(const void *ptr) { + auto ptr64 = (uint64_t *) ptr; + return __builtin_bswap64(ptr64[0]); +} + TEST_CASE("Bshuf_SSE", "[bitshuffle]") { REQUIRE (bshuf_using_SSE2() == 1); } @@ -36,14 +46,13 @@ TEST_CASE("FrameTransformation_Raw_NoCompression" ,"") { for (int i = 0; i < nmodules*RAW_MODULE_SIZE; i++) input_1[i] = dist(g1); - std::vector output(experiment.GetPixelsNum()); - for (int i = 0; i < nmodules; i++) { REQUIRE_NOTHROW(transformation.ProcessModule(input_0.data() + i * RAW_MODULE_SIZE, i, 0)); REQUIRE_NOTHROW(transformation.ProcessModule(input_1.data() + i * RAW_MODULE_SIZE, i, 1)); } - - REQUIRE(transformation.SaveCompressedImage(output.data()) == experiment.GetPixelDepth() * experiment.GetPixelsNum()); + auto image = transformation.GetCompressedImage(); + REQUIRE(image.size == experiment.GetPixelDepth() * experiment.GetPixelsNum()); + auto output = (int16_t *) image.data; uint32_t diff_0 = 0; uint32_t diff_1 = 0; @@ -76,14 +85,14 @@ TEST_CASE("FrameTransformation_Converted_NoCompression" ,"") { for (int i = 0; i < nmodules*RAW_MODULE_SIZE; i++) input_1[i] = dist(g1); - std::vector output(experiment.GetPixelsNum()); - for (int i = 0; i < nmodules; i++) { REQUIRE_NOTHROW(transformation.ProcessModule(input_0.data() + i * RAW_MODULE_SIZE, i, 0)); REQUIRE_NOTHROW(transformation.ProcessModule(input_1.data() + i * RAW_MODULE_SIZE, i, 1)); } - REQUIRE(transformation.SaveCompressedImage(output.data()) == experiment.GetPixelDepth() * experiment.GetPixelsNum()); + auto image = transformation.GetCompressedImage(); + REQUIRE(image.size == experiment.GetPixelDepth() * experiment.GetPixelsNum()); + auto output = (int16_t *) image.data; REQUIRE(input_0[511*1024] == output[CONVERTED_MODULE_SIZE * (2 * nmodules - 2) + 0]); REQUIRE(input_0[511*1024+256]/2 == output[CONVERTED_MODULE_SIZE * (2 * nmodules - 2) + 258]); @@ -134,15 +143,13 @@ TEST_CASE("FrameTransformation_Converted_bshuf_lz4" ,"") { REQUIRE_NOTHROW(transformation.ProcessModule(input_1.data() + i * RAW_MODULE_SIZE, i, 1)); } - size_t compressed_size; - REQUIRE_NOTHROW(compressed_size = transformation.SaveCompressedImage(output_compressed.data())); - output_compressed.resize(compressed_size); + auto image = transformation.GetCompressedImage(); - REQUIRE(bshuf_read_uint64_BE(output_compressed.data()) == experiment.GetPixelsNum() * experiment.GetPixelDepth()); - REQUIRE(bshuf_read_uint32_BE(output_compressed.data()+8) == JFJochBitShuffleCompressor::DefaultBlockSize * experiment.GetPixelDepth()); + REQUIRE(read_be64(image.data) == experiment.GetPixelsNum() * experiment.GetPixelDepth()); + REQUIRE(read_be32(image.data+8) == JFJochBitShuffleCompressor::DefaultBlockSize * experiment.GetPixelDepth()); - std::vector output; - REQUIRE_NOTHROW(JFJochDecompress(output, experiment.GetCompressionAlgorithm(), output_compressed, + std::vector output; + REQUIRE_NOTHROW(JFJochDecompress(output, experiment.GetCompressionAlgorithm(), image.data, image.size, experiment.GetPixelsNum())); REQUIRE(input_0[511*1024] == output[CONVERTED_MODULE_SIZE * (2 * nmodules - 2) + 0]); @@ -193,15 +200,13 @@ TEST_CASE("FrameTransformation_Converted_bshuf_zstd" ,"") { REQUIRE_NOTHROW(transformation.ProcessModule(input_1.data() + i * RAW_MODULE_SIZE, i, 1)); } - size_t compressed_size; - REQUIRE_NOTHROW(compressed_size = transformation.SaveCompressedImage(output_compressed.data())); + auto image = transformation.GetCompressedImage(); - REQUIRE(bshuf_read_uint64_BE(output_compressed.data()) == experiment.GetPixelsNum() * experiment.GetPixelDepth()); - REQUIRE(bshuf_read_uint32_BE(output_compressed.data()+8) == JFJochBitShuffleCompressor::DefaultBlockSize * experiment.GetPixelDepth()); + REQUIRE(read_be64(image.data) == experiment.GetPixelsNum() * experiment.GetPixelDepth()); + REQUIRE(read_be32(image.data+8) == JFJochBitShuffleCompressor::DefaultBlockSize * experiment.GetPixelDepth()); - output_compressed.resize(compressed_size); - std::vector output; - REQUIRE_NOTHROW(JFJochDecompress(output, experiment.GetCompressionAlgorithm(), output_compressed, + std::vector output; + REQUIRE_NOTHROW(JFJochDecompress(output, experiment.GetCompressionAlgorithm(), image.data, image.size, experiment.GetPixelsNum())); REQUIRE(input_0[511*1024] == output[CONVERTED_MODULE_SIZE * (2 * nmodules - 2) + 0]); @@ -252,15 +257,13 @@ TEST_CASE("FrameTransformation_Converted_bshuf_zstd_rle" ,"") { REQUIRE_NOTHROW(transformation.ProcessModule(input_1.data() + i * RAW_MODULE_SIZE, i, 1)); } - size_t compressed_size; - REQUIRE_NOTHROW(compressed_size = transformation.SaveCompressedImage(output_compressed.data())); + auto image = transformation.GetCompressedImage(); - REQUIRE(bshuf_read_uint64_BE(output_compressed.data()) == experiment.GetPixelsNum() * experiment.GetPixelDepth()); - REQUIRE(bshuf_read_uint32_BE(output_compressed.data()+8) == JFJochBitShuffleCompressor::DefaultBlockSize * experiment.GetPixelDepth()); + REQUIRE(read_be64(image.data) == experiment.GetPixelsNum() * experiment.GetPixelDepth()); + REQUIRE(read_be32(image.data+8) == JFJochBitShuffleCompressor::DefaultBlockSize * experiment.GetPixelDepth()); - output_compressed.resize(compressed_size); - std::vector output; - REQUIRE_NOTHROW(JFJochDecompress(output, experiment.GetCompressionAlgorithm(), output_compressed, + std::vector output; + REQUIRE_NOTHROW(JFJochDecompress(output, experiment.GetCompressionAlgorithm(), image.data, image.size, experiment.GetPixelsNum())); REQUIRE(input_0[511*1024] == output[CONVERTED_MODULE_SIZE * (2 * nmodules - 2) + 0]); @@ -314,15 +317,13 @@ TEST_CASE("FrameTransformation_Converted_bshuf_zstd_32bit" ,"") { REQUIRE_NOTHROW(transformation.ProcessModule(input_1.data() + i * RAW_MODULE_SIZE, i, 1)); } - size_t compressed_size; - REQUIRE_NOTHROW(compressed_size = transformation.SaveCompressedImage(output_compressed.data())); + auto image = transformation.GetCompressedImage(); - REQUIRE(bshuf_read_uint64_BE(output_compressed.data()) == experiment.GetPixelsNum() * experiment.GetPixelDepth()); - REQUIRE(bshuf_read_uint32_BE(output_compressed.data()+8) == JFJochBitShuffleCompressor::DefaultBlockSize * experiment.GetPixelDepth()); + REQUIRE(read_be64(image.data) == experiment.GetPixelsNum() * experiment.GetPixelDepth()); + REQUIRE(read_be32(image.data+8) == JFJochBitShuffleCompressor::DefaultBlockSize * experiment.GetPixelDepth()); - output_compressed.resize(compressed_size); - std::vector output; - REQUIRE_NOTHROW(JFJochDecompress(output, experiment.GetCompressionAlgorithm(), output_compressed, + std::vector output; + REQUIRE_NOTHROW(JFJochDecompress(output, experiment.GetCompressionAlgorithm(), image.data, image.size, experiment.GetPixelsNum())); REQUIRE(input_0[511*1024] == output[CONVERTED_MODULE_SIZE * (2 * nmodules - 2) + 0]); @@ -381,15 +382,13 @@ TEST_CASE("FrameTransformation_Converted_bshuf_zstd_unsigned_16bit" ,"") { REQUIRE_NOTHROW(transformation.ProcessModule(input_1.data() + i * RAW_MODULE_SIZE, i, 1)); } - size_t compressed_size; - REQUIRE_NOTHROW(compressed_size = transformation.SaveCompressedImage(output_compressed.data())); + auto image = transformation.GetCompressedImage(); - REQUIRE(bshuf_read_uint64_BE(output_compressed.data()) == experiment.GetPixelsNum() * experiment.GetPixelDepth()); - REQUIRE(bshuf_read_uint32_BE(output_compressed.data()+8) == JFJochBitShuffleCompressor::DefaultBlockSize * experiment.GetPixelDepth()); + REQUIRE(read_be64(image.data) == experiment.GetPixelsNum() * experiment.GetPixelDepth()); + REQUIRE(read_be32(image.data+8) == JFJochBitShuffleCompressor::DefaultBlockSize * experiment.GetPixelDepth()); - output_compressed.resize(compressed_size); std::vector output; - REQUIRE_NOTHROW(JFJochDecompress(output, experiment.GetCompressionAlgorithm(), output_compressed, + REQUIRE_NOTHROW(JFJochDecompress(output, experiment.GetCompressionAlgorithm(), image.data, image.size, experiment.GetPixelsNum())); REQUIRE(input_0[511*1024] == output[CONVERTED_MODULE_SIZE * (2 * nmodules - 2) + 0]); diff --git a/tests/JFJochReceiverIntegrationTest.cpp b/tests/JFJochReceiverIntegrationTest.cpp index ea55f2b2..136f3beb 100644 --- a/tests/JFJochReceiverIntegrationTest.cpp +++ b/tests/JFJochReceiverIntegrationTest.cpp @@ -5,7 +5,7 @@ #include "../receiver/JFJochReceiverTest.h" #include "../acquisition_device/HLSSimulatedDevice.h" #include "../jungfrau/JFPedestalCalc.h" -#include "../common/TestImagePusher.h" +#include "../frame_serialize/TestImagePusher.h" using namespace std::literals::chrono_literals; diff --git a/tests/StreamWriterTest.cpp b/tests/StreamWriterTest.cpp index 55583c52..3c34c83e 100644 --- a/tests/StreamWriterTest.cpp +++ b/tests/StreamWriterTest.cpp @@ -4,7 +4,7 @@ #include #include "../writer/StreamWriter.h" -#include "../common/ZMQImagePusher.h" +#include "../frame_serialize/ZMQStream2Pusher.h" #include "../receiver/JFJochReceiverService.h" TEST_CASE("StreamWriterTest_ZMQ","[JFJochWriter]") { @@ -23,7 +23,7 @@ TEST_CASE("StreamWriterTest_ZMQ","[JFJochWriter]") { for (int i = 0; i < x.GetDataStreamsNum(); i++) aq_devices.AddHLSDevice(64); - ZMQImagePusher pusher (context, {zmq_addr}); + ZMQStream2Pusher pusher (context, {zmq_addr}); JFJochReceiverService fpga_receiver_service(aq_devices, logger, pusher); JFJochReceiverOutput receiver_output; diff --git a/tests/ZMQImagePusherTest.cpp b/tests/ZMQImagePusherTest.cpp index ca3a28ec..1d97507d 100644 --- a/tests/ZMQImagePusherTest.cpp +++ b/tests/ZMQImagePusherTest.cpp @@ -2,7 +2,7 @@ #include #include "../writer/ZMQImagePuller.h" -#include "../common/ZMQImagePusher.h" +#include "../frame_serialize/ZMQStream2Pusher.h" void test_puller(ZMQImagePuller *puller, const DiffractionExperiment& x, @@ -15,13 +15,13 @@ void test_puller(ZMQImagePuller *puller, std::vector &nimages) { puller->WaitForImage(); - if (puller->GetFrameType() != JFJochFrameDeserializer::Type::START) { + if (puller->GetFrameType() != CBORStream2Deserializer::Type::START) { diff_content[writer_id]++; return; } puller->WaitForImage(); - while (puller->GetFrameType() != JFJochFrameDeserializer::Type::END) { - if (puller->GetFrameType() == JFJochFrameDeserializer::Type::IMAGE) { + while (puller->GetFrameType() != CBORStream2Deserializer::Type::END) { + if (puller->GetFrameType() == CBORStream2Deserializer::Type::IMAGE) { auto image = puller->GetDataMessage(); if ((nwriter > 1) && (image.number % nwriter != writer_id)) diff_split[writer_id]++; @@ -76,15 +76,13 @@ TEST_CASE("ZMQImageCommTest_1Writer","[ZeroMQ]") { // Puller needs to be declared first, but both objects need to exist till communication finished // TODO: ImageSender should not allow if there are still completions to be done ZMQImagePuller puller(context); - ZMQImagePusher pusher(context, {zmq_addr}); + ZMQStream2Pusher pusher(context, {zmq_addr}); std::vector diff_size(1), diff_content(1), diff_split(1), nimages(1); puller.Connect(zmq_addr); std::thread sender_thread = std::thread([&] { - std::vector serialization_buffer(16*1024*1024); - JFJochFrameSerializer serializer(serialization_buffer.data(), serialization_buffer.size()); StartMessage message { .data_file_count = 16 }; @@ -92,14 +90,12 @@ TEST_CASE("ZMQImageCommTest_1Writer","[ZeroMQ]") { .write_master_file = true }; - serializer.SerializeSequenceStart(message); - pusher.StartDataCollection(serialization_buffer.data(), serializer.GetBufferSize(), message.data_file_count); + pusher.StartDataCollection(message); for (int i = 0; i < nframes; i++) { DataMessage data_message; data_message.number = i; PrepareCBORImage(data_message, x, image1.data() + i * x.GetPixelsNum(), x.GetPixelsNum() * sizeof(uint16_t)); - serializer.SerializeImage(data_message); - pusher.SendImage(serialization_buffer.data(), serializer.GetBufferSize(), i); + pusher.SendImage(data_message); } pusher.EndDataCollection(end_message); @@ -148,7 +144,7 @@ TEST_CASE("ZMQImageCommTest_2Writers","[ZeroMQ]") { for (int i = 0; i < npullers; i++) zmq_addr.push_back("inproc://#" + std::to_string(i)); - ZMQImagePusher pusher(context, zmq_addr); + ZMQStream2Pusher pusher(context, zmq_addr); // Puller needs to be declared first, but both objects need to exist till communication finished // TODO: ImageSender should not allow if there are still completions to be done @@ -161,23 +157,19 @@ TEST_CASE("ZMQImageCommTest_2Writers","[ZeroMQ]") { std::vector diff_size(npullers), diff_content(npullers), diff_split(npullers), nimages(npullers); std::thread sender_thread = std::thread([&] { - std::vector serialization_buffer(16*1024*1024); - JFJochFrameSerializer serializer(serialization_buffer.data(), serialization_buffer.size()); StartMessage message { .data_file_count = 16 }; EndMessage end_message{ .write_master_file = true }; - serializer.SerializeSequenceStart(message); - pusher.StartDataCollection(serialization_buffer.data(), serializer.GetBufferSize(), message.data_file_count); + pusher.StartDataCollection(message); for (int i = 0; i < nframes; i++) { DataMessage data_message; data_message.number = i; PrepareCBORImage(data_message, x, image1.data() + i * x.GetPixelsNum(), x.GetPixelsNum() * sizeof(uint16_t)); - serializer.SerializeImage(data_message); - pusher.SendImage(serialization_buffer.data(), serializer.GetBufferSize(), i); + pusher.SendImage(data_message); } pusher.EndDataCollection(end_message); @@ -238,7 +230,7 @@ TEST_CASE("ZMQImageCommTest_4Writers","[ZeroMQ]") { for (int i = 0; i < npullers; i++) zmq_addr.push_back("inproc://#" + std::to_string(i)); - ZMQImagePusher pusher(context, zmq_addr); + ZMQStream2Pusher pusher(context, zmq_addr); // Puller needs to be declared first, but both objects need to exist till communication finished // TODO: ImageSender should not allow if there are still completions to be done @@ -252,23 +244,19 @@ TEST_CASE("ZMQImageCommTest_4Writers","[ZeroMQ]") { std::vector diff_size(npullers), diff_content(npullers), diff_split(npullers), nimages(npullers); std::thread sender_thread = std::thread([&] { - std::vector serialization_buffer(16*1024*1024); - JFJochFrameSerializer serializer(serialization_buffer.data(), serialization_buffer.size()); - StartMessage message { + StartMessage message { .data_file_count = 16 }; EndMessage end_message{ .write_master_file = true }; - serializer.SerializeSequenceStart(message); - pusher.StartDataCollection(serialization_buffer.data(), serializer.GetBufferSize(), message.data_file_count); + pusher.StartDataCollection(message); for (int i = 0; i < nframes; i++) { DataMessage data_message; data_message.number = i; PrepareCBORImage(data_message, x, image1.data() + i * x.GetPixelsNum(), x.GetPixelsNum() * sizeof(uint16_t)); - serializer.SerializeImage(data_message); - pusher.SendImage(serialization_buffer.data(), serializer.GetBufferSize(), i); + pusher.SendImage(data_message); } pusher.EndDataCollection(end_message); diff --git a/tests/ZMQPreviewPublisherTest.cpp b/tests/ZMQPreviewPublisherTest.cpp index caab3b47..968302f3 100644 --- a/tests/ZMQPreviewPublisherTest.cpp +++ b/tests/ZMQPreviewPublisherTest.cpp @@ -1,8 +1,9 @@ // Copyright (2019-2023) Paul Scherrer Institute #include -#include "../common/ZMQPreviewPublisher.h" +#include "../frame_serialize/ZMQPreviewPublisher.h" +/* TEST_CASE("ZMQPreviewPublisher","[ZMQ]") { ZMQContext context; ZMQPreviewPublisher publisher(context, "inproc://#5"); @@ -75,3 +76,4 @@ TEST_CASE("ZMQPreviewPublisher_FrameNumbers","[ZMQ]") { REQUIRE(socket.Receive(s, false) < 0); } +*/ diff --git a/tools/CompressionBenchmark.cpp b/tools/CompressionBenchmark.cpp index c0270a6f..e2a84612 100644 --- a/tools/CompressionBenchmark.cpp +++ b/tools/CompressionBenchmark.cpp @@ -23,7 +23,8 @@ double CheckCompressionThread(const DiffractionExperiment &x, for (int j = 0; j < x.GetModulesNum(); j++ ) { transformation.ProcessModule(image.data() + (j + i * x.GetModulesNum()) * RAW_MODULE_SIZE, j, 0); } - ret += transformation.SaveCompressedImage(output.data() + i * x.GetMaxCompressedSize()); + auto image = transformation.GetCompressedImage(); + ret += image.size; } return ret; } @@ -69,8 +70,10 @@ std::string CheckDecompression(const DiffractionExperiment &x, size_t nimages, c for (int j = 0; j < x.GetModulesNum(); j++ ) { transformation.ProcessModule(image.data() + (j + i * x.GetModulesNum()) * RAW_MODULE_SIZE, j, 0); } - compressed_size[i] = transformation.SaveCompressedImage(output[i].data()); - output[i].resize(compressed_size[i]); + auto image = transformation.GetCompressedImage(); + compressed_size[i] = image.size; + output[i].resize(image.size); + memcpy(output[i].data(), image.data, image.size); } std::vector decompress_v; diff --git a/tools/HDF5DatasetWriteTest.cpp b/tools/HDF5DatasetWriteTest.cpp index 4214c3f8..e8c10b99 100644 --- a/tools/HDF5DatasetWriteTest.cpp +++ b/tools/HDF5DatasetWriteTest.cpp @@ -88,7 +88,12 @@ int main(int argc, char **argv) { for (int i = 0; i < nimages; i++) { for (int j = 0; j < 8; j++) transformation.ProcessModule(image + (i * x.GetModulesNum() + j) * RAW_MODULE_SIZE, j, 0); - output_size[i] = transformation.SaveCompressedImage(output[i].data()); + + auto image = transformation.GetCompressedImage(); + + output_size[i] = image.size; + output[i].resize(image.size); + memcpy(output[i].data(), image.data, image.size); } x.ImagesPerTrigger(nimages_out); @@ -102,7 +107,7 @@ int main(int argc, char **argv) { { size_t xpixel = x.GetXPixelsNum(); size_t ypixel = x.GetYPixelsNum(); - start_message.AddPixelMask(CBORImage{ + start_message.AddPixelMask(CompressedImage{ .data = reinterpret_cast(pixel_mask.data()), .size = pixel_mask.size() * sizeof(uint32_t), .xpixel = xpixel, diff --git a/tools/jfjoch_writer_test.cpp b/tools/jfjoch_writer_test.cpp index 6d49b28a..a327d732 100644 --- a/tools/jfjoch_writer_test.cpp +++ b/tools/jfjoch_writer_test.cpp @@ -6,7 +6,7 @@ #include "../common/Logger.h" #include "../common/FrameTransformation.h" #include "../common/RawToConvertedGeometry.h" -#include "../common/ZMQImagePusher.h" +#include "../frame_serialize/ZMQStream2Pusher.h" #define BASE_TCP_PORT 8000 @@ -56,7 +56,7 @@ int main(int argc, char **argv) { x.DataFileCount(nsockets); - ZMQImagePusher pusher(context, zmq_addr); + ZMQStream2Pusher pusher(context, zmq_addr); FrameTransformation transformation(x); @@ -78,28 +78,28 @@ int main(int argc, char **argv) { for (int j = 0; j < x.GetModulesNum(); j++) transformation.ProcessModule(image_tmp_raw.data() + j * RAW_MODULE_SIZE, j, 0); - output_size[i] = transformation.SaveCompressedImage((uint8_t *) output[i].data()); + + auto image = transformation.GetCompressedImage(); + + output_size[i] = image.size; + output[i].resize(image.size); + memcpy(output[i].data(), image.data, image.size); } logger.Info("Sending {} images", nimages_out); std::vector empty_spot_vector; - std::vector send_buffer(x.GetPixelsNum() * x.GetPixelDepth() * 2); - JFJochFrameSerializer serializer(send_buffer.data(), send_buffer.size()); StartMessage start_message; x.FillMessage(start_message); - serializer.SerializeSequenceStart(start_message); - - pusher.StartDataCollection(send_buffer.data(), serializer.GetBufferSize(), x.GetDataFileCount()); + pusher.StartDataCollection(start_message); for (int i = 0; i < nimages_out; i++) { DataMessage data_message; data_message.number = i; PrepareCBORImage(data_message, x, output[i % nimages_in_file].data(), output_size[i % nimages_in_file]); - serializer.SerializeImage(data_message); - pusher.SendImage(send_buffer.data(), serializer.GetBufferSize(), i); + pusher.SendImage(data_message); } EndMessage end_message{}; diff --git a/writer/CMakeLists.txt b/writer/CMakeLists.txt index 982ce7e3..45f03dd2 100644 --- a/writer/CMakeLists.txt +++ b/writer/CMakeLists.txt @@ -13,7 +13,7 @@ ADD_LIBRARY(JFJochWriter STATIC ZMQImagePuller.cpp ZMQImagePuller.h StreamWriter.cpp StreamWriter.h) -TARGET_LINK_LIBRARIES(JFJochWriter HDF5Wrappers CommonFunctions) +TARGET_LINK_LIBRARIES(JFJochWriter HDF5Wrappers CommonFunctions CBORStream2FrameSerialize) TARGET_LINK_LIBRARIES(jfjoch_writer JFJochWriter) diff --git a/writer/HDF5DataFile.h b/writer/HDF5DataFile.h index 0478c475..d4cd42e6 100644 --- a/writer/HDF5DataFile.h +++ b/writer/HDF5DataFile.h @@ -9,7 +9,7 @@ #include "HDF5Objects.h" #include "../common/SpotToSave.h" -#include "../frame_serialize/CBORMessages.h" +#include "../frame_serialize/JFJochMessages.h" struct HDF5DataFileStatistics { std::string filename; diff --git a/writer/HDF5NXmx.cpp b/writer/HDF5NXmx.cpp index f92f348f..5527762c 100644 --- a/writer/HDF5NXmx.cpp +++ b/writer/HDF5NXmx.cpp @@ -194,7 +194,7 @@ void HDF5Metadata::DetectorModule(HDF5File *hdf5_file, const std::string &name, "", "", "translation", {0,0,0}); } -void HDF5Metadata::SaveCBORImage(HDF5File *hdf5_file, const std::string &hdf5_path, const CBORImage &image) { +void HDF5Metadata::SaveCBORImage(HDF5File *hdf5_file, const std::string &hdf5_path, const CompressedImage &image) { std::vector dims = {image.ypixel, image.xpixel}; HDF5DataType data_type(image.pixel_depth_bytes, image.pixel_is_signed); diff --git a/writer/HDF5NXmx.h b/writer/HDF5NXmx.h index d9756c09..00554787 100644 --- a/writer/HDF5NXmx.h +++ b/writer/HDF5NXmx.h @@ -3,7 +3,7 @@ #ifndef JUNGFRAUJOCH_HDF5NXMX_H #define JUNGFRAUJOCH_HDF5NXMX_H -#include "../frame_serialize/CBORMessages.h" +#include "../frame_serialize/JFJochMessages.h" #include "HDF5Objects.h" @@ -25,7 +25,7 @@ namespace HDF5Metadata { void Mask(HDF5File *hdf5_file, const StartMessage &start, const EndMessage &end); void Calibration(HDF5File *hdf5_file, const StartMessage &start, const EndMessage &end); - void SaveCBORImage(HDF5File *hdf5_file, const std::string& hdf5_path, const CBORImage &image); + void SaveCBORImage(HDF5File *hdf5_file, const std::string& hdf5_path, const CompressedImage &image); void RadInt(HDF5File *hdf5_file, const StartMessage &start, const EndMessage &end); void ADUHistogram(HDF5File *hdf5_file, const EndMessage &end); std::string DataFileName(const std::string& prefix, int64_t file_number); diff --git a/writer/HDF5Writer.h b/writer/HDF5Writer.h index ed39191f..1fc76cdd 100644 --- a/writer/HDF5Writer.h +++ b/writer/HDF5Writer.h @@ -6,7 +6,7 @@ #include #include "HDF5DataFile.h" -#include "../frame_serialize/CBORMessages.h" +#include "../frame_serialize/JFJochMessages.h" class HDF5Writer { std::vector > files; diff --git a/writer/StreamWriter.cpp b/writer/StreamWriter.cpp index 6dbd7634..fe3aa867 100644 --- a/writer/StreamWriter.cpp +++ b/writer/StreamWriter.cpp @@ -14,7 +14,7 @@ StreamWriter::StreamWriter(ZMQContext &context, Logger &in_logger, const std::st void StreamWriter::StartDataCollection() { image_puller.WaitForImage(); - while (image_puller.GetFrameType() != JFJochFrameDeserializer::Type::START) { + while (image_puller.GetFrameType() != CBORStream2Deserializer::Type::START) { logger.Error("Expected START image"); image_puller.WaitForImage(); } @@ -28,7 +28,7 @@ void StreamWriter::StartDataCollection() { void StreamWriter::CollectImages(std::vector &v) { HDF5Writer writer(start_message); image_puller.WaitForImage(); - while (image_puller.GetFrameType() == JFJochFrameDeserializer::Type::IMAGE) { + while (image_puller.GetFrameType() == CBORStream2Deserializer::Type::IMAGE) { auto image_array = image_puller.GetDataMessage(); writer.Write(image_array); image_puller.WaitForImage(); @@ -37,7 +37,7 @@ void StreamWriter::CollectImages(std::vector &v) { } void StreamWriter::EndDataCollection() { - while (image_puller.GetFrameType() != JFJochFrameDeserializer::Type::END) { + while (image_puller.GetFrameType() != CBORStream2Deserializer::Type::END) { logger.Error("Expected END image"); image_puller.WaitForImage(); } diff --git a/writer/ZMQImagePuller.cpp b/writer/ZMQImagePuller.cpp index 12db14fa..048812ce 100644 --- a/writer/ZMQImagePuller.cpp +++ b/writer/ZMQImagePuller.cpp @@ -36,14 +36,14 @@ void ZMQImagePuller::WaitForImage() { if (!abort) { deserializer.Process(zmq_recv_buffer); - if (deserializer.GetType() == JFJochFrameDeserializer::Type::START) { + if (deserializer.GetType() == CBORStream2Deserializer::Type::START) { start_time = std::chrono::system_clock::now(); start_message = std::make_unique(deserializer.GetStartMessage()); end_message.reset(); - } else if (deserializer.GetType() == JFJochFrameDeserializer::Type::END) { + } else if (deserializer.GetType() == CBORStream2Deserializer::Type::END) { end_message = std::make_unique(deserializer.GetEndMessage()); end_time = std::chrono::system_clock::now(); - } else if (deserializer.GetType() == JFJochFrameDeserializer::Type::IMAGE) { + } else if (deserializer.GetType() == CBORStream2Deserializer::Type::IMAGE) { processed_images++; deserialized_image_message = std::make_unique(deserializer.GetDataMessage()); processed_size += deserialized_image_message->image.size; @@ -58,9 +58,9 @@ const DataMessage &ZMQImagePuller::GetDataMessage() const { throw JFJochException(JFJochExceptionCategory::InputParameterInvalid, "Image message not received so far"); } -JFJochFrameDeserializer::Type ZMQImagePuller::GetFrameType() const { +CBORStream2Deserializer::Type ZMQImagePuller::GetFrameType() const { if (abort) - return JFJochFrameDeserializer::Type::END; + return CBORStream2Deserializer::Type::END; else return deserializer.GetType(); } diff --git a/writer/ZMQImagePuller.h b/writer/ZMQImagePuller.h index 1db85b98..c0e4c3e9 100644 --- a/writer/ZMQImagePuller.h +++ b/writer/ZMQImagePuller.h @@ -8,7 +8,7 @@ #include "../common/ZMQWrappers.h" #include "../common/Logger.h" #include "../common/SpotToSave.h" -#include "../frame_serialize/JFJochFrameDeserializer.h" +#include "../frame_serialize/CBORStream2Deserializer.h" struct ZMQImagePullerStatistics { uint64_t processed_images; @@ -19,7 +19,7 @@ struct ZMQImagePullerStatistics { class ZMQImagePuller { std::vector zmq_recv_buffer; - JFJochFrameDeserializer deserializer; + CBORStream2Deserializer deserializer; constexpr const static uint32_t ReceiverWaterMark = 100; // ZeroMQ receive timeout allows to check for abort value from time to time @@ -43,7 +43,7 @@ public: void WaitForImage(); const DataMessage &GetDataMessage() const; - [[nodiscard]] JFJochFrameDeserializer::Type GetFrameType() const; + [[nodiscard]] CBORStream2Deserializer::Type GetFrameType() const; [[nodiscard]] ZMQImagePullerStatistics GetStatistics(); [[nodiscard]] StartMessage GetStartMessage() const; [[nodiscard]] EndMessage GetEndMessage() const;