done for eiger, some checks for frameindex=-1,socket closing earlier than last socket etc

This commit is contained in:
Dhanya Maliakal 2016-09-20 15:12:26 +02:00
parent bdcbdba2ab
commit d6ca7ecbc4
3 changed files with 60 additions and 39 deletions

View File

@ -619,7 +619,7 @@ enum communicationProtocol{
nsent = recvfrom(socketDescriptor,(char*)buf+total_sent,nsending, 0, (struct sockaddr *) &clientAddress, &clientAddress_length); nsent = recvfrom(socketDescriptor,(char*)buf+total_sent,nsending, 0, (struct sockaddr *) &clientAddress, &clientAddress_length);
if(nsent < packet_size) { if(nsent < packet_size) {
if(nsent){ if(nsent){
if(nsent != header_packet_size) if((nsent != header_packet_size) && (nsent != -1))
cprintf(RED,"Incomplete Packet size %d\n",nsent); cprintf(RED,"Incomplete Packet size %d\n",nsent);
} }
break; break;

View File

@ -164,7 +164,7 @@ void UDPStandardImplementation::initializeMembers(){
for(int i=0; i<MAX_NUMBER_OF_WRITER_THREADS; i++){ for(int i=0; i<MAX_NUMBER_OF_WRITER_THREADS; i++){
frameIndex[i] = 0; frameIndex[i] = 0;
currentFrameNumber[i] = 0; currentFrameNumber[i] = 0;
frameNumberInPreviousFile[i] = -1; frameNumberInPreviousFile[i] = 0;
lastFrameNumberInFile[i] = -1; lastFrameNumberInFile[i] = -1;
totalPacketsInFile[i] = 0; totalPacketsInFile[i] = 0;
totalWritingPacketCount[i] = 0; totalWritingPacketCount[i] = 0;
@ -1048,7 +1048,6 @@ void UDPStandardImplementation::stopReceiver(){
} }
//semaphore destroy //semaphore destroy
cout<<"gonna destroy writerguisemphore"<<endl;
for(int i=0; i < numberofWriterThreads; i++){ for(int i=0; i < numberofWriterThreads; i++){
sem_destroy(&writerGuiSemaphore[i]); sem_destroy(&writerGuiSemaphore[i]);
sem_destroy(&dataCallbackWriterSemaphore[i]); sem_destroy(&dataCallbackWriterSemaphore[i]);
@ -1565,24 +1564,19 @@ int UDPStandardImplementation::createNewFile(int ithread){
//Print packet loss and filenames //Print packet loss and filenames
if(!totalWritingPacketCount[ithread]){ if(!totalWritingPacketCount[ithread]){
frameNumberInPreviousFile[ithread] = -1; frameNumberInPreviousFile[ithread] = 0;
cout << "Thread " << ithread << " File:" << completeFileName[ithread] << endl; cout << "Thread " << ithread << " File:" << completeFileName[ithread] << endl;
}else{ }else{
//Assumption for startFrameindex usign ithread: datacompression never enters here and therefore is always same number of listening and writing threads to use ithread if(!ithread) cout << "Thread " << ithread << " File:" << completeFileName[ithread]
if (frameNumberInPreviousFile[ithread] == -1) << "\tPacket Loss:" << setw(4)<<fixed << setprecision(4) << dec <<
frameNumberInPreviousFile[ithread] = startFrameIndex-1; (int)(( ( (currentFrameNumber[ithread]-frameNumberInPreviousFile[ithread]) - (((double)totalPacketsInFile[ithread])/(double)packetsPerFrame) )/
(double)(currentFrameNumber[ithread]-frameNumberInPreviousFile[ithread])) *100.000)
<< "%\tFrame#:" << currentFrameNumber[ithread]
cout << "Thread " << ithread << " File:" << completeFileName[ithread] << "\tPackets in File: " << totalPacketsInFile[ithread]
<< "\tPacket Loss: " << setw(4)<<fixed << setprecision(4) << dec << // << "\t\t frameNumberInPreviousFile:" << frameNumberInPreviousFile[ithread]
(int)((( ((currentFrameNumber[ithread]-1)-frameNumberInPreviousFile[ithread]) - ((totalPacketsInFile[ithread])/packetsPerFrame))/ // << "\tIndex:" << dec << index
(double)((currentFrameNumber[ithread]-1)-frameNumberInPreviousFile[ithread]))*100.000) << "\tPackets Lost:" << dec << ( ((int)(currentFrameNumber[ithread]-frameNumberInPreviousFile[ithread])*packetsPerFrame) -
<< "\tFrame Number: " << currentFrameNumber[ithread] totalPacketsInFile[ithread])
<< "\tTotal Packets in File: " << totalPacketsInFile[ithread]
// << "\t\t frameNumberInPreviousFile: " << frameNumberInPreviousFile[ithread]
// << "\tIndex " << dec << index
<< "\tPackets Lost " << dec << ( ((int)((currentFrameNumber[ithread]-1)-frameNumberInPreviousFile[ithread])) -
((totalPacketsInFile[ithread])/packetsPerFrame))
<< endl; << endl;
} }
@ -1688,19 +1682,21 @@ void UDPStandardImplementation::startDataCallback(){
uint64_t fnum = 0; uint64_t fnum = 0;
uint32_t pnum = 0; uint32_t pnum = 0;
uint32_t snum = 0; uint32_t snum = 0;
bool randomSendNow = true; bool randomSendNow = true;
bool newFrame = false;
//socket details
void *context = zmq_ctx_new(); void *context = zmq_ctx_new();
void *zmqsocket = zmq_socket(context, ZMQ_PUSH); // create a publisher void *zmqsocket = zmq_socket(context, ZMQ_PUSH); // create a publisher
int val = -1; int val = -1;
zmq_setsockopt(zmqsocket, ZMQ_LINGER, &val,sizeof(val)); // wait for the unsent packets before closing socket zmq_setsockopt(zmqsocket, ZMQ_LINGER, &val,sizeof(val)); // wait for the unsent packets before closing socket
zmq_bind(zmqsocket,hostName); // bind zmq_bind(zmqsocket,hostName); // bind
//let calling function know thread started and obtained current (after sockets created) //let calling function know thread started and obtained current (after sockets created)
if(!zmqThreadStarted) if(!zmqThreadStarted)
zmqThreadStarted = true; zmqThreadStarted = true;
//header details
const char *type = "float64"; const char *type = "float64";
const char *shape= "[1024, 512]"; const char *shape= "[1024, 512]";
const char *jsonFmt ="{\"htype\":[\"chunk-1.0\"], \"type\":\"%s\", \"shape\":%s, \"acqIndex\":%d, \"fIndex\":%d, \"subfnum\":%d, \"fname\":\"%s\"}"; const char *jsonFmt ="{\"htype\":[\"chunk-1.0\"], \"type\":\"%s\", \"shape\":%s, \"acqIndex\":%d, \"fIndex\":%d, \"subfnum\":%d, \"fname\":\"%s\"}";
@ -1708,7 +1704,10 @@ void UDPStandardImplementation::startDataCallback(){
int acquisitionIndex = -1; int acquisitionIndex = -1;
int frameIndex = -1; int frameIndex = -1;
int subframeIndex = -1; int subframeIndex = -1;
bool newFrame = false; #ifdef DEBUG
int oldpnum = 0;
#endif
int datapacketscaught = 0;
/* inner loop - loop for each buffer */ /* inner loop - loop for each buffer */
//until mask reset (dummy pcaket got by writer) //until mask reset (dummy pcaket got by writer)
@ -1719,15 +1718,14 @@ void UDPStandardImplementation::startDataCallback(){
//wait for receiver to send more data //wait for receiver to send more data
sem_wait(&dataCallbackWriterSemaphore[ithread]); sem_wait(&dataCallbackWriterSemaphore[ithread]);
//everything is done //end if acquistion
if(guiNumPackets[ithread] == dummyPacketValue){ if(guiNumPackets[ithread] == dummyPacketValue){
//sending previous half frames if any //sending previous half frames if any
if(newFrame){cout<<"dummy but something remaining"<<endl; if(newFrame){
//send header //send header
//update frame details //update frame details
frameIndex = fnum; frameIndex = fnum;if(frameIndex==-1) cprintf(RED,"frameindex = -1, 111\n");
acquisitionIndex = fnum - startAcquisitionIndex; acquisitionIndex = fnum - startAcquisitionIndex;
if(dynamicRange == 32) subframeIndex = snum; if(dynamicRange == 32) subframeIndex = snum;
int len = sprintf(buf,jsonFmt,type,shape, acquisitionIndex, frameIndex, subframeIndex,completeFileName[ithread]); int len = sprintf(buf,jsonFmt,type,shape, acquisitionIndex, frameIndex, subframeIndex,completeFileName[ithread]);
@ -1737,15 +1735,14 @@ void UDPStandardImplementation::startDataCallback(){
newFrame = false; newFrame = false;
} }
//send final header //send final header
//update frame details //update frame details
#ifdef DEBUG #ifdef DEBUG
cout << "sending dummy" << endl; cout << "sending dummy" << endl;
#endif #endif
frameIndex = -1; frameIndex = -9;
acquisitionIndex = -1; acquisitionIndex = -9;
subframeIndex = -1; subframeIndex = -9;
int len = sprintf(buf,jsonFmt,type,shape, acquisitionIndex, frameIndex, subframeIndex,completeFileName[ithread]); int len = sprintf(buf,jsonFmt,type,shape, acquisitionIndex, frameIndex, subframeIndex,completeFileName[ithread]);
zmq_send(zmqsocket, buf,len, ZMQ_SNDMORE); zmq_send(zmqsocket, buf,len, ZMQ_SNDMORE);
//send final data //send final data
@ -1754,7 +1751,9 @@ void UDPStandardImplementation::startDataCallback(){
pthread_mutex_lock(&statusMutex); pthread_mutex_lock(&statusMutex);
dataCallbackThreadsMask^=(1<<ithread); dataCallbackThreadsMask^=(1<<ithread);
pthread_mutex_unlock(&statusMutex); pthread_mutex_unlock(&statusMutex);
#ifdef DEBUG
cprintf(GREEN,"Data Streaming %d: packets sent:%d\n",ithread,datapacketscaught);
#endif
continue; continue;
} }
@ -1774,6 +1773,7 @@ void UDPStandardImplementation::startDataCallback(){
} }
size = guiNumPackets[ithread]*onePacketSize; size = guiNumPackets[ithread]*onePacketSize;
datapacketscaught+=guiNumPackets[ithread];
offset=0; offset=0;
//copy packet by packet -getting rid of headers, -in the right order(padding missing packets) //copy packet by packet -getting rid of headers, -in the right order(padding missing packets)
@ -1783,7 +1783,14 @@ void UDPStandardImplementation::startDataCallback(){
while((size>0) && (getFrameandPacketNumber(ithread, latestData[ithread]+offset, fnum, pnum,snum)==FAIL)){ while((size>0) && (getFrameandPacketNumber(ithread, latestData[ithread]+offset, fnum, pnum,snum)==FAIL)){
offset+= onePacketSize; offset+= onePacketSize;
} }
//if(!ithread) cout<< ithread <<" fnum:"<< fnum<<" pnum:"<<pnum<<endl;
#ifdef DEBUG
if(pnum != (oldpnum+1)){
cprintf(RED,"%d - packets missing: %d (old pnum: %d, new pnum: %d)\n",ithread, pnum-oldpnum-1,oldpnum,pnum);
}
oldpnum=pnum;
#endif
//end of buffer //end of buffer
if(offset >= size) if(offset >= size)
break; break;
@ -1791,11 +1798,14 @@ void UDPStandardImplementation::startDataCallback(){
//last packet of same frame //last packet of same frame
if(fnum == currentfnum && pnum == packetsPerFrame){ if(fnum == currentfnum && pnum == packetsPerFrame){
#ifdef DEBUG
oldpnum=0;
#endif
memcpy(buffer+((pnum-1)*oneDataSize), latestData[ithread]+offset+8,oneDataSize); memcpy(buffer+((pnum-1)*oneDataSize), latestData[ithread]+offset+8,oneDataSize);
offset+= onePacketSize; offset+= onePacketSize;
//send header //send header
//update frame details //update frame details
frameIndex = fnum; frameIndex = fnum;if(frameIndex==-1) cprintf(RED,"frameindex = -1, 222\n");
acquisitionIndex = fnum - startAcquisitionIndex; acquisitionIndex = fnum - startAcquisitionIndex;
if(dynamicRange == 32) subframeIndex = snum; if(dynamicRange == 32) subframeIndex = snum;
int len = sprintf(buf,jsonFmt,type,shape, acquisitionIndex, frameIndex, subframeIndex,completeFileName[ithread]); int len = sprintf(buf,jsonFmt,type,shape, acquisitionIndex, frameIndex, subframeIndex,completeFileName[ithread]);
@ -1818,10 +1828,20 @@ void UDPStandardImplementation::startDataCallback(){
//same frame (not last) or next frame //same frame (not last) or next frame
else { else {
//next frame //next frame
#ifdef DEBUG
int once = true;
#endif
while(fnum > currentfnum){ while(fnum > currentfnum){
#ifdef DEBUG
if(once){
if((fnum-currentfnum-1)>1) cprintf(RED,"%d Complete sub image missing:%d (cfnum:%d nfnum:%d)\n",
ithread,fnum-currentfnum-1,currentfnum,fnum);
once = false;
}
#endif
//send header //send header
//update frame details //update frame details
frameIndex = fnum; frameIndex = fnum;if(frameIndex==-1) cprintf(RED,"frameindex = -1, 333\n");
acquisitionIndex = fnum - startAcquisitionIndex; acquisitionIndex = fnum - startAcquisitionIndex;
if(dynamicRange == 32) subframeIndex = snum; if(dynamicRange == 32) subframeIndex = snum;
int len = sprintf(buf,jsonFmt,type,shape, acquisitionIndex, frameIndex, subframeIndex,completeFileName[ithread]); int len = sprintf(buf,jsonFmt,type,shape, acquisitionIndex, frameIndex, subframeIndex,completeFileName[ithread]);
@ -2160,10 +2180,11 @@ void UDPStandardImplementation::stopListening(int ithread, int numbytes){
//reset mask and exit loop //reset mask and exit loop
pthread_mutex_lock(&statusMutex); pthread_mutex_lock(&statusMutex);
listeningThreadsMask^=(1<<ithread); listeningThreadsMask^=(1<<ithread);
#ifdef DEBUG4 //#ifdef DEBUG4
cprintf(BLUE,"Listening_Thread %d: Resetting mask of listening thread. New Mask: 0x%x", ithread, listeningThreadsMask); //cprintf(BLUE,"Listening_Thread %d: Resetting mask of listening thread. New Mask: 0x%x", ithread, listeningThreadsMask);
cprintf(BLUE,"Listening_Thread %d: Frames listened to :%d\n",ithread, (totalListeningPacketCount[ithread]/packetsPerFrame)); cprintf(BLUE,"Listening_Thread %d: Packets listened to :%d\n",ithread, (totalListeningPacketCount[ithread]));
#endif //cprintf(BLUE,"Listening_Thread %d: Frames listened to :%d\n",ithread, (totalListeningPacketCount[ithread]/packetsPerFrame));
//#endif
pthread_mutex_unlock(&(statusMutex)); pthread_mutex_unlock(&(statusMutex));
@ -2626,7 +2647,7 @@ void UDPStandardImplementation::handleWithoutDataCompression(int ithread, char*
//copy frame for gui //copy frame for gui
//if(npackets >= (packetsPerFrame/numberofListeningThreads)) //if(npackets >= (packetsPerFrame/numberofListeningThreads))
if(dataStreamEnable && npackets) if(dataStreamEnable && npackets > 0)
copyFrameToGui(ithread, wbuffer,npackets); copyFrameToGui(ithread, wbuffer,npackets);
#ifdef DEBUG4 #ifdef DEBUG4
cprintf(GREEN,"Writing_Thread: Copied frame\n"); cprintf(GREEN,"Writing_Thread: Copied frame\n");

View File

@ -2163,7 +2163,7 @@ int slsReceiverTCPIPInterface::set_data_stream_enable(){
strcpy(mess,SET_RECEIVER_ERR_MESSAGE); strcpy(mess,SET_RECEIVER_ERR_MESSAGE);
ret=FAIL; ret=FAIL;
} }
else if(receiverBase->getStatus()==RUNNING || receiverBase->getStatus()==TRANSMITTING){ else if((index >= 0) && (receiverBase->getStatus()==RUNNING || receiverBase->getStatus()==TRANSMITTING)){
strcpy(mess,"Can not set data stream enable while receiver not idle\n"); strcpy(mess,"Can not set data stream enable while receiver not idle\n");
cprintf(RED,"%s\n",mess); cprintf(RED,"%s\n",mess);
ret = FAIL; ret = FAIL;