FlushStrategy, ChannelProvider::configure, USER_CONTROLED support

This commit is contained in:
Matej Sekoranja
2013-04-24 23:27:51 +02:00
parent 43b545b644
commit 116c9f40e1
5 changed files with 93 additions and 20 deletions

View File

@@ -1276,10 +1276,21 @@ printf("sendThreadRunnner exception\n");
_sendQueue.push_back(sender);
}
class FlushTransportSender : public TransportSender {
public:
virtual void send(epics::pvData::ByteBuffer*, TransportSendControl* control)
{
control->flush(true);
}
virtual void lock() {}
virtual void unlock() {}
};
static TransportSender::shared_pointer flushTransportSender(new FlushTransportSender());
void BlockingTCPTransport::flushSendQueue() {
Lock lock(_sendQueueMutex);
if(unlikely(_closed.get())) return;
_sendQueueEvent.signal();
enqueueSendRequest(flushTransportSender);
}
/*

View File

@@ -122,12 +122,14 @@ int32 TransportRegistry::numberOfActiveTransports()
return _transportCount;
}
auto_ptr<TransportRegistry::transportVector_t> TransportRegistry::toArray(String const & /*type*/)
{
// TODO support type
return toArray();
}
auto_ptr<TransportRegistry::transportVector_t> TransportRegistry::toArray()
{
Lock guard(_mutex);
@@ -153,5 +155,27 @@ auto_ptr<TransportRegistry::transportVector_t> TransportRegistry::toArray()
return transportArray;
}
void TransportRegistry::toArray(transportVector_t & transportArray)
{
Lock guard(_mutex);
if (_transportCount == 0)
return;
transportArray.reserve(transportArray.size() + _transportCount);
for (transportsMap_t::iterator transportsIter = _transports.begin();
transportsIter != _transports.end();
transportsIter++)
{
prioritiesMapSharedPtr_t priorities = transportsIter->second;
for (prioritiesMap_t::iterator prioritiesIter = priorities->begin();
prioritiesIter != priorities->end();
prioritiesIter++)
{
transportArray.push_back(prioritiesIter->second);
}
}
}
}}

View File

@@ -39,8 +39,12 @@ public:
Transport::shared_pointer remove(Transport::shared_pointer const & transport);
void clear();
epics::pvData::int32 numberOfActiveTransports();
// TODO note type not supported
std::auto_ptr<transportVector_t> toArray(epics::pvData::String const & type);
std::auto_ptr<transportVector_t> toArray();
// optimized to avoid reallocation, adds to array
void toArray(transportVector_t & transportArray);
private:
//TODO if unordered map is used instead of map we can use sockAddrAreIdentical routine from osiSock.h

View File

@@ -3991,9 +3991,11 @@ namespace epics {
m_namedLocker(), m_lastCID(0), m_lastIOID(0),
m_version("pvAccess Client", "cpp", 1, 2, 0, true),
m_contextState(CONTEXT_NOT_INITIALIZED),
m_configuration(new SystemConfigurationImpl())
m_configuration(new SystemConfigurationImpl()),
m_flushStrategy(DELAYED)
{
PVACCESS_REFCOUNT_MONITOR_CONSTRUCT(remoteClientContext);
m_flushTransports.reserve(64);
loadConfiguration();
}
@@ -4450,7 +4452,10 @@ TODO
{
// TODO we are creating a new response handler even-though we might not need a new transprot !!!
auto_ptr<ResponseHandler> handler(new ClientResponseHandler(shared_from_this()));
return m_connector->connect(client, handler, *serverAddress, minorRevision, priority);
Transport::shared_pointer t = m_connector->connect(client, handler, *serverAddress, minorRevision, priority);
// TODO !!!
static_pointer_cast<BlockingTCPTransport>(t)->setFlushStrategy(m_flushStrategy);
return t;
}
catch (...)
{
@@ -4526,19 +4531,38 @@ TODO
throw std::runtime_error("Failed to obtain synchronization lock for '" + name + "', possible deadlock.");
}
}
virtual void configure(epics::pvData::PVStructure::shared_pointer /*configuration*/)
virtual void configure(epics::pvData::PVStructure::shared_pointer configuration)
{
// TODO
if (m_transportRegistry->numberOfActiveTransports() > 0)
throw std::runtime_error("Configure must be called when there is no transports active.");
PVInt::shared_pointer pvStrategy = dynamic_pointer_cast<PVInt>(configuration->getSubField("strategy"));
if (pvStrategy.get())
{
int32 value = pvStrategy->get();
switch (value)
{
case IMMEDIATE:
case DELAYED:
case USER_CONTROLED:
m_flushStrategy = static_cast<FlushStrategy>(value);
break;
default:
// TODO report warning
break;
}
}
}
virtual void flush()
{
// TODO not OK, since new object is created by toArray() call
std::auto_ptr<TransportRegistry::transportVector_t> transports = m_transportRegistry->toArray();
TransportRegistry::transportVector_t::const_iterator iter = transports->begin();
while (iter != transports->end())
(*iter)->flushSendQueue();
m_transportRegistry->toArray(m_flushTransports);
TransportRegistry::transportVector_t::const_iterator iter = m_flushTransports.begin();
while (iter != m_flushTransports.end())
(*iter++)->flushSendQueue();
m_flushTransports.clear();
}
virtual void poll()
@@ -4698,7 +4722,9 @@ TODO
Configuration::shared_pointer m_configuration;
int m_refCount;
TransportRegistry::transportVector_t m_flushTransports;
FlushStrategy m_flushStrategy;
};
ClientContextImpl::shared_pointer createClientContextImpl()

View File

@@ -112,7 +112,7 @@ void get_all()
(*i)->get(false);
// we assume all channels are from the same provider
//if (bulkMode) provider->flush();
if (bulkMode) provider->flush();
}
@@ -301,6 +301,14 @@ void runTest()
if (verbose)
printf("%d channel(s) of double array size of %d element(s) (0==scalar), %d iteration(s) per run, %d run(s) (0==forever)\n", channels, arraySize, iterations, runs);
StringArray fieldNames;
fieldNames.push_back("strategy");
FieldConstPtrArray fields;
fields.push_back(getFieldCreate()->createScalar(pvInt));
PVStructure::shared_pointer configuration =
getPVDataCreate()->createPVStructure(getFieldCreate()->createStructure(fieldNames, fields));
configuration->getIntField("strategy")->put(bulkMode ? USER_CONTROLED : DELAYED);
vector<string> channelNames;
char buf[64];
for (int i = 0; i < channels; i++)
@@ -323,6 +331,7 @@ void runTest()
Channel::shared_pointer channel = provider->createChannel(*i, channelRequesterImpl);
channels.push_back(channel);
}
if (bulkMode) provider->flush();
bool differentConnectionsWarningIssued = false;
String theRemoteAddress;
@@ -333,7 +342,6 @@ void runTest()
Channel::shared_pointer channel = *i;
shared_ptr<ChannelRequesterImpl> channelRequesterImpl =
dynamic_pointer_cast<ChannelRequesterImpl>(channel->getChannelRequester());
//if (bulkMode) provider->flush();
if (channelRequesterImpl->waitUntilConnected(5.0))
{
String remoteAddress = channel->getRemoteAddress();
@@ -357,7 +365,7 @@ void runTest()
new ChannelGetRequesterImpl(channel->getChannelName())
);
ChannelGet::shared_pointer channelGet = channel->createChannelGet(getRequesterImpl, pvRequest);
//if (bulkMode) provider->flush();
if (bulkMode) provider->flush();
bool allOK = getRequesterImpl->waitUntilConnected(timeOut);
@@ -426,9 +434,9 @@ int main (int argc, char *argv[])
case 'l': // runs
runs = atoi(optarg);
break;
//case 'b': // bulk mode
// bulkMode = true;
// break;
case 'b': // bulk mode
bulkMode = true;
break;
case 'f': // testFile
testFile = optarg;
break;