live stream with json+blob

This commit is contained in:
lhdamiani
2021-08-19 12:07:26 +02:00
parent 9ac7690241
commit 9331d066aa
5 changed files with 167 additions and 22 deletions
+4 -1
View File
@@ -4,15 +4,18 @@ file(GLOB SOURCES
add_library(std-stream-send-lib STATIC ${SOURCES})
target_include_directories(std-stream-send-lib PUBLIC include/)
target_link_libraries(std-stream-send-lib
external
core-buffer-lib)
add_executable(std-stream-send src/main.cpp)
set_target_properties(std-stream-send PROPERTIES OUTPUT_NAME std_stream_send)
target_link_libraries(std-stream-send
external
core-buffer-lib
std-stream-send-lib
zmq
pthread
rt)
enable_testing()
add_subdirectory(test/)
+31
View File
@@ -0,0 +1,31 @@
#ifndef SF_DAQ_BUFFER_ZMQLIVESENDER_HPP
#define SF_DAQ_BUFFER_ZMQLIVESENDER_HPP
#include <string>
#include "formats.hpp"
#include "BufferUtils.hpp"
class ZmqLiveSender {
void* ctx_;
const std::string& det_name_;
const std::string& stream_address_;
void* socket_streamvis_;
private:
std::string _get_data_type_mapping(int dtype) const;
public:
ZmqLiveSender(void* ctx,
const std::string& det_name,
const std::string& stream_address);
~ZmqLiveSender();
void send(const ImageMetadata& meta, const char* data,
const size_t image_n_bytes);
};
#endif //SF_DAQ_BUFFER_ZMQLIVESENDER_HPP
@@ -11,4 +11,7 @@ namespace stream_config
// Number of pulses between each statistics print out.
const size_t STREAM_STATS_MODULO = 1000;
// reduction factor
const int REDUCTION_FACTOR = 5;
}
+108
View File
@@ -0,0 +1,108 @@
#include "ZmqLiveSender.hpp"
#include "stream_config.hpp"
#include "zmq.h"
#include <stdexcept>
#include <rapidjson/document.h>
#include <rapidjson/stringbuffer.h>
#include <rapidjson/writer.h>
#include <iostream>
//
using namespace std;
using namespace stream_config;
ZmqLiveSender::ZmqLiveSender(
void* ctx,
const std::string& det_name,
const std::string& stream_address):
ctx_(ctx),
det_name_(det_name),
stream_address_(stream_address)
{
socket_streamvis_ = BufferUtils::bind_socket(
ctx_, det_name_, stream_address_);
}
ZmqLiveSender::~ZmqLiveSender()
{
zmq_close(socket_streamvis_);
}
std::string ZmqLiveSender::_get_data_type_mapping(const int dtype) const{
if (dtype == 1)
return "uint8";
else if (dtype == 2)
return "uint16";
else if (dtype == 4)
return "uint32";
else if (dtype == 8)
return "uint64";
}
void ZmqLiveSender::send(const ImageMetadata& meta, const char *data,
const size_t image_n_bytes)
{
uint16_t data_empty [] = { 0, 0, 0, 0};
rapidjson::Document header(rapidjson::kObjectType);
auto& header_alloc = header.GetAllocator();
string text_header;
// TODO: Here we need to send to streamvis and live analysis metadata(probably need to operate still on them) and data(not every frame)
header.AddMember("frame", meta.id, header_alloc);
header.AddMember("is_good_frame", meta.status, header_alloc);
rapidjson::Value detector_name;
detector_name.SetString(det_name_.c_str(), header_alloc);
header.AddMember("detector_name", detector_name, header_alloc);
header.AddMember("htype", "array-1.0", header_alloc);
rapidjson::Value dtype;
dtype.SetString(_get_data_type_mapping(meta.dtype).c_str(), header_alloc);
header.AddMember("type", dtype, header_alloc);
// To be retrieved and filled with correct values down.
auto shape_value = rapidjson::Value(rapidjson::kArrayType);
shape_value.PushBack((uint64_t)0, header_alloc);
shape_value.PushBack((uint64_t)0, header_alloc);
header.AddMember("shape", shape_value, header_alloc);
int send_streamvis = 0;
send_streamvis = rand() % REDUCTION_FACTOR;
if ( send_streamvis == 0 ) {
auto& shape = header["shape"];
shape[0] = meta.width;
shape[1] = meta.height;
} else{
auto& shape = header["shape"];
shape[0] = 2;
shape[1] = 2;
}
{
rapidjson::StringBuffer buffer;
rapidjson::Writer<rapidjson::StringBuffer> writer(buffer);
header.Accept(writer);
text_header = buffer.GetString();
}
zmq_send(socket_streamvis_,
text_header.c_str(),
text_header.size(),
ZMQ_SNDMORE);
if ( send_streamvis == 0 ) {
zmq_send(socket_streamvis_,
(char*)data,
image_n_bytes,
0);
} else {
zmq_send(socket_streamvis_,
(char*)data_empty,
8,
0);
}
}
+21 -21
View File
@@ -1,12 +1,12 @@
#include <iostream>
#include <zmq.h>
#include "stream_config.hpp"
#include <chrono>
#include <thread>
#include <StreamSendConfig.hpp>
#include "RamBuffer.hpp"
#include <BufferUtils.hpp>
#include <RamBuffer.hpp>
#include "stream_config.hpp"
#include "ZmqLiveSender.hpp"
using namespace std;
using namespace stream_config;
@@ -26,20 +26,19 @@ int main (int argc, char *argv[])
exit(-1);
}
const auto config = StreamSendConfig::from_json_file(string(argv[1]));
auto config = BufferUtils::read_json_config(string(argv[1]));
const int bit_depth = atoi(argv[2]);
const string stream_address = string(argv[3]);
const auto stream_address = string(argv[3]);
auto ctx = zmq_ctx_new();
zmq_ctx_set(ctx, ZMQ_IO_THREADS, STREAM_ZMQ_IO_THREADS);
auto sender = BufferUtils::bind_socket(
ctx, config.detector_name, stream_address.c_str());
ZmqLiveSender sender(ctx, config.detector_name, stream_address);
auto receiver_assembler = BufferUtils::connect_socket(
ctx, config.detector_name, "assembler");
const size_t IMAGE_N_BYTES = config.image_height * config.image_width * bit_depth / 8;
RamBuffer image_buffer(config.detector_name + "_assembler",
sizeof(ImageMetadata), IMAGE_N_BYTES,
1, RAM_BUFFER_N_SLOTS);
@@ -50,19 +49,20 @@ int main (int argc, char *argv[])
// receives the assembled image id from the assembler
zmq_recv(receiver_assembler, &meta, sizeof(meta), 0);
// gets the image data
char* dst_data = image_buffer.get_slot_data(meta.id);
// sends the json metadata with the data
sender.send(meta, dst_data, IMAGE_N_BYTES);
// zmq_send(sender,
// &meta,
// sizeof(ImageMetadata),
// ZMQ_SNDMORE | ZMQ_NOBLOCK);
// zmq_send(sender,
// dst_data,
// IMAGE_N_BYTES, ZMQ_NOBLOCK);
if (meta.id % 10){
auto* dst_meta = image_buffer.get_slot_meta(meta.id);
auto* dst_data = image_buffer.get_slot_data(meta.id);
zmq_send(sender,
&meta,
sizeof(ImageMetadata),
ZMQ_SNDMORE | ZMQ_NOBLOCK);
zmq_send(sender,
dst_data,
IMAGE_N_BYTES, ZMQ_NOBLOCK);
}
}
}