Remove protocol from process manager

This commit is contained in:
2018-02-12 10:14:38 +01:00
parent b3ce819f00
commit 0e8fd9e2c9
2 changed files with 23 additions and 122 deletions
+20 -109
View File
@@ -1,24 +1,20 @@
#include <zmq.hpp>
#include <cstdlib>
#include <chrono>
#include <unistd.h>
#include <stdexcept>
#include <boost/thread.hpp>
#include <iostream>
#include <memory>
#include <boost/thread.hpp>
#include "ProcessManager.hpp"
#include "config.hpp"
#include "H5Writer.hpp"
#include "RestApi.hpp"
using namespace std;
namespace pt = boost::property_tree;
void ProcessManager::write_h5(WriterManager& manager, const H5Format& format, RingBuffer& ring_buffer)
void ProcessManager::write_h5(WriterManager& manager, const H5Format& format, RingBuffer& ring_buffer)
{
H5Writer writer(manager.get_output_file());
auto raw_frames_dataset_name = format.get_raw_frames_dataset_name();
const auto& header_value_type = format.get_header_value_type();
// Run until the running flag is set or the ring_buffer is empty.
while(manager.is_running() || !ring_buffer.is_empty()) {
@@ -31,7 +27,7 @@ void ProcessManager::write_h5(WriterManager& manager, const H5Format& format, Ri
const pair< shared_ptr<FrameMetadata>, char* > received_data = ring_buffer.read();
// NULL pointer means that the ringbuffer->read() timeouted. Faster than rising an exception.
if(!received_data.second) {
if(!received_data.first) {
continue;
}
@@ -86,40 +82,22 @@ void ProcessManager::write_h5(WriterManager& manager, const H5Format& format, Ri
exit(0);
}
void ProcessManager::receive_zmq(WriterManager& manager, RingBuffer& ring_buffer, const H5Format& format,
const string& connect_address, int n_io_threads, int receive_timeout)
void ProcessManager::receive_zmq(WriterManager& manager, RingBuffer& ring_buffer,
ZmqReceiver& receiver, const H5Format& format)
{
zmq::context_t context(n_io_threads);
zmq::socket_t receiver(context, ZMQ_PULL);
receiver.setsockopt(ZMQ_RCVTIMEO, receive_timeout);
receiver.connect(connect_address);
zmq::message_t message_header(config::zmq_buffer_size_header);
zmq::message_t message_data(config::zmq_buffer_size_data);
pt::ptree json_header;
const auto& header_value_type = format.get_header_value_type();
while (manager.is_running()) {
// Get the message header.
if (!receiver.recv(&message_header)){
auto frame = receiver.receive();
// In case no message is available before the timeout, both pointers are NULL.
if (!frame.first){
continue;
}
auto header_string = string(static_cast<char*>(message_header.data()), message_header.size());
auto frame_metadata = read_json_header(json_header, header_string, header_value_type);
// Get the message data.
if (!receiver.recv(&message_data)) {
cout << "[h5_zmq_writer::receive_zmq] ERROR: Error while reading from ZMQ. Frame index " << frame_metadata->frame_index << " lost.";
cout << " Trying to continue with the next frame." << endl;
manager.lost_frame(frame_metadata->frame_index);
continue;
}
frame_metadata->frame_bytes_size = message_data.size();
auto frame_metadata = frame.first;
auto frame_data = frame.second;
#ifdef DEBUG_OUTPUT
cout << "[h5_zmq_writer::receive_zmq] Processing FrameMetadata";
@@ -132,7 +110,7 @@ void ProcessManager::receive_zmq(WriterManager& manager, RingBuffer& ring_buffer
#endif
// Commit the frame to the buffer.
ring_buffer.write(frame_metadata, static_cast<char*>(message_data.data()));
ring_buffer.write(frame_metadata, frame_data);
manager.received_frame(frame_metadata->frame_index);
}
@@ -142,92 +120,25 @@ void ProcessManager::receive_zmq(WriterManager& manager, RingBuffer& ring_buffer
#endif
}
boost::any ProcessManager::get_value_from_json(const pt::ptree& json_header, const string& value_name, const HEADER_DATA_TYPE data_type)
{
switch(data_type) {
case UINT8 :
return json_header.get<uint8_t>(value_name);
case UINT16 :
return json_header.get<uint16_t>(value_name);
case UINT32 :
return json_header.get<uint32_t>(value_name);
case UINT64 :
return json_header.get<uint64_t>(value_name);
case INT8 :
return json_header.get<int8_t>(value_name);
case INT16 :
return json_header.get<int16_t>(value_name);
case INT32 :
return json_header.get<int32_t>(value_name);
case INT64 :
return json_header.get<int64_t>(value_name);
case FLOAT32 :
return json_header.get<float>(value_name);
case FLOAT64 :
return json_header.get<double>(value_name);
default:
stringstream error_message;
error_message << "[ProcessManager::get_value_from_json] Unknown value type for header value " << value_name << endl;
throw runtime_error(error_message.str());
}
}
shared_ptr<FrameMetadata> ProcessManager::read_json_header(pt::ptree& json_header,
const string& header, const map<string, HEADER_DATA_TYPE>& header_data_type)
{
stringstream header_stream;
header_stream << header << endl;
pt::read_json(header_stream, json_header);
auto header_data = make_shared<FrameMetadata>();
header_data->frame_index = json_header.get<uint64_t>("frame");
for (const auto& item : json_header.get_child("shape")) {
header_data->frame_shape.push_back(item.second.get_value<size_t>());
}
// Array 1.0 specified little endian as the default encoding.
header_data->endianness = json_header.get("endianness", "little");
header_data->type = json_header.get<string>("type");
for (const auto& value_mapping : header_data_type) {
const auto& name = value_mapping.first;
const auto& data_type = value_mapping.second;
const boost::any& value = get_value_from_json(json_header, name, data_type);
header_data->header_values.insert(
{name, value}
);
}
return header_data;
}
void ProcessManager::run_writer(WriterManager& manager, const H5Format& format,
const string& connect_address, uint16_t rest_port)
ZmqReceiver& receiver, uint16_t rest_port)
{
size_t n_slots = config::ring_buffer_n_slots;
int n_io_threads = config::zmq_n_io_threads;
int receive_timeout = config::zmq_receive_timeout;
RingBuffer ring_buffer(n_slots);
#ifdef DEBUG_OUTPUT
cout << "[h5_zmq_writer::run_writer] Running writer";
cout << " with connect_address " << connect_address;
cout << "[h5_zmq_writer::run_writer] Running writer";
cout << " and output_file " << manager.get_output_file();
cout << " and n_slots " << n_slots;
cout << " and n_io_threads " << n_io_threads;
cout << " and receive_timeout " << receive_timeout;
cout << endl;
#endif
boost::thread receiver_thread(receive_zmq, boost::ref(manager), boost::ref(ring_buffer), boost::ref(format), connect_address, n_io_threads, receive_timeout);
boost::thread writer_thread(write_h5, boost::ref(manager), boost::ref(format), boost::ref(ring_buffer));
boost::thread receiver_thread(receive_zmq, boost::ref(manager), boost::ref(ring_buffer),
boost::ref(receiver), boost::ref(format));
boost::thread writer_thread(write_h5, boost::ref(manager),
boost::ref(format), boost::ref(ring_buffer));
RestApi::start_rest_api(manager, rest_port);
+3 -13
View File
@@ -1,23 +1,13 @@
#include <sstream>
#include <memory>
#include <boost/property_tree/json_parser.hpp>
#include "WriterManager.hpp"
#include "H5Format.hpp"
#include "RingBuffer.hpp"
#include "ZmqReceiver.hpp"
namespace ProcessManager
{
void run_writer(WriterManager& manager, const H5Format& format, const std::string& connect_address, uint16_t rest_port);
void run_writer(WriterManager& manager, const H5Format& format, ZmqReceiver& receiver, uint16_t rest_port);
void receive_zmq(WriterManager& manager, RingBuffer& ring_buffer, const H5Format& format,
const std::string& connect_address, int n_io_threads=1, int receive_timeout=-1);
void receive_zmq(WriterManager& manager, RingBuffer& ring_buffer, ZmqReceiver& receiver, const H5Format& format);
void write_h5(WriterManager& manager, const H5Format& format, RingBuffer& ring_buffer);
std::shared_ptr<FrameMetadata> read_json_header(boost::property_tree::ptree& json_header,
const std::string& header, const std::map<std::string, HEADER_DATA_TYPE>& header_data_type);
boost::any get_value_from_json(const boost::property_tree::ptree& json_header,
const std::string& value_name, const HEADER_DATA_TYPE data_type);
};