diff --git a/common/ImagePusher.h b/common/ImagePusher.h index 7d2f4ee9..059c02dc 100644 --- a/common/ImagePusher.h +++ b/common/ImagePusher.h @@ -6,6 +6,7 @@ #include #include +#include #include "DiffractionExperiment.h" #include "DiffractionSpot.h" @@ -22,6 +23,8 @@ public: virtual void StartDataCollection(const StartMessage& message) = 0; virtual void EndDataCollection(const EndMessage& message) = 0; virtual void SendImage(const uint8_t *image_data, size_t image_size, int64_t image_number) = 0; + virtual void SendImage(const uint8_t *image_data, size_t image_size, int64_t image_number, + std::binary_semaphore *sempahore) = 0; }; diff --git a/common/TestImagePusher.cpp b/common/TestImagePusher.cpp index 2bd2254a..1426c861 100644 --- a/common/TestImagePusher.cpp +++ b/common/TestImagePusher.cpp @@ -44,6 +44,22 @@ void TestImagePusher::SendImage(const uint8_t *image_data, size_t image_size, in } } +void TestImagePusher::SendImage(const uint8_t *image_data, size_t image_size, int64_t image_number, + std::binary_semaphore *sempahore) { + std::unique_lock ul(m); + + frame_counter++; + if (image_number == image_id) { + JFJochFrameDeserializer deserializer; + deserializer.Process(image_data, image_size); + + auto image_array = deserializer.GetDataMessage(); + receiver_generated_image.resize(image_array.image.size); + memcpy(receiver_generated_image.data(), image_array.image.data, image_array.image.size); + } + sempahore->release(); +} + bool TestImagePusher::CheckSequence() const { std::unique_lock ul(m); return correct_sequence; diff --git a/common/TestImagePusher.h b/common/TestImagePusher.h index 47d89ba2..97fd46f5 100644 --- a/common/TestImagePusher.h +++ b/common/TestImagePusher.h @@ -21,6 +21,8 @@ class TestImagePusher : public ImagePusher { size_t frame_counter = 0; public: void SendImage(const uint8_t *image_data, size_t image_size, int64_t image_number) override; + void SendImage(const uint8_t *image_data, size_t image_size, int64_t image_number, + std::binary_semaphore *sempahore) override; explicit TestImagePusher(int64_t image_number); void StartDataCollection(const StartMessage& message) override; void EndDataCollection(const EndMessage& message) override; diff --git a/common/ZMQImagePusher.cpp b/common/ZMQImagePusher.cpp index 96537e5b..922a3525 100644 --- a/common/ZMQImagePusher.cpp +++ b/common/ZMQImagePusher.cpp @@ -47,6 +47,14 @@ void ZMQImagePusher::SendImage(const uint8_t *image_data, size_t image_size, int sockets[socket_number]->Send(image_data, image_size); } +void ZMQImagePusher::SendImage(const uint8_t *image_data, size_t image_size, int64_t image_number, + std::binary_semaphore *sempahore) { + if (sockets.empty()) + return; + auto socket_number = (image_number % file_count) % sockets.size(); + sockets[socket_number]->SendZeroCopy(image_data, image_size, sempahore); +} + void ZMQImagePusher::StartDataCollection(const StartMessage& message) { JFJochFrameSerializer serializer(80*1024*1024); // 80 MiB should be safe even for 16M diff --git a/common/ZMQImagePusher.h b/common/ZMQImagePusher.h index ccdaa544..1ec8ee59 100644 --- a/common/ZMQImagePusher.h +++ b/common/ZMQImagePusher.h @@ -18,6 +18,8 @@ class ZMQImagePusher : public ImagePusher { int64_t file_count = 1; public: void SendImage(const uint8_t *image_data, size_t image_size, int64_t image_number) override; + void SendImage(const uint8_t *image_data, size_t image_size, int64_t image_number, + std::binary_semaphore *sempahore) override; ZMQImagePusher(ZMQContext &context, const std::vector& addr, int32_t send_buffer_high_watermark = -1, int32_t send_buffer_size = -1); // High performance implementation, where each socket has dedicated ZMQ context diff --git a/common/ZMQWrappers.cpp b/common/ZMQWrappers.cpp index d981911d..19d34284 100644 --- a/common/ZMQWrappers.cpp +++ b/common/ZMQWrappers.cpp @@ -76,6 +76,24 @@ void ZMQSocket::Send(zmq_msg_t *msg) { throw JFJochException(JFJochExceptionCategory::ZeroMQ, "zmq_msg_send failed"); } +void zmq_socket_free(void *data, void *hint) { + auto s = (std::binary_semaphore *) hint; + s->release(); +} + +void ZMQSocket::SendZeroCopy(const void *buf, size_t buf_size, std::binary_semaphore *sempahore) { + std::unique_lock ul(m); + zmq_msg_t msg; + if (zmq_msg_init_data(&msg, const_cast(buf), buf_size, zmq_socket_free, sempahore) != 0) { + sempahore->release(); + throw JFJochException(JFJochExceptionCategory::ZeroMQ, "zmq_msg_init_data failed"); + } + if (zmq_msg_send(&msg, socket, 0) < 0) { + sempahore->release(); + throw JFJochException(JFJochExceptionCategory::ZeroMQ, "zmq_msg_send failed"); + } +} + int64_t ZMQSocket::Receive(bool blocking) { std::vector msg; return Receive(msg, blocking, true); diff --git a/common/ZMQWrappers.h b/common/ZMQWrappers.h index 9c0df8d4..90ed8561 100644 --- a/common/ZMQWrappers.h +++ b/common/ZMQWrappers.h @@ -9,6 +9,8 @@ #include #include #include +#include + #include "JFJochException.h" class ZMQContext { @@ -77,6 +79,7 @@ public: void Send(); void Send(const void *buf, size_t buf_size, bool blocking = true, bool multipart = false); + void SendZeroCopy(const void *buf, size_t buf_size, std::binary_semaphore *sempahore); template void Send(const std::vector &buf) { Send(buf.data(), buf.size() * sizeof(T)); } diff --git a/receiver/JFJochReceiver.cpp b/receiver/JFJochReceiver.cpp index f6d61d76..d9e84762 100644 --- a/receiver/JFJochReceiver.cpp +++ b/receiver/JFJochReceiver.cpp @@ -286,10 +286,13 @@ int64_t JFJochReceiver::MiniSummationThread(int d, int m, size_t image_number, b int64_t JFJochReceiver::FrameTransformationThread() { + FrameTransformation transformation(experiment); JFJochFrameSerializer serializer(experiment.GetPixelsNum()*sizeof(uint32_t)*2); + std::binary_semaphore serializer_buffer_sempahore(1); + std::unique_ptr spot_finder; try { @@ -452,11 +455,14 @@ int64_t JFJochReceiver::FrameTransformationThread() { } if (push_images_to_writer) { + serializer_buffer_sempahore.acquire(); PrepareCBORImage(message, experiment, nullptr, 0); serializer.SerializeImage(message); size_t image_size = transformation.SaveCompressedImage(serializer.GetImageAppendLocation()); serializer.AppendImage(image_size); - image_pusher.SendImage(serializer.GetBuffer(), serializer.GetBufferSize(), image_number); + image_pusher.SendImage(const_cast(serializer.GetBuffer()), serializer.GetBufferSize(), + image_number, + &serializer_buffer_sempahore); compressed_size += image_size; } @@ -467,6 +473,7 @@ int64_t JFJochReceiver::FrameTransformationThread() { } spot_finder->UnregisterBuffer(); + serializer_buffer_sempahore.acquire(); logger.Debug("Sum&compression thread done");