From eac366cbe55b174e6364ccc566d65be3640ce5a1 Mon Sep 17 00:00:00 2001 From: Filip Leonarski Date: Tue, 9 May 2023 16:17:13 +0200 Subject: [PATCH] ZeroCopyReturnValue: Add structure to return ZMQ zero copy buffer --- common/CMakeLists.txt | 2 +- common/ImagePusher.h | 4 ++-- common/TestImagePusher.cpp | 4 ++-- common/TestImagePusher.h | 2 +- common/ZMQImagePusher.cpp | 4 ++-- common/ZMQImagePusher.h | 2 +- common/ZMQWrappers.cpp | 12 ++++++------ common/ZMQWrappers.h | 3 ++- common/ZeroCopyReturnValue.h | 18 ++++++++++++++++++ 9 files changed, 35 insertions(+), 16 deletions(-) create mode 100644 common/ZeroCopyReturnValue.h diff --git a/common/CMakeLists.txt b/common/CMakeLists.txt index 58acae12..c99663fe 100644 --- a/common/CMakeLists.txt +++ b/common/CMakeLists.txt @@ -43,7 +43,7 @@ ADD_LIBRARY( CommonFunctions STATIC grpcToJson.h jsonToGrpc.h to_fixed.h DetectorGeometry.cpp DetectorGeometry.h DetectorModuleGeometry.cpp DetectorModuleGeometry.h - DetectorSetup.h DetectorSetup.cpp) + DetectorSetup.h DetectorSetup.cpp ZeroCopyReturnValue.h) TARGET_LINK_LIBRARIES(CommonFunctions Compression FrameSerialize libzmq JFCalibration JFJochProtoBuf -lrt) diff --git a/common/ImagePusher.h b/common/ImagePusher.h index 059c02dc..bde64161 100644 --- a/common/ImagePusher.h +++ b/common/ImagePusher.h @@ -6,13 +6,13 @@ #include #include -#include #include "DiffractionExperiment.h" #include "DiffractionSpot.h" #include "../frame_serialize/JFJochFrameSerializer.h" #include "../frame_serialize/StartMessage.h" #include "../frame_serialize/EndMessage.h" +#include "ZeroCopyReturnValue.h" void PrepareCBORImage(DataMessage& message, const DiffractionExperiment &experiment, @@ -24,7 +24,7 @@ public: virtual void EndDataCollection(const EndMessage& message) = 0; virtual void SendImage(const uint8_t *image_data, size_t image_size, int64_t image_number) = 0; virtual void SendImage(const uint8_t *image_data, size_t image_size, int64_t image_number, - std::binary_semaphore *sempahore) = 0; + ZeroCopyReturnValue *z) = 0; }; diff --git a/common/TestImagePusher.cpp b/common/TestImagePusher.cpp index 1426c861..99d4c6bc 100644 --- a/common/TestImagePusher.cpp +++ b/common/TestImagePusher.cpp @@ -45,7 +45,7 @@ void TestImagePusher::SendImage(const uint8_t *image_data, size_t image_size, in } void TestImagePusher::SendImage(const uint8_t *image_data, size_t image_size, int64_t image_number, - std::binary_semaphore *sempahore) { + ZeroCopyReturnValue *z) { std::unique_lock ul(m); frame_counter++; @@ -57,7 +57,7 @@ void TestImagePusher::SendImage(const uint8_t *image_data, size_t image_size, in receiver_generated_image.resize(image_array.image.size); memcpy(receiver_generated_image.data(), image_array.image.data, image_array.image.size); } - sempahore->release(); + z->release(); } bool TestImagePusher::CheckSequence() const { diff --git a/common/TestImagePusher.h b/common/TestImagePusher.h index 97fd46f5..5102b6fa 100644 --- a/common/TestImagePusher.h +++ b/common/TestImagePusher.h @@ -22,7 +22,7 @@ class TestImagePusher : public ImagePusher { public: void SendImage(const uint8_t *image_data, size_t image_size, int64_t image_number) override; void SendImage(const uint8_t *image_data, size_t image_size, int64_t image_number, - std::binary_semaphore *sempahore) override; + ZeroCopyReturnValue *z) override; explicit TestImagePusher(int64_t image_number); void StartDataCollection(const StartMessage& message) override; void EndDataCollection(const EndMessage& message) override; diff --git a/common/ZMQImagePusher.cpp b/common/ZMQImagePusher.cpp index 681c7a42..6b7681bf 100644 --- a/common/ZMQImagePusher.cpp +++ b/common/ZMQImagePusher.cpp @@ -48,11 +48,11 @@ void ZMQImagePusher::SendImage(const uint8_t *image_data, size_t image_size, int } void ZMQImagePusher::SendImage(const uint8_t *image_data, size_t image_size, int64_t image_number, - std::binary_semaphore *sempahore) { + ZeroCopyReturnValue *z) { if (sockets.empty()) return; auto socket_number = (image_number % file_count) % sockets.size(); - sockets[socket_number]->SendZeroCopy(image_data, image_size, sempahore); + sockets[socket_number]->SendZeroCopy(image_data, image_size, z); } void ZMQImagePusher::StartDataCollection(const StartMessage& message) { diff --git a/common/ZMQImagePusher.h b/common/ZMQImagePusher.h index 1ec8ee59..9d1af851 100644 --- a/common/ZMQImagePusher.h +++ b/common/ZMQImagePusher.h @@ -19,7 +19,7 @@ class ZMQImagePusher : public ImagePusher { public: void SendImage(const uint8_t *image_data, size_t image_size, int64_t image_number) override; void SendImage(const uint8_t *image_data, size_t image_size, int64_t image_number, - std::binary_semaphore *sempahore) override; + ZeroCopyReturnValue *z) override; ZMQImagePusher(ZMQContext &context, const std::vector& addr, int32_t send_buffer_high_watermark = -1, int32_t send_buffer_size = -1); // High performance implementation, where each socket has dedicated ZMQ context diff --git a/common/ZMQWrappers.cpp b/common/ZMQWrappers.cpp index 19d34284..4245dbff 100644 --- a/common/ZMQWrappers.cpp +++ b/common/ZMQWrappers.cpp @@ -77,19 +77,19 @@ void ZMQSocket::Send(zmq_msg_t *msg) { } void zmq_socket_free(void *data, void *hint) { - auto s = (std::binary_semaphore *) hint; - s->release(); + auto z = (ZeroCopyReturnValue *) hint; + z->fifo->Put(z->handle); } -void ZMQSocket::SendZeroCopy(const void *buf, size_t buf_size, std::binary_semaphore *sempahore) { +void ZMQSocket::SendZeroCopy(const void *buf, size_t buf_size, ZeroCopyReturnValue *zero_copy_ret_val) { std::unique_lock ul(m); zmq_msg_t msg; - if (zmq_msg_init_data(&msg, const_cast(buf), buf_size, zmq_socket_free, sempahore) != 0) { - sempahore->release(); + if (zmq_msg_init_data(&msg, const_cast(buf), buf_size, zmq_socket_free, zero_copy_ret_val) != 0) { + zero_copy_ret_val->release(); throw JFJochException(JFJochExceptionCategory::ZeroMQ, "zmq_msg_init_data failed"); } if (zmq_msg_send(&msg, socket, 0) < 0) { - sempahore->release(); + zero_copy_ret_val->release(); throw JFJochException(JFJochExceptionCategory::ZeroMQ, "zmq_msg_send failed"); } } diff --git a/common/ZMQWrappers.h b/common/ZMQWrappers.h index 90ed8561..6cc68927 100644 --- a/common/ZMQWrappers.h +++ b/common/ZMQWrappers.h @@ -12,6 +12,7 @@ #include #include "JFJochException.h" +#include "ZeroCopyReturnValue.h" class ZMQContext { void *context; @@ -79,7 +80,7 @@ public: void Send(); void Send(const void *buf, size_t buf_size, bool blocking = true, bool multipart = false); - void SendZeroCopy(const void *buf, size_t buf_size, std::binary_semaphore *sempahore); + void SendZeroCopy(const void *buf, size_t buf_size, ZeroCopyReturnValue *zero_copy_ret_val); template void Send(const std::vector &buf) { Send(buf.data(), buf.size() * sizeof(T)); } diff --git a/common/ZeroCopyReturnValue.h b/common/ZeroCopyReturnValue.h new file mode 100644 index 00000000..2037d6fe --- /dev/null +++ b/common/ZeroCopyReturnValue.h @@ -0,0 +1,18 @@ +// Copyright (2019-2023) Paul Scherrer Institute +// SPDX-License-Identifier: GPL-3.0-or-later + +#ifndef JUNGFRAUJOCH_ZEROCOPYRETURNVALUE_H +#define JUNGFRAUJOCH_ZEROCOPYRETURNVALUE_H + +#include "ThreadSafeFIFO.h" + +struct ZeroCopyReturnValue { + uint32_t handle; + ThreadSafeFIFO *fifo; + + void release() const { + fifo->PutBlocking(handle); + } +}; + +#endif //JUNGFRAUJOCH_ZEROCOPYRETURNVALUE_H