JFJochReceiver: Use zero-copy ZeroMQ sending
This commit is contained in:
@@ -6,6 +6,7 @@
|
||||
|
||||
#include <cstdint>
|
||||
#include <vector>
|
||||
#include <semaphore>
|
||||
|
||||
#include "DiffractionExperiment.h"
|
||||
#include "DiffractionSpot.h"
|
||||
@@ -22,6 +23,8 @@ public:
|
||||
virtual void StartDataCollection(const StartMessage& message) = 0;
|
||||
virtual void EndDataCollection(const EndMessage& message) = 0;
|
||||
virtual void SendImage(const uint8_t *image_data, size_t image_size, int64_t image_number) = 0;
|
||||
virtual void SendImage(const uint8_t *image_data, size_t image_size, int64_t image_number,
|
||||
std::binary_semaphore *sempahore) = 0;
|
||||
};
|
||||
|
||||
|
||||
|
||||
@@ -44,6 +44,22 @@ void TestImagePusher::SendImage(const uint8_t *image_data, size_t image_size, in
|
||||
}
|
||||
}
|
||||
|
||||
void TestImagePusher::SendImage(const uint8_t *image_data, size_t image_size, int64_t image_number,
|
||||
std::binary_semaphore *sempahore) {
|
||||
std::unique_lock<std::mutex> ul(m);
|
||||
|
||||
frame_counter++;
|
||||
if (image_number == image_id) {
|
||||
JFJochFrameDeserializer deserializer;
|
||||
deserializer.Process(image_data, image_size);
|
||||
|
||||
auto image_array = deserializer.GetDataMessage();
|
||||
receiver_generated_image.resize(image_array.image.size);
|
||||
memcpy(receiver_generated_image.data(), image_array.image.data, image_array.image.size);
|
||||
}
|
||||
sempahore->release();
|
||||
}
|
||||
|
||||
bool TestImagePusher::CheckSequence() const {
|
||||
std::unique_lock<std::mutex> ul(m);
|
||||
return correct_sequence;
|
||||
|
||||
@@ -21,6 +21,8 @@ class TestImagePusher : public ImagePusher {
|
||||
size_t frame_counter = 0;
|
||||
public:
|
||||
void SendImage(const uint8_t *image_data, size_t image_size, int64_t image_number) override;
|
||||
void SendImage(const uint8_t *image_data, size_t image_size, int64_t image_number,
|
||||
std::binary_semaphore *sempahore) override;
|
||||
explicit TestImagePusher(int64_t image_number);
|
||||
void StartDataCollection(const StartMessage& message) override;
|
||||
void EndDataCollection(const EndMessage& message) override;
|
||||
|
||||
@@ -47,6 +47,14 @@ void ZMQImagePusher::SendImage(const uint8_t *image_data, size_t image_size, int
|
||||
sockets[socket_number]->Send(image_data, image_size);
|
||||
}
|
||||
|
||||
void ZMQImagePusher::SendImage(const uint8_t *image_data, size_t image_size, int64_t image_number,
|
||||
std::binary_semaphore *sempahore) {
|
||||
if (sockets.empty())
|
||||
return;
|
||||
auto socket_number = (image_number % file_count) % sockets.size();
|
||||
sockets[socket_number]->SendZeroCopy(image_data, image_size, sempahore);
|
||||
}
|
||||
|
||||
void ZMQImagePusher::StartDataCollection(const StartMessage& message) {
|
||||
JFJochFrameSerializer serializer(80*1024*1024); // 80 MiB should be safe even for 16M
|
||||
|
||||
|
||||
@@ -18,6 +18,8 @@ class ZMQImagePusher : public ImagePusher {
|
||||
int64_t file_count = 1;
|
||||
public:
|
||||
void SendImage(const uint8_t *image_data, size_t image_size, int64_t image_number) override;
|
||||
void SendImage(const uint8_t *image_data, size_t image_size, int64_t image_number,
|
||||
std::binary_semaphore *sempahore) override;
|
||||
ZMQImagePusher(ZMQContext &context, const std::vector<std::string>& addr,
|
||||
int32_t send_buffer_high_watermark = -1, int32_t send_buffer_size = -1);
|
||||
// High performance implementation, where each socket has dedicated ZMQ context
|
||||
|
||||
@@ -76,6 +76,24 @@ void ZMQSocket::Send(zmq_msg_t *msg) {
|
||||
throw JFJochException(JFJochExceptionCategory::ZeroMQ, "zmq_msg_send failed");
|
||||
}
|
||||
|
||||
void zmq_socket_free(void *data, void *hint) {
|
||||
auto s = (std::binary_semaphore *) hint;
|
||||
s->release();
|
||||
}
|
||||
|
||||
void ZMQSocket::SendZeroCopy(const void *buf, size_t buf_size, std::binary_semaphore *sempahore) {
|
||||
std::unique_lock<std::mutex> ul(m);
|
||||
zmq_msg_t msg;
|
||||
if (zmq_msg_init_data(&msg, const_cast<void *>(buf), buf_size, zmq_socket_free, sempahore) != 0) {
|
||||
sempahore->release();
|
||||
throw JFJochException(JFJochExceptionCategory::ZeroMQ, "zmq_msg_init_data failed");
|
||||
}
|
||||
if (zmq_msg_send(&msg, socket, 0) < 0) {
|
||||
sempahore->release();
|
||||
throw JFJochException(JFJochExceptionCategory::ZeroMQ, "zmq_msg_send failed");
|
||||
}
|
||||
}
|
||||
|
||||
int64_t ZMQSocket::Receive(bool blocking) {
|
||||
std::vector<uint8_t> msg;
|
||||
return Receive(msg, blocking, true);
|
||||
|
||||
@@ -9,6 +9,8 @@
|
||||
#include <thread>
|
||||
#include <mutex>
|
||||
#include <zmq.h>
|
||||
#include <semaphore>
|
||||
|
||||
#include "JFJochException.h"
|
||||
|
||||
class ZMQContext {
|
||||
@@ -77,6 +79,7 @@ public:
|
||||
|
||||
void Send();
|
||||
void Send(const void *buf, size_t buf_size, bool blocking = true, bool multipart = false);
|
||||
void SendZeroCopy(const void *buf, size_t buf_size, std::binary_semaphore *sempahore);
|
||||
template <class T> void Send(const std::vector<T> &buf) {
|
||||
Send(buf.data(), buf.size() * sizeof(T));
|
||||
}
|
||||
|
||||
@@ -286,10 +286,13 @@ int64_t JFJochReceiver::MiniSummationThread(int d, int m, size_t image_number, b
|
||||
|
||||
int64_t JFJochReceiver::FrameTransformationThread() {
|
||||
|
||||
|
||||
FrameTransformation transformation(experiment);
|
||||
|
||||
JFJochFrameSerializer serializer(experiment.GetPixelsNum()*sizeof(uint32_t)*2);
|
||||
|
||||
std::binary_semaphore serializer_buffer_sempahore(1);
|
||||
|
||||
std::unique_ptr<GPUImageAnalysis> spot_finder;
|
||||
|
||||
try {
|
||||
@@ -452,11 +455,14 @@ int64_t JFJochReceiver::FrameTransformationThread() {
|
||||
}
|
||||
|
||||
if (push_images_to_writer) {
|
||||
serializer_buffer_sempahore.acquire();
|
||||
PrepareCBORImage(message, experiment, nullptr, 0);
|
||||
serializer.SerializeImage(message);
|
||||
size_t image_size = transformation.SaveCompressedImage(serializer.GetImageAppendLocation());
|
||||
serializer.AppendImage(image_size);
|
||||
image_pusher.SendImage(serializer.GetBuffer(), serializer.GetBufferSize(), image_number);
|
||||
image_pusher.SendImage(const_cast<uint8_t *>(serializer.GetBuffer()), serializer.GetBufferSize(),
|
||||
image_number,
|
||||
&serializer_buffer_sempahore);
|
||||
compressed_size += image_size;
|
||||
}
|
||||
|
||||
@@ -467,6 +473,7 @@ int64_t JFJochReceiver::FrameTransformationThread() {
|
||||
}
|
||||
|
||||
spot_finder->UnregisterBuffer();
|
||||
serializer_buffer_sempahore.acquire();
|
||||
|
||||
logger.Debug("Sum&compression thread done");
|
||||
|
||||
|
||||
Reference in New Issue
Block a user