From 0f7c0391d218bf46c2cd467cb1afc32a081426b3 Mon Sep 17 00:00:00 2001 From: Mohacsi Istvan Date: Tue, 14 Sep 2021 11:31:10 +0200 Subject: [PATCH] Cleanup --- jf-zmqstreamer/CMakeLists.txt | 12 + {jfj-combined => jf-zmqstreamer}/README.md | 0 .../include/FrameCache.hpp | 1 - .../include/FrameStats.hpp | 0 .../include/FrameWorker.hpp | 13 +- .../include/PacketBuffer.hpp | 0 .../include/PacketUdpReceiver.hpp | 0 .../include/Watchdog.hpp | 0 .../include/ZmqImagePublisher.hpp | 0 .../src/FrameStats.cpp | 0 .../src/FrameWorker.cpp | 10 +- .../src/PacketUdpReceiver.cpp | 0 {jfj-combined => jf-zmqstreamer}/src/main.cpp | 6 +- .../test/CMakeLists.txt | 0 .../test/main.cpp | 0 .../test/mock/dummy_detector.json | 0 .../test/mock/udp.hpp | 0 .../test/test_FrameUdpReceiver.cpp | 0 .../test/test_PacketBuffer.cpp | 0 .../test/test_PacketUdpReceiver.cpp | 0 .../test/test_Watchdog.cpp | 0 jfj-combined/CMakeLists.txt | 12 - jfj-udp-recv/CMakeLists.txt | 12 - jfj-udp-recv/README.md | 164 ------------- jfj-udp-recv/include/JfjFrameStats.hpp | 31 --- jfj-udp-recv/include/JfjFrameUdpReceiver.hpp | 32 --- jfj-udp-recv/include/PacketBuffer.hpp | 112 --------- jfj-udp-recv/include/PacketUdpReceiver.hpp | 22 -- jfj-udp-recv/src/JfjFrameStats.cpp | 71 ------ jfj-udp-recv/src/JfjFrameUdpReceiver.cpp | 86 ------- jfj-udp-recv/src/PacketUdpReceiver.cpp | 66 ----- jfj-udp-recv/src/main.cpp | 74 ------ jfj-udp-recv/test/CMakeLists.txt | 8 - jfj-udp-recv/test/main.cpp | 10 - jfj-udp-recv/test/mock/udp.hpp | 16 -- jfj-udp-recv/test/test_FrameUdpReceiver.cpp | 230 ------------------ jfj-udp-recv/test/test_PacketUdpReceiver.cpp | 170 ------------- 37 files changed, 26 insertions(+), 1132 deletions(-) create mode 100644 jf-zmqstreamer/CMakeLists.txt rename {jfj-combined => jf-zmqstreamer}/README.md (100%) rename {jfj-combined => jf-zmqstreamer}/include/FrameCache.hpp (99%) rename {jfj-combined => jf-zmqstreamer}/include/FrameStats.hpp (100%) rename {jfj-combined => jf-zmqstreamer}/include/FrameWorker.hpp (74%) rename {jfj-combined => jf-zmqstreamer}/include/PacketBuffer.hpp (100%) rename {jfj-combined => jf-zmqstreamer}/include/PacketUdpReceiver.hpp (100%) rename {jfj-combined => jf-zmqstreamer}/include/Watchdog.hpp (100%) rename {jfj-combined => jf-zmqstreamer}/include/ZmqImagePublisher.hpp (100%) rename jfj-combined/src/JfjFrameStats.cpp => jf-zmqstreamer/src/FrameStats.cpp (100%) rename jfj-combined/src/JfjFrameWorker.cpp => jf-zmqstreamer/src/FrameWorker.cpp (89%) rename {jfj-combined => jf-zmqstreamer}/src/PacketUdpReceiver.cpp (100%) rename {jfj-combined => jf-zmqstreamer}/src/main.cpp (87%) rename {jfj-combined => jf-zmqstreamer}/test/CMakeLists.txt (100%) rename {jfj-combined => jf-zmqstreamer}/test/main.cpp (100%) rename {jfj-combined => jf-zmqstreamer}/test/mock/dummy_detector.json (100%) rename {jfj-combined => jf-zmqstreamer}/test/mock/udp.hpp (100%) rename {jfj-combined => jf-zmqstreamer}/test/test_FrameUdpReceiver.cpp (100%) rename {jfj-combined => jf-zmqstreamer}/test/test_PacketBuffer.cpp (100%) rename {jfj-combined => jf-zmqstreamer}/test/test_PacketUdpReceiver.cpp (100%) rename {jfj-combined => jf-zmqstreamer}/test/test_Watchdog.cpp (100%) delete mode 100644 jfj-combined/CMakeLists.txt delete mode 100644 jfj-udp-recv/CMakeLists.txt delete mode 100644 jfj-udp-recv/README.md delete mode 100644 jfj-udp-recv/include/JfjFrameStats.hpp delete mode 100644 jfj-udp-recv/include/JfjFrameUdpReceiver.hpp delete mode 100644 jfj-udp-recv/include/PacketBuffer.hpp delete mode 100644 jfj-udp-recv/include/PacketUdpReceiver.hpp delete mode 100644 jfj-udp-recv/src/JfjFrameStats.cpp delete mode 100644 jfj-udp-recv/src/JfjFrameUdpReceiver.cpp delete mode 100644 jfj-udp-recv/src/PacketUdpReceiver.cpp delete mode 100644 jfj-udp-recv/src/main.cpp delete mode 100644 jfj-udp-recv/test/CMakeLists.txt delete mode 100644 jfj-udp-recv/test/main.cpp delete mode 100644 jfj-udp-recv/test/mock/udp.hpp delete mode 100644 jfj-udp-recv/test/test_FrameUdpReceiver.cpp delete mode 100644 jfj-udp-recv/test/test_PacketUdpReceiver.cpp diff --git a/jf-zmqstreamer/CMakeLists.txt b/jf-zmqstreamer/CMakeLists.txt new file mode 100644 index 0000000..adc0b3d --- /dev/null +++ b/jf-zmqstreamer/CMakeLists.txt @@ -0,0 +1,12 @@ +file(GLOB SOURCES src/*.cpp) + +add_library(jf-zmqstreamer-lib STATIC ${SOURCES}) +target_include_directories(jf-zmqstreamer-lib PUBLIC include/) +target_link_libraries(jf-zmqstreamer-lib external core-buffer-lib) + +add_executable(jf-zmqstreamer src/main.cpp) +set_target_properties(jf-zmqstreamer PROPERTIES OUTPUT_NAME jfj_combined) +target_link_libraries(jf-zmqstreamer jf-zmqstreamer-lib zmq rt pthread) + +enable_testing() +add_subdirectory(test/) diff --git a/jfj-combined/README.md b/jf-zmqstreamer/README.md similarity index 100% rename from jfj-combined/README.md rename to jf-zmqstreamer/README.md diff --git a/jfj-combined/include/FrameCache.hpp b/jf-zmqstreamer/include/FrameCache.hpp similarity index 99% rename from jfj-combined/include/FrameCache.hpp rename to jf-zmqstreamer/include/FrameCache.hpp index 263577e..444af0f 100644 --- a/jfj-combined/include/FrameCache.hpp +++ b/jf-zmqstreamer/include/FrameCache.hpp @@ -91,7 +91,6 @@ public: } } - protected: /** Flush and start a new line diff --git a/jfj-combined/include/FrameStats.hpp b/jf-zmqstreamer/include/FrameStats.hpp similarity index 100% rename from jfj-combined/include/FrameStats.hpp rename to jf-zmqstreamer/include/FrameStats.hpp diff --git a/jfj-combined/include/FrameWorker.hpp b/jf-zmqstreamer/include/FrameWorker.hpp similarity index 74% rename from jfj-combined/include/FrameWorker.hpp rename to jf-zmqstreamer/include/FrameWorker.hpp index 48e3c3d..aa6d5f5 100644 --- a/jfj-combined/include/FrameWorker.hpp +++ b/jf-zmqstreamer/include/FrameWorker.hpp @@ -10,13 +10,13 @@ #include "PacketBuffer.hpp" #include "FrameStats.hpp" -/** JungfrauJoch UDP receiver +/** Jungfrau UDP receiver Capture UDP data stream from Jungfrau(Joch) FPGA card. NOTE: This design will not scale well for higher frame rates... TODO: Direct copy into FrameCache buffer (saves a memcopy) **/ -class JfjFrameWorker { +class FrameWorker { const std::string m_moduleName; const uint64_t m_moduleID; @@ -35,14 +35,13 @@ class JfjFrameWorker { std::function f_push_callback; public: - JfjFrameWorker(const uint16_t port, std::string moduleName, const uint32_t moduleID, - std::function callback); - virtual ~JfjFrameWorker(); + FrameWorker(const uint16_t port, std::string moduleName, const uint32_t moduleID, std::function callback); + virtual ~FrameWorker(); void run(); // Copy semantics : OFF - JfjFrameWorker(JfjFrameWorker const &) = delete; - JfjFrameWorker& operator=(JfjFrameWorker const &) = delete; + FrameWorker(FrameWorker const &) = delete; + FrameWorker& operator=(FrameWorker const &) = delete; }; #endif //SF_DAQ_BUFFER_JFJ_FRAMEWORKER_HPP diff --git a/jfj-combined/include/PacketBuffer.hpp b/jf-zmqstreamer/include/PacketBuffer.hpp similarity index 100% rename from jfj-combined/include/PacketBuffer.hpp rename to jf-zmqstreamer/include/PacketBuffer.hpp diff --git a/jfj-combined/include/PacketUdpReceiver.hpp b/jf-zmqstreamer/include/PacketUdpReceiver.hpp similarity index 100% rename from jfj-combined/include/PacketUdpReceiver.hpp rename to jf-zmqstreamer/include/PacketUdpReceiver.hpp diff --git a/jfj-combined/include/Watchdog.hpp b/jf-zmqstreamer/include/Watchdog.hpp similarity index 100% rename from jfj-combined/include/Watchdog.hpp rename to jf-zmqstreamer/include/Watchdog.hpp diff --git a/jfj-combined/include/ZmqImagePublisher.hpp b/jf-zmqstreamer/include/ZmqImagePublisher.hpp similarity index 100% rename from jfj-combined/include/ZmqImagePublisher.hpp rename to jf-zmqstreamer/include/ZmqImagePublisher.hpp diff --git a/jfj-combined/src/JfjFrameStats.cpp b/jf-zmqstreamer/src/FrameStats.cpp similarity index 100% rename from jfj-combined/src/JfjFrameStats.cpp rename to jf-zmqstreamer/src/FrameStats.cpp diff --git a/jfj-combined/src/JfjFrameWorker.cpp b/jf-zmqstreamer/src/FrameWorker.cpp similarity index 89% rename from jfj-combined/src/JfjFrameWorker.cpp rename to jf-zmqstreamer/src/FrameWorker.cpp index 703d900..dd7b91d 100644 --- a/jfj-combined/src/JfjFrameWorker.cpp +++ b/jf-zmqstreamer/src/FrameWorker.cpp @@ -3,13 +3,13 @@ #include "FrameWorker.hpp" -JfjFrameWorker::JfjFrameWorker(const uint16_t port, std::string moduleName, const uint32_t moduleID, std::function callback): +FrameWorker::FrameWorker(const uint16_t port, std::string moduleName, const uint32_t moduleID, std::function callback): m_moduleName(moduleName), m_moduleID(moduleID), m_moduleStats(moduleName, moduleID, 10.0), f_push_callback(callback) { m_udp_receiver.bind(port); } -JfjFrameWorker::~JfjFrameWorker() { +FrameWorker::~FrameWorker() { m_udp_receiver.disconnect(); } @@ -18,7 +18,7 @@ JfjFrameWorker::~JfjFrameWorker() { Drains the buffer either until it's empty or the current frame is finished. Has some optimizations and safety checks before segfaulting right away... TODO: Direct memcopy into FrameCache for more speed! **/ -inline uint64_t JfjFrameWorker::process_packets(BufferBinaryFormat& buffer){ +inline uint64_t FrameWorker::process_packets(BufferBinaryFormat& buffer){ while(!m_buffer.is_empty()){ // Happens if the last packet from the previous frame gets lost. @@ -68,7 +68,7 @@ inline uint64_t JfjFrameWorker::process_packets(BufferBinaryFormat& buffer){ return 0; } -uint64_t JfjFrameWorker::get_frame(BufferBinaryFormat& buffer){ +uint64_t FrameWorker::get_frame(BufferBinaryFormat& buffer){ // Reset the metadata and frame buffer for the next frame std::memset(&buffer, 0, sizeof(buffer)); uint64_t pulse_id = 0; @@ -85,7 +85,7 @@ uint64_t JfjFrameWorker::get_frame(BufferBinaryFormat& buffer){ } -void JfjFrameWorker::run(){ +void FrameWorker::run(){ std::cout << "Running worker loop" << std::endl; // Might be better creating a structure for double buffering BufferBinaryFormat buffer; diff --git a/jfj-combined/src/PacketUdpReceiver.cpp b/jf-zmqstreamer/src/PacketUdpReceiver.cpp similarity index 100% rename from jfj-combined/src/PacketUdpReceiver.cpp rename to jf-zmqstreamer/src/PacketUdpReceiver.cpp diff --git a/jfj-combined/src/main.cpp b/jf-zmqstreamer/src/main.cpp similarity index 87% rename from jfj-combined/src/main.cpp rename to jf-zmqstreamer/src/main.cpp index e258723..3f91b58 100644 --- a/jfj-combined/src/main.cpp +++ b/jf-zmqstreamer/src/main.cpp @@ -35,20 +35,20 @@ int main (int argc, char *argv[]) { std::cout << "Creating frame workers..." << std::endl; - std::vector> vWorkers; + std::vector> vWorkers; for(int mm=0; mm(config.start_udp_port+mm, moduleName, mm, push_cb) ); + vWorkers.emplace_back( std::make_shared(config.start_udp_port+mm, moduleName, mm, push_cb) ); } std::cout << "Starting frame worker threads..." << std::endl; std::vector vThreads; for(int mm=0; mm -#include -#include - -#ifndef SF_DAQ_BUFFER_FRAMESTATS_HPP -#define SF_DAQ_BUFFER_FRAMESTATS_HPP - - -class FrameStats { - const std::string detector_name_; - const int module_id_; - size_t stats_time_; - - int frames_counter_; - int n_missed_packets_; - int n_corrupted_frames_; - int n_corrupted_pulse_id_; - std::chrono::time_point stats_interval_start_; - - void reset_counters(); - void print_stats(); - -public: - FrameStats(const std::string &detector_name, - const int module_id, - const size_t stats_time); - void record_stats(const ModuleFrame &meta, const bool bad_pulse_id); -}; - - -#endif //SF_DAQ_BUFFER_FRAMESTATS_HPP diff --git a/jfj-udp-recv/include/JfjFrameUdpReceiver.hpp b/jfj-udp-recv/include/JfjFrameUdpReceiver.hpp deleted file mode 100644 index aad5962..0000000 --- a/jfj-udp-recv/include/JfjFrameUdpReceiver.hpp +++ /dev/null @@ -1,32 +0,0 @@ -#ifndef SF_DAQ_BUFFER_JOCHUDPRECEIVER_HPP -#define SF_DAQ_BUFFER_JOCHUDPRECEIVER_HPP - -#include -#include "PacketUdpReceiver.hpp" -#include "formats.hpp" -#include "buffer_config.hpp" -#include "PacketBuffer.hpp" -#include "jungfraujoch.hpp" - -/** JungfrauJoch UDP receiver - - Wrapper class to capture frames from the UDP stream of the JungfrauJoch FPGA card. - NOTE: This design will not scale well for higher frame rates... -**/ -class JfjFrameUdpReceiver { - PacketUdpReceiver m_udp_receiver; - uint64_t m_frame_index; - - PacketBuffer m_buffer; - - inline void init_frame(ModuleFrame& frame_metadata, const jfjoch_packet_t& c_packet); - inline uint64_t process_packets(ModuleFrame& metadata, char* frame_buffer); - -public: - JfjFrameUdpReceiver(const uint16_t port); - virtual ~JfjFrameUdpReceiver(); - uint64_t get_frame_from_udp(ModuleFrame& metadata, char* frame_buffer); -}; - - -#endif //SF_DAQ_BUFFER_JOCHUDPRECEIVER_HPP diff --git a/jfj-udp-recv/include/PacketBuffer.hpp b/jfj-udp-recv/include/PacketBuffer.hpp deleted file mode 100644 index 0697a5f..0000000 --- a/jfj-udp-recv/include/PacketBuffer.hpp +++ /dev/null @@ -1,112 +0,0 @@ -#ifndef CIRCULAR_BUFFER_TEMPLATE_HPP -#define CIRCULAR_BUFFER_TEMPLATE_HPP - -#include -#include -#include -#include -#include -#include - - -/** Linear data buffer (NOT FIFO) - - Simplified data buffer that provides pop and push operations and - bundles the actual container with metadata required by . - It stores the actual data in an accessible C-style array. **/ -template -class PacketBuffer{ -public: - PacketBuffer() { - for (int i = 0; i < CAPACITY; i++) { - m_recv_buff_ptr[i].iov_base = (void*) &(m_container[i]); - m_recv_buff_ptr[i].iov_len = sizeof(T); - - // C-structure as expected by - m_msgs[i].msg_hdr.msg_iov = &m_recv_buff_ptr[i]; - m_msgs[i].msg_hdr.msg_iovlen = 1; - m_msgs[i].msg_hdr.msg_name = &m_sock_from[i]; - m_msgs[i].msg_hdr.msg_namelen = sizeof(sockaddr_in); - } - }; - // ~PacketBuffer() {}; - - /**Diagnostics**/ - int size() const { return ( idx_write-idx_read ); } - int capacity() const { return m_capacity; } - bool is_full() const { return bool(idx_write >= m_capacity); } - bool is_empty() const { return bool(idx_write <= idx_read); } - - /**Operators**/ - void reset(){ idx_write = 0; idx_read = 0; }; // Reset the buffer - T& container(){ return m_container; }; // Direct container reference - mmsghdr& msgs(){ return m_msgs; }; - - /**Element access**/ - T& pop_front(); //Destructive read - const T& peek_front(); //Non-destructive read - void push_back(T item); //Write new element to buffer - - /**Fill from UDP receiver**/ - template - void fill_from(TY& recv){ - std::lock_guard g_guard(m_mutex); - this->idx_write = recv.receive_many(m_msgs, this->capacity()); - // Returns -1 with errno=11 if no data received - if(idx_write==-1){ idx_write = 0; } - this->idx_read = 0; - } - -private: - // Main container - T m_container[CAPACITY]; - const size_t m_capacity = CAPACITY; - /**Guards**/ - std::mutex m_mutex; - /**Read and write index**/ - int idx_write = 0; - int idx_read = 0; - - // C-structures as expected by - mmsghdr m_msgs[CAPACITY]; - iovec m_recv_buff_ptr[CAPACITY]; - sockaddr_in m_sock_from[CAPACITY]; -}; - - -/*********************************************************************/ -/*********************************************************************/ -/*********************************************************************/ - -/** Destructive read - Standard read access to queues (i.e. progress the read pointer). - Throws 'std::length_error' if container is empty. **/ -template -T& PacketBuffer::pop_front(){ - std::lock_guard g_guard(m_mutex); - if(this->is_empty()){ throw std::out_of_range("Attempted to read empty queue!"); } - idx_read++; - return m_container[idx_read-1]; -} - -/** Non-destructive read - Standard, non-destructive read access (does not progress the read pointer). - Throws 'std::length_error' if container is empty. **/ -template -const T& PacketBuffer::peek_front(){ - std::lock_guard g_guard(m_mutex); - if(this->is_empty()){ throw std::out_of_range("Attempted to read empty queue!"); } - return m_container[idx_read]; -} - - -/** Push an element into the end of the buffer**/ -template -void PacketBuffer::push_back(T item){ - std::lock_guard g_guard(m_mutex); - if(this->is_full()){ throw std::out_of_range("Attempted to write a full buffer!"); } - m_container[idx_write] = item; - idx_write++; -} - -#endif // CIRCULAR_BUFFER_TEMPLATE_HPP diff --git a/jfj-udp-recv/include/PacketUdpReceiver.hpp b/jfj-udp-recv/include/PacketUdpReceiver.hpp deleted file mode 100644 index cc7e093..0000000 --- a/jfj-udp-recv/include/PacketUdpReceiver.hpp +++ /dev/null @@ -1,22 +0,0 @@ -#ifndef UDPRECEIVER_H -#define UDPRECEIVER_H - -#include - -class PacketUdpReceiver { - - int socket_fd_; - -public: - PacketUdpReceiver(); - virtual ~PacketUdpReceiver(); - - bool receive(void* buffer, const size_t buffer_n_bytes); - int receive_many(mmsghdr* msgs, const size_t n_msgs); - - void bind(const uint16_t port); - void disconnect(); -}; - - -#endif //LIB_CPP_H5_WRITER_UDPRECEIVER_H diff --git a/jfj-udp-recv/src/JfjFrameStats.cpp b/jfj-udp-recv/src/JfjFrameStats.cpp deleted file mode 100644 index ee48423..0000000 --- a/jfj-udp-recv/src/JfjFrameStats.cpp +++ /dev/null @@ -1,71 +0,0 @@ -#include -#include "JfjFrameStats.hpp" - -using namespace std; -using namespace chrono; - -FrameStats::FrameStats( - const std::string &detector_name, - const int module_id, - const size_t stats_time) : - detector_name_(detector_name), - module_id_(module_id), - stats_time_(stats_time) -{ - reset_counters(); -} - -void FrameStats::reset_counters() -{ - frames_counter_ = 0; - n_missed_packets_ = 0; - n_corrupted_frames_ = 0; - n_corrupted_pulse_id_ = 0; - stats_interval_start_ = steady_clock::now(); -} - -void FrameStats::record_stats(const ModuleFrame &meta, const bool bad_pulse_id) -{ - - if (bad_pulse_id) { - n_corrupted_pulse_id_++; - } - - if (meta.n_recv_packets < JF_N_PACKETS_PER_FRAME) { - n_missed_packets_ += JF_N_PACKETS_PER_FRAME - meta.n_recv_packets; - n_corrupted_frames_++; - } - - frames_counter_++; - - auto time_passed = duration_cast( - steady_clock::now()-stats_interval_start_).count(); - - if (time_passed >= stats_time_*1000) { - print_stats(); - reset_counters(); - } -} - -void FrameStats::print_stats() -{ - auto interval_ms_duration = duration_cast( - steady_clock::now()-stats_interval_start_).count(); - // * 1000 because milliseconds, + 250 because of truncation. - int rep_rate = ((frames_counter_ * 1000) + 250) / interval_ms_duration; - uint64_t timestamp = time_point_cast( - system_clock::now()).time_since_epoch().count(); - - // Output in InfluxDB line protocol - cout << "jf_udp_recv"; - cout << ",detector_name=" << detector_name_; - cout << ",module_name=M" << module_id_; - cout << " "; - cout << "n_missed_packets=" << n_missed_packets_ << "i"; - cout << ",n_corrupted_frames=" << n_corrupted_frames_ << "i"; - cout << ",repetition_rate=" << rep_rate << "i"; - cout << ",n_corrupted_pulse_ids=" << n_corrupted_pulse_id_ << "i"; - cout << " "; - cout << timestamp; - cout << endl; -} diff --git a/jfj-udp-recv/src/JfjFrameUdpReceiver.cpp b/jfj-udp-recv/src/JfjFrameUdpReceiver.cpp deleted file mode 100644 index 059da2b..0000000 --- a/jfj-udp-recv/src/JfjFrameUdpReceiver.cpp +++ /dev/null @@ -1,86 +0,0 @@ -#include -#include "JfjFrameUdpReceiver.hpp" - -using namespace std; -using namespace buffer_config; - -std::ostream &operator<<(std::ostream &os, jfjoch_packet_t const &packet) { - os << "Frame number: " << packet.framenum << std::endl; - os << "Packet number: " << packet.packetnum << std::endl; - os << "Bunch id: " << packet.bunchid << std::endl; - os << std::endl; - return os; -} - -JfjFrameUdpReceiver::JfjFrameUdpReceiver(const uint16_t port) { - m_udp_receiver.bind(port); -} - -JfjFrameUdpReceiver::~JfjFrameUdpReceiver() { - m_udp_receiver.disconnect(); -} - -inline void JfjFrameUdpReceiver::init_frame(ModuleFrame& metadata, const jfjoch_packet_t& c_packet) { - // std::cout << c_packet; - - metadata.pulse_id = c_packet.bunchid; - metadata.frame_index = c_packet.framenum; - metadata.daq_rec = (uint64_t) c_packet.debug; - metadata.module_id = (int64_t) 0; -} - -inline uint64_t JfjFrameUdpReceiver::process_packets(ModuleFrame& metadata, char* frame_buffer){ - - while(!m_buffer.is_empty()){ - // Happens if the last packet from the previous frame gets lost. - if (m_frame_index != m_buffer.peek_front().framenum) { - m_frame_index = m_buffer.peek_front().framenum; - std::cout << "Peeked pulse: " << metadata.pulse_id << std::endl; - return metadata.pulse_id; - } - - // Otherwise pop the queue (and set current frame index) - jfjoch_packet_t& c_packet = m_buffer.pop_front(); - m_frame_index = c_packet.framenum; - - // Always copy metadata (otherwise problem when 0th packet gets lost) - this->init_frame(metadata, c_packet); - - // Copy data to frame buffer - size_t offset = JFJOCH_DATA_BYTES_PER_PACKET * c_packet.packetnum; - memcpy( (void*) (frame_buffer + offset), c_packet.data, JFJOCH_DATA_BYTES_PER_PACKET); - metadata.n_recv_packets++; - - // Last frame packet received. Frame finished. - if (c_packet.packetnum == JFJOCH_N_PACKETS_PER_FRAME - 1){ - return metadata.pulse_id; - } - } - - // We emptied the buffer. - // m_buffer.reset(); - return 0; -} - -uint64_t JfjFrameUdpReceiver::get_frame_from_udp(ModuleFrame& metadata, char* frame_buffer){ - // Reset the metadata and frame buffer for the next frame. (really needed?) - metadata.pulse_id = 0; - metadata.n_recv_packets = 0; - memset(frame_buffer, 0, JFJOCH_DATA_BYTES_PER_FRAME); - // Process leftover packages in the buffer - if (!m_buffer.is_empty()) { - auto pulse_id = process_packets(metadata, frame_buffer); - if (pulse_id != 0) { return pulse_id; } - } - - while (true) { - // Receive new packages (pass if none)... - m_buffer.reset(); - m_buffer.fill_from(m_udp_receiver); - if (m_buffer.is_empty()) { continue; } - - // ... and process them - auto pulse_id = process_packets(metadata, frame_buffer); - if (pulse_id != 0) { return pulse_id; } - } -} diff --git a/jfj-udp-recv/src/PacketUdpReceiver.cpp b/jfj-udp-recv/src/PacketUdpReceiver.cpp deleted file mode 100644 index 713dd98..0000000 --- a/jfj-udp-recv/src/PacketUdpReceiver.cpp +++ /dev/null @@ -1,66 +0,0 @@ -#include -#include -#include "PacketUdpReceiver.hpp" -#include "jungfrau.hpp" -#include -#include -#include "buffer_config.hpp" - -using namespace std; -using namespace buffer_config; - -PacketUdpReceiver::PacketUdpReceiver() : socket_fd_(-1) { } - -PacketUdpReceiver::~PacketUdpReceiver() { - disconnect(); -} - -void PacketUdpReceiver::bind(const uint16_t port){ - if (socket_fd_ > -1) { - throw runtime_error("Socket already bound."); - } - - socket_fd_ = socket(AF_INET, SOCK_DGRAM, 0); - if (socket_fd_ < 0) { - throw runtime_error("Cannot open socket."); - } - - sockaddr_in server_address = {0}; - server_address.sin_family = AF_INET; - server_address.sin_addr.s_addr = INADDR_ANY; - server_address.sin_port = htons(port); - - timeval udp_socket_timeout; - udp_socket_timeout.tv_sec = 0; - udp_socket_timeout.tv_usec = BUFFER_UDP_US_TIMEOUT; - - if (setsockopt(socket_fd_, SOL_SOCKET, SO_RCVTIMEO, &udp_socket_timeout, sizeof(timeval)) == -1) { - throw runtime_error("Cannot set SO_RCVTIMEO. " + string(strerror(errno))); - } - - if (setsockopt(socket_fd_, SOL_SOCKET, SO_RCVBUF, &BUFFER_UDP_RCVBUF_BYTES, sizeof(int)) == -1) { - throw runtime_error("Cannot set SO_RCVBUF. " + string(strerror(errno))); - }; - //TODO: try to set SO_RCVLOWAT - - auto bind_result = ::bind(socket_fd_, reinterpret_cast(&server_address), sizeof(server_address)); - - if (bind_result < 0) { - throw runtime_error("Cannot bind socket."); - } -} - -int PacketUdpReceiver::receive_many(mmsghdr* msgs, const size_t n_msgs){ - return recvmmsg(socket_fd_, msgs, n_msgs, 0, 0); -} - -bool PacketUdpReceiver::receive(void* buffer, const size_t buffer_n_bytes){ - auto data_len = recv(socket_fd_, buffer, buffer_n_bytes, 0); - - return (data_len == buffer_n_bytes) ? true : false; -} - -void PacketUdpReceiver::disconnect(){ - close(socket_fd_); - socket_fd_ = -1; -} diff --git a/jfj-udp-recv/src/main.cpp b/jfj-udp-recv/src/main.cpp deleted file mode 100644 index 2afb451..0000000 --- a/jfj-udp-recv/src/main.cpp +++ /dev/null @@ -1,74 +0,0 @@ -#include -#include -#include -#include - -#include "formats.hpp" -#include "buffer_config.hpp" -#include "JfjFrameUdpReceiver.hpp" -#include "BufferUtils.hpp" -#include "JfjFrameStats.hpp" - -using namespace std; -using namespace chrono; -using namespace buffer_config; -using namespace BufferUtils; - -int main (int argc, char *argv[]) { - - if (argc != 3) { - cout << endl; - cout << "Usage: jfj_udp_recv [detector_json_filename]" << endl; - cout << "\tdetector_json_filename: detector config file path." << endl; - cout << endl; - - exit(-1); - } - - const auto config = read_json_config(string(argv[1])); - - const auto udp_port = config.start_udp_port; - JfjFrameUdpReceiver receiver(udp_port); - RamBuffer buffer(config.detector_name, config.n_modules); - FrameStats stats(config.detector_name, 0, STATS_TIME); - - auto ctx = zmq_ctx_new(); - zmq_ctx_set(ctx, ZMQ_IO_THREADS, ZMQ_IO_THREADS); - auto sender = BufferUtils::bind_socket(ctx, config.detector_name, "jungfraujoch"); - - // Might be better creating a structure for double buffering - ModuleFrame frameMeta; - ImageMetadata imageMeta; - char* dataBuffer = new char[JFJOCH_DATA_BYTES_PER_FRAME]; - - uint64_t pulse_id_previous = 0; - uint64_t frame_index_previous = 0; - - - while (true) { - // NOTE: Needs to be pipelined for really high frame rates - auto pulse_id = receiver.get_frame_from_udp(frameMeta, dataBuffer); - - bool bad_pulse_id = false; - - if ( ( frameMeta.frame_index != (frame_index_previous+1) ) || ( (pulse_id-pulse_id_previous) < 0 ) || ( (pulse_id-pulse_id_previous) > 1000 ) ) { - bad_pulse_id = true; - } else { - imageMeta.pulse_id = frameMeta.pulse_id; - imageMeta.frame_index = frameMeta.frame_index; - imageMeta.daq_rec = frameMeta.daq_rec; - imageMeta.is_good_image = true; - - buffer.write_frame(frameMeta, dataBuffer); - zmq_send(sender, &imageMeta, sizeof(imageMeta), 0); - } - - stats.record_stats(frameMeta, bad_pulse_id); - - pulse_id_previous = pulse_id; - frame_index_previous = frameMeta.frame_index; - - } - - delete[] dataBuffer; -} diff --git a/jfj-udp-recv/test/CMakeLists.txt b/jfj-udp-recv/test/CMakeLists.txt deleted file mode 100644 index 77eeb3a..0000000 --- a/jfj-udp-recv/test/CMakeLists.txt +++ /dev/null @@ -1,8 +0,0 @@ -add_executable(jfj-udp-recv-tests main.cpp) - -target_link_libraries(jfj-udp-recv-tests - core-buffer-lib - jfj-udp-recv-lib - gtest - ) - diff --git a/jfj-udp-recv/test/main.cpp b/jfj-udp-recv/test/main.cpp deleted file mode 100644 index 8f2cd01..0000000 --- a/jfj-udp-recv/test/main.cpp +++ /dev/null @@ -1,10 +0,0 @@ -#include "gtest/gtest.h" -#include "test_PacketUdpReceiver.cpp" -#include "test_FrameUdpReceiver.cpp" - -using namespace std; - -int main(int argc, char **argv) { - ::testing::InitGoogleTest(&argc, argv); - return RUN_ALL_TESTS(); -} diff --git a/jfj-udp-recv/test/mock/udp.hpp b/jfj-udp-recv/test/mock/udp.hpp deleted file mode 100644 index 1cef271..0000000 --- a/jfj-udp-recv/test/mock/udp.hpp +++ /dev/null @@ -1,16 +0,0 @@ -#ifndef MOCK_UDP_H -#define MOCK_UDP_H - -const int MOCK_UDP_PORT(13000); - -sockaddr_in get_server_address(uint16_t udp_port) -{ - sockaddr_in server_address = {0}; - server_address.sin_family = AF_INET; - server_address.sin_addr.s_addr = INADDR_ANY; - server_address.sin_port = htons(udp_port); - - return server_address; -} - -#endif \ No newline at end of file diff --git a/jfj-udp-recv/test/test_FrameUdpReceiver.cpp b/jfj-udp-recv/test/test_FrameUdpReceiver.cpp deleted file mode 100644 index 3f490c7..0000000 --- a/jfj-udp-recv/test/test_FrameUdpReceiver.cpp +++ /dev/null @@ -1,230 +0,0 @@ -#include -#include -#include "gtest/gtest.h" -#include "JfjFrameUdpReceiver.hpp" -#include "mock/udp.hpp" - -#include -#include -#include - -using namespace std; - -TEST(BufferUdpReceiver, simple_recv) -{ - int n_packets = JFJOCH_N_PACKETS_PER_FRAME; - int n_frames = 5; - - uint16_t udp_port = MOCK_UDP_PORT; - auto server_address = get_server_address(udp_port); - auto send_socket_fd = socket(AF_INET, SOCK_DGRAM, 0); - ASSERT_TRUE(send_socket_fd >= 0); - - JfjFrameUdpReceiver udp_receiver(udp_port); - - auto handle = async(launch::async, [&](){ - for (int64_t i_frame=0; i_frame < n_frames; i_frame++){ - for (size_t i_packet=0; i_packet(JFJOCH_DATA_BYTES_PER_FRAME); - - for (int i_frame=0; i_frame < n_frames; i_frame++) { - auto pulse_id = udp_receiver.get_frame_from_udp( - metadata, frame_buffer.get()); - - ASSERT_EQ(i_frame + 1, pulse_id); - ASSERT_EQ(metadata.frame_index, i_frame + 1000); - ASSERT_EQ(metadata.daq_rec, i_frame + 10000); - // -1 because we skipped a packet. - ASSERT_EQ(metadata.n_recv_packets, n_packets); - } - - ::close(send_socket_fd); -} - -TEST(BufferUdpReceiver, missing_middle_packet) -{ - int n_packets = JFJOCH_N_PACKETS_PER_FRAME; - int n_frames = 3; - - uint16_t udp_port = MOCK_UDP_PORT; - auto server_address = get_server_address(udp_port); - auto send_socket_fd = socket(AF_INET, SOCK_DGRAM, 0); - ASSERT_TRUE(send_socket_fd >= 0); - - JfjFrameUdpReceiver udp_receiver(udp_port); - - auto handle = async(launch::async, [&](){ - for (int64_t i_frame=0; i_frame < n_frames; i_frame++){ - for (size_t i_packet=0; i_packet(JFJOCH_DATA_BYTES_PER_FRAME); - - for (int i_frame=0; i_frame < n_frames; i_frame++) { - auto pulse_id = udp_receiver.get_frame_from_udp( - metadata, frame_buffer.get()); - - ASSERT_EQ(i_frame + 1, pulse_id); - ASSERT_EQ(metadata.frame_index, i_frame + 1000); - ASSERT_EQ(metadata.daq_rec, i_frame + 10000); - // -1 because we skipped a packet. - ASSERT_EQ(metadata.n_recv_packets, n_packets - 1); - } - - ::close(send_socket_fd); -} - -TEST(BufferUdpReceiver, missing_first_packet) -{ - auto n_packets = JFJOCH_N_PACKETS_PER_FRAME; - int n_frames = 3; - - uint16_t udp_port = MOCK_UDP_PORT; - auto server_address = get_server_address(udp_port); - auto send_socket_fd = socket(AF_INET, SOCK_DGRAM, 0); - ASSERT_TRUE(send_socket_fd >= 0); - - JfjFrameUdpReceiver udp_receiver(udp_port); - - auto handle = async(launch::async, [&](){ - for (int64_t i_frame=0; i_frame < n_frames; i_frame++){ - for (size_t i_packet=0; i_packet(JFJOCH_DATA_BYTES_PER_FRAME); - - for (int i_frame=0; i_frame < n_frames; i_frame++) { - auto pulse_id = udp_receiver.get_frame_from_udp( - metadata, frame_buffer.get()); - - ASSERT_EQ(i_frame + 1, pulse_id); - ASSERT_EQ(metadata.frame_index, i_frame + 1000); - ASSERT_EQ(metadata.daq_rec, i_frame + 10000); - // -1 because we skipped a packet. - ASSERT_EQ(metadata.n_recv_packets, n_packets - 1); - } - - ::close(send_socket_fd); -} - -TEST(BufferUdpReceiver, missing_last_packet) -{ - int n_packets = JFJOCH_N_PACKETS_PER_FRAME; - int n_frames = 3; - - uint16_t udp_port = MOCK_UDP_PORT; - auto server_address = get_server_address(udp_port); - auto send_socket_fd = socket(AF_INET, SOCK_DGRAM, 0); - ASSERT_TRUE(send_socket_fd >= 0); - - JfjFrameUdpReceiver udp_receiver(udp_port); - - auto handle = async(launch::async, [&](){ - for (int64_t i_frame=0; i_frame < n_frames; i_frame++){ - for (size_t i_packet=0; i_packet(JFJOCH_DATA_BYTES_PER_FRAME); - - // n_frames -1 because the last frame is not complete. - for (int i_frame=0; i_frame < n_frames - 1; i_frame++) { - auto pulse_id = udp_receiver.get_frame_from_udp(metadata, frame_buffer.get()); - - ASSERT_EQ(i_frame + 1, pulse_id); - ASSERT_EQ(metadata.frame_index, i_frame + 1000); - ASSERT_EQ(metadata.daq_rec, i_frame + 10000); - // -1 because we skipped a packet. - ASSERT_EQ(metadata.n_recv_packets, n_packets - 1); - } - - ::close(send_socket_fd); -} diff --git a/jfj-udp-recv/test/test_PacketUdpReceiver.cpp b/jfj-udp-recv/test/test_PacketUdpReceiver.cpp deleted file mode 100644 index 1be343a..0000000 --- a/jfj-udp-recv/test/test_PacketUdpReceiver.cpp +++ /dev/null @@ -1,170 +0,0 @@ -#include -#include -#include "gtest/gtest.h" -#include "mock/udp.hpp" -#include "PacketUdpReceiver.hpp" - -#include -#include - -using namespace std; - -TEST(PacketUdpReceiver, simple_recv) -{ - uint16_t udp_port = MOCK_UDP_PORT; - - auto send_socket_fd = socket(AF_INET,SOCK_DGRAM,0); - ASSERT_TRUE(send_socket_fd >= 0); - - PacketUdpReceiver udp_receiver; - udp_receiver.bind(udp_port); - - jungfrau_packet send_udp_buffer; - send_udp_buffer.packetnum = 91; - send_udp_buffer.framenum = 92; - send_udp_buffer.bunchid = 93; - send_udp_buffer.debug = 94; - - auto server_address = get_server_address(udp_port); - ::sendto( - send_socket_fd, - &send_udp_buffer, - JUNGFRAU_BYTES_PER_PACKET, - 0, - (sockaddr*) &server_address, - sizeof(server_address)); - - this_thread::sleep_for(chrono::milliseconds(100)); - - jungfrau_packet recv_udp_buffer; - ASSERT_TRUE(udp_receiver.receive( - &recv_udp_buffer, JUNGFRAU_BYTES_PER_PACKET)); - - EXPECT_EQ(send_udp_buffer.packetnum, recv_udp_buffer.packetnum); - EXPECT_EQ(send_udp_buffer.framenum, recv_udp_buffer.framenum); - EXPECT_EQ(send_udp_buffer.bunchid, recv_udp_buffer.bunchid); - EXPECT_EQ(send_udp_buffer.debug, recv_udp_buffer.debug); - - ASSERT_FALSE(udp_receiver.receive( - &recv_udp_buffer, JUNGFRAU_BYTES_PER_PACKET)); - - udp_receiver.disconnect(); - ::close(send_socket_fd); -} - -TEST(PacketUdpReceiver, false_recv) -{ - uint16_t udp_port = MOCK_UDP_PORT; - - auto send_socket_fd = socket(AF_INET,SOCK_DGRAM,0); - ASSERT_TRUE(send_socket_fd >= 0); - - PacketUdpReceiver udp_receiver; - udp_receiver.bind(udp_port); - - jungfrau_packet send_udp_buffer; - jungfrau_packet recv_udp_buffer; - - auto server_address = get_server_address(udp_port); - - ::sendto( - send_socket_fd, - &send_udp_buffer, - JUNGFRAU_BYTES_PER_PACKET-1, - 0, - (sockaddr*) &server_address, - sizeof(server_address)); - - ASSERT_FALSE(udp_receiver.receive( - &recv_udp_buffer, JUNGFRAU_BYTES_PER_PACKET)); - - ::sendto( - send_socket_fd, - &send_udp_buffer, - JUNGFRAU_BYTES_PER_PACKET, - 0, - (sockaddr*) &server_address, - sizeof(server_address)); - - ASSERT_TRUE(udp_receiver.receive( - &recv_udp_buffer, JUNGFRAU_BYTES_PER_PACKET)); - - ::sendto( - send_socket_fd, - &send_udp_buffer, - JUNGFRAU_BYTES_PER_PACKET-1, - 0, - (sockaddr*) &server_address, - sizeof(server_address)); - - ASSERT_TRUE(udp_receiver.receive( - &recv_udp_buffer, JUNGFRAU_BYTES_PER_PACKET-1)); - - udp_receiver.disconnect(); - ::close(send_socket_fd); -} - -TEST(PacketUdpReceiver, receive_many) -{ - auto n_msg_buffer = JF_N_PACKETS_PER_FRAME; - jungfrau_packet recv_buffer[n_msg_buffer]; - iovec recv_buff_ptr[n_msg_buffer]; - struct mmsghdr msgs[n_msg_buffer]; - struct sockaddr_in sockFrom[n_msg_buffer]; - - for (int i = 0; i < n_msg_buffer; i++) { - recv_buff_ptr[i].iov_base = (void*) &(recv_buffer[i]); - recv_buff_ptr[i].iov_len = sizeof(jungfrau_packet); - - msgs[i].msg_hdr.msg_iov = &recv_buff_ptr[i]; - msgs[i].msg_hdr.msg_iovlen = 1; - msgs[i].msg_hdr.msg_name = &sockFrom[i]; - msgs[i].msg_hdr.msg_namelen = sizeof(sockaddr_in); - } - - uint16_t udp_port = MOCK_UDP_PORT; - - auto send_socket_fd = socket(AF_INET,SOCK_DGRAM,0); - ASSERT_TRUE(send_socket_fd >= 0); - - PacketUdpReceiver udp_receiver; - udp_receiver.bind(udp_port); - - jungfrau_packet send_udp_buffer; - - auto server_address = get_server_address(udp_port); - - send_udp_buffer.bunchid = 0; - ::sendto( - send_socket_fd, - &send_udp_buffer, - JUNGFRAU_BYTES_PER_PACKET, - 0, - (sockaddr*) &server_address, - sizeof(server_address)); - - send_udp_buffer.bunchid = 1; - ::sendto( - send_socket_fd, - &send_udp_buffer, - JUNGFRAU_BYTES_PER_PACKET, - 0, - (sockaddr*) &server_address, - sizeof(server_address)); - - this_thread::sleep_for(chrono::milliseconds(10)); - - auto n_msgs = udp_receiver.receive_many(msgs, JF_N_PACKETS_PER_FRAME); - ASSERT_EQ(n_msgs, 2); - - for (size_t i=0;i