small change

This commit is contained in:
Dhanya Maliakal 2015-07-03 16:10:04 +02:00
parent 88e96d45e7
commit fbfafb98fd
3 changed files with 140 additions and 46 deletions

View File

@ -463,10 +463,19 @@ private:
*/ */
void stopWriting(int ithread, char* wbuffer[]); void stopWriting(int ithread, char* wbuffer[]);
/**
* updates parameters and writes to file when not a dummy frame
* Also calls writeToFile_withoutCompression or handleDataCompression
* Called by startWriting()
* @param ithread writing thread number
* @param wbuffer writer buffer
* @param npackets number of packets from the fifo
*/
int handleWithoutDataCompression(int ithread, char* wbuffer[], int &npackets);
/** /**
* data compression for each fifo output * data compression for each fifo output
* @param ithread listening thread number * @param ithread writing thread number
* @param wbuffer writer buffer * @param wbuffer writer buffer
* @param npackets number of packets from the fifo * @param npackets number of packets from the fifo
* @param data pointer to the next packet start * @param data pointer to the next packet start

View File

@ -1826,7 +1826,7 @@ int i;
void UDPBaseImplementation::stopWriting(int ithread, char* wbuffer[]){ FILE_LOG(logDEBUG) << __AT__ << " starting"; void UDPBaseImplementation::stopWriting(int ithread, char* wbuffer[]){ FILE_LOG(logDEBUG) << __AT__ << " starting";
int i,j; int i;
#ifdef VERYDEBUG #ifdef VERYDEBUG
cout << ithread << " **********************popped last dummy frame:" << (void*)wbuffer[wIndex] << endl; cout << ithread << " **********************popped last dummy frame:" << (void*)wbuffer[wIndex] << endl;
#endif #endif

View File

@ -1734,19 +1734,19 @@ int UDPStandardImplementation::startListening(){
cout << ithread << " *** rc:" << dec << rc << ". expected:" << dec << expected << endl; cout << ithread << " *** rc:" << dec << rc << ". expected:" << dec << expected << endl;
#endif #endif
/*
//start indices for each start of scan/acquisition - eiger does it before //start indices for each start of scan/acquisition - eiger does it before
if((!measurementStarted) && (rc > 0) && (!ithread)) if((!measurementStarted) && (rc > 0) && (!ithread))
startFrameIndices(ithread); startFrameIndices(ithread);
*/
//problem in receiving or end of acquisition //problem in receiving or end of acquisition
if((rc < expected)||(rc <= 0)){ if((rc < expected)||(rc <= 0)){
if(myDetectorType != EIGER){ /*if(myDetectorType != EIGER){
//start indices for each start of scan/acquisition - this should be done earlier for normal detectors //start indices for each start of scan/acquisition - this should be done earlier for normal detectors
if((!measurementStarted) && (rc > 0) && (!ithread)) if((!measurementStarted) && (rc > 0) && (!ithread))
startFrameIndices(ithread); startFrameIndices(ithread);
} }*/
stopListening(ithread,rc,packetcount,total); stopListening(ithread,rc,packetcount,total);
continue; continue;
} }
@ -1882,13 +1882,12 @@ int UDPStandardImplementation::startWriting(){
thread_started = 1; thread_started = 1;
int totalheader = HEADER_SIZE_NUM_TOT_PACKETS + EIGER_HEADER_LENGTH;
int numpackets, nf; int numpackets, nf;
uint32_t tempframenum; uint32_t tempframenum;
char* wbuf[numListeningThreads];//interleaved char* wbuf[numListeningThreads];//interleaved
char *d=new char[bufferSize*numListeningThreads]; char *d=new char[bufferSize*numListeningThreads];
int xmax=0,ymax=0; int xmax=0,ymax=0;
int ret,i,j; int ret,i;
int packetsPerThread = packetsPerFrame/numListeningThreads; int packetsPerThread = packetsPerFrame/numListeningThreads;
while(1){ while(1){
@ -1927,9 +1926,9 @@ int UDPStandardImplementation::startWriting(){
cprintf(MAGENTA,"%d writer poped from fifo %x\n", ithread, (void*)(wbuf[i])); cprintf(MAGENTA,"%d writer poped from fifo %x\n", ithread, (void*)(wbuf[i]));
#endif #endif
numpackets = (uint16_t)(*((uint16_t*)wbuf[i])); numpackets = (uint16_t)(*((uint16_t*)wbuf[i]));
#ifdef VERYDEBUG //#ifdef VERYDEBUG
cout << i << " numpackets:" << dec << numpackets << "for fifo :"<< i << endl; cout << i << " numpackets:" << dec << numpackets << "for fifo :"<< i << endl;
#endif //#endif
} }
@ -1947,7 +1946,7 @@ int UDPStandardImplementation::startWriting(){
//for progress //update current frame number for progress
if(myDetectorType == EIGER){ if(myDetectorType == EIGER){
if(dynamicRange != 32) if(dynamicRange != 32)
@ -1977,12 +1976,12 @@ int UDPStandardImplementation::startWriting(){
#endif #endif
//without datacompression: write datacall back, or write data, free fifo /* //without datacompression: write datacall back, or write data, free fifo
if(!dataCompression){ if(!dataCompression){
if (cbAction < DO_EVERYTHING){ if (cbAction < DO_EVERYTHING){
for(i=0;i<numListeningThreads;++i) for(i=0;i<numListeningThreads;++i)
/* for eiger 32 bit mode, currframenum like gotthard, does not start from 0 or 1 */ //for eiger 32 bit mode, currframenum like gotthard, does not start from 0 or 1
rawDataReadyCallBack(currframenum, wbuf[i], numpackets * onePacketSize, sfilefd, guiData,pRawDataReady); rawDataReadyCallBack(currframenum, wbuf[i], numpackets * onePacketSize, sfilefd, guiData,pRawDataReady);
} }
@ -2076,9 +2075,11 @@ int UDPStandardImplementation::startWriting(){
#endif #endif
} }
} }
*/
//without datacompression: write datacall back, or write data, free fifo
if(!dataCompression) handleWithoutDataCompression(ithread,wbuf,numpackets);
//data compression //data compression
else else handleDataCompression(ithread,wbuf,numpackets,d, xmax, ymax, nf);
handleDataCompression(ithread,wbuf,numpackets,d, xmax, ymax, nf);
@ -2150,9 +2151,6 @@ void UDPStandardImplementation::startFrameIndices(int ithread){
if (myDetectorType == EIGER){ if (myDetectorType == EIGER){
//add currframenum later in this method for scans //add currframenum later in this method for scans
/*if(dynamicRange == 32)
startFrameIndex = htonl(*(unsigned int*)((eiger_image_header32 *)((char*)(buffer[ithread] + HEADER_SIZE_NUM_TOT_PACKETS)))->fnum);
else*/
startFrameIndex = htonl(*(unsigned int*)((eiger_image_header *)((char*)(buffer[ithread] + HEADER_SIZE_NUM_TOT_PACKETS)))->fnum); startFrameIndex = htonl(*(unsigned int*)((eiger_image_header *)((char*)(buffer[ithread] + HEADER_SIZE_NUM_TOT_PACKETS)))->fnum);
} }
//gotthard has +1 for frame number and not a short frame //gotthard has +1 for frame number and not a short frame
@ -2169,19 +2167,19 @@ void UDPStandardImplementation::startFrameIndices(int ithread){
startAcquisitionIndex=startFrameIndex; startAcquisitionIndex=startFrameIndex;
currframenum = startAcquisitionIndex; currframenum = startAcquisitionIndex;
acqStarted = true; acqStarted = true;
cout << "startAcquisitionIndex:" << hex << startAcquisitionIndex<<endl; cout << "startAcquisitionIndex:" << dec << startAcquisitionIndex<<endl;
} }
//for scans, cuz currfraenum resets //for scans, cuz currfraenum resets
else if (myDetectorType == EIGER){ else if (myDetectorType == EIGER){
if(dynamicRange == 32) /*if(dynamicRange == 32)
startFrameIndex = (currframenum + 1);// to be added later for scans startFrameIndex = (currframenum + 1);// to be added later for scans
else else*/
startFrameIndex += currframenum; startFrameIndex += currframenum;
} }
cout << "startFrameIndex:" << startFrameIndex<<endl; cout << "startFrameIndex: " << dec << startFrameIndex<<endl;
prevframenum=startFrameIndex; prevframenum=startFrameIndex;
measurementStarted = true; measurementStarted = true;
@ -2217,18 +2215,7 @@ int i;
} }
//push the last buffer into fifo //push the last buffer into fifo
else{ else{
//eiger (incomplete frames) - throw away
if((myDetectorType == EIGER) && (rc < (bufferSize * numJobsPerThread)) ){
if(rc == 266240)
cprintf(GREEN, "%d Start of detector: Received test frame of 266240 bytes.\n",ithread);
cout << ithread << "Discarding incomplete frame" << endl;
fifoFree[ithread]->push(buffer[ithread]);/** why not while(!)*/
#ifdef FIFO_DEBUG
cprintf(BLUE,"%d listener last buffer free pushed into fifofree %x\n", ithread,(void*)(buffer[ithread]));
#endif
}
//eiger (complete frames) + other detectors //eiger (complete frames) + other detectors
else{
pc = (rc/onePacketSize); pc = (rc/onePacketSize);
#ifdef VERYDEBUG #ifdef VERYDEBUG
cout << ithread << " last rc:"<<rc<<endl; cout << ithread << " last rc:"<<rc<<endl;
@ -2240,7 +2227,7 @@ int i;
#ifdef FIFO_DEBUG #ifdef FIFO_DEBUG
cprintf(RED,"%d listener last buffer pushed into fifo %x\n", ithread,(void*)(buffer[ithread])); cprintf(RED,"%d listener last buffer pushed into fifo %x\n", ithread,(void*)(buffer[ithread]));
#endif #endif
}
} }
@ -2269,9 +2256,9 @@ int i;
#endif #endif
pthread_mutex_unlock(&(status_mutex)); pthread_mutex_unlock(&(status_mutex));
#ifdef VERYDEBUG //#ifdef VERYDEBUG
cout << ithread << ": Frames listened to " << dec << ((totalListeningFrameCount[ithread]*numListeningThreads)/packetsPerFrame) << endl; cout << ithread << ": Frames listened to " << dec << ((totalListeningFrameCount[ithread]*numListeningThreads)/packetsPerFrame) << endl;
#endif //#endif
//waiting for all listening threads to be done, to print final count of frames listened to //waiting for all listening threads to be done, to print final count of frames listened to
if(ithread == 0){ if(ithread == 0){
@ -2281,12 +2268,12 @@ int i;
#endif #endif
while(listeningthreads_mask) while(listeningthreads_mask)
usleep(5000); usleep(5000);
#ifdef VERYDEBUG //#ifdef VERYDEBUG
t = 0; t = 0;
for(i=0;i<numListeningThreads;++i) for(i=0;i<numListeningThreads;++i)
t += totalListeningFrameCount[i]; t += totalListeningFrameCount[i];
cout << "Total frames listened to " << dec <<(t/packetsPerFrame) << endl; cout << "Total frames listened to " << dec <<(t/packetsPerFrame) << endl;
#endif //#endif
} }
} }
@ -2516,9 +2503,107 @@ void UDPStandardImplementation::writeToFile_withoutCompression(char* buf,int num
int UDPStandardImplementation::handleWithoutDataCompression(int ithread, char* wbuffer[], int &npackets){
int totalheader = HEADER_SIZE_NUM_TOT_PACKETS + EIGER_HEADER_LENGTH;
int i,j;
if (cbAction < DO_EVERYTHING){
for(i=0;i<numListeningThreads;++i)
//for eiger 32 bit mode, currframenum like gotthard, does not start from 0 or 1
rawDataReadyCallBack(currframenum, wbuffer[i], npackets * onePacketSize, sfilefd, guiData,pRawDataReady);
}
else if (npackets > 0){
for(j=0;j<numListeningThreads;++j){
#ifdef WRITE_HEADERS
if (myDetectorType == EIGER){
for (i = 0; i < packetsPerFrame/2; i++){
//overwriting frame number in header
(*(uint32_t*)(((eiger_packet_header *)((char*)(wbuffer[j] + totalheader + EIGER_ONE_GIGA_ONE_PACKET_SIZE*i)))->num1)) = currframenum;
//overwriting port number and dynamic range
if (!j) (*(uint8_t*)(((eiger_packet_header *)((char*)(wbuffer[j] + totalheader + EIGER_ONE_GIGA_ONE_PACKET_SIZE*i)))->num3)) = (dynamicRange<<2);
else (*(uint8_t*)(((eiger_packet_header *)((char*)(wbuffer[j] + totalheader + EIGER_ONE_GIGA_ONE_PACKET_SIZE*i)))->num3)) = ((dynamicRange<<2)|(0x1));
#ifdef VERYDEBUG
cprintf(RED, "%d - 0x%x - %d\n", i,
(*(uint8_t*)(((eiger_packet_header *)((char*)(wbuffer[j] + totalheader +i*EIGER_ONE_GIGA_ONE_PACKET_SIZE)))->num3)),
(*(uint8_t*)(((eiger_packet_header *)((char*)(wbuffer[j] + totalheader +i*EIGER_ONE_GIGA_ONE_PACKET_SIZE)))->num4)));
#endif
}
//for 32 bit,port number needs to be changed and packet number reconstructed
if(dynamicRange == 32){
for (i = 0; i < packetsPerFrame/4; i++){
//new packet number that has space for 16 bit
(*(uint16_t*)(((eiger_packet_header *)((char*)(wbuffer[j] + totalheader + EIGER_ONE_GIGA_ONE_PACKET_SIZE*i)))->num2))
= ((*(uint8_t*)(((eiger_packet_header *)((char*)(wbuffer[j] + totalheader + EIGER_ONE_GIGA_ONE_PACKET_SIZE*i)))->num4)));
#ifdef VERYDEBUG
cprintf(RED, "%d - 0x%x - %d - %d\n", i,
(*(uint8_t*)(((eiger_packet_header *)((char*)(wbuffer[j] + totalheader +i*EIGER_ONE_GIGA_ONE_PACKET_SIZE)))->num3)),
(*(uint8_t*)(((eiger_packet_header *)((char*)(wbuffer[j] + totalheader +i*EIGER_ONE_GIGA_ONE_PACKET_SIZE)))->num4)),
(*(uint16_t*)(((eiger_packet_header *)((char*)(wbuffer[j] + totalheader +i*EIGER_ONE_GIGA_ONE_PACKET_SIZE)))->num2)));
#endif
}
for (i = packetsPerFrame/4; i < packetsPerFrame/2; i++){
//new packet number that has space for 16 bit
(*(uint16_t*)(((eiger_packet_header *)((char*)(wbuffer[j] + totalheader + EIGER_ONE_GIGA_ONE_PACKET_SIZE*i)))->num2))
= ((*(uint8_t*)(((eiger_packet_header *)((char*)(wbuffer[j] + totalheader + EIGER_ONE_GIGA_ONE_PACKET_SIZE*i)))->num4))+(packetsPerFrame/4));
#ifdef VERYDEBUG
cprintf(RED, "%d -0x%x - %d - %d\n", i,
(*(uint8_t*)(((eiger_packet_header *)((char*)(wbuffer[j] + totalheader +i*EIGER_ONE_GIGA_ONE_PACKET_SIZE)))->num3)),
(*(uint8_t*)(((eiger_packet_header *)((char*)(wbuffer[j] + totalheader +i*EIGER_ONE_GIGA_ONE_PACKET_SIZE)))->num4)),
(*(uint16_t*)(((eiger_packet_header *)((char*)(wbuffer[j] + totalheader +i*EIGER_ONE_GIGA_ONE_PACKET_SIZE)))->num2)));
#endif
}
}
}
#endif
writeToFile_withoutCompression(wbuffer[j], npackets,currframenum);
}
#ifdef VERYDEBUG
cprintf(BLUE,"written everyting\n");
#endif
}
if(myDetectorType == EIGER) {
#ifdef VERYDEBUG
cprintf(BLUE,"gonna copy frame\n");
#endif
copyFrameToGui(wbuffer,currframenum);
#ifdef VERYDEBUG
cprintf(BLUE,"copied frame\n");
#endif
for(i=0;i<numListeningThreads;++i){
while(!fifoFree[i]->push(wbuffer[i]));
#ifdef FIFO_DEBUG
cprintf(BLUE,"%d writer freed pushed into fifofree %x for listener %d\n",ithread, (void*)(wbuffer[i]),i);
#endif
}
}
else{
//copy to gui
if(npackets >= packetsPerFrame){//min 1 frame, but neednt be
//if(npackets == packetsPerFrame * numJobsPerThread){ //only full frames
copyFrameToGui(NULL,-1,wbuffer[0]+HEADER_SIZE_NUM_TOT_PACKETS);
#ifdef VERYVERBOSE
cout << ithread << " finished copying" << endl;
#endif
}//else cout << "unfinished buffersize" << endl;
while(!fifoFree[0]->push(wbuffer[0]));
#ifdef FIFO_DEBUG
cprintf(BLUE,"%d writer freed pushed into fifofree %x for listener 0\n",ithread, (void*)(wbuffer[0]));
#endif
}
}