Lot of fixes from testing

This commit is contained in:
Mohacsi Istvan
2021-06-14 16:05:26 +02:00
parent be7cd4994e
commit a1c9e5c1fd
6 changed files with 139 additions and 169 deletions
+2 -3
View File
@@ -3,11 +3,10 @@
#include <cstdint>
#define JFJOCH_N_MODULES 8
#define JFJOCH_BYTES_PER_PACKET 8240
#define JFJOCH_DATA_BYTES_PER_PACKET 8192
#define JFJOCH_N_PACKETS_PER_FRAME (JFJOCH_N_MODULES * 128)
#define JFJOCH_DATA_BYTES_PER_FRAME (JFJOCH_N_MODULES * 1048576)
#define JFJOCH_N_PACKETS_PER_MODULE 128
#define JFJOCH_DATA_BYTES_PER_MODULE 1048576
// 48 bytes + 8192 bytes = 8240 bytes (below 9000 MTU)
#pragma pack(push)
+10 -7
View File
@@ -1,5 +1,5 @@
#ifndef SF_DAQ_BUFFER_JOCHUDPRECEIVER_HPP
#define SF_DAQ_BUFFER_JOCHUDPRECEIVER_HPP
#ifndef SF_DAQ_BUFFER_JFJ_UDPRECEIVER_HPP
#define SF_DAQ_BUFFER_JFJ_UDPRECEIVER_HPP
#include <netinet/in.h>
#include "PacketUdpReceiver.hpp"
@@ -15,19 +15,22 @@
**/
class JfjFrameUdpReceiver {
PacketUdpReceiver m_udp_receiver;
bool is_initialized = false;
bool in_progress = false;
uint64_t m_frame_index = 0;
const uint64_t m_num_modules;
const uint64_t m_num_packets;
const uint64_t m_num_data_bytes;
PacketBuffer<jfjoch_packet_t, buffer_config::BUFFER_UDP_N_RECV_MSG> m_buffer;
// PacketBuffer<jfjoch_packet_t, buffer_config::BUFFER_UDP_N_RECV_MSG> m_buffer;
PacketBuffer<jfjoch_packet_t, 64> m_buffer;
inline void init_frame(ModuleFrame& frame_metadata, const jfjoch_packet_t& c_packet);
inline uint64_t process_packets(ModuleFrame& metadata, char* frame_buffer);
public:
JfjFrameUdpReceiver(const uint16_t port);
JfjFrameUdpReceiver(const uint16_t port, uint64_t n_modules = 8);
virtual ~JfjFrameUdpReceiver();
uint64_t get_frame_from_udp(ModuleFrame& metadata, char* frame_buffer);
};
#endif //SF_DAQ_BUFFER_JOCHUDPRECEIVER_HPP
#endif //SF_DAQ_BUFFER_JFJ_UDPRECEIVER_HPP
+22 -33
View File
@@ -4,15 +4,11 @@
using namespace std;
using namespace buffer_config;
std::ostream &operator<<(std::ostream &os, jfjoch_packet_t const &packet) {
os << "Frame number: " << packet.framenum << std::endl;
os << "Packet number: " << packet.packetnum << std::endl;
os << "Bunch id: " << packet.bunchid << std::endl;
os << std::endl;
return os;
}
JfjFrameUdpReceiver::JfjFrameUdpReceiver(const uint16_t port) {
JfjFrameUdpReceiver::JfjFrameUdpReceiver(const uint16_t port, uint64_t n_modules):
m_num_modules(n_modules), m_num_packets(n_modules*JFJOCH_N_PACKETS_PER_MODULE),
m_num_data_bytes(n_modules*JFJOCH_DATA_BYTES_PER_MODULE) {
m_udp_receiver.bind(port);
}
@@ -20,9 +16,7 @@ JfjFrameUdpReceiver::~JfjFrameUdpReceiver() {
m_udp_receiver.disconnect();
}
inline void JfjFrameUdpReceiver::init_frame(ModuleFrame& metadata, const jfjoch_packet_t& c_packet) {
// std::cout << c_packet;
inline void JfjFrameUdpReceiver::init_frame(ModuleFrame& metadata, const jfjoch_packet_t& c_packet) {
metadata.pulse_id = c_packet.bunchid;
metadata.frame_index = c_packet.framenum;
metadata.daq_rec = (uint64_t) c_packet.debug;
@@ -33,21 +27,18 @@ inline uint64_t JfjFrameUdpReceiver::process_packets(ModuleFrame& metadata, char
while(!m_buffer.is_empty()){
// Happens if the last packet from the previous frame gets lost.
if (m_frame_index != m_buffer.peek_front().framenum) {
std::cout << "Metadata pulse: " << metadata.pulse_id << "\tIndex: " << m_frame_index << "\tCurrent one is: " << m_buffer.peek_front().framenum << std::endl;
if (m_frame_index != m_buffer.peek_front().framenum) {
m_frame_index = m_buffer.peek_front().framenum;
if(this->is_initialized){
if(this->in_progress){
this->in_progress = false;
return metadata.pulse_id;
}else{
this->is_initialized = true;
}
}
// Otherwise pop the queue (and set current frame index)
jfjoch_packet_t& c_packet = m_buffer.pop_front();
std::cout << c_packet << std::endl;
m_frame_index = c_packet.framenum;
this->in_progress = true;
// Always copy metadata (otherwise problem when 0th packet gets lost)
this->init_frame(metadata, c_packet);
@@ -58,7 +49,8 @@ inline uint64_t JfjFrameUdpReceiver::process_packets(ModuleFrame& metadata, char
metadata.n_recv_packets++;
// Last frame packet received. Frame finished.
if (c_packet.packetnum == JFJOCH_N_PACKETS_PER_FRAME - 1){
if (c_packet.packetnum == m_num_packets - 1){
this->in_progress = false;
return metadata.pulse_id;
}
}
@@ -70,33 +62,30 @@ inline uint64_t JfjFrameUdpReceiver::process_packets(ModuleFrame& metadata, char
uint64_t JfjFrameUdpReceiver::get_frame_from_udp(ModuleFrame& metadata, char* frame_buffer){
// Reset the metadata and frame buffer for the next frame. (really needed?)
std::cout << "Asking for next frame..." << std::endl;
metadata.pulse_id = 0;
metadata.n_recv_packets = 0;
memset(frame_buffer, 0, JFJOCH_DATA_BYTES_PER_FRAME);
memset(frame_buffer, 0, m_num_data_bytes);
// Process leftover packages in the buffer
if (!m_buffer.is_empty()) {
std::cout << "Leftovers..." << std::endl;
if (!m_buffer.is_empty()) {
auto pulse_id = process_packets(metadata, frame_buffer);
if (pulse_id != 0) {
std::cout << "Returning frame: " << pulse_id << std::endl;
return pulse_id; }
if (pulse_id != 0) { return pulse_id; }
}
while (true) {
// Receive new packages (pass if none)...
std::cout << "Really new..." << std::endl;
std::cout << "Fetching new data..." << std::endl;
// m_buffer.reset();
m_buffer.fill_from(m_udp_receiver);
std::cout << "Got " << m_buffer.size() << std::endl;
if (m_buffer.is_empty()) {
std::cout << "Empty..." << std::endl;
continue; }
if (m_buffer.is_empty()) { continue; }
// ... and process them
auto pulse_id = process_packets(metadata, frame_buffer);
if (pulse_id != 0) {
std::cout << "Returning frame: " << pulse_id << std::endl;
return pulse_id; }
if (pulse_id != 0) { return pulse_id; }
}
}
+2 -2
View File
@@ -28,7 +28,7 @@ int main (int argc, char *argv[]) {
const auto config = read_json_config(string(argv[1]));
const auto udp_port = config.start_udp_port;
JfjFrameUdpReceiver receiver(udp_port);
JfjFrameUdpReceiver receiver(udp_port, 8);
RamBuffer buffer(config.detector_name, config.n_modules);
FrameStats stats(config.detector_name, 0, STATS_TIME);
@@ -39,7 +39,7 @@ int main (int argc, char *argv[]) {
// Might be better creating a structure for double buffering
ModuleFrame frameMeta;
ImageMetadata imageMeta;
char* dataBuffer = new char[JFJOCH_DATA_BYTES_PER_FRAME];
char* dataBuffer = new char[8 * JFJOCH_DATA_BYTES_PER_MODULE];
uint64_t pulse_id_previous = 0;
uint64_t frame_index_previous = 0;
+69 -110
View File
@@ -9,72 +9,11 @@
#include <future>
using namespace std;
#define NUM_TEST_MODULES 3
void mockDetector(int& udp_socket_fd, int64_t n_frames){
uint16_t udp_port = MOCK_UDP_PORT;
auto server_address = get_server_address(udp_port);
for (int64_t i_frame=0; i_frame < n_frames; i_frame++){
for (size_t i_packet=0; i_packet<JFJOCH_N_PACKETS_PER_FRAME; i_packet++) {
jfjoch_packet_t send_udp_buffer;
send_udp_buffer.packetnum = i_packet;
send_udp_buffer.bunchid = i_frame + 1;
send_udp_buffer.framenum = i_frame + 1000;
send_udp_buffer.debug = i_frame + 10000;
::sendto(udp_socket_fd, &send_udp_buffer, sizeof(send_udp_buffer),
0, (sockaddr*) &server_address, sizeof(server_address));
}
}
std::cout << "Sent " << n_frames << " frames\t" << JFJOCH_N_PACKETS_PER_FRAME*n_frames << " packets" << std::endl;
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
TEST(BufferUdpReceiver, simple_recv){
// Open detector port
auto send_socket_fd = socket(AF_INET, SOCK_DGRAM, 0);
ASSERT_TRUE(send_socket_fd >= 0);
int n_frames = 3;
// Open receiver
JfjFrameUdpReceiver udp_receiver(MOCK_UDP_PORT);
std::cout << "NI" << std::endl;
auto handle = async(launch::async, [&]{ mockDetector(send_socket_fd, n_frames); } );
std::cout << "NI before wait()" << std::endl;
handle.wait();
std::cout << "NI after wait()" << std::endl;
ModuleFrame metadata;
auto frame_buffer = make_unique<char[]>(JFJOCH_DATA_BYTES_PER_FRAME);
std::cout << "NI" << std::endl;
for (int i_frame=0; i_frame < n_frames; i_frame++) {
std::cout << "Getting frame: " << i_frame << std::endl;
auto pulse_id = udp_receiver.get_frame_from_udp(metadata, frame_buffer.get());
std::cout << ".. gotcha " << pulse_id << std::endl;
ASSERT_EQ(i_frame + 1, pulse_id);
ASSERT_EQ(metadata.frame_index, i_frame + 1000);
ASSERT_EQ(metadata.daq_rec, i_frame + 10000);
// -1 because we skipped a packet.
ASSERT_EQ(metadata.n_recv_packets, JFJOCH_N_PACKETS_PER_FRAME);
}
std::cout << "NI" << std::endl;
::close(send_socket_fd);
}
TEST(BufferUdpReceiver, missing_middle_packet)
{
int n_packets = JFJOCH_N_PACKETS_PER_FRAME;
int n_packets = NUM_TEST_MODULES * JFJOCH_N_PACKETS_PER_MODULE;
int n_frames = 3;
uint16_t udp_port = MOCK_UDP_PORT;
@@ -82,15 +21,56 @@ TEST(BufferUdpReceiver, missing_middle_packet)
auto send_socket_fd = socket(AF_INET, SOCK_DGRAM, 0);
ASSERT_TRUE(send_socket_fd >= 0);
JfjFrameUdpReceiver udp_receiver(udp_port);
JfjFrameUdpReceiver udp_receiver(udp_port, NUM_TEST_MODULES);
auto handle = async(launch::async, [&](){
for (int64_t i_frame=0; i_frame < n_frames; i_frame++){
for (size_t i_packet=0; i_packet<n_packets; i_packet++) {
jfjoch_packet_t send_udp_buffer;
send_udp_buffer.packetnum = i_packet;
send_udp_buffer.bunchid = i_frame + 1;
send_udp_buffer.framenum = i_frame + 1000;
send_udp_buffer.debug = i_frame + 10000;
::sendto(send_socket_fd, &send_udp_buffer, JFJOCH_BYTES_PER_PACKET,
0, (sockaddr*) &server_address, sizeof(server_address));
}
}
});
handle.wait();
ModuleFrame metadata;
auto frame_buffer = make_unique<char[]>(NUM_TEST_MODULES*JFJOCH_DATA_BYTES_PER_MODULE);
for (int i_frame=0; i_frame < n_frames; i_frame++) {
auto pulse_id = udp_receiver.get_frame_from_udp(metadata, frame_buffer.get());
ASSERT_EQ(i_frame + 1, pulse_id);
ASSERT_EQ(metadata.frame_index, i_frame + 1000);
ASSERT_EQ(metadata.daq_rec, i_frame + 10000);
ASSERT_EQ(metadata.n_recv_packets, n_packets);
}
::close(send_socket_fd);
}
TEST(BufferUdpReceiver, missing_middle_packet){
int n_packets = NUM_TEST_MODULES * JFJOCH_N_PACKETS_PER_MODULE;
int n_frames = 3;
uint16_t udp_port = MOCK_UDP_PORT;
auto server_address = get_server_address(udp_port);
auto send_socket_fd = socket(AF_INET, SOCK_DGRAM, 0);
ASSERT_TRUE(send_socket_fd >= 0);
JfjFrameUdpReceiver udp_receiver(udp_port, NUM_TEST_MODULES);
auto handle = async(launch::async, [&](){
for (int64_t i_frame=0; i_frame < n_frames; i_frame++){
for (size_t i_packet=0; i_packet<n_packets; i_packet++) {
// Skip some random middle packet.
if (i_packet == 10) {
continue;
}
if (i_packet == 10) { continue; }
jfjoch_packet_t send_udp_buffer;
send_udp_buffer.packetnum = i_packet;
@@ -98,13 +78,8 @@ TEST(BufferUdpReceiver, missing_middle_packet)
send_udp_buffer.framenum = i_frame + 1000;
send_udp_buffer.debug = i_frame + 10000;
::sendto(
send_socket_fd,
&send_udp_buffer,
JFJOCH_BYTES_PER_PACKET,
0,
(sockaddr*) &server_address,
sizeof(server_address));
::sendto(send_socket_fd, &send_udp_buffer, JFJOCH_BYTES_PER_PACKET,
0, (sockaddr*) &server_address, sizeof(server_address));
}
}
});
@@ -112,7 +87,7 @@ TEST(BufferUdpReceiver, missing_middle_packet)
handle.wait();
ModuleFrame metadata;
auto frame_buffer = make_unique<char[]>(JFJOCH_DATA_BYTES_PER_FRAME);
auto frame_buffer = make_unique<char[]>(NUM_TEST_MODULES * JFJOCH_DATA_BYTES_PER_MODULE);
for (int i_frame=0; i_frame < n_frames; i_frame++) {
auto pulse_id = udp_receiver.get_frame_from_udp(
@@ -128,9 +103,8 @@ TEST(BufferUdpReceiver, missing_middle_packet)
::close(send_socket_fd);
}
TEST(BufferUdpReceiver, missing_first_packet)
{
auto n_packets = JFJOCH_N_PACKETS_PER_FRAME;
TEST(BufferUdpReceiver, missing_first_packet){
auto n_packets = NUM_TEST_MODULES * JFJOCH_N_PACKETS_PER_MODULE;
int n_frames = 3;
uint16_t udp_port = MOCK_UDP_PORT;
@@ -138,15 +112,13 @@ TEST(BufferUdpReceiver, missing_first_packet)
auto send_socket_fd = socket(AF_INET, SOCK_DGRAM, 0);
ASSERT_TRUE(send_socket_fd >= 0);
JfjFrameUdpReceiver udp_receiver(udp_port);
JfjFrameUdpReceiver udp_receiver(udp_port, NUM_TEST_MODULES);
auto handle = async(launch::async, [&](){
for (int64_t i_frame=0; i_frame < n_frames; i_frame++){
for (size_t i_packet=0; i_packet<n_packets; i_packet++) {
// Skip first packet.
if (i_packet == 0) {
continue;
}
if (i_packet == 0) {continue;}
jfjoch_packet_t send_udp_buffer;
send_udp_buffer.packetnum = i_packet;
@@ -154,13 +126,8 @@ TEST(BufferUdpReceiver, missing_first_packet)
send_udp_buffer.framenum = i_frame + 1000;
send_udp_buffer.debug = i_frame + 10000;
::sendto(
send_socket_fd,
&send_udp_buffer,
JUNGFRAU_BYTES_PER_PACKET,
0,
(sockaddr*) &server_address,
sizeof(server_address));
::sendto(send_socket_fd, &send_udp_buffer, JUNGFRAU_BYTES_PER_PACKET,
0, (sockaddr*) &server_address, sizeof(server_address));
}
}
});
@@ -168,41 +135,37 @@ TEST(BufferUdpReceiver, missing_first_packet)
handle.wait();
ModuleFrame metadata;
auto frame_buffer = make_unique<char[]>(JFJOCH_DATA_BYTES_PER_FRAME);
auto frame_buffer = make_unique<char[]>(NUM_TEST_MODULES * JFJOCH_DATA_BYTES_PER_MODULE);
for (int i_frame=0; i_frame < n_frames; i_frame++) {
auto pulse_id = udp_receiver.get_frame_from_udp(
metadata, frame_buffer.get());
auto pulse_id = udp_receiver.get_frame_from_udp(metadata, frame_buffer.get());
ASSERT_EQ(i_frame + 1, pulse_id);
ASSERT_EQ(metadata.frame_index, i_frame + 1000);
ASSERT_EQ(metadata.daq_rec, i_frame + 10000);
// -1 because we skipped a packet.
// -2 because we skipped a packet.
ASSERT_EQ(metadata.n_recv_packets, n_packets - 1);
}
::close(send_socket_fd);
}
TEST(BufferUdpReceiver, missing_last_packet)
{
int n_packets = JFJOCH_N_PACKETS_PER_FRAME;
int n_frames = 3;
TEST(BufferUdpReceiver, missing_last_packet){
int n_packets = NUM_TEST_MODULES * JFJOCH_N_PACKETS_PER_MODULE;
int n_frames = 4;
uint16_t udp_port = MOCK_UDP_PORT;
auto server_address = get_server_address(udp_port);
auto send_socket_fd = socket(AF_INET, SOCK_DGRAM, 0);
ASSERT_TRUE(send_socket_fd >= 0);
JfjFrameUdpReceiver udp_receiver(udp_port);
JfjFrameUdpReceiver udp_receiver(udp_port, NUM_TEST_MODULES);
auto handle = async(launch::async, [&](){
for (int64_t i_frame=0; i_frame < n_frames; i_frame++){
for (int64_t i_frame=0; i_frame < n_frames+1; i_frame++){
for (size_t i_packet=0; i_packet<n_packets; i_packet++) {
// Skip the last packet.
if (i_packet == n_packets-1) {
continue;
}
if (i_packet == n_packets-1) {continue;}
jfjoch_packet_t send_udp_buffer;
send_udp_buffer.packetnum = i_packet;
@@ -210,13 +173,8 @@ TEST(BufferUdpReceiver, missing_last_packet)
send_udp_buffer.framenum = i_frame + 1000;
send_udp_buffer.debug = i_frame + 10000;
::sendto(
send_socket_fd,
&send_udp_buffer,
JUNGFRAU_BYTES_PER_PACKET,
0,
(sockaddr*) &server_address,
sizeof(server_address));
::sendto(send_socket_fd, &send_udp_buffer, JUNGFRAU_BYTES_PER_PACKET,
0, (sockaddr*) &server_address, sizeof(server_address));
}
}
});
@@ -224,11 +182,12 @@ TEST(BufferUdpReceiver, missing_last_packet)
handle.wait();
ModuleFrame metadata;
auto frame_buffer = make_unique<char[]>(JFJOCH_DATA_BYTES_PER_FRAME);
auto frame_buffer = make_unique<char[]>(NUM_TEST_MODULES * JFJOCH_DATA_BYTES_PER_MODULE);
// n_frames -1 because the last frame is not complete.
for (int i_frame=0; i_frame < n_frames - 1; i_frame++) {
auto pulse_id = udp_receiver.get_frame_from_udp(metadata, frame_buffer.get());
std::cout << "Retrieved pulse_id: " << pulse_id << std::endl;
ASSERT_EQ(i_frame + 1, pulse_id);
ASSERT_EQ(metadata.frame_index, i_frame + 1000);
+34 -14
View File
@@ -9,50 +9,70 @@
using namespace std;
std::ostream &operator<<(std::ostream &os, jfjoch_packet_t const &packet) {
os << "Frame number: " << packet.framenum << std::endl;
os << "Packet number: " << packet.packetnum << std::endl;
os << "Bunch id: " << packet.bunchid << std::endl;
os << std::endl;
return os;
}
class MockReceiver{
public:
int idx_packet = 42000;
int packet_per_frame = 512;
int num_bunches = 100;
int num_packets =50;
uint64_t idx_packet = 42000;
uint64_t packet_per_frame = 512;
uint64_t num_bunches = 100;
uint64_t num_packets =50;
int receive_many(mmsghdr* msgs, const size_t n_msgs){
uint64_t receive_many(mmsghdr* msgs, const size_t n_msgs){
// Receive 'num_packets numner of packets'
for(int ii=0; ii<std::min(size_t(num_packets), n_msgs); ii++){
for(int ii=0; ii<num_packets; ii++){
std::cout << "Buffer length: " << msgs[ii].msg_hdr.msg_iov->iov_len << "\tExpected: " << sizeof(jfjoch_packet_t) << std::endl;
jfjoch_packet_t& refer = reinterpret_cast<jfjoch_packet_t&>(msgs[ii].msg_hdr.msg_iov->iov_base);
refer.framenum = idx_packet / packet_per_frame;
refer.bunchid = idx_packet / packet_per_frame;
refer.packetnum = idx_packet % packet_per_frame;
std::cout << refer << "\n";
idx_packet++;
}
return std::min(size_t(num_packets), n_msgs);
return num_packets;
};
};
TEST(BufferUdpReceiver, packetbuffer_simple){
std::cout << "Testing PacketBuffer..." << std::endl;
PacketBuffer<jfjoch_packet_t, 128> p_buffer;
MockReceiver mockery;
uint64_t prev_bunch, prev_packet;
jfjoch_packet_t p_pop;
mockery.idx_packet = 7*512 + 13;
mockery.num_packets = 25;
mockery.idx_packet = 42000;
p_buffer.fill_from(mockery);
// First packet
ASSERT_EQ(p_buffer.peek_front().bunchid, 42000/512+1);
ASSERT_FALSE(p_buffer.is_empty());
ASSERT_EQ(p_buffer.size(), 25);
std::cout << "Current packet:\n" << p_buffer.peek_front();
ASSERT_EQ(p_buffer.peek_front().bunchid, 7);
ASSERT_EQ(p_buffer.peek_front().packetnum, 13);
prev_bunch = p_buffer.peek_front().bunchid;
prev_packet = p_buffer.peek_front().packetnum;
ASSERT_EQ(p_buffer.size(), 25);
p_pop = p_buffer.pop_front();
ASSERT_EQ(p_buffer.size(), 24);