From 9331d066aa4d2523c8bd3eb40aafdbe28706486d Mon Sep 17 00:00:00 2001 From: lhdamiani Date: Thu, 19 Aug 2021 12:07:26 +0200 Subject: [PATCH] live stream with json+blob --- std-stream-send/CMakeLists.txt | 5 +- std-stream-send/include/ZmqLiveSender.hpp | 31 +++++++ std-stream-send/include/stream_config.hpp | 3 + std-stream-send/src/ZmqLiveSender.cpp | 108 ++++++++++++++++++++++ std-stream-send/src/main.cpp | 42 ++++----- 5 files changed, 167 insertions(+), 22 deletions(-) create mode 100644 std-stream-send/include/ZmqLiveSender.hpp create mode 100644 std-stream-send/src/ZmqLiveSender.cpp diff --git a/std-stream-send/CMakeLists.txt b/std-stream-send/CMakeLists.txt index 3d8b419..83a6b7d 100644 --- a/std-stream-send/CMakeLists.txt +++ b/std-stream-send/CMakeLists.txt @@ -4,15 +4,18 @@ file(GLOB SOURCES add_library(std-stream-send-lib STATIC ${SOURCES}) target_include_directories(std-stream-send-lib PUBLIC include/) target_link_libraries(std-stream-send-lib + external core-buffer-lib) add_executable(std-stream-send src/main.cpp) set_target_properties(std-stream-send PROPERTIES OUTPUT_NAME std_stream_send) target_link_libraries(std-stream-send + external + core-buffer-lib std-stream-send-lib zmq pthread rt) - + enable_testing() add_subdirectory(test/) diff --git a/std-stream-send/include/ZmqLiveSender.hpp b/std-stream-send/include/ZmqLiveSender.hpp new file mode 100644 index 0000000..167af65 --- /dev/null +++ b/std-stream-send/include/ZmqLiveSender.hpp @@ -0,0 +1,31 @@ +#ifndef SF_DAQ_BUFFER_ZMQLIVESENDER_HPP +#define SF_DAQ_BUFFER_ZMQLIVESENDER_HPP + +#include +#include "formats.hpp" +#include "BufferUtils.hpp" + + +class ZmqLiveSender { + void* ctx_; + const std::string& det_name_; + const std::string& stream_address_; + + void* socket_streamvis_; + +private: + std::string _get_data_type_mapping(int dtype) const; + +public: + ZmqLiveSender(void* ctx, + const std::string& det_name, + const std::string& stream_address); + ~ZmqLiveSender(); + + void send(const ImageMetadata& meta, const char* data, + const size_t image_n_bytes); + +}; + + +#endif //SF_DAQ_BUFFER_ZMQLIVESENDER_HPP diff --git a/std-stream-send/include/stream_config.hpp b/std-stream-send/include/stream_config.hpp index a0f1d72..2452fd9 100644 --- a/std-stream-send/include/stream_config.hpp +++ b/std-stream-send/include/stream_config.hpp @@ -11,4 +11,7 @@ namespace stream_config // Number of pulses between each statistics print out. const size_t STREAM_STATS_MODULO = 1000; + + // reduction factor + const int REDUCTION_FACTOR = 5; } diff --git a/std-stream-send/src/ZmqLiveSender.cpp b/std-stream-send/src/ZmqLiveSender.cpp new file mode 100644 index 0000000..f8b5039 --- /dev/null +++ b/std-stream-send/src/ZmqLiveSender.cpp @@ -0,0 +1,108 @@ +#include "ZmqLiveSender.hpp" +#include "stream_config.hpp" + +#include "zmq.h" +#include +#include +#include +#include +#include +// +using namespace std; +using namespace stream_config; + +ZmqLiveSender::ZmqLiveSender( + void* ctx, + const std::string& det_name, + const std::string& stream_address): + ctx_(ctx), + det_name_(det_name), + stream_address_(stream_address) +{ + socket_streamvis_ = BufferUtils::bind_socket( + ctx_, det_name_, stream_address_); +} + +ZmqLiveSender::~ZmqLiveSender() +{ + zmq_close(socket_streamvis_); +} + +std::string ZmqLiveSender::_get_data_type_mapping(const int dtype) const{ + if (dtype == 1) + return "uint8"; + else if (dtype == 2) + return "uint16"; + else if (dtype == 4) + return "uint32"; + else if (dtype == 8) + return "uint64"; +} + +void ZmqLiveSender::send(const ImageMetadata& meta, const char *data, + const size_t image_n_bytes) +{ + uint16_t data_empty [] = { 0, 0, 0, 0}; + + rapidjson::Document header(rapidjson::kObjectType); + auto& header_alloc = header.GetAllocator(); + string text_header; + + // TODO: Here we need to send to streamvis and live analysis metadata(probably need to operate still on them) and data(not every frame) + header.AddMember("frame", meta.id, header_alloc); + header.AddMember("is_good_frame", meta.status, header_alloc); + + rapidjson::Value detector_name; + detector_name.SetString(det_name_.c_str(), header_alloc); + header.AddMember("detector_name", detector_name, header_alloc); + + header.AddMember("htype", "array-1.0", header_alloc); + + rapidjson::Value dtype; + dtype.SetString(_get_data_type_mapping(meta.dtype).c_str(), header_alloc); + header.AddMember("type", dtype, header_alloc); + + // To be retrieved and filled with correct values down. + auto shape_value = rapidjson::Value(rapidjson::kArrayType); + shape_value.PushBack((uint64_t)0, header_alloc); + shape_value.PushBack((uint64_t)0, header_alloc); + header.AddMember("shape", shape_value, header_alloc); + + int send_streamvis = 0; + send_streamvis = rand() % REDUCTION_FACTOR; + if ( send_streamvis == 0 ) { + auto& shape = header["shape"]; + shape[0] = meta.width; + shape[1] = meta.height; + } else{ + auto& shape = header["shape"]; + shape[0] = 2; + shape[1] = 2; + } + + { + rapidjson::StringBuffer buffer; + rapidjson::Writer writer(buffer); + header.Accept(writer); + + text_header = buffer.GetString(); + } + + zmq_send(socket_streamvis_, + text_header.c_str(), + text_header.size(), + ZMQ_SNDMORE); + + if ( send_streamvis == 0 ) { + zmq_send(socket_streamvis_, + (char*)data, + image_n_bytes, + 0); + } else { + zmq_send(socket_streamvis_, + (char*)data_empty, + 8, + 0); + } + +} \ No newline at end of file diff --git a/std-stream-send/src/main.cpp b/std-stream-send/src/main.cpp index 0679200..b39f2d3 100644 --- a/std-stream-send/src/main.cpp +++ b/std-stream-send/src/main.cpp @@ -1,12 +1,12 @@ #include #include -#include "stream_config.hpp" #include #include -#include -#include "RamBuffer.hpp" #include +#include +#include "stream_config.hpp" +#include "ZmqLiveSender.hpp" using namespace std; using namespace stream_config; @@ -26,20 +26,19 @@ int main (int argc, char *argv[]) exit(-1); } - const auto config = StreamSendConfig::from_json_file(string(argv[1])); + auto config = BufferUtils::read_json_config(string(argv[1])); const int bit_depth = atoi(argv[2]); - const string stream_address = string(argv[3]); + const auto stream_address = string(argv[3]); auto ctx = zmq_ctx_new(); zmq_ctx_set(ctx, ZMQ_IO_THREADS, STREAM_ZMQ_IO_THREADS); - - auto sender = BufferUtils::bind_socket( - ctx, config.detector_name, stream_address.c_str()); + ZmqLiveSender sender(ctx, config.detector_name, stream_address); auto receiver_assembler = BufferUtils::connect_socket( ctx, config.detector_name, "assembler"); const size_t IMAGE_N_BYTES = config.image_height * config.image_width * bit_depth / 8; + RamBuffer image_buffer(config.detector_name + "_assembler", sizeof(ImageMetadata), IMAGE_N_BYTES, 1, RAM_BUFFER_N_SLOTS); @@ -50,19 +49,20 @@ int main (int argc, char *argv[]) // receives the assembled image id from the assembler zmq_recv(receiver_assembler, &meta, sizeof(meta), 0); + // gets the image data + char* dst_data = image_buffer.get_slot_data(meta.id); + // sends the json metadata with the data + sender.send(meta, dst_data, IMAGE_N_BYTES); + + + // zmq_send(sender, + // &meta, + // sizeof(ImageMetadata), + // ZMQ_SNDMORE | ZMQ_NOBLOCK); + + // zmq_send(sender, + // dst_data, + // IMAGE_N_BYTES, ZMQ_NOBLOCK); - - if (meta.id % 10){ - auto* dst_meta = image_buffer.get_slot_meta(meta.id); - auto* dst_data = image_buffer.get_slot_data(meta.id); - zmq_send(sender, - &meta, - sizeof(ImageMetadata), - ZMQ_SNDMORE | ZMQ_NOBLOCK); - - zmq_send(sender, - dst_data, - IMAGE_N_BYTES, ZMQ_NOBLOCK); - } } }