58 Commits
0.0.1 ... 1.0.0

Author SHA1 Message Date
aa00966599 sets precision for rates 2025-11-20 12:31:25 +01:00 (checks failed: Build, Lint)
8ae9cb0bd8 adds rate calculation for each channel 2025-11-20 12:16:52 +01:00 (checks failed: Build, Lint)
31aa9d246a bugfix 2025-11-19 19:27:05 +01:00 (checks failed: Build, Lint)
f169076f65 heck 2025-11-19 18:33:26 +01:00 (checks failed: Build, Lint)
fdbb8f5061 saving state 2025-11-19 16:16:35 +01:00
668dd65823 fixes pausing issue due to logic bug 2025-11-19 15:08:59 +01:00 (checks failed: Build)
5b65a01e51 can readback correlation unit status 2025-11-19 14:08:06 +01:00 (checks failed: Build)
594bb6d320 comment on max starting of count frequency 2025-11-19 12:02:12 +01:00 (checks failed: Build, Lint)
7865273707 improving synchronisation issues 2025-11-19 10:04:22 +01:00 (checks failed: Build, Lint)
2ede400791 forgot to map down from 10bit 2025-11-18 16:49:37 +01:00 (checks failed: Build)
a13c5b81e2 woops, strings got deleted 2025-11-18 14:27:41 +01:00 (checks failed: Build)
66792837a6 adds PV for turning Electronics on and off 2025-11-18 14:00:26 +01:00 (checks failed: Build)
c563b07fed bugfix resetting elapsed-time 2025-11-18 11:52:25 +01:00 (checks failed: Build)
dc5244bc43 this sometimes causes errors in the log having the callbacks, but they seem necessary 2025-11-18 08:49:29 +01:00 (checks failed: Build)
2f60ac2a24 bugfix 2025-11-18 08:47:57 +01:00 (checks failed: Build)
d7a4d057aa makes detector channel the last channel 2025-11-18 07:58:02 +01:00 (checks failed: Build)
0819c5fb12 cppcheck 2025-11-17 12:05:47 +01:00 (checks failed: Build)
a7c5f9413b suppress warning 2025-11-17 11:56:59 +01:00 (checks failed: Build, Lint)
205eedbd88 tries to add a workflow 2025-11-17 11:50:29 +01:00 (checks failed: Build, Lint)
d80155ef7d fix warnings 2025-11-17 11:33:02 +01:00
ba3c3b5208 correct arch_filter 2025-11-14 14:17:13 +01:00
5ffd784769 corrects clone link 2025-11-14 14:15:24 +01:00
9bfaabdd99 Merge pull request 'less_logic_in_udp_thread' (#1) from less_logic_in_udp_thread into main
Reviewed-on: #1
2025-11-14 14:11:17 +01:00
9d93238db4 change counts to 64 bit integer, improve broken packet check, correct order of updating status 2025-11-14 14:07:54 +01:00
c530de3566 does removing all logic in the udp receive thread help to improve the packet receive frequency? 2025-11-07 16:14:05 +01:00
ba07a8af9b shows queue usage as a percentage 2025-11-07 14:28:01 +01:00
77ed74a203 returns elapsed time as a double 2025-11-07 14:05:37 +01:00
8f8b78a9bf adds a udp config that works with the correlation unit 2025-11-07 13:20:15 +01:00
6faf23601e adds PV for number of missed udp packets 2025-11-07 09:00:53 +01:00
18da14f6d6 adds additional key that can be set 2025-11-06 16:48:31 +01:00
9d5ed11dac adds comment on cloning with dependencies 2025-11-06 15:38:33 +01:00
318357127e use ssh variant... 2025-11-06 15:36:40 +01:00
2f50a21e83 use local mirrors 2025-11-06 15:34:32 +01:00
e53a2a4f40 finished converting the processing to a batch-wise variant 2025-11-06 15:30:25 +01:00
5f95e82a3c in the process of switching to a more batch processing approach. so far, seems like it can keep up 2025-11-06 11:58:19 +01:00
2ccf37ce33 comments on time overflow 2025-11-05 10:13:08 +01:00
617dd3153b not 100% sure this rate calculation is right, but might be better than before? 2025-11-05 09:57:05 +01:00
e5cb019143 no pointers, just bytes buffers of fixed size 2025-11-05 09:25:01 +01:00
70c04af034 slow rate updates 2025-11-05 08:00:17 +01:00
056b0a5f8a check for udp packets being missed 2025-11-04 17:01:18 +01:00
1ce7f93e95 adds a simple rate calculation 2025-11-04 16:19:28 +01:00
ecc6e98f4c can stop count and clear channels 2025-11-04 15:31:28 +01:00
2c47f338c2 can send kafka messages again and can set the broker and topics in the start command 2025-11-04 13:56:44 +01:00
60aa1652c3 again at the point that I can do preset based counts, but now with the priority queue built in so that the events are sorted 2025-11-04 10:24:25 +01:00
81bd3bef7f working on correcting the ordering of the messages 2025-11-03 17:31:16 +01:00
e65725609c moves more options to ioc function 2025-11-03 13:29:01 +01:00
a336ca74c9 adds remaining missing PVs 2025-11-03 09:26:50 +01:00
7bacc716cc adds elapsed time and time based preset 2025-10-31 19:10:59 +01:00
1e853487aa adds a POC preset based count 2025-10-31 13:23:55 +01:00
b9e5f40c21 removes old python variant 2025-10-31 10:18:57 +01:00
d7bf3977fc reorganises and cleans up some parts of the code 2025-10-31 10:16:54 +01:00
750436732c can receive both monitor and detector udp events and send them to different kafka topics 2025-10-30 16:48:33 +01:00
4c1741bd4b very inefficient, but can receive udp monitor events and count them and send them as kafka events 2025-10-30 15:07:21 +01:00
09ba30025a adds information on how to build dependencies for the project 2025-10-30 14:02:42 +01:00
2d065a0db9 can send ess streaming data types flatbuffer messages via kafka 2025-10-30 13:53:00 +01:00
2d5a43c09a adds external dependency versions 2025-10-30 11:55:10 +01:00
c2ca5f699c progress with parsing and kafka 2025-10-30 11:51:16 +01:00
b810509156 POC for each interface type 2025-10-29 12:07:02 +01:00
19 changed files with 2275 additions and 346 deletions

245
.clang-format Normal file

@@ -0,0 +1,245 @@
---
Language: Cpp
# BasedOnStyle: LLVM
AccessModifierOffset: -2
AlignAfterOpenBracket: Align
AlignArrayOfStructures: None
AlignConsecutiveAssignments:
Enabled: false
AcrossEmptyLines: false
AcrossComments: false
AlignCompound: false
AlignFunctionPointers: false
PadOperators: true
AlignConsecutiveBitFields:
Enabled: false
AcrossEmptyLines: false
AcrossComments: false
AlignCompound: false
AlignFunctionPointers: false
PadOperators: false
AlignConsecutiveDeclarations:
Enabled: false
AcrossEmptyLines: false
AcrossComments: false
AlignCompound: false
AlignFunctionPointers: false
PadOperators: false
AlignConsecutiveMacros:
Enabled: false
AcrossEmptyLines: false
AcrossComments: false
AlignCompound: false
AlignFunctionPointers: false
PadOperators: false
AlignConsecutiveShortCaseStatements:
Enabled: false
AcrossEmptyLines: false
AcrossComments: false
AlignCaseColons: false
AlignEscapedNewlines: Right
AlignOperands: Align
AlignTrailingComments:
Kind: Always
OverEmptyLines: 0
AllowAllArgumentsOnNextLine: true
AllowAllParametersOfDeclarationOnNextLine: true
AllowBreakBeforeNoexceptSpecifier: Never
AllowShortBlocksOnASingleLine: Never
AllowShortCaseLabelsOnASingleLine: false
AllowShortCompoundRequirementOnASingleLine: true
AllowShortEnumsOnASingleLine: true
AllowShortFunctionsOnASingleLine: All
AllowShortIfStatementsOnASingleLine: Never
AllowShortLambdasOnASingleLine: All
AllowShortLoopsOnASingleLine: false
AlwaysBreakAfterDefinitionReturnType: None
AlwaysBreakAfterReturnType: None
AlwaysBreakBeforeMultilineStrings: false
AlwaysBreakTemplateDeclarations: MultiLine
AttributeMacros:
- __capability
BinPackArguments: true
BinPackParameters: true
BitFieldColonSpacing: Both
BraceWrapping:
AfterCaseLabel: false
AfterClass: false
AfterControlStatement: Never
AfterEnum: false
AfterExternBlock: false
AfterFunction: false
AfterNamespace: false
AfterObjCDeclaration: false
AfterStruct: false
AfterUnion: false
BeforeCatch: false
BeforeElse: false
BeforeLambdaBody: false
BeforeWhile: false
IndentBraces: false
SplitEmptyFunction: true
SplitEmptyRecord: true
SplitEmptyNamespace: true
BreakAdjacentStringLiterals: true
BreakAfterAttributes: Leave
BreakAfterJavaFieldAnnotations: false
BreakArrays: true
BreakBeforeBinaryOperators: None
BreakBeforeConceptDeclarations: Always
BreakBeforeBraces: Attach
BreakBeforeInlineASMColon: OnlyMultiline
BreakBeforeTernaryOperators: true
BreakConstructorInitializers: BeforeColon
BreakInheritanceList: BeforeColon
BreakStringLiterals: true
ColumnLimit: 80
CommentPragmas: '^ IWYU pragma:'
CompactNamespaces: false
ConstructorInitializerIndentWidth: 4
ContinuationIndentWidth: 4
Cpp11BracedListStyle: true
DerivePointerAlignment: false
DisableFormat: false
EmptyLineAfterAccessModifier: Never
EmptyLineBeforeAccessModifier: LogicalBlock
ExperimentalAutoDetectBinPacking: false
FixNamespaceComments: true
ForEachMacros:
- foreach
- Q_FOREACH
- BOOST_FOREACH
IfMacros:
- KJ_IF_MAYBE
IncludeBlocks: Preserve
IncludeCategories:
- Regex: '^"(llvm|llvm-c|clang|clang-c)/'
Priority: 2
SortPriority: 0
CaseSensitive: false
- Regex: '^(<|"(gtest|gmock|isl|json)/)'
Priority: 3
SortPriority: 0
CaseSensitive: false
- Regex: '.*'
Priority: 1
SortPriority: 0
CaseSensitive: false
IncludeIsMainRegex: '(Test)?$'
IncludeIsMainSourceRegex: ''
IndentAccessModifiers: false
IndentCaseBlocks: false
IndentCaseLabels: false
IndentExternBlock: AfterExternBlock
IndentGotoLabels: true
IndentPPDirectives: None
IndentRequiresClause: true
IndentWidth: 4
IndentWrappedFunctionNames: false
InsertBraces: false
InsertNewlineAtEOF: false
InsertTrailingCommas: None
IntegerLiteralSeparator:
Binary: 0
BinaryMinDigits: 0
Decimal: 0
DecimalMinDigits: 0
Hex: 0
HexMinDigits: 0
JavaScriptQuotes: Leave
JavaScriptWrapImports: true
KeepEmptyLinesAtTheStartOfBlocks: true
KeepEmptyLinesAtEOF: false
LambdaBodyIndentation: Signature
LineEnding: DeriveLF
MacroBlockBegin: ''
MacroBlockEnd: ''
MaxEmptyLinesToKeep: 1
NamespaceIndentation: None
ObjCBinPackProtocolList: Auto
ObjCBlockIndentWidth: 2
ObjCBreakBeforeNestedBlockParam: true
ObjCSpaceAfterProperty: false
ObjCSpaceBeforeProtocolList: true
PackConstructorInitializers: BinPack
PenaltyBreakAssignment: 2
PenaltyBreakBeforeFirstCallParameter: 19
PenaltyBreakComment: 300
PenaltyBreakFirstLessLess: 120
PenaltyBreakOpenParenthesis: 0
PenaltyBreakScopeResolution: 500
PenaltyBreakString: 1000
PenaltyBreakTemplateDeclaration: 10
PenaltyExcessCharacter: 1000000
PenaltyIndentedWhitespace: 0
PenaltyReturnTypeOnItsOwnLine: 60
PointerAlignment: Right
PPIndentWidth: -1
QualifierAlignment: Leave
ReferenceAlignment: Pointer
ReflowComments: true
RemoveBracesLLVM: false
RemoveParentheses: Leave
RemoveSemicolon: false
RequiresClausePosition: OwnLine
RequiresExpressionIndentation: OuterScope
SeparateDefinitionBlocks: Leave
ShortNamespaceLines: 1
SkipMacroDefinitionBody: false
SortIncludes: CaseSensitive
SortJavaStaticImport: Before
SortUsingDeclarations: LexicographicNumeric
SpaceAfterCStyleCast: false
SpaceAfterLogicalNot: false
SpaceAfterTemplateKeyword: true
SpaceAroundPointerQualifiers: Default
SpaceBeforeAssignmentOperators: true
SpaceBeforeCaseColon: false
SpaceBeforeCpp11BracedList: false
SpaceBeforeCtorInitializerColon: true
SpaceBeforeInheritanceColon: true
SpaceBeforeJsonColon: false
SpaceBeforeParens: ControlStatements
SpaceBeforeParensOptions:
AfterControlStatements: true
AfterForeachMacros: true
AfterFunctionDefinitionName: false
AfterFunctionDeclarationName: false
AfterIfMacros: true
AfterOverloadedOperator: false
AfterPlacementOperator: true
AfterRequiresInClause: false
AfterRequiresInExpression: false
BeforeNonEmptyParentheses: false
SpaceBeforeRangeBasedForLoopColon: true
SpaceBeforeSquareBrackets: false
SpaceInEmptyBlock: false
SpacesBeforeTrailingComments: 1
SpacesInAngles: Never
SpacesInContainerLiterals: true
SpacesInLineCommentPrefix:
Minimum: 1
Maximum: -1
SpacesInParens: Never
SpacesInParensOptions:
InCStyleCasts: false
InConditionalStatements: false
InEmptyParentheses: false
Other: false
SpacesInSquareBrackets: false
Standard: Latest
StatementAttributeLikeMacros:
- Q_EMIT
StatementMacros:
- Q_UNUSED
- QT_REQUIRE_VERSION
TabWidth: 8
UseTab: Never
VerilogBreakBetweenInstancePorts: true
WhitespaceSensitiveMacros:
- BOOST_PP_STRINGIZE
- CF_SWIFT_NAME
- NS_SWIFT_NAME
- PP_STRINGIZE
- STRINGIZE
...


@@ -0,0 +1,35 @@
name: Test And Build
on: [push]
jobs:
Lint:
runs-on: linepics
steps:
- name: checkout repo
uses: actions/checkout@v4
- name: formatting
run: clang-format --style=file --Werror --dry-run src/*.cpp
- name: cppcheck
run: cppcheck --std=c++17 --addon=misc --error-exitcode=1 src/*.cpp
Build:
runs-on: linepics
steps:
- name: checkout repo
uses: actions/checkout@v4
with:
submodules: 'true'
- name: install dependencies
run: |
dnf install -y librdkafka-devel
- name: prepare flatbuffers
run: |
pushd dep/flatbuffers
cmake -G "Unix Makefiles"
make -j
popd
./dep/flatbuffers/flatc -o schemas/ --cpp --gen-mutable --gen-name-strings --scoped-enums ./dep/streaming-data-types/schemas/*
- name: build module
run: |
sed -i 's/ARCH_FILTER=.*/ARCH_FILTER=linux%/' Makefile
echo -e "\nIGNORE_SUBMODULES += streaming-data-types flatbuffers" >> Makefile
make install

3
.gitignore vendored Normal file

@@ -0,0 +1,3 @@
O.*
.*ignore
schemas/

6
.gitmodules vendored Normal file

@@ -0,0 +1,6 @@
[submodule "dep/streaming-data-types"]
path = dep/streaming-data-types
url = git@gitea.psi.ch:lin-controls/streaming-data-types.git
[submodule "dep/flatbuffers"]
path = dep/flatbuffers
url = git@gitea.psi.ch:lin-controls/flatbuffers.git

29
Makefile Normal file

@@ -0,0 +1,29 @@
# Include the external Makefile
include /ioc/tools/driver.makefile
MODULE=StreamGenerator
BUILDCLASSES=Linux
EPICS_VERSIONS=7.0.7
ARCH_FILTER=RHEL%
# Additional module dependencies
REQUIRED+=asyn
DBDS += src/asynStreamGeneratorDriver.dbd
# DB files to include in the release
TEMPLATES += db/channels.db db/daq_common.db db/correlation_unit.db
# HEADERS += src/asynStreamGeneratorDriver.h
# Source files to build
SOURCES += src/asynStreamGeneratorDriver.cpp
# I don't think specifying the optimisation level like this is correct...
# but it doesn't hurt :D
USR_CFLAGS += -O3 -Wall -Wextra -Wunused-result -Werror -fvisibility=hidden # -Wpedantic // Does not work because EPICS macros trigger warnings
# Required to support EV42/44
USR_CXXFLAGS += -std=c++17 -O3 -I../dep/flatbuffers/include/ -I../schemas
LIB_SYS_LIBS += rdkafka

41
README.md Normal file

@@ -0,0 +1,41 @@
# StreamGenerator
Clone the repository to a local directory via:
```
git clone --recurse-submodules -j8 git@gitea.psi.ch:lin-epics-modules/StreamGenerator.git
```
## Dependencies
Currently, this project requires a system install of librdkafka. On Red Hat
systems, this means you should run:
```bash
dnf install -y librdkafka-devel
```
Additionally, you must first build Google's *flatbuffers* and ESS's
*streaming-data-types* libraries, both of which are included as submodules
under the `dep` directory and are required to build this project.
First, you should enter the *flatbuffers* directory and run the following:
```bash
cmake -G "Unix Makefiles"
make -j
```
After these steps, the `flatc` compiler will have been built and placed in
that directory.
Next, return to the top of this project's directory tree and generate the
flatbuffer headers from ESS's schema files:
```bash
./dep/flatbuffers/flatc -o schemas/ --cpp --gen-mutable --gen-name-strings --scoped-enums ./dep/streaming-data-types/schemas/*
```
This generates a header file for each of ESS's schemas and places them in
the `schemas/` directory.
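
To give a feel for what these generated headers provide, here is a minimal
serialisation sketch. It assumes the usual flatc output for the ev42 schema
(an `EventMessage` root type with `CreateEventMessageDirect` and
`FinishEventMessageBuffer` helpers); check the generated header for the exact
names and namespaces:
```cpp
#include <cstddef>
#include <cstdint>
#include <utility>
#include <vector>

#include <flatbuffers/flatbuffers.h>
#include "ev42_events_generated.h" // produced by the flatc step above

// Sketch: pack a batch of events into an ev42 EventMessage buffer.
inline std::pair<const uint8_t *, std::size_t>
serialiseEvents(flatbuffers::FlatBufferBuilder &fbb, uint64_t messageId,
                uint64_t pulseTimeNs, const std::vector<uint32_t> &tof,
                const std::vector<uint32_t> &pixelIds) {
    fbb.Clear();
    auto msg = CreateEventMessageDirect(fbb, "StreamGenerator", messageId,
                                        pulseTimeNs, &tof, &pixelIds);
    FinishEventMessageBuffer(fbb, msg); // writes the "ev42" file identifier
    return {fbb.GetBufferPointer(), fbb.GetSize()};
}
```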

76
db/channels.db Normal file

@@ -0,0 +1,76 @@
# EPICS Database for streamdevice specific to measurement channels
#
# Macros
# INSTR - Prefix
# NAME - the device name, e.g. EL737
# PORT - StreamGenerator Port
# CHANNEL - the number associated with the measurement channel
################################################################################
# Status Variables
# Trigger a change in status as clearing
record(bo, "$(INSTR)$(NAME):T$(CHANNEL)")
{
field(DESC, "Trigger Clearing Status")
field(VAL, 1)
field(OUT, "$(INSTR)$(NAME):S$(CHANNEL) PP")
}
# Trigger a change in status as value returned to 0
record(seq, "$(INSTR)$(NAME):O$(CHANNEL)")
{
field(DESC, "Trigger Returned to 0 Status")
field(LNK0, "$(INSTR)$(NAME):S$(CHANNEL) PP")
field(DO0, 0)
field(SELM, "Specified")
field(SELL, "$(INSTR)$(NAME):M$(CHANNEL).VAL")
field(SCAN, ".1 second")
}
# Current Status of Channel, i.e. is it ready to count?
record(bi, "$(INSTR)$(NAME):S$(CHANNEL)")
{
field(DESC, "Channel Status")
field(VAL, 0)
field(ZNAM, "OK")
field(ONAM, "CLEARING")
field(PINI, 1)
}
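# The full clearing sequence: writing C$(CHANNEL) below clears the channel
# and FLNKs T$(CHANNEL), which latches S$(CHANNEL) to CLEARING; the seq
# record O$(CHANNEL) then watches M$(CHANNEL) and drives S$(CHANNEL) back
# to OK once the count reads back as 0.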
################################################################################
# Count Commands
record(longout, "$(INSTR)$(NAME):C$(CHANNEL)")
{
field(DESC, "Clear the current channel count")
field(DTYP, "asynInt32")
field(OUT, "@asyn($(PORT),0,$(TIMEOUT=1)) C_$(CHANNEL)")
field(FLNK, "$(INSTR)$(NAME):T$(CHANNEL)")
}
################################################################################
# Read all monitors values
record(int64in, "$(INSTR)$(NAME):M$(CHANNEL)")
{
field(DESC, "DAQ CH$(CHANNEL)")
field(EGU, "cts")
field(DTYP, "asynInt64")
field(INP, "@asyn($(PORT),0,$(TIMEOUT=1)) COUNTS$(CHANNEL)")
# This is probably too fast. We could trigger things the same way as sinqDAQ to ensure the db is updated in the same order
# field(SCAN, "I/O Intr")
field(PINI, "YES")
}
record(ai, "$(INSTR)$(NAME):R$(CHANNEL)")
{
field(DESC, "Rate of DAQ CH$(CHANNEL)")
field(EGU, "cts/sec")
field(DTYP, "asynFloat64")
field(INP, "@asyn($(PORT),0,$(TIMEOUT=1)) RATE$(CHANNEL)")
field(SCAN, ".5 second")
field(PREC, 2)
# field(SCAN, "I/O Intr")
field(PINI, "YES")
}

73
db/correlation_unit.db Normal file

@@ -0,0 +1,73 @@
# EPICS Database for streamdevice specific to the correlation unit
#
# Macros
# INSTR - Prefix
# NAME - the device name, e.g. EL737
# PORT - StreamGenerator Port
################################################################################
# Status Variables
record(bo, "$(INSTR)$(NAME):Enable")
{
field(DESC, "Electronics Status")
field(DTYP, "asynInt32")
field(OUT, "@asyn($(PORT),0,$(TIMEOUT=1)) EN_EL")
field(ZNAM, "OFF")
field(ONAM, "ON")
field(PINI, 1)
}
record(bi, "$(INSTR)$(NAME):Enable_RBV")
{
field(DESC, "Electronics Status")
field(DTYP, "asynInt32")
field(INP, "@asyn($(PORT),0,$(TIMEOUT=1)) EN_EL_RBV")
field(ZNAM, "OFF")
field(ONAM, "ON")
field(SCAN, ".5 second")
}
record(longin,"$(INSTR)$(NAME):UDP_DROPPED")
{
field(DESC, "UDP Packets Missed")
field(EGU, "Events")
field(DTYP, "asynInt32")
field(INP, "@asyn($(PORT),0,$(TIMEOUT=1)) DROP")
# field(SCAN, "I/O Intr")
field(SCAN, "1 second")
field(PINI, "YES")
}
record(longin,"$(INSTR)$(NAME):UDP_WATERMARK")
{
field(DESC, "UDP Queue Usage")
field(EGU, "%")
field(DTYP, "asynInt32")
field(INP, "@asyn($(PORT),0,$(TIMEOUT=1)) UDP")
# field(SCAN, "I/O Intr")
field(SCAN, "1 second")
field(PINI, "YES")
}
record(longin,"$(INSTR)$(NAME):NORMALISED_WATERMARK")
{
field(DESC, "Normalised Queue Usage")
field(EGU, "%")
field(DTYP, "asynInt32")
field(INP, "@asyn($(PORT),0,$(TIMEOUT=1)) NORM")
# field(SCAN, "I/O Intr")
field(SCAN, "1 second")
field(PINI, "YES")
}
record(longin,"$(INSTR)$(NAME):SORTED_WATERMARK")
{
field(DESC, "Sort Queue Usage")
field(EGU, "%")
field(DTYP, "asynInt32")
field(INP, "@asyn($(PORT),0,$(TIMEOUT=1)) SORT")
# field(SCAN, "I/O Intr")
field(SCAN, "1 second")
field(PINI, "YES")
}

238
db/daq_common.db Normal file

@@ -0,0 +1,238 @@
# EPICS Database for streamdevice with the DAQ records common to all channels
#
# Macros
# INSTR - Prefix
# NAME - the device name, e.g. EL737
# PORT - StreamGenerator Port
record(longout, "$(INSTR)$(NAME):FULL-RESET")
{
field(DESC, "Reset the DAQ")
field(DTYP, "asynInt32")
field(OUT, "@asyn($(PORT),0,$(TIMEOUT=1)) RESET")
}
################################################################################
# Status Variables
record(stringin, "$(INSTR)$(NAME):MsgTxt")
{
field(DESC, "Unexpected received response")
}
# We separate the RAW-STATUS and STATUS PVs so that the state can be updated
# in a sequence that guarantees the most recent time and counts are included
# before the status switches back to Idle. We do this via a sequenced update:
#
# RAW-STATUS -> ELAPSED-TIME -> M* -> STATUS
record(mbbi, "$(INSTR)$(NAME):RAW-STATUS")
{
field(DESC, "DAQ Status")
field(DTYP, "asynInt32")
field(INP, "@asyn($(PORT),0,$(TIMEOUT=1)) STATUS")
field(ZRVL, "0")
field(ZRST, "Idle")
field(ONVL, "1")
field(ONST, "Counting")
field(TWVL, "2")
field(TWST, "Low rate")
field(THVL, "3")
field(THST, "Paused")
# 4 should never happen; if it does, it means the DAQ reports undocumented status bits
field(FRVL, "4")
field(FRST, "INVALID")
# This is probably too fast. We could trigger things the same way as sinqDAQ to ensure the db is updated in the same order
# field(SCAN, "I/O Intr")
field(SCAN, ".1 second")
field(FLNK, "$(INSTR)$(NAME):READALL")
field(PINI, "YES")
}
record(fanout, "$(INSTR)$(NAME):READALL")
{
field(SELM, "All")
field(LNK0, "$(INSTR)$(NAME):ELAPSED-TIME")
field(LNK1, "$(INSTR)$(NAME):M1")
field(LNK2, "$(INSTR)$(NAME):M2")
field(LNK3, "$(INSTR)$(NAME):M3")
field(LNK4, "$(INSTR)$(NAME):M4")
field(LNK5, "$(INSTR)$(NAME):M5")
# Doesn't seem to be a problem to have more in here :D
# field(LNK6, "$(INSTR)$(NAME):M5")
# field(LNK7, "$(INSTR)$(NAME):M6")
field(FLNK, "$(INSTR)$(NAME):STATUS")
}
record(mbbi, "$(INSTR)$(NAME):STATUS")
{
field(INP, "$(INSTR)$(NAME):RAW-STATUS NPP")
field(DESC, "DAQ Status")
field(ZRVL, "0")
field(ZRST, "Idle")
field(ONVL, "1")
field(ONST, "Counting")
field(TWVL, "2")
field(TWST, "Low rate")
field(THVL, "3")
field(THST, "Paused")
# 4 should never happen; if it does, it means the DAQ reports undocumented status bits
field(FRVL, "4")
field(FRST, "INVALID")
field(PINI, "YES")
}
record(longin, "$(INSTR)$(NAME):CHANNELS")
{
field(DESC, "Total Supported Channels")
field(VAL, $(CHANNELS))
field(DISP, 1)
}
# Trigger a change in status as clearing
record(bo, "$(INSTR)$(NAME):ETT")
{
field(DESC, "Trigger Clearing Status")
field(VAL, 1)
field(OUT, "$(INSTR)$(NAME):ETS PP")
}
# Trigger a change in status as value returned to 0
record(seq, "$(INSTR)$(NAME):ETO")
{
field(DESC, "Trigger Returned to 0 Status")
field(LNK0, "$(INSTR)$(NAME):ETS PP")
field(DO0, 0)
field(SELM, "Specified")
field(SELL, "$(INSTR)$(NAME):ELAPSED-TIME.VAL")
field(SCAN, ".1 second")
}
# Current Status of Channel, i.e. is it ready to count?
record(bi, "$(INSTR)$(NAME):ETS")
{
field(DESC, "Channel Status")
field(VAL, 0)
field(ZNAM, "OK")
field(ONAM, "CLEARING")
field(PINI, 1)
}
################################################################################
# Count Commands
record(longout,"$(INSTR)$(NAME):PRESET-COUNT")
{
field(DESC, "Count until preset reached")
field(DTYP, "asynInt32")
field(OUT, "@asyn($(PORT),0,$(TIMEOUT=1)) P_CNT")
field(VAL, 0)
}
record(longout,"$(INSTR)$(NAME):PRESET-TIME")
{
field(DESC, "Count for specified time")
field(EGU, "seconds")
field(DTYP, "asynInt32")
field(OUT, "@asyn($(PORT),0,$(TIMEOUT=1)) P_TIME")
field(VAL, 0)
}
record(bo,"$(INSTR)$(NAME):PAUSE")
{
field(DESC, "Pause the current count")
field(VAL, "0")
}
record(bo,"$(INSTR)$(NAME):CONTINUE")
{
field(DESC, "Continue with a count that was paused")
field(VAL, "0")
}
record(longout, "$(INSTR)$(NAME):STOP")
{
field(DESC, "Stop the current counting operation")
field(DTYP, "asynInt32")
field(OUT, "@asyn($(PORT),0,$(TIMEOUT=1)) STOP")
}
record(longout, "$(INSTR)$(NAME):MONITOR-CHANNEL")
{
field(DESC, "PRESET-COUNT Monitors this channel")
field(DTYP, "asynInt32")
field(OUT, "@asyn($(PORT),0,$(TIMEOUT=1)) MONITOR")
field(DRVL, "1") # Smallest Monitor Channel
field(DRVH, "$(CHANNELS)") # Largest Monitor Channel
}
record(longin, "$(INSTR)$(NAME):MONITOR-CHANNEL_RBV")
{
field(DESC, "PRESET-COUNT Monitors this channel")
field(DTYP, "asynInt32")
field(INP, "@asyn($(PORT),0,$(TIMEOUT=1)) MONITOR")
field(SCAN, ".2 second")
field(PINI, "YES")
}
record(ao,"$(INSTR)$(NAME):THRESHOLD")
{
field(DESC, "Minimum rate for counting to proceed")
field(DTYP, "asynInt32")
field(OUT, "@asyn($(PORT),0,$(TIMEOUT=1)) THRESH")
field(VAL, "1") # Default Rate
field(DRVL, "1") # Minimum Rate
field(DRVH, "100000") # Maximum Rate
}
record(ai,"$(INSTR)$(NAME):THRESHOLD_RBV")
{
field(DESC, "Minimum rate for counting to proceed")
field(EGU, "cts/sec")
field(DTYP, "asynInt32")
field(INP, "@asyn($(PORT),0,$(TIMEOUT=1)) THRESH")
field(SCAN, "I/O Intr")
field(PINI, "YES")
}
record(longout,"$(INSTR)$(NAME):THRESHOLD-MONITOR")
{
field(DESC, "Channel monitored for minimum rate")
field(DTYP, "asynInt32")
field(OUT, "@asyn($(PORT),0,$(TIMEOUT=1)) THRESH_CH")
field(VAL, "1") # Monitor
field(DRVL, "0") # Smallest Threshold Channel (0 is off)
field(DRVH, "$(CHANNELS)") # Largest Threshold Channel
}
record(longin,"$(INSTR)$(NAME):THRESHOLD-MONITOR_RBV")
{
field(DESC, "Channel monitored for minimum rate")
field(EGU, "CH")
field(DTYP, "asynInt32")
field(INP, "@asyn($(PORT),0,$(TIMEOUT=1)) THRESH_CH")
field(SCAN, "I/O Intr")
field(PINI, "YES")
}
record(longout, "$(INSTR)$(NAME):CT")
{
field(DESC, "Clear the timer")
field(DTYP, "asynInt32")
field(OUT, "@asyn($(PORT),0,$(TIMEOUT=1)) C_TIME")
field(FLNK, "$(INSTR)$(NAME):ETT")
}
################################################################################
# Read all monitors values
record(ai, "$(INSTR)$(NAME):ELAPSED-TIME")
{
field(DESC, "DAQ Measured Time")
field(EGU, "sec")
field(DTYP, "asynFloat64")
field(INP, "@asyn($(PORT),0,$(TIMEOUT=1)) TIME")
# field(SCAN, "I/O Intr")
field(PINI, "YES")
# field(FLNK, "$(INSTR)$(NAME):ETO")
}

1
dep/flatbuffers Submodule

Submodule dep/flatbuffers added at 1872409707


@@ -1,4 +0,0 @@
confluent-kafka==2.12.1
ess-streaming-data-types==0.27.0
flatbuffers==25.9.23
numpy==1.26.3

9
scripts/ioc.sh Executable file

@@ -0,0 +1,9 @@
#!/bin/bash
export EPICS_HOST_ARCH=linux-x86_64
export EPICS_BASE=/usr/local/epics/base-7.0.7
PARENT_PATH="$( cd "$(dirname "${BASH_SOURCE[0]}")" ; pwd -P )"
# /usr/local/bin/procServ -o -L - -f -i ^D^C 20001 "${PARENT_PATH}/st.cmd" -d
"${PARENT_PATH}/st.cmd"

40
scripts/st.cmd Executable file

@@ -0,0 +1,40 @@
#!/usr/local/bin/iocsh
#-d
on error break
require StreamGenerator, test
epicsEnvSet("INSTR", "SQ:DMC-DAQ:")
epicsEnvSet("NAME", "SG")
# Local UDP Generator Test Config
drvAsynIPPortConfigure("ASYN_IP_PORT", "127.0.0.1:9071:54321 UDP", 0, 0, 1)
# Correlation Unit Config
# drvAsynIPPortConfigure("ASYN_IP_PORT", "172.28.69.20:54321:54321 UDP", 0, 0, 1)
# With a udpQueue and sortQueue size of 10'000 packets, we can hold in memory
# 10'000 * 243 = 2.43e6 events
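# Each full packet is at most 42 + 243 * 6 = 1500 bytes, so the udpQueue
# alone amounts to roughly 15 MB of buffered data.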
# Kafka Broker and Topic Configuration
# asynStreamGenerator("ASYN_SG", "ASYN_IP_PORT", 4, 10000, "linkafka01:9092", "NEWEFU_TEST", "NEWEFU_TEST2", 10000, 20480)
# asynStreamGenerator("ASYN_SG", "ASYN_IP_PORT", 4, 10000, "ess01:9092", "NEWEFU_TEST", "NEWEFU_TEST2", 10000, 20480)
# Don't send any kafka messages
asynStreamGenerator("ASYN_SG", "ASYN_IP_PORT", 4, 10000, "", "", "", 0, 0)
dbLoadRecords("$(StreamGenerator_DB)correlation_unit.db", "INSTR=$(INSTR), NAME=$(NAME), PORT=ASYN_SG")
dbLoadRecords("$(StreamGenerator_DB)daq_common.db", "INSTR=$(INSTR), NAME=$(NAME), PORT=ASYN_SG, CHANNELS=5")
# Monitor Channels
dbLoadRecords("$(StreamGenerator_DB)channels.db", "INSTR=$(INSTR), NAME=$(NAME), PORT=ASYN_SG, CHANNEL=1")
dbLoadRecords("$(StreamGenerator_DB)channels.db", "INSTR=$(INSTR), NAME=$(NAME), PORT=ASYN_SG, CHANNEL=2")
dbLoadRecords("$(StreamGenerator_DB)channels.db", "INSTR=$(INSTR), NAME=$(NAME), PORT=ASYN_SG, CHANNEL=3")
dbLoadRecords("$(StreamGenerator_DB)channels.db", "INSTR=$(INSTR), NAME=$(NAME), PORT=ASYN_SG, CHANNEL=4")
# Detector Count Channel
dbLoadRecords("$(StreamGenerator_DB)channels.db", "INSTR=$(INSTR), NAME=$(NAME), PORT=ASYN_SG, CHANNEL=5")
iocInit()

120
scripts/udp_gen.py Normal file

@@ -0,0 +1,120 @@
import socket
import time
import random
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
header = [
0, 0, # buffer length in 16bit words (1, 0) == 1, (0, 1) == 256
0, 0x80, # buffer type (probably should be 0)
21, 0, # header length
0, 0, # buffer number
0, 0, # run id
0x3, # status
0, # id of sending module
0, 0, # timestamp low
0, 0, # timestamp mid
0, 0, # timestamp high
] + [0, 0] * 12 # parameters
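# 9 fixed header words + 12 parameter words = 21 words = 42 bytes,
# matching the header length field (21) above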
data = [
0,
0,
0,
0,
0,
0
]
start_time = time.time_ns() // 100
buffer_ids = {
i: (0, 0) for i in range(10)
}
while True:
# update timestamp
base_timestamp = time.time_ns() // 100 - start_time
t_low = base_timestamp & 0xffff
t_mid = (base_timestamp >> 16) & 0xffff
t_high = (base_timestamp >> 32) & 0xffff
header[12] = t_low & 0xff
header[13] = t_low >> 8
header[14] = t_mid & 0xff
header[15] = t_mid >> 8
header[16] = t_high & 0xff
header[17] = t_high >> 8
num_events = random.randint(0, 243)
# num_events = 243
# num_events = 1
# update buffer length
buffer_length = 21 + num_events * 3
header[0] = buffer_length & 0xff
header[1] = (buffer_length >> 8) & 0xff
# I believe that in our case we never mix monitor and detector events, as
# the monitors should have id 0 and the detector events ids 1-9, so I have
# excluded that possibility here. If true, that would also mean we could
# reduce the number of checks on the parsing side of things...
is_monitor = random.randint(0, 9)
# is_monitor = 4
header[11] = 0 if is_monitor > 3 else random.randint(1,9)
# update buffer number (each mcpdid has its own buffer number count)
header[6], header[7] = buffer_ids[header[11]]
header[6] = (header[6] + 1) % (0xff + 1)
header[7] = (header[7] + (header[6] == 0)) % (0xff + 1)
buffer_ids[header[11]] = header[6], header[7]
tosend = []
if is_monitor > 3:
for i in range(num_events):
d = list(data)
monitor = random.randint(0,3)
# monitor = 0
d[5] = (1 << 7) | monitor
# update trigger timestamp
# event time is an offset relative to the buffer header timestamp
event_timestamp = (time.time_ns() // 100 - start_time) - base_timestamp
d[0] = event_timestamp & 0xff
d[1] = (event_timestamp >> 8) & 0xff
d[2] = (event_timestamp >> 16) & 0x07
# completely reversed sorting
tosend = d + tosend
else:
for i in range(num_events):
d = list(data)
amplitude = random.randint(0, 255)
x_pos = random.randint(0, 1023)
y_pos = random.randint(0, 1023)
# event time is an offset relative to the buffer header timestamp
event_timestamp = (time.time_ns() // 100 - start_time) - base_timestamp
d[5] = (0 << 7) | (amplitude >> 1)
d[4] = ((amplitude & 0x01) << 7) | (y_pos >> 3)
d[3] = ((y_pos << 5) & 0xE0) | (x_pos >> 5)
d[2] = ((x_pos << 3) & 0xF8)
d[0] = event_timestamp & 0xff
d[1] = (event_timestamp >> 8) & 0xff
d[2] |= (event_timestamp >> 16) & 0x07
# completely reversed sorting
tosend = d + tosend
sock.sendto(bytes(header + tosend), ('127.0.0.1', 54321))
mv = memoryview(bytes(header)).cast('H')
print(f'Sent packet {mv[3]} with {num_events} events {base_timestamp}')
# time.sleep(.1)
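
The generator packs each neutron event into 6 little-endian bytes. As a
cross-check against the bit-field structs in the driver header, here is a
minimal C++ sketch (illustrative, not part of the module) that decodes one
such event with plain shifts and masks:
```cpp
#include <cstdint>
#include <cstring>

// Decode one 6-byte neutron event as packed by udp_gen.py.
// Layout (LSB first): timestamp:19 | x:10 | y:10 | amplitude:8 | id:1.
struct DecodedEvent {
    uint32_t timestamp; // 100 ns ticks, relative to the buffer header time
    uint16_t x, y;      // 10-bit positions
    uint8_t amplitude;
    bool isTrigger;     // high bit set on trigger/monitor events
};

inline DecodedEvent decodeEvent(const uint8_t raw[6]) {
    uint64_t bits = 0;
    std::memcpy(&bits, raw, 6); // assumes a little-endian host
    return DecodedEvent{
        static_cast<uint32_t>(bits & 0x7FFFF),       // bits 0-18
        static_cast<uint16_t>((bits >> 19) & 0x3FF), // bits 19-28
        static_cast<uint16_t>((bits >> 29) & 0x3FF), // bits 29-38
        static_cast<uint8_t>((bits >> 39) & 0xFF),   // bits 39-46
        (bits >> 47) != 0,                           // bit 47
    };
}
```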

File diff suppressed because it is too large

src/asynStreamGeneratorDriver.dbd Normal file

@@ -0,0 +1 @@
registrar("asynStreamGeneratorDriverRegister")

src/asynStreamGeneratorDriver.h Normal file

@@ -0,0 +1,234 @@
#ifndef asynStreamGeneratorDriver_H
#define asynStreamGeneratorDriver_H
#include "asynPortDriver.h"
#include <epicsRingBytes.h>
#include <librdkafka/rdkafka.h>
// Just for printing
#define __STDC_FORMAT_MACROS
#include <inttypes.h>
/*******************************************************************************
* UDP Packet Definitions
*/
enum class CommandId : std::uint16_t {
reset = 0,
start = 1,
stop = 2,
cont = 3
};
// TODO these headers are actually the same, but with different bodies.
struct __attribute__((__packed__)) CommandHeader {
uint16_t BufferLength;
uint16_t BufferType;
uint16_t HeaderLength;
uint16_t BufferNumber;
uint16_t Command;
uint16_t Status : 8;
uint16_t McpdID : 8;
uint16_t TimeStamp[3];
uint16_t Checksum;
uint16_t Finalizer;
CommandHeader(const CommandId commandId)
: BufferLength(10), BufferType(0x8000), HeaderLength(10),
BufferNumber(0),
Command(
static_cast<std::underlying_type<CommandId>::type>(commandId)),
Status(0), McpdID(0), TimeStamp{0, 0, 0}, Checksum(0),
Finalizer(0xffff) {
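// XOR over the first BufferLength (= 10) 16-bit words; Checksum
// itself is still zero at this point, and the Finalizer word is
// not included.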
uint16_t checksum = 0x0000;
for (std::size_t i = 0; i < BufferLength; ++i)
checksum ^= ((uint16_t *)this)[i];
Checksum = checksum;
}
};
struct __attribute__((__packed__)) DataHeader {
uint16_t BufferLength;
uint16_t BufferType;
uint16_t HeaderLength;
uint16_t BufferNumber;
uint16_t RunCmdID;
uint16_t Status : 8;
uint16_t McpdID : 8;
uint16_t TimeStamp[3];
uint16_t Parameter0[3];
uint16_t Parameter1[3];
uint16_t Parameter2[3];
uint16_t Parameter3[3];
inline uint64_t nanosecs() const {
uint64_t ticks{((uint64_t)TimeStamp[2]) << 32 |
((uint64_t)TimeStamp[1]) << 16 | (uint64_t)TimeStamp[0]};
return ticks * 100; // timestamps are in 100 ns units
}
};
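// Events pack into 48 bits (6 bytes), little-endian, LSB first:
// TimeStamp:19 | XPosition:10 | YPosition:10 | Amplitude:8 | Id:1
// matching the byte layout produced by scripts/udp_gen.py.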
struct __attribute__((__packed__)) DetectorEvent {
uint64_t TimeStamp : 19;
uint16_t XPosition : 10;
uint16_t YPosition : 10;
uint16_t Amplitude : 8;
uint16_t Id : 1;
inline uint32_t nanosecs() const { return TimeStamp * 100; }
inline uint32_t pixelId(uint32_t mcpdId) const {
const uint32_t x_pixels = 128;
const uint32_t y_pixels = 128;
return (mcpdId - 1) * x_pixels * y_pixels +
x_pixels * (uint32_t)(this->XPosition >> 3) +
(uint32_t)(this->YPosition >> 3);
}
};
struct __attribute__((__packed__)) MonitorEvent {
uint64_t TimeStamp : 19;
uint64_t Data : 21;
uint64_t DataID : 4;
uint64_t TriggerID : 3;
uint64_t Id : 1;
inline uint32_t nanosecs() const { return TimeStamp * 100; }
};
/*******************************************************************************
* Simplified Event Struct Definition
*/
struct __attribute__((__packed__)) NormalisedEvent {
uint64_t timestamp;
uint32_t pixelId : 24;
uint8_t source;
// inline NormalisedEvent(uint64_t timestamp, uint8_t source, uint32_t
// pixelId)
// : timestamp(timestamp), source(source), pixelId(pixelId){};
};
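// Packed size: 8 + 3 + 1 = 12 bytes per event on the internal byte queues.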
/*******************************************************************************
* Status values that should match the definition in db/daq_common.db
*/
#define STATUS_IDLE 0
#define STATUS_COUNTING 1
#define STATUS_LOWRATE 2
#define STATUS_PAUSED 3
/*******************************************************************************
* Parameters for use in DB records
*
* i.e. drvInfo strings that are used to identify the parameters
*/
constexpr static char P_EnableElectronicsString[]{"EN_EL"};
constexpr static char P_EnableElectronicsRBVString[]{"EN_EL_RBV"};
constexpr static char P_StatusString[]{"STATUS"};
constexpr static char P_ResetString[]{"RESET"};
constexpr static char P_StopString[]{"STOP"};
constexpr static char P_CountPresetString[]{"P_CNT"};
constexpr static char P_TimePresetString[]{"P_TIME"};
constexpr static char P_ElapsedTimeString[]{"TIME"};
constexpr static char P_ClearElapsedTimeString[]{"C_TIME"};
constexpr static char P_MonitorChannelString[]{"MONITOR"};
constexpr static char P_ThresholdString[]{"THRESH"};
constexpr static char P_ThresholdChannelString[]{"THRESH_CH"};
constexpr static char P_CountsString[]{"COUNTS%" PRIu64};
constexpr static char P_RateString[]{"RATE%" PRIu64};
constexpr static char P_ClearCountsString[]{"C_%" PRIu64};
constexpr static char P_UdpDroppedString[]{"DROP"};
constexpr static char P_UdpQueueHighWaterMarkString[]{"UDP"};
constexpr static char P_NormalisedQueueHighWaterMarkString[]{"NORM"};
constexpr static char P_SortedQueueHighWaterMarkString[]{"SORT"};
/*******************************************************************************
* Stream Generator Coordinating Class
*/
class asynStreamGeneratorDriver : public asynPortDriver {
public:
asynStreamGeneratorDriver(const char *portName, const char *ipPortName,
const int numChannels, const int udpQueueSize,
const bool enableKafkaStream,
const char *kafkaBroker, const char *monitorTopic,
const char *detectorTopic,
const int kafkaQueueSize,
const int kafkaMaxPacketSize);
virtual ~asynStreamGeneratorDriver();
virtual asynStatus readInt32(asynUser *pasynUser, epicsInt32 *value);
virtual asynStatus writeInt32(asynUser *pasynUser, epicsInt32 value);
void receiveUDP();
void normaliseUDP();
void partialSortEvents();
void processEvents();
void produceMonitor();
void produceDetector();
protected:
// Parameter Identifying IDs
int P_EnableElectronics;
int P_EnableElectronicsRBV;
int P_Status;
int P_Reset;
int P_Stop;
int P_CountPreset;
int P_TimePreset;
int P_ElapsedTime;
int P_ClearElapsedTime;
int P_MonitorChannel;
int P_Threshold;
int P_ThresholdChannel;
int *P_Counts;
int *P_Rates;
int *P_ClearCounts;
// System Status Parameter Identifying IDs
int P_UdpDropped;
int P_UdpQueueHighWaterMark;
int P_NormalisedQueueHighWaterMark;
int P_SortedQueueHighWaterMark;
private:
asynUser *pasynUDPUser;
epicsEventId pausedEventId;
const std::size_t num_channels;
const bool kafkaEnabled;
const int kafkaQueueSize;
const int kafkaMaxPacketSize;
const int udpQueueSize;
epicsRingBytesId udpQueue;
epicsRingBytesId normalisedQueue;
epicsRingBytesId sortedQueue;
epicsRingBytesId monitorQueue;
rd_kafka_t *monitorProducer;
const std::string monitorTopic;
epicsRingBytesId detectorQueue;
rd_kafka_t *detectorProducer;
const std::string detectorTopic;
static constexpr char driverName[]{"StreamGenerator"};
asynStatus createInt32Param(asynStatus status, const char *name,
int *variable, epicsInt32 initialValue = 0);
asynStatus createInt64Param(asynStatus status, const char *name,
int *variable, epicsInt64 initialValue = 0);
asynStatus createFloat64Param(asynStatus status, const char *name,
int *variable, double initialValue = 0);
inline void queueForKafka(NormalisedEvent &ne);
void produce(epicsRingBytesId eventQueue, rd_kafka_t *kafkaProducer,
const char *topic, const char *source);
};
#endif
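
As a concrete check of the `pixelId` mapping above (each MCPD module maps to
a 128 x 128 tile, with the 10-bit hardware positions truncated by 3 bits),
here is a small standalone sketch with illustrative values:
```cpp
#include <cassert>
#include <cstdint>

int main() {
    // Hypothetical example: module 2, 10-bit positions x = 512, y = 256.
    const uint32_t mcpdId = 2, xPos = 512, yPos = 256;
    // Same arithmetic as DetectorEvent::pixelId.
    const uint32_t pixel =
        (mcpdId - 1) * 128 * 128 + 128 * (xPos >> 3) + (yPos >> 3);
    assert(pixel == 16384 + 8192 + 32); // 512>>3 == 64, 256>>3 == 32
    return 0;
}
```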


@@ -1,342 +0,0 @@
import queue
import socket
import time
import threading
from uuid import uuid4
import math
from confluent_kafka import Producer
import streaming_data_types
# receiving directly (can also specify correlation unit ip)
UDP_IP = ""
UDP_PORT = 54321
# If redirecting traffic via
# socat -U - udp4-recv:54321 | tee >( socat -u - udp4-datagram:127.0.0.1:54322 ) | socat -u - udp4-datagram:127.0.0.1:54323
# UDP_IP = "127.0.0.1"
# UDP_PORT = 54323
WINDOWSECONDS = 5
WINDOWSIZE = 20000 * WINDOWSECONDS
MONITORS = 4 # We have max 4 monitors
time_offset = None # Estimate of clock offset
time_window = {
i: queue.Queue(maxsize=WINDOWSIZE)
for i in range(MONITORS)
}
# event_time_window = queue.Queue(maxsize=50000 * WINDOWSECONDS)
EVENT_WINDOWSIZE = 50000
EVENT_WINDOW_PTR = 0
event_time_window = [0 for i in range(EVENT_WINDOWSIZE)]
event_average_rate = 0
event_last_timestamp = None
MISSED_PACKETS = -9 # All modules appear to miss the first time due to initialisation as 0
# missed_packets_time_window = queue.Queue(maxsize=100)
def print_monitor_rates():
while True:
for i in range(MONITORS):
msg = f"Monitor {i+1}: {time_window[i].qsize() / WINDOWSECONDS} cts/s"
try:
earliest = time_window[i].queue[0]
newest = max(time_window[i].queue)
t = time.time()
msg += f', buffer range: {round((newest - earliest) * 1e-7, 3)} s, oldest: {round(time.time() - ((time_offset + earliest) * 1e-7), 3)} s, newest: {round(time.time() - ((time_offset + newest) * 1e-7), 3)} s'
except:
pass
print(msg)
# try:
# print(f'Events: {1 / event_average_rate} cts/s')
# except:
# pass
try:
print(f'Events: {round(1 / (sum(event_time_window) / EVENT_WINDOWSIZE * 1e-7), 2)} cts/s')
except:
pass
print(f'Missed Packets: {MISSED_PACKETS}')
# Detector Events
# msg = f"Events : {event_time_window.qsize() / WINDOWSECONDS} cts/s"
# try:
# earliest = event_time_window.queue[0]
# newest = max(event_time_window.queue)
# t = time.time()
# msg += f', buffer range: {round((newest - earliest) * 1e-7, 3)} s, oldest: {round(time.time() - ((time_offset + earliest) * 1e-7), 3)} s, newest: {round(time.time() - ((time_offset + newest) * 1e-7), 3)} s'
# except:
# pass
# print(msg)
time.sleep(1)
threading.Thread(target=print_monitor_rates, daemon=True).start()
def clean_monitor_rates():
latest = 0
while True:
for d_id in range(MONITORS):
t_w = time_window[d_id]
if not t_w.empty():
# TODO probably should switch to a priority queue
# as the messages might not be in order
# TODO could also just replace with a low-pass filter
# would be a lot more efficient
# TODO the way this is done, we need trigger events
# in order for the signal to decay back to 0.
# If no events come, the rate remains stuck
latest = max(latest, max(t_w.queue))
# latest = time_window[1].queue[-1]
try:
while t_w.queue[0] < (latest - WINDOWSECONDS * 1e7):
t_w.get_nowait()
except IndexError:
pass
time.sleep(0.01)
threading.Thread(target=clean_monitor_rates, daemon=True).start()
# def clean_event_rates():
# latest = 0
# while True:
# t_w = event_time_window
# if not t_w.empty():
# # TODO probably should switch to a priority queue
# # as the messages might not be in order
# # TODO could also just replace with a low-pass filter
# # would be a lot more efficient
# # TODO the way this is done, we need trigger events
# # in order for the signal to decay back to 0.
# # If no events come, the rate remains stuck
# #latest = max(latest, max(t_w.queue))
# try:
# latest = time_window[1].queue[-1]
# while t_w.queue[0] < (latest - WINDOWSECONDS * 1e7):
# t_w.get_nowait()
# except IndexError:
# pass
# time.sleep(0.005)
#
# threading.Thread(target=clean_event_rates, daemon=True).start()
# Event Kafka Producer
event_queue = queue.Queue()
def event_producer():
producer_config = {
'bootstrap.servers': "linkafka01:9092",
'queue.buffering.max.messages': 1e7,
}
prod = Producer(producer_config)
st = time.time()
msg_id = 0
b_size = 10000
b_ptr = 0
pixel_buffer = [0 for _ in range(b_size)]
time_buffer = [0 for _ in range(b_size)]
poll_cnt = 0
while True:
(p_id, timestamp) = event_queue.get()
pixel_buffer[b_ptr] = p_id
time_buffer[b_ptr] = timestamp
b_ptr += 1
nt = time.time()
if b_ptr == b_size or nt - st > 0.001:
st = nt
if b_ptr > 0:
message = streaming_data_types.serialise_ev42(
message_id = msg_id,
pulse_time = time_buffer[0] * 100, # int(time.time() * 1_000_000_000),
time_of_flight = time_buffer[0:b_ptr],
detector_id = pixel_buffer[0:b_ptr],
source_name = '',
)
msg_id = (msg_id + 1) % 100000000
b_ptr = 0
prod.produce(
topic = "DMC_detector",
value = message,
partition = 0,
)
# if poll_cnt % 1000 == 0:
prod.poll(0)
poll_cnt = (poll_cnt + 1) % 1000
threading.Thread(target=event_producer, daemon=True).start()
# Monitor Kafka Producer
monitor_queue = queue.Queue()
def monitor_producer():
producer_config = {
'bootstrap.servers': "linkafka01:9092",
'queue.buffering.max.messages': 1e7,
}
prod = Producer(producer_config)
monitor_buffer = [0 for i in range(MONITORS)]
monitor_time = [0 for i in range(MONITORS)]
st = time.time()
poll_cnt = 0
while True:
(d_id, timestamp) = monitor_queue.get()
monitor_buffer[d_id] += 1
monitor_time[d_id] = timestamp
nt = time.time()
if nt - st > 0.05:
st = nt
for i in range(MONITORS):
if monitor_buffer[d_id]:
message = streaming_data_types.serialise_f142(
source_name = f"monitor{d_id+1}",
value = monitor_buffer[d_id],
# ns resolution (supposed to be past epoch, not what the detector returns though)
timestamp_unix_ns = monitor_time[d_id] * 100 # send time of last monitor
)
prod.produce(
topic = "DMC_neutron_monitor",
value = message,
partition = 0,
)
monitor_buffer[d_id] = 0
if poll_cnt % 1000 == 0:
prod.poll(0)
poll_cnt = (poll_cnt + 1) % 1000
threading.Thread(target=monitor_producer, daemon=True).start()
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.bind((UDP_IP, UDP_PORT))
val = 0
start_time = time.time()
module_counts = [0 for i in range(10)]
EVENTS = 0
while True:
data, addr = sock.recvfrom(2056) # Buffer size is 1024 bytes
raw_header = data[:42]
raw_data = data[42:]
(buffer_length, buffer_type, header_length,
buffer_number, run_id, mcpd_status,
t_low, t_mid, t_high, *_) = memoryview(raw_header).cast('H')
mcpd_id = ( mcpd_status >> 8 ) & 0xff
mcpd_status = ( mcpd_status ) & 0x3
running_msg = "running" if (mcpd_status & 0x1) else "stopped"
sync_msg = "in sync" if (mcpd_status & 0x2) else "sync error"
timestamp = ( t_high << 32 ) | ( t_mid << 16 ) | t_low # 100 ns resolution
#print(f'Packet {int(timestamp * 1e-7)}s => buffer: {buffer_number}, length: {int(buffer_length*2/6)} events, status: {mcpd_status} {mcpd_id} {running_msg} with {sync_msg}')
# print(f'Packet => buffer: {mcpd_id}-{buffer_number}, length: {int((buffer_length-21)/3)} events, status: {mcpd_status}')
if time_offset is None:
time_offset = time.time() * 1e7 - timestamp
if buffer_number - module_counts[mcpd_id] != 1:
MISSED_PACKETS += 1
# if missed_packets_time_window.full():
# missed_packets_time_window.get_nowait()
# missed_packets_time_window.put(timestamp)
module_counts[mcpd_id] = buffer_number
for i in range(0, len(raw_data), 6):
event = memoryview(raw_data)[i:i+6]
event_type = event[5] >> 7
# print(event_type)
if event_type: # Trigger Event
t_id = ( event[5] >> 4 ) & 0x7
d_id = event[5] & 0xf
event_timestamp = timestamp + ( ( event[2] << 16 ) & 0x7 ) | ( event[1] << 8 ) | event[0]
# print(f'Trigger event {event_timestamp * 1e-7}s => TrigID: {t_id}, DataID: {d_id}')
t_w = time_window[d_id]
t_w.put_nowait(event_timestamp)
monitor_queue.put_nowait((d_id, event_timestamp))
else: # Neutron Event
x_pixels = 128
y_pixels = 128
amplitude = ( event[5] << 1 ) | ( event[4] >> 7 )
# The DMC StreamHistogrammer setup currently expects each module to
# be 128 * 128 pixels but the resolution in the packages is
# actually 10bit. We remove the lowest 3 bits.
x = (( (event[3] & 0x1f) << 5 | (event[2] & 0xf8) >> 3 ) & 0x3ff) >> 3
y = (( (event[4] & 0x7f) << 3 | (event[3] & 0xe0) >> 5 ) & 0x3ff) >> 3
event_timestamp = timestamp + ( ( event[2] << 16 ) & 0x7 ) | ( event[1] << 8 ) | event[0]
# print(f'Neutron event {event_timestamp * 1e-7}s: {amplitude}, x: {x}, y: {y}')
if event_last_timestamp is None:
event_last_timestamp = event_timestamp
# Seems like at higher frequencies these come very much out of order
# so this is very approximate
event_time_window[EVENT_WINDOW_PTR] = event_timestamp - event_last_timestamp
EVENT_WINDOW_PTR = (EVENT_WINDOW_PTR + 1) % EVENT_WINDOWSIZE
event_last_timestamp = event_timestamp
# I suppose this doesn't work mostly due to the timestamps ordering...
# event_timestamp_seconds = event_timestamp * 1e-7
# if event_last_timestamp is None:
# event_last_timestamp = event_timestamp_seconds
# f_cutoff = 1e6 # Hz
# tau = 1 / ( 2 * math.pi * f_cutoff)
# dt = event_timestamp_seconds - event_last_timestamp
# if dt > 0:
# w = math.exp(-dt / tau)
# event_average_rate = w * dt + event_average_rate * (1 - w)
# event_last_timestamp = event_timestamp_seconds
# EVENTS += 1
# a = (mcpd_id - 1) * x_pixels * y_pixels + x_pixels * x + y
# print((EVENTS, x, y, a, a < 128 * 128 * 9, mcpd_id))
# if not a < 128 * 128 * 9:
# print((event[3], event[3] << 5, event[2], event[2] >> 3))
event_queue.put_nowait((
(mcpd_id - 1) * x_pixels * y_pixels + x_pixels * x + y,
event_timestamp
))