diff --git a/pvAccessApp/remote/blockingTCPTransport.cpp b/pvAccessApp/remote/blockingTCPTransport.cpp
index 4452616..ac051bb 100644
--- a/pvAccessApp/remote/blockingTCPTransport.cpp
+++ b/pvAccessApp/remote/blockingTCPTransport.cpp
@@ -587,7 +587,7 @@ namespace epics {
this, version, _command, _payloadSize,
_socketBuffer);
} catch(...) {
- //noop
+ //noop // TODO print?
}
/*
diff --git a/pvAccessApp/remote/remote.h b/pvAccessApp/remote/remote.h
index 24f4672..6c4ecb0 100644
--- a/pvAccessApp/remote/remote.h
+++ b/pvAccessApp/remote/remote.h
@@ -98,13 +98,22 @@ namespace epics {
virtual void setRecipient(const osiSockAddr& sendTo) =0;
};
+
+ /**
+ * Reference counting instance.
+ */
+ class ReferenceCountingInstance {
+ public:
+ virtual void acquire() =0;
+ virtual void release() =0;
+ };
/**
* Interface defining transport sender (instance sending data over transport).
* @author Matej Sekoranja
* @version $Id: TransportSender.java,v 1.1 2010/05/03 14:45:39 mrkraimer Exp $
*/
- class TransportSender {
+ class TransportSender : public ReferenceCountingInstance {
public:
virtual ~TransportSender() {
}
@@ -122,9 +131,6 @@ namespace epics {
virtual void lock() =0;
virtual void unlock() =0;
-
- virtual void acquire() =0;
- virtual void release() =0;
};
/**
@@ -507,7 +513,7 @@ namespace epics {
* This interface needs to be extended (to provide method called on response).
* @author Matej Sekoranja
*/
- class ResponseRequest {
+ class ResponseRequest : public ReferenceCountingInstance {
public:
virtual ~ResponseRequest() {}
diff --git a/pvAccessApp/remoteClient/clientContextImpl.cpp b/pvAccessApp/remoteClient/clientContextImpl.cpp
index f799a81..71a5920 100644
--- a/pvAccessApp/remoteClient/clientContextImpl.cpp
+++ b/pvAccessApp/remoteClient/clientContextImpl.cpp
@@ -43,6 +43,16 @@ namespace epics {
catch (std::exception &e) { errlogSevPrintf(errlogMajor, "Unhandled exception caught from client code at %s:%d: %s", __FILE__, __LINE__, e.what()); } \
catch (...) { errlogSevPrintf(errlogMajor, "Unhandled exception caught from client code at %s:%d.", __FILE__, __LINE__); }
+ class ResponseRequestGuard {
+ private:
+ ResponseRequest* m_rr;
+ public:
+ // no, don't be tempted to acquire here (must be done in getResponseRequest())
+ ResponseRequestGuard(ResponseRequest* rr) : m_rr(rr) {};
+ ~ResponseRequestGuard() { if (m_rr) m_rr->release(); };
+ ResponseRequest* get() const { return m_rr; };
+ };
+
/**
* Base channel request.
* @author Matej Sekoranja
@@ -237,6 +247,8 @@ namespace epics {
else if (qos == PURE_DESTROY_REQUEST)
{
control->startMessage((int8)15, 8);
+ // NOTE: reference to the channel that can be deleted
+ // however CHANNEL_DESTROY request to the server keeps it alive
buffer->putInt(m_channel->getServerChannelID());
buffer->putInt(m_ioid);
}
@@ -1799,10 +1811,10 @@ namespace epics {
AbstractClientResponseHandler::handleResponse(responseFrom, transport, version, command, payloadSize, payloadBuffer);
transport->ensureData(4);
- ResponseRequest* rr = _context->getResponseRequest(payloadBuffer->getInt());
- if (rr)
+ ResponseRequestGuard rr(_context->getResponseRequest(payloadBuffer->getInt()));
+ if (rr.get())
{
- DataResponse* nrr = dynamic_cast(rr);
+ DataResponse* nrr = dynamic_cast(rr.get());
if (nrr)
nrr->response(transport, version, payloadBuffer);
}
@@ -1987,13 +1999,17 @@ namespace epics {
transport->ensureData(5);
- DataResponse* nrr = dynamic_cast(_context->getResponseRequest(payloadBuffer->getInt()));
- Requester* requester;
- if (nrr && (requester = nrr->getRequester()))
+ ResponseRequestGuard rr(_context->getResponseRequest(payloadBuffer->getInt()));
+ if (rr.get())
{
- MessageType type = (MessageType)payloadBuffer->getByte();
- String message = SerializeHelper::deserializeString(payloadBuffer, transport);
- requester->message(message, type); // TODO do we need to guard from exceptions
+ DataResponse* nrr = dynamic_cast(rr.get());
+ Requester* requester;
+ if (nrr && (requester = nrr->getRequester()))
+ {
+ MessageType type = (MessageType)payloadBuffer->getByte();
+ String message = SerializeHelper::deserializeString(payloadBuffer, transport);
+ requester->message(message, type);
+ }
}
}
@@ -2266,6 +2282,7 @@ namespace epics {
~InternalChannelImpl()
{
PVDATA_REFCOUNT_MONITOR_DESTRUCT(channel);
+ if (m_addresses) delete m_addresses;
}
public:
@@ -2309,9 +2326,7 @@ namespace epics {
virtual void destroy()
{
- destroy(false); //TODO guard
- if (m_addresses) delete m_addresses;
- delete this;
+ destroy(false);
};
virtual String getRequesterName()
@@ -2520,18 +2535,21 @@ namespace epics {
* @param force force destruction regardless of reference count
*/
void destroy(bool force) {
+ {
Lock guard(&m_channelMutex);
if (m_connectionState == DESTROYED)
throw std::runtime_error("Channel already destroyed.");
-
+ }
+
// do destruction via context
m_context->destroyChannel(this, force);
}
-
- /**
- * Increment reference.
- */
+
+
+ // NOTE: this is used only to keep instance in memory
+ // it is not related to channel destroy; not a mechanism to
+ // allow channel sharing
void acquire() {
Lock guard(&m_channelMutex);
m_references++;
@@ -2541,10 +2559,19 @@ namespace epics {
m_channelMutex.lock();
m_references--;
m_channelMutex.unlock();
- // if (m_references == 0)
- // delete this;
+ if (m_references == 0)
+ {
+ if (m_transport)
+ {
+ // unresponsive state, do not forget to release transport
+ ReferenceCountingTransport* rct = dynamic_cast(m_transport);
+ if (rct) rct->release(this);
+ m_transport = 0;
+ }
+
+ delete this;
+ }
}
-// TTTOOOOOOODOOOOOO !!!
/**
* Actual destroy method, to be called CAJContext.
@@ -2554,37 +2581,39 @@ namespace epics {
* @throws IOException
*/
void destroyChannel(bool force) {
- Lock guard(&m_channelMutex);
-
- if (m_connectionState == DESTROYED)
- throw std::runtime_error("Channel already destroyed.");
-
- m_references--;
- if (m_references > 0 && !force)
- return;
-
- // stop searching...
- m_context->getChannelSearchManager()->unregisterChannel(this);
- cancel();
-
- disconnectPendingIO(true);
-
- if (m_connectionState == CONNECTED)
{
- disconnect(false, true);
+ Lock guard(&m_channelMutex);
+
+ if (m_connectionState == DESTROYED)
+ throw std::runtime_error("Channel already destroyed.");
+
+ // stop searching...
+ m_context->getChannelSearchManager()->unregisterChannel(this);
+ cancel();
+
+ disconnectPendingIO(true);
+
+ if (m_connectionState == CONNECTED)
+ {
+ disconnect(false, true);
+ }/*
+ // transport is release on instance release
+ else if (m_transport)
+ {
+ // unresponsive state, do not forget to release transport
+ ReferenceCountingTransport* rct = dynamic_cast(m_transport);
+ if (rct) rct->release(this);
+ m_transport = 0;
+ }*/
+
+ setConnectionState(DESTROYED);
+
+ // unregister
+ m_context->unregisterChannel(this);
}
- else if (m_transport)
- {
- // unresponsive state, do not forget to release transport
- ReferenceCountingTransport* rct = dynamic_cast(m_transport);
- if (rct) rct->release(this);
- m_transport = 0;
- }
-
- setConnectionState(DESTROYED);
-
- // unregister
- m_context->unregisterChannel(this);
+
+ // can be delete this
+ release();
}
/**
@@ -2612,13 +2641,17 @@ namespace epics {
{
if (remoteDestroy) {
m_issueCreateMessage = false;
- // TODO !!! this causes problems.. since qnqueueSendRequest is added and this instance deleted
- //m_transport->enqueueSendRequest(this);
+ // NOTE: this is neccesary, this holds this channel instance reference
+ // and keeps it alive so that ResponseRequest reference to this instance
+ // is valid; otherwise ResponseRequests should acquire this instance
+ m_transport->enqueueSendRequest(this);
}
-
+ /*
+ // will be release on this instance release
ReferenceCountingTransport* rct = dynamic_cast(m_transport);
if (rct) rct->release(this);
m_transport = 0;
+ */
}
if (initiateSearch)
@@ -2796,11 +2829,18 @@ namespace epics {
m_needSubscriptionUpdate = true;
+ int count = 0;
+ ResponseRequest* rrs[m_responseRequests.size()];
for (IOIDResponseRequestMap::iterator iter = m_responseRequests.begin();
iter != m_responseRequests.end();
iter++)
{
- EXCEPTION_GUARD(iter->second->reportStatus(status));
+ rrs[count++] = iter->second;
+ }
+
+ for (int i = 0; i< count; i++)
+ {
+ EXCEPTION_GUARD(rrs[i]->reportStatus(status));
}
}
@@ -2814,6 +2854,7 @@ namespace epics {
Transport* transport = getTransport();
+ // NOTE: elements cannot be removed within rrs->updateSubscription callbacks
for (IOIDResponseRequestMap::iterator iter = m_responseRequests.begin();
iter != m_responseRequests.end();
iter++)
@@ -2837,6 +2878,7 @@ namespace epics {
else
return; // noop
+ // NOTE: elements cannot be removed within rrs->updateSubscription callbacks
for (IOIDResponseRequestMap::iterator iter = m_responseRequests.begin();
iter != m_responseRequests.end();
iter++)
@@ -3359,7 +3401,10 @@ TODO
{
Lock guard(&m_ioidMapMutex);
IOIDResponseRequestMap::iterator it = m_pendingResponseRequests.find(ioid);
- return (it == m_pendingResponseRequests.end() ? 0 : it->second);
+ if (it == m_pendingResponseRequests.end()) return 0;
+ ResponseRequest* rr = it->second;
+ rr->acquire();
+ return rr;
}
/**