Saturating testpc

This commit is contained in:
Mohacsi Istvan
2021-07-01 17:31:18 +02:00
parent 411f71fd6d
commit abcaa030ca
4 changed files with 79 additions and 30 deletions
+3 -1
View File
@@ -70,7 +70,9 @@ class ZmqImagePublisher: public ZmqPublisher {
len = m_socket.send(image.data.data(), image.data.size(), 0);
ASSERT_TRUE( len >=0, "Failed to send image data" )
std::cout << "Sent ZMQ stream of pulse: " << image.meta.pulse_id << std::endl;
if(image.meta.pulse_id%100==0){
std::cout << "Sent ZMQ stream of pulse: " << image.meta.pulse_id << std::endl;
}
}
};
+11 -23
View File
@@ -16,56 +16,44 @@ int main (int argc, char *argv[]) {
std::cout << "\tdetector_json_filename: detector config file path." << std::endl;
exit(-1);
}
const auto config = BufferUtils::read_json_config(std::string(argv[1]));
// // Module name
// char mn[128];
// snprintf(mn, 128, "M%02d", module_id);
// std::string moduleName(mn);
std::cout << "Creating ZMQ socket..." << std::endl;
// ZmqImagePublisher pub("129.129.144.76", 5158);
ZmqImagePublisher pub("*", 5158);
ZmqImagePublisher pub("*", 5200);
// ... and extracting sender function
std::function<void(ImageBinaryFormat&)> zmq_publish =
std::bind(&ZmqImagePublisher::sendImage, &pub, std::placeholders::_1);
std::cout << "Creating frame cache..." << std::endl;
FrameCache cache(128, 3, zmq_publish);
// ... and extracting push function
std::function<void(uint64_t, uint64_t, BufferBinaryFormat&)> push_cb =
std::bind(&FrameCache::emplace, &cache, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3);
std::cout << "Creating workers..." << std::endl;
std::vector<std::shared_ptr<JfjFrameWorker>> vWorkers;
for(int mm=0; mm<config.n_modules; mm++){
vWorkers.emplace_back( std::make_shared<JfjFrameWorker>(config.start_udp_port+mm, config.detector_name, mm, push_cb) );
// Module name (not really used...)
char m_name[128];
snprintf(m_name, 128, "M%02d", mm);
std::string moduleName(m_name);
vWorkers.emplace_back( std::make_shared<JfjFrameWorker>(config.start_udp_port+mm, moduleName, mm, push_cb) );
}
// JfjFrameWorker W0(5005, "JOCH3M", 0, push_cb);
// JfjFrameWorker W1(5006, "JOCH3M", 1, push_cb);
// JfjFrameWorker W2(5007, "JOCH3M", 2, push_cb);
std::cout << "Starting worker threads..." << std::endl;
std::vector<std::thread> vThreads;
for(int mm=0; mm<config.n_modules; mm++){
vThreads.push_back( std::thread(&JfjFrameWorker::run, vWorkers[mm].get()) );
}
// std::thread T0(&JfjFrameWorker::run, &W0);
// std::thread T1(&JfjFrameWorker::run, &W1);
// std::thread T2(&JfjFrameWorker::run, &W2);
for(auto& it: vThreads){
it.join();
}
//T0.join();
//T1.join();
//T2.join();
std::cout << "Exiting program..." << std::endl;
return 0;
}
+14 -6
View File
@@ -7,7 +7,14 @@
#include "../../core-buffer/include/jungfrau.hpp"
void MockDetector(uint16_t udp_port, int32_t moduleId, int32_t sleep_ms){
inline void busy_sleep(std::chrono::microseconds t) {
// auto end = std::chrono::steady_clock::now() + t - overhead;
auto end = std::chrono::steady_clock::now() + t;
while(std::chrono::steady_clock::now() < end);
}
void MockDetector(uint16_t udp_port, int32_t moduleId, int32_t sleep_us){
auto send_socket_fd = socket(AF_INET,SOCK_DGRAM,0);
if(send_socket_fd < 0){std::cout << "Failed to create socket" << std::endl; exit(-1); };
@@ -35,7 +42,8 @@ void MockDetector(uint16_t udp_port, int32_t moduleId, int32_t sleep_ms){
::sendto(send_socket_fd, &send_udp_buffer, sizeof(send_udp_buffer), 0, (sockaddr*) &server_address, sizeof(server_address));
}
std::this_thread::sleep_for(std::chrono::milliseconds(sleep_ms));
//std::this_thread::sleep_for(std::chrono::milliseconds(sleep_ms));
busy_sleep(std::chrono::microseconds(sleep_us));
if(ff%1000==0){
std::cout << "Sent " << ff << " frames" << std::endl;
}
@@ -47,21 +55,21 @@ void MockDetector(uint16_t udp_port, int32_t moduleId, int32_t sleep_ms){
int main (int argc, char *argv[]) {
if (argc != 4) {
std::cout << "\nERROR\nUsage: jf_buffer_writer [num_modules] [start_port] [sleep_ms]\n";
std::cout << "\nERROR\nUsage: jf_buffer_writer [num_modules] [start_port] [sleep_us]\n";
exit(-1);
}
int num_modules = atoi(argv[1]);
int start_port = atoi(argv[2]);
int sleep_ms = atoi(argv[3]);
sleep_ms = (sleep_ms>=1) ? sleep_ms : 1;
int sleep_us = atoi(argv[3]);
sleep_us = (sleep_us>=1) ? sleep_us : 1;
std::cout << "Starting worker threads..." << std::endl;
std::vector<std::thread> vThreads;
for(int mm=0; mm<num_modules; mm++){
vThreads.push_back( std::thread(&MockDetector, start_port+mm, mm, sleep_ms) );
vThreads.push_back( std::thread(&MockDetector, start_port+mm, mm, sleep_us) );
}
std::cout << "Threads are up and running..." << std::endl;
@@ -0,0 +1,51 @@
//
// Weather update client in C++
// Connects SUB socket to tcp://localhost:5556
// Collects weather updates and finds avg temp in zipcode
//
#include <zmq.hpp>
#include <iostream>
#include <sstream>
int main (int argc, char *argv[]){
if (argc != 2) {
std::cout << "\nERROR\nUsage: jf_buffer_writer [zmq_port]\n";
exit(-1);
}
int zmq_port = atoi(argv[1]);
std::string addr("tcp://localhost:" + std::to_string(zmq_port));
zmq::context_t context (1);
// Socket to talk to server
std::cout << "Subscribing to server...\n" << std::endl;
zmq::socket_t subscriber (context, ZMQ_SUB);
subscriber.connect(addr.c_str());
// Subscribe to IMAGEDATA
const char *filter = "IMAGEDATA";
subscriber.setsockopt(ZMQ_SUBSCRIBE, filter, strlen (filter));
// Process 100 updates
int num_img = 0;
long total_temp = 0;
zmq::message_t msg_topic;
zmq::message_t msg_meta;
zmq::message_t msg_image;
std::cout << "I'm listening...\n" << std::endl;
for (int idx = 0; idx < 100000; idx++) {
subscriber.recv(&msg_topic);
if(msg_topic.size()==strlen(filter)){
subscriber.recv(&msg_meta);
subscriber.recv(&msg_image);
num_img++;
}
if(idx%500==0){
std::cout << "Received " << idx << " (at size " << msg_image.size() << " )\t Received: " << num_img << std::endl;
}
}
return 0;
}