From 298db2cab5aadde927bf9f4ce328bfc0e0c2591c Mon Sep 17 00:00:00 2001 From: Andrej Babic Date: Tue, 6 Jul 2021 09:22:59 +0200 Subject: [PATCH 01/16] Add simple re-try sync mechanism --- std-udp-sync/include/sync_config.hpp | 3 - std-udp-sync/src/ZmqPulseSyncReceiver.cpp | 91 ++++++----------------- 2 files changed, 22 insertions(+), 72 deletions(-) diff --git a/std-udp-sync/include/sync_config.hpp b/std-udp-sync/include/sync_config.hpp index ff8f500..253b4e2 100644 --- a/std-udp-sync/include/sync_config.hpp +++ b/std-udp-sync/include/sync_config.hpp @@ -1,8 +1,5 @@ namespace sync_config { - // If the modules are offset more than 1000 pulses, crush. - const uint64_t PULSE_OFFSET_LIMIT = 100; - // Number of times we try to re-sync in case of failure. const int SYNC_RETRY_LIMIT = 3; diff --git a/std-udp-sync/src/ZmqPulseSyncReceiver.cpp b/std-udp-sync/src/ZmqPulseSyncReceiver.cpp index c8a87cb..63682bd 100644 --- a/std-udp-sync/src/ZmqPulseSyncReceiver.cpp +++ b/std-udp-sync/src/ZmqPulseSyncReceiver.cpp @@ -39,88 +39,41 @@ ZmqPulseSyncReceiver::~ZmqPulseSyncReceiver() PulseAndSync ZmqPulseSyncReceiver::get_next_pulse_id() const { - uint64_t pulses[n_modules_]; - - bool modules_in_sync = true; - for (int i = 0; i < n_modules_; i++) { - zmq_recv(sockets_[i], &pulses[i], sizeof(uint64_t), 0); - if (pulses[0] != pulses[i]) { - modules_in_sync = false; - } - } - - if (modules_in_sync) { - #ifdef DEBUG_OUTPUT - using namespace date; - cout << " [" << std::chrono::system_clock::now(); - cout << "] [ZmqPulseSyncReceiver::get_next_pulse_id] "; - cout << "] (modules_in_sync) Frame index:" << pulses[0]; - cout << endl; - #endif - return {pulses[0], 0}; - } - - // How many pulses we lost in total to get the next pulse_id. - uint32_t n_lost_pulses = 0; - for (int i_sync=0; i_sync < SYNC_RETRY_LIMIT; i_sync++) { - uint64_t min_pulse_id = numeric_limits::max();; - uint64_t max_pulse_id = 0; + uint64_t ids[n_modules_]; + for (uint32_t i_sync=0; i_sync < SYNC_RETRY_LIMIT; i_sync++) { + bool modules_in_sync = true; for (int i = 0; i < n_modules_; i++) { - min_pulse_id = min(min_pulse_id, pulses[i]); - max_pulse_id = max(max_pulse_id, pulses[i]); - } - auto max_diff = max_pulse_id - min_pulse_id; - if (max_diff > PULSE_OFFSET_LIMIT) { - stringstream err_msg; - err_msg << "[ZmqPulseSyncReceiver::get_next_pulse_id]"; - err_msg << " PULSE_OFFSET_LIMIT exceeded."; - err_msg << " max_diff=" << max_diff << " pulses."; + zmq_recv(sockets_[i], &ids[i], sizeof(uint64_t), 0); - for (int i = 0; i < n_modules_; i++) { - err_msg << " (module " << i << ", "; - err_msg << pulses[i] << "),"; - } - err_msg << endl; - - throw runtime_error(err_msg.str()); - } - - modules_in_sync = true; - // Max pulses we lost in this sync attempt. - uint32_t i_sync_lost_pulses = 0; - for (int i = 0; i < n_modules_; i++) { - // How many pulses we lost for this specific module. - uint32_t i_module_lost_pulses = 0; - while (pulses[i] < max_pulse_id) { - zmq_recv(sockets_[i], &pulses[i], sizeof(uint64_t), 0); - i_module_lost_pulses++; - } - - i_sync_lost_pulses = max(i_sync_lost_pulses, i_module_lost_pulses); - - if (pulses[i] != max_pulse_id) { + if (ids[0] != ids[i]) { modules_in_sync = false; } } - n_lost_pulses += i_sync_lost_pulses; if (modules_in_sync) { - #ifdef DEBUG_OUTPUT - using namespace date; - cout << " [" << std::chrono::system_clock::now(); - cout << "] [ZmqPulseSyncReceiver::get_next_pulse_id]"; - cout << " modules_in_sync false"; - cout << endl; - #endif - return {pulses[0], n_lost_pulses}; + return {ids[0], i_sync}; } + + #ifdef DEBUG_OUTPUT + using namespace date; + cout << "[" << std::chrono::system_clock::now() << "]"; + cout << " [ZmqPulseSyncReceiver::get_next_pulse_id]"; + cout << " Modules out of sync:" << endl; + for (int i=0; i < n_modules_; i++) { + cout << " module" << i << ":" << ids[i]; + } + cout << endl; + #endif } stringstream err_msg; - err_msg << "[ZmqLiveReceiver::get_next_pulse_id]"; - err_msg << " SYNC_RETRY_LIMIT exceeded."; + err_msg << "[ZmqPulseSyncReceiver::get_next_pulse_id]"; + err_msg << " SYNC_RETRY_LIMIT exceeded. State:"; + for (int i=0; i < n_modules_; i++) { + err_msg << " module" << i << ":" << ids[i]; + } err_msg << endl; throw runtime_error(err_msg.str()); From 5b05bcc473a9504455c7fcdaa23ee9c6beb27995 Mon Sep 17 00:00:00 2001 From: Andrej Babic Date: Tue, 6 Jul 2021 09:23:24 +0200 Subject: [PATCH 02/16] Fix build script --- std-udp-recv/test/CMakeLists.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/std-udp-recv/test/CMakeLists.txt b/std-udp-recv/test/CMakeLists.txt index 25c729a..3c5b145 100644 --- a/std-udp-recv/test/CMakeLists.txt +++ b/std-udp-recv/test/CMakeLists.txt @@ -2,7 +2,7 @@ add_executable(jf-udp-recv-tests main.cpp) target_link_libraries(jf-udp-recv-tests core-buffer-lib - jf-udp-recv-lib + std-udp-recv-lib gtest ) From 42414b62504c5f758a50c7a956748a82cf1e39d8 Mon Sep 17 00:00:00 2001 From: Andrej Babic Date: Tue, 6 Jul 2021 09:27:54 +0200 Subject: [PATCH 03/16] Adjust signature --- std-udp-recv/src/main.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/std-udp-recv/src/main.cpp b/std-udp-recv/src/main.cpp index c9c1a06..8a69df9 100644 --- a/std-udp-recv/src/main.cpp +++ b/std-udp-recv/src/main.cpp @@ -45,7 +45,7 @@ int main (int argc, char *argv[]) { FrameUdpReceiver receiver(udp_port, N_PACKETS_PER_FRAME); RamBuffer frame_buffer(config.detector_name, sizeof(ModuleFrame), - FRAME_N_BYTES, config.n_modules); + FRAME_N_BYTES, config.n_modules, RAM_BUFFER_N_SLOTS); FrameStats stats(config.detector_name, module_id, N_PACKETS_PER_FRAME, STATS_TIME); From 3e137105bea8022548103c48d53fb7656dc47aa9 Mon Sep 17 00:00:00 2001 From: Andrej Babic Date: Tue, 6 Jul 2021 09:30:04 +0200 Subject: [PATCH 04/16] Fix name in udp recv help --- std-udp-recv/src/main.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/std-udp-recv/src/main.cpp b/std-udp-recv/src/main.cpp index 8a69df9..3b8274e 100644 --- a/std-udp-recv/src/main.cpp +++ b/std-udp-recv/src/main.cpp @@ -20,10 +20,10 @@ int main (int argc, char *argv[]) { if (argc != 4) { cout << endl; - cout << "Usage: std_udp_recv [udp_recv_config_filename] [module_id] " + cout << "Usage: std_udp_recv [detector_json_filename] [module_id] " "[bit_depth]"; cout << endl; - cout << "\tudp_recv_config_filename: detector config file path." << endl; + cout << "\tdetector_json_filename: detector config file path." << endl; cout << "\tmodule_id: id of the module for this process." << endl; cout << "\tbit_depth: bit depth of the incoming udp packets." << endl; cout << endl; From 451092cc6678b6a6b5d67a12e14471ea117e8736 Mon Sep 17 00:00:00 2001 From: Andrej Babic Date: Tue, 6 Jul 2021 11:21:46 +0200 Subject: [PATCH 05/16] Fix tests for std-udp-recv --- std-udp-recv/test/CMakeLists.txt | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/std-udp-recv/test/CMakeLists.txt b/std-udp-recv/test/CMakeLists.txt index 3c5b145..e947c2d 100644 --- a/std-udp-recv/test/CMakeLists.txt +++ b/std-udp-recv/test/CMakeLists.txt @@ -1,6 +1,6 @@ -add_executable(jf-udp-recv-tests main.cpp) +add_executable(std-udp-recv-tests main.cpp) -target_link_libraries(jf-udp-recv-tests +target_link_libraries(std-udp-recv-tests core-buffer-lib std-udp-recv-lib gtest From 6928d931ce0aee24de199f87861acc05b02b4192 Mon Sep 17 00:00:00 2001 From: Andrej Babic Date: Tue, 6 Jul 2021 11:22:23 +0200 Subject: [PATCH 06/16] Add marker for invalid frame_index So far we used 0 but this is not valid int he Eiger case where the frame_index gets set to 0 in each acquisition. --- core-buffer/include/formats.hpp | 2 ++ std-udp-recv/src/FrameUdpReceiver.cpp | 16 ++++++++++------ 2 files changed, 12 insertions(+), 6 deletions(-) diff --git a/core-buffer/include/formats.hpp b/core-buffer/include/formats.hpp index 7ae731f..e643af4 100644 --- a/core-buffer/include/formats.hpp +++ b/core-buffer/include/formats.hpp @@ -1,6 +1,8 @@ #ifndef SF_DAQ_BUFFER_FORMATS_HPP #define SF_DAQ_BUFFER_FORMATS_HPP +#define INVALID_FRAME_INDEX UINT64_C(-1) + #pragma pack(push) #pragma pack(1) struct ModuleFrame { diff --git a/std-udp-recv/src/FrameUdpReceiver.cpp b/std-udp-recv/src/FrameUdpReceiver.cpp index 92b7a9c..d0304ae 100644 --- a/std-udp-recv/src/FrameUdpReceiver.cpp +++ b/std-udp-recv/src/FrameUdpReceiver.cpp @@ -56,15 +56,17 @@ inline void FrameUdpReceiver::init_frame( frame_metadata.daq_rec = (uint64_t) packet_buffer_[i_packet].debug; frame_metadata.pos_y = (int16_t) packet_buffer_[i_packet].row; frame_metadata.pos_x = (int16_t) packet_buffer_[i_packet].column; + frame_metadata.n_recv_packets = 0; #ifdef DEBUG_OUTPUT using namespace date; cout << " [" << std::chrono::system_clock::now(); cout << "] [FrameUdpReceiver::init_frame] :"; - cout << " || pos_y: " << frame_metadata.pos_x; + cout << " || pos_y: " << frame_metadata.pos_y; cout << " || pos_x: " << frame_metadata.pos_x; cout << " || pulse_id: " << frame_metadata.pulse_id; cout << " || frame_index: " << frame_metadata.frame_index; + cout << " || daq_rec: " << frame_metadata.daq_rec; cout << endl; #endif @@ -95,7 +97,7 @@ inline uint64_t FrameUdpReceiver::process_packets( i_packet++) { // First packet for this frame. - if (metadata.frame_index == 0) { + if (metadata.frame_index == INVALID_FRAME_INDEX) { init_frame(metadata, i_packet); // Happens if the last packet from the previous frame gets lost. @@ -136,25 +138,27 @@ inline uint64_t FrameUdpReceiver::process_packets( packet_buffer_offset_ = 0; } - return metadata.pulse_id; + return metadata.frame_index; } } // We emptied the buffer. packet_buffer_loaded_ = false; packet_buffer_offset_ = 0; - return 0; + return INVALID_FRAME_INDEX; } uint64_t FrameUdpReceiver::get_frame_from_udp( ModuleFrame& meta, char* frame_buffer) { + meta.frame_index = INVALID_FRAME_INDEX; + // Happens when last packet from previous frame was missed. if (packet_buffer_loaded_) { auto frame_index = process_packets( packet_buffer_offset_, meta, frame_buffer); - if (frame_index != 0) { + if (frame_index != INVALID_FRAME_INDEX) { return frame_index; } } @@ -170,7 +174,7 @@ uint64_t FrameUdpReceiver::get_frame_from_udp( auto frame_index = process_packets(0, meta, frame_buffer); - if (frame_index != 0) { + if (frame_index != INVALID_FRAME_INDEX) { return frame_index; } } From b33d13b59567cf67114c3fd6fd68f33b28e0c88a Mon Sep 17 00:00:00 2001 From: Andrej Babic Date: Tue, 6 Jul 2021 11:22:50 +0200 Subject: [PATCH 07/16] Add hint about failing tests --- README.md | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/README.md b/README.md index 6f648fe..3d97181 100644 --- a/README.md +++ b/README.md @@ -104,6 +104,15 @@ ln -s "$(pwd)""/""sf_writer" /usr/bin/sf_writer ### Warnings +#### UDP recv tests failing + +In case unit tests for std-udp-recv are failing the most common cause of +problems is the rmem limit. Please increase your rmem_max to something large: + +```bash +echo 2147483646 > /proc/sys/net/core/rmem_max +``` + #### Zeromq Zeromq version 4.1.4 (default on RH7) has a LINGER bug. Sometimes, the last From 881491220ebf1de4ade6fe9f89c84454cb103ed7 Mon Sep 17 00:00:00 2001 From: Andrej Babic Date: Tue, 6 Jul 2021 11:24:11 +0200 Subject: [PATCH 08/16] Add config to streamer --- std-stream-send/include/StreamSendConfig.hpp | 8 +++---- std-stream-send/src/main.cpp | 22 ++++++++++++++++++-- 2 files changed, 24 insertions(+), 6 deletions(-) diff --git a/std-stream-send/include/StreamSendConfig.hpp b/std-stream-send/include/StreamSendConfig.hpp index 36ecd2c..5bf5531 100644 --- a/std-stream-send/include/StreamSendConfig.hpp +++ b/std-stream-send/include/StreamSendConfig.hpp @@ -17,16 +17,16 @@ struct StreamSendConfig { return { config_parameters["detector_name"].GetString(), - config_parameters["detector_type"].GetString(), config_parameters["n_modules"].GetInt(), - config_parameters["start_udp_port"].GetInt(), + config_parameters["image_n_pixels"].GetInt(), + config_parameters["stream_address"].GetString() }; } const std::string detector_name; - const std::string detector_type; const int n_modules; - const int start_udp_port; + const int image_n_pixels; + const std::string stream_address; }; diff --git a/std-stream-send/src/main.cpp b/std-stream-send/src/main.cpp index c832578..2418c87 100644 --- a/std-stream-send/src/main.cpp +++ b/std-stream-send/src/main.cpp @@ -3,14 +3,30 @@ #include "stream_config.hpp" #include #include +#include #include "RamBuffer.hpp" using namespace std; using namespace stream_config; +using namespace buffer_config; int main (int argc, char *argv[]) { + if (argc != 3) { + cout << endl; + cout << "Usage: std_stream_send [detector_json_filename]" + " [bit_depth]" << endl; + cout << "\tdetector_json_filename: detector config file path." << endl; + cout << "\tbit_depth: bit depth of the incoming udp packets." << endl; + cout << endl; + + exit(-1); + } + + const auto config = StreamSendConfig::from_json_file(string(argv[1])); + const int bit_depth = atoi(argv[2]); + auto ctx = zmq_ctx_new(); zmq_ctx_set(ctx, ZMQ_IO_THREADS, STREAM_ZMQ_IO_THREADS); @@ -27,12 +43,14 @@ int main (int argc, char *argv[]) throw runtime_error(zmq_strerror(errno)); } - if (zmq_bind(sender, "tcp://127.0.0.1:10000") != 0) { + if (zmq_bind(sender, config.stream_address.c_str()) != 0) { throw runtime_error(zmq_strerror(errno)); } + const size_t IMAGE_N_BYTES = config.image_n_pixels * bit_depth / 8; RamBuffer image_buffer(config.detector_name + "_assembler", - sizeof(ImageMetadata), assembler.get_image_n_bytes(), 1); + sizeof(ImageMetadata), IMAGE_N_BYTES, + config.n_modules, RAM_BUFFER_N_SLOTS); while (true) { From 2948afcdf2cfc8c1389b4181bb6e523f1a1bc457 Mon Sep 17 00:00:00 2001 From: Andrej Babic Date: Tue, 6 Jul 2021 11:25:16 +0200 Subject: [PATCH 09/16] Properly initialize buffer in sync --- std-udp-sync/src/main.cpp | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/std-udp-sync/src/main.cpp b/std-udp-sync/src/main.cpp index 1068091..2bfd5dd 100644 --- a/std-udp-sync/src/main.cpp +++ b/std-udp-sync/src/main.cpp @@ -10,10 +10,12 @@ #include "sync_config.hpp" #include "ZmqPulseSyncReceiver.hpp" #include "UdpSyncConfig.hpp" +#include "buffer_config.hpp" using namespace std; using namespace sync_config; +using namespace buffer_config; #ifdef USE_EIGER #include "eiger.hpp" @@ -54,7 +56,7 @@ int main (int argc, char *argv[]) #endif RamBuffer frame_buffer(config.detector_name, sizeof(ModuleFrame), - FRAME_N_BYTES, config.n_modules); + FRAME_N_BYTES, config.n_modules, RAM_BUFFER_N_SLOTS); ZmqPulseSyncReceiver receiver(ctx, config.detector_name, config.n_modules); SyncStats stats(config.detector_name, SYNC_STATS_MODULO); From eb18db94463f858c0349f456448b8c637947f9d7 Mon Sep 17 00:00:00 2001 From: Andrej Babic Date: Tue, 6 Jul 2021 11:25:41 +0200 Subject: [PATCH 10/16] Frame index reset happens as part of the acquire function --- std-udp-recv/src/main.cpp | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/std-udp-recv/src/main.cpp b/std-udp-recv/src/main.cpp index 3b8274e..591d9b5 100644 --- a/std-udp-recv/src/main.cpp +++ b/std-udp-recv/src/main.cpp @@ -59,8 +59,7 @@ int main (int argc, char *argv[]) { char* data = new char[FRAME_N_BYTES]; while (true) { - // Reset the metadata and frame buffer for the next frame. - meta.frame_index = 0; + // Reset the data buffer. memset(data, 0, FRAME_N_BYTES); receiver.get_frame_from_udp(meta, data); From 644d6f4f6a441241b4926a20cd15125e6fe7e1d8 Mon Sep 17 00:00:00 2001 From: Andrej Babic Date: Tue, 6 Jul 2021 11:31:28 +0200 Subject: [PATCH 11/16] Improve output log --- std-udp-recv/src/FrameUdpReceiver.cpp | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/std-udp-recv/src/FrameUdpReceiver.cpp b/std-udp-recv/src/FrameUdpReceiver.cpp index d0304ae..cd12760 100644 --- a/std-udp-recv/src/FrameUdpReceiver.cpp +++ b/std-udp-recv/src/FrameUdpReceiver.cpp @@ -120,9 +120,10 @@ inline uint64_t FrameUdpReceiver::process_packets( #ifdef DEBUG_OUTPUT using namespace date; cout << " [" << std::chrono::system_clock::now(); - cout << "] [frameudpreceiver::process_packets] :"; + cout << "] [FrameUdpReceiver::process_packets] :"; cout << " frame " << metadata.frame_index << " || "; - cout << packet_buffer_[i_packet].packetnum << " packets received."; + cout << packet_buffer_[i_packet].packetnum+1; + cout << " packets received."; cout << " pulse id "<< metadata.pulse_id; cout << endl; #endif From f8b9a7a5188bf4e083057341b361e3502a6762f0 Mon Sep 17 00:00:00 2001 From: Andrej Babic Date: Tue, 6 Jul 2021 11:32:16 +0200 Subject: [PATCH 12/16] Fix Packet receiver tests --- std-udp-recv/test/test_PacketUdpReceiver.cpp | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/std-udp-recv/test/test_PacketUdpReceiver.cpp b/std-udp-recv/test/test_PacketUdpReceiver.cpp index 5bc195f..9bb4f51 100644 --- a/std-udp-recv/test/test_PacketUdpReceiver.cpp +++ b/std-udp-recv/test/test_PacketUdpReceiver.cpp @@ -1,5 +1,4 @@ #include -#include #include "gtest/gtest.h" #include "mock/udp.hpp" #include "PacketUdpReceiver.hpp" @@ -7,8 +6,16 @@ #include #include +#ifdef USE_EIGER +#include "eiger.hpp" +#else +#include "jungfrau.hpp" +#endif + using namespace std; +const int N_PACKETS_PER_FRAME = 128; + TEST(PacketUdpReceiver, simple_recv) { uint16_t udp_port = MOCK_UDP_PORT; From a0f7b6bd861f73e60670036ffbf9ea7c55de372d Mon Sep 17 00:00:00 2001 From: Andrej Babic Date: Tue, 6 Jul 2021 11:32:46 +0200 Subject: [PATCH 13/16] Adapt FrameUdpReceiver tests to new code --- std-udp-recv/test/test_FrameUdpReceiver.cpp | 32 +++++++++++---------- 1 file changed, 17 insertions(+), 15 deletions(-) diff --git a/std-udp-recv/test/test_FrameUdpReceiver.cpp b/std-udp-recv/test/test_FrameUdpReceiver.cpp index e22356d..5a9f116 100644 --- a/std-udp-recv/test/test_FrameUdpReceiver.cpp +++ b/std-udp-recv/test/test_FrameUdpReceiver.cpp @@ -1,5 +1,4 @@ #include -#include #include "gtest/gtest.h" #include "FrameUdpReceiver.hpp" #include "mock/udp.hpp" @@ -10,10 +9,12 @@ using namespace std; +const int DATA_BYTES_PER_FRAME = 512*1024*2; + TEST(BufferUdpReceiver, simple_recv) { - auto n_packets = N_PACKETS_PER_FRAME; - int source_id = 1234; + auto n_packets = 128; + int module_id = 0; int n_frames = 5; uint16_t udp_port = MOCK_UDP_PORT; @@ -21,7 +22,7 @@ TEST(BufferUdpReceiver, simple_recv) auto send_socket_fd = socket(AF_INET, SOCK_DGRAM, 0); ASSERT_TRUE(send_socket_fd >= 0); - FrameUdpReceiver udp_receiver(udp_port, source_id); + FrameUdpReceiver udp_receiver(udp_port, n_packets); auto handle = async(launch::async, [&](){ for (int i_frame=0; i_frame < n_frames; i_frame++){ @@ -45,19 +46,20 @@ TEST(BufferUdpReceiver, simple_recv) handle.wait(); - ModuleFrame metadata; + ModuleFrame meta; + meta.module_id = module_id; + meta.bit_depth = 16; auto frame_buffer = make_unique(DATA_BYTES_PER_FRAME); for (int i_frame=0; i_frame < n_frames; i_frame++) { - auto pulse_id = udp_receiver.get_frame_from_udp( - metadata, frame_buffer.get()); + udp_receiver.get_frame_from_udp(meta, frame_buffer.get()); - ASSERT_EQ(i_frame + 1, pulse_id); - ASSERT_EQ(metadata.frame_index, i_frame + 1000); - ASSERT_EQ(metadata.daq_rec, i_frame + 10000); + ASSERT_EQ(i_frame + 1, meta.pulse_id); + ASSERT_EQ(meta.frame_index, i_frame + 1000); + ASSERT_EQ(meta.daq_rec, i_frame + 10000); // -1 because we skipped a packet. - ASSERT_EQ(metadata.n_recv_packets, n_packets); - ASSERT_EQ(metadata.module_id, source_id); + ASSERT_EQ(meta.n_recv_packets, n_packets); + ASSERT_EQ(meta.module_id, module_id); } ::close(send_socket_fd); @@ -65,7 +67,7 @@ TEST(BufferUdpReceiver, simple_recv) TEST(BufferUdpReceiver, missing_middle_packet) { - auto n_packets = N_PACKETS_PER_FRAME; + auto n_packets = 128; int source_id = 1234; int n_frames = 3; @@ -123,7 +125,7 @@ TEST(BufferUdpReceiver, missing_middle_packet) TEST(BufferUdpReceiver, missing_first_packet) { - auto n_packets = N_PACKETS_PER_FRAME; + auto n_packets = 128; int source_id = 1234; int n_frames = 3; @@ -181,7 +183,7 @@ TEST(BufferUdpReceiver, missing_first_packet) TEST(BufferUdpReceiver, missing_last_packet) { - auto n_packets = N_PACKETS_PER_FRAME; + auto n_packets = 128; int source_id = 1234; int n_frames = 3; From 97b7ed12bc4238d124e66bdc7bae18862d7cb61b Mon Sep 17 00:00:00 2001 From: Andrej Babic Date: Tue, 6 Jul 2021 11:45:25 +0200 Subject: [PATCH 14/16] Fix test --- std-udp-recv/test/test_FrameUdpReceiver.cpp | 21 +++++++++++---------- 1 file changed, 11 insertions(+), 10 deletions(-) diff --git a/std-udp-recv/test/test_FrameUdpReceiver.cpp b/std-udp-recv/test/test_FrameUdpReceiver.cpp index 5a9f116..0ebc5ce 100644 --- a/std-udp-recv/test/test_FrameUdpReceiver.cpp +++ b/std-udp-recv/test/test_FrameUdpReceiver.cpp @@ -68,7 +68,7 @@ TEST(BufferUdpReceiver, simple_recv) TEST(BufferUdpReceiver, missing_middle_packet) { auto n_packets = 128; - int source_id = 1234; + int module_id = 1234; int n_frames = 3; uint16_t udp_port = MOCK_UDP_PORT; @@ -76,7 +76,7 @@ TEST(BufferUdpReceiver, missing_middle_packet) auto send_socket_fd = socket(AF_INET, SOCK_DGRAM, 0); ASSERT_TRUE(send_socket_fd >= 0); - FrameUdpReceiver udp_receiver(udp_port, source_id); + FrameUdpReceiver udp_receiver(udp_port, n_packets); auto handle = async(launch::async, [&](){ for (int i_frame=0; i_frame < n_frames; i_frame++){ @@ -105,19 +105,20 @@ TEST(BufferUdpReceiver, missing_middle_packet) handle.wait(); - ModuleFrame metadata; + ModuleFrame meta; + meta.module_id = module_id; + auto frame_buffer = make_unique(DATA_BYTES_PER_FRAME); for (int i_frame=0; i_frame < n_frames; i_frame++) { - auto pulse_id = udp_receiver.get_frame_from_udp( - metadata, frame_buffer.get()); + udp_receiver.get_frame_from_udp(meta, frame_buffer.get()); - ASSERT_EQ(i_frame + 1, pulse_id); - ASSERT_EQ(metadata.frame_index, i_frame + 1000); - ASSERT_EQ(metadata.daq_rec, i_frame + 10000); + ASSERT_EQ(i_frame + 1, meta.pulse_id); + ASSERT_EQ(meta.frame_index, i_frame + 1000); + ASSERT_EQ(meta.daq_rec, i_frame + 10000); // -1 because we skipped a packet. - ASSERT_EQ(metadata.n_recv_packets, n_packets - 1); - ASSERT_EQ(metadata.module_id, source_id); + ASSERT_EQ(meta.n_recv_packets, n_packets - 1); + ASSERT_EQ(meta.module_id, module_id); } ::close(send_socket_fd); From 48932e637d62a0d49cb22ecb6477c31937ea2024 Mon Sep 17 00:00:00 2001 From: Andrej Babic Date: Tue, 6 Jul 2021 11:48:11 +0200 Subject: [PATCH 15/16] Fix test for udp receiving --- std-udp-recv/test/test_FrameUdpReceiver.cpp | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/std-udp-recv/test/test_FrameUdpReceiver.cpp b/std-udp-recv/test/test_FrameUdpReceiver.cpp index 0ebc5ce..8169626 100644 --- a/std-udp-recv/test/test_FrameUdpReceiver.cpp +++ b/std-udp-recv/test/test_FrameUdpReceiver.cpp @@ -127,7 +127,7 @@ TEST(BufferUdpReceiver, missing_middle_packet) TEST(BufferUdpReceiver, missing_first_packet) { auto n_packets = 128; - int source_id = 1234; + int module_id = 1234; int n_frames = 3; uint16_t udp_port = MOCK_UDP_PORT; @@ -135,7 +135,7 @@ TEST(BufferUdpReceiver, missing_first_packet) auto send_socket_fd = socket(AF_INET, SOCK_DGRAM, 0); ASSERT_TRUE(send_socket_fd >= 0); - FrameUdpReceiver udp_receiver(udp_port, source_id); + FrameUdpReceiver udp_receiver(udp_port, n_packets); auto handle = async(launch::async, [&](){ for (int i_frame=0; i_frame < n_frames; i_frame++){ @@ -164,19 +164,19 @@ TEST(BufferUdpReceiver, missing_first_packet) handle.wait(); - ModuleFrame metadata; + ModuleFrame meta; + meta.module_id = module_id; auto frame_buffer = make_unique(DATA_BYTES_PER_FRAME); for (int i_frame=0; i_frame < n_frames; i_frame++) { - auto pulse_id = udp_receiver.get_frame_from_udp( - metadata, frame_buffer.get()); + udp_receiver.get_frame_from_udp(meta, frame_buffer.get()); - ASSERT_EQ(i_frame + 1, pulse_id); - ASSERT_EQ(metadata.frame_index, i_frame + 1000); - ASSERT_EQ(metadata.daq_rec, i_frame + 10000); + ASSERT_EQ(i_frame + 1, meta.pulse_id); + ASSERT_EQ(meta.frame_index, i_frame + 1000); + ASSERT_EQ(meta.daq_rec, i_frame + 10000); // -1 because we skipped a packet. - ASSERT_EQ(metadata.n_recv_packets, n_packets - 1); - ASSERT_EQ(metadata.module_id, source_id); + ASSERT_EQ(meta.n_recv_packets, n_packets - 1); + ASSERT_EQ(meta.module_id, module_id); } ::close(send_socket_fd); From fbad86ac67dcd6a5ea86ea0123037aca911bb089 Mon Sep 17 00:00:00 2001 From: Andrej Babic Date: Tue, 6 Jul 2021 11:50:38 +0200 Subject: [PATCH 16/16] Fix all tests for udp recv --- std-udp-recv/test/test_FrameUdpReceiver.cpp | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/std-udp-recv/test/test_FrameUdpReceiver.cpp b/std-udp-recv/test/test_FrameUdpReceiver.cpp index 8169626..295aec7 100644 --- a/std-udp-recv/test/test_FrameUdpReceiver.cpp +++ b/std-udp-recv/test/test_FrameUdpReceiver.cpp @@ -185,7 +185,7 @@ TEST(BufferUdpReceiver, missing_first_packet) TEST(BufferUdpReceiver, missing_last_packet) { auto n_packets = 128; - int source_id = 1234; + int module_id = 1234; int n_frames = 3; uint16_t udp_port = MOCK_UDP_PORT; @@ -193,7 +193,7 @@ TEST(BufferUdpReceiver, missing_last_packet) auto send_socket_fd = socket(AF_INET, SOCK_DGRAM, 0); ASSERT_TRUE(send_socket_fd >= 0); - FrameUdpReceiver udp_receiver(udp_port, source_id); + FrameUdpReceiver udp_receiver(udp_port, n_packets); auto handle = async(launch::async, [&](){ for (int i_frame=0; i_frame < n_frames; i_frame++){ @@ -222,20 +222,20 @@ TEST(BufferUdpReceiver, missing_last_packet) handle.wait(); - ModuleFrame metadata; + ModuleFrame meta; + meta.module_id = module_id; auto frame_buffer = make_unique(DATA_BYTES_PER_FRAME); // n_frames -1 because the last frame is not complete. for (int i_frame=0; i_frame < n_frames - 1; i_frame++) { - auto pulse_id = udp_receiver.get_frame_from_udp( - metadata, frame_buffer.get()); + udp_receiver.get_frame_from_udp(meta, frame_buffer.get()); - ASSERT_EQ(i_frame + 1, pulse_id); - ASSERT_EQ(metadata.frame_index, i_frame + 1000); - ASSERT_EQ(metadata.daq_rec, i_frame + 10000); + ASSERT_EQ(i_frame + 1, meta.pulse_id); + ASSERT_EQ(meta.frame_index, i_frame + 1000); + ASSERT_EQ(meta.daq_rec, i_frame + 10000); // -1 because we skipped a packet. - ASSERT_EQ(metadata.n_recv_packets, n_packets - 1); - ASSERT_EQ(metadata.module_id, source_id); + ASSERT_EQ(meta.n_recv_packets, n_packets - 1); + ASSERT_EQ(meta.module_id, module_id); } ::close(send_socket_fd);