parallelized settimer, setfileindex, setonline, setreceiveronline, getreceiverstatus, resetframescaught, setFrameIndex, setFileName, getFramesCaughtByReceiver; added getFramesCaughtbyAnyreceiver to avoid threadpool and to use in post processing

This commit is contained in:
Dhanya Maliakal
2017-09-18 16:47:22 +02:00
parent 54d231d3c6
commit a95e2efdb0
10 changed files with 369 additions and 165 deletions

View File

@@ -114,6 +114,7 @@ multiSlsDetector::multiSlsDetector(int id) : slsDetectorUtils(), shmId(-1)
thisMultiDetector->onlineFlag = ONLINE_FLAG;
thisMultiDetector->receiverOnlineFlag = OFFLINE_FLAG;
thisMultiDetector->numberOfDetectors=0;
for (int id=0; id<MAXDET; id++) {
thisMultiDetector->detectorIds[id]=-1;
@@ -297,18 +298,18 @@ int multiSlsDetector::createThreadPool(){
if(threadpool){
threadpool->destroy_threadpool();
}
if(thisMultiDetector->numberOfDetectors < 1){
cout << "No detectors attached to create threadpool" << endl;
return OK;
int numthreads = thisMultiDetector->numberOfDetectors;
if(numthreads < 1){
numthreads = 1; //create threadpool anyway, threads initialized only when >1 detector added
}
threadpool = new ThreadPool(thisMultiDetector->numberOfDetectors);
threadpool = new ThreadPool(numthreads);
switch(threadpool->initialize_threadpool()){
case 0:
cerr << "Failed to initialize thread pool!" << endl;
return FAIL;
case 1:
#ifdef VERBOSE
cout << "Not initializing threads, only one detector" << endl;
cout << "Not initializing threads, not multi detector" << endl;
#endif
break;
default:
@@ -416,7 +417,7 @@ int multiSlsDetector::addSlsDetector(int id, int pos) {
}
void multiSlsDetector::updateOffsets(){
void multiSlsDetector::updateOffsets(){//cannot paralllize due to slsdetector calling this via parentdet->
#ifdef VERBOSE
cout << endl << "Updating Multi-Detector Offsets" << endl;
#endif
@@ -1067,20 +1068,45 @@ slsDetectorDefs::synchronizationMode multiSlsDetector::setSynchronization(synchr
int multiSlsDetector::setOnline(int off) {
// int retdet;
if (off!=GET_ONLINE_FLAG) {
thisMultiDetector->onlineFlag=off;
for (int i=0; i<thisMultiDetector->numberOfDetectors; i++) {
if (detectors[i]){
detectors[i]->setOnline(off);
if(detectors[i]->getErrorMask())
setErrorMask(getErrorMask()|(1<<i));
}
}
int ret=-100;
if(!threadpool){
cout << "Error in creating threadpool. Exiting" << endl;
return -1;
}else{
//return storage values
int* iret[thisMultiDetector->numberOfDetectors];
for(int idet=0; idet<thisMultiDetector->numberOfDetectors; idet++){
if(detectors[idet]){
iret[idet]= new int(-1);
Task* task = new Task(new func1_t<int,int>(&slsDetector::setOnline,
detectors[idet],off,iret[idet]));
threadpool->add_task(task);
}
}
threadpool->startExecuting();
threadpool->wait_for_tasks_to_complete();
for(int idet=0; idet<thisMultiDetector->numberOfDetectors; idet++){
if(detectors[idet]){
if(iret[idet] != NULL){
if (ret==-100)
ret=*iret[idet];
else if (ret!=*iret[idet])
ret=-1;
delete iret[idet];
}else ret=-1;
if(detectors[idet]->getErrorMask())
setErrorMask(getErrorMask()|(1<<idet));
}
}
}
thisMultiDetector->onlineFlag=ret;
}
return thisMultiDetector->onlineFlag;
};
@@ -3165,7 +3191,7 @@ dacs_t multiSlsDetector::setDAC(dacs_t val, dacIndex idac, int mV, int imod) {
for(int idet=posmin; idet<posmax; ++idet){
if(detectors[idet]){
iret[idet]= new dacs_t(-1);
Task* task = new Task(new func4_t <dacs_t,dacs_t,dacIndex,int,int>(&slsDetector::setDAC,
Task* task = new Task(new func4_t<dacs_t,dacs_t,dacIndex,int,int>(&slsDetector::setDAC,
detectors[idet],val, idac, mV, imod, iret[idet]));
threadpool->add_task(task);
}
@@ -3227,7 +3253,7 @@ dacs_t multiSlsDetector::getADC(dacIndex idac, int imod) {
for(int idet=posmin; idet<posmax; ++idet){
if(detectors[idet]){
iret[idet]= new dacs_t(-1);
Task* task = new Task(new func2_t <dacs_t,dacIndex,int>(&slsDetector::getADC,
Task* task = new Task(new func2_t<dacs_t,dacIndex,int>(&slsDetector::getADC,
detectors[idet],idac, imod, iret[idet]));
threadpool->add_task(task);
}
@@ -3625,7 +3651,7 @@ string multiSlsDetector::setNetworkParameter(networkParameter p, string s){
if (p == RECEIVER_STREAMING_PORT)
s.append("multi\0");
sret[idet]=new string("error");
Task* task = new Task(new func2_t <string,networkParameter,string>(&slsDetector::setNetworkParameter,
Task* task = new Task(new func2_t<string,networkParameter,string>(&slsDetector::setNetworkParameter,
detectors[idet],p,s,sret[idet]));
threadpool->add_task(task);
}
@@ -4281,7 +4307,7 @@ int multiSlsDetector::executeTrimming(trimMode mode, int par1, int par2, int imo
for(int idet=0; idet<thisMultiDetector->numberOfDetectors; idet++){
if(detectors[idet]){
iret[idet]= new int(-1);
Task* task = new Task(new func4_t <int,trimMode,int,int,int>(&slsDetector::executeTrimming,
Task* task = new Task(new func4_t<int,trimMode,int,int,int>(&slsDetector::executeTrimming,
detectors[idet],mode,par1,par2,imod,iret[idet]));
threadpool->add_task(task);
}
@@ -4384,7 +4410,7 @@ int multiSlsDetector::loadSettingsFile(string fname, int imod) {
for(int idet=0; idet<thisMultiDetector->numberOfDetectors; idet++){
if(detectors[idet]){
iret[idet]= new int(OK);
Task* task = new Task(new func2_t <int,string,int>(&slsDetector::loadSettingsFile,
Task* task = new Task(new func2_t<int,string,int>(&slsDetector::loadSettingsFile,
detectors[idet],fname,imod,iret[idet]));
threadpool->add_task(task);
}
@@ -4458,7 +4484,7 @@ int multiSlsDetector::setAllTrimbits(int val, int imod){
for(int idet=0; idet<thisMultiDetector->numberOfDetectors; idet++){
if(detectors[idet]){
iret[idet]= new int(-1);
Task* task = new Task(new func2_t <int,int,int>(&slsDetector::setAllTrimbits,
Task* task = new Task(new func2_t<int,int,int>(&slsDetector::setAllTrimbits,
detectors[idet],val,imod,iret[idet]));
threadpool->add_task(task);
}
@@ -4512,7 +4538,7 @@ int multiSlsDetector::loadCalibrationFile(string fname, int imod) {
for(int idet=0; idet<thisMultiDetector->numberOfDetectors; idet++){
if(detectors[idet]){
iret[idet]= new int(OK);
Task* task = new Task(new func2_t <int,string,int>(&slsDetector::loadCalibrationFile,
Task* task = new Task(new func2_t<int,string,int>(&slsDetector::loadCalibrationFile,
detectors[idet],fname,imod,iret[idet]));
threadpool->add_task(task);
}
@@ -5061,18 +5087,42 @@ int multiSlsDetector::readDataFile(string fname, int *data) {
int multiSlsDetector::setReceiverOnline(int off) {
int ret=-100,ret1;
for (int i=0; i<thisMultiDetector->numberOfDetectors; i++)
if (detectors[i]){
ret1=detectors[i]->setReceiverOnline(off);
if(detectors[i]->getErrorMask())
setErrorMask(getErrorMask()|(1<<i));
if(ret==-100)
ret=ret1;
else if (ret!=ret1)
ret=-1;
if (off != GET_ONLINE_FLAG) {
int ret=-100;
if(!threadpool){
cout << "Error in creating threadpool. Exiting" << endl;
return -1;
}else{
//return storage values
int* iret[thisMultiDetector->numberOfDetectors];
for(int idet=0; idet<thisMultiDetector->numberOfDetectors; idet++){
if(detectors[idet]){
iret[idet]= new int(-1);
Task* task = new Task(new func1_t<int,int>(&slsDetector::setReceiverOnline,
detectors[idet],off,iret[idet]));
threadpool->add_task(task);
}
}
threadpool->startExecuting();
threadpool->wait_for_tasks_to_complete();
for(int idet=0; idet<thisMultiDetector->numberOfDetectors; idet++){
if(detectors[idet]){
if(iret[idet] != NULL){
if (ret==-100)
ret=*iret[idet];
else if (ret!=*iret[idet])
ret=-1;
delete iret[idet];
}else ret=-1;
if(detectors[idet]->getErrorMask())
setErrorMask(getErrorMask()|(1<<idet));
}
}
}
return ret;
thisMultiDetector->receiverOnlineFlag=ret;
}
return thisMultiDetector->receiverOnlineFlag;
}
@@ -5120,21 +5170,41 @@ string multiSlsDetector::setFilePath(string s) {
string multiSlsDetector::setFileName(string s) {
string ret="error", ret1;
for (int idet=0; idet<thisMultiDetector->numberOfDetectors; idet++) {
if (detectors[idet]) {
ret1=detectors[idet]->setFileName(s);
if(detectors[idet]->getErrorMask())
setErrorMask(getErrorMask()|(1<<idet));
if (ret=="error")
ret=ret1;
else if (ret!=ret1)
ret="";
}
}
return ret;
string ret = "error";
int posmax = thisMultiDetector->numberOfDetectors;
if(!threadpool){
cout << "Error in creating threadpool. Exiting" << endl;
return string("");
} else {
string* sret[thisMultiDetector->numberOfDetectors];
for(int idet=0; idet<posmax; ++idet){
if(detectors[idet]){
sret[idet]=new string("error");
Task* task = new Task(new func1_t<string,string>(&slsDetector::setFileName,
detectors[idet],s,sret[idet]));
threadpool->add_task(task);
}
}
threadpool->startExecuting();
threadpool->wait_for_tasks_to_complete();
for(int idet=0; idet<posmax; idet++){
if(detectors[idet]){
if(sret[idet]!= NULL) {
if(ret == "error")
ret = *sret[idet];
else if (ret != *sret[idet])
ret="";
delete sret[idet];
}else ret = "";
//doing nothing with the return values
if(detectors[idet]->getErrorMask())
setErrorMask(getErrorMask()|(1<<idet));
}
}
}
return ret;
}
@@ -5328,52 +5398,120 @@ slsDetectorDefs::runStatus multiSlsDetector::startReceiverReadout(){
}
slsDetectorDefs::runStatus multiSlsDetector::getReceiverStatus(){
int i=0;
runStatus ret=IDLE;
int posmin=0, posmax=thisMultiDetector->numberOfDetectors;
runStatus s = IDLE,s1 = IDLE;
i=thisMultiDetector->masterPosition;
if (thisMultiDetector->masterPosition>=0) {
if (detectors[i]) {
ret=detectors[i]->getReceiverStatus();
if(detectors[i]->getErrorMask())
setErrorMask(getErrorMask()|(1<<i));
return ret;
}
}
if (thisMultiDetector->masterPosition>=0)
if (detectors[thisMultiDetector->masterPosition]){
s = detectors[thisMultiDetector->masterPosition]->getReceiverStatus();
if(detectors[thisMultiDetector->masterPosition]->getErrorMask())
setErrorMask(getErrorMask()|(1<<thisMultiDetector->masterPosition));
return s;
}
if(!threadpool){
cout << "Error in creating threadpool. Exiting" << endl;
return ERROR;
}else{
runStatus* iret[posmax-posmin];
for(int idet=posmin; idet<posmax; idet++){
if((idet!=thisMultiDetector->masterPosition) && (detectors[idet])){
iret[idet]= new runStatus(ERROR);
Task* task = new Task(new func0_t<runStatus>(&slsDetector::getReceiverStatus,
detectors[idet],iret[idet]));
threadpool->add_task(task);
}
}
threadpool->startExecuting();
threadpool->wait_for_tasks_to_complete();
for(int idet=posmin; idet<posmax; idet++){
if((idet!=thisMultiDetector->masterPosition) && (detectors[idet])){
if(iret[idet] != NULL){
if(*iret[idet] == (int)ERROR)
ret = ERROR;
if(*iret[idet] != IDLE)
ret = *iret[idet];
delete iret[idet];
}else ret = ERROR;
if(detectors[idet]->getErrorMask())
setErrorMask(getErrorMask()|(1<<idet));
}
}
}
// if (detectors[0]) s=detectors[0]->getReceiverStatus();
for (int i=0; i<thisMultiDetector->numberOfDetectors; i++) {
s1=detectors[i]->getReceiverStatus();
if(detectors[i]->getErrorMask())
setErrorMask(getErrorMask()|(1<<i));
if (s1==ERROR)
s=ERROR;
//if (s1==IDLE && s!=IDLE)
// s=ERROR;
if (s1!=IDLE)
s = s1;
}
return s;
return ret;
}
int multiSlsDetector::getFramesCaughtByAnyReceiver() {
int ret = 0;
int i = thisMultiDetector->masterPosition;
int multiSlsDetector::getFramesCaughtByReceiver() {
int ret=0,ret1=0;
if (i >=0 ) {
if (detectors[i]){
ret=detectors[i]->getFramesCaughtByReceiver();
if(detectors[i]->getErrorMask())
setErrorMask(getErrorMask()|(1<<i));
// return master receivers frames caught
return ret;
}
}
for (int i=0; i<thisMultiDetector->numberOfDetectors; i++)
if (detectors[i]){
ret1+=detectors[i]->getFramesCaughtByReceiver();
ret=detectors[i]->getFramesCaughtByReceiver();
if(detectors[i]->getErrorMask())
setErrorMask(getErrorMask()|(1<<i));
// return the first one that works
return ret;
}
if(!thisMultiDetector->numberOfDetectors)
return ret;
ret=(int)(ret1/thisMultiDetector->numberOfDetectors);
return -1;
}
int multiSlsDetector::getFramesCaughtByReceiver() {
int ret=0,ret1=0;
int posmax = thisMultiDetector->numberOfDetectors;
if(!threadpool){
cout << "Error in creating threadpool. Exiting" << endl;
return -1;
}else{
int* iret[posmax];
for(int idet=0; idet<thisMultiDetector->numberOfDetectors; idet++){
if(detectors[idet]){
iret[idet]= new int(0);
Task* task = new Task(new func0_t<int>(&slsDetector::getFramesCaughtByReceiver,
detectors[idet],iret[idet]));
threadpool->add_task(task);
}
}
threadpool->startExecuting();
threadpool->wait_for_tasks_to_complete();
for(int idet=0; idet<posmax; idet++){
if(detectors[idet]){
if(iret[idet] != NULL){
if(*iret[idet] == -1) // could not connect
ret = -1;
else
ret1+=(*iret[idet]);
delete iret[idet];
}else ret = -1;
if(detectors[idet]->getErrorMask())
setErrorMask(getErrorMask()|(1<<idet));
}
}
}
if ((!thisMultiDetector->numberOfDetectors) || (ret == -1))
return ret;
ret=(int)(ret1/thisMultiDetector->numberOfDetectors);
return ret;
}
@@ -5397,17 +5535,36 @@ int multiSlsDetector::getReceiverCurrentFrameIndex() {
int multiSlsDetector::resetFramesCaught() {
int ret=-100, ret1;
for (int i=0; i<thisMultiDetector->numberOfDetectors; i++){
if (detectors[i]){
ret1=detectors[i]->resetFramesCaught();
if(detectors[i]->getErrorMask())
setErrorMask(getErrorMask()|(1<<i));
if (ret==-100)
ret=ret1;
else if (ret!=ret1)
ret=-1;
int ret=OK;
int posmax = thisMultiDetector->numberOfDetectors;
if(!threadpool){
cout << "Error in creating threadpool. Exiting" << endl;
return FAIL;
}else{
int* iret[posmax];
for(int idet=0; idet<thisMultiDetector->numberOfDetectors; idet++){
if(detectors[idet]){
iret[idet]= new int(OK);
Task* task = new Task(new func0_t<int>(&slsDetector::resetFramesCaught,
detectors[idet],iret[idet]));
threadpool->add_task(task);
}
}
threadpool->startExecuting();
threadpool->wait_for_tasks_to_complete();
for(int idet=0; idet<posmax; idet++){
if(detectors[idet]){
if(iret[idet] != NULL){
if(*iret[idet] != OK)
ret = FAIL;
delete iret[idet];
}else ret = FAIL;
if(detectors[idet]->getErrorMask())
setErrorMask(getErrorMask()|(1<<idet));
}
}
}
@@ -5772,21 +5929,39 @@ int multiSlsDetector::overwriteFile(int enable){
int multiSlsDetector::setFrameIndex(int index){
int ret=-100, ret1;
int ret=-100;
int posmax = thisMultiDetector->numberOfDetectors;
fileIO::setFrameIndex(index);
for (int idet=0; idet<thisMultiDetector->numberOfDetectors; idet++) {
if (detectors[idet]) {
ret1=detectors[idet]->setFrameIndex(index);
if(detectors[idet]->getErrorMask())
setErrorMask(getErrorMask()|(1<<idet));
if (ret==-100)
ret=ret1;
else if (ret!=ret1)
ret=-1;
if(!threadpool){
cout << "Error in creating threadpool. Exiting" << endl;
return -1;
}else{
//return storage values
int* iret[thisMultiDetector->numberOfDetectors];
for(int idet=0; idet<posmax; idet++){
if(detectors[idet]){
iret[idet]= new int(-1);
Task* task = new Task(new func1_t<int,int>(&slsDetector::setFrameIndex,
detectors[idet],index,iret[idet]));
threadpool->add_task(task);
}
}
threadpool->startExecuting();
threadpool->wait_for_tasks_to_complete();
for(int idet=0; idet<thisMultiDetector->numberOfDetectors; idet++){
if(detectors[idet]){
if(iret[idet] != NULL){
if (ret==-100)
ret=*iret[idet];
else if (ret!=*iret[idet])
ret=-1;
delete iret[idet];
}else ret=-1;
if(detectors[idet]->getErrorMask())
setErrorMask(getErrorMask()|(1<<idet));
}
}
}
return ret;
}
@@ -5910,7 +6085,7 @@ int multiSlsDetector::getStreamingSocketsCreatedInClient() {
return dataSocketsStarted;
}
int multiSlsDetector::enableDataStreamingFromReceiver(int enable){
int multiSlsDetector::enableDataStreamingFromReceiver(int enable){//cannot parrallize due to serReceiver calling parentdet->enabledatastremain
//create client sockets only if no external gui
if (!thisMultiDetector->externalgui) {
@@ -5944,7 +6119,7 @@ int multiSlsDetector::enableDataStreamingFromReceiver(int enable){
for(int idet=0; idet<thisMultiDetector->numberOfDetectors; idet++){
if(detectors[idet]){
iret[idet]= new int(-1);
Task* task = new Task(new func1_t <int,int>(&slsDetector::enableDataStreamingFromReceiver,
Task* task = new Task(new func1_t<int,int>(&slsDetector::enableDataStreamingFromReceiver,
detectors[idet],enable,iret[idet]));
threadpool->add_task(task);
}
@@ -6173,7 +6348,7 @@ int multiSlsDetector::pulsePixel(int n,int x,int y) {
for(int idet=0; idet<thisMultiDetector->numberOfDetectors; idet++){
if(detectors[idet]){
iret[idet]= new int(-1);
Task* task = new Task(new func3_t <int,int,int,int>(&slsDetector::pulsePixel,
Task* task = new Task(new func3_t<int,int,int,int>(&slsDetector::pulsePixel,
detectors[idet],n,x,y,iret[idet]));
threadpool->add_task(task);
}
@@ -6210,7 +6385,7 @@ int multiSlsDetector::pulsePixelNMove(int n,int x,int y) {
for(int idet=0; idet<thisMultiDetector->numberOfDetectors; idet++){
if(detectors[idet]){
iret[idet]= new int(-1);
Task* task = new Task(new func3_t <int,int,int,int>(&slsDetector::pulsePixelNMove,
Task* task = new Task(new func3_t<int,int,int,int>(&slsDetector::pulsePixelNMove,
detectors[idet],n,x,y,iret[idet]));
threadpool->add_task(task);
}
@@ -6247,7 +6422,7 @@ int multiSlsDetector::pulseChip(int n) {
for(int idet=0; idet<thisMultiDetector->numberOfDetectors; idet++){
if(detectors[idet]){
iret[idet]= new int(-1);
Task* task = new Task(new func1_t <int,int>(&slsDetector::pulseChip,
Task* task = new Task(new func1_t<int,int>(&slsDetector::pulseChip,
detectors[idet],n,iret[idet]));
threadpool->add_task(task);
}

View File

@@ -204,6 +204,9 @@ class multiSlsDetector : public slsDetectorUtils {
/** external gui */
bool externalgui;
/** receiver online flag - is set if the receiver is connected, unset if socket connection is not possible */
int receiverOnlineFlag;
} sharedMultiSlsDetector;
@@ -1233,6 +1236,11 @@ class multiSlsDetector : public slsDetectorUtils {
*/
int getFramesCaughtByReceiver();
/** gets the number of frames caught by any one receiver (to avoid using threadpool)
\returns number of frames caught by any one receiver (master receiver if exists)
*/
int getFramesCaughtByAnyReceiver();
/** gets the current frame index of receiver
\returns current frame index of receiver
*/