update transportRegistry
avoid leaks of SOCKET and leaving mutex locked when exceptions are thrown.
This commit is contained in:
@ -16,10 +16,8 @@ namespace epics {
|
||||
namespace pvAccess {
|
||||
|
||||
BeaconHandler::BeaconHandler(Context::shared_pointer const & context,
|
||||
std::string const & protocol,
|
||||
const osiSockAddr* responseFrom) :
|
||||
_context(Context::weak_pointer(context)),
|
||||
_protocol(protocol),
|
||||
_responseFrom(*responseFrom),
|
||||
_mutex(),
|
||||
_serverGUID(),
|
||||
@ -88,7 +86,7 @@ bool BeaconHandler::updateBeacon(int8 /*remoteTransportRevision*/, TimeStamp* /*
|
||||
void BeaconHandler::changedTransport()
|
||||
{
|
||||
TransportRegistry::transportVector_t transports;
|
||||
_context.lock()->getTransportRegistry()->get(_protocol, &_responseFrom, transports);
|
||||
_context.lock()->getTransportRegistry()->toArray(transports, &_responseFrom);
|
||||
|
||||
// notify all
|
||||
for (TransportRegistry::transportVector_t::iterator iter(transports.begin()), end(transports.end());
|
||||
|
@ -26,7 +26,6 @@ BlockingTCPConnector::BlockingTCPConnector(
|
||||
int receiveBufferSize,
|
||||
float heartbeatInterval) :
|
||||
_context(context),
|
||||
_namedLocker(),
|
||||
_receiveBufferSize(receiveBufferSize),
|
||||
_heartbeatInterval(heartbeatInterval)
|
||||
{
|
||||
@ -75,8 +74,13 @@ Transport::shared_pointer BlockingTCPConnector::connect(std::tr1::shared_ptr<Cli
|
||||
|
||||
Context::shared_pointer context = _context.lock();
|
||||
|
||||
// first try to check cache w/o named lock...
|
||||
Transport::shared_pointer transport = context->getTransportRegistry()->get("TCP", &address, priority);
|
||||
TransportRegistry::Reservation rsvp(context->getTransportRegistry(),
|
||||
address, priority);
|
||||
// we are now blocking any connect() to this destination (address and prio)
|
||||
// concurrent connect() to other destination is allowed.
|
||||
// This prevents us from opening duplicate connections.
|
||||
|
||||
Transport::shared_pointer transport = context->getTransportRegistry()->get(address, priority);
|
||||
if(transport.get()) {
|
||||
LOG(logLevelDebug,
|
||||
"Reusing existing connection to PVA server: %s.",
|
||||
@ -85,110 +89,82 @@ Transport::shared_pointer BlockingTCPConnector::connect(std::tr1::shared_ptr<Cli
|
||||
return transport;
|
||||
}
|
||||
|
||||
/* TODO: bound map<> size
|
||||
* Lazy creates a lock for each 'address' ever encountered.
|
||||
*/
|
||||
bool lockAcquired = _namedLocker.acquireSynchronizationObject(&address, LOCK_TIMEOUT);
|
||||
if(lockAcquired) {
|
||||
try {
|
||||
// ... transport created during waiting in lock
|
||||
transport = context->getTransportRegistry()->get("TCP", &address, priority);
|
||||
if(transport.get()) {
|
||||
LOG(logLevelDebug,
|
||||
"Reusing existing connection to PVA server: %s.",
|
||||
ipAddrStr);
|
||||
if (transport->acquire(client))
|
||||
return transport;
|
||||
}
|
||||
try {
|
||||
LOG(logLevelDebug, "Connecting to PVA server: %s.", ipAddrStr);
|
||||
|
||||
LOG(logLevelDebug, "Connecting to PVA server: %s.", ipAddrStr);
|
||||
socket = tryConnect(address, 3);
|
||||
|
||||
socket = tryConnect(address, 3);
|
||||
|
||||
// verify
|
||||
if(socket==INVALID_SOCKET) {
|
||||
LOG(logLevelDebug,
|
||||
"Connection to PVA server %s failed.", ipAddrStr);
|
||||
std::ostringstream temp;
|
||||
temp<<"Failed to verify TCP connection to '"<<ipAddrStr<<"'.";
|
||||
THROW_BASE_EXCEPTION(temp.str().c_str());
|
||||
}
|
||||
|
||||
LOG(logLevelDebug, "Socket connected to PVA server: %s.", ipAddrStr);
|
||||
|
||||
// enable TCP_NODELAY (disable Nagle's algorithm)
|
||||
int optval = 1; // true
|
||||
int retval = ::setsockopt(socket, IPPROTO_TCP, TCP_NODELAY,
|
||||
(char *)&optval, sizeof(int));
|
||||
if(retval<0) {
|
||||
char errStr[64];
|
||||
epicsSocketConvertErrnoToString(errStr, sizeof(errStr));
|
||||
LOG(logLevelWarn, "Error setting TCP_NODELAY: %s.", errStr);
|
||||
}
|
||||
|
||||
// enable TCP_KEEPALIVE
|
||||
retval = ::setsockopt(socket, SOL_SOCKET, SO_KEEPALIVE,
|
||||
(char *)&optval, sizeof(int));
|
||||
if(retval<0)
|
||||
{
|
||||
char errStr[64];
|
||||
epicsSocketConvertErrnoToString(errStr, sizeof(errStr));
|
||||
LOG(logLevelWarn, "Error setting SO_KEEPALIVE: %s.", errStr);
|
||||
}
|
||||
|
||||
// TODO tune buffer sizes?! Win32 defaults are 8k, which is OK
|
||||
|
||||
// create transport
|
||||
// TODO introduce factory
|
||||
// get TCP send buffer size
|
||||
osiSocklen_t intLen = sizeof(int);
|
||||
int _socketSendBufferSize;
|
||||
retval = getsockopt(socket, SOL_SOCKET, SO_SNDBUF, (char *)&_socketSendBufferSize, &intLen);
|
||||
if(retval<0) {
|
||||
char strBuffer[64];
|
||||
epicsSocketConvertErrnoToString(strBuffer, sizeof(strBuffer));
|
||||
LOG(logLevelDebug, "Error getting SO_SNDBUF: %s.", strBuffer);
|
||||
}
|
||||
|
||||
transport = detail::BlockingClientTCPTransportCodec::create(
|
||||
context, socket, responseHandler, _receiveBufferSize, _socketSendBufferSize,
|
||||
client, transportRevision, _heartbeatInterval, priority);
|
||||
|
||||
// verify
|
||||
if(!transport->verify(5000)) {
|
||||
LOG(
|
||||
logLevelDebug,
|
||||
"Connection to PVA server %s failed to be validated, closing it.",
|
||||
ipAddrStr);
|
||||
|
||||
std::ostringstream temp;
|
||||
temp<<"Failed to verify TCP connection to '"<<ipAddrStr<<"'.";
|
||||
THROW_BASE_EXCEPTION(temp.str().c_str());
|
||||
}
|
||||
|
||||
LOG(logLevelDebug, "Connected to PVA server: %s.", ipAddrStr);
|
||||
|
||||
_namedLocker.releaseSynchronizationObject(&address);
|
||||
return transport;
|
||||
} catch(std::exception&) {
|
||||
if(transport.get())
|
||||
transport->close();
|
||||
else if(socket!=INVALID_SOCKET) epicsSocketDestroy(socket);
|
||||
_namedLocker.releaseSynchronizationObject(&address);
|
||||
throw;
|
||||
} catch(...) {
|
||||
if(transport.get())
|
||||
transport->close();
|
||||
else if(socket!=INVALID_SOCKET) epicsSocketDestroy(socket);
|
||||
_namedLocker.releaseSynchronizationObject(&address);
|
||||
throw;
|
||||
// verify
|
||||
if(socket==INVALID_SOCKET) {
|
||||
LOG(logLevelDebug,
|
||||
"Connection to PVA server %s failed.", ipAddrStr);
|
||||
std::ostringstream temp;
|
||||
temp<<"Failed to verify TCP connection to '"<<ipAddrStr<<"'.";
|
||||
THROW_BASE_EXCEPTION(temp.str().c_str());
|
||||
}
|
||||
}
|
||||
else {
|
||||
std::ostringstream temp;
|
||||
temp<<"Failed to obtain synchronization lock for '"<<ipAddrStr;
|
||||
temp<<"', possible deadlock.";
|
||||
THROW_BASE_EXCEPTION(temp.str().c_str());
|
||||
|
||||
LOG(logLevelDebug, "Socket connected to PVA server: %s.", ipAddrStr);
|
||||
|
||||
// enable TCP_NODELAY (disable Nagle's algorithm)
|
||||
int optval = 1; // true
|
||||
int retval = ::setsockopt(socket, IPPROTO_TCP, TCP_NODELAY,
|
||||
(char *)&optval, sizeof(int));
|
||||
if(retval<0) {
|
||||
char errStr[64];
|
||||
epicsSocketConvertErrnoToString(errStr, sizeof(errStr));
|
||||
LOG(logLevelWarn, "Error setting TCP_NODELAY: %s.", errStr);
|
||||
}
|
||||
|
||||
// enable TCP_KEEPALIVE
|
||||
retval = ::setsockopt(socket, SOL_SOCKET, SO_KEEPALIVE,
|
||||
(char *)&optval, sizeof(int));
|
||||
if(retval<0)
|
||||
{
|
||||
char errStr[64];
|
||||
epicsSocketConvertErrnoToString(errStr, sizeof(errStr));
|
||||
LOG(logLevelWarn, "Error setting SO_KEEPALIVE: %s.", errStr);
|
||||
}
|
||||
|
||||
// TODO tune buffer sizes?! Win32 defaults are 8k, which is OK
|
||||
|
||||
// create transport
|
||||
// TODO introduce factory
|
||||
// get TCP send buffer size
|
||||
osiSocklen_t intLen = sizeof(int);
|
||||
int _socketSendBufferSize;
|
||||
retval = getsockopt(socket, SOL_SOCKET, SO_SNDBUF, (char *)&_socketSendBufferSize, &intLen);
|
||||
if(retval<0) {
|
||||
char strBuffer[64];
|
||||
epicsSocketConvertErrnoToString(strBuffer, sizeof(strBuffer));
|
||||
LOG(logLevelDebug, "Error getting SO_SNDBUF: %s.", strBuffer);
|
||||
}
|
||||
|
||||
// create() also adds to context connection pool _context->getTransportRegistry()
|
||||
transport = detail::BlockingClientTCPTransportCodec::create(
|
||||
context, socket, responseHandler, _receiveBufferSize, _socketSendBufferSize,
|
||||
client, transportRevision, _heartbeatInterval, priority);
|
||||
|
||||
// verify
|
||||
if(!transport->verify(5000)) {
|
||||
LOG(
|
||||
logLevelDebug,
|
||||
"Connection to PVA server %s failed to be validated, closing it.",
|
||||
ipAddrStr);
|
||||
|
||||
std::ostringstream temp;
|
||||
temp<<"Failed to verify TCP connection to '"<<ipAddrStr<<"'.";
|
||||
THROW_BASE_EXCEPTION(temp.str().c_str());
|
||||
}
|
||||
|
||||
LOG(logLevelDebug, "Connected to PVA server: %s.", ipAddrStr);
|
||||
|
||||
return transport;
|
||||
} catch(std::exception&) {
|
||||
if(transport.get())
|
||||
transport->close();
|
||||
else if(socket!=INVALID_SOCKET)
|
||||
epicsSocketDestroy(socket);
|
||||
throw;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -40,7 +40,7 @@ public:
|
||||
/**
|
||||
* Constructor.
|
||||
*/
|
||||
BeaconHandler(Context::shared_pointer const & context, std::string const & protocol,
|
||||
BeaconHandler(Context::shared_pointer const & context,
|
||||
const osiSockAddr* responseFrom);
|
||||
|
||||
virtual ~BeaconHandler();
|
||||
@ -66,10 +66,6 @@ private:
|
||||
* Context instance.
|
||||
*/
|
||||
Context::weak_pointer _context;
|
||||
/**
|
||||
* The procotol (transport), "tcp" for pvAccess TCP/IP.
|
||||
*/
|
||||
std::string _protocol;
|
||||
/**
|
||||
* Remote address.
|
||||
*/
|
||||
|
@ -36,7 +36,6 @@
|
||||
#include <pv/remote.h>
|
||||
#include <pv/transportRegistry.h>
|
||||
#include <pv/introspectionRegistry.h>
|
||||
#include <pv/namedLockPattern.h>
|
||||
#include <pv/inetAddressUtil.h>
|
||||
|
||||
namespace epics {
|
||||
@ -70,11 +69,6 @@ private:
|
||||
*/
|
||||
Context::weak_pointer _context;
|
||||
|
||||
/**
|
||||
* named lock
|
||||
*/
|
||||
NamedLockPattern<const osiSockAddr*, comp_osiSockAddrPtr> _namedLocker;
|
||||
|
||||
/**
|
||||
* Receive buffer size.
|
||||
*/
|
||||
|
@ -422,7 +422,7 @@ public:
|
||||
|
||||
void activate() {
|
||||
Transport::shared_pointer thisSharedPtr = shared_from_this();
|
||||
_context->getTransportRegistry()->put(thisSharedPtr);
|
||||
_context->getTransportRegistry()->install(thisSharedPtr);
|
||||
|
||||
start();
|
||||
}
|
||||
|
@ -189,7 +189,7 @@ public:
|
||||
|
||||
/**
|
||||
* Get remote address.
|
||||
* @return remote address, can be null.
|
||||
* @return remote address, can never be null.
|
||||
*/
|
||||
virtual const osiSockAddr* getRemoteAddress() const = 0;
|
||||
|
||||
|
@ -9,6 +9,7 @@
|
||||
|
||||
#include <map>
|
||||
#include <vector>
|
||||
#include <list>
|
||||
#include <iostream>
|
||||
|
||||
#ifdef epicsExportSharedSymbols
|
||||
@ -36,33 +37,51 @@ namespace pvAccess {
|
||||
|
||||
class TransportRegistry {
|
||||
public:
|
||||
typedef std::tr1::shared_ptr<TransportRegistry> shared_pointer;
|
||||
typedef std::tr1::shared_ptr<const TransportRegistry> const_shared_pointer;
|
||||
class Reservation;
|
||||
private:
|
||||
struct Key {
|
||||
osiSockAddr addr;
|
||||
epics::pvData::int16 prio;
|
||||
Key(const osiSockAddr& a, epics::pvData::int16 p) :addr(a), prio(p) {}
|
||||
bool operator<(const Key& o) const;
|
||||
};
|
||||
|
||||
typedef std::map<Key, Transport::shared_pointer> transports_t;
|
||||
typedef std::map<Key, std::tr1::shared_ptr<epics::pvData::Mutex> > locks_t;
|
||||
|
||||
public:
|
||||
POINTER_DEFINITIONS(TransportRegistry);
|
||||
|
||||
typedef std::vector<Transport::shared_pointer> transportVector_t;
|
||||
|
||||
TransportRegistry();
|
||||
virtual ~TransportRegistry();
|
||||
class Reservation {
|
||||
TransportRegistry* const owner;
|
||||
const Key key;
|
||||
std::tr1::shared_ptr<epics::pvData::Mutex> mutex;
|
||||
public:
|
||||
|
||||
void put(Transport::shared_pointer const & transport);
|
||||
Transport::shared_pointer get(std::string const & type, const osiSockAddr* address, const epics::pvData::int16 priority);
|
||||
void get(std::string const & type, const osiSockAddr* address, transportVector_t&output);
|
||||
// ctor blocks until no concurrent connect() in progress (success or failure)
|
||||
Reservation(TransportRegistry *owner, const osiSockAddr& address, epics::pvData::int16 prio);
|
||||
~Reservation();
|
||||
};
|
||||
|
||||
TransportRegistry() {}
|
||||
~TransportRegistry();
|
||||
|
||||
Transport::shared_pointer get(const osiSockAddr& address, epics::pvData::int16 prio);
|
||||
void install(const Transport::shared_pointer& ptr);
|
||||
Transport::shared_pointer remove(Transport::shared_pointer const & transport);
|
||||
void clear();
|
||||
epics::pvData::int32 numberOfActiveTransports();
|
||||
size_t size();
|
||||
|
||||
// optimized to avoid reallocation, adds to array
|
||||
void toArray(transportVector_t & transportArray);
|
||||
void toArray(transportVector_t & transportArray, const osiSockAddr *dest=0);
|
||||
|
||||
private:
|
||||
//TODO if unordered map is used instead of map we can use sockAddrAreIdentical routine from osiSock.h
|
||||
// NOTE: pointers are used to osiSockAddr (to save memory), since it guaranteed that their reference is valid as long as Transport
|
||||
typedef std::map<const epics::pvData::int16,Transport::shared_pointer> prioritiesMap_t;
|
||||
typedef std::tr1::shared_ptr<prioritiesMap_t> prioritiesMapSharedPtr_t;
|
||||
typedef std::map<const osiSockAddr*,prioritiesMapSharedPtr_t,comp_osiSockAddrPtr> transportsMap_t;
|
||||
transports_t transports;
|
||||
// per destination mutex to serialize concurrent connect() attempts
|
||||
locks_t locks;
|
||||
|
||||
transportsMap_t _transports;
|
||||
epics::pvData::int32 _transportCount;
|
||||
epics::pvData::Mutex _mutex;
|
||||
};
|
||||
|
||||
|
@ -6,68 +6,79 @@
|
||||
|
||||
#define epicsExportSharedSymbols
|
||||
#include <pv/transportRegistry.h>
|
||||
#include <pv/logger.h>
|
||||
|
||||
using namespace epics::pvData;
|
||||
namespace pvd = epics::pvData;
|
||||
|
||||
namespace epics {
|
||||
namespace pvAccess {
|
||||
|
||||
TransportRegistry::TransportRegistry(): _transports(), _transportCount(0), _mutex()
|
||||
{
|
||||
|
||||
bool TransportRegistry::Key::operator<(const Key& o) const
|
||||
{
|
||||
if(addr.sa.sa_family<o.addr.sa.sa_family)
|
||||
return true;
|
||||
if(addr.sa.sa_family>o.addr.sa.sa_family)
|
||||
return false;
|
||||
if(addr.ia.sin_addr.s_addr<o.addr.ia.sin_addr.s_addr)
|
||||
return true;
|
||||
if(addr.ia.sin_addr.s_addr>o.addr.ia.sin_addr.s_addr)
|
||||
return false;
|
||||
if(addr.ia.sin_port<o.addr.ia.sin_port)
|
||||
return true;
|
||||
if(addr.ia.sin_port>o.addr.ia.sin_port)
|
||||
return false;
|
||||
if(prio<o.prio)
|
||||
return true;
|
||||
return false;
|
||||
}
|
||||
|
||||
TransportRegistry::Reservation::Reservation(TransportRegistry *owner,
|
||||
const osiSockAddr& address,
|
||||
pvd::int16 prio)
|
||||
:owner(owner)
|
||||
,key(address, prio)
|
||||
{
|
||||
{
|
||||
pvd::Lock G(owner->_mutex);
|
||||
|
||||
std::tr1::shared_ptr<pvd::Mutex>& lock = owner->locks[key]; // fetch or alloc
|
||||
if(!lock)
|
||||
lock.reset(new pvd::Mutex());
|
||||
|
||||
mutex = lock;
|
||||
}
|
||||
|
||||
mutex->lock();
|
||||
}
|
||||
|
||||
TransportRegistry::Reservation::~Reservation()
|
||||
{
|
||||
mutex->unlock();
|
||||
|
||||
pvd::Lock G(owner->_mutex);
|
||||
|
||||
assert(mutex.use_count()>=2);
|
||||
|
||||
if(mutex.use_count()==2) {
|
||||
// no other concurrent connect(), so can drop this lock
|
||||
owner->locks.erase(key);
|
||||
}
|
||||
|
||||
assert(mutex.use_count()==1);
|
||||
}
|
||||
|
||||
TransportRegistry::~TransportRegistry()
|
||||
{
|
||||
pvd::Lock G(_mutex);
|
||||
if(!transports.empty())
|
||||
LOG(logLevelWarn, "TransportRegistry destroyed while not empty");
|
||||
}
|
||||
|
||||
void TransportRegistry::put(Transport::shared_pointer const & transport)
|
||||
{
|
||||
Lock guard(_mutex);
|
||||
//const string type = transport.getType();
|
||||
const int16 priority = transport->getPriority();
|
||||
const osiSockAddr* address = transport->getRemoteAddress();
|
||||
|
||||
transportsMap_t::iterator transportsIter = _transports.find(address);
|
||||
prioritiesMapSharedPtr_t priorities;
|
||||
if(transportsIter == _transports.end())
|
||||
{
|
||||
priorities.reset(new prioritiesMap_t());
|
||||
_transports[address] = priorities;
|
||||
_transportCount++;
|
||||
}
|
||||
else
|
||||
{
|
||||
priorities = transportsIter->second;
|
||||
prioritiesMap_t::iterator prioritiesIter = priorities->find(priority);
|
||||
if(prioritiesIter == priorities->end()) //only increase transportCount if not replacing
|
||||
{
|
||||
_transportCount++;
|
||||
}
|
||||
}
|
||||
(*priorities)[priority] = transport;
|
||||
}
|
||||
|
||||
Transport::shared_pointer TransportRegistry::get(std::string const & /*type*/, const osiSockAddr* address, const int16 priority)
|
||||
{
|
||||
Lock guard(_mutex);
|
||||
transportsMap_t::iterator transportsIter = _transports.find(address);
|
||||
if(transportsIter != _transports.end())
|
||||
{
|
||||
prioritiesMapSharedPtr_t priorities = transportsIter->second;
|
||||
prioritiesMap_t::iterator prioritiesIter = priorities->find(priority);
|
||||
if(prioritiesIter != priorities->end())
|
||||
{
|
||||
return prioritiesIter->second;
|
||||
}
|
||||
}
|
||||
return Transport::shared_pointer();
|
||||
}
|
||||
|
||||
/*
|
||||
void
|
||||
TransportRegistry::get(std::string const & /*type*/, const osiSockAddr* address, transportVector_t& output)
|
||||
TransportRegistry::get(const osiSockAddr* address, transportVector_t& output)
|
||||
{
|
||||
Lock guard(_mutex);
|
||||
pvd::Lock guard(_mutex);
|
||||
transportsMap_t::iterator transportsIter = _transports.find(address);
|
||||
if(transportsIter != _transports.end())
|
||||
{
|
||||
@ -84,67 +95,109 @@ TransportRegistry::get(std::string const & /*type*/, const osiSockAddr* address,
|
||||
}
|
||||
}
|
||||
}
|
||||
*/
|
||||
|
||||
Transport::shared_pointer TransportRegistry::get(const osiSockAddr& address, epics::pvData::int16 prio)
|
||||
{
|
||||
const Key key(address, prio);
|
||||
|
||||
pvd::Lock G(_mutex);
|
||||
|
||||
transports_t::iterator it(transports.find(key));
|
||||
if(it!=transports.end()) {
|
||||
return it->second;
|
||||
}
|
||||
return Transport::shared_pointer();
|
||||
}
|
||||
|
||||
void TransportRegistry::install(const Transport::shared_pointer& ptr)
|
||||
{
|
||||
const Key key(*ptr->getRemoteAddress(), ptr->getPriority());
|
||||
|
||||
pvd::Lock G(_mutex);
|
||||
|
||||
std::pair<transports_t::iterator, bool> itpair(transports.insert(std::make_pair(key, ptr)));
|
||||
if(!itpair.second)
|
||||
THROW_EXCEPTION2(std::logic_error, "Refuse to insert dup");
|
||||
}
|
||||
|
||||
Transport::shared_pointer TransportRegistry::remove(Transport::shared_pointer const & transport)
|
||||
{
|
||||
Lock guard(_mutex);
|
||||
const int16 priority = transport->getPriority();
|
||||
const osiSockAddr* address = transport->getRemoteAddress();
|
||||
Transport::shared_pointer retTransport;
|
||||
transportsMap_t::iterator transportsIter = _transports.find(address);
|
||||
if(transportsIter != _transports.end())
|
||||
assert(!!transport);
|
||||
Transport::shared_pointer ret;
|
||||
|
||||
pvd::Lock guard(_mutex);
|
||||
for(transports_t::iterator it(transports.begin()), end(transports.end());
|
||||
it != end; ++it)
|
||||
{
|
||||
prioritiesMapSharedPtr_t priorities = transportsIter->second;
|
||||
prioritiesMap_t::iterator prioritiesIter = priorities->find(priority);
|
||||
if(prioritiesIter != priorities->end())
|
||||
{
|
||||
retTransport = prioritiesIter->second;
|
||||
priorities->erase(prioritiesIter);
|
||||
_transportCount--;
|
||||
if(priorities->size() == 0)
|
||||
{
|
||||
_transports.erase(transportsIter);
|
||||
}
|
||||
Transport::shared_pointer& tr = it->second;
|
||||
|
||||
if(transport.get() == tr.get()) {
|
||||
ret.swap(it->second);
|
||||
transports.erase(it);
|
||||
}
|
||||
}
|
||||
return retTransport;
|
||||
return ret;
|
||||
}
|
||||
|
||||
#define LEAK_CHECK(PTR, NAME) if((PTR) && !(PTR).unique()) { std::cerr<<"Leaking Transport " NAME " use_count="<<(PTR).use_count()<<"\n"<<show_referrers(PTR, false);}
|
||||
|
||||
void TransportRegistry::clear()
|
||||
{
|
||||
Lock guard(_mutex);
|
||||
_transports.clear();
|
||||
_transportCount = 0;
|
||||
}
|
||||
transports_t temp;
|
||||
{
|
||||
pvd::Lock guard(_mutex);
|
||||
transports.swap(temp);
|
||||
}
|
||||
|
||||
int32 TransportRegistry::numberOfActiveTransports()
|
||||
{
|
||||
Lock guard(_mutex);
|
||||
return _transportCount;
|
||||
}
|
||||
|
||||
void TransportRegistry::toArray(transportVector_t & transportArray)
|
||||
{
|
||||
Lock guard(_mutex);
|
||||
if (_transportCount == 0)
|
||||
if(temp.empty())
|
||||
return;
|
||||
|
||||
transportArray.reserve(transportArray.size() + _transportCount);
|
||||
LOG(logLevelDebug, "Context still has %zu transport(s) active and closing...", temp.size());
|
||||
|
||||
for (transportsMap_t::iterator transportsIter = _transports.begin();
|
||||
transportsIter != _transports.end();
|
||||
transportsIter++)
|
||||
for(transports_t::iterator it(temp.begin()), end(temp.end());
|
||||
it != end; ++it)
|
||||
{
|
||||
prioritiesMapSharedPtr_t priorities = transportsIter->second;
|
||||
for (prioritiesMap_t::iterator prioritiesIter = priorities->begin();
|
||||
prioritiesIter != priorities->end();
|
||||
prioritiesIter++)
|
||||
{
|
||||
transportArray.push_back(prioritiesIter->second);
|
||||
it->second->close();
|
||||
}
|
||||
|
||||
for(transports_t::iterator it(temp.begin()), end(temp.end());
|
||||
it != end; ++it)
|
||||
{
|
||||
const Transport::shared_pointer& transport = it->second;
|
||||
transport->waitJoin();
|
||||
LEAK_CHECK(transport, "tcp transport")
|
||||
if(!transport.unique()) {
|
||||
LOG(logLevelError, "Closed transport %s still has use_count=%u",
|
||||
transport->getRemoteName().c_str(),
|
||||
(unsigned)transport.use_count());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
size_t TransportRegistry::size()
|
||||
{
|
||||
pvd::Lock guard(_mutex);
|
||||
return transports.size();
|
||||
}
|
||||
|
||||
void TransportRegistry::toArray(transportVector_t & transportArray, const osiSockAddr *dest)
|
||||
{
|
||||
pvd::Lock guard(_mutex);
|
||||
|
||||
transportArray.reserve(transportArray.size() + transports.size());
|
||||
|
||||
for(transports_t::const_iterator it(transports.begin()), end(transports.end());
|
||||
it != end; ++it)
|
||||
{
|
||||
const Key& key = it->first;
|
||||
const Transport::shared_pointer& tr = it->second;
|
||||
|
||||
if(!dest || sockAddrAreIdentical(dest, &key.addr))
|
||||
transportArray.push_back(tr);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -2730,14 +2730,16 @@ public:
|
||||
int16 port = payloadBuffer->getShort();
|
||||
serverAddress.ia.sin_port = htons(port);
|
||||
|
||||
string protocol = SerializeHelper::deserializeString(payloadBuffer, transport.get());
|
||||
string protocol(SerializeHelper::deserializeString(payloadBuffer, transport.get()));
|
||||
if(protocol!="tcp")
|
||||
return;
|
||||
|
||||
// TODO optimize
|
||||
ClientContextImpl::shared_pointer context = _context.lock();
|
||||
if (!context)
|
||||
return;
|
||||
|
||||
std::tr1::shared_ptr<epics::pvAccess::BeaconHandler> beaconHandler = context->getBeaconHandler(protocol, responseFrom);
|
||||
std::tr1::shared_ptr<epics::pvAccess::BeaconHandler> beaconHandler = context->getBeaconHandler(responseFrom);
|
||||
// currently we care only for servers used by this context
|
||||
if (!beaconHandler)
|
||||
return;
|
||||
@ -4208,7 +4210,7 @@ private:
|
||||
// wait for all transports to cleanly exit
|
||||
int tries = 40;
|
||||
epics::pvData::int32 transportCount;
|
||||
while ((transportCount = m_transportRegistry.numberOfActiveTransports()) && tries--)
|
||||
while ((transportCount = m_transportRegistry.size()) && tries--)
|
||||
epicsThreadSleep(0.025);
|
||||
|
||||
{
|
||||
@ -4403,19 +4405,15 @@ private:
|
||||
* @param responseFrom remote source address of received beacon.
|
||||
* @return beacon handler for particular server.
|
||||
*/
|
||||
BeaconHandler::shared_pointer getBeaconHandler(std::string const & protocol, osiSockAddr* responseFrom) OVERRIDE FINAL
|
||||
BeaconHandler::shared_pointer getBeaconHandler(osiSockAddr* responseFrom) OVERRIDE FINAL
|
||||
{
|
||||
// TODO !!! protocol !!!
|
||||
if (protocol != "tcp")
|
||||
return BeaconHandler::shared_pointer();
|
||||
|
||||
Lock guard(m_beaconMapMutex);
|
||||
AddressBeaconHandlerMap::iterator it = m_beaconHandlers.find(*responseFrom);
|
||||
BeaconHandler::shared_pointer handler;
|
||||
if (it == m_beaconHandlers.end())
|
||||
{
|
||||
// stores weak_ptr
|
||||
handler.reset(new BeaconHandler(internal_from_this(), protocol, responseFrom));
|
||||
handler.reset(new BeaconHandler(internal_from_this(), responseFrom));
|
||||
m_beaconHandlers[*responseFrom] = handler;
|
||||
}
|
||||
else
|
||||
|
@ -112,7 +112,7 @@ public:
|
||||
|
||||
virtual void newServerDetected() = 0;
|
||||
|
||||
virtual std::tr1::shared_ptr<BeaconHandler> getBeaconHandler(std::string const & protocol, osiSockAddr* responseFrom) = 0;
|
||||
virtual std::tr1::shared_ptr<BeaconHandler> getBeaconHandler(osiSockAddr* responseFrom) = 0;
|
||||
|
||||
virtual void destroy() = 0;
|
||||
};
|
||||
|
@ -369,41 +369,8 @@ void ServerContextImpl::destroyAllTransports()
|
||||
if (size == 0)
|
||||
return;
|
||||
|
||||
LOG(logLevelDebug, "Server context still has %zu transport(s) active and closing...", size);
|
||||
|
||||
for (size_t i = 0; i < size; i++)
|
||||
{
|
||||
const Transport::shared_pointer& transport = transports[i];
|
||||
try
|
||||
{
|
||||
transport->close();
|
||||
}
|
||||
catch (std::exception &e)
|
||||
{
|
||||
// do all exception safe, log in case of an error
|
||||
LOG(logLevelError, "Unhandled exception caught from client code at %s:%d: %s", __FILE__, __LINE__, e.what());
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
// do all exception safe, log in case of an error
|
||||
LOG(logLevelError, "Unhandled exception caught from client code at %s:%d.", __FILE__, __LINE__);
|
||||
}
|
||||
}
|
||||
|
||||
// now clear all (release)
|
||||
_transportRegistry.clear();
|
||||
|
||||
for (size_t i = 0; i < size; i++)
|
||||
{
|
||||
const Transport::shared_pointer& transport = transports[i];
|
||||
transport->waitJoin();
|
||||
LEAK_CHECK(transport, "tcp transport")
|
||||
if(!transport.unique()) {
|
||||
LOG(logLevelError, "Closed transport %s still has use_count=%u",
|
||||
transport->getRemoteName().c_str(),
|
||||
(unsigned)transport.use_count());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void ServerContext::printInfo(int lvl)
|
||||
|
@ -109,18 +109,6 @@ epicsShareFunc int getLoopbackNIF(osiSockAddr& loAddr, std::string const & local
|
||||
|
||||
// comparators for osiSockAddr
|
||||
|
||||
struct comp_osiSockAddrPtr {
|
||||
bool operator()(osiSockAddr const *a, osiSockAddr const *b) const {
|
||||
if(a->sa.sa_family<b->sa.sa_family) return true;
|
||||
if((a->sa.sa_family==b->sa.sa_family)&&(a->ia.sin_addr.s_addr
|
||||
<b->ia.sin_addr.s_addr)) return true;
|
||||
if((a->sa.sa_family==b->sa.sa_family)&&(a->ia.sin_addr.s_addr
|
||||
==b->ia.sin_addr.s_addr)&&(a->ia.sin_port
|
||||
<b->ia.sin_port)) return true;
|
||||
return false;
|
||||
}
|
||||
};
|
||||
|
||||
struct comp_osiSock_lt {
|
||||
bool operator()(const osiSockAddr& a, const osiSockAddr& b) const {
|
||||
if(a.sa.sa_family<b.sa.sa_family) return true;
|
||||
@ -133,20 +121,6 @@ struct comp_osiSock_lt {
|
||||
}
|
||||
};
|
||||
|
||||
//TODO if unordered map is used instead of map we can use sockAddrAreIdentical routine from osiSock.h
|
||||
struct comp_osiSockAddr {
|
||||
bool operator()(osiSockAddr const a, osiSockAddr const b) const {
|
||||
if(a.sa.sa_family<b.sa.sa_family) return true;
|
||||
if((a.sa.sa_family==b.sa.sa_family)&&(a.ia.sin_addr.s_addr
|
||||
<b.ia.sin_addr.s_addr)) return true;
|
||||
if((a.sa.sa_family==b.sa.sa_family)&&(a.ia.sin_addr.s_addr
|
||||
==b.ia.sin_addr.s_addr)&&(a.ia.sin_port
|
||||
<b.ia.sin_port)) return true;
|
||||
return false;
|
||||
}
|
||||
};
|
||||
|
||||
}
|
||||
}
|
||||
}} // namespace epics::pvAccess
|
||||
|
||||
#endif /* INETADDRESSUTIL_H_ */
|
||||
|
@ -10,6 +10,7 @@ TESTS += testChannelAccess
|
||||
TESTPROD_HOST += testCodec
|
||||
testCodec_SRCS = testCodec
|
||||
testHarness_SRCS += testCodec.cpp
|
||||
testCodec_SYS_LIBS_WIN32 += ws2_32
|
||||
TESTS += testCodec
|
||||
|
||||
TESTPROD_HOST += testRPC
|
||||
|
@ -113,6 +113,9 @@ public:
|
||||
_writeBuffer(sendBufferSize),
|
||||
_dummyAddress()
|
||||
{
|
||||
dummyAddr.ia.sin_family = AF_INET;
|
||||
dummyAddr.ia.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
|
||||
dummyAddr.ia.sin_port = htons(42);
|
||||
}
|
||||
|
||||
|
||||
@ -353,7 +356,7 @@ public:
|
||||
}
|
||||
|
||||
const osiSockAddr* getRemoteAddress() const {
|
||||
return 0;
|
||||
return &dummyAddr;
|
||||
}
|
||||
std::string dummyRemoteName;
|
||||
const std::string& getRemoteName() const {
|
||||
@ -412,6 +415,7 @@ public:
|
||||
}
|
||||
|
||||
|
||||
osiSockAddr dummyAddr;
|
||||
std::size_t _closedCount;
|
||||
std::size_t _invalidDataStreamCount;
|
||||
std::size_t _scheduleSendCount;
|
||||
|
Reference in New Issue
Block a user