Move processing threads to references

This commit is contained in:
2018-01-31 15:13:42 +01:00
parent 8354cf58a2
commit af5b08bc00
+16 -16
View File
@@ -18,19 +18,19 @@
using namespace std;
namespace pt = boost::property_tree;
void write_h5(WriterManager *manager, RingBuffer *ring_buffer, string output_file)
void write_h5(WriterManager& manager, RingBuffer& ring_buffer, string output_file)
{
H5Writer writer(output_file, "raw_data");
// Run until the running flag is set or the ring_buffer is empty.
while(manager->is_running() || !ring_buffer->is_empty()) {
while(manager.is_running() || !ring_buffer.is_empty()) {
if (ring_buffer->is_empty()) {
if (ring_buffer.is_empty()) {
boost::this_thread::sleep_for(boost::chrono::milliseconds(config::ring_buffer_read_retry_interval));
continue;
}
pair<FrameMetadata, char*> received_data = ring_buffer->read();
pair<FrameMetadata, char*> received_data = ring_buffer.read();
// NULL pointer means that the ringbuffer->read() timeouted. Faster than rising an exception.
if(!received_data.second) {
@@ -44,9 +44,9 @@ void write_h5(WriterManager *manager, RingBuffer *ring_buffer, string output_fil
received_data.first.type,
received_data.first.endianness);
ring_buffer->release(received_data.first.buffer_slot_index);
ring_buffer.release(received_data.first.buffer_slot_index);
manager->written_frame(received_data.first.frame_index);
manager.written_frame(received_data.first.frame_index);
}
if (writer.is_file_open()) {
@@ -55,13 +55,13 @@ void write_h5(WriterManager *manager, RingBuffer *ring_buffer, string output_fil
#endif
// Wait until all parameters are set or writer is killed.
while (!manager->are_all_parameters_set() && !manager->is_killed()) {
while (!manager.are_all_parameters_set() && !manager.is_killed()) {
boost::this_thread::sleep_for(boost::chrono::milliseconds(config::parameters_read_retry_interval));
}
// Need to check again if we have all parameters to write down the format.
if (manager->are_all_parameters_set()) {
auto parameters = manager->get_parameters();
if (manager.are_all_parameters_set()) {
auto parameters = manager.get_parameters();
// Even if we can't write the format, lets try to preserve the data.
try {
@@ -82,11 +82,11 @@ void write_h5(WriterManager *manager, RingBuffer *ring_buffer, string output_fil
cout << "[h5_zmq_writer::write] Writer thread stopped." << endl;
#endif
// Exit when writer thread has finished.
// Exit when writer thread has closed the file.
exit(0);
}
void receive_zmq(WriterManager *manager, RingBuffer *ring_buffer, string connect_address, int n_io_threads=1, int receive_timeout=-1)
void receive_zmq(WriterManager& manager, RingBuffer& ring_buffer, string connect_address, int n_io_threads=1, int receive_timeout=-1)
{
zmq::context_t context(n_io_threads);
zmq::socket_t receiver(context, ZMQ_PULL);
@@ -100,7 +100,7 @@ void receive_zmq(WriterManager *manager, RingBuffer *ring_buffer, string connect
pt::ptree json_header;
while (manager->is_running()) {
while (manager.is_running()) {
// Get the message header.
if (!receiver.recv(&message_header)){
continue;
@@ -143,9 +143,9 @@ void receive_zmq(WriterManager *manager, RingBuffer *ring_buffer, string connect
#endif
// Commit the frame to the buffer.
ring_buffer->write(frame_metadata, static_cast<char*>(message_data.data()));
ring_buffer.write(frame_metadata, static_cast<char*>(message_data.data()));
manager->received_frame(frame_metadata.frame_index);
manager.received_frame(frame_metadata.frame_index);
}
#ifdef DEBUG_OUTPUT
@@ -172,8 +172,8 @@ void run_writer(string connect_address, string output_file, uint64_t n_frames, u
cout << endl;
#endif
boost::thread receiver_thread(receive_zmq, &manager, &ring_buffer, connect_address, n_io_threads, receive_timeout);
boost::thread writer_thread(write_h5, &manager, &ring_buffer, output_file);
boost::thread receiver_thread(receive_zmq, boost::ref(manager), boost::ref(ring_buffer), connect_address, n_io_threads, receive_timeout);
boost::thread writer_thread(write_h5, boost::ref(manager), boost::ref(ring_buffer), output_file);
start_rest_api(manager, rest_port);