ChannelSearchManager implementation
This commit is contained in:
632
pvAccessApp/remote/channelSearchManager.cpp
Normal file
632
pvAccessApp/remote/channelSearchManager.cpp
Normal file
@@ -0,0 +1,632 @@
|
||||
/*
|
||||
* channelSearchManager.cpp
|
||||
*/
|
||||
|
||||
#include "channelSearchManager.h"
|
||||
|
||||
using namespace std;
|
||||
|
||||
namespace epics { namespace pvAccess {
|
||||
|
||||
const int BaseSearchInstance::DATA_COUNT_POSITION = CA_MESSAGE_HEADER_SIZE + sizeof(int32)/sizeof(int8) + 1;
|
||||
const int BaseSearchInstance::PAYLOAD_POSITION = sizeof(int16)/sizeof(int8) + 2;
|
||||
|
||||
void BaseSearchInstance::unsetListOwnership()
|
||||
{
|
||||
Lock guard(&_mutex);
|
||||
_owner = NULL;
|
||||
}
|
||||
|
||||
void BaseSearchInstance::addAndSetListOwnership(ArrayFIFO<SearchInstance*>* newOwner, Mutex* ownerMutex, int32 index)
|
||||
{
|
||||
if(ownerMutex == NULL) throw BaseException("Null owner mutex", __FILE__,__LINE__);
|
||||
|
||||
_ownerMutex = ownerMutex;
|
||||
Lock ownerGuard(_ownerMutex);
|
||||
Lock guard(&_mutex);
|
||||
newOwner->push(this);
|
||||
_owner = newOwner;
|
||||
_ownerIndex = index;
|
||||
}
|
||||
|
||||
void BaseSearchInstance::removeAndUnsetListOwnership()
|
||||
{
|
||||
if(_owner == NULL) return;
|
||||
|
||||
if(_ownerMutex == NULL) throw BaseException("Null owner mutex", __FILE__,__LINE__);
|
||||
Lock ownerGuard(_ownerMutex);
|
||||
Lock guard(&_mutex);
|
||||
if(_owner != NULL)
|
||||
{
|
||||
_owner->remove(this);
|
||||
_owner = NULL;
|
||||
}
|
||||
}
|
||||
|
||||
int32 BaseSearchInstance::getOwnerIndex()
|
||||
{
|
||||
Lock guard(&_mutex);
|
||||
int32 retval = _ownerIndex;
|
||||
return retval;
|
||||
}
|
||||
|
||||
bool BaseSearchInstance::generateSearchRequestMessage(ByteBuffer* requestMessage, TransportSendControl* control)
|
||||
{
|
||||
int16 dataCount = requestMessage->getShort(DATA_COUNT_POSITION);
|
||||
|
||||
dataCount++;
|
||||
if(dataCount >= MAX_SEARCH_BATCH_COUNT)
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
const string name = getChannelName();
|
||||
// not nice...
|
||||
const int addedPayloadSize = sizeof(int32)/sizeof(int8) + (1 + sizeof(int32)/sizeof(int8) + name.length());
|
||||
|
||||
if(requestMessage->getRemaining() < addedPayloadSize)
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
requestMessage->putInt(getChannelID());
|
||||
SerializeHelper::serializeString(name, requestMessage, control);
|
||||
|
||||
requestMessage->putInt(PAYLOAD_POSITION, requestMessage->getPosition() - CA_MESSAGE_HEADER_SIZE);
|
||||
requestMessage->putShort(DATA_COUNT_POSITION, dataCount);
|
||||
return true;
|
||||
}
|
||||
|
||||
const int32 SearchTimer::MAX_FRAMES_PER_TRY = 64;
|
||||
|
||||
SearchTimer::SearchTimer(ChannelSearchManager* _chanSearchManager, int32 timerIndex, bool allowBoost, bool allowSlowdown):
|
||||
_chanSearchManager(_chanSearchManager),
|
||||
_searchAttempts(0),
|
||||
_searchRespones(0),
|
||||
_framesPerTry(1),
|
||||
_framesPerTryCongestThresh(DBL_MAX),
|
||||
_startSequenceNumber(0),
|
||||
_endSequenceNumber(0),
|
||||
_timerIndex(timerIndex),
|
||||
_allowBoost(allowBoost),
|
||||
_allowSlowdown(allowSlowdown),
|
||||
_requestPendingChannels(new ArrayFIFO<SearchInstance*>),
|
||||
_responsePendingChannels(new ArrayFIFO<SearchInstance*>),
|
||||
_timerNode(NULL),
|
||||
_canceled(false),
|
||||
_timeAtResponseCheck(0),
|
||||
_requestPendingChannelsMutex(Mutex()),
|
||||
_mutex(Mutex())
|
||||
{
|
||||
|
||||
}
|
||||
|
||||
SearchTimer::~SearchTimer()
|
||||
{
|
||||
if(_requestPendingChannels) delete _requestPendingChannels;
|
||||
if(_responsePendingChannels) delete _responsePendingChannels;
|
||||
}
|
||||
|
||||
void SearchTimer::shutdown()
|
||||
{
|
||||
Lock guard(&_mutex); //the whole method is locked
|
||||
if(_canceled) return;
|
||||
_canceled = true;
|
||||
|
||||
{
|
||||
Lock guard(&_requestPendingChannelsMutex);
|
||||
_timerNode->cancel();
|
||||
|
||||
_requestPendingChannels->clear();
|
||||
_responsePendingChannels->clear();
|
||||
}
|
||||
}
|
||||
|
||||
void SearchTimer::installChannel(SearchInstance* channel)
|
||||
{
|
||||
Lock guard(&_mutex); //the whole method is locked
|
||||
if(_canceled) return;
|
||||
|
||||
|
||||
Lock pendingChannelGuard(&_requestPendingChannelsMutex);
|
||||
bool startImmediately = _requestPendingChannels->isEmpty();
|
||||
channel->addAndSetListOwnership(_requestPendingChannels, &_requestPendingChannelsMutex, _timerIndex);
|
||||
|
||||
// start searching
|
||||
if(startImmediately)
|
||||
{
|
||||
_timerNode->cancel();
|
||||
if(_timeAtResponseCheck == 0)
|
||||
{
|
||||
TimeStamp current;
|
||||
current.getCurrent();
|
||||
_timeAtResponseCheck = current.getMilliseconds();
|
||||
}
|
||||
|
||||
// start with some initial delay (to collect all installed requests)
|
||||
_chanSearchManager->_context->getTimer()->scheduleAfterDelay(_timerNode, 0.01);
|
||||
}
|
||||
}
|
||||
|
||||
void SearchTimer::moveChannels(SearchTimer* destination)
|
||||
{
|
||||
// do not sync this, not necessary and might cause deadlock
|
||||
SearchInstance* channel;
|
||||
while((channel = _responsePendingChannels->pop()) != NULL)
|
||||
{
|
||||
{
|
||||
Lock guard(&_mutex);
|
||||
if(_searchAttempts > 0)
|
||||
{
|
||||
_searchAttempts--;
|
||||
}
|
||||
}
|
||||
destination->installChannel(channel);
|
||||
}
|
||||
|
||||
// bulk move
|
||||
Lock guard(&_requestPendingChannelsMutex);
|
||||
while (!_requestPendingChannels->isEmpty())
|
||||
{
|
||||
destination->installChannel(_requestPendingChannels->pop());
|
||||
}
|
||||
}
|
||||
|
||||
void SearchTimer::timerStopped()
|
||||
{
|
||||
//noop
|
||||
}
|
||||
|
||||
void SearchTimer::callback()
|
||||
{
|
||||
{
|
||||
Lock guard(&_mutex);
|
||||
if(_canceled) return;
|
||||
}
|
||||
|
||||
// if there was some success (no congestion)
|
||||
// boost search period (if necessary) for channels not recently searched
|
||||
int32 searchRespones;
|
||||
{
|
||||
Lock guard(&_mutex);
|
||||
searchRespones = _searchRespones;
|
||||
}
|
||||
if(_allowBoost && searchRespones > 0)
|
||||
{
|
||||
Lock guard(&_requestPendingChannelsMutex);
|
||||
while(!_requestPendingChannels->isEmpty())
|
||||
{
|
||||
SearchInstance* channel = _requestPendingChannels->peek();
|
||||
// boost needed check
|
||||
//final int boostIndex = searchRespones >= searchAttempts * SUCCESS_RATE ? Math.min(Math.max(0, timerIndex - 1), beaconAnomalyTimerIndex) : beaconAnomalyTimerIndex;
|
||||
const int boostIndex = _chanSearchManager->_beaconAnomalyTimerIndex;
|
||||
if(channel->getOwnerIndex() > boostIndex)
|
||||
{
|
||||
_requestPendingChannels->pop();
|
||||
channel->unsetListOwnership();
|
||||
_chanSearchManager->boostSearching(channel, boostIndex);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
SearchInstance* channel;
|
||||
|
||||
// should we check results (installChannel trigger timer immediately)
|
||||
TimeStamp current;
|
||||
current.getCurrent();
|
||||
int64 now = current.getMilliseconds();
|
||||
if(now - _timeAtResponseCheck >= period())
|
||||
{
|
||||
_timeAtResponseCheck = now;
|
||||
|
||||
// notify about timeout (move it to other timer)
|
||||
while((channel = _responsePendingChannels->pop()) != NULL)
|
||||
{
|
||||
if(_allowSlowdown)
|
||||
{
|
||||
channel->unsetListOwnership();
|
||||
_chanSearchManager->searchResponseTimeout(channel, _timerIndex);
|
||||
}
|
||||
else
|
||||
{
|
||||
channel->addAndSetListOwnership(_requestPendingChannels, &_requestPendingChannelsMutex, _timerIndex);
|
||||
}
|
||||
}
|
||||
|
||||
int32 searchRespones,searchAttempts;
|
||||
{
|
||||
Lock guard(&_mutex);
|
||||
searchAttempts = _searchAttempts;
|
||||
searchRespones = _searchRespones;
|
||||
}
|
||||
// check search results
|
||||
if(searchAttempts > 0)
|
||||
{
|
||||
// increase UDP frames per try if we have a good score
|
||||
if(searchRespones >= searchAttempts * ChannelSearchManager::SUCCESS_RATE)
|
||||
{
|
||||
// increase frames per try
|
||||
// a congestion avoidance threshold similar to TCP is now used
|
||||
if(_framesPerTry < MAX_FRAMES_PER_TRY)
|
||||
{
|
||||
if(_framesPerTry < _framesPerTryCongestThresh)
|
||||
{
|
||||
_framesPerTry = min(2*_framesPerTry, _framesPerTryCongestThresh);
|
||||
}
|
||||
else
|
||||
{
|
||||
_framesPerTry += 1.0/_framesPerTry;
|
||||
}
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
// decrease frames per try, fallback
|
||||
_framesPerTryCongestThresh = _framesPerTry / 2.0;
|
||||
_framesPerTry = 1;
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
{
|
||||
Lock guard(&_mutex);
|
||||
_startSequenceNumber = _chanSearchManager->getSequenceNumber() + 1;
|
||||
_searchAttempts = 0;
|
||||
_searchRespones = 0;
|
||||
}
|
||||
|
||||
int32 framesSent = 0;
|
||||
int32 triesInFrame = 0;
|
||||
|
||||
// reschedule
|
||||
bool canceled;
|
||||
{
|
||||
Lock guard(&_mutex);
|
||||
canceled = _canceled;
|
||||
}
|
||||
|
||||
{
|
||||
Lock guard(&_requestPendingChannelsMutex);
|
||||
while (!canceled && (channel = _requestPendingChannels->pop()) != NULL)
|
||||
{
|
||||
channel->unsetListOwnership();
|
||||
|
||||
bool requestSent = true;
|
||||
bool allowNewFrame = (framesSent+1) < _framesPerTry;
|
||||
bool frameWasSent = _chanSearchManager->generateSearchRequestMessage(channel, allowNewFrame);
|
||||
if(frameWasSent)
|
||||
{
|
||||
framesSent++;
|
||||
triesInFrame = 0;
|
||||
if(!allowNewFrame)
|
||||
{
|
||||
channel->addAndSetListOwnership(_requestPendingChannels, &_requestPendingChannelsMutex, _timerIndex);
|
||||
requestSent = false;
|
||||
}
|
||||
else
|
||||
{
|
||||
triesInFrame++;
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
triesInFrame++;
|
||||
}
|
||||
|
||||
if(requestSent)
|
||||
{
|
||||
channel->addAndSetListOwnership(_responsePendingChannels, &_requestPendingChannelsMutex, _timerIndex);
|
||||
Lock guard(&_mutex);
|
||||
if(_searchAttempts < INT_MAX)
|
||||
{
|
||||
_searchAttempts++;
|
||||
}
|
||||
}
|
||||
|
||||
// limit
|
||||
if(triesInFrame == 0 && !allowNewFrame) break;
|
||||
|
||||
Lock guard(&_mutex);
|
||||
canceled = _canceled;
|
||||
}
|
||||
}
|
||||
|
||||
// flush out the search request buffer
|
||||
if(triesInFrame > 0)
|
||||
{
|
||||
_chanSearchManager->flushSendBuffer();
|
||||
framesSent++;
|
||||
}
|
||||
|
||||
_endSequenceNumber = _chanSearchManager->getSequenceNumber();
|
||||
|
||||
// reschedule
|
||||
{
|
||||
Lock guard(&_mutex);
|
||||
canceled = _canceled;
|
||||
}
|
||||
Lock guard(&_requestPendingChannelsMutex);
|
||||
if(!canceled && !_timerNode->isScheduled())
|
||||
{
|
||||
bool someWorkToDo = (!_requestPendingChannels->isEmpty() || !_responsePendingChannels->isEmpty());
|
||||
if(someWorkToDo)
|
||||
{
|
||||
_chanSearchManager->_context->getTimer()->scheduleAfterDelay(_timerNode, period()/1000.0);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void SearchTimer::searchResponse(int32 responseSequenceNumber, bool isSequenceNumberValid, int64 responseTime)
|
||||
{
|
||||
bool validResponse = true;
|
||||
{
|
||||
Lock guard(&_mutex);
|
||||
if(_canceled) return;
|
||||
|
||||
if(isSequenceNumberValid)
|
||||
{
|
||||
validResponse = _startSequenceNumber <= _chanSearchManager->_sequenceNumber && _chanSearchManager->_sequenceNumber <= _endSequenceNumber;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
// update RTTE
|
||||
if(validResponse)
|
||||
{
|
||||
const int64 dt = responseTime - _chanSearchManager->getTimeAtLastSend();
|
||||
_chanSearchManager->updateRTTE(dt);
|
||||
Lock guard(&_mutex);
|
||||
if(_searchRespones < INT_MAX)
|
||||
{
|
||||
_searchRespones++;
|
||||
|
||||
// all found, send new search requests immediately if necessary
|
||||
if(_searchRespones == _searchAttempts)
|
||||
{
|
||||
if(_requestPendingChannels->size() > 0)
|
||||
{
|
||||
_timerNode->cancel();
|
||||
_chanSearchManager->_context->getTimer()->scheduleAfterDelay(_timerNode, 0.0);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
const int64 SearchTimer::period()
|
||||
{
|
||||
return (int64) ((1 << _timerIndex) * _chanSearchManager->getRTTE());
|
||||
}
|
||||
|
||||
const int64 ChannelSearchManager::MIN_RTT = 32;
|
||||
const int64 ChannelSearchManager::MAX_RTT = 2 * ChannelSearchManager::MIN_RTT;
|
||||
const double ChannelSearchManager::SUCCESS_RATE = 0.9;
|
||||
const int64 ChannelSearchManager::MAX_SEARCH_PERIOD = 5 * 60000;
|
||||
const int64 ChannelSearchManager::MAX_SEARCH_PERIOD_LOWER_LIMIT = 60000;
|
||||
const int64 ChannelSearchManager::BEACON_ANOMALY_SEARCH_PERIOD = 5000;
|
||||
const int32 ChannelSearchManager::MAX_TIMERS = 18;
|
||||
|
||||
ChannelSearchManager::ChannelSearchManager(ClientContextImpl* context):
|
||||
_context(context),
|
||||
_canceled(false),
|
||||
_rttmean(MIN_RTT),
|
||||
_sequenceNumber(0)
|
||||
{
|
||||
// create and initialize send buffer
|
||||
_sendBuffer = new ByteBuffer(MAX_UDP_SEND);
|
||||
initializeSendBuffer();
|
||||
|
||||
// TODO should be configurable
|
||||
int64 maxPeriod = MAX_SEARCH_PERIOD;
|
||||
|
||||
maxPeriod = min(maxPeriod, MAX_SEARCH_PERIOD_LOWER_LIMIT);
|
||||
|
||||
// calculate number of timers to reach maxPeriod (each timer period is doubled)
|
||||
double powerOfTwo = log(maxPeriod / (double)MIN_RTT) / log(2);
|
||||
int32 numberOfTimers = (int32)(powerOfTwo + 1);
|
||||
numberOfTimers = min(numberOfTimers, MAX_TIMERS);
|
||||
|
||||
// calculate beacon anomaly timer index
|
||||
powerOfTwo = log(BEACON_ANOMALY_SEARCH_PERIOD / (double)MIN_RTT) / log(2);
|
||||
_beaconAnomalyTimerIndex = (int32)(powerOfTwo + 1);
|
||||
_beaconAnomalyTimerIndex = min(_beaconAnomalyTimerIndex, numberOfTimers - 1);
|
||||
|
||||
// create timers
|
||||
_timers = new SearchTimer*[numberOfTimers];
|
||||
for(int i = 0; i < numberOfTimers; i++)
|
||||
{
|
||||
_timers[i] = new SearchTimer(this, i, i > _beaconAnomalyTimerIndex, i != (numberOfTimers-1));
|
||||
}
|
||||
_numberOfTimers = numberOfTimers;
|
||||
}
|
||||
|
||||
ChannelSearchManager::~ChannelSearchManager()
|
||||
{
|
||||
for(int i = 0; i < _numberOfTimers; i++)
|
||||
{
|
||||
if(_timers[i]) delete _timers[i];
|
||||
}
|
||||
if(_timers) delete[] _timers;
|
||||
if(_sendBuffer) delete _sendBuffer;
|
||||
}
|
||||
|
||||
void ChannelSearchManager::cancel()
|
||||
{
|
||||
Lock guard(&_mutex);
|
||||
if(_canceled) return;
|
||||
|
||||
_canceled = true;
|
||||
|
||||
if(_timers != NULL)
|
||||
{
|
||||
for(int i = 0; i < _numberOfTimers; i++)
|
||||
{
|
||||
_timers[i]->shutdown();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
int32 ChannelSearchManager::registeredChannelCount()
|
||||
{
|
||||
Lock guard(&_mutex);
|
||||
return _channels.size();
|
||||
}
|
||||
|
||||
void ChannelSearchManager::registerChannel(SearchInstance* channel)
|
||||
{
|
||||
Lock guard(&_mutex);
|
||||
if(_canceled) return;
|
||||
|
||||
//overrides if already registered
|
||||
_channels[channel->getChannelID()] = channel;
|
||||
_timers[0]->installChannel(channel);
|
||||
}
|
||||
|
||||
void ChannelSearchManager::unregisterChannel(SearchInstance* channel)
|
||||
{
|
||||
Lock guard(&_mutex);
|
||||
_channelsIter = _channels.find(channel->getChannelID());
|
||||
if(_channelsIter != _channels.end())
|
||||
{
|
||||
_channels.erase(channel->getChannelID());
|
||||
}
|
||||
|
||||
channel->removeAndUnsetListOwnership();
|
||||
}
|
||||
|
||||
void ChannelSearchManager::searchResponse(int32 cid, int32 seqNo, int8 minorRevision, osiSockAddr* serverAddress)
|
||||
{
|
||||
Lock guard(&_mutex);
|
||||
// first remove
|
||||
SearchInstance* si = NULL;
|
||||
_channelsIter = _channels.find(cid);
|
||||
if(_channelsIter != _channels.end())
|
||||
{
|
||||
SearchInstance* si = _channelsIter->second;
|
||||
_channels.erase(_channelsIter);
|
||||
si->removeAndUnsetListOwnership();
|
||||
}
|
||||
else
|
||||
{
|
||||
// minor hack to enable duplicate reports
|
||||
si = static_cast<SearchInstance*>(_context->getChannel(cid));
|
||||
if(si != NULL)
|
||||
{
|
||||
si->searchResponse(minorRevision, serverAddress);
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
// report success
|
||||
const int timerIndex = si->getOwnerIndex();
|
||||
TimeStamp now;
|
||||
now.getCurrent();
|
||||
_timers[timerIndex]->searchResponse(seqNo, seqNo != 0, now.getMilliseconds());
|
||||
|
||||
// then notify SearchInstance
|
||||
si->searchResponse(minorRevision, serverAddress);
|
||||
}
|
||||
|
||||
void ChannelSearchManager::beaconAnomalyNotify()
|
||||
{
|
||||
for(int i = _beaconAnomalyTimerIndex + 1; i < _numberOfTimers; i++)
|
||||
{
|
||||
_timers[i]->moveChannels(_timers[_beaconAnomalyTimerIndex]);
|
||||
}
|
||||
}
|
||||
|
||||
void ChannelSearchManager::initializeSendBuffer()
|
||||
{
|
||||
Lock guard(&_mutex);
|
||||
_sequenceNumber++;
|
||||
|
||||
|
||||
// new buffer
|
||||
_sendBuffer->clear();
|
||||
_sendBuffer->putShort(CA_MAGIC_AND_VERSION);
|
||||
_sendBuffer->putByte((int8)0); // data
|
||||
_sendBuffer->putByte((int8)3); // beacon
|
||||
_sendBuffer->putInt(sizeof(int32)/sizeof(int8) + 1); // "zero" payload
|
||||
_sendBuffer->putInt(_sequenceNumber);
|
||||
|
||||
/*
|
||||
final boolean REQUIRE_REPLY = false;
|
||||
sendBuffer.put(REQUIRE_REPLY ? (byte)QoS.REPLY_REQUIRED.getMaskValue() : (byte)QoS.DEFAULT.getMaskValue());
|
||||
*/
|
||||
|
||||
//TODO implement Qos
|
||||
//_sendBuffer->put((int8)QoS.DEFAULT.getMaskValue());
|
||||
_sendBuffer->putShort((int16)0); // count
|
||||
}
|
||||
|
||||
void ChannelSearchManager::flushSendBuffer()
|
||||
{
|
||||
Lock guard(&_mutex);
|
||||
TimeStamp now;
|
||||
now.getCurrent();
|
||||
_timeAtLastSend = now.getMilliseconds();
|
||||
//TODO
|
||||
//_context->getSearchTransport()->send(sendBuffer);
|
||||
initializeSendBuffer();
|
||||
}
|
||||
|
||||
bool ChannelSearchManager::generateSearchRequestMessage(SearchInstance* channel, bool allowNewFrame)
|
||||
{
|
||||
Lock guard(&_mutex);
|
||||
bool success = channel->generateSearchRequestMessage(_sendBuffer, _mockTransportSendControl);
|
||||
// buffer full, flush
|
||||
if(!success)
|
||||
{
|
||||
flushSendBuffer();
|
||||
if(allowNewFrame)
|
||||
{
|
||||
channel->generateSearchRequestMessage(_sendBuffer, _mockTransportSendControl);
|
||||
}
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
void ChannelSearchManager::searchResponseTimeout(SearchInstance* channel, int32 timerIndex)
|
||||
{
|
||||
int32 newTimerIndex = min(++timerIndex, _numberOfTimers - 1);
|
||||
_timers[newTimerIndex]->installChannel(channel);
|
||||
}
|
||||
|
||||
void ChannelSearchManager::boostSearching(SearchInstance* channel, int32 timerIndex)
|
||||
{
|
||||
_timers[timerIndex]->installChannel(channel);
|
||||
}
|
||||
|
||||
inline void ChannelSearchManager::updateRTTE(long rtt)
|
||||
{
|
||||
Lock guard(&_mutex);
|
||||
const double error = rtt - _rttmean;
|
||||
_rttmean += error / 4.0;
|
||||
}
|
||||
|
||||
inline double ChannelSearchManager::getRTTE()
|
||||
{
|
||||
Lock guard(&_mutex);
|
||||
double rtte = min(max((double)_rttmean, (double)MIN_RTT), (double)MAX_RTT);
|
||||
return rtte;
|
||||
}
|
||||
|
||||
inline int32 ChannelSearchManager::getSequenceNumber()
|
||||
{
|
||||
Lock guard(&_mutex);
|
||||
int32 retval = _sequenceNumber;
|
||||
return retval;
|
||||
}
|
||||
|
||||
inline int64 ChannelSearchManager::getTimeAtLastSend()
|
||||
{
|
||||
Lock guard(&_mutex);
|
||||
int64 retval = _timeAtLastSend;
|
||||
return retval;
|
||||
}
|
||||
|
||||
}}
|
||||
|
||||
393
pvAccessApp/remote/channelSearchManager.h
Normal file
393
pvAccessApp/remote/channelSearchManager.h
Normal file
@@ -0,0 +1,393 @@
|
||||
/*
|
||||
* channelSearchManager.h
|
||||
*/
|
||||
|
||||
#ifndef CHANNELSEARCHMANAGER_H
|
||||
#define CHANNELSEARCHMANAGER_H
|
||||
|
||||
#include "remote.h"
|
||||
#include "pvAccess.h"
|
||||
#include "arrayFIFO.h"
|
||||
#include "caConstants.h"
|
||||
#include "clientContextImpl.h"
|
||||
|
||||
#include <timeStamp.h>
|
||||
#include <osiSock.h>
|
||||
#include <lock.h>
|
||||
#include <timer.h>
|
||||
|
||||
#include <iostream>
|
||||
#include <float.h>
|
||||
#include <math.h>
|
||||
|
||||
using namespace epics::pvData;
|
||||
|
||||
namespace epics { namespace pvAccess {
|
||||
|
||||
typedef int32 pvAccessID;
|
||||
|
||||
//TODO check the const of paramerers
|
||||
|
||||
/**
|
||||
* SearchInstance.
|
||||
*/
|
||||
//TODO document
|
||||
class SearchInstance {
|
||||
public:
|
||||
virtual ~SearchInstance() {};
|
||||
virtual pvAccessID getChannelID() = 0;
|
||||
virtual String getChannelName() = 0;
|
||||
virtual void unsetListOwnership() = 0;
|
||||
virtual void addAndSetListOwnership(ArrayFIFO<SearchInstance*>* newOwner, Mutex* ownerMutex, int32 index) = 0;
|
||||
virtual void removeAndUnsetListOwnership() = 0;
|
||||
virtual int32 getOwnerIndex() = 0;
|
||||
virtual bool generateSearchRequestMessage(ByteBuffer* requestMessage, TransportSendControl* control) = 0;
|
||||
|
||||
/**
|
||||
* Search response from server (channel found).
|
||||
* @param minorRevision server minor CA revision.
|
||||
* @param serverAddress server address.
|
||||
*/
|
||||
virtual void searchResponse(int8 minorRevision, osiSockAddr* serverAddress) = 0;
|
||||
};
|
||||
|
||||
/**
|
||||
* BaseSearchInstance.
|
||||
*/
|
||||
class BaseSearchInstance : public SearchInstance
|
||||
{
|
||||
public:
|
||||
virtual ~BaseSearchInstance() {};
|
||||
virtual pvAccessID getChannelID() = 0;
|
||||
virtual string getChannelName() = 0;
|
||||
virtual void unsetListOwnership();
|
||||
virtual void addAndSetListOwnership(ArrayFIFO<SearchInstance*>* newOwner, Mutex* ownerMutex, int32 index);
|
||||
virtual void removeAndUnsetListOwnership();
|
||||
virtual int32 getOwnerIndex();
|
||||
/**
|
||||
* Send search message.
|
||||
* @return success status.
|
||||
*/
|
||||
virtual bool generateSearchRequestMessage(ByteBuffer* requestMessage, TransportSendControl* control);
|
||||
private:
|
||||
Mutex _mutex;
|
||||
ArrayFIFO<SearchInstance*>* _owner;
|
||||
Mutex* _ownerMutex;
|
||||
int32 _ownerIndex;
|
||||
|
||||
const static int DATA_COUNT_POSITION;
|
||||
const static int PAYLOAD_POSITION;
|
||||
};
|
||||
|
||||
class ChannelSearchManager;
|
||||
/**
|
||||
* SearchTimer.
|
||||
*/
|
||||
class SearchTimer: public TimerCallback
|
||||
{
|
||||
public:
|
||||
/**
|
||||
* Constructor;
|
||||
* @param timerIndex this timer instance index.
|
||||
* @param allowBoost is boost allowed flag.
|
||||
*/
|
||||
SearchTimer(ChannelSearchManager* csmanager,int32 timerIndex, bool allowBoost, bool allowSlowdown);
|
||||
/**
|
||||
* Destructor.
|
||||
*/
|
||||
virtual ~SearchTimer();
|
||||
/**
|
||||
* Shutdown this instance.
|
||||
*/
|
||||
void shutdown();
|
||||
/**
|
||||
* Install channel.
|
||||
* @param channel channel to be registered.
|
||||
*/
|
||||
void installChannel(SearchInstance* channel);
|
||||
/**
|
||||
* Move channels to other <code>SearchTimer</code>.
|
||||
* @param destination where to move channels.
|
||||
*/
|
||||
void moveChannels(SearchTimer* destination);
|
||||
/**
|
||||
* @see TimerCallback#timerStopped()
|
||||
*/
|
||||
void timerStopped();
|
||||
/**
|
||||
* @see TimerCallback#callback()
|
||||
*/
|
||||
void callback();
|
||||
/**
|
||||
* Search response received notification.
|
||||
* @param responseSequenceNumber sequence number of search frame which contained search request.
|
||||
* @param isSequenceNumberValid valid flag of <code>responseSequenceNumber</code>.
|
||||
* @param responseTime time of search response.
|
||||
*/
|
||||
void searchResponse(int32 responseSequenceNumber, bool isSequenceNumberValid, int64 responseTime);
|
||||
/**
|
||||
* Calculate search time period.
|
||||
* @return search time period.
|
||||
*/
|
||||
const int64 period();
|
||||
private:
|
||||
/**
|
||||
* Instance of the channel search manager with which this search timer
|
||||
* is associated.
|
||||
*/
|
||||
ChannelSearchManager* _chanSearchManager;
|
||||
/**
|
||||
* Number of search attempts in one frame.
|
||||
*/
|
||||
volatile int32 _searchAttempts;
|
||||
/**
|
||||
* Number of search responses in one frame.
|
||||
*/
|
||||
volatile int32 _searchRespones;
|
||||
/**
|
||||
* Number of frames per search try.
|
||||
*/
|
||||
double _framesPerTry;
|
||||
/**
|
||||
* Number of frames until congestion threshold is reached.
|
||||
*/
|
||||
double _framesPerTryCongestThresh;
|
||||
/**
|
||||
* Start sequence number (first frame number within a search try).
|
||||
*/
|
||||
volatile int32 _startSequenceNumber;
|
||||
/**
|
||||
* End sequence number (last frame number within a search try).
|
||||
*/
|
||||
volatile int32 _endSequenceNumber;
|
||||
/**
|
||||
* This timer index.
|
||||
*/
|
||||
const int32 _timerIndex;
|
||||
/**
|
||||
* Flag indicating whether boost is allowed.
|
||||
*/
|
||||
const bool _allowBoost;
|
||||
/**
|
||||
* Flag indicating whether slow-down is allowed (for last timer).
|
||||
*/
|
||||
const bool _allowSlowdown;
|
||||
/**
|
||||
* Ordered (as inserted) list of channels with search request pending.
|
||||
*/
|
||||
ArrayFIFO<SearchInstance*>* _requestPendingChannels;
|
||||
/**
|
||||
* Ordered (as inserted) list of channels with search request pending.
|
||||
*/
|
||||
ArrayFIFO<SearchInstance*>* _responsePendingChannels;
|
||||
/**
|
||||
* Timer node.
|
||||
* (sync on requestPendingChannels)
|
||||
*/
|
||||
TimerNode* _timerNode;
|
||||
/**
|
||||
* Cancel this instance.
|
||||
*/
|
||||
volatile bool _canceled;
|
||||
/**
|
||||
* Time of last response check.
|
||||
*/
|
||||
int64 _timeAtResponseCheck;
|
||||
/**
|
||||
* Mutex for request pending channel list.
|
||||
*/
|
||||
Mutex _requestPendingChannelsMutex;
|
||||
/**
|
||||
* General mutex.
|
||||
*/
|
||||
Mutex _mutex;
|
||||
/**
|
||||
* Max search tries per frame.
|
||||
*/
|
||||
static const int32 MAX_FRAMES_PER_TRY;
|
||||
};
|
||||
|
||||
class ChannelSearchManager
|
||||
{
|
||||
public:
|
||||
/**
|
||||
* Constructor.
|
||||
* @param context
|
||||
*/
|
||||
ChannelSearchManager(ClientContextImpl* context);
|
||||
/**
|
||||
* Constructor.
|
||||
* @param context
|
||||
*/
|
||||
virtual ~ChannelSearchManager();
|
||||
/**
|
||||
* Cancel.
|
||||
*/
|
||||
void cancel();
|
||||
/**
|
||||
* Get number of registered channels.
|
||||
* @return number of registered channels.
|
||||
*/
|
||||
int32 registeredChannelCount();
|
||||
/**
|
||||
* Register channel.
|
||||
* @param channel to register.
|
||||
*/
|
||||
void registerChannel(SearchInstance* channel);
|
||||
/**
|
||||
* Unregister channel.
|
||||
* @param channel to unregister.
|
||||
*/
|
||||
void unregisterChannel(SearchInstance* channel);
|
||||
/**
|
||||
* Search response from server (channel found).
|
||||
* @param cid client channel ID.
|
||||
* @param seqNo search sequence number.
|
||||
* @param minorRevision server minor CA revision.
|
||||
* @param serverAddress server address.
|
||||
*/
|
||||
void searchResponse(int32 cid, int32 seqNo, int8 minorRevision, osiSockAddr* serverAddress);
|
||||
/**
|
||||
* Beacon anomaly detected.
|
||||
* Boost searching of all channels.
|
||||
*/
|
||||
void beaconAnomalyNotify();
|
||||
private:
|
||||
/**
|
||||
* Minimal RTT (ms).
|
||||
*/
|
||||
static const int64 MIN_RTT;
|
||||
/**
|
||||
* Maximal RTT (ms).
|
||||
*/
|
||||
static const int64 MAX_RTT;
|
||||
/**
|
||||
* Rate to be considered as OK.
|
||||
*/
|
||||
static const double SUCCESS_RATE;
|
||||
/**
|
||||
* Context.
|
||||
*/
|
||||
ClientContextImpl* _context;
|
||||
/**
|
||||
* Canceled flag.
|
||||
*/
|
||||
volatile bool _canceled;
|
||||
/**
|
||||
* Round-trip time (RTT) mean.
|
||||
*/
|
||||
volatile double _rttmean;
|
||||
/**
|
||||
* Search timers array.
|
||||
* Each timer with a greater index has longer (doubled) search period.
|
||||
*/
|
||||
SearchTimer** _timers;
|
||||
/**
|
||||
* Number of timers in timers array.
|
||||
*/
|
||||
int32 _numberOfTimers;
|
||||
/**
|
||||
* Index of a timer to be used when beacon anomaly is detected.
|
||||
*/
|
||||
int32 _beaconAnomalyTimerIndex;
|
||||
/**
|
||||
* Search (datagram) sequence number.
|
||||
*/
|
||||
volatile int32 _sequenceNumber;
|
||||
/**
|
||||
* Max search period (in ms).
|
||||
*/
|
||||
static const int64 MAX_SEARCH_PERIOD;
|
||||
/**
|
||||
* Max search period (in ms) - lower limit.
|
||||
*/
|
||||
static const int64 MAX_SEARCH_PERIOD_LOWER_LIMIT;
|
||||
/**
|
||||
* Beacon anomaly search period (in ms).
|
||||
*/
|
||||
static const int64 BEACON_ANOMALY_SEARCH_PERIOD;
|
||||
/**
|
||||
* Max number of timers.
|
||||
*/
|
||||
static const int32 MAX_TIMERS;
|
||||
/**
|
||||
* Send byte buffer (frame)
|
||||
*/
|
||||
ByteBuffer* _sendBuffer;
|
||||
/**
|
||||
* Time of last frame send.
|
||||
*/
|
||||
volatile int64 _timeAtLastSend;
|
||||
/**
|
||||
* Set of registered channels.
|
||||
*/
|
||||
std::map<pvAccessID,SearchInstance*> _channels;
|
||||
/**
|
||||
* Iterator for the set of registered channels.
|
||||
*/
|
||||
std::map<pvAccessID,SearchInstance*>::iterator _channelsIter;
|
||||
/**
|
||||
* General mutex.
|
||||
*/
|
||||
Mutex _mutex;
|
||||
/**
|
||||
* Mock transport send control
|
||||
*/
|
||||
TransportSendControl* _mockTransportSendControl;
|
||||
/**
|
||||
* SearchTimer is a friend.
|
||||
*/
|
||||
friend class SearchTimer;
|
||||
/**
|
||||
* Initialize send buffer.
|
||||
*/
|
||||
void initializeSendBuffer();
|
||||
/**
|
||||
* Flush send buffer.
|
||||
*/
|
||||
void flushSendBuffer();
|
||||
/**
|
||||
* Generate (put on send buffer) search request
|
||||
* @param channel
|
||||
* @param allowNewFrame flag indicating if new search request message is allowed to be put in new frame.
|
||||
* @return <code>true</code> if new frame was sent.
|
||||
*/
|
||||
bool generateSearchRequestMessage(SearchInstance* channel, bool allowNewFrame);
|
||||
/**
|
||||
* Notify about search failure (response timeout).
|
||||
* @param channel channel whose search failed.
|
||||
* @param timerIndex index of timer which tries to search.
|
||||
*/
|
||||
void searchResponseTimeout(SearchInstance* channel, int32 timerIndex);
|
||||
/**
|
||||
* Boost searching of a channel.
|
||||
* @param channel channel to boost searching.
|
||||
* @param timerIndex to what timer-index to boost
|
||||
*/
|
||||
void boostSearching(SearchInstance* channel, int32 timerIndex);
|
||||
/**
|
||||
* Update (recalculate) round-trip estimate.
|
||||
* @param rtt new sample of round-trip value.
|
||||
*/
|
||||
void updateRTTE(long rtt);
|
||||
/**
|
||||
* Get round-trip estimate (in ms).
|
||||
* @return round-trip estimate (in ms).
|
||||
*/
|
||||
double getRTTE();
|
||||
/**
|
||||
* Get search (UDP) frame sequence number.
|
||||
* @return search (UDP) frame sequence number.
|
||||
*/
|
||||
int32 getSequenceNumber();
|
||||
/**
|
||||
* Get time at last send (when sendBuffer was flushed).
|
||||
* @return time at last send.
|
||||
*/
|
||||
int64 getTimeAtLastSend();
|
||||
};
|
||||
|
||||
|
||||
}}
|
||||
|
||||
#endif /* CHANNELSEARCHMANAGER_H */
|
||||
Reference in New Issue
Block a user