AbstractCodec use fair_queue

This commit is contained in:
Michael Davidsaver
2015-10-09 17:14:17 -04:00
parent db86e47659
commit 730d30fe54
4 changed files with 120 additions and 190 deletions

View File

@@ -53,7 +53,7 @@ namespace epics {
//PROTECTED
_readMode(NORMAL), _version(0), _flags(0), _command(0), _payloadSize(0),
_remoteTransportSocketReceiveBufferSize(MAX_TCP_RECV), _totalBytesSent(0),
_blockingProcessQueue(false), _senderThread(0),
_senderThread(0),
_writeMode(PROCESS_SEND_QUEUE),
_writeOpReady(false),_lowLatency(false),
_socketBuffer(receiveBuffer),
@@ -98,7 +98,6 @@ namespace epics {
_maxSendPayloadSize =
_sendBuffer->getSize() - 2*PVA_MESSAGE_HEADER_SIZE;
_socketSendBufferSize = socketSendBufferSize;
_blockingProcessQueue = blockingProcessQueue;
}
@@ -851,7 +850,8 @@ namespace epics {
std::size_t senderProcessed = 0;
while (senderProcessed++ < MAX_MESSAGE_SEND)
{
TransportSender::shared_pointer sender = _sendQueue.take(-1);
TransportSender::shared_pointer sender;
_sendQueue.pop_front_try(sender);
if (sender.get() == 0)
{
// flush
@@ -860,19 +860,20 @@ namespace epics {
sendCompleted(); // do not schedule sending
if (_blockingProcessQueue) {
if (terminated()) // termination
if (terminated()) // termination
break;
sender = _sendQueue.take(0);
// termination (we want to process even if shutdown)
if (sender.get() == 0)
break;
}
else
return;
// termination (we want to process even if shutdown)
_sendQueue.pop_front(sender);
}
processSender(sender);
try{
processSender(sender);
}catch(...){
if (_sendBuffer->getPosition() > 0)
flush(true);
sendCompleted();
throw;
}
}
}
@@ -884,13 +885,13 @@ namespace epics {
void AbstractCodec::clearSendQueue()
{
_sendQueue.clean();
_sendQueue.clear();
}
void AbstractCodec::enqueueSendRequest(
TransportSender::shared_pointer const & sender) {
_sendQueue.put(sender);
_sendQueue.push_back(sender);
scheduleSend();
}
@@ -1066,8 +1067,6 @@ namespace epics {
// this is important to avoid cyclic refs (memory leak)
clearSendQueue();
_sendQueue.wakeup();
// post close
internalPostClose(true);
}

View File

@@ -112,120 +112,6 @@ namespace epics {
#endif
// TODO replace this queue with lock-free implementation
template<typename T>
class queue {
public:
queue(void) { }
//TODO
/*queue(queue const &T) = delete;
queue(queue &&T) = delete;
queue& operator=(const queue &T) = delete;
*/
~queue(void)
{
}
bool empty(void)
{
epics::pvData::Lock lock(_queueMutex);
return _queue.empty();
}
void clean()
{
epics::pvData::Lock lock(_queueMutex);
_queue.clear();
}
void wakeup()
{
if (!_wakeup.getAndSet(true))
{
_queueEvent.signal();
}
}
void put(T const & elem)
{
{
epics::pvData::Lock lock(_queueMutex);
_queue.push_back(elem);
}
_queueEvent.signal();
}
// TODO very sub-optimal (locks and empty() - pop() sequence; at least 2 locks!)
T take(int timeOut)
{
while (true)
{
bool isEmpty = empty();
if (isEmpty)
{
if (timeOut < 0) {
return T();
}
while (isEmpty)
{
if (timeOut == 0) {
_queueEvent.wait();
}
else {
_queueEvent.wait(timeOut);
}
isEmpty = empty();
if (isEmpty)
{
if (timeOut > 0) { // TODO spurious wakeup, but not critical
return T();
}
else // if (timeout == 0) cannot be negative
{
if (_wakeup.getAndSet(false)) {
return T();
}
}
}
}
}
else
{
epics::pvData::Lock lock(_queueMutex);
if (_queue.empty())
return T();
T sender = _queue.front();
_queue.pop_front();
return sender;
}
}
}
size_t size() {
epics::pvData::Lock lock(_queueMutex);
return _queue.size();
}
private:
std::deque<T> _queue;
epics::pvData::Event _queueEvent;
epics::pvData::Mutex _queueMutex;
AtomicValue<bool> _wakeup;
};
class epicsShareClass io_exception: public std::runtime_error {
public:
@@ -327,6 +213,10 @@ namespace epics {
char* /*deserializeTo*/,
std::size_t /*elementCount*/, std::size_t /*elementSize*/);
bool sendQueueEmpty() const {
return _sendQueue.empty();
}
protected:
virtual void sendBufferFull(int tries) = 0;
@@ -341,7 +231,6 @@ namespace epics {
int32_t _payloadSize; // TODO why not size_t?
epics::pvData::int32 _remoteTransportSocketReceiveBufferSize;
int64_t _totalBytesSent;
bool _blockingProcessQueue;
//TODO initialize union
osiSockAddr _sendTo;
epicsThreadId _senderThread;
@@ -352,7 +241,7 @@ namespace epics {
std::tr1::shared_ptr<epics::pvData::ByteBuffer> _socketBuffer;
std::tr1::shared_ptr<epics::pvData::ByteBuffer> _sendBuffer;
queue<TransportSender::shared_pointer> _sendQueue;
fair_queue<TransportSender> _sendQueue;
private:

View File

@@ -24,6 +24,7 @@
#include <pv/timer.h>
#include <pv/pvData.h>
#include <pv/sharedPtr.h>
#include <pv/fairQueue.h>
#ifdef remoteEpicsExportSharedSymbols
# define epicsExportSharedSymbols
@@ -142,7 +143,7 @@ namespace epics {
/**
* Interface defining transport sender (instance sending data over transport).
*/
class TransportSender : public Lockable {
class TransportSender : public Lockable, public fair_queue<TransportSender>::entry {
public:
POINTER_DEFINITIONS(TransportSender);

View File

@@ -22,6 +22,33 @@ namespace epics {
namespace pvAccess {
struct sender_break : public connection_closed_exception
{
sender_break() : connection_closed_exception("break") {}
};
struct TransportSenderDisconnect: public TransportSender {
void unlock() {}
void lock() {}
void send(ByteBuffer *buffer, TransportSendControl *control)
{
control->flush(true);
throw sender_break();
}
};
struct TransportSenderSignal: public TransportSender {
Event *evt;
TransportSenderSignal(Event& evt) :evt(&evt) {}
void unlock() {}
void lock() {}
void send(ByteBuffer *buffer, TransportSendControl *control)
{
evt->signal();
}
};
class PVAMessage {
public:
@@ -257,13 +284,6 @@ namespace epics {
}
void endBlockedProcessSendQueue() {
//TODO not thread safe
_blockingProcessQueue = false;
_sendQueue.wakeup();
}
void close() { _closedCount++; }
bool isOpen() { return _closedCount == 0; }
@@ -288,6 +308,10 @@ namespace epics {
void sendCompleted() { _sendCompletedCount++; }
void breakSender() {
enqueueSendRequest(std::tr1::shared_ptr<TransportSender>(new TransportSenderDisconnect()));
}
bool terminated() { return false; }
void cachedSerialize(
@@ -412,7 +436,7 @@ namespace epics {
public:
int runAllTest() {
testPlan(5882);
testPlan(5885);
testHeaderProcess();
testInvalidHeaderMagic();
testInvalidHeaderSegmentedInNormal();
@@ -2223,7 +2247,6 @@ namespace epics {
"%s: codec._closedCount == 1", CURRENT_FUNCTION);
}
class TransportSenderForTestEnqueueSendRequest:
public TransportSender {
public:
@@ -2290,7 +2313,10 @@ namespace epics {
// process
codec.enqueueSendRequest(sender);
codec.enqueueSendRequest(sender2);
codec.processSendQueue();
codec.breakSender();
try{
codec.processSendQueue();
}catch(sender_break&) {}
codec.transferToReadBuffer();
@@ -2430,11 +2456,16 @@ namespace epics {
//was processed
testOk(0 == codec._sendCompletedCount,
"%s: 0 == codec._sendCompletedCount", CURRENT_FUNCTION);
testOk1(!codec.sendQueueEmpty());
codec.processSendQueue();
codec.breakSender();
try{
codec.processSendQueue();
}catch(sender_break&) {}
testOk(1 == codec._sendCompletedCount,
"%s: 1 == codec._sendCompletedCount", CURRENT_FUNCTION);
testOk1(codec.sendQueueEmpty());
codec.transferToReadBuffer();
@@ -2483,6 +2514,13 @@ namespace epics {
"%s: 0 == codec.getSendBuffer()->getPosition()",
CURRENT_FUNCTION);
testOk1(codec.sendQueueEmpty());
testDiag("%u %u", (unsigned)codec._scheduleSendCount,
(unsigned)codec._sendCompletedCount);
testOk1(3 == codec._scheduleSendCount);
testOk1(1 == codec._sendCompletedCount);
// now queue is empty and thread is right
codec.enqueueSendRequest(sender2, PVA_MESSAGE_HEADER_SIZE);
@@ -2491,12 +2529,17 @@ namespace epics {
"%s: PVA_MESSAGE_HEADER_SIZE == "
"codec.getSendBuffer()->getPosition()",
CURRENT_FUNCTION);
testOk(3 == codec._scheduleSendCount,
"%s: 3 == codec._scheduleSendCount", CURRENT_FUNCTION);
testOk(1 == codec._sendCompletedCount,
"%s: 1 == codec._sendCompletedCount", CURRENT_FUNCTION);
codec.processWrite();
testDiag("%u %u", (unsigned)codec._scheduleSendCount,
(unsigned)codec._sendCompletedCount);
testOk1(4 == codec._scheduleSendCount);
testOk1(1 == codec._sendCompletedCount);
codec.breakSender();
try{
codec.processWrite();
}catch(sender_break&) {}
testOk(2 == codec._sendCompletedCount,
"%s: 2 == codec._sendCompletedCount", CURRENT_FUNCTION);
@@ -2575,7 +2618,10 @@ namespace epics {
// process
codec.enqueueSendRequest(sender);
codec.processSendQueue();
codec.breakSender();
try{
codec.processSendQueue();
}catch(sender_break&) {}
codec.transferToReadBuffer();
@@ -2664,7 +2710,7 @@ namespace epics {
// process
codec.enqueueSendRequest(sender);
codec.breakSender();
try
{
codec.processSendQueue();
@@ -2781,7 +2827,10 @@ namespace epics {
// process
codec.enqueueSendRequest(sender);
codec.processSendQueue();
codec.breakSender();
try{
codec.processSendQueue();
}catch(sender_break&) {}
codec.addToReadBuffer();
@@ -2906,7 +2955,10 @@ namespace epics {
codec.clearSendQueue();
codec.processSendQueue();
codec.breakSender();
try{
codec.processSendQueue();
}catch(sender_break&) {}
testOk(0 == codec.getSendBuffer()->getPosition(),
"%s: 0 == codec.getSendBuffer()->getPosition()",
@@ -3128,6 +3180,7 @@ namespace epics {
TransportSendControl* control)
{
_codec.putControlMessage((int8_t)0x01, 0x00112233);
_codec.flush(true);
}
private:
@@ -3135,16 +3188,20 @@ namespace epics {
};
class ValueHolder {
class ValueHolder : public Runnable {
public:
ValueHolder(
TestCodec &testCodec,
AtomicValue<bool> &processTreadExited):
_testCodec(testCodec),
_processTreadExited(processTreadExited) {}
ValueHolder(TestCodec &testCodec):
_testCodec(testCodec) {}
TestCodec &_testCodec;
AtomicValue<bool> & _processTreadExited;
Event waiter;
virtual void run() {
waiter.signal();
try{
_testCodec.processSendQueue();
}catch(sender_break&) {}
}
};
@@ -3155,56 +3212,40 @@ namespace epics {
TestCodec codec(DEFAULT_BUFFER_SIZE,
DEFAULT_BUFFER_SIZE, true);
_processTreadExited.getAndSet(false);
std::tr1::shared_ptr<TransportSender> sender =
std::tr1::shared_ptr<TransportSender>(
new TransportSenderForTestBlockingProcessQueueTest(codec));
ValueHolder valueHolder(codec, _processTreadExited);
ValueHolder valueHolder(codec);
Event done;
epicsThreadCreate(
"testBlockingProcessQueueTest-processThread",
epicsThreadPriorityMedium,
epicsThreadGetStackSize(
epicsThreadStackMedium),
CodecTest::blockingProcessQueueThread,
&valueHolder);
Thread thr(Thread::Config(&valueHolder)
.name("testBlockingProcessQueueTest-processThread"));
epicsThreadSleep(3);
testOk(_processTreadExited.get() == false,
"%s: _processTreadExited.get() == false",
CURRENT_FUNCTION);
valueHolder.waiter.wait();
// let's put something into it
codec.enqueueSendRequest(sender);
codec.enqueueSendRequest(std::tr1::shared_ptr<TransportSender>(new TransportSenderSignal(done)));
epicsThreadSleep(1);
testDiag("Waiting for work");
done.wait();
testOk((std::size_t)PVA_MESSAGE_HEADER_SIZE ==
codec._writeBuffer.getPosition(),
"%s: PVA_MESSAGE_HEADER_SIZE == "
"codec._writeBuffer.getPosition()",
CURRENT_FUNCTION);
"codec._writeBuffer.getPosition() (%u)",
CURRENT_FUNCTION,
(unsigned)codec._writeBuffer.getPosition());
codec.endBlockedProcessSendQueue();
codec.breakSender();
epicsThreadSleep(1);
testOk(_processTreadExited.get() == true,
"%s: _processTreadExited.get() == true", CURRENT_FUNCTION);
thr.exitWait();
}
private:
void static blockingProcessQueueThread(void *param) {
ValueHolder *valueHolder = static_cast<ValueHolder *>(param);
// this should block
valueHolder->_testCodec.processSendQueue();
valueHolder->_processTreadExited.getAndSet(true);
}
AtomicValue<bool> _processTreadExited;
};
}