From d59f2c81ea6d5c831b4f58f42a97082448766bae Mon Sep 17 00:00:00 2001 From: Matej Sekoranja Date: Wed, 24 Apr 2013 12:22:43 +0200 Subject: [PATCH] testGetPerformance file support, fixed directSerialization with fast get clients --- pvAccessApp/server/responseHandlers.cpp | 9 +- testApp/remote/testGetPerformance.cpp | 230 +++++++++++++++--------- 2 files changed, 154 insertions(+), 85 deletions(-) diff --git a/pvAccessApp/server/responseHandlers.cpp b/pvAccessApp/server/responseHandlers.cpp index 01e60a8..0991f4d 100644 --- a/pvAccessApp/server/responseHandlers.cpp +++ b/pvAccessApp/server/responseHandlers.cpp @@ -690,6 +690,13 @@ void ServerChannelGetRequesterImpl::send(ByteBuffer* buffer, TransportSendContro _status.serialize(buffer, control); } + // TODO !!! + // if we call stopRequest() below (the second one, commented out), we might be too late + // since between last serialization data and stopRequest() a buffer can be already flushed + // (i.e. in case of directSerialize) + // if we call it here, then a bad client can issue another request just after stopRequest() was called + stopRequest(); + if (_status.isSuccess()) { if (request & QOS_INIT) @@ -712,7 +719,7 @@ void ServerChannelGetRequesterImpl::send(ByteBuffer* buffer, TransportSendContro } } - stopRequest(); + //stopRequest(); // lastRequest if (request & QOS_DESTROY) diff --git a/testApp/remote/testGetPerformance.cpp b/testApp/remote/testGetPerformance.cpp index 7966238..204ecee 100644 --- a/testApp/remote/testGetPerformance.cpp +++ b/testApp/remote/testGetPerformance.cpp @@ -1,4 +1,5 @@ #include +#include #include #include @@ -36,6 +37,8 @@ int channels = DEFAULT_CHANNELS; int runs = DEFAULT_RUNS; bool bulkMode = DEFAULT_BULK; int arraySize = DEFAULT_ARRAY_SIZE; // 0 means scalar +Mutex waitLoopPtrMutex; +std::tr1::shared_ptr waitLoopEvent; #define DEFAULT_TIMEOUT 600.0 #define DEFAULT_REQUEST "field(value)" @@ -86,6 +89,14 @@ int channelCount = 0; int iterationCount = 0; int runCount = 0; +void reset() +{ + channelGetList.clear(); + channelCount = 0; + iterationCount = 0; + runCount = 0; +} + timeval startTime; void get_all() @@ -198,7 +209,10 @@ public: if (runs == 0 || runCount < runs) get_all(); else - exit(0); // all done, not the nicest way to exit + { + Lock guard(waitLoopPtrMutex); + waitLoopEvent->signal(); // all done + } } else if (channelCount == 0) { @@ -209,7 +223,6 @@ public: { std::cout << "[" << m_channelName << "] failed to get: " << status.toString() << std::endl; } - } bool waitUntilConnected(double timeOut) @@ -272,6 +285,98 @@ public: } }; +void runTest() +{ + reset(); + + printf("%d channel(s) of double array size of %d element(s) (0==scalar), %d iteration(s) per run, %d run(s) (0==forever)\n", channels, arraySize, iterations, runs); + + vector channelNames; + char buf[64]; + for (int i = 0; i < channels; i++) + { + if (arraySize > 0) + sprintf(buf, "testArray%d_%d", arraySize, i); + else + sprintf(buf, "test%d", i); + channelNames.push_back(buf); + } + + vector channels; + for (vector::const_iterator i = channelNames.begin(); + i != channelNames.end(); + i++) + { + shared_ptr channelRequesterImpl( + new ChannelRequesterImpl() + ); + Channel::shared_pointer channel = provider->createChannel(*i, channelRequesterImpl); + channels.push_back(channel); + } + + bool differentConnectionsWarningIssued = false; + String theRemoteAddress; + for (vector::iterator i = channels.begin(); + i != channels.end(); + i++) + { + Channel::shared_pointer channel = *i; + shared_ptr channelRequesterImpl = + dynamic_pointer_cast(channel->getChannelRequester()); + //if (bulkMode) provider->flush(); + if (channelRequesterImpl->waitUntilConnected(5.0)) + { + String remoteAddress = channel->getRemoteAddress(); + if (theRemoteAddress.empty()) + { + theRemoteAddress = remoteAddress; + } + else if (theRemoteAddress != remoteAddress) + { + if (!differentConnectionsWarningIssued) + { + std::cout << "not all channels are hosted by the same connection: " << + theRemoteAddress << " != " << remoteAddress << std::endl; + differentConnectionsWarningIssued = true; + // the assumes same connection (thread-safety) + exit(2); + } + } + + shared_ptr getRequesterImpl( + new ChannelGetRequesterImpl(channel->getChannelName()) + ); + ChannelGet::shared_pointer channelGet = channel->createChannelGet(getRequesterImpl, pvRequest); + //if (bulkMode) provider->flush(); + + bool allOK = getRequesterImpl->waitUntilConnected(timeOut); + + if (!allOK) + { + std::cout << "[" << channel->getChannelName() << "] failed to get all the gets" << std::endl; + exit(1); + } + + channelGetList.push_back(channelGet); + + } + else + { + std::cout << "[" << channel->getChannelName() << "] connection timeout" << std::endl; + exit(1); + } + } + std::cout << "all connected" << std::endl; + + { + Lock guard(waitLoopPtrMutex); + waitLoopEvent.reset(new Event()); + } + gettimeofday(&startTime, NULL); + get_all(); + + waitLoopEvent->wait(); +} int main (int argc, char *argv[]) { @@ -315,12 +420,6 @@ int main (int argc, char *argv[]) // break; case 'f': // testFile testFile = optarg; - - // TODO - fprintf(stderr, - "Unimplemented option: '-%c'.\n", - opt); - return 1; break; case '?': fprintf(stderr, @@ -350,94 +449,57 @@ int main (int argc, char *argv[]) ClientFactory::start(); provider = getChannelAccess()->getProvider("pvAccess"); - printf("%d channel(s) of double array size of %d element(s) (0==scalar), %d iteration(s) per run, %d run(s) (0==forever)\n", channels, arraySize, iterations, runs); - - vector channelNames; - char buf[64]; - for (int i = 0; i < channels; i++) + if (!testFile.empty()) { - if (arraySize > 0) - sprintf(buf, "testArray%d_%d", arraySize, i); - else - sprintf(buf, "test%d", i); - channelNames.push_back(buf); - } - - bool allOK = 1; - - vector channels; - for (vector::const_iterator i = channelNames.begin(); - i != channelNames.end(); - i++) - { - shared_ptr channelRequesterImpl( - new ChannelRequesterImpl() - ); - Channel::shared_pointer channel = provider->createChannel(*i, channelRequesterImpl); - channels.push_back(channel); - } - - bool differentConnectionsWarningIssued = false; - String theRemoteAddress; - for (vector::iterator i = channels.begin(); - i != channels.end(); - i++) - { - Channel::shared_pointer channel = *i; - shared_ptr channelRequesterImpl = - dynamic_pointer_cast(channel->getChannelRequester()); - //if (bulkMode) provider->flush(); - if (channelRequesterImpl->waitUntilConnected(5.0)) + ifstream ifs(testFile.c_str(), ifstream::in); + if (ifs.good()) { - String remoteAddress = channel->getRemoteAddress(); - if (theRemoteAddress.empty()) + string line; + while (true) { - theRemoteAddress = remoteAddress; - } - else if (theRemoteAddress != remoteAddress) - { - if (!differentConnectionsWarningIssued) + getline(ifs, line); + if (ifs.good()) { - std::cout << "not all channels are hosted by the same connection: " << - theRemoteAddress << " != " << remoteAddress << std::endl; - differentConnectionsWarningIssued = true; - // the assumes same connection (thread-safety) - return 2; + // ignore lines that starts (no trimming) with '#' + if (line.find('#') != 0) + { + // + if (sscanf(line.c_str(), "%d %d %d %d", &channels, &arraySize, &iterations, &runs) == 4) + { + //printf("%d %d %d %d\n", channels, arraySize, iterations, runs); + runTest(); + + // wait a bit for a next test + epicsThreadSleep(1.0); + } + else + { + fprintf(stderr, + "Failed to parse line '%s', ignoring...\n", + line.c_str()); + } + } } + else + break; } - - shared_ptr getRequesterImpl( - new ChannelGetRequesterImpl(channel->getChannelName()) - ); - ChannelGet::shared_pointer channelGet = channel->createChannelGet(getRequesterImpl, pvRequest); - //if (bulkMode) provider->flush(); - - allOK = getRequesterImpl->waitUntilConnected(timeOut); - - if (!allOK) - { - std::cout << "[" << channel->getChannelName() << "] failed to get all the gets" << std::endl; - return 1; - } - - channelGetList.push_back(channelGet); - } else { - std::cout << "[" << channel->getChannelName() << "] connection timeout" << std::endl; - return 1; + fprintf(stderr, + "Failed to open file '%s'\n", + testFile.c_str()); + return 2; } + + ifs.close(); + } + else + { + runTest(); } - std::cout << "all connected" << std::endl; - - gettimeofday(&startTime, NULL); - get_all(); - - while (true) - epicsThreadSleep(600.0); //ClientFactory::stop(); - return allOK ? 0 : 1; + return 0; }