diff --git a/sf-writer/test/manual/test_sf_writer.cpp b/sf-writer/test/manual/test_sf_writer_recv.cpp similarity index 77% rename from sf-writer/test/manual/test_sf_writer.cpp rename to sf-writer/test/manual/test_sf_writer_recv.cpp index c992586..7d25011 100644 --- a/sf-writer/test/manual/test_sf_writer.cpp +++ b/sf-writer/test/manual/test_sf_writer_recv.cpp @@ -1,15 +1,12 @@ #include -#include #include "buffer_config.hpp" #include "zmq.h" #include -#include #include #include #include "WriterH5Writer.hpp" #include #include -#include #include "date.h" #include "bitshuffle/bitshuffle.h" #include "WriterZmqReceiver.hpp" @@ -21,17 +18,16 @@ void receive_replay( void* ctx, const string ipc_prefix, const size_t n_modules, - FastQueue& queue, + FastQueue& queue, const uint64_t start_pulse_id, const uint64_t stop_pulse_id) { try { - WriterZmqReceiver receiver(ctx, ipc_prefix, n_modules); - - uint64_t current_pulse_id=start_pulse_id; + WriterZmqReceiver receiver(ctx, ipc_prefix, n_modules, stop_pulse_id); + uint64_t pulse_id = start_pulse_id; // "<= stop_pulse_id" because we include the last pulse_id. - while(current_pulse_id<=stop_pulse_id) { + while(pulse_id <= stop_pulse_id) { int slot_id; while((slot_id = queue.reserve()) == -1) { @@ -42,17 +38,11 @@ void receive_replay( auto image_metadata = queue.get_metadata_buffer(slot_id); auto image_buffer = queue.get_data_buffer(slot_id); - cout << "Received " << image_metadata->pulse_id << endl; - - receiver.get_next_batch( - current_pulse_id, image_metadata, image_buffer); - - if (image_metadata->pulse_id != current_pulse_id) { - throw runtime_error("Wrong pulse id from zmq receiver."); - } + receiver.get_next_buffer(pulse_id, image_metadata, image_buffer); queue.commit(); - current_pulse_id++; + + pulse_id += image_metadata->n_images; } } catch (const std::exception& e) { @@ -89,8 +79,8 @@ int main (int argc, char *argv[]) size_t n_modules = 32; - FastQueue queue( - MODULE_N_BYTES * n_modules, + FastQueue queue( + MODULE_N_BYTES * n_modules * WRITER_DATA_CACHE_N_IMAGES, WRITER_FASTQUEUE_N_SLOTS); auto ctx = zmq_ctx_new(); @@ -112,10 +102,13 @@ int main (int argc, char *argv[]) auto metadata = queue.get_metadata_buffer(slot_id); - cout << "Written image " << metadata->pulse_id << endl; - current_pulse_id++; + for (int i_pulse=0; i_pulse < metadata->n_images; i_pulse++) { + cout << "Written image " << metadata->pulse_id[i_pulse] << endl; + + } queue.release(); + current_pulse_id += metadata->n_images; } //wait till receive thread is finished