#include #include #include #include #include #include #include #include #include #include #include #include #include #include #include using namespace std; using namespace std::tr1; using namespace epics::pvData; using namespace epics::pvAccess; #define DEFAULT_TIMEOUT 600.0 #define DEFAULT_REQUEST "field(value)" #define DEFAULT_ITERATIONS 10000 #define DEFAULT_CHANNELS 1 #define DEFAULT_ARRAY_SIZE 0 #define DEFAULT_RUNS 1 #define DEFAULT_BULK false int iterations = DEFAULT_ITERATIONS; int channels = DEFAULT_CHANNELS; int runs = DEFAULT_RUNS; bool bulkMode = DEFAULT_BULK; #define DEFAULT_TIMEOUT 600.0 #define DEFAULT_REQUEST "field(value)" double timeOut = DEFAULT_TIMEOUT; string request(DEFAULT_REQUEST); PVStructure::shared_pointer pvRequest; class RequesterImpl : public Requester, public std::tr1::enable_shared_from_this { public: virtual String getRequesterName() { return "RequesterImpl"; }; virtual void message(String const & message,MessageType messageType) { std::cout << "[" << getRequesterName() << "] message(" << message << ", " << getMessageTypeName(messageType) << ")" << std::endl; } }; void usage (void) { fprintf (stderr, "\nUsage: testGetPerformance [options] ...\n\n" " -h: Help: Print this message\n" "options:\n" " -r : pvRequest string, specifies what fields to return and options, default is '%s'\n" " -i : number of iterations per each run, default is '%d'\n" " -c : number of channels, default is '%d'\n" " -s : number of array elements (0 means scalar), default is '%d'\n" " -l : number of runs (0 means execute runs continuously), default is '%d'\n" //" -b: bulk mode (send request messages in bulks), default is %d\n" " -w : Wait time, specifies timeout, default is %f second(s)\n\n" , DEFAULT_REQUEST, DEFAULT_ITERATIONS, DEFAULT_CHANNELS, DEFAULT_ARRAY_SIZE, DEFAULT_RUNS, DEFAULT_BULK, DEFAULT_TIMEOUT); } // TODO thread-safety ChannelProvider::shared_pointer provider; vector channelGetList; int channelCount = 0; int iterationCount = 0; int runCount = 0; timeval startTime; void get_all() { for (vector::const_iterator i = channelGetList.begin(); i != channelGetList.end(); i++) (*i)->get(false); // we assume all channels are from the same provider if (bulkMode) provider->flush(); } // NOTE: it is assumed that all the callbacks are called from the same thread, i.e. same TCP connection class ChannelGetRequesterImpl : public ChannelGetRequester { private: ChannelGet::shared_pointer m_channelGet; PVStructure::shared_pointer m_pvStructure; BitSet::shared_pointer m_bitSet; Event m_event; Event m_connectionEvent; String m_channelName; int m_count; timeval m_startTime; public: ChannelGetRequesterImpl(String channelName) : m_channelName(channelName) { } virtual String getRequesterName() { return "ChannelGetRequesterImpl"; } virtual void message(String const & message,MessageType messageType) { std::cout << "[" << getRequesterName() << "] message(" << message << ", " << getMessageTypeName(messageType) << ")" << std::endl; } virtual void channelGetConnect(const epics::pvData::Status& status, ChannelGet::shared_pointer const & channelGet, epics::pvData::PVStructure::shared_pointer const & pvStructure, epics::pvData::BitSet::shared_pointer const & bitSet) { if (status.isSuccess()) { // show warning if (!status.isOK()) { std::cout << "[" << m_channelName << "] channel get create: " << status.toString() << std::endl; } m_channelGet = channelGet; m_pvStructure = pvStructure; m_bitSet = bitSet; m_connectionEvent.signal(); } else { std::cout << "[" << m_channelName << "] failed to create channel get: " << status.toString() << std::endl; } } virtual void getDone(const epics::pvData::Status& status) { if (status.isSuccess()) { // show warning if (!status.isOK()) { std::cout << "[" << m_channelName << "] channel get: " << status.toString() << std::endl; } channelCount++; if (channelCount == channels) { iterationCount++; channelCount = 0; } if (iterationCount == iterations) { timeval endTime; gettimeofday(&endTime, NULL); long seconds, nseconds; double duration; seconds = endTime.tv_sec - startTime.tv_sec; nseconds = endTime.tv_usec - startTime.tv_usec; duration = seconds + nseconds/1000000.0; printf("%5.6f seconds, %5.3f (x %d = %5.3f) gets/s\n", duration, iterations/duration, channels, iterations*channels/duration); iterationCount = 0; gettimeofday(&startTime, NULL); runCount++; if (runs == 0 || runCount < runs) get_all(); else exit(0); // all done, not the nicest way to exit } else if (channelCount == 0) { get_all(); } } else { std::cout << "[" << m_channelName << "] failed to get: " << status.toString() << std::endl; } } bool waitUntilConnected(double timeOut) { return m_connectionEvent.wait(timeOut); } }; class ChannelRequesterImpl : public ChannelRequester { private: Event m_event; public: virtual String getRequesterName() { return "ChannelRequesterImpl"; }; virtual void message(String const & message,MessageType messageType) { std::cout << "[" << getRequesterName() << "] message(" << message << ", " << getMessageTypeName(messageType) << ")" << std::endl; } virtual void channelCreated(const epics::pvData::Status& status, Channel::shared_pointer const & channel) { if (status.isSuccess()) { // show warning if (!status.isOK()) { std::cout << "[" << channel->getChannelName() << "] channel create: " << status.toString() << std::endl; } } else { std::cout << "[" << channel->getChannelName() << "] failed to create a channel: " << status.toString() << std::endl; } } virtual void channelStateChange(Channel::shared_pointer const & /*channel*/, Channel::ConnectionState connectionState) { if (connectionState == Channel::CONNECTED) { m_event.signal(); } else { // ups... connection loss std::cout << Channel::ConnectionStateNames[connectionState] << std::endl; exit(3); } } bool waitUntilConnected(double timeOut) { return m_event.wait(timeOut); } }; int main (int argc, char *argv[]) { int opt; // getopt() current option int arraySize = DEFAULT_ARRAY_SIZE; // 0 means scalar Requester::shared_pointer requester(new RequesterImpl()); setvbuf(stdout,NULL,_IOLBF,BUFSIZ); // Set stdout to line buffering while ((opt = getopt(argc, argv, ":hr:w:i:c:s:l:b")) != -1) { switch (opt) { case 'h': // Print usage usage(); return 0; case 'w': // Set CA timeout value if(epicsScanDouble(optarg, &timeOut) != 1) { fprintf(stderr, "'%s' is not a valid timeout value " "- ignored. ('cainfo -h' for help.)\n", optarg); timeOut = DEFAULT_TIMEOUT; } break; case 'r': // pvRequest string request = optarg; break; case 'i': // iterations iterations = atoi(optarg); break; case 'c': // channels channels = atoi(optarg); break; case 's': // arraySize arraySize = atoi(optarg); break; case 'l': // runs runs = atoi(optarg); break; //case 'b': // bulk mode // bulkMode = true; // break; case '?': fprintf(stderr, "Unrecognized option: '-%c'. ('testGetPerformance -h' for help.)\n", optopt); return 1; case ':': fprintf(stderr, "Option '-%c' requires an argument. ('testGetPerformance -h' for help.)\n", optopt); return 1; default : usage(); return 1; } } printf("%d channel(s) of double array size of %d element(s) (0==scalar), %d iteration(s) per run, %d run(s) (0==forever)\n", channels, arraySize, iterations, runs); vector channelNames; char buf[64]; for (int i = 0; i < channels; i++) { if (arraySize > 0) sprintf(buf, "testArray%d_%d", arraySize, i); else sprintf(buf, "test%d", i); channelNames.push_back(buf); } pvRequest = getCreateRequest()->createRequest(request,requester); if (pvRequest.get() == 0) { printf("failed to parse request string\n"); return 1; } // typedef enum {logLevelInfo, logLevelDebug, logLevelError, errlogFatal} errlogSevEnum; SET_LOG_LEVEL(logLevelError); bool allOK = 1; ClientFactory::start(); provider = getChannelAccess()->getProvider("pvAccess"); vector channels; for (vector::const_iterator i = channelNames.begin(); i != channelNames.end(); i++) { shared_ptr channelRequesterImpl( new ChannelRequesterImpl() ); Channel::shared_pointer channel = provider->createChannel(*i, channelRequesterImpl); channels.push_back(channel); } bool differentConnectionsWarningIssued = false; String theRemoteAddress; for (vector::iterator i = channels.begin(); i != channels.end(); i++) { Channel::shared_pointer channel = *i; shared_ptr channelRequesterImpl = dynamic_pointer_cast(channel->getChannelRequester()); if (bulkMode) provider->flush(); if (channelRequesterImpl->waitUntilConnected(5.0)) { String remoteAddress = channel->getRemoteAddress(); if (theRemoteAddress.empty()) { theRemoteAddress = remoteAddress; } else if (theRemoteAddress != remoteAddress) { if (!differentConnectionsWarningIssued) { std::cout << "not all channels are hosted by the same connection: " << theRemoteAddress << " != " << remoteAddress << std::endl; differentConnectionsWarningIssued = true; // the assumes same connection (thread-safety) return 2; } } shared_ptr getRequesterImpl( new ChannelGetRequesterImpl(channel->getChannelName()) ); ChannelGet::shared_pointer channelGet = channel->createChannelGet(getRequesterImpl, pvRequest); if (bulkMode) provider->flush(); allOK = getRequesterImpl->waitUntilConnected(timeOut); if (!allOK) { std::cout << "[" << channel->getChannelName() << "] failed to get all the gets" << std::endl; return 1; } channelGetList.push_back(channelGet); } else { std::cout << "[" << channel->getChannelName() << "] connection timeout" << std::endl; return 1; } } std::cout << "all connected" << std::endl; gettimeofday(&startTime, NULL); get_all(); while (true) epicsThreadSleep(600.0); //ClientFactory::stop(); return allOK ? 0 : 1; }