JFJochReceiver: Using ZeroCopyReturnValue

This commit is contained in:
2023-05-09 16:40:47 +02:00
parent eac366cbe5
commit b33cc45edf
4 changed files with 28 additions and 9 deletions
+1 -1
View File
@@ -78,7 +78,7 @@ void ZMQSocket::Send(zmq_msg_t *msg) {
void zmq_socket_free(void *data, void *hint) {
auto z = (ZeroCopyReturnValue *) hint;
z->fifo->Put(z->handle);
z->release();
}
void ZMQSocket::SendZeroCopy(const void *buf, size_t buf_size, ZeroCopyReturnValue *zero_copy_ret_val) {
+4 -1
View File
@@ -6,9 +6,12 @@
#include "ThreadSafeFIFO.h"
struct ZeroCopyReturnValue {
class ZeroCopyReturnValue {
uint32_t handle;
ThreadSafeFIFO<uint32_t> *fifo;
public:
ZeroCopyReturnValue(ThreadSafeFIFO<uint32_t> &in_fifo, uint32_t in_handle) :
fifo(&in_fifo), handle(in_handle) {}
void release() const {
fifo->PutBlocking(handle);
+17 -6
View File
@@ -37,7 +37,8 @@ JFJochReceiver::JFJochReceiver(const JFJochProtoBuf::ReceiverInput &settings,
preview_publisher(in_preview_publisher),
ndatastreams(experiment.GetDataStreamsNum()),
data_acquisition_ready(ndatastreams),
frame_transformation_ready((experiment.GetImageNum() > 0) ? frame_transformation_nthreads : 0)
frame_transformation_ready((experiment.GetImageNum() > 0) ? frame_transformation_nthreads : 0),
send_buffer(send_buffer_size * send_buffer_count)
{
if (settings.has_calibration()) {
calib.emplace(settings.calibration());
@@ -47,6 +48,11 @@ JFJochReceiver::JFJochReceiver(const JFJochProtoBuf::ReceiverInput &settings,
for (auto &i: one_byte_mask) i = 1;
}
for (uint32_t i = 0; i < send_buffer_count; i++) {
send_buffer_avail.Put(i);
send_buffer_zero_copy_ret_val.emplace_back(send_buffer_avail, i);
}
if (experiment.GetConversionOnCPU())
PrepareConversionOnCPU();
@@ -291,9 +297,8 @@ int64_t JFJochReceiver::MiniSummationThread(int d, int m, size_t image_number, b
int64_t JFJochReceiver::FrameTransformationThread() {
FrameTransformation transformation(experiment);
std::vector<uint8_t> serialization_buffer(serializer_buffer_size);
std::unique_ptr<GPUImageAnalysis> spot_finder;
std::unique_ptr<GPUImageAnalysis> spot_finder;
try {
if (rad_int_mapping)
@@ -461,15 +466,18 @@ int64_t JFJochReceiver::FrameTransformationThread() {
}
if (push_images_to_writer) {
JFJochFrameSerializer serializer(serialization_buffer.data(), serialization_buffer.size());
auto send_buffer_handle = send_buffer_avail.GetBlocking();
auto ptr = send_buffer.data() + send_buffer_size * send_buffer_handle;
JFJochFrameSerializer serializer(ptr, send_buffer_size);
PrepareCBORImage(message, experiment, nullptr, 0);
serializer.SerializeImage(message);
if (serializer.GetRemainingBuffer() < experiment.GetMaxCompressedSize())
throw JFJochException(JFJochExceptionCategory::ArrayOutOfBounds,
"Not enough memory to save image");
size_t image_size = transformation.SaveCompressedImage(serialization_buffer.data() + serializer.GetImageAppendOffset());
size_t image_size = transformation.SaveCompressedImage(ptr + serializer.GetImageAppendOffset());
serializer.AppendImage(image_size);
image_pusher.SendImage(serialization_buffer.data(), serializer.GetBufferSize(), image_number);
image_pusher.SendImage(ptr, serializer.GetBufferSize(), image_number,
&send_buffer_zero_copy_ret_val[send_buffer_handle]);
compressed_size += image_size;
}
@@ -611,6 +619,9 @@ void JFJochReceiver::FinalizeMeasurement() {
if (val >= 0) max_delay.push_back(val);
}
for (int i = 0; i < send_buffer_count; i++)
send_buffer_avail.GetBlocking();
logger.Info("Devices stopped");
logger.Info("Receiving data done");
}
+6 -1
View File
@@ -88,8 +88,13 @@ class JFJochReceiver {
int64_t max_image_number_sent = 0;
std::mutex max_image_number_sent_mutex;
const size_t serializer_buffer_size = experiment.GetMaxCompressedSize() + 1024*1024;
const size_t send_buffer_size = experiment.GetMaxCompressedSize() + 1024*1024;
// max compressed size + 1 MiB reserve for spots and rad. integration results
const size_t send_buffer_count = 256;
ThreadSafeFIFO<uint32_t> send_buffer_avail;
std::vector<uint8_t> send_buffer;
std::vector<ZeroCopyReturnValue> send_buffer_zero_copy_ret_val;
void PinThreadToDevice(uint16_t data_stream);
void PrepareConversionOnCPU();