diff --git a/slsReceiverSoftware/MySocketTCP/genericSocket.h b/slsReceiverSoftware/MySocketTCP/genericSocket.h index 28ae7b650..59126cf67 100644 --- a/slsReceiverSoftware/MySocketTCP/genericSocket.h +++ b/slsReceiverSoftware/MySocketTCP/genericSocket.h @@ -72,7 +72,6 @@ class sockaddr_in; using namespace std; #define DEFAULT_PACKET_SIZE 1286 -#define DEFAULT_PACKETS_PER_FRAME 2 #define SOCKET_BUFFER_SIZE (100*1024*1024) //100MB #define DEFAULT_PORTNO 1952 #define DEFAULT_BACKLOG 5 @@ -91,8 +90,7 @@ enum communicationProtocol{ UDP /**< UDP */ }; - - genericSocket(const char* const host_ip_or_name, unsigned short int const port_number, communicationProtocol p, int ps = DEFAULT_PACKET_SIZE, int t = DEFAULT_PACKETS_PER_FRAME) : + genericSocket(const char* const host_ip_or_name, unsigned short int const port_number, communicationProtocol p, int ps = DEFAULT_PACKET_SIZE) : // portno(port_number), protocol(p), is_a_server(0), @@ -101,8 +99,7 @@ enum communicationProtocol{ packet_size(ps), nsending(0), nsent(0), - total_sent(0), - packets_per_frame(t)// sender (client): where to? ip + total_sent(0)// sender (client): where to? ip { // strcpy(hostname,host_ip_or_name); @@ -149,7 +146,7 @@ enum communicationProtocol{ */ - genericSocket(unsigned short int const port_number, communicationProtocol p, int ps = DEFAULT_PACKET_SIZE, int t = DEFAULT_PACKETS_PER_FRAME, const char *eth=NULL): + genericSocket(unsigned short int const port_number, communicationProtocol p, int ps = DEFAULT_PACKET_SIZE, const char *eth=NULL): //portno(port_number), protocol(p), is_a_server(1), @@ -158,8 +155,7 @@ enum communicationProtocol{ packet_size(ps), nsending(0), nsent(0), - total_sent(0), - packets_per_frame(t) + total_sent(0) { /* // you can specify an IP address: */ @@ -562,25 +558,23 @@ enum communicationProtocol{ break; case UDP: if (socketDescriptor<0) return -1; - //if length given + //if length given, listens to length, else listens for packetsize till length is reached if(length){ while(length>0){ - nsending=packet_size; + nsending = (length>packet_size) ? packet_size:length; nsent = recvfrom(socketDescriptor,(char*)buf+total_sent,nsending, 0, (struct sockaddr *) &clientAddress, &clientAddress_length); if(!nsent) break; length-=nsent; total_sent+=nsent; } } - //depends on packets per frame + //listens to only 1 packet else{ - for(int i=0;ishutDownUDPSocket(); + slsReceiverFunctions->shutDownUDPSockets(); cout << "Closing Files... " << endl; slsReceiverFunctions->closeFile(); @@ -250,7 +250,7 @@ void slsReceiverTCPIPInterface::startTCPServer(){ if(v==GOODBYE){ cout << "Shutting down UDP Socket" << endl; if(slsReceiverFunctions) - slsReceiverFunctions->shutDownUDPSocket(); + slsReceiverFunctions->shutDownUDPSockets(); cout << "Closing Files... " << endl; slsReceiverFunctions->closeFile(); diff --git a/slsReceiverSoftware/slsReceiver/slsReceiverUDPFunctions.cpp b/slsReceiverSoftware/slsReceiver/slsReceiverUDPFunctions.cpp index b30d58aa2..18ba264a0 100644 --- a/slsReceiverSoftware/slsReceiver/slsReceiverUDPFunctions.cpp +++ b/slsReceiverSoftware/slsReceiver/slsReceiverUDPFunctions.cpp @@ -1,4 +1,4 @@ -#ifdef SLS_RECEIVER_UDP_FUNCTIONS +/*#ifdef SLS_RECEIVER_UDP_FUNCTIONS*/ /********************************************//** * @file slsReceiverUDPFunctions.cpp * @short does all the functions for a receiver, set/get parameters, start/stop etc. @@ -31,17 +31,19 @@ using namespace std; slsReceiverUDPFunctions::slsReceiverUDPFunctions(): receiver(NULL), - server_port(DEFAULT_UDP_PORTNO), thread_started(0), - udpSocket(NULL), eth(NULL), latestData(NULL), - guiFileName(NULL), - mem0(NULL), - fifo(NULL), - fifoFree(NULL){ + guiFileName(NULL){ + for(int i=0;iisEmpty()) - fifoFree->pop(buffer); - delete fifoFree; - } - if(fifo) delete fifo; - if(mem0) free(mem0); - fifoFree = new CircularFifo(fifosize); - fifo = new CircularFifo(fifosize); + for(int i=0;iisEmpty()) + fifoFree[i]->pop(buffer[i]); + delete fifoFree[i]; + } + if(fifo[i]) delete fifo[i]; + if(mem0[i]) free(mem0[i]); + fifoFree[i] = new CircularFifo(fifosize); + fifo[i] = new CircularFifo(fifosize); - //allocate memory - mem0=(char*)malloc((bufferSize * numJobsPerThread + HEADER_SIZE_NUM_TOT_PACKETS)*fifosize); - /** shud let the client know about this */ - if (mem0==NULL){ - cout<<"++++++++++++++++++++++ COULD NOT ALLOCATE MEMORY FOR LISTENING !!!!!!!+++++++++++++++++++++" << endl; - exit(-1); + //allocate memory + mem0[i]=(char*)malloc((bufferSize * numJobsPerThread + HEADER_SIZE_NUM_TOT_PACKETS)*fifosize); + /** shud let the client know about this */ + if (mem0[i]==NULL){ + cout<<"++++++++++++++++++++++ COULD NOT ALLOCATE MEMORY FOR LISTENING !!!!!!!+++++++++++++++++++++" << endl; + exit(-1); + } + buffer[i]=mem0[i]; + //push the addresses into freed fifoFree and writingFifoFree + while (buffer[i]<(mem0[i]+(bufferSize * numJobsPerThread + HEADER_SIZE_NUM_TOT_PACKETS)*(fifosize-1))) { + fifoFree[i]->push(buffer[i]); + buffer[i]+=(bufferSize * numJobsPerThread + HEADER_SIZE_NUM_TOT_PACKETS); + } } - buffer=mem0; - //push the addresses into freed fifoFree and writingFifoFree - while (buffer<(mem0+(bufferSize * numJobsPerThread + HEADER_SIZE_NUM_TOT_PACKETS)*(fifosize-1))) { - fifoFree->push(buffer); - buffer+=(bufferSize * numJobsPerThread + HEADER_SIZE_NUM_TOT_PACKETS); - } - - cout << "Fifo structure reconstructed" << endl; + cout << "Fifo structure(s) reconstructed" << endl; } @@ -792,7 +820,6 @@ void slsReceiverUDPFunctions::readFrame(char* c,char** raw){ void slsReceiverUDPFunctions::copyFrameToGui(char* startbuf){ - //random read when gui not ready if((!nFrameToGui) && (!guiData)){ pthread_mutex_lock(&dataReadyMutex); @@ -809,7 +836,8 @@ void slsReceiverUDPFunctions::copyFrameToGui(char* startbuf){ pthread_mutex_lock(&dataReadyMutex); guiDataReady=0; //send the first one - memcpy(latestData,startbuf,bufferSize); + for(int i=0;igetErrorStatus(); - if (iret){ + int iret; + for(int i=0;igetErrorStatus(); + if(iret){ #ifdef VERBOSE - cout << "Could not create UDP socket on port " << server_port << " error:" << iret << endl; + cout << "Could not create UDP socket on port " << server_port[i] << " error:" << iret << endl; #endif - - return FAIL; + return FAIL; + } } + return OK; } @@ -858,11 +893,13 @@ int slsReceiverUDPFunctions::createUDPSocket(){ -int slsReceiverUDPFunctions::shutDownUDPSocket(){ - if(udpSocket){ - udpSocket->ShutDownSocket(); - delete udpSocket; - udpSocket = NULL; +int slsReceiverUDPFunctions::shutDownUDPSockets(){ + for(int i=0;iShutDownSocket(); + delete udpSocket[i]; + udpSocket[i] = NULL; + } } return OK; } @@ -872,37 +909,53 @@ int slsReceiverUDPFunctions::shutDownUDPSocket(){ int slsReceiverUDPFunctions::createListeningThreads(bool destroy){ + int i; void* status; - killListeningThread = 0; + killAllListeningThreads = 0; pthread_mutex_lock(&status_mutex); - listening_thread_running = 0; + listeningthreads_mask = 0x0; pthread_mutex_unlock(&(status_mutex)); if(!destroy){ - //listening thread - cout << "Creating Listening Thread" << endl; - sem_init(&listensmp,1,0); - if(pthread_create(&listening_thread, NULL,startListeningThread, (void*) this)){ - cout << "Could not create listening thread" << endl; - return FAIL; + + //start listening threads + cout << "Creating Listening Threads(s)"; + + currentListeningThreadIndex = -1; + + for(i = 0; i < numListeningThreads; ++i){ + sem_init(&listensmp[i],1,0); + thread_started = 0; + currentListeningThreadIndex = i; + if(pthread_create(&listening_thread[i], NULL,startListeningThread, (void*) this)){ + cout << "Could not create listening thread with index " << i << endl; + return FAIL; + } + while(!thread_started); + cout << "."; + cout << flush; } #ifdef VERBOSE - cout << "Listening thread created successfully." << endl; + cout << "Listening thread(s) created successfully." << endl; +#else + cout << endl; #endif }else{ - cout<<"Destroying Listening Thread"<startReceiver(message); @@ -1244,20 +1303,21 @@ int slsReceiverUDPFunctions::startReceiver(char message[]){ //reset listening thread variables measurementStarted = false; startFrameIndex = 0; - totalListeningFrameCount = 0; + for(int i = 0; i < numListeningThreads; ++i) + totalListeningFrameCount[i] = 0; //udp socket - if(createUDPSocket() == FAIL){ - strcpy(message,"Could not create UDP Socket.\n"); + if(createUDPSockets() == FAIL){ + strcpy(message,"Could not create UDP Socket(s).\n"); cout << endl << message << endl; return FAIL; } - cout << "UDP socket created successfully on port " << server_port << endl; + cout << "UDP socket(s) created successfully. 1st port " << server_port[0] << endl; if(setupWriter() == FAIL){ //stop udp socket - shutDownUDPSocket(); + shutDownUDPSockets(); sprintf(message,"Could not create file %s.\n",savefilename); return FAIL; @@ -1274,17 +1334,19 @@ int slsReceiverUDPFunctions::startReceiver(char message[]){ //status pthread_mutex_lock(&status_mutex); status = RUNNING; - for(int i=0;istopReceiver(); return; } - + //#ifdef VERBOSE + cout << "Start Receiver Readout" << endl; + //#endif //wait so that all packets which take time has arrived usleep(50000); @@ -1344,7 +1408,7 @@ void slsReceiverUDPFunctions::startReadout(){ cout << "Status: Transmitting" << endl; //kill udp socket to tell the listening thread to push last packet - shutDownUDPSocket(); + shutDownUDPSockets(); } @@ -1369,152 +1433,142 @@ void* slsReceiverUDPFunctions::startWritingThread(void* this_pointer){ int slsReceiverUDPFunctions::startListening(){ + int ithread = currentListeningThreadIndex; #ifdef VERYVERBOSE - cout << "In startListening()" << endl; + cout << "In startListening() " << endl; #endif + thread_started = 1; + + int i,total; int lastpacketoffset, expected, rc, packetcount, maxBufferSize, carryonBufferSize; uint32_t lastframeheader;// for moench to check for all the packets in last frame char* tempchar = NULL; + int imageheader = 0; + if(myDetectorType==EIGER) + imageheader = EIGER_IMAGE_HEADER_SIZE; while(1){ //variables that need to be checked/set before each acquisition carryonBufferSize = 0; - maxBufferSize = packetsPerFrame * numJobsPerThread * onePacketSize; - if(tempchar) {delete [] tempchar;tempchar = NULL;} - tempchar = new char[onePacketSize * (packetsPerFrame - 1)]; //gotthard: 1packet size, moench:39 packet size - - - while(listening_thread_running){ - - //pop - fifoFree->pop(buffer); + //if more than 1 listening thread, listen one packet at a time, else need to interleaved frame later + maxBufferSize = bufferSize * numJobsPerThread; #ifdef VERYDEBUG - cout << "*** popped from fifo free" << (void*)buffer << endl; + cout << " maxBufferSize:" << maxBufferSize << ",carryonBufferSize:" << carryonBufferSize << endl; #endif - //receive - if(udpSocket == NULL) - rc = 0; - else if(!carryonBufferSize){ - rc = udpSocket->ReceiveDataOnly(buffer + HEADER_SIZE_NUM_TOT_PACKETS, maxBufferSize); - expected = maxBufferSize; - }else{ + if(tempchar) {delete [] tempchar;tempchar = NULL;} + if(myDetectorType == EIGER) + tempchar = new char[bufferSize]; + else + tempchar = new char[onePacketSize * (packetsPerFrame - 1)]; //gotthard: 1packet size, moench:39 packet size + + + + while((1<pop(buffer[ithread]); +#ifdef VERYDEBUG + cout << ithread << " *** popped from fifo free" << (void*)buffer[ithread] << endl; +#endif + + + //receive + if(udpSocket[ithread] == NULL){ + rc = 0; + cout << ithread << "UDP Socket is NULL" << endl; + } + //normal listening + else if(!carryonBufferSize){ + //eiger + if (imageheader){ + rc = udpSocket[ithread]->ReceiveDataOnly(buffer[ithread] + HEADER_SIZE_NUM_TOT_PACKETS); + //if it was the header + if(rc == EIGER_HEADER_LENGTH){ +#ifdef VERYDEBUG + cout << "rc for header2:" << dec << rc << endl; +#endif + expected = EIGER_HEADER_LENGTH; + }else{ + expected = maxBufferSize; + } + } + //not eiger + else{ + rc = udpSocket[ithread]->ReceiveDataOnly(buffer[ithread] + HEADER_SIZE_NUM_TOT_PACKETS, maxBufferSize); + expected = maxBufferSize; + } + } + //the remaining packets from previous buffer + else{ +#ifdef VERYDEBUG + cout << ithread << " ***carry on buffer" << carryonBufferSize << endl; + cout << ithread << " framennum in temochar:"<<((((uint32_t)(*((uint32_t*)tempchar))) & (frameIndexMask)) >> frameIndexOffset)<ReceiveDataOnly((buffer + HEADER_SIZE_NUM_TOT_PACKETS + carryonBufferSize),maxBufferSize - carryonBufferSize); + memcpy(buffer[ithread] + HEADER_SIZE_NUM_TOT_PACKETS, tempchar, carryonBufferSize); + rc = udpSocket[ithread]->ReceiveDataOnly((buffer[ithread] + HEADER_SIZE_NUM_TOT_PACKETS + carryonBufferSize),maxBufferSize - carryonBufferSize); expected = maxBufferSize - carryonBufferSize; } #ifdef VERYDEBUG - cout << "*** rc:" << dec << rc << endl; - cout << "*** expected:" << dec << expected << endl; + cout << ithread << " *** rc:" << dec << rc << endl; + cout << ithread << " *** expected:" << dec << expected << endl; #endif - //start indices - //start of scan - if((!measurementStarted) && (rc > 0)){ - //gotthard has +1 for frame number - if ((myDetectorType == GOTTHARD) && (shortFrame == -1)) - startFrameIndex = (((((uint32_t)(*((uint32_t*)(buffer + HEADER_SIZE_NUM_TOT_PACKETS))))+1) - & (frameIndexMask)) >> frameIndexOffset); - else - startFrameIndex = ((((uint32_t)(*((uint32_t*)(buffer+HEADER_SIZE_NUM_TOT_PACKETS)))) - & (frameIndexMask)) >> frameIndexOffset); - cout<<"startFrameIndex:"< 0)) + startFrameIndices(ithread); + //problem in receiving or end of acquisition if((rc < expected)||(rc <= 0)){ -#ifdef VERYVERBOSE - cerr << "recvfrom() failed:"<push(buffer); - exit(-1); - continue; - } - //push the last buffer into fifo - if(rc > 0){ - packetcount = (rc/onePacketSize); -#ifdef VERYDEBUG - cout << "*** last packetcount:" << packetcount << endl; -#endif - (*((uint16_t*)(buffer))) = packetcount; - totalListeningFrameCount += packetcount; - while(!fifo->push(buffer)); -#ifdef VERYDEBUG - cout << "*** last lbuf1:" << (void*)buffer << endl; -#endif - } - - - //push dummy buffer - for(int i=0;ipop(buffer); - (*((uint16_t*)(buffer))) = 0xFFFF; - while(!fifo->push(buffer)); -#ifdef VERYDEBUG - cout << "pushed in dummy buffer:" << (void*)buffer << endl; -#endif - } - cout << "Total count listened to " << totalListeningFrameCount/packetsPerFrame << endl; - pthread_mutex_lock(&status_mutex); - listening_thread_running = 0; - pthread_mutex_unlock(&(status_mutex)); - break; + stopListening(ithread,rc,packetcount,total); + continue; } + //reset packetcount = packetsPerFrame * numJobsPerThread; carryonBufferSize = 0; + //check if last packet valid and calculate packet count switch(myDetectorType){ - - case MOENCH: lastpacketoffset = (((numJobsPerThread * packetsPerFrame - 1) * onePacketSize) + HEADER_SIZE_NUM_TOT_PACKETS); #ifdef VERYDEBUG - cout <<"first packet:"<< ((((uint32_t)(*((uint32_t*)(buffer+HEADER_SIZE_NUM_TOT_PACKETS))))) & (packetIndexMask)) << endl; - cout <<"first header:"<< (((((uint32_t)(*((uint32_t*)(buffer+HEADER_SIZE_NUM_TOT_PACKETS))))) & (frameIndexMask)) >> frameIndexOffset) << endl; + cout <<"first packet:"<< ((((uint32_t)(*((uint32_t*)(buffer[ithread]+HEADER_SIZE_NUM_TOT_PACKETS))))) & (packetIndexMask)) << endl; + cout <<"first header:"<< (((((uint32_t)(*((uint32_t*)(buffer[ithread]+HEADER_SIZE_NUM_TOT_PACKETS))))) & (frameIndexMask)) >> frameIndexOffset) << endl; cout << "last packet offset:" << lastpacketoffset << endl; - cout <<"last packet:"<< ((((uint32_t)(*((uint32_t*)(buffer+lastpacketoffset))))) & (packetIndexMask)) << endl; - cout <<"last header:"<< (((((uint32_t)(*((uint32_t*)(buffer+lastpacketoffset))))) & (frameIndexMask)) >> frameIndexOffset) << endl; + cout <<"last packet:"<< ((((uint32_t)(*((uint32_t*)(buffer[ithread]+lastpacketoffset))))) & (packetIndexMask)) << endl; + cout <<"last header:"<< (((((uint32_t)(*((uint32_t*)(buffer[ithread]+lastpacketoffset))))) & (frameIndexMask)) >> frameIndexOffset) << endl; #endif //moench last packet value is 0 - if( ((((uint32_t)(*((uint32_t*)(buffer+lastpacketoffset))))) & (packetIndexMask))){ - lastframeheader = ((((uint32_t)(*((uint32_t*)(buffer+lastpacketoffset))))) & (frameIndexMask)) >> frameIndexOffset; + if( ((((uint32_t)(*((uint32_t*)(buffer[ithread]+lastpacketoffset))))) & (packetIndexMask))){ + lastframeheader = ((((uint32_t)(*((uint32_t*)(buffer[ithread]+lastpacketoffset))))) & (frameIndexMask)) >> frameIndexOffset; carryonBufferSize += onePacketSize; lastpacketoffset -= onePacketSize; --packetcount; - while (lastframeheader == (((((uint32_t)(*((uint32_t*)(buffer+lastpacketoffset))))) & (frameIndexMask)) >> frameIndexOffset)){ + while (lastframeheader == (((((uint32_t)(*((uint32_t*)(buffer[ithread]+lastpacketoffset))))) & (frameIndexMask)) >> frameIndexOffset)){ carryonBufferSize += onePacketSize; lastpacketoffset -= onePacketSize; --packetcount; } - memcpy(tempchar, buffer+(lastpacketoffset+onePacketSize), carryonBufferSize); + memcpy(tempchar, buffer[ithread]+(lastpacketoffset+onePacketSize), carryonBufferSize); #ifdef VERYDEBUG cout << "tempchar header:" << (((((uint32_t)(*((uint32_t*)(tempchar))))) & (frameIndexMask)) >> frameIndexOffset) << endl; @@ -1524,17 +1578,15 @@ int slsReceiverUDPFunctions::startListening(){ } break; - - - default: + case GOTTHARD: if(shortFrame == -1){ lastpacketoffset = (((numJobsPerThread * packetsPerFrame - 1) * onePacketSize) + HEADER_SIZE_NUM_TOT_PACKETS); #ifdef VERYDEBUG cout << "last packet offset:" << lastpacketoffset << endl; #endif - if((unsigned int)(packetsPerFrame -1) != ((((uint32_t)(*((uint32_t*)(buffer+lastpacketoffset))))+1) & (packetIndexMask))){ - memcpy(tempchar,buffer+lastpacketoffset, onePacketSize); + if((unsigned int)(packetsPerFrame -1) != ((((uint32_t)(*((uint32_t*)(buffer[ithread]+lastpacketoffset))))+1) & (packetIndexMask))){ + memcpy(tempchar,buffer[ithread]+lastpacketoffset, onePacketSize); #ifdef VERYDEBUG cout << "tempchar header:" << (((((uint32_t)(*((uint32_t*)(tempchar))))+1) & (frameIndexMask)) >> frameIndexOffset) << endl; @@ -1544,32 +1596,45 @@ int slsReceiverUDPFunctions::startListening(){ } } #ifdef VERYDEBUG - cout << "header:" << (((((uint32_t)(*((uint32_t*)(buffer + HEADER_SIZE_NUM_TOT_PACKETS))))+1) + cout << "header:" << (((((uint32_t)(*((uint32_t*)(buffer[ithread] + HEADER_SIZE_NUM_TOT_PACKETS))))+1) & (frameIndexMask)) >> frameIndexOffset) << endl; #endif break; - - - + default: + if(rc==EIGER_HEADER_LENGTH) + packetcount=0; + else + packetcount = 1; + break; } + + + + + #ifdef VERYDEBUG cout << "*** packetcount:" << packetcount << " carryonbuffer:" << carryonBufferSize << endl; #endif //write packet count and push - (*((uint16_t*)(buffer))) = packetcount; - totalListeningFrameCount += packetcount; - while(!fifo->push(buffer)); + (*((uint16_t*)(buffer[ithread]))) = packetcount; + totalListeningFrameCount[ithread] += packetcount; +#ifdef VERYDEBUG + if(!ithread) cout << "totalListeningFrameCount[" << ithread << "]:" << totalListeningFrameCount[ithread] << endl; +#endif + while(!fifo[ithread]->push(buffer[ithread])); #ifdef VERYDEBUG cout << "*** pushed into listening fifo" << endl; #endif } - sem_wait(&listensmp); + sem_wait(&listensmp[ithread]); //make sure its not exiting thread - if(killListeningThread) + if(killAllListeningThreads){ + cout << ithread << " good bye listening thread" << endl; pthread_exit(NULL); + } } return OK; @@ -1599,12 +1664,11 @@ int slsReceiverUDPFunctions::startWriting(){ int numpackets, nf; uint32_t tempframenum; - char* wbuf; + char* wbuf[MAX_NUM_LISTENING_THREADS]; char *data=new char[bufferSize]; int iFrame = 0; int xmax=0,ymax=0; - int ret; - + int ret,i; while(1){ @@ -1626,11 +1690,12 @@ int slsReceiverUDPFunctions::startWriting(){ while((1<pop(wbuf); - numpackets = (uint16_t)(*((uint16_t*)wbuf)); + for(i=0;ipop(wbuf[i]); + numpackets = (uint16_t)(*((uint16_t*)wbuf[0])); #ifdef VERYDEBUG cout << ithread << " numpackets:" << dec << numpackets << endl; cout << ithread << " *** popped from fifo " << numpackets << endl; @@ -1644,13 +1709,14 @@ int slsReceiverUDPFunctions::startWriting(){ //last dummy packet if(numpackets == 0xFFFF){ #ifdef VERYDEBUG - cout << ithread << " **********************popped last dummy frame:" << (void*)wbuf << endl; + cout << ithread << " **********************popped last dummy frame:" << (void*)wbuf[0] << endl; #endif //free fifo - while(!fifoFree->push(wbuf)); + for(i=0;ipush(wbuf[i])); #ifdef VERYDEBUG - cout << ithread << " fifo freed:" << (void*)wbuf << endl; + cout << ithread << " fifo freed:" << (void*)wbuf[i] << endl; #endif @@ -1660,7 +1726,7 @@ int slsReceiverUDPFunctions::startWriting(){ pthread_mutex_lock(&status_mutex); writerthreads_mask^=(1<> frameIndexOffset); + if(myDetectorType == EIGER){ + if(!numpackets) + tempframenum = htonl(*(unsigned int*)((eiger_image_header *)((char*)(wbuf[0] + HEADER_SIZE_NUM_TOT_PACKETS)))->fnum); + }else if ((myDetectorType == GOTTHARD) && (shortFrame == -1)) + tempframenum = (((((uint32_t)(*((uint32_t*)(wbuf[0] + HEADER_SIZE_NUM_TOT_PACKETS))))+1)& (frameIndexMask)) >> frameIndexOffset); else - tempframenum = ((((uint32_t)(*((uint32_t*)(wbuf + HEADER_SIZE_NUM_TOT_PACKETS))))& (frameIndexMask)) >> frameIndexOffset); + tempframenum = ((((uint32_t)(*((uint32_t*)(wbuf[0] + HEADER_SIZE_NUM_TOT_PACKETS))))& (frameIndexMask)) >> frameIndexOffset); if(numWriterThreads == 1) currframenum = tempframenum; @@ -1715,28 +1784,38 @@ int slsReceiverUDPFunctions::startWriting(){ pthread_mutex_unlock(&progress_mutex); } #ifdef VERYDEBUG - cout << ithread << " tempframenum:" << dec << tempframenum << " curframenum:" << currframenum << endl; + cout << endl <num2)&0xff)<<"\t\t"< 0){ - writeToFile_withoutCompression(wbuf, numpackets); - } - //copy to gui - copyFrameToGui(wbuf + HEADER_SIZE_NUM_TOT_PACKETS); + if(!dataCompression){ - while(!fifoFree->push(wbuf)); -#ifdef VERYVERBOSE - cout<<"buf freed:"<<(void*)wbuf< 0){ + for(i=0;i 0){ + cout<<"numpackets:"<push(wbuf[i])); +#ifdef VERYVERBOSE + cout<<"buf freed:"<<(void*)wbuf[0]< (wbuf + HEADER_SIZE_NUM_TOT_PACKETS + numpackets * onePacketSize) ) + if(data > (wbuf[0] + HEADER_SIZE_NUM_TOT_PACKETS + numpackets * onePacketSize) ) cout <<" **************ERROR SHOULD NOT COME HERE, Error 142536!"<push(wbuf)); + while(!fifoFree[0]->push(wbuf[0])); #ifdef VERYVERBOSE - cout<<"buf freed:"<<(void*)wbuf<fnum); + //gotthard has +1 for frame number and not a short frame + else if ((myDetectorType == GOTTHARD) && (shortFrame == -1)) + startFrameIndex = (((((uint32_t)(*((uint32_t*)(buffer[ithread] + HEADER_SIZE_NUM_TOT_PACKETS))))+1) + & (frameIndexMask)) >> frameIndexOffset); + else + startFrameIndex = ((((uint32_t)(*((uint32_t*)(buffer[ithread]+HEADER_SIZE_NUM_TOT_PACKETS)))) + & (frameIndexMask)) >> frameIndexOffset); + + + cout << "startFrameIndex:" << startFrameIndex<push(buffer[ithread]); + exit(-1); + } + //push the last buffer into fifo + if(rc > 0){ + pc = (rc/onePacketSize); +#ifdef VERYDEBUG + cout << ithread << " *** last packetcount:" << packetcount << endl; +#endif + (*((uint16_t*)(buffer[ithread]))) = pc; + totalListeningFrameCount[ithread] += pc; + while(!fifo[ithread]->push(buffer[ithread])); +#ifdef VERYDEBUG + cout << ithread << " *** last lbuf1:" << (void*)buffer[ithread] << endl; +#endif + } + + + //push dummy buffer to all writer threads + for(i=0;ipop(buffer[ithread]); + (*((uint16_t*)(buffer[ithread]))) = 0xFFFF; + while(!fifo[ithread]->push(buffer[ithread])); +#ifdef VERYDEBUG + cout << ithread << " pushed in dummy buffer:" << (void*)buffer[ithread] << endl; +#endif + } + + //reset mask and exit loop + pthread_mutex_lock(&status_mutex); + listeningthreads_mask^=(1< 1) + cout << "Waiting for listening to be done.. current mask:" << hex << listeningthreads_mask << endl; +#endif + while(listeningthreads_mask) + usleep(5000); +#ifdef VERYDEBUG + t = 0; + for(i=0;i 0){ //for progress and packet loss calculation(new files) - if ((myDetectorType == GOTTHARD) && (shortFrame == -1)) + if(myDetectorType == EIGER){ + if(((uint16_t)(*((uint16_t*)buf)))==0) + tempframenum = htonl(*(unsigned int*)((eiger_image_header *)((char*)(buf + HEADER_SIZE_NUM_TOT_PACKETS)))->fnum); + }else if ((myDetectorType == GOTTHARD) && (shortFrame == -1)) tempframenum = (((((uint32_t)(*((uint32_t*)(buf + HEADER_SIZE_NUM_TOT_PACKETS))))+1)& (frameIndexMask)) >> frameIndexOffset); else tempframenum = ((((uint32_t)(*((uint32_t*)(buf + HEADER_SIZE_NUM_TOT_PACKETS))))& (frameIndexMask)) >> frameIndexOffset); @@ -1950,7 +2119,10 @@ void slsReceiverUDPFunctions::writeToFile_withoutCompression(char* buf,int numpa if(packetsInFile >= maxPacketsPerFile){ //for packet loss lastpacket = (((packetsToSave - 1) * onePacketSize) + offset); - if ((myDetectorType == GOTTHARD) && (shortFrame == -1)) + if(myDetectorType == EIGER){ + if(((uint16_t)(*((uint16_t*)buf)))==0) + tempframenum = htonl(*(unsigned int*)((eiger_image_header *)((char*)(buf + lastpacket)))->fnum); + }else if ((myDetectorType == GOTTHARD) && (shortFrame == -1)) tempframenum = (((((uint32_t)(*((uint32_t*)(buf + lastpacket))))+1)& (frameIndexMask)) >> frameIndexOffset); else tempframenum = ((((uint32_t)(*((uint32_t*)(buf + lastpacket))))& (frameIndexMask)) >> frameIndexOffset); @@ -1993,4 +2165,4 @@ void slsReceiverUDPFunctions::writeToFile_withoutCompression(char* buf,int numpa } -#endif +/*#endif*/ diff --git a/slsReceiverSoftware/slsReceiver/slsReceiverUDPFunctions.h b/slsReceiverSoftware/slsReceiver/slsReceiverUDPFunctions.h index 2029b3126..691c417be 100644 --- a/slsReceiverSoftware/slsReceiver/slsReceiverUDPFunctions.h +++ b/slsReceiverSoftware/slsReceiver/slsReceiverUDPFunctions.h @@ -1,4 +1,4 @@ -#ifdef SLS_RECEIVER_UDP_FUNCTIONS +/*#ifdef SLS_RECEIVER_UDP_FUNCTIONS*/ #ifndef SLS_RECEIVER_UDP_FUNCTIONS_H #define SLS_RECEIVER_UDP_FUNCTIONS_H /********************************************//** @@ -267,10 +267,10 @@ public: void startReadout(); /** - * shuts down the udp socket + * shuts down the udp sockets * \returns if success or fail */ - int shutDownUDPSocket(); + int shutDownUDPSockets(); private: /** @@ -295,10 +295,10 @@ private: void copyFrameToGui(char* startbuf); /** - * creates udp socket + * creates udp sockets * \returns if success or fail */ - int createUDPSocket(); + int createUDPSockets(); /** * create listening thread @@ -366,17 +366,55 @@ private: */ int startWriting(); - /** * Writing to file without compression * @param buf is the address of buffer popped out of fifo * @param numpackets is the number of packets + * @param framenum current frame number */ - void writeToFile_withoutCompression(char* buf,int numpackets); + void writeToFile_withoutCompression(char* buf,int numpackets, uint32_t framenum); + + /** + * Its called for the first packet of a scan or acquistion + * Sets the startframeindices and the variables to know if acquisition started + * @param ithread listening thread number + */ + void startFrameIndices(int ithread); + + /** + * This is called when udp socket is shut down + * It pops ffff instead of packet number into fifo + * to inform writers about the end of listening session + * @param ithread listening thread number + * @param rc number of bytes received + * @param pc packet count + * @param t total packets listened to + */ + void stopListening(int ithread, int rc, int &pc, int &t); + /** structure of an eiger image header*/ + typedef struct + { + unsigned char header_before[20]; + unsigned char fnum[4]; + unsigned char header_after[24]; + } eiger_image_header; + /** structure of an eiger image header*/ + typedef struct + { + unsigned char num1[4]; + unsigned char num2[4]; + } eiger_packet_header; + + /** max number of listening threads */ + const static int MAX_NUM_LISTENING_THREADS = MAX_EIGER_PORTS; + + /** max number of writer threads */ + const static int MAX_NUM_WRITER_THREADS = 15; + /** Eiger Receiver */ EigerReceiver *receiver; @@ -390,10 +428,10 @@ private: runStatus status; /** UDP Socket between Receiver and Detector */ - genericSocket* udpSocket; + genericSocket* udpSocket[MAX_NUM_LISTENING_THREADS]; /** Server UDP Port*/ - int server_port; + int server_port[MAX_NUM_LISTENING_THREADS]; /** ethernet interface or IP to listen to */ char *eth; @@ -482,7 +520,10 @@ private: /** Previous Frame number from buffer */ uint32_t prevframenum; - /** buffer size can be 1286*2 or 518 or 1286*40 */ + /** size of one frame */ + int frameSize; + + /** buffer size. different from framesize as we wait for one packet instead of frame for eiger */ int bufferSize; /** oen buffer size */ @@ -509,23 +550,23 @@ private: /** number of jobs per thread for data compression */ int numJobsPerThread; - /** memory allocated for the buffer */ - char *mem0; - /** datacompression - save only hits */ bool dataCompression; + /** memory allocated for the buffer */ + char *mem0[MAX_NUM_LISTENING_THREADS]; + /** circular fifo to store addresses of data read */ - CircularFifo* fifo; + CircularFifo* fifo[MAX_NUM_LISTENING_THREADS]; /** circular fifo to store addresses of data already written and ready to be resued*/ - CircularFifo* fifoFree; + CircularFifo* fifoFree[MAX_NUM_LISTENING_THREADS]; /** Receiver buffer */ - char *buffer; + char *buffer[MAX_NUM_LISTENING_THREADS]; - /** max number of writer threads */ - const static int MAX_NUM_WRITER_THREADS = 15; + /** number of writer threads */ + int numListeningThreads; /** number of writer threads */ int numWriterThreads; @@ -533,19 +574,25 @@ private: /** to know if listening and writer threads created properly */ int thread_started; + /** current listening thread index*/ + int currentListeningThreadIndex; + /** current writer thread index*/ int currentWriterThreadIndex; /** thread listening to packets */ - pthread_t listening_thread; + pthread_t listening_thread[MAX_NUM_LISTENING_THREADS]; /** thread writing packets */ pthread_t writing_thread[MAX_NUM_WRITER_THREADS]; /** total frame count the listening thread has listened to */ - int totalListeningFrameCount; + int totalListeningFrameCount[MAX_NUM_LISTENING_THREADS]; - /** mask showing which threads are running */ + /** mask showing which listening threads are running */ + volatile uint32_t listeningthreads_mask; + + /** mask showing which writer threads are running */ volatile uint32_t writerthreads_mask; /** mask showing which threads have created files*/ @@ -554,11 +601,8 @@ private: /** OK if file created was successful */ int ret_createfile; - /** 0 if listening thread is idle, 1 otherwise */ - int listening_thread_running; - /** variable used to self terminate threads waiting for semaphores */ - int killListeningThread; + int killAllListeningThreads; /** variable used to self terminate threads waiting for semaphores */ int killAllWritingThreads; @@ -569,8 +613,8 @@ private: //semaphores /** semaphore to synchronize writer and guireader threads */ sem_t smp; - /** semaphore to synchronize listener thread */ - sem_t listensmp; + /** semaphore to synchronize listener threads */ + sem_t listensmp[MAX_NUM_LISTENING_THREADS]; /** semaphore to synchronize writer threads */ sem_t writersmp[MAX_NUM_WRITER_THREADS]; @@ -687,4 +731,4 @@ public: #endif -#endif +/*#endif*/