testGetPerformance file support, fixed directSerialization with fast get clients

This commit is contained in:
Matej Sekoranja
2013-04-24 12:22:43 +02:00
parent f72f89b4d2
commit d59f2c81ea
2 changed files with 154 additions and 85 deletions

View File

@@ -1,4 +1,5 @@
#include <iostream>
#include <fstream>
#include <pv/clientFactory.h>
#include <pv/pvAccess.h>
@@ -36,6 +37,8 @@ int channels = DEFAULT_CHANNELS;
int runs = DEFAULT_RUNS;
bool bulkMode = DEFAULT_BULK;
int arraySize = DEFAULT_ARRAY_SIZE; // 0 means scalar
Mutex waitLoopPtrMutex;
std::tr1::shared_ptr<Event> waitLoopEvent;
#define DEFAULT_TIMEOUT 600.0
#define DEFAULT_REQUEST "field(value)"
@@ -86,6 +89,14 @@ int channelCount = 0;
int iterationCount = 0;
int runCount = 0;
void reset()
{
channelGetList.clear();
channelCount = 0;
iterationCount = 0;
runCount = 0;
}
timeval startTime;
void get_all()
@@ -198,7 +209,10 @@ public:
if (runs == 0 || runCount < runs)
get_all();
else
exit(0); // all done, not the nicest way to exit
{
Lock guard(waitLoopPtrMutex);
waitLoopEvent->signal(); // all done
}
}
else if (channelCount == 0)
{
@@ -209,7 +223,6 @@ public:
{
std::cout << "[" << m_channelName << "] failed to get: " << status.toString() << std::endl;
}
}
bool waitUntilConnected(double timeOut)
@@ -272,6 +285,98 @@ public:
}
};
void runTest()
{
reset();
printf("%d channel(s) of double array size of %d element(s) (0==scalar), %d iteration(s) per run, %d run(s) (0==forever)\n", channels, arraySize, iterations, runs);
vector<string> channelNames;
char buf[64];
for (int i = 0; i < channels; i++)
{
if (arraySize > 0)
sprintf(buf, "testArray%d_%d", arraySize, i);
else
sprintf(buf, "test%d", i);
channelNames.push_back(buf);
}
vector<Channel::shared_pointer> channels;
for (vector<string>::const_iterator i = channelNames.begin();
i != channelNames.end();
i++)
{
shared_ptr<ChannelRequesterImpl> channelRequesterImpl(
new ChannelRequesterImpl()
);
Channel::shared_pointer channel = provider->createChannel(*i, channelRequesterImpl);
channels.push_back(channel);
}
bool differentConnectionsWarningIssued = false;
String theRemoteAddress;
for (vector<Channel::shared_pointer>::iterator i = channels.begin();
i != channels.end();
i++)
{
Channel::shared_pointer channel = *i;
shared_ptr<ChannelRequesterImpl> channelRequesterImpl =
dynamic_pointer_cast<ChannelRequesterImpl>(channel->getChannelRequester());
//if (bulkMode) provider->flush();
if (channelRequesterImpl->waitUntilConnected(5.0))
{
String remoteAddress = channel->getRemoteAddress();
if (theRemoteAddress.empty())
{
theRemoteAddress = remoteAddress;
}
else if (theRemoteAddress != remoteAddress)
{
if (!differentConnectionsWarningIssued)
{
std::cout << "not all channels are hosted by the same connection: " <<
theRemoteAddress << " != " << remoteAddress << std::endl;
differentConnectionsWarningIssued = true;
// the assumes same connection (thread-safety)
exit(2);
}
}
shared_ptr<ChannelGetRequesterImpl> getRequesterImpl(
new ChannelGetRequesterImpl(channel->getChannelName())
);
ChannelGet::shared_pointer channelGet = channel->createChannelGet(getRequesterImpl, pvRequest);
//if (bulkMode) provider->flush();
bool allOK = getRequesterImpl->waitUntilConnected(timeOut);
if (!allOK)
{
std::cout << "[" << channel->getChannelName() << "] failed to get all the gets" << std::endl;
exit(1);
}
channelGetList.push_back(channelGet);
}
else
{
std::cout << "[" << channel->getChannelName() << "] connection timeout" << std::endl;
exit(1);
}
}
std::cout << "all connected" << std::endl;
{
Lock guard(waitLoopPtrMutex);
waitLoopEvent.reset(new Event());
}
gettimeofday(&startTime, NULL);
get_all();
waitLoopEvent->wait();
}
int main (int argc, char *argv[])
{
@@ -315,12 +420,6 @@ int main (int argc, char *argv[])
// break;
case 'f': // testFile
testFile = optarg;
// TODO
fprintf(stderr,
"Unimplemented option: '-%c'.\n",
opt);
return 1;
break;
case '?':
fprintf(stderr,
@@ -350,94 +449,57 @@ int main (int argc, char *argv[])
ClientFactory::start();
provider = getChannelAccess()->getProvider("pvAccess");
printf("%d channel(s) of double array size of %d element(s) (0==scalar), %d iteration(s) per run, %d run(s) (0==forever)\n", channels, arraySize, iterations, runs);
vector<string> channelNames;
char buf[64];
for (int i = 0; i < channels; i++)
if (!testFile.empty())
{
if (arraySize > 0)
sprintf(buf, "testArray%d_%d", arraySize, i);
else
sprintf(buf, "test%d", i);
channelNames.push_back(buf);
}
bool allOK = 1;
vector<Channel::shared_pointer> channels;
for (vector<string>::const_iterator i = channelNames.begin();
i != channelNames.end();
i++)
{
shared_ptr<ChannelRequesterImpl> channelRequesterImpl(
new ChannelRequesterImpl()
);
Channel::shared_pointer channel = provider->createChannel(*i, channelRequesterImpl);
channels.push_back(channel);
}
bool differentConnectionsWarningIssued = false;
String theRemoteAddress;
for (vector<Channel::shared_pointer>::iterator i = channels.begin();
i != channels.end();
i++)
{
Channel::shared_pointer channel = *i;
shared_ptr<ChannelRequesterImpl> channelRequesterImpl =
dynamic_pointer_cast<ChannelRequesterImpl>(channel->getChannelRequester());
//if (bulkMode) provider->flush();
if (channelRequesterImpl->waitUntilConnected(5.0))
ifstream ifs(testFile.c_str(), ifstream::in);
if (ifs.good())
{
String remoteAddress = channel->getRemoteAddress();
if (theRemoteAddress.empty())
string line;
while (true)
{
theRemoteAddress = remoteAddress;
}
else if (theRemoteAddress != remoteAddress)
{
if (!differentConnectionsWarningIssued)
getline(ifs, line);
if (ifs.good())
{
std::cout << "not all channels are hosted by the same connection: " <<
theRemoteAddress << " != " << remoteAddress << std::endl;
differentConnectionsWarningIssued = true;
// the assumes same connection (thread-safety)
return 2;
// ignore lines that starts (no trimming) with '#'
if (line.find('#') != 0)
{
// <c> <s> <i> <l>
if (sscanf(line.c_str(), "%d %d %d %d", &channels, &arraySize, &iterations, &runs) == 4)
{
//printf("%d %d %d %d\n", channels, arraySize, iterations, runs);
runTest();
// wait a bit for a next test
epicsThreadSleep(1.0);
}
else
{
fprintf(stderr,
"Failed to parse line '%s', ignoring...\n",
line.c_str());
}
}
}
else
break;
}
shared_ptr<ChannelGetRequesterImpl> getRequesterImpl(
new ChannelGetRequesterImpl(channel->getChannelName())
);
ChannelGet::shared_pointer channelGet = channel->createChannelGet(getRequesterImpl, pvRequest);
//if (bulkMode) provider->flush();
allOK = getRequesterImpl->waitUntilConnected(timeOut);
if (!allOK)
{
std::cout << "[" << channel->getChannelName() << "] failed to get all the gets" << std::endl;
return 1;
}
channelGetList.push_back(channelGet);
}
else
{
std::cout << "[" << channel->getChannelName() << "] connection timeout" << std::endl;
return 1;
fprintf(stderr,
"Failed to open file '%s'\n",
testFile.c_str());
return 2;
}
ifs.close();
}
else
{
runTest();
}
std::cout << "all connected" << std::endl;
gettimeofday(&startTime, NULL);
get_all();
while (true)
epicsThreadSleep(600.0);
//ClientFactory::stop();
return allOK ? 0 : 1;
return 0;
}