Compare commits
60 Commits
python_var
...
main
| Author | SHA1 | Date | |
|---|---|---|---|
| 2bfde8c1c6 | |||
| c58e379584 | |||
| aa00966599 | |||
| 8ae9cb0bd8 | |||
| 31aa9d246a | |||
| f169076f65 | |||
| fdbb8f5061 | |||
| 668dd65823 | |||
| 5b65a01e51 | |||
| 594bb6d320 | |||
| 7865273707 | |||
| 2ede400791 | |||
| a13c5b81e2 | |||
| 66792837a6 | |||
| c563b07fed | |||
| dc5244bc43 | |||
| 2f60ac2a24 | |||
| d7a4d057aa | |||
| 0819c5fb12 | |||
| a7c5f9413b | |||
| 205eedbd88 | |||
| d80155ef7d | |||
| ba3c3b5208 | |||
| 5ffd784769 | |||
| 9bfaabdd99 | |||
| 9d93238db4 | |||
| c530de3566 | |||
| ba07a8af9b | |||
| 77ed74a203 | |||
| 8f8b78a9bf | |||
| 6faf23601e | |||
| 18da14f6d6 | |||
| 9d5ed11dac | |||
| 318357127e | |||
| 2f50a21e83 | |||
| e53a2a4f40 | |||
| 5f95e82a3c | |||
| 2ccf37ce33 | |||
| 617dd3153b | |||
| e5cb019143 | |||
| 70c04af034 | |||
| 056b0a5f8a | |||
| 1ce7f93e95 | |||
| ecc6e98f4c | |||
| 2c47f338c2 | |||
| 60aa1652c3 | |||
| 81bd3bef7f | |||
| e65725609c | |||
| a336ca74c9 | |||
| 7bacc716cc | |||
| 1e853487aa | |||
| b9e5f40c21 | |||
| d7bf3977fc | |||
| 750436732c | |||
| 4c1741bd4b | |||
| 09ba30025a | |||
| 2d065a0db9 | |||
| 2d5a43c09a | |||
| c2ca5f699c | |||
| b810509156 |
245
.clang-format
Normal file
245
.clang-format
Normal file
@@ -0,0 +1,245 @@
|
|||||||
|
---
|
||||||
|
Language: Cpp
|
||||||
|
# BasedOnStyle: LLVM
|
||||||
|
AccessModifierOffset: -2
|
||||||
|
AlignAfterOpenBracket: Align
|
||||||
|
AlignArrayOfStructures: None
|
||||||
|
AlignConsecutiveAssignments:
|
||||||
|
Enabled: false
|
||||||
|
AcrossEmptyLines: false
|
||||||
|
AcrossComments: false
|
||||||
|
AlignCompound: false
|
||||||
|
AlignFunctionPointers: false
|
||||||
|
PadOperators: true
|
||||||
|
AlignConsecutiveBitFields:
|
||||||
|
Enabled: false
|
||||||
|
AcrossEmptyLines: false
|
||||||
|
AcrossComments: false
|
||||||
|
AlignCompound: false
|
||||||
|
AlignFunctionPointers: false
|
||||||
|
PadOperators: false
|
||||||
|
AlignConsecutiveDeclarations:
|
||||||
|
Enabled: false
|
||||||
|
AcrossEmptyLines: false
|
||||||
|
AcrossComments: false
|
||||||
|
AlignCompound: false
|
||||||
|
AlignFunctionPointers: false
|
||||||
|
PadOperators: false
|
||||||
|
AlignConsecutiveMacros:
|
||||||
|
Enabled: false
|
||||||
|
AcrossEmptyLines: false
|
||||||
|
AcrossComments: false
|
||||||
|
AlignCompound: false
|
||||||
|
AlignFunctionPointers: false
|
||||||
|
PadOperators: false
|
||||||
|
AlignConsecutiveShortCaseStatements:
|
||||||
|
Enabled: false
|
||||||
|
AcrossEmptyLines: false
|
||||||
|
AcrossComments: false
|
||||||
|
AlignCaseColons: false
|
||||||
|
AlignEscapedNewlines: Right
|
||||||
|
AlignOperands: Align
|
||||||
|
AlignTrailingComments:
|
||||||
|
Kind: Always
|
||||||
|
OverEmptyLines: 0
|
||||||
|
AllowAllArgumentsOnNextLine: true
|
||||||
|
AllowAllParametersOfDeclarationOnNextLine: true
|
||||||
|
AllowBreakBeforeNoexceptSpecifier: Never
|
||||||
|
AllowShortBlocksOnASingleLine: Never
|
||||||
|
AllowShortCaseLabelsOnASingleLine: false
|
||||||
|
AllowShortCompoundRequirementOnASingleLine: true
|
||||||
|
AllowShortEnumsOnASingleLine: true
|
||||||
|
AllowShortFunctionsOnASingleLine: All
|
||||||
|
AllowShortIfStatementsOnASingleLine: Never
|
||||||
|
AllowShortLambdasOnASingleLine: All
|
||||||
|
AllowShortLoopsOnASingleLine: false
|
||||||
|
AlwaysBreakAfterDefinitionReturnType: None
|
||||||
|
AlwaysBreakAfterReturnType: None
|
||||||
|
AlwaysBreakBeforeMultilineStrings: false
|
||||||
|
AlwaysBreakTemplateDeclarations: MultiLine
|
||||||
|
AttributeMacros:
|
||||||
|
- __capability
|
||||||
|
BinPackArguments: true
|
||||||
|
BinPackParameters: true
|
||||||
|
BitFieldColonSpacing: Both
|
||||||
|
BraceWrapping:
|
||||||
|
AfterCaseLabel: false
|
||||||
|
AfterClass: false
|
||||||
|
AfterControlStatement: Never
|
||||||
|
AfterEnum: false
|
||||||
|
AfterExternBlock: false
|
||||||
|
AfterFunction: false
|
||||||
|
AfterNamespace: false
|
||||||
|
AfterObjCDeclaration: false
|
||||||
|
AfterStruct: false
|
||||||
|
AfterUnion: false
|
||||||
|
BeforeCatch: false
|
||||||
|
BeforeElse: false
|
||||||
|
BeforeLambdaBody: false
|
||||||
|
BeforeWhile: false
|
||||||
|
IndentBraces: false
|
||||||
|
SplitEmptyFunction: true
|
||||||
|
SplitEmptyRecord: true
|
||||||
|
SplitEmptyNamespace: true
|
||||||
|
BreakAdjacentStringLiterals: true
|
||||||
|
BreakAfterAttributes: Leave
|
||||||
|
BreakAfterJavaFieldAnnotations: false
|
||||||
|
BreakArrays: true
|
||||||
|
BreakBeforeBinaryOperators: None
|
||||||
|
BreakBeforeConceptDeclarations: Always
|
||||||
|
BreakBeforeBraces: Attach
|
||||||
|
BreakBeforeInlineASMColon: OnlyMultiline
|
||||||
|
BreakBeforeTernaryOperators: true
|
||||||
|
BreakConstructorInitializers: BeforeColon
|
||||||
|
BreakInheritanceList: BeforeColon
|
||||||
|
BreakStringLiterals: true
|
||||||
|
ColumnLimit: 80
|
||||||
|
CommentPragmas: '^ IWYU pragma:'
|
||||||
|
CompactNamespaces: false
|
||||||
|
ConstructorInitializerIndentWidth: 4
|
||||||
|
ContinuationIndentWidth: 4
|
||||||
|
Cpp11BracedListStyle: true
|
||||||
|
DerivePointerAlignment: false
|
||||||
|
DisableFormat: false
|
||||||
|
EmptyLineAfterAccessModifier: Never
|
||||||
|
EmptyLineBeforeAccessModifier: LogicalBlock
|
||||||
|
ExperimentalAutoDetectBinPacking: false
|
||||||
|
FixNamespaceComments: true
|
||||||
|
ForEachMacros:
|
||||||
|
- foreach
|
||||||
|
- Q_FOREACH
|
||||||
|
- BOOST_FOREACH
|
||||||
|
IfMacros:
|
||||||
|
- KJ_IF_MAYBE
|
||||||
|
IncludeBlocks: Preserve
|
||||||
|
IncludeCategories:
|
||||||
|
- Regex: '^"(llvm|llvm-c|clang|clang-c)/'
|
||||||
|
Priority: 2
|
||||||
|
SortPriority: 0
|
||||||
|
CaseSensitive: false
|
||||||
|
- Regex: '^(<|"(gtest|gmock|isl|json)/)'
|
||||||
|
Priority: 3
|
||||||
|
SortPriority: 0
|
||||||
|
CaseSensitive: false
|
||||||
|
- Regex: '.*'
|
||||||
|
Priority: 1
|
||||||
|
SortPriority: 0
|
||||||
|
CaseSensitive: false
|
||||||
|
IncludeIsMainRegex: '(Test)?$'
|
||||||
|
IncludeIsMainSourceRegex: ''
|
||||||
|
IndentAccessModifiers: false
|
||||||
|
IndentCaseBlocks: false
|
||||||
|
IndentCaseLabels: false
|
||||||
|
IndentExternBlock: AfterExternBlock
|
||||||
|
IndentGotoLabels: true
|
||||||
|
IndentPPDirectives: None
|
||||||
|
IndentRequiresClause: true
|
||||||
|
IndentWidth: 4
|
||||||
|
IndentWrappedFunctionNames: false
|
||||||
|
InsertBraces: false
|
||||||
|
InsertNewlineAtEOF: false
|
||||||
|
InsertTrailingCommas: None
|
||||||
|
IntegerLiteralSeparator:
|
||||||
|
Binary: 0
|
||||||
|
BinaryMinDigits: 0
|
||||||
|
Decimal: 0
|
||||||
|
DecimalMinDigits: 0
|
||||||
|
Hex: 0
|
||||||
|
HexMinDigits: 0
|
||||||
|
JavaScriptQuotes: Leave
|
||||||
|
JavaScriptWrapImports: true
|
||||||
|
KeepEmptyLinesAtTheStartOfBlocks: true
|
||||||
|
KeepEmptyLinesAtEOF: false
|
||||||
|
LambdaBodyIndentation: Signature
|
||||||
|
LineEnding: DeriveLF
|
||||||
|
MacroBlockBegin: ''
|
||||||
|
MacroBlockEnd: ''
|
||||||
|
MaxEmptyLinesToKeep: 1
|
||||||
|
NamespaceIndentation: None
|
||||||
|
ObjCBinPackProtocolList: Auto
|
||||||
|
ObjCBlockIndentWidth: 2
|
||||||
|
ObjCBreakBeforeNestedBlockParam: true
|
||||||
|
ObjCSpaceAfterProperty: false
|
||||||
|
ObjCSpaceBeforeProtocolList: true
|
||||||
|
PackConstructorInitializers: BinPack
|
||||||
|
PenaltyBreakAssignment: 2
|
||||||
|
PenaltyBreakBeforeFirstCallParameter: 19
|
||||||
|
PenaltyBreakComment: 300
|
||||||
|
PenaltyBreakFirstLessLess: 120
|
||||||
|
PenaltyBreakOpenParenthesis: 0
|
||||||
|
PenaltyBreakScopeResolution: 500
|
||||||
|
PenaltyBreakString: 1000
|
||||||
|
PenaltyBreakTemplateDeclaration: 10
|
||||||
|
PenaltyExcessCharacter: 1000000
|
||||||
|
PenaltyIndentedWhitespace: 0
|
||||||
|
PenaltyReturnTypeOnItsOwnLine: 60
|
||||||
|
PointerAlignment: Right
|
||||||
|
PPIndentWidth: -1
|
||||||
|
QualifierAlignment: Leave
|
||||||
|
ReferenceAlignment: Pointer
|
||||||
|
ReflowComments: true
|
||||||
|
RemoveBracesLLVM: false
|
||||||
|
RemoveParentheses: Leave
|
||||||
|
RemoveSemicolon: false
|
||||||
|
RequiresClausePosition: OwnLine
|
||||||
|
RequiresExpressionIndentation: OuterScope
|
||||||
|
SeparateDefinitionBlocks: Leave
|
||||||
|
ShortNamespaceLines: 1
|
||||||
|
SkipMacroDefinitionBody: false
|
||||||
|
SortIncludes: CaseSensitive
|
||||||
|
SortJavaStaticImport: Before
|
||||||
|
SortUsingDeclarations: LexicographicNumeric
|
||||||
|
SpaceAfterCStyleCast: false
|
||||||
|
SpaceAfterLogicalNot: false
|
||||||
|
SpaceAfterTemplateKeyword: true
|
||||||
|
SpaceAroundPointerQualifiers: Default
|
||||||
|
SpaceBeforeAssignmentOperators: true
|
||||||
|
SpaceBeforeCaseColon: false
|
||||||
|
SpaceBeforeCpp11BracedList: false
|
||||||
|
SpaceBeforeCtorInitializerColon: true
|
||||||
|
SpaceBeforeInheritanceColon: true
|
||||||
|
SpaceBeforeJsonColon: false
|
||||||
|
SpaceBeforeParens: ControlStatements
|
||||||
|
SpaceBeforeParensOptions:
|
||||||
|
AfterControlStatements: true
|
||||||
|
AfterForeachMacros: true
|
||||||
|
AfterFunctionDefinitionName: false
|
||||||
|
AfterFunctionDeclarationName: false
|
||||||
|
AfterIfMacros: true
|
||||||
|
AfterOverloadedOperator: false
|
||||||
|
AfterPlacementOperator: true
|
||||||
|
AfterRequiresInClause: false
|
||||||
|
AfterRequiresInExpression: false
|
||||||
|
BeforeNonEmptyParentheses: false
|
||||||
|
SpaceBeforeRangeBasedForLoopColon: true
|
||||||
|
SpaceBeforeSquareBrackets: false
|
||||||
|
SpaceInEmptyBlock: false
|
||||||
|
SpacesBeforeTrailingComments: 1
|
||||||
|
SpacesInAngles: Never
|
||||||
|
SpacesInContainerLiterals: true
|
||||||
|
SpacesInLineCommentPrefix:
|
||||||
|
Minimum: 1
|
||||||
|
Maximum: -1
|
||||||
|
SpacesInParens: Never
|
||||||
|
SpacesInParensOptions:
|
||||||
|
InCStyleCasts: false
|
||||||
|
InConditionalStatements: false
|
||||||
|
InEmptyParentheses: false
|
||||||
|
Other: false
|
||||||
|
SpacesInSquareBrackets: false
|
||||||
|
Standard: Latest
|
||||||
|
StatementAttributeLikeMacros:
|
||||||
|
- Q_EMIT
|
||||||
|
StatementMacros:
|
||||||
|
- Q_UNUSED
|
||||||
|
- QT_REQUIRE_VERSION
|
||||||
|
TabWidth: 8
|
||||||
|
UseTab: Never
|
||||||
|
VerilogBreakBetweenInstancePorts: true
|
||||||
|
WhitespaceSensitiveMacros:
|
||||||
|
- BOOST_PP_STRINGIZE
|
||||||
|
- CF_SWIFT_NAME
|
||||||
|
- NS_SWIFT_NAME
|
||||||
|
- PP_STRINGIZE
|
||||||
|
- STRINGIZE
|
||||||
|
...
|
||||||
35
.gitea/workflows/action.yaml
Normal file
35
.gitea/workflows/action.yaml
Normal file
@@ -0,0 +1,35 @@
|
|||||||
|
name: Test And Build
|
||||||
|
on: [push]
|
||||||
|
|
||||||
|
jobs:
|
||||||
|
Lint:
|
||||||
|
runs-on: linepics
|
||||||
|
steps:
|
||||||
|
- name: checkout repo
|
||||||
|
uses: actions/checkout@v4
|
||||||
|
- name: formatting
|
||||||
|
run: clang-format --style=file --Werror --dry-run src/*.cpp
|
||||||
|
- name: cppcheck
|
||||||
|
run: cppcheck --std=c++17 --addon=misc --error-exitcode=1 src/*.cpp
|
||||||
|
Build:
|
||||||
|
runs-on: linepics
|
||||||
|
steps:
|
||||||
|
- name: checkout repo
|
||||||
|
uses: actions/checkout@v4
|
||||||
|
with:
|
||||||
|
submodules: 'true'
|
||||||
|
- name: install dependencies
|
||||||
|
run: |
|
||||||
|
dnf install -y librdkafka-devel
|
||||||
|
- name: prepare flatbuffers
|
||||||
|
run: |
|
||||||
|
pushd dep/flatbuffers
|
||||||
|
cmake -G "Unix Makefiles"
|
||||||
|
make -j
|
||||||
|
popd
|
||||||
|
./dep/flatbuffers/flatc -o schemas/ --cpp --gen-mutable --gen-name-strings --scoped-enums ./dep/streaming-data-types/schemas/*
|
||||||
|
- name: build module
|
||||||
|
run: |
|
||||||
|
sed -i 's/ARCH_FILTER=.*/ARCH_FILTER=linux%/' Makefile
|
||||||
|
echo -e "\nIGNORE_SUBMODULES += streaming-data-types flatbuffers" >> Makefile
|
||||||
|
make install
|
||||||
3
.gitignore
vendored
Normal file
3
.gitignore
vendored
Normal file
@@ -0,0 +1,3 @@
|
|||||||
|
O.*
|
||||||
|
.*ignore
|
||||||
|
schemas/
|
||||||
6
.gitmodules
vendored
Normal file
6
.gitmodules
vendored
Normal file
@@ -0,0 +1,6 @@
|
|||||||
|
[submodule "dep/streaming-data-types"]
|
||||||
|
path = dep/streaming-data-types
|
||||||
|
url = git@gitea.psi.ch:lin-controls/streaming-data-types.git
|
||||||
|
[submodule "dep/flatbuffers"]
|
||||||
|
path = dep/flatbuffers
|
||||||
|
url = git@gitea.psi.ch:lin-controls/flatbuffers.git
|
||||||
29
Makefile
Normal file
29
Makefile
Normal file
@@ -0,0 +1,29 @@
|
|||||||
|
# Include the external Makefile
|
||||||
|
include /ioc/tools/driver.makefile
|
||||||
|
|
||||||
|
MODULE=StreamGenerator
|
||||||
|
BUILDCLASSES=Linux
|
||||||
|
EPICS_VERSIONS=7.0.7
|
||||||
|
ARCH_FILTER=RHEL%
|
||||||
|
|
||||||
|
# Additional module dependencies
|
||||||
|
REQUIRED+=asyn
|
||||||
|
|
||||||
|
DBDS += src/asynStreamGeneratorDriver.dbd
|
||||||
|
|
||||||
|
# DB files to include in the release
|
||||||
|
TEMPLATES += db/channels.db db/daq_common.db db/correlation_unit.db
|
||||||
|
|
||||||
|
# HEADERS += src/asynStreamGeneratorDriver.h
|
||||||
|
|
||||||
|
# Source files to build
|
||||||
|
SOURCES += src/asynStreamGeneratorDriver.cpp
|
||||||
|
|
||||||
|
# I don't think specifying the optimisation level like this is correct...
|
||||||
|
# but I doesn't hurt :D
|
||||||
|
USR_CFLAGS += -O3 -Wall -Wextra -Wunused-result -Werror -fvisibility=hidden # -Wpedantic // Does not work because EPICS macros trigger warnings
|
||||||
|
|
||||||
|
# Required to support EV42/44
|
||||||
|
USR_CXXFLAGS += -std=c++17 -O3 -I../dep/flatbuffers/include/ -I../schemas
|
||||||
|
|
||||||
|
LIB_SYS_LIBS += rdkafka
|
||||||
59
README.md
Normal file
59
README.md
Normal file
@@ -0,0 +1,59 @@
|
|||||||
|
# StreamGenerator
|
||||||
|
|
||||||
|
The StreamGenerator, is an Epics module that almost completely implements the
|
||||||
|
same interface as the [sinqDAQ](https://gitea.psi.ch/lin-epics-modules/sinqDAQ)
|
||||||
|
Epics module, as an attempt to maintain a consistent interface across all our
|
||||||
|
data acquisition systems. Sitting behind the interface, however, is different
|
||||||
|
electronics and firmware, specifically, a multi-readout module "Correlation
|
||||||
|
Unit" system, which was developed at MLZ.
|
||||||
|
|
||||||
|
## Correlation Unit Documentation
|
||||||
|
|
||||||
|
The UDP based interface, as well as many other specifics of the detector system
|
||||||
|
are described in the first file, and within the `doc` directory of the
|
||||||
|
`qmesydaq` repository.
|
||||||
|
|
||||||
|
- [20220608\_ErwiN\_Detector.pdf](./docs/20220608_ErwiN_Detector.pdf)
|
||||||
|
- [qmesydaq/doc](https://gitea.psi.ch/lin-controls/qmesydaq/src/branch/master/doc/)
|
||||||
|
|
||||||
|
## Retrieving Code
|
||||||
|
|
||||||
|
Clone the repository to a local directory via:
|
||||||
|
|
||||||
|
```
|
||||||
|
git clone --recurse-submodules -j8 git@gitea.psi.ch:lin-epics-modules/StreamGenerator.git
|
||||||
|
```
|
||||||
|
|
||||||
|
## Dependencies
|
||||||
|
|
||||||
|
Currently, this project requires a system install of librdkafka. On Redhat,
|
||||||
|
this means you should run:
|
||||||
|
|
||||||
|
```bash
|
||||||
|
dnf install -y librdkafka-devel
|
||||||
|
```
|
||||||
|
|
||||||
|
Additionally, you must first build Google's *flatbuffers* and ESS's
|
||||||
|
**streaming-data-types** libraries, which are both included in this project as
|
||||||
|
submodules under the `dep` directory and which are both necessary to build this
|
||||||
|
project.
|
||||||
|
|
||||||
|
First, you should enter the *flatbuffers* directory and run the following:
|
||||||
|
|
||||||
|
```bash
|
||||||
|
cmake -G "Unix Makefiles"
|
||||||
|
make -j
|
||||||
|
```
|
||||||
|
|
||||||
|
After these steps, you will find the program `flatc` has been built and placed
|
||||||
|
in the directory.
|
||||||
|
|
||||||
|
Next, you should return to the top of this project's directory tree, and create
|
||||||
|
the flatbuffers from ESS's schema files. This you can do as follows:
|
||||||
|
|
||||||
|
```bash
|
||||||
|
./dep/flatbuffers/flatc -o schemas/ --cpp --gen-mutable --gen-name-strings --scoped-enums ./dep/streaming-data-types/schemas/*
|
||||||
|
```
|
||||||
|
|
||||||
|
This generates header files from each of ESS's schemas and places them in a
|
||||||
|
schemas directory.
|
||||||
76
db/channels.db
Normal file
76
db/channels.db
Normal file
@@ -0,0 +1,76 @@
|
|||||||
|
# EPICS Database for streamdevice specific to measurement channels
|
||||||
|
#
|
||||||
|
# Macros
|
||||||
|
# INSTR - Prefix
|
||||||
|
# NAME - the device name, e.g. EL737
|
||||||
|
# PORT - StreamGenerator Port
|
||||||
|
# CHANNEL - the number associated with the measurment channel
|
||||||
|
|
||||||
|
################################################################################
|
||||||
|
# Status Variables
|
||||||
|
|
||||||
|
# Trigger a change in status as clearing
|
||||||
|
record(bo, "$(INSTR)$(NAME):T$(CHANNEL)")
|
||||||
|
{
|
||||||
|
field(DESC, "Trigger Clearing Status")
|
||||||
|
field(VAL, 1)
|
||||||
|
field(OUT, "$(INSTR)$(NAME):S$(CHANNEL) PP")
|
||||||
|
}
|
||||||
|
|
||||||
|
# Trigger a change in status as value returned to 0
|
||||||
|
record(seq, "$(INSTR)$(NAME):O$(CHANNEL)")
|
||||||
|
{
|
||||||
|
field(DESC, "Trigger Returned to 0 Status")
|
||||||
|
field(LNK0, "$(INSTR)$(NAME):S$(CHANNEL) PP")
|
||||||
|
field(DO0, 0)
|
||||||
|
field(SELM, "Specified")
|
||||||
|
field(SELL, "$(INSTR)$(NAME):M$(CHANNEL).VAL")
|
||||||
|
field(SCAN, ".1 second")
|
||||||
|
}
|
||||||
|
|
||||||
|
# Current Status of Channel, i.e. is it ready to count?
|
||||||
|
record(bi, "$(INSTR)$(NAME):S$(CHANNEL)")
|
||||||
|
{
|
||||||
|
field(DESC, "Channel Status")
|
||||||
|
field(VAL, 0)
|
||||||
|
field(ZNAM, "OK")
|
||||||
|
field(ONAM, "CLEARING")
|
||||||
|
field(PINI, 1)
|
||||||
|
}
|
||||||
|
|
||||||
|
################################################################################
|
||||||
|
# Count Commands
|
||||||
|
|
||||||
|
record(longout, "$(INSTR)$(NAME):C$(CHANNEL)")
|
||||||
|
{
|
||||||
|
field(DESC, "Clear the current channel count")
|
||||||
|
field(DTYP, "asynInt32")
|
||||||
|
field(OUT, "@asyn($(PORT),0,$(TIMEOUT=1)) C_$(CHANNEL)")
|
||||||
|
field(FLNK, "$(INSTR)$(NAME):T$(CHANNEL)")
|
||||||
|
}
|
||||||
|
|
||||||
|
################################################################################
|
||||||
|
# Read all monitors values
|
||||||
|
|
||||||
|
record(int64in, "$(INSTR)$(NAME):M$(CHANNEL)")
|
||||||
|
{
|
||||||
|
field(DESC, "DAQ CH$(CHANNEL)")
|
||||||
|
field(EGU, "cts")
|
||||||
|
field(DTYP, "asynInt64")
|
||||||
|
field(INP, "@asyn($(PORT),0,$(TIMEOUT=1)) COUNTS$(CHANNEL)")
|
||||||
|
# This is probably too fast. We could trigger things the same as sinqDAQ to ensure the db is update in the same order
|
||||||
|
# field(SCAN, "I/O Intr")
|
||||||
|
field(PINI, "YES")
|
||||||
|
}
|
||||||
|
|
||||||
|
record(ai, "$(INSTR)$(NAME):R$(CHANNEL)")
|
||||||
|
{
|
||||||
|
field(DESC, "Rate of DAQ CH$(CHANNEL)")
|
||||||
|
field(EGU, "cts/sec")
|
||||||
|
field(DTYP, "asynFloat64")
|
||||||
|
field(INP, "@asyn($(PORT),0,$(TIMEOUT=1)) RATE$(CHANNEL)")
|
||||||
|
field(SCAN, ".5 second")
|
||||||
|
field(PREC, 2)
|
||||||
|
# field(SCAN, "I/O Intr")
|
||||||
|
field(PINI, "YES")
|
||||||
|
}
|
||||||
73
db/correlation_unit.db
Normal file
73
db/correlation_unit.db
Normal file
@@ -0,0 +1,73 @@
|
|||||||
|
# EPICS Database for streamdevice specific to measurement channels
|
||||||
|
#
|
||||||
|
# Macros
|
||||||
|
# INSTR - Prefix
|
||||||
|
# NAME - the device name, e.g. EL737
|
||||||
|
# PORT - StreamGenerator Port
|
||||||
|
|
||||||
|
################################################################################
|
||||||
|
# Status Variables
|
||||||
|
|
||||||
|
record(bo, "$(INSTR)$(NAME):Enable")
|
||||||
|
{
|
||||||
|
field(DESC, "Electronics Status")
|
||||||
|
field(DTYP, "asynInt32")
|
||||||
|
field(OUT, "@asyn($(PORT),0,$(TIMEOUT=1)) EN_EL")
|
||||||
|
field(ZNAM, "OFF")
|
||||||
|
field(ONAM, "ON")
|
||||||
|
field(PINI, 1)
|
||||||
|
}
|
||||||
|
|
||||||
|
record(bi, "$(INSTR)$(NAME):Enable_RBV")
|
||||||
|
{
|
||||||
|
field(DESC, "Electronics Status")
|
||||||
|
field(DTYP, "asynInt32")
|
||||||
|
field(INP, "@asyn($(PORT),0,$(TIMEOUT=1)) EN_EL_RBV")
|
||||||
|
field(ZNAM, "OFF")
|
||||||
|
field(ONAM, "ON")
|
||||||
|
field(SCAN, ".5 second")
|
||||||
|
}
|
||||||
|
|
||||||
|
record(longin,"$(INSTR)$(NAME):UDP_DROPPED")
|
||||||
|
{
|
||||||
|
field(DESC, "UDP Packets Missed")
|
||||||
|
field(EGU, "Events")
|
||||||
|
field(DTYP, "asynInt32")
|
||||||
|
field(INP, "@asyn($(PORT),0,$(TIMEOUT=1)) DROP")
|
||||||
|
# field(SCAN, "I/O Intr")
|
||||||
|
field(SCAN, "1 second")
|
||||||
|
field(PINI, "YES")
|
||||||
|
}
|
||||||
|
|
||||||
|
record(longin,"$(INSTR)$(NAME):UDP_WATERMARK")
|
||||||
|
{
|
||||||
|
field(DESC, "UDP Queue Usage")
|
||||||
|
field(EGU, "%")
|
||||||
|
field(DTYP, "asynInt32")
|
||||||
|
field(INP, "@asyn($(PORT),0,$(TIMEOUT=1)) UDP")
|
||||||
|
# field(SCAN, "I/O Intr")
|
||||||
|
field(SCAN, "1 second")
|
||||||
|
field(PINI, "YES")
|
||||||
|
}
|
||||||
|
|
||||||
|
record(longin,"$(INSTR)$(NAME):NORMALISED_WATERMARK")
|
||||||
|
{
|
||||||
|
field(DESC, "Normalised Queue Usage")
|
||||||
|
field(EGU, "%")
|
||||||
|
field(DTYP, "asynInt32")
|
||||||
|
field(INP, "@asyn($(PORT),0,$(TIMEOUT=1)) NORM")
|
||||||
|
# field(SCAN, "I/O Intr")
|
||||||
|
field(SCAN, "1 second")
|
||||||
|
field(PINI, "YES")
|
||||||
|
}
|
||||||
|
|
||||||
|
record(longin,"$(INSTR)$(NAME):SORTED_WATERMARK")
|
||||||
|
{
|
||||||
|
field(DESC, "Sort Queue Usage")
|
||||||
|
field(EGU, "%")
|
||||||
|
field(DTYP, "asynInt32")
|
||||||
|
field(INP, "@asyn($(PORT),0,$(TIMEOUT=1)) SORT")
|
||||||
|
# field(SCAN, "I/O Intr")
|
||||||
|
field(SCAN, "1 second")
|
||||||
|
field(PINI, "YES")
|
||||||
|
}
|
||||||
238
db/daq_common.db
Normal file
238
db/daq_common.db
Normal file
@@ -0,0 +1,238 @@
|
|||||||
|
# EPICS Database for streamdevice specific to measurement channels
|
||||||
|
#
|
||||||
|
# Macros
|
||||||
|
# INSTR - Prefix
|
||||||
|
# NAME - the device name, e.g. EL737
|
||||||
|
# PORT - StreamGenerator Port
|
||||||
|
|
||||||
|
record(longout, "$(INSTR)$(NAME):FULL-RESET")
|
||||||
|
{
|
||||||
|
field(DESC, "Reset the DAQ")
|
||||||
|
field(DTYP, "asynInt32")
|
||||||
|
field(OUT, "@asyn($(PORT),0,$(TIMEOUT=1)) RESET")
|
||||||
|
}
|
||||||
|
|
||||||
|
################################################################################
|
||||||
|
# Status Variables
|
||||||
|
|
||||||
|
record(stringin, "$(INSTR)$(NAME):MsgTxt")
|
||||||
|
{
|
||||||
|
field(DESC, "Unexpected received response")
|
||||||
|
}
|
||||||
|
|
||||||
|
# We separate the RAW-STATUS and the STATUS PV so that the state can be updated
|
||||||
|
# in a sequence, that guarantees that we included the most recent time and
|
||||||
|
# counts before the status switches back to Idle.
|
||||||
|
# We do this via a sequenced update
|
||||||
|
#
|
||||||
|
# RAW-STATUS -> ELAPSED-SECONDS -> M* -> STATUS
|
||||||
|
record(mbbi, "$(INSTR)$(NAME):RAW-STATUS")
|
||||||
|
{
|
||||||
|
field(DESC, "DAQ Status")
|
||||||
|
field(DTYP, "asynInt32")
|
||||||
|
field(INP, "@asyn($(PORT),0,$(TIMEOUT=1)) STATUS")
|
||||||
|
field(ZRVL, "0")
|
||||||
|
field(ZRST, "Idle")
|
||||||
|
field(ONVL, "1")
|
||||||
|
field(ONST, "Counting")
|
||||||
|
field(TWVL, "2")
|
||||||
|
field(TWST, "Low rate")
|
||||||
|
field(THVL, "3")
|
||||||
|
field(THST, "Paused")
|
||||||
|
# 4 should never happen, if it does it means the DAQ reports undocumented statusbits
|
||||||
|
field(FRVL, "4")
|
||||||
|
field(FRST, "INVALID")
|
||||||
|
# This is probably too fast. We could trigger things the same as sinqDAQ to ensure the db is update in the same order
|
||||||
|
# field(SCAN, "I/O Intr")
|
||||||
|
field(SCAN, ".1 second")
|
||||||
|
field(FLNK, "$(INSTR)$(NAME):READALL")
|
||||||
|
field(PINI, "YES")
|
||||||
|
}
|
||||||
|
|
||||||
|
record(fanout, "$(INSTR)$(NAME):READALL")
|
||||||
|
{
|
||||||
|
field(SELM, "All")
|
||||||
|
field(LNK0, "$(INSTR)$(NAME):ELAPSED-TIME")
|
||||||
|
field(LNK1, "$(INSTR)$(NAME):M1")
|
||||||
|
field(LNK2, "$(INSTR)$(NAME):M2")
|
||||||
|
field(LNK3, "$(INSTR)$(NAME):M3")
|
||||||
|
field(LNK4, "$(INSTR)$(NAME):M4")
|
||||||
|
field(LNK5, "$(INSTR)$(NAME):M5")
|
||||||
|
# Doesn't seemt o be a problem to have more in here :D
|
||||||
|
# field(LNK6, "$(INSTR)$(NAME):M5")
|
||||||
|
# field(LNK7, "$(INSTR)$(NAME):M6")
|
||||||
|
field(FLNK, "$(INSTR)$(NAME):STATUS")
|
||||||
|
}
|
||||||
|
|
||||||
|
record(mbbi, "$(INSTR)$(NAME):STATUS")
|
||||||
|
{
|
||||||
|
field(INP, "$(INSTR)$(NAME):RAW-STATUS NPP")
|
||||||
|
field(DESC, "DAQ Status")
|
||||||
|
field(ZRVL, "0")
|
||||||
|
field(ZRST, "Idle")
|
||||||
|
field(ONVL, "1")
|
||||||
|
field(ONST, "Counting")
|
||||||
|
field(TWVL, "2")
|
||||||
|
field(TWST, "Low rate")
|
||||||
|
field(THVL, "3")
|
||||||
|
field(THST, "Paused")
|
||||||
|
# 4 should never happen, if it does it means the DAQ reports undocumented statusbits
|
||||||
|
field(FRVL, "4")
|
||||||
|
field(FRST, "INVALID")
|
||||||
|
field(PINI, "YES")
|
||||||
|
}
|
||||||
|
|
||||||
|
record(longin, "$(INSTR)$(NAME):CHANNELS")
|
||||||
|
{
|
||||||
|
field(DESC, "Total Supported Channels")
|
||||||
|
field(VAL, $(CHANNELS))
|
||||||
|
field(DISP, 1)
|
||||||
|
}
|
||||||
|
|
||||||
|
# Trigger a change in status as clearing
|
||||||
|
record(bo, "$(INSTR)$(NAME):ETT")
|
||||||
|
{
|
||||||
|
field(DESC, "Trigger Clearing Status")
|
||||||
|
field(VAL, 1)
|
||||||
|
field(OUT, "$(INSTR)$(NAME):ETS PP")
|
||||||
|
}
|
||||||
|
|
||||||
|
# Trigger a change in status as value returned to 0
|
||||||
|
record(seq, "$(INSTR)$(NAME):ETO")
|
||||||
|
{
|
||||||
|
field(DESC, "Trigger Returned to 0 Status")
|
||||||
|
field(LNK0, "$(INSTR)$(NAME):ETS PP")
|
||||||
|
field(DO0, 0)
|
||||||
|
field(SELM, "Specified")
|
||||||
|
field(SELL, "$(INSTR)$(NAME):ELAPSED-TIME.VAL")
|
||||||
|
field(SCAN, ".1 second")
|
||||||
|
}
|
||||||
|
|
||||||
|
# Current Status of Channel, i.e. is it ready to count?
|
||||||
|
record(bi, "$(INSTR)$(NAME):ETS")
|
||||||
|
{
|
||||||
|
field(DESC, "Channel Status")
|
||||||
|
field(VAL, 0)
|
||||||
|
field(ZNAM, "OK")
|
||||||
|
field(ONAM, "CLEARING")
|
||||||
|
field(PINI, 1)
|
||||||
|
}
|
||||||
|
|
||||||
|
################################################################################
|
||||||
|
# Count Commands
|
||||||
|
|
||||||
|
record(longout,"$(INSTR)$(NAME):PRESET-COUNT")
|
||||||
|
{
|
||||||
|
field(DESC, "Count until preset reached")
|
||||||
|
field(DTYP, "asynInt32")
|
||||||
|
field(OUT, "@asyn($(PORT),0,$(TIMEOUT=1)) P_CNT")
|
||||||
|
field(VAL, 0)
|
||||||
|
}
|
||||||
|
|
||||||
|
record(longout,"$(INSTR)$(NAME):PRESET-TIME")
|
||||||
|
{
|
||||||
|
field(DESC, "Count for specified time")
|
||||||
|
field(EGU, "seconds")
|
||||||
|
field(DTYP, "asynInt32")
|
||||||
|
field(OUT, "@asyn($(PORT),0,$(TIMEOUT=1)) P_TIME")
|
||||||
|
field(VAL, 0)
|
||||||
|
}
|
||||||
|
|
||||||
|
record(bo,"$(INSTR)$(NAME):PAUSE")
|
||||||
|
{
|
||||||
|
field(DESC, "Pause the current count")
|
||||||
|
field(VAL, "0")
|
||||||
|
}
|
||||||
|
|
||||||
|
record(bo,"$(INSTR)$(NAME):CONTINUE")
|
||||||
|
{
|
||||||
|
field(DESC, "Continue with a count that was paused")
|
||||||
|
field(VAL, "0")
|
||||||
|
}
|
||||||
|
|
||||||
|
record(longout, "$(INSTR)$(NAME):STOP")
|
||||||
|
{
|
||||||
|
field(DESC, "Stop the current counting operation")
|
||||||
|
field(DTYP, "asynInt32")
|
||||||
|
field(OUT, "@asyn($(PORT),0,$(TIMEOUT=1)) STOP")
|
||||||
|
}
|
||||||
|
|
||||||
|
record(longout, "$(INSTR)$(NAME):MONITOR-CHANNEL")
|
||||||
|
{
|
||||||
|
field(DESC, "PRESET-COUNT Monitors this channel")
|
||||||
|
field(DTYP, "asynInt32")
|
||||||
|
field(OUT, "@asyn($(PORT),0,$(TIMEOUT=1)) MONITOR")
|
||||||
|
field(DRVL, "1") # Smallest Monitor Channel
|
||||||
|
field(DRVH, "$(CHANNELS)") # Largest Monitor Channel
|
||||||
|
}
|
||||||
|
|
||||||
|
record(longin, "$(INSTR)$(NAME):MONITOR-CHANNEL_RBV")
|
||||||
|
{
|
||||||
|
field(DESC, "PRESET-COUNT Monitors this channel")
|
||||||
|
field(DTYP, "asynInt32")
|
||||||
|
field(INP, "@asyn($(PORT),0,$(TIMEOUT=1)) MONITOR")
|
||||||
|
field(SCAN, ".2 second")
|
||||||
|
field(PINI, "YES")
|
||||||
|
}
|
||||||
|
|
||||||
|
record(ao,"$(INSTR)$(NAME):THRESHOLD")
|
||||||
|
{
|
||||||
|
field(DESC, "Minimum rate for counting to proceed")
|
||||||
|
field(DTYP, "asynInt32")
|
||||||
|
field(OUT, "@asyn($(PORT),0,$(TIMEOUT=1)) THRESH")
|
||||||
|
field(VAL, "1") # Default Rate
|
||||||
|
field(DRVL, "1") # Minimum Rate
|
||||||
|
field(DRVH, "100000") # Maximum Rate
|
||||||
|
}
|
||||||
|
|
||||||
|
record(ai,"$(INSTR)$(NAME):THRESHOLD_RBV")
|
||||||
|
{
|
||||||
|
field(DESC, "Minimum rate for counting to proceed")
|
||||||
|
field(EGU, "cts/sec")
|
||||||
|
field(DTYP, "asynInt32")
|
||||||
|
field(INP, "@asyn($(PORT),0,$(TIMEOUT=1)) THRESH")
|
||||||
|
field(SCAN, "I/O Intr")
|
||||||
|
field(PINI, "YES")
|
||||||
|
}
|
||||||
|
|
||||||
|
record(longout,"$(INSTR)$(NAME):THRESHOLD-MONITOR")
|
||||||
|
{
|
||||||
|
field(DESC, "Channel monitored for minimum rate")
|
||||||
|
field(DTYP, "asynInt32")
|
||||||
|
field(OUT, "@asyn($(PORT),0,$(TIMEOUT=1)) THRESH_CH")
|
||||||
|
field(VAL, "1") # Monitor
|
||||||
|
field(DRVL, "0") # Smallest Threshold Channel (0 is off)
|
||||||
|
field(DRVH, "$(CHANNELS)") # Largest Threshold Channel
|
||||||
|
}
|
||||||
|
|
||||||
|
record(longin,"$(INSTR)$(NAME):THRESHOLD-MONITOR_RBV")
|
||||||
|
{
|
||||||
|
field(DESC, "Channel monitored for minimum rate")
|
||||||
|
field(EGU, "CH")
|
||||||
|
field(DTYP, "asynInt32")
|
||||||
|
field(INP, "@asyn($(PORT),0,$(TIMEOUT=1)) THRESH_CH")
|
||||||
|
field(SCAN, "I/O Intr")
|
||||||
|
field(PINI, "YES")
|
||||||
|
}
|
||||||
|
|
||||||
|
record(longout, "$(INSTR)$(NAME):CT")
|
||||||
|
{
|
||||||
|
field(DESC, "Clear the timer")
|
||||||
|
field(DTYP, "asynInt32")
|
||||||
|
field(OUT, "@asyn($(PORT),0,$(TIMEOUT=1)) C_TIME")
|
||||||
|
field(FLNK, "$(INSTR)$(NAME):ETT")
|
||||||
|
}
|
||||||
|
|
||||||
|
################################################################################
|
||||||
|
# Read all monitors values
|
||||||
|
|
||||||
|
record(ai, "$(INSTR)$(NAME):ELAPSED-TIME")
|
||||||
|
{
|
||||||
|
field(DESC, "DAQ Measured Time")
|
||||||
|
field(EGU, "sec")
|
||||||
|
field(DTYP, "asynFloat64")
|
||||||
|
field(INP, "@asyn($(PORT),0,$(TIMEOUT=1)) TIME")
|
||||||
|
# field(SCAN, "I/O Intr")
|
||||||
|
field(PINI, "YES")
|
||||||
|
# field(FLNK, "$(INSTR)$(NAME):ETO")
|
||||||
|
}
|
||||||
1
dep/flatbuffers
Submodule
1
dep/flatbuffers
Submodule
Submodule dep/flatbuffers added at 1872409707
1
dep/streaming-data-types
Submodule
1
dep/streaming-data-types
Submodule
Submodule dep/streaming-data-types added at 3b1830faf2
BIN
docs/20220608_ErwiN_Detector.pdf
Normal file
BIN
docs/20220608_ErwiN_Detector.pdf
Normal file
Binary file not shown.
BIN
docs/zeitelhack_atlanta_2017-CHARM.pdf
Normal file
BIN
docs/zeitelhack_atlanta_2017-CHARM.pdf
Normal file
Binary file not shown.
@@ -1,4 +0,0 @@
|
|||||||
confluent-kafka==2.12.1
|
|
||||||
ess-streaming-data-types==0.27.0
|
|
||||||
flatbuffers==25.9.23
|
|
||||||
numpy==1.26.3
|
|
||||||
9
scripts/ioc.sh
Executable file
9
scripts/ioc.sh
Executable file
@@ -0,0 +1,9 @@
|
|||||||
|
#!/bin/bash
|
||||||
|
|
||||||
|
export EPICS_HOST_ARCH=linux-x86_64
|
||||||
|
export EPICS_BASE=/usr/local/epics/base-7.0.7
|
||||||
|
|
||||||
|
PARENT_PATH="$( cd "$(dirname "${BASH_SOURCE[0]}")" ; pwd -P )"
|
||||||
|
|
||||||
|
# /usr/local/bin/procServ -o -L - -f -i ^D^C 20001 "${PARENT_PATH}/st.cmd" -d
|
||||||
|
${PARENT_PATH}/st.cmd
|
||||||
40
scripts/st.cmd
Executable file
40
scripts/st.cmd
Executable file
@@ -0,0 +1,40 @@
|
|||||||
|
#!/usr/local/bin/iocsh
|
||||||
|
#-d
|
||||||
|
|
||||||
|
on error break
|
||||||
|
|
||||||
|
require StreamGenerator, test
|
||||||
|
|
||||||
|
epicsEnvSet("INSTR", "SQ:DMC-DAQ:")
|
||||||
|
epicsEnvSet("NAME", "SG")
|
||||||
|
|
||||||
|
# Local UDP Generator Test Config
|
||||||
|
drvAsynIPPortConfigure("ASYN_IP_PORT", "127.0.0.1:9071:54321 UDP", 0, 0, 1)
|
||||||
|
|
||||||
|
# Correlation Unit Config
|
||||||
|
# drvAsynIPPortConfigure("ASYN_IP_PORT", "172.28.69.20:54321:54321 UDP", 0, 0, 1)
|
||||||
|
|
||||||
|
# With a udpQueue and sortQueue size of 10'000 packets, we can hold in memory
|
||||||
|
# 10'000 * 243 = 2.43e6 events
|
||||||
|
|
||||||
|
# Kafka Broker and Topic Configuration
|
||||||
|
# asynStreamGenerator("ASYN_SG", "ASYN_IP_PORT", 4, 10000, "linkafka01:9092", "NEWEFU_TEST", "NEWEFU_TEST2", 10000, 20480)
|
||||||
|
# asynStreamGenerator("ASYN_SG", "ASYN_IP_PORT", 4, 10000, "ess01:9092", "NEWEFU_TEST", "NEWEFU_TEST2", 10000, 20480)
|
||||||
|
|
||||||
|
# Don't send any kafka messages
|
||||||
|
asynStreamGenerator("ASYN_SG", "ASYN_IP_PORT", 4, 10000, "", "", "", 0, 0)
|
||||||
|
|
||||||
|
dbLoadRecords("$(StreamGenerator_DB)correlation_unit.db", "INSTR=$(INSTR), NAME=$(NAME), PORT=ASYN_SG")
|
||||||
|
|
||||||
|
dbLoadRecords("$(StreamGenerator_DB)daq_common.db", "INSTR=$(INSTR), NAME=$(NAME), PORT=ASYN_SG, CHANNELS=5")
|
||||||
|
|
||||||
|
# Monitor Channels
|
||||||
|
dbLoadRecords("$(StreamGenerator_DB)channels.db", "INSTR=$(INSTR), NAME=$(NAME), PORT=ASYN_SG, CHANNEL=1")
|
||||||
|
dbLoadRecords("$(StreamGenerator_DB)channels.db", "INSTR=$(INSTR), NAME=$(NAME), PORT=ASYN_SG, CHANNEL=2")
|
||||||
|
dbLoadRecords("$(StreamGenerator_DB)channels.db", "INSTR=$(INSTR), NAME=$(NAME), PORT=ASYN_SG, CHANNEL=3")
|
||||||
|
dbLoadRecords("$(StreamGenerator_DB)channels.db", "INSTR=$(INSTR), NAME=$(NAME), PORT=ASYN_SG, CHANNEL=4")
|
||||||
|
|
||||||
|
# Detector Count Channel
|
||||||
|
dbLoadRecords("$(StreamGenerator_DB)channels.db", "INSTR=$(INSTR), NAME=$(NAME), PORT=ASYN_SG, CHANNEL=5")
|
||||||
|
|
||||||
|
iocInit()
|
||||||
120
scripts/udp_gen.py
Normal file
120
scripts/udp_gen.py
Normal file
@@ -0,0 +1,120 @@
|
|||||||
|
import socket
|
||||||
|
import time
|
||||||
|
import random
|
||||||
|
|
||||||
|
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
||||||
|
|
||||||
|
header = [
|
||||||
|
0, 0, # buffer length in 16bit words (1, 0) == 1, (0, 1) == 256
|
||||||
|
0, 0x80, # buffer type (probably should be 0)
|
||||||
|
21, 0, # header length
|
||||||
|
0, 0, # buffer number
|
||||||
|
0, 0, # run id
|
||||||
|
0x3, # status
|
||||||
|
0, # id of sending module
|
||||||
|
0, 0, # timestamp low
|
||||||
|
0, 0, # timestamp mid
|
||||||
|
0, 0, # timestamp high
|
||||||
|
] + [0, 0] * 12 # parameters
|
||||||
|
|
||||||
|
data = [
|
||||||
|
0,
|
||||||
|
0,
|
||||||
|
0,
|
||||||
|
0,
|
||||||
|
0,
|
||||||
|
0
|
||||||
|
]
|
||||||
|
|
||||||
|
start_time = time.time_ns() // 100
|
||||||
|
|
||||||
|
buffer_ids = {
|
||||||
|
i: (0, 0) for i in range(10)
|
||||||
|
}
|
||||||
|
|
||||||
|
while True:
|
||||||
|
|
||||||
|
# update timestamp
|
||||||
|
base_timestamp = time.time_ns() // 100 - start_time
|
||||||
|
t_low = base_timestamp & 0xffff
|
||||||
|
t_mid = (base_timestamp >> 16) & 0xffff
|
||||||
|
t_high = (base_timestamp >> 32) & 0xffff
|
||||||
|
header[12] = t_low & 0xff
|
||||||
|
header[13] = t_low >> 8
|
||||||
|
header[14] = t_mid & 0xff
|
||||||
|
header[15] = t_mid >> 8
|
||||||
|
header[16] = t_high & 0xff
|
||||||
|
header[17] = t_high >> 8
|
||||||
|
|
||||||
|
num_events = random.randint(0, 243)
|
||||||
|
# num_events = 243
|
||||||
|
# num_events = 1
|
||||||
|
|
||||||
|
# update buffer length
|
||||||
|
buffer_length = 21 + num_events * 3
|
||||||
|
header[0] = buffer_length & 0xff
|
||||||
|
header[1] = (buffer_length >> 8) & 0xff
|
||||||
|
|
||||||
|
# I believe, that in our case we never mix monitor and detector events as
|
||||||
|
# the monitors should have id 0 and the detector events 1-9 so I have
|
||||||
|
# excluded that posibility here. That would, however, if true mean we could
|
||||||
|
# reduce also the number of checks on the parsing side of things...
|
||||||
|
|
||||||
|
is_monitor = random.randint(0, 9)
|
||||||
|
# is_monitor = 4
|
||||||
|
|
||||||
|
header[11] = 0 if is_monitor > 3 else random.randint(1,9)
|
||||||
|
|
||||||
|
# update buffer number (each mcpdid has its own buffer number count)
|
||||||
|
header[6], header[7] = buffer_ids[header[11]]
|
||||||
|
header[6] = (header[6] + 1) % (0xff + 1)
|
||||||
|
header[7] = (header[7] + (header[6] == 0)) % (0xff + 1)
|
||||||
|
buffer_ids[header[11]] = header[6], header[7]
|
||||||
|
|
||||||
|
tosend = []
|
||||||
|
|
||||||
|
if is_monitor > 3:
|
||||||
|
|
||||||
|
for i in range(num_events):
|
||||||
|
d = list(data)
|
||||||
|
|
||||||
|
monitor = random.randint(0,3)
|
||||||
|
# monitor = 0
|
||||||
|
|
||||||
|
d[5] = (1 << 7) | monitor
|
||||||
|
|
||||||
|
# update trigger timestamp
|
||||||
|
event_timestamp = (time.time_ns() // 100) - base_timestamp
|
||||||
|
d[0] = event_timestamp & 0xff
|
||||||
|
d[1] = (event_timestamp >> 8) & 0xff
|
||||||
|
d[2] = (event_timestamp >> 16) & 0x07
|
||||||
|
|
||||||
|
# completely reversed sorting
|
||||||
|
tosend = d + tosend
|
||||||
|
|
||||||
|
else:
|
||||||
|
|
||||||
|
for i in range(num_events):
|
||||||
|
d = list(data)
|
||||||
|
|
||||||
|
amplitude = random.randint(0, 255)
|
||||||
|
x_pos = random.randint(0, 1023)
|
||||||
|
y_pos = random.randint(0, 1023)
|
||||||
|
event_timestamp = (time.time_ns() // 100) - base_timestamp
|
||||||
|
|
||||||
|
d[5] = (0 << 7) | (amplitude >> 1)
|
||||||
|
d[4] = ((amplitude & 0x01) << 7) | (y_pos >> 3)
|
||||||
|
d[3] = ((y_pos << 5) & 0xE0) | (x_pos >> 5)
|
||||||
|
d[2] = ((x_pos << 3) & 0xF8)
|
||||||
|
|
||||||
|
d[0] = event_timestamp & 0xff
|
||||||
|
d[1] = (event_timestamp >> 8) & 0xff
|
||||||
|
d[2] |= (event_timestamp >> 16) & 0x07
|
||||||
|
|
||||||
|
# completely reversed sorting
|
||||||
|
tosend = d + tosend
|
||||||
|
|
||||||
|
sock.sendto(bytes(header + tosend), ('127.0.0.1', 54321))
|
||||||
|
mv = memoryview(bytes(header)).cast('H')
|
||||||
|
print(f'Sent packet {mv[3]} with {num_events} events {base_timestamp}')
|
||||||
|
# time.sleep(.1)
|
||||||
1049
src/asynStreamGeneratorDriver.cpp
Normal file
1049
src/asynStreamGeneratorDriver.cpp
Normal file
File diff suppressed because it is too large
Load Diff
1
src/asynStreamGeneratorDriver.dbd
Normal file
1
src/asynStreamGeneratorDriver.dbd
Normal file
@@ -0,0 +1 @@
|
|||||||
|
registrar("asynStreamGeneratorDriverRegister")
|
||||||
234
src/asynStreamGeneratorDriver.h
Normal file
234
src/asynStreamGeneratorDriver.h
Normal file
@@ -0,0 +1,234 @@
|
|||||||
|
#ifndef asynStreamGeneratorDriver_H
|
||||||
|
#define asynStreamGeneratorDriver_H
|
||||||
|
|
||||||
|
#include "asynPortDriver.h"
|
||||||
|
#include <epicsRingBytes.h>
|
||||||
|
#include <librdkafka/rdkafka.h>
|
||||||
|
|
||||||
|
// Just for printing
|
||||||
|
#define __STDC_FORMAT_MACROS
|
||||||
|
#include <inttypes.h>
|
||||||
|
|
||||||
|
/*******************************************************************************
|
||||||
|
* UDP Packet Definitions
|
||||||
|
*/
|
||||||
|
enum class CommandId : std::uint16_t {
|
||||||
|
reset = 0,
|
||||||
|
start = 1,
|
||||||
|
stop = 2,
|
||||||
|
cont = 3
|
||||||
|
};
|
||||||
|
|
||||||
|
// TODO these headers are actually the same, but with different bodies.
|
||||||
|
struct __attribute__((__packed__)) CommandHeader {
|
||||||
|
uint16_t BufferLength;
|
||||||
|
uint16_t BufferType;
|
||||||
|
uint16_t HeaderLength;
|
||||||
|
uint16_t BufferNumber;
|
||||||
|
uint16_t Command;
|
||||||
|
uint16_t Status : 8;
|
||||||
|
uint16_t McpdID : 8;
|
||||||
|
uint16_t TimeStamp[3];
|
||||||
|
uint16_t Checksum;
|
||||||
|
uint16_t Finalizer;
|
||||||
|
|
||||||
|
CommandHeader(const CommandId commandId)
|
||||||
|
: BufferLength(10), BufferType(0x8000), HeaderLength(10),
|
||||||
|
BufferNumber(0),
|
||||||
|
Command(
|
||||||
|
static_cast<std::underlying_type<CommandId>::type>(commandId)),
|
||||||
|
Status(0), McpdID(0), TimeStamp{0, 0, 0}, Checksum(0),
|
||||||
|
Finalizer(0xffff) {
|
||||||
|
|
||||||
|
uint16_t checksum = 0x0000;
|
||||||
|
for (std::size_t i = 0; i < BufferLength; ++i)
|
||||||
|
checksum ^= ((uint16_t *)this)[i];
|
||||||
|
Checksum = checksum;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
struct __attribute__((__packed__)) DataHeader {
|
||||||
|
uint16_t BufferLength;
|
||||||
|
uint16_t BufferType;
|
||||||
|
uint16_t HeaderLength;
|
||||||
|
uint16_t BufferNumber;
|
||||||
|
uint16_t RunCmdID;
|
||||||
|
uint16_t Status : 8;
|
||||||
|
uint16_t McpdID : 8;
|
||||||
|
uint16_t TimeStamp[3];
|
||||||
|
uint16_t Parameter0[3];
|
||||||
|
uint16_t Parameter1[3];
|
||||||
|
uint16_t Parameter2[3];
|
||||||
|
uint16_t Parameter3[3];
|
||||||
|
|
||||||
|
inline uint64_t nanosecs() const {
|
||||||
|
uint64_t nsec{((uint64_t)TimeStamp[2]) << 32 |
|
||||||
|
((uint64_t)TimeStamp[1]) << 16 | (uint64_t)TimeStamp[0]};
|
||||||
|
return nsec * 100;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
struct __attribute__((__packed__)) DetectorEvent {
|
||||||
|
uint64_t TimeStamp : 19;
|
||||||
|
uint16_t XPosition : 10;
|
||||||
|
uint16_t YPosition : 10;
|
||||||
|
uint16_t Amplitude : 8;
|
||||||
|
uint16_t Id : 1;
|
||||||
|
inline uint32_t nanosecs() const { return TimeStamp * 100; }
|
||||||
|
inline uint32_t pixelId(uint32_t mpcdId) const {
|
||||||
|
const uint32_t x_pixels = 128;
|
||||||
|
const uint32_t y_pixels = 128;
|
||||||
|
return (mpcdId - 1) * x_pixels * y_pixels +
|
||||||
|
x_pixels * (uint32_t)(this->XPosition >> 3) +
|
||||||
|
(uint32_t)(this->YPosition >> 3);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
struct __attribute__((__packed__)) MonitorEvent {
|
||||||
|
uint64_t TimeStamp : 19;
|
||||||
|
uint64_t Data : 21;
|
||||||
|
uint64_t DataID : 4;
|
||||||
|
uint64_t TriggerID : 3;
|
||||||
|
uint64_t Id : 1;
|
||||||
|
inline uint32_t nanosecs() const { return TimeStamp * 100; }
|
||||||
|
};
|
||||||
|
|
||||||
|
/*******************************************************************************
|
||||||
|
* Simplified Event Struct Definition
|
||||||
|
*/
|
||||||
|
|
||||||
|
struct __attribute__((__packed__)) NormalisedEvent {
|
||||||
|
uint64_t timestamp;
|
||||||
|
uint32_t pixelId : 24;
|
||||||
|
uint8_t source;
|
||||||
|
|
||||||
|
// inline NormalisedEvent(uint64_t timestamp, uint8_t source, uint32_t
|
||||||
|
// pixelId)
|
||||||
|
// : timestamp(timestamp), source(source), pixelId(pixelId){};
|
||||||
|
};
|
||||||
|
|
||||||
|
/*******************************************************************************
|
||||||
|
* Status values that should match the definition in db/daq_common.db
|
||||||
|
*/
|
||||||
|
#define STATUS_IDLE 0
|
||||||
|
#define STATUS_COUNTING 1
|
||||||
|
#define STATUS_LOWRATE 2
|
||||||
|
#define STATUS_PAUSED 3
|
||||||
|
|
||||||
|
/*******************************************************************************
|
||||||
|
* Parameters for use in DB records
|
||||||
|
*
|
||||||
|
* i.e.e drvInfo strings that are used to identify the parameters
|
||||||
|
*/
|
||||||
|
|
||||||
|
constexpr static char P_EnableElectronicsString[]{"EN_EL"};
|
||||||
|
constexpr static char P_EnableElectronicsRBVString[]{"EN_EL_RBV"};
|
||||||
|
|
||||||
|
constexpr static char P_StatusString[]{"STATUS"};
|
||||||
|
constexpr static char P_ResetString[]{"RESET"};
|
||||||
|
constexpr static char P_StopString[]{"STOP"};
|
||||||
|
constexpr static char P_CountPresetString[]{"P_CNT"};
|
||||||
|
constexpr static char P_TimePresetString[]{"P_TIME"};
|
||||||
|
constexpr static char P_ElapsedTimeString[]{"TIME"};
|
||||||
|
constexpr static char P_ClearElapsedTimeString[]{"C_TIME"};
|
||||||
|
constexpr static char P_MonitorChannelString[]{"MONITOR"};
|
||||||
|
constexpr static char P_ThresholdString[]{"THRESH"};
|
||||||
|
constexpr static char P_ThresholdChannelString[]{"THRESH_CH"};
|
||||||
|
|
||||||
|
constexpr static char P_CountsString[]{"COUNTS%" PRIu64};
|
||||||
|
constexpr static char P_RateString[]{"RATE%" PRIu64};
|
||||||
|
constexpr static char P_ClearCountsString[]{"C_%" PRIu64};
|
||||||
|
|
||||||
|
constexpr static char P_UdpDroppedString[]{"DROP"};
|
||||||
|
constexpr static char P_UdpQueueHighWaterMarkString[]{"UDP"};
|
||||||
|
constexpr static char P_NormalisedQueueHighWaterMarkString[]{"NORM"};
|
||||||
|
constexpr static char P_SortedQueueHighWaterMarkString[]{"SORT"};
|
||||||
|
|
||||||
|
/*******************************************************************************
|
||||||
|
* Stream Generator Coordinating Class
|
||||||
|
*/
|
||||||
|
class asynStreamGeneratorDriver : public asynPortDriver {
|
||||||
|
public:
|
||||||
|
asynStreamGeneratorDriver(const char *portName, const char *ipPortName,
|
||||||
|
const int numChannels, const int udpQueueSize,
|
||||||
|
const bool enableKafkaStream,
|
||||||
|
const char *kafkaBroker, const char *monitorTopic,
|
||||||
|
const char *detectorTopic,
|
||||||
|
const int kafkaQueueSize,
|
||||||
|
const int kafkaMaxPacketSize);
|
||||||
|
virtual ~asynStreamGeneratorDriver();
|
||||||
|
|
||||||
|
virtual asynStatus readInt32(asynUser *pasynUser, epicsInt32 *value);
|
||||||
|
virtual asynStatus writeInt32(asynUser *pasynUser, epicsInt32 value);
|
||||||
|
|
||||||
|
void receiveUDP();
|
||||||
|
void normaliseUDP();
|
||||||
|
void partialSortEvents();
|
||||||
|
void processEvents();
|
||||||
|
void produceMonitor();
|
||||||
|
void produceDetector();
|
||||||
|
|
||||||
|
protected:
|
||||||
|
// Parameter Identifying IDs
|
||||||
|
int P_EnableElectronics;
|
||||||
|
int P_EnableElectronicsRBV;
|
||||||
|
int P_Status;
|
||||||
|
int P_Reset;
|
||||||
|
int P_Stop;
|
||||||
|
int P_CountPreset;
|
||||||
|
int P_TimePreset;
|
||||||
|
int P_ElapsedTime;
|
||||||
|
int P_ClearElapsedTime;
|
||||||
|
int P_MonitorChannel;
|
||||||
|
int P_Threshold;
|
||||||
|
int P_ThresholdChannel;
|
||||||
|
int *P_Counts;
|
||||||
|
int *P_Rates;
|
||||||
|
int *P_ClearCounts;
|
||||||
|
|
||||||
|
// System Status Parameter Identifying IDs
|
||||||
|
int P_UdpDropped;
|
||||||
|
int P_UdpQueueHighWaterMark;
|
||||||
|
int P_NormalisedQueueHighWaterMark;
|
||||||
|
int P_SortedQueueHighWaterMark;
|
||||||
|
|
||||||
|
private:
|
||||||
|
asynUser *pasynUDPUser;
|
||||||
|
epicsEventId pausedEventId;
|
||||||
|
|
||||||
|
const std::size_t num_channels;
|
||||||
|
const bool kafkaEnabled;
|
||||||
|
const int kafkaQueueSize;
|
||||||
|
const int kafkaMaxPacketSize;
|
||||||
|
|
||||||
|
const int udpQueueSize;
|
||||||
|
epicsRingBytesId udpQueue;
|
||||||
|
epicsRingBytesId normalisedQueue;
|
||||||
|
epicsRingBytesId sortedQueue;
|
||||||
|
|
||||||
|
epicsRingBytesId monitorQueue;
|
||||||
|
rd_kafka_t *monitorProducer;
|
||||||
|
const std::string monitorTopic;
|
||||||
|
|
||||||
|
epicsRingBytesId detectorQueue;
|
||||||
|
rd_kafka_t *detectorProducer;
|
||||||
|
const std::string detectorTopic;
|
||||||
|
|
||||||
|
static constexpr char driverName[]{"StreamGenerator"};
|
||||||
|
|
||||||
|
asynStatus createInt32Param(asynStatus status, const char *name,
|
||||||
|
int *variable, epicsInt32 initialValue = 0);
|
||||||
|
|
||||||
|
asynStatus createInt64Param(asynStatus status, const char *name,
|
||||||
|
int *variable, epicsInt64 initialValue = 0);
|
||||||
|
|
||||||
|
asynStatus createFloat64Param(asynStatus status, const char *name,
|
||||||
|
int *variable, double initialValue = 0);
|
||||||
|
|
||||||
|
inline void queueForKafka(NormalisedEvent &ne);
|
||||||
|
|
||||||
|
void produce(epicsRingBytesId eventQueue, rd_kafka_t *kafkaProducer,
|
||||||
|
const char *topic, const char *source);
|
||||||
|
};
|
||||||
|
|
||||||
|
#endif
|
||||||
342
udp_rate.py
342
udp_rate.py
@@ -1,342 +0,0 @@
|
|||||||
import queue
|
|
||||||
import socket
|
|
||||||
import time
|
|
||||||
import threading
|
|
||||||
from uuid import uuid4
|
|
||||||
import math
|
|
||||||
|
|
||||||
from confluent_kafka import Producer
|
|
||||||
import streaming_data_types
|
|
||||||
|
|
||||||
# receiving directly (can also specify correlation unit ip)
|
|
||||||
UDP_IP = ""
|
|
||||||
UDP_PORT = 54321
|
|
||||||
|
|
||||||
# If redirecting traffic via
|
|
||||||
# socat -U - udp4-recv:54321 | tee >( socat -u - udp4-datagram:127.0.0.1:54322 ) | socat -u - udp4-datagram:127.0.0.1:54323
|
|
||||||
# UDP_IP = "127.0.0.1"
|
|
||||||
# UDP_PORT = 54323
|
|
||||||
|
|
||||||
WINDOWSECONDS = 5
|
|
||||||
WINDOWSIZE = 20000 * WINDOWSECONDS
|
|
||||||
MONITORS = 4 # We have max 4 monitors
|
|
||||||
|
|
||||||
time_offset = None # Estimate of clock offset
|
|
||||||
|
|
||||||
time_window = {
|
|
||||||
i: queue.Queue(maxsize=WINDOWSIZE)
|
|
||||||
for i in range(MONITORS)
|
|
||||||
}
|
|
||||||
|
|
||||||
# event_time_window = queue.Queue(maxsize=50000 * WINDOWSECONDS)
|
|
||||||
EVENT_WINDOWSIZE = 50000
|
|
||||||
EVENT_WINDOW_PTR = 0
|
|
||||||
event_time_window = [0 for i in range(EVENT_WINDOWSIZE)]
|
|
||||||
|
|
||||||
event_average_rate = 0
|
|
||||||
event_last_timestamp = None
|
|
||||||
|
|
||||||
MISSED_PACKETS = -9 # All modules appear to miss the first time due to initialisation as 0
|
|
||||||
|
|
||||||
# missed_packets_time_window = queue.Queue(maxsize=100)
|
|
||||||
|
|
||||||
def print_monitor_rates():
|
|
||||||
while True:
|
|
||||||
for i in range(MONITORS):
|
|
||||||
msg = f"Monitor {i+1}: {time_window[i].qsize() / WINDOWSECONDS} cts/s"
|
|
||||||
try:
|
|
||||||
earliest = time_window[i].queue[0]
|
|
||||||
newest = max(time_window[i].queue)
|
|
||||||
t = time.time()
|
|
||||||
msg += f', buffer range: {round((newest - earliest) * 1e-7, 3)} s, oldest: {round(time.time() - ((time_offset + earliest) * 1e-7), 3)} s, newest: {round(time.time() - ((time_offset + newest) * 1e-7), 3)} s'
|
|
||||||
except:
|
|
||||||
pass
|
|
||||||
|
|
||||||
print(msg)
|
|
||||||
|
|
||||||
# try:
|
|
||||||
# print(f'Events: {1 / event_average_rate} cts/s')
|
|
||||||
# except:
|
|
||||||
# pass
|
|
||||||
|
|
||||||
try:
|
|
||||||
print(f'Events: {round(1 / (sum(event_time_window) / EVENT_WINDOWSIZE * 1e-7), 2)} cts/s')
|
|
||||||
except:
|
|
||||||
pass
|
|
||||||
|
|
||||||
print(f'Missed Packets: {MISSED_PACKETS}')
|
|
||||||
|
|
||||||
# Detector Events
|
|
||||||
# msg = f"Events : {event_time_window.qsize() / WINDOWSECONDS} cts/s"
|
|
||||||
# try:
|
|
||||||
# earliest = event_time_window.queue[0]
|
|
||||||
# newest = max(event_time_window.queue)
|
|
||||||
# t = time.time()
|
|
||||||
# msg += f', buffer range: {round((newest - earliest) * 1e-7, 3)} s, oldest: {round(time.time() - ((time_offset + earliest) * 1e-7), 3)} s, newest: {round(time.time() - ((time_offset + newest) * 1e-7), 3)} s'
|
|
||||||
# except:
|
|
||||||
# pass
|
|
||||||
|
|
||||||
# print(msg)
|
|
||||||
|
|
||||||
time.sleep(1)
|
|
||||||
|
|
||||||
threading.Thread(target=print_monitor_rates, daemon=True).start()
|
|
||||||
|
|
||||||
def clean_monitor_rates():
|
|
||||||
latest = 0
|
|
||||||
while True:
|
|
||||||
for d_id in range(MONITORS):
|
|
||||||
t_w = time_window[d_id]
|
|
||||||
if not t_w.empty():
|
|
||||||
# TODO probably should switch to a priority queue
|
|
||||||
# as the messages might not be in order
|
|
||||||
# TODO could also just replace with a low-pass filter
|
|
||||||
# would be a lot more efficient
|
|
||||||
# TODO the way this is done, we need trigger events
|
|
||||||
# in order for the signal to decay back to 0.
|
|
||||||
# If no events come, the rate remains stuck
|
|
||||||
latest = max(latest, max(t_w.queue))
|
|
||||||
# latest = time_window[1].queue[-1]
|
|
||||||
try:
|
|
||||||
while t_w.queue[0] < (latest - WINDOWSECONDS * 1e7):
|
|
||||||
t_w.get_nowait()
|
|
||||||
except IndexError:
|
|
||||||
pass
|
|
||||||
time.sleep(0.01)
|
|
||||||
|
|
||||||
threading.Thread(target=clean_monitor_rates, daemon=True).start()
|
|
||||||
|
|
||||||
|
|
||||||
# def clean_event_rates():
|
|
||||||
# latest = 0
|
|
||||||
# while True:
|
|
||||||
# t_w = event_time_window
|
|
||||||
# if not t_w.empty():
|
|
||||||
# # TODO probably should switch to a priority queue
|
|
||||||
# # as the messages might not be in order
|
|
||||||
# # TODO could also just replace with a low-pass filter
|
|
||||||
# # would be a lot more efficient
|
|
||||||
# # TODO the way this is done, we need trigger events
|
|
||||||
# # in order for the signal to decay back to 0.
|
|
||||||
# # If no events come, the rate remains stuck
|
|
||||||
# #latest = max(latest, max(t_w.queue))
|
|
||||||
# try:
|
|
||||||
# latest = time_window[1].queue[-1]
|
|
||||||
# while t_w.queue[0] < (latest - WINDOWSECONDS * 1e7):
|
|
||||||
# t_w.get_nowait()
|
|
||||||
# except IndexError:
|
|
||||||
# pass
|
|
||||||
# time.sleep(0.005)
|
|
||||||
#
|
|
||||||
# threading.Thread(target=clean_event_rates, daemon=True).start()
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
# Event Kafka Producer
|
|
||||||
|
|
||||||
event_queue = queue.Queue()
|
|
||||||
|
|
||||||
def event_producer():
|
|
||||||
producer_config = {
|
|
||||||
'bootstrap.servers': "linkafka01:9092",
|
|
||||||
'queue.buffering.max.messages': 1e7,
|
|
||||||
}
|
|
||||||
prod = Producer(producer_config)
|
|
||||||
|
|
||||||
st = time.time()
|
|
||||||
|
|
||||||
msg_id = 0
|
|
||||||
|
|
||||||
b_size = 10000
|
|
||||||
b_ptr = 0
|
|
||||||
pixel_buffer = [0 for _ in range(b_size)]
|
|
||||||
time_buffer = [0 for _ in range(b_size)]
|
|
||||||
poll_cnt = 0
|
|
||||||
|
|
||||||
while True:
|
|
||||||
(p_id, timestamp) = event_queue.get()
|
|
||||||
|
|
||||||
pixel_buffer[b_ptr] = p_id
|
|
||||||
time_buffer[b_ptr] = timestamp
|
|
||||||
b_ptr += 1
|
|
||||||
|
|
||||||
nt = time.time()
|
|
||||||
if b_ptr == b_size or nt - st > 0.001:
|
|
||||||
st = nt
|
|
||||||
|
|
||||||
if b_ptr > 0:
|
|
||||||
message = streaming_data_types.serialise_ev42(
|
|
||||||
message_id = msg_id,
|
|
||||||
pulse_time = time_buffer[0] * 100, # int(time.time() * 1_000_000_000),
|
|
||||||
time_of_flight = time_buffer[0:b_ptr],
|
|
||||||
detector_id = pixel_buffer[0:b_ptr],
|
|
||||||
source_name = '',
|
|
||||||
)
|
|
||||||
|
|
||||||
msg_id = (msg_id + 1) % 100000000
|
|
||||||
b_ptr = 0
|
|
||||||
|
|
||||||
prod.produce(
|
|
||||||
topic = "DMC_detector",
|
|
||||||
value = message,
|
|
||||||
partition = 0,
|
|
||||||
)
|
|
||||||
|
|
||||||
# if poll_cnt % 1000 == 0:
|
|
||||||
prod.poll(0)
|
|
||||||
poll_cnt = (poll_cnt + 1) % 1000
|
|
||||||
|
|
||||||
threading.Thread(target=event_producer, daemon=True).start()
|
|
||||||
|
|
||||||
# Monitor Kafka Producer
|
|
||||||
|
|
||||||
monitor_queue = queue.Queue()
|
|
||||||
|
|
||||||
def monitor_producer():
|
|
||||||
producer_config = {
|
|
||||||
'bootstrap.servers': "linkafka01:9092",
|
|
||||||
'queue.buffering.max.messages': 1e7,
|
|
||||||
}
|
|
||||||
prod = Producer(producer_config)
|
|
||||||
|
|
||||||
monitor_buffer = [0 for i in range(MONITORS)]
|
|
||||||
monitor_time = [0 for i in range(MONITORS)]
|
|
||||||
|
|
||||||
st = time.time()
|
|
||||||
|
|
||||||
poll_cnt = 0
|
|
||||||
|
|
||||||
while True:
|
|
||||||
(d_id, timestamp) = monitor_queue.get()
|
|
||||||
|
|
||||||
monitor_buffer[d_id] += 1
|
|
||||||
monitor_time[d_id] = timestamp
|
|
||||||
|
|
||||||
nt = time.time()
|
|
||||||
if nt - st > 0.05:
|
|
||||||
st = nt
|
|
||||||
|
|
||||||
for i in range(MONITORS):
|
|
||||||
if monitor_buffer[d_id]:
|
|
||||||
message = streaming_data_types.serialise_f142(
|
|
||||||
source_name = f"monitor{d_id+1}",
|
|
||||||
value = monitor_buffer[d_id],
|
|
||||||
# ns resolution (supposed to be past epoch, not what the detector returns though)
|
|
||||||
timestamp_unix_ns = monitor_time[d_id] * 100 # send time of last monitor
|
|
||||||
)
|
|
||||||
|
|
||||||
prod.produce(
|
|
||||||
topic = "DMC_neutron_monitor",
|
|
||||||
value = message,
|
|
||||||
partition = 0,
|
|
||||||
)
|
|
||||||
|
|
||||||
monitor_buffer[d_id] = 0
|
|
||||||
|
|
||||||
if poll_cnt % 1000 == 0:
|
|
||||||
prod.poll(0)
|
|
||||||
poll_cnt = (poll_cnt + 1) % 1000
|
|
||||||
|
|
||||||
threading.Thread(target=monitor_producer, daemon=True).start()
|
|
||||||
|
|
||||||
|
|
||||||
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
|
||||||
sock.bind((UDP_IP, UDP_PORT))
|
|
||||||
|
|
||||||
val = 0
|
|
||||||
start_time = time.time()
|
|
||||||
|
|
||||||
module_counts = [0 for i in range(10)]
|
|
||||||
|
|
||||||
EVENTS = 0
|
|
||||||
|
|
||||||
while True:
|
|
||||||
data, addr = sock.recvfrom(2056) # Buffer size is 1024 bytes
|
|
||||||
raw_header = data[:42]
|
|
||||||
raw_data = data[42:]
|
|
||||||
|
|
||||||
(buffer_length, buffer_type, header_length,
|
|
||||||
buffer_number, run_id, mcpd_status,
|
|
||||||
t_low, t_mid, t_high, *_) = memoryview(raw_header).cast('H')
|
|
||||||
mcpd_id = ( mcpd_status >> 8 ) & 0xff
|
|
||||||
mcpd_status = ( mcpd_status ) & 0x3
|
|
||||||
running_msg = "running" if (mcpd_status & 0x1) else "stopped"
|
|
||||||
sync_msg = "in sync" if (mcpd_status & 0x2) else "sync error"
|
|
||||||
timestamp = ( t_high << 32 ) | ( t_mid << 16 ) | t_low # 100 ns resolution
|
|
||||||
#print(f'Packet {int(timestamp * 1e-7)}s => buffer: {buffer_number}, length: {int(buffer_length*2/6)} events, status: {mcpd_status} {mcpd_id} {running_msg} with {sync_msg}')
|
|
||||||
# print(f'Packet => buffer: {mcpd_id}-{buffer_number}, length: {int((buffer_length-21)/3)} events, status: {mcpd_status}')
|
|
||||||
|
|
||||||
if time_offset is None:
|
|
||||||
time_offset = time.time() * 1e7 - timestamp
|
|
||||||
|
|
||||||
if buffer_number - module_counts[mcpd_id] != 1:
|
|
||||||
MISSED_PACKETS += 1
|
|
||||||
# if missed_packets_time_window.full():
|
|
||||||
# missed_packets_time_window.get_nowait()
|
|
||||||
# missed_packets_time_window.put(timestamp)
|
|
||||||
|
|
||||||
module_counts[mcpd_id] = buffer_number
|
|
||||||
|
|
||||||
for i in range(0, len(raw_data), 6):
|
|
||||||
event = memoryview(raw_data)[i:i+6]
|
|
||||||
event_type = event[5] >> 7
|
|
||||||
# print(event_type)
|
|
||||||
|
|
||||||
if event_type: # Trigger Event
|
|
||||||
t_id = ( event[5] >> 4 ) & 0x7
|
|
||||||
d_id = event[5] & 0xf
|
|
||||||
event_timestamp = timestamp + ( ( event[2] << 16 ) & 0x7 ) | ( event[1] << 8 ) | event[0]
|
|
||||||
# print(f'Trigger event {event_timestamp * 1e-7}s => TrigID: {t_id}, DataID: {d_id}')
|
|
||||||
|
|
||||||
t_w = time_window[d_id]
|
|
||||||
t_w.put_nowait(event_timestamp)
|
|
||||||
|
|
||||||
monitor_queue.put_nowait((d_id, event_timestamp))
|
|
||||||
|
|
||||||
else: # Neutron Event
|
|
||||||
x_pixels = 128
|
|
||||||
y_pixels = 128
|
|
||||||
amplitude = ( event[5] << 1 ) | ( event[4] >> 7 )
|
|
||||||
|
|
||||||
# The DMC StreamHistogrammer setup currently expects each module to
|
|
||||||
# be 128 * 128 pixels but the resolution in the packages is
|
|
||||||
# actually 10bit. We remove the lowest 3 bits.
|
|
||||||
x = (( (event[3] & 0x1f) << 5 | (event[2] & 0xf8) >> 3 ) & 0x3ff) >> 3
|
|
||||||
y = (( (event[4] & 0x7f) << 3 | (event[3] & 0xe0) >> 5 ) & 0x3ff) >> 3
|
|
||||||
event_timestamp = timestamp + ( ( event[2] << 16 ) & 0x7 ) | ( event[1] << 8 ) | event[0]
|
|
||||||
# print(f'Neutron event {event_timestamp * 1e-7}s: {amplitude}, x: {x}, y: {y}')
|
|
||||||
|
|
||||||
|
|
||||||
if event_last_timestamp is None:
|
|
||||||
event_last_timestamp = event_timestamp
|
|
||||||
|
|
||||||
# Seems like at higher frequencies these come very much out of order
|
|
||||||
# so this is very approximate
|
|
||||||
event_time_window[EVENT_WINDOW_PTR] = event_timestamp - event_last_timestamp
|
|
||||||
EVENT_WINDOW_PTR = (EVENT_WINDOW_PTR + 1) % EVENT_WINDOWSIZE
|
|
||||||
event_last_timestamp = event_timestamp
|
|
||||||
|
|
||||||
# I suppose this doesn't work mostly due to the timestamps ordering...
|
|
||||||
# event_timestamp_seconds = event_timestamp * 1e-7
|
|
||||||
# if event_last_timestamp is None:
|
|
||||||
# event_last_timestamp = event_timestamp_seconds
|
|
||||||
|
|
||||||
# f_cutoff = 1e6 # Hz
|
|
||||||
# tau = 1 / ( 2 * math.pi * f_cutoff)
|
|
||||||
# dt = event_timestamp_seconds - event_last_timestamp
|
|
||||||
# if dt > 0:
|
|
||||||
# w = math.exp(-dt / tau)
|
|
||||||
# event_average_rate = w * dt + event_average_rate * (1 - w)
|
|
||||||
# event_last_timestamp = event_timestamp_seconds
|
|
||||||
|
|
||||||
# EVENTS += 1
|
|
||||||
# a = (mcpd_id - 1) * x_pixels * y_pixels + x_pixels * x + y
|
|
||||||
# print((EVENTS, x, y, a, a < 128 * 128 * 9, mcpd_id))
|
|
||||||
# if not a < 128 * 128 * 9:
|
|
||||||
# print((event[3], event[3] << 5, event[2], event[2] >> 3))
|
|
||||||
|
|
||||||
event_queue.put_nowait((
|
|
||||||
(mcpd_id - 1) * x_pixels * y_pixels + x_pixels * x + y,
|
|
||||||
event_timestamp
|
|
||||||
))
|
|
||||||
Reference in New Issue
Block a user