parallized setup receiver

This commit is contained in:
Dhanya Maliakal 2016-07-07 15:55:22 +02:00
parent 3019f4e6ff
commit 09d0d3ba22
5 changed files with 133 additions and 50 deletions

View File

@ -3279,13 +3279,34 @@ char* multiSlsDetector::getNetworkParameter(networkParameter p) {
char* multiSlsDetector::setNetworkParameter(networkParameter p, string s){ char* multiSlsDetector::setNetworkParameter(networkParameter p, string s){
if (s.find('+')==string::npos) { if (s.find('+')==string::npos) {
int posmin=0, posmax=thisMultiDetector->numberOfDetectors;
if(!threadpool){
cout << "Error in creating threadpool. Exiting" << endl;
return getNetworkParameter(p);
}else{
string* sret[thisMultiDetector->numberOfDetectors];
for(int idet=0; idet<thisMultiDetector->numberOfDetectors; idet++){ for(int idet=0; idet<thisMultiDetector->numberOfDetectors; idet++){
if(detectors[idet]){ if(detectors[idet]){
detectors[idet]->setNetworkParameter(p,s); sret[idet]=new string("error");
Task* task = new Task(new func2_t <char*,slsDetector,networkParameter,string,string>(&slsDetector::setNetworkParameter,
detectors[idet],p,s,sret[idet]));
threadpool->add_task(task);
}
}
threadpool->wait_for_tasks_to_complete();
for(int idet=0; idet<thisMultiDetector->numberOfDetectors; idet++){
if(detectors[idet]){
//doing nothing with the return values
if(detectors[idet]->getErrorMask()) if(detectors[idet]->getErrorMask())
setErrorMask(getErrorMask()|(1<<idet)); setErrorMask(getErrorMask()|(1<<idet));
} }
} }
}
} else { } else {
size_t p1=0; size_t p1=0;
size_t p2=s.find('+',p1); size_t p2=s.find('+',p1);

View File

@ -851,6 +851,7 @@ int slsDetector::initializeDetectorSize(detectorType type) {
settingsFile=thisDetector->settingsFile; settingsFile=thisDetector->settingsFile;
filePath=thisDetector->filePath; filePath=thisDetector->filePath;
pthread_mutex_lock(&ms);
fileName=parentDet->fileName; fileName=parentDet->fileName;
fileIndex=parentDet->fileIndex; fileIndex=parentDet->fileIndex;
framesPerFile=parentDet->framesPerFile; framesPerFile=parentDet->framesPerFile;
@ -862,6 +863,7 @@ int slsDetector::initializeDetectorSize(detectorType type) {
setFramesPerFile(JFRAU_MAX_FRAMES_PER_FILE); setFramesPerFile(JFRAU_MAX_FRAMES_PER_FILE);
if (thisDetector->myDetectorType==JUNGFRAUCTB) if (thisDetector->myDetectorType==JUNGFRAUCTB)
setFramesPerFile(JFCTB_MAX_FRAMES_PER_FILE); setFramesPerFile(JFCTB_MAX_FRAMES_PER_FILE);
pthread_mutex_unlock(&ms);
thisReceiver = new receiverInterface(dataSocket); thisReceiver = new receiverInterface(dataSocket);
// setAngularConversionPointer(thisDetector->angOff,&thisDetector->nMods, thisDetector->nChans*thisDetector->nChips); // setAngularConversionPointer(thisDetector->angOff,&thisDetector->nMods, thisDetector->nChans*thisDetector->nChips);
@ -1679,8 +1681,11 @@ int slsDetector::setNumberOfModules(int n, dimension d){
#endif #endif
} }
if(n != GET_FLAG) if(n != GET_FLAG){
pthread_mutex_lock(&ms);
parentDet->updateOffsets(); parentDet->updateOffsets();
pthread_mutex_unlock(&ms);
}
return thisDetector->nMod[d]; return thisDetector->nMod[d];
}; };
@ -5397,8 +5402,10 @@ char* slsDetector::setReceiver(string receiverIP){
std::cout << "file path:" << fileIO::getFilePath() << endl; std::cout << "file path:" << fileIO::getFilePath() << endl;
std::cout << "file name:" << fileIO::getFileName() << endl; std::cout << "file name:" << fileIO::getFileName() << endl;
std::cout << "file index:" << fileIO::getFileIndex() << endl; std::cout << "file index:" << fileIO::getFileIndex() << endl;
pthread_mutex_lock(&ms);
std::cout << "write enable:" << parentDet->enableWriteToFileMask() << endl; std::cout << "write enable:" << parentDet->enableWriteToFileMask() << endl;
std::cout << "overwrite enable:" << parentDet->enableOverwriteMask() << endl; std::cout << "overwrite enable:" << parentDet->enableOverwriteMask() << endl;
pthread_mutex_unlock(&ms);
std::cout << "frame index needed:" << ((thisDetector->timerValue[FRAME_NUMBER]*thisDetector->timerValue[CYCLES_NUMBER])>1) << endl; std::cout << "frame index needed:" << ((thisDetector->timerValue[FRAME_NUMBER]*thisDetector->timerValue[CYCLES_NUMBER])>1) << endl;
std::cout << "frame period:" << thisDetector->timerValue[FRAME_PERIOD] << endl; std::cout << "frame period:" << thisDetector->timerValue[FRAME_PERIOD] << endl;
std::cout << "frame number:" << thisDetector->timerValue[FRAME_NUMBER] << endl; std::cout << "frame number:" << thisDetector->timerValue[FRAME_NUMBER] << endl;
@ -5411,9 +5418,14 @@ char* slsDetector::setReceiver(string receiverIP){
setFilePath(fileIO::getFilePath()); setFilePath(fileIO::getFilePath());
setFileName(fileIO::getFileName()); setFileName(fileIO::getFileName());
setFileIndex(fileIO::getFileIndex()); setFileIndex(fileIO::getFileIndex());
enableWriteToFile(parentDet->enableWriteToFileMask()); pthread_mutex_lock(&ms);
overwriteFile(parentDet->enableOverwriteMask()); int imask = parentDet->enableWriteToFileMask();
pthread_mutex_unlock(&ms);
enableWriteToFile(imask);
pthread_mutex_lock(&ms);
imask = parentDet->enableOverwriteMask();
pthread_mutex_unlock(&ms);
overwriteFile(imask);
if ((thisDetector->timerValue[FRAME_NUMBER]*thisDetector->timerValue[CYCLES_NUMBER])>1) if ((thisDetector->timerValue[FRAME_NUMBER]*thisDetector->timerValue[CYCLES_NUMBER])>1)
setFrameIndex(0); setFrameIndex(0);
else else
@ -5706,11 +5718,12 @@ int slsDetector::configureMAC(){
} }
else if (thisDetector->myDetectorType==GOTTHARD){ else if (thisDetector->myDetectorType==GOTTHARD){
//set frames per file - only for gotthard //set frames per file - only for gotthard
pthread_mutex_lock(&ms);
if(retval==-1) if(retval==-1)
setFramesPerFile(MAX_FRAMES_PER_FILE); setFramesPerFile(MAX_FRAMES_PER_FILE);
else else
setFramesPerFile(SHORT_MAX_FRAMES_PER_FILE); setFramesPerFile(SHORT_MAX_FRAMES_PER_FILE);
pthread_mutex_unlock(&ms);
//connect to receiver //connect to receiver
if(thisDetector->receiverOnlineFlag==ONLINE_FLAG){ if(thisDetector->receiverOnlineFlag==ONLINE_FLAG){
if(setReceiverOnline(ONLINE_FLAG)==ONLINE_FLAG){ if(setReceiverOnline(ONLINE_FLAG)==ONLINE_FLAG){
@ -6709,12 +6722,14 @@ string slsDetector::setFileName(string s) {
char retval[MAX_STR_LENGTH]=""; char retval[MAX_STR_LENGTH]="";
if(!s.empty()){ if(!s.empty()){
pthread_mutex_lock(&ms);
fileIO::setFileName(s); fileIO::setFileName(s);
if(thisDetector->myDetectorType == EIGER) if(thisDetector->myDetectorType == EIGER)
parentDet->setDetectorIndex(detId); parentDet->setDetectorIndex(detId);
else if(parentDet->getNumberOfDetectors()>1) else if(parentDet->getNumberOfDetectors()>1)
parentDet->setDetectorIndex(detId); parentDet->setDetectorIndex(detId);
s=parentDet->createReceiverFilePrefix(); s=parentDet->createReceiverFilePrefix();
pthread_mutex_unlock(&ms);
} }
if(thisDetector->receiverOnlineFlag==ONLINE_FLAG){ if(thisDetector->receiverOnlineFlag==ONLINE_FLAG){
@ -6730,7 +6745,10 @@ string slsDetector::setFileName(string s) {
#ifdef VERBOSE #ifdef VERBOSE
std::cout << "Complete file prefix from receiver: " << retval << std::endl; std::cout << "Complete file prefix from receiver: " << retval << std::endl;
#endif #endif
pthread_mutex_lock(&ms);
fileIO::setFileName(parentDet->getNameFromReceiverFilePrefix(string(retval))); fileIO::setFileName(parentDet->getNameFromReceiverFilePrefix(string(retval)));
pthread_mutex_unlock(&ms);
} }
if(ret==FORCE_UPDATE) if(ret==FORCE_UPDATE)
updateReceiver(); updateReceiver();
@ -6752,8 +6770,11 @@ int slsDetector::setFileIndex(int i) {
if(thisDetector->receiverOnlineFlag==OFFLINE_FLAG){ if(thisDetector->receiverOnlineFlag==OFFLINE_FLAG){
if(i>=0) if(i>=0){
pthread_mutex_lock(&ms);
fileIO::setFileIndex(i); fileIO::setFileIndex(i);
pthread_mutex_unlock(&ms);
}
} }
else if(setReceiverOnline(ONLINE_FLAG)==ONLINE_FLAG){ else if(setReceiverOnline(ONLINE_FLAG)==ONLINE_FLAG){
@ -6763,8 +6784,11 @@ int slsDetector::setFileIndex(int i) {
if (connectData() == OK) if (connectData() == OK)
ret=thisReceiver->sendInt(fnum,retval,arg); ret=thisReceiver->sendInt(fnum,retval,arg);
disconnectData(); disconnectData();
if(ret!=FAIL) if(ret!=FAIL){
pthread_mutex_lock(&ms);
fileIO::setFileIndex(retval); fileIO::setFileIndex(retval);
pthread_mutex_unlock(&ms);
}
if(ret==FORCE_UPDATE) if(ret==FORCE_UPDATE)
updateReceiver(); updateReceiver();
} }
@ -7102,11 +7126,15 @@ int slsDetector::updateReceiverNoWait() {
cout << "Updating receiver last modified by " << lastClientIP << std::endl; cout << "Updating receiver last modified by " << lastClientIP << std::endl;
#endif #endif
n = dataSocket->ReceiveDataOnly(&ind,sizeof(ind)); n = dataSocket->ReceiveDataOnly(&ind,sizeof(ind));
pthread_mutex_lock(&ms);
fileIO::setFileIndex(ind); fileIO::setFileIndex(ind);
pthread_mutex_unlock(&ms);
n = dataSocket->ReceiveDataOnly(path,MAX_STR_LENGTH); n = dataSocket->ReceiveDataOnly(path,MAX_STR_LENGTH);
fileIO::setFilePath(path); fileIO::setFilePath(path);
n = dataSocket->ReceiveDataOnly(path,MAX_STR_LENGTH); n = dataSocket->ReceiveDataOnly(path,MAX_STR_LENGTH);
pthread_mutex_lock(&ms);
fileIO::setFileName(path); fileIO::setFileName(path);
pthread_mutex_unlock(&ms);
return OK; return OK;
} }
@ -7176,8 +7204,11 @@ int slsDetector::enableWriteToFile(int enable){
if(thisDetector->receiverOnlineFlag==OFFLINE_FLAG){ if(thisDetector->receiverOnlineFlag==OFFLINE_FLAG){
if(enable>=0) if(enable>=0){
pthread_mutex_lock(&ms);
parentDet->enableWriteToFileMask(enable); parentDet->enableWriteToFileMask(enable);
pthread_mutex_unlock(&ms);
}
} }
else if(setReceiverOnline(ONLINE_FLAG)==ONLINE_FLAG){ else if(setReceiverOnline(ONLINE_FLAG)==ONLINE_FLAG){
@ -7187,13 +7218,20 @@ int slsDetector::enableWriteToFile(int enable){
if (connectData() == OK) if (connectData() == OK)
ret=thisReceiver->sendInt(fnum,retval,arg); ret=thisReceiver->sendInt(fnum,retval,arg);
disconnectData(); disconnectData();
if(ret!=FAIL) if(ret!=FAIL){
pthread_mutex_lock(&ms);
parentDet->enableWriteToFileMask(retval); parentDet->enableWriteToFileMask(retval);
pthread_mutex_unlock(&ms);
}
if(ret==FORCE_UPDATE) if(ret==FORCE_UPDATE)
updateReceiver(); updateReceiver();
} }
return parentDet->enableWriteToFileMask(); pthread_mutex_lock(&ms);
retval = parentDet->enableWriteToFileMask();
pthread_mutex_unlock(&ms);
return retval;
} }
@ -7207,8 +7245,11 @@ int slsDetector::overwriteFile(int enable){
if(thisDetector->receiverOnlineFlag==OFFLINE_FLAG){ if(thisDetector->receiverOnlineFlag==OFFLINE_FLAG){
if(enable>=0) if(enable>=0){
pthread_mutex_lock(&ms);
parentDet->enableOverwriteMask(enable); parentDet->enableOverwriteMask(enable);
pthread_mutex_unlock(&ms);
}
} }
else if(setReceiverOnline(ONLINE_FLAG)==ONLINE_FLAG){ else if(setReceiverOnline(ONLINE_FLAG)==ONLINE_FLAG){
@ -7218,13 +7259,20 @@ int slsDetector::overwriteFile(int enable){
if (connectData() == OK) if (connectData() == OK)
ret=thisReceiver->sendInt(fnum,retval,arg); ret=thisReceiver->sendInt(fnum,retval,arg);
disconnectData(); disconnectData();
if(ret!=FAIL) if(ret!=FAIL){
pthread_mutex_lock(&ms);
parentDet->enableOverwriteMask(retval); parentDet->enableOverwriteMask(retval);
pthread_mutex_unlock(&ms);
}
if(ret==FORCE_UPDATE) if(ret==FORCE_UPDATE)
updateReceiver(); updateReceiver();
} }
return parentDet->enableOverwriteMask(); pthread_mutex_lock(&ms);
retval = parentDet->enableOverwriteMask();
pthread_mutex_unlock(&ms);
return retval;
} }
@ -7291,10 +7339,14 @@ int slsDetector::calibratePedestal(int frames){
int64_t slsDetector::clearAllErrorMask(){ int64_t slsDetector::clearAllErrorMask(){
clearErrorMask(); clearErrorMask();
pthread_mutex_lock(&ms);
for(int i=0;i<parentDet->getNumberOfDetectors();i++){ for(int i=0;i<parentDet->getNumberOfDetectors();i++){
if(parentDet->getDetectorId(i) == getDetectorId()) if(parentDet->getDetectorId(i) == getDetectorId())
parentDet->setErrorMask(parentDet->getErrorMask()|(0<<i)); parentDet->setErrorMask(parentDet->getErrorMask()|(0<<i));
} }
pthread_mutex_unlock(&ms);
return getErrorMask(); return getErrorMask();
} }

View File

@ -31,6 +31,9 @@ postProcessing::postProcessing(): expTime(NULL), ang(NULL), val(NULL), err(NULL)
pthread_mutex_init(&mp, NULL); pthread_mutex_init(&mp, NULL);
mg=mp1; mg=mp1;
pthread_mutex_init(&mg, NULL); pthread_mutex_init(&mg, NULL);
ms=mp1;
pthread_mutex_init(&ms, NULL);
//cout << "reg callback "<< endl; //cout << "reg callback "<< endl;
dataReady = 0; dataReady = 0;
pCallbackArg = 0; pCallbackArg = 0;

View File

@ -278,6 +278,9 @@ s
/** mutex to synchronizedata processing and plotting threads */ /** mutex to synchronizedata processing and plotting threads */
pthread_mutex_t mg; pthread_mutex_t mg;
/** mutex to synchronize slsdetector threads */
pthread_mutex_t ms;
/** sets when the acquisition is finished */ /** sets when the acquisition is finished */
int jointhread; int jointhread;

View File

@ -94,26 +94,27 @@ private:
class Task: public virtual slsDetectorDefs{ class Task: public virtual slsDetectorDefs{
public: public:
/* Return: int, Param: int */ /* Return: int, Param: int */
Task(func1_t <int,slsDetector,int,int>* t): m1(t),m2(0),m3(0),m4(0),m5(0),m6(0),m7(0),m8(0),m9(0),m10(0){}; Task(func1_t <int,slsDetector,int,int>* t): m1(t),m2(0),m3(0),m4(0),m5(0),m6(0),m7(0),m8(0),m9(0),m10(0),m11(0){};
/* Return: int, Param: string,int */ /* Return: int, Param: string,int */
Task(func2_t <int,slsDetector,string,int,int>* t): m1(0),m2(t),m3(0),m4(0),m5(0),m6(0),m7(0),m8(0),m9(0),m10(0){}; Task(func2_t <int,slsDetector,string,int,int>* t): m1(0),m2(t),m3(0),m4(0),m5(0),m6(0),m7(0),m8(0),m9(0),m10(0),m11(0){};
/* Return: string, Param: string */ /* Return: string, Param: string */
Task(func1_t <string,slsDetector,string,string>* t): m1(0),m2(0),m3(t),m4(0),m5(0),m6(0),m7(0),m8(0),m9(0),m10(0){}; Task(func1_t <string,slsDetector,string,string>* t): m1(0),m2(0),m3(t),m4(0),m5(0),m6(0),m7(0),m8(0),m9(0),m10(0),m11(0){};
/* Return: char*, Param: char* */ /* Return: char*, Param: char* */
Task(func1_t <char*,slsDetector,char*,string>* t): m1(0),m2(0),m3(0),m4(t),m5(0),m6(0),m7(0),m8(0),m9(0),m10(0){}; Task(func1_t <char*,slsDetector,char*,string>* t): m1(0),m2(0),m3(0),m4(t),m5(0),m6(0),m7(0),m8(0),m9(0),m10(0),m11(0){};
/* Return: detectorSettings, Param: int */ /* Return: detectorSettings, Param: int */
Task(func1_t <detectorSettings,slsDetector,int,int>* t): m1(0),m2(0),m3(0),m4(0),m5(t),m6(0),m7(0),m8(0),m9(0),m10(0){}; Task(func1_t <detectorSettings,slsDetector,int,int>* t): m1(0),m2(0),m3(0),m4(0),m5(t),m6(0),m7(0),m8(0),m9(0),m10(0),m11(0){};
/* Return: detectorSettings, Param: detectorSettings,int */ /* Return: detectorSettings, Param: detectorSettings,int */
Task(func2_t <detectorSettings,slsDetector,detectorSettings,int,int>* t): m1(0),m2(0),m3(0),m4(0),m5(0),m6(t),m7(0),m8(0),m9(0),m10(0){}; Task(func2_t <detectorSettings,slsDetector,detectorSettings,int,int>* t): m1(0),m2(0),m3(0),m4(0),m5(0),m6(t),m7(0),m8(0),m9(0),m10(0),m11(0){};
/* Return: int, Param: int,int */ /* Return: int, Param: int,int */
Task(func2_t <int,slsDetector,int,int,int>* t): m1(0),m2(0),m3(0),m4(0),m5(0),m6(0),m7(t),m8(0),m9(0),m10(0){}; Task(func2_t <int,slsDetector,int,int,int>* t): m1(0),m2(0),m3(0),m4(0),m5(0),m6(0),m7(t),m8(0),m9(0),m10(0),m11(0){};
/* Return: int, Param: int,int */ /* Return: int, Param: int,int */
Task(func3_t <int,slsDetector,int,int,int,int>* t): m1(0),m2(0),m3(0),m4(0),m5(0),m6(0),m7(0),m8(t),m9(0),m10(0){}; Task(func3_t <int,slsDetector,int,int,int,int>* t): m1(0),m2(0),m3(0),m4(0),m5(0),m6(0),m7(0),m8(t),m9(0),m10(0),m11(0){};
/* Return: int, Param: trimMode,int,int,int */ /* Return: int, Param: trimMode,int,int,int */
Task(func4_t <int,slsDetector,trimMode,int,int,int,int>* t): m1(0),m2(0),m3(0),m4(0),m5(0),m6(0),m7(0),m8(0),m9(t),m10(0){}; Task(func4_t <int,slsDetector,trimMode,int,int,int,int>* t): m1(0),m2(0),m3(0),m4(0),m5(0),m6(0),m7(0),m8(0),m9(t),m10(0),m11(0){};
/* Return: int, Param: int */ /* Return: int, Param: int */
Task(func0_t <int,slsDetector,int>* t): m1(0),m2(0),m3(0),m4(0),m5(0),m6(0),m7(0),m8(0),m9(0),m10(t){}; Task(func0_t <int,slsDetector,int>* t): m1(0),m2(0),m3(0),m4(0),m5(0),m6(0),m7(0),m8(0),m9(0),m10(t),m11(0){};
/* Return: char*, Param: networkParameter,string,string */
Task(func2_t <char*,slsDetector,networkParameter,string,string>* t): m1(0),m2(0),m3(0),m4(0),m5(0),m6(0),m7(0),m8(0),m9(0),m10(0),m11(t){};
~Task(){} ~Task(){}
void operator()(){ void operator()(){
@ -127,6 +128,7 @@ public:
else if(m8) (*m8)(); else if(m8) (*m8)();
else if(m9) (*m9)(); else if(m9) (*m9)();
else if(m10) (*m10)(); else if(m10) (*m10)();
else if(m11) (*m11)();
} }
private: private:
@ -150,6 +152,8 @@ private:
func4_t <int,slsDetector,trimMode,int,int,int,int>* m9; func4_t <int,slsDetector,trimMode,int,int,int,int>* m9;
/* Return: int, Param: int */ /* Return: int, Param: int */
func0_t <int,slsDetector,int>* m10; func0_t <int,slsDetector,int>* m10;
/* Return: char*, Param: networkParameter,string,string */
func2_t <char*,slsDetector,networkParameter,string,string>* m11;
}; };