From d0db4588ee32d5bd625af16241aad99eee5f1fae Mon Sep 17 00:00:00 2001 From: Matej Sekoranja Date: Tue, 1 Feb 2011 00:38:11 +0100 Subject: [PATCH] mm of TransportSender --- pvAccessApp/remote/beaconEmitter.cpp | 10 ++ pvAccessApp/remote/beaconEmitter.h | 3 + pvAccessApp/remote/blockingTCP.h | 16 ++ pvAccessApp/remote/blockingTCPAcceptor.cpp | 4 +- pvAccessApp/remote/blockingTCPTransport.cpp | 22 +++ pvAccessApp/remote/remote.h | 4 +- .../remoteClient/clientContextImpl.cpp | 157 ++++++++++-------- pvAccessApp/server/responseHandlers.cpp | 7 + testApp/remote/testBlockingTCPClnt.cpp | 4 + testApp/remote/testBlockingUDPClnt.cpp | 4 + testApp/remote/testRemoteClientImpl.cpp | 6 +- 11 files changed, 163 insertions(+), 74 deletions(-) diff --git a/pvAccessApp/remote/beaconEmitter.cpp b/pvAccessApp/remote/beaconEmitter.cpp index e967757..911aec6 100644 --- a/pvAccessApp/remote/beaconEmitter.cpp +++ b/pvAccessApp/remote/beaconEmitter.cpp @@ -69,6 +69,16 @@ void BeaconEmitter::unlock() //noop } +void BeaconEmitter::acquire() +{ + //noop +} + +void BeaconEmitter::release() +{ + //noop +} + void BeaconEmitter::send(ByteBuffer* buffer, TransportSendControl* control) { // get server status diff --git a/pvAccessApp/remote/beaconEmitter.h b/pvAccessApp/remote/beaconEmitter.h index a3789c4..f8a4dcf 100644 --- a/pvAccessApp/remote/beaconEmitter.h +++ b/pvAccessApp/remote/beaconEmitter.h @@ -53,6 +53,9 @@ namespace epics { namespace pvAccess { * @see TransportSender#unlock() */ void unlock(); + + void acquire(); + void release(); void send(ByteBuffer* buffer, TransportSendControl* control); /** diff --git a/pvAccessApp/remote/blockingTCP.h b/pvAccessApp/remote/blockingTCP.h index 6061b17..99cb1a9 100644 --- a/pvAccessApp/remote/blockingTCP.h +++ b/pvAccessApp/remote/blockingTCP.h @@ -477,6 +477,14 @@ namespace epics { // noop } + virtual void acquire() { + // noop, since does not make sence on itself + } + + virtual void release() { + // noop, since does not make sence on itself + } + virtual void send(epics::pvData::ByteBuffer* buffer, TransportSendControl* control); @@ -651,6 +659,14 @@ namespace epics { // noop } + virtual void acquire() { + // noop, since does not make sence on itself + } + + virtual void release() { + // noop, since does not make sence on itself + } + /** * Verify transport. Server side is self-verified. */ diff --git a/pvAccessApp/remote/blockingTCPAcceptor.cpp b/pvAccessApp/remote/blockingTCPAcceptor.cpp index 33368cb..4cf3895 100644 --- a/pvAccessApp/remote/blockingTCPAcceptor.cpp +++ b/pvAccessApp/remote/blockingTCPAcceptor.cpp @@ -65,7 +65,7 @@ namespace epics { sizeof(strBuffer)); ostringstream temp; temp<<"Socket create error: "<extract()) + sender->release(); + delete _monitorSendQueue; + + while (sender = _sendQueue->extract()) + sender->release(); delete _sendQueue; + + delete _monitorSender; + + delete _socketBuffer; delete _sendBuffer; @@ -244,6 +261,7 @@ namespace epics { internalVerified = _verified; _verifiedMutex.unlock(); } + return internalVerified; } @@ -806,6 +824,7 @@ namespace epics { _sendBuffer->setPosition(_lastMessageStartPosition); } sender->unlock(); + sender->release(); } // if(sender!=NULL) } // while(!_closed) } @@ -869,6 +888,7 @@ printf("sendThreadRunnner exception\n"); void BlockingTCPTransport::enqueueSendRequest(TransportSender* sender) { Lock lock(&_sendQueueMutex); if(_closed) return; + sender->acquire(); _sendQueue->insert(sender); _sendQueueEvent.signal(); } @@ -876,6 +896,7 @@ printf("sendThreadRunnner exception\n"); void BlockingTCPTransport::enqueueMonitorSendRequest(TransportSender* sender) { Lock lock(&_monitorMutex); if(_closed) return; + sender->acquire(); _monitorSendQueue->insert(sender); if(_monitorSendQueue->size()==1) enqueueSendRequest(_monitorSender); } @@ -898,6 +919,7 @@ printf("sendThreadRunnner exception\n"); break; } sender->send(buffer, control); + sender->release(); } } diff --git a/pvAccessApp/remote/remote.h b/pvAccessApp/remote/remote.h index 1df7f51..52db98d 100644 --- a/pvAccessApp/remote/remote.h +++ b/pvAccessApp/remote/remote.h @@ -123,8 +123,8 @@ namespace epics { virtual void lock() =0; virtual void unlock() =0; - //virtual void acquire() =0; - //virtual void release() =0; + virtual void acquire() =0; + virtual void release() =0; }; /** diff --git a/pvAccessApp/remoteClient/clientContextImpl.cpp b/pvAccessApp/remoteClient/clientContextImpl.cpp index 9ccbf67..6ab1de8 100644 --- a/pvAccessApp/remoteClient/clientContextImpl.cpp +++ b/pvAccessApp/remoteClient/clientContextImpl.cpp @@ -50,7 +50,8 @@ namespace epics { class BaseRequestImpl : public DataResponse, public SubscriptionRequest, - public TransportSender { + public TransportSender, + public Destroyable { protected: ChannelImpl* m_channel; @@ -70,6 +71,10 @@ namespace epics { int32 m_pendingRequest; Mutex m_mutex; + + int m_refCount; + + virtual ~BaseRequestImpl() {}; public: @@ -84,7 +89,7 @@ namespace epics { BaseRequestImpl(ChannelImpl* channel, Requester* requester) : m_channel(channel), m_context(channel->getContext()), m_requester(requester), m_destroyed(false), m_remotelyDestroyed(false), - m_pendingRequest(NULL_REQUEST) + m_pendingRequest(NULL_REQUEST), m_refCount(1) { // register response request m_ioid = m_context->registerResponseRequest(this); @@ -182,10 +187,11 @@ namespace epics { // destroy remote instance if (!m_remotelyDestroyed) { - // TODO !!! startRequest(PURE_DESTROY_REQUEST); - /// TODO !!! causes crash m_channel->checkAndGetTransport()->enqueueSendRequest(this); + startRequest(PURE_DESTROY_REQUEST); + m_channel->checkAndGetTransport()->enqueueSendRequest(this); } - + + release(); } virtual void timeout() { @@ -227,6 +233,20 @@ namespace epics { // noop } + virtual void acquire() { + Lock guard(&m_mutex); + m_refCount++; + } + + virtual void release() { + m_mutex.lock(); + m_refCount--; + m_mutex.unlock(); + if (m_refCount == 0) + delete this; + } + + }; @@ -257,6 +277,7 @@ namespace epics { ~ChannelProcessRequestImpl() { PVDATA_REFCOUNT_MONITOR_DESTRUCT(channelProcess); + if (m_pvRequest) delete m_pvRequest; } public: @@ -343,14 +364,11 @@ namespace epics { startRequest(QOS_INIT); transport->enqueueSendRequest(this); } - + virtual void destroy() { BaseRequestImpl::destroy(); - if (m_pvRequest) delete m_pvRequest; - delete this; } - }; @@ -375,6 +393,10 @@ namespace epics { ~ChannelGetImpl() { PVDATA_REFCOUNT_MONITOR_DESTRUCT(channelGet); + // synced by code calling this + if (m_data) delete m_data; + if (m_bitSet) delete m_bitSet; + if (m_pvRequest) delete m_pvRequest; } public: @@ -484,17 +506,10 @@ namespace epics { transport->enqueueSendRequest(this); } - virtual void destroy() { BaseRequestImpl::destroy(); - // synced by code above - if (m_data) delete m_data; - if (m_bitSet) delete m_bitSet; - if (m_pvRequest) delete m_pvRequest; - delete this; } - }; @@ -523,6 +538,10 @@ namespace epics { ~ChannelPutImpl() { PVDATA_REFCOUNT_MONITOR_DESTRUCT(channelPut); + // synced by code calling this + if (m_data) delete m_data; + if (m_bitSet) delete m_bitSet; + if (m_pvRequest) delete m_pvRequest; } public: @@ -662,17 +681,10 @@ namespace epics { transport->enqueueSendRequest(this); } - virtual void destroy() { BaseRequestImpl::destroy(); - // TODO sync - if (m_data) delete m_data; - if (m_bitSet) delete m_bitSet; - if (m_pvRequest) delete m_pvRequest; - delete this; } - }; @@ -698,6 +710,10 @@ namespace epics { ~ChannelPutGetImpl() { PVDATA_REFCOUNT_MONITOR_DESTRUCT(channelPutGet); + // synced by code calling this + if (m_putData) delete m_putData; + if (m_getData) delete m_getData; + if (m_pvRequest) delete m_pvRequest; } public: @@ -878,17 +894,10 @@ namespace epics { transport->enqueueSendRequest(this); } - virtual void destroy() { BaseRequestImpl::destroy(); - // TODO sync - if (m_putData) delete m_putData; - if (m_getData) delete m_getData; - if (m_pvRequest) delete m_pvRequest; - delete this; } - }; @@ -916,6 +925,10 @@ namespace epics { ~ChannelRPCImpl() { PVDATA_REFCOUNT_MONITOR_DESTRUCT(channelRPC); + // synced by code calling this + if (m_data) delete m_data; + if (m_bitSet) delete m_bitSet; + if (m_pvRequest) delete m_pvRequest; } public: @@ -1026,17 +1039,10 @@ namespace epics { transport->enqueueSendRequest(this); } - virtual void destroy() { BaseRequestImpl::destroy(); - // TODO sync - if (m_data) delete m_data; - if (m_bitSet) delete m_bitSet; - if (m_pvRequest) delete m_pvRequest; - delete this; } - }; @@ -1068,6 +1074,9 @@ namespace epics { ~ChannelArrayImpl() { PVDATA_REFCOUNT_MONITOR_DESTRUCT(channelArray); + // synced by code calling this + if (m_data) delete m_data; + if (m_pvRequest) delete m_pvRequest; } public: @@ -1247,16 +1256,10 @@ namespace epics { transport->enqueueSendRequest(this); } - virtual void destroy() { BaseRequestImpl::destroy(); - // TODO sync - if (m_data) delete m_data; - if (m_pvRequest) delete m_pvRequest; - delete this; } - }; @@ -1280,7 +1283,8 @@ namespace epics { String m_subField; Mutex m_mutex; bool m_destroyed; - + int m_refCount; + private: ~ChannelGetFieldRequestImpl() { @@ -1292,7 +1296,7 @@ namespace epics { ChannelGetFieldRequestImpl(ChannelImpl* channel, GetFieldRequester* callback, String subField) : m_channel(channel), m_context(channel->getContext()), m_callback(callback), m_subField(subField), - m_destroyed(false) + m_destroyed(false), m_refCount(1) { PVDATA_REFCOUNT_MONITOR_CONSTRUCT(channelGetField); @@ -1361,7 +1365,20 @@ namespace epics { m_context->unregisterResponseRequest(this); m_channel->unregisterResponseRequest(this); - delete this; + release(); + } + + virtual void acquire() { + Lock guard(&m_mutex); + m_refCount++; + } + + virtual void release() { + m_mutex.lock(); + m_refCount--; + m_mutex.unlock(); + if (m_refCount == 0) + delete this; } virtual void response(Transport* transport, int8 version, ByteBuffer* payloadBuffer) { @@ -1436,6 +1453,19 @@ namespace epics { ~ChannelMonitorImpl() { PVDATA_REFCOUNT_MONITOR_DESTRUCT(channelMonitor); + + // synced by code calling this + if (m_pvRequest) delete m_pvRequest; + // uncomment when m_pvStructure not destroyed if (m_structure) m_structure->decReferenceCount(); + + // TODO temp + if (m_pvStructure) + { + delete m_pvStructure; + delete m_overrunBitSet; + delete m_changedBitSet; + } + } public: @@ -1535,27 +1565,6 @@ namespace epics { transport->enqueueSendRequest(this); } - - virtual void destroy() - { - BaseRequestImpl::destroy(); - // TODO sync - if (m_pvRequest) delete m_pvRequest; - // uncomment when m_pvStructure not destroyed if (m_structure) m_structure->decReferenceCount(); - - // TODO temp - if (m_pvStructure) - { - delete m_pvStructure; - delete m_overrunBitSet; - delete m_changedBitSet; - } - - delete this; - } - - - // override, since we optimize status virtual void response(Transport* transport, int8 version, ByteBuffer* payloadBuffer) { // TODO? @@ -1644,6 +1653,11 @@ namespace epics { } + virtual void destroy() + { + BaseRequestImpl::destroy(); + } + // ============ temp ============ virtual MonitorElement* poll() @@ -2508,6 +2522,15 @@ namespace epics { Lock guard(&m_channelMutex); m_references++; } + + virtual void release() { + m_channelMutex.lock(); + m_references--; + m_channelMutex.unlock(); + // if (m_references == 0) + // delete this; + } +// TTTOOOOOOODOOOOOO !!! /** * Actual destroy method, to be called CAJContext. diff --git a/pvAccessApp/server/responseHandlers.cpp b/pvAccessApp/server/responseHandlers.cpp index 2e65cbe..c677177 100644 --- a/pvAccessApp/server/responseHandlers.cpp +++ b/pvAccessApp/server/responseHandlers.cpp @@ -136,8 +136,15 @@ namespace epics { } virtual void unlock() { + } + + virtual void acquire() { + } + + virtual void release() { delete this; } + private: osiSockAddr _echoFrom; diff --git a/testApp/remote/testBlockingTCPClnt.cpp b/testApp/remote/testBlockingTCPClnt.cpp index 43226e1..f988736 100644 --- a/testApp/remote/testBlockingTCPClnt.cpp +++ b/testApp/remote/testBlockingTCPClnt.cpp @@ -115,6 +115,10 @@ public: } virtual void unlock() { } + virtual void acquire() { + } + virtual void release() { + } private: char data[20]; int count; diff --git a/testApp/remote/testBlockingUDPClnt.cpp b/testApp/remote/testBlockingUDPClnt.cpp index c74b6ad..8e86412 100644 --- a/testApp/remote/testBlockingUDPClnt.cpp +++ b/testApp/remote/testBlockingUDPClnt.cpp @@ -87,6 +87,10 @@ public: } virtual void unlock() { } + virtual void acquire() { + } + virtual void release() { + } private: char data[20]; int count; diff --git a/testApp/remote/testRemoteClientImpl.cpp b/testApp/remote/testRemoteClientImpl.cpp index bc8e835..faacd7c 100644 --- a/testApp/remote/testRemoteClientImpl.cpp +++ b/testApp/remote/testRemoteClientImpl.cpp @@ -451,7 +451,7 @@ int main(int argc,char *argv[]) channel->printInfo(); PVStructure* pvRequest; -/* + GetFieldRequesterImpl getFieldRequesterImpl; channel->getField(&getFieldRequesterImpl, ""); epicsThreadSleep ( 1.0 ); @@ -521,7 +521,7 @@ int main(int argc,char *argv[]) channelArray->setLength(false,3,4); epicsThreadSleep ( 1.0 ); channelArray->destroy(); -*/ + MonitorRequesterImpl monitorRequesterImpl; pvRequest = getCreateRequest()->createRequest("field()",&monitorRequesterImpl); Monitor* monitor = channel->createMonitor(&monitorRequesterImpl, pvRequest); @@ -532,7 +532,7 @@ int main(int argc,char *argv[]) std::cout << "monitor->start() = " << status->toString() << std::endl; delete status; - epicsThreadSleep( 30.0 ); + epicsThreadSleep( 3.0 ); status = monitor->stop(); std::cout << "monitor->stop() = " << status->toString() << std::endl;