cdev-1.7.2n

This commit is contained in:
2022-12-13 12:44:04 +01:00
commit b3b88fc333
1357 changed files with 338883 additions and 0 deletions

View File

@@ -0,0 +1,256 @@
#include "cdevBufferedSocket.h"
// *****************************************************************************
// * cdevBufferedSocket::sigPipeHandler :
// * This is a default signal pipe handler.
// *****************************************************************************
void cdevBufferedSocket::sigPipeHandler ( int )
{
}
// *****************************************************************************
// * cdevBufferedSocket::newNode :
// * This method will return a pointer to a cdevSimpleStreamNode. This
// * node will then be populated with data that is read from the socket.
// *****************************************************************************
cdevStreamNode * cdevBufferedSocket::newNode ( ssize_t size )
{
cdevSimpleStreamNode * ptr = NULL;
if(size>0) ptr = new cdevSimpleStreamNode(new char[size], size);
return ptr;
}
// *****************************************************************************
// * cdevBufferedSocket::transmit :
// * This method will attempt to transmit as many packets as immediately
// * possible from the activeOut cdevStreamQueue. The function will cease
// * transmitting when the queue is empty or when the receiver is fails
// * to accept data after a pre-specified number of retries.
// *****************************************************************************
ssize_t cdevBufferedSocket::transmit ( void )
{
int totalSent = 0;
int parm = 0;
int retval = 0;
int retries = 0;
#ifndef WIN32
void (*oldSig)(int) = signal(SIGPIPE, sigPipeHandler);
#endif
setOption(IPPROTO_TCP, TCP_NODELAY, &parm, sizeof(parm));
if(activeOut.isEmpty())
{
if(!waitingOut.isEmpty())
{
activeOut.enqueue(waitingOut);
headerXfrLen = 0;
dataXfrLen = 0;
outboundHeader[0] = htonl(magicNumber);
outboundHeader[1] = htonl(activeOut.getSize()+sizeof(size_t));
outboundHeader[2] = htonl(activeOut.getCount());
}
else return 0;
}
while(headerXfrLen<12 && retries<RETRYCNT && retval>=0)
{
retval = send(((char *)outboundHeader)+headerXfrLen, 12-headerXfrLen);
if(retval>=0)
{
retries = retval?0:retries+1;
headerXfrLen += retval;
totalSent += retval;
}
}
while(headerXfrLen>=12 && retries<RETRYCNT && retval>=0 && !activeOut.isEmpty())
{
cdevStreamNode * node = activeOut.dequeue();
while(dataXfrLen<4 && retries<RETRYCNT && retval>=0)
{
parm = htonl(node->getLen());
retval = send(((char *)&parm)+dataXfrLen, 4-dataXfrLen);
if(retval>=0)
{
retries = retval?0:retries+1;
dataXfrLen += retval;
totalSent += retval;
}
}
while(dataXfrLen>=4 && dataXfrLen<node->getLen()+4 && retries<RETRYCNT && retval>=0)
{
retval = send(node->getBuf()+(dataXfrLen-4), node->getLen()-(dataXfrLen-4));
if(retval>=0)
{
retries = retval?0:retries+1;
dataXfrLen += retval;
totalSent += retval;
}
}
if(dataXfrLen<node->getLen()+4) activeOut.poke(node);
else {
if(deleteFlag) delete node;
dataXfrLen = 0;
}
}
parm = 1;
setOption(IPPROTO_TCP, TCP_NODELAY, &parm, sizeof(parm));
#ifndef WIN32
signal(SIGPIPE, oldSig);
#endif
return retval>=0?totalSent:-1;
}
// *****************************************************************************
// * cdevBufferedSocket::receive :
// * This method will attempt to receive as many packets as immediately
// * possible from the socket into the waitingIn cdevStreamQueue. The
// * function will cease receiving when a complete transmission block
// * (multiple packets) have been received or when the sender has failed
// * to transmit data after a pre-specified number of retries.
// *****************************************************************************
ssize_t cdevBufferedSocket::receive ( void )
{
int totalRecv = 0;
int parm = 0;
int retval = 0;
int retries = 0;
size_t packetCnt = 0;
while(headerRcvLen<12 && retries<RETRYCNT && retval>=0)
{
if((retval = recv(((char *)inboundHeader)+headerRcvLen, 12-headerRcvLen))==0)
{
int errCode = GetSocketErrno();
if(errCode!=EWOULDBLOCK && errCode!=EAGAIN) retval = -1;
else retries++;
}
else if(retval>=0)
{
retries = retval?0:retries+1;
headerRcvLen += retval;
totalRecv += retval;
}
if(headerRcvLen>=12)
{
inboundHeader[0] = ntohl(inboundHeader[0]);
inboundHeader[1] = ntohl(inboundHeader[1]);
inboundHeader[2] = ntohl(inboundHeader[2]);
if(inboundHeader[0]!=magicNumber) retval=-1;
}
}
while(headerRcvLen>=12 && retries<RETRYCNT && retval>=0 &&
inboundHeader[1]>0 && inboundHeader[2]>0)
{
while(dataRcvLen<4 && retries<RETRYCNT && retval>=0)
{
if((retval = recv(((char *)&subPacketLen)+dataRcvLen, 4-dataRcvLen))==0)
{
int errCode = GetSocketErrno();
if(errCode!=EWOULDBLOCK && errCode!=EAGAIN) retval = -1;
else retries++;
}
else if(retval>0)
{
retries = 0;
dataRcvLen += retval;
totalRecv += retval;
inboundHeader[1]-=retval;
}
if(dataRcvLen>=4)
{
subPacketLen = ntohl(subPacketLen);
rcvNode = nodeFactory->newNode(subPacketLen);
memset(rcvNode->getBuf(), rcvNode->getLen(), 0);
}
}
while(dataRcvLen>=4 && dataRcvLen<rcvNode->getLen()+4 && retries<RETRYCNT && retval>=0)
{
if((retval = recv(rcvNode->getBuf()+(dataRcvLen-4), rcvNode->getLen()-(dataRcvLen-4)))==0)
{
int errCode = GetSocketErrno();
if(errCode!=EWOULDBLOCK && errCode!=EAGAIN) retval = -1;
else retries++;
}
else if(retval>=0)
{
retries = 0;
dataRcvLen += retval;
totalRecv += retval;
inboundHeader[1]-=retval;
}
if(dataRcvLen>=(rcvNode->getLen()+4))
{
waitingIn.enqueue(rcvNode);
rcvNode = NULL;
dataRcvLen = 0;
subPacketLen = 0;
inboundHeader[2]--;
}
}
if(inboundHeader[1]<=0 || inboundHeader[2]<=0)
{
if(rcvNode) // Only received a partial packet on final entry
{
delete rcvNode;
rcvNode = NULL;
retval = -1;
}
headerRcvLen = 0;
dataRcvLen = 0;
inboundHeader[0]=(inboundHeader[1]=(inboundHeader[2]=0));
}
}
return retval>=0?totalRecv:-1;
}
// *****************************************************************************
// * cdevBufferedSocket::getBlockingSemantics :
// * Identifies the cdevBufferedSocket object as a NON-BLOCKING entity.
// *****************************************************************************
int cdevBufferedSocket::getBlockingSemantics ( void ) const
{
return O_NONBLOCK;
}
// *****************************************************************************
// * cdevBufferedSocket::getRcvBufferSize :
// * This method is called during the configureHandle method to obtain the
// * size of the receive buffer that will be used by the socket. This class
// * returns the value 56000 - telling the cdevSocketStream to set the
// * receive buffer size to 56 kilobytes.
// *****************************************************************************
int cdevBufferedSocket::getRcvBufferSize ( void ) const
{
return 56000;
}
// *****************************************************************************
// * cdevBufferedSocket::getSndBufferSize :
// * This method is called during the configureHandle method to obtain the
// * size of the send buffer that will be used by the socket. This class
// * returns the value 56000 - telling the cdevSocketStream to set the
// * send buffer size to 56 kilobytes.
// *****************************************************************************
int cdevBufferedSocket::getSndBufferSize ( void ) const
{
return 56000;
}