add ChannelBaseRequester::channelDisconnect()

per-operation notification of channel disconnect/destroy.
This commit is contained in:
Michael Davidsaver
2017-05-25 18:01:41 -04:00
parent d7a7b81900
commit 7c2a093338
6 changed files with 109 additions and 64 deletions

View File

@@ -145,7 +145,7 @@ protected:
ChannelImpl::shared_pointer m_channel;
Requester::shared_pointer m_requester;
ChannelBaseRequester::shared_pointer m_requester;
/* negative... */
static const int NULL_REQUEST = -1;
@@ -170,7 +170,7 @@ protected:
virtual ~BaseRequestImpl() {};
BaseRequestImpl(ChannelImpl::shared_pointer const & channel, Requester::shared_pointer const & requester) :
BaseRequestImpl(ChannelImpl::shared_pointer const & channel, ChannelBaseRequester::shared_pointer const & requester) :
m_channel(channel),
m_requester(requester),
m_ioid(INVALID_IOID),
@@ -213,7 +213,7 @@ protected:
public:
// used to send message to this request
Requester::shared_pointer getRequester() {
ChannelBaseRequester::shared_pointer getRequester() {
return m_requester;
}
@@ -1969,7 +1969,7 @@ public:
PVACCESS_REFCOUNT_MONITOR_DESTRUCT(channelGetField);
}
Requester::shared_pointer getRequester() {
ChannelBaseRequester::shared_pointer getRequester() {
return m_callback;
}
@@ -4043,10 +4043,11 @@ private:
void reportChannelStateChange()
{
Channel::shared_pointer thisPointer = shared_from_this();
Channel::shared_pointer self(shared_from_this());
while (true)
{
std::vector<ResponseRequest::weak_pointer> ops;
ConnectionState connectionState;
{
Lock guard(m_channelMutex);
@@ -4054,9 +4055,28 @@ private:
break;
connectionState = channelStateChangeQueue.front();
channelStateChangeQueue.pop();
if(connectionState==Channel::DISCONNECTED || connectionState==Channel::DESTROYED) {
Lock guard(m_responseRequestsMutex);
ops.reserve(m_responseRequests.size());
for(IOIDResponseRequestMap::const_iterator it = m_responseRequests.begin(),
end = m_responseRequests.end();
it!=end; ++it)
{
ops.push_back(it->second);
}
}
}
EXCEPTION_GUARD(m_requester->channelStateChange(thisPointer, connectionState));
EXCEPTION_GUARD(m_requester->channelStateChange(self, connectionState));
if(connectionState==Channel::DISCONNECTED || connectionState==Channel::DESTROYED) {
for(size_t i=0, N=ops.size(); i<N; i++) {
ResponseRequest::shared_pointer R(ops[i].lock());
if(!R) continue;
EXCEPTION_GUARD(R->getRequester()->channelDisconnect(connectionState==Channel::DESTROYED);)
}
}
}