From fbe2ffa3cb9afcb0f8416ddec0e5e21665378e55 Mon Sep 17 00:00:00 2001 From: Matej Sekoranja Date: Wed, 30 Jul 2014 11:14:52 +0200 Subject: [PATCH] testServer: added alwaysSendAll for ChannelGet, velocious for Monitor; added testMonitorPerformance util --- pvAccessCPP.files | 113 ++++- src/remote/codec.cpp | 4 +- testApp/remote/Makefile | 4 + testApp/remote/testGetPerformance.cpp | 7 +- testApp/remote/testMonitorPerformance.cpp | 499 ++++++++++++++++++++++ testApp/remote/testServer.cpp | 26 +- 6 files changed, 642 insertions(+), 11 deletions(-) create mode 100644 testApp/remote/testMonitorPerformance.cpp diff --git a/pvAccessCPP.files b/pvAccessCPP.files index 6b8af14..18d359a 100644 --- a/pvAccessCPP.files +++ b/pvAccessCPP.files @@ -123,4 +123,115 @@ src/ca/caChannel.cpp testApp/utils/Makefile testApp/remote/channelAccessIFTest.h testApp/remote/channelAccessIFTest.cpp -testApp/remote/syncTestRequesters.h \ No newline at end of file +testApp/remote/syncTestRequesters.h +pvtoolsSrc/eget.cpp +pvtoolsSrc/pvget.cpp +pvtoolsSrc/pvinfo.cpp +pvtoolsSrc/pvput.cpp +pvtoolsSrc/pvutils.cpp +pvtoolsSrc/pvutils.h +src/ca/caChannel.cpp +src/ca/caChannel.h +src/ca/caProvider.cpp +src/ca/caProvider.h +src/client/pvAccess.cpp +src/client/pvAccess.h +src/factory/ChannelAccessFactory.cpp +src/mb/pvAccessMB.cpp +src/mb/pvAccessMB.h +src/pva/clientFactory.cpp +src/pva/clientFactory.h +src/pva/pvaConstants.h +src/pva/pvaVersion.cpp +src/pva/pvaVersion.h +src/remote/abstractResponseHandler.cpp +src/remote/beaconHandler.cpp +src/remote/beaconHandler.h +src/remote/blockingTCP.h +src/remote/blockingTCPAcceptor.cpp +src/remote/blockingTCPConnector.cpp +src/remote/blockingUDP.h +src/remote/blockingUDPConnector.cpp +src/remote/blockingUDPTransport.cpp +src/remote/channelSearchManager.h +src/remote/codec.cpp +src/remote/codec.h +src/remote/remote.h +src/remote/serializationHelper.cpp +src/remote/serializationHelper.h +src/remote/simpleChannelSearchManagerImpl.cpp +src/remote/simpleChannelSearchManagerImpl.h +src/remote/transportRegistry.cpp +src/remote/transportRegistry.h +src/remoteClient/clientContextImpl.cpp +src/remoteClient/clientContextImpl.h +src/rpcClient/rpcClient.cpp +src/rpcClient/rpcClient.h +src/rpcService/rpcServer.cpp +src/rpcService/rpcServer.h +src/rpcService/rpcService.cpp +src/rpcService/rpcService.h +src/server/baseChannelRequester.cpp +src/server/baseChannelRequester.h +src/server/beaconEmitter.cpp +src/server/beaconEmitter.h +src/server/beaconServerStatusProvider.cpp +src/server/beaconServerStatusProvider.h +src/server/responseHandlers.cpp +src/server/responseHandlers.h +src/server/serverChannelImpl.cpp +src/server/serverChannelImpl.h +src/server/serverContext.cpp +src/server/serverContext.h +src/utils/configuration.cpp +src/utils/configuration.h +src/utils/hexDump.cpp +src/utils/hexDump.h +src/utils/inetAddressUtil.cpp +src/utils/inetAddressUtil.h +src/utils/introspectionRegistry.cpp +src/utils/introspectionRegistry.h +src/utils/likely.h +src/utils/logger.cpp +src/utils/logger.h +src/utils/namedLockPattern.h +src/utils/referenceCountingLock.cpp +src/utils/referenceCountingLock.h +src/v3ioc/PVAClientRegister.cpp +src/v3ioc/PVAServerRegister.cpp +src/v3ioc/syncChannelFind.h +testApp/client/MockClientImpl.cpp +testApp/client/testChannelAccessFactory.cpp +testApp/client/testMockClient.cpp +testApp/client/testStartStop.cpp +testApp/remote/channelAccessIFTest.cpp +testApp/remote/channelAccessIFTest.h +testApp/remote/epicsv4Grayscale.h +testApp/remote/rpcClientExample.cpp +testApp/remote/rpcServiceExample.cpp +testApp/remote/syncTestRequesters.h +testApp/remote/testADCSim.cpp +testApp/remote/testBeaconEmitter.cpp +testApp/remote/testBeaconHandler.cpp +testApp/remote/testBlockingTCPClnt.cpp +testApp/remote/testBlockingTCPSrv.cpp +testApp/remote/testBlockingUDPClnt.cpp +testApp/remote/testBlockingUDPSrv.cpp +testApp/remote/testChannelAccess.cpp +testApp/remote/testChannelConnect.cpp +testApp/remote/testChannelSearchManager.cpp +testApp/remote/testCodec.cpp +testApp/remote/testGetPerformance.cpp +testApp/remote/testMonitorPerformance.cpp +testApp/remote/testNTImage.cpp +testApp/remote/testRemoteClientImpl.cpp +testApp/remote/testServer.cpp +testApp/remote/testServerContext.cpp +testApp/utils/configurationTest.cpp +testApp/utils/introspectionRegistryTest.cpp +testApp/utils/loggerTest.cpp +testApp/utils/namedLockPatternTest.cpp +testApp/utils/testAtomicBoolean.cpp +testApp/utils/testHexDump.cpp +testApp/utils/testInetAddressUtils.cpp +testApp/utils/transportRegistryTest.cpp diff --git a/src/remote/codec.cpp b/src/remote/codec.cpp index 00cffc9..a67da07 100644 --- a/src/remote/codec.cpp +++ b/src/remote/codec.cpp @@ -1173,11 +1173,11 @@ namespace epics { bac->processRead(); } catch (std::exception &e) { LOG(logLevelWarn, - "an exception caught while in sendThread at %s:%d: %s", + "an exception caught while in receiveThread at %s:%d: %s", __FILE__, __LINE__, e.what()); } catch (...) { LOG(logLevelWarn, - "unknown exception caught while in sendThread at %s:%d.", + "unknown exception caught while in receiveThread at %s:%d.", __FILE__, __LINE__); } } diff --git a/testApp/remote/Makefile b/testApp/remote/Makefile index 6a6b3a9..5e2e524 100644 --- a/testApp/remote/Makefile +++ b/testApp/remote/Makefile @@ -60,6 +60,10 @@ PROD_HOST += testGetPerformance testGetPerformance_SRCS += testGetPerformance.cpp testGetPerformance_LIBS += pvAccess pvData pvMB Com +PROD_HOST += testMonitorPerformance +testMonitorPerformance_SRCS += testMonitorPerformance.cpp +testMonitorPerformance_LIBS += pvAccess pvData pvMB Com + PROD_HOST += rpcServiceExample rpcServiceExample_SRCS += rpcServiceExample.cpp rpcServiceExample_LIBS += pvAccess pvData pvMB Com diff --git a/testApp/remote/testGetPerformance.cpp b/testApp/remote/testGetPerformance.cpp index dc4749a..183e259 100644 --- a/testApp/remote/testGetPerformance.cpp +++ b/testApp/remote/testGetPerformance.cpp @@ -30,7 +30,7 @@ using namespace epics::pvData; using namespace epics::pvAccess; #define DEFAULT_TIMEOUT 600.0 -#define DEFAULT_REQUEST "field(value)" +#define DEFAULT_REQUEST "record[alwaysSendAll=true]field(value)" #define DEFAULT_ITERATIONS 10000 #define DEFAULT_CHANNELS 1 #define DEFAULT_ARRAY_SIZE 0 @@ -47,9 +47,6 @@ int arraySize = DEFAULT_ARRAY_SIZE; // 0 means scalar Mutex waitLoopPtrMutex; std::tr1::shared_ptr waitLoopEvent; -#define DEFAULT_TIMEOUT 600.0 -#define DEFAULT_REQUEST "field(value)" - double timeOut = DEFAULT_TIMEOUT; string request(DEFAULT_REQUEST); @@ -352,7 +349,7 @@ void runTest() std::cout << "not all channels are hosted by the same connection: " << theRemoteAddress << " != " << remoteAddress << std::endl; differentConnectionsWarningIssued = true; - // the assumes same connection (thread-safety) + // we assumes same connection (thread-safety) exit(2); } } diff --git a/testApp/remote/testMonitorPerformance.cpp b/testApp/remote/testMonitorPerformance.cpp new file mode 100644 index 0000000..e86f5f0 --- /dev/null +++ b/testApp/remote/testMonitorPerformance.cpp @@ -0,0 +1,499 @@ +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include + +#include +#include + +#include + +#ifdef _WIN32 +#include +#else +#include +#endif + +#include + +using namespace std; +using namespace std::tr1; +using namespace epics::pvData; +using namespace epics::pvAccess; + +#define DEFAULT_TIMEOUT 600.0 +#define DEFAULT_REQUEST "record[velocious=true]field(value)" +#define DEFAULT_ITERATIONS 10000 +#define DEFAULT_CHANNELS 1 +#define DEFAULT_ARRAY_SIZE 0 +#define DEFAULT_RUNS 1 + +bool verbose = false; + +int iterations = DEFAULT_ITERATIONS; +int channels = DEFAULT_CHANNELS; +int runs = DEFAULT_RUNS; +int arraySize = DEFAULT_ARRAY_SIZE; // 0 means scalar +Mutex waitLoopPtrMutex; +std::tr1::shared_ptr waitLoopEvent; + +double timeOut = DEFAULT_TIMEOUT; +string request(DEFAULT_REQUEST); + +PVStructure::shared_pointer pvRequest; + +class RequesterImpl : public Requester, + public std::tr1::enable_shared_from_this +{ +public: + + virtual string getRequesterName() + { + return "RequesterImpl"; + }; + + virtual void message(std::string const & message,MessageType messageType) + { + std::cout << "[" << getRequesterName() << "] message(" << message << ", " << getMessageTypeName(messageType) << ")" << std::endl; + } +}; + +void usage (void) +{ + fprintf (stderr, "\nUsage: testMonitorPerformance [options] ...\n\n" + " -h: Help: Print this message\n" + "options:\n" + " -r : pvRequest string, specifies what fields to return and options, default is '%s'\n" + " -i : number of iterations per each run, default is '%d'\n" + " -c : number of channels, default is '%d'\n" + " -s : number of array elements (0 means scalar), default is '%d'\n" + " -l : number of runs (0 means execute runs continuously), default is '%d'\n" + " -f : read configuration file that contains list of tests to be performed\n" + " each test is defined by a \" \" line\n" + " output is a space separated list of get operations per second for each run, one line per test\n" + " -v enable verbose output when configuration is read from the file\n" + " -w : wait time, specifies timeout, default is %f second(s)\n\n" + , DEFAULT_REQUEST, DEFAULT_ITERATIONS, DEFAULT_CHANNELS, DEFAULT_ARRAY_SIZE, DEFAULT_RUNS, DEFAULT_TIMEOUT); +} + +// TODO thread-safety +ChannelProvider::shared_pointer provider; +vector channelMonitorList; +int channelCount = 0; +int iterationCount = 0; +int runCount = 0; +double sum = 0; + +void reset() +{ + channelMonitorList.clear(); + channelCount = 0; + iterationCount = 0; + runCount = 0; + sum = 0; +} + +epicsTimeStamp startTime; + +void monitor_all() +{ + for (vector::const_iterator i = channelMonitorList.begin(); + i != channelMonitorList.end(); + i++) + (*i)->start(); +} + + +// NOTE: it is assumed that all the callbacks are called from the same thread, i.e. same TCP connection +class ChannelMonitorRequesterImpl : public MonitorRequester +{ +private: + Event m_event; + Event m_connectionEvent; + string m_channelName; + int m_count; + + timeval m_startTime; + +public: + + ChannelMonitorRequesterImpl(std::string channelName) : + m_channelName(channelName) + { + } + + virtual string getRequesterName() + { + return "ChannelMonitorRequesterImpl"; + } + + virtual void message(std::string const & message,MessageType messageType) + { + std::cout << "[" << getRequesterName() << "] message(" << message << ", " << getMessageTypeName(messageType) << ")" << std::endl; + } + + virtual void monitorConnect(const epics::pvData::Status& status, + Monitor::shared_pointer const & /*monitor*/, + epics::pvData::Structure::const_shared_pointer const & /*structure*/) + { + if (status.isSuccess()) + { + // show warning + if (!status.isOK()) + { + std::cout << "[" << m_channelName << "] channel monitor create: " << status << std::endl; + } + + m_connectionEvent.signal(); + } + else + { + std::cout << "[" << m_channelName << "] failed to create channel monitor: " << status << std::endl; + } + } + + virtual void monitorEvent(Monitor::shared_pointer const & monitor) + { + + MonitorElement::shared_pointer element; + while (element = monitor->poll()) + { + channelCount++; + if (channelCount == channels) + { + iterationCount++; + channelCount = 0; + } + + if (iterationCount == iterations) + { + epicsTimeStamp endTime; + epicsTimeGetCurrent(&endTime); + + double duration = epicsTime(endTime) - epicsTime(startTime); + double getPerSec = iterations*channels/duration; + double gbit = getPerSec*arraySize*sizeof(double)*8/(1000*1000*1000); // * bits / giga; NO, it's really 1000 and not 1024 + if (verbose) + printf("%5.6f seconds, %.3f (x %d = %.3f) monitors/s, data throughput %5.3f Gbits/s\n", + duration, iterations/duration, channels, getPerSec, gbit); + sum += getPerSec; + + iterationCount = 0; + epicsTimeGetCurrent(&startTime); + + runCount++; + if (runs == 0 || runCount < runs) + { + // noop + } + else + { + printf("%d %d %d %d %.3f\n", channels, arraySize, iterations, runs, sum/runs); + + Lock guard(waitLoopPtrMutex); + waitLoopEvent->signal(); // all done + } + } + else if (channelCount == 0) + { + // noop + } + + monitor->release(element); + } + + } + + virtual void unlisten(Monitor::shared_pointer const & /*monitor*/) + { + std::cerr << "unlisten" << std::endl; + } + + bool waitUntilConnected(double timeOut) + { + return m_connectionEvent.wait(timeOut); + } +}; + +class ChannelRequesterImpl : public ChannelRequester +{ +private: + Event m_event; + +public: + + virtual string getRequesterName() + { + return "ChannelRequesterImpl"; + }; + + virtual void message(std::string const & message,MessageType messageType) + { + std::cout << "[" << getRequesterName() << "] message(" << message << ", " << getMessageTypeName(messageType) << ")" << std::endl; + } + + virtual void channelCreated(const epics::pvData::Status& status, + Channel::shared_pointer const & channel) + { + if (status.isSuccess()) + { + // show warning + if (!status.isOK()) + { + std::cout << "[" << channel->getChannelName() << "] channel create: " << status << std::endl; + } + } + else + { + std::cout << "[" << channel->getChannelName() << "] failed to create a channel: " << status << std::endl; + } + } + + virtual void channelStateChange(Channel::shared_pointer const & /*channel*/, Channel::ConnectionState connectionState) + { + if (connectionState == Channel::CONNECTED) + { + m_event.signal(); + } + else + { + // ups... connection loss + std::cout << Channel::ConnectionStateNames[connectionState] << std::endl; + exit(3); + } + } + + bool waitUntilConnected(double timeOut) + { + return m_event.wait(timeOut); + } +}; + +void runTest() +{ + reset(); + + if (verbose) + printf("%d channel(s) of double array size of %d element(s) (0==scalar), %d iteration(s) per run, %d run(s) (0==forever)\n", channels, arraySize, iterations, runs); + + vector channelNames; + char buf[64]; + for (int i = 0; i < channels; i++) + { + if (arraySize > 0) + sprintf(buf, "testArray%d_%d", arraySize, i); + else + sprintf(buf, "test%d", i); + channelNames.push_back(buf); + } + + vector channels; + for (vector::const_iterator i = channelNames.begin(); + i != channelNames.end(); + i++) + { + shared_ptr channelRequesterImpl( + new ChannelRequesterImpl() + ); + Channel::shared_pointer channel = provider->createChannel(*i, channelRequesterImpl); + channels.push_back(channel); + } + + bool differentConnectionsWarningIssued = false; + string theRemoteAddress; + for (vector::iterator i = channels.begin(); + i != channels.end(); + i++) + { + Channel::shared_pointer channel = *i; + shared_ptr channelRequesterImpl = + dynamic_pointer_cast(channel->getChannelRequester()); + if (channelRequesterImpl->waitUntilConnected(5.0)) + { + string remoteAddress = channel->getRemoteAddress(); + if (theRemoteAddress.empty()) + { + theRemoteAddress = remoteAddress; + } + else if (theRemoteAddress != remoteAddress) + { + if (!differentConnectionsWarningIssued) + { + std::cout << "not all channels are hosted by the same connection: " << + theRemoteAddress << " != " << remoteAddress << std::endl; + differentConnectionsWarningIssued = true; + // we assumes same connection (thread-safety) + exit(2); + } + } + + shared_ptr getRequesterImpl( + new ChannelMonitorRequesterImpl(channel->getChannelName()) + ); + Monitor::shared_pointer monitor = channel->createMonitor(getRequesterImpl, pvRequest); + + bool allOK = getRequesterImpl->waitUntilConnected(timeOut); + + if (!allOK) + { + std::cout << "[" << channel->getChannelName() << "] failed to get all the monitors" << std::endl; + exit(1); + } + + channelMonitorList.push_back(monitor); + + } + else + { + std::cout << "[" << channel->getChannelName() << "] connection timeout" << std::endl; + exit(1); + } + } + if (verbose) + std::cout << "all connected" << std::endl; + + { + Lock guard(waitLoopPtrMutex); + waitLoopEvent.reset(new Event()); + } + epicsTimeGetCurrent(&startTime); + monitor_all(); + + waitLoopEvent->wait(); +} + +int main (int argc, char *argv[]) +{ + int opt; // getopt() current option + std::string testFile; + + Requester::shared_pointer requester(new RequesterImpl()); + + setvbuf(stdout,NULL,_IOLBF,BUFSIZ); // Set stdout to line buffering + + while ((opt = getopt(argc, argv, ":hr:w:i:c:s:l:f:v")) != -1) { + switch (opt) { + case 'h': // Print usage + usage(); + return 0; + case 'w': // Set PVA timeout value + if(epicsScanDouble(optarg, &timeOut) != 1) + { + fprintf(stderr, "'%s' is not a valid timeout value " + "- ignored. ('cainfo -h' for help.)\n", optarg); + timeOut = DEFAULT_TIMEOUT; + } + break; + case 'r': // pvRequest string + request = optarg; + break; + case 'i': // iterations + iterations = atoi(optarg); + break; + case 'c': // channels + channels = atoi(optarg); + break; + case 's': // arraySize + arraySize = atoi(optarg); + break; + case 'l': // runs + runs = atoi(optarg); + break; + case 'f': // testFile + testFile = optarg; + break; + case 'v': // testFile + verbose = true; + break; + case '?': + fprintf(stderr, + "Unrecognized option: '-%c'. ('testGetPerformance -h' for help.)\n", + optopt); + return 1; + case ':': + fprintf(stderr, + "Option '-%c' requires an argument. ('testGetPerformance -h' for help.)\n", + optopt); + return 1; + default : + usage(); + return 1; + } + } + + // typedef enum {logLevelInfo, logLevelDebug, logLevelError, errlogFatal} errlogSevEnum; + SET_LOG_LEVEL(logLevelError); + + pvRequest = CreateRequest::create()->createRequest(request); + if (pvRequest.get() == 0) { + printf("failed to parse request string\n"); + return 1; + } + + ClientFactory::start(); + provider = getChannelProviderRegistry()->getProvider("pva"); + + if (!testFile.empty()) + { + ifstream ifs(testFile.c_str(), ifstream::in); + if (ifs.good()) + { + string line; + while (true) + { + getline(ifs, line); + if (ifs.good()) + { + // ignore lines that starts (no trimming) with '#' + if (line.find('#') != 0) + { + // + if (sscanf(line.c_str(), "%d %d %d %d", &channels, &arraySize, &iterations, &runs) == 4) + { + //printf("%d %d %d %d\n", channels, arraySize, iterations, runs); + runTest(); + + // wait a bit for a next test + epicsThreadSleep(1.0); + } + else + { + fprintf(stderr, + "Failed to parse line '%s', ignoring...\n", + line.c_str()); + } + } + } + else + break; + } + } + else + { + fprintf(stderr, + "Failed to open file '%s'\n", + testFile.c_str()); + return 2; + } + + ifs.close(); + } + else + { + // in non-file mode, verbose is true by default + verbose = true; + runTest(); + } + + //ClientFactory::stop(); + + return 0; +} diff --git a/testApp/remote/testServer.cpp b/testApp/remote/testServer.cpp index da3fc57..6e9562c 100644 --- a/testApp/remote/testServer.cpp +++ b/testApp/remote/testServer.cpp @@ -933,6 +933,7 @@ class MockChannelGet : private: Channel::shared_pointer m_channel; ChannelGetRequester::shared_pointer m_channelGetRequester; + bool m_alwaysSendAll; PVStructure::shared_pointer m_pvStructure; BitSet::shared_pointer m_bitSet; ChannelProcess::shared_pointer m_channelProcess; @@ -946,11 +947,17 @@ protected: PVStructure::shared_pointer const & pvRequest) : m_channel(channel), m_channelGetRequester(channelGetRequester), + m_alwaysSendAll(false), m_pvStructure(getRequestedStructure(pvStructure, pvRequest)), m_bitSet(new BitSet(m_pvStructure->getNumberFields())), m_channelProcess(getChannelProcess(channel, pvRequest)) { PVACCESS_REFCOUNT_MONITOR_CONSTRUCT(mockChannelGet); + + PVScalar::shared_pointer pvScalar = pvRequest->getSubField("record.alwaysSendAll"); + if (pvScalar) + m_alwaysSendAll = pvScalar->getAs(); + m_changed.set(); // initial value } @@ -982,7 +989,7 @@ public: m_channelProcess->process(); // TODO far from being thread-safe - if (m_changed.get()) + if (m_alwaysSendAll || m_changed.get()) { m_bitSet->set(0); m_changed.clear(); @@ -1998,6 +2005,7 @@ class MockMonitor : private: string m_channelName; MonitorRequester::shared_pointer m_monitorRequester; + bool m_continuous; PVStructure::shared_pointer m_pvStructure; PVStructure::shared_pointer m_copy; BitSet::shared_pointer m_changedBitSet; @@ -2016,7 +2024,9 @@ protected: MockMonitor(std::string const & channelName, MonitorRequester::shared_pointer const & monitorRequester, PVStructure::shared_pointer const & pvStructure, PVStructure::shared_pointer const & pvRequest) : m_channelName(channelName), - m_monitorRequester(monitorRequester), m_pvStructure(getRequestedStructure(pvStructure, pvRequest)), + m_monitorRequester(monitorRequester), + m_continuous(false), + m_pvStructure(getRequestedStructure(pvStructure, pvRequest)), m_copy(getPVDataCreate()->createPVStructure(m_pvStructure->getStructure())), m_changedBitSet(new BitSet(m_pvStructure->getNumberFields())), m_overrunBitSet(new BitSet(m_pvStructure->getNumberFields())), @@ -2027,6 +2037,11 @@ protected: { PVACCESS_REFCOUNT_MONITOR_CONSTRUCT(mockMonitor); + PVScalar::shared_pointer pvScalar = pvRequest->getSubField("record.velocious"); + if (pvScalar) + m_continuous = pvScalar->getAs(); + + // we always send all m_changedBitSet->set(0); m_thisPtr->pvStructurePtr = m_copy; @@ -2138,7 +2153,12 @@ public: Lock xx(m_lock); if (m_state == MM_STATE_TAKEN) - m_state = MM_STATE_FREE; + { + if (m_continuous) + m_state = MM_STATE_FULL; + else + m_state = MM_STATE_FREE; + } } virtual void cancel()