shows queue usage as a percentage
This commit is contained in:
@@ -213,7 +213,7 @@ record(ai,"$(INSTR)$(NAME):ELAPSED-TIME")
|
|||||||
|
|
||||||
record(longin,"$(INSTR)$(NAME):UDP_DROPPED")
|
record(longin,"$(INSTR)$(NAME):UDP_DROPPED")
|
||||||
{
|
{
|
||||||
field(DESC, "Max Events in Queue")
|
field(DESC, "UDP Packets Missed")
|
||||||
field(EGU, "Events")
|
field(EGU, "Events")
|
||||||
field(DTYP, "asynInt32")
|
field(DTYP, "asynInt32")
|
||||||
field(INP, "@asyn($(PORT),0,$(TIMEOUT=1)) DROP")
|
field(INP, "@asyn($(PORT),0,$(TIMEOUT=1)) DROP")
|
||||||
@@ -224,8 +224,8 @@ record(longin,"$(INSTR)$(NAME):UDP_DROPPED")
|
|||||||
|
|
||||||
record(longin,"$(INSTR)$(NAME):UDP_WATERMARK")
|
record(longin,"$(INSTR)$(NAME):UDP_WATERMARK")
|
||||||
{
|
{
|
||||||
field(DESC, "Max Events in Queue")
|
field(DESC, "UDP Queue Usage")
|
||||||
field(EGU, "Events")
|
field(EGU, "%")
|
||||||
field(DTYP, "asynInt32")
|
field(DTYP, "asynInt32")
|
||||||
field(INP, "@asyn($(PORT),0,$(TIMEOUT=1)) UDP")
|
field(INP, "@asyn($(PORT),0,$(TIMEOUT=1)) UDP")
|
||||||
# field(SCAN, "I/O Intr")
|
# field(SCAN, "I/O Intr")
|
||||||
@@ -235,8 +235,8 @@ record(longin,"$(INSTR)$(NAME):UDP_WATERMARK")
|
|||||||
|
|
||||||
record(longin,"$(INSTR)$(NAME):SORTED_WATERMARK")
|
record(longin,"$(INSTR)$(NAME):SORTED_WATERMARK")
|
||||||
{
|
{
|
||||||
field(DESC, "Max Events in Queue")
|
field(DESC, "Partial Sort Queue Usage")
|
||||||
field(EGU, "Events")
|
field(EGU, "%")
|
||||||
field(DTYP, "asynInt32")
|
field(DTYP, "asynInt32")
|
||||||
field(INP, "@asyn($(PORT),0,$(TIMEOUT=1)) SORT")
|
field(INP, "@asyn($(PORT),0,$(TIMEOUT=1)) SORT")
|
||||||
# field(SCAN, "I/O Intr")
|
# field(SCAN, "I/O Intr")
|
||||||
|
|||||||
@@ -96,8 +96,10 @@ asynStatus asynStreamGeneratorDriver::createInt32Param(
|
|||||||
setIntegerParam(*variable, initialValue));
|
setIntegerParam(*variable, initialValue));
|
||||||
}
|
}
|
||||||
|
|
||||||
asynStatus asynStreamGeneratorDriver::createFloat64Param(
|
asynStatus asynStreamGeneratorDriver::createFloat64Param(asynStatus status,
|
||||||
asynStatus status, char *name, int *variable, double initialValue) {
|
char *name,
|
||||||
|
int *variable,
|
||||||
|
double initialValue) {
|
||||||
// TODO should show error if there is one
|
// TODO should show error if there is one
|
||||||
return (asynStatus)(status | createParam(name, asynParamFloat64, variable) |
|
return (asynStatus)(status | createParam(name, asynParamFloat64, variable) |
|
||||||
setDoubleParam(*variable, initialValue));
|
setDoubleParam(*variable, initialValue));
|
||||||
@@ -114,7 +116,7 @@ asynStreamGeneratorDriver::asynStreamGeneratorDriver(
|
|||||||
const int kafkaMaxPacketSize)
|
const int kafkaMaxPacketSize)
|
||||||
: asynPortDriver(portName, 1, /* maxAddr */
|
: asynPortDriver(portName, 1, /* maxAddr */
|
||||||
asynInt32Mask | asynFloat64Mask |
|
asynInt32Mask | asynFloat64Mask |
|
||||||
asynDrvUserMask, /* Interface mask */
|
asynDrvUserMask, /* Interface mask */
|
||||||
asynInt32Mask, // | asynFloat64Mask, /* Interrupt mask */
|
asynInt32Mask, // | asynFloat64Mask, /* Interrupt mask */
|
||||||
0, /* asynFlags. This driver does not block and it is
|
0, /* asynFlags. This driver does not block and it is
|
||||||
not multi-device, but has a
|
not multi-device, but has a
|
||||||
@@ -125,6 +127,7 @@ asynStreamGeneratorDriver::asynStreamGeneratorDriver(
|
|||||||
0), /* Default stack size*/
|
0), /* Default stack size*/
|
||||||
num_channels(numChannels + 1), kafkaEnabled(enableKafkaStream),
|
num_channels(numChannels + 1), kafkaEnabled(enableKafkaStream),
|
||||||
monitorTopic(monitorTopic), detectorTopic(detectorTopic),
|
monitorTopic(monitorTopic), detectorTopic(detectorTopic),
|
||||||
|
udpQueueSize(udpQueueSize), kafkaQueueSize(kafkaQueueSize),
|
||||||
// measured in max packet sizes
|
// measured in max packet sizes
|
||||||
udpQueue(
|
udpQueue(
|
||||||
epicsRingBytesCreate(243 * udpQueueSize * sizeof(NormalisedEvent))),
|
epicsRingBytesCreate(243 * udpQueueSize * sizeof(NormalisedEvent))),
|
||||||
@@ -308,15 +311,17 @@ asynStatus asynStreamGeneratorDriver::readInt32(asynUser *pasynUser,
|
|||||||
getParamName(function, ¶mName);
|
getParamName(function, ¶mName);
|
||||||
|
|
||||||
if (function == P_UdpQueueHighWaterMark) {
|
if (function == P_UdpQueueHighWaterMark) {
|
||||||
*value = epicsRingBytesHighWaterMark(this->udpQueue) /
|
const double toPercent = 100. / (243. * udpQueueSize);
|
||||||
sizeof(NormalisedEvent);
|
*value = (epicsInt32)(epicsRingBytesHighWaterMark(this->udpQueue) /
|
||||||
|
sizeof(NormalisedEvent) * toPercent);
|
||||||
// Aparently resetting the watermark causes problems...
|
// Aparently resetting the watermark causes problems...
|
||||||
// at least concurrently :D
|
// at least concurrently :D
|
||||||
// epicsRingBytesResetHighWaterMark(this->udpQueue);
|
// epicsRingBytesResetHighWaterMark(this->udpQueue);
|
||||||
return asynSuccess;
|
return asynSuccess;
|
||||||
} else if (function == P_SortedQueueHighWaterMark) {
|
} else if (function == P_SortedQueueHighWaterMark) {
|
||||||
*value = epicsRingBytesHighWaterMark(this->sortedQueue) /
|
const double toPercent = 100. / (243. * udpQueueSize);
|
||||||
sizeof(NormalisedEvent);
|
*value = (epicsInt32)(epicsRingBytesHighWaterMark(this->sortedQueue) /
|
||||||
|
sizeof(NormalisedEvent) * toPercent);
|
||||||
// epicsRingBytesResetHighWaterMark(this->sortedQueue);
|
// epicsRingBytesResetHighWaterMark(this->sortedQueue);
|
||||||
return asynSuccess;
|
return asynSuccess;
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -150,8 +150,10 @@ class asynStreamGeneratorDriver : public asynPortDriver {
|
|||||||
|
|
||||||
const int num_channels;
|
const int num_channels;
|
||||||
const bool kafkaEnabled;
|
const bool kafkaEnabled;
|
||||||
|
const int kafkaQueueSize;
|
||||||
const int kafkaMaxPacketSize;
|
const int kafkaMaxPacketSize;
|
||||||
|
|
||||||
|
const int udpQueueSize;
|
||||||
epicsRingBytesId udpQueue;
|
epicsRingBytesId udpQueue;
|
||||||
epicsRingBytesId sortedQueue;
|
epicsRingBytesId sortedQueue;
|
||||||
|
|
||||||
@@ -168,7 +170,8 @@ class asynStreamGeneratorDriver : public asynPortDriver {
|
|||||||
asynStatus createInt32Param(asynStatus status, char *name, int *variable,
|
asynStatus createInt32Param(asynStatus status, char *name, int *variable,
|
||||||
epicsInt32 initialValue = 0);
|
epicsInt32 initialValue = 0);
|
||||||
|
|
||||||
asynStatus createFloat64Param(asynStatus status, char *name, int *variable, double initialValue = 0);
|
asynStatus createFloat64Param(asynStatus status, char *name, int *variable,
|
||||||
|
double initialValue = 0);
|
||||||
|
|
||||||
inline void queueForKafka(NormalisedEvent &ne);
|
inline void queueForKafka(NormalisedEvent &ne);
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user