diff --git a/common/ZMQWrappers.cpp b/common/ZMQWrappers.cpp index 4245dbff..f235d95a 100644 --- a/common/ZMQWrappers.cpp +++ b/common/ZMQWrappers.cpp @@ -78,7 +78,7 @@ void ZMQSocket::Send(zmq_msg_t *msg) { void zmq_socket_free(void *data, void *hint) { auto z = (ZeroCopyReturnValue *) hint; - z->fifo->Put(z->handle); + z->release(); } void ZMQSocket::SendZeroCopy(const void *buf, size_t buf_size, ZeroCopyReturnValue *zero_copy_ret_val) { diff --git a/common/ZeroCopyReturnValue.h b/common/ZeroCopyReturnValue.h index 2037d6fe..59742d76 100644 --- a/common/ZeroCopyReturnValue.h +++ b/common/ZeroCopyReturnValue.h @@ -6,9 +6,12 @@ #include "ThreadSafeFIFO.h" -struct ZeroCopyReturnValue { +class ZeroCopyReturnValue { uint32_t handle; ThreadSafeFIFO *fifo; +public: + ZeroCopyReturnValue(ThreadSafeFIFO &in_fifo, uint32_t in_handle) : + fifo(&in_fifo), handle(in_handle) {} void release() const { fifo->PutBlocking(handle); diff --git a/receiver/JFJochReceiver.cpp b/receiver/JFJochReceiver.cpp index 3e385106..d4294e35 100644 --- a/receiver/JFJochReceiver.cpp +++ b/receiver/JFJochReceiver.cpp @@ -37,7 +37,8 @@ JFJochReceiver::JFJochReceiver(const JFJochProtoBuf::ReceiverInput &settings, preview_publisher(in_preview_publisher), ndatastreams(experiment.GetDataStreamsNum()), data_acquisition_ready(ndatastreams), - frame_transformation_ready((experiment.GetImageNum() > 0) ? frame_transformation_nthreads : 0) + frame_transformation_ready((experiment.GetImageNum() > 0) ? frame_transformation_nthreads : 0), + send_buffer(send_buffer_size * send_buffer_count) { if (settings.has_calibration()) { calib.emplace(settings.calibration()); @@ -47,6 +48,11 @@ JFJochReceiver::JFJochReceiver(const JFJochProtoBuf::ReceiverInput &settings, for (auto &i: one_byte_mask) i = 1; } + for (uint32_t i = 0; i < send_buffer_count; i++) { + send_buffer_avail.Put(i); + send_buffer_zero_copy_ret_val.emplace_back(send_buffer_avail, i); + } + if (experiment.GetConversionOnCPU()) PrepareConversionOnCPU(); @@ -291,9 +297,8 @@ int64_t JFJochReceiver::MiniSummationThread(int d, int m, size_t image_number, b int64_t JFJochReceiver::FrameTransformationThread() { FrameTransformation transformation(experiment); - std::vector serialization_buffer(serializer_buffer_size); - std::unique_ptr spot_finder; + std::unique_ptr spot_finder; try { if (rad_int_mapping) @@ -461,15 +466,18 @@ int64_t JFJochReceiver::FrameTransformationThread() { } if (push_images_to_writer) { - JFJochFrameSerializer serializer(serialization_buffer.data(), serialization_buffer.size()); + auto send_buffer_handle = send_buffer_avail.GetBlocking(); + auto ptr = send_buffer.data() + send_buffer_size * send_buffer_handle; + JFJochFrameSerializer serializer(ptr, send_buffer_size); PrepareCBORImage(message, experiment, nullptr, 0); serializer.SerializeImage(message); if (serializer.GetRemainingBuffer() < experiment.GetMaxCompressedSize()) throw JFJochException(JFJochExceptionCategory::ArrayOutOfBounds, "Not enough memory to save image"); - size_t image_size = transformation.SaveCompressedImage(serialization_buffer.data() + serializer.GetImageAppendOffset()); + size_t image_size = transformation.SaveCompressedImage(ptr + serializer.GetImageAppendOffset()); serializer.AppendImage(image_size); - image_pusher.SendImage(serialization_buffer.data(), serializer.GetBufferSize(), image_number); + image_pusher.SendImage(ptr, serializer.GetBufferSize(), image_number, + &send_buffer_zero_copy_ret_val[send_buffer_handle]); compressed_size += image_size; } @@ -611,6 +619,9 @@ void JFJochReceiver::FinalizeMeasurement() { if (val >= 0) max_delay.push_back(val); } + for (int i = 0; i < send_buffer_count; i++) + send_buffer_avail.GetBlocking(); + logger.Info("Devices stopped"); logger.Info("Receiving data done"); } diff --git a/receiver/JFJochReceiver.h b/receiver/JFJochReceiver.h index bf64238a..7cc2172c 100644 --- a/receiver/JFJochReceiver.h +++ b/receiver/JFJochReceiver.h @@ -88,8 +88,13 @@ class JFJochReceiver { int64_t max_image_number_sent = 0; std::mutex max_image_number_sent_mutex; - const size_t serializer_buffer_size = experiment.GetMaxCompressedSize() + 1024*1024; + const size_t send_buffer_size = experiment.GetMaxCompressedSize() + 1024*1024; // max compressed size + 1 MiB reserve for spots and rad. integration results + const size_t send_buffer_count = 256; + + ThreadSafeFIFO send_buffer_avail; + std::vector send_buffer; + std::vector send_buffer_zero_copy_ret_val; void PinThreadToDevice(uint16_t data_stream); void PrepareConversionOnCPU();