diff --git a/pvAccessApp/remote/blockingTCPTransport.cpp b/pvAccessApp/remote/blockingTCPTransport.cpp index 8c5e1e7..ee4204c 100644 --- a/pvAccessApp/remote/blockingTCPTransport.cpp +++ b/pvAccessApp/remote/blockingTCPTransport.cpp @@ -440,51 +440,41 @@ namespace epics { int currentStartPosition; if(addToBuffer) { currentStartPosition = _socketBuffer->getPosition(); - _socketBuffer->setPosition( - _socketBuffer->getLimit()); + _socketBuffer->setPosition(_socketBuffer->getLimit()); _socketBuffer->setLimit(_socketBuffer->getSize()); } else { // add to bytes read - _totalBytesReceived - += (_socketBuffer->getPosition() - -_startPosition); + _totalBytesReceived += (_socketBuffer->getPosition() -_startPosition); // copy remaining bytes, if any int remainingBytes = _socketBuffer->getRemaining(); - int endPosition = MAX_ENSURE_DATA_BUFFER_SIZE - +remainingBytes; - for(int i = MAX_ENSURE_DATA_BUFFER_SIZE; i - putByte(i, - _socketBuffer->getByte()); + int endPosition = MAX_ENSURE_DATA_BUFFER_SIZE+remainingBytes; + for(int i = MAX_ENSURE_DATA_BUFFER_SIZE; iputByte(i, _socketBuffer->getByte()); - currentStartPosition = _startPosition - = MAX_ENSURE_DATA_BUFFER_SIZE; - _socketBuffer->setPosition( - MAX_ENSURE_DATA_BUFFER_SIZE+remainingBytes); + currentStartPosition = _startPosition = MAX_ENSURE_DATA_BUFFER_SIZE; + _socketBuffer->setPosition(MAX_ENSURE_DATA_BUFFER_SIZE+remainingBytes); _socketBuffer->setLimit(_socketBuffer->getSize()); } // read at least requiredBytes bytes - int requiredPosition = (currentStartPosition - +requiredBytes); + int requiredPosition = (currentStartPosition+requiredBytes); while(_socketBuffer->getPosition()getRemaining()); - ssize_t bytesRead = recv(_channel, readBuffer, - maxToRead, 0); + size_t maxToRead = min(MAX_TCP_RECV,_socketBuffer->getRemaining()); + ssize_t bytesRead = recv(_channel, readBuffer, maxToRead, 0); _socketBuffer->put(readBuffer, 0, bytesRead); if(bytesRead<=0) { // error (disconnect, end-of-stream) detected close(true); - if(bytesRead<0&&nestedCall) THROW_BASE_EXCEPTION( - "bytesRead < 0"); + if(nestedCall) + THROW_BASE_EXCEPTION("bytesRead < 0"); return; } @@ -503,16 +493,14 @@ namespace epics { if(_stage==PROCESS_HEADER) { // ensure CAConstants.CA_MESSAGE_HEADER_SIZE bytes of data - if(_socketBuffer->getRemaining()getRemaining()getShort(); - if((short)(_magicAndVersion&0xFFF0) - !=CA_MAGIC_AND_MAJOR_VERSION) { + if((short)(_magicAndVersion&0xFFF0)!=CA_MAGIC_AND_MAJOR_VERSION) { // error... disconnect errlogSevPrintf( errlogMinor, @@ -540,14 +528,14 @@ namespace epics { if(_command==0) { _flowControlMutex.lock(); if(_markerToSend==0) - _markerToSend = _payloadSize; // TODO send back response + _markerToSend = _payloadSize; + // TODO send back response _flowControlMutex.unlock(); } else //if (command == 1) { _flowControlMutex.lock(); - int difference = (int)_totalBytesSent - -_payloadSize+CA_MESSAGE_HEADER_SIZE; + int difference = (int)_totalBytesSent-_payloadSize+CA_MESSAGE_HEADER_SIZE; // overrun check if(difference<0) difference += INT_MAX; _remoteBufferFreeSpace @@ -587,8 +575,7 @@ namespace epics { // NOTE: nested data (w/ payload) messages between segmented messages are not supported _storedPosition = _socketBuffer->getPosition(); _storedLimit = _socketBuffer->getLimit(); - _socketBuffer->setLimit(min(_storedPosition - +_storedPayloadSize, _storedLimit)); + _socketBuffer->setLimit(min(_storedPosition+_storedPayloadSize, _storedLimit)); try { // handle response _responseHandler->handleResponse(&_socketAddress, @@ -606,8 +593,7 @@ namespace epics { if(newPosition>_storedLimit) { newPosition -= _storedLimit; _socketBuffer->setPosition(_storedLimit); - processReadCached(true, PROCESS_PAYLOAD, - newPosition, false); + processReadCached(true, PROCESS_PAYLOAD,newPosition, false); newPosition += _startPosition; } _socketBuffer->setPosition(newPosition); diff --git a/pvAccessApp/remoteClient/clientContextImpl.cpp b/pvAccessApp/remoteClient/clientContextImpl.cpp index 6ab1de8..d75cac2 100644 --- a/pvAccessApp/remoteClient/clientContextImpl.cpp +++ b/pvAccessApp/remoteClient/clientContextImpl.cpp @@ -62,7 +62,7 @@ namespace epics { Requester* m_requester; bool m_destroyed; - bool m_remotelyDestroyed; + bool m_remotelyDestroy; /* negative... */ static const int NULL_REQUEST = -1; @@ -88,7 +88,7 @@ namespace epics { BaseRequestImpl(ChannelImpl* channel, Requester* requester) : m_channel(channel), m_context(channel->getContext()), - m_requester(requester), m_destroyed(false), m_remotelyDestroyed(false), + m_requester(requester), m_destroyed(false), m_remotelyDestroy(false), m_pendingRequest(NULL_REQUEST), m_refCount(1) { // register response request @@ -138,12 +138,20 @@ namespace epics { { if (qos & QOS_INIT) { + if (status->isSuccess()) + { + // once created set destroy flag + m_mutex.lock(); + m_remotelyDestroy = true; + m_mutex.unlock(); + } + initResponse(transport, version, payloadBuffer, qos, status); } else if (qos & QOS_DESTROY) { m_mutex.lock(); - m_remotelyDestroyed = true; + m_remotelyDestroy = false; m_mutex.unlock(); if (!destroyResponse(transport, version, payloadBuffer, qos, status)) @@ -185,10 +193,16 @@ namespace epics { m_channel->unregisterResponseRequest(this); // destroy remote instance - if (!m_remotelyDestroyed) + if (m_remotelyDestroy) { - startRequest(PURE_DESTROY_REQUEST); - m_channel->checkAndGetTransport()->enqueueSendRequest(this); + try + { + startRequest(PURE_DESTROY_REQUEST); + m_channel->checkAndGetTransport()->enqueueSendRequest(this); + } catch (...) { + // noop (do not complain if fails) + } + } release(); @@ -1583,7 +1597,7 @@ namespace epics { else if (qos & QOS_DESTROY) { Status* status = statusCreate->deserializeStatus(payloadBuffer, transport); - m_remotelyDestroyed = true; + m_remotelyDestroy = true; if (!destroyResponse(transport, version, payloadBuffer, qos, status)) cancel(); diff --git a/testApp/remote/testRemoteClientImpl.cpp b/testApp/remote/testRemoteClientImpl.cpp index a611478..743a05d 100644 --- a/testApp/remote/testRemoteClientImpl.cpp +++ b/testApp/remote/testRemoteClientImpl.cpp @@ -74,10 +74,15 @@ class GetFieldRequesterImpl : public GetFieldRequester class ChannelGetRequesterImpl : public ChannelGetRequester { + Mutex m_mutex; ChannelGet *m_channelGet; epics::pvData::PVStructure *m_pvStructure; epics::pvData::BitSet *m_bitSet; + public: + + ChannelGetRequesterImpl() : m_channelGet(0), m_pvStructure(0), m_bitSet(0) {} + virtual String getRequesterName() { return "ChannelGetRequesterImpl"; @@ -99,15 +104,17 @@ class ChannelGetRequesterImpl : public ChannelGetRequester std::cout << st << std::endl; } - // TODO sync + m_mutex.lock(); m_channelGet = channelGet; m_pvStructure = pvStructure; m_bitSet = bitSet; + m_mutex.unlock(); } virtual void getDone(epics::pvData::Status *status) { std::cout << "getDone(" << status->toString() << ")" << std::endl; + Lock guard(&m_mutex); if (m_pvStructure) { String str; @@ -465,7 +472,7 @@ int main(int argc,char *argv[]) epicsThreadSleep ( 1.0 ); */ ChannelGetRequesterImpl channelGetRequesterImpl; - pvRequest = getCreateRequest()->createRequest("field(value,timeStamp)",&channelGetRequesterImpl); + pvRequest = 0;//getCreateRequest()->createRequest("field(kiki)",&channelGetRequesterImpl); ChannelGet* channelGet = channel->createChannelGet(&channelGetRequesterImpl, pvRequest); epicsThreadSleep ( 3.0 ); channelGet->get(false);