allow saturated clients to poll/use new consolodated IP address routines in libCom/clean up when a server and client delete the PV simultaneously

This commit is contained in:
Jeff Hill
1998-06-16 01:16:09 +00:00
parent 4183105e2e
commit e134af5b80

View File

@@ -47,6 +47,9 @@
/* address in use so that test works on UNIX */
/* kernels that support multicast */
/* $Log$
* Revision 1.77 1998/05/29 00:03:19 jhill
* allow CA to run systems w/o local interface query capabilities (ie cygwin32)
*
* Revision 1.76 1998/04/14 00:39:49 jhill
* cosmetic
*
@@ -125,7 +128,7 @@ static char *sccsId = "@(#) $Id$";
#define CA_GLBLSOURCE
#include "iocinf.h"
#include "net_convert.h"
#include "ipAddrToA.h"
#include "bsdSocketResource.h"
LOCAL void tcp_recv_msg(struct ioc_in_use *piiu);
LOCAL void cac_connect_iiu(struct ioc_in_use *piiu);
@@ -377,11 +380,9 @@ int net_proto
* Save the Host name for efficient access in the
* future.
*/
caHostFromInetAddr(
&pNode->destAddr.in.sin_addr,
piiu->host_name_str,
ipAddrToA (&pNode->destAddr.in, piiu->host_name_str,
sizeof(piiu->host_name_str));
/*
* TCP starts out in the connecting state and later transitions
* to the connected state
@@ -411,31 +412,31 @@ int net_proto
if(sock == INVALID_SOCKET){
free (piiu);
UNLOCK;
return ECA_SOCK;
return ECA_SOCK;
}
piiu->sock_chan = sock;
piiu->sock_chan = sock;
/*
* The following only needed on BSD 4.3 machines
*/
status = setsockopt(
sock,
SOL_SOCKET,
SO_BROADCAST,
(char *)&true,
sizeof(true));
if(status<0){
free(piiu);
ca_printf("CAC: sso (err=\"%s\")\n",
SOCKERRSTR);
status = socket_close(sock);
if(status < 0){
SEVCHK(ECA_INTERNAL,NULL);
}
UNLOCK;
return ECA_CONN;
}
status = setsockopt(
sock,
SOL_SOCKET,
SO_BROADCAST,
(char *)&true,
sizeof(true));
if(status<0){
free(piiu);
ca_printf("CAC: sso (err=\"%s\")\n",
SOCKERRSTR);
status = socket_close(sock);
if(status < 0){
SEVCHK(ECA_INTERNAL,NULL);
}
UNLOCK;
return ECA_CONN;
}
/*
* bump up the UDP recv buffer
@@ -462,21 +463,21 @@ int net_proto
}
}
#if 0
memset((char *)&saddr,0,sizeof(saddr));
saddr.sin_family = AF_INET;
memset((char *)&saddr,0,sizeof(saddr));
saddr.sin_family = AF_INET;
/*
* let slib pick lcl addr
*/
saddr.sin_addr.s_addr = htonl(INADDR_ANY);
saddr.sin_port = htons(0U);
saddr.sin_addr.s_addr = htonl(INADDR_ANY);
saddr.sin_port = htons(0U);
status = bind( sock,
(struct sockaddr *) &saddr,
sizeof(saddr));
if(status<0){
ca_printf("CAC: bind (err=%s)\n",SOCKERRSTR);
status = bind( sock,
(struct sockaddr *) &saddr,
sizeof(saddr));
if(status<0){
ca_printf("CAC: bind (err=%s)\n",SOCKERRSTR);
genLocalExcep (ECA_INTERNAL,"bind failed");
}
}
#endif
/*
@@ -506,11 +507,11 @@ int net_proto
cac_set_iiu_non_blocking (piiu);
break;
break;
default:
free(piiu);
genLocalExcep (ECA_INTERNAL,"create_net_chan: ukn protocol");
genLocalExcep (ECA_INTERNAL,"create_net_chan: ukn protocol");
/*
* turn off gcc warnings
*/
@@ -1053,9 +1054,8 @@ void ca_process_input_queue()
return;
}
for( piiu=(IIU *)iiuList.node.next;
piiu;
piiu=(IIU *)piiu->node.next){
for(piiu=(IIU *)iiuList.node.next;
piiu; piiu=(IIU *)piiu->node.next){
if(piiu->state!=iiu_connected){
continue;
@@ -1146,9 +1146,10 @@ LOCAL void tcp_recv_msg(struct ioc_in_use *piiu)
*/
LOCAL void ca_process_tcp(struct ioc_in_use *piiu)
{
caAddrNode *pNode;
int status;
long bytesToProcess;
caAddrNode *pNode;
int status;
long bytesToProcess;
unsigned countDown;
LOCK;
@@ -1164,7 +1165,20 @@ LOCAL void ca_process_tcp(struct ioc_in_use *piiu)
pNode = (caAddrNode *) piiu->destAddr.node.next;
while(TRUE){
/*
* Dont loop forever here if the server floods a slow
* client with monitors and there is a flush call in
* an event routine. This code was specifically added
* to allow a client to get out of an infinite loop
* occurring when the client does a put/flush in a
* monitor call back to the same PV that is being
* monitored
*
* it should be sufficent to read two sections from the
* ring buffer in case the populated space is split
*/
countDown = 3;
while (--countDown) {
bytesToProcess = cacRingBufferReadSize(&piiu->recv, TRUE);
if(bytesToProcess == 0){
break;
@@ -1173,7 +1187,7 @@ LOCAL void ca_process_tcp(struct ioc_in_use *piiu)
/* post message to the user */
status = post_msg(
piiu,
&pNode->destAddr.in.sin_addr,
&pNode->destAddr.in,
&piiu->recv.buf[piiu->recv.rdix],
bytesToProcess);
if(status != OK){
@@ -1326,7 +1340,7 @@ LOCAL void ca_process_udp(struct ioc_in_use *piiu)
status = post_msg(
piiu,
&pmsglog->addr.sin_addr,
&pmsglog->addr,
(char *)(pmsglog+1),
pmsglog->nbytes);
if(status != OK || piiu->curMsgBytes){
@@ -1406,22 +1420,10 @@ LOCAL void close_ioc (IIU *piiu)
assert (pNode);
removeBeaconInetAddr (&pNode->destAddr.in);
/*
* Mark all of their channels disconnected
* prior to calling handlers incase the
* handler tries to use a channel before
* I mark it disconnected.
*/
chix = (ciu) ellFirst(&piiu->chidlist);
while (chix) {
chix->state = cs_prev_conn;
chix = (ciu) ellNext(&chix->node);
}
chix = (ciu) ellFirst(&piiu->chidlist);
while (chix) {
pNext = (ciu) ellNext(&chix->node);
cacDisconnectChannel(chix, cs_conn);
cacDisconnectChannel (chix);
chix = pNext;
}
}
@@ -1459,10 +1461,21 @@ LOCAL void close_ioc (IIU *piiu)
/*
* cacDisconnectChannel()
*/
void cacDisconnectChannel(ciu chix, enum channel_state state)
void cacDisconnectChannel(ciu chix)
{
LOCK;
/*
* if a client initiated channel delete is pending then we will
* never get a delete confirm message from this server, and will therefore
* need to take care of freeing the remaing channel resources here
*/
if (chix->state == cs_closed) {
clearChannelResources (chix->cid);
UNLOCK;
return;
}
chix->privType = TYPENOTCONN;
chix->privCount = 0u;
chix->id.sid = ~0u;
@@ -1472,7 +1485,9 @@ void cacDisconnectChannel(ciu chix, enum channel_state state)
/*
* call their connection handler as required
*/
if (state==cs_conn) {
if (chix->state==cs_conn) {
miu monix, next;
chix->state = cs_prev_conn;
/*
@@ -1487,6 +1502,25 @@ void cacDisconnectChannel(ciu chix, enum channel_state state)
caIOBlockListFree (&pend_write_list, chix,
TRUE, ECA_DISCONN);
/*
* look for events that have an event cancel in progress
*/
for (monix = (miu) ellFirst (&chix->eventq);
monix; monix = next) {
next = (miu) ellNext (&monix->node);
/*
* if there is an event cancel in progress
* delete the event - we will never receive
* an event cancel confirm from this server
*/
if (monix->usr_func == NULL) {
ellDelete (&chix->eventq, &monix->node);
caIOBlockFree (monix);
}
}
if (chix->pConnFunc) {
struct connection_handler_args args;
@@ -1845,20 +1879,15 @@ void epicsShareAPI caAddConfiguredAddr(ELLLIST *pList, const ENV_PARAM *pEnv,
while( (pToken = getToken(&pStr, buf, sizeof(buf))) ){
status = aToIPAddr(pToken, port, &addr.in);
if (status<0) {
ca_printf(
"%s: Parsing '%s'\n",
__FILE__,
pEnv->name);
ca_printf(
"\tBad internet address format: '%s'\n",
pToken);
ca_printf("%s: Parsing '%s'\n", __FILE__, pEnv->name);
ca_printf("\tBad internet address or host name: '%s'\n", pToken);
continue;
}
pNode = (caAddrNode *) calloc(1,sizeof(*pNode));
if(pNode){
pNode->destAddr.in = addr.in;
pNode->srcAddr.in = localAddr.in;
ellAdd(pList, &pNode->node);
pNode = (caAddrNode *) calloc (1, sizeof(*pNode));
if (pNode) {
pNode->destAddr = addr;
pNode->srcAddr = localAddr;
ellAdd (pList, &pNode->node);
}
}
@@ -1872,13 +1901,13 @@ void epicsShareAPI caAddConfiguredAddr(ELLLIST *pList, const ENV_PARAM *pEnv,
*/
LOCAL char *getToken(const char **ppString, char *pBuf, unsigned bufSIze)
{
const char *pToken;
const char *pToken;
unsigned i;
pToken = *ppString;
while(isspace(*pToken)&&*pToken){
pToken++;
}
pToken = *ppString;
while(isspace(*pToken)&&*pToken){
pToken++;
}
for (i=0u; i<bufSIze; i++) {
if (isspace(pToken[i]) || pToken[i]=='\0') {
@@ -1886,16 +1915,16 @@ LOCAL char *getToken(const char **ppString, char *pBuf, unsigned bufSIze)
break;
}
pBuf[i] = pToken[i];
}
}
*ppString = &pToken[i];
*ppString = &pToken[i];
if(*pToken){
return pBuf;
}
else{
return NULL;
}
if(*pToken){
return pBuf;
}
else{
return NULL;
}
}
@@ -1966,23 +1995,28 @@ unsigned short epicsShareAPI caFetchPortConfig
*/
void cac_mux_io(struct timeval *ptimeout)
{
int count;
struct timeval timeout;
int count;
struct timeval timeout;
unsigned countDown;
cac_clean_iiu_list();
cac_clean_iiu_list();
/*
* manage search timers and detect disconnects
*/
manage_conn();
/*
* manage search timers and detect disconnects
*/
manage_conn();
/*
* first check for pending recv's with a zero time out so that
* 1) flow control works correctly (and)
* 2) we queue up sends resulting from recvs properly
* (this results in improved max throughput)
*
* ... but dont allow this to go on forever if a fast
* server is flooding a slow client with monitors ...
*/
while (TRUE) {
countDown = 512u;
while (--countDown) {
CLR_CA_TIME (&timeout);
/*
* NOTE cac_select_io() will set the
@@ -1990,19 +2024,25 @@ void cac_mux_io(struct timeval *ptimeout)
* of what is requested here if piiu->pushPending
* is set
*/
count = cac_select_io(&timeout, CA_DO_RECVS);
count = cac_select_io(&timeout, CA_DO_RECVS);
if (count<=0) {
break;
}
ca_process_input_queue();
}
}
/*
* next check for pending writes's with the specified time out
*
* ... but dont allow this to go on forever if a fast
* server is flooding a slow client with monitors ...
*/
timeout = *ptimeout;
while (TRUE) {
count = cac_select_io(&timeout, CA_DO_RECVS|CA_DO_SENDS);
if (count<=0) {
countDown = 512u;
timeout = *ptimeout;
while (TRUE) {
count = cac_select_io(&timeout, CA_DO_RECVS|CA_DO_SENDS);
countDown--;
if (count<=0 || countDown==0u) {
/*
* if its a flush then loop until all
* of the send buffers are empty
@@ -2017,6 +2057,7 @@ void cac_mux_io(struct timeval *ptimeout)
}
else {
if (caSendMsgPending()) {
countDown = 512u;
LD_CA_TIME (cac_fetch_poll_period(), &timeout);
}
else {
@@ -2033,7 +2074,7 @@ void cac_mux_io(struct timeval *ptimeout)
CLR_CA_TIME (&timeout);
}
ca_process_input_queue();
}
}
}