dont detect a flow control situation when select is telling us there is

something to read when there isnt anything there
This commit is contained in:
Jeff Hill
1999-10-14 23:25:10 +00:00
parent 0f23813e7c
commit 37b25dd8dc
3 changed files with 153 additions and 135 deletions

View File

@@ -160,13 +160,15 @@ int cac_select_io (struct timeval *ptimeout, int flags)
}
LOCK;
/*
* must run through the IIU list even if no IO is pending
* if any of the IOCs are in flow control (so that an exit
* flow control msg can be sent to each of them that are)
*/
if (status>0 || (ca_static->ca_number_iiu_in_fc>0u&&status>=0) ) {
for ( piiu = (IIU *) iiuList.node.next;
/*
* must run through the IIU list even if no IO is pending
* if any of the IOCs are in flow control (so that an exit
* flow control msg can be sent to each of them that are)
*/
if (status>0 || (ca_static->ca_number_iiu_in_fc>0u && status>=0)) {
status = 0;
for (piiu = (IIU *) iiuList.node.next;
piiu; piiu = (IIU *) piiu->node.next) {
if (piiu->state==iiu_disconnected) {
@@ -174,17 +176,22 @@ int cac_select_io (struct timeval *ptimeout, int flags)
}
if (FD_ISSET(piiu->sock_chan,&pfdi->readMask)) {
(*piiu->recvBytes)(piiu);
/*
* if we were not blocking and there is a
* message present then start to suspect that
* we are getting behind
*/
if (piiu->sock_proto==IPPROTO_TCP) {
if (ptimeout->tv_sec==0 && ptimeout->tv_usec==0) {
flow_control_on(piiu);
}
}
unsigned long bytesReceived;
bytesReceived = (*piiu->recvBytes)(piiu);
if (bytesReceived>0) {
status++;
/*
* if we are not blocking and there is a
* message present then start to suspect that
* we are getting behind
*/
if (piiu->sock_proto==IPPROTO_TCP) {
if (ptimeout->tv_sec==0 && ptimeout->tv_usec==0) {
flow_control_on(piiu);
}
}
}
}
else if (piiu->recvPending) {
/*
@@ -198,7 +205,12 @@ int cac_select_io (struct timeval *ptimeout, int flags)
}
if (FD_ISSET(piiu->sock_chan,&pfdi->writeMask)) {
(*piiu->sendBytes)(piiu);
unsigned long bytesSent;
bytesSent = (*piiu->sendBytes)(piiu);
if (bytesSent>0) {
status++;
}
}
}
}

View File

@@ -16,12 +16,12 @@ static char *sccsId = "@# $Id$";
#include "net_convert.h"
#include "bsdSocketResource.h"
LOCAL void tcp_recv_msg(struct ioc_in_use *piiu);
LOCAL void cac_connect_iiu(struct ioc_in_use *piiu);
LOCAL void cac_set_iiu_non_blocking (struct ioc_in_use *piiu);
LOCAL void cac_tcp_send_msg_piiu(struct ioc_in_use *piiu);
LOCAL void cac_udp_send_msg_piiu(struct ioc_in_use *piiu);
LOCAL void udp_recv_msg(struct ioc_in_use *piiu);
LOCAL unsigned long tcp_recv_msg(struct ioc_in_use *piiu);
LOCAL unsigned long cac_connect_iiu(struct ioc_in_use *piiu);
LOCAL unsigned long cac_tcp_send_msg_piiu(struct ioc_in_use *piiu);
LOCAL unsigned long cac_udp_send_msg_piiu(struct ioc_in_use *piiu);
LOCAL unsigned long udp_recv_msg(struct ioc_in_use *piiu);
LOCAL void ca_process_tcp(struct ioc_in_use *piiu);
LOCAL void ca_process_udp(struct ioc_in_use *piiu);
LOCAL void cacRingBufferInit(struct ca_buffer *pBuf,
@@ -461,19 +461,19 @@ LOCAL void cac_set_iiu_non_blocking (struct ioc_in_use *piiu)
/*
* cac_connect_iiu()
*/
LOCAL void cac_connect_iiu (struct ioc_in_use *piiu)
LOCAL unsigned long cac_connect_iiu (struct ioc_in_use *piiu)
{
caAddrNode *pNode;
int status;
if (piiu->state==iiu_connected) {
ca_printf("CAC: redundant connect() attempt?\n");
return;
return 0ul;
}
if (piiu->state==iiu_disconnected) {
ca_printf("CAC: connecting when disconnected?\n");
return;
return 0ul;
}
LOCK;
@@ -514,11 +514,11 @@ LOCAL void cac_connect_iiu (struct ioc_in_use *piiu)
* but not completed.
*/
UNLOCK;
return;
return 0ul;
}
else if (errnoCpy==SOCK_EALREADY) {
UNLOCK;
return;
return 0ul;
}
#ifdef _WIN32
/*
@@ -527,7 +527,7 @@ LOCAL void cac_connect_iiu (struct ioc_in_use *piiu)
*/
else if (errnoCpy==SOCK_EINVAL) { /* a SOCK_EALREADY alias used by early WINSOCK */
UNLOCK;
return;
return 0ul;
}
#endif
else if(errnoCpy==SOCK_EINTR) {
@@ -544,7 +544,7 @@ LOCAL void cac_connect_iiu (struct ioc_in_use *piiu)
ntohs(pNode->destAddr.in.sin_port),
piiu->host_name_str, errnoCpy,
SOCKERRSTR(errnoCpy));
return;
return 0ul;
}
}
@@ -566,6 +566,8 @@ LOCAL void cac_connect_iiu (struct ioc_in_use *piiu)
retryPendingClaims (piiu);
}
UNLOCK;
return 1ul;
}
/*
@@ -754,19 +756,19 @@ void notify_ca_repeater()
/*
* CAC_UDP_SEND_MSG_PIIU()
*
*/
LOCAL void cac_udp_send_msg_piiu(struct ioc_in_use *piiu)
LOCAL unsigned long cac_udp_send_msg_piiu(struct ioc_in_use *piiu)
{
caAddrNode *pNode;
unsigned long sendCnt;
unsigned long totalBytes = 0ul;
int status;
/*
* check for shutdown in progress
*/
if(piiu->state!=iiu_connected){
return;
return 0ul;
}
LOCK;
@@ -780,12 +782,12 @@ LOCAL void cac_udp_send_msg_piiu(struct ioc_in_use *piiu)
*/
if(sendCnt == 0){
UNLOCK;
return;
return 0ul;
}
pNode = (caAddrNode *) piiu->destAddr.node.next;
while (pNode) {
unsigned long actualSendCnt;
unsigned long actualSendCnt;
status = sendto(
piiu->sock_chan,
@@ -797,6 +799,7 @@ LOCAL void cac_udp_send_msg_piiu(struct ioc_in_use *piiu)
if(status>=0){
actualSendCnt = (unsigned long) status;
assert (actualSendCnt == sendCnt);
totalBytes += actualSendCnt;
}
else {
int localErrno;
@@ -827,24 +830,24 @@ LOCAL void cac_udp_send_msg_piiu(struct ioc_in_use *piiu)
min(MAX_UDP, sizeof(piiu->send.buf)));
piiu->pushPending = FALSE;
UNLOCK;
return;
return totalBytes;
}
/*
* CAC_TCP_SEND_MSG_PIIU()
*
*/
LOCAL void cac_tcp_send_msg_piiu(struct ioc_in_use *piiu)
LOCAL unsigned long cac_tcp_send_msg_piiu(struct ioc_in_use *piiu)
{
unsigned long sendCnt;
int status;
int localError;
unsigned long sendCnt;
unsigned long totalBytes = 0ul;
int status;
int localError;
/*
* check for shutdown in progress
*/
if(piiu->state!=iiu_connected){
return;
return 0ul;
}
LOCK;
@@ -875,7 +878,7 @@ LOCAL void cac_tcp_send_msg_piiu(struct ioc_in_use *piiu)
if (piiu->claimsPending) {
retryPendingClaims(piiu);
}
return;
return totalBytes;
}
assert (sendCnt<=INT_MAX);
@@ -890,13 +893,13 @@ LOCAL void cac_tcp_send_msg_piiu(struct ioc_in_use *piiu)
}
CAC_RING_BUFFER_READ_ADVANCE(&piiu->send, status);
totalBytes += (unsigned long) status;
}
if (status==0) {
TAG_CONN_DOWN(piiu);
UNLOCK;
return;
return totalBytes;
}
localError = SOCKERRNO;
@@ -904,7 +907,7 @@ LOCAL void cac_tcp_send_msg_piiu(struct ioc_in_use *piiu)
if( localError == SOCK_EWOULDBLOCK ||
localError == SOCK_EINTR){
UNLOCK;
return;
return totalBytes;
}
if( localError != SOCK_EPIPE &&
@@ -917,7 +920,7 @@ LOCAL void cac_tcp_send_msg_piiu(struct ioc_in_use *piiu)
TAG_CONN_DOWN(piiu);
UNLOCK;
return;
return totalBytes;
}
/*
@@ -954,13 +957,14 @@ void ca_process_input_queue()
* TCP_RECV_MSG()
*
*/
LOCAL void tcp_recv_msg(struct ioc_in_use *piiu)
LOCAL unsigned long tcp_recv_msg(struct ioc_in_use *piiu)
{
unsigned long writeSpace;
int status;
unsigned long writeSpace;
unsigned long totalBytes = 0;
int status;
if(piiu->state!=iiu_connected){
return;
return totalBytes;
}
LOCK;
@@ -1009,6 +1013,8 @@ LOCAL void tcp_recv_msg(struct ioc_in_use *piiu)
assert (((unsigned long)status)<=writeSpace);
CAC_RING_BUFFER_WRITE_ADVANCE(&piiu->recv, status);
totalBytes += (unsigned long) status;
/*
* Record the time whenever we receive a message
@@ -1018,7 +1024,7 @@ LOCAL void tcp_recv_msg(struct ioc_in_use *piiu)
}
UNLOCK;
return;
return totalBytes;
}
/*
@@ -1092,89 +1098,86 @@ LOCAL void ca_process_tcp(struct ioc_in_use *piiu)
* UDP_RECV_MSG()
*
*/
LOCAL void udp_recv_msg(struct ioc_in_use *piiu)
LOCAL unsigned long udp_recv_msg(struct ioc_in_use *piiu)
{
int status;
int reply_size;
struct udpmsglog *pmsglog;
unsigned long bytesAvailable;
if(piiu->state!=iiu_connected){
return;
}
LOCK;
bytesAvailable = cacRingBufferWriteSize(&piiu->recv, TRUE);
assert(bytesAvailable >= ETHERNET_MAX_UDP+2*sizeof(*pmsglog));
pmsglog = (struct udpmsglog *) &piiu->recv.buf[piiu->recv.wtix];
reply_size = sizeof(pmsglog->addr);
status = recvfrom(
piiu->sock_chan,
(char *)(pmsglog+1),
bytesAvailable-2*sizeof(*pmsglog), /* was MAX_UDP before 8-5-97 */
0,
(struct sockaddr *)&pmsglog->addr,
&reply_size);
if(status < 0){
int errnoCpy = SOCKERRNO;
/*
* op would block which is ok to ignore till ready
* later
*/
if(errnoCpy == SOCK_EWOULDBLOCK || errnoCpy == SOCK_EINTR){
UNLOCK;
return;
}
# ifdef linux
/*
* Avoid spurious ECONNREFUSED bug
* in linux
*/
if (errnoCpy==SOCK_ECONNREFUSED) {
UNLOCK;
return;
}
# endif
ca_printf("Unexpected UDP recv error %s\n", SOCKERRSTR(errnoCpy));
}
else if(status > 0){
unsigned long bytesActual;
/*
* log the msg size
* and advance the ring index
*/
pmsglog->nbytes = status;
pmsglog->valid = TRUE;
bytesActual = status + sizeof(*pmsglog);
CAC_RING_BUFFER_WRITE_ADVANCE(&piiu->recv, bytesActual);
/*
* if there isnt enough room at the end advance
* to the beginning of the ring
*/
bytesAvailable = cacRingBufferWriteSize(&piiu->recv, TRUE);
if( bytesAvailable < ETHERNET_MAX_UDP+2*sizeof(*pmsglog) ){
assert(bytesAvailable>=sizeof(*pmsglog));
pmsglog = (struct udpmsglog *)
&piiu->recv.buf[piiu->recv.wtix];
pmsglog->valid = FALSE;
pmsglog->nbytes = bytesAvailable - sizeof(*pmsglog);
CAC_RING_BUFFER_WRITE_ADVANCE(
&piiu->recv, bytesAvailable);
}
int status;
int reply_size;
struct udpmsglog *pmsglog;
unsigned long bytesAvailable;
unsigned long totalBytes = 0ul;
if(piiu->state!=iiu_connected){
return totalBytes;
}
LOCK;
bytesAvailable = cacRingBufferWriteSize(&piiu->recv, TRUE);
assert(bytesAvailable >= ETHERNET_MAX_UDP+2*sizeof(*pmsglog));
pmsglog = (struct udpmsglog *) &piiu->recv.buf[piiu->recv.wtix];
reply_size = sizeof(pmsglog->addr);
status = recvfrom(
piiu->sock_chan,
(char *)(pmsglog+1),
bytesAvailable-2*sizeof(*pmsglog), /* was MAX_UDP before 8-5-97 */
0,
(struct sockaddr *)&pmsglog->addr,
&reply_size);
if(status < 0){
int errnoCpy = SOCKERRNO;
/*
* op would block which is ok to ignore till ready
* later
*/
if(errnoCpy == SOCK_EWOULDBLOCK || errnoCpy == SOCK_EINTR){
UNLOCK;
return totalBytes;
}
# ifdef linux
/*
* Avoid spurious ECONNREFUSED bug
* in linux
*/
if (errnoCpy==SOCK_ECONNREFUSED) {
UNLOCK;
return totalBytes;
}
# endif
ca_printf ("Unexpected UDP recv error %s\n", SOCKERRSTR(errnoCpy));
}
else if(status > 0){
/*
* log the msg size
* and advance the ring index
*/
pmsglog->nbytes = status;
pmsglog->valid = TRUE;
totalBytes = sizeof(*pmsglog) + (unsigned long) status;
CAC_RING_BUFFER_WRITE_ADVANCE(&piiu->recv, totalBytes);
/*
* if there isnt enough room at the end advance
* to the beginning of the ring
*/
bytesAvailable = cacRingBufferWriteSize(&piiu->recv, TRUE);
if( bytesAvailable < ETHERNET_MAX_UDP+2*sizeof(*pmsglog) ){
assert(bytesAvailable>=sizeof(*pmsglog));
pmsglog = (struct udpmsglog *)
&piiu->recv.buf[piiu->recv.wtix];
pmsglog->valid = FALSE;
pmsglog->nbytes = bytesAvailable - sizeof(*pmsglog);
CAC_RING_BUFFER_WRITE_ADVANCE(
&piiu->recv, bytesAvailable);
}
# ifdef DEBUG
ca_printf(
"%s: udp reply of %d bytes\n",
__FILE__,
status);
ca_printf("%s: udp reply of %d bytes\n", __FILE__, status);
# endif
}
UNLOCK;
return;
}
UNLOCK;
return totalBytes;
}
/*

View File

@@ -33,6 +33,9 @@
/*
* $Log$
* Revision 1.77 1999/09/14 23:38:18 jhill
* added ca_vprintf() function
*
* Revision 1.76 1999/09/02 21:44:49 jhill
* improved the way that socket error numbers are converted to strings,
* changed () to (void) in func proto, and fixed missing parameter to
@@ -500,8 +503,8 @@ typedef struct ioc_in_use{
caHdr curMsg;
struct CA_STATIC *pcas;
void *pCurData;
void (*sendBytes)(struct ioc_in_use *);
void (*recvBytes)(struct ioc_in_use *);
unsigned long (*sendBytes)(struct ioc_in_use *);
unsigned long (*recvBytes)(struct ioc_in_use *);
void (*procInput)(struct ioc_in_use *);
SOCKET sock_chan;
int sock_proto;