diff --git a/pvAccessApp/remote/blockingTCPTransport.cpp b/pvAccessApp/remote/blockingTCPTransport.cpp index 65bbd99..ee0bfc7 100644 --- a/pvAccessApp/remote/blockingTCPTransport.cpp +++ b/pvAccessApp/remote/blockingTCPTransport.cpp @@ -1276,10 +1276,21 @@ printf("sendThreadRunnner exception\n"); _sendQueue.push_back(sender); } + class FlushTransportSender : public TransportSender { + public: + virtual void send(epics::pvData::ByteBuffer*, TransportSendControl* control) + { + control->flush(true); + } + + virtual void lock() {} + virtual void unlock() {} + }; + + static TransportSender::shared_pointer flushTransportSender(new FlushTransportSender()); + void BlockingTCPTransport::flushSendQueue() { - Lock lock(_sendQueueMutex); - if(unlikely(_closed.get())) return; - _sendQueueEvent.signal(); + enqueueSendRequest(flushTransportSender); } /* diff --git a/pvAccessApp/remote/transportRegistry.cpp b/pvAccessApp/remote/transportRegistry.cpp index 02a9c20..1b9189a 100644 --- a/pvAccessApp/remote/transportRegistry.cpp +++ b/pvAccessApp/remote/transportRegistry.cpp @@ -122,12 +122,14 @@ int32 TransportRegistry::numberOfActiveTransports() return _transportCount; } + auto_ptr TransportRegistry::toArray(String const & /*type*/) { // TODO support type return toArray(); } + auto_ptr TransportRegistry::toArray() { Lock guard(_mutex); @@ -153,5 +155,27 @@ auto_ptr TransportRegistry::toArray() return transportArray; } +void TransportRegistry::toArray(transportVector_t & transportArray) +{ + Lock guard(_mutex); + if (_transportCount == 0) + return; + + transportArray.reserve(transportArray.size() + _transportCount); + + for (transportsMap_t::iterator transportsIter = _transports.begin(); + transportsIter != _transports.end(); + transportsIter++) + { + prioritiesMapSharedPtr_t priorities = transportsIter->second; + for (prioritiesMap_t::iterator prioritiesIter = priorities->begin(); + prioritiesIter != priorities->end(); + prioritiesIter++) + { + transportArray.push_back(prioritiesIter->second); + } + } +} + }} diff --git a/pvAccessApp/remote/transportRegistry.h b/pvAccessApp/remote/transportRegistry.h index 60a8c07..fd0fa4d 100644 --- a/pvAccessApp/remote/transportRegistry.h +++ b/pvAccessApp/remote/transportRegistry.h @@ -39,8 +39,12 @@ public: Transport::shared_pointer remove(Transport::shared_pointer const & transport); void clear(); epics::pvData::int32 numberOfActiveTransports(); + + // TODO note type not supported std::auto_ptr toArray(epics::pvData::String const & type); std::auto_ptr toArray(); + // optimized to avoid reallocation, adds to array + void toArray(transportVector_t & transportArray); private: //TODO if unordered map is used instead of map we can use sockAddrAreIdentical routine from osiSock.h diff --git a/pvAccessApp/remoteClient/clientContextImpl.cpp b/pvAccessApp/remoteClient/clientContextImpl.cpp index e298922..c57a0f6 100644 --- a/pvAccessApp/remoteClient/clientContextImpl.cpp +++ b/pvAccessApp/remoteClient/clientContextImpl.cpp @@ -3991,9 +3991,11 @@ namespace epics { m_namedLocker(), m_lastCID(0), m_lastIOID(0), m_version("pvAccess Client", "cpp", 1, 2, 0, true), m_contextState(CONTEXT_NOT_INITIALIZED), - m_configuration(new SystemConfigurationImpl()) + m_configuration(new SystemConfigurationImpl()), + m_flushStrategy(DELAYED) { PVACCESS_REFCOUNT_MONITOR_CONSTRUCT(remoteClientContext); + m_flushTransports.reserve(64); loadConfiguration(); } @@ -4450,7 +4452,10 @@ TODO { // TODO we are creating a new response handler even-though we might not need a new transprot !!! auto_ptr handler(new ClientResponseHandler(shared_from_this())); - return m_connector->connect(client, handler, *serverAddress, minorRevision, priority); + Transport::shared_pointer t = m_connector->connect(client, handler, *serverAddress, minorRevision, priority); + // TODO !!! + static_pointer_cast(t)->setFlushStrategy(m_flushStrategy); + return t; } catch (...) { @@ -4526,19 +4531,38 @@ TODO throw std::runtime_error("Failed to obtain synchronization lock for '" + name + "', possible deadlock."); } } - - virtual void configure(epics::pvData::PVStructure::shared_pointer /*configuration*/) + + virtual void configure(epics::pvData::PVStructure::shared_pointer configuration) { - // TODO + if (m_transportRegistry->numberOfActiveTransports() > 0) + throw std::runtime_error("Configure must be called when there is no transports active."); + + PVInt::shared_pointer pvStrategy = dynamic_pointer_cast(configuration->getSubField("strategy")); + if (pvStrategy.get()) + { + int32 value = pvStrategy->get(); + switch (value) + { + case IMMEDIATE: + case DELAYED: + case USER_CONTROLED: + m_flushStrategy = static_cast(value); + break; + default: + // TODO report warning + break; + } + } + } virtual void flush() { - // TODO not OK, since new object is created by toArray() call - std::auto_ptr transports = m_transportRegistry->toArray(); - TransportRegistry::transportVector_t::const_iterator iter = transports->begin(); - while (iter != transports->end()) - (*iter)->flushSendQueue(); + m_transportRegistry->toArray(m_flushTransports); + TransportRegistry::transportVector_t::const_iterator iter = m_flushTransports.begin(); + while (iter != m_flushTransports.end()) + (*iter++)->flushSendQueue(); + m_flushTransports.clear(); } virtual void poll() @@ -4698,7 +4722,9 @@ TODO Configuration::shared_pointer m_configuration; - int m_refCount; + TransportRegistry::transportVector_t m_flushTransports; + + FlushStrategy m_flushStrategy; }; ClientContextImpl::shared_pointer createClientContextImpl() diff --git a/testApp/remote/testGetPerformance.cpp b/testApp/remote/testGetPerformance.cpp index 811835a..6dec124 100644 --- a/testApp/remote/testGetPerformance.cpp +++ b/testApp/remote/testGetPerformance.cpp @@ -112,7 +112,7 @@ void get_all() (*i)->get(false); // we assume all channels are from the same provider - //if (bulkMode) provider->flush(); + if (bulkMode) provider->flush(); } @@ -301,6 +301,14 @@ void runTest() if (verbose) printf("%d channel(s) of double array size of %d element(s) (0==scalar), %d iteration(s) per run, %d run(s) (0==forever)\n", channels, arraySize, iterations, runs); + StringArray fieldNames; + fieldNames.push_back("strategy"); + FieldConstPtrArray fields; + fields.push_back(getFieldCreate()->createScalar(pvInt)); + PVStructure::shared_pointer configuration = + getPVDataCreate()->createPVStructure(getFieldCreate()->createStructure(fieldNames, fields)); + configuration->getIntField("strategy")->put(bulkMode ? USER_CONTROLED : DELAYED); + vector channelNames; char buf[64]; for (int i = 0; i < channels; i++) @@ -323,6 +331,7 @@ void runTest() Channel::shared_pointer channel = provider->createChannel(*i, channelRequesterImpl); channels.push_back(channel); } + if (bulkMode) provider->flush(); bool differentConnectionsWarningIssued = false; String theRemoteAddress; @@ -333,7 +342,6 @@ void runTest() Channel::shared_pointer channel = *i; shared_ptr channelRequesterImpl = dynamic_pointer_cast(channel->getChannelRequester()); - //if (bulkMode) provider->flush(); if (channelRequesterImpl->waitUntilConnected(5.0)) { String remoteAddress = channel->getRemoteAddress(); @@ -357,7 +365,7 @@ void runTest() new ChannelGetRequesterImpl(channel->getChannelName()) ); ChannelGet::shared_pointer channelGet = channel->createChannelGet(getRequesterImpl, pvRequest); - //if (bulkMode) provider->flush(); + if (bulkMode) provider->flush(); bool allOK = getRequesterImpl->waitUntilConnected(timeOut); @@ -426,9 +434,9 @@ int main (int argc, char *argv[]) case 'l': // runs runs = atoi(optarg); break; - //case 'b': // bulk mode - // bulkMode = true; - // break; + case 'b': // bulk mode + bulkMode = true; + break; case 'f': // testFile testFile = optarg; break;