From be7cd4994e6d89d8724aad6c52185f1d04278422 Mon Sep 17 00:00:00 2001 From: Mohacsi Istvan Date: Mon, 7 Jun 2021 15:39:45 +0200 Subject: [PATCH] Tests no longer hang with 25MB UDP buffers --- core-buffer/include/jungfraujoch.hpp | 7 +- core-buffer/test/test_PacketBuffer.cpp | 254 ------------------- jfj-udp-recv/include/JfjFrameUdpReceiver.hpp | 3 +- jfj-udp-recv/include/PacketBuffer.hpp | 1 + jfj-udp-recv/src/JfjFrameUdpReceiver.cpp | 30 ++- jfj-udp-recv/test/main.cpp | 1 + jfj-udp-recv/test/test_FrameUdpReceiver.cpp | 65 +++-- jfj-udp-recv/test/test_PacketBuffer.cpp | 65 +++++ 8 files changed, 133 insertions(+), 293 deletions(-) delete mode 100644 core-buffer/test/test_PacketBuffer.cpp create mode 100644 jfj-udp-recv/test/test_PacketBuffer.cpp diff --git a/core-buffer/include/jungfraujoch.hpp b/core-buffer/include/jungfraujoch.hpp index 6d219ca..87ae5ca 100644 --- a/core-buffer/include/jungfraujoch.hpp +++ b/core-buffer/include/jungfraujoch.hpp @@ -3,13 +3,13 @@ #include -#define JFJOCH_N_MODULES 32 +#define JFJOCH_N_MODULES 8 #define JFJOCH_BYTES_PER_PACKET 8240 #define JFJOCH_DATA_BYTES_PER_PACKET 8192 #define JFJOCH_N_PACKETS_PER_FRAME (JFJOCH_N_MODULES * 128) #define JFJOCH_DATA_BYTES_PER_FRAME (JFJOCH_N_MODULES * 1048576) -// 48 bytes + 8192 bytes = 8240 bytes +// 48 bytes + 8192 bytes = 8240 bytes (below 9000 MTU) #pragma pack(push) #pragma pack(2) struct jfjoch_packet_t { @@ -33,5 +33,4 @@ struct jfjoch_packet_t { }; #pragma pack(pop) - -#endif +#endif // JUNGFRAUJOCH_HPP diff --git a/core-buffer/test/test_PacketBuffer.cpp b/core-buffer/test/test_PacketBuffer.cpp deleted file mode 100644 index 110e6ca..0000000 --- a/core-buffer/test/test_PacketBuffer.cpp +++ /dev/null @@ -1,254 +0,0 @@ -#include -#include -#include "gtest/gtest.h" -#include "PacketBuffer.hpp" - -#include -#include -#include - -using namespace std; - -template -class MockReceiver{ - public: - int idx_packet = 42000; - int packet_per_frame = 512; - int num_bunches = 100; - int num_packets =50; - - int receive_many(mmsghdr* msgs, const size_t n_msgs){ - // Receive 'num_packets numner of packets' - - for(int ii=0; ii(mmsghdr[ii].msg_hdr.msg_iov->iov_base); - refer.bunchid = idx_packet / packet_per_frame; - refer.packetnum = idx_packet % packet_per_frame; - idx_packet++; - } - return std::min(num_packets, n_msgs); - }; -}; - -// -// -// -// -//TEST(BufferUdpReceiver, simple_recv) -//{ -// auto n_packets = JF_N_PACKETS_PER_FRAME; -// int n_frames = 5; -// -// uint16_t udp_port = MOCK_UDP_PORT; -// auto server_address = get_server_address(udp_port); -// auto send_socket_fd = socket(AF_INET, SOCK_DGRAM, 0); -// ASSERT_TRUE(send_socket_fd >= 0); -// -// JfjFrameUdpReceiver udp_receiver(udp_port); -// -// auto handle = async(launch::async, [&](){ -// for (int i_frame=0; i_frame < n_frames; i_frame++){ -// for (size_t i_packet=0; i_packet(JUNGFRAU_DATA_BYTES_PER_FRAME); -// -// for (int i_frame=0; i_frame < n_frames; i_frame++) { -// auto pulse_id = udp_receiver.get_frame_from_udp( -// metadata, frame_buffer.get()); -// -// ASSERT_EQ(i_frame + 1, pulse_id); -// ASSERT_EQ(metadata.frame_index, i_frame + 1000); -// ASSERT_EQ(metadata.daq_rec, i_frame + 10000); -// // -1 because we skipped a packet. -// ASSERT_EQ(metadata.n_recv_packets, n_packets); -// } -// -// ::close(send_socket_fd); -//} -// -//TEST(BufferUdpReceiver, missing_middle_packet) -//{ -// auto n_packets = JF_N_PACKETS_PER_FRAME; -// int n_frames = 3; -// -// uint16_t udp_port = MOCK_UDP_PORT; -// auto server_address = get_server_address(udp_port); -// auto send_socket_fd = socket(AF_INET, SOCK_DGRAM, 0); -// ASSERT_TRUE(send_socket_fd >= 0); -// -// JfjFrameUdpReceiver udp_receiver(udp_port, source_id); -// -// auto handle = async(launch::async, [&](){ -// for (int i_frame=0; i_frame < n_frames; i_frame++){ -// for (size_t i_packet=0; i_packet(JUNGFRAU_DATA_BYTES_PER_FRAME); -// -// for (int i_frame=0; i_frame < n_frames; i_frame++) { -// auto pulse_id = udp_receiver.get_frame_from_udp( -// metadata, frame_buffer.get()); -// -// ASSERT_EQ(i_frame + 1, pulse_id); -// ASSERT_EQ(metadata.frame_index, i_frame + 1000); -// ASSERT_EQ(metadata.daq_rec, i_frame + 10000); -// // -1 because we skipped a packet. -// ASSERT_EQ(metadata.n_recv_packets, n_packets - 1); -// } -// -// ::close(send_socket_fd); -//} -// -//TEST(BufferUdpReceiver, missing_first_packet) -//{ -// auto n_packets = JF_N_PACKETS_PER_FRAME; -// int n_frames = 3; -// -// uint16_t udp_port = MOCK_UDP_PORT; -// auto server_address = get_server_address(udp_port); -// auto send_socket_fd = socket(AF_INET, SOCK_DGRAM, 0); -// ASSERT_TRUE(send_socket_fd >= 0); -// -// JfjFrameUdpReceiver udp_receiver(udp_port); -// -// auto handle = async(launch::async, [&](){ -// for (int i_frame=0; i_frame < n_frames; i_frame++){ -// for (size_t i_packet=0; i_packet(JUNGFRAU_DATA_BYTES_PER_FRAME); -// -// for (int i_frame=0; i_frame < n_frames; i_frame++) { -// auto pulse_id = udp_receiver.get_frame_from_udp( -// metadata, frame_buffer.get()); -// -// ASSERT_EQ(i_frame + 1, pulse_id); -// ASSERT_EQ(metadata.frame_index, i_frame + 1000); -// ASSERT_EQ(metadata.daq_rec, i_frame + 10000); -// // -1 because we skipped a packet. -// ASSERT_EQ(metadata.n_recv_packets, n_packets - 1); -// } -// -// ::close(send_socket_fd); -//} -// -//TEST(BufferUdpReceiver, missing_last_packet) -//{ -// auto n_packets = JF_N_PACKETS_PER_FRAME; -// int n_frames = 3; -// -// uint16_t udp_port = MOCK_UDP_PORT; -// auto server_address = get_server_address(udp_port); -// auto send_socket_fd = socket(AF_INET, SOCK_DGRAM, 0); -// ASSERT_TRUE(send_socket_fd >= 0); -// -// JfjFrameUdpReceiver udp_receiver(udp_port); -// -// auto handle = async(launch::async, [&](){ -// for (int i_frame=0; i_frame < n_frames; i_frame++){ -// for (size_t i_packet=0; i_packet(JUNGFRAU_DATA_BYTES_PER_FRAME); -// -// // n_frames -1 because the last frame is not complete. -// for (int i_frame=0; i_frame < n_frames - 1; i_frame++) { -// auto pulse_id = udp_receiver.get_frame_from_udp(metadata, frame_buffer.get()); -// -// ASSERT_EQ(i_frame + 1, pulse_id); -// ASSERT_EQ(metadata.frame_index, i_frame + 1000); -// ASSERT_EQ(metadata.daq_rec, i_frame + 10000); -// // -1 because we skipped a packet. -// ASSERT_EQ(metadata.n_recv_packets, n_packets - 1); -// } -// -// ::close(send_socket_fd); -//} diff --git a/jfj-udp-recv/include/JfjFrameUdpReceiver.hpp b/jfj-udp-recv/include/JfjFrameUdpReceiver.hpp index aad5962..a575201 100644 --- a/jfj-udp-recv/include/JfjFrameUdpReceiver.hpp +++ b/jfj-udp-recv/include/JfjFrameUdpReceiver.hpp @@ -15,7 +15,8 @@ **/ class JfjFrameUdpReceiver { PacketUdpReceiver m_udp_receiver; - uint64_t m_frame_index; + bool is_initialized = false; + uint64_t m_frame_index = 0; PacketBuffer m_buffer; diff --git a/jfj-udp-recv/include/PacketBuffer.hpp b/jfj-udp-recv/include/PacketBuffer.hpp index 0697a5f..abd08bb 100644 --- a/jfj-udp-recv/include/PacketBuffer.hpp +++ b/jfj-udp-recv/include/PacketBuffer.hpp @@ -52,6 +52,7 @@ public: void fill_from(TY& recv){ std::lock_guard g_guard(m_mutex); this->idx_write = recv.receive_many(m_msgs, this->capacity()); + // std::cout << "Received " << this->idx_write << " frames" << std::endl; // Returns -1 with errno=11 if no data received if(idx_write==-1){ idx_write = 0; } this->idx_read = 0; diff --git a/jfj-udp-recv/src/JfjFrameUdpReceiver.cpp b/jfj-udp-recv/src/JfjFrameUdpReceiver.cpp index 059da2b..e987387 100644 --- a/jfj-udp-recv/src/JfjFrameUdpReceiver.cpp +++ b/jfj-udp-recv/src/JfjFrameUdpReceiver.cpp @@ -34,13 +34,19 @@ inline uint64_t JfjFrameUdpReceiver::process_packets(ModuleFrame& metadata, char while(!m_buffer.is_empty()){ // Happens if the last packet from the previous frame gets lost. if (m_frame_index != m_buffer.peek_front().framenum) { + std::cout << "Metadata pulse: " << metadata.pulse_id << "\tIndex: " << m_frame_index << "\tCurrent one is: " << m_buffer.peek_front().framenum << std::endl; + m_frame_index = m_buffer.peek_front().framenum; - std::cout << "Peeked pulse: " << metadata.pulse_id << std::endl; - return metadata.pulse_id; + if(this->is_initialized){ + return metadata.pulse_id; + }else{ + this->is_initialized = true; + } } // Otherwise pop the queue (and set current frame index) jfjoch_packet_t& c_packet = m_buffer.pop_front(); + std::cout << c_packet << std::endl; m_frame_index = c_packet.framenum; // Always copy metadata (otherwise problem when 0th packet gets lost) @@ -69,18 +75,28 @@ uint64_t JfjFrameUdpReceiver::get_frame_from_udp(ModuleFrame& metadata, char* fr memset(frame_buffer, 0, JFJOCH_DATA_BYTES_PER_FRAME); // Process leftover packages in the buffer if (!m_buffer.is_empty()) { + std::cout << "Leftovers..." << std::endl; auto pulse_id = process_packets(metadata, frame_buffer); - if (pulse_id != 0) { return pulse_id; } + if (pulse_id != 0) { + std::cout << "Returning frame: " << pulse_id << std::endl; + return pulse_id; } } while (true) { // Receive new packages (pass if none)... - m_buffer.reset(); - m_buffer.fill_from(m_udp_receiver); - if (m_buffer.is_empty()) { continue; } + std::cout << "Really new..." << std::endl; + + // m_buffer.reset(); + m_buffer.fill_from(m_udp_receiver); + std::cout << "Got " << m_buffer.size() << std::endl; + if (m_buffer.is_empty()) { + std::cout << "Empty..." << std::endl; + continue; } // ... and process them auto pulse_id = process_packets(metadata, frame_buffer); - if (pulse_id != 0) { return pulse_id; } + if (pulse_id != 0) { + std::cout << "Returning frame: " << pulse_id << std::endl; + return pulse_id; } } } diff --git a/jfj-udp-recv/test/main.cpp b/jfj-udp-recv/test/main.cpp index 8f2cd01..de16fb4 100644 --- a/jfj-udp-recv/test/main.cpp +++ b/jfj-udp-recv/test/main.cpp @@ -1,6 +1,7 @@ #include "gtest/gtest.h" #include "test_PacketUdpReceiver.cpp" #include "test_FrameUdpReceiver.cpp" +#include "test_PacketBuffer.cpp" using namespace std; diff --git a/jfj-udp-recv/test/test_FrameUdpReceiver.cpp b/jfj-udp-recv/test/test_FrameUdpReceiver.cpp index 3f490c7..d32455c 100644 --- a/jfj-udp-recv/test/test_FrameUdpReceiver.cpp +++ b/jfj-udp-recv/test/test_FrameUdpReceiver.cpp @@ -10,53 +10,64 @@ using namespace std; -TEST(BufferUdpReceiver, simple_recv) -{ - int n_packets = JFJOCH_N_PACKETS_PER_FRAME; - int n_frames = 5; +void mockDetector(int& udp_socket_fd, int64_t n_frames){ uint16_t udp_port = MOCK_UDP_PORT; auto server_address = get_server_address(udp_port); - auto send_socket_fd = socket(AF_INET, SOCK_DGRAM, 0); - ASSERT_TRUE(send_socket_fd >= 0); - JfjFrameUdpReceiver udp_receiver(udp_port); + for (int64_t i_frame=0; i_frame < n_frames; i_frame++){ + for (size_t i_packet=0; i_packet= 0); + + + int n_frames = 3; + + + // Open receiver + JfjFrameUdpReceiver udp_receiver(MOCK_UDP_PORT); + + std::cout << "NI" << std::endl; + auto handle = async(launch::async, [&]{ mockDetector(send_socket_fd, n_frames); } ); + std::cout << "NI before wait()" << std::endl; handle.wait(); + std::cout << "NI after wait()" << std::endl; ModuleFrame metadata; auto frame_buffer = make_unique(JFJOCH_DATA_BYTES_PER_FRAME); + std::cout << "NI" << std::endl; for (int i_frame=0; i_frame < n_frames; i_frame++) { - auto pulse_id = udp_receiver.get_frame_from_udp( - metadata, frame_buffer.get()); + std::cout << "Getting frame: " << i_frame << std::endl; + auto pulse_id = udp_receiver.get_frame_from_udp(metadata, frame_buffer.get()); + std::cout << ".. gotcha " << pulse_id << std::endl; ASSERT_EQ(i_frame + 1, pulse_id); ASSERT_EQ(metadata.frame_index, i_frame + 1000); ASSERT_EQ(metadata.daq_rec, i_frame + 10000); // -1 because we skipped a packet. - ASSERT_EQ(metadata.n_recv_packets, n_packets); + ASSERT_EQ(metadata.n_recv_packets, JFJOCH_N_PACKETS_PER_FRAME); } + std::cout << "NI" << std::endl; ::close(send_socket_fd); } diff --git a/jfj-udp-recv/test/test_PacketBuffer.cpp b/jfj-udp-recv/test/test_PacketBuffer.cpp new file mode 100644 index 0000000..e294a6d --- /dev/null +++ b/jfj-udp-recv/test/test_PacketBuffer.cpp @@ -0,0 +1,65 @@ +#include +#include +#include "gtest/gtest.h" +#include "PacketBuffer.hpp" + +#include +#include +#include + +using namespace std; + +class MockReceiver{ + public: + int idx_packet = 42000; + int packet_per_frame = 512; + int num_bunches = 100; + int num_packets =50; + + int receive_many(mmsghdr* msgs, const size_t n_msgs){ + // Receive 'num_packets numner of packets' + + for(int ii=0; ii(msgs[ii].msg_hdr.msg_iov->iov_base); + refer.bunchid = idx_packet / packet_per_frame; + refer.packetnum = idx_packet % packet_per_frame; + idx_packet++; + } + return std::min(size_t(num_packets), n_msgs); + }; +}; + + + + + +TEST(BufferUdpReceiver, packetbuffer_simple){ + std::cout << "Testing PacketBuffer..." << std::endl; + + + PacketBuffer p_buffer; + MockReceiver mockery; + uint64_t prev_bunch, prev_packet; + jfjoch_packet_t p_pop; + + mockery.num_packets = 25; + mockery.idx_packet = 42000; + + p_buffer.fill_from(mockery); + + // First packet + ASSERT_EQ(p_buffer.peek_front().bunchid, 42000/512+1); + prev_bunch = p_buffer.peek_front().bunchid; + prev_packet = p_buffer.peek_front().packetnum; + + ASSERT_EQ(p_buffer.size(), 25); + p_pop = p_buffer.pop_front(); + ASSERT_EQ(p_buffer.size(), 24); + + ASSERT_EQ(p_pop.bunchid, prev_bunch); + ASSERT_EQ(p_pop.packetnum, prev_packet); + ASSERT_EQ(p_buffer.peek_front().bunchid, prev_bunch); + ASSERT_EQ(p_buffer.peek_front().packetnum, prev_packet+1); + std::cout << "Done..." << std::endl; + +};