less_logic_in_udp_thread #1

Merged
wall_e merged 35 commits from less_logic_in_udp_thread into main 2025-11-14 14:11:18 +01:00
17 changed files with 1942 additions and 346 deletions

245
.clang-format Normal file

@@ -0,0 +1,245 @@
---
Language: Cpp
# BasedOnStyle: LLVM
AccessModifierOffset: -2
AlignAfterOpenBracket: Align
AlignArrayOfStructures: None
AlignConsecutiveAssignments:
Enabled: false
AcrossEmptyLines: false
AcrossComments: false
AlignCompound: false
AlignFunctionPointers: false
PadOperators: true
AlignConsecutiveBitFields:
Enabled: false
AcrossEmptyLines: false
AcrossComments: false
AlignCompound: false
AlignFunctionPointers: false
PadOperators: false
AlignConsecutiveDeclarations:
Enabled: false
AcrossEmptyLines: false
AcrossComments: false
AlignCompound: false
AlignFunctionPointers: false
PadOperators: false
AlignConsecutiveMacros:
Enabled: false
AcrossEmptyLines: false
AcrossComments: false
AlignCompound: false
AlignFunctionPointers: false
PadOperators: false
AlignConsecutiveShortCaseStatements:
Enabled: false
AcrossEmptyLines: false
AcrossComments: false
AlignCaseColons: false
AlignEscapedNewlines: Right
AlignOperands: Align
AlignTrailingComments:
Kind: Always
OverEmptyLines: 0
AllowAllArgumentsOnNextLine: true
AllowAllParametersOfDeclarationOnNextLine: true
AllowBreakBeforeNoexceptSpecifier: Never
AllowShortBlocksOnASingleLine: Never
AllowShortCaseLabelsOnASingleLine: false
AllowShortCompoundRequirementOnASingleLine: true
AllowShortEnumsOnASingleLine: true
AllowShortFunctionsOnASingleLine: All
AllowShortIfStatementsOnASingleLine: Never
AllowShortLambdasOnASingleLine: All
AllowShortLoopsOnASingleLine: false
AlwaysBreakAfterDefinitionReturnType: None
AlwaysBreakAfterReturnType: None
AlwaysBreakBeforeMultilineStrings: false
AlwaysBreakTemplateDeclarations: MultiLine
AttributeMacros:
- __capability
BinPackArguments: true
BinPackParameters: true
BitFieldColonSpacing: Both
BraceWrapping:
AfterCaseLabel: false
AfterClass: false
AfterControlStatement: Never
AfterEnum: false
AfterExternBlock: false
AfterFunction: false
AfterNamespace: false
AfterObjCDeclaration: false
AfterStruct: false
AfterUnion: false
BeforeCatch: false
BeforeElse: false
BeforeLambdaBody: false
BeforeWhile: false
IndentBraces: false
SplitEmptyFunction: true
SplitEmptyRecord: true
SplitEmptyNamespace: true
BreakAdjacentStringLiterals: true
BreakAfterAttributes: Leave
BreakAfterJavaFieldAnnotations: false
BreakArrays: true
BreakBeforeBinaryOperators: None
BreakBeforeConceptDeclarations: Always
BreakBeforeBraces: Attach
BreakBeforeInlineASMColon: OnlyMultiline
BreakBeforeTernaryOperators: true
BreakConstructorInitializers: BeforeColon
BreakInheritanceList: BeforeColon
BreakStringLiterals: true
ColumnLimit: 80
CommentPragmas: '^ IWYU pragma:'
CompactNamespaces: false
ConstructorInitializerIndentWidth: 4
ContinuationIndentWidth: 4
Cpp11BracedListStyle: true
DerivePointerAlignment: false
DisableFormat: false
EmptyLineAfterAccessModifier: Never
EmptyLineBeforeAccessModifier: LogicalBlock
ExperimentalAutoDetectBinPacking: false
FixNamespaceComments: true
ForEachMacros:
- foreach
- Q_FOREACH
- BOOST_FOREACH
IfMacros:
- KJ_IF_MAYBE
IncludeBlocks: Preserve
IncludeCategories:
- Regex: '^"(llvm|llvm-c|clang|clang-c)/'
Priority: 2
SortPriority: 0
CaseSensitive: false
- Regex: '^(<|"(gtest|gmock|isl|json)/)'
Priority: 3
SortPriority: 0
CaseSensitive: false
- Regex: '.*'
Priority: 1
SortPriority: 0
CaseSensitive: false
IncludeIsMainRegex: '(Test)?$'
IncludeIsMainSourceRegex: ''
IndentAccessModifiers: false
IndentCaseBlocks: false
IndentCaseLabels: false
IndentExternBlock: AfterExternBlock
IndentGotoLabels: true
IndentPPDirectives: None
IndentRequiresClause: true
IndentWidth: 4
IndentWrappedFunctionNames: false
InsertBraces: false
InsertNewlineAtEOF: false
InsertTrailingCommas: None
IntegerLiteralSeparator:
Binary: 0
BinaryMinDigits: 0
Decimal: 0
DecimalMinDigits: 0
Hex: 0
HexMinDigits: 0
JavaScriptQuotes: Leave
JavaScriptWrapImports: true
KeepEmptyLinesAtTheStartOfBlocks: true
KeepEmptyLinesAtEOF: false
LambdaBodyIndentation: Signature
LineEnding: DeriveLF
MacroBlockBegin: ''
MacroBlockEnd: ''
MaxEmptyLinesToKeep: 1
NamespaceIndentation: None
ObjCBinPackProtocolList: Auto
ObjCBlockIndentWidth: 2
ObjCBreakBeforeNestedBlockParam: true
ObjCSpaceAfterProperty: false
ObjCSpaceBeforeProtocolList: true
PackConstructorInitializers: BinPack
PenaltyBreakAssignment: 2
PenaltyBreakBeforeFirstCallParameter: 19
PenaltyBreakComment: 300
PenaltyBreakFirstLessLess: 120
PenaltyBreakOpenParenthesis: 0
PenaltyBreakScopeResolution: 500
PenaltyBreakString: 1000
PenaltyBreakTemplateDeclaration: 10
PenaltyExcessCharacter: 1000000
PenaltyIndentedWhitespace: 0
PenaltyReturnTypeOnItsOwnLine: 60
PointerAlignment: Right
PPIndentWidth: -1
QualifierAlignment: Leave
ReferenceAlignment: Pointer
ReflowComments: true
RemoveBracesLLVM: false
RemoveParentheses: Leave
RemoveSemicolon: false
RequiresClausePosition: OwnLine
RequiresExpressionIndentation: OuterScope
SeparateDefinitionBlocks: Leave
ShortNamespaceLines: 1
SkipMacroDefinitionBody: false
SortIncludes: CaseSensitive
SortJavaStaticImport: Before
SortUsingDeclarations: LexicographicNumeric
SpaceAfterCStyleCast: false
SpaceAfterLogicalNot: false
SpaceAfterTemplateKeyword: true
SpaceAroundPointerQualifiers: Default
SpaceBeforeAssignmentOperators: true
SpaceBeforeCaseColon: false
SpaceBeforeCpp11BracedList: false
SpaceBeforeCtorInitializerColon: true
SpaceBeforeInheritanceColon: true
SpaceBeforeJsonColon: false
SpaceBeforeParens: ControlStatements
SpaceBeforeParensOptions:
AfterControlStatements: true
AfterForeachMacros: true
AfterFunctionDefinitionName: false
AfterFunctionDeclarationName: false
AfterIfMacros: true
AfterOverloadedOperator: false
AfterPlacementOperator: true
AfterRequiresInClause: false
AfterRequiresInExpression: false
BeforeNonEmptyParentheses: false
SpaceBeforeRangeBasedForLoopColon: true
SpaceBeforeSquareBrackets: false
SpaceInEmptyBlock: false
SpacesBeforeTrailingComments: 1
SpacesInAngles: Never
SpacesInContainerLiterals: true
SpacesInLineCommentPrefix:
Minimum: 1
Maximum: -1
SpacesInParens: Never
SpacesInParensOptions:
InCStyleCasts: false
InConditionalStatements: false
InEmptyParentheses: false
Other: false
SpacesInSquareBrackets: false
Standard: Latest
StatementAttributeLikeMacros:
- Q_EMIT
StatementMacros:
- Q_UNUSED
- QT_REQUIRE_VERSION
TabWidth: 8
UseTab: Never
VerilogBreakBetweenInstancePorts: true
WhitespaceSensitiveMacros:
- BOOST_PP_STRINGIZE
- CF_SWIFT_NAME
- NS_SWIFT_NAME
- PP_STRINGIZE
- STRINGIZE
...

3
.gitignore vendored Normal file

@@ -0,0 +1,3 @@
O.*
.*ignore
schemas/

6
.gitmodules vendored Normal file

@@ -0,0 +1,6 @@
[submodule "dep/streaming-data-types"]
path = dep/streaming-data-types
url = git@gitea.psi.ch:lin-controls/streaming-data-types.git
[submodule "dep/flatbuffers"]
path = dep/flatbuffers
url = git@gitea.psi.ch:lin-controls/flatbuffers.git

30
Makefile Normal file

@@ -0,0 +1,30 @@
# Include the external Makefile
include /ioc/tools/driver.makefile
MODULE=StreamGenerator
BUILDCLASSES=Linux
EPICS_VERSIONS=7.0.7
#ARCH_FILTER=RHEL%
ARCH_FILTER=linux-x86_64
# Additional module dependencies
REQUIRED+=asyn
DBDS += src/asynStreamGeneratorDriver.dbd
# DB files to include in the release
TEMPLATES += db/channels.db db/daq_common.db
# HEADERS += src/asynStreamGeneratorDriver.h
# Source files to build
SOURCES += src/asynStreamGeneratorDriver.cpp
# I don't think specifying the optimisation level like this is correct...
# but it doesn't hurt :D
USR_CFLAGS += -O3 -Wall -Wextra -Wunused-result -Werror -fvisibility=hidden # -Wpedantic // Does not work because EPICS macros trigger warnings
# Required to support EV42/44
USR_CXXFLAGS += -O3 -I../dep/flatbuffers/include/ -I../schemas
LIB_SYS_LIBS += rdkafka

41
README.md Normal file

@@ -0,0 +1,41 @@
# StreamGenerator
Clone the repository to a local directory via:
```
git clone --recurse-submodules -j8 git@gitea.psi.ch:lin-instrument-computers/StreamGenerator.git
```
## Dependencies
Currently, this project requires a system install of librdkafka. On Red Hat,
this means you should run:
```bash
dnf install -y librdkafka-devel
```
Additionally, you must build Google's *flatbuffers* compiler and generate
headers from ESS's *streaming-data-types* schemas; both projects are included
as submodules under the `dep` directory, and both are necessary to build this
project.
First, enter the `dep/flatbuffers` directory and run the following:
```bash
cmake -G "Unix Makefiles"
make -j
```
After these steps, the `flatc` compiler will have been built and placed in
that directory.
Next, return to the top of this project's directory tree and generate the
flatbuffer headers from ESS's schema files:
```bash
./dep/flatbuffers/flatc -o schemas/ --cpp --gen-mutable --gen-name-strings --scoped-enums ./dep/streaming-data-types/schemas/*
```
This generates header files from each of ESS's schemas and places them in a
schemas directory.
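As a quick sanity check of the generated headers, the following minimal
sketch (hypothetical and not part of this repository; it assumes `schemas/`
and `dep/flatbuffers/include/` are on the include path, as in the Makefile)
builds an `ev42` event message the same way the driver does:
```cpp
// check_ev42.cpp: hypothetical example, not part of this repository
#include "ev42_events_generated.h"

#include <cstdint>
#include <vector>

int main() {
    flatbuffers::FlatBufferBuilder builder(1024);
    std::vector<uint32_t> tof = {100, 200, 300}; // per-event time of flight
    std::vector<uint32_t> did = {1, 2, 3};       // per-event detector/pixel id
    auto message = CreateEventMessageDirect(builder, "detector",
                                            /* message_id */ 0,
                                            /* pulse_time */ 0, &tof, &did);
    builder.Finish(message, "ev42"); // "ev42" is the schema file identifier
    // builder.GetBufferPointer() and builder.GetSize() give the bytes that
    // the driver later hands to rd_kafka_producev()
    return 0;
}
```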

75
db/channels.db Normal file

@@ -0,0 +1,75 @@
# EPICS Database for streamdevice specific to measurement channels
#
# Macros
# INSTR - Prefix
# NAME - the device name, e.g. EL737
# PORT - StreamGenerator Port
# CHANNEL - the number associated with the measurement channel
################################################################################
# Status Variables
# Trigger a change in status as clearing
record(bo, "$(INSTR)$(NAME):T$(CHANNEL)")
{
field(DESC, "Trigger Clearing Status")
field(VAL, 1)
field(OUT, "$(INSTR)$(NAME):S$(CHANNEL) PP")
}
# Trigger a change in status as value returned to 0
record(seq, "$(INSTR)$(NAME):O$(CHANNEL)")
{
field(DESC, "Trigger Returned to 0 Status")
field(LNK0, "$(INSTR)$(NAME):S$(CHANNEL) PP")
field(DO0, 0)
field(SELM, "Specified")
field(SELL, "$(INSTR)$(NAME):M$(CHANNEL).VAL")
field(SCAN, ".1 second")
}
# Current Status of Channel, i.e. is it ready to count?
record(bi, "$(INSTR)$(NAME):S$(CHANNEL)")
{
field(DESC, "Channel Status")
field(VAL, 0)
field(ZNAM, "OK")
field(ONAM, "CLEARING")
field(PINI, 1)
}
################################################################################
# Count Commands
record(longout, "$(INSTR)$(NAME):C$(CHANNEL)")
{
field(DESC, "Clear the current channel count")
field(DTYP, "asynInt32")
field(OUT, "@asyn($(PORT),0,$(TIMEOUT=1)) C_$(CHANNEL)")
field(FLNK, "$(INSTR)$(NAME):T$(CHANNEL)")
}
################################################################################
# Read all monitors values
record(int64in, "$(INSTR)$(NAME):M$(CHANNEL)")
{
field(DESC, "DAQ CH$(CHANNEL)")
field(EGU, "cts")
field(DTYP, "asynInt64")
field(INP, "@asyn($(PORT),0,$(TIMEOUT=1)) COUNTS$(CHANNEL)")
# This is probably too fast. We could trigger things the same way as sinqDAQ to ensure the db is updated in the same order
# field(SCAN, "I/O Intr")
field(PINI, "YES")
}
record(ai, "$(INSTR)$(NAME):R$(CHANNEL)")
{
field(DESC, "Rate of DAQ CH$(CHANNEL)")
field(EGU, "cts/sec")
field(DTYP, "asynInt32")
field(INP, "@asyn($(PORT),0,$(TIMEOUT=1)) RATE$(CHANNEL)")
field(SCAN, ".2 second")
# field(SCAN, "I/O Intr")
field(PINI, "YES")
}

277
db/daq_common.db Normal file

@@ -0,0 +1,277 @@
# EPICS Database for streamdevice specific to measurement channels
#
# Macros
# INSTR - Prefix
# NAME - the device name, e.g. EL737
# PORT - StreamGenerator Port
record(longout, "$(INSTR)$(NAME):FULL-RESET")
{
field(DESC, "Reset the DAQ")
field(DTYP, "asynInt32")
field(OUT, "@asyn($(PORT),0,$(TIMEOUT=1)) RESET")
}
################################################################################
# Status Variables
# We separate the RAW-STATUS and the STATUS PV so that the state can be
# updated in a sequence that guarantees we include the most recent time and
# counts before the status switches back to Idle.
# We do this via a sequenced update:
#
# RAW-STATUS -> ELAPSED-SECONDS -> M* -> STATUS
record(mbbi, "$(INSTR)$(NAME):RAW-STATUS")
{
field(DESC, "DAQ Status")
field(DTYP, "asynInt32")
field(INP, "@asyn($(PORT),0,$(TIMEOUT=1)) STATUS")
field(ZRVL, "0")
field(ZRST, "Idle")
field(ONVL, "1")
field(ONST, "Counting")
field(TWVL, "2")
field(TWST, "Low rate")
field(THVL, "3")
field(THST, "Paused")
# 4 should never happen; if it does, it means the DAQ reports undocumented status bits
field(FRVL, "4")
field(FRST, "INVALID")
# This is probably too fast. We could trigger things the same way as sinqDAQ to ensure the db is updated in the same order
#field(SCAN, "I/O Intr")
field(SCAN, ".2 second")
field(FLNK, "$(INSTR)$(NAME):READALL")
field(PINI, "YES")
}
record(fanout, "$(INSTR)$(NAME):READALL")
{
field(SELM, "All")
field(LNK0, "$(INSTR)$(NAME):ELAPSED-TIME PP")
field(LNK1, "$(INSTR)$(NAME):M0")
field(LNK2, "$(INSTR)$(NAME):M1")
field(LNK3, "$(INSTR)$(NAME):M2")
field(LNK4, "$(INSTR)$(NAME):M3")
field(LNK5, "$(INSTR)$(NAME):M4")
# Doesn't seem to be a problem to have more in here :D
# field(LNK6, "$(INSTR)$(NAME):M5")
# field(LNK7, "$(INSTR)$(NAME):M6")
field(FLNK, "$(INSTR)$(NAME):STATUS")
}
record(mbbi, "$(INSTR)$(NAME):STATUS")
{
field(INP, "$(INSTR)$(NAME):RAW-STATUS NPP")
field(DESC, "DAQ Status")
field(ZRVL, "0")
field(ZRST, "Idle")
field(ONVL, "1")
field(ONST, "Counting")
field(TWVL, "2")
field(TWST, "Low rate")
field(THVL, "3")
field(THST, "Paused")
# 4 should never happen; if it does, it means the DAQ reports undocumented status bits
field(FRVL, "4")
field(FRST, "INVALID")
field(PINI, "YES")
}
record(longin, "$(INSTR)$(NAME):CHANNELS")
{
field(DESC, "Total Supported Channels")
field(VAL, $(CHANNELS))
field(DISP, 1)
}
# Trigger a change in status as clearing
record(bo, "$(INSTR)$(NAME):ETT")
{
field(DESC, "Trigger Clearing Status")
field(VAL, 1)
field(OUT, "$(INSTR)$(NAME):ETS PP")
}
# Trigger a change in status as value returned to 0
record(seq, "$(INSTR)$(NAME):ETO")
{
field(DESC, "Trigger Returned to 0 Status")
field(LNK0, "$(INSTR)$(NAME):ETS PP")
field(DO0, 0)
field(SELM, "Specified")
field(SELL, "$(INSTR)$(NAME):ELAPSED-TIME.VAL")
field(SCAN, ".1 second")
}
# Current Status of Channel, i.e. is it ready to count?
record(bi, "$(INSTR)$(NAME):ETS")
{
field(DESC, "Channel Status")
field(VAL, 0)
field(ZNAM, "OK")
field(ONAM, "CLEARING")
field(PINI, 1)
}
################################################################################
# Count Commands
record(ao,"$(INSTR)$(NAME):PRESET-COUNT")
{
field(DESC, "Count until preset reached")
field(DTYP, "asynInt32")
field(OUT, "@asyn($(PORT),0,$(TIMEOUT=1)) P_CNT")
field(VAL, 0)
field(PREC, 2)
}
record(ao,"$(INSTR)$(NAME):PRESET-TIME")
{
field(DESC, "Count for specified time")
field(EGU, "seconds")
field(DTYP, "asynInt32")
field(OUT, "@asyn($(PORT),0,$(TIMEOUT=1)) P_TIME")
field(VAL, 0)
field(PREC, 2)
}
# record(bo,"$(INSTR)$(NAME):PAUSE")
# {
# field(DESC, "Pause the current count")
# field(DTYP, "stream")
# field(OUT, "@... pauseCount($(INSTR)$(NAME):) $(PORT)")
# field(VAL, "0")
# field(FLNK, "$(INSTR)$(NAME):RAW-STATUS")
# }
#
# record(bo,"$(INSTR)$(NAME):CONTINUE")
# {
# field(DESC, "Continue with a count that was paused")
# field(DTYP, "stream")
# field(OUT, "@... continueCount($(INSTR)$(NAME):) $(PORT)")
# field(VAL, "0")
# field(FLNK, "$(INSTR)$(NAME):RAW-STATUS")
# }
record(longout, "$(INSTR)$(NAME):STOP")
{
field(DESC, "Stop the current counting operation")
field(DTYP, "asynInt32")
field(OUT, "@asyn($(PORT),0,$(TIMEOUT=1)) STOP")
}
record(longout, "$(INSTR)$(NAME):MONITOR-CHANNEL")
{
field(DESC, "PRESET-COUNT Monitors this channel")
field(DTYP, "asynInt32")
field(OUT, "@asyn($(PORT),0,$(TIMEOUT=1)) MONITOR")
field(DRVL, "0") # Smallest Monitor Channel
field(DRVH, "$(CHANNELS)") # Largest Monitor Channel
}
record(longin, "$(INSTR)$(NAME):MONITOR-CHANNEL_RBV")
{
field(DESC, "PRESET-COUNT Monitors this channel")
field(DTYP, "asynInt32")
field(INP, "@asyn($(PORT),0,$(TIMEOUT=1)) MONITOR")
field(SCAN, "I/O Intr")
field(PINI, "YES")
}
record(ao,"$(INSTR)$(NAME):THRESHOLD")
{
field(DESC, "Minimum rate for counting to proceed")
field(DTYP, "asynInt32")
field(OUT, "@asyn($(PORT),0,$(TIMEOUT=1)) THRESH")
field(VAL, "1") # Default Rate
field(DRVL, "1") # Minimum Rate
field(DRVH, "100000") # Maximum Rate
}
record(ai,"$(INSTR)$(NAME):THRESHOLD_RBV")
{
field(DESC, "Minimum rate for counting to proceed")
field(EGU, "cts/sec")
field(DTYP, "asynInt32")
field(INP, "@asyn($(PORT),0,$(TIMEOUT=1)) THRESH")
field(SCAN, "I/O Intr")
field(PINI, "YES")
}
record(longout,"$(INSTR)$(NAME):THRESHOLD-MONITOR")
{
field(DESC, "Channel monitored for minimum rate")
field(DTYP, "asynInt32")
field(OUT, "@asyn($(PORT),0,$(TIMEOUT=1)) THRESH_CH")
field(VAL, "1") # Monitor
field(DRVL, "0") # Smallest Threshold Channel (0 is off)
field(DRVH, "$(CHANNELS)") # Largest Threshold Channel
}
record(longin,"$(INSTR)$(NAME):THRESHOLD-MONITOR_RBV")
{
field(DESC, "Channel monitored for minimum rate")
field(EGU, "CH")
field(DTYP, "asynInt32")
field(INP, "@asyn($(PORT),0,$(TIMEOUT=1)) THRESH_CH")
field(SCAN, "I/O Intr")
field(PINI, "YES")
}
record(longout, "$(INSTR)$(NAME):CT")
{
field(DESC, "Clear the timer")
field(DTYP, "asynInt32")
field(OUT, "@asyn($(PORT),0,$(TIMEOUT=1)) C_TIME")
field(FLNK, "$(INSTR)$(NAME):ETT")
}
################################################################################
# Read all monitors values
record(ai, "$(INSTR)$(NAME):ELAPSED-TIME")
{
field(DESC, "DAQ Measured Time")
field(EGU, "sec")
field(DTYP, "asynFloat64")
field(INP, "@asyn($(PORT),0,$(TIMEOUT=1)) TIME")
# field(SCAN, "I/O Intr")
field(PINI, "YES")
# field(FLNK, "$(INSTR)$(NAME):ETO")
}
################################################################################
# Stream Generator Status PVs
record(longin,"$(INSTR)$(NAME):UDP_DROPPED")
{
field(DESC, "UDP Packets Missed")
field(EGU, "Events")
field(DTYP, "asynInt32")
field(INP, "@asyn($(PORT),0,$(TIMEOUT=1)) DROP")
# field(SCAN, "I/O Intr")
field(SCAN, "1 second")
field(PINI, "YES")
}
record(longin,"$(INSTR)$(NAME):UDP_WATERMARK")
{
field(DESC, "UDP Queue Usage")
field(EGU, "%")
field(DTYP, "asynInt32")
field(INP, "@asyn($(PORT),0,$(TIMEOUT=1)) UDP")
# field(SCAN, "I/O Intr")
field(SCAN, "1 second")
field(PINI, "YES")
}
record(longin,"$(INSTR)$(NAME):SORTED_WATERMARK")
{
field(DESC, "Partial Sort Queue Usage")
field(EGU, "%")
field(DTYP, "asynInt32")
field(INP, "@asyn($(PORT),0,$(TIMEOUT=1)) SORT")
# field(SCAN, "I/O Intr")
field(SCAN, "1 second")
field(PINI, "YES")
}

1
dep/flatbuffers Submodule

Submodule dep/flatbuffers added at 1872409707


@@ -1,4 +0,0 @@
confluent-kafka==2.12.1
ess-streaming-data-types==0.27.0
flatbuffers==25.9.23
numpy==1.26.3

9
scripts/ioc.sh Executable file

@@ -0,0 +1,9 @@
#!/bin/bash
export EPICS_HOST_ARCH=linux-x86_64
export EPICS_BASE=/usr/local/epics/base-7.0.7
PARENT_PATH="$( cd "$(dirname "${BASH_SOURCE[0]}")" ; pwd -P )"
# /usr/local/bin/procServ -o -L - -f -i ^D^C 20001 "${PARENT_PATH}/st.cmd" -d
${PARENT_PATH}/st.cmd

38
scripts/st.cmd Executable file

@@ -0,0 +1,38 @@
#!/usr/local/bin/iocsh
#-d
on error break
require StreamGenerator, test
epicsEnvSet("INSTR", "SQ:TEST:")
epicsEnvSet("NAME", "SG")
# Local UDP Generator Test Config
# drvAsynIPPortConfigure("ASYN_IP_PORT", "127.0.0.1:9071:54321 UDP", 0, 0, 1)
# Correlation Unit Config
drvAsynIPPortConfigure("ASYN_IP_PORT", "172.28.69.20:54321:54321 UDP", 0, 0, 1)
# With a udpQueue and sortQueue size of 10'000 packets, we can hold in memory
# 10'000 * 243 = 2.43e6 events
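# asynStreamGenerator arguments (as registered in
# src/asynStreamGeneratorDriver.cpp): portName, ipPortName, numChannels,
# udpQueueSize, kafkaBroker, monitorTopic, detectorTopic, kafkaQueueSize,
# kafkaMaxPacketSize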
# Kafka Broker and Topic Configuration
# asynStreamGenerator("ASYN_SG", "ASYN_IP_PORT", 4, 10000, "linkafka01:9092", "NEWEFU_TEST", "NEWEFU_TEST2", 10000, 20480)
# asynStreamGenerator("ASYN_SG", "ASYN_IP_PORT", 4, 10000, "ess01:9092", "NEWEFU_TEST", "NEWEFU_TEST2", 10000, 20480)
# Don't send any kafka messages
asynStreamGenerator("ASYN_SG", "ASYN_IP_PORT", 4, 10000, "", "", "", 0, 0)
dbLoadRecords("$(StreamGenerator_DB)daq_common.db", "INSTR=$(INSTR), NAME=$(NAME), PORT=ASYN_SG, CHANNELS=5")
# Detector Count Channel
dbLoadRecords("$(StreamGenerator_DB)channels.db", "INSTR=$(INSTR), NAME=$(NAME), PORT=ASYN_SG, CHANNEL=0")
# Monitor Channels
dbLoadRecords("$(StreamGenerator_DB)channels.db", "INSTR=$(INSTR), NAME=$(NAME), PORT=ASYN_SG, CHANNEL=1")
dbLoadRecords("$(StreamGenerator_DB)channels.db", "INSTR=$(INSTR), NAME=$(NAME), PORT=ASYN_SG, CHANNEL=2")
dbLoadRecords("$(StreamGenerator_DB)channels.db", "INSTR=$(INSTR), NAME=$(NAME), PORT=ASYN_SG, CHANNEL=3")
dbLoadRecords("$(StreamGenerator_DB)channels.db", "INSTR=$(INSTR), NAME=$(NAME), PORT=ASYN_SG, CHANNEL=4")
iocInit()

118
scripts/udp_gen.py Normal file

@@ -0,0 +1,118 @@
import socket
import time
import random
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
header = [
0, 0, # buffer length in 16bit words (1, 0) == 1, (0, 1) == 256
0, 0x80, # buffer type (probably should be 0)
21, 0, # header length
0, 0, # buffer number
0, 0, # run id
0x3, # status
0, # id of sending module
0, 0, # timestamp low
0, 0, # timestamp mid
0, 0, # timestamp high
] + [0, 0] * 12 # parameters
data = [
0,
0,
0,
0,
0,
0
]
start_time = time.time_ns() // 100
buffer_ids = {
i: (0, 0) for i in range(10)
}
while True:
# update timestamp
base_timestamp = time.time_ns() // 100 - start_time
t_low = base_timestamp & 0xffff
t_mid = (base_timestamp >> 16) & 0xffff
t_high = (base_timestamp >> 32) & 0xffff
header[12] = t_low & 0xff
header[13] = t_low >> 8
header[14] = t_mid & 0xff
header[15] = t_mid >> 8
header[16] = t_high & 0xff
header[17] = t_high >> 8
num_events = random.randint(0, 243)
# num_events = 243
# num_events = 1
# update buffer length
buffer_length = 21 + num_events * 3
header[0] = buffer_length & 0xff
header[1] = (buffer_length >> 8) & 0xff
# I believe that in our case we never mix monitor and detector events, as
# the monitors should have id 0 and the detector events 1-9, so I have
# excluded that possibility here. If true, that would also mean we could
# reduce the number of checks on the parsing side of things...
is_monitor = random.randint(0, 9)
# is_monitor = 4
header[11] = 0 if is_monitor > 3 else random.randint(1,9)
# update buffer number (each mcpdid has its own buffer number count)
header[6], header[7] = buffer_ids[header[11]]
header[6] = (header[6] + 1) % (0xff + 1)
header[7] = (header[7] + (header[6] == 0)) % (0xff + 1)
buffer_ids[header[11]] = header[6], header[7]
tosend = list(header)
if is_monitor > 3:
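# Monitor event layout (48 bits, LSB first):
# timestamp:19 | data:21 | data_id:4 | trigger_id:3 | id:1
# matching the packed MonitorEvent bit-fields in the driver header; only the
# timestamp, data_id and id fields are populated here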
for i in range(num_events):
d = list(data)
monitor = random.randint(0,3)
# monitor = 0
d[5] = (1 << 7) | monitor
# update trigger timestamp
event_timestamp = (time.time_ns() // 100) - base_timestamp
d[0] = event_timestamp & 0xff
d[1] = (event_timestamp >> 8) & 0xff
d[2] = (event_timestamp >> 16) & 0x07
tosend += d
else:
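# Detector event layout (48 bits, LSB first):
# timestamp:19 | x:10 | y:10 | amplitude:8 | id:1
# matching the packed DetectorEvent bit-fields in the driver header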
for i in range(num_events):
d = list(data)
amplitude = random.randint(0, 255)
x_pos = random.randint(0, 1023)
y_pos = random.randint(0, 1023)
event_timestamp = (time.time_ns() // 100) - base_timestamp
d[5] = (0 << 7) | (amplitude >> 1)
d[4] = ((amplitude & 0x01) << 7) | (y_pos >> 3)
d[3] = ((y_pos << 5) & 0xE0) | (x_pos >> 5)
d[2] = ((x_pos << 3) & 0xF8)
d[0] = event_timestamp & 0xff
d[1] = (event_timestamp >> 8) & 0xff
d[2] |= (event_timestamp >> 16) & 0x07
tosend += d
sock.sendto(bytes(tosend), ('127.0.0.1', 54321))
mv = memoryview(bytes(header)).cast('H')
print(f'Sent packet {mv[3]} with {num_events} events {base_timestamp}')
# time.sleep(.01)

910
src/asynStreamGeneratorDriver.cpp Normal file

@@ -0,0 +1,910 @@
#include "asynOctetSyncIO.h"
#include "ev42_events_generated.h"
#include <cmath>
#include <cstring>
#include <epicsStdio.h>
#include <iocsh.h>
#include <queue>
// Just for printing
#define __STDC_FORMAT_MACROS
#include <inttypes.h>
#include "asynStreamGeneratorDriver.h"
#include <epicsExport.h>
/*******************************************************************************
* Kafka Methods
*/
static void set_kafka_config_key(rd_kafka_conf_t *conf, char *key,
char *value) {
char errstr[512];
rd_kafka_conf_res_t res;
res = rd_kafka_conf_set(conf, key, value, errstr, sizeof(errstr));
if (res != RD_KAFKA_CONF_OK) {
epicsStdoutPrintf("Failed to set config value %s : %s\n", key, value);
exit(1);
}
}
static rd_kafka_t *create_kafka_producer(const char *kafkaBroker) {
char errstr[512];
rd_kafka_t *producer;
// Prepare configuration object
rd_kafka_conf_t *conf = rd_kafka_conf_new();
// TODO: the const_cast here doesn't feel great
set_kafka_config_key(conf, "bootstrap.servers",
const_cast<char *>(kafkaBroker));
set_kafka_config_key(conf, "queue.buffering.max.messages", "10000000");
// With 2e6 counts / s
// and a packet size of 20480 events (163920 bytes)
// this implies we need to send around 100 messages a second
// and need about .2 gigabit upload
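// (2e6 / 20480 ≈ 98 messages/s; 98 * 163920 B ≈ 16 MB/s ≈ 0.13 Gbit/s)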
// set_kafka_config_key(conf, "queue.buffering.max.kbytes", "10000000");
// Create the Producer
producer = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, sizeof(errstr));
if (!producer) {
epicsStdoutPrintf("Failed to create Kafka Producer: %s\n", errstr);
exit(1);
}
return producer;
}
/*******************************************************************************
* Static Methods Passed to Epics Threads that should run in the background
*/
static void udpPollerTask(void *drvPvt) {
asynStreamGeneratorDriver *pSGD = (asynStreamGeneratorDriver *)drvPvt;
pSGD->receiveUDP();
}
static void udpNormaliserTask(void *drvPvt) {
asynStreamGeneratorDriver *pSGD = (asynStreamGeneratorDriver *)drvPvt;
pSGD->normaliseUDP();
}
static void sortTask(void *drvPvt) {
asynStreamGeneratorDriver *pSGD = (asynStreamGeneratorDriver *)drvPvt;
pSGD->partialSortEvents();
}
static void daqTask(void *drvPvt) {
asynStreamGeneratorDriver *pSGD = (asynStreamGeneratorDriver *)drvPvt;
pSGD->processEvents();
}
static void monitorProducerTask(void *drvPvt) {
asynStreamGeneratorDriver *pSGD = (asynStreamGeneratorDriver *)drvPvt;
pSGD->produceMonitor();
}
static void detectorProducerTask(void *drvPvt) {
asynStreamGeneratorDriver *pSGD = (asynStreamGeneratorDriver *)drvPvt;
pSGD->produceDetector();
}
/*******************************************************************************
* Stream Generator Helper Methods
*/
asynStatus asynStreamGeneratorDriver::createInt32Param(
asynStatus status, char *name, int *variable, epicsInt32 initialValue) {
// TODO should show error if there is one
return (asynStatus)(status | createParam(name, asynParamInt32, variable) |
setIntegerParam(*variable, initialValue));
}
asynStatus asynStreamGeneratorDriver::createInt64Param(
asynStatus status, char *name, int *variable, epicsInt64 initialValue) {
// TODO should show error if there is one
return (asynStatus)(status | createParam(name, asynParamInt64, variable) |
setInteger64Param(*variable, initialValue));
}
asynStatus asynStreamGeneratorDriver::createFloat64Param(asynStatus status,
char *name,
int *variable,
double initialValue) {
// TODO should show error if there is one
return (asynStatus)(status | createParam(name, asynParamFloat64, variable) |
setDoubleParam(*variable, initialValue));
}
/*******************************************************************************
* Stream Generator Methods
*/
asynStreamGeneratorDriver::asynStreamGeneratorDriver(
const char *portName, const char *ipPortName, const int numChannels,
const int udpQueueSize, const bool enableKafkaStream,
const char *kafkaBroker, const char *monitorTopic,
const char *detectorTopic, const int kafkaQueueSize,
const int kafkaMaxPacketSize)
: asynPortDriver(portName, 1, /* maxAddr */
asynInt32Mask | asynInt64Mask | asynFloat64Mask |
asynDrvUserMask, /* Interface mask */
asynInt32Mask, // | asynFloat64Mask, /* Interrupt mask */
0, /* asynFlags. This driver does not block and is not
multi-device. It has a destructor, but ASYN_DESTRUCTIBLE
is not set because our version of asyn is too old to
support this flag */
1, /* Autoconnect */
0, /* Default priority */
0), /* Default stack size*/
num_channels(numChannels + 1), kafkaEnabled(enableKafkaStream),
monitorTopic(monitorTopic), detectorTopic(detectorTopic),
udpQueueSize(udpQueueSize), kafkaQueueSize(kafkaQueueSize),
// measured in max packet sizes
udpQueue(
epicsRingBytesCreate(243 * udpQueueSize * sizeof(NormalisedEvent))),
normalisedQueue(
epicsRingBytesCreate(243 * udpQueueSize * sizeof(NormalisedEvent))),
// TODO configurable sizes
sortedQueue(
epicsRingBytesCreate(243 * udpQueueSize * sizeof(NormalisedEvent))),
monitorQueue(
epicsRingBytesCreate(243 * kafkaQueueSize * sizeof(NormalisedEvent))),
detectorQueue(
epicsRingBytesCreate(243 * kafkaQueueSize * sizeof(NormalisedEvent))),
kafkaMaxPacketSize(kafkaMaxPacketSize) {
const char *functionName = "asynStreamGeneratorDriver";
// Parameter Setup
asynStatus status = asynSuccess;
status = createInt32Param(status, P_StatusString, &P_Status, STATUS_IDLE);
status = createInt32Param(status, P_ResetString, &P_Reset);
status = createInt32Param(status, P_StopString, &P_Stop);
status = createInt32Param(status, P_CountPresetString, &P_CountPreset);
status = createInt32Param(status, P_TimePresetString, &P_TimePreset);
status = createFloat64Param(status, P_ElapsedTimeString, &P_ElapsedTime);
status =
createInt32Param(status, P_ClearElapsedTimeString, &P_ClearElapsedTime);
status =
createInt32Param(status, P_MonitorChannelString, &P_MonitorChannel);
status = createInt32Param(status, P_ThresholdString, &P_Threshold, 1);
status = createInt32Param(status, P_ThresholdChannelString,
&P_ThresholdChannel, 1);
// Create Parameters templated on Channel Number
char pv_name_buffer[100];
P_Counts = new int[this->num_channels];
P_Rates = new int[this->num_channels];
P_ClearCounts = new int[this->num_channels];
for (std::size_t i = 0; i < this->num_channels; ++i) {
memset(pv_name_buffer, 0, 100);
epicsSnprintf(pv_name_buffer, 100, P_CountsString, i);
status = createInt64Param(status, pv_name_buffer, P_Counts + i);
memset(pv_name_buffer, 0, 100);
epicsSnprintf(pv_name_buffer, 100, P_RateString, i);
status = createInt32Param(status, pv_name_buffer, P_Rates + i);
memset(pv_name_buffer, 0, 100);
epicsSnprintf(pv_name_buffer, 100, P_ClearCountsString, i);
status = createInt32Param(status, pv_name_buffer, P_ClearCounts + i);
}
status = createInt32Param(status, P_UdpDroppedString, &P_UdpDropped);
status = createInt32Param(status, P_UdpQueueHighWaterMarkString,
&P_UdpQueueHighWaterMark);
status = createInt32Param(status, P_SortedQueueHighWaterMarkString,
&P_SortedQueueHighWaterMark);
if (status) {
epicsStdoutPrintf(
"%s:%s: failed to create or setup parameters, status=%d\n",
driverName, functionName, status);
exit(1);
}
// Create Events
// this->pausedEventId = epicsEventCreate(epicsEventEmpty);
if (enableKafkaStream) {
epicsStdoutPrintf(
"Detector Kafka Config: broker=%s, topic=%s\n "
" queue size:%d, max events per packet: %d\n",
kafkaBroker, this->detectorTopic, kafkaQueueSize,
this->kafkaMaxPacketSize);
epicsStdoutPrintf(
"Monitors Kafka Config: broker=%s, topic=%s\n "
" queue size:%d, max events per packet: %d\n",
kafkaBroker, this->monitorTopic, kafkaQueueSize,
this->kafkaMaxPacketSize);
this->monitorProducer = create_kafka_producer(kafkaBroker);
this->detectorProducer = create_kafka_producer(kafkaBroker);
// Setup for Thread Producing Monitor Kafka Events
status =
(asynStatus)(epicsThreadCreate(
"monitor_produce", epicsThreadPriorityMedium,
epicsThreadGetStackSize(epicsThreadStackMedium),
(EPICSTHREADFUNC)::monitorProducerTask,
this) == NULL);
if (status) {
epicsStdoutPrintf("%s:%s: epicsThreadCreate failure, status=%d\n",
driverName, functionName, status);
exit(1);
}
// Setup for Thread Producing Detector Kafka Events
status =
(asynStatus)(epicsThreadCreate(
"monitor_produce", epicsThreadPriorityMedium,
epicsThreadGetStackSize(epicsThreadStackMedium),
(EPICSTHREADFUNC)::detectorProducerTask,
this) == NULL);
if (status) {
epicsStdoutPrintf("%s:%s: epicsThreadCreate failure, status=%d\n",
driverName, functionName, status);
exit(1);
}
} else {
epicsStdoutPrintf("Kafka Stream Disabled\n");
}
/* Create the thread that orders the events and acts as our sinqDAQ stand-in
*/
status =
(asynStatus)(epicsThreadCreate(
"sinqDAQ",
epicsThreadPriorityMedium, // epicsThreadPriorityMax,
epicsThreadGetStackSize(epicsThreadStackMedium),
(EPICSTHREADFUNC)::daqTask, this) == NULL);
if (status) {
epicsStdoutPrintf("%s:%s: epicsThreadCreate failure, status=%d\n",
driverName, functionName, status);
exit(1);
}
/* Create the thread that orders packets in preparation for our sinqDAQ
* stand-in
*/
status = (asynStatus)(epicsThreadCreate(
"partialSort", epicsThreadPriorityMedium,
epicsThreadGetStackSize(epicsThreadStackMedium),
(EPICSTHREADFUNC)::sortTask, this) == NULL);
if (status) {
epicsStdoutPrintf("%s:%s: epicsThreadCreate failure, status=%d\n",
driverName, functionName, status);
exit(1);
}
/* Create the thread that normalises the events
*/
status =
(asynStatus)(epicsThreadCreate(
"eventNormaliser",
epicsThreadPriorityMedium, // epicsThreadPriorityMax,
epicsThreadGetStackSize(epicsThreadStackMedium),
(EPICSTHREADFUNC)::udpNormaliserTask, this) == NULL);
if (status) {
epicsStdoutPrintf("%s:%s: epicsThreadCreate failure, status=%d\n",
driverName, functionName, status);
exit(1);
}
// UDP Receive Setup
status = pasynOctetSyncIO->connect(ipPortName, 0, &pasynUDPUser, NULL);
if (status) {
epicsStdoutPrintf("%s:%s: Couldn't open connection %s, status=%d\n",
driverName, functionName, ipPortName, status);
exit(1);
}
/* Create the thread that receives UDP traffic in the background */
status = (asynStatus)(epicsThreadCreate(
"udp_receive", epicsThreadPriorityMax,
epicsThreadGetStackSize(epicsThreadStackMedium),
(EPICSTHREADFUNC)::udpPollerTask, this) == NULL);
if (status) {
epicsStdoutPrintf("%s:%s: epicsThreadCreate failure, status=%d\n",
driverName, functionName, status);
exit(1);
}
}
asynStreamGeneratorDriver::~asynStreamGeneratorDriver() {
// should make sure queues are empty and freed
// and that the kafka producers are flushed and freed
delete[] P_Counts;
delete[] P_Rates;
// TODO: at exit we should perhaps ensure the queue is flushed
// rd_kafka_poll(producer, 0);
// epicsStdoutPrintf("Kafka Queue Size %d\n", rd_kafka_outq_len(producer));
// rd_kafka_flush(producer, 10 * 1000);
// epicsStdoutPrintf("Kafka Queue Size %d\n", rd_kafka_outq_len(producer));
}
asynStatus asynStreamGeneratorDriver::readInt32(asynUser *pasynUser,
epicsInt32 *value) {
int function = pasynUser->reason;
asynStatus status = asynSuccess;
const char *paramName;
const char *functionName = "readInt32";
getParamName(function, &paramName);
if (function == P_UdpQueueHighWaterMark) {
const double toPercent = 100. / (243. * udpQueueSize);
*value = (epicsInt32)(epicsRingBytesHighWaterMark(this->udpQueue) /
sizeof(NormalisedEvent) * toPercent);
// Apparently resetting the watermark causes problems...
// at least concurrently :D
// epicsRingBytesResetHighWaterMark(this->udpQueue);
return asynSuccess;
} else if (function == P_SortedQueueHighWaterMark) {
const double toPercent = 100. / (243. * udpQueueSize);
*value = (epicsInt32)(epicsRingBytesHighWaterMark(this->sortedQueue) /
sizeof(NormalisedEvent) * toPercent);
// epicsRingBytesResetHighWaterMark(this->sortedQueue);
return asynSuccess;
}
return asynPortDriver::readInt32(pasynUser, value);
}
asynStatus asynStreamGeneratorDriver::writeInt32(asynUser *pasynUser,
epicsInt32 value) {
int function = pasynUser->reason;
asynStatus status = asynSuccess;
const char *paramName;
const char *functionName = "writeInt32";
getParamName(function, &paramName);
// TODO should maybe lock mutex for this
epicsInt32 currentStatus;
status = getIntegerParam(this->P_Status, &currentStatus);
if (status) {
epicsSnprintf(pasynUser->errorMessage, pasynUser->errorMessageSize,
"%s:%s: status=%d, function=%d, name=%s, value=%d",
driverName, functionName, status, function, paramName,
value);
return status;
}
// TODO clean up
bool isClearCount = false;
size_t channelToClear;
for (size_t i = 0; i < this->num_channels; ++i) {
isClearCount |= function == P_ClearCounts[i];
if (isClearCount) {
channelToClear = i;
break;
}
}
// TODO should check everything...
if (function == P_CountPreset) {
if (!currentStatus) {
setIntegerParam(function, value);
setIntegerParam(P_Status, STATUS_COUNTING);
status = (asynStatus)callParamCallbacks();
} else {
return asynError;
}
} else if (function == P_TimePreset) {
if (!currentStatus) {
setIntegerParam(function, value);
setIntegerParam(P_Status, STATUS_COUNTING);
status = (asynStatus)callParamCallbacks();
} else {
return asynError;
}
} else if (function == P_ClearElapsedTime) {
if (!currentStatus) {
setIntegerParam(P_ElapsedTime, 0);
status = (asynStatus)callParamCallbacks();
} else {
return asynError;
}
} else if (isClearCount) {
if (!currentStatus) {
setInteger64Param(P_Counts[channelToClear], 0);
status = (asynStatus)callParamCallbacks();
} else {
return asynError;
}
} else if (function == P_Reset) {
lock();
// TODO should probably set back everything to defaults
setIntegerParam(P_Status, STATUS_IDLE);
status = (asynStatus)callParamCallbacks();
unlock();
} else if (function == P_Stop) {
lock();
setIntegerParam(P_Status, STATUS_IDLE);
status = (asynStatus)callParamCallbacks();
unlock();
} else if (function == P_MonitorChannel) {
if (!currentStatus) {
setIntegerParam(function, value);
status = (asynStatus)callParamCallbacks();
} else {
return asynError;
}
} else {
setIntegerParam(function, value);
status = (asynStatus)callParamCallbacks();
}
if (status)
epicsSnprintf(pasynUser->errorMessage, pasynUser->errorMessageSize,
"%s:%s: status=%d, function=%d, name=%s, value=%d",
driverName, functionName, status, function, paramName,
value);
return status;
}
void asynStreamGeneratorDriver::receiveUDP() {
const char *functionName = "receiveUDP";
asynStatus status = asynSuccess;
// int isConnected = 1;
std::size_t received;
int eomReason;
const std::size_t bufferSize = 1500;
char buffer[bufferSize];
while (true) {
// status = pasynManager->isConnected(pasynUDPUser, &isConnected);
// if (!isConnected)
// asynPrint(pasynUserSelf, ASYN_TRACE_ERROR,
// "%s:%s: isConnected = %d\n", driverName, functionName,
// isConnected);
status = pasynOctetSyncIO->read(pasynUDPUser, buffer, bufferSize,
0, // timeout
&received, &eomReason);
if (received) {
const uint16_t bufferLength = ((uint16_t *)buffer)[0];
const std::size_t headerLength = 42;
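// bufferLength counts 16-bit words, so a complete packet is exactly
// bufferLength * 2 bytes long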
if (received >= headerLength && received == bufferLength * 2) {
epicsRingBytesPut(this->udpQueue, (char *)buffer, bufferSize);
} else {
asynPrint(pasynUserSelf, ASYN_TRACE_ERROR,
"%s:%s: invalid UDP packet\n", driverName,
functionName);
}
}
}
}
void asynStreamGeneratorDriver::normaliseUDP() {
// TODO fix time overflows
// Regarding time overflow.
// * the header time stamp is 3 words, i.e. 48 bits.
// * it has a resolution of 100ns
// * so we can cover a maximum of (2^(3*16) - 1) * 1e-7 = 28147497 seconds
// * or about 325 days
// * so maybe this isn't necessary to solve, as long as we restart the
// electronics at least once a year...
const char *functionName = "normaliseUDP";
asynStatus status = asynSuccess;
int isConnected = 1;
std::size_t received;
int eomReason;
// The correlation unit sends messages with a maximum size of 1500 bytes.
// These messages don't have any obvious start or end to synchronise
// against...
const std::size_t bufferSize = 1500;
char buffer[bufferSize];
const std::size_t resultBufferSize = 243;
NormalisedEvent resultBuffer[resultBufferSize];
// We have 10 mcpdids
uint64_t lastBufferNumber[10];
for (size_t i = 0; i < 10; ++i) {
lastBufferNumber[i] = 0;
}
epicsInt32 droppedMessages = 0;
const UDPHeader *header;
const DetectorEvent *d_event;
const MonitorEvent *m_event;
NormalisedEvent ne;
while (true) {
if (epicsRingBytesUsedBytes(this->udpQueue) > 1500) {
epicsRingBytesGet(this->udpQueue, (char *)buffer, bufferSize);
header = (UDPHeader *)buffer;
const std::size_t total_events = (header->BufferLength - 21) / 3;
if (header->BufferNumber - lastBufferNumber[header->McpdID] > 1 &&
lastBufferNumber[header->McpdID] !=
std::numeric_limits<
decltype(header->BufferNumber)>::max()) {
asynPrint(pasynUserSelf, ASYN_TRACE_ERROR,
"%s:%s: missed packet on id: %d. Received: %" PRIu64
", last: %" PRIu64 "\n",
driverName, functionName, header->McpdID,
header->BufferNumber,
lastBufferNumber[header->McpdID]);
setIntegerParam(P_UdpDropped, ++droppedMessages);
}
lastBufferNumber[header->McpdID] = header->BufferNumber;
for (std::size_t i = 0; i < total_events; ++i) {
char *event = (buffer + 21 * 2 + i * 6);
const bool isMonitorEvent = event[5] & 0x80;
if (isMonitorEvent) {
m_event = (MonitorEvent *)event;
ne.timestamp =
header->nanosecs() + (uint64_t)m_event->nanosecs();
ne.source = 0;
ne.pixelId = m_event->DataID;
} else {
d_event = (DetectorEvent *)event;
ne.timestamp =
header->nanosecs() + (uint64_t)d_event->nanosecs();
ne.source = header->McpdID;
ne.pixelId = d_event->pixelId(header->McpdID);
}
resultBuffer[i] = ne;
}
epicsRingBytesPut(this->normalisedQueue, (char *)resultBuffer,
total_events * sizeof(NormalisedEvent));
} else {
epicsThreadSleep(0.0001); // seconds
}
}
}
struct {
bool operator()(const NormalisedEvent l, const NormalisedEvent r) const {
return l.timestamp < r.timestamp;
}
} oldestEventsFirst;
inline int eventsInQueue(epicsRingBytesId id) {
return epicsRingBytesUsedBytes(id) / sizeof(NormalisedEvent);
}
void asynStreamGeneratorDriver::partialSortEvents() {
const char *functionName = "partialSortEvents";
// x * number of ids * max events in packet
int bufferedEvents = 5 * 10 * 243;
NormalisedEvent events[bufferedEvents];
int queuedEvents = 0;
epicsTimeStamp lastSort = epicsTime::getCurrent();
epicsTimeStamp currentTime = lastSort;
while (true) {
queuedEvents =
eventsInQueue(this->normalisedQueue); // in case we can't wait
lastSort = epicsTime::getCurrent();
currentTime = lastSort;
// wait for the minimum packet frequency or enough packets to ensure we
// could potentially have at least 1 packet per mcpdid
while (queuedEvents < bufferedEvents &&
epicsTimeDiffInNS(&currentTime, &lastSort) < 250'000'000ull) {
epicsThreadSleep(0.0001); // seconds
currentTime = epicsTime::getCurrent();
queuedEvents = eventsInQueue(this->normalisedQueue);
}
queuedEvents = std::min(queuedEvents, bufferedEvents);
if (queuedEvents) {
epicsRingBytesGet(this->normalisedQueue, (char *)events,
queuedEvents * sizeof(NormalisedEvent));
std::sort(events, events + queuedEvents, oldestEventsFirst);
epicsRingBytesPut(this->sortedQueue, (char *)events,
queuedEvents * sizeof(NormalisedEvent));
}
}
}
inline void asynStreamGeneratorDriver::queueForKafka(NormalisedEvent &ne) {
if (this->kafkaEnabled) {
if (ne.source == 0)
epicsRingBytesPut(this->monitorQueue, (char *)&ne,
sizeof(NormalisedEvent));
else
epicsRingBytesPut(this->detectorQueue, (char *)&ne,
sizeof(NormalisedEvent));
}
}
void asynStreamGeneratorDriver::processEvents() {
const char *functionName = "processEvents";
// x * number of ids * max events in packet * event size
int bufferedEvents = 5 * 10 * 243;
// we need a little extra space to merge sort into
int extraBufferedEvents = 1 * 10 * 243;
// we have two buffers. We alternate between reading data into one of them,
// and then merge sorting into the other
NormalisedEvent eventsABuffer[(bufferedEvents + extraBufferedEvents)];
NormalisedEvent eventsBBuffer[(bufferedEvents + extraBufferedEvents)];
NormalisedEvent *eventsA = &eventsABuffer[0];
NormalisedEvent *eventsB = &eventsBBuffer[0];
NormalisedEvent *eventsBLastStart = eventsB + bufferedEvents;
NormalisedEvent *eventsBLastEnd = eventsBLastStart;
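// Invariant: [eventsBLastStart, eventsBLastEnd) holds the tail of the
// previous merge that has not yet been processed; each iteration merges it
// with the newly read events into the other buffer, then swaps the buffers.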
int queuedEvents = 0;
epicsTimeStamp lastProcess = epicsTime::getCurrent();
epicsTimeStamp currentTime = lastProcess;
epicsInt64 counts[this->num_channels];
double elapsedSeconds = 0;
uint64_t startTimestamp = std::numeric_limits<uint64_t>::max();
uint64_t currTimestamp;
epicsInt32 currStatus = STATUS_IDLE;
epicsInt32 prevStatus = STATUS_IDLE;
epicsInt32 countPreset;
epicsInt32 timePreset;
epicsInt32 presetChannel;
epicsInt32 udpQueueHighWaterMark = 0;
epicsInt32 sortedQueueHighWaterMark = 0;
while (true) {
queuedEvents =
eventsInQueue(this->sortedQueue); // in case we can't wait
lastProcess = epicsTime::getCurrent();
currentTime = lastProcess;
// wait for the minimum packet frequency or enough packets to ensure we
// could potentially have at least 1 packet per mcpdid
while (queuedEvents < bufferedEvents &&
epicsTimeDiffInNS(&currentTime, &lastProcess) < 250'000'000ull) {
epicsThreadSleep(0.0001); // seconds
currentTime = epicsTime::getCurrent();
queuedEvents = eventsInQueue(this->sortedQueue);
}
getIntegerParam(this->P_Status, &currStatus);
queuedEvents = std::min(queuedEvents, bufferedEvents);
NormalisedEvent *newStartPtr = eventsA + extraBufferedEvents;
// We read into the array at an offset, so that the entirety of the
// leftover from the previous read can fit before this new read, in the
// case that all new events are newer timewise and therefore all events
// from eventsB have to be placed in a preceding position.
epicsRingBytesGet(this->sortedQueue, (char *)newStartPtr,
queuedEvents * sizeof(NormalisedEvent));
int toProcess =
eventsBLastEnd - eventsBLastStart + queuedEvents * 4 / 5;
// TODO could also consider an in-place merge
eventsBLastEnd = std::merge(newStartPtr, newStartPtr + queuedEvents,
eventsBLastStart, eventsBLastEnd, eventsA,
oldestEventsFirst);
eventsBLastStart = eventsA + toProcess;
// TODO I haven't really taken care of the case that there are no events
if (prevStatus == STATUS_IDLE && currStatus == STATUS_COUNTING) {
getIntegerParam(this->P_CountPreset, &countPreset);
getIntegerParam(this->P_TimePreset, &timePreset);
getIntegerParam(this->P_MonitorChannel, &presetChannel);
// reset status variables
startTimestamp = eventsA[0].timestamp;
elapsedSeconds = 0;
for (size_t i = 0; i < this->num_channels; ++i) {
counts[i] = 0;
}
}
if (currStatus == STATUS_COUNTING) {
// The elapsedSeconds are rounded differently depending on whether we
// are using them for comparison, or for showing to the user, to
// try and make sure the data we send to kafka is correct, while
// the measurement time also appears intuitive.
for (size_t i = 0; i < toProcess; ++i) {
counts[eventsA[i].source == 0 ? eventsA[i].pixelId + 1 : 0] +=
1;
elapsedSeconds = (eventsA[i].timestamp - startTimestamp) / 1e9;
// TODO should really check that there are no more events with the
// same final timestamp
if ((countPreset && counts[presetChannel] >= countPreset) ||
(timePreset && elapsedSeconds > (double)timePreset))
break;
// TODO also batchwise?
this->queueForKafka(eventsA[i]);
}
for (size_t i = 0; i < num_channels; ++i) {
setInteger64Param(P_Counts[i], counts[i]);
}
setDoubleParam(P_ElapsedTime, elapsedSeconds);
if ((countPreset && counts[presetChannel] >= countPreset) ||
(timePreset && elapsedSeconds > (double)timePreset)) {
setIntegerParam(this->P_Status, STATUS_IDLE);
setIntegerParam(this->P_CountPreset, 0);
setIntegerParam(this->P_TimePreset, 0);
}
}
prevStatus = currStatus;
std::swap(eventsA, eventsB);
}
}
void asynStreamGeneratorDriver::produce(epicsRingBytesId eventQueue,
rd_kafka_t *kafkaProducer,
const char *topic, const char *source) {
flatbuffers::FlatBufferBuilder builder(1024);
const std::size_t bufferSize = this->kafkaMaxPacketSize + 16;
std::vector<uint32_t> tof;
tof.reserve(bufferSize);
std::vector<uint32_t> did;
did.reserve(bufferSize);
epicsTimeStamp last_sent = epicsTime::getCurrent();
epicsTimeStamp now = last_sent;
int total = 0;
uint64_t message_id = 0;
NormalisedEvent ne;
while (true) {
if (!epicsRingBytesIsEmpty(eventQueue)) {
++total;
epicsRingBytesGet(eventQueue, (char *)&ne, sizeof(NormalisedEvent));
tof.push_back(ne.timestamp);
did.push_back(ne.pixelId);
} else {
epicsThreadSleep(0.001); // seconds
}
now = epicsTime::getCurrent();
// At least every 0.25 seconds
if (total >= this->kafkaMaxPacketSize ||
epicsTimeDiffInNS(&now, &last_sent) > 250'000'000ll) {
last_sent = epicsTime::getCurrent();
if (total) {
total = 0;
builder.Clear();
auto message = CreateEventMessageDirect(
builder, source, message_id++,
((uint64_t)now.secPastEpoch) * 1'000'000'000ull +
((uint64_t)now.nsec),
&tof, &did);
builder.Finish(message, "ev42");
rd_kafka_resp_err_t err = rd_kafka_producev(
kafkaProducer, RD_KAFKA_V_TOPIC(topic),
RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY),
// RD_KAFKA_V_KEY((void *)key, key_len),
RD_KAFKA_V_VALUE((void *)builder.GetBufferPointer(),
builder.GetSize()),
// RD_KAFKA_V_OPAQUE(NULL),
RD_KAFKA_V_END);
if (err) {
epicsStdoutPrintf("Failed to produce to topic %s: %s\n",
topic, rd_kafka_err2str(err));
}
rd_kafka_poll(kafkaProducer, 0);
tof.clear();
did.clear();
}
}
}
}
void asynStreamGeneratorDriver::produceMonitor() {
this->produce(monitorQueue, monitorProducer, monitorTopic, "monitor");
}
void asynStreamGeneratorDriver::produceDetector() {
this->produce(detectorQueue, detectorProducer, detectorTopic, "detector");
}
/*******************************************************************************
* Methods exposed to IOC Shell
*/
extern "C" {
asynStatus asynStreamGeneratorDriverConfigure(
const char *portName, const char *ipPortName, const int numChannels,
const int udpQueueSize, const char *kafkaBroker, const char *monitorTopic,
const char *detectorTopic, const int kafkaQueueSize,
const int kafkaMaxPacketSize) {
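// kafkaBroker[0] serves as enableKafkaStream: an empty broker string
// disables the Kafka stream entirely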
new asynStreamGeneratorDriver(portName, ipPortName, numChannels,
udpQueueSize, kafkaBroker[0], kafkaBroker,
monitorTopic, detectorTopic, kafkaQueueSize,
kafkaMaxPacketSize);
return asynSuccess;
}
static const iocshArg initArg0 = {"portName", iocshArgString};
static const iocshArg initArg1 = {"ipPortName", iocshArgString};
static const iocshArg initArg2 = {"numChannels", iocshArgInt};
static const iocshArg initArg3 = {"udpQueueSize", iocshArgInt};
static const iocshArg initArg4 = {"kafkaBroker", iocshArgString};
static const iocshArg initArg5 = {"monitorTopic", iocshArgString};
static const iocshArg initArg6 = {"detectorTopic", iocshArgString};
static const iocshArg initArg7 = {"kafkaQueueSize", iocshArgInt};
static const iocshArg initArg8 = {"kafkaMaxPacketSize", iocshArgInt};
static const iocshArg *const initArgs[] = {&initArg0, &initArg1, &initArg2,
&initArg3, &initArg4, &initArg5,
&initArg6, &initArg7, &initArg8};
static const iocshFuncDef initFuncDef = {"asynStreamGenerator", 9, initArgs};
static void initCallFunc(const iocshArgBuf *args) {
asynStreamGeneratorDriverConfigure(
args[0].sval, args[1].sval, args[2].ival, args[3].ival, args[4].sval,
args[5].sval, args[6].sval, args[7].ival, args[8].ival);
}
void asynStreamGeneratorDriverRegister(void) {
iocshRegister(&initFuncDef, initCallFunc);
}
epicsExportRegistrar(asynStreamGeneratorDriverRegister);
}

1
src/asynStreamGeneratorDriver.dbd Normal file

@@ -0,0 +1 @@
registrar("asynStreamGeneratorDriverRegister")

187
src/asynStreamGeneratorDriver.h Normal file

@@ -0,0 +1,187 @@
#ifndef asynStreamGeneratorDriver_H
#define asynStreamGeneratorDriver_H
#include "asynPortDriver.h"
#include <epicsRingBytes.h>
#include <librdkafka/rdkafka.h>
/*******************************************************************************
* UDP Packet Definitions
*/
struct __attribute__((__packed__)) UDPHeader {
uint16_t BufferLength;
uint16_t BufferType;
uint16_t HeaderLength;
uint16_t BufferNumber;
uint16_t RunCmdID;
uint16_t Status : 8;
uint16_t McpdID : 8;
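// 48-bit timestamp in 100 ns ticks (TimeStamp[0] is the low word); see the
// overflow discussion in normaliseUDP()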
uint16_t TimeStamp[3];
uint16_t Parameter0[3];
uint16_t Parameter1[3];
uint16_t Parameter2[3];
uint16_t Parameter3[3];
inline uint64_t nanosecs() const {
uint64_t nsec{((uint64_t)TimeStamp[2]) << 32 |
((uint64_t)TimeStamp[1]) << 16 | (uint64_t)TimeStamp[0]};
return nsec * 100;
}
};
struct __attribute__((__packed__)) DetectorEvent {
uint64_t TimeStamp : 19;
uint16_t XPosition : 10;
uint16_t YPosition : 10;
uint16_t Amplitude : 8;
uint16_t Id : 1;
inline uint32_t nanosecs() const { return TimeStamp * 100; }
inline uint64_t pixelId(uint32_t mcpdId) const {
const uint32_t x_pixels = 128;
const uint32_t y_pixels = 128;
return (mcpdId - 1) * x_pixels * y_pixels +
x_pixels * (uint32_t)this->XPosition + (uint32_t)this->YPosition;
}
};
struct __attribute__((__packed__)) MonitorEvent {
uint64_t TimeStamp : 19;
uint64_t Data : 21;
uint64_t DataID : 4;
uint64_t TriggerID : 3;
uint64_t Id : 1;
inline uint32_t nanosecs() const { return TimeStamp * 100; }
};
/*******************************************************************************
* Simplified Event Struct Definition
*/
struct __attribute__((__packed__)) NormalisedEvent {
uint64_t timestamp;
uint32_t pixelId : 24;
uint8_t source;
// inline NormalisedEvent(uint64_t timestamp, uint8_t source, uint32_t
// pixelId)
// : timestamp(timestamp), source(source), pixelId(pixelId){};
};
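// Note: the packed size is 8 + 3 + 1 = 12 bytes; the driver sizes its ring
// buffers in multiples of 243 (the maximum number of events per UDP packet)
// of these, so e.g. a udpQueueSize of 10'000 packets costs roughly
// 243 * 10'000 * 12 B ≈ 29 MB per queue.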
/*******************************************************************************
* Status values that should match the definition in db/daq_common.db
*/
#define STATUS_IDLE 0
#define STATUS_COUNTING 1
#define STATUS_LOWRATE 2
#define STATUS_PAUSED 3
/*******************************************************************************
* Parameters for use in DB records
*
* i.e. drvInfo strings that are used to identify the parameters
*/
#define P_StatusString "STATUS"
#define P_ResetString "RESET"
#define P_StopString "STOP"
#define P_CountPresetString "P_CNT"
#define P_TimePresetString "P_TIME"
#define P_ElapsedTimeString "TIME"
#define P_ClearElapsedTimeString "C_TIME"
#define P_MonitorChannelString "MONITOR"
#define P_ThresholdString "THRESH"
#define P_ThresholdChannelString "THRESH_CH"
#define P_CountsString "COUNTS%d"
#define P_RateString "RATE%d"
#define P_ClearCountsString "C_%d"
#define P_UdpDroppedString "DROP"
#define P_UdpQueueHighWaterMarkString "UDP"
#define P_SortedQueueHighWaterMarkString "SORT"
/*******************************************************************************
* Stream Generator Coordinating Class
*/
class asynStreamGeneratorDriver : public asynPortDriver {
public:
asynStreamGeneratorDriver(const char *portName, const char *ipPortName,
const int numChannels, const int udpQueueSize,
const bool enableKafkaStream,
const char *kafkaBroker, const char *monitorTopic,
const char *detectorTopic,
const int kafkaQueueSize,
const int kafkaMaxPacketSize);
virtual ~asynStreamGeneratorDriver();
virtual asynStatus readInt32(asynUser *pasynUser, epicsInt32 *value);
virtual asynStatus writeInt32(asynUser *pasynUser, epicsInt32 value);
void receiveUDP();
void normaliseUDP();
void partialSortEvents();
void processEvents();
void produceMonitor();
void produceDetector();
protected:
// Parameter Identifying IDs
int P_Status;
int P_Reset;
int P_Stop;
int P_CountPreset;
int P_TimePreset;
int P_ElapsedTime;
int P_ClearElapsedTime;
int P_MonitorChannel;
int P_Threshold;
int P_ThresholdChannel;
int *P_Counts;
int *P_Rates;
int *P_ClearCounts;
// System Status Parameter Identifying IDs
int P_UdpDropped;
int P_UdpQueueHighWaterMark;
int P_SortedQueueHighWaterMark;
private:
asynUser *pasynUDPUser;
epicsEventId pausedEventId;
const int num_channels;
const bool kafkaEnabled;
const int kafkaQueueSize;
const int kafkaMaxPacketSize;
const int udpQueueSize;
epicsRingBytesId udpQueue;
epicsRingBytesId normalisedQueue;
epicsRingBytesId sortedQueue;
epicsRingBytesId monitorQueue;
rd_kafka_t *monitorProducer;
const char *monitorTopic;
epicsRingBytesId detectorQueue;
rd_kafka_t *detectorProducer;
const char *detectorTopic;
constexpr static const char *driverName = "StreamGenerator";
asynStatus createInt32Param(asynStatus status, char *name, int *variable,
epicsInt32 initialValue = 0);
asynStatus createInt64Param(asynStatus status, char *name, int *variable,
epicsInt64 initialValue = 0);
asynStatus createFloat64Param(asynStatus status, char *name, int *variable,
double initialValue = 0);
inline void queueForKafka(NormalisedEvent &ne);
void produce(epicsRingBytesId eventQueue, rd_kafka_t *kafkaProducer,
const char *topic, const char *source);
};
#endif


@@ -1,342 +0,0 @@
import queue
import socket
import time
import threading
from uuid import uuid4
import math
from confluent_kafka import Producer
import streaming_data_types
# receiving directly (can also specify correlation unit ip)
UDP_IP = ""
UDP_PORT = 54321
# If redirecting traffic via
# socat -U - udp4-recv:54321 | tee >( socat -u - udp4-datagram:127.0.0.1:54322 ) | socat -u - udp4-datagram:127.0.0.1:54323
# UDP_IP = "127.0.0.1"
# UDP_PORT = 54323
WINDOWSECONDS = 5
WINDOWSIZE = 20000 * WINDOWSECONDS
MONITORS = 4 # We have max 4 monitors
time_offset = None # Estimate of clock offset
time_window = {
i: queue.Queue(maxsize=WINDOWSIZE)
for i in range(MONITORS)
}
# event_time_window = queue.Queue(maxsize=50000 * WINDOWSECONDS)
EVENT_WINDOWSIZE = 50000
EVENT_WINDOW_PTR = 0
event_time_window = [0 for i in range(EVENT_WINDOWSIZE)]
event_average_rate = 0
event_last_timestamp = None
MISSED_PACKETS = -9 # All modules appear to miss the first time due to initialisation as 0
# missed_packets_time_window = queue.Queue(maxsize=100)
def print_monitor_rates():
while True:
for i in range(MONITORS):
msg = f"Monitor {i+1}: {time_window[i].qsize() / WINDOWSECONDS} cts/s"
try:
earliest = time_window[i].queue[0]
newest = max(time_window[i].queue)
t = time.time()
msg += f', buffer range: {round((newest - earliest) * 1e-7, 3)} s, oldest: {round(time.time() - ((time_offset + earliest) * 1e-7), 3)} s, newest: {round(time.time() - ((time_offset + newest) * 1e-7), 3)} s'
        except (IndexError, TypeError):
            # queue empty or time_offset not yet set
            pass
print(msg)
# try:
# print(f'Events: {1 / event_average_rate} cts/s')
# except:
# pass
try:
print(f'Events: {round(1 / (sum(event_time_window) / EVENT_WINDOWSIZE * 1e-7), 2)} cts/s')
    except ZeroDivisionError:
        # no inter-event gaps recorded yet
        pass
print(f'Missed Packets: {MISSED_PACKETS}')
# Detector Events
# msg = f"Events : {event_time_window.qsize() / WINDOWSECONDS} cts/s"
# try:
# earliest = event_time_window.queue[0]
# newest = max(event_time_window.queue)
# t = time.time()
# msg += f', buffer range: {round((newest - earliest) * 1e-7, 3)} s, oldest: {round(time.time() - ((time_offset + earliest) * 1e-7), 3)} s, newest: {round(time.time() - ((time_offset + newest) * 1e-7), 3)} s'
# except:
# pass
# print(msg)
time.sleep(1)
threading.Thread(target=print_monitor_rates, daemon=True).start()
def clean_monitor_rates():
latest = 0
while True:
for d_id in range(MONITORS):
t_w = time_window[d_id]
if not t_w.empty():
                # TODO probably should switch to a priority queue
                # as the messages might not be in order
                # TODO could also just be replaced with a low-pass filter,
                # which would be a lot more efficient (see the sketch after
                # this function)
                # TODO the way this is done, we need trigger events
                # for the signal to decay back to 0; if no events come,
                # the rate remains stuck
latest = max(latest, max(t_w.queue))
# latest = time_window[1].queue[-1]
try:
while t_w.queue[0] < (latest - WINDOWSECONDS * 1e7):
t_w.get_nowait()
except IndexError:
pass
time.sleep(0.01)
threading.Thread(target=clean_monitor_rates, daemon=True).start()
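# --- Hypothetical alternative (not in the original script) ---
# The TODOs above suggest replacing the sliding-window queue with a low-pass
# filter. A minimal exponentially-weighted estimator might look like this;
# the time constant `tau_s` is an assumption. Note it shares the limitation
# noted above: the estimate only updates when events arrive.
class EwmaRate:
    def __init__(self, tau_s=1.0):
        self.tau = tau_s    # filter time constant in seconds
        self.rate = 0.0     # estimated counts per second
        self.last = None    # previous event timestamp in seconds

    def update(self, ts_ticks):
        t = ts_ticks * 1e-7  # convert 100 ns ticks to seconds
        if self.last is not None:
            dt = t - self.last
            if dt > 0:
                w = math.exp(-dt / self.tau)  # math is imported at the top of this script
                # one event in dt seconds -> instantaneous rate 1/dt
                self.rate = w * self.rate + (1 - w) / dt
        self.last = t
        return self.rate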
# def clean_event_rates():
# latest = 0
# while True:
# t_w = event_time_window
# if not t_w.empty():
# # TODO probably should switch to a priority queue
# # as the messages might not be in order
# # TODO could also just replace with a low-pass filter
# # would be a lot more efficient
# # TODO the way this is done, we need trigger events
# # in order for the signal to decay back to 0.
# # If no events come, the rate remains stuck
# #latest = max(latest, max(t_w.queue))
# try:
# latest = time_window[1].queue[-1]
# while t_w.queue[0] < (latest - WINDOWSECONDS * 1e7):
# t_w.get_nowait()
# except IndexError:
# pass
# time.sleep(0.005)
#
# threading.Thread(target=clean_event_rates, daemon=True).start()
# Event Kafka Producer
event_queue = queue.Queue()
def event_producer():
producer_config = {
'bootstrap.servers': "linkafka01:9092",
        'queue.buffering.max.messages': int(1e7),  # integer property; avoid passing a float
}
prod = Producer(producer_config)
st = time.time()
msg_id = 0
b_size = 10000
b_ptr = 0
pixel_buffer = [0 for _ in range(b_size)]
time_buffer = [0 for _ in range(b_size)]
poll_cnt = 0
while True:
(p_id, timestamp) = event_queue.get()
pixel_buffer[b_ptr] = p_id
time_buffer[b_ptr] = timestamp
b_ptr += 1
nt = time.time()
if b_ptr == b_size or nt - st > 0.001:
st = nt
if b_ptr > 0:
message = streaming_data_types.serialise_ev42(
message_id = msg_id,
pulse_time = time_buffer[0] * 100, # int(time.time() * 1_000_000_000),
time_of_flight = time_buffer[0:b_ptr],
detector_id = pixel_buffer[0:b_ptr],
source_name = '',
)
msg_id = (msg_id + 1) % 100000000
b_ptr = 0
prod.produce(
topic = "DMC_detector",
value = message,
partition = 0,
)
# if poll_cnt % 1000 == 0:
prod.poll(0)
poll_cnt = (poll_cnt + 1) % 1000
threading.Thread(target=event_producer, daemon=True).start()
# Monitor Kafka Producer
monitor_queue = queue.Queue()
def monitor_producer():
producer_config = {
'bootstrap.servers': "linkafka01:9092",
        'queue.buffering.max.messages': int(1e7),  # integer property; avoid passing a float
}
prod = Producer(producer_config)
monitor_buffer = [0 for i in range(MONITORS)]
monitor_time = [0 for i in range(MONITORS)]
st = time.time()
poll_cnt = 0
while True:
(d_id, timestamp) = monitor_queue.get()
monitor_buffer[d_id] += 1
monitor_time[d_id] = timestamp
nt = time.time()
if nt - st > 0.05:
st = nt
            for i in range(MONITORS):
                if monitor_buffer[i]:
                    message = streaming_data_types.serialise_f142(
                        source_name = f"monitor{i+1}",
                        value = monitor_buffer[i],
                        # ns resolution (supposed to be past epoch, not what the detector returns though)
                        timestamp_unix_ns = monitor_time[i] * 100 # send time of last monitor
                    )
                    prod.produce(
                        topic = "DMC_neutron_monitor",
                        value = message,
                        partition = 0,
                    )
                    monitor_buffer[i] = 0
if poll_cnt % 1000 == 0:
prod.poll(0)
poll_cnt = (poll_cnt + 1) % 1000
threading.Thread(target=monitor_producer, daemon=True).start()
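# --- Hypothetical refinement (not in the original script) ---
# Neither producer checks delivery results; confluent_kafka reports them via
# an optional callback serviced by poll(). A minimal version:
def _on_delivery(err, msg):
    if err is not None:
        print(f"delivery failed: {err}")
# and pass it when producing, e.g.
#   prod.produce(topic=..., value=message, partition=0, on_delivery=_on_delivery)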
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.bind((UDP_IP, UDP_PORT))
val = 0
start_time = time.time()
module_counts = [0 for i in range(10)]
EVENTS = 0
while True:
    data, addr = sock.recvfrom(2056) # Request up to 2056 bytes per datagram
raw_header = data[:42]
raw_data = data[42:]
(buffer_length, buffer_type, header_length,
buffer_number, run_id, mcpd_status,
t_low, t_mid, t_high, *_) = memoryview(raw_header).cast('H')
mcpd_id = ( mcpd_status >> 8 ) & 0xff
mcpd_status = ( mcpd_status ) & 0x3
running_msg = "running" if (mcpd_status & 0x1) else "stopped"
sync_msg = "in sync" if (mcpd_status & 0x2) else "sync error"
timestamp = ( t_high << 32 ) | ( t_mid << 16 ) | t_low # 100 ns resolution
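    # Worked example (illustrative): t_low=0x0001, t_mid=0x0002, t_high=0x0003
    # gives (0x0003 << 32) | (0x0002 << 16) | 0x0001 = 12_885_032_961 ticks,
    # i.e. about 1288.5 s at 100 ns per tick.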
#print(f'Packet {int(timestamp * 1e-7)}s => buffer: {buffer_number}, length: {int(buffer_length*2/6)} events, status: {mcpd_status} {mcpd_id} {running_msg} with {sync_msg}')
# print(f'Packet => buffer: {mcpd_id}-{buffer_number}, length: {int((buffer_length-21)/3)} events, status: {mcpd_status}')
if time_offset is None:
time_offset = time.time() * 1e7 - timestamp
if buffer_number - module_counts[mcpd_id] != 1:
MISSED_PACKETS += 1
# if missed_packets_time_window.full():
# missed_packets_time_window.get_nowait()
# missed_packets_time_window.put(timestamp)
module_counts[mcpd_id] = buffer_number
for i in range(0, len(raw_data), 6):
event = memoryview(raw_data)[i:i+6]
event_type = event[5] >> 7
# print(event_type)
if event_type: # Trigger Event
t_id = ( event[5] >> 4 ) & 0x7
d_id = event[5] & 0xf
            event_timestamp = timestamp + ( ( ( event[2] & 0x7 ) << 16 ) | ( event[1] << 8 ) | event[0] ) # 19-bit offset: low 3 bits of event[2] are its MSBs
# print(f'Trigger event {event_timestamp * 1e-7}s => TrigID: {t_id}, DataID: {d_id}')
t_w = time_window[d_id]
t_w.put_nowait(event_timestamp)
monitor_queue.put_nowait((d_id, event_timestamp))
else: # Neutron Event
x_pixels = 128
y_pixels = 128
amplitude = ( event[5] << 1 ) | ( event[4] >> 7 )
# The DMC StreamHistogrammer setup currently expects each module to
# be 128 * 128 pixels but the resolution in the packages is
# actually 10bit. We remove the lowest 3 bits.
x = (( (event[3] & 0x1f) << 5 | (event[2] & 0xf8) >> 3 ) & 0x3ff) >> 3
y = (( (event[4] & 0x7f) << 3 | (event[3] & 0xe0) >> 5 ) & 0x3ff) >> 3
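            # Worked example (illustrative): a raw 10-bit x of 712
            # (0b1011001000) becomes 712 >> 3 = 89 on the 128-pixel grid.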
            event_timestamp = timestamp + ( ( ( event[2] & 0x7 ) << 16 ) | ( event[1] << 8 ) | event[0] ) # 19-bit offset: low 3 bits of event[2] are its MSBs
# print(f'Neutron event {event_timestamp * 1e-7}s: {amplitude}, x: {x}, y: {y}')
if event_last_timestamp is None:
event_last_timestamp = event_timestamp
# Seems like at higher frequencies these come very much out of order
# so this is very approximate
event_time_window[EVENT_WINDOW_PTR] = event_timestamp - event_last_timestamp
EVENT_WINDOW_PTR = (EVENT_WINDOW_PTR + 1) % EVENT_WINDOWSIZE
event_last_timestamp = event_timestamp
# I suppose this doesn't work mostly due to the timestamps ordering...
# event_timestamp_seconds = event_timestamp * 1e-7
# if event_last_timestamp is None:
# event_last_timestamp = event_timestamp_seconds
# f_cutoff = 1e6 # Hz
# tau = 1 / ( 2 * math.pi * f_cutoff)
# dt = event_timestamp_seconds - event_last_timestamp
# if dt > 0:
# w = math.exp(-dt / tau)
# event_average_rate = w * dt + event_average_rate * (1 - w)
# event_last_timestamp = event_timestamp_seconds
# EVENTS += 1
# a = (mcpd_id - 1) * x_pixels * y_pixels + x_pixels * x + y
# print((EVENTS, x, y, a, a < 128 * 128 * 9, mcpd_id))
# if not a < 128 * 128 * 9:
# print((event[3], event[3] << 5, event[2], event[2] >> 3))
event_queue.put_nowait((
(mcpd_id - 1) * x_pixels * y_pixels + x_pixels * x + y,
event_timestamp
))
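# --- Illustrative addition (not in the original script) ---
# The global pixel id packs (module, x, y) as
#   pid = (mcpd_id - 1) * 128 * 128 + 128 * x + y
# so a consumer could invert it like this (module numbering from 1 assumed):
def unpack_pixel_id(pid, x_pixels=128, y_pixels=128):
    module = pid // (x_pixels * y_pixels) + 1
    rem = pid % (x_pixels * y_pixels)
    return module, rem // x_pixels, rem % x_pixels  # (mcpd_id, x, y)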