From b8105091562786b6f3361a19aa3ba054177d837b Mon Sep 17 00:00:00 2001 From: Edward Wall Date: Wed, 29 Oct 2025 12:07:02 +0100 Subject: [PATCH] POC for each interface type --- .clang-format | 245 ++++++++++++++++++++++++++++++ Makefile | 26 ++++ db/channels.db | 19 +++ scripts/ioc.sh | 9 ++ scripts/st.cmd | 16 ++ src/asynStreamGeneratorDriver.cpp | 213 ++++++++++++++++++++++++++ src/asynStreamGeneratorDriver.dbd | 1 + src/asynStreamGeneratorDriver.h | 26 ++++ 8 files changed, 555 insertions(+) create mode 100644 .clang-format create mode 100644 Makefile create mode 100644 db/channels.db create mode 100755 scripts/ioc.sh create mode 100755 scripts/st.cmd create mode 100644 src/asynStreamGeneratorDriver.cpp create mode 100644 src/asynStreamGeneratorDriver.dbd create mode 100644 src/asynStreamGeneratorDriver.h diff --git a/.clang-format b/.clang-format new file mode 100644 index 0000000..a92596f --- /dev/null +++ b/.clang-format @@ -0,0 +1,245 @@ +--- +Language: Cpp +# BasedOnStyle: LLVM +AccessModifierOffset: -2 +AlignAfterOpenBracket: Align +AlignArrayOfStructures: None +AlignConsecutiveAssignments: + Enabled: false + AcrossEmptyLines: false + AcrossComments: false + AlignCompound: false + AlignFunctionPointers: false + PadOperators: true +AlignConsecutiveBitFields: + Enabled: false + AcrossEmptyLines: false + AcrossComments: false + AlignCompound: false + AlignFunctionPointers: false + PadOperators: false +AlignConsecutiveDeclarations: + Enabled: false + AcrossEmptyLines: false + AcrossComments: false + AlignCompound: false + AlignFunctionPointers: false + PadOperators: false +AlignConsecutiveMacros: + Enabled: false + AcrossEmptyLines: false + AcrossComments: false + AlignCompound: false + AlignFunctionPointers: false + PadOperators: false +AlignConsecutiveShortCaseStatements: + Enabled: false + AcrossEmptyLines: false + AcrossComments: false + AlignCaseColons: false +AlignEscapedNewlines: Right +AlignOperands: Align +AlignTrailingComments: + Kind: Always + OverEmptyLines: 0 +AllowAllArgumentsOnNextLine: true +AllowAllParametersOfDeclarationOnNextLine: true +AllowBreakBeforeNoexceptSpecifier: Never +AllowShortBlocksOnASingleLine: Never +AllowShortCaseLabelsOnASingleLine: false +AllowShortCompoundRequirementOnASingleLine: true +AllowShortEnumsOnASingleLine: true +AllowShortFunctionsOnASingleLine: All +AllowShortIfStatementsOnASingleLine: Never +AllowShortLambdasOnASingleLine: All +AllowShortLoopsOnASingleLine: false +AlwaysBreakAfterDefinitionReturnType: None +AlwaysBreakAfterReturnType: None +AlwaysBreakBeforeMultilineStrings: false +AlwaysBreakTemplateDeclarations: MultiLine +AttributeMacros: + - __capability +BinPackArguments: true +BinPackParameters: true +BitFieldColonSpacing: Both +BraceWrapping: + AfterCaseLabel: false + AfterClass: false + AfterControlStatement: Never + AfterEnum: false + AfterExternBlock: false + AfterFunction: false + AfterNamespace: false + AfterObjCDeclaration: false + AfterStruct: false + AfterUnion: false + BeforeCatch: false + BeforeElse: false + BeforeLambdaBody: false + BeforeWhile: false + IndentBraces: false + SplitEmptyFunction: true + SplitEmptyRecord: true + SplitEmptyNamespace: true +BreakAdjacentStringLiterals: true +BreakAfterAttributes: Leave +BreakAfterJavaFieldAnnotations: false +BreakArrays: true +BreakBeforeBinaryOperators: None +BreakBeforeConceptDeclarations: Always +BreakBeforeBraces: Attach +BreakBeforeInlineASMColon: OnlyMultiline +BreakBeforeTernaryOperators: true +BreakConstructorInitializers: BeforeColon +BreakInheritanceList: BeforeColon +BreakStringLiterals: true +ColumnLimit: 80 +CommentPragmas: '^ IWYU pragma:' +CompactNamespaces: false +ConstructorInitializerIndentWidth: 4 +ContinuationIndentWidth: 4 +Cpp11BracedListStyle: true +DerivePointerAlignment: false +DisableFormat: false +EmptyLineAfterAccessModifier: Never +EmptyLineBeforeAccessModifier: LogicalBlock +ExperimentalAutoDetectBinPacking: false +FixNamespaceComments: true +ForEachMacros: + - foreach + - Q_FOREACH + - BOOST_FOREACH +IfMacros: + - KJ_IF_MAYBE +IncludeBlocks: Preserve +IncludeCategories: + - Regex: '^"(llvm|llvm-c|clang|clang-c)/' + Priority: 2 + SortPriority: 0 + CaseSensitive: false + - Regex: '^(<|"(gtest|gmock|isl|json)/)' + Priority: 3 + SortPriority: 0 + CaseSensitive: false + - Regex: '.*' + Priority: 1 + SortPriority: 0 + CaseSensitive: false +IncludeIsMainRegex: '(Test)?$' +IncludeIsMainSourceRegex: '' +IndentAccessModifiers: false +IndentCaseBlocks: false +IndentCaseLabels: false +IndentExternBlock: AfterExternBlock +IndentGotoLabels: true +IndentPPDirectives: None +IndentRequiresClause: true +IndentWidth: 4 +IndentWrappedFunctionNames: false +InsertBraces: false +InsertNewlineAtEOF: false +InsertTrailingCommas: None +IntegerLiteralSeparator: + Binary: 0 + BinaryMinDigits: 0 + Decimal: 0 + DecimalMinDigits: 0 + Hex: 0 + HexMinDigits: 0 +JavaScriptQuotes: Leave +JavaScriptWrapImports: true +KeepEmptyLinesAtTheStartOfBlocks: true +KeepEmptyLinesAtEOF: false +LambdaBodyIndentation: Signature +LineEnding: DeriveLF +MacroBlockBegin: '' +MacroBlockEnd: '' +MaxEmptyLinesToKeep: 1 +NamespaceIndentation: None +ObjCBinPackProtocolList: Auto +ObjCBlockIndentWidth: 2 +ObjCBreakBeforeNestedBlockParam: true +ObjCSpaceAfterProperty: false +ObjCSpaceBeforeProtocolList: true +PackConstructorInitializers: BinPack +PenaltyBreakAssignment: 2 +PenaltyBreakBeforeFirstCallParameter: 19 +PenaltyBreakComment: 300 +PenaltyBreakFirstLessLess: 120 +PenaltyBreakOpenParenthesis: 0 +PenaltyBreakScopeResolution: 500 +PenaltyBreakString: 1000 +PenaltyBreakTemplateDeclaration: 10 +PenaltyExcessCharacter: 1000000 +PenaltyIndentedWhitespace: 0 +PenaltyReturnTypeOnItsOwnLine: 60 +PointerAlignment: Right +PPIndentWidth: -1 +QualifierAlignment: Leave +ReferenceAlignment: Pointer +ReflowComments: true +RemoveBracesLLVM: false +RemoveParentheses: Leave +RemoveSemicolon: false +RequiresClausePosition: OwnLine +RequiresExpressionIndentation: OuterScope +SeparateDefinitionBlocks: Leave +ShortNamespaceLines: 1 +SkipMacroDefinitionBody: false +SortIncludes: CaseSensitive +SortJavaStaticImport: Before +SortUsingDeclarations: LexicographicNumeric +SpaceAfterCStyleCast: false +SpaceAfterLogicalNot: false +SpaceAfterTemplateKeyword: true +SpaceAroundPointerQualifiers: Default +SpaceBeforeAssignmentOperators: true +SpaceBeforeCaseColon: false +SpaceBeforeCpp11BracedList: false +SpaceBeforeCtorInitializerColon: true +SpaceBeforeInheritanceColon: true +SpaceBeforeJsonColon: false +SpaceBeforeParens: ControlStatements +SpaceBeforeParensOptions: + AfterControlStatements: true + AfterForeachMacros: true + AfterFunctionDefinitionName: false + AfterFunctionDeclarationName: false + AfterIfMacros: true + AfterOverloadedOperator: false + AfterPlacementOperator: true + AfterRequiresInClause: false + AfterRequiresInExpression: false + BeforeNonEmptyParentheses: false +SpaceBeforeRangeBasedForLoopColon: true +SpaceBeforeSquareBrackets: false +SpaceInEmptyBlock: false +SpacesBeforeTrailingComments: 1 +SpacesInAngles: Never +SpacesInContainerLiterals: true +SpacesInLineCommentPrefix: + Minimum: 1 + Maximum: -1 +SpacesInParens: Never +SpacesInParensOptions: + InCStyleCasts: false + InConditionalStatements: false + InEmptyParentheses: false + Other: false +SpacesInSquareBrackets: false +Standard: Latest +StatementAttributeLikeMacros: + - Q_EMIT +StatementMacros: + - Q_UNUSED + - QT_REQUIRE_VERSION +TabWidth: 8 +UseTab: Never +VerilogBreakBetweenInstancePorts: true +WhitespaceSensitiveMacros: + - BOOST_PP_STRINGIZE + - CF_SWIFT_NAME + - NS_SWIFT_NAME + - PP_STRINGIZE + - STRINGIZE +... diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..2bbce1b --- /dev/null +++ b/Makefile @@ -0,0 +1,26 @@ +# Include the external Makefile +include /ioc/tools/driver.makefile + +MODULE=StreamGenerator +BUILDCLASSES=Linux +EPICS_VERSIONS=7.0.7 +#ARCH_FILTER=RHEL% +ARCH_FILTER=linux-x86_64 + +# Additional module dependencies +REQUIRED+=asyn + +DBDS += src/asynStreamGeneratorDriver.dbd + +# DB files to include in the release +TEMPLATES += db/channels.db + +# These headers allow to depend on this library for derived drivers. +HEADERS += src/asynStreamGeneratorDriver.h + +# Source files to build +SOURCES += src/asynStreamGeneratorDriver.cpp + +USR_CFLAGS += -Wall -Wextra -Wunused-result -Werror -fvisibility=hidden # -Wpedantic // Does not work because EPICS macros trigger warnings + +LIB_SYS_LIBS += rdkafka diff --git a/db/channels.db b/db/channels.db new file mode 100644 index 0000000..19aed39 --- /dev/null +++ b/db/channels.db @@ -0,0 +1,19 @@ +# EPICS Database for streamdevice specific to measurement channels +# +# Macros +# INSTR - Prefix +# NAME - the device name, e.g. EL737 +# PORT - Stream Generator Port +# CHANNEL - the number associated with the measurment channel + +################################################################################ +# Read all monitors values + +record(longin, "$(INSTR)$(NAME):M$(CHANNEL)") +{ + field(DESC, "DAQ CH$(CHANNEL)") + field(EGU, "cts") + field(DTYP, "asynInt32") + field(INP, "@asyn($(PORT),0,$(TIMEOUT=1))COUNTS") + field(SCAN, "I/O Intr") +} diff --git a/scripts/ioc.sh b/scripts/ioc.sh new file mode 100755 index 0000000..7528295 --- /dev/null +++ b/scripts/ioc.sh @@ -0,0 +1,9 @@ +#!/bin/bash + +export EPICS_HOST_ARCH=linux-x86_64 +export EPICS_BASE=/usr/local/epics/base-7.0.7 + +PARENT_PATH="$( cd "$(dirname "${BASH_SOURCE[0]}")" ; pwd -P )" + +# /usr/local/bin/procServ -o -L - -f -i ^D^C 20001 "${PARENT_PATH}/st.cmd" -d +${PARENT_PATH}/st.cmd diff --git a/scripts/st.cmd b/scripts/st.cmd new file mode 100755 index 0000000..31dce8e --- /dev/null +++ b/scripts/st.cmd @@ -0,0 +1,16 @@ +#!/usr/local/bin/iocsh +#-d + +on error break + +require StreamGenerator, test + +epicsEnvSet("INSTR", "SQ:TEST:") +epicsEnvSet("NAME", "SG") + +drvAsynIPPortConfigure("ASYN_IP_PORT", "127.0.0.1:9071:9073 UDP", 0, 0, 0) +asynStreamGenerator("ASYN_SG", "ASYN_IP_PORT", 4) + +dbLoadRecords("$(StreamGenerator_DB)channels.db", "INSTR=$(INSTR), NAME=$(NAME), PORT=ASYN_SG, CHANNEL=0") + +iocInit() diff --git a/src/asynStreamGeneratorDriver.cpp b/src/asynStreamGeneratorDriver.cpp new file mode 100644 index 0000000..0870399 --- /dev/null +++ b/src/asynStreamGeneratorDriver.cpp @@ -0,0 +1,213 @@ +#include "asynOctetSyncIO.h" +#include +#include +#include + +#include "asynStreamGeneratorDriver.h" +#include + +/* Wrapper to set config values and error out if needed. + */ +static void set_config(rd_kafka_conf_t *conf, char *key, char *value) { + char errstr[512]; + rd_kafka_conf_res_t res; + + res = rd_kafka_conf_set(conf, key, value, errstr, sizeof(errstr)); + if (res != RD_KAFKA_CONF_OK) { + // TODO + // g_error("Unable to set config: %s", errstr); + exit(1); + } +} + +static void udpPollerTask(void *drvPvt) { + asynStreamGeneratorDriver *pSGD = (asynStreamGeneratorDriver *)drvPvt; + pSGD->receiveUDP(); +} + +/** Constructor for the asynStreamGeneratorDriver class. + * Calls constructor for the asynPortDriver base class. + * \param[in] portName The name of the asyn port driver to be created. */ +asynStreamGeneratorDriver::asynStreamGeneratorDriver(const char *portName, + const char *ipPortName, + const int numChannels) + : asynPortDriver(portName, 1, /* maxAddr */ + asynInt32Mask, /* Interface mask */ + asynInt32Mask, /* Interrupt mask */ + 0, /* asynFlags. This driver does not block and it is + not multi-device, but has a + destructor ASYN_DESTRUCTIBLE our version of the Asyn + is too old to support this flag */ + 1, /* Autoconnect */ + 0, /* Default priority */ + 0) /* Default stack size*/ +{ + + // Parameter Setup + createParam(P_CountsString, asynParamInt32, &P_Counts); + setIntegerParam(P_Counts, 0); + + // UDP Receive Setup + pasynOctetSyncIO->connect(ipPortName, 0, &pasynUDPUser, NULL); + + /* Create the thread that receives UDP traffic in the background */ + asynStatus status = + (asynStatus)(epicsThreadCreate( + "udp_receive", epicsThreadPriorityMedium, + epicsThreadGetStackSize(epicsThreadStackMedium), + (EPICSTHREADFUNC)::udpPollerTask, this) == NULL); + if (status) { + // printf("%s:%s: epicsThreadCreate failure, status=%d\n", driverName, + // functionName, status); + printf("%s:%s: epicsThreadCreate failure, status=%d\n", + "StreamGenerator", "init", status); + return; + } + + // Kafka Produce Setup + rd_kafka_conf_t *conf; + char errstr[512]; + + // Create client configuration + conf = rd_kafka_conf_new(); + set_config(conf, "bootstrap.servers", "linkafka01:9092"); + set_config(conf, "queue.buffering.max.messages", "1e7"); + + // Create the Producer instance. + rd_kafka_t *producer = + rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, sizeof(errstr)); + if (!producer) { + // TODO + // g_error("Failed to create new producer: %s", errstr); + exit(1); + } + + char *msg = "asdf\n"; + + rd_kafka_resp_err_t err = + rd_kafka_producev(producer, RD_KAFKA_V_TOPIC("NEWEFU_TEST"), + RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY), + // RD_KAFKA_V_KEY((void *)key, key_len), + RD_KAFKA_V_VALUE((void *)msg, 6), + // RD_KAFKA_V_OPAQUE(NULL), + RD_KAFKA_V_END); + + if (err) { + // TODO + // g_error("Failed to produce to topic %s: %s", topic, + // rd_kafka_err2str(err)); + exit(1); + } + + epicsStdoutPrintf("Kafka Queue Size %d\n", rd_kafka_outq_len(producer)); + + rd_kafka_poll(producer, 0); + epicsStdoutPrintf("Kafka Queue Size %d\n", rd_kafka_outq_len(producer)); + rd_kafka_flush(producer, 10 * 1000); + epicsStdoutPrintf("Kafka Queue Size %d\n", rd_kafka_outq_len(producer)); +} + +asynStreamGeneratorDriver::~asynStreamGeneratorDriver() {} + +asynStatus asynStreamGeneratorDriver::readInt32(asynUser *pasynUser, + epicsInt32 *value) { + + int function = pasynUser->reason; + asynStatus status; + + if (function == P_Counts) { + int val; + status = getIntegerParam(P_Counts, &val); + *value = val; + return status; + } else { + return asynError; + } + return asynSuccess; +} + +void asynStreamGeneratorDriver::receiveUDP() { + asynStatus status; + int isConnected; + + char buffer[1500]; + size_t received; + int eomReason; + + int val; + + while (true) { + // epicsStdoutPrintf("polling!!"); + status = pasynManager->isConnected(pasynUDPUser, &isConnected); + if (status) { + asynPrint(pasynUserSelf, ASYN_TRACE_ERROR, + "%s:%s: error calling pasynManager->isConnected, " + "status=%d, error=%s\n", + "StreamGenerator", "receiveUDP", status, + pasynUDPUser->errorMessage); + // driverName, functionName, status, + // pasynUserIPPort_->errorMessage); + } + asynPrint(pasynUserSelf, ASYN_TRACEIO_DRIVER, + "%s:%s: isConnected = %d\n", // + "StreamGenerator", "receiveUDP", isConnected); + + status = pasynOctetSyncIO->read(pasynUDPUser, buffer, 1500, + 1, // timeout + &received, &eomReason); + + // if (status) + // asynPrint( + // pasynUserSelf, ASYN_TRACE_ERROR, + // "%s:%s: error calling pasynOctetSyncIO->read, status=%d\n", + // "StreamGenerator", "receiveUDP", status); + + buffer[received] = 0; + + if (received) + asynPrint(pasynUserSelf, ASYN_TRACE_ERROR, "%s:%s: received %s\n", + "StreamGenerator", "receiveUDP", buffer); + + lock(); + getIntegerParam(P_Counts, &val); + val += received > 0; + setIntegerParam(P_Counts, val); + callParamCallbacks(); + unlock(); + + epicsThreadSleep(0.001); // seconds + } +} + +/* Configuration routine. Called directly, or from the iocsh function below */ + +extern "C" { + +/** EPICS iocsh callable function to call constructor for the + * asynStreamGeneratorDriver class. \param[in] portName The name of the asyn + * port driver to be created. */ +asynStatus asynStreamGeneratorDriverConfigure(const char *portName, + const char *ipPortName, + const int numChannels) { + new asynStreamGeneratorDriver(portName, ipPortName, numChannels); + return asynSuccess; +} + +/* EPICS iocsh shell commands */ + +static const iocshArg initArg0 = {"portName", iocshArgString}; +static const iocshArg initArg1 = {"ipPortName", iocshArgString}; +static const iocshArg initArg2 = {"numChannels", iocshArgInt}; +static const iocshArg *const initArgs[] = {&initArg0, &initArg1, &initArg2}; +static const iocshFuncDef initFuncDef = {"asynStreamGenerator", 3, initArgs}; +static void initCallFunc(const iocshArgBuf *args) { + asynStreamGeneratorDriverConfigure(args[0].sval, args[1].sval, + args[2].ival); +} + +void asynStreamGeneratorDriverRegister(void) { + iocshRegister(&initFuncDef, initCallFunc); +} + +epicsExportRegistrar(asynStreamGeneratorDriverRegister); +} diff --git a/src/asynStreamGeneratorDriver.dbd b/src/asynStreamGeneratorDriver.dbd new file mode 100644 index 0000000..d0df127 --- /dev/null +++ b/src/asynStreamGeneratorDriver.dbd @@ -0,0 +1 @@ +registrar("asynStreamGeneratorDriverRegister") diff --git a/src/asynStreamGeneratorDriver.h b/src/asynStreamGeneratorDriver.h new file mode 100644 index 0000000..28bcab7 --- /dev/null +++ b/src/asynStreamGeneratorDriver.h @@ -0,0 +1,26 @@ +#ifndef asynStreamGeneratorDriver_H +#define asynStreamGeneratorDriver_H + +#include "asynPortDriver.h" + +/* These are the drvInfo strings that are used to identify the parameters. */ +#define P_CountsString "COUNTS" /* asynInt32, r/w */ + +class asynStreamGeneratorDriver : public asynPortDriver { + public: + asynStreamGeneratorDriver(const char *portName, const char *ipPortName, + const int numChannels); + virtual ~asynStreamGeneratorDriver(); + + virtual asynStatus readInt32(asynUser *pasynUser, epicsInt32 *value); + + void receiveUDP(); + + protected: + int P_Counts; + + private: + asynUser *pasynUDPUser; +}; + +#endif