latest changes; memory leaks fixed

This commit is contained in:
Marty Kraimer
2013-10-16 08:13:15 -04:00
parent ed5bf2d79b
commit 14b3640e9a
6 changed files with 84 additions and 49 deletions

View File

@ -40,5 +40,5 @@ PVDATA=/home/mrk/hg/pvDataCPP
PVACCESS=/home/mrk/hg/pvAccessCPP
# set your EPICS_BASE, PVDATA and PVACCESS paths in here
-include $(TOP)/configure/RELEASE.local
-include $(TOP)/../RELEASE.local
-include $(TOP)/configure/RELEASE.local

View File

@ -77,6 +77,8 @@ int main(int argc,char *argv[])
PVRecordPtr pvRecord;
pvRecord = ArrayPerformance::create(recordName,size,delay);
result = master->addRecord(pvRecord);
PVRecordPtr arrayPreformance = pvRecord;
arrayPreformance->setTraceLevel(1);
pvRecord = TraceRecord::create("traceRecordPGRPC");
result = master->addRecord(pvRecord);
if(!result) cout<< "record " << recordName << " not added" << endl;
@ -97,11 +99,14 @@ int main(int argc,char *argv[])
if(str.compare("exit")==0) break;
}
arrayPreformance.reset();
for(size_t i=0; i<nMonitor; ++i) longArrayMonitor[i]->stop();
for(size_t i=0; i<nMonitor; ++i) longArrayMonitor[i]->destroy();
pvaServer->shutdown();
epicsThreadSleep(1.0);
pvaServer->destroy();
ClientFactory::stop();
epicsThreadSleep(1.0);
channelProvider->destroy();
return 0;
}

View File

@ -62,9 +62,10 @@ int main(int argc,char *argv[])
if(str.compare("exit")==0) break;
}
ClientFactory::stop();
epicsThreadSleep(.1);
longArrayMonitor->destroy();
longArrayMonitor.reset();
ClientFactory::stop();
return 0;
}

View File

@ -37,6 +37,7 @@ public:
: longArrayMonitor(longArrayMonitor)
{}
virtual ~LAMChannelRequester(){}
virtual void destroy(){longArrayMonitor.reset();}
virtual String getRequesterName() { return requesterName;}
virtual void message(String const & message, MessageType messageType)
{ messagePvt(message,messageType);}
@ -184,7 +185,6 @@ void LAMMonitorRequester::run()
double diff = TimeStamp::diff(timeStamp,timeStampLast);
double elementsPerSecond = data.size();
elementsPerSecond = 1e-6*elementsPerSecond/diff;
cout.flush();
cout << "first " << first << " last " << last << " sum " << sum;
cout << " elements/sec " << elementsPerSecond << "million";
BitSetPtr changed = monitorElement->changedBitSet;
@ -196,7 +196,6 @@ void LAMMonitorRequester::run()
overrun->toString(&buffer);
cout << " overrun " << buffer;
cout << endl;
cout.flush();
timeStampLast = timeStamp;
} else {
cout << "size = 0" << endl;
@ -282,6 +281,7 @@ void LongArrayMonitor::destroy()
monitor.reset();
channel->destroy();
channel.reset();
channelRequester->destroy();
channelRequester.reset();
}

View File

@ -28,6 +28,7 @@ using std::endl;
static MonitorAlgorithmCreatePtr nullMonitorAlgorithmCreate;
static MonitorPtr nullMonitor;
static MonitorElementPtr NULLMonitorElement;
static Status wasDestroyedStatus(Status::Status::STATUSTYPE_ERROR,"was destroyed");
static ConvertPtr convert = getConvert();
@ -81,14 +82,12 @@ public:
virtual MonitorElementPtr poll();
virtual void release(MonitorElementPtr const &monitorElement);
private:
MonitorLocalPtr monitorLocal;
std::tr1::weak_ptr<MonitorLocal> monitorLocal;
PVStructurePtr pvCopyStructure;
MonitorElementPtr monitorElement;
bool gotMonitor;
bool wasReleased;
BitSetPtr changedBitSet;
BitSetPtr overrunBitSet;
Mutex mutex;
};
class RealQueue :
@ -106,11 +105,10 @@ public:
virtual MonitorElementPtr poll();
virtual void release(MonitorElementPtr const &monitorElement);
private:
MonitorLocalPtr monitorLocal;
std::tr1::weak_ptr<MonitorLocal> monitorLocal;
Queue<MonitorElement> queue;
bool queueIsFull;
MonitorElementPtr monitorElement;
Mutex mutex;
};
@ -190,11 +188,15 @@ void MonitorLocal::destroy()
Status MonitorLocal::start()
{
{
Lock xx(mutex);
if(pvRecord->getTraceLevel()>0)
{
cout << "MonitorLocal::start() " << endl;
}
if(isDestroyed) return wasDestroyedStatus;
firstMonitor = true;
}
return queue->start();
}
@ -211,11 +213,44 @@ Status MonitorLocal::stop()
MonitorElementPtr MonitorLocal::poll()
{
pvRecord->lock();
try {
Lock xx(mutex);
if(pvRecord->getTraceLevel()>1)
{
cout << "MonitorLocal::poll() " << endl;
}
return queue->poll();
if(isDestroyed) {
pvRecord->unlock();
return NULLMonitorElement;
}
MonitorElementPtr element = queue->poll();
pvRecord->unlock();
return element;
} catch(...) {
pvRecord->unlock();
throw;
}
}
void MonitorLocal::release(MonitorElementPtr const & monitorElement)
{
pvRecord->lock();
try {
Lock xx(mutex);
if(pvRecord->getTraceLevel()>1)
{
cout << "MonitorLocal::release() " << endl;
}
if(isDestroyed) {
return;
}
queue->release(monitorElement);
pvRecord->unlock();
} catch(...) {
pvRecord->unlock();
throw;
}
}
void MonitorLocal::dataChanged()
@ -224,6 +259,8 @@ void MonitorLocal::dataChanged()
{
cout << "MonitorLocal::dataChanged() " "firstMonitor " << firstMonitor << endl;
}
Lock xx(mutex);
if(isDestroyed) return;
if(firstMonitor) {
queue->dataChanged();
firstMonitor = false;
@ -244,15 +281,6 @@ void MonitorLocal::unlisten()
monitorRequester->unlisten(getPtrSelf());
}
void MonitorLocal::release(MonitorElementPtr const & monitorElement)
{
if(pvRecord->getTraceLevel()>1)
{
cout << "MonitorLocal::release() " << endl;
}
queue->release(monitorElement);
}
bool MonitorLocal::init(PVStructurePtr const & pvRequest)
{
PVFieldPtr pvField;
@ -273,10 +301,7 @@ bool MonitorLocal::init(PVStructurePtr const & pvRequest)
}
}
}
if(queueSize==1) {
monitorRequester->message("queueSize can not be 1",errorMessage);
return false;
}
pvField = pvRequest->getSubField("field");
if(pvField.get()==NULL) {
pvCopy = PVCopy::create(pvRecord,pvRequest,"");
@ -297,7 +322,7 @@ bool MonitorLocal::init(PVStructurePtr const & pvRequest)
}
pvCopyMonitor = pvCopy->createPVCopyMonitor(getPtrSelf());
// MARTY MUST IMPLEMENT periodic
if(queueSize==0) {
if(queueSize<2) {
queue = NOQueuePtr(new NOQueue(getPtrSelf()));
} else {
std::vector<MonitorElementPtr> monitorElementArray;
@ -387,7 +412,6 @@ NOQueue::NOQueue(
pvCopyStructure(monitorLocal->getPVCopy()->createPVStructure()),
monitorElement(new MonitorElement(pvCopyStructure)),
gotMonitor(false),
wasReleased(false),
changedBitSet(new BitSet(pvCopyStructure->getNumberFields())),
overrunBitSet(new BitSet(pvCopyStructure->getNumberFields()))
{
@ -395,12 +419,12 @@ NOQueue::NOQueue(
Status NOQueue::start()
{
Lock xx(mutex);
gotMonitor = true;
wasReleased = true;
changedBitSet->clear();
overrunBitSet->clear();
monitorLocal->getPVCopyMonitor()->startMonitoring(
MonitorLocalPtr ml = monitorLocal.lock();
if(ml==NULL) return wasDestroyedStatus;
ml->getPVCopyMonitor()->startMonitoring(
changedBitSet,
overrunBitSet);
return Status::Ok;
@ -412,17 +436,17 @@ void NOQueue::stop()
bool NOQueue::dataChanged()
{
Lock xx(mutex);
monitorLocal->getPVCopy()->updateCopyFromBitSet(
pvCopyStructure, changedBitSet);
gotMonitor = true;
return wasReleased;
return true;
}
MonitorElementPtr NOQueue::poll()
{
Lock xx(mutex);
if(!gotMonitor) return NULLMonitorElement;
MonitorLocalPtr ml = monitorLocal.lock();
if(ml==NULL) return NULLMonitorElement;
ml->getPVCopy()->updateCopyFromBitSet(
pvCopyStructure, changedBitSet);
getConvert()->copyStructure(pvCopyStructure,monitorElement->pvStructurePtr);
BitSetUtil::compress(changedBitSet,pvCopyStructure);
BitSetUtil::compress(overrunBitSet,pvCopyStructure);
@ -435,7 +459,6 @@ MonitorElementPtr NOQueue::poll()
void NOQueue::release(MonitorElementPtr const &monitorElement)
{
Lock xx(mutex);
gotMonitor = false;
}
@ -450,12 +473,13 @@ RealQueue::RealQueue(
Status RealQueue::start()
{
Lock xx(mutex);
queue.clear();
monitorElement = queue.getFree();
monitorElement->changedBitSet->clear();
monitorElement->overrunBitSet->clear();
monitorLocal->getPVCopyMonitor()->startMonitoring(
MonitorLocalPtr ml = monitorLocal.lock();
if(ml==NULL) return wasDestroyedStatus;
ml->getPVCopyMonitor()->startMonitoring(
monitorElement->changedBitSet,
monitorElement->overrunBitSet);
return Status::Ok;
@ -467,9 +491,10 @@ void RealQueue::stop()
bool RealQueue::dataChanged()
{
Lock xx(mutex);
PVStructurePtr pvStructure = monitorElement->pvStructurePtr;
monitorLocal->getPVCopy()->updateCopyFromBitSet(
MonitorLocalPtr ml = monitorLocal.lock();
if(ml==NULL) return false;
ml->getPVCopy()->updateCopyFromBitSet(
pvStructure,monitorElement->changedBitSet);
if(queue.getNumberFree()==0) {
if(queueIsFull) return false;
@ -485,7 +510,7 @@ bool RealQueue::dataChanged()
convert->copy(pvStructure,newElement->pvStructurePtr);
newElement->changedBitSet->clear();
newElement->overrunBitSet->clear();
monitorLocal->getPVCopyMonitor()->switchBitSets(
ml->getPVCopyMonitor()->switchBitSets(
newElement->changedBitSet,newElement->overrunBitSet);
queue.setUsed(monitorElement);
monitorElement = newElement;
@ -494,13 +519,11 @@ bool RealQueue::dataChanged()
MonitorElementPtr RealQueue::poll()
{
Lock xx(mutex);
return queue.getUsed();
}
void RealQueue::release(MonitorElementPtr const &currentElement)
{
Lock xx(mutex);
queue.releaseUsed(currentElement);
queueIsFull = false;
}

View File

@ -16,6 +16,7 @@
#include <pv/thread.h>
#include <pv/channelProviderLocal.h>
#include <pv/pvCopy.h>
namespace epics { namespace pvDatabase {
@ -121,6 +122,11 @@ size_t PVCopy::getCopyOffset(PVRecordFieldPtr const &recordPVField)
if((recordNode->recordPVField.get())==recordPVField.get()) {
return headNode->structureOffset;
}
PVStructure * parent = recordPVField->getPVField()->getParent();
size_t offsetParent = parent->getFieldOffset();
size_t off = recordPVField->getPVField()->getFieldOffset();
size_t offdiff = off -offsetParent;
if(offdiff<recordNode->nfields) return headNode->structureOffset + offdiff;
return String::npos;
}
CopyStructureNodePtr node = static_pointer_cast<CopyStructureNode>(headNode);
@ -977,10 +983,10 @@ void PVCopyMonitor::startMonitoring(
this->changeBitSet = changeBitSet;
this->overrunBitSet = overrunBitSet;
isGroupPut = false;
pvRecord->addListener(getPtrSelf());
addListener(headNode);
pvRecord->lock();
try {
pvRecord->addListener(getPtrSelf());
addListener(headNode);
changeBitSet->clear();
overrunBitSet->clear();
changeBitSet->set(0);