diff --git a/.clang-format b/.clang-format new file mode 100644 index 0000000..a92596f --- /dev/null +++ b/.clang-format @@ -0,0 +1,245 @@ +--- +Language: Cpp +# BasedOnStyle: LLVM +AccessModifierOffset: -2 +AlignAfterOpenBracket: Align +AlignArrayOfStructures: None +AlignConsecutiveAssignments: + Enabled: false + AcrossEmptyLines: false + AcrossComments: false + AlignCompound: false + AlignFunctionPointers: false + PadOperators: true +AlignConsecutiveBitFields: + Enabled: false + AcrossEmptyLines: false + AcrossComments: false + AlignCompound: false + AlignFunctionPointers: false + PadOperators: false +AlignConsecutiveDeclarations: + Enabled: false + AcrossEmptyLines: false + AcrossComments: false + AlignCompound: false + AlignFunctionPointers: false + PadOperators: false +AlignConsecutiveMacros: + Enabled: false + AcrossEmptyLines: false + AcrossComments: false + AlignCompound: false + AlignFunctionPointers: false + PadOperators: false +AlignConsecutiveShortCaseStatements: + Enabled: false + AcrossEmptyLines: false + AcrossComments: false + AlignCaseColons: false +AlignEscapedNewlines: Right +AlignOperands: Align +AlignTrailingComments: + Kind: Always + OverEmptyLines: 0 +AllowAllArgumentsOnNextLine: true +AllowAllParametersOfDeclarationOnNextLine: true +AllowBreakBeforeNoexceptSpecifier: Never +AllowShortBlocksOnASingleLine: Never +AllowShortCaseLabelsOnASingleLine: false +AllowShortCompoundRequirementOnASingleLine: true +AllowShortEnumsOnASingleLine: true +AllowShortFunctionsOnASingleLine: All +AllowShortIfStatementsOnASingleLine: Never +AllowShortLambdasOnASingleLine: All +AllowShortLoopsOnASingleLine: false +AlwaysBreakAfterDefinitionReturnType: None +AlwaysBreakAfterReturnType: None +AlwaysBreakBeforeMultilineStrings: false +AlwaysBreakTemplateDeclarations: MultiLine +AttributeMacros: + - __capability +BinPackArguments: true +BinPackParameters: true +BitFieldColonSpacing: Both +BraceWrapping: + AfterCaseLabel: false + AfterClass: false + AfterControlStatement: Never + AfterEnum: false + AfterExternBlock: false + AfterFunction: false + AfterNamespace: false + AfterObjCDeclaration: false + AfterStruct: false + AfterUnion: false + BeforeCatch: false + BeforeElse: false + BeforeLambdaBody: false + BeforeWhile: false + IndentBraces: false + SplitEmptyFunction: true + SplitEmptyRecord: true + SplitEmptyNamespace: true +BreakAdjacentStringLiterals: true +BreakAfterAttributes: Leave +BreakAfterJavaFieldAnnotations: false +BreakArrays: true +BreakBeforeBinaryOperators: None +BreakBeforeConceptDeclarations: Always +BreakBeforeBraces: Attach +BreakBeforeInlineASMColon: OnlyMultiline +BreakBeforeTernaryOperators: true +BreakConstructorInitializers: BeforeColon +BreakInheritanceList: BeforeColon +BreakStringLiterals: true +ColumnLimit: 80 +CommentPragmas: '^ IWYU pragma:' +CompactNamespaces: false +ConstructorInitializerIndentWidth: 4 +ContinuationIndentWidth: 4 +Cpp11BracedListStyle: true +DerivePointerAlignment: false +DisableFormat: false +EmptyLineAfterAccessModifier: Never +EmptyLineBeforeAccessModifier: LogicalBlock +ExperimentalAutoDetectBinPacking: false +FixNamespaceComments: true +ForEachMacros: + - foreach + - Q_FOREACH + - BOOST_FOREACH +IfMacros: + - KJ_IF_MAYBE +IncludeBlocks: Preserve +IncludeCategories: + - Regex: '^"(llvm|llvm-c|clang|clang-c)/' + Priority: 2 + SortPriority: 0 + CaseSensitive: false + - Regex: '^(<|"(gtest|gmock|isl|json)/)' + Priority: 3 + SortPriority: 0 + CaseSensitive: false + - Regex: '.*' + Priority: 1 + SortPriority: 0 + CaseSensitive: false +IncludeIsMainRegex: '(Test)?$' +IncludeIsMainSourceRegex: '' +IndentAccessModifiers: false +IndentCaseBlocks: false +IndentCaseLabels: false +IndentExternBlock: AfterExternBlock +IndentGotoLabels: true +IndentPPDirectives: None +IndentRequiresClause: true +IndentWidth: 4 +IndentWrappedFunctionNames: false +InsertBraces: false +InsertNewlineAtEOF: false +InsertTrailingCommas: None +IntegerLiteralSeparator: + Binary: 0 + BinaryMinDigits: 0 + Decimal: 0 + DecimalMinDigits: 0 + Hex: 0 + HexMinDigits: 0 +JavaScriptQuotes: Leave +JavaScriptWrapImports: true +KeepEmptyLinesAtTheStartOfBlocks: true +KeepEmptyLinesAtEOF: false +LambdaBodyIndentation: Signature +LineEnding: DeriveLF +MacroBlockBegin: '' +MacroBlockEnd: '' +MaxEmptyLinesToKeep: 1 +NamespaceIndentation: None +ObjCBinPackProtocolList: Auto +ObjCBlockIndentWidth: 2 +ObjCBreakBeforeNestedBlockParam: true +ObjCSpaceAfterProperty: false +ObjCSpaceBeforeProtocolList: true +PackConstructorInitializers: BinPack +PenaltyBreakAssignment: 2 +PenaltyBreakBeforeFirstCallParameter: 19 +PenaltyBreakComment: 300 +PenaltyBreakFirstLessLess: 120 +PenaltyBreakOpenParenthesis: 0 +PenaltyBreakScopeResolution: 500 +PenaltyBreakString: 1000 +PenaltyBreakTemplateDeclaration: 10 +PenaltyExcessCharacter: 1000000 +PenaltyIndentedWhitespace: 0 +PenaltyReturnTypeOnItsOwnLine: 60 +PointerAlignment: Right +PPIndentWidth: -1 +QualifierAlignment: Leave +ReferenceAlignment: Pointer +ReflowComments: true +RemoveBracesLLVM: false +RemoveParentheses: Leave +RemoveSemicolon: false +RequiresClausePosition: OwnLine +RequiresExpressionIndentation: OuterScope +SeparateDefinitionBlocks: Leave +ShortNamespaceLines: 1 +SkipMacroDefinitionBody: false +SortIncludes: CaseSensitive +SortJavaStaticImport: Before +SortUsingDeclarations: LexicographicNumeric +SpaceAfterCStyleCast: false +SpaceAfterLogicalNot: false +SpaceAfterTemplateKeyword: true +SpaceAroundPointerQualifiers: Default +SpaceBeforeAssignmentOperators: true +SpaceBeforeCaseColon: false +SpaceBeforeCpp11BracedList: false +SpaceBeforeCtorInitializerColon: true +SpaceBeforeInheritanceColon: true +SpaceBeforeJsonColon: false +SpaceBeforeParens: ControlStatements +SpaceBeforeParensOptions: + AfterControlStatements: true + AfterForeachMacros: true + AfterFunctionDefinitionName: false + AfterFunctionDeclarationName: false + AfterIfMacros: true + AfterOverloadedOperator: false + AfterPlacementOperator: true + AfterRequiresInClause: false + AfterRequiresInExpression: false + BeforeNonEmptyParentheses: false +SpaceBeforeRangeBasedForLoopColon: true +SpaceBeforeSquareBrackets: false +SpaceInEmptyBlock: false +SpacesBeforeTrailingComments: 1 +SpacesInAngles: Never +SpacesInContainerLiterals: true +SpacesInLineCommentPrefix: + Minimum: 1 + Maximum: -1 +SpacesInParens: Never +SpacesInParensOptions: + InCStyleCasts: false + InConditionalStatements: false + InEmptyParentheses: false + Other: false +SpacesInSquareBrackets: false +Standard: Latest +StatementAttributeLikeMacros: + - Q_EMIT +StatementMacros: + - Q_UNUSED + - QT_REQUIRE_VERSION +TabWidth: 8 +UseTab: Never +VerilogBreakBetweenInstancePorts: true +WhitespaceSensitiveMacros: + - BOOST_PP_STRINGIZE + - CF_SWIFT_NAME + - NS_SWIFT_NAME + - PP_STRINGIZE + - STRINGIZE +... diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..a472f69 --- /dev/null +++ b/.gitignore @@ -0,0 +1,3 @@ +O.* +.*ignore +schemas/ diff --git a/.gitmodules b/.gitmodules new file mode 100644 index 0000000..a74445c --- /dev/null +++ b/.gitmodules @@ -0,0 +1,6 @@ +[submodule "dep/streaming-data-types"] + path = dep/streaming-data-types + url = git@gitea.psi.ch:lin-controls/streaming-data-types.git +[submodule "dep/flatbuffers"] + path = dep/flatbuffers + url = git@gitea.psi.ch:lin-controls/flatbuffers.git diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..400d0cb --- /dev/null +++ b/Makefile @@ -0,0 +1,30 @@ +# Include the external Makefile +include /ioc/tools/driver.makefile + +MODULE=StreamGenerator +BUILDCLASSES=Linux +EPICS_VERSIONS=7.0.7 +#ARCH_FILTER=RHEL% +ARCH_FILTER=linux-x86_64 + +# Additional module dependencies +REQUIRED+=asyn + +DBDS += src/asynStreamGeneratorDriver.dbd + +# DB files to include in the release +TEMPLATES += db/channels.db db/daq_common.db + +# HEADERS += src/asynStreamGeneratorDriver.h + +# Source files to build +SOURCES += src/asynStreamGeneratorDriver.cpp + +# I don't think specifying the optimisation level like this is correct... +# but I doesn't hurt :D +USR_CFLAGS += -O3 -Wall -Wextra -Wunused-result -Werror -fvisibility=hidden # -Wpedantic // Does not work because EPICS macros trigger warnings + +# Required to support EV42/44 +USR_CXXFLAGS += -O3 -I../dep/flatbuffers/include/ -I../schemas + +LIB_SYS_LIBS += rdkafka diff --git a/README.md b/README.md new file mode 100644 index 0000000..ae26b8b --- /dev/null +++ b/README.md @@ -0,0 +1,41 @@ +# StreamGenerator + +Clone the repository to a local directory via: + +``` +git clone --recurse-submodules -j8 git@gitea.psi.ch:lin-instrument-computers/StreamGenerator.git +``` + +## Dependencies + +Currently, this project requires a system install of librdkafka. On Redhat, +this means you should run: + +```bash +dnf install -y librdkafka-devel +``` + +Additionally, you must first build Google's *flatbuffers* and ESS's +**streaming-data-types** libraries, which are both included in this project as +submodules under the `dep` directory and which are both necessary to build this +project. + +First, you should enter the *flatbuffers* directory and run the following: + +```bash +cmake -G "Unix Makefiles" +make -j +``` + +After these steps, you will find the program `flatc` has been built and placed +in the directory. + +Next, you should return to the top of this project's directory tree, and create +the flatbuffers from ESS's schema files. This you can do as follows: + +```bash +./dep/flatbuffers/flatc -o schemas/ --cpp --gen-mutable --gen-name-strings --scoped-enums ./dep/streaming-data-types/schemas/* +``` + +This generates header files from each of ESS's schemas and places them in a +schemas directory. diff --git a/db/channels.db b/db/channels.db new file mode 100644 index 0000000..6e07ee6 --- /dev/null +++ b/db/channels.db @@ -0,0 +1,75 @@ +# EPICS Database for streamdevice specific to measurement channels +# +# Macros +# INSTR - Prefix +# NAME - the device name, e.g. EL737 +# PORT - StreamGenerator Port +# CHANNEL - the number associated with the measurment channel + +################################################################################ +# Status Variables + +# Trigger a change in status as clearing +record(bo, "$(INSTR)$(NAME):T$(CHANNEL)") +{ + field(DESC, "Trigger Clearing Status") + field(VAL, 1) + field(OUT, "$(INSTR)$(NAME):S$(CHANNEL) PP") +} + +# Trigger a change in status as value returned to 0 +record(seq, "$(INSTR)$(NAME):O$(CHANNEL)") +{ + field(DESC, "Trigger Returned to 0 Status") + field(LNK0, "$(INSTR)$(NAME):S$(CHANNEL) PP") + field(DO0, 0) + field(SELM, "Specified") + field(SELL, "$(INSTR)$(NAME):M$(CHANNEL).VAL") + field(SCAN, ".1 second") +} + +# Current Status of Channel, i.e. is it ready to count? +record(bi, "$(INSTR)$(NAME):S$(CHANNEL)") +{ + field(DESC, "Channel Status") + field(VAL, 0) + field(ZNAM, "OK") + field(ONAM, "CLEARING") + field(PINI, 1) +} + +################################################################################ +# Count Commands + +record(longout, "$(INSTR)$(NAME):C$(CHANNEL)") +{ + field(DESC, "Clear the current channel count") + field(DTYP, "asynInt32") + field(OUT, "@asyn($(PORT),0,$(TIMEOUT=1)) C_$(CHANNEL)") + field(FLNK, "$(INSTR)$(NAME):T$(CHANNEL)") +} + +################################################################################ +# Read all monitors values + +record(int64in, "$(INSTR)$(NAME):M$(CHANNEL)") +{ + field(DESC, "DAQ CH$(CHANNEL)") + field(EGU, "cts") + field(DTYP, "asynInt64") + field(INP, "@asyn($(PORT),0,$(TIMEOUT=1)) COUNTS$(CHANNEL)") + # This is probably too fast. We could trigger things the same as sinqDAQ to ensure the db is update in the same order + # field(SCAN, "I/O Intr") + field(PINI, "YES") +} + +record(ai, "$(INSTR)$(NAME):R$(CHANNEL)") +{ + field(DESC, "Rate of DAQ CH$(CHANNEL)") + field(EGU, "cts/sec") + field(DTYP, "asynInt32") + field(INP, "@asyn($(PORT),0,$(TIMEOUT=1)) RATE$(CHANNEL)") + field(SCAN, ".2 second") + # field(SCAN, "I/O Intr") + field(PINI, "YES") +} diff --git a/db/daq_common.db b/db/daq_common.db new file mode 100644 index 0000000..22ad9a1 --- /dev/null +++ b/db/daq_common.db @@ -0,0 +1,277 @@ +# EPICS Database for streamdevice specific to measurement channels +# +# Macros +# INSTR - Prefix +# NAME - the device name, e.g. EL737 +# PORT - StreamGenerator Port + +record(longout, "$(INSTR)$(NAME):FULL-RESET") +{ + field(DESC, "Reset the DAQ") + field(DTYP, "asynInt32") + field(OUT, "@asyn($(PORT),0,$(TIMEOUT=1)) RESET") +} + +################################################################################ +# Status Variables + +# We separate the RAW-STATUS and the STATUS PV so that the state can be updated +# in a sequence, that guarantees that we included the most recent time and +# counts before the status switches back to Idle. +# We do this via a sequenced update +# +# RAW-STATUS -> ELAPSED-SECONDS -> M* -> STATUS +record(mbbi, "$(INSTR)$(NAME):RAW-STATUS") +{ + field(DESC, "DAQ Status") + field(DTYP, "asynInt32") + field(INP, "@asyn($(PORT),0,$(TIMEOUT=1)) STATUS") + field(ZRVL, "0") + field(ZRST, "Idle") + field(ONVL, "1") + field(ONST, "Counting") + field(TWVL, "2") + field(TWST, "Low rate") + field(THVL, "3") + field(THST, "Paused") + # 4 should never happen, if it does it means the DAQ reports undocumented statusbits + field(FRVL, "4") + field(FRST, "INVALID") + # This is probably too fast. We could trigger things the same as sinqDAQ to ensure the db is update in the same order + #field(SCAN, "I/O Intr") + field(SCAN, ".2 second") + field(FLNK, "$(INSTR)$(NAME):READALL") + field(PINI, "YES") +} + +record(fanout, "$(INSTR)$(NAME):READALL") +{ + field(SELM, "All") + field(LNK0, "$(INSTR)$(NAME):ELAPSED-TIME PP") + field(LNK1, "$(INSTR)$(NAME):M0") + field(LNK2, "$(INSTR)$(NAME):M1") + field(LNK3, "$(INSTR)$(NAME):M2") + field(LNK4, "$(INSTR)$(NAME):M3") + field(LNK5, "$(INSTR)$(NAME):M4") + # Doesn't seemt o be a problem to have more in here :D + # field(LNK6, "$(INSTR)$(NAME):M5") + # field(LNK7, "$(INSTR)$(NAME):M6") + field(FLNK, "$(INSTR)$(NAME):STATUS") +} + +record(mbbi, "$(INSTR)$(NAME):STATUS") +{ + field(INP, "$(INSTR)$(NAME):RAW-STATUS NPP") + field(DESC, "DAQ Status") + field(ZRVL, "0") + field(ZRST, "Idle") + field(ONVL, "1") + field(ONST, "Counting") + field(TWVL, "2") + field(TWST, "Low rate") + field(THVL, "3") + field(THST, "Paused") + # 4 should never happen, if it does it means the DAQ reports undocumented statusbits + field(FRVL, "4") + field(FRST, "INVALID") + field(PINI, "YES") +} + +record(longin, "$(INSTR)$(NAME):CHANNELS") +{ + field(DESC, "Total Supported Channels") + field(VAL, $(CHANNELS)) + field(DISP, 1) +} + +# Trigger a change in status as clearing +record(bo, "$(INSTR)$(NAME):ETT") +{ + field(DESC, "Trigger Clearing Status") + field(VAL, 1) + field(OUT, "$(INSTR)$(NAME):ETS PP") +} + +# Trigger a change in status as value returned to 0 +record(seq, "$(INSTR)$(NAME):ETO") +{ + field(DESC, "Trigger Returned to 0 Status") + field(LNK0, "$(INSTR)$(NAME):ETS PP") + field(DO0, 0) + field(SELM, "Specified") + field(SELL, "$(INSTR)$(NAME):ELAPSED-TIME.VAL") + field(SCAN, ".1 second") +} + +# Current Status of Channel, i.e. is it ready to count? +record(bi, "$(INSTR)$(NAME):ETS") +{ + field(DESC, "Channel Status") + field(VAL, 0) + field(ZNAM, "OK") + field(ONAM, "CLEARING") + field(PINI, 1) +} + +################################################################################ +# Count Commands + +record(ao,"$(INSTR)$(NAME):PRESET-COUNT") +{ + field(DESC, "Count until preset reached") + field(DTYP, "asynInt32") + field(OUT, "@asyn($(PORT),0,$(TIMEOUT=1)) P_CNT") + field(VAL, 0) + field(PREC, 2) +} + +record(ao,"$(INSTR)$(NAME):PRESET-TIME") +{ + field(DESC, "Count for specified time") + field(EGU, "seconds") + field(DTYP, "asynInt32") + field(OUT, "@asyn($(PORT),0,$(TIMEOUT=1)) P_TIME") + field(VAL, 0) + field(PREC, 2) +} + +# record(bo,"$(INSTR)$(NAME):PAUSE") +# { +# field(DESC, "Pause the current count") +# field(DTYP, "stream") +# field(OUT, "@... pauseCount($(INSTR)$(NAME):) $(PORT)") +# field(VAL, "0") +# field(FLNK, "$(INSTR)$(NAME):RAW-STATUS") +# } +# +# record(bo,"$(INSTR)$(NAME):CONTINUE") +# { +# field(DESC, "Continue with a count that was paused") +# field(DTYP, "stream") +# field(OUT, "@... continueCount($(INSTR)$(NAME):) $(PORT)") +# field(VAL, "0") +# field(FLNK, "$(INSTR)$(NAME):RAW-STATUS") +# } + +record(longout, "$(INSTR)$(NAME):STOP") +{ + field(DESC, "Stop the current counting operation") + field(DTYP, "asynInt32") + field(OUT, "@asyn($(PORT),0,$(TIMEOUT=1)) STOP") +} + +record(longout, "$(INSTR)$(NAME):MONITOR-CHANNEL") +{ + field(DESC, "PRESET-COUNT Monitors this channel") + field(DTYP, "asynInt32") + field(OUT, "@asyn($(PORT),0,$(TIMEOUT=1)) MONITOR") + field(DRVL, "0") # Smallest Monitor Channel + field(DRVH, "$(CHANNELS)") # Largest Monitor Channel +} + +record(longin, "$(INSTR)$(NAME):MONITOR-CHANNEL_RBV") +{ + field(DESC, "PRESET-COUNT Monitors this channel") + field(DTYP, "asynInt32") + field(INP, "@asyn($(PORT),0,$(TIMEOUT=1)) MONITOR") + field(SCAN, "I/O Intr") + field(PINI, "YES") +} + +record(ao,"$(INSTR)$(NAME):THRESHOLD") +{ + field(DESC, "Minimum rate for counting to proceed") + field(DTYP, "asynInt32") + field(OUT, "@asyn($(PORT),0,$(TIMEOUT=1)) THRESH") + field(VAL, "1") # Default Rate + field(DRVL, "1") # Minimum Rate + field(DRVH, "100000") # Maximum Rate +} + +record(ai,"$(INSTR)$(NAME):THRESHOLD_RBV") +{ + field(DESC, "Minimum rate for counting to proceed") + field(EGU, "cts/sec") + field(DTYP, "asynInt32") + field(INP, "@asyn($(PORT),0,$(TIMEOUT=1)) THRESH") + field(SCAN, "I/O Intr") + field(PINI, "YES") +} + +record(longout,"$(INSTR)$(NAME):THRESHOLD-MONITOR") +{ + field(DESC, "Channel monitored for minimum rate") + field(DTYP, "asynInt32") + field(OUT, "@asyn($(PORT),0,$(TIMEOUT=1)) THRESH_CH") + field(VAL, "1") # Monitor + field(DRVL, "0") # Smallest Threshold Channel (0 is off) + field(DRVH, "$(CHANNELS)") # Largest Threshold Channel +} + +record(longin,"$(INSTR)$(NAME):THRESHOLD-MONITOR_RBV") +{ + field(DESC, "Channel monitored for minimum rate") + field(EGU, "CH") + field(DTYP, "asynInt32") + field(INP, "@asyn($(PORT),0,$(TIMEOUT=1)) THRESH_CH") + field(SCAN, "I/O Intr") + field(PINI, "YES") +} + +record(longout, "$(INSTR)$(NAME):CT") +{ + field(DESC, "Clear the timer") + field(DTYP, "asynInt32") + field(OUT, "@asyn($(PORT),0,$(TIMEOUT=1)) C_TIME") + field(FLNK, "$(INSTR)$(NAME):ETT") +} + +################################################################################ +# Read all monitors values + +record(ai, "$(INSTR)$(NAME):ELAPSED-TIME") +{ + field(DESC, "DAQ Measured Time") + field(EGU, "sec") + field(DTYP, "asynFloat64") + field(INP, "@asyn($(PORT),0,$(TIMEOUT=1)) TIME") + # field(SCAN, "I/O Intr") + field(PINI, "YES") + # field(FLNK, "$(INSTR)$(NAME):ETO") +} + +################################################################################ +# Stream Generator Status PVs + +record(longin,"$(INSTR)$(NAME):UDP_DROPPED") +{ + field(DESC, "UDP Packets Missed") + field(EGU, "Events") + field(DTYP, "asynInt32") + field(INP, "@asyn($(PORT),0,$(TIMEOUT=1)) DROP") + # field(SCAN, "I/O Intr") + field(SCAN, "1 second") + field(PINI, "YES") +} + +record(longin,"$(INSTR)$(NAME):UDP_WATERMARK") +{ + field(DESC, "UDP Queue Usage") + field(EGU, "%") + field(DTYP, "asynInt32") + field(INP, "@asyn($(PORT),0,$(TIMEOUT=1)) UDP") + # field(SCAN, "I/O Intr") + field(SCAN, "1 second") + field(PINI, "YES") +} + +record(longin,"$(INSTR)$(NAME):SORTED_WATERMARK") +{ + field(DESC, "Partial Sort Queue Usage") + field(EGU, "%") + field(DTYP, "asynInt32") + field(INP, "@asyn($(PORT),0,$(TIMEOUT=1)) SORT") + # field(SCAN, "I/O Intr") + field(SCAN, "1 second") + field(PINI, "YES") +} diff --git a/dep/flatbuffers b/dep/flatbuffers new file mode 160000 index 0000000..1872409 --- /dev/null +++ b/dep/flatbuffers @@ -0,0 +1 @@ +Subproject commit 187240970746d00bbd26b0f5873ed54d2477f9f3 diff --git a/dep/streaming-data-types b/dep/streaming-data-types new file mode 160000 index 0000000..3b1830f --- /dev/null +++ b/dep/streaming-data-types @@ -0,0 +1 @@ +Subproject commit 3b1830faf268bda2175618162cb6a6ce25b0aa23 diff --git a/requirements.txt b/requirements.txt deleted file mode 100644 index 9dfa0e4..0000000 --- a/requirements.txt +++ /dev/null @@ -1,4 +0,0 @@ -confluent-kafka==2.12.1 -ess-streaming-data-types==0.27.0 -flatbuffers==25.9.23 -numpy==1.26.3 diff --git a/scripts/ioc.sh b/scripts/ioc.sh new file mode 100755 index 0000000..7528295 --- /dev/null +++ b/scripts/ioc.sh @@ -0,0 +1,9 @@ +#!/bin/bash + +export EPICS_HOST_ARCH=linux-x86_64 +export EPICS_BASE=/usr/local/epics/base-7.0.7 + +PARENT_PATH="$( cd "$(dirname "${BASH_SOURCE[0]}")" ; pwd -P )" + +# /usr/local/bin/procServ -o -L - -f -i ^D^C 20001 "${PARENT_PATH}/st.cmd" -d +${PARENT_PATH}/st.cmd diff --git a/scripts/st.cmd b/scripts/st.cmd new file mode 100755 index 0000000..06a696e --- /dev/null +++ b/scripts/st.cmd @@ -0,0 +1,38 @@ +#!/usr/local/bin/iocsh +#-d + +on error break + +require StreamGenerator, test + +epicsEnvSet("INSTR", "SQ:TEST:") +epicsEnvSet("NAME", "SG") + +# Local UDP Generator Test Config +# drvAsynIPPortConfigure("ASYN_IP_PORT", "127.0.0.1:9071:54321 UDP", 0, 0, 1) + +# Correlation Unit Config +drvAsynIPPortConfigure("ASYN_IP_PORT", "172.28.69.20:54321:54321 UDP", 0, 0, 1) + +# With a udpQueue and sortQueue size of 10'000 packets, we can hold in memory +# 10'000 * 243 = 2.43e6 events + +# Kafka Broker and Topic Configuration +# asynStreamGenerator("ASYN_SG", "ASYN_IP_PORT", 4, 10000, "linkafka01:9092", "NEWEFU_TEST", "NEWEFU_TEST2", 10000, 20480) +# asynStreamGenerator("ASYN_SG", "ASYN_IP_PORT", 4, 10000, "ess01:9092", "NEWEFU_TEST", "NEWEFU_TEST2", 10000, 20480) + +# Don't send any kafka messages +asynStreamGenerator("ASYN_SG", "ASYN_IP_PORT", 4, 10000, "", "", "", 0, 0) + +dbLoadRecords("$(StreamGenerator_DB)daq_common.db", "INSTR=$(INSTR), NAME=$(NAME), PORT=ASYN_SG, CHANNELS=5") + +# Detector Count Channel +dbLoadRecords("$(StreamGenerator_DB)channels.db", "INSTR=$(INSTR), NAME=$(NAME), PORT=ASYN_SG, CHANNEL=0") + +# Monitor Channels +dbLoadRecords("$(StreamGenerator_DB)channels.db", "INSTR=$(INSTR), NAME=$(NAME), PORT=ASYN_SG, CHANNEL=1") +dbLoadRecords("$(StreamGenerator_DB)channels.db", "INSTR=$(INSTR), NAME=$(NAME), PORT=ASYN_SG, CHANNEL=2") +dbLoadRecords("$(StreamGenerator_DB)channels.db", "INSTR=$(INSTR), NAME=$(NAME), PORT=ASYN_SG, CHANNEL=3") +dbLoadRecords("$(StreamGenerator_DB)channels.db", "INSTR=$(INSTR), NAME=$(NAME), PORT=ASYN_SG, CHANNEL=4") + +iocInit() diff --git a/scripts/udp_gen.py b/scripts/udp_gen.py new file mode 100644 index 0000000..3f01887 --- /dev/null +++ b/scripts/udp_gen.py @@ -0,0 +1,118 @@ +import socket +import time +import random + +sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + +header = [ + 0, 0, # buffer length in 16bit words (1, 0) == 1, (0, 1) == 256 + 0, 0x80, # buffer type (probably should be 0) + 21, 0, # header length + 0, 0, # buffer number + 0, 0, # run id + 0x3, # status + 0, # id of sending module + 0, 0, # timestamp low + 0, 0, # timestamp mid + 0, 0, # timestamp high +] + [0, 0] * 12 # parameters + +data = [ + 0, + 0, + 0, + 0, + 0, + 0 +] + +start_time = time.time_ns() // 100 + +buffer_ids = { + i: (0, 0) for i in range(10) +} + +while True: + + # update timestamp + base_timestamp = time.time_ns() // 100 - start_time + t_low = base_timestamp & 0xffff + t_mid = (base_timestamp >> 16) & 0xffff + t_high = (base_timestamp >> 32) & 0xffff + header[12] = t_low & 0xff + header[13] = t_low >> 8 + header[14] = t_mid & 0xff + header[15] = t_mid >> 8 + header[16] = t_high & 0xff + header[17] = t_high >> 8 + + num_events = random.randint(0, 243) + # num_events = 243 + # num_events = 1 + + # update buffer length + buffer_length = 21 + num_events * 3 + header[0] = buffer_length & 0xff + header[1] = (buffer_length >> 8) & 0xff + + # I believe, that in our case we never mix monitor and detector events as + # the monitors should have id 0 and the detector events 1-9 so I have + # excluded that posibility here. That would, however, if true mean we could + # reduce also the number of checks on the parsing side of things... + + is_monitor = random.randint(0, 9) + # is_monitor = 4 + + header[11] = 0 if is_monitor > 3 else random.randint(1,9) + + # update buffer number (each mcpdid has its own buffer number count) + header[6], header[7] = buffer_ids[header[11]] + header[6] = (header[6] + 1) % (0xff + 1) + header[7] = (header[7] + (header[6] == 0)) % (0xff + 1) + buffer_ids[header[11]] = header[6], header[7] + + tosend = list(header) + + if is_monitor > 3: + + for i in range(num_events): + d = list(data) + + monitor = random.randint(0,3) + # monitor = 0 + + d[5] = (1 << 7) | monitor + + # update trigger timestamp + event_timestamp = (time.time_ns() // 100) - base_timestamp + d[0] = event_timestamp & 0xff + d[1] = (event_timestamp >> 8) & 0xff + d[2] = (event_timestamp >> 16) & 0x07 + + tosend += d + + else: + + for i in range(num_events): + d = list(data) + + amplitude = random.randint(0, 255) + x_pos = random.randint(0, 1023) + y_pos = random.randint(0, 1023) + event_timestamp = (time.time_ns() // 100) - base_timestamp + + d[5] = (0 << 7) | (amplitude >> 1) + d[4] = ((amplitude & 0x01) << 7) | (y_pos >> 3) + d[3] = ((y_pos << 5) & 0xE0) | (x_pos >> 5) + d[2] = ((x_pos << 3) & 0xF8) + + d[0] = event_timestamp & 0xff + d[1] = (event_timestamp >> 8) & 0xff + d[2] |= (event_timestamp >> 16) & 0x07 + + tosend += d + + sock.sendto(bytes(tosend), ('127.0.0.1', 54321)) + mv = memoryview(bytes(header)).cast('H') + print(f'Sent packet {mv[3]} with {num_events} events {base_timestamp}') + # time.sleep(.01) diff --git a/src/asynStreamGeneratorDriver.cpp b/src/asynStreamGeneratorDriver.cpp new file mode 100644 index 0000000..fbbf86c --- /dev/null +++ b/src/asynStreamGeneratorDriver.cpp @@ -0,0 +1,910 @@ +#include "asynOctetSyncIO.h" +#include "ev42_events_generated.h" +#include +#include +#include +#include +#include + +// Just for printing +#define __STDC_FORMAT_MACROS +#include + +#include "asynStreamGeneratorDriver.h" +#include + +/******************************************************************************* + * Kafka Methods + */ + +static void set_kafka_config_key(rd_kafka_conf_t *conf, char *key, + char *value) { + char errstr[512]; + rd_kafka_conf_res_t res; + + res = rd_kafka_conf_set(conf, key, value, errstr, sizeof(errstr)); + if (res != RD_KAFKA_CONF_OK) { + epicsStdoutPrintf("Failed to set config value %s : %s\n", key, value); + exit(1); + } +} + +static rd_kafka_t *create_kafka_producer(const char *kafkaBroker) { + + char errstr[512]; + rd_kafka_t *producer; + + // Prepare configuration object + rd_kafka_conf_t *conf = rd_kafka_conf_new(); + // TODO feel not great about this + set_kafka_config_key(conf, "bootstrap.servers", + const_cast(kafkaBroker)); + set_kafka_config_key(conf, "queue.buffering.max.messages", "10000000"); + // With 2e6 counts / s + // and a packet size of 20480 events (163920 bytes) + // this implies we need to send around 100 messages a second + // and need about .2 gigabit upload + // set_kafka_config_key(conf, "queue.buffering.max.kbytes", "10000000"); + + // Create the Producer + producer = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, sizeof(errstr)); + + if (!producer) { + epicsStdoutPrintf("Failed to create Kafka Producer: %s\n", errstr); + exit(1); + } + + return producer; +} + +/******************************************************************************* + * Static Methods Passed to Epics Threads that should run in the background + */ +static void udpPollerTask(void *drvPvt) { + asynStreamGeneratorDriver *pSGD = (asynStreamGeneratorDriver *)drvPvt; + pSGD->receiveUDP(); +} + +static void udpNormaliserTask(void *drvPvt) { + asynStreamGeneratorDriver *pSGD = (asynStreamGeneratorDriver *)drvPvt; + pSGD->normaliseUDP(); +} + +static void sortTask(void *drvPvt) { + asynStreamGeneratorDriver *pSGD = (asynStreamGeneratorDriver *)drvPvt; + pSGD->partialSortEvents(); +} + +static void daqTask(void *drvPvt) { + asynStreamGeneratorDriver *pSGD = (asynStreamGeneratorDriver *)drvPvt; + pSGD->processEvents(); +} + +static void monitorProducerTask(void *drvPvt) { + asynStreamGeneratorDriver *pSGD = (asynStreamGeneratorDriver *)drvPvt; + pSGD->produceMonitor(); +} + +static void detectorProducerTask(void *drvPvt) { + asynStreamGeneratorDriver *pSGD = (asynStreamGeneratorDriver *)drvPvt; + pSGD->produceDetector(); +} + +/******************************************************************************* + * Stream Generator Helper Methods + */ + +asynStatus asynStreamGeneratorDriver::createInt32Param( + asynStatus status, char *name, int *variable, epicsInt32 initialValue) { + // TODO should show error if there is one + return (asynStatus)(status | createParam(name, asynParamInt32, variable) | + setIntegerParam(*variable, initialValue)); +} + +asynStatus asynStreamGeneratorDriver::createInt64Param( + asynStatus status, char *name, int *variable, epicsInt64 initialValue) { + // TODO should show error if there is one + return (asynStatus)(status | createParam(name, asynParamInt64, variable) | + setInteger64Param(*variable, initialValue)); +} + +asynStatus asynStreamGeneratorDriver::createFloat64Param(asynStatus status, + char *name, + int *variable, + double initialValue) { + // TODO should show error if there is one + return (asynStatus)(status | createParam(name, asynParamFloat64, variable) | + setDoubleParam(*variable, initialValue)); +} + +/******************************************************************************* + * Stream Generator Methods + */ +asynStreamGeneratorDriver::asynStreamGeneratorDriver( + const char *portName, const char *ipPortName, const int numChannels, + const int udpQueueSize, const bool enableKafkaStream, + const char *kafkaBroker, const char *monitorTopic, + const char *detectorTopic, const int kafkaQueueSize, + const int kafkaMaxPacketSize) + : asynPortDriver(portName, 1, /* maxAddr */ + asynInt32Mask | asynInt64Mask | asynFloat64Mask | + asynDrvUserMask, /* Interface mask */ + asynInt32Mask, // | asynFloat64Mask, /* Interrupt mask */ + 0, /* asynFlags. This driver does not block and it is + not multi-device, but has a + destructor ASYN_DESTRUCTIBLE our version of the Asyn + is too old to support this flag */ + 1, /* Autoconnect */ + 0, /* Default priority */ + 0), /* Default stack size*/ + num_channels(numChannels + 1), kafkaEnabled(enableKafkaStream), + monitorTopic(monitorTopic), detectorTopic(detectorTopic), + udpQueueSize(udpQueueSize), kafkaQueueSize(kafkaQueueSize), + // measured in max packet sizes + udpQueue( + epicsRingBytesCreate(243 * udpQueueSize * sizeof(NormalisedEvent))), + normalisedQueue( + epicsRingBytesCreate(243 * udpQueueSize * sizeof(NormalisedEvent))), + // TODO configurable sizes + sortedQueue( + epicsRingBytesCreate(243 * udpQueueSize * sizeof(NormalisedEvent))), + monitorQueue( + epicsRingBytesCreate(243 * kafkaQueueSize * sizeof(NormalisedEvent))), + detectorQueue( + epicsRingBytesCreate(243 * kafkaQueueSize * sizeof(NormalisedEvent))), + kafkaMaxPacketSize(kafkaMaxPacketSize) { + const char *functionName = "asynStreamGeneratorDriver"; + + // Parameter Setup + asynStatus status = asynSuccess; + + status = createInt32Param(status, P_StatusString, &P_Status, STATUS_IDLE); + status = createInt32Param(status, P_ResetString, &P_Reset); + status = createInt32Param(status, P_StopString, &P_Stop); + status = createInt32Param(status, P_CountPresetString, &P_CountPreset); + status = createInt32Param(status, P_TimePresetString, &P_TimePreset); + status = createFloat64Param(status, P_ElapsedTimeString, &P_ElapsedTime); + status = + createInt32Param(status, P_ClearElapsedTimeString, &P_ClearElapsedTime); + status = + createInt32Param(status, P_MonitorChannelString, &P_MonitorChannel); + status = createInt32Param(status, P_ThresholdString, &P_Threshold, 1); + status = createInt32Param(status, P_ThresholdChannelString, + &P_ThresholdChannel, 1); + + // Create Parameters templated on Channel Number + char pv_name_buffer[100]; + P_Counts = new int[this->num_channels]; + P_Rates = new int[this->num_channels]; + P_ClearCounts = new int[this->num_channels]; + for (std::size_t i = 0; i < this->num_channels; ++i) { + memset(pv_name_buffer, 0, 100); + epicsSnprintf(pv_name_buffer, 100, P_CountsString, i); + status = createInt64Param(status, pv_name_buffer, P_Counts + i); + + memset(pv_name_buffer, 0, 100); + epicsSnprintf(pv_name_buffer, 100, P_RateString, i); + status = createInt32Param(status, pv_name_buffer, P_Rates + i); + + memset(pv_name_buffer, 0, 100); + epicsSnprintf(pv_name_buffer, 100, P_ClearCountsString, i); + status = createInt32Param(status, pv_name_buffer, P_ClearCounts + i); + } + + status = createInt32Param(status, P_UdpDroppedString, &P_UdpDropped); + status = createInt32Param(status, P_UdpQueueHighWaterMarkString, + &P_UdpQueueHighWaterMark); + status = createInt32Param(status, P_SortedQueueHighWaterMarkString, + &P_SortedQueueHighWaterMark); + + if (status) { + epicsStdoutPrintf( + "%s:%s: failed to create or setup parameters, status=%d\n", + driverName, functionName, status); + exit(1); + } + + // Create Events + // this->pausedEventId = epicsEventCreate(epicsEventEmpty); + + if (enableKafkaStream) { + + epicsStdoutPrintf( + "Detector Kafka Config: broker=%s, topic=%s\n " + " queue size:%d, max events per packet: %d\n", + kafkaBroker, this->detectorTopic, kafkaQueueSize, + this->kafkaMaxPacketSize); + + epicsStdoutPrintf( + "Monitors Kafka Config: broker=%s, topic=%s\n " + " queue size:%d, max events per packet: %d\n", + kafkaBroker, this->monitorTopic, kafkaQueueSize, + this->kafkaMaxPacketSize); + + this->monitorProducer = create_kafka_producer(kafkaBroker); + this->detectorProducer = create_kafka_producer(kafkaBroker); + + // Setup for Thread Producing Monitor Kafka Events + status = + (asynStatus)(epicsThreadCreate( + "monitor_produce", epicsThreadPriorityMedium, + epicsThreadGetStackSize(epicsThreadStackMedium), + (EPICSTHREADFUNC)::monitorProducerTask, + this) == NULL); + if (status) { + epicsStdoutPrintf("%s:%s: epicsThreadCreate failure, status=%d\n", + driverName, functionName, status); + exit(1); + } + + // Setup for Thread Producing Detector Kafka Events + status = + (asynStatus)(epicsThreadCreate( + "monitor_produce", epicsThreadPriorityMedium, + epicsThreadGetStackSize(epicsThreadStackMedium), + (EPICSTHREADFUNC)::detectorProducerTask, + this) == NULL); + if (status) { + epicsStdoutPrintf("%s:%s: epicsThreadCreate failure, status=%d\n", + driverName, functionName, status); + exit(1); + } + } else { + + epicsStdoutPrintf("Kafka Stream Disabled\n"); + } + + /* Create the thread that orders the events and acts as our sinqDaq stand-in + */ + status = + (asynStatus)(epicsThreadCreate( + "sinqDAQ", + epicsThreadPriorityMedium, // epicsThreadPriorityMax, + epicsThreadGetStackSize(epicsThreadStackMedium), + (EPICSTHREADFUNC)::daqTask, this) == NULL); + if (status) { + epicsStdoutPrintf("%s:%s: epicsThreadCreate failure, status=%d\n", + driverName, functionName, status); + exit(1); + } + + /* Create the thread that orders packets of in preparation for our sinqDAQ + * stand-in + */ + status = (asynStatus)(epicsThreadCreate( + "partialSort", epicsThreadPriorityMedium, + epicsThreadGetStackSize(epicsThreadStackMedium), + (EPICSTHREADFUNC)::sortTask, this) == NULL); + if (status) { + epicsStdoutPrintf("%s:%s: epicsThreadCreate failure, status=%d\n", + driverName, functionName, status); + exit(1); + } + + /* Create the thread normalises the events + */ + status = + (asynStatus)(epicsThreadCreate( + "eventNormaliser", + epicsThreadPriorityMedium, // epicsThreadPriorityMax, + epicsThreadGetStackSize(epicsThreadStackMedium), + (EPICSTHREADFUNC)::udpNormaliserTask, this) == NULL); + if (status) { + epicsStdoutPrintf("%s:%s: epicsThreadCreate failure, status=%d\n", + driverName, functionName, status); + exit(1); + } + + // UDP Receive Setup + status = pasynOctetSyncIO->connect(ipPortName, 0, &pasynUDPUser, NULL); + + if (status) { + epicsStdoutPrintf("%s:%s: Couldn't open connection %s, status=%d\n", + driverName, functionName, ipPortName, status); + exit(1); + } + + /* Create the thread that receives UDP traffic in the background */ + status = (asynStatus)(epicsThreadCreate( + "udp_receive", epicsThreadPriorityMax, + epicsThreadGetStackSize(epicsThreadStackMedium), + (EPICSTHREADFUNC)::udpPollerTask, this) == NULL); + if (status) { + epicsStdoutPrintf("%s:%s: epicsThreadCreate failure, status=%d\n", + driverName, functionName, status); + exit(1); + } +} + +asynStreamGeneratorDriver::~asynStreamGeneratorDriver() { + // should make sure queues are empty and freed + // and that the kafka producers are flushed and freed + delete[] P_Counts; + delete[] P_Rates; + + // TODO add exit should perhaps ensure the queue is flushed + // rd_kafka_poll(producer, 0); + // epicsStdoutPrintf("Kafka Queue Size %d\n", rd_kafka_outq_len(producer)); + // rd_kafka_flush(producer, 10 * 1000); + // epicsStdoutPrintf("Kafka Queue Size %d\n", rd_kafka_outq_len(producer)); +} + +asynStatus asynStreamGeneratorDriver::readInt32(asynUser *pasynUser, + epicsInt32 *value) { + + int function = pasynUser->reason; + asynStatus status = asynSuccess; + const char *paramName; + const char *functionName = "readInt32"; + getParamName(function, ¶mName); + + if (function == P_UdpQueueHighWaterMark) { + const double toPercent = 100. / (243. * udpQueueSize); + *value = (epicsInt32)(epicsRingBytesHighWaterMark(this->udpQueue) / + sizeof(NormalisedEvent) * toPercent); + // Aparently resetting the watermark causes problems... + // at least concurrently :D + // epicsRingBytesResetHighWaterMark(this->udpQueue); + return asynSuccess; + } else if (function == P_SortedQueueHighWaterMark) { + const double toPercent = 100. / (243. * udpQueueSize); + *value = (epicsInt32)(epicsRingBytesHighWaterMark(this->sortedQueue) / + sizeof(NormalisedEvent) * toPercent); + // epicsRingBytesResetHighWaterMark(this->sortedQueue); + return asynSuccess; + } + + return asynPortDriver::readInt32(pasynUser, value); +} + +asynStatus asynStreamGeneratorDriver::writeInt32(asynUser *pasynUser, + epicsInt32 value) { + int function = pasynUser->reason; + asynStatus status = asynSuccess; + const char *paramName; + const char *functionName = "writeInt32"; + getParamName(function, ¶mName); + + // TODO should maybe lock mutex for this + epicsInt32 currentStatus; + status = getIntegerParam(this->P_Status, ¤tStatus); + + if (status) { + epicsSnprintf(pasynUser->errorMessage, pasynUser->errorMessageSize, + "%s:%s: status=%d, function=%d, name=%s, value=%d", + driverName, functionName, status, function, paramName, + value); + return status; + } + + // TODO clean up + bool isClearCount = false; + size_t channelToClear; + for (size_t i = 0; i < this->num_channels; ++i) { + isClearCount |= function == P_ClearCounts[i]; + if (isClearCount) { + channelToClear = i; + break; + } + } + + // TODO should check everything... + if (function == P_CountPreset) { + if (!currentStatus) { + setIntegerParam(function, value); + setIntegerParam(P_Status, STATUS_COUNTING); + status = (asynStatus)callParamCallbacks(); + } else { + return asynError; + } + } else if (function == P_TimePreset) { + if (!currentStatus) { + setIntegerParam(function, value); + setIntegerParam(P_Status, STATUS_COUNTING); + status = (asynStatus)callParamCallbacks(); + } else { + return asynError; + } + } else if (function == P_ClearElapsedTime) { + if (!currentStatus) { + setIntegerParam(P_ElapsedTime, 0); + status = (asynStatus)callParamCallbacks(); + } else { + return asynError; + } + } else if (isClearCount) { + if (!currentStatus) { + setInteger64Param(P_Counts[channelToClear], 0); + status = (asynStatus)callParamCallbacks(); + } else { + return asynError; + } + } else if (function == P_Reset) { + lock(); + // TODO should probably set back everything to defaults + setIntegerParam(P_Status, STATUS_IDLE); + status = (asynStatus)callParamCallbacks(); + unlock(); + } else if (function == P_Stop) { + lock(); + setIntegerParam(P_Status, STATUS_IDLE); + status = (asynStatus)callParamCallbacks(); + unlock(); + } else if (function == P_MonitorChannel) { + if (!currentStatus) { + setIntegerParam(function, value); + status = (asynStatus)callParamCallbacks(); + } else { + return asynError; + } + } else { + setIntegerParam(function, value); + status = (asynStatus)callParamCallbacks(); + } + + if (status) + epicsSnprintf(pasynUser->errorMessage, pasynUser->errorMessageSize, + "%s:%s: status=%d, function=%d, name=%s, value=%d", + driverName, functionName, status, function, paramName, + value); + return status; +} + +void asynStreamGeneratorDriver::receiveUDP() { + + const char *functionName = "receiveUDP"; + asynStatus status = asynSuccess; + // int isConnected = 1; + std::size_t received; + int eomReason; + + const std::size_t bufferSize = 1500; + char buffer[bufferSize]; + + while (true) { + + // status = pasynManager->isConnected(pasynUDPUser, &isConnected); + + // if (!isConnected) + // asynPrint(pasynUserSelf, ASYN_TRACE_ERROR, + // "%s:%s: isConnected = %d\n", driverName, functionName, + // isConnected); + + status = pasynOctetSyncIO->read(pasynUDPUser, buffer, bufferSize, + 0, // timeout + &received, &eomReason); + + if (received) { + const uint16_t bufferLength = ((uint16_t *)buffer)[0]; + const std::size_t headerLength = 42; + + if (received >= headerLength && received == bufferLength * 2) { + + epicsRingBytesPut(this->udpQueue, (char *)buffer, bufferSize); + + } else { + asynPrint(pasynUserSelf, ASYN_TRACE_ERROR, + "%s:%s: invalid UDP packet\n", driverName, + functionName); + } + } + } +} + +void asynStreamGeneratorDriver::normaliseUDP() { + + // TODO fix time overflows + // Regarding time overflow. + // * the header time stamp is 3 words, i.e. 48 bits. + // * it has a resolution of 100ns + // * so we can cover a maximum of (2^(3*16) - 1) * 1e-7 = 28147497 seconds + // * or about 325 days + // * so maybe this isn't necessary to solve, as long as we restart the + // electronics at least once a year... + + const char *functionName = "normaliseUDP"; + asynStatus status = asynSuccess; + int isConnected = 1; + std::size_t received; + int eomReason; + + // The correlation unit sends messages with a maximum size of 1500 bytes. + // These messages don't have any obious start or end to synchronise + // against... + const std::size_t bufferSize = 1500; + char buffer[bufferSize]; + + const std::size_t resultBufferSize = 243; + NormalisedEvent resultBuffer[resultBufferSize]; + + // We have 10 mcpdids + uint64_t lastBufferNumber[10]; + for (size_t i = 0; i < 10; ++i) { + lastBufferNumber[i] = 0; + } + + epicsInt32 droppedMessages = 0; + + const UDPHeader *header; + const DetectorEvent *d_event; + const MonitorEvent *m_event; + NormalisedEvent ne; + + while (true) { + + if (epicsRingBytesUsedBytes(this->udpQueue) > 1500) { + + epicsRingBytesGet(this->udpQueue, (char *)buffer, bufferSize); + + header = (UDPHeader *)buffer; + const std::size_t total_events = (header->BufferLength - 21) / 3; + + if (header->BufferNumber - lastBufferNumber[header->McpdID] > 1 && + lastBufferNumber[header->McpdID] != + std::numeric_limits< + decltype(header->BufferNumber)>::max()) { + asynPrint(pasynUserSelf, ASYN_TRACE_ERROR, + "%s:%s: missed packet on id: %d. Received: %" PRIu64 + ", last: %" PRIu64 "\n", + driverName, functionName, header->McpdID, + header->BufferNumber, + lastBufferNumber[header->McpdID]); + setIntegerParam(P_UdpDropped, ++droppedMessages); + } + + lastBufferNumber[header->McpdID] = header->BufferNumber; + + for (std::size_t i = 0; i < total_events; ++i) { + char *event = (buffer + 21 * 2 + i * 6); + const bool isMonitorEvent = event[5] & 0x80; + + if (isMonitorEvent) { + m_event = (MonitorEvent *)event; + ne.timestamp = + header->nanosecs() + (uint64_t)m_event->nanosecs(); + ne.source = 0; + ne.pixelId = m_event->DataID; + + } else { + d_event = (DetectorEvent *)event; + ne.timestamp = + header->nanosecs() + (uint64_t)d_event->nanosecs(); + ne.source = header->McpdID; + ne.pixelId = d_event->pixelId(header->McpdID); + } + + resultBuffer[i] = ne; + } + + epicsRingBytesPut(this->normalisedQueue, (char *)resultBuffer, + total_events * sizeof(NormalisedEvent)); + + } else { + epicsThreadSleep(0.0001); // seconds + } + } +} + +struct { + bool operator()(const NormalisedEvent l, const NormalisedEvent r) const { + return l.timestamp < r.timestamp; + } +} oldestEventsFirst; + +inline int eventsInQueue(epicsRingBytesId id) { + return epicsRingBytesUsedBytes(id) / sizeof(NormalisedEvent); +} + +void asynStreamGeneratorDriver::partialSortEvents() { + + const char *functionName = "partialSortEvents"; + + // x * number of ids * max events in packet + int bufferedEvents = 5 * 10 * 243; + NormalisedEvent events[bufferedEvents]; + + int queuedEvents = 0; + epicsTimeStamp lastSort = epicsTime::getCurrent(); + epicsTimeStamp currentTime = lastSort; + + while (true) { + + queuedEvents = + eventsInQueue(this->normalisedQueue); // in case we can't wait + lastSort = epicsTime::getCurrent(); + currentTime = lastSort; + + // wait for mininmum packet frequency or enough packets to ensure we + // could potentially have at least 1 packet per mcpdid + while (queuedEvents < bufferedEvents && + epicsTimeDiffInNS(¤tTime, &lastSort) < 250'000'000ull) { + epicsThreadSleep(0.0001); // seconds + currentTime = epicsTime::getCurrent(); + queuedEvents = eventsInQueue(this->normalisedQueue); + } + + queuedEvents = std::min(queuedEvents, bufferedEvents); + + if (queuedEvents) { + epicsRingBytesGet(this->normalisedQueue, (char *)events, + queuedEvents * sizeof(NormalisedEvent)); + + std::sort(events, events + queuedEvents, oldestEventsFirst); + + epicsRingBytesPut(this->sortedQueue, (char *)events, + queuedEvents * sizeof(NormalisedEvent)); + } + } +} + +inline void asynStreamGeneratorDriver::queueForKafka(NormalisedEvent &ne) { + if (this->kafkaEnabled) { + if (ne.source == 0) + epicsRingBytesPut(this->monitorQueue, (char *)&ne, + sizeof(NormalisedEvent)); + else + epicsRingBytesPut(this->detectorQueue, (char *)&ne, + sizeof(NormalisedEvent)); + } +} + +void asynStreamGeneratorDriver::processEvents() { + + const char *functionName = "processEvents"; + + // x * number of ids * max events in packet * event size + int bufferedEvents = 5 * 10 * 243; + // we need a little extra space for merge sorting in + int extraBufferedEvents = 1 * 10 * 243; + + // we have two buffers. We alternate between reading data into one of them, + // and then merge sorting into the other + NormalisedEvent eventsABuffer[(bufferedEvents + extraBufferedEvents)]; + NormalisedEvent eventsBBuffer[(bufferedEvents + extraBufferedEvents)]; + + NormalisedEvent *eventsA = &eventsABuffer[0]; + NormalisedEvent *eventsB = &eventsBBuffer[0]; + NormalisedEvent *eventsBLastStart = eventsB + bufferedEvents; + NormalisedEvent *eventsBLastEnd = eventsBLastStart; + + int queuedEvents = 0; + + epicsTimeStamp lastProcess = epicsTime::getCurrent(); + epicsTimeStamp currentTime = lastProcess; + + epicsInt64 counts[this->num_channels]; + double elapsedSeconds = 0; + uint64_t startTimestamp = std::numeric_limits::max(); + uint64_t currTimestamp; + + epicsInt32 currStatus = STATUS_IDLE; + epicsInt32 prevStatus = STATUS_IDLE; + epicsInt32 countPreset; + epicsInt32 timePreset; + epicsInt32 presetChannel; + epicsInt32 udpQueueHighWaterMark = 0; + epicsInt32 sortedQueueHighWaterMark = 0; + + while (true) { + + queuedEvents = + eventsInQueue(this->sortedQueue); // in case we can't wait + lastProcess = epicsTime::getCurrent(); + currentTime = lastProcess; + + // wait for mininmum packet frequency or enough packets to ensure we + // could potentially have at least 1 packet per mcpdid + while (queuedEvents < bufferedEvents && + epicsTimeDiffInNS(¤tTime, &lastProcess) < 250'000'000ull) { + epicsThreadSleep(0.0001); // seconds + currentTime = epicsTime::getCurrent(); + queuedEvents = eventsInQueue(this->sortedQueue); + } + + getIntegerParam(this->P_Status, &currStatus); + + queuedEvents = std::min(queuedEvents, bufferedEvents); + + NormalisedEvent *newStartPtr = eventsA + extraBufferedEvents; + + // We read into the array, such that we have enough space, that the + // entirety of the leftover from the previous read can fit before this + // new read, in the case that all new events are newer timewise, and + // therefore, all events from eventsB have to be placed in a preceeding + // position. + epicsRingBytesGet(this->sortedQueue, (char *)newStartPtr, + queuedEvents * sizeof(NormalisedEvent)); + + int toProcess = + eventsBLastEnd - eventsBLastStart + queuedEvents * 4 / 5; + + // TODO could also consider an in-place merge + eventsBLastEnd = std::merge(newStartPtr, newStartPtr + queuedEvents, + eventsBLastStart, eventsBLastEnd, eventsA, + oldestEventsFirst); + + eventsBLastStart = eventsA + toProcess; + + // TODO I haven't really taken care of the case that there are no events + + if (prevStatus == STATUS_IDLE && currStatus == STATUS_COUNTING) { + + getIntegerParam(this->P_CountPreset, &countPreset); + getIntegerParam(this->P_TimePreset, &timePreset); + getIntegerParam(this->P_MonitorChannel, &presetChannel); + + // reset status variables + startTimestamp = eventsA[0].timestamp; + elapsedSeconds = 0; + for (size_t i = 0; i < this->num_channels; ++i) { + counts[i] = 0; + } + } + + if (currStatus == STATUS_COUNTING) { + + // The elapsedSeconds are round differently depending on whether we + // are using them for comparison, or for showing to the user, to + // try and make sure the data we send to kafka is correct, while + // the measurement time also appears intuitive. + for (size_t i = 0; i < toProcess; ++i) { + counts[eventsA[i].source == 0 ? eventsA[i].pixelId + 1 : 0] += + 1; + elapsedSeconds = (eventsA[i].timestamp - startTimestamp) / 1e9; + + // TODO should really check there an no more events with the + // same final timestamp + if ((countPreset && counts[presetChannel] >= countPreset) || + (timePreset && elapsedSeconds > (double)timePreset)) + break; + + // TODO also batchwise? + this->queueForKafka(eventsA[i]); + } + + for (size_t i = 0; i < num_channels; ++i) { + setInteger64Param(P_Counts[i], counts[i]); + } + setDoubleParam(P_ElapsedTime, elapsedSeconds); + + if ((countPreset && counts[presetChannel] >= countPreset) || + (timePreset && elapsedSeconds > (double)timePreset)) { + setIntegerParam(this->P_Status, STATUS_IDLE); + setIntegerParam(this->P_CountPreset, 0); + setIntegerParam(this->P_TimePreset, 0); + } + } + + prevStatus = currStatus; + + std::swap(eventsA, eventsB); + } +} + +void asynStreamGeneratorDriver::produce(epicsRingBytesId eventQueue, + rd_kafka_t *kafkaProducer, + const char *topic, const char *source) { + + flatbuffers::FlatBufferBuilder builder(1024); + + const std::size_t bufferSize = this->kafkaMaxPacketSize + 16; + + std::vector tof; + tof.reserve(bufferSize); + + std::vector did; + did.reserve(bufferSize); + + epicsTimeStamp last_sent = epicsTime::getCurrent(); + epicsTimeStamp now = last_sent; + int total = 0; + uint64_t message_id = 0; + + NormalisedEvent ne; + + while (true) { + + if (!epicsRingBytesIsEmpty(eventQueue)) { + + ++total; + epicsRingBytesGet(eventQueue, (char *)&ne, sizeof(NormalisedEvent)); + tof.push_back(ne.timestamp); + did.push_back(ne.pixelId); + + } else { + epicsThreadSleep(0.001); // seconds + } + + now = epicsTime::getCurrent(); + + // At least every 0.2 seconds + if (total >= this->kafkaMaxPacketSize || + epicsTimeDiffInNS(&now, &last_sent) > 250'000'000ll) { + last_sent = epicsTime::getCurrent(); + + if (total) { + total = 0; + + builder.Clear(); + + auto message = CreateEventMessageDirect( + builder, source, message_id++, + ((uint64_t)now.secPastEpoch) * 1'000'000'000ull + + ((uint64_t)now.nsec), + &tof, &did); + + builder.Finish(message, "ev42"); + + rd_kafka_resp_err_t err = rd_kafka_producev( + kafkaProducer, RD_KAFKA_V_TOPIC(topic), + RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY), + // RD_KAFKA_V_KEY((void *)key, key_len), + RD_KAFKA_V_VALUE((void *)builder.GetBufferPointer(), + builder.GetSize()), + // RD_KAFKA_V_OPAQUE(NULL), + RD_KAFKA_V_END); + + if (err) { + epicsStdoutPrintf("Failed to produce to topic %s: %s\n", + topic, rd_kafka_err2str(err)); + } + + rd_kafka_poll(kafkaProducer, 0); + + tof.clear(); + did.clear(); + } + } + } +} + +void asynStreamGeneratorDriver::produceMonitor() { + this->produce(monitorQueue, monitorProducer, monitorTopic, "monitor"); +} + +void asynStreamGeneratorDriver::produceDetector() { + this->produce(detectorQueue, detectorProducer, detectorTopic, "detector"); +} + +/******************************************************************************* + * Methods exposed to IOC Shell + */ +extern "C" { + +asynStatus asynStreamGeneratorDriverConfigure( + const char *portName, const char *ipPortName, const int numChannels, + const int udpQueueSize, const char *kafkaBroker, const char *monitorTopic, + const char *detectorTopic, const int kafkaQueueSize, + const int kafkaMaxPacketSize) { + new asynStreamGeneratorDriver(portName, ipPortName, numChannels, + udpQueueSize, kafkaBroker[0], kafkaBroker, + monitorTopic, detectorTopic, kafkaQueueSize, + kafkaMaxPacketSize); + return asynSuccess; +} + +static const iocshArg initArg0 = {"portName", iocshArgString}; +static const iocshArg initArg1 = {"ipPortName", iocshArgString}; +static const iocshArg initArg2 = {"numChannels", iocshArgInt}; +static const iocshArg initArg3 = {"udpQueueSize", iocshArgInt}; +static const iocshArg initArg4 = {"kafkaBroker", iocshArgString}; +static const iocshArg initArg5 = {"monitorTopic", iocshArgString}; +static const iocshArg initArg6 = {"detectorTopic", iocshArgString}; +static const iocshArg initArg7 = {"kafkaQueueSize", iocshArgInt}; +static const iocshArg initArg8 = {"kafkaMaxPacketSize", iocshArgInt}; +static const iocshArg *const initArgs[] = {&initArg0, &initArg1, &initArg2, + &initArg3, &initArg4, &initArg5, + &initArg6, &initArg7, &initArg8}; +static const iocshFuncDef initFuncDef = {"asynStreamGenerator", 9, initArgs}; +static void initCallFunc(const iocshArgBuf *args) { + asynStreamGeneratorDriverConfigure( + args[0].sval, args[1].sval, args[2].ival, args[3].ival, args[4].sval, + args[5].sval, args[6].sval, args[7].ival, args[8].ival); +} + +void asynStreamGeneratorDriverRegister(void) { + iocshRegister(&initFuncDef, initCallFunc); +} + +epicsExportRegistrar(asynStreamGeneratorDriverRegister); +} diff --git a/src/asynStreamGeneratorDriver.dbd b/src/asynStreamGeneratorDriver.dbd new file mode 100644 index 0000000..d0df127 --- /dev/null +++ b/src/asynStreamGeneratorDriver.dbd @@ -0,0 +1 @@ +registrar("asynStreamGeneratorDriverRegister") diff --git a/src/asynStreamGeneratorDriver.h b/src/asynStreamGeneratorDriver.h new file mode 100644 index 0000000..3783374 --- /dev/null +++ b/src/asynStreamGeneratorDriver.h @@ -0,0 +1,187 @@ +#ifndef asynStreamGeneratorDriver_H +#define asynStreamGeneratorDriver_H + +#include "asynPortDriver.h" +#include +#include + +/******************************************************************************* + * UDP Packet Definitions + */ +struct __attribute__((__packed__)) UDPHeader { + uint16_t BufferLength; + uint16_t BufferType; + uint16_t HeaderLength; + uint16_t BufferNumber; + uint16_t RunCmdID; + uint16_t Status : 8; + uint16_t McpdID : 8; + uint16_t TimeStamp[3]; + uint16_t Parameter0[3]; + uint16_t Parameter1[3]; + uint16_t Parameter2[3]; + uint16_t Parameter3[3]; + + inline uint64_t nanosecs() const { + uint64_t nsec{((uint64_t)TimeStamp[2]) << 32 | + ((uint64_t)TimeStamp[1]) << 16 | (uint64_t)TimeStamp[0]}; + return nsec * 100; + } +}; + +struct __attribute__((__packed__)) DetectorEvent { + uint64_t TimeStamp : 19; + uint16_t XPosition : 10; + uint16_t YPosition : 10; + uint16_t Amplitude : 8; + uint16_t Id : 1; + inline uint32_t nanosecs() const { return TimeStamp * 100; } + inline uint64_t pixelId(uint32_t mpcdId) const { + const uint32_t x_pixels = 128; + const uint32_t y_pixels = 128; + return (mpcdId - 1) * x_pixels * y_pixels + + x_pixels * (uint32_t)this->XPosition + (uint32_t)this->YPosition; + } +}; + +struct __attribute__((__packed__)) MonitorEvent { + uint64_t TimeStamp : 19; + uint64_t Data : 21; + uint64_t DataID : 4; + uint64_t TriggerID : 3; + uint64_t Id : 1; + inline uint32_t nanosecs() const { return TimeStamp * 100; } +}; + +/******************************************************************************* + * Simplified Event Struct Definition + */ + +struct __attribute__((__packed__)) NormalisedEvent { + uint64_t timestamp; + uint32_t pixelId : 24; + uint8_t source; + + // inline NormalisedEvent(uint64_t timestamp, uint8_t source, uint32_t + // pixelId) + // : timestamp(timestamp), source(source), pixelId(pixelId){}; +}; + +/******************************************************************************* + * Status values that should match the definition in db/daq_common.db + */ +#define STATUS_IDLE 0 +#define STATUS_COUNTING 1 +#define STATUS_LOWRATE 2 +#define STATUS_PAUSED 3 + +/******************************************************************************* + * Parameters for use in DB records + * + * i.e.e drvInfo strings that are used to identify the parameters + */ + +#define P_StatusString "STATUS" +#define P_ResetString "RESET" +#define P_StopString "STOP" +#define P_CountPresetString "P_CNT" +#define P_TimePresetString "P_TIME" +#define P_ElapsedTimeString "TIME" +#define P_ClearElapsedTimeString "C_TIME" +#define P_MonitorChannelString "MONITOR" +#define P_ThresholdString "THRESH" +#define P_ThresholdChannelString "THRESH_CH" + +#define P_CountsString "COUNTS%d" +#define P_RateString "RATE%d" +#define P_ClearCountsString "C_%d" + +#define P_UdpDroppedString "DROP" +#define P_UdpQueueHighWaterMarkString "UDP" +#define P_SortedQueueHighWaterMarkString "SORT" + +/******************************************************************************* + * Stream Generator Coordinating Class + */ +class asynStreamGeneratorDriver : public asynPortDriver { + public: + asynStreamGeneratorDriver(const char *portName, const char *ipPortName, + const int numChannels, const int udpQueueSize, + const bool enableKafkaStream, + const char *kafkaBroker, const char *monitorTopic, + const char *detectorTopic, + const int kafkaQueueSize, + const int kafkaMaxPacketSize); + virtual ~asynStreamGeneratorDriver(); + + virtual asynStatus readInt32(asynUser *pasynUser, epicsInt32 *value); + virtual asynStatus writeInt32(asynUser *pasynUser, epicsInt32 value); + + void receiveUDP(); + void normaliseUDP(); + void partialSortEvents(); + void processEvents(); + void produceMonitor(); + void produceDetector(); + + protected: + // Parameter Identifying IDs + int P_Status; + int P_Reset; + int P_Stop; + int P_CountPreset; + int P_TimePreset; + int P_ElapsedTime; + int P_ClearElapsedTime; + int P_MonitorChannel; + int P_Threshold; + int P_ThresholdChannel; + int *P_Counts; + int *P_Rates; + int *P_ClearCounts; + + // System Status Parameter Identifying IDs + int P_UdpDropped; + int P_UdpQueueHighWaterMark; + int P_SortedQueueHighWaterMark; + + private: + asynUser *pasynUDPUser; + epicsEventId pausedEventId; + + const int num_channels; + const bool kafkaEnabled; + const int kafkaQueueSize; + const int kafkaMaxPacketSize; + + const int udpQueueSize; + epicsRingBytesId udpQueue; + epicsRingBytesId normalisedQueue; + epicsRingBytesId sortedQueue; + + epicsRingBytesId monitorQueue; + rd_kafka_t *monitorProducer; + const char *monitorTopic; + + epicsRingBytesId detectorQueue; + rd_kafka_t *detectorProducer; + const char *detectorTopic; + + constexpr static char *driverName = "StreamGenerator"; + + asynStatus createInt32Param(asynStatus status, char *name, int *variable, + epicsInt32 initialValue = 0); + + asynStatus createInt64Param(asynStatus status, char *name, int *variable, + epicsInt64 initialValue = 0); + + asynStatus createFloat64Param(asynStatus status, char *name, int *variable, + double initialValue = 0); + + inline void queueForKafka(NormalisedEvent &ne); + + void produce(epicsRingBytesId eventQueue, rd_kafka_t *kafkaProducer, + const char *topic, const char *source); +}; + +#endif diff --git a/udp_rate.py b/udp_rate.py deleted file mode 100644 index 9e6471f..0000000 --- a/udp_rate.py +++ /dev/null @@ -1,342 +0,0 @@ -import queue -import socket -import time -import threading -from uuid import uuid4 -import math - -from confluent_kafka import Producer -import streaming_data_types - -# receiving directly (can also specify correlation unit ip) -UDP_IP = "" -UDP_PORT = 54321 - -# If redirecting traffic via -# socat -U - udp4-recv:54321 | tee >( socat -u - udp4-datagram:127.0.0.1:54322 ) | socat -u - udp4-datagram:127.0.0.1:54323 -# UDP_IP = "127.0.0.1" -# UDP_PORT = 54323 - -WINDOWSECONDS = 5 -WINDOWSIZE = 20000 * WINDOWSECONDS -MONITORS = 4 # We have max 4 monitors - -time_offset = None # Estimate of clock offset - -time_window = { - i: queue.Queue(maxsize=WINDOWSIZE) - for i in range(MONITORS) -} - -# event_time_window = queue.Queue(maxsize=50000 * WINDOWSECONDS) -EVENT_WINDOWSIZE = 50000 -EVENT_WINDOW_PTR = 0 -event_time_window = [0 for i in range(EVENT_WINDOWSIZE)] - -event_average_rate = 0 -event_last_timestamp = None - -MISSED_PACKETS = -9 # All modules appear to miss the first time due to initialisation as 0 - -# missed_packets_time_window = queue.Queue(maxsize=100) - -def print_monitor_rates(): - while True: - for i in range(MONITORS): - msg = f"Monitor {i+1}: {time_window[i].qsize() / WINDOWSECONDS} cts/s" - try: - earliest = time_window[i].queue[0] - newest = max(time_window[i].queue) - t = time.time() - msg += f', buffer range: {round((newest - earliest) * 1e-7, 3)} s, oldest: {round(time.time() - ((time_offset + earliest) * 1e-7), 3)} s, newest: {round(time.time() - ((time_offset + newest) * 1e-7), 3)} s' - except: - pass - - print(msg) - - # try: - # print(f'Events: {1 / event_average_rate} cts/s') - # except: - # pass - - try: - print(f'Events: {round(1 / (sum(event_time_window) / EVENT_WINDOWSIZE * 1e-7), 2)} cts/s') - except: - pass - - print(f'Missed Packets: {MISSED_PACKETS}') - - # Detector Events - # msg = f"Events : {event_time_window.qsize() / WINDOWSECONDS} cts/s" - # try: - # earliest = event_time_window.queue[0] - # newest = max(event_time_window.queue) - # t = time.time() - # msg += f', buffer range: {round((newest - earliest) * 1e-7, 3)} s, oldest: {round(time.time() - ((time_offset + earliest) * 1e-7), 3)} s, newest: {round(time.time() - ((time_offset + newest) * 1e-7), 3)} s' - # except: - # pass - - # print(msg) - - time.sleep(1) - -threading.Thread(target=print_monitor_rates, daemon=True).start() - -def clean_monitor_rates(): - latest = 0 - while True: - for d_id in range(MONITORS): - t_w = time_window[d_id] - if not t_w.empty(): - # TODO probably should switch to a priority queue - # as the messages might not be in order - # TODO could also just replace with a low-pass filter - # would be a lot more efficient - # TODO the way this is done, we need trigger events - # in order for the signal to decay back to 0. - # If no events come, the rate remains stuck - latest = max(latest, max(t_w.queue)) - # latest = time_window[1].queue[-1] - try: - while t_w.queue[0] < (latest - WINDOWSECONDS * 1e7): - t_w.get_nowait() - except IndexError: - pass - time.sleep(0.01) - -threading.Thread(target=clean_monitor_rates, daemon=True).start() - - -# def clean_event_rates(): -# latest = 0 -# while True: -# t_w = event_time_window -# if not t_w.empty(): -# # TODO probably should switch to a priority queue -# # as the messages might not be in order -# # TODO could also just replace with a low-pass filter -# # would be a lot more efficient -# # TODO the way this is done, we need trigger events -# # in order for the signal to decay back to 0. -# # If no events come, the rate remains stuck -# #latest = max(latest, max(t_w.queue)) -# try: -# latest = time_window[1].queue[-1] -# while t_w.queue[0] < (latest - WINDOWSECONDS * 1e7): -# t_w.get_nowait() -# except IndexError: -# pass -# time.sleep(0.005) -# -# threading.Thread(target=clean_event_rates, daemon=True).start() - - - - -# Event Kafka Producer - -event_queue = queue.Queue() - -def event_producer(): - producer_config = { - 'bootstrap.servers': "linkafka01:9092", - 'queue.buffering.max.messages': 1e7, - } - prod = Producer(producer_config) - - st = time.time() - - msg_id = 0 - - b_size = 10000 - b_ptr = 0 - pixel_buffer = [0 for _ in range(b_size)] - time_buffer = [0 for _ in range(b_size)] - poll_cnt = 0 - - while True: - (p_id, timestamp) = event_queue.get() - - pixel_buffer[b_ptr] = p_id - time_buffer[b_ptr] = timestamp - b_ptr += 1 - - nt = time.time() - if b_ptr == b_size or nt - st > 0.001: - st = nt - - if b_ptr > 0: - message = streaming_data_types.serialise_ev42( - message_id = msg_id, - pulse_time = time_buffer[0] * 100, # int(time.time() * 1_000_000_000), - time_of_flight = time_buffer[0:b_ptr], - detector_id = pixel_buffer[0:b_ptr], - source_name = '', - ) - - msg_id = (msg_id + 1) % 100000000 - b_ptr = 0 - - prod.produce( - topic = "DMC_detector", - value = message, - partition = 0, - ) - - # if poll_cnt % 1000 == 0: - prod.poll(0) - poll_cnt = (poll_cnt + 1) % 1000 - -threading.Thread(target=event_producer, daemon=True).start() - -# Monitor Kafka Producer - -monitor_queue = queue.Queue() - -def monitor_producer(): - producer_config = { - 'bootstrap.servers': "linkafka01:9092", - 'queue.buffering.max.messages': 1e7, - } - prod = Producer(producer_config) - - monitor_buffer = [0 for i in range(MONITORS)] - monitor_time = [0 for i in range(MONITORS)] - - st = time.time() - - poll_cnt = 0 - - while True: - (d_id, timestamp) = monitor_queue.get() - - monitor_buffer[d_id] += 1 - monitor_time[d_id] = timestamp - - nt = time.time() - if nt - st > 0.05: - st = nt - - for i in range(MONITORS): - if monitor_buffer[d_id]: - message = streaming_data_types.serialise_f142( - source_name = f"monitor{d_id+1}", - value = monitor_buffer[d_id], - # ns resolution (supposed to be past epoch, not what the detector returns though) - timestamp_unix_ns = monitor_time[d_id] * 100 # send time of last monitor - ) - - prod.produce( - topic = "DMC_neutron_monitor", - value = message, - partition = 0, - ) - - monitor_buffer[d_id] = 0 - - if poll_cnt % 1000 == 0: - prod.poll(0) - poll_cnt = (poll_cnt + 1) % 1000 - -threading.Thread(target=monitor_producer, daemon=True).start() - - -sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) -sock.bind((UDP_IP, UDP_PORT)) - -val = 0 -start_time = time.time() - -module_counts = [0 for i in range(10)] - -EVENTS = 0 - -while True: - data, addr = sock.recvfrom(2056) # Buffer size is 1024 bytes - raw_header = data[:42] - raw_data = data[42:] - - (buffer_length, buffer_type, header_length, - buffer_number, run_id, mcpd_status, - t_low, t_mid, t_high, *_) = memoryview(raw_header).cast('H') - mcpd_id = ( mcpd_status >> 8 ) & 0xff - mcpd_status = ( mcpd_status ) & 0x3 - running_msg = "running" if (mcpd_status & 0x1) else "stopped" - sync_msg = "in sync" if (mcpd_status & 0x2) else "sync error" - timestamp = ( t_high << 32 ) | ( t_mid << 16 ) | t_low # 100 ns resolution - #print(f'Packet {int(timestamp * 1e-7)}s => buffer: {buffer_number}, length: {int(buffer_length*2/6)} events, status: {mcpd_status} {mcpd_id} {running_msg} with {sync_msg}') - # print(f'Packet => buffer: {mcpd_id}-{buffer_number}, length: {int((buffer_length-21)/3)} events, status: {mcpd_status}') - - if time_offset is None: - time_offset = time.time() * 1e7 - timestamp - - if buffer_number - module_counts[mcpd_id] != 1: - MISSED_PACKETS += 1 - # if missed_packets_time_window.full(): - # missed_packets_time_window.get_nowait() - # missed_packets_time_window.put(timestamp) - - module_counts[mcpd_id] = buffer_number - - for i in range(0, len(raw_data), 6): - event = memoryview(raw_data)[i:i+6] - event_type = event[5] >> 7 - # print(event_type) - - if event_type: # Trigger Event - t_id = ( event[5] >> 4 ) & 0x7 - d_id = event[5] & 0xf - event_timestamp = timestamp + ( ( event[2] << 16 ) & 0x7 ) | ( event[1] << 8 ) | event[0] - # print(f'Trigger event {event_timestamp * 1e-7}s => TrigID: {t_id}, DataID: {d_id}') - - t_w = time_window[d_id] - t_w.put_nowait(event_timestamp) - - monitor_queue.put_nowait((d_id, event_timestamp)) - - else: # Neutron Event - x_pixels = 128 - y_pixels = 128 - amplitude = ( event[5] << 1 ) | ( event[4] >> 7 ) - - # The DMC StreamHistogrammer setup currently expects each module to - # be 128 * 128 pixels but the resolution in the packages is - # actually 10bit. We remove the lowest 3 bits. - x = (( (event[3] & 0x1f) << 5 | (event[2] & 0xf8) >> 3 ) & 0x3ff) >> 3 - y = (( (event[4] & 0x7f) << 3 | (event[3] & 0xe0) >> 5 ) & 0x3ff) >> 3 - event_timestamp = timestamp + ( ( event[2] << 16 ) & 0x7 ) | ( event[1] << 8 ) | event[0] - # print(f'Neutron event {event_timestamp * 1e-7}s: {amplitude}, x: {x}, y: {y}') - - - if event_last_timestamp is None: - event_last_timestamp = event_timestamp - - # Seems like at higher frequencies these come very much out of order - # so this is very approximate - event_time_window[EVENT_WINDOW_PTR] = event_timestamp - event_last_timestamp - EVENT_WINDOW_PTR = (EVENT_WINDOW_PTR + 1) % EVENT_WINDOWSIZE - event_last_timestamp = event_timestamp - - # I suppose this doesn't work mostly due to the timestamps ordering... - # event_timestamp_seconds = event_timestamp * 1e-7 - # if event_last_timestamp is None: - # event_last_timestamp = event_timestamp_seconds - - # f_cutoff = 1e6 # Hz - # tau = 1 / ( 2 * math.pi * f_cutoff) - # dt = event_timestamp_seconds - event_last_timestamp - # if dt > 0: - # w = math.exp(-dt / tau) - # event_average_rate = w * dt + event_average_rate * (1 - w) - # event_last_timestamp = event_timestamp_seconds - - # EVENTS += 1 - # a = (mcpd_id - 1) * x_pixels * y_pixels + x_pixels * x + y - # print((EVENTS, x, y, a, a < 128 * 128 * 9, mcpd_id)) - # if not a < 128 * 128 * 9: - # print((event[3], event[3] << 5, event[2], event[2] >> 3)) - - event_queue.put_nowait(( - (mcpd_id - 1) * x_pixels * y_pixels + x_pixels * x + y, - event_timestamp - ))