diff --git a/core-buffer/include/buffer_config.hpp b/core-buffer/include/buffer_config.hpp index 78d0ae7..7f42752 100644 --- a/core-buffer/include/buffer_config.hpp +++ b/core-buffer/include/buffer_config.hpp @@ -27,8 +27,6 @@ namespace core_buffer { // How many frames do we read at once during replay. const size_t REPLAY_READ_BLOCK_SIZE = 100; - // How long should the RECV queue be. - const size_t STREAM_RCV_QUEUE_SIZE = 100; // Size of sf_buffer RB in elements. const size_t BUFFER_INTERNAL_QUEUE_SIZE = 1000; @@ -39,8 +37,14 @@ namespace core_buffer { // Microseconds timeout for UDP recv. const int BUFFER_UDP_US_TIMEOUT = 5 * 1000; - // Output queue length for buffer live stream. - const int BUFFER_LIVE_SEND_HWM = 100; + // HWM for live stream from buffer. + const int BUFFER_ZMQ_SNDHWM = 100; + + // N of IO threads to receive data from modules. + const int STREAM_ZMQ_IO_THREADS = 4; + + // How long should the RECV queue be. + const size_t STREAM_RCVHWM = 100; // ZMQ threads for receiving data from sf_replay. const int WRITER_ZMQ_IO_THREADS = 2; diff --git a/core-buffer/src/LiveRecvModule.cpp b/core-buffer/src/LiveRecvModule.cpp index cbaabec..58381b5 100644 --- a/core-buffer/src/LiveRecvModule.cpp +++ b/core-buffer/src/LiveRecvModule.cpp @@ -48,7 +48,7 @@ void* LiveRecvModule::connect_socket(size_t module_id) throw runtime_error(zmq_strerror(errno)); } - int rcvhwm = STREAM_RCV_QUEUE_SIZE; + int rcvhwm = STREAM_RCVHWM; if (zmq_setsockopt(sock, ZMQ_RCVHWM, &rcvhwm, sizeof(rcvhwm)) != 0) { throw runtime_error(zmq_strerror(errno)); } diff --git a/scripts/JF07-stream.service b/scripts/JF07-stream.service index d41554b..84046fc 100644 --- a/scripts/JF07-stream.service +++ b/scripts/JF07-stream.service @@ -7,6 +7,7 @@ Type=idle User=root ExecStart=/usr/bin/sh /home/writer/git/sf_daq_buffer/scripts/JF07-stream.sh TimeoutStartSec=10 +Restart=on-failure RestartSec=10 [Install] diff --git a/sf-buffer/src/sf_buffer.cpp b/sf-buffer/src/sf_buffer.cpp index 539133e..1f23f9e 100644 --- a/sf-buffer/src/sf_buffer.cpp +++ b/sf-buffer/src/sf_buffer.cpp @@ -40,7 +40,7 @@ int main (int argc, char *argv[]) { auto ctx = zmq_ctx_new(); auto socket = zmq_socket(ctx, ZMQ_PUB); - const int sndhwm = BUFFER_LIVE_SEND_HWM; + const int sndhwm = BUFFER_ZMQ_SNDHWM; if (zmq_setsockopt(socket, ZMQ_SNDHWM, &sndhwm, sizeof(sndhwm)) != 0) throw runtime_error(strerror (errno)); diff --git a/sf-buffer/src/sf_replay.cpp b/sf-buffer/src/sf_replay.cpp index ff9915a..6b26788 100644 --- a/sf-buffer/src/sf_replay.cpp +++ b/sf-buffer/src/sf_replay.cpp @@ -83,16 +83,6 @@ void sf_replay ( string filename = filename_base + filename_suffix.path; - #ifdef DEBUG_OUTPUT - using namespace date; - using namespace chrono; - - cout << "[" << system_clock::now() << "]"; - cout << "[sf_replay::sf_replay]"; - - cout << " Reading from filename " << filename << endl; - #endif - for (size_t file_index_offset=0; file_index_offset < FILE_MOD; file_index_offset += REPLAY_READ_BLOCK_SIZE) @@ -161,16 +151,6 @@ void sf_replay ( MODULE_N_BYTES, 0); - #ifdef DEBUG_OUTPUT - using namespace date; - using namespace chrono; - - cout << "[" << system_clock::now() << "]"; - cout << "[sf_replay::sf_replay]"; - cout << " Sent pulse_id "; - cout << current_pulse_id << endl; - #endif - current_pulse_id++; } } @@ -204,22 +184,6 @@ int main (int argc, char *argv[]) { ipc_stream << "ipc:///tmp/sf-replay-" << (int)module_id; const auto ipc_address = ipc_stream.str(); - #ifdef DEBUG_OUTPUT - using namespace date; - using namespace chrono; - - cout << "[" << system_clock::now() << "]"; - cout << "[sf_replay::receive]"; - - cout << " device " << device; - cout << " channel_name " << channel_name; - cout << " module_id " << module_id; - cout << " start_pulse_id " << start_pulse_id; - cout << " stop_pulse_id " << stop_pulse_id; - cout << " ipc_address " << ipc_address; - cout << endl; - #endif - auto ctx = zmq_ctx_new(); auto socket = zmq_socket(ctx, ZMQ_PUSH); diff --git a/sf-buffer/src/sf_stream.cpp b/sf-buffer/src/sf_stream.cpp index 9aecb44..15ce4ec 100644 --- a/sf-buffer/src/sf_stream.cpp +++ b/sf-buffer/src/sf_stream.cpp @@ -45,7 +45,7 @@ int main (int argc, char *argv[]) WRITER_RB_BUFFER_SLOTS); auto ctx = zmq_ctx_new(); - zmq_ctx_set (ctx, ZMQ_IO_THREADS, WRITER_ZMQ_IO_THREADS); + zmq_ctx_set (ctx, ZMQ_IO_THREADS, STREAM_ZMQ_IO_THREADS); string ipc_prefix = "ipc:///tmp/sf-live-"; LiveRecvModule recv_module(queue, n_modules, ctx, ipc_prefix);