Merge branch 'master' into eiger_receiver

Conflicts:
	include/slsReceiverTCPIPInterface.h
	slsReceiver/slsReceiver.cpp
	src/UDPStandardImplementation.cpp
	src/slsReceiverTCPIPInterface.cpp

it compiles
This commit is contained in:
2014-11-11 15:53:48 +01:00
committed by Manuel Guizar
17 changed files with 362 additions and 209 deletions

View File

@@ -24,10 +24,73 @@
#include <string.h>
#include <iostream>
using namespace std;
#define EIGER_32BIT_INITIAL_CONSTANT 0x17c
UDPStandardImplementation::UDPStandardImplementation()
//:
//thread_started(0),
//eth(NULL),
//latestData(NULL),
//guiFileName(NULL),
//guiFrameNumber(0),
//tengigaEnable(0)
{
thread_started = 0;
eth = NULL;
latestData = NULL;
guiFileName = NULL;
guiFrameNumber = NULL;
tengigaEnable = 0;
for(int i=0;i<MAX_NUM_LISTENING_THREADS;i++){
udpSocket[i] = NULL;
server_port[i] = DEFAULT_UDP_PORTNO+i;
mem0[i] = NULL;
fifo[i] = NULL;
fifoFree[i] = NULL;
}
for(int i=0;i<MAX_NUM_WRITER_THREADS;i++){
singlePhotonDet[i] = NULL;
receiverdata[i] = NULL;
}
startAcquisitionCallBack = NULL;
pStartAcquisition = NULL;
acquisitionFinishedCallBack = NULL;
pAcquisitionFinished = NULL;
rawDataReadyCallBack = NULL;
pRawDataReady = NULL;
initializeMembers();
//mutex
pthread_mutex_init(&dataReadyMutex,NULL);
pthread_mutex_init(&status_mutex,NULL);
pthread_mutex_init(&progress_mutex,NULL);
pthread_mutex_init(&write_mutex,NULL);
//to increase socket receiver buffer size and max length of input queue by changing kernel settings
if(system("echo $((100*1024*1024)) > /proc/sys/net/core/rmem_max"))
cout << "\nWARNING: Could not change socket receiver buffer size in file /proc/sys/net/core/rmem_max" << endl;
else if(system("echo 250000 > /proc/sys/net/core/netdev_max_backlog"))
cout << "\nWARNING: Could not change max length of input queue in file /proc/sys/net/core/netdev_max_backlog" << endl;
/** permanent setting heiner
net.core.rmem_max = 104857600 # 100MiB
net.core.netdev_max_backlog = 250000
sysctl -p
// from the manual
sysctl -w net.core.rmem_max=16777216
sysctl -w net.core.netdev_max_backlog=250000
*/
}
void UDPStandardImplementation::initializeMembers(){
myDetectorType = GENERIC;
maxPacketsPerFile = 0;
@@ -125,63 +188,6 @@ void UDPStandardImplementation::initializeMembers(){
}
UDPStandardImplementation::UDPStandardImplementation(){ FILE_LOG(logDEBUG) << __AT__ << " called";
FILE_LOG(logDEBUG) << __FILE__ << "::" << __func__ << " starting" ;
thread_started = 0;
eth = NULL;
latestData = NULL;
guiFileName = NULL;
guiFrameNumber = 0;
tengigaEnable = 0;
for(int i=0;i<MAX_NUM_LISTENING_THREADS;i++){
udpSocket[i] = NULL;
server_port[i] = DEFAULT_UDP_PORTNO+i;
mem0[i] = NULL;
fifo[i] = NULL;
fifoFree[i] = NULL;
}
for(int i=0;i<MAX_NUM_WRITER_THREADS;i++){
singlePhotonDet[i] = NULL;
receiverdata[i] = NULL;
}
startAcquisitionCallBack = NULL;
pStartAcquisition = NULL;
acquisitionFinishedCallBack = NULL;
pAcquisitionFinished = NULL;
rawDataReadyCallBack = NULL;
pRawDataReady = NULL;
UDPStandardImplementation::initializeMembers();
//mutex
pthread_mutex_init(&dataReadyMutex,NULL);
pthread_mutex_init(&status_mutex,NULL);
pthread_mutex_init(&progress_mutex,NULL);
pthread_mutex_init(&write_mutex,NULL);
//to increase socket receiver buffer size and max length of input queue by changing kernel settings
if(system("echo $((100*1024*1024)) > /proc/sys/net/core/rmem_max"))
cout << "\nWARNING: Could not change socket receiver buffer size in file /proc/sys/net/core/rmem_max" << endl;
else if(system("echo 250000 > /proc/sys/net/core/netdev_max_backlog"))
cout << "\nWARNING: Could not change max length of input queue in file /proc/sys/net/core/netdev_max_backlog" << endl;
/** permanent setting heiner
net.core.rmem_max = 104857600 # 100MiB
net.core.netdev_max_backlog = 250000
sysctl -p
// from the manual
sysctl -w net.core.rmem_max=16777216
sysctl -w net.core.netdev_max_backlog=250000
*/
}
UDPStandardImplementation::~UDPStandardImplementation(){ FILE_LOG(logDEBUG) << __AT__ << " called";
@@ -478,11 +484,15 @@ void UDPStandardImplementation::setEthernetInterface(char* c){ FILE_LOG(logDEBU
}
void UDPStandardImplementation::setUDPPortNo(int p){ FILE_LOG(logDEBUG) << __AT__ << " called";
void UDPStandardImplementation::setUDPPortNo(int p){
FILE_LOG(logDEBUG) << __AT__ << " called";
server_port[0] = p;
}
for(int i=0;i<numListeningThreads;i++){
server_port[i] = p+i;
}
void UDPStandardImplementation::setUDPPortNo2(int p){
FILE_LOG(logDEBUG) << __AT__ << " called";
server_port[1] = p;
}
@@ -835,10 +845,8 @@ void UDPStandardImplementation::setupFifoStructure(){
/** acquisition functions */
void UDPStandardImplementation::readFrame(char* c,char** raw, uint32_t &fnum){
void UDPStandardImplementation::readFrame(char* c,char** raw, uint32_t &fnum, uint32_t &fstartind){
FILE_LOG(logDEBUG) << __AT__ << " called";
//point to gui data
if (guiData == NULL)
guiData = latestData;
@@ -846,7 +854,7 @@ void UDPStandardImplementation::readFrame(char* c,char** raw, uint32_t &fnum){
//copy data and filename
strcpy(c,guiFileName);
fnum = guiFrameNumber;
fstartind = getStartFrameIndex();
//could not get gui data
if(!guiDataReady){
@@ -932,7 +940,17 @@ void UDPStandardImplementation::copyFrameToGui(char* startbuf[], uint32_t fnum,
int UDPStandardImplementation::createUDPSockets(){
FILE_LOG(logDEBUG) << __AT__ << " called";
int port[2];
port[0] = server_port[0];
port[1] = server_port[1];
/** eiger specific */
/*
if(bottom){
port[0] = server_port[1];
port[1] = server_port[0];
}
*/
//if eth is mistaken with ip address
if (strchr(eth,'.')!=NULL)
strcpy(eth,"");
@@ -944,23 +962,25 @@ int UDPStandardImplementation::createUDPSockets(){
cout<<"warning:eth is empty.listening to all"<<endl;
for(int i=0;i<numListeningThreads;i++)
udpSocket[i] = new genericSocket(server_port[i],genericSocket::UDP,bufferSize);
udpSocket[i] = new genericSocket(port[i],genericSocket::UDP,bufferSize);
}
//normal socket
else{
cout<<"eth:"<<eth<<endl;
for(int i=0;i<numListeningThreads;i++)
udpSocket[i] = new genericSocket(server_port[i],genericSocket::UDP,bufferSize,eth);
udpSocket[i] = new genericSocket(port[i],genericSocket::UDP,bufferSize,eth);
}
//error
int iret;
for(int i=0;i<numListeningThreads;i++){
iret = udpSocket[i]->getErrorStatus();
if(iret){
if(!iret)
cout << "UDP port opened at port " << port[i] << endl;
else{
#ifdef VERBOSE
cout << "Could not create UDP socket on port " << server_port[i] << " error:" << iret << endl;
cout << "Could not create UDP socket on port " << port[i] << " error:" << iret << endl;
#endif
return FAIL;
}
@@ -1418,7 +1438,7 @@ int UDPStandardImplementation::startReceiver(char message[]){
cout << endl << message << endl;
return FAIL;
}
cout << "UDP socket(s) created successfully. 1st port " << server_port[0] << endl;
cout << "UDP socket(s) created successfully." << endl;
if(setupWriter() == FAIL){
@@ -1589,9 +1609,13 @@ int UDPStandardImplementation::startListening(){
//normal listening
else if(!carryonBufferSize){
/* if(!ithread){*/
rc = udpSocket[ithread]->ReceiveDataOnly(buffer[ithread] + HEADER_SIZE_NUM_TOT_PACKETS, maxBufferSize);
expected = maxBufferSize;
/*}else{
while(1) usleep(100000000);
}
*/
}
//the remaining packets from previous buffer
else{
@@ -1608,24 +1632,26 @@ int UDPStandardImplementation::startListening(){
expected = maxBufferSize - carryonBufferSize;
}
#ifdef VERYDEBUG
//#ifdef VERYDEBUG
cout << ithread << " *** rc:" << dec << rc << ". expected:" << dec << expected << endl;
#endif
//#endif
//start indices for each start of scan/acquisition - eiger does it before
if((!measurementStarted) && (rc > 0) && (!ithread))
startFrameIndices(ithread);
if((!measurementStarted) && (rc > 0) && (!ithread))
startFrameIndices(ithread);
//problem in receiving or end of acquisition
if((rc < expected)||(rc <= 0)){
stopListening(ithread,rc,packetcount,total);
continue;
}
/*
//start indices for each start of scan/acquisition - eiger does it before
if((!measurementStarted) && (rc > 0) && (!ithread))
startFrameIndices(ithread);
*/
//reset
packetcount = (packetsPerFrame/numListeningThreads) * numJobsPerThread;
@@ -1811,7 +1837,10 @@ int loop;
//for progress
if(myDetectorType == EIGER){
tempframenum = htonl(*(unsigned int*)((eiger_image_header *)((char*)(wbuf[ithread] + HEADER_SIZE_NUM_TOT_PACKETS)))->fnum);
tempframenum += (startFrameIndex-1); //eiger frame numbers start at 1, so need to -1
if(dynamicRange != 32)
tempframenum += (startFrameIndex-1); //eiger frame numbers start at 1, so need to -1
else
tempframenum = ((tempframenum / EIGER_32BIT_INITIAL_CONSTANT) + startFrameIndex)-1;//eiger 32 bit mode is a multiple of 17c. +startframeindex for scans
}else if ((myDetectorType == GOTTHARD) && (shortFrame == -1))
tempframenum = (((((uint32_t)(*((uint32_t*)(wbuf[ithread] + HEADER_SIZE_NUM_TOT_PACKETS))))+1)& (frameIndexMask)) >> frameIndexOffset);
else
@@ -1836,6 +1865,7 @@ int loop;
if (cbAction < DO_EVERYTHING){
for(i=0;i<numListeningThreads;++i)
/* for eiger 32 bit mode, currframenum like gotthard, does not start from 0 or 1 */
rawDataReadyCallBack(currframenum, wbuf[i], numpackets * onePacketSize, sfilefd, guiData,pRawDataReady);
}else if (numpackets > 0){
for(i=0;i<numListeningThreads;++i)
@@ -1957,8 +1987,13 @@ void UDPStandardImplementation::startFrameIndices(int ithread){
cout << "startAcquisitionIndex:" << startAcquisitionIndex<<endl;
}
//for scans, cuz currfraenum resets
else if (myDetectorType == EIGER)
startFrameIndex += currframenum;
else if (myDetectorType == EIGER){
if(dynamicRange == 32)
startFrameIndex = (currframenum + 1);// to be added later for scans
else
startFrameIndex += currframenum;
}
cout << "startFrameIndex:" << startFrameIndex<<endl;
@@ -2019,9 +2054,9 @@ int i;
#endif
pthread_mutex_unlock(&(status_mutex));
#ifdef VERYDEBUG
//#ifdef VERYDEBUG
cout << ithread << ": Frames listened to " << dec << ((totalListeningFrameCount[ithread]*numListeningThreads)/packetsPerFrame) << endl;
#endif
//#endif
//waiting for all listening threads to be done, to print final count of frames listened to
if(ithread == 0){