Merge pull request 'less_logic_in_udp_thread' (#1) from less_logic_in_udp_thread into main
Reviewed-on: #1
This commit was merged in pull request #1.
This commit is contained in:
245
.clang-format
Normal file
245
.clang-format
Normal file
@@ -0,0 +1,245 @@
|
||||
---
|
||||
Language: Cpp
|
||||
# BasedOnStyle: LLVM
|
||||
AccessModifierOffset: -2
|
||||
AlignAfterOpenBracket: Align
|
||||
AlignArrayOfStructures: None
|
||||
AlignConsecutiveAssignments:
|
||||
Enabled: false
|
||||
AcrossEmptyLines: false
|
||||
AcrossComments: false
|
||||
AlignCompound: false
|
||||
AlignFunctionPointers: false
|
||||
PadOperators: true
|
||||
AlignConsecutiveBitFields:
|
||||
Enabled: false
|
||||
AcrossEmptyLines: false
|
||||
AcrossComments: false
|
||||
AlignCompound: false
|
||||
AlignFunctionPointers: false
|
||||
PadOperators: false
|
||||
AlignConsecutiveDeclarations:
|
||||
Enabled: false
|
||||
AcrossEmptyLines: false
|
||||
AcrossComments: false
|
||||
AlignCompound: false
|
||||
AlignFunctionPointers: false
|
||||
PadOperators: false
|
||||
AlignConsecutiveMacros:
|
||||
Enabled: false
|
||||
AcrossEmptyLines: false
|
||||
AcrossComments: false
|
||||
AlignCompound: false
|
||||
AlignFunctionPointers: false
|
||||
PadOperators: false
|
||||
AlignConsecutiveShortCaseStatements:
|
||||
Enabled: false
|
||||
AcrossEmptyLines: false
|
||||
AcrossComments: false
|
||||
AlignCaseColons: false
|
||||
AlignEscapedNewlines: Right
|
||||
AlignOperands: Align
|
||||
AlignTrailingComments:
|
||||
Kind: Always
|
||||
OverEmptyLines: 0
|
||||
AllowAllArgumentsOnNextLine: true
|
||||
AllowAllParametersOfDeclarationOnNextLine: true
|
||||
AllowBreakBeforeNoexceptSpecifier: Never
|
||||
AllowShortBlocksOnASingleLine: Never
|
||||
AllowShortCaseLabelsOnASingleLine: false
|
||||
AllowShortCompoundRequirementOnASingleLine: true
|
||||
AllowShortEnumsOnASingleLine: true
|
||||
AllowShortFunctionsOnASingleLine: All
|
||||
AllowShortIfStatementsOnASingleLine: Never
|
||||
AllowShortLambdasOnASingleLine: All
|
||||
AllowShortLoopsOnASingleLine: false
|
||||
AlwaysBreakAfterDefinitionReturnType: None
|
||||
AlwaysBreakAfterReturnType: None
|
||||
AlwaysBreakBeforeMultilineStrings: false
|
||||
AlwaysBreakTemplateDeclarations: MultiLine
|
||||
AttributeMacros:
|
||||
- __capability
|
||||
BinPackArguments: true
|
||||
BinPackParameters: true
|
||||
BitFieldColonSpacing: Both
|
||||
BraceWrapping:
|
||||
AfterCaseLabel: false
|
||||
AfterClass: false
|
||||
AfterControlStatement: Never
|
||||
AfterEnum: false
|
||||
AfterExternBlock: false
|
||||
AfterFunction: false
|
||||
AfterNamespace: false
|
||||
AfterObjCDeclaration: false
|
||||
AfterStruct: false
|
||||
AfterUnion: false
|
||||
BeforeCatch: false
|
||||
BeforeElse: false
|
||||
BeforeLambdaBody: false
|
||||
BeforeWhile: false
|
||||
IndentBraces: false
|
||||
SplitEmptyFunction: true
|
||||
SplitEmptyRecord: true
|
||||
SplitEmptyNamespace: true
|
||||
BreakAdjacentStringLiterals: true
|
||||
BreakAfterAttributes: Leave
|
||||
BreakAfterJavaFieldAnnotations: false
|
||||
BreakArrays: true
|
||||
BreakBeforeBinaryOperators: None
|
||||
BreakBeforeConceptDeclarations: Always
|
||||
BreakBeforeBraces: Attach
|
||||
BreakBeforeInlineASMColon: OnlyMultiline
|
||||
BreakBeforeTernaryOperators: true
|
||||
BreakConstructorInitializers: BeforeColon
|
||||
BreakInheritanceList: BeforeColon
|
||||
BreakStringLiterals: true
|
||||
ColumnLimit: 80
|
||||
CommentPragmas: '^ IWYU pragma:'
|
||||
CompactNamespaces: false
|
||||
ConstructorInitializerIndentWidth: 4
|
||||
ContinuationIndentWidth: 4
|
||||
Cpp11BracedListStyle: true
|
||||
DerivePointerAlignment: false
|
||||
DisableFormat: false
|
||||
EmptyLineAfterAccessModifier: Never
|
||||
EmptyLineBeforeAccessModifier: LogicalBlock
|
||||
ExperimentalAutoDetectBinPacking: false
|
||||
FixNamespaceComments: true
|
||||
ForEachMacros:
|
||||
- foreach
|
||||
- Q_FOREACH
|
||||
- BOOST_FOREACH
|
||||
IfMacros:
|
||||
- KJ_IF_MAYBE
|
||||
IncludeBlocks: Preserve
|
||||
IncludeCategories:
|
||||
- Regex: '^"(llvm|llvm-c|clang|clang-c)/'
|
||||
Priority: 2
|
||||
SortPriority: 0
|
||||
CaseSensitive: false
|
||||
- Regex: '^(<|"(gtest|gmock|isl|json)/)'
|
||||
Priority: 3
|
||||
SortPriority: 0
|
||||
CaseSensitive: false
|
||||
- Regex: '.*'
|
||||
Priority: 1
|
||||
SortPriority: 0
|
||||
CaseSensitive: false
|
||||
IncludeIsMainRegex: '(Test)?$'
|
||||
IncludeIsMainSourceRegex: ''
|
||||
IndentAccessModifiers: false
|
||||
IndentCaseBlocks: false
|
||||
IndentCaseLabels: false
|
||||
IndentExternBlock: AfterExternBlock
|
||||
IndentGotoLabels: true
|
||||
IndentPPDirectives: None
|
||||
IndentRequiresClause: true
|
||||
IndentWidth: 4
|
||||
IndentWrappedFunctionNames: false
|
||||
InsertBraces: false
|
||||
InsertNewlineAtEOF: false
|
||||
InsertTrailingCommas: None
|
||||
IntegerLiteralSeparator:
|
||||
Binary: 0
|
||||
BinaryMinDigits: 0
|
||||
Decimal: 0
|
||||
DecimalMinDigits: 0
|
||||
Hex: 0
|
||||
HexMinDigits: 0
|
||||
JavaScriptQuotes: Leave
|
||||
JavaScriptWrapImports: true
|
||||
KeepEmptyLinesAtTheStartOfBlocks: true
|
||||
KeepEmptyLinesAtEOF: false
|
||||
LambdaBodyIndentation: Signature
|
||||
LineEnding: DeriveLF
|
||||
MacroBlockBegin: ''
|
||||
MacroBlockEnd: ''
|
||||
MaxEmptyLinesToKeep: 1
|
||||
NamespaceIndentation: None
|
||||
ObjCBinPackProtocolList: Auto
|
||||
ObjCBlockIndentWidth: 2
|
||||
ObjCBreakBeforeNestedBlockParam: true
|
||||
ObjCSpaceAfterProperty: false
|
||||
ObjCSpaceBeforeProtocolList: true
|
||||
PackConstructorInitializers: BinPack
|
||||
PenaltyBreakAssignment: 2
|
||||
PenaltyBreakBeforeFirstCallParameter: 19
|
||||
PenaltyBreakComment: 300
|
||||
PenaltyBreakFirstLessLess: 120
|
||||
PenaltyBreakOpenParenthesis: 0
|
||||
PenaltyBreakScopeResolution: 500
|
||||
PenaltyBreakString: 1000
|
||||
PenaltyBreakTemplateDeclaration: 10
|
||||
PenaltyExcessCharacter: 1000000
|
||||
PenaltyIndentedWhitespace: 0
|
||||
PenaltyReturnTypeOnItsOwnLine: 60
|
||||
PointerAlignment: Right
|
||||
PPIndentWidth: -1
|
||||
QualifierAlignment: Leave
|
||||
ReferenceAlignment: Pointer
|
||||
ReflowComments: true
|
||||
RemoveBracesLLVM: false
|
||||
RemoveParentheses: Leave
|
||||
RemoveSemicolon: false
|
||||
RequiresClausePosition: OwnLine
|
||||
RequiresExpressionIndentation: OuterScope
|
||||
SeparateDefinitionBlocks: Leave
|
||||
ShortNamespaceLines: 1
|
||||
SkipMacroDefinitionBody: false
|
||||
SortIncludes: CaseSensitive
|
||||
SortJavaStaticImport: Before
|
||||
SortUsingDeclarations: LexicographicNumeric
|
||||
SpaceAfterCStyleCast: false
|
||||
SpaceAfterLogicalNot: false
|
||||
SpaceAfterTemplateKeyword: true
|
||||
SpaceAroundPointerQualifiers: Default
|
||||
SpaceBeforeAssignmentOperators: true
|
||||
SpaceBeforeCaseColon: false
|
||||
SpaceBeforeCpp11BracedList: false
|
||||
SpaceBeforeCtorInitializerColon: true
|
||||
SpaceBeforeInheritanceColon: true
|
||||
SpaceBeforeJsonColon: false
|
||||
SpaceBeforeParens: ControlStatements
|
||||
SpaceBeforeParensOptions:
|
||||
AfterControlStatements: true
|
||||
AfterForeachMacros: true
|
||||
AfterFunctionDefinitionName: false
|
||||
AfterFunctionDeclarationName: false
|
||||
AfterIfMacros: true
|
||||
AfterOverloadedOperator: false
|
||||
AfterPlacementOperator: true
|
||||
AfterRequiresInClause: false
|
||||
AfterRequiresInExpression: false
|
||||
BeforeNonEmptyParentheses: false
|
||||
SpaceBeforeRangeBasedForLoopColon: true
|
||||
SpaceBeforeSquareBrackets: false
|
||||
SpaceInEmptyBlock: false
|
||||
SpacesBeforeTrailingComments: 1
|
||||
SpacesInAngles: Never
|
||||
SpacesInContainerLiterals: true
|
||||
SpacesInLineCommentPrefix:
|
||||
Minimum: 1
|
||||
Maximum: -1
|
||||
SpacesInParens: Never
|
||||
SpacesInParensOptions:
|
||||
InCStyleCasts: false
|
||||
InConditionalStatements: false
|
||||
InEmptyParentheses: false
|
||||
Other: false
|
||||
SpacesInSquareBrackets: false
|
||||
Standard: Latest
|
||||
StatementAttributeLikeMacros:
|
||||
- Q_EMIT
|
||||
StatementMacros:
|
||||
- Q_UNUSED
|
||||
- QT_REQUIRE_VERSION
|
||||
TabWidth: 8
|
||||
UseTab: Never
|
||||
VerilogBreakBetweenInstancePorts: true
|
||||
WhitespaceSensitiveMacros:
|
||||
- BOOST_PP_STRINGIZE
|
||||
- CF_SWIFT_NAME
|
||||
- NS_SWIFT_NAME
|
||||
- PP_STRINGIZE
|
||||
- STRINGIZE
|
||||
...
|
||||
3
.gitignore
vendored
Normal file
3
.gitignore
vendored
Normal file
@@ -0,0 +1,3 @@
|
||||
O.*
|
||||
.*ignore
|
||||
schemas/
|
||||
6
.gitmodules
vendored
Normal file
6
.gitmodules
vendored
Normal file
@@ -0,0 +1,6 @@
|
||||
[submodule "dep/streaming-data-types"]
|
||||
path = dep/streaming-data-types
|
||||
url = git@gitea.psi.ch:lin-controls/streaming-data-types.git
|
||||
[submodule "dep/flatbuffers"]
|
||||
path = dep/flatbuffers
|
||||
url = git@gitea.psi.ch:lin-controls/flatbuffers.git
|
||||
30
Makefile
Normal file
30
Makefile
Normal file
@@ -0,0 +1,30 @@
|
||||
# Include the external Makefile
|
||||
include /ioc/tools/driver.makefile
|
||||
|
||||
MODULE=StreamGenerator
|
||||
BUILDCLASSES=Linux
|
||||
EPICS_VERSIONS=7.0.7
|
||||
#ARCH_FILTER=RHEL%
|
||||
ARCH_FILTER=linux-x86_64
|
||||
|
||||
# Additional module dependencies
|
||||
REQUIRED+=asyn
|
||||
|
||||
DBDS += src/asynStreamGeneratorDriver.dbd
|
||||
|
||||
# DB files to include in the release
|
||||
TEMPLATES += db/channels.db db/daq_common.db
|
||||
|
||||
# HEADERS += src/asynStreamGeneratorDriver.h
|
||||
|
||||
# Source files to build
|
||||
SOURCES += src/asynStreamGeneratorDriver.cpp
|
||||
|
||||
# I don't think specifying the optimisation level like this is correct...
|
||||
# but I doesn't hurt :D
|
||||
USR_CFLAGS += -O3 -Wall -Wextra -Wunused-result -Werror -fvisibility=hidden # -Wpedantic // Does not work because EPICS macros trigger warnings
|
||||
|
||||
# Required to support EV42/44
|
||||
USR_CXXFLAGS += -O3 -I../dep/flatbuffers/include/ -I../schemas
|
||||
|
||||
LIB_SYS_LIBS += rdkafka
|
||||
41
README.md
Normal file
41
README.md
Normal file
@@ -0,0 +1,41 @@
|
||||
# StreamGenerator
|
||||
|
||||
Clone the repository to a local directory via:
|
||||
|
||||
```
|
||||
git clone --recurse-submodules -j8 git@gitea.psi.ch:lin-instrument-computers/StreamGenerator.git
|
||||
```
|
||||
|
||||
## Dependencies
|
||||
|
||||
Currently, this project requires a system install of librdkafka. On Redhat,
|
||||
this means you should run:
|
||||
|
||||
```bash
|
||||
dnf install -y librdkafka-devel
|
||||
```
|
||||
|
||||
Additionally, you must first build Google's *flatbuffers* and ESS's
|
||||
**streaming-data-types** libraries, which are both included in this project as
|
||||
submodules under the `dep` directory and which are both necessary to build this
|
||||
project.
|
||||
|
||||
First, you should enter the *flatbuffers* directory and run the following:
|
||||
|
||||
```bash
|
||||
cmake -G "Unix Makefiles"
|
||||
make -j
|
||||
```
|
||||
|
||||
After these steps, you will find the program `flatc` has been built and placed
|
||||
in the directory.
|
||||
|
||||
Next, you should return to the top of this project's directory tree, and create
|
||||
the flatbuffers from ESS's schema files. This you can do as follows:
|
||||
|
||||
```bash
|
||||
./dep/flatbuffers/flatc -o schemas/ --cpp --gen-mutable --gen-name-strings --scoped-enums ./dep/streaming-data-types/schemas/*
|
||||
```
|
||||
|
||||
This generates header files from each of ESS's schemas and places them in a
|
||||
schemas directory.
|
||||
75
db/channels.db
Normal file
75
db/channels.db
Normal file
@@ -0,0 +1,75 @@
|
||||
# EPICS Database for streamdevice specific to measurement channels
|
||||
#
|
||||
# Macros
|
||||
# INSTR - Prefix
|
||||
# NAME - the device name, e.g. EL737
|
||||
# PORT - StreamGenerator Port
|
||||
# CHANNEL - the number associated with the measurment channel
|
||||
|
||||
################################################################################
|
||||
# Status Variables
|
||||
|
||||
# Trigger a change in status as clearing
|
||||
record(bo, "$(INSTR)$(NAME):T$(CHANNEL)")
|
||||
{
|
||||
field(DESC, "Trigger Clearing Status")
|
||||
field(VAL, 1)
|
||||
field(OUT, "$(INSTR)$(NAME):S$(CHANNEL) PP")
|
||||
}
|
||||
|
||||
# Trigger a change in status as value returned to 0
|
||||
record(seq, "$(INSTR)$(NAME):O$(CHANNEL)")
|
||||
{
|
||||
field(DESC, "Trigger Returned to 0 Status")
|
||||
field(LNK0, "$(INSTR)$(NAME):S$(CHANNEL) PP")
|
||||
field(DO0, 0)
|
||||
field(SELM, "Specified")
|
||||
field(SELL, "$(INSTR)$(NAME):M$(CHANNEL).VAL")
|
||||
field(SCAN, ".1 second")
|
||||
}
|
||||
|
||||
# Current Status of Channel, i.e. is it ready to count?
|
||||
record(bi, "$(INSTR)$(NAME):S$(CHANNEL)")
|
||||
{
|
||||
field(DESC, "Channel Status")
|
||||
field(VAL, 0)
|
||||
field(ZNAM, "OK")
|
||||
field(ONAM, "CLEARING")
|
||||
field(PINI, 1)
|
||||
}
|
||||
|
||||
################################################################################
|
||||
# Count Commands
|
||||
|
||||
record(longout, "$(INSTR)$(NAME):C$(CHANNEL)")
|
||||
{
|
||||
field(DESC, "Clear the current channel count")
|
||||
field(DTYP, "asynInt32")
|
||||
field(OUT, "@asyn($(PORT),0,$(TIMEOUT=1)) C_$(CHANNEL)")
|
||||
field(FLNK, "$(INSTR)$(NAME):T$(CHANNEL)")
|
||||
}
|
||||
|
||||
################################################################################
|
||||
# Read all monitors values
|
||||
|
||||
record(int64in, "$(INSTR)$(NAME):M$(CHANNEL)")
|
||||
{
|
||||
field(DESC, "DAQ CH$(CHANNEL)")
|
||||
field(EGU, "cts")
|
||||
field(DTYP, "asynInt64")
|
||||
field(INP, "@asyn($(PORT),0,$(TIMEOUT=1)) COUNTS$(CHANNEL)")
|
||||
# This is probably too fast. We could trigger things the same as sinqDAQ to ensure the db is update in the same order
|
||||
# field(SCAN, "I/O Intr")
|
||||
field(PINI, "YES")
|
||||
}
|
||||
|
||||
record(ai, "$(INSTR)$(NAME):R$(CHANNEL)")
|
||||
{
|
||||
field(DESC, "Rate of DAQ CH$(CHANNEL)")
|
||||
field(EGU, "cts/sec")
|
||||
field(DTYP, "asynInt32")
|
||||
field(INP, "@asyn($(PORT),0,$(TIMEOUT=1)) RATE$(CHANNEL)")
|
||||
field(SCAN, ".2 second")
|
||||
# field(SCAN, "I/O Intr")
|
||||
field(PINI, "YES")
|
||||
}
|
||||
277
db/daq_common.db
Normal file
277
db/daq_common.db
Normal file
@@ -0,0 +1,277 @@
|
||||
# EPICS Database for streamdevice specific to measurement channels
|
||||
#
|
||||
# Macros
|
||||
# INSTR - Prefix
|
||||
# NAME - the device name, e.g. EL737
|
||||
# PORT - StreamGenerator Port
|
||||
|
||||
record(longout, "$(INSTR)$(NAME):FULL-RESET")
|
||||
{
|
||||
field(DESC, "Reset the DAQ")
|
||||
field(DTYP, "asynInt32")
|
||||
field(OUT, "@asyn($(PORT),0,$(TIMEOUT=1)) RESET")
|
||||
}
|
||||
|
||||
################################################################################
|
||||
# Status Variables
|
||||
|
||||
# We separate the RAW-STATUS and the STATUS PV so that the state can be updated
|
||||
# in a sequence, that guarantees that we included the most recent time and
|
||||
# counts before the status switches back to Idle.
|
||||
# We do this via a sequenced update
|
||||
#
|
||||
# RAW-STATUS -> ELAPSED-SECONDS -> M* -> STATUS
|
||||
record(mbbi, "$(INSTR)$(NAME):RAW-STATUS")
|
||||
{
|
||||
field(DESC, "DAQ Status")
|
||||
field(DTYP, "asynInt32")
|
||||
field(INP, "@asyn($(PORT),0,$(TIMEOUT=1)) STATUS")
|
||||
field(ZRVL, "0")
|
||||
field(ZRST, "Idle")
|
||||
field(ONVL, "1")
|
||||
field(ONST, "Counting")
|
||||
field(TWVL, "2")
|
||||
field(TWST, "Low rate")
|
||||
field(THVL, "3")
|
||||
field(THST, "Paused")
|
||||
# 4 should never happen, if it does it means the DAQ reports undocumented statusbits
|
||||
field(FRVL, "4")
|
||||
field(FRST, "INVALID")
|
||||
# This is probably too fast. We could trigger things the same as sinqDAQ to ensure the db is update in the same order
|
||||
#field(SCAN, "I/O Intr")
|
||||
field(SCAN, ".2 second")
|
||||
field(FLNK, "$(INSTR)$(NAME):READALL")
|
||||
field(PINI, "YES")
|
||||
}
|
||||
|
||||
record(fanout, "$(INSTR)$(NAME):READALL")
|
||||
{
|
||||
field(SELM, "All")
|
||||
field(LNK0, "$(INSTR)$(NAME):ELAPSED-TIME PP")
|
||||
field(LNK1, "$(INSTR)$(NAME):M0")
|
||||
field(LNK2, "$(INSTR)$(NAME):M1")
|
||||
field(LNK3, "$(INSTR)$(NAME):M2")
|
||||
field(LNK4, "$(INSTR)$(NAME):M3")
|
||||
field(LNK5, "$(INSTR)$(NAME):M4")
|
||||
# Doesn't seemt o be a problem to have more in here :D
|
||||
# field(LNK6, "$(INSTR)$(NAME):M5")
|
||||
# field(LNK7, "$(INSTR)$(NAME):M6")
|
||||
field(FLNK, "$(INSTR)$(NAME):STATUS")
|
||||
}
|
||||
|
||||
record(mbbi, "$(INSTR)$(NAME):STATUS")
|
||||
{
|
||||
field(INP, "$(INSTR)$(NAME):RAW-STATUS NPP")
|
||||
field(DESC, "DAQ Status")
|
||||
field(ZRVL, "0")
|
||||
field(ZRST, "Idle")
|
||||
field(ONVL, "1")
|
||||
field(ONST, "Counting")
|
||||
field(TWVL, "2")
|
||||
field(TWST, "Low rate")
|
||||
field(THVL, "3")
|
||||
field(THST, "Paused")
|
||||
# 4 should never happen, if it does it means the DAQ reports undocumented statusbits
|
||||
field(FRVL, "4")
|
||||
field(FRST, "INVALID")
|
||||
field(PINI, "YES")
|
||||
}
|
||||
|
||||
record(longin, "$(INSTR)$(NAME):CHANNELS")
|
||||
{
|
||||
field(DESC, "Total Supported Channels")
|
||||
field(VAL, $(CHANNELS))
|
||||
field(DISP, 1)
|
||||
}
|
||||
|
||||
# Trigger a change in status as clearing
|
||||
record(bo, "$(INSTR)$(NAME):ETT")
|
||||
{
|
||||
field(DESC, "Trigger Clearing Status")
|
||||
field(VAL, 1)
|
||||
field(OUT, "$(INSTR)$(NAME):ETS PP")
|
||||
}
|
||||
|
||||
# Trigger a change in status as value returned to 0
|
||||
record(seq, "$(INSTR)$(NAME):ETO")
|
||||
{
|
||||
field(DESC, "Trigger Returned to 0 Status")
|
||||
field(LNK0, "$(INSTR)$(NAME):ETS PP")
|
||||
field(DO0, 0)
|
||||
field(SELM, "Specified")
|
||||
field(SELL, "$(INSTR)$(NAME):ELAPSED-TIME.VAL")
|
||||
field(SCAN, ".1 second")
|
||||
}
|
||||
|
||||
# Current Status of Channel, i.e. is it ready to count?
|
||||
record(bi, "$(INSTR)$(NAME):ETS")
|
||||
{
|
||||
field(DESC, "Channel Status")
|
||||
field(VAL, 0)
|
||||
field(ZNAM, "OK")
|
||||
field(ONAM, "CLEARING")
|
||||
field(PINI, 1)
|
||||
}
|
||||
|
||||
################################################################################
|
||||
# Count Commands
|
||||
|
||||
record(ao,"$(INSTR)$(NAME):PRESET-COUNT")
|
||||
{
|
||||
field(DESC, "Count until preset reached")
|
||||
field(DTYP, "asynInt32")
|
||||
field(OUT, "@asyn($(PORT),0,$(TIMEOUT=1)) P_CNT")
|
||||
field(VAL, 0)
|
||||
field(PREC, 2)
|
||||
}
|
||||
|
||||
record(ao,"$(INSTR)$(NAME):PRESET-TIME")
|
||||
{
|
||||
field(DESC, "Count for specified time")
|
||||
field(EGU, "seconds")
|
||||
field(DTYP, "asynInt32")
|
||||
field(OUT, "@asyn($(PORT),0,$(TIMEOUT=1)) P_TIME")
|
||||
field(VAL, 0)
|
||||
field(PREC, 2)
|
||||
}
|
||||
|
||||
# record(bo,"$(INSTR)$(NAME):PAUSE")
|
||||
# {
|
||||
# field(DESC, "Pause the current count")
|
||||
# field(DTYP, "stream")
|
||||
# field(OUT, "@... pauseCount($(INSTR)$(NAME):) $(PORT)")
|
||||
# field(VAL, "0")
|
||||
# field(FLNK, "$(INSTR)$(NAME):RAW-STATUS")
|
||||
# }
|
||||
#
|
||||
# record(bo,"$(INSTR)$(NAME):CONTINUE")
|
||||
# {
|
||||
# field(DESC, "Continue with a count that was paused")
|
||||
# field(DTYP, "stream")
|
||||
# field(OUT, "@... continueCount($(INSTR)$(NAME):) $(PORT)")
|
||||
# field(VAL, "0")
|
||||
# field(FLNK, "$(INSTR)$(NAME):RAW-STATUS")
|
||||
# }
|
||||
|
||||
record(longout, "$(INSTR)$(NAME):STOP")
|
||||
{
|
||||
field(DESC, "Stop the current counting operation")
|
||||
field(DTYP, "asynInt32")
|
||||
field(OUT, "@asyn($(PORT),0,$(TIMEOUT=1)) STOP")
|
||||
}
|
||||
|
||||
record(longout, "$(INSTR)$(NAME):MONITOR-CHANNEL")
|
||||
{
|
||||
field(DESC, "PRESET-COUNT Monitors this channel")
|
||||
field(DTYP, "asynInt32")
|
||||
field(OUT, "@asyn($(PORT),0,$(TIMEOUT=1)) MONITOR")
|
||||
field(DRVL, "0") # Smallest Monitor Channel
|
||||
field(DRVH, "$(CHANNELS)") # Largest Monitor Channel
|
||||
}
|
||||
|
||||
record(longin, "$(INSTR)$(NAME):MONITOR-CHANNEL_RBV")
|
||||
{
|
||||
field(DESC, "PRESET-COUNT Monitors this channel")
|
||||
field(DTYP, "asynInt32")
|
||||
field(INP, "@asyn($(PORT),0,$(TIMEOUT=1)) MONITOR")
|
||||
field(SCAN, "I/O Intr")
|
||||
field(PINI, "YES")
|
||||
}
|
||||
|
||||
record(ao,"$(INSTR)$(NAME):THRESHOLD")
|
||||
{
|
||||
field(DESC, "Minimum rate for counting to proceed")
|
||||
field(DTYP, "asynInt32")
|
||||
field(OUT, "@asyn($(PORT),0,$(TIMEOUT=1)) THRESH")
|
||||
field(VAL, "1") # Default Rate
|
||||
field(DRVL, "1") # Minimum Rate
|
||||
field(DRVH, "100000") # Maximum Rate
|
||||
}
|
||||
|
||||
record(ai,"$(INSTR)$(NAME):THRESHOLD_RBV")
|
||||
{
|
||||
field(DESC, "Minimum rate for counting to proceed")
|
||||
field(EGU, "cts/sec")
|
||||
field(DTYP, "asynInt32")
|
||||
field(INP, "@asyn($(PORT),0,$(TIMEOUT=1)) THRESH")
|
||||
field(SCAN, "I/O Intr")
|
||||
field(PINI, "YES")
|
||||
}
|
||||
|
||||
record(longout,"$(INSTR)$(NAME):THRESHOLD-MONITOR")
|
||||
{
|
||||
field(DESC, "Channel monitored for minimum rate")
|
||||
field(DTYP, "asynInt32")
|
||||
field(OUT, "@asyn($(PORT),0,$(TIMEOUT=1)) THRESH_CH")
|
||||
field(VAL, "1") # Monitor
|
||||
field(DRVL, "0") # Smallest Threshold Channel (0 is off)
|
||||
field(DRVH, "$(CHANNELS)") # Largest Threshold Channel
|
||||
}
|
||||
|
||||
record(longin,"$(INSTR)$(NAME):THRESHOLD-MONITOR_RBV")
|
||||
{
|
||||
field(DESC, "Channel monitored for minimum rate")
|
||||
field(EGU, "CH")
|
||||
field(DTYP, "asynInt32")
|
||||
field(INP, "@asyn($(PORT),0,$(TIMEOUT=1)) THRESH_CH")
|
||||
field(SCAN, "I/O Intr")
|
||||
field(PINI, "YES")
|
||||
}
|
||||
|
||||
record(longout, "$(INSTR)$(NAME):CT")
|
||||
{
|
||||
field(DESC, "Clear the timer")
|
||||
field(DTYP, "asynInt32")
|
||||
field(OUT, "@asyn($(PORT),0,$(TIMEOUT=1)) C_TIME")
|
||||
field(FLNK, "$(INSTR)$(NAME):ETT")
|
||||
}
|
||||
|
||||
################################################################################
|
||||
# Read all monitors values
|
||||
|
||||
record(ai, "$(INSTR)$(NAME):ELAPSED-TIME")
|
||||
{
|
||||
field(DESC, "DAQ Measured Time")
|
||||
field(EGU, "sec")
|
||||
field(DTYP, "asynFloat64")
|
||||
field(INP, "@asyn($(PORT),0,$(TIMEOUT=1)) TIME")
|
||||
# field(SCAN, "I/O Intr")
|
||||
field(PINI, "YES")
|
||||
# field(FLNK, "$(INSTR)$(NAME):ETO")
|
||||
}
|
||||
|
||||
################################################################################
|
||||
# Stream Generator Status PVs
|
||||
|
||||
record(longin,"$(INSTR)$(NAME):UDP_DROPPED")
|
||||
{
|
||||
field(DESC, "UDP Packets Missed")
|
||||
field(EGU, "Events")
|
||||
field(DTYP, "asynInt32")
|
||||
field(INP, "@asyn($(PORT),0,$(TIMEOUT=1)) DROP")
|
||||
# field(SCAN, "I/O Intr")
|
||||
field(SCAN, "1 second")
|
||||
field(PINI, "YES")
|
||||
}
|
||||
|
||||
record(longin,"$(INSTR)$(NAME):UDP_WATERMARK")
|
||||
{
|
||||
field(DESC, "UDP Queue Usage")
|
||||
field(EGU, "%")
|
||||
field(DTYP, "asynInt32")
|
||||
field(INP, "@asyn($(PORT),0,$(TIMEOUT=1)) UDP")
|
||||
# field(SCAN, "I/O Intr")
|
||||
field(SCAN, "1 second")
|
||||
field(PINI, "YES")
|
||||
}
|
||||
|
||||
record(longin,"$(INSTR)$(NAME):SORTED_WATERMARK")
|
||||
{
|
||||
field(DESC, "Partial Sort Queue Usage")
|
||||
field(EGU, "%")
|
||||
field(DTYP, "asynInt32")
|
||||
field(INP, "@asyn($(PORT),0,$(TIMEOUT=1)) SORT")
|
||||
# field(SCAN, "I/O Intr")
|
||||
field(SCAN, "1 second")
|
||||
field(PINI, "YES")
|
||||
}
|
||||
1
dep/flatbuffers
Submodule
1
dep/flatbuffers
Submodule
Submodule dep/flatbuffers added at 1872409707
1
dep/streaming-data-types
Submodule
1
dep/streaming-data-types
Submodule
Submodule dep/streaming-data-types added at 3b1830faf2
@@ -1,4 +0,0 @@
|
||||
confluent-kafka==2.12.1
|
||||
ess-streaming-data-types==0.27.0
|
||||
flatbuffers==25.9.23
|
||||
numpy==1.26.3
|
||||
9
scripts/ioc.sh
Executable file
9
scripts/ioc.sh
Executable file
@@ -0,0 +1,9 @@
|
||||
#!/bin/bash
|
||||
|
||||
export EPICS_HOST_ARCH=linux-x86_64
|
||||
export EPICS_BASE=/usr/local/epics/base-7.0.7
|
||||
|
||||
PARENT_PATH="$( cd "$(dirname "${BASH_SOURCE[0]}")" ; pwd -P )"
|
||||
|
||||
# /usr/local/bin/procServ -o -L - -f -i ^D^C 20001 "${PARENT_PATH}/st.cmd" -d
|
||||
${PARENT_PATH}/st.cmd
|
||||
38
scripts/st.cmd
Executable file
38
scripts/st.cmd
Executable file
@@ -0,0 +1,38 @@
|
||||
#!/usr/local/bin/iocsh
|
||||
#-d
|
||||
|
||||
on error break
|
||||
|
||||
require StreamGenerator, test
|
||||
|
||||
epicsEnvSet("INSTR", "SQ:TEST:")
|
||||
epicsEnvSet("NAME", "SG")
|
||||
|
||||
# Local UDP Generator Test Config
|
||||
# drvAsynIPPortConfigure("ASYN_IP_PORT", "127.0.0.1:9071:54321 UDP", 0, 0, 1)
|
||||
|
||||
# Correlation Unit Config
|
||||
drvAsynIPPortConfigure("ASYN_IP_PORT", "172.28.69.20:54321:54321 UDP", 0, 0, 1)
|
||||
|
||||
# With a udpQueue and sortQueue size of 10'000 packets, we can hold in memory
|
||||
# 10'000 * 243 = 2.43e6 events
|
||||
|
||||
# Kafka Broker and Topic Configuration
|
||||
# asynStreamGenerator("ASYN_SG", "ASYN_IP_PORT", 4, 10000, "linkafka01:9092", "NEWEFU_TEST", "NEWEFU_TEST2", 10000, 20480)
|
||||
# asynStreamGenerator("ASYN_SG", "ASYN_IP_PORT", 4, 10000, "ess01:9092", "NEWEFU_TEST", "NEWEFU_TEST2", 10000, 20480)
|
||||
|
||||
# Don't send any kafka messages
|
||||
asynStreamGenerator("ASYN_SG", "ASYN_IP_PORT", 4, 10000, "", "", "", 0, 0)
|
||||
|
||||
dbLoadRecords("$(StreamGenerator_DB)daq_common.db", "INSTR=$(INSTR), NAME=$(NAME), PORT=ASYN_SG, CHANNELS=5")
|
||||
|
||||
# Detector Count Channel
|
||||
dbLoadRecords("$(StreamGenerator_DB)channels.db", "INSTR=$(INSTR), NAME=$(NAME), PORT=ASYN_SG, CHANNEL=0")
|
||||
|
||||
# Monitor Channels
|
||||
dbLoadRecords("$(StreamGenerator_DB)channels.db", "INSTR=$(INSTR), NAME=$(NAME), PORT=ASYN_SG, CHANNEL=1")
|
||||
dbLoadRecords("$(StreamGenerator_DB)channels.db", "INSTR=$(INSTR), NAME=$(NAME), PORT=ASYN_SG, CHANNEL=2")
|
||||
dbLoadRecords("$(StreamGenerator_DB)channels.db", "INSTR=$(INSTR), NAME=$(NAME), PORT=ASYN_SG, CHANNEL=3")
|
||||
dbLoadRecords("$(StreamGenerator_DB)channels.db", "INSTR=$(INSTR), NAME=$(NAME), PORT=ASYN_SG, CHANNEL=4")
|
||||
|
||||
iocInit()
|
||||
118
scripts/udp_gen.py
Normal file
118
scripts/udp_gen.py
Normal file
@@ -0,0 +1,118 @@
|
||||
import socket
|
||||
import time
|
||||
import random
|
||||
|
||||
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
||||
|
||||
header = [
|
||||
0, 0, # buffer length in 16bit words (1, 0) == 1, (0, 1) == 256
|
||||
0, 0x80, # buffer type (probably should be 0)
|
||||
21, 0, # header length
|
||||
0, 0, # buffer number
|
||||
0, 0, # run id
|
||||
0x3, # status
|
||||
0, # id of sending module
|
||||
0, 0, # timestamp low
|
||||
0, 0, # timestamp mid
|
||||
0, 0, # timestamp high
|
||||
] + [0, 0] * 12 # parameters
|
||||
|
||||
data = [
|
||||
0,
|
||||
0,
|
||||
0,
|
||||
0,
|
||||
0,
|
||||
0
|
||||
]
|
||||
|
||||
start_time = time.time_ns() // 100
|
||||
|
||||
buffer_ids = {
|
||||
i: (0, 0) for i in range(10)
|
||||
}
|
||||
|
||||
while True:
|
||||
|
||||
# update timestamp
|
||||
base_timestamp = time.time_ns() // 100 - start_time
|
||||
t_low = base_timestamp & 0xffff
|
||||
t_mid = (base_timestamp >> 16) & 0xffff
|
||||
t_high = (base_timestamp >> 32) & 0xffff
|
||||
header[12] = t_low & 0xff
|
||||
header[13] = t_low >> 8
|
||||
header[14] = t_mid & 0xff
|
||||
header[15] = t_mid >> 8
|
||||
header[16] = t_high & 0xff
|
||||
header[17] = t_high >> 8
|
||||
|
||||
num_events = random.randint(0, 243)
|
||||
# num_events = 243
|
||||
# num_events = 1
|
||||
|
||||
# update buffer length
|
||||
buffer_length = 21 + num_events * 3
|
||||
header[0] = buffer_length & 0xff
|
||||
header[1] = (buffer_length >> 8) & 0xff
|
||||
|
||||
# I believe, that in our case we never mix monitor and detector events as
|
||||
# the monitors should have id 0 and the detector events 1-9 so I have
|
||||
# excluded that posibility here. That would, however, if true mean we could
|
||||
# reduce also the number of checks on the parsing side of things...
|
||||
|
||||
is_monitor = random.randint(0, 9)
|
||||
# is_monitor = 4
|
||||
|
||||
header[11] = 0 if is_monitor > 3 else random.randint(1,9)
|
||||
|
||||
# update buffer number (each mcpdid has its own buffer number count)
|
||||
header[6], header[7] = buffer_ids[header[11]]
|
||||
header[6] = (header[6] + 1) % (0xff + 1)
|
||||
header[7] = (header[7] + (header[6] == 0)) % (0xff + 1)
|
||||
buffer_ids[header[11]] = header[6], header[7]
|
||||
|
||||
tosend = list(header)
|
||||
|
||||
if is_monitor > 3:
|
||||
|
||||
for i in range(num_events):
|
||||
d = list(data)
|
||||
|
||||
monitor = random.randint(0,3)
|
||||
# monitor = 0
|
||||
|
||||
d[5] = (1 << 7) | monitor
|
||||
|
||||
# update trigger timestamp
|
||||
event_timestamp = (time.time_ns() // 100) - base_timestamp
|
||||
d[0] = event_timestamp & 0xff
|
||||
d[1] = (event_timestamp >> 8) & 0xff
|
||||
d[2] = (event_timestamp >> 16) & 0x07
|
||||
|
||||
tosend += d
|
||||
|
||||
else:
|
||||
|
||||
for i in range(num_events):
|
||||
d = list(data)
|
||||
|
||||
amplitude = random.randint(0, 255)
|
||||
x_pos = random.randint(0, 1023)
|
||||
y_pos = random.randint(0, 1023)
|
||||
event_timestamp = (time.time_ns() // 100) - base_timestamp
|
||||
|
||||
d[5] = (0 << 7) | (amplitude >> 1)
|
||||
d[4] = ((amplitude & 0x01) << 7) | (y_pos >> 3)
|
||||
d[3] = ((y_pos << 5) & 0xE0) | (x_pos >> 5)
|
||||
d[2] = ((x_pos << 3) & 0xF8)
|
||||
|
||||
d[0] = event_timestamp & 0xff
|
||||
d[1] = (event_timestamp >> 8) & 0xff
|
||||
d[2] |= (event_timestamp >> 16) & 0x07
|
||||
|
||||
tosend += d
|
||||
|
||||
sock.sendto(bytes(tosend), ('127.0.0.1', 54321))
|
||||
mv = memoryview(bytes(header)).cast('H')
|
||||
print(f'Sent packet {mv[3]} with {num_events} events {base_timestamp}')
|
||||
# time.sleep(.01)
|
||||
910
src/asynStreamGeneratorDriver.cpp
Normal file
910
src/asynStreamGeneratorDriver.cpp
Normal file
@@ -0,0 +1,910 @@
|
||||
#include "asynOctetSyncIO.h"
|
||||
#include "ev42_events_generated.h"
|
||||
#include <cmath>
|
||||
#include <cstring>
|
||||
#include <epicsStdio.h>
|
||||
#include <iocsh.h>
|
||||
#include <queue>
|
||||
|
||||
// Just for printing
|
||||
#define __STDC_FORMAT_MACROS
|
||||
#include <inttypes.h>
|
||||
|
||||
#include "asynStreamGeneratorDriver.h"
|
||||
#include <epicsExport.h>
|
||||
|
||||
/*******************************************************************************
|
||||
* Kafka Methods
|
||||
*/
|
||||
|
||||
static void set_kafka_config_key(rd_kafka_conf_t *conf, char *key,
|
||||
char *value) {
|
||||
char errstr[512];
|
||||
rd_kafka_conf_res_t res;
|
||||
|
||||
res = rd_kafka_conf_set(conf, key, value, errstr, sizeof(errstr));
|
||||
if (res != RD_KAFKA_CONF_OK) {
|
||||
epicsStdoutPrintf("Failed to set config value %s : %s\n", key, value);
|
||||
exit(1);
|
||||
}
|
||||
}
|
||||
|
||||
static rd_kafka_t *create_kafka_producer(const char *kafkaBroker) {
|
||||
|
||||
char errstr[512];
|
||||
rd_kafka_t *producer;
|
||||
|
||||
// Prepare configuration object
|
||||
rd_kafka_conf_t *conf = rd_kafka_conf_new();
|
||||
// TODO feel not great about this
|
||||
set_kafka_config_key(conf, "bootstrap.servers",
|
||||
const_cast<char *>(kafkaBroker));
|
||||
set_kafka_config_key(conf, "queue.buffering.max.messages", "10000000");
|
||||
// With 2e6 counts / s
|
||||
// and a packet size of 20480 events (163920 bytes)
|
||||
// this implies we need to send around 100 messages a second
|
||||
// and need about .2 gigabit upload
|
||||
// set_kafka_config_key(conf, "queue.buffering.max.kbytes", "10000000");
|
||||
|
||||
// Create the Producer
|
||||
producer = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, sizeof(errstr));
|
||||
|
||||
if (!producer) {
|
||||
epicsStdoutPrintf("Failed to create Kafka Producer: %s\n", errstr);
|
||||
exit(1);
|
||||
}
|
||||
|
||||
return producer;
|
||||
}
|
||||
|
||||
/*******************************************************************************
|
||||
* Static Methods Passed to Epics Threads that should run in the background
|
||||
*/
|
||||
static void udpPollerTask(void *drvPvt) {
|
||||
asynStreamGeneratorDriver *pSGD = (asynStreamGeneratorDriver *)drvPvt;
|
||||
pSGD->receiveUDP();
|
||||
}
|
||||
|
||||
static void udpNormaliserTask(void *drvPvt) {
|
||||
asynStreamGeneratorDriver *pSGD = (asynStreamGeneratorDriver *)drvPvt;
|
||||
pSGD->normaliseUDP();
|
||||
}
|
||||
|
||||
static void sortTask(void *drvPvt) {
|
||||
asynStreamGeneratorDriver *pSGD = (asynStreamGeneratorDriver *)drvPvt;
|
||||
pSGD->partialSortEvents();
|
||||
}
|
||||
|
||||
static void daqTask(void *drvPvt) {
|
||||
asynStreamGeneratorDriver *pSGD = (asynStreamGeneratorDriver *)drvPvt;
|
||||
pSGD->processEvents();
|
||||
}
|
||||
|
||||
static void monitorProducerTask(void *drvPvt) {
|
||||
asynStreamGeneratorDriver *pSGD = (asynStreamGeneratorDriver *)drvPvt;
|
||||
pSGD->produceMonitor();
|
||||
}
|
||||
|
||||
static void detectorProducerTask(void *drvPvt) {
|
||||
asynStreamGeneratorDriver *pSGD = (asynStreamGeneratorDriver *)drvPvt;
|
||||
pSGD->produceDetector();
|
||||
}
|
||||
|
||||
/*******************************************************************************
|
||||
* Stream Generator Helper Methods
|
||||
*/
|
||||
|
||||
asynStatus asynStreamGeneratorDriver::createInt32Param(
|
||||
asynStatus status, char *name, int *variable, epicsInt32 initialValue) {
|
||||
// TODO should show error if there is one
|
||||
return (asynStatus)(status | createParam(name, asynParamInt32, variable) |
|
||||
setIntegerParam(*variable, initialValue));
|
||||
}
|
||||
|
||||
asynStatus asynStreamGeneratorDriver::createInt64Param(
|
||||
asynStatus status, char *name, int *variable, epicsInt64 initialValue) {
|
||||
// TODO should show error if there is one
|
||||
return (asynStatus)(status | createParam(name, asynParamInt64, variable) |
|
||||
setInteger64Param(*variable, initialValue));
|
||||
}
|
||||
|
||||
asynStatus asynStreamGeneratorDriver::createFloat64Param(asynStatus status,
|
||||
char *name,
|
||||
int *variable,
|
||||
double initialValue) {
|
||||
// TODO should show error if there is one
|
||||
return (asynStatus)(status | createParam(name, asynParamFloat64, variable) |
|
||||
setDoubleParam(*variable, initialValue));
|
||||
}
|
||||
|
||||
/*******************************************************************************
|
||||
* Stream Generator Methods
|
||||
*/
|
||||
asynStreamGeneratorDriver::asynStreamGeneratorDriver(
|
||||
const char *portName, const char *ipPortName, const int numChannels,
|
||||
const int udpQueueSize, const bool enableKafkaStream,
|
||||
const char *kafkaBroker, const char *monitorTopic,
|
||||
const char *detectorTopic, const int kafkaQueueSize,
|
||||
const int kafkaMaxPacketSize)
|
||||
: asynPortDriver(portName, 1, /* maxAddr */
|
||||
asynInt32Mask | asynInt64Mask | asynFloat64Mask |
|
||||
asynDrvUserMask, /* Interface mask */
|
||||
asynInt32Mask, // | asynFloat64Mask, /* Interrupt mask */
|
||||
0, /* asynFlags. This driver does not block and it is
|
||||
not multi-device, but has a
|
||||
destructor ASYN_DESTRUCTIBLE our version of the Asyn
|
||||
is too old to support this flag */
|
||||
1, /* Autoconnect */
|
||||
0, /* Default priority */
|
||||
0), /* Default stack size*/
|
||||
num_channels(numChannels + 1), kafkaEnabled(enableKafkaStream),
|
||||
monitorTopic(monitorTopic), detectorTopic(detectorTopic),
|
||||
udpQueueSize(udpQueueSize), kafkaQueueSize(kafkaQueueSize),
|
||||
// measured in max packet sizes
|
||||
udpQueue(
|
||||
epicsRingBytesCreate(243 * udpQueueSize * sizeof(NormalisedEvent))),
|
||||
normalisedQueue(
|
||||
epicsRingBytesCreate(243 * udpQueueSize * sizeof(NormalisedEvent))),
|
||||
// TODO configurable sizes
|
||||
sortedQueue(
|
||||
epicsRingBytesCreate(243 * udpQueueSize * sizeof(NormalisedEvent))),
|
||||
monitorQueue(
|
||||
epicsRingBytesCreate(243 * kafkaQueueSize * sizeof(NormalisedEvent))),
|
||||
detectorQueue(
|
||||
epicsRingBytesCreate(243 * kafkaQueueSize * sizeof(NormalisedEvent))),
|
||||
kafkaMaxPacketSize(kafkaMaxPacketSize) {
|
||||
const char *functionName = "asynStreamGeneratorDriver";
|
||||
|
||||
// Parameter Setup
|
||||
asynStatus status = asynSuccess;
|
||||
|
||||
status = createInt32Param(status, P_StatusString, &P_Status, STATUS_IDLE);
|
||||
status = createInt32Param(status, P_ResetString, &P_Reset);
|
||||
status = createInt32Param(status, P_StopString, &P_Stop);
|
||||
status = createInt32Param(status, P_CountPresetString, &P_CountPreset);
|
||||
status = createInt32Param(status, P_TimePresetString, &P_TimePreset);
|
||||
status = createFloat64Param(status, P_ElapsedTimeString, &P_ElapsedTime);
|
||||
status =
|
||||
createInt32Param(status, P_ClearElapsedTimeString, &P_ClearElapsedTime);
|
||||
status =
|
||||
createInt32Param(status, P_MonitorChannelString, &P_MonitorChannel);
|
||||
status = createInt32Param(status, P_ThresholdString, &P_Threshold, 1);
|
||||
status = createInt32Param(status, P_ThresholdChannelString,
|
||||
&P_ThresholdChannel, 1);
|
||||
|
||||
// Create Parameters templated on Channel Number
|
||||
char pv_name_buffer[100];
|
||||
P_Counts = new int[this->num_channels];
|
||||
P_Rates = new int[this->num_channels];
|
||||
P_ClearCounts = new int[this->num_channels];
|
||||
for (std::size_t i = 0; i < this->num_channels; ++i) {
|
||||
memset(pv_name_buffer, 0, 100);
|
||||
epicsSnprintf(pv_name_buffer, 100, P_CountsString, i);
|
||||
status = createInt64Param(status, pv_name_buffer, P_Counts + i);
|
||||
|
||||
memset(pv_name_buffer, 0, 100);
|
||||
epicsSnprintf(pv_name_buffer, 100, P_RateString, i);
|
||||
status = createInt32Param(status, pv_name_buffer, P_Rates + i);
|
||||
|
||||
memset(pv_name_buffer, 0, 100);
|
||||
epicsSnprintf(pv_name_buffer, 100, P_ClearCountsString, i);
|
||||
status = createInt32Param(status, pv_name_buffer, P_ClearCounts + i);
|
||||
}
|
||||
|
||||
status = createInt32Param(status, P_UdpDroppedString, &P_UdpDropped);
|
||||
status = createInt32Param(status, P_UdpQueueHighWaterMarkString,
|
||||
&P_UdpQueueHighWaterMark);
|
||||
status = createInt32Param(status, P_SortedQueueHighWaterMarkString,
|
||||
&P_SortedQueueHighWaterMark);
|
||||
|
||||
if (status) {
|
||||
epicsStdoutPrintf(
|
||||
"%s:%s: failed to create or setup parameters, status=%d\n",
|
||||
driverName, functionName, status);
|
||||
exit(1);
|
||||
}
|
||||
|
||||
// Create Events
|
||||
// this->pausedEventId = epicsEventCreate(epicsEventEmpty);
|
||||
|
||||
if (enableKafkaStream) {
|
||||
|
||||
epicsStdoutPrintf(
|
||||
"Detector Kafka Config: broker=%s, topic=%s\n "
|
||||
" queue size:%d, max events per packet: %d\n",
|
||||
kafkaBroker, this->detectorTopic, kafkaQueueSize,
|
||||
this->kafkaMaxPacketSize);
|
||||
|
||||
epicsStdoutPrintf(
|
||||
"Monitors Kafka Config: broker=%s, topic=%s\n "
|
||||
" queue size:%d, max events per packet: %d\n",
|
||||
kafkaBroker, this->monitorTopic, kafkaQueueSize,
|
||||
this->kafkaMaxPacketSize);
|
||||
|
||||
this->monitorProducer = create_kafka_producer(kafkaBroker);
|
||||
this->detectorProducer = create_kafka_producer(kafkaBroker);
|
||||
|
||||
// Setup for Thread Producing Monitor Kafka Events
|
||||
status =
|
||||
(asynStatus)(epicsThreadCreate(
|
||||
"monitor_produce", epicsThreadPriorityMedium,
|
||||
epicsThreadGetStackSize(epicsThreadStackMedium),
|
||||
(EPICSTHREADFUNC)::monitorProducerTask,
|
||||
this) == NULL);
|
||||
if (status) {
|
||||
epicsStdoutPrintf("%s:%s: epicsThreadCreate failure, status=%d\n",
|
||||
driverName, functionName, status);
|
||||
exit(1);
|
||||
}
|
||||
|
||||
// Setup for Thread Producing Detector Kafka Events
|
||||
status =
|
||||
(asynStatus)(epicsThreadCreate(
|
||||
"monitor_produce", epicsThreadPriorityMedium,
|
||||
epicsThreadGetStackSize(epicsThreadStackMedium),
|
||||
(EPICSTHREADFUNC)::detectorProducerTask,
|
||||
this) == NULL);
|
||||
if (status) {
|
||||
epicsStdoutPrintf("%s:%s: epicsThreadCreate failure, status=%d\n",
|
||||
driverName, functionName, status);
|
||||
exit(1);
|
||||
}
|
||||
} else {
|
||||
|
||||
epicsStdoutPrintf("Kafka Stream Disabled\n");
|
||||
}
|
||||
|
||||
/* Create the thread that orders the events and acts as our sinqDaq stand-in
|
||||
*/
|
||||
status =
|
||||
(asynStatus)(epicsThreadCreate(
|
||||
"sinqDAQ",
|
||||
epicsThreadPriorityMedium, // epicsThreadPriorityMax,
|
||||
epicsThreadGetStackSize(epicsThreadStackMedium),
|
||||
(EPICSTHREADFUNC)::daqTask, this) == NULL);
|
||||
if (status) {
|
||||
epicsStdoutPrintf("%s:%s: epicsThreadCreate failure, status=%d\n",
|
||||
driverName, functionName, status);
|
||||
exit(1);
|
||||
}
|
||||
|
||||
/* Create the thread that orders packets of in preparation for our sinqDAQ
|
||||
* stand-in
|
||||
*/
|
||||
status = (asynStatus)(epicsThreadCreate(
|
||||
"partialSort", epicsThreadPriorityMedium,
|
||||
epicsThreadGetStackSize(epicsThreadStackMedium),
|
||||
(EPICSTHREADFUNC)::sortTask, this) == NULL);
|
||||
if (status) {
|
||||
epicsStdoutPrintf("%s:%s: epicsThreadCreate failure, status=%d\n",
|
||||
driverName, functionName, status);
|
||||
exit(1);
|
||||
}
|
||||
|
||||
/* Create the thread normalises the events
|
||||
*/
|
||||
status =
|
||||
(asynStatus)(epicsThreadCreate(
|
||||
"eventNormaliser",
|
||||
epicsThreadPriorityMedium, // epicsThreadPriorityMax,
|
||||
epicsThreadGetStackSize(epicsThreadStackMedium),
|
||||
(EPICSTHREADFUNC)::udpNormaliserTask, this) == NULL);
|
||||
if (status) {
|
||||
epicsStdoutPrintf("%s:%s: epicsThreadCreate failure, status=%d\n",
|
||||
driverName, functionName, status);
|
||||
exit(1);
|
||||
}
|
||||
|
||||
// UDP Receive Setup
|
||||
status = pasynOctetSyncIO->connect(ipPortName, 0, &pasynUDPUser, NULL);
|
||||
|
||||
if (status) {
|
||||
epicsStdoutPrintf("%s:%s: Couldn't open connection %s, status=%d\n",
|
||||
driverName, functionName, ipPortName, status);
|
||||
exit(1);
|
||||
}
|
||||
|
||||
/* Create the thread that receives UDP traffic in the background */
|
||||
status = (asynStatus)(epicsThreadCreate(
|
||||
"udp_receive", epicsThreadPriorityMax,
|
||||
epicsThreadGetStackSize(epicsThreadStackMedium),
|
||||
(EPICSTHREADFUNC)::udpPollerTask, this) == NULL);
|
||||
if (status) {
|
||||
epicsStdoutPrintf("%s:%s: epicsThreadCreate failure, status=%d\n",
|
||||
driverName, functionName, status);
|
||||
exit(1);
|
||||
}
|
||||
}
|
||||
|
||||
asynStreamGeneratorDriver::~asynStreamGeneratorDriver() {
|
||||
// should make sure queues are empty and freed
|
||||
// and that the kafka producers are flushed and freed
|
||||
delete[] P_Counts;
|
||||
delete[] P_Rates;
|
||||
|
||||
// TODO add exit should perhaps ensure the queue is flushed
|
||||
// rd_kafka_poll(producer, 0);
|
||||
// epicsStdoutPrintf("Kafka Queue Size %d\n", rd_kafka_outq_len(producer));
|
||||
// rd_kafka_flush(producer, 10 * 1000);
|
||||
// epicsStdoutPrintf("Kafka Queue Size %d\n", rd_kafka_outq_len(producer));
|
||||
}
|
||||
|
||||
asynStatus asynStreamGeneratorDriver::readInt32(asynUser *pasynUser,
|
||||
epicsInt32 *value) {
|
||||
|
||||
int function = pasynUser->reason;
|
||||
asynStatus status = asynSuccess;
|
||||
const char *paramName;
|
||||
const char *functionName = "readInt32";
|
||||
getParamName(function, ¶mName);
|
||||
|
||||
if (function == P_UdpQueueHighWaterMark) {
|
||||
const double toPercent = 100. / (243. * udpQueueSize);
|
||||
*value = (epicsInt32)(epicsRingBytesHighWaterMark(this->udpQueue) /
|
||||
sizeof(NormalisedEvent) * toPercent);
|
||||
// Aparently resetting the watermark causes problems...
|
||||
// at least concurrently :D
|
||||
// epicsRingBytesResetHighWaterMark(this->udpQueue);
|
||||
return asynSuccess;
|
||||
} else if (function == P_SortedQueueHighWaterMark) {
|
||||
const double toPercent = 100. / (243. * udpQueueSize);
|
||||
*value = (epicsInt32)(epicsRingBytesHighWaterMark(this->sortedQueue) /
|
||||
sizeof(NormalisedEvent) * toPercent);
|
||||
// epicsRingBytesResetHighWaterMark(this->sortedQueue);
|
||||
return asynSuccess;
|
||||
}
|
||||
|
||||
return asynPortDriver::readInt32(pasynUser, value);
|
||||
}
|
||||
|
||||
asynStatus asynStreamGeneratorDriver::writeInt32(asynUser *pasynUser,
|
||||
epicsInt32 value) {
|
||||
int function = pasynUser->reason;
|
||||
asynStatus status = asynSuccess;
|
||||
const char *paramName;
|
||||
const char *functionName = "writeInt32";
|
||||
getParamName(function, ¶mName);
|
||||
|
||||
// TODO should maybe lock mutex for this
|
||||
epicsInt32 currentStatus;
|
||||
status = getIntegerParam(this->P_Status, ¤tStatus);
|
||||
|
||||
if (status) {
|
||||
epicsSnprintf(pasynUser->errorMessage, pasynUser->errorMessageSize,
|
||||
"%s:%s: status=%d, function=%d, name=%s, value=%d",
|
||||
driverName, functionName, status, function, paramName,
|
||||
value);
|
||||
return status;
|
||||
}
|
||||
|
||||
// TODO clean up
|
||||
bool isClearCount = false;
|
||||
size_t channelToClear;
|
||||
for (size_t i = 0; i < this->num_channels; ++i) {
|
||||
isClearCount |= function == P_ClearCounts[i];
|
||||
if (isClearCount) {
|
||||
channelToClear = i;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
// TODO should check everything...
|
||||
if (function == P_CountPreset) {
|
||||
if (!currentStatus) {
|
||||
setIntegerParam(function, value);
|
||||
setIntegerParam(P_Status, STATUS_COUNTING);
|
||||
status = (asynStatus)callParamCallbacks();
|
||||
} else {
|
||||
return asynError;
|
||||
}
|
||||
} else if (function == P_TimePreset) {
|
||||
if (!currentStatus) {
|
||||
setIntegerParam(function, value);
|
||||
setIntegerParam(P_Status, STATUS_COUNTING);
|
||||
status = (asynStatus)callParamCallbacks();
|
||||
} else {
|
||||
return asynError;
|
||||
}
|
||||
} else if (function == P_ClearElapsedTime) {
|
||||
if (!currentStatus) {
|
||||
setIntegerParam(P_ElapsedTime, 0);
|
||||
status = (asynStatus)callParamCallbacks();
|
||||
} else {
|
||||
return asynError;
|
||||
}
|
||||
} else if (isClearCount) {
|
||||
if (!currentStatus) {
|
||||
setInteger64Param(P_Counts[channelToClear], 0);
|
||||
status = (asynStatus)callParamCallbacks();
|
||||
} else {
|
||||
return asynError;
|
||||
}
|
||||
} else if (function == P_Reset) {
|
||||
lock();
|
||||
// TODO should probably set back everything to defaults
|
||||
setIntegerParam(P_Status, STATUS_IDLE);
|
||||
status = (asynStatus)callParamCallbacks();
|
||||
unlock();
|
||||
} else if (function == P_Stop) {
|
||||
lock();
|
||||
setIntegerParam(P_Status, STATUS_IDLE);
|
||||
status = (asynStatus)callParamCallbacks();
|
||||
unlock();
|
||||
} else if (function == P_MonitorChannel) {
|
||||
if (!currentStatus) {
|
||||
setIntegerParam(function, value);
|
||||
status = (asynStatus)callParamCallbacks();
|
||||
} else {
|
||||
return asynError;
|
||||
}
|
||||
} else {
|
||||
setIntegerParam(function, value);
|
||||
status = (asynStatus)callParamCallbacks();
|
||||
}
|
||||
|
||||
if (status)
|
||||
epicsSnprintf(pasynUser->errorMessage, pasynUser->errorMessageSize,
|
||||
"%s:%s: status=%d, function=%d, name=%s, value=%d",
|
||||
driverName, functionName, status, function, paramName,
|
||||
value);
|
||||
return status;
|
||||
}
|
||||
|
||||
void asynStreamGeneratorDriver::receiveUDP() {
|
||||
|
||||
const char *functionName = "receiveUDP";
|
||||
asynStatus status = asynSuccess;
|
||||
// int isConnected = 1;
|
||||
std::size_t received;
|
||||
int eomReason;
|
||||
|
||||
const std::size_t bufferSize = 1500;
|
||||
char buffer[bufferSize];
|
||||
|
||||
while (true) {
|
||||
|
||||
// status = pasynManager->isConnected(pasynUDPUser, &isConnected);
|
||||
|
||||
// if (!isConnected)
|
||||
// asynPrint(pasynUserSelf, ASYN_TRACE_ERROR,
|
||||
// "%s:%s: isConnected = %d\n", driverName, functionName,
|
||||
// isConnected);
|
||||
|
||||
status = pasynOctetSyncIO->read(pasynUDPUser, buffer, bufferSize,
|
||||
0, // timeout
|
||||
&received, &eomReason);
|
||||
|
||||
if (received) {
|
||||
const uint16_t bufferLength = ((uint16_t *)buffer)[0];
|
||||
const std::size_t headerLength = 42;
|
||||
|
||||
if (received >= headerLength && received == bufferLength * 2) {
|
||||
|
||||
epicsRingBytesPut(this->udpQueue, (char *)buffer, bufferSize);
|
||||
|
||||
} else {
|
||||
asynPrint(pasynUserSelf, ASYN_TRACE_ERROR,
|
||||
"%s:%s: invalid UDP packet\n", driverName,
|
||||
functionName);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void asynStreamGeneratorDriver::normaliseUDP() {
|
||||
|
||||
// TODO fix time overflows
|
||||
// Regarding time overflow.
|
||||
// * the header time stamp is 3 words, i.e. 48 bits.
|
||||
// * it has a resolution of 100ns
|
||||
// * so we can cover a maximum of (2^(3*16) - 1) * 1e-7 = 28147497 seconds
|
||||
// * or about 325 days
|
||||
// * so maybe this isn't necessary to solve, as long as we restart the
|
||||
// electronics at least once a year...
|
||||
|
||||
const char *functionName = "normaliseUDP";
|
||||
asynStatus status = asynSuccess;
|
||||
int isConnected = 1;
|
||||
std::size_t received;
|
||||
int eomReason;
|
||||
|
||||
// The correlation unit sends messages with a maximum size of 1500 bytes.
|
||||
// These messages don't have any obious start or end to synchronise
|
||||
// against...
|
||||
const std::size_t bufferSize = 1500;
|
||||
char buffer[bufferSize];
|
||||
|
||||
const std::size_t resultBufferSize = 243;
|
||||
NormalisedEvent resultBuffer[resultBufferSize];
|
||||
|
||||
// We have 10 mcpdids
|
||||
uint64_t lastBufferNumber[10];
|
||||
for (size_t i = 0; i < 10; ++i) {
|
||||
lastBufferNumber[i] = 0;
|
||||
}
|
||||
|
||||
epicsInt32 droppedMessages = 0;
|
||||
|
||||
const UDPHeader *header;
|
||||
const DetectorEvent *d_event;
|
||||
const MonitorEvent *m_event;
|
||||
NormalisedEvent ne;
|
||||
|
||||
while (true) {
|
||||
|
||||
if (epicsRingBytesUsedBytes(this->udpQueue) > 1500) {
|
||||
|
||||
epicsRingBytesGet(this->udpQueue, (char *)buffer, bufferSize);
|
||||
|
||||
header = (UDPHeader *)buffer;
|
||||
const std::size_t total_events = (header->BufferLength - 21) / 3;
|
||||
|
||||
if (header->BufferNumber - lastBufferNumber[header->McpdID] > 1 &&
|
||||
lastBufferNumber[header->McpdID] !=
|
||||
std::numeric_limits<
|
||||
decltype(header->BufferNumber)>::max()) {
|
||||
asynPrint(pasynUserSelf, ASYN_TRACE_ERROR,
|
||||
"%s:%s: missed packet on id: %d. Received: %" PRIu64
|
||||
", last: %" PRIu64 "\n",
|
||||
driverName, functionName, header->McpdID,
|
||||
header->BufferNumber,
|
||||
lastBufferNumber[header->McpdID]);
|
||||
setIntegerParam(P_UdpDropped, ++droppedMessages);
|
||||
}
|
||||
|
||||
lastBufferNumber[header->McpdID] = header->BufferNumber;
|
||||
|
||||
for (std::size_t i = 0; i < total_events; ++i) {
|
||||
char *event = (buffer + 21 * 2 + i * 6);
|
||||
const bool isMonitorEvent = event[5] & 0x80;
|
||||
|
||||
if (isMonitorEvent) {
|
||||
m_event = (MonitorEvent *)event;
|
||||
ne.timestamp =
|
||||
header->nanosecs() + (uint64_t)m_event->nanosecs();
|
||||
ne.source = 0;
|
||||
ne.pixelId = m_event->DataID;
|
||||
|
||||
} else {
|
||||
d_event = (DetectorEvent *)event;
|
||||
ne.timestamp =
|
||||
header->nanosecs() + (uint64_t)d_event->nanosecs();
|
||||
ne.source = header->McpdID;
|
||||
ne.pixelId = d_event->pixelId(header->McpdID);
|
||||
}
|
||||
|
||||
resultBuffer[i] = ne;
|
||||
}
|
||||
|
||||
epicsRingBytesPut(this->normalisedQueue, (char *)resultBuffer,
|
||||
total_events * sizeof(NormalisedEvent));
|
||||
|
||||
} else {
|
||||
epicsThreadSleep(0.0001); // seconds
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
struct {
|
||||
bool operator()(const NormalisedEvent l, const NormalisedEvent r) const {
|
||||
return l.timestamp < r.timestamp;
|
||||
}
|
||||
} oldestEventsFirst;
|
||||
|
||||
inline int eventsInQueue(epicsRingBytesId id) {
|
||||
return epicsRingBytesUsedBytes(id) / sizeof(NormalisedEvent);
|
||||
}
|
||||
|
||||
void asynStreamGeneratorDriver::partialSortEvents() {
|
||||
|
||||
const char *functionName = "partialSortEvents";
|
||||
|
||||
// x * number of ids * max events in packet
|
||||
int bufferedEvents = 5 * 10 * 243;
|
||||
NormalisedEvent events[bufferedEvents];
|
||||
|
||||
int queuedEvents = 0;
|
||||
epicsTimeStamp lastSort = epicsTime::getCurrent();
|
||||
epicsTimeStamp currentTime = lastSort;
|
||||
|
||||
while (true) {
|
||||
|
||||
queuedEvents =
|
||||
eventsInQueue(this->normalisedQueue); // in case we can't wait
|
||||
lastSort = epicsTime::getCurrent();
|
||||
currentTime = lastSort;
|
||||
|
||||
// wait for mininmum packet frequency or enough packets to ensure we
|
||||
// could potentially have at least 1 packet per mcpdid
|
||||
while (queuedEvents < bufferedEvents &&
|
||||
epicsTimeDiffInNS(¤tTime, &lastSort) < 250'000'000ull) {
|
||||
epicsThreadSleep(0.0001); // seconds
|
||||
currentTime = epicsTime::getCurrent();
|
||||
queuedEvents = eventsInQueue(this->normalisedQueue);
|
||||
}
|
||||
|
||||
queuedEvents = std::min(queuedEvents, bufferedEvents);
|
||||
|
||||
if (queuedEvents) {
|
||||
epicsRingBytesGet(this->normalisedQueue, (char *)events,
|
||||
queuedEvents * sizeof(NormalisedEvent));
|
||||
|
||||
std::sort(events, events + queuedEvents, oldestEventsFirst);
|
||||
|
||||
epicsRingBytesPut(this->sortedQueue, (char *)events,
|
||||
queuedEvents * sizeof(NormalisedEvent));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
inline void asynStreamGeneratorDriver::queueForKafka(NormalisedEvent &ne) {
|
||||
if (this->kafkaEnabled) {
|
||||
if (ne.source == 0)
|
||||
epicsRingBytesPut(this->monitorQueue, (char *)&ne,
|
||||
sizeof(NormalisedEvent));
|
||||
else
|
||||
epicsRingBytesPut(this->detectorQueue, (char *)&ne,
|
||||
sizeof(NormalisedEvent));
|
||||
}
|
||||
}
|
||||
|
||||
void asynStreamGeneratorDriver::processEvents() {
|
||||
|
||||
const char *functionName = "processEvents";
|
||||
|
||||
// x * number of ids * max events in packet * event size
|
||||
int bufferedEvents = 5 * 10 * 243;
|
||||
// we need a little extra space for merge sorting in
|
||||
int extraBufferedEvents = 1 * 10 * 243;
|
||||
|
||||
// we have two buffers. We alternate between reading data into one of them,
|
||||
// and then merge sorting into the other
|
||||
NormalisedEvent eventsABuffer[(bufferedEvents + extraBufferedEvents)];
|
||||
NormalisedEvent eventsBBuffer[(bufferedEvents + extraBufferedEvents)];
|
||||
|
||||
NormalisedEvent *eventsA = &eventsABuffer[0];
|
||||
NormalisedEvent *eventsB = &eventsBBuffer[0];
|
||||
NormalisedEvent *eventsBLastStart = eventsB + bufferedEvents;
|
||||
NormalisedEvent *eventsBLastEnd = eventsBLastStart;
|
||||
|
||||
int queuedEvents = 0;
|
||||
|
||||
epicsTimeStamp lastProcess = epicsTime::getCurrent();
|
||||
epicsTimeStamp currentTime = lastProcess;
|
||||
|
||||
epicsInt64 counts[this->num_channels];
|
||||
double elapsedSeconds = 0;
|
||||
uint64_t startTimestamp = std::numeric_limits<uint64_t>::max();
|
||||
uint64_t currTimestamp;
|
||||
|
||||
epicsInt32 currStatus = STATUS_IDLE;
|
||||
epicsInt32 prevStatus = STATUS_IDLE;
|
||||
epicsInt32 countPreset;
|
||||
epicsInt32 timePreset;
|
||||
epicsInt32 presetChannel;
|
||||
epicsInt32 udpQueueHighWaterMark = 0;
|
||||
epicsInt32 sortedQueueHighWaterMark = 0;
|
||||
|
||||
while (true) {
|
||||
|
||||
queuedEvents =
|
||||
eventsInQueue(this->sortedQueue); // in case we can't wait
|
||||
lastProcess = epicsTime::getCurrent();
|
||||
currentTime = lastProcess;
|
||||
|
||||
// wait for mininmum packet frequency or enough packets to ensure we
|
||||
// could potentially have at least 1 packet per mcpdid
|
||||
while (queuedEvents < bufferedEvents &&
|
||||
epicsTimeDiffInNS(¤tTime, &lastProcess) < 250'000'000ull) {
|
||||
epicsThreadSleep(0.0001); // seconds
|
||||
currentTime = epicsTime::getCurrent();
|
||||
queuedEvents = eventsInQueue(this->sortedQueue);
|
||||
}
|
||||
|
||||
getIntegerParam(this->P_Status, &currStatus);
|
||||
|
||||
queuedEvents = std::min(queuedEvents, bufferedEvents);
|
||||
|
||||
NormalisedEvent *newStartPtr = eventsA + extraBufferedEvents;
|
||||
|
||||
// We read into the array, such that we have enough space, that the
|
||||
// entirety of the leftover from the previous read can fit before this
|
||||
// new read, in the case that all new events are newer timewise, and
|
||||
// therefore, all events from eventsB have to be placed in a preceeding
|
||||
// position.
|
||||
epicsRingBytesGet(this->sortedQueue, (char *)newStartPtr,
|
||||
queuedEvents * sizeof(NormalisedEvent));
|
||||
|
||||
int toProcess =
|
||||
eventsBLastEnd - eventsBLastStart + queuedEvents * 4 / 5;
|
||||
|
||||
// TODO could also consider an in-place merge
|
||||
eventsBLastEnd = std::merge(newStartPtr, newStartPtr + queuedEvents,
|
||||
eventsBLastStart, eventsBLastEnd, eventsA,
|
||||
oldestEventsFirst);
|
||||
|
||||
eventsBLastStart = eventsA + toProcess;
|
||||
|
||||
// TODO I haven't really taken care of the case that there are no events
|
||||
|
||||
if (prevStatus == STATUS_IDLE && currStatus == STATUS_COUNTING) {
|
||||
|
||||
getIntegerParam(this->P_CountPreset, &countPreset);
|
||||
getIntegerParam(this->P_TimePreset, &timePreset);
|
||||
getIntegerParam(this->P_MonitorChannel, &presetChannel);
|
||||
|
||||
// reset status variables
|
||||
startTimestamp = eventsA[0].timestamp;
|
||||
elapsedSeconds = 0;
|
||||
for (size_t i = 0; i < this->num_channels; ++i) {
|
||||
counts[i] = 0;
|
||||
}
|
||||
}
|
||||
|
||||
if (currStatus == STATUS_COUNTING) {
|
||||
|
||||
// The elapsedSeconds are round differently depending on whether we
|
||||
// are using them for comparison, or for showing to the user, to
|
||||
// try and make sure the data we send to kafka is correct, while
|
||||
// the measurement time also appears intuitive.
|
||||
for (size_t i = 0; i < toProcess; ++i) {
|
||||
counts[eventsA[i].source == 0 ? eventsA[i].pixelId + 1 : 0] +=
|
||||
1;
|
||||
elapsedSeconds = (eventsA[i].timestamp - startTimestamp) / 1e9;
|
||||
|
||||
// TODO should really check there an no more events with the
|
||||
// same final timestamp
|
||||
if ((countPreset && counts[presetChannel] >= countPreset) ||
|
||||
(timePreset && elapsedSeconds > (double)timePreset))
|
||||
break;
|
||||
|
||||
// TODO also batchwise?
|
||||
this->queueForKafka(eventsA[i]);
|
||||
}
|
||||
|
||||
for (size_t i = 0; i < num_channels; ++i) {
|
||||
setInteger64Param(P_Counts[i], counts[i]);
|
||||
}
|
||||
setDoubleParam(P_ElapsedTime, elapsedSeconds);
|
||||
|
||||
if ((countPreset && counts[presetChannel] >= countPreset) ||
|
||||
(timePreset && elapsedSeconds > (double)timePreset)) {
|
||||
setIntegerParam(this->P_Status, STATUS_IDLE);
|
||||
setIntegerParam(this->P_CountPreset, 0);
|
||||
setIntegerParam(this->P_TimePreset, 0);
|
||||
}
|
||||
}
|
||||
|
||||
prevStatus = currStatus;
|
||||
|
||||
std::swap(eventsA, eventsB);
|
||||
}
|
||||
}
|
||||
|
||||
void asynStreamGeneratorDriver::produce(epicsRingBytesId eventQueue,
|
||||
rd_kafka_t *kafkaProducer,
|
||||
const char *topic, const char *source) {
|
||||
|
||||
flatbuffers::FlatBufferBuilder builder(1024);
|
||||
|
||||
const std::size_t bufferSize = this->kafkaMaxPacketSize + 16;
|
||||
|
||||
std::vector<uint32_t> tof;
|
||||
tof.reserve(bufferSize);
|
||||
|
||||
std::vector<uint32_t> did;
|
||||
did.reserve(bufferSize);
|
||||
|
||||
epicsTimeStamp last_sent = epicsTime::getCurrent();
|
||||
epicsTimeStamp now = last_sent;
|
||||
int total = 0;
|
||||
uint64_t message_id = 0;
|
||||
|
||||
NormalisedEvent ne;
|
||||
|
||||
while (true) {
|
||||
|
||||
if (!epicsRingBytesIsEmpty(eventQueue)) {
|
||||
|
||||
++total;
|
||||
epicsRingBytesGet(eventQueue, (char *)&ne, sizeof(NormalisedEvent));
|
||||
tof.push_back(ne.timestamp);
|
||||
did.push_back(ne.pixelId);
|
||||
|
||||
} else {
|
||||
epicsThreadSleep(0.001); // seconds
|
||||
}
|
||||
|
||||
now = epicsTime::getCurrent();
|
||||
|
||||
// At least every 0.2 seconds
|
||||
if (total >= this->kafkaMaxPacketSize ||
|
||||
epicsTimeDiffInNS(&now, &last_sent) > 250'000'000ll) {
|
||||
last_sent = epicsTime::getCurrent();
|
||||
|
||||
if (total) {
|
||||
total = 0;
|
||||
|
||||
builder.Clear();
|
||||
|
||||
auto message = CreateEventMessageDirect(
|
||||
builder, source, message_id++,
|
||||
((uint64_t)now.secPastEpoch) * 1'000'000'000ull +
|
||||
((uint64_t)now.nsec),
|
||||
&tof, &did);
|
||||
|
||||
builder.Finish(message, "ev42");
|
||||
|
||||
rd_kafka_resp_err_t err = rd_kafka_producev(
|
||||
kafkaProducer, RD_KAFKA_V_TOPIC(topic),
|
||||
RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY),
|
||||
// RD_KAFKA_V_KEY((void *)key, key_len),
|
||||
RD_KAFKA_V_VALUE((void *)builder.GetBufferPointer(),
|
||||
builder.GetSize()),
|
||||
// RD_KAFKA_V_OPAQUE(NULL),
|
||||
RD_KAFKA_V_END);
|
||||
|
||||
if (err) {
|
||||
epicsStdoutPrintf("Failed to produce to topic %s: %s\n",
|
||||
topic, rd_kafka_err2str(err));
|
||||
}
|
||||
|
||||
rd_kafka_poll(kafkaProducer, 0);
|
||||
|
||||
tof.clear();
|
||||
did.clear();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void asynStreamGeneratorDriver::produceMonitor() {
|
||||
this->produce(monitorQueue, monitorProducer, monitorTopic, "monitor");
|
||||
}
|
||||
|
||||
void asynStreamGeneratorDriver::produceDetector() {
|
||||
this->produce(detectorQueue, detectorProducer, detectorTopic, "detector");
|
||||
}
|
||||
|
||||
/*******************************************************************************
|
||||
* Methods exposed to IOC Shell
|
||||
*/
|
||||
extern "C" {
|
||||
|
||||
asynStatus asynStreamGeneratorDriverConfigure(
|
||||
const char *portName, const char *ipPortName, const int numChannels,
|
||||
const int udpQueueSize, const char *kafkaBroker, const char *monitorTopic,
|
||||
const char *detectorTopic, const int kafkaQueueSize,
|
||||
const int kafkaMaxPacketSize) {
|
||||
new asynStreamGeneratorDriver(portName, ipPortName, numChannels,
|
||||
udpQueueSize, kafkaBroker[0], kafkaBroker,
|
||||
monitorTopic, detectorTopic, kafkaQueueSize,
|
||||
kafkaMaxPacketSize);
|
||||
return asynSuccess;
|
||||
}
|
||||
|
||||
static const iocshArg initArg0 = {"portName", iocshArgString};
|
||||
static const iocshArg initArg1 = {"ipPortName", iocshArgString};
|
||||
static const iocshArg initArg2 = {"numChannels", iocshArgInt};
|
||||
static const iocshArg initArg3 = {"udpQueueSize", iocshArgInt};
|
||||
static const iocshArg initArg4 = {"kafkaBroker", iocshArgString};
|
||||
static const iocshArg initArg5 = {"monitorTopic", iocshArgString};
|
||||
static const iocshArg initArg6 = {"detectorTopic", iocshArgString};
|
||||
static const iocshArg initArg7 = {"kafkaQueueSize", iocshArgInt};
|
||||
static const iocshArg initArg8 = {"kafkaMaxPacketSize", iocshArgInt};
|
||||
static const iocshArg *const initArgs[] = {&initArg0, &initArg1, &initArg2,
|
||||
&initArg3, &initArg4, &initArg5,
|
||||
&initArg6, &initArg7, &initArg8};
|
||||
static const iocshFuncDef initFuncDef = {"asynStreamGenerator", 9, initArgs};
|
||||
static void initCallFunc(const iocshArgBuf *args) {
|
||||
asynStreamGeneratorDriverConfigure(
|
||||
args[0].sval, args[1].sval, args[2].ival, args[3].ival, args[4].sval,
|
||||
args[5].sval, args[6].sval, args[7].ival, args[8].ival);
|
||||
}
|
||||
|
||||
void asynStreamGeneratorDriverRegister(void) {
|
||||
iocshRegister(&initFuncDef, initCallFunc);
|
||||
}
|
||||
|
||||
epicsExportRegistrar(asynStreamGeneratorDriverRegister);
|
||||
}
|
||||
1
src/asynStreamGeneratorDriver.dbd
Normal file
1
src/asynStreamGeneratorDriver.dbd
Normal file
@@ -0,0 +1 @@
|
||||
registrar("asynStreamGeneratorDriverRegister")
|
||||
187
src/asynStreamGeneratorDriver.h
Normal file
187
src/asynStreamGeneratorDriver.h
Normal file
@@ -0,0 +1,187 @@
|
||||
#ifndef asynStreamGeneratorDriver_H
|
||||
#define asynStreamGeneratorDriver_H
|
||||
|
||||
#include "asynPortDriver.h"
|
||||
#include <epicsRingBytes.h>
|
||||
#include <librdkafka/rdkafka.h>
|
||||
|
||||
/*******************************************************************************
|
||||
* UDP Packet Definitions
|
||||
*/
|
||||
struct __attribute__((__packed__)) UDPHeader {
|
||||
uint16_t BufferLength;
|
||||
uint16_t BufferType;
|
||||
uint16_t HeaderLength;
|
||||
uint16_t BufferNumber;
|
||||
uint16_t RunCmdID;
|
||||
uint16_t Status : 8;
|
||||
uint16_t McpdID : 8;
|
||||
uint16_t TimeStamp[3];
|
||||
uint16_t Parameter0[3];
|
||||
uint16_t Parameter1[3];
|
||||
uint16_t Parameter2[3];
|
||||
uint16_t Parameter3[3];
|
||||
|
||||
inline uint64_t nanosecs() const {
|
||||
uint64_t nsec{((uint64_t)TimeStamp[2]) << 32 |
|
||||
((uint64_t)TimeStamp[1]) << 16 | (uint64_t)TimeStamp[0]};
|
||||
return nsec * 100;
|
||||
}
|
||||
};
|
||||
|
||||
struct __attribute__((__packed__)) DetectorEvent {
|
||||
uint64_t TimeStamp : 19;
|
||||
uint16_t XPosition : 10;
|
||||
uint16_t YPosition : 10;
|
||||
uint16_t Amplitude : 8;
|
||||
uint16_t Id : 1;
|
||||
inline uint32_t nanosecs() const { return TimeStamp * 100; }
|
||||
inline uint64_t pixelId(uint32_t mpcdId) const {
|
||||
const uint32_t x_pixels = 128;
|
||||
const uint32_t y_pixels = 128;
|
||||
return (mpcdId - 1) * x_pixels * y_pixels +
|
||||
x_pixels * (uint32_t)this->XPosition + (uint32_t)this->YPosition;
|
||||
}
|
||||
};
|
||||
|
||||
struct __attribute__((__packed__)) MonitorEvent {
|
||||
uint64_t TimeStamp : 19;
|
||||
uint64_t Data : 21;
|
||||
uint64_t DataID : 4;
|
||||
uint64_t TriggerID : 3;
|
||||
uint64_t Id : 1;
|
||||
inline uint32_t nanosecs() const { return TimeStamp * 100; }
|
||||
};
|
||||
|
||||
/*******************************************************************************
|
||||
* Simplified Event Struct Definition
|
||||
*/
|
||||
|
||||
struct __attribute__((__packed__)) NormalisedEvent {
|
||||
uint64_t timestamp;
|
||||
uint32_t pixelId : 24;
|
||||
uint8_t source;
|
||||
|
||||
// inline NormalisedEvent(uint64_t timestamp, uint8_t source, uint32_t
|
||||
// pixelId)
|
||||
// : timestamp(timestamp), source(source), pixelId(pixelId){};
|
||||
};
|
||||
|
||||
/*******************************************************************************
|
||||
* Status values that should match the definition in db/daq_common.db
|
||||
*/
|
||||
#define STATUS_IDLE 0
|
||||
#define STATUS_COUNTING 1
|
||||
#define STATUS_LOWRATE 2
|
||||
#define STATUS_PAUSED 3
|
||||
|
||||
/*******************************************************************************
|
||||
* Parameters for use in DB records
|
||||
*
|
||||
* i.e.e drvInfo strings that are used to identify the parameters
|
||||
*/
|
||||
|
||||
#define P_StatusString "STATUS"
|
||||
#define P_ResetString "RESET"
|
||||
#define P_StopString "STOP"
|
||||
#define P_CountPresetString "P_CNT"
|
||||
#define P_TimePresetString "P_TIME"
|
||||
#define P_ElapsedTimeString "TIME"
|
||||
#define P_ClearElapsedTimeString "C_TIME"
|
||||
#define P_MonitorChannelString "MONITOR"
|
||||
#define P_ThresholdString "THRESH"
|
||||
#define P_ThresholdChannelString "THRESH_CH"
|
||||
|
||||
#define P_CountsString "COUNTS%d"
|
||||
#define P_RateString "RATE%d"
|
||||
#define P_ClearCountsString "C_%d"
|
||||
|
||||
#define P_UdpDroppedString "DROP"
|
||||
#define P_UdpQueueHighWaterMarkString "UDP"
|
||||
#define P_SortedQueueHighWaterMarkString "SORT"
|
||||
|
||||
/*******************************************************************************
|
||||
* Stream Generator Coordinating Class
|
||||
*/
|
||||
class asynStreamGeneratorDriver : public asynPortDriver {
|
||||
public:
|
||||
asynStreamGeneratorDriver(const char *portName, const char *ipPortName,
|
||||
const int numChannels, const int udpQueueSize,
|
||||
const bool enableKafkaStream,
|
||||
const char *kafkaBroker, const char *monitorTopic,
|
||||
const char *detectorTopic,
|
||||
const int kafkaQueueSize,
|
||||
const int kafkaMaxPacketSize);
|
||||
virtual ~asynStreamGeneratorDriver();
|
||||
|
||||
virtual asynStatus readInt32(asynUser *pasynUser, epicsInt32 *value);
|
||||
virtual asynStatus writeInt32(asynUser *pasynUser, epicsInt32 value);
|
||||
|
||||
void receiveUDP();
|
||||
void normaliseUDP();
|
||||
void partialSortEvents();
|
||||
void processEvents();
|
||||
void produceMonitor();
|
||||
void produceDetector();
|
||||
|
||||
protected:
|
||||
// Parameter Identifying IDs
|
||||
int P_Status;
|
||||
int P_Reset;
|
||||
int P_Stop;
|
||||
int P_CountPreset;
|
||||
int P_TimePreset;
|
||||
int P_ElapsedTime;
|
||||
int P_ClearElapsedTime;
|
||||
int P_MonitorChannel;
|
||||
int P_Threshold;
|
||||
int P_ThresholdChannel;
|
||||
int *P_Counts;
|
||||
int *P_Rates;
|
||||
int *P_ClearCounts;
|
||||
|
||||
// System Status Parameter Identifying IDs
|
||||
int P_UdpDropped;
|
||||
int P_UdpQueueHighWaterMark;
|
||||
int P_SortedQueueHighWaterMark;
|
||||
|
||||
private:
|
||||
asynUser *pasynUDPUser;
|
||||
epicsEventId pausedEventId;
|
||||
|
||||
const int num_channels;
|
||||
const bool kafkaEnabled;
|
||||
const int kafkaQueueSize;
|
||||
const int kafkaMaxPacketSize;
|
||||
|
||||
const int udpQueueSize;
|
||||
epicsRingBytesId udpQueue;
|
||||
epicsRingBytesId normalisedQueue;
|
||||
epicsRingBytesId sortedQueue;
|
||||
|
||||
epicsRingBytesId monitorQueue;
|
||||
rd_kafka_t *monitorProducer;
|
||||
const char *monitorTopic;
|
||||
|
||||
epicsRingBytesId detectorQueue;
|
||||
rd_kafka_t *detectorProducer;
|
||||
const char *detectorTopic;
|
||||
|
||||
constexpr static char *driverName = "StreamGenerator";
|
||||
|
||||
asynStatus createInt32Param(asynStatus status, char *name, int *variable,
|
||||
epicsInt32 initialValue = 0);
|
||||
|
||||
asynStatus createInt64Param(asynStatus status, char *name, int *variable,
|
||||
epicsInt64 initialValue = 0);
|
||||
|
||||
asynStatus createFloat64Param(asynStatus status, char *name, int *variable,
|
||||
double initialValue = 0);
|
||||
|
||||
inline void queueForKafka(NormalisedEvent &ne);
|
||||
|
||||
void produce(epicsRingBytesId eventQueue, rd_kafka_t *kafkaProducer,
|
||||
const char *topic, const char *source);
|
||||
};
|
||||
|
||||
#endif
|
||||
342
udp_rate.py
342
udp_rate.py
@@ -1,342 +0,0 @@
|
||||
import queue
|
||||
import socket
|
||||
import time
|
||||
import threading
|
||||
from uuid import uuid4
|
||||
import math
|
||||
|
||||
from confluent_kafka import Producer
|
||||
import streaming_data_types
|
||||
|
||||
# receiving directly (can also specify correlation unit ip)
|
||||
UDP_IP = ""
|
||||
UDP_PORT = 54321
|
||||
|
||||
# If redirecting traffic via
|
||||
# socat -U - udp4-recv:54321 | tee >( socat -u - udp4-datagram:127.0.0.1:54322 ) | socat -u - udp4-datagram:127.0.0.1:54323
|
||||
# UDP_IP = "127.0.0.1"
|
||||
# UDP_PORT = 54323
|
||||
|
||||
WINDOWSECONDS = 5
|
||||
WINDOWSIZE = 20000 * WINDOWSECONDS
|
||||
MONITORS = 4 # We have max 4 monitors
|
||||
|
||||
time_offset = None # Estimate of clock offset
|
||||
|
||||
time_window = {
|
||||
i: queue.Queue(maxsize=WINDOWSIZE)
|
||||
for i in range(MONITORS)
|
||||
}
|
||||
|
||||
# event_time_window = queue.Queue(maxsize=50000 * WINDOWSECONDS)
|
||||
EVENT_WINDOWSIZE = 50000
|
||||
EVENT_WINDOW_PTR = 0
|
||||
event_time_window = [0 for i in range(EVENT_WINDOWSIZE)]
|
||||
|
||||
event_average_rate = 0
|
||||
event_last_timestamp = None
|
||||
|
||||
MISSED_PACKETS = -9 # All modules appear to miss the first time due to initialisation as 0
|
||||
|
||||
# missed_packets_time_window = queue.Queue(maxsize=100)
|
||||
|
||||
def print_monitor_rates():
|
||||
while True:
|
||||
for i in range(MONITORS):
|
||||
msg = f"Monitor {i+1}: {time_window[i].qsize() / WINDOWSECONDS} cts/s"
|
||||
try:
|
||||
earliest = time_window[i].queue[0]
|
||||
newest = max(time_window[i].queue)
|
||||
t = time.time()
|
||||
msg += f', buffer range: {round((newest - earliest) * 1e-7, 3)} s, oldest: {round(time.time() - ((time_offset + earliest) * 1e-7), 3)} s, newest: {round(time.time() - ((time_offset + newest) * 1e-7), 3)} s'
|
||||
except:
|
||||
pass
|
||||
|
||||
print(msg)
|
||||
|
||||
# try:
|
||||
# print(f'Events: {1 / event_average_rate} cts/s')
|
||||
# except:
|
||||
# pass
|
||||
|
||||
try:
|
||||
print(f'Events: {round(1 / (sum(event_time_window) / EVENT_WINDOWSIZE * 1e-7), 2)} cts/s')
|
||||
except:
|
||||
pass
|
||||
|
||||
print(f'Missed Packets: {MISSED_PACKETS}')
|
||||
|
||||
# Detector Events
|
||||
# msg = f"Events : {event_time_window.qsize() / WINDOWSECONDS} cts/s"
|
||||
# try:
|
||||
# earliest = event_time_window.queue[0]
|
||||
# newest = max(event_time_window.queue)
|
||||
# t = time.time()
|
||||
# msg += f', buffer range: {round((newest - earliest) * 1e-7, 3)} s, oldest: {round(time.time() - ((time_offset + earliest) * 1e-7), 3)} s, newest: {round(time.time() - ((time_offset + newest) * 1e-7), 3)} s'
|
||||
# except:
|
||||
# pass
|
||||
|
||||
# print(msg)
|
||||
|
||||
time.sleep(1)
|
||||
|
||||
threading.Thread(target=print_monitor_rates, daemon=True).start()
|
||||
|
||||
def clean_monitor_rates():
|
||||
latest = 0
|
||||
while True:
|
||||
for d_id in range(MONITORS):
|
||||
t_w = time_window[d_id]
|
||||
if not t_w.empty():
|
||||
# TODO probably should switch to a priority queue
|
||||
# as the messages might not be in order
|
||||
# TODO could also just replace with a low-pass filter
|
||||
# would be a lot more efficient
|
||||
# TODO the way this is done, we need trigger events
|
||||
# in order for the signal to decay back to 0.
|
||||
# If no events come, the rate remains stuck
|
||||
latest = max(latest, max(t_w.queue))
|
||||
# latest = time_window[1].queue[-1]
|
||||
try:
|
||||
while t_w.queue[0] < (latest - WINDOWSECONDS * 1e7):
|
||||
t_w.get_nowait()
|
||||
except IndexError:
|
||||
pass
|
||||
time.sleep(0.01)
|
||||
|
||||
threading.Thread(target=clean_monitor_rates, daemon=True).start()
|
||||
|
||||
|
||||
# def clean_event_rates():
|
||||
# latest = 0
|
||||
# while True:
|
||||
# t_w = event_time_window
|
||||
# if not t_w.empty():
|
||||
# # TODO probably should switch to a priority queue
|
||||
# # as the messages might not be in order
|
||||
# # TODO could also just replace with a low-pass filter
|
||||
# # would be a lot more efficient
|
||||
# # TODO the way this is done, we need trigger events
|
||||
# # in order for the signal to decay back to 0.
|
||||
# # If no events come, the rate remains stuck
|
||||
# #latest = max(latest, max(t_w.queue))
|
||||
# try:
|
||||
# latest = time_window[1].queue[-1]
|
||||
# while t_w.queue[0] < (latest - WINDOWSECONDS * 1e7):
|
||||
# t_w.get_nowait()
|
||||
# except IndexError:
|
||||
# pass
|
||||
# time.sleep(0.005)
|
||||
#
|
||||
# threading.Thread(target=clean_event_rates, daemon=True).start()
|
||||
|
||||
|
||||
|
||||
|
||||
# Event Kafka Producer
|
||||
|
||||
event_queue = queue.Queue()
|
||||
|
||||
def event_producer():
|
||||
producer_config = {
|
||||
'bootstrap.servers': "linkafka01:9092",
|
||||
'queue.buffering.max.messages': 1e7,
|
||||
}
|
||||
prod = Producer(producer_config)
|
||||
|
||||
st = time.time()
|
||||
|
||||
msg_id = 0
|
||||
|
||||
b_size = 10000
|
||||
b_ptr = 0
|
||||
pixel_buffer = [0 for _ in range(b_size)]
|
||||
time_buffer = [0 for _ in range(b_size)]
|
||||
poll_cnt = 0
|
||||
|
||||
while True:
|
||||
(p_id, timestamp) = event_queue.get()
|
||||
|
||||
pixel_buffer[b_ptr] = p_id
|
||||
time_buffer[b_ptr] = timestamp
|
||||
b_ptr += 1
|
||||
|
||||
nt = time.time()
|
||||
if b_ptr == b_size or nt - st > 0.001:
|
||||
st = nt
|
||||
|
||||
if b_ptr > 0:
|
||||
message = streaming_data_types.serialise_ev42(
|
||||
message_id = msg_id,
|
||||
pulse_time = time_buffer[0] * 100, # int(time.time() * 1_000_000_000),
|
||||
time_of_flight = time_buffer[0:b_ptr],
|
||||
detector_id = pixel_buffer[0:b_ptr],
|
||||
source_name = '',
|
||||
)
|
||||
|
||||
msg_id = (msg_id + 1) % 100000000
|
||||
b_ptr = 0
|
||||
|
||||
prod.produce(
|
||||
topic = "DMC_detector",
|
||||
value = message,
|
||||
partition = 0,
|
||||
)
|
||||
|
||||
# if poll_cnt % 1000 == 0:
|
||||
prod.poll(0)
|
||||
poll_cnt = (poll_cnt + 1) % 1000
|
||||
|
||||
threading.Thread(target=event_producer, daemon=True).start()
|
||||
|
||||
# Monitor Kafka Producer
|
||||
|
||||
monitor_queue = queue.Queue()
|
||||
|
||||
def monitor_producer():
|
||||
producer_config = {
|
||||
'bootstrap.servers': "linkafka01:9092",
|
||||
'queue.buffering.max.messages': 1e7,
|
||||
}
|
||||
prod = Producer(producer_config)
|
||||
|
||||
monitor_buffer = [0 for i in range(MONITORS)]
|
||||
monitor_time = [0 for i in range(MONITORS)]
|
||||
|
||||
st = time.time()
|
||||
|
||||
poll_cnt = 0
|
||||
|
||||
while True:
|
||||
(d_id, timestamp) = monitor_queue.get()
|
||||
|
||||
monitor_buffer[d_id] += 1
|
||||
monitor_time[d_id] = timestamp
|
||||
|
||||
nt = time.time()
|
||||
if nt - st > 0.05:
|
||||
st = nt
|
||||
|
||||
for i in range(MONITORS):
|
||||
if monitor_buffer[d_id]:
|
||||
message = streaming_data_types.serialise_f142(
|
||||
source_name = f"monitor{d_id+1}",
|
||||
value = monitor_buffer[d_id],
|
||||
# ns resolution (supposed to be past epoch, not what the detector returns though)
|
||||
timestamp_unix_ns = monitor_time[d_id] * 100 # send time of last monitor
|
||||
)
|
||||
|
||||
prod.produce(
|
||||
topic = "DMC_neutron_monitor",
|
||||
value = message,
|
||||
partition = 0,
|
||||
)
|
||||
|
||||
monitor_buffer[d_id] = 0
|
||||
|
||||
if poll_cnt % 1000 == 0:
|
||||
prod.poll(0)
|
||||
poll_cnt = (poll_cnt + 1) % 1000
|
||||
|
||||
threading.Thread(target=monitor_producer, daemon=True).start()
|
||||
|
||||
|
||||
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
||||
sock.bind((UDP_IP, UDP_PORT))
|
||||
|
||||
val = 0
|
||||
start_time = time.time()
|
||||
|
||||
module_counts = [0 for i in range(10)]
|
||||
|
||||
EVENTS = 0
|
||||
|
||||
while True:
|
||||
data, addr = sock.recvfrom(2056) # Buffer size is 1024 bytes
|
||||
raw_header = data[:42]
|
||||
raw_data = data[42:]
|
||||
|
||||
(buffer_length, buffer_type, header_length,
|
||||
buffer_number, run_id, mcpd_status,
|
||||
t_low, t_mid, t_high, *_) = memoryview(raw_header).cast('H')
|
||||
mcpd_id = ( mcpd_status >> 8 ) & 0xff
|
||||
mcpd_status = ( mcpd_status ) & 0x3
|
||||
running_msg = "running" if (mcpd_status & 0x1) else "stopped"
|
||||
sync_msg = "in sync" if (mcpd_status & 0x2) else "sync error"
|
||||
timestamp = ( t_high << 32 ) | ( t_mid << 16 ) | t_low # 100 ns resolution
|
||||
#print(f'Packet {int(timestamp * 1e-7)}s => buffer: {buffer_number}, length: {int(buffer_length*2/6)} events, status: {mcpd_status} {mcpd_id} {running_msg} with {sync_msg}')
|
||||
# print(f'Packet => buffer: {mcpd_id}-{buffer_number}, length: {int((buffer_length-21)/3)} events, status: {mcpd_status}')
|
||||
|
||||
if time_offset is None:
|
||||
time_offset = time.time() * 1e7 - timestamp
|
||||
|
||||
if buffer_number - module_counts[mcpd_id] != 1:
|
||||
MISSED_PACKETS += 1
|
||||
# if missed_packets_time_window.full():
|
||||
# missed_packets_time_window.get_nowait()
|
||||
# missed_packets_time_window.put(timestamp)
|
||||
|
||||
module_counts[mcpd_id] = buffer_number
|
||||
|
||||
for i in range(0, len(raw_data), 6):
|
||||
event = memoryview(raw_data)[i:i+6]
|
||||
event_type = event[5] >> 7
|
||||
# print(event_type)
|
||||
|
||||
if event_type: # Trigger Event
|
||||
t_id = ( event[5] >> 4 ) & 0x7
|
||||
d_id = event[5] & 0xf
|
||||
event_timestamp = timestamp + ( ( event[2] << 16 ) & 0x7 ) | ( event[1] << 8 ) | event[0]
|
||||
# print(f'Trigger event {event_timestamp * 1e-7}s => TrigID: {t_id}, DataID: {d_id}')
|
||||
|
||||
t_w = time_window[d_id]
|
||||
t_w.put_nowait(event_timestamp)
|
||||
|
||||
monitor_queue.put_nowait((d_id, event_timestamp))
|
||||
|
||||
else: # Neutron Event
|
||||
x_pixels = 128
|
||||
y_pixels = 128
|
||||
amplitude = ( event[5] << 1 ) | ( event[4] >> 7 )
|
||||
|
||||
# The DMC StreamHistogrammer setup currently expects each module to
|
||||
# be 128 * 128 pixels but the resolution in the packages is
|
||||
# actually 10bit. We remove the lowest 3 bits.
|
||||
x = (( (event[3] & 0x1f) << 5 | (event[2] & 0xf8) >> 3 ) & 0x3ff) >> 3
|
||||
y = (( (event[4] & 0x7f) << 3 | (event[3] & 0xe0) >> 5 ) & 0x3ff) >> 3
|
||||
event_timestamp = timestamp + ( ( event[2] << 16 ) & 0x7 ) | ( event[1] << 8 ) | event[0]
|
||||
# print(f'Neutron event {event_timestamp * 1e-7}s: {amplitude}, x: {x}, y: {y}')
|
||||
|
||||
|
||||
if event_last_timestamp is None:
|
||||
event_last_timestamp = event_timestamp
|
||||
|
||||
# Seems like at higher frequencies these come very much out of order
|
||||
# so this is very approximate
|
||||
event_time_window[EVENT_WINDOW_PTR] = event_timestamp - event_last_timestamp
|
||||
EVENT_WINDOW_PTR = (EVENT_WINDOW_PTR + 1) % EVENT_WINDOWSIZE
|
||||
event_last_timestamp = event_timestamp
|
||||
|
||||
# I suppose this doesn't work mostly due to the timestamps ordering...
|
||||
# event_timestamp_seconds = event_timestamp * 1e-7
|
||||
# if event_last_timestamp is None:
|
||||
# event_last_timestamp = event_timestamp_seconds
|
||||
|
||||
# f_cutoff = 1e6 # Hz
|
||||
# tau = 1 / ( 2 * math.pi * f_cutoff)
|
||||
# dt = event_timestamp_seconds - event_last_timestamp
|
||||
# if dt > 0:
|
||||
# w = math.exp(-dt / tau)
|
||||
# event_average_rate = w * dt + event_average_rate * (1 - w)
|
||||
# event_last_timestamp = event_timestamp_seconds
|
||||
|
||||
# EVENTS += 1
|
||||
# a = (mcpd_id - 1) * x_pixels * y_pixels + x_pixels * x + y
|
||||
# print((EVENTS, x, y, a, a < 128 * 128 * 9, mcpd_id))
|
||||
# if not a < 128 * 128 * 9:
|
||||
# print((event[3], event[3] << 5, event[2], event[2] >> 3))
|
||||
|
||||
event_queue.put_nowait((
|
||||
(mcpd_id - 1) * x_pixels * y_pixels + x_pixels * x + y,
|
||||
event_timestamp
|
||||
))
|
||||
Reference in New Issue
Block a user