closeDataFile is only for gotthard and included lock for socket while using receiver

git-svn-id: file:///afs/psi.ch/project/sls_det_software/svn/slsDetectorSoftware@338 951219d9-93cf-4727-9268-0efd64621fa3
This commit is contained in:
l_maliakal_d 2012-11-13 08:38:18 +00:00
parent 8f7872d056
commit bf7da5f493
7 changed files with 153 additions and 123 deletions

View File

@ -89,6 +89,10 @@ enum communicationProtocol{
genericSocket(const char* const host_ip_or_name, unsigned short int const port_number, communicationProtocol p) : genericSocket(const char* const host_ip_or_name, unsigned short int const port_number, communicationProtocol p) :
portno(port_number), protocol(p), is_a_server(0), socketDescriptor(-1),file_des(-1), packet_size(DEFAULT_PACKET_SIZE)// sender (client): where to? ip portno(port_number), protocol(p), is_a_server(0), socketDescriptor(-1),file_des(-1), packet_size(DEFAULT_PACKET_SIZE)// sender (client): where to? ip
{ {
pthread_mutex_t mp1 = PTHREAD_MUTEX_INITIALIZER;
mp=mp1;
pthread_mutex_init(&mp, NULL);
strcpy(hostname,host_ip_or_name); strcpy(hostname,host_ip_or_name);
struct hostent *hostInfo = gethostbyname(host_ip_or_name); struct hostent *hostInfo = gethostbyname(host_ip_or_name);
if (hostInfo == NULL){ if (hostInfo == NULL){
@ -237,92 +241,96 @@ enum communicationProtocol{
/** @short etablishes connection; disconnect should always follow /** @short etablishes connection; disconnect should always follow
\returns 1 if error \returns 1 if error
*/ */
int Connect(){ int Connect(){//cout<<"connect"<<endl;
if(file_des>0) return file_des; if(file_des>0) return file_des;
if(is_a_server && protocol==TCP){ //server tcp; the server will wait for the clients connection if(is_a_server && protocol==TCP){ //server tcp; the server will wait for the clients connection
if (socketDescriptor>0) { if (socketDescriptor>0) {
if ((file_des = accept(socketDescriptor,(struct sockaddr *) &clientAddress, &clientAddress_length)) < 0) { if ((file_des = accept(socketDescriptor,(struct sockaddr *) &clientAddress, &clientAddress_length)) < 0) {
cerr << "Error: with server accept, connection refused"<<endl; cerr << "Error: with server accept, connection refused"<<endl;
switch(errno) { switch(errno) {
case EWOULDBLOCK: case EWOULDBLOCK:
printf("ewouldblock eagain\n"); printf("ewouldblock eagain\n");
break; break;
case EBADF: case EBADF:
printf("ebadf\n"); printf("ebadf\n");
break; break;
case ECONNABORTED: case ECONNABORTED:
printf("econnaborted\n"); printf("econnaborted\n");
break; break;
case EFAULT: case EFAULT:
printf("efault\n"); printf("efault\n");
break; break;
case EINTR: case EINTR:
printf("eintr\n"); printf("eintr\n");
break; break;
case EINVAL: case EINVAL:
printf("einval\n"); printf("einval\n");
break; break;
case EMFILE: case EMFILE:
printf("emfile\n"); printf("emfile\n");
break; break;
case ENFILE: case ENFILE:
printf("enfile\n"); printf("enfile\n");
break; break;
case ENOTSOCK: case ENOTSOCK:
printf("enotsock\n"); printf("enotsock\n");
break; break;
case EOPNOTSUPP: case EOPNOTSUPP:
printf("eOPNOTSUPP\n"); printf("eOPNOTSUPP\n");
break; break;
case ENOBUFS: case ENOBUFS:
printf("ENOBUFS\n"); printf("ENOBUFS\n");
break; break;
case ENOMEM: case ENOMEM:
printf("ENOMEM\n"); printf("ENOMEM\n");
break; break;
case ENOSR: case ENOSR:
printf("ENOSR\n"); printf("ENOSR\n");
break; break;
case EPROTO: case EPROTO:
printf("EPROTO\n"); printf("EPROTO\n");
break; break;
default: default:
printf("unknown error\n"); printf("unknown error\n");
} }
socketDescriptor=-1; socketDescriptor=-1;
} }
#ifdef VERY_VERBOSE #ifdef VERY_VERBOSE
else else
cout << "client connected "<< file_des << endl; cout << "client connected "<< file_des << endl;
#endif #endif
} }
// file_des = socketDescriptor; // file_des = socketDescriptor;
#ifdef VERY_VERBOSE #ifdef VERY_VERBOSE
cout << "fd " << file_des << endl; cout << "fd " << file_des << endl;
#endif #endif
} else { } else {
if (socketDescriptor<=0) if (socketDescriptor<=0)
socketDescriptor = socket(AF_INET, getProtocol(),0); socketDescriptor = socket(AF_INET, getProtocol(),0);
// SetTimeOut(10); // SetTimeOut(10);
if (socketDescriptor < 0){ if (socketDescriptor < 0){
cerr << "Can not create socket "<<endl; cerr << "Can not create socket "<<endl;
file_des = socketDescriptor; file_des = socketDescriptor;
} else { } else {
if(connect(socketDescriptor,(struct sockaddr *) &serverAddress,sizeof(serverAddress))<0){ if(connect(socketDescriptor,(struct sockaddr *) &serverAddress,sizeof(serverAddress))<0){
cerr << "Can not connect to socket "<<endl; cerr << "Can not connect to socket "<<endl;
file_des = -1; file_des = -1;
} else } else{
file_des = socketDescriptor; file_des = socketDescriptor;
} /*cout<<"locking"<<endl;
pthread_mutex_lock(&mp);
cout<<"locked"<<endl;*/
}
}
} }
return file_des; return file_des;
} }
@ -331,16 +339,19 @@ enum communicationProtocol{
/** @short free connection */ /** @short free connection */
void Disconnect(){ void Disconnect(){
if(file_des>=0){ //then was open if(file_des>=0){ //then was open
if(is_a_server){ if(is_a_server){
close(file_des); close(file_des);
} }
else { else {
close(socketDescriptor); close(socketDescriptor);
socketDescriptor=-1; socketDescriptor=-1;
} }
file_des=-1; file_des=-1;
} /*cout<<"unlocking"<<endl;
pthread_mutex_unlock(&mp);
cout<<"unlocked"<<endl;*/
}
}; };
@ -580,6 +591,6 @@ enum communicationProtocol{
int file_des; int file_des;
pthread_mutex_t mp;
}; };
#endif #endif

View File

@ -5715,7 +5715,7 @@ int* slsDetector::readFrameFromReceiver(){
std::cout<< "Received "<< n << " data bytes" << std::endl; std::cout<< "Received "<< n << " data bytes" << std::endl;
#endif #endif
if (n!=thisDetector->dataBytes+HEADERLENGTH) { if (n!=thisDetector->dataBytes+HEADERLENGTH) {
std::cout<<endl<< "wrong data size received: received " << n << " but expected " << thisDetector->dataBytes+HEADERLENGTH << std::endl; std::cout<<endl<< "wrong data size received: received " << n << " but expected from receiver " << thisDetector->dataBytes+HEADERLENGTH << std::endl;
ret=FAIL; ret=FAIL;
delete [] origVal; delete [] origVal;
delete [] retval; delete [] retval;

View File

@ -240,21 +240,26 @@ void slsDetectorUtils::acquire(int delflag){
} }
if(receiver){ if(receiver){
//send receiver file name //send receiver file name
pthread_mutex_lock(&mp); pthread_mutex_lock(&mp);
createFileName(); createFileName();
pthread_mutex_unlock(&mp); pthread_mutex_unlock(&mp);
setFileName(fileIO::getFileName());
if(setReceiverOnline()==OFFLINE_FLAG){ pthread_mutex_lock(&mg);
stopReceiver(); setFileName(fileIO::getFileName());
break; if(setReceiverOnline()==OFFLINE_FLAG){
} stopReceiver();
//start receiver pthread_mutex_unlock(&mg);
startReceiver(); break;
if(setReceiverOnline()==OFFLINE_FLAG){ }
stopReceiver(); //start receiver
break; startReceiver();
} if(setReceiverOnline()==OFFLINE_FLAG){
stopReceiver();
pthread_mutex_unlock(&mg);
break;
}
pthread_mutex_unlock(&mg);
} }
startAndReadAll(); startAndReadAll();
@ -290,6 +295,8 @@ void slsDetectorUtils::acquire(int delflag){
} else } else
break; break;
pthread_mutex_lock(&mg);
if(setReceiverOnline()==OFFLINE_FLAG){ if(setReceiverOnline()==OFFLINE_FLAG){
// wait until data processing thread has finished the data // wait until data processing thread has finished the data
@ -303,12 +310,16 @@ void slsDetectorUtils::acquire(int delflag){
usleep(100000); usleep(100000);
} }
if((*correctionMask)&(1<<WRITE_FILE)) if (getDetectorsType()==GOTTHARD)
closeDataFile(); if((*correctionMask)&(1<<WRITE_FILE))
closeDataFile();
}else{ }else{
while(stopReceiver()!=OK); while(stopReceiver()!=OK);
} }
pthread_mutex_unlock(&mg);
pthread_mutex_lock(&mp); pthread_mutex_lock(&mp);
if (*stoppedFlag==0) { if (*stoppedFlag==0) {
@ -382,14 +393,14 @@ void slsDetectorUtils::acquire(int delflag){
pthread_mutex_lock(&mg);
#ifdef VERBOSE #ifdef VERBOSE
cout << "findex incremented " << endl; cout << "findex incremented " << endl;
#endif #endif
if(*correctionMask&(1<<WRITE_FILE)) if(*correctionMask&(1<<WRITE_FILE))
IncrementFileIndex(); IncrementFileIndex();
setFileIndex(fileIO::getFileIndex()); setFileIndex(fileIO::getFileIndex());
pthread_mutex_unlock(&mg);
if (measurement_finished) if (measurement_finished)

View File

@ -119,10 +119,10 @@ return 0;
int fileIO::closeDataFile() {fflush(stdout); int fileIO::closeDataFile() {cout<<"closing datafile: filfd:"<<filefd<<endl;
if (filefd) if (filefd)
fclose(filefd); fclose(filefd);
filefd=NULL; filefd=NULL; cout<<"closed data file"<<endl;
return 0; return 0;
} }

View File

@ -13,8 +13,8 @@ postProcessing::postProcessing(): expTime(NULL), ang(NULL), val(NULL), err(NULL)
pthread_mutex_t mp1 = PTHREAD_MUTEX_INITIALIZER; pthread_mutex_t mp1 = PTHREAD_MUTEX_INITIALIZER;
mp=mp1; mp=mp1;
pthread_mutex_init(&mp, NULL); pthread_mutex_init(&mp, NULL);
// mg=mp1; mg=mp1;
// pthread_mutex_init(&mg, NULL); pthread_mutex_init(&mg, NULL);
//cout << "reg callback "<< endl; //cout << "reg callback "<< endl;
dataReady = 0; dataReady = 0;
pCallbackArg = 0; pCallbackArg = 0;
@ -99,10 +99,10 @@ void postProcessing::processFrame(int *myData, int delflag) {
cout << "writing raw data " << endl; cout << "writing raw data " << endl;
#endif #endif
if (getDetectorsType()==MYTHEN){ if (getDetectorsType()==MYTHEN){cout<<"gonna write datafile"<<endl;
// if (fdata) { // if (fdata) {
//uses static function?!?!?!? //uses static function?!?!?!?
writeDataFile (fname+string(".raw"),fdata, NULL, NULL, 'i'); writeDataFile (fname+string(".raw"),fdata, NULL, NULL, 'i');cout<<"finished writing datafile"<<endl;
} else { } else {
writeDataFile ((void*)myData, frameIndex); writeDataFile ((void*)myData, frameIndex);
} }
@ -375,19 +375,28 @@ void* postProcessing::processData(int delflag) {
} }
//receiver //receiver
else{ else{
pthread_mutex_lock(&mg);
int prevCaught=getCurrentFrameIndex(); int prevCaught=getCurrentFrameIndex();
pthread_mutex_unlock(&mg);
int caught=0; int caught=0;
while(1){ while(1){
if (checkJoinThread()) break; if (checkJoinThread()) break;
usleep(200000); usleep(200000);
pthread_mutex_lock(&mg);
caught=getCurrentFrameIndex(); caught=getCurrentFrameIndex();
pthread_mutex_unlock(&mg);
incrementProgress(caught-prevCaught); incrementProgress(caught-prevCaught);
prevCaught=caught; prevCaught=caught;
if (checkJoinThread()) break; if (checkJoinThread()) break;
//if(progress_call) //if(progress_call)
// progress_call(getCurrentProgress(),pProgressCallArg); // progress_call(getCurrentProgress(),pProgressCallArg);
pthread_mutex_lock(&mg);
int* receiverData = readFrameFromReceiver(); int* receiverData = readFrameFromReceiver();
pthread_mutex_unlock(&mg);
if(!receiverData) if(!receiverData)
return 0; return 0;
fdata=decodeData(receiverData); fdata=decodeData(receiverData);

View File

@ -398,7 +398,7 @@ char* readFrame(){
// memcpy(sendbuffer,buffer ,sizeof(buffer)); // memcpy(sendbuffer,buffer ,sizeof(buffer));
while (((int)*((int*)buffer))%2==0) ;//usleep(20000); while (((int)*((int*)buffer))%2==0) {printf("checking\n");fflush(stdout);}//usleep(20000);
// memcpy(sendbuffer,buffer ,sizeof(buffer)); // memcpy(sendbuffer,buffer ,sizeof(buffer));

View File

@ -449,7 +449,7 @@ int get_frames_caught(int file_des) {
int get_frame_index(int file_des) {printf("Getting frame Index\n");fflush(stdout); int get_frame_index(int file_des) {
int ret=OK; int ret=OK;
int n=0; int n=0;
int retval=-1; int retval=-1;
@ -467,7 +467,6 @@ int get_frame_index(int file_des) {printf("Getting frame Index\n");fflush(stdout
/* send answer */ /* send answer */
n = sendDataOnly(file_des,&ret,sizeof(ret)); n = sendDataOnly(file_des,&ret,sizeof(ret));
n = sendDataOnly(file_des,&retval,sizeof(retval)); n = sendDataOnly(file_des,&retval,sizeof(retval));
printf("returnrdf:%d\n",retval);
/*return ok/fail*/ /*return ok/fail*/
return ret; return ret;
} }