remote connection close fix, request destroy fix

This commit is contained in:
Matej Sekoranja
2011-02-02 10:49:32 +01:00
parent 321be995cc
commit 8fdd51396c
3 changed files with 51 additions and 44 deletions

View File

@@ -440,51 +440,41 @@ namespace epics {
int currentStartPosition;
if(addToBuffer) {
currentStartPosition = _socketBuffer->getPosition();
_socketBuffer->setPosition(
_socketBuffer->getLimit());
_socketBuffer->setPosition(_socketBuffer->getLimit());
_socketBuffer->setLimit(_socketBuffer->getSize());
}
else {
// add to bytes read
_totalBytesReceived
+= (_socketBuffer->getPosition()
-_startPosition);
_totalBytesReceived += (_socketBuffer->getPosition() -_startPosition);
// copy remaining bytes, if any
int remainingBytes = _socketBuffer->getRemaining();
int endPosition = MAX_ENSURE_DATA_BUFFER_SIZE
+remainingBytes;
for(int i = MAX_ENSURE_DATA_BUFFER_SIZE; i
<endPosition; i++)
_socketBuffer->putByte(i,
_socketBuffer->getByte());
int endPosition = MAX_ENSURE_DATA_BUFFER_SIZE+remainingBytes;
for(int i = MAX_ENSURE_DATA_BUFFER_SIZE; i<endPosition; i++)
_socketBuffer->putByte(i, _socketBuffer->getByte());
currentStartPosition = _startPosition
= MAX_ENSURE_DATA_BUFFER_SIZE;
_socketBuffer->setPosition(
MAX_ENSURE_DATA_BUFFER_SIZE+remainingBytes);
currentStartPosition = _startPosition = MAX_ENSURE_DATA_BUFFER_SIZE;
_socketBuffer->setPosition(MAX_ENSURE_DATA_BUFFER_SIZE+remainingBytes);
_socketBuffer->setLimit(_socketBuffer->getSize());
}
// read at least requiredBytes bytes
int requiredPosition = (currentStartPosition
+requiredBytes);
int requiredPosition = (currentStartPosition+requiredBytes);
while(_socketBuffer->getPosition()<requiredPosition) {
// read
// TODO wrap and do not copy !!!
char readBuffer[MAX_TCP_RECV];
size_t maxToRead = min(MAX_TCP_RECV,
_socketBuffer->getRemaining());
ssize_t bytesRead = recv(_channel, readBuffer,
maxToRead, 0);
size_t maxToRead = min(MAX_TCP_RECV,_socketBuffer->getRemaining());
ssize_t bytesRead = recv(_channel, readBuffer, maxToRead, 0);
_socketBuffer->put(readBuffer, 0, bytesRead);
if(bytesRead<=0) {
// error (disconnect, end-of-stream) detected
close(true);
if(bytesRead<0&&nestedCall) THROW_BASE_EXCEPTION(
"bytesRead < 0");
if(nestedCall)
THROW_BASE_EXCEPTION("bytesRead < 0");
return;
}
@@ -503,16 +493,14 @@ namespace epics {
if(_stage==PROCESS_HEADER) {
// ensure CAConstants.CA_MESSAGE_HEADER_SIZE bytes of data
if(_socketBuffer->getRemaining()<CA_MESSAGE_HEADER_SIZE) processReadCached(
true, PROCESS_HEADER, CA_MESSAGE_HEADER_SIZE,
false);
if(_socketBuffer->getRemaining()<CA_MESSAGE_HEADER_SIZE)
processReadCached(true, PROCESS_HEADER, CA_MESSAGE_HEADER_SIZE, false);
// first byte is CA_MAGIC
// second byte version - major/minor nibble
// check magic and version at once
_magicAndVersion = _socketBuffer->getShort();
if((short)(_magicAndVersion&0xFFF0)
!=CA_MAGIC_AND_MAJOR_VERSION) {
if((short)(_magicAndVersion&0xFFF0)!=CA_MAGIC_AND_MAJOR_VERSION) {
// error... disconnect
errlogSevPrintf(
errlogMinor,
@@ -540,14 +528,14 @@ namespace epics {
if(_command==0) {
_flowControlMutex.lock();
if(_markerToSend==0)
_markerToSend = _payloadSize; // TODO send back response
_markerToSend = _payloadSize;
// TODO send back response
_flowControlMutex.unlock();
}
else //if (command == 1)
{
_flowControlMutex.lock();
int difference = (int)_totalBytesSent
-_payloadSize+CA_MESSAGE_HEADER_SIZE;
int difference = (int)_totalBytesSent-_payloadSize+CA_MESSAGE_HEADER_SIZE;
// overrun check
if(difference<0) difference += INT_MAX;
_remoteBufferFreeSpace
@@ -587,8 +575,7 @@ namespace epics {
// NOTE: nested data (w/ payload) messages between segmented messages are not supported
_storedPosition = _socketBuffer->getPosition();
_storedLimit = _socketBuffer->getLimit();
_socketBuffer->setLimit(min(_storedPosition
+_storedPayloadSize, _storedLimit));
_socketBuffer->setLimit(min(_storedPosition+_storedPayloadSize, _storedLimit));
try {
// handle response
_responseHandler->handleResponse(&_socketAddress,
@@ -606,8 +593,7 @@ namespace epics {
if(newPosition>_storedLimit) {
newPosition -= _storedLimit;
_socketBuffer->setPosition(_storedLimit);
processReadCached(true, PROCESS_PAYLOAD,
newPosition, false);
processReadCached(true, PROCESS_PAYLOAD,newPosition, false);
newPosition += _startPosition;
}
_socketBuffer->setPosition(newPosition);

View File

@@ -62,7 +62,7 @@ namespace epics {
Requester* m_requester;
bool m_destroyed;
bool m_remotelyDestroyed;
bool m_remotelyDestroy;
/* negative... */
static const int NULL_REQUEST = -1;
@@ -88,7 +88,7 @@ namespace epics {
BaseRequestImpl(ChannelImpl* channel, Requester* requester) :
m_channel(channel), m_context(channel->getContext()),
m_requester(requester), m_destroyed(false), m_remotelyDestroyed(false),
m_requester(requester), m_destroyed(false), m_remotelyDestroy(false),
m_pendingRequest(NULL_REQUEST), m_refCount(1)
{
// register response request
@@ -138,12 +138,20 @@ namespace epics {
{
if (qos & QOS_INIT)
{
if (status->isSuccess())
{
// once created set destroy flag
m_mutex.lock();
m_remotelyDestroy = true;
m_mutex.unlock();
}
initResponse(transport, version, payloadBuffer, qos, status);
}
else if (qos & QOS_DESTROY)
{
m_mutex.lock();
m_remotelyDestroyed = true;
m_remotelyDestroy = false;
m_mutex.unlock();
if (!destroyResponse(transport, version, payloadBuffer, qos, status))
@@ -185,10 +193,16 @@ namespace epics {
m_channel->unregisterResponseRequest(this);
// destroy remote instance
if (!m_remotelyDestroyed)
if (m_remotelyDestroy)
{
startRequest(PURE_DESTROY_REQUEST);
m_channel->checkAndGetTransport()->enqueueSendRequest(this);
try
{
startRequest(PURE_DESTROY_REQUEST);
m_channel->checkAndGetTransport()->enqueueSendRequest(this);
} catch (...) {
// noop (do not complain if fails)
}
}
release();
@@ -1583,7 +1597,7 @@ namespace epics {
else if (qos & QOS_DESTROY)
{
Status* status = statusCreate->deserializeStatus(payloadBuffer, transport);
m_remotelyDestroyed = true;
m_remotelyDestroy = true;
if (!destroyResponse(transport, version, payloadBuffer, qos, status))
cancel();

View File

@@ -74,10 +74,15 @@ class GetFieldRequesterImpl : public GetFieldRequester
class ChannelGetRequesterImpl : public ChannelGetRequester
{
Mutex m_mutex;
ChannelGet *m_channelGet;
epics::pvData::PVStructure *m_pvStructure;
epics::pvData::BitSet *m_bitSet;
public:
ChannelGetRequesterImpl() : m_channelGet(0), m_pvStructure(0), m_bitSet(0) {}
virtual String getRequesterName()
{
return "ChannelGetRequesterImpl";
@@ -99,15 +104,17 @@ class ChannelGetRequesterImpl : public ChannelGetRequester
std::cout << st << std::endl;
}
// TODO sync
m_mutex.lock();
m_channelGet = channelGet;
m_pvStructure = pvStructure;
m_bitSet = bitSet;
m_mutex.unlock();
}
virtual void getDone(epics::pvData::Status *status)
{
std::cout << "getDone(" << status->toString() << ")" << std::endl;
Lock guard(&m_mutex);
if (m_pvStructure)
{
String str;
@@ -465,7 +472,7 @@ int main(int argc,char *argv[])
epicsThreadSleep ( 1.0 );
*/
ChannelGetRequesterImpl channelGetRequesterImpl;
pvRequest = getCreateRequest()->createRequest("field(value,timeStamp)",&channelGetRequesterImpl);
pvRequest = 0;//getCreateRequest()->createRequest("field(kiki)",&channelGetRequesterImpl);
ChannelGet* channelGet = channel->createChannelGet(&channelGetRequesterImpl, pvRequest);
epicsThreadSleep ( 3.0 );
channelGet->get(false);