Tests no longer hang with 25MB UDP buffers

This commit is contained in:
Mohacsi Istvan
2021-06-07 15:39:45 +02:00
parent 1406284d41
commit be7cd4994e
8 changed files with 133 additions and 293 deletions
+3 -4
View File
@@ -3,13 +3,13 @@
#include <cstdint>
#define JFJOCH_N_MODULES 32
#define JFJOCH_N_MODULES 8
#define JFJOCH_BYTES_PER_PACKET 8240
#define JFJOCH_DATA_BYTES_PER_PACKET 8192
#define JFJOCH_N_PACKETS_PER_FRAME (JFJOCH_N_MODULES * 128)
#define JFJOCH_DATA_BYTES_PER_FRAME (JFJOCH_N_MODULES * 1048576)
// 48 bytes + 8192 bytes = 8240 bytes
// 48 bytes + 8192 bytes = 8240 bytes (below 9000 MTU)
#pragma pack(push)
#pragma pack(2)
struct jfjoch_packet_t {
@@ -33,5 +33,4 @@ struct jfjoch_packet_t {
};
#pragma pack(pop)
#endif
#endif // JUNGFRAUJOCH_HPP
-254
View File
@@ -1,254 +0,0 @@
#include <netinet/in.h>
#include <jungfraujoch.hpp>
#include "gtest/gtest.h"
#include "PacketBuffer.hpp"
#include <thread>
#include <chrono>
#include <future>
using namespace std;
template <typename TY>
class MockReceiver<TY>{
public:
int idx_packet = 42000;
int packet_per_frame = 512;
int num_bunches = 100;
int num_packets =50;
int receive_many(mmsghdr* msgs, const size_t n_msgs){
// Receive 'num_packets numner of packets'
for(int ii=0; ii<std::min(num_packets, n_msgs); ii++){
jfjoch_packet_t& refer = std::reinterpret_cast<jfjoch_packet_t&>(mmsghdr[ii].msg_hdr.msg_iov->iov_base);
refer.bunchid = idx_packet / packet_per_frame;
refer.packetnum = idx_packet % packet_per_frame;
idx_packet++;
}
return std::min(num_packets, n_msgs);
};
};
//
//
//
//
//TEST(BufferUdpReceiver, simple_recv)
//{
// auto n_packets = JF_N_PACKETS_PER_FRAME;
// int n_frames = 5;
//
// uint16_t udp_port = MOCK_UDP_PORT;
// auto server_address = get_server_address(udp_port);
// auto send_socket_fd = socket(AF_INET, SOCK_DGRAM, 0);
// ASSERT_TRUE(send_socket_fd >= 0);
//
// JfjFrameUdpReceiver udp_receiver(udp_port);
//
// auto handle = async(launch::async, [&](){
// for (int i_frame=0; i_frame < n_frames; i_frame++){
// for (size_t i_packet=0; i_packet<n_packets; i_packet++) {
// jungfrau_packet send_udp_buffer;
// send_udp_buffer.packetnum = i_packet;
// send_udp_buffer.bunchid = i_frame + 1;
// send_udp_buffer.framenum = i_frame + 1000;
// send_udp_buffer.debug = i_frame + 10000;
//
// ::sendto(
// send_socket_fd,
// &send_udp_buffer,
// JUNGFRAU_BYTES_PER_PACKET,
// 0,
// (sockaddr*) &server_address,
// sizeof(server_address));
// }
// }
// });
//
// handle.wait();
//
// ModuleFrame metadata;
// auto frame_buffer = make_unique<char[]>(JUNGFRAU_DATA_BYTES_PER_FRAME);
//
// for (int i_frame=0; i_frame < n_frames; i_frame++) {
// auto pulse_id = udp_receiver.get_frame_from_udp(
// metadata, frame_buffer.get());
//
// ASSERT_EQ(i_frame + 1, pulse_id);
// ASSERT_EQ(metadata.frame_index, i_frame + 1000);
// ASSERT_EQ(metadata.daq_rec, i_frame + 10000);
// // -1 because we skipped a packet.
// ASSERT_EQ(metadata.n_recv_packets, n_packets);
// }
//
// ::close(send_socket_fd);
//}
//
//TEST(BufferUdpReceiver, missing_middle_packet)
//{
// auto n_packets = JF_N_PACKETS_PER_FRAME;
// int n_frames = 3;
//
// uint16_t udp_port = MOCK_UDP_PORT;
// auto server_address = get_server_address(udp_port);
// auto send_socket_fd = socket(AF_INET, SOCK_DGRAM, 0);
// ASSERT_TRUE(send_socket_fd >= 0);
//
// JfjFrameUdpReceiver udp_receiver(udp_port, source_id);
//
// auto handle = async(launch::async, [&](){
// for (int i_frame=0; i_frame < n_frames; i_frame++){
// for (size_t i_packet=0; i_packet<n_packets; i_packet++) {
// // Skip some random middle packet.
// if (i_packet == 10) {
// continue;
// }
//
// jungfrau_packet send_udp_buffer;
// send_udp_buffer.packetnum = i_packet;
// send_udp_buffer.bunchid = i_frame + 1;
// send_udp_buffer.framenum = i_frame + 1000;
// send_udp_buffer.debug = i_frame + 10000;
//
// ::sendto(
// send_socket_fd,
// &send_udp_buffer,
// JUNGFRAU_BYTES_PER_PACKET,
// 0,
// (sockaddr*) &server_address,
// sizeof(server_address));
// }
// }
// });
//
// handle.wait();
//
// ModuleFrame metadata;
// auto frame_buffer = make_unique<char[]>(JUNGFRAU_DATA_BYTES_PER_FRAME);
//
// for (int i_frame=0; i_frame < n_frames; i_frame++) {
// auto pulse_id = udp_receiver.get_frame_from_udp(
// metadata, frame_buffer.get());
//
// ASSERT_EQ(i_frame + 1, pulse_id);
// ASSERT_EQ(metadata.frame_index, i_frame + 1000);
// ASSERT_EQ(metadata.daq_rec, i_frame + 10000);
// // -1 because we skipped a packet.
// ASSERT_EQ(metadata.n_recv_packets, n_packets - 1);
// }
//
// ::close(send_socket_fd);
//}
//
//TEST(BufferUdpReceiver, missing_first_packet)
//{
// auto n_packets = JF_N_PACKETS_PER_FRAME;
// int n_frames = 3;
//
// uint16_t udp_port = MOCK_UDP_PORT;
// auto server_address = get_server_address(udp_port);
// auto send_socket_fd = socket(AF_INET, SOCK_DGRAM, 0);
// ASSERT_TRUE(send_socket_fd >= 0);
//
// JfjFrameUdpReceiver udp_receiver(udp_port);
//
// auto handle = async(launch::async, [&](){
// for (int i_frame=0; i_frame < n_frames; i_frame++){
// for (size_t i_packet=0; i_packet<n_packets; i_packet++) {
// // Skip first packet.
// if (i_packet == 0) {
// continue;
// }
//
// jungfrau_packet send_udp_buffer;
// send_udp_buffer.packetnum = i_packet;
// send_udp_buffer.bunchid = i_frame + 1;
// send_udp_buffer.framenum = i_frame + 1000;
// send_udp_buffer.debug = i_frame + 10000;
//
// ::sendto(
// send_socket_fd,
// &send_udp_buffer,
// JUNGFRAU_BYTES_PER_PACKET,
// 0,
// (sockaddr*) &server_address,
// sizeof(server_address));
// }
// }
// });
//
// handle.wait();
//
// ModuleFrame metadata;
// auto frame_buffer = make_unique<char[]>(JUNGFRAU_DATA_BYTES_PER_FRAME);
//
// for (int i_frame=0; i_frame < n_frames; i_frame++) {
// auto pulse_id = udp_receiver.get_frame_from_udp(
// metadata, frame_buffer.get());
//
// ASSERT_EQ(i_frame + 1, pulse_id);
// ASSERT_EQ(metadata.frame_index, i_frame + 1000);
// ASSERT_EQ(metadata.daq_rec, i_frame + 10000);
// // -1 because we skipped a packet.
// ASSERT_EQ(metadata.n_recv_packets, n_packets - 1);
// }
//
// ::close(send_socket_fd);
//}
//
//TEST(BufferUdpReceiver, missing_last_packet)
//{
// auto n_packets = JF_N_PACKETS_PER_FRAME;
// int n_frames = 3;
//
// uint16_t udp_port = MOCK_UDP_PORT;
// auto server_address = get_server_address(udp_port);
// auto send_socket_fd = socket(AF_INET, SOCK_DGRAM, 0);
// ASSERT_TRUE(send_socket_fd >= 0);
//
// JfjFrameUdpReceiver udp_receiver(udp_port);
//
// auto handle = async(launch::async, [&](){
// for (int i_frame=0; i_frame < n_frames; i_frame++){
// for (size_t i_packet=0; i_packet<n_packets; i_packet++) {
// // Skip the last packet.
// if (i_packet == n_packets-1) {
// continue;
// }
//
// jungfrau_packet send_udp_buffer;
// send_udp_buffer.packetnum = i_packet;
// send_udp_buffer.bunchid = i_frame + 1;
// send_udp_buffer.framenum = i_frame + 1000;
// send_udp_buffer.debug = i_frame + 10000;
//
// ::sendto(
// send_socket_fd,
// &send_udp_buffer,
// JUNGFRAU_BYTES_PER_PACKET,
// 0,
// (sockaddr*) &server_address,
// sizeof(server_address));
// }
// }
// });
//
// handle.wait();
//
// ModuleFrame metadata;
// auto frame_buffer = make_unique<char[]>(JUNGFRAU_DATA_BYTES_PER_FRAME);
//
// // n_frames -1 because the last frame is not complete.
// for (int i_frame=0; i_frame < n_frames - 1; i_frame++) {
// auto pulse_id = udp_receiver.get_frame_from_udp(metadata, frame_buffer.get());
//
// ASSERT_EQ(i_frame + 1, pulse_id);
// ASSERT_EQ(metadata.frame_index, i_frame + 1000);
// ASSERT_EQ(metadata.daq_rec, i_frame + 10000);
// // -1 because we skipped a packet.
// ASSERT_EQ(metadata.n_recv_packets, n_packets - 1);
// }
//
// ::close(send_socket_fd);
//}
+2 -1
View File
@@ -15,7 +15,8 @@
**/
class JfjFrameUdpReceiver {
PacketUdpReceiver m_udp_receiver;
uint64_t m_frame_index;
bool is_initialized = false;
uint64_t m_frame_index = 0;
PacketBuffer<jfjoch_packet_t, buffer_config::BUFFER_UDP_N_RECV_MSG> m_buffer;
+1
View File
@@ -52,6 +52,7 @@ public:
void fill_from(TY& recv){
std::lock_guard<std::mutex> g_guard(m_mutex);
this->idx_write = recv.receive_many(m_msgs, this->capacity());
// std::cout << "Received " << this->idx_write << " frames" << std::endl;
// Returns -1 with errno=11 if no data received
if(idx_write==-1){ idx_write = 0; }
this->idx_read = 0;
+23 -7
View File
@@ -34,13 +34,19 @@ inline uint64_t JfjFrameUdpReceiver::process_packets(ModuleFrame& metadata, char
while(!m_buffer.is_empty()){
// Happens if the last packet from the previous frame gets lost.
if (m_frame_index != m_buffer.peek_front().framenum) {
std::cout << "Metadata pulse: " << metadata.pulse_id << "\tIndex: " << m_frame_index << "\tCurrent one is: " << m_buffer.peek_front().framenum << std::endl;
m_frame_index = m_buffer.peek_front().framenum;
std::cout << "Peeked pulse: " << metadata.pulse_id << std::endl;
return metadata.pulse_id;
if(this->is_initialized){
return metadata.pulse_id;
}else{
this->is_initialized = true;
}
}
// Otherwise pop the queue (and set current frame index)
jfjoch_packet_t& c_packet = m_buffer.pop_front();
std::cout << c_packet << std::endl;
m_frame_index = c_packet.framenum;
// Always copy metadata (otherwise problem when 0th packet gets lost)
@@ -69,18 +75,28 @@ uint64_t JfjFrameUdpReceiver::get_frame_from_udp(ModuleFrame& metadata, char* fr
memset(frame_buffer, 0, JFJOCH_DATA_BYTES_PER_FRAME);
// Process leftover packages in the buffer
if (!m_buffer.is_empty()) {
std::cout << "Leftovers..." << std::endl;
auto pulse_id = process_packets(metadata, frame_buffer);
if (pulse_id != 0) { return pulse_id; }
if (pulse_id != 0) {
std::cout << "Returning frame: " << pulse_id << std::endl;
return pulse_id; }
}
while (true) {
// Receive new packages (pass if none)...
m_buffer.reset();
m_buffer.fill_from(m_udp_receiver);
if (m_buffer.is_empty()) { continue; }
std::cout << "Really new..." << std::endl;
// m_buffer.reset();
m_buffer.fill_from(m_udp_receiver);
std::cout << "Got " << m_buffer.size() << std::endl;
if (m_buffer.is_empty()) {
std::cout << "Empty..." << std::endl;
continue; }
// ... and process them
auto pulse_id = process_packets(metadata, frame_buffer);
if (pulse_id != 0) { return pulse_id; }
if (pulse_id != 0) {
std::cout << "Returning frame: " << pulse_id << std::endl;
return pulse_id; }
}
}
+1
View File
@@ -1,6 +1,7 @@
#include "gtest/gtest.h"
#include "test_PacketUdpReceiver.cpp"
#include "test_FrameUdpReceiver.cpp"
#include "test_PacketBuffer.cpp"
using namespace std;
+38 -27
View File
@@ -10,53 +10,64 @@
using namespace std;
TEST(BufferUdpReceiver, simple_recv)
{
int n_packets = JFJOCH_N_PACKETS_PER_FRAME;
int n_frames = 5;
void mockDetector(int& udp_socket_fd, int64_t n_frames){
uint16_t udp_port = MOCK_UDP_PORT;
auto server_address = get_server_address(udp_port);
auto send_socket_fd = socket(AF_INET, SOCK_DGRAM, 0);
ASSERT_TRUE(send_socket_fd >= 0);
JfjFrameUdpReceiver udp_receiver(udp_port);
for (int64_t i_frame=0; i_frame < n_frames; i_frame++){
for (size_t i_packet=0; i_packet<JFJOCH_N_PACKETS_PER_FRAME; i_packet++) {
jfjoch_packet_t send_udp_buffer;
send_udp_buffer.packetnum = i_packet;
send_udp_buffer.bunchid = i_frame + 1;
send_udp_buffer.framenum = i_frame + 1000;
send_udp_buffer.debug = i_frame + 10000;
auto handle = async(launch::async, [&](){
for (int64_t i_frame=0; i_frame < n_frames; i_frame++){
for (size_t i_packet=0; i_packet<n_packets; i_packet++) {
jfjoch_packet_t send_udp_buffer;
send_udp_buffer.packetnum = i_packet;
send_udp_buffer.bunchid = i_frame + 1;
send_udp_buffer.framenum = i_frame + 1000;
send_udp_buffer.debug = i_frame + 10000;
::sendto(udp_socket_fd, &send_udp_buffer, sizeof(send_udp_buffer),
0, (sockaddr*) &server_address, sizeof(server_address));
::sendto(
send_socket_fd,
&send_udp_buffer,
JFJOCH_BYTES_PER_PACKET,
0,
(sockaddr*) &server_address,
sizeof(server_address));
}
}
});
}
std::cout << "Sent " << n_frames << " frames\t" << JFJOCH_N_PACKETS_PER_FRAME*n_frames << " packets" << std::endl;
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
TEST(BufferUdpReceiver, simple_recv){
// Open detector port
auto send_socket_fd = socket(AF_INET, SOCK_DGRAM, 0);
ASSERT_TRUE(send_socket_fd >= 0);
int n_frames = 3;
// Open receiver
JfjFrameUdpReceiver udp_receiver(MOCK_UDP_PORT);
std::cout << "NI" << std::endl;
auto handle = async(launch::async, [&]{ mockDetector(send_socket_fd, n_frames); } );
std::cout << "NI before wait()" << std::endl;
handle.wait();
std::cout << "NI after wait()" << std::endl;
ModuleFrame metadata;
auto frame_buffer = make_unique<char[]>(JFJOCH_DATA_BYTES_PER_FRAME);
std::cout << "NI" << std::endl;
for (int i_frame=0; i_frame < n_frames; i_frame++) {
auto pulse_id = udp_receiver.get_frame_from_udp(
metadata, frame_buffer.get());
std::cout << "Getting frame: " << i_frame << std::endl;
auto pulse_id = udp_receiver.get_frame_from_udp(metadata, frame_buffer.get());
std::cout << ".. gotcha " << pulse_id << std::endl;
ASSERT_EQ(i_frame + 1, pulse_id);
ASSERT_EQ(metadata.frame_index, i_frame + 1000);
ASSERT_EQ(metadata.daq_rec, i_frame + 10000);
// -1 because we skipped a packet.
ASSERT_EQ(metadata.n_recv_packets, n_packets);
ASSERT_EQ(metadata.n_recv_packets, JFJOCH_N_PACKETS_PER_FRAME);
}
std::cout << "NI" << std::endl;
::close(send_socket_fd);
}
+65
View File
@@ -0,0 +1,65 @@
#include <netinet/in.h>
#include <jungfraujoch.hpp>
#include "gtest/gtest.h"
#include "PacketBuffer.hpp"
#include <thread>
#include <chrono>
#include <future>
using namespace std;
class MockReceiver{
public:
int idx_packet = 42000;
int packet_per_frame = 512;
int num_bunches = 100;
int num_packets =50;
int receive_many(mmsghdr* msgs, const size_t n_msgs){
// Receive 'num_packets numner of packets'
for(int ii=0; ii<std::min(size_t(num_packets), n_msgs); ii++){
jfjoch_packet_t& refer = reinterpret_cast<jfjoch_packet_t&>(msgs[ii].msg_hdr.msg_iov->iov_base);
refer.bunchid = idx_packet / packet_per_frame;
refer.packetnum = idx_packet % packet_per_frame;
idx_packet++;
}
return std::min(size_t(num_packets), n_msgs);
};
};
TEST(BufferUdpReceiver, packetbuffer_simple){
std::cout << "Testing PacketBuffer..." << std::endl;
PacketBuffer<jfjoch_packet_t, 128> p_buffer;
MockReceiver mockery;
uint64_t prev_bunch, prev_packet;
jfjoch_packet_t p_pop;
mockery.num_packets = 25;
mockery.idx_packet = 42000;
p_buffer.fill_from(mockery);
// First packet
ASSERT_EQ(p_buffer.peek_front().bunchid, 42000/512+1);
prev_bunch = p_buffer.peek_front().bunchid;
prev_packet = p_buffer.peek_front().packetnum;
ASSERT_EQ(p_buffer.size(), 25);
p_pop = p_buffer.pop_front();
ASSERT_EQ(p_buffer.size(), 24);
ASSERT_EQ(p_pop.bunchid, prev_bunch);
ASSERT_EQ(p_pop.packetnum, prev_packet);
ASSERT_EQ(p_buffer.peek_front().bunchid, prev_bunch);
ASSERT_EQ(p_buffer.peek_front().packetnum, prev_packet+1);
std::cout << "Done..." << std::endl;
};