memory management fixes

This commit is contained in:
Matej Sekoranja
2011-01-23 23:47:13 +01:00
parent d8f81d79db
commit b23e7f13d8
6 changed files with 74 additions and 74 deletions

View File

@@ -34,11 +34,9 @@ namespace epics {
float beaconInterval, int16 priority) :
BlockingTCPTransport(context, channel, responseHandler,
receiveBufferSize, priority), _introspectionRegistry(
new IntrospectionRegistry(false)), _owners(new set<
TransportClient*> ()), _connectionTimeout(beaconInterval
new IntrospectionRegistry(false)), _connectionTimeout(beaconInterval
*1000), _unresponsiveTransport(false), _timerNode(
new TimerNode(this)), _mutex(new Mutex()), _ownersMutex(
new Mutex()), _verifyOrEcho(true) {
new TimerNode(this)), _verifyOrEcho(true) {
_autoDelete = false;
// initialize owners list, send queue
@@ -58,11 +56,9 @@ namespace epics {
}
BlockingClientTCPTransport::~BlockingClientTCPTransport() {
printf("========== ~BlockingClientTCPTransport\n");
delete _introspectionRegistry;
delete _owners;
delete _timerNode;
delete _mutex;
delete _ownersMutex;
}
void BlockingClientTCPTransport::callback() {
@@ -84,15 +80,15 @@ namespace epics {
if(!_unresponsiveTransport) {
_unresponsiveTransport = true;
Lock lock(_ownersMutex);
set<TransportClient*>::iterator it = _owners->begin();
for(; it!=_owners->end(); it++)
Lock lock(&_ownersMutex);
set<TransportClient*>::iterator it = _owners.begin();
for(; it!=_owners.end(); it++)
(*it)->transportUnresponsive();
}
}
bool BlockingClientTCPTransport::acquire(TransportClient* client) {
Lock lock(_mutex);
Lock lock(&_mutex);
if(_closed) return false;
@@ -100,11 +96,9 @@ namespace epics {
ipAddrToDottedIP(&_socketAddress->ia, ipAddrStr, sizeof(ipAddrStr));
errlogSevPrintf(errlogInfo, "Acquiring transport to %s.", ipAddrStr);
_ownersMutex->lock();
if(_closed) return false;
_owners->insert(client);
_ownersMutex->unlock();
Lock lock2(&_ownersMutex);
// TODO double check? if(_closed) return false;
_owners.insert(client);
return true;
}
@@ -121,10 +115,10 @@ namespace epics {
* Notifies clients about disconnect.
*/
void BlockingClientTCPTransport::closedNotifyClients() {
Lock lock(_ownersMutex);
Lock lock(&_ownersMutex);
// check if still acquired
int refs = _owners->size();
int refs = _owners.size();
if(refs>0) {
char ipAddrStr[48];
ipAddrToDottedIP(&_socketAddress->ia, ipAddrStr, sizeof(ipAddrStr));
@@ -133,12 +127,12 @@ namespace epics {
"Transport to %s still has %d client(s) active and closing...",
ipAddrStr, refs);
set<TransportClient*>::iterator it = _owners->begin();
for(; it!=_owners->end(); it++)
set<TransportClient*>::iterator it = _owners.begin();
for(; it!=_owners.end(); it++)
(*it)->transportClosed();
}
_owners->clear();
_owners.clear();
}
void BlockingClientTCPTransport::release(TransportClient* client) {
@@ -149,12 +143,12 @@ namespace epics {
errlogSevPrintf(errlogInfo, "Releasing transport to %s.", ipAddrStr);
Lock lock(_ownersMutex);
_owners->erase(client);
Lock lock(&_ownersMutex);
_owners.erase(client);
// not used anymore
// TODO consider delayed destruction (can improve performance!!!)
if(_owners->size()==0) close(false);
if(_owners.size()==0) close(false);
}
void BlockingClientTCPTransport::aliveNotification() {
@@ -165,20 +159,20 @@ namespace epics {
void BlockingClientTCPTransport::responsiveTransport() {
if(_unresponsiveTransport) {
_unresponsiveTransport = false;
Lock lock(_ownersMutex);
Lock lock(&_ownersMutex);
set<TransportClient*>::iterator it = _owners->begin();
for(; it!=_owners->end(); it++)
set<TransportClient*>::iterator it = _owners.begin();
for(; it!=_owners.end(); it++)
(*it)->transportResponsive(this);
}
}
void BlockingClientTCPTransport::changedTransport() {
_introspectionRegistry->reset();
Lock lock(_ownersMutex);
Lock lock(&_ownersMutex);
set<TransportClient*>::iterator it = _owners->begin();
for(; it!=_owners->end(); it++)
set<TransportClient*>::iterator it = _owners.begin();
for(; it!=_owners.end(); it++)
(*it)->transportChanged();
}

View File

@@ -31,9 +31,7 @@ namespace epics {
BlockingTCPTransport(context, channel, responseHandler,
receiveBufferSize, CA_DEFAULT_PRIORITY),
_introspectionRegistry(new IntrospectionRegistry(true)),
_lastChannelSID(0), _channels(
new map<int, ServerChannel*> ()), _channelsMutex(
new Mutex()) {
_lastChannelSID(0) {
// NOTE: priority not yet known, default priority is used to register/unregister
// TODO implement priorities in Reactor... not that user will
// change it.. still getPriority() must return "registered" priority!
@@ -43,13 +41,11 @@ namespace epics {
BlockingServerTCPTransport::~BlockingServerTCPTransport() {
delete _introspectionRegistry;
delete _channels;
delete _channelsMutex;
}
void BlockingServerTCPTransport::destroyAllChannels() {
Lock lock(_channelsMutex);
if(_channels->size()==0) return;
Lock lock(&_channelsMutex);
if(_channels.size()==0) return;
char ipAddrStr[64];
ipAddrToDottedIP(&_socketAddress->ia, ipAddrStr, sizeof(ipAddrStr));
@@ -57,13 +53,13 @@ namespace epics {
errlogSevPrintf(
errlogInfo,
"Transport to %s still has %u channel(s) active and closing...",
ipAddrStr, _channels->size());
ipAddrStr, _channels.size());
map<pvAccessID, ServerChannel*>::iterator it = _channels->begin();
for(; it!=_channels->end(); it++)
map<pvAccessID, ServerChannel*>::iterator it = _channels.begin();
for(; it!=_channels.end(); it++)
it->second->destroy();
_channels->clear();
_channels.clear();
}
void BlockingServerTCPTransport::internalClose(bool force) {
@@ -72,37 +68,37 @@ namespace epics {
}
pvAccessID BlockingServerTCPTransport::preallocateChannelSID() {
Lock lock(_channelsMutex);
Lock lock(&_channelsMutex);
// search first free (theoretically possible loop of death)
pvAccessID sid = ++_lastChannelSID;
while(_channels->find(sid)!=_channels->end())
while(_channels.find(sid)!=_channels.end())
sid = ++_lastChannelSID;
return sid;
}
void BlockingServerTCPTransport::registerChannel(pvAccessID sid,
ServerChannel* channel) {
Lock lock(_channelsMutex);
(*_channels)[sid] = channel;
Lock lock(&_channelsMutex);
_channels[sid] = channel;
}
void BlockingServerTCPTransport::unregisterChannel(pvAccessID sid) {
Lock lock(_channelsMutex);
_channels->erase(sid);
Lock lock(&_channelsMutex);
_channels.erase(sid);
}
ServerChannel* BlockingServerTCPTransport::getChannel(pvAccessID sid) {
Lock lock(_channelsMutex);
Lock lock(&_channelsMutex);
map<pvAccessID, ServerChannel*>::iterator it = _channels->find(sid);
if(it!=_channels->end()) return it->second;
map<pvAccessID, ServerChannel*>::iterator it = _channels.find(sid);
if(it!=_channels.end()) return it->second;
return NULL;
}
int BlockingServerTCPTransport::getChannelCount() {
Lock lock(_channelsMutex);
return _channels->size();
Lock lock(&_channelsMutex);
return _channels.size();
}
void BlockingServerTCPTransport::send(ByteBuffer* buffer,

View File

@@ -52,7 +52,7 @@ namespace epics {
BlockingTCPTransport(Context* context, SOCKET channel,
ResponseHandler* responseHandler, int receiveBufferSize,
int16 priority);
virtual bool isClosed() const {
return _closed;
}
@@ -439,7 +439,7 @@ namespace epics {
/**
* Owners (users) of the transport.
*/
std::set<TransportClient*>* _owners;
std::set<TransportClient*> _owners;
/**
* Connection timeout (no-traffic) flag.
@@ -461,8 +461,8 @@ namespace epics {
*/
volatile epicsTimeStamp _aliveTimestamp;
epics::pvData::Mutex* _mutex;
epics::pvData::Mutex* _ownersMutex;
epics::pvData::Mutex _mutex;
epics::pvData::Mutex _ownersMutex;
bool _verifyOrEcho;
@@ -645,9 +645,9 @@ namespace epics {
/**
* Channel table (SID -> channel mapping).
*/
std::map<pvAccessID, ServerChannel*>* _channels;
std::map<pvAccessID, ServerChannel*> _channels;
Mutex* _channelsMutex;
Mutex _channelsMutex;
/**
* Destroy all channels.
@@ -672,7 +672,7 @@ namespace epics {
BlockingTCPAcceptor(Context* context, int port,
int receiveBufferSize);
~BlockingTCPAcceptor();
virtual ~BlockingTCPAcceptor();
void handleEvents();

View File

@@ -130,6 +130,8 @@ namespace epics {
BlockingTCPTransport::~BlockingTCPTransport() {
close(true);
// TODO remove
epicsThreadSleep(3.0);
delete _socketAddress;
delete _sendQueue;
@@ -659,7 +661,7 @@ namespace epics {
} catch(BaseException* e) {
String trace;
e->toString(trace);
errlogSevPrintf(errlogMajor, trace.c_str());
errlogSevPrintf(errlogMajor, "%s", trace.c_str());
// error, release lock
clearAndReleaseBuffer();
} catch(...) {
@@ -703,7 +705,7 @@ namespace epics {
// connection lost
ostringstream temp;
temp<<"error in sending TCP data: "<<strerror(errno);
errlogSevPrintf(errlogMajor, temp.str().c_str());
errlogSevPrintf(errlogMajor, "%s", temp.str().c_str());
THROW_BASE_EXCEPTION(temp.str().c_str());
}
else if(bytesSent==0) {
@@ -815,7 +817,7 @@ namespace epics {
} catch(BaseException* e) {
String trace;
e->toString(trace);
errlogSevPrintf(errlogMajor, trace.c_str());
errlogSevPrintf(errlogMajor, "%s", trace.c_str());
_sendBuffer->setPosition(_lastMessageStartPosition);
} catch(...) {
_sendBuffer->setPosition(_lastMessageStartPosition);

View File

@@ -26,6 +26,7 @@ IntrospectionRegistry::IntrospectionRegistry(bool serverSide) : _mutex(Mutex())
IntrospectionRegistry::~IntrospectionRegistry()
{
reset();
}
void IntrospectionRegistry::reset()
@@ -242,7 +243,9 @@ FieldConstPtr IntrospectionRegistry::deserialize(ByteBuffer* buffer, Deserializa
else if(typeCode == IntrospectionRegistry::ONLY_ID_TYPE_CODE)
{
control->ensureData(sizeof(int16)/sizeof(int8));
return registry->getIntrospectionInterface(buffer->getShort());
FieldConstPtr field = registry->getIntrospectionInterface(buffer->getShort());
field->incReferenceCount(); // we inc, so that deserialize always returns a field with +1 ref. count (as when created)
return field;
}
// could also be a mask
@@ -305,7 +308,6 @@ StructureConstPtr IntrospectionRegistry::deserializeStructureField(ByteBuffer* b
}
StructureConstPtr structure = _fieldCreate->createStructure(structureFieldName, size, fields);
//???????delete [] fields;
return structure;
}
@@ -353,7 +355,8 @@ PVStructurePtr IntrospectionRegistry::deserializeStructureAndCreatePVStructure(B
{
return NULL;
}
return _pvDataCreate->createPVStructure(NULL,static_cast<StructureConstPtr>(field));
PVStructurePtr retVal = _pvDataCreate->createPVStructure(NULL,static_cast<StructureConstPtr>(field));
return retVal;
}
void IntrospectionRegistry::serializeStatus(ByteBuffer* buffer, SerializableControl* control, Status* status)

View File

@@ -761,6 +761,7 @@ class ChannelGetFieldRequestImpl : public DataResponse, public TransportSender
// deserialize Field...
const Field* field = transport->getIntrospectionRegistry()->deserialize(payloadBuffer, transport);
m_callback->getDone(status, field);
field->decReferenceCount();
}
else
{
@@ -1499,6 +1500,7 @@ class TestChannelImpl : public ChannelImpl {
virtual void destroy()
{
destroy(false); //TODO guard
if (m_addresses) delete m_addresses;
delete this;
};
@@ -3072,11 +3074,11 @@ int main(int argc,char *argv[])
epicsThreadSleep ( 1.0 );
channel->printInfo();
/*
GetFieldRequesterImpl getFieldRequesterImpl;
channel->getField(&getFieldRequesterImpl, "");
epicsThreadSleep ( 1.0 );
/*
ChannelProcessRequesterImpl channelProcessRequester;
ChannelProcess* channelProcess = channel->createChannelProcess(&channelProcessRequester, 0);
epicsThreadSleep ( 1.0 );
@@ -3084,16 +3086,16 @@ int main(int argc,char *argv[])
epicsThreadSleep ( 1.0 );
channelProcess->destroy();
epicsThreadSleep ( 1.0 );
*/
ChannelGetRequesterImpl channelGetRequesterImpl;
PVStructure* pvRequest = getCreateRequest()->createRequest("field(timeStamp,value)",&channelGetRequesterImpl);
PVStructure* pvRequest = getCreateRequest()->createRequest("field(value)",&channelGetRequesterImpl);
ChannelGet* channelGet = channel->createChannelGet(&channelGetRequesterImpl, pvRequest);
epicsThreadSleep ( 3.0 );
channelGet->get(false);
epicsThreadSleep ( 3.0 );
//TODOchannelGet->destroy();
channelGet->destroy();
epicsThreadSleep ( 1.0 );
/*
ChannelPutRequesterImpl channelPutRequesterImpl;
ChannelPut* channelPut = channel->createChannelPut(&channelPutRequesterImpl, pvRequest);
epicsThreadSleep ( 1.0 );
@@ -3101,10 +3103,9 @@ int main(int argc,char *argv[])
epicsThreadSleep ( 1.0 );
channelPut->put(false);
epicsThreadSleep ( 1.0 );
//TODOchannelPut->destroy();
channelPut->destroy();
// TODO delete pvRequest
*/
/*
MonitorRequesterImpl monitorRequesterImpl;
Monitor* monitor = channel->createMonitor(&monitorRequesterImpl, 0);
@@ -3122,6 +3123,10 @@ int main(int argc,char *argv[])
monitor->destroy();
*/
// TODO share it?
delete pvRequest;
epicsThreadSleep ( 3.0 );
channel->destroy();