Connection time optimized

This commit is contained in:
2017-11-14 09:18:27 +01:00
parent 4d9a8fd116
commit 350ddab997
47 changed files with 350899 additions and 701 deletions

View File

@@ -93,6 +93,9 @@ int Connect::createChannel(unsigned int handle, const char * pv, chid &pCh) {
if (status != ECA_NORMAL) {
cout << __FILE__ << "//" << __LINE__ << "//"<< __METHOD__ << " ca_create_channel failed: " << endl;
if (status == ECA_EVDISALLOW) {
cout << __FILE__ << "//" << __LINE__ << "//"<< __METHOD__ << " inappropriate function " << endl;
}
cafeStatus.report(status);
if(MUTEX){cafeMutex.lock();} //lock
@@ -101,18 +104,58 @@ int Connect::createChannel(unsigned int handle, const char * pv, chid &pCh) {
return (int) status;
}
//What is the POLICY!??
//To Flush or to Pend this is the question!
if (channelOpenPolicy.getWhenToFlushSendBuffer()==FLUSH_AFTER_EACH_CHANNEL_CREATION) {
using namespace boost::posix_time;
ptime timeStart(microsec_clock::local_time());
double timeElapsed=0;
unsigned int nPoll=0;
channelOpenPolicy.flushSendBufferNow();
channelOpenPolicy.flushSendBufferNow();
ptime timeEnd(microsec_clock::local_time());
time_duration duration(timeEnd-timeStart);
timeElapsed= (double) duration.total_microseconds()/1000000.0;
while ( !(*it_handle).isConnected() && timeElapsed <= channelOpenPolicy.getTimeout()){
#if HAVE_BOOST_THREAD
boost::this_thread::sleep_for(boost::chrono::microseconds(1000));
#else
#if HAVE_LINUX
usleep(1000);
#endif
#endif
++nPoll;
ptime timeEnd(microsec_clock::local_time());
time_duration duration(timeEnd-timeStart);
timeElapsed= (double) duration.total_microseconds()/1000000.0;
}
/*
if ( (*it_handle).isConnected() ) {
cout << (*it_handle).getPV() << " // is connected //" << endl;
}
else {
cout << (*it_handle).getPV() << " // is not connected //" << endl;
}
cout << "timeElapsed for open " << timeElapsed << " nPoll = " << nPoll << endl;
*/
}
// Peculiar if true
/*
if (status == ECA_EVDISALLOW) {
cout << __FILE__ << "//" << __LINE__ << "//"<< __METHOD__ << " inappropriate function " << endl;
cafeStatus.report(status);
@@ -120,6 +163,11 @@ int Connect::createChannel(unsigned int handle, const char * pv, chid &pCh) {
handle_index.modify(it_handle, change_status (status) );
if(MUTEX){cafeMutex.unlock();} //unlock
}
*/
//
}
else {
return ECAFE_INVALID_HANDLE;
@@ -1124,19 +1172,17 @@ int Connect::open(const char * _pv, const char * _pvAlias, unsigned int & hand
if (it_handle != handle_index.end()) {
if ((*it_handle).getChannelID() != NULL && cctLocal != NULL ) {
status = ca_clear_channel((*it_handle).getChannelID());
if (status != ECA_NORMAL) {
return status; //ECA_BADCHID
}
status = ca_pend_io(channelClosePolicy.getTimeout());
status = ca_pend_io(channelClosePolicy.getTimeout()); //See channelClosePolicy.setPolicy in connect.h
//channelClosePolicy.flushSendBufferNow();
if(MUTEX){cafeMutex.lock();}
handle_index.modify(it_handle, free_dataBuffers());
if(MUTEX){cafeMutex.unlock();}
@@ -1192,7 +1238,7 @@ int Connect::open(const char * _pv, const char * _pvAlias, unsigned int & hand
* \return ECA_NORMAL if all OK; error if one or more channelIDs fail to close
*/
int Connect::closeHandles(unsigned int * handleArray, unsigned int nHandles) {
#define __METHOD__ "Connect::closeHandles(unsigned int *) "
#define __METHOD__ "Connect::closeHandles(unsigned int *, unsigned int) "
//copy array to vector and remove dupliacte handles
@@ -1373,6 +1419,339 @@ int Connect::open(const char * _pv, const char * _pvAlias, unsigned int & hand
}
/**
* \brief Closes channel connections (even if in other thread) but does not delete handle. \n
* Note that this does NOT cause the channel's disconnect handler to be called. \n
* It does however invoke event handlers for subscriptions (i.e., monitors). \n
* The Conduit handle is NOT erased
* \param handle input
* \return ECA_NORMAL if all OK else error
*/
int Connect::closeChannelKeepHandle(unsigned int handle) {
#define __METHOD__ "Connect::closeChannelKeepHandle"
//We can close handle irrespective of ca_current_context!
ca_client_context * cctLocal= ca_current_context();
status=ICAFE_NORMAL;
ChannelRegalia channelRegalia;
cafeConduit_set_by_handle & handle_index = cs.get<by_handle> ();
cafeConduit_set_by_handle::iterator it_handle;
it_handle = handle_index.find(handle);
bool statusFlag=false;
if (it_handle != handle_index.end()) {
if ((*it_handle).getChannelID() != NULL && cctLocal != NULL ) {
Connect::monitorStop(handle);
status = ca_clear_channel((*it_handle).getChannelID());
if (status != ECA_NORMAL) {
if(MUTEX){cafeMutex.lock();} //lock
handle_index.modify(it_handle, change_status (status) );
if(MUTEX){cafeMutex.unlock();} //unlock
return status; //ECA_BADCHID
}
channelRegalia=(*it_handle).getChannelRegalia();
//status = ca_pend_io(channelClosePolicy.getTimeout()); //See channelClosePolicy.setPolicy in connect.h
channelClosePolicy.flushSendBufferNow();
//Do not want to free buffers(!)
//if(MUTEX){cafeMutex.lock();}
//handle_index.modify(it_handle, free_dataBuffers());
//if(MUTEX){cafeMutex.unlock();}
statusFlag=true;
}
}
else {
// Loop through all elements
for (itcs = cs.begin(); itcs != cs.end();) {
//Conduit cc = *itcs;
//operator overload prints handle, pv and nelem
if ((*itcs).handle == handle) {
if ((*itcs).getChannelID() != NULL && cctLocal != NULL) {
Connect::monitorStop(handle);
status = ca_clear_channel((*itcs).getChannelID());
if (status != ECA_NORMAL) {
if(MUTEX){cafeMutex.lock();} //lock
handle_index.modify(it_handle, change_status (status) );
if(MUTEX){cafeMutex.unlock();} //unlock
return status; //ECA_BADCHID
}
channelRegalia=(*it_handle).getChannelRegalia();
//status = ca_pend_io(channelClosePolicy.getTimeout());
channelClosePolicy.flushSendBufferNow();
//if(MUTEX){cafeMutex.lock();}
//handle_index.modify(itcs, free_dataBuffers());
//if(MUTEX){cafeMutex.unlock();}
}
statusFlag=true;
break;
}
else {
++itcs;
}
}
return ECAFE_INVALID_HANDLE;
}
if (statusFlag) {
if(MUTEX){cafeMutex.lock();} //lock
handle_index.modify(it_handle, change_status (ICAFE_CS_CLOSED) );
if(MUTEX){cafeMutex.unlock();} //unlock
//Also callback done
ChannelRequestStatus channelRequestStatusGet;
ChannelRequestStatus channelRequestStatusPut;
channelRequestStatusGet.setCallbackKind(false, true); //fake completion
channelRequestStatusPut.setCallbackKind(false, true); //fake completion
if(MUTEX){cafeMutex.lock();} //lock
handle_index.modify(it_handle, change_channelRequestStatusGet (channelRequestStatusGet) );
handle_index.modify(it_handle, change_channelRequestStatusPut (channelRequestStatusPut) );
if(MUTEX){cafeMutex.unlock();} //unlock
channelRegalia.setCafeConnectionState(ICAFE_CS_CLOSED);
channelRegalia.setConnectFlag(false);
channelRegalia.setConnectionState(ICAFE_CA_OP_CONN_DOWN);
channelRegalia.channelID=NULL;
if(MUTEX){cafeMutex.lock();} //lock
handle_index.modify(it_handle, change_channelRegalia (channelRegalia) );
if(MUTEX){cafeMutex.unlock();} //unlock
}
return status;
#undef __METHOD__
}
/**
* \brief Closes given channel connections (even if in other threads) but does not deletes their handles. \n
* Note that this does NOT cause the channel's disconnect handler to be called. \n
* It does however invoke event subscriptions (for monitors). \n
* All corresponding Conduit handles are not erased.
* \param handleArray input: unsigned int *
* \param nHandles input: unsigned int
* \return ECA_NORMAL if all OK; error if one or more channelIDs fail to close
*/
int Connect::closeChannelsKeepHandles(unsigned int * handleArray, unsigned int nHandles) {
#define __METHOD__ "Connect::closeChannelsKeepHandles(unsigned int *, unsigned int) "
//copy array to vector and remove duplicate handles
vector<unsigned int> vec;
vec.reserve(nHandles);
vec.insert(vec.end(), &handleArray[0], &handleArray[nHandles]);
//Remove duplicate values
sort( vec.begin(), vec.end() );
vec.erase( unique( vec.begin(), vec.end() ), vec.end() );
int statusGroup = ECA_NORMAL;
int status = ICAFE_NORMAL;
bool isClearChannel = false;
ChannelRegalia channelRegalia;
vector<bool> isClearChannelV;
isClearChannelV.reserve(vec.size());
cafeConduit_set_by_handle & handle_index = cs.get<by_handle> ();
cafeConduit_set_by_handle::iterator it_handle;
if (ca_current_context() != NULL) {
for (unsigned int i=0; i<vec.size(); ++i) {
isClearChannelV.push_back(false);
it_handle = handle_index.find(vec[i]);
if (it_handle != handle_index.end()) {
if ((*it_handle).getChannelID() != NULL ) {
Connect::monitorStop(vec[i]);
status = ca_clear_channel((*it_handle).getChannelID());
if (status != ECA_NORMAL) {
statusGroup=status; //ECA_BADCHID
}
}
isClearChannelV[i]=true;
isClearChannel=true;
}
else {
// Loop through all elements
for (itcs = cs.begin(); itcs != cs.end(); ++itcs) {
//if ((*itcs).getHandle()==handleArray[i]) {
if ((*itcs).getHandle()==vec[i]) {
if ((*itcs).getChannelID() != NULL ) {
Connect::monitorStop(vec[i]);
status = ca_clear_channel((*itcs).getChannelID());
//cout << "clear channel with handle: " << (*itcs).getHandle() << endl;
if (status != ECA_NORMAL){
statusGroup = status;
}
isClearChannelV[i]=true;
isClearChannel=true;
}
break;
}
}
}// else
} //for
}//if ca_current_context != NULL
if (isClearChannel) {
channelClosePolicy.flushSendBufferNow();
//status = ca_pend_io(channelClosePolicy.getTimeout());
//if (statusGroup == ECA_NORMAL) {
// statusGroup = status;
//}
//for (unsigned int i=0; i<nHandles; ++i) {
//it_handle = handle_index.find(handleArray[i]);
for (unsigned int i=0; i<vec.size(); ++i) {
it_handle = handle_index.find(vec[i]);
if (it_handle != handle_index.end() && isClearChannelV[i]) {
//if(MUTEX){cafeMutex.lock();}
//handle_index.modify(it_handle, free_dataBuffers());
//if(MUTEX){cafeMutex.unlock();}
if(MUTEX){cafeMutex.lock();} //lock
handle_index.modify(it_handle, change_status (ICAFE_CS_CLOSED) );
if(MUTEX){cafeMutex.unlock();} //unlock
//Also callback done
ChannelRequestStatus channelRequestStatusGet;
ChannelRequestStatus channelRequestStatusPut;
channelRequestStatusGet.setCallbackKind(false, true); //fake completion
channelRequestStatusPut.setCallbackKind(false, true); //fake completion
if(MUTEX){cafeMutex.lock();} //lock
handle_index.modify(it_handle, change_channelRequestStatusGet (channelRequestStatusGet) );
handle_index.modify(it_handle, change_channelRequestStatusPut (channelRequestStatusPut) );
if(MUTEX){cafeMutex.unlock();} //unlock
channelRegalia=(*it_handle).getChannelRegalia();
channelRegalia.setCafeConnectionState(ICAFE_CS_CLOSED);
channelRegalia.setConnectFlag(false);
channelRegalia.setConnectionState(ICAFE_CA_OP_CONN_DOWN);
channelRegalia.channelID=NULL;
if(MUTEX){cafeMutex.lock();} //lock
handle_index.modify(it_handle, change_channelRegalia (channelRegalia) );
if(MUTEX){cafeMutex.unlock();} //unlock
}
else {
// Loop through all elements
for (itcs = cs.begin(); itcs != cs.end();) {
//if ((*itcs).handle == handleArray[i]) {
if ((*itcs).handle == vec[i] && isClearChannelV[i]) {
//if(MUTEX){cafeMutex.lock();}
//handle_index.modify(itcs, free_dataBuffers());
// if(MUTEX){cafeMutex.unlock();}
if(MUTEX){cafeMutex.lock();} //lock
handle_index.modify(itcs, change_status (ICAFE_CS_CLOSED) );
if(MUTEX){cafeMutex.unlock();} //unlock
//Also callback done
ChannelRequestStatus channelRequestStatusGet;
ChannelRequestStatus channelRequestStatusPut;
channelRequestStatusGet.setCallbackKind(false, true); //fake completion
channelRequestStatusPut.setCallbackKind(false, true); //fake completion
if(MUTEX){cafeMutex.lock();} //lock
handle_index.modify(itcs, change_channelRequestStatusGet (channelRequestStatusGet) );
handle_index.modify(itcs, change_channelRequestStatusPut (channelRequestStatusPut) );
if(MUTEX){cafeMutex.unlock();} //unlock
channelRegalia=(*itcs).getChannelRegalia();
channelRegalia.setCafeConnectionState(ICAFE_CS_CLOSED);
channelRegalia.setConnectFlag(false);
channelRegalia.setConnectionState(ICAFE_CA_OP_CONN_DOWN);
channelRegalia.channelID=NULL;
if(MUTEX){cafeMutex.lock();} //lock
handle_index.modify(itcs, change_channelRegalia (channelRegalia) );
if(MUTEX){cafeMutex.unlock();} //unlock
break;
}
else {
++itcs;
}
} //for itcs
}//if else
}//for
} //isClearChannel
return statusGroup;
#undef __METHOD__
}
/**
* \brief Closes channels for the given channel access client context. \n
* Shuts down the 'local' channel access client context and frees allocated resources \n
@@ -1829,7 +2208,8 @@ int Connect::monitorStart(vector<unsigned int> handleV, vector<int> &statusV,
//FIRST CHECK IF CONNECTED!
//Once connected, then we have this information stored
//
if ((*it_handle).getChannelRegalia().getCafeConnectionState() != ICAFE_CS_NEVER_CONN ) {
if ((*it_handle).getChannelRegalia().getCafeConnectionState() != ICAFE_CS_NEVER_CONN
&& (*it_handle).getChannelRegalia().getCafeConnectionState() != ICAFE_CS_CLOSED ) {
mp.setDataType((*it_handle).getChannelRegalia().getDataType());
mp.setNelem((*it_handle).getChannelRegalia().getNelem());
}
@@ -2186,6 +2566,74 @@ int Connect::monitorStart(vector<unsigned int> handleV, vector<int> &statusV,
}
/**
* \brief send the command to the ioc to open channels
* \param _timeout input: max pend time to establish connections \n
*/
void Connect::openNowAndWait(double _timeout) {
double dto = channelOpenPolicy.getTimeout();
channelOpenPolicy.setTimeout(_timeout);
//channelOpenPolicy.flushSendBufferNow();
//Time lapsed
using namespace boost::posix_time;
ptime timeStart(microsec_clock::local_time());
double timeElapsed=0;
unsigned int nPoll=0;
channelOpenPolicy.flushSendBufferNow();
ptime timeEnd(microsec_clock::local_time());
time_duration duration(timeEnd-timeStart);
timeElapsed= (double) duration.total_microseconds()/1000000.0;
while ( !handleHelper.allChannelsConnected() && (timeElapsed <= channelOpenPolicy.getTimeout())){
#if HAVE_BOOST_THREAD
boost::this_thread::sleep_for(boost::chrono::microseconds(1000));
#else
#if HAVE_LINUX
usleep(1000);
#endif
#endif
++nPoll;
ptime timeEnd(microsec_clock::local_time());
time_duration duration(timeEnd-timeStart);
timeElapsed= (double) duration.total_microseconds()/1000000.0;
}
/*
if ( handleHelper.allChannelsConnected() ) {
cout << " // all is connected //" << endl;
}
else {
cout << " // all is not connected //" << endl;
}
cout << "timeElapsed for open " << timeElapsed << " nPoll = " << nPoll << endl;
*/
//reset
channelOpenPolicy.setWhenToFlushSendBuffer(FLUSH_NOW);
channelOpenPolicy.setFlushSendBufferKind(WITH_POLL); //PEND_EVENT);
//channelOpenPolicy.setTimeoutToDefault();
channelOpenPolicy.setTimeout(dto);
return;
}
/**
* \brief print status information of given handle
* \param handle input: handle to Conduit object \n
@@ -2214,6 +2662,7 @@ int Connect::monitorStart(vector<unsigned int> handleV, vector<int> &statusV,
}
/**
* \brief print status information of given handle only on error
* \param handle input: handle to Conduit object \n
@@ -2394,7 +2843,7 @@ int Connect::monitorStart(vector<unsigned int> handleV, vector<int> &statusV,
/**
* \brief print status information of given PVs only on error
* \param handleArray input: array of Handles to Conduit objects \n
* \param pvArray input: array of PVnames \n
* \param nelem input: size of array of handles
* \param statusArray input: array of statuses \n
* \return ECA_NORMAL if all OK else ECAFE_INVALID_HANDLE (if one or more handles are invalid)
@@ -2419,6 +2868,9 @@ int Connect::monitorStart(vector<unsigned int> handleV, vector<int> &statusV,
/**
* \brief Closes all channels within the given context and their respective handles \n
* Shuts down the given channel access client context and frees allocated resources \n