working on queue code

This commit is contained in:
Marty Kraimer
2012-05-09 14:06:17 -04:00
parent 87bff33c30
commit fb453ea9e5
13 changed files with 227 additions and 377 deletions

View File

@@ -94,7 +94,6 @@ LIBSRCS += bitSetUtil.cpp
SRC_DIRS += $(PVDATA)/monitor
INC += monitor.h
INC += monitorQueue.h
#LIBSRCS += monitorQueue.cpp
LIBRARY=pvData

View File

@@ -626,10 +626,10 @@ static PVFieldPtr &findSubField(String fieldName,PVStructure *pvStructure) {
PVFieldPtr pvField;
size_t numFields = pvStructure->getStructure()->getNumberFields();
for(size_t i=0; i<numFields; i++) {
PVFieldPtr pvField = pvFields[i];
pvField = pvFields[i];
size_t result = pvField->getFieldName().compare(name);
if(result==0) {
if(restOfName.length()==0) return pvField;
if(restOfName.length()==0) return pvFields[i];
if(pvField->getField()->getType()!=structure) return nullPVField;
PVStructurePtr pvStructure = std::tr1::static_pointer_cast<PVStructure>(pvField);
return findSubField(restOfName,pvStructure.get());

View File

@@ -14,6 +14,10 @@
namespace epics { namespace pvData {
PVDATA_REFCOUNT_MONITOR_DEFINE(bitSet);
BitSetPtr BitSet::create(uint32 nbits)
{
return BitSetPtr(new BitSet(nbits));
}
BitSet::BitSet() : words(0), wordsLength(0), wordsInUse(0) {
initWords(BITS_PER_WORD);

View File

@@ -36,9 +36,12 @@ namespace epics { namespace pvData {
*
* Based on Java implementation.
*/
class BitSet;
typedef std::tr1::shared_ptr<BitSet> BitSetPtr;
class BitSet : public Serializable {
public:
POINTER_DEFINITIONS(BitSet);
static BitSetPtr create(uint32 nbits);
/**
* Creates a new bit set. All bits are initially {@code false}.
*/

View File

@@ -1,136 +0,0 @@
/* messageQueue.cpp */
/**
* Copyright - See the COPYRIGHT that is included with this distribution.
* EPICS pvDataCPP is distributed subject to a Software License Agreement found
* in file LICENSE that is included with this distribution.
*/
#include <cstddef>
#include <cstdlib>
#include <cstddef>
#include <string>
#include <cstdio>
#include <stdexcept>
#include <pv/pvType.h>
#include <pv/lock.h>
#include <pv/requester.h>
#include <pv/noDefaultMethods.h>
#include <pv/CDRMonitor.h>
#include <pv/queue.h>
#include <pv/messageQueue.h>
namespace epics { namespace pvData {
PVDATA_REFCOUNT_MONITOR_DEFINE(messageQueue);
typedef MessageNode * MessageNodePtr;
typedef QueueElement<MessageNode> MessageElement;
typedef MessageElement *MessageElementPtr;
typedef Queue<MessageNode> MessageNodeQueue;
MessageNode::MessageNode()
: message(String()),messageType(infoMessage){}
MessageNode::~MessageNode() {
}
String MessageNode::getMessage() const { return message;};
MessageType MessageNode::getMessageType() const { return messageType;}
void MessageNode::setMessageNull() {message = String();}
class MessageQueuePvt {
public:
MessageNodePtr *messageNodeArray;
MessageNodeQueue *queue;
MessageNodePtr lastPut;
MessageElementPtr lastGet;
int size;
int overrun;
};
MessageQueue::MessageQueue(int size)
: pImpl(new MessageQueuePvt)
{
PVDATA_REFCOUNT_MONITOR_CONSTRUCT(messageQueue);
pImpl->size = size;
pImpl->overrun = 0;
pImpl->lastPut = 0;
pImpl->lastGet = 0;
pImpl->messageNodeArray = new MessageNodePtr[size];
for(int i=0; i<size; i++) {
pImpl->messageNodeArray[i] = new MessageNode();
}
pImpl->queue = new MessageNodeQueue(pImpl->messageNodeArray,size);
}
MessageQueue::~MessageQueue()
{
delete pImpl->queue;
for(int i=0; i< pImpl->size; i++) {
delete pImpl->messageNodeArray[i];
}
delete[] pImpl->messageNodeArray;
PVDATA_REFCOUNT_MONITOR_DESTRUCT(messageQueue);
}
MessageNode *MessageQueue::get() {
if(pImpl->lastGet!=0) {
throw std::logic_error(
String("MessageQueue::get() but did not release last"));
}
MessageElementPtr element = pImpl->queue->getUsed();
if(element==0) return 0;
pImpl->lastGet = element;
return element->getObject();
}
void MessageQueue::release() {
if(pImpl->lastGet==0) return;
pImpl->queue->releaseUsed(pImpl->lastGet);
pImpl->lastGet = 0;
}
bool MessageQueue::put(String message,MessageType messageType,bool replaceLast)
{
MessageElementPtr element = pImpl->queue->getFree();
if(element!=0) {
MessageNodePtr node = element->getObject();
node->message = message;
node->messageType = messageType;
pImpl->lastPut = node;
pImpl->queue->setUsed(element);
return true;
}
pImpl->overrun++;
if(replaceLast) {
MessageNodePtr node = pImpl->lastPut;
node->message = message;
node->messageType = messageType;
}
return false;
}
bool MessageQueue::isEmpty() const
{
int free = pImpl->queue->getNumberFree();
if(free==pImpl->size) return true;
return false;
}
bool MessageQueue::isFull() const
{
if(pImpl->queue->getNumberFree()==0) return true;
return false;
}
int MessageQueue::getClearOverrun()
{
int num = pImpl->overrun;
pImpl->overrun = 0;
return num;
}
}}

View File

@@ -13,94 +13,84 @@
#include <pv/pvType.h>
#include <pv/requester.h>
#include <pv/noDefaultMethods.h>
#include <pv/queue.h>
namespace epics { namespace pvData {
class MessageNode;
class MessageQueue;
typedef std::tr1::shared_ptr<MessageNode> MessageNodePtr;
typedef std::vector<MessageNodePtr> MessageNodePtrArray;
typedef std::tr1::shared_ptr<MessageQueue> MessageQueuePtr;
class MessageNode {
public:
MessageNode() : messageType(infoMessage) {}
String getMessage() const { return message;}
MessageType getMessageType() const {return messageType;}
void setMessageNull() {message=String();}
private:
MessageNode();
~MessageNode();
friend class MessageQueue;
String message;
MessageType messageType;
friend class MessageQueue;
};
class MessageQueue : private NoDefaultMethods {
class MessageQueue : public Queue<MessageNode> {
public:
POINTER_DEFINITIONS(MessageQueue);
static MessageQueuePtr create(int size);
~MessageQueue();
MessageNode *get();
MessageQueue(MessageNodePtrArray &nodeArray);
virtual ~MessageQueue();
MessageNodePtr &get();
// must call release before next get
void release();
// return (false,true) if message (was not, was) put into queue
bool put(String message,MessageType messageType,bool replaceLast);
bool isEmpty() const;
bool isFull() const;
bool isEmpty() ;
bool isFull() ;
int getClearOverrun();
private:
MessageQueue(std::vector<std::tr1::shared_ptr<MessageNode> > data);
Queue<MessageNode> queue;
MessageNode *lastPut;
MessagreNode *lastGet;
int size;
int overrun;
MessageNodePtr nullNode;
MessageNodePtr lastGet;
MessageNodePtr lastPut;
uint32 overrun;
};
MessageQueuePtr MessageQueue::create(int size)
{
std::vector<std::tr1::shared_ptr<MessageNode> > dataArray;
dataArray.reserve(size);
MessageNodePtrArray nodeArray;
nodeArray.reserve(size);
for(int i=0; i<size; i++) {
dataArray.push_back(
std::tr1::shared_ptr<MessageNode>(new MessageNode()));
nodeArray.push_back(
MessageNodePtr(new MessageNode()));
}
return std::tr1::shared_ptr<MessageQueue>(new MessageQueue(dataArray));
return std::tr1::shared_ptr<MessageQueue>(new MessageQueue(nodeArray));
}
MessageQueue::MessageQueue(std::vector<std::tr1::shared_ptr<MessageNode> > data)
: queue(data),
lastPut(NULL),
lastGet(NULL),
size(data.size()),
MessageQueue::MessageQueue(MessageNodePtrArray &data)
: Queue<MessageNode>(data),
overrun(0)
{ }
MessageNode *MessageQueue::get() {
if(lastGet!=NULL) {
throw std::logic_error(
String("MessageQueue::get() but did not release last"));
}
MessageNode node = queue.getUsed();
if(node==NULL) return NULL;
lastGet = node;
return node;
MessageNodePtr &MessageQueue::get() {
if(getNumberUsed()==0) return nullNode;
lastGet = getUsed();
return lastGet;
}
void MessageQueue::release() {
if(lastGet==NULL) return;
queue.releaseUsed(lastGet);
lastGet = NULL;
if(lastGet.get()==NULL) return;
releaseUsed(lastGet);
lastGet.reset();
}
bool MessageQueue::put(String message,MessageType messageType,bool replaceLast)
{
MessageNode node = queue.getFree();
if(node!= NULL) {
MessageNodePtr node = getFree();
if(node.get()!= NULL) {
node->message = message;
node->messageType = messageType;
lastPut = node;
queue.setUsed(node);
setUsed(node);
return true;
}
overrun++;
@@ -108,20 +98,21 @@ bool MessageQueue::put(String message,MessageType messageType,bool replaceLast)
node = lastPut;
node->message = message;
node->messageType = messageType;
return true;
}
return false;
}
bool MessageQueue::isEmpty() const
bool MessageQueue::isEmpty()
{
int free = queue.getNumberFree();
if(free==size) return true;
int free = getNumberFree();
if(free==capacity()) return true;
return false;
}
bool MessageQueue::isFull() const
bool MessageQueue::isFull()
{
if(queue.getNumberFree()==0) return true;
if(getNumberFree()==0) return true;
return false;
}

View File

@@ -8,6 +8,7 @@
#include <tr1/memory>
#include <cstddef>
#include <stdexcept>
#include <pv/sharedPtr.h>
#ifndef QUEUE_H
#define QUEUE_H
namespace epics { namespace pvData {
@@ -16,20 +17,22 @@ template <typename T>
class Queue
{
public:
POINTER_DEFINITIONS(Queue);
typedef std::tr1::shared_ptr<T> queueElementPtr;
Queue(int size);
Queue(std::vector<queueElementPtr> elements);
~Queue(){}
typedef std::vector<queueElementPtr> queueElementPtrArray;
Queue(queueElementPtrArray &);
virtual ~Queue();
void clear();
int capacity(){return size;}
int getNumberFree(){return numberFree;}
int getNumberUsed(){return numberUsed;}
T * getFree();
void setUsed(T *element);
T * getUsed();
void releaseUsed(T *element);
int capacity();
int getNumberFree();
int getNumberUsed();
queueElementPtr & getFree();
void setUsed(queueElementPtr &element);
queueElementPtr & getUsed();
void releaseUsed(queueElementPtr &element);
private:
std::vector<queueElementPtr> elements;
queueElementPtr nullElement;
queueElementPtrArray elements;
int size;
int numberFree;
int numberUsed;
@@ -40,9 +43,8 @@ private:
};
template <typename T>
Queue<T>::Queue(int size)
: elements(size),
size(size),
Queue<T>::Queue(std::vector<queueElementPtr> &xxx)
: size(xxx.size()),
numberFree(size),
numberUsed(0),
nextGetFree(0),
@@ -50,22 +52,20 @@ Queue<T>::Queue(int size)
nextGetUsed(0),
nextReleaseUsed(0)
{
for(int i=0; i<size; i++) {
elements[i] = std::tr1::shared_ptr<T>(new T());
}
elements.swap(xxx);
}
template <typename T>
Queue<T>::Queue(std::vector<queueElementPtr> elements)
: elements(elements),
size(elements.size()),
numberFree(size),
numberUsed(0),
nextGetFree(0),
nextSetUsed(0),
nextGetUsed(0),
nextReleaseUsed(0)
{ }
Queue<T>::~Queue(){}
template <typename T>
int Queue<T>::capacity(){return size;}
template <typename T>
int Queue<T>::getNumberFree(){return numberFree;}
template <typename T>
int Queue<T>::getNumberUsed(){return numberUsed;}
template <typename T>
void Queue<T>::clear()
@@ -79,19 +79,20 @@ void Queue<T>::clear()
}
template <typename T>
T * Queue<T>::getFree()
std::tr1::shared_ptr<T> & Queue<T>::getFree()
{
if(numberFree==0) return NULL;
if(numberFree==0) return nullElement;
numberFree--;
int ind = nextGetFree;
std::tr1::shared_ptr<T> queueElement = elements[nextGetFree++];
if(nextGetFree>=size) nextGetFree = 0;
return queueElement.get();
return elements[ind];
}
template <typename T>
void Queue<T>::setUsed(T *element)
void Queue<T>::setUsed(std::tr1::shared_ptr<T> &element)
{
if(element!=elements[nextSetUsed++].get()) {
if(element!=elements[nextSetUsed++]) {
throw std::logic_error("not correct queueElement");
}
numberUsed++;
@@ -99,25 +100,25 @@ void Queue<T>::setUsed(T *element)
}
template <typename T>
T * Queue<T>::getUsed()
std::tr1::shared_ptr<T> & Queue<T>::getUsed()
{
if(numberUsed==0) return 0;
if(numberUsed==0) return nullElement;
int ind = nextGetUsed;
std::tr1::shared_ptr<T> queueElement = elements[nextGetUsed++];
if(nextGetUsed>=size) nextGetUsed = 0;
return queueElement.get();
return elements[ind];
}
template <typename T>
void Queue<T>::releaseUsed( T *element)
void Queue<T>::releaseUsed(std::tr1::shared_ptr<T> &element)
{
if(element!=elements[nextReleaseUsed++].get()) {
if(element!=elements[nextReleaseUsed++]) {
throw std::logic_error(
"not queueElement returned by last call to getUsed");
}
if(nextReleaseUsed>=size) nextReleaseUsed = 0;
numberUsed--;
numberFree++;
}

View File

@@ -15,8 +15,6 @@
namespace epics { namespace pvData {
typedef std::tr1::shared_ptr<BitSet> BitSetPtr;
class MonitorElement;
typedef std::tr1::shared_ptr<MonitorElement> MonitorElementPtr;
typedef std::vector<MonitorElementPtr> MonitorElementArray;
@@ -32,24 +30,19 @@ typedef std::tr1::shared_ptr<Monitor> MonitorPtr;
class MonitorElement {
public:
POINTER_DEFINITIONS(MonitorElement);
virtual ~MonitorElement(){}
/**
* Get the PVStructure.
* @return The PVStructure.
*/
virtual PVStructurePtr getPVStructure() = 0;
/**
* Get the bitSet showing which fields have changed.
* @return The bitSet.
*/
virtual BitSetPtr getChangedBitSet() = 0;
/**
* Get the bitSet showing which fields have been changed more than once.
* @return The bitSet.
*/
virtual BitSetPtr getOverrunBitSet() = 0;
MonitorElement(){}
MonitorElement(PVStructurePtr &pvStructurePtr);
PVStructurePtr pvStructurePtr;
BitSetPtr changedBitSet;
BitSetPtr overrunBitSet;
};
MonitorElement::MonitorElement(PVStructurePtr &pvStructurePtr)
: pvStructurePtr(pvStructurePtr),
changedBitSet(BitSetPtr(BitSet::create(pvStructurePtr->getNumberFields()))),
overrunBitSet(BitSetPtr(BitSet::create(pvStructurePtr->getNumberFields())))
{
}
/**
* Interface for Monitor.

View File

@@ -12,106 +12,33 @@
namespace epics { namespace pvData {
typedef QueueElement<MonitorElement::shared_pointer> MonitorQueueElement;
class MonitorElementImpl : public MonitorElement {
public:
MonitorElementImpl(PVStructure::shared_pointer pvStructure);
~MonitorElementImpl(){}
virtual PVStructure::shared_pointer const & getPVStructure();
virtual BitSet::shared_pointer const & getChangedBitSet();
virtual BitSet::shared_pointer const & getOverrunBitSet();
void setQueueElement(MonitorQueueElement *queueElement);
MonitorQueueElement *getQueueElement();
private:
PVStructure::shared_pointer pvStructure;
BitSet::shared_pointer changedBitSet;
BitSet::shared_pointer overrunBitSet;
MonitorQueueElement *queueElement;
};
MonitorElementImpl::MonitorElementImpl(PVStructure::shared_pointer pvStructure)
: pvStructure(pvStructure),
changedBitSet(BitSet::shared_pointer(
new BitSet(pvStructure->getNumberFields()))),
overrunBitSet(BitSet::shared_pointer(
new BitSet(pvStructure->getNumberFields()))),
queueElement(0)
{}
PVStructure::shared_pointer const & MonitorElementImpl::getPVStructure()
MonitorElementArray MonitorQueue::createMonitorElements(
StructureConstPtr & elementStructure,int number)
{
return pvStructure;
PVDataCreatePtr pvDataCreate = getPVDataCreate();
MonitorElementArray elementArray(number);
for(int i=0; i<number; i++) {
PVStructurePtr pvStructurePtr
= pvDataCreate->createPVStructure(elementStructure);
MonitorElementPtr monitorElement(new MonitorElement(pvStructurePtr));
elementArray[i] = monitorElement;
}
return elementArray;
}
BitSet::shared_pointer const & MonitorElementImpl::getChangedBitSet()
{
return changedBitSet;
}
BitSet::shared_pointer const & MonitorElementImpl::getOverrunBitSet()
{
return overrunBitSet;
}
void MonitorElementImpl::setQueueElement(MonitorQueueElement *queueElement)
{
this->queueElement = queueElement;
}
MonitorQueueElement *MonitorElementImpl::getQueueElement()
{
return queueElement;
}
MonitorQueue::MonitorQueue(PVStructureSharedPointerPtrArray structures,int number)
: number(number),
structures(structures),
queue(0),
queueElements(new MonitorElement::shared_pointer*[number]),
nullElement(MonitorElement::shared_pointer())
MonitorQueue::MonitorQueue(MonitorElementArray monitorElementArray)
: elementArray(monitorElementArray),
queue(elementArray)//,
//number(monitorElementArray.size())
{
number = monitorElementArray.size();
if(number<2) {
throw std::logic_error(String("queueSize must be >=2"));
}
for(int i=0; i<number; i++) {
queueElements[i] = new MonitorElement::shared_pointer(
new MonitorElementImpl(*structures[i]));
}
queue = new Queue<MonitorElement::shared_pointer>(queueElements,number);
MonitorQueueElement *queueElement;
for(int i=0; i<number;i++) {
queueElement = queue->getFree();
MonitorElementImpl * element = static_cast<MonitorElementImpl *>(
queueElement->getObject()->get());
element->setQueueElement(queueElement);
queue->setUsed(queueElement);
queue->releaseUsed(queueElement);
}
}
MonitorQueue::~MonitorQueue()
{
delete queue;
for(int i=0; i<number; i++) {
delete queueElements[i];
}
delete[] queueElements;
for(int i=0; i<number; i++) delete structures[i];
delete[] structures;
}
PVStructureSharedPointerPtrArray MonitorQueue::createStructures(
PVStructurePtrArray array,int number)
{
PVStructureSharedPointerPtrArray elements =
new PVStructureSharedPointerPtr[number];
for(int i=0; i<number; i++){
elements[i] = new PVStructure::shared_pointer(array[i]);
}
delete[] array;
return elements;
}
void MonitorQueue::clear()
@@ -129,11 +56,11 @@ int MonitorQueue::capacity()
return number;
}
MonitorElement::shared_pointer const & MonitorQueue::getFree()
MonitorElementPtr & MonitorQueue::getFree()
{
MonitorQueueElement *queueElement = queue->getFree();
MonitorElementPtr * queueElement = queue->getFree();
if(queueElement==0) return nullElement;
return *queueElement->getObject();
return *queueElement;
}
void MonitorQueue::setUsed(MonitorElement::shared_pointer const & element)
@@ -155,6 +82,5 @@ void MonitorQueue::releaseUsed(MonitorElement::shared_pointer const & element)
queue->releaseUsed(impl->getQueueElement());
}
}}

View File

@@ -19,30 +19,86 @@
namespace epics { namespace pvData {
typedef PVStructure::shared_pointer* PVStructureSharedPointerPtr;
typedef PVStructureSharedPointerPtr* PVStructureSharedPointerPtrArray;
class MonitorQueue;
typedef std::tr1::shared_ptr<MonitorQueue> MonitorQueuePtr;
class MonitorQueue {
public:
MonitorQueue(PVStructureSharedPointerPtrArray structures,int number);
static MonitorQueuePtr create(
StructureConstPtr & elementStructure,int number);
MonitorQueue(MonitorElementArray &monitorElementArray);
~MonitorQueue();
static PVStructureSharedPointerPtrArray createStructures(
PVStructurePtrArray array,int number);
void clear();
int getNumberFree();
int capacity();
MonitorElementPtr & getFree();
void setUsed(MonitorElementPtr & element);
MonitorElementPtr getUsed();
MonitorElementPtr & getUsed();
void releaseUsed(MonitorElementPtr & element);
private:
int number;
PVStructureSharedPointerPtrArray structures;
Queue<MonitorElementPtr> *queue;
MonitorElementPtr **queueElements;
MonitorElementPtr nullElement;
MonitorElementArray elementArray;
Queue<MonitorElementPtr> queue;
};
MonitorQueuePtr MonitorQueue::create(i
StructureConstPtr & elementStructure,int number)
{
PVDataCreatePtr pvDataCreate = getPVDataCreate();
MonitorElementArray elementArray;
elementArray.reserve(number);
for(int i=0; i<number; i++) {
PVStructurePtr pvStructurePtr
= pvDataCreate->createPVStructure(elementStructure);
MonitorElementPtr monitorElement(new MonitorElement(pvStructurePtr));
elementArray.push_back(new MonitorElement(pvStructurePtr));
}
return MonitorQueuePtr(new MonitorQueue(elementArray));
}
MonitorQueue::MonitorQueue(MonitorElementArray &monitorElementArray)
: queue(new Queue<MonitorElementPtr>(monitorElementArray.swap()))
{}
void MonitorQueue::clear()
{
queue->clear();
}
int MonitorQueue::getNumberFree()
{
return queue->getNumberFree();
}
int MonitorQueue::capacity()
{
return queue.capacity();
}
MonitorElementPtr & MonitorQueue::getFree()
{
MonitorElementPtr queueElement = queue->getFree();
if(queueElement.get()==0) return nullElement;
return queueElement;
}
void MonitorQueue::setUsed(MonitorElementPtr & element)
{
queue->setUsed(element);
}
MonitorElementPtr & MonitorQueue::getUsed()
{
MonitorElementPtr queueElement = queue->getUsed();
if(queueElement.get()==0) return nullElement;
return queueElement;
}
void MonitorQueue::releaseUsed(MonitorElementPtr & element)
{
queue->releaseUsed(element);
}
}}
#endif /* MONITORQUEUE_H */

View File

@@ -38,9 +38,9 @@ PROD_HOST += testQueue
testQueue_SRCS += testQueue.cpp
testQueue_LIBS += pvData Com
#PROD_HOST += testMessageQueue
#testMessageQueue_SRCS += testMessageQueue.cpp
#testMessageQueue_LIBS += pvData Com
PROD_HOST += testMessageQueue
testMessageQueue_SRCS += testMessageQueue.cpp
testMessageQueue_LIBS += pvData Com
include $(TOP)/configure/RULES
#----------------------------------------

View File

@@ -4,7 +4,7 @@
* in file LICENSE that is included with this distribution.
*/
/*
* testQueue.cpp
* testMessageQueue.cpp
*
* Created on: 2010.12
* Author: Marty Kraimer
@@ -33,12 +33,16 @@ using namespace epics::pvData;
static void testBasic(FILE * fd,FILE *auxfd ) {
int queueSize = 3;
String messages[]= {
String("1"),String("2"),String("3"),String("4"),String("5")
};
MessageQueue *queue = new MessageQueue(queueSize);
StringArray messages;
messages.reserve(5);
messages.push_back("1");
messages.push_back("2");
messages.push_back("3");
messages.push_back("4");
messages.push_back("5");
MessageQueuePtr queue = MessageQueue::create(queueSize);
bool result;
MessageNode *messageNode;
MessageNodePtr messageNode;
result = queue->isEmpty();
assert(result);
result = queue->put(messages[0],infoMessage,true);
@@ -51,25 +55,24 @@ static void testBasic(FILE * fd,FILE *auxfd ) {
result = queue->put(messages[3],infoMessage,true);
assert(result==false);
messageNode = queue->get();
assert(messageNode!=0);
assert(messageNode.get()!=0);
fprintf(fd,"message %s messageType %s\n",
messageNode->getMessage().c_str(),
messageTypeName[messageNode->getMessageType()].c_str());
assert(messageNode->getMessage().compare(messages[0])==0);
queue->release();
messageNode = queue->get();
assert(messageNode!=0);
assert(messageNode.get()!=0);
assert(messageNode->getMessage().compare(messages[1])==0);
queue->release();
messageNode = queue->get();
assert(messageNode!=0);
assert(messageNode.get()!=0);
fprintf(fd,"message %s messageType %s\n",
messageNode->getMessage().c_str(),
messageTypeName[messageNode->getMessageType()].c_str());
assert(messageNode->getMessage().compare(messages[3])==0);
queue->release();
result = queue->isEmpty();
delete queue;
}
int main(int argc, char *argv[]) {

View File

@@ -22,7 +22,6 @@
#include <pv/lock.h>
#include <pv/timeStamp.h>
#include <pv/queue.h>
#include <pv/CDRMonitor.h>
#include <pv/event.h>
#include <pv/thread.h>
#include <pv/executor.h>
@@ -35,19 +34,25 @@ struct Data {
int b;
};
typedef std::tr1::shared_ptr<Data> DataPtr;
typedef std::vector<DataPtr> DataPtrArray;
static const int numElements = 5;
typedef Queue<Data> DataQueue;
class Sink;
typedef std::tr1::shared_ptr<Sink> SinkPtr;
class Sink : public Runnable {
public:
static SinkPtr create(DataQueue &queue,FILE *auxfd);
Sink(DataQueue &queue,FILE *auxfd);
~Sink();
void stop();
void look();
virtual void run();
private:
DataQueue queue;
DataQueue &queue;
FILE *auxfd;
bool isStopped;
Event *wait;
@@ -57,6 +62,11 @@ private:
Thread *thread;
};
SinkPtr Sink::create(DataQueue &queue,FILE *auxfd)
{
return SinkPtr(new Sink(queue,auxfd));
}
Sink::Sink(DataQueue &queue,FILE *auxfd)
: queue(queue),
auxfd(auxfd),
@@ -96,8 +106,8 @@ void Sink::run()
wait->wait();
if(isStopped) break;
while(true) {
Data *data = queue.getUsed();
if(data==NULL) {
DataPtr data = queue.getUsed();
if(data.get()==NULL) {
waitEmpty->signal();
break;
}
@@ -109,33 +119,34 @@ void Sink::run()
}
static void testBasic(FILE * fd,FILE *auxfd ) {
std::vector<std::tr1::shared_ptr<Data> >dataArray;
DataPtrArray dataArray;
dataArray.reserve(numElements);
for(int i=0; i<numElements; i++) {
dataArray.push_back(std::tr1::shared_ptr<Data>(new Data()));
dataArray.push_back(DataPtr(new Data()));
}
DataQueue queue(dataArray);
Data *pdata = queue.getFree();
DataPtr data = queue.getFree();
int value = 0;
while(pdata!=NULL) {
pdata->a = value;
pdata->b = value*10;
while(data.get()!=NULL) {
data->a = value;
data->b = value*10;
value++;
queue.setUsed(pdata);
pdata = queue.getFree();
queue.setUsed(data);
data = queue.getFree();
}
std::tr1::shared_ptr<Sink> sink = std::tr1::shared_ptr<Sink>(new Sink(queue,auxfd));
SinkPtr sink = SinkPtr(new Sink(queue,auxfd));
queue.clear();
while(true) {
Data * data = queue.getFree();
if(data==NULL) break;
data = queue.getFree();
if(data.get()==NULL) break;
fprintf(auxfd,"source a %d b %d\n",data->a,data->b);
queue.setUsed(data);
}
sink->look();
// now alternate
for(int i=0; i<numElements; i++) {
Data *data = queue.getFree();
assert(data!=NULL);
data = queue.getFree();
assert(data.get()!=NULL);
fprintf(auxfd,"source a %d b %d\n",data->a,data->b);
queue.setUsed(data);
sink->look();
@@ -143,6 +154,7 @@ static void testBasic(FILE * fd,FILE *auxfd ) {
sink->stop();
}
int main(int argc, char *argv[]) {
char *fileName = 0;
if(argc>1) fileName = argv[1];
@@ -157,8 +169,6 @@ int main(int argc, char *argv[]) {
auxfd = fopen(auxFileName,"w+");
}
testBasic(fd,auxfd);
epicsExitCallAtExits();
CDRMonitor::get().show(fd);
return (0);
}