257 lines
7.5 KiB
C++
Executable File
257 lines
7.5 KiB
C++
Executable File
#include "cdevBufferedSocket.h"
|
|
|
|
// *****************************************************************************
|
|
// * cdevBufferedSocket::sigPipeHandler :
|
|
// * This is a default signal pipe handler.
|
|
// *****************************************************************************
|
|
void cdevBufferedSocket::sigPipeHandler ( int )
|
|
{
|
|
}
|
|
|
|
|
|
// *****************************************************************************
|
|
// * cdevBufferedSocket::newNode :
|
|
// * This method will return a pointer to a cdevSimpleStreamNode. This
|
|
// * node will then be populated with data that is read from the socket.
|
|
// *****************************************************************************
|
|
cdevStreamNode * cdevBufferedSocket::newNode ( ssize_t size )
|
|
{
|
|
cdevSimpleStreamNode * ptr = NULL;
|
|
if(size>0) ptr = new cdevSimpleStreamNode(new char[size], size);
|
|
return ptr;
|
|
}
|
|
|
|
|
|
// *****************************************************************************
|
|
// * cdevBufferedSocket::transmit :
|
|
// * This method will attempt to transmit as many packets as immediately
|
|
// * possible from the activeOut cdevStreamQueue. The function will cease
|
|
// * transmitting when the queue is empty or when the receiver is fails
|
|
// * to accept data after a pre-specified number of retries.
|
|
// *****************************************************************************
|
|
ssize_t cdevBufferedSocket::transmit ( void )
|
|
{
|
|
int totalSent = 0;
|
|
int parm = 0;
|
|
int retval = 0;
|
|
int retries = 0;
|
|
|
|
#ifndef WIN32
|
|
void (*oldSig)(int) = signal(SIGPIPE, sigPipeHandler);
|
|
#endif
|
|
|
|
setOption(IPPROTO_TCP, TCP_NODELAY, &parm, sizeof(parm));
|
|
|
|
if(activeOut.isEmpty())
|
|
{
|
|
if(!waitingOut.isEmpty())
|
|
{
|
|
activeOut.enqueue(waitingOut);
|
|
headerXfrLen = 0;
|
|
dataXfrLen = 0;
|
|
outboundHeader[0] = htonl(magicNumber);
|
|
outboundHeader[1] = htonl(activeOut.getSize()+sizeof(size_t));
|
|
outboundHeader[2] = htonl(activeOut.getCount());
|
|
}
|
|
else return 0;
|
|
}
|
|
|
|
while(headerXfrLen<12 && retries<RETRYCNT && retval>=0)
|
|
{
|
|
retval = send(((char *)outboundHeader)+headerXfrLen, 12-headerXfrLen);
|
|
if(retval>=0)
|
|
{
|
|
retries = retval?0:retries+1;
|
|
headerXfrLen += retval;
|
|
totalSent += retval;
|
|
}
|
|
}
|
|
|
|
while(headerXfrLen>=12 && retries<RETRYCNT && retval>=0 && !activeOut.isEmpty())
|
|
{
|
|
cdevStreamNode * node = activeOut.dequeue();
|
|
|
|
while(dataXfrLen<4 && retries<RETRYCNT && retval>=0)
|
|
{
|
|
parm = htonl(node->getLen());
|
|
retval = send(((char *)&parm)+dataXfrLen, 4-dataXfrLen);
|
|
if(retval>=0)
|
|
{
|
|
retries = retval?0:retries+1;
|
|
dataXfrLen += retval;
|
|
totalSent += retval;
|
|
}
|
|
}
|
|
|
|
while(dataXfrLen>=4 && dataXfrLen<node->getLen()+4 && retries<RETRYCNT && retval>=0)
|
|
{
|
|
retval = send(node->getBuf()+(dataXfrLen-4), node->getLen()-(dataXfrLen-4));
|
|
if(retval>=0)
|
|
{
|
|
retries = retval?0:retries+1;
|
|
dataXfrLen += retval;
|
|
totalSent += retval;
|
|
}
|
|
}
|
|
|
|
if(dataXfrLen<node->getLen()+4) activeOut.poke(node);
|
|
else {
|
|
if(deleteFlag) delete node;
|
|
dataXfrLen = 0;
|
|
}
|
|
}
|
|
|
|
parm = 1;
|
|
setOption(IPPROTO_TCP, TCP_NODELAY, &parm, sizeof(parm));
|
|
|
|
#ifndef WIN32
|
|
signal(SIGPIPE, oldSig);
|
|
#endif
|
|
|
|
return retval>=0?totalSent:-1;
|
|
}
|
|
|
|
|
|
// *****************************************************************************
|
|
// * cdevBufferedSocket::receive :
|
|
// * This method will attempt to receive as many packets as immediately
|
|
// * possible from the socket into the waitingIn cdevStreamQueue. The
|
|
// * function will cease receiving when a complete transmission block
|
|
// * (multiple packets) have been received or when the sender has failed
|
|
// * to transmit data after a pre-specified number of retries.
|
|
// *****************************************************************************
|
|
ssize_t cdevBufferedSocket::receive ( void )
|
|
{
|
|
int totalRecv = 0;
|
|
int parm = 0;
|
|
int retval = 0;
|
|
int retries = 0;
|
|
size_t packetCnt = 0;
|
|
|
|
while(headerRcvLen<12 && retries<RETRYCNT && retval>=0)
|
|
{
|
|
if((retval = recv(((char *)inboundHeader)+headerRcvLen, 12-headerRcvLen))==0)
|
|
{
|
|
int errCode = GetSocketErrno();
|
|
|
|
if(errCode!=EWOULDBLOCK && errCode!=EAGAIN) retval = -1;
|
|
else retries++;
|
|
}
|
|
else if(retval>=0)
|
|
{
|
|
retries = retval?0:retries+1;
|
|
headerRcvLen += retval;
|
|
totalRecv += retval;
|
|
}
|
|
if(headerRcvLen>=12)
|
|
{
|
|
inboundHeader[0] = ntohl(inboundHeader[0]);
|
|
inboundHeader[1] = ntohl(inboundHeader[1]);
|
|
inboundHeader[2] = ntohl(inboundHeader[2]);
|
|
|
|
if(inboundHeader[0]!=magicNumber) retval=-1;
|
|
}
|
|
}
|
|
|
|
while(headerRcvLen>=12 && retries<RETRYCNT && retval>=0 &&
|
|
inboundHeader[1]>0 && inboundHeader[2]>0)
|
|
{
|
|
while(dataRcvLen<4 && retries<RETRYCNT && retval>=0)
|
|
{
|
|
if((retval = recv(((char *)&subPacketLen)+dataRcvLen, 4-dataRcvLen))==0)
|
|
{
|
|
int errCode = GetSocketErrno();
|
|
|
|
if(errCode!=EWOULDBLOCK && errCode!=EAGAIN) retval = -1;
|
|
else retries++;
|
|
}
|
|
else if(retval>0)
|
|
{
|
|
retries = 0;
|
|
dataRcvLen += retval;
|
|
totalRecv += retval;
|
|
inboundHeader[1]-=retval;
|
|
}
|
|
if(dataRcvLen>=4)
|
|
{
|
|
subPacketLen = ntohl(subPacketLen);
|
|
rcvNode = nodeFactory->newNode(subPacketLen);
|
|
memset(rcvNode->getBuf(), rcvNode->getLen(), 0);
|
|
}
|
|
}
|
|
|
|
while(dataRcvLen>=4 && dataRcvLen<rcvNode->getLen()+4 && retries<RETRYCNT && retval>=0)
|
|
{
|
|
if((retval = recv(rcvNode->getBuf()+(dataRcvLen-4), rcvNode->getLen()-(dataRcvLen-4)))==0)
|
|
{
|
|
int errCode = GetSocketErrno();
|
|
|
|
if(errCode!=EWOULDBLOCK && errCode!=EAGAIN) retval = -1;
|
|
else retries++;
|
|
}
|
|
else if(retval>=0)
|
|
{
|
|
retries = 0;
|
|
dataRcvLen += retval;
|
|
totalRecv += retval;
|
|
inboundHeader[1]-=retval;
|
|
}
|
|
if(dataRcvLen>=(rcvNode->getLen()+4))
|
|
{
|
|
waitingIn.enqueue(rcvNode);
|
|
rcvNode = NULL;
|
|
dataRcvLen = 0;
|
|
subPacketLen = 0;
|
|
inboundHeader[2]--;
|
|
}
|
|
}
|
|
if(inboundHeader[1]<=0 || inboundHeader[2]<=0)
|
|
{
|
|
if(rcvNode) // Only received a partial packet on final entry
|
|
{
|
|
delete rcvNode;
|
|
rcvNode = NULL;
|
|
retval = -1;
|
|
}
|
|
headerRcvLen = 0;
|
|
dataRcvLen = 0;
|
|
inboundHeader[0]=(inboundHeader[1]=(inboundHeader[2]=0));
|
|
}
|
|
}
|
|
return retval>=0?totalRecv:-1;
|
|
}
|
|
|
|
// *****************************************************************************
|
|
// * cdevBufferedSocket::getBlockingSemantics :
|
|
// * Identifies the cdevBufferedSocket object as a NON-BLOCKING entity.
|
|
// *****************************************************************************
|
|
int cdevBufferedSocket::getBlockingSemantics ( void ) const
|
|
{
|
|
return O_NONBLOCK;
|
|
}
|
|
|
|
// *****************************************************************************
|
|
// * cdevBufferedSocket::getRcvBufferSize :
|
|
// * This method is called during the configureHandle method to obtain the
|
|
// * size of the receive buffer that will be used by the socket. This class
|
|
// * returns the value 56000 - telling the cdevSocketStream to set the
|
|
// * receive buffer size to 56 kilobytes.
|
|
// *****************************************************************************
|
|
int cdevBufferedSocket::getRcvBufferSize ( void ) const
|
|
{
|
|
return 56000;
|
|
}
|
|
|
|
|
|
// *****************************************************************************
|
|
// * cdevBufferedSocket::getSndBufferSize :
|
|
// * This method is called during the configureHandle method to obtain the
|
|
// * size of the send buffer that will be used by the socket. This class
|
|
// * returns the value 56000 - telling the cdevSocketStream to set the
|
|
// * send buffer size to 56 kilobytes.
|
|
// *****************************************************************************
|
|
int cdevBufferedSocket::getSndBufferSize ( void ) const
|
|
{
|
|
return 56000;
|
|
}
|