From abcaa030ca0c7c82566331116ca795362a7d651a Mon Sep 17 00:00:00 2001 From: Mohacsi Istvan Date: Thu, 1 Jul 2021 17:31:18 +0200 Subject: [PATCH] Saturating testpc --- jfj-combined/include/ZmqImagePublisher.hpp | 4 +- jfj-combined/src/main.cpp | 34 +++++---------- jfj-combined/test/SimulatedDetector.cpp | 20 ++++++--- jfj-combined/test/SimulatedZmqReceiver.cpp | 51 ++++++++++++++++++++++ 4 files changed, 79 insertions(+), 30 deletions(-) create mode 100644 jfj-combined/test/SimulatedZmqReceiver.cpp diff --git a/jfj-combined/include/ZmqImagePublisher.hpp b/jfj-combined/include/ZmqImagePublisher.hpp index b105c78..a53751f 100644 --- a/jfj-combined/include/ZmqImagePublisher.hpp +++ b/jfj-combined/include/ZmqImagePublisher.hpp @@ -70,7 +70,9 @@ class ZmqImagePublisher: public ZmqPublisher { len = m_socket.send(image.data.data(), image.data.size(), 0); ASSERT_TRUE( len >=0, "Failed to send image data" ) - std::cout << "Sent ZMQ stream of pulse: " << image.meta.pulse_id << std::endl; + if(image.meta.pulse_id%100==0){ + std::cout << "Sent ZMQ stream of pulse: " << image.meta.pulse_id << std::endl; + } } }; diff --git a/jfj-combined/src/main.cpp b/jfj-combined/src/main.cpp index ee43a2a..df872d4 100644 --- a/jfj-combined/src/main.cpp +++ b/jfj-combined/src/main.cpp @@ -16,56 +16,44 @@ int main (int argc, char *argv[]) { std::cout << "\tdetector_json_filename: detector config file path." << std::endl; exit(-1); } - const auto config = BufferUtils::read_json_config(std::string(argv[1])); -// // Module name -// char mn[128]; -// snprintf(mn, 128, "M%02d", module_id); -// std::string moduleName(mn); - - std::cout << "Creating ZMQ socket..." << std::endl; -// ZmqImagePublisher pub("129.129.144.76", 5158); - ZmqImagePublisher pub("*", 5158); + ZmqImagePublisher pub("*", 5200); + // ... and extracting sender function std::function zmq_publish = std::bind(&ZmqImagePublisher::sendImage, &pub, std::placeholders::_1); + std::cout << "Creating frame cache..." << std::endl; FrameCache cache(128, 3, zmq_publish); - + // ... and extracting push function std::function push_cb = std::bind(&FrameCache::emplace, &cache, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3); + std::cout << "Creating workers..." << std::endl; std::vector> vWorkers; - for(int mm=0; mm(config.start_udp_port+mm, config.detector_name, mm, push_cb) ); + // Module name (not really used...) + char m_name[128]; + snprintf(m_name, 128, "M%02d", mm); + std::string moduleName(m_name); + vWorkers.emplace_back( std::make_shared(config.start_udp_port+mm, moduleName, mm, push_cb) ); } - // JfjFrameWorker W0(5005, "JOCH3M", 0, push_cb); - // JfjFrameWorker W1(5006, "JOCH3M", 1, push_cb); - // JfjFrameWorker W2(5007, "JOCH3M", 2, push_cb); + std::cout << "Starting worker threads..." << std::endl; std::vector vThreads; - for(int mm=0; mm=1) ? sleep_ms : 1; + int sleep_us = atoi(argv[3]); + sleep_us = (sleep_us>=1) ? sleep_us : 1; std::cout << "Starting worker threads..." << std::endl; std::vector vThreads; for(int mm=0; mm +#include +#include + +int main (int argc, char *argv[]){ + if (argc != 2) { + std::cout << "\nERROR\nUsage: jf_buffer_writer [zmq_port]\n"; + exit(-1); + } + + int zmq_port = atoi(argv[1]); + std::string addr("tcp://localhost:" + std::to_string(zmq_port)); + + zmq::context_t context (1); + + // Socket to talk to server + std::cout << "Subscribing to server...\n" << std::endl; + zmq::socket_t subscriber (context, ZMQ_SUB); + subscriber.connect(addr.c_str()); + + // Subscribe to IMAGEDATA + const char *filter = "IMAGEDATA"; + subscriber.setsockopt(ZMQ_SUBSCRIBE, filter, strlen (filter)); + + // Process 100 updates + int num_img = 0; + long total_temp = 0; + zmq::message_t msg_topic; + zmq::message_t msg_meta; + zmq::message_t msg_image; + std::cout << "I'm listening...\n" << std::endl; + for (int idx = 0; idx < 100000; idx++) { + subscriber.recv(&msg_topic); + if(msg_topic.size()==strlen(filter)){ + subscriber.recv(&msg_meta); + subscriber.recv(&msg_image); + num_img++; + } + + if(idx%500==0){ + std::cout << "Received " << idx << " (at size " << msg_image.size() << " )\t Received: " << num_img << std::endl; + } + } + return 0; +}