diff --git a/slsReceiverSoftware/include/circularFifo.h b/slsReceiverSoftware/include/circularFifo.h index 1ba584cc4..4234a4d5d 100644 --- a/slsReceiverSoftware/include/circularFifo.h +++ b/slsReceiverSoftware/include/circularFifo.h @@ -78,8 +78,10 @@ int CircularFifo::getSemValue() template bool CircularFifo::push(Element*& item_) { - + //cout<<"*head:"<::push(Element*& item_) template bool CircularFifo::pop(Element*& item_) { - // if(head == tail) - // return false; // empty queue + //cout<<"-tail:"<0){ - nsending = (length>packet_size) ? packet_size:length; - nsent = read(file_des,(char*)buf+total_sent,nsending); - if(!nsent) break; - length-=nsent; - total_sent+=nsent; - } + if (buf==NULL) return -1; - if (total_sent>0) - strcpy(thisClientIP,dummyClientIP); - if (strcmp(lastClientIP,thisClientIP)) - differentClients=1; - else - differentClients=0; + total_sent=0; - break; - case UDP: - if (socketDescriptor<0) return -1; + switch(protocol) { + case TCP: + if (file_des<0) return -1; + while(length>0){ + nsending = (length>packet_size) ? packet_size:length; + nsent = read(file_des,(char*)buf+total_sent,nsending); + if(!nsent) break; + length-=nsent; + total_sent+=nsent; + } - //if length given, listens to length, else listens for packetsize till length is reached - if(length){ -/*int k = 0;*/ + if (total_sent>0) + strcpy(thisClientIP,dummyClientIP); - while(length>0){ - nsending = (length>packet_size) ? packet_size:length; -/* + if (strcmp(lastClientIP,thisClientIP)) + differentClients=1; + else + differentClients=0; + + break; + case UDP: + if (socketDescriptor<0) return -1; + + //if length given, listens to length, else listens for packetsize till length is reached + if(length){ + /*int k = 0;*/ + + while(length>0){ + nsending = (length>packet_size) ? packet_size:length; + /* //created for debugging on 11.05.2015 nsending=5000; nsent = recvfrom(socketDescriptor,(char*)buf,nsending, 0, (struct sockaddr *) &clientAddress, &clientAddress_length); @@ -632,33 +632,32 @@ typedef struct } else k++; - */ - - - nsent = recvfrom(socketDescriptor,(char*)buf+total_sent,nsending, 0, (struct sockaddr *) &clientAddress, &clientAddress_length); - if(!nsent) break; - length-=nsent; - total_sent+=nsent; - } - } - //listens to only 1 packet - else{ - //normal - nsending=packet_size; - nsent = recvfrom(socketDescriptor,(char*)buf+total_sent,nsending, 0, (struct sockaddr *) &clientAddress, &clientAddress_length); - total_sent+=nsent; - } - break; - default: - ; - } + */ + nsent = recvfrom(socketDescriptor,(char*)buf+total_sent,nsending, 0, (struct sockaddr *) &clientAddress, &clientAddress_length); + if(!nsent) break; + length-=nsent; + total_sent+=nsent; + } + } + //listens to only 1 packet + else{ + //normal + nsending=packet_size; + nsent = recvfrom(socketDescriptor,(char*)buf+total_sent,nsending, 0, (struct sockaddr *) &clientAddress, &clientAddress_length); + //nsent = 1040; + total_sent+=nsent; + } + break; + default: + ; + } #ifdef VERY_VERBOSE - cout << "sent "<< total_sent << " Bytes" << endl; + cout << "sent "<< total_sent << " Bytes" << endl; #endif - return total_sent; - + return total_sent; + } @@ -714,21 +713,12 @@ typedef struct protected: communicationProtocol protocol; - - - int is_a_server; - - int socketDescriptor; int file_des; - int packet_size; - struct sockaddr_in clientAddress, serverAddress; socklen_t clientAddress_length; - - char dummyClientIP[INET_ADDRSTRLEN]; diff --git a/slsReceiverSoftware/include/receiver_defs.h b/slsReceiverSoftware/include/receiver_defs.h index bac0df2d6..c11fe876d 100755 --- a/slsReceiverSoftware/include/receiver_defs.h +++ b/slsReceiverSoftware/include/receiver_defs.h @@ -111,7 +111,7 @@ #define EIGER_MAX_PORTS 2 #define EIGER_HEADER_LENGTH 48 -#define EIGER_FIFO_SIZE 250 //cannot be less than max jobs per thread = 1000 +#define EIGER_FIFO_SIZE 100 /*#define EIGER_ALIGNED_FRAME_SIZE 65536*/ #define EIGER_ONE_GIGA_CONSTANT 16 #define EIGER_TEN_GIGA_CONSTANT 4 diff --git a/slsReceiverSoftware/src/UDPStandardImplementation.cpp b/slsReceiverSoftware/src/UDPStandardImplementation.cpp index 8d74dfa90..68ba62a68 100644 --- a/slsReceiverSoftware/src/UDPStandardImplementation.cpp +++ b/slsReceiverSoftware/src/UDPStandardImplementation.cpp @@ -309,7 +309,6 @@ int UDPStandardImplementation::setupFifoStructure(){ return OK; - int count = 0; //set up fifo structure for(int i=0;ipush(buffer[i]); -//#ifdef DEBUG5 - - if(count==0 || count == 127998) + sprintf(buffer[i],"mem%d",i); +#ifdef DEBUG5 cprintf(BLUE,"Info: %d fifostructure free pushed into fifofree %p\n", i, (void*)(buffer[i])); -//#endif +#endif buffer[i] += (bufferSize * numberofJobsPerBuffer + HEADER_SIZE_NUM_TOT_PACKETS); - count++; } } FILE_LOG(logDEBUG) << "Info: Fifo structure(s) reconstructed"; @@ -1461,6 +1457,7 @@ void UDPStandardImplementation::startListening(){ //until mask unset (udp sockets shut down by client) while((1 << ithread) & listeningThreadsMask){ + //pop from fifo fifoFree[ithread]->pop(buffer[ithread]); #ifdef CFIFODEBUG @@ -1479,6 +1476,7 @@ void UDPStandardImplementation::startListening(){ rc = prepareAndListenBuffer(ithread, listenSize, carryonBufferSize, tempBuffer); + //start indices for each start of scan/acquisition if((!measurementStarted) && (rc > 0)){ pthread_mutex_lock(&progressMutex); @@ -1487,12 +1485,12 @@ void UDPStandardImplementation::startListening(){ pthread_mutex_unlock(&progressMutex); } + //problem in receiving or end of acquisition if (status == TRANSMITTING){ stopListening(ithread,rc); continue; } - //write packet count to buffer if(myDetectorType == EIGER) (*((uint32_t*)(buffer[ithread]))) = 1; @@ -1503,6 +1501,7 @@ void UDPStandardImplementation::startListening(){ //push buffer to FIFO while(!fifo[ithread]->push(buffer[ithread])); + #ifdef CFIFODEBUG if(ithread == 0) cprintf(CYAN,"Listening_Thread %d: Listener pushed into fifo %p\n",ithread, (void*)(buffer[ithread])); @@ -1511,6 +1510,7 @@ void UDPStandardImplementation::startListening(){ #endif + }/*--end of loop for each buffer (inner loop)*/ //end of acquisition, wait for next acquisition/change of parameters @@ -1537,6 +1537,7 @@ int UDPStandardImplementation::prepareAndListenBuffer(int ithread, int lSize, in //listen to UDP packets if(cSize) memcpy(buffer[ithread] + HEADER_SIZE_NUM_TOT_PACKETS, temp, cSize); + int receivedSize = udpSocket[ithread]->ReceiveDataOnly(buffer[ithread] + HEADER_SIZE_NUM_TOT_PACKETS + cSize, lSize + cSize); //throw away packets that is not one packet size, need to check status if socket is shut down @@ -1550,6 +1551,7 @@ int UDPStandardImplementation::prepareAndListenBuffer(int ithread, int lSize, in receivedSize = udpSocket[ithread]->ReceiveDataOnly(buffer[ithread] + HEADER_SIZE_NUM_TOT_PACKETS); } + #ifdef MANUALDEBUG eiger_packet_header_t* header = (eiger_packet_header_t*) (buffer[ithread]+HEADER_SIZE_NUM_TOT_PACKETS); eiger_packet_footer_t* footer = (eiger_packet_footer_t*)(buffer[ithread] + footerOffset + HEADER_SIZE_NUM_TOT_PACKETS); @@ -1961,7 +1963,6 @@ void UDPStandardImplementation::processWritingBufferPacketByPacket(int ithread){ //until mask unset (udp sockets shut down by client) while((1 << ithread) & writerThreadsMask){ - //pop fifo and if end of acquisition if(popAndCheckEndofAcquisition(ithread, packetBuffer, popReady, numPackets,fifoTempFree)){ #ifdef DEBUG4 @@ -2109,10 +2110,10 @@ void UDPStandardImplementation::processWritingBufferPacketByPacket(int ithread){ currentFrameNumber = presentFrameNumber; numTotMissingPacketsInFile += numMissingPackets; numTotMissingPackets += numMissingPackets; -//#ifdef FNUM_DEBUG +#ifdef FNUM_DEBUG cprintf(GREEN,"**fnum:%d**\n",currentFrameNumber); -//#endif -//#ifdef MISSINGP_DEBUG +#endif +#ifdef MISSINGP_DEBUG if(numMissingPackets){ cprintf(RED, "Total missing packets %d for fnum %d\n",numMissingPackets,currentFrameNumber); for (int j=0;jisEmpty()){ fifoTempFree[i]->pop(temp); fifoFree[i]->push(temp); - count++; #ifdef CFIFODEBUG if(i==0) - cprintf(CYAN,"Fifo %d: %d Writing_Thread freed: pushed into fifofree %p\n",i,count, (void*)(temp)); + cprintf(CYAN,"Fifo %d: Writing_Thread freed: pushed into fifofree %p\n",i, (void*)(temp)); else - cprintf(YELLOW,"Fifo %d: %d Writing_Thread freed: pushed into fifofree %p\n",i, count,(void*)(temp)); + cprintf(YELLOW,"Fifo %d: Writing_Thread freed: pushed into fifofree %p\n",i, (void*)(temp)); #endif } }