mirror of
https://github.com/paulscherrerinstitute/sf_daq_buffer.git
synced 2026-05-01 08:02:23 +02:00
Merge pull request #4 from paulscherrerinstitute/single_stream
sf_stream in a single thread
This commit is contained in:
@@ -0,0 +1,43 @@
|
||||
#ifndef SF_DAQ_BUFFER_ZMQLIVESENDER_HPP
|
||||
#define SF_DAQ_BUFFER_ZMQLIVESENDER_HPP
|
||||
|
||||
#include <string>
|
||||
#include <fstream>
|
||||
#include <rapidjson/istreamwrapper.h>
|
||||
#include <rapidjson/document.h>
|
||||
#include <rapidjson/stringbuffer.h>
|
||||
#include <rapidjson/writer.h>
|
||||
|
||||
#include "formats.hpp"
|
||||
|
||||
|
||||
struct LiveStreamConfig {
|
||||
const std::string streamvis_address;
|
||||
const int reduction_factor_streamvis;
|
||||
const std::string live_analysis_address;
|
||||
const int reduction_factor_live_analysis;
|
||||
const std::string PEDE_FILENAME;
|
||||
const std::string GAIN_FILENAME;
|
||||
const std::string DETECTOR_NAME;
|
||||
const int n_modules;
|
||||
};
|
||||
|
||||
LiveStreamConfig read_json_config(const std::string filename);
|
||||
|
||||
class ZmqLiveSender {
|
||||
const void* ctx_;
|
||||
const LiveStreamConfig config_;
|
||||
|
||||
void* socket_streamvis_;
|
||||
void* socket_live_;
|
||||
|
||||
public:
|
||||
ZmqLiveSender(void* ctx,
|
||||
const LiveStreamConfig& config);
|
||||
~ZmqLiveSender();
|
||||
|
||||
void send(const ModuleFrameBuffer* meta, const char* data);
|
||||
};
|
||||
|
||||
|
||||
#endif //SF_DAQ_BUFFER_ZMQLIVESENDER_HPP
|
||||
@@ -1,7 +1,7 @@
|
||||
namespace stream_config
|
||||
{
|
||||
// N of IO threads to receive data from modules.
|
||||
const int STREAM_ZMQ_IO_THREADS = 4;
|
||||
const int STREAM_ZMQ_IO_THREADS = 2;
|
||||
// How long should the RECV queue be.
|
||||
const size_t STREAM_RCVHWM = 100;
|
||||
// Size of buffer between the receiving and sending part.
|
||||
|
||||
@@ -72,7 +72,7 @@ void FastQueue<T>::commit()
|
||||
}
|
||||
|
||||
write_slot_id_++;
|
||||
write_slot_id_ %= n_slots_;
|
||||
write_slot_id_ = write_slot_id_ % n_slots_;
|
||||
}
|
||||
|
||||
template<class T>
|
||||
@@ -99,7 +99,7 @@ void FastQueue<T>::release()
|
||||
}
|
||||
|
||||
read_slot_id_++;
|
||||
read_slot_id_ %= n_slots_;
|
||||
read_slot_id_ = read_slot_id_ % n_slots_;
|
||||
}
|
||||
|
||||
template class FastQueue<BufferBinaryBlock>;
|
||||
|
||||
@@ -16,10 +16,10 @@ using namespace stream_config;
|
||||
|
||||
ZmqLiveReceiver::ZmqLiveReceiver(
|
||||
const size_t n_modules,
|
||||
void *ctx,
|
||||
void* ctx,
|
||||
const std::string &ipc_prefix) :
|
||||
n_modules_(n_modules),
|
||||
ctx_(ctx_),
|
||||
ctx_(ctx),
|
||||
ipc_prefix_(ipc_prefix),
|
||||
sockets_(n_modules)
|
||||
{
|
||||
|
||||
@@ -0,0 +1,201 @@
|
||||
#include "ZmqLiveSender.hpp"
|
||||
|
||||
#include "zmq.h"
|
||||
#include <stdexcept>
|
||||
|
||||
using namespace std;
|
||||
|
||||
LiveStreamConfig read_json_config(const std::string filename)
|
||||
{
|
||||
std::ifstream ifs(filename);
|
||||
rapidjson::IStreamWrapper isw(ifs);
|
||||
rapidjson::Document config_parameters;
|
||||
config_parameters.ParseStream(isw);
|
||||
|
||||
return {
|
||||
config_parameters["streamvis_stream"].GetString(),
|
||||
config_parameters["streamvis_rate"].GetInt(),
|
||||
config_parameters["live_stream"].GetString(),
|
||||
config_parameters["live_rate"].GetInt(),
|
||||
config_parameters["pedestal_file"].GetString(),
|
||||
config_parameters["gain_file"].GetString(),
|
||||
config_parameters["detector_name"].GetString(),
|
||||
config_parameters["n_modules"].GetInt()
|
||||
};
|
||||
}
|
||||
|
||||
ZmqLiveSender::ZmqLiveSender(
|
||||
void* ctx,
|
||||
const LiveStreamConfig& config) :
|
||||
ctx_(ctx),
|
||||
config_(config)
|
||||
{
|
||||
// TODO: Set also LINGER and SNDHWM.
|
||||
|
||||
// 0mq sockets to streamvis and live analysis
|
||||
socket_streamvis_ = zmq_socket(ctx, ZMQ_PUB);
|
||||
if (zmq_bind(socket_streamvis_, config.streamvis_address.c_str()) != 0) {
|
||||
throw runtime_error(zmq_strerror(errno));
|
||||
}
|
||||
|
||||
socket_live_ = zmq_socket(ctx, ZMQ_PUB);
|
||||
if (zmq_bind(socket_live_, config.live_analysis_address.c_str()) != 0) {
|
||||
throw runtime_error(zmq_strerror(errno));
|
||||
}
|
||||
}
|
||||
|
||||
ZmqLiveSender::~ZmqLiveSender()
|
||||
{
|
||||
zmq_close(socket_streamvis_);
|
||||
zmq_close(socket_live_);
|
||||
}
|
||||
|
||||
void ZmqLiveSender::send(const ModuleFrameBuffer *meta, const char *data)
|
||||
{
|
||||
uint16_t data_empty [] = { 0, 0, 0, 0};
|
||||
|
||||
rapidjson::Document header(rapidjson::kObjectType);
|
||||
auto& header_alloc = header.GetAllocator();
|
||||
string text_header;
|
||||
|
||||
uint64_t pulse_id = 0;
|
||||
uint64_t frame_index = 0;
|
||||
uint64_t daq_rec = 0;
|
||||
bool is_good_frame = true;
|
||||
|
||||
for (size_t i_module = 0; i_module < config_.n_modules; i_module++) {
|
||||
// TODO: Place this tests in the appropriate spot.
|
||||
auto& module_metadata = meta->module[i_module];
|
||||
if (i_module == 0) {
|
||||
pulse_id = module_metadata.pulse_id;
|
||||
frame_index = module_metadata.frame_index;
|
||||
daq_rec = module_metadata.daq_rec;
|
||||
|
||||
if (module_metadata.n_recv_packets != 128 ) is_good_frame = false;
|
||||
} else {
|
||||
if (module_metadata.pulse_id != pulse_id) is_good_frame = false;
|
||||
|
||||
if (module_metadata.frame_index != frame_index) is_good_frame = false;
|
||||
|
||||
if (module_metadata.daq_rec != daq_rec) is_good_frame = false;
|
||||
|
||||
if (module_metadata.n_recv_packets != 128 ) is_good_frame = false;
|
||||
}
|
||||
}
|
||||
|
||||
// TODO: Here we need to send to streamvis and live analysis metadata(probably need to operate still on them) and data(not every frame)
|
||||
|
||||
header.AddMember("frame", frame_index, header_alloc);
|
||||
header.AddMember("is_good_frame", is_good_frame, header_alloc);
|
||||
header.AddMember("daq_rec", daq_rec, header_alloc);
|
||||
header.AddMember("pulse_id", pulse_id, header_alloc);
|
||||
|
||||
rapidjson::Value pedestal_file;
|
||||
pedestal_file.SetString(config_.PEDE_FILENAME.c_str(), header_alloc);
|
||||
header.AddMember("pedestal_file", pedestal_file, header_alloc);
|
||||
|
||||
rapidjson::Value gain_file;
|
||||
gain_file.SetString(config_.GAIN_FILENAME.c_str(), header_alloc);
|
||||
header.AddMember("gain_file", gain_file, header_alloc);
|
||||
|
||||
header.AddMember("number_frames_expected", 10000, header_alloc);
|
||||
|
||||
rapidjson::Value run_name;
|
||||
run_name.SetString(
|
||||
to_string(uint64_t(pulse_id/10000)*10000).c_str(),
|
||||
header_alloc);
|
||||
header.AddMember("run_name", run_name, header_alloc);
|
||||
|
||||
rapidjson::Value detector_name;
|
||||
detector_name.SetString(config_.DETECTOR_NAME.c_str(), header_alloc);
|
||||
header.AddMember("detector_name", detector_name, header_alloc);
|
||||
|
||||
header.AddMember("htype", "array-1.0", header_alloc);
|
||||
header.AddMember("type", "uint16", header_alloc);
|
||||
|
||||
// To be retrieved and filled with correct values down.
|
||||
auto shape_value = rapidjson::Value(rapidjson::kArrayType);
|
||||
shape_value.PushBack((uint64_t)0, header_alloc);
|
||||
shape_value.PushBack((uint64_t)0, header_alloc);
|
||||
header.AddMember("shape", shape_value, header_alloc);
|
||||
|
||||
int send_streamvis = 0;
|
||||
if ( config_.reduction_factor_streamvis > 1 ) {
|
||||
send_streamvis = rand() % config_.reduction_factor_streamvis;
|
||||
}
|
||||
if ( send_streamvis == 0 ) {
|
||||
auto& shape = header["shape"];
|
||||
shape[0] = config_.n_modules*512;
|
||||
shape[1] = 1024;
|
||||
} else{
|
||||
auto& shape = header["shape"];
|
||||
shape[0] = 2;
|
||||
shape[1] = 2;
|
||||
}
|
||||
|
||||
{
|
||||
rapidjson::StringBuffer buffer;
|
||||
rapidjson::Writer<rapidjson::StringBuffer> writer(buffer);
|
||||
header.Accept(writer);
|
||||
|
||||
text_header = buffer.GetString();
|
||||
}
|
||||
|
||||
zmq_send(socket_streamvis_,
|
||||
text_header.c_str(),
|
||||
text_header.size(),
|
||||
ZMQ_SNDMORE);
|
||||
|
||||
if ( send_streamvis == 0 ) {
|
||||
zmq_send(socket_streamvis_,
|
||||
(char*)data,
|
||||
buffer_config::MODULE_N_BYTES * config_.n_modules,
|
||||
0);
|
||||
} else {
|
||||
zmq_send(socket_streamvis_,
|
||||
(char*)data_empty,
|
||||
8,
|
||||
0);
|
||||
}
|
||||
|
||||
//same for live analysis
|
||||
int send_live_analysis = 0;
|
||||
if ( config_.reduction_factor_live_analysis > 1 ) {
|
||||
send_live_analysis = rand() % config_.reduction_factor_live_analysis;
|
||||
}
|
||||
if ( send_live_analysis == 0 ) {
|
||||
auto& shape = header["shape"];
|
||||
shape[0] = config_.n_modules*512;
|
||||
shape[1] = 1024;
|
||||
} else{
|
||||
auto& shape = header["shape"];
|
||||
shape[0] = 2;
|
||||
shape[1] = 2;
|
||||
}
|
||||
|
||||
{
|
||||
rapidjson::StringBuffer buffer;
|
||||
rapidjson::Writer<rapidjson::StringBuffer> writer(buffer);
|
||||
header.Accept(writer);
|
||||
|
||||
text_header = buffer.GetString();
|
||||
}
|
||||
|
||||
zmq_send(socket_live_,
|
||||
text_header.c_str(),
|
||||
text_header.size(),
|
||||
ZMQ_SNDMORE);
|
||||
|
||||
if ( send_live_analysis == 0 ) {
|
||||
zmq_send(socket_live_,
|
||||
(char*)data,
|
||||
buffer_config::MODULE_N_BYTES * config_.n_modules,
|
||||
0);
|
||||
} else {
|
||||
zmq_send(socket_live_,
|
||||
(char*)data_empty,
|
||||
8,
|
||||
0);
|
||||
}
|
||||
}
|
||||
|
||||
+32
-202
@@ -1,22 +1,16 @@
|
||||
#include <iostream>
|
||||
#include <stdexcept>
|
||||
#include <string>
|
||||
#include <thread>
|
||||
#include <chrono>
|
||||
#include <cstring>
|
||||
#include <zmq.h>
|
||||
#include "rapidjson/istreamwrapper.h"
|
||||
#include <fstream>
|
||||
#include "rapidjson/document.h"
|
||||
#include "rapidjson/stringbuffer.h"
|
||||
#include "rapidjson/writer.h"
|
||||
|
||||
#include "FastQueue.hpp"
|
||||
#include "LiveRecvModule.hpp"
|
||||
#include "buffer_config.hpp"
|
||||
#include "stream_config.hpp"
|
||||
#include "ZmqLiveSender.hpp"
|
||||
|
||||
using namespace std;
|
||||
using namespace chrono;
|
||||
using namespace buffer_config;
|
||||
using namespace stream_config;
|
||||
|
||||
@@ -27,239 +21,75 @@ int main (int argc, char *argv[])
|
||||
cout << "Usage: sf_stream ";
|
||||
cout << " [config_json_file]";
|
||||
cout << endl;
|
||||
cout << "\tconfig_json_file: json file with the configuration parameters(detector name, number of modules, pedestal and gain files" << endl;
|
||||
cout << "\tconfig_json_file: json file with the configuration "
|
||||
"parameters(detector name, number of modules, pedestal and "
|
||||
"gain files" << endl;
|
||||
cout << endl;
|
||||
|
||||
exit(-1);
|
||||
}
|
||||
|
||||
string config_json_file = string(argv[1]);
|
||||
|
||||
ifstream ifs(config_json_file);
|
||||
rapidjson::IStreamWrapper isw(ifs);
|
||||
rapidjson::Document config_parameters;
|
||||
config_parameters.ParseStream(isw);
|
||||
auto config = read_json_config(string(argv[1]));
|
||||
string RECV_IPC_URL = BUFFER_LIVE_IPC_URL + config.DETECTOR_NAME + "-";
|
||||
|
||||
string streamvis_address = config_parameters["streamvis_stream"].GetString();
|
||||
int reduction_factor_streamvis = config_parameters["streamvis_rate"].GetInt();
|
||||
string live_analysis_address = config_parameters["live_stream"].GetString();
|
||||
int reduction_factor_live_analysis = config_parameters["live_rate"].GetInt();
|
||||
|
||||
const string PEDE_FILENAME = config_parameters["pedestal_file"].GetString();
|
||||
const string GAIN_FILENAME = config_parameters["gain_file"].GetString();
|
||||
const string DETECTOR_NAME = config_parameters["detector_name"].GetString();
|
||||
size_t n_modules = config_parameters["n_modules"].GetInt();
|
||||
|
||||
FastQueue<ModuleFrameBuffer> queue(
|
||||
n_modules * MODULE_N_BYTES, STREAM_FASTQUEUE_SLOTS);
|
||||
ModuleFrameBuffer* meta = new ModuleFrameBuffer();
|
||||
char* data = new char[config.n_modules * MODULE_N_BYTES];
|
||||
|
||||
auto ctx = zmq_ctx_new();
|
||||
zmq_ctx_set (ctx, ZMQ_IO_THREADS, STREAM_ZMQ_IO_THREADS);
|
||||
|
||||
const string LIVE_IPC_URL = BUFFER_LIVE_IPC_URL+DETECTOR_NAME + "-";
|
||||
ZmqLiveReceiver receiver(n_modules, ctx, LIVE_IPC_URL);
|
||||
|
||||
LiveRecvModule recv_module(queue, receiver);
|
||||
|
||||
// 0mq sockets to streamvis and live analysis
|
||||
void *socket_streamvis = zmq_socket(ctx, ZMQ_PUB);
|
||||
if (zmq_bind(socket_streamvis, streamvis_address.c_str()) != 0) {
|
||||
throw runtime_error(strerror(errno));
|
||||
}
|
||||
void *socket_live = zmq_socket(ctx, ZMQ_PUB);
|
||||
if (zmq_bind(socket_live, live_analysis_address.c_str()) != 0) {
|
||||
throw runtime_error(strerror(errno));
|
||||
}
|
||||
|
||||
uint16_t data_empty [] = { 0, 0, 0, 0};
|
||||
ZmqLiveReceiver receiver(config.n_modules, ctx, RECV_IPC_URL);
|
||||
ZmqLiveSender sender(ctx, config);
|
||||
|
||||
// TODO: Remove stats trash.
|
||||
int stats_counter = 0;
|
||||
|
||||
size_t read_total_us = 0;
|
||||
size_t read_max_us = 0;
|
||||
size_t send_total_us = 0;
|
||||
size_t send_max_us = 0;
|
||||
|
||||
while (true) {
|
||||
|
||||
rapidjson::Document header(rapidjson::kObjectType);
|
||||
auto& header_alloc = header.GetAllocator();
|
||||
string text_header;
|
||||
auto start_time = steady_clock::now();
|
||||
|
||||
auto start_time = chrono::steady_clock::now();
|
||||
auto n_lost_pulses = receiver.get_next_image(meta, data);
|
||||
|
||||
auto slot_id = queue.read();
|
||||
|
||||
if(slot_id == -1) {
|
||||
this_thread::sleep_for(chrono::milliseconds(
|
||||
buffer_config::RB_READ_RETRY_INTERVAL_MS));
|
||||
continue;
|
||||
if (n_lost_pulses > 0) {
|
||||
cout << "sf_stream:sync_lost_pulses " << n_lost_pulses << endl;
|
||||
}
|
||||
|
||||
auto metadata = queue.get_metadata_buffer(slot_id);
|
||||
auto data = queue.get_data_buffer(slot_id);
|
||||
auto end_time = steady_clock::now();
|
||||
size_t read_us_duration = duration_cast<microseconds>(
|
||||
end_time - start_time).count();
|
||||
|
||||
auto read_end_time = chrono::steady_clock::now();
|
||||
auto read_us_duration = chrono::duration_cast<chrono::microseconds>(
|
||||
read_end_time-start_time).count();
|
||||
start_time = steady_clock::now();
|
||||
|
||||
uint64_t pulse_id = 0;
|
||||
uint64_t frame_index = 0;
|
||||
uint64_t daq_rec = 0;
|
||||
bool is_good_frame = true;
|
||||
sender.send(meta, data);
|
||||
|
||||
for (size_t i_module = 0; i_module < n_modules; i_module++) {
|
||||
// TODO: Place this tests in the appropriate spot.
|
||||
auto& module_metadata = metadata->module[i_module];
|
||||
if (i_module == 0) {
|
||||
pulse_id = module_metadata.pulse_id;
|
||||
frame_index = module_metadata.frame_index;
|
||||
daq_rec = module_metadata.daq_rec;
|
||||
|
||||
if (module_metadata.n_recv_packets != 128 ) is_good_frame = false;
|
||||
} else {
|
||||
if (module_metadata.pulse_id != pulse_id) is_good_frame = false;
|
||||
|
||||
if (module_metadata.frame_index != frame_index) is_good_frame = false;
|
||||
|
||||
if (module_metadata.daq_rec != daq_rec) is_good_frame = false;
|
||||
|
||||
if (module_metadata.n_recv_packets != 128 ) is_good_frame = false;
|
||||
}
|
||||
}
|
||||
|
||||
// TODO: Here we need to send to streamvis and live analysis metadata(probably need to operate still on them) and data(not every frame)
|
||||
|
||||
header.AddMember("frame", frame_index, header_alloc);
|
||||
header.AddMember("is_good_frame", is_good_frame, header_alloc);
|
||||
header.AddMember("daq_rec", daq_rec, header_alloc);
|
||||
header.AddMember("pulse_id", pulse_id, header_alloc);
|
||||
|
||||
rapidjson::Value pedestal_file;
|
||||
pedestal_file.SetString(PEDE_FILENAME.c_str(), header_alloc);
|
||||
header.AddMember("pedestal_file", pedestal_file, header_alloc);
|
||||
|
||||
rapidjson::Value gain_file;
|
||||
gain_file.SetString(GAIN_FILENAME.c_str(), header_alloc);
|
||||
header.AddMember("gain_file", gain_file, header_alloc);
|
||||
|
||||
header.AddMember("number_frames_expected", 10000, header_alloc);
|
||||
|
||||
rapidjson::Value run_name;
|
||||
run_name.SetString(
|
||||
to_string(uint64_t(pulse_id/10000)*10000).c_str(),
|
||||
header_alloc);
|
||||
header.AddMember("run_name", run_name, header_alloc);
|
||||
|
||||
rapidjson::Value detector_name;
|
||||
detector_name.SetString(DETECTOR_NAME.c_str(), header_alloc);
|
||||
header.AddMember("detector_name", detector_name, header_alloc);
|
||||
|
||||
header.AddMember("htype", "array-1.0", header_alloc);
|
||||
header.AddMember("type", "uint16", header_alloc);
|
||||
|
||||
// To be retrieved and filled with correct values down.
|
||||
auto shape_value = rapidjson::Value(rapidjson::kArrayType);
|
||||
shape_value.PushBack((uint64_t)0, header_alloc);
|
||||
shape_value.PushBack((uint64_t)0, header_alloc);
|
||||
header.AddMember("shape", shape_value, header_alloc);
|
||||
|
||||
int send_streamvis = 0;
|
||||
if ( reduction_factor_streamvis > 1 ) {
|
||||
send_streamvis = rand() % reduction_factor_streamvis;
|
||||
}
|
||||
if ( send_streamvis == 0 ) {
|
||||
auto& shape = header["shape"];
|
||||
shape[0] = n_modules*512;
|
||||
shape[1] = 1024;
|
||||
} else{
|
||||
auto& shape = header["shape"];
|
||||
shape[0] = 2;
|
||||
shape[1] = 2;
|
||||
}
|
||||
|
||||
{
|
||||
rapidjson::StringBuffer buffer;
|
||||
rapidjson::Writer<rapidjson::StringBuffer> writer(buffer);
|
||||
header.Accept(writer);
|
||||
|
||||
text_header = buffer.GetString();
|
||||
}
|
||||
|
||||
zmq_send(socket_streamvis,
|
||||
text_header.c_str(),
|
||||
text_header.size(),
|
||||
ZMQ_SNDMORE);
|
||||
|
||||
if ( send_streamvis == 0 ) {
|
||||
zmq_send(socket_streamvis,
|
||||
(char*)data,
|
||||
buffer_config::MODULE_N_BYTES * n_modules,
|
||||
0);
|
||||
} else {
|
||||
zmq_send(socket_streamvis,
|
||||
(char*)data_empty,
|
||||
8,
|
||||
0);
|
||||
}
|
||||
|
||||
//same for live analysis
|
||||
int send_live_analysis = 0;
|
||||
if ( reduction_factor_live_analysis > 1 ) {
|
||||
send_live_analysis = rand() % reduction_factor_live_analysis;
|
||||
}
|
||||
if ( send_live_analysis == 0 ) {
|
||||
auto& shape = header["shape"];
|
||||
shape[0] = n_modules*512;
|
||||
shape[1] = 1024;
|
||||
} else{
|
||||
auto& shape = header["shape"];
|
||||
shape[0] = 2;
|
||||
shape[1] = 2;
|
||||
}
|
||||
|
||||
{
|
||||
rapidjson::StringBuffer buffer;
|
||||
rapidjson::Writer<rapidjson::StringBuffer> writer(buffer);
|
||||
header.Accept(writer);
|
||||
|
||||
text_header = buffer.GetString();
|
||||
}
|
||||
|
||||
zmq_send(socket_live,
|
||||
text_header.c_str(),
|
||||
text_header.size(),
|
||||
ZMQ_SNDMORE);
|
||||
|
||||
if ( send_live_analysis == 0 ) {
|
||||
zmq_send(socket_live,
|
||||
(char*)data,
|
||||
buffer_config::MODULE_N_BYTES * n_modules,
|
||||
0);
|
||||
} else {
|
||||
zmq_send(socket_live,
|
||||
(char*)data_empty,
|
||||
8,
|
||||
0);
|
||||
}
|
||||
|
||||
queue.release();
|
||||
end_time = steady_clock::now();
|
||||
size_t send_us_duration = duration_cast<microseconds>(
|
||||
end_time - start_time).count();
|
||||
|
||||
// TODO: Some poor statistics.
|
||||
stats_counter++;
|
||||
read_total_us += read_us_duration;
|
||||
send_total_us += send_us_duration;
|
||||
|
||||
if (read_us_duration > read_max_us) {
|
||||
read_max_us = read_us_duration;
|
||||
}
|
||||
read_max_us = max(read_max_us, read_us_duration);
|
||||
send_max_us = max(send_max_us, send_us_duration);
|
||||
|
||||
if (stats_counter == STATS_MODULO) {
|
||||
cout << "sf_stream:read_us " << read_total_us / STATS_MODULO;
|
||||
cout << " sf_stream:read_max_us " << read_max_us;
|
||||
cout << " sf_stream:send_us " << send_total_us / STATS_MODULO;
|
||||
cout << " sf_stream:send_max_us " << send_max_us;
|
||||
cout << endl;
|
||||
|
||||
stats_counter = 0;
|
||||
read_total_us = 0;
|
||||
read_max_us = 0;
|
||||
send_total_us = 0;
|
||||
send_max_us = 0;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user