diff --git a/src/asynStreamGeneratorDriver.cpp b/src/asynStreamGeneratorDriver.cpp index c20b7ab..f76055e 100644 --- a/src/asynStreamGeneratorDriver.cpp +++ b/src/asynStreamGeneratorDriver.cpp @@ -107,8 +107,11 @@ asynStreamGeneratorDriver::asynStreamGeneratorDriver( 0), /* Default stack size*/ num_channels(numChannels + 1), kafkaEnabled(enableKafkaStream), monitorTopic(monitorTopic), detectorTopic(detectorTopic), - udpQueue(udpQueueSize, false), monitorQueue(kafkaQueueSize, false), - detectorQueue(kafkaQueueSize, false), + udpQueue(epicsRingBytesCreate(udpQueueSize * sizeof(NormalisedEvent))), + monitorQueue( + epicsRingBytesCreate(kafkaQueueSize * sizeof(NormalisedEvent))), + detectorQueue( + epicsRingBytesCreate(kafkaQueueSize * sizeof(NormalisedEvent))), kafkaMaxPacketSize(kafkaMaxPacketSize) { const char *functionName = "asynStreamGeneratorDriver"; @@ -366,6 +369,8 @@ void asynStreamGeneratorDriver::receiveUDP() { lastBufferNumber[i] = 0; } + NormalisedEvent ne; + while (true) { status = pasynManager->isConnected(pasynUDPUser, &isConnected); @@ -404,26 +409,25 @@ void asynStreamGeneratorDriver::receiveUDP() { for (std::size_t i = 0; i < total_events; ++i) { char *event = (buffer + 21 * 2 + i * 6); - NormalisedEvent *ne; - if (event[5] & 0x80) { // Monitor Event MonitorEvent *m_event = (MonitorEvent *)event; - // needs to be freed!!! - ne = new NormalisedEvent( - header->nanosecs() + (uint64_t)m_event->nanosecs(), - 0, m_event->DataID); + ne.timestamp = + header->nanosecs() + (uint64_t)m_event->nanosecs(); + ne.source = 0; + ne.pixelId = m_event->DataID; } else { // Detector Event DetectorEvent *d_event = (DetectorEvent *)event; - // needs to be freed!!! - ne = new NormalisedEvent( - header->nanosecs() + (uint64_t)d_event->nanosecs(), - header->McpdID, d_event->pixelId(header->McpdID)); + ne.timestamp = + header->nanosecs() + (uint64_t)d_event->nanosecs(); + ne.source = header->McpdID; + ne.pixelId = d_event->pixelId(header->McpdID); } - this->udpQueue.push(ne); + epicsRingBytesPut(this->udpQueue, (char *)&ne, + sizeof(NormalisedEvent)); } } else { @@ -435,15 +439,14 @@ void asynStreamGeneratorDriver::receiveUDP() { } } -inline void asynStreamGeneratorDriver::queueForKafka(NormalisedEvent *ne) { - +inline void asynStreamGeneratorDriver::queueForKafka(NormalisedEvent &&ne) { if (this->kafkaEnabled) { - if (ne->source == 0) - this->monitorQueue.push(ne); + if (ne.source == 0) + epicsRingBytesPut(this->monitorQueue, (char *)&ne, + sizeof(NormalisedEvent)); else - this->detectorQueue.push(ne); - } else { - delete ne; + epicsRingBytesPut(this->detectorQueue, (char *)&ne, + sizeof(NormalisedEvent)); } } @@ -451,21 +454,22 @@ void asynStreamGeneratorDriver::processEvents() { const char *functionName = "processEvents"; - const size_t queueBufferSize = 10 * this->udpQueue.getSize(); + const size_t queueBufferSize = + 10 * epicsRingBytesSize(this->udpQueue) / sizeof(NormalisedEvent); struct { - bool operator()(const NormalisedEvent *l, - const NormalisedEvent *r) const { - return l->timestamp > r->timestamp; + bool operator()(const NormalisedEvent l, + const NormalisedEvent r) const { + return l.timestamp > r.timestamp; } } smallestToLargest; // This should never be used. It is just instantiated to reserve a buffer // of specific size. - std::vector queueBuffer; + std::vector queueBuffer; queueBuffer.reserve(queueBufferSize); - std::priority_queue, + std::priority_queue, decltype(smallestToLargest)> timeQueue(smallestToLargest, std::move(queueBuffer)); @@ -480,7 +484,7 @@ void asynStreamGeneratorDriver::processEvents() { epicsTimeStamp lastRateUpdate = epicsTime::getCurrent(); asynStatus status = asynSuccess; - NormalisedEvent *ne; + NormalisedEvent ne; uint64_t newestTimestamp = 0; uint64_t startTimestamp = std::numeric_limits::max(); uint64_t currTimestamp; @@ -493,22 +497,24 @@ void asynStreamGeneratorDriver::processEvents() { while (true) { - if ((ne = this->udpQueue.pop()) != nullptr) { + // TODO depending on how this is implemented, I may also need to check + // that there is is enough bytes, in case it does partial writes... + if (epicsRingBytesGet(udpQueue, (char *)&ne, sizeof(NormalisedEvent))) { // we should reastart this ioc at least every few years, as at ns // resolution with a uint64_t we will have an overflow after around // 4 years - newestTimestamp = std::max(newestTimestamp, ne->timestamp); + newestTimestamp = std::max(newestTimestamp, ne.timestamp); - ++countDiff[ne->source == 0 ? ne->pixelId + 1 : 0]; + ++countDiff[ne.source == 0 ? ne.pixelId + 1 : 0]; - timeQueue.push(ne); + timeQueue.push(std::move(ne)); } // idea is to try and guarantee at least 1 packet per id or the min // frequency for each id without actually checking all ids if (timeQueue.size() >= 1500 * 10 || (timeQueue.size() > 0 && - newestTimestamp - timeQueue.top()->timestamp >= 200'000'000ull)) { + newestTimestamp - timeQueue.top().timestamp >= 200'000'000ull)) { ne = timeQueue.top(); timeQueue.pop(); @@ -546,8 +552,8 @@ void asynStreamGeneratorDriver::processEvents() { prevStatus = currStatus; if (currStatus == STATUS_COUNTING) { - startTimestamp = std::min(startTimestamp, ne->timestamp); - currTimestamp = ne->timestamp; + startTimestamp = std::min(startTimestamp, ne.timestamp); + currTimestamp = ne.timestamp; elapsedSeconds = 0 ? currTimestamp <= startTimestamp : ((double)(currTimestamp - startTimestamp)) / 1e9; @@ -557,22 +563,20 @@ void asynStreamGeneratorDriver::processEvents() { (timePreset && elapsedSeconds >= timePreset)) { // filter out events that occured after the specified time - if (ne->timestamp - startTimestamp <= countPreset) { - counts[ne->source == 0 ? ne->pixelId + 1 : 0] += 1; - this->queueForKafka(ne); + if (ne.timestamp - startTimestamp <= countPreset) { + counts[ne.source == 0 ? ne.pixelId + 1 : 0] += 1; + this->queueForKafka(std::move(ne)); // add any remaining events with the same timestamp // we could theoretically have a small overrun if the // timestamps are identical on the monitor channel while (!timeQueue.empty() && - !timeQueue.top()->timestamp == currTimestamp) { + !timeQueue.top().timestamp == currTimestamp) { ne = timeQueue.top(); timeQueue.pop(); - counts[ne->source == 0 ? ne->pixelId + 1 : 0] += 1; - this->queueForKafka(ne); + counts[ne.source == 0 ? ne.pixelId + 1 : 0] += 1; + this->queueForKafka(std::move(ne)); } - } else { - delete ne; } countPreset = 0; @@ -592,8 +596,8 @@ void asynStreamGeneratorDriver::processEvents() { } else { - counts[ne->source == 0 ? ne->pixelId + 1 : 0] += 1; - this->queueForKafka(ne); + counts[ne.source == 0 ? ne.pixelId + 1 : 0] += 1; + this->queueForKafka(std::move(ne)); lock(); for (size_t i = 0; i < num_channels; ++i) { @@ -603,9 +607,6 @@ void asynStreamGeneratorDriver::processEvents() { callParamCallbacks(); unlock(); } - - } else { - delete ne; } } @@ -622,7 +623,8 @@ void asynStreamGeneratorDriver::processEvents() { for (size_t j = 0; j <= 10; ++j) { cnt += countDiffs[i * 10 + j]; } - rates[i] = cnt / 10.; + rates[i] = + cnt; // would / 10 to average than * 10 as want per second countDiff[i] = 0; } @@ -641,38 +643,41 @@ void asynStreamGeneratorDriver::processEvents() { } } -void asynStreamGeneratorDriver::produceMonitor() { +void asynStreamGeneratorDriver::produce(epicsRingBytesId eventQueue, + rd_kafka_t *kafkaProducer, + const char *topic, const char *source) { flatbuffers::FlatBufferBuilder builder(1024); + const std::size_t bufferSize = this->kafkaMaxPacketSize + 16; + std::vector tof; - tof.reserve(this->kafkaMaxPacketSize + 16); + tof.reserve(bufferSize); std::vector did; - did.reserve(this->kafkaMaxPacketSize + 16); + did.reserve(bufferSize); - int total = 0; epicsTimeStamp last_sent = epicsTime::getCurrent(); - + epicsTimeStamp now = last_sent; + int total = 0; uint64_t message_id = 0; + NormalisedEvent ne; + while (true) { - if (!this->monitorQueue.isEmpty()) { + if (!epicsRingBytesIsEmpty(eventQueue)) { ++total; - auto nme = this->monitorQueue.pop(); - tof.push_back(nme->timestamp); - did.push_back(nme->pixelId); - delete nme; + epicsRingBytesGet(eventQueue, (char *)&ne, sizeof(NormalisedEvent)); + tof.push_back(ne.timestamp); + did.push_back(ne.pixelId); } else { epicsThreadSleep(0.001); // seconds } - // TODO can probably just replace the current - // instead of always getting new object - epicsTimeStamp now = epicsTime::getCurrent(); + now = epicsTime::getCurrent(); // At least every 0.2 seconds if (total >= this->kafkaMaxPacketSize || @@ -685,7 +690,7 @@ void asynStreamGeneratorDriver::produceMonitor() { builder.Clear(); auto message = CreateEventMessageDirect( - builder, "monitor", message_id++, + builder, source, message_id++, ((uint64_t)now.secPastEpoch) * 1'000'000'000ull + ((uint64_t)now.nsec), &tof, &did); @@ -693,7 +698,7 @@ void asynStreamGeneratorDriver::produceMonitor() { builder.Finish(message, "ev42"); rd_kafka_resp_err_t err = rd_kafka_producev( - monitorProducer, RD_KAFKA_V_TOPIC(this->monitorTopic), + kafkaProducer, RD_KAFKA_V_TOPIC(topic), RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY), // RD_KAFKA_V_KEY((void *)key, key_len), RD_KAFKA_V_VALUE((void *)builder.GetBufferPointer(), @@ -703,11 +708,10 @@ void asynStreamGeneratorDriver::produceMonitor() { if (err) { epicsStdoutPrintf("Failed to produce to topic %s: %s\n", - this->monitorTopic, - rd_kafka_err2str(err)); + topic, rd_kafka_err2str(err)); } - rd_kafka_poll(monitorProducer, 0); + rd_kafka_poll(kafkaProducer, 0); tof.clear(); did.clear(); @@ -716,80 +720,12 @@ void asynStreamGeneratorDriver::produceMonitor() { } } +void asynStreamGeneratorDriver::produceMonitor() { + this->produce(monitorQueue, monitorProducer, monitorTopic, "monitor"); +} + void asynStreamGeneratorDriver::produceDetector() { - - static const std::size_t bufferSize = this->kafkaMaxPacketSize + 16; - flatbuffers::FlatBufferBuilder builder(1024); - - std::vector tof; - tof.reserve(bufferSize); - - std::vector did; - did.reserve(bufferSize); - - int total = 0; - epicsTimeStamp last_sent = epicsTime::getCurrent(); - - uint64_t message_id = 0; - - while (true) { - - if (!this->detectorQueue.isEmpty()) { - - ++total; - auto nde = this->detectorQueue.pop(); - tof.push_back(nde->timestamp); - did.push_back(nde->pixelId); - delete nde; - - } else { - // TODO - // rd_kafka_flush(detectorProducer, 10 * 1000); - epicsThreadSleep(0.001); // seconds - } - - epicsTimeStamp now = epicsTime::getCurrent(); - - // At least every 0.2 seconds - if (total >= this->kafkaMaxPacketSize || - epicsTimeDiffInNS(&now, &last_sent) > 200'000'000ll) { - last_sent = epicsTime::getCurrent(); - - if (total) { - total = 0; - - builder.Clear(); - - auto message = CreateEventMessageDirect( - builder, "detector", message_id++, - ((uint64_t)now.secPastEpoch) * 1'000'000'000ull + - ((uint64_t)now.nsec), - &tof, &did); - - builder.Finish(message, "ev42"); - - rd_kafka_resp_err_t err = rd_kafka_producev( - detectorProducer, RD_KAFKA_V_TOPIC(this->detectorTopic), - RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY), - // RD_KAFKA_V_KEY((void *)key, key_len), - RD_KAFKA_V_VALUE((void *)builder.GetBufferPointer(), - builder.GetSize()), - // RD_KAFKA_V_OPAQUE(NULL), - RD_KAFKA_V_END); - - if (err) { - epicsStdoutPrintf("Failed to produce to topic %s: %s\n", - this->detectorTopic, - rd_kafka_err2str(err)); - } - - rd_kafka_poll(detectorProducer, 0); - - tof.clear(); - did.clear(); - } - } - } + this->produce(detectorQueue, detectorProducer, detectorTopic, "detector"); } /******************************************************************************* diff --git a/src/asynStreamGeneratorDriver.h b/src/asynStreamGeneratorDriver.h index 0cc6579..e3dfa25 100644 --- a/src/asynStreamGeneratorDriver.h +++ b/src/asynStreamGeneratorDriver.h @@ -2,7 +2,7 @@ #define asynStreamGeneratorDriver_H #include "asynPortDriver.h" -#include +#include #include /******************************************************************************* @@ -62,8 +62,9 @@ struct __attribute__((__packed__)) NormalisedEvent { uint8_t source; uint32_t pixelId; - inline NormalisedEvent(uint64_t timestamp, uint8_t source, uint32_t pixelId) - : timestamp(timestamp), source(source), pixelId(pixelId){}; + // inline NormalisedEvent(uint64_t timestamp, uint8_t source, uint32_t + // pixelId) + // : timestamp(timestamp), source(source), pixelId(pixelId){}; }; /******************************************************************************* @@ -140,13 +141,13 @@ class asynStreamGeneratorDriver : public asynPortDriver { const bool kafkaEnabled; const int kafkaMaxPacketSize; - epicsRingPointer udpQueue; + epicsRingBytesId udpQueue; - epicsRingPointer monitorQueue; + epicsRingBytesId monitorQueue; rd_kafka_t *monitorProducer; const char *monitorTopic; - epicsRingPointer detectorQueue; + epicsRingBytesId detectorQueue; rd_kafka_t *detectorProducer; const char *detectorTopic; @@ -155,7 +156,10 @@ class asynStreamGeneratorDriver : public asynPortDriver { asynStatus createInt32Param(asynStatus status, char *name, int *variable, epicsInt32 initialValue = 0); - inline void queueForKafka(NormalisedEvent *ne); + inline void queueForKafka(NormalisedEvent &&ne); + + void produce(epicsRingBytesId eventQueue, rd_kafka_t *kafkaProducer, + const char *topic, const char *source); }; #endif