#if !defined (_SOCKET_UTIL_H) #define _SOCKET_UTIL_H #include #include #include class SocketReader : virtual public ErrorReporter { public: enum { SHUTDOWN_CODE = -128}; enum { MAX_RETRIES = 1000 }; enum { READBUF_INITIAL_SIZE = 56000 }; virtual int getHandle ( void ) const = 0; SocketReader ( long MagicNumber = 0L ); ~SocketReader ( void ); int read ( char ** buf, int * len ); virtual int readNextPacket ( char ** buf, int * len ); virtual int reading ( void ); virtual int readReset ( void ); private: char * readBuf; char * readNextPktPtr; int readBufMax; int readPktCnt; int readPktXfrCnt; int readBufLen; int readXfrLen; int readPktXfrLen; int readRetries; // *************************** // * This section added to // * optionally support magic // * number for buffer // * validation. // *************************** const long readMagicNumber; int readMagicLen; long readMagicVal; }; class SocketWriter : virtual public ErrorReporter { public: enum { WRITEBUF_INITIAL_SIZE = 56000 }; enum { WRITEBUF_PAD_SIZE = 32 }; virtual int getHandle ( void ) const = 0; SocketWriter ( long MagicNumber = 0L ); ~SocketWriter ( void ); int write ( char * buf, int buflen); int writeEnqueue ( char * buf, int buflen); virtual int writing ( void ); virtual int writeContinue ( void ); virtual int writeReset ( void ); virtual int writeGoodbye ( void ); private: char * writeBuf; char * writeNextPktPtr; int writeBufMax; int writePktCnt; int writeBufLen; int writeXfrLen; int writePktXfrLen; // *************************** // * This section added to // * optionally support magic // * number for buffer // * validation. // *************************** const long writeMagicNumber; int writeMagicLen; }; inline SocketReader::SocketReader ( long MagicNumber ) : readBuf(NULL), readNextPktPtr(NULL), readBufMax(READBUF_INITIAL_SIZE), readPktCnt(0), readPktXfrCnt(0), readBufLen(0), readXfrLen(0), readPktXfrLen(0), readRetries(0), // ************************* // * Magic number support. // ************************* readMagicNumber(MagicNumber), readMagicLen(0), readMagicVal(0L) { readBuf = (char *)malloc(readBufMax); readNextPktPtr = readBuf+RNDUP(sizeof(long)); } inline SocketReader::~SocketReader ( void ) { if(readBuf!=NULL) free(readBuf); } inline int SocketReader::reading ( void ) { return (readPktCnt>readPktXfrCnt)?1:0; } inline int SocketReader::readReset ( void ) { readNextPktPtr = readBuf+RNDUP(sizeof(long)); readBufLen = 0; readXfrLen = 0; readPktCnt = 0; readPktXfrCnt = 0; readPktXfrLen = 0; readMagicLen = 0; readMagicVal = 0L; return 0; } // ***************************************************************************** // * This method causes the next available packet to be dequeued from the // * already read buffer. // ***************************************************************************** inline int SocketReader::readNextPacket (char ** buf, int *buflen) { if(readPktCnt>readPktXfrCnt) { *buflen = (int)ntohl(*(long *)readNextPktPtr); readNextPktPtr+=RNDUP(sizeof(long)); *buf = readNextPktPtr; readNextPktPtr+=RNDUP(*buflen); readPktXfrCnt++; } else { *buf = NULL; *buflen = 0; } if(readPktXfrCnt>=readPktCnt) readReset(); return *buflen; } // ***************************************************************************** // * This method will get the next available packet from the socket. The caller // * is responsible for transfering this data to a new location immediately. // * The caller should not delete the pointer provided by this method, or rely // * on it to remain valid between calls. // ***************************************************************************** inline int SocketReader::read(char ** buf, int * buflen) { int handle = getHandle(); int result = 0; int shutdown = 0; int error = 0; *buf = NULL; *buflen = 0; // ********************************************************************* // * First test to ensure that the socket is allocated and that the // * device descriptor is valid. // ********************************************************************* if(handle<=0) result = -1; // ********************************************************************* // * If the handle is valid attempt to read the next packet of a // * multi-packet set from an already existing buffer. // ********************************************************************* else if(reading()) result = readNextPacket(buf, buflen); // ********************************************************************* // * If all packets in the most recent multi-packet set have already // * been read... Then attempt to read a new block from the socket. // ********************************************************************* else { int amntread = 0; char *readPtr = NULL; // ************************************************************* // * Optionally read the magic number from the socket. // ************************************************************* readPtr = (char *)&readMagicVal; while(readMagicNumber && readMagicLen0) { // ***************************************************** // * Anytime a read is successful, set the readRetries // * variable to 0. // ***************************************************** readRetries = 0; // ***************************************************** // * Once an entire long integer is read from the socket // * validate that against the Magic Number that is // * expected. // ***************************************************** if((readMagicLen += amntread)>=sizeof(long)) { readMagicVal = ntohl(readMagicVal); // ********************************************* // * If the Magic Number received is not the // * same as the Magic Number expected, set the // * shutdown flag and kill the connection. // ********************************************* if(readMagicVal!=readMagicNumber) { outputError (CDEV_SEVERITY_ERROR, "SocketReader", "Invalid magic number read from socket\n\t=> Expected %lX - received %lX", readMagicNumber, readMagicVal); shutdown = 1; } } } // ************************************************************* // * Read the size of the packet from the socket... note, this // * code will not be executed until the Magic Number has been // * successfully read. // ************************************************************* readPtr = (char *)&readBufLen; while(!shutdown && readPktXfrLen=sizeof(long)) && (amntread = recv(handle, readPtr+readPktXfrLen, sizeof(long)-readPktXfrLen, 0))>0) { // ***************************************************** // * Anytime a read is successful, set the readRetries // * variable to 0. // ***************************************************** readRetries = 0; // ***************************************************** // * Once an entire long integer is read from the socket // * use that variable as the expected packet length, // * and allocate a buffer of sufficient size to hold // * the incoming packet. // ***************************************************** if((readPktXfrLen += amntread)>=sizeof(long)) { readBufLen = ntohl(readBufLen); // ********************************************* // * A length of -1 indicates that the socket // * should be shutdown. // ********************************************* if(readBufLen == -1) shutdown = 1; if(readBufLen <= 0) readReset(); else { if(readBufLen>readBufMax) { readBufMax = readBufLen; readBuf = (char *)realloc(readBuf, readBufMax); readNextPktPtr = readBuf+RNDUP(sizeof(long)); } readXfrLen = 0; } } } // ************************************************************* // * Continue reading from the socket into the new buffer until // * the amount of data specified by the readBufLen variable // * has been obtained, or no further data is available. // ************************************************************* while(!shutdown && readPktXfrLen>=sizeof(long) && readXfrLen < readBufLen && (amntread = recv(handle, readBuf+readXfrLen, readBufLen-readXfrLen, 0))>0) { // ***************************************************** // * Anytime a read is successful, set the readRetries // * variable to 0. // ***************************************************** readRetries = 0; // ***************************************************** // * Once a complete buffer of data has been read from // * the socket, use the readNextPacket method to set // * the user pointer to the appropriate position within // * the data buffer. // ***************************************************** if((readXfrLen+=amntread)>=readBufLen) { readPktCnt = (int)ntohl(*(long *)readBuf); result = readNextPacket(buf, buflen); } } // ************************************************************* // * If an error occurred, or the function failed to read // * data from the socket, then this section of code will be // * executed. // ************************************************************* if(!shutdown && amntread<=0) { int errCode = GetSocketErrno(); // ***************************************************** // * Increment the readRetries to count the number of // * empty receives. Once this variable reaches the // * MAX_RETRIES value, a -1 will be returned to delete // * the socket. // ***************************************************** readRetries++; // ***************************************************** // * If the amntread is 0 or -1 (and any error was // * caused by blocking), and the maximum number of // * retries has not been reached, do the following // ***************************************************** if(readRetries < MAX_RETRIES && ((amntread==0 && readPktXfrLen>0) || (amntread==-1 && (errCode == EWOULDBLOCK || errCode == EAGAIN)))) { result = 0; } // ***************************************************** // * Otherwise, if the maximum number of retries have // * been reached, do the following // ***************************************************** else if(readRetries >= MAX_RETRIES) { outputError (CDEV_SEVERITY_WARN, "SocketReader", "Have exceeded maximum retries on socket"); result = -1; } // ***************************************************** // * Otherwise, if the error was not due to blocking, // * do the following // ****************************************************** else if(amntread==-1) { outputError (CDEV_SEVERITY_ERROR, "SocketReader", "Error number %i while reading from socket", errCode); result = -1; } } } if(shutdown) result = SHUTDOWN_CODE; if(result==-1) readReset(); return result; } inline SocketWriter::SocketWriter ( long MagicNumber ) : writeBuf(NULL), writeNextPktPtr(NULL), writeBufMax(WRITEBUF_INITIAL_SIZE), writePktCnt(0), writeBufLen(RNDUP(sizeof(long))), writeXfrLen(0), writePktXfrLen(0), // ************************* // * Magic number support. // ************************* writeMagicNumber(MagicNumber), writeMagicLen(0) { writeBuf = (char *)malloc(writeBufMax); writeNextPktPtr = writeBuf+writeBufLen; } inline SocketWriter::~SocketWriter ( void ) { if(writeBuf!=NULL) free(writeBuf); } inline int SocketWriter::writing ( void ) { return (writePktCnt>0)?1:0; } inline int SocketWriter::writeReset ( void ) { writeBufLen = RNDUP(sizeof(long)); writeNextPktPtr = writeBuf+writeBufLen; writePktCnt = 0; writeXfrLen = 0; writePktXfrLen = 0; // ************************* // * Magic number support. // ************************* writeMagicLen = 0; return 0; } inline int SocketWriter::writeContinue ( void ) { int handle = getHandle(); int result = 0; // ********************************************************************* // * The following variable has been added to allow the SocketWriter // * to poll the file descriptor for validity prior to writing to it. // ********************************************************************* #ifdef SYSV struct pollfd fds; fds.fd = handle; fds.events = POLLERR|POLLNVAL|POLLHUP; fds.revents = 0; // ********************************************************************* // * First test to ensure that the socket is allocated and that the // * device descriptor is valid. // ********************************************************************* if( handle<=0 ) result = -1; // ********************************************************************* // * Execute poll to ensure that the handle is still valid and writable. // ********************************************************************* else if(poll(&fds, 1, 0)>0 && (fds.revents&(POLLERR|POLLNVAL|POLLHUP))!=0) { result = -1; } #else cdevHandleSet readfd; struct timeval tv; readfd.set_bit(handle); tv.tv_sec = 0; tv.tv_usec = 0; if (handle<=0) result = -1; else if (cdevSelect (handle+1,readfd,0,0,&tv)<0) result = -1; #endif // ********************************************************************* // * If all is well, continue writing data. // ********************************************************************* else if( writing() ) { int amntsent = 0; char *sendPtr = NULL; long magicNumber = htonl(writeMagicNumber); long packetSize = htonl(writeBufLen); sendPtr = (char *)&magicNumber; while(writeMagicNumber && writeMagicLen0) { writeMagicLen += amntsent; } sendPtr = (char *)&packetSize; while((!writeMagicNumber || writeMagicLen>=sizeof(long)) && writePktXfrLen0) { writePktXfrLen += amntsent; } while(writePktXfrLen>=sizeof(long) && writeXfrLen < writeBufLen && (amntsent = send(handle, writeBuf+writeXfrLen, writeBufLen-writeXfrLen, 0))>0) { if((writeXfrLen+=amntsent)>=writeBufLen) { result = writeBufLen; writeReset(); } } if(amntsent<=0) { int errCode = GetSocketErrno(); if((amntsent==0 && writePktXfrLen>0) || (amntsent==-1 && (errCode == EWOULDBLOCK || errCode == EAGAIN))) { // ********************************************* // * Do Nothing // ********************************************* } else if(amntsent==-1) { outputError (CDEV_SEVERITY_ERROR, "SocketWriter", "Error number %i while writing to socket", errCode); result = -1; } } } if(result==-1) writeReset(); return result; } inline int SocketWriter::writeGoodbye ( void ) { int handle = getHandle(); int result = 0; if( handle<=0 ) result = -1; else { char val = -1; result=(send(handle, &val, 1, MSG_OOB)==1)?0:-1; } if(result==-1) writeReset(); return result; } inline int SocketWriter::writeEnqueue ( char * buf, int buflen ) { int result = 0; // ********************************************************************* // * If data has already been transmitted, then it is impossible to // * add more data to the outbound buffer because it would invalidate // * the packet length and packet count variables. // ********************************************************************* if(writePktXfrLen>0) result = -1; else { if(writePktCnt==0 && (RNDUP(sizeof(long))+RNDUP(buflen)+RNDUP(sizeof(long))) > writeBufMax) { writeBufMax = (RNDUP(sizeof(long))+RNDUP(buflen)+RNDUP(sizeof(long))+WRITEBUF_PAD_SIZE); writeBuf = (char *)realloc(writeBuf, writeBufMax); writeBufLen = RNDUP(sizeof(long)); writeNextPktPtr = writeBuf+writeBufLen; } if(writeBufLen+RNDUP(buflen)+RNDUP(sizeof(long)) < writeBufMax) { writePktCnt++; long tpktCnt = htonl(writePktCnt); memcpy(writeBuf, &tpktCnt, sizeof(long)); long tbuflen = htonl(buflen); memcpy(writeNextPktPtr, &tbuflen, sizeof(long)); writeNextPktPtr+=RNDUP(sizeof(long)); memcpy(writeNextPktPtr, buf, buflen); writeNextPktPtr+=RNDUP(buflen); writeBufLen = (int)(writeNextPktPtr-writeBuf); result = 0; } else result = -1; } return result; } inline int SocketWriter::write(char * buf, int buflen) { writeEnqueue(buf, buflen); return writeContinue(); } #endif /* _SOCKET_UTIL_H */