diff --git a/sf-writer/test/manual/test_sf_writer_recv.cpp b/sf-writer/test/manual/test_sf_writer_recv.cpp index f90d8ce..4f2e463 100644 --- a/sf-writer/test/manual/test_sf_writer_recv.cpp +++ b/sf-writer/test/manual/test_sf_writer_recv.cpp @@ -23,66 +23,32 @@ void receive_replay( const uint64_t stop_pulse_id) { try { + WriterZmqReceiver receiver(ctx, ipc_prefix, n_modules, stop_pulse_id); - void* sockets_[n_modules]; - ModuleFrame f_meta_; - char* image_buffer = new char[MODULE_N_BYTES*n_modules]; - - for (size_t i = 0; i < n_modules; i++) { - sockets_[i] = zmq_socket(ctx, ZMQ_PULL); - - int rcvhwm = WRITER_RCVHWM; - if (zmq_setsockopt(sockets_[i], ZMQ_RCVHWM, &rcvhwm, - sizeof(rcvhwm)) != 0) { - throw runtime_error(zmq_strerror(errno)); - } - int linger = 0; - if (zmq_setsockopt(sockets_[i], ZMQ_LINGER, &linger, - sizeof(linger)) != 0) { - throw runtime_error(zmq_strerror(errno)); - } - - stringstream ipc_addr; - ipc_addr << ipc_prefix << i; - const auto ipc = ipc_addr.str(); - - if (zmq_connect(sockets_[i], ipc.c_str()) != 0) { - throw runtime_error(zmq_strerror(errno)); - } + int slot_id; + while((slot_id = queue.reserve()) == -1) { + this_thread::sleep_for(chrono::milliseconds( + RB_READ_RETRY_INTERVAL_MS)); } uint64_t pulse_id = start_pulse_id; // "<= stop_pulse_id" because we include the last pulse_id. while(pulse_id <= stop_pulse_id) { - auto start_time = chrono::steady_clock::now(); - for (size_t i_module = 0; i_module < n_modules; i_module++) { + auto image_metadata = queue.get_metadata_buffer(slot_id); + auto image_buffer = queue.get_data_buffer(slot_id); - auto n_bytes_metadata = zmq_recv( - sockets_[i_module], &f_meta_, sizeof(f_meta_), 0); + receiver.get_next_buffer(pulse_id, image_metadata, image_buffer); - if (n_bytes_metadata != sizeof(f_meta_)) { - throw runtime_error("Wrong number of metadata bytes."); - } - - auto module_offset = i_module * MODULE_N_BYTES; - - auto n_bytes_image = zmq_recv( - sockets_[i_module], - (image_buffer + module_offset), - MODULE_N_BYTES, 0); - - if (n_bytes_image != MODULE_N_BYTES) { - throw runtime_error("Wrong number of data bytes."); - } - } + pulse_id += image_metadata->n_images; auto end_time = chrono::steady_clock::now(); auto read_us_duration = chrono::duration_cast( end_time-start_time).count(); - cout << "sf_writer:read_us " << read_us_duration << endl; + cout << "sf_writer::avg_read_us "; + cout << read_us_duration / image_metadata->n_images << endl; } queue.commit();