diff --git a/src/ProcessManager.cpp b/src/ProcessManager.cpp index 36aba1f..a0c3a41 100644 --- a/src/ProcessManager.cpp +++ b/src/ProcessManager.cpp @@ -1,24 +1,20 @@ -#include #include #include #include #include -#include #include #include +#include -#include "ProcessManager.hpp" -#include "config.hpp" -#include "H5Writer.hpp" #include "RestApi.hpp" using namespace std; -namespace pt = boost::property_tree; -void ProcessManager::write_h5(WriterManager& manager, const H5Format& format, RingBuffer& ring_buffer) +void ProcessManager::write_h5(WriterManager& manager, const H5Format& format, RingBuffer& ring_buffer) { H5Writer writer(manager.get_output_file()); auto raw_frames_dataset_name = format.get_raw_frames_dataset_name(); + const auto& header_value_type = format.get_header_value_type(); // Run until the running flag is set or the ring_buffer is empty. while(manager.is_running() || !ring_buffer.is_empty()) { @@ -31,7 +27,7 @@ void ProcessManager::write_h5(WriterManager& manager, const H5Format& format, Ri const pair< shared_ptr, char* > received_data = ring_buffer.read(); // NULL pointer means that the ringbuffer->read() timeouted. Faster than rising an exception. - if(!received_data.second) { + if(!received_data.first) { continue; } @@ -86,40 +82,22 @@ void ProcessManager::write_h5(WriterManager& manager, const H5Format& format, Ri exit(0); } -void ProcessManager::receive_zmq(WriterManager& manager, RingBuffer& ring_buffer, const H5Format& format, - const string& connect_address, int n_io_threads, int receive_timeout) +void ProcessManager::receive_zmq(WriterManager& manager, RingBuffer& ring_buffer, + ZmqReceiver& receiver, const H5Format& format) { - zmq::context_t context(n_io_threads); - zmq::socket_t receiver(context, ZMQ_PULL); - receiver.setsockopt(ZMQ_RCVTIMEO, receive_timeout); - receiver.connect(connect_address); - - zmq::message_t message_header(config::zmq_buffer_size_header); - zmq::message_t message_data(config::zmq_buffer_size_data); - - pt::ptree json_header; const auto& header_value_type = format.get_header_value_type(); while (manager.is_running()) { - // Get the message header. - if (!receiver.recv(&message_header)){ + + auto frame = receiver.receive(); + + // In case no message is available before the timeout, both pointers are NULL. + if (!frame.first){ continue; } - auto header_string = string(static_cast(message_header.data()), message_header.size()); - auto frame_metadata = read_json_header(json_header, header_string, header_value_type); - - // Get the message data. - if (!receiver.recv(&message_data)) { - cout << "[h5_zmq_writer::receive_zmq] ERROR: Error while reading from ZMQ. Frame index " << frame_metadata->frame_index << " lost."; - cout << " Trying to continue with the next frame." << endl; - - manager.lost_frame(frame_metadata->frame_index); - - continue; - } - - frame_metadata->frame_bytes_size = message_data.size(); + auto frame_metadata = frame.first; + auto frame_data = frame.second; #ifdef DEBUG_OUTPUT cout << "[h5_zmq_writer::receive_zmq] Processing FrameMetadata"; @@ -132,7 +110,7 @@ void ProcessManager::receive_zmq(WriterManager& manager, RingBuffer& ring_buffer #endif // Commit the frame to the buffer. - ring_buffer.write(frame_metadata, static_cast(message_data.data())); + ring_buffer.write(frame_metadata, frame_data); manager.received_frame(frame_metadata->frame_index); } @@ -142,92 +120,25 @@ void ProcessManager::receive_zmq(WriterManager& manager, RingBuffer& ring_buffer #endif } -boost::any ProcessManager::get_value_from_json(const pt::ptree& json_header, const string& value_name, const HEADER_DATA_TYPE data_type) -{ - switch(data_type) { - case UINT8 : - return json_header.get(value_name); - case UINT16 : - return json_header.get(value_name); - case UINT32 : - return json_header.get(value_name); - case UINT64 : - return json_header.get(value_name); - case INT8 : - return json_header.get(value_name); - case INT16 : - return json_header.get(value_name); - case INT32 : - return json_header.get(value_name); - case INT64 : - return json_header.get(value_name); - case FLOAT32 : - return json_header.get(value_name); - case FLOAT64 : - return json_header.get(value_name); - default: - stringstream error_message; - error_message << "[ProcessManager::get_value_from_json] Unknown value type for header value " << value_name << endl; - throw runtime_error(error_message.str()); - } -} - -shared_ptr ProcessManager::read_json_header(pt::ptree& json_header, - const string& header, const map& header_data_type) -{ - stringstream header_stream; - header_stream << header << endl; - pt::read_json(header_stream, json_header); - - auto header_data = make_shared(); - - header_data->frame_index = json_header.get("frame"); - - for (const auto& item : json_header.get_child("shape")) { - header_data->frame_shape.push_back(item.second.get_value()); - } - - // Array 1.0 specified little endian as the default encoding. - header_data->endianness = json_header.get("endianness", "little"); - - header_data->type = json_header.get("type"); - - for (const auto& value_mapping : header_data_type) { - - const auto& name = value_mapping.first; - const auto& data_type = value_mapping.second; - const boost::any& value = get_value_from_json(json_header, name, data_type); - - header_data->header_values.insert( - {name, value} - ); - } - - return header_data; -} void ProcessManager::run_writer(WriterManager& manager, const H5Format& format, - const string& connect_address, uint16_t rest_port) + ZmqReceiver& receiver, uint16_t rest_port) { size_t n_slots = config::ring_buffer_n_slots; - int n_io_threads = config::zmq_n_io_threads; - int receive_timeout = config::zmq_receive_timeout; - RingBuffer ring_buffer(n_slots); #ifdef DEBUG_OUTPUT - cout << "[h5_zmq_writer::run_writer] Running writer"; - cout << " with connect_address " << connect_address; + cout << "[h5_zmq_writer::run_writer] Running writer"; cout << " and output_file " << manager.get_output_file(); cout << " and n_slots " << n_slots; - cout << " and n_io_threads " << n_io_threads; - cout << " and receive_timeout " << receive_timeout; cout << endl; #endif - boost::thread receiver_thread(receive_zmq, boost::ref(manager), boost::ref(ring_buffer), boost::ref(format), connect_address, n_io_threads, receive_timeout); - boost::thread writer_thread(write_h5, boost::ref(manager), boost::ref(format), boost::ref(ring_buffer)); + boost::thread receiver_thread(receive_zmq, boost::ref(manager), boost::ref(ring_buffer), + boost::ref(receiver), boost::ref(format)); + boost::thread writer_thread(write_h5, boost::ref(manager), + boost::ref(format), boost::ref(ring_buffer)); RestApi::start_rest_api(manager, rest_port); diff --git a/src/ProcessManager.hpp b/src/ProcessManager.hpp index 163d92b..b872762 100644 --- a/src/ProcessManager.hpp +++ b/src/ProcessManager.hpp @@ -1,23 +1,13 @@ -#include -#include -#include - #include "WriterManager.hpp" #include "H5Format.hpp" #include "RingBuffer.hpp" +#include "ZmqReceiver.hpp" namespace ProcessManager { - void run_writer(WriterManager& manager, const H5Format& format, const std::string& connect_address, uint16_t rest_port); + void run_writer(WriterManager& manager, const H5Format& format, ZmqReceiver& receiver, uint16_t rest_port); - void receive_zmq(WriterManager& manager, RingBuffer& ring_buffer, const H5Format& format, - const std::string& connect_address, int n_io_threads=1, int receive_timeout=-1); + void receive_zmq(WriterManager& manager, RingBuffer& ring_buffer, ZmqReceiver& receiver, const H5Format& format); void write_h5(WriterManager& manager, const H5Format& format, RingBuffer& ring_buffer); - - std::shared_ptr read_json_header(boost::property_tree::ptree& json_header, - const std::string& header, const std::map& header_data_type); - - boost::any get_value_from_json(const boost::property_tree::ptree& json_header, - const std::string& value_name, const HEADER_DATA_TYPE data_type); }; \ No newline at end of file