2 Commits
1.0.0 ... main

Author SHA1 Message Date
2bfde8c1c6 adds some documentation and fixes compiler warning
Some checks failed
Test And Build / Lint (push) Failing after 3s
Test And Build / Build (push) Failing after 3s
2025-11-24 11:12:16 +01:00
c58e379584 Merge pull request 'Fixes some logic bugs and adds rate calculation' (#2) from bugfixing into main
Some checks failed
Test And Build / Lint (push) Failing after 3s
Test And Build / Build (push) Failing after 3s
Reviewed-on: #2
2025-11-24 10:34:15 +01:00
4 changed files with 23 additions and 79 deletions

View File

@@ -1,5 +1,23 @@
# StreamGenerator
The StreamGenerator is an EPICS module that almost completely implements the
same interface as the [sinqDAQ](https://gitea.psi.ch/lin-epics-modules/sinqDAQ)
EPICS module, in an attempt to maintain a consistent interface across all our
data acquisition systems. Sitting behind the interface, however, is different
electronics and firmware, specifically, a multi-readout module "Correlation
Unit" system, which was developed at MLZ.
## Correlation Unit Documentation
The UDP-based interface, as well as many other specifics of the detector
system, are described in the first document linked below, and within the `doc`
directory of the `qmesydaq` repository.
- [20220608\_ErwiN\_Detector.pdf](./docs/20220608_ErwiN_Detector.pdf)
- [qmesydaq/doc](https://gitea.psi.ch/lin-controls/qmesydaq/src/branch/master/doc/)
## Retrieving Code
Clone the repository to a local directory via:
```

Binary file not shown.

Binary file not shown.

View File

@@ -213,9 +213,6 @@ asynStreamGeneratorDriver::asynStreamGeneratorDriver(
exit(1);
}
// Create Events
// this->pausedEventId = epicsEventCreate(epicsEventEmpty);
if (enableKafkaStream) {
epicsStdoutPrintf(
@@ -277,19 +274,6 @@ asynStreamGeneratorDriver::asynStreamGeneratorDriver(
exit(1);
}
/* Create the thread that orders packets in preparation for our sinqDAQ
 * stand-in
 */
// status = (asynStatus)(epicsThreadCreate(
// "partialSort", epicsThreadPriorityMedium,
// epicsThreadGetStackSize(epicsThreadStackMedium),
// (EPICSTHREADFUNC)::sortTask, this) == NULL);
// if (status) {
// epicsStdoutPrintf("%s:%s: epicsThreadCreate failure, status=%d\n",
// driverName, functionName, status);
// exit(1);
// }
/* Create the thread that normalises the events
 */
status =
@@ -622,9 +606,6 @@ void asynStreamGeneratorDriver::normaliseUDP() {
resultBuffer[i] = ne;
}
// epicsRingBytesPut(this->normalisedQueue, (char *)resultBuffer,
// total_events * sizeof(NormalisedEvent));
epicsRingBytesPut(this->sortedQueue, (char *)resultBuffer,
total_events * sizeof(NormalisedEvent));
@@ -708,8 +689,11 @@ void asynStreamGeneratorDriver::processEvents() {
// so we have 5 and want to keep 1/5 for the next window
// which is why toProcess has 4/5 below
// or now
// we have 10 and want to keep 4/10 for the next window
// so toProcess is 6/10 below
// we have 20 and want to keep 1/2 for the next window
// so toProcess is 10/20 below
//
// TODO need to go through out max rate and min rate calculation,
// we could then figure out the optimal buffer size
// we have two buffers. We alternate between reading data into one of them,
// and then merge sorting into the other
@@ -750,7 +734,6 @@ void asynStreamGeneratorDriver::processEvents() {
epicsInt32 countPreset;
epicsInt32 timePreset;
epicsInt32 presetChannel;
std::size_t b_cnt = 0;
while (true) {
@@ -790,20 +773,9 @@ void asynStreamGeneratorDriver::processEvents() {
std::sort(newStartPtr, newStartPtr + queuedEvents, oldestEventsFirst);
// std::size_t toProcess =
// eventsBLastEnd - eventsBLastStart + queuedEvents * 4 / 5;
std::size_t toProcess =
eventsBLastEnd - eventsBLastStart + queuedEvents * 10 / 20;
// asynPrint(pasynUserSelf, ASYN_TRACE_ERROR,
// "newStartPtr %" PRIu64 " queuedEvents %" PRIu64 " toProcess %" PRIu64 "\n",
// newStartPtr - eventsA, queuedEvents, toProcess);
// asynPrint(pasynUserSelf, ASYN_TRACE_ERROR,
// "eventsBLastStart %" PRIu64 " eventsBLastEnd %" PRIu64"\n",
// eventsBLastStart - eventsB, eventsBLastEnd - eventsB);
// TODO could also consider an in-place merge
eventsBLastEnd = std::merge(newStartPtr, newStartPtr + queuedEvents,
eventsBLastStart, eventsBLastEnd, eventsA,
@@ -811,9 +783,6 @@ void asynStreamGeneratorDriver::processEvents() {
eventsBLastStart = eventsA + toProcess;
// TODO I haven't really taken care of the case that there are no events
if (prevStatus == STATUS_IDLE && currStatus == STATUS_COUNTING) {
getIntegerParam(this->P_CountPreset, &countPreset);
@@ -834,41 +803,8 @@ void asynStreamGeneratorDriver::processEvents() {
"%s:%s: starting count: (%d, %d, %d, %" PRIu64 ")\n",
driverName, functionName, countPreset, timePreset,
presetChannel, startTimestamp);
// asynPrint(pasynUserSelf, ASYN_TRACE_ERROR,
// "%s:%s: starting count: %" PRIu64 " %" PRIu64 " %" PRIu64 "\n",
// driverName, functionName, eventsA[0].timestamp, eventsA[1].timestamp, eventsA[2].timestamp);
}
//if (eventsA[std::max((std::size_t) 0, toProcess - 1)].timestamp < eventsA[0].timestamp)
// asynPrint(pasynUserSelf, ASYN_TRACE_ERROR,
// "%s:%s: time-span: %" PRIu64 " %" PRIu64 "\n",
// driverName, functionName, eventsA[0].timestamp, eventsA[std::max((std::size_t) 0, toProcess - 1)].timestamp);
// if (!std::is_sorted( eventsA, eventsA + toProcess, oldestEventsFirst )) {
// // asynPrint(pasynUserSelf, ASYN_TRACE_ERROR,
// // "%s:%s: not sorted: %" PRIu64 " %" PRIu64 "\n",
// // driverName, functionName, eventsA[0].timestamp, eventsA[std::max((std::size_t) 0, toProcess - 1)].timestamp);
//
//
//
// for (size_t j = 1; j < toProcess; ++j) {
// if (eventsA[j-1].timestamp > eventsA[j].timestamp) {
// asynPrint(pasynUserSelf, ASYN_TRACE_ERROR,
// "%s:%s: not sorted %" PRIu64 " %" PRIu64 " %" PRIu64 " %" PRIu64 "\n",
// driverName, functionName, b_cnt, j, eventsA[j-1].timestamp, eventsA[j].timestamp);
// }
// }
// ++b_cnt;
// std::sort(eventsA, eventsA + toProcess, oldestEventsFirst);
// }
// if (!std::is_sorted( eventsA, eventsA + toProcess, oldestEventsFirst ))
// asynPrint(pasynUserSelf, ASYN_TRACE_ERROR,
// "%s:%s: not sorted: %" PRIu64 " %" PRIu64 "\n",
// driverName, functionName, eventsA[0].timestamp, eventsA[std::max((std::size_t) 0, toProcess - 1)].timestamp);
if (currStatus == STATUS_COUNTING) {
for (std::size_t i = 0; i < toProcess; ++i) {
@@ -926,9 +862,6 @@ void asynStreamGeneratorDriver::processEvents() {
}
}
// TODO this was just done quickly and is probably not totally correct
// Determining the rates on each channel
uint32_t *ratesWindowPtr = ratesWindow + this->num_channels * ratePtr;
@@ -954,10 +887,6 @@ void asynStreamGeneratorDriver::processEvents() {
ratesStartTimes[ratePtr] = eventsA[0].timestamp;
ratesStopTimes[ratePtr] = eventsA[toProcess-1].timestamp;
}
// else {
// ratesStartTimes[ratePtr] = std::numeric_limits<uint64_t>::max();
// ratesStopTimes[ratePtr] = 0;
// }
uint64_t minTime = std::numeric_limits<uint64_t>::max();
uint64_t maxTime = 0;
@@ -977,9 +906,6 @@ void asynStreamGeneratorDriver::processEvents() {
rate / timeWindow
);
// if (i == 0)
// asynPrint(pasynUserSelf, ASYN_TRACE_ERROR,
// "%s:%s: window %f (%d) counts %f\n", driverName, functionName, timeWindow, timeWindow > 0., rate);
}