diff --git a/Makefile b/Makefile index fc5d65c..40dd680 100644 --- a/Makefile +++ b/Makefile @@ -24,6 +24,6 @@ SOURCES += src/asynStreamGeneratorDriver.cpp USR_CFLAGS += -O3 -Wall -Wextra -Wunused-result -Werror -fvisibility=hidden # -Wpedantic // Does not work because EPICS macros trigger warnings # Required to support EV42/44 -USR_CXXFLAGS += -O3 -I../dep/flatbuffers/include/ -I../schemas +USR_CXXFLAGS += -std=c++17 -O3 -I../dep/flatbuffers/include/ -I../schemas LIB_SYS_LIBS += rdkafka diff --git a/src/asynStreamGeneratorDriver.cpp b/src/asynStreamGeneratorDriver.cpp index fbbf86c..bb9f7cd 100644 --- a/src/asynStreamGeneratorDriver.cpp +++ b/src/asynStreamGeneratorDriver.cpp @@ -6,10 +6,6 @@ #include #include -// Just for printing -#define __STDC_FORMAT_MACROS -#include - #include "asynStreamGeneratorDriver.h" #include @@ -37,9 +33,11 @@ static rd_kafka_t *create_kafka_producer(const char *kafkaBroker) { // Prepare configuration object rd_kafka_conf_t *conf = rd_kafka_conf_new(); // TODO feel not great about this - set_kafka_config_key(conf, "bootstrap.servers", + set_kafka_config_key(conf, const_cast("bootstrap.servers"), const_cast(kafkaBroker)); - set_kafka_config_key(conf, "queue.buffering.max.messages", "10000000"); + set_kafka_config_key(conf, + const_cast("queue.buffering.max.messages"), + const_cast("10000000")); // With 2e6 counts / s // and a packet size of 20480 events (163920 bytes) // this implies we need to send around 100 messages a second @@ -94,22 +92,26 @@ static void detectorProducerTask(void *drvPvt) { * Stream Generator Helper Methods */ -asynStatus asynStreamGeneratorDriver::createInt32Param( - asynStatus status, char *name, int *variable, epicsInt32 initialValue) { +asynStatus +asynStreamGeneratorDriver::createInt32Param(asynStatus status, const char *name, + int *variable, + epicsInt32 initialValue) { // TODO should show error if there is one return (asynStatus)(status | createParam(name, asynParamInt32, variable) | setIntegerParam(*variable, initialValue)); } -asynStatus asynStreamGeneratorDriver::createInt64Param( - asynStatus status, char *name, int *variable, epicsInt64 initialValue) { +asynStatus +asynStreamGeneratorDriver::createInt64Param(asynStatus status, const char *name, + int *variable, + epicsInt64 initialValue) { // TODO should show error if there is one return (asynStatus)(status | createParam(name, asynParamInt64, variable) | setInteger64Param(*variable, initialValue)); } asynStatus asynStreamGeneratorDriver::createFloat64Param(asynStatus status, - char *name, + const char *name, int *variable, double initialValue) { // TODO should show error if there is one @@ -138,8 +140,8 @@ asynStreamGeneratorDriver::asynStreamGeneratorDriver( 0, /* Default priority */ 0), /* Default stack size*/ num_channels(numChannels + 1), kafkaEnabled(enableKafkaStream), - monitorTopic(monitorTopic), detectorTopic(detectorTopic), - udpQueueSize(udpQueueSize), kafkaQueueSize(kafkaQueueSize), + kafkaQueueSize(kafkaQueueSize), kafkaMaxPacketSize(kafkaMaxPacketSize), + udpQueueSize(udpQueueSize), // measured in max packet sizes udpQueue( epicsRingBytesCreate(243 * udpQueueSize * sizeof(NormalisedEvent))), @@ -150,10 +152,11 @@ asynStreamGeneratorDriver::asynStreamGeneratorDriver( epicsRingBytesCreate(243 * udpQueueSize * sizeof(NormalisedEvent))), monitorQueue( epicsRingBytesCreate(243 * kafkaQueueSize * sizeof(NormalisedEvent))), + monitorTopic(monitorTopic), detectorQueue( epicsRingBytesCreate(243 * kafkaQueueSize * sizeof(NormalisedEvent))), - kafkaMaxPacketSize(kafkaMaxPacketSize) { - const char *functionName = "asynStreamGeneratorDriver"; + detectorTopic(detectorTopic) { + const char functionName[]{"asynStreamGeneratorDriver"}; // Parameter Setup asynStatus status = asynSuccess; @@ -333,9 +336,8 @@ asynStatus asynStreamGeneratorDriver::readInt32(asynUser *pasynUser, epicsInt32 *value) { int function = pasynUser->reason; - asynStatus status = asynSuccess; const char *paramName; - const char *functionName = "readInt32"; + // const char functionName[]{"readInt32"}; getParamName(function, ¶mName); if (function == P_UdpQueueHighWaterMark) { @@ -362,7 +364,7 @@ asynStatus asynStreamGeneratorDriver::writeInt32(asynUser *pasynUser, int function = pasynUser->reason; asynStatus status = asynSuccess; const char *paramName; - const char *functionName = "writeInt32"; + const char functionName[]{"writeInt32"}; getParamName(function, ¶mName); // TODO should maybe lock mutex for this @@ -380,7 +382,7 @@ asynStatus asynStreamGeneratorDriver::writeInt32(asynUser *pasynUser, // TODO clean up bool isClearCount = false; size_t channelToClear; - for (size_t i = 0; i < this->num_channels; ++i) { + for (std::size_t i = 0; i < this->num_channels; ++i) { isClearCount |= function == P_ClearCounts[i]; if (isClearCount) { channelToClear = i; @@ -452,8 +454,7 @@ asynStatus asynStreamGeneratorDriver::writeInt32(asynUser *pasynUser, void asynStreamGeneratorDriver::receiveUDP() { - const char *functionName = "receiveUDP"; - asynStatus status = asynSuccess; + const char functionName[]{"receiveUDP"}; // int isConnected = 1; std::size_t received; int eomReason; @@ -470,9 +471,9 @@ void asynStreamGeneratorDriver::receiveUDP() { // "%s:%s: isConnected = %d\n", driverName, functionName, // isConnected); - status = pasynOctetSyncIO->read(pasynUDPUser, buffer, bufferSize, - 0, // timeout - &received, &eomReason); + pasynOctetSyncIO->read(pasynUDPUser, buffer, bufferSize, + 0, // timeout + &received, &eomReason); if (received) { const uint16_t bufferLength = ((uint16_t *)buffer)[0]; @@ -502,11 +503,7 @@ void asynStreamGeneratorDriver::normaliseUDP() { // * so maybe this isn't necessary to solve, as long as we restart the // electronics at least once a year... - const char *functionName = "normaliseUDP"; - asynStatus status = asynSuccess; - int isConnected = 1; - std::size_t received; - int eomReason; + const char functionName[]{"normaliseUDP"}; // The correlation unit sends messages with a maximum size of 1500 bytes. // These messages don't have any obious start or end to synchronise @@ -518,7 +515,7 @@ void asynStreamGeneratorDriver::normaliseUDP() { NormalisedEvent resultBuffer[resultBufferSize]; // We have 10 mcpdids - uint64_t lastBufferNumber[10]; + uint16_t lastBufferNumber[10]; for (size_t i = 0; i < 10; ++i) { lastBufferNumber[i] = 0; } @@ -543,12 +540,11 @@ void asynStreamGeneratorDriver::normaliseUDP() { lastBufferNumber[header->McpdID] != std::numeric_limits< decltype(header->BufferNumber)>::max()) { - asynPrint(pasynUserSelf, ASYN_TRACE_ERROR, - "%s:%s: missed packet on id: %d. Received: %" PRIu64 - ", last: %" PRIu64 "\n", - driverName, functionName, header->McpdID, - header->BufferNumber, - lastBufferNumber[header->McpdID]); + asynPrint( + pasynUserSelf, ASYN_TRACE_ERROR, + "%s:%s: missed packet on id: %d. Received: %d, last: %d\n", + driverName, functionName, header->McpdID, + header->BufferNumber, lastBufferNumber[header->McpdID]); setIntegerParam(P_UdpDropped, ++droppedMessages); } @@ -597,7 +593,7 @@ inline int eventsInQueue(epicsRingBytesId id) { void asynStreamGeneratorDriver::partialSortEvents() { - const char *functionName = "partialSortEvents"; + // const char functionName[]{"partialSortEvents"}; // x * number of ids * max events in packet int bufferedEvents = 5 * 10 * 243; @@ -617,7 +613,7 @@ void asynStreamGeneratorDriver::partialSortEvents() { // wait for mininmum packet frequency or enough packets to ensure we // could potentially have at least 1 packet per mcpdid while (queuedEvents < bufferedEvents && - epicsTimeDiffInNS(¤tTime, &lastSort) < 250'000'000ull) { + epicsTimeDiffInNS(¤tTime, &lastSort) < 250'000'000ll) { epicsThreadSleep(0.0001); // seconds currentTime = epicsTime::getCurrent(); queuedEvents = eventsInQueue(this->normalisedQueue); @@ -650,7 +646,7 @@ inline void asynStreamGeneratorDriver::queueForKafka(NormalisedEvent &ne) { void asynStreamGeneratorDriver::processEvents() { - const char *functionName = "processEvents"; + // const char functionName[]{"processEvents"}; // x * number of ids * max events in packet * event size int bufferedEvents = 5 * 10 * 243; @@ -675,15 +671,12 @@ void asynStreamGeneratorDriver::processEvents() { epicsInt64 counts[this->num_channels]; double elapsedSeconds = 0; uint64_t startTimestamp = std::numeric_limits::max(); - uint64_t currTimestamp; epicsInt32 currStatus = STATUS_IDLE; epicsInt32 prevStatus = STATUS_IDLE; epicsInt32 countPreset; epicsInt32 timePreset; epicsInt32 presetChannel; - epicsInt32 udpQueueHighWaterMark = 0; - epicsInt32 sortedQueueHighWaterMark = 0; while (true) { @@ -695,7 +688,7 @@ void asynStreamGeneratorDriver::processEvents() { // wait for mininmum packet frequency or enough packets to ensure we // could potentially have at least 1 packet per mcpdid while (queuedEvents < bufferedEvents && - epicsTimeDiffInNS(¤tTime, &lastProcess) < 250'000'000ull) { + epicsTimeDiffInNS(¤tTime, &lastProcess) < 250'000'000ll) { epicsThreadSleep(0.0001); // seconds currentTime = epicsTime::getCurrent(); queuedEvents = eventsInQueue(this->sortedQueue); @@ -715,7 +708,7 @@ void asynStreamGeneratorDriver::processEvents() { epicsRingBytesGet(this->sortedQueue, (char *)newStartPtr, queuedEvents * sizeof(NormalisedEvent)); - int toProcess = + std::size_t toProcess = eventsBLastEnd - eventsBLastStart + queuedEvents * 4 / 5; // TODO could also consider an in-place merge @@ -736,7 +729,7 @@ void asynStreamGeneratorDriver::processEvents() { // reset status variables startTimestamp = eventsA[0].timestamp; elapsedSeconds = 0; - for (size_t i = 0; i < this->num_channels; ++i) { + for (std::size_t i = 0; i < this->num_channels; ++i) { counts[i] = 0; } } @@ -747,7 +740,7 @@ void asynStreamGeneratorDriver::processEvents() { // are using them for comparison, or for showing to the user, to // try and make sure the data we send to kafka is correct, while // the measurement time also appears intuitive. - for (size_t i = 0; i < toProcess; ++i) { + for (std::size_t i = 0; i < toProcess; ++i) { counts[eventsA[i].source == 0 ? eventsA[i].pixelId + 1 : 0] += 1; elapsedSeconds = (eventsA[i].timestamp - startTimestamp) / 1e9; @@ -762,7 +755,7 @@ void asynStreamGeneratorDriver::processEvents() { this->queueForKafka(eventsA[i]); } - for (size_t i = 0; i < num_channels; ++i) { + for (std::size_t i = 0; i < num_channels; ++i) { setInteger64Param(P_Counts[i], counts[i]); } setDoubleParam(P_ElapsedTime, elapsedSeconds); diff --git a/src/asynStreamGeneratorDriver.h b/src/asynStreamGeneratorDriver.h index 3783374..963a36a 100644 --- a/src/asynStreamGeneratorDriver.h +++ b/src/asynStreamGeneratorDriver.h @@ -5,6 +5,10 @@ #include #include +// Just for printing +#define __STDC_FORMAT_MACROS +#include + /******************************************************************************* * UDP Packet Definitions */ @@ -81,24 +85,24 @@ struct __attribute__((__packed__)) NormalisedEvent { * i.e.e drvInfo strings that are used to identify the parameters */ -#define P_StatusString "STATUS" -#define P_ResetString "RESET" -#define P_StopString "STOP" -#define P_CountPresetString "P_CNT" -#define P_TimePresetString "P_TIME" -#define P_ElapsedTimeString "TIME" -#define P_ClearElapsedTimeString "C_TIME" -#define P_MonitorChannelString "MONITOR" -#define P_ThresholdString "THRESH" -#define P_ThresholdChannelString "THRESH_CH" +constexpr static char P_StatusString[]{"STATUS"}; +constexpr static char P_ResetString[]{"RESET"}; +constexpr static char P_StopString[]{"STOP"}; +constexpr static char P_CountPresetString[]{"P_CNT"}; +constexpr static char P_TimePresetString[]{"P_TIME"}; +constexpr static char P_ElapsedTimeString[]{"TIME"}; +constexpr static char P_ClearElapsedTimeString[]{"C_TIME"}; +constexpr static char P_MonitorChannelString[]{"MONITOR"}; +constexpr static char P_ThresholdString[]{"THRESH"}; +constexpr static char P_ThresholdChannelString[]{"THRESH_CH"}; -#define P_CountsString "COUNTS%d" -#define P_RateString "RATE%d" -#define P_ClearCountsString "C_%d" +constexpr static char P_CountsString[]{"COUNTS%" PRIu64}; +constexpr static char P_RateString[]{"RATE%" PRIu64}; +constexpr static char P_ClearCountsString[]{"C_%" PRIu64}; -#define P_UdpDroppedString "DROP" -#define P_UdpQueueHighWaterMarkString "UDP" -#define P_SortedQueueHighWaterMarkString "SORT" +constexpr static char P_UdpDroppedString[]{"DROP"}; +constexpr static char P_UdpQueueHighWaterMarkString[]{"UDP"}; +constexpr static char P_SortedQueueHighWaterMarkString[]{"SORT"}; /******************************************************************************* * Stream Generator Coordinating Class @@ -149,7 +153,7 @@ class asynStreamGeneratorDriver : public asynPortDriver { asynUser *pasynUDPUser; epicsEventId pausedEventId; - const int num_channels; + const std::size_t num_channels; const bool kafkaEnabled; const int kafkaQueueSize; const int kafkaMaxPacketSize; @@ -167,16 +171,16 @@ class asynStreamGeneratorDriver : public asynPortDriver { rd_kafka_t *detectorProducer; const char *detectorTopic; - constexpr static char *driverName = "StreamGenerator"; + static constexpr char driverName[]{"StreamGenerator"}; - asynStatus createInt32Param(asynStatus status, char *name, int *variable, - epicsInt32 initialValue = 0); + asynStatus createInt32Param(asynStatus status, const char *name, + int *variable, epicsInt32 initialValue = 0); - asynStatus createInt64Param(asynStatus status, char *name, int *variable, - epicsInt64 initialValue = 0); + asynStatus createInt64Param(asynStatus status, const char *name, + int *variable, epicsInt64 initialValue = 0); - asynStatus createFloat64Param(asynStatus status, char *name, int *variable, - double initialValue = 0); + asynStatus createFloat64Param(asynStatus status, const char *name, + int *variable, double initialValue = 0); inline void queueForKafka(NormalisedEvent &ne);