Files
StreamGenerator/src/asynStreamGeneratorDriver.cpp
2025-11-05 10:13:08 +01:00

793 lines
30 KiB
C++

#include "asynOctetSyncIO.h"
#include "ev42_events_generated.h"
#include <cstring>
#include <epicsStdio.h>
#include <iocsh.h>
#include <queue>
// Just for printing
#define __STDC_FORMAT_MACROS
#include <inttypes.h>
#include "asynStreamGeneratorDriver.h"
#include <epicsExport.h>
/*******************************************************************************
* Kafka Methods
*/
static void set_kafka_config_key(rd_kafka_conf_t *conf, char *key,
char *value) {
char errstr[512];
rd_kafka_conf_res_t res;
res = rd_kafka_conf_set(conf, key, value, errstr, sizeof(errstr));
if (res != RD_KAFKA_CONF_OK) {
epicsStdoutPrintf("Failed to set config value %s : %s\n", key, value);
exit(1);
}
}
static rd_kafka_t *create_kafka_producer(const char *kafkaBroker) {
char errstr[512];
rd_kafka_t *producer;
// Prepare configuration object
rd_kafka_conf_t *conf = rd_kafka_conf_new();
// TODO feel not great about this
set_kafka_config_key(conf, "bootstrap.servers",
const_cast<char *>(kafkaBroker));
set_kafka_config_key(conf, "queue.buffering.max.messages", "10000000");
// Create the Producer
producer = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, sizeof(errstr));
if (!producer) {
epicsStdoutPrintf("Failed to create Kafka Producer: %s\n", errstr);
exit(1);
}
return producer;
}
/*******************************************************************************
* Static Methods Passed to Epics Threads that should run in the background
*/
static void udpPollerTask(void *drvPvt) {
asynStreamGeneratorDriver *pSGD = (asynStreamGeneratorDriver *)drvPvt;
pSGD->receiveUDP();
}
static void daqTask(void *drvPvt) {
asynStreamGeneratorDriver *pSGD = (asynStreamGeneratorDriver *)drvPvt;
pSGD->processEvents();
}
static void monitorProducerTask(void *drvPvt) {
asynStreamGeneratorDriver *pSGD = (asynStreamGeneratorDriver *)drvPvt;
pSGD->produceMonitor();
}
static void detectorProducerTask(void *drvPvt) {
asynStreamGeneratorDriver *pSGD = (asynStreamGeneratorDriver *)drvPvt;
pSGD->produceDetector();
}
/*******************************************************************************
* Stream Generator Helper Methods
*/
asynStatus asynStreamGeneratorDriver::createInt32Param(
// TODO should show error if there is one
asynStatus status, char *name, int *variable, epicsInt32 initialValue) {
return (asynStatus)(status | createParam(name, asynParamInt32, variable) |
setIntegerParam(*variable, initialValue));
}
/*******************************************************************************
* Stream Generator Methods
*/
asynStreamGeneratorDriver::asynStreamGeneratorDriver(
const char *portName, const char *ipPortName, const int numChannels,
const int udpQueueSize, const bool enableKafkaStream,
const char *kafkaBroker, const char *monitorTopic,
const char *detectorTopic, const int kafkaQueueSize,
const int kafkaMaxPacketSize)
: asynPortDriver(portName, 1, /* maxAddr */
asynInt32Mask | asynInt64Mask |
asynDrvUserMask, /* Interface mask */
asynInt32Mask | asynInt64Mask, /* Interrupt mask */
0, /* asynFlags. This driver does not block and it is
not multi-device, but has a
destructor ASYN_DESTRUCTIBLE our version of the Asyn
is too old to support this flag */
1, /* Autoconnect */
0, /* Default priority */
0), /* Default stack size*/
num_channels(numChannels + 1), kafkaEnabled(enableKafkaStream),
monitorTopic(monitorTopic), detectorTopic(detectorTopic),
udpQueue(epicsRingBytesCreate(udpQueueSize * sizeof(NormalisedEvent))),
monitorQueue(
epicsRingBytesCreate(kafkaQueueSize * sizeof(NormalisedEvent))),
detectorQueue(
epicsRingBytesCreate(kafkaQueueSize * sizeof(NormalisedEvent))),
kafkaMaxPacketSize(kafkaMaxPacketSize) {
const char *functionName = "asynStreamGeneratorDriver";
// Parameter Setup
asynStatus status = asynSuccess;
status = createInt32Param(status, P_StatusString, &P_Status, STATUS_IDLE);
status = createInt32Param(status, P_ResetString, &P_Reset);
status = createInt32Param(status, P_StopString, &P_Stop);
status = createInt32Param(status, P_CountPresetString, &P_CountPreset);
status = createInt32Param(status, P_TimePresetString, &P_TimePreset);
status = createInt32Param(status, P_ElapsedTimeString, &P_ElapsedTime);
status =
createInt32Param(status, P_ClearElapsedTimeString, &P_ClearElapsedTime);
status =
createInt32Param(status, P_MonitorChannelString, &P_MonitorChannel);
status = createInt32Param(status, P_ThresholdString, &P_Threshold, 1);
status = createInt32Param(status, P_ThresholdChannelString,
&P_ThresholdChannel, 1);
// Create Parameters templated on Channel Number
char pv_name_buffer[100];
P_Counts = new int[this->num_channels];
P_Rates = new int[this->num_channels];
P_ClearCounts = new int[this->num_channels];
for (std::size_t i = 0; i < this->num_channels; ++i) {
memset(pv_name_buffer, 0, 100);
epicsSnprintf(pv_name_buffer, 100, P_CountsString, i);
status = createInt32Param(status, pv_name_buffer, P_Counts + i);
memset(pv_name_buffer, 0, 100);
epicsSnprintf(pv_name_buffer, 100, P_RateString, i);
status = createInt32Param(status, pv_name_buffer, P_Rates + i);
memset(pv_name_buffer, 0, 100);
epicsSnprintf(pv_name_buffer, 100, P_ClearCountsString, i);
status = createInt32Param(status, pv_name_buffer, P_ClearCounts + i);
}
if (status) {
epicsStdoutPrintf(
"%s:%s: failed to create or setup parameters, status=%d\n",
driverName, functionName, status);
exit(1);
}
// Create Events
// this->pausedEventId = epicsEventCreate(epicsEventEmpty);
if (enableKafkaStream) {
epicsStdoutPrintf(
"Detector Kafka Config: broker=%s, topic=%s\n "
" queue size:%d, max events per packet: %d\n",
kafkaBroker, this->detectorTopic, kafkaQueueSize,
this->kafkaMaxPacketSize);
epicsStdoutPrintf(
"Monitors Kafka Config: broker=%s, topic=%s\n "
" queue size:%d, max events per packet: %d\n",
kafkaBroker, this->monitorTopic, kafkaQueueSize,
this->kafkaMaxPacketSize);
this->monitorProducer = create_kafka_producer(kafkaBroker);
this->detectorProducer = create_kafka_producer(kafkaBroker);
// Setup for Thread Producing Monitor Kafka Events
status =
(asynStatus)(epicsThreadCreate(
"monitor_produce", epicsThreadPriorityMedium,
epicsThreadGetStackSize(epicsThreadStackMedium),
(EPICSTHREADFUNC)::monitorProducerTask,
this) == NULL);
if (status) {
epicsStdoutPrintf("%s:%s: epicsThreadCreate failure, status=%d\n",
driverName, functionName, status);
exit(1);
}
// Setup for Thread Producing Detector Kafka Events
status =
(asynStatus)(epicsThreadCreate(
"monitor_produce", epicsThreadPriorityMedium,
epicsThreadGetStackSize(epicsThreadStackMedium),
(EPICSTHREADFUNC)::detectorProducerTask,
this) == NULL);
if (status) {
epicsStdoutPrintf("%s:%s: epicsThreadCreate failure, status=%d\n",
driverName, functionName, status);
exit(1);
}
} else {
epicsStdoutPrintf("Kafka Stream Disabled\n");
}
/* Create the thread that orders the events and acts as our sinqDaq stand-in
*/
status = (asynStatus)(epicsThreadCreate(
"sinqDAQ", epicsThreadPriorityMax,
epicsThreadGetStackSize(epicsThreadStackMedium),
(EPICSTHREADFUNC)::daqTask, this) == NULL);
if (status) {
epicsStdoutPrintf("%s:%s: epicsThreadCreate failure, status=%d\n",
driverName, functionName, status);
exit(1);
}
// UDP Receive Setup
status = pasynOctetSyncIO->connect(ipPortName, 0, &pasynUDPUser, NULL);
if (status) {
epicsStdoutPrintf("%s:%s: Couldn't open connection %s, status=%d\n",
driverName, functionName, ipPortName, status);
exit(1);
}
/* Create the thread that receives UDP traffic in the background */
status = (asynStatus)(epicsThreadCreate(
"udp_receive", epicsThreadPriorityMedium,
epicsThreadGetStackSize(epicsThreadStackMedium),
(EPICSTHREADFUNC)::udpPollerTask, this) == NULL);
if (status) {
epicsStdoutPrintf("%s:%s: epicsThreadCreate failure, status=%d\n",
driverName, functionName, status);
exit(1);
}
}
asynStreamGeneratorDriver::~asynStreamGeneratorDriver() {
// should make sure queues are empty and freed
// and that the kafka producers are flushed and freed
delete[] P_Counts;
delete[] P_Rates;
// TODO add exit should perhaps ensure the queue is flushed
// rd_kafka_poll(producer, 0);
// epicsStdoutPrintf("Kafka Queue Size %d\n", rd_kafka_outq_len(producer));
// rd_kafka_flush(producer, 10 * 1000);
// epicsStdoutPrintf("Kafka Queue Size %d\n", rd_kafka_outq_len(producer));
}
asynStatus asynStreamGeneratorDriver::writeInt32(asynUser *pasynUser,
epicsInt32 value) {
int function = pasynUser->reason;
asynStatus status = asynSuccess;
const char *paramName;
const char *functionName = "writeInt32";
getParamName(function, &paramName);
// TODO should maybe lock mutex for this
epicsInt32 currentStatus;
status = getIntegerParam(this->P_Status, &currentStatus);
if (status) {
epicsSnprintf(pasynUser->errorMessage, pasynUser->errorMessageSize,
"%s:%s: status=%d, function=%d, name=%s, value=%d",
driverName, functionName, status, function, paramName,
value);
return status;
}
// TODO clean up
bool isClearCount = false;
size_t channelToClear;
for (size_t i = 0; i < this->num_channels; ++i) {
isClearCount |= function == P_ClearCounts[i];
if (isClearCount) {
channelToClear = i;
break;
}
}
// TODO should check everything...
if (function == P_CountPreset) {
if (!currentStatus) {
setIntegerParam(function, value);
setIntegerParam(P_Status, STATUS_COUNTING);
status = (asynStatus)callParamCallbacks();
} else {
return asynError;
}
} else if (function == P_TimePreset) {
if (!currentStatus) {
setIntegerParam(function, value);
setIntegerParam(P_Status, STATUS_COUNTING);
status = (asynStatus)callParamCallbacks();
} else {
return asynError;
}
} else if (function == P_ClearElapsedTime) {
if (!currentStatus) {
setIntegerParam(P_ElapsedTime, 0);
status = (asynStatus)callParamCallbacks();
} else {
return asynError;
}
} else if (isClearCount) {
if (!currentStatus) {
setIntegerParam(P_Counts[channelToClear], 0);
status = (asynStatus)callParamCallbacks();
} else {
return asynError;
}
} else if (function == P_Reset) {
lock();
// TODO should probably set back everything to defaults
setIntegerParam(P_Status, STATUS_IDLE);
status = (asynStatus)callParamCallbacks();
unlock();
} else if (function == P_Stop) {
lock();
setIntegerParam(P_Status, STATUS_IDLE);
status = (asynStatus)callParamCallbacks();
unlock();
} else if (function == P_MonitorChannel) {
if (!currentStatus) {
setIntegerParam(function, value);
status = (asynStatus)callParamCallbacks();
} else {
return asynError;
}
} else {
setIntegerParam(function, value);
status = (asynStatus)callParamCallbacks();
}
if (status)
epicsSnprintf(pasynUser->errorMessage, pasynUser->errorMessageSize,
"%s:%s: status=%d, function=%d, name=%s, value=%d",
driverName, functionName, status, function, paramName,
value);
return status;
}
void asynStreamGeneratorDriver::receiveUDP() {
// TODO fix time overflows
// Regarding time overflow.
// * the header time stamp is 3 words, i.e. 48 bits.
// * it has a resolution of 100ns
// * so we can cover a maximum of (2^(3*16) - 1) * 1e-7 = 28147497 seconds
// * or about 325 days
// * so maybe this isn't necessary to solve, as long as we restart the
// electronics at least once a year...
const char *functionName = "receiveUDP";
asynStatus status = asynSuccess;
int isConnected = 1;
std::size_t received;
int eomReason;
// The correlation unit sents messages with a maximum size of 1500 bytes.
// These messages don't have any obious start or end to synchronise
// against...
const std::size_t bufferSize = 1500;
char buffer[bufferSize];
// We have 10 mcpdids
uint64_t *lastBufferNumber = new uint64_t[10];
for (size_t i = 0; i < 10; ++i) {
lastBufferNumber[i] = 0;
}
NormalisedEvent ne;
while (true) {
status = pasynManager->isConnected(pasynUDPUser, &isConnected);
if (!isConnected)
asynPrint(pasynUserSelf, ASYN_TRACE_ERROR,
"%s:%s: isConnected = %d\n", driverName, functionName,
isConnected);
status = pasynOctetSyncIO->read(pasynUDPUser, buffer, bufferSize,
0, // timeout
&received, &eomReason);
if (received) {
UDPHeader *header = (UDPHeader *)buffer;
std::size_t total_events = (header->BufferLength - 21) / 3;
if (received == total_events * 6 + 42) {
if (header->BufferNumber - lastBufferNumber[header->McpdID] >
1 &&
lastBufferNumber[header->McpdID] !=
std::numeric_limits<
decltype(header->BufferNumber)>::max()) {
asynPrint(
pasynUserSelf, ASYN_TRACE_ERROR,
"%s:%s: missed packet on id: %d. Received: %" PRIu64
", last: %" PRIu64 "\n",
driverName, functionName, header->McpdID,
header->BufferNumber, lastBufferNumber[header->McpdID]);
}
lastBufferNumber[header->McpdID] = header->BufferNumber;
for (std::size_t i = 0; i < total_events; ++i) {
char *event = (buffer + 21 * 2 + i * 6);
if (event[5] & 0x80) { // Monitor Event
MonitorEvent *m_event = (MonitorEvent *)event;
ne.timestamp =
header->nanosecs() + (uint64_t)m_event->nanosecs();
ne.source = 0;
ne.pixelId = m_event->DataID;
} else { // Detector Event
DetectorEvent *d_event = (DetectorEvent *)event;
ne.timestamp =
header->nanosecs() + (uint64_t)d_event->nanosecs();
ne.source = header->McpdID;
ne.pixelId = d_event->pixelId(header->McpdID);
}
epicsRingBytesPut(this->udpQueue, (char *)&ne,
sizeof(NormalisedEvent));
}
} else {
asynPrint(pasynUserSelf, ASYN_TRACE_ERROR,
"%s:%s: invalid UDP packet\n", driverName,
functionName);
}
}
}
}
inline void asynStreamGeneratorDriver::queueForKafka(NormalisedEvent &&ne) {
if (this->kafkaEnabled) {
if (ne.source == 0)
epicsRingBytesPut(this->monitorQueue, (char *)&ne,
sizeof(NormalisedEvent));
else
epicsRingBytesPut(this->detectorQueue, (char *)&ne,
sizeof(NormalisedEvent));
}
}
void asynStreamGeneratorDriver::processEvents() {
const char *functionName = "processEvents";
const size_t queueBufferSize =
10 * epicsRingBytesSize(this->udpQueue) / sizeof(NormalisedEvent);
struct {
bool operator()(const NormalisedEvent l,
const NormalisedEvent r) const {
return l.timestamp > r.timestamp;
}
} smallestToLargest;
// This should never be used. It is just instantiated to reserve a buffer
// of specific size.
std::vector<NormalisedEvent> queueBuffer;
queueBuffer.reserve(queueBufferSize);
std::priority_queue<NormalisedEvent, std::vector<NormalisedEvent>,
decltype(smallestToLargest)>
timeQueue(smallestToLargest, std::move(queueBuffer));
// TODO epics doesn't seem to support uint64, you would need an array of
// uint32. It does support int64 though.. so we start with that
epicsInt32 *counts = new epicsInt32[this->num_channels];
const uint64_t minRateSamplePeriod = 100'000'000ll;
const size_t rateAverageWindow = 20;
size_t countDiffsPtr = 0;
epicsInt32 *rates = new epicsInt32[this->num_channels];
epicsInt32 *countDiff = new epicsInt32[this->num_channels];
epicsInt32 *countDiffs =
new epicsInt32[this->num_channels * rateAverageWindow];
uint64_t *timeSpans = new uint64_t[this->num_channels];
epicsTimeStamp lastRateUpdate = epicsTime::getCurrent();
asynStatus status = asynSuccess;
NormalisedEvent ne;
uint64_t newestTimestamp = 0;
uint64_t startTimestamp = std::numeric_limits<uint64_t>::max();
uint64_t currTimestamp;
epicsInt32 elapsedSeconds = 0;
epicsInt32 prevStatus = STATUS_IDLE;
epicsInt32 currStatus = STATUS_IDLE;
epicsInt32 countPreset = 0;
epicsInt32 timePreset = 0;
epicsInt32 presetChannel = 0;
while (true) {
// TODO depending on how this is implemented, I may also need to check
// that there is is enough bytes, in case it does partial writes...
if (epicsRingBytesGet(udpQueue, (char *)&ne, sizeof(NormalisedEvent))) {
// we should reastart this ioc at least every few years, as at ns
// resolution with a uint64_t we will have an overflow after around
// 4 years
newestTimestamp = std::max(newestTimestamp, ne.timestamp);
++countDiff[ne.source == 0 ? ne.pixelId + 1 : 0];
timeQueue.push(std::move(ne));
}
// idea is to try and guarantee at least 1 packet per id or the min
// frequency for each id without actually checking all ids
if (timeQueue.size() >= 1500 * 10 ||
(timeQueue.size() > 0 &&
newestTimestamp - timeQueue.top().timestamp >= 200'000'000ull)) {
ne = timeQueue.top();
timeQueue.pop();
status = getIntegerParam(this->P_Status, &currStatus);
if (currStatus == STATUS_COUNTING && prevStatus == STATUS_IDLE) {
// Starting a new count
// get current count configuration
getIntegerParam(this->P_CountPreset, &countPreset);
getIntegerParam(this->P_TimePreset, &timePreset);
getIntegerParam(this->P_MonitorChannel, &presetChannel);
// reset status variables
startTimestamp = std::numeric_limits<uint64_t>::max();
for (size_t i = 0; i < this->num_channels; ++i) {
counts[i] = 0;
}
// reset pvs
lock();
for (size_t i = 0; i < num_channels; ++i) {
setIntegerParam(P_Counts[i], counts[i]);
}
setIntegerParam(P_ElapsedTime, 0);
callParamCallbacks();
unlock();
// TODO might consider throwing out current buffer as it is
// from before count started? then again, 0.2 ms or whatever is
// set above is quite a small preceeding amount of time, so
// maybe it doesn't matter
}
prevStatus = currStatus;
if (currStatus == STATUS_COUNTING) {
startTimestamp = std::min(startTimestamp, ne.timestamp);
currTimestamp = ne.timestamp;
elapsedSeconds =
0 ? currTimestamp <= startTimestamp
: ((double)(currTimestamp - startTimestamp)) / 1e9;
// is our count finished?
if ((countPreset && counts[presetChannel] >= countPreset) ||
(timePreset && elapsedSeconds >= timePreset)) {
// filter out events that occured after the specified time
if (ne.timestamp - startTimestamp <= countPreset) {
counts[ne.source == 0 ? ne.pixelId + 1 : 0] += 1;
this->queueForKafka(std::move(ne));
// add any remaining events with the same timestamp
// we could theoretically have a small overrun if the
// timestamps are identical on the monitor channel
while (!timeQueue.empty() &&
!timeQueue.top().timestamp == currTimestamp) {
ne = timeQueue.top();
timeQueue.pop();
counts[ne.source == 0 ? ne.pixelId + 1 : 0] += 1;
this->queueForKafka(std::move(ne));
}
}
countPreset = 0;
timePreset = 0;
lock();
for (size_t i = 0; i < num_channels; ++i) {
setIntegerParam(P_Counts[i], counts[i]);
}
setIntegerParam(P_ElapsedTime, elapsedSeconds);
setIntegerParam(P_CountPreset, countPreset);
setIntegerParam(P_TimePreset, timePreset);
callParamCallbacks();
setIntegerParam(P_Status, STATUS_IDLE);
callParamCallbacks();
unlock();
} else {
counts[ne.source == 0 ? ne.pixelId + 1 : 0] += 1;
this->queueForKafka(std::move(ne));
lock();
for (size_t i = 0; i < num_channels; ++i) {
setIntegerParam(P_Counts[i], counts[i]);
}
setIntegerParam(P_ElapsedTime, elapsedSeconds);
callParamCallbacks();
unlock();
}
}
}
// Careful changing any of these magic numbers until I clean this up
// as you might end up calculating the wrong rate
epicsTimeStamp currentTime = epicsTime::getCurrent();
if (epicsTimeDiffInNS(&currentTime, &lastRateUpdate) >
minRateSamplePeriod) {
timeSpans[countDiffsPtr] =
epicsTimeDiffInNS(&currentTime, &lastRateUpdate);
uint64_t totalTime = 0;
for (size_t i = 0; i <= rateAverageWindow; ++i) {
totalTime += timeSpans[i];
}
lastRateUpdate = currentTime;
for (size_t i = 0; i <= this->num_channels; ++i) {
countDiffs[i * rateAverageWindow + countDiffsPtr] =
countDiff[i];
uint64_t cnt = 0;
for (size_t j = 0; j <= rateAverageWindow; ++j) {
cnt += countDiffs[i * rateAverageWindow + j];
}
rates[i] = cnt / (totalTime * 1e-9);
countDiff[i] = 0;
}
countDiffsPtr = (countDiffsPtr + 1) % rateAverageWindow;
if (countDiffsPtr % 5 == 0) {
lock();
for (size_t i = 0; i < num_channels; ++i) {
setIntegerParam(P_Rates[i], rates[i]);
}
callParamCallbacks();
unlock();
}
}
}
}
void asynStreamGeneratorDriver::produce(epicsRingBytesId eventQueue,
rd_kafka_t *kafkaProducer,
const char *topic, const char *source) {
flatbuffers::FlatBufferBuilder builder(1024);
const std::size_t bufferSize = this->kafkaMaxPacketSize + 16;
std::vector<uint32_t> tof;
tof.reserve(bufferSize);
std::vector<uint32_t> did;
did.reserve(bufferSize);
epicsTimeStamp last_sent = epicsTime::getCurrent();
epicsTimeStamp now = last_sent;
int total = 0;
uint64_t message_id = 0;
NormalisedEvent ne;
while (true) {
if (!epicsRingBytesIsEmpty(eventQueue)) {
++total;
epicsRingBytesGet(eventQueue, (char *)&ne, sizeof(NormalisedEvent));
tof.push_back(ne.timestamp);
did.push_back(ne.pixelId);
} else {
epicsThreadSleep(0.001); // seconds
}
now = epicsTime::getCurrent();
// At least every 0.2 seconds
if (total >= this->kafkaMaxPacketSize ||
epicsTimeDiffInNS(&now, &last_sent) > 200'000'000ll) {
last_sent = epicsTime::getCurrent();
if (total) {
total = 0;
builder.Clear();
auto message = CreateEventMessageDirect(
builder, source, message_id++,
((uint64_t)now.secPastEpoch) * 1'000'000'000ull +
((uint64_t)now.nsec),
&tof, &did);
builder.Finish(message, "ev42");
rd_kafka_resp_err_t err = rd_kafka_producev(
kafkaProducer, RD_KAFKA_V_TOPIC(topic),
RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY),
// RD_KAFKA_V_KEY((void *)key, key_len),
RD_KAFKA_V_VALUE((void *)builder.GetBufferPointer(),
builder.GetSize()),
// RD_KAFKA_V_OPAQUE(NULL),
RD_KAFKA_V_END);
if (err) {
epicsStdoutPrintf("Failed to produce to topic %s: %s\n",
topic, rd_kafka_err2str(err));
}
rd_kafka_poll(kafkaProducer, 0);
tof.clear();
did.clear();
}
}
}
}
void asynStreamGeneratorDriver::produceMonitor() {
this->produce(monitorQueue, monitorProducer, monitorTopic, "monitor");
}
void asynStreamGeneratorDriver::produceDetector() {
this->produce(detectorQueue, detectorProducer, detectorTopic, "detector");
}
/*******************************************************************************
* Methods exposed to IOC Shell
*/
extern "C" {
asynStatus asynStreamGeneratorDriverConfigure(
const char *portName, const char *ipPortName, const int numChannels,
const int udpQueueSize, const char *kafkaBroker, const char *monitorTopic,
const char *detectorTopic, const int kafkaQueueSize,
const int kafkaMaxPacketSize) {
new asynStreamGeneratorDriver(portName, ipPortName, numChannels,
udpQueueSize, kafkaBroker[0], kafkaBroker,
monitorTopic, detectorTopic, kafkaQueueSize,
kafkaMaxPacketSize);
return asynSuccess;
}
static const iocshArg initArg0 = {"portName", iocshArgString};
static const iocshArg initArg1 = {"ipPortName", iocshArgString};
static const iocshArg initArg2 = {"numChannels", iocshArgInt};
static const iocshArg initArg3 = {"udpQueueSize", iocshArgInt};
static const iocshArg initArg4 = {"kafkaBroker", iocshArgString};
static const iocshArg initArg5 = {"monitorTopic", iocshArgString};
static const iocshArg initArg6 = {"detectorTopic", iocshArgString};
static const iocshArg initArg7 = {"kafkaQueueSize", iocshArgInt};
static const iocshArg initArg8 = {"kafkaMaxPacketSize", iocshArgInt};
static const iocshArg *const initArgs[] = {&initArg0, &initArg1, &initArg2,
&initArg3, &initArg4, &initArg5,
&initArg6, &initArg7, &initArg8};
static const iocshFuncDef initFuncDef = {"asynStreamGenerator", 9, initArgs};
static void initCallFunc(const iocshArgBuf *args) {
asynStreamGeneratorDriverConfigure(
args[0].sval, args[1].sval, args[2].ival, args[3].ival, args[4].sval,
args[5].sval, args[6].sval, args[7].ival, args[8].ival);
}
void asynStreamGeneratorDriverRegister(void) {
iocshRegister(&initFuncDef, initCallFunc);
}
epicsExportRegistrar(asynStreamGeneratorDriverRegister);
}