changes to slsreceiver

git-svn-id: file:///afs/psi.ch/project/sls_det_software/svn/slsDetectorSoftware@703 951219d9-93cf-4727-9268-0efd64621fa3
This commit is contained in:
l_maliakal_d
2013-12-03 08:54:52 +00:00
parent a952077c1d
commit 64616be52d
6 changed files with 315 additions and 218 deletions

View File

@ -62,6 +62,11 @@ slsReceiverFunctionList::slsReceiverFunctionList(detectorType det):
frameIndexOffset(GOTTHARD_FRAME_INDEX_OFFSET),
dataCompression(false),
numJobsPerThread(-1),
currentPacketOffset(0),
currentFrameOffset(0),
currentPacketCount(0),
currentFrameCount(-1),
totalListeningFrameCount(0),
acquisitionPeriod(SAMPLE_TIME_IN_NS),
startAcquisitionCallBack(NULL),
pStartAcquisition(NULL),
@ -462,7 +467,46 @@ void* slsReceiverFunctionList::startListeningThread(void* this_pointer){
void slsReceiverFunctionList::processFrameForFifo(){
//write packet count and set/increment counters/offsets
if(currentFrameCount > -1){
(*((uint8_t*)(buffer+currentFrameOffset))) = currentPacketCount;
currentPacketCount = 0;
currentFrameCount++;
//#ifdef VERYVERBOSE
totalListeningFrameCount++;
//#endif
#ifdef VERYVERBOSE
cout<<"lcurrframnum:"<< dec<<
(((uint32_t)(*((uint32_t*)(buffer+currentPacketOffset))) & frameIndexMask) >> frameIndexOffset)<<"*"<<endl;
#endif
currentFrameOffset += (HEADER_SIZE_NUM_PACKETS + bufferSize);
currentPacketOffset = currentFrameOffset + HEADER_SIZE_NUM_PACKETS;
}
//write frame count for each buffer and write buffer
if(currentFrameCount >= numJobsPerThread){
(*((uint16_t*)buffer)) = currentFrameCount;
while(!fifo->push(buffer));
#ifdef VERYVERBOSE
cout << "lbuf1:" << (void*)buffer << endl;
#endif
}
//pop freefifo and reset counters, set offsets
if((currentFrameCount >= numJobsPerThread) || (currentFrameCount == -1)){
fifofree->pop(buffer);
#ifdef VERYVERBOSE
cout << "lbuf1 popped:" << (void*)buffer << endl;
#endif
currentFrameCount = 0;
currentPacketCount = 0;
currentPacketOffset = HEADER_SIZE_NUM_FRAMES + HEADER_SIZE_NUM_PACKETS;
currentFrameOffset = HEADER_SIZE_NUM_FRAMES;
}
}
@ -475,17 +519,21 @@ int slsReceiverFunctionList::startListening(){
measurementStarted = false;
startFrameIndex = 0;
int offset=0;
int frameStartOffset = 0;
int ret=1;
int i=0;
int framesCount = -1;
int packetsCount = 0;
bool newFrame = true;
char *tempchar = new char[oneBufferSize];
int tempoffset= 0;
currentPacketOffset = 0;
currentFrameOffset = 0;
currentFrameCount = -1;
currentPacketCount = 0;
//#ifdef VERYVERBOSE
int totalcount = 0;
totalListeningFrameCount = 0;
//#endif
//to increase socket receiver buffer size and max length of input queue by changing kernel settings
if(system("echo $((100*1024*1024)) > /proc/sys/net/core/rmem_max"))
cout << "\nWARNING: Could not change socket receiver buffer size in file /proc/sys/net/core/rmem_max" << endl;
@ -527,34 +575,10 @@ int slsReceiverFunctionList::startListening(){
while (receiver_threads_running) {
//push buffer
if(framesCount >= numJobsPerThread){
//write frame count for each buffer
(*((uint16_t*)buffer)) = framesCount;
while(!fifo->push(buffer));
#ifdef VERYVERBOSE
cout << "lbuf1:" << (void*)buffer << endl;
#endif
}
if(newFrame)
processFrameForFifo();
//pop freefifo
if((framesCount >= numJobsPerThread) || (framesCount == -1)){
//reset frame count and packet count
framesCount = 0;
packetsCount = 0;
//pop freefifo
/*while(fifofree->isEmpty());*/
fifofree->pop(buffer);
#ifdef VERYVERBOSE
cout << "lbuf1 popped:" << (void*)buffer << endl;
#endif
//increment offsets
offset = HEADER_SIZE_NUM_FRAMES;
offset += HEADER_SIZE_NUM_PACKETS;
frameStartOffset = HEADER_SIZE_NUM_FRAMES;
}
//let tcp thread know this thread is in working condition
if(!startFrameIndex){
if(!listening_thread_running){
@ -565,148 +589,97 @@ int slsReceiverFunctionList::startListening(){
}
//ret -2, remaining, start new frame with curent packet, then progress to ret = 0 (waiting for next packet)
if(ret == -2){
memcpy(buffer+offset,tempchar,oneBufferSize);
ret = 0;
}
//ret = -3, remaning: start new frame with current packet, progress to ret = -1 (invalidate remaining packets, start new frame)
else if(ret == -3){
memcpy(buffer+offset,tempchar,oneBufferSize);
ret = -1;
}
else{
//receive 1 packet
rc = udpSocket->ReceiveDataOnly(buffer+offset,oneBufferSize);
if( rc <= 0){
//receive 1 packet
rc = udpSocket->ReceiveDataOnly(buffer+currentPacketOffset,oneBufferSize);
if( rc <= 0){
#ifdef VERYVERBOSE
cerr << "recvfrom() failed:"<<endl;
#endif
if(status != TRANSMITTING){
continue;
}
//the end of listening thread, after pushing off to fifo
else{
//push the last buffer into fifo
if(framesCount > 0){
(*((uint16_t*)buffer)) = framesCount;
fifo->push(buffer);
#ifdef VERYVERBOSE
cout <<" last lbuf1:" << (void*)buffer << endl;
cerr << "recvfrom() failed:"<<endl;
#endif
if(status != TRANSMITTING){
continue;
}
//the end of listening thread, after pushing off to fifo
else{
//push the last buffer into fifo
if(currentFrameCount > 0){
(*((uint8_t*)(buffer+currentFrameOffset))) = currentPacketCount;
if(currentPacketCount != 0){
currentFrameCount++;
totalListeningFrameCount++;
}
//push in dummy packet
while(fifofree->isEmpty());
fifofree->pop(buffer);
(*((uint16_t*)buffer)) = 0xFFFF;
(*((uint16_t*)buffer)) = currentFrameCount;
fifo->push(buffer);
#ifdef VERYVERBOSE
cout << "pushed in dummy buffer:" << (void*)buffer << endl;
cout <<" last lbuf1:" << (void*)buffer << endl;
#endif
break;
}
}
//manipulate buffer number to inlude frame number and packet number for gotthard
if ((myDetectorType == GOTTHARD) && (shortFrame == -1))
(*((uint32_t*)(buffer+offset)))++;
//start for each scan
if(!measurementStarted){
startFrameIndex = ((((uint32_t)(*((uint32_t*)(buffer+offset)))) & (frameIndexMask)) >> frameIndexOffset);
cout<<"startFrameIndex:"<<startFrameIndex<<endl;
prevframenum=startFrameIndex;
measurementStarted = true;
}
//start of acquisition
if(!acqStarted){
startAcquisitionIndex=startFrameIndex;
currframenum = startAcquisitionIndex;
acqStarted = true;
cout<<"startAcquisitionIndex:"<<startAcquisitionIndex<<endl;
}
ret = filter->verifyFrame(buffer+offset);
/*
rets
case 0: waiting for next packet of new frame
case 1: finished with full frame,
start new frame
case -1: last packet of current frame,
invalidate remaining packets,
start new frame
case -2: first packet of new frame,
invalidate remaining packets,
check buffer needs to be pushed,
start new frame with the current packet,
then ret = 0
case -3: last packet of new frame,
invalidate remaining packets,
check buffer needs to be pushed,
start new frame with current packet,
then ret = -1 (invalidate remaining packets and start a new frame)
*/
}
//for each packet
packetsCount++;
//ret = 0, so just increment offset and continue
if(ret == 0){
offset += oneBufferSize;
continue;
}
//ret -2, -3, copy the current packet temporarily
if(ret < -1){
memcpy(tempchar, buffer+offset, oneBufferSize);
packetsCount --;
}
//ret -1, change needed only for the remaining packets
else if (ret == -1){
offset += oneBufferSize;
ret = 1;
}
//ret = -1, -2, -3, invalidate remaining packets
if(ret < 0){
for( i = offset; i < bufferSize; i += oneBufferSize)
(*((uint32_t*)(buffer+i))) = 0xFFFFFFFF;
}
//for each frame
//write packet count
(*((uint8_t*)(buffer+frameStartOffset))) = packetsCount;
//reset packet count
packetsCount = 0;
//increment frame count
framesCount++;
//#ifdef VERYVERBOSE
totalcount++;
//#endif
//push in dummy packet
while(fifofree->isEmpty());
fifofree->pop(buffer);
(*((uint16_t*)buffer)) = 0xFFFF;
fifo->push(buffer);
#ifdef VERYVERBOSE
cout<<"lcurrframnum:"<< dec<<
(((uint32_t)(*((uint32_t*)(buffer+offset))) & frameIndexMask) >> frameIndexOffset)<<"*"<<endl;
cout << "pushed in dummy buffer:" << (void*)buffer << endl;
#endif
//increment offsets
frameStartOffset += (HEADER_SIZE_NUM_PACKETS + bufferSize);
offset = frameStartOffset + HEADER_SIZE_NUM_PACKETS;
break;
}
}
//manipulate buffer number to inlude frame number and packet number for gotthard
if ((myDetectorType == GOTTHARD) && (shortFrame == -1))
(*((uint32_t*)(buffer+currentPacketOffset)))++;
//start for each scan
if(!measurementStarted){
startFrameIndex = ((((uint32_t)(*((uint32_t*)(buffer+currentPacketOffset)))) & (frameIndexMask)) >> frameIndexOffset);
cout<<"startFrameIndex:"<<startFrameIndex<<endl;
prevframenum=startFrameIndex;
measurementStarted = true;
}
//start of acquisition
if(!acqStarted){
startAcquisitionIndex=startFrameIndex;
currframenum = startAcquisitionIndex;
acqStarted = true;
cout<<"startAcquisitionIndex:"<<startAcquisitionIndex<<endl;
}
ret = filter->verifyFrame(buffer+currentPacketOffset);
//ret = -1, -3 :packets of next frame, so copy it to a later offset or to new buffer
if(ret < -1){
if((currentFrameCount + 1) >= numJobsPerThread){
memcpy(tempchar, buffer+currentPacketOffset, oneBufferSize);
processFrameForFifo();
memcpy(buffer+currentPacketOffset,tempchar,oneBufferSize);
}else{
tempoffset = currentPacketCount;
processFrameForFifo();
memcpy(buffer+currentPacketOffset,buffer+tempoffset,oneBufferSize);
}
//ret = -2, not last frame of next packet. so wait for next packet
if(ret == -2)
ret = 0;
//rer = -3, last packet, so new frame
}
currentPacketCount++;
//ret = 0, wait for next packet
if(ret == 0){
currentPacketOffset += oneBufferSize;
newFrame = false;
}
// ret = -1, 1, last packet rxd for current frame, so new frame please
else
newFrame = true;
}
}
delete tempchar;
@ -716,7 +689,7 @@ int slsReceiverFunctionList::startListening(){
pthread_mutex_unlock(&(status_mutex));
//#ifdef VERYVERBOSE
cout << "Total count listened to " << totalcount << endl;
cout << "Total count listened to " << totalListeningFrameCount << endl;
//#endif
return 0;
}
@ -904,7 +877,10 @@ int slsReceiverFunctionList::startWriting(){
cout << "ERROR: You do not have permissions to overwrite: " << savefilename << endl;
}
}
if(npackets == packetsPerFrame){
framesCaught++;
totalFramesCaught++;
}
//increment offset
offset += bufferSize;
@ -914,8 +890,6 @@ int slsReceiverFunctionList::startWriting(){
//increment/decrement counters
framesInFile += numFramesToBeSaved;
framesCaught += numFramesToBeSaved;
totalFramesCaught += numFramesToBeSaved;
numFrames -= numFramesToBeSaved;
//create new file
if(framesInFile >= maxFramesPerFile)
@ -925,8 +899,9 @@ int slsReceiverFunctionList::startWriting(){
}
}
copyFrameToGui(wbuf + HEADER_SIZE_NUM_FRAMES + HEADER_SIZE_NUM_PACKETS);
if(((uint8_t)(*((uint8_t*)(wbuf + HEADER_SIZE_NUM_FRAMES)))) == packetsPerFrame){
copyFrameToGui(wbuf + HEADER_SIZE_NUM_FRAMES + HEADER_SIZE_NUM_PACKETS);
}
if(!dataCompression){
@ -1041,11 +1016,11 @@ void slsReceiverFunctionList::copyFrameToGui(char* startbuf){
pthread_mutex_lock(&dataReadyMutex);
guiDataReady=0;
pthread_mutex_unlock(&dataReadyMutex);
/*pthread_mutex_unlock(&dataReadyMutex);*/
//send the first one
memcpy(latestData,startbuf,bufferSize);
strcpy(guiFileName,savefilename);
pthread_mutex_lock(&dataReadyMutex);
/*pthread_mutex_lock(&dataReadyMutex);*/
guiDataReady=1;
pthread_mutex_unlock(&dataReadyMutex);
}
@ -1059,6 +1034,7 @@ void slsReceiverFunctionList::readFrame(char* c,char** raw){
//point to gui data
if (guiData == NULL)
guiData = latestData;
//copy data and filename
strcpy(c,guiFileName);
//could not get gui data
@ -1069,6 +1045,7 @@ void slsReceiverFunctionList::readFrame(char* c,char** raw){
else{
*raw = guiData;
guiData = NULL;
pthread_mutex_lock(&dataReadyMutex);
guiDataReady = 0;
pthread_mutex_unlock(&dataReadyMutex);