#include #include #include #include #include #include #include #include #include #include #include #include #include #include #ifdef _WIN32 #include #else #include #endif #include using namespace std; namespace TR1 = std::tr1; using namespace epics::pvData; using namespace epics::pvAccess; #define DEFAULT_TIMEOUT 600.0 #define DEFAULT_REQUEST "record[alwaysSendAll=true]field(value)" #define DEFAULT_ITERATIONS 10000 #define DEFAULT_CHANNELS 1 #define DEFAULT_ARRAY_SIZE 0 #define DEFAULT_RUNS 1 #define DEFAULT_BULK false bool verbose = false; int iterations = DEFAULT_ITERATIONS; int channels = DEFAULT_CHANNELS; int runs = DEFAULT_RUNS; bool bulkMode = DEFAULT_BULK; int arraySize = DEFAULT_ARRAY_SIZE; // 0 means scalar Mutex waitLoopPtrMutex; TR1::shared_ptr waitLoopEvent; double timeOut = DEFAULT_TIMEOUT; string request(DEFAULT_REQUEST); PVStructure::shared_pointer pvRequest; class RequesterImpl : public Requester, public TR1::enable_shared_from_this { public: virtual string getRequesterName() { return "RequesterImpl"; }; virtual void message(std::string const & message,MessageType messageType) { std::cout << "[" << getRequesterName() << "] message(" << message << ", " << getMessageTypeName(messageType) << ")" << std::endl; } }; void usage (void) { fprintf (stderr, "\nUsage: testGetPerformance [options] ...\n\n" " -h: Help: Print this message\n" "options:\n" " -r : pvRequest string, specifies what fields to return and options, default is '%s'\n" " -i : number of iterations per each run, default is '%d'\n" " -c : number of channels, default is '%d'\n" " -s : number of array elements (0 means scalar), default is '%d'\n" " -l : number of runs (0 means execute runs continuously), default is '%d'\n" //" -b: bulk mode (send request messages in bulks), default is %d\n" " -f : read configuration file that contains list of tests to be performed\n" " each test is defined by a \" \" line\n" " output is a space separated list of get operations per second for each run, one line per test\n" " -v enable verbose output when configuration is read from the file\n" " -w : wait time, specifies timeout, default is %f second(s)\n\n" , DEFAULT_REQUEST, DEFAULT_ITERATIONS, DEFAULT_CHANNELS, DEFAULT_ARRAY_SIZE, DEFAULT_RUNS, /*DEFAULT_BULK,*/ DEFAULT_TIMEOUT); } // TODO thread-safety ChannelProvider::shared_pointer provider; vector channelGetList; int channelCount = 0; int iterationCount = 0; int runCount = 0; double sum = 0; void reset() { channelGetList.clear(); channelCount = 0; iterationCount = 0; runCount = 0; sum = 0; } epicsTimeStamp startTime; void get_all() { for (vector::const_iterator i = channelGetList.begin(); i != channelGetList.end(); i++) (*i)->get(); // we assume all channels are from the same provider if (bulkMode) provider->flush(); } // NOTE: it is assumed that all the callbacks are called from the same thread, i.e. same TCP connection class ChannelGetRequesterImpl : public ChannelGetRequester { private: Event m_event; Event m_connectionEvent; string m_channelName; public: ChannelGetRequesterImpl(std::string channelName) : m_channelName(channelName) { } virtual string getRequesterName() { return "ChannelGetRequesterImpl"; } virtual void message(std::string const & message,MessageType messageType) { std::cout << "[" << getRequesterName() << "] message(" << message << ", " << getMessageTypeName(messageType) << ")" << std::endl; } virtual void channelGetConnect(const epics::pvData::Status& status, ChannelGet::shared_pointer const & /*channelGet*/, epics::pvData::Structure::const_shared_pointer const & /*structure*/) { if (status.isSuccess()) { // show warning if (!status.isOK()) { std::cout << "[" << m_channelName << "] channel get create: " << status << std::endl; } m_connectionEvent.signal(); } else { std::cout << "[" << m_channelName << "] failed to create channel get: " << status << std::endl; } } virtual void getDone(const epics::pvData::Status& status, ChannelGet::shared_pointer const & /*channelGet*/, epics::pvData::PVStructure::shared_pointer const & /*pvStructure*/, epics::pvData::BitSet::shared_pointer const & /*bitSet*/) { if (status.isSuccess()) { // show warning if (!status.isOK()) { std::cout << "[" << m_channelName << "] channel get: " << status << std::endl; } channelCount++; if (channelCount == channels) { iterationCount++; channelCount = 0; } if (iterationCount == iterations) { epicsTimeStamp endTime; epicsTimeGetCurrent(&endTime); double duration = epicsTime(endTime) - epicsTime(startTime); double getPerSec = iterations*channels/duration; double gbit = getPerSec*arraySize*sizeof(double)*8/(1000*1000*1000); // * bits / giga; NO, it's really 1000 and not 1024 if (verbose) printf("%5.6f seconds, %.3f (x %d = %.3f) gets/s, data throughput %5.3f Gbits/s\n", duration, iterations/duration, channels, getPerSec, gbit); sum += getPerSec; iterationCount = 0; epicsTimeGetCurrent(&startTime); runCount++; if (runs == 0 || runCount < runs) get_all(); else { printf("%d %d %d %d %.3f\n", channels, arraySize, iterations, runs, sum/runs); Lock guard(waitLoopPtrMutex); waitLoopEvent->signal(); // all done } } else if (channelCount == 0) { get_all(); } } else { std::cout << "[" << m_channelName << "] failed to get: " << status << std::endl; } } bool waitUntilConnected(double timeOut) { return m_connectionEvent.wait(timeOut); } }; class ChannelRequesterImpl : public ChannelRequester { private: Event m_event; public: virtual string getRequesterName() { return "ChannelRequesterImpl"; }; virtual void message(std::string const & message,MessageType messageType) { std::cout << "[" << getRequesterName() << "] message(" << message << ", " << getMessageTypeName(messageType) << ")" << std::endl; } virtual void channelCreated(const epics::pvData::Status& status, Channel::shared_pointer const & channel) { if (status.isSuccess()) { // show warning if (!status.isOK()) { std::cout << "[" << channel->getChannelName() << "] channel create: " << status << std::endl; } } else { std::cout << "[" << channel->getChannelName() << "] failed to create a channel: " << status << std::endl; } } virtual void channelStateChange(Channel::shared_pointer const & /*channel*/, Channel::ConnectionState connectionState) { if (connectionState == Channel::CONNECTED) { m_event.signal(); } else { // ups... connection loss std::cout << Channel::ConnectionStateNames[connectionState] << std::endl; exit(3); } } bool waitUntilConnected(double timeOut) { return m_event.wait(timeOut); } }; void runTest() { reset(); if (verbose) printf("%d channel(s) of double array size of %d element(s) (0==scalar), %d iteration(s) per run, %d run(s) (0==forever)\n", channels, arraySize, iterations, runs); /* StringArray fieldNames; fieldNames.push_back("strategy"); FieldConstPtrArray fields; fields.push_back(getFieldCreate()->createScalar(pvInt)); PVStructure::shared_pointer configuration = getPVDataCreate()->createPVStructure(getFieldCreate()->createStructure(fieldNames, fields)); configuration->getSubField("strategy")->put(bulkMode ? USER_CONTROLED : DELAYED); provider->configure(configuration); */ vector channelNames; char buf[64]; for (int i = 0; i < channels; i++) { if (arraySize > 0) sprintf(buf, "testArray%d_%d", arraySize, i); else sprintf(buf, "test%d", i); channelNames.push_back(buf); } vector channels; for (vector::const_iterator i = channelNames.begin(); i != channelNames.end(); i++) { TR1::shared_ptr channelRequesterImpl( new ChannelRequesterImpl() ); Channel::shared_pointer channel = provider->createChannel(*i, channelRequesterImpl); channels.push_back(channel); } if (bulkMode) provider->flush(); bool differentConnectionsWarningIssued = false; string theRemoteAddress; for (vector::iterator i = channels.begin(); i != channels.end(); i++) { Channel::shared_pointer channel = *i; TR1::shared_ptr channelRequesterImpl = TR1::dynamic_pointer_cast(channel->getChannelRequester()); if (channelRequesterImpl->waitUntilConnected(5.0)) { string remoteAddress = channel->getRemoteAddress(); if (theRemoteAddress.empty()) { theRemoteAddress = remoteAddress; } else if (theRemoteAddress != remoteAddress) { if (!differentConnectionsWarningIssued) { std::cout << "not all channels are hosted by the same connection: " << theRemoteAddress << " != " << remoteAddress << std::endl; differentConnectionsWarningIssued = true; // we assumes same connection (thread-safety) exit(2); } } TR1::shared_ptr getRequesterImpl( new ChannelGetRequesterImpl(channel->getChannelName()) ); ChannelGet::shared_pointer channelGet = channel->createChannelGet(getRequesterImpl, pvRequest); if (bulkMode) provider->flush(); bool allOK = getRequesterImpl->waitUntilConnected(timeOut); if (!allOK) { std::cout << "[" << channel->getChannelName() << "] failed to get all the gets" << std::endl; exit(1); } channelGetList.push_back(channelGet); } else { std::cout << "[" << channel->getChannelName() << "] connection timeout" << std::endl; exit(1); } } if (verbose) std::cout << "all connected" << std::endl; { Lock guard(waitLoopPtrMutex); waitLoopEvent.reset(new Event()); } epicsTimeGetCurrent(&startTime); get_all(); waitLoopEvent->wait(); } int main (int argc, char *argv[]) { int opt; // getopt() current option std::string testFile; setvbuf(stdout,NULL,_IOLBF,BUFSIZ); // Set stdout to line buffering while ((opt = getopt(argc, argv, ":hr:w:i:c:s:l:bf:v")) != -1) { switch (opt) { case 'h': // Print usage usage(); return 0; case 'w': // Set PVA timeout value if((epicsScanDouble(optarg, &timeOut)) != 1) { fprintf(stderr, "'%s' is not a valid timeout value " "- ignored. ('cainfo -h' for help.)\n", optarg); timeOut = DEFAULT_TIMEOUT; } break; case 'r': // pvRequest string request = optarg; break; case 'i': // iterations iterations = atoi(optarg); break; case 'c': // channels channels = atoi(optarg); break; case 's': // arraySize arraySize = atoi(optarg); break; case 'l': // runs runs = atoi(optarg); break; case 'b': // bulk mode bulkMode = true; break; case 'f': // testFile testFile = optarg; break; case 'v': // testFile verbose = true; break; case '?': fprintf(stderr, "Unrecognized option: '-%c'. ('testGetPerformance -h' for help.)\n", optopt); return 1; case ':': fprintf(stderr, "Option '-%c' requires an argument. ('testGetPerformance -h' for help.)\n", optopt); return 1; default : usage(); return 1; } } // typedef enum {logLevelInfo, logLevelDebug, logLevelError, errlogFatal} errlogSevEnum; SET_LOG_LEVEL(logLevelError); pvRequest = CreateRequest::create()->createRequest(request); if (pvRequest.get() == 0) { printf("failed to parse request string\n"); return 1; } ClientFactory::start(); provider = getChannelProviderRegistry()->getProvider("pva"); if (!testFile.empty()) { ifstream ifs(testFile.c_str(), ifstream::in); if (ifs.good()) { string line; while (true) { getline(ifs, line); if (ifs.good()) { // ignore lines that starts (no trimming) with '#' if (line.find('#') != 0) { // if (sscanf(line.c_str(), "%d %d %d %d", &channels, &arraySize, &iterations, &runs) == 4) { //printf("%d %d %d %d\n", channels, arraySize, iterations, runs); runTest(); // wait a bit for a next test epicsThreadSleep(1.0); } else { fprintf(stderr, "Failed to parse line '%s', ignoring...\n", line.c_str()); } } } else break; } } else { fprintf(stderr, "Failed to open file '%s'\n", testFile.c_str()); return 2; } ifs.close(); } else { // in non-file mode, verbose is true by default verbose = true; runTest(); } //ClientFactory::stop(); return 0; }