made udp socket in receiver use generic socket, included a incrememnt frame index in acquire before a break, got rid of infinite loop for unmatched index in receiver

git-svn-id: file:///afs/psi.ch/project/sls_det_software/svn/slsDetectorSoftware@350 951219d9-93cf-4727-9268-0efd64621fa3
This commit is contained in:
l_maliakal_d 2012-11-19 16:29:17 +00:00
parent 1b7c42540a
commit 97ed3e8872
7 changed files with 65 additions and 101 deletions

View File

@ -90,9 +90,6 @@ enum communicationProtocol{
// portno(port_number), // portno(port_number),
protocol(p), is_a_server(0), socketDescriptor(-1),file_des(-1), packet_size(DEFAULT_PACKET_SIZE)// sender (client): where to? ip protocol(p), is_a_server(0), socketDescriptor(-1),file_des(-1), packet_size(DEFAULT_PACKET_SIZE)// sender (client): where to? ip
{ {
/* pthread_mutex_t mp1 = PTHREAD_MUTEX_INITIALIZER; */
/* mp=mp1; */
/* pthread_mutex_init(&mp, NULL); */
// strcpy(hostname,host_ip_or_name); // strcpy(hostname,host_ip_or_name);
struct hostent *hostInfo = gethostbyname(host_ip_or_name); struct hostent *hostInfo = gethostbyname(host_ip_or_name);
@ -346,9 +343,6 @@ protocol(p), is_a_server(0), socketDescriptor(-1),file_des(-1), packet_size(DEFA
file_des = -1; file_des = -1;
} else{ } else{
file_des = socketDescriptor; file_des = socketDescriptor;
/*cout<<"locking"<<endl;
pthread_mutex_lock(&mp);
cout<<"locked"<<endl;*/
} }
} }
@ -362,23 +356,31 @@ protocol(p), is_a_server(0), socketDescriptor(-1),file_des(-1), packet_size(DEFA
/** @short free connection */ /** @short free connection */
void Disconnect(){ void Disconnect(){
if (protocol==UDP){
if(file_des>=0){ //then was open close(socketDescriptor);
if(is_a_server){ socketDescriptor=-1;
close(file_des); }
else{
if(file_des>=0){ //then was open
if(is_a_server){
close(file_des);
}
else {
close(socketDescriptor);
socketDescriptor=-1;
}
file_des=-1;
} }
else {
close(socketDescriptor);
socketDescriptor=-1;
}
file_des=-1;
/*cout<<"unlocking"<<endl;
pthread_mutex_unlock(&mp);
cout<<"unlocked"<<endl;*/
} }
}; };
void ShutDownSocket(){
if (socketDescriptor != -1)
shutdown(socketDescriptor, SHUT_RDWR);
};
/** Set the socket timeout ts is in seconds */ /** Set the socket timeout ts is in seconds */
int SetTimeOut(int ts){ int SetTimeOut(int ts){

View File

@ -386,8 +386,19 @@ void slsDetectorUtils::acquire(int delflag){
if (*stoppedFlag==0) { if (*stoppedFlag==0) {
executeAction(stopScript); executeAction(stopScript);
} else } else{
pthread_mutex_lock(&mg);
#ifdef VERBOSE
cout << "findex incremented " << endl;
#endif
if(*correctionMask&(1<<WRITE_FILE))
IncrementFileIndex();
setFileIndex(fileIO::getFileIndex());
pthread_mutex_unlock(&mg);
break; break;
}
@ -419,7 +430,7 @@ void slsDetectorUtils::acquire(int delflag){
pthread_join(dataProcessingThread, &status); pthread_join(dataProcessingThread, &status);
} }
if (connectChannels) { if (connectChannels) {
if (disconnect_channels) if (disconnect_channels)
disconnect_channels(DCarg); disconnect_channels(DCarg);

View File

@ -6,7 +6,7 @@
#include <stdint.h> #include <stdint.h>
#define GOODBYE -200 #define GOODBYE -200
#define HALF_BUFFER_SIZE 1286 #define BUFFER_SIZE 1286*2
#define DEFAULT_UDP_PORT 50001 #define DEFAULT_UDP_PORT 50001

View File

@ -22,9 +22,10 @@ int main(int argc, char *argv[])
//assign function table //assign function table
slsReceiverFuncs *receiver = new slsReceiverFuncs(socket); slsReceiverFuncs *receiver = new slsReceiverFuncs(socket);
#ifdef VERBOSE #ifdef VERBOSE
cout << "Function table assigned.\n Ready..." << endl; cout << "Function table assigned." << endl;
#endif #endif
cout << " Ready..." << endl;
//waits for connection //waits for connection
while(retval!=GOODBYE) { while(retval!=GOODBYE) {
#ifdef VERBOSE #ifdef VERBOSE

View File

@ -33,15 +33,14 @@ slsReceiverFunctionList::slsReceiverFunctionList():
startAcquisitionIndex(-1), startAcquisitionIndex(-1),
acquisitionIndex(0), acquisitionIndex(0),
framesInFile(0), framesInFile(0),
udpSocket(-1),
status(IDLE), status(IDLE),
udpSocket(NULL),
server_port(DEFAULT_UDP_PORT) server_port(DEFAULT_UDP_PORT)
{ {
strcpy(savefilename,""); strcpy(savefilename,"");
strcpy(filePath,""); strcpy(filePath,"");
strcpy(fileName,"run"); strcpy(fileName,"run");
strcpy(buffer,""); strcpy(buffer,"");
} }
@ -90,7 +89,6 @@ char* slsReceiverFunctionList::setFilePath(char c[]){
int slsReceiverFunctionList::setFileIndex(int i){ int slsReceiverFunctionList::setFileIndex(int i){
if(i>=0) if(i>=0)
fileIndex = i; fileIndex = i;
return getFileIndex(); return getFileIndex();
} }
@ -153,8 +151,7 @@ int slsReceiverFunctionList::stopReceiver(){
cout << "Stopping new acquisition threadddd ...." << endl; cout << "Stopping new acquisition threadddd ...." << endl;
//stop thread //stop thread
listening_thread_running=0; listening_thread_running=0;
if (udpSocket != -1) udpSocket->ShutDownSocket();
shutdown(udpSocket, SHUT_RDWR);
pthread_join(listening_thread,NULL); pthread_join(listening_thread,NULL);
status = IDLE; status = IDLE;
} }
@ -179,12 +176,7 @@ int slsReceiverFunctionList::startListening(){
#endif #endif
// Variable and structure definitions // Variable and structure definitions
/*udpSocket = -1;*/ int rc, currframenum, prevframenum;
int rc1, rc2, rc;
int currframenum, prevframenum;
struct sockaddr_in serveraddr;
struct sockaddr_in clientaddr;
socklen_t clientaddrlen = sizeof(clientaddr);
//reset variables for each acquisition //reset variables for each acquisition
framesInFile=0; framesInFile=0;
@ -208,42 +200,17 @@ int slsReceiverFunctionList::startListening(){
// close() of each of the socket descriptors is only done once at the // close() of each of the socket descriptors is only done once at the
// very end of the program. // very end of the program.
do { do {
// The socket() function returns a socket descriptor, which represents
// an endpoint. The statement also identifies that the INET udpSocket = new genericSocket(server_port,genericSocket::UDP);
// (Internet Protocol) address family with the UDP transport if (udpSocket->getErrorStatus()){
// (SOCK_DGRAM) will be used for this socket.
udpSocket = socket(AF_INET, SOCK_DGRAM, 0);
if (udpSocket < 0) {
cerr << "socket() failed" << endl;
break; break;
} }
// After the socket descriptor is created, a bind() function gets a
// unique name for the socket. In this example, the user sets the
// s_addr to zero, which means that the UDP port of 3555 will be
// bound to all IP addresses on the system.
memset(&serveraddr, 0, sizeof(serveraddr));
serveraddr.sin_family = AF_INET;
serveraddr.sin_port = htons(server_port);
//serveraddr.sin_addr.s_addr = inet_addr(server_ip);
serveraddr.sin_addr.s_addr = htonl(INADDR_ANY);
rc = bind(udpSocket, (struct sockaddr *) &serveraddr, sizeof(serveraddr));
if (rc < 0) {
cerr << "bind() failed" << endl;
break;
}
sfilefd = fopen((const char *) (savefilename), "w"); sfilefd = fopen((const char *) (savefilename), "w");
cout << "Saving to " << savefilename << ". Ready! " << endl; cout << "Saving to " << savefilename << ". Ready! " << endl;
while (listening_thread_running) { while (listening_thread_running) {
// The server uses the recvfrom() function to receive that data.
// The recvfrom() function waits indefinitely for data to arrive.
if (framesInFile == 20000) { if (framesInFile == 20000) {
fclose(sfilefd); fclose(sfilefd);
@ -263,12 +230,11 @@ int slsReceiverFunctionList::startListening(){
} }
status = RUNNING; status = RUNNING;
rc1 = recvfrom(udpSocket, buffer, HALF_BUFFER_SIZE, 0,
(struct sockaddr *) &clientaddr, &clientaddrlen);
//cout << "rc1 done" << endl;
rc2 = recvfrom(udpSocket, buffer+HALF_BUFFER_SIZE, HALF_BUFFER_SIZE, 0,
(struct sockaddr *) &clientaddr, &clientaddrlen);
//receiver 2 half frames
rc = udpSocket->ReceiveDataOnly(buffer,sizeof(buffer));
if( rc < 0)
cerr << "recvfrom() failed" << endl;
//for each scan //for each scan
if(startFrameIndex==-1){ if(startFrameIndex==-1){
@ -279,35 +245,29 @@ int slsReceiverFunctionList::startListening(){
if(startAcquisitionIndex==-1) if(startAcquisitionIndex==-1)
startAcquisitionIndex=startFrameIndex; startAcquisitionIndex=startFrameIndex;
//cout << "rc2 done" << endl;
if ((rc1 < 0) || (rc2 < 0)) {
perror("recvfrom() failed");
break;
}
//so that it doesnt write the last frame twice //so that it doesnt write the last frame twice
if(listening_thread_running){ if(listening_thread_running){
fwrite(buffer, 1, rc1, sfilefd); fwrite(buffer, 1, rc, sfilefd);
fwrite(buffer+HALF_BUFFER_SIZE, 1, rc2, sfilefd);
framesInFile++; framesInFile++;
framesCaught++; framesCaught++;
totalFramesCaught++; totalFramesCaught++;
//cout << "saving" << endl;
} }
} }
} while (listening_thread_running); } while (listening_thread_running);
listening_thread_running=0; listening_thread_running=0;
status = IDLE; status = IDLE;
//Close down any open socket descriptors //Close down any open socket descriptors
if (udpSocket != -1){ udpSocket->Disconnect();
cout << "Closing udpSocket" << endl;
close(udpSocket);
}
//close file //close file
fclose(sfilefd); fclose(sfilefd);
#ifdef VERBOSE
cout << "sfield:" << (int)sfilefd << endl; cout << "sfield:" << (int)sfilefd << endl;
#endif
return 0; return 0;
} }
@ -317,8 +277,6 @@ int slsReceiverFunctionList::startListening(){
char* slsReceiverFunctionList::readFrame(char* c){ char* slsReceiverFunctionList::readFrame(char* c){
strcpy(c,savefilename); strcpy(c,savefilename);
return buffer; return buffer;

View File

@ -7,6 +7,7 @@
#include "sls_detector_defs.h" #include "sls_detector_defs.h"
#include "receiver_defs.h" #include "receiver_defs.h"
#include "genericSocket.h"
#include <pthread.h> #include <pthread.h>
#include <stdio.h> #include <stdio.h>
@ -186,11 +187,11 @@ private:
/** File Descriptor */ /** File Descriptor */
static FILE *sfilefd; static FILE *sfilefd;
/** UDP Socket - with server */
int udpSocket;
/** Receiver buffer */ /** Receiver buffer */
char buffer[HALF_BUFFER_SIZE*2]; char buffer[BUFFER_SIZE];
/** UDP Socket between Receiver and Detector */
genericSocket* udpSocket;
/** Server UDP Port*/ /** Server UDP Port*/
int server_port; int server_port;
@ -198,22 +199,6 @@ private:
}; };
/* /*
int getStartFrameIndex();
//int setUDPPortNumber(int p=-1); //sets/gets port number to listen to for data from the detector //int setUDPPortNumber(int p=-1); //sets/gets port number to listen to for data from the detector
//int setTCPPortNumber(int p=-1); //sets/get port number for communication to client //int setTCPPortNumber(int p=-1); //sets/get port number for communication to client
*/ */

View File

@ -496,7 +496,14 @@ int slsReceiverFuncs::read_frame(){
if(ret==OK){ if(ret==OK){
int count=0; int count=0;
do{ do{
if(count>0){ cout << endl << "unmatching: index:" << index <<" index2:" << index2 << endl;} if(count>0){
cout << endl << "unmatching: index:" << index <<" index2:" << index2 << endl;
if(count>10){
strcpy(mess,"unmatching index. could not read frame.\n");
ret=FAIL;
break;
}
}
retval=slsReceiverList->readFrame(fName); retval=slsReceiverList->readFrame(fName);
index=(int)(*((int*)retval)); index=(int)(*((int*)retval));
char* retval2= retval+1286; char* retval2= retval+1286;