diff --git a/sf-writer/src/main.cpp b/sf-writer/src/main.cpp index 3c8db4c..d5b0031 100644 --- a/sf-writer/src/main.cpp +++ b/sf-writer/src/main.cpp @@ -1,5 +1,4 @@ #include -#include #include #include #include @@ -7,12 +6,11 @@ #include "date.h" #include "zmq.h" -#include "jungfrau.hpp" #include "buffer_config.hpp" #include "bitshuffle/bitshuffle.h" -#include "WriterZmqReceiver.hpp" #include "JFH5Writer.hpp" #include "BufferBinaryReader.hpp" +#include "ImageAssembler.hpp" using namespace std; using namespace core_buffer; @@ -21,12 +19,21 @@ using namespace chrono; void read_buffer( const string device, const string channel_name, - const vector& buffer_blocks) + const int i_module, + const vector& buffer_blocks, + ImageAssembler& image_assembler) { BufferBinaryReader block_reader(device, channel_name); auto block_buffer = new BufferBinaryBlock(); for (uint64_t block_number:buffer_blocks) { + + int slot_id; + while((slot_id = image_assembler.get_free_slot()) == -1) { + this_thread::sleep_for(chrono::milliseconds( + RB_READ_RETRY_INTERVAL_MS)); + } + auto start_time = steady_clock::now(); block_reader.get_block(block_number, block_buffer); @@ -37,16 +44,15 @@ void read_buffer( start_time = steady_clock::now(); - // TODO: Send to composition. + image_assembler.process(slot_id, i_module, block_buffer); end_time = steady_clock::now(); uint64_t compose_us_duration = duration_cast( end_time-start_time).count(); - // TODO: Proper statistics cout << "sf_replay:avg_read_us "; cout << read_us_duration / BUFFER_BLOCK_SIZE << endl; - cout << "sf_replay:avg_compose_us "; + cout << "sf_replay:avg_assemble_us "; cout << compose_us_duration / BUFFER_BLOCK_SIZE << endl; } @@ -79,6 +85,8 @@ int main (int argc, char *argv[]) uint64_t stop_block = stop_pulse_id / BUFFER_BLOCK_SIZE; auto n_blocks = stop_block - start_block + 1; + ImageAssembler image_assembler; + // Generate list of buffer blocks that need to be loaded. std::vector buffer_blocks(n_blocks); for (uint64_t curr_block=start_block; @@ -91,24 +99,26 @@ int main (int argc, char *argv[]) for (size_t i_module=0; i_module( end_time-start_time).count(); - queue.release(); + image_assembler.free_slot(slot_id); cout << "sf_writer:avg_write_us "; cout << write_us_duration / BUFFER_BLOCK_SIZE << endl;