From 3dcc75bb5714747c4986647938362aab62263a19 Mon Sep 17 00:00:00 2001 From: Gasper Jansa Date: Thu, 6 Jan 2011 15:59:40 +0100 Subject: [PATCH 1/3] ChannelSearchManager implementation --- pvAccessApp/remote/channelSearchManager.cpp | 632 ++++++++++++++++++++ pvAccessApp/remote/channelSearchManager.h | 393 ++++++++++++ 2 files changed, 1025 insertions(+) create mode 100644 pvAccessApp/remote/channelSearchManager.cpp create mode 100644 pvAccessApp/remote/channelSearchManager.h diff --git a/pvAccessApp/remote/channelSearchManager.cpp b/pvAccessApp/remote/channelSearchManager.cpp new file mode 100644 index 0000000..033ff00 --- /dev/null +++ b/pvAccessApp/remote/channelSearchManager.cpp @@ -0,0 +1,632 @@ +/* + * channelSearchManager.cpp + */ + +#include "channelSearchManager.h" + +using namespace std; + +namespace epics { namespace pvAccess { + +const int BaseSearchInstance::DATA_COUNT_POSITION = CA_MESSAGE_HEADER_SIZE + sizeof(int32)/sizeof(int8) + 1; +const int BaseSearchInstance::PAYLOAD_POSITION = sizeof(int16)/sizeof(int8) + 2; + +void BaseSearchInstance::unsetListOwnership() +{ + Lock guard(&_mutex); + _owner = NULL; +} + +void BaseSearchInstance::addAndSetListOwnership(ArrayFIFO* newOwner, Mutex* ownerMutex, int32 index) +{ + if(ownerMutex == NULL) throw BaseException("Null owner mutex", __FILE__,__LINE__); + + _ownerMutex = ownerMutex; + Lock ownerGuard(_ownerMutex); + Lock guard(&_mutex); + newOwner->push(this); + _owner = newOwner; + _ownerIndex = index; +} + +void BaseSearchInstance::removeAndUnsetListOwnership() +{ + if(_owner == NULL) return; + + if(_ownerMutex == NULL) throw BaseException("Null owner mutex", __FILE__,__LINE__); + Lock ownerGuard(_ownerMutex); + Lock guard(&_mutex); + if(_owner != NULL) + { + _owner->remove(this); + _owner = NULL; + } +} + +int32 BaseSearchInstance::getOwnerIndex() +{ + Lock guard(&_mutex); + int32 retval = _ownerIndex; + return retval; +} + +bool BaseSearchInstance::generateSearchRequestMessage(ByteBuffer* requestMessage, TransportSendControl* control) +{ + int16 dataCount = requestMessage->getShort(DATA_COUNT_POSITION); + + dataCount++; + if(dataCount >= MAX_SEARCH_BATCH_COUNT) + { + return false; + } + + const string name = getChannelName(); + // not nice... + const int addedPayloadSize = sizeof(int32)/sizeof(int8) + (1 + sizeof(int32)/sizeof(int8) + name.length()); + + if(requestMessage->getRemaining() < addedPayloadSize) + { + return false; + } + + requestMessage->putInt(getChannelID()); + SerializeHelper::serializeString(name, requestMessage, control); + + requestMessage->putInt(PAYLOAD_POSITION, requestMessage->getPosition() - CA_MESSAGE_HEADER_SIZE); + requestMessage->putShort(DATA_COUNT_POSITION, dataCount); + return true; +} + +const int32 SearchTimer::MAX_FRAMES_PER_TRY = 64; + +SearchTimer::SearchTimer(ChannelSearchManager* _chanSearchManager, int32 timerIndex, bool allowBoost, bool allowSlowdown): + _chanSearchManager(_chanSearchManager), + _searchAttempts(0), + _searchRespones(0), + _framesPerTry(1), + _framesPerTryCongestThresh(DBL_MAX), + _startSequenceNumber(0), + _endSequenceNumber(0), + _timerIndex(timerIndex), + _allowBoost(allowBoost), + _allowSlowdown(allowSlowdown), + _requestPendingChannels(new ArrayFIFO), + _responsePendingChannels(new ArrayFIFO), + _timerNode(NULL), + _canceled(false), + _timeAtResponseCheck(0), + _requestPendingChannelsMutex(Mutex()), + _mutex(Mutex()) +{ + +} + +SearchTimer::~SearchTimer() +{ + if(_requestPendingChannels) delete _requestPendingChannels; + if(_responsePendingChannels) delete _responsePendingChannels; +} + +void SearchTimer::shutdown() +{ + Lock guard(&_mutex); //the whole method is locked + if(_canceled) return; + _canceled = true; + + { + Lock guard(&_requestPendingChannelsMutex); + _timerNode->cancel(); + + _requestPendingChannels->clear(); + _responsePendingChannels->clear(); + } +} + +void SearchTimer::installChannel(SearchInstance* channel) +{ + Lock guard(&_mutex); //the whole method is locked + if(_canceled) return; + + + Lock pendingChannelGuard(&_requestPendingChannelsMutex); + bool startImmediately = _requestPendingChannels->isEmpty(); + channel->addAndSetListOwnership(_requestPendingChannels, &_requestPendingChannelsMutex, _timerIndex); + + // start searching + if(startImmediately) + { + _timerNode->cancel(); + if(_timeAtResponseCheck == 0) + { + TimeStamp current; + current.getCurrent(); + _timeAtResponseCheck = current.getMilliseconds(); + } + + // start with some initial delay (to collect all installed requests) + _chanSearchManager->_context->getTimer()->scheduleAfterDelay(_timerNode, 0.01); + } +} + +void SearchTimer::moveChannels(SearchTimer* destination) +{ + // do not sync this, not necessary and might cause deadlock + SearchInstance* channel; + while((channel = _responsePendingChannels->pop()) != NULL) + { + { + Lock guard(&_mutex); + if(_searchAttempts > 0) + { + _searchAttempts--; + } + } + destination->installChannel(channel); + } + + // bulk move + Lock guard(&_requestPendingChannelsMutex); + while (!_requestPendingChannels->isEmpty()) + { + destination->installChannel(_requestPendingChannels->pop()); + } +} + +void SearchTimer::timerStopped() +{ + //noop +} + +void SearchTimer::callback() +{ + { + Lock guard(&_mutex); + if(_canceled) return; + } + + // if there was some success (no congestion) + // boost search period (if necessary) for channels not recently searched + int32 searchRespones; + { + Lock guard(&_mutex); + searchRespones = _searchRespones; + } + if(_allowBoost && searchRespones > 0) + { + Lock guard(&_requestPendingChannelsMutex); + while(!_requestPendingChannels->isEmpty()) + { + SearchInstance* channel = _requestPendingChannels->peek(); + // boost needed check + //final int boostIndex = searchRespones >= searchAttempts * SUCCESS_RATE ? Math.min(Math.max(0, timerIndex - 1), beaconAnomalyTimerIndex) : beaconAnomalyTimerIndex; + const int boostIndex = _chanSearchManager->_beaconAnomalyTimerIndex; + if(channel->getOwnerIndex() > boostIndex) + { + _requestPendingChannels->pop(); + channel->unsetListOwnership(); + _chanSearchManager->boostSearching(channel, boostIndex); + } + } + } + + SearchInstance* channel; + + // should we check results (installChannel trigger timer immediately) + TimeStamp current; + current.getCurrent(); + int64 now = current.getMilliseconds(); + if(now - _timeAtResponseCheck >= period()) + { + _timeAtResponseCheck = now; + + // notify about timeout (move it to other timer) + while((channel = _responsePendingChannels->pop()) != NULL) + { + if(_allowSlowdown) + { + channel->unsetListOwnership(); + _chanSearchManager->searchResponseTimeout(channel, _timerIndex); + } + else + { + channel->addAndSetListOwnership(_requestPendingChannels, &_requestPendingChannelsMutex, _timerIndex); + } + } + + int32 searchRespones,searchAttempts; + { + Lock guard(&_mutex); + searchAttempts = _searchAttempts; + searchRespones = _searchRespones; + } + // check search results + if(searchAttempts > 0) + { + // increase UDP frames per try if we have a good score + if(searchRespones >= searchAttempts * ChannelSearchManager::SUCCESS_RATE) + { + // increase frames per try + // a congestion avoidance threshold similar to TCP is now used + if(_framesPerTry < MAX_FRAMES_PER_TRY) + { + if(_framesPerTry < _framesPerTryCongestThresh) + { + _framesPerTry = min(2*_framesPerTry, _framesPerTryCongestThresh); + } + else + { + _framesPerTry += 1.0/_framesPerTry; + } + } + } + else + { + // decrease frames per try, fallback + _framesPerTryCongestThresh = _framesPerTry / 2.0; + _framesPerTry = 1; + } + + } + } + + + { + Lock guard(&_mutex); + _startSequenceNumber = _chanSearchManager->getSequenceNumber() + 1; + _searchAttempts = 0; + _searchRespones = 0; + } + + int32 framesSent = 0; + int32 triesInFrame = 0; + + // reschedule + bool canceled; + { + Lock guard(&_mutex); + canceled = _canceled; + } + + { + Lock guard(&_requestPendingChannelsMutex); + while (!canceled && (channel = _requestPendingChannels->pop()) != NULL) + { + channel->unsetListOwnership(); + + bool requestSent = true; + bool allowNewFrame = (framesSent+1) < _framesPerTry; + bool frameWasSent = _chanSearchManager->generateSearchRequestMessage(channel, allowNewFrame); + if(frameWasSent) + { + framesSent++; + triesInFrame = 0; + if(!allowNewFrame) + { + channel->addAndSetListOwnership(_requestPendingChannels, &_requestPendingChannelsMutex, _timerIndex); + requestSent = false; + } + else + { + triesInFrame++; + } + } + else + { + triesInFrame++; + } + + if(requestSent) + { + channel->addAndSetListOwnership(_responsePendingChannels, &_requestPendingChannelsMutex, _timerIndex); + Lock guard(&_mutex); + if(_searchAttempts < INT_MAX) + { + _searchAttempts++; + } + } + + // limit + if(triesInFrame == 0 && !allowNewFrame) break; + + Lock guard(&_mutex); + canceled = _canceled; + } + } + + // flush out the search request buffer + if(triesInFrame > 0) + { + _chanSearchManager->flushSendBuffer(); + framesSent++; + } + + _endSequenceNumber = _chanSearchManager->getSequenceNumber(); + + // reschedule + { + Lock guard(&_mutex); + canceled = _canceled; + } + Lock guard(&_requestPendingChannelsMutex); + if(!canceled && !_timerNode->isScheduled()) + { + bool someWorkToDo = (!_requestPendingChannels->isEmpty() || !_responsePendingChannels->isEmpty()); + if(someWorkToDo) + { + _chanSearchManager->_context->getTimer()->scheduleAfterDelay(_timerNode, period()/1000.0); + } + } +} + +void SearchTimer::searchResponse(int32 responseSequenceNumber, bool isSequenceNumberValid, int64 responseTime) +{ + bool validResponse = true; + { + Lock guard(&_mutex); + if(_canceled) return; + + if(isSequenceNumberValid) + { + validResponse = _startSequenceNumber <= _chanSearchManager->_sequenceNumber && _chanSearchManager->_sequenceNumber <= _endSequenceNumber; + } + } + + + // update RTTE + if(validResponse) + { + const int64 dt = responseTime - _chanSearchManager->getTimeAtLastSend(); + _chanSearchManager->updateRTTE(dt); + Lock guard(&_mutex); + if(_searchRespones < INT_MAX) + { + _searchRespones++; + + // all found, send new search requests immediately if necessary + if(_searchRespones == _searchAttempts) + { + if(_requestPendingChannels->size() > 0) + { + _timerNode->cancel(); + _chanSearchManager->_context->getTimer()->scheduleAfterDelay(_timerNode, 0.0); + } + } + } + } +} + +const int64 SearchTimer::period() +{ + return (int64) ((1 << _timerIndex) * _chanSearchManager->getRTTE()); +} + +const int64 ChannelSearchManager::MIN_RTT = 32; +const int64 ChannelSearchManager::MAX_RTT = 2 * ChannelSearchManager::MIN_RTT; +const double ChannelSearchManager::SUCCESS_RATE = 0.9; +const int64 ChannelSearchManager::MAX_SEARCH_PERIOD = 5 * 60000; +const int64 ChannelSearchManager::MAX_SEARCH_PERIOD_LOWER_LIMIT = 60000; +const int64 ChannelSearchManager::BEACON_ANOMALY_SEARCH_PERIOD = 5000; +const int32 ChannelSearchManager::MAX_TIMERS = 18; + +ChannelSearchManager::ChannelSearchManager(ClientContextImpl* context): + _context(context), + _canceled(false), + _rttmean(MIN_RTT), + _sequenceNumber(0) +{ + // create and initialize send buffer + _sendBuffer = new ByteBuffer(MAX_UDP_SEND); + initializeSendBuffer(); + + // TODO should be configurable + int64 maxPeriod = MAX_SEARCH_PERIOD; + + maxPeriod = min(maxPeriod, MAX_SEARCH_PERIOD_LOWER_LIMIT); + + // calculate number of timers to reach maxPeriod (each timer period is doubled) + double powerOfTwo = log(maxPeriod / (double)MIN_RTT) / log(2); + int32 numberOfTimers = (int32)(powerOfTwo + 1); + numberOfTimers = min(numberOfTimers, MAX_TIMERS); + + // calculate beacon anomaly timer index + powerOfTwo = log(BEACON_ANOMALY_SEARCH_PERIOD / (double)MIN_RTT) / log(2); + _beaconAnomalyTimerIndex = (int32)(powerOfTwo + 1); + _beaconAnomalyTimerIndex = min(_beaconAnomalyTimerIndex, numberOfTimers - 1); + + // create timers + _timers = new SearchTimer*[numberOfTimers]; + for(int i = 0; i < numberOfTimers; i++) + { + _timers[i] = new SearchTimer(this, i, i > _beaconAnomalyTimerIndex, i != (numberOfTimers-1)); + } + _numberOfTimers = numberOfTimers; +} + +ChannelSearchManager::~ChannelSearchManager() +{ + for(int i = 0; i < _numberOfTimers; i++) + { + if(_timers[i]) delete _timers[i]; + } + if(_timers) delete[] _timers; + if(_sendBuffer) delete _sendBuffer; +} + +void ChannelSearchManager::cancel() +{ + Lock guard(&_mutex); + if(_canceled) return; + + _canceled = true; + + if(_timers != NULL) + { + for(int i = 0; i < _numberOfTimers; i++) + { + _timers[i]->shutdown(); + } + } +} + +int32 ChannelSearchManager::registeredChannelCount() +{ + Lock guard(&_mutex); + return _channels.size(); +} + +void ChannelSearchManager::registerChannel(SearchInstance* channel) +{ + Lock guard(&_mutex); + if(_canceled) return; + + //overrides if already registered + _channels[channel->getChannelID()] = channel; + _timers[0]->installChannel(channel); +} + +void ChannelSearchManager::unregisterChannel(SearchInstance* channel) +{ + Lock guard(&_mutex); + _channelsIter = _channels.find(channel->getChannelID()); + if(_channelsIter != _channels.end()) + { + _channels.erase(channel->getChannelID()); + } + + channel->removeAndUnsetListOwnership(); +} + +void ChannelSearchManager::searchResponse(int32 cid, int32 seqNo, int8 minorRevision, osiSockAddr* serverAddress) +{ + Lock guard(&_mutex); + // first remove + SearchInstance* si = NULL; + _channelsIter = _channels.find(cid); + if(_channelsIter != _channels.end()) + { + SearchInstance* si = _channelsIter->second; + _channels.erase(_channelsIter); + si->removeAndUnsetListOwnership(); + } + else + { + // minor hack to enable duplicate reports + si = static_cast(_context->getChannel(cid)); + if(si != NULL) + { + si->searchResponse(minorRevision, serverAddress); + } + return; + } + + // report success + const int timerIndex = si->getOwnerIndex(); + TimeStamp now; + now.getCurrent(); + _timers[timerIndex]->searchResponse(seqNo, seqNo != 0, now.getMilliseconds()); + + // then notify SearchInstance + si->searchResponse(minorRevision, serverAddress); +} + +void ChannelSearchManager::beaconAnomalyNotify() +{ + for(int i = _beaconAnomalyTimerIndex + 1; i < _numberOfTimers; i++) + { + _timers[i]->moveChannels(_timers[_beaconAnomalyTimerIndex]); + } +} + +void ChannelSearchManager::initializeSendBuffer() +{ + Lock guard(&_mutex); + _sequenceNumber++; + + + // new buffer + _sendBuffer->clear(); + _sendBuffer->putShort(CA_MAGIC_AND_VERSION); + _sendBuffer->putByte((int8)0); // data + _sendBuffer->putByte((int8)3); // beacon + _sendBuffer->putInt(sizeof(int32)/sizeof(int8) + 1); // "zero" payload + _sendBuffer->putInt(_sequenceNumber); + + /* + final boolean REQUIRE_REPLY = false; + sendBuffer.put(REQUIRE_REPLY ? (byte)QoS.REPLY_REQUIRED.getMaskValue() : (byte)QoS.DEFAULT.getMaskValue()); + */ + + //TODO implement Qos + //_sendBuffer->put((int8)QoS.DEFAULT.getMaskValue()); + _sendBuffer->putShort((int16)0); // count +} + +void ChannelSearchManager::flushSendBuffer() +{ + Lock guard(&_mutex); + TimeStamp now; + now.getCurrent(); + _timeAtLastSend = now.getMilliseconds(); + //TODO + //_context->getSearchTransport()->send(sendBuffer); + initializeSendBuffer(); +} + +bool ChannelSearchManager::generateSearchRequestMessage(SearchInstance* channel, bool allowNewFrame) +{ + Lock guard(&_mutex); + bool success = channel->generateSearchRequestMessage(_sendBuffer, _mockTransportSendControl); + // buffer full, flush + if(!success) + { + flushSendBuffer(); + if(allowNewFrame) + { + channel->generateSearchRequestMessage(_sendBuffer, _mockTransportSendControl); + } + return true; + } + return false; +} + +void ChannelSearchManager::searchResponseTimeout(SearchInstance* channel, int32 timerIndex) +{ + int32 newTimerIndex = min(++timerIndex, _numberOfTimers - 1); + _timers[newTimerIndex]->installChannel(channel); +} + +void ChannelSearchManager::boostSearching(SearchInstance* channel, int32 timerIndex) +{ + _timers[timerIndex]->installChannel(channel); +} + +inline void ChannelSearchManager::updateRTTE(long rtt) +{ + Lock guard(&_mutex); + const double error = rtt - _rttmean; + _rttmean += error / 4.0; +} + +inline double ChannelSearchManager::getRTTE() +{ + Lock guard(&_mutex); + double rtte = min(max((double)_rttmean, (double)MIN_RTT), (double)MAX_RTT); + return rtte; +} + +inline int32 ChannelSearchManager::getSequenceNumber() +{ + Lock guard(&_mutex); + int32 retval = _sequenceNumber; + return retval; +} + +inline int64 ChannelSearchManager::getTimeAtLastSend() +{ + Lock guard(&_mutex); + int64 retval = _timeAtLastSend; + return retval; +} + +}} + diff --git a/pvAccessApp/remote/channelSearchManager.h b/pvAccessApp/remote/channelSearchManager.h new file mode 100644 index 0000000..b58a9ef --- /dev/null +++ b/pvAccessApp/remote/channelSearchManager.h @@ -0,0 +1,393 @@ +/* + * channelSearchManager.h + */ + +#ifndef CHANNELSEARCHMANAGER_H +#define CHANNELSEARCHMANAGER_H + +#include "remote.h" +#include "pvAccess.h" +#include "arrayFIFO.h" +#include "caConstants.h" +#include "clientContextImpl.h" + +#include +#include +#include +#include + +#include +#include +#include + +using namespace epics::pvData; + +namespace epics { namespace pvAccess { + +typedef int32 pvAccessID; + +//TODO check the const of paramerers + +/** + * SearchInstance. + */ +//TODO document +class SearchInstance { +public: + virtual ~SearchInstance() {}; + virtual pvAccessID getChannelID() = 0; + virtual String getChannelName() = 0; + virtual void unsetListOwnership() = 0; + virtual void addAndSetListOwnership(ArrayFIFO* newOwner, Mutex* ownerMutex, int32 index) = 0; + virtual void removeAndUnsetListOwnership() = 0; + virtual int32 getOwnerIndex() = 0; + virtual bool generateSearchRequestMessage(ByteBuffer* requestMessage, TransportSendControl* control) = 0; + + /** + * Search response from server (channel found). + * @param minorRevision server minor CA revision. + * @param serverAddress server address. + */ + virtual void searchResponse(int8 minorRevision, osiSockAddr* serverAddress) = 0; +}; + +/** + * BaseSearchInstance. + */ +class BaseSearchInstance : public SearchInstance +{ +public: + virtual ~BaseSearchInstance() {}; + virtual pvAccessID getChannelID() = 0; + virtual string getChannelName() = 0; + virtual void unsetListOwnership(); + virtual void addAndSetListOwnership(ArrayFIFO* newOwner, Mutex* ownerMutex, int32 index); + virtual void removeAndUnsetListOwnership(); + virtual int32 getOwnerIndex(); + /** + * Send search message. + * @return success status. + */ + virtual bool generateSearchRequestMessage(ByteBuffer* requestMessage, TransportSendControl* control); +private: + Mutex _mutex; + ArrayFIFO* _owner; + Mutex* _ownerMutex; + int32 _ownerIndex; + + const static int DATA_COUNT_POSITION; + const static int PAYLOAD_POSITION; +}; + +class ChannelSearchManager; +/** + * SearchTimer. + */ +class SearchTimer: public TimerCallback +{ +public: + /** + * Constructor; + * @param timerIndex this timer instance index. + * @param allowBoost is boost allowed flag. + */ + SearchTimer(ChannelSearchManager* csmanager,int32 timerIndex, bool allowBoost, bool allowSlowdown); + /** + * Destructor. + */ + virtual ~SearchTimer(); + /** + * Shutdown this instance. + */ + void shutdown(); + /** + * Install channel. + * @param channel channel to be registered. + */ + void installChannel(SearchInstance* channel); + /** + * Move channels to other SearchTimer. + * @param destination where to move channels. + */ + void moveChannels(SearchTimer* destination); + /** + * @see TimerCallback#timerStopped() + */ + void timerStopped(); + /** + * @see TimerCallback#callback() + */ + void callback(); + /** + * Search response received notification. + * @param responseSequenceNumber sequence number of search frame which contained search request. + * @param isSequenceNumberValid valid flag of responseSequenceNumber. + * @param responseTime time of search response. + */ + void searchResponse(int32 responseSequenceNumber, bool isSequenceNumberValid, int64 responseTime); + /** + * Calculate search time period. + * @return search time period. + */ + const int64 period(); +private: + /** + * Instance of the channel search manager with which this search timer + * is associated. + */ + ChannelSearchManager* _chanSearchManager; + /** + * Number of search attempts in one frame. + */ + volatile int32 _searchAttempts; + /** + * Number of search responses in one frame. + */ + volatile int32 _searchRespones; + /** + * Number of frames per search try. + */ + double _framesPerTry; + /** + * Number of frames until congestion threshold is reached. + */ + double _framesPerTryCongestThresh; + /** + * Start sequence number (first frame number within a search try). + */ + volatile int32 _startSequenceNumber; + /** + * End sequence number (last frame number within a search try). + */ + volatile int32 _endSequenceNumber; + /** + * This timer index. + */ + const int32 _timerIndex; + /** + * Flag indicating whether boost is allowed. + */ + const bool _allowBoost; + /** + * Flag indicating whether slow-down is allowed (for last timer). + */ + const bool _allowSlowdown; + /** + * Ordered (as inserted) list of channels with search request pending. + */ + ArrayFIFO* _requestPendingChannels; + /** + * Ordered (as inserted) list of channels with search request pending. + */ + ArrayFIFO* _responsePendingChannels; + /** + * Timer node. + * (sync on requestPendingChannels) + */ + TimerNode* _timerNode; + /** + * Cancel this instance. + */ + volatile bool _canceled; + /** + * Time of last response check. + */ + int64 _timeAtResponseCheck; + /** + * Mutex for request pending channel list. + */ + Mutex _requestPendingChannelsMutex; + /** + * General mutex. + */ + Mutex _mutex; + /** + * Max search tries per frame. + */ + static const int32 MAX_FRAMES_PER_TRY; +}; + +class ChannelSearchManager +{ +public: + /** + * Constructor. + * @param context + */ + ChannelSearchManager(ClientContextImpl* context); + /** + * Constructor. + * @param context + */ + virtual ~ChannelSearchManager(); + /** + * Cancel. + */ + void cancel(); + /** + * Get number of registered channels. + * @return number of registered channels. + */ + int32 registeredChannelCount(); + /** + * Register channel. + * @param channel to register. + */ + void registerChannel(SearchInstance* channel); + /** + * Unregister channel. + * @param channel to unregister. + */ + void unregisterChannel(SearchInstance* channel); + /** + * Search response from server (channel found). + * @param cid client channel ID. + * @param seqNo search sequence number. + * @param minorRevision server minor CA revision. + * @param serverAddress server address. + */ + void searchResponse(int32 cid, int32 seqNo, int8 minorRevision, osiSockAddr* serverAddress); + /** + * Beacon anomaly detected. + * Boost searching of all channels. + */ + void beaconAnomalyNotify(); +private: + /** + * Minimal RTT (ms). + */ + static const int64 MIN_RTT; + /** + * Maximal RTT (ms). + */ + static const int64 MAX_RTT; + /** + * Rate to be considered as OK. + */ + static const double SUCCESS_RATE; + /** + * Context. + */ + ClientContextImpl* _context; + /** + * Canceled flag. + */ + volatile bool _canceled; + /** + * Round-trip time (RTT) mean. + */ + volatile double _rttmean; + /** + * Search timers array. + * Each timer with a greater index has longer (doubled) search period. + */ + SearchTimer** _timers; + /** + * Number of timers in timers array. + */ + int32 _numberOfTimers; + /** + * Index of a timer to be used when beacon anomaly is detected. + */ + int32 _beaconAnomalyTimerIndex; + /** + * Search (datagram) sequence number. + */ + volatile int32 _sequenceNumber; + /** + * Max search period (in ms). + */ + static const int64 MAX_SEARCH_PERIOD; + /** + * Max search period (in ms) - lower limit. + */ + static const int64 MAX_SEARCH_PERIOD_LOWER_LIMIT; + /** + * Beacon anomaly search period (in ms). + */ + static const int64 BEACON_ANOMALY_SEARCH_PERIOD; + /** + * Max number of timers. + */ + static const int32 MAX_TIMERS; + /** + * Send byte buffer (frame) + */ + ByteBuffer* _sendBuffer; + /** + * Time of last frame send. + */ + volatile int64 _timeAtLastSend; + /** + * Set of registered channels. + */ + std::map _channels; + /** + * Iterator for the set of registered channels. + */ + std::map::iterator _channelsIter; + /** + * General mutex. + */ + Mutex _mutex; + /** + * Mock transport send control + */ + TransportSendControl* _mockTransportSendControl; + /** + * SearchTimer is a friend. + */ + friend class SearchTimer; + /** + * Initialize send buffer. + */ + void initializeSendBuffer(); + /** + * Flush send buffer. + */ + void flushSendBuffer(); + /** + * Generate (put on send buffer) search request + * @param channel + * @param allowNewFrame flag indicating if new search request message is allowed to be put in new frame. + * @return true if new frame was sent. + */ + bool generateSearchRequestMessage(SearchInstance* channel, bool allowNewFrame); + /** + * Notify about search failure (response timeout). + * @param channel channel whose search failed. + * @param timerIndex index of timer which tries to search. + */ + void searchResponseTimeout(SearchInstance* channel, int32 timerIndex); + /** + * Boost searching of a channel. + * @param channel channel to boost searching. + * @param timerIndex to what timer-index to boost + */ + void boostSearching(SearchInstance* channel, int32 timerIndex); + /** + * Update (recalculate) round-trip estimate. + * @param rtt new sample of round-trip value. + */ + void updateRTTE(long rtt); + /** + * Get round-trip estimate (in ms). + * @return round-trip estimate (in ms). + */ + double getRTTE(); + /** + * Get search (UDP) frame sequence number. + * @return search (UDP) frame sequence number. + */ + int32 getSequenceNumber(); + /** + * Get time at last send (when sendBuffer was flushed). + * @return time at last send. + */ + int64 getTimeAtLastSend(); +}; + + +}} + +#endif /* CHANNELSEARCHMANAGER_H */ From 512bc514e69f36f80f25c485cf75fcee47cbb72c Mon Sep 17 00:00:00 2001 From: Gasper Jansa Date: Thu, 6 Jan 2011 17:12:35 +0100 Subject: [PATCH 2/3] Small fixes and empty channel search manager test --- pvAccessApp/Makefile | 2 + pvAccessApp/remote/channelSearchManager.cpp | 11 +- pvAccessApp/remote/channelSearchManager.h | 223 +++++++++++++++++++- pvAccessApp/utils/referenceCountingLock.cpp | 8 +- pvAccessApp/utils/referenceCountingLock.h | 2 +- testApp/remote/Makefile | 4 + 6 files changed, 237 insertions(+), 13 deletions(-) diff --git a/pvAccessApp/Makefile b/pvAccessApp/Makefile index b6071bd..fb1b425 100644 --- a/pvAccessApp/Makefile +++ b/pvAccessApp/Makefile @@ -53,6 +53,7 @@ INC += beaconEmitter.h INC += beaconServerStatusProvider.h INC += beaconHandler.h INC += blockingTCP.h +INC += channelSearchManager.h LIBSRCS += blockingUDPTransport.cpp LIBSRCS += blockingUDPConnector.cpp LIBSRCS += beaconEmitter.cpp @@ -63,6 +64,7 @@ LIBSRCS += blockingClientTCPTransport.cpp LIBSRCS += blockingTCPConnector.cpp LIBSRCS += blockingServerTCPTransport.cpp LIBSRCS += blockingTCPAcceptor.cpp +LIBSRCS += channelSearchManager.cpp LIBRARY = pvAccess pvAccess_LIBS += Com diff --git a/pvAccessApp/remote/channelSearchManager.cpp b/pvAccessApp/remote/channelSearchManager.cpp index 033ff00..1c6110c 100644 --- a/pvAccessApp/remote/channelSearchManager.cpp +++ b/pvAccessApp/remote/channelSearchManager.cpp @@ -440,6 +440,8 @@ ChannelSearchManager::ChannelSearchManager(ClientContextImpl* context): _timers[i] = new SearchTimer(this, i, i > _beaconAnomalyTimerIndex, i != (numberOfTimers-1)); } _numberOfTimers = numberOfTimers; + + _mockTransportSendControl = new MockTransportSendControl(); } ChannelSearchManager::~ChannelSearchManager() @@ -450,6 +452,7 @@ ChannelSearchManager::~ChannelSearchManager() } if(_timers) delete[] _timers; if(_sendBuffer) delete _sendBuffer; + if(_mockTransportSendControl) delete _mockTransportSendControl; } void ChannelSearchManager::cancel() @@ -511,7 +514,7 @@ void ChannelSearchManager::searchResponse(int32 cid, int32 seqNo, int8 minorRevi else { // minor hack to enable duplicate reports - si = static_cast(_context->getChannel(cid)); + si = reinterpret_cast(_context->getChannel(cid)); if(si != NULL) { si->searchResponse(minorRevision, serverAddress); @@ -556,8 +559,7 @@ void ChannelSearchManager::initializeSendBuffer() sendBuffer.put(REQUIRE_REPLY ? (byte)QoS.REPLY_REQUIRED.getMaskValue() : (byte)QoS.DEFAULT.getMaskValue()); */ - //TODO implement Qos - //_sendBuffer->put((int8)QoS.DEFAULT.getMaskValue()); + _sendBuffer->putByte((int8)DEFAULT); _sendBuffer->putShort((int16)0); // count } @@ -567,8 +569,7 @@ void ChannelSearchManager::flushSendBuffer() TimeStamp now; now.getCurrent(); _timeAtLastSend = now.getMilliseconds(); - //TODO - //_context->getSearchTransport()->send(sendBuffer); + _context->getSearchTransport()->send(_sendBuffer); initializeSendBuffer(); } diff --git a/pvAccessApp/remote/channelSearchManager.h b/pvAccessApp/remote/channelSearchManager.h index b58a9ef..b180e71 100644 --- a/pvAccessApp/remote/channelSearchManager.h +++ b/pvAccessApp/remote/channelSearchManager.h @@ -9,7 +9,7 @@ #include "pvAccess.h" #include "arrayFIFO.h" #include "caConstants.h" -#include "clientContextImpl.h" +#include "blockingUDP.h" #include #include @@ -26,6 +26,212 @@ namespace epics { namespace pvAccess { typedef int32 pvAccessID; +enum QoS { + /** + * Default behavior. + */ + DEFAULT = 0x00, + /** + * Require reply (acknowledgment for reliable operation). + */ + REPLY_REQUIRED = 0x01, + /** + * Best-effort option (no reply). + */ + BESY_EFFORT = 0x02, + /** + * Process option. + */ + PROCESS = 0x04, + /** + * Initialize option. + */ + INIT = 0x08, + /** + * Destroy option. + */ + DESTROY = 0x10, + /** + * Share data option. + */ + SHARE = 0x20, + /** + * Get. + */ + GET = 0x40, + /** + * Get-put. + */ + GET_PUT =0x80 +}; + + +//TODO this will be deleted +class ChannelImpl; +class ChannelSearchManager; +class ClientContextImpl : public ClientContext +{ + public: + + ClientContextImpl() + { + + } + + virtual Version* getVersion() { + return NULL; + } + + virtual ChannelProvider* getProvider() { + return NULL; + } + + Timer* getTimer() + { + return NULL; + } + + virtual void initialize() { + + } + + virtual void printInfo() { + + } + + virtual void printInfo(epics::pvData::StringBuilder out) { + + } + + virtual void destroy() + { + + } + + virtual void dispose() + { + + } + + BlockingUDPTransport* getSearchTransport() + { + return NULL; + } + + /** + * Searches for a channel with given channel ID. + * @param channelID CID. + * @return channel with given CID, 0 if non-existent. + */ + ChannelImpl* getChannel(pvAccessID channelID) + { + return NULL; + } + + + private: + ~ClientContextImpl() {}; + + void loadConfiguration() { + + } + + void internalInitialize() { + + + } + + void initializeUDPTransport() { + + } + + void internalDestroy() { + + } + + void destroyAllChannels() { + + } + + /** + * Check channel name. + */ + void checkChannelName(String& name) { + + } + + /** + * Check context state and tries to establish necessary state. + */ + void checkState() { + + } + + + + /** + * Generate Client channel ID (CID). + * @return Client channel ID (CID). + */ + pvAccessID generateCID() + { + return 0; + } + + /** + * Free generated channel ID (CID). + */ + void freeCID(int cid) + { + + } + + + /** + * Get, or create if necessary, transport of given server address. + * @param serverAddress required transport address + * @param priority process priority. + * @return transport for given address + */ + Transport* getTransport(TransportClient* client, osiSockAddr* serverAddress, int16 minorRevision, int16 priority) + { + + return NULL; + } + + /** + * Internal create channel. + */ + // TODO no minor version with the addresses + // TODO what if there is an channel with the same name, but on different host! + Channel* createChannelInternal(String name, ChannelRequester* requester, short priority, + InetAddrVector* addresses) { + return NULL; + } + + /** + * Destroy channel. + * @param channel + * @param force + * @throws CAException + * @throws IllegalStateException + */ + void destroyChannel(ChannelImpl* channel, bool force) { + + + } + + /** + * Get channel search manager. + * @return channel search manager. + */ + ChannelSearchManager* getChannelSearchManager() { + return NULL; + } +}; + + + //TODO check the const of paramerers /** @@ -207,6 +413,18 @@ private: static const int32 MAX_FRAMES_PER_TRY; }; +class MockTransportSendControl: public TransportSendControl +{ +public: + void endMessage() {} + void flush(bool lastMessageCompleted) {} + void setRecipient(const osiSockAddr* sendTo) {} + void startMessage(int8 command, int32 ensureCapacity) {} + void ensureBuffer(int32 size) {} + void flushSerializeBuffer() {} +}; + + class ChannelSearchManager { public: @@ -333,7 +551,7 @@ private: /** * Mock transport send control */ - TransportSendControl* _mockTransportSendControl; + MockTransportSendControl* _mockTransportSendControl; /** * SearchTimer is a friend. */ @@ -387,7 +605,6 @@ private: int64 getTimeAtLastSend(); }; - }} #endif /* CHANNELSEARCHMANAGER_H */ diff --git a/pvAccessApp/utils/referenceCountingLock.cpp b/pvAccessApp/utils/referenceCountingLock.cpp index abd2250..5699502 100644 --- a/pvAccessApp/utils/referenceCountingLock.cpp +++ b/pvAccessApp/utils/referenceCountingLock.cpp @@ -13,7 +13,7 @@ ReferenceCountingLock::ReferenceCountingLock(): _references(1) if(retval != 0) { //string errMsg = "Error: pthread_mutexattr_init failed: " + string(strerror(retval)); - assert(true); + assert(false); } retval = pthread_mutexattr_settype(&mutexAttribute, PTHREAD_MUTEX_RECURSIVE); if(retval == 0) @@ -22,13 +22,13 @@ ReferenceCountingLock::ReferenceCountingLock(): _references(1) if(retval != 0) { //string errMsg = "Error: pthread_mutex_init failed: " + string(strerror(retval)); - assert(true); + assert(false); } } else { //string errMsg = "Error: pthread_mutexattr_settype failed: " + string(strerror(retval)); - assert(true); + assert(false); } pthread_mutexattr_destroy(&mutexAttribute); @@ -58,7 +58,7 @@ void ReferenceCountingLock::release() int retval = pthread_mutex_unlock(&_mutex); if(retval != 0) { - string errMsg = "Error: pthread_mutex_unlock failed: " + string(strerror(retval)); + //string errMsg = "Error: pthread_mutex_unlock failed: " + string(strerror(retval)); //TODO do something? } } diff --git a/pvAccessApp/utils/referenceCountingLock.h b/pvAccessApp/utils/referenceCountingLock.h index 2825cae..4153b50 100644 --- a/pvAccessApp/utils/referenceCountingLock.h +++ b/pvAccessApp/utils/referenceCountingLock.h @@ -71,4 +71,4 @@ private: }} -#endif /* NAMEDLOCKPATTERN_H */ +#endif /* REFERENCECOUNTINGLOCK_H */ diff --git a/testApp/remote/Makefile b/testApp/remote/Makefile index 0ddd4f4..620b69c 100644 --- a/testApp/remote/Makefile +++ b/testApp/remote/Makefile @@ -22,6 +22,10 @@ PROD_HOST += testBeaconHandler testBeaconHandler_SRCS += testBeaconHandler.cpp testBeaconHandler_LIBS += pvData pvAccess Com +PROD_HOST += testChannelSearchManager +testChannelSearchManager_SRCS += testChannelSearchManager.cpp +testChannelSearchManager_LIBS += pvData pvAccess Com + include $(TOP)/configure/RULES #---------------------------------------- # ADD RULES AFTER THIS LINE From 570781576776d1fdbe8a5b3cf3e68463f4c4b251 Mon Sep 17 00:00:00 2001 From: Gasper Jansa Date: Thu, 6 Jan 2011 17:19:20 +0100 Subject: [PATCH 3/3] empty test file for channel search manager for real now :) --- testApp/remote/testChannelSearchManager.cpp | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) create mode 100644 testApp/remote/testChannelSearchManager.cpp diff --git a/testApp/remote/testChannelSearchManager.cpp b/testApp/remote/testChannelSearchManager.cpp new file mode 100644 index 0000000..780cda2 --- /dev/null +++ b/testApp/remote/testChannelSearchManager.cpp @@ -0,0 +1,18 @@ +/* testChannelSearcManager.cpp */ + +#include + +using namespace epics::pvData; +using namespace epics::pvAccess; + + + +int main(int argc,char *argv[]) +{ + ClientContextImpl* context = new ClientContextImpl(); + ChannelSearchManager* manager = new ChannelSearchManager(context); + + context->destroy(); + getShowConstructDestruct()->constuctDestructTotals(stdout); + return(0); +}