From f972ef462ddec97fd506650a08edf151ee3c3784 Mon Sep 17 00:00:00 2001 From: Andrej Babic Date: Wed, 20 May 2020 10:42:34 +0200 Subject: [PATCH] Make simple zmq example --- sf-buffer/src/sf_writer.cpp | 55 +++++++++++++++++++++++++++---------- 1 file changed, 41 insertions(+), 14 deletions(-) diff --git a/sf-buffer/src/sf_writer.cpp b/sf-buffer/src/sf_writer.cpp index f4ceb8f..30e5e8c 100644 --- a/sf-buffer/src/sf_writer.cpp +++ b/sf-buffer/src/sf_writer.cpp @@ -26,30 +26,57 @@ void receive_replay( const uint64_t stop_pulse_id) { try { - WriterZmqReceiver receiver(ctx, ipc_prefix, n_modules); - BufferedFastQueue buffered_queue( - queue, WRITER_DATA_CACHE_N_IMAGES, n_modules); + + void* sockets_[n_modules]; + for (size_t i = 0; i < n_modules; i++) { + sockets_[i] = zmq_socket(ctx, ZMQ_PULL); + + int rcvhwm = WRITER_RCVHWM; + if (zmq_setsockopt(sockets_[i], ZMQ_RCVHWM, &rcvhwm, + sizeof(rcvhwm)) != 0) { + throw runtime_error(zmq_strerror(errno)); + } + int linger = 0; + if (zmq_setsockopt(sockets_[i], ZMQ_LINGER, &linger, + sizeof(linger)) != 0) { + throw runtime_error(zmq_strerror(errno)); + } + + stringstream ipc_addr; + ipc_addr << ipc_prefix << i; + const auto ipc = ipc_addr.str(); + + if (zmq_connect(sockets_[i], ipc.c_str()) != 0) { + throw runtime_error(zmq_strerror(errno)); + } + } uint64_t current_pulse_id=start_pulse_id; + StreamModuleFrame frame_metadata; + char* image_buffer[MODULE_N_BYTES]; // "<= stop_pulse_id" because we include the last pulse_id. while(current_pulse_id<=stop_pulse_id) { + for (size_t i_module = 0; i_module < n_modules; i_module++) { + auto n_bytes_metadata = zmq_recv( + sockets_[i_module], + &frame_metadata, + sizeof(StreamModuleFrame), + 0); - auto image_metadata = buffered_queue.get_metadata_buffer(); - auto image_buffer = buffered_queue.get_data_buffer(); - - receiver.get_next_image( - current_pulse_id, image_metadata, image_buffer); - - if (image_metadata->pulse_id != current_pulse_id) { - throw runtime_error("Wrong pulse id from zmq receiver."); + cout << "received " << frame_metadata.metadata.pulse_id; + cout << " from " << frame_metadata.metadata.module_id; + + auto n_bytes_image = zmq_recv( + sockets_[i_module], +// (image_buffer + image_buffer_offset), + image_buffer, + frame_metadata.data_n_bytes, + 0); } - buffered_queue.commit(); - current_pulse_id++; } - buffered_queue.finalize(); } catch (const std::exception& e) { using namespace date;