From 8fb9fe4e3ca5a331b4355b5e82138cc81ca3c45b Mon Sep 17 00:00:00 2001 From: Dmitry Ozerov Date: Mon, 4 May 2020 19:08:50 +0200 Subject: [PATCH] intermediate ersion of sf_stream to read directly from sf_buffer --- sf-buffer/src/sf_stream.cpp | 48 +++++++++++++++++++++++-------------- 1 file changed, 30 insertions(+), 18 deletions(-) diff --git a/sf-buffer/src/sf_stream.cpp b/sf-buffer/src/sf_stream.cpp index 4057629..838f33e 100644 --- a/sf-buffer/src/sf_stream.cpp +++ b/sf-buffer/src/sf_stream.cpp @@ -15,6 +15,8 @@ using namespace std; using namespace core_buffer; +const int WRITER_N_FRAMES_BUFFER_live = 1; + void receive_replay( const string ipc_prefix, const size_t n_modules, @@ -25,25 +27,27 @@ void receive_replay( void *sockets[n_modules]; for (size_t i = 0; i < n_modules; i++) { - sockets[i] = zmq_socket(ctx, ZMQ_PULL); - int rcvhwm = REPLAY_READ_BLOCK_SIZE; - if (zmq_setsockopt(sockets[i], ZMQ_RCVHWM, &rcvhwm, - sizeof(rcvhwm)) != 0) { - throw runtime_error(strerror(errno)); - } - int linger = 0; - if (zmq_setsockopt(sockets[i], ZMQ_LINGER, &linger, - sizeof(linger)) != 0) { - throw runtime_error(strerror(errno)); - } + //sockets[i] = zmq_socket(ctx, ZMQ_PULL); + sockets[i] = zmq_socket(ctx, ZMQ_SUB); + //int rcvhwm = REPLAY_READ_BLOCK_SIZE; + //if (zmq_setsockopt(sockets[i], ZMQ_RCVHWM, &rcvhwm, + // sizeof(rcvhwm)) != 0) { + // throw runtime_error(strerror(errno)); + //} + //int linger = 0; + //if (zmq_setsockopt(sockets[i], ZMQ_LINGER, &linger, + // sizeof(linger)) != 0) { + // throw runtime_error(strerror(errno)); + //} stringstream ipc_addr; ipc_addr << ipc_prefix << i; const auto ipc = ipc_addr.str(); - if (zmq_bind(sockets[i], ipc.c_str()) != 0) { + if (zmq_connect(sockets[i], ipc.c_str()) != 0) { throw runtime_error(strerror(errno)); } + zmq_setsockopt(sockets[i], ZMQ_SUBSCRIBE, "", sizeof("") != 0); } auto module_meta_buffer = make_unique(); @@ -62,18 +66,25 @@ void receive_replay( for ( size_t i_buffer=0; - i_bufferis_good_frame[i_buffer] = true; for (size_t i_module = 0; i_module < n_modules; i_module++) { + if (i_module == 0 ) { + cout << "before zmq_recv" << endl; + } auto n_bytes_metadata = zmq_recv( sockets[i_module], module_meta_buffer.get(), sizeof(ModuleFrame), 0); + if (i_module == 0 ) { + cout << "after zmq_recv" << endl; + } + if (n_bytes_metadata != sizeof(ModuleFrame)) { // TODO: Make nicer expcetion. @@ -177,10 +188,11 @@ int main (int argc, char *argv[]) size_t n_modules = 32; FastQueue queue( - n_modules * MODULE_N_BYTES * WRITER_N_FRAMES_BUFFER, + n_modules * MODULE_N_BYTES * WRITER_N_FRAMES_BUFFER_live, WRITER_RB_BUFFER_SLOTS); - string ipc_prefix = "ipc:///tmp/sf-replay-"; + string ipc_prefix = "ipc:///tmp/sf-live-"; + auto ctx = zmq_ctx_new(); zmq_ctx_set (ctx, ZMQ_IO_THREADS, WRITER_ZMQ_IO_THREADS); @@ -237,7 +249,7 @@ int main (int argc, char *argv[]) //Here we need to send to streamvis and live analysis metadata(probably need to operate still on them) and data(not every frame) - for ( size_t i_buffer=0; i_bufferpulse_id[i_buffer*n_modules+i_module] << " "; //} @@ -326,11 +338,11 @@ int main (int argc, char *argv[]) } - + queue.release(); // TODO: Some poor statistics. - stats_counter += WRITER_N_FRAMES_BUFFER; + stats_counter += WRITER_N_FRAMES_BUFFER_live; auto write_end_time = chrono::steady_clock::now(); auto write_us_duration = chrono::duration_cast( write_end_time-start_time).count();