diff --git a/scripts/st.cmd b/scripts/st.cmd index d8fe540..463962b 100755 --- a/scripts/st.cmd +++ b/scripts/st.cmd @@ -9,7 +9,7 @@ epicsEnvSet("INSTR", "SQ:TEST:") epicsEnvSet("NAME", "SG") drvAsynIPPortConfigure("ASYN_IP_PORT", "127.0.0.1:9071:54321 UDP", 0, 0, 0) -asynStreamGenerator("ASYN_SG", "ASYN_IP_PORT", 4) +asynStreamGenerator("ASYN_SG", "ASYN_IP_PORT", 4, 1000, 8192) dbLoadRecords("$(StreamGenerator_DB)daq_common.db", "INSTR=$(INSTR), NAME=$(NAME), PORT=ASYN_SG, CHANNELS=5") diff --git a/src/asynStreamGeneratorDriver.cpp b/src/asynStreamGeneratorDriver.cpp index b9f28a2..c2309c1 100644 --- a/src/asynStreamGeneratorDriver.cpp +++ b/src/asynStreamGeneratorDriver.cpp @@ -67,12 +67,23 @@ static void detectorProducerTask(void *drvPvt) { pSGD->produceDetector(); } +/******************************************************************************* + * Stream Generator Helper Methods + */ + +asynStatus asynStreamGeneratorDriver::createInt32Param( + // TODO should show error if there is one + asynStatus status, char *name, int *variable, epicsInt32 initialValue) { + return (asynStatus)(status | createParam(name, asynParamInt32, variable) | + setIntegerParam(*variable, initialValue)); +} + /******************************************************************************* * Stream Generator Methods */ -asynStreamGeneratorDriver::asynStreamGeneratorDriver(const char *portName, - const char *ipPortName, - const int numChannels) +asynStreamGeneratorDriver::asynStreamGeneratorDriver( + const char *portName, const char *ipPortName, const int numChannels, + const int kafkaQueueSize, const int kafkaMaxPacketSize) : asynPortDriver(portName, 1, /* maxAddr */ asynInt32Mask | asynInt64Mask | asynDrvUserMask, /* Interface mask */ @@ -84,57 +95,27 @@ asynStreamGeneratorDriver::asynStreamGeneratorDriver(const char *portName, 1, /* Autoconnect */ 0, /* Default priority */ 0), /* Default stack size*/ - num_channels(numChannels + 1), monitorQueue(1000, false), - detectorQueue(1000, false) { + num_channels(numChannels + 1), monitorQueue(kafkaQueueSize, false), + detectorQueue(kafkaQueueSize, false), + kafkaMaxPacketSize(kafkaMaxPacketSize) { const char *functionName = "asynStreamGeneratorDriver"; // Parameter Setup asynStatus status = asynSuccess; - status = (asynStatus)(status | createParam(P_StatusString, asynParamInt32, - &P_Status)); - status = (asynStatus)(status | setIntegerParam(P_Status, STATUS_IDLE)); - - status = (asynStatus)(status | - createParam(P_ResetString, asynParamInt32, &P_Reset)); - status = (asynStatus)(status | setIntegerParam(P_Reset, 0)); - - status = (asynStatus)(status | - createParam(P_StopString, asynParamInt32, &P_Stop)); - status = (asynStatus)(status | setIntegerParam(P_Stop, 0)); - - status = (asynStatus)(status | createParam(P_CountPresetString, - asynParamInt32, &P_CountPreset)); - status = (asynStatus)(status | setIntegerParam(P_CountPreset, 0)); - - status = (asynStatus)(status | createParam(P_TimePresetString, - asynParamInt32, &P_TimePreset)); - status = (asynStatus)(status | setIntegerParam(P_TimePreset, 0)); - - status = (asynStatus)(status | createParam(P_ElapsedTimeString, - asynParamInt32, &P_ElapsedTime)); - status = (asynStatus)(status | setIntegerParam(P_ElapsedTime, 0)); - - status = (asynStatus)(status | createParam(P_ClearElapsedTimeString, - asynParamInt32, &P_ClearElapsedTime)); - status = (asynStatus)(status | setIntegerParam(P_ClearElapsedTime, 0)); - + status = createInt32Param(status, P_StatusString, &P_Status, STATUS_IDLE); + status = createInt32Param(status, P_ResetString, &P_Reset); + status = createInt32Param(status, P_StopString, &P_Stop); + status = createInt32Param(status, P_CountPresetString, &P_CountPreset); + status = createInt32Param(status, P_TimePresetString, &P_TimePreset); + status = createInt32Param(status, P_ElapsedTimeString, &P_ElapsedTime); status = - (asynStatus)(status | createParam(P_MonitorChannelString, - asynParamInt32, &P_MonitorChannel)); - status = (asynStatus)(status | setIntegerParam(P_MonitorChannel, 0)); - + createInt32Param(status, P_ClearElapsedTimeString, &P_ClearElapsedTime); status = - (asynStatus)(status | createParam(P_ThresholdString, - asynParamInt32, &P_Threshold)); - status = (asynStatus)(status | setIntegerParam(P_Threshold, 1)); - - status = - (asynStatus)(status | createParam(P_ThresholdChannelString, - asynParamInt32, &P_ThresholdChannel)); - status = (asynStatus)(status | setIntegerParam(P_ThresholdChannel, 1)); - - + createInt32Param(status, P_MonitorChannelString, &P_MonitorChannel); + status = createInt32Param(status, P_ThresholdString, &P_Threshold, 1); + status = createInt32Param(status, P_ThresholdChannelString, + &P_ThresholdChannel, 1); // Create Parameters templated on Channel Number char pv_name_buffer[100]; @@ -144,24 +125,15 @@ asynStreamGeneratorDriver::asynStreamGeneratorDriver(const char *portName, for (size_t i = 0; i < this->num_channels; ++i) { memset(pv_name_buffer, 0, 100); epicsSnprintf(pv_name_buffer, 100, P_CountsString, i); - status = - (asynStatus)(status | createParam(pv_name_buffer, asynParamInt32, - P_Counts + i)); - status = (asynStatus)(status | setIntegerParam(P_Counts[i], 0)); + status = createInt32Param(status, pv_name_buffer, P_Counts + i); memset(pv_name_buffer, 0, 100); epicsSnprintf(pv_name_buffer, 100, P_RateString, i); - status = - (asynStatus)(status | createParam(pv_name_buffer, asynParamInt32, - P_Rates + i)); - status = (asynStatus)(status | setIntegerParam(P_Rates[i], 0)); + status = createInt32Param(status, pv_name_buffer, P_Rates + i); memset(pv_name_buffer, 0, 100); epicsSnprintf(pv_name_buffer, 100, P_ClearCountsString, i); - status = - (asynStatus)(status | createParam(pv_name_buffer, asynParamInt32, - P_ClearCounts + i)); - status = (asynStatus)(status | setIntegerParam(P_ClearCounts[i], 0)); + status = createInt32Param(status, pv_name_buffer, P_ClearCounts + i); } if (status) { @@ -251,13 +223,13 @@ asynStatus asynStreamGeneratorDriver::writeInt32(asynUser *pasynUser, // } if (function == P_CountPreset) { - // TODO should block setting a preset when already set + // TODO should block setting a preset when already set setIntegerParam(function, value); setIntegerParam(P_Status, STATUS_COUNTING); status = (asynStatus)callParamCallbacks(); epicsEventSignal(this->pausedEventId); } else if (function == P_TimePreset) { - // TODO should block setting a preset when already set + // TODO should block setting a preset when already set setIntegerParam(function, value); setIntegerParam(P_Status, STATUS_COUNTING); status = (asynStatus)callParamCallbacks(); @@ -331,7 +303,7 @@ void asynStreamGeneratorDriver::receiveUDP() { start_time = std::numeric_limits::max(); current_time = 0; - elapsedTime = 0; + elapsedTime = 0; lock(); for (size_t i = 0; i < num_channels; ++i) { @@ -423,7 +395,7 @@ void asynStreamGeneratorDriver::receiveUDP() { for (size_t i = 0; i < num_channels; ++i) { setIntegerParam(P_Counts[i], counts[i]); } - elapsedTime = current_time - start_time; + elapsedTime = current_time - start_time; setIntegerParam(P_ElapsedTime, elapsedTime); callParamCallbacks(); unlock(); @@ -433,7 +405,8 @@ void asynStreamGeneratorDriver::receiveUDP() { functionName); } - if ((countPreset && counts[presetChannel] >= countPreset) || (timePreset && elapsedTime >= timePreset)) { + if ((countPreset && counts[presetChannel] >= countPreset) || + (timePreset && elapsedTime >= timePreset)) { lock(); setIntegerParam(P_Status, STATUS_IDLE); setIntegerParam(P_CountPreset, 0); @@ -452,10 +425,10 @@ void asynStreamGeneratorDriver::produceMonitor() { flatbuffers::FlatBufferBuilder builder(1024); std::vector tof; - tof.reserve(9000); + tof.reserve(this->kafkaMaxPacketSize + 16); std::vector did; - did.reserve(9000); + did.reserve(this->kafkaMaxPacketSize + 16); int total = 0; epicsTimeStamp last_sent = epicsTime::getCurrent(); @@ -481,7 +454,7 @@ void asynStreamGeneratorDriver::produceMonitor() { epicsTimeStamp now = epicsTime::getCurrent(); // At least every 0.2 seconds - if (total >= 8192 || + if (total >= this->kafkaMaxPacketSize || epicsTimeDiffInNS(&now, &last_sent) > 200'000'000ll) { last_sent = epicsTime::getCurrent(); @@ -524,7 +497,7 @@ void asynStreamGeneratorDriver::produceMonitor() { void asynStreamGeneratorDriver::produceDetector() { - static const size_t bufferSize = 9000; + static const size_t bufferSize = this->kafkaMaxPacketSize + 16; flatbuffers::FlatBufferBuilder builder(1024); std::vector tof; @@ -573,7 +546,7 @@ void asynStreamGeneratorDriver::produceDetector() { } while (!timeQueue.empty() && - (timeQueue.size() >= 8192 || + (timeQueue.size() >= this->kafkaMaxPacketSize || (newest - timeQueue.top()) > 5'000'000'000ull)) timeQueue.pop(); epicsInt32 rate = 0; @@ -590,7 +563,7 @@ void asynStreamGeneratorDriver::produceDetector() { epicsTimeStamp now = epicsTime::getCurrent(); // At least every 0.2 seconds - if (total >= 8192 || + if (total >= this->kafkaMaxPacketSize || epicsTimeDiffInNS(&now, &last_sent) > 200'000'000ll) { last_sent = epicsTime::getCurrent(); @@ -638,19 +611,25 @@ extern "C" { asynStatus asynStreamGeneratorDriverConfigure(const char *portName, const char *ipPortName, - const int numChannels) { - new asynStreamGeneratorDriver(portName, ipPortName, numChannels); + const int numChannels, + const int kafkaQueueSize, + const int kafkaMaxPacketSize) { + new asynStreamGeneratorDriver(portName, ipPortName, numChannels, + kafkaQueueSize, kafkaMaxPacketSize); return asynSuccess; } static const iocshArg initArg0 = {"portName", iocshArgString}; static const iocshArg initArg1 = {"ipPortName", iocshArgString}; static const iocshArg initArg2 = {"numChannels", iocshArgInt}; -static const iocshArg *const initArgs[] = {&initArg0, &initArg1, &initArg2}; -static const iocshFuncDef initFuncDef = {"asynStreamGenerator", 3, initArgs}; +static const iocshArg initArg3 = {"kafkaQueueSize", iocshArgInt}; +static const iocshArg initArg4 = {"kafkaMaxPacketSize", iocshArgInt}; +static const iocshArg *const initArgs[] = {&initArg0, &initArg1, &initArg2, + &initArg3, &initArg4}; +static const iocshFuncDef initFuncDef = {"asynStreamGenerator", 5, initArgs}; static void initCallFunc(const iocshArgBuf *args) { - asynStreamGeneratorDriverConfigure(args[0].sval, args[1].sval, - args[2].ival); + asynStreamGeneratorDriverConfigure(args[0].sval, args[1].sval, args[2].ival, + args[3].ival, args[4].ival); } void asynStreamGeneratorDriverRegister(void) { diff --git a/src/asynStreamGeneratorDriver.h b/src/asynStreamGeneratorDriver.h index 7904a24..d3b6002 100644 --- a/src/asynStreamGeneratorDriver.h +++ b/src/asynStreamGeneratorDriver.h @@ -101,7 +101,8 @@ struct __attribute__((__packed__)) NormalisedDetectorEvent { class asynStreamGeneratorDriver : public asynPortDriver { public: asynStreamGeneratorDriver(const char *portName, const char *ipPortName, - const int numChannels); + const int numChannels, const int kafkaQueueSize, + const int kafkaMaxPacketSize); virtual ~asynStreamGeneratorDriver(); virtual asynStatus writeInt32(asynUser *pasynUser, epicsInt32 value); @@ -130,7 +131,8 @@ class asynStreamGeneratorDriver : public asynPortDriver { asynUser *pasynUDPUser; epicsEventId pausedEventId; - int num_channels; + const int num_channels; + const int kafkaMaxPacketSize; epicsRingPointer monitorQueue; rd_kafka_t *monitorProducer; @@ -139,6 +141,9 @@ class asynStreamGeneratorDriver : public asynPortDriver { rd_kafka_t *detectorProducer; constexpr static char *driverName = "StreamGenerator"; + + asynStatus createInt32Param(asynStatus status, char *name, int *variable, + epicsInt32 initialValue = 0); }; #endif