From 4fb74ab1b813b81477e70e7ed0d6533d76b7a91f Mon Sep 17 00:00:00 2001 From: mohacsi_i Date: Mon, 4 Oct 2021 18:12:09 +0200 Subject: [PATCH] Small changes for speedup --- CMakeLists.txt | 3 +- jf-zmqstreamer/CMakeLists.txt | 3 +- sim-jungfrau/CMakeLists.txt | 12 ++++ sim-jungfrau/src/main.cpp | 84 +++++++++++++++++++++++++++ sim-jungfrau/test/CMakeLists.txt | 0 sim-jungfrau/test/dummy_detector.json | 13 +++++ zmq-receiver/include/BufferTypes.hpp | 53 +++++++++++++---- zmq-receiver/src/main.cpp | 15 ++++- 8 files changed, 166 insertions(+), 17 deletions(-) create mode 100644 sim-jungfrau/CMakeLists.txt create mode 100644 sim-jungfrau/src/main.cpp create mode 100644 sim-jungfrau/test/CMakeLists.txt create mode 100644 sim-jungfrau/test/dummy_detector.json diff --git a/CMakeLists.txt b/CMakeLists.txt index 03e67fe..ada48b8 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -34,10 +34,11 @@ add_subdirectory("core-buffer") add_subdirectory("jf-udp-recv") add_subdirectory("jf-buffer-writer") add_subdirectory("jf-assembler") -add_subdirectory("jf-zmqstreamer") add_subdirectory("sf-stream") add_subdirectory("sf-writer") +add_subdirectory("jf-zmqstreamer") add_subdirectory("zmq-receiver") +add_subdirectory("sim-jungfrau") if(BUILD_JF_LIVE_WRITER) diff --git a/jf-zmqstreamer/CMakeLists.txt b/jf-zmqstreamer/CMakeLists.txt index c92a33e..a285a40 100644 --- a/jf-zmqstreamer/CMakeLists.txt +++ b/jf-zmqstreamer/CMakeLists.txt @@ -1,12 +1,11 @@ file(GLOB SOURCES src/*.cpp) -set(CMAKE_CXX_STANDARD 20) add_library(jf-zmqstreamer-lib STATIC ${SOURCES}) target_include_directories(jf-zmqstreamer-lib PUBLIC include/) target_link_libraries(jf-zmqstreamer-lib external core-buffer-lib) add_executable(jf-zmqstreamer src/main.cpp) -set_target_properties(jf-zmqstreamer PROPERTIES OUTPUT_NAME jfj_combined) +set_target_properties(jf-zmqstreamer PROPERTIES OUTPUT_NAME jf_zmqstreamer) target_link_libraries(jf-zmqstreamer jf-zmqstreamer-lib zmq rt pthread) enable_testing() diff --git a/sim-jungfrau/CMakeLists.txt b/sim-jungfrau/CMakeLists.txt new file mode 100644 index 0000000..02e32bc --- /dev/null +++ b/sim-jungfrau/CMakeLists.txt @@ -0,0 +1,12 @@ +file(GLOB SOURCES src/*.cpp) + +# add_library(sim-jungfrau-lib STATIC ${SOURCES}) +# target_include_directories(sim-jungfrau-lib PUBLIC include/) +# target_link_libraries(sim-jungfrau-lib external core-buffer-lib) + +add_executable(sim-jungfrau src/main.cpp) +set_target_properties(sim-jungfrau PROPERTIES OUTPUT_NAME sim_jungfrau) +target_link_libraries(sim-jungfrau zmq rt pthread) + +enable_testing() +add_subdirectory(test/) diff --git a/sim-jungfrau/src/main.cpp b/sim-jungfrau/src/main.cpp new file mode 100644 index 0000000..a840e79 --- /dev/null +++ b/sim-jungfrau/src/main.cpp @@ -0,0 +1,84 @@ +#include +#include +#include +#include +#include +#include +#include "../../core-buffer/include/jungfrau.hpp" + + +inline void busy_sleep(std::chrono::microseconds t) { + // auto end = std::chrono::steady_clock::now() + t - overhead; + auto end = std::chrono::steady_clock::now() + t; + while(std::chrono::steady_clock::now() < end); +} + + +void MockDetector(uint16_t udp_port, int32_t moduleId, int32_t sleep_us){ + auto send_socket_fd = socket(AF_INET,SOCK_DGRAM,0); + if(send_socket_fd < 0){std::cout << "Failed to create socket" << std::endl; exit(-1); }; + + struct sockaddr_in server_address, client_address; + memset(&server_address, 0, sizeof(server_address)); + memset(&client_address, 0, sizeof(client_address)); + + // Filling server information + server_address.sin_family = AF_INET; // IPv4 + server_address.sin_addr.s_addr = INADDR_ANY; + server_address.sin_port = htons(udp_port); + + // Send loop + jungfrau_packet send_udp_buffer; + memset(&send_udp_buffer, 0, sizeof(send_udp_buffer)); + + for(int64_t ff=0; ff<10000; ff++){ + send_udp_buffer.framenum = ff; + send_udp_buffer.bunchid = ff; + send_udp_buffer.moduleID = moduleId; + send_udp_buffer.debug = 0; + + for(int64_t pp=0; pp<128; pp++){ + send_udp_buffer.packetnum = pp; + ::sendto(send_socket_fd, &send_udp_buffer, sizeof(send_udp_buffer), 0, (sockaddr*) &server_address, sizeof(server_address)); + } + + //std::this_thread::sleep_for(std::chrono::milliseconds(sleep_ms)); + busy_sleep(std::chrono::microseconds(sleep_us)); + if(ff%1000==0){ + std::cout << "Sent " << ff << " frames" << std::endl; + } + } + + //close(send_socket_fd); +} + + +int main (int argc, char *argv[]) { + if (argc != 4) { + std::cout << "\nStart a UDP stream from a simulated Jungfrau detector\n"; + std::cout << "Usage:\tsim_jungfrau [num_modules] [start_port] [sleep_us]\n"; + std::cout << std::endl; + exit(-1); + } + + int num_modules = atoi(argv[1]); + int start_port = atoi(argv[2]); + int sleep_us = atoi(argv[3]); + sleep_us = (sleep_us>=1) ? sleep_us : 1; + + + std::cout << "Starting worker threads..." << std::endl; + std::vector vThreads; + + for(int mm=0; mm #include #include "rapidjson/document.h" @@ -48,6 +49,7 @@ public: hsh.set("status", std::vector(bs) ); hsh.set("user_1", std::vector(bs) ); hsh.set("user_2", std::vector(bs) ); + hsh.set("shape", std::vector>(bs) ); }; bool is_full(){ @@ -55,15 +57,20 @@ public: }; void append(void* meta, size_t meta_size, void* data, size_t data_size){ - + std::chrono::time_point t1, t2; + std::chrono::duration ms_double; + + t1 = std::chrono::high_resolution_clock::now(); + std::string jason_string((char*)meta, meta_size); - std::cout << jason_string << std::endl; - rapidjson::Document jason_parsed; - jason_parsed.Parse(jason_string.c_str()); + if(jason_parsed["id"].GetInt() % 20 ==0){ std::cout << jason_string << std::endl; } + + t2 = std::chrono::high_resolution_clock::now(); + ms_double = t2 - t1; + std::cout << " JSON parsing took: " << ms_double.count() << " ms" << std::endl; - //std::cout << "NI" << std::endl; // Enforce flushing when full if(is_full()){ write_to_disk(); } @@ -77,6 +84,9 @@ public: //} // Update the hash + t1 = std::chrono::high_resolution_clock::now(); + + hsh.get&>("version")[write_idx] = jason_parsed["version"].GetInt(); hsh.get&>("id")[write_idx] = jason_parsed["id"].GetInt(); hsh.get&>("height")[write_idx] = jason_parsed["height"].GetInt(); @@ -87,12 +97,29 @@ public: hsh.get&>("status")[write_idx] = jason_parsed["status"].GetInt(); hsh.get&>("user_1")[write_idx] = jason_parsed["user_1"].GetInt(); hsh.get&>("user_2")[write_idx] = jason_parsed["user_2"].GetInt(); + - std::cout << "hashed" << std::endl; + + std::vector shape; + const auto& s = jason_parsed["shape"]; + for(auto& it : s.GetArray()){ shape.push_back(it.GetInt()); } + hsh.get>&>("shape")[write_idx] = shape; - // Hard coded type for now - auto data_buf = hsh.get&>("data"); - std::memcpy(&data_buf[write_idx*m_block_size], data, std::min(data_size, m_block_size)); + t2 = std::chrono::high_resolution_clock::now(); + ms_double = t2 - t1; + std::cout << " Hash update took: " << ms_double.count() << " ms" << std::endl; + + // Hard coded type for now + // NOTE: There's a massive performance bottleneck here if this is pre fetched! + //std::vector& data_buf = hsh.get&>("data"); + + t1 = std::chrono::high_resolution_clock::now(); + + std::memcpy(&hsh.get&>("data")[write_idx*m_block_size], data, std::min(data_size, m_block_size)); + + t2 = std::chrono::high_resolution_clock::now(); + ms_double = t2 - t1; + std::cout << " Data copy took: " << ms_double.count() << " ms (size: " << std::min(data_size, m_block_size) << " )" << std::endl; // Pop index write_idx++; @@ -132,9 +159,11 @@ public: writer.writeVector(hsh.get&>("user_2"), "/data/" + detector_name + "/user_2"); std::cout << "Writing data: " << hsh.get&>("data").size() << std::endl; - writer.writeArray(hsh.get&>("data"), {m_buffer_size, hsh.get&>("height")[0], hsh.get&>("height")[0], 1}, "/data/" + detector_name + "/data"); - - + std::vector array_shape = {m_buffer_size}; + auto slice_shape =hsh.get>&>("shape")[0]; + array_shape.insert(array_shape.end(), slice_shape.begin(), slice_shape.end()); + writer.writeArray(hsh.get&>("data"), array_shape, "/data/" + detector_name + "/data"); + run_id++; }; diff --git a/zmq-receiver/src/main.cpp b/zmq-receiver/src/main.cpp index 487d826..a829582 100644 --- a/zmq-receiver/src/main.cpp +++ b/zmq-receiver/src/main.cpp @@ -1,10 +1,15 @@ #include #include #include +#include #include "../../core-buffer/include/buffer_config.hpp" #include "../include/BufferTypes.hpp" + + + + int main (int argc, char *argv[]){ if (argc != 4) { std::cout << "\nERROR\nUsage: jf_buffer_writer [zmq_topic] [zmq_sub_addr] [detector_name]\n"; @@ -29,7 +34,8 @@ int main (int argc, char *argv[]){ // Subscribe to TOPIC (expected schema) zmq::socket_t subscriber (context, ZMQ_SUB); subscriber.connect(sub_addr.c_str()); - subscriber.setsockopt(ZMQ_SUBSCRIBE, topic.c_str(), topic.size()); + //subscriber.setsockopt(ZMQ_SUBSCRIBE, topic.c_str(), topic.size()); + subscriber.setsockopt(ZMQ_SUBSCRIBE, "", 0); // Publisher to ipc std::cout << "Crating publisher...\n" << std::endl; @@ -65,13 +71,18 @@ int main (int argc, char *argv[]){ for (int idx = 0; idx < 100000; idx++) { // ZMQ guarantees full delivery of multipart massages! // Packets are sent as three part messages: topic + meta + data - subscriber.recv(&msg_topic, 0); + //subscriber.recv(&msg_topic, 0); subscriber.recv(&msg_meta, 0); subscriber.recv(&msg_data, 0); // Schema (topic) specific saving) if(topic=="IMAGEDATA"){ + auto t1 = std::chrono::high_resolution_clock::now(); cache.append((void*)msg_meta.data(), msg_meta.size(), (void*)msg_data.data(), msg_data.size()); + auto t2 = std::chrono::high_resolution_clock::now(); + std::chrono::duration ms_double = t2 - t1; + + std::cout << "Append took: " << ms_double.count() << " ms" << std::endl; //buffer.write_image((ImageMetadata*)msg_meta.data(), (char*)msg_data.data); if(idx%100==0){ std::cout << "Received " << idx << " (at size " << msg_data.size() << " )" << std::endl;