channel/responseRequest destruction

This commit is contained in:
Matej Sekoranja
2011-02-07 22:19:46 +01:00
parent f4180cfdb9
commit 88e33f1155
3 changed files with 111 additions and 60 deletions

View File

@@ -587,7 +587,7 @@ namespace epics {
this, version, _command, _payloadSize,
_socketBuffer);
} catch(...) {
//noop
//noop // TODO print?
}
/*

View File

@@ -98,13 +98,22 @@ namespace epics {
virtual void setRecipient(const osiSockAddr& sendTo) =0;
};
/**
* Reference counting instance.
*/
class ReferenceCountingInstance {
public:
virtual void acquire() =0;
virtual void release() =0;
};
/**
* Interface defining transport sender (instance sending data over transport).
* @author <a href="mailto:matej.sekoranjaATcosylab.com">Matej Sekoranja</a>
* @version $Id: TransportSender.java,v 1.1 2010/05/03 14:45:39 mrkraimer Exp $
*/
class TransportSender {
class TransportSender : public ReferenceCountingInstance {
public:
virtual ~TransportSender() {
}
@@ -122,9 +131,6 @@ namespace epics {
virtual void lock() =0;
virtual void unlock() =0;
virtual void acquire() =0;
virtual void release() =0;
};
/**
@@ -507,7 +513,7 @@ namespace epics {
* This interface needs to be extended (to provide method called on response).
* @author <a href="mailto:matej.sekoranjaATcosylab.com">Matej Sekoranja</a>
*/
class ResponseRequest {
class ResponseRequest : public ReferenceCountingInstance {
public:
virtual ~ResponseRequest() {}

View File

@@ -43,6 +43,16 @@ namespace epics {
catch (std::exception &e) { errlogSevPrintf(errlogMajor, "Unhandled exception caught from client code at %s:%d: %s", __FILE__, __LINE__, e.what()); } \
catch (...) { errlogSevPrintf(errlogMajor, "Unhandled exception caught from client code at %s:%d.", __FILE__, __LINE__); }
class ResponseRequestGuard {
private:
ResponseRequest* m_rr;
public:
// no, don't be tempted to acquire here (must be done in getResponseRequest())
ResponseRequestGuard(ResponseRequest* rr) : m_rr(rr) {};
~ResponseRequestGuard() { if (m_rr) m_rr->release(); };
ResponseRequest* get() const { return m_rr; };
};
/**
* Base channel request.
* @author <a href="mailto:matej.sekoranjaATcosylab.com">Matej Sekoranja</a>
@@ -237,6 +247,8 @@ namespace epics {
else if (qos == PURE_DESTROY_REQUEST)
{
control->startMessage((int8)15, 8);
// NOTE: reference to the channel that can be deleted
// however CHANNEL_DESTROY request to the server keeps it alive
buffer->putInt(m_channel->getServerChannelID());
buffer->putInt(m_ioid);
}
@@ -1799,10 +1811,10 @@ namespace epics {
AbstractClientResponseHandler::handleResponse(responseFrom, transport, version, command, payloadSize, payloadBuffer);
transport->ensureData(4);
ResponseRequest* rr = _context->getResponseRequest(payloadBuffer->getInt());
if (rr)
ResponseRequestGuard rr(_context->getResponseRequest(payloadBuffer->getInt()));
if (rr.get())
{
DataResponse* nrr = dynamic_cast<DataResponse*>(rr);
DataResponse* nrr = dynamic_cast<DataResponse*>(rr.get());
if (nrr)
nrr->response(transport, version, payloadBuffer);
}
@@ -1987,13 +1999,17 @@ namespace epics {
transport->ensureData(5);
DataResponse* nrr = dynamic_cast<DataResponse*>(_context->getResponseRequest(payloadBuffer->getInt()));
Requester* requester;
if (nrr && (requester = nrr->getRequester()))
ResponseRequestGuard rr(_context->getResponseRequest(payloadBuffer->getInt()));
if (rr.get())
{
MessageType type = (MessageType)payloadBuffer->getByte();
String message = SerializeHelper::deserializeString(payloadBuffer, transport);
requester->message(message, type); // TODO do we need to guard from exceptions
DataResponse* nrr = dynamic_cast<DataResponse*>(rr.get());
Requester* requester;
if (nrr && (requester = nrr->getRequester()))
{
MessageType type = (MessageType)payloadBuffer->getByte();
String message = SerializeHelper::deserializeString(payloadBuffer, transport);
requester->message(message, type);
}
}
}
@@ -2266,6 +2282,7 @@ namespace epics {
~InternalChannelImpl()
{
PVDATA_REFCOUNT_MONITOR_DESTRUCT(channel);
if (m_addresses) delete m_addresses;
}
public:
@@ -2309,9 +2326,7 @@ namespace epics {
virtual void destroy()
{
destroy(false); //TODO guard
if (m_addresses) delete m_addresses;
delete this;
destroy(false);
};
virtual String getRequesterName()
@@ -2520,18 +2535,21 @@ namespace epics {
* @param force force destruction regardless of reference count
*/
void destroy(bool force) {
{
Lock guard(&m_channelMutex);
if (m_connectionState == DESTROYED)
throw std::runtime_error("Channel already destroyed.");
}
// do destruction via context
m_context->destroyChannel(this, force);
}
/**
* Increment reference.
*/
// NOTE: this is used only to keep instance in memory
// it is not related to channel destroy; not a mechanism to
// allow channel sharing
void acquire() {
Lock guard(&m_channelMutex);
m_references++;
@@ -2541,10 +2559,19 @@ namespace epics {
m_channelMutex.lock();
m_references--;
m_channelMutex.unlock();
// if (m_references == 0)
// delete this;
if (m_references == 0)
{
if (m_transport)
{
// unresponsive state, do not forget to release transport
ReferenceCountingTransport* rct = dynamic_cast<ReferenceCountingTransport*>(m_transport);
if (rct) rct->release(this);
m_transport = 0;
}
delete this;
}
}
// TTTOOOOOOODOOOOOO !!!
/**
* Actual destroy method, to be called <code>CAJContext</code>.
@@ -2554,37 +2581,39 @@ namespace epics {
* @throws IOException
*/
void destroyChannel(bool force) {
Lock guard(&m_channelMutex);
if (m_connectionState == DESTROYED)
throw std::runtime_error("Channel already destroyed.");
m_references--;
if (m_references > 0 && !force)
return;
// stop searching...
m_context->getChannelSearchManager()->unregisterChannel(this);
cancel();
disconnectPendingIO(true);
if (m_connectionState == CONNECTED)
{
disconnect(false, true);
Lock guard(&m_channelMutex);
if (m_connectionState == DESTROYED)
throw std::runtime_error("Channel already destroyed.");
// stop searching...
m_context->getChannelSearchManager()->unregisterChannel(this);
cancel();
disconnectPendingIO(true);
if (m_connectionState == CONNECTED)
{
disconnect(false, true);
}/*
// transport is release on instance release
else if (m_transport)
{
// unresponsive state, do not forget to release transport
ReferenceCountingTransport* rct = dynamic_cast<ReferenceCountingTransport*>(m_transport);
if (rct) rct->release(this);
m_transport = 0;
}*/
setConnectionState(DESTROYED);
// unregister
m_context->unregisterChannel(this);
}
else if (m_transport)
{
// unresponsive state, do not forget to release transport
ReferenceCountingTransport* rct = dynamic_cast<ReferenceCountingTransport*>(m_transport);
if (rct) rct->release(this);
m_transport = 0;
}
setConnectionState(DESTROYED);
// unregister
m_context->unregisterChannel(this);
// can be delete this
release();
}
/**
@@ -2612,13 +2641,17 @@ namespace epics {
{
if (remoteDestroy) {
m_issueCreateMessage = false;
// TODO !!! this causes problems.. since qnqueueSendRequest is added and this instance deleted
//m_transport->enqueueSendRequest(this);
// NOTE: this is neccesary, this holds this channel instance reference
// and keeps it alive so that ResponseRequest reference to this instance
// is valid; otherwise ResponseRequests should acquire this instance
m_transport->enqueueSendRequest(this);
}
/*
// will be release on this instance release
ReferenceCountingTransport* rct = dynamic_cast<ReferenceCountingTransport*>(m_transport);
if (rct) rct->release(this);
m_transport = 0;
*/
}
if (initiateSearch)
@@ -2796,11 +2829,18 @@ namespace epics {
m_needSubscriptionUpdate = true;
int count = 0;
ResponseRequest* rrs[m_responseRequests.size()];
for (IOIDResponseRequestMap::iterator iter = m_responseRequests.begin();
iter != m_responseRequests.end();
iter++)
{
EXCEPTION_GUARD(iter->second->reportStatus(status));
rrs[count++] = iter->second;
}
for (int i = 0; i< count; i++)
{
EXCEPTION_GUARD(rrs[i]->reportStatus(status));
}
}
@@ -2814,6 +2854,7 @@ namespace epics {
Transport* transport = getTransport();
// NOTE: elements cannot be removed within rrs->updateSubscription callbacks
for (IOIDResponseRequestMap::iterator iter = m_responseRequests.begin();
iter != m_responseRequests.end();
iter++)
@@ -2837,6 +2878,7 @@ namespace epics {
else
return; // noop
// NOTE: elements cannot be removed within rrs->updateSubscription callbacks
for (IOIDResponseRequestMap::iterator iter = m_responseRequests.begin();
iter != m_responseRequests.end();
iter++)
@@ -3359,7 +3401,10 @@ TODO
{
Lock guard(&m_ioidMapMutex);
IOIDResponseRequestMap::iterator it = m_pendingResponseRequests.find(ioid);
return (it == m_pendingResponseRequests.end() ? 0 : it->second);
if (it == m_pendingResponseRequests.end()) return 0;
ResponseRequest* rr = it->second;
rr->acquire();
return rr;
}
/**