From 68e6077e73180f0512dfeb061c7e09fb472d5fa9 Mon Sep 17 00:00:00 2001 From: Filip Leonarski Date: Thu, 6 Apr 2023 19:59:58 +0200 Subject: [PATCH] ProcessJFPacket: Add shared_/unique_lock --- receiver/host/ProcessJFPacket.cpp | 57 +++++++++++++++++-------------- receiver/host/ProcessJFPacket.h | 3 ++ 2 files changed, 35 insertions(+), 25 deletions(-) diff --git a/receiver/host/ProcessJFPacket.cpp b/receiver/host/ProcessJFPacket.cpp index 5dc0e143..c50bf7e1 100644 --- a/receiver/host/ProcessJFPacket.cpp +++ b/receiver/host/ProcessJFPacket.cpp @@ -37,34 +37,41 @@ bool ProcessJFPacket::ProcessPacket(jf_payload *datagram, uint32_t src_ip) { if (counter > module_info.size()) throw JFJochException(JFJochExceptionCategory::ArrayOutOfBounds, "Module number out of bounds"); - if (module_info[counter].c.frame_number != frame_number) { - if (module_info[counter].c.frame_number != UINT64_MAX) - c_fifo.Put(module_info[counter].c); + { + std::shared_lock sl(m); + if (module_info[counter].c.frame_number != frame_number) { + sl.unlock(); + { + std::unique_lock ul(m); + if (module_info[counter].c.frame_number != UINT64_MAX) + c_fifo.Put(module_info[counter].c); - auto wr = wr_fifo.GetBlocking(); + auto wr = wr_fifo.GetBlocking(); - module_info[counter].c.type = Completion::Type::Image; - module_info[counter].c.frame_number = frame_number; - module_info[counter].c.timestamp = datagram->timestamp; - module_info[counter].c.bunchid = datagram->bunchid; - module_info[counter].c.debug = datagram->debug; - module_info[counter].c.packet_mask[0] = 0; - module_info[counter].c.packet_mask[1] = 0; - module_info[counter].c.packet_count = 0; - module_info[counter].c.module = module_number; + module_info[counter].c.type = Completion::Type::Image; + module_info[counter].c.frame_number = frame_number; + module_info[counter].c.timestamp = datagram->timestamp; + module_info[counter].c.bunchid = datagram->bunchid; + module_info[counter].c.debug = datagram->debug; + module_info[counter].c.packet_mask[0] = 0; + module_info[counter].c.packet_mask[1] = 0; + module_info[counter].c.packet_count = 0; + module_info[counter].c.module = module_number; - module_info[counter].c.handle = wr.handle; - module_info[counter].ptr = wr.ptr; + module_info[counter].c.handle = wr.handle; + module_info[counter].ptr = wr.ptr; + } + sl.lock(); + } + + module_info[counter].c.packet_count++; + module_info[counter].c.packet_mask[packetnum >= 64 ? 1 : 0] |= (1LU << (packetnum % 64)); + + if (conversion) + conversion->ConvertPacket((int16_t *) (module_info[counter].ptr + 4096 * packetnum), datagram->data, + packetnum); + else + memcpy(module_info[counter].ptr + 4096 * packetnum, datagram->data, 4096 * sizeof(uint16_t)); } - - - module_info[counter].c.packet_count++; - module_info[counter].c.packet_mask[packetnum >= 64 ? 1 : 0] |= (1LU << (packetnum % 64)); - - if (conversion) - conversion->ConvertPacket((int16_t *) (module_info[counter].ptr + 4096 * packetnum), datagram->data, packetnum); - else - memcpy(module_info[counter].ptr + 4096 * packetnum, datagram->data, 4096 * sizeof(uint16_t)); - return true; } diff --git a/receiver/host/ProcessJFPacket.h b/receiver/host/ProcessJFPacket.h index 91651fa3..50f1a1d0 100644 --- a/receiver/host/ProcessJFPacket.h +++ b/receiver/host/ProcessJFPacket.h @@ -9,6 +9,8 @@ #include "../../jungfrau/JFConversion.h" #include "RawJFUDPPacket.h" +#include + struct ProcessWorkRequest { uint16_t *ptr; uint32_t handle; @@ -20,6 +22,7 @@ struct ModuleInfo { }; class ProcessJFPacket { + std::shared_mutex m; ThreadSafeFIFO &c_fifo; ThreadSafeFIFO &wr_fifo; JFConversion *conversion;