Adding zmq receiver

This commit is contained in:
2021-09-26 15:46:20 +02:00
parent c7b91f1ce9
commit 77aae103bf
8 changed files with 538 additions and 0 deletions
+2
View File
@@ -37,6 +37,8 @@ add_subdirectory("jf-assembler")
add_subdirectory("jf-zmqstreamer")
add_subdirectory("sf-stream")
add_subdirectory("sf-writer")
add_subdirectory("zmq-receiver")
if(BUILD_JF_LIVE_WRITER)
add_subdirectory("jf-live-writer")
+12
View File
@@ -0,0 +1,12 @@
# Build rules for the zmq-receiver component: a static support library
# plus the zmq_receiver executable.
# NOTE(review): GLOB picks up every src/*.cpp, which presumably includes
# src/main.cpp in the static library as well -- confirm this is intended.
file(GLOB SOURCES src/*.cpp)
add_library(zmq-receiver-lib STATIC ${SOURCES})
# Headers under include/ are part of the library's public interface.
target_include_directories(zmq-receiver-lib PUBLIC include/)
target_link_libraries(zmq-receiver-lib external core-buffer-lib)
# The executable reuses the library; installed binary name is zmq_receiver.
add_executable(zmq-receiver src/main.cpp)
set_target_properties(zmq-receiver PROPERTIES OUTPUT_NAME zmq_receiver)
# Links ZMQ plus the HDF5 C and C++ system libraries.
target_link_libraries(zmq-receiver zmq-receiver-lib zmq rt pthread hdf5 hdf5_cpp)
# Tests live under test/.
enable_testing()
add_subdirectory(test/)
+147
View File
@@ -0,0 +1,147 @@
#ifndef SF_DAQ_BUFFER_TYPES_HPP
#define SF_DAQ_BUFFER_TYPES_HPP

#include <algorithm>
#include <cstring>
#include <iostream>
#include <string>
#include <utility>
#include <vector>

#include "rapidjson/document.h"
#include "../../core-buffer/include/buffer_config.hpp"
#include "../../core-buffer/include/formats.hpp"
#include "Hdf5Writer.hpp"
#include "dict_t.hpp"

/** Accumulates per-image metadata and pixel data arriving from the ZMQ
 *  stream and flushes them to an HDF5 file whenever the cache fills up. */
class ImageMetadataCache{
protected:
    // General container: one vector per metadata field, keyed by name.
    dict::dict_t hsh;
    // Metadata for file IO
    std::string base_path;
    std::string detector_name;
    // Block metadata (never assigned in the visible code - reserved for
    // pulse-id range bookkeeping).
    uint64_t block_start_pulse_id;
    uint64_t block_stop_pulse_id;
    // Caching indices
    bool is_first_run = true;   // pixel buffer is sized lazily on first image
    size_t m_buffer_size;       // number of images cached per output file
    size_t m_block_size = 0;    // bytes per image; set in initBuffer()
    uint64_t write_idx = 0;
    uint64_t run_id = 0;
public:
    /** @param base_path      output directory (a trailing '/' is stripped)
     *  @param detector_name  used for the HDF5 group names
     *  @param bs             number of images cached before each flush */
    ImageMetadataCache(std::string base_path, std::string detector_name,
                       size_t bs = buffer_config::BUFFER_BLOCK_SIZE):
            base_path(std::move(base_path)),
            detector_name(std::move(detector_name)) {
        m_buffer_size = bs;
        // Nice usability feature: accept paths with a trailing slash.
        // (Fixed: the original compared &back() against a string literal -
        // always false - and popped the shadowing parameter, not the member.)
        if(!this->base_path.empty() && this->base_path.back() == '/'){
            this->base_path.pop_back();
        }
        // Fill up the hash according to ImageMetadata schema
        hsh.set("version",  std::vector<uint64_t>(bs) );
        hsh.set("id",       std::vector<uint64_t>(bs) );
        hsh.set("height",   std::vector<uint64_t>(bs) );
        hsh.set("width",    std::vector<uint64_t>(bs) );
        hsh.set("dtype",    std::vector<uint16_t>(bs) );
        hsh.set("encoding", std::vector<uint16_t>(bs) );
        hsh.set("array_id", std::vector<uint16_t>(bs) );
        hsh.set("status",   std::vector<uint16_t>(bs) );
        hsh.set("user_1",   std::vector<uint64_t>(bs) );
        hsh.set("user_2",   std::vector<uint64_t>(bs) );
    };

    /** True once every slot of the cache holds an image.
     *  (Fixed: compares against m_buffer_size, not the global constant,
     *  so a custom 'bs' constructor argument is honored.) */
    bool is_full(){
        return write_idx >= m_buffer_size;
    };

    /** Parse one image's JSON metadata and copy its pixel data into the
     *  cache; flushes the cache to disk first when it is full.
     *  @param meta      pointer to the JSON metadata blob (not NUL terminated)
     *  @param meta_size length of the JSON blob in bytes
     *  @param data      pointer to the raw pixel data
     *  @param data_size length of the pixel data in bytes */
    void append(void* meta, size_t meta_size, void* data, size_t data_size){
        std::string json_string(static_cast<char*>(meta), meta_size);
        rapidjson::Document json_doc;
        json_doc.Parse(json_string.c_str());

        // Enforce flushing when full
        if(is_full()){ write_to_disk(); }
        // Size the pixel buffer from the first image's geometry.
        if(is_first_run){ initBuffer(json_doc); is_first_run = false; }

        // Update the hash. 64-bit fields are read with GetUint64() so
        // large values (e.g. pulse ids) are not truncated through int.
        hsh.get<std::vector<uint64_t>&>("version")[write_idx] = json_doc["version"].GetUint64();
        hsh.get<std::vector<uint64_t>&>("id")[write_idx] = json_doc["id"].GetUint64();
        hsh.get<std::vector<uint64_t>&>("height")[write_idx] = json_doc["height"].GetUint64();
        hsh.get<std::vector<uint64_t>&>("width")[write_idx] = json_doc["width"].GetUint64();
        hsh.get<std::vector<uint16_t>&>("dtype")[write_idx] = static_cast<uint16_t>(json_doc["dtype"].GetUint());
        hsh.get<std::vector<uint16_t>&>("encoding")[write_idx] = static_cast<uint16_t>(json_doc["encoding"].GetUint());
        hsh.get<std::vector<uint16_t>&>("array_id")[write_idx] = static_cast<uint16_t>(json_doc["array_id"].GetUint());
        hsh.get<std::vector<uint16_t>&>("status")[write_idx] = static_cast<uint16_t>(json_doc["status"].GetUint());
        hsh.get<std::vector<uint64_t>&>("user_1")[write_idx] = json_doc["user_1"].GetUint64();
        hsh.get<std::vector<uint64_t>&>("user_2")[write_idx] = json_doc["user_2"].GetUint64();

        // Hard coded uint16 pixel type for now.
        // NOTE: the vector MUST be taken by reference - the previous
        // by-value copy made memcpy write into a temporary, so the pixel
        // data never reached the cache.
        auto& data_buf = hsh.get<std::vector<uint16_t>&>("data");
        // m_block_size is in BYTES while the buffer is indexed in uint16_t
        // elements; convert so images are stored densely without gaps.
        const size_t elems_per_image = m_block_size / sizeof(uint16_t);
        std::memcpy(data_buf.data() + write_idx * elems_per_image,
                    data, std::min(data_size, m_block_size));
        // Pop index
        write_idx++;
    };

    /** Allocate the pixel-data buffer once the image geometry is known. */
    void initBuffer(rapidjson::Document& meta){
        size_t dsize = 2;   // hard coded: 2 bytes per pixel (uint16)
        m_block_size = meta["width"].GetUint64() * meta["height"].GetUint64() * dsize;
        std::cout << "Block size: " << m_block_size
                  << " ( " << meta["width"].GetInt() << " x "
                  << meta["height"].GetInt() << " ) " << std::endl;
        // Hard coded type for now. Allocate in ELEMENTS, not bytes
        // (the original over-allocated by sizeof(uint16_t)).
        hsh.set("data", std::vector<uint16_t>(m_buffer_size * (m_block_size / dsize)) );
    }

    /** Flush the cached metadata vectors and pixel data to a new HDF5
     *  file named Run<run_id>_Batch.hdf5 under base_path, then reset the
     *  write index and bump run_id.
     *  NOTE(review): the full buffer is written even when only partially
     *  filled - trailing slots contain stale/zero data; confirm intended. */
    void write_to_disk(){
        std::cout << "Writing ImageMetadata cache to disk" << std::endl;
        write_idx = 0;
        std::string filename = base_path + "/Run" + std::to_string(run_id) + "_Batch.hdf5";
        Hdf5Writer writer(filename);
        writer.createGroup("/data/");
        writer.createGroup("/data/" + detector_name);
        writer.createGroup("/general/");
        writer.createGroup("/general/" + detector_name);
        writer.writeVector(hsh.get<std::vector<uint64_t>&>("version"),  "/data/" + detector_name + "/version");
        writer.writeVector(hsh.get<std::vector<uint64_t>&>("id"),       "/data/" + detector_name + "/id");
        writer.writeVector(hsh.get<std::vector<uint64_t>&>("width"),    "/data/" + detector_name + "/width");
        writer.writeVector(hsh.get<std::vector<uint64_t>&>("height"),   "/data/" + detector_name + "/height");
        writer.writeVector(hsh.get<std::vector<uint16_t>&>("dtype"),    "/data/" + detector_name + "/dtype");
        writer.writeVector(hsh.get<std::vector<uint16_t>&>("encoding"), "/data/" + detector_name + "/encoding");
        writer.writeVector(hsh.get<std::vector<uint16_t>&>("array_id"), "/data/" + detector_name + "/array_id");
        writer.writeVector(hsh.get<std::vector<uint16_t>&>("status"),   "/data/" + detector_name + "/status");
        writer.writeVector(hsh.get<std::vector<uint64_t>&>("user_1"),   "/data/" + detector_name + "/user_1");
        writer.writeVector(hsh.get<std::vector<uint64_t>&>("user_2"),   "/data/" + detector_name + "/user_2");
        std::cout << "Writing data: " << hsh.get<std::vector<uint16_t>&>("data").size() << std::endl;
        writer.writeArray(hsh.get<std::vector<uint16_t>&>("data"), "/data/" + detector_name + "/data");
        run_id++;
    };
};
#endif //SF_DAQ_BUFFER_TYPES_HPP
+78
View File
@@ -0,0 +1,78 @@
#ifndef SF_DAQ_BUFFER_HDF5_WRITER_HPP
#define SF_DAQ_BUFFER_HDF5_WRITER_HPP

#include <hdf5.h>
#include <H5Cpp.h>

#include <cstdint>
#include <iostream>
#include <string>
#include <type_traits>
#include <vector>

/** Small RAII helper around the HDF5 C++ API: truncates/creates a file
 *  on construction and writes std::vectors as datasets. The file is
 *  closed by H5::H5File's destructor. */
class Hdf5Writer {
protected:
    H5::H5File m_file;

    /** Map a C++ arithmetic type onto the matching native HDF5 datatype.
     *  Falls back to NATIVE_CHAR for unrecognized types.
     *  (Compile-time std::is_same comparisons replace the RTTI/typeid
     *  chain - no behavior change, but no RTTI dependency.) */
    template <typename TY>
    H5::DataType get_datatype_for(){
        if(std::is_same<TY, float>::value)    return H5::PredType::NATIVE_FLOAT;
        if(std::is_same<TY, double>::value)   return H5::PredType::NATIVE_DOUBLE;
        if(std::is_same<TY, char>::value)     return H5::PredType::NATIVE_CHAR;
        if(std::is_same<TY, short>::value)    return H5::PredType::NATIVE_SHORT;
        if(std::is_same<TY, long>::value)     return H5::PredType::NATIVE_LONG;
        if(std::is_same<TY, int8_t>::value)   return H5::PredType::NATIVE_INT8;
        if(std::is_same<TY, uint8_t>::value)  return H5::PredType::NATIVE_UINT8;
        if(std::is_same<TY, int16_t>::value)  return H5::PredType::NATIVE_INT16;
        if(std::is_same<TY, uint16_t>::value) return H5::PredType::NATIVE_UINT16;
        if(std::is_same<TY, int32_t>::value)  return H5::PredType::NATIVE_INT32;
        if(std::is_same<TY, uint32_t>::value) return H5::PredType::NATIVE_UINT32;
        if(std::is_same<TY, int64_t>::value)  return H5::PredType::NATIVE_INT64;
        if(std::is_same<TY, uint64_t>::value) return H5::PredType::NATIVE_UINT64;
        // If all fails
        return H5::PredType::NATIVE_CHAR;
    };

public:
    /** Open (and truncate) the output file. */
    Hdf5Writer(std::string filename): m_file(filename, H5F_ACC_TRUNC) {
        // Stop vomiting to console
        H5::Exception::dontPrint();
    };

    /** Create an HDF5 group; parent groups must already exist. */
    void createGroup(const std::string& groupname){ m_file.createGroup(groupname); };

    /** Write a vector as a one-dimensional dataset at ipath. */
    template <typename TY>
    void writeVector(const std::vector<TY>& data_ref, const std::string& ipath){
        /* Allocating containers for the data */
        H5::DataType ds_type = this->get_datatype_for<TY>();
        hsize_t ds_dims[1] = { data_ref.size() };
        H5::DataSpace ds_space(1, ds_dims);
        H5::DataSet dataset = m_file.createDataSet(ipath, ds_type, ds_space);
        /* Writing array to hdf5 file */
        dataset.write(data_ref.data(), ds_type, H5S_ALL, ds_space);
        /* Close dataset */
        dataset.close();
    };

    /** Write a vector as an n-dimensional dataset; shape_ref holds the
     *  extent of each dimension (their product should equal
     *  data_ref.size()). */
    template <typename TY>
    void writeArray(const std::vector<TY>& data_ref, const std::vector<TY>& shape_ref,
                    const std::string& ipath){
        // std::vector replaces the previous variable-length array, which
        // is not standard C++ (gcc extension only).
        std::vector<hsize_t> ds_dims(shape_ref.begin(), shape_ref.end());
        H5::DataType ds_type = this->get_datatype_for<TY>();
        H5::DataSpace ds_space(static_cast<int>(ds_dims.size()), ds_dims.data());
        H5::DataSet dataset = m_file.createDataSet(ipath, ds_type, ds_space);
        /* Writing array to hdf5 file */
        dataset.write(data_ref.data(), ds_type, H5S_ALL, ds_space);
        /* Close dataset */
        dataset.close();
    };

    /** Convenience overload: write the data as a flat 1-D dataset when the
     *  caller tracks no explicit shape. Needed by ImageMetadataCache,
     *  which invokes writeArray(data, path) with two arguments. */
    template <typename TY>
    void writeArray(const std::vector<TY>& data_ref, const std::string& ipath){
        writeVector(data_ref, ipath);
    };
}; // class Hdf5Writer
#endif //SF_DAQ_BUFFER_HDF5_WRITER_HPP
+162
View File
@@ -0,0 +1,162 @@
#ifndef DICT_T_HPP
#define DICT_T_HPP

#include <any>
#include <unordered_map>
#include <typeinfo>
#include <list>
#include <iostream>
#include <ostream>
#include <algorithm>
#include <stdexcept>

namespace dict {
/**Python-like dictionary class for C++
It uses 'std::any' to provide common storage and ease the strict typing of C++.
The exact type still must be known at insertion and retrieval but not during storage.
NOTE: This hash was never meant to be fast, but clean and maintainable!
NOTE: The variables are passed and retrieved by value or reference (see std::any for details)!**/
class dict_t {
    public:
        dict_t() {};
        virtual ~dict_t() {};

        /**Add a variable to the hash
        The type must be known for insertion.
        NOTE: The variable is passed by value!**/
        template<typename TY>
        void set(const std::string& key, TY val) {
            vhash[key] = val;
        }

        /**Add a variable to the hash as a std::any value
        NOTE: The variable is passed by value!**/
        void set_raw(const std::string& key, std::any val) {
            vhash[key] = val;
        }

        /**Retrieve a variable from the hash
        The type must be known beforehand for successful retrieval. TY may be
        a reference type (e.g. std::vector<int>&) to obtain a reference into
        the stored value.
        @throws std::invalid_argument if the key is missing
        @throws std::bad_any_cast if TY does not match the stored type
        NOTE: The variable is returned by value unless TY is a reference!**/
        template<typename TY>
        TY get(const std::string& key) {
            // Single find() - the previous has()+operator[] pair did two
            // lookups and could insert an empty entry.
            auto it = vhash.find(key);
            if(it == vhash.end()){
                throw std::invalid_argument(
                    "KeyError: Unable to find key '" + key + "' in the current dictionary.");
            }
            try{
                return std::any_cast<TY>(it->second);
            } catch (const std::bad_any_cast& e){
                std::cerr << e.what() << std::string(" thrown for key ") << key << std::endl;
                throw;   // rethrow the original exception, not a fresh one
            }
        }

        /**Retrieve a variable from the hash as a std::any value
        NOTE: The variable is returned by value!**/
        std::any get_raw(const std::string& key) const {
            auto it = vhash.find(key);
            if(it == vhash.end()){
                throw std::invalid_argument(
                    "KeyError: Unable to find key '" + key + "' in the current dictionary.");
            }
            return it->second;
        }

        /**Delete a key from the hash to free memory.**/
        void del(const std::string& key) {
            vhash.erase(key);
        }

        /**Get the list of keys in the hash**/
        std::list<std::string> keys() const {
            std::list<std::string> out;
            for( const auto& item: vhash ) {
                out.push_back(item.first);
            }
            return out;
        }

        /**Check if the key is in the hash.
        O(1) map lookup - the previous version materialized the full key
        list and scanned it linearly.**/
        bool has(const std::string& key) const {
            return vhash.find(key) != vhash.end();
        }

        /**Copy a list of parameters into a new dict_t.
        Missing keys are skipped (the previous operator[] access silently
        inserted empty entries into the SOURCE dictionary).**/
        dict_t select(const std::list<std::string>& sel_keys) const {
            dict_t newTable;
            for( const auto& key: sel_keys ) {
                auto it = vhash.find(key);
                if(it != vhash.end()){ newTable.set_raw(key, it->second); }
            }
            return newTable;
        }

        /**Merge another dict_t into the current one
        NOTE: Duplicate keys will be overwritten!**/
        void merge(const dict_t& other) {
            for( const auto& item: other.vhash ) {
                vhash[item.first] = item.second;
            }
        }

        /**The number of elements in the hash**/
        size_t size() const {
            return this->vhash.size();
        }

        /**Helper function to gather type information about the stored data.
        Returns false for missing keys without inserting an empty entry.**/
        template<typename TY>
        bool is_type(const std::string& key) const {
            auto it = vhash.find(key);
            return it != vhash.end() && it->second.type() == typeid(TY);
        }

        /**Assignment operator
        Guards against self-assignment - the previous version cleared the
        container first and therefore destroyed the data when assigning a
        dictionary to itself.**/
        dict_t& operator=(const dict_t& other) {
            if(this != &other){
                this->vhash = other.vhash;
            }
            return *this;
        }

        /**Merge operator
        A single const-reference overload replaces the previous
        (dict_t&)/(dict_t) pair, whose calls with lvalue arguments were
        ambiguous under overload resolution.**/
        dict_t& operator+=(const dict_t& other) {
            this->merge(other);
            return *this;
        }

        /**Array subscript operator is same as get()**/
        template<typename TY>
        TY operator[](const std::string& key) {
            return this->get<TY>(key);
        }

    protected:
        /**The actual container that stores key - value pairs
        It uses std::any to allow lazy typing.**/
        std::unordered_map<std::string, std::any> vhash;
};

/**Print the array to the output stream**/
std::ostream& operator<<( std::ostream&, dict_t&);
} /**End namespace**/
#endif /* DICT_T_HPP */
+39
View File
@@ -0,0 +1,39 @@
#include "dict_t.hpp"

/**Print the hash into the output stream
Each key is printed with its value (for the supported types below) and the
mangled type name; keys of unsupported types are printed without a value.**/
std::ostream& dict::operator<<( std::ostream &output, dict::dict_t& hsh) {
    output << "Printing known key-value pairs in hash table:\n";
    for( const auto& key: hsh.keys() ) {
        // Generic printer: emits "key \t value \t typename" if the stored
        // value is of the probed type, otherwise reports failure.
        auto print_as = [&output, &hsh, &key](auto probe) -> bool {
            using TY = decltype(probe);
            if( !hsh.is_type<TY>(key) ) { return false; }
            output << key << "\t" << hsh.get<TY>(key) << "\t"
                   << hsh.get_raw(key).type().name() << "\n";
            return true;
        };
        // Probe the supported types in the same order as before; the
        // short-circuit stops at the first match.
        const bool printed =
               print_as(std::string{})
            || print_as(double{})   || print_as(float{})
            || print_as(int64_t{})  || print_as(uint64_t{})
            || print_as(int32_t{})  || print_as(uint32_t{})
            || print_as(int16_t{})  || print_as(uint16_t{})
            || print_as(int8_t{})   || print_as(uint8_t{});
        // Unknown type: print only the key and the type name.
        if( !printed ) {
            output << key << "\t" << hsh.get_raw(key).type().name() << "\n";
        }
    }
    // Flush buffer
    output << std::endl;
    return output;
}
+98
View File
@@ -0,0 +1,98 @@
#include <zmq.hpp>

#include <iostream>
#include <sstream>
#include <string>

#include "../../core-buffer/include/buffer_config.hpp"
#include "../include/BufferTypes.hpp"

/** Stand-alone ZMQ receiver: subscribes to a stream of three-part
 *  (topic + metadata + data) messages and caches IMAGEDATA frames to
 *  HDF5 via ImageMetadataCache. */
int main (int argc, char *argv[]){
    if (argc != 4) {
        // Usage names the installed binary (zmq_receiver, see CMakeLists);
        // the previous message referred to the wrong tool (jf_buffer_writer).
        std::cout << "\nERROR\nUsage: zmq_receiver [zmq_topic] [zmq_sub_addr] [detector_name]\n";
        std::cout << "Topic corresponds to a supported format: IMAGEDATA\n";
        return -1;
    }

    const std::string topic(argv[1]);
    const std::string sub_raw(argv[2]);
    const std::string detector_name(argv[3]);
    const std::string sub_addr("tcp://" + sub_raw);
    std::cout << "Starting ZMQ receiver at:\nPORT:\t" << sub_addr
              << "\nTOPIC:\t" << topic << std::endl;

    // Allocate the RamBuffer
    //RamBuffer buffer(config.detector_name, config.n_modules);

    // ZMQ communication setup
    std::cout << "Subscribing to server...\n" << std::endl;
    zmq::context_t context (1);

    // Subscribe to TOPIC (expected schema)
    zmq::socket_t subscriber (context, ZMQ_SUB);
    subscriber.connect(sub_addr.c_str());
    subscriber.setsockopt(ZMQ_SUBSCRIBE, topic.c_str(), topic.size());

    // Re-publisher socket; HWM/linger tuning and the ipc address scheme
    // are left commented out until needed.
    std::cout << "Creating publisher...\n" << std::endl;
    zmq::socket_t republisher (context, ZMQ_PUB);
    // const int sndhwm = buffer_config::BUFFER_ZMQ_SNDHWM;
    // republisher.setsockopt(ZMQ_SNDHWM, &sndhwm, sizeof(sndhwm));
    // const int linger = 0;
    // republisher.setsockopt(ZMQ_LINGER, &linger, sizeof(linger));
    // std::string ipc_addr = buffer_config::BUFFER_LIVE_IPC_URL + detector_name + "-assembler";
    std::string ipc_addr = "tcp://*:5333";
    try{
        republisher.bind(ipc_addr.c_str());
    } catch (const std::exception& ex){
        std::string msg = "Failed to bind publisher to address '" + ipc_addr + "': " + ex.what();
        throw std::runtime_error(msg);
    }

    zmq::message_t msg_topic;
    zmq::message_t msg_meta;
    zmq::message_t msg_data;
    ImageMetadataCache cache("./dataout", "raspicam");

    std::cout << "I'm listening...\n" << std::endl;
    // Receive a fixed number of multipart messages (100000), then exit.
    for (int idx = 0; idx < 100000; idx++) {
        // ZMQ guarantees full delivery of multipart messages!
        // Packets are sent as three part messages: topic + meta + data
        subscriber.recv(&msg_topic, 0);
        subscriber.recv(&msg_meta, 0);
        subscriber.recv(&msg_data, 0);

        // Schema (topic) specific saving
        if(topic == "IMAGEDATA"){
            cache.append(msg_meta.data(), msg_meta.size(), msg_data.data(), msg_data.size());
            //buffer.write_image((ImageMetadata*)msg_meta.data(), (char*)msg_data.data);
            if(idx % 100 == 0){
                std::cout << "Received " << idx << " (at size " << msg_data.size() << " )" << std::endl;
            }
        }
    }
    return 0;
}
View File