diff --git a/db/channels.db b/db/channels.db index bb7a3fe..0b94f18 100644 --- a/db/channels.db +++ b/db/channels.db @@ -25,6 +25,7 @@ record(seq, "$(INSTR)$(NAME):O$(CHANNEL)") field(DO0, 0) field(SELM, "Specified") field(SELL, "$(INSTR)$(NAME):M$(CHANNEL).VAL") + field(SCAN, ".1 second") } # Current Status of Channel, i.e. is it ready to count? @@ -34,6 +35,7 @@ record(bi, "$(INSTR)$(NAME):S$(CHANNEL)") field(VAL, 0) field(ZNAM, "OK") field(ONAM, "CLEARING") + field(PINI, 1) } ################################################################################ diff --git a/db/daq_common.db b/db/daq_common.db index 631fd8a..410b80d 100644 --- a/db/daq_common.db +++ b/db/daq_common.db @@ -66,6 +66,7 @@ record(seq, "$(INSTR)$(NAME):ETO") field(DO0, 0) field(SELM, "Specified") field(SELL, "$(INSTR)$(NAME):ELAPSED-TIME.VAL") + field(SCAN, ".1 second") } # Current Status of Channel, i.e. is it ready to count? @@ -75,6 +76,7 @@ record(bi, "$(INSTR)$(NAME):ETS") field(VAL, 0) field(ZNAM, "OK") field(ONAM, "CLEARING") + field(PINI, 1) } ################################################################################ @@ -201,4 +203,5 @@ record(ai,"$(INSTR)$(NAME):ELAPSED-TIME") field(INP, "@asyn($(PORT),0,$(TIMEOUT=1)) TIME") field(SCAN, "I/O Intr") field(PINI, "YES") + # field(FLNK, "$(INSTR)$(NAME):ETO") } diff --git a/scripts/st.cmd b/scripts/st.cmd index 5802ad2..489cf84 100755 --- a/scripts/st.cmd +++ b/scripts/st.cmd @@ -9,8 +9,8 @@ epicsEnvSet("INSTR", "SQ:TEST:") epicsEnvSet("NAME", "SG") drvAsynIPPortConfigure("ASYN_IP_PORT", "127.0.0.1:9071:54321 UDP", 0, 0, 1) -asynStreamGenerator("ASYN_SG", "ASYN_IP_PORT", 4, 10000, "linkafka01:9092", "NEWEFU_TEST", "NEWEFU_TEST2", 1000, 8192) -# asynStreamGenerator("ASYN_SG", "ASYN_IP_PORT", 4, 10000, "", "", "", 0, 0) +# asynStreamGenerator("ASYN_SG", "ASYN_IP_PORT", 4, 10000, "linkafka01:9092", "NEWEFU_TEST", "NEWEFU_TEST2", 1000, 8192) +asynStreamGenerator("ASYN_SG", "ASYN_IP_PORT", 4, 10000, "", "", "", 0, 0) dbLoadRecords("$(StreamGenerator_DB)daq_common.db", "INSTR=$(INSTR), NAME=$(NAME), PORT=ASYN_SG, CHANNELS=5") diff --git a/src/asynStreamGeneratorDriver.cpp b/src/asynStreamGeneratorDriver.cpp index dfe1033..4862233 100644 --- a/src/asynStreamGeneratorDriver.cpp +++ b/src/asynStreamGeneratorDriver.cpp @@ -156,7 +156,7 @@ asynStreamGeneratorDriver::asynStreamGeneratorDriver( } // Create Events - this->pausedEventId = epicsEventCreate(epicsEventEmpty); + // this->pausedEventId = epicsEventCreate(epicsEventEmpty); if (enableKafkaStream) { @@ -259,36 +259,77 @@ asynStatus asynStreamGeneratorDriver::writeInt32(asynUser *pasynUser, const char *functionName = "writeInt32"; getParamName(function, ¶mName); - // if (status) { - // epicsSnprintf(pasynUser->errorMessage, pasynUser->errorMessageSize, - // "%s:%s: status=%d, function=%d, name=%s, value=%d", - // driverName, functionName, status, function, paramName, - // value); - // return status; - // } + // TODO should maybe lock mutex for this + epicsInt32 currentStatus; + status = getIntegerParam(this->P_Status, ¤tStatus); + if (status) { + epicsSnprintf(pasynUser->errorMessage, pasynUser->errorMessageSize, + "%s:%s: status=%d, function=%d, name=%s, value=%d", + driverName, functionName, status, function, paramName, + value); + return status; + } + + // TODO clean up + bool isClearCount = false; + size_t channelToClear; + for (size_t i = 0; i < this->num_channels; ++i) { + isClearCount |= function == P_ClearCounts[i]; + if (isClearCount) { + channelToClear = i; + break; + } + } + + // TODO should check everything... if (function == P_CountPreset) { - // TODO should block setting a preset when already set - setIntegerParam(function, value); - setIntegerParam(P_Status, STATUS_COUNTING); - status = (asynStatus)callParamCallbacks(); - epicsEventSignal(this->pausedEventId); + if (!currentStatus) { + setIntegerParam(function, value); + setIntegerParam(P_Status, STATUS_COUNTING); + status = (asynStatus)callParamCallbacks(); + } else { + return asynError; + } } else if (function == P_TimePreset) { - // TODO should block setting a preset when already set - setIntegerParam(function, value); - setIntegerParam(P_Status, STATUS_COUNTING); - status = (asynStatus)callParamCallbacks(); - epicsEventSignal(this->pausedEventId); + if (!currentStatus) { + setIntegerParam(function, value); + setIntegerParam(P_Status, STATUS_COUNTING); + status = (asynStatus)callParamCallbacks(); + } else { + return asynError; + } + } else if (function == P_ClearElapsedTime) { + if (!currentStatus) { + setIntegerParam(P_ElapsedTime, 0); + status = (asynStatus)callParamCallbacks(); + } else { + return asynError; + } + } else if (isClearCount) { + if (!currentStatus) { + setIntegerParam(P_Counts[channelToClear], 0); + status = (asynStatus)callParamCallbacks(); + } else { + return asynError; + } } else if (function == P_Reset) { + lock(); // TODO should probably set back everything to defaults setIntegerParam(P_Status, STATUS_IDLE); status = (asynStatus)callParamCallbacks(); + unlock(); + } else if (function == P_Stop) { + lock(); + setIntegerParam(P_Status, STATUS_IDLE); + status = (asynStatus)callParamCallbacks(); + unlock(); } else if (function == P_MonitorChannel) { - epicsInt32 currentStatus; - getIntegerParam(this->P_Status, ¤tStatus); if (!currentStatus) { setIntegerParam(function, value); status = (asynStatus)callParamCallbacks(); + } else { + return asynError; } } else { setIntegerParam(function, value); @@ -478,55 +519,65 @@ void asynStreamGeneratorDriver::processEvents() { if (currStatus == STATUS_COUNTING) { startTimestamp = std::min(startTimestamp, ne->timestamp); - counts[ne->source == 0 ? ne->pixelId + 1 : 0] += 1; currTimestamp = ne->timestamp; elapsedSeconds = 0 ? currTimestamp <= startTimestamp : ((double)(currTimestamp - startTimestamp)) / 1e9; - this->queueForKafka(ne); - } else { - delete ne; - } + // is our count finished? + if ((countPreset && counts[presetChannel] >= countPreset) || + (timePreset && elapsedSeconds >= timePreset)) { - // is our count finished? - if ((countPreset && counts[presetChannel] >= countPreset) || - (timePreset && elapsedSeconds >= timePreset)) { + // filter out events that occured after the specified time + if (ne->timestamp - startTimestamp <= countPreset) { + counts[ne->source == 0 ? ne->pixelId + 1 : 0] += 1; + this->queueForKafka(ne); + + // add any remaining events with the same timestamp + // we could theoretically have a small overrun if the + // timestamps are identical on the monitor channel + while (!timeQueue.empty() && + !timeQueue.top()->timestamp == currTimestamp) { + ne = timeQueue.top(); + timeQueue.pop(); + counts[ne->source == 0 ? ne->pixelId + 1 : 0] += 1; + this->queueForKafka(ne); + } + } else { + delete ne; + } + + countPreset = 0; + timePreset = 0; + + lock(); + for (size_t i = 0; i < num_channels; ++i) { + setIntegerParam(P_Counts[i], counts[i]); + } + setIntegerParam(P_ElapsedTime, elapsedSeconds); + setIntegerParam(P_CountPreset, countPreset); + setIntegerParam(P_TimePreset, timePreset); + callParamCallbacks(); + setIntegerParam(P_Status, STATUS_IDLE); + callParamCallbacks(); + unlock(); + + } else { - // add any remaining events with the same timestamp - // we could theoretically have a small overrun if the - // timestamps are identical on the monitor channel - while (!timeQueue.empty() && - !timeQueue.top()->timestamp == currTimestamp) { - ne = timeQueue.top(); - timeQueue.pop(); counts[ne->source == 0 ? ne->pixelId + 1 : 0] += 1; this->queueForKafka(ne); + + lock(); + for (size_t i = 0; i < num_channels; ++i) { + setIntegerParam(P_Counts[i], counts[i]); + } + setIntegerParam(P_ElapsedTime, elapsedSeconds); + callParamCallbacks(); + unlock(); } - countPreset = 0; - timePreset = 0; - - lock(); - for (size_t i = 0; i < num_channels; ++i) { - setIntegerParam(P_Counts[i], counts[i]); - } - setIntegerParam(P_ElapsedTime, elapsedSeconds); - setIntegerParam(P_CountPreset, countPreset); - setIntegerParam(P_TimePreset, timePreset); - callParamCallbacks(); - setIntegerParam(P_Status, STATUS_IDLE); - callParamCallbacks(); - unlock(); - - } else if (currStatus == STATUS_COUNTING) { - lock(); - for (size_t i = 0; i < num_channels; ++i) { - setIntegerParam(P_Counts[i], counts[i]); - } - setIntegerParam(P_ElapsedTime, elapsedSeconds); - callParamCallbacks(); - unlock(); + } else { + delete ne; } } }