From a1c9e5c1fdda7d0b2bf319ba7e0b94c308890fd2 Mon Sep 17 00:00:00 2001 From: Mohacsi Istvan Date: Mon, 14 Jun 2021 16:05:26 +0200 Subject: [PATCH] Lot of fixes from testing --- core-buffer/include/jungfraujoch.hpp | 5 +- jfj-udp-recv/include/JfjFrameUdpReceiver.hpp | 17 +- jfj-udp-recv/src/JfjFrameUdpReceiver.cpp | 55 +++--- jfj-udp-recv/src/main.cpp | 4 +- jfj-udp-recv/test/test_FrameUdpReceiver.cpp | 179 +++++++------------ jfj-udp-recv/test/test_PacketBuffer.cpp | 48 +++-- 6 files changed, 139 insertions(+), 169 deletions(-) diff --git a/core-buffer/include/jungfraujoch.hpp b/core-buffer/include/jungfraujoch.hpp index 87ae5ca..8ade068 100644 --- a/core-buffer/include/jungfraujoch.hpp +++ b/core-buffer/include/jungfraujoch.hpp @@ -3,11 +3,10 @@ #include -#define JFJOCH_N_MODULES 8 #define JFJOCH_BYTES_PER_PACKET 8240 #define JFJOCH_DATA_BYTES_PER_PACKET 8192 -#define JFJOCH_N_PACKETS_PER_FRAME (JFJOCH_N_MODULES * 128) -#define JFJOCH_DATA_BYTES_PER_FRAME (JFJOCH_N_MODULES * 1048576) +#define JFJOCH_N_PACKETS_PER_MODULE 128 +#define JFJOCH_DATA_BYTES_PER_MODULE 1048576 // 48 bytes + 8192 bytes = 8240 bytes (below 9000 MTU) #pragma pack(push) diff --git a/jfj-udp-recv/include/JfjFrameUdpReceiver.hpp b/jfj-udp-recv/include/JfjFrameUdpReceiver.hpp index a575201..4b84647 100644 --- a/jfj-udp-recv/include/JfjFrameUdpReceiver.hpp +++ b/jfj-udp-recv/include/JfjFrameUdpReceiver.hpp @@ -1,5 +1,5 @@ -#ifndef SF_DAQ_BUFFER_JOCHUDPRECEIVER_HPP -#define SF_DAQ_BUFFER_JOCHUDPRECEIVER_HPP +#ifndef SF_DAQ_BUFFER_JFJ_UDPRECEIVER_HPP +#define SF_DAQ_BUFFER_JFJ_UDPRECEIVER_HPP #include #include "PacketUdpReceiver.hpp" @@ -15,19 +15,22 @@ **/ class JfjFrameUdpReceiver { PacketUdpReceiver m_udp_receiver; - bool is_initialized = false; + bool in_progress = false; uint64_t m_frame_index = 0; + const uint64_t m_num_modules; + const uint64_t m_num_packets; + const uint64_t m_num_data_bytes; - PacketBuffer m_buffer; + // PacketBuffer m_buffer; + PacketBuffer m_buffer; inline void init_frame(ModuleFrame& frame_metadata, const jfjoch_packet_t& c_packet); inline uint64_t process_packets(ModuleFrame& metadata, char* frame_buffer); public: - JfjFrameUdpReceiver(const uint16_t port); + JfjFrameUdpReceiver(const uint16_t port, uint64_t n_modules = 8); virtual ~JfjFrameUdpReceiver(); uint64_t get_frame_from_udp(ModuleFrame& metadata, char* frame_buffer); }; - -#endif //SF_DAQ_BUFFER_JOCHUDPRECEIVER_HPP +#endif //SF_DAQ_BUFFER_JFJ_UDPRECEIVER_HPP diff --git a/jfj-udp-recv/src/JfjFrameUdpReceiver.cpp b/jfj-udp-recv/src/JfjFrameUdpReceiver.cpp index e987387..a733707 100644 --- a/jfj-udp-recv/src/JfjFrameUdpReceiver.cpp +++ b/jfj-udp-recv/src/JfjFrameUdpReceiver.cpp @@ -4,15 +4,11 @@ using namespace std; using namespace buffer_config; -std::ostream &operator<<(std::ostream &os, jfjoch_packet_t const &packet) { - os << "Frame number: " << packet.framenum << std::endl; - os << "Packet number: " << packet.packetnum << std::endl; - os << "Bunch id: " << packet.bunchid << std::endl; - os << std::endl; - return os; -} -JfjFrameUdpReceiver::JfjFrameUdpReceiver(const uint16_t port) { + +JfjFrameUdpReceiver::JfjFrameUdpReceiver(const uint16_t port, uint64_t n_modules): + m_num_modules(n_modules), m_num_packets(n_modules*JFJOCH_N_PACKETS_PER_MODULE), + m_num_data_bytes(n_modules*JFJOCH_DATA_BYTES_PER_MODULE) { m_udp_receiver.bind(port); } @@ -20,9 +16,7 @@ JfjFrameUdpReceiver::~JfjFrameUdpReceiver() { m_udp_receiver.disconnect(); } -inline void JfjFrameUdpReceiver::init_frame(ModuleFrame& metadata, const jfjoch_packet_t& c_packet) { - // std::cout << c_packet; - +inline void JfjFrameUdpReceiver::init_frame(ModuleFrame& metadata, const jfjoch_packet_t& c_packet) { metadata.pulse_id = c_packet.bunchid; metadata.frame_index = c_packet.framenum; metadata.daq_rec = (uint64_t) c_packet.debug; @@ -33,21 +27,18 @@ inline uint64_t JfjFrameUdpReceiver::process_packets(ModuleFrame& metadata, char while(!m_buffer.is_empty()){ // Happens if the last packet from the previous frame gets lost. - if (m_frame_index != m_buffer.peek_front().framenum) { - std::cout << "Metadata pulse: " << metadata.pulse_id << "\tIndex: " << m_frame_index << "\tCurrent one is: " << m_buffer.peek_front().framenum << std::endl; - + if (m_frame_index != m_buffer.peek_front().framenum) { m_frame_index = m_buffer.peek_front().framenum; - if(this->is_initialized){ + if(this->in_progress){ + this->in_progress = false; return metadata.pulse_id; - }else{ - this->is_initialized = true; } } // Otherwise pop the queue (and set current frame index) jfjoch_packet_t& c_packet = m_buffer.pop_front(); - std::cout << c_packet << std::endl; m_frame_index = c_packet.framenum; + this->in_progress = true; // Always copy metadata (otherwise problem when 0th packet gets lost) this->init_frame(metadata, c_packet); @@ -58,7 +49,8 @@ inline uint64_t JfjFrameUdpReceiver::process_packets(ModuleFrame& metadata, char metadata.n_recv_packets++; // Last frame packet received. Frame finished. - if (c_packet.packetnum == JFJOCH_N_PACKETS_PER_FRAME - 1){ + if (c_packet.packetnum == m_num_packets - 1){ + this->in_progress = false; return metadata.pulse_id; } } @@ -70,33 +62,30 @@ inline uint64_t JfjFrameUdpReceiver::process_packets(ModuleFrame& metadata, char uint64_t JfjFrameUdpReceiver::get_frame_from_udp(ModuleFrame& metadata, char* frame_buffer){ // Reset the metadata and frame buffer for the next frame. (really needed?) + std::cout << "Asking for next frame..." << std::endl; + metadata.pulse_id = 0; metadata.n_recv_packets = 0; - memset(frame_buffer, 0, JFJOCH_DATA_BYTES_PER_FRAME); + memset(frame_buffer, 0, m_num_data_bytes); + + // Process leftover packages in the buffer - if (!m_buffer.is_empty()) { - std::cout << "Leftovers..." << std::endl; + if (!m_buffer.is_empty()) { auto pulse_id = process_packets(metadata, frame_buffer); - if (pulse_id != 0) { - std::cout << "Returning frame: " << pulse_id << std::endl; - return pulse_id; } + if (pulse_id != 0) { return pulse_id; } } + while (true) { // Receive new packages (pass if none)... - std::cout << "Really new..." << std::endl; + std::cout << "Fetching new data..." << std::endl; // m_buffer.reset(); m_buffer.fill_from(m_udp_receiver); - std::cout << "Got " << m_buffer.size() << std::endl; - if (m_buffer.is_empty()) { - std::cout << "Empty..." << std::endl; - continue; } + if (m_buffer.is_empty()) { continue; } // ... and process them auto pulse_id = process_packets(metadata, frame_buffer); - if (pulse_id != 0) { - std::cout << "Returning frame: " << pulse_id << std::endl; - return pulse_id; } + if (pulse_id != 0) { return pulse_id; } } } diff --git a/jfj-udp-recv/src/main.cpp b/jfj-udp-recv/src/main.cpp index 2afb451..fb2c3ce 100644 --- a/jfj-udp-recv/src/main.cpp +++ b/jfj-udp-recv/src/main.cpp @@ -28,7 +28,7 @@ int main (int argc, char *argv[]) { const auto config = read_json_config(string(argv[1])); const auto udp_port = config.start_udp_port; - JfjFrameUdpReceiver receiver(udp_port); + JfjFrameUdpReceiver receiver(udp_port, 8); RamBuffer buffer(config.detector_name, config.n_modules); FrameStats stats(config.detector_name, 0, STATS_TIME); @@ -39,7 +39,7 @@ int main (int argc, char *argv[]) { // Might be better creating a structure for double buffering ModuleFrame frameMeta; ImageMetadata imageMeta; - char* dataBuffer = new char[JFJOCH_DATA_BYTES_PER_FRAME]; + char* dataBuffer = new char[8 * JFJOCH_DATA_BYTES_PER_MODULE]; uint64_t pulse_id_previous = 0; uint64_t frame_index_previous = 0; diff --git a/jfj-udp-recv/test/test_FrameUdpReceiver.cpp b/jfj-udp-recv/test/test_FrameUdpReceiver.cpp index d32455c..b0d8ebb 100644 --- a/jfj-udp-recv/test/test_FrameUdpReceiver.cpp +++ b/jfj-udp-recv/test/test_FrameUdpReceiver.cpp @@ -9,72 +9,11 @@ #include using namespace std; +#define NUM_TEST_MODULES 3 -void mockDetector(int& udp_socket_fd, int64_t n_frames){ - uint16_t udp_port = MOCK_UDP_PORT; - auto server_address = get_server_address(udp_port); - - for (int64_t i_frame=0; i_frame < n_frames; i_frame++){ - for (size_t i_packet=0; i_packet= 0); - - - int n_frames = 3; - - - // Open receiver - JfjFrameUdpReceiver udp_receiver(MOCK_UDP_PORT); - - std::cout << "NI" << std::endl; - auto handle = async(launch::async, [&]{ mockDetector(send_socket_fd, n_frames); } ); - std::cout << "NI before wait()" << std::endl; - - handle.wait(); - std::cout << "NI after wait()" << std::endl; - - ModuleFrame metadata; - auto frame_buffer = make_unique(JFJOCH_DATA_BYTES_PER_FRAME); - std::cout << "NI" << std::endl; - - for (int i_frame=0; i_frame < n_frames; i_frame++) { - std::cout << "Getting frame: " << i_frame << std::endl; - auto pulse_id = udp_receiver.get_frame_from_udp(metadata, frame_buffer.get()); - std::cout << ".. gotcha " << pulse_id << std::endl; - - ASSERT_EQ(i_frame + 1, pulse_id); - ASSERT_EQ(metadata.frame_index, i_frame + 1000); - ASSERT_EQ(metadata.daq_rec, i_frame + 10000); - // -1 because we skipped a packet. - ASSERT_EQ(metadata.n_recv_packets, JFJOCH_N_PACKETS_PER_FRAME); - } - std::cout << "NI" << std::endl; - - ::close(send_socket_fd); -} - -TEST(BufferUdpReceiver, missing_middle_packet) -{ - int n_packets = JFJOCH_N_PACKETS_PER_FRAME; + int n_packets = NUM_TEST_MODULES * JFJOCH_N_PACKETS_PER_MODULE; int n_frames = 3; uint16_t udp_port = MOCK_UDP_PORT; @@ -82,15 +21,56 @@ TEST(BufferUdpReceiver, missing_middle_packet) auto send_socket_fd = socket(AF_INET, SOCK_DGRAM, 0); ASSERT_TRUE(send_socket_fd >= 0); - JfjFrameUdpReceiver udp_receiver(udp_port); + JfjFrameUdpReceiver udp_receiver(udp_port, NUM_TEST_MODULES); + + auto handle = async(launch::async, [&](){ + for (int64_t i_frame=0; i_frame < n_frames; i_frame++){ + for (size_t i_packet=0; i_packet(NUM_TEST_MODULES*JFJOCH_DATA_BYTES_PER_MODULE); + + for (int i_frame=0; i_frame < n_frames; i_frame++) { + auto pulse_id = udp_receiver.get_frame_from_udp(metadata, frame_buffer.get()); + + ASSERT_EQ(i_frame + 1, pulse_id); + ASSERT_EQ(metadata.frame_index, i_frame + 1000); + ASSERT_EQ(metadata.daq_rec, i_frame + 10000); + ASSERT_EQ(metadata.n_recv_packets, n_packets); + } + + ::close(send_socket_fd); +} + +TEST(BufferUdpReceiver, missing_middle_packet){ + int n_packets = NUM_TEST_MODULES * JFJOCH_N_PACKETS_PER_MODULE; + int n_frames = 3; + + uint16_t udp_port = MOCK_UDP_PORT; + auto server_address = get_server_address(udp_port); + auto send_socket_fd = socket(AF_INET, SOCK_DGRAM, 0); + ASSERT_TRUE(send_socket_fd >= 0); + + JfjFrameUdpReceiver udp_receiver(udp_port, NUM_TEST_MODULES); auto handle = async(launch::async, [&](){ for (int64_t i_frame=0; i_frame < n_frames; i_frame++){ for (size_t i_packet=0; i_packet(JFJOCH_DATA_BYTES_PER_FRAME); + auto frame_buffer = make_unique(NUM_TEST_MODULES * JFJOCH_DATA_BYTES_PER_MODULE); for (int i_frame=0; i_frame < n_frames; i_frame++) { auto pulse_id = udp_receiver.get_frame_from_udp( @@ -128,9 +103,8 @@ TEST(BufferUdpReceiver, missing_middle_packet) ::close(send_socket_fd); } -TEST(BufferUdpReceiver, missing_first_packet) -{ - auto n_packets = JFJOCH_N_PACKETS_PER_FRAME; +TEST(BufferUdpReceiver, missing_first_packet){ + auto n_packets = NUM_TEST_MODULES * JFJOCH_N_PACKETS_PER_MODULE; int n_frames = 3; uint16_t udp_port = MOCK_UDP_PORT; @@ -138,15 +112,13 @@ TEST(BufferUdpReceiver, missing_first_packet) auto send_socket_fd = socket(AF_INET, SOCK_DGRAM, 0); ASSERT_TRUE(send_socket_fd >= 0); - JfjFrameUdpReceiver udp_receiver(udp_port); + JfjFrameUdpReceiver udp_receiver(udp_port, NUM_TEST_MODULES); auto handle = async(launch::async, [&](){ for (int64_t i_frame=0; i_frame < n_frames; i_frame++){ for (size_t i_packet=0; i_packet(JFJOCH_DATA_BYTES_PER_FRAME); + auto frame_buffer = make_unique(NUM_TEST_MODULES * JFJOCH_DATA_BYTES_PER_MODULE); for (int i_frame=0; i_frame < n_frames; i_frame++) { - auto pulse_id = udp_receiver.get_frame_from_udp( - metadata, frame_buffer.get()); + auto pulse_id = udp_receiver.get_frame_from_udp(metadata, frame_buffer.get()); ASSERT_EQ(i_frame + 1, pulse_id); ASSERT_EQ(metadata.frame_index, i_frame + 1000); ASSERT_EQ(metadata.daq_rec, i_frame + 10000); - // -1 because we skipped a packet. + // -2 because we skipped a packet. ASSERT_EQ(metadata.n_recv_packets, n_packets - 1); } ::close(send_socket_fd); } -TEST(BufferUdpReceiver, missing_last_packet) -{ - int n_packets = JFJOCH_N_PACKETS_PER_FRAME; - int n_frames = 3; +TEST(BufferUdpReceiver, missing_last_packet){ + int n_packets = NUM_TEST_MODULES * JFJOCH_N_PACKETS_PER_MODULE; + int n_frames = 4; uint16_t udp_port = MOCK_UDP_PORT; auto server_address = get_server_address(udp_port); auto send_socket_fd = socket(AF_INET, SOCK_DGRAM, 0); ASSERT_TRUE(send_socket_fd >= 0); - JfjFrameUdpReceiver udp_receiver(udp_port); + JfjFrameUdpReceiver udp_receiver(udp_port, NUM_TEST_MODULES); auto handle = async(launch::async, [&](){ - for (int64_t i_frame=0; i_frame < n_frames; i_frame++){ + for (int64_t i_frame=0; i_frame < n_frames+1; i_frame++){ for (size_t i_packet=0; i_packet(JFJOCH_DATA_BYTES_PER_FRAME); + auto frame_buffer = make_unique(NUM_TEST_MODULES * JFJOCH_DATA_BYTES_PER_MODULE); // n_frames -1 because the last frame is not complete. for (int i_frame=0; i_frame < n_frames - 1; i_frame++) { auto pulse_id = udp_receiver.get_frame_from_udp(metadata, frame_buffer.get()); + std::cout << "Retrieved pulse_id: " << pulse_id << std::endl; ASSERT_EQ(i_frame + 1, pulse_id); ASSERT_EQ(metadata.frame_index, i_frame + 1000); diff --git a/jfj-udp-recv/test/test_PacketBuffer.cpp b/jfj-udp-recv/test/test_PacketBuffer.cpp index e294a6d..cba236f 100644 --- a/jfj-udp-recv/test/test_PacketBuffer.cpp +++ b/jfj-udp-recv/test/test_PacketBuffer.cpp @@ -9,50 +9,70 @@ using namespace std; + + +std::ostream &operator<<(std::ostream &os, jfjoch_packet_t const &packet) { + os << "Frame number: " << packet.framenum << std::endl; + os << "Packet number: " << packet.packetnum << std::endl; + os << "Bunch id: " << packet.bunchid << std::endl; + os << std::endl; + return os; +} + + + + class MockReceiver{ public: - int idx_packet = 42000; - int packet_per_frame = 512; - int num_bunches = 100; - int num_packets =50; + uint64_t idx_packet = 42000; + uint64_t packet_per_frame = 512; + uint64_t num_bunches = 100; + uint64_t num_packets =50; - int receive_many(mmsghdr* msgs, const size_t n_msgs){ + uint64_t receive_many(mmsghdr* msgs, const size_t n_msgs){ // Receive 'num_packets numner of packets' - - for(int ii=0; iiiov_len << "\tExpected: " << sizeof(jfjoch_packet_t) << std::endl; + jfjoch_packet_t& refer = reinterpret_cast(msgs[ii].msg_hdr.msg_iov->iov_base); + refer.framenum = idx_packet / packet_per_frame; refer.bunchid = idx_packet / packet_per_frame; refer.packetnum = idx_packet % packet_per_frame; + + std::cout << refer << "\n"; idx_packet++; } - return std::min(size_t(num_packets), n_msgs); + return num_packets; }; }; - - TEST(BufferUdpReceiver, packetbuffer_simple){ std::cout << "Testing PacketBuffer..." << std::endl; - PacketBuffer p_buffer; MockReceiver mockery; uint64_t prev_bunch, prev_packet; jfjoch_packet_t p_pop; + mockery.idx_packet = 7*512 + 13; mockery.num_packets = 25; - mockery.idx_packet = 42000; p_buffer.fill_from(mockery); // First packet - ASSERT_EQ(p_buffer.peek_front().bunchid, 42000/512+1); + ASSERT_FALSE(p_buffer.is_empty()); + ASSERT_EQ(p_buffer.size(), 25); + + std::cout << "Current packet:\n" << p_buffer.peek_front(); + + + ASSERT_EQ(p_buffer.peek_front().bunchid, 7); + ASSERT_EQ(p_buffer.peek_front().packetnum, 13); prev_bunch = p_buffer.peek_front().bunchid; prev_packet = p_buffer.peek_front().packetnum; - ASSERT_EQ(p_buffer.size(), 25); p_pop = p_buffer.pop_front(); ASSERT_EQ(p_buffer.size(), 24);