From c87ff047ca2829f4b6dc2753f15e53eec1d32a4d Mon Sep 17 00:00:00 2001 From: Matej Sekoranja Date: Thu, 10 Feb 2011 17:05:34 +0100 Subject: [PATCH] stability --- .../remote/blockingClientTCPTransport.cpp | 39 ++++-- pvAccessApp/remote/blockingTCPTransport.cpp | 3 + pvAccessApp/remote/channelSearchManager.cpp | 41 ++++-- pvAccessApp/remote/channelSearchManager.h | 3 +- pvAccessApp/remote/remote.h | 4 +- .../remoteClient/clientContextImpl.cpp | 128 ++++++++++++------ pvAccessApp/server/serverContext.cpp | 3 + pvAccessApp/server/serverContext.h | 3 + testApp/remote/testBeaconEmitter.cpp | 3 +- testApp/remote/testBeaconHandler.cpp | 2 + testApp/remote/testBlockingTCPClnt.cpp | 4 + testApp/remote/testBlockingTCPSrv.cpp | 2 + testApp/remote/testBlockingUDPClnt.cpp | 2 + testApp/remote/testBlockingUDPSrv.cpp | 2 + testApp/remote/testChannelSearchManager.cpp | 5 + testApp/remote/testRemoteClientImpl.cpp | 1 + 16 files changed, 178 insertions(+), 67 deletions(-) diff --git a/pvAccessApp/remote/blockingClientTCPTransport.cpp b/pvAccessApp/remote/blockingClientTCPTransport.cpp index b3f274c..6d647bd 100644 --- a/pvAccessApp/remote/blockingClientTCPTransport.cpp +++ b/pvAccessApp/remote/blockingClientTCPTransport.cpp @@ -27,7 +27,11 @@ using namespace epics::pvData; namespace epics { namespace pvAccess { - BlockingClientTCPTransport::BlockingClientTCPTransport( +#define EXCEPTION_GUARD(code) try { code; } \ + catch (std::exception &e) { errlogSevPrintf(errlogMajor, "Unhandled exception caught from code at %s:%d: %s", __FILE__, __LINE__, e.what()); } \ + catch (...) { errlogSevPrintf(errlogMajor, "Unhandled exception caught from code at %s:%d.", __FILE__, __LINE__); } + + BlockingClientTCPTransport::BlockingClientTCPTransport( Context* context, SOCKET channel, ResponseHandler* responseHandler, int receiveBufferSize, TransportClient* client, short remoteTransportRevision, @@ -84,8 +88,12 @@ namespace epics { _unresponsiveTransport = true; set::iterator it = _owners.begin(); - for(; it!=_owners.end(); it++) - (*it)->transportUnresponsive(); + for(; it!=_owners.end(); it++) { + TransportClient* client = *it; + client->acquire(); + EXCEPTION_GUARD(client->transportUnresponsive()); + client->release(); + } } } @@ -129,8 +137,13 @@ namespace epics { ipAddrStr, refs); set::iterator it = _owners.begin(); - for(; it!=_owners.end(); it++) - (*it)->transportClosed(); + for(; it!=_owners.end(); it++) { + TransportClient* client = *it; + client->acquire(); + EXCEPTION_GUARD(client->transportClosed()); + client->release(); + } + } _owners.clear(); @@ -165,8 +178,12 @@ namespace epics { _unresponsiveTransport = false; set::iterator it = _owners.begin(); - for(; it!=_owners.end(); it++) - (*it)->transportResponsive(this); + for(; it!=_owners.end(); it++) { + TransportClient* client = *it; + client->acquire(); + EXCEPTION_GUARD(client->transportResponsive(this)); + client->release(); + } } } @@ -175,8 +192,12 @@ namespace epics { Lock lock(&_ownersMutex); set::iterator it = _owners.begin(); - for(; it!=_owners.end(); it++) - (*it)->transportChanged(); + for(; it!=_owners.end(); it++) { + TransportClient* client = *it; + client->acquire(); + EXCEPTION_GUARD(client->transportChanged()); + client->release(); + } } void BlockingClientTCPTransport::send(ByteBuffer* buffer, diff --git a/pvAccessApp/remote/blockingTCPTransport.cpp b/pvAccessApp/remote/blockingTCPTransport.cpp index ac051bb..58bce7b 100644 --- a/pvAccessApp/remote/blockingTCPTransport.cpp +++ b/pvAccessApp/remote/blockingTCPTransport.cpp @@ -129,6 +129,7 @@ namespace epics { clearAndReleaseBuffer(); // add to registry + _context->acquire(); _context->getTransportRegistry()->put(this); } @@ -151,6 +152,8 @@ namespace epics { delete _sendBuffer; delete _responseHandler; + + _context->release(); } void BlockingTCPTransport::start() { diff --git a/pvAccessApp/remote/channelSearchManager.cpp b/pvAccessApp/remote/channelSearchManager.cpp index bcadc80..932cd9e 100644 --- a/pvAccessApp/remote/channelSearchManager.cpp +++ b/pvAccessApp/remote/channelSearchManager.cpp @@ -11,9 +11,17 @@ namespace epics { namespace pvAccess { const int BaseSearchInstance::DATA_COUNT_POSITION = CA_MESSAGE_HEADER_SIZE + sizeof(int32)/sizeof(int8) + 1; const int BaseSearchInstance::PAYLOAD_POSITION = sizeof(int16)/sizeof(int8) + 2; +void BaseSearchInstance::initializeSearchInstance() +{ + _owner = NULL; + _ownerMutex = NULL; + _ownerIndex = -1; + } + void BaseSearchInstance::unsetListOwnership() { Lock guard(&_mutex); + if (_owner != NULL) this->release(); _owner = NULL; } @@ -25,6 +33,7 @@ void BaseSearchInstance::addAndSetListOwnership(ArrayFIFO* newO Lock ownerGuard(_ownerMutex); Lock guard(&_mutex); newOwner->push(this); + if (_owner == NULL) this->acquire(); // new owner _owner = newOwner; _ownerIndex = index; } @@ -38,6 +47,7 @@ void BaseSearchInstance::removeAndUnsetListOwnership() Lock guard(&_mutex); if(_owner != NULL) { + this->release(); _owner->remove(this); _owner = NULL; } @@ -205,8 +215,10 @@ void SearchTimer::callback() if(channel->getOwnerIndex() > boostIndex) { _requestPendingChannels->pop(); + channel->acquire(); channel->unsetListOwnership(); _chanSearchManager->boostSearching(channel, boostIndex); + channel->release(); } } } @@ -226,8 +238,10 @@ void SearchTimer::callback() { if(_allowSlowdown) { + channel->acquire(); channel->unsetListOwnership(); _chanSearchManager->searchResponseTimeout(channel, _timerIndex); + channel->release(); } else { @@ -296,6 +310,7 @@ void SearchTimer::callback() } while (!canceled && channel != NULL) { + channel->acquire(); channel->unsetListOwnership(); bool requestSent = true; @@ -329,6 +344,8 @@ void SearchTimer::callback() _searchAttempts++; } } + + channel->release(); // limit if(triesInFrame == 0 && !allowNewFrame) break; @@ -503,7 +520,7 @@ void ChannelSearchManager::registerChannel(SearchInstance* channel) Lock guard(&_channelMutex); //overrides if already registered - _channels[channel->getSearchInstanceID()] = channel; + _channels[channel->getSearchInstanceID()] = channel; _timers[0]->installChannel(channel); } @@ -529,7 +546,18 @@ void ChannelSearchManager::searchResponse(int32 cid, int32 seqNo, int8 minorRevi { si = _channelsIter->second; _channels.erase(_channelsIter); + si->acquire(); si->removeAndUnsetListOwnership(); + + // report success + const int timerIndex = si->getOwnerIndex(); + TimeStamp now; + now.getCurrent(); + _timers[timerIndex]->searchResponse(seqNo, seqNo != 0, now.getMilliseconds()); + + // then notify SearchInstance + si->searchResponse(minorRevision, serverAddress); + si->release(); } else { @@ -537,19 +565,12 @@ void ChannelSearchManager::searchResponse(int32 cid, int32 seqNo, int8 minorRevi si = reinterpret_cast(_context->getChannel(cid)); if(si != NULL) { + si->acquire(); // TODO not thread/destruction safe si->searchResponse(minorRevision, serverAddress); + si->release(); } return; } - - // report success - const int timerIndex = si->getOwnerIndex(); - TimeStamp now; - now.getCurrent(); - _timers[timerIndex]->searchResponse(seqNo, seqNo != 0, now.getMilliseconds()); - - // then notify SearchInstance - si->searchResponse(minorRevision, serverAddress); } void ChannelSearchManager::beaconAnomalyNotify() diff --git a/pvAccessApp/remote/channelSearchManager.h b/pvAccessApp/remote/channelSearchManager.h index d99f52a..3973c4f 100644 --- a/pvAccessApp/remote/channelSearchManager.h +++ b/pvAccessApp/remote/channelSearchManager.h @@ -29,7 +29,7 @@ namespace epics { namespace pvAccess { /** * SearchInstance. */ -class SearchInstance { +class SearchInstance : public ReferenceCountingInstance { public: /** * Destructor @@ -93,6 +93,7 @@ class BaseSearchInstance : public SearchInstance { public: virtual ~BaseSearchInstance() {}; + void initializeSearchInstance(); virtual pvAccessID getSearchInstanceID() = 0; virtual String getSearchInstanceName() = 0; virtual void unsetListOwnership(); diff --git a/pvAccessApp/remote/remote.h b/pvAccessApp/remote/remote.h index 6c4ecb0..cd131c5 100644 --- a/pvAccessApp/remote/remote.h +++ b/pvAccessApp/remote/remote.h @@ -267,7 +267,7 @@ namespace epics { /** * Not public IF, used by Transports, etc. */ - class Context { + class Context : public ReferenceCountingInstance { public: virtual ~Context() { } @@ -355,7 +355,7 @@ namespace epics { * @author Matej Sekoranja * @version $Id: TransportClient.java,v 1.1 2010/05/03 14:45:39 mrkraimer Exp $ */ - class TransportClient { + class TransportClient : public ReferenceCountingInstance { public: virtual ~TransportClient() { } diff --git a/pvAccessApp/remoteClient/clientContextImpl.cpp b/pvAccessApp/remoteClient/clientContextImpl.cpp index fd755b3..5ba0016 100644 --- a/pvAccessApp/remoteClient/clientContextImpl.cpp +++ b/pvAccessApp/remoteClient/clientContextImpl.cpp @@ -2178,14 +2178,13 @@ namespace epics { }; + + + PVDATA_REFCOUNT_MONITOR_DEFINE(remoteClientContext); + class InternalClientContextImpl : public ClientContextImpl { - - - - - /** * Implementation of CAJ JCA Channel. */ @@ -2279,6 +2278,7 @@ namespace epics { { PVDATA_REFCOUNT_MONITOR_DESTRUCT(channel); if (m_addresses) delete m_addresses; + m_context->release(); } public: @@ -2313,7 +2313,10 @@ namespace epics { { PVDATA_REFCOUNT_MONITOR_CONSTRUCT(channel); + initializeSearchInstance(); + // register before issuing search request + m_context->acquire(); m_context->registerChannel(this); // connect @@ -2528,13 +2531,14 @@ namespace epics { } /** - * @param force force destruction regardless of reference count + * @param force force destruction regardless of reference count (not used now) */ void destroy(bool force) { { Lock guard(&m_channelMutex); if (m_connectionState == DESTROYED) - throw std::runtime_error("Channel already destroyed."); + return; + //throw std::runtime_error("Channel already destroyed."); } // do destruction via context @@ -3065,8 +3069,10 @@ namespace epics { m_namedLocker(0), m_lastCID(0), m_lastIOID(0), m_channelSearchManager(0), m_version(new Version("CA Client", "cpp", 0, 0, 0, 1)), m_provider(new ChannelProviderImpl(this)), - m_contextState(CONTEXT_NOT_INITIALIZED), m_configuration(new SystemConfigurationImpl()) + m_contextState(CONTEXT_NOT_INITIALIZED), m_configuration(new SystemConfigurationImpl()), + m_refCount(1) { + PVDATA_REFCOUNT_MONITOR_CONSTRUCT(remoteClientContext); loadConfiguration(); } @@ -3161,17 +3167,19 @@ TODO virtual void destroy() { - m_contextMutex.lock(); - - if (m_contextState == CONTEXT_DESTROYED) { - m_contextMutex.unlock(); - throw std::runtime_error("Context already destroyed."); + Lock guard(&m_contextMutex); + + if (m_contextState == CONTEXT_DESTROYED) + { + m_contextMutex.unlock(); + throw std::runtime_error("Context already destroyed."); + } + + // go into destroyed state ASAP + m_contextState = CONTEXT_DESTROYED; } - - // go into destroyed state ASAP - m_contextState = CONTEXT_DESTROYED; - + internalDestroy(); } @@ -3181,8 +3189,46 @@ TODO destroy(); } + virtual void acquire() { + Lock guard(&m_contextMutex); + m_refCount++; + } + + virtual void release() { + m_contextMutex.lock(); + m_refCount--; + m_contextMutex.unlock(); + if (m_refCount == 0) + delete this; + } + private: - ~InternalClientContextImpl() {}; + ~InternalClientContextImpl() + { + PVDATA_REFCOUNT_MONITOR_DESTRUCT(remoteClientContext); + + // stop searching + if (m_channelSearchManager) + delete m_channelSearchManager; //->destroy(); + + // stop timer + if (m_timer) + delete m_timer; + + // TODO destroy !!! + if (m_broadcastTransport) + delete m_broadcastTransport; //->destroy(true); + if (m_searchTransport) + delete m_searchTransport; //->destroy(true); + + if (m_namedLocker) delete m_namedLocker; + if (m_transportRegistry) delete m_transportRegistry; + if (m_connector) delete m_connector; + if (m_configuration) delete m_configuration; + + m_provider->destroy(); + delete m_version; + }; void loadConfiguration() { m_addressList = m_configuration->getPropertyAsString("EPICS4_CA_ADDR_LIST", m_addressList); @@ -3273,41 +3319,33 @@ TODO void internalDestroy() { - // stop searching - if (m_channelSearchManager) - delete m_channelSearchManager; //->destroy(); - - // stop timer - if (m_timer) - delete m_timer; - // // cleanup // // this will also close all CA transports destroyAllChannels(); - - // TODO destroy !!! - if (m_broadcastTransport) - delete m_broadcastTransport; //->destroy(true); - if (m_searchTransport) - delete m_searchTransport; //->destroy(true); - - if (m_namedLocker) delete m_namedLocker; - if (m_transportRegistry) delete m_transportRegistry; - if (m_connector) delete m_connector; - if (m_configuration) delete m_configuration; - - m_provider->destroy(); - delete m_version; - m_contextMutex.unlock(); - delete this; + + release(); } void destroyAllChannels() { - // TODO - } + Lock guard(&m_cidMapMutex); + + int count = 0; + ChannelImpl* channels[m_channelsByCID.size()]; + for (CIDChannelMap::iterator iter = m_channelsByCID.begin(); + iter != m_channelsByCID.end(); + iter++) + { + channels[count++] = iter->second; + } + + for (int i = 0; i< count; i++) + { + EXCEPTION_GUARD(channels[i]->destroy()); + } + } /** * Check channel name. @@ -3719,6 +3757,8 @@ TODO friend class ChannelProviderImpl; Configuration* m_configuration; + + int m_refCount; }; ClientContextImpl* createClientContextImpl() diff --git a/pvAccessApp/server/serverContext.cpp b/pvAccessApp/server/serverContext.cpp index e7b8aae..6e2fce3 100644 --- a/pvAccessApp/server/serverContext.cpp +++ b/pvAccessApp/server/serverContext.cpp @@ -502,6 +502,9 @@ Transport* ServerContextImpl::getSearchTransport() //TODO return NULL; } +// TODO +void ServerContextImpl::acquire() {} +void ServerContextImpl::release() {} } } diff --git a/pvAccessApp/server/serverContext.h b/pvAccessApp/server/serverContext.h index 4ad0357..6b16093 100644 --- a/pvAccessApp/server/serverContext.h +++ b/pvAccessApp/server/serverContext.h @@ -258,6 +258,9 @@ public: */ ChannelProvider* getChannelProvider(); + void release(); + void acquire(); + private: /** * Major version. diff --git a/testApp/remote/testBeaconEmitter.cpp b/testApp/remote/testBeaconEmitter.cpp index 4c6cdaa..83d87b4 100644 --- a/testApp/remote/testBeaconEmitter.cpp +++ b/testApp/remote/testBeaconEmitter.cpp @@ -44,7 +44,8 @@ public: virtual Channel* getChannel(epics::pvAccess::pvAccessID) { return 0; } virtual Transport* getSearchTransport() { return 0; } virtual Configuration* getConfiguration() { return _conf; } - + virtual void acquire() {} + virtual void release() {} private: TransportRegistry* _tr; Timer* _timer; diff --git a/testApp/remote/testBeaconHandler.cpp b/testApp/remote/testBeaconHandler.cpp index c72194d..b09bc4e 100644 --- a/testApp/remote/testBeaconHandler.cpp +++ b/testApp/remote/testBeaconHandler.cpp @@ -118,6 +118,8 @@ public: virtual Channel* getChannel(epics::pvAccess::pvAccessID) { return 0; } virtual Transport* getSearchTransport() { return 0; } virtual Configuration* getConfiguration() { return _conf; } + virtual void acquire() {} + virtual void release() {} private: TransportRegistry* _tr; diff --git a/testApp/remote/testBlockingTCPClnt.cpp b/testApp/remote/testBlockingTCPClnt.cpp index f988736..d899557 100644 --- a/testApp/remote/testBlockingTCPClnt.cpp +++ b/testApp/remote/testBlockingTCPClnt.cpp @@ -53,6 +53,8 @@ public: virtual Configuration* getConfiguration() { return _conf; } + virtual void acquire() {} + virtual void release() {} private: TransportRegistry* _tr; @@ -92,6 +94,8 @@ public: virtual void transportClosed() { errlogSevPrintf(errlogInfo, "closed"); } + virtual void acquire() {}; + virtual void release() {}; }; class DummyTransportSender : public TransportSender { diff --git a/testApp/remote/testBlockingTCPSrv.cpp b/testApp/remote/testBlockingTCPSrv.cpp index 7d086e3..3732a20 100644 --- a/testApp/remote/testBlockingTCPSrv.cpp +++ b/testApp/remote/testBlockingTCPSrv.cpp @@ -34,6 +34,8 @@ public: virtual Channel* getChannel(epics::pvAccess::pvAccessID) { return 0; } virtual Transport* getSearchTransport() { return 0; } virtual Configuration* getConfiguration() { return _conf; } + virtual void acquire() {} + virtual void release() {} private: TransportRegistry* _tr; diff --git a/testApp/remote/testBlockingUDPClnt.cpp b/testApp/remote/testBlockingUDPClnt.cpp index 9e035fd..2178970 100644 --- a/testApp/remote/testBlockingUDPClnt.cpp +++ b/testApp/remote/testBlockingUDPClnt.cpp @@ -50,6 +50,8 @@ public: virtual Configuration* getConfiguration() { return 0; } + virtual void acquire() {} + virtual void release() {} }; class DummyResponseHandler : public ResponseHandler { diff --git a/testApp/remote/testBlockingUDPSrv.cpp b/testApp/remote/testBlockingUDPSrv.cpp index 7bbed6a..c043e0e 100644 --- a/testApp/remote/testBlockingUDPSrv.cpp +++ b/testApp/remote/testBlockingUDPSrv.cpp @@ -42,6 +42,8 @@ public: virtual Configuration* getConfiguration() { return 0; } + virtual void acquire() {} + virtual void release() {} }; class DummyResponseHandler : public ResponseHandler { diff --git a/testApp/remote/testChannelSearchManager.cpp b/testApp/remote/testChannelSearchManager.cpp index 26255bf..074166f 100644 --- a/testApp/remote/testChannelSearchManager.cpp +++ b/testApp/remote/testChannelSearchManager.cpp @@ -335,6 +335,9 @@ using namespace epics::pvAccess; } + virtual void acquire() {} + virtual void release() {} + }; class TestSearcInstance : public BaseSearchInstance @@ -344,6 +347,8 @@ public: pvAccessID getSearchInstanceID() { return _channelID;}; string getSearchInstanceName() {return _channelName;}; void searchResponse(int8 minorRevision, osiSockAddr* serverAddress) {}; + void acquire() {}; + void release() {}; private: pvAccessID _channelID; string _channelName; diff --git a/testApp/remote/testRemoteClientImpl.cpp b/testApp/remote/testRemoteClientImpl.cpp index 0c95fdc..357e314 100644 --- a/testApp/remote/testRemoteClientImpl.cpp +++ b/testApp/remote/testRemoteClientImpl.cpp @@ -577,6 +577,7 @@ int main(int argc,char *argv[]) printf("done.\n"); */ + epicsThreadSleep ( 1.0 ); std::cout << "-----------------------------------------------------------------------" << std::endl; epicsExitCallAtExits(); CDRMonitor::get().show(stdout);