This commit is contained in:
Matej Sekoranja
2011-01-10 23:44:33 +01:00
3 changed files with 510 additions and 73 deletions

View File

@@ -92,13 +92,11 @@ SearchTimer::SearchTimer(ChannelSearchManager* _chanSearchManager, int32 timerIn
_allowSlowdown(allowSlowdown),
_requestPendingChannels(new ArrayFIFO<SearchInstance*>),
_responsePendingChannels(new ArrayFIFO<SearchInstance*>),
_timerNode(NULL),
_timerNode(new TimerNode(this)),
_canceled(false),
_timeAtResponseCheck(0),
_requestPendingChannelsMutex(Mutex()),
_mutex(Mutex())
_timeAtResponseCheck(0)
{
_timerNode = new TimerNode(this);
}
SearchTimer::~SearchTimer()
@@ -111,8 +109,12 @@ SearchTimer::~SearchTimer()
void SearchTimer::shutdown()
{
Lock guard(&_mutex); //the whole method is locked
if(_canceled) return;
_canceled = true;
{
Lock guard(&_volMutex);
if(_canceled) return;
_canceled = true;
}
{
Lock guard(&_requestPendingChannelsMutex);
@@ -128,7 +130,6 @@ void SearchTimer::installChannel(SearchInstance* channel)
Lock guard(&_mutex); //the whole method is locked
if(_canceled) return;
Lock pendingChannelGuard(&_requestPendingChannelsMutex);
bool startImmediately = _requestPendingChannels->isEmpty();
channel->addAndSetListOwnership(_requestPendingChannels, &_requestPendingChannelsMutex, _timerIndex);
@@ -156,7 +157,7 @@ void SearchTimer::moveChannels(SearchTimer* destination)
while((channel = _responsePendingChannels->pop()) != NULL)
{
{
Lock guard(&_mutex);
Lock guard(&_volMutex);
if(_searchAttempts > 0)
{
_searchAttempts--;
@@ -181,7 +182,7 @@ void SearchTimer::timerStopped()
void SearchTimer::callback()
{
{
Lock guard(&_mutex);
Lock guard(&_volMutex);
if(_canceled) return;
}
@@ -189,7 +190,7 @@ void SearchTimer::callback()
// boost search period (if necessary) for channels not recently searched
int32 searchRespones;
{
Lock guard(&_mutex);
Lock guard(&_volMutex);
searchRespones = _searchRespones;
}
if(_allowBoost && searchRespones > 0)
@@ -236,7 +237,7 @@ void SearchTimer::callback()
int32 searchRespones,searchAttempts;
{
Lock guard(&_mutex);
Lock guard(&_volMutex);
searchAttempts = _searchAttempts;
searchRespones = _searchRespones;
}
@@ -272,7 +273,7 @@ void SearchTimer::callback()
{
Lock guard(&_mutex);
Lock guard(&_volMutex);
_startSequenceNumber = _chanSearchManager->getSequenceNumber() + 1;
_searchAttempts = 0;
_searchRespones = 0;
@@ -284,56 +285,66 @@ void SearchTimer::callback()
// reschedule
bool canceled;
{
Lock guard(&_mutex);
Lock guard(&_volMutex);
canceled = _canceled;
}
{
Lock guard(&_requestPendingChannelsMutex);
while (!canceled && (channel = _requestPendingChannels->pop()) != NULL)
{
channel->unsetListOwnership();
channel = _requestPendingChannels->pop();
}
while (!canceled && channel != NULL)
{
channel->unsetListOwnership();
bool requestSent = true;
bool allowNewFrame = (framesSent+1) < _framesPerTry;
bool frameWasSent = _chanSearchManager->generateSearchRequestMessage(channel, allowNewFrame);
if(frameWasSent)
bool requestSent = true;
bool allowNewFrame = (framesSent+1) < _framesPerTry;
bool frameWasSent = _chanSearchManager->generateSearchRequestMessage(channel, allowNewFrame);
if(frameWasSent)
{
framesSent++;
triesInFrame = 0;
if(!allowNewFrame)
{
framesSent++;
triesInFrame = 0;
if(!allowNewFrame)
{
channel->addAndSetListOwnership(_requestPendingChannels, &_requestPendingChannelsMutex, _timerIndex);
requestSent = false;
}
else
{
triesInFrame++;
}
channel->addAndSetListOwnership(_requestPendingChannels, &_requestPendingChannelsMutex, _timerIndex);
requestSent = false;
}
else
{
triesInFrame++;
}
}
else
{
triesInFrame++;
}
if(requestSent)
if(requestSent)
{
channel->addAndSetListOwnership(_responsePendingChannels, &_responsePendingChannelsMutex, _timerIndex);
Lock guard(&_volMutex);
if(_searchAttempts < INT_MAX)
{
channel->addAndSetListOwnership(_responsePendingChannels, &_requestPendingChannelsMutex, _timerIndex);
Lock guard(&_mutex);
if(_searchAttempts < INT_MAX)
{
_searchAttempts++;
}
_searchAttempts++;
}
}
// limit
if(triesInFrame == 0 && !allowNewFrame) break;
// limit
if(triesInFrame == 0 && !allowNewFrame) break;
Lock guard(&_mutex);
{
Lock guard(&_volMutex);
canceled = _canceled;
}
{
Lock guard(&_requestPendingChannelsMutex);
channel = _requestPendingChannels->pop();
}
}
// flush out the search request buffer
if(triesInFrame > 0)
{
@@ -341,11 +352,12 @@ void SearchTimer::callback()
framesSent++;
}
_endSequenceNumber = _chanSearchManager->getSequenceNumber();
// reschedule
{
Lock guard(&_mutex);
Lock guard(&_volMutex);
_endSequenceNumber = _chanSearchManager->getSequenceNumber();
// reschedule
canceled = _canceled;
}
Lock guard(&_requestPendingChannelsMutex);
@@ -363,12 +375,12 @@ void SearchTimer::searchResponse(int32 responseSequenceNumber, bool isSequenceNu
{
bool validResponse = true;
{
Lock guard(&_mutex);
Lock guard(&_volMutex);
if(_canceled) return;
if(isSequenceNumberValid)
{
validResponse = _startSequenceNumber <= _chanSearchManager->_sequenceNumber && _chanSearchManager->_sequenceNumber <= _endSequenceNumber;
validResponse = _startSequenceNumber <= _chanSearchManager->getSequenceNumber() && _chanSearchManager->getSequenceNumber() <= _endSequenceNumber;
}
}
@@ -378,7 +390,7 @@ void SearchTimer::searchResponse(int32 responseSequenceNumber, bool isSequenceNu
{
const int64 dt = responseTime - _chanSearchManager->getTimeAtLastSend();
_chanSearchManager->updateRTTE(dt);
Lock guard(&_mutex);
Lock guard(&_volMutex);
if(_searchRespones < INT_MAX)
{
_searchRespones++;
@@ -436,7 +448,7 @@ ChannelSearchManager::ChannelSearchManager(Context* context):
// create timers
_timers = new SearchTimer*[numberOfTimers];
for(int i = 0; i < numberOfTimers; i++)
for(int32 i = 0; i < numberOfTimers; i++)
{
_timers[i] = new SearchTimer(this, i, i > _beaconAnomalyTimerIndex, i != (numberOfTimers-1));
}
@@ -447,7 +459,7 @@ ChannelSearchManager::ChannelSearchManager(Context* context):
ChannelSearchManager::~ChannelSearchManager()
{
for(int i = 0; i < _numberOfTimers; i++)
for(int32 i = 0; i < _numberOfTimers; i++)
{
if(_timers[i]) delete _timers[i];
}
@@ -459,9 +471,13 @@ ChannelSearchManager::~ChannelSearchManager()
void ChannelSearchManager::cancel()
{
Lock guard(&_mutex);
if(_canceled) return;
_canceled = true;
{
Lock guard(&_volMutex);
if(_canceled) return;
_canceled = true;
}
if(_timers != NULL)
{
@@ -474,15 +490,18 @@ void ChannelSearchManager::cancel()
int32 ChannelSearchManager::registeredChannelCount()
{
Lock guard(&_mutex);
Lock guard(&_channelMutex);
return _channels.size();
}
void ChannelSearchManager::registerChannel(SearchInstance* channel)
{
Lock guard(&_mutex);
if(_canceled) return;
{
Lock guard(&_volMutex);
if(_canceled) return;
}
Lock guard(&_channelMutex);
//overrides if already registered
_channels[channel->getSearchInstanceID()] = channel;
_timers[0]->installChannel(channel);
@@ -490,7 +509,7 @@ void ChannelSearchManager::registerChannel(SearchInstance* channel)
void ChannelSearchManager::unregisterChannel(SearchInstance* channel)
{
Lock guard(&_mutex);
Lock guard(&_channelMutex);
_channelsIter = _channels.find(channel->getSearchInstanceID());
if(_channelsIter != _channels.end())
{
@@ -502,7 +521,7 @@ void ChannelSearchManager::unregisterChannel(SearchInstance* channel)
void ChannelSearchManager::searchResponse(int32 cid, int32 seqNo, int8 minorRevision, osiSockAddr* serverAddress)
{
Lock guard(&_mutex);
Lock guard(&_channelMutex);
// first remove
SearchInstance* si = NULL;
_channelsIter = _channels.find(cid);
@@ -543,7 +562,7 @@ void ChannelSearchManager::beaconAnomalyNotify()
void ChannelSearchManager::initializeSendBuffer()
{
Lock guard(&_mutex);
Lock guard(&_volMutex);
_sequenceNumber++;
@@ -567,6 +586,7 @@ void ChannelSearchManager::initializeSendBuffer()
void ChannelSearchManager::flushSendBuffer()
{
Lock guard(&_mutex);
Lock volGuard(&_volMutex);
TimeStamp now;
now.getCurrent();
_timeAtLastSend = now.getMilliseconds();
@@ -604,28 +624,28 @@ void ChannelSearchManager::boostSearching(SearchInstance* channel, int32 timerIn
inline void ChannelSearchManager::updateRTTE(long rtt)
{
Lock guard(&_mutex);
Lock guard(&_volMutex);
const double error = rtt - _rttmean;
_rttmean += error / 4.0;
}
inline double ChannelSearchManager::getRTTE()
{
Lock guard(&_mutex);
Lock guard(&_volMutex);
double rtte = min(max((double)_rttmean, (double)MIN_RTT), (double)MAX_RTT);
return rtte;
}
inline int32 ChannelSearchManager::getSequenceNumber()
{
Lock guard(&_mutex);
Lock guard(&_volMutex);
int32 retval = _sequenceNumber;
return retval;
}
inline int64 ChannelSearchManager::getTimeAtLastSend()
{
Lock guard(&_mutex);
Lock guard(&_volMutex);
int64 retval = _timeAtLastSend;
return retval;
}

View File

@@ -24,7 +24,7 @@ using namespace epics::pvData;
namespace epics { namespace pvAccess {
//TODO check the const of paramerers
//TODO check the const of parameters
/**
* SearchInstance.
@@ -52,7 +52,7 @@ public:
*/
virtual void unsetListOwnership() = 0;
/**
* Adds this search instance into the provided list and sets it as the owner of this search instance.
* Adds this search instance into the provided list and set it as the owner of this search instance.
*
* @param newOwner a list to which this search instance is added.
* @param ownerMutex mutex belonging to the newOwner list. The mutex will be locked beofe any modification
@@ -232,10 +232,18 @@ private:
* Mutex for request pending channel list.
*/
Mutex _requestPendingChannelsMutex;
/**
* Mutex for request pending channel list.
*/
Mutex _responsePendingChannelsMutex;
/**
* General mutex.
*/
Mutex _mutex;
/**
* Volatile varialbe mutex.
*/
Mutex _volMutex;
/**
* Max search tries per frame.
*/
@@ -377,6 +385,14 @@ private:
* General mutex.
*/
Mutex _mutex;
/**
* Channel mutex.
*/
Mutex _channelMutex;
/**
* Volatile variable mutex.
*/
Mutex _volMutex;
/**
* Mock transport send control
*/

View File

@@ -1,39 +1,440 @@
/* testChannelSearcManager.cpp */
#include <channelSearchManager.h>
#include <sstream>
using namespace epics::pvData;
using namespace epics::pvAccess;
//TODO this will be deleted
class ChannelImpl;
class ContextImpl : public Context
{
private:
Timer* _timer;
public:
ContextImpl()
{
_timer = new Timer("krneki",lowPriority);
}
virtual Version* getVersion() {
return NULL;
}
virtual ChannelProvider* getProvider() {
return NULL;
}
Timer* getTimer()
{
return _timer;
}
virtual void initialize() {
}
virtual void printInfo() {
}
virtual void printInfo(epics::pvData::StringBuilder out) {
}
virtual void destroy()
{
}
virtual void dispose()
{
}
BlockingUDPTransport* getSearchTransport()
{
return NULL;
}
/**
* Searches for a channel with given channel ID.
* @param channelID CID.
* @return channel with given CID, <code></code> if non-existent.
*/
Channel* getChannel(pvAccessID channelID)
{
return NULL;
}
Configuration* getConfiguration() {return NULL;}
TransportRegistry* getTransportRegistry() {return NULL;}
~ContextImpl() { delete _timer;};
private:
void loadConfiguration() {
}
void internalInitialize() {
}
void initializeUDPTransport() {
}
void internalDestroy() {
}
void destroyAllChannels() {
}
/**
* Check channel name.
*/
void checkChannelName(String& name) {
}
/**
* Check context state and tries to establish necessary state.
*/
void checkState() {
}
/**
* Generate Client channel ID (CID).
* @return Client channel ID (CID).
*/
pvAccessID generateCID()
{
return 0;
}
/**
* Free generated channel ID (CID).
*/
void freeCID(int cid)
{
}
/**
* Get, or create if necessary, transport of given server address.
* @param serverAddress required transport address
* @param priority process priority.
* @return transport for given address
*/
Transport* getTransport(TransportClient* client, osiSockAddr* serverAddress, int minorRevision, int priority)
{
return NULL;
}
/**
* Internal create channel.
*/
// TODO no minor version with the addresses
// TODO what if there is an channel with the same name, but on different host!
Channel* createChannelInternal(String name, ChannelRequester* requester, short priority,
InetAddrVector* addresses) {
return NULL;
}
/**
* Destroy channel.
* @param channel
* @param force
* @throws CAException
* @throws IllegalStateException
*/
void destroyChannel(ChannelImpl* channel, bool force) {
}
/**
* Get channel search manager.
* @return channel search manager.
*/
ChannelSearchManager* getChannelSearchManager() {
return NULL;
}
};
class TestSearcInstance : public BaseSearchInstance
{
public:
TestSearcInstance(string channelName, pvAccessID channelID): _channelID(channelID), _channelName(channelName) {}
pvAccessID getSearchInstanceID() { return _channelID;};
string getSearchInstanceName() {return _channelName;};
string getSearchInstanceName() {return _channelName;};
void searchResponse(int8 minorRevision, osiSockAddr* serverAddress) {};
private:
pvAccessID _channelID;
string _channelName;
};
static const int max_channels = 100;
ContextImpl* context = new ContextImpl();
ChannelSearchManager* manager = new ChannelSearchManager(static_cast<Context*>(context));
TestSearcInstance** chanArray = new TestSearcInstance*[max_channels];
void* testWorker1(void* p)
{
for(int i = 0; i < 1000; i++)
{
for(int j = 0; j < max_channels/2; j++)
{
manager->unregisterChannel(chanArray[j]);
usleep(100);
manager->registerChannel(chanArray[j]);
}
}
return NULL;
}
void* testWorker2(void* p)
{
for(int i = 0; i < 1000; i++)
{
for(int j = max_channels/2; j < max_channels; j++)
{
manager->unregisterChannel(chanArray[j]);
usleep(100);
manager->registerChannel(chanArray[j]);
manager->beaconAnomalyNotify();
}
}
return NULL;
}
int main(int argc,char *argv[])
{
//ClientContextImpl* context = new ClientContextImpl();
Context* context = 0; // TODO will crash...
ChannelSearchManager* manager = new ChannelSearchManager(context);
pthread_t _worker1Id;
pthread_t _worker2Id;
TestSearcInstance* chan1 = new TestSearcInstance("chan1", 1);
manager->registerChannel(chan1);
ostringstream obuffer;
for(int i = 0; i < max_channels; i++)
{
obuffer.clear();
obuffer.str("");
obuffer << i;
string name = "chan" + obuffer.str();
chanArray[i] = new TestSearcInstance(name.c_str(), i);
manager->registerChannel(chanArray[i]);
}
sleep(3);
//create two threads
int32 retval = pthread_create(&_worker1Id, NULL, testWorker1, NULL);
if(retval != 0)
{
assert(true);
}
retval = pthread_create(&_worker2Id, NULL, testWorker2, NULL);
if(retval != 0)
{
assert(true);
}
retval = pthread_join(_worker1Id, NULL);
if(retval != 0)
{
assert(true);
}
retval = pthread_join(_worker2Id, NULL);
if(retval != 0)
{
assert(true);
}
manager->cancel();
//context->destroy();
context->destroy();
getShowConstructDestruct()->constuctDestructTotals(stdout);
//if(chan1) delete chan1;
for(int i = 0; i < max_channels; i++)
{
if(chanArray[i]) delete chanArray[i];
}
if(chanArray) delete [] chanArray;
if(manager) delete manager;
if(context) delete context;
return(0);