diff --git a/jfj-combined/include/JfjFrameCache.hpp b/jfj-combined/include/JfjFrameCache.hpp index cc226ca..1d633b4 100644 --- a/jfj-combined/include/JfjFrameCache.hpp +++ b/jfj-combined/include/JfjFrameCache.hpp @@ -1,24 +1,23 @@ -#ifndef FRAME_CACHE_HPP -#define FRAME_CACHE_HPP +#ifndef SF_DAQ_FRAME_CACHE_HPP +#define SF_DAQ_FRAME_CACHE_HPP -#include -#include -#include +// #include +// #include +// #include #include -#include -#include #include -#include #include -#include -#include +#include +// #include +// #include +// #include #include "../../core-buffer/include/formats.hpp" /** Frame cache - Reimplemented RamBuffer for better concurrency. + Reimplemented RamBuffer that handles concurrency internally via mutexes. The class operates on in-memory arrays via pointer/reference access. It uses a linearly increasing pulseID index to provide some headroom for collecting frames from multiple detectors. @@ -28,24 +27,26 @@ public: FrameCache(uint64_t _C, uint64_t N_MOD, std::function callback): m_CAP(_C), m_M(N_MOD), m_buffer(_C, ImageBinaryFormat(512*N_MOD, 1024, sizeof(uint16_t))), - f_send(callback), m_lock(_C), m_valid(_C) { + f_send(callback), m_lock(_C), m_valid(_C, 0) { // Initialize buffer metadata for(auto& it: m_buffer){ memset(&it.meta, 0, sizeof(it.meta)); } // Initialize Mutexes - for(auto& it: m_valid){ it = 0; } + //for(auto& it: m_valid){ it = 0; } }; /** Emplace Place a recorded frame to it's corresponding module location. - This simultaneously handles buffering and assembly. **/ + This simultaneously handles buffering, assembly and flushing. + Also handles concurrency (shared and unique mutexes). 
**/ void emplace(uint64_t pulseID, uint64_t moduleIDX, BufferBinaryFormat& inc_frame){ uint64_t idx = pulseID % m_CAP; // A new frame is starting if(inc_frame.meta.pulse_id != m_buffer[idx].meta.pulse_id){ + // Unique lock to flush and start a new one std::unique_lock p_guard(m_lock[idx]); // Check if condition persists after getting the mutex if(inc_frame.meta.pulse_id != m_buffer[idx].meta.pulse_id){ @@ -56,34 +57,38 @@ public: // Shared lock for concurrent PUT operations std::shared_lock s_guard(m_lock[idx]); - // Calculate destination pointer (easier to debug) + // Calculate destination pointer and copy data char* ptr_dest = m_buffer[idx].data.data() + moduleIDX * m_blocksize; std::memcpy((void*)ptr_dest, (void*)&inc_frame.data, m_blocksize); } void flush_all(){ for(int64_t idx=0; idx< m_CAP; idx++){ + std::unique_lock p_guard(m_lock[idx]); flush_line(idx); } } - // Flush and invalidate a line (incl. lock) + /** Flush and invalidate a line + + Flushes a valid cache line and invalidates the associated buffer. + NOTE : It does not lock, that must be done externally! **/ void flush_line(uint64_t idx){ - std::unique_lock guard(m_lock[idx]); if(m_valid[idx]){ f_send(m_buffer[idx]); m_valid[idx] = 0; } } - // Flush and start a new line (incl. lock) + /** Flush and start a new line + + Flushes a valid cache line and starts another one from the provided metadata. + NOTE : It does not lock, that must be done externally! **/ void start_line(uint64_t idx, ModuleFrame& inc_frame){ - // 0. Guard // 1. Flush - if(m_valid[idx]){ - f_send(m_buffer[idx]); - } - // 2. Init + if(m_valid[idx]){ f_send(m_buffer[idx]); } + + // 2. 
Init new frame m_buffer[idx].meta.pulse_id = inc_frame.pulse_id; m_buffer[idx].meta.frame_index = inc_frame.frame_index; m_buffer[idx].meta.daq_rec = inc_frame.daq_rec; @@ -95,12 +100,14 @@ private: const uint64_t m_CAP; const uint64_t m_M; const uint64_t m_blocksize = 1024*512*sizeof(uint16_t); + + /** Flush function **/ std::function f_send; /** Main container and mutex guard **/ + std::vector m_valid; std::vector m_lock; - std::vector> m_valid; std::vector m_buffer; }; -#endif // FRAME_CACHE_HPP +#endif // SF_DAQ_FRAME_CACHE_HPP diff --git a/jfj-combined/include/JfjFrameStats.hpp b/jfj-combined/include/JfjFrameStats.hpp index e96d3f5..db73ab8 100644 --- a/jfj-combined/include/JfjFrameStats.hpp +++ b/jfj-combined/include/JfjFrameStats.hpp @@ -7,6 +7,7 @@ class FrameStats { +private: const std::string detector_name_; const int module_id_; size_t stats_time_; @@ -21,9 +22,7 @@ class FrameStats { void print_stats(); public: - FrameStats(const std::string &detector_name, - const int module_id, - const size_t stats_time); + FrameStats(const std::string &detector_name, const int module_id, const size_t stats_time); void record_stats(const ModuleFrame &meta, const bool bad_pulse_id); }; diff --git a/jfj-combined/include/JfjFrameWorker.hpp b/jfj-combined/include/JfjFrameWorker.hpp index df3e0c5..218a721 100644 --- a/jfj-combined/include/JfjFrameWorker.hpp +++ b/jfj-combined/include/JfjFrameWorker.hpp @@ -9,32 +9,33 @@ /** JungfrauJoch UDP receiver - Wrapper class to capture frames from the UDP stream of the JungfrauJoch FPGA card. + Capture UDP data stream from Jungfrau(Joch) FPGA card. NOTE: This design will not scale well for higher frame rates... 
+ TODO: Direct copy into FrameCache buffer (saves a memcopy) **/ class JfjFrameWorker { - std::string m_state = "INIT"; - PacketUdpReceiver m_udp_receiver; - bool in_progress = false; - uint64_t m_frame_index = 0; + const std::string m_moduleName; const uint64_t m_moduleID; + // UDP and statistics interfaces + FrameStats m_moduleStats; + PacketUdpReceiver m_udp_receiver; + + // Buffer and helper structures + bool in_progress = false; + uint64_t m_current_index = 0; PacketBuffer m_buffer; + // Buffer processing inline uint64_t process_packets(BufferBinaryFormat& buffer); uint64_t get_frame(BufferBinaryFormat& buffer); std::function f_push_callback; public: - JfjFrameWorker(const uint16_t port, const uint32_t moduleID, + JfjFrameWorker(const uint16_t port, std::string moduleName, const uint32_t moduleID, std::function callback); virtual ~JfjFrameWorker(); - std::string print() const; void run(); }; - -std::ostream& operator<<(std::ostream& os, const JfjFrameWorker& worker); - - #endif //SF_DAQ_BUFFER_JFJ_FRAMEWORKER_HPP diff --git a/jfj-combined/include/PacketBuffer.hpp b/jfj-combined/include/PacketBuffer.hpp index f571094..b513991 100644 --- a/jfj-combined/include/PacketBuffer.hpp +++ b/jfj-combined/include/PacketBuffer.hpp @@ -23,11 +23,11 @@ template class PacketBuffer{ public: PacketBuffer() { + // Initialize C-structures as expected by for (int i = 0; i < CAPACITY; i++) { m_recv_buff_ptr[i].iov_base = (void*) &(m_container[i]); m_recv_buff_ptr[i].iov_len = sizeof(T); - // C-structure as expected by m_msgs[i].msg_hdr.msg_iov = &m_recv_buff_ptr[i]; m_msgs[i].msg_hdr.msg_iovlen = 1; m_msgs[i].msg_hdr.msg_name = &m_sock_from[i]; @@ -52,12 +52,11 @@ public: const T& peek_front(); //Non-destructive read void push_back(T item); //Write new element to buffer - /**Fill from UDP receiver**/ + /**Fill from UDP receiver (threadsafe)**/ template void fill_from(TY& recv){ std::lock_guard g_guard(m_mutex); this->idx_write = recv.receive_many(m_msgs, this->capacity()); - // 
std::cout << "Received " << this->idx_write << " frames" << std::endl; // Returns -1 with errno=11 if no data received if(idx_write==-1){ idx_write = 0; } this->idx_read = 0; @@ -90,7 +89,7 @@ private: template T& PacketBuffer::pop_front(){ std::lock_guard g_guard(m_mutex); - if(this->is_empty()){ throw std::out_of_range("Attempted to read empty queue!"); } + if(this->is_empty()) [[unlikely]] { throw std::out_of_range("Attempted to read empty queue!"); } idx_read++; return m_container[idx_read-1]; } @@ -101,7 +100,7 @@ T& PacketBuffer::pop_front(){ template const T& PacketBuffer::peek_front(){ std::lock_guard g_guard(m_mutex); - if(this->is_empty()){ throw std::out_of_range("Attempted to read empty queue!"); } + if(this->is_empty()) [[unlikely]] { throw std::out_of_range("Attempted to read empty queue!"); } return m_container[idx_read]; } @@ -110,7 +109,7 @@ const T& PacketBuffer::peek_front(){ template void PacketBuffer::push_back(T item){ std::lock_guard g_guard(m_mutex); - if(this->is_full()){ throw std::out_of_range("Attempted to write a full buffer!"); } + if(this->is_full()) [[unlikely]] { throw std::out_of_range("Attempted to write a full buffer!"); } m_container[idx_write] = item; idx_write++; } diff --git a/jfj-combined/include/ZmqImagePublisher.hpp b/jfj-combined/include/ZmqImagePublisher.hpp index 5ababe4..a9dc117 100644 --- a/jfj-combined/include/ZmqImagePublisher.hpp +++ b/jfj-combined/include/ZmqImagePublisher.hpp @@ -7,7 +7,7 @@ #define ASSERT_FALSE(expr, msg) \ - if(bool(expr)){ \ + if(bool(expr)) [[unlikely]] { \ std::string text = "ASSERTION called at " + std::string(__FILE__) + " line " + std::to_string(__LINE__) + "\n"; \ text = text + "Reason: " + std::to_string(expr) + "\n"; \ text = text + "Message:" + msg + "\nErrno: " + std::to_string(errno); \ @@ -15,18 +15,18 @@ } \ #define ASSERT_TRUE(expr, msg) \ - if(!bool(expr)){ \ + if(!bool(expr)) [[unlikely]] { \ std::string text = "ASSERTION called at " + std::string(__FILE__) + " line " + 
std::to_string(__LINE__) + "\n"; \ text = text + "Reason: " + std::to_string(expr) + "\n"; \ text = text + "Message:" + msg + "\nErrno: " + std::to_string(errno); \ throw std::runtime_error(text); \ - } + } /** ZMQ Publisher Lightweight wrapper base class to initialize a ZMQ Publisher. Nothing data specific, but everything is only 'protected'. - It also has an internal mutex that can be used for threadsafe + It also has an internal mutex that can be used for threadsafe access to the undelying connection; **/ class ZmqPublisher { @@ -36,22 +36,22 @@ class ZmqPublisher { zmq::context_t m_ctx; zmq::socket_t m_socket; std::mutex g_zmq_socket; - - public: + + public: ZmqPublisher(std::string ip, uint16_t port) : m_port(port), m_address("tcp://*:" + std::to_string(port)), m_ctx(1), m_socket(m_ctx, ZMQ_PUB) { // Bind the socket m_socket.bind(m_address.c_str()); std::cout << "Initialized ZMQ publisher at " << m_address << std::endl; }; - + ~ZmqPublisher(){}; }; /** ZMQ Image Publisher - Specialized publisher to send 'ImageBinaryFormat' data format as + Specialized publisher to send 'ImageBinaryFormat' data format as multipart message. It also takes care of thread safety. 
**/ class ZmqImagePublisher: public ZmqPublisher { @@ -62,9 +62,9 @@ class ZmqImagePublisher: public ZmqPublisher { void sendImage(ImageBinaryFormat& image){ std::lock_guard guard(g_zmq_socket); int len; - + len = m_socket.send(topic.c_str(), topic.size(), ZMQ_SNDMORE); - ASSERT_TRUE( len >=0, "Failed to send topic data" ) + ASSERT_TRUE( len >=0, "Failed to send topic data" ) len = m_socket.send(&image.meta, sizeof(image.meta), ZMQ_SNDMORE); ASSERT_TRUE( len >=0, "Failed to send meta data" ) // std::cout << "\tPT1 Sent " << len << "\n"; diff --git a/jfj-combined/src/JfjFrameStats.cpp b/jfj-combined/src/JfjFrameStats.cpp index ee48423..fd843ef 100644 --- a/jfj-combined/src/JfjFrameStats.cpp +++ b/jfj-combined/src/JfjFrameStats.cpp @@ -1,4 +1,4 @@ -#include +#include #include "JfjFrameStats.hpp" using namespace std; @@ -15,8 +15,7 @@ FrameStats::FrameStats( reset_counters(); } -void FrameStats::reset_counters() -{ +void FrameStats::reset_counters(){ frames_counter_ = 0; n_missed_packets_ = 0; n_corrupted_frames_ = 0; @@ -24,12 +23,11 @@ void FrameStats::reset_counters() stats_interval_start_ = steady_clock::now(); } -void FrameStats::record_stats(const ModuleFrame &meta, const bool bad_pulse_id) -{ +void FrameStats::record_stats(const ModuleFrame &meta, const bool bad_pulse_id){ if (bad_pulse_id) { n_corrupted_pulse_id_++; - } + } if (meta.n_recv_packets < JF_N_PACKETS_PER_FRAME) { n_missed_packets_ += JF_N_PACKETS_PER_FRAME - meta.n_recv_packets; @@ -38,8 +36,7 @@ void FrameStats::record_stats(const ModuleFrame &meta, const bool bad_pulse_id) frames_counter_++; - auto time_passed = duration_cast( - steady_clock::now()-stats_interval_start_).count(); + auto time_passed = duration_cast(steady_clock::now()-stats_interval_start_).count(); if (time_passed >= stats_time_*1000) { print_stats(); @@ -47,25 +44,24 @@ void FrameStats::record_stats(const ModuleFrame &meta, const bool bad_pulse_id) } } -void FrameStats::print_stats() -{ - auto interval_ms_duration = 
duration_cast( - steady_clock::now()-stats_interval_start_).count(); +void FrameStats::print_stats(){ + auto interval_ms_duration = duration_cast(steady_clock::now()-stats_interval_start_).count(); // * 1000 because milliseconds, + 250 because of truncation. int rep_rate = ((frames_counter_ * 1000) + 250) / interval_ms_duration; - uint64_t timestamp = time_point_cast( - system_clock::now()).time_since_epoch().count(); + uint64_t timestamp = time_point_cast(system_clock::now()).time_since_epoch().count(); // Output in InfluxDB line protocol - cout << "jf_udp_recv"; - cout << ",detector_name=" << detector_name_; - cout << ",module_name=M" << module_id_; - cout << " "; - cout << "n_missed_packets=" << n_missed_packets_ << "i"; - cout << ",n_corrupted_frames=" << n_corrupted_frames_ << "i"; - cout << ",repetition_rate=" << rep_rate << "i"; - cout << ",n_corrupted_pulse_ids=" << n_corrupted_pulse_id_ << "i"; - cout << " "; - cout << timestamp; - cout << endl; + stringstream ss; + ss << "jf_udp_recv"; + ss << ",detector_name=" << detector_name_; + ss << ",module_name=M" << module_id_; + ss << " "; + ss << "n_missed_packets=" << n_missed_packets_ << "i"; + ss << ",n_corrupted_frames=" << n_corrupted_frames_ << "i"; + ss << ",repetition_rate=" << rep_rate << "i"; + ss << ",n_corrupted_pulse_ids=" << n_corrupted_pulse_id_ << "i"; + ss << " "; + ss << timestamp; + ss << endl; + std::cout << ss.str(); } diff --git a/jfj-combined/src/JfjFrameWorker.cpp b/jfj-combined/src/JfjFrameWorker.cpp index 8ab4fbc..3979706 100644 --- a/jfj-combined/src/JfjFrameWorker.cpp +++ b/jfj-combined/src/JfjFrameWorker.cpp @@ -1,16 +1,11 @@ -#include +#include + #include "JfjFrameWorker.hpp" -using namespace std; -using namespace buffer_config; - - -JfjFrameWorker::JfjFrameWorker(const uint16_t port, const uint32_t moduleID, - std::function callback): - m_moduleID(moduleID), f_push_callback(callback) { +JfjFrameWorker::JfjFrameWorker(const uint16_t port, std::string moduleName, const uint32_t 
moduleID, std::function callback): m_moduleName(moduleName), m_moduleID(moduleID), m_moduleStats(moduleName, moduleID, 10.0), f_push_callback(callback) { m_udp_receiver.bind(port); - m_state = "ON"; } @@ -18,14 +13,17 @@ JfjFrameWorker::~JfjFrameWorker() { m_udp_receiver.disconnect(); } +/** Process Packets + Drains the buffer either until it's empty or the current frame is finished. + Has some optimizations and safety checks before segfaulting right away... + TODO: Direct memcopy into FrameCache for more speed! **/ inline uint64_t JfjFrameWorker::process_packets(BufferBinaryFormat& buffer){ - //std::cout << " Called process_packets()" << std::endl; while(!m_buffer.is_empty()){ // Happens if the last packet from the previous frame gets lost. - if (m_frame_index != m_buffer.peek_front().framenum) { - m_frame_index = m_buffer.peek_front().framenum; + if (m_current_index != m_buffer.peek_front().bunchid) [[unlikely]] { + m_current_index = m_buffer.peek_front().bunchid; if(this->in_progress){ this->in_progress = false; return buffer.meta.pulse_id; @@ -34,30 +32,32 @@ inline uint64_t JfjFrameWorker::process_packets(BufferBinaryFormat& buffer){ // Otherwise pop the queue (and set current frame index) jfjoch_packet_t& c_packet = m_buffer.pop_front(); - m_frame_index = c_packet.framenum; - this->in_progress = true; - //std::cout << " ff: " << c_packet.framenum << std::endl; - //std::cout << " ex: " << c_packet.exptime << std::endl; - //std::cout << " pp: " << c_packet.packetnum << std::endl; - // Always copy metadata (otherwise problem when 0th packet gets lost) - buffer.meta.pulse_id = c_packet.bunchid; - buffer.meta.frame_index = c_packet.framenum; - buffer.meta.daq_rec = c_packet.debug; - buffer.meta.module_id = m_moduleID; - + // Sanity check: rather throw than segfault... 
+ if(c_packet.packetnum >= JF_N_PACKETS_PER_FRAME) [[unlikely]] { + std::stringstream ss; ss << "Packet index '" << c_packet.packetnum << "' is out of range of " << JF_N_PACKETS_PER_FRAME << std::endl; + throw std::range_error(ss.str()); + } + + // Start new frame + if(!this->in_progress) [[unlikely]] { + m_current_index = c_packet.bunchid; + this->in_progress = true; + + // Always copy metadata (otherwise problem when 0th packet gets lost) + buffer.meta.pulse_id = c_packet.bunchid; + buffer.meta.frame_index = c_packet.framenum; + buffer.meta.daq_rec = c_packet.debug; + buffer.meta.module_id = m_moduleID; + } + // Copy data to frame buffer - if(c_packet.packetnum >= JF_N_PACKETS_PER_FRAME){ - std::cout << "Too high packet index: " << c_packet.packetnum << std::endl; - return 0; - } size_t offset = JUNGFRAU_DATA_BYTES_PER_PACKET * c_packet.packetnum; memcpy( (void*) (buffer.data + offset), c_packet.data, JUNGFRAU_DATA_BYTES_PER_PACKET); buffer.meta.n_recv_packets++; // Last frame packet received. Frame finished. - if (c_packet.packetnum == JF_N_PACKETS_PER_FRAME - 1){ - // std::cout << "Finished pulse: " << buffer.meta.pulse_id << std::endl; + if (c_packet.packetnum == JF_N_PACKETS_PER_FRAME - 1) [[unlikely]] { this->in_progress = false; return buffer.meta.pulse_id; } @@ -68,68 +68,39 @@ inline uint64_t JfjFrameWorker::process_packets(BufferBinaryFormat& buffer){ } uint64_t JfjFrameWorker::get_frame(BufferBinaryFormat& buffer){ - //std::cout << "Called get_frame()" << std::endl; - // std::cout << "Called get_frame()" << std::endl; - // Reset the metadata and frame buffer for the next frame. (really needed?) + // Reset the metadata and frame buffer for the next frame memset(&buffer, 0, sizeof(buffer)); + uint64_t pulse_id = 0; - // Process leftover packages in the buffer - if (!m_buffer.is_empty()) { - //std::cout << "Leftovers" << std::endl; - auto pulse_id = process_packets(buffer); - if (pulse_id != 0) { return pulse_id; } - } + // Hehehehe... do-while loop! 
+ do { + // First make sure the buffer is drained of leftovers + pulse_id = process_packets(buffer); + if (pulse_id != 0) [[likely]] { return pulse_id; } - - while (true) { - // Receive new packages (pass if none)... - //std::cout << "New packages" << std::endl; + // Then try to refill buffer... m_buffer.fill_from(m_udp_receiver); - if (m_buffer.is_empty()) { continue; } - // std::cout << "\tGot " << m_buffer.size() << std::endl; - - // ... and process them - auto pulse_id = process_packets(buffer); - if (pulse_id != 0) { return pulse_id; } - } + } while (true); } + void JfjFrameWorker::run(){ std::cout << "Running worker loop" << std::endl; // Might be better creating a structure for double buffering BufferBinaryFormat buffer; - uint64_t pulse_id_previous = 0; - uint64_t frame_index_previous = 0; - try{ - m_state = "RUNNING"; while (true) { // NOTE: Needs to be pipelined for really high frame rates auto pulse_id = get_frame(buffer); + m_moduleStats.record_stats(buffer.meta, true); - if(pulse_id>10){ + if(pulse_id>10) [[likely]] { f_push_callback(pulse_id, m_moduleID, buffer); } } } catch (const std::exception& ex) { std::cout << "Exception in worker loop: " << ex.what() << std::endl; - m_state = "ERROR"; throw; }; - } - -std::string JfjFrameWorker::print() const { - std::string msg = "JungfrauFrameWorker #" + std::to_string(m_moduleID) + "\n"+ - "State:\t" + m_state + "\n"; - return msg; -} - - -std::ostream& operator<<(std::ostream& os, const JfjFrameWorker& worker){ - os << worker.print() << std::endl; - return os; -} - -