From 378360932421ecf120e7228bbfd89c99ac264255 Mon Sep 17 00:00:00 2001 From: Andrej Babic Date: Mon, 12 Feb 2018 11:13:33 +0100 Subject: [PATCH] ZmqReceiver added --- src/ZmqReceiver.cpp | 138 ++++++++++++++++++++++++++++++++++++++++++++ src/ZmqReceiver.hpp | 57 ++++++++++++++++++ 2 files changed, 195 insertions(+) create mode 100644 src/ZmqReceiver.cpp create mode 100644 src/ZmqReceiver.hpp diff --git a/src/ZmqReceiver.cpp b/src/ZmqReceiver.cpp new file mode 100644 index 0000000..d9ecfce --- /dev/null +++ b/src/ZmqReceiver.cpp @@ -0,0 +1,138 @@ +#include +#include + +#include "config.hpp" +#include "ZmqReceiver.hpp" +#include "H5Format.hpp" + +using namespace std; +namespace pt = boost::property_tree; + +ZmqReceiver::ZmqReceiver(const std::string& connect_address, const int n_io_threads, const int receive_timeout) : + connect_address(connect_address), n_io_threads(n_io_threads), receive_timeout(receive_timeout), receiver(NULL) +{ + #ifdef DEBUG_OUTPUT + cout << "[ZmqReceiver::ZmqReceiver] Creating ZMQ receiver with"; + cout << " connect_address " << connect_address; + cout << " n_io_threads " << n_io_threads; + cout << " receive_timeout " << receive_timeout; + cout << endl; + #endif + + message_header = zmq::message_t(config::zmq_buffer_size_header); + message_data = zmq::message_t(config::zmq_buffer_size_data); + + header_values_type.reset( + new unordered_map({ + {"pulse_id", UINT64 }, + })); +} + +void ZmqReceiver::connect() +{ + #ifdef DEBUG_OUTPUT + cout << "[ZmqReceiver::connect] Connecting to address " << connect_address; + cout << " with n_io_threads " << n_io_threads << endl; + #endif + + context = make_shared(n_io_threads); + receiver = make_shared(*context, ZMQ_PULL); + + receiver->setsockopt(ZMQ_RCVTIMEO, receive_timeout); + receiver->connect(connect_address); +} + +pair, char*> ZmqReceiver::receive() +{ + if (!receiver) { + stringstream error_message; + error_message << "[ZmqReceiver::receive] Cannot receive before connecting. "; + error_message << "Connect first." << endl; + + throw runtime_error(error_message.str()); + } + + // Get the message header. + if (!receiver->recv(&message_header)){ + return {NULL, NULL}; + } + + auto header_string = string(static_cast(message_header.data()), message_header.size()); + auto frame_metadata = read_json_header(header_string); + + // Get the message data. + if (!receiver->recv(&message_data)) { + cout << "[ZmqReceiver::receive] ERROR: Error while reading from ZMQ. Frame index " << frame_metadata->frame_index << " lost."; + cout << " Trying to continue with the next frame." << endl;\ + + return {NULL, NULL}; + } + + frame_metadata->frame_bytes_size = message_data.size(); + + return {frame_metadata, static_cast(message_data.data())}; +} + +boost::any ZmqReceiver::get_value_from_json(const pt::ptree& json_header, const string& value_name, const HEADER_DATA_TYPE data_type) +{ + switch(data_type) { + case UINT8 : + return json_header.get(value_name); + case UINT16 : + return json_header.get(value_name); + case UINT32 : + return json_header.get(value_name); + case UINT64 : + return json_header.get(value_name); + case INT8 : + return json_header.get(value_name); + case INT16 : + return json_header.get(value_name); + case INT32 : + return json_header.get(value_name); + case INT64 : + return json_header.get(value_name); + case FLOAT32 : + return json_header.get(value_name); + case FLOAT64 : + return json_header.get(value_name); + default: + stringstream error_message; + error_message << "[ZmqReceiver::get_value_from_json] Unknown value type for header value " << value_name << endl; + + throw runtime_error(error_message.str()); + } +} + +shared_ptr ZmqReceiver::read_json_header(const string& header) +{ + stringstream header_stream; + header_stream << header << endl; + pt::read_json(header_stream, json_header); + + auto header_data = make_shared(); + + header_data->frame_index = json_header.get("frame"); + + for (const auto& item : json_header.get_child("shape")) { + header_data->frame_shape.push_back(item.second.get_value()); + } + + // Array 1.0 specified little endian as the default encoding. + header_data->endianness = json_header.get("endianness", "little"); + + header_data->type = json_header.get("type"); + + for (const auto& value_mapping : *header_values_type) { + + const auto& name = value_mapping.first; + const auto& data_type = value_mapping.second; + const boost::any& value = get_value_from_json(json_header, name, data_type); + + header_data->header_values.insert( + {name, value} + ); + } + + return header_data; +} \ No newline at end of file diff --git a/src/ZmqReceiver.hpp b/src/ZmqReceiver.hpp new file mode 100644 index 0000000..e91c594 --- /dev/null +++ b/src/ZmqReceiver.hpp @@ -0,0 +1,57 @@ +#ifndef ZMQRECEIVER_H +#define ZMQRECEIVER_H + +#include +#include +#include +#include +#include +#include +#include + +#include "RingBuffer.hpp" + +enum HEADER_DATA_TYPE +{ + UINT8, + UINT16, + UINT32, + UINT64, + INT8, + INT16, + INT32, + INT64, + FLOAT32, + FLOAT64 +}; + +class ZmqReceiver +{ + const std::string connect_address; + const int n_io_threads; + const int receive_timeout; + std::shared_ptr receiver = NULL; + std::shared_ptr context = NULL; + zmq::message_t message_header; + zmq::message_t message_data; + boost::property_tree::ptree json_header; + + std::shared_ptr> header_values_type = NULL; + + std::shared_ptr read_json_header(const std::string& header); + + boost::any get_value_from_json(const boost::property_tree::ptree& json_header, + const std::string& value_name, const HEADER_DATA_TYPE data_type); + + public: + ZmqReceiver(const std::string& connect_address, const int n_io_threads, const int receive_timeout); + + virtual ~ZmqReceiver(){}; + + void connect(); + + std::pair, char*> receive(); + +}; + +#endif \ No newline at end of file