mirror of
https://github.com/paulscherrerinstitute/sf_daq_buffer.git
synced 2026-05-02 12:34:14 +02:00
Make RB initialization flag local
This commit is contained in:
@@ -101,11 +101,14 @@ void ZmqRecvModule::receive_thread(const string& connect_address)
|
||||
ZmqReceiver receiver(header_values_);
|
||||
receiver.connect(connect_address);
|
||||
|
||||
bool rb_initialized(false);
|
||||
|
||||
while (is_receiving_.load(memory_order_relaxed)) {
|
||||
|
||||
auto frame = receiver.receive();
|
||||
|
||||
// If no message, .first and .second = nullptr
|
||||
// .first and .second = nullptr when no message received
|
||||
// If no message or currently not writing, idle.
|
||||
if (frame.first == nullptr ||
|
||||
!is_writing_.load(memory_order_relaxed)) {
|
||||
continue;
|
||||
@@ -129,11 +132,8 @@ void ZmqRecvModule::receive_thread(const string& connect_address)
|
||||
cout << frame_metadata->frame_bytes_size << "." << endl;
|
||||
#endif
|
||||
|
||||
char* buffer = ring_buffer_.reserve(frame_metadata);
|
||||
if (!rb_initialized) {
|
||||
|
||||
// TODO: Add flag to disable compression.
|
||||
// TODO: Cache results no to calculate this every time.
|
||||
{
|
||||
size_t n_elements =
|
||||
frame_metadata->frame_shape[0] *
|
||||
frame_metadata->frame_shape[1];
|
||||
@@ -143,22 +143,13 @@ void ZmqRecvModule::receive_thread(const string& connect_address)
|
||||
n_elements,
|
||||
frame_metadata->frame_bytes_size/n_elements);
|
||||
|
||||
if (max_buffer_size > ring_buffer_.get_slot_size()) {
|
||||
stringstream err_msg;
|
||||
ring_buffer_.initialize(max_buffer_size);
|
||||
|
||||
using namespace date;
|
||||
using namespace chrono;
|
||||
err_msg << "[" << system_clock::now() << "]";
|
||||
err_msg << "[ZmqRecvModule::receive_thread]";
|
||||
err_msg << " RingBuffer slot size ";
|
||||
err_msg << ring_buffer_.get_slot_size();
|
||||
err_msg << " smaller than max_buffer_size ";
|
||||
err_msg << max_buffer_size << endl;
|
||||
|
||||
throw runtime_error(err_msg.str());
|
||||
}
|
||||
rb_initialized = true;
|
||||
}
|
||||
|
||||
char* buffer = ring_buffer_.reserve(frame_metadata);
|
||||
|
||||
auto compressed_size = compression::compress_bitshuffle(
|
||||
static_cast<const char*>(frame_data),
|
||||
frame_metadata->frame_bytes_size,
|
||||
|
||||
Reference in New Issue
Block a user