only missing data left to be handled in zmqthread in receiver

This commit is contained in:
Dhanya Maliakal 2016-09-19 17:21:28 +02:00
parent 6a244c1057
commit 611f3a26c2
2 changed files with 61 additions and 64 deletions

View File

@ -515,9 +515,10 @@ private:
* @param wbuffer writer buffer
* @param framenumber reference to the frame number
* @param packetnumber reference to the packet number
* @param subframenumber reference to the subframe number
* @return OK or FAIL
*/
int getFrameandPacketNumber(int ithread, char* wbuffer, uint64_t &framenumber, uint32_t &packetnumber);
int getFrameandPacketNumber(int ithread, char* wbuffer, uint64_t &framenumber, uint32_t &packetnumber, uint32_t &subframenumber);
/**
* Find offset upto this frame number and write it to file

View File

@ -568,23 +568,18 @@ uint32_t UDPStandardImplementation::setDataStreamEnable(const uint32_t enable){
FILE_LOG(logDEBUG) << __AT__ << " called";
cout<<"************datasend:"<<enable<<endl;
int olddatasend = dataStreamEnable;
dataStreamEnable = enable;
//if there is a change
if(olddatasend != dataStreamEnable){
cout<<"***Going to destroy data callback threads and create!!!"<<endl;
if(zmqThreadStarted)
createDataCallbackThreads(true);
cout<<"***datacallback threads destroyed"<<endl;
if(dataStreamEnable){
numberofDataCallbackThreads = MAX_NUMBER_OF_LISTENING_THREADS;
if(createDataCallbackThreads() == FAIL){
cprintf(BG_RED,"Error: Could not create data callback threads\n");
}
cout<<"data call back threads created"<<endl;
}
}
@ -1245,12 +1240,12 @@ int UDPStandardImplementation::createDataCallbackThreads(bool destroy){
}
killAllDataCallbackThreads = false;
zmqThreadStarted = false;
FILE_LOG(logDEBUG) << "Info: Data Callback thread(s) destroyed";
FILE_LOG(logINFO) << "Data Callback thread(s) destroyed";
}
//create
else{
FILE_LOG(logDEBUG) << "Info: Creating Data Callback Thread(s)";
FILE_LOG(logINFO) << "Creating Data Callback Thread(s)";
//reset current index
currentThreadIndex = -1;
@ -1296,12 +1291,12 @@ int UDPStandardImplementation::createListeningThreads(bool destroy){
}
killAllListeningThreads = false;
threadStarted = false;
FILE_LOG(logDEBUG) << "Info: Listening thread(s) destroyed";
FILE_LOG(logINFO) << "Listening thread(s) destroyed";
}
//create
else{
FILE_LOG(logDEBUG) << "Info: Creating Listening Thread(s)";
FILE_LOG(logINFO) << "Creating Listening Thread(s)";
//reset current index
currentThreadIndex = -1;
@ -1348,12 +1343,12 @@ int UDPStandardImplementation::createWriterThreads(bool destroy){
}
killAllWritingThreads = false;
threadStarted = false;
FILE_LOG(logDEBUG) << "Info: Writer thread(s) destroyed";
FILE_LOG(logINFO) << "Writer thread(s) destroyed";
}
//create threads
else{
FILE_LOG(logDEBUG) << "Info: Creating Writer Thread(s)";
FILE_LOG(logINFO) << "Creating Writer Thread(s)";
//reset current index
currentThreadIndex = -1;
@ -1667,7 +1662,6 @@ void* UDPStandardImplementation::startWritingThread(void* this_pointer){
void UDPStandardImplementation::startDataCallback(){
cprintf(MAGENTA,"start data call back thread started %d\n",currentThreadIndex);
FILE_LOG(logDEBUG) << __AT__ << " called";
//set current thread value index
@ -1693,12 +1687,15 @@ void UDPStandardImplementation::startDataCallback(){
int currentfnum = -1;
uint64_t fnum = 0;
uint32_t pnum = 0;
uint32_t snum = 0;
bool randomSendNow = true;
void *context = zmq_ctx_new();
void *zmqsocket = zmq_socket(context, ZMQ_PUSH); // create a publisher
int val = -1;
//zmq_setsockopt(zmq_socket, ZMQ_LINGER, &val,sizeof(val)); // wait for the unsent packets before closing socket
zmq_bind(zmqsocket,hostName); // bind
//let calling function know thread started and obtained current (after sockets created)
if(!zmqThreadStarted)
@ -1706,6 +1703,11 @@ void UDPStandardImplementation::startDataCallback(){
const char *type = "float64";
const char *shape= "[1024, 512]";
const char *jsonFmt ="{\"htype\":[\"chunk-1.0\"], \"type\":\"%s\", \"shape\":%s, \"acqIndex\":%d, \"fIndex\":%d, \"subfnum\":%d, \"fname\":\"%s\"}";
char buf[1000];
int acquisitionIndex = -1;
int frameIndex = -1;
int subframeIndex = -1;
/* inner loop - loop for each buffer */
//until mask reset (dummy pcaket got by writer)
@ -1713,28 +1715,22 @@ void UDPStandardImplementation::startDataCallback(){
//let the writer thread continue, while we process carry over if any
sem_post(&writerGuiSemaphore[ithread]);
if(!ithread) cout<<"*** posted writerguisemiphore (callback)"<<endl;
//wait for receiver to send more data
sem_wait(&dataCallbackWriterSemaphore[ithread]);
//everything is done
if(guiNumPackets[ithread] == dummyPacketValue){
/**suing this in clientzmq_msg_more,
* in serve use zmq_msg_send (&message, sender, ZMQ_SNDMORE); and 0 for last packet, but better to check lengt*/
/*if (checkJoinThread()){for different scans
break;
}*/
ostringstream header;
header << "{\"htype\":[\"chunk-1.0\"], "
<< "\"type\":" << "\"" << type << "\", "
<< "\"shape\":" << shape
<< "}";
//cout<<ithread << "header:"<< header.str()<<endl;
/*send half frames from before if any */
//send header
zmq_send(zmqsocket, header.str().c_str(), header.str().length(), ZMQ_SNDMORE);
int len = sprintf(buf,jsonFmt,type,shape, acquisitionIndex, frameIndex, subframeIndex,completeFileName[ithread]);
zmq_send(zmqsocket, buf,len, ZMQ_SNDMORE);
//send data
zmq_send (zmqsocket, "end", 3, 0);
@ -1742,8 +1738,6 @@ void UDPStandardImplementation::startDataCallback(){
dataCallbackThreadsMask^=(1<<ithread);
pthread_mutex_unlock(&statusMutex);
sem_post(&writerGuiSemaphore[ithread]);
if(!ithread) cout<<"*** posted writerguisemiphore (callback dummy)"<<endl;
continue;
}
@ -1769,7 +1763,7 @@ void UDPStandardImplementation::startDataCallback(){
while(offset < size){
//until getting frame number is not error
while((size>0) && (getFrameandPacketNumber(ithread, latestData[ithread]+offset, fnum, pnum)==FAIL)){
while((size>0) && (getFrameandPacketNumber(ithread, latestData[ithread]+offset, fnum, pnum,snum)==FAIL)){
offset+= onePacketSize;
}
@ -1780,24 +1774,23 @@ void UDPStandardImplementation::startDataCallback(){
//new frame
if(currentfnum==-1){
currentfnum = fnum;
//update frame details
frameIndex = fnum;
acquisitionIndex = fnum - startAcquisitionIndex;
if(dynamicRange == 32) subframeIndex = snum;
}
//last packet
if(pnum == packetsPerFrame){
memcpy(buffer+((pnum-1)*oneDataSize), latestData[ithread]+offset+8,oneDataSize);
offset+= onePacketSize;
ostringstream header;
header << "{\"htype\":[\"chunk-1.0\"], "
<< "\"type\":" << "\"" << type << "\", "
<< "\"shape\":" << shape
<< "}";
//cout<<ithread << "header:"<< header.str()<<endl;
//send header
zmq_send(zmqsocket, header.str().c_str(), header.str().length(), ZMQ_SNDMORE);
int len = sprintf(buf,jsonFmt,type,shape, acquisitionIndex, frameIndex, subframeIndex,completeFileName[ithread]);
zmq_send(zmqsocket, buf,len, ZMQ_SNDMORE);
//send data
zmq_send(zmqsocket, buffer, oneframesize, 0);
#ifdef DEBUG
cprintf(BLUE,"%d sent (last packet)\n",ithread);
if(!ithread)cprintf(BLUE,"%d sent (last packet)\n",ithread);
#endif
//start clock after sending
if(!frameToGuiFrequency){
@ -1811,18 +1804,13 @@ void UDPStandardImplementation::startDataCallback(){
else {
//next frame
if(fnum > currentfnum){
ostringstream header;
header << "{\"htype\":[\"chunk-1.0\"], "
<< "\"type\":" << "\"" << type << "\", "
<< "\"shape\":" << shape
<< "}";
//cout<<ithread << "header:"<< header.str()<<endl;
//send header
zmq_send(zmqsocket, header.str().c_str(), header.str().length(), ZMQ_SNDMORE);
int len = sprintf(buf,jsonFmt,type,shape, acquisitionIndex, frameIndex, subframeIndex,completeFileName[ithread]);
zmq_send(zmqsocket, buf,len, ZMQ_SNDMORE);
//send data
zmq_send(zmqsocket, buffer, oneframesize, 0);
#ifdef DEBUG
cprintf(BLUE,"%d sent (last packet)\n",ithread);
cprintf(BLUE,"%d sent (last packet of previous frame)\n",ithread);
#endif
//start clock after sending
if(!frameToGuiFrequency){
@ -1843,16 +1831,16 @@ void UDPStandardImplementation::startDataCallback(){
}/*--end of loop for each buffer (inner loop)*/
//free resources
delete[] buffer;
zmq_unbind(zmqsocket, hostName); /* will this be too soon and cut the sending*/
zmq_close(zmqsocket);
zmq_ctx_destroy(context);
//end of acquisition, wait for next acquisition/change of parameters
sem_wait(&dataCallbackSemaphore[ithread]);
//free resources (only at the next start so that socket not closed before client gets end of packet)
delete[] buffer;
zmq_unbind(zmqsocket, hostName);
zmq_close(zmqsocket);
zmq_ctx_destroy(context);
//check to exit thread (for change of parameters) - only EXIT possibility
@ -1868,7 +1856,7 @@ void UDPStandardImplementation::startDataCallback(){
void UDPStandardImplementation::startListening(){cprintf(BLUE,"startlistening thread started %d\n",currentThreadIndex);
void UDPStandardImplementation::startListening(){
FILE_LOG(logDEBUG) << __AT__ << " called";
//set current thread value index
@ -2303,7 +2291,7 @@ uint32_t UDPStandardImplementation::processListeningBuffer(int ithread, int &cSi
void UDPStandardImplementation::startWriting(){cprintf(GREEN,"start writing thread started %d\n",currentThreadIndex);
void UDPStandardImplementation::startWriting(){
FILE_LOG(logDEBUG) << __AT__ << " called";
//set current thread value index
@ -2504,9 +2492,7 @@ void UDPStandardImplementation::stopWriting(int ithread, char* wbuffer){
if(dataStreamEnable){
//ensure previous frame was processed
if(!ithread) cout<<"*** waiting for writerguisemiphore (stopwriting)"<<endl;
sem_wait(&writerGuiSemaphore[ithread]);
if(!ithread) cout<<"*** got post for writerguisemiphore (stopwriting)"<<endl;
guiNumPackets[ithread] = dummyPacketValue;
//let it know its got data
sem_post(&dataCallbackWriterSemaphore[ithread]);
@ -2584,7 +2570,8 @@ void UDPStandardImplementation::handleWithoutDataCompression(int ithread, char*
//get current frame number
uint64_t tempframenumber;
uint32_t pnum;
if(getFrameandPacketNumber(ithread, wbuffer + HEADER_SIZE_NUM_TOT_PACKETS,tempframenumber,pnum) == FAIL){
uint32_t snum;
if(getFrameandPacketNumber(ithread, wbuffer + HEADER_SIZE_NUM_TOT_PACKETS,tempframenumber,pnum,snum) == FAIL){
//error in frame number sent by fpga
while(!fifoFree[ithread]->push(wbuffer));
@ -2660,8 +2647,9 @@ void UDPStandardImplementation::writeFileWithoutCompression(int ithread, char* w
//get start frame (required to create new file at the right juncture)
uint64_t startframe =-1;
uint32_t pnum;
uint32_t snum;
//if(ithread) cout<<"getting start frame number"<<endl;
if(getFrameandPacketNumber(ithread, wbuffer + offset, startframe,pnum) == FAIL){
if(getFrameandPacketNumber(ithread, wbuffer + offset, startframe,pnum,snum) == FAIL){
//error in frame number sent by fpga
while(!fifoFree[ithread]->push(wbuffer));
return;
@ -2719,7 +2707,8 @@ void UDPStandardImplementation::writeFileWithoutCompression(int ithread, char* w
//get last frame number
uint64_t finalLastFrameNumberToSave = 0;
uint32_t pnum;
if(getFrameandPacketNumber(ithread, wbuffer + HEADER_SIZE_NUM_TOT_PACKETS + ((numpackets - 1) * onePacketSize), finalLastFrameNumberToSave,pnum) == FAIL){
uint32_t snum;
if(getFrameandPacketNumber(ithread, wbuffer + HEADER_SIZE_NUM_TOT_PACKETS + ((numpackets - 1) * onePacketSize), finalLastFrameNumberToSave,pnum,snum) == FAIL){
//error in frame number sent by fpga
while(!fifoFree[ithread]->push(wbuffer));
return;
@ -2809,9 +2798,8 @@ void UDPStandardImplementation::copyFrameToGui(int ithread, char* buffer, uint32
cprintf(GREEN,"Writing_Thread: CopyingFrame: Going to copy data\n");
#endif
//ensure previous frame was processed
if(!ithread) cout<<"*** waiting for writerguisemiphore (copyfrmae)"<<endl;
sem_wait(&writerGuiSemaphore[ithread]);
if(!ithread) cout<<"*** got post for writerguisemiphore (copyframe)"<<endl;
//copy date
guiNumPackets[ithread] = numpackets;
strcpy(guiFileName[ithread],completeFileName[ithread]);
@ -2842,7 +2830,8 @@ void UDPStandardImplementation::handleDataCompression(int ithread, char* wbuffer
//get frame number
uint64_t tempframenumber=-1;
uint32_t pnum;
if(getFrameandPacketNumber(ithread, wbuffer + HEADER_SIZE_NUM_TOT_PACKETS, tempframenumber,pnum) == FAIL){
uint32_t snum;
if(getFrameandPacketNumber(ithread, wbuffer + HEADER_SIZE_NUM_TOT_PACKETS, tempframenumber,pnum,snum) == FAIL){
//error in frame number sent by fpga
while(!fifoFree[ithread]->push(wbuffer));
return;
@ -2978,11 +2967,15 @@ void UDPStandardImplementation::handleDataCompression(int ithread, char* wbuffer
int UDPStandardImplementation::getFrameandPacketNumber(int ithread, char* wbuffer, uint64_t &framenumber, uint32_t &packetnumber){
int UDPStandardImplementation::getFrameandPacketNumber(int ithread, char* wbuffer, uint64_t &framenumber, uint32_t &packetnumber,uint32_t &subframenumber){
FILE_LOG(logDEBUG) << __AT__ << " called";
eiger_packet_footer_t* footer=0;
eiger_packet_header_t* e_header=0;
jfrau_packet_header_t* header=0;
framenumber = 0;
packetnumber = 0;
subframenumber = 0;
switch(myDetectorType){
@ -2996,12 +2989,15 @@ int UDPStandardImplementation::getFrameandPacketNumber(int ithread, char* wbuffe
return FAIL;
}
packetnumber = (*( (uint16_t*) footer->packetNumber));
e_header = (eiger_packet_header_t*) (wbuffer);
subframenumber = *( (uint32_t*) e_header->subFrameNumber);
#ifdef DEBUG4
if(!ithread) cprintf(GREEN,"Writing_Thread %d: fnum:%lld pnum:%d FPGA_fnum:%d footeroffset:%d\n",
if(!ithread) cprintf(GREEN,"Writing_Thread %d: fnum:%lld pnum:%d FPGA_fnum:%d subfnum:%d footeroffset:%d\n",
ithread,
(long long int)framenumber,
packetnumber,
framenumber,
subframenumber,
footerOffset);
#endif
framenumber += (startFrameIndex - 1);
@ -3051,9 +3047,9 @@ int UDPStandardImplementation::writeUptoFrameNumber(int ithread, char* wbuffer,
uint64_t tempframenumber=-1;
offset = endoffset;
uint32_t pnum;
uint32_t snum;
//get last frame number
if(getFrameandPacketNumber(ithread, wbuffer + (endoffset-onePacketSize), tempframenumber,pnum) == FAIL){
if(getFrameandPacketNumber(ithread, wbuffer + (endoffset-onePacketSize), tempframenumber,pnum,snum) == FAIL){
//error in frame number sent by fpga
while(!fifoFree[ithread]->push(wbuffer));
return FAIL;
@ -3079,7 +3075,7 @@ int UDPStandardImplementation::writeUptoFrameNumber(int ithread, char* wbuffer,
offset -= bigIncrements;
if(offset<startoffset)
break;//if(ithread) cout<<"frame number at going backwards fast f#:"<<tempframenumber<< " offset:"<<offset<<endl;
if(getFrameandPacketNumber(ithread, wbuffer + offset, tempframenumber,pnum) == FAIL){
if(getFrameandPacketNumber(ithread, wbuffer + offset, tempframenumber,pnum,snum) == FAIL){
//error in frame number sent by fpga
while(!fifoFree[ithread]->push(wbuffer));
return FAIL;
@ -3087,7 +3083,7 @@ int UDPStandardImplementation::writeUptoFrameNumber(int ithread, char* wbuffer,
}
if(offset<startoffset){
offset = startoffset;//if(ithread) cout<<"offset < start offset f#:"<<tempframenumber<< " offset:"<<offset<<endl;
if(getFrameandPacketNumber(ithread, wbuffer + offset, tempframenumber,pnum) == FAIL){
if(getFrameandPacketNumber(ithread, wbuffer + offset, tempframenumber,pnum,snum) == FAIL){
//error in frame number sent by fpga
while(!fifoFree[ithread]->push(wbuffer));
return FAIL;
@ -3095,7 +3091,7 @@ int UDPStandardImplementation::writeUptoFrameNumber(int ithread, char* wbuffer,
}
while(tempframenumber<nextFrameNumber){
offset += onePacketSize;//if(ithread) cout<<"frame number at going forwards slow f#:"<<tempframenumber<< " offset:"<<offset<<endl;
if(getFrameandPacketNumber(ithread, wbuffer + offset, tempframenumber,pnum) == FAIL){
if(getFrameandPacketNumber(ithread, wbuffer + offset, tempframenumber,pnum,snum) == FAIL){
//error in frame number sent by fpga
while(!fifoFree[ithread]->push(wbuffer));
return FAIL;