diff --git a/pvAccessApp/remote/blockingClientTCPTransport.cpp b/pvAccessApp/remote/blockingClientTCPTransport.cpp index b793374..5f4a30f 100644 --- a/pvAccessApp/remote/blockingClientTCPTransport.cpp +++ b/pvAccessApp/remote/blockingClientTCPTransport.cpp @@ -34,11 +34,9 @@ namespace epics { float beaconInterval, int16 priority) : BlockingTCPTransport(context, channel, responseHandler, receiveBufferSize, priority), _introspectionRegistry( - new IntrospectionRegistry(false)), _owners(new set< - TransportClient*> ()), _connectionTimeout(beaconInterval + new IntrospectionRegistry(false)), _connectionTimeout(beaconInterval *1000), _unresponsiveTransport(false), _timerNode( - new TimerNode(this)), _mutex(new Mutex()), _ownersMutex( - new Mutex()), _verifyOrEcho(true) { + new TimerNode(this)), _verifyOrEcho(true) { _autoDelete = false; // initialize owners list, send queue @@ -58,11 +56,9 @@ namespace epics { } BlockingClientTCPTransport::~BlockingClientTCPTransport() { + printf("========== ~BlockingClientTCPTransport\n"); delete _introspectionRegistry; - delete _owners; delete _timerNode; - delete _mutex; - delete _ownersMutex; } void BlockingClientTCPTransport::callback() { @@ -84,15 +80,15 @@ namespace epics { if(!_unresponsiveTransport) { _unresponsiveTransport = true; - Lock lock(_ownersMutex); - set::iterator it = _owners->begin(); - for(; it!=_owners->end(); it++) + Lock lock(&_ownersMutex); + set::iterator it = _owners.begin(); + for(; it!=_owners.end(); it++) (*it)->transportUnresponsive(); } } bool BlockingClientTCPTransport::acquire(TransportClient* client) { - Lock lock(_mutex); + Lock lock(&_mutex); if(_closed) return false; @@ -100,11 +96,9 @@ namespace epics { ipAddrToDottedIP(&_socketAddress->ia, ipAddrStr, sizeof(ipAddrStr)); errlogSevPrintf(errlogInfo, "Acquiring transport to %s.", ipAddrStr); - _ownersMutex->lock(); - if(_closed) return false; - - _owners->insert(client); - _ownersMutex->unlock(); + Lock lock2(&_ownersMutex); +// TODO double check? if(_closed) return false; + _owners.insert(client); return true; } @@ -121,10 +115,10 @@ namespace epics { * Notifies clients about disconnect. */ void BlockingClientTCPTransport::closedNotifyClients() { - Lock lock(_ownersMutex); + Lock lock(&_ownersMutex); // check if still acquired - int refs = _owners->size(); + int refs = _owners.size(); if(refs>0) { char ipAddrStr[48]; ipAddrToDottedIP(&_socketAddress->ia, ipAddrStr, sizeof(ipAddrStr)); @@ -133,12 +127,12 @@ namespace epics { "Transport to %s still has %d client(s) active and closing...", ipAddrStr, refs); - set::iterator it = _owners->begin(); - for(; it!=_owners->end(); it++) + set::iterator it = _owners.begin(); + for(; it!=_owners.end(); it++) (*it)->transportClosed(); } - _owners->clear(); + _owners.clear(); } void BlockingClientTCPTransport::release(TransportClient* client) { @@ -149,12 +143,12 @@ namespace epics { errlogSevPrintf(errlogInfo, "Releasing transport to %s.", ipAddrStr); - Lock lock(_ownersMutex); - _owners->erase(client); + Lock lock(&_ownersMutex); + _owners.erase(client); // not used anymore // TODO consider delayed destruction (can improve performance!!!) - if(_owners->size()==0) close(false); + if(_owners.size()==0) close(false); } void BlockingClientTCPTransport::aliveNotification() { @@ -165,20 +159,20 @@ namespace epics { void BlockingClientTCPTransport::responsiveTransport() { if(_unresponsiveTransport) { _unresponsiveTransport = false; - Lock lock(_ownersMutex); + Lock lock(&_ownersMutex); - set::iterator it = _owners->begin(); - for(; it!=_owners->end(); it++) + set::iterator it = _owners.begin(); + for(; it!=_owners.end(); it++) (*it)->transportResponsive(this); } } void BlockingClientTCPTransport::changedTransport() { _introspectionRegistry->reset(); - Lock lock(_ownersMutex); + Lock lock(&_ownersMutex); - set::iterator it = _owners->begin(); - for(; it!=_owners->end(); it++) + set::iterator it = _owners.begin(); + for(; it!=_owners.end(); it++) (*it)->transportChanged(); } diff --git a/pvAccessApp/remote/blockingServerTCPTransport.cpp b/pvAccessApp/remote/blockingServerTCPTransport.cpp index 079e2f4..86ab989 100644 --- a/pvAccessApp/remote/blockingServerTCPTransport.cpp +++ b/pvAccessApp/remote/blockingServerTCPTransport.cpp @@ -31,9 +31,7 @@ namespace epics { BlockingTCPTransport(context, channel, responseHandler, receiveBufferSize, CA_DEFAULT_PRIORITY), _introspectionRegistry(new IntrospectionRegistry(true)), - _lastChannelSID(0), _channels( - new map ()), _channelsMutex( - new Mutex()) { + _lastChannelSID(0) { // NOTE: priority not yet known, default priority is used to register/unregister // TODO implement priorities in Reactor... not that user will // change it.. still getPriority() must return "registered" priority! @@ -43,13 +41,11 @@ namespace epics { BlockingServerTCPTransport::~BlockingServerTCPTransport() { delete _introspectionRegistry; - delete _channels; - delete _channelsMutex; } void BlockingServerTCPTransport::destroyAllChannels() { - Lock lock(_channelsMutex); - if(_channels->size()==0) return; + Lock lock(&_channelsMutex); + if(_channels.size()==0) return; char ipAddrStr[64]; ipAddrToDottedIP(&_socketAddress->ia, ipAddrStr, sizeof(ipAddrStr)); @@ -57,13 +53,13 @@ namespace epics { errlogSevPrintf( errlogInfo, "Transport to %s still has %u channel(s) active and closing...", - ipAddrStr, _channels->size()); + ipAddrStr, _channels.size()); - map::iterator it = _channels->begin(); - for(; it!=_channels->end(); it++) + map::iterator it = _channels.begin(); + for(; it!=_channels.end(); it++) it->second->destroy(); - _channels->clear(); + _channels.clear(); } void BlockingServerTCPTransport::internalClose(bool force) { @@ -72,37 +68,37 @@ namespace epics { } pvAccessID BlockingServerTCPTransport::preallocateChannelSID() { - Lock lock(_channelsMutex); + Lock lock(&_channelsMutex); // search first free (theoretically possible loop of death) pvAccessID sid = ++_lastChannelSID; - while(_channels->find(sid)!=_channels->end()) + while(_channels.find(sid)!=_channels.end()) sid = ++_lastChannelSID; return sid; } void BlockingServerTCPTransport::registerChannel(pvAccessID sid, ServerChannel* channel) { - Lock lock(_channelsMutex); - (*_channels)[sid] = channel; + Lock lock(&_channelsMutex); + _channels[sid] = channel; } void BlockingServerTCPTransport::unregisterChannel(pvAccessID sid) { - Lock lock(_channelsMutex); - _channels->erase(sid); + Lock lock(&_channelsMutex); + _channels.erase(sid); } ServerChannel* BlockingServerTCPTransport::getChannel(pvAccessID sid) { - Lock lock(_channelsMutex); + Lock lock(&_channelsMutex); - map::iterator it = _channels->find(sid); - if(it!=_channels->end()) return it->second; + map::iterator it = _channels.find(sid); + if(it!=_channels.end()) return it->second; return NULL; } int BlockingServerTCPTransport::getChannelCount() { - Lock lock(_channelsMutex); - return _channels->size(); + Lock lock(&_channelsMutex); + return _channels.size(); } void BlockingServerTCPTransport::send(ByteBuffer* buffer, diff --git a/pvAccessApp/remote/blockingTCP.h b/pvAccessApp/remote/blockingTCP.h index b017341..19fc260 100644 --- a/pvAccessApp/remote/blockingTCP.h +++ b/pvAccessApp/remote/blockingTCP.h @@ -52,7 +52,7 @@ namespace epics { BlockingTCPTransport(Context* context, SOCKET channel, ResponseHandler* responseHandler, int receiveBufferSize, int16 priority); - + virtual bool isClosed() const { return _closed; } @@ -439,7 +439,7 @@ namespace epics { /** * Owners (users) of the transport. */ - std::set* _owners; + std::set _owners; /** * Connection timeout (no-traffic) flag. @@ -461,8 +461,8 @@ namespace epics { */ volatile epicsTimeStamp _aliveTimestamp; - epics::pvData::Mutex* _mutex; - epics::pvData::Mutex* _ownersMutex; + epics::pvData::Mutex _mutex; + epics::pvData::Mutex _ownersMutex; bool _verifyOrEcho; @@ -645,9 +645,9 @@ namespace epics { /** * Channel table (SID -> channel mapping). */ - std::map* _channels; + std::map _channels; - Mutex* _channelsMutex; + Mutex _channelsMutex; /** * Destroy all channels. @@ -672,7 +672,7 @@ namespace epics { BlockingTCPAcceptor(Context* context, int port, int receiveBufferSize); - ~BlockingTCPAcceptor(); + virtual ~BlockingTCPAcceptor(); void handleEvents(); diff --git a/pvAccessApp/remote/blockingTCPTransport.cpp b/pvAccessApp/remote/blockingTCPTransport.cpp index 8d3dd00..930edf5 100644 --- a/pvAccessApp/remote/blockingTCPTransport.cpp +++ b/pvAccessApp/remote/blockingTCPTransport.cpp @@ -130,6 +130,8 @@ namespace epics { BlockingTCPTransport::~BlockingTCPTransport() { close(true); + // TODO remove + epicsThreadSleep(3.0); delete _socketAddress; delete _sendQueue; @@ -659,7 +661,7 @@ namespace epics { } catch(BaseException* e) { String trace; e->toString(trace); - errlogSevPrintf(errlogMajor, trace.c_str()); + errlogSevPrintf(errlogMajor, "%s", trace.c_str()); // error, release lock clearAndReleaseBuffer(); } catch(...) { @@ -703,7 +705,7 @@ namespace epics { // connection lost ostringstream temp; temp<<"error in sending TCP data: "<toString(trace); - errlogSevPrintf(errlogMajor, trace.c_str()); + errlogSevPrintf(errlogMajor, "%s", trace.c_str()); _sendBuffer->setPosition(_lastMessageStartPosition); } catch(...) { _sendBuffer->setPosition(_lastMessageStartPosition); diff --git a/pvAccessApp/utils/introspectionRegistry.cpp b/pvAccessApp/utils/introspectionRegistry.cpp index a50ee41..5e0e229 100644 --- a/pvAccessApp/utils/introspectionRegistry.cpp +++ b/pvAccessApp/utils/introspectionRegistry.cpp @@ -26,6 +26,7 @@ IntrospectionRegistry::IntrospectionRegistry(bool serverSide) : _mutex(Mutex()) IntrospectionRegistry::~IntrospectionRegistry() { + reset(); } void IntrospectionRegistry::reset() @@ -242,7 +243,9 @@ FieldConstPtr IntrospectionRegistry::deserialize(ByteBuffer* buffer, Deserializa else if(typeCode == IntrospectionRegistry::ONLY_ID_TYPE_CODE) { control->ensureData(sizeof(int16)/sizeof(int8)); - return registry->getIntrospectionInterface(buffer->getShort()); + FieldConstPtr field = registry->getIntrospectionInterface(buffer->getShort()); + field->incReferenceCount(); // we inc, so that deserialize always returns a field with +1 ref. count (as when created) + return field; } // could also be a mask @@ -305,7 +308,6 @@ StructureConstPtr IntrospectionRegistry::deserializeStructureField(ByteBuffer* b } StructureConstPtr structure = _fieldCreate->createStructure(structureFieldName, size, fields); - //???????delete [] fields; return structure; } @@ -353,7 +355,8 @@ PVStructurePtr IntrospectionRegistry::deserializeStructureAndCreatePVStructure(B { return NULL; } - return _pvDataCreate->createPVStructure(NULL,static_cast(field)); + PVStructurePtr retVal = _pvDataCreate->createPVStructure(NULL,static_cast(field)); + return retVal; } void IntrospectionRegistry::serializeStatus(ByteBuffer* buffer, SerializableControl* control, Status* status) diff --git a/testApp/remote/testRemoteClientImpl.cpp b/testApp/remote/testRemoteClientImpl.cpp index 272fb11..b26afac 100644 --- a/testApp/remote/testRemoteClientImpl.cpp +++ b/testApp/remote/testRemoteClientImpl.cpp @@ -761,6 +761,7 @@ class ChannelGetFieldRequestImpl : public DataResponse, public TransportSender // deserialize Field... const Field* field = transport->getIntrospectionRegistry()->deserialize(payloadBuffer, transport); m_callback->getDone(status, field); + field->decReferenceCount(); } else { @@ -1499,6 +1500,7 @@ class TestChannelImpl : public ChannelImpl { virtual void destroy() { + destroy(false); //TODO guard if (m_addresses) delete m_addresses; delete this; }; @@ -3072,11 +3074,11 @@ int main(int argc,char *argv[]) epicsThreadSleep ( 1.0 ); channel->printInfo(); - +/* GetFieldRequesterImpl getFieldRequesterImpl; channel->getField(&getFieldRequesterImpl, ""); epicsThreadSleep ( 1.0 ); -/* + ChannelProcessRequesterImpl channelProcessRequester; ChannelProcess* channelProcess = channel->createChannelProcess(&channelProcessRequester, 0); epicsThreadSleep ( 1.0 ); @@ -3084,16 +3086,16 @@ int main(int argc,char *argv[]) epicsThreadSleep ( 1.0 ); channelProcess->destroy(); epicsThreadSleep ( 1.0 ); - +*/ ChannelGetRequesterImpl channelGetRequesterImpl; - PVStructure* pvRequest = getCreateRequest()->createRequest("field(timeStamp,value)",&channelGetRequesterImpl); + PVStructure* pvRequest = getCreateRequest()->createRequest("field(value)",&channelGetRequesterImpl); ChannelGet* channelGet = channel->createChannelGet(&channelGetRequesterImpl, pvRequest); epicsThreadSleep ( 3.0 ); channelGet->get(false); epicsThreadSleep ( 3.0 ); - //TODOchannelGet->destroy(); + channelGet->destroy(); epicsThreadSleep ( 1.0 ); - +/* ChannelPutRequesterImpl channelPutRequesterImpl; ChannelPut* channelPut = channel->createChannelPut(&channelPutRequesterImpl, pvRequest); epicsThreadSleep ( 1.0 ); @@ -3101,10 +3103,9 @@ int main(int argc,char *argv[]) epicsThreadSleep ( 1.0 ); channelPut->put(false); epicsThreadSleep ( 1.0 ); - //TODOchannelPut->destroy(); + channelPut->destroy(); + - // TODO delete pvRequest -*/ /* MonitorRequesterImpl monitorRequesterImpl; Monitor* monitor = channel->createMonitor(&monitorRequesterImpl, 0); @@ -3122,6 +3123,10 @@ int main(int argc,char *argv[]) monitor->destroy(); */ + + // TODO share it? + delete pvRequest; + epicsThreadSleep ( 3.0 ); channel->destroy();