Periodic watchdog fot cache flushing

This commit is contained in:
2021-07-07 12:09:00 +02:00
parent c717c9c9cf
commit f121fd6ad2
2 changed files with 36 additions and 27 deletions
+20 -10
View File
@@ -8,26 +8,29 @@
#include <shared_mutex>
#include "../../core-buffer/include/formats.hpp"
#include "Watchdog.hpp"
/** Frame cache
Reimplemented RamBuffer that handles concurrency internally via mutexes.
Reimplemented threadsafe RamBuffer that handles concurrency internally via mutexes.
The class operates on in-memory arrays via pointer/reference access. It uses a
linearly increasing pulseID index to provide some headroom for collecting frames
from multiple detectors.
linearly increasing pulseID index for cache addressing. The standard placement method
ensures that no data corruption occurs, lines are always flushed before overwrite.
A large-enough buffer should ensure that there is sufficient time to retrieve all
data from all detector modules.
TODO: The class is header-only for future template-refactoring.
**/
class FrameCache{
public:
FrameCache(uint64_t _C, uint64_t N_MOD, std::function<void(ImageBinaryFormat&)> callback):
m_CAP(_C), m_M(N_MOD),
m_CAP(_C), m_M(N_MOD), m_valid(_C, 0), m_lock(_C),
m_buffer(_C, ImageBinaryFormat(512*N_MOD, 1024, sizeof(uint16_t))),
f_send(callback), m_lock(_C), m_valid(_C, 0) {
f_send(callback), m_watchdog(500, m_flush_all) {
// Initialize buffer metadata
for(auto& it: m_buffer){ memset(&it.meta, 0, sizeof(it.meta)); }
// Initialize Mutexes
//for(auto& it: m_valid){ it = 0; }
m_watchdog.Start();
};
@@ -35,9 +38,13 @@ public:
Place a recorded frame to it's corresponding module location.
This simultaneously handles buffering, assembly and flushing.
Also handles concurrency (shared and unique mutexes). **/
Also handles concurrency (shared and unique mutexes).
NOTE: Forced flushing is performed by the current thread.
**/
void emplace(uint64_t pulseID, uint64_t moduleIDX, BufferBinaryFormat& inc_frame){
uint64_t idx = pulseID % m_CAP;
// Cache-line index
const uint64_t idx = pulseID % m_CAP;
// A new frame is starting
if(inc_frame.meta.pulse_id != m_buffer[idx].meta.pulse_id){
@@ -103,6 +110,9 @@ private:
std::vector<uint32_t> m_valid;
std::vector<std::shared_mutex> m_lock;
std::vector<ImageBinaryFormat> m_buffer;
/** Watchdog timer **/
Watchdog m_watchdog;
};
#endif // SF_DAQ_FRAME_CACHE_HPP
+16 -17
View File
@@ -4,34 +4,30 @@
#include <thread>
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <mutex>
#include <iostream>
/** Watchdog timer class
Unless kicked repeatedly, it periodically calls a user-defined function.
**/
class Watchdog{
public:
Watchdog(uint32_t timeout, std::function<void()> callback): m_timeout(timeout), m_callback(callback) {
m_timeout = timeout;
m_callback = callback;
m_running = false;
};
Watchdog(uint32_t timeout, std::function<void()> callback): m_timeout(timeout), m_callback(callback) {};
~Watchdog() { Stop(); };
void Start();
void Stop();
void Kick();
private:
protected:
uint32_t m_timeout;
std::atomic<bool> m_running = false;
std::function<void()> m_callback;
uint32_t m_timeout;
std::chrono::time_point m_lastKick;
std::chrono::time_point m_lastkick;
std::thread m_thread;
std::mutex m_mutex;
steady_clock::time_point m_lastPetTime;
std::condition_variable m_stopCondition;
void Loop();
};
@@ -40,13 +36,13 @@ void Watchdog::Start(){
std::unique_lock<std::mutex> lock(m_mutex);
if(m_running == false){
m_running = true;
m_lastKick = std::chrono::steady_clock::now();
m_lastkick = std::chrono::steady_clock::now();
m_thread = std::thread(&Watchdog::Loop, this);
}
}
void Watchdog::Stop(){
std::unique_lock<std::mutex> locker(m_mutex);
std::unique_lock<std::mutex> g_guard(m_mutex);
if(m_running == true){
m_running = false;
m_thread.join();
@@ -54,17 +50,20 @@ void Watchdog::Stop(){
}
void Watchdog::Kick(){
std::unique_lock<std::mutex> locker(m_mutex);
m_lastKick = steady_clock::now();
std::unique_lock<std::mutex> g_guard(m_mutex);
m_lastkick = std::chrono::steady_clock::now();
}
void Watchdog::Loop(){
while(m_running){
if((std::chrono::now() - m_last_kick) < m_timeout){
if((std::chrono::now() - m_lastkick) < m_timeout){
std::this_thread::sleep_for(std::chrono::milliseconds(1));
} else {
std::cout << "Expired timer" << std::endl;
m_callback();
// Infinite re-kick
std::unique_lock<std::mutex> g_guard(m_mutex);
m_lastkick = std::chrono::steady_clock::now();
}
}
}