From 636a1f73bdee8d00336a3d215e70c51c5e45e551 Mon Sep 17 00:00:00 2001 From: Gasper Jansa Date: Mon, 10 Jan 2011 22:15:14 +0100 Subject: [PATCH 1/4] deadlock and some other fixes --- pvAccessApp/remote/channelSearchManager.cpp | 142 +++++++++++--------- 1 file changed, 81 insertions(+), 61 deletions(-) diff --git a/pvAccessApp/remote/channelSearchManager.cpp b/pvAccessApp/remote/channelSearchManager.cpp index c908e87..89b8627 100644 --- a/pvAccessApp/remote/channelSearchManager.cpp +++ b/pvAccessApp/remote/channelSearchManager.cpp @@ -92,13 +92,11 @@ SearchTimer::SearchTimer(ChannelSearchManager* _chanSearchManager, int32 timerIn _allowSlowdown(allowSlowdown), _requestPendingChannels(new ArrayFIFO), _responsePendingChannels(new ArrayFIFO), - _timerNode(NULL), + _timerNode(new TimerNode(this)), _canceled(false), - _timeAtResponseCheck(0), - _requestPendingChannelsMutex(Mutex()), - _mutex(Mutex()) + _timeAtResponseCheck(0) { - _timerNode = new TimerNode(this); + } SearchTimer::~SearchTimer() @@ -111,8 +109,12 @@ SearchTimer::~SearchTimer() void SearchTimer::shutdown() { Lock guard(&_mutex); //the whole method is locked - if(_canceled) return; - _canceled = true; + + { + Lock guard(&_volMutex); + if(_canceled) return; + _canceled = true; + } { Lock guard(&_requestPendingChannelsMutex); @@ -128,7 +130,6 @@ void SearchTimer::installChannel(SearchInstance* channel) Lock guard(&_mutex); //the whole method is locked if(_canceled) return; - Lock pendingChannelGuard(&_requestPendingChannelsMutex); bool startImmediately = _requestPendingChannels->isEmpty(); channel->addAndSetListOwnership(_requestPendingChannels, &_requestPendingChannelsMutex, _timerIndex); @@ -156,7 +157,7 @@ void SearchTimer::moveChannels(SearchTimer* destination) while((channel = _responsePendingChannels->pop()) != NULL) { { - Lock guard(&_mutex); + Lock guard(&_volMutex); if(_searchAttempts > 0) { _searchAttempts--; @@ -181,7 +182,7 @@ void SearchTimer::timerStopped() void SearchTimer::callback() { { - Lock guard(&_mutex); + Lock guard(&_volMutex); if(_canceled) return; } @@ -189,7 +190,7 @@ void SearchTimer::callback() // boost search period (if necessary) for channels not recently searched int32 searchRespones; { - Lock guard(&_mutex); + Lock guard(&_volMutex); searchRespones = _searchRespones; } if(_allowBoost && searchRespones > 0) @@ -236,7 +237,7 @@ void SearchTimer::callback() int32 searchRespones,searchAttempts; { - Lock guard(&_mutex); + Lock guard(&_volMutex); searchAttempts = _searchAttempts; searchRespones = _searchRespones; } @@ -272,7 +273,7 @@ void SearchTimer::callback() { - Lock guard(&_mutex); + Lock guard(&_volMutex); _startSequenceNumber = _chanSearchManager->getSequenceNumber() + 1; _searchAttempts = 0; _searchRespones = 0; @@ -284,56 +285,66 @@ void SearchTimer::callback() // reschedule bool canceled; { - Lock guard(&_mutex); + Lock guard(&_volMutex); canceled = _canceled; } + { Lock guard(&_requestPendingChannelsMutex); - while (!canceled && (channel = _requestPendingChannels->pop()) != NULL) - { - channel->unsetListOwnership(); + channel = _requestPendingChannels->pop(); + } + while (!canceled && channel != NULL) + { + channel->unsetListOwnership(); - bool requestSent = true; - bool allowNewFrame = (framesSent+1) < _framesPerTry; - bool frameWasSent = _chanSearchManager->generateSearchRequestMessage(channel, allowNewFrame); - if(frameWasSent) + bool requestSent = true; + bool allowNewFrame = (framesSent+1) < _framesPerTry; + bool frameWasSent = _chanSearchManager->generateSearchRequestMessage(channel, allowNewFrame); + if(frameWasSent) + { + framesSent++; + triesInFrame = 0; + if(!allowNewFrame) { - framesSent++; - triesInFrame = 0; - if(!allowNewFrame) - { - channel->addAndSetListOwnership(_requestPendingChannels, &_requestPendingChannelsMutex, _timerIndex); - requestSent = false; - } - else - { - triesInFrame++; - } + channel->addAndSetListOwnership(_requestPendingChannels, &_requestPendingChannelsMutex, _timerIndex); + requestSent = false; } else { triesInFrame++; } + } + else + { + triesInFrame++; + } - if(requestSent) + if(requestSent) + { + channel->addAndSetListOwnership(_responsePendingChannels, &_responsePendingChannelsMutex, _timerIndex); + Lock guard(&_volMutex); + if(_searchAttempts < INT_MAX) { - channel->addAndSetListOwnership(_responsePendingChannels, &_requestPendingChannelsMutex, _timerIndex); - Lock guard(&_mutex); - if(_searchAttempts < INT_MAX) - { - _searchAttempts++; - } + _searchAttempts++; } + } - // limit - if(triesInFrame == 0 && !allowNewFrame) break; + // limit + if(triesInFrame == 0 && !allowNewFrame) break; - Lock guard(&_mutex); + { + Lock guard(&_volMutex); canceled = _canceled; } + + { + Lock guard(&_requestPendingChannelsMutex); + channel = _requestPendingChannels->pop(); + } } + // flush out the search request buffer if(triesInFrame > 0) { @@ -341,11 +352,12 @@ void SearchTimer::callback() framesSent++; } - _endSequenceNumber = _chanSearchManager->getSequenceNumber(); - // reschedule { - Lock guard(&_mutex); + Lock guard(&_volMutex); + _endSequenceNumber = _chanSearchManager->getSequenceNumber(); + + // reschedule canceled = _canceled; } Lock guard(&_requestPendingChannelsMutex); @@ -363,12 +375,12 @@ void SearchTimer::searchResponse(int32 responseSequenceNumber, bool isSequenceNu { bool validResponse = true; { - Lock guard(&_mutex); + Lock guard(&_volMutex); if(_canceled) return; if(isSequenceNumberValid) { - validResponse = _startSequenceNumber <= _chanSearchManager->_sequenceNumber && _chanSearchManager->_sequenceNumber <= _endSequenceNumber; + validResponse = _startSequenceNumber <= _chanSearchManager->getSequenceNumber() && _chanSearchManager->getSequenceNumber() <= _endSequenceNumber; } } @@ -378,7 +390,7 @@ void SearchTimer::searchResponse(int32 responseSequenceNumber, bool isSequenceNu { const int64 dt = responseTime - _chanSearchManager->getTimeAtLastSend(); _chanSearchManager->updateRTTE(dt); - Lock guard(&_mutex); + Lock guard(&_volMutex); if(_searchRespones < INT_MAX) { _searchRespones++; @@ -459,9 +471,13 @@ ChannelSearchManager::~ChannelSearchManager() void ChannelSearchManager::cancel() { Lock guard(&_mutex); - if(_canceled) return; - _canceled = true; + { + Lock guard(&_volMutex); + if(_canceled) return; + + _canceled = true; + } if(_timers != NULL) { @@ -474,15 +490,18 @@ void ChannelSearchManager::cancel() int32 ChannelSearchManager::registeredChannelCount() { - Lock guard(&_mutex); + Lock guard(&_channelMutex); return _channels.size(); } void ChannelSearchManager::registerChannel(SearchInstance* channel) { - Lock guard(&_mutex); - if(_canceled) return; + { + Lock guard(&_volMutex); + if(_canceled) return; + } + Lock guard(&_channelMutex); //overrides if already registered _channels[channel->getChannelID()] = channel; _timers[0]->installChannel(channel); @@ -490,7 +509,7 @@ void ChannelSearchManager::registerChannel(SearchInstance* channel) void ChannelSearchManager::unregisterChannel(SearchInstance* channel) { - Lock guard(&_mutex); + Lock guard(&_channelMutex); _channelsIter = _channels.find(channel->getChannelID()); if(_channelsIter != _channels.end()) { @@ -502,13 +521,13 @@ void ChannelSearchManager::unregisterChannel(SearchInstance* channel) void ChannelSearchManager::searchResponse(int32 cid, int32 seqNo, int8 minorRevision, osiSockAddr* serverAddress) { - Lock guard(&_mutex); + Lock guard(&_channelMutex); // first remove SearchInstance* si = NULL; _channelsIter = _channels.find(cid); if(_channelsIter != _channels.end()) { - SearchInstance* si = _channelsIter->second; + si = _channelsIter->second; _channels.erase(_channelsIter); si->removeAndUnsetListOwnership(); } @@ -543,7 +562,7 @@ void ChannelSearchManager::beaconAnomalyNotify() void ChannelSearchManager::initializeSendBuffer() { - Lock guard(&_mutex); + Lock guard(&_volMutex); _sequenceNumber++; @@ -567,6 +586,7 @@ void ChannelSearchManager::initializeSendBuffer() void ChannelSearchManager::flushSendBuffer() { Lock guard(&_mutex); + Lock volGuard(&_volMutex); TimeStamp now; now.getCurrent(); _timeAtLastSend = now.getMilliseconds(); @@ -604,28 +624,28 @@ void ChannelSearchManager::boostSearching(SearchInstance* channel, int32 timerIn inline void ChannelSearchManager::updateRTTE(long rtt) { - Lock guard(&_mutex); + Lock guard(&_volMutex); const double error = rtt - _rttmean; _rttmean += error / 4.0; } inline double ChannelSearchManager::getRTTE() { - Lock guard(&_mutex); + Lock guard(&_volMutex); double rtte = min(max((double)_rttmean, (double)MIN_RTT), (double)MAX_RTT); return rtte; } inline int32 ChannelSearchManager::getSequenceNumber() { - Lock guard(&_mutex); + Lock guard(&_volMutex); int32 retval = _sequenceNumber; return retval; } inline int64 ChannelSearchManager::getTimeAtLastSend() { - Lock guard(&_mutex); + Lock guard(&_volMutex); int64 retval = _timeAtLastSend; return retval; } From 5224123ba45c2167a087cb54ae3fe35722a13923 Mon Sep 17 00:00:00 2001 From: Gasper Jansa Date: Mon, 10 Jan 2011 22:15:32 +0100 Subject: [PATCH 2/4] deadlock and some other fixes --- pvAccessApp/remote/channelSearchManager.h | 18 +++++++++++++++++- 1 file changed, 17 insertions(+), 1 deletion(-) diff --git a/pvAccessApp/remote/channelSearchManager.h b/pvAccessApp/remote/channelSearchManager.h index b09571c..c18ca62 100644 --- a/pvAccessApp/remote/channelSearchManager.h +++ b/pvAccessApp/remote/channelSearchManager.h @@ -262,7 +262,7 @@ public: */ virtual void unsetListOwnership() = 0; /** - * Adds this search instance into the provided list and sets it as the owner of this search instance. + * Adds this search instance into the provided list and set it as the owner of this search instance. * * @param newOwner a list to which this search instance is added. * @param ownerMutex mutex belonging to the newOwner list. The mutex will be locked beofe any modification @@ -442,10 +442,18 @@ private: * Mutex for request pending channel list. */ Mutex _requestPendingChannelsMutex; + /** + * Mutex for request pending channel list. + */ + Mutex _responsePendingChannelsMutex; /** * General mutex. */ Mutex _mutex; + /** + * Volatile varialbe mutex. + */ + Mutex _volMutex; /** * Max search tries per frame. */ @@ -587,6 +595,14 @@ private: * General mutex. */ Mutex _mutex; + /** + * Channel mutex. + */ + Mutex _channelMutex; + /** + * Volatile variable mutex. + */ + Mutex _volMutex; /** * Mock transport send control */ From e43d2472516b98a15e87373313950c9c7ce0844e Mon Sep 17 00:00:00 2001 From: Gasper Jansa Date: Mon, 10 Jan 2011 22:15:43 +0100 Subject: [PATCH 3/4] deadlock and some other fixes --- testApp/remote/testChannelSearchManager.cpp | 85 +++++++++++++++++++-- 1 file changed, 79 insertions(+), 6 deletions(-) diff --git a/testApp/remote/testChannelSearchManager.cpp b/testApp/remote/testChannelSearchManager.cpp index 1d07806..601b639 100644 --- a/testApp/remote/testChannelSearchManager.cpp +++ b/testApp/remote/testChannelSearchManager.cpp @@ -1,6 +1,7 @@ /* testChannelSearcManager.cpp */ #include +#include using namespace epics::pvData; using namespace epics::pvAccess; @@ -17,22 +18,94 @@ private: string _channelName; }; +static const int max_channels = 100; +ClientContextImpl* context = new ClientContextImpl(); +ChannelSearchManager* manager = new ChannelSearchManager(context); +TestSearcInstance** chanArray = new TestSearcInstance*[max_channels]; + +void* testWorker1(void* p) +{ + for(int i = 0; i < 1000; i++) + { + for(int j = 0; j < max_channels/2; j++) + { + manager->unregisterChannel(chanArray[j]); + usleep(100); + manager->registerChannel(chanArray[j]); + } + } + + return NULL; +} + + +void* testWorker2(void* p) +{ + for(int i = 0; i < 1000; i++) + { + for(int j = max_channels/2; j < max_channels; j++) + { + manager->unregisterChannel(chanArray[j]); + usleep(100); + manager->registerChannel(chanArray[j]); + manager->beaconAnomalyNotify(); + } + } + + return NULL; +} + int main(int argc,char *argv[]) { - ClientContextImpl* context = new ClientContextImpl(); - ChannelSearchManager* manager = new ChannelSearchManager(context); + pthread_t _worker1Id; + pthread_t _worker2Id; - TestSearcInstance* chan1 = new TestSearcInstance("chan1", 1); - manager->registerChannel(chan1); + ostringstream obuffer; + for(int i = 0; i < max_channels; i++) + { + obuffer.clear(); + obuffer.str(""); + obuffer << i; + string name = "chan" + obuffer.str(); + chanArray[i] = new TestSearcInstance(name.c_str(), i); + manager->registerChannel(chanArray[i]); + } - sleep(3); + //create two threads + int32 retval = pthread_create(&_worker1Id, NULL, testWorker1, NULL); + if(retval != 0) + { + assert(true); + } + + retval = pthread_create(&_worker2Id, NULL, testWorker2, NULL); + if(retval != 0) + { + assert(true); + } + + retval = pthread_join(_worker1Id, NULL); + if(retval != 0) + { + assert(true); + } + + retval = pthread_join(_worker2Id, NULL); + if(retval != 0) + { + assert(true); + } manager->cancel(); context->destroy(); getShowConstructDestruct()->constuctDestructTotals(stdout); - //if(chan1) delete chan1; + for(int i = 0; i < max_channels; i++) + { + if(chanArray[i]) delete chanArray[i]; + } + if(chanArray) delete [] chanArray; if(manager) delete manager; if(context) delete context; return(0); From b3fb43800fb40973862abdeb8d25288e680d3fb3 Mon Sep 17 00:00:00 2001 From: Gasper Jansa Date: Mon, 10 Jan 2011 23:06:46 +0100 Subject: [PATCH 4/4] Fixing bad mering and test for search manager --- pvAccessApp/remote/channelSearchManager.cpp | 14 +- pvAccessApp/remote/channelSearchManager.h | 2 +- testApp/remote/testChannelSearchManager.cpp | 337 +++++++++++++++++++- 3 files changed, 341 insertions(+), 12 deletions(-) diff --git a/pvAccessApp/remote/channelSearchManager.cpp b/pvAccessApp/remote/channelSearchManager.cpp index 89b8627..c07c6a1 100644 --- a/pvAccessApp/remote/channelSearchManager.cpp +++ b/pvAccessApp/remote/channelSearchManager.cpp @@ -60,7 +60,7 @@ bool BaseSearchInstance::generateSearchRequestMessage(ByteBuffer* requestMessage return false; } - const string name = getChannelName(); + const String name = getSearchInstanceName(); // not nice... const int addedPayloadSize = sizeof(int32)/sizeof(int8) + (1 + sizeof(int32)/sizeof(int8) + name.length()); @@ -69,7 +69,7 @@ bool BaseSearchInstance::generateSearchRequestMessage(ByteBuffer* requestMessage return false; } - requestMessage->putInt(getChannelID()); + requestMessage->putInt(getSearchInstanceID()); SerializeHelper::serializeString(name, requestMessage, control); requestMessage->putInt(PAYLOAD_POSITION, requestMessage->getPosition() - CA_MESSAGE_HEADER_SIZE); @@ -421,7 +421,7 @@ const int64 ChannelSearchManager::MAX_SEARCH_PERIOD_LOWER_LIMIT = 60000; const int64 ChannelSearchManager::BEACON_ANOMALY_SEARCH_PERIOD = 5000; const int32 ChannelSearchManager::MAX_TIMERS = 18; -ChannelSearchManager::ChannelSearchManager(ClientContextImpl* context): +ChannelSearchManager::ChannelSearchManager(Context* context): _context(context), _canceled(false), _rttmean(MIN_RTT), @@ -503,17 +503,17 @@ void ChannelSearchManager::registerChannel(SearchInstance* channel) Lock guard(&_channelMutex); //overrides if already registered - _channels[channel->getChannelID()] = channel; + _channels[channel->getSearchInstanceID()] = channel; _timers[0]->installChannel(channel); } void ChannelSearchManager::unregisterChannel(SearchInstance* channel) { Lock guard(&_channelMutex); - _channelsIter = _channels.find(channel->getChannelID()); + _channelsIter = _channels.find(channel->getSearchInstanceID()); if(_channelsIter != _channels.end()) { - _channels.erase(channel->getChannelID()); + _channels.erase(channel->getSearchInstanceID()); } channel->removeAndUnsetListOwnership(); @@ -590,7 +590,7 @@ void ChannelSearchManager::flushSendBuffer() TimeStamp now; now.getCurrent(); _timeAtLastSend = now.getMilliseconds(); - _context->getSearchTransport()->send(_sendBuffer); + ((BlockingUDPTransport*)_context->getSearchTransport())->send(_sendBuffer); initializeSendBuffer(); } diff --git a/pvAccessApp/remote/channelSearchManager.h b/pvAccessApp/remote/channelSearchManager.h index 93a1873..09311c7 100644 --- a/pvAccessApp/remote/channelSearchManager.h +++ b/pvAccessApp/remote/channelSearchManager.h @@ -24,7 +24,7 @@ using namespace epics::pvData; namespace epics { namespace pvAccess { -//TODO check the const of paramerers +//TODO check the const of parameters /** * SearchInstance. diff --git a/testApp/remote/testChannelSearchManager.cpp b/testApp/remote/testChannelSearchManager.cpp index 601b639..b432ba7 100644 --- a/testApp/remote/testChannelSearchManager.cpp +++ b/testApp/remote/testChannelSearchManager.cpp @@ -6,12 +6,341 @@ using namespace epics::pvData; using namespace epics::pvAccess; +//TODO this will be deleted + class ChannelImpl; + + class ContextImpl : public Context + + { + + private: + + Timer* _timer; + + public: + + + + ContextImpl() + + { + + _timer = new Timer("krneki",lowPriority); + + } + + + + virtual Version* getVersion() { + + return NULL; + + } + + + + virtual ChannelProvider* getProvider() { + + return NULL; + + } + + + + Timer* getTimer() + + { + + return _timer; + + } + + + + virtual void initialize() { + + + + } + + + + virtual void printInfo() { + + + + } + + + + virtual void printInfo(epics::pvData::StringBuilder out) { + + + + } + + + + virtual void destroy() + + { + + + + } + + + + virtual void dispose() + + { + + + + } + + + + BlockingUDPTransport* getSearchTransport() + + { + + return NULL; + + } + + + + /** + + * Searches for a channel with given channel ID. + + * @param channelID CID. + + * @return channel with given CID, if non-existent. + + */ + + Channel* getChannel(pvAccessID channelID) + + { + + return NULL; + + } + + Configuration* getConfiguration() {return NULL;} + + TransportRegistry* getTransportRegistry() {return NULL;} + + ~ContextImpl() { delete _timer;}; + + private: + + + + + + void loadConfiguration() { + + + + } + + + + void internalInitialize() { + + + + + + } + + + + void initializeUDPTransport() { + + + + } + + + + void internalDestroy() { + + + + } + + + + void destroyAllChannels() { + + + + } + + + + /** + + * Check channel name. + + */ + + void checkChannelName(String& name) { + + + + } + + + + /** + + * Check context state and tries to establish necessary state. + + */ + + void checkState() { + + + + } + + + + + + + + /** + + * Generate Client channel ID (CID). + + * @return Client channel ID (CID). + + */ + + pvAccessID generateCID() + + { + + return 0; + + } + + + + /** + + * Free generated channel ID (CID). + + */ + + void freeCID(int cid) + + { + + + + } + + + + + + /** + + * Get, or create if necessary, transport of given server address. + + * @param serverAddress required transport address + + * @param priority process priority. + + * @return transport for given address + + */ + + Transport* getTransport(TransportClient* client, osiSockAddr* serverAddress, int minorRevision, int priority) + + { + + + + return NULL; + + } + + + + /** + + * Internal create channel. + + */ + + // TODO no minor version with the addresses + + // TODO what if there is an channel with the same name, but on different host! + + Channel* createChannelInternal(String name, ChannelRequester* requester, short priority, + + InetAddrVector* addresses) { + + return NULL; + + } + + + + /** + + * Destroy channel. + + * @param channel + + * @param force + + * @throws CAException + + * @throws IllegalStateException + + */ + + void destroyChannel(ChannelImpl* channel, bool force) { + + + + + + } + + + + /** + + * Get channel search manager. + + * @return channel search manager. + + */ + + ChannelSearchManager* getChannelSearchManager() { + + return NULL; + + } + + }; + class TestSearcInstance : public BaseSearchInstance { public: TestSearcInstance(string channelName, pvAccessID channelID): _channelID(channelID), _channelName(channelName) {} - pvAccessID getChannelID() { return _channelID;}; - string getChannelName() {return _channelName;}; + pvAccessID getSearchInstanceID() { return _channelID;}; + string getSearchInstanceName() {return _channelName;}; void searchResponse(int8 minorRevision, osiSockAddr* serverAddress) {}; private: pvAccessID _channelID; @@ -19,8 +348,8 @@ private: }; static const int max_channels = 100; -ClientContextImpl* context = new ClientContextImpl(); -ChannelSearchManager* manager = new ChannelSearchManager(context); +ContextImpl* context = new ContextImpl(); +ChannelSearchManager* manager = new ChannelSearchManager(static_cast(context)); TestSearcInstance** chanArray = new TestSearcInstance*[max_channels]; void* testWorker1(void* p)