diff --git a/db/channels.db b/db/channels.db index 9df5d06..6e07ee6 100644 --- a/db/channels.db +++ b/db/channels.db @@ -52,15 +52,14 @@ record(longout, "$(INSTR)$(NAME):C$(CHANNEL)") ################################################################################ # Read all monitors values -record(longin, "$(INSTR)$(NAME):M$(CHANNEL)") +record(int64in, "$(INSTR)$(NAME):M$(CHANNEL)") { field(DESC, "DAQ CH$(CHANNEL)") field(EGU, "cts") - field(DTYP, "asynInt32") + field(DTYP, "asynInt64") field(INP, "@asyn($(PORT),0,$(TIMEOUT=1)) COUNTS$(CHANNEL)") # This is probably too fast. We could trigger things the same as sinqDAQ to ensure the db is update in the same order # field(SCAN, "I/O Intr") - field(SCAN, ".2 second") field(PINI, "YES") } diff --git a/db/daq_common.db b/db/daq_common.db index 2c5f7f4..22ad9a1 100644 --- a/db/daq_common.db +++ b/db/daq_common.db @@ -15,14 +15,13 @@ record(longout, "$(INSTR)$(NAME):FULL-RESET") ################################################################################ # Status Variables -# record(stringin, "$(INSTR)$(NAME):MsgTxt") -# { -# field(DESC, "Unexpected received response") -# field(DTYP, "devDAQStringError") -# field(FLNK, "$(INSTR)$(NAME):INVALID-CONFIG") -# } - -record(mbbi, "$(INSTR)$(NAME):STATUS") +# We separate the RAW-STATUS and the STATUS PV so that the state can be updated +# in a sequence, that guarantees that we included the most recent time and +# counts before the status switches back to Idle. +# We do this via a sequenced update +# +# RAW-STATUS -> ELAPSED-SECONDS -> M* -> STATUS +record(mbbi, "$(INSTR)$(NAME):RAW-STATUS") { field(DESC, "DAQ Status") field(DTYP, "asynInt32") @@ -40,7 +39,41 @@ record(mbbi, "$(INSTR)$(NAME):STATUS") field(FRST, "INVALID") # This is probably too fast. We could trigger things the same as sinqDAQ to ensure the db is update in the same order #field(SCAN, "I/O Intr") - field(SCAN, ".5 second") + field(SCAN, ".2 second") + field(FLNK, "$(INSTR)$(NAME):READALL") + field(PINI, "YES") +} + +record(fanout, "$(INSTR)$(NAME):READALL") +{ + field(SELM, "All") + field(LNK0, "$(INSTR)$(NAME):ELAPSED-TIME PP") + field(LNK1, "$(INSTR)$(NAME):M0") + field(LNK2, "$(INSTR)$(NAME):M1") + field(LNK3, "$(INSTR)$(NAME):M2") + field(LNK4, "$(INSTR)$(NAME):M3") + field(LNK5, "$(INSTR)$(NAME):M4") + # Doesn't seemt o be a problem to have more in here :D + # field(LNK6, "$(INSTR)$(NAME):M5") + # field(LNK7, "$(INSTR)$(NAME):M6") + field(FLNK, "$(INSTR)$(NAME):STATUS") +} + +record(mbbi, "$(INSTR)$(NAME):STATUS") +{ + field(INP, "$(INSTR)$(NAME):RAW-STATUS NPP") + field(DESC, "DAQ Status") + field(ZRVL, "0") + field(ZRST, "Idle") + field(ONVL, "1") + field(ONST, "Counting") + field(TWVL, "2") + field(TWST, "Low rate") + field(THVL, "3") + field(THST, "Paused") + # 4 should never happen, if it does it means the DAQ reports undocumented statusbits + field(FRVL, "4") + field(FRST, "INVALID") field(PINI, "YES") } @@ -196,14 +229,13 @@ record(longout, "$(INSTR)$(NAME):CT") ################################################################################ # Read all monitors values -record(ai,"$(INSTR)$(NAME):ELAPSED-TIME") +record(ai, "$(INSTR)$(NAME):ELAPSED-TIME") { field(DESC, "DAQ Measured Time") field(EGU, "sec") field(DTYP, "asynFloat64") field(INP, "@asyn($(PORT),0,$(TIMEOUT=1)) TIME") # field(SCAN, "I/O Intr") - field(SCAN, ".5 second") field(PINI, "YES") # field(FLNK, "$(INSTR)$(NAME):ETO") } diff --git a/src/asynStreamGeneratorDriver.cpp b/src/asynStreamGeneratorDriver.cpp index 11bb120..fbbf86c 100644 --- a/src/asynStreamGeneratorDriver.cpp +++ b/src/asynStreamGeneratorDriver.cpp @@ -101,6 +101,13 @@ asynStatus asynStreamGeneratorDriver::createInt32Param( setIntegerParam(*variable, initialValue)); } +asynStatus asynStreamGeneratorDriver::createInt64Param( + asynStatus status, char *name, int *variable, epicsInt64 initialValue) { + // TODO should show error if there is one + return (asynStatus)(status | createParam(name, asynParamInt64, variable) | + setInteger64Param(*variable, initialValue)); +} + asynStatus asynStreamGeneratorDriver::createFloat64Param(asynStatus status, char *name, int *variable, @@ -120,7 +127,7 @@ asynStreamGeneratorDriver::asynStreamGeneratorDriver( const char *detectorTopic, const int kafkaQueueSize, const int kafkaMaxPacketSize) : asynPortDriver(portName, 1, /* maxAddr */ - asynInt32Mask | asynFloat64Mask | + asynInt32Mask | asynInt64Mask | asynFloat64Mask | asynDrvUserMask, /* Interface mask */ asynInt32Mask, // | asynFloat64Mask, /* Interrupt mask */ 0, /* asynFlags. This driver does not block and it is @@ -173,7 +180,7 @@ asynStreamGeneratorDriver::asynStreamGeneratorDriver( for (std::size_t i = 0; i < this->num_channels; ++i) { memset(pv_name_buffer, 0, 100); epicsSnprintf(pv_name_buffer, 100, P_CountsString, i); - status = createInt32Param(status, pv_name_buffer, P_Counts + i); + status = createInt64Param(status, pv_name_buffer, P_Counts + i); memset(pv_name_buffer, 0, 100); epicsSnprintf(pv_name_buffer, 100, P_RateString, i); @@ -407,7 +414,7 @@ asynStatus asynStreamGeneratorDriver::writeInt32(asynUser *pasynUser, } } else if (isClearCount) { if (!currentStatus) { - setIntegerParam(P_Counts[channelToClear], 0); + setInteger64Param(P_Counts[channelToClear], 0); status = (asynStatus)callParamCallbacks(); } else { return asynError; @@ -447,7 +454,7 @@ void asynStreamGeneratorDriver::receiveUDP() { const char *functionName = "receiveUDP"; asynStatus status = asynSuccess; - int isConnected = 1; + // int isConnected = 1; std::size_t received; int eomReason; @@ -468,8 +475,10 @@ void asynStreamGeneratorDriver::receiveUDP() { &received, &eomReason); if (received) { + const uint16_t bufferLength = ((uint16_t *)buffer)[0]; + const std::size_t headerLength = 42; - if ((received - 42) % 6 == 0) { + if (received >= headerLength && received == bufferLength * 2) { epicsRingBytesPut(this->udpQueue, (char *)buffer, bufferSize); @@ -499,7 +508,7 @@ void asynStreamGeneratorDriver::normaliseUDP() { std::size_t received; int eomReason; - // The correlation unit sents messages with a maximum size of 1500 bytes. + // The correlation unit sends messages with a maximum size of 1500 bytes. // These messages don't have any obious start or end to synchronise // against... const std::size_t bufferSize = 1500; @@ -516,6 +525,9 @@ void asynStreamGeneratorDriver::normaliseUDP() { epicsInt32 droppedMessages = 0; + const UDPHeader *header; + const DetectorEvent *d_event; + const MonitorEvent *m_event; NormalisedEvent ne; while (true) { @@ -524,9 +536,8 @@ void asynStreamGeneratorDriver::normaliseUDP() { epicsRingBytesGet(this->udpQueue, (char *)buffer, bufferSize); - UDPHeader *header = (UDPHeader *)buffer; - - std::size_t total_events = (header->BufferLength - 21) / 3; + header = (UDPHeader *)buffer; + const std::size_t total_events = (header->BufferLength - 21) / 3; if (header->BufferNumber - lastBufferNumber[header->McpdID] > 1 && lastBufferNumber[header->McpdID] != @@ -540,22 +551,22 @@ void asynStreamGeneratorDriver::normaliseUDP() { lastBufferNumber[header->McpdID]); setIntegerParam(P_UdpDropped, ++droppedMessages); } + lastBufferNumber[header->McpdID] = header->BufferNumber; for (std::size_t i = 0; i < total_events; ++i) { char *event = (buffer + 21 * 2 + i * 6); + const bool isMonitorEvent = event[5] & 0x80; - if (event[5] & 0x80) { // Monitor Event - MonitorEvent *m_event = (MonitorEvent *)event; - + if (isMonitorEvent) { + m_event = (MonitorEvent *)event; ne.timestamp = header->nanosecs() + (uint64_t)m_event->nanosecs(); ne.source = 0; ne.pixelId = m_event->DataID; - } else { // Detector Event - DetectorEvent *d_event = (DetectorEvent *)event; - + } else { + d_event = (DetectorEvent *)event; ne.timestamp = header->nanosecs() + (uint64_t)d_event->nanosecs(); ne.source = header->McpdID; @@ -661,7 +672,7 @@ void asynStreamGeneratorDriver::processEvents() { epicsTimeStamp lastProcess = epicsTime::getCurrent(); epicsTimeStamp currentTime = lastProcess; - epicsInt32 counts[this->num_channels]; + epicsInt64 counts[this->num_channels]; double elapsedSeconds = 0; uint64_t startTimestamp = std::numeric_limits::max(); uint64_t currTimestamp; @@ -741,6 +752,8 @@ void asynStreamGeneratorDriver::processEvents() { 1; elapsedSeconds = (eventsA[i].timestamp - startTimestamp) / 1e9; + // TODO should really check there an no more events with the + // same final timestamp if ((countPreset && counts[presetChannel] >= countPreset) || (timePreset && elapsedSeconds > (double)timePreset)) break; @@ -750,7 +763,7 @@ void asynStreamGeneratorDriver::processEvents() { } for (size_t i = 0; i < num_channels; ++i) { - setIntegerParam(P_Counts[i], counts[i]); + setInteger64Param(P_Counts[i], counts[i]); } setDoubleParam(P_ElapsedTime, elapsedSeconds); diff --git a/src/asynStreamGeneratorDriver.h b/src/asynStreamGeneratorDriver.h index 1517697..3783374 100644 --- a/src/asynStreamGeneratorDriver.h +++ b/src/asynStreamGeneratorDriver.h @@ -22,7 +22,7 @@ struct __attribute__((__packed__)) UDPHeader { uint16_t Parameter2[3]; uint16_t Parameter3[3]; - inline uint64_t nanosecs() { + inline uint64_t nanosecs() const { uint64_t nsec{((uint64_t)TimeStamp[2]) << 32 | ((uint64_t)TimeStamp[1]) << 16 | (uint64_t)TimeStamp[0]}; return nsec * 100; @@ -35,8 +35,8 @@ struct __attribute__((__packed__)) DetectorEvent { uint16_t YPosition : 10; uint16_t Amplitude : 8; uint16_t Id : 1; - inline uint32_t nanosecs() { return TimeStamp * 100; } - inline uint64_t pixelId(uint32_t mpcdId) { + inline uint32_t nanosecs() const { return TimeStamp * 100; } + inline uint64_t pixelId(uint32_t mpcdId) const { const uint32_t x_pixels = 128; const uint32_t y_pixels = 128; return (mpcdId - 1) * x_pixels * y_pixels + @@ -50,7 +50,7 @@ struct __attribute__((__packed__)) MonitorEvent { uint64_t DataID : 4; uint64_t TriggerID : 3; uint64_t Id : 1; - inline uint32_t nanosecs() { return TimeStamp * 100; } + inline uint32_t nanosecs() const { return TimeStamp * 100; } }; /******************************************************************************* @@ -172,6 +172,9 @@ class asynStreamGeneratorDriver : public asynPortDriver { asynStatus createInt32Param(asynStatus status, char *name, int *variable, epicsInt32 initialValue = 0); + asynStatus createInt64Param(asynStatus status, char *name, int *variable, + epicsInt64 initialValue = 0); + asynStatus createFloat64Param(asynStatus status, char *name, int *variable, double initialValue = 0);