diff --git a/src/utils/Makefile b/src/utils/Makefile index f7e163c..81b16ea 100644 --- a/src/utils/Makefile +++ b/src/utils/Makefile @@ -11,6 +11,7 @@ INC += referenceCountingLock.h INC += configuration.h INC += likely.h INC += wildcard.h +INC += fairQueue.h LIBSRCS += hexDump.cpp LIBSRCS += inetAddressUtil.cpp diff --git a/src/utils/fairQueue.h b/src/utils/fairQueue.h new file mode 100644 index 0000000..976c14a --- /dev/null +++ b/src/utils/fairQueue.h @@ -0,0 +1,176 @@ +/** + * Copyright - See the COPYRIGHT that is included with this distribution. + * pvAccessCPP is distributed subject to a Software License Agreement found + * in file LICENSE that is included with this distribution. + */ + +#ifndef FAIRQUEUE_H +#define FAIRQUEUE_H + +#include +#include +#include +#include +#include +#include + +#include + +namespace epics {namespace pvAccess { + + +/** @brief An intrusive, loss-less, unbounded, round-robin queue + * + * The parameterized type 'T' must be a sub-class of @class fair_queue::entry + * + * @li Intrusive. Entries in the queue must derive from @class entry + * + * @li Loss-less. An entry will be returned by pop_front() corresponding to + * each call to push_back(). + * + * @li Un-bounded. There is no upper limit to the number of times an entry + * may be queued other than machine constraints. + * + * @li Round robin. The order that entries are returned may not match + * the order they were added in. "Fairness" is achived by returning + * entries in a rotating fashion based on the order in which they were + * first added. Re-adding the same entry before it is popped does not change + * this order. + * Adding [A, A, B, A, C, C] would give out [A, B, C, A, C, A]. + * + * @warning Only one thread should call pop_front() + * as push_back() does not broadcast (only wakes up one waiter) + */ +template +class epicsShareClass fair_queue +{ + typedef epicsGuard guard_t; +public: + typedef std::tr1::shared_ptr value_type; + + class epicsShareClass entry { + ELLNODE node; + unsigned Qcnt; + value_type holder; +#ifndef NDEBUG + fair_queue *owner; +#endif + + friend class fair_queue; + + entry(const entry&); + entry& operator=(const entry&); + public: + entry() :node(), Qcnt(0), holder() +#ifndef NDEBUG + , owner(NULL) +#endif + { + node.next = node.previous = NULL; + } + ~entry() { + // nodes should be removed from the list before deletion + assert(!node.next && !node.previous && !owner); + } + }; + + fair_queue() + { + ellInit(&list); + } + ~fair_queue() + { + clear(); + assert(ellCount(&list)==0); + } + + void clear() + { + value_type C; + guard_t G(mutex); + do { + pop_front_try(C); + } while(C); + } + + bool empty() const { + guard_t G(mutex); + return ellFirst(&list)==NULL; + } + + void push_back(const value_type& ent) + { + bool wake; + entry *P = ent.get(); + { + guard_t G(mutex); + wake = ellFirst(&list)==NULL; // empty queue + + if(P->Qcnt++==0) { + // not in list + assert(P->owner==NULL); + P->owner = this; + P->holder = ent; // the list will hold a reference + ellAdd(&list, &P->node); // push_back + } else + assert(P->owner==this); + } + if(wake) wakeup.signal(); + } + + bool pop_front_try(value_type& ret) + { + guard_t G(mutex); + ELLNODE *cur = ellGet(&list); // pop_front + + if(cur) { + entry *P = CONTAINER(cur, entry, node); + assert(P->owner==this); + assert(P->Qcnt>0); + if(--P->Qcnt==0) { + P->node.previous = P->node.next = NULL; + P->owner = NULL; + + ret.swap(P->holder); + } else { + ellAdd(&list, &P->node); // push_back + + ret = P->holder; + } + return true; + } else { + ret.reset(); + return false; + } + } + + void pop_front(value_type& ret) + { + while(1) { + pop_front_try(ret); + if(ret) + break; + wakeup.wait(); + } + } + + bool pop_front(value_type& ret, double timeout) + { + while(1) { + pop_front_try(ret); + if(ret) + return true; + if(!wakeup.wait(timeout)) + return false; + } + } + +private: + ELLLIST list; + mutable epicsMutex mutex; + mutable epicsEvent wakeup; +}; + +}} // namespace + +#endif // FAIRQUEUE_H diff --git a/testApp/utils/Makefile b/testApp/utils/Makefile index b4232f0..252455a 100644 --- a/testApp/utils/Makefile +++ b/testApp/utils/Makefile @@ -22,8 +22,6 @@ testInetAddressUtils_SYS_LIBS_WIN32 += ws2_32 testHarness_SRCS += testInetAddressUtils.cpp TESTS += testInetAddressUtils -TESTSCRIPTS_HOST += $(TESTS:%=%.t) - #TESTPROD_HOST += loggerTest #loggerTest_SRCS += loggerTest.cpp #testHarness_SRCS += loggerTest.cpp @@ -49,3 +47,7 @@ configurationTest_SRCS += configurationTest.cpp configurationTest_SYS_LIBS_WIN32 += ws2_32 #testHarness_SRCS += configurationTest.cpp TESTS += configurationTest + +TESTPROD_HOST += testFairQueue +testFairQueue_SRCS += testFairQueue +TESTS += testFairQueue diff --git a/testApp/utils/testFairQueue.cpp b/testApp/utils/testFairQueue.cpp new file mode 100644 index 0000000..6b96c55 --- /dev/null +++ b/testApp/utils/testFairQueue.cpp @@ -0,0 +1,75 @@ +/** + * Copyright - See the COPYRIGHT that is included with this distribution. + * pvAccessCPP is distributed subject to a Software License Agreement found + * in file LICENSE that is included with this distribution. + */ + +#include + +#include + +#include +#include + +namespace { + +struct Qnode : public epics::pvAccess::fair_queue::entry { + unsigned i; + Qnode(unsigned i):i(i) {} +}; + +} // namespace + +static unsigned Ninput[] = {0,0,0,1,0,2,1,0,1,0,0}; +static unsigned Nexpect[] = {0,1,2,0,1,0,1,0,0,0,0}; + +static +void testOrder() +{ + epics::pvAccess::fair_queue Q; + typedef epics::pvAccess::fair_queue::value_type value_type; + + std::vector unique, inputs, outputs; + unique.resize(3); + unique[0].reset(new Qnode(0)); + unique[1].reset(new Qnode(1)); + unique[2].reset(new Qnode(2)); + + testDiag("Queueing"); + + for(unsigned i=0; ii); + } + } + + testOk(outputs.size()==NELEMENTS(Nexpect), "sizes match actual %u expected %u", + (unsigned)outputs.size(), (unsigned)NELEMENTS(Nexpect)); + + for(unsigned i=0; i=outputs.size()) { + testFail("output truncated"); + continue; + } + testOk(outputs[i]->i==Nexpect[i], "[%u] %u == %u", + i, (unsigned)outputs[i]->i, Nexpect[i]); + } +} + +MAIN(testFairQueue) +{ + testPlan(12); + testOrder(); + return testDone(); +}