fetch time in only one place
This commit is contained in:
+169
-144
@@ -270,57 +270,53 @@ int net_proto
|
||||
#endif
|
||||
|
||||
#ifdef CA_SET_TCP_BUFFER_SIZES
|
||||
/* set TCP buffer sizes */
|
||||
i = MAX_MSG_SIZE;
|
||||
status = setsockopt(
|
||||
sock,
|
||||
SOL_SOCKET,
|
||||
SO_SNDBUF,
|
||||
&i,
|
||||
sizeof(i));
|
||||
if(status < 0){
|
||||
free(piiu);
|
||||
status = socket_close(sock);
|
||||
if(status<0){
|
||||
SEVCHK(ECA_INTERNAL,NULL);
|
||||
}
|
||||
UNLOCK;
|
||||
return ECA_SOCK;
|
||||
}
|
||||
i = MAX_MSG_SIZE;
|
||||
status = setsockopt(
|
||||
sock,
|
||||
SOL_SOCKET,
|
||||
SO_RCVBUF,
|
||||
&i,
|
||||
sizeof(i));
|
||||
if(status < 0){
|
||||
free(piiu);
|
||||
status = socket_close(sock);
|
||||
if(status<0){
|
||||
SEVCHK(ECA_INTERNAL,NULL);
|
||||
}
|
||||
UNLOCK;
|
||||
return ECA_SOCK;
|
||||
}
|
||||
{
|
||||
int i;
|
||||
int size;
|
||||
|
||||
/* fetch the TCP send buffer size */
|
||||
i = sizeof(piiu->tcp_send_buff_size);
|
||||
status = getsockopt(
|
||||
sock,
|
||||
SOL_SOCKET,
|
||||
SO_SNDBUF,
|
||||
(char *)&piiu->tcp_send_buff_size,
|
||||
&i);
|
||||
if(status < 0 || i != sizeof(piiu->tcp_send_buff_size)){
|
||||
free(piiu);
|
||||
status = socket_close(sock);
|
||||
if(status<0){
|
||||
SEVCHK(ECA_INTERNAL,NULL);
|
||||
/* set TCP buffer sizes */
|
||||
i = MAX_MSG_SIZE;
|
||||
status = setsockopt(
|
||||
sock,
|
||||
SOL_SOCKET,
|
||||
SO_SNDBUF,
|
||||
&i,
|
||||
sizeof(i));
|
||||
if(status < 0){
|
||||
free(piiu);
|
||||
socket_close(sock);
|
||||
UNLOCK;
|
||||
return ECA_SOCK;
|
||||
}
|
||||
UNLOCK;
|
||||
return ECA_SOCK;
|
||||
}
|
||||
i = MAX_MSG_SIZE;
|
||||
status = setsockopt(
|
||||
sock,
|
||||
SOL_SOCKET,
|
||||
SO_RCVBUF,
|
||||
&i,
|
||||
sizeof(i));
|
||||
if(status < 0){
|
||||
free(piiu);
|
||||
socket_close(sock);
|
||||
UNLOCK;
|
||||
return ECA_SOCK;
|
||||
}
|
||||
|
||||
/* fetch the TCP send buffer size */
|
||||
i = sizeof(size);
|
||||
status = getsockopt(
|
||||
sock,
|
||||
SOL_SOCKET,
|
||||
SO_SNDBUF,
|
||||
(char *)&size,
|
||||
&i);
|
||||
if(status < 0 || i != sizeof(size)){
|
||||
free(piiu);
|
||||
socket_close(sock);
|
||||
UNLOCK;
|
||||
return ECA_SOCK;
|
||||
}
|
||||
}
|
||||
#endif
|
||||
|
||||
/* connect */
|
||||
@@ -342,20 +338,6 @@ int net_proto
|
||||
cacRingBufferInit(&piiu->recv, sizeof(piiu->send.buf));
|
||||
cacRingBufferInit(&piiu->send, sizeof(piiu->send.buf));
|
||||
|
||||
/*
|
||||
* Set non blocking IO
|
||||
* to prevent dead locks
|
||||
*/
|
||||
status = socket_ioctl(
|
||||
piiu->sock_chan,
|
||||
FIONBIO,
|
||||
&true);
|
||||
if(status<0){
|
||||
ca_printf(
|
||||
"Error setting non-blocking io: %s\n",
|
||||
strerror(MYERRNO));
|
||||
}
|
||||
|
||||
/*
|
||||
* Save the Host name for efficient access in the
|
||||
* future.
|
||||
@@ -435,6 +417,7 @@ int net_proto
|
||||
sock,
|
||||
ca_static->ca_server_port);
|
||||
|
||||
|
||||
cacRingBufferInit(&piiu->recv, sizeof(piiu->send.buf));
|
||||
cacRingBufferInit(&piiu->send, min(MAX_UDP,
|
||||
sizeof(piiu->send.buf)));
|
||||
@@ -455,6 +438,20 @@ int net_proto
|
||||
return ECA_INTERNAL;
|
||||
}
|
||||
|
||||
/*
|
||||
* Set non blocking IO
|
||||
* to prevent dead locks
|
||||
*/
|
||||
status = socket_ioctl(
|
||||
piiu->sock_chan,
|
||||
FIONBIO,
|
||||
&true);
|
||||
if(status<0){
|
||||
ca_printf(
|
||||
"Error setting non-blocking io: %s\n",
|
||||
strerror(MYERRNO));
|
||||
}
|
||||
|
||||
if(fd_register_func){
|
||||
LOCKEVENTS;
|
||||
(*fd_register_func)(fd_register_arg, sock, TRUE);
|
||||
@@ -654,9 +651,13 @@ LOCAL void cac_udp_send_msg_piiu(struct ioc_in_use *piiu)
|
||||
|
||||
localErrno = MYERRNO;
|
||||
|
||||
if( localErrno != EWOULDBLOCK &&
|
||||
localErrno != ENOBUFS &&
|
||||
localErrno != EINTR){
|
||||
if( localErrno == EWOULDBLOCK &&
|
||||
localErrno == ENOBUFS &&
|
||||
localErrno == EINTR){
|
||||
UNLOCK;
|
||||
return;
|
||||
}
|
||||
else {
|
||||
ca_printf(
|
||||
"CAC: error on socket send() %s\n",
|
||||
strerror(localErrno));
|
||||
@@ -704,34 +705,52 @@ LOCAL void cac_tcp_send_msg_piiu(struct ioc_in_use *piiu)
|
||||
|
||||
LOCK;
|
||||
|
||||
sendCnt = cacRingBufferReadSize(&piiu->send, TRUE);
|
||||
|
||||
assert(sendCnt<=piiu->send.max_msg);
|
||||
|
||||
/*
|
||||
* return if nothing to send
|
||||
* Check at least twice to see if there is anything
|
||||
* in the ring buffer (in case the block of messages
|
||||
* isnt continuous). Always return if the send was
|
||||
* less bytes than requested.
|
||||
*/
|
||||
if(sendCnt == 0){
|
||||
UNLOCK;
|
||||
return;
|
||||
}
|
||||
while (TRUE) {
|
||||
sendCnt = cacRingBufferReadSize(&piiu->send, TRUE);
|
||||
assert(sendCnt<=piiu->send.max_msg);
|
||||
|
||||
status = send(
|
||||
piiu->sock_chan,
|
||||
&piiu->send.buf[piiu->send.rdix],
|
||||
sendCnt,
|
||||
0);
|
||||
if(status>=0){
|
||||
piiu->sendPending = FALSE;
|
||||
CAC_RING_BUFFER_READ_ADVANCE(&piiu->send, status);
|
||||
|
||||
sendCnt = cacRingBufferReadSize(&piiu->send, FALSE);
|
||||
if(sendCnt==0){
|
||||
/*
|
||||
* return if nothing to send
|
||||
*/
|
||||
if(sendCnt == 0){
|
||||
# ifdef DEBUG
|
||||
if (piiu->sendPending) {
|
||||
printf ("-Unblocked-\n");
|
||||
}
|
||||
# endif /* DEBUG */
|
||||
piiu->sendPending = FALSE;
|
||||
piiu->send_needed = FALSE;
|
||||
UNLOCK;
|
||||
return;
|
||||
}
|
||||
|
||||
UNLOCK;
|
||||
return;
|
||||
status = send(
|
||||
piiu->sock_chan,
|
||||
&piiu->send.buf[piiu->send.rdix],
|
||||
sendCnt,
|
||||
0);
|
||||
if (status<0) {
|
||||
break;
|
||||
}
|
||||
else if (status==0) {
|
||||
TAG_CONN_DOWN(piiu);
|
||||
UNLOCK;
|
||||
return;
|
||||
}
|
||||
|
||||
CAC_RING_BUFFER_READ_ADVANCE(&piiu->send, status);
|
||||
|
||||
if (status != sendCnt) {
|
||||
UNLOCK;
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
localError = MYERRNO;
|
||||
@@ -740,10 +759,6 @@ LOCAL void cac_tcp_send_msg_piiu(struct ioc_in_use *piiu)
|
||||
localError == ENOBUFS ||
|
||||
localError == EINTR){
|
||||
UNLOCK;
|
||||
if(!piiu->sendPending){
|
||||
cac_gettimeval(&piiu->timeAtSendBlock);
|
||||
piiu->sendPending = TRUE;
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
@@ -788,7 +803,6 @@ void cac_flush_internal()
|
||||
UNLOCK;
|
||||
}
|
||||
|
||||
|
||||
|
||||
/*
|
||||
* cac_clean_iiu_list()
|
||||
@@ -823,14 +837,16 @@ void ca_process_input_queue()
|
||||
{
|
||||
struct ioc_in_use *piiu;
|
||||
|
||||
LOCK;
|
||||
|
||||
/*
|
||||
* dont allow recursion
|
||||
* dont allow recursion
|
||||
*/
|
||||
if(post_msg_active){
|
||||
UNLOCK;
|
||||
return;
|
||||
}
|
||||
|
||||
LOCK;
|
||||
for( piiu=(IIU *)iiuList.node.next;
|
||||
piiu;
|
||||
piiu=(IIU *)piiu->node.next){
|
||||
@@ -841,9 +857,12 @@ void ca_process_input_queue()
|
||||
|
||||
(*piiu->procInput)(piiu);
|
||||
}
|
||||
|
||||
UNLOCK;
|
||||
|
||||
#if 0
|
||||
cac_flush_internal();
|
||||
#endif
|
||||
}
|
||||
|
||||
|
||||
@@ -863,55 +882,59 @@ LOCAL void tcp_recv_msg(struct ioc_in_use *piiu)
|
||||
|
||||
LOCK;
|
||||
|
||||
writeSpace = cacRingBufferWriteSize(&piiu->recv, TRUE);
|
||||
if(writeSpace == 0){
|
||||
UNLOCK;
|
||||
return;
|
||||
}
|
||||
/*
|
||||
* Check at least twice to see if there is ana space left
|
||||
* in the ring buffer (in case the messages block
|
||||
* isnt continuous). Always return if the send was
|
||||
* less bytes than requested.
|
||||
*/
|
||||
while (TRUE) {
|
||||
|
||||
status = recv( piiu->sock_chan,
|
||||
&piiu->recv.buf[piiu->recv.wtix],
|
||||
writeSpace,
|
||||
0);
|
||||
if(status == 0){
|
||||
TAG_CONN_DOWN(piiu);
|
||||
UNLOCK;
|
||||
return;
|
||||
}
|
||||
else if(status <0){
|
||||
/* try again on status of -1 and no luck this time */
|
||||
if(MYERRNO == EWOULDBLOCK || MYERRNO == EINTR){
|
||||
UNLOCK;
|
||||
return;
|
||||
writeSpace = cacRingBufferWriteSize(&piiu->recv, TRUE);
|
||||
if(writeSpace == 0){
|
||||
break;
|
||||
}
|
||||
|
||||
if( MYERRNO != EPIPE &&
|
||||
MYERRNO != ECONNRESET &&
|
||||
MYERRNO != ETIMEDOUT){
|
||||
ca_printf(
|
||||
"CAC: unexpected recv error (err=%s)\n",
|
||||
strerror(MYERRNO));
|
||||
status = recv( piiu->sock_chan,
|
||||
&piiu->recv.buf[piiu->recv.wtix],
|
||||
writeSpace,
|
||||
0);
|
||||
if(status == 0){
|
||||
TAG_CONN_DOWN(piiu);
|
||||
break;
|
||||
}
|
||||
TAG_CONN_DOWN(piiu);
|
||||
UNLOCK;
|
||||
return;
|
||||
}
|
||||
else if(status <0){
|
||||
/* try again on status of -1 and no luck this time */
|
||||
if(MYERRNO == EWOULDBLOCK || MYERRNO == EINTR){
|
||||
break;
|
||||
}
|
||||
|
||||
if(status>MAX_MSG_SIZE){
|
||||
ca_printf( "CAC: recv_msg(): message overflow %l\n",
|
||||
status-MAX_MSG_SIZE);
|
||||
TAG_CONN_DOWN(piiu);
|
||||
UNLOCK;
|
||||
return;
|
||||
}
|
||||
if( MYERRNO != EPIPE &&
|
||||
MYERRNO != ECONNRESET &&
|
||||
MYERRNO != ETIMEDOUT){
|
||||
ca_printf(
|
||||
"CAC: unexpected recv error (err=%s)\n",
|
||||
strerror(MYERRNO));
|
||||
}
|
||||
TAG_CONN_DOWN(piiu);
|
||||
break;
|
||||
}
|
||||
|
||||
CAC_RING_BUFFER_WRITE_ADVANCE(&piiu->recv, status);
|
||||
assert (status<=writeSpace);
|
||||
|
||||
CAC_RING_BUFFER_WRITE_ADVANCE(&piiu->recv, status);
|
||||
|
||||
/*
|
||||
* Record the time whenever we receive a message
|
||||
* from this IOC
|
||||
*/
|
||||
piiu->timeAtLastRecv = ca_static->currentTime;
|
||||
|
||||
if (status != writeSpace) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* Record the time whenever we receive a message
|
||||
* from this IOC
|
||||
*/
|
||||
cac_gettimeval(&piiu->timeAtLastRecv);
|
||||
|
||||
UNLOCK;
|
||||
return;
|
||||
@@ -928,18 +951,20 @@ LOCAL void ca_process_tcp(struct ioc_in_use *piiu)
|
||||
int status;
|
||||
long bytesToProcess;
|
||||
|
||||
LOCK;
|
||||
|
||||
/*
|
||||
* dont allow recursion
|
||||
*/
|
||||
if(post_msg_active){
|
||||
UNLOCK;
|
||||
return;
|
||||
}
|
||||
|
||||
pNode = (caAddrNode *) piiu->destAddr.node.next;
|
||||
|
||||
post_msg_active = TRUE;
|
||||
|
||||
LOCK;
|
||||
pNode = (caAddrNode *) piiu->destAddr.node.next;
|
||||
|
||||
while(TRUE){
|
||||
bytesToProcess = cacRingBufferReadSize(&piiu->recv, TRUE);
|
||||
if(bytesToProcess == 0){
|
||||
@@ -962,9 +987,9 @@ LOCAL void ca_process_tcp(struct ioc_in_use *piiu)
|
||||
&piiu->recv,
|
||||
bytesToProcess);
|
||||
}
|
||||
UNLOCK;
|
||||
|
||||
post_msg_active = FALSE;
|
||||
UNLOCK;
|
||||
|
||||
flow_control(piiu);
|
||||
|
||||
@@ -1065,17 +1090,18 @@ LOCAL void ca_process_udp(struct ioc_in_use *piiu)
|
||||
char *pBuf;
|
||||
unsigned long bytesAvailable;
|
||||
|
||||
LOCK;
|
||||
|
||||
/*
|
||||
* dont allow recursion
|
||||
*/
|
||||
if(post_msg_active){
|
||||
UNLOCK;
|
||||
return;
|
||||
}
|
||||
|
||||
|
||||
post_msg_active = TRUE;
|
||||
|
||||
LOCK;
|
||||
while(TRUE){
|
||||
|
||||
bytesAvailable = cacRingBufferReadSize(&piiu->recv, TRUE);
|
||||
@@ -1123,9 +1149,8 @@ LOCAL void ca_process_udp(struct ioc_in_use *piiu)
|
||||
bytesAvailable);
|
||||
}
|
||||
|
||||
UNLOCK;
|
||||
|
||||
post_msg_active = FALSE;
|
||||
UNLOCK;
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user