POC for each interface type
This commit is contained in:
213
src/asynStreamGeneratorDriver.cpp
Normal file
213
src/asynStreamGeneratorDriver.cpp
Normal file
@@ -0,0 +1,213 @@
|
||||
#include "asynOctetSyncIO.h"
|
||||
#include <epicsStdio.h>
|
||||
#include <iocsh.h>
|
||||
#include <librdkafka/rdkafka.h>
|
||||
|
||||
#include "asynStreamGeneratorDriver.h"
|
||||
#include <epicsExport.h>
|
||||
|
||||
/* Wrapper to set config values and error out if needed.
|
||||
*/
|
||||
static void set_config(rd_kafka_conf_t *conf, char *key, char *value) {
|
||||
char errstr[512];
|
||||
rd_kafka_conf_res_t res;
|
||||
|
||||
res = rd_kafka_conf_set(conf, key, value, errstr, sizeof(errstr));
|
||||
if (res != RD_KAFKA_CONF_OK) {
|
||||
// TODO
|
||||
// g_error("Unable to set config: %s", errstr);
|
||||
exit(1);
|
||||
}
|
||||
}
|
||||
|
||||
static void udpPollerTask(void *drvPvt) {
|
||||
asynStreamGeneratorDriver *pSGD = (asynStreamGeneratorDriver *)drvPvt;
|
||||
pSGD->receiveUDP();
|
||||
}
|
||||
|
||||
/** Constructor for the asynStreamGeneratorDriver class.
|
||||
* Calls constructor for the asynPortDriver base class.
|
||||
* \param[in] portName The name of the asyn port driver to be created. */
|
||||
asynStreamGeneratorDriver::asynStreamGeneratorDriver(const char *portName,
|
||||
const char *ipPortName,
|
||||
const int numChannels)
|
||||
: asynPortDriver(portName, 1, /* maxAddr */
|
||||
asynInt32Mask, /* Interface mask */
|
||||
asynInt32Mask, /* Interrupt mask */
|
||||
0, /* asynFlags. This driver does not block and it is
|
||||
not multi-device, but has a
|
||||
destructor ASYN_DESTRUCTIBLE our version of the Asyn
|
||||
is too old to support this flag */
|
||||
1, /* Autoconnect */
|
||||
0, /* Default priority */
|
||||
0) /* Default stack size*/
|
||||
{
|
||||
|
||||
// Parameter Setup
|
||||
createParam(P_CountsString, asynParamInt32, &P_Counts);
|
||||
setIntegerParam(P_Counts, 0);
|
||||
|
||||
// UDP Receive Setup
|
||||
pasynOctetSyncIO->connect(ipPortName, 0, &pasynUDPUser, NULL);
|
||||
|
||||
/* Create the thread that receives UDP traffic in the background */
|
||||
asynStatus status =
|
||||
(asynStatus)(epicsThreadCreate(
|
||||
"udp_receive", epicsThreadPriorityMedium,
|
||||
epicsThreadGetStackSize(epicsThreadStackMedium),
|
||||
(EPICSTHREADFUNC)::udpPollerTask, this) == NULL);
|
||||
if (status) {
|
||||
// printf("%s:%s: epicsThreadCreate failure, status=%d\n", driverName,
|
||||
// functionName, status);
|
||||
printf("%s:%s: epicsThreadCreate failure, status=%d\n",
|
||||
"StreamGenerator", "init", status);
|
||||
return;
|
||||
}
|
||||
|
||||
// Kafka Produce Setup
|
||||
rd_kafka_conf_t *conf;
|
||||
char errstr[512];
|
||||
|
||||
// Create client configuration
|
||||
conf = rd_kafka_conf_new();
|
||||
set_config(conf, "bootstrap.servers", "linkafka01:9092");
|
||||
set_config(conf, "queue.buffering.max.messages", "1e7");
|
||||
|
||||
// Create the Producer instance.
|
||||
rd_kafka_t *producer =
|
||||
rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, sizeof(errstr));
|
||||
if (!producer) {
|
||||
// TODO
|
||||
// g_error("Failed to create new producer: %s", errstr);
|
||||
exit(1);
|
||||
}
|
||||
|
||||
char *msg = "asdf\n";
|
||||
|
||||
rd_kafka_resp_err_t err =
|
||||
rd_kafka_producev(producer, RD_KAFKA_V_TOPIC("NEWEFU_TEST"),
|
||||
RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY),
|
||||
// RD_KAFKA_V_KEY((void *)key, key_len),
|
||||
RD_KAFKA_V_VALUE((void *)msg, 6),
|
||||
// RD_KAFKA_V_OPAQUE(NULL),
|
||||
RD_KAFKA_V_END);
|
||||
|
||||
if (err) {
|
||||
// TODO
|
||||
// g_error("Failed to produce to topic %s: %s", topic,
|
||||
// rd_kafka_err2str(err));
|
||||
exit(1);
|
||||
}
|
||||
|
||||
epicsStdoutPrintf("Kafka Queue Size %d\n", rd_kafka_outq_len(producer));
|
||||
|
||||
rd_kafka_poll(producer, 0);
|
||||
epicsStdoutPrintf("Kafka Queue Size %d\n", rd_kafka_outq_len(producer));
|
||||
rd_kafka_flush(producer, 10 * 1000);
|
||||
epicsStdoutPrintf("Kafka Queue Size %d\n", rd_kafka_outq_len(producer));
|
||||
}
|
||||
|
||||
asynStreamGeneratorDriver::~asynStreamGeneratorDriver() {}
|
||||
|
||||
asynStatus asynStreamGeneratorDriver::readInt32(asynUser *pasynUser,
|
||||
epicsInt32 *value) {
|
||||
|
||||
int function = pasynUser->reason;
|
||||
asynStatus status;
|
||||
|
||||
if (function == P_Counts) {
|
||||
int val;
|
||||
status = getIntegerParam(P_Counts, &val);
|
||||
*value = val;
|
||||
return status;
|
||||
} else {
|
||||
return asynError;
|
||||
}
|
||||
return asynSuccess;
|
||||
}
|
||||
|
||||
void asynStreamGeneratorDriver::receiveUDP() {
|
||||
asynStatus status;
|
||||
int isConnected;
|
||||
|
||||
char buffer[1500];
|
||||
size_t received;
|
||||
int eomReason;
|
||||
|
||||
int val;
|
||||
|
||||
while (true) {
|
||||
// epicsStdoutPrintf("polling!!");
|
||||
status = pasynManager->isConnected(pasynUDPUser, &isConnected);
|
||||
if (status) {
|
||||
asynPrint(pasynUserSelf, ASYN_TRACE_ERROR,
|
||||
"%s:%s: error calling pasynManager->isConnected, "
|
||||
"status=%d, error=%s\n",
|
||||
"StreamGenerator", "receiveUDP", status,
|
||||
pasynUDPUser->errorMessage);
|
||||
// driverName, functionName, status,
|
||||
// pasynUserIPPort_->errorMessage);
|
||||
}
|
||||
asynPrint(pasynUserSelf, ASYN_TRACEIO_DRIVER,
|
||||
"%s:%s: isConnected = %d\n", //
|
||||
"StreamGenerator", "receiveUDP", isConnected);
|
||||
|
||||
status = pasynOctetSyncIO->read(pasynUDPUser, buffer, 1500,
|
||||
1, // timeout
|
||||
&received, &eomReason);
|
||||
|
||||
// if (status)
|
||||
// asynPrint(
|
||||
// pasynUserSelf, ASYN_TRACE_ERROR,
|
||||
// "%s:%s: error calling pasynOctetSyncIO->read, status=%d\n",
|
||||
// "StreamGenerator", "receiveUDP", status);
|
||||
|
||||
buffer[received] = 0;
|
||||
|
||||
if (received)
|
||||
asynPrint(pasynUserSelf, ASYN_TRACE_ERROR, "%s:%s: received %s\n",
|
||||
"StreamGenerator", "receiveUDP", buffer);
|
||||
|
||||
lock();
|
||||
getIntegerParam(P_Counts, &val);
|
||||
val += received > 0;
|
||||
setIntegerParam(P_Counts, val);
|
||||
callParamCallbacks();
|
||||
unlock();
|
||||
|
||||
epicsThreadSleep(0.001); // seconds
|
||||
}
|
||||
}
|
||||
|
||||
/* Configuration routine. Called directly, or from the iocsh function below */
|
||||
|
||||
extern "C" {
|
||||
|
||||
/** EPICS iocsh callable function to call constructor for the
|
||||
* asynStreamGeneratorDriver class. \param[in] portName The name of the asyn
|
||||
* port driver to be created. */
|
||||
asynStatus asynStreamGeneratorDriverConfigure(const char *portName,
|
||||
const char *ipPortName,
|
||||
const int numChannels) {
|
||||
new asynStreamGeneratorDriver(portName, ipPortName, numChannels);
|
||||
return asynSuccess;
|
||||
}
|
||||
|
||||
/* EPICS iocsh shell commands */
|
||||
|
||||
static const iocshArg initArg0 = {"portName", iocshArgString};
|
||||
static const iocshArg initArg1 = {"ipPortName", iocshArgString};
|
||||
static const iocshArg initArg2 = {"numChannels", iocshArgInt};
|
||||
static const iocshArg *const initArgs[] = {&initArg0, &initArg1, &initArg2};
|
||||
static const iocshFuncDef initFuncDef = {"asynStreamGenerator", 3, initArgs};
|
||||
static void initCallFunc(const iocshArgBuf *args) {
|
||||
asynStreamGeneratorDriverConfigure(args[0].sval, args[1].sval,
|
||||
args[2].ival);
|
||||
}
|
||||
|
||||
void asynStreamGeneratorDriverRegister(void) {
|
||||
iocshRegister(&initFuncDef, initCallFunc);
|
||||
}
|
||||
|
||||
epicsExportRegistrar(asynStreamGeneratorDriverRegister);
|
||||
}
|
||||
Reference in New Issue
Block a user