added multithreaded cluster finder test
All checks were successful
Build on RHEL9 / buildh (push) Successful in 2m20s

This commit is contained in:
mazzol_a 2025-04-17 17:09:53 +02:00
parent c49a2fdf8e
commit 177459c98a
4 changed files with 109 additions and 2 deletions

View File

@ -275,7 +275,7 @@ else()
if(CMAKE_BUILD_TYPE STREQUAL "Release")
message(STATUS "Release build")
target_compile_options(aare_compiler_flags INTERFACE -O3)
target_compile_options(aare_compiler_flags INTERFACE -O3 -g)
else()
message(STATUS "Debug build")
endif()
@ -426,6 +426,7 @@ if(AARE_TESTS)
${CMAKE_CURRENT_SOURCE_DIR}/src/Cluster.test.cpp
${CMAKE_CURRENT_SOURCE_DIR}/src/CalculateEta.test.cpp
${CMAKE_CURRENT_SOURCE_DIR}/src/ClusterFile.test.cpp
${CMAKE_CURRENT_SOURCE_DIR}/src/ClusterFinderMT.test.cpp
${CMAKE_CURRENT_SOURCE_DIR}/src/Pedestal.test.cpp
${CMAKE_CURRENT_SOURCE_DIR}/src/JungfrauDataFile.test.cpp
${CMAKE_CURRENT_SOURCE_DIR}/src/NumpyFile.test.cpp

View File

@ -37,7 +37,11 @@ class ClusterCollector {
public:
ClusterCollector(ClusterFinderMT<ClusterType, uint16_t, double> *source) {
m_source = source->sink();
m_thread = std::thread(&ClusterCollector::process, this);
m_thread =
std::thread(&ClusterCollector::process,
this); // only one process does that so why isnt it
// automatically written to m_cluster in collect
// - instead of writing first to m_sink?
}
void stop() {
m_stop_requested = true;

View File

@ -34,6 +34,7 @@ template <typename ClusterType = Cluster<int32_t, 3, 3>,
typename FRAME_TYPE = uint16_t, typename PEDESTAL_TYPE = double>
class ClusterFinderMT {
protected:
using CT = typename ClusterType::value_type;
size_t m_current_thread{0};
size_t m_n_threads{0};
@ -50,6 +51,7 @@ class ClusterFinderMT {
std::thread m_collect_thread;
std::chrono::milliseconds m_default_wait{1};
private:
std::atomic<bool> m_stop_requested{false};
std::atomic<bool> m_processing_threads_stopped{true};
@ -120,6 +122,7 @@ class ClusterFinderMT {
ClusterFinderMT(Shape<2> image_size, PEDESTAL_TYPE nSigma = 5.0,
size_t capacity = 2000, size_t n_threads = 3)
: m_n_threads(n_threads) {
for (size_t i = 0; i < n_threads; i++) {
m_cluster_finders.push_back(
std::make_unique<

View File

@ -0,0 +1,99 @@
#include "aare/ClusterFinderMT.hpp"
#include "aare/Cluster.hpp"
#include "aare/ClusterCollector.hpp"
#include "aare/File.hpp"
#include "test_config.hpp"
#include <catch2/catch_test_macros.hpp>
#include <filesystem>
#include <memory>
using namespace aare;
// wrapper function to access private member variables for testing
template <typename ClusterType, typename FRAME_TYPE = uint16_t,
typename PEDESTAL_TYPE = double>
class ClusterFinderMTWrapper
: public ClusterFinderMT<ClusterType, FRAME_TYPE, PEDESTAL_TYPE> {
public:
ClusterFinderMTWrapper(Shape<2> image_size, PEDESTAL_TYPE nSigma = 5.0,
size_t capacity = 2000, size_t n_threads = 3)
: ClusterFinderMT<ClusterType, FRAME_TYPE, PEDESTAL_TYPE>(
image_size, nSigma, capacity, n_threads) {}
size_t get_m_input_queues_size() const {
return this->m_input_queues.size();
}
size_t get_m_output_queues_size() const {
return this->m_output_queues.size();
}
size_t get_m_cluster_finders_size() const {
return this->m_cluster_finders.size();
}
bool m_output_queues_are_empty() const {
for (auto &queue : this->m_output_queues) {
if (!queue->isEmpty())
return false;
}
return true;
}
bool m_input_queues_are_empty() const {
for (auto &queue : this->m_input_queues) {
if (!queue->isEmpty())
return false;
}
return true;
}
bool m_sink_is_empty() const { return this->m_sink.isEmpty(); }
size_t m_sink_size() const { return this->m_sink.sizeGuess(); }
};
TEST_CASE("multithreaded cluster finder", "[.files][.ClusterFinder]") {
auto fpath = "/mnt/sls_det_storage/matterhorn_data/aare_test_data/"
"Moench03new/cu_half_speed_master_4.json";
File file(fpath);
size_t n_threads = 2;
size_t n_frames_pd = 10;
using ClusterType = Cluster<int32_t, 3, 3>;
ClusterFinderMTWrapper<ClusterType> cf(
{static_cast<int64_t>(file.rows()), static_cast<int64_t>(file.cols())},
5, 2000, n_threads); // no idea what frame type is!!! default uint16_t
CHECK(cf.get_m_input_queues_size() == n_threads);
CHECK(cf.get_m_output_queues_size() == n_threads);
CHECK(cf.get_m_cluster_finders_size() == n_threads);
CHECK(cf.m_output_queues_are_empty() == true);
CHECK(cf.m_input_queues_are_empty() == true);
for (size_t i = 0; i < n_frames_pd; ++i) {
cf.find_clusters(file.read_frame().view<uint16_t>());
}
cf.stop();
CHECK(cf.m_output_queues_are_empty() == true);
CHECK(cf.m_input_queues_are_empty() == true);
CHECK(cf.m_sink_size() == n_frames_pd);
ClusterCollector<ClusterType> clustercollector(&cf);
clustercollector.stop();
CHECK(cf.m_sink_size() == 0);
auto clustervec = clustercollector.steal_clusters();
// CHECK(clustervec.size() == ) //dont know how many clusters to expect
}