memory leak fixed

This commit is contained in:
Dhanya Maliakal 2015-11-12 13:49:09 +01:00
parent 24438419d5
commit 53f11c499d
4 changed files with 77 additions and 83 deletions

View File

@ -78,8 +78,10 @@ int CircularFifo<Element>::getSemValue()
template<typename Element> template<typename Element>
bool CircularFifo<Element>::push(Element*& item_) bool CircularFifo<Element>::push(Element*& item_)
{ {
//cout<<"*head:"<<head<<endl;
//cout<<"*tail before"<<tail<<endl;
unsigned int nextTail = increment(tail); unsigned int nextTail = increment(tail);
//cout<<"*next tail"<<nextTail<<endl;
if(nextTail != head) if(nextTail != head)
{ {
array[tail] = item_; array[tail] = item_;
@ -101,12 +103,15 @@ bool CircularFifo<Element>::push(Element*& item_)
template<typename Element> template<typename Element>
bool CircularFifo<Element>::pop(Element*& item_) bool CircularFifo<Element>::pop(Element*& item_)
{ {
// if(head == tail) //cout<<"-tail:"<<tail<<endl;
//cout<<"-head before:"<<head<<endl;
//if(head == tail)
// return false; // empty queue // return false; // empty queue
sem_wait(&free_mutex); sem_wait(&free_mutex);
item_ = array[head]; item_ = array[head];
head = increment(head); head = increment(head);
//cout<<"-head after:"<<head<<endl;
return true; return true;
} }

View File

@ -612,11 +612,11 @@ typedef struct
//if length given, listens to length, else listens for packetsize till length is reached //if length given, listens to length, else listens for packetsize till length is reached
if(length){ if(length){
/*int k = 0;*/ /*int k = 0;*/
while(length>0){ while(length>0){
nsending = (length>packet_size) ? packet_size:length; nsending = (length>packet_size) ? packet_size:length;
/* /*
//created for debugging on 11.05.2015 //created for debugging on 11.05.2015
nsending=5000; nsending=5000;
nsent = recvfrom(socketDescriptor,(char*)buf,nsending, 0, (struct sockaddr *) &clientAddress, &clientAddress_length); nsent = recvfrom(socketDescriptor,(char*)buf,nsending, 0, (struct sockaddr *) &clientAddress, &clientAddress_length);
@ -633,8 +633,6 @@ typedef struct
else else
k++; k++;
*/ */
nsent = recvfrom(socketDescriptor,(char*)buf+total_sent,nsending, 0, (struct sockaddr *) &clientAddress, &clientAddress_length); nsent = recvfrom(socketDescriptor,(char*)buf+total_sent,nsending, 0, (struct sockaddr *) &clientAddress, &clientAddress_length);
if(!nsent) break; if(!nsent) break;
length-=nsent; length-=nsent;
@ -646,6 +644,7 @@ typedef struct
//normal //normal
nsending=packet_size; nsending=packet_size;
nsent = recvfrom(socketDescriptor,(char*)buf+total_sent,nsending, 0, (struct sockaddr *) &clientAddress, &clientAddress_length); nsent = recvfrom(socketDescriptor,(char*)buf+total_sent,nsending, 0, (struct sockaddr *) &clientAddress, &clientAddress_length);
//nsent = 1040;
total_sent+=nsent; total_sent+=nsent;
} }
break; break;
@ -714,21 +713,12 @@ typedef struct
protected: protected:
communicationProtocol protocol; communicationProtocol protocol;
int is_a_server; int is_a_server;
int socketDescriptor; int socketDescriptor;
int file_des; int file_des;
int packet_size; int packet_size;
struct sockaddr_in clientAddress, serverAddress; struct sockaddr_in clientAddress, serverAddress;
socklen_t clientAddress_length; socklen_t clientAddress_length;
char dummyClientIP[INET_ADDRSTRLEN]; char dummyClientIP[INET_ADDRSTRLEN];

View File

@ -111,7 +111,7 @@
#define EIGER_MAX_PORTS 2 #define EIGER_MAX_PORTS 2
#define EIGER_HEADER_LENGTH 48 #define EIGER_HEADER_LENGTH 48
#define EIGER_FIFO_SIZE 250 //cannot be less than max jobs per thread = 1000 #define EIGER_FIFO_SIZE 100
/*#define EIGER_ALIGNED_FRAME_SIZE 65536*/ /*#define EIGER_ALIGNED_FRAME_SIZE 65536*/
#define EIGER_ONE_GIGA_CONSTANT 16 #define EIGER_ONE_GIGA_CONSTANT 16
#define EIGER_TEN_GIGA_CONSTANT 4 #define EIGER_TEN_GIGA_CONSTANT 4

View File

@ -309,7 +309,6 @@ int UDPStandardImplementation::setupFifoStructure(){
return OK; return OK;
int count = 0;
//set up fifo structure //set up fifo structure
for(int i=0;i<numberofListeningThreads;i++){ for(int i=0;i<numberofListeningThreads;i++){
@ -343,17 +342,14 @@ int UDPStandardImplementation::setupFifoStructure(){
} }
//push free address into fifoFree //push free address into fifoFree
count = 0;
buffer[i]=mem0[i]; buffer[i]=mem0[i];
while (buffer[i] < (mem0[i]+(bufferSize * numberofJobsPerBuffer + HEADER_SIZE_NUM_TOT_PACKETS) * (fifoSize-1))) { while (buffer[i] < (mem0[i]+(bufferSize * numberofJobsPerBuffer + HEADER_SIZE_NUM_TOT_PACKETS) * (fifoSize-1))) {
fifoFree[i]->push(buffer[i]); fifoFree[i]->push(buffer[i]);
//#ifdef DEBUG5 sprintf(buffer[i],"mem%d",i);
#ifdef DEBUG5
if(count==0 || count == 127998)
cprintf(BLUE,"Info: %d fifostructure free pushed into fifofree %p\n", i, (void*)(buffer[i])); cprintf(BLUE,"Info: %d fifostructure free pushed into fifofree %p\n", i, (void*)(buffer[i]));
//#endif #endif
buffer[i] += (bufferSize * numberofJobsPerBuffer + HEADER_SIZE_NUM_TOT_PACKETS); buffer[i] += (bufferSize * numberofJobsPerBuffer + HEADER_SIZE_NUM_TOT_PACKETS);
count++;
} }
} }
FILE_LOG(logDEBUG) << "Info: Fifo structure(s) reconstructed"; FILE_LOG(logDEBUG) << "Info: Fifo structure(s) reconstructed";
@ -1461,6 +1457,7 @@ void UDPStandardImplementation::startListening(){
//until mask unset (udp sockets shut down by client) //until mask unset (udp sockets shut down by client)
while((1 << ithread) & listeningThreadsMask){ while((1 << ithread) & listeningThreadsMask){
//pop from fifo //pop from fifo
fifoFree[ithread]->pop(buffer[ithread]); fifoFree[ithread]->pop(buffer[ithread]);
#ifdef CFIFODEBUG #ifdef CFIFODEBUG
@ -1479,6 +1476,7 @@ void UDPStandardImplementation::startListening(){
rc = prepareAndListenBuffer(ithread, listenSize, carryonBufferSize, tempBuffer); rc = prepareAndListenBuffer(ithread, listenSize, carryonBufferSize, tempBuffer);
//start indices for each start of scan/acquisition //start indices for each start of scan/acquisition
if((!measurementStarted) && (rc > 0)){ if((!measurementStarted) && (rc > 0)){
pthread_mutex_lock(&progressMutex); pthread_mutex_lock(&progressMutex);
@ -1487,12 +1485,12 @@ void UDPStandardImplementation::startListening(){
pthread_mutex_unlock(&progressMutex); pthread_mutex_unlock(&progressMutex);
} }
//problem in receiving or end of acquisition //problem in receiving or end of acquisition
if (status == TRANSMITTING){ if (status == TRANSMITTING){
stopListening(ithread,rc); stopListening(ithread,rc);
continue; continue;
} }
//write packet count to buffer //write packet count to buffer
if(myDetectorType == EIGER) if(myDetectorType == EIGER)
(*((uint32_t*)(buffer[ithread]))) = 1; (*((uint32_t*)(buffer[ithread]))) = 1;
@ -1503,6 +1501,7 @@ void UDPStandardImplementation::startListening(){
//push buffer to FIFO //push buffer to FIFO
while(!fifo[ithread]->push(buffer[ithread])); while(!fifo[ithread]->push(buffer[ithread]));
#ifdef CFIFODEBUG #ifdef CFIFODEBUG
if(ithread == 0) if(ithread == 0)
cprintf(CYAN,"Listening_Thread %d: Listener pushed into fifo %p\n",ithread, (void*)(buffer[ithread])); cprintf(CYAN,"Listening_Thread %d: Listener pushed into fifo %p\n",ithread, (void*)(buffer[ithread]));
@ -1511,6 +1510,7 @@ void UDPStandardImplementation::startListening(){
#endif #endif
}/*--end of loop for each buffer (inner loop)*/ }/*--end of loop for each buffer (inner loop)*/
//end of acquisition, wait for next acquisition/change of parameters //end of acquisition, wait for next acquisition/change of parameters
@ -1537,6 +1537,7 @@ int UDPStandardImplementation::prepareAndListenBuffer(int ithread, int lSize, in
//listen to UDP packets //listen to UDP packets
if(cSize) if(cSize)
memcpy(buffer[ithread] + HEADER_SIZE_NUM_TOT_PACKETS, temp, cSize); memcpy(buffer[ithread] + HEADER_SIZE_NUM_TOT_PACKETS, temp, cSize);
int receivedSize = udpSocket[ithread]->ReceiveDataOnly(buffer[ithread] + HEADER_SIZE_NUM_TOT_PACKETS + cSize, lSize + cSize); int receivedSize = udpSocket[ithread]->ReceiveDataOnly(buffer[ithread] + HEADER_SIZE_NUM_TOT_PACKETS + cSize, lSize + cSize);
//throw away packets that is not one packet size, need to check status if socket is shut down //throw away packets that is not one packet size, need to check status if socket is shut down
@ -1550,6 +1551,7 @@ int UDPStandardImplementation::prepareAndListenBuffer(int ithread, int lSize, in
receivedSize = udpSocket[ithread]->ReceiveDataOnly(buffer[ithread] + HEADER_SIZE_NUM_TOT_PACKETS); receivedSize = udpSocket[ithread]->ReceiveDataOnly(buffer[ithread] + HEADER_SIZE_NUM_TOT_PACKETS);
} }
#ifdef MANUALDEBUG #ifdef MANUALDEBUG
eiger_packet_header_t* header = (eiger_packet_header_t*) (buffer[ithread]+HEADER_SIZE_NUM_TOT_PACKETS); eiger_packet_header_t* header = (eiger_packet_header_t*) (buffer[ithread]+HEADER_SIZE_NUM_TOT_PACKETS);
eiger_packet_footer_t* footer = (eiger_packet_footer_t*)(buffer[ithread] + footerOffset + HEADER_SIZE_NUM_TOT_PACKETS); eiger_packet_footer_t* footer = (eiger_packet_footer_t*)(buffer[ithread] + footerOffset + HEADER_SIZE_NUM_TOT_PACKETS);
@ -1961,7 +1963,6 @@ void UDPStandardImplementation::processWritingBufferPacketByPacket(int ithread){
//until mask unset (udp sockets shut down by client) //until mask unset (udp sockets shut down by client)
while((1 << ithread) & writerThreadsMask){ while((1 << ithread) & writerThreadsMask){
//pop fifo and if end of acquisition //pop fifo and if end of acquisition
if(popAndCheckEndofAcquisition(ithread, packetBuffer, popReady, numPackets,fifoTempFree)){ if(popAndCheckEndofAcquisition(ithread, packetBuffer, popReady, numPackets,fifoTempFree)){
#ifdef DEBUG4 #ifdef DEBUG4
@ -2109,10 +2110,10 @@ void UDPStandardImplementation::processWritingBufferPacketByPacket(int ithread){
currentFrameNumber = presentFrameNumber; currentFrameNumber = presentFrameNumber;
numTotMissingPacketsInFile += numMissingPackets; numTotMissingPacketsInFile += numMissingPackets;
numTotMissingPackets += numMissingPackets; numTotMissingPackets += numMissingPackets;
//#ifdef FNUM_DEBUG #ifdef FNUM_DEBUG
cprintf(GREEN,"**fnum:%d**\n",currentFrameNumber); cprintf(GREEN,"**fnum:%d**\n",currentFrameNumber);
//#endif #endif
//#ifdef MISSINGP_DEBUG #ifdef MISSINGP_DEBUG
if(numMissingPackets){ if(numMissingPackets){
cprintf(RED, "Total missing packets %d for fnum %d\n",numMissingPackets,currentFrameNumber); cprintf(RED, "Total missing packets %d for fnum %d\n",numMissingPackets,currentFrameNumber);
for (int j=0;j<packetsPerFrame;++j){ for (int j=0;j<packetsPerFrame;++j){
@ -2121,23 +2122,21 @@ void UDPStandardImplementation::processWritingBufferPacketByPacket(int ithread){
cprintf(RED,"Found missing packet at pnum %d\n",j); cprintf(RED,"Found missing packet at pnum %d\n",j);
} }
} }
//#endif #endif
//write and copy to gui //write and copy to gui
handleWithoutDataCompression(ithread,frameBuffer,packetsPerFrame); handleWithoutDataCompression(ithread,frameBuffer,packetsPerFrame);
//freeing //freeing
for(int i=0; i<numberofListeningThreads; ++i){ for(int i=0; i<numberofListeningThreads; ++i){
int count =0;
while(!fifoTempFree[i]->isEmpty()){ while(!fifoTempFree[i]->isEmpty()){
fifoTempFree[i]->pop(temp); fifoTempFree[i]->pop(temp);
fifoFree[i]->push(temp); fifoFree[i]->push(temp);
count++;
#ifdef CFIFODEBUG #ifdef CFIFODEBUG
if(i==0) if(i==0)
cprintf(CYAN,"Fifo %d: %d Writing_Thread freed: pushed into fifofree %p\n",i,count, (void*)(temp)); cprintf(CYAN,"Fifo %d: Writing_Thread freed: pushed into fifofree %p\n",i, (void*)(temp));
else else
cprintf(YELLOW,"Fifo %d: %d Writing_Thread freed: pushed into fifofree %p\n",i, count,(void*)(temp)); cprintf(YELLOW,"Fifo %d: Writing_Thread freed: pushed into fifofree %p\n",i, (void*)(temp));
#endif #endif
} }
} }