diff --git a/CMakeLists.txt b/CMakeLists.txt
index e3575ad..03e67fe 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -37,6 +37,8 @@
 add_subdirectory("jf-assembler")
 add_subdirectory("jf-zmqstreamer")
 add_subdirectory("sf-stream")
 add_subdirectory("sf-writer")
+add_subdirectory("zmq-receiver")
+
 if(BUILD_JF_LIVE_WRITER)
     add_subdirectory("jf-live-writer")
diff --git a/zmq-receiver/CMakeLists.txt b/zmq-receiver/CMakeLists.txt
new file mode 100644
index 0000000..9a564db
--- /dev/null
+++ b/zmq-receiver/CMakeLists.txt
@@ -0,0 +1,12 @@
+file(GLOB SOURCES src/*.cpp)
+
+add_library(zmq-receiver-lib STATIC ${SOURCES})
+target_include_directories(zmq-receiver-lib PUBLIC include/)
+target_link_libraries(zmq-receiver-lib external core-buffer-lib)
+
+add_executable(zmq-receiver src/main.cpp)
+set_target_properties(zmq-receiver PROPERTIES OUTPUT_NAME zmq_receiver)
+target_link_libraries(zmq-receiver zmq-receiver-lib zmq rt pthread hdf5 hdf5_cpp)
+
+enable_testing()
+add_subdirectory(test/)
diff --git a/zmq-receiver/include/BufferTypes.hpp b/zmq-receiver/include/BufferTypes.hpp
new file mode 100644
index 0000000..f769334
--- /dev/null
+++ b/zmq-receiver/include/BufferTypes.hpp
@@ -0,0 +1,147 @@
+#ifndef SF_DAQ_BUFFER_TYPES_HPP
+#define SF_DAQ_BUFFER_TYPES_HPP
+
+// NOTE(review): all '<...>' tokens of this patch were lost in transit;
+// the include names and template arguments were reconstructed from usage.
+#include <algorithm>   // std::min
+#include <cstdint>
+#include <cstring>     // std::memcpy
+#include <iostream>
+#include <string>
+#include <vector>
+#include "rapidjson/document.h"
+
+#include "../../core-buffer/include/buffer_config.hpp"
+#include "../../core-buffer/include/formats.hpp"
+#include "Hdf5Writer.hpp"
+#include "dict_t.hpp"
+
+
+// Caches per-image metadata (and the raw pixel payload) in RAM and flushes
+// the accumulated block to a new HDF5 file whenever the buffer fills up.
+class ImageMetadataCache{
+protected:
+    // General container
+    dict::dict_t hsh;
+    // Metadata for file IO
+    std::string base_path;
+    std::string detector_name;
+
+    // Block metadata
+    uint64_t block_start_pulse_id;
+    uint64_t block_stop_pulse_id;
+
+    // Caching indices
+    bool is_first_run = true;
+    size_t m_buffer_size;   // number of images per flushed block
+    size_t m_block_size;    // bytes per image, derived from the first frame
+    uint64_t write_idx = 0; // fixed stray ';;'
+    uint64_t run_id = 0;
+
+public:
+    // base_path: output directory; detector_name: HDF5 group name;
+    // bs: images to cache before each flush.
+    ImageMetadataCache(std::string base_path, std::string detector_name, size_t bs = 
buffer_config::BUFFER_BLOCK_SIZE):
+        base_path(base_path), detector_name(detector_name) {
+        m_buffer_size = bs;
+
+        // Usability: accept a base path with a trailing slash.
+        // BUG FIX (was `&base_path.back()=="/"`): the original compared the
+        // ADDRESS of back() to a string literal (never true), and it stripped
+        // the slash from the already-consumed constructor argument instead of
+        // the member, so it had no effect at all.
+        if(!this->base_path.empty() && this->base_path.back() == '/'){
+            this->base_path.pop_back();
+        }
+
+        // Fill up the hash according to ImageMetadata schema
+        hsh.set("version",  std::vector<uint64_t>(bs) );
+        hsh.set("id",       std::vector<uint64_t>(bs) );
+        hsh.set("height",   std::vector<uint64_t>(bs) );
+        hsh.set("width",    std::vector<uint64_t>(bs) );
+        hsh.set("dtype",    std::vector<uint64_t>(bs) );
+        hsh.set("encoding", std::vector<uint64_t>(bs) );
+        hsh.set("array_id", std::vector<uint64_t>(bs) );
+        hsh.set("status",   std::vector<uint64_t>(bs) );
+        hsh.set("user_1",   std::vector<uint64_t>(bs) );
+        hsh.set("user_2",   std::vector<uint64_t>(bs) );
+    };
+
+    // True once the cache holds m_buffer_size images and must be flushed.
+    bool is_full(){
+        // BUG FIX: compare against the configured size, not the compile-time
+        // default — they differ whenever a custom `bs` was passed.
+        return write_idx >= m_buffer_size;
+    };
+
+    // Append one image: `meta` is a JSON blob of ImageMetadata fields,
+    // `data` is the raw pixel payload of `data_size` bytes.
+    void append(void* meta, size_t meta_size, void* data, size_t data_size){
+
+        std::string jason_string((char*)meta, meta_size);
+        std::cout << jason_string << std::endl;
+
+        rapidjson::Document jason_parsed;
+        jason_parsed.Parse(jason_string.c_str());
+
+        // Enforce flushing when full
+        if(is_full()){ write_to_disk(); }
+        // The first image defines the frame geometry (see initBuffer)
+        if(is_first_run){ initBuffer(jason_parsed); is_first_run=false; }
+
+        // Update the hash
+        hsh.get<std::vector<uint64_t>&>("version")[write_idx] = jason_parsed["version"].GetInt();
+        hsh.get<std::vector<uint64_t>&>("id")[write_idx] = jason_parsed["id"].GetInt();
+        hsh.get<std::vector<uint64_t>&>("height")[write_idx] = jason_parsed["height"].GetInt();
+        hsh.get<std::vector<uint64_t>&>("width")[write_idx] = jason_parsed["width"].GetInt();
+        hsh.get<std::vector<uint64_t>&>("dtype")[write_idx] = jason_parsed["dtype"].GetInt();
+        hsh.get<std::vector<uint64_t>&>("encoding")[write_idx] = jason_parsed["encoding"].GetInt();
+        hsh.get<std::vector<uint64_t>&>("array_id")[write_idx] = jason_parsed["array_id"].GetInt();
+        hsh.get<std::vector<uint64_t>&>("status")[write_idx] = jason_parsed["status"].GetInt();
+
hsh.get<std::vector<uint64_t>&>("user_1")[write_idx] = jason_parsed["user_1"].GetInt();
+        hsh.get<std::vector<uint64_t>&>("user_2")[write_idx] = jason_parsed["user_2"].GetInt();
+
+        std::cout << "hashed" << std::endl;
+
+        // Hard coded type for now.
+        // BUG FIX: bind by reference — plain `auto` would COPY the whole
+        // pixel buffer and the memcpy below would write into the copy.
+        auto& data_buf = hsh.get<std::vector<char>&>("data");
+        std::memcpy(&data_buf[write_idx*m_block_size], data, std::min(data_size, m_block_size));
+
+        // Pop index
+        write_idx++;
+    };
+
+    // Derive the per-image byte size from the first image's geometry and
+    // allocate the pixel cache accordingly.
+    void initBuffer(rapidjson::Document& meta){
+        size_t dsize = 2;  // hard coded: 2 bytes per pixel for now
+        m_block_size = meta["width"].GetInt() * meta["height"].GetInt() * dsize;
+        std::cout << "Block size: " << m_block_size << " ( " << meta["width"].GetInt() << " x " << meta["height"].GetInt() << " ) " << std::endl;
+
+        // Hard coded type for now
+        hsh.set("data", std::vector<char>(m_buffer_size * m_block_size) );
+    }
+
+    // Flush the cached block to a new HDF5 file and advance the run counter.
+    // NOTE(review): always writes the full buffer, even if only partially
+    // filled — confirm this is acceptable for the last (short) block.
+    void write_to_disk(){
+        std::cout << "Writing ImageMetadata cache to disk" << std::endl;
+        write_idx = 0;
+
+        std::string filename = base_path + "/Run" + std::to_string(run_id) + "_Batch.hdf5";
+        Hdf5Writer writer(filename);
+        writer.createGroup("/data/");
+        writer.createGroup("/data/" + detector_name);
+        writer.createGroup("/general/");
+        writer.createGroup("/general/" + detector_name);
+
+        writer.writeVector(hsh.get<std::vector<uint64_t>&>("version"), "/data/" + detector_name + "/version");
+        writer.writeVector(hsh.get<std::vector<uint64_t>&>("id"), "/data/" + detector_name + "/id");
+        writer.writeVector(hsh.get<std::vector<uint64_t>&>("width"), "/data/" + detector_name + "/width");
+        writer.writeVector(hsh.get<std::vector<uint64_t>&>("height"), "/data/" + detector_name + "/height");
+        writer.writeVector(hsh.get<std::vector<uint64_t>&>("dtype"), "/data/" + detector_name + "/dtype");
+        writer.writeVector(hsh.get<std::vector<uint64_t>&>("encoding"), "/data/" + detector_name + "/encoding");
+        writer.writeVector(hsh.get<std::vector<uint64_t>&>("array_id"), "/data/" + detector_name + "/array_id");
+        writer.writeVector(hsh.get<std::vector<uint64_t>&>("status"), "/data/" + detector_name + "/status");
+        writer.writeVector(hsh.get<std::vector<uint64_t>&>("user_1"), "/data/" + detector_name + "/user_1");
+        writer.writeVector(hsh.get<std::vector<uint64_t>&>("user_2"), "/data/" + detector_name + "/user_2");
+
+        std::cout << "Writing data: " << 
hsh.get<std::vector<char>&>("data").size() << std::endl;
+        // BUG FIX: writeArray() is declared as (data, shape, path); the
+        // original call omitted the shape vector and could not compile.
+        writer.writeArray(hsh.get<std::vector<char>&>("data"),
+                          std::vector<hsize_t>{ (hsize_t)m_buffer_size, (hsize_t)m_block_size },
+                          "/data/" + detector_name + "/data");
+
+        run_id++;
+    };
+
+};
+
+#endif //SF_DAQ_BUFFER_TYPES_HPP
diff --git a/zmq-receiver/include/Hdf5Writer.hpp b/zmq-receiver/include/Hdf5Writer.hpp
new file mode 100644
index 0000000..596134d
--- /dev/null
+++ b/zmq-receiver/include/Hdf5Writer.hpp
@@ -0,0 +1,78 @@
+#ifndef SF_DAQ_BUFFER_HDF5_WRITER_HPP
+#define SF_DAQ_BUFFER_HDF5_WRITER_HPP
+
+// NOTE(review): include names were lost in the original patch; reconstructed.
+#include <cstdint>
+#include <string>
+#include <typeinfo>
+#include <vector>
+#include "H5Cpp.h"
+
+
+// Thin convenience wrapper around H5::H5File for writing 1-D vectors and
+// N-D arrays of arithmetic types. The file is truncated on construction
+// and closed by H5File's destructor (RAII).
+class Hdf5Writer {
+protected:
+    H5::H5File m_file;
+
+    // Map a C++ arithmetic type to the matching native HDF5 datatype.
+    // Falls back to NATIVE_CHAR if the type is unknown.
+    template <typename TY>
+    H5::DataType get_datatype_for(){
+        if(typeid(TY) == typeid(float) ) return H5::PredType::NATIVE_FLOAT;
+        if(typeid(TY) == typeid(double) ) return H5::PredType::NATIVE_DOUBLE;
+        if(typeid(TY) == typeid(char) ) return H5::PredType::NATIVE_CHAR;
+        if(typeid(TY) == typeid(short) ) return H5::PredType::NATIVE_SHORT;
+        if(typeid(TY) == typeid(long) ) return H5::PredType::NATIVE_LONG;
+
+        if(typeid(TY) == typeid(int8_t) ) return H5::PredType::NATIVE_INT8;
+        if(typeid(TY) == typeid(uint8_t) ) return H5::PredType::NATIVE_UINT8;
+        if(typeid(TY) == typeid(int16_t) ) return H5::PredType::NATIVE_INT16;
+        if(typeid(TY) == typeid(uint16_t) ) return H5::PredType::NATIVE_UINT16;
+
+        if(typeid(TY) == typeid(int32_t) ) return H5::PredType::NATIVE_INT32;
+        if(typeid(TY) == typeid(uint32_t) ) return H5::PredType::NATIVE_UINT32;
+        if(typeid(TY) == typeid(int64_t) ) return H5::PredType::NATIVE_INT64;
+        if(typeid(TY) == typeid(uint64_t) ) return H5::PredType::NATIVE_UINT64;
+
+        // If all fails
+        return H5::PredType::NATIVE_CHAR;
+    };
+
+public:
+    // Opens (truncates!) `filename` for writing.
+    Hdf5Writer( std::string filename): m_file(filename, H5F_ACC_TRUNC) {
+        // Stop vomiting to console
+        H5::Exception::dontPrint();
+    };
+
+    void createGroup(std::string groupname){ m_file.createGroup(groupname); };
+
+    // Write a 1-D vector as a dataset at `ipath`.
+    template <typename TY>
+    void writeVector(const std::vector<TY>& data_ref, std::string ipath){
+        /* Allocating containers for the data */
+        H5::DataType ds_type = 
this->get_datatype_for<TY>();
+        hsize_t ds_dims[1] = { data_ref.size() };
+        H5::DataSpace ds_space(1, ds_dims);
+        H5::DataSet dataset = m_file.createDataSet(ipath, ds_type, ds_space);
+
+        /* Writing array to hdf5 file */
+        dataset.write(data_ref.data(), ds_type, H5S_ALL, ds_space);
+        /* Close dataset */
+        dataset.close();
+    };
+
+    // Write a flat buffer as an N-D dataset; `shape_ref` gives the dimensions.
+    template <typename TY>
+    void writeArray(const std::vector<TY>& data_ref, const std::vector<hsize_t>& shape_ref, std::string ipath){
+        /* Allocating containers for the data */
+        int64_t n_dim = shape_ref.size();
+        // BUG FIX: `hsize_t ds_dims[n_dim]` was a variable-length array,
+        // which is not standard C++ — use std::vector instead.
+        std::vector<hsize_t> ds_dims(n_dim);
+        for(int64_t dd=0; dd<n_dim; dd++){ ds_dims[dd] = shape_ref[dd]; }
+        H5::DataType ds_type = this->get_datatype_for<TY>();
+
+        H5::DataSpace ds_space(n_dim, ds_dims.data());
+        H5::DataSet dataset = m_file.createDataSet(ipath, ds_type, ds_space);
+
+        /* Writing array to hdf5 file */
+        dataset.write(data_ref.data(), ds_type, H5S_ALL, ds_space);
+        /* Close dataset */
+        dataset.close();
+    };
+}; // class Hdf5Writer
+
+
+#endif //SF_DAQ_BUFFER_HDF5_WRITER_HPP
diff --git a/zmq-receiver/include/dict_t.hpp b/zmq-receiver/include/dict_t.hpp
new file mode 100644
index 0000000..4f97bdd
--- /dev/null
+++ b/zmq-receiver/include/dict_t.hpp
@@ -0,0 +1,162 @@
+#ifndef __HASH_TYPE_HPP__
+#define __HASH_TYPE_HPP__
+
+// NOTE(review): include names were lost in the original patch; reconstructed.
+#include <algorithm>
+#include <any>
+#include <iostream>
+#include <list>
+#include <stdexcept>
+#include <string>
+#include <typeinfo>
+#include <unordered_map>
+
+namespace dict {
+
+    /**Python-like dictionary class for C++
+       It uses 'std::any' types to provide common storage and ease the strict typing of C++.
+       (comment fix: the code uses std::any, not boost::any)
+       The exact type still must be known at insertion and retrieval but not during storage.
+
+       NOTE: This hash was never meant to be fast, but clean and maintainable!
+       NOTE: The variables are passed and retrieved by value or reference (see std::any for details)!**/
+    class dict_t {
+    public:
+        dict_t() {};
+        virtual ~dict_t() {};
+
+        /**Add a variable to the hash
+           The type must be known for insertion. 
+           NOTE: The variable is passed by value!**/
+        template <typename TY>
+        void set(std::string key, TY val) {
+            vhash[key] = val;
+        }
+
+        /**Add a variable to the hash as a std::any value
+           NOTE: The variable is passed by value!**/
+        void set_raw(std::string key, std::any val) {
+            vhash[key] = val;
+        }
+
+        /**Retrieve a variable from the hash
+           The type must be known beforehand for successful retrieval.
+           NOTE: The variable is returned by value!**/
+        template <typename TY>
+        TY get(std::string key) {
+            if(!this->has(key)){
+                std::string msg = "KeyError: Unable to find key '" + key + "' in the current dictionary.";
+                throw std::invalid_argument(msg);
+            }
+
+            try{
+                return std::any_cast<TY>(vhash[key]);
+            } catch (const std::bad_any_cast& e){
+                std::string msg = e.what() + std::string(" thrown for key ") + key;
+                std::cerr << msg << std::endl;
+                // FIX: rethrow the caught exception instead of constructing
+                // a fresh bad_any_cast (preserves the original object).
+                throw;
+            }
+        }
+
+        /**Retrieve a variable from the hash as a std::any value
+           NOTE: The variable is returned by value!**/
+        std::any get_raw(std::string key) {
+            if(!this->has(key)){
+                std::string msg = "KeyError: Unable to find key '" + key + "' in the current dictionary.";
+                throw std::invalid_argument(msg);
+            }
+            return vhash[key];
+        }
+
+        /**Delete a key from the hash to free memory.**/
+        void del(std::string key) {
+            vhash.erase(key);
+        }
+
+        /**Get the list of keys in the hash**/
+        std::list<std::string> keys() {
+            std::list<std::string> keys;
+            for( const auto& item: vhash ) {
+                keys.push_back(item.first);
+            }
+            return keys;
+        }
+
+        /**Check if the key is in the hash**/
+        bool has(std::string key) {
+            // PERF FIX: query the map directly instead of materialising the
+            // whole key list and scanning it linearly.
+            return vhash.find(key) != vhash.end();
+        }
+
+        /**Copy a list of parameters into a new dict_t.**/
+        dict_t select(std::list<std::string> keys) {
+            dict_t newTable;
+            for( const auto& key: keys ) {
+                newTable.set(key, this->vhash[key]);
+            }
+            return newTable;
+        }
+
+        /**Merge another dict_t into the current one
+           NOTE: Duplicate keys will be overwritten! 
**/
+        void merge(dict_t& other) {
+            for( const auto& key: other.keys() ) {
+                this->set_raw(key, other.get_raw(key));
+            }
+        }
+
+        /**The number of elements in the hash**/
+        size_t size() const {
+            return this->vhash.size();
+        }
+
+        /**Helper functions to gather type information about the stored data.**/
+        template <typename TY>
+        bool is_type(std::string key) {
+            std::any a = vhash[key];
+            return (a.type()==typeid(TY));
+        }
+
+        /**Assignment operator**/
+        dict_t& operator=(dict_t& other) {
+            this->vhash.clear();
+            for( auto key: other.keys() ) {
+                this->set_raw(key, other.get_raw(key));
+            }
+            return *this;
+        }
+
+        /**Merge operator (lvalue source)**/
+        dict_t& operator+=(dict_t& other) {
+            for( auto key: other.keys() ) {
+                this->set_raw(key, other.get_raw(key));
+            }
+            return *this;
+        }
+
+        /**Merge operator (rvalue source)
+           BUG FIX: this overload took `dict_t` by value, making
+           `d += lvalue` ambiguous against the overload above — take an
+           rvalue reference instead.**/
+        dict_t& operator+=(dict_t&& other) {
+            for( auto key: other.keys() ) {
+                this->set_raw(key, other.get_raw(key));
+            }
+            return *this;
+        }
+
+        /**Array subscript operator is same as get()**/
+        template <typename TY>
+        TY operator[](const std::string key) {
+            return this->get<TY>(key);
+        }
+
+    protected:
+        /**The actual container that stores key - value pairs
+           It uses std::any to allow lazy typing.**/
+        std::unordered_map<std::string, std::any> vhash;
+    };
+
+    /**Print the array to the output stream**/
+    std::ostream& operator<<( std::ostream&, dict_t&);
+
+} /**End namespace**/
+
+
+#endif /* __HASH_TYPE_HPP__ */
+
diff --git a/zmq-receiver/src/dict_t.cpp b/zmq-receiver/src/dict_t.cpp
new file mode 100644
index 0000000..c27403b
--- /dev/null
+++ b/zmq-receiver/src/dict_t.cpp
@@ -0,0 +1,39 @@
+
+#include "dict_t.hpp"
+
+
+/**Print the hash into the output stream**/
+std::ostream& dict::operator<<( std::ostream &output, dict::dict_t& hsh) {
+    output << "Printing known key-value pairs in hash table:\n";
+
+    // NOTE(review): the branch types were stripped from the patch; the list
+    // below is a plausible reconstruction covering common payload types.
+    for( const auto& key: hsh.keys() ) {
+        if( hsh.is_type<bool>(key)) {
+            output << key << "\t" << hsh.get<bool>(key) << "\t" << hsh.get_raw(key).type().name() << "\n";}
+        else if( hsh.is_type<char>(key)) {
+            output << key << "\t" << hsh.get<char>(key) << 
"\t" << hsh.get_raw(key).type().name() << "\n"; }
+        else if( hsh.is_type<int>(key)) {
+            output << key << "\t" << hsh.get<int>(key) << "\t" << hsh.get_raw(key).type().name() << "\n"; }
+        else if( hsh.is_type<unsigned int>(key)) {
+            output << key << "\t" << hsh.get<unsigned int>(key) << "\t" << hsh.get_raw(key).type().name() << "\n"; }
+        else if( hsh.is_type<long>(key)) {
+            output << key << "\t" << hsh.get<long>(key) << "\t" << hsh.get_raw(key).type().name() << "\n"; }
+        else if( hsh.is_type<unsigned long>(key)) {
+            output << key << "\t" << hsh.get<unsigned long>(key) << "\t" << hsh.get_raw(key).type().name() << "\n"; }
+        else if( hsh.is_type<long long>(key)) {
+            output << key << "\t" << hsh.get<long long>(key) << "\t" << hsh.get_raw(key).type().name() << "\n"; }
+        else if( hsh.is_type<unsigned long long>(key)) {
+            output << key << "\t" << hsh.get<unsigned long long>(key) << "\t" << hsh.get_raw(key).type().name() << "\n"; }
+        else if( hsh.is_type<float>(key)) {
+            output << key << "\t" << hsh.get<float>(key) << "\t" << hsh.get_raw(key).type().name() << "\n"; }
+        else if( hsh.is_type<double>(key)) {
+            output << key << "\t" << hsh.get<double>(key) << "\t" << hsh.get_raw(key).type().name() << "\n"; }
+        else if( hsh.is_type<std::string>(key)) {
+            output << key << "\t" << hsh.get<std::string>(key) << "\t" << hsh.get_raw(key).type().name() << "\n"; }
+        else {
+            // Unknown payload type — print only key and mangled type name
+            output << key << "\t" << hsh.get_raw(key).type().name() << "\n";
+        }
+    }
+    // Flush buffer
+    output << std::endl;
+    return output;
+}
diff --git a/zmq-receiver/src/main.cpp b/zmq-receiver/src/main.cpp
new file mode 100644
index 0000000..487d826
--- /dev/null
+++ b/zmq-receiver/src/main.cpp
@@ -0,0 +1,98 @@
+// NOTE(review): include names were lost in the original patch; reconstructed.
+#include <iostream>
+#include <string>
+#include <zmq.hpp>
+
+#include "../../core-buffer/include/buffer_config.hpp"
+#include "../include/BufferTypes.hpp"
+
+// Subscribe to a ZMQ stream of (topic, metadata, data) multipart messages
+// and cache/flush them through ImageMetadataCache.
+int main (int argc, char *argv[]){
+    if (argc != 4) {
+        // BUG FIX: usage line named the wrong binary (was "jf_buffer_writer";
+        // the executable OUTPUT_NAME is zmq_receiver).
+        std::cout << "\nERROR\nUsage: zmq_receiver [zmq_topic] [zmq_sub_addr] [detector_name]\n";
+        std::cout << "Topic corresponds to a supported format: IMAGEDATA\n";
+        exit(-1);
+    }
+
+    std::string topic(argv[1]);
+    std::string sub_raw(argv[2]);
+    std::string detector_name(argv[3]);
+    std::string sub_addr("tcp://" + sub_raw); 
+
+    std::cout << "Starting ZMQ receiver at:\nPORT:\t" << sub_addr << "\nTOPIC:\t" << topic << std::endl;
+
+    // Allocate the RamBuffer
+    //RamBuffer buffer(config.detector_name, config.n_modules);
+
+    // ZMQ communication setup
+    std::cout << "Subscribing to server...\n" << std::endl;
+    zmq::context_t context (1);
+    // Subscribe to TOPIC (expected schema)
+    zmq::socket_t subscriber (context, ZMQ_SUB);
+    subscriber.connect(sub_addr.c_str());
+    subscriber.setsockopt(ZMQ_SUBSCRIBE, topic.c_str(), topic.size());
+
+    // Publisher to ipc
+    std::cout << "Creating publisher...\n" << std::endl;  // typo fix: was "Crating"
+
+    zmq::socket_t republisher (context, ZMQ_PUB);
+//    const int sndhwm = buffer_config::BUFFER_ZMQ_SNDHWM;
+//    republisher.setsockopt(ZMQ_SNDHWM, &sndhwm, sizeof(sndhwm));
+//    const int linger = 0;
+//    republisher.setsockopt(ZMQ_LINGER, &linger, sizeof(linger));
+
+//    std::string ipc_addr = buffer_config::BUFFER_LIVE_IPC_URL + detector_name + "-assembler";
+    std::string ipc_addr = "tcp://*:5333";
+    try{
+        republisher.bind(ipc_addr.c_str());
+    } catch (std::exception& ex){
+        // typo fix: was "Failed to bing publisher"
+        std::string msg = "Failed to bind publisher to address '" + ipc_addr + "': " + ex.what();
+        throw std::runtime_error(msg);
+    }
+
+    // Receive-loop message slots (removed unused counters num_img/total_temp)
+    zmq::message_t msg_topic;
+    zmq::message_t msg_meta;
+    zmq::message_t msg_data;
+
+    ImageMetadataCache cache("./dataout", "raspicam");
+
+    std::cout << "I'm listening...\n" << std::endl;
+    // Process up to 100000 updates (stale comment said 100 — fixed)
+    for (int idx = 0; idx < 100000; idx++) {
+        // ZMQ guarantees full delivery of multipart messages! 
+        // Packets are sent as three part messages: topic + meta + data
+        subscriber.recv(&msg_topic, 0);
+        subscriber.recv(&msg_meta, 0);
+        subscriber.recv(&msg_data, 0);
+
+        // Schema (topic) specific saving
+        if(topic=="IMAGEDATA"){
+            cache.append((void*)msg_meta.data(), msg_meta.size(), (void*)msg_data.data(), msg_data.size());
+            //buffer.write_image((ImageMetadata*)msg_meta.data(), (char*)msg_data.data);
+            if(idx%100==0){
+                std::cout << "Received " << idx << " (at size " << msg_data.size() << " )" << std::endl;
+            }
+        }
+    }
+    return 0;
+}
diff --git a/zmq-receiver/test/CMakeLists.txt b/zmq-receiver/test/CMakeLists.txt
new file mode 100644
index 0000000..e69de29