diff --git a/sf-writer/test/manual/test_sf_writer_recv.cpp b/sf-writer/test/manual/test_sf_writer_recv.cpp index ba28f12..ea61e66 100644 --- a/sf-writer/test/manual/test_sf_writer_recv.cpp +++ b/sf-writer/test/manual/test_sf_writer_recv.cpp @@ -23,24 +23,59 @@ void receive_replay( const uint64_t stop_pulse_id) { try { - WriterZmqReceiver receiver(ctx, ipc_prefix, n_modules, stop_pulse_id); - int slot_id; - while((slot_id = queue.reserve()) == -1) { - this_thread::sleep_for(chrono::milliseconds( - RB_READ_RETRY_INTERVAL_MS)); + void* sockets_[n_modules]; + ModuleFrame f_meta_; + char* image_buffer = new char[MODULE_N_BYTES*n_modules]; + + for (size_t i = 0; i < n_modules; i++) { + sockets_[i] = zmq_socket(ctx, ZMQ_PULL); + + int rcvhwm = WRITER_RCVHWM; + if (zmq_setsockopt(sockets_[i], ZMQ_RCVHWM, &rcvhwm, + sizeof(rcvhwm)) != 0) { + throw runtime_error(zmq_strerror(errno)); + } + int linger = 0; + if (zmq_setsockopt(sockets_[i], ZMQ_LINGER, &linger, + sizeof(linger)) != 0) { + throw runtime_error(zmq_strerror(errno)); + } + + stringstream ipc_addr; + ipc_addr << ipc_prefix << i; + const auto ipc = ipc_addr.str(); + + if (zmq_connect(sockets_[i], ipc.c_str()) != 0) { + throw runtime_error(zmq_strerror(errno)); + } } uint64_t pulse_id = start_pulse_id; // "<= stop_pulse_id" because we include the last pulse_id. while(pulse_id <= stop_pulse_id) { - auto image_metadata = queue.get_metadata_buffer(slot_id); - auto image_buffer = queue.get_data_buffer(slot_id); + for (size_t i_module = 0; i_module < n_modules; i_module++) { - receiver.get_next_buffer(pulse_id, image_metadata, image_buffer); + auto n_bytes_metadata = zmq_recv( + sockets_[i_module], &f_meta_, sizeof(f_meta_), 0); - pulse_id += image_metadata->n_images; + if (n_bytes_metadata != sizeof(f_meta_)) { + throw runtime_error("Wrong number of metadata bytes."); + } + + auto module_offset = i_module * MODULE_N_BYTES; + + auto n_bytes_image = zmq_recv( + sockets_[i_module], + (image_buffer + module_offset), + MODULE_N_BYTES, 0); + + if (n_bytes_image != MODULE_N_BYTES) { + throw runtime_error("Wrong number of data bytes."); + } + + } } queue.commit();