mirror of
https://github.com/paulscherrerinstitute/sf_daq_buffer.git
synced 2026-05-07 00:24:14 +02:00
Cleaning renaming
This commit is contained in:
@@ -12,6 +12,8 @@
|
||||
#include "../../core-buffer/include/formats.hpp"
|
||||
#include "Watchdog.hpp"
|
||||
|
||||
#define MAX_FIFO_LENGTH 32
|
||||
|
||||
|
||||
/** Frame Cache
|
||||
|
||||
@@ -39,9 +41,8 @@ public:
|
||||
// Initialize buffer metadata
|
||||
for(auto& it: m_buffer){ memset(&it.meta, 0, sizeof(it.meta)); }
|
||||
|
||||
|
||||
// Initialize the watchdog
|
||||
std::function<void()> wd_callback = std::bind(&FrameCache::flush_all, this);
|
||||
|
||||
m_watchdog = new Watchdog(500, wd_callback);
|
||||
m_watchdog->Start();
|
||||
|
||||
@@ -83,8 +84,9 @@ public:
|
||||
|
||||
// Queue for draining
|
||||
if(m_fill[idx]==m_MOD-1){
|
||||
// std::cout << "Complete frame at " << idx << "\t(queued for draining)\tqueue size: " << drain_queue.size() << std::endl;
|
||||
drain_queue.push_back(idx);
|
||||
if(m_fill.size() > MAX_FIFO_LENGTH) {
|
||||
m_drain_queue.push_back(idx);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -128,13 +130,16 @@ protected:
|
||||
}
|
||||
|
||||
/** Drain loop
|
||||
Flushes a valid cache line and invalidates the associated buffer.
|
||||
NOTE : It does not lock, that must be done externally! **/
|
||||
|
||||
Flushes queued frames from the cache buffer and invalidates line.
|
||||
It also locks the frame for the duration of flushing! **/
|
||||
void drain_loop(){
|
||||
while(true){
|
||||
if(!drain_queue.empty()){
|
||||
uint32_t idx = drain_queue.front();
|
||||
drain_queue.pop_front();
|
||||
if(!m_drain_queue.empty()){
|
||||
uint32_t idx = m_drain_queue.front();
|
||||
m_drain_queue.pop_front();
|
||||
// Lock and flush the frame
|
||||
std::unique_lock<std::shared_mutex> p_guard(m_lock[idx]);
|
||||
flush_line(idx);
|
||||
} else {
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(2));
|
||||
@@ -159,7 +164,7 @@ protected:
|
||||
/** Watchdog timer and flush queue **/
|
||||
Watchdog *m_watchdog;
|
||||
std::thread m_drainer;
|
||||
std::deque<uint32_t> drain_queue;
|
||||
std::deque<uint32_t> m_drain_queue;
|
||||
|
||||
};
|
||||
|
||||
@@ -14,12 +14,13 @@ int main (int argc, char *argv[]) {
|
||||
if (argc != 2) {
|
||||
std::cout << "\nUsage: jf_buffer_writer [detector_json_filename]\n";
|
||||
std::cout << "\tdetector_json_filename: detector config file path." << std::endl;
|
||||
std::cout << "\tZMQ publisher port: 5200 (high data rate)" << std::endl;
|
||||
exit(-1);
|
||||
}
|
||||
const auto config = BufferUtils::read_json_config(std::string(argv[1]));
|
||||
|
||||
|
||||
std::cout << "Creating ZMQ socket..." << std::endl;
|
||||
std::cout << "Creating ZMQ sockets..." << std::endl;
|
||||
ZmqImagePublisher pub("*", 5200, 2);
|
||||
// ... and extracting sender function
|
||||
std::function<void(ImageBinaryFormat&)> zmq_publish =
|
||||
|
||||
@@ -1,51 +0,0 @@
|
||||
//
|
||||
// Weather update client in C++
|
||||
// Connects SUB socket to tcp://localhost:5556
|
||||
// Collects weather updates and finds avg temp in zipcode
|
||||
//
|
||||
|
||||
#include <zmq.hpp>
|
||||
#include <iostream>
|
||||
#include <sstream>
|
||||
|
||||
int main (int argc, char *argv[]){
|
||||
if (argc != 2) {
|
||||
std::cout << "\nERROR\nUsage: jf_buffer_writer [zmq_port]\n";
|
||||
exit(-1);
|
||||
}
|
||||
|
||||
int zmq_port = atoi(argv[1]);
|
||||
std::string addr("tcp://localhost:" + std::to_string(zmq_port));
|
||||
|
||||
zmq::context_t context (1);
|
||||
|
||||
// Socket to talk to server
|
||||
std::cout << "Subscribing to server...\n" << std::endl;
|
||||
zmq::socket_t subscriber (context, ZMQ_SUB);
|
||||
subscriber.connect(addr.c_str());
|
||||
|
||||
// Subscribe to IMAGEDATA
|
||||
const char *filter = "IMAGEDATA";
|
||||
subscriber.setsockopt(ZMQ_SUBSCRIBE, filter, strlen (filter));
|
||||
|
||||
// Process 100 updates
|
||||
int num_img = 0;
|
||||
long total_temp = 0;
|
||||
zmq::message_t msg_topic;
|
||||
zmq::message_t msg_meta;
|
||||
zmq::message_t msg_image;
|
||||
std::cout << "I'm listening...\n" << std::endl;
|
||||
for (int idx = 0; idx < 100000; idx++) {
|
||||
subscriber.recv(&msg_topic);
|
||||
if(msg_topic.size()==strlen(filter)){
|
||||
subscriber.recv(&msg_meta);
|
||||
subscriber.recv(&msg_image);
|
||||
num_img++;
|
||||
}
|
||||
|
||||
if(idx%500==0){
|
||||
std::cout << "Received " << idx << " (at size " << msg_image.size() << " )\t Received: " << num_img << std::endl;
|
||||
}
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
@@ -0,0 +1,51 @@
|
||||
#include "gtest/gtest.h"
|
||||
#include "Watchdog.hpp"
|
||||
|
||||
#include <thread>
|
||||
#include <chrono>
|
||||
#include <future>
|
||||
|
||||
using namespace std;
|
||||
|
||||
|
||||
uint64_t tick_counter = 0;
|
||||
// Dummy callback to increase tick counter
|
||||
void mock_callback(){
|
||||
tick_counter++;
|
||||
};
|
||||
|
||||
|
||||
TEST(WatchdogTimer, timer_test){
|
||||
Watchdog wDog(100, &mock_callback);
|
||||
|
||||
// Free running
|
||||
wDog.Start();
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(50));
|
||||
ASSERT_EQ(tick_counter, 0);
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(500));
|
||||
ASSERT_EQ(tick_counter, 5);
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(500));
|
||||
ASSERT_EQ(tick_counter, 10);
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(500));
|
||||
ASSERT_EQ(tick_counter, 15);
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(500));
|
||||
ASSERT_EQ(tick_counter, 20);
|
||||
|
||||
// Test Stop()
|
||||
wDog.Stop();
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(500));
|
||||
ASSERT_EQ(tick_counter, 20);
|
||||
|
||||
// Test Kick()
|
||||
tick_counter = 0;
|
||||
wDog.Start();
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(250));
|
||||
ASSERT_EQ(tick_counter, 2);
|
||||
for(int ii=0; ii<20 ii++){
|
||||
wDog.Kick();
|
||||
ASSERT_EQ(tick_counter, 2);
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(50));
|
||||
}
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(150));
|
||||
ASSERT_EQ(tick_counter, 4);
|
||||
}
|
||||
@@ -1,51 +0,0 @@
|
||||
//
|
||||
// Weather update client in C++
|
||||
// Connects SUB socket to tcp://localhost:5556
|
||||
// Collects weather updates and finds avg temp in zipcode
|
||||
//
|
||||
|
||||
#include <zmq.hpp>
|
||||
#include <iostream>
|
||||
#include <sstream>
|
||||
|
||||
int main (int argc, char *argv[]){
|
||||
if (argc != 2) {
|
||||
std::cout << "\nERROR\nUsage: jf_buffer_writer [zmq_port]\n";
|
||||
exit(-1);
|
||||
}
|
||||
|
||||
int zmq_port = atoi(argv[1]);
|
||||
std::string addr("tcp://localhost:" + std::to_string(zmq_port));
|
||||
|
||||
zmq::context_t context (1);
|
||||
|
||||
// Socket to talk to server
|
||||
std::cout << "Subscribing to server...\n" << std::endl;
|
||||
zmq::socket_t subscriber (context, ZMQ_SUB);
|
||||
subscriber.connect(addr.c_str());
|
||||
|
||||
// Subscribe to IMAGEDATA
|
||||
const char *filter = "IMAGEDATA";
|
||||
subscriber.setsockopt(ZMQ_SUBSCRIBE, filter, strlen (filter));
|
||||
|
||||
// Process 100 updates
|
||||
int num_img = 0;
|
||||
long total_temp = 0;
|
||||
zmq::message_t msg_topic;
|
||||
zmq::message_t msg_meta;
|
||||
zmq::message_t msg_image;
|
||||
std::cout << "I'm listening...\n" << std::endl;
|
||||
for (int idx = 0; idx < 100000; idx++) {
|
||||
subscriber.recv(&msg_topic);
|
||||
if(msg_topic.size()==strlen(filter)){
|
||||
subscriber.recv(&msg_meta);
|
||||
subscriber.recv(&msg_image);
|
||||
num_img++;
|
||||
}
|
||||
|
||||
if(idx%500==0){
|
||||
std::cout << "Received " << idx << " (at size " << msg_image.size() << " )\t Received: " << num_img << std::endl;
|
||||
}
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
@@ -62,7 +62,6 @@ int main (int argc, char *argv[]){
|
||||
for (int idx = 0; idx < 100000; idx++) {
|
||||
// ZMQ guarantees full delivery of multipart massages!
|
||||
// Packets are sent as three part messages: topic + meta + data
|
||||
// Blocks until recv succesfull!
|
||||
subscriber.recv(&msg_topic, 0);
|
||||
subscriber.recv(&msg_meta, 0);
|
||||
subscriber.recv(&msg_data, 0);
|
||||
|
||||
Reference in New Issue
Block a user