diff --git a/jfj-combined/include/JfjFrameCache.hpp b/jfj-combined/include/FrameCache.hpp similarity index 89% rename from jfj-combined/include/JfjFrameCache.hpp rename to jfj-combined/include/FrameCache.hpp index 609048d..8b7c426 100644 --- a/jfj-combined/include/JfjFrameCache.hpp +++ b/jfj-combined/include/FrameCache.hpp @@ -12,6 +12,8 @@ #include "../../core-buffer/include/formats.hpp" #include "Watchdog.hpp" +#define MAX_FIFO_LENGTH 32 + /** Frame Cache @@ -39,9 +41,8 @@ public: // Initialize buffer metadata for(auto& it: m_buffer){ memset(&it.meta, 0, sizeof(it.meta)); } - + // Initialize the watchdog std::function wd_callback = std::bind(&FrameCache::flush_all, this); - m_watchdog = new Watchdog(500, wd_callback); m_watchdog->Start(); @@ -83,8 +84,9 @@ public: // Queue for draining if(m_fill[idx]==m_MOD-1){ - // std::cout << "Complete frame at " << idx << "\t(queued for draining)\tqueue size: " << drain_queue.size() << std::endl; - drain_queue.push_back(idx); + if(m_fill.size() > MAX_FIFO_LENGTH) { + m_drain_queue.push_back(idx); + } } } @@ -128,13 +130,16 @@ protected: } /** Drain loop - Flushes a valid cache line and invalidates the associated buffer. - NOTE : It does not lock, that must be done externally! **/ + + Flushes queued frames from the cache buffer and invalidates line. + It also locks the frame for the duration of flushing! **/ void drain_loop(){ while(true){ - if(!drain_queue.empty()){ - uint32_t idx = drain_queue.front(); - drain_queue.pop_front(); + if(!m_drain_queue.empty()){ + uint32_t idx = m_drain_queue.front(); + m_drain_queue.pop_front(); + // Lock and flush the frame + std::unique_lock p_guard(m_lock[idx]); flush_line(idx); } else { std::this_thread::sleep_for(std::chrono::milliseconds(2)); @@ -159,7 +164,7 @@ protected: /** Watchdog timer and flush queue **/ Watchdog *m_watchdog; std::thread m_drainer; - std::deque drain_queue; + std::deque m_drain_queue; }; diff --git a/jfj-combined/include/JfjFrameStats.hpp b/jfj-combined/include/FrameStats.hpp similarity index 100% rename from jfj-combined/include/JfjFrameStats.hpp rename to jfj-combined/include/FrameStats.hpp diff --git a/jfj-combined/include/JfjFrameWorker.hpp b/jfj-combined/include/FrameWorker.hpp similarity index 100% rename from jfj-combined/include/JfjFrameWorker.hpp rename to jfj-combined/include/FrameWorker.hpp diff --git a/jfj-combined/src/main.cpp b/jfj-combined/src/main.cpp index 1006f1d..d8d17a1 100644 --- a/jfj-combined/src/main.cpp +++ b/jfj-combined/src/main.cpp @@ -14,12 +14,13 @@ int main (int argc, char *argv[]) { if (argc != 2) { std::cout << "\nUsage: jf_buffer_writer [detector_json_filename]\n"; std::cout << "\tdetector_json_filename: detector config file path." << std::endl; + std::cout << "\tZMQ publisher port: 5200 (high data rate)" << std::endl; exit(-1); } const auto config = BufferUtils::read_json_config(std::string(argv[1])); - std::cout << "Creating ZMQ socket..." << std::endl; + std::cout << "Creating ZMQ sockets..." << std::endl; ZmqImagePublisher pub("*", 5200, 2); // ... and extracting sender function std::function zmq_publish = diff --git a/jfj-combined/test/SimulatedZmqReceiver.cpp b/jfj-combined/test/SimulatedZmqReceiver.cpp deleted file mode 100644 index 499785d..0000000 --- a/jfj-combined/test/SimulatedZmqReceiver.cpp +++ /dev/null @@ -1,51 +0,0 @@ -// -// Weather update client in C++ -// Connects SUB socket to tcp://localhost:5556 -// Collects weather updates and finds avg temp in zipcode -// - -#include -#include -#include - -int main (int argc, char *argv[]){ - if (argc != 2) { - std::cout << "\nERROR\nUsage: jf_buffer_writer [zmq_port]\n"; - exit(-1); - } - - int zmq_port = atoi(argv[1]); - std::string addr("tcp://localhost:" + std::to_string(zmq_port)); - - zmq::context_t context (1); - - // Socket to talk to server - std::cout << "Subscribing to server...\n" << std::endl; - zmq::socket_t subscriber (context, ZMQ_SUB); - subscriber.connect(addr.c_str()); - - // Subscribe to IMAGEDATA - const char *filter = "IMAGEDATA"; - subscriber.setsockopt(ZMQ_SUBSCRIBE, filter, strlen (filter)); - - // Process 100 updates - int num_img = 0; - long total_temp = 0; - zmq::message_t msg_topic; - zmq::message_t msg_meta; - zmq::message_t msg_image; - std::cout << "I'm listening...\n" << std::endl; - for (int idx = 0; idx < 100000; idx++) { - subscriber.recv(&msg_topic); - if(msg_topic.size()==strlen(filter)){ - subscriber.recv(&msg_meta); - subscriber.recv(&msg_image); - num_img++; - } - - if(idx%500==0){ - std::cout << "Received " << idx << " (at size " << msg_image.size() << " )\t Received: " << num_img << std::endl; - } - } - return 0; -} diff --git a/jfj-combined/test/SimulatedDetector.cpp b/jfj-combined/test/mock/SimulatedDetector.cpp similarity index 100% rename from jfj-combined/test/SimulatedDetector.cpp rename to jfj-combined/test/mock/SimulatedDetector.cpp diff --git a/jfj-combined/test/dummy_detector.json b/jfj-combined/test/mock/dummy_detector.json similarity index 100% rename from jfj-combined/test/dummy_detector.json rename to jfj-combined/test/mock/dummy_detector.json diff --git a/jfj-combined/test/test_Watchdog.cpp b/jfj-combined/test/test_Watchdog.cpp new file mode 100644 index 0000000..df18012 --- /dev/null +++ b/jfj-combined/test/test_Watchdog.cpp @@ -0,0 +1,51 @@ +#include "gtest/gtest.h" +#include "Watchdog.hpp" + +#include +#include +#include + +using namespace std; + + +uint64_t tick_counter = 0; +// Dummy callback to increase tick counter +void mock_callback(){ + tick_counter++; +}; + + +TEST(WatchdogTimer, timer_test){ + Watchdog wDog(100, &mock_callback); + + // Free running + wDog.Start(); + std::this_thread::sleep_for(std::chrono::milliseconds(50)); + ASSERT_EQ(tick_counter, 0); + std::this_thread::sleep_for(std::chrono::milliseconds(500)); + ASSERT_EQ(tick_counter, 5); + std::this_thread::sleep_for(std::chrono::milliseconds(500)); + ASSERT_EQ(tick_counter, 10); + std::this_thread::sleep_for(std::chrono::milliseconds(500)); + ASSERT_EQ(tick_counter, 15); + std::this_thread::sleep_for(std::chrono::milliseconds(500)); + ASSERT_EQ(tick_counter, 20); + + // Test Stop() + wDog.Stop(); + std::this_thread::sleep_for(std::chrono::milliseconds(500)); + ASSERT_EQ(tick_counter, 20); + + // Test Kick() + tick_counter = 0; + wDog.Start(); + std::this_thread::sleep_for(std::chrono::milliseconds(250)); + ASSERT_EQ(tick_counter, 2); + for(int ii=0; ii<20 ii++){ + wDog.Kick(); + ASSERT_EQ(tick_counter, 2); + std::this_thread::sleep_for(std::chrono::milliseconds(50)); + } + std::this_thread::sleep_for(std::chrono::milliseconds(150)); + ASSERT_EQ(tick_counter, 4); +} diff --git a/zmq-receiver/src/SimulatedZmqReceiver.cpp b/zmq-receiver/src/SimulatedZmqReceiver.cpp deleted file mode 100644 index 499785d..0000000 --- a/zmq-receiver/src/SimulatedZmqReceiver.cpp +++ /dev/null @@ -1,51 +0,0 @@ -// -// Weather update client in C++ -// Connects SUB socket to tcp://localhost:5556 -// Collects weather updates and finds avg temp in zipcode -// - -#include -#include -#include - -int main (int argc, char *argv[]){ - if (argc != 2) { - std::cout << "\nERROR\nUsage: jf_buffer_writer [zmq_port]\n"; - exit(-1); - } - - int zmq_port = atoi(argv[1]); - std::string addr("tcp://localhost:" + std::to_string(zmq_port)); - - zmq::context_t context (1); - - // Socket to talk to server - std::cout << "Subscribing to server...\n" << std::endl; - zmq::socket_t subscriber (context, ZMQ_SUB); - subscriber.connect(addr.c_str()); - - // Subscribe to IMAGEDATA - const char *filter = "IMAGEDATA"; - subscriber.setsockopt(ZMQ_SUBSCRIBE, filter, strlen (filter)); - - // Process 100 updates - int num_img = 0; - long total_temp = 0; - zmq::message_t msg_topic; - zmq::message_t msg_meta; - zmq::message_t msg_image; - std::cout << "I'm listening...\n" << std::endl; - for (int idx = 0; idx < 100000; idx++) { - subscriber.recv(&msg_topic); - if(msg_topic.size()==strlen(filter)){ - subscriber.recv(&msg_meta); - subscriber.recv(&msg_image); - num_img++; - } - - if(idx%500==0){ - std::cout << "Received " << idx << " (at size " << msg_image.size() << " )\t Received: " << num_img << std::endl; - } - } - return 0; -} diff --git a/zmq-receiver/src/main.cpp b/zmq-receiver/src/main.cpp index 83c5dc9..5c6470c 100644 --- a/zmq-receiver/src/main.cpp +++ b/zmq-receiver/src/main.cpp @@ -62,7 +62,6 @@ int main (int argc, char *argv[]){ for (int idx = 0; idx < 100000; idx++) { // ZMQ guarantees full delivery of multipart massages! // Packets are sent as three part messages: topic + meta + data - // Blocks until recv succesfull! subscriber.recv(&msg_topic, 0); subscriber.recv(&msg_meta, 0); subscriber.recv(&msg_data, 0);