From 730d30fe5487871a90f28630d1f622bd0fa16e9b Mon Sep 17 00:00:00 2001 From: Michael Davidsaver Date: Fri, 9 Oct 2015 17:14:17 -0400 Subject: [PATCH] AbstractCodec use fair_queue --- src/remote/codec.cpp | 33 ++++---- src/remote/codec.h | 121 ++------------------------- src/remote/remote.h | 3 +- testApp/remote/testCodec.cpp | 153 ++++++++++++++++++++++------------- 4 files changed, 120 insertions(+), 190 deletions(-) diff --git a/src/remote/codec.cpp b/src/remote/codec.cpp index 4b6fee0..9bb4e6d 100644 --- a/src/remote/codec.cpp +++ b/src/remote/codec.cpp @@ -53,7 +53,7 @@ namespace epics { //PROTECTED _readMode(NORMAL), _version(0), _flags(0), _command(0), _payloadSize(0), _remoteTransportSocketReceiveBufferSize(MAX_TCP_RECV), _totalBytesSent(0), - _blockingProcessQueue(false), _senderThread(0), + _senderThread(0), _writeMode(PROCESS_SEND_QUEUE), _writeOpReady(false),_lowLatency(false), _socketBuffer(receiveBuffer), @@ -98,7 +98,6 @@ namespace epics { _maxSendPayloadSize = _sendBuffer->getSize() - 2*PVA_MESSAGE_HEADER_SIZE; _socketSendBufferSize = socketSendBufferSize; - _blockingProcessQueue = blockingProcessQueue; } @@ -851,7 +850,8 @@ namespace epics { std::size_t senderProcessed = 0; while (senderProcessed++ < MAX_MESSAGE_SEND) { - TransportSender::shared_pointer sender = _sendQueue.take(-1); + TransportSender::shared_pointer sender; + _sendQueue.pop_front_try(sender); if (sender.get() == 0) { // flush @@ -860,19 +860,20 @@ namespace epics { sendCompleted(); // do not schedule sending - if (_blockingProcessQueue) { - if (terminated()) // termination + if (terminated()) // termination break; - sender = _sendQueue.take(0); - // termination (we want to process even if shutdown) - if (sender.get() == 0) - break; - } - else - return; + // termination (we want to process even if shutdown) + _sendQueue.pop_front(sender); } - processSender(sender); + try{ + processSender(sender); + }catch(...){ + if (_sendBuffer->getPosition() > 0) + flush(true); + sendCompleted(); + throw; + } } } @@ -884,13 +885,13 @@ namespace epics { void AbstractCodec::clearSendQueue() { - _sendQueue.clean(); + _sendQueue.clear(); } void AbstractCodec::enqueueSendRequest( TransportSender::shared_pointer const & sender) { - _sendQueue.put(sender); + _sendQueue.push_back(sender); scheduleSend(); } @@ -1066,8 +1067,6 @@ namespace epics { // this is important to avoid cyclic refs (memory leak) clearSendQueue(); - _sendQueue.wakeup(); - // post close internalPostClose(true); } diff --git a/src/remote/codec.h b/src/remote/codec.h index a6c88c8..8d3084d 100644 --- a/src/remote/codec.h +++ b/src/remote/codec.h @@ -112,120 +112,6 @@ namespace epics { #endif - // TODO replace this queue with lock-free implementation - template - class queue { - public: - - queue(void) { } - //TODO - /*queue(queue const &T) = delete; - queue(queue &&T) = delete; - queue& operator=(const queue &T) = delete; - */ - ~queue(void) - { - } - - - bool empty(void) - { - epics::pvData::Lock lock(_queueMutex); - return _queue.empty(); - } - - void clean() - { - epics::pvData::Lock lock(_queueMutex); - _queue.clear(); - } - - - void wakeup() - { - if (!_wakeup.getAndSet(true)) - { - _queueEvent.signal(); - } - } - - - void put(T const & elem) - { - { - epics::pvData::Lock lock(_queueMutex); - _queue.push_back(elem); - } - - _queueEvent.signal(); - } - - - // TODO very sub-optimal (locks and empty() - pop() sequence; at least 2 locks!) - T take(int timeOut) - { - while (true) - { - - bool isEmpty = empty(); - - if (isEmpty) - { - - if (timeOut < 0) { - return T(); - } - - while (isEmpty) - { - - if (timeOut == 0) { - _queueEvent.wait(); - } - else { - _queueEvent.wait(timeOut); - } - - isEmpty = empty(); - if (isEmpty) - { - if (timeOut > 0) { // TODO spurious wakeup, but not critical - return T(); - } - else // if (timeout == 0) cannot be negative - { - if (_wakeup.getAndSet(false)) { - return T(); - } - } - } - } - } - else - { - epics::pvData::Lock lock(_queueMutex); - if (_queue.empty()) - return T(); - T sender = _queue.front(); - _queue.pop_front(); - return sender; - } - } - } - - size_t size() { - epics::pvData::Lock lock(_queueMutex); - return _queue.size(); - } - - private: - - std::deque _queue; - epics::pvData::Event _queueEvent; - epics::pvData::Mutex _queueMutex; - AtomicValue _wakeup; - }; - class epicsShareClass io_exception: public std::runtime_error { public: @@ -327,6 +213,10 @@ namespace epics { char* /*deserializeTo*/, std::size_t /*elementCount*/, std::size_t /*elementSize*/); + bool sendQueueEmpty() const { + return _sendQueue.empty(); + } + protected: virtual void sendBufferFull(int tries) = 0; @@ -341,7 +231,6 @@ namespace epics { int32_t _payloadSize; // TODO why not size_t? epics::pvData::int32 _remoteTransportSocketReceiveBufferSize; int64_t _totalBytesSent; - bool _blockingProcessQueue; //TODO initialize union osiSockAddr _sendTo; epicsThreadId _senderThread; @@ -352,7 +241,7 @@ namespace epics { std::tr1::shared_ptr _socketBuffer; std::tr1::shared_ptr _sendBuffer; - queue _sendQueue; + fair_queue _sendQueue; private: diff --git a/src/remote/remote.h b/src/remote/remote.h index d024fb5..28f7133 100644 --- a/src/remote/remote.h +++ b/src/remote/remote.h @@ -24,6 +24,7 @@ #include #include #include +#include #ifdef remoteEpicsExportSharedSymbols # define epicsExportSharedSymbols @@ -142,7 +143,7 @@ namespace epics { /** * Interface defining transport sender (instance sending data over transport). */ - class TransportSender : public Lockable { + class TransportSender : public Lockable, public fair_queue::entry { public: POINTER_DEFINITIONS(TransportSender); diff --git a/testApp/remote/testCodec.cpp b/testApp/remote/testCodec.cpp index 250f27d..c476cfe 100644 --- a/testApp/remote/testCodec.cpp +++ b/testApp/remote/testCodec.cpp @@ -22,6 +22,33 @@ namespace epics { namespace pvAccess { + struct sender_break : public connection_closed_exception + { + sender_break() : connection_closed_exception("break") {} + }; + + struct TransportSenderDisconnect: public TransportSender { + void unlock() {} + void lock() {} + void send(ByteBuffer *buffer, TransportSendControl *control) + { + control->flush(true); + throw sender_break(); + } + }; + + struct TransportSenderSignal: public TransportSender { + Event *evt; + TransportSenderSignal(Event& evt) :evt(&evt) {} + void unlock() {} + void lock() {} + void send(ByteBuffer *buffer, TransportSendControl *control) + { + evt->signal(); + } + }; + + class PVAMessage { public: @@ -257,13 +284,6 @@ namespace epics { } - void endBlockedProcessSendQueue() { - //TODO not thread safe - _blockingProcessQueue = false; - _sendQueue.wakeup(); - } - - void close() { _closedCount++; } bool isOpen() { return _closedCount == 0; } @@ -288,6 +308,10 @@ namespace epics { void sendCompleted() { _sendCompletedCount++; } + void breakSender() { + enqueueSendRequest(std::tr1::shared_ptr(new TransportSenderDisconnect())); + } + bool terminated() { return false; } void cachedSerialize( @@ -412,7 +436,7 @@ namespace epics { public: int runAllTest() { - testPlan(5882); + testPlan(5885); testHeaderProcess(); testInvalidHeaderMagic(); testInvalidHeaderSegmentedInNormal(); @@ -2223,7 +2247,6 @@ namespace epics { "%s: codec._closedCount == 1", CURRENT_FUNCTION); } - class TransportSenderForTestEnqueueSendRequest: public TransportSender { public: @@ -2290,7 +2313,10 @@ namespace epics { // process codec.enqueueSendRequest(sender); codec.enqueueSendRequest(sender2); - codec.processSendQueue(); + codec.breakSender(); + try{ + codec.processSendQueue(); + }catch(sender_break&) {} codec.transferToReadBuffer(); @@ -2430,11 +2456,16 @@ namespace epics { //was processed testOk(0 == codec._sendCompletedCount, "%s: 0 == codec._sendCompletedCount", CURRENT_FUNCTION); + testOk1(!codec.sendQueueEmpty()); - codec.processSendQueue(); + codec.breakSender(); + try{ + codec.processSendQueue(); + }catch(sender_break&) {} testOk(1 == codec._sendCompletedCount, "%s: 1 == codec._sendCompletedCount", CURRENT_FUNCTION); + testOk1(codec.sendQueueEmpty()); codec.transferToReadBuffer(); @@ -2483,6 +2514,13 @@ namespace epics { "%s: 0 == codec.getSendBuffer()->getPosition()", CURRENT_FUNCTION); + testOk1(codec.sendQueueEmpty()); + + testDiag("%u %u", (unsigned)codec._scheduleSendCount, + (unsigned)codec._sendCompletedCount); + testOk1(3 == codec._scheduleSendCount); + testOk1(1 == codec._sendCompletedCount); + // now queue is empty and thread is right codec.enqueueSendRequest(sender2, PVA_MESSAGE_HEADER_SIZE); @@ -2491,12 +2529,17 @@ namespace epics { "%s: PVA_MESSAGE_HEADER_SIZE == " "codec.getSendBuffer()->getPosition()", CURRENT_FUNCTION); - testOk(3 == codec._scheduleSendCount, - "%s: 3 == codec._scheduleSendCount", CURRENT_FUNCTION); - testOk(1 == codec._sendCompletedCount, - "%s: 1 == codec._sendCompletedCount", CURRENT_FUNCTION); - codec.processWrite(); + testDiag("%u %u", (unsigned)codec._scheduleSendCount, + (unsigned)codec._sendCompletedCount); + testOk1(4 == codec._scheduleSendCount); + testOk1(1 == codec._sendCompletedCount); + + codec.breakSender(); + + try{ + codec.processWrite(); + }catch(sender_break&) {} testOk(2 == codec._sendCompletedCount, "%s: 2 == codec._sendCompletedCount", CURRENT_FUNCTION); @@ -2575,7 +2618,10 @@ namespace epics { // process codec.enqueueSendRequest(sender); - codec.processSendQueue(); + codec.breakSender(); + try{ + codec.processSendQueue(); + }catch(sender_break&) {} codec.transferToReadBuffer(); @@ -2664,7 +2710,7 @@ namespace epics { // process codec.enqueueSendRequest(sender); - + codec.breakSender(); try { codec.processSendQueue(); @@ -2781,7 +2827,10 @@ namespace epics { // process codec.enqueueSendRequest(sender); - codec.processSendQueue(); + codec.breakSender(); + try{ + codec.processSendQueue(); + }catch(sender_break&) {} codec.addToReadBuffer(); @@ -2906,7 +2955,10 @@ namespace epics { codec.clearSendQueue(); - codec.processSendQueue(); + codec.breakSender(); + try{ + codec.processSendQueue(); + }catch(sender_break&) {} testOk(0 == codec.getSendBuffer()->getPosition(), "%s: 0 == codec.getSendBuffer()->getPosition()", @@ -3128,6 +3180,7 @@ namespace epics { TransportSendControl* control) { _codec.putControlMessage((int8_t)0x01, 0x00112233); + _codec.flush(true); } private: @@ -3135,16 +3188,20 @@ namespace epics { }; - class ValueHolder { + class ValueHolder : public Runnable { public: - ValueHolder( - TestCodec &testCodec, - AtomicValue &processTreadExited): - _testCodec(testCodec), - _processTreadExited(processTreadExited) {} + ValueHolder(TestCodec &testCodec): + _testCodec(testCodec) {} TestCodec &_testCodec; - AtomicValue & _processTreadExited; + Event waiter; + + virtual void run() { + waiter.signal(); + try{ + _testCodec.processSendQueue(); + }catch(sender_break&) {} + } }; @@ -3155,56 +3212,40 @@ namespace epics { TestCodec codec(DEFAULT_BUFFER_SIZE, DEFAULT_BUFFER_SIZE, true); - _processTreadExited.getAndSet(false); std::tr1::shared_ptr sender = std::tr1::shared_ptr( new TransportSenderForTestBlockingProcessQueueTest(codec)); - ValueHolder valueHolder(codec, _processTreadExited); + ValueHolder valueHolder(codec); + Event done; - epicsThreadCreate( - "testBlockingProcessQueueTest-processThread", - epicsThreadPriorityMedium, - epicsThreadGetStackSize( - epicsThreadStackMedium), - CodecTest::blockingProcessQueueThread, - &valueHolder); + Thread thr(Thread::Config(&valueHolder) + .name("testBlockingProcessQueueTest-processThread")); - epicsThreadSleep(3); - - testOk(_processTreadExited.get() == false, - "%s: _processTreadExited.get() == false", - CURRENT_FUNCTION); + valueHolder.waiter.wait(); // let's put something into it codec.enqueueSendRequest(sender); + codec.enqueueSendRequest(std::tr1::shared_ptr(new TransportSenderSignal(done))); - epicsThreadSleep(1); + testDiag("Waiting for work"); + done.wait(); testOk((std::size_t)PVA_MESSAGE_HEADER_SIZE == codec._writeBuffer.getPosition(), "%s: PVA_MESSAGE_HEADER_SIZE == " - "codec._writeBuffer.getPosition()", - CURRENT_FUNCTION); + "codec._writeBuffer.getPosition() (%u)", + CURRENT_FUNCTION, + (unsigned)codec._writeBuffer.getPosition()); - codec.endBlockedProcessSendQueue(); + codec.breakSender(); - epicsThreadSleep(1); - - testOk(_processTreadExited.get() == true, - "%s: _processTreadExited.get() == true", CURRENT_FUNCTION); + thr.exitWait(); } private: - void static blockingProcessQueueThread(void *param) { - ValueHolder *valueHolder = static_cast(param); - // this should block - valueHolder->_testCodec.processSendQueue(); - valueHolder->_processTreadExited.getAndSet(true); - } - AtomicValue _processTreadExited; }; }