receiver complete change.dont check out

git-svn-id: file:///afs/psi.ch/project/sls_det_software/svn/slsDetectorSoftware@685 951219d9-93cf-4727-9268-0efd64621fa3
This commit is contained in:
l_maliakal_d
2013-11-14 12:32:56 +00:00
parent 1673da0854
commit 68ae219125
6 changed files with 511 additions and 300 deletions

View File

@ -15,11 +15,17 @@
#ifndef CIRCULARFIFO_H_ #ifndef CIRCULARFIFO_H_
#define CIRCULARFIFO_H_ #define CIRCULARFIFO_H_
#include "sls_detector_defs.h" //#include "sls_detector_defs.h"
#include <vector> #include <vector>
using namespace std; using namespace std;
typedef double double32_t;
typedef float float32_t;
typedef int int32_t;
/** Circular Fifo (a.k.a. Circular Buffer) /** Circular Fifo (a.k.a. Circular Buffer)
* Thread safe for one reader, and one writer */ * Thread safe for one reader, and one writer */
template<typename Element> template<typename Element>

View File

@ -16,7 +16,7 @@
//all max frames defined in sls_detector_defs.h. 20000 gotthard, 100000 for short gotthard, 1000 for moench //all max frames defined in sls_detector_defs.h. 20000 gotthard, 100000 for short gotthard, 1000 for moench
#define GOTTHARD_FIFO_SIZE 25000 #define GOTTHARD_FIFO_SIZE 25000//11
#define GOTTHARD_ALIGNED_FRAME_SIZE 4096 #define GOTTHARD_ALIGNED_FRAME_SIZE 4096
#define GOTTHARD_PACKETS_PER_FRAME 2 #define GOTTHARD_PACKETS_PER_FRAME 2
#define GOTTHARD_ONE_PACKET_SIZE 1286 #define GOTTHARD_ONE_PACKET_SIZE 1286
@ -35,7 +35,9 @@
#define GOTTHARD_PACKET_INDEX_MASK 0x1 #define GOTTHARD_PACKET_INDEX_MASK 0x1
#define GOTTHARD_NUM_JOBS_P_THREAD 5000//20000//3 with 25 frames
#define GOTTHARD_SHORT_NUM_JOBS_P_THREAD 2500//40000
#define MOENCH_NUM_JOBS_P_THREAD 1000//10000
#define MOENCH_FIFO_SIZE 2500 #define MOENCH_FIFO_SIZE 2500
#define MOENCH_ALIGNED_FRAME_SIZE 65536 #define MOENCH_ALIGNED_FRAME_SIZE 65536

View File

@ -16,6 +16,7 @@
#include <sys/mman.h> //munmap #include <sys/mman.h> //munmap
#include <string.h> #include <string.h>
#include <iostream> #include <iostream>
using namespace std; using namespace std;
@ -60,6 +61,8 @@ slsReceiverFunctionList::slsReceiverFunctionList(detectorType det):
frameIndexMask(GOTTHARD_FRAME_INDEX_MASK), frameIndexMask(GOTTHARD_FRAME_INDEX_MASK),
frameIndexOffset(GOTTHARD_FRAME_INDEX_OFFSET), frameIndexOffset(GOTTHARD_FRAME_INDEX_OFFSET),
dataCompression(false), dataCompression(false),
numJobsPerThread(GOTTHARD_NUM_JOBS_P_THREAD),
userDefinedNumJobsPerThread(0),
startAcquisitionCallBack(NULL), startAcquisitionCallBack(NULL),
pStartAcquisition(NULL), pStartAcquisition(NULL),
acquisitionFinishedCallBack(NULL), acquisitionFinishedCallBack(NULL),
@ -80,7 +83,13 @@ slsReceiverFunctionList::slsReceiverFunctionList(detectorType det):
packetsPerFrame = MOENCH_PACKETS_PER_FRAME; packetsPerFrame = MOENCH_PACKETS_PER_FRAME;
frameIndexMask = MOENCH_FRAME_INDEX_MASK; frameIndexMask = MOENCH_FRAME_INDEX_MASK;
frameIndexOffset = MOENCH_FRAME_INDEX_OFFSET; frameIndexOffset = MOENCH_FRAME_INDEX_OFFSET;
numJobsPerThread = MOENCH_NUM_JOBS_P_THREAD;
} }
if(userDefinedNumJobsPerThread)
numJobsPerThread = userDefinedNumJobsPerThread;
oneBufferSize = bufferSize/packetsPerFrame; oneBufferSize = bufferSize/packetsPerFrame;
strcpy(savefilename,""); strcpy(savefilename,"");
@ -108,10 +117,10 @@ slsReceiverFunctionList::slsReceiverFunctionList(detectorType det):
} }
vector <vector<int16_t> > map; int16_t* map;
vector <vector<int16_t> > mask; int16_t* mask;
int initial_offset = 4; int initial_offset = 2;
int later_offset = 2; int later_offset = 1;
int mask_y_offset = 120; int mask_y_offset = 120;
int mask_adc = 0x7fff; int mask_adc = 0x7fff;
@ -131,21 +140,17 @@ slsReceiverFunctionList::slsReceiverFunctionList(detectorType det):
case MOENCH: case MOENCH:
x = MOENCH_PIXELS_IN_ONE_ROW; x = MOENCH_PIXELS_IN_ONE_ROW;
y = MOENCH_PIXELS_IN_ONE_ROW; y = MOENCH_PIXELS_IN_ONE_ROW;
mask.resize(x);
for(i=0;i<x; i++)
mask[i].resize(y);
map.resize(x);
for(i=0;i<x; i++)
map[i].resize(y);
mask = new int16_t[x*y];
map = new int16_t[x*y];
//set up mask for moench //set up mask for moench
for(i=0;i<x; i++) for(i=0;i<x; i++)
for(j=0;j<y; j++){ for(j=0;j<y; j++){
if (j<mask_y_offset) if (j<mask_y_offset)
mask[i][j] = mask_adc; mask[i*y+j] = mask_adc;
else else
mask[i][j] = 0; mask[i*y+j] = 0;
} }
//set up mapping for moench //set up mapping for moench
@ -157,7 +162,7 @@ slsReceiverFunctionList::slsReceiverFunctionList(detectorType det):
ipacket = (ipx + 1) + (ipy * num_packets_in_row); ipacket = (ipx + 1) + (ipy * num_packets_in_row);
if (ipacket == MOENCH_PACKETS_PER_FRAME) if (ipacket == MOENCH_PACKETS_PER_FRAME)
ipacket = 0; ipacket = 0;
map[ ipx * num_pixels_per_packet_in_col + ix][ ipy * num_pixels_per_packet_in_row + iy] = map[ (ipx * num_pixels_per_packet_in_col + ix) * y + (ipy * num_pixels_per_packet_in_row + iy) ] =
ipacket * MOENCH_ONE_PACKET_SIZE + offset; ipacket * MOENCH_ONE_PACKET_SIZE + offset;
offset += later_offset; offset += later_offset;
} }
@ -166,41 +171,44 @@ slsReceiverFunctionList::slsReceiverFunctionList(detectorType det):
} }
filter = new singlePhotonFilter(x,y, MOENCH_FRAME_INDEX_MASK, MOENCH_PACKET_INDEX_MASK, MOENCH_FRAME_INDEX_OFFSET, 0, MOENCH_PACKETS_PER_FRAME, 0,map, mask,MOENCH_BUFFER_SIZE); filter = new singlePhotonFilter(x,y, MOENCH_FRAME_INDEX_MASK, MOENCH_PACKET_INDEX_MASK, MOENCH_FRAME_INDEX_OFFSET,
0, MOENCH_PACKETS_PER_FRAME, 0,map, mask,fifofree,MOENCH_BUFFER_SIZE);
break; break;
default: default:
x = 1; x = 1;
y = (GOTTHARD_DATA_BYTES/GOTTHARD_PACKETS_PER_FRAME);/*bufferSize/sizeof(int16_t)*/ y = (GOTTHARD_DATA_BYTES/GOTTHARD_PACKETS_PER_FRAME);/*bufferSize/sizeof(int16_t)*/
offset = initial_offset; offset = initial_offset;
mask.resize(x); mask = new int16_t[x*y];
for(int i=0;i<x; i++) map = new int16_t[x*y];
mask[i].resize(y);
map.resize(x);
for(int i=0;i<x; i++)
map[i].resize(y);
//set up mask for moench //set up mask for gotthard
for (i=0; i < x; i++) for (i=0; i < x; i++)
for (j=0; j < y; j++){ for (j=0; j < y; j++){
mask[i][j] = 0; mask[i*y+j] = 0;
} }
//set up mapping for gotthard //set up mapping for gotthard
for (i=0; i < x; i++) for (i=0; i < x; i++)
for (j=0; j < y; j++){ for (j=0; j < y; j++){
//since there are 2 packets //since there are 2 packets
if (y == y/2) if (j == y/2){
offset += initial_offset; offset += initial_offset;
map[i][j] = offset; offset += 1;
offset += 2; }
map[i*y+j] = offset;
/*cout<<"offset["<<i*y+j<<"]:"<<map[i*y+j]<<"\t";*/
offset += 1;
} }
cout<<endl;
filter = new singlePhotonFilter(x,y,GOTTHARD_FRAME_INDEX_MASK, GOTTHARD_PACKET_INDEX_MASK, GOTTHARD_FRAME_INDEX_OFFSET,
filter = new singlePhotonFilter(x,y,GOTTHARD_FRAME_INDEX_MASK, GOTTHARD_PACKET_INDEX_MASK, GOTTHARD_FRAME_INDEX_OFFSET, 0, GOTTHARD_PACKETS_PER_FRAME, 1,map, mask,GOTTHARD_BUFFER_SIZE); 0, GOTTHARD_PACKETS_PER_FRAME, 1,map, mask,fifofree,GOTTHARD_BUFFER_SIZE);
break; break;
} }
pthread_mutex_init(&status_mutex,NULL); pthread_mutex_init(&status_mutex,NULL);
pthread_mutex_init(&dataReadyMutex,NULL); pthread_mutex_init(&dataReadyMutex,NULL);
@ -332,7 +340,13 @@ int slsReceiverFunctionList::startReceiver(char message[]){
} }
//wait till udp socket created //wait till udp socket created
while(!listening_thread_running); while(!listening_thread_running);
if(listening_thread_running!=1){ if(listening_thread_running==-1){
//change status
pthread_mutex_lock(&status_mutex);
status = IDLE;
listening_thread_running = 0;
receiver_threads_running = 0;
pthread_mutex_unlock(&(status_mutex));
strcpy(message,"Could not create UDP Socket.\n"); strcpy(message,"Could not create UDP Socket.\n");
return FAIL; return FAIL;
} }
@ -359,7 +373,15 @@ int slsReceiverFunctionList::startReceiver(char message[]){
} }
//wait till file is created //wait till file is created
while(!writing_thread_running); while(!writing_thread_running);
if(writing_thread_running!=1){ if(writing_thread_running==-1){
//change status
pthread_mutex_lock(&status_mutex);
status = IDLE;
listening_thread_running = 0;
receiver_threads_running = 0;
pthread_mutex_unlock(&(status_mutex));
if(udpSocket) udpSocket->ShutDownSocket();
pthread_join(listening_thread,NULL);
sprintf(message,"Could not create file %s.\n",savefilename); sprintf(message,"Could not create file %s.\n",savefilename);
return FAIL; return FAIL;
} }
@ -401,7 +423,6 @@ int slsReceiverFunctionList::startReceiver(char message[]){
//initialize semaphore //initialize semaphore
sem_init(&smp,0,1); sem_init(&smp,0,1);
return OK; return OK;
} }
@ -410,14 +431,14 @@ int slsReceiverFunctionList::startReceiver(char message[]){
int slsReceiverFunctionList::stopReceiver(){ int slsReceiverFunctionList::stopReceiver(){
#ifdef VERBOSE //#ifdef VERBOSE
cout << "Stopping Receiver" << endl; cout << "Stopping Receiver" << endl;
#endif //#endif
if(receiver_threads_running){ if(receiver_threads_running){
#ifdef VERBOSE //#ifdef VERBOSE
cout << "Stopping new acquisition threadddd ...." << endl; cout << "Stopping new acquisition thread" << endl;
#endif //#endif
//stop listening thread //stop listening thread
pthread_mutex_lock(&status_mutex); pthread_mutex_lock(&status_mutex);
receiver_threads_running=0; receiver_threads_running=0;
@ -426,11 +447,13 @@ int slsReceiverFunctionList::stopReceiver(){
if(udpSocket) udpSocket->ShutDownSocket(); if(udpSocket) udpSocket->ShutDownSocket();
pthread_join(listening_thread,NULL); pthread_join(listening_thread,NULL);
pthread_join(writing_thread,NULL); pthread_join(writing_thread,NULL);
/*if(dataCompression)
filter->enableCompression(false);*/
} }
//change status //change status
pthread_mutex_lock(&status_mutex); pthread_mutex_lock(&status_mutex);
status = IDLE; status = IDLE;
pthread_mutex_unlock(&(status_mutex)); pthread_mutex_unlock(&(status_mutex));;
//semaphore destroy //semaphore destroy
sem_post(&smp); sem_post(&smp);
@ -462,144 +485,155 @@ int slsReceiverFunctionList::startListening(){
#ifdef VERYVERBOSE #ifdef VERYVERBOSE
cout << "In startListening()\n"); cout << "In startListening()\n");
#endif #endif
int rc; int rc=0;
measurementStarted = false; measurementStarted = false;
startFrameIndex = 0; startFrameIndex = 0;
int offset=0; int offset=0;
int ret=1; int ret=1;
int i=0; int i=0;
uint32_t *framenum;
char *tempchar = new char[oneBufferSize]; char *tempchar = new char[oneBufferSize];
// A do/while(FALSE) loop is used to make error cleanup easier. The //to increase socket receiver buffer size and max length of input queue by changing kernel settings
// close() of each of the socket descriptors is only done once at the if(system("echo $((100*1024*1024)) > /proc/sys/net/core/rmem_max"))
// very end of the program. cout << "\nWARNING: Could not change socket receiver buffer size in file /proc/sys/net/core/rmem_max" << endl;
do { else if(system("echo 250000 > /proc/sys/net/core/netdev_max_backlog"))
cout << "\nWARNING: Could not change max length of input queue in file /proc/sys/net/core/netdev_max_backlog" << endl;
//to increase socket receiver buffer size and max length of input queue by changing kernel settings /** permanent setting heiner
if(system("echo $((100*1024*1024)) > /proc/sys/net/core/rmem_max"))
cout << "\nWARNING: Could not change socket receiver buffer size in file /proc/sys/net/core/rmem_max" << endl;
else if(system("echo 250000 > /proc/sys/net/core/netdev_max_backlog"))
cout << "\nWARNING: Could not change max length of input queue in file /proc/sys/net/core/netdev_max_backlog" << endl;
/** permanent setting heiner
net.core.rmem_max = 104857600 # 100MiB net.core.rmem_max = 104857600 # 100MiB
net.core.netdev_max_backlog = 250000 net.core.netdev_max_backlog = 250000
sysctl -p sysctl -p
// from the manual // from the manual
sysctl -w net.core.rmem_max=16777216 sysctl -w net.core.rmem_max=16777216
sysctl -w net.core.netdev_max_backlog=250000 sysctl -w net.core.netdev_max_backlog=250000
*/ */
//creating udp socket //creating udp socket
if (strchr(eth,'.')!=NULL) strcpy(eth,""); if (strchr(eth,'.')!=NULL) strcpy(eth,"");
if(!strlen(eth)){ if(!strlen(eth)){
cout<<"warning:eth is empty.listening to all"<<endl; cout<<"warning:eth is empty.listening to all"<<endl;
if(udpSocket){ if(udpSocket){
delete udpSocket; delete udpSocket;
udpSocket = NULL; udpSocket = NULL;
}
udpSocket = new genericSocket(server_port,genericSocket::UDP,oneBufferSize,1);//packetsPerFrame);
}else{
cout<<"eth:"<<eth<<endl;
udpSocket = new genericSocket(server_port,genericSocket::UDP,oneBufferSize,1,eth);//packetsPerFrame,eth);
}
if (udpSocket->getErrorStatus()){
#ifdef VERBOSE
std::cout<< "Could not create UDP socket "<< server_port << std::endl;
#endif
pthread_mutex_lock(&status_mutex);
listening_thread_running = -1;
pthread_mutex_unlock(&status_mutex);
break;
} }
udpSocket = new genericSocket(server_port,genericSocket::UDP,oneBufferSize,1);//packetsPerFrame);
}else{
cout<<"eth:"<<eth<<endl;
udpSocket = new genericSocket(server_port,genericSocket::UDP,oneBufferSize,1,eth);//packetsPerFrame,eth);
}
int iret = udpSocket->getErrorStatus();
if (iret){
//#ifdef VERBOSE
std::cout<< "Could not create UDP socket on port "<< server_port << " error:"<<iret<<std::endl;
//#endif
pthread_mutex_lock(&status_mutex);
listening_thread_running = -1;
pthread_mutex_unlock(&status_mutex);
return 0;
}else{
/*filter->setupAcquisitionParameters();*/
filter->setupAcquisitionParameters();
while (receiver_threads_running) { while (receiver_threads_running) {
if(!listening_thread_running){
pthread_mutex_lock(&status_mutex);
listening_thread_running = 1;
pthread_mutex_unlock(&(status_mutex));
}
if (!fifofree->isEmpty()) { if (!fifofree->isEmpty()) {
if (ret!=0) if (ret!=0)
fifofree->pop(buffer); fifofree->pop(buffer);
if(ret == -2){
if(ret == -3){
memcpy(buffer,tempchar,oneBufferSize); memcpy(buffer,tempchar,oneBufferSize);
offset = oneBufferSize; offset = oneBufferSize;
}
//receiver 2 half frames / 1 short frame / 40 moench frames
rc = udpSocket->ReceiveDataOnly(buffer+offset,oneBufferSize);
if( rc <= 0){
#ifdef VERYVERBOSE
cerr << "recvfrom() failed" << endl;
#endif
continue;
}
//cout<<"got index:"<<hex<<(((uint32_t)(*((uint32_t*)(buffer+offset))) & (frameIndexMask)) >> frameIndexOffset);
if ((myDetectorType == GOTTHARD) && (shortFrame == -1))
(*((uint32_t*)(buffer+offset)))++;
ret = filter->verifyFrame(buffer+offset);
//start for each scan
if(!measurementStarted){
startFrameIndex = ((((uint32_t)(*((uint32_t*)buffer))) & (frameIndexMask)) >> frameIndexOffset);
cout<<"startFrameIndex:"<<hex<<startFrameIndex<<endl;
prevframenum=startFrameIndex;
measurementStarted = true;
}
//start of acquisition
if(!acqStarted){
startAcquisitionIndex=startFrameIndex;
currframenum = startAcquisitionIndex;
acqStarted = true;
cout<<"startAcquisitionIndex:"<<hex<<startAcquisitionIndex<<endl;
}
/*
if(ret < 0){
offset = 0;
continue;
}
*/
//last packet, but not the full frame, must push previous frame
if(ret == -1){
//set reminaing headers invalid
for( i = offset+ oneBufferSize; i < bufferSize; i += oneBufferSize)
(*((uint32_t*)(buffer+i))) = 0xFFFFFFFF;
offset = 0;
}
//first packet of new frame, must push previous frame
else if(ret == -2){
//copy the new frame to a temp
memcpy(tempchar, buffer+offset, oneBufferSize);
//set all the new frame header and remaining headers invalid //set all the new frame header and remaining headers invalid
for(i = offset; i < bufferSize; i += oneBufferSize); for(i = offset; i < bufferSize; i += oneBufferSize)
(*((uint32_t*)(buffer+i))) = 0xFFFFFFFF; (*((uint32_t*)(buffer+i))) = 0xFFFFFFFF;
} //start from beginning
//wait for new packets of same frame
else if (ret == 0){
offset += oneBufferSize;
continue;
}
//wait for next frame
else{
offset = 0; offset = 0;
ret = 0;
}
else{
if(ret == -2){/** for moench, in between nxt, means ots -1*/
memcpy(buffer,tempchar,oneBufferSize);
offset = oneBufferSize;
ret = 0;
}
if(!startFrameIndex){
if(!listening_thread_running){
pthread_mutex_lock(&status_mutex);
listening_thread_running = 1;
pthread_mutex_unlock(&(status_mutex));
}
}
//receiver 2 half frames / 1 short frame / 40 moench frames
rc = udpSocket->ReceiveDataOnly(buffer+offset,oneBufferSize);
if( rc <= 0){
//#ifdef VERYVERBOSE
cerr << "recvfrom() failed:"<<endl;;// << receiver_threads_running<<endl;
fifofree->push(buffer);
//#endif
continue;
}
//manipulate buffer number to inlude frame number and packet number for gotthard
if ((myDetectorType == GOTTHARD) && (shortFrame == -1))
(*((uint32_t*)(buffer+offset)))++;
ret = filter->verifyFrame(buffer+offset);
//start for each scan
if(!measurementStarted){
startFrameIndex = ((((uint32_t)(*((uint32_t*)buffer))) & (frameIndexMask)) >> frameIndexOffset);
cout<<"startFrameIndex:"<<startFrameIndex<<endl;
prevframenum=startFrameIndex;
measurementStarted = true;
}
//start of acquisition
if(!acqStarted){
startAcquisitionIndex=startFrameIndex;
currframenum = startAcquisitionIndex;
acqStarted = true;
cout<<"startAcquisitionIndex:"<<startAcquisitionIndex<<endl;
}
//last packet, but not the full frame, must push previous frame
if(ret == -1){
//set reminaing headers invalid
for( i = offset+ oneBufferSize; i < bufferSize; i += oneBufferSize)
(*((uint32_t*)(buffer+i))) = 0xFFFFFFFF;
offset = 0;
}
else if(ret == -3){
//copy the new frame to a temp
memcpy(tempchar, buffer+offset, oneBufferSize);
//set all the new frame header and remaining headers invalid
for(i = offset; i < bufferSize; i += oneBufferSize)
(*((uint32_t*)(buffer+i))) = 0xFFFFFFFF;
}
//first packet of new frame, must push previous frame
else if(ret == -2){
//copy the new frame to a temp
memcpy(tempchar, buffer+offset, oneBufferSize);
//set all the new frame header and remaining headers invalid
for(i = offset; i < bufferSize; i += oneBufferSize)
(*((uint32_t*)(buffer+i))) = 0xFFFFFFFF;
}
//wait for new packets of same frame
else if (ret == 0){
offset += oneBufferSize;
continue;
}
//wait for next frame
else
offset = 0;
} }
//so that it doesnt write the last frame twice //so that it doesnt write the last frame twice
@ -609,21 +643,9 @@ int slsReceiverFunctionList::startListening(){
} }
} }
} while (receiver_threads_running); }
pthread_mutex_lock(&status_mutex);
receiver_threads_running=0;
status = IDLE;
pthread_mutex_unlock(&status_mutex);
//Close down any open socket descriptors
udpSocket->Disconnect();
delete tempchar;
#ifdef VERBOSE
cout << "receiver_threads_running:" << receiver_threads_running << endl;
#endif
delete tempchar;
return 0; return 0;
} }
@ -634,7 +656,6 @@ int slsReceiverFunctionList::startListening(){
void* slsReceiverFunctionList::startWritingThread(void* this_pointer){ void* slsReceiverFunctionList::startWritingThread(void* this_pointer){
((slsReceiverFunctionList*)this_pointer)->startWriting(); ((slsReceiverFunctionList*)this_pointer)->startWriting();
return this_pointer; return this_pointer;
} }
@ -649,7 +670,14 @@ int slsReceiverFunctionList::startWriting(){
char *wbuf; char *wbuf;
int sleepnumber=0; int sleepnumber=0;
int frameFactor=0; int frameFactor=0;
int i; int i,p;
/* char* trialarr[GOTTHARD_COMPRESSION_FIFO_SIZE];*/
int iJob = -2;
char* startingmem = 0;
int header_of_last_packet = (packetsPerFrame-1)*oneBufferSize;
int pointinfifo=0;
int firsttime = 1;
packetsInFile=0; packetsInFile=0;
framesCaught=0; framesCaught=0;
@ -667,10 +695,11 @@ int slsReceiverFunctionList::startWriting(){
cout << "Max Frames Per File:" << maxFramesPerFile << endl; cout << "Max Frames Per File:" << maxFramesPerFile << endl;
if (rawDataReadyCallBack) if (rawDataReadyCallBack)
cout << "Note: Data Write has been defined exernally" << endl; cout << "Note: Data Write has been defined exernally" << endl;
if (dataCompression)
cout << "Data Compression is enabled with " << numJobsPerThread << " number of jobs per thread" << endl;
if(nFrameToGui) if(nFrameToGui)
cout << "Sending every " << nFrameToGui << "th frame to gui" << endl; cout << "Sending every " << nFrameToGui << "th frame to gui" << endl;
//by default, we read/write everything //by default, we read/write everything
cbAction = DO_EVERYTHING; cbAction = DO_EVERYTHING;
//acquisition start call back returns enable write //acquisition start call back returns enable write
@ -685,68 +714,83 @@ int slsReceiverFunctionList::startWriting(){
cout << "Ready!" << endl; cout << "Ready!" << endl;
if (dataCompression)
filter->enableFilter(true);
//will always run till acquisition over and then runs till fifo is empty //will always run till acquisition over and then runs till fifo is empty
while(receiver_threads_running || (!fifo->isEmpty())){ while(receiver_threads_running || (!fifo->isEmpty())){
//start a new file //start a new file
if (((int)(packetsInFile/packetsPerFrame) >= maxFramesPerFile) || (strlen(savefilename) == 0)){ if ((strlen(savefilename) == 0) || ((packetsInFile/packetsPerFrame) >= maxFramesPerFile)){
//create file name //create file name
if(frameIndexNeeded==-1) sprintf(savefilename, "%s/%s_%d.raw", filePath,fileName,fileIndex); if(!dataCompression){
else sprintf(savefilename, "%s/%s_f%012d_%d.raw", filePath,fileName,framesCaught,fileIndex); if(frameIndexNeeded==-1)
sprintf(savefilename, "%s/%s_%d.raw", filePath,fileName,fileIndex);
else
sprintf(savefilename, "%s/%s_f%012d_%d.raw", filePath,fileName,framesCaught,fileIndex);
}
//only for gui display
else if(strlen(savefilename) == 0){
sprintf(savefilename, "%s/%s_fxxx_%d.raw", filePath,fileName,fileIndex);
filter->setupAcquisitionParameters(filePath,fileName,fileIndex);
}
if(enableFileWrite && cbAction > DO_NOTHING){ if(enableFileWrite && cbAction > DO_NOTHING){
if (dataCompression){
//create tree and file //only the first time
if(dataCompression){ if ((!framesCaught) && (filter->initTree()== FAIL)){
if(enableFileWrite){ cout << " Error: Could not create file " << savefilename << endl;
filter->writeToFile(); pthread_mutex_lock(&status_mutex);
filter->initTree(savefilename); writing_thread_running = -1;
pthread_mutex_unlock(&(status_mutex));
break;
} }
} }
/*else{*///the standard way
if(sfilefd){
fclose(sfilefd);
sfilefd = NULL;
}
if (NULL == (sfilefd = fopen((const char *) (savefilename), "w"))){
cout << "Error: Could not create file " << savefilename << endl;
pthread_mutex_lock(&status_mutex);
writing_thread_running = -1;
pthread_mutex_unlock(&(status_mutex));
break;
}
//setting buffer
setvbuf(sfilefd,NULL,_IOFBF,BUF_SIZE);
/*}*/
//printing packet losses and file names //standard way
//if(prevframenum != 0)
if(!framesCaught)
cout << savefilename << endl;
else{ else{
cout << savefilename //close and open file
<< "\tpacket loss " if(sfilefd){
<< setw(4)<<fixed << setprecision(4)<< dec << fclose(sfilefd);
(int)(((((currframenum-prevframenum)*packetsPerFrame)-(packetsInFile))/(double)((currframenum-prevframenum)*packetsPerFrame))*100.000)/*packetloss*/ sfilefd = NULL;
<< "%\tframenum " }
<< dec << currframenum //<< "\t\t p " << prevframenum if (NULL == (sfilefd = fopen((const char *) (savefilename), "w"))){
<< "\tindex " << dec << getFrameIndex() cout << "Error: Could not create file " << savefilename << endl;
<< "\tpackets lost " << dec << ((currframenum-prevframenum)*packetsPerFrame)-(packetsInFile) << endl; pthread_mutex_lock(&status_mutex);
writing_thread_running = -1;
pthread_mutex_unlock(&(status_mutex));
break;
}
//setting buffer
setvbuf(sfilefd,NULL,_IOFBF,BUF_SIZE);
//printing packet losses and file names
if(!framesCaught)
cout << savefilename << endl;
else{
cout << savefilename
<< "\tpacket loss "
<< setw(4)<<fixed << setprecision(4)<< dec <<
(int)(((((currframenum-prevframenum)*packetsPerFrame)-(packetsInFile))/(double)((currframenum-prevframenum)*packetsPerFrame))*100.000)/*packetloss*/
<< "%\tframenum "
<< dec << currframenum //<< "\t\t p " << prevframenum
<< "\tindex " << dec << getFrameIndex()
<< "\tpackets lost " << dec << ((currframenum-prevframenum)*packetsPerFrame)-(packetsInFile) << endl;
}
} }
} }
pthread_mutex_lock(&status_mutex); //only the first time - state the thread is rinning
writing_thread_running = 1; if (!framesCaught){
pthread_mutex_unlock(&(status_mutex)); pthread_mutex_lock(&status_mutex);
writing_thread_running = 1;
pthread_mutex_unlock(&(status_mutex));
//if(prevframenum != 0){ }
if(framesCaught){ else if (!dataCompression){
prevframenum = currframenum; prevframenum = currframenum;
packetsInFile=0; packetsInFile=0;
} }
@ -757,49 +801,108 @@ int slsReceiverFunctionList::startWriting(){
if(!fifo->isEmpty()){ if(!fifo->isEmpty()){
if(fifo->pop(wbuf)){ if(fifo->pop(wbuf)){
currframenum = ((uint32_t)(*((uint32_t*)wbuf))& frameIndexMask) >>frameIndexOffset; currframenum = ((uint32_t)(*((uint32_t*)wbuf))& frameIndexMask) >>frameIndexOffset;
//cout<<"currframenum: "<<hex<<currframenum<<endl; //cout<<"currframenum2:"<<((((uint32_t)(*((uint32_t*)((char*)(wbuf+oneBufferSize)))))& frameIndexMask) >> frameIndexOffset)<<endl;;
//cout<<"currframenum2:"<<hex<<((((uint32_t)(*((uint32_t*)((char*)(wbuf+oneBufferSize)))))& frameIndexMask) >> frameIndexOffset)<<endl;;
for(i=0; i < packetsPerFrame; i++){
if(((uint32_t)(*((uint32_t*)((char*)(wbuf+oneBufferSize))))) == 0xFFFFFFFF){
//cout<<"found one: currframenum:"<<currframenum<<" currframe2:"<<((((uint32_t)(*((uint32_t*)((char*)(wbuf+oneBufferSize)))))& frameIndexMask) >> frameIndexOffset)<<endl;
break;
}
}
packetsInFile += i;
totalPacketsCaught += i;
//count only if you get full frames
if(i == packetsPerFrame){
framesCaught++;
totalFramesCaught++;
}
//write data call back //write data call back
if (cbAction < DO_EVERYTHING) { if (cbAction < DO_EVERYTHING) {
rawDataReadyCallBack(currframenum, wbuf, i*oneBufferSize, sfilefd, guiData,pRawDataReady); rawDataReadyCallBack(currframenum, wbuf, bufferSize, sfilefd, guiData,pRawDataReady);
} }
//default writing to file //default writing to file
else if(enableFileWrite){ else if(enableFileWrite){
/*if(!dataCompression){*/
if(sfilefd) if(dataCompression){
fwrite(wbuf, 1, i*oneBufferSize, sfilefd);
else{ //if whole frame received
cout << "You do not have permissions to overwrite: " << savefilename << endl; if(((uint32_t)(*((uint32_t*)(wbuf+header_of_last_packet)))) != 0xFFFFFFFF){
usleep(50000); framesCaught++;
totalFramesCaught++;
packetsInFile += packetsPerFrame;
totalPacketsCaught += packetsPerFrame;
p = packetsPerFrame;
}
//to skip incompleete frames
else {
/*cout<<"***not whole frame currframenum:"<<currframenum<<endl;*/
(*((uint32_t*)(wbuf))) = 0xFFFFFFFF;
p=0;
}
/*cout<<"wbuf:"<<(void*)wbuf<<endl;*/
//filter and write
if(iJob==-2){
/*cout<<"startingmem changed"<<endl;*/
startingmem = wbuf;
/*cout<<"***************startingmem:"<<(void*)startingmem<<endl;
cout<<"mem0:"<<(void*)mem0<<endl;*/
iJob=0;
if(firsttime){
pointinfifo = (startingmem-mem0)/4096;
firsttime = 0;
/*cout<<"fist time pointinfifo :"<<pointinfifo+1<< " iJob:"<<iJob+1<<endl;*/
}
}
++pointinfifo;
++iJob;
if(pointinfifo >= (fifosize-1)){
/*cout<<"*** sending not normally "<<iJob<<endl;
cout<<"value:"<<((startingmem-mem0)/4096)+1<<endl;
cout<<"pointfifo:"<<pointinfifo<<endl;
cout<<"iJob:"<<iJob<<endl;*/
filter->assignJobsForThread(startingmem,iJob);
startingmem = mem0;
iJob = 0;
pointinfifo = 0;
/*cout<<"***************1trialmem:"<<(void*)startingmem<<endl;
cout<<"1mem0:"<<(void*)mem0<<endl;
cout<<"1fist time pointinfifo :"<<pointinfifo+1<< " iJob:"<<iJob+1<<endl;*/
}
else if(iJob>=(numJobsPerThread)){
/*cout<<"*** sending normally "<<numJobsPerThread<<endl;
cout<<"value:"<<((startingmem-mem0)/4096)+1<<endl;
cout<<"pointfifo:"<<pointinfifo<<endl;
cout<<"iJob:"<<iJob<<endl;*/
filter->assignJobsForThread(startingmem,numJobsPerThread);
iJob = -2;
}
}
//standard way
else{
//find number of packets received
for(i=0,p=0; i < bufferSize; i+=oneBufferSize,++p){
if(((uint32_t)(*((uint32_t*)((wbuf+i))))) == 0xFFFFFFFF){
break;
}
}
//increment counters
totalPacketsCaught += p;
packetsInFile += p;
if(p == packetsPerFrame){
framesCaught++;
totalFramesCaught++;
}
//write to file
if(sfilefd)
fwrite(wbuf, 1, p*oneBufferSize, sfilefd);
else{
cout << "You do not have permissions to overwrite: " << savefilename << endl;
usleep(50000);
}
} }
/*}*/
} }
//does not read every frame //does not read every frame
if(!nFrameToGui){ if(!nFrameToGui){
if((guiData) && (i == packetsPerFrame)){ if((guiData) && (p == packetsPerFrame)){/*change it in funcs*/
/* if(guiData){*/
pthread_mutex_lock(&dataReadyMutex); pthread_mutex_lock(&dataReadyMutex);
guiDataReady=0; guiDataReady=0;
pthread_mutex_unlock(&dataReadyMutex); pthread_mutex_unlock(&dataReadyMutex);
@ -816,7 +919,7 @@ int slsReceiverFunctionList::startWriting(){
} }
//reads every nth frame //reads every nth frame
else{ else{
if (i != packetsPerFrame)//so no 1 packet frame writing over previous 2 packet frame if (p != packetsPerFrame)//so no 1 packet frame writing over previous 2 packet frame
; ;
else if(frameFactor){ else if(frameFactor){
frameFactor--; frameFactor--;
@ -837,33 +940,60 @@ int slsReceiverFunctionList::startWriting(){
} }
} }
fifofree->push(wbuf);
if(!dataCompression)
fifofree->push(wbuf);
} }
} }
else{//cout<<"************************fifo empty**********************************"<<endl; else{//cout<<"************************fifo empty**********************************"<<endl;
//change status to idle if the fifo is empty and status is transmitting
if(status == TRANSMITTING){ //acquisition in the detector is done
pthread_mutex_lock(&status_mutex); if(status==TRANSMITTING){
status = RUN_FINISHED;
pthread_mutex_unlock(&(status_mutex)); //compression
cout << "Status: Run Finished" << endl; if(dataCompression){
}else{ //popped data
if(iJob>0){
cout<<"sending at the end "<<iJob<<endl;
filter->assignJobsForThread(startingmem,iJob);
iJob = -2;
}
//no more popped data
else{
//all jobs done
if(filter->checkIfJobsDone()){
//its all done
pthread_mutex_lock(&status_mutex);
status = RUN_FINISHED;
pthread_mutex_unlock(&(status_mutex));
cout << "Status: Run Finished" << endl;
}
}
}
//standard way
else{
pthread_mutex_lock(&status_mutex);
status = RUN_FINISHED;
pthread_mutex_unlock(&(status_mutex));
cout << "Status: Run Finished" << endl;
}
}
//acquisition not done in detector
else{
sleepnumber++; sleepnumber++;
usleep(50000); usleep(50000);
} }
} }
} }
pthread_mutex_lock(&status_mutex);
receiver_threads_running=0;
pthread_mutex_unlock(&status_mutex);
cout << "Total Packets Caught:" << dec << totalPacketsCaught << endl; cout << "Total Packets Caught:" << dec << totalPacketsCaught << endl;
//cout << "RealTime Full Frames Caught:" << dec << framesCaught << endl; //cout << "RealTime Full Frames Caught:" << dec << framesCaught << endl;
cout << "Total Full Frames Caught:"<< dec << totalFramesCaught << endl; cout << "Total Full Frames Caught:"<< dec << totalFramesCaught << endl;
if(sfilefd){ if((!dataCompression)&&(sfilefd)){
#ifdef VERBOSE #ifdef VERBOSE
cout << "sfield:" << (int)sfilefd << endl; cout << "sfield:" << (int)sfilefd << endl;
#endif #endif
@ -921,6 +1051,7 @@ int slsReceiverFunctionList::setShortFrame(int i){
packetsPerFrame = GOTTHARD_SHORT_PACKETS_PER_FRAME; packetsPerFrame = GOTTHARD_SHORT_PACKETS_PER_FRAME;
frameIndexMask = GOTTHARD_SHORT_FRAME_INDEX_MASK; frameIndexMask = GOTTHARD_SHORT_FRAME_INDEX_MASK;
frameIndexOffset = GOTTHARD_SHORT_FRAME_INDEX_OFFSET; frameIndexOffset = GOTTHARD_SHORT_FRAME_INDEX_OFFSET;
numJobsPerThread = GOTTHARD_SHORT_NUM_JOBS_P_THREAD;
}else{ }else{
bufferSize = GOTTHARD_BUFFER_SIZE; bufferSize = GOTTHARD_BUFFER_SIZE;
@ -928,77 +1059,80 @@ int slsReceiverFunctionList::setShortFrame(int i){
packetsPerFrame = GOTTHARD_PACKETS_PER_FRAME; packetsPerFrame = GOTTHARD_PACKETS_PER_FRAME;
frameIndexMask = GOTTHARD_FRAME_INDEX_MASK; frameIndexMask = GOTTHARD_FRAME_INDEX_MASK;
frameIndexOffset = GOTTHARD_FRAME_INDEX_OFFSET; frameIndexOffset = GOTTHARD_FRAME_INDEX_OFFSET;
numJobsPerThread = GOTTHARD_NUM_JOBS_P_THREAD;
} }
if(userDefinedNumJobsPerThread)
numJobsPerThread = userDefinedNumJobsPerThread;
oneBufferSize = bufferSize/packetsPerFrame; oneBufferSize = bufferSize/packetsPerFrame;
//if the filter is inititalized with the wrong readout //if the filter is inititalized with the wrong readout
if(filter->getPacketsPerFrame() != packetsPerFrame){ if(filter->getPacketsPerFrame() != packetsPerFrame){
vector <vector<int16_t> > map; int16_t* map;
vector <vector<int16_t> > mask; int16_t* mask;
int initial_offset = 4;
int later_offset = 2; int initial_offset = 2;
int later_offset = 1;
int x,y,i,j,offset = 0; int x,y,i,j,offset = 0;
switch(packetsPerFrame){ switch(packetsPerFrame){
case GOTTHARD_SHORT_PACKETS_PER_FRAME://roi readout for gotthard case GOTTHARD_SHORT_PACKETS_PER_FRAME://roi readout for gotthard
x = 1; x = 1;
y = (GOTTHARD_DATA_BYTES/GOTTHARD_PACKETS_PER_FRAME)/2; y = (GOTTHARD_SHORT_DATABYTES/sizeof(int16_t));
offset = initial_offset; offset = initial_offset;
mask.resize(x);
for(int i=0;i<x; i++) mask = new int16_t[x*y];
mask[i].resize(y); map = new int16_t[x*y];
map.resize(x);
for(int i=0;i<x; i++) //set up mask for gotthard short
map[i].resize(y);
//set up mask for moench
for (int i=0; i < x; i++) for (int i=0; i < x; i++)
for (int j=0; j < y; j++){ for (int j=0; j < y; j++){
mask[i][j] = 0; mask[i*y+j] = 0;
} }
//set up mapping for gotthard //set up mapping for gotthard short
for (int i=0; i < x; i++) for (int i=0; i < x; i++)
for (int j=0; j < y; j++){ for (int j=0; j < y; j++){
map[i][j] = offset; map[i*y+j] = offset;
offset += 2; offset += 2;
} }
delete filter; delete filter;
filter = new singlePhotonFilter(x,y,frameIndexMask, GOTTHARD_PACKET_INDEX_MASK, frameIndexOffset, 0, GOTTHARD_SHORT_PACKETS_PER_FRAME, 0,map, mask,GOTTHARD_SHORT_BUFFER_SIZE); filter = new singlePhotonFilter(x,y,frameIndexMask, GOTTHARD_PACKET_INDEX_MASK, frameIndexOffset,
0, GOTTHARD_SHORT_PACKETS_PER_FRAME, 0,map, mask,fifofree,GOTTHARD_SHORT_BUFFER_SIZE);
break; break;
default: //normal readout for gotthard default: //normal readout for gotthard
x = 1; x = 1;
y = (GOTTHARD_DATA_BYTES/GOTTHARD_PACKETS_PER_FRAME); y = (GOTTHARD_DATA_BYTES/sizeof(int16_t));
offset = initial_offset; offset = initial_offset;
mask.resize(x); mask = new int16_t[x*y];
for(int i=0;i<x; i++) map = new int16_t[x*y];
mask[i].resize(y);
map.resize(x);
for(int i=0;i<x; i++)
map[i].resize(y);
//set up mask for moench //set up mask for gotthard
for (int i=0; i < x; i++) for (int i=0; i < x; i++)
for (int j=0; j < y; j++){ for (int j=0; j < y; j++){
mask[i][j] = 0; mask[i*y+j] = 0;
} }
//set up mapping for gotthard //set up mapping for gotthard
for (int i=0; i < x; i++) for (int i=0; i < x; i++)
for (int j=0; j < y; j++){ for (int j=0; j < y; j++){
//since there are 2 packets //since there are 2 packets
if (y == y/2) if (j == y/2){
offset += initial_offset; offset += initial_offset;
map[i][j] = offset; offset += 1;
offset += 2; }
map[i*y+j] = offset;
offset += 1;
} }
delete filter; delete filter;
filter = new singlePhotonFilter(x,y,frameIndexMask, GOTTHARD_PACKET_INDEX_MASK, frameIndexOffset, 0, GOTTHARD_PACKETS_PER_FRAME, 1,map, mask,GOTTHARD_BUFFER_SIZE); filter = new singlePhotonFilter(x,y,frameIndexMask, GOTTHARD_PACKET_INDEX_MASK, frameIndexOffset,
0, GOTTHARD_PACKETS_PER_FRAME, 1,map, mask,fifofree,GOTTHARD_BUFFER_SIZE);
break; break;
} }

View File

@ -51,7 +51,7 @@ public:
/** /**
* Returns status of receiver: idle, running or error * Returns status of receiver: idle, running or error
*/ */
runStatus getStatus(){return status;}; runStatus getStatus(){ return status;};
/** /**
* Returns File Name * Returns File Name
@ -71,12 +71,12 @@ public:
/** /**
* Returns Frames Caught for each real time acquisition (eg. for each scan) * Returns Frames Caught for each real time acquisition (eg. for each scan)
*/ */
uint32_t getFramesCaught(){return framesCaught;}; int getFramesCaught(){return framesCaught;};
/** /**
* Returns Total Frames Caught for an entire acquisition (including all scans) * Returns Total Frames Caught for an entire acquisition (including all scans)
*/ */
uint32_t getTotalFramesCaught(){return totalFramesCaught;}; int getTotalFramesCaught(){return totalFramesCaught;};
/** /**
* Returns the frame index at start of each real time acquisition (eg. for each scan) * Returns the frame index at start of each real time acquisition (eg. for each scan)
@ -209,18 +209,21 @@ public:
void startReadout(); void startReadout();
/** enabl data compression, by saving only hits */ /** enabl data compression, by saving only hits */
void enableDataCompression(bool enable){dataCompression = enable;}; void enableDataCompression(bool enable){dataCompression = enable;filter->enableCompression(enable);};
/** get data compression, by saving only hits */ /** get data compression, by saving only hits */
bool getDataCompression(){ return dataCompression;}; bool getDataCompression(){ return dataCompression;};
/** Set Number of Jobs Per Thread */
void setNumberOfJobsPerThread(int i){userDefinedNumJobsPerThread = i; numJobsPerThread = i;};
private: private:
/** detector type */ /** detector type */
detectorType myDetectorType; detectorType myDetectorType;
/** max frames per file **/ /** max frames per file **/
uint32_t maxFramesPerFile; int maxFramesPerFile;
/** File write enable */ /** File write enable */
int enableFileWrite; int enableFileWrite;
@ -241,7 +244,7 @@ private:
int frameIndexNeeded; int frameIndexNeeded;
/** Frames Caught for each real time acquisition (eg. for each scan) */ /** Frames Caught for each real time acquisition (eg. for each scan) */
uint32_t framesCaught; int framesCaught;
/* Acquisition started */ /* Acquisition started */
bool acqStarted; bool acqStarted;
@ -268,7 +271,7 @@ private:
uint32_t acquisitionIndex; uint32_t acquisitionIndex;
/** Packets currently in current file, starts new file when it reaches max */ /** Packets currently in current file, starts new file when it reaches max */
uint32_t packetsInFile; int packetsInFile;
/** Previous Frame number from buffer */ /** Previous Frame number from buffer */
uint32_t prevframenum; uint32_t prevframenum;
@ -328,10 +331,10 @@ private:
int shortFrame; int shortFrame;
/** buffer size can be 1286*2 or 518 or 1286*40 */ /** buffer size can be 1286*2 or 518 or 1286*40 */
uint32_t bufferSize; int bufferSize;
/** number of packets per frame*/ /** number of packets per frame*/
uint32_t packetsPerFrame; int packetsPerFrame;
/** gui data ready */ /** gui data ready */
int guiDataReady; int guiDataReady;
@ -369,6 +372,12 @@ private:
/** guiDataReady mutex */ /** guiDataReady mutex */
pthread_mutex_t dataReadyMutex; pthread_mutex_t dataReadyMutex;
/** Number of jobs per thread for data compression */
int numJobsPerThread;
/** user defined number of jobs per thread for data compression */
int userDefinedNumJobsPerThread;
/** /**
callback arguments are callback arguments are
filepath filepath
@ -412,6 +421,7 @@ private:
int cbAction; int cbAction;
public: public:
/** File Descriptor */ /** File Descriptor */
static FILE *sfilefd; static FILE *sfilefd;

View File

@ -40,6 +40,8 @@ slsReceiverFuncs::slsReceiverFuncs(int argc, char *argv[], int &success):
ifstream infile; ifstream infile;
string sLine,sargname; string sLine,sargname;
int iline = 0; int iline = 0;
bool dcompr = false;
int jobthread = -1;
success=OK; success=OK;
@ -130,17 +132,20 @@ slsReceiverFuncs::slsReceiverFuncs(int argc, char *argv[], int &success):
//parse command line for type etc.. more priority //parse command line for type etc.. more priority
if(success == OK){ if(success == OK){
for(int iarg=1;iarg<argc;iarg++){ for(int iarg=1;iarg<argc;iarg++){
//type //type
if(!strcasecmp(argv[iarg],"-type")){ if(!strcasecmp(argv[iarg],"-type")){
if(iarg+1==argc){ if(iarg+1==argc){
cout << "no detector type given after -type in command line. Exiting." << endl; cout << "no detector type given after -type in command line. Exiting." << endl;
success=FAIL; success=FAIL;
}else{ }else{
if(!strcasecmp(argv[iarg+1],"gotthard")) if(!strcasecmp(argv[iarg+1],"gotthard")){
slsReceiverFuncs::myDetectorType = GOTTHARD; slsReceiverFuncs::myDetectorType = GOTTHARD;
else if(!strcasecmp(argv[iarg+1],"moench")) iarg++;
}else if(!strcasecmp(argv[iarg+1],"moench")){
slsReceiverFuncs::myDetectorType = MOENCH; slsReceiverFuncs::myDetectorType = MOENCH;
else{ iarg++;
}else{
cout << "could not decode detector type in command line. \nOptions are:\ngotthard\nmoench.\n\nExiting." << endl; cout << "could not decode detector type in command line. \nOptions are:\ngotthard\nmoench.\n\nExiting." << endl;
success=FAIL; success=FAIL;
} }
@ -152,20 +157,57 @@ slsReceiverFuncs::slsReceiverFuncs(int argc, char *argv[], int &success):
cout << "no port given after -rx_tcpport in command line. Exiting." << endl; cout << "no port given after -rx_tcpport in command line. Exiting." << endl;
success=FAIL; success=FAIL;
}else{ }else{
if(sscanf(argv[iarg+1],"%d",&port_no)) if(sscanf(argv[iarg+1],"%d",&port_no)){
cout<<"dataport:"<<port_no<<endl; cout<<"dataport:"<<port_no<<endl;
else{ iarg++;
}else{
cout << "could not decode port in command line. \n\nExiting." << endl; cout << "could not decode port in command line. \n\nExiting." << endl;
success=FAIL; success=FAIL;
} }
} }
} }
//compression
else if(!strcasecmp(argv[iarg],"-compression")){
if(iarg+1==argc){
cout << "no value given after -compression in command line. Exiting." << endl;
success=FAIL;
}else {
if(!strcasecmp(argv[iarg+1],"yes")){
dcompr = true;
iarg++;
}else if(!strcasecmp(argv[iarg+1],"no")){
dcompr = true;
iarg++;
}else{
cout << "could not decode value for compression in command line. \n\nExiting." << endl;
success=FAIL;
}
}
}
//jobstothread
else if(!strcasecmp(argv[iarg],"-jobthread")){
if(iarg+1==argc){
cout << "no value given after -jobthread in command line. Exiting." << endl;
success=FAIL;
}else {
if(sscanf(argv[iarg+1],"%d",&jobthread)){
iarg++;
}else{
cout << "could not decode value for jobthread in command line. \n\nExiting." << endl;
success=FAIL;
}
}
}
else{
cout << "Unknown argument:" << argv[iarg] << endl;
success=FAIL;
}
} }
} }
if(success == OK){
if(success == OK){
//display detector message //display detector message
switch(myDetectorType){ switch(myDetectorType){
case GOTTHARD: case GOTTHARD:
@ -180,6 +222,16 @@ slsReceiverFuncs::slsReceiverFuncs(int argc, char *argv[], int &success):
break; break;
} }
} }
//help
else{
cout << "Help Commands " << endl;
cout << "type:\t\t Type of receiver. Default: Gotthard. Options: Moench" << endl;
cout << "rx_tcpport:\t TCP Communication Port with the client. Default:1954. " << endl;
cout << "compression:\t Data Compression. Saving only hits. Option:yes, no" << endl;
cout << "jobthread:\t Number of jobs given to a thread for compression." << endl << endl;
}
//create socket //create socket
if(success == OK){ if(success == OK){
@ -197,6 +249,9 @@ slsReceiverFuncs::slsReceiverFuncs(int argc, char *argv[], int &success):
function_table(); function_table();
slsReceiverList = new slsReceiverFunctionList(myDetectorType); slsReceiverList = new slsReceiverFunctionList(myDetectorType);
if(dcompr) slsReceiverList->enableDataCompression(dcompr);
if(jobthread!=-1) slsReceiverList->setNumberOfJobsPerThread(jobthread);
#ifdef VERBOSE #ifdef VERBOSE
cout << "Function table assigned." << endl; cout << "Function table assigned." << endl;
#endif #endif
@ -1146,8 +1201,10 @@ int slsReceiverFuncs::gotthard_read_frame(){
if(shortFrame!=-1){ if(shortFrame!=-1){
if(bindex != 0xFFFFFFFF) if(bindex != 0xFFFFFFFF)
memcpy((((char*)retval)+(GOTTHARD_SHORT_DATABYTES*shortFrame)),((char*) origVal)+4, GOTTHARD_SHORT_DATABYTES); memcpy((((char*)retval)+(GOTTHARD_SHORT_DATABYTES*shortFrame)),((char*) origVal)+4, GOTTHARD_SHORT_DATABYTES);
else else{
index = startIndex - 1; index = startIndex - 1;
cout << "Missing Packet,Not sending to gui" << endl;
}
} }
//all adc //all adc
else{ else{
@ -1170,8 +1227,10 @@ int slsReceiverFuncs::gotthard_read_frame(){
}else }else
cout << "different frames caught. frame1:"<< hex << index << ":"<<pindex<<" frame2:" << hex << index2 << ":"<<pindex2<<endl; cout << "different frames caught. frame1:"<< hex << index << ":"<<pindex<<" frame2:" << hex << index2 << ":"<<pindex2<<endl;
} }
else else{
index = startIndex - 1; index = startIndex - 1;
cout << "Missing Packet,Not sending to gui" << endl;
}
} }
arg = (index - startIndex); arg = (index - startIndex);