From b9275646ad74d0ec3fb60d6a8f1c0aa2ec2287a9 Mon Sep 17 00:00:00 2001 From: Dhanya Maliakal Date: Thu, 27 Apr 2017 14:05:04 +0200 Subject: [PATCH] crazy amount of changes, both necessary and unnecessary;need to narrow down the real change later --- slsReceiverSoftware/include/genericSocket.h | 57 +++- .../src/UDPBaseImplementation.cpp | 101 ++++--- .../src/UDPStandardImplementation.cpp | 270 ++++++++++-------- 3 files changed, 256 insertions(+), 172 deletions(-) diff --git a/slsReceiverSoftware/include/genericSocket.h b/slsReceiverSoftware/include/genericSocket.h index 2f151de43..485e8903a 100644 --- a/slsReceiverSoftware/include/genericSocket.h +++ b/slsReceiverSoftware/include/genericSocket.h @@ -100,7 +100,6 @@ enum communicationProtocol{ genericSocket(const char* const host_ip_or_name, unsigned short int const port_number, communicationProtocol p, int ps = DEFAULT_PACKET_SIZE) : - // portno(port_number), protocol(p), is_a_server(0), socketDescriptor(-1), @@ -109,11 +108,15 @@ enum communicationProtocol{ nsending(0), nsent(0), total_sent(0),// sender (client): where to? ip - header_packet_size(0) + header_packet_size(0), + portno(port_number) { memset(&serverAddress, 0,sizeof(serverAddress)); memset(&clientAddress,0,sizeof(clientAddress)); // strcpy(hostname,host_ip_or_name); + memset(lastClientIP, 0, INET_ADDRSTRLEN); + memset(thisClientIP, 0, INET_ADDRSTRLEN); + memset(dummyClientIP, 0, INET_ADDRSTRLEN); strcpy(lastClientIP,"none"); strcpy(thisClientIP,"none1"); @@ -164,7 +167,6 @@ enum communicationProtocol{ */ genericSocket(unsigned short int const port_number, communicationProtocol p, int ps = DEFAULT_PACKET_SIZE, const char *eth=NULL, int hsize=0): - //portno(port_number), protocol(p), is_a_server(1), socketDescriptor(-1), @@ -173,7 +175,8 @@ enum communicationProtocol{ nsending(0), nsent(0), total_sent(0), - header_packet_size(hsize) + header_packet_size(hsize), + portno(port_number) { memset(&serverAddress, 0, sizeof(serverAddress)); @@ -181,7 +184,9 @@ enum communicationProtocol{ /* // you can specify an IP address: */ /* // or you can let it automatically select one: */ /* myaddr.sin_addr.s_addr = INADDR_ANY; */ - + memset(lastClientIP, 0, INET_ADDRSTRLEN); + memset(thisClientIP, 0, INET_ADDRSTRLEN); + memset(dummyClientIP, 0, INET_ADDRSTRLEN); strcpy(lastClientIP,"none"); strcpy(thisClientIP,"none1"); @@ -610,6 +615,7 @@ enum communicationProtocol{ break; case UDP: + if (socketDescriptor<0) return -1; //if length given, listens to length, else listens for packetsize till length is reached if(length){ @@ -632,12 +638,30 @@ enum communicationProtocol{ else{ //normal nsending=packet_size; + /* if(portno%2){ + cprintf(BLUE,"%d total_sent set to zero:%d\n",portno, total_sent);fflush(stdout); + }else{ + cprintf(GREEN,"%d total_sent set to zero:%d\n",portno, total_sent);fflush(stdout); + }*/ while(1){ nsent = recvfrom(socketDescriptor,(char*)buf+total_sent,nsending, 0, (struct sockaddr *) &clientAddress, &clientAddress_length); - if(nsent<=0 || nsent == packet_size) + /* if(portno%2){ + cprintf(BLUE,"%d nsent:%d total_sent:%d\n", portno, nsent, total_sent);fflush(stdout); + }else{ + cprintf(GREEN,"%d nsent:%d total_sent:%d\n", portno, nsent, total_sent);fflush(stdout); + }*/ + if((nsent<=0) || (nsent == packet_size)) { + + /* if(portno%2){ + cprintf(BLUE,"%d breaking out of loop %d\n",portno, nsent);fflush(stdout); + }else{ + cprintf(GREEN,"%d breaking out of loop %d\n",portno, nsent);fflush(stdout); + }*/ break; - if(nsent != packet_size && nsent != header_packet_size) - cprintf(RED,"Incomplete Packet size %d\n",nsent); + } + if(nsent != packet_size && nsent != header_packet_size){ + cprintf(RED,"%d Incomplete Packet size %d\n",portno, nsent);fflush(stdout); + } } //nsent = 1040; total_sent+=nsent; @@ -649,8 +673,13 @@ enum communicationProtocol{ #ifdef VERY_VERBOSE cout << "sent "<< total_sent << " Bytes" << endl; #endif - - + /*if(protocol == UDP){ + if(portno%2){ + cprintf(BLUE,"%d exiting total sent %d\n",portno, total_sent);fflush(stdout); + }else{ + cprintf(GREEN,"%d exiting total sent %d\n",portno, total_sent);fflush(stdout); + } + }*/ return total_sent; @@ -723,11 +752,11 @@ enum communicationProtocol{ private: - - int nsending; - int nsent; - int total_sent; + volatile int nsending; + volatile int nsent; + volatile int total_sent; int header_packet_size; + const int portno; // pthread_mutex_t mp; diff --git a/slsReceiverSoftware/src/UDPBaseImplementation.cpp b/slsReceiverSoftware/src/UDPBaseImplementation.cpp index 2235e480d..c5c706ce1 100644 --- a/slsReceiverSoftware/src/UDPBaseImplementation.cpp +++ b/slsReceiverSoftware/src/UDPBaseImplementation.cpp @@ -35,7 +35,7 @@ void UDPBaseImplementation::initializeMembers(){ //**detector parameters*** myDetectorType = GENERIC; - strcpy(detHostname,""); + memset(detHostname,0,MAX_STR_LENGTH); packetsPerFrame = 0; acquisitionPeriod = 0; acquisitionTime = 0; @@ -51,14 +51,15 @@ void UDPBaseImplementation::initializeMembers(){ activated = true; //***connection parameters*** - strcpy(eth,""); + memset(eth,0,MAX_STR_LENGTH); for(int i=0;iShutDownSocket(); char cstreambuf[MAX_STR_LENGTH]; memset(cstreambuf, 0, MAX_STR_LENGTH); - sprintf(cstreambuf, "Shut down UDP Sock %d ", i); + snprintf(cstreambuf, MAX_STR_LENGTH, "Shut down UDP Sock %d ", i); FILE_LOG(logINFO, cstreambuf); delete udpSocket[i]; @@ -1267,7 +1279,7 @@ void UDPStandardImplementation::closeFile(int ithread){ if(!dataCompressionEnable){ if(sfilefd[ithread]){ char cstreambuf[MAX_STR_LENGTH]; memset(cstreambuf, 0, MAX_STR_LENGTH); - sprintf(cstreambuf, "Going to close file: %d ", fileno(sfilefd[ithread])); + snprintf(cstreambuf, MAX_STR_LENGTH, "Going to close file: %d ", fileno(sfilefd[ithread])); FILE_LOG(logDEBUG4, cstreambuf); fflush(sfilefd[ithread]); @@ -1281,7 +1293,7 @@ void UDPStandardImplementation::closeFile(int ithread){ #if (defined(MYROOT1) && defined(ALLFILE_DEBUG)) || !defined(MYROOT1) if(sfilefd[0]){ char cstreambuf[MAX_STR_LENGTH]; memset(cstreambuf, 0, MAX_STR_LENGTH); - sprintf(cstreambuf, "sfilefd: %d ", fileno(sfilefd[0])); + snprintf(cstreambuf, MAX_STR_LENGTH, "sfilefd: %d ", fileno(sfilefd[0])); FILE_LOG(logDEBUG4, cstreambuf); fclose(sfilefd[0]); @@ -1324,7 +1336,7 @@ int UDPStandardImplementation::setActivate(int enable){ if(enable != -1){ activated = enable; char cstreambuf[MAX_STR_LENGTH]; memset(cstreambuf, 0, MAX_STR_LENGTH); - sprintf(cstreambuf, "Activation: %s ", stringEnable(activated).c_str()); + snprintf(cstreambuf, MAX_STR_LENGTH, "Activation: %s ", stringEnable(activated).c_str()); FILE_LOG(logINFO, cstreambuf); } @@ -1382,7 +1394,7 @@ int UDPStandardImplementation::createDataCallbackThreads(bool destroy){ currentThreadIndex = i; if(pthread_create(&dataCallbackThreads[i], NULL,startDataCallbackThread, (void*) this)){ char cstreambuf[MAX_STR_LENGTH]; memset(cstreambuf, 0, MAX_STR_LENGTH); - sprintf(cstreambuf, "Could not create data call back thread with index %d ", i); + snprintf(cstreambuf, MAX_STR_LENGTH, "Could not create data call back thread with index %d ", i); FILE_LOG(logERROR, cstreambuf); return FAIL; } @@ -1448,7 +1460,7 @@ int UDPStandardImplementation::createListeningThreads(bool destroy){ currentThreadIndex = i; if(pthread_create(&listeningThreads[i], NULL,startListeningThread, (void*) this)){ char cstreambuf[MAX_STR_LENGTH]; memset(cstreambuf, 0, MAX_STR_LENGTH); - sprintf(cstreambuf, "Could not create listening thread with index %d ", i); + snprintf(cstreambuf, MAX_STR_LENGTH, "Could not create listening thread with index %d ", i); FILE_LOG(logERROR, cstreambuf); return FAIL; } @@ -1514,7 +1526,7 @@ int UDPStandardImplementation::createWriterThreads(bool destroy){ currentThreadIndex = i; if(pthread_create(&writingThreads[i], NULL,startWritingThread, (void*) this)){ char cstreambuf[MAX_STR_LENGTH]; memset(cstreambuf, 0, MAX_STR_LENGTH); - sprintf(cstreambuf, "Could not create writer thread with index %d ", i); + snprintf(cstreambuf, MAX_STR_LENGTH, "Could not create writer thread with index %d ", i); FILE_LOG(logERROR, cstreambuf); return FAIL; } @@ -1595,8 +1607,9 @@ int UDPStandardImplementation::createUDPSockets(){ //if eth is mistaken with ip address - if (strchr(eth,'.') != NULL) - strcpy(eth,""); + if (strchr(eth,'.') != NULL){ + memset(eth,0,MAX_STR_LENGTH); + } shutDownUDPSockets(); int headerpacketsize = 0; @@ -1614,7 +1627,7 @@ int UDPStandardImplementation::createUDPSockets(){ else{ { char cstreambuf[MAX_STR_LENGTH]; memset(cstreambuf, 0, MAX_STR_LENGTH); - sprintf(cstreambuf, "Ethernet Interface: %s", eth); + snprintf(cstreambuf, MAX_STR_LENGTH, "Ethernet Interface: %s", eth); FILE_LOG(logINFO, cstreambuf); } for(int i=0;iinitEventTree(temp, &iframe); //resets the pedestalSubtraction array and the commonModeSubtraction singlePhotonDetectorObject[ithread]->newDataSet(); @@ -1878,9 +1893,10 @@ void UDPStandardImplementation::startDataCallback(){ struct timespec begin,end; // server address to bind - char hostName[100] = "tcp://*:";//"tcp://127.0.0.1:"; + char hostName[100]; + memset(hostName,0,100); int portno = DEFAULT_ZMQ_PORTNO + (detID*numberofListeningThreads+ithread); - sprintf(hostName,"%s%d",hostName,portno); + snprintf(hostName,100,"tcp://*:%d",portno);//"tcp://127.0.0.1:"; //socket details void *context = zmq_ctx_new(); @@ -1893,7 +1909,7 @@ void UDPStandardImplementation::startDataCallback(){ // bind { char cstreambuf[MAX_STR_LENGTH]; memset(cstreambuf, 0, MAX_STR_LENGTH); - sprintf(cstreambuf, "Thread %d: ZMQ Server at %s ", ithread, hostName); + snprintf(cstreambuf, MAX_STR_LENGTH, "Thread %d: ZMQ Server at %s ", ithread, hostName); FILE_LOG(logINFO, cstreambuf); } @@ -1905,19 +1921,18 @@ void UDPStandardImplementation::startDataCallback(){ //infinite loop, exited only to change dynamic range, 10G parameters etc (then recreated again) while(true){ - int oneframesize = oneDataSize * packetsPerFrame; - char* buffer = new char[packetsPerFrame*oneDataSize](); - memset(buffer,0xFF,oneframesize); bool randomSendNow = true; //header details const char *jsonFmt ="{" - "\"jsonversion\":%u, " - "\"acqIndex\":%llu, " - "\"fIndex\":%llu, " - "\"bitmode\":%d, " - "\"shape\":[%d, %d], " - "\"fname\":\"%s\", " + "\"jsonversion\":%u, " + "\"acqIndex\":%llu, " + "\"fIndex\":%llu, " + "\"bitmode\":%d, " + "\"shapex\": %d, " + "\"shapey\": %d, " + "\"fname\":\"%s\", " + "\"data\": %d, " "\"frameNumber\":%llu, " "\"expLength\":%u, " @@ -1932,15 +1947,14 @@ void UDPStandardImplementation::startDataCallback(){ "\"roundRNumber\":%u, " "\"detType\":%u, " "\"version\":%u" - "}\n"; + + "}\n\0"; int npixelsx=0, npixelsy=0; switch(myDetectorType) { case JUNGFRAU: npixelsx = JFRAU_PIXELS_IN_ONE_ROW; npixelsy = JFRAU_PIXELS_IN_ONE_COL; break; case EIGER: npixelsx = EIGER_PIXELS_IN_ONE_ROW; npixelsy = EIGER_PIXELS_IN_ONE_COL; break; default:break; /* will not work for other detectors*/ } - uint64_t acquisitionIndex = -1; - uint64_t frameIndex = -1; #ifdef DEBUG int oldpnum = -1; #endif @@ -1963,19 +1977,22 @@ void UDPStandardImplementation::startDataCallback(){ cprintf(BLUE,"%d sending dummy\n"); #endif - frameIndex = -1; - acquisitionIndex = -1; { - char buf[1000]="";memset(buf,0,1000); - sprintf(buf,jsonFmt, - SLS_DETECTOR_JSON_HEADER_VERSION, acquisitionIndex, frameIndex, dynamicRange, npixelsx, npixelsy,completeFileName[ithread], - 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0); + char buf[MAX_STR_LENGTH]="";memset(buf,0xFF,1000); + int len = snprintf(buf,MAX_STR_LENGTH, jsonFmt, + SLS_DETECTOR_JSON_HEADER_VERSION,0,0,0,0,0, "", 0, + 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0); + //cprintf(BLUE,"%d:Dummy:%s.\n",ithread, buf);fflush(stdout); + zmq_send_const(zmqsocket, buf, len, 0);//ZMQ_SNDMORE); - zmq_send(zmqsocket, buf,1000, 0); } + cprintf(BLUE,"%d dummy\n",ithread); + /* { //send final data - zmq_send (zmqsocket, "end\n", 4, 0); - + char buf[MAX_STR_LENGTH]="";memset(buf,0xFF,0); + int len = snprintf(buf,MAX_STR_LENGTH,"%s","end\n"); + zmq_send_const(zmqsocket, buf,len, 0); + }*/ pthread_mutex_lock(&statusMutex); dataCallbackThreadsMask^=(1<frameNumber; - frameIndex = fnum - startFrameIndex; - acquisitionIndex = fnum - startAcquisitionIndex; - { - char buf[1000]="";memset(buf,0,1000); - sprintf(buf,jsonFmt, - SLS_DETECTOR_JSON_HEADER_VERSION, acquisitionIndex, frameIndex, dynamicRange, npixelsx, npixelsy,completeFileName[ithread], - header->frameNumber, header->expLength, header->packetNumber, header->bunchId, header->timestamp, - header->modId, header->xCoord, header->yCoord, header->zCoord, header->debug, header->roundRNumber, header->detType, header->version); + uint64_t frameIndex = fnum - startFrameIndex; + uint64_t acquisitionIndex = fnum - startAcquisitionIndex; - zmq_send(zmqsocket, buf,1000, 0); + { + char buf[MAX_STR_LENGTH]="";memset(buf,0xFF,1000); + int len = snprintf(buf,MAX_STR_LENGTH,jsonFmt, + SLS_DETECTOR_JSON_HEADER_VERSION, acquisitionIndex, frameIndex, dynamicRange, npixelsx, npixelsy,completeFileName[ithread],1, + // 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0); + + + header->frameNumber, header->expLength, header->packetNumber, header->bunchId, header->timestamp, + header->modId, header->xCoord, header->yCoord, header->zCoord, header->debug, header->roundRNumber, header->detType, header->version + ); + //cprintf(BLUE,"%d:%s.\n",ithread, buf);fflush(stdout); + zmq_send_const(zmqsocket, buf,len, ZMQ_SNDMORE); } //send data - zmq_send(zmqsocket, (latestData[ithread]+sizeof(sls_detector_header)), bufferSize, 0); + zmq_send_const(zmqsocket,(latestData[ithread]+sizeof(sls_detector_header)), bufferSize, 0); //start clock after sending if(!frameToGuiFrequency){ randomSendNow = false; @@ -2039,8 +2061,6 @@ void UDPStandardImplementation::startDataCallback(){ }/*--end of loop for each buffer (inner loop)*/ - //free resources - delete[] buffer; //end of acquisition, wait for next acquisition/change of parameters sem_wait(&dataCallbackSemaphore[ithread]); @@ -2088,6 +2108,7 @@ void UDPStandardImplementation::startListening(){ carryonBufferSize = 0; if(tempBuffer){delete []tempBuffer;tempBuffer=0;} tempBuffer = new char[onePacketSize * (packetsPerFrame - 1)](); //store maximum of 1 packets less in a frame + memset(tempBuffer,0,onePacketSize * (packetsPerFrame - 1)); } /* inner loop - loop for each buffer */ @@ -2102,7 +2123,7 @@ void UDPStandardImplementation::startListening(){ if(activated && !udpSocket[ithread]){ { char cstreambuf[MAX_STR_LENGTH]; memset(cstreambuf, 0, MAX_STR_LENGTH); - sprintf(cstreambuf, "Listening_Thread %d: UDP Socket not created or shut down earlier ", ithread); + snprintf(cstreambuf, MAX_STR_LENGTH, "Listening_Thread %d: UDP Socket not created or shut down earlier ", ithread); FILE_LOG(logERROR, cstreambuf); } stopListening(ithread,0); @@ -2249,6 +2270,11 @@ int UDPStandardImplementation::prepareAndListenBufferCompleteFrames(int ithread) //read first packet pnum = FIRSTPNUM; //first packet number to validate if(status != TRANSMITTING) rc = udpSocket[ithread]->ReceiveDataOnly(buffer[ithread] + offset); + /*if(!ithread){ + cprintf(BLUE,"%d first rc:%d\n",ithread, rc);fflush(stdout); + }else{ + cprintf(GREEN,"%d first rc:%d\n",ithread, rc);fflush(stdout); + }*/ if(rc <= 0) return 0; if(getFrameandPacketNumber(ithread,buffer[ithread] + offset,fi,pi,si,bi) == FAIL){ pi = ALL_MASK_32; //got 0 from fpga @@ -2276,7 +2302,7 @@ int UDPStandardImplementation::prepareAndListenBufferCompleteFrames(int ithread) if(!ithread) cout << "correct packet" << endl; #endif //copy only data - memcpy(buffer[ithread] + offset,buffer[ithread] + offset + headerlength, oneDataSize); + memmove(buffer[ithread] + offset,buffer[ithread] + offset + headerlength, oneDataSize); offset+=oneDataSize; //if complete frame @@ -2364,13 +2390,21 @@ int UDPStandardImplementation::prepareAndListenBufferCompleteFrames(int ithread) header->detType = (uint8_t) myDetectorType; header->version = (uint8_t) SLS_DETECTOR_HEADER_VERSION; -#ifdef VERBOSE - if(!ithread) + //#ifdef VERBOSE + //if(!ithread) + + /*if(!ithread) { cprintf(BLUE, - "framenumber:%lu\tsubfnum:%u\tpnum:%u\tbunchid:%lu\txcoord:%u\tdettype:%u\tversion:%u\n", - header->frameNumber, header->expLength, header->packetNumber, - header->bunchId, header->xCoord, header->detType, header->version); -#endif + "%d framenumber:%lu\tsubfnum:%u\tpnum:%u\tbunchid:%lu\txcoord:%u\tdettype:%u\tversion:%u\n", + ithread, header->frameNumber, header->expLength, header->packetNumber, + header->bunchId, header->xCoord, header->detType, header->version);fflush(stdout); + }else{ + cprintf(GREEN, + "%d framenumber:%lu\tsubfnum:%u\tpnum:%u\tbunchid:%lu\txcoord:%u\tdettype:%u\tversion:%u\n", + ithread, header->frameNumber, header->expLength, header->packetNumber, + header->bunchId, header->xCoord, header->detType, header->version);fflush(stdout); + }*/ + //#endif //write packet count to buffer *((uint32_t*)(buffer[ithread])) = packetsPerFrame; @@ -2433,7 +2467,7 @@ void UDPStandardImplementation::stopListening(int ithread, int numbytes){ //free empty buffer if(numbytes <= 0){ char cstreambuf[MAX_STR_LENGTH]; memset(cstreambuf, 0, MAX_STR_LENGTH); - sprintf(cstreambuf, "Listening %d: End of Acquisition", ithread); + snprintf(cstreambuf, MAX_STR_LENGTH, "Listening %d: End of Acquisition", ithread); FILE_LOG(logINFO, cstreambuf); while(!fifoFree[ithread]->push(buffer[ithread])); } @@ -2470,7 +2504,7 @@ void UDPStandardImplementation::stopListening(int ithread, int numbytes){ //#ifdef DEBUG4 { char cstreambuf[MAX_STR_LENGTH]; memset(cstreambuf, 0, MAX_STR_LENGTH); - sprintf(cstreambuf, "Listening Thread of %u got %d packets ", udpPortNum[ithread], totalListeningPacketCount[ithread]); + snprintf(cstreambuf, MAX_STR_LENGTH, "Listening Thread of %u got %d packets ", udpPortNum[ithread], totalListeningPacketCount[ithread]); FILE_LOG(logINFO, cstreambuf); } //#endif @@ -2667,6 +2701,7 @@ void UDPStandardImplementation::waitWritingBufferForNextAcquisition(int ithread) //create file if((1<subFrameNumber); + snprintf(cstreambuf, MAX_STR_LENGTH, "Fifo %d: Frame Number is zero from firmware. subnum: %u, fnum:%u, pnum:%u ", ithread, subframenumber, + (uint32_t)(*( (uint64_t*) footer)),(*( (uint16_t*) footer->packetNumber))-1); FILE_LOG(logERROR, cstreambuf); - return FAIL; } packetnumber = (*( (uint16_t*) footer->packetNumber))-1;