ZeroCopyReturnValue: Add structure to return ZMQ zero copy buffer

This commit is contained in:
2023-05-09 16:17:13 +02:00
parent 2c62d01759
commit eac366cbe5
9 changed files with 35 additions and 16 deletions

View File

@@ -43,7 +43,7 @@ ADD_LIBRARY( CommonFunctions STATIC
grpcToJson.h jsonToGrpc.h to_fixed.h
DetectorGeometry.cpp DetectorGeometry.h
DetectorModuleGeometry.cpp DetectorModuleGeometry.h
DetectorSetup.h DetectorSetup.cpp)
DetectorSetup.h DetectorSetup.cpp ZeroCopyReturnValue.h)
TARGET_LINK_LIBRARIES(CommonFunctions Compression FrameSerialize libzmq JFCalibration JFJochProtoBuf -lrt)

View File

@@ -6,13 +6,13 @@
#include <cstdint>
#include <vector>
#include <semaphore>
#include "DiffractionExperiment.h"
#include "DiffractionSpot.h"
#include "../frame_serialize/JFJochFrameSerializer.h"
#include "../frame_serialize/StartMessage.h"
#include "../frame_serialize/EndMessage.h"
#include "ZeroCopyReturnValue.h"
void PrepareCBORImage(DataMessage& message,
const DiffractionExperiment &experiment,
@@ -24,7 +24,7 @@ public:
virtual void EndDataCollection(const EndMessage& message) = 0;
virtual void SendImage(const uint8_t *image_data, size_t image_size, int64_t image_number) = 0;
virtual void SendImage(const uint8_t *image_data, size_t image_size, int64_t image_number,
std::binary_semaphore *sempahore) = 0;
ZeroCopyReturnValue *z) = 0;
};

View File

@@ -45,7 +45,7 @@ void TestImagePusher::SendImage(const uint8_t *image_data, size_t image_size, in
}
void TestImagePusher::SendImage(const uint8_t *image_data, size_t image_size, int64_t image_number,
std::binary_semaphore *sempahore) {
ZeroCopyReturnValue *z) {
std::unique_lock<std::mutex> ul(m);
frame_counter++;
@@ -57,7 +57,7 @@ void TestImagePusher::SendImage(const uint8_t *image_data, size_t image_size, in
receiver_generated_image.resize(image_array.image.size);
memcpy(receiver_generated_image.data(), image_array.image.data, image_array.image.size);
}
sempahore->release();
z->release();
}
bool TestImagePusher::CheckSequence() const {

View File

@@ -22,7 +22,7 @@ class TestImagePusher : public ImagePusher {
public:
void SendImage(const uint8_t *image_data, size_t image_size, int64_t image_number) override;
void SendImage(const uint8_t *image_data, size_t image_size, int64_t image_number,
std::binary_semaphore *sempahore) override;
ZeroCopyReturnValue *z) override;
explicit TestImagePusher(int64_t image_number);
void StartDataCollection(const StartMessage& message) override;
void EndDataCollection(const EndMessage& message) override;

View File

@@ -48,11 +48,11 @@ void ZMQImagePusher::SendImage(const uint8_t *image_data, size_t image_size, int
}
void ZMQImagePusher::SendImage(const uint8_t *image_data, size_t image_size, int64_t image_number,
std::binary_semaphore *sempahore) {
ZeroCopyReturnValue *z) {
if (sockets.empty())
return;
auto socket_number = (image_number % file_count) % sockets.size();
sockets[socket_number]->SendZeroCopy(image_data, image_size, sempahore);
sockets[socket_number]->SendZeroCopy(image_data, image_size, z);
}
void ZMQImagePusher::StartDataCollection(const StartMessage& message) {

View File

@@ -19,7 +19,7 @@ class ZMQImagePusher : public ImagePusher {
public:
void SendImage(const uint8_t *image_data, size_t image_size, int64_t image_number) override;
void SendImage(const uint8_t *image_data, size_t image_size, int64_t image_number,
std::binary_semaphore *sempahore) override;
ZeroCopyReturnValue *z) override;
ZMQImagePusher(ZMQContext &context, const std::vector<std::string>& addr,
int32_t send_buffer_high_watermark = -1, int32_t send_buffer_size = -1);
// High performance implementation, where each socket has dedicated ZMQ context

View File

@@ -77,19 +77,19 @@ void ZMQSocket::Send(zmq_msg_t *msg) {
}
void zmq_socket_free(void *data, void *hint) {
auto s = (std::binary_semaphore *) hint;
s->release();
auto z = (ZeroCopyReturnValue *) hint;
z->fifo->Put(z->handle);
}
void ZMQSocket::SendZeroCopy(const void *buf, size_t buf_size, std::binary_semaphore *sempahore) {
void ZMQSocket::SendZeroCopy(const void *buf, size_t buf_size, ZeroCopyReturnValue *zero_copy_ret_val) {
std::unique_lock<std::mutex> ul(m);
zmq_msg_t msg;
if (zmq_msg_init_data(&msg, const_cast<void *>(buf), buf_size, zmq_socket_free, sempahore) != 0) {
sempahore->release();
if (zmq_msg_init_data(&msg, const_cast<void *>(buf), buf_size, zmq_socket_free, zero_copy_ret_val) != 0) {
zero_copy_ret_val->release();
throw JFJochException(JFJochExceptionCategory::ZeroMQ, "zmq_msg_init_data failed");
}
if (zmq_msg_send(&msg, socket, 0) < 0) {
sempahore->release();
zero_copy_ret_val->release();
throw JFJochException(JFJochExceptionCategory::ZeroMQ, "zmq_msg_send failed");
}
}

View File

@@ -12,6 +12,7 @@
#include <semaphore>
#include "JFJochException.h"
#include "ZeroCopyReturnValue.h"
class ZMQContext {
void *context;
@@ -79,7 +80,7 @@ public:
void Send();
void Send(const void *buf, size_t buf_size, bool blocking = true, bool multipart = false);
void SendZeroCopy(const void *buf, size_t buf_size, std::binary_semaphore *sempahore);
void SendZeroCopy(const void *buf, size_t buf_size, ZeroCopyReturnValue *zero_copy_ret_val);
template <class T> void Send(const std::vector<T> &buf) {
Send(buf.data(), buf.size() * sizeof(T));
}

View File

@@ -0,0 +1,18 @@
// Copyright (2019-2023) Paul Scherrer Institute
// SPDX-License-Identifier: GPL-3.0-or-later
#ifndef JUNGFRAUJOCH_ZEROCOPYRETURNVALUE_H
#define JUNGFRAUJOCH_ZEROCOPYRETURNVALUE_H
#include "ThreadSafeFIFO.h"
struct ZeroCopyReturnValue {
uint32_t handle;
ThreadSafeFIFO<uint32_t> *fifo;
void release() const {
fifo->PutBlocking(handle);
}
};
#endif //JUNGFRAUJOCH_ZEROCOPYRETURNVALUE_H