write and copy to gui left

This commit is contained in:
Dhanya Maliakal 2015-07-27 12:36:45 +02:00
parent 878c12d43e
commit f970baf1b8
2 changed files with 144 additions and 229 deletions

View File

@ -442,8 +442,9 @@ private:
* Its called for the first packet of a scan or acquistion
* Sets the startframeindices and the variables to know if acquisition started
* @param ithread listening thread number
* @param numbytes number of bytes it listened to
*/
void startFrameIndices(int ithread);
void startFrameIndices(int ithread, int numbytes);
/**
* This is called when udp socket is shut down

View File

@ -828,15 +828,17 @@ void UDPStandardImplementation::setupFifoStructure(){
if(myDetectorType == MOENCH)
fifosize = MOENCH_FIFO_SIZE;
else if(myDetectorType == EIGER)
fifosize = EIGER_FIFO_SIZE;
fifosize = EIGER_FIFO_SIZE * packetsPerFrame;
if(fifosize % numJobsPerThread)
fifosize = (fifosize/numJobsPerThread)+1;
else
fifosize = fifosize/numJobsPerThread;
cout << "Number of Frames per buffer:" << numJobsPerThread << endl;
if(myDetectorType == EIGER)
cout << "1 packet per buffer" << endl;
else
cout << "Number of Frames per buffer:" << numJobsPerThread << endl;
cout << "Fifo Size:" << fifosize << endl;
/*
@ -854,27 +856,32 @@ void UDPStandardImplementation::setupFifoStructure(){
#endif
delete fifoFree[i];
}
if(fifo[i]) delete fifo[i];
if(fifo[i]) delete fifo[i];
if(mem0[i]) free(mem0[i]);
fifoFree[i] = new CircularFifo<char>(fifosize);
fifo[i] = new CircularFifo<char>(fifosize);
int whatperbuffer = bufferSize;
if(myDetectorType == EIGER)
whatperbuffer = onePacketSize;
//allocate memory
mem0[i]=(char*)malloc((bufferSize * numJobsPerThread + HEADER_SIZE_NUM_TOT_PACKETS)*fifosize);
mem0[i]=(char*)malloc((whatperbuffer * numJobsPerThread + HEADER_SIZE_NUM_TOT_PACKETS)*fifosize);
/** shud let the client know about this */
if (mem0[i]==NULL){
cout<<"++++++++++++++++++++++ COULD NOT ALLOCATE MEMORY FOR LISTENING !!!!!!!+++++++++++++++++++++" << endl;
exit(-1);
}
buffer[i]=mem0[i];
//push the addresses into freed fifoFree and writingFifoFree
while (buffer[i]<(mem0[i]+(bufferSize * numJobsPerThread + HEADER_SIZE_NUM_TOT_PACKETS)*(fifosize-1))) {
while (buffer[i]<(mem0[i]+(whatperbuffer * numJobsPerThread + HEADER_SIZE_NUM_TOT_PACKETS)*(fifosize-1))) {
fifoFree[i]->push(buffer[i]);
#ifdef FIFO_DEBUG
cprintf(BLUE,"%d fifostructure free pushed into fifofree %x\n", i, (void*)(buffer[i]));
#endif
buffer[i]+=(bufferSize * numJobsPerThread + HEADER_SIZE_NUM_TOT_PACKETS);
buffer[i]+=(whatperbuffer * numJobsPerThread + HEADER_SIZE_NUM_TOT_PACKETS);
}
}
cout << "Fifo structure(s) reconstructed" << endl;
@ -1683,7 +1690,8 @@ int UDPStandardImplementation::startListening(){
if(tempchar) {delete [] tempchar;tempchar = NULL;}
if(myDetectorType != EIGER)
tempchar = new char[onePacketSize * ((packetsPerFrame/numListeningThreads) - 1)]; //gotthard: 1packet size, moench:39 packet size
else
maxBufferSize = 0;
@ -1747,7 +1755,7 @@ int UDPStandardImplementation::startListening(){
//start indices for each start of scan/acquisition
if((!measurementStarted) && (rc > 0)){
pthread_mutex_lock(&progress_mutex);
startFrameIndices(ithread);
startFrameIndices(ithread, rc);
pthread_mutex_unlock(&progress_mutex);
}
@ -1825,6 +1833,7 @@ int UDPStandardImplementation::startListening(){
case EIGER:
//because even headers might be included, so not packet count
(*((uint32_t*)(buffer[ithread]))) = rc;
packetcount = 1;
break;
default:
@ -1890,16 +1899,16 @@ int UDPStandardImplementation::startWriting(){
thread_started = 1;
int totalheader = HEADER_SIZE_NUM_TOT_PACKETS + EIGER_HEADER_LENGTH;
int numpackets[numListeningThreads], popready[numListeningThreads], woffset[numListeningThreads], nf;
bool endofacquisition, startheader[numListeningThreads];
int numpackets[numListeningThreads], popready[numListeningThreads], nf;
bool startdatapacket[numListeningThreads],fullframe[numListeningThreads];
uint32_t tempframenum[numListeningThreads];
uint32_t lastpacketheader[numListeningThreads], currentpacketheader[numListeningThreads];
int numberofmissingpackets[numListeningThreads];
char* tempbuffer[numListeningThreads] = NULL;
char* tempbuffer = NULL;
int tempoffset[numListeningThreads];
int LAST_PACKET_VALUE;
bool fullframe;
char* wbuf[numListeningThreads];//interleaved
@ -1930,31 +1939,33 @@ int UDPStandardImplementation::startWriting(){
//allow them all to be popped initially
for(i=0;i<numListeningThreads;++i){
numpackets[i] = 0;
popready[i] = 1;
woffset[i] = HEADER_SIZE_NUM_TOT_PACKETS;
startdatapacket[i] = false;
fullframe[i] = false;
tempframenum[i] = 0;
lastpacketheader[i] = -1;
currentpacketheader[i] = -1;
tempframenum[i] = 0;
numberofmissingpackets[i] = 0;
startheader[i] = false;
if(tempbuffer[i]) {
delete [] tempbuffer[i];
tempbuffer[i] = NULL;
}
if(myDetectorType == EIGER){
tempbuffer[i] = new char[onePacketSize * packetsPerFrame];
currframenum = 0;
}
tempoffset[i] = 0;
tempoffset[i] = i*(packetsPerFrame/numListeningThreads)*onePacketSize;
}
if(tempbuffer) {
delete [] tempbuffer;
tempbuffer = NULL;
}
if(myDetectorType == EIGER){
tempbuffer = new char[onePacketSize * packetsPerFrame];
currframenum = 0;
}
switch(dynamicRange){
case 4: LAST_PACKET_VALUE = 0x40; break;
case 8: LAST_PACKET_VALUE = 0x80; break;
case 16: LAST_PACKET_VALUE = 0xff; break;
case 32: LAST_PACKET_VALUE = 0xff; break;
case 32: LAST_PACKET_VALUE = 0xff; break;
default: break;
}
@ -1963,13 +1974,10 @@ int UDPStandardImplementation::startWriting(){
//pop
#ifdef VERYDEBUG
cprintf(GREEN,"%d writer gonna pop from fifo: %d\n",ithread,i);
#endif
for(i=0;i<numListeningThreads;++i){
if(popready[i]){
#ifdef VERYDEBUG
cout << "writer gonna pop from fifo:" << i << endl;
cprintf(GREEN,"%d writer poped from fifo %x\n", ithread, (void*)(wbuf[i]));
#endif
fifo[i]->pop(wbuf[i]);
#ifdef FIFO_DEBUG
@ -1977,27 +1985,18 @@ int UDPStandardImplementation::startWriting(){
#endif
numpackets[i] = (uint32_t)(*((uint32_t*)wbuf[i]));
#ifdef VERYDEBUG
cout << i << " numpackets:" << dec << numpackets[i] << "for fifo :"<< i << endl;
cprintf(GREEN,"%d numpackets: %d for fifo :%d\n", ithread, numpackets[i], i);
#endif
//dont pop until ready
popready[i] = 0;
//reset offset
woffset[i] = 0;
//dont pop again if dummy packet
if(!numpackets[i])
popready[i] = 0;
}
}
//check for end of acquisition
endofacquisition = true;
for(i=0;i<numListeningThreads;++i){
if(numpackets[i] == 0xFFFF)
popready[i] = 0;
if(popready[i])
endofacquisition = false;
}
//end of acquisition
if(endofacquisition){
if((!numpackets[0])&& (!numpackets[1])){
#ifdef VERYDEBUG
cprintf(GREEN,"%d End of Acquisition in Writing Thread\n", ithread);
#endif
@ -2008,202 +2007,106 @@ int UDPStandardImplementation::startWriting(){
//for progress
if(myDetectorType == EIGER){
fullframe = true;
//trying to find a full frame
for(i=0;i<numListeningThreads;++i){
//not dummy buffer
if(numpackets[i]){
//makes sure it doesnt add the finished thread packet all over again
if(tempframenum == currframenum)
while(woffset[i] <= numpackets[i]){
//offset outside boundaries, also eliminates dummy packet
if((numpackets[i] != EIGER_HEADER_LENGTH) && (numpackets[i] != onePacketSize)){
if(numpackets[i])
cprintf(RED, "WARNING: Got a weird packet size: %d from fifo %d\n", numpackets[i],i);
#ifdef VERBOSE
else
cprintf(RED, "WARNING: Dummy packet: %d from fifo %d\n", numpackets[i],i);
#endif
continue;
}
//not dummy buffer and not after getting a full frame
if(numpackets[i] && (!fullframe[i])){
//offset outside boundaries to even check for header
if((woffset[i] + EIGER_HEADER_LENGTH)>= numpackets[i]){
popready[i] = 1;
fullframe = false;
break;
}
//header packet
if( 0x01 == (*(uint8_t*)(((eiger_image_header *)((char*)(wbuf[i])))->header_confirm))){
//check if header
if( 0x01 == (*(uint16_t*)(((eiger_image_header *)((char*)(wbuf[i] + woffset[i])))->header_confirm))){
//expected frame header -eiger frame numbers start at 1, so need to -1
if(tempframenum[i] == (htonl(*(unsigned int*)((eiger_image_header *)((char*)(wbuf[ithread] + woffset[i])))->fnum)+(startFrameIndex-1))){
woffset[i] += EIGER_HEADER_LENGTH;
startheader[i] = true;
numberofmissingpackets[i] = 0;
lastpacketheader[i] = -1;
tempoffset[i] = 0;
}
//wrong header - leave
else{
numberofmissingpackets[i] += (LAST_PACKET_VALUE - lastpacketheader[i]);
tempframenum[i]++;
//add missing packets
for(j=0;j<numberofmissingpackets[i];++j){
tempbuffer[i][tempoffset[i]] = BLANK_PACKET;
tempoffset[i] += onePacketSize;
}
//reset
lastpacketheader[i] = -1;
tempoffset[i] = 0 ;
break;
}
}
//packet
//new frame (no datapacket received yet), update frame num and corrected for fnum reset for scans
if(!startdatapacket[i])
tempframenum[i] = htonl(*(unsigned int*)((eiger_image_header *)((char*)(wbuf[ithread] + woffset[i])))->fnum)+(startFrameIndex-1);
//next frame, leave
else{
//update current packet
currentpacketheader[i] = ((*(uint8_t*)(((eiger_packet_header *)((char*)(wbuf[i] + woffset[i])))->num4)));
//last packet - leave
if(currentpacketheader[i] == LAST_PACKET_VALUE){
//fill buffer
tempbuffer[i][tempoffset[i]] = wbuf[i] + woffset[i];
woffset[i] += onePacketSize;
//reset
startheader[i] = false;
lastpacketheader[i] = -1;
tempoffset[i] = 0;
break;
}
//same frame packet
if(currentpacketheader[i] > lastpacketheader[i]){
if(!startheader[i]){
tempframenum[i]++;
}
else
startheader[i] = false;
lastpacketheader[i] = -1;
numberofmissingpackets[i] = 0;
numberofmissingpackets[i] += (currentpacketheader[i] - lastpacketheader[i] -1);
//add missing packets
for(j=0;j<numberofmissingpackets[i];++j){
tempbuffer[i][tempoffset[i]] = BLANK_PACKET;
tempoffset[i] += onePacketSize;
}
//add current packet
tempbuffer[tempoffset[i]] = wbuf[i] + woffset[i];
woffset[i] += onePacketSize;
lastpacketheader[i] = currentpacketheader[i];
//add missing packets
numberofmissingpackets += (LAST_PACKET_VALUE = lastpacketheader[i]);
for(j=0;j<numberofmissingpackets[i];++j){
tempbuffer[tempoffset[i]] = BLANK_PACKET;
tempoffset[i] += onePacketSize;
}
//different packet header - leave
else{
numberofmissingpackets[i] = LAST_PACKET_VALUE - lastpacketheader[i];
//add missing packets
for(j=0;j<numberofmissingpackets[i];++j){
tempbuffer[i][tempoffset[i]] = BLANK_PACKET;
tempoffset[i] += onePacketSize;
}
//reset
lastpacketheader[i] = -1;
tempoffset[i] = 0;
break;
//set fullframe and dont let fifo pop over it until written
fullframe[i] = true;
popready[i] = false;
}
}
//data packet
else{
//update current packet
currentpacketheader[i] = ((*(uint8_t*)(((eiger_packet_header *)((char*)(wbuf[i] + woffset[i])))->num4)));
//same frame packet - continue building frame
if(currentpacketheader[i] > lastpacketheader[i]){
//add missing packets
numberofmissingpackets[i] += (currentpacketheader[i] - lastpacketheader[i] -1);
for(j=0;j<numberofmissingpackets[i];++j){
tempbuffer[tempoffset[i]] = BLANK_PACKET;
tempoffset[i] += onePacketSize;
}
//add current packet
tempbuffer[tempoffset[i]] = wbuf[i];
tempoffset[i] += onePacketSize;
//update last packet
lastpacketheader[i] = currentpacketheader[i];
//last frame got, this will save time and also for last frames, it doesnt wait for stop receiver
if(currentpacketheader[i] == LAST_PACKET_VALUE){
fullframe[i] = true;
popready[i] = false;
}
}
}
}
}
//received a full frame
if(tempframenum[0] == tempframenum[1] != currframenum){
currframenum = tempframenum[0];
//handleWithoutDataCompression(ithread, tempbuffer);
if (cbAction < DO_EVERYTHING){
;/***/
}
else{
#ifdef WRITE_HEADERS
for (i = 0; i < packetsPerFrame/2; i++){
//overwriting frame number in header
(*(uint32_t*)(((eiger_packet_header *)((char*)(tempoffset[j] + onePacketSize*i)))->num1)) = currframenum;
//overwriting port number and dynamic range
if (!j) (*(uint8_t*)(((eiger_packet_header *)((char*)(tempoffset[j] + onePacketSize*i)))->num3)) = (dynamicRange<<2);
else (*(uint8_t*)(((eiger_packet_header *)((char*)(tempoffset[j] + onePacketSize*i)))->num3)) = ((dynamicRange<<2)|(0x1));
#ifdef VERYDEBUG
cprintf(GREEN, "%d - 0x%x - %d\n", i,
(*(uint8_t*)(((eiger_packet_header *)((char*)(tempoffset[j] + i*onePacketSize)))->num3)),
(*(uint8_t*)(((eiger_packet_header *)((char*)(tempoffset[j] + i*onePacketSize)))->num4)));
#endif
}
//for 32 bit,port number needs to be changed and packet number reconstructed
if(dynamicRange == 32){
for (i = 0; i < packetsPerFrame/4; i++){
//new packet number that has space for 16 bit
(*(uint16_t*)(((eiger_packet_header *)((char*)(tempoffset[j] + onePacketSize*i)))->num2))
= ((*(uint8_t*)(((eiger_packet_header *)((char*)(tempoffset[j] + onePacketSize*i)))->num4)));
#ifdef VERYDEBUG
cprintf(GREEN, "%d - 0x%x - %d - %d\n", i,
(*(uint8_t*)(((eiger_packet_header *)((char*)(tempoffset[j] + i*onePacketSize)))->num3)),
(*(uint8_t*)(((eiger_packet_header *)((char*)(tempoffset[j] + i*onePacketSize)))->num4)),
(*(uint16_t*)(((eiger_packet_header *)((char*)(tempoffset[j] + i*onePacketSize)))->num2)));
#endif
}
for (i = packetsPerFrame/4; i < packetsPerFrame/2; i++){
//new packet number that has space for 16 bit
(*(uint16_t*)(((eiger_packet_header *)((char*)(tempoffset[j] + onePacketSize*i)))->num2))
= ((*(uint8_t*)(((eiger_packet_header *)((char*)(tempoffset[j] + onePacketSize*i)))->num4))+(packetsPerFrame/4));
#ifdef VERYDEBUG
cprintf(GREEN, "%d -0x%x - %d - %d\n", i,
(*(uint8_t*)(((eiger_packet_header *)((char*)(tempoffset[j] + i*onePacketSize)))->num3)),
(*(uint8_t*)(((eiger_packet_header *)((char*)(tempoffset[j] + i*onePacketSize)))->num4)),
(*(uint16_t*)(((eiger_packet_header *)((char*)(tempoffset[j] + i*onePacketSize)))->num2)));
#endif
//next frame packet - leave
else{
//add missing packets
numberofmissingpackets += (LAST_PACKET_VALUE = lastpacketheader[i]);
for(j=0;j<numberofmissingpackets[i];++j){
tempbuffer[tempoffset[i]] = BLANK_PACKET;
tempoffset[i] += onePacketSize;
}
//set fullframe and dont let fifo pop over it until written
fullframe[i] = true;
popready[i] = false;
}
}
#endif
//writeToFile_withoutCompression(wbuffer[j], npackets,currframenum);
}
}
#ifdef VERYDEBUG
cprintf(GREEN,"gonna copy frame\n");
#endif
copyFrameToGui(wbuffer,currframenum);
//#ifdef VERYDEBUG
cprintf(GREEN,"copied frame\n");
//#endif
for(i=0;i<numListeningThreads;++i){
while(!fifoFree[i]->push(tempoffset[i]));
#ifdef FIFO_DEBUG
cprintf(GREEN,"%d writer freed pushed into fifofree %x for listener %d\n",ithread, (void*)(tempoffset[i]),i);
#endif
}
//check if a full frame received
if(fullframe[0] && fullframe[1]){
for(int i=0;i<numListeningThreads;i++){
popready[i] = 1;
startdatapacket[i] = 0;
tempoffset[i] = i*(packetsPerFrame/numListeningThreads)*onePacketSize;
lastpacketheader[i] = -1;
numberofmissingpackets[i] = 0;
}
if(tempframenum[0] != tempframenum[1])
cprintf(RED,"Frame numbers mismatch!!! %d %d\n",tempframenum[0],tempframenum[1]);
currentpacketheader = tempframenum[0];
//write
//copy
}
#ifdef EIGER_DEBUG2
@ -2302,12 +2205,17 @@ int UDPStandardImplementation::startWriting(){
void UDPStandardImplementation::startFrameIndices(int ithread){
void UDPStandardImplementation::startFrameIndices(int ithread, int numbytes){
FILE_LOG(logDEBUG) << __AT__ << " called";
//add currframenum later in this method for scans
if (myDetectorType == EIGER){
//add currframenum later in this method for scans
startFrameIndex = htonl(*(unsigned int*)((eiger_image_header *)((char*)(buffer[ithread] + HEADER_SIZE_NUM_TOT_PACKETS)))->fnum);
//check if its a header
if(EIGER_HEADER_LENGTH == numbytes)
startFrameIndex = (htonl(*(unsigned int*)((eiger_image_header *)((char*)(buffer[ithread] + HEADER_SIZE_NUM_TOT_PACKETS)))->fnum))-1;
//missed header packet, so default value
else
startFrameIndex = 0;
}
//gotthard has +1 for frame number and not a short frame
else if ((myDetectorType == GOTTHARD) && (shortFrame == -1))
@ -2321,13 +2229,13 @@ void UDPStandardImplementation::startFrameIndices(int ithread){
//start of acquisition
if(!acqStarted){
startAcquisitionIndex=startFrameIndex;
currframenum = startAcquisitionIndex;
//currframenum = startAcquisitionIndex;
acqStarted = true;
cprintf(BLUE,"%d startAcquisitionIndex:%d\n", ithread, startAcquisitionIndex);
}
//for scans, cuz currfraenum resets
else if (myDetectorType == EIGER){
startFrameIndex += currframenum;
startFrameIndex += (currframenum+1);
}
@ -2369,12 +2277,18 @@ void UDPStandardImplementation::stopListening(int ithread, int rc, int &pc, int
//push the last buffer into fifo
else{
pc = (rc/onePacketSize);
if(myDetectorType == EIGER){
(*((uint32_t*)(buffer[ithread]))) = rc;
pc = 1;
}else{
pc = (rc/onePacketSize);
(*((uint32_t*)(buffer[ithread]))) = pc;
}
#ifdef VERYDEBUG
cprintf(BLUE,"%d last rc:%d\n",ithread, rc);
cprintf(BLUE,"%d last packetcount:%d\n", ithread, pc);
#endif
(*((uint32_t*)(buffer[ithread]))) = pc;
totalListeningFrameCount[ithread] += pc;
while(!fifo[ithread]->push(buffer[ithread]));
#ifdef FIFO_DEBUG