moves more options to ioc function
This commit is contained in:
@@ -9,7 +9,7 @@ epicsEnvSet("INSTR", "SQ:TEST:")
|
|||||||
epicsEnvSet("NAME", "SG")
|
epicsEnvSet("NAME", "SG")
|
||||||
|
|
||||||
drvAsynIPPortConfigure("ASYN_IP_PORT", "127.0.0.1:9071:54321 UDP", 0, 0, 0)
|
drvAsynIPPortConfigure("ASYN_IP_PORT", "127.0.0.1:9071:54321 UDP", 0, 0, 0)
|
||||||
asynStreamGenerator("ASYN_SG", "ASYN_IP_PORT", 4)
|
asynStreamGenerator("ASYN_SG", "ASYN_IP_PORT", 4, 1000, 8192)
|
||||||
|
|
||||||
dbLoadRecords("$(StreamGenerator_DB)daq_common.db", "INSTR=$(INSTR), NAME=$(NAME), PORT=ASYN_SG, CHANNELS=5")
|
dbLoadRecords("$(StreamGenerator_DB)daq_common.db", "INSTR=$(INSTR), NAME=$(NAME), PORT=ASYN_SG, CHANNELS=5")
|
||||||
|
|
||||||
|
|||||||
@@ -67,12 +67,23 @@ static void detectorProducerTask(void *drvPvt) {
|
|||||||
pSGD->produceDetector();
|
pSGD->produceDetector();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/*******************************************************************************
|
||||||
|
* Stream Generator Helper Methods
|
||||||
|
*/
|
||||||
|
|
||||||
|
asynStatus asynStreamGeneratorDriver::createInt32Param(
|
||||||
|
// TODO should show error if there is one
|
||||||
|
asynStatus status, char *name, int *variable, epicsInt32 initialValue) {
|
||||||
|
return (asynStatus)(status | createParam(name, asynParamInt32, variable) |
|
||||||
|
setIntegerParam(*variable, initialValue));
|
||||||
|
}
|
||||||
|
|
||||||
/*******************************************************************************
|
/*******************************************************************************
|
||||||
* Stream Generator Methods
|
* Stream Generator Methods
|
||||||
*/
|
*/
|
||||||
asynStreamGeneratorDriver::asynStreamGeneratorDriver(const char *portName,
|
asynStreamGeneratorDriver::asynStreamGeneratorDriver(
|
||||||
const char *ipPortName,
|
const char *portName, const char *ipPortName, const int numChannels,
|
||||||
const int numChannels)
|
const int kafkaQueueSize, const int kafkaMaxPacketSize)
|
||||||
: asynPortDriver(portName, 1, /* maxAddr */
|
: asynPortDriver(portName, 1, /* maxAddr */
|
||||||
asynInt32Mask | asynInt64Mask |
|
asynInt32Mask | asynInt64Mask |
|
||||||
asynDrvUserMask, /* Interface mask */
|
asynDrvUserMask, /* Interface mask */
|
||||||
@@ -84,57 +95,27 @@ asynStreamGeneratorDriver::asynStreamGeneratorDriver(const char *portName,
|
|||||||
1, /* Autoconnect */
|
1, /* Autoconnect */
|
||||||
0, /* Default priority */
|
0, /* Default priority */
|
||||||
0), /* Default stack size*/
|
0), /* Default stack size*/
|
||||||
num_channels(numChannels + 1), monitorQueue(1000, false),
|
num_channels(numChannels + 1), monitorQueue(kafkaQueueSize, false),
|
||||||
detectorQueue(1000, false) {
|
detectorQueue(kafkaQueueSize, false),
|
||||||
|
kafkaMaxPacketSize(kafkaMaxPacketSize) {
|
||||||
const char *functionName = "asynStreamGeneratorDriver";
|
const char *functionName = "asynStreamGeneratorDriver";
|
||||||
|
|
||||||
// Parameter Setup
|
// Parameter Setup
|
||||||
asynStatus status = asynSuccess;
|
asynStatus status = asynSuccess;
|
||||||
|
|
||||||
status = (asynStatus)(status | createParam(P_StatusString, asynParamInt32,
|
status = createInt32Param(status, P_StatusString, &P_Status, STATUS_IDLE);
|
||||||
&P_Status));
|
status = createInt32Param(status, P_ResetString, &P_Reset);
|
||||||
status = (asynStatus)(status | setIntegerParam(P_Status, STATUS_IDLE));
|
status = createInt32Param(status, P_StopString, &P_Stop);
|
||||||
|
status = createInt32Param(status, P_CountPresetString, &P_CountPreset);
|
||||||
status = (asynStatus)(status |
|
status = createInt32Param(status, P_TimePresetString, &P_TimePreset);
|
||||||
createParam(P_ResetString, asynParamInt32, &P_Reset));
|
status = createInt32Param(status, P_ElapsedTimeString, &P_ElapsedTime);
|
||||||
status = (asynStatus)(status | setIntegerParam(P_Reset, 0));
|
|
||||||
|
|
||||||
status = (asynStatus)(status |
|
|
||||||
createParam(P_StopString, asynParamInt32, &P_Stop));
|
|
||||||
status = (asynStatus)(status | setIntegerParam(P_Stop, 0));
|
|
||||||
|
|
||||||
status = (asynStatus)(status | createParam(P_CountPresetString,
|
|
||||||
asynParamInt32, &P_CountPreset));
|
|
||||||
status = (asynStatus)(status | setIntegerParam(P_CountPreset, 0));
|
|
||||||
|
|
||||||
status = (asynStatus)(status | createParam(P_TimePresetString,
|
|
||||||
asynParamInt32, &P_TimePreset));
|
|
||||||
status = (asynStatus)(status | setIntegerParam(P_TimePreset, 0));
|
|
||||||
|
|
||||||
status = (asynStatus)(status | createParam(P_ElapsedTimeString,
|
|
||||||
asynParamInt32, &P_ElapsedTime));
|
|
||||||
status = (asynStatus)(status | setIntegerParam(P_ElapsedTime, 0));
|
|
||||||
|
|
||||||
status = (asynStatus)(status | createParam(P_ClearElapsedTimeString,
|
|
||||||
asynParamInt32, &P_ClearElapsedTime));
|
|
||||||
status = (asynStatus)(status | setIntegerParam(P_ClearElapsedTime, 0));
|
|
||||||
|
|
||||||
status =
|
status =
|
||||||
(asynStatus)(status | createParam(P_MonitorChannelString,
|
createInt32Param(status, P_ClearElapsedTimeString, &P_ClearElapsedTime);
|
||||||
asynParamInt32, &P_MonitorChannel));
|
|
||||||
status = (asynStatus)(status | setIntegerParam(P_MonitorChannel, 0));
|
|
||||||
|
|
||||||
status =
|
status =
|
||||||
(asynStatus)(status | createParam(P_ThresholdString,
|
createInt32Param(status, P_MonitorChannelString, &P_MonitorChannel);
|
||||||
asynParamInt32, &P_Threshold));
|
status = createInt32Param(status, P_ThresholdString, &P_Threshold, 1);
|
||||||
status = (asynStatus)(status | setIntegerParam(P_Threshold, 1));
|
status = createInt32Param(status, P_ThresholdChannelString,
|
||||||
|
&P_ThresholdChannel, 1);
|
||||||
status =
|
|
||||||
(asynStatus)(status | createParam(P_ThresholdChannelString,
|
|
||||||
asynParamInt32, &P_ThresholdChannel));
|
|
||||||
status = (asynStatus)(status | setIntegerParam(P_ThresholdChannel, 1));
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
// Create Parameters templated on Channel Number
|
// Create Parameters templated on Channel Number
|
||||||
char pv_name_buffer[100];
|
char pv_name_buffer[100];
|
||||||
@@ -144,24 +125,15 @@ asynStreamGeneratorDriver::asynStreamGeneratorDriver(const char *portName,
|
|||||||
for (size_t i = 0; i < this->num_channels; ++i) {
|
for (size_t i = 0; i < this->num_channels; ++i) {
|
||||||
memset(pv_name_buffer, 0, 100);
|
memset(pv_name_buffer, 0, 100);
|
||||||
epicsSnprintf(pv_name_buffer, 100, P_CountsString, i);
|
epicsSnprintf(pv_name_buffer, 100, P_CountsString, i);
|
||||||
status =
|
status = createInt32Param(status, pv_name_buffer, P_Counts + i);
|
||||||
(asynStatus)(status | createParam(pv_name_buffer, asynParamInt32,
|
|
||||||
P_Counts + i));
|
|
||||||
status = (asynStatus)(status | setIntegerParam(P_Counts[i], 0));
|
|
||||||
|
|
||||||
memset(pv_name_buffer, 0, 100);
|
memset(pv_name_buffer, 0, 100);
|
||||||
epicsSnprintf(pv_name_buffer, 100, P_RateString, i);
|
epicsSnprintf(pv_name_buffer, 100, P_RateString, i);
|
||||||
status =
|
status = createInt32Param(status, pv_name_buffer, P_Rates + i);
|
||||||
(asynStatus)(status | createParam(pv_name_buffer, asynParamInt32,
|
|
||||||
P_Rates + i));
|
|
||||||
status = (asynStatus)(status | setIntegerParam(P_Rates[i], 0));
|
|
||||||
|
|
||||||
memset(pv_name_buffer, 0, 100);
|
memset(pv_name_buffer, 0, 100);
|
||||||
epicsSnprintf(pv_name_buffer, 100, P_ClearCountsString, i);
|
epicsSnprintf(pv_name_buffer, 100, P_ClearCountsString, i);
|
||||||
status =
|
status = createInt32Param(status, pv_name_buffer, P_ClearCounts + i);
|
||||||
(asynStatus)(status | createParam(pv_name_buffer, asynParamInt32,
|
|
||||||
P_ClearCounts + i));
|
|
||||||
status = (asynStatus)(status | setIntegerParam(P_ClearCounts[i], 0));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if (status) {
|
if (status) {
|
||||||
@@ -251,13 +223,13 @@ asynStatus asynStreamGeneratorDriver::writeInt32(asynUser *pasynUser,
|
|||||||
// }
|
// }
|
||||||
|
|
||||||
if (function == P_CountPreset) {
|
if (function == P_CountPreset) {
|
||||||
// TODO should block setting a preset when already set
|
// TODO should block setting a preset when already set
|
||||||
setIntegerParam(function, value);
|
setIntegerParam(function, value);
|
||||||
setIntegerParam(P_Status, STATUS_COUNTING);
|
setIntegerParam(P_Status, STATUS_COUNTING);
|
||||||
status = (asynStatus)callParamCallbacks();
|
status = (asynStatus)callParamCallbacks();
|
||||||
epicsEventSignal(this->pausedEventId);
|
epicsEventSignal(this->pausedEventId);
|
||||||
} else if (function == P_TimePreset) {
|
} else if (function == P_TimePreset) {
|
||||||
// TODO should block setting a preset when already set
|
// TODO should block setting a preset when already set
|
||||||
setIntegerParam(function, value);
|
setIntegerParam(function, value);
|
||||||
setIntegerParam(P_Status, STATUS_COUNTING);
|
setIntegerParam(P_Status, STATUS_COUNTING);
|
||||||
status = (asynStatus)callParamCallbacks();
|
status = (asynStatus)callParamCallbacks();
|
||||||
@@ -331,7 +303,7 @@ void asynStreamGeneratorDriver::receiveUDP() {
|
|||||||
|
|
||||||
start_time = std::numeric_limits<uint64_t>::max();
|
start_time = std::numeric_limits<uint64_t>::max();
|
||||||
current_time = 0;
|
current_time = 0;
|
||||||
elapsedTime = 0;
|
elapsedTime = 0;
|
||||||
|
|
||||||
lock();
|
lock();
|
||||||
for (size_t i = 0; i < num_channels; ++i) {
|
for (size_t i = 0; i < num_channels; ++i) {
|
||||||
@@ -423,7 +395,7 @@ void asynStreamGeneratorDriver::receiveUDP() {
|
|||||||
for (size_t i = 0; i < num_channels; ++i) {
|
for (size_t i = 0; i < num_channels; ++i) {
|
||||||
setIntegerParam(P_Counts[i], counts[i]);
|
setIntegerParam(P_Counts[i], counts[i]);
|
||||||
}
|
}
|
||||||
elapsedTime = current_time - start_time;
|
elapsedTime = current_time - start_time;
|
||||||
setIntegerParam(P_ElapsedTime, elapsedTime);
|
setIntegerParam(P_ElapsedTime, elapsedTime);
|
||||||
callParamCallbacks();
|
callParamCallbacks();
|
||||||
unlock();
|
unlock();
|
||||||
@@ -433,7 +405,8 @@ void asynStreamGeneratorDriver::receiveUDP() {
|
|||||||
functionName);
|
functionName);
|
||||||
}
|
}
|
||||||
|
|
||||||
if ((countPreset && counts[presetChannel] >= countPreset) || (timePreset && elapsedTime >= timePreset)) {
|
if ((countPreset && counts[presetChannel] >= countPreset) ||
|
||||||
|
(timePreset && elapsedTime >= timePreset)) {
|
||||||
lock();
|
lock();
|
||||||
setIntegerParam(P_Status, STATUS_IDLE);
|
setIntegerParam(P_Status, STATUS_IDLE);
|
||||||
setIntegerParam(P_CountPreset, 0);
|
setIntegerParam(P_CountPreset, 0);
|
||||||
@@ -452,10 +425,10 @@ void asynStreamGeneratorDriver::produceMonitor() {
|
|||||||
flatbuffers::FlatBufferBuilder builder(1024);
|
flatbuffers::FlatBufferBuilder builder(1024);
|
||||||
|
|
||||||
std::vector<uint32_t> tof;
|
std::vector<uint32_t> tof;
|
||||||
tof.reserve(9000);
|
tof.reserve(this->kafkaMaxPacketSize + 16);
|
||||||
|
|
||||||
std::vector<uint32_t> did;
|
std::vector<uint32_t> did;
|
||||||
did.reserve(9000);
|
did.reserve(this->kafkaMaxPacketSize + 16);
|
||||||
|
|
||||||
int total = 0;
|
int total = 0;
|
||||||
epicsTimeStamp last_sent = epicsTime::getCurrent();
|
epicsTimeStamp last_sent = epicsTime::getCurrent();
|
||||||
@@ -481,7 +454,7 @@ void asynStreamGeneratorDriver::produceMonitor() {
|
|||||||
epicsTimeStamp now = epicsTime::getCurrent();
|
epicsTimeStamp now = epicsTime::getCurrent();
|
||||||
|
|
||||||
// At least every 0.2 seconds
|
// At least every 0.2 seconds
|
||||||
if (total >= 8192 ||
|
if (total >= this->kafkaMaxPacketSize ||
|
||||||
epicsTimeDiffInNS(&now, &last_sent) > 200'000'000ll) {
|
epicsTimeDiffInNS(&now, &last_sent) > 200'000'000ll) {
|
||||||
last_sent = epicsTime::getCurrent();
|
last_sent = epicsTime::getCurrent();
|
||||||
|
|
||||||
@@ -524,7 +497,7 @@ void asynStreamGeneratorDriver::produceMonitor() {
|
|||||||
|
|
||||||
void asynStreamGeneratorDriver::produceDetector() {
|
void asynStreamGeneratorDriver::produceDetector() {
|
||||||
|
|
||||||
static const size_t bufferSize = 9000;
|
static const size_t bufferSize = this->kafkaMaxPacketSize + 16;
|
||||||
flatbuffers::FlatBufferBuilder builder(1024);
|
flatbuffers::FlatBufferBuilder builder(1024);
|
||||||
|
|
||||||
std::vector<uint32_t> tof;
|
std::vector<uint32_t> tof;
|
||||||
@@ -573,7 +546,7 @@ void asynStreamGeneratorDriver::produceDetector() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
while (!timeQueue.empty() &&
|
while (!timeQueue.empty() &&
|
||||||
(timeQueue.size() >= 8192 ||
|
(timeQueue.size() >= this->kafkaMaxPacketSize ||
|
||||||
(newest - timeQueue.top()) > 5'000'000'000ull))
|
(newest - timeQueue.top()) > 5'000'000'000ull))
|
||||||
timeQueue.pop();
|
timeQueue.pop();
|
||||||
epicsInt32 rate = 0;
|
epicsInt32 rate = 0;
|
||||||
@@ -590,7 +563,7 @@ void asynStreamGeneratorDriver::produceDetector() {
|
|||||||
epicsTimeStamp now = epicsTime::getCurrent();
|
epicsTimeStamp now = epicsTime::getCurrent();
|
||||||
|
|
||||||
// At least every 0.2 seconds
|
// At least every 0.2 seconds
|
||||||
if (total >= 8192 ||
|
if (total >= this->kafkaMaxPacketSize ||
|
||||||
epicsTimeDiffInNS(&now, &last_sent) > 200'000'000ll) {
|
epicsTimeDiffInNS(&now, &last_sent) > 200'000'000ll) {
|
||||||
last_sent = epicsTime::getCurrent();
|
last_sent = epicsTime::getCurrent();
|
||||||
|
|
||||||
@@ -638,19 +611,25 @@ extern "C" {
|
|||||||
|
|
||||||
asynStatus asynStreamGeneratorDriverConfigure(const char *portName,
|
asynStatus asynStreamGeneratorDriverConfigure(const char *portName,
|
||||||
const char *ipPortName,
|
const char *ipPortName,
|
||||||
const int numChannels) {
|
const int numChannels,
|
||||||
new asynStreamGeneratorDriver(portName, ipPortName, numChannels);
|
const int kafkaQueueSize,
|
||||||
|
const int kafkaMaxPacketSize) {
|
||||||
|
new asynStreamGeneratorDriver(portName, ipPortName, numChannels,
|
||||||
|
kafkaQueueSize, kafkaMaxPacketSize);
|
||||||
return asynSuccess;
|
return asynSuccess;
|
||||||
}
|
}
|
||||||
|
|
||||||
static const iocshArg initArg0 = {"portName", iocshArgString};
|
static const iocshArg initArg0 = {"portName", iocshArgString};
|
||||||
static const iocshArg initArg1 = {"ipPortName", iocshArgString};
|
static const iocshArg initArg1 = {"ipPortName", iocshArgString};
|
||||||
static const iocshArg initArg2 = {"numChannels", iocshArgInt};
|
static const iocshArg initArg2 = {"numChannels", iocshArgInt};
|
||||||
static const iocshArg *const initArgs[] = {&initArg0, &initArg1, &initArg2};
|
static const iocshArg initArg3 = {"kafkaQueueSize", iocshArgInt};
|
||||||
static const iocshFuncDef initFuncDef = {"asynStreamGenerator", 3, initArgs};
|
static const iocshArg initArg4 = {"kafkaMaxPacketSize", iocshArgInt};
|
||||||
|
static const iocshArg *const initArgs[] = {&initArg0, &initArg1, &initArg2,
|
||||||
|
&initArg3, &initArg4};
|
||||||
|
static const iocshFuncDef initFuncDef = {"asynStreamGenerator", 5, initArgs};
|
||||||
static void initCallFunc(const iocshArgBuf *args) {
|
static void initCallFunc(const iocshArgBuf *args) {
|
||||||
asynStreamGeneratorDriverConfigure(args[0].sval, args[1].sval,
|
asynStreamGeneratorDriverConfigure(args[0].sval, args[1].sval, args[2].ival,
|
||||||
args[2].ival);
|
args[3].ival, args[4].ival);
|
||||||
}
|
}
|
||||||
|
|
||||||
void asynStreamGeneratorDriverRegister(void) {
|
void asynStreamGeneratorDriverRegister(void) {
|
||||||
|
|||||||
@@ -101,7 +101,8 @@ struct __attribute__((__packed__)) NormalisedDetectorEvent {
|
|||||||
class asynStreamGeneratorDriver : public asynPortDriver {
|
class asynStreamGeneratorDriver : public asynPortDriver {
|
||||||
public:
|
public:
|
||||||
asynStreamGeneratorDriver(const char *portName, const char *ipPortName,
|
asynStreamGeneratorDriver(const char *portName, const char *ipPortName,
|
||||||
const int numChannels);
|
const int numChannels, const int kafkaQueueSize,
|
||||||
|
const int kafkaMaxPacketSize);
|
||||||
virtual ~asynStreamGeneratorDriver();
|
virtual ~asynStreamGeneratorDriver();
|
||||||
|
|
||||||
virtual asynStatus writeInt32(asynUser *pasynUser, epicsInt32 value);
|
virtual asynStatus writeInt32(asynUser *pasynUser, epicsInt32 value);
|
||||||
@@ -130,7 +131,8 @@ class asynStreamGeneratorDriver : public asynPortDriver {
|
|||||||
asynUser *pasynUDPUser;
|
asynUser *pasynUDPUser;
|
||||||
epicsEventId pausedEventId;
|
epicsEventId pausedEventId;
|
||||||
|
|
||||||
int num_channels;
|
const int num_channels;
|
||||||
|
const int kafkaMaxPacketSize;
|
||||||
|
|
||||||
epicsRingPointer<NormalisedMonitorEvent> monitorQueue;
|
epicsRingPointer<NormalisedMonitorEvent> monitorQueue;
|
||||||
rd_kafka_t *monitorProducer;
|
rd_kafka_t *monitorProducer;
|
||||||
@@ -139,6 +141,9 @@ class asynStreamGeneratorDriver : public asynPortDriver {
|
|||||||
rd_kafka_t *detectorProducer;
|
rd_kafka_t *detectorProducer;
|
||||||
|
|
||||||
constexpr static char *driverName = "StreamGenerator";
|
constexpr static char *driverName = "StreamGenerator";
|
||||||
|
|
||||||
|
asynStatus createInt32Param(asynStatus status, char *name, int *variable,
|
||||||
|
epicsInt32 initialValue = 0);
|
||||||
};
|
};
|
||||||
|
|
||||||
#endif
|
#endif
|
||||||
|
|||||||
Reference in New Issue
Block a user