From f121fd6ad2690efeeb75efce7b44799f720ac52c Mon Sep 17 00:00:00 2001 From: Mohacsi Istvan Date: Wed, 7 Jul 2021 12:09:00 +0200 Subject: [PATCH] Periodic watchdog fot cache flushing --- jfj-combined/include/JfjFrameCache.hpp | 30 +++++++++++++++-------- jfj-combined/include/Watchdog.hpp | 33 +++++++++++++------------- 2 files changed, 36 insertions(+), 27 deletions(-) diff --git a/jfj-combined/include/JfjFrameCache.hpp b/jfj-combined/include/JfjFrameCache.hpp index 22fe3d8..f974fba 100644 --- a/jfj-combined/include/JfjFrameCache.hpp +++ b/jfj-combined/include/JfjFrameCache.hpp @@ -8,26 +8,29 @@ #include #include "../../core-buffer/include/formats.hpp" +#include "Watchdog.hpp" /** Frame cache - Reimplemented RamBuffer that handles concurrency internally via mutexes. + Reimplemented threadsafe RamBuffer that handles concurrency internally via mutexes. The class operates on in-memory arrays via pointer/reference access. It uses a - linearly increasing pulseID index to provide some headroom for collecting frames - from multiple detectors. + linearly increasing pulseID index for cache addressing. The standard placement method + ensures that no data corruption occurs, lines are always flushed before overwrite. + A large-enough buffer should ensure that there is sufficient time to retrieve all + data from all detector modules. + + TODO: The class is header-only for future template-refactoring. **/ class FrameCache{ public: FrameCache(uint64_t _C, uint64_t N_MOD, std::function callback): - m_CAP(_C), m_M(N_MOD), + m_CAP(_C), m_M(N_MOD), m_valid(_C, 0), m_lock(_C), m_buffer(_C, ImageBinaryFormat(512*N_MOD, 1024, sizeof(uint16_t))), - f_send(callback), m_lock(_C), m_valid(_C, 0) { + f_send(callback), m_watchdog(500, m_flush_all) { // Initialize buffer metadata for(auto& it: m_buffer){ memset(&it.meta, 0, sizeof(it.meta)); } - - // Initialize Mutexes - //for(auto& it: m_valid){ it = 0; } + m_watchdog.Start(); }; @@ -35,9 +38,13 @@ public: Place a recorded frame to it's corresponding module location. This simultaneously handles buffering, assembly and flushing. - Also handles concurrency (shared and unique mutexes). **/ + Also handles concurrency (shared and unique mutexes). + + NOTE: Forced flushing is performed by the current thread. + **/ void emplace(uint64_t pulseID, uint64_t moduleIDX, BufferBinaryFormat& inc_frame){ - uint64_t idx = pulseID % m_CAP; + // Cache-line index + const uint64_t idx = pulseID % m_CAP; // A new frame is starting if(inc_frame.meta.pulse_id != m_buffer[idx].meta.pulse_id){ @@ -103,6 +110,9 @@ private: std::vector m_valid; std::vector m_lock; std::vector m_buffer; + + /** Watchdog timer **/ + Watchdog m_watchdog; }; #endif // SF_DAQ_FRAME_CACHE_HPP diff --git a/jfj-combined/include/Watchdog.hpp b/jfj-combined/include/Watchdog.hpp index 0dd917f..0758273 100644 --- a/jfj-combined/include/Watchdog.hpp +++ b/jfj-combined/include/Watchdog.hpp @@ -4,34 +4,30 @@ #include #include #include -#include #include #include +/** Watchdog timer class + + Unless kicked repeatedly, it periodically calls a user-defined function. + **/ class Watchdog{ public: - Watchdog(uint32_t timeout, std::function callback): m_timeout(timeout), m_callback(callback) { - m_timeout = timeout; - m_callback = callback; - m_running = false; - }; + Watchdog(uint32_t timeout, std::function callback): m_timeout(timeout), m_callback(callback) {}; ~Watchdog() { Stop(); }; void Start(); void Stop(); void Kick(); -private: +protected: + uint32_t m_timeout; std::atomic m_running = false; std::function m_callback; - uint32_t m_timeout; - std::chrono::time_point m_lastKick; - + std::chrono::time_point m_lastkick; std::thread m_thread; std::mutex m_mutex; - steady_clock::time_point m_lastPetTime; - std::condition_variable m_stopCondition; void Loop(); }; @@ -40,13 +36,13 @@ void Watchdog::Start(){ std::unique_lock lock(m_mutex); if(m_running == false){ m_running = true; - m_lastKick = std::chrono::steady_clock::now(); + m_lastkick = std::chrono::steady_clock::now(); m_thread = std::thread(&Watchdog::Loop, this); } } void Watchdog::Stop(){ - std::unique_lock locker(m_mutex); + std::unique_lock g_guard(m_mutex); if(m_running == true){ m_running = false; m_thread.join(); @@ -54,17 +50,20 @@ void Watchdog::Stop(){ } void Watchdog::Kick(){ - std::unique_lock locker(m_mutex); - m_lastKick = steady_clock::now(); + std::unique_lock g_guard(m_mutex); + m_lastkick = std::chrono::steady_clock::now(); } void Watchdog::Loop(){ while(m_running){ - if((std::chrono::now() - m_last_kick) < m_timeout){ + if((std::chrono::now() - m_lastkick) < m_timeout){ std::this_thread::sleep_for(std::chrono::milliseconds(1)); } else { std::cout << "Expired timer" << std::endl; m_callback(); + // Infinite re-kick + std::unique_lock g_guard(m_mutex); + m_lastkick = std::chrono::steady_clock::now(); } } }