From 3bb63c47be53c54b2c58a0e063764501b1aba6de Mon Sep 17 00:00:00 2001 From: Andrej Babic Date: Wed, 20 May 2020 10:51:46 +0200 Subject: [PATCH] Revert back example --- sf-buffer/src/sf_writer.cpp | 55 ++++++++++--------------------------- 1 file changed, 14 insertions(+), 41 deletions(-) diff --git a/sf-buffer/src/sf_writer.cpp b/sf-buffer/src/sf_writer.cpp index 30e5e8c..f4ceb8f 100644 --- a/sf-buffer/src/sf_writer.cpp +++ b/sf-buffer/src/sf_writer.cpp @@ -26,57 +26,30 @@ void receive_replay( const uint64_t stop_pulse_id) { try { - - void* sockets_[n_modules]; - for (size_t i = 0; i < n_modules; i++) { - sockets_[i] = zmq_socket(ctx, ZMQ_PULL); - - int rcvhwm = WRITER_RCVHWM; - if (zmq_setsockopt(sockets_[i], ZMQ_RCVHWM, &rcvhwm, - sizeof(rcvhwm)) != 0) { - throw runtime_error(zmq_strerror(errno)); - } - int linger = 0; - if (zmq_setsockopt(sockets_[i], ZMQ_LINGER, &linger, - sizeof(linger)) != 0) { - throw runtime_error(zmq_strerror(errno)); - } - - stringstream ipc_addr; - ipc_addr << ipc_prefix << i; - const auto ipc = ipc_addr.str(); - - if (zmq_connect(sockets_[i], ipc.c_str()) != 0) { - throw runtime_error(zmq_strerror(errno)); - } - } + WriterZmqReceiver receiver(ctx, ipc_prefix, n_modules); + BufferedFastQueue buffered_queue( + queue, WRITER_DATA_CACHE_N_IMAGES, n_modules); uint64_t current_pulse_id=start_pulse_id; - StreamModuleFrame frame_metadata; - char* image_buffer[MODULE_N_BYTES]; // "<= stop_pulse_id" because we include the last pulse_id. while(current_pulse_id<=stop_pulse_id) { - for (size_t i_module = 0; i_module < n_modules; i_module++) { - auto n_bytes_metadata = zmq_recv( - sockets_[i_module], - &frame_metadata, - sizeof(StreamModuleFrame), - 0); - cout << "received " << frame_metadata.metadata.pulse_id; - cout << " from " << frame_metadata.metadata.module_id; - - auto n_bytes_image = zmq_recv( - sockets_[i_module], -// (image_buffer + image_buffer_offset), - image_buffer, - frame_metadata.data_n_bytes, - 0); + auto image_metadata = buffered_queue.get_metadata_buffer(); + auto image_buffer = buffered_queue.get_data_buffer(); + + receiver.get_next_image( + current_pulse_id, image_metadata, image_buffer); + + if (image_metadata->pulse_id != current_pulse_id) { + throw runtime_error("Wrong pulse id from zmq receiver."); } + buffered_queue.commit(); + current_pulse_id++; } + buffered_queue.finalize(); } catch (const std::exception& e) { using namespace date;