diff --git a/src/ca/iocinf.c b/src/ca/iocinf.c index e2e7e033a..d4417ef78 100644 --- a/src/ca/iocinf.c +++ b/src/ca/iocinf.c @@ -270,57 +270,53 @@ int net_proto #endif #ifdef CA_SET_TCP_BUFFER_SIZES - /* set TCP buffer sizes */ - i = MAX_MSG_SIZE; - status = setsockopt( - sock, - SOL_SOCKET, - SO_SNDBUF, - &i, - sizeof(i)); - if(status < 0){ - free(piiu); - status = socket_close(sock); - if(status<0){ - SEVCHK(ECA_INTERNAL,NULL); - } - UNLOCK; - return ECA_SOCK; - } - i = MAX_MSG_SIZE; - status = setsockopt( - sock, - SOL_SOCKET, - SO_RCVBUF, - &i, - sizeof(i)); - if(status < 0){ - free(piiu); - status = socket_close(sock); - if(status<0){ - SEVCHK(ECA_INTERNAL,NULL); - } - UNLOCK; - return ECA_SOCK; - } + { + int i; + int size; - /* fetch the TCP send buffer size */ - i = sizeof(piiu->tcp_send_buff_size); - status = getsockopt( - sock, - SOL_SOCKET, - SO_SNDBUF, - (char *)&piiu->tcp_send_buff_size, - &i); - if(status < 0 || i != sizeof(piiu->tcp_send_buff_size)){ - free(piiu); - status = socket_close(sock); - if(status<0){ - SEVCHK(ECA_INTERNAL,NULL); + /* set TCP buffer sizes */ + i = MAX_MSG_SIZE; + status = setsockopt( + sock, + SOL_SOCKET, + SO_SNDBUF, + &i, + sizeof(i)); + if(status < 0){ + free(piiu); + socket_close(sock); + UNLOCK; + return ECA_SOCK; } - UNLOCK; - return ECA_SOCK; - } + i = MAX_MSG_SIZE; + status = setsockopt( + sock, + SOL_SOCKET, + SO_RCVBUF, + &i, + sizeof(i)); + if(status < 0){ + free(piiu); + socket_close(sock); + UNLOCK; + return ECA_SOCK; + } + + /* fetch the TCP send buffer size */ + i = sizeof(size); + status = getsockopt( + sock, + SOL_SOCKET, + SO_SNDBUF, + (char *)&size, + &i); + if(status < 0 || i != sizeof(size)){ + free(piiu); + socket_close(sock); + UNLOCK; + return ECA_SOCK; + } + } #endif /* connect */ @@ -342,20 +338,6 @@ int net_proto cacRingBufferInit(&piiu->recv, sizeof(piiu->send.buf)); cacRingBufferInit(&piiu->send, sizeof(piiu->send.buf)); - /* - * Set non blocking IO - * to prevent dead locks - */ - status = socket_ioctl( - piiu->sock_chan, - FIONBIO, - &true); - if(status<0){ - ca_printf( - "Error setting non-blocking io: %s\n", - strerror(MYERRNO)); - } - /* * Save the Host name for efficient access in the * future. @@ -435,6 +417,7 @@ int net_proto sock, ca_static->ca_server_port); + cacRingBufferInit(&piiu->recv, sizeof(piiu->send.buf)); cacRingBufferInit(&piiu->send, min(MAX_UDP, sizeof(piiu->send.buf))); @@ -455,6 +438,20 @@ int net_proto return ECA_INTERNAL; } + /* + * Set non blocking IO + * to prevent dead locks + */ + status = socket_ioctl( + piiu->sock_chan, + FIONBIO, + &true); + if(status<0){ + ca_printf( + "Error setting non-blocking io: %s\n", + strerror(MYERRNO)); + } + if(fd_register_func){ LOCKEVENTS; (*fd_register_func)(fd_register_arg, sock, TRUE); @@ -654,9 +651,13 @@ LOCAL void cac_udp_send_msg_piiu(struct ioc_in_use *piiu) localErrno = MYERRNO; - if( localErrno != EWOULDBLOCK && - localErrno != ENOBUFS && - localErrno != EINTR){ + if( localErrno == EWOULDBLOCK && + localErrno == ENOBUFS && + localErrno == EINTR){ + UNLOCK; + return; + } + else { ca_printf( "CAC: error on socket send() %s\n", strerror(localErrno)); @@ -704,34 +705,52 @@ LOCAL void cac_tcp_send_msg_piiu(struct ioc_in_use *piiu) LOCK; - sendCnt = cacRingBufferReadSize(&piiu->send, TRUE); - - assert(sendCnt<=piiu->send.max_msg); /* - * return if nothing to send + * Check at least twice to see if there is anything + * in the ring buffer (in case the block of messages + * isnt continuous). Always return if the send was + * less bytes than requested. */ - if(sendCnt == 0){ - UNLOCK; - return; - } + while (TRUE) { + sendCnt = cacRingBufferReadSize(&piiu->send, TRUE); + assert(sendCnt<=piiu->send.max_msg); - status = send( - piiu->sock_chan, - &piiu->send.buf[piiu->send.rdix], - sendCnt, - 0); - if(status>=0){ - piiu->sendPending = FALSE; - CAC_RING_BUFFER_READ_ADVANCE(&piiu->send, status); - - sendCnt = cacRingBufferReadSize(&piiu->send, FALSE); - if(sendCnt==0){ + /* + * return if nothing to send + */ + if(sendCnt == 0){ +# ifdef DEBUG + if (piiu->sendPending) { + printf ("-Unblocked-\n"); + } +# endif /* DEBUG */ + piiu->sendPending = FALSE; piiu->send_needed = FALSE; + UNLOCK; + return; } - UNLOCK; - return; + status = send( + piiu->sock_chan, + &piiu->send.buf[piiu->send.rdix], + sendCnt, + 0); + if (status<0) { + break; + } + else if (status==0) { + TAG_CONN_DOWN(piiu); + UNLOCK; + return; + } + + CAC_RING_BUFFER_READ_ADVANCE(&piiu->send, status); + + if (status != sendCnt) { + UNLOCK; + return; + } } localError = MYERRNO; @@ -740,10 +759,6 @@ LOCAL void cac_tcp_send_msg_piiu(struct ioc_in_use *piiu) localError == ENOBUFS || localError == EINTR){ UNLOCK; - if(!piiu->sendPending){ - cac_gettimeval(&piiu->timeAtSendBlock); - piiu->sendPending = TRUE; - } return; } @@ -788,7 +803,6 @@ void cac_flush_internal() UNLOCK; } - /* * cac_clean_iiu_list() @@ -823,14 +837,16 @@ void ca_process_input_queue() { struct ioc_in_use *piiu; + LOCK; + /* - * dont allow recursion + * dont allow recursion */ if(post_msg_active){ + UNLOCK; return; } - LOCK; for( piiu=(IIU *)iiuList.node.next; piiu; piiu=(IIU *)piiu->node.next){ @@ -841,9 +857,12 @@ void ca_process_input_queue() (*piiu->procInput)(piiu); } + UNLOCK; +#if 0 cac_flush_internal(); +#endif } @@ -863,55 +882,59 @@ LOCAL void tcp_recv_msg(struct ioc_in_use *piiu) LOCK; - writeSpace = cacRingBufferWriteSize(&piiu->recv, TRUE); - if(writeSpace == 0){ - UNLOCK; - return; - } + /* + * Check at least twice to see if there is ana space left + * in the ring buffer (in case the messages block + * isnt continuous). Always return if the send was + * less bytes than requested. + */ + while (TRUE) { - status = recv( piiu->sock_chan, - &piiu->recv.buf[piiu->recv.wtix], - writeSpace, - 0); - if(status == 0){ - TAG_CONN_DOWN(piiu); - UNLOCK; - return; - } - else if(status <0){ - /* try again on status of -1 and no luck this time */ - if(MYERRNO == EWOULDBLOCK || MYERRNO == EINTR){ - UNLOCK; - return; + writeSpace = cacRingBufferWriteSize(&piiu->recv, TRUE); + if(writeSpace == 0){ + break; } - if( MYERRNO != EPIPE && - MYERRNO != ECONNRESET && - MYERRNO != ETIMEDOUT){ - ca_printf( - "CAC: unexpected recv error (err=%s)\n", - strerror(MYERRNO)); + status = recv( piiu->sock_chan, + &piiu->recv.buf[piiu->recv.wtix], + writeSpace, + 0); + if(status == 0){ + TAG_CONN_DOWN(piiu); + break; } - TAG_CONN_DOWN(piiu); - UNLOCK; - return; - } + else if(status <0){ + /* try again on status of -1 and no luck this time */ + if(MYERRNO == EWOULDBLOCK || MYERRNO == EINTR){ + break; + } - if(status>MAX_MSG_SIZE){ - ca_printf( "CAC: recv_msg(): message overflow %l\n", - status-MAX_MSG_SIZE); - TAG_CONN_DOWN(piiu); - UNLOCK; - return; - } + if( MYERRNO != EPIPE && + MYERRNO != ECONNRESET && + MYERRNO != ETIMEDOUT){ + ca_printf( + "CAC: unexpected recv error (err=%s)\n", + strerror(MYERRNO)); + } + TAG_CONN_DOWN(piiu); + break; + } - CAC_RING_BUFFER_WRITE_ADVANCE(&piiu->recv, status); + assert (status<=writeSpace); + + CAC_RING_BUFFER_WRITE_ADVANCE(&piiu->recv, status); + + /* + * Record the time whenever we receive a message + * from this IOC + */ + piiu->timeAtLastRecv = ca_static->currentTime; + + if (status != writeSpace) { + break; + } + } - /* - * Record the time whenever we receive a message - * from this IOC - */ - cac_gettimeval(&piiu->timeAtLastRecv); UNLOCK; return; @@ -928,18 +951,20 @@ LOCAL void ca_process_tcp(struct ioc_in_use *piiu) int status; long bytesToProcess; + LOCK; + /* * dont allow recursion */ if(post_msg_active){ + UNLOCK; return; } - pNode = (caAddrNode *) piiu->destAddr.node.next; - post_msg_active = TRUE; - LOCK; + pNode = (caAddrNode *) piiu->destAddr.node.next; + while(TRUE){ bytesToProcess = cacRingBufferReadSize(&piiu->recv, TRUE); if(bytesToProcess == 0){ @@ -962,9 +987,9 @@ LOCAL void ca_process_tcp(struct ioc_in_use *piiu) &piiu->recv, bytesToProcess); } - UNLOCK; post_msg_active = FALSE; + UNLOCK; flow_control(piiu); @@ -1065,17 +1090,18 @@ LOCAL void ca_process_udp(struct ioc_in_use *piiu) char *pBuf; unsigned long bytesAvailable; + LOCK; + /* * dont allow recursion */ if(post_msg_active){ + UNLOCK; return; } - post_msg_active = TRUE; - LOCK; while(TRUE){ bytesAvailable = cacRingBufferReadSize(&piiu->recv, TRUE); @@ -1123,9 +1149,8 @@ LOCAL void ca_process_udp(struct ioc_in_use *piiu) bytesAvailable); } - UNLOCK; - post_msg_active = FALSE; + UNLOCK; return; }