less_logic_in_udp_thread #1
245
.clang-format
Normal file
245
.clang-format
Normal file
@@ -0,0 +1,245 @@
|
|||||||
|
---
|
||||||
|
Language: Cpp
|
||||||
|
# BasedOnStyle: LLVM
|
||||||
|
AccessModifierOffset: -2
|
||||||
|
AlignAfterOpenBracket: Align
|
||||||
|
AlignArrayOfStructures: None
|
||||||
|
AlignConsecutiveAssignments:
|
||||||
|
Enabled: false
|
||||||
|
AcrossEmptyLines: false
|
||||||
|
AcrossComments: false
|
||||||
|
AlignCompound: false
|
||||||
|
AlignFunctionPointers: false
|
||||||
|
PadOperators: true
|
||||||
|
AlignConsecutiveBitFields:
|
||||||
|
Enabled: false
|
||||||
|
AcrossEmptyLines: false
|
||||||
|
AcrossComments: false
|
||||||
|
AlignCompound: false
|
||||||
|
AlignFunctionPointers: false
|
||||||
|
PadOperators: false
|
||||||
|
AlignConsecutiveDeclarations:
|
||||||
|
Enabled: false
|
||||||
|
AcrossEmptyLines: false
|
||||||
|
AcrossComments: false
|
||||||
|
AlignCompound: false
|
||||||
|
AlignFunctionPointers: false
|
||||||
|
PadOperators: false
|
||||||
|
AlignConsecutiveMacros:
|
||||||
|
Enabled: false
|
||||||
|
AcrossEmptyLines: false
|
||||||
|
AcrossComments: false
|
||||||
|
AlignCompound: false
|
||||||
|
AlignFunctionPointers: false
|
||||||
|
PadOperators: false
|
||||||
|
AlignConsecutiveShortCaseStatements:
|
||||||
|
Enabled: false
|
||||||
|
AcrossEmptyLines: false
|
||||||
|
AcrossComments: false
|
||||||
|
AlignCaseColons: false
|
||||||
|
AlignEscapedNewlines: Right
|
||||||
|
AlignOperands: Align
|
||||||
|
AlignTrailingComments:
|
||||||
|
Kind: Always
|
||||||
|
OverEmptyLines: 0
|
||||||
|
AllowAllArgumentsOnNextLine: true
|
||||||
|
AllowAllParametersOfDeclarationOnNextLine: true
|
||||||
|
AllowBreakBeforeNoexceptSpecifier: Never
|
||||||
|
AllowShortBlocksOnASingleLine: Never
|
||||||
|
AllowShortCaseLabelsOnASingleLine: false
|
||||||
|
AllowShortCompoundRequirementOnASingleLine: true
|
||||||
|
AllowShortEnumsOnASingleLine: true
|
||||||
|
AllowShortFunctionsOnASingleLine: All
|
||||||
|
AllowShortIfStatementsOnASingleLine: Never
|
||||||
|
AllowShortLambdasOnASingleLine: All
|
||||||
|
AllowShortLoopsOnASingleLine: false
|
||||||
|
AlwaysBreakAfterDefinitionReturnType: None
|
||||||
|
AlwaysBreakAfterReturnType: None
|
||||||
|
AlwaysBreakBeforeMultilineStrings: false
|
||||||
|
AlwaysBreakTemplateDeclarations: MultiLine
|
||||||
|
AttributeMacros:
|
||||||
|
- __capability
|
||||||
|
BinPackArguments: true
|
||||||
|
BinPackParameters: true
|
||||||
|
BitFieldColonSpacing: Both
|
||||||
|
BraceWrapping:
|
||||||
|
AfterCaseLabel: false
|
||||||
|
AfterClass: false
|
||||||
|
AfterControlStatement: Never
|
||||||
|
AfterEnum: false
|
||||||
|
AfterExternBlock: false
|
||||||
|
AfterFunction: false
|
||||||
|
AfterNamespace: false
|
||||||
|
AfterObjCDeclaration: false
|
||||||
|
AfterStruct: false
|
||||||
|
AfterUnion: false
|
||||||
|
BeforeCatch: false
|
||||||
|
BeforeElse: false
|
||||||
|
BeforeLambdaBody: false
|
||||||
|
BeforeWhile: false
|
||||||
|
IndentBraces: false
|
||||||
|
SplitEmptyFunction: true
|
||||||
|
SplitEmptyRecord: true
|
||||||
|
SplitEmptyNamespace: true
|
||||||
|
BreakAdjacentStringLiterals: true
|
||||||
|
BreakAfterAttributes: Leave
|
||||||
|
BreakAfterJavaFieldAnnotations: false
|
||||||
|
BreakArrays: true
|
||||||
|
BreakBeforeBinaryOperators: None
|
||||||
|
BreakBeforeConceptDeclarations: Always
|
||||||
|
BreakBeforeBraces: Attach
|
||||||
|
BreakBeforeInlineASMColon: OnlyMultiline
|
||||||
|
BreakBeforeTernaryOperators: true
|
||||||
|
BreakConstructorInitializers: BeforeColon
|
||||||
|
BreakInheritanceList: BeforeColon
|
||||||
|
BreakStringLiterals: true
|
||||||
|
ColumnLimit: 80
|
||||||
|
CommentPragmas: '^ IWYU pragma:'
|
||||||
|
CompactNamespaces: false
|
||||||
|
ConstructorInitializerIndentWidth: 4
|
||||||
|
ContinuationIndentWidth: 4
|
||||||
|
Cpp11BracedListStyle: true
|
||||||
|
DerivePointerAlignment: false
|
||||||
|
DisableFormat: false
|
||||||
|
EmptyLineAfterAccessModifier: Never
|
||||||
|
EmptyLineBeforeAccessModifier: LogicalBlock
|
||||||
|
ExperimentalAutoDetectBinPacking: false
|
||||||
|
FixNamespaceComments: true
|
||||||
|
ForEachMacros:
|
||||||
|
- foreach
|
||||||
|
- Q_FOREACH
|
||||||
|
- BOOST_FOREACH
|
||||||
|
IfMacros:
|
||||||
|
- KJ_IF_MAYBE
|
||||||
|
IncludeBlocks: Preserve
|
||||||
|
IncludeCategories:
|
||||||
|
- Regex: '^"(llvm|llvm-c|clang|clang-c)/'
|
||||||
|
Priority: 2
|
||||||
|
SortPriority: 0
|
||||||
|
CaseSensitive: false
|
||||||
|
- Regex: '^(<|"(gtest|gmock|isl|json)/)'
|
||||||
|
Priority: 3
|
||||||
|
SortPriority: 0
|
||||||
|
CaseSensitive: false
|
||||||
|
- Regex: '.*'
|
||||||
|
Priority: 1
|
||||||
|
SortPriority: 0
|
||||||
|
CaseSensitive: false
|
||||||
|
IncludeIsMainRegex: '(Test)?$'
|
||||||
|
IncludeIsMainSourceRegex: ''
|
||||||
|
IndentAccessModifiers: false
|
||||||
|
IndentCaseBlocks: false
|
||||||
|
IndentCaseLabels: false
|
||||||
|
IndentExternBlock: AfterExternBlock
|
||||||
|
IndentGotoLabels: true
|
||||||
|
IndentPPDirectives: None
|
||||||
|
IndentRequiresClause: true
|
||||||
|
IndentWidth: 4
|
||||||
|
IndentWrappedFunctionNames: false
|
||||||
|
InsertBraces: false
|
||||||
|
InsertNewlineAtEOF: false
|
||||||
|
InsertTrailingCommas: None
|
||||||
|
IntegerLiteralSeparator:
|
||||||
|
Binary: 0
|
||||||
|
BinaryMinDigits: 0
|
||||||
|
Decimal: 0
|
||||||
|
DecimalMinDigits: 0
|
||||||
|
Hex: 0
|
||||||
|
HexMinDigits: 0
|
||||||
|
JavaScriptQuotes: Leave
|
||||||
|
JavaScriptWrapImports: true
|
||||||
|
KeepEmptyLinesAtTheStartOfBlocks: true
|
||||||
|
KeepEmptyLinesAtEOF: false
|
||||||
|
LambdaBodyIndentation: Signature
|
||||||
|
LineEnding: DeriveLF
|
||||||
|
MacroBlockBegin: ''
|
||||||
|
MacroBlockEnd: ''
|
||||||
|
MaxEmptyLinesToKeep: 1
|
||||||
|
NamespaceIndentation: None
|
||||||
|
ObjCBinPackProtocolList: Auto
|
||||||
|
ObjCBlockIndentWidth: 2
|
||||||
|
ObjCBreakBeforeNestedBlockParam: true
|
||||||
|
ObjCSpaceAfterProperty: false
|
||||||
|
ObjCSpaceBeforeProtocolList: true
|
||||||
|
PackConstructorInitializers: BinPack
|
||||||
|
PenaltyBreakAssignment: 2
|
||||||
|
PenaltyBreakBeforeFirstCallParameter: 19
|
||||||
|
PenaltyBreakComment: 300
|
||||||
|
PenaltyBreakFirstLessLess: 120
|
||||||
|
PenaltyBreakOpenParenthesis: 0
|
||||||
|
PenaltyBreakScopeResolution: 500
|
||||||
|
PenaltyBreakString: 1000
|
||||||
|
PenaltyBreakTemplateDeclaration: 10
|
||||||
|
PenaltyExcessCharacter: 1000000
|
||||||
|
PenaltyIndentedWhitespace: 0
|
||||||
|
PenaltyReturnTypeOnItsOwnLine: 60
|
||||||
|
PointerAlignment: Right
|
||||||
|
PPIndentWidth: -1
|
||||||
|
QualifierAlignment: Leave
|
||||||
|
ReferenceAlignment: Pointer
|
||||||
|
ReflowComments: true
|
||||||
|
RemoveBracesLLVM: false
|
||||||
|
RemoveParentheses: Leave
|
||||||
|
RemoveSemicolon: false
|
||||||
|
RequiresClausePosition: OwnLine
|
||||||
|
RequiresExpressionIndentation: OuterScope
|
||||||
|
SeparateDefinitionBlocks: Leave
|
||||||
|
ShortNamespaceLines: 1
|
||||||
|
SkipMacroDefinitionBody: false
|
||||||
|
SortIncludes: CaseSensitive
|
||||||
|
SortJavaStaticImport: Before
|
||||||
|
SortUsingDeclarations: LexicographicNumeric
|
||||||
|
SpaceAfterCStyleCast: false
|
||||||
|
SpaceAfterLogicalNot: false
|
||||||
|
SpaceAfterTemplateKeyword: true
|
||||||
|
SpaceAroundPointerQualifiers: Default
|
||||||
|
SpaceBeforeAssignmentOperators: true
|
||||||
|
SpaceBeforeCaseColon: false
|
||||||
|
SpaceBeforeCpp11BracedList: false
|
||||||
|
SpaceBeforeCtorInitializerColon: true
|
||||||
|
SpaceBeforeInheritanceColon: true
|
||||||
|
SpaceBeforeJsonColon: false
|
||||||
|
SpaceBeforeParens: ControlStatements
|
||||||
|
SpaceBeforeParensOptions:
|
||||||
|
AfterControlStatements: true
|
||||||
|
AfterForeachMacros: true
|
||||||
|
AfterFunctionDefinitionName: false
|
||||||
|
AfterFunctionDeclarationName: false
|
||||||
|
AfterIfMacros: true
|
||||||
|
AfterOverloadedOperator: false
|
||||||
|
AfterPlacementOperator: true
|
||||||
|
AfterRequiresInClause: false
|
||||||
|
AfterRequiresInExpression: false
|
||||||
|
BeforeNonEmptyParentheses: false
|
||||||
|
SpaceBeforeRangeBasedForLoopColon: true
|
||||||
|
SpaceBeforeSquareBrackets: false
|
||||||
|
SpaceInEmptyBlock: false
|
||||||
|
SpacesBeforeTrailingComments: 1
|
||||||
|
SpacesInAngles: Never
|
||||||
|
SpacesInContainerLiterals: true
|
||||||
|
SpacesInLineCommentPrefix:
|
||||||
|
Minimum: 1
|
||||||
|
Maximum: -1
|
||||||
|
SpacesInParens: Never
|
||||||
|
SpacesInParensOptions:
|
||||||
|
InCStyleCasts: false
|
||||||
|
InConditionalStatements: false
|
||||||
|
InEmptyParentheses: false
|
||||||
|
Other: false
|
||||||
|
SpacesInSquareBrackets: false
|
||||||
|
Standard: Latest
|
||||||
|
StatementAttributeLikeMacros:
|
||||||
|
- Q_EMIT
|
||||||
|
StatementMacros:
|
||||||
|
- Q_UNUSED
|
||||||
|
- QT_REQUIRE_VERSION
|
||||||
|
TabWidth: 8
|
||||||
|
UseTab: Never
|
||||||
|
VerilogBreakBetweenInstancePorts: true
|
||||||
|
WhitespaceSensitiveMacros:
|
||||||
|
- BOOST_PP_STRINGIZE
|
||||||
|
- CF_SWIFT_NAME
|
||||||
|
- NS_SWIFT_NAME
|
||||||
|
- PP_STRINGIZE
|
||||||
|
- STRINGIZE
|
||||||
|
...
|
||||||
3
.gitignore
vendored
Normal file
3
.gitignore
vendored
Normal file
@@ -0,0 +1,3 @@
|
|||||||
|
O.*
|
||||||
|
.*ignore
|
||||||
|
schemas/
|
||||||
6
.gitmodules
vendored
Normal file
6
.gitmodules
vendored
Normal file
@@ -0,0 +1,6 @@
|
|||||||
|
[submodule "dep/streaming-data-types"]
|
||||||
|
path = dep/streaming-data-types
|
||||||
|
url = git@gitea.psi.ch:lin-controls/streaming-data-types.git
|
||||||
|
[submodule "dep/flatbuffers"]
|
||||||
|
path = dep/flatbuffers
|
||||||
|
url = git@gitea.psi.ch:lin-controls/flatbuffers.git
|
||||||
30
Makefile
Normal file
30
Makefile
Normal file
@@ -0,0 +1,30 @@
|
|||||||
|
# Include the external Makefile
|
||||||
|
include /ioc/tools/driver.makefile
|
||||||
|
|
||||||
|
MODULE=StreamGenerator
|
||||||
|
BUILDCLASSES=Linux
|
||||||
|
EPICS_VERSIONS=7.0.7
|
||||||
|
#ARCH_FILTER=RHEL%
|
||||||
|
ARCH_FILTER=linux-x86_64
|
||||||
|
|
||||||
|
# Additional module dependencies
|
||||||
|
REQUIRED+=asyn
|
||||||
|
|
||||||
|
DBDS += src/asynStreamGeneratorDriver.dbd
|
||||||
|
|
||||||
|
# DB files to include in the release
|
||||||
|
TEMPLATES += db/channels.db db/daq_common.db
|
||||||
|
|
||||||
|
# HEADERS += src/asynStreamGeneratorDriver.h
|
||||||
|
|
||||||
|
# Source files to build
|
||||||
|
SOURCES += src/asynStreamGeneratorDriver.cpp
|
||||||
|
|
||||||
|
# I don't think specifying the optimisation level like this is correct...
|
||||||
|
# but I doesn't hurt :D
|
||||||
|
USR_CFLAGS += -O3 -Wall -Wextra -Wunused-result -Werror -fvisibility=hidden # -Wpedantic // Does not work because EPICS macros trigger warnings
|
||||||
|
|
||||||
|
# Required to support EV42/44
|
||||||
|
USR_CXXFLAGS += -O3 -I../dep/flatbuffers/include/ -I../schemas
|
||||||
|
|
||||||
|
LIB_SYS_LIBS += rdkafka
|
||||||
41
README.md
Normal file
41
README.md
Normal file
@@ -0,0 +1,41 @@
|
|||||||
|
# StreamGenerator
|
||||||
|
|
||||||
|
Clone the repository to a local directory via:
|
||||||
|
|
||||||
|
```
|
||||||
|
git clone --recurse-submodules -j8 git@gitea.psi.ch:lin-instrument-computers/StreamGenerator.git
|
||||||
|
```
|
||||||
|
|
||||||
|
## Dependencies
|
||||||
|
|
||||||
|
Currently, this project requires a system install of librdkafka. On Redhat,
|
||||||
|
this means you should run:
|
||||||
|
|
||||||
|
```bash
|
||||||
|
dnf install -y librdkafka-devel
|
||||||
|
```
|
||||||
|
|
||||||
|
Additionally, you must first build Google's *flatbuffers* and ESS's
|
||||||
|
**streaming-data-types** libraries, which are both included in this project as
|
||||||
|
submodules under the `dep` directory and which are both necessary to build this
|
||||||
|
project.
|
||||||
|
|
||||||
|
First, you should enter the *flatbuffers* directory and run the following:
|
||||||
|
|
||||||
|
```bash
|
||||||
|
cmake -G "Unix Makefiles"
|
||||||
|
make -j
|
||||||
|
```
|
||||||
|
|
||||||
|
After these steps, you will find the program `flatc` has been built and placed
|
||||||
|
in the directory.
|
||||||
|
|
||||||
|
Next, you should return to the top of this project's directory tree, and create
|
||||||
|
the flatbuffers from ESS's schema files. This you can do as follows:
|
||||||
|
|
||||||
|
```bash
|
||||||
|
./dep/flatbuffers/flatc -o schemas/ --cpp --gen-mutable --gen-name-strings --scoped-enums ./dep/streaming-data-types/schemas/*
|
||||||
|
```
|
||||||
|
|
||||||
|
This generates header files from each of ESS's schemas and places them in a
|
||||||
|
schemas directory.
|
||||||
75
db/channels.db
Normal file
75
db/channels.db
Normal file
@@ -0,0 +1,75 @@
|
|||||||
|
# EPICS Database for streamdevice specific to measurement channels
|
||||||
|
#
|
||||||
|
# Macros
|
||||||
|
# INSTR - Prefix
|
||||||
|
# NAME - the device name, e.g. EL737
|
||||||
|
# PORT - StreamGenerator Port
|
||||||
|
# CHANNEL - the number associated with the measurment channel
|
||||||
|
|
||||||
|
################################################################################
|
||||||
|
# Status Variables
|
||||||
|
|
||||||
|
# Trigger a change in status as clearing
|
||||||
|
record(bo, "$(INSTR)$(NAME):T$(CHANNEL)")
|
||||||
|
{
|
||||||
|
field(DESC, "Trigger Clearing Status")
|
||||||
|
field(VAL, 1)
|
||||||
|
field(OUT, "$(INSTR)$(NAME):S$(CHANNEL) PP")
|
||||||
|
}
|
||||||
|
|
||||||
|
# Trigger a change in status as value returned to 0
|
||||||
|
record(seq, "$(INSTR)$(NAME):O$(CHANNEL)")
|
||||||
|
{
|
||||||
|
field(DESC, "Trigger Returned to 0 Status")
|
||||||
|
field(LNK0, "$(INSTR)$(NAME):S$(CHANNEL) PP")
|
||||||
|
field(DO0, 0)
|
||||||
|
field(SELM, "Specified")
|
||||||
|
field(SELL, "$(INSTR)$(NAME):M$(CHANNEL).VAL")
|
||||||
|
field(SCAN, ".1 second")
|
||||||
|
}
|
||||||
|
|
||||||
|
# Current Status of Channel, i.e. is it ready to count?
|
||||||
|
record(bi, "$(INSTR)$(NAME):S$(CHANNEL)")
|
||||||
|
{
|
||||||
|
field(DESC, "Channel Status")
|
||||||
|
field(VAL, 0)
|
||||||
|
field(ZNAM, "OK")
|
||||||
|
field(ONAM, "CLEARING")
|
||||||
|
field(PINI, 1)
|
||||||
|
}
|
||||||
|
|
||||||
|
################################################################################
|
||||||
|
# Count Commands
|
||||||
|
|
||||||
|
record(longout, "$(INSTR)$(NAME):C$(CHANNEL)")
|
||||||
|
{
|
||||||
|
field(DESC, "Clear the current channel count")
|
||||||
|
field(DTYP, "asynInt32")
|
||||||
|
field(OUT, "@asyn($(PORT),0,$(TIMEOUT=1)) C_$(CHANNEL)")
|
||||||
|
field(FLNK, "$(INSTR)$(NAME):T$(CHANNEL)")
|
||||||
|
}
|
||||||
|
|
||||||
|
################################################################################
|
||||||
|
# Read all monitors values
|
||||||
|
|
||||||
|
record(int64in, "$(INSTR)$(NAME):M$(CHANNEL)")
|
||||||
|
{
|
||||||
|
field(DESC, "DAQ CH$(CHANNEL)")
|
||||||
|
field(EGU, "cts")
|
||||||
|
field(DTYP, "asynInt64")
|
||||||
|
field(INP, "@asyn($(PORT),0,$(TIMEOUT=1)) COUNTS$(CHANNEL)")
|
||||||
|
# This is probably too fast. We could trigger things the same as sinqDAQ to ensure the db is update in the same order
|
||||||
|
# field(SCAN, "I/O Intr")
|
||||||
|
field(PINI, "YES")
|
||||||
|
}
|
||||||
|
|
||||||
|
record(ai, "$(INSTR)$(NAME):R$(CHANNEL)")
|
||||||
|
{
|
||||||
|
field(DESC, "Rate of DAQ CH$(CHANNEL)")
|
||||||
|
field(EGU, "cts/sec")
|
||||||
|
field(DTYP, "asynInt32")
|
||||||
|
field(INP, "@asyn($(PORT),0,$(TIMEOUT=1)) RATE$(CHANNEL)")
|
||||||
|
field(SCAN, ".2 second")
|
||||||
|
# field(SCAN, "I/O Intr")
|
||||||
|
field(PINI, "YES")
|
||||||
|
}
|
||||||
277
db/daq_common.db
Normal file
277
db/daq_common.db
Normal file
@@ -0,0 +1,277 @@
|
|||||||
|
# EPICS Database for streamdevice specific to measurement channels
|
||||||
|
#
|
||||||
|
# Macros
|
||||||
|
# INSTR - Prefix
|
||||||
|
# NAME - the device name, e.g. EL737
|
||||||
|
# PORT - StreamGenerator Port
|
||||||
|
|
||||||
|
record(longout, "$(INSTR)$(NAME):FULL-RESET")
|
||||||
|
{
|
||||||
|
field(DESC, "Reset the DAQ")
|
||||||
|
field(DTYP, "asynInt32")
|
||||||
|
field(OUT, "@asyn($(PORT),0,$(TIMEOUT=1)) RESET")
|
||||||
|
}
|
||||||
|
|
||||||
|
################################################################################
|
||||||
|
# Status Variables
|
||||||
|
|
||||||
|
# We separate the RAW-STATUS and the STATUS PV so that the state can be updated
|
||||||
|
# in a sequence, that guarantees that we included the most recent time and
|
||||||
|
# counts before the status switches back to Idle.
|
||||||
|
# We do this via a sequenced update
|
||||||
|
#
|
||||||
|
# RAW-STATUS -> ELAPSED-SECONDS -> M* -> STATUS
|
||||||
|
record(mbbi, "$(INSTR)$(NAME):RAW-STATUS")
|
||||||
|
{
|
||||||
|
field(DESC, "DAQ Status")
|
||||||
|
field(DTYP, "asynInt32")
|
||||||
|
field(INP, "@asyn($(PORT),0,$(TIMEOUT=1)) STATUS")
|
||||||
|
field(ZRVL, "0")
|
||||||
|
field(ZRST, "Idle")
|
||||||
|
field(ONVL, "1")
|
||||||
|
field(ONST, "Counting")
|
||||||
|
field(TWVL, "2")
|
||||||
|
field(TWST, "Low rate")
|
||||||
|
field(THVL, "3")
|
||||||
|
field(THST, "Paused")
|
||||||
|
# 4 should never happen, if it does it means the DAQ reports undocumented statusbits
|
||||||
|
field(FRVL, "4")
|
||||||
|
field(FRST, "INVALID")
|
||||||
|
# This is probably too fast. We could trigger things the same as sinqDAQ to ensure the db is update in the same order
|
||||||
|
#field(SCAN, "I/O Intr")
|
||||||
|
field(SCAN, ".2 second")
|
||||||
|
field(FLNK, "$(INSTR)$(NAME):READALL")
|
||||||
|
field(PINI, "YES")
|
||||||
|
}
|
||||||
|
|
||||||
|
record(fanout, "$(INSTR)$(NAME):READALL")
|
||||||
|
{
|
||||||
|
field(SELM, "All")
|
||||||
|
field(LNK0, "$(INSTR)$(NAME):ELAPSED-TIME PP")
|
||||||
|
field(LNK1, "$(INSTR)$(NAME):M0")
|
||||||
|
field(LNK2, "$(INSTR)$(NAME):M1")
|
||||||
|
field(LNK3, "$(INSTR)$(NAME):M2")
|
||||||
|
field(LNK4, "$(INSTR)$(NAME):M3")
|
||||||
|
field(LNK5, "$(INSTR)$(NAME):M4")
|
||||||
|
# Doesn't seemt o be a problem to have more in here :D
|
||||||
|
# field(LNK6, "$(INSTR)$(NAME):M5")
|
||||||
|
# field(LNK7, "$(INSTR)$(NAME):M6")
|
||||||
|
field(FLNK, "$(INSTR)$(NAME):STATUS")
|
||||||
|
}
|
||||||
|
|
||||||
|
record(mbbi, "$(INSTR)$(NAME):STATUS")
|
||||||
|
{
|
||||||
|
field(INP, "$(INSTR)$(NAME):RAW-STATUS NPP")
|
||||||
|
field(DESC, "DAQ Status")
|
||||||
|
field(ZRVL, "0")
|
||||||
|
field(ZRST, "Idle")
|
||||||
|
field(ONVL, "1")
|
||||||
|
field(ONST, "Counting")
|
||||||
|
field(TWVL, "2")
|
||||||
|
field(TWST, "Low rate")
|
||||||
|
field(THVL, "3")
|
||||||
|
field(THST, "Paused")
|
||||||
|
# 4 should never happen, if it does it means the DAQ reports undocumented statusbits
|
||||||
|
field(FRVL, "4")
|
||||||
|
field(FRST, "INVALID")
|
||||||
|
field(PINI, "YES")
|
||||||
|
}
|
||||||
|
|
||||||
|
record(longin, "$(INSTR)$(NAME):CHANNELS")
|
||||||
|
{
|
||||||
|
field(DESC, "Total Supported Channels")
|
||||||
|
field(VAL, $(CHANNELS))
|
||||||
|
field(DISP, 1)
|
||||||
|
}
|
||||||
|
|
||||||
|
# Trigger a change in status as clearing
|
||||||
|
record(bo, "$(INSTR)$(NAME):ETT")
|
||||||
|
{
|
||||||
|
field(DESC, "Trigger Clearing Status")
|
||||||
|
field(VAL, 1)
|
||||||
|
field(OUT, "$(INSTR)$(NAME):ETS PP")
|
||||||
|
}
|
||||||
|
|
||||||
|
# Trigger a change in status as value returned to 0
|
||||||
|
record(seq, "$(INSTR)$(NAME):ETO")
|
||||||
|
{
|
||||||
|
field(DESC, "Trigger Returned to 0 Status")
|
||||||
|
field(LNK0, "$(INSTR)$(NAME):ETS PP")
|
||||||
|
field(DO0, 0)
|
||||||
|
field(SELM, "Specified")
|
||||||
|
field(SELL, "$(INSTR)$(NAME):ELAPSED-TIME.VAL")
|
||||||
|
field(SCAN, ".1 second")
|
||||||
|
}
|
||||||
|
|
||||||
|
# Current Status of Channel, i.e. is it ready to count?
|
||||||
|
record(bi, "$(INSTR)$(NAME):ETS")
|
||||||
|
{
|
||||||
|
field(DESC, "Channel Status")
|
||||||
|
field(VAL, 0)
|
||||||
|
field(ZNAM, "OK")
|
||||||
|
field(ONAM, "CLEARING")
|
||||||
|
field(PINI, 1)
|
||||||
|
}
|
||||||
|
|
||||||
|
################################################################################
|
||||||
|
# Count Commands
|
||||||
|
|
||||||
|
record(ao,"$(INSTR)$(NAME):PRESET-COUNT")
|
||||||
|
{
|
||||||
|
field(DESC, "Count until preset reached")
|
||||||
|
field(DTYP, "asynInt32")
|
||||||
|
field(OUT, "@asyn($(PORT),0,$(TIMEOUT=1)) P_CNT")
|
||||||
|
field(VAL, 0)
|
||||||
|
field(PREC, 2)
|
||||||
|
}
|
||||||
|
|
||||||
|
record(ao,"$(INSTR)$(NAME):PRESET-TIME")
|
||||||
|
{
|
||||||
|
field(DESC, "Count for specified time")
|
||||||
|
field(EGU, "seconds")
|
||||||
|
field(DTYP, "asynInt32")
|
||||||
|
field(OUT, "@asyn($(PORT),0,$(TIMEOUT=1)) P_TIME")
|
||||||
|
field(VAL, 0)
|
||||||
|
field(PREC, 2)
|
||||||
|
}
|
||||||
|
|
||||||
|
# record(bo,"$(INSTR)$(NAME):PAUSE")
|
||||||
|
# {
|
||||||
|
# field(DESC, "Pause the current count")
|
||||||
|
# field(DTYP, "stream")
|
||||||
|
# field(OUT, "@... pauseCount($(INSTR)$(NAME):) $(PORT)")
|
||||||
|
# field(VAL, "0")
|
||||||
|
# field(FLNK, "$(INSTR)$(NAME):RAW-STATUS")
|
||||||
|
# }
|
||||||
|
#
|
||||||
|
# record(bo,"$(INSTR)$(NAME):CONTINUE")
|
||||||
|
# {
|
||||||
|
# field(DESC, "Continue with a count that was paused")
|
||||||
|
# field(DTYP, "stream")
|
||||||
|
# field(OUT, "@... continueCount($(INSTR)$(NAME):) $(PORT)")
|
||||||
|
# field(VAL, "0")
|
||||||
|
# field(FLNK, "$(INSTR)$(NAME):RAW-STATUS")
|
||||||
|
# }
|
||||||
|
|
||||||
|
record(longout, "$(INSTR)$(NAME):STOP")
|
||||||
|
{
|
||||||
|
field(DESC, "Stop the current counting operation")
|
||||||
|
field(DTYP, "asynInt32")
|
||||||
|
field(OUT, "@asyn($(PORT),0,$(TIMEOUT=1)) STOP")
|
||||||
|
}
|
||||||
|
|
||||||
|
record(longout, "$(INSTR)$(NAME):MONITOR-CHANNEL")
|
||||||
|
{
|
||||||
|
field(DESC, "PRESET-COUNT Monitors this channel")
|
||||||
|
field(DTYP, "asynInt32")
|
||||||
|
field(OUT, "@asyn($(PORT),0,$(TIMEOUT=1)) MONITOR")
|
||||||
|
field(DRVL, "0") # Smallest Monitor Channel
|
||||||
|
field(DRVH, "$(CHANNELS)") # Largest Monitor Channel
|
||||||
|
}
|
||||||
|
|
||||||
|
record(longin, "$(INSTR)$(NAME):MONITOR-CHANNEL_RBV")
|
||||||
|
{
|
||||||
|
field(DESC, "PRESET-COUNT Monitors this channel")
|
||||||
|
field(DTYP, "asynInt32")
|
||||||
|
field(INP, "@asyn($(PORT),0,$(TIMEOUT=1)) MONITOR")
|
||||||
|
field(SCAN, "I/O Intr")
|
||||||
|
field(PINI, "YES")
|
||||||
|
}
|
||||||
|
|
||||||
|
record(ao,"$(INSTR)$(NAME):THRESHOLD")
|
||||||
|
{
|
||||||
|
field(DESC, "Minimum rate for counting to proceed")
|
||||||
|
field(DTYP, "asynInt32")
|
||||||
|
field(OUT, "@asyn($(PORT),0,$(TIMEOUT=1)) THRESH")
|
||||||
|
field(VAL, "1") # Default Rate
|
||||||
|
field(DRVL, "1") # Minimum Rate
|
||||||
|
field(DRVH, "100000") # Maximum Rate
|
||||||
|
}
|
||||||
|
|
||||||
|
record(ai,"$(INSTR)$(NAME):THRESHOLD_RBV")
|
||||||
|
{
|
||||||
|
field(DESC, "Minimum rate for counting to proceed")
|
||||||
|
field(EGU, "cts/sec")
|
||||||
|
field(DTYP, "asynInt32")
|
||||||
|
field(INP, "@asyn($(PORT),0,$(TIMEOUT=1)) THRESH")
|
||||||
|
field(SCAN, "I/O Intr")
|
||||||
|
field(PINI, "YES")
|
||||||
|
}
|
||||||
|
|
||||||
|
record(longout,"$(INSTR)$(NAME):THRESHOLD-MONITOR")
|
||||||
|
{
|
||||||
|
field(DESC, "Channel monitored for minimum rate")
|
||||||
|
field(DTYP, "asynInt32")
|
||||||
|
field(OUT, "@asyn($(PORT),0,$(TIMEOUT=1)) THRESH_CH")
|
||||||
|
field(VAL, "1") # Monitor
|
||||||
|
field(DRVL, "0") # Smallest Threshold Channel (0 is off)
|
||||||
|
field(DRVH, "$(CHANNELS)") # Largest Threshold Channel
|
||||||
|
}
|
||||||
|
|
||||||
|
record(longin,"$(INSTR)$(NAME):THRESHOLD-MONITOR_RBV")
|
||||||
|
{
|
||||||
|
field(DESC, "Channel monitored for minimum rate")
|
||||||
|
field(EGU, "CH")
|
||||||
|
field(DTYP, "asynInt32")
|
||||||
|
field(INP, "@asyn($(PORT),0,$(TIMEOUT=1)) THRESH_CH")
|
||||||
|
field(SCAN, "I/O Intr")
|
||||||
|
field(PINI, "YES")
|
||||||
|
}
|
||||||
|
|
||||||
|
record(longout, "$(INSTR)$(NAME):CT")
|
||||||
|
{
|
||||||
|
field(DESC, "Clear the timer")
|
||||||
|
field(DTYP, "asynInt32")
|
||||||
|
field(OUT, "@asyn($(PORT),0,$(TIMEOUT=1)) C_TIME")
|
||||||
|
field(FLNK, "$(INSTR)$(NAME):ETT")
|
||||||
|
}
|
||||||
|
|
||||||
|
################################################################################
|
||||||
|
# Read all monitors values
|
||||||
|
|
||||||
|
record(ai, "$(INSTR)$(NAME):ELAPSED-TIME")
|
||||||
|
{
|
||||||
|
field(DESC, "DAQ Measured Time")
|
||||||
|
field(EGU, "sec")
|
||||||
|
field(DTYP, "asynFloat64")
|
||||||
|
field(INP, "@asyn($(PORT),0,$(TIMEOUT=1)) TIME")
|
||||||
|
# field(SCAN, "I/O Intr")
|
||||||
|
field(PINI, "YES")
|
||||||
|
# field(FLNK, "$(INSTR)$(NAME):ETO")
|
||||||
|
}
|
||||||
|
|
||||||
|
################################################################################
|
||||||
|
# Stream Generator Status PVs
|
||||||
|
|
||||||
|
record(longin,"$(INSTR)$(NAME):UDP_DROPPED")
|
||||||
|
{
|
||||||
|
field(DESC, "UDP Packets Missed")
|
||||||
|
field(EGU, "Events")
|
||||||
|
field(DTYP, "asynInt32")
|
||||||
|
field(INP, "@asyn($(PORT),0,$(TIMEOUT=1)) DROP")
|
||||||
|
# field(SCAN, "I/O Intr")
|
||||||
|
field(SCAN, "1 second")
|
||||||
|
field(PINI, "YES")
|
||||||
|
}
|
||||||
|
|
||||||
|
record(longin,"$(INSTR)$(NAME):UDP_WATERMARK")
|
||||||
|
{
|
||||||
|
field(DESC, "UDP Queue Usage")
|
||||||
|
field(EGU, "%")
|
||||||
|
field(DTYP, "asynInt32")
|
||||||
|
field(INP, "@asyn($(PORT),0,$(TIMEOUT=1)) UDP")
|
||||||
|
# field(SCAN, "I/O Intr")
|
||||||
|
field(SCAN, "1 second")
|
||||||
|
field(PINI, "YES")
|
||||||
|
}
|
||||||
|
|
||||||
|
record(longin,"$(INSTR)$(NAME):SORTED_WATERMARK")
|
||||||
|
{
|
||||||
|
field(DESC, "Partial Sort Queue Usage")
|
||||||
|
field(EGU, "%")
|
||||||
|
field(DTYP, "asynInt32")
|
||||||
|
field(INP, "@asyn($(PORT),0,$(TIMEOUT=1)) SORT")
|
||||||
|
# field(SCAN, "I/O Intr")
|
||||||
|
field(SCAN, "1 second")
|
||||||
|
field(PINI, "YES")
|
||||||
|
}
|
||||||
1
dep/flatbuffers
Submodule
1
dep/flatbuffers
Submodule
Submodule dep/flatbuffers added at 1872409707
1
dep/streaming-data-types
Submodule
1
dep/streaming-data-types
Submodule
Submodule dep/streaming-data-types added at 3b1830faf2
@@ -1,4 +0,0 @@
|
|||||||
confluent-kafka==2.12.1
|
|
||||||
ess-streaming-data-types==0.27.0
|
|
||||||
flatbuffers==25.9.23
|
|
||||||
numpy==1.26.3
|
|
||||||
9
scripts/ioc.sh
Executable file
9
scripts/ioc.sh
Executable file
@@ -0,0 +1,9 @@
|
|||||||
|
#!/bin/bash
|
||||||
|
|
||||||
|
export EPICS_HOST_ARCH=linux-x86_64
|
||||||
|
export EPICS_BASE=/usr/local/epics/base-7.0.7
|
||||||
|
|
||||||
|
PARENT_PATH="$( cd "$(dirname "${BASH_SOURCE[0]}")" ; pwd -P )"
|
||||||
|
|
||||||
|
# /usr/local/bin/procServ -o -L - -f -i ^D^C 20001 "${PARENT_PATH}/st.cmd" -d
|
||||||
|
${PARENT_PATH}/st.cmd
|
||||||
38
scripts/st.cmd
Executable file
38
scripts/st.cmd
Executable file
@@ -0,0 +1,38 @@
|
|||||||
|
#!/usr/local/bin/iocsh
|
||||||
|
#-d
|
||||||
|
|
||||||
|
on error break
|
||||||
|
|
||||||
|
require StreamGenerator, test
|
||||||
|
|
||||||
|
epicsEnvSet("INSTR", "SQ:TEST:")
|
||||||
|
epicsEnvSet("NAME", "SG")
|
||||||
|
|
||||||
|
# Local UDP Generator Test Config
|
||||||
|
# drvAsynIPPortConfigure("ASYN_IP_PORT", "127.0.0.1:9071:54321 UDP", 0, 0, 1)
|
||||||
|
|
||||||
|
# Correlation Unit Config
|
||||||
|
drvAsynIPPortConfigure("ASYN_IP_PORT", "172.28.69.20:54321:54321 UDP", 0, 0, 1)
|
||||||
|
|
||||||
|
# With a udpQueue and sortQueue size of 10'000 packets, we can hold in memory
|
||||||
|
# 10'000 * 243 = 2.43e6 events
|
||||||
|
|
||||||
|
# Kafka Broker and Topic Configuration
|
||||||
|
# asynStreamGenerator("ASYN_SG", "ASYN_IP_PORT", 4, 10000, "linkafka01:9092", "NEWEFU_TEST", "NEWEFU_TEST2", 10000, 20480)
|
||||||
|
# asynStreamGenerator("ASYN_SG", "ASYN_IP_PORT", 4, 10000, "ess01:9092", "NEWEFU_TEST", "NEWEFU_TEST2", 10000, 20480)
|
||||||
|
|
||||||
|
# Don't send any kafka messages
|
||||||
|
asynStreamGenerator("ASYN_SG", "ASYN_IP_PORT", 4, 10000, "", "", "", 0, 0)
|
||||||
|
|
||||||
|
dbLoadRecords("$(StreamGenerator_DB)daq_common.db", "INSTR=$(INSTR), NAME=$(NAME), PORT=ASYN_SG, CHANNELS=5")
|
||||||
|
|
||||||
|
# Detector Count Channel
|
||||||
|
dbLoadRecords("$(StreamGenerator_DB)channels.db", "INSTR=$(INSTR), NAME=$(NAME), PORT=ASYN_SG, CHANNEL=0")
|
||||||
|
|
||||||
|
# Monitor Channels
|
||||||
|
dbLoadRecords("$(StreamGenerator_DB)channels.db", "INSTR=$(INSTR), NAME=$(NAME), PORT=ASYN_SG, CHANNEL=1")
|
||||||
|
dbLoadRecords("$(StreamGenerator_DB)channels.db", "INSTR=$(INSTR), NAME=$(NAME), PORT=ASYN_SG, CHANNEL=2")
|
||||||
|
dbLoadRecords("$(StreamGenerator_DB)channels.db", "INSTR=$(INSTR), NAME=$(NAME), PORT=ASYN_SG, CHANNEL=3")
|
||||||
|
dbLoadRecords("$(StreamGenerator_DB)channels.db", "INSTR=$(INSTR), NAME=$(NAME), PORT=ASYN_SG, CHANNEL=4")
|
||||||
|
|
||||||
|
iocInit()
|
||||||
118
scripts/udp_gen.py
Normal file
118
scripts/udp_gen.py
Normal file
@@ -0,0 +1,118 @@
|
|||||||
|
import socket
|
||||||
|
import time
|
||||||
|
import random
|
||||||
|
|
||||||
|
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
||||||
|
|
||||||
|
header = [
|
||||||
|
0, 0, # buffer length in 16bit words (1, 0) == 1, (0, 1) == 256
|
||||||
|
0, 0x80, # buffer type (probably should be 0)
|
||||||
|
21, 0, # header length
|
||||||
|
0, 0, # buffer number
|
||||||
|
0, 0, # run id
|
||||||
|
0x3, # status
|
||||||
|
0, # id of sending module
|
||||||
|
0, 0, # timestamp low
|
||||||
|
0, 0, # timestamp mid
|
||||||
|
0, 0, # timestamp high
|
||||||
|
] + [0, 0] * 12 # parameters
|
||||||
|
|
||||||
|
data = [
|
||||||
|
0,
|
||||||
|
0,
|
||||||
|
0,
|
||||||
|
0,
|
||||||
|
0,
|
||||||
|
0
|
||||||
|
]
|
||||||
|
|
||||||
|
start_time = time.time_ns() // 100
|
||||||
|
|
||||||
|
buffer_ids = {
|
||||||
|
i: (0, 0) for i in range(10)
|
||||||
|
}
|
||||||
|
|
||||||
|
while True:
|
||||||
|
|
||||||
|
# update timestamp
|
||||||
|
base_timestamp = time.time_ns() // 100 - start_time
|
||||||
|
t_low = base_timestamp & 0xffff
|
||||||
|
t_mid = (base_timestamp >> 16) & 0xffff
|
||||||
|
t_high = (base_timestamp >> 32) & 0xffff
|
||||||
|
header[12] = t_low & 0xff
|
||||||
|
header[13] = t_low >> 8
|
||||||
|
header[14] = t_mid & 0xff
|
||||||
|
header[15] = t_mid >> 8
|
||||||
|
header[16] = t_high & 0xff
|
||||||
|
header[17] = t_high >> 8
|
||||||
|
|
||||||
|
num_events = random.randint(0, 243)
|
||||||
|
# num_events = 243
|
||||||
|
# num_events = 1
|
||||||
|
|
||||||
|
# update buffer length
|
||||||
|
buffer_length = 21 + num_events * 3
|
||||||
|
header[0] = buffer_length & 0xff
|
||||||
|
header[1] = (buffer_length >> 8) & 0xff
|
||||||
|
|
||||||
|
# I believe, that in our case we never mix monitor and detector events as
|
||||||
|
# the monitors should have id 0 and the detector events 1-9 so I have
|
||||||
|
# excluded that posibility here. That would, however, if true mean we could
|
||||||
|
# reduce also the number of checks on the parsing side of things...
|
||||||
|
|
||||||
|
is_monitor = random.randint(0, 9)
|
||||||
|
# is_monitor = 4
|
||||||
|
|
||||||
|
header[11] = 0 if is_monitor > 3 else random.randint(1,9)
|
||||||
|
|
||||||
|
# update buffer number (each mcpdid has its own buffer number count)
|
||||||
|
header[6], header[7] = buffer_ids[header[11]]
|
||||||
|
header[6] = (header[6] + 1) % (0xff + 1)
|
||||||
|
header[7] = (header[7] + (header[6] == 0)) % (0xff + 1)
|
||||||
|
buffer_ids[header[11]] = header[6], header[7]
|
||||||
|
|
||||||
|
tosend = list(header)
|
||||||
|
|
||||||
|
if is_monitor > 3:
|
||||||
|
|
||||||
|
for i in range(num_events):
|
||||||
|
d = list(data)
|
||||||
|
|
||||||
|
monitor = random.randint(0,3)
|
||||||
|
# monitor = 0
|
||||||
|
|
||||||
|
d[5] = (1 << 7) | monitor
|
||||||
|
|
||||||
|
# update trigger timestamp
|
||||||
|
event_timestamp = (time.time_ns() // 100) - base_timestamp
|
||||||
|
d[0] = event_timestamp & 0xff
|
||||||
|
d[1] = (event_timestamp >> 8) & 0xff
|
||||||
|
d[2] = (event_timestamp >> 16) & 0x07
|
||||||
|
|
||||||
|
tosend += d
|
||||||
|
|
||||||
|
else:
|
||||||
|
|
||||||
|
for i in range(num_events):
|
||||||
|
d = list(data)
|
||||||
|
|
||||||
|
amplitude = random.randint(0, 255)
|
||||||
|
x_pos = random.randint(0, 1023)
|
||||||
|
y_pos = random.randint(0, 1023)
|
||||||
|
event_timestamp = (time.time_ns() // 100) - base_timestamp
|
||||||
|
|
||||||
|
d[5] = (0 << 7) | (amplitude >> 1)
|
||||||
|
d[4] = ((amplitude & 0x01) << 7) | (y_pos >> 3)
|
||||||
|
d[3] = ((y_pos << 5) & 0xE0) | (x_pos >> 5)
|
||||||
|
d[2] = ((x_pos << 3) & 0xF8)
|
||||||
|
|
||||||
|
d[0] = event_timestamp & 0xff
|
||||||
|
d[1] = (event_timestamp >> 8) & 0xff
|
||||||
|
d[2] |= (event_timestamp >> 16) & 0x07
|
||||||
|
|
||||||
|
tosend += d
|
||||||
|
|
||||||
|
sock.sendto(bytes(tosend), ('127.0.0.1', 54321))
|
||||||
|
mv = memoryview(bytes(header)).cast('H')
|
||||||
|
print(f'Sent packet {mv[3]} with {num_events} events {base_timestamp}')
|
||||||
|
# time.sleep(.01)
|
||||||
910
src/asynStreamGeneratorDriver.cpp
Normal file
910
src/asynStreamGeneratorDriver.cpp
Normal file
@@ -0,0 +1,910 @@
|
|||||||
|
#include "asynOctetSyncIO.h"
|
||||||
|
#include "ev42_events_generated.h"
|
||||||
|
#include <cmath>
|
||||||
|
#include <cstring>
|
||||||
|
#include <epicsStdio.h>
|
||||||
|
#include <iocsh.h>
|
||||||
|
#include <queue>
|
||||||
|
|
||||||
|
// Just for printing
|
||||||
|
#define __STDC_FORMAT_MACROS
|
||||||
|
#include <inttypes.h>
|
||||||
|
|
||||||
|
#include "asynStreamGeneratorDriver.h"
|
||||||
|
#include <epicsExport.h>
|
||||||
|
|
||||||
|
/*******************************************************************************
|
||||||
|
* Kafka Methods
|
||||||
|
*/
|
||||||
|
|
||||||
|
static void set_kafka_config_key(rd_kafka_conf_t *conf, char *key,
|
||||||
|
char *value) {
|
||||||
|
char errstr[512];
|
||||||
|
rd_kafka_conf_res_t res;
|
||||||
|
|
||||||
|
res = rd_kafka_conf_set(conf, key, value, errstr, sizeof(errstr));
|
||||||
|
if (res != RD_KAFKA_CONF_OK) {
|
||||||
|
epicsStdoutPrintf("Failed to set config value %s : %s\n", key, value);
|
||||||
|
exit(1);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
static rd_kafka_t *create_kafka_producer(const char *kafkaBroker) {
|
||||||
|
|
||||||
|
char errstr[512];
|
||||||
|
rd_kafka_t *producer;
|
||||||
|
|
||||||
|
// Prepare configuration object
|
||||||
|
rd_kafka_conf_t *conf = rd_kafka_conf_new();
|
||||||
|
// TODO feel not great about this
|
||||||
|
set_kafka_config_key(conf, "bootstrap.servers",
|
||||||
|
const_cast<char *>(kafkaBroker));
|
||||||
|
set_kafka_config_key(conf, "queue.buffering.max.messages", "10000000");
|
||||||
|
// With 2e6 counts / s
|
||||||
|
// and a packet size of 20480 events (163920 bytes)
|
||||||
|
// this implies we need to send around 100 messages a second
|
||||||
|
// and need about .2 gigabit upload
|
||||||
|
// set_kafka_config_key(conf, "queue.buffering.max.kbytes", "10000000");
|
||||||
|
|
||||||
|
// Create the Producer
|
||||||
|
producer = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, sizeof(errstr));
|
||||||
|
|
||||||
|
if (!producer) {
|
||||||
|
epicsStdoutPrintf("Failed to create Kafka Producer: %s\n", errstr);
|
||||||
|
exit(1);
|
||||||
|
}
|
||||||
|
|
||||||
|
return producer;
|
||||||
|
}
|
||||||
|
|
||||||
|
/*******************************************************************************
|
||||||
|
* Static Methods Passed to Epics Threads that should run in the background
|
||||||
|
*/
|
||||||
|
static void udpPollerTask(void *drvPvt) {
|
||||||
|
asynStreamGeneratorDriver *pSGD = (asynStreamGeneratorDriver *)drvPvt;
|
||||||
|
pSGD->receiveUDP();
|
||||||
|
}
|
||||||
|
|
||||||
|
static void udpNormaliserTask(void *drvPvt) {
|
||||||
|
asynStreamGeneratorDriver *pSGD = (asynStreamGeneratorDriver *)drvPvt;
|
||||||
|
pSGD->normaliseUDP();
|
||||||
|
}
|
||||||
|
|
||||||
|
static void sortTask(void *drvPvt) {
|
||||||
|
asynStreamGeneratorDriver *pSGD = (asynStreamGeneratorDriver *)drvPvt;
|
||||||
|
pSGD->partialSortEvents();
|
||||||
|
}
|
||||||
|
|
||||||
|
static void daqTask(void *drvPvt) {
|
||||||
|
asynStreamGeneratorDriver *pSGD = (asynStreamGeneratorDriver *)drvPvt;
|
||||||
|
pSGD->processEvents();
|
||||||
|
}
|
||||||
|
|
||||||
|
static void monitorProducerTask(void *drvPvt) {
|
||||||
|
asynStreamGeneratorDriver *pSGD = (asynStreamGeneratorDriver *)drvPvt;
|
||||||
|
pSGD->produceMonitor();
|
||||||
|
}
|
||||||
|
|
||||||
|
static void detectorProducerTask(void *drvPvt) {
|
||||||
|
asynStreamGeneratorDriver *pSGD = (asynStreamGeneratorDriver *)drvPvt;
|
||||||
|
pSGD->produceDetector();
|
||||||
|
}
|
||||||
|
|
||||||
|
/*******************************************************************************
|
||||||
|
* Stream Generator Helper Methods
|
||||||
|
*/
|
||||||
|
|
||||||
|
asynStatus asynStreamGeneratorDriver::createInt32Param(
|
||||||
|
asynStatus status, char *name, int *variable, epicsInt32 initialValue) {
|
||||||
|
// TODO should show error if there is one
|
||||||
|
return (asynStatus)(status | createParam(name, asynParamInt32, variable) |
|
||||||
|
setIntegerParam(*variable, initialValue));
|
||||||
|
}
|
||||||
|
|
||||||
|
asynStatus asynStreamGeneratorDriver::createInt64Param(
|
||||||
|
asynStatus status, char *name, int *variable, epicsInt64 initialValue) {
|
||||||
|
// TODO should show error if there is one
|
||||||
|
return (asynStatus)(status | createParam(name, asynParamInt64, variable) |
|
||||||
|
setInteger64Param(*variable, initialValue));
|
||||||
|
}
|
||||||
|
|
||||||
|
asynStatus asynStreamGeneratorDriver::createFloat64Param(asynStatus status,
|
||||||
|
char *name,
|
||||||
|
int *variable,
|
||||||
|
double initialValue) {
|
||||||
|
// TODO should show error if there is one
|
||||||
|
return (asynStatus)(status | createParam(name, asynParamFloat64, variable) |
|
||||||
|
setDoubleParam(*variable, initialValue));
|
||||||
|
}
|
||||||
|
|
||||||
|
/*******************************************************************************
|
||||||
|
* Stream Generator Methods
|
||||||
|
*/
|
||||||
|
asynStreamGeneratorDriver::asynStreamGeneratorDriver(
|
||||||
|
const char *portName, const char *ipPortName, const int numChannels,
|
||||||
|
const int udpQueueSize, const bool enableKafkaStream,
|
||||||
|
const char *kafkaBroker, const char *monitorTopic,
|
||||||
|
const char *detectorTopic, const int kafkaQueueSize,
|
||||||
|
const int kafkaMaxPacketSize)
|
||||||
|
: asynPortDriver(portName, 1, /* maxAddr */
|
||||||
|
asynInt32Mask | asynInt64Mask | asynFloat64Mask |
|
||||||
|
asynDrvUserMask, /* Interface mask */
|
||||||
|
asynInt32Mask, // | asynFloat64Mask, /* Interrupt mask */
|
||||||
|
0, /* asynFlags. This driver does not block and it is
|
||||||
|
not multi-device, but has a
|
||||||
|
destructor ASYN_DESTRUCTIBLE our version of the Asyn
|
||||||
|
is too old to support this flag */
|
||||||
|
1, /* Autoconnect */
|
||||||
|
0, /* Default priority */
|
||||||
|
0), /* Default stack size*/
|
||||||
|
num_channels(numChannels + 1), kafkaEnabled(enableKafkaStream),
|
||||||
|
monitorTopic(monitorTopic), detectorTopic(detectorTopic),
|
||||||
|
udpQueueSize(udpQueueSize), kafkaQueueSize(kafkaQueueSize),
|
||||||
|
// measured in max packet sizes
|
||||||
|
udpQueue(
|
||||||
|
epicsRingBytesCreate(243 * udpQueueSize * sizeof(NormalisedEvent))),
|
||||||
|
normalisedQueue(
|
||||||
|
epicsRingBytesCreate(243 * udpQueueSize * sizeof(NormalisedEvent))),
|
||||||
|
// TODO configurable sizes
|
||||||
|
sortedQueue(
|
||||||
|
epicsRingBytesCreate(243 * udpQueueSize * sizeof(NormalisedEvent))),
|
||||||
|
monitorQueue(
|
||||||
|
epicsRingBytesCreate(243 * kafkaQueueSize * sizeof(NormalisedEvent))),
|
||||||
|
detectorQueue(
|
||||||
|
epicsRingBytesCreate(243 * kafkaQueueSize * sizeof(NormalisedEvent))),
|
||||||
|
kafkaMaxPacketSize(kafkaMaxPacketSize) {
|
||||||
|
const char *functionName = "asynStreamGeneratorDriver";
|
||||||
|
|
||||||
|
// Parameter Setup
|
||||||
|
asynStatus status = asynSuccess;
|
||||||
|
|
||||||
|
status = createInt32Param(status, P_StatusString, &P_Status, STATUS_IDLE);
|
||||||
|
status = createInt32Param(status, P_ResetString, &P_Reset);
|
||||||
|
status = createInt32Param(status, P_StopString, &P_Stop);
|
||||||
|
status = createInt32Param(status, P_CountPresetString, &P_CountPreset);
|
||||||
|
status = createInt32Param(status, P_TimePresetString, &P_TimePreset);
|
||||||
|
status = createFloat64Param(status, P_ElapsedTimeString, &P_ElapsedTime);
|
||||||
|
status =
|
||||||
|
createInt32Param(status, P_ClearElapsedTimeString, &P_ClearElapsedTime);
|
||||||
|
status =
|
||||||
|
createInt32Param(status, P_MonitorChannelString, &P_MonitorChannel);
|
||||||
|
status = createInt32Param(status, P_ThresholdString, &P_Threshold, 1);
|
||||||
|
status = createInt32Param(status, P_ThresholdChannelString,
|
||||||
|
&P_ThresholdChannel, 1);
|
||||||
|
|
||||||
|
// Create Parameters templated on Channel Number
|
||||||
|
char pv_name_buffer[100];
|
||||||
|
P_Counts = new int[this->num_channels];
|
||||||
|
P_Rates = new int[this->num_channels];
|
||||||
|
P_ClearCounts = new int[this->num_channels];
|
||||||
|
for (std::size_t i = 0; i < this->num_channels; ++i) {
|
||||||
|
memset(pv_name_buffer, 0, 100);
|
||||||
|
epicsSnprintf(pv_name_buffer, 100, P_CountsString, i);
|
||||||
|
status = createInt64Param(status, pv_name_buffer, P_Counts + i);
|
||||||
|
|
||||||
|
memset(pv_name_buffer, 0, 100);
|
||||||
|
epicsSnprintf(pv_name_buffer, 100, P_RateString, i);
|
||||||
|
status = createInt32Param(status, pv_name_buffer, P_Rates + i);
|
||||||
|
|
||||||
|
memset(pv_name_buffer, 0, 100);
|
||||||
|
epicsSnprintf(pv_name_buffer, 100, P_ClearCountsString, i);
|
||||||
|
status = createInt32Param(status, pv_name_buffer, P_ClearCounts + i);
|
||||||
|
}
|
||||||
|
|
||||||
|
status = createInt32Param(status, P_UdpDroppedString, &P_UdpDropped);
|
||||||
|
status = createInt32Param(status, P_UdpQueueHighWaterMarkString,
|
||||||
|
&P_UdpQueueHighWaterMark);
|
||||||
|
status = createInt32Param(status, P_SortedQueueHighWaterMarkString,
|
||||||
|
&P_SortedQueueHighWaterMark);
|
||||||
|
|
||||||
|
if (status) {
|
||||||
|
epicsStdoutPrintf(
|
||||||
|
"%s:%s: failed to create or setup parameters, status=%d\n",
|
||||||
|
driverName, functionName, status);
|
||||||
|
exit(1);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Create Events
|
||||||
|
// this->pausedEventId = epicsEventCreate(epicsEventEmpty);
|
||||||
|
|
||||||
|
if (enableKafkaStream) {
|
||||||
|
|
||||||
|
epicsStdoutPrintf(
|
||||||
|
"Detector Kafka Config: broker=%s, topic=%s\n "
|
||||||
|
" queue size:%d, max events per packet: %d\n",
|
||||||
|
kafkaBroker, this->detectorTopic, kafkaQueueSize,
|
||||||
|
this->kafkaMaxPacketSize);
|
||||||
|
|
||||||
|
epicsStdoutPrintf(
|
||||||
|
"Monitors Kafka Config: broker=%s, topic=%s\n "
|
||||||
|
" queue size:%d, max events per packet: %d\n",
|
||||||
|
kafkaBroker, this->monitorTopic, kafkaQueueSize,
|
||||||
|
this->kafkaMaxPacketSize);
|
||||||
|
|
||||||
|
this->monitorProducer = create_kafka_producer(kafkaBroker);
|
||||||
|
this->detectorProducer = create_kafka_producer(kafkaBroker);
|
||||||
|
|
||||||
|
// Setup for Thread Producing Monitor Kafka Events
|
||||||
|
status =
|
||||||
|
(asynStatus)(epicsThreadCreate(
|
||||||
|
"monitor_produce", epicsThreadPriorityMedium,
|
||||||
|
epicsThreadGetStackSize(epicsThreadStackMedium),
|
||||||
|
(EPICSTHREADFUNC)::monitorProducerTask,
|
||||||
|
this) == NULL);
|
||||||
|
if (status) {
|
||||||
|
epicsStdoutPrintf("%s:%s: epicsThreadCreate failure, status=%d\n",
|
||||||
|
driverName, functionName, status);
|
||||||
|
exit(1);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Setup for Thread Producing Detector Kafka Events
|
||||||
|
status =
|
||||||
|
(asynStatus)(epicsThreadCreate(
|
||||||
|
"monitor_produce", epicsThreadPriorityMedium,
|
||||||
|
epicsThreadGetStackSize(epicsThreadStackMedium),
|
||||||
|
(EPICSTHREADFUNC)::detectorProducerTask,
|
||||||
|
this) == NULL);
|
||||||
|
if (status) {
|
||||||
|
epicsStdoutPrintf("%s:%s: epicsThreadCreate failure, status=%d\n",
|
||||||
|
driverName, functionName, status);
|
||||||
|
exit(1);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
|
||||||
|
epicsStdoutPrintf("Kafka Stream Disabled\n");
|
||||||
|
}
|
||||||
|
|
||||||
|
/* Create the thread that orders the events and acts as our sinqDaq stand-in
|
||||||
|
*/
|
||||||
|
status =
|
||||||
|
(asynStatus)(epicsThreadCreate(
|
||||||
|
"sinqDAQ",
|
||||||
|
epicsThreadPriorityMedium, // epicsThreadPriorityMax,
|
||||||
|
epicsThreadGetStackSize(epicsThreadStackMedium),
|
||||||
|
(EPICSTHREADFUNC)::daqTask, this) == NULL);
|
||||||
|
if (status) {
|
||||||
|
epicsStdoutPrintf("%s:%s: epicsThreadCreate failure, status=%d\n",
|
||||||
|
driverName, functionName, status);
|
||||||
|
exit(1);
|
||||||
|
}
|
||||||
|
|
||||||
|
/* Create the thread that orders packets of in preparation for our sinqDAQ
|
||||||
|
* stand-in
|
||||||
|
*/
|
||||||
|
status = (asynStatus)(epicsThreadCreate(
|
||||||
|
"partialSort", epicsThreadPriorityMedium,
|
||||||
|
epicsThreadGetStackSize(epicsThreadStackMedium),
|
||||||
|
(EPICSTHREADFUNC)::sortTask, this) == NULL);
|
||||||
|
if (status) {
|
||||||
|
epicsStdoutPrintf("%s:%s: epicsThreadCreate failure, status=%d\n",
|
||||||
|
driverName, functionName, status);
|
||||||
|
exit(1);
|
||||||
|
}
|
||||||
|
|
||||||
|
/* Create the thread normalises the events
|
||||||
|
*/
|
||||||
|
status =
|
||||||
|
(asynStatus)(epicsThreadCreate(
|
||||||
|
"eventNormaliser",
|
||||||
|
epicsThreadPriorityMedium, // epicsThreadPriorityMax,
|
||||||
|
epicsThreadGetStackSize(epicsThreadStackMedium),
|
||||||
|
(EPICSTHREADFUNC)::udpNormaliserTask, this) == NULL);
|
||||||
|
if (status) {
|
||||||
|
epicsStdoutPrintf("%s:%s: epicsThreadCreate failure, status=%d\n",
|
||||||
|
driverName, functionName, status);
|
||||||
|
exit(1);
|
||||||
|
}
|
||||||
|
|
||||||
|
// UDP Receive Setup
|
||||||
|
status = pasynOctetSyncIO->connect(ipPortName, 0, &pasynUDPUser, NULL);
|
||||||
|
|
||||||
|
if (status) {
|
||||||
|
epicsStdoutPrintf("%s:%s: Couldn't open connection %s, status=%d\n",
|
||||||
|
driverName, functionName, ipPortName, status);
|
||||||
|
exit(1);
|
||||||
|
}
|
||||||
|
|
||||||
|
/* Create the thread that receives UDP traffic in the background */
|
||||||
|
status = (asynStatus)(epicsThreadCreate(
|
||||||
|
"udp_receive", epicsThreadPriorityMax,
|
||||||
|
epicsThreadGetStackSize(epicsThreadStackMedium),
|
||||||
|
(EPICSTHREADFUNC)::udpPollerTask, this) == NULL);
|
||||||
|
if (status) {
|
||||||
|
epicsStdoutPrintf("%s:%s: epicsThreadCreate failure, status=%d\n",
|
||||||
|
driverName, functionName, status);
|
||||||
|
exit(1);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
asynStreamGeneratorDriver::~asynStreamGeneratorDriver() {
|
||||||
|
// should make sure queues are empty and freed
|
||||||
|
// and that the kafka producers are flushed and freed
|
||||||
|
delete[] P_Counts;
|
||||||
|
delete[] P_Rates;
|
||||||
|
|
||||||
|
// TODO add exit should perhaps ensure the queue is flushed
|
||||||
|
// rd_kafka_poll(producer, 0);
|
||||||
|
// epicsStdoutPrintf("Kafka Queue Size %d\n", rd_kafka_outq_len(producer));
|
||||||
|
// rd_kafka_flush(producer, 10 * 1000);
|
||||||
|
// epicsStdoutPrintf("Kafka Queue Size %d\n", rd_kafka_outq_len(producer));
|
||||||
|
}
|
||||||
|
|
||||||
|
asynStatus asynStreamGeneratorDriver::readInt32(asynUser *pasynUser,
|
||||||
|
epicsInt32 *value) {
|
||||||
|
|
||||||
|
int function = pasynUser->reason;
|
||||||
|
asynStatus status = asynSuccess;
|
||||||
|
const char *paramName;
|
||||||
|
const char *functionName = "readInt32";
|
||||||
|
getParamName(function, ¶mName);
|
||||||
|
|
||||||
|
if (function == P_UdpQueueHighWaterMark) {
|
||||||
|
const double toPercent = 100. / (243. * udpQueueSize);
|
||||||
|
*value = (epicsInt32)(epicsRingBytesHighWaterMark(this->udpQueue) /
|
||||||
|
sizeof(NormalisedEvent) * toPercent);
|
||||||
|
// Aparently resetting the watermark causes problems...
|
||||||
|
// at least concurrently :D
|
||||||
|
// epicsRingBytesResetHighWaterMark(this->udpQueue);
|
||||||
|
return asynSuccess;
|
||||||
|
} else if (function == P_SortedQueueHighWaterMark) {
|
||||||
|
const double toPercent = 100. / (243. * udpQueueSize);
|
||||||
|
*value = (epicsInt32)(epicsRingBytesHighWaterMark(this->sortedQueue) /
|
||||||
|
sizeof(NormalisedEvent) * toPercent);
|
||||||
|
// epicsRingBytesResetHighWaterMark(this->sortedQueue);
|
||||||
|
return asynSuccess;
|
||||||
|
}
|
||||||
|
|
||||||
|
return asynPortDriver::readInt32(pasynUser, value);
|
||||||
|
}
|
||||||
|
|
||||||
|
asynStatus asynStreamGeneratorDriver::writeInt32(asynUser *pasynUser,
|
||||||
|
epicsInt32 value) {
|
||||||
|
int function = pasynUser->reason;
|
||||||
|
asynStatus status = asynSuccess;
|
||||||
|
const char *paramName;
|
||||||
|
const char *functionName = "writeInt32";
|
||||||
|
getParamName(function, ¶mName);
|
||||||
|
|
||||||
|
// TODO should maybe lock mutex for this
|
||||||
|
epicsInt32 currentStatus;
|
||||||
|
status = getIntegerParam(this->P_Status, ¤tStatus);
|
||||||
|
|
||||||
|
if (status) {
|
||||||
|
epicsSnprintf(pasynUser->errorMessage, pasynUser->errorMessageSize,
|
||||||
|
"%s:%s: status=%d, function=%d, name=%s, value=%d",
|
||||||
|
driverName, functionName, status, function, paramName,
|
||||||
|
value);
|
||||||
|
return status;
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO clean up
|
||||||
|
bool isClearCount = false;
|
||||||
|
size_t channelToClear;
|
||||||
|
for (size_t i = 0; i < this->num_channels; ++i) {
|
||||||
|
isClearCount |= function == P_ClearCounts[i];
|
||||||
|
if (isClearCount) {
|
||||||
|
channelToClear = i;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO should check everything...
|
||||||
|
if (function == P_CountPreset) {
|
||||||
|
if (!currentStatus) {
|
||||||
|
setIntegerParam(function, value);
|
||||||
|
setIntegerParam(P_Status, STATUS_COUNTING);
|
||||||
|
status = (asynStatus)callParamCallbacks();
|
||||||
|
} else {
|
||||||
|
return asynError;
|
||||||
|
}
|
||||||
|
} else if (function == P_TimePreset) {
|
||||||
|
if (!currentStatus) {
|
||||||
|
setIntegerParam(function, value);
|
||||||
|
setIntegerParam(P_Status, STATUS_COUNTING);
|
||||||
|
status = (asynStatus)callParamCallbacks();
|
||||||
|
} else {
|
||||||
|
return asynError;
|
||||||
|
}
|
||||||
|
} else if (function == P_ClearElapsedTime) {
|
||||||
|
if (!currentStatus) {
|
||||||
|
setIntegerParam(P_ElapsedTime, 0);
|
||||||
|
status = (asynStatus)callParamCallbacks();
|
||||||
|
} else {
|
||||||
|
return asynError;
|
||||||
|
}
|
||||||
|
} else if (isClearCount) {
|
||||||
|
if (!currentStatus) {
|
||||||
|
setInteger64Param(P_Counts[channelToClear], 0);
|
||||||
|
status = (asynStatus)callParamCallbacks();
|
||||||
|
} else {
|
||||||
|
return asynError;
|
||||||
|
}
|
||||||
|
} else if (function == P_Reset) {
|
||||||
|
lock();
|
||||||
|
// TODO should probably set back everything to defaults
|
||||||
|
setIntegerParam(P_Status, STATUS_IDLE);
|
||||||
|
status = (asynStatus)callParamCallbacks();
|
||||||
|
unlock();
|
||||||
|
} else if (function == P_Stop) {
|
||||||
|
lock();
|
||||||
|
setIntegerParam(P_Status, STATUS_IDLE);
|
||||||
|
status = (asynStatus)callParamCallbacks();
|
||||||
|
unlock();
|
||||||
|
} else if (function == P_MonitorChannel) {
|
||||||
|
if (!currentStatus) {
|
||||||
|
setIntegerParam(function, value);
|
||||||
|
status = (asynStatus)callParamCallbacks();
|
||||||
|
} else {
|
||||||
|
return asynError;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
setIntegerParam(function, value);
|
||||||
|
status = (asynStatus)callParamCallbacks();
|
||||||
|
}
|
||||||
|
|
||||||
|
if (status)
|
||||||
|
epicsSnprintf(pasynUser->errorMessage, pasynUser->errorMessageSize,
|
||||||
|
"%s:%s: status=%d, function=%d, name=%s, value=%d",
|
||||||
|
driverName, functionName, status, function, paramName,
|
||||||
|
value);
|
||||||
|
return status;
|
||||||
|
}
|
||||||
|
|
||||||
|
void asynStreamGeneratorDriver::receiveUDP() {
|
||||||
|
|
||||||
|
const char *functionName = "receiveUDP";
|
||||||
|
asynStatus status = asynSuccess;
|
||||||
|
// int isConnected = 1;
|
||||||
|
std::size_t received;
|
||||||
|
int eomReason;
|
||||||
|
|
||||||
|
const std::size_t bufferSize = 1500;
|
||||||
|
char buffer[bufferSize];
|
||||||
|
|
||||||
|
while (true) {
|
||||||
|
|
||||||
|
// status = pasynManager->isConnected(pasynUDPUser, &isConnected);
|
||||||
|
|
||||||
|
// if (!isConnected)
|
||||||
|
// asynPrint(pasynUserSelf, ASYN_TRACE_ERROR,
|
||||||
|
// "%s:%s: isConnected = %d\n", driverName, functionName,
|
||||||
|
// isConnected);
|
||||||
|
|
||||||
|
status = pasynOctetSyncIO->read(pasynUDPUser, buffer, bufferSize,
|
||||||
|
0, // timeout
|
||||||
|
&received, &eomReason);
|
||||||
|
|
||||||
|
if (received) {
|
||||||
|
const uint16_t bufferLength = ((uint16_t *)buffer)[0];
|
||||||
|
const std::size_t headerLength = 42;
|
||||||
|
|
||||||
|
if (received >= headerLength && received == bufferLength * 2) {
|
||||||
|
|
||||||
|
epicsRingBytesPut(this->udpQueue, (char *)buffer, bufferSize);
|
||||||
|
|
||||||
|
} else {
|
||||||
|
asynPrint(pasynUserSelf, ASYN_TRACE_ERROR,
|
||||||
|
"%s:%s: invalid UDP packet\n", driverName,
|
||||||
|
functionName);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void asynStreamGeneratorDriver::normaliseUDP() {
|
||||||
|
|
||||||
|
// TODO fix time overflows
|
||||||
|
// Regarding time overflow.
|
||||||
|
// * the header time stamp is 3 words, i.e. 48 bits.
|
||||||
|
// * it has a resolution of 100ns
|
||||||
|
// * so we can cover a maximum of (2^(3*16) - 1) * 1e-7 = 28147497 seconds
|
||||||
|
// * or about 325 days
|
||||||
|
// * so maybe this isn't necessary to solve, as long as we restart the
|
||||||
|
// electronics at least once a year...
|
||||||
|
|
||||||
|
const char *functionName = "normaliseUDP";
|
||||||
|
asynStatus status = asynSuccess;
|
||||||
|
int isConnected = 1;
|
||||||
|
std::size_t received;
|
||||||
|
int eomReason;
|
||||||
|
|
||||||
|
// The correlation unit sends messages with a maximum size of 1500 bytes.
|
||||||
|
// These messages don't have any obious start or end to synchronise
|
||||||
|
// against...
|
||||||
|
const std::size_t bufferSize = 1500;
|
||||||
|
char buffer[bufferSize];
|
||||||
|
|
||||||
|
const std::size_t resultBufferSize = 243;
|
||||||
|
NormalisedEvent resultBuffer[resultBufferSize];
|
||||||
|
|
||||||
|
// We have 10 mcpdids
|
||||||
|
uint64_t lastBufferNumber[10];
|
||||||
|
for (size_t i = 0; i < 10; ++i) {
|
||||||
|
lastBufferNumber[i] = 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
epicsInt32 droppedMessages = 0;
|
||||||
|
|
||||||
|
const UDPHeader *header;
|
||||||
|
const DetectorEvent *d_event;
|
||||||
|
const MonitorEvent *m_event;
|
||||||
|
NormalisedEvent ne;
|
||||||
|
|
||||||
|
while (true) {
|
||||||
|
|
||||||
|
if (epicsRingBytesUsedBytes(this->udpQueue) > 1500) {
|
||||||
|
|
||||||
|
epicsRingBytesGet(this->udpQueue, (char *)buffer, bufferSize);
|
||||||
|
|
||||||
|
header = (UDPHeader *)buffer;
|
||||||
|
const std::size_t total_events = (header->BufferLength - 21) / 3;
|
||||||
|
|
||||||
|
if (header->BufferNumber - lastBufferNumber[header->McpdID] > 1 &&
|
||||||
|
lastBufferNumber[header->McpdID] !=
|
||||||
|
std::numeric_limits<
|
||||||
|
decltype(header->BufferNumber)>::max()) {
|
||||||
|
asynPrint(pasynUserSelf, ASYN_TRACE_ERROR,
|
||||||
|
"%s:%s: missed packet on id: %d. Received: %" PRIu64
|
||||||
|
", last: %" PRIu64 "\n",
|
||||||
|
driverName, functionName, header->McpdID,
|
||||||
|
header->BufferNumber,
|
||||||
|
lastBufferNumber[header->McpdID]);
|
||||||
|
setIntegerParam(P_UdpDropped, ++droppedMessages);
|
||||||
|
}
|
||||||
|
|
||||||
|
lastBufferNumber[header->McpdID] = header->BufferNumber;
|
||||||
|
|
||||||
|
for (std::size_t i = 0; i < total_events; ++i) {
|
||||||
|
char *event = (buffer + 21 * 2 + i * 6);
|
||||||
|
const bool isMonitorEvent = event[5] & 0x80;
|
||||||
|
|
||||||
|
if (isMonitorEvent) {
|
||||||
|
m_event = (MonitorEvent *)event;
|
||||||
|
ne.timestamp =
|
||||||
|
header->nanosecs() + (uint64_t)m_event->nanosecs();
|
||||||
|
ne.source = 0;
|
||||||
|
ne.pixelId = m_event->DataID;
|
||||||
|
|
||||||
|
} else {
|
||||||
|
d_event = (DetectorEvent *)event;
|
||||||
|
ne.timestamp =
|
||||||
|
header->nanosecs() + (uint64_t)d_event->nanosecs();
|
||||||
|
ne.source = header->McpdID;
|
||||||
|
ne.pixelId = d_event->pixelId(header->McpdID);
|
||||||
|
}
|
||||||
|
|
||||||
|
resultBuffer[i] = ne;
|
||||||
|
}
|
||||||
|
|
||||||
|
epicsRingBytesPut(this->normalisedQueue, (char *)resultBuffer,
|
||||||
|
total_events * sizeof(NormalisedEvent));
|
||||||
|
|
||||||
|
} else {
|
||||||
|
epicsThreadSleep(0.0001); // seconds
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
struct {
|
||||||
|
bool operator()(const NormalisedEvent l, const NormalisedEvent r) const {
|
||||||
|
return l.timestamp < r.timestamp;
|
||||||
|
}
|
||||||
|
} oldestEventsFirst;
|
||||||
|
|
||||||
|
inline int eventsInQueue(epicsRingBytesId id) {
|
||||||
|
return epicsRingBytesUsedBytes(id) / sizeof(NormalisedEvent);
|
||||||
|
}
|
||||||
|
|
||||||
|
void asynStreamGeneratorDriver::partialSortEvents() {
|
||||||
|
|
||||||
|
const char *functionName = "partialSortEvents";
|
||||||
|
|
||||||
|
// x * number of ids * max events in packet
|
||||||
|
int bufferedEvents = 5 * 10 * 243;
|
||||||
|
NormalisedEvent events[bufferedEvents];
|
||||||
|
|
||||||
|
int queuedEvents = 0;
|
||||||
|
epicsTimeStamp lastSort = epicsTime::getCurrent();
|
||||||
|
epicsTimeStamp currentTime = lastSort;
|
||||||
|
|
||||||
|
while (true) {
|
||||||
|
|
||||||
|
queuedEvents =
|
||||||
|
eventsInQueue(this->normalisedQueue); // in case we can't wait
|
||||||
|
lastSort = epicsTime::getCurrent();
|
||||||
|
currentTime = lastSort;
|
||||||
|
|
||||||
|
// wait for mininmum packet frequency or enough packets to ensure we
|
||||||
|
// could potentially have at least 1 packet per mcpdid
|
||||||
|
while (queuedEvents < bufferedEvents &&
|
||||||
|
epicsTimeDiffInNS(¤tTime, &lastSort) < 250'000'000ull) {
|
||||||
|
epicsThreadSleep(0.0001); // seconds
|
||||||
|
currentTime = epicsTime::getCurrent();
|
||||||
|
queuedEvents = eventsInQueue(this->normalisedQueue);
|
||||||
|
}
|
||||||
|
|
||||||
|
queuedEvents = std::min(queuedEvents, bufferedEvents);
|
||||||
|
|
||||||
|
if (queuedEvents) {
|
||||||
|
epicsRingBytesGet(this->normalisedQueue, (char *)events,
|
||||||
|
queuedEvents * sizeof(NormalisedEvent));
|
||||||
|
|
||||||
|
std::sort(events, events + queuedEvents, oldestEventsFirst);
|
||||||
|
|
||||||
|
epicsRingBytesPut(this->sortedQueue, (char *)events,
|
||||||
|
queuedEvents * sizeof(NormalisedEvent));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
inline void asynStreamGeneratorDriver::queueForKafka(NormalisedEvent &ne) {
|
||||||
|
if (this->kafkaEnabled) {
|
||||||
|
if (ne.source == 0)
|
||||||
|
epicsRingBytesPut(this->monitorQueue, (char *)&ne,
|
||||||
|
sizeof(NormalisedEvent));
|
||||||
|
else
|
||||||
|
epicsRingBytesPut(this->detectorQueue, (char *)&ne,
|
||||||
|
sizeof(NormalisedEvent));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void asynStreamGeneratorDriver::processEvents() {
|
||||||
|
|
||||||
|
const char *functionName = "processEvents";
|
||||||
|
|
||||||
|
// x * number of ids * max events in packet * event size
|
||||||
|
int bufferedEvents = 5 * 10 * 243;
|
||||||
|
// we need a little extra space for merge sorting in
|
||||||
|
int extraBufferedEvents = 1 * 10 * 243;
|
||||||
|
|
||||||
|
// we have two buffers. We alternate between reading data into one of them,
|
||||||
|
// and then merge sorting into the other
|
||||||
|
NormalisedEvent eventsABuffer[(bufferedEvents + extraBufferedEvents)];
|
||||||
|
NormalisedEvent eventsBBuffer[(bufferedEvents + extraBufferedEvents)];
|
||||||
|
|
||||||
|
NormalisedEvent *eventsA = &eventsABuffer[0];
|
||||||
|
NormalisedEvent *eventsB = &eventsBBuffer[0];
|
||||||
|
NormalisedEvent *eventsBLastStart = eventsB + bufferedEvents;
|
||||||
|
NormalisedEvent *eventsBLastEnd = eventsBLastStart;
|
||||||
|
|
||||||
|
int queuedEvents = 0;
|
||||||
|
|
||||||
|
epicsTimeStamp lastProcess = epicsTime::getCurrent();
|
||||||
|
epicsTimeStamp currentTime = lastProcess;
|
||||||
|
|
||||||
|
epicsInt64 counts[this->num_channels];
|
||||||
|
double elapsedSeconds = 0;
|
||||||
|
uint64_t startTimestamp = std::numeric_limits<uint64_t>::max();
|
||||||
|
uint64_t currTimestamp;
|
||||||
|
|
||||||
|
epicsInt32 currStatus = STATUS_IDLE;
|
||||||
|
epicsInt32 prevStatus = STATUS_IDLE;
|
||||||
|
epicsInt32 countPreset;
|
||||||
|
epicsInt32 timePreset;
|
||||||
|
epicsInt32 presetChannel;
|
||||||
|
epicsInt32 udpQueueHighWaterMark = 0;
|
||||||
|
epicsInt32 sortedQueueHighWaterMark = 0;
|
||||||
|
|
||||||
|
while (true) {
|
||||||
|
|
||||||
|
queuedEvents =
|
||||||
|
eventsInQueue(this->sortedQueue); // in case we can't wait
|
||||||
|
lastProcess = epicsTime::getCurrent();
|
||||||
|
currentTime = lastProcess;
|
||||||
|
|
||||||
|
// wait for mininmum packet frequency or enough packets to ensure we
|
||||||
|
// could potentially have at least 1 packet per mcpdid
|
||||||
|
while (queuedEvents < bufferedEvents &&
|
||||||
|
epicsTimeDiffInNS(¤tTime, &lastProcess) < 250'000'000ull) {
|
||||||
|
epicsThreadSleep(0.0001); // seconds
|
||||||
|
currentTime = epicsTime::getCurrent();
|
||||||
|
queuedEvents = eventsInQueue(this->sortedQueue);
|
||||||
|
}
|
||||||
|
|
||||||
|
getIntegerParam(this->P_Status, &currStatus);
|
||||||
|
|
||||||
|
queuedEvents = std::min(queuedEvents, bufferedEvents);
|
||||||
|
|
||||||
|
NormalisedEvent *newStartPtr = eventsA + extraBufferedEvents;
|
||||||
|
|
||||||
|
// We read into the array, such that we have enough space, that the
|
||||||
|
// entirety of the leftover from the previous read can fit before this
|
||||||
|
// new read, in the case that all new events are newer timewise, and
|
||||||
|
// therefore, all events from eventsB have to be placed in a preceeding
|
||||||
|
// position.
|
||||||
|
epicsRingBytesGet(this->sortedQueue, (char *)newStartPtr,
|
||||||
|
queuedEvents * sizeof(NormalisedEvent));
|
||||||
|
|
||||||
|
int toProcess =
|
||||||
|
eventsBLastEnd - eventsBLastStart + queuedEvents * 4 / 5;
|
||||||
|
|
||||||
|
// TODO could also consider an in-place merge
|
||||||
|
eventsBLastEnd = std::merge(newStartPtr, newStartPtr + queuedEvents,
|
||||||
|
eventsBLastStart, eventsBLastEnd, eventsA,
|
||||||
|
oldestEventsFirst);
|
||||||
|
|
||||||
|
eventsBLastStart = eventsA + toProcess;
|
||||||
|
|
||||||
|
// TODO I haven't really taken care of the case that there are no events
|
||||||
|
|
||||||
|
if (prevStatus == STATUS_IDLE && currStatus == STATUS_COUNTING) {
|
||||||
|
|
||||||
|
getIntegerParam(this->P_CountPreset, &countPreset);
|
||||||
|
getIntegerParam(this->P_TimePreset, &timePreset);
|
||||||
|
getIntegerParam(this->P_MonitorChannel, &presetChannel);
|
||||||
|
|
||||||
|
// reset status variables
|
||||||
|
startTimestamp = eventsA[0].timestamp;
|
||||||
|
elapsedSeconds = 0;
|
||||||
|
for (size_t i = 0; i < this->num_channels; ++i) {
|
||||||
|
counts[i] = 0;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (currStatus == STATUS_COUNTING) {
|
||||||
|
|
||||||
|
// The elapsedSeconds are round differently depending on whether we
|
||||||
|
// are using them for comparison, or for showing to the user, to
|
||||||
|
// try and make sure the data we send to kafka is correct, while
|
||||||
|
// the measurement time also appears intuitive.
|
||||||
|
for (size_t i = 0; i < toProcess; ++i) {
|
||||||
|
counts[eventsA[i].source == 0 ? eventsA[i].pixelId + 1 : 0] +=
|
||||||
|
1;
|
||||||
|
elapsedSeconds = (eventsA[i].timestamp - startTimestamp) / 1e9;
|
||||||
|
|
||||||
|
// TODO should really check there an no more events with the
|
||||||
|
// same final timestamp
|
||||||
|
if ((countPreset && counts[presetChannel] >= countPreset) ||
|
||||||
|
(timePreset && elapsedSeconds > (double)timePreset))
|
||||||
|
break;
|
||||||
|
|
||||||
|
// TODO also batchwise?
|
||||||
|
this->queueForKafka(eventsA[i]);
|
||||||
|
}
|
||||||
|
|
||||||
|
for (size_t i = 0; i < num_channels; ++i) {
|
||||||
|
setInteger64Param(P_Counts[i], counts[i]);
|
||||||
|
}
|
||||||
|
setDoubleParam(P_ElapsedTime, elapsedSeconds);
|
||||||
|
|
||||||
|
if ((countPreset && counts[presetChannel] >= countPreset) ||
|
||||||
|
(timePreset && elapsedSeconds > (double)timePreset)) {
|
||||||
|
setIntegerParam(this->P_Status, STATUS_IDLE);
|
||||||
|
setIntegerParam(this->P_CountPreset, 0);
|
||||||
|
setIntegerParam(this->P_TimePreset, 0);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
prevStatus = currStatus;
|
||||||
|
|
||||||
|
std::swap(eventsA, eventsB);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void asynStreamGeneratorDriver::produce(epicsRingBytesId eventQueue,
|
||||||
|
rd_kafka_t *kafkaProducer,
|
||||||
|
const char *topic, const char *source) {
|
||||||
|
|
||||||
|
flatbuffers::FlatBufferBuilder builder(1024);
|
||||||
|
|
||||||
|
const std::size_t bufferSize = this->kafkaMaxPacketSize + 16;
|
||||||
|
|
||||||
|
std::vector<uint32_t> tof;
|
||||||
|
tof.reserve(bufferSize);
|
||||||
|
|
||||||
|
std::vector<uint32_t> did;
|
||||||
|
did.reserve(bufferSize);
|
||||||
|
|
||||||
|
epicsTimeStamp last_sent = epicsTime::getCurrent();
|
||||||
|
epicsTimeStamp now = last_sent;
|
||||||
|
int total = 0;
|
||||||
|
uint64_t message_id = 0;
|
||||||
|
|
||||||
|
NormalisedEvent ne;
|
||||||
|
|
||||||
|
while (true) {
|
||||||
|
|
||||||
|
if (!epicsRingBytesIsEmpty(eventQueue)) {
|
||||||
|
|
||||||
|
++total;
|
||||||
|
epicsRingBytesGet(eventQueue, (char *)&ne, sizeof(NormalisedEvent));
|
||||||
|
tof.push_back(ne.timestamp);
|
||||||
|
did.push_back(ne.pixelId);
|
||||||
|
|
||||||
|
} else {
|
||||||
|
epicsThreadSleep(0.001); // seconds
|
||||||
|
}
|
||||||
|
|
||||||
|
now = epicsTime::getCurrent();
|
||||||
|
|
||||||
|
// At least every 0.2 seconds
|
||||||
|
if (total >= this->kafkaMaxPacketSize ||
|
||||||
|
epicsTimeDiffInNS(&now, &last_sent) > 250'000'000ll) {
|
||||||
|
last_sent = epicsTime::getCurrent();
|
||||||
|
|
||||||
|
if (total) {
|
||||||
|
total = 0;
|
||||||
|
|
||||||
|
builder.Clear();
|
||||||
|
|
||||||
|
auto message = CreateEventMessageDirect(
|
||||||
|
builder, source, message_id++,
|
||||||
|
((uint64_t)now.secPastEpoch) * 1'000'000'000ull +
|
||||||
|
((uint64_t)now.nsec),
|
||||||
|
&tof, &did);
|
||||||
|
|
||||||
|
builder.Finish(message, "ev42");
|
||||||
|
|
||||||
|
rd_kafka_resp_err_t err = rd_kafka_producev(
|
||||||
|
kafkaProducer, RD_KAFKA_V_TOPIC(topic),
|
||||||
|
RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY),
|
||||||
|
// RD_KAFKA_V_KEY((void *)key, key_len),
|
||||||
|
RD_KAFKA_V_VALUE((void *)builder.GetBufferPointer(),
|
||||||
|
builder.GetSize()),
|
||||||
|
// RD_KAFKA_V_OPAQUE(NULL),
|
||||||
|
RD_KAFKA_V_END);
|
||||||
|
|
||||||
|
if (err) {
|
||||||
|
epicsStdoutPrintf("Failed to produce to topic %s: %s\n",
|
||||||
|
topic, rd_kafka_err2str(err));
|
||||||
|
}
|
||||||
|
|
||||||
|
rd_kafka_poll(kafkaProducer, 0);
|
||||||
|
|
||||||
|
tof.clear();
|
||||||
|
did.clear();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void asynStreamGeneratorDriver::produceMonitor() {
|
||||||
|
this->produce(monitorQueue, monitorProducer, monitorTopic, "monitor");
|
||||||
|
}
|
||||||
|
|
||||||
|
void asynStreamGeneratorDriver::produceDetector() {
|
||||||
|
this->produce(detectorQueue, detectorProducer, detectorTopic, "detector");
|
||||||
|
}
|
||||||
|
|
||||||
|
/*******************************************************************************
|
||||||
|
* Methods exposed to IOC Shell
|
||||||
|
*/
|
||||||
|
extern "C" {
|
||||||
|
|
||||||
|
asynStatus asynStreamGeneratorDriverConfigure(
|
||||||
|
const char *portName, const char *ipPortName, const int numChannels,
|
||||||
|
const int udpQueueSize, const char *kafkaBroker, const char *monitorTopic,
|
||||||
|
const char *detectorTopic, const int kafkaQueueSize,
|
||||||
|
const int kafkaMaxPacketSize) {
|
||||||
|
new asynStreamGeneratorDriver(portName, ipPortName, numChannels,
|
||||||
|
udpQueueSize, kafkaBroker[0], kafkaBroker,
|
||||||
|
monitorTopic, detectorTopic, kafkaQueueSize,
|
||||||
|
kafkaMaxPacketSize);
|
||||||
|
return asynSuccess;
|
||||||
|
}
|
||||||
|
|
||||||
|
static const iocshArg initArg0 = {"portName", iocshArgString};
|
||||||
|
static const iocshArg initArg1 = {"ipPortName", iocshArgString};
|
||||||
|
static const iocshArg initArg2 = {"numChannels", iocshArgInt};
|
||||||
|
static const iocshArg initArg3 = {"udpQueueSize", iocshArgInt};
|
||||||
|
static const iocshArg initArg4 = {"kafkaBroker", iocshArgString};
|
||||||
|
static const iocshArg initArg5 = {"monitorTopic", iocshArgString};
|
||||||
|
static const iocshArg initArg6 = {"detectorTopic", iocshArgString};
|
||||||
|
static const iocshArg initArg7 = {"kafkaQueueSize", iocshArgInt};
|
||||||
|
static const iocshArg initArg8 = {"kafkaMaxPacketSize", iocshArgInt};
|
||||||
|
static const iocshArg *const initArgs[] = {&initArg0, &initArg1, &initArg2,
|
||||||
|
&initArg3, &initArg4, &initArg5,
|
||||||
|
&initArg6, &initArg7, &initArg8};
|
||||||
|
static const iocshFuncDef initFuncDef = {"asynStreamGenerator", 9, initArgs};
|
||||||
|
static void initCallFunc(const iocshArgBuf *args) {
|
||||||
|
asynStreamGeneratorDriverConfigure(
|
||||||
|
args[0].sval, args[1].sval, args[2].ival, args[3].ival, args[4].sval,
|
||||||
|
args[5].sval, args[6].sval, args[7].ival, args[8].ival);
|
||||||
|
}
|
||||||
|
|
||||||
|
void asynStreamGeneratorDriverRegister(void) {
|
||||||
|
iocshRegister(&initFuncDef, initCallFunc);
|
||||||
|
}
|
||||||
|
|
||||||
|
epicsExportRegistrar(asynStreamGeneratorDriverRegister);
|
||||||
|
}
|
||||||
1
src/asynStreamGeneratorDriver.dbd
Normal file
1
src/asynStreamGeneratorDriver.dbd
Normal file
@@ -0,0 +1 @@
|
|||||||
|
registrar("asynStreamGeneratorDriverRegister")
|
||||||
187
src/asynStreamGeneratorDriver.h
Normal file
187
src/asynStreamGeneratorDriver.h
Normal file
@@ -0,0 +1,187 @@
|
|||||||
|
#ifndef asynStreamGeneratorDriver_H
|
||||||
|
#define asynStreamGeneratorDriver_H
|
||||||
|
|
||||||
|
#include "asynPortDriver.h"
|
||||||
|
#include <epicsRingBytes.h>
|
||||||
|
#include <librdkafka/rdkafka.h>
|
||||||
|
|
||||||
|
/*******************************************************************************
|
||||||
|
* UDP Packet Definitions
|
||||||
|
*/
|
||||||
|
struct __attribute__((__packed__)) UDPHeader {
|
||||||
|
uint16_t BufferLength;
|
||||||
|
uint16_t BufferType;
|
||||||
|
uint16_t HeaderLength;
|
||||||
|
uint16_t BufferNumber;
|
||||||
|
uint16_t RunCmdID;
|
||||||
|
uint16_t Status : 8;
|
||||||
|
uint16_t McpdID : 8;
|
||||||
|
uint16_t TimeStamp[3];
|
||||||
|
uint16_t Parameter0[3];
|
||||||
|
uint16_t Parameter1[3];
|
||||||
|
uint16_t Parameter2[3];
|
||||||
|
uint16_t Parameter3[3];
|
||||||
|
|
||||||
|
inline uint64_t nanosecs() const {
|
||||||
|
uint64_t nsec{((uint64_t)TimeStamp[2]) << 32 |
|
||||||
|
((uint64_t)TimeStamp[1]) << 16 | (uint64_t)TimeStamp[0]};
|
||||||
|
return nsec * 100;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
struct __attribute__((__packed__)) DetectorEvent {
|
||||||
|
uint64_t TimeStamp : 19;
|
||||||
|
uint16_t XPosition : 10;
|
||||||
|
uint16_t YPosition : 10;
|
||||||
|
uint16_t Amplitude : 8;
|
||||||
|
uint16_t Id : 1;
|
||||||
|
inline uint32_t nanosecs() const { return TimeStamp * 100; }
|
||||||
|
inline uint64_t pixelId(uint32_t mpcdId) const {
|
||||||
|
const uint32_t x_pixels = 128;
|
||||||
|
const uint32_t y_pixels = 128;
|
||||||
|
return (mpcdId - 1) * x_pixels * y_pixels +
|
||||||
|
x_pixels * (uint32_t)this->XPosition + (uint32_t)this->YPosition;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
struct __attribute__((__packed__)) MonitorEvent {
|
||||||
|
uint64_t TimeStamp : 19;
|
||||||
|
uint64_t Data : 21;
|
||||||
|
uint64_t DataID : 4;
|
||||||
|
uint64_t TriggerID : 3;
|
||||||
|
uint64_t Id : 1;
|
||||||
|
inline uint32_t nanosecs() const { return TimeStamp * 100; }
|
||||||
|
};
|
||||||
|
|
||||||
|
/*******************************************************************************
|
||||||
|
* Simplified Event Struct Definition
|
||||||
|
*/
|
||||||
|
|
||||||
|
struct __attribute__((__packed__)) NormalisedEvent {
|
||||||
|
uint64_t timestamp;
|
||||||
|
uint32_t pixelId : 24;
|
||||||
|
uint8_t source;
|
||||||
|
|
||||||
|
// inline NormalisedEvent(uint64_t timestamp, uint8_t source, uint32_t
|
||||||
|
// pixelId)
|
||||||
|
// : timestamp(timestamp), source(source), pixelId(pixelId){};
|
||||||
|
};
|
||||||
|
|
||||||
|
/*******************************************************************************
|
||||||
|
* Status values that should match the definition in db/daq_common.db
|
||||||
|
*/
|
||||||
|
#define STATUS_IDLE 0
|
||||||
|
#define STATUS_COUNTING 1
|
||||||
|
#define STATUS_LOWRATE 2
|
||||||
|
#define STATUS_PAUSED 3
|
||||||
|
|
||||||
|
/*******************************************************************************
|
||||||
|
* Parameters for use in DB records
|
||||||
|
*
|
||||||
|
* i.e.e drvInfo strings that are used to identify the parameters
|
||||||
|
*/
|
||||||
|
|
||||||
|
#define P_StatusString "STATUS"
|
||||||
|
#define P_ResetString "RESET"
|
||||||
|
#define P_StopString "STOP"
|
||||||
|
#define P_CountPresetString "P_CNT"
|
||||||
|
#define P_TimePresetString "P_TIME"
|
||||||
|
#define P_ElapsedTimeString "TIME"
|
||||||
|
#define P_ClearElapsedTimeString "C_TIME"
|
||||||
|
#define P_MonitorChannelString "MONITOR"
|
||||||
|
#define P_ThresholdString "THRESH"
|
||||||
|
#define P_ThresholdChannelString "THRESH_CH"
|
||||||
|
|
||||||
|
#define P_CountsString "COUNTS%d"
|
||||||
|
#define P_RateString "RATE%d"
|
||||||
|
#define P_ClearCountsString "C_%d"
|
||||||
|
|
||||||
|
#define P_UdpDroppedString "DROP"
|
||||||
|
#define P_UdpQueueHighWaterMarkString "UDP"
|
||||||
|
#define P_SortedQueueHighWaterMarkString "SORT"
|
||||||
|
|
||||||
|
/*******************************************************************************
|
||||||
|
* Stream Generator Coordinating Class
|
||||||
|
*/
|
||||||
|
class asynStreamGeneratorDriver : public asynPortDriver {
|
||||||
|
public:
|
||||||
|
asynStreamGeneratorDriver(const char *portName, const char *ipPortName,
|
||||||
|
const int numChannels, const int udpQueueSize,
|
||||||
|
const bool enableKafkaStream,
|
||||||
|
const char *kafkaBroker, const char *monitorTopic,
|
||||||
|
const char *detectorTopic,
|
||||||
|
const int kafkaQueueSize,
|
||||||
|
const int kafkaMaxPacketSize);
|
||||||
|
virtual ~asynStreamGeneratorDriver();
|
||||||
|
|
||||||
|
virtual asynStatus readInt32(asynUser *pasynUser, epicsInt32 *value);
|
||||||
|
virtual asynStatus writeInt32(asynUser *pasynUser, epicsInt32 value);
|
||||||
|
|
||||||
|
void receiveUDP();
|
||||||
|
void normaliseUDP();
|
||||||
|
void partialSortEvents();
|
||||||
|
void processEvents();
|
||||||
|
void produceMonitor();
|
||||||
|
void produceDetector();
|
||||||
|
|
||||||
|
protected:
|
||||||
|
// Parameter Identifying IDs
|
||||||
|
int P_Status;
|
||||||
|
int P_Reset;
|
||||||
|
int P_Stop;
|
||||||
|
int P_CountPreset;
|
||||||
|
int P_TimePreset;
|
||||||
|
int P_ElapsedTime;
|
||||||
|
int P_ClearElapsedTime;
|
||||||
|
int P_MonitorChannel;
|
||||||
|
int P_Threshold;
|
||||||
|
int P_ThresholdChannel;
|
||||||
|
int *P_Counts;
|
||||||
|
int *P_Rates;
|
||||||
|
int *P_ClearCounts;
|
||||||
|
|
||||||
|
// System Status Parameter Identifying IDs
|
||||||
|
int P_UdpDropped;
|
||||||
|
int P_UdpQueueHighWaterMark;
|
||||||
|
int P_SortedQueueHighWaterMark;
|
||||||
|
|
||||||
|
private:
|
||||||
|
asynUser *pasynUDPUser;
|
||||||
|
epicsEventId pausedEventId;
|
||||||
|
|
||||||
|
const int num_channels;
|
||||||
|
const bool kafkaEnabled;
|
||||||
|
const int kafkaQueueSize;
|
||||||
|
const int kafkaMaxPacketSize;
|
||||||
|
|
||||||
|
const int udpQueueSize;
|
||||||
|
epicsRingBytesId udpQueue;
|
||||||
|
epicsRingBytesId normalisedQueue;
|
||||||
|
epicsRingBytesId sortedQueue;
|
||||||
|
|
||||||
|
epicsRingBytesId monitorQueue;
|
||||||
|
rd_kafka_t *monitorProducer;
|
||||||
|
const char *monitorTopic;
|
||||||
|
|
||||||
|
epicsRingBytesId detectorQueue;
|
||||||
|
rd_kafka_t *detectorProducer;
|
||||||
|
const char *detectorTopic;
|
||||||
|
|
||||||
|
constexpr static char *driverName = "StreamGenerator";
|
||||||
|
|
||||||
|
asynStatus createInt32Param(asynStatus status, char *name, int *variable,
|
||||||
|
epicsInt32 initialValue = 0);
|
||||||
|
|
||||||
|
asynStatus createInt64Param(asynStatus status, char *name, int *variable,
|
||||||
|
epicsInt64 initialValue = 0);
|
||||||
|
|
||||||
|
asynStatus createFloat64Param(asynStatus status, char *name, int *variable,
|
||||||
|
double initialValue = 0);
|
||||||
|
|
||||||
|
inline void queueForKafka(NormalisedEvent &ne);
|
||||||
|
|
||||||
|
void produce(epicsRingBytesId eventQueue, rd_kafka_t *kafkaProducer,
|
||||||
|
const char *topic, const char *source);
|
||||||
|
};
|
||||||
|
|
||||||
|
#endif
|
||||||
342
udp_rate.py
342
udp_rate.py
@@ -1,342 +0,0 @@
|
|||||||
import queue
|
|
||||||
import socket
|
|
||||||
import time
|
|
||||||
import threading
|
|
||||||
from uuid import uuid4
|
|
||||||
import math
|
|
||||||
|
|
||||||
from confluent_kafka import Producer
|
|
||||||
import streaming_data_types
|
|
||||||
|
|
||||||
# receiving directly (can also specify correlation unit ip)
|
|
||||||
UDP_IP = ""
|
|
||||||
UDP_PORT = 54321
|
|
||||||
|
|
||||||
# If redirecting traffic via
|
|
||||||
# socat -U - udp4-recv:54321 | tee >( socat -u - udp4-datagram:127.0.0.1:54322 ) | socat -u - udp4-datagram:127.0.0.1:54323
|
|
||||||
# UDP_IP = "127.0.0.1"
|
|
||||||
# UDP_PORT = 54323
|
|
||||||
|
|
||||||
WINDOWSECONDS = 5
|
|
||||||
WINDOWSIZE = 20000 * WINDOWSECONDS
|
|
||||||
MONITORS = 4 # We have max 4 monitors
|
|
||||||
|
|
||||||
time_offset = None # Estimate of clock offset
|
|
||||||
|
|
||||||
time_window = {
|
|
||||||
i: queue.Queue(maxsize=WINDOWSIZE)
|
|
||||||
for i in range(MONITORS)
|
|
||||||
}
|
|
||||||
|
|
||||||
# event_time_window = queue.Queue(maxsize=50000 * WINDOWSECONDS)
|
|
||||||
EVENT_WINDOWSIZE = 50000
|
|
||||||
EVENT_WINDOW_PTR = 0
|
|
||||||
event_time_window = [0 for i in range(EVENT_WINDOWSIZE)]
|
|
||||||
|
|
||||||
event_average_rate = 0
|
|
||||||
event_last_timestamp = None
|
|
||||||
|
|
||||||
MISSED_PACKETS = -9 # All modules appear to miss the first time due to initialisation as 0
|
|
||||||
|
|
||||||
# missed_packets_time_window = queue.Queue(maxsize=100)
|
|
||||||
|
|
||||||
def print_monitor_rates():
|
|
||||||
while True:
|
|
||||||
for i in range(MONITORS):
|
|
||||||
msg = f"Monitor {i+1}: {time_window[i].qsize() / WINDOWSECONDS} cts/s"
|
|
||||||
try:
|
|
||||||
earliest = time_window[i].queue[0]
|
|
||||||
newest = max(time_window[i].queue)
|
|
||||||
t = time.time()
|
|
||||||
msg += f', buffer range: {round((newest - earliest) * 1e-7, 3)} s, oldest: {round(time.time() - ((time_offset + earliest) * 1e-7), 3)} s, newest: {round(time.time() - ((time_offset + newest) * 1e-7), 3)} s'
|
|
||||||
except:
|
|
||||||
pass
|
|
||||||
|
|
||||||
print(msg)
|
|
||||||
|
|
||||||
# try:
|
|
||||||
# print(f'Events: {1 / event_average_rate} cts/s')
|
|
||||||
# except:
|
|
||||||
# pass
|
|
||||||
|
|
||||||
try:
|
|
||||||
print(f'Events: {round(1 / (sum(event_time_window) / EVENT_WINDOWSIZE * 1e-7), 2)} cts/s')
|
|
||||||
except:
|
|
||||||
pass
|
|
||||||
|
|
||||||
print(f'Missed Packets: {MISSED_PACKETS}')
|
|
||||||
|
|
||||||
# Detector Events
|
|
||||||
# msg = f"Events : {event_time_window.qsize() / WINDOWSECONDS} cts/s"
|
|
||||||
# try:
|
|
||||||
# earliest = event_time_window.queue[0]
|
|
||||||
# newest = max(event_time_window.queue)
|
|
||||||
# t = time.time()
|
|
||||||
# msg += f', buffer range: {round((newest - earliest) * 1e-7, 3)} s, oldest: {round(time.time() - ((time_offset + earliest) * 1e-7), 3)} s, newest: {round(time.time() - ((time_offset + newest) * 1e-7), 3)} s'
|
|
||||||
# except:
|
|
||||||
# pass
|
|
||||||
|
|
||||||
# print(msg)
|
|
||||||
|
|
||||||
time.sleep(1)
|
|
||||||
|
|
||||||
threading.Thread(target=print_monitor_rates, daemon=True).start()
|
|
||||||
|
|
||||||
def clean_monitor_rates():
|
|
||||||
latest = 0
|
|
||||||
while True:
|
|
||||||
for d_id in range(MONITORS):
|
|
||||||
t_w = time_window[d_id]
|
|
||||||
if not t_w.empty():
|
|
||||||
# TODO probably should switch to a priority queue
|
|
||||||
# as the messages might not be in order
|
|
||||||
# TODO could also just replace with a low-pass filter
|
|
||||||
# would be a lot more efficient
|
|
||||||
# TODO the way this is done, we need trigger events
|
|
||||||
# in order for the signal to decay back to 0.
|
|
||||||
# If no events come, the rate remains stuck
|
|
||||||
latest = max(latest, max(t_w.queue))
|
|
||||||
# latest = time_window[1].queue[-1]
|
|
||||||
try:
|
|
||||||
while t_w.queue[0] < (latest - WINDOWSECONDS * 1e7):
|
|
||||||
t_w.get_nowait()
|
|
||||||
except IndexError:
|
|
||||||
pass
|
|
||||||
time.sleep(0.01)
|
|
||||||
|
|
||||||
threading.Thread(target=clean_monitor_rates, daemon=True).start()
|
|
||||||
|
|
||||||
|
|
||||||
# def clean_event_rates():
|
|
||||||
# latest = 0
|
|
||||||
# while True:
|
|
||||||
# t_w = event_time_window
|
|
||||||
# if not t_w.empty():
|
|
||||||
# # TODO probably should switch to a priority queue
|
|
||||||
# # as the messages might not be in order
|
|
||||||
# # TODO could also just replace with a low-pass filter
|
|
||||||
# # would be a lot more efficient
|
|
||||||
# # TODO the way this is done, we need trigger events
|
|
||||||
# # in order for the signal to decay back to 0.
|
|
||||||
# # If no events come, the rate remains stuck
|
|
||||||
# #latest = max(latest, max(t_w.queue))
|
|
||||||
# try:
|
|
||||||
# latest = time_window[1].queue[-1]
|
|
||||||
# while t_w.queue[0] < (latest - WINDOWSECONDS * 1e7):
|
|
||||||
# t_w.get_nowait()
|
|
||||||
# except IndexError:
|
|
||||||
# pass
|
|
||||||
# time.sleep(0.005)
|
|
||||||
#
|
|
||||||
# threading.Thread(target=clean_event_rates, daemon=True).start()
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
# Event Kafka Producer
|
|
||||||
|
|
||||||
event_queue = queue.Queue()
|
|
||||||
|
|
||||||
def event_producer():
|
|
||||||
producer_config = {
|
|
||||||
'bootstrap.servers': "linkafka01:9092",
|
|
||||||
'queue.buffering.max.messages': 1e7,
|
|
||||||
}
|
|
||||||
prod = Producer(producer_config)
|
|
||||||
|
|
||||||
st = time.time()
|
|
||||||
|
|
||||||
msg_id = 0
|
|
||||||
|
|
||||||
b_size = 10000
|
|
||||||
b_ptr = 0
|
|
||||||
pixel_buffer = [0 for _ in range(b_size)]
|
|
||||||
time_buffer = [0 for _ in range(b_size)]
|
|
||||||
poll_cnt = 0
|
|
||||||
|
|
||||||
while True:
|
|
||||||
(p_id, timestamp) = event_queue.get()
|
|
||||||
|
|
||||||
pixel_buffer[b_ptr] = p_id
|
|
||||||
time_buffer[b_ptr] = timestamp
|
|
||||||
b_ptr += 1
|
|
||||||
|
|
||||||
nt = time.time()
|
|
||||||
if b_ptr == b_size or nt - st > 0.001:
|
|
||||||
st = nt
|
|
||||||
|
|
||||||
if b_ptr > 0:
|
|
||||||
message = streaming_data_types.serialise_ev42(
|
|
||||||
message_id = msg_id,
|
|
||||||
pulse_time = time_buffer[0] * 100, # int(time.time() * 1_000_000_000),
|
|
||||||
time_of_flight = time_buffer[0:b_ptr],
|
|
||||||
detector_id = pixel_buffer[0:b_ptr],
|
|
||||||
source_name = '',
|
|
||||||
)
|
|
||||||
|
|
||||||
msg_id = (msg_id + 1) % 100000000
|
|
||||||
b_ptr = 0
|
|
||||||
|
|
||||||
prod.produce(
|
|
||||||
topic = "DMC_detector",
|
|
||||||
value = message,
|
|
||||||
partition = 0,
|
|
||||||
)
|
|
||||||
|
|
||||||
# if poll_cnt % 1000 == 0:
|
|
||||||
prod.poll(0)
|
|
||||||
poll_cnt = (poll_cnt + 1) % 1000
|
|
||||||
|
|
||||||
threading.Thread(target=event_producer, daemon=True).start()
|
|
||||||
|
|
||||||
# Monitor Kafka Producer
|
|
||||||
|
|
||||||
monitor_queue = queue.Queue()
|
|
||||||
|
|
||||||
def monitor_producer():
|
|
||||||
producer_config = {
|
|
||||||
'bootstrap.servers': "linkafka01:9092",
|
|
||||||
'queue.buffering.max.messages': 1e7,
|
|
||||||
}
|
|
||||||
prod = Producer(producer_config)
|
|
||||||
|
|
||||||
monitor_buffer = [0 for i in range(MONITORS)]
|
|
||||||
monitor_time = [0 for i in range(MONITORS)]
|
|
||||||
|
|
||||||
st = time.time()
|
|
||||||
|
|
||||||
poll_cnt = 0
|
|
||||||
|
|
||||||
while True:
|
|
||||||
(d_id, timestamp) = monitor_queue.get()
|
|
||||||
|
|
||||||
monitor_buffer[d_id] += 1
|
|
||||||
monitor_time[d_id] = timestamp
|
|
||||||
|
|
||||||
nt = time.time()
|
|
||||||
if nt - st > 0.05:
|
|
||||||
st = nt
|
|
||||||
|
|
||||||
for i in range(MONITORS):
|
|
||||||
if monitor_buffer[d_id]:
|
|
||||||
message = streaming_data_types.serialise_f142(
|
|
||||||
source_name = f"monitor{d_id+1}",
|
|
||||||
value = monitor_buffer[d_id],
|
|
||||||
# ns resolution (supposed to be past epoch, not what the detector returns though)
|
|
||||||
timestamp_unix_ns = monitor_time[d_id] * 100 # send time of last monitor
|
|
||||||
)
|
|
||||||
|
|
||||||
prod.produce(
|
|
||||||
topic = "DMC_neutron_monitor",
|
|
||||||
value = message,
|
|
||||||
partition = 0,
|
|
||||||
)
|
|
||||||
|
|
||||||
monitor_buffer[d_id] = 0
|
|
||||||
|
|
||||||
if poll_cnt % 1000 == 0:
|
|
||||||
prod.poll(0)
|
|
||||||
poll_cnt = (poll_cnt + 1) % 1000
|
|
||||||
|
|
||||||
threading.Thread(target=monitor_producer, daemon=True).start()
|
|
||||||
|
|
||||||
|
|
||||||
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
|
||||||
sock.bind((UDP_IP, UDP_PORT))
|
|
||||||
|
|
||||||
val = 0
|
|
||||||
start_time = time.time()
|
|
||||||
|
|
||||||
module_counts = [0 for i in range(10)]
|
|
||||||
|
|
||||||
EVENTS = 0
|
|
||||||
|
|
||||||
while True:
|
|
||||||
data, addr = sock.recvfrom(2056) # Buffer size is 1024 bytes
|
|
||||||
raw_header = data[:42]
|
|
||||||
raw_data = data[42:]
|
|
||||||
|
|
||||||
(buffer_length, buffer_type, header_length,
|
|
||||||
buffer_number, run_id, mcpd_status,
|
|
||||||
t_low, t_mid, t_high, *_) = memoryview(raw_header).cast('H')
|
|
||||||
mcpd_id = ( mcpd_status >> 8 ) & 0xff
|
|
||||||
mcpd_status = ( mcpd_status ) & 0x3
|
|
||||||
running_msg = "running" if (mcpd_status & 0x1) else "stopped"
|
|
||||||
sync_msg = "in sync" if (mcpd_status & 0x2) else "sync error"
|
|
||||||
timestamp = ( t_high << 32 ) | ( t_mid << 16 ) | t_low # 100 ns resolution
|
|
||||||
#print(f'Packet {int(timestamp * 1e-7)}s => buffer: {buffer_number}, length: {int(buffer_length*2/6)} events, status: {mcpd_status} {mcpd_id} {running_msg} with {sync_msg}')
|
|
||||||
# print(f'Packet => buffer: {mcpd_id}-{buffer_number}, length: {int((buffer_length-21)/3)} events, status: {mcpd_status}')
|
|
||||||
|
|
||||||
if time_offset is None:
|
|
||||||
time_offset = time.time() * 1e7 - timestamp
|
|
||||||
|
|
||||||
if buffer_number - module_counts[mcpd_id] != 1:
|
|
||||||
MISSED_PACKETS += 1
|
|
||||||
# if missed_packets_time_window.full():
|
|
||||||
# missed_packets_time_window.get_nowait()
|
|
||||||
# missed_packets_time_window.put(timestamp)
|
|
||||||
|
|
||||||
module_counts[mcpd_id] = buffer_number
|
|
||||||
|
|
||||||
for i in range(0, len(raw_data), 6):
|
|
||||||
event = memoryview(raw_data)[i:i+6]
|
|
||||||
event_type = event[5] >> 7
|
|
||||||
# print(event_type)
|
|
||||||
|
|
||||||
if event_type: # Trigger Event
|
|
||||||
t_id = ( event[5] >> 4 ) & 0x7
|
|
||||||
d_id = event[5] & 0xf
|
|
||||||
event_timestamp = timestamp + ( ( event[2] << 16 ) & 0x7 ) | ( event[1] << 8 ) | event[0]
|
|
||||||
# print(f'Trigger event {event_timestamp * 1e-7}s => TrigID: {t_id}, DataID: {d_id}')
|
|
||||||
|
|
||||||
t_w = time_window[d_id]
|
|
||||||
t_w.put_nowait(event_timestamp)
|
|
||||||
|
|
||||||
monitor_queue.put_nowait((d_id, event_timestamp))
|
|
||||||
|
|
||||||
else: # Neutron Event
|
|
||||||
x_pixels = 128
|
|
||||||
y_pixels = 128
|
|
||||||
amplitude = ( event[5] << 1 ) | ( event[4] >> 7 )
|
|
||||||
|
|
||||||
# The DMC StreamHistogrammer setup currently expects each module to
|
|
||||||
# be 128 * 128 pixels but the resolution in the packages is
|
|
||||||
# actually 10bit. We remove the lowest 3 bits.
|
|
||||||
x = (( (event[3] & 0x1f) << 5 | (event[2] & 0xf8) >> 3 ) & 0x3ff) >> 3
|
|
||||||
y = (( (event[4] & 0x7f) << 3 | (event[3] & 0xe0) >> 5 ) & 0x3ff) >> 3
|
|
||||||
event_timestamp = timestamp + ( ( event[2] << 16 ) & 0x7 ) | ( event[1] << 8 ) | event[0]
|
|
||||||
# print(f'Neutron event {event_timestamp * 1e-7}s: {amplitude}, x: {x}, y: {y}')
|
|
||||||
|
|
||||||
|
|
||||||
if event_last_timestamp is None:
|
|
||||||
event_last_timestamp = event_timestamp
|
|
||||||
|
|
||||||
# Seems like at higher frequencies these come very much out of order
|
|
||||||
# so this is very approximate
|
|
||||||
event_time_window[EVENT_WINDOW_PTR] = event_timestamp - event_last_timestamp
|
|
||||||
EVENT_WINDOW_PTR = (EVENT_WINDOW_PTR + 1) % EVENT_WINDOWSIZE
|
|
||||||
event_last_timestamp = event_timestamp
|
|
||||||
|
|
||||||
# I suppose this doesn't work mostly due to the timestamps ordering...
|
|
||||||
# event_timestamp_seconds = event_timestamp * 1e-7
|
|
||||||
# if event_last_timestamp is None:
|
|
||||||
# event_last_timestamp = event_timestamp_seconds
|
|
||||||
|
|
||||||
# f_cutoff = 1e6 # Hz
|
|
||||||
# tau = 1 / ( 2 * math.pi * f_cutoff)
|
|
||||||
# dt = event_timestamp_seconds - event_last_timestamp
|
|
||||||
# if dt > 0:
|
|
||||||
# w = math.exp(-dt / tau)
|
|
||||||
# event_average_rate = w * dt + event_average_rate * (1 - w)
|
|
||||||
# event_last_timestamp = event_timestamp_seconds
|
|
||||||
|
|
||||||
# EVENTS += 1
|
|
||||||
# a = (mcpd_id - 1) * x_pixels * y_pixels + x_pixels * x + y
|
|
||||||
# print((EVENTS, x, y, a, a < 128 * 128 * 9, mcpd_id))
|
|
||||||
# if not a < 128 * 128 * 9:
|
|
||||||
# print((event[3], event[3] << 5, event[2], event[2] >> 3))
|
|
||||||
|
|
||||||
event_queue.put_nowait((
|
|
||||||
(mcpd_id - 1) * x_pixels * y_pixels + x_pixels * x + y,
|
|
||||||
event_timestamp
|
|
||||||
))
|
|
||||||
Reference in New Issue
Block a user