diff --git a/CMakeLists.txt b/CMakeLists.txt index 44e8243..3fcb1f5 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -31,4 +31,5 @@ add_subdirectory( add_subdirectory("core-buffer") add_subdirectory("sf-buffer") add_subdirectory("sf-stream") -add_subdirectory("sf-writer") \ No newline at end of file +add_subdirectory("sf-writer") +add_subdirectory("jf-live-writer") \ No newline at end of file diff --git a/jf-live-writer/CMakeLists.txt b/jf-live-writer/CMakeLists.txt new file mode 100644 index 0000000..6028bfe --- /dev/null +++ b/jf-live-writer/CMakeLists.txt @@ -0,0 +1,22 @@ +file(GLOB SOURCES + src/*.cpp) + +add_library(jf-live-writer-lib STATIC ${SOURCES}) +target_include_directories(jf-live-writer-lib PUBLIC include/) +target_link_libraries(jf-live-writer-lib + external + core-buffer-lib) + +add_executable(jf-live-writer src/main.cpp) +set_target_properties(jf-live-writer PROPERTIES OUTPUT_NAME jf_live_writer) +target_link_libraries(jf-live-writer + jf-live-writer-lib + sf-writer-lib + hdf5 + hdf5_hl + hdf5_cpp + pthread + ) + +enable_testing() +add_subdirectory(test/) \ No newline at end of file diff --git a/jf-live-writer/src/main.cpp b/jf-live-writer/src/main.cpp new file mode 100644 index 0000000..7a73af7 --- /dev/null +++ b/jf-live-writer/src/main.cpp @@ -0,0 +1,158 @@ +#include +#include +#include +#include +#include + +#include "date.h" +#include "zmq.h" +#include "writer_config.hpp" +#include "buffer_config.hpp" +#include "bitshuffle/bitshuffle.h" +#include "JFH5Writer.hpp" +#include "ImageAssembler.hpp" +#include "BufferBinaryReader.hpp" + +using namespace std; +using namespace chrono; +using namespace writer_config; +using namespace buffer_config; + +void read_buffer( + const string device, + const string channel_name, + const int i_module, + const vector& buffer_blocks, + ImageAssembler& image_assembler) +{ + BufferBinaryReader block_reader(device, channel_name); + auto block_buffer = new BufferBinaryBlock(); + + for (uint64_t block_id:buffer_blocks) { + + while(!image_assembler.is_slot_free(block_id)) { + this_thread::sleep_for(chrono::milliseconds(ASSEMBLER_RETRY_MS)); + } + + auto start_time = steady_clock::now(); + + block_reader.get_block(block_id, block_buffer); + + auto end_time = steady_clock::now(); + uint64_t read_us_duration = duration_cast( + end_time-start_time).count(); + + start_time = steady_clock::now(); + + image_assembler.process(block_id, i_module, block_buffer); + + end_time = steady_clock::now(); + uint64_t compose_us_duration = duration_cast( + end_time-start_time).count(); + + cout << "sf_writer:avg_read_us "; + cout << read_us_duration / BUFFER_BLOCK_SIZE << endl; + cout << "sf_writer:avg_assemble_us "; + cout << compose_us_duration / BUFFER_BLOCK_SIZE << endl; + } + + delete block_buffer; +} + +int main (int argc, char *argv[]) +{ + if (argc != 7) { + cout << endl; + cout << "Usage: sf_writer [output_file] [device] [n_modules]"; + cout << " [start_pulse_id] [stop_pulse_id] [pulse_id_step]"; + cout << endl; + cout << "\toutput_file: Complete path to the output file." << endl; + cout << "\tdevice: Name of detector." << endl; + cout << "\tn_modules: number of modules" << endl; + cout << "\tstart_pulse_id: Start pulse_id of retrieval." << endl; + cout << "\tstop_pulse_id: Stop pulse_id of retrieval." << endl; + cout << "\tpulse_id_step: 1==100Hz, 2==50hz, 4==25Hz.." << endl; + cout << endl; + + exit(-1); + } + + string output_file = string(argv[1]); + const string device = string(argv[2]); + size_t n_modules = atoi(argv[3]); + uint64_t start_pulse_id = (uint64_t) atoll(argv[4]); + uint64_t stop_pulse_id = (uint64_t) atoll(argv[5]); + int pulse_id_step = atoi(argv[6]); + + // Align start (up) and stop(down) pulse_id with pulse_id_step. + if (start_pulse_id % pulse_id_step != 0) { + start_pulse_id += pulse_id_step - (start_pulse_id % pulse_id_step); + } + if (stop_pulse_id % pulse_id_step != 0) { + stop_pulse_id -= (start_pulse_id % pulse_id_step); + } + + uint64_t start_block = start_pulse_id / BUFFER_BLOCK_SIZE; + uint64_t stop_block = stop_pulse_id / BUFFER_BLOCK_SIZE; + + // Generate list of buffer blocks that need to be loaded. + std::vector buffer_blocks; + for (uint64_t i_block=start_block; i_block <= stop_block; i_block++) { + buffer_blocks.push_back(i_block); + } + + ImageAssembler image_assembler(n_modules); + + std::vector reading_threads(n_modules); + for (size_t i_module=0; i_module( + end_time-start_time).count(); + + image_assembler.free_slot(block_id); + + cout << "sf_writer:avg_write_us "; + cout << write_us_duration / BUFFER_BLOCK_SIZE << endl; + } + + for (auto& reading_thread : reading_threads) { + if (reading_thread.joinable()) { + reading_thread.join(); + } + } + + return 0; +} diff --git a/jf-live-writer/test/CMakeLists.txt b/jf-live-writer/test/CMakeLists.txt new file mode 100644 index 0000000..1079fc2 --- /dev/null +++ b/jf-live-writer/test/CMakeLists.txt @@ -0,0 +1,10 @@ +add_executable(jf-live-writer-tests main.cpp) + +target_link_libraries(jf-live-writer-tests + jf-live-writer-lib + hdf5 + hdf5_hl + hdf5_cpp + zmq + gtest + ) diff --git a/jf-live-writer/test/main.cpp b/jf-live-writer/test/main.cpp new file mode 100644 index 0000000..e819294 --- /dev/null +++ b/jf-live-writer/test/main.cpp @@ -0,0 +1,8 @@ +#include "gtest/gtest.h" + +using namespace std; + +int main(int argc, char **argv) { + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +}