526 lines
16 KiB
C++
526 lines
16 KiB
C++
#include <iostream>
|
|
#include <fstream>
|
|
#include <pv/clientFactory.h>
|
|
#include <pv/pvAccess.h>
|
|
|
|
#include <stdio.h>
|
|
#include <epicsStdlib.h>
|
|
#include <epicsGetopt.h>
|
|
#include <epicsThread.h>
|
|
#include <epicsTime.h>
|
|
#include <pv/logger.h>
|
|
#include <pv/lock.h>
|
|
|
|
#include <vector>
|
|
#include <string>
|
|
|
|
#include <stdlib.h>
|
|
|
|
#ifdef _WIN32
|
|
#include <WinSock2.h>
|
|
#else
|
|
#include <sys/time.h>
|
|
#endif
|
|
|
|
#include <pv/event.h>
|
|
|
|
using namespace std;
|
|
using namespace std::tr1;
|
|
using namespace epics::pvData;
|
|
using namespace epics::pvAccess;
|
|
|
|
#define DEFAULT_TIMEOUT 600.0
|
|
#define DEFAULT_REQUEST "field(value)"
|
|
#define DEFAULT_ITERATIONS 10000
|
|
#define DEFAULT_CHANNELS 1
|
|
#define DEFAULT_ARRAY_SIZE 0
|
|
#define DEFAULT_RUNS 1
|
|
#define DEFAULT_BULK false
|
|
|
|
bool verbose = false;
|
|
|
|
int iterations = DEFAULT_ITERATIONS;
|
|
int channels = DEFAULT_CHANNELS;
|
|
int runs = DEFAULT_RUNS;
|
|
bool bulkMode = DEFAULT_BULK;
|
|
int arraySize = DEFAULT_ARRAY_SIZE; // 0 means scalar
|
|
Mutex waitLoopPtrMutex;
|
|
std::tr1::shared_ptr<Event> waitLoopEvent;
|
|
|
|
#define DEFAULT_TIMEOUT 600.0
|
|
#define DEFAULT_REQUEST "field(value)"
|
|
|
|
double timeOut = DEFAULT_TIMEOUT;
|
|
string request(DEFAULT_REQUEST);
|
|
|
|
PVStructure::shared_pointer pvRequest;
|
|
|
|
class RequesterImpl : public Requester,
|
|
public std::tr1::enable_shared_from_this<RequesterImpl>
|
|
{
|
|
public:
|
|
|
|
virtual string getRequesterName()
|
|
{
|
|
return "RequesterImpl";
|
|
};
|
|
|
|
virtual void message(std::string const & message,MessageType messageType)
|
|
{
|
|
std::cout << "[" << getRequesterName() << "] message(" << message << ", " << getMessageTypeName(messageType) << ")" << std::endl;
|
|
}
|
|
};
|
|
|
|
void usage (void)
|
|
{
|
|
fprintf (stderr, "\nUsage: testGetPerformance [options] <PV name>...\n\n"
|
|
" -h: Help: Print this message\n"
|
|
"options:\n"
|
|
" -r <pv request>: pvRequest string, specifies what fields to return and options, default is '%s'\n"
|
|
" -i <iterations>: number of iterations per each run, default is '%d'\n"
|
|
" -c <channels>: number of channels, default is '%d'\n"
|
|
" -s <array size>: number of array elements (0 means scalar), default is '%d'\n"
|
|
" -l <runs>: number of runs (0 means execute runs continuously), default is '%d'\n"
|
|
//" -b: bulk mode (send request messages in bulks), default is %d\n"
|
|
" -f <filename>: read configuration file that contains list of tests to be performed\n"
|
|
" each test is defined by a \"<c> <s> <i> <l>\" line\n"
|
|
" output is a space separated list of get operations per second for each run, one line per test\n"
|
|
" -v enable verbose output when configuration is read from the file\n"
|
|
" -w <sec>: wait time, specifies timeout, default is %f second(s)\n\n"
|
|
, DEFAULT_REQUEST, DEFAULT_ITERATIONS, DEFAULT_CHANNELS, DEFAULT_ARRAY_SIZE, DEFAULT_RUNS, /*DEFAULT_BULK,*/ DEFAULT_TIMEOUT);
|
|
}
|
|
|
|
// TODO thread-safety
|
|
ChannelProvider::shared_pointer provider;
|
|
vector<ChannelGet::shared_pointer> channelGetList;
|
|
int channelCount = 0;
|
|
int iterationCount = 0;
|
|
int runCount = 0;
|
|
double sum = 0;
|
|
|
|
void reset()
|
|
{
|
|
channelGetList.clear();
|
|
channelCount = 0;
|
|
iterationCount = 0;
|
|
runCount = 0;
|
|
sum = 0;
|
|
}
|
|
|
|
epicsTimeStamp startTime;
|
|
|
|
void get_all()
|
|
{
|
|
for (vector<ChannelGet::shared_pointer>::const_iterator i = channelGetList.begin();
|
|
i != channelGetList.end();
|
|
i++)
|
|
(*i)->get();
|
|
|
|
// we assume all channels are from the same provider
|
|
if (bulkMode) provider->flush();
|
|
}
|
|
|
|
|
|
// NOTE: it is assumed that all the callbacks are called from the same thread, i.e. same TCP connection
|
|
class ChannelGetRequesterImpl : public ChannelGetRequester
|
|
{
|
|
private:
|
|
Event m_event;
|
|
Event m_connectionEvent;
|
|
string m_channelName;
|
|
int m_count;
|
|
|
|
timeval m_startTime;
|
|
|
|
public:
|
|
|
|
ChannelGetRequesterImpl(std::string channelName) :
|
|
m_channelName(channelName)
|
|
{
|
|
}
|
|
|
|
virtual string getRequesterName()
|
|
{
|
|
return "ChannelGetRequesterImpl";
|
|
}
|
|
|
|
virtual void message(std::string const & message,MessageType messageType)
|
|
{
|
|
std::cout << "[" << getRequesterName() << "] message(" << message << ", " << getMessageTypeName(messageType) << ")" << std::endl;
|
|
}
|
|
|
|
virtual void channelGetConnect(const epics::pvData::Status& status,
|
|
ChannelGet::shared_pointer const & /*channelGet*/,
|
|
epics::pvData::Structure::const_shared_pointer const & /*structure*/)
|
|
{
|
|
if (status.isSuccess())
|
|
{
|
|
// show warning
|
|
if (!status.isOK())
|
|
{
|
|
std::cout << "[" << m_channelName << "] channel get create: " << status << std::endl;
|
|
}
|
|
|
|
m_connectionEvent.signal();
|
|
}
|
|
else
|
|
{
|
|
std::cout << "[" << m_channelName << "] failed to create channel get: " << status << std::endl;
|
|
}
|
|
}
|
|
|
|
virtual void getDone(const epics::pvData::Status& status,
|
|
ChannelGet::shared_pointer const & /*channelGet*/,
|
|
epics::pvData::PVStructure::shared_pointer const & /*pvStructure*/,
|
|
epics::pvData::BitSet::shared_pointer const & /*bitSet*/)
|
|
{
|
|
if (status.isSuccess())
|
|
{
|
|
// show warning
|
|
if (!status.isOK())
|
|
{
|
|
std::cout << "[" << m_channelName << "] channel get: " << status << std::endl;
|
|
}
|
|
|
|
channelCount++;
|
|
if (channelCount == channels)
|
|
{
|
|
iterationCount++;
|
|
channelCount = 0;
|
|
}
|
|
|
|
if (iterationCount == iterations)
|
|
{
|
|
epicsTimeStamp endTime;
|
|
epicsTimeGetCurrent(&endTime);
|
|
|
|
double duration = epicsTime(endTime) - epicsTime(startTime);
|
|
double getPerSec = iterations*channels/duration;
|
|
double gbit = getPerSec*arraySize*sizeof(double)*8/(1000*1000*1000); // * bits / giga; NO, it's really 1000 and not 1024
|
|
if (verbose)
|
|
printf("%5.6f seconds, %.3f (x %d = %.3f) gets/s, data throughput %5.3f Gbits/s\n",
|
|
duration, iterations/duration, channels, getPerSec, gbit);
|
|
sum += getPerSec;
|
|
|
|
iterationCount = 0;
|
|
epicsTimeGetCurrent(&startTime);
|
|
|
|
runCount++;
|
|
if (runs == 0 || runCount < runs)
|
|
get_all();
|
|
else
|
|
{
|
|
printf("%d %d %d %d %.3f\n", channels, arraySize, iterations, runs, sum/runs);
|
|
|
|
Lock guard(waitLoopPtrMutex);
|
|
waitLoopEvent->signal(); // all done
|
|
}
|
|
}
|
|
else if (channelCount == 0)
|
|
{
|
|
get_all();
|
|
}
|
|
}
|
|
else
|
|
{
|
|
std::cout << "[" << m_channelName << "] failed to get: " << status << std::endl;
|
|
}
|
|
}
|
|
|
|
bool waitUntilConnected(double timeOut)
|
|
{
|
|
return m_connectionEvent.wait(timeOut);
|
|
}
|
|
};
|
|
|
|
class ChannelRequesterImpl : public ChannelRequester
|
|
{
|
|
private:
|
|
Event m_event;
|
|
|
|
public:
|
|
|
|
virtual string getRequesterName()
|
|
{
|
|
return "ChannelRequesterImpl";
|
|
};
|
|
|
|
virtual void message(std::string const & message,MessageType messageType)
|
|
{
|
|
std::cout << "[" << getRequesterName() << "] message(" << message << ", " << getMessageTypeName(messageType) << ")" << std::endl;
|
|
}
|
|
|
|
virtual void channelCreated(const epics::pvData::Status& status,
|
|
Channel::shared_pointer const & channel)
|
|
{
|
|
if (status.isSuccess())
|
|
{
|
|
// show warning
|
|
if (!status.isOK())
|
|
{
|
|
std::cout << "[" << channel->getChannelName() << "] channel create: " << status << std::endl;
|
|
}
|
|
}
|
|
else
|
|
{
|
|
std::cout << "[" << channel->getChannelName() << "] failed to create a channel: " << status << std::endl;
|
|
}
|
|
}
|
|
|
|
virtual void channelStateChange(Channel::shared_pointer const & /*channel*/, Channel::ConnectionState connectionState)
|
|
{
|
|
if (connectionState == Channel::CONNECTED)
|
|
{
|
|
m_event.signal();
|
|
}
|
|
else
|
|
{
|
|
// ups... connection loss
|
|
std::cout << Channel::ConnectionStateNames[connectionState] << std::endl;
|
|
exit(3);
|
|
}
|
|
}
|
|
|
|
bool waitUntilConnected(double timeOut)
|
|
{
|
|
return m_event.wait(timeOut);
|
|
}
|
|
};
|
|
|
|
void runTest()
|
|
{
|
|
reset();
|
|
|
|
if (verbose)
|
|
printf("%d channel(s) of double array size of %d element(s) (0==scalar), %d iteration(s) per run, %d run(s) (0==forever)\n", channels, arraySize, iterations, runs);
|
|
|
|
/*
|
|
StringArray fieldNames;
|
|
fieldNames.push_back("strategy");
|
|
FieldConstPtrArray fields;
|
|
fields.push_back(getFieldCreate()->createScalar(pvInt));
|
|
PVStructure::shared_pointer configuration =
|
|
getPVDataCreate()->createPVStructure(getFieldCreate()->createStructure(fieldNames, fields));
|
|
configuration->getIntField("strategy")->put(bulkMode ? USER_CONTROLED : DELAYED);
|
|
provider->configure(configuration);
|
|
*/
|
|
|
|
vector<string> channelNames;
|
|
char buf[64];
|
|
for (int i = 0; i < channels; i++)
|
|
{
|
|
if (arraySize > 0)
|
|
sprintf(buf, "testArray%d_%d", arraySize, i);
|
|
else
|
|
sprintf(buf, "test%d", i);
|
|
channelNames.push_back(buf);
|
|
}
|
|
|
|
vector<Channel::shared_pointer> channels;
|
|
for (vector<string>::const_iterator i = channelNames.begin();
|
|
i != channelNames.end();
|
|
i++)
|
|
{
|
|
shared_ptr<ChannelRequesterImpl> channelRequesterImpl(
|
|
new ChannelRequesterImpl()
|
|
);
|
|
Channel::shared_pointer channel = provider->createChannel(*i, channelRequesterImpl);
|
|
channels.push_back(channel);
|
|
}
|
|
if (bulkMode) provider->flush();
|
|
|
|
bool differentConnectionsWarningIssued = false;
|
|
string theRemoteAddress;
|
|
for (vector<Channel::shared_pointer>::iterator i = channels.begin();
|
|
i != channels.end();
|
|
i++)
|
|
{
|
|
Channel::shared_pointer channel = *i;
|
|
shared_ptr<ChannelRequesterImpl> channelRequesterImpl =
|
|
dynamic_pointer_cast<ChannelRequesterImpl>(channel->getChannelRequester());
|
|
if (channelRequesterImpl->waitUntilConnected(5.0))
|
|
{
|
|
string remoteAddress = channel->getRemoteAddress();
|
|
if (theRemoteAddress.empty())
|
|
{
|
|
theRemoteAddress = remoteAddress;
|
|
}
|
|
else if (theRemoteAddress != remoteAddress)
|
|
{
|
|
if (!differentConnectionsWarningIssued)
|
|
{
|
|
std::cout << "not all channels are hosted by the same connection: " <<
|
|
theRemoteAddress << " != " << remoteAddress << std::endl;
|
|
differentConnectionsWarningIssued = true;
|
|
// the assumes same connection (thread-safety)
|
|
exit(2);
|
|
}
|
|
}
|
|
|
|
shared_ptr<ChannelGetRequesterImpl> getRequesterImpl(
|
|
new ChannelGetRequesterImpl(channel->getChannelName())
|
|
);
|
|
ChannelGet::shared_pointer channelGet = channel->createChannelGet(getRequesterImpl, pvRequest);
|
|
if (bulkMode) provider->flush();
|
|
|
|
bool allOK = getRequesterImpl->waitUntilConnected(timeOut);
|
|
|
|
if (!allOK)
|
|
{
|
|
std::cout << "[" << channel->getChannelName() << "] failed to get all the gets" << std::endl;
|
|
exit(1);
|
|
}
|
|
|
|
channelGetList.push_back(channelGet);
|
|
|
|
}
|
|
else
|
|
{
|
|
std::cout << "[" << channel->getChannelName() << "] connection timeout" << std::endl;
|
|
exit(1);
|
|
}
|
|
}
|
|
if (verbose)
|
|
std::cout << "all connected" << std::endl;
|
|
|
|
{
|
|
Lock guard(waitLoopPtrMutex);
|
|
waitLoopEvent.reset(new Event());
|
|
}
|
|
epicsTimeGetCurrent(&startTime);
|
|
get_all();
|
|
|
|
waitLoopEvent->wait();
|
|
}
|
|
|
|
int main (int argc, char *argv[])
|
|
{
|
|
int opt; // getopt() current option
|
|
std::string testFile;
|
|
|
|
Requester::shared_pointer requester(new RequesterImpl());
|
|
|
|
setvbuf(stdout,NULL,_IOLBF,BUFSIZ); // Set stdout to line buffering
|
|
|
|
while ((opt = getopt(argc, argv, ":hr:w:i:c:s:l:bf:v")) != -1) {
|
|
switch (opt) {
|
|
case 'h': // Print usage
|
|
usage();
|
|
return 0;
|
|
case 'w': // Set PVA timeout value
|
|
if(epicsScanDouble(optarg, &timeOut) != 1)
|
|
{
|
|
fprintf(stderr, "'%s' is not a valid timeout value "
|
|
"- ignored. ('cainfo -h' for help.)\n", optarg);
|
|
timeOut = DEFAULT_TIMEOUT;
|
|
}
|
|
break;
|
|
case 'r': // pvRequest string
|
|
request = optarg;
|
|
break;
|
|
case 'i': // iterations
|
|
iterations = atoi(optarg);
|
|
break;
|
|
case 'c': // channels
|
|
channels = atoi(optarg);
|
|
break;
|
|
case 's': // arraySize
|
|
arraySize = atoi(optarg);
|
|
break;
|
|
case 'l': // runs
|
|
runs = atoi(optarg);
|
|
break;
|
|
case 'b': // bulk mode
|
|
bulkMode = true;
|
|
break;
|
|
case 'f': // testFile
|
|
testFile = optarg;
|
|
break;
|
|
case 'v': // testFile
|
|
verbose = true;
|
|
break;
|
|
case '?':
|
|
fprintf(stderr,
|
|
"Unrecognized option: '-%c'. ('testGetPerformance -h' for help.)\n",
|
|
optopt);
|
|
return 1;
|
|
case ':':
|
|
fprintf(stderr,
|
|
"Option '-%c' requires an argument. ('testGetPerformance -h' for help.)\n",
|
|
optopt);
|
|
return 1;
|
|
default :
|
|
usage();
|
|
return 1;
|
|
}
|
|
}
|
|
|
|
// typedef enum {logLevelInfo, logLevelDebug, logLevelError, errlogFatal} errlogSevEnum;
|
|
SET_LOG_LEVEL(logLevelError);
|
|
|
|
pvRequest = CreateRequest::create()->createRequest(request);
|
|
if (pvRequest.get() == 0) {
|
|
printf("failed to parse request string\n");
|
|
return 1;
|
|
}
|
|
|
|
ClientFactory::start();
|
|
provider = getChannelProviderRegistry()->getProvider("pva");
|
|
|
|
if (!testFile.empty())
|
|
{
|
|
ifstream ifs(testFile.c_str(), ifstream::in);
|
|
if (ifs.good())
|
|
{
|
|
string line;
|
|
while (true)
|
|
{
|
|
getline(ifs, line);
|
|
if (ifs.good())
|
|
{
|
|
// ignore lines that starts (no trimming) with '#'
|
|
if (line.find('#') != 0)
|
|
{
|
|
// <c> <s> <i> <l>
|
|
if (sscanf(line.c_str(), "%d %d %d %d", &channels, &arraySize, &iterations, &runs) == 4)
|
|
{
|
|
//printf("%d %d %d %d\n", channels, arraySize, iterations, runs);
|
|
runTest();
|
|
|
|
// wait a bit for a next test
|
|
epicsThreadSleep(1.0);
|
|
}
|
|
else
|
|
{
|
|
fprintf(stderr,
|
|
"Failed to parse line '%s', ignoring...\n",
|
|
line.c_str());
|
|
}
|
|
}
|
|
}
|
|
else
|
|
break;
|
|
}
|
|
}
|
|
else
|
|
{
|
|
fprintf(stderr,
|
|
"Failed to open file '%s'\n",
|
|
testFile.c_str());
|
|
return 2;
|
|
}
|
|
|
|
ifs.close();
|
|
}
|
|
else
|
|
{
|
|
// in non-file mode, verbose is true by default
|
|
verbose = true;
|
|
runTest();
|
|
}
|
|
|
|
//ClientFactory::stop();
|
|
|
|
return 0;
|
|
}
|