From 0627668090724e368ddc1889d4fd2f111df8c774 Mon Sep 17 00:00:00 2001 From: Dhanya Maliakal Date: Wed, 11 Nov 2015 15:08:50 +0100 Subject: [PATCH] some changes for memory leak --- .../include/UDPStandardImplementation.h | 5 +- slsReceiverSoftware/include/genericSocket.h | 32 ++-- .../src/UDPStandardImplementation.cpp | 163 ++++++++++++------ .../src/slsReceiverTCPIPInterface.cpp | 30 ++-- 4 files changed, 146 insertions(+), 84 deletions(-) diff --git a/slsReceiverSoftware/include/UDPStandardImplementation.h b/slsReceiverSoftware/include/UDPStandardImplementation.h index d478f0e7a..a1cad30e9 100644 --- a/slsReceiverSoftware/include/UDPStandardImplementation.h +++ b/slsReceiverSoftware/include/UDPStandardImplementation.h @@ -379,11 +379,10 @@ private: * @param wbuffer the buffer array that is popped from all the FIFOs * @param ready if that FIFO is allowed to pop (depends on if dummy buffer already popped/ waiting for other FIFO to finish a frame(eiger)) * @param nP number of packets in the buffer popped out - * @param toFree array of addresses to pop into fifoFree (eiger specific) - * @param toFreeOffset the number of addresses to free for each FIFO (eiger specific) + * @param fifoTempFree circular fifo to save addresses of packets adding upto a frame before pushing into fifofree (eiger specific) * @return true if end of acquisition else false */ - bool popAndCheckEndofAcquisition(int ithread, char* wbuffer[], bool ready[], uint32_t nP[],char* toFree[],int toFreeOffset[]); + bool popAndCheckEndofAcquisition(int ithread, char* wbuffer[], bool ready[], uint32_t nP[],CircularFifo* fifoTempFree[]); /** * Called by processWritingBuffer and processWritingBufferPacketByPacket diff --git a/slsReceiverSoftware/include/genericSocket.h b/slsReceiverSoftware/include/genericSocket.h index de49d7860..b1b5e8f62 100644 --- a/slsReceiverSoftware/include/genericSocket.h +++ b/slsReceiverSoftware/include/genericSocket.h @@ -116,6 +116,11 @@ typedef struct // serverAddress = {0}; // clientAddress = {0}; // strcpy(hostname,host_ip_or_name); + + strcpy(lastClientIP,"none"); + strcpy(thisClientIP,"none1"); + strcpy(dummyClientIP,"dummy"); + struct hostent *hostInfo = gethostbyname(host_ip_or_name); if (hostInfo == NULL){ cerr << "Exiting: Problem interpreting host: " << host_ip_or_name << "\n"; @@ -170,16 +175,16 @@ typedef struct nsent(0), total_sent(0) { - //memset(&serverAddress, 0, sizeof(sockaddr_in)); - // memset(&clientAddress, 0, sizeof(sockaddr_in)); - // serverAddress = {0}; - // clientAddress = {0}; -/* // you can specify an IP address: */ -/* */ +/* // you can specify an IP address: */ /* // or you can let it automatically select one: */ /* myaddr.sin_addr.s_addr = INADDR_ANY; */ + + strcpy(lastClientIP,"none"); + strcpy(thisClientIP,"none1"); + strcpy(dummyClientIP,"dummy"); + if(serverAddress.sin_port == htons(port_number)){ socketDescriptor = -10; return; @@ -592,6 +597,15 @@ typedef struct length-=nsent; total_sent+=nsent; } + + if (total_sent>0) + strcpy(thisClientIP,dummyClientIP); + + if (strcmp(lastClientIP,thisClientIP)) + differentClients=1; + else + differentClients=0; + break; case UDP: if (socketDescriptor<0) return -1; @@ -641,13 +655,7 @@ typedef struct #ifdef VERY_VERBOSE cout << "sent "<< total_sent << " Bytes" << endl; #endif - if (total_sent>0) - strcpy(thisClientIP,dummyClientIP); - if (strcmp(lastClientIP,thisClientIP)) - differentClients=1; - else - differentClients=0; return total_sent; diff --git a/slsReceiverSoftware/src/UDPStandardImplementation.cpp b/slsReceiverSoftware/src/UDPStandardImplementation.cpp index b12826f16..266f3e38c 100644 --- a/slsReceiverSoftware/src/UDPStandardImplementation.cpp +++ b/slsReceiverSoftware/src/UDPStandardImplementation.cpp @@ -152,7 +152,7 @@ void UDPStandardImplementation::initializeMembers(){ previousFrameNumber = -1; acqStarted = false; measurementStarted = false; - for(int i = 0; i < numberofListeningThreads; ++i) + for(int i = 0; i < MAX_NUMBER_OF_LISTENING_THREADS; ++i) totalListeningFrameCount[i] = 0; packetsInFile = 0; numMissingPackets = 0; @@ -309,6 +309,7 @@ int UDPStandardImplementation::setupFifoStructure(){ return OK; + int count = 0; //set up fifo structure for(int i=0;iisEmpty()) + fifo[i]->pop(buffer[i]); + delete fifo[i]; + } if(mem0[i]) free(mem0[i]); //creating fifoFree[i] = new CircularFifo(fifoSize); fifo[i] = new CircularFifo(fifoSize); + //cout<<"buffersize:"<push(buffer[i]); -#ifdef DEBUG5 +//#ifdef DEBUG5 + + if(count==0 || count == 127998) cprintf(BLUE,"Info: %d fifostructure free pushed into fifofree %p\n", i, (void*)(buffer[i])); -#endif +//#endif buffer[i] += (bufferSize * numberofJobsPerBuffer + HEADER_SIZE_NUM_TOT_PACKETS); + count++; } + cout<pop(buffer[ithread]); -#ifdef DEBUG5 - cprintf(BLUE,"Listening_Thread %d :Listener popped from fifofree %p\n", ithread, (void*)(buffer[ithread])); +#ifdef CFIFODEBUG + if(ithread == 0) + cprintf(CYAN,"Listening_Thread %d :Listener popped from fifofree %p\n", ithread, (void*)(buffer[ithread])); + else + cprintf(YELLOW,"Listening_Thread %d :Listener popped from fifofree %p\n", ithread, (void*)(buffer[ithread])); #endif //udpsocket doesnt exist @@ -1489,8 +1504,12 @@ void UDPStandardImplementation::startListening(){ //push buffer to FIFO while(!fifo[ithread]->push(buffer[ithread])); -#ifdef DEBUG5 - cprintf(BLUE,"Listening_Thread %d: Listener pushed into fifo %p\n",ithread, (void*)(buffer[ithread])); +#ifdef CFIFODEBUG + if(ithread == 0) + cprintf(CYAN,"Listening_Thread %d: Listener pushed into fifo %p\n",ithread, (void*)(buffer[ithread])); + else + cprintf(YELLOW,"Listening_Thread %d: Listener pushed into fifo %p\n",ithread, (void*)(buffer[ithread])); + #endif }/*--end of loop for each buffer (inner loop)*/ @@ -1517,7 +1536,8 @@ int UDPStandardImplementation::prepareAndListenBuffer(int ithread, int lSize, in FILE_LOG(logDEBUG) << __AT__ << " called"; //listen to UDP packets - memcpy(buffer[ithread] + HEADER_SIZE_NUM_TOT_PACKETS, temp, cSize); + if(cSize) + memcpy(buffer[ithread] + HEADER_SIZE_NUM_TOT_PACKETS, temp, cSize); int receivedSize = udpSocket[ithread]->ReceiveDataOnly(buffer[ithread] + HEADER_SIZE_NUM_TOT_PACKETS + cSize, lSize + cSize); //throw away packets that is not one packet size, need to check status if socket is shut down @@ -1606,8 +1626,11 @@ void UDPStandardImplementation::stopListening(int ithread, int numbytes){ if(numbytes <= 0){ cprintf(BLUE,"Listening_Thread %d :End of Acquisition\n", ithread); while(!fifoFree[ithread]->push(buffer[ithread])); -#ifdef DEBUG5 - cprintf(BLUE,"Listening_Thread %d :Listener push empty buffer into fifofree %p\n", ithread, (void*)(buffer[ithread])); +#ifdef CFIFODEBUG + if(ithread == 0) + cprintf(CYAN,"Listening_Thread %d :Listener push empty buffer into fifofree %p\n", ithread, (void*)(buffer[ithread])); + else + cprintf(YELLOW,"Listening_Thread %d :Listener push empty buffer into fifofree %p\n", ithread, (void*)(buffer[ithread])); #endif } @@ -1621,19 +1644,31 @@ void UDPStandardImplementation::stopListening(int ithread, int numbytes){ cprintf(BLUE,"Listening_Thread %d: Last Buffer packet count:%d\n",ithread, numbytes/onePacketSize); #endif while(!fifo[ithread]->push(buffer[ithread])); -#ifdef DEBUG5 - cprintf(BLUE,"Listening_Thread %d: Listener Last Buffer pushed into fifo %p\n", ithread,(void*)(buffer[ithread])); +#ifdef CFIFODEBUG + if(ithread == 0) + cprintf(CYAN,"Listening_Thread %d: Listener Last Buffer pushed into fifo %p\n", ithread,(void*)(buffer[ithread])); + else + cprintf(YELLOW,"Listening_Thread %d: Listener Last Buffer pushed into fifo %p\n", ithread,(void*)(buffer[ithread])); #endif } //push dummy-end buffer into fifo for all writer threads for(int i=0; ipop(buffer[ithread]); +#ifdef CFIFODEBUG + if(ithread == 0) + cprintf(CYAN,"Listening_Thread %d: Popped Dummy from fifoFree %p\n", ithread,(void*)(buffer[ithread])); + else + cprintf(YELLOW,"Listening_Thread %d: Popped Dummy from fifoFree %p\n", ithread,(void*)(buffer[ithread])); +#endif //creating dummy-end buffer with pc=0xFFFF (*((uint32_t*)(buffer[ithread]))) = dummyPacketValue; while(!fifo[ithread]->push(buffer[ithread])); -#ifdef DEBUG5 - cprintf(BLUE,"Listening_Thread %d: Listener pushed dummy-end buffer into fifo %p\n", ithread,(void*)(buffer[ithread])); +#ifdef CFIFODEBUG + if(ithread == 0) + cprintf(CYAN,"Listening_Thread %d: Listener pushed dummy-end buffer into fifo %p\n", ithread,(void*)(buffer[ithread])); + else + cprintf(YELLOW,"Listening_Thread %d: Listener pushed dummy-end buffer into fifo %p\n", ithread,(void*)(buffer[ithread])); #endif } @@ -1852,8 +1887,9 @@ void UDPStandardImplementation::processWritingBufferPacketByPacket(int ithread){ int MAX_NUM_PACKETS = 1024; //highest 32 bit has 1024 number of packets uint32_t LAST_PACKET_VALUE; //last packet number - char* toFreePointers[MAX_NUM_PACKETS]; //pointers to free for each frame - int toFreePointersOffset[numberofListeningThreads]; //offset of pointers to free added for each thread + + CircularFifo* fifoTempFree[numberofListeningThreads];//ciruclar fifo to keep track of one frame packets to be freed and reused later + char* temp = NULL; char* frameBuffer[MAX_NUM_PACKETS]; //buffer offset created for a whole frame int frameBufferoffset[numberofListeningThreads]; //buffer offset created for a whole frame for both listening threads @@ -1868,34 +1904,42 @@ void UDPStandardImplementation::processWritingBufferPacketByPacket(int ithread){ volatile int numberofMissingPackets[numberofListeningThreads]; // number of missing packets in this buffer for(int i=0; iisEmpty()) + fifoTempFree[i]->pop(temp); + delete fifoTempFree[i]; + } + fifoTempFree[i] = new CircularFifo(MAX_NUM_PACKETS); } - presentFrameNumber = 0; - //blank frame - initializing with missing packet values - blankoffset = 0; for(uint32_t i=0; imissingPacket)==missingPacketValue) cprintf(RED,"Found missing packet at pnum %d\n",j); } } -#endif +//#endif //write and copy to gui handleWithoutDataCompression(ithread,frameBuffer,packetsPerFrame); //freeing - for(int j=0;jpush(toFreePointers[j])); -#ifdef DEBUG5 - cprintf(GREEN,"Fifo 0: Writing_Thread freed: pushed into fifofree %p\n",ithread, (void*)(toFreePointers[j])); -#endif - } - for(int j=(packetsPerFrame/numberofListeningThreads);jpush(toFreePointers[j])); -#ifdef DEBUG5 - cprintf(GREEN,"Fifo 1: Writing_Thread freed: pushed into fifofree %p\n",ithread, (void*)(toFreePointers[j])); + for(int i=0; iisEmpty()){ + fifoTempFree[i]->pop(temp); + fifoFree[i]->push(temp); + count++; +#ifdef CFIFODEBUG + if(i==0) + cprintf(CYAN,"Fifo %d: %d Writing_Thread freed: pushed into fifofree %p\n",i,count, (void*)(temp)); + else + cprintf(YELLOW,"Fifo %d: %d Writing_Thread freed: pushed into fifofree %p\n",i, count,(void*)(temp)); #endif + } } + + + #ifdef DEBUG4 cprintf(GREEN,"Writing_Thread: finished freeing\n"); #endif @@ -2113,7 +2158,6 @@ void UDPStandardImplementation::processWritingBufferPacketByPacket(int ithread){ if((numPackets[i] != dummyPacketValue) && (currentPacketNumber[i] == LAST_PACKET_VALUE)) popReady[i] = true; frameBufferoffset[i] = (i*packetsPerFrame/numberofListeningThreads); - toFreePointersOffset[i] = (i*packetsPerFrame/numberofListeningThreads); blankoffset = 0; lastPacketNumber[i] = 0; currentPacketNumber[i] = 0; @@ -2202,7 +2246,7 @@ void UDPStandardImplementation::waitWritingBufferForNextAcquisition(int ithread) } -bool UDPStandardImplementation::popAndCheckEndofAcquisition(int ithread, char* wbuffer[], bool ready[], uint32_t nP[],char* toFree[],int toFreeOffset[]){ +bool UDPStandardImplementation::popAndCheckEndofAcquisition(int ithread, char* wbuffer[], bool ready[], uint32_t nP[],CircularFifo* fifoTempFree[]){ FILE_LOG(logDEBUG) << __AT__ << " called"; bool endofAcquisition = true; @@ -2210,8 +2254,11 @@ bool UDPStandardImplementation::popAndCheckEndofAcquisition(int ithread, char* w //pop if ready if(ready[i]){ fifo[i]->pop(wbuffer[i]); -#ifdef DEBUG5 - cprintf(GREEN,"Writing_Thread %d: Popped %p from FIFO %d\n", ithread, (void*)(wbuffer[i]),i); +#ifdef CFIFODEBUG + if(i == 0) + cprintf(CYAN,"Writing_Thread %d: Popped %p from FIFO %d\n", ithread, (void*)(wbuffer[i]),i); + else + cprintf(YELLOW,"Writing_Thread %d: Popped %p from FIFO %d\n", ithread, (void*)(wbuffer[i]),i); #endif nP[i] = (uint32_t)(*((uint32_t*)wbuffer[i])); #ifdef DEBUG4 @@ -2239,8 +2286,7 @@ bool UDPStandardImplementation::popAndCheckEndofAcquisition(int ithread, char* w } #endif if(myDetectorType == EIGER){ - toFree[toFreeOffset[i]] = wbuffer[i]; - toFreeOffset[i]++; + while(!fifoTempFree[i]->push(wbuffer[i])); } } } @@ -2264,8 +2310,11 @@ void UDPStandardImplementation::stopWriting(int ithread, char* wbuffer[]){ //free fifo for(int i=0; ipush(wbuffer[i])); -#ifdef DEBUG5 - cprintf(GREEN,"Writing_Thread %d: Freeing dummy-end buffer. Pushed into fifofree %p for listener %d\n", ithread,(void*)(wbuffer[i]),i); +#ifdef CFIFODEBUG + if(i==0) + cprintf(CYAN,"Writing_Thread %d: Freeing dummy-end buffer. Pushed into fifofree %p for listener %d\n", ithread,(void*)(wbuffer[i]),i); + else + cprintf(YELLOW,"Writing_Thread %d: Freeing dummy-end buffer. Pushed into fifofree %p for listener %d\n", ithread,(void*)(wbuffer[i]),i); #endif } @@ -2478,14 +2527,14 @@ void UDPStandardImplementation::writeFileWithoutCompression(char* wbuffer[],uint void UDPStandardImplementation::createHeaders(char* wbuffer[]){ - eiger_packet_header_t* wbuf_header=0; - eiger_packet_footer_t* wbuf_footer=0; + int port = 0, missingPacket; for (uint32_t i = 0; i < packetsPerFrame; i++){ - wbuf_header = (eiger_packet_header_t*) wbuffer[i]; - wbuf_footer = (eiger_packet_footer_t*)(wbuffer[i] + footerOffset); + + eiger_packet_header_t* wbuf_header = (eiger_packet_header_t*) wbuffer[i]; + eiger_packet_footer_t* wbuf_footer = (eiger_packet_footer_t*)(wbuffer[i] + footerOffset); #ifdef DEBUG4 cprintf(GREEN, "Loop index:%d Pnum:%d\n",i,*( (uint16_t*) wbuf_footer->packetNumber)); #endif diff --git a/slsReceiverSoftware/src/slsReceiverTCPIPInterface.cpp b/slsReceiverSoftware/src/slsReceiverTCPIPInterface.cpp index 1a6f9860c..4e32516fa 100644 --- a/slsReceiverSoftware/src/slsReceiverTCPIPInterface.cpp +++ b/slsReceiverSoftware/src/slsReceiverTCPIPInterface.cpp @@ -441,6 +441,7 @@ int slsReceiverTCPIPInterface::set_file_name() { #endif #endif + if(ret==OK && socket->differentClients){ FILE_LOG(logDEBUG) << "Force update"; ret=FORCE_UPDATE; @@ -451,12 +452,14 @@ int slsReceiverTCPIPInterface::set_file_name() { if(ret==FAIL){ cprintf(RED, "%s\n", mess); socket->SendDataOnly(mess,sizeof(mess)); + } + if(retval == NULL) socket->SendDataOnly(defaultVal,MAX_STR_LENGTH); - }else + else{ socket->SendDataOnly(retval,MAX_STR_LENGTH); + delete[] retval; + } - //free - if(retval != NULL) delete[] retval; //return ok/fail return ret; @@ -523,12 +526,13 @@ int slsReceiverTCPIPInterface::set_file_dir() { if(ret==FAIL){ cprintf(RED, "%s\n", mess); socket->SendDataOnly(mess,sizeof(mess)); + } + if(retval == NULL) socket->SendDataOnly(defaultVal,MAX_STR_LENGTH); - }else + else{ socket->SendDataOnly(retval,MAX_STR_LENGTH); - - //free - if(retval != NULL) delete[] retval; + delete[] retval; + } //return ok/fail return ret; @@ -2244,13 +2248,15 @@ int slsReceiverTCPIPInterface::set_detector_hostname() { // send answer socket->SendDataOnly(&ret,sizeof(ret)); if(ret==FAIL){ - cprintf(RED,"%s\n",mess); + cprintf(RED, "%s\n", mess); socket->SendDataOnly(mess,sizeof(mess)); + } + if(retval == NULL) socket->SendDataOnly(defaultVal,MAX_STR_LENGTH); - }else socket->SendDataOnly(retval,MAX_STR_LENGTH); - - //free - if(retval!=NULL) delete[] retval; + else{ + socket->SendDataOnly(retval,MAX_STR_LENGTH); + delete[] retval; + } //return ok/fail return ret;