Compare commits
58 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| aa00966599 | |||
| 8ae9cb0bd8 | |||
| 31aa9d246a | |||
| f169076f65 | |||
| fdbb8f5061 | |||
| 668dd65823 | |||
| 5b65a01e51 | |||
| 594bb6d320 | |||
| 7865273707 | |||
| 2ede400791 | |||
| a13c5b81e2 | |||
| 66792837a6 | |||
| c563b07fed | |||
| dc5244bc43 | |||
| 2f60ac2a24 | |||
| d7a4d057aa | |||
| 0819c5fb12 | |||
| a7c5f9413b | |||
| 205eedbd88 | |||
| d80155ef7d | |||
| ba3c3b5208 | |||
| 5ffd784769 | |||
| 9bfaabdd99 | |||
| 9d93238db4 | |||
| c530de3566 | |||
| ba07a8af9b | |||
| 77ed74a203 | |||
| 8f8b78a9bf | |||
| 6faf23601e | |||
| 18da14f6d6 | |||
| 9d5ed11dac | |||
| 318357127e | |||
| 2f50a21e83 | |||
| e53a2a4f40 | |||
| 5f95e82a3c | |||
| 2ccf37ce33 | |||
| 617dd3153b | |||
| e5cb019143 | |||
| 70c04af034 | |||
| 056b0a5f8a | |||
| 1ce7f93e95 | |||
| ecc6e98f4c | |||
| 2c47f338c2 | |||
| 60aa1652c3 | |||
| 81bd3bef7f | |||
| e65725609c | |||
| a336ca74c9 | |||
| 7bacc716cc | |||
| 1e853487aa | |||
| b9e5f40c21 | |||
| d7bf3977fc | |||
| 750436732c | |||
| 4c1741bd4b | |||
| 09ba30025a | |||
| 2d065a0db9 | |||
| 2d5a43c09a | |||
| c2ca5f699c | |||
| b810509156 |
245
.clang-format
Normal file
245
.clang-format
Normal file
@@ -0,0 +1,245 @@
|
||||
---
|
||||
Language: Cpp
|
||||
# BasedOnStyle: LLVM
|
||||
AccessModifierOffset: -2
|
||||
AlignAfterOpenBracket: Align
|
||||
AlignArrayOfStructures: None
|
||||
AlignConsecutiveAssignments:
|
||||
Enabled: false
|
||||
AcrossEmptyLines: false
|
||||
AcrossComments: false
|
||||
AlignCompound: false
|
||||
AlignFunctionPointers: false
|
||||
PadOperators: true
|
||||
AlignConsecutiveBitFields:
|
||||
Enabled: false
|
||||
AcrossEmptyLines: false
|
||||
AcrossComments: false
|
||||
AlignCompound: false
|
||||
AlignFunctionPointers: false
|
||||
PadOperators: false
|
||||
AlignConsecutiveDeclarations:
|
||||
Enabled: false
|
||||
AcrossEmptyLines: false
|
||||
AcrossComments: false
|
||||
AlignCompound: false
|
||||
AlignFunctionPointers: false
|
||||
PadOperators: false
|
||||
AlignConsecutiveMacros:
|
||||
Enabled: false
|
||||
AcrossEmptyLines: false
|
||||
AcrossComments: false
|
||||
AlignCompound: false
|
||||
AlignFunctionPointers: false
|
||||
PadOperators: false
|
||||
AlignConsecutiveShortCaseStatements:
|
||||
Enabled: false
|
||||
AcrossEmptyLines: false
|
||||
AcrossComments: false
|
||||
AlignCaseColons: false
|
||||
AlignEscapedNewlines: Right
|
||||
AlignOperands: Align
|
||||
AlignTrailingComments:
|
||||
Kind: Always
|
||||
OverEmptyLines: 0
|
||||
AllowAllArgumentsOnNextLine: true
|
||||
AllowAllParametersOfDeclarationOnNextLine: true
|
||||
AllowBreakBeforeNoexceptSpecifier: Never
|
||||
AllowShortBlocksOnASingleLine: Never
|
||||
AllowShortCaseLabelsOnASingleLine: false
|
||||
AllowShortCompoundRequirementOnASingleLine: true
|
||||
AllowShortEnumsOnASingleLine: true
|
||||
AllowShortFunctionsOnASingleLine: All
|
||||
AllowShortIfStatementsOnASingleLine: Never
|
||||
AllowShortLambdasOnASingleLine: All
|
||||
AllowShortLoopsOnASingleLine: false
|
||||
AlwaysBreakAfterDefinitionReturnType: None
|
||||
AlwaysBreakAfterReturnType: None
|
||||
AlwaysBreakBeforeMultilineStrings: false
|
||||
AlwaysBreakTemplateDeclarations: MultiLine
|
||||
AttributeMacros:
|
||||
- __capability
|
||||
BinPackArguments: true
|
||||
BinPackParameters: true
|
||||
BitFieldColonSpacing: Both
|
||||
BraceWrapping:
|
||||
AfterCaseLabel: false
|
||||
AfterClass: false
|
||||
AfterControlStatement: Never
|
||||
AfterEnum: false
|
||||
AfterExternBlock: false
|
||||
AfterFunction: false
|
||||
AfterNamespace: false
|
||||
AfterObjCDeclaration: false
|
||||
AfterStruct: false
|
||||
AfterUnion: false
|
||||
BeforeCatch: false
|
||||
BeforeElse: false
|
||||
BeforeLambdaBody: false
|
||||
BeforeWhile: false
|
||||
IndentBraces: false
|
||||
SplitEmptyFunction: true
|
||||
SplitEmptyRecord: true
|
||||
SplitEmptyNamespace: true
|
||||
BreakAdjacentStringLiterals: true
|
||||
BreakAfterAttributes: Leave
|
||||
BreakAfterJavaFieldAnnotations: false
|
||||
BreakArrays: true
|
||||
BreakBeforeBinaryOperators: None
|
||||
BreakBeforeConceptDeclarations: Always
|
||||
BreakBeforeBraces: Attach
|
||||
BreakBeforeInlineASMColon: OnlyMultiline
|
||||
BreakBeforeTernaryOperators: true
|
||||
BreakConstructorInitializers: BeforeColon
|
||||
BreakInheritanceList: BeforeColon
|
||||
BreakStringLiterals: true
|
||||
ColumnLimit: 80
|
||||
CommentPragmas: '^ IWYU pragma:'
|
||||
CompactNamespaces: false
|
||||
ConstructorInitializerIndentWidth: 4
|
||||
ContinuationIndentWidth: 4
|
||||
Cpp11BracedListStyle: true
|
||||
DerivePointerAlignment: false
|
||||
DisableFormat: false
|
||||
EmptyLineAfterAccessModifier: Never
|
||||
EmptyLineBeforeAccessModifier: LogicalBlock
|
||||
ExperimentalAutoDetectBinPacking: false
|
||||
FixNamespaceComments: true
|
||||
ForEachMacros:
|
||||
- foreach
|
||||
- Q_FOREACH
|
||||
- BOOST_FOREACH
|
||||
IfMacros:
|
||||
- KJ_IF_MAYBE
|
||||
IncludeBlocks: Preserve
|
||||
IncludeCategories:
|
||||
- Regex: '^"(llvm|llvm-c|clang|clang-c)/'
|
||||
Priority: 2
|
||||
SortPriority: 0
|
||||
CaseSensitive: false
|
||||
- Regex: '^(<|"(gtest|gmock|isl|json)/)'
|
||||
Priority: 3
|
||||
SortPriority: 0
|
||||
CaseSensitive: false
|
||||
- Regex: '.*'
|
||||
Priority: 1
|
||||
SortPriority: 0
|
||||
CaseSensitive: false
|
||||
IncludeIsMainRegex: '(Test)?$'
|
||||
IncludeIsMainSourceRegex: ''
|
||||
IndentAccessModifiers: false
|
||||
IndentCaseBlocks: false
|
||||
IndentCaseLabels: false
|
||||
IndentExternBlock: AfterExternBlock
|
||||
IndentGotoLabels: true
|
||||
IndentPPDirectives: None
|
||||
IndentRequiresClause: true
|
||||
IndentWidth: 4
|
||||
IndentWrappedFunctionNames: false
|
||||
InsertBraces: false
|
||||
InsertNewlineAtEOF: false
|
||||
InsertTrailingCommas: None
|
||||
IntegerLiteralSeparator:
|
||||
Binary: 0
|
||||
BinaryMinDigits: 0
|
||||
Decimal: 0
|
||||
DecimalMinDigits: 0
|
||||
Hex: 0
|
||||
HexMinDigits: 0
|
||||
JavaScriptQuotes: Leave
|
||||
JavaScriptWrapImports: true
|
||||
KeepEmptyLinesAtTheStartOfBlocks: true
|
||||
KeepEmptyLinesAtEOF: false
|
||||
LambdaBodyIndentation: Signature
|
||||
LineEnding: DeriveLF
|
||||
MacroBlockBegin: ''
|
||||
MacroBlockEnd: ''
|
||||
MaxEmptyLinesToKeep: 1
|
||||
NamespaceIndentation: None
|
||||
ObjCBinPackProtocolList: Auto
|
||||
ObjCBlockIndentWidth: 2
|
||||
ObjCBreakBeforeNestedBlockParam: true
|
||||
ObjCSpaceAfterProperty: false
|
||||
ObjCSpaceBeforeProtocolList: true
|
||||
PackConstructorInitializers: BinPack
|
||||
PenaltyBreakAssignment: 2
|
||||
PenaltyBreakBeforeFirstCallParameter: 19
|
||||
PenaltyBreakComment: 300
|
||||
PenaltyBreakFirstLessLess: 120
|
||||
PenaltyBreakOpenParenthesis: 0
|
||||
PenaltyBreakScopeResolution: 500
|
||||
PenaltyBreakString: 1000
|
||||
PenaltyBreakTemplateDeclaration: 10
|
||||
PenaltyExcessCharacter: 1000000
|
||||
PenaltyIndentedWhitespace: 0
|
||||
PenaltyReturnTypeOnItsOwnLine: 60
|
||||
PointerAlignment: Right
|
||||
PPIndentWidth: -1
|
||||
QualifierAlignment: Leave
|
||||
ReferenceAlignment: Pointer
|
||||
ReflowComments: true
|
||||
RemoveBracesLLVM: false
|
||||
RemoveParentheses: Leave
|
||||
RemoveSemicolon: false
|
||||
RequiresClausePosition: OwnLine
|
||||
RequiresExpressionIndentation: OuterScope
|
||||
SeparateDefinitionBlocks: Leave
|
||||
ShortNamespaceLines: 1
|
||||
SkipMacroDefinitionBody: false
|
||||
SortIncludes: CaseSensitive
|
||||
SortJavaStaticImport: Before
|
||||
SortUsingDeclarations: LexicographicNumeric
|
||||
SpaceAfterCStyleCast: false
|
||||
SpaceAfterLogicalNot: false
|
||||
SpaceAfterTemplateKeyword: true
|
||||
SpaceAroundPointerQualifiers: Default
|
||||
SpaceBeforeAssignmentOperators: true
|
||||
SpaceBeforeCaseColon: false
|
||||
SpaceBeforeCpp11BracedList: false
|
||||
SpaceBeforeCtorInitializerColon: true
|
||||
SpaceBeforeInheritanceColon: true
|
||||
SpaceBeforeJsonColon: false
|
||||
SpaceBeforeParens: ControlStatements
|
||||
SpaceBeforeParensOptions:
|
||||
AfterControlStatements: true
|
||||
AfterForeachMacros: true
|
||||
AfterFunctionDefinitionName: false
|
||||
AfterFunctionDeclarationName: false
|
||||
AfterIfMacros: true
|
||||
AfterOverloadedOperator: false
|
||||
AfterPlacementOperator: true
|
||||
AfterRequiresInClause: false
|
||||
AfterRequiresInExpression: false
|
||||
BeforeNonEmptyParentheses: false
|
||||
SpaceBeforeRangeBasedForLoopColon: true
|
||||
SpaceBeforeSquareBrackets: false
|
||||
SpaceInEmptyBlock: false
|
||||
SpacesBeforeTrailingComments: 1
|
||||
SpacesInAngles: Never
|
||||
SpacesInContainerLiterals: true
|
||||
SpacesInLineCommentPrefix:
|
||||
Minimum: 1
|
||||
Maximum: -1
|
||||
SpacesInParens: Never
|
||||
SpacesInParensOptions:
|
||||
InCStyleCasts: false
|
||||
InConditionalStatements: false
|
||||
InEmptyParentheses: false
|
||||
Other: false
|
||||
SpacesInSquareBrackets: false
|
||||
Standard: Latest
|
||||
StatementAttributeLikeMacros:
|
||||
- Q_EMIT
|
||||
StatementMacros:
|
||||
- Q_UNUSED
|
||||
- QT_REQUIRE_VERSION
|
||||
TabWidth: 8
|
||||
UseTab: Never
|
||||
VerilogBreakBetweenInstancePorts: true
|
||||
WhitespaceSensitiveMacros:
|
||||
- BOOST_PP_STRINGIZE
|
||||
- CF_SWIFT_NAME
|
||||
- NS_SWIFT_NAME
|
||||
- PP_STRINGIZE
|
||||
- STRINGIZE
|
||||
...
|
||||
35
.gitea/workflows/action.yaml
Normal file
35
.gitea/workflows/action.yaml
Normal file
@@ -0,0 +1,35 @@
|
||||
name: Test And Build
|
||||
on: [push]
|
||||
|
||||
jobs:
|
||||
Lint:
|
||||
runs-on: linepics
|
||||
steps:
|
||||
- name: checkout repo
|
||||
uses: actions/checkout@v4
|
||||
- name: formatting
|
||||
run: clang-format --style=file --Werror --dry-run src/*.cpp
|
||||
- name: cppcheck
|
||||
run: cppcheck --std=c++17 --addon=misc --error-exitcode=1 src/*.cpp
|
||||
Build:
|
||||
runs-on: linepics
|
||||
steps:
|
||||
- name: checkout repo
|
||||
uses: actions/checkout@v4
|
||||
with:
|
||||
submodules: 'true'
|
||||
- name: install dependencies
|
||||
run: |
|
||||
dnf install -y librdkafka-devel
|
||||
- name: prepare flatbuffers
|
||||
run: |
|
||||
pushd dep/flatbuffers
|
||||
cmake -G "Unix Makefiles"
|
||||
make -j
|
||||
popd
|
||||
./dep/flatbuffers/flatc -o schemas/ --cpp --gen-mutable --gen-name-strings --scoped-enums ./dep/streaming-data-types/schemas/*
|
||||
- name: build module
|
||||
run: |
|
||||
sed -i 's/ARCH_FILTER=.*/ARCH_FILTER=linux%/' Makefile
|
||||
echo -e "\nIGNORE_SUBMODULES += streaming-data-types flatbuffers" >> Makefile
|
||||
make install
|
||||
3
.gitignore
vendored
Normal file
3
.gitignore
vendored
Normal file
@@ -0,0 +1,3 @@
|
||||
O.*
|
||||
.*ignore
|
||||
schemas/
|
||||
6
.gitmodules
vendored
Normal file
6
.gitmodules
vendored
Normal file
@@ -0,0 +1,6 @@
|
||||
[submodule "dep/streaming-data-types"]
|
||||
path = dep/streaming-data-types
|
||||
url = git@gitea.psi.ch:lin-controls/streaming-data-types.git
|
||||
[submodule "dep/flatbuffers"]
|
||||
path = dep/flatbuffers
|
||||
url = git@gitea.psi.ch:lin-controls/flatbuffers.git
|
||||
29
Makefile
Normal file
29
Makefile
Normal file
@@ -0,0 +1,29 @@
|
||||
# Include the external Makefile
|
||||
include /ioc/tools/driver.makefile
|
||||
|
||||
MODULE=StreamGenerator
|
||||
BUILDCLASSES=Linux
|
||||
EPICS_VERSIONS=7.0.7
|
||||
ARCH_FILTER=RHEL%
|
||||
|
||||
# Additional module dependencies
|
||||
REQUIRED+=asyn
|
||||
|
||||
DBDS += src/asynStreamGeneratorDriver.dbd
|
||||
|
||||
# DB files to include in the release
|
||||
TEMPLATES += db/channels.db db/daq_common.db db/correlation_unit.db
|
||||
|
||||
# HEADERS += src/asynStreamGeneratorDriver.h
|
||||
|
||||
# Source files to build
|
||||
SOURCES += src/asynStreamGeneratorDriver.cpp
|
||||
|
||||
# I don't think specifying the optimisation level like this is correct...
|
||||
# but it doesn't hurt :D
|
||||
USR_CFLAGS += -O3 -Wall -Wextra -Wunused-result -Werror -fvisibility=hidden # -Wpedantic // Does not work because EPICS macros trigger warnings
|
||||
|
||||
# Required to support EV42/44
|
||||
USR_CXXFLAGS += -std=c++17 -O3 -I../dep/flatbuffers/include/ -I../schemas
|
||||
|
||||
LIB_SYS_LIBS += rdkafka
|
||||
41
README.md
Normal file
41
README.md
Normal file
@@ -0,0 +1,41 @@
|
||||
# StreamGenerator
|
||||
|
||||
Clone the repository to a local directory via:
|
||||
|
||||
```
|
||||
git clone --recurse-submodules -j8 git@gitea.psi.ch:lin-epics-modules/StreamGenerator.git
|
||||
```
|
||||
|
||||
## Dependencies
|
||||
|
||||
Currently, this project requires a system install of librdkafka. On Redhat,
|
||||
this means you should run:
|
||||
|
||||
```bash
|
||||
dnf install -y librdkafka-devel
|
||||
```
|
||||
|
||||
Additionally, you must first build Google's *flatbuffers* and ESS's
|
||||
**streaming-data-types** libraries, which are both included in this project as
|
||||
submodules under the `dep` directory and which are both necessary to build this
|
||||
project.
|
||||
|
||||
First, you should enter the *flatbuffers* directory and run the following:
|
||||
|
||||
```bash
|
||||
cmake -G "Unix Makefiles"
|
||||
make -j
|
||||
```
|
||||
|
||||
After these steps, you will find the program `flatc` has been built and placed
|
||||
in the directory.
|
||||
|
||||
Next, you should return to the top of this project's directory tree, and create
|
||||
the flatbuffers from ESS's schema files. This you can do as follows:
|
||||
|
||||
```bash
|
||||
./dep/flatbuffers/flatc -o schemas/ --cpp --gen-mutable --gen-name-strings --scoped-enums ./dep/streaming-data-types/schemas/*
|
||||
```
|
||||
|
||||
This generates header files from each of ESS's schemas and places them in a
|
||||
schemas directory.
|
||||
76
db/channels.db
Normal file
76
db/channels.db
Normal file
@@ -0,0 +1,76 @@
|
||||
# EPICS Database for streamdevice specific to measurement channels
|
||||
#
|
||||
# Macros
|
||||
# INSTR - Prefix
|
||||
# NAME - the device name, e.g. EL737
|
||||
# PORT - StreamGenerator Port
|
||||
# CHANNEL - the number associated with the measurement channel
|
||||
|
||||
################################################################################
|
||||
# Status Variables
|
||||
|
||||
# Trigger a change in status as clearing
|
||||
record(bo, "$(INSTR)$(NAME):T$(CHANNEL)")
|
||||
{
|
||||
field(DESC, "Trigger Clearing Status")
|
||||
field(VAL, 1)
|
||||
field(OUT, "$(INSTR)$(NAME):S$(CHANNEL) PP")
|
||||
}
|
||||
|
||||
# Trigger a change in status as value returned to 0
|
||||
record(seq, "$(INSTR)$(NAME):O$(CHANNEL)")
|
||||
{
|
||||
field(DESC, "Trigger Returned to 0 Status")
|
||||
field(LNK0, "$(INSTR)$(NAME):S$(CHANNEL) PP")
|
||||
field(DO0, 0)
|
||||
field(SELM, "Specified")
|
||||
field(SELL, "$(INSTR)$(NAME):M$(CHANNEL).VAL")
|
||||
field(SCAN, ".1 second")
|
||||
}
|
||||
|
||||
# Current Status of Channel, i.e. is it ready to count?
|
||||
record(bi, "$(INSTR)$(NAME):S$(CHANNEL)")
|
||||
{
|
||||
field(DESC, "Channel Status")
|
||||
field(VAL, 0)
|
||||
field(ZNAM, "OK")
|
||||
field(ONAM, "CLEARING")
|
||||
field(PINI, 1)
|
||||
}
|
||||
|
||||
################################################################################
|
||||
# Count Commands
|
||||
|
||||
record(longout, "$(INSTR)$(NAME):C$(CHANNEL)")
|
||||
{
|
||||
field(DESC, "Clear the current channel count")
|
||||
field(DTYP, "asynInt32")
|
||||
field(OUT, "@asyn($(PORT),0,$(TIMEOUT=1)) C_$(CHANNEL)")
|
||||
field(FLNK, "$(INSTR)$(NAME):T$(CHANNEL)")
|
||||
}
|
||||
|
||||
################################################################################
|
||||
# Read all monitors values
|
||||
|
||||
record(int64in, "$(INSTR)$(NAME):M$(CHANNEL)")
|
||||
{
|
||||
field(DESC, "DAQ CH$(CHANNEL)")
|
||||
field(EGU, "cts")
|
||||
field(DTYP, "asynInt64")
|
||||
field(INP, "@asyn($(PORT),0,$(TIMEOUT=1)) COUNTS$(CHANNEL)")
|
||||
# This is probably too fast. We could trigger things the same as sinqDAQ to ensure the db is updated in the same order
|
||||
# field(SCAN, "I/O Intr")
|
||||
field(PINI, "YES")
|
||||
}
|
||||
|
||||
record(ai, "$(INSTR)$(NAME):R$(CHANNEL)")
|
||||
{
|
||||
field(DESC, "Rate of DAQ CH$(CHANNEL)")
|
||||
field(EGU, "cts/sec")
|
||||
field(DTYP, "asynFloat64")
|
||||
field(INP, "@asyn($(PORT),0,$(TIMEOUT=1)) RATE$(CHANNEL)")
|
||||
field(SCAN, ".5 second")
|
||||
field(PREC, 2)
|
||||
# field(SCAN, "I/O Intr")
|
||||
field(PINI, "YES")
|
||||
}
|
||||
73
db/correlation_unit.db
Normal file
73
db/correlation_unit.db
Normal file
@@ -0,0 +1,73 @@
|
||||
# EPICS Database for streamdevice specific to measurement channels
|
||||
#
|
||||
# Macros
|
||||
# INSTR - Prefix
|
||||
# NAME - the device name, e.g. EL737
|
||||
# PORT - StreamGenerator Port
|
||||
|
||||
################################################################################
|
||||
# Status Variables
|
||||
|
||||
record(bo, "$(INSTR)$(NAME):Enable")
|
||||
{
|
||||
field(DESC, "Electronics Status")
|
||||
field(DTYP, "asynInt32")
|
||||
field(OUT, "@asyn($(PORT),0,$(TIMEOUT=1)) EN_EL")
|
||||
field(ZNAM, "OFF")
|
||||
field(ONAM, "ON")
|
||||
field(PINI, 1)
|
||||
}
|
||||
|
||||
record(bi, "$(INSTR)$(NAME):Enable_RBV")
|
||||
{
|
||||
field(DESC, "Electronics Status")
|
||||
field(DTYP, "asynInt32")
|
||||
field(INP, "@asyn($(PORT),0,$(TIMEOUT=1)) EN_EL_RBV")
|
||||
field(ZNAM, "OFF")
|
||||
field(ONAM, "ON")
|
||||
field(SCAN, ".5 second")
|
||||
}
|
||||
|
||||
record(longin,"$(INSTR)$(NAME):UDP_DROPPED")
|
||||
{
|
||||
field(DESC, "UDP Packets Missed")
|
||||
field(EGU, "Events")
|
||||
field(DTYP, "asynInt32")
|
||||
field(INP, "@asyn($(PORT),0,$(TIMEOUT=1)) DROP")
|
||||
# field(SCAN, "I/O Intr")
|
||||
field(SCAN, "1 second")
|
||||
field(PINI, "YES")
|
||||
}
|
||||
|
||||
record(longin,"$(INSTR)$(NAME):UDP_WATERMARK")
|
||||
{
|
||||
field(DESC, "UDP Queue Usage")
|
||||
field(EGU, "%")
|
||||
field(DTYP, "asynInt32")
|
||||
field(INP, "@asyn($(PORT),0,$(TIMEOUT=1)) UDP")
|
||||
# field(SCAN, "I/O Intr")
|
||||
field(SCAN, "1 second")
|
||||
field(PINI, "YES")
|
||||
}
|
||||
|
||||
record(longin,"$(INSTR)$(NAME):NORMALISED_WATERMARK")
|
||||
{
|
||||
field(DESC, "Normalised Queue Usage")
|
||||
field(EGU, "%")
|
||||
field(DTYP, "asynInt32")
|
||||
field(INP, "@asyn($(PORT),0,$(TIMEOUT=1)) NORM")
|
||||
# field(SCAN, "I/O Intr")
|
||||
field(SCAN, "1 second")
|
||||
field(PINI, "YES")
|
||||
}
|
||||
|
||||
record(longin,"$(INSTR)$(NAME):SORTED_WATERMARK")
|
||||
{
|
||||
field(DESC, "Sort Queue Usage")
|
||||
field(EGU, "%")
|
||||
field(DTYP, "asynInt32")
|
||||
field(INP, "@asyn($(PORT),0,$(TIMEOUT=1)) SORT")
|
||||
# field(SCAN, "I/O Intr")
|
||||
field(SCAN, "1 second")
|
||||
field(PINI, "YES")
|
||||
}
|
||||
238
db/daq_common.db
Normal file
238
db/daq_common.db
Normal file
@@ -0,0 +1,238 @@
|
||||
# EPICS Database for streamdevice specific to measurement channels
|
||||
#
|
||||
# Macros
|
||||
# INSTR - Prefix
|
||||
# NAME - the device name, e.g. EL737
|
||||
# PORT - StreamGenerator Port
|
||||
|
||||
record(longout, "$(INSTR)$(NAME):FULL-RESET")
|
||||
{
|
||||
field(DESC, "Reset the DAQ")
|
||||
field(DTYP, "asynInt32")
|
||||
field(OUT, "@asyn($(PORT),0,$(TIMEOUT=1)) RESET")
|
||||
}
|
||||
|
||||
################################################################################
|
||||
# Status Variables
|
||||
|
||||
record(stringin, "$(INSTR)$(NAME):MsgTxt")
|
||||
{
|
||||
field(DESC, "Unexpected received response")
|
||||
}
|
||||
|
||||
# We separate the RAW-STATUS and the STATUS PV so that the state can be updated
|
||||
# in a sequence, that guarantees that we included the most recent time and
|
||||
# counts before the status switches back to Idle.
|
||||
# We do this via a sequenced update
|
||||
#
|
||||
# RAW-STATUS -> ELAPSED-SECONDS -> M* -> STATUS
|
||||
record(mbbi, "$(INSTR)$(NAME):RAW-STATUS")
|
||||
{
|
||||
field(DESC, "DAQ Status")
|
||||
field(DTYP, "asynInt32")
|
||||
field(INP, "@asyn($(PORT),0,$(TIMEOUT=1)) STATUS")
|
||||
field(ZRVL, "0")
|
||||
field(ZRST, "Idle")
|
||||
field(ONVL, "1")
|
||||
field(ONST, "Counting")
|
||||
field(TWVL, "2")
|
||||
field(TWST, "Low rate")
|
||||
field(THVL, "3")
|
||||
field(THST, "Paused")
|
||||
# 4 should never happen, if it does it means the DAQ reports undocumented status bits
|
||||
field(FRVL, "4")
|
||||
field(FRST, "INVALID")
|
||||
# This is probably too fast. We could trigger things the same as sinqDAQ to ensure the db is updated in the same order
|
||||
# field(SCAN, "I/O Intr")
|
||||
field(SCAN, ".1 second")
|
||||
field(FLNK, "$(INSTR)$(NAME):READALL")
|
||||
field(PINI, "YES")
|
||||
}
|
||||
|
||||
record(fanout, "$(INSTR)$(NAME):READALL")
|
||||
{
|
||||
field(SELM, "All")
|
||||
field(LNK0, "$(INSTR)$(NAME):ELAPSED-TIME")
|
||||
field(LNK1, "$(INSTR)$(NAME):M1")
|
||||
field(LNK2, "$(INSTR)$(NAME):M2")
|
||||
field(LNK3, "$(INSTR)$(NAME):M3")
|
||||
field(LNK4, "$(INSTR)$(NAME):M4")
|
||||
field(LNK5, "$(INSTR)$(NAME):M5")
|
||||
# Doesn't seem to be a problem to have more in here :D
|
||||
# field(LNK6, "$(INSTR)$(NAME):M5")
|
||||
# field(LNK7, "$(INSTR)$(NAME):M6")
|
||||
field(FLNK, "$(INSTR)$(NAME):STATUS")
|
||||
}
|
||||
|
||||
record(mbbi, "$(INSTR)$(NAME):STATUS")
|
||||
{
|
||||
field(INP, "$(INSTR)$(NAME):RAW-STATUS NPP")
|
||||
field(DESC, "DAQ Status")
|
||||
field(ZRVL, "0")
|
||||
field(ZRST, "Idle")
|
||||
field(ONVL, "1")
|
||||
field(ONST, "Counting")
|
||||
field(TWVL, "2")
|
||||
field(TWST, "Low rate")
|
||||
field(THVL, "3")
|
||||
field(THST, "Paused")
|
||||
# 4 should never happen, if it does it means the DAQ reports undocumented status bits
|
||||
field(FRVL, "4")
|
||||
field(FRST, "INVALID")
|
||||
field(PINI, "YES")
|
||||
}
|
||||
|
||||
record(longin, "$(INSTR)$(NAME):CHANNELS")
|
||||
{
|
||||
field(DESC, "Total Supported Channels")
|
||||
field(VAL, $(CHANNELS))
|
||||
field(DISP, 1)
|
||||
}
|
||||
|
||||
# Trigger a change in status as clearing
|
||||
record(bo, "$(INSTR)$(NAME):ETT")
|
||||
{
|
||||
field(DESC, "Trigger Clearing Status")
|
||||
field(VAL, 1)
|
||||
field(OUT, "$(INSTR)$(NAME):ETS PP")
|
||||
}
|
||||
|
||||
# Trigger a change in status as value returned to 0
|
||||
record(seq, "$(INSTR)$(NAME):ETO")
|
||||
{
|
||||
field(DESC, "Trigger Returned to 0 Status")
|
||||
field(LNK0, "$(INSTR)$(NAME):ETS PP")
|
||||
field(DO0, 0)
|
||||
field(SELM, "Specified")
|
||||
field(SELL, "$(INSTR)$(NAME):ELAPSED-TIME.VAL")
|
||||
field(SCAN, ".1 second")
|
||||
}
|
||||
|
||||
# Current Status of Channel, i.e. is it ready to count?
|
||||
record(bi, "$(INSTR)$(NAME):ETS")
|
||||
{
|
||||
field(DESC, "Channel Status")
|
||||
field(VAL, 0)
|
||||
field(ZNAM, "OK")
|
||||
field(ONAM, "CLEARING")
|
||||
field(PINI, 1)
|
||||
}
|
||||
|
||||
################################################################################
|
||||
# Count Commands
|
||||
|
||||
record(longout,"$(INSTR)$(NAME):PRESET-COUNT")
|
||||
{
|
||||
field(DESC, "Count until preset reached")
|
||||
field(DTYP, "asynInt32")
|
||||
field(OUT, "@asyn($(PORT),0,$(TIMEOUT=1)) P_CNT")
|
||||
field(VAL, 0)
|
||||
}
|
||||
|
||||
record(longout,"$(INSTR)$(NAME):PRESET-TIME")
|
||||
{
|
||||
field(DESC, "Count for specified time")
|
||||
field(EGU, "seconds")
|
||||
field(DTYP, "asynInt32")
|
||||
field(OUT, "@asyn($(PORT),0,$(TIMEOUT=1)) P_TIME")
|
||||
field(VAL, 0)
|
||||
}
|
||||
|
||||
record(bo,"$(INSTR)$(NAME):PAUSE")
|
||||
{
|
||||
field(DESC, "Pause the current count")
|
||||
field(VAL, "0")
|
||||
}
|
||||
|
||||
record(bo,"$(INSTR)$(NAME):CONTINUE")
|
||||
{
|
||||
field(DESC, "Continue with a count that was paused")
|
||||
field(VAL, "0")
|
||||
}
|
||||
|
||||
record(longout, "$(INSTR)$(NAME):STOP")
|
||||
{
|
||||
field(DESC, "Stop the current counting operation")
|
||||
field(DTYP, "asynInt32")
|
||||
field(OUT, "@asyn($(PORT),0,$(TIMEOUT=1)) STOP")
|
||||
}
|
||||
|
||||
record(longout, "$(INSTR)$(NAME):MONITOR-CHANNEL")
|
||||
{
|
||||
field(DESC, "PRESET-COUNT Monitors this channel")
|
||||
field(DTYP, "asynInt32")
|
||||
field(OUT, "@asyn($(PORT),0,$(TIMEOUT=1)) MONITOR")
|
||||
field(DRVL, "1") # Smallest Monitor Channel
|
||||
field(DRVH, "$(CHANNELS)") # Largest Monitor Channel
|
||||
}
|
||||
|
||||
record(longin, "$(INSTR)$(NAME):MONITOR-CHANNEL_RBV")
|
||||
{
|
||||
field(DESC, "PRESET-COUNT Monitors this channel")
|
||||
field(DTYP, "asynInt32")
|
||||
field(INP, "@asyn($(PORT),0,$(TIMEOUT=1)) MONITOR")
|
||||
field(SCAN, ".2 second")
|
||||
field(PINI, "YES")
|
||||
}
|
||||
|
||||
record(ao,"$(INSTR)$(NAME):THRESHOLD")
|
||||
{
|
||||
field(DESC, "Minimum rate for counting to proceed")
|
||||
field(DTYP, "asynInt32")
|
||||
field(OUT, "@asyn($(PORT),0,$(TIMEOUT=1)) THRESH")
|
||||
field(VAL, "1") # Default Rate
|
||||
field(DRVL, "1") # Minimum Rate
|
||||
field(DRVH, "100000") # Maximum Rate
|
||||
}
|
||||
|
||||
record(ai,"$(INSTR)$(NAME):THRESHOLD_RBV")
|
||||
{
|
||||
field(DESC, "Minimum rate for counting to proceed")
|
||||
field(EGU, "cts/sec")
|
||||
field(DTYP, "asynInt32")
|
||||
field(INP, "@asyn($(PORT),0,$(TIMEOUT=1)) THRESH")
|
||||
field(SCAN, "I/O Intr")
|
||||
field(PINI, "YES")
|
||||
}
|
||||
|
||||
record(longout,"$(INSTR)$(NAME):THRESHOLD-MONITOR")
|
||||
{
|
||||
field(DESC, "Channel monitored for minimum rate")
|
||||
field(DTYP, "asynInt32")
|
||||
field(OUT, "@asyn($(PORT),0,$(TIMEOUT=1)) THRESH_CH")
|
||||
field(VAL, "1") # Monitor
|
||||
field(DRVL, "0") # Smallest Threshold Channel (0 is off)
|
||||
field(DRVH, "$(CHANNELS)") # Largest Threshold Channel
|
||||
}
|
||||
|
||||
record(longin,"$(INSTR)$(NAME):THRESHOLD-MONITOR_RBV")
|
||||
{
|
||||
field(DESC, "Channel monitored for minimum rate")
|
||||
field(EGU, "CH")
|
||||
field(DTYP, "asynInt32")
|
||||
field(INP, "@asyn($(PORT),0,$(TIMEOUT=1)) THRESH_CH")
|
||||
field(SCAN, "I/O Intr")
|
||||
field(PINI, "YES")
|
||||
}
|
||||
|
||||
record(longout, "$(INSTR)$(NAME):CT")
|
||||
{
|
||||
field(DESC, "Clear the timer")
|
||||
field(DTYP, "asynInt32")
|
||||
field(OUT, "@asyn($(PORT),0,$(TIMEOUT=1)) C_TIME")
|
||||
field(FLNK, "$(INSTR)$(NAME):ETT")
|
||||
}
|
||||
|
||||
################################################################################
|
||||
# Read all monitors values
|
||||
|
||||
record(ai, "$(INSTR)$(NAME):ELAPSED-TIME")
|
||||
{
|
||||
field(DESC, "DAQ Measured Time")
|
||||
field(EGU, "sec")
|
||||
field(DTYP, "asynFloat64")
|
||||
field(INP, "@asyn($(PORT),0,$(TIMEOUT=1)) TIME")
|
||||
# field(SCAN, "I/O Intr")
|
||||
field(PINI, "YES")
|
||||
# field(FLNK, "$(INSTR)$(NAME):ETO")
|
||||
}
|
||||
1
dep/flatbuffers
Submodule
1
dep/flatbuffers
Submodule
Submodule dep/flatbuffers added at 1872409707
1
dep/streaming-data-types
Submodule
1
dep/streaming-data-types
Submodule
Submodule dep/streaming-data-types added at 3b1830faf2
@@ -1,4 +0,0 @@
|
||||
confluent-kafka==2.12.1
|
||||
ess-streaming-data-types==0.27.0
|
||||
flatbuffers==25.9.23
|
||||
numpy==1.26.3
|
||||
9
scripts/ioc.sh
Executable file
9
scripts/ioc.sh
Executable file
@@ -0,0 +1,9 @@
|
||||
#!/bin/bash
|
||||
|
||||
export EPICS_HOST_ARCH=linux-x86_64
|
||||
export EPICS_BASE=/usr/local/epics/base-7.0.7
|
||||
|
||||
PARENT_PATH="$( cd "$(dirname "${BASH_SOURCE[0]}")" ; pwd -P )"
|
||||
|
||||
# /usr/local/bin/procServ -o -L - -f -i ^D^C 20001 "${PARENT_PATH}/st.cmd" -d
|
||||
${PARENT_PATH}/st.cmd
|
||||
40
scripts/st.cmd
Executable file
40
scripts/st.cmd
Executable file
@@ -0,0 +1,40 @@
|
||||
#!/usr/local/bin/iocsh
|
||||
#-d
|
||||
|
||||
on error break
|
||||
|
||||
require StreamGenerator, test
|
||||
|
||||
epicsEnvSet("INSTR", "SQ:DMC-DAQ:")
|
||||
epicsEnvSet("NAME", "SG")
|
||||
|
||||
# Local UDP Generator Test Config
|
||||
drvAsynIPPortConfigure("ASYN_IP_PORT", "127.0.0.1:9071:54321 UDP", 0, 0, 1)
|
||||
|
||||
# Correlation Unit Config
|
||||
# drvAsynIPPortConfigure("ASYN_IP_PORT", "172.28.69.20:54321:54321 UDP", 0, 0, 1)
|
||||
|
||||
# With a udpQueue and sortQueue size of 10'000 packets, we can hold in memory
|
||||
# 10'000 * 243 = 2.43e6 events
|
||||
|
||||
# Kafka Broker and Topic Configuration
|
||||
# asynStreamGenerator("ASYN_SG", "ASYN_IP_PORT", 4, 10000, "linkafka01:9092", "NEWEFU_TEST", "NEWEFU_TEST2", 10000, 20480)
|
||||
# asynStreamGenerator("ASYN_SG", "ASYN_IP_PORT", 4, 10000, "ess01:9092", "NEWEFU_TEST", "NEWEFU_TEST2", 10000, 20480)
|
||||
|
||||
# Don't send any kafka messages
|
||||
asynStreamGenerator("ASYN_SG", "ASYN_IP_PORT", 4, 10000, "", "", "", 0, 0)
|
||||
|
||||
dbLoadRecords("$(StreamGenerator_DB)correlation_unit.db", "INSTR=$(INSTR), NAME=$(NAME), PORT=ASYN_SG")
|
||||
|
||||
dbLoadRecords("$(StreamGenerator_DB)daq_common.db", "INSTR=$(INSTR), NAME=$(NAME), PORT=ASYN_SG, CHANNELS=5")
|
||||
|
||||
# Monitor Channels
|
||||
dbLoadRecords("$(StreamGenerator_DB)channels.db", "INSTR=$(INSTR), NAME=$(NAME), PORT=ASYN_SG, CHANNEL=1")
|
||||
dbLoadRecords("$(StreamGenerator_DB)channels.db", "INSTR=$(INSTR), NAME=$(NAME), PORT=ASYN_SG, CHANNEL=2")
|
||||
dbLoadRecords("$(StreamGenerator_DB)channels.db", "INSTR=$(INSTR), NAME=$(NAME), PORT=ASYN_SG, CHANNEL=3")
|
||||
dbLoadRecords("$(StreamGenerator_DB)channels.db", "INSTR=$(INSTR), NAME=$(NAME), PORT=ASYN_SG, CHANNEL=4")
|
||||
|
||||
# Detector Count Channel
|
||||
dbLoadRecords("$(StreamGenerator_DB)channels.db", "INSTR=$(INSTR), NAME=$(NAME), PORT=ASYN_SG, CHANNEL=5")
|
||||
|
||||
iocInit()
|
||||
120
scripts/udp_gen.py
Normal file
120
scripts/udp_gen.py
Normal file
@@ -0,0 +1,120 @@
|
||||
import socket
|
||||
import time
|
||||
import random
|
||||
|
||||
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
||||
|
||||
header = [
|
||||
0, 0, # buffer length in 16bit words (1, 0) == 1, (0, 1) == 256
|
||||
0, 0x80, # buffer type (probably should be 0)
|
||||
21, 0, # header length
|
||||
0, 0, # buffer number
|
||||
0, 0, # run id
|
||||
0x3, # status
|
||||
0, # id of sending module
|
||||
0, 0, # timestamp low
|
||||
0, 0, # timestamp mid
|
||||
0, 0, # timestamp high
|
||||
] + [0, 0] * 12 # parameters
|
||||
|
||||
data = [
|
||||
0,
|
||||
0,
|
||||
0,
|
||||
0,
|
||||
0,
|
||||
0
|
||||
]
|
||||
|
||||
start_time = time.time_ns() // 100
|
||||
|
||||
buffer_ids = {
|
||||
i: (0, 0) for i in range(10)
|
||||
}
|
||||
|
||||
while True:
|
||||
|
||||
# update timestamp
|
||||
base_timestamp = time.time_ns() // 100 - start_time
|
||||
t_low = base_timestamp & 0xffff
|
||||
t_mid = (base_timestamp >> 16) & 0xffff
|
||||
t_high = (base_timestamp >> 32) & 0xffff
|
||||
header[12] = t_low & 0xff
|
||||
header[13] = t_low >> 8
|
||||
header[14] = t_mid & 0xff
|
||||
header[15] = t_mid >> 8
|
||||
header[16] = t_high & 0xff
|
||||
header[17] = t_high >> 8
|
||||
|
||||
num_events = random.randint(0, 243)
|
||||
# num_events = 243
|
||||
# num_events = 1
|
||||
|
||||
# update buffer length
|
||||
buffer_length = 21 + num_events * 3
|
||||
header[0] = buffer_length & 0xff
|
||||
header[1] = (buffer_length >> 8) & 0xff
|
||||
|
||||
# I believe, that in our case we never mix monitor and detector events as
|
||||
# the monitors should have id 0 and the detector events 1-9 so I have
|
||||
# excluded that posibility here. That would, however, if true mean we could
|
||||
# reduce also the number of checks on the parsing side of things...
|
||||
|
||||
is_monitor = random.randint(0, 9)
|
||||
# is_monitor = 4
|
||||
|
||||
header[11] = 0 if is_monitor > 3 else random.randint(1,9)
|
||||
|
||||
# update buffer number (each mcpdid has its own buffer number count)
|
||||
header[6], header[7] = buffer_ids[header[11]]
|
||||
header[6] = (header[6] + 1) % (0xff + 1)
|
||||
header[7] = (header[7] + (header[6] == 0)) % (0xff + 1)
|
||||
buffer_ids[header[11]] = header[6], header[7]
|
||||
|
||||
tosend = []
|
||||
|
||||
if is_monitor > 3:
|
||||
|
||||
for i in range(num_events):
|
||||
d = list(data)
|
||||
|
||||
monitor = random.randint(0,3)
|
||||
# monitor = 0
|
||||
|
||||
d[5] = (1 << 7) | monitor
|
||||
|
||||
# update trigger timestamp
|
||||
event_timestamp = (time.time_ns() // 100) - base_timestamp
|
||||
d[0] = event_timestamp & 0xff
|
||||
d[1] = (event_timestamp >> 8) & 0xff
|
||||
d[2] = (event_timestamp >> 16) & 0x07
|
||||
|
||||
# completely reversed sorting
|
||||
tosend = d + tosend
|
||||
|
||||
else:
|
||||
|
||||
for i in range(num_events):
|
||||
d = list(data)
|
||||
|
||||
amplitude = random.randint(0, 255)
|
||||
x_pos = random.randint(0, 1023)
|
||||
y_pos = random.randint(0, 1023)
|
||||
event_timestamp = (time.time_ns() // 100) - base_timestamp
|
||||
|
||||
d[5] = (0 << 7) | (amplitude >> 1)
|
||||
d[4] = ((amplitude & 0x01) << 7) | (y_pos >> 3)
|
||||
d[3] = ((y_pos << 5) & 0xE0) | (x_pos >> 5)
|
||||
d[2] = ((x_pos << 3) & 0xF8)
|
||||
|
||||
d[0] = event_timestamp & 0xff
|
||||
d[1] = (event_timestamp >> 8) & 0xff
|
||||
d[2] |= (event_timestamp >> 16) & 0x07
|
||||
|
||||
# completely reversed sorting
|
||||
tosend = d + tosend
|
||||
|
||||
sock.sendto(bytes(header + tosend), ('127.0.0.1', 54321))
|
||||
mv = memoryview(bytes(header)).cast('H')
|
||||
print(f'Sent packet {mv[3]} with {num_events} events {base_timestamp}')
|
||||
# time.sleep(.1)
|
||||
1123
src/asynStreamGeneratorDriver.cpp
Normal file
1123
src/asynStreamGeneratorDriver.cpp
Normal file
File diff suppressed because it is too large
Load Diff
1
src/asynStreamGeneratorDriver.dbd
Normal file
1
src/asynStreamGeneratorDriver.dbd
Normal file
@@ -0,0 +1 @@
|
||||
registrar("asynStreamGeneratorDriverRegister")
|
||||
234
src/asynStreamGeneratorDriver.h
Normal file
234
src/asynStreamGeneratorDriver.h
Normal file
@@ -0,0 +1,234 @@
|
||||
#ifndef asynStreamGeneratorDriver_H
|
||||
#define asynStreamGeneratorDriver_H
|
||||
|
||||
#include "asynPortDriver.h"
|
||||
#include <epicsRingBytes.h>
|
||||
#include <librdkafka/rdkafka.h>
|
||||
|
||||
// Just for printing
|
||||
#define __STDC_FORMAT_MACROS
|
||||
#include <inttypes.h>
|
||||
|
||||
/*******************************************************************************
 * UDP Packet Definitions
 */

// Command codes carried in CommandHeader::Command.
enum class CommandId : std::uint16_t {
  reset = 0,
  start = 1,
  stop = 2,
  cont = 3 // "continue"; abbreviated because continue is a C++ keyword
};
|
||||
|
||||
// TODO these headers are actually the same, but with different bodies.
// Wire format of a command packet sent to the MCPD. All fields are 16-bit
// words; the struct is packed so it can be transmitted/parsed as raw bytes.
struct __attribute__((__packed__)) CommandHeader {
  uint16_t BufferLength; // total packet length in 16-bit words
  uint16_t BufferType;   // 0x8000 marks a command buffer
  uint16_t HeaderLength; // header length in 16-bit words
  uint16_t BufferNumber; // running packet counter
  uint16_t Command;      // a CommandId value
  uint16_t Status : 8;
  uint16_t McpdID : 8;
  uint16_t TimeStamp[3]; // 48-bit timestamp, least significant word first
  uint16_t Checksum;     // XOR over the first BufferLength words
  uint16_t Finalizer;

  // Build a fully initialised packet for the given command and compute its
  // checksum.
  CommandHeader(const CommandId commandId)
      : BufferLength(10), BufferType(0x8000), HeaderLength(10),
        BufferNumber(0),
        Command(
            static_cast<std::underlying_type<CommandId>::type>(commandId)),
        Status(0), McpdID(0), TimeStamp{0, 0, 0}, Checksum(0),
        Finalizer(0xffff) {

    // XOR checksum over the first BufferLength (10) 16-bit words of this
    // packet. Checksum itself is still 0 here, so including it in the XOR
    // does not affect the result.
    uint16_t checksum = 0x0000;
    for (std::size_t i = 0; i < BufferLength; ++i)
      checksum ^= ((uint16_t *)this)[i];
    Checksum = checksum;
  }
};
|
||||
|
||||
// Wire format of a data packet header received from the MCPD; the packet
// body after this header is a sequence of 6-byte event records
// (DetectorEvent / MonitorEvent).
struct __attribute__((__packed__)) DataHeader {
  uint16_t BufferLength;
  uint16_t BufferType;
  uint16_t HeaderLength;
  uint16_t BufferNumber;
  uint16_t RunCmdID;
  uint16_t Status : 8;
  uint16_t McpdID : 8;
  uint16_t TimeStamp[3]; // 48-bit packet timestamp in 100 ns ticks, LSW first
  uint16_t Parameter0[3];
  uint16_t Parameter1[3];
  uint16_t Parameter2[3];
  uint16_t Parameter3[3];

  // Packet timestamp converted from 100 ns ticks to nanoseconds.
  inline uint64_t nanosecs() const {
    uint64_t nsec{((uint64_t)TimeStamp[2]) << 32 |
                  ((uint64_t)TimeStamp[1]) << 16 | (uint64_t)TimeStamp[0]};
    return nsec * 100;
  }
};
|
||||
|
||||
// 48-bit neutron event record (Id == 0): 19-bit timestamp offset from the
// packet timestamp plus position and amplitude bitfields.
struct __attribute__((__packed__)) DetectorEvent {
  uint64_t TimeStamp : 19; // offset from DataHeader timestamp, 100 ns ticks
  uint16_t XPosition : 10;
  uint16_t YPosition : 10;
  uint16_t Amplitude : 8;
  uint16_t Id : 1; // 0 = neutron event, 1 = monitor/trigger event
  // Event time offset in nanoseconds (ticks are 100 ns).
  inline uint32_t nanosecs() const { return TimeStamp * 100; }
  // Flatten (module, x, y) into a global pixel number. The 10-bit hardware
  // positions are reduced to 7 bits (>> 3) to match the 128x128 per-module
  // pixel layout; mpcdId is 1-based.
  inline uint32_t pixelId(uint32_t mpcdId) const {
    const uint32_t x_pixels = 128;
    const uint32_t y_pixels = 128;
    return (mpcdId - 1) * x_pixels * y_pixels +
           x_pixels * (uint32_t)(this->XPosition >> 3) +
           (uint32_t)(this->YPosition >> 3);
  }
};
|
||||
|
||||
// 48-bit monitor/trigger event record (Id == 1).
struct __attribute__((__packed__)) MonitorEvent {
  uint64_t TimeStamp : 19; // offset from DataHeader timestamp, 100 ns ticks
  uint64_t Data : 21;
  uint64_t DataID : 4;     // monitor channel
  uint64_t TriggerID : 3;
  uint64_t Id : 1;         // 1 = monitor/trigger event
  // Event time offset in nanoseconds (ticks are 100 ns).
  inline uint32_t nanosecs() const { return TimeStamp * 100; }
};
|
||||
|
||||
/*******************************************************************************
 * Simplified Event Struct Definition
 */

// Internal representation after normalisation: absolute timestamp, 24-bit
// pixel id and a source tag. Packed so it can be pushed through the
// epicsRingBytes queues as raw bytes.
struct __attribute__((__packed__)) NormalisedEvent {
  uint64_t timestamp;
  uint32_t pixelId : 24;
  uint8_t source;

  // inline NormalisedEvent(uint64_t timestamp, uint8_t source, uint32_t
  // pixelId)
  //     : timestamp(timestamp), source(source), pixelId(pixelId){};
};
|
||||
|
||||
/*******************************************************************************
 * Status values that should match the definition in db/daq_common.db
 */
#define STATUS_IDLE 0
#define STATUS_COUNTING 1
#define STATUS_LOWRATE 2
#define STATUS_PAUSED 3

/*******************************************************************************
 * Parameters for use in DB records
 *
 * i.e. drvInfo strings that are used to identify the parameters
 */

// Electronics enable switch and its readback.
constexpr static char P_EnableElectronicsString[]{"EN_EL"};
constexpr static char P_EnableElectronicsRBVString[]{"EN_EL_RBV"};

// Acquisition control and presets.
constexpr static char P_StatusString[]{"STATUS"};
constexpr static char P_ResetString[]{"RESET"};
constexpr static char P_StopString[]{"STOP"};
constexpr static char P_CountPresetString[]{"P_CNT"};
constexpr static char P_TimePresetString[]{"P_TIME"};
constexpr static char P_ElapsedTimeString[]{"TIME"};
constexpr static char P_ClearElapsedTimeString[]{"C_TIME"};
constexpr static char P_MonitorChannelString[]{"MONITOR"};
constexpr static char P_ThresholdString[]{"THRESH"};
constexpr static char P_ThresholdChannelString[]{"THRESH_CH"};

// Per-channel templates: the %PRIu64 placeholder is filled with the
// channel number when the parameters are created.
constexpr static char P_CountsString[]{"COUNTS%" PRIu64};
constexpr static char P_RateString[]{"RATE%" PRIu64};
constexpr static char P_ClearCountsString[]{"C_%" PRIu64};

// Pipeline health: dropped UDP packets and queue high-water marks.
constexpr static char P_UdpDroppedString[]{"DROP"};
constexpr static char P_UdpQueueHighWaterMarkString[]{"UDP"};
constexpr static char P_NormalisedQueueHighWaterMarkString[]{"NORM"};
constexpr static char P_SortedQueueHighWaterMarkString[]{"SORT"};
|
||||
|
||||
/*******************************************************************************
 * Stream Generator Coordinating Class
 */
// asynPortDriver that receives MCPD event packets over UDP, normalises and
// sorts them, updates counter/rate parameters and (optionally) forwards the
// events to Kafka.
class asynStreamGeneratorDriver : public asynPortDriver {
public:
  asynStreamGeneratorDriver(const char *portName, const char *ipPortName,
                            const int numChannels, const int udpQueueSize,
                            const bool enableKafkaStream,
                            const char *kafkaBroker, const char *monitorTopic,
                            const char *detectorTopic,
                            const int kafkaQueueSize,
                            const int kafkaMaxPacketSize);
  virtual ~asynStreamGeneratorDriver();

  virtual asynStatus readInt32(asynUser *pasynUser, epicsInt32 *value);
  virtual asynStatus writeInt32(asynUser *pasynUser, epicsInt32 value);

  // Pipeline stage entry points (each presumably runs in its own thread:
  // UDP -> normalise -> sort -> process / produce -- confirm in the .cpp).
  void receiveUDP();
  void normaliseUDP();
  void partialSortEvents();
  void processEvents();
  void produceMonitor();
  void produceDetector();

protected:
  // Parameter Identifying IDs
  int P_EnableElectronics;
  int P_EnableElectronicsRBV;
  int P_Status;
  int P_Reset;
  int P_Stop;
  int P_CountPreset;
  int P_TimePreset;
  int P_ElapsedTime;
  int P_ClearElapsedTime;
  int P_MonitorChannel;
  int P_Threshold;
  int P_ThresholdChannel;
  // Per-channel parameter id arrays (num_channels entries each).
  int *P_Counts;
  int *P_Rates;
  int *P_ClearCounts;

  // System Status Parameter Identifying IDs
  int P_UdpDropped;
  int P_UdpQueueHighWaterMark;
  int P_NormalisedQueueHighWaterMark;
  int P_SortedQueueHighWaterMark;

private:
  asynUser *pasynUDPUser;     // asyn connection used for the UDP port
  epicsEventId pausedEventId;

  const std::size_t num_channels;
  const bool kafkaEnabled;
  const int kafkaQueueSize;
  const int kafkaMaxPacketSize;

  // Inter-stage byte ring buffers.
  const int udpQueueSize;
  epicsRingBytesId udpQueue;
  epicsRingBytesId normalisedQueue;
  epicsRingBytesId sortedQueue;

  // Kafka output: one queue/producer/topic pair per stream.
  epicsRingBytesId monitorQueue;
  rd_kafka_t *monitorProducer;
  const std::string monitorTopic;

  epicsRingBytesId detectorQueue;
  rd_kafka_t *detectorProducer;
  const std::string detectorTopic;

  static constexpr char driverName[]{"StreamGenerator"};

  // Helpers that create one parameter each and fold the result into the
  // accumulated asynStatus.
  asynStatus createInt32Param(asynStatus status, const char *name,
                              int *variable, epicsInt32 initialValue = 0);

  asynStatus createInt64Param(asynStatus status, const char *name,
                              int *variable, epicsInt64 initialValue = 0);

  asynStatus createFloat64Param(asynStatus status, const char *name,
                                int *variable, double initialValue = 0);

  inline void queueForKafka(NormalisedEvent &ne);

  // Drains eventQueue and publishes its events via kafkaProducer to topic.
  void produce(epicsRingBytesId eventQueue, rd_kafka_t *kafkaProducer,
               const char *topic, const char *source);
};
|
||||
|
||||
#endif
|
||||
342
udp_rate.py
342
udp_rate.py
@@ -1,342 +0,0 @@
|
||||
import queue
|
||||
import socket
|
||||
import time
|
||||
import threading
|
||||
from uuid import uuid4
|
||||
import math
|
||||
|
||||
from confluent_kafka import Producer
|
||||
import streaming_data_types
|
||||
|
||||
# receiving directly (can also specify correlation unit ip)
UDP_IP = ""
UDP_PORT = 54321

# If redirecting traffic via
# socat -U - udp4-recv:54321 | tee >( socat -u - udp4-datagram:127.0.0.1:54322 ) | socat -u - udp4-datagram:127.0.0.1:54323
# UDP_IP = "127.0.0.1"
# UDP_PORT = 54323

# Sliding window used for the per-monitor rate estimate.
WINDOWSECONDS = 5
WINDOWSIZE = 20000 * WINDOWSECONDS  # max monitor events kept per channel
MONITORS = 4  # We have max 4 monitors

time_offset = None  # Estimate of clock offset (unix time in 100 ns ticks minus detector timestamp)

# One FIFO of event timestamps (100 ns ticks) per monitor channel; filled by
# the receive loop, trimmed by clean_monitor_rates, read by print_monitor_rates.
time_window = {
    i: queue.Queue(maxsize=WINDOWSIZE)
    for i in range(MONITORS)
}

# event_time_window = queue.Queue(maxsize=50000 * WINDOWSECONDS)
# Ring buffer of inter-event times (100 ns ticks) for the neutron rate estimate.
EVENT_WINDOWSIZE = 50000
EVENT_WINDOW_PTR = 0
event_time_window = [0 for i in range(EVENT_WINDOWSIZE)]

event_average_rate = 0       # only referenced from commented-out code below
event_last_timestamp = None  # timestamp of the previously seen neutron event

MISSED_PACKETS = -9  # All modules appear to miss the first time due to initialisation as 0

# missed_packets_time_window = queue.Queue(maxsize=100)
|
||||
def print_monitor_rates():
    """Once a second, print per-monitor rates, the neutron event rate and
    the missed-packet counter.

    Runs forever; started as a daemon thread. Rates are derived from the
    shared ``time_window`` queues and the ``event_time_window`` ring buffer
    that the receive loop fills (timestamps are 100 ns ticks, hence the
    ``1e-7`` factors).
    """
    while True:
        for i in range(MONITORS):
            msg = f"Monitor {i+1}: {time_window[i].qsize() / WINDOWSECONDS} cts/s"
            try:
                earliest = time_window[i].queue[0]
                newest = max(time_window[i].queue)
                msg += f', buffer range: {round((newest - earliest) * 1e-7, 3)} s, oldest: {round(time.time() - ((time_offset + earliest) * 1e-7), 3)} s, newest: {round(time.time() - ((time_offset + newest) * 1e-7), 3)} s'
            except (IndexError, ValueError, TypeError):
                # Queue still empty (IndexError / ValueError from max) or
                # time_offset not established yet (TypeError): print the
                # plain rate without buffer details.
                pass

            print(msg)

        try:
            print(f'Events: {round(1 / (sum(event_time_window) / EVENT_WINDOWSIZE * 1e-7), 2)} cts/s')
        except ZeroDivisionError:
            # No events seen yet: the inter-event ring buffer is all zeros.
            pass

        print(f'Missed Packets: {MISSED_PACKETS}')

        time.sleep(1)


threading.Thread(target=print_monitor_rates, daemon=True).start()
|
||||
|
||||
def clean_monitor_rates():
    """Background task: expire monitor events that fell out of the window.

    Tracks the newest timestamp seen across all monitor queues and pops
    entries older than WINDOWSECONDS relative to it (timestamps are in
    100 ns ticks, hence the 1e7 factor). Runs forever; started as a
    daemon thread.
    """
    latest = 0
    while True:
        for d_id in range(MONITORS):
            t_w = time_window[d_id]
            if not t_w.empty():
                # TODO probably should switch to a priority queue
                # as the messages might not be in order
                # TODO could also just replace with a low-pass filter
                # would be a lot more efficient
                # TODO the way this is done, we need trigger events
                # in order for the signal to decay back to 0.
                # If no events come, the rate remains stuck
                latest = max(latest, max(t_w.queue))
            # latest = time_window[1].queue[-1]
            try:
                # Pop entries older than the window; IndexError means the
                # queue drained while trimming.
                while t_w.queue[0] < (latest - WINDOWSECONDS * 1e7):
                    t_w.get_nowait()
            except IndexError:
                pass
        time.sleep(0.01)


threading.Thread(target=clean_monitor_rates, daemon=True).start()
|
||||
|
||||
|
||||
# def clean_event_rates():
|
||||
# latest = 0
|
||||
# while True:
|
||||
# t_w = event_time_window
|
||||
# if not t_w.empty():
|
||||
# # TODO probably should switch to a priority queue
|
||||
# # as the messages might not be in order
|
||||
# # TODO could also just replace with a low-pass filter
|
||||
# # would be a lot more efficient
|
||||
# # TODO the way this is done, we need trigger events
|
||||
# # in order for the signal to decay back to 0.
|
||||
# # If no events come, the rate remains stuck
|
||||
# #latest = max(latest, max(t_w.queue))
|
||||
# try:
|
||||
# latest = time_window[1].queue[-1]
|
||||
# while t_w.queue[0] < (latest - WINDOWSECONDS * 1e7):
|
||||
# t_w.get_nowait()
|
||||
# except IndexError:
|
||||
# pass
|
||||
# time.sleep(0.005)
|
||||
#
|
||||
# threading.Thread(target=clean_event_rates, daemon=True).start()
|
||||
|
||||
|
||||
|
||||
|
||||
# Event Kafka Producer

# Neutron events (pixel_id, timestamp) handed over from the UDP receive loop.
event_queue = queue.Queue()


def event_producer():
    """Drain event_queue and publish neutron events to Kafka in batches.

    Buffers up to 10000 events and flushes them as a single ev42 message
    either when the buffer fills or ~1 ms after the previous flush.
    Runs forever; started as a daemon thread.
    """
    producer_config = {
        'bootstrap.servers': "linkafka01:9092",
        'queue.buffering.max.messages': 1e7,
    }
    prod = Producer(producer_config)

    st = time.time()  # time of the last flush

    msg_id = 0  # running ev42 message id, wraps at 1e8

    b_size = 10000
    b_ptr = 0  # number of currently buffered events
    pixel_buffer = [0 for _ in range(b_size)]
    time_buffer = [0 for _ in range(b_size)]
    poll_cnt = 0

    while True:
        (p_id, timestamp) = event_queue.get()

        pixel_buffer[b_ptr] = p_id
        time_buffer[b_ptr] = timestamp
        b_ptr += 1

        nt = time.time()
        if b_ptr == b_size or nt - st > 0.001:
            st = nt

            if b_ptr > 0:
                # NOTE(review): time_buffer holds detector timestamps in
                # 100 ns ticks, so pulse_time/time_of_flight are presumably
                # not epoch-based here -- confirm against the consumer.
                message = streaming_data_types.serialise_ev42(
                    message_id = msg_id,
                    pulse_time = time_buffer[0] * 100, # int(time.time() * 1_000_000_000),
                    time_of_flight = time_buffer[0:b_ptr],
                    detector_id = pixel_buffer[0:b_ptr],
                    source_name = '',
                )

                msg_id = (msg_id + 1) % 100000000
                b_ptr = 0

                prod.produce(
                    topic = "DMC_detector",
                    value = message,
                    partition = 0,
                )

            # if poll_cnt % 1000 == 0:
            prod.poll(0)
            poll_cnt = (poll_cnt + 1) % 1000


threading.Thread(target=event_producer, daemon=True).start()
|
||||
|
||||
# Monitor Kafka Producer

# Trigger events (monitor_channel, timestamp) from the UDP receive loop.
monitor_queue = queue.Queue()


def monitor_producer():
    """Drain monitor_queue and publish per-monitor count updates to Kafka.

    Accumulates trigger events per monitor channel and, at most every
    50 ms, emits one f142 message per channel that saw events, stamped
    with the last event's detector timestamp (100 ns ticks -> ns).
    Runs forever; started as a daemon thread.
    """
    producer_config = {
        'bootstrap.servers': "linkafka01:9092",
        'queue.buffering.max.messages': 1e7,
    }
    prod = Producer(producer_config)

    monitor_buffer = [0 for i in range(MONITORS)]  # counts since last flush
    monitor_time = [0 for i in range(MONITORS)]    # timestamp of last event

    st = time.time()

    poll_cnt = 0

    while True:
        (d_id, timestamp) = monitor_queue.get()

        monitor_buffer[d_id] += 1
        monitor_time[d_id] = timestamp

        nt = time.time()
        if nt - st > 0.05:
            st = nt

            # BUGFIX: the flush loop previously indexed with d_id (the
            # channel of the *last* received event) instead of the loop
            # variable i, so only one channel was ever published or reset.
            for i in range(MONITORS):
                if monitor_buffer[i]:
                    message = streaming_data_types.serialise_f142(
                        source_name = f"monitor{i+1}",
                        value = monitor_buffer[i],
                        # ns resolution (supposed to be past epoch, not what the detector returns though)
                        timestamp_unix_ns = monitor_time[i] * 100 # send time of last monitor
                    )

                    prod.produce(
                        topic = "DMC_neutron_monitor",
                        value = message,
                        partition = 0,
                    )

                    monitor_buffer[i] = 0

        if poll_cnt % 1000 == 0:
            prod.poll(0)
        poll_cnt = (poll_cnt + 1) % 1000


threading.Thread(target=monitor_producer, daemon=True).start()
|
||||
|
||||
|
||||
# UDP receive loop: parse MCPD packet headers and demultiplex the contained
# trigger (monitor) and neutron (detector) events into the worker queues.
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.bind((UDP_IP, UDP_PORT))

module_counts = [0 for i in range(10)]  # last seen buffer number per MCPD id

while True:
    data, addr = sock.recvfrom(2056)
    raw_header = data[:42]  # 21 16-bit header words
    raw_data = data[42:]    # sequence of 6-byte event records

    (buffer_length, buffer_type, header_length,
     buffer_number, run_id, mcpd_status,
     t_low, t_mid, t_high, *_) = memoryview(raw_header).cast('H')
    mcpd_id = ( mcpd_status >> 8 ) & 0xff
    mcpd_status = ( mcpd_status ) & 0x3
    # 48-bit packet timestamp in 100 ns ticks.
    timestamp = ( t_high << 32 ) | ( t_mid << 16 ) | t_low

    if time_offset is None:
        time_offset = time.time() * 1e7 - timestamp

    # Buffer numbers are 16-bit counters; compare modulo 2**16 so a
    # wraparound is not miscounted as a missed packet.
    if ((buffer_number - module_counts[mcpd_id]) & 0xffff) != 1:
        MISSED_PACKETS += 1

    module_counts[mcpd_id] = buffer_number

    for i in range(0, len(raw_data), 6):
        event = memoryview(raw_data)[i:i+6]
        event_type = event[5] >> 7  # top bit: 1 = trigger, 0 = neutron

        # 19-bit event time offset: the low 3 bits of byte 2 are the high
        # bits (mask BEFORE shifting -- the old `(event[2] << 16) & 0x7`
        # always gave 0), bytes 1 and 0 the rest. Parenthesised because
        # `+` binds tighter than `|`.
        event_timestamp = timestamp + (
            ((event[2] & 0x07) << 16) | (event[1] << 8) | event[0]
        )

        if event_type:  # Trigger Event
            d_id = event[5] & 0xf  # monitor channel (DataID)

            t_w = time_window[d_id]
            t_w.put_nowait(event_timestamp)

            monitor_queue.put_nowait((d_id, event_timestamp))

        else:  # Neutron Event
            x_pixels = 128
            y_pixels = 128

            # The DMC StreamHistogrammer setup currently expects each module to
            # be 128 * 128 pixels but the resolution in the packages is
            # actually 10bit. We remove the lowest 3 bits.
            x = (( (event[3] & 0x1f) << 5 | (event[2] & 0xf8) >> 3 ) & 0x3ff) >> 3
            y = (( (event[4] & 0x7f) << 3 | (event[3] & 0xe0) >> 5 ) & 0x3ff) >> 3

            if event_last_timestamp is None:
                event_last_timestamp = event_timestamp

            # Seems like at higher frequencies these come very much out of order
            # so this is very approximate
            event_time_window[EVENT_WINDOW_PTR] = event_timestamp - event_last_timestamp
            EVENT_WINDOW_PTR = (EVENT_WINDOW_PTR + 1) % EVENT_WINDOWSIZE
            event_last_timestamp = event_timestamp

            event_queue.put_nowait((
                (mcpd_id - 1) * x_pixels * y_pixels + x_pixels * x + y,
                event_timestamp
            ))
|
||||
Reference in New Issue
Block a user