Merge with 8ddd7d0d0f6862b3ad94474f490855909859637e

This commit is contained in:
miha_vitorovic
2011-01-07 08:46:17 +01:00
7 changed files with 1271 additions and 12 deletions

View File

@@ -53,6 +53,7 @@ INC += beaconEmitter.h
INC += beaconServerStatusProvider.h
INC += beaconHandler.h
INC += blockingTCP.h
INC += channelSearchManager.h
LIBSRCS += blockingUDPTransport.cpp
LIBSRCS += blockingUDPConnector.cpp
LIBSRCS += beaconEmitter.cpp
@@ -63,6 +64,7 @@ LIBSRCS += blockingClientTCPTransport.cpp
LIBSRCS += blockingTCPConnector.cpp
LIBSRCS += blockingServerTCPTransport.cpp
LIBSRCS += blockingTCPAcceptor.cpp
LIBSRCS += channelSearchManager.cpp
LIBRARY = pvAccess
pvAccess_LIBS += Com

View File

@@ -0,0 +1,633 @@
/*
* channelSearchManager.cpp
*/
#include "channelSearchManager.h"
using namespace std;
namespace epics { namespace pvAccess {
const int BaseSearchInstance::DATA_COUNT_POSITION = CA_MESSAGE_HEADER_SIZE + sizeof(int32)/sizeof(int8) + 1;
const int BaseSearchInstance::PAYLOAD_POSITION = sizeof(int16)/sizeof(int8) + 2;
void BaseSearchInstance::unsetListOwnership()
{
Lock guard(&_mutex);
_owner = NULL;
}
void BaseSearchInstance::addAndSetListOwnership(ArrayFIFO<SearchInstance*>* newOwner, Mutex* ownerMutex, int32 index)
{
if(ownerMutex == NULL) throw BaseException("Null owner mutex", __FILE__,__LINE__);
_ownerMutex = ownerMutex;
Lock ownerGuard(_ownerMutex);
Lock guard(&_mutex);
newOwner->push(this);
_owner = newOwner;
_ownerIndex = index;
}
void BaseSearchInstance::removeAndUnsetListOwnership()
{
if(_owner == NULL) return;
if(_ownerMutex == NULL) throw BaseException("Null owner mutex", __FILE__,__LINE__);
Lock ownerGuard(_ownerMutex);
Lock guard(&_mutex);
if(_owner != NULL)
{
_owner->remove(this);
_owner = NULL;
}
}
int32 BaseSearchInstance::getOwnerIndex()
{
Lock guard(&_mutex);
int32 retval = _ownerIndex;
return retval;
}
bool BaseSearchInstance::generateSearchRequestMessage(ByteBuffer* requestMessage, TransportSendControl* control)
{
int16 dataCount = requestMessage->getShort(DATA_COUNT_POSITION);
dataCount++;
if(dataCount >= MAX_SEARCH_BATCH_COUNT)
{
return false;
}
const string name = getChannelName();
// not nice...
const int addedPayloadSize = sizeof(int32)/sizeof(int8) + (1 + sizeof(int32)/sizeof(int8) + name.length());
if(requestMessage->getRemaining() < addedPayloadSize)
{
return false;
}
requestMessage->putInt(getChannelID());
SerializeHelper::serializeString(name, requestMessage, control);
requestMessage->putInt(PAYLOAD_POSITION, requestMessage->getPosition() - CA_MESSAGE_HEADER_SIZE);
requestMessage->putShort(DATA_COUNT_POSITION, dataCount);
return true;
}
const int32 SearchTimer::MAX_FRAMES_PER_TRY = 64;
SearchTimer::SearchTimer(ChannelSearchManager* _chanSearchManager, int32 timerIndex, bool allowBoost, bool allowSlowdown):
_chanSearchManager(_chanSearchManager),
_searchAttempts(0),
_searchRespones(0),
_framesPerTry(1),
_framesPerTryCongestThresh(DBL_MAX),
_startSequenceNumber(0),
_endSequenceNumber(0),
_timerIndex(timerIndex),
_allowBoost(allowBoost),
_allowSlowdown(allowSlowdown),
_requestPendingChannels(new ArrayFIFO<SearchInstance*>),
_responsePendingChannels(new ArrayFIFO<SearchInstance*>),
_timerNode(NULL),
_canceled(false),
_timeAtResponseCheck(0),
_requestPendingChannelsMutex(Mutex()),
_mutex(Mutex())
{
}
SearchTimer::~SearchTimer()
{
if(_requestPendingChannels) delete _requestPendingChannels;
if(_responsePendingChannels) delete _responsePendingChannels;
}
void SearchTimer::shutdown()
{
Lock guard(&_mutex); //the whole method is locked
if(_canceled) return;
_canceled = true;
{
Lock guard(&_requestPendingChannelsMutex);
_timerNode->cancel();
_requestPendingChannels->clear();
_responsePendingChannels->clear();
}
}
void SearchTimer::installChannel(SearchInstance* channel)
{
Lock guard(&_mutex); //the whole method is locked
if(_canceled) return;
Lock pendingChannelGuard(&_requestPendingChannelsMutex);
bool startImmediately = _requestPendingChannels->isEmpty();
channel->addAndSetListOwnership(_requestPendingChannels, &_requestPendingChannelsMutex, _timerIndex);
// start searching
if(startImmediately)
{
_timerNode->cancel();
if(_timeAtResponseCheck == 0)
{
TimeStamp current;
current.getCurrent();
_timeAtResponseCheck = current.getMilliseconds();
}
// start with some initial delay (to collect all installed requests)
_chanSearchManager->_context->getTimer()->scheduleAfterDelay(_timerNode, 0.01);
}
}
void SearchTimer::moveChannels(SearchTimer* destination)
{
// do not sync this, not necessary and might cause deadlock
SearchInstance* channel;
while((channel = _responsePendingChannels->pop()) != NULL)
{
{
Lock guard(&_mutex);
if(_searchAttempts > 0)
{
_searchAttempts--;
}
}
destination->installChannel(channel);
}
// bulk move
Lock guard(&_requestPendingChannelsMutex);
while (!_requestPendingChannels->isEmpty())
{
destination->installChannel(_requestPendingChannels->pop());
}
}
void SearchTimer::timerStopped()
{
//noop
}
void SearchTimer::callback()
{
{
Lock guard(&_mutex);
if(_canceled) return;
}
// if there was some success (no congestion)
// boost search period (if necessary) for channels not recently searched
int32 searchRespones;
{
Lock guard(&_mutex);
searchRespones = _searchRespones;
}
if(_allowBoost && searchRespones > 0)
{
Lock guard(&_requestPendingChannelsMutex);
while(!_requestPendingChannels->isEmpty())
{
SearchInstance* channel = _requestPendingChannels->peek();
// boost needed check
//final int boostIndex = searchRespones >= searchAttempts * SUCCESS_RATE ? Math.min(Math.max(0, timerIndex - 1), beaconAnomalyTimerIndex) : beaconAnomalyTimerIndex;
const int boostIndex = _chanSearchManager->_beaconAnomalyTimerIndex;
if(channel->getOwnerIndex() > boostIndex)
{
_requestPendingChannels->pop();
channel->unsetListOwnership();
_chanSearchManager->boostSearching(channel, boostIndex);
}
}
}
SearchInstance* channel;
// should we check results (installChannel trigger timer immediately)
TimeStamp current;
current.getCurrent();
int64 now = current.getMilliseconds();
if(now - _timeAtResponseCheck >= period())
{
_timeAtResponseCheck = now;
// notify about timeout (move it to other timer)
while((channel = _responsePendingChannels->pop()) != NULL)
{
if(_allowSlowdown)
{
channel->unsetListOwnership();
_chanSearchManager->searchResponseTimeout(channel, _timerIndex);
}
else
{
channel->addAndSetListOwnership(_requestPendingChannels, &_requestPendingChannelsMutex, _timerIndex);
}
}
int32 searchRespones,searchAttempts;
{
Lock guard(&_mutex);
searchAttempts = _searchAttempts;
searchRespones = _searchRespones;
}
// check search results
if(searchAttempts > 0)
{
// increase UDP frames per try if we have a good score
if(searchRespones >= searchAttempts * ChannelSearchManager::SUCCESS_RATE)
{
// increase frames per try
// a congestion avoidance threshold similar to TCP is now used
if(_framesPerTry < MAX_FRAMES_PER_TRY)
{
if(_framesPerTry < _framesPerTryCongestThresh)
{
_framesPerTry = min(2*_framesPerTry, _framesPerTryCongestThresh);
}
else
{
_framesPerTry += 1.0/_framesPerTry;
}
}
}
else
{
// decrease frames per try, fallback
_framesPerTryCongestThresh = _framesPerTry / 2.0;
_framesPerTry = 1;
}
}
}
{
Lock guard(&_mutex);
_startSequenceNumber = _chanSearchManager->getSequenceNumber() + 1;
_searchAttempts = 0;
_searchRespones = 0;
}
int32 framesSent = 0;
int32 triesInFrame = 0;
// reschedule
bool canceled;
{
Lock guard(&_mutex);
canceled = _canceled;
}
{
Lock guard(&_requestPendingChannelsMutex);
while (!canceled && (channel = _requestPendingChannels->pop()) != NULL)
{
channel->unsetListOwnership();
bool requestSent = true;
bool allowNewFrame = (framesSent+1) < _framesPerTry;
bool frameWasSent = _chanSearchManager->generateSearchRequestMessage(channel, allowNewFrame);
if(frameWasSent)
{
framesSent++;
triesInFrame = 0;
if(!allowNewFrame)
{
channel->addAndSetListOwnership(_requestPendingChannels, &_requestPendingChannelsMutex, _timerIndex);
requestSent = false;
}
else
{
triesInFrame++;
}
}
else
{
triesInFrame++;
}
if(requestSent)
{
channel->addAndSetListOwnership(_responsePendingChannels, &_requestPendingChannelsMutex, _timerIndex);
Lock guard(&_mutex);
if(_searchAttempts < INT_MAX)
{
_searchAttempts++;
}
}
// limit
if(triesInFrame == 0 && !allowNewFrame) break;
Lock guard(&_mutex);
canceled = _canceled;
}
}
// flush out the search request buffer
if(triesInFrame > 0)
{
_chanSearchManager->flushSendBuffer();
framesSent++;
}
_endSequenceNumber = _chanSearchManager->getSequenceNumber();
// reschedule
{
Lock guard(&_mutex);
canceled = _canceled;
}
Lock guard(&_requestPendingChannelsMutex);
if(!canceled && !_timerNode->isScheduled())
{
bool someWorkToDo = (!_requestPendingChannels->isEmpty() || !_responsePendingChannels->isEmpty());
if(someWorkToDo)
{
_chanSearchManager->_context->getTimer()->scheduleAfterDelay(_timerNode, period()/1000.0);
}
}
}
void SearchTimer::searchResponse(int32 responseSequenceNumber, bool isSequenceNumberValid, int64 responseTime)
{
bool validResponse = true;
{
Lock guard(&_mutex);
if(_canceled) return;
if(isSequenceNumberValid)
{
validResponse = _startSequenceNumber <= _chanSearchManager->_sequenceNumber && _chanSearchManager->_sequenceNumber <= _endSequenceNumber;
}
}
// update RTTE
if(validResponse)
{
const int64 dt = responseTime - _chanSearchManager->getTimeAtLastSend();
_chanSearchManager->updateRTTE(dt);
Lock guard(&_mutex);
if(_searchRespones < INT_MAX)
{
_searchRespones++;
// all found, send new search requests immediately if necessary
if(_searchRespones == _searchAttempts)
{
if(_requestPendingChannels->size() > 0)
{
_timerNode->cancel();
_chanSearchManager->_context->getTimer()->scheduleAfterDelay(_timerNode, 0.0);
}
}
}
}
}
const int64 SearchTimer::period()
{
return (int64) ((1 << _timerIndex) * _chanSearchManager->getRTTE());
}
const int64 ChannelSearchManager::MIN_RTT = 32;
const int64 ChannelSearchManager::MAX_RTT = 2 * ChannelSearchManager::MIN_RTT;
const double ChannelSearchManager::SUCCESS_RATE = 0.9;
const int64 ChannelSearchManager::MAX_SEARCH_PERIOD = 5 * 60000;
const int64 ChannelSearchManager::MAX_SEARCH_PERIOD_LOWER_LIMIT = 60000;
const int64 ChannelSearchManager::BEACON_ANOMALY_SEARCH_PERIOD = 5000;
const int32 ChannelSearchManager::MAX_TIMERS = 18;
ChannelSearchManager::ChannelSearchManager(ClientContextImpl* context):
_context(context),
_canceled(false),
_rttmean(MIN_RTT),
_sequenceNumber(0)
{
// create and initialize send buffer
_sendBuffer = new ByteBuffer(MAX_UDP_SEND);
initializeSendBuffer();
// TODO should be configurable
int64 maxPeriod = MAX_SEARCH_PERIOD;
maxPeriod = min(maxPeriod, MAX_SEARCH_PERIOD_LOWER_LIMIT);
// calculate number of timers to reach maxPeriod (each timer period is doubled)
double powerOfTwo = log(maxPeriod / (double)MIN_RTT) / log(2);
int32 numberOfTimers = (int32)(powerOfTwo + 1);
numberOfTimers = min(numberOfTimers, MAX_TIMERS);
// calculate beacon anomaly timer index
powerOfTwo = log(BEACON_ANOMALY_SEARCH_PERIOD / (double)MIN_RTT) / log(2);
_beaconAnomalyTimerIndex = (int32)(powerOfTwo + 1);
_beaconAnomalyTimerIndex = min(_beaconAnomalyTimerIndex, numberOfTimers - 1);
// create timers
_timers = new SearchTimer*[numberOfTimers];
for(int i = 0; i < numberOfTimers; i++)
{
_timers[i] = new SearchTimer(this, i, i > _beaconAnomalyTimerIndex, i != (numberOfTimers-1));
}
_numberOfTimers = numberOfTimers;
_mockTransportSendControl = new MockTransportSendControl();
}
ChannelSearchManager::~ChannelSearchManager()
{
for(int i = 0; i < _numberOfTimers; i++)
{
if(_timers[i]) delete _timers[i];
}
if(_timers) delete[] _timers;
if(_sendBuffer) delete _sendBuffer;
if(_mockTransportSendControl) delete _mockTransportSendControl;
}
void ChannelSearchManager::cancel()
{
Lock guard(&_mutex);
if(_canceled) return;
_canceled = true;
if(_timers != NULL)
{
for(int i = 0; i < _numberOfTimers; i++)
{
_timers[i]->shutdown();
}
}
}
int32 ChannelSearchManager::registeredChannelCount()
{
Lock guard(&_mutex);
return _channels.size();
}
void ChannelSearchManager::registerChannel(SearchInstance* channel)
{
Lock guard(&_mutex);
if(_canceled) return;
//overrides if already registered
_channels[channel->getChannelID()] = channel;
_timers[0]->installChannel(channel);
}
void ChannelSearchManager::unregisterChannel(SearchInstance* channel)
{
Lock guard(&_mutex);
_channelsIter = _channels.find(channel->getChannelID());
if(_channelsIter != _channels.end())
{
_channels.erase(channel->getChannelID());
}
channel->removeAndUnsetListOwnership();
}
void ChannelSearchManager::searchResponse(int32 cid, int32 seqNo, int8 minorRevision, osiSockAddr* serverAddress)
{
Lock guard(&_mutex);
// first remove
SearchInstance* si = NULL;
_channelsIter = _channels.find(cid);
if(_channelsIter != _channels.end())
{
SearchInstance* si = _channelsIter->second;
_channels.erase(_channelsIter);
si->removeAndUnsetListOwnership();
}
else
{
// minor hack to enable duplicate reports
si = reinterpret_cast<SearchInstance*>(_context->getChannel(cid));
if(si != NULL)
{
si->searchResponse(minorRevision, serverAddress);
}
return;
}
// report success
const int timerIndex = si->getOwnerIndex();
TimeStamp now;
now.getCurrent();
_timers[timerIndex]->searchResponse(seqNo, seqNo != 0, now.getMilliseconds());
// then notify SearchInstance
si->searchResponse(minorRevision, serverAddress);
}
void ChannelSearchManager::beaconAnomalyNotify()
{
for(int i = _beaconAnomalyTimerIndex + 1; i < _numberOfTimers; i++)
{
_timers[i]->moveChannels(_timers[_beaconAnomalyTimerIndex]);
}
}
void ChannelSearchManager::initializeSendBuffer()
{
Lock guard(&_mutex);
_sequenceNumber++;
// new buffer
_sendBuffer->clear();
_sendBuffer->putShort(CA_MAGIC_AND_VERSION);
_sendBuffer->putByte((int8)0); // data
_sendBuffer->putByte((int8)3); // beacon
_sendBuffer->putInt(sizeof(int32)/sizeof(int8) + 1); // "zero" payload
_sendBuffer->putInt(_sequenceNumber);
/*
final boolean REQUIRE_REPLY = false;
sendBuffer.put(REQUIRE_REPLY ? (byte)QoS.REPLY_REQUIRED.getMaskValue() : (byte)QoS.DEFAULT.getMaskValue());
*/
_sendBuffer->putByte((int8)DEFAULT);
_sendBuffer->putShort((int16)0); // count
}
void ChannelSearchManager::flushSendBuffer()
{
Lock guard(&_mutex);
TimeStamp now;
now.getCurrent();
_timeAtLastSend = now.getMilliseconds();
_context->getSearchTransport()->send(_sendBuffer);
initializeSendBuffer();
}
bool ChannelSearchManager::generateSearchRequestMessage(SearchInstance* channel, bool allowNewFrame)
{
Lock guard(&_mutex);
bool success = channel->generateSearchRequestMessage(_sendBuffer, _mockTransportSendControl);
// buffer full, flush
if(!success)
{
flushSendBuffer();
if(allowNewFrame)
{
channel->generateSearchRequestMessage(_sendBuffer, _mockTransportSendControl);
}
return true;
}
return false;
}
void ChannelSearchManager::searchResponseTimeout(SearchInstance* channel, int32 timerIndex)
{
int32 newTimerIndex = min(++timerIndex, _numberOfTimers - 1);
_timers[newTimerIndex]->installChannel(channel);
}
void ChannelSearchManager::boostSearching(SearchInstance* channel, int32 timerIndex)
{
_timers[timerIndex]->installChannel(channel);
}
inline void ChannelSearchManager::updateRTTE(long rtt)
{
Lock guard(&_mutex);
const double error = rtt - _rttmean;
_rttmean += error / 4.0;
}
inline double ChannelSearchManager::getRTTE()
{
Lock guard(&_mutex);
double rtte = min(max((double)_rttmean, (double)MIN_RTT), (double)MAX_RTT);
return rtte;
}
inline int32 ChannelSearchManager::getSequenceNumber()
{
Lock guard(&_mutex);
int32 retval = _sequenceNumber;
return retval;
}
inline int64 ChannelSearchManager::getTimeAtLastSend()
{
Lock guard(&_mutex);
int64 retval = _timeAtLastSend;
return retval;
}
}}

View File

@@ -0,0 +1,610 @@
/*
* channelSearchManager.h
*/
#ifndef CHANNELSEARCHMANAGER_H
#define CHANNELSEARCHMANAGER_H
#include "remote.h"
#include "pvAccess.h"
#include "arrayFIFO.h"
#include "caConstants.h"
#include "blockingUDP.h"
#include <timeStamp.h>
#include <osiSock.h>
#include <lock.h>
#include <timer.h>
#include <iostream>
#include <float.h>
#include <math.h>
using namespace epics::pvData;
namespace epics { namespace pvAccess {
typedef int32 pvAccessID;
enum QoS {
/**
* Default behavior.
*/
DEFAULT = 0x00,
/**
* Require reply (acknowledgment for reliable operation).
*/
REPLY_REQUIRED = 0x01,
/**
* Best-effort option (no reply).
*/
BESY_EFFORT = 0x02,
/**
* Process option.
*/
PROCESS = 0x04,
/**
* Initialize option.
*/
INIT = 0x08,
/**
* Destroy option.
*/
DESTROY = 0x10,
/**
* Share data option.
*/
SHARE = 0x20,
/**
* Get.
*/
GET = 0x40,
/**
* Get-put.
*/
GET_PUT =0x80
};
//TODO this will be deleted
class ChannelImpl;
class ChannelSearchManager;
class ClientContextImpl : public ClientContext
{
public:
ClientContextImpl()
{
}
virtual Version* getVersion() {
return NULL;
}
virtual ChannelProvider* getProvider() {
return NULL;
}
Timer* getTimer()
{
return NULL;
}
virtual void initialize() {
}
virtual void printInfo() {
}
virtual void printInfo(epics::pvData::StringBuilder out) {
}
virtual void destroy()
{
}
virtual void dispose()
{
}
BlockingUDPTransport* getSearchTransport()
{
return NULL;
}
/**
* Searches for a channel with given channel ID.
* @param channelID CID.
* @return channel with given CID, <code>0</code> if non-existent.
*/
ChannelImpl* getChannel(pvAccessID channelID)
{
return NULL;
}
private:
~ClientContextImpl() {};
void loadConfiguration() {
}
void internalInitialize() {
}
void initializeUDPTransport() {
}
void internalDestroy() {
}
void destroyAllChannels() {
}
/**
* Check channel name.
*/
void checkChannelName(String& name) {
}
/**
* Check context state and tries to establish necessary state.
*/
void checkState() {
}
/**
* Generate Client channel ID (CID).
* @return Client channel ID (CID).
*/
pvAccessID generateCID()
{
return 0;
}
/**
* Free generated channel ID (CID).
*/
void freeCID(int cid)
{
}
/**
* Get, or create if necessary, transport of given server address.
* @param serverAddress required transport address
* @param priority process priority.
* @return transport for given address
*/
Transport* getTransport(TransportClient* client, osiSockAddr* serverAddress, int16 minorRevision, int16 priority)
{
return NULL;
}
/**
* Internal create channel.
*/
// TODO no minor version with the addresses
// TODO what if there is an channel with the same name, but on different host!
Channel* createChannelInternal(String name, ChannelRequester* requester, short priority,
InetAddrVector* addresses) {
return NULL;
}
/**
* Destroy channel.
* @param channel
* @param force
* @throws CAException
* @throws IllegalStateException
*/
void destroyChannel(ChannelImpl* channel, bool force) {
}
/**
* Get channel search manager.
* @return channel search manager.
*/
ChannelSearchManager* getChannelSearchManager() {
return NULL;
}
};
//TODO check the const of paramerers
/**
* SearchInstance.
*/
//TODO document
class SearchInstance {
public:
virtual ~SearchInstance() {};
virtual pvAccessID getChannelID() = 0;
virtual String getChannelName() = 0;
virtual void unsetListOwnership() = 0;
virtual void addAndSetListOwnership(ArrayFIFO<SearchInstance*>* newOwner, Mutex* ownerMutex, int32 index) = 0;
virtual void removeAndUnsetListOwnership() = 0;
virtual int32 getOwnerIndex() = 0;
virtual bool generateSearchRequestMessage(ByteBuffer* requestMessage, TransportSendControl* control) = 0;
/**
* Search response from server (channel found).
* @param minorRevision server minor CA revision.
* @param serverAddress server address.
*/
virtual void searchResponse(int8 minorRevision, osiSockAddr* serverAddress) = 0;
};
/**
* BaseSearchInstance.
*/
class BaseSearchInstance : public SearchInstance
{
public:
virtual ~BaseSearchInstance() {};
virtual pvAccessID getChannelID() = 0;
virtual string getChannelName() = 0;
virtual void unsetListOwnership();
virtual void addAndSetListOwnership(ArrayFIFO<SearchInstance*>* newOwner, Mutex* ownerMutex, int32 index);
virtual void removeAndUnsetListOwnership();
virtual int32 getOwnerIndex();
/**
* Send search message.
* @return success status.
*/
virtual bool generateSearchRequestMessage(ByteBuffer* requestMessage, TransportSendControl* control);
private:
Mutex _mutex;
ArrayFIFO<SearchInstance*>* _owner;
Mutex* _ownerMutex;
int32 _ownerIndex;
const static int DATA_COUNT_POSITION;
const static int PAYLOAD_POSITION;
};
class ChannelSearchManager;
/**
* SearchTimer.
*/
class SearchTimer: public TimerCallback
{
public:
/**
* Constructor;
* @param timerIndex this timer instance index.
* @param allowBoost is boost allowed flag.
*/
SearchTimer(ChannelSearchManager* csmanager,int32 timerIndex, bool allowBoost, bool allowSlowdown);
/**
* Destructor.
*/
virtual ~SearchTimer();
/**
* Shutdown this instance.
*/
void shutdown();
/**
* Install channel.
* @param channel channel to be registered.
*/
void installChannel(SearchInstance* channel);
/**
* Move channels to other <code>SearchTimer</code>.
* @param destination where to move channels.
*/
void moveChannels(SearchTimer* destination);
/**
* @see TimerCallback#timerStopped()
*/
void timerStopped();
/**
* @see TimerCallback#callback()
*/
void callback();
/**
* Search response received notification.
* @param responseSequenceNumber sequence number of search frame which contained search request.
* @param isSequenceNumberValid valid flag of <code>responseSequenceNumber</code>.
* @param responseTime time of search response.
*/
void searchResponse(int32 responseSequenceNumber, bool isSequenceNumberValid, int64 responseTime);
/**
* Calculate search time period.
* @return search time period.
*/
const int64 period();
private:
/**
* Instance of the channel search manager with which this search timer
* is associated.
*/
ChannelSearchManager* _chanSearchManager;
/**
* Number of search attempts in one frame.
*/
volatile int32 _searchAttempts;
/**
* Number of search responses in one frame.
*/
volatile int32 _searchRespones;
/**
* Number of frames per search try.
*/
double _framesPerTry;
/**
* Number of frames until congestion threshold is reached.
*/
double _framesPerTryCongestThresh;
/**
* Start sequence number (first frame number within a search try).
*/
volatile int32 _startSequenceNumber;
/**
* End sequence number (last frame number within a search try).
*/
volatile int32 _endSequenceNumber;
/**
* This timer index.
*/
const int32 _timerIndex;
/**
* Flag indicating whether boost is allowed.
*/
const bool _allowBoost;
/**
* Flag indicating whether slow-down is allowed (for last timer).
*/
const bool _allowSlowdown;
/**
* Ordered (as inserted) list of channels with search request pending.
*/
ArrayFIFO<SearchInstance*>* _requestPendingChannels;
/**
* Ordered (as inserted) list of channels with search request pending.
*/
ArrayFIFO<SearchInstance*>* _responsePendingChannels;
/**
* Timer node.
* (sync on requestPendingChannels)
*/
TimerNode* _timerNode;
/**
* Cancel this instance.
*/
volatile bool _canceled;
/**
* Time of last response check.
*/
int64 _timeAtResponseCheck;
/**
* Mutex for request pending channel list.
*/
Mutex _requestPendingChannelsMutex;
/**
* General mutex.
*/
Mutex _mutex;
/**
* Max search tries per frame.
*/
static const int32 MAX_FRAMES_PER_TRY;
};
class MockTransportSendControl: public TransportSendControl
{
public:
void endMessage() {}
void flush(bool lastMessageCompleted) {}
void setRecipient(const osiSockAddr* sendTo) {}
void startMessage(int8 command, int32 ensureCapacity) {}
void ensureBuffer(int32 size) {}
void flushSerializeBuffer() {}
};
class ChannelSearchManager
{
public:
/**
* Constructor.
* @param context
*/
ChannelSearchManager(ClientContextImpl* context);
/**
* Constructor.
* @param context
*/
virtual ~ChannelSearchManager();
/**
* Cancel.
*/
void cancel();
/**
* Get number of registered channels.
* @return number of registered channels.
*/
int32 registeredChannelCount();
/**
* Register channel.
* @param channel to register.
*/
void registerChannel(SearchInstance* channel);
/**
* Unregister channel.
* @param channel to unregister.
*/
void unregisterChannel(SearchInstance* channel);
/**
* Search response from server (channel found).
* @param cid client channel ID.
* @param seqNo search sequence number.
* @param minorRevision server minor CA revision.
* @param serverAddress server address.
*/
void searchResponse(int32 cid, int32 seqNo, int8 minorRevision, osiSockAddr* serverAddress);
/**
* Beacon anomaly detected.
* Boost searching of all channels.
*/
void beaconAnomalyNotify();
private:
/**
* Minimal RTT (ms).
*/
static const int64 MIN_RTT;
/**
* Maximal RTT (ms).
*/
static const int64 MAX_RTT;
/**
* Rate to be considered as OK.
*/
static const double SUCCESS_RATE;
/**
* Context.
*/
ClientContextImpl* _context;
/**
* Canceled flag.
*/
volatile bool _canceled;
/**
* Round-trip time (RTT) mean.
*/
volatile double _rttmean;
/**
* Search timers array.
* Each timer with a greater index has longer (doubled) search period.
*/
SearchTimer** _timers;
/**
* Number of timers in timers array.
*/
int32 _numberOfTimers;
/**
* Index of a timer to be used when beacon anomaly is detected.
*/
int32 _beaconAnomalyTimerIndex;
/**
* Search (datagram) sequence number.
*/
volatile int32 _sequenceNumber;
/**
* Max search period (in ms).
*/
static const int64 MAX_SEARCH_PERIOD;
/**
* Max search period (in ms) - lower limit.
*/
static const int64 MAX_SEARCH_PERIOD_LOWER_LIMIT;
/**
* Beacon anomaly search period (in ms).
*/
static const int64 BEACON_ANOMALY_SEARCH_PERIOD;
/**
* Max number of timers.
*/
static const int32 MAX_TIMERS;
/**
* Send byte buffer (frame)
*/
ByteBuffer* _sendBuffer;
/**
* Time of last frame send.
*/
volatile int64 _timeAtLastSend;
/**
* Set of registered channels.
*/
std::map<pvAccessID,SearchInstance*> _channels;
/**
* Iterator for the set of registered channels.
*/
std::map<pvAccessID,SearchInstance*>::iterator _channelsIter;
/**
* General mutex.
*/
Mutex _mutex;
/**
* Mock transport send control
*/
MockTransportSendControl* _mockTransportSendControl;
/**
* SearchTimer is a friend.
*/
friend class SearchTimer;
/**
* Initialize send buffer.
*/
void initializeSendBuffer();
/**
* Flush send buffer.
*/
void flushSendBuffer();
/**
* Generate (put on send buffer) search request
* @param channel
* @param allowNewFrame flag indicating if new search request message is allowed to be put in new frame.
* @return <code>true</code> if new frame was sent.
*/
bool generateSearchRequestMessage(SearchInstance* channel, bool allowNewFrame);
/**
* Notify about search failure (response timeout).
* @param channel channel whose search failed.
* @param timerIndex index of timer which tries to search.
*/
void searchResponseTimeout(SearchInstance* channel, int32 timerIndex);
/**
* Boost searching of a channel.
* @param channel channel to boost searching.
* @param timerIndex to what timer-index to boost
*/
void boostSearching(SearchInstance* channel, int32 timerIndex);
/**
* Update (recalculate) round-trip estimate.
* @param rtt new sample of round-trip value.
*/
void updateRTTE(long rtt);
/**
* Get round-trip estimate (in ms).
* @return round-trip estimate (in ms).
*/
double getRTTE();
/**
* Get search (UDP) frame sequence number.
* @return search (UDP) frame sequence number.
*/
int32 getSequenceNumber();
/**
* Get time at last send (when sendBuffer was flushed).
* @return time at last send.
*/
int64 getTimeAtLastSend();
};
}}
#endif /* CHANNELSEARCHMANAGER_H */

View File

@@ -13,7 +13,7 @@ ReferenceCountingLock::ReferenceCountingLock(): _references(1)
if(retval != 0)
{
//string errMsg = "Error: pthread_mutexattr_init failed: " + string(strerror(retval));
assert(true);
assert(false);
}
retval = pthread_mutexattr_settype(&mutexAttribute, PTHREAD_MUTEX_RECURSIVE);
if(retval == 0)
@@ -22,13 +22,13 @@ ReferenceCountingLock::ReferenceCountingLock(): _references(1)
if(retval != 0)
{
//string errMsg = "Error: pthread_mutex_init failed: " + string(strerror(retval));
assert(true);
assert(false);
}
}
else
{
//string errMsg = "Error: pthread_mutexattr_settype failed: " + string(strerror(retval));
assert(true);
assert(false);
}
pthread_mutexattr_destroy(&mutexAttribute);
@@ -63,7 +63,7 @@ void ReferenceCountingLock::release()
int retval = pthread_mutex_unlock(&_mutex);
if(retval != 0)
{
string errMsg = "Error: pthread_mutex_unlock failed: " + string(strerror(retval));
//string errMsg = "Error: pthread_mutex_unlock failed: " + string(strerror(retval));
//TODO do something?
}
}

View File

@@ -71,4 +71,4 @@ private:
}}
#endif /* NAMEDLOCKPATTERN_H */
#endif /* REFERENCECOUNTINGLOCK_H */

View File

@@ -22,13 +22,9 @@ PROD_HOST += testBeaconHandler
testBeaconHandler_SRCS += testBeaconHandler.cpp
testBeaconHandler_LIBS += pvData pvAccess Com
PROD_HOST += testBlockingTCPSrv
testBlockingTCPSrv_SRCS += testBlockingTCPSrv.cpp
testBlockingTCPSrv_LIBS += pvData pvAccess Com
PROD_HOST += testBlockingTCPClnt
testBlockingTCPClnt_SRCS += testBlockingTCPClnt
testBlockingTCPClnt_LIBS += pvData pvAccess Com
PROD_HOST += testChannelSearchManager
testChannelSearchManager_SRCS += testChannelSearchManager.cpp
testChannelSearchManager_LIBS += pvData pvAccess Com
include $(TOP)/configure/RULES
#----------------------------------------

View File

@@ -0,0 +1,18 @@
/* testChannelSearcManager.cpp */
#include <channelSearchManager.h>
using namespace epics::pvData;
using namespace epics::pvAccess;
int main(int argc,char *argv[])
{
ClientContextImpl* context = new ClientContextImpl();
ChannelSearchManager* manager = new ChannelSearchManager(context);
context->destroy();
getShowConstructDestruct()->constuctDestructTotals(stdout);
return(0);
}