mirror of
https://github.com/paulscherrerinstitute/sf_daq_buffer.git
synced 2026-04-23 06:32:41 +02:00
Add receiving flag
This commit is contained in:
@@ -11,6 +11,7 @@ class ZmqRecvModule
|
||||
RingBuffer& ring_buffer_;
|
||||
const header_map& header_values_;
|
||||
const std::atomic_bool& is_writing_;
|
||||
std::atomic_bool is_receiving_;
|
||||
|
||||
protected:
|
||||
void receive_thread(
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
|
||||
#include <config.hpp>
|
||||
#include <iostream>
|
||||
#include <compression.hpp>
|
||||
#include "ZmqRecvModule.hpp"
|
||||
|
||||
using namespace std;
|
||||
@@ -11,10 +12,9 @@ ZmqRecvModule::ZmqRecvModule(
|
||||
const std::atomic_bool& is_writing) :
|
||||
ring_buffer_(ring_buffer_),
|
||||
header_values_(header_values),
|
||||
is_writing_(is_writing)
|
||||
{
|
||||
|
||||
}
|
||||
is_writing_(is_writing),
|
||||
is_receiving_(false)
|
||||
{}
|
||||
|
||||
void ZmqRecvModule::start(
|
||||
const string& connect_address,
|
||||
@@ -40,13 +40,13 @@ void ZmqRecvModule::receive_thread(
|
||||
|
||||
receiver.connect();
|
||||
|
||||
while (true) {
|
||||
while (is_receiving_.load(memory_order_relaxed)) {
|
||||
|
||||
auto frame = receiver.receive();
|
||||
|
||||
// If no message, first and second = nullptr
|
||||
if (frame.first == nullptr ||
|
||||
!is_writing_.load(memory_order::memory_order_relaxed)) {
|
||||
!is_writing_.load(memory_order_relaxed)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
@@ -70,11 +70,16 @@ void ZmqRecvModule::receive_thread(
|
||||
|
||||
char* buffer = ring_buffer_.reserve(frame_metadata);
|
||||
|
||||
size_t max_buffer_size = compression::get_bitshuffle_max_buffer_size(
|
||||
frame_metadata->frame_bytes_size, 1);
|
||||
|
||||
if (max_buffer_size > ring_buffer.get_slot_size()) {
|
||||
// TODO: Add flag to disable compression.
|
||||
{
|
||||
// TODO: Cache results no to calculate this every time.
|
||||
size_t max_buffer_size =
|
||||
compression::get_bitshuffle_max_buffer_size(
|
||||
frame_metadata->frame_bytes_size, 1);
|
||||
|
||||
if (max_buffer_size > ring_buffer_.get_slot_size()) {
|
||||
//TODO: Throw error if not large enough.
|
||||
}
|
||||
}
|
||||
|
||||
auto compressed_size = compression::compress_bitshuffle(
|
||||
@@ -85,8 +90,10 @@ void ZmqRecvModule::receive_thread(
|
||||
|
||||
#ifdef DEBUG_OUTPUT
|
||||
using namespace date;
|
||||
cout << "[" << std::chrono::system_clock::now() << "]";
|
||||
cout << "[ProcessManager::receive_zmq] Compressed image from ";
|
||||
using namespace chrono;
|
||||
cout << "[" << system_clock::now() << "]";
|
||||
cout << "[ProcessManager::receive_zmq]";
|
||||
cout << " Compressed image from ";
|
||||
cout << frame_metadata->frame_bytes_size << " bytes to ";
|
||||
cout << compressed_size << " bytes." << endl;
|
||||
#endif
|
||||
@@ -98,7 +105,9 @@ void ZmqRecvModule::receive_thread(
|
||||
|
||||
#ifdef DEBUG_OUTPUT
|
||||
using namespace date;
|
||||
cout << "[" << std::chrono::system_clock::now() << "]";
|
||||
cout << "[ProcessManager::receive_zmq] Receiver thread stopped." << endl;
|
||||
using namespace chrono;
|
||||
cout << "[" << system_clock::now() << "]";
|
||||
cout << "[ProcessManager::receive_zmq]";
|
||||
cout << " Receiver thread stopped." << endl;
|
||||
#endif
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user