From af5b08bc00177ba71b6b8406eabe402b5a0e040d Mon Sep 17 00:00:00 2001 From: Andrej Babic Date: Wed, 31 Jan 2018 15:13:42 +0100 Subject: [PATCH] Move processing threads to references --- src/h5_zmq_writer.cpp | 32 ++++++++++++++++---------------- 1 file changed, 16 insertions(+), 16 deletions(-) diff --git a/src/h5_zmq_writer.cpp b/src/h5_zmq_writer.cpp index 86c911d..640b6d0 100644 --- a/src/h5_zmq_writer.cpp +++ b/src/h5_zmq_writer.cpp @@ -18,19 +18,19 @@ using namespace std; namespace pt = boost::property_tree; -void write_h5(WriterManager *manager, RingBuffer *ring_buffer, string output_file) +void write_h5(WriterManager& manager, RingBuffer& ring_buffer, string output_file) { H5Writer writer(output_file, "raw_data"); // Run until the running flag is set or the ring_buffer is empty. - while(manager->is_running() || !ring_buffer->is_empty()) { + while(manager.is_running() || !ring_buffer.is_empty()) { - if (ring_buffer->is_empty()) { + if (ring_buffer.is_empty()) { boost::this_thread::sleep_for(boost::chrono::milliseconds(config::ring_buffer_read_retry_interval)); continue; } - pair received_data = ring_buffer->read(); + pair received_data = ring_buffer.read(); // NULL pointer means that the ringbuffer->read() timeouted. Faster than rising an exception. if(!received_data.second) { @@ -44,9 +44,9 @@ void write_h5(WriterManager *manager, RingBuffer *ring_buffer, string output_fil received_data.first.type, received_data.first.endianness); - ring_buffer->release(received_data.first.buffer_slot_index); + ring_buffer.release(received_data.first.buffer_slot_index); - manager->written_frame(received_data.first.frame_index); + manager.written_frame(received_data.first.frame_index); } if (writer.is_file_open()) { @@ -55,13 +55,13 @@ void write_h5(WriterManager *manager, RingBuffer *ring_buffer, string output_fil #endif // Wait until all parameters are set or writer is killed. - while (!manager->are_all_parameters_set() && !manager->is_killed()) { + while (!manager.are_all_parameters_set() && !manager.is_killed()) { boost::this_thread::sleep_for(boost::chrono::milliseconds(config::parameters_read_retry_interval)); } // Need to check again if we have all parameters to write down the format. - if (manager->are_all_parameters_set()) { - auto parameters = manager->get_parameters(); + if (manager.are_all_parameters_set()) { + auto parameters = manager.get_parameters(); // Even if we can't write the format, lets try to preserve the data. try { @@ -82,11 +82,11 @@ void write_h5(WriterManager *manager, RingBuffer *ring_buffer, string output_fil cout << "[h5_zmq_writer::write] Writer thread stopped." << endl; #endif - // Exit when writer thread has finished. + // Exit when writer thread has closed the file. exit(0); } -void receive_zmq(WriterManager *manager, RingBuffer *ring_buffer, string connect_address, int n_io_threads=1, int receive_timeout=-1) +void receive_zmq(WriterManager& manager, RingBuffer& ring_buffer, string connect_address, int n_io_threads=1, int receive_timeout=-1) { zmq::context_t context(n_io_threads); zmq::socket_t receiver(context, ZMQ_PULL); @@ -100,7 +100,7 @@ void receive_zmq(WriterManager *manager, RingBuffer *ring_buffer, string connect pt::ptree json_header; - while (manager->is_running()) { + while (manager.is_running()) { // Get the message header. if (!receiver.recv(&message_header)){ continue; @@ -143,9 +143,9 @@ void receive_zmq(WriterManager *manager, RingBuffer *ring_buffer, string connect #endif // Commit the frame to the buffer. - ring_buffer->write(frame_metadata, static_cast(message_data.data())); + ring_buffer.write(frame_metadata, static_cast(message_data.data())); - manager->received_frame(frame_metadata.frame_index); + manager.received_frame(frame_metadata.frame_index); } #ifdef DEBUG_OUTPUT @@ -172,8 +172,8 @@ void run_writer(string connect_address, string output_file, uint64_t n_frames, u cout << endl; #endif - boost::thread receiver_thread(receive_zmq, &manager, &ring_buffer, connect_address, n_io_threads, receive_timeout); - boost::thread writer_thread(write_h5, &manager, &ring_buffer, output_file); + boost::thread receiver_thread(receive_zmq, boost::ref(manager), boost::ref(ring_buffer), connect_address, n_io_threads, receive_timeout); + boost::thread writer_thread(write_h5, boost::ref(manager), boost::ref(ring_buffer), output_file); start_rest_api(manager, rest_port);