mirror of
https://github.com/paulscherrerinstitute/sf_daq_buffer.git
synced 2026-05-07 14:34:12 +02:00
Small changes for speedup
This commit is contained in:
+2
-1
@@ -34,10 +34,11 @@ add_subdirectory("core-buffer")
|
||||
add_subdirectory("jf-udp-recv")
|
||||
add_subdirectory("jf-buffer-writer")
|
||||
add_subdirectory("jf-assembler")
|
||||
add_subdirectory("jf-zmqstreamer")
|
||||
add_subdirectory("sf-stream")
|
||||
add_subdirectory("sf-writer")
|
||||
add_subdirectory("jf-zmqstreamer")
|
||||
add_subdirectory("zmq-receiver")
|
||||
add_subdirectory("sim-jungfrau")
|
||||
|
||||
|
||||
if(BUILD_JF_LIVE_WRITER)
|
||||
|
||||
@@ -1,12 +1,11 @@
|
||||
file(GLOB SOURCES src/*.cpp)
|
||||
|
||||
set(CMAKE_CXX_STANDARD 20)
|
||||
add_library(jf-zmqstreamer-lib STATIC ${SOURCES})
|
||||
target_include_directories(jf-zmqstreamer-lib PUBLIC include/)
|
||||
target_link_libraries(jf-zmqstreamer-lib external core-buffer-lib)
|
||||
|
||||
add_executable(jf-zmqstreamer src/main.cpp)
|
||||
set_target_properties(jf-zmqstreamer PROPERTIES OUTPUT_NAME jfj_combined)
|
||||
set_target_properties(jf-zmqstreamer PROPERTIES OUTPUT_NAME jf_zmqstreamer)
|
||||
target_link_libraries(jf-zmqstreamer jf-zmqstreamer-lib zmq rt pthread)
|
||||
|
||||
enable_testing()
|
||||
|
||||
@@ -0,0 +1,12 @@
|
||||
file(GLOB SOURCES src/*.cpp)
|
||||
|
||||
# add_library(sim-jungfrau-lib STATIC ${SOURCES})
|
||||
# target_include_directories(sim-jungfrau-lib PUBLIC include/)
|
||||
# target_link_libraries(sim-jungfrau-lib external core-buffer-lib)
|
||||
|
||||
add_executable(sim-jungfrau src/main.cpp)
|
||||
set_target_properties(sim-jungfrau PROPERTIES OUTPUT_NAME sim_jungfrau)
|
||||
target_link_libraries(sim-jungfrau zmq rt pthread)
|
||||
|
||||
enable_testing()
|
||||
add_subdirectory(test/)
|
||||
@@ -0,0 +1,84 @@
|
||||
#include <thread>
|
||||
#include <chrono>
|
||||
#include <cstring>
|
||||
#include <vector>
|
||||
#include <iostream>
|
||||
#include <netinet/in.h>
|
||||
#include "../../core-buffer/include/jungfrau.hpp"
|
||||
|
||||
|
||||
inline void busy_sleep(std::chrono::microseconds t) {
|
||||
// auto end = std::chrono::steady_clock::now() + t - overhead;
|
||||
auto end = std::chrono::steady_clock::now() + t;
|
||||
while(std::chrono::steady_clock::now() < end);
|
||||
}
|
||||
|
||||
|
||||
void MockDetector(uint16_t udp_port, int32_t moduleId, int32_t sleep_us){
|
||||
auto send_socket_fd = socket(AF_INET,SOCK_DGRAM,0);
|
||||
if(send_socket_fd < 0){std::cout << "Failed to create socket" << std::endl; exit(-1); };
|
||||
|
||||
struct sockaddr_in server_address, client_address;
|
||||
memset(&server_address, 0, sizeof(server_address));
|
||||
memset(&client_address, 0, sizeof(client_address));
|
||||
|
||||
// Filling server information
|
||||
server_address.sin_family = AF_INET; // IPv4
|
||||
server_address.sin_addr.s_addr = INADDR_ANY;
|
||||
server_address.sin_port = htons(udp_port);
|
||||
|
||||
// Send loop
|
||||
jungfrau_packet send_udp_buffer;
|
||||
memset(&send_udp_buffer, 0, sizeof(send_udp_buffer));
|
||||
|
||||
for(int64_t ff=0; ff<10000; ff++){
|
||||
send_udp_buffer.framenum = ff;
|
||||
send_udp_buffer.bunchid = ff;
|
||||
send_udp_buffer.moduleID = moduleId;
|
||||
send_udp_buffer.debug = 0;
|
||||
|
||||
for(int64_t pp=0; pp<128; pp++){
|
||||
send_udp_buffer.packetnum = pp;
|
||||
::sendto(send_socket_fd, &send_udp_buffer, sizeof(send_udp_buffer), 0, (sockaddr*) &server_address, sizeof(server_address));
|
||||
}
|
||||
|
||||
//std::this_thread::sleep_for(std::chrono::milliseconds(sleep_ms));
|
||||
busy_sleep(std::chrono::microseconds(sleep_us));
|
||||
if(ff%1000==0){
|
||||
std::cout << "Sent " << ff << " frames" << std::endl;
|
||||
}
|
||||
}
|
||||
|
||||
//close(send_socket_fd);
|
||||
}
|
||||
|
||||
|
||||
int main (int argc, char *argv[]) {
|
||||
if (argc != 4) {
|
||||
std::cout << "\nStart a UDP stream from a simulated Jungfrau detector\n";
|
||||
std::cout << "Usage:\tsim_jungfrau [num_modules] [start_port] [sleep_us]\n";
|
||||
std::cout << std::endl;
|
||||
exit(-1);
|
||||
}
|
||||
|
||||
int num_modules = atoi(argv[1]);
|
||||
int start_port = atoi(argv[2]);
|
||||
int sleep_us = atoi(argv[3]);
|
||||
sleep_us = (sleep_us>=1) ? sleep_us : 1;
|
||||
|
||||
|
||||
std::cout << "Starting worker threads..." << std::endl;
|
||||
std::vector<std::thread> vThreads;
|
||||
|
||||
for(int mm=0; mm<num_modules; mm++){
|
||||
vThreads.push_back( std::thread(&MockDetector, start_port+mm, mm, sleep_us) );
|
||||
}
|
||||
std::cout << "Threads are up and running..." << std::endl;
|
||||
|
||||
|
||||
for(auto& it: vThreads){
|
||||
it.join();
|
||||
}
|
||||
std::cout << "Exiting program..." << std::endl;
|
||||
return 0;
|
||||
}
|
||||
@@ -0,0 +1,13 @@
|
||||
{
|
||||
"streamvis_stream": "127.0.0.1",
|
||||
"streamvis_rate": 2,
|
||||
"live_stream": "127.0.0.1",
|
||||
"live_rate": 2,
|
||||
"live_rate": 2,
|
||||
"pedestal_file": "/dev/null",
|
||||
"gain_file": "/dev/null",
|
||||
"detector_name": "JF",
|
||||
"n_modules": 3,
|
||||
"start_udp_port": 5100,
|
||||
"buffer_folder": "."
|
||||
}
|
||||
@@ -1,6 +1,7 @@
|
||||
#ifndef SF_DAQ_BUFFER_TYPES_HPP
|
||||
#define SF_DAQ_BUFFER_TYPES_HPP
|
||||
|
||||
#include <chrono>
|
||||
#include <cstring>
|
||||
#include "rapidjson/document.h"
|
||||
|
||||
@@ -48,6 +49,7 @@ public:
|
||||
hsh.set("status", std::vector<uint16_t>(bs) );
|
||||
hsh.set("user_1", std::vector<uint64_t>(bs) );
|
||||
hsh.set("user_2", std::vector<uint64_t>(bs) );
|
||||
hsh.set("shape", std::vector<std::vector<uint64_t>>(bs) );
|
||||
};
|
||||
|
||||
bool is_full(){
|
||||
@@ -55,15 +57,20 @@ public:
|
||||
};
|
||||
|
||||
void append(void* meta, size_t meta_size, void* data, size_t data_size){
|
||||
|
||||
std::chrono::time_point<std::chrono::high_resolution_clock> t1, t2;
|
||||
std::chrono::duration<double, std::milli> ms_double;
|
||||
|
||||
t1 = std::chrono::high_resolution_clock::now();
|
||||
|
||||
std::string jason_string((char*)meta, meta_size);
|
||||
std::cout << jason_string << std::endl;
|
||||
|
||||
rapidjson::Document jason_parsed;
|
||||
|
||||
jason_parsed.Parse(jason_string.c_str());
|
||||
if(jason_parsed["id"].GetInt() % 20 ==0){ std::cout << jason_string << std::endl; }
|
||||
|
||||
t2 = std::chrono::high_resolution_clock::now();
|
||||
ms_double = t2 - t1;
|
||||
std::cout << " JSON parsing took: " << ms_double.count() << " ms" << std::endl;
|
||||
|
||||
//std::cout << "NI" << std::endl;
|
||||
|
||||
// Enforce flushing when full
|
||||
if(is_full()){ write_to_disk(); }
|
||||
@@ -77,6 +84,9 @@ public:
|
||||
//}
|
||||
|
||||
// Update the hash
|
||||
t1 = std::chrono::high_resolution_clock::now();
|
||||
|
||||
|
||||
hsh.get<std::vector<uint64_t>&>("version")[write_idx] = jason_parsed["version"].GetInt();
|
||||
hsh.get<std::vector<uint64_t>&>("id")[write_idx] = jason_parsed["id"].GetInt();
|
||||
hsh.get<std::vector<uint64_t>&>("height")[write_idx] = jason_parsed["height"].GetInt();
|
||||
@@ -87,12 +97,29 @@ public:
|
||||
hsh.get<std::vector<uint16_t>&>("status")[write_idx] = jason_parsed["status"].GetInt();
|
||||
hsh.get<std::vector<uint64_t>&>("user_1")[write_idx] = jason_parsed["user_1"].GetInt();
|
||||
hsh.get<std::vector<uint64_t>&>("user_2")[write_idx] = jason_parsed["user_2"].GetInt();
|
||||
|
||||
|
||||
std::cout << "hashed" << std::endl;
|
||||
|
||||
std::vector<uint64_t> shape;
|
||||
const auto& s = jason_parsed["shape"];
|
||||
for(auto& it : s.GetArray()){ shape.push_back(it.GetInt()); }
|
||||
hsh.get<std::vector<std::vector<uint64_t>>&>("shape")[write_idx] = shape;
|
||||
|
||||
// Hard coded type for now
|
||||
auto data_buf = hsh.get<std::vector<uint16_t>&>("data");
|
||||
std::memcpy(&data_buf[write_idx*m_block_size], data, std::min(data_size, m_block_size));
|
||||
t2 = std::chrono::high_resolution_clock::now();
|
||||
ms_double = t2 - t1;
|
||||
std::cout << " Hash update took: " << ms_double.count() << " ms" << std::endl;
|
||||
|
||||
// Hard coded type for now
|
||||
// NOTE: There's a massive performance bottleneck here if this is pre fetched!
|
||||
//std::vector<uint64_t>& data_buf = hsh.get<std::vector<uint16_t>&>("data");
|
||||
|
||||
t1 = std::chrono::high_resolution_clock::now();
|
||||
|
||||
std::memcpy(&hsh.get<std::vector<uint16_t>&>("data")[write_idx*m_block_size], data, std::min(data_size, m_block_size));
|
||||
|
||||
t2 = std::chrono::high_resolution_clock::now();
|
||||
ms_double = t2 - t1;
|
||||
std::cout << " Data copy took: " << ms_double.count() << " ms (size: " << std::min(data_size, m_block_size) << " )" << std::endl;
|
||||
|
||||
// Pop index
|
||||
write_idx++;
|
||||
@@ -132,9 +159,11 @@ public:
|
||||
writer.writeVector(hsh.get<std::vector<uint64_t>&>("user_2"), "/data/" + detector_name + "/user_2");
|
||||
|
||||
std::cout << "Writing data: " << hsh.get<std::vector<uint16_t>&>("data").size() << std::endl;
|
||||
writer.writeArray(hsh.get<std::vector<uint16_t>&>("data"), {m_buffer_size, hsh.get<std::vector<uint64_t>&>("height")[0], hsh.get<std::vector<uint64_t>&>("height")[0], 1}, "/data/" + detector_name + "/data");
|
||||
|
||||
|
||||
std::vector<uint64_t> array_shape = {m_buffer_size};
|
||||
auto slice_shape =hsh.get<std::vector<std::vector<uint64_t>>&>("shape")[0];
|
||||
array_shape.insert(array_shape.end(), slice_shape.begin(), slice_shape.end());
|
||||
writer.writeArray(hsh.get<std::vector<uint16_t>&>("data"), array_shape, "/data/" + detector_name + "/data");
|
||||
|
||||
run_id++;
|
||||
};
|
||||
|
||||
|
||||
@@ -1,10 +1,15 @@
|
||||
#include <zmq.hpp>
|
||||
#include <iostream>
|
||||
#include <sstream>
|
||||
#include <chrono>
|
||||
|
||||
#include "../../core-buffer/include/buffer_config.hpp"
|
||||
#include "../include/BufferTypes.hpp"
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
int main (int argc, char *argv[]){
|
||||
if (argc != 4) {
|
||||
std::cout << "\nERROR\nUsage: jf_buffer_writer [zmq_topic] [zmq_sub_addr] [detector_name]\n";
|
||||
@@ -29,7 +34,8 @@ int main (int argc, char *argv[]){
|
||||
// Subscribe to TOPIC (expected schema)
|
||||
zmq::socket_t subscriber (context, ZMQ_SUB);
|
||||
subscriber.connect(sub_addr.c_str());
|
||||
subscriber.setsockopt(ZMQ_SUBSCRIBE, topic.c_str(), topic.size());
|
||||
//subscriber.setsockopt(ZMQ_SUBSCRIBE, topic.c_str(), topic.size());
|
||||
subscriber.setsockopt(ZMQ_SUBSCRIBE, "", 0);
|
||||
|
||||
// Publisher to ipc
|
||||
std::cout << "Crating publisher...\n" << std::endl;
|
||||
@@ -65,13 +71,18 @@ int main (int argc, char *argv[]){
|
||||
for (int idx = 0; idx < 100000; idx++) {
|
||||
// ZMQ guarantees full delivery of multipart massages!
|
||||
// Packets are sent as three part messages: topic + meta + data
|
||||
subscriber.recv(&msg_topic, 0);
|
||||
//subscriber.recv(&msg_topic, 0);
|
||||
subscriber.recv(&msg_meta, 0);
|
||||
subscriber.recv(&msg_data, 0);
|
||||
|
||||
// Schema (topic) specific saving)
|
||||
if(topic=="IMAGEDATA"){
|
||||
auto t1 = std::chrono::high_resolution_clock::now();
|
||||
cache.append((void*)msg_meta.data(), msg_meta.size(), (void*)msg_data.data(), msg_data.size());
|
||||
auto t2 = std::chrono::high_resolution_clock::now();
|
||||
std::chrono::duration<double, std::milli> ms_double = t2 - t1;
|
||||
|
||||
std::cout << "Append took: " << ms_double.count() << " ms" << std::endl;
|
||||
//buffer.write_image((ImageMetadata*)msg_meta.data(), (char*)msg_data.data);
|
||||
if(idx%100==0){
|
||||
std::cout << "Received " << idx << " (at size " << msg_data.size() << " )" << std::endl;
|
||||
|
||||
Reference in New Issue
Block a user