stability

This commit is contained in:
Matej Sekoranja
2011-02-10 17:05:34 +01:00
parent d95879102a
commit c87ff047ca
16 changed files with 178 additions and 67 deletions

View File

@@ -27,7 +27,11 @@ using namespace epics::pvData;
namespace epics {
namespace pvAccess {
BlockingClientTCPTransport::BlockingClientTCPTransport(
#define EXCEPTION_GUARD(code) try { code; } \
catch (std::exception &e) { errlogSevPrintf(errlogMajor, "Unhandled exception caught from code at %s:%d: %s", __FILE__, __LINE__, e.what()); } \
catch (...) { errlogSevPrintf(errlogMajor, "Unhandled exception caught from code at %s:%d.", __FILE__, __LINE__); }
BlockingClientTCPTransport::BlockingClientTCPTransport(
Context* context, SOCKET channel,
ResponseHandler* responseHandler, int receiveBufferSize,
TransportClient* client, short remoteTransportRevision,
@@ -84,8 +88,12 @@ namespace epics {
_unresponsiveTransport = true;
set<TransportClient*>::iterator it = _owners.begin();
for(; it!=_owners.end(); it++)
(*it)->transportUnresponsive();
for(; it!=_owners.end(); it++) {
TransportClient* client = *it;
client->acquire();
EXCEPTION_GUARD(client->transportUnresponsive());
client->release();
}
}
}
@@ -129,8 +137,13 @@ namespace epics {
ipAddrStr, refs);
set<TransportClient*>::iterator it = _owners.begin();
for(; it!=_owners.end(); it++)
(*it)->transportClosed();
for(; it!=_owners.end(); it++) {
TransportClient* client = *it;
client->acquire();
EXCEPTION_GUARD(client->transportClosed());
client->release();
}
}
_owners.clear();
@@ -165,8 +178,12 @@ namespace epics {
_unresponsiveTransport = false;
set<TransportClient*>::iterator it = _owners.begin();
for(; it!=_owners.end(); it++)
(*it)->transportResponsive(this);
for(; it!=_owners.end(); it++) {
TransportClient* client = *it;
client->acquire();
EXCEPTION_GUARD(client->transportResponsive(this));
client->release();
}
}
}
@@ -175,8 +192,12 @@ namespace epics {
Lock lock(&_ownersMutex);
set<TransportClient*>::iterator it = _owners.begin();
for(; it!=_owners.end(); it++)
(*it)->transportChanged();
for(; it!=_owners.end(); it++) {
TransportClient* client = *it;
client->acquire();
EXCEPTION_GUARD(client->transportChanged());
client->release();
}
}
void BlockingClientTCPTransport::send(ByteBuffer* buffer,

View File

@@ -129,6 +129,7 @@ namespace epics {
clearAndReleaseBuffer();
// add to registry
_context->acquire();
_context->getTransportRegistry()->put(this);
}
@@ -151,6 +152,8 @@ namespace epics {
delete _sendBuffer;
delete _responseHandler;
_context->release();
}
void BlockingTCPTransport::start() {

View File

@@ -11,9 +11,17 @@ namespace epics { namespace pvAccess {
const int BaseSearchInstance::DATA_COUNT_POSITION = CA_MESSAGE_HEADER_SIZE + sizeof(int32)/sizeof(int8) + 1;
const int BaseSearchInstance::PAYLOAD_POSITION = sizeof(int16)/sizeof(int8) + 2;
void BaseSearchInstance::initializeSearchInstance()
{
_owner = NULL;
_ownerMutex = NULL;
_ownerIndex = -1;
}
void BaseSearchInstance::unsetListOwnership()
{
Lock guard(&_mutex);
if (_owner != NULL) this->release();
_owner = NULL;
}
@@ -25,6 +33,7 @@ void BaseSearchInstance::addAndSetListOwnership(ArrayFIFO<SearchInstance*>* newO
Lock ownerGuard(_ownerMutex);
Lock guard(&_mutex);
newOwner->push(this);
if (_owner == NULL) this->acquire(); // new owner
_owner = newOwner;
_ownerIndex = index;
}
@@ -38,6 +47,7 @@ void BaseSearchInstance::removeAndUnsetListOwnership()
Lock guard(&_mutex);
if(_owner != NULL)
{
this->release();
_owner->remove(this);
_owner = NULL;
}
@@ -205,8 +215,10 @@ void SearchTimer::callback()
if(channel->getOwnerIndex() > boostIndex)
{
_requestPendingChannels->pop();
channel->acquire();
channel->unsetListOwnership();
_chanSearchManager->boostSearching(channel, boostIndex);
channel->release();
}
}
}
@@ -226,8 +238,10 @@ void SearchTimer::callback()
{
if(_allowSlowdown)
{
channel->acquire();
channel->unsetListOwnership();
_chanSearchManager->searchResponseTimeout(channel, _timerIndex);
channel->release();
}
else
{
@@ -296,6 +310,7 @@ void SearchTimer::callback()
}
while (!canceled && channel != NULL)
{
channel->acquire();
channel->unsetListOwnership();
bool requestSent = true;
@@ -329,6 +344,8 @@ void SearchTimer::callback()
_searchAttempts++;
}
}
channel->release();
// limit
if(triesInFrame == 0 && !allowNewFrame) break;
@@ -503,7 +520,7 @@ void ChannelSearchManager::registerChannel(SearchInstance* channel)
Lock guard(&_channelMutex);
//overrides if already registered
_channels[channel->getSearchInstanceID()] = channel;
_channels[channel->getSearchInstanceID()] = channel;
_timers[0]->installChannel(channel);
}
@@ -529,7 +546,18 @@ void ChannelSearchManager::searchResponse(int32 cid, int32 seqNo, int8 minorRevi
{
si = _channelsIter->second;
_channels.erase(_channelsIter);
si->acquire();
si->removeAndUnsetListOwnership();
// report success
const int timerIndex = si->getOwnerIndex();
TimeStamp now;
now.getCurrent();
_timers[timerIndex]->searchResponse(seqNo, seqNo != 0, now.getMilliseconds());
// then notify SearchInstance
si->searchResponse(minorRevision, serverAddress);
si->release();
}
else
{
@@ -537,19 +565,12 @@ void ChannelSearchManager::searchResponse(int32 cid, int32 seqNo, int8 minorRevi
si = reinterpret_cast<SearchInstance*>(_context->getChannel(cid));
if(si != NULL)
{
si->acquire(); // TODO not thread/destruction safe
si->searchResponse(minorRevision, serverAddress);
si->release();
}
return;
}
// report success
const int timerIndex = si->getOwnerIndex();
TimeStamp now;
now.getCurrent();
_timers[timerIndex]->searchResponse(seqNo, seqNo != 0, now.getMilliseconds());
// then notify SearchInstance
si->searchResponse(minorRevision, serverAddress);
}
void ChannelSearchManager::beaconAnomalyNotify()

View File

@@ -29,7 +29,7 @@ namespace epics { namespace pvAccess {
/**
* SearchInstance.
*/
class SearchInstance {
class SearchInstance : public ReferenceCountingInstance {
public:
/**
* Destructor
@@ -93,6 +93,7 @@ class BaseSearchInstance : public SearchInstance
{
public:
virtual ~BaseSearchInstance() {};
void initializeSearchInstance();
virtual pvAccessID getSearchInstanceID() = 0;
virtual String getSearchInstanceName() = 0;
virtual void unsetListOwnership();

View File

@@ -267,7 +267,7 @@ namespace epics {
/**
* Not public IF, used by Transports, etc.
*/
class Context {
class Context : public ReferenceCountingInstance {
public:
virtual ~Context() {
}
@@ -355,7 +355,7 @@ namespace epics {
* @author <a href="mailto:matej.sekoranjaATcosylab.com">Matej Sekoranja</a>
* @version $Id: TransportClient.java,v 1.1 2010/05/03 14:45:39 mrkraimer Exp $
*/
class TransportClient {
class TransportClient : public ReferenceCountingInstance {
public:
virtual ~TransportClient() {
}

View File

@@ -2178,14 +2178,13 @@ namespace epics {
};
PVDATA_REFCOUNT_MONITOR_DEFINE(remoteClientContext);
class InternalClientContextImpl : public ClientContextImpl
{
/**
* Implementation of CAJ JCA <code>Channel</code>.
*/
@@ -2279,6 +2278,7 @@ namespace epics {
{
PVDATA_REFCOUNT_MONITOR_DESTRUCT(channel);
if (m_addresses) delete m_addresses;
m_context->release();
}
public:
@@ -2313,7 +2313,10 @@ namespace epics {
{
PVDATA_REFCOUNT_MONITOR_CONSTRUCT(channel);
initializeSearchInstance();
// register before issuing search request
m_context->acquire();
m_context->registerChannel(this);
// connect
@@ -2528,13 +2531,14 @@ namespace epics {
}
/**
* @param force force destruction regardless of reference count
* @param force force destruction regardless of reference count (not used now)
*/
void destroy(bool force) {
{
Lock guard(&m_channelMutex);
if (m_connectionState == DESTROYED)
throw std::runtime_error("Channel already destroyed.");
return;
//throw std::runtime_error("Channel already destroyed.");
}
// do destruction via context
@@ -3065,8 +3069,10 @@ namespace epics {
m_namedLocker(0), m_lastCID(0), m_lastIOID(0), m_channelSearchManager(0),
m_version(new Version("CA Client", "cpp", 0, 0, 0, 1)),
m_provider(new ChannelProviderImpl(this)),
m_contextState(CONTEXT_NOT_INITIALIZED), m_configuration(new SystemConfigurationImpl())
m_contextState(CONTEXT_NOT_INITIALIZED), m_configuration(new SystemConfigurationImpl()),
m_refCount(1)
{
PVDATA_REFCOUNT_MONITOR_CONSTRUCT(remoteClientContext);
loadConfiguration();
}
@@ -3161,17 +3167,19 @@ TODO
virtual void destroy()
{
m_contextMutex.lock();
if (m_contextState == CONTEXT_DESTROYED)
{
m_contextMutex.unlock();
throw std::runtime_error("Context already destroyed.");
Lock guard(&m_contextMutex);
if (m_contextState == CONTEXT_DESTROYED)
{
m_contextMutex.unlock();
throw std::runtime_error("Context already destroyed.");
}
// go into destroyed state ASAP
m_contextState = CONTEXT_DESTROYED;
}
// go into destroyed state ASAP
m_contextState = CONTEXT_DESTROYED;
internalDestroy();
}
@@ -3181,8 +3189,46 @@ TODO
destroy();
}
virtual void acquire() {
Lock guard(&m_contextMutex);
m_refCount++;
}
virtual void release() {
m_contextMutex.lock();
m_refCount--;
m_contextMutex.unlock();
if (m_refCount == 0)
delete this;
}
private:
~InternalClientContextImpl() {};
~InternalClientContextImpl()
{
PVDATA_REFCOUNT_MONITOR_DESTRUCT(remoteClientContext);
// stop searching
if (m_channelSearchManager)
delete m_channelSearchManager; //->destroy();
// stop timer
if (m_timer)
delete m_timer;
// TODO destroy !!!
if (m_broadcastTransport)
delete m_broadcastTransport; //->destroy(true);
if (m_searchTransport)
delete m_searchTransport; //->destroy(true);
if (m_namedLocker) delete m_namedLocker;
if (m_transportRegistry) delete m_transportRegistry;
if (m_connector) delete m_connector;
if (m_configuration) delete m_configuration;
m_provider->destroy();
delete m_version;
};
void loadConfiguration() {
m_addressList = m_configuration->getPropertyAsString("EPICS4_CA_ADDR_LIST", m_addressList);
@@ -3273,41 +3319,33 @@ TODO
void internalDestroy() {
// stop searching
if (m_channelSearchManager)
delete m_channelSearchManager; //->destroy();
// stop timer
if (m_timer)
delete m_timer;
//
// cleanup
//
// this will also close all CA transports
destroyAllChannels();
// TODO destroy !!!
if (m_broadcastTransport)
delete m_broadcastTransport; //->destroy(true);
if (m_searchTransport)
delete m_searchTransport; //->destroy(true);
if (m_namedLocker) delete m_namedLocker;
if (m_transportRegistry) delete m_transportRegistry;
if (m_connector) delete m_connector;
if (m_configuration) delete m_configuration;
m_provider->destroy();
delete m_version;
m_contextMutex.unlock();
delete this;
release();
}
void destroyAllChannels() {
// TODO
}
Lock guard(&m_cidMapMutex);
int count = 0;
ChannelImpl* channels[m_channelsByCID.size()];
for (CIDChannelMap::iterator iter = m_channelsByCID.begin();
iter != m_channelsByCID.end();
iter++)
{
channels[count++] = iter->second;
}
for (int i = 0; i< count; i++)
{
EXCEPTION_GUARD(channels[i]->destroy());
}
}
/**
* Check channel name.
@@ -3719,6 +3757,8 @@ TODO
friend class ChannelProviderImpl;
Configuration* m_configuration;
int m_refCount;
};
ClientContextImpl* createClientContextImpl()

View File

@@ -502,6 +502,9 @@ Transport* ServerContextImpl::getSearchTransport()
//TODO
return NULL;
}
// TODO
void ServerContextImpl::acquire() {}
void ServerContextImpl::release() {}
}
}

View File

@@ -258,6 +258,9 @@ public:
*/
ChannelProvider* getChannelProvider();
void release();
void acquire();
private:
/**
* Major version.

View File

@@ -44,7 +44,8 @@ public:
virtual Channel* getChannel(epics::pvAccess::pvAccessID) { return 0; }
virtual Transport* getSearchTransport() { return 0; }
virtual Configuration* getConfiguration() { return _conf; }
virtual void acquire() {}
virtual void release() {}
private:
TransportRegistry* _tr;
Timer* _timer;

View File

@@ -118,6 +118,8 @@ public:
virtual Channel* getChannel(epics::pvAccess::pvAccessID) { return 0; }
virtual Transport* getSearchTransport() { return 0; }
virtual Configuration* getConfiguration() { return _conf; }
virtual void acquire() {}
virtual void release() {}
private:
TransportRegistry* _tr;

View File

@@ -53,6 +53,8 @@ public:
virtual Configuration* getConfiguration() {
return _conf;
}
virtual void acquire() {}
virtual void release() {}
private:
TransportRegistry* _tr;
@@ -92,6 +94,8 @@ public:
virtual void transportClosed() {
errlogSevPrintf(errlogInfo, "closed");
}
virtual void acquire() {};
virtual void release() {};
};
class DummyTransportSender : public TransportSender {

View File

@@ -34,6 +34,8 @@ public:
virtual Channel* getChannel(epics::pvAccess::pvAccessID) { return 0; }
virtual Transport* getSearchTransport() { return 0; }
virtual Configuration* getConfiguration() { return _conf; }
virtual void acquire() {}
virtual void release() {}
private:
TransportRegistry* _tr;

View File

@@ -50,6 +50,8 @@ public:
virtual Configuration* getConfiguration() {
return 0;
}
virtual void acquire() {}
virtual void release() {}
};
class DummyResponseHandler : public ResponseHandler {

View File

@@ -42,6 +42,8 @@ public:
virtual Configuration* getConfiguration() {
return 0;
}
virtual void acquire() {}
virtual void release() {}
};
class DummyResponseHandler : public ResponseHandler {

View File

@@ -335,6 +335,9 @@ using namespace epics::pvAccess;
}
virtual void acquire() {}
virtual void release() {}
};
class TestSearcInstance : public BaseSearchInstance
@@ -344,6 +347,8 @@ public:
pvAccessID getSearchInstanceID() { return _channelID;};
string getSearchInstanceName() {return _channelName;};
void searchResponse(int8 minorRevision, osiSockAddr* serverAddress) {};
void acquire() {};
void release() {};
private:
pvAccessID _channelID;
string _channelName;

View File

@@ -577,6 +577,7 @@ int main(int argc,char *argv[])
printf("done.\n");
*/
epicsThreadSleep ( 1.0 );
std::cout << "-----------------------------------------------------------------------" << std::endl;
epicsExitCallAtExits();
CDRMonitor::get().show(stdout);