proper thread handling for the REST receiver

This commit is contained in:
sala 2014-11-12 11:54:12 +01:00 committed by Manuel Guizar
parent 72bf64ff57
commit 3324667d05
4 changed files with 64 additions and 32 deletions

View File

@ -25,6 +25,7 @@
#include <sstream> #include <sstream>
#include <string> #include <string>
#include <exception> #include <exception>
#include <unistd.h>
@ -35,19 +36,21 @@ using namespace std;
class RestHelper { class RestHelper {
public: public:
RestHelper(int timeout=10, int n_tries=10){ RestHelper(int timeout=10, int n_tries=1){
/** /**
* *
* *
* @param timeout default=10 * @param timeout default=10
* @param n_tries default=3 * @param n_tries default=1
*/ */
http_timeout = timeout; http_timeout = timeout;
n_connection_tries = n_tries; n_connection_tries = n_tries;
} }
~RestHelper(){}; ~RestHelper(){
delete session;
};
void set_connection_params(int timeout, int n_tries){ void set_connection_params(int timeout, int n_tries){
@ -74,7 +77,6 @@ class RestHelper {
*/ */
//Check for http:// string //Check for http:// string
FILE_LOG(logDEBUG4) << __func__ << " starting";
string proto_str = "http://"; string proto_str = "http://";
if( size_t found = hostname.find(proto_str) != string::npos ){ if( size_t found = hostname.find(proto_str) != string::npos ){
@ -165,7 +167,7 @@ class RestHelper {
string answer; string answer;
int code = send_request(session, req, &answer); int code = send_request(session, req, &answer);
if(code == 0 ) { if(code == 0 ) {
FILE_LOG(logDEBUG4) << "REQUEST: " << " ANSWER: " << answer; FILE_LOG(logDEBUG) << __AT__ << " REQUEST: " << " ANSWER: " << answer;
json_value->loadFromString(answer); json_value->loadFromString(answer);
} }
delete uri; delete uri;
@ -285,7 +287,8 @@ class RestHelper {
n+=1; n+=1;
} }
return code; throw std::string("Cannot connect to the REST server! Please check...");
//return code;
} }
}; };

View File

@ -4,6 +4,7 @@
#include <sstream> #include <sstream>
#include <string> #include <string>
#include <stdio.h> #include <stdio.h>
#include <unistd.h>
#define STRINGIFY(x) #x #define STRINGIFY(x) #x
#define TOSTRING(x) STRINGIFY(x) #define TOSTRING(x) STRINGIFY(x)

View File

@ -53,7 +53,6 @@ void UDPBaseImplementation::initializeMembers(){
int UDPBaseImplementation::setDetectorType(detectorType det){ int UDPBaseImplementation::setDetectorType(detectorType det){
cout << "[WARNING] This is a base implementation, " << __func__ << " not correctly implemented" << endl; cout << "[WARNING] This is a base implementation, " << __func__ << " not correctly implemented" << endl;
cout << "Setting Receiver Type " << endl; cout << "Setting Receiver Type " << endl;
deleteMembers(); deleteMembers();
@ -76,7 +75,7 @@ int UDPBaseImplementation::setDetectorType(detectorType det){
return FAIL; return FAIL;
break; break;
} }
/*
//moench variables //moench variables
if(myDetectorType == GOTTHARD){ if(myDetectorType == GOTTHARD){
fifosize = GOTTHARD_FIFO_SIZE; fifosize = GOTTHARD_FIFO_SIZE;
@ -138,7 +137,7 @@ int UDPBaseImplementation::setDetectorType(detectorType det){
cout << "Ready..." << endl; cout << "Ready..." << endl;
return OK; return OK;
*/
return OK; return OK;
} }
@ -1124,7 +1123,7 @@ void UDPBaseImplementation::closeFile(int ithr)
{ {
FILE_LOG(logDEBUG) << __AT__ << "called"; FILE_LOG(logDEBUG) << __AT__ << "called";
/*
if(!dataCompression){ if(!dataCompression){
if(sfilefd){ if(sfilefd){
#ifdef VERBOSE #ifdef VERBOSE
@ -1171,7 +1170,7 @@ void UDPBaseImplementation::closeFile(int ithr)
#endif #endif
} }
*/
FILE_LOG(logDEBUG) << __AT__ << "exited"; FILE_LOG(logDEBUG) << __AT__ << "exited";
} }

View File

@ -36,7 +36,7 @@ using namespace std;
UDPRESTImplementation::UDPRESTImplementation(){ UDPRESTImplementation::UDPRESTImplementation(){
FILE_LOG(logDEBUG) << __AT__ << " called"; FILE_LOG(logDEBUG) << "PID: " << getpid() << __AT__ << " called";
//TODO I do not really know what to do with bottom... //TODO I do not really know what to do with bottom...
// Default values // Default values
@ -718,20 +718,40 @@ int UDPRESTImplementation::shutDownUDPSockets(){
FILE_LOG(logDEBUG) << __AT__ << "called"; FILE_LOG(logDEBUG) << __AT__ << "called";
// this is just to be sure, it could be removed
for(int i=0;i<numListeningThreads;i++){
if(udpSocket[i]){
FILE_LOG(logDEBUG) << __AT__ << " closing UDP socket #" << i;
udpSocket[i]->ShutDownSocket();
delete udpSocket[i];
udpSocket[i] = NULL;
}
}
JsonBox::Value answer; JsonBox::Value answer;
int code; int code;
string be_state = ""; string be_state = "";
FILE_LOG(logDEBUG) << __AT__ << " numListeningThreads=" << numListeningThreads;
if (rest == NULL){
FILE_LOG(logWARNING) << __AT__ << "No REST object initialized, closing...";
return OK;
}
// getting the state
FILE_LOG(logWARNING) << "PLEASE WAIT WHILE CHECKING AND SHUTTING DOWN ALL CONNECTIONS!";
code = rest->get_json("state", &answer);
be_state = answer["state"].getString();
// LEO: this is probably wrong // LEO: this is probably wrong
cout << "AAAAAAAAAAAAA " << be_state << " " << status << endl; if (be_state == "OPEN"){
//if (be_state == "OPEN"){
while (be_state != "TRANSIENT"){ while (be_state != "TRANSIENT"){
code = rest->get_json("state", &answer); code = rest->get_json("state", &answer);
be_state = answer["state"].getString(); be_state = answer["state"].getString();
cout << "be_State: " << be_state << endl; cout << "be_State: " << be_state << endl;
usleep(10000); usleep(10000);
} }
//}
code = rest->post_json("state/close", &answer); code = rest->post_json("state/close", &answer);
std::cout <<code << " " << answer << std::endl; std::cout <<code << " " << answer << std::endl;
code = rest->post_json("state/reset", &answer); code = rest->post_json("state/reset", &answer);
@ -739,11 +759,13 @@ int UDPRESTImplementation::shutDownUDPSockets(){
code = rest->get_json("state", &answer); code = rest->get_json("state", &answer);
std::cout << code << " " << answer << std::endl; std::cout << code << " " << answer << std::endl;
}
status = slsReceiverDefs::RUN_FINISHED; status = slsReceiverDefs::RUN_FINISHED;
FILE_LOG(logDEBUG) << __AT__ << "finished"; //LEO: not sure it's needed
delete rest;
FILE_LOG(logDEBUG) << __AT__ << "finished";
return OK; return OK;
} }
@ -1289,8 +1311,9 @@ void UDPRESTImplementation::startReadout(){
void* UDPRESTImplementation::startListeningThread(void* this_pointer){ void* UDPRESTImplementation::startListeningThread(void* this_pointer){
FILE_LOG(logDEBUG) << __AT__ << " called"; FILE_LOG(logDEBUG) << __AT__ << " called";
FILE_LOG(logDEBUG) << __AT__ << " doing a big bunch of nothing";
((UDPRESTImplementation*)this_pointer)->startListening(); //((UDPRESTImplementation*)this_pointer)->startListening();
return this_pointer; return this_pointer;
} }
@ -1299,7 +1322,9 @@ void* UDPRESTImplementation::startListeningThread(void* this_pointer){
void* UDPRESTImplementation::startWritingThread(void* this_pointer){ void* UDPRESTImplementation::startWritingThread(void* this_pointer){
FILE_LOG(logDEBUG) << __AT__ << " called"; FILE_LOG(logDEBUG) << __AT__ << " called";
((UDPRESTImplementation*)this_pointer)->startWriting(); FILE_LOG(logDEBUG) << __AT__ << " doing a big bunch of nothing";
//((UDPRESTImplementation*)this_pointer)->startWriting();
return this_pointer; return this_pointer;
} }
@ -1310,7 +1335,9 @@ void* UDPRESTImplementation::startWritingThread(void* this_pointer){
int UDPRESTImplementation::startListening(){ int UDPRESTImplementation::startListening(){
FILE_LOG(logDEBUG) << __AT__ << " called"; FILE_LOG(logDEBUG) << __AT__ << " called";
FILE_LOG(logDEBUG) << __AT__ << " doing a big bunch of nothing";
/*
int ithread = currentListeningThreadIndex; int ithread = currentListeningThreadIndex;
#ifdef VERYVERBOSE #ifdef VERYVERBOSE
cout << "In startListening() " << endl; cout << "In startListening() " << endl;
@ -1495,7 +1522,7 @@ int UDPRESTImplementation::startListening(){
pthread_exit(NULL); pthread_exit(NULL);
} }
} }
*/
return OK; return OK;
} }
@ -1513,6 +1540,8 @@ int UDPRESTImplementation::startListening(){
int UDPRESTImplementation::startWriting(){ int UDPRESTImplementation::startWriting(){
FILE_LOG(logDEBUG) << __AT__ << " called"; FILE_LOG(logDEBUG) << __AT__ << " called";
FILE_LOG(logDEBUG) << __AT__ << " doing a big bunch of nothing";
/*
int ithread = currentWriterThreadIndex; int ithread = currentWriterThreadIndex;
#ifdef VERYVERBOSE #ifdef VERYVERBOSE
cout << ithread << "In startWriting()" <<endl; cout << ithread << "In startWriting()" <<endl;
@ -1698,7 +1727,7 @@ int loop;
#endif #endif
} }
*/
return OK; return OK;
} }
@ -1739,9 +1768,9 @@ void UDPRESTImplementation::startFrameIndices(int ithread){
} }
void UDPRESTImplementation::stopListening(int ithread, int rc, int &pc, int &t){ //void UDPRESTImplementation::stopListening(int ithread, int rc, int &pc, int &t){
FILE_LOG(logDEBUG) << __AT__ << " called, doing nothing"; // FILE_LOG(logDEBUG) << __AT__ << " called, doing nothing";
}; //};
/* /*
void UDPRESTImplementation::stopListening(int ithread, int rc, int &pc, int &t){ void UDPRESTImplementation::stopListening(int ithread, int rc, int &pc, int &t){