mirror of
https://github.com/paulscherrerinstitute/sf_daq_buffer.git
synced 2026-04-27 19:52:22 +02:00
Added drain
This commit is contained in:
@@ -3,9 +3,10 @@
|
||||
|
||||
#include <typeindex>
|
||||
#include <unordered_map>
|
||||
#include <complex>
|
||||
|
||||
|
||||
enum class TypeMap {
|
||||
enum class TypeList {
|
||||
VOID,
|
||||
CHAR,
|
||||
INT8,
|
||||
@@ -19,47 +20,50 @@ enum class TypeMap {
|
||||
FLOAT,
|
||||
DOUBLE,
|
||||
COMPLEX_FLOAT,
|
||||
COMPLEX_DOUBLE
|
||||
COMPLEX_DOUBLE,
|
||||
};
|
||||
|
||||
struct Type{
|
||||
|
||||
struct DataType{
|
||||
const size_t size;
|
||||
const int value;
|
||||
};
|
||||
|
||||
|
||||
const std::unordered_map<std::type_index, TypeMap> TypeTable = {
|
||||
{ typeid(void), {sizeof(void), TypeMap::VOID} },
|
||||
{ typeid(char), {sizeof(char), TypeMap::CHAR} },
|
||||
{ typeid(int8_t), {sizeof(int8_t), TypeMap::INT8} },
|
||||
{ typeid(uint8_t), {sizeof(uint8_t), TypeMap::UINT8} },
|
||||
{ typeid(int16_t), {sizeof(int16_t), TypeMap::INT16} },
|
||||
{ typeid(uint16_t), {sizeof(uint16_t), TypeMap::UINT16} },
|
||||
{ typeid(int32_t), {sizeof(int32_t), TypeMap::INT32} },
|
||||
{ typeid(uint32_t), {sizeof(uint32_t), TypeMap::UINT32} },
|
||||
{ typeid(int64_t), {sizeof(int64_t), TypeMap::INT64} },
|
||||
{ typeid(uint64_t), {sizeof(uint64_t), TypeMap::UINT64} },
|
||||
{ typeid(float), {sizeof(float), TypeMap::float} },
|
||||
{ typeid(double), {sizeof(double), TypeMap::DOUBLE} },
|
||||
{ typeid(std::complex<float>), {sizeof(std::complex<float>), TypeMap::COMPLEX_FLOAT} },
|
||||
{ typeid(std::complex<double>), {sizeof(std::complex<double>), TypeMap::COMPLEX_DOUBLE} }
|
||||
const std::unordered_map<std::type_index, DataType> TypeTable = {
|
||||
{ typeid(void), {sizeof(char), (int)TypeList::VOID} },
|
||||
{ typeid(char), {sizeof(char), (int)TypeList::CHAR} },
|
||||
{ typeid(int8_t), {sizeof(int8_t), (int)TypeList::INT8} },
|
||||
{ typeid(uint8_t), {sizeof(uint8_t), (int)TypeList::UINT8} },
|
||||
{ typeid(int16_t), {sizeof(int16_t), (int)TypeList::INT16} },
|
||||
{ typeid(uint16_t), {sizeof(uint16_t), (int)TypeList::UINT16} },
|
||||
{ typeid(int32_t), {sizeof(int32_t), (int)TypeList::INT32} },
|
||||
{ typeid(uint32_t), {sizeof(uint32_t), (int)TypeList::UINT32} },
|
||||
{ typeid(int64_t), {sizeof(int64_t), (int)TypeList::INT64} },
|
||||
{ typeid(uint64_t), {sizeof(uint64_t), (int)TypeList::UINT64} },
|
||||
{ typeid(float), {sizeof(float), (int)TypeList::FLOAT} },
|
||||
{ typeid(double), {sizeof(double), (int)TypeList::DOUBLE} },
|
||||
{ typeid(std::complex<float>), {sizeof(std::complex<float>), (int)TypeList::COMPLEX_FLOAT} },
|
||||
{ typeid(std::complex<double>), {sizeof(std::complex<double>), (int)TypeList::COMPLEX_DOUBLE} },
|
||||
};
|
||||
|
||||
const std::unordered_map<int , Type> TypeTable = {
|
||||
{ TypeMap::VOID, {sizeof(void), TypeMap::VOID} },
|
||||
{ TypeMap::CHAR, {sizeof(char), TypeMap::CHAR} },
|
||||
{ TypeMap::INT8, {sizeof(int8_t), TypeMap::INT8} },
|
||||
{ TypeMap::UINT8, {sizeof(uint8_t), TypeMap::UINT8} },
|
||||
{ TypeMap::INT16, {sizeof(int16_t), TypeMap::INT16} },
|
||||
{ TypeMap::UINT16, {sizeof(uint16_t), TypeMap::UINT16} },
|
||||
{ TypeMap::INT32, {sizeof(int32_t), TypeMap::INT32} },
|
||||
{ TypeMap::UINT32, {sizeof(uint32_t), TypeMap::UINT32} },
|
||||
{ TypeMap::INT64, {sizeof(int64_t), TypeMap::INT64} },
|
||||
{ TypeMap::UINT64, {sizeof(uint64_t), TypeMap::UINT64} },
|
||||
{ TypeMap::FLOAT, {sizeof(float), TypeMap::float} },
|
||||
{ TypeMap::DOUBLE, {sizeof(double), TypeMap::DOUBLE} },
|
||||
{ TypeMap::COMPLEX_FLOAT, {sizeof(std::complex<float>), TypeMap::COMPLEX_FLOAT} },
|
||||
{ TypeMap::COMPLEX_DOUBLE, {sizeof(std::complex<double>), TypeMap::COMPLEX_DOUBLE} }
|
||||
|
||||
const std::unordered_map<int , DataType> TypeMap = {
|
||||
{ (int)TypeList::VOID, {sizeof(char), (int)TypeList::VOID} },
|
||||
{ (int)TypeList::CHAR, {sizeof(char), (int)TypeList::CHAR} },
|
||||
{ (int)TypeList::INT8, {sizeof(int8_t), (int)TypeList::INT8} },
|
||||
{ (int)TypeList::UINT8, {sizeof(uint8_t), (int)TypeList::UINT8} },
|
||||
{ (int)TypeList::INT16, {sizeof(int16_t), (int)TypeList::INT16} },
|
||||
{ (int)TypeList::UINT16, {sizeof(uint16_t), (int)TypeList::UINT16} },
|
||||
{ (int)TypeList::INT32, {sizeof(int32_t), (int)TypeList::INT32} },
|
||||
{ (int)TypeList::UINT32, {sizeof(uint32_t), (int)TypeList::UINT32} },
|
||||
{ (int)TypeList::INT64, {sizeof(int64_t), (int)TypeList::INT64} },
|
||||
{ (int)TypeList::UINT64, {sizeof(uint64_t), (int)TypeList::UINT64} },
|
||||
{ (int)TypeList::FLOAT, {sizeof(float), (int)TypeList::FLOAT} },
|
||||
{ (int)TypeList::DOUBLE, {sizeof(double), (int)TypeList::DOUBLE} },
|
||||
{ (int)TypeList::COMPLEX_FLOAT, {sizeof(std::complex<float>), (int)TypeList::COMPLEX_FLOAT} },
|
||||
{ (int)TypeList::COMPLEX_DOUBLE, {sizeof(std::complex<double>), (int)TypeList::COMPLEX_DOUBLE} },
|
||||
};
|
||||
|
||||
|
||||
#endif // SF_DAQ_BUFFER_TYPEMAP_HPP
|
||||
|
||||
@@ -38,7 +38,7 @@ struct ImageMetadata {
|
||||
|
||||
uint64_t user_1; // extra field for custom needs
|
||||
uint64_t user_2; // extra field for custom needs
|
||||
}
|
||||
};
|
||||
#pragma pack(pop)
|
||||
|
||||
struct ModuleFrameBuffer {
|
||||
|
||||
@@ -169,7 +169,7 @@ char* RamBuffer::read_image(const uint64_t pulse_id) const
|
||||
void RamBuffer::write_image(const ImageMetadata& src_meta, const char *src_data)
|
||||
{
|
||||
const int slot_n = src_meta.id % n_slots_;
|
||||
const int image_n_bytes = src_meta.height * src_meta.width * TypeMap[src_meta.dtype].size;
|
||||
const int image_n_bytes = src_meta.height * src_meta.width * TypeMap.at(src_meta.dtype).size;
|
||||
|
||||
char *dst_data = image_buffer_ + (image_n_bytes * slot_n);
|
||||
|
||||
|
||||
@@ -30,15 +30,19 @@
|
||||
**/
|
||||
class FrameCache{
|
||||
public:
|
||||
FrameCache(uint64_t _C, uint64_t N_MOD, std::function<void(ImageBinaryFormat&)> callback):
|
||||
m_CAP(_C), m_MOD(N_MOD), m_valid(_C, 0), m_fill(_C, 0), m_lock(_C),
|
||||
m_buffer(_C, ImageBinaryFormat(512*N_MOD, 1024, sizeof(uint16_t))),
|
||||
f_send(callback), m_watchdog(500, flush_all) {
|
||||
|
||||
FrameCache(uint64_t N_CAP, uint64_t N_MOD, std::function<void(ImageBinaryFormat&)> callback):
|
||||
m_CAP(N_CAP), m_MOD(N_MOD), m_valid(N_CAP, 0), m_fill(N_CAP, 0), m_lock(N_CAP),
|
||||
m_buffer(N_CAP, ImageBinaryFormat(512*N_MOD, 1024, sizeof(uint16_t))),
|
||||
f_send(callback) {
|
||||
// Initialize buffer metadata
|
||||
for(auto& it: m_buffer){ memset(&it.meta, 0, sizeof(it.meta)); }
|
||||
// Start watchdog
|
||||
m_watchdog.Start();
|
||||
|
||||
|
||||
std::function<void()> wd_callback = std::bind(&FrameCache::flush_all, this);
|
||||
|
||||
m_watchdog = new Watchdog(500, wd_callback);
|
||||
m_watchdog->Start();
|
||||
|
||||
// Start drain worker
|
||||
m_drainer = std::thread(&FrameCache::drain_loop, this);
|
||||
};
|
||||
@@ -57,11 +61,11 @@ public:
|
||||
const uint64_t idx = pulseID % m_CAP;
|
||||
|
||||
// A new frame is starting
|
||||
if(inc_frame.meta.pulse_id != m_buffer[idx].meta.pulse_id){
|
||||
if(inc_frame.meta.pulse_id != m_buffer[idx].meta.id){
|
||||
// Unique lock to flush and start a new one
|
||||
std::unique_lock<std::shared_mutex> p_guard(m_lock[idx]);
|
||||
// Check if condition persists after getting the mutex
|
||||
if(inc_frame.meta.pulse_id != m_buffer[idx].meta.pulse_id){
|
||||
if(inc_frame.meta.pulse_id != m_buffer[idx].meta.id){
|
||||
start_line(idx, inc_frame.meta);
|
||||
}
|
||||
}
|
||||
@@ -73,6 +77,7 @@ public:
|
||||
char* ptr_dest = m_buffer[idx].data.data() + moduleIDX * m_blocksize;
|
||||
std::memcpy((void*)ptr_dest, (void*)&inc_frame.data, m_blocksize);
|
||||
m_fill[idx]++;
|
||||
m_watchdog->Kick();
|
||||
|
||||
// Queue for draining
|
||||
if(m_fill[idx]==m_MOD-1){
|
||||
@@ -92,10 +97,10 @@ protected:
|
||||
if(m_valid[idx]){ f_send(m_buffer[idx]); }
|
||||
|
||||
// 2. Init new frame
|
||||
m_buffer[idx].meta.pulse_id = inc_frame.pulse_id;
|
||||
m_buffer[idx].meta.frame_index = inc_frame.frame_index;
|
||||
m_buffer[idx].meta.daq_rec = inc_frame.daq_rec;
|
||||
m_buffer[idx].meta.is_good_image = true;
|
||||
m_buffer[idx].meta.id = inc_frame.pulse_id;
|
||||
m_buffer[idx].meta.user_1 = inc_frame.frame_index;
|
||||
m_buffer[idx].meta.user_2 = inc_frame.daq_rec;
|
||||
m_buffer[idx].meta.status = true;
|
||||
m_fill[idx] = 0;
|
||||
m_valid[idx] = 1;
|
||||
}
|
||||
@@ -148,9 +153,10 @@ protected:
|
||||
std::vector<ImageBinaryFormat> m_buffer;
|
||||
|
||||
/** Watchdog timer and flush queue **/
|
||||
Watchdog m_watchdog;
|
||||
std::thread m_drainer;
|
||||
Watchdog *m_watchdog;
|
||||
std::thread m_drainer;
|
||||
std::deque<uint32_t> drain_queue();
|
||||
|
||||
};
|
||||
|
||||
#endif // SF_DAQ_FRAME_CACHE_HPP
|
||||
|
||||
@@ -14,17 +14,17 @@
|
||||
**/
|
||||
class Watchdog{
|
||||
public:
|
||||
Watchdog(uint32_t timeout, std::function<void()> callback): m_timeout(timeout), m_callback(callback) {};
|
||||
Watchdog(int64_t timeout, std::function<void()> callback): m_timeout(timeout), m_callback(callback) {};
|
||||
~Watchdog() { Stop(); };
|
||||
void Start();
|
||||
void Stop();
|
||||
void Kick();
|
||||
|
||||
protected:
|
||||
uint32_t m_timeout;
|
||||
int64_t m_timeout;
|
||||
std::atomic<bool> m_running = false;
|
||||
std::function<void()> m_callback;
|
||||
std::chrono::time_point m_lastkick;
|
||||
std::chrono::time_point<std::chrono::steady_clock> m_lastkick;
|
||||
|
||||
std::thread m_thread;
|
||||
std::mutex m_mutex;
|
||||
@@ -55,8 +55,11 @@ void Watchdog::Kick(){
|
||||
}
|
||||
|
||||
void Watchdog::Loop(){
|
||||
std::cout << "Starting watchdog" << std::endl;
|
||||
while(m_running){
|
||||
if((std::chrono::now() - m_lastkick) < m_timeout){
|
||||
auto elapsed = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now() - m_lastkick);
|
||||
if(elapsed.count() < m_timeout){
|
||||
// std::cout << "Elapsed " << (int64_t)elapsed.count() << " of " << m_timeout << std::endl;
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(1));
|
||||
} else {
|
||||
std::cout << "Expired timer" << std::endl;
|
||||
|
||||
@@ -71,8 +71,8 @@ class ZmqImagePublisher: public ZmqPublisher {
|
||||
len = m_socket.send(image.data.data(), image.data.size(), 0);
|
||||
ASSERT_TRUE( len >=0, "Failed to send image data" )
|
||||
|
||||
if(image.meta.pulse_id%100==0){
|
||||
std::cout << "Sent ZMQ stream of pulse: " << image.meta.pulse_id << std::endl;
|
||||
if(image.meta.id%100==0){
|
||||
std::cout << "Sent ZMQ stream of pulse: " << image.meta.id << std::endl;
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
Reference in New Issue
Block a user