diff --git a/src/remote/beaconHandler.cpp b/src/remote/beaconHandler.cpp index 4867d42..619c4de 100644 --- a/src/remote/beaconHandler.cpp +++ b/src/remote/beaconHandler.cpp @@ -16,10 +16,8 @@ namespace epics { namespace pvAccess { BeaconHandler::BeaconHandler(Context::shared_pointer const & context, - std::string const & protocol, const osiSockAddr* responseFrom) : _context(Context::weak_pointer(context)), - _protocol(protocol), _responseFrom(*responseFrom), _mutex(), _serverGUID(), @@ -88,7 +86,7 @@ bool BeaconHandler::updateBeacon(int8 /*remoteTransportRevision*/, TimeStamp* /* void BeaconHandler::changedTransport() { TransportRegistry::transportVector_t transports; - _context.lock()->getTransportRegistry()->get(_protocol, &_responseFrom, transports); + _context.lock()->getTransportRegistry()->toArray(transports, &_responseFrom); // notify all for (TransportRegistry::transportVector_t::iterator iter(transports.begin()), end(transports.end()); diff --git a/src/remote/blockingTCPConnector.cpp b/src/remote/blockingTCPConnector.cpp index 069935a..9ef0e21 100644 --- a/src/remote/blockingTCPConnector.cpp +++ b/src/remote/blockingTCPConnector.cpp @@ -26,7 +26,6 @@ BlockingTCPConnector::BlockingTCPConnector( int receiveBufferSize, float heartbeatInterval) : _context(context), - _namedLocker(), _receiveBufferSize(receiveBufferSize), _heartbeatInterval(heartbeatInterval) { @@ -75,8 +74,13 @@ Transport::shared_pointer BlockingTCPConnector::connect(std::tr1::shared_ptrgetTransportRegistry()->get("TCP", &address, priority); + TransportRegistry::Reservation rsvp(context->getTransportRegistry(), + address, priority); + // we are now blocking any connect() to this destination (address and prio) + // concurrent connect() to other destination is allowed. + // This prevents us from opening duplicate connections. + + Transport::shared_pointer transport = context->getTransportRegistry()->get(address, priority); if(transport.get()) { LOG(logLevelDebug, "Reusing existing connection to PVA server: %s.", @@ -85,110 +89,82 @@ Transport::shared_pointer BlockingTCPConnector::connect(std::tr1::shared_ptr size - * Lazy creates a lock for each 'address' ever encountered. - */ - bool lockAcquired = _namedLocker.acquireSynchronizationObject(&address, LOCK_TIMEOUT); - if(lockAcquired) { - try { - // ... transport created during waiting in lock - transport = context->getTransportRegistry()->get("TCP", &address, priority); - if(transport.get()) { - LOG(logLevelDebug, - "Reusing existing connection to PVA server: %s.", - ipAddrStr); - if (transport->acquire(client)) - return transport; - } + try { + LOG(logLevelDebug, "Connecting to PVA server: %s.", ipAddrStr); - LOG(logLevelDebug, "Connecting to PVA server: %s.", ipAddrStr); + socket = tryConnect(address, 3); - socket = tryConnect(address, 3); - - // verify - if(socket==INVALID_SOCKET) { - LOG(logLevelDebug, - "Connection to PVA server %s failed.", ipAddrStr); - std::ostringstream temp; - temp<<"Failed to verify TCP connection to '"<verify(5000)) { - LOG( - logLevelDebug, - "Connection to PVA server %s failed to be validated, closing it.", - ipAddrStr); - - std::ostringstream temp; - temp<<"Failed to verify TCP connection to '"<close(); - else if(socket!=INVALID_SOCKET) epicsSocketDestroy(socket); - _namedLocker.releaseSynchronizationObject(&address); - throw; - } catch(...) { - if(transport.get()) - transport->close(); - else if(socket!=INVALID_SOCKET) epicsSocketDestroy(socket); - _namedLocker.releaseSynchronizationObject(&address); - throw; + // verify + if(socket==INVALID_SOCKET) { + LOG(logLevelDebug, + "Connection to PVA server %s failed.", ipAddrStr); + std::ostringstream temp; + temp<<"Failed to verify TCP connection to '"<getTransportRegistry() + transport = detail::BlockingClientTCPTransportCodec::create( + context, socket, responseHandler, _receiveBufferSize, _socketSendBufferSize, + client, transportRevision, _heartbeatInterval, priority); + + // verify + if(!transport->verify(5000)) { + LOG( + logLevelDebug, + "Connection to PVA server %s failed to be validated, closing it.", + ipAddrStr); + + std::ostringstream temp; + temp<<"Failed to verify TCP connection to '"<close(); + else if(socket!=INVALID_SOCKET) + epicsSocketDestroy(socket); + throw; } } diff --git a/src/remote/pv/beaconHandler.h b/src/remote/pv/beaconHandler.h index 09a25e5..b6c2bb8 100644 --- a/src/remote/pv/beaconHandler.h +++ b/src/remote/pv/beaconHandler.h @@ -40,7 +40,7 @@ public: /** * Constructor. */ - BeaconHandler(Context::shared_pointer const & context, std::string const & protocol, + BeaconHandler(Context::shared_pointer const & context, const osiSockAddr* responseFrom); virtual ~BeaconHandler(); @@ -66,10 +66,6 @@ private: * Context instance. */ Context::weak_pointer _context; - /** - * The procotol (transport), "tcp" for pvAccess TCP/IP. - */ - std::string _protocol; /** * Remote address. */ diff --git a/src/remote/pv/blockingTCP.h b/src/remote/pv/blockingTCP.h index ceb361a..e248e65 100644 --- a/src/remote/pv/blockingTCP.h +++ b/src/remote/pv/blockingTCP.h @@ -36,7 +36,6 @@ #include #include #include -#include #include namespace epics { @@ -70,11 +69,6 @@ private: */ Context::weak_pointer _context; - /** - * named lock - */ - NamedLockPattern _namedLocker; - /** * Receive buffer size. */ diff --git a/src/remote/pv/codec.h b/src/remote/pv/codec.h index efadc85..81b01e6 100644 --- a/src/remote/pv/codec.h +++ b/src/remote/pv/codec.h @@ -422,7 +422,7 @@ public: void activate() { Transport::shared_pointer thisSharedPtr = shared_from_this(); - _context->getTransportRegistry()->put(thisSharedPtr); + _context->getTransportRegistry()->install(thisSharedPtr); start(); } diff --git a/src/remote/pv/remote.h b/src/remote/pv/remote.h index 108439c..7e15e63 100644 --- a/src/remote/pv/remote.h +++ b/src/remote/pv/remote.h @@ -189,7 +189,7 @@ public: /** * Get remote address. - * @return remote address, can be null. + * @return remote address, can never be null. */ virtual const osiSockAddr* getRemoteAddress() const = 0; diff --git a/src/remote/pv/transportRegistry.h b/src/remote/pv/transportRegistry.h index 4e1b01b..60b547f 100644 --- a/src/remote/pv/transportRegistry.h +++ b/src/remote/pv/transportRegistry.h @@ -9,6 +9,7 @@ #include #include +#include #include #ifdef epicsExportSharedSymbols @@ -36,33 +37,51 @@ namespace pvAccess { class TransportRegistry { public: - typedef std::tr1::shared_ptr shared_pointer; - typedef std::tr1::shared_ptr const_shared_pointer; + class Reservation; +private: + struct Key { + osiSockAddr addr; + epics::pvData::int16 prio; + Key(const osiSockAddr& a, epics::pvData::int16 p) :addr(a), prio(p) {} + bool operator<(const Key& o) const; + }; + + typedef std::map transports_t; + typedef std::map > locks_t; + +public: + POINTER_DEFINITIONS(TransportRegistry); typedef std::vector transportVector_t; - TransportRegistry(); - virtual ~TransportRegistry(); + class Reservation { + TransportRegistry* const owner; + const Key key; + std::tr1::shared_ptr mutex; + public: - void put(Transport::shared_pointer const & transport); - Transport::shared_pointer get(std::string const & type, const osiSockAddr* address, const epics::pvData::int16 priority); - void get(std::string const & type, const osiSockAddr* address, transportVector_t&output); + // ctor blocks until no concurrent connect() in progress (success or failure) + Reservation(TransportRegistry *owner, const osiSockAddr& address, epics::pvData::int16 prio); + ~Reservation(); + }; + + TransportRegistry() {} + ~TransportRegistry(); + + Transport::shared_pointer get(const osiSockAddr& address, epics::pvData::int16 prio); + void install(const Transport::shared_pointer& ptr); Transport::shared_pointer remove(Transport::shared_pointer const & transport); void clear(); - epics::pvData::int32 numberOfActiveTransports(); + size_t size(); // optimized to avoid reallocation, adds to array - void toArray(transportVector_t & transportArray); + void toArray(transportVector_t & transportArray, const osiSockAddr *dest=0); private: - //TODO if unordered map is used instead of map we can use sockAddrAreIdentical routine from osiSock.h - // NOTE: pointers are used to osiSockAddr (to save memory), since it guaranteed that their reference is valid as long as Transport - typedef std::map prioritiesMap_t; - typedef std::tr1::shared_ptr prioritiesMapSharedPtr_t; - typedef std::map transportsMap_t; + transports_t transports; + // per destination mutex to serialize concurrent connect() attempts + locks_t locks; - transportsMap_t _transports; - epics::pvData::int32 _transportCount; epics::pvData::Mutex _mutex; }; diff --git a/src/remote/transportRegistry.cpp b/src/remote/transportRegistry.cpp index d357536..cc0591d 100644 --- a/src/remote/transportRegistry.cpp +++ b/src/remote/transportRegistry.cpp @@ -6,68 +6,79 @@ #define epicsExportSharedSymbols #include +#include -using namespace epics::pvData; +namespace pvd = epics::pvData; namespace epics { namespace pvAccess { -TransportRegistry::TransportRegistry(): _transports(), _transportCount(0), _mutex() -{ +bool TransportRegistry::Key::operator<(const Key& o) const +{ + if(addr.sa.sa_familyo.addr.sa.sa_family) + return false; + if(addr.ia.sin_addr.s_addro.addr.ia.sin_addr.s_addr) + return false; + if(addr.ia.sin_porto.addr.ia.sin_port) + return false; + if(prio_mutex); + + std::tr1::shared_ptr& lock = owner->locks[key]; // fetch or alloc + if(!lock) + lock.reset(new pvd::Mutex()); + + mutex = lock; + } + + mutex->lock(); +} + +TransportRegistry::Reservation::~Reservation() +{ + mutex->unlock(); + + pvd::Lock G(owner->_mutex); + + assert(mutex.use_count()>=2); + + if(mutex.use_count()==2) { + // no other concurrent connect(), so can drop this lock + owner->locks.erase(key); + } + + assert(mutex.use_count()==1); } TransportRegistry::~TransportRegistry() { + pvd::Lock G(_mutex); + if(!transports.empty()) + LOG(logLevelWarn, "TransportRegistry destroyed while not empty"); } - -void TransportRegistry::put(Transport::shared_pointer const & transport) -{ - Lock guard(_mutex); - //const string type = transport.getType(); - const int16 priority = transport->getPriority(); - const osiSockAddr* address = transport->getRemoteAddress(); - - transportsMap_t::iterator transportsIter = _transports.find(address); - prioritiesMapSharedPtr_t priorities; - if(transportsIter == _transports.end()) - { - priorities.reset(new prioritiesMap_t()); - _transports[address] = priorities; - _transportCount++; - } - else - { - priorities = transportsIter->second; - prioritiesMap_t::iterator prioritiesIter = priorities->find(priority); - if(prioritiesIter == priorities->end()) //only increase transportCount if not replacing - { - _transportCount++; - } - } - (*priorities)[priority] = transport; -} - -Transport::shared_pointer TransportRegistry::get(std::string const & /*type*/, const osiSockAddr* address, const int16 priority) -{ - Lock guard(_mutex); - transportsMap_t::iterator transportsIter = _transports.find(address); - if(transportsIter != _transports.end()) - { - prioritiesMapSharedPtr_t priorities = transportsIter->second; - prioritiesMap_t::iterator prioritiesIter = priorities->find(priority); - if(prioritiesIter != priorities->end()) - { - return prioritiesIter->second; - } - } - return Transport::shared_pointer(); -} - +/* void -TransportRegistry::get(std::string const & /*type*/, const osiSockAddr* address, transportVector_t& output) +TransportRegistry::get(const osiSockAddr* address, transportVector_t& output) { - Lock guard(_mutex); + pvd::Lock guard(_mutex); transportsMap_t::iterator transportsIter = _transports.find(address); if(transportsIter != _transports.end()) { @@ -84,67 +95,109 @@ TransportRegistry::get(std::string const & /*type*/, const osiSockAddr* address, } } } +*/ + +Transport::shared_pointer TransportRegistry::get(const osiSockAddr& address, epics::pvData::int16 prio) +{ + const Key key(address, prio); + + pvd::Lock G(_mutex); + + transports_t::iterator it(transports.find(key)); + if(it!=transports.end()) { + return it->second; + } + return Transport::shared_pointer(); +} + +void TransportRegistry::install(const Transport::shared_pointer& ptr) +{ + const Key key(*ptr->getRemoteAddress(), ptr->getPriority()); + + pvd::Lock G(_mutex); + + std::pair itpair(transports.insert(std::make_pair(key, ptr))); + if(!itpair.second) + THROW_EXCEPTION2(std::logic_error, "Refuse to insert dup"); +} Transport::shared_pointer TransportRegistry::remove(Transport::shared_pointer const & transport) { - Lock guard(_mutex); - const int16 priority = transport->getPriority(); - const osiSockAddr* address = transport->getRemoteAddress(); - Transport::shared_pointer retTransport; - transportsMap_t::iterator transportsIter = _transports.find(address); - if(transportsIter != _transports.end()) + assert(!!transport); + Transport::shared_pointer ret; + + pvd::Lock guard(_mutex); + for(transports_t::iterator it(transports.begin()), end(transports.end()); + it != end; ++it) { - prioritiesMapSharedPtr_t priorities = transportsIter->second; - prioritiesMap_t::iterator prioritiesIter = priorities->find(priority); - if(prioritiesIter != priorities->end()) - { - retTransport = prioritiesIter->second; - priorities->erase(prioritiesIter); - _transportCount--; - if(priorities->size() == 0) - { - _transports.erase(transportsIter); - } + Transport::shared_pointer& tr = it->second; + + if(transport.get() == tr.get()) { + ret.swap(it->second); + transports.erase(it); } } - return retTransport; + return ret; } +#define LEAK_CHECK(PTR, NAME) if((PTR) && !(PTR).unique()) { std::cerr<<"Leaking Transport " NAME " use_count="<<(PTR).use_count()<<"\n"<second; - for (prioritiesMap_t::iterator prioritiesIter = priorities->begin(); - prioritiesIter != priorities->end(); - prioritiesIter++) - { - transportArray.push_back(prioritiesIter->second); + it->second->close(); + } + + for(transports_t::iterator it(temp.begin()), end(temp.end()); + it != end; ++it) + { + const Transport::shared_pointer& transport = it->second; + transport->waitJoin(); + LEAK_CHECK(transport, "tcp transport") + if(!transport.unique()) { + LOG(logLevelError, "Closed transport %s still has use_count=%u", + transport->getRemoteName().c_str(), + (unsigned)transport.use_count()); } } } +size_t TransportRegistry::size() +{ + pvd::Lock guard(_mutex); + return transports.size(); +} + +void TransportRegistry::toArray(transportVector_t & transportArray, const osiSockAddr *dest) +{ + pvd::Lock guard(_mutex); + + transportArray.reserve(transportArray.size() + transports.size()); + + for(transports_t::const_iterator it(transports.begin()), end(transports.end()); + it != end; ++it) + { + const Key& key = it->first; + const Transport::shared_pointer& tr = it->second; + + if(!dest || sockAddrAreIdentical(dest, &key.addr)) + transportArray.push_back(tr); + } +} + } } diff --git a/src/remoteClient/clientContextImpl.cpp b/src/remoteClient/clientContextImpl.cpp index 2ab15f9..ab8271c 100644 --- a/src/remoteClient/clientContextImpl.cpp +++ b/src/remoteClient/clientContextImpl.cpp @@ -2730,14 +2730,16 @@ public: int16 port = payloadBuffer->getShort(); serverAddress.ia.sin_port = htons(port); - string protocol = SerializeHelper::deserializeString(payloadBuffer, transport.get()); + string protocol(SerializeHelper::deserializeString(payloadBuffer, transport.get())); + if(protocol!="tcp") + return; // TODO optimize ClientContextImpl::shared_pointer context = _context.lock(); if (!context) return; - std::tr1::shared_ptr beaconHandler = context->getBeaconHandler(protocol, responseFrom); + std::tr1::shared_ptr beaconHandler = context->getBeaconHandler(responseFrom); // currently we care only for servers used by this context if (!beaconHandler) return; @@ -4208,7 +4210,7 @@ private: // wait for all transports to cleanly exit int tries = 40; epics::pvData::int32 transportCount; - while ((transportCount = m_transportRegistry.numberOfActiveTransports()) && tries--) + while ((transportCount = m_transportRegistry.size()) && tries--) epicsThreadSleep(0.025); { @@ -4403,19 +4405,15 @@ private: * @param responseFrom remote source address of received beacon. * @return beacon handler for particular server. */ - BeaconHandler::shared_pointer getBeaconHandler(std::string const & protocol, osiSockAddr* responseFrom) OVERRIDE FINAL + BeaconHandler::shared_pointer getBeaconHandler(osiSockAddr* responseFrom) OVERRIDE FINAL { - // TODO !!! protocol !!! - if (protocol != "tcp") - return BeaconHandler::shared_pointer(); - Lock guard(m_beaconMapMutex); AddressBeaconHandlerMap::iterator it = m_beaconHandlers.find(*responseFrom); BeaconHandler::shared_pointer handler; if (it == m_beaconHandlers.end()) { // stores weak_ptr - handler.reset(new BeaconHandler(internal_from_this(), protocol, responseFrom)); + handler.reset(new BeaconHandler(internal_from_this(), responseFrom)); m_beaconHandlers[*responseFrom] = handler; } else diff --git a/src/remoteClient/pv/clientContextImpl.h b/src/remoteClient/pv/clientContextImpl.h index 29852b3..0ee0884 100644 --- a/src/remoteClient/pv/clientContextImpl.h +++ b/src/remoteClient/pv/clientContextImpl.h @@ -112,7 +112,7 @@ public: virtual void newServerDetected() = 0; - virtual std::tr1::shared_ptr getBeaconHandler(std::string const & protocol, osiSockAddr* responseFrom) = 0; + virtual std::tr1::shared_ptr getBeaconHandler(osiSockAddr* responseFrom) = 0; virtual void destroy() = 0; }; diff --git a/src/server/serverContext.cpp b/src/server/serverContext.cpp index 1deab07..3d7a969 100644 --- a/src/server/serverContext.cpp +++ b/src/server/serverContext.cpp @@ -369,41 +369,8 @@ void ServerContextImpl::destroyAllTransports() if (size == 0) return; - LOG(logLevelDebug, "Server context still has %zu transport(s) active and closing...", size); - - for (size_t i = 0; i < size; i++) - { - const Transport::shared_pointer& transport = transports[i]; - try - { - transport->close(); - } - catch (std::exception &e) - { - // do all exception safe, log in case of an error - LOG(logLevelError, "Unhandled exception caught from client code at %s:%d: %s", __FILE__, __LINE__, e.what()); - } - catch (...) - { - // do all exception safe, log in case of an error - LOG(logLevelError, "Unhandled exception caught from client code at %s:%d.", __FILE__, __LINE__); - } - } - // now clear all (release) _transportRegistry.clear(); - - for (size_t i = 0; i < size; i++) - { - const Transport::shared_pointer& transport = transports[i]; - transport->waitJoin(); - LEAK_CHECK(transport, "tcp transport") - if(!transport.unique()) { - LOG(logLevelError, "Closed transport %s still has use_count=%u", - transport->getRemoteName().c_str(), - (unsigned)transport.use_count()); - } - } } void ServerContext::printInfo(int lvl) diff --git a/src/utils/pv/inetAddressUtil.h b/src/utils/pv/inetAddressUtil.h index 31e05e5..9140850 100644 --- a/src/utils/pv/inetAddressUtil.h +++ b/src/utils/pv/inetAddressUtil.h @@ -109,18 +109,6 @@ epicsShareFunc int getLoopbackNIF(osiSockAddr& loAddr, std::string const & local // comparators for osiSockAddr -struct comp_osiSockAddrPtr { - bool operator()(osiSockAddr const *a, osiSockAddr const *b) const { - if(a->sa.sa_familysa.sa_family) return true; - if((a->sa.sa_family==b->sa.sa_family)&&(a->ia.sin_addr.s_addr - ia.sin_addr.s_addr)) return true; - if((a->sa.sa_family==b->sa.sa_family)&&(a->ia.sin_addr.s_addr - ==b->ia.sin_addr.s_addr)&&(a->ia.sin_port - ia.sin_port)) return true; - return false; - } -}; - struct comp_osiSock_lt { bool operator()(const osiSockAddr& a, const osiSockAddr& b) const { if(a.sa.sa_family