diff --git a/src/ProcessManager.cpp b/src/ProcessManager.cpp index 3dbdfb9..3815087 100644 --- a/src/ProcessManager.cpp +++ b/src/ProcessManager.cpp @@ -6,6 +6,7 @@ #include #include #include +#include #include "ProcessManager.hpp" #include "config.hpp" @@ -27,23 +28,23 @@ void ProcessManager::write_h5(WriterManager& manager, const H5Format& format, Ri continue; } - const pair received_data = ring_buffer.read(); + const pair< shared_ptr, char* > received_data = ring_buffer.read(); // NULL pointer means that the ringbuffer->read() timeouted. Faster than rising an exception. if(!received_data.second) { continue; } - writer.write_frame_data(received_data.first.frame_index, - received_data.first.frame_shape, - received_data.first.frame_bytes_size, + writer.write_frame_data(received_data.first->frame_index, + received_data.first->frame_shape, + received_data.first->frame_bytes_size, received_data.second, - received_data.first.type, - received_data.first.endianness); + received_data.first->type, + received_data.first->endianness); - ring_buffer.release(received_data.first.buffer_slot_index); + ring_buffer.release(received_data.first->buffer_slot_index); - manager.written_frame(received_data.first.frame_index); + manager.written_frame(received_data.first->frame_index); } if (writer.is_file_open()) { @@ -94,8 +95,6 @@ void ProcessManager::receive_zmq(WriterManager& manager, RingBuffer& ring_buffer zmq::message_t message_header(config::zmq_buffer_size_header); zmq::message_t message_data(config::zmq_buffer_size_data); - FrameMetadata frame_metadata; - pt::ptree json_header; while (manager.is_running()) { @@ -105,51 +104,55 @@ void ProcessManager::receive_zmq(WriterManager& manager, RingBuffer& ring_buffer } // Parse JSON header. - frame_metadata.header_string = string(static_cast(message_header.data()), message_header.size()); + auto frame_metadata = make_shared(); + + frame_metadata->header_string = string(static_cast(message_header.data()), message_header.size()); + stringstream header_stream; - header_stream << frame_metadata.header_string << endl; + header_stream << frame_metadata->header_string << endl; + pt::read_json(header_stream, json_header); // Extract data from message header. - frame_metadata.frame_index = json_header.get("frame"); + frame_metadata->frame_index = json_header.get("frame"); uint8_t index = 0; for (const auto& item : json_header.get_child("shape")) { - frame_metadata.frame_shape[index] = item.second.get_value(); + frame_metadata->frame_shape[index] = item.second.get_value(); ++index; } // Array 1.0 specified little endian as the default encoding. - frame_metadata.endianness = json_header.get("endianness", "little"); + frame_metadata->endianness = json_header.get("endianness", "little"); - frame_metadata.type = json_header.get("type"); + frame_metadata->type = json_header.get("type"); // Get the message data. if (!receiver.recv(&message_data)) { - cout << "[h5_zmq_writer::receive_zmq] ERROR: Error while reading from ZMQ. Frame index " << frame_metadata.frame_index << " lost."; + cout << "[h5_zmq_writer::receive_zmq] ERROR: Error while reading from ZMQ. Frame index " << frame_metadata->frame_index << " lost."; cout << " Trying to continue with the next frame." << endl; - manager.lost_frame(frame_metadata.frame_index); + manager.lost_frame(frame_metadata->frame_index); continue; } - frame_metadata.frame_bytes_size = message_data.size(); + frame_metadata->frame_bytes_size = message_data.size(); #ifdef DEBUG_OUTPUT cout << "[h5_zmq_writer::receive_zmq] Processing FrameMetadata"; - cout << " with frame_index " << frame_metadata.frame_index; - cout << " and frame_shape [" << frame_metadata.frame_shape[0] << ", " << frame_metadata.frame_shape[1] << "]"; - cout << " and endianness " << frame_metadata.endianness; - cout << " and type " << frame_metadata.type; - cout << " and frame_bytes_size " << frame_metadata.frame_bytes_size; + cout << " with frame_index " << frame_metadata->frame_index; + cout << " and frame_shape [" << frame_metadata->frame_shape[0] << ", " << frame_metadata->frame_shape[1] << "]"; + cout << " and endianness " << frame_metadata->endianness; + cout << " and type " << frame_metadata->type; + cout << " and frame_bytes_size " << frame_metadata->frame_bytes_size; cout << "." << endl; #endif // Commit the frame to the buffer. ring_buffer.write(frame_metadata, static_cast(message_data.data())); - manager.received_frame(frame_metadata.frame_index); + manager.received_frame(frame_metadata->frame_index); } #ifdef DEBUG_OUTPUT diff --git a/src/RingBuffer.cpp b/src/RingBuffer.cpp index 3c1c1f7..5d136c3 100644 --- a/src/RingBuffer.cpp +++ b/src/RingBuffer.cpp @@ -50,19 +50,19 @@ void RingBuffer::initialize(size_t slot_size) #endif } -void RingBuffer::write(FrameMetadata &frame_metadata, const char* data) +void RingBuffer::write(shared_ptr frame_metadata, const char* data) { // Initialize the buffer on the first write. if (!ring_buffer_initialized) { - initialize(frame_metadata.frame_bytes_size); + initialize(frame_metadata->frame_bytes_size); } // All images must fit in the ring buffer. - if (frame_metadata.frame_bytes_size > slot_size) { + if (frame_metadata->frame_bytes_size > slot_size) { stringstream error_message; - error_message << "[RingBuffer::write] Received frame index "<< frame_metadata.frame_index; + error_message << "[RingBuffer::write] Received frame index "<< frame_metadata->frame_index; error_message << " that is too large for ring buffer slot. "; - error_message << "Slot size " << slot_size << ", but frame bytes size " << frame_metadata.frame_bytes_size << endl; + error_message << "Slot size " << slot_size << ", but frame bytes size " << frame_metadata->frame_bytes_size << endl; throw runtime_error(error_message.str()); } @@ -75,11 +75,11 @@ void RingBuffer::write(FrameMetadata &frame_metadata, const char* data) ringbuffer_slots[write_index] = 1; // Set the write index in the FrameMetadata object. - frame_metadata.buffer_slot_index = write_index; + frame_metadata->buffer_slot_index = write_index; #ifdef DEBUG_OUTPUT - cout << "[RingBuffer::write] Ring buffer slot " << frame_metadata.buffer_slot_index << " reserved for frame_index "; - cout << frame_metadata.frame_index << endl; + cout << "[RingBuffer::write] Ring buffer slot " << frame_metadata->buffer_slot_index << " reserved for frame_index "; + cout << frame_metadata->frame_index << endl; #endif // Increase and wrap the write index around if needed. @@ -97,12 +97,12 @@ void RingBuffer::write(FrameMetadata &frame_metadata, const char* data) } // The slot is already reserved, no need for synchronization. - char* slot_memory_address = get_buffer_slot_address(frame_metadata.buffer_slot_index); - memcpy(slot_memory_address, data, frame_metadata.frame_bytes_size); + char* slot_memory_address = get_buffer_slot_address(frame_metadata->buffer_slot_index); + memcpy(slot_memory_address, data, frame_metadata->frame_bytes_size); #ifdef DEBUG_OUTPUT - cout << "[RingBuffer::write] Copied " << frame_metadata.frame_bytes_size << " frame bytes to buffer_slot_index "; - cout << frame_metadata.buffer_slot_index << endl; + cout << "[RingBuffer::write] Copied " << frame_metadata->frame_bytes_size << " frame bytes to buffer_slot_index "; + cout << frame_metadata->buffer_slot_index << endl; #endif // Add metadata header to the inter-thread communication queue. @@ -113,7 +113,7 @@ void RingBuffer::write(FrameMetadata &frame_metadata, const char* data) } #ifdef DEBUG_OUTPUT - cout << "[RingBuffer::write] Metadata for frame_index " << frame_metadata.frame_index << " added to metadata queue." << endl; + cout << "[RingBuffer::write] Metadata for frame_index " << frame_metadata->frame_index << " added to metadata queue." << endl; #endif } @@ -138,9 +138,9 @@ char* RingBuffer::get_buffer_slot_address(size_t buffer_slot_index) return slot_memory_address; } -pair RingBuffer::read() +pair, char*> RingBuffer::read() { - FrameMetadata frame_metadata; + shared_ptr frame_metadata; // Read data from the metadata queue. { @@ -148,7 +148,7 @@ pair RingBuffer::read() // A NULL char* indicates that there are no available data in the ring buffer. if (frame_metadata_queue.empty()) { - return {frame_metadata, NULL}; + return {NULL, NULL}; } frame_metadata = frame_metadata_queue.front(); @@ -156,24 +156,23 @@ pair RingBuffer::read() } #ifdef DEBUG_OUTPUT - cout << "[RingBuffer::read] Received metadata for frame_index " << frame_metadata.frame_index << endl; + cout << "[RingBuffer::read] Received metadata for frame_index " << frame_metadata->frame_index << endl; #endif // Check if the references ring buffer slot is valid. { lock_guard lock(ringbuffer_slots_mutex); - if (!ringbuffer_slots[frame_metadata.buffer_slot_index]) { + if (!ringbuffer_slots[frame_metadata->buffer_slot_index]) { stringstream error_message; error_message << "[RingBuffer::read] Ring buffer slot referenced in message header "; - error_message << frame_metadata.buffer_slot_index << " is empty." << endl; + error_message << frame_metadata->buffer_slot_index << " is empty." << endl; throw runtime_error(error_message.str()); } - } - char* slot_memory_address = get_buffer_slot_address(frame_metadata.buffer_slot_index); + char* slot_memory_address = get_buffer_slot_address(frame_metadata->buffer_slot_index); return {frame_metadata, slot_memory_address}; } diff --git a/src/RingBuffer.hpp b/src/RingBuffer.hpp index 6ab7587..3fb4ba3 100644 --- a/src/RingBuffer.hpp +++ b/src/RingBuffer.hpp @@ -5,26 +5,20 @@ #include #include #include +#include #include struct FrameMetadata { FrameMetadata(){} - - FrameMetadata(const FrameMetadata& other) : - buffer_slot_index(other.buffer_slot_index), frame_bytes_size(other.frame_bytes_size), frame_index(other.frame_index), - endianness(other.endianness), type(other.type), header_string(other.header_string) { - frame_shape[0] = other.frame_shape[0]; - frame_shape[1] = other.frame_shape[1]; - } // Ring buffer needed data. - size_t buffer_slot_index = 0; - size_t frame_bytes_size = 0; + size_t buffer_slot_index; + size_t frame_bytes_size; // Image header data. - uint64_t frame_index = 0; + uint64_t frame_index; std::string endianness; std::string type; size_t frame_shape[2]; @@ -47,7 +41,7 @@ class RingBuffer size_t buffer_used_slots = 0; bool ring_buffer_initialized = false; - std::list frame_metadata_queue; + std::list< std::shared_ptr > frame_metadata_queue; std::mutex frame_metadata_queue_mutex; std::mutex ringbuffer_slots_mutex; @@ -58,8 +52,8 @@ class RingBuffer virtual ~RingBuffer(); void initialize(size_t slot_size); - void write(FrameMetadata &metadata, const char* data); - std::pair read(); + void write(const std::shared_ptr metadata, const char* data); + std::pair, char*> read(); void release(size_t buffer_slot_index); bool is_empty(); };