diff --git a/src/ca/bsd_depen.c b/src/ca/bsd_depen.c index d60c4ab4b..54a55159b 100644 --- a/src/ca/bsd_depen.c +++ b/src/ca/bsd_depen.c @@ -160,13 +160,15 @@ int cac_select_io (struct timeval *ptimeout, int flags) } LOCK; - /* - * must run through the IIU list even if no IO is pending - * if any of the IOCs are in flow control (so that an exit - * flow control msg can be sent to each of them that are) - */ - if (status>0 || (ca_static->ca_number_iiu_in_fc>0u&&status>=0) ) { - for ( piiu = (IIU *) iiuList.node.next; + + /* + * must run through the IIU list even if no IO is pending + * if any of the IOCs are in flow control (so that an exit + * flow control msg can be sent to each of them that are) + */ + if (status>0 || (ca_static->ca_number_iiu_in_fc>0u && status>=0)) { + status = 0; + for (piiu = (IIU *) iiuList.node.next; piiu; piiu = (IIU *) piiu->node.next) { if (piiu->state==iiu_disconnected) { @@ -174,17 +176,22 @@ int cac_select_io (struct timeval *ptimeout, int flags) } if (FD_ISSET(piiu->sock_chan,&pfdi->readMask)) { - (*piiu->recvBytes)(piiu); - /* - * if we were not blocking and there is a - * message present then start to suspect that - * we are getting behind - */ - if (piiu->sock_proto==IPPROTO_TCP) { - if (ptimeout->tv_sec==0 && ptimeout->tv_usec==0) { - flow_control_on(piiu); - } - } + unsigned long bytesReceived; + + bytesReceived = (*piiu->recvBytes)(piiu); + if (bytesReceived>0) { + status++; + /* + * if we are not blocking and there is a + * message present then start to suspect that + * we are getting behind + */ + if (piiu->sock_proto==IPPROTO_TCP) { + if (ptimeout->tv_sec==0 && ptimeout->tv_usec==0) { + flow_control_on(piiu); + } + } + } } else if (piiu->recvPending) { /* @@ -198,7 +205,12 @@ int cac_select_io (struct timeval *ptimeout, int flags) } if (FD_ISSET(piiu->sock_chan,&pfdi->writeMask)) { - (*piiu->sendBytes)(piiu); + unsigned long bytesSent; + + bytesSent = (*piiu->sendBytes)(piiu); + if (bytesSent>0) { + status++; + } } } } diff --git a/src/ca/iocinf.c b/src/ca/iocinf.c index e9b7effee..e155bce42 100644 --- a/src/ca/iocinf.c +++ b/src/ca/iocinf.c @@ -16,12 +16,12 @@ static char *sccsId = "@# $Id$"; #include "net_convert.h" #include "bsdSocketResource.h" -LOCAL void tcp_recv_msg(struct ioc_in_use *piiu); -LOCAL void cac_connect_iiu(struct ioc_in_use *piiu); LOCAL void cac_set_iiu_non_blocking (struct ioc_in_use *piiu); -LOCAL void cac_tcp_send_msg_piiu(struct ioc_in_use *piiu); -LOCAL void cac_udp_send_msg_piiu(struct ioc_in_use *piiu); -LOCAL void udp_recv_msg(struct ioc_in_use *piiu); +LOCAL unsigned long tcp_recv_msg(struct ioc_in_use *piiu); +LOCAL unsigned long cac_connect_iiu(struct ioc_in_use *piiu); +LOCAL unsigned long cac_tcp_send_msg_piiu(struct ioc_in_use *piiu); +LOCAL unsigned long cac_udp_send_msg_piiu(struct ioc_in_use *piiu); +LOCAL unsigned long udp_recv_msg(struct ioc_in_use *piiu); LOCAL void ca_process_tcp(struct ioc_in_use *piiu); LOCAL void ca_process_udp(struct ioc_in_use *piiu); LOCAL void cacRingBufferInit(struct ca_buffer *pBuf, @@ -461,19 +461,19 @@ LOCAL void cac_set_iiu_non_blocking (struct ioc_in_use *piiu) /* * cac_connect_iiu() */ -LOCAL void cac_connect_iiu (struct ioc_in_use *piiu) +LOCAL unsigned long cac_connect_iiu (struct ioc_in_use *piiu) { caAddrNode *pNode; int status; if (piiu->state==iiu_connected) { ca_printf("CAC: redundant connect() attempt?\n"); - return; + return 0ul; } if (piiu->state==iiu_disconnected) { ca_printf("CAC: connecting when disconnected?\n"); - return; + return 0ul; } LOCK; @@ -514,11 +514,11 @@ LOCAL void cac_connect_iiu (struct ioc_in_use *piiu) * but not completed. */ UNLOCK; - return; + return 0ul; } else if (errnoCpy==SOCK_EALREADY) { UNLOCK; - return; + return 0ul; } #ifdef _WIN32 /* @@ -527,7 +527,7 @@ LOCAL void cac_connect_iiu (struct ioc_in_use *piiu) */ else if (errnoCpy==SOCK_EINVAL) { /* a SOCK_EALREADY alias used by early WINSOCK */ UNLOCK; - return; + return 0ul; } #endif else if(errnoCpy==SOCK_EINTR) { @@ -544,7 +544,7 @@ LOCAL void cac_connect_iiu (struct ioc_in_use *piiu) ntohs(pNode->destAddr.in.sin_port), piiu->host_name_str, errnoCpy, SOCKERRSTR(errnoCpy)); - return; + return 0ul; } } @@ -566,6 +566,8 @@ LOCAL void cac_connect_iiu (struct ioc_in_use *piiu) retryPendingClaims (piiu); } UNLOCK; + + return 1ul; } /* @@ -754,19 +756,19 @@ void notify_ca_repeater() /* * CAC_UDP_SEND_MSG_PIIU() - * */ -LOCAL void cac_udp_send_msg_piiu(struct ioc_in_use *piiu) +LOCAL unsigned long cac_udp_send_msg_piiu(struct ioc_in_use *piiu) { caAddrNode *pNode; unsigned long sendCnt; + unsigned long totalBytes = 0ul; int status; /* * check for shutdown in progress */ if(piiu->state!=iiu_connected){ - return; + return 0ul; } LOCK; @@ -780,12 +782,12 @@ LOCAL void cac_udp_send_msg_piiu(struct ioc_in_use *piiu) */ if(sendCnt == 0){ UNLOCK; - return; + return 0ul; } pNode = (caAddrNode *) piiu->destAddr.node.next; while (pNode) { - unsigned long actualSendCnt; + unsigned long actualSendCnt; status = sendto( piiu->sock_chan, @@ -797,6 +799,7 @@ LOCAL void cac_udp_send_msg_piiu(struct ioc_in_use *piiu) if(status>=0){ actualSendCnt = (unsigned long) status; assert (actualSendCnt == sendCnt); + totalBytes += actualSendCnt; } else { int localErrno; @@ -827,24 +830,24 @@ LOCAL void cac_udp_send_msg_piiu(struct ioc_in_use *piiu) min(MAX_UDP, sizeof(piiu->send.buf))); piiu->pushPending = FALSE; UNLOCK; - return; + return totalBytes; } /* * CAC_TCP_SEND_MSG_PIIU() - * */ -LOCAL void cac_tcp_send_msg_piiu(struct ioc_in_use *piiu) +LOCAL unsigned long cac_tcp_send_msg_piiu(struct ioc_in_use *piiu) { - unsigned long sendCnt; - int status; - int localError; + unsigned long sendCnt; + unsigned long totalBytes = 0ul; + int status; + int localError; /* * check for shutdown in progress */ if(piiu->state!=iiu_connected){ - return; + return 0ul; } LOCK; @@ -875,7 +878,7 @@ LOCAL void cac_tcp_send_msg_piiu(struct ioc_in_use *piiu) if (piiu->claimsPending) { retryPendingClaims(piiu); } - return; + return totalBytes; } assert (sendCnt<=INT_MAX); @@ -890,13 +893,13 @@ LOCAL void cac_tcp_send_msg_piiu(struct ioc_in_use *piiu) } CAC_RING_BUFFER_READ_ADVANCE(&piiu->send, status); - + totalBytes += (unsigned long) status; } if (status==0) { TAG_CONN_DOWN(piiu); UNLOCK; - return; + return totalBytes; } localError = SOCKERRNO; @@ -904,7 +907,7 @@ LOCAL void cac_tcp_send_msg_piiu(struct ioc_in_use *piiu) if( localError == SOCK_EWOULDBLOCK || localError == SOCK_EINTR){ UNLOCK; - return; + return totalBytes; } if( localError != SOCK_EPIPE && @@ -917,7 +920,7 @@ LOCAL void cac_tcp_send_msg_piiu(struct ioc_in_use *piiu) TAG_CONN_DOWN(piiu); UNLOCK; - return; + return totalBytes; } /* @@ -954,13 +957,14 @@ void ca_process_input_queue() * TCP_RECV_MSG() * */ -LOCAL void tcp_recv_msg(struct ioc_in_use *piiu) +LOCAL unsigned long tcp_recv_msg(struct ioc_in_use *piiu) { - unsigned long writeSpace; - int status; + unsigned long writeSpace; + unsigned long totalBytes = 0; + int status; if(piiu->state!=iiu_connected){ - return; + return totalBytes; } LOCK; @@ -1009,6 +1013,8 @@ LOCAL void tcp_recv_msg(struct ioc_in_use *piiu) assert (((unsigned long)status)<=writeSpace); CAC_RING_BUFFER_WRITE_ADVANCE(&piiu->recv, status); + + totalBytes += (unsigned long) status; /* * Record the time whenever we receive a message @@ -1018,7 +1024,7 @@ LOCAL void tcp_recv_msg(struct ioc_in_use *piiu) } UNLOCK; - return; + return totalBytes; } /* @@ -1092,89 +1098,86 @@ LOCAL void ca_process_tcp(struct ioc_in_use *piiu) * UDP_RECV_MSG() * */ -LOCAL void udp_recv_msg(struct ioc_in_use *piiu) +LOCAL unsigned long udp_recv_msg(struct ioc_in_use *piiu) { - int status; - int reply_size; - struct udpmsglog *pmsglog; - unsigned long bytesAvailable; - - if(piiu->state!=iiu_connected){ - return; - } - - LOCK; - - bytesAvailable = cacRingBufferWriteSize(&piiu->recv, TRUE); - assert(bytesAvailable >= ETHERNET_MAX_UDP+2*sizeof(*pmsglog)); - pmsglog = (struct udpmsglog *) &piiu->recv.buf[piiu->recv.wtix]; - - reply_size = sizeof(pmsglog->addr); - status = recvfrom( - piiu->sock_chan, - (char *)(pmsglog+1), - bytesAvailable-2*sizeof(*pmsglog), /* was MAX_UDP before 8-5-97 */ - 0, - (struct sockaddr *)&pmsglog->addr, - &reply_size); - if(status < 0){ - int errnoCpy = SOCKERRNO; - /* - * op would block which is ok to ignore till ready - * later - */ - if(errnoCpy == SOCK_EWOULDBLOCK || errnoCpy == SOCK_EINTR){ - UNLOCK; - return; - } -# ifdef linux - /* - * Avoid spurious ECONNREFUSED bug - * in linux - */ - if (errnoCpy==SOCK_ECONNREFUSED) { - UNLOCK; - return; - } -# endif - ca_printf("Unexpected UDP recv error %s\n", SOCKERRSTR(errnoCpy)); - } - else if(status > 0){ - unsigned long bytesActual; - - /* - * log the msg size - * and advance the ring index - */ - pmsglog->nbytes = status; - pmsglog->valid = TRUE; - bytesActual = status + sizeof(*pmsglog); - CAC_RING_BUFFER_WRITE_ADVANCE(&piiu->recv, bytesActual); - /* - * if there isnt enough room at the end advance - * to the beginning of the ring - */ - bytesAvailable = cacRingBufferWriteSize(&piiu->recv, TRUE); - if( bytesAvailable < ETHERNET_MAX_UDP+2*sizeof(*pmsglog) ){ - assert(bytesAvailable>=sizeof(*pmsglog)); - pmsglog = (struct udpmsglog *) - &piiu->recv.buf[piiu->recv.wtix]; - pmsglog->valid = FALSE; - pmsglog->nbytes = bytesAvailable - sizeof(*pmsglog); - CAC_RING_BUFFER_WRITE_ADVANCE( - &piiu->recv, bytesAvailable); - } + int status; + int reply_size; + struct udpmsglog *pmsglog; + unsigned long bytesAvailable; + unsigned long totalBytes = 0ul; + + if(piiu->state!=iiu_connected){ + return totalBytes; + } + + LOCK; + + bytesAvailable = cacRingBufferWriteSize(&piiu->recv, TRUE); + assert(bytesAvailable >= ETHERNET_MAX_UDP+2*sizeof(*pmsglog)); + pmsglog = (struct udpmsglog *) &piiu->recv.buf[piiu->recv.wtix]; + + reply_size = sizeof(pmsglog->addr); + status = recvfrom( + piiu->sock_chan, + (char *)(pmsglog+1), + bytesAvailable-2*sizeof(*pmsglog), /* was MAX_UDP before 8-5-97 */ + 0, + (struct sockaddr *)&pmsglog->addr, + &reply_size); + if(status < 0){ + int errnoCpy = SOCKERRNO; + /* + * op would block which is ok to ignore till ready + * later + */ + if(errnoCpy == SOCK_EWOULDBLOCK || errnoCpy == SOCK_EINTR){ + UNLOCK; + return totalBytes; + } +# ifdef linux + /* + * Avoid spurious ECONNREFUSED bug + * in linux + */ + if (errnoCpy==SOCK_ECONNREFUSED) { + UNLOCK; + return totalBytes; + } +# endif + ca_printf ("Unexpected UDP recv error %s\n", SOCKERRSTR(errnoCpy)); + } + else if(status > 0){ + + /* + * log the msg size + * and advance the ring index + */ + pmsglog->nbytes = status; + pmsglog->valid = TRUE; + totalBytes = sizeof(*pmsglog) + (unsigned long) status; + CAC_RING_BUFFER_WRITE_ADVANCE(&piiu->recv, totalBytes); + /* + * if there isnt enough room at the end advance + * to the beginning of the ring + */ + bytesAvailable = cacRingBufferWriteSize(&piiu->recv, TRUE); + if( bytesAvailable < ETHERNET_MAX_UDP+2*sizeof(*pmsglog) ){ + assert(bytesAvailable>=sizeof(*pmsglog)); + pmsglog = (struct udpmsglog *) + &piiu->recv.buf[piiu->recv.wtix]; + pmsglog->valid = FALSE; + pmsglog->nbytes = bytesAvailable - sizeof(*pmsglog); + CAC_RING_BUFFER_WRITE_ADVANCE( + &piiu->recv, bytesAvailable); + } # ifdef DEBUG - ca_printf( - "%s: udp reply of %d bytes\n", - __FILE__, - status); + ca_printf("%s: udp reply of %d bytes\n", __FILE__, status); # endif - } - - UNLOCK; - - return; + } + + UNLOCK; + + return totalBytes; } /* diff --git a/src/ca/iocinf.h b/src/ca/iocinf.h index bf3af36a7..6e280c256 100644 --- a/src/ca/iocinf.h +++ b/src/ca/iocinf.h @@ -33,6 +33,9 @@ /* * $Log$ + * Revision 1.77 1999/09/14 23:38:18 jhill + * added ca_vprintf() function + * * Revision 1.76 1999/09/02 21:44:49 jhill * improved the way that socket error numbers are converted to strings, * changed () to (void) in func proto, and fixed missing parameter to @@ -500,8 +503,8 @@ typedef struct ioc_in_use{ caHdr curMsg; struct CA_STATIC *pcas; void *pCurData; - void (*sendBytes)(struct ioc_in_use *); - void (*recvBytes)(struct ioc_in_use *); + unsigned long (*sendBytes)(struct ioc_in_use *); + unsigned long (*recvBytes)(struct ioc_in_use *); void (*procInput)(struct ioc_in_use *); SOCKET sock_chan; int sock_proto;