diff --git a/pvAccessApp/Makefile b/pvAccessApp/Makefile index 757c567..23831e4 100644 --- a/pvAccessApp/Makefile +++ b/pvAccessApp/Makefile @@ -32,6 +32,34 @@ SRC_DIRS += $(PVACCESS)/client INC += pvAccess.h LIBSRCS += pvAccess.cpp +SRC_DIRS += $(PVACCESS)/factory +LIBSRCS += ChannelAccessFactory.cpp +LIBSRCS += CreateRequestFactory.cpp + +SRC_DIRS += $(PVACCESS)/remote +INC += remote.h +INC += blockingUDP.h +INC += beaconHandler.h +INC += blockingTCP.h +INC += channelSearchManager.h +INC += simpleChannelSearchManagerImpl.h +INC += transportRegistry.h +LIBSRCS += blockingUDPTransport.cpp +LIBSRCS += blockingUDPConnector.cpp +LIBSRCS += beaconHandler.cpp +LIBSRCS += blockingTCPTransport.cpp +LIBSRCS += blockingClientTCPTransport.cpp +LIBSRCS += blockingTCPConnector.cpp +LIBSRCS += blockingServerTCPTransport.cpp +LIBSRCS += simpleChannelSearchManagerImpl.cpp +LIBSRCS += abstractResponseHandler.cpp +LIBSRCS += blockingTCPAcceptor.cpp +LIBSRCS += transportRegistry.cpp + +SRC_DIRS += $(PVACCESS)/remoteClient +INC += clientContextImpl.h +LIBSRCS += clientContextImpl.cpp + SRC_DIRS += $(PVACCESS)/server INC += serverContext.h INC += responseHandlers.h @@ -46,35 +74,6 @@ LIBSRCS += baseChannelRequester.cpp LIBSRCS += beaconEmitter.cpp LIBSRCS += beaconServerStatusProvider.cpp -SRC_DIRS += $(PVACCESS)/factory -LIBSRCS += ChannelAccessFactory.cpp -LIBSRCS += CreateRequestFactory.cpp - - -SRC_DIRS += $(PVACCESS)/remote -INC += remote.h -INC += blockingUDP.h -INC += beaconHandler.h -INC += blockingTCP.h -INC += channelSearchManager.h -INC += transportRegistry.h -LIBSRCS += blockingUDPTransport.cpp -LIBSRCS += blockingUDPConnector.cpp -LIBSRCS += beaconHandler.cpp -LIBSRCS += blockingTCPTransport.cpp -LIBSRCS += blockingClientTCPTransport.cpp -LIBSRCS += blockingTCPConnector.cpp -LIBSRCS += blockingServerTCPTransport.cpp -LIBSRCS += channelSearchManager.cpp -LIBSRCS += abstractResponseHandler.cpp -LIBSRCS += blockingTCPAcceptor.cpp -LIBSRCS += transportRegistry.cpp - -SRC_DIRS += $(PVACCESS)/remoteClient -INC += clientContextImpl.h -LIBSRCS += clientContextImpl.cpp - - LIBRARY = pvAccess pvAccess_LIBS += Com pvAccess_LIBS += pvData diff --git a/pvAccessApp/remote/blockingTCPTransport.cpp b/pvAccessApp/remote/blockingTCPTransport.cpp index 81855e4..f33eb70 100644 --- a/pvAccessApp/remote/blockingTCPTransport.cpp +++ b/pvAccessApp/remote/blockingTCPTransport.cpp @@ -124,17 +124,20 @@ namespace pvAccess { // TODO minor tweak: deque size is not preallocated... - _socketBuffer = new ByteBuffer(max((int)(MAX_TCP_RECV+MAX_ENSURE_DATA_BUFFER_SIZE), receiveBufferSize)); + unsigned int bufferSize = max((int)(MAX_TCP_RECV+MAX_ENSURE_DATA_BUFFER_SIZE), receiveBufferSize); + // size must be "aligned" + bufferSize = (bufferSize + (CA_ALIGNMENT - 1)) & (~(CA_ALIGNMENT - 1)); + + _socketBuffer = new ByteBuffer(bufferSize); _socketBuffer->setPosition(_socketBuffer->getLimit()); _startPosition = _socketBuffer->getPosition(); // allocate buffer - _sendBuffer = new ByteBuffer(_socketBuffer->getSize()); + _sendBuffer = new ByteBuffer(bufferSize); _maxPayloadSize = _sendBuffer->getSize() - 2*CA_MESSAGE_HEADER_SIZE; // one for header, one for flow control - // get send buffer size + // get TCP send buffer size osiSocklen_t intLen = sizeof(int); - int retval = getsockopt(_channel, SOL_SOCKET, SO_SNDBUF, (char *)&_socketSendBufferSize, &intLen); if(unlikely(retval<0)) { _socketSendBufferSize = MAX_TCP_RECV; @@ -145,6 +148,7 @@ namespace pvAccess { errStr); } + // get remote address osiSocklen_t saSize = sizeof(sockaddr); retval = getpeername(_channel, &(_socketAddress.sa), &saSize); if(unlikely(retval<0)) { @@ -606,7 +610,7 @@ namespace pvAccess { // control // marker request sent - if (_command == 0) { + if (_command == CMD_SET_MARKER) { _flowControlMutex.lock(); if(_markerToSend==0) _markerToSend = _payloadSize; @@ -615,7 +619,7 @@ namespace pvAccess { } // marker received back - else if (_command == 1) + else if (_command == CMD_ACK_MARKER) { _flowControlMutex.lock(); int difference = (int)_totalBytesSent-_payloadSize+CA_MESSAGE_HEADER_SIZE; @@ -629,7 +633,7 @@ namespace pvAccess { _flowControlMutex.unlock(); } // set byte order - else if (_command == 2) + else if (_command == CMD_SET_ENDIANESS) { // check 7-th bit @@ -642,8 +646,7 @@ namespace pvAccess { _sendBuffer->setEndianess(endianess); _byteOrderFlag = (endianess == EPICS_ENDIAN_BIG) ? 0x80 : 0x00; _sendQueueMutex.unlock(); - } - + } // no payload //stage = ReceiveStage.PROCESS_HEADER; diff --git a/pvAccessApp/remote/channelSearchManager.cpp b/pvAccessApp/remote/channelSearchManager.cpp deleted file mode 100644 index 909f3d3..0000000 --- a/pvAccessApp/remote/channelSearchManager.cpp +++ /dev/null @@ -1,736 +0,0 @@ -/** - * Copyright - See the COPYRIGHT that is included with this distribution. - * pvAccessCPP is distributed subject to a Software License Agreement found - * in file LICENSE that is included with this distribution. - */ - -#include - -using namespace std; -using namespace epics::pvData; - -namespace epics { namespace pvAccess { - -const int BaseSearchInstance::DATA_COUNT_POSITION = CA_MESSAGE_HEADER_SIZE + sizeof(int32)/sizeof(int8) + 1; -const int BaseSearchInstance::PAYLOAD_POSITION = sizeof(int16)/sizeof(int8) + 2; - -void BaseSearchInstance::initializeSearchInstance() -{ - _owner = NULL; - _ownerMutex = NULL; - _ownerIndex = -1; - } - -void BaseSearchInstance::unsetListOwnership() -{ - Lock guard(_mutex); - //if (_owner != NULL) this->release(); - _owner = NULL; -} - -void BaseSearchInstance::addAndSetListOwnership(SearchInstance::List* newOwner, Mutex* ownerMutex, int32 index) -{ - if(ownerMutex == NULL) THROW_BASE_EXCEPTION("Null owner mutex"); - - Lock ownerGuard(*ownerMutex); - _ownerMutex = ownerMutex; - - Lock guard(_mutex); - newOwner->push_back(this); - //if (_owner == NULL) this->acquire(); // new owner - _owner = newOwner; - _ownerIndex = index; -} - -void BaseSearchInstance::removeAndUnsetListOwnership() -{ - Mutex * localOm; - SearchInstance::List* localOwner; - BaseSearchInstance *_this; - { - Lock guard(_mutex); - if(_owner == NULL) return; - - if(_ownerMutex == NULL) THROW_BASE_EXCEPTION("Null owner mutex"); - localOm=_ownerMutex; - localOwner=_owner; - _owner = NULL; - _ownerMutex = NULL; - _this=this; - } - - Lock ownerGuard(*localOm); - for (SearchInstance::List::iterator iter = localOwner->begin(); iter != localOwner->end(); iter++) { - if (*iter == _this) - { - localOwner->erase(iter); - break; - } - } -} - -int32 BaseSearchInstance::getOwnerIndex() -{ - Lock guard(_mutex); - int32 retval = _ownerIndex; - return retval; -} - -bool BaseSearchInstance::generateSearchRequestMessage(ByteBuffer* requestMessage, TransportSendControl* control) -{ - int16 dataCount = requestMessage->getShort(DATA_COUNT_POSITION); - - dataCount++; - /* - if(dataCount >= MAX_SEARCH_BATCH_COUNT) - { - return false; - } - */ - - const String name = getSearchInstanceName(); - // not nice... - const int addedPayloadSize = sizeof(int32)/sizeof(int8) + (1 + sizeof(int32)/sizeof(int8) + name.length()); - - if(((int)requestMessage->getRemaining()) < addedPayloadSize) - { - return false; - } - - requestMessage->putInt(getSearchInstanceID()); - SerializeHelper::serializeString(name, requestMessage, control); - - requestMessage->putInt(PAYLOAD_POSITION, requestMessage->getPosition() - CA_MESSAGE_HEADER_SIZE); - requestMessage->putShort(DATA_COUNT_POSITION, dataCount); - return true; -} - -const int32 SearchTimer::MAX_FRAMES_PER_TRY = 64; - -SearchTimer::SearchTimer(ChannelSearchManager* _chanSearchManager, int32 timerIndex, bool allowBoost, bool allowSlowdown): - _chanSearchManager(_chanSearchManager), - _searchAttempts(0), - _searchRespones(0), - _framesPerTry(1), - _framesPerTryCongestThresh(DBL_MAX), - _startSequenceNumber(0), - _endSequenceNumber(0), - _timerIndex(timerIndex), - _allowBoost(allowBoost), - _allowSlowdown(allowSlowdown), - _requestPendingChannels(new SearchInstance::List()), - _responsePendingChannels(new SearchInstance::List()), - _timerNode(new TimerNode(*this)), - _canceled(false), - _timeAtResponseCheck(0) -{ - -} - -SearchTimer::~SearchTimer() -{ - if(_requestPendingChannels) delete _requestPendingChannels; - if(_responsePendingChannels) delete _responsePendingChannels; - if(_timerNode) delete _timerNode; -} - -void SearchTimer::shutdown() -{ - Lock guard(_mutex); //the whole method is locked - - { - Lock guard(_volMutex); - if(_canceled) return; - _canceled = true; - } - - { - Lock guard(_requestPendingChannelsMutex); - _timerNode->cancel(); - - _requestPendingChannels->clear(); - _responsePendingChannels->clear(); - } -} - -void SearchTimer::installChannel(SearchInstance* channel) -{ - Lock guard(_mutex); //the whole method is locked - if(_canceled) return; - - Lock pendingChannelGuard(_requestPendingChannelsMutex); - bool startImmediately = _requestPendingChannels->empty(); - channel->addAndSetListOwnership(_requestPendingChannels, &_requestPendingChannelsMutex, _timerIndex); - - // start searching - if(startImmediately) - { - _timerNode->cancel(); - if(_timeAtResponseCheck == 0) - { - TimeStamp current; - current.getCurrent(); - _timeAtResponseCheck = current.getMilliseconds(); - } - - // start with some initial delay (to collect all installed requests) - _chanSearchManager->_context->getTimer()->scheduleAfterDelay(*_timerNode, 0.01); - } -} - -void SearchTimer::moveChannels(SearchTimer* destination) -{ - // do not sync this, not necessary and might cause deadlock - while(!_responsePendingChannels->empty()) - { - SearchInstance* channel = _responsePendingChannels->front(); - _responsePendingChannels->pop_front(); - - { - Lock guard(_volMutex); - if(_searchAttempts > 0) - { - _searchAttempts--; - } - } - destination->installChannel(channel); - } - - // bulk move - Lock guard(_requestPendingChannelsMutex); - while (!_requestPendingChannels->empty()) - { - SearchInstance* channel = _requestPendingChannels->front(); - _requestPendingChannels->pop_front(); - - destination->installChannel(channel); - } -} - -void SearchTimer::timerStopped() -{ - //noop -} - -void SearchTimer::callback() -{ - { - Lock guard(_volMutex); - if(_canceled) return; - } - - // if there was some success (no congestion) - // boost search period (if necessary) for channels not recently searched - int32 searchRespones; - { - Lock guard(_volMutex); - searchRespones = _searchRespones; - } - if(_allowBoost && searchRespones > 0) - { - Lock guard(_requestPendingChannelsMutex); - while(!_requestPendingChannels->empty()) - { - SearchInstance* channel = _requestPendingChannels->front(); - // boost needed check - //final int boostIndex = searchRespones >= searchAttempts * SUCCESS_RATE ? Math.min(Math.max(0, timerIndex - 1), beaconAnomalyTimerIndex) : beaconAnomalyTimerIndex; - const int boostIndex = _chanSearchManager->_beaconAnomalyTimerIndex; - if(channel->getOwnerIndex() > boostIndex) - { - _requestPendingChannels->pop_front(); - //channel->acquire(); - channel->unsetListOwnership(); - _chanSearchManager->boostSearching(channel, boostIndex); - //channel->release(); - } - } - } - - SearchInstance* channel; - - // should we check results (installChannel trigger timer immediately) - TimeStamp current; - current.getCurrent(); - int64 now = current.getMilliseconds(); - if(now - _timeAtResponseCheck >= period()) - { - _timeAtResponseCheck = now; - - // notify about timeout (move it to other timer) - while(!_responsePendingChannels->empty()) - { - channel = _responsePendingChannels->front(); - _responsePendingChannels->pop_front(); - - if(_allowSlowdown) - { - //channel->acquire(); - channel->unsetListOwnership(); - _chanSearchManager->searchResponseTimeout(channel, _timerIndex); - //channel->release(); - } - else - { - channel->addAndSetListOwnership(_requestPendingChannels, &_requestPendingChannelsMutex, _timerIndex); - } - } - - int32 searchRespones,searchAttempts; - { - Lock guard(_volMutex); - searchAttempts = _searchAttempts; - searchRespones = _searchRespones; - } - // check search results - if(searchAttempts > 0) - { - // increase UDP frames per try if we have a good score - if(searchRespones >= searchAttempts * ChannelSearchManager::SUCCESS_RATE) - { - // increase frames per try - // a congestion avoidance threshold similar to TCP is now used - if(_framesPerTry < MAX_FRAMES_PER_TRY) - { - if(_framesPerTry < _framesPerTryCongestThresh) - { - _framesPerTry = min(2*_framesPerTry, _framesPerTryCongestThresh); - } - else - { - _framesPerTry += 1.0/_framesPerTry; - } - } - } - else - { - // decrease frames per try, fallback - _framesPerTryCongestThresh = _framesPerTry / 2.0; - _framesPerTry = 1; - } - - } - } - - - { - Lock guard(_volMutex); - _startSequenceNumber = _chanSearchManager->getSequenceNumber(); - _searchAttempts = 0; - _searchRespones = 0; - } - - int32 framesSent = 0; - int32 triesInFrame = 0; - - // reschedule - bool canceled; - { - Lock guard(_volMutex); - canceled = _canceled; - } - - - { - Lock guard(_requestPendingChannelsMutex); - if (_requestPendingChannels->empty()) - channel = 0; - else { - channel = _requestPendingChannels->front(); - _requestPendingChannels->pop_front(); - } - - } - while (!canceled && channel != NULL) - { - //channel->acquire(); - channel->unsetListOwnership(); - - bool requestSent = true; - bool allowNewFrame = (framesSent+1) < _framesPerTry; - - { - Lock guard(_volMutex); - _endSequenceNumber = _chanSearchManager->getSequenceNumber() + 1; - } - - bool frameWasSent = _chanSearchManager->generateSearchRequestMessage(channel, allowNewFrame); - if(frameWasSent) - { - framesSent++; - triesInFrame = 0; - if(!allowNewFrame) - { - channel->addAndSetListOwnership(_requestPendingChannels, &_requestPendingChannelsMutex, _timerIndex); - requestSent = false; - } - else - { - triesInFrame++; - } - } - else - { - triesInFrame++; - } - - if(requestSent) - { - channel->addAndSetListOwnership(_responsePendingChannels, &_responsePendingChannelsMutex, _timerIndex); - Lock guard(_volMutex); - if(_searchAttempts < INT_MAX) - { - _searchAttempts++; - } - } - - //channel->release(); - - // limit - if(triesInFrame == 0 && !allowNewFrame) break; - - { - Lock guard(_volMutex); - canceled = _canceled; - } - - { - Lock guard(_requestPendingChannelsMutex); - if (_requestPendingChannels->empty()) - channel = 0; - else { - channel = _requestPendingChannels->front(); - _requestPendingChannels->pop_front(); - } - } - } - - - // flush out the search request buffer - if(triesInFrame > 0) - { - - { - Lock guard(_volMutex); - _endSequenceNumber = _chanSearchManager->getSequenceNumber() + 1; - } - - _chanSearchManager->flushSendBuffer(); - framesSent++; - } - - - { - Lock guard(_volMutex); - _endSequenceNumber = _chanSearchManager->getSequenceNumber(); - //printf("[%d] sn %d -> %d\n", _timerIndex, _startSequenceNumber, _endSequenceNumber); - - // reschedule - canceled = _canceled; - } - Lock guard(_requestPendingChannelsMutex); - if(!canceled && !_timerNode->isScheduled()) - { - bool someWorkToDo = (!_requestPendingChannels->empty() || !_responsePendingChannels->empty()); - if(someWorkToDo) - { - _chanSearchManager->_context->getTimer()->scheduleAfterDelay(*_timerNode, period()/1000.0); - } - } -} - -void SearchTimer::searchResponse(int32 responseSequenceNumber, bool isSequenceNumberValid, int64 responseTime) -{ - bool validResponse = true; - { - Lock guard(_volMutex); - if(_canceled) return; - - if(isSequenceNumberValid) - { - validResponse = _startSequenceNumber <= responseSequenceNumber && responseSequenceNumber <= _endSequenceNumber; - } - //if (!validResponse) - // printf("[%d] not valid response %d < %d < %d\n", _timerIndex, _startSequenceNumber,responseSequenceNumber, _endSequenceNumber); - } - - - // update RTTE - if(validResponse) - { - const int64 dt = responseTime - _chanSearchManager->getTimeAtLastSend(); - _chanSearchManager->updateRTTE(dt); - Lock guard(_volMutex); - if(_searchRespones < INT_MAX) - { - _searchRespones++; - - // all found, send new search requests immediately if necessary - if(_searchRespones == _searchAttempts) - { - if(_requestPendingChannels->size() > 0) - { - _timerNode->cancel(); - _chanSearchManager->_context->getTimer()->scheduleAfterDelay(*_timerNode, 0.0); - } - } - } - } -} - -const int64 SearchTimer::period() -{ - return (int64) ((1 << _timerIndex) * _chanSearchManager->getRTTE()); -} - -const int64 ChannelSearchManager::MIN_RTT = 32; -const int64 ChannelSearchManager::MAX_RTT = 2 * ChannelSearchManager::MIN_RTT; -const double ChannelSearchManager::SUCCESS_RATE = 0.9; -const int64 ChannelSearchManager::MAX_SEARCH_PERIOD = 5 * 60000; -const int64 ChannelSearchManager::MAX_SEARCH_PERIOD_LOWER_LIMIT = 60000; -const int64 ChannelSearchManager::BEACON_ANOMALY_SEARCH_PERIOD = 5000; -const int32 ChannelSearchManager::MAX_TIMERS = 18; - -ChannelSearchManager::ChannelSearchManager(Context* context): - _context(context), - _canceled(false), - _rttmean(MIN_RTT), - _sequenceNumber(0) -{ - // create and initialize send buffer - _sendBuffer = new ByteBuffer(MAX_UDP_SEND); - initializeSendBuffer(); - - // TODO should be configurable - int64 maxPeriod = MAX_SEARCH_PERIOD; - - maxPeriod = min(maxPeriod, MAX_SEARCH_PERIOD_LOWER_LIMIT); - - // calculate number of timers to reach maxPeriod (each timer period is doubled) - double powerOfTwo = log(maxPeriod / (double)MIN_RTT) / log(2.0); - int32 numberOfTimers = (int32)(powerOfTwo + 1); - numberOfTimers = min(numberOfTimers, MAX_TIMERS); - - // calculate beacon anomaly timer index - powerOfTwo = log(BEACON_ANOMALY_SEARCH_PERIOD / (double)MIN_RTT) / log(2.0); - _beaconAnomalyTimerIndex = (int32)(powerOfTwo + 1); - _beaconAnomalyTimerIndex = min(_beaconAnomalyTimerIndex, numberOfTimers - 1); - - // create timers - _timers = new SearchTimer*[numberOfTimers]; - for(int32 i = 0; i < numberOfTimers; i++) - { - _timers[i] = new SearchTimer(this, i, i > _beaconAnomalyTimerIndex, i != (numberOfTimers-1)); - } - _numberOfTimers = numberOfTimers; - - _mockTransportSendControl = new MockTransportSendControl(); -} - -ChannelSearchManager::~ChannelSearchManager() -{ - for(int32 i = 0; i < _numberOfTimers; i++) - { - if(_timers[i]) delete _timers[i]; - } - if(_timers) delete[] _timers; - if(_sendBuffer) delete _sendBuffer; - if(_mockTransportSendControl) delete _mockTransportSendControl; -} - -void ChannelSearchManager::cancel() -{ - Lock guard(_mutex); - - { - Lock guard(_volMutex); - if(_canceled) return; - - _canceled = true; - } - - if(_timers != NULL) - { - for(int i = 0; i < _numberOfTimers; i++) - { - _timers[i]->shutdown(); - } - } -} - -int32 ChannelSearchManager::registeredChannelCount() -{ - Lock guard(_channelMutex); - return _channels.size(); -} - -void ChannelSearchManager::registerChannel(SearchInstance* channel) -{ - { - Lock guard(_volMutex); - if(_canceled) return; - } - - Lock guard(_channelMutex); - //overrides if already registered - _channels[channel->getSearchInstanceID()] = channel; - _timers[0]->installChannel(channel); -} - -void ChannelSearchManager::unregisterChannel(SearchInstance* channel) -{ - Lock guard(_channelMutex); - _channelsIter = _channels.find(channel->getSearchInstanceID()); - if(_channelsIter != _channels.end()) - { - _channels.erase(channel->getSearchInstanceID()); - } - - channel->removeAndUnsetListOwnership(); -} - -void ChannelSearchManager::searchResponse(int32 cid, int32 seqNo, int8 minorRevision, osiSockAddr* serverAddress) -{ - Lock guard(_channelMutex); - - // first remove - SearchInstance* si = NULL; - _channelsIter = _channels.find(cid); - if(_channelsIter != _channels.end()) - { - si = _channelsIter->second; - _channels.erase(_channelsIter); - //si->acquire(); - si->removeAndUnsetListOwnership(); - - // report success - const int timerIndex = si->getOwnerIndex(); - TimeStamp now; - now.getCurrent(); - _timers[timerIndex]->searchResponse(seqNo, seqNo != 0, now.getMilliseconds()); - - guard.unlock(); - - // then noftify SearchInstance - si->searchResponse(minorRevision, serverAddress); - //si->release(); - } - else - { - guard.unlock(); - - // minor hack to enable duplicate reports - si = dynamic_cast(_context->getChannel(cid).get()); // TODO - if(si != NULL) - { - //si->acquire(); // TODO not thread/destruction safe - si->searchResponse(minorRevision, serverAddress); - //si->release(); - } - return; - } -} - -void ChannelSearchManager::beaconAnomalyNotify() -{ - for(int i = _beaconAnomalyTimerIndex + 1; i < _numberOfTimers; i++) - { - _timers[i]->moveChannels(_timers[_beaconAnomalyTimerIndex]); - } -} - -void ChannelSearchManager::initializeSendBuffer() -{ - Lock guard(_volMutex); - _sequenceNumber++; - - - // new buffer - _sendBuffer->clear(); - _sendBuffer->putByte(CA_MAGIC); - _sendBuffer->putByte(CA_VERSION); - _sendBuffer->putByte((EPICS_BYTE_ORDER == EPICS_ENDIAN_BIG) ? 0x80 : 0x00); // data + 7-bit endianess - _sendBuffer->putByte((int8)3); // search - _sendBuffer->putInt(sizeof(int32)/sizeof(int8) + 1); // "zero" payload - _sendBuffer->putInt(_sequenceNumber); - - /* - final boolean REQUIRE_REPLY = false; - sendBuffer.put(REQUIRE_REPLY ? (byte)QoS.REPLY_REQUIRED.getMaskValue() : (byte)QoS.DEFAULT.getMaskValue()); - */ - - _sendBuffer->putByte((int8)QOS_DEFAULT); - _sendBuffer->putShort((int16)0); // count -} - -void ChannelSearchManager::flushSendBuffer() -{ - Lock guard(_mutex); - Lock volGuard(_volMutex); - TimeStamp now; - now.getCurrent(); - _timeAtLastSend = now.getMilliseconds(); - - Transport::shared_pointer tt = _context->getSearchTransport(); - BlockingUDPTransport::shared_pointer ut = std::tr1::static_pointer_cast(tt); - ut->send(_sendBuffer); // TODO - initializeSendBuffer(); -} - -bool ChannelSearchManager::generateSearchRequestMessage(SearchInstance* channel, bool allowNewFrame) -{ - Lock guard(_mutex); - bool success = channel->generateSearchRequestMessage(_sendBuffer, _mockTransportSendControl); - // buffer full, flush - if(!success) - { - flushSendBuffer(); - if(allowNewFrame) - { - channel->generateSearchRequestMessage(_sendBuffer, _mockTransportSendControl); - } - return true; - } - return false; -} - -void ChannelSearchManager::searchResponseTimeout(SearchInstance* channel, int32 timerIndex) -{ - int32 newTimerIndex = min(++timerIndex, _numberOfTimers - 1); - _timers[newTimerIndex]->installChannel(channel); -} - -void ChannelSearchManager::boostSearching(SearchInstance* channel, int32 timerIndex) -{ - _timers[timerIndex]->installChannel(channel); -} - -inline void ChannelSearchManager::updateRTTE(long rtt) -{ - Lock guard(_volMutex); - const double error = rtt - _rttmean; - _rttmean += error / 4.0; -} - -inline double ChannelSearchManager::getRTTE() -{ - Lock guard(_volMutex); - double rtte = min(max((double)_rttmean, (double)MIN_RTT), (double)MAX_RTT); - return rtte; -} - -inline int32 ChannelSearchManager::getSequenceNumber() -{ - Lock guard(_volMutex); - int32 retval = _sequenceNumber; - return retval; -} - -inline int64 ChannelSearchManager::getTimeAtLastSend() -{ - Lock guard(_volMutex); - int64 retval = _timeAtLastSend; - return retval; -} - -}} - diff --git a/pvAccessApp/remote/channelSearchManager.h b/pvAccessApp/remote/channelSearchManager.h index f748eca..d074976 100644 --- a/pvAccessApp/remote/channelSearchManager.h +++ b/pvAccessApp/remote/channelSearchManager.h @@ -8,303 +8,65 @@ #define CHANNELSEARCHMANAGER_H #include -#include -#include -#include - -#include #include -#include -#include -#include -#include -#include +#include -#include +namespace epics { +namespace pvAccess { -namespace epics { namespace pvAccess { - -//TODO check the const of parameters - -/** - * SearchInstance. - */ class SearchInstance { -public: - - typedef std::deque List; - + public: + POINTER_DEFINITIONS(SearchInstance); + /** * Destructor */ virtual ~SearchInstance() {}; - /** - * Return channel ID. - * - * @return channel ID. - */ + virtual pvAccessID getSearchInstanceID() = 0; - /** - * Return search instance, e.g. channel, name. - * - * @return channel channel name. - */ + virtual epics::pvData::String getSearchInstanceName() = 0; - /** - * Removes the owner of this search instance. - */ - virtual void unsetListOwnership() = 0; - /** - * Adds this search instance into the provided list and set it as the owner of this search instance. - * - * @param newOwner a list to which this search instance is added. - * @param ownerMutex mutex belonging to the newOwner list. The mutex will be locked beofe any modification - * to the list will be done. - * @param index index of the owner (which is search timer index). - * - * @throws BaseException if the ownerMutex is NULL. - */ - virtual void addAndSetListOwnership(List* newOwner, epics::pvData::Mutex* ownerMutex, epics::pvData::int32 index) = 0; - /** - * Removes this search instance from the owner list and also removes the list as the owner of this - * search instance. - * - * @throws BaseException if the ownerMutex is NULL. - */ - virtual void removeAndUnsetListOwnership() = 0; - /** - * Returns the index of the owner. - */ - virtual epics::pvData::int32 getOwnerIndex() = 0; - /** - * Generates request message. - */ - virtual bool generateSearchRequestMessage(epics::pvData::ByteBuffer* requestMessage, TransportSendControl* control) = 0; + + virtual int32_t& getUserValue() = 0; /** * Search response from server (channel found). * @param minorRevision server minor CA revision. * @param serverAddress server address. */ - virtual void searchResponse(epics::pvData::int8 minorRevision, osiSockAddr* serverAddress) = 0; + // TODO make serverAddress an URI or similar + virtual void searchResponse(int8_t minorRevision, osiSockAddr* serverAddress) = 0; }; -/** - * BaseSearchInstance. - */ -class BaseSearchInstance : public SearchInstance -{ -public: - virtual ~BaseSearchInstance() {}; - void initializeSearchInstance(); - virtual pvAccessID getSearchInstanceID() = 0; - virtual epics::pvData::String getSearchInstanceName() = 0; - virtual void unsetListOwnership(); - virtual void addAndSetListOwnership(List* newOwner, epics::pvData::Mutex* ownerMutex, epics::pvData::int32 index); - virtual void removeAndUnsetListOwnership(); - virtual epics::pvData::int32 getOwnerIndex(); +class ChannelSearchManager { + public: + POINTER_DEFINITIONS(ChannelSearchManager); + /** - * Send search message. - * @return success status. + * Destructor */ - virtual bool generateSearchRequestMessage(epics::pvData::ByteBuffer* requestMessage, TransportSendControl* control); -private: - epics::pvData::Mutex _mutex; - List* _owner; - epics::pvData::Mutex* _ownerMutex; - epics::pvData::int32 _ownerIndex; - - const static int DATA_COUNT_POSITION; - const static int PAYLOAD_POSITION; -}; - -class ChannelSearchManager; -/** - * SearchTimer. - */ -class SearchTimer: public epics::pvData::TimerCallback -{ -public: - /** - * Constructor; - * @param timerIndex this timer instance index. - * @param allowBoost is boost allowed flag. - */ - SearchTimer(ChannelSearchManager* csmanager,epics::pvData::int32 timerIndex, bool allowBoost, bool allowSlowdown); - /** - * Destructor. - */ - virtual ~SearchTimer(); - /** - * Shutdown this instance. - */ - void shutdown(); - /** - * Install channel. - * @param channel channel to be registered. - */ - void installChannel(SearchInstance* channel); - /** - * Move channels to other SearchTimer. - * @param destination where to move channels. - */ - void moveChannels(SearchTimer* destination); - /** - * @see TimerCallback#timerStopped() - */ - void timerStopped(); - /** - * @see TimerCallback#callback() - */ - void callback(); - /** - * Search response received notification. - * @param responseSequenceNumber sequence number of search frame which contained search request. - * @param isSequenceNumberValid valid flag of responseSequenceNumber. - * @param responseTime time of search response. - */ - void searchResponse(epics::pvData::int32 responseSequenceNumber, bool isSequenceNumberValid, epics::pvData::int64 responseTime); - /** - * Calculate search time period. - * @return search time period. - */ - const epics::pvData::int64 period(); -private: - /** - * Instance of the channel search manager with which this search timer - * is associated. - */ - ChannelSearchManager* _chanSearchManager; - /** - * Number of search attempts in one frame. - */ - epics::pvData::int32 _searchAttempts; - /** - * Number of search responses in one frame. - */ - epics::pvData::int32 _searchRespones; - /** - * Number of frames per search try. - */ - double _framesPerTry; - /** - * Number of frames until congestion threshold is reached. - */ - double _framesPerTryCongestThresh; - /** - * Start sequence number (first frame number within a search try). - */ - epics::pvData::int32 _startSequenceNumber; - /** - * End sequence number (last frame number within a search try). - */ - epics::pvData::int32 _endSequenceNumber; - /** - * This timer index. - */ - const epics::pvData::int32 _timerIndex; - /** - * Flag indicating whether boost is allowed. - */ - const bool _allowBoost; - /** - * Flag indicating whether slow-down is allowed (for last timer). - */ - const bool _allowSlowdown; - /** - * Ordered (as inserted) list of channels with search request pending. - */ - // TODO replace with stl::deque - SearchInstance::List* _requestPendingChannels; - /** - * Ordered (as inserted) list of channels with search request pending. - */ - // TODO replace with stl::deque - SearchInstance::List* _responsePendingChannels; - /** - * Timer node. - * (sync on requestPendingChannels) - */ - epics::pvData::TimerNode* _timerNode; - /** - * Cancel this instance. - */ - bool _canceled; - /** - * Time of last response check. - */ - epics::pvData::int64 _timeAtResponseCheck; - /** - * epics::pvData::Mutex for request pending channel list. - */ - epics::pvData::Mutex _requestPendingChannelsMutex; - /** - * epics::pvData::Mutex for request pending channel list. - */ - epics::pvData::Mutex _responsePendingChannelsMutex; - /** - * General mutex. - */ - epics::pvData::Mutex _mutex; - /** - * Volatile varialbe mutex. - */ - epics::pvData::Mutex _volMutex; - /** - * Max search tries per frame. - */ - static const epics::pvData::int32 MAX_FRAMES_PER_TRY; -}; - -class MockTransportSendControl: public TransportSendControl -{ -public: - void endMessage() {} - void flush(bool lastMessageCompleted) {} - void setRecipient(const osiSockAddr& sendTo) {} - void startMessage(epics::pvData::int8 command, int ensureCapacity) {} - void ensureBuffer(int size) {} - void alignBuffer(int alignment) {} - void flushSerializeBuffer() {} -}; - - -class ChannelSearchManager -{ -public: - typedef std::tr1::shared_ptr shared_pointer; - typedef std::tr1::shared_ptr const_shared_pointer; - - /** - * Constructor. - * @param context - */ - ChannelSearchManager(Context* context); - /** - * Constructor. - * @param context - */ - virtual ~ChannelSearchManager(); - /** - * Cancel. - */ - void cancel(); + virtual ~ChannelSearchManager() {}; + /** * Get number of registered channels. * @return number of registered channels. */ - epics::pvData::int32 registeredChannelCount(); + virtual int32_t registeredCount() = 0; + /** * Register channel. - * @param channel to register. + * @param channel */ - void registerChannel(SearchInstance* channel); + virtual void registerSearchInstance(SearchInstance::shared_pointer const & channel) = 0; + + /** * Unregister channel. - * @param channel to unregister. + * @param channel */ - void unregisterChannel(SearchInstance* channel); + virtual void unregisterSearchInstance(SearchInstance::shared_pointer const & channel) = 0; + /** * Search response from server (channel found). * @param cid client channel ID. @@ -312,155 +74,22 @@ public: * @param minorRevision server minor CA revision. * @param serverAddress server address. */ - void searchResponse(epics::pvData::int32 cid, epics::pvData::int32 seqNo, epics::pvData::int8 minorRevision, osiSockAddr* serverAddress); + virtual void searchResponse(pvAccessID cid, int32_t seqNo, int8_t minorRevision, osiSockAddr* serverAddress) = 0; + /** * Beacon anomaly detected. * Boost searching of all channels. */ - void beaconAnomalyNotify(); -private: + virtual void beaconAnomalyNotify() = 0; + /** - * Minimal RTT (ms). + * Cancel. */ - static const epics::pvData::int64 MIN_RTT; - /** - * Maximal RTT (ms). - */ - static const epics::pvData::int64 MAX_RTT; - /** - * Rate to be considered as OK. - */ - static const double SUCCESS_RATE; - /** - * Context. - */ - Context* _context; - /** - * Canceled flag. - */ - bool _canceled; - /** - * Round-trip time (RTT) mean. - */ - double _rttmean; - /** - * Search timers array. - * Each timer with a greater index has longer (doubled) search period. - */ - SearchTimer** _timers; - /** - * Number of timers in timers array. - */ - epics::pvData::int32 _numberOfTimers; - /** - * Index of a timer to be used when beacon anomaly is detected. - */ - epics::pvData::int32 _beaconAnomalyTimerIndex; - /** - * Search (datagram) sequence number. - */ - epics::pvData::int32 _sequenceNumber; - /** - * Max search period (in ms). - */ - static const epics::pvData::int64 MAX_SEARCH_PERIOD; - /** - * Max search period (in ms) - lower limit. - */ - static const epics::pvData::int64 MAX_SEARCH_PERIOD_LOWER_LIMIT; - /** - * Beacon anomaly search period (in ms). - */ - static const epics::pvData::int64 BEACON_ANOMALY_SEARCH_PERIOD; - /** - * Max number of timers. - */ - static const epics::pvData::int32 MAX_TIMERS; - /** - * Send byte buffer (frame) - */ - epics::pvData::ByteBuffer* _sendBuffer; - /** - * Time of last frame send. - */ - epics::pvData::int64 _timeAtLastSend; - /** - * Set of registered channels. - */ - std::map _channels; - /** - * Iterator for the set of registered channels. - */ - std::map::iterator _channelsIter; - /** - * General mutex. - */ - epics::pvData::Mutex _mutex; - /** - * Channel mutex. - */ - epics::pvData::Mutex _channelMutex; - /** - * Volatile variable mutex. - */ - epics::pvData::Mutex _volMutex; - /** - * Mock transport send control - */ - MockTransportSendControl* _mockTransportSendControl; - /** - * SearchTimer is a friend. - */ - friend class SearchTimer; - /** - * Initialize send buffer. - */ - void initializeSendBuffer(); - /** - * Flush send buffer. - */ - void flushSendBuffer(); - /** - * Generate (put on send buffer) search request - * @param channel - * @param allowNewFrame flag indicating if new search request message is allowed to be put in new frame. - * @return true if new frame was sent. - */ - bool generateSearchRequestMessage(SearchInstance* channel, bool allowNewFrame); - /** - * Notify about search failure (response timeout). - * @param channel channel whose search failed. - * @param timerIndex index of timer which tries to search. - */ - void searchResponseTimeout(SearchInstance* channel, epics::pvData::int32 timerIndex); - /** - * Boost searching of a channel. - * @param channel channel to boost searching. - * @param timerIndex to what timer-index to boost - */ - void boostSearching(SearchInstance* channel, epics::pvData::int32 timerIndex); - /** - * Update (recalculate) round-trip estimate. - * @param rtt new sample of round-trip value. - */ - void updateRTTE(long rtt); - /** - * Get round-trip estimate (in ms). - * @return round-trip estimate (in ms). - */ - double getRTTE(); - /** - * Get search (UDP) frame sequence number. - * @return search (UDP) frame sequence number. - */ - epics::pvData::int32 getSequenceNumber(); - /** - * Get time at last send (when sendBuffer was flushed). - * @return time at last send. - */ - epics::pvData::int64 getTimeAtLastSend(); + virtual void cancel() = 0; + }; -}} +} +} -#endif /* CHANNELSEARCHMANAGER_H */ +#endif \ No newline at end of file diff --git a/pvAccessApp/remote/remote.h b/pvAccessApp/remote/remote.h index 3319a8f..0f17ee8 100644 --- a/pvAccessApp/remote/remote.h +++ b/pvAccessApp/remote/remote.h @@ -71,7 +71,7 @@ namespace epics { typedef epics::pvData::int32 pvAccessID; - enum MessageCommands { + enum ApplicationCommands { CMD_BEACON = 0, CMD_CONNECTION_VALIDATION = 1, CMD_ECHO = 2, @@ -95,6 +95,12 @@ namespace epics { CMD_RPC = 20 }; + enum ControlCommands { + CMD_SET_MARKER = 0, + CMD_ACK_MARKER = 1, + CMD_SET_ENDIANESS = 2 + }; + /** * Interface defining transport send control. */ diff --git a/pvAccessApp/remote/simpleChannelSearchManagerImpl.cpp b/pvAccessApp/remote/simpleChannelSearchManagerImpl.cpp new file mode 100644 index 0000000..57abe88 --- /dev/null +++ b/pvAccessApp/remote/simpleChannelSearchManagerImpl.cpp @@ -0,0 +1,297 @@ +/** + * Copyright - See the COPYRIGHT that is included with this distribution. + * pvAccessCPP is distributed subject to a Software License Agreement found + * in file LICENSE that is included with this distribution. + */ + +#include +#include +#include + +#include +#include +#include +#include + +using namespace std; +using namespace epics::pvData; + +namespace epics { +namespace pvAccess { + +const int SimpleChannelSearchManagerImpl::DATA_COUNT_POSITION = CA_MESSAGE_HEADER_SIZE + sizeof(int32)/sizeof(int8) + 1; +const int SimpleChannelSearchManagerImpl::PAYLOAD_POSITION = sizeof(int16)/sizeof(int8) + 2; + +// 225ms +/- 25ms random +const double SimpleChannelSearchManagerImpl::ATOMIC_PERIOD = 0.225; +const int SimpleChannelSearchManagerImpl::PERIOD_JITTER_MS = 25; + +const int SimpleChannelSearchManagerImpl::BOOST_VALUE = 1; +// must be power of two (so that search is done) +const int SimpleChannelSearchManagerImpl::MAX_COUNT_VALUE = 1 << 7; +const int SimpleChannelSearchManagerImpl::MAX_FALLBACK_COUNT_VALUE = (1 << 6) + 1; + +const int SimpleChannelSearchManagerImpl::MAX_FRAMES_AT_ONCE = 10; +const int SimpleChannelSearchManagerImpl::DELAY_BETWEEN_FRAMES_MS = 50; + + +SimpleChannelSearchManagerImpl::SimpleChannelSearchManagerImpl(Context::shared_pointer const & context) : + m_context(context), + m_canceled(), + m_sequenceNumber(0), + m_sendBuffer(MAX_UDP_SEND), + m_channels(), + m_timerNode(*this), + m_lastTimeSent(), + m_mockTransportSendControl(), + m_mutex() +{ + // initialize send buffer + initializeSendBuffer(); + + + // initialize random seed with some random value + srand ( time(NULL) ); + + // add some jitter so that all the clients do not send at the same time + double period = ATOMIC_PERIOD + (rand() % (2*PERIOD_JITTER_MS+1) - PERIOD_JITTER_MS)/(double)1000; + context->getTimer()->schedulePeriodic(m_timerNode, period, period); + + //new Thread(this, "pvAccess immediate-search").start(); +} + +SimpleChannelSearchManagerImpl::~SimpleChannelSearchManagerImpl() +{ + cancel(); +} + +void SimpleChannelSearchManagerImpl::cancel() +{ + Lock guard(m_mutex); + + if (m_canceled.get()) + return; + m_canceled.set(); + + m_timerNode.cancel(); +} + +int32_t SimpleChannelSearchManagerImpl::registeredCount() +{ + Lock guard(m_channelMutex); + return m_channels.size(); +} + +void SimpleChannelSearchManagerImpl::registerSearchInstance(SearchInstance::shared_pointer const & channel) +{ + if (m_canceled.get()) + return; + + Lock guard(m_channelMutex); + //overrides if already registered + m_channels[channel->getSearchInstanceID()] = channel; + int32_t& userValue = channel->getUserValue(); + userValue = 1; +} + +void SimpleChannelSearchManagerImpl::unregisterSearchInstance(SearchInstance::shared_pointer const & channel) +{ + Lock guard(m_channelMutex); + pvAccessID id = channel->getSearchInstanceID(); + std::map::iterator channelsIter = m_channels.find(id); + if(channelsIter != m_channels.end()) + m_channels.erase(id); +} + +void SimpleChannelSearchManagerImpl::searchResponse(pvAccessID cid, int32_t seqNo, int8_t minorRevision, osiSockAddr* serverAddress) +{ + Lock guard(m_channelMutex); + std::map::iterator channelsIter = m_channels.find(cid); + if(channelsIter == m_channels.end()) + { + guard.unlock(); + + // minor hack to enable duplicate reports + SearchInstance::shared_pointer si = std::tr1::dynamic_pointer_cast(m_context.lock()->getChannel(cid)); + if (si) + si->searchResponse(minorRevision, serverAddress); + } + else + { + SearchInstance::shared_pointer si = channelsIter->second; + + // remove from search list + m_channels.erase(cid); + + guard.unlock(); + + // then notify SearchInstance + si->searchResponse(minorRevision, serverAddress); + } +} + +void SimpleChannelSearchManagerImpl::beaconAnomalyNotify() +{ + boost(); + callback(); +} + +void SimpleChannelSearchManagerImpl::initializeSendBuffer() +{ + // for now OK, since it is only set here + m_sequenceNumber++; + + + // new buffer + m_sendBuffer.clear(); + m_sendBuffer.putByte(CA_MAGIC); + m_sendBuffer.putByte(CA_VERSION); + m_sendBuffer.putByte((EPICS_BYTE_ORDER == EPICS_ENDIAN_BIG) ? 0x80 : 0x00); // data + 7-bit endianess + m_sendBuffer.putByte((int8_t)3); // search + m_sendBuffer.putInt(sizeof(int32_t)/sizeof(int8_t) + 1); // "zero" payload + m_sendBuffer.putInt(m_sequenceNumber); + + /* + final boolean REQUIRE_REPLY = false; + sendBuffer.put(REQUIRE_REPLY ? (byte)QoS.REPLY_REQUIRED.getMaskValue() : (byte)QoS.DEFAULT.getMaskValue()); + */ + + m_sendBuffer.putByte((int8_t)QOS_DEFAULT); + m_sendBuffer.putShort((int16_t)0); // count +} + +void SimpleChannelSearchManagerImpl::flushSendBuffer() +{ + Lock guard(m_mutex); + + Transport::shared_pointer tt = m_context.lock()->getSearchTransport(); + BlockingUDPTransport::shared_pointer ut = std::tr1::static_pointer_cast(tt); + ut->send(&m_sendBuffer); // TODO + initializeSendBuffer(); +} + + +bool SimpleChannelSearchManagerImpl::generateSearchRequestMessage(SearchInstance::shared_pointer const & channel, + ByteBuffer* requestMessage, TransportSendControl* control) +{ + epics::pvData::int16 dataCount = requestMessage->getShort(DATA_COUNT_POSITION); + + dataCount++; + + /* + if(dataCount >= MAX_SEARCH_BATCH_COUNT) + return false; + */ + + const epics::pvData::String name = channel->getSearchInstanceName(); + // not nice... + const int addedPayloadSize = sizeof(int32)/sizeof(int8) + (1 + sizeof(int32)/sizeof(int8) + name.length()); + if(((int)requestMessage->getRemaining()) < addedPayloadSize) + return false; + + requestMessage->putInt(channel->getSearchInstanceID()); + SerializeHelper::serializeString(name, requestMessage, control); + + requestMessage->putInt(PAYLOAD_POSITION, requestMessage->getPosition() - CA_MESSAGE_HEADER_SIZE); + requestMessage->putShort(DATA_COUNT_POSITION, dataCount); + return true; +} + +bool SimpleChannelSearchManagerImpl::generateSearchRequestMessage(SearchInstance::shared_pointer const & channel, + bool allowNewFrame, bool flush) +{ + Lock guard(m_mutex); + bool success = generateSearchRequestMessage(channel, &m_sendBuffer, &m_mockTransportSendControl); + // buffer full, flush + if(!success) + { + flushSendBuffer(); + if(allowNewFrame) + generateSearchRequestMessage(channel, &m_sendBuffer, &m_mockTransportSendControl); + if (flush) + flushSendBuffer(); + return true; + } + + if (flush) + flushSendBuffer(); + + return flush; +} + +void SimpleChannelSearchManagerImpl::boost() +{ + Lock guard(m_channelMutex); + std::map::iterator channelsIter = m_channels.begin(); + for(; channelsIter != m_channels.end(); channelsIter++) + { + int32_t& userValue = channelsIter->second->getUserValue(); + userValue = BOOST_VALUE; + } +} + +void SimpleChannelSearchManagerImpl::callback() +{ + // high-frequency beacon anomaly trigger guard + { + Lock guard(m_mutex); + + epics::pvData::TimeStamp now; + now.getCurrent(); + int64_t nowMS = now.getMilliseconds(); + + if (nowMS - m_lastTimeSent < 100) + return; + m_lastTimeSent = nowMS; + } + + + int frameSent = 0; + + vector toSend; + { + Lock guard(m_channelMutex); + toSend.reserve(m_channels.size()); + std::map::iterator channelsIter = m_channels.begin(); + for(; channelsIter != m_channels.end(); channelsIter++) + toSend.push_back(channelsIter->second); + } + + vector::iterator siter = toSend.begin(); + for (; siter != toSend.end(); siter++) + { + int32_t& countValue = (*siter)->getUserValue(); + bool skip = !isPowerOfTwo(countValue); + + if (countValue == MAX_COUNT_VALUE) + countValue = MAX_FALLBACK_COUNT_VALUE; + else + countValue++; + + // back-off + if (skip) + continue; + + if (generateSearchRequestMessage(*siter, true, false)) + frameSent++; + if (frameSent == MAX_FRAMES_AT_ONCE) + { + epicsThreadSleep(DELAY_BETWEEN_FRAMES_MS/(double)1000.0); + frameSent = 0; + } + } + + flushSendBuffer(); +} + +bool SimpleChannelSearchManagerImpl::isPowerOfTwo(int32_t x) +{ + return ((x > 0) && (x & (x - 1)) == 0); +} + +void SimpleChannelSearchManagerImpl::timerStopped() +{ +} + +}} + diff --git a/pvAccessApp/remote/simpleChannelSearchManagerImpl.h b/pvAccessApp/remote/simpleChannelSearchManagerImpl.h new file mode 100644 index 0000000..6387a6b --- /dev/null +++ b/pvAccessApp/remote/simpleChannelSearchManagerImpl.h @@ -0,0 +1,170 @@ +/** + * Copyright - See the COPYRIGHT that is included with this distribution. + * pvAccessCPP is distributed subject to a Software License Agreement found + * in file LICENSE that is included with this distribution. + */ + +#ifndef SIMPLECHANNELSEARCHMANAGERIMPL_H +#define SIMPLECHANNELSEARCHMANAGERIMPL_H + +#include + +#include +#include +#include + +namespace epics { +namespace pvAccess { + + +class MockTransportSendControl: public TransportSendControl +{ +public: + void endMessage() {} + void flush(bool lastMessageCompleted) {} + void setRecipient(const osiSockAddr& sendTo) {} + void startMessage(epics::pvData::int8 command, int ensureCapacity) {} + void ensureBuffer(int size) {} + void alignBuffer(int alignment) {} + void flushSerializeBuffer() {} +}; + + +class SimpleChannelSearchManagerImpl : public ChannelSearchManager, public epics::pvData::TimerCallback +{ + public: + POINTER_DEFINITIONS(SimpleChannelSearchManagerImpl); + + /** + * Constructor. + * @param context + */ + SimpleChannelSearchManagerImpl(Context::shared_pointer const & context); + /** + * Constructor. + * @param context + */ + virtual ~SimpleChannelSearchManagerImpl(); + /** + * Cancel. + */ + void cancel(); + /** + * Get number of registered channels. + * @return number of registered channels. + */ + int32_t registeredCount(); + /** + * Register channel. + * @param channel to register. + */ + void registerSearchInstance(SearchInstance::shared_pointer const & channel); + /** + * Unregister channel. + * @param channel to unregister. + */ + void unregisterSearchInstance(SearchInstance::shared_pointer const & channel); + /** + * Search response from server (channel found). + * @param cid client channel ID. + * @param seqNo search sequence number. + * @param minorRevision server minor CA revision. + * @param serverAddress server address. + */ + void searchResponse(pvAccessID cid, int32_t seqNo, int8_t minorRevision, osiSockAddr* serverAddress); + /** + * Beacon anomaly detected. + * Boost searching of all channels. + */ + void beaconAnomalyNotify(); + + /// Timer callback. + void callback(); + + /// Timer stooped callback. + void timerStopped(); + + private: + + bool generateSearchRequestMessage(SearchInstance::shared_pointer const & channel, bool allowNewFrame, bool flush); + + static bool generateSearchRequestMessage(SearchInstance::shared_pointer const & channel, + epics::pvData::ByteBuffer* byteBuffer, TransportSendControl* control); + + void boost(); + + void initializeSendBuffer(); + void flushSendBuffer(); + + static bool isPowerOfTwo(int32_t x); + + /** + * Context. + */ + Context::weak_pointer m_context; + + /** + * Canceled flag. + */ + AtomicBoolean m_canceled; + + /** + * Search (datagram) sequence number. + */ + int32_t m_sequenceNumber; + + /** + * Send byte buffer (frame) + */ + epics::pvData::ByteBuffer m_sendBuffer; + + /** + * Set of registered channels. + */ + std::map m_channels; + + /** + * Timer node. + * (sync on requestPendingChannels) + */ + epics::pvData::TimerNode m_timerNode; + + /** + * Time of last frame send. + */ + int64_t m_lastTimeSent; + + /** + * Mock transport send control + */ + MockTransportSendControl m_mockTransportSendControl; + + /** + * This instance mutex. + */ + epics::pvData::Mutex m_channelMutex; + + /** + * m_channels mutex. + */ + epics::pvData::Mutex m_mutex; + + static const int DATA_COUNT_POSITION; + static const int PAYLOAD_POSITION; + + static const double ATOMIC_PERIOD; + static const int PERIOD_JITTER_MS; + + static const int BOOST_VALUE; + static const int MAX_COUNT_VALUE; + static const int MAX_FALLBACK_COUNT_VALUE; + + static const int MAX_FRAMES_AT_ONCE; + static const int DELAY_BETWEEN_FRAMES_MS; + +}; + +} +} + +#endif /* SIMPLECHANNELSEARCHMANAGERIMPL_H */ diff --git a/pvAccessApp/remoteClient/clientContextImpl.cpp b/pvAccessApp/remoteClient/clientContextImpl.cpp index ae63204..c213ea3 100644 --- a/pvAccessApp/remoteClient/clientContextImpl.cpp +++ b/pvAccessApp/remoteClient/clientContextImpl.cpp @@ -23,6 +23,7 @@ #include #include #include +#include #include #include #include @@ -2979,6 +2980,9 @@ namespace epics { */ bool m_issueCreateMessage; + /// Used by SearchInstance. + int32_t m_userValue; + /** * Constructor. * @param context @@ -3010,8 +3014,6 @@ namespace epics { void activate() { - initializeSearchInstance(); - // register before issuing search request ChannelImpl::shared_pointer thisPointer = shared_from_this(); m_context->registerChannel(thisPointer); @@ -3054,6 +3056,8 @@ namespace epics { std::cout << "[" << getRequesterName() << "] message(" << message << ", " << messageTypeName[messageType] << ")" << std::endl; } + int32_t& getUserValue() { return m_userValue; } + virtual ChannelProvider::shared_pointer const & getProvider() { return m_context->getProvider(); @@ -3294,7 +3298,8 @@ namespace epics { throw std::runtime_error("Channel already destroyed."); // stop searching... - m_context->getChannelSearchManager()->unregisterChannel(this); + SearchInstance::shared_pointer thisChannelPointer = shared_from_this(); + m_context->getChannelSearchManager()->unregisterSearchInstance(thisChannelPointer); cancel(); disconnectPendingIO(true); @@ -3339,7 +3344,8 @@ namespace epics { if (!initiateSearch) { // stop searching... - m_context->getChannelSearchManager()->unregisterChannel(this); + SearchInstance::shared_pointer thisChannelPointer = shared_from_this(); + m_context->getChannelSearchManager()->unregisterSearchInstance(thisChannelPointer); cancel(); } setConnectionState(DISCONNECTED); @@ -3378,7 +3384,10 @@ namespace epics { m_allowCreation = true; if (!m_addresses.get()) - m_context->getChannelSearchManager()->registerChannel(this); + { + SearchInstance::shared_pointer thisChannelPointer = shared_from_this(); + m_context->getChannelSearchManager()->registerSearchInstance(thisChannelPointer); + } /* TODO else // TODO not only first @@ -3893,7 +3902,7 @@ TODO m_transportRegistry.reset(new TransportRegistry()); // setup search manager - m_channelSearchManager.reset(new ChannelSearchManager(thisPointer.get())); + m_channelSearchManager.reset(new SimpleChannelSearchManagerImpl(thisPointer)); // TODO put memory barrier here... diff --git a/pvAccessApp/remoteClient/clientContextImpl.h b/pvAccessApp/remoteClient/clientContextImpl.h index a16e207..a677de5 100644 --- a/pvAccessApp/remoteClient/clientContextImpl.h +++ b/pvAccessApp/remoteClient/clientContextImpl.h @@ -11,6 +11,7 @@ #include #include #include +#include class ChannelSearchManager; @@ -24,7 +25,7 @@ namespace epics { public Channel, public TransportClient, public TransportSender, - public BaseSearchInstance + public SearchInstance { public: POINTER_DEFINITIONS(ChannelImpl); diff --git a/testApp/remote/Makefile b/testApp/remote/Makefile index 07ebd9c..6daba7f 100644 --- a/testApp/remote/Makefile +++ b/testApp/remote/Makefile @@ -26,9 +26,9 @@ testBeaconEmitter_LIBS += pvData pvAccess Com testBeaconHandler_SRCS += testBeaconHandler.cpp testBeaconHandler_LIBS += pvData pvAccess Com -PROD_HOST += testChannelSearchManager -testChannelSearchManager_SRCS += testChannelSearchManager.cpp -testChannelSearchManager_LIBS += pvData pvAccess Com +#PROD_HOST += testChannelSearchManager +#testChannelSearchManager_SRCS += testChannelSearchManager.cpp +#testChannelSearchManager_LIBS += pvData pvAccess Com PROD_HOST += testBlockingTCPSrv testBlockingTCPSrv_SRCS += testBlockingTCPSrv.cpp diff --git a/testApp/remote/pvget.cpp b/testApp/remote/pvget.cpp index 3b2b372..b6762cd 100644 --- a/testApp/remote/pvget.cpp +++ b/testApp/remote/pvget.cpp @@ -4,13 +4,14 @@ #include #include #include -#include +#include #include #include #include +#include #include using namespace std; diff --git a/testApp/remote/pvput.cpp b/testApp/remote/pvput.cpp index 71d1be9..13b2be8 100644 --- a/testApp/remote/pvput.cpp +++ b/testApp/remote/pvput.cpp @@ -4,12 +4,13 @@ #include #include #include -#include +#include #include #include #include +#include using namespace std; using namespace std::tr1; diff --git a/testApp/remote/testChannelConnect.cpp b/testApp/remote/testChannelConnect.cpp index 6f9d092..074e4e2 100644 --- a/testApp/remote/testChannelConnect.cpp +++ b/testApp/remote/testChannelConnect.cpp @@ -10,6 +10,8 @@ #include #include +#include + using namespace epics::pvData; using namespace epics::pvAccess; using namespace std; @@ -88,7 +90,7 @@ int main(int argc,char *argv[]) ClientFactory::stop(); } - epicsThreadSleep ( 1.0 ); + epicsThreadSleep ( 2.0 ); std::cout << "-----------------------------------------------------------------------" << std::endl; epicsExitCallAtExits(); CDRMonitor::get().show(stdout, true); diff --git a/testApp/remote/testGetPerformance.cpp b/testApp/remote/testGetPerformance.cpp index 45310d1..e71775c 100644 --- a/testApp/remote/testGetPerformance.cpp +++ b/testApp/remote/testGetPerformance.cpp @@ -13,6 +13,8 @@ #include #include +#include + using namespace std; using namespace std::tr1; using namespace epics::pvData; @@ -284,4 +286,4 @@ int main (int argc, char *argv[]) ClientFactory::stop(); return allOK ? 0 : 1; -} \ No newline at end of file +}