fix warnings
This commit is contained in:
@@ -6,10 +6,6 @@
|
||||
#include <iocsh.h>
|
||||
#include <queue>
|
||||
|
||||
// Just for printing
|
||||
#define __STDC_FORMAT_MACROS
|
||||
#include <inttypes.h>
|
||||
|
||||
#include "asynStreamGeneratorDriver.h"
|
||||
#include <epicsExport.h>
|
||||
|
||||
@@ -37,9 +33,11 @@ static rd_kafka_t *create_kafka_producer(const char *kafkaBroker) {
|
||||
// Prepare configuration object
|
||||
rd_kafka_conf_t *conf = rd_kafka_conf_new();
|
||||
// TODO feel not great about this
|
||||
set_kafka_config_key(conf, "bootstrap.servers",
|
||||
set_kafka_config_key(conf, const_cast<char *>("bootstrap.servers"),
|
||||
const_cast<char *>(kafkaBroker));
|
||||
set_kafka_config_key(conf, "queue.buffering.max.messages", "10000000");
|
||||
set_kafka_config_key(conf,
|
||||
const_cast<char *>("queue.buffering.max.messages"),
|
||||
const_cast<char *>("10000000"));
|
||||
// With 2e6 counts / s
|
||||
// and a packet size of 20480 events (163920 bytes)
|
||||
// this implies we need to send around 100 messages a second
|
||||
@@ -94,22 +92,26 @@ static void detectorProducerTask(void *drvPvt) {
|
||||
* Stream Generator Helper Methods
|
||||
*/
|
||||
|
||||
asynStatus asynStreamGeneratorDriver::createInt32Param(
|
||||
asynStatus status, char *name, int *variable, epicsInt32 initialValue) {
|
||||
asynStatus
|
||||
asynStreamGeneratorDriver::createInt32Param(asynStatus status, const char *name,
|
||||
int *variable,
|
||||
epicsInt32 initialValue) {
|
||||
// TODO should show error if there is one
|
||||
return (asynStatus)(status | createParam(name, asynParamInt32, variable) |
|
||||
setIntegerParam(*variable, initialValue));
|
||||
}
|
||||
|
||||
asynStatus asynStreamGeneratorDriver::createInt64Param(
|
||||
asynStatus status, char *name, int *variable, epicsInt64 initialValue) {
|
||||
asynStatus
|
||||
asynStreamGeneratorDriver::createInt64Param(asynStatus status, const char *name,
|
||||
int *variable,
|
||||
epicsInt64 initialValue) {
|
||||
// TODO should show error if there is one
|
||||
return (asynStatus)(status | createParam(name, asynParamInt64, variable) |
|
||||
setInteger64Param(*variable, initialValue));
|
||||
}
|
||||
|
||||
asynStatus asynStreamGeneratorDriver::createFloat64Param(asynStatus status,
|
||||
char *name,
|
||||
const char *name,
|
||||
int *variable,
|
||||
double initialValue) {
|
||||
// TODO should show error if there is one
|
||||
@@ -138,8 +140,8 @@ asynStreamGeneratorDriver::asynStreamGeneratorDriver(
|
||||
0, /* Default priority */
|
||||
0), /* Default stack size*/
|
||||
num_channels(numChannels + 1), kafkaEnabled(enableKafkaStream),
|
||||
monitorTopic(monitorTopic), detectorTopic(detectorTopic),
|
||||
udpQueueSize(udpQueueSize), kafkaQueueSize(kafkaQueueSize),
|
||||
kafkaQueueSize(kafkaQueueSize), kafkaMaxPacketSize(kafkaMaxPacketSize),
|
||||
udpQueueSize(udpQueueSize),
|
||||
// measured in max packet sizes
|
||||
udpQueue(
|
||||
epicsRingBytesCreate(243 * udpQueueSize * sizeof(NormalisedEvent))),
|
||||
@@ -150,10 +152,11 @@ asynStreamGeneratorDriver::asynStreamGeneratorDriver(
|
||||
epicsRingBytesCreate(243 * udpQueueSize * sizeof(NormalisedEvent))),
|
||||
monitorQueue(
|
||||
epicsRingBytesCreate(243 * kafkaQueueSize * sizeof(NormalisedEvent))),
|
||||
monitorTopic(monitorTopic),
|
||||
detectorQueue(
|
||||
epicsRingBytesCreate(243 * kafkaQueueSize * sizeof(NormalisedEvent))),
|
||||
kafkaMaxPacketSize(kafkaMaxPacketSize) {
|
||||
const char *functionName = "asynStreamGeneratorDriver";
|
||||
detectorTopic(detectorTopic) {
|
||||
const char functionName[]{"asynStreamGeneratorDriver"};
|
||||
|
||||
// Parameter Setup
|
||||
asynStatus status = asynSuccess;
|
||||
@@ -333,9 +336,8 @@ asynStatus asynStreamGeneratorDriver::readInt32(asynUser *pasynUser,
|
||||
epicsInt32 *value) {
|
||||
|
||||
int function = pasynUser->reason;
|
||||
asynStatus status = asynSuccess;
|
||||
const char *paramName;
|
||||
const char *functionName = "readInt32";
|
||||
// const char functionName[]{"readInt32"};
|
||||
getParamName(function, ¶mName);
|
||||
|
||||
if (function == P_UdpQueueHighWaterMark) {
|
||||
@@ -362,7 +364,7 @@ asynStatus asynStreamGeneratorDriver::writeInt32(asynUser *pasynUser,
|
||||
int function = pasynUser->reason;
|
||||
asynStatus status = asynSuccess;
|
||||
const char *paramName;
|
||||
const char *functionName = "writeInt32";
|
||||
const char functionName[]{"writeInt32"};
|
||||
getParamName(function, ¶mName);
|
||||
|
||||
// TODO should maybe lock mutex for this
|
||||
@@ -380,7 +382,7 @@ asynStatus asynStreamGeneratorDriver::writeInt32(asynUser *pasynUser,
|
||||
// TODO clean up
|
||||
bool isClearCount = false;
|
||||
size_t channelToClear;
|
||||
for (size_t i = 0; i < this->num_channels; ++i) {
|
||||
for (std::size_t i = 0; i < this->num_channels; ++i) {
|
||||
isClearCount |= function == P_ClearCounts[i];
|
||||
if (isClearCount) {
|
||||
channelToClear = i;
|
||||
@@ -452,8 +454,7 @@ asynStatus asynStreamGeneratorDriver::writeInt32(asynUser *pasynUser,
|
||||
|
||||
void asynStreamGeneratorDriver::receiveUDP() {
|
||||
|
||||
const char *functionName = "receiveUDP";
|
||||
asynStatus status = asynSuccess;
|
||||
const char functionName[]{"receiveUDP"};
|
||||
// int isConnected = 1;
|
||||
std::size_t received;
|
||||
int eomReason;
|
||||
@@ -470,9 +471,9 @@ void asynStreamGeneratorDriver::receiveUDP() {
|
||||
// "%s:%s: isConnected = %d\n", driverName, functionName,
|
||||
// isConnected);
|
||||
|
||||
status = pasynOctetSyncIO->read(pasynUDPUser, buffer, bufferSize,
|
||||
0, // timeout
|
||||
&received, &eomReason);
|
||||
pasynOctetSyncIO->read(pasynUDPUser, buffer, bufferSize,
|
||||
0, // timeout
|
||||
&received, &eomReason);
|
||||
|
||||
if (received) {
|
||||
const uint16_t bufferLength = ((uint16_t *)buffer)[0];
|
||||
@@ -502,11 +503,7 @@ void asynStreamGeneratorDriver::normaliseUDP() {
|
||||
// * so maybe this isn't necessary to solve, as long as we restart the
|
||||
// electronics at least once a year...
|
||||
|
||||
const char *functionName = "normaliseUDP";
|
||||
asynStatus status = asynSuccess;
|
||||
int isConnected = 1;
|
||||
std::size_t received;
|
||||
int eomReason;
|
||||
const char functionName[]{"normaliseUDP"};
|
||||
|
||||
// The correlation unit sends messages with a maximum size of 1500 bytes.
|
||||
// These messages don't have any obious start or end to synchronise
|
||||
@@ -518,7 +515,7 @@ void asynStreamGeneratorDriver::normaliseUDP() {
|
||||
NormalisedEvent resultBuffer[resultBufferSize];
|
||||
|
||||
// We have 10 mcpdids
|
||||
uint64_t lastBufferNumber[10];
|
||||
uint16_t lastBufferNumber[10];
|
||||
for (size_t i = 0; i < 10; ++i) {
|
||||
lastBufferNumber[i] = 0;
|
||||
}
|
||||
@@ -543,12 +540,11 @@ void asynStreamGeneratorDriver::normaliseUDP() {
|
||||
lastBufferNumber[header->McpdID] !=
|
||||
std::numeric_limits<
|
||||
decltype(header->BufferNumber)>::max()) {
|
||||
asynPrint(pasynUserSelf, ASYN_TRACE_ERROR,
|
||||
"%s:%s: missed packet on id: %d. Received: %" PRIu64
|
||||
", last: %" PRIu64 "\n",
|
||||
driverName, functionName, header->McpdID,
|
||||
header->BufferNumber,
|
||||
lastBufferNumber[header->McpdID]);
|
||||
asynPrint(
|
||||
pasynUserSelf, ASYN_TRACE_ERROR,
|
||||
"%s:%s: missed packet on id: %d. Received: %d, last: %d\n",
|
||||
driverName, functionName, header->McpdID,
|
||||
header->BufferNumber, lastBufferNumber[header->McpdID]);
|
||||
setIntegerParam(P_UdpDropped, ++droppedMessages);
|
||||
}
|
||||
|
||||
@@ -597,7 +593,7 @@ inline int eventsInQueue(epicsRingBytesId id) {
|
||||
|
||||
void asynStreamGeneratorDriver::partialSortEvents() {
|
||||
|
||||
const char *functionName = "partialSortEvents";
|
||||
// const char functionName[]{"partialSortEvents"};
|
||||
|
||||
// x * number of ids * max events in packet
|
||||
int bufferedEvents = 5 * 10 * 243;
|
||||
@@ -617,7 +613,7 @@ void asynStreamGeneratorDriver::partialSortEvents() {
|
||||
// wait for mininmum packet frequency or enough packets to ensure we
|
||||
// could potentially have at least 1 packet per mcpdid
|
||||
while (queuedEvents < bufferedEvents &&
|
||||
epicsTimeDiffInNS(¤tTime, &lastSort) < 250'000'000ull) {
|
||||
epicsTimeDiffInNS(¤tTime, &lastSort) < 250'000'000ll) {
|
||||
epicsThreadSleep(0.0001); // seconds
|
||||
currentTime = epicsTime::getCurrent();
|
||||
queuedEvents = eventsInQueue(this->normalisedQueue);
|
||||
@@ -650,7 +646,7 @@ inline void asynStreamGeneratorDriver::queueForKafka(NormalisedEvent &ne) {
|
||||
|
||||
void asynStreamGeneratorDriver::processEvents() {
|
||||
|
||||
const char *functionName = "processEvents";
|
||||
// const char functionName[]{"processEvents"};
|
||||
|
||||
// x * number of ids * max events in packet * event size
|
||||
int bufferedEvents = 5 * 10 * 243;
|
||||
@@ -675,15 +671,12 @@ void asynStreamGeneratorDriver::processEvents() {
|
||||
epicsInt64 counts[this->num_channels];
|
||||
double elapsedSeconds = 0;
|
||||
uint64_t startTimestamp = std::numeric_limits<uint64_t>::max();
|
||||
uint64_t currTimestamp;
|
||||
|
||||
epicsInt32 currStatus = STATUS_IDLE;
|
||||
epicsInt32 prevStatus = STATUS_IDLE;
|
||||
epicsInt32 countPreset;
|
||||
epicsInt32 timePreset;
|
||||
epicsInt32 presetChannel;
|
||||
epicsInt32 udpQueueHighWaterMark = 0;
|
||||
epicsInt32 sortedQueueHighWaterMark = 0;
|
||||
|
||||
while (true) {
|
||||
|
||||
@@ -695,7 +688,7 @@ void asynStreamGeneratorDriver::processEvents() {
|
||||
// wait for mininmum packet frequency or enough packets to ensure we
|
||||
// could potentially have at least 1 packet per mcpdid
|
||||
while (queuedEvents < bufferedEvents &&
|
||||
epicsTimeDiffInNS(¤tTime, &lastProcess) < 250'000'000ull) {
|
||||
epicsTimeDiffInNS(¤tTime, &lastProcess) < 250'000'000ll) {
|
||||
epicsThreadSleep(0.0001); // seconds
|
||||
currentTime = epicsTime::getCurrent();
|
||||
queuedEvents = eventsInQueue(this->sortedQueue);
|
||||
@@ -715,7 +708,7 @@ void asynStreamGeneratorDriver::processEvents() {
|
||||
epicsRingBytesGet(this->sortedQueue, (char *)newStartPtr,
|
||||
queuedEvents * sizeof(NormalisedEvent));
|
||||
|
||||
int toProcess =
|
||||
std::size_t toProcess =
|
||||
eventsBLastEnd - eventsBLastStart + queuedEvents * 4 / 5;
|
||||
|
||||
// TODO could also consider an in-place merge
|
||||
@@ -736,7 +729,7 @@ void asynStreamGeneratorDriver::processEvents() {
|
||||
// reset status variables
|
||||
startTimestamp = eventsA[0].timestamp;
|
||||
elapsedSeconds = 0;
|
||||
for (size_t i = 0; i < this->num_channels; ++i) {
|
||||
for (std::size_t i = 0; i < this->num_channels; ++i) {
|
||||
counts[i] = 0;
|
||||
}
|
||||
}
|
||||
@@ -747,7 +740,7 @@ void asynStreamGeneratorDriver::processEvents() {
|
||||
// are using them for comparison, or for showing to the user, to
|
||||
// try and make sure the data we send to kafka is correct, while
|
||||
// the measurement time also appears intuitive.
|
||||
for (size_t i = 0; i < toProcess; ++i) {
|
||||
for (std::size_t i = 0; i < toProcess; ++i) {
|
||||
counts[eventsA[i].source == 0 ? eventsA[i].pixelId + 1 : 0] +=
|
||||
1;
|
||||
elapsedSeconds = (eventsA[i].timestamp - startTimestamp) / 1e9;
|
||||
@@ -762,7 +755,7 @@ void asynStreamGeneratorDriver::processEvents() {
|
||||
this->queueForKafka(eventsA[i]);
|
||||
}
|
||||
|
||||
for (size_t i = 0; i < num_channels; ++i) {
|
||||
for (std::size_t i = 0; i < num_channels; ++i) {
|
||||
setInteger64Param(P_Counts[i], counts[i]);
|
||||
}
|
||||
setDoubleParam(P_ElapsedTime, elapsedSeconds);
|
||||
|
||||
Reference in New Issue
Block a user