diff --git a/core-buffer/include/jungfraujoch.hpp b/core-buffer/include/jungfraujoch.hpp index fd549b2..6d219ca 100644 --- a/core-buffer/include/jungfraujoch.hpp +++ b/core-buffer/include/jungfraujoch.hpp @@ -1,13 +1,13 @@ -#ifndef JUNGFRAUJOCH_H -#define JUNGFRAUJOCH_H +#ifndef JUNGFRAUJOCH_HPP +#define JUNGFRAUJOCH_HPP #include #define JFJOCH_N_MODULES 32 #define JFJOCH_BYTES_PER_PACKET 8240 #define JFJOCH_DATA_BYTES_PER_PACKET 8192 -#define JFJOCH_N_PACKETS_PER_FRAME JFJOCH_N_MODULES * 128 -#define JFJOCH_DATA_BYTES_PER_FRAME JFJOCH_N_MODULES * 1048576 +#define JFJOCH_N_PACKETS_PER_FRAME (JFJOCH_N_MODULES * 128) +#define JFJOCH_DATA_BYTES_PER_FRAME (JFJOCH_N_MODULES * 1048576) // 48 bytes + 8192 bytes = 8240 bytes #pragma pack(push) @@ -17,7 +17,7 @@ struct jfjoch_packet_t { uint32_t exptime; uint32_t packetnum; - uint64_t bunchid; + int64_t bunchid; uint64_t timestamp; uint16_t moduleID; diff --git a/jfj-udp-recv/CMakeLists.txt b/jfj-udp-recv/CMakeLists.txt index 7380471..8a19873 100644 --- a/jfj-udp-recv/CMakeLists.txt +++ b/jfj-udp-recv/CMakeLists.txt @@ -9,4 +9,4 @@ set_target_properties(jfj-udp-recv PROPERTIES OUTPUT_NAME jf_udp_recv) target_link_libraries(jfj-udp-recv jfj-udp-recv-lib zmq rt) enable_testing() -# add_subdirectory(test/) +add_subdirectory(test/) diff --git a/jfj-udp-recv/include/PacketBuffer.hpp b/jfj-udp-recv/include/PacketBuffer.hpp index b2be33b..0697a5f 100644 --- a/jfj-udp-recv/include/PacketBuffer.hpp +++ b/jfj-udp-recv/include/PacketBuffer.hpp @@ -32,8 +32,8 @@ public: // ~PacketBuffer() {}; /**Diagnostics**/ - size_t size() const { return ( idx_write-idx_read ); } - size_t capacity() const { return m_capacity; } + int size() const { return ( idx_write-idx_read ); } + int capacity() const { return m_capacity; } bool is_full() const { return bool(idx_write >= m_capacity); } bool is_empty() const { return bool(idx_write <= idx_read); } @@ -52,6 +52,8 @@ public: void fill_from(TY& recv){ std::lock_guard g_guard(m_mutex); this->idx_write = recv.receive_many(m_msgs, this->capacity()); + // Returns -1 with errno=11 if no data received + if(idx_write==-1){ idx_write = 0; } this->idx_read = 0; } @@ -62,8 +64,8 @@ private: /**Guards**/ std::mutex m_mutex; /**Read and write index**/ - size_t idx_write = 0; - size_t idx_read = 0; + int idx_write = 0; + int idx_read = 0; // C-structures as expected by mmsghdr m_msgs[CAPACITY]; diff --git a/jfj-udp-recv/src/JfjFrameUdpReceiver.cpp b/jfj-udp-recv/src/JfjFrameUdpReceiver.cpp index c7c6724..059da2b 100644 --- a/jfj-udp-recv/src/JfjFrameUdpReceiver.cpp +++ b/jfj-udp-recv/src/JfjFrameUdpReceiver.cpp @@ -1,10 +1,17 @@ #include -#include #include "JfjFrameUdpReceiver.hpp" using namespace std; using namespace buffer_config; +std::ostream &operator<<(std::ostream &os, jfjoch_packet_t const &packet) { + os << "Frame number: " << packet.framenum << std::endl; + os << "Packet number: " << packet.packetnum << std::endl; + os << "Bunch id: " << packet.bunchid << std::endl; + os << std::endl; + return os; +} + JfjFrameUdpReceiver::JfjFrameUdpReceiver(const uint16_t port) { m_udp_receiver.bind(port); } @@ -13,7 +20,9 @@ JfjFrameUdpReceiver::~JfjFrameUdpReceiver() { m_udp_receiver.disconnect(); } -inline void JfjFrameUdpReceiver::init_frame(ModuleFrame& metadata, const jfjoch_packet_t& c_packet) { +inline void JfjFrameUdpReceiver::init_frame(ModuleFrame& metadata, const jfjoch_packet_t& c_packet) { + // std::cout << c_packet; + metadata.pulse_id = c_packet.bunchid; metadata.frame_index = c_packet.framenum; metadata.daq_rec = (uint64_t) c_packet.debug; @@ -26,6 +35,7 @@ inline uint64_t JfjFrameUdpReceiver::process_packets(ModuleFrame& metadata, char // Happens if the last packet from the previous frame gets lost. if (m_frame_index != m_buffer.peek_front().framenum) { m_frame_index = m_buffer.peek_front().framenum; + std::cout << "Peeked pulse: " << metadata.pulse_id << std::endl; return metadata.pulse_id; } @@ -48,7 +58,7 @@ inline uint64_t JfjFrameUdpReceiver::process_packets(ModuleFrame& metadata, char } // We emptied the buffer. - m_buffer.reset(); + // m_buffer.reset(); return 0; } @@ -56,8 +66,7 @@ uint64_t JfjFrameUdpReceiver::get_frame_from_udp(ModuleFrame& metadata, char* fr // Reset the metadata and frame buffer for the next frame. (really needed?) metadata.pulse_id = 0; metadata.n_recv_packets = 0; - memset(frame_buffer, 0, JFJOCH_DATA_BYTES_PER_PACKET); - + memset(frame_buffer, 0, JFJOCH_DATA_BYTES_PER_FRAME); // Process leftover packages in the buffer if (!m_buffer.is_empty()) { auto pulse_id = process_packets(metadata, frame_buffer); @@ -66,7 +75,8 @@ uint64_t JfjFrameUdpReceiver::get_frame_from_udp(ModuleFrame& metadata, char* fr while (true) { // Receive new packages (pass if none)... - m_buffer.fill_from(m_udp_receiver); + m_buffer.reset(); + m_buffer.fill_from(m_udp_receiver); if (m_buffer.is_empty()) { continue; } // ... and process them diff --git a/jfj-udp-recv/src/main.cpp b/jfj-udp-recv/src/main.cpp index 2c52f25..2afb451 100644 --- a/jfj-udp-recv/src/main.cpp +++ b/jfj-udp-recv/src/main.cpp @@ -18,8 +18,7 @@ int main (int argc, char *argv[]) { if (argc != 3) { cout << endl; - cout << "Usage: jfj_udp_recv [detector_json_filename]"; - cout << endl; + cout << "Usage: jfj_udp_recv [detector_json_filename]" << endl; cout << "\tdetector_json_filename: detector config file path." << endl; cout << endl; diff --git a/jfj-udp-recv/test/CMakeLists.txt b/jfj-udp-recv/test/CMakeLists.txt index 25c729a..77eeb3a 100644 --- a/jfj-udp-recv/test/CMakeLists.txt +++ b/jfj-udp-recv/test/CMakeLists.txt @@ -1,8 +1,8 @@ -add_executable(jf-udp-recv-tests main.cpp) +add_executable(jfj-udp-recv-tests main.cpp) -target_link_libraries(jf-udp-recv-tests +target_link_libraries(jfj-udp-recv-tests core-buffer-lib - jf-udp-recv-lib + jfj-udp-recv-lib gtest ) diff --git a/jfj-udp-recv/test/test_FrameUdpReceiver.cpp b/jfj-udp-recv/test/test_FrameUdpReceiver.cpp index e1e86ab..3f490c7 100644 --- a/jfj-udp-recv/test/test_FrameUdpReceiver.cpp +++ b/jfj-udp-recv/test/test_FrameUdpReceiver.cpp @@ -1,7 +1,7 @@ #include -#include +#include #include "gtest/gtest.h" -#include "FrameUdpReceiver.hpp" +#include "JfjFrameUdpReceiver.hpp" #include "mock/udp.hpp" #include @@ -12,7 +12,7 @@ using namespace std; TEST(BufferUdpReceiver, simple_recv) { - auto n_packets = JF_N_PACKETS_PER_FRAME; + int n_packets = JFJOCH_N_PACKETS_PER_FRAME; int n_frames = 5; uint16_t udp_port = MOCK_UDP_PORT; @@ -23,9 +23,9 @@ TEST(BufferUdpReceiver, simple_recv) JfjFrameUdpReceiver udp_receiver(udp_port); auto handle = async(launch::async, [&](){ - for (int i_frame=0; i_frame < n_frames; i_frame++){ + for (int64_t i_frame=0; i_frame < n_frames; i_frame++){ for (size_t i_packet=0; i_packet(JUNGFRAU_DATA_BYTES_PER_FRAME); + auto frame_buffer = make_unique(JFJOCH_DATA_BYTES_PER_FRAME); for (int i_frame=0; i_frame < n_frames; i_frame++) { auto pulse_id = udp_receiver.get_frame_from_udp( @@ -63,7 +63,7 @@ TEST(BufferUdpReceiver, simple_recv) TEST(BufferUdpReceiver, missing_middle_packet) { - auto n_packets = JF_N_PACKETS_PER_FRAME; + int n_packets = JFJOCH_N_PACKETS_PER_FRAME; int n_frames = 3; uint16_t udp_port = MOCK_UDP_PORT; @@ -71,17 +71,17 @@ TEST(BufferUdpReceiver, missing_middle_packet) auto send_socket_fd = socket(AF_INET, SOCK_DGRAM, 0); ASSERT_TRUE(send_socket_fd >= 0); - JfjFrameUdpReceiver udp_receiver(udp_port, source_id); + JfjFrameUdpReceiver udp_receiver(udp_port); auto handle = async(launch::async, [&](){ - for (int i_frame=0; i_frame < n_frames; i_frame++){ + for (int64_t i_frame=0; i_frame < n_frames; i_frame++){ for (size_t i_packet=0; i_packet(JUNGFRAU_DATA_BYTES_PER_FRAME); + auto frame_buffer = make_unique(JFJOCH_DATA_BYTES_PER_FRAME); for (int i_frame=0; i_frame < n_frames; i_frame++) { auto pulse_id = udp_receiver.get_frame_from_udp( @@ -119,7 +119,7 @@ TEST(BufferUdpReceiver, missing_middle_packet) TEST(BufferUdpReceiver, missing_first_packet) { - auto n_packets = JF_N_PACKETS_PER_FRAME; + auto n_packets = JFJOCH_N_PACKETS_PER_FRAME; int n_frames = 3; uint16_t udp_port = MOCK_UDP_PORT; @@ -130,14 +130,14 @@ TEST(BufferUdpReceiver, missing_first_packet) JfjFrameUdpReceiver udp_receiver(udp_port); auto handle = async(launch::async, [&](){ - for (int i_frame=0; i_frame < n_frames; i_frame++){ + for (int64_t i_frame=0; i_frame < n_frames; i_frame++){ for (size_t i_packet=0; i_packet(JUNGFRAU_DATA_BYTES_PER_FRAME); + auto frame_buffer = make_unique(JFJOCH_DATA_BYTES_PER_FRAME); for (int i_frame=0; i_frame < n_frames; i_frame++) { auto pulse_id = udp_receiver.get_frame_from_udp( @@ -175,7 +175,7 @@ TEST(BufferUdpReceiver, missing_first_packet) TEST(BufferUdpReceiver, missing_last_packet) { - auto n_packets = JF_N_PACKETS_PER_FRAME; + int n_packets = JFJOCH_N_PACKETS_PER_FRAME; int n_frames = 3; uint16_t udp_port = MOCK_UDP_PORT; @@ -186,14 +186,14 @@ TEST(BufferUdpReceiver, missing_last_packet) JfjFrameUdpReceiver udp_receiver(udp_port); auto handle = async(launch::async, [&](){ - for (int i_frame=0; i_frame < n_frames; i_frame++){ + for (int64_t i_frame=0; i_frame < n_frames; i_frame++){ for (size_t i_packet=0; i_packet(JUNGFRAU_DATA_BYTES_PER_FRAME); + auto frame_buffer = make_unique(JFJOCH_DATA_BYTES_PER_FRAME); // n_frames -1 because the last frame is not complete. for (int i_frame=0; i_frame < n_frames - 1; i_frame++) {