Remove clearSendQueue

Use BreakTransport exception instead
This commit is contained in:
Michael Davidsaver
2015-11-18 16:20:49 -06:00
parent f2b47ef5e9
commit 4a1bfff40f
4 changed files with 38 additions and 62 deletions

View File

@@ -32,6 +32,18 @@ using namespace std;
using namespace epics::pvData;
using namespace epics::pvAccess;
namespace {
struct BreakTransport : TransportSender
{
virtual ~BreakTransport() {}
virtual void send(epics::pvData::ByteBuffer* buffer, TransportSendControl* control)
{
throw epics::pvAccess::detail::connection_closed_exception("Break");
}
virtual void lock() {}
virtual void unlock() {}
};
} // namespace
namespace epics {
namespace pvAccess {
@@ -883,12 +895,6 @@ namespace epics {
}
void AbstractCodec::clearSendQueue()
{
_sendQueue.clear();
}
void AbstractCodec::enqueueSendRequest(
TransportSender::shared_pointer const & sender) {
_sendQueue.push_back(sender);
@@ -1044,6 +1050,13 @@ namespace epics {
.autostart(false))
{ _isOpen.getAndSet(true);}
BlockingAbstractCodec::~BlockingAbstractCodec()
{
assert(!_isOpen.get());
_sendThread.exitWait();
_readThread.exitWait();
}
void BlockingAbstractCodec::readPollOne() {
throw std::logic_error("should not be called for blocking IO");
}
@@ -1061,18 +1074,21 @@ namespace epics {
// always close in the same thread, same way, etc.
// wakeup processSendQueue
// clean resources
// clean resources (close socket)
internalClose(true);
// this is important to avoid cyclic refs (memory leak)
clearSendQueue();
// Break sender from queue wait
BreakTransport::shared_pointer B(new BreakTransport);
enqueueSendRequest(B);
// post close
internalPostClose(true);
}
}
void BlockingAbstractCodec::internalClose(bool /*force*/) {
void BlockingAbstractCodec::internalClose(bool /*force*/)
{
this->internalDestroy();
}
void BlockingAbstractCodec::internalPostClose(bool /*force*/) {
@@ -1143,18 +1159,7 @@ namespace epics {
__FILE__, __LINE__);
}
}
/*
// wait read thread to die
// TODO rewise if this is really needed
// this timeout is needed where close() is initiated from the send thread,
// and not from the read thread as usualy - recv() does not exit until socket is not destroyed,
// which is done the internalDestroy() call below
bac->_shutdownEvent.wait(3.0);
*/
// call internal destroy
this->internalDestroy();
_sendQueue.clear();
}
@@ -1233,7 +1238,7 @@ namespace epics {
epicsSocketDestroy(_channel);
}
_channel = INVALID_SOCKET;
_channel = INVALID_SOCKET; //TODO: mutex to guard _channel
}
}
@@ -1863,7 +1868,10 @@ namespace epics {
// not used anymore, close it
// TODO consider delayed destruction (can improve performance!!!)
if(_owners.size()==0) close(); // TODO close(false)
if(_owners.size()==0) {
lock.unlock();
close();
}
}
void BlockingClientTCPTransportCodec::aliveNotification() {

View File

@@ -193,7 +193,6 @@ namespace epics {
void processWrite();
void processRead();
void processSendQueue();
void clearSendQueue();
void enqueueSendRequest(TransportSender::shared_pointer const & sender);
void enqueueSendRequest(TransportSender::shared_pointer const & sender,
std::size_t requiredBufferSize);
@@ -285,6 +284,7 @@ namespace epics {
std::tr1::shared_ptr<epics::pvData::ByteBuffer> const & receiveBuffer,
std::tr1::shared_ptr<epics::pvData::ByteBuffer> const & sendBuffer,
int32_t socketSendBufferSize);
virtual ~BlockingAbstractCodec();
void readPollOne();
void writePollOne();

View File

@@ -3877,6 +3877,9 @@ namespace epics {
* @param remoteDestroy issue channel destroy request.
*/
void disconnect(bool initiateSearch, bool remoteDestroy) {
// order of oldchan and guard is important to ensure
// oldchan is destoryed after unlock
Transport::shared_pointer oldchan;
Lock guard(m_channelMutex);
if (m_connectionState != CONNECTED)
@@ -3900,7 +3903,7 @@ namespace epics {
}
m_transport->release(getID());
m_transport.reset();
oldchan.swap(m_transport);
}
if (initiateSearch)

View File

@@ -436,7 +436,7 @@ namespace epics {
public:
int runAllTest() {
testPlan(5885);
testPlan(5883);
testHeaderProcess();
testInvalidHeaderMagic();
testInvalidHeaderSegmentedInNormal();
@@ -462,7 +462,6 @@ namespace epics {
testSendException();
testSendHugeMessagePartes();
testRecipient();
testClearSendQueue();
testInvalidArguments();
testDefaultModes();
testEnqueueSendRequestExceptionThrown();
@@ -2935,40 +2934,6 @@ namespace epics {
TestCodec &_codec;
};
void testClearSendQueue()
{
testDiag("BEGIN TEST %s:", CURRENT_FUNCTION);
TestCodec codec(DEFAULT_BUFFER_SIZE,DEFAULT_BUFFER_SIZE);
std::tr1::shared_ptr<TransportSender> sender =
std::tr1::shared_ptr<TransportSender>(
new TransportSenderForTestClearSendQueue(codec));
std::tr1::shared_ptr<TransportSender> sender2 =
std::tr1::shared_ptr<TransportSender>(
new TransportSender2ForTestClearSendQueue(codec));
codec.enqueueSendRequest(sender);
codec.enqueueSendRequest(sender2);
codec.clearSendQueue();
codec.breakSender();
try{
codec.processSendQueue();
}catch(sender_break&) {}
testOk(0 == codec.getSendBuffer()->getPosition(),
"%s: 0 == codec.getSendBuffer()->getPosition()",
CURRENT_FUNCTION);
testOk(0 == codec._writeBuffer.getPosition(),
"%s: 0 == codec._writeBuffer.getPosition()",
CURRENT_FUNCTION);
}
void testInvalidArguments()
{
testDiag("BEGIN TEST %s:", CURRENT_FUNCTION);