refactored other cpp files

This commit is contained in:
Mazzoleni Alice Francesca 2025-04-02 16:00:46 +02:00
parent 61af1105a1
commit 98d2d6098e
2 changed files with 61 additions and 53 deletions

View File

@ -2,29 +2,31 @@
#include <atomic> #include <atomic>
#include <thread> #include <thread>
#include "aare/ProducerConsumerQueue.hpp"
#include "aare/ClusterVector.hpp"
#include "aare/ClusterFinderMT.hpp" #include "aare/ClusterFinderMT.hpp"
#include "aare/ClusterVector.hpp"
#include "aare/ProducerConsumerQueue.hpp"
namespace aare { namespace aare {
class ClusterCollector{ template <typename ClusterType,
ProducerConsumerQueue<ClusterVector<int>>* m_source; typename = std::enable_if_t<is_cluster_v<ClusterType>>>
std::atomic<bool> m_stop_requested{false}; class ClusterCollector {
std::atomic<bool> m_stopped{true}; ProducerConsumerQueue<ClusterVector<ClusterType>> *m_source;
std::chrono::milliseconds m_default_wait{1}; std::atomic<bool> m_stop_requested{false};
std::thread m_thread; std::atomic<bool> m_stopped{true};
std::vector<ClusterVector<int>> m_clusters; std::chrono::milliseconds m_default_wait{1};
std::thread m_thread;
std::vector<ClusterVector<ClusterType>> m_clusters;
void process(){ void process() {
m_stopped = false; m_stopped = false;
fmt::print("ClusterCollector started\n"); fmt::print("ClusterCollector started\n");
while (!m_stop_requested || !m_source->isEmpty()) { while (!m_stop_requested || !m_source->isEmpty()) {
if (ClusterVector<int> *clusters = m_source->frontPtr(); if (ClusterVector<ClusterType> *clusters = m_source->frontPtr();
clusters != nullptr) { clusters != nullptr) {
m_clusters.push_back(std::move(*clusters)); m_clusters.push_back(std::move(*clusters));
m_source->popFront(); m_source->popFront();
}else{ } else {
std::this_thread::sleep_for(m_default_wait); std::this_thread::sleep_for(m_default_wait);
} }
} }
@ -32,21 +34,21 @@ class ClusterCollector{
m_stopped = true; m_stopped = true;
} }
public: public:
ClusterCollector(ClusterFinderMT<uint16_t, double, int32_t>* source){ ClusterCollector(ClusterFinderMT<uint16_t, double, int32_t> *source) {
m_source = source->sink(); m_source = source->sink();
m_thread = std::thread(&ClusterCollector::process, this); m_thread = std::thread(&ClusterCollector::process, this);
} }
void stop(){ void stop() {
m_stop_requested = true; m_stop_requested = true;
m_thread.join(); m_thread.join();
} }
std::vector<ClusterVector<int>> steal_clusters(){ std::vector<ClusterVector<ClusterType>> steal_clusters() {
if(!m_stopped){ if (!m_stopped) {
throw std::runtime_error("ClusterCollector is still running"); throw std::runtime_error("ClusterCollector is still running");
}
return std::move(m_clusters);
} }
return std::move(m_clusters);
}
}; };
} // namespace aare } // namespace aare

View File

@ -3,35 +3,41 @@
#include <filesystem> #include <filesystem>
#include <thread> #include <thread>
#include "aare/ProducerConsumerQueue.hpp"
#include "aare/ClusterVector.hpp"
#include "aare/ClusterFinderMT.hpp" #include "aare/ClusterFinderMT.hpp"
#include "aare/ClusterVector.hpp"
#include "aare/ProducerConsumerQueue.hpp"
namespace aare{ namespace aare {
class ClusterFileSink{ template <typename ClusterType,
ProducerConsumerQueue<ClusterVector<int>>* m_source; typename = std::enable_if_t <
is_cluster_v<ClusterType> class ClusterFileSink {
ProducerConsumerQueue<ClusterVector<ClusterType>> *m_source;
std::atomic<bool> m_stop_requested{false}; std::atomic<bool> m_stop_requested{false};
std::atomic<bool> m_stopped{true}; std::atomic<bool> m_stopped{true};
std::chrono::milliseconds m_default_wait{1}; std::chrono::milliseconds m_default_wait{1};
std::thread m_thread; std::thread m_thread;
std::ofstream m_file; std::ofstream m_file;
void process() {
void process(){
m_stopped = false; m_stopped = false;
fmt::print("ClusterFileSink started\n"); fmt::print("ClusterFileSink started\n");
while (!m_stop_requested || !m_source->isEmpty()) { while (!m_stop_requested || !m_source->isEmpty()) {
if (ClusterVector<int> *clusters = m_source->frontPtr(); if (ClusterVector<ClusterType> *clusters = m_source->frontPtr();
clusters != nullptr) { clusters != nullptr) {
// Write clusters to file // Write clusters to file
int32_t frame_number = clusters->frame_number(); //TODO! Should we store frame number already as int? int32_t frame_number =
clusters->frame_number(); // TODO! Should we store frame
// number already as int?
uint32_t num_clusters = clusters->size(); uint32_t num_clusters = clusters->size();
m_file.write(reinterpret_cast<const char*>(&frame_number), sizeof(frame_number)); m_file.write(reinterpret_cast<const char *>(&frame_number),
m_file.write(reinterpret_cast<const char*>(&num_clusters), sizeof(num_clusters)); sizeof(frame_number));
m_file.write(reinterpret_cast<const char*>(clusters->data()), clusters->size() * clusters->item_size()); m_file.write(reinterpret_cast<const char *>(&num_clusters),
sizeof(num_clusters));
m_file.write(reinterpret_cast<const char *>(clusters->data()),
clusters->size() * clusters->item_size());
m_source->popFront(); m_source->popFront();
}else{ } else {
std::this_thread::sleep_for(m_default_wait); std::this_thread::sleep_for(m_default_wait);
} }
} }
@ -39,18 +45,18 @@ class ClusterFileSink{
m_stopped = true; m_stopped = true;
} }
public: public:
ClusterFileSink(ClusterFinderMT<uint16_t, double, int32_t>* source, const std::filesystem::path& fname){ ClusterFileSink(ClusterFinderMT<uint16_t, double, int32_t> *source,
m_source = source->sink(); const std::filesystem::path &fname) {
m_thread = std::thread(&ClusterFileSink::process, this); m_source = source->sink();
m_file.open(fname, std::ios::binary); m_thread = std::thread(&ClusterFileSink::process, this);
} m_file.open(fname, std::ios::binary);
void stop(){ }
m_stop_requested = true; void stop() {
m_thread.join(); m_stop_requested = true;
m_file.close(); m_thread.join();
} m_file.close();
}
}; };
} // namespace aare } // namespace aare