can send ess streaming data types flatbuffer messages via kafka
This commit is contained in:
3
.gitignore
vendored
Normal file
3
.gitignore
vendored
Normal file
@@ -0,0 +1,3 @@
|
||||
O.*
|
||||
.*ignore
|
||||
schemas/
|
||||
4
Makefile
4
Makefile
@@ -15,7 +15,6 @@ DBDS += src/asynStreamGeneratorDriver.dbd
|
||||
# DB files to include in the release
|
||||
TEMPLATES += db/channels.db
|
||||
|
||||
# These headers allow to depend on this library for derived drivers.
|
||||
HEADERS += src/asynStreamGeneratorDriver.h
|
||||
|
||||
# Source files to build
|
||||
@@ -23,4 +22,7 @@ SOURCES += src/asynStreamGeneratorDriver.cpp
|
||||
|
||||
USR_CFLAGS += -Wall -Wextra -Wunused-result -Werror -fvisibility=hidden # -Wpedantic // Does not work because EPICS macros trigger warnings
|
||||
|
||||
# Required to support EV42/44
|
||||
USR_CXXFLAGS += -I../dep/flatbuffers/include/ -I../schemas
|
||||
|
||||
LIB_SYS_LIBS += rdkafka
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
#include "asynOctetSyncIO.h"
|
||||
#include "ev42_events_generated.h"
|
||||
#include <cstring>
|
||||
#include <epicsStdio.h>
|
||||
#include <iocsh.h>
|
||||
@@ -140,14 +141,24 @@ asynStreamGeneratorDriver::asynStreamGeneratorDriver(const char *portName,
|
||||
}
|
||||
|
||||
char *msg = "asdf\n";
|
||||
// EventMessageBuilder b;
|
||||
// We could I believe reuse a buffer which might be more performant.
|
||||
flatbuffers::FlatBufferBuilder builder(1024);
|
||||
std::vector<uint32_t> tof = {1, 2, 3};
|
||||
std::vector<uint32_t> did = {0, 0, 0};
|
||||
auto message =
|
||||
CreateEventMessageDirect(builder, "monitor1", 0, 0, &tof, &did);
|
||||
|
||||
rd_kafka_resp_err_t err =
|
||||
rd_kafka_producev(producer, RD_KAFKA_V_TOPIC("NEWEFU_TEST"),
|
||||
RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY),
|
||||
// RD_KAFKA_V_KEY((void *)key, key_len),
|
||||
RD_KAFKA_V_VALUE((void *)msg, 6),
|
||||
// RD_KAFKA_V_OPAQUE(NULL),
|
||||
RD_KAFKA_V_END);
|
||||
builder.Finish(message, "ev42");
|
||||
printf("buffer size: %d\n", builder.GetSize());
|
||||
|
||||
rd_kafka_resp_err_t err = rd_kafka_producev(
|
||||
producer, RD_KAFKA_V_TOPIC("NEWEFU_TEST"),
|
||||
RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY),
|
||||
// RD_KAFKA_V_KEY((void *)key, key_len),
|
||||
RD_KAFKA_V_VALUE((void *)builder.GetBufferPointer(), builder.GetSize()),
|
||||
// RD_KAFKA_V_OPAQUE(NULL),
|
||||
RD_KAFKA_V_END);
|
||||
|
||||
if (err) {
|
||||
// TODO
|
||||
|
||||
Reference in New Issue
Block a user