Added drain

This commit is contained in:
2021-07-13 12:10:43 +02:00
parent 905c231036
commit a5171bccc2
4 changed files with 70 additions and 30 deletions
+2 -1
View File
@@ -1,6 +1,7 @@
#ifndef SF_DAQ_BUFFER_TYPEMAP_HPP
#define SF_DAQ_BUFFER_TYPEMAP_HPP
#include <typeindex>
#include <unordered_map>
@@ -27,7 +28,7 @@ struct Type{
};
const std::unordered_map<std::type_index, Type> TypeTable = {
const std::unordered_map<std::type_index, TypeMap> TypeTable = {
{ typeid(void), {sizeof(void), TypeMap::VOID} },
{ typeid(char), {sizeof(char), TypeMap::CHAR} },
{ typeid(int8_t), {sizeof(int8_t), TypeMap::INT8} },
+60 -22
View File
@@ -11,26 +11,36 @@
#include "Watchdog.hpp"
/** Frame cache
/** Frame Cache
Reimplemented threadsafe RamBuffer that handles concurrency internally via mutexes.
Reimplemented thread-safe RamBuffer that handles concurrency internally via mutexes.
The class operates on in-memory arrays via pointer/reference access. It uses a
linearly increasing pulseID index for cache addressing. The standard placement method
ensures that no data corruption occurs, lines are always flushed before overwrite.
A large-enough buffer should ensure that there is sufficient time to retrieve all
data from all detector modules.
TODO: The class is header-only for future template-refactoring.
The cache line is flushed on three occasions:
- A new frame is about to overwrite it (by the frame-worker thread)
- Complete frames are queued for flushing internally (by internal worker)
- Incomplete frames are flushed by a watchdog after a timeout (by watchdog worker)
NOTE: The class is header-only for future template-refactoring.
TODO: Multiple queue workers
**/
class FrameCache{
public:
FrameCache(uint64_t _C, uint64_t N_MOD, std::function<void(ImageBinaryFormat&)> callback):
m_CAP(_C), m_M(N_MOD), m_valid(_C, 0), m_lock(_C),
m_CAP(_C), m_MOD(N_MOD), m_valid(_C, 0), m_fill(_C, 0), m_lock(_C),
m_buffer(_C, ImageBinaryFormat(512*N_MOD, 1024, sizeof(uint16_t))),
f_send(callback), m_watchdog(500, flush_all) {
// Initialize buffer metadata
for(auto& it: m_buffer){ memset(&it.meta, 0, sizeof(it.meta)); }
// Start watchdog
m_watchdog.Start();
// Start drain worker
m_drainer = std::thread(&FrameCache::drain_loop, this);
};
@@ -62,26 +72,17 @@ public:
// Calculate destination pointer and copy data
char* ptr_dest = m_buffer[idx].data.data() + moduleIDX * m_blocksize;
std::memcpy((void*)ptr_dest, (void*)&inc_frame.data, m_blocksize);
}
m_fill[idx]++;
void flush_all(){
for(int64_t idx=0; idx< m_CAP; idx++){
std::unique_lock<std::shared_mutex> p_guard(m_lock[idx]);
flush_line(idx);
// Queue for draining
if(m_fill[idx]==m_MOD-1){
std::cout << "Complete frame at " << idx << "\t(queued for draining)" <<std::endl;
drain_queue.push_back(idx);
}
}
/** Flush and invalidate a line
Flushes a valid cache line and invalidates the associated buffer.
NOTE : It does not lock, that must be done externally! **/
void flush_line(uint64_t idx){
if(m_valid[idx]){
f_send(m_buffer[idx]);
m_valid[idx] = 0;
}
}
protected:
/** Flush and start a new line
Flushes a valid cache line and starts another one from the provided metadata.
@@ -95,12 +96,46 @@ public:
m_buffer[idx].meta.frame_index = inc_frame.frame_index;
m_buffer[idx].meta.daq_rec = inc_frame.daq_rec;
m_buffer[idx].meta.is_good_image = true;
m_fill[idx] = 0;
m_valid[idx] = 1;
}
private:
/** Flush and invalidate a line
Flushes a valid cache line and invalidates the associated buffer.
NOTE : It does not lock, that must be done externally! **/
void flush_line(uint64_t idx){
if(m_valid[idx]){
f_send(m_buffer[idx]);
m_fill[idx] = 0;
m_valid[idx] = 0;
}
}
/** Flush all lines in the buffer**/
void flush_all(){
for(int64_t idx=0; idx< m_CAP; idx++){
std::unique_lock<std::shared_mutex> p_guard(m_lock[idx]);
flush_line(idx);
}
}
/** Drain loop
Flushes a valid cache line and invalidates the associated buffer.
NOTE : It does not lock, that must be done externally! **/
void drain_loop(){
while(true){
if(!drain_fifo.empty()){
uint32_t idx = drain_queue.pop_front();
std::cout << "\tDraining " << idx << std::endl;
flush_line(idx);
}
}
}
/** Variables **/
const uint64_t m_CAP;
const uint64_t m_M;
const uint64_t m_MOD;
const uint64_t m_blocksize = 1024*512*sizeof(uint16_t);
/** Flush function **/
@@ -108,11 +143,14 @@ private:
/** Main container and mutex guard **/
std::vector<uint32_t> m_valid;
std::vector<uint32_t> m_fill;
std::vector<std::shared_mutex> m_lock;
std::vector<ImageBinaryFormat> m_buffer;
/** Watchdog timer **/
/** Watchdog timer and flush queue **/
Watchdog m_watchdog;
std::thread m_drainer;
std::deque<uint32_t> drain_queue();
};
#endif // SF_DAQ_FRAME_CACHE_HPP
+4 -3
View File
@@ -26,9 +26,10 @@
Lightweight wrapper base class to initialize a ZMQ Publisher.
Nothing data specific, but everything is only 'protected'.
It also has an internal mutex that can be used for threadsafe
access to the undelying connection;
It also has an internal mutex that can be used for thread-safe
access to the underlying connection;
**/
template <size_t ZMQ_PUB_IO_THREADS>
class ZmqPublisher {
protected:
const uint16_t m_port;
@@ -39,7 +40,7 @@ class ZmqPublisher {
public:
ZmqPublisher(std::string ip, uint16_t port) :
m_port(port), m_address("tcp://*:" + std::to_string(port)), m_ctx(1), m_socket(m_ctx, ZMQ_PUB) {
m_port(port), m_address("tcp://*:" + std::to_string(port)), m_ctx(ZMQ_PUB_IO_THREADS), m_socket(m_ctx, ZMQ_PUB) {
// Bind the socket
m_socket.bind(m_address.c_str());
std::cout << "Initialized ZMQ publisher at " << m_address << std::endl;
+4 -4
View File
@@ -20,7 +20,7 @@ int main (int argc, char *argv[]) {
std::cout << "Creating ZMQ socket..." << std::endl;
ZmqImagePublisher pub("*", 5200);
ZmqImagePublisher<2> pub("*", 5200);
// ... and extracting sender function
std::function<void(ImageBinaryFormat&)> zmq_publish =
std::bind(&ZmqImagePublisher::sendImage, &pub, std::placeholders::_1);
@@ -33,18 +33,18 @@ int main (int argc, char *argv[]) {
std::bind(&FrameCache::emplace, &cache, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3);
std::cout << "Creating workers..." << std::endl;
std::cout << "Creating frame workers..." << std::endl;
std::vector<std::shared_ptr<JfjFrameWorker>> vWorkers;
for(int mm=0; mm<config.n_modules; mm++){
// Module name (not really used...)
char m_name[128];
snprintf(m_name, 128, "M%02d", mm);
std::string moduleName(m_name);
std::string moduleName(m_name);
vWorkers.emplace_back( std::make_shared<JfjFrameWorker>(config.start_udp_port+mm, moduleName, mm, push_cb) );
}
std::cout << "Starting worker threads..." << std::endl;
std::cout << "Starting frame worker threads..." << std::endl;
std::vector<std::thread> vThreads;
for(int mm=0; mm<config.n_modules; mm++){
vThreads.push_back( std::thread(&JfjFrameWorker::run, vWorkers[mm].get()) );