mirror of
https://github.com/slsdetectorgroup/aare.git
synced 2026-01-07 04:34:22 +01:00
Compare commits
2 Commits
2025.8.22
...
dev/highz/
| Author | SHA1 | Date | |
|---|---|---|---|
| 055feb2290 | |||
|
|
c035491b63 |
12
RELEASE.md
12
RELEASE.md
@@ -1,22 +1,16 @@
|
||||
# Release notes
|
||||
|
||||
|
||||
### 2025.8.22
|
||||
### head
|
||||
|
||||
Features:
|
||||
|
||||
- Apply calibration works in G0 if passes a 2D calibration and pedestal
|
||||
- count pixels that switch
|
||||
- calculate pedestal (also g0 version)
|
||||
- NDArray::view() needs an lvalue to reduce issues with the view outliving the array
|
||||
|
||||
|
||||
Bugfixes:
|
||||
|
||||
- Now using glibc 2.17 in conda builds (was using the host)
|
||||
- Fixed shifted pixels in clusters close to the edge of a frame
|
||||
|
||||
### 2025.7.18
|
||||
### 2025.07.18
|
||||
|
||||
Features:
|
||||
|
||||
@@ -30,7 +24,7 @@ Bugfixes:
|
||||
- Removed unused file: ClusterFile.cpp
|
||||
|
||||
|
||||
### 2025.5.22
|
||||
### 2025.05.22
|
||||
|
||||
Features:
|
||||
|
||||
|
||||
@@ -3,14 +3,3 @@ python:
|
||||
- 3.12
|
||||
- 3.13
|
||||
|
||||
c_compiler:
|
||||
- gcc # [linux]
|
||||
|
||||
c_stdlib:
|
||||
- sysroot # [linux]
|
||||
|
||||
cxx_compiler:
|
||||
- gxx # [linux]
|
||||
|
||||
c_stdlib_version: # [linux]
|
||||
- 2.17 # [linux]
|
||||
|
||||
@@ -16,8 +16,6 @@ build:
|
||||
|
||||
requirements:
|
||||
build:
|
||||
- {{ compiler('c') }}
|
||||
- {{ stdlib("c") }}
|
||||
- {{ compiler('cxx') }}
|
||||
- cmake
|
||||
- ninja
|
||||
|
||||
59
include/aare/BlockingQueue.hpp
Normal file
59
include/aare/BlockingQueue.hpp
Normal file
@@ -0,0 +1,59 @@
|
||||
#pragma once
|
||||
#include <atomic>
|
||||
#include <cstdint>
|
||||
#include <memory>
|
||||
#include <thread>
|
||||
#include <vector>
|
||||
#include <condition_variable>
|
||||
#include <mutex>
|
||||
#include <deque>
|
||||
|
||||
#include "aare/ClusterFinder.hpp"
|
||||
#include "aare/NDArray.hpp"
|
||||
#include "aare/logger.hpp"
|
||||
|
||||
template <typename T>
|
||||
class BlockingQueue {
|
||||
std::mutex mtx;
|
||||
std::condition_variable cv_push, cv_pop;
|
||||
std::deque<T> queue;
|
||||
size_t max_size;
|
||||
bool closed = false;
|
||||
|
||||
public:
|
||||
BlockingQueue(size_t capacity) : max_size(capacity) {}
|
||||
|
||||
void push(T item) {
|
||||
std::unique_lock lock(mtx);
|
||||
cv_push.wait(lock, [this] { return queue.size() < max_size; });
|
||||
queue.push_back(std::move(item));
|
||||
cv_pop.notify_one();
|
||||
}
|
||||
|
||||
T pop() {
|
||||
std::unique_lock lock(mtx);
|
||||
cv_pop.wait(lock, [this] { return !queue.empty(); });
|
||||
T item = std::move(queue.front());
|
||||
queue.pop_front();
|
||||
cv_push.notify_one();
|
||||
return item;
|
||||
}
|
||||
|
||||
void close() {
|
||||
std::lock_guard lock(mtx);
|
||||
closed = true;
|
||||
cv_pop.notify_all();
|
||||
cv_push.notify_all();
|
||||
}
|
||||
|
||||
bool empty() {
|
||||
std::lock_guard lock(mtx);
|
||||
return queue.empty();
|
||||
}
|
||||
|
||||
|
||||
void write(T item) {push(item);}
|
||||
bool isEmpty() {return empty();}
|
||||
T frontPtr() {return pop();}
|
||||
T popFront() {return pop();}
|
||||
};
|
||||
@@ -5,13 +5,15 @@
|
||||
#include "aare/ClusterFinderMT.hpp"
|
||||
#include "aare/ClusterVector.hpp"
|
||||
#include "aare/ProducerConsumerQueue.hpp"
|
||||
#include "aare/BlockingQueue.hpp"
|
||||
|
||||
namespace aare {
|
||||
|
||||
template <typename ClusterType,
|
||||
typename = std::enable_if_t<is_cluster_v<ClusterType>>>
|
||||
class ClusterCollector {
|
||||
ProducerConsumerQueue<ClusterVector<ClusterType>> *m_source;
|
||||
// ProducerConsumerQueue<ClusterVector<ClusterType>> *m_source;
|
||||
BlockingQueue<ClusterVector<ClusterType>> *m_source;
|
||||
std::atomic<bool> m_stop_requested{false};
|
||||
std::atomic<bool> m_stopped{true};
|
||||
std::chrono::milliseconds m_default_wait{1};
|
||||
@@ -19,19 +21,47 @@ class ClusterCollector {
|
||||
std::vector<ClusterVector<ClusterType>> m_clusters;
|
||||
|
||||
void process() {
|
||||
// m_stopped = false;
|
||||
// fmt::print("ClusterCollector started\n");
|
||||
// // while (!m_stop_requested || !m_source->isEmpty()) {
|
||||
// while (true) {
|
||||
// if (clusters.frame_number() == -1)
|
||||
// break;
|
||||
|
||||
// ClusterVector<ClusterType> clusters = m_source->pop();
|
||||
// m_clusters.push_back(std::move(clusters));
|
||||
// // if (ClusterVector<ClusterType> *clusters = m_source->frontPtr();
|
||||
// // clusters != nullptr) {
|
||||
// // m_clusters.push_back(std::move(*clusters));
|
||||
// // m_source->popFront();
|
||||
// // } else {
|
||||
// // std::this_thread::sleep_for(m_default_wait);
|
||||
// // }
|
||||
// }
|
||||
// fmt::print("ClusterCollector stopped\n");
|
||||
// m_stopped = true;
|
||||
|
||||
|
||||
m_stopped = false;
|
||||
fmt::print("ClusterCollector started\n");
|
||||
while (!m_stop_requested || !m_source->isEmpty()) {
|
||||
if (ClusterVector<ClusterType> *clusters = m_source->frontPtr();
|
||||
clusters != nullptr) {
|
||||
m_clusters.push_back(std::move(*clusters));
|
||||
m_source->popFront();
|
||||
} else {
|
||||
std::this_thread::sleep_for(m_default_wait);
|
||||
|
||||
while (true) {
|
||||
// pop blocks until there is data
|
||||
ClusterVector<ClusterType> clusters = m_source->pop();
|
||||
|
||||
// POISON DETECTION
|
||||
if (clusters.frame_number() == -1) {
|
||||
fmt::print("ClusterCollector received poison frame, stopping\n");
|
||||
break; // exit loop cleanly
|
||||
}
|
||||
|
||||
// NORMAL DATA: store or process
|
||||
m_clusters.push_back(std::move(clusters));
|
||||
}
|
||||
|
||||
fmt::print("ClusterCollector stopped\n");
|
||||
m_stopped = true;
|
||||
|
||||
}
|
||||
|
||||
public:
|
||||
|
||||
@@ -6,13 +6,15 @@
|
||||
#include "aare/ClusterFinderMT.hpp"
|
||||
#include "aare/ClusterVector.hpp"
|
||||
#include "aare/ProducerConsumerQueue.hpp"
|
||||
#include "aare/BlockingQueue.hpp"
|
||||
|
||||
namespace aare {
|
||||
|
||||
template <typename ClusterType,
|
||||
typename = std::enable_if_t<is_cluster_v<ClusterType>>>
|
||||
class ClusterFileSink {
|
||||
ProducerConsumerQueue<ClusterVector<ClusterType>> *m_source;
|
||||
// ProducerConsumerQueue<ClusterVector<ClusterType>> *m_source;
|
||||
BlockingQueue<ClusterVector<ClusterType>> *m_source;
|
||||
std::atomic<bool> m_stop_requested{false};
|
||||
std::atomic<bool> m_stopped{true};
|
||||
std::chrono::milliseconds m_default_wait{1};
|
||||
@@ -20,29 +22,60 @@ class ClusterFileSink {
|
||||
std::ofstream m_file;
|
||||
|
||||
void process() {
|
||||
m_stopped = false;
|
||||
// m_stopped = false;
|
||||
// LOG(logDEBUG) << "ClusterFileSink started";
|
||||
// while (!m_stop_requested || !m_source->isEmpty()) {
|
||||
// // if (ClusterVector<ClusterType> *clusters = m_source->pop(); m_source->frontPtr();
|
||||
// // clusters != nullptr) {
|
||||
// {
|
||||
// ClusterVector<ClusterType> clusters = m_source->pop();
|
||||
// // Write clusters to file
|
||||
// int32_t frame_number =
|
||||
// clusters->frame_number(); // TODO! Should we store frame
|
||||
// // number already as int?
|
||||
// uint32_t num_clusters = clusters.size();
|
||||
|
||||
// if (frame_number >= 9910 && frame_number <= 9930)
|
||||
// std::cout << "prcoess: frame_number = " << frame_number << std::endl;
|
||||
|
||||
// m_file.write(reinterpret_cast<const char *>(&frame_number),
|
||||
// sizeof(frame_number));
|
||||
// m_file.write(reinterpret_cast<const char *>(&num_clusters),
|
||||
// sizeof(num_clusters));
|
||||
// m_file.write(reinterpret_cast<const char *>(clusters.data()),
|
||||
// clusters.size() * clusters.item_size());
|
||||
// m_source->popFront();
|
||||
// }
|
||||
// // else {
|
||||
// // std::this_thread::sleep_for(m_default_wait);
|
||||
// // }
|
||||
// }
|
||||
// LOG(logDEBUG) << "ClusterFileSink stopped";
|
||||
// m_stopped = true;
|
||||
|
||||
LOG(logDEBUG) << "ClusterFileSink started";
|
||||
while (!m_stop_requested || !m_source->isEmpty()) {
|
||||
if (ClusterVector<ClusterType> *clusters = m_source->frontPtr();
|
||||
clusters != nullptr) {
|
||||
// Write clusters to file
|
||||
int32_t frame_number =
|
||||
clusters->frame_number(); // TODO! Should we store frame
|
||||
// number already as int?
|
||||
uint32_t num_clusters = clusters->size();
|
||||
m_file.write(reinterpret_cast<const char *>(&frame_number),
|
||||
sizeof(frame_number));
|
||||
m_file.write(reinterpret_cast<const char *>(&num_clusters),
|
||||
sizeof(num_clusters));
|
||||
m_file.write(reinterpret_cast<const char *>(clusters->data()),
|
||||
clusters->size() * clusters->item_size());
|
||||
m_source->popFront();
|
||||
} else {
|
||||
std::this_thread::sleep_for(m_default_wait);
|
||||
|
||||
while (true) {
|
||||
ClusterVector<ClusterType> clusters = m_source->pop(); // blocks
|
||||
|
||||
// POISON PILL CHECK
|
||||
if (clusters.frame_number() == -1) {
|
||||
LOG(logDEBUG) << "ClusterFileSink received poison pill";
|
||||
break;
|
||||
}
|
||||
|
||||
int32_t frame_number = clusters.frame_number();
|
||||
uint32_t num_clusters = clusters.size();
|
||||
|
||||
m_file.write(reinterpret_cast<const char*>(&frame_number),
|
||||
sizeof(frame_number));
|
||||
m_file.write(reinterpret_cast<const char*>(&num_clusters),
|
||||
sizeof(num_clusters));
|
||||
m_file.write(reinterpret_cast<const char*>(clusters.data()),
|
||||
clusters.size() * clusters.item_size());
|
||||
}
|
||||
|
||||
LOG(logDEBUG) << "ClusterFileSink stopped";
|
||||
m_stopped = true;
|
||||
}
|
||||
|
||||
public:
|
||||
|
||||
@@ -19,9 +19,11 @@ class ClusterFinder {
|
||||
const PEDESTAL_TYPE c3;
|
||||
Pedestal<PEDESTAL_TYPE> m_pedestal;
|
||||
ClusterVector<ClusterType> m_clusters;
|
||||
const uint32_t ClusterSizeX;
|
||||
const uint32_t ClusterSizeY;
|
||||
|
||||
static const uint8_t ClusterSizeX = ClusterType::cluster_size_x;
|
||||
static const uint8_t ClusterSizeY = ClusterType::cluster_size_y;
|
||||
static const uint8_t SavedClusterSizeX = ClusterType::cluster_size_x;
|
||||
static const uint8_t SavedClusterSizeY = ClusterType::cluster_size_y;
|
||||
using CT = typename ClusterType::value_type;
|
||||
|
||||
public:
|
||||
@@ -34,10 +36,12 @@ class ClusterFinder {
|
||||
*
|
||||
*/
|
||||
ClusterFinder(Shape<2> image_size, PEDESTAL_TYPE nSigma = 5.0,
|
||||
size_t capacity = 1000000)
|
||||
size_t capacity = 1000000,
|
||||
uint32_t cluster_size_x = 3, uint32_t cluster_size_y = 3)
|
||||
: m_image_size(image_size), m_nSigma(nSigma),
|
||||
c2(sqrt((ClusterSizeY + 1) / 2 * (ClusterSizeX + 1) / 2)),
|
||||
c3(sqrt(ClusterSizeX * ClusterSizeY)),
|
||||
c2(sqrt((cluster_size_y + 1) / 2 * (cluster_size_x + 1) / 2)),
|
||||
c3(sqrt(cluster_size_x * cluster_size_y)),
|
||||
ClusterSizeX(cluster_size_x), ClusterSizeY(cluster_size_y),
|
||||
m_pedestal(image_size[0], image_size[1]), m_clusters(capacity) {
|
||||
LOG(logDEBUG) << "ClusterFinder: "
|
||||
<< "image_size: " << image_size[0] << "x" << image_size[1]
|
||||
@@ -74,12 +78,20 @@ class ClusterFinder {
|
||||
// // 4,4 -> +/- 2
|
||||
int dy = ClusterSizeY / 2;
|
||||
int dx = ClusterSizeX / 2;
|
||||
int dy2 = SavedClusterSizeY / 2;
|
||||
int dx2 = SavedClusterSizeX / 2;
|
||||
|
||||
int has_center_pixel_x =
|
||||
ClusterSizeX %
|
||||
2; // for even sized clusters there is no proper cluster center and
|
||||
// even amount of pixels around the center
|
||||
int has_center_pixel_y = ClusterSizeY % 2;
|
||||
|
||||
// if (frame_number >= 8000 && frame_number <= 9000)
|
||||
// // std::cout << "find_clusters: frame_number = " << frame_number << std::endl;
|
||||
// std::cout << frame_number << std::endl;
|
||||
|
||||
|
||||
m_clusters.set_frame_number(frame_number);
|
||||
for (int iy = 0; iy < frame.shape(0); iy++) {
|
||||
for (int ix = 0; ix < frame.shape(1); ix++) {
|
||||
@@ -135,16 +147,14 @@ class ClusterFinder {
|
||||
// It's worth redoing the look since most of the time we
|
||||
// don't have a photon
|
||||
int i = 0;
|
||||
for (int ir = -dy; ir < dy + has_center_pixel_y; ir++) {
|
||||
for (int ic = -dx; ic < dx + has_center_pixel_y; ic++) {
|
||||
for (int ir = -dy2; ir < dy2 + has_center_pixel_y; ir++) {
|
||||
for (int ic = -dx2; ic < dx2 + has_center_pixel_y; ic++) {
|
||||
if (ix + ic >= 0 && ix + ic < frame.shape(1) &&
|
||||
iy + ir >= 0 && iy + ir < frame.shape(0)) {
|
||||
CT tmp =
|
||||
static_cast<CT>(frame(iy + ir, ix + ic)) -
|
||||
static_cast<CT>(
|
||||
m_pedestal.mean(iy + ir, ix + ic));
|
||||
cluster.data[i] =
|
||||
tmp; // Watch for out of bounds access
|
||||
|
||||
CT tmp = static_cast<CT>(frame(iy + ir, ix + ic)) - static_cast<CT>(m_pedestal.mean(iy + ir, ix + ic));
|
||||
cluster.data[i] = tmp; // Watch for out of bounds access
|
||||
|
||||
}
|
||||
i++;
|
||||
}
|
||||
|
||||
@@ -4,10 +4,14 @@
|
||||
#include <memory>
|
||||
#include <thread>
|
||||
#include <vector>
|
||||
#include <condition_variable>
|
||||
#include <mutex>
|
||||
#include <deque>
|
||||
|
||||
#include "aare/ClusterFinder.hpp"
|
||||
#include "aare/NDArray.hpp"
|
||||
#include "aare/ProducerConsumerQueue.hpp"
|
||||
#include "aare/BlockingQueue.hpp"
|
||||
#include "aare/logger.hpp"
|
||||
|
||||
namespace aare {
|
||||
@@ -23,6 +27,14 @@ struct FrameWrapper {
|
||||
NDArray<uint16_t, 2> data;
|
||||
};
|
||||
|
||||
static FrameWrapper make_poison_frame() {
|
||||
return FrameWrapper{FrameType::DATA, UINT64_MAX, NDArray<uint16_t,2>()};
|
||||
}
|
||||
|
||||
static bool is_poison(const FrameWrapper& f) {
|
||||
return f.frame_number == UINT64_MAX;
|
||||
}
|
||||
|
||||
/**
|
||||
* @brief ClusterFinderMT is a multi-threaded version of ClusterFinder. It uses
|
||||
* a producer-consumer queue to distribute the frames to the threads. The
|
||||
@@ -37,11 +49,15 @@ class ClusterFinderMT {
|
||||
|
||||
protected:
|
||||
using CT = typename ClusterType::value_type;
|
||||
size_t m_current_thread{0};
|
||||
// size_t m_current_thread{0};
|
||||
std::atomic<size_t> m_current_thread{0};
|
||||
|
||||
size_t m_n_threads{0};
|
||||
using Finder = ClusterFinder<ClusterType, FRAME_TYPE, PEDESTAL_TYPE>;
|
||||
using InputQueue = ProducerConsumerQueue<FrameWrapper>;
|
||||
using OutputQueue = ProducerConsumerQueue<ClusterVector<ClusterType>>;
|
||||
// using InputQueue = ProducerConsumerQueue<FrameWrapper>;
|
||||
// using OutputQueue = ProducerConsumerQueue<ClusterVector<ClusterType>>;
|
||||
using InputQueue = BlockingQueue<FrameWrapper>;
|
||||
using OutputQueue = BlockingQueue<ClusterVector<ClusterType>>;
|
||||
std::vector<std::unique_ptr<InputQueue>> m_input_queues;
|
||||
std::vector<std::unique_ptr<OutputQueue>> m_output_queues;
|
||||
|
||||
@@ -50,41 +66,72 @@ class ClusterFinderMT {
|
||||
std::vector<std::unique_ptr<Finder>> m_cluster_finders;
|
||||
std::vector<std::thread> m_threads;
|
||||
std::thread m_collect_thread;
|
||||
|
||||
|
||||
std::chrono::milliseconds m_default_wait{1};
|
||||
|
||||
private:
|
||||
std::atomic<bool> m_stop_requested{false};
|
||||
std::atomic<bool> m_processing_threads_stopped{true};
|
||||
|
||||
static ClusterVector<ClusterType> make_poison_cluster() {
|
||||
ClusterVector<ClusterType> v;
|
||||
v.set_frame_number(-1);
|
||||
return v;
|
||||
}
|
||||
|
||||
|
||||
|
||||
/**
|
||||
* @brief Function called by the processing threads. It reads the frames
|
||||
* from the input queue and processes them.
|
||||
*/
|
||||
void process(int thread_id) {
|
||||
// auto cf = m_cluster_finders[thread_id].get();
|
||||
// auto q = m_input_queues[thread_id].get();
|
||||
// bool realloc_same_capacity = true;
|
||||
|
||||
// while (!m_stop_requested || !q->isEmpty()) {
|
||||
// if (FrameWrapper *frame = q->frontPtr(); frame != nullptr) {
|
||||
|
||||
// switch (frame->type) {
|
||||
// case FrameType::DATA:
|
||||
// cf->find_clusters(frame->data.view(), frame->frame_number);
|
||||
// m_output_queues[thread_id]->write(
|
||||
// cf->steal_clusters(realloc_same_capacity));
|
||||
// break;
|
||||
|
||||
// case FrameType::PEDESTAL:
|
||||
// m_cluster_finders[thread_id]->push_pedestal_frame(
|
||||
// frame->data.view());
|
||||
// break;
|
||||
// }
|
||||
|
||||
// // frame is processed now discard it
|
||||
// m_input_queues[thread_id]->popFront();
|
||||
// } else {
|
||||
// std::this_thread::sleep_for(m_default_wait);
|
||||
// }
|
||||
// }
|
||||
|
||||
auto cf = m_cluster_finders[thread_id].get();
|
||||
auto q = m_input_queues[thread_id].get();
|
||||
bool realloc_same_capacity = true;
|
||||
auto q = m_input_queues[thread_id].get();
|
||||
|
||||
while (!m_stop_requested || !q->isEmpty()) {
|
||||
if (FrameWrapper *frame = q->frontPtr(); frame != nullptr) {
|
||||
while (true) {
|
||||
FrameWrapper frame = q->pop(); // blocks
|
||||
|
||||
switch (frame->type) {
|
||||
if (is_poison(frame))
|
||||
break;
|
||||
|
||||
switch (frame.type) {
|
||||
case FrameType::DATA:
|
||||
cf->find_clusters(frame->data.view(), frame->frame_number);
|
||||
m_output_queues[thread_id]->write(
|
||||
cf->steal_clusters(realloc_same_capacity));
|
||||
cf->find_clusters(frame.data.view(), frame.frame_number);
|
||||
m_output_queues[thread_id]->push(cf->steal_clusters());
|
||||
break;
|
||||
|
||||
case FrameType::PEDESTAL:
|
||||
m_cluster_finders[thread_id]->push_pedestal_frame(
|
||||
frame->data.view());
|
||||
cf->push_pedestal_frame(frame.data.view());
|
||||
break;
|
||||
}
|
||||
|
||||
// frame is processed now discard it
|
||||
m_input_queues[thread_id]->popFront();
|
||||
} else {
|
||||
std::this_thread::sleep_for(m_default_wait);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -94,20 +141,66 @@ class ClusterFinderMT {
|
||||
* the sink
|
||||
*/
|
||||
void collect() {
|
||||
bool empty = true;
|
||||
while (!m_stop_requested || !empty || !m_processing_threads_stopped) {
|
||||
empty = true;
|
||||
for (auto &queue : m_output_queues) {
|
||||
if (!queue->isEmpty()) {
|
||||
// std::ofstream frame_log("/mnt/datapool/JMulvey/Data_Analysis/aare_testing/Read_Frame_Bug/test2.txt");
|
||||
|
||||
while (!m_sink.write(std::move(*queue->frontPtr()))) {
|
||||
std::this_thread::sleep_for(m_default_wait);
|
||||
// bool empty = true;
|
||||
// while (!m_stop_requested || !all_output_queues_empty() || !all_input_queues_empty()) {
|
||||
// // while (!m_stop_requested || !empty || !m_processing_threads_stopped) {
|
||||
// empty = true;
|
||||
// for (auto &queue : m_output_queues) {
|
||||
// if (!queue->isEmpty()) {
|
||||
|
||||
// // auto item = std::move(*queue->frontPtr()); //For Debug
|
||||
|
||||
// // while (!m_sink.write(item)) {
|
||||
// // std::this_thread::sleep_for(m_default_wait);
|
||||
// // }
|
||||
|
||||
// // frame_log << item.frame_number() << '\n'; //For Debug
|
||||
|
||||
// // queue->popFront();
|
||||
// // empty = false;
|
||||
|
||||
|
||||
// auto& item = *queue->frontPtr(); // use reference
|
||||
// while (!m_sink.write(std::move(item))) {
|
||||
// std::this_thread::sleep_for(m_default_wait);
|
||||
// }
|
||||
// frame_log << item.frame_number() << '\n'; // log frame number
|
||||
// queue->popFront();
|
||||
// empty = false;
|
||||
|
||||
// }
|
||||
// }
|
||||
// }
|
||||
|
||||
// frame_log.close();
|
||||
|
||||
|
||||
std::ofstream frame_log("/mnt/datapool/JMulvey/Data_Analysis/aare_testing/Read_Frame_Bug/test2.txt");
|
||||
|
||||
size_t poison_count = 0;
|
||||
|
||||
while (true) {
|
||||
for (auto& queue : m_output_queues) {
|
||||
auto item = queue->pop(); // BLOCKS
|
||||
|
||||
if (item.frame_number() == -1) {
|
||||
poison_count++;
|
||||
if (poison_count == m_n_threads) {
|
||||
// all workers finished
|
||||
m_sink.push(make_poison_cluster());
|
||||
return;
|
||||
}
|
||||
queue->popFront();
|
||||
empty = false;
|
||||
continue;
|
||||
}
|
||||
|
||||
m_sink.push(std::move(item));
|
||||
frame_log << item.frame_number() << '\n';
|
||||
}
|
||||
}
|
||||
|
||||
frame_log.close();
|
||||
}
|
||||
|
||||
public:
|
||||
@@ -121,7 +214,8 @@ class ClusterFinderMT {
|
||||
* @param n_threads number of threads to use
|
||||
*/
|
||||
ClusterFinderMT(Shape<2> image_size, PEDESTAL_TYPE nSigma = 5.0,
|
||||
size_t capacity = 2000, size_t n_threads = 3)
|
||||
size_t capacity = 2000, size_t n_threads = 3,
|
||||
uint32_t cluster_size_x = 3, uint32_t cluster_size_y = 3)
|
||||
: m_n_threads(n_threads) {
|
||||
|
||||
LOG(logDEBUG1) << "ClusterFinderMT: "
|
||||
@@ -134,7 +228,7 @@ class ClusterFinderMT {
|
||||
m_cluster_finders.push_back(
|
||||
std::make_unique<
|
||||
ClusterFinder<ClusterType, FRAME_TYPE, PEDESTAL_TYPE>>(
|
||||
image_size, nSigma, capacity));
|
||||
image_size, nSigma, capacity, cluster_size_x, cluster_size_y));
|
||||
}
|
||||
for (size_t i = 0; i < n_threads; i++) {
|
||||
m_input_queues.emplace_back(std::make_unique<InputQueue>(200));
|
||||
@@ -149,7 +243,8 @@ class ClusterFinderMT {
|
||||
* @warning You need to empty this queue otherwise the cluster finder will
|
||||
* wait forever
|
||||
*/
|
||||
ProducerConsumerQueue<ClusterVector<ClusterType>> *sink() {
|
||||
BlockingQueue<ClusterVector<ClusterType>> *sink() {
|
||||
//ProducerConsumerQueue<ClusterVector<ClusterType>> *sink() {
|
||||
return &m_sink;
|
||||
}
|
||||
|
||||
@@ -172,14 +267,31 @@ class ClusterFinderMT {
|
||||
* @brief Stop all processing threads
|
||||
*/
|
||||
void stop() {
|
||||
m_stop_requested = true;
|
||||
// m_stop_requested = true;
|
||||
|
||||
for (auto &thread : m_threads) {
|
||||
thread.join();
|
||||
}
|
||||
// for (auto &thread : m_threads) {
|
||||
// thread.join();
|
||||
// }
|
||||
// m_threads.clear();
|
||||
|
||||
// m_processing_threads_stopped = true;
|
||||
// m_collect_thread.join();
|
||||
|
||||
|
||||
// 1. Send poison to ALL worker input queues
|
||||
for (auto& q : m_input_queues)
|
||||
q->push(make_poison_frame());
|
||||
|
||||
// 2. Wait for worker threads
|
||||
for (auto& t : m_threads)
|
||||
t.join();
|
||||
m_threads.clear();
|
||||
|
||||
m_processing_threads_stopped = true;
|
||||
// 3. Send poison clusters from workers to collector
|
||||
for (auto& q : m_output_queues)
|
||||
q->push(make_poison_cluster());
|
||||
|
||||
// 4. Wait for collector
|
||||
m_collect_thread.join();
|
||||
}
|
||||
|
||||
@@ -211,9 +323,10 @@ class ClusterFinderMT {
|
||||
NDArray(frame)}; // TODO! copies the data!
|
||||
|
||||
for (auto &queue : m_input_queues) {
|
||||
while (!queue->write(fw)) {
|
||||
std::this_thread::sleep_for(m_default_wait);
|
||||
}
|
||||
queue->push(fw);
|
||||
// while (!queue->write(fw)) {
|
||||
// std::this_thread::sleep_for(m_default_wait);
|
||||
// }
|
||||
}
|
||||
}
|
||||
|
||||
@@ -223,12 +336,18 @@ class ClusterFinderMT {
|
||||
* @note Spin locks with a default wait if the queue is full.
|
||||
*/
|
||||
void find_clusters(NDView<FRAME_TYPE, 2> frame, uint64_t frame_number = 0) {
|
||||
FrameWrapper fw{FrameType::DATA, frame_number,
|
||||
NDArray(frame)}; // TODO! copies the data!
|
||||
while (!m_input_queues[m_current_thread % m_n_threads]->write(fw)) {
|
||||
std::this_thread::sleep_for(m_default_wait);
|
||||
}
|
||||
m_current_thread++;
|
||||
// FrameWrapper fw{FrameType::DATA, frame_number,
|
||||
// NDArray(frame)}; // TODO! copies the data!
|
||||
// size_t thread_idx = m_current_thread.fetch_add(1) % m_n_threads;
|
||||
// while (!m_input_queues[thread_idx]->write(fw)) {
|
||||
// // while (!m_input_queues[m_current_thread % m_n_threads]->write(fw)) {
|
||||
// std::this_thread::sleep_for(m_default_wait);
|
||||
// }
|
||||
// // m_current_thread++;
|
||||
|
||||
FrameWrapper fw{FrameType::DATA, frame_number, NDArray(frame)};
|
||||
size_t thread_idx = m_current_thread.fetch_add(1) % m_n_threads;
|
||||
m_input_queues[thread_idx]->push(std::move(fw)); // blocks if full
|
||||
}
|
||||
|
||||
void clear_pedestal() {
|
||||
|
||||
@@ -105,7 +105,7 @@ class Frame {
|
||||
* @tparam T type of the pixels
|
||||
* @return NDView<T, 2>
|
||||
*/
|
||||
template <typename T> NDView<T, 2> view() & {
|
||||
template <typename T> NDView<T, 2> view() {
|
||||
std::array<ssize_t, 2> shape = {static_cast<ssize_t>(m_rows),
|
||||
static_cast<ssize_t>(m_cols)};
|
||||
T *data = reinterpret_cast<T *>(m_data);
|
||||
|
||||
@@ -26,24 +26,24 @@ def _get_class(name, cluster_size, dtype):
|
||||
|
||||
|
||||
|
||||
def ClusterFinder(image_size, cluster_size, n_sigma=5, dtype = np.int32, capacity = 1024):
|
||||
def ClusterFinder(image_size, saved_cluster_size, checked_cluster_size, n_sigma=5, dtype = np.int32, capacity = 1024):
|
||||
"""
|
||||
Factory function to create a ClusterFinder object. Provides a cleaner syntax for
|
||||
the templated ClusterFinder in C++.
|
||||
"""
|
||||
cls = _get_class("ClusterFinder", cluster_size, dtype)
|
||||
return cls(image_size, n_sigma=n_sigma, capacity=capacity)
|
||||
cls = _get_class("ClusterFinder", saved_cluster_size, dtype)
|
||||
return cls(image_size, n_sigma=n_sigma, capacity=capacity, cluster_size_x=checked_cluster_size[0], cluster_size_y=checked_cluster_size[1])
|
||||
|
||||
|
||||
|
||||
def ClusterFinderMT(image_size, cluster_size = (3,3), dtype=np.int32, n_sigma=5, capacity = 1024, n_threads = 3):
|
||||
def ClusterFinderMT(image_size, saved_cluster_size = (3,3), checked_cluster_size = (3,3), dtype=np.int32, n_sigma=5, capacity = 1024, n_threads = 3):
|
||||
"""
|
||||
Factory function to create a ClusterFinderMT object. Provides a cleaner syntax for
|
||||
the templated ClusterFinderMT in C++.
|
||||
"""
|
||||
|
||||
cls = _get_class("ClusterFinderMT", cluster_size, dtype)
|
||||
return cls(image_size, n_sigma=n_sigma, capacity=capacity, n_threads=n_threads)
|
||||
cls = _get_class("ClusterFinderMT", saved_cluster_size, dtype)
|
||||
return cls(image_size, n_sigma=n_sigma, capacity=capacity, n_threads=n_threads, cluster_size_x=checked_cluster_size[0], cluster_size_y=checked_cluster_size[1])
|
||||
|
||||
|
||||
|
||||
|
||||
@@ -30,8 +30,9 @@ void define_ClusterFinder(py::module &m, const std::string &typestr) {
|
||||
|
||||
py::class_<ClusterFinder<ClusterType, uint16_t, pd_type>>(
|
||||
m, class_name.c_str())
|
||||
.def(py::init<Shape<2>, pd_type, size_t>(), py::arg("image_size"),
|
||||
py::arg("n_sigma") = 5.0, py::arg("capacity") = 1'000'000)
|
||||
.def(py::init<Shape<2>, pd_type, size_t, uint32_t, uint32_t>(), py::arg("image_size"),
|
||||
py::arg("n_sigma") = 5.0, py::arg("capacity") = 1'000'000,
|
||||
py::arg("cluster_size_x") = 3, py::arg("cluster_size_y") = 3)
|
||||
.def("push_pedestal_frame",
|
||||
[](ClusterFinder<ClusterType, uint16_t, pd_type> &self,
|
||||
py::array_t<uint16_t> frame) {
|
||||
|
||||
@@ -30,9 +30,10 @@ void define_ClusterFinderMT(py::module &m, const std::string &typestr) {
|
||||
|
||||
py::class_<ClusterFinderMT<ClusterType, uint16_t, pd_type>>(
|
||||
m, class_name.c_str())
|
||||
.def(py::init<Shape<2>, pd_type, size_t, size_t>(),
|
||||
.def(py::init<Shape<2>, pd_type, size_t, size_t, uint32_t, uint32_t>(),
|
||||
py::arg("image_size"), py::arg("n_sigma") = 5.0,
|
||||
py::arg("capacity") = 2048, py::arg("n_threads") = 3)
|
||||
py::arg("capacity") = 2048, py::arg("n_threads") = 3,
|
||||
py::arg("cluster_size_x") = 3, py::arg("cluster_size_y") = 3)
|
||||
.def("push_pedestal_frame",
|
||||
[](ClusterFinderMT<ClusterType, uint16_t, pd_type> &self,
|
||||
py::array_t<uint16_t> frame) {
|
||||
|
||||
@@ -84,4 +84,9 @@ PYBIND11_MODULE(_aare, m) {
|
||||
DEFINE_CLUSTER_BINDINGS(int, 9, 9, uint16_t, i);
|
||||
DEFINE_CLUSTER_BINDINGS(double, 9, 9, uint16_t, d);
|
||||
DEFINE_CLUSTER_BINDINGS(float, 9, 9, uint16_t, f);
|
||||
|
||||
DEFINE_CLUSTER_BINDINGS(int, 21, 21, uint16_t, i);
|
||||
DEFINE_CLUSTER_BINDINGS(double, 21, 21, uint16_t, d);
|
||||
DEFINE_CLUSTER_BINDINGS(float, 21, 21, uint16_t, f);
|
||||
|
||||
}
|
||||
|
||||
@@ -57,7 +57,6 @@ class ClusterFinderMTWrapper
|
||||
size_t m_sink_size() const { return this->m_sink.sizeGuess(); }
|
||||
};
|
||||
|
||||
|
||||
TEST_CASE("multithreaded cluster finder", "[.with-data]") {
|
||||
auto fpath =
|
||||
test_data_path() / "raw/moench03/cu_half_speed_master_4.json";
|
||||
@@ -82,8 +81,7 @@ TEST_CASE("multithreaded cluster finder", "[.with-data]") {
|
||||
CHECK(cf.m_input_queues_are_empty() == true);
|
||||
|
||||
for (size_t i = 0; i < n_frames_pd; ++i) {
|
||||
auto frame = file.read_frame();
|
||||
cf.find_clusters(frame.view<uint16_t>());
|
||||
cf.find_clusters(file.read_frame().view<uint16_t>());
|
||||
}
|
||||
|
||||
cf.stop();
|
||||
|
||||
@@ -7,7 +7,6 @@ Script to update VERSION file with semantic versioning if provided as an argumen
|
||||
import sys
|
||||
import os
|
||||
import re
|
||||
from datetime import datetime
|
||||
|
||||
from packaging.version import Version, InvalidVersion
|
||||
|
||||
@@ -27,9 +26,9 @@ def get_version():
|
||||
|
||||
# Check at least one argument is passed
|
||||
if len(sys.argv) < 2:
|
||||
version = datetime.today().strftime('%Y.%-m.%-d')
|
||||
else:
|
||||
version = sys.argv[1]
|
||||
return "0.0.0"
|
||||
|
||||
version = sys.argv[1]
|
||||
|
||||
try:
|
||||
v = Version(version) # normalize check if version follows PEP 440 specification
|
||||
@@ -55,4 +54,4 @@ def write_version_to_file(version):
|
||||
if __name__ == "__main__":
|
||||
|
||||
version = get_version()
|
||||
write_version_to_file(version)
|
||||
write_version_to_file(version)
|
||||
Reference in New Issue
Block a user