can stop count and clear channels

This commit is contained in:
2025-11-04 15:31:28 +01:00
parent 2c47f338c2
commit ecc6e98f4c
4 changed files with 116 additions and 60 deletions

View File

@@ -156,7 +156,7 @@ asynStreamGeneratorDriver::asynStreamGeneratorDriver(
}
// Create Events
this->pausedEventId = epicsEventCreate(epicsEventEmpty);
// this->pausedEventId = epicsEventCreate(epicsEventEmpty);
if (enableKafkaStream) {
@@ -259,36 +259,77 @@ asynStatus asynStreamGeneratorDriver::writeInt32(asynUser *pasynUser,
const char *functionName = "writeInt32";
getParamName(function, &paramName);
// if (status) {
// epicsSnprintf(pasynUser->errorMessage, pasynUser->errorMessageSize,
// "%s:%s: status=%d, function=%d, name=%s, value=%d",
// driverName, functionName, status, function, paramName,
// value);
// return status;
// }
// TODO should maybe lock mutex for this
epicsInt32 currentStatus;
status = getIntegerParam(this->P_Status, &currentStatus);
if (status) {
epicsSnprintf(pasynUser->errorMessage, pasynUser->errorMessageSize,
"%s:%s: status=%d, function=%d, name=%s, value=%d",
driverName, functionName, status, function, paramName,
value);
return status;
}
// TODO clean up
bool isClearCount = false;
size_t channelToClear;
for (size_t i = 0; i < this->num_channels; ++i) {
isClearCount |= function == P_ClearCounts[i];
if (isClearCount) {
channelToClear = i;
break;
}
}
// TODO should check everything...
if (function == P_CountPreset) {
// TODO should block setting a preset when already set
setIntegerParam(function, value);
setIntegerParam(P_Status, STATUS_COUNTING);
status = (asynStatus)callParamCallbacks();
epicsEventSignal(this->pausedEventId);
if (!currentStatus) {
setIntegerParam(function, value);
setIntegerParam(P_Status, STATUS_COUNTING);
status = (asynStatus)callParamCallbacks();
} else {
return asynError;
}
} else if (function == P_TimePreset) {
// TODO should block setting a preset when already set
setIntegerParam(function, value);
setIntegerParam(P_Status, STATUS_COUNTING);
status = (asynStatus)callParamCallbacks();
epicsEventSignal(this->pausedEventId);
if (!currentStatus) {
setIntegerParam(function, value);
setIntegerParam(P_Status, STATUS_COUNTING);
status = (asynStatus)callParamCallbacks();
} else {
return asynError;
}
} else if (function == P_ClearElapsedTime) {
if (!currentStatus) {
setIntegerParam(P_ElapsedTime, 0);
status = (asynStatus)callParamCallbacks();
} else {
return asynError;
}
} else if (isClearCount) {
if (!currentStatus) {
setIntegerParam(P_Counts[channelToClear], 0);
status = (asynStatus)callParamCallbacks();
} else {
return asynError;
}
} else if (function == P_Reset) {
lock();
// TODO should probably set back everything to defaults
setIntegerParam(P_Status, STATUS_IDLE);
status = (asynStatus)callParamCallbacks();
unlock();
} else if (function == P_Stop) {
lock();
setIntegerParam(P_Status, STATUS_IDLE);
status = (asynStatus)callParamCallbacks();
unlock();
} else if (function == P_MonitorChannel) {
epicsInt32 currentStatus;
getIntegerParam(this->P_Status, &currentStatus);
if (!currentStatus) {
setIntegerParam(function, value);
status = (asynStatus)callParamCallbacks();
} else {
return asynError;
}
} else {
setIntegerParam(function, value);
@@ -478,55 +519,65 @@ void asynStreamGeneratorDriver::processEvents() {
if (currStatus == STATUS_COUNTING) {
startTimestamp = std::min(startTimestamp, ne->timestamp);
counts[ne->source == 0 ? ne->pixelId + 1 : 0] += 1;
currTimestamp = ne->timestamp;
elapsedSeconds =
0 ? currTimestamp <= startTimestamp
: ((double)(currTimestamp - startTimestamp)) / 1e9;
this->queueForKafka(ne);
} else {
delete ne;
}
// is our count finished?
if ((countPreset && counts[presetChannel] >= countPreset) ||
(timePreset && elapsedSeconds >= timePreset)) {
// is our count finished?
if ((countPreset && counts[presetChannel] >= countPreset) ||
(timePreset && elapsedSeconds >= timePreset)) {
// filter out events that occured after the specified time
if (ne->timestamp - startTimestamp <= countPreset) {
counts[ne->source == 0 ? ne->pixelId + 1 : 0] += 1;
this->queueForKafka(ne);
// add any remaining events with the same timestamp
// we could theoretically have a small overrun if the
// timestamps are identical on the monitor channel
while (!timeQueue.empty() &&
!timeQueue.top()->timestamp == currTimestamp) {
ne = timeQueue.top();
timeQueue.pop();
counts[ne->source == 0 ? ne->pixelId + 1 : 0] += 1;
this->queueForKafka(ne);
}
} else {
delete ne;
}
countPreset = 0;
timePreset = 0;
lock();
for (size_t i = 0; i < num_channels; ++i) {
setIntegerParam(P_Counts[i], counts[i]);
}
setIntegerParam(P_ElapsedTime, elapsedSeconds);
setIntegerParam(P_CountPreset, countPreset);
setIntegerParam(P_TimePreset, timePreset);
callParamCallbacks();
setIntegerParam(P_Status, STATUS_IDLE);
callParamCallbacks();
unlock();
} else {
// add any remaining events with the same timestamp
// we could theoretically have a small overrun if the
// timestamps are identical on the monitor channel
while (!timeQueue.empty() &&
!timeQueue.top()->timestamp == currTimestamp) {
ne = timeQueue.top();
timeQueue.pop();
counts[ne->source == 0 ? ne->pixelId + 1 : 0] += 1;
this->queueForKafka(ne);
lock();
for (size_t i = 0; i < num_channels; ++i) {
setIntegerParam(P_Counts[i], counts[i]);
}
setIntegerParam(P_ElapsedTime, elapsedSeconds);
callParamCallbacks();
unlock();
}
countPreset = 0;
timePreset = 0;
lock();
for (size_t i = 0; i < num_channels; ++i) {
setIntegerParam(P_Counts[i], counts[i]);
}
setIntegerParam(P_ElapsedTime, elapsedSeconds);
setIntegerParam(P_CountPreset, countPreset);
setIntegerParam(P_TimePreset, timePreset);
callParamCallbacks();
setIntegerParam(P_Status, STATUS_IDLE);
callParamCallbacks();
unlock();
} else if (currStatus == STATUS_COUNTING) {
lock();
for (size_t i = 0; i < num_channels; ++i) {
setIntegerParam(P_Counts[i], counts[i]);
}
setIntegerParam(P_ElapsedTime, elapsedSeconds);
callParamCallbacks();
unlock();
} else {
delete ne;
}
}
}