From 9d3e4cb0eb09551c9cd18d8615182b285f1d6d06 Mon Sep 17 00:00:00 2001 From: Mohacsi Istvan Date: Wed, 30 Jun 2021 17:38:03 +0200 Subject: [PATCH] Dynamic number of modules and cleanup --- core-buffer/include/formats.hpp | 5 +- jfj-combined/CMakeLists.txt | 2 +- jfj-combined/include/JfjFrameWorker.hpp | 4 + jfj-combined/include/PacketBuffer.hpp | 6 +- jfj-combined/include/ZmqImagePublisher.hpp | 6 +- jfj-combined/src/JfjFrameWorker.cpp | 12 +- jfj-combined/src/main.cpp | 40 ++-- jfj-udp-recv/CMakeLists.txt | 12 -- jfj-udp-recv/README.md | 164 --------------- jfj-udp-recv/include/JfjFrameStats.hpp | 31 --- jfj-udp-recv/include/JfjFrameUdpReceiver.hpp | 36 ---- jfj-udp-recv/include/PacketBuffer.hpp | 113 ----------- jfj-udp-recv/include/PacketUdpReceiver.hpp | 22 -- jfj-udp-recv/src/JfjFrameStats.cpp | 71 ------- jfj-udp-recv/src/JfjFrameUdpReceiver.cpp | 86 -------- jfj-udp-recv/src/PacketUdpReceiver.cpp | 66 ------ jfj-udp-recv/src/main.cpp | 74 ------- jfj-udp-recv/test/CMakeLists.txt | 8 - jfj-udp-recv/test/main.cpp | 11 - jfj-udp-recv/test/mock/udp.hpp | 16 -- jfj-udp-recv/test/test_FrameUdpReceiver.cpp | 199 ------------------- jfj-udp-recv/test/test_PacketBuffer.cpp | 76 ------- jfj-udp-recv/test/test_PacketUdpReceiver.cpp | 170 ---------------- 23 files changed, 45 insertions(+), 1185 deletions(-) delete mode 100644 jfj-udp-recv/CMakeLists.txt delete mode 100644 jfj-udp-recv/README.md delete mode 100644 jfj-udp-recv/include/JfjFrameStats.hpp delete mode 100644 jfj-udp-recv/include/JfjFrameUdpReceiver.hpp delete mode 100644 jfj-udp-recv/include/PacketBuffer.hpp delete mode 100644 jfj-udp-recv/include/PacketUdpReceiver.hpp delete mode 100644 jfj-udp-recv/src/JfjFrameStats.cpp delete mode 100644 jfj-udp-recv/src/JfjFrameUdpReceiver.cpp delete mode 100644 jfj-udp-recv/src/PacketUdpReceiver.cpp delete mode 100644 jfj-udp-recv/src/main.cpp delete mode 100644 jfj-udp-recv/test/CMakeLists.txt delete mode 100644 jfj-udp-recv/test/main.cpp delete mode 100644 jfj-udp-recv/test/mock/udp.hpp delete mode 100644 jfj-udp-recv/test/test_FrameUdpReceiver.cpp delete mode 100644 jfj-udp-recv/test/test_PacketBuffer.cpp delete mode 100644 jfj-udp-recv/test/test_PacketUdpReceiver.cpp diff --git a/core-buffer/include/formats.hpp b/core-buffer/include/formats.hpp index f8f9ce9..4ad623c 100644 --- a/core-buffer/include/formats.hpp +++ b/core-buffer/include/formats.hpp @@ -1,10 +1,13 @@ #ifndef SF_DAQ_BUFFER_FORMATS_HPP #define SF_DAQ_BUFFER_FORMATS_HPP +#include +#include + #include "buffer_config.hpp" #include "jungfrau.hpp" #include "jungfraujoch.hpp" -#include + #pragma pack(push) #pragma pack(1) diff --git a/jfj-combined/CMakeLists.txt b/jfj-combined/CMakeLists.txt index b3a123c..4932c5d 100644 --- a/jfj-combined/CMakeLists.txt +++ b/jfj-combined/CMakeLists.txt @@ -2,7 +2,7 @@ file(GLOB SOURCES src/*.cpp) add_library(jfj-combined-lib STATIC ${SOURCES}) target_include_directories(jfj-combined-lib PUBLIC include/) -target_link_libraries(jfj-combined-lib external) +target_link_libraries(jfj-combined-lib external core-buffer-lib) add_executable(jfj-combined src/main.cpp) set_target_properties(jfj-combined PROPERTIES OUTPUT_NAME jfj_combined) diff --git a/jfj-combined/include/JfjFrameWorker.hpp b/jfj-combined/include/JfjFrameWorker.hpp index 8591b29..12ecd7d 100644 --- a/jfj-combined/include/JfjFrameWorker.hpp +++ b/jfj-combined/include/JfjFrameWorker.hpp @@ -39,6 +39,10 @@ public: std::function callback); virtual ~JfjFrameWorker(); void run(); + + // Copy semantics : OFF + JfjFrameWorker(JfjFrameWorker const &) = delete; + JfjFrameWorker& operator=(JfjFrameWorker const &) = delete; }; #endif //SF_DAQ_BUFFER_JFJ_FRAMEWORKER_HPP diff --git a/jfj-combined/include/PacketBuffer.hpp b/jfj-combined/include/PacketBuffer.hpp index b513991..45feb32 100644 --- a/jfj-combined/include/PacketBuffer.hpp +++ b/jfj-combined/include/PacketBuffer.hpp @@ -89,7 +89,7 @@ private: template T& PacketBuffer::pop_front(){ std::lock_guard g_guard(m_mutex); - if(this->is_empty()) [[unlikely]] { throw std::out_of_range("Attempted to read empty queue!"); } + if(this->is_empty()) { throw std::out_of_range("Attempted to read empty queue!"); } idx_read++; return m_container[idx_read-1]; } @@ -100,7 +100,7 @@ T& PacketBuffer::pop_front(){ template const T& PacketBuffer::peek_front(){ std::lock_guard g_guard(m_mutex); - if(this->is_empty()) [[unlikely]] { throw std::out_of_range("Attempted to read empty queue!"); } + if(this->is_empty()) { throw std::out_of_range("Attempted to read empty queue!"); } return m_container[idx_read]; } @@ -109,7 +109,7 @@ const T& PacketBuffer::peek_front(){ template void PacketBuffer::push_back(T item){ std::lock_guard g_guard(m_mutex); - if(this->is_full()) [[unlikely]] { throw std::out_of_range("Attempted to write a full buffer!"); } + if(this->is_full()) { throw std::out_of_range("Attempted to write a full buffer!"); } m_container[idx_write] = item; idx_write++; } diff --git a/jfj-combined/include/ZmqImagePublisher.hpp b/jfj-combined/include/ZmqImagePublisher.hpp index a9dc117..b105c78 100644 --- a/jfj-combined/include/ZmqImagePublisher.hpp +++ b/jfj-combined/include/ZmqImagePublisher.hpp @@ -7,7 +7,7 @@ #define ASSERT_FALSE(expr, msg) \ - if(bool(expr)) [[unlikely]] { \ + if(bool(expr)) { \ std::string text = "ASSERTION called at " + std::string(__FILE__) + " line " + std::to_string(__LINE__) + "\n"; \ text = text + "Reason: " + std::to_string(expr) + "\n"; \ text = text + "Message:" + msg + "\nErrno: " + std::to_string(errno); \ @@ -15,7 +15,7 @@ } \ #define ASSERT_TRUE(expr, msg) \ - if(!bool(expr)) [[unlikely]] { \ + if(!bool(expr)) { \ std::string text = "ASSERTION called at " + std::string(__FILE__) + " line " + std::to_string(__LINE__) + "\n"; \ text = text + "Reason: " + std::to_string(expr) + "\n"; \ text = text + "Message:" + msg + "\nErrno: " + std::to_string(errno); \ @@ -67,10 +67,8 @@ class ZmqImagePublisher: public ZmqPublisher { ASSERT_TRUE( len >=0, "Failed to send topic data" ) len = m_socket.send(&image.meta, sizeof(image.meta), ZMQ_SNDMORE); ASSERT_TRUE( len >=0, "Failed to send meta data" ) - // std::cout << "\tPT1 Sent " << len << "\n"; len = m_socket.send(image.data.data(), image.data.size(), 0); ASSERT_TRUE( len >=0, "Failed to send image data" ) - // std::cout << "\tPT1 Sent " << len << "\n"; std::cout << "Sent ZMQ stream of pulse: " << image.meta.pulse_id << std::endl; } diff --git a/jfj-combined/src/JfjFrameWorker.cpp b/jfj-combined/src/JfjFrameWorker.cpp index 9ade8af..551a024 100644 --- a/jfj-combined/src/JfjFrameWorker.cpp +++ b/jfj-combined/src/JfjFrameWorker.cpp @@ -22,7 +22,7 @@ inline uint64_t JfjFrameWorker::process_packets(BufferBinaryFormat& buffer){ while(!m_buffer.is_empty()){ // Happens if the last packet from the previous frame gets lost. - if (m_current_index != m_buffer.peek_front().bunchid) [[unlikely]] { + if (m_current_index != m_buffer.peek_front().bunchid) { m_current_index = m_buffer.peek_front().bunchid; if(this->in_progress){ this->in_progress = false; @@ -34,14 +34,14 @@ inline uint64_t JfjFrameWorker::process_packets(BufferBinaryFormat& buffer){ jfjoch_packet_t& c_packet = m_buffer.pop_front(); // Sanity check: rather throw than segfault... - if(c_packet.packetnum >= JF_N_PACKETS_PER_FRAME) [[unlikely]] { + if(c_packet.packetnum >= JF_N_PACKETS_PER_FRAME) { std::stringstream ss; ss << "Packet index '" << c_packet.packetnum << "' is out of range of " << JF_N_PACKETS_PER_FRAME << std::endl; throw std::range_error(ss.str()); } // Start new frame - if(!this->in_progress) [[unlikely]] { + if(!this->in_progress) { m_current_index = c_packet.bunchid; this->in_progress = true; @@ -58,7 +58,7 @@ inline uint64_t JfjFrameWorker::process_packets(BufferBinaryFormat& buffer){ buffer.meta.n_recv_packets++; // Last frame packet received. Frame finished. - if (c_packet.packetnum == JF_N_PACKETS_PER_FRAME - 1) [[unlikely]] { + if (c_packet.packetnum == JF_N_PACKETS_PER_FRAME - 1) { this->in_progress = false; return buffer.meta.pulse_id; } @@ -77,7 +77,7 @@ uint64_t JfjFrameWorker::get_frame(BufferBinaryFormat& buffer){ do { // First make sure the buffer is drained of leftovers pulse_id = process_packets(buffer); - if (pulse_id != 0) [[likely]] { return pulse_id; } + if (pulse_id != 0) { return pulse_id; } // Then try to refill buffer... m_buffer.fill_from(m_udp_receiver); @@ -96,7 +96,7 @@ void JfjFrameWorker::run(){ auto pulse_id = get_frame(buffer); m_moduleStats.record_stats(buffer.meta, true); - if(pulse_id>10) [[likely]] { + if(pulse_id>10) { f_push_callback(pulse_id, m_moduleID, buffer); } } diff --git a/jfj-combined/src/main.cpp b/jfj-combined/src/main.cpp index 632c37a..8567c57 100644 --- a/jfj-combined/src/main.cpp +++ b/jfj-combined/src/main.cpp @@ -3,8 +3,8 @@ #include #include -#include "../../core-buffer/include/BufferUtils.hpp" -#include "../../core-buffer/include/formats.hpp" +#include "BufferUtils.hpp" +#include "formats.hpp" #include "../include/JfjFrameCache.hpp" #include "../include/JfjFrameWorker.hpp" #include "../include/ZmqImagePublisher.hpp" @@ -12,12 +12,12 @@ int main (int argc, char *argv[]) { if (argc != 2) { - cout << "\nUsage: jf_buffer_writer [detector_json_filename]\n"; - cout << "\tdetector_json_filename: detector config file path." << std::endl; + std::cout << "\nUsage: jf_buffer_writer [detector_json_filename]\n"; + std::cout << "\tdetector_json_filename: detector config file path." << std::endl; exit(-1); } - const auto config = read_json_config(std::string(argv[1])); + const auto config = BufferUtils::read_json_config(std::string(argv[1])); // // Module name // char mn[128]; @@ -39,23 +39,33 @@ int main (int argc, char *argv[]) { std::bind(&FrameCache::emplace, &cache, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3); std::cout << "Creating workers..." << std::endl; - std::vector vWorkers; + std::vector> vWorkers; - JfjFrameWorker W0(5005, "JOCH3M", 0, push_cb); - JfjFrameWorker W1(5006, "JOCH3M", 1, push_cb); - JfjFrameWorker W2(5007, "JOCH3M", 2, push_cb); + for(int mm=0; mm(config.start_udp_port+mm, config.detector_name, mm, push_cb) ); + } + // JfjFrameWorker W0(5005, "JOCH3M", 0, push_cb); + // JfjFrameWorker W1(5006, "JOCH3M", 1, push_cb); + // JfjFrameWorker W2(5007, "JOCH3M", 2, push_cb); std::cout << "Starting worker threads..." << std::endl; std::vector vThreads; + + for(int mm=0; mm -#include -#include - -#ifndef SF_DAQ_BUFFER_FRAMESTATS_HPP -#define SF_DAQ_BUFFER_FRAMESTATS_HPP - - -class FrameStats { - const std::string detector_name_; - const int module_id_; - size_t stats_time_; - - int frames_counter_; - int n_missed_packets_; - int n_corrupted_frames_; - int n_corrupted_pulse_id_; - std::chrono::time_point stats_interval_start_; - - void reset_counters(); - void print_stats(); - -public: - FrameStats(const std::string &detector_name, - const int module_id, - const size_t stats_time); - void record_stats(const ModuleFrame &meta, const bool bad_pulse_id); -}; - - -#endif //SF_DAQ_BUFFER_FRAMESTATS_HPP diff --git a/jfj-udp-recv/include/JfjFrameUdpReceiver.hpp b/jfj-udp-recv/include/JfjFrameUdpReceiver.hpp deleted file mode 100644 index 4b84647..0000000 --- a/jfj-udp-recv/include/JfjFrameUdpReceiver.hpp +++ /dev/null @@ -1,36 +0,0 @@ -#ifndef SF_DAQ_BUFFER_JFJ_UDPRECEIVER_HPP -#define SF_DAQ_BUFFER_JFJ_UDPRECEIVER_HPP - -#include -#include "PacketUdpReceiver.hpp" -#include "formats.hpp" -#include "buffer_config.hpp" -#include "PacketBuffer.hpp" -#include "jungfraujoch.hpp" - -/** JungfrauJoch UDP receiver - - Wrapper class to capture frames from the UDP stream of the JungfrauJoch FPGA card. - NOTE: This design will not scale well for higher frame rates... -**/ -class JfjFrameUdpReceiver { - PacketUdpReceiver m_udp_receiver; - bool in_progress = false; - uint64_t m_frame_index = 0; - const uint64_t m_num_modules; - const uint64_t m_num_packets; - const uint64_t m_num_data_bytes; - - // PacketBuffer m_buffer; - PacketBuffer m_buffer; - - inline void init_frame(ModuleFrame& frame_metadata, const jfjoch_packet_t& c_packet); - inline uint64_t process_packets(ModuleFrame& metadata, char* frame_buffer); - -public: - JfjFrameUdpReceiver(const uint16_t port, uint64_t n_modules = 8); - virtual ~JfjFrameUdpReceiver(); - uint64_t get_frame_from_udp(ModuleFrame& metadata, char* frame_buffer); -}; - -#endif //SF_DAQ_BUFFER_JFJ_UDPRECEIVER_HPP diff --git a/jfj-udp-recv/include/PacketBuffer.hpp b/jfj-udp-recv/include/PacketBuffer.hpp deleted file mode 100644 index abd08bb..0000000 --- a/jfj-udp-recv/include/PacketBuffer.hpp +++ /dev/null @@ -1,113 +0,0 @@ -#ifndef CIRCULAR_BUFFER_TEMPLATE_HPP -#define CIRCULAR_BUFFER_TEMPLATE_HPP - -#include -#include -#include -#include -#include -#include - - -/** Linear data buffer (NOT FIFO) - - Simplified data buffer that provides pop and push operations and - bundles the actual container with metadata required by . - It stores the actual data in an accessible C-style array. **/ -template -class PacketBuffer{ -public: - PacketBuffer() { - for (int i = 0; i < CAPACITY; i++) { - m_recv_buff_ptr[i].iov_base = (void*) &(m_container[i]); - m_recv_buff_ptr[i].iov_len = sizeof(T); - - // C-structure as expected by - m_msgs[i].msg_hdr.msg_iov = &m_recv_buff_ptr[i]; - m_msgs[i].msg_hdr.msg_iovlen = 1; - m_msgs[i].msg_hdr.msg_name = &m_sock_from[i]; - m_msgs[i].msg_hdr.msg_namelen = sizeof(sockaddr_in); - } - }; - // ~PacketBuffer() {}; - - /**Diagnostics**/ - int size() const { return ( idx_write-idx_read ); } - int capacity() const { return m_capacity; } - bool is_full() const { return bool(idx_write >= m_capacity); } - bool is_empty() const { return bool(idx_write <= idx_read); } - - /**Operators**/ - void reset(){ idx_write = 0; idx_read = 0; }; // Reset the buffer - T& container(){ return m_container; }; // Direct container reference - mmsghdr& msgs(){ return m_msgs; }; - - /**Element access**/ - T& pop_front(); //Destructive read - const T& peek_front(); //Non-destructive read - void push_back(T item); //Write new element to buffer - - /**Fill from UDP receiver**/ - template - void fill_from(TY& recv){ - std::lock_guard g_guard(m_mutex); - this->idx_write = recv.receive_many(m_msgs, this->capacity()); - // std::cout << "Received " << this->idx_write << " frames" << std::endl; - // Returns -1 with errno=11 if no data received - if(idx_write==-1){ idx_write = 0; } - this->idx_read = 0; - } - -private: - // Main container - T m_container[CAPACITY]; - const size_t m_capacity = CAPACITY; - /**Guards**/ - std::mutex m_mutex; - /**Read and write index**/ - int idx_write = 0; - int idx_read = 0; - - // C-structures as expected by - mmsghdr m_msgs[CAPACITY]; - iovec m_recv_buff_ptr[CAPACITY]; - sockaddr_in m_sock_from[CAPACITY]; -}; - - -/*********************************************************************/ -/*********************************************************************/ -/*********************************************************************/ - -/** Destructive read - Standard read access to queues (i.e. progress the read pointer). - Throws 'std::length_error' if container is empty. **/ -template -T& PacketBuffer::pop_front(){ - std::lock_guard g_guard(m_mutex); - if(this->is_empty()){ throw std::out_of_range("Attempted to read empty queue!"); } - idx_read++; - return m_container[idx_read-1]; -} - -/** Non-destructive read - Standard, non-destructive read access (does not progress the read pointer). - Throws 'std::length_error' if container is empty. **/ -template -const T& PacketBuffer::peek_front(){ - std::lock_guard g_guard(m_mutex); - if(this->is_empty()){ throw std::out_of_range("Attempted to read empty queue!"); } - return m_container[idx_read]; -} - - -/** Push an element into the end of the buffer**/ -template -void PacketBuffer::push_back(T item){ - std::lock_guard g_guard(m_mutex); - if(this->is_full()){ throw std::out_of_range("Attempted to write a full buffer!"); } - m_container[idx_write] = item; - idx_write++; -} - -#endif // CIRCULAR_BUFFER_TEMPLATE_HPP diff --git a/jfj-udp-recv/include/PacketUdpReceiver.hpp b/jfj-udp-recv/include/PacketUdpReceiver.hpp deleted file mode 100644 index cc7e093..0000000 --- a/jfj-udp-recv/include/PacketUdpReceiver.hpp +++ /dev/null @@ -1,22 +0,0 @@ -#ifndef UDPRECEIVER_H -#define UDPRECEIVER_H - -#include - -class PacketUdpReceiver { - - int socket_fd_; - -public: - PacketUdpReceiver(); - virtual ~PacketUdpReceiver(); - - bool receive(void* buffer, const size_t buffer_n_bytes); - int receive_many(mmsghdr* msgs, const size_t n_msgs); - - void bind(const uint16_t port); - void disconnect(); -}; - - -#endif //LIB_CPP_H5_WRITER_UDPRECEIVER_H diff --git a/jfj-udp-recv/src/JfjFrameStats.cpp b/jfj-udp-recv/src/JfjFrameStats.cpp deleted file mode 100644 index ee48423..0000000 --- a/jfj-udp-recv/src/JfjFrameStats.cpp +++ /dev/null @@ -1,71 +0,0 @@ -#include -#include "JfjFrameStats.hpp" - -using namespace std; -using namespace chrono; - -FrameStats::FrameStats( - const std::string &detector_name, - const int module_id, - const size_t stats_time) : - detector_name_(detector_name), - module_id_(module_id), - stats_time_(stats_time) -{ - reset_counters(); -} - -void FrameStats::reset_counters() -{ - frames_counter_ = 0; - n_missed_packets_ = 0; - n_corrupted_frames_ = 0; - n_corrupted_pulse_id_ = 0; - stats_interval_start_ = steady_clock::now(); -} - -void FrameStats::record_stats(const ModuleFrame &meta, const bool bad_pulse_id) -{ - - if (bad_pulse_id) { - n_corrupted_pulse_id_++; - } - - if (meta.n_recv_packets < JF_N_PACKETS_PER_FRAME) { - n_missed_packets_ += JF_N_PACKETS_PER_FRAME - meta.n_recv_packets; - n_corrupted_frames_++; - } - - frames_counter_++; - - auto time_passed = duration_cast( - steady_clock::now()-stats_interval_start_).count(); - - if (time_passed >= stats_time_*1000) { - print_stats(); - reset_counters(); - } -} - -void FrameStats::print_stats() -{ - auto interval_ms_duration = duration_cast( - steady_clock::now()-stats_interval_start_).count(); - // * 1000 because milliseconds, + 250 because of truncation. - int rep_rate = ((frames_counter_ * 1000) + 250) / interval_ms_duration; - uint64_t timestamp = time_point_cast( - system_clock::now()).time_since_epoch().count(); - - // Output in InfluxDB line protocol - cout << "jf_udp_recv"; - cout << ",detector_name=" << detector_name_; - cout << ",module_name=M" << module_id_; - cout << " "; - cout << "n_missed_packets=" << n_missed_packets_ << "i"; - cout << ",n_corrupted_frames=" << n_corrupted_frames_ << "i"; - cout << ",repetition_rate=" << rep_rate << "i"; - cout << ",n_corrupted_pulse_ids=" << n_corrupted_pulse_id_ << "i"; - cout << " "; - cout << timestamp; - cout << endl; -} diff --git a/jfj-udp-recv/src/JfjFrameUdpReceiver.cpp b/jfj-udp-recv/src/JfjFrameUdpReceiver.cpp deleted file mode 100644 index 77a0c91..0000000 --- a/jfj-udp-recv/src/JfjFrameUdpReceiver.cpp +++ /dev/null @@ -1,86 +0,0 @@ -#include -#include "JfjFrameUdpReceiver.hpp" - -using namespace std; -using namespace buffer_config; - - - -JfjFrameUdpReceiver::JfjFrameUdpReceiver(const uint16_t port, uint64_t n_modules): - m_num_modules(n_modules), m_num_packets(n_modules*JFJOCH_N_PACKETS_PER_MODULE), - m_num_data_bytes(n_modules*JFJOCH_DATA_BYTES_PER_MODULE) { - m_udp_receiver.bind(port); -} - -JfjFrameUdpReceiver::~JfjFrameUdpReceiver() { - m_udp_receiver.disconnect(); -} - -inline void JfjFrameUdpReceiver::init_frame(ModuleFrame& metadata, const jfjoch_packet_t& c_packet) { - metadata.pulse_id = c_packet.bunchid; - metadata.frame_index = c_packet.framenum; - metadata.daq_rec = (uint64_t) c_packet.debug; - metadata.module_id = (int64_t) 0; -} - -inline uint64_t JfjFrameUdpReceiver::process_packets(ModuleFrame& metadata, char* frame_buffer){ - - while(!m_buffer.is_empty()){ - // Happens if the last packet from the previous frame gets lost. - if (m_frame_index != m_buffer.peek_front().framenum) { - m_frame_index = m_buffer.peek_front().framenum; - if(this->in_progress){ - this->in_progress = false; - return metadata.pulse_id; - } - } - - // Otherwise pop the queue (and set current frame index) - jfjoch_packet_t& c_packet = m_buffer.pop_front(); - m_frame_index = c_packet.framenum; - this->in_progress = true; - - // Always copy metadata (otherwise problem when 0th packet gets lost) - this->init_frame(metadata, c_packet); - - // Copy data to frame buffer - size_t offset = JFJOCH_DATA_BYTES_PER_PACKET * c_packet.packetnum; - memcpy( (void*) (frame_buffer + offset), c_packet.data, JFJOCH_DATA_BYTES_PER_PACKET); - metadata.n_recv_packets++; - - // Last frame packet received. Frame finished. - if (c_packet.packetnum == m_num_packets - 1){ - this->in_progress = false; - return metadata.pulse_id; - } - } - - // We emptied the buffer. - // m_buffer.reset(); - return 0; -} - -uint64_t JfjFrameUdpReceiver::get_frame_from_udp(ModuleFrame& metadata, char* frame_buffer){ - // Reset the metadata and frame buffer for the next frame. (really needed?) - metadata.pulse_id = 0; - metadata.n_recv_packets = 0; - memset(frame_buffer, 0, m_num_data_bytes); - - - // Process leftover packages in the buffer - if (!m_buffer.is_empty()) { - auto pulse_id = process_packets(metadata, frame_buffer); - if (pulse_id != 0) { return pulse_id; } - } - - - while (true) { - // Receive new packages (pass if none)... - m_buffer.fill_from(m_udp_receiver); - if (m_buffer.is_empty()) { continue; } - - // ... and process them - auto pulse_id = process_packets(metadata, frame_buffer); - if (pulse_id != 0) { return pulse_id; } - } -} diff --git a/jfj-udp-recv/src/PacketUdpReceiver.cpp b/jfj-udp-recv/src/PacketUdpReceiver.cpp deleted file mode 100644 index 713dd98..0000000 --- a/jfj-udp-recv/src/PacketUdpReceiver.cpp +++ /dev/null @@ -1,66 +0,0 @@ -#include -#include -#include "PacketUdpReceiver.hpp" -#include "jungfrau.hpp" -#include -#include -#include "buffer_config.hpp" - -using namespace std; -using namespace buffer_config; - -PacketUdpReceiver::PacketUdpReceiver() : socket_fd_(-1) { } - -PacketUdpReceiver::~PacketUdpReceiver() { - disconnect(); -} - -void PacketUdpReceiver::bind(const uint16_t port){ - if (socket_fd_ > -1) { - throw runtime_error("Socket already bound."); - } - - socket_fd_ = socket(AF_INET, SOCK_DGRAM, 0); - if (socket_fd_ < 0) { - throw runtime_error("Cannot open socket."); - } - - sockaddr_in server_address = {0}; - server_address.sin_family = AF_INET; - server_address.sin_addr.s_addr = INADDR_ANY; - server_address.sin_port = htons(port); - - timeval udp_socket_timeout; - udp_socket_timeout.tv_sec = 0; - udp_socket_timeout.tv_usec = BUFFER_UDP_US_TIMEOUT; - - if (setsockopt(socket_fd_, SOL_SOCKET, SO_RCVTIMEO, &udp_socket_timeout, sizeof(timeval)) == -1) { - throw runtime_error("Cannot set SO_RCVTIMEO. " + string(strerror(errno))); - } - - if (setsockopt(socket_fd_, SOL_SOCKET, SO_RCVBUF, &BUFFER_UDP_RCVBUF_BYTES, sizeof(int)) == -1) { - throw runtime_error("Cannot set SO_RCVBUF. " + string(strerror(errno))); - }; - //TODO: try to set SO_RCVLOWAT - - auto bind_result = ::bind(socket_fd_, reinterpret_cast(&server_address), sizeof(server_address)); - - if (bind_result < 0) { - throw runtime_error("Cannot bind socket."); - } -} - -int PacketUdpReceiver::receive_many(mmsghdr* msgs, const size_t n_msgs){ - return recvmmsg(socket_fd_, msgs, n_msgs, 0, 0); -} - -bool PacketUdpReceiver::receive(void* buffer, const size_t buffer_n_bytes){ - auto data_len = recv(socket_fd_, buffer, buffer_n_bytes, 0); - - return (data_len == buffer_n_bytes) ? true : false; -} - -void PacketUdpReceiver::disconnect(){ - close(socket_fd_); - socket_fd_ = -1; -} diff --git a/jfj-udp-recv/src/main.cpp b/jfj-udp-recv/src/main.cpp deleted file mode 100644 index fb2c3ce..0000000 --- a/jfj-udp-recv/src/main.cpp +++ /dev/null @@ -1,74 +0,0 @@ -#include -#include -#include -#include - -#include "formats.hpp" -#include "buffer_config.hpp" -#include "JfjFrameUdpReceiver.hpp" -#include "BufferUtils.hpp" -#include "JfjFrameStats.hpp" - -using namespace std; -using namespace chrono; -using namespace buffer_config; -using namespace BufferUtils; - -int main (int argc, char *argv[]) { - - if (argc != 3) { - cout << endl; - cout << "Usage: jfj_udp_recv [detector_json_filename]" << endl; - cout << "\tdetector_json_filename: detector config file path." << endl; - cout << endl; - - exit(-1); - } - - const auto config = read_json_config(string(argv[1])); - - const auto udp_port = config.start_udp_port; - JfjFrameUdpReceiver receiver(udp_port, 8); - RamBuffer buffer(config.detector_name, config.n_modules); - FrameStats stats(config.detector_name, 0, STATS_TIME); - - auto ctx = zmq_ctx_new(); - zmq_ctx_set(ctx, ZMQ_IO_THREADS, ZMQ_IO_THREADS); - auto sender = BufferUtils::bind_socket(ctx, config.detector_name, "jungfraujoch"); - - // Might be better creating a structure for double buffering - ModuleFrame frameMeta; - ImageMetadata imageMeta; - char* dataBuffer = new char[8 * JFJOCH_DATA_BYTES_PER_MODULE]; - - uint64_t pulse_id_previous = 0; - uint64_t frame_index_previous = 0; - - - while (true) { - // NOTE: Needs to be pipelined for really high frame rates - auto pulse_id = receiver.get_frame_from_udp(frameMeta, dataBuffer); - - bool bad_pulse_id = false; - - if ( ( frameMeta.frame_index != (frame_index_previous+1) ) || ( (pulse_id-pulse_id_previous) < 0 ) || ( (pulse_id-pulse_id_previous) > 1000 ) ) { - bad_pulse_id = true; - } else { - imageMeta.pulse_id = frameMeta.pulse_id; - imageMeta.frame_index = frameMeta.frame_index; - imageMeta.daq_rec = frameMeta.daq_rec; - imageMeta.is_good_image = true; - - buffer.write_frame(frameMeta, dataBuffer); - zmq_send(sender, &imageMeta, sizeof(imageMeta), 0); - } - - stats.record_stats(frameMeta, bad_pulse_id); - - pulse_id_previous = pulse_id; - frame_index_previous = frameMeta.frame_index; - - } - - delete[] dataBuffer; -} diff --git a/jfj-udp-recv/test/CMakeLists.txt b/jfj-udp-recv/test/CMakeLists.txt deleted file mode 100644 index 77eeb3a..0000000 --- a/jfj-udp-recv/test/CMakeLists.txt +++ /dev/null @@ -1,8 +0,0 @@ -add_executable(jfj-udp-recv-tests main.cpp) - -target_link_libraries(jfj-udp-recv-tests - core-buffer-lib - jfj-udp-recv-lib - gtest - ) - diff --git a/jfj-udp-recv/test/main.cpp b/jfj-udp-recv/test/main.cpp deleted file mode 100644 index de16fb4..0000000 --- a/jfj-udp-recv/test/main.cpp +++ /dev/null @@ -1,11 +0,0 @@ -#include "gtest/gtest.h" -#include "test_PacketUdpReceiver.cpp" -#include "test_FrameUdpReceiver.cpp" -#include "test_PacketBuffer.cpp" - -using namespace std; - -int main(int argc, char **argv) { - ::testing::InitGoogleTest(&argc, argv); - return RUN_ALL_TESTS(); -} diff --git a/jfj-udp-recv/test/mock/udp.hpp b/jfj-udp-recv/test/mock/udp.hpp deleted file mode 100644 index 1cef271..0000000 --- a/jfj-udp-recv/test/mock/udp.hpp +++ /dev/null @@ -1,16 +0,0 @@ -#ifndef MOCK_UDP_H -#define MOCK_UDP_H - -const int MOCK_UDP_PORT(13000); - -sockaddr_in get_server_address(uint16_t udp_port) -{ - sockaddr_in server_address = {0}; - server_address.sin_family = AF_INET; - server_address.sin_addr.s_addr = INADDR_ANY; - server_address.sin_port = htons(udp_port); - - return server_address; -} - -#endif \ No newline at end of file diff --git a/jfj-udp-recv/test/test_FrameUdpReceiver.cpp b/jfj-udp-recv/test/test_FrameUdpReceiver.cpp deleted file mode 100644 index 9410415..0000000 --- a/jfj-udp-recv/test/test_FrameUdpReceiver.cpp +++ /dev/null @@ -1,199 +0,0 @@ -#include -#include -#include "gtest/gtest.h" -#include "JfjFrameUdpReceiver.hpp" -#include "mock/udp.hpp" - -#include -#include -#include - -using namespace std; -#define NUM_TEST_MODULES 3 - - -TEST(BufferUdpReceiver, simple_recv){ - int n_packets = NUM_TEST_MODULES * JFJOCH_N_PACKETS_PER_MODULE; - int n_frames = 3; - - uint16_t udp_port = MOCK_UDP_PORT; - auto server_address = get_server_address(udp_port); - auto send_socket_fd = socket(AF_INET, SOCK_DGRAM, 0); - ASSERT_TRUE(send_socket_fd >= 0); - - JfjFrameUdpReceiver udp_receiver(udp_port, NUM_TEST_MODULES); - - auto handle = async(launch::async, [&](){ - for (int64_t i_frame=0; i_frame < n_frames; i_frame++){ - for (size_t i_packet=0; i_packet(NUM_TEST_MODULES*JFJOCH_DATA_BYTES_PER_MODULE); - - for (int i_frame=0; i_frame < n_frames; i_frame++) { - auto pulse_id = udp_receiver.get_frame_from_udp(metadata, frame_buffer.get()); - - ASSERT_EQ(i_frame + 1, pulse_id); - ASSERT_EQ(metadata.frame_index, i_frame + 1000); - ASSERT_EQ(metadata.daq_rec, i_frame + 10000); - ASSERT_EQ(metadata.n_recv_packets, n_packets); - } - - ::close(send_socket_fd); -} - -TEST(BufferUdpReceiver, missing_middle_packet){ - int n_packets = NUM_TEST_MODULES * JFJOCH_N_PACKETS_PER_MODULE; - int n_frames = 3; - - uint16_t udp_port = MOCK_UDP_PORT; - auto server_address = get_server_address(udp_port); - auto send_socket_fd = socket(AF_INET, SOCK_DGRAM, 0); - ASSERT_TRUE(send_socket_fd >= 0); - - JfjFrameUdpReceiver udp_receiver(udp_port, NUM_TEST_MODULES); - - auto handle = async(launch::async, [&](){ - for (int64_t i_frame=0; i_frame < n_frames; i_frame++){ - for (size_t i_packet=0; i_packet(NUM_TEST_MODULES * JFJOCH_DATA_BYTES_PER_MODULE); - - for (int i_frame=0; i_frame < n_frames; i_frame++) { - auto pulse_id = udp_receiver.get_frame_from_udp( - metadata, frame_buffer.get()); - - ASSERT_EQ(i_frame + 1, pulse_id); - ASSERT_EQ(metadata.frame_index, i_frame + 1000); - ASSERT_EQ(metadata.daq_rec, i_frame + 10000); - // -1 because we skipped a packet. - ASSERT_EQ(metadata.n_recv_packets, n_packets - 1); - } - - ::close(send_socket_fd); -} - -TEST(BufferUdpReceiver, missing_first_packet){ - auto n_packets = NUM_TEST_MODULES * JFJOCH_N_PACKETS_PER_MODULE; - int n_frames = 3; - - uint16_t udp_port = MOCK_UDP_PORT; - auto server_address = get_server_address(udp_port); - auto send_socket_fd = socket(AF_INET, SOCK_DGRAM, 0); - ASSERT_TRUE(send_socket_fd >= 0); - - JfjFrameUdpReceiver udp_receiver(udp_port, NUM_TEST_MODULES); - - auto handle = async(launch::async, [&](){ - for (int64_t i_frame=0; i_frame < n_frames; i_frame++){ - for (size_t i_packet=0; i_packet(NUM_TEST_MODULES * JFJOCH_DATA_BYTES_PER_MODULE); - - for (int i_frame=0; i_frame < n_frames; i_frame++) { - auto pulse_id = udp_receiver.get_frame_from_udp(metadata, frame_buffer.get()); - - ASSERT_EQ(i_frame + 1, pulse_id); - ASSERT_EQ(metadata.frame_index, i_frame + 1000); - ASSERT_EQ(metadata.daq_rec, i_frame + 10000); - // -2 because we skipped a packet. - ASSERT_EQ(metadata.n_recv_packets, n_packets - 1); - } - - ::close(send_socket_fd); -} - -TEST(BufferUdpReceiver, missing_last_packet){ - int n_packets = NUM_TEST_MODULES * JFJOCH_N_PACKETS_PER_MODULE; - int n_frames = 4; - - uint16_t udp_port = MOCK_UDP_PORT; - auto server_address = get_server_address(udp_port); - auto send_socket_fd = socket(AF_INET, SOCK_DGRAM, 0); - ASSERT_TRUE(send_socket_fd >= 0); - - JfjFrameUdpReceiver udp_receiver(udp_port, NUM_TEST_MODULES); - - auto handle = async(launch::async, [&](){ - for (int64_t i_frame=0; i_frame < n_frames+1; i_frame++){ - for (size_t i_packet=0; i_packet(NUM_TEST_MODULES * JFJOCH_DATA_BYTES_PER_MODULE); - - // n_frames -1 because the last frame is not complete. - for (int i_frame=0; i_frame < n_frames - 1; i_frame++) { - auto pulse_id = udp_receiver.get_frame_from_udp(metadata, frame_buffer.get()); - - ASSERT_EQ(i_frame + 1, pulse_id); - ASSERT_EQ(metadata.frame_index, i_frame + 1000); - ASSERT_EQ(metadata.daq_rec, i_frame + 10000); - // -1 because we skipped a packet. - ASSERT_EQ(metadata.n_recv_packets, n_packets - 1); - } - - ::close(send_socket_fd); -} diff --git a/jfj-udp-recv/test/test_PacketBuffer.cpp b/jfj-udp-recv/test/test_PacketBuffer.cpp deleted file mode 100644 index 7b602a8..0000000 --- a/jfj-udp-recv/test/test_PacketBuffer.cpp +++ /dev/null @@ -1,76 +0,0 @@ -#include -#include -#include "gtest/gtest.h" -#include "PacketBuffer.hpp" - -#include -#include -#include - -using namespace std; - - - -std::ostream &operator<<(std::ostream &os, jfjoch_packet_t const &packet) { - os << "Frame number: " << packet.framenum << std::endl; - os << "Packet number: " << packet.packetnum << std::endl; - os << "Bunch id: " << packet.bunchid << std::endl; - os << std::endl; - return os; -} - - - - -class MockReceiver{ - public: - uint64_t idx_packet = 42000; - uint64_t packet_per_frame = 512; - uint64_t num_bunches = 100; - uint64_t num_packets =50; - jfjoch_packet_t tmp; - - uint64_t receive_many(mmsghdr* msgs, const size_t n_msgs){ - // Receive 'num_packets numner of packets' - for(int ii=0; iiiov_base, &tmp, sizeof(tmp)); - idx_packet++; - } - return num_packets; - }; -}; - - - -TEST(BufferUdpReceiver, packetbuffer_simple){ - - PacketBuffer p_buffer; - MockReceiver mockery; - uint64_t prev_bunch, prev_packet; - jfjoch_packet_t p_pop; - - mockery.idx_packet = 7*512 + 13; - mockery.num_packets = 25; - - p_buffer.fill_from(mockery); - - // First packet - ASSERT_FALSE(p_buffer.is_empty()); - ASSERT_EQ(p_buffer.size(), 25); - - ASSERT_EQ(p_buffer.peek_front().bunchid, 1007); - ASSERT_EQ(p_buffer.peek_front().packetnum, 13); - prev_bunch = p_buffer.peek_front().bunchid; - prev_packet = p_buffer.peek_front().packetnum; - - p_pop = p_buffer.pop_front(); - ASSERT_EQ(p_buffer.size(), 24); - - ASSERT_EQ(p_pop.bunchid, prev_bunch); - ASSERT_EQ(p_pop.packetnum, prev_packet); - ASSERT_EQ(p_buffer.peek_front().bunchid, prev_bunch); - ASSERT_EQ(p_buffer.peek_front().packetnum, prev_packet+1); -}; diff --git a/jfj-udp-recv/test/test_PacketUdpReceiver.cpp b/jfj-udp-recv/test/test_PacketUdpReceiver.cpp deleted file mode 100644 index 1be343a..0000000 --- a/jfj-udp-recv/test/test_PacketUdpReceiver.cpp +++ /dev/null @@ -1,170 +0,0 @@ -#include -#include -#include "gtest/gtest.h" -#include "mock/udp.hpp" -#include "PacketUdpReceiver.hpp" - -#include -#include - -using namespace std; - -TEST(PacketUdpReceiver, simple_recv) -{ - uint16_t udp_port = MOCK_UDP_PORT; - - auto send_socket_fd = socket(AF_INET,SOCK_DGRAM,0); - ASSERT_TRUE(send_socket_fd >= 0); - - PacketUdpReceiver udp_receiver; - udp_receiver.bind(udp_port); - - jungfrau_packet send_udp_buffer; - send_udp_buffer.packetnum = 91; - send_udp_buffer.framenum = 92; - send_udp_buffer.bunchid = 93; - send_udp_buffer.debug = 94; - - auto server_address = get_server_address(udp_port); - ::sendto( - send_socket_fd, - &send_udp_buffer, - JUNGFRAU_BYTES_PER_PACKET, - 0, - (sockaddr*) &server_address, - sizeof(server_address)); - - this_thread::sleep_for(chrono::milliseconds(100)); - - jungfrau_packet recv_udp_buffer; - ASSERT_TRUE(udp_receiver.receive( - &recv_udp_buffer, JUNGFRAU_BYTES_PER_PACKET)); - - EXPECT_EQ(send_udp_buffer.packetnum, recv_udp_buffer.packetnum); - EXPECT_EQ(send_udp_buffer.framenum, recv_udp_buffer.framenum); - EXPECT_EQ(send_udp_buffer.bunchid, recv_udp_buffer.bunchid); - EXPECT_EQ(send_udp_buffer.debug, recv_udp_buffer.debug); - - ASSERT_FALSE(udp_receiver.receive( - &recv_udp_buffer, JUNGFRAU_BYTES_PER_PACKET)); - - udp_receiver.disconnect(); - ::close(send_socket_fd); -} - -TEST(PacketUdpReceiver, false_recv) -{ - uint16_t udp_port = MOCK_UDP_PORT; - - auto send_socket_fd = socket(AF_INET,SOCK_DGRAM,0); - ASSERT_TRUE(send_socket_fd >= 0); - - PacketUdpReceiver udp_receiver; - udp_receiver.bind(udp_port); - - jungfrau_packet send_udp_buffer; - jungfrau_packet recv_udp_buffer; - - auto server_address = get_server_address(udp_port); - - ::sendto( - send_socket_fd, - &send_udp_buffer, - JUNGFRAU_BYTES_PER_PACKET-1, - 0, - (sockaddr*) &server_address, - sizeof(server_address)); - - ASSERT_FALSE(udp_receiver.receive( - &recv_udp_buffer, JUNGFRAU_BYTES_PER_PACKET)); - - ::sendto( - send_socket_fd, - &send_udp_buffer, - JUNGFRAU_BYTES_PER_PACKET, - 0, - (sockaddr*) &server_address, - sizeof(server_address)); - - ASSERT_TRUE(udp_receiver.receive( - &recv_udp_buffer, JUNGFRAU_BYTES_PER_PACKET)); - - ::sendto( - send_socket_fd, - &send_udp_buffer, - JUNGFRAU_BYTES_PER_PACKET-1, - 0, - (sockaddr*) &server_address, - sizeof(server_address)); - - ASSERT_TRUE(udp_receiver.receive( - &recv_udp_buffer, JUNGFRAU_BYTES_PER_PACKET-1)); - - udp_receiver.disconnect(); - ::close(send_socket_fd); -} - -TEST(PacketUdpReceiver, receive_many) -{ - auto n_msg_buffer = JF_N_PACKETS_PER_FRAME; - jungfrau_packet recv_buffer[n_msg_buffer]; - iovec recv_buff_ptr[n_msg_buffer]; - struct mmsghdr msgs[n_msg_buffer]; - struct sockaddr_in sockFrom[n_msg_buffer]; - - for (int i = 0; i < n_msg_buffer; i++) { - recv_buff_ptr[i].iov_base = (void*) &(recv_buffer[i]); - recv_buff_ptr[i].iov_len = sizeof(jungfrau_packet); - - msgs[i].msg_hdr.msg_iov = &recv_buff_ptr[i]; - msgs[i].msg_hdr.msg_iovlen = 1; - msgs[i].msg_hdr.msg_name = &sockFrom[i]; - msgs[i].msg_hdr.msg_namelen = sizeof(sockaddr_in); - } - - uint16_t udp_port = MOCK_UDP_PORT; - - auto send_socket_fd = socket(AF_INET,SOCK_DGRAM,0); - ASSERT_TRUE(send_socket_fd >= 0); - - PacketUdpReceiver udp_receiver; - udp_receiver.bind(udp_port); - - jungfrau_packet send_udp_buffer; - - auto server_address = get_server_address(udp_port); - - send_udp_buffer.bunchid = 0; - ::sendto( - send_socket_fd, - &send_udp_buffer, - JUNGFRAU_BYTES_PER_PACKET, - 0, - (sockaddr*) &server_address, - sizeof(server_address)); - - send_udp_buffer.bunchid = 1; - ::sendto( - send_socket_fd, - &send_udp_buffer, - JUNGFRAU_BYTES_PER_PACKET, - 0, - (sockaddr*) &server_address, - sizeof(server_address)); - - this_thread::sleep_for(chrono::milliseconds(10)); - - auto n_msgs = udp_receiver.receive_many(msgs, JF_N_PACKETS_PER_FRAME); - ASSERT_EQ(n_msgs, 2); - - for (size_t i=0;i