diff --git a/CMakeLists.txt b/CMakeLists.txt index b57f05f..3c0d03a 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -275,7 +275,7 @@ else() if(CMAKE_BUILD_TYPE STREQUAL "Release") message(STATUS "Release build") - target_compile_options(aare_compiler_flags INTERFACE -O3) + target_compile_options(aare_compiler_flags INTERFACE -O3 -g) else() message(STATUS "Debug build") endif() @@ -426,6 +426,7 @@ if(AARE_TESTS) ${CMAKE_CURRENT_SOURCE_DIR}/src/Cluster.test.cpp ${CMAKE_CURRENT_SOURCE_DIR}/src/CalculateEta.test.cpp ${CMAKE_CURRENT_SOURCE_DIR}/src/ClusterFile.test.cpp + ${CMAKE_CURRENT_SOURCE_DIR}/src/ClusterFinderMT.test.cpp ${CMAKE_CURRENT_SOURCE_DIR}/src/Pedestal.test.cpp ${CMAKE_CURRENT_SOURCE_DIR}/src/JungfrauDataFile.test.cpp ${CMAKE_CURRENT_SOURCE_DIR}/src/NumpyFile.test.cpp diff --git a/include/aare/ClusterCollector.hpp b/include/aare/ClusterCollector.hpp index cb49f58..ae78a8e 100644 --- a/include/aare/ClusterCollector.hpp +++ b/include/aare/ClusterCollector.hpp @@ -37,7 +37,11 @@ class ClusterCollector { public: ClusterCollector(ClusterFinderMT *source) { m_source = source->sink(); - m_thread = std::thread(&ClusterCollector::process, this); + m_thread = + std::thread(&ClusterCollector::process, + this); // only one process does that so why isnt it + // automatically written to m_cluster in collect + // - instead of writing first to m_sink? } void stop() { m_stop_requested = true; diff --git a/include/aare/ClusterFinderMT.hpp b/include/aare/ClusterFinderMT.hpp index 29fc715..2dfb279 100644 --- a/include/aare/ClusterFinderMT.hpp +++ b/include/aare/ClusterFinderMT.hpp @@ -34,6 +34,7 @@ template , typename FRAME_TYPE = uint16_t, typename PEDESTAL_TYPE = double> class ClusterFinderMT { + protected: using CT = typename ClusterType::value_type; size_t m_current_thread{0}; size_t m_n_threads{0}; @@ -50,6 +51,7 @@ class ClusterFinderMT { std::thread m_collect_thread; std::chrono::milliseconds m_default_wait{1}; + private: std::atomic m_stop_requested{false}; std::atomic m_processing_threads_stopped{true}; @@ -120,6 +122,7 @@ class ClusterFinderMT { ClusterFinderMT(Shape<2> image_size, PEDESTAL_TYPE nSigma = 5.0, size_t capacity = 2000, size_t n_threads = 3) : m_n_threads(n_threads) { + for (size_t i = 0; i < n_threads; i++) { m_cluster_finders.push_back( std::make_unique< diff --git a/src/ClusterFinderMT.test.cpp b/src/ClusterFinderMT.test.cpp new file mode 100644 index 0000000..9289592 --- /dev/null +++ b/src/ClusterFinderMT.test.cpp @@ -0,0 +1,99 @@ + +#include "aare/ClusterFinderMT.hpp" +#include "aare/Cluster.hpp" +#include "aare/ClusterCollector.hpp" +#include "aare/File.hpp" + +#include "test_config.hpp" + +#include +#include +#include + +using namespace aare; + +// wrapper function to access private member variables for testing +template +class ClusterFinderMTWrapper + : public ClusterFinderMT { + + public: + ClusterFinderMTWrapper(Shape<2> image_size, PEDESTAL_TYPE nSigma = 5.0, + size_t capacity = 2000, size_t n_threads = 3) + : ClusterFinderMT( + image_size, nSigma, capacity, n_threads) {} + + size_t get_m_input_queues_size() const { + return this->m_input_queues.size(); + } + + size_t get_m_output_queues_size() const { + return this->m_output_queues.size(); + } + + size_t get_m_cluster_finders_size() const { + return this->m_cluster_finders.size(); + } + + bool m_output_queues_are_empty() const { + for (auto &queue : this->m_output_queues) { + if (!queue->isEmpty()) + return false; + } + return true; + } + + bool m_input_queues_are_empty() const { + for (auto &queue : this->m_input_queues) { + if (!queue->isEmpty()) + return false; + } + return true; + } + + bool m_sink_is_empty() const { return this->m_sink.isEmpty(); } + + size_t m_sink_size() const { return this->m_sink.sizeGuess(); } +}; + +TEST_CASE("multithreaded cluster finder", "[.files][.ClusterFinder]") { + auto fpath = "/mnt/sls_det_storage/matterhorn_data/aare_test_data/" + "Moench03new/cu_half_speed_master_4.json"; + + File file(fpath); + + size_t n_threads = 2; + size_t n_frames_pd = 10; + + using ClusterType = Cluster; + + ClusterFinderMTWrapper cf( + {static_cast(file.rows()), static_cast(file.cols())}, + 5, 2000, n_threads); // no idea what frame type is!!! default uint16_t + + CHECK(cf.get_m_input_queues_size() == n_threads); + CHECK(cf.get_m_output_queues_size() == n_threads); + CHECK(cf.get_m_cluster_finders_size() == n_threads); + CHECK(cf.m_output_queues_are_empty() == true); + CHECK(cf.m_input_queues_are_empty() == true); + + for (size_t i = 0; i < n_frames_pd; ++i) { + cf.find_clusters(file.read_frame().view()); + } + + cf.stop(); + + CHECK(cf.m_output_queues_are_empty() == true); + CHECK(cf.m_input_queues_are_empty() == true); + + CHECK(cf.m_sink_size() == n_frames_pd); + ClusterCollector clustercollector(&cf); + + clustercollector.stop(); + + CHECK(cf.m_sink_size() == 0); + + auto clustervec = clustercollector.steal_clusters(); + // CHECK(clustervec.size() == ) //dont know how many clusters to expect +}