merging with master

This commit is contained in:
Dhanya Maliakal
2015-08-21 14:47:07 +02:00
4 changed files with 63 additions and 35 deletions

View File

@ -60,7 +60,6 @@ class sockaddr_in;
#include <ifaddrs.h> #include <ifaddrs.h>
#endif #endif
#include <stdlib.h> /******exit */ #include <stdlib.h> /******exit */
#include <unistd.h> #include <unistd.h>
@ -71,6 +70,8 @@ class sockaddr_in;
#include <errno.h> #include <errno.h>
#include <stdio.h> #include <stdio.h>
using namespace std; using namespace std;
#define DEFAULT_PACKET_SIZE 1286 #define DEFAULT_PACKET_SIZE 1286
@ -272,7 +273,11 @@ typedef struct
if (socketDescriptor >= 0){ \ if (socketDescriptor >= 0){ \
close(socketDescriptor); \ close(socketDescriptor); \
} \ } \
file_des=-1; \ if(is_a_server and getProtocol() == TCP){\
if(file_des>0)\
close(file_des);\
}
file_des=-1; \
serverAddress.sin_port=-1; \ serverAddress.sin_port=-1; \
}; };
@ -376,6 +381,7 @@ typedef struct
#ifdef VERY_VERBOSE #ifdef VERY_VERBOSE
cout << "client connected "<< file_des << endl; cout << "client connected "<< file_des << endl;
#endif #endif
} }
} }
@ -392,7 +398,6 @@ typedef struct
cerr << "Can not create socket "<<endl; cerr << "Can not create socket "<<endl;
file_des = socketDescriptor; file_des = socketDescriptor;
} else { } else {
if(connect(socketDescriptor,(struct sockaddr *) &serverAddress,sizeof(serverAddress))<0){ if(connect(socketDescriptor,(struct sockaddr *) &serverAddress,sizeof(serverAddress))<0){
cerr << "Can not connect to socket "<<endl; cerr << "Can not connect to socket "<<endl;
file_des = -1; file_des = -1;
@ -413,6 +418,16 @@ typedef struct
int getsocketDescriptor(){return socketDescriptor;}; int getsocketDescriptor(){return socketDescriptor;};
void exitServer(){
if(is_a_server){
if (socketDescriptor>=0){
close(socketDescriptor);
socketDescriptor = -1;
}
}
}
/** @short free connection */ /** @short free connection */
void Disconnect(){ void Disconnect(){
if (protocol==UDP){ if (protocol==UDP){
@ -435,9 +450,8 @@ typedef struct
void ShutDownSocket(){ void ShutDownSocket(){
while(!shutdown(socketDescriptor, SHUT_RDWR)); while(!shutdown(socketDescriptor, SHUT_RDWR));
close(socketDescriptor); Disconnect();
socketDescriptor = -1;
}; };
@ -513,6 +527,9 @@ typedef struct
} }
mac[sizeof(mac)-1]='\0'; mac[sizeof(mac)-1]='\0';
if(sock!=1){
close(sock);
}
return string(mac); return string(mac);
}; };
@ -534,6 +551,9 @@ typedef struct
strncpy(addr,p,sizeof(addr)-1); strncpy(addr,p,sizeof(addr)-1);
addr[sizeof(addr)-1]='\0'; addr[sizeof(addr)-1]='\0';
if(sock!=1){
close(sock);
}
return string(addr); return string(addr);
}; };

View File

@ -745,15 +745,17 @@ int UDPBaseImplementation::createUDPSockets(){ FILE_LOG(logDEBUG) << __AT__ << "
if(!strlen(eth)){ if(!strlen(eth)){
cout<<"warning:eth is empty.listening to all"<<endl; cout<<"warning:eth is empty.listening to all"<<endl;
for(int i=0;i<numListeningThreads;i++) for(int i=0;i<numListeningThreads;i++){
udpSocket[i] = new genericSocket(server_port[i],genericSocket::UDP,bufferSize); udpSocket[i] = new genericSocket(server_port[i],genericSocket::UDP,bufferSize);
}
} }
//normal socket //normal socket
else{ else{
cout<<"eth:"<<eth<<endl; cout<<"eth:"<<eth<<endl;
for(int i=0;i<numListeningThreads;i++) for(int i=0;i<numListeningThreads;i++){
udpSocket[i] = new genericSocket(server_port[i],genericSocket::UDP,bufferSize,eth); udpSocket[i] = new genericSocket(server_port[i],genericSocket::UDP,bufferSize,eth);
}
} }
//error //error

View File

@ -80,6 +80,7 @@ UDPStandardImplementation::UDPStandardImplementation()
cout << "\nWARNING: Could not change socket receiver buffer size in file /proc/sys/net/core/rmem_max" << endl; cout << "\nWARNING: Could not change socket receiver buffer size in file /proc/sys/net/core/rmem_max" << endl;
else if(system("echo 250000 > /proc/sys/net/core/netdev_max_backlog")) else if(system("echo 250000 > /proc/sys/net/core/netdev_max_backlog"))
cout << "\nWARNING: Could not change max length of input queue in file /proc/sys/net/core/netdev_max_backlog" << endl; cout << "\nWARNING: Could not change max length of input queue in file /proc/sys/net/core/netdev_max_backlog" << endl;
/** permanent setting heiner /** permanent setting heiner
net.core.rmem_max = 104857600 # 100MiB net.core.rmem_max = 104857600 # 100MiB
net.core.netdev_max_backlog = 250000 net.core.netdev_max_backlog = 250000
@ -1063,24 +1064,26 @@ int UDPStandardImplementation::createUDPSockets(){
if(!strlen(eth)){ if(!strlen(eth)){
cout<<"warning:eth is empty.listening to all"<<endl; cout<<"warning:eth is empty.listening to all"<<endl;
for(int i=0;i<numListeningThreads;i++) for(int i=0;i<numListeningThreads;i++){
udpSocket[i] = new genericSocket(port[i],genericSocket::UDP,bufferSize); udpSocket[i] = new genericSocket(port[i],genericSocket::UDP,bufferSize);
}
} }
//normal socket //normal socket
else{ else{
cout<<"eth:"<<eth<<endl; cout<<"eth:"<<eth<<endl;
for(int i=0;i<numListeningThreads;i++) for(int i=0;i<numListeningThreads;i++){
udpSocket[i] = new genericSocket(port[i],genericSocket::UDP,bufferSize,eth); udpSocket[i] = new genericSocket(port[i],genericSocket::UDP,bufferSize,eth);
}
} }
//error //error
int iret; int iret;
for(int i=0;i<numListeningThreads;i++){ for(int i=0;i<numListeningThreads;i++){
iret = udpSocket[i]->getErrorStatus(); iret = udpSocket[i]->getErrorStatus();
if(!iret) if(!iret){
cout << "UDP port opened at port " << port[i] << endl; cout << "UDP port opened at port " << port[i] << endl;
else{ }else{
#ifdef VERBOSE #ifdef VERBOSE
cprintf(BG_RED,"Could not create UDP socket on port %d error: %d\n", port[i], iret); cprintf(BG_RED,"Could not create UDP socket on port %d error: %d\n", port[i], iret);
#endif #endif
@ -1089,6 +1092,8 @@ int UDPStandardImplementation::createUDPSockets(){
} }
} }
return OK; return OK;
} }
@ -1286,7 +1291,7 @@ int UDPStandardImplementation::setupWriter(){
numMissingPackets = 0; numMissingPackets = 0;
packetsCaught=0; packetsCaught=0;
frameIndex=0; frameIndex=0;
if(sfilefd) sfilefd=NULL; if(sfilefd) {cprintf(RED,"**FILE not closed!\n");fclose(sfilefd);sfilefd=NULL;}
guiData = NULL; guiData = NULL;
guiDataReady=0; guiDataReady=0;
strcpy(guiFileName,""); strcpy(guiFileName,"");
@ -1414,9 +1419,13 @@ int UDPStandardImplementation::createNewFile(){
if(enableFileWrite && cbAction > DO_NOTHING){ if(enableFileWrite && cbAction > DO_NOTHING){
//close //close
if(sfilefd){ if(sfilefd){
fclose(sfilefd); if(fclose(sfilefd)){
cprintf(RED, "file close problem %d\n",fileno(sfilefd));
fclose(sfilefd);
}
sfilefd = NULL; sfilefd = NULL;
} }
//open file //open file
if(!overwrite){ if(!overwrite){
if (NULL == (sfilefd = fopen((const char *) (savefilename), "wx"))){ if (NULL == (sfilefd = fopen((const char *) (savefilename), "wx"))){
@ -1430,6 +1439,7 @@ int UDPStandardImplementation::createNewFile(){
//setting buffer //setting buffer
setvbuf(sfilefd,NULL,_IOFBF,BUF_SIZE); setvbuf(sfilefd,NULL,_IOFBF,BUF_SIZE);
//printing packet losses and file names //printing packet losses and file names
if(!packetsCaught) if(!packetsCaught)
cout << savefilename << endl; cout << savefilename << endl;
@ -1473,9 +1483,10 @@ void UDPStandardImplementation::closeFile(int ithr){
if(!dataCompression){ if(!dataCompression){
if(sfilefd){ if(sfilefd){
#ifdef VERBOSE #ifdef VERBOSE
cout << "sfield:" << (int)sfilefd << endl; cprintf(YELLOW, "gonna close file:%d\n",fileno(sfilefd));
#endif #endif
fclose(sfilefd); if(fclose(sfilefd))
perror("file close ERROR");
sfilefd = NULL; sfilefd = NULL;
} }
} }
@ -1486,7 +1497,8 @@ void UDPStandardImplementation::closeFile(int ithr){
#ifdef VERBOSE #ifdef VERBOSE
cout << "sfield:" << (int)sfilefd << endl; cout << "sfield:" << (int)sfilefd << endl;
#endif #endif
fclose(sfilefd); if(fclose(sfilefd))
perror("close ERRROR");
sfilefd = NULL; sfilefd = NULL;
} }
#endif #endif
@ -1625,7 +1637,6 @@ int UDPStandardImplementation::stopReceiver(){
}else cout <<" Not idle to stop receiver" << endl; }else cout <<" Not idle to stop receiver" << endl;
//sem_post(&smp); //sem_post(&smp);
return OK; return OK;
@ -2483,18 +2494,8 @@ void UDPStandardImplementation::startFrameIndices(int ithread, int numbytes){
FILE_LOG(logDEBUG) << __AT__ << " called"; FILE_LOG(logDEBUG) << __AT__ << " called";
//add currframenum later in this method for scans //add currframenum later in this method for scans
if (myDetectorType == EIGER){ if (myDetectorType == EIGER)
//check if its a header
startFrameIndex = 0; startFrameIndex = 0;
/*
if(EIGER_HEADER_LENGTH == numbytes)
startFrameIndex = (htonl(*(unsigned int*)((eiger_image_header *)((char*)(buffer[ithread] + HEADER_SIZE_NUM_TOT_PACKETS)))->fnum))-1;
//missed header packet, so default value
else
startFrameIndex = ((*(uint32_t*)(((eiger_packet_header *)((char*)(buffer[ithread] + HEADER_SIZE_NUM_TOT_PACKETS)))->num1))-1);
cout<<"startFrameIndex["<<ithread<<"]:"<<startFrameIndex<<endl;
*/
}
//gotthard has +1 for frame number and not a short frame //gotthard has +1 for frame number and not a short frame
else if ((myDetectorType == PROPIX) || ((myDetectorType == GOTTHARD) && (shortFrame == -1))) else if ((myDetectorType == PROPIX) || ((myDetectorType == GOTTHARD) && (shortFrame == -1)))
startFrameIndex = (((((uint32_t)(*((uint32_t*)(buffer[ithread] + HEADER_SIZE_NUM_TOT_PACKETS))))+1) startFrameIndex = (((((uint32_t)(*((uint32_t*)(buffer[ithread] + HEADER_SIZE_NUM_TOT_PACKETS))))+1)
@ -2511,11 +2512,6 @@ void UDPStandardImplementation::startFrameIndices(int ithread, int numbytes){
acqStarted = true; acqStarted = true;
cprintf(BLUE,"%d startAcquisitionIndex:%d\n", ithread, startAcquisitionIndex); cprintf(BLUE,"%d startAcquisitionIndex:%d\n", ithread, startAcquisitionIndex);
} }
/*//for scans, cuz currfraenum resets
else if (myDetectorType == EIGER){
startFrameIndex += (currframenum+1);
}*/
cprintf(BLUE,"%d startFrameIndex: %d\n", ithread,startFrameIndex); cprintf(BLUE,"%d startFrameIndex: %d\n", ithread,startFrameIndex);
prevframenum=startFrameIndex-1; //so that there is no packet loss, when currframenum(max,20) - prevframenum(1) prevframenum=startFrameIndex-1; //so that there is no packet loss, when currframenum(max,20) - prevframenum(1)

View File

@ -65,7 +65,6 @@ slsReceiverTCPIPInterface::slsReceiverTCPIPInterface(int &success, UDPInterface*
strcpy(socket->lastClientIP,"none"); strcpy(socket->lastClientIP,"none");
strcpy(socket->thisClientIP,"none1"); strcpy(socket->thisClientIP,"none1");
strcpy(mess,"dummy message"); strcpy(mess,"dummy message");
function_table(); function_table();
#ifdef VERBOSE #ifdef VERBOSE
cout << "Function table assigned." << endl; cout << "Function table assigned." << endl;
@ -148,8 +147,18 @@ void slsReceiverTCPIPInterface::stop(){
cout<<"Shutting down TCP Socket and TCP thread"<<endl; cout<<"Shutting down TCP Socket and TCP thread"<<endl;
cout << "Shutting down UDP Socket" << endl;
if(receiverBase){
receiverBase->shutDownUDPSockets();
cout << "Closing Files... " << endl;
receiverBase->closeFile();
}
killTCPServerThread = 1; killTCPServerThread = 1;
socket->ShutDownSocket(); socket->ShutDownSocket();
socket->exitServer();
cout<<"Socket closed"<<endl; cout<<"Socket closed"<<endl;
void* status; void* status;
pthread_join(TCPServer_thread, &status); pthread_join(TCPServer_thread, &status);
@ -207,6 +216,7 @@ void slsReceiverTCPIPInterface::startTCPServer(){
receiverBase->closeFile(); receiverBase->closeFile();
} }
socket->exitServer();
pthread_exit(NULL); pthread_exit(NULL);
} }