This commit is contained in:
Dmitry Ozerov
2020-05-09 23:26:05 +02:00
committed by Data Backend account
6 changed files with 12 additions and 43 deletions
+8 -4
View File
@@ -27,8 +27,6 @@ namespace core_buffer {
// How many frames do we read at once during replay.
const size_t REPLAY_READ_BLOCK_SIZE = 100;
// How long should the RECV queue be.
const size_t STREAM_RCV_QUEUE_SIZE = 100;
// Size of sf_buffer RB in elements.
const size_t BUFFER_INTERNAL_QUEUE_SIZE = 1000;
@@ -39,8 +37,14 @@ namespace core_buffer {
// Microseconds timeout for UDP recv.
const int BUFFER_UDP_US_TIMEOUT = 5 * 1000;
// Output queue length for buffer live stream.
const int BUFFER_LIVE_SEND_HWM = 100;
// HWM for live stream from buffer.
const int BUFFER_ZMQ_SNDHWM = 100;
// N of IO threads to receive data from modules.
const int STREAM_ZMQ_IO_THREADS = 4;
// How long should the RECV queue be.
const size_t STREAM_RCVHWM = 100;
// ZMQ threads for receiving data from sf_replay.
const int WRITER_ZMQ_IO_THREADS = 2;
+1 -1
View File
@@ -48,7 +48,7 @@ void* LiveRecvModule::connect_socket(size_t module_id)
throw runtime_error(zmq_strerror(errno));
}
int rcvhwm = STREAM_RCV_QUEUE_SIZE;
int rcvhwm = STREAM_RCVHWM;
if (zmq_setsockopt(sock, ZMQ_RCVHWM, &rcvhwm, sizeof(rcvhwm)) != 0) {
throw runtime_error(zmq_strerror(errno));
}
+1
View File
@@ -7,6 +7,7 @@ Type=idle
User=root
ExecStart=/usr/bin/sh /home/writer/git/sf_daq_buffer/scripts/JF07-stream.sh
TimeoutStartSec=10
Restart=on-failure
RestartSec=10
[Install]
+1 -1
View File
@@ -40,7 +40,7 @@ int main (int argc, char *argv[]) {
auto ctx = zmq_ctx_new();
auto socket = zmq_socket(ctx, ZMQ_PUB);
const int sndhwm = BUFFER_LIVE_SEND_HWM;
const int sndhwm = BUFFER_ZMQ_SNDHWM;
if (zmq_setsockopt(socket, ZMQ_SNDHWM, &sndhwm, sizeof(sndhwm)) != 0)
throw runtime_error(strerror (errno));
-36
View File
@@ -83,16 +83,6 @@ void sf_replay (
string filename = filename_base + filename_suffix.path;
#ifdef DEBUG_OUTPUT
using namespace date;
using namespace chrono;
cout << "[" << system_clock::now() << "]";
cout << "[sf_replay::sf_replay]";
cout << " Reading from filename " << filename << endl;
#endif
for (size_t file_index_offset=0;
file_index_offset < FILE_MOD;
file_index_offset += REPLAY_READ_BLOCK_SIZE)
@@ -161,16 +151,6 @@ void sf_replay (
MODULE_N_BYTES,
0);
#ifdef DEBUG_OUTPUT
using namespace date;
using namespace chrono;
cout << "[" << system_clock::now() << "]";
cout << "[sf_replay::sf_replay]";
cout << " Sent pulse_id ";
cout << current_pulse_id << endl;
#endif
current_pulse_id++;
}
}
@@ -204,22 +184,6 @@ int main (int argc, char *argv[]) {
ipc_stream << "ipc:///tmp/sf-replay-" << (int)module_id;
const auto ipc_address = ipc_stream.str();
#ifdef DEBUG_OUTPUT
using namespace date;
using namespace chrono;
cout << "[" << system_clock::now() << "]";
cout << "[sf_replay::receive]";
cout << " device " << device;
cout << " channel_name " << channel_name;
cout << " module_id " << module_id;
cout << " start_pulse_id " << start_pulse_id;
cout << " stop_pulse_id " << stop_pulse_id;
cout << " ipc_address " << ipc_address;
cout << endl;
#endif
auto ctx = zmq_ctx_new();
auto socket = zmq_socket(ctx, ZMQ_PUSH);
+1 -1
View File
@@ -45,7 +45,7 @@ int main (int argc, char *argv[])
WRITER_RB_BUFFER_SLOTS);
auto ctx = zmq_ctx_new();
zmq_ctx_set (ctx, ZMQ_IO_THREADS, WRITER_ZMQ_IO_THREADS);
zmq_ctx_set (ctx, ZMQ_IO_THREADS, STREAM_ZMQ_IO_THREADS);
string ipc_prefix = "ipc:///tmp/sf-live-";
LiveRecvModule recv_module(queue, n_modules, ctx, ipc_prefix);