diff --git a/core-buffer/include/TypeMap.hpp b/core-buffer/include/TypeMap.hpp index fb23ff7..2967101 100644 --- a/core-buffer/include/TypeMap.hpp +++ b/core-buffer/include/TypeMap.hpp @@ -1,6 +1,7 @@ #ifndef SF_DAQ_BUFFER_TYPEMAP_HPP #define SF_DAQ_BUFFER_TYPEMAP_HPP +#include #include @@ -27,7 +28,7 @@ struct Type{ }; -const std::unordered_map TypeTable = { +const std::unordered_map TypeTable = { { typeid(void), {sizeof(void), TypeMap::VOID} }, { typeid(char), {sizeof(char), TypeMap::CHAR} }, { typeid(int8_t), {sizeof(int8_t), TypeMap::INT8} }, diff --git a/jfj-combined/include/JfjFrameCache.hpp b/jfj-combined/include/JfjFrameCache.hpp index aa6e185..ac42fa5 100644 --- a/jfj-combined/include/JfjFrameCache.hpp +++ b/jfj-combined/include/JfjFrameCache.hpp @@ -11,26 +11,36 @@ #include "Watchdog.hpp" -/** Frame cache +/** Frame Cache - Reimplemented threadsafe RamBuffer that handles concurrency internally via mutexes. + Reimplemented thread-safe RamBuffer that handles concurrency internally via mutexes. The class operates on in-memory arrays via pointer/reference access. It uses a linearly increasing pulseID index for cache addressing. The standard placement method ensures that no data corruption occurs, lines are always flushed before overwrite. A large-enough buffer should ensure that there is sufficient time to retrieve all data from all detector modules. - TODO: The class is header-only for future template-refactoring. + The cache line is flushed on three occasions: + - A new frame is about to overwrite it (by the frame-worker thread) + - Complete frames are queued for flushing internally (by internal worker) + - Incomplete frames are flushed by a watchdog after a timeout (by watchdog worker) + + NOTE: The class is header-only for future template-refactoring. + TODO: Multiple queue workers **/ class FrameCache{ public: FrameCache(uint64_t _C, uint64_t N_MOD, std::function callback): - m_CAP(_C), m_M(N_MOD), m_valid(_C, 0), m_lock(_C), + m_CAP(_C), m_MOD(N_MOD), m_valid(_C, 0), m_fill(_C, 0), m_lock(_C), m_buffer(_C, ImageBinaryFormat(512*N_MOD, 1024, sizeof(uint16_t))), f_send(callback), m_watchdog(500, flush_all) { + // Initialize buffer metadata for(auto& it: m_buffer){ memset(&it.meta, 0, sizeof(it.meta)); } + // Start watchdog m_watchdog.Start(); + // Start drain worker + m_drainer = std::thread(&FrameCache::drain_loop, this); }; @@ -62,26 +72,17 @@ public: // Calculate destination pointer and copy data char* ptr_dest = m_buffer[idx].data.data() + moduleIDX * m_blocksize; std::memcpy((void*)ptr_dest, (void*)&inc_frame.data, m_blocksize); - } + m_fill[idx]++; - void flush_all(){ - for(int64_t idx=0; idx< m_CAP; idx++){ - std::unique_lock p_guard(m_lock[idx]); - flush_line(idx); + // Queue for draining + if(m_fill[idx]==m_MOD-1){ + std::cout << "Complete frame at " << idx << "\t(queued for draining)" < p_guard(m_lock[idx]); + flush_line(idx); + } + } + + /** Drain loop + Flushes a valid cache line and invalidates the associated buffer. + NOTE : It does not lock, that must be done externally! **/ + void drain_loop(){ + while(true){ + if(!drain_fifo.empty()){ + uint32_t idx = drain_queue.pop_front(); + std::cout << "\tDraining " << idx << std::endl; + flush_line(idx); + } + } + } + + /** Variables **/ const uint64_t m_CAP; - const uint64_t m_M; + const uint64_t m_MOD; const uint64_t m_blocksize = 1024*512*sizeof(uint16_t); /** Flush function **/ @@ -108,11 +143,14 @@ private: /** Main container and mutex guard **/ std::vector m_valid; + std::vector m_fill; std::vector m_lock; std::vector m_buffer; - /** Watchdog timer **/ + /** Watchdog timer and flush queue **/ Watchdog m_watchdog; + std::thread m_drainer; + std::deque drain_queue(); }; #endif // SF_DAQ_FRAME_CACHE_HPP diff --git a/jfj-combined/include/ZmqImagePublisher.hpp b/jfj-combined/include/ZmqImagePublisher.hpp index a53751f..1993e5b 100644 --- a/jfj-combined/include/ZmqImagePublisher.hpp +++ b/jfj-combined/include/ZmqImagePublisher.hpp @@ -26,9 +26,10 @@ Lightweight wrapper base class to initialize a ZMQ Publisher. Nothing data specific, but everything is only 'protected'. - It also has an internal mutex that can be used for threadsafe - access to the undelying connection; + It also has an internal mutex that can be used for thread-safe + access to the underlying connection; **/ +template class ZmqPublisher { protected: const uint16_t m_port; @@ -39,7 +40,7 @@ class ZmqPublisher { public: ZmqPublisher(std::string ip, uint16_t port) : - m_port(port), m_address("tcp://*:" + std::to_string(port)), m_ctx(1), m_socket(m_ctx, ZMQ_PUB) { + m_port(port), m_address("tcp://*:" + std::to_string(port)), m_ctx(ZMQ_PUB_IO_THREADS), m_socket(m_ctx, ZMQ_PUB) { // Bind the socket m_socket.bind(m_address.c_str()); std::cout << "Initialized ZMQ publisher at " << m_address << std::endl; diff --git a/jfj-combined/src/main.cpp b/jfj-combined/src/main.cpp index df872d4..eec7e80 100644 --- a/jfj-combined/src/main.cpp +++ b/jfj-combined/src/main.cpp @@ -20,7 +20,7 @@ int main (int argc, char *argv[]) { std::cout << "Creating ZMQ socket..." << std::endl; - ZmqImagePublisher pub("*", 5200); + ZmqImagePublisher<2> pub("*", 5200); // ... and extracting sender function std::function zmq_publish = std::bind(&ZmqImagePublisher::sendImage, &pub, std::placeholders::_1); @@ -33,18 +33,18 @@ int main (int argc, char *argv[]) { std::bind(&FrameCache::emplace, &cache, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3); - std::cout << "Creating workers..." << std::endl; + std::cout << "Creating frame workers..." << std::endl; std::vector> vWorkers; for(int mm=0; mm(config.start_udp_port+mm, moduleName, mm, push_cb) ); } - std::cout << "Starting worker threads..." << std::endl; + std::cout << "Starting frame worker threads..." << std::endl; std::vector vThreads; for(int mm=0; mm