#include "cdevBufferedSocket.h"

// NOTE(review): The copy of this file under review had been damaged by
// markup stripping: every "<identifier ... >" run was missing, so the loop
// guards read "retries=0" and "dataXfrLengetLen()+4". They are reconstructed
// below as "retries<RETRYCNT && retval>=0" and "dataXfrLen<node->getLen()+4".
// The name of the retry-limit constant could not be recovered from the
// damaged text; RETRYCNT is assumed - confirm against cdevBufferedSocket.h.

// *****************************************************************************
// * cdevBufferedSocket::sigPipeHandler :
// *	This is a default signal pipe handler. It intentionally does nothing;
// *	transmit installs it for SIGPIPE for the duration of its send calls.
// *****************************************************************************
void cdevBufferedSocket::sigPipeHandler ( int )
{
}


// *****************************************************************************
// * cdevBufferedSocket::newNode :
// *	This method will return a pointer to a cdevSimpleStreamNode. This
// *	node will then be populated with data that is read from the socket.
// *
// *	Returns NULL when size is less than or equal to zero.
// *****************************************************************************
cdevStreamNode * cdevBufferedSocket::newNode ( ssize_t size )
{
    cdevSimpleStreamNode * ptr = NULL;

    if(size>0) ptr = new cdevSimpleStreamNode(new char[size], size);

    return ptr;
}


// *****************************************************************************
// * cdevBufferedSocket::transmit :
// *	This method will attempt to transmit as many packets as immediately
// *	possible from the activeOut cdevStreamQueue. The function will cease
// *	transmitting when the queue is empty or when the receiver fails
// *	to accept data after a pre-specified number of retries.
// *
// *	Wire layout produced here: a 12 byte block header (magic number,
// *	byte count, packet count - each passed through htonl) followed, for
// *	every queued node, by a 4 byte length and then the node's buffer.
// *
// *	Returns the number of bytes sent during this call, 0 if there was
// *	nothing to send, or -1 if the last send call reported an error.
// *****************************************************************************
ssize_t cdevBufferedSocket::transmit ( void )
{
    int totalSent = 0;
    int parm      = 0;
    int retval    = 0;
    int retries   = 0;

    // Nothing queued at all: leave before the SIGPIPE disposition and the
    // TCP_NODELAY option are modified, so that neither is left altered.
    if(activeOut.isEmpty() && waitingOut.isEmpty()) return 0;

#ifndef WIN32
    void (*oldSig)(int) = signal(SIGPIPE, sigPipeHandler);
#endif

    // TCP_NODELAY is cleared while sending and set again afterwards
    // (presumably so small writes coalesce and are then pushed out).
    setOption(IPPROTO_TCP, TCP_NODELAY, &parm, sizeof(parm));

    // Start a new transmission block: move the waiting packets to the
    // active queue and build the block header for them.
    if(activeOut.isEmpty())
    {
        activeOut.enqueue(waitingOut);
        headerXfrLen      = 0;
        dataXfrLen        = 0;
        outboundHeader[0] = htonl(magicNumber);
        outboundHeader[1] = htonl(activeOut.getSize()+sizeof(size_t));
        outboundHeader[2] = htonl(activeOut.getCount());
    }

    // Send whatever part of the 12 byte block header is still outstanding.
    while(headerXfrLen<12 && retries<RETRYCNT && retval>=0)
    {
        retval = send(((char *)outboundHeader)+headerXfrLen, 12-headerXfrLen);
        if(retval>=0)
        {
            retries       = retval?0:retries+1;
            headerXfrLen += retval;
            totalSent    += retval;
        }
    }

    // Send the queued nodes one at a time. dataXfrLen counts the bytes of
    // the current node already sent, including its 4 byte length prefix.
    while(headerXfrLen>=12 && retries<RETRYCNT && retval>=0 && !activeOut.isEmpty())
    {
        cdevStreamNode * node = activeOut.dequeue();

        // 4 byte length prefix.
        while(dataXfrLen<4 && retries<RETRYCNT && retval>=0)
        {
            parm   = htonl(node->getLen());
            retval = send(((char *)&parm)+dataXfrLen, 4-dataXfrLen);
            if(retval>=0)
            {
                retries     = retval?0:retries+1;
                dataXfrLen += retval;
                totalSent  += retval;
            }
        }

        // Node payload.
        while(dataXfrLen>=4 && dataXfrLen<node->getLen()+4 &&
              retries<RETRYCNT && retval>=0)
        {
            retval = send(node->getBuf()+(dataXfrLen-4), node->getLen()-(dataXfrLen-4));
            if(retval>=0)
            {
                retries     = retval?0:retries+1;
                dataXfrLen += retval;
                totalSent  += retval;
            }
        }

        // Partially sent node: hand it back to the queue so that a later
        // call resumes at offset dataXfrLen. Otherwise the node is done.
        if(dataXfrLen<node->getLen()+4) activeOut.poke(node);
        else
        {
            if(deleteFlag) delete node;
            dataXfrLen = 0;
        }
    }

    parm = 1;
    setOption(IPPROTO_TCP, TCP_NODELAY, &parm, sizeof(parm));

#ifndef WIN32
    signal(SIGPIPE, oldSig);
#endif

    return retval>=0?totalSent:-1;
}


// *****************************************************************************
// * cdevBufferedSocket::receive :
// *	This method will attempt to receive as many packets as immediately
// *	possible from the socket into the waitingIn cdevStreamQueue. The
// *	function will cease receiving when a complete transmission block
// *	(multiple packets) has been received or when the sender has failed
// *	to transmit data after a pre-specified number of retries.
// *
// *	Progress is kept in headerRcvLen, dataRcvLen, subPacketLen and rcvNode
// *	so that a later call can continue a partially received block.
// *
// *	Returns the number of bytes received during this call, or -1 on a
// *	socket error, a magic number mismatch, a sub-packet for which no node
// *	could be obtained, or a block that ends in a partial packet.
// *****************************************************************************
ssize_t cdevBufferedSocket::receive ( void )
{
    int totalRecv = 0;
    int retval    = 0;
    int retries   = 0;

    // Read the 12 byte block header.
    while(headerRcvLen<12 && retries<RETRYCNT && retval>=0)
    {
        if((retval = recv(((char *)inboundHeader)+headerRcvLen, 12-headerRcvLen))==0)
        {
            // Zero bytes: retry only if the socket merely has no data yet.
            int errCode = GetSocketErrno();
            if(errCode!=EWOULDBLOCK && errCode!=EAGAIN) retval = -1;
            else retries++;
        }
        else if(retval>=0)
        {
            retries       = retval?0:retries+1;
            headerRcvLen += retval;
            totalRecv    += retval;
        }

        // Header complete: convert to host order and validate it.
        if(headerRcvLen>=12)
        {
            inboundHeader[0] = ntohl(inboundHeader[0]);
            inboundHeader[1] = ntohl(inboundHeader[1]);
            inboundHeader[2] = ntohl(inboundHeader[2]);
            if(inboundHeader[0]!=magicNumber) retval=-1;
        }
    }

    // Read sub-packets while the header still announces bytes
    // (inboundHeader[1]) and packets (inboundHeader[2]).
    while(headerRcvLen>=12 && retries<RETRYCNT && retval>=0 &&
          inboundHeader[1]>0 && inboundHeader[2]>0)
    {
        // 4 byte sub-packet length.
        while(dataRcvLen<4 && retries<RETRYCNT && retval>=0)
        {
            if((retval = recv(((char *)&subPacketLen)+dataRcvLen, 4-dataRcvLen))==0)
            {
                int errCode = GetSocketErrno();
                if(errCode!=EWOULDBLOCK && errCode!=EAGAIN) retval = -1;
                else retries++;
            }
            else if(retval>0)
            {
                retries           = 0;
                dataRcvLen       += retval;
                totalRecv        += retval;
                inboundHeader[1] -= retval;
            }

            if(dataRcvLen>=4)
            {
                subPacketLen = ntohl(subPacketLen);
                rcvNode      = nodeFactory->newNode(subPacketLen);
                if(rcvNode==NULL)
                {
                    // The factory returned no node (newNode in this class
                    // does so for a length <= 0). The length comes from
                    // the peer, so report an error rather than
                    // dereference NULL, and drop the sub-packet state so
                    // a later call cannot reach the payload loop without
                    // a node.
                    retval       = -1;
                    dataRcvLen   = 0;
                    subPacketLen = 0;
                }
                else
                {
                    // Zero-fill the buffer: memset takes (dest, value, count).
                    memset(rcvNode->getBuf(), 0, rcvNode->getLen());
                }
            }
        }

        // Sub-packet payload.
        while(dataRcvLen>=4 && dataRcvLen<rcvNode->getLen()+4 &&
              retries<RETRYCNT && retval>=0)
        {
            if((retval = recv(rcvNode->getBuf()+(dataRcvLen-4), rcvNode->getLen()-(dataRcvLen-4)))==0)
            {
                int errCode = GetSocketErrno();
                if(errCode!=EWOULDBLOCK && errCode!=EAGAIN) retval = -1;
                else retries++;
            }
            else if(retval>=0)
            {
                retries           = 0;
                dataRcvLen       += retval;
                totalRecv        += retval;
                inboundHeader[1] -= retval;
            }

            // Sub-packet complete: queue it and reset the per-packet state.
            if(dataRcvLen>=(rcvNode->getLen()+4))
            {
                waitingIn.enqueue(rcvNode);
                rcvNode      = NULL;
                dataRcvLen   = 0;
                subPacketLen = 0;
                inboundHeader[2]--;
            }
        }

        // The announced byte count or packet count is used up: the block
        // is finished, so reset the state for the next block header.
        if(inboundHeader[1]<=0 || inboundHeader[2]<=0)
        {
            if(rcvNode) // Only received a partial packet on final entry
            {
                delete rcvNode;
                rcvNode = NULL;
                retval  = -1;
            }
            headerRcvLen = 0;
            dataRcvLen   = 0;
            inboundHeader[0]=(inboundHeader[1]=(inboundHeader[2]=0));
        }
    }

    return retval>=0?totalRecv:-1;
}


// *****************************************************************************
// * cdevBufferedSocket::getBlockingSemantics :
// *	Identifies the cdevBufferedSocket object as a NON-BLOCKING entity.
// *****************************************************************************
int cdevBufferedSocket::getBlockingSemantics ( void ) const
{
    return O_NONBLOCK;
}


// *****************************************************************************
// * cdevBufferedSocket::getRcvBufferSize :
// *	This method is called during the configureHandle method to obtain the
// *	size of the receive buffer that will be used by the socket. This class
// *	returns the value 56000 - telling the cdevSocketStream to set the
// *	receive buffer size to 56 kilobytes.
// *****************************************************************************
int cdevBufferedSocket::getRcvBufferSize ( void ) const
{
    return 56000;
}


// *****************************************************************************
// * cdevBufferedSocket::getSndBufferSize :
// *	This method is called during the configureHandle method to obtain the
// *	size of the send buffer that will be used by the socket. This class
// *	returns the value 56000 - telling the cdevSocketStream to set the
// *	send buffer size to 56 kilobytes.
// *****************************************************************************
int cdevBufferedSocket::getSndBufferSize ( void ) const
{
    return 56000;
}