diff --git a/src/ca/access.c b/src/ca/access.c index d0de2fea3..eab584c79 100644 --- a/src/ca/access.c +++ b/src/ca/access.c @@ -348,6 +348,10 @@ int ca_task_initialize ca_static->ca_exception_func = ca_default_exception_handler; ca_static->ca_exception_arg = NULL; + ca_static->ca_pBucket = bucketCreate(CLIENT_ID_WIDTH); + if(!ca_static->ca_pBucket) + abort(); + status = broadcast_addr(&ca_static->ca_castaddr); if(status == OK){ ca_static->ca_cast_available = TRUE; @@ -954,14 +958,13 @@ int ca_build_and_connect if (!chix) abort(); - chix->paddr = (void *) (strcnt + (char *) (chix + 1)); - *(struct db_addr *) chix->paddr = tmp_paddr; + chix->id.paddr = (struct db_addr *) + (strcnt + (char *) (chix + 1)); + *chix->id.paddr = tmp_paddr; chix->puser = puser; chix->connection_func = conn_func; - chix->type = ((struct db_addr *) - chix->paddr)->field_type; - chix->count = ((struct db_addr *) - chix->paddr)->no_elements; + chix->type = chix->id.paddr->field_type; + chix->count = chix->id.paddr->no_elements; chix->iocix = LOCAL_IIU; chix->state = cs_conn; ellInit(&chix->eventq); @@ -1002,6 +1005,7 @@ int ca_build_and_connect status = ECA_NOCAST; } else{ + /* allocate CHIU (channel in use) block */ /* also allocate enough for the channel name */ *chixptr = chix = (chid) malloc(sizeof(*chix) + strcnt); @@ -1019,11 +1023,21 @@ int ca_build_and_connect free((char *) chix); goto exit; } + + chix->cid = CLIENT_ID_ALLOC; + status = bucketAddItem(pBucket, chix->cid, chix); + if(status != BUCKET_SUCCESS){ + *chixptr = (chid) NULL; + free((char *) chix); + status = ECA_ALLOCMEM; + goto exit; + } + chix->puser = puser; chix->connection_func = conn_func; - chix->type = TYPENOTCONN; /* invalid initial type */ - chix->count = 0;/* invalid initial count */ - chix->paddr = (void *) NULL; /* invalid initial paddr */ + chix->type = TYPENOTCONN; /* invalid initial type */ + chix->count = 0; /* invalid initial count */ + chix->id.sid = ~0L; /* invalid initial server id */ /* save stuff for build retry if required */ chix->build_type = get_type; @@ -1051,6 +1065,8 @@ int ca_build_and_connect if (VALID_BUILD(chix)) SETPENDRECV; } + + status = ECA_NORMAL; exit: UNLOCK; @@ -1094,10 +1110,10 @@ void build_msg(chix, reply_type) } mptr->m_cmmd = htons(cmd); - mptr->m_available = (int) chix; + mptr->m_available = chix->cid; mptr->m_type = reply_type; mptr->m_count = 0; - mptr->m_pciu = (void *) chix; + mptr->m_cid = chix->cid; if (cmd == IOC_BUILD) { /* msg header only on db read req */ @@ -1107,7 +1123,7 @@ void build_msg(chix, reply_type) mptr->m_type = htons(chix->build_type); mptr->m_count = htons(chix->build_count); mptr->m_available = (int) chix->build_value; - mptr->m_pciu = 0; + mptr->m_cid = ~0L; } /* * channel name string - forces a NULL at the end because @@ -1162,7 +1178,7 @@ void *pvalue if (chix->iocix == LOCAL_IIU) { status = db_get_field( - chix->paddr, + chix->id.paddr, type, pvalue, count, @@ -1190,7 +1206,7 @@ void *pvalue mptr->m_type = htons(type); mptr->m_available = (long) pvalue; mptr->m_count = htons(count); - mptr->m_pciu = chix->paddr; + mptr->m_cid = chix->id.sid; CAC_ADD_MSG(piiu); @@ -1254,7 +1270,7 @@ ca_array_get_callback ev.chan = chix; ev.type = type; ev.count = count; - ca_event_handler(&ev, chix->paddr, NULL, NULL); + ca_event_handler(&ev, chix->id.paddr, NULL, NULL); return ECA_NORMAL; } #endif @@ -1331,7 +1347,7 @@ issue_get_callback(monix) mptr->m_type = htons(monix->type); mptr->m_available = (long) monix; mptr->m_count = htons(count); - mptr->m_pciu = chix->paddr; + mptr->m_cid = chix->id.sid; CAC_ADD_MSG(piiu); @@ -1395,7 +1411,7 @@ void *pvalue; int status; if(chix->iocix == LOCAL_IIU){ - status = db_put_field( chix->paddr, + status = db_put_field( chix->id.paddr, type, pvalue, count); @@ -1494,8 +1510,8 @@ void *pvalue; mptr->m_cmmd = htons(IOC_WRITE); mptr->m_type = htons(type); mptr->m_count = htons(count); - mptr->m_pciu = chix->paddr; - mptr->m_available = (long) chix; + mptr->m_cid = chix->id.sid; + mptr->m_available = ~0L; CAC_ADD_MSG(&iiu[chix->iocix]); UNLOCK; @@ -1721,7 +1737,7 @@ long mask; if(chix->iocix == LOCAL_IIU){ status = db_add_event( evuser, - chix->paddr, + chix->id.paddr, ca_event_handler, monix, mask, @@ -1819,7 +1835,7 @@ ca_request_event(monix) mptr->m_header.m_available = (long) monix; mptr->m_header.m_type = htons(monix->type); mptr->m_header.m_count = htons(count); - mptr->m_header.m_pciu = chix->paddr; + mptr->m_header.m_cid = chix->id.sid; /* msg body */ htonf(&monix->p_delta, &mptr->m_info.m_hval); @@ -2063,7 +2079,7 @@ evid monix; mptr->m_available = (long) monix; mptr->m_type = chix->type; mptr->m_count = chix->count; - mptr->m_pciu = chix->paddr; + mptr->m_cid = chix->id.sid; /* * NOTE: I free the monitor block only @@ -2108,6 +2124,7 @@ ca_clear_channel chid chix; #endif { + int status; register evid monix; struct ioc_in_use *piiu = &iiu[chix->iocix]; register struct extmsg *mptr; @@ -2172,6 +2189,12 @@ chid chix; !piiu->chidlist.count){ close_ioc(piiu); } + status = bucketRemoveItem(pBucket, chix->cid, chix); + if(status != BUCKET_SUCCESS){ + ca_signal( + ECA_INTERNAL, + "bad id at channel delete"); + } free((char *) chix); break; /* to unlock exit */ } @@ -2191,7 +2214,7 @@ chid chix; mptr->m_available = (int) chix; mptr->m_type = 0; mptr->m_count = 0; - mptr->m_pciu = chix->paddr; + mptr->m_cid = chix->id.sid; /* * NOTE: I free the chid and monitor blocks only after @@ -2258,9 +2281,6 @@ int early; } /* Flush the send buffers */ - LOCK; - cac_send_msg(); - UNLOCK; if(pndrecvcnt<1 && early){ return ECA_NORMAL; @@ -2271,30 +2291,17 @@ int early; */ if((timeout*SYSFREQ)0 && early){ ca_pend_io_cleanup(); } + + /* + * also takes care of outstanding recvs + * under UNIX + */ + cac_send_msg(); UNLOCK; if(pndrecvcnt<1 && early){ @@ -2308,7 +2315,16 @@ int early; beg_time = time(NULL); while(TRUE){ -#if defined(UNIX) + /* + * also takes care of outstanding recvs + * under UNIX + */ + LOCK; + manage_conn(TRUE); + cac_send_msg(); + UNLOCK; + +#if defined(UNIX) { struct timeval itimeout; @@ -2318,11 +2334,11 @@ int early; recv_msg_select(&itimeout); UNLOCK; } -#else -# if defined(vxWorks) - semTake(io_done_sem, LOCALTICKS); -# else -# if defined(VMS) +# else +# if defined(vxWorks) + semTake(io_done_sem, LOCALTICKS); +# else +# if defined(VMS) { int status; unsigned int systim[2]={-LOCALTICKS,~0}; @@ -2339,15 +2355,12 @@ int early; if(status != SS$_NORMAL) lib$signal(status); } -# else +# else @@@@ dont compile in this case @@@@ -# endif -# endif -#endif +# endif +# endif +# endif - LOCK; - manage_conn(TRUE); - UNLOCK; if(pndrecvcnt<1 && early) return ECA_NORMAL; @@ -2359,6 +2372,7 @@ int early; if(early){ ca_pend_io_cleanup(); } + cac_send_msg(); UNLOCK; return ECA_TIMEOUT; } @@ -2662,7 +2676,7 @@ chid pchan; } *mptr = nullmsg; mptr->m_cmmd = htons(IOC_CLAIM_CIU); - mptr->m_pciu = pchan->paddr; + mptr->m_cid = pchan->id.sid; CAC_ADD_MSG(piiu); piiu->send_needed = TRUE; } diff --git a/src/ca/conn.c b/src/ca/conn.c index 11d605058..1934aad10 100644 --- a/src/ca/conn.c +++ b/src/ca/conn.c @@ -20,6 +20,8 @@ /* .06 111892 joh tuned up cast retries */ /* .07 010493 joh print retry count when `' */ /* .08 010493 joh removed `' message */ +/* .09 090293 joh removed flush from manage_conn */ +/* (now handled by the send needed flag) */ /* */ /*_begin */ /************************************************************************/ @@ -152,11 +154,6 @@ char silent; ca_signal(ECA_CHIDRETRY, sprintf_buf); } } - - if(keepalive_cnt|retry_cnt){ - cac_send_msg(); - } - } diff --git a/src/ca/iocinf.c b/src/ca/iocinf.c index 9919a3f9e..98db59756 100644 --- a/src/ca/iocinf.c +++ b/src/ca/iocinf.c @@ -608,10 +608,10 @@ void cac_send_msg() register struct ioc_in_use *piiu; int done; int status; - int retry_count; -# define RETRY_INIT 100 - retry_count = RETRY_INIT; + for(piiu=iiu; piiu<&iiu[nxtiiu]; piiu++){ + piiu->send_retry_count = SEND_RETRY_COUNT_INIT; + } if(!ca_static->ca_repeater_contacted) notify_ca_repeater(); @@ -639,14 +639,11 @@ void cac_send_msg() * frees up push pull deadlock only * if recv not already in progress */ -#if defined(UNIX) +# if defined(UNIX) if(post_msg_active==0){ recv_msg_select(¬imeout); } -#else -# if defined(vxWorks) -# endif -#endif +# endif done = TRUE; for(piiu=iiu; piiu<&iiu[nxtiiu]; piiu++){ @@ -656,7 +653,16 @@ void cac_send_msg() status = cac_send_msg_piiu(piiu); if(status<0){ - done = FALSE; + if(piiu->send_retry_count == 0){ + ca_signal( + ECA_DLCKREST, + piiu->host_name_str); + close_ioc(piiu); + } + else{ + piiu->send_retry_count--; + done = FALSE; + } } } @@ -676,32 +682,6 @@ void cac_send_msg() break; } - if(retry_count-- <= 0){ - char *iocname; - struct in_addr *inaddr; - for(piiu=iiu; piiu<&iiu[nxtiiu]; piiu++){ - if(piiu->send->stk){ - inaddr = &piiu->sock_addr.sin_addr; - iocname = piiu->host_name_str; -#define CLOSE_ON_EXPIRED /* kill conn if we pend to long on it */ -# ifdef CLOSE_ON_EXPIRED - ca_signal( - ECA_DLCKREST, - iocname); - close_ioc(piiu); -# else - ca_signal( - ECA_SERVBEHIND, - iocname); -# endif - } - } -# ifdef CLOSE_ON_EXPIRED - break; -# else - retry_count = RETRY_INIT; -# endif - } TCPDELAY; } @@ -1015,6 +995,7 @@ struct ioc_in_use *piiu; return; } + piiu->send_retry_count = SEND_RETRY_COUNT_INIT; rcvb->stk += byte_cnt; @@ -1318,7 +1299,7 @@ struct ioc_in_use *piiu; chix->type = TYPENOTCONN; chix->count = 0; chix->state = cs_prev_conn; - chix->paddr = NULL; + chix->id.sid = ~0L; } /* diff --git a/src/ca/iocinf.h b/src/ca/iocinf.h index 7be73b4b5..c8b2c8d7e 100644 --- a/src/ca/iocinf.h +++ b/src/ca/iocinf.h @@ -74,11 +74,9 @@ static char *iocinfhSccsId = "@(#)iocinf.h 1.15\t6/2/93"; # endif #endif -# include - -#ifndef INCos_depenh -# include -#endif +#include +#include +#include #ifndef min #define min(A,B) ((A)>(B)?(B):(A)) @@ -126,6 +124,12 @@ typedef unsigned long ca_time; #define MAX_CONTIGUOUS_MSG_COUNT 2 +#define CLIENT_ID_WIDTH 20 /* bits (1 million before rollover) */ +#define CLIENT_ID_COUNT (1<ca_iiu) #define pndrecvcnt (ca_static->ca_pndrecvcnt) @@ -143,6 +147,8 @@ typedef unsigned long ca_time; #define post_msg_active (ca_static->ca_post_msg_active) #define send_msg_active (ca_static->ca_send_msg_active) #define sprintf_buf (ca_static->ca_sprintf_buf) +#define pBucket (ca_static->ca_pBucket) +#define nextBucketId (ca_static->ca_nextBucketId) #if defined(UNIX) # define readch (ca_static->ca_readch) @@ -189,6 +195,8 @@ struct ca_static{ short ca_cast_available; struct in_addr ca_castaddr; char ca_sprintf_buf[128]; + BUCKET *ca_pBucket; + unsigned long ca_nextBucketId; #if defined(UNIX) fd_set ca_readch; fd_set ca_excepch; @@ -234,6 +242,7 @@ struct ca_static{ short send_needed; /* CA needs a send */ char host_name_str[32]; unsigned nconn_tries; + unsigned send_retry_count; ca_time next_retry; ca_time retry_delay; #define MAXCONNTRIES 30 diff --git a/src/ca/iocmsg.h b/src/ca/iocmsg.h index fc28ac93c..ccc4dc387 100644 --- a/src/ca/iocmsg.h +++ b/src/ca/iocmsg.h @@ -92,7 +92,7 @@ struct extmsg { unsigned short m_postsize; /* size of message extension */ unsigned short m_type; /* operation data type */ unsigned short m_count; /* operation data count */ - void *m_pciu; /* ptr to server channel in use */ + unsigned long m_cid; /* channel identifier */ unsigned long m_available; /* undefined message location for use * by client processes */ }; diff --git a/src/ca/service.c b/src/ca/service.c index 33a46dba9..b85c9d3a3 100644 --- a/src/ca/service.c +++ b/src/ca/service.c @@ -95,7 +95,6 @@ static char *sccsId = "@(#)service.c 1.17\t6/2/93"; static void reconnect_channel(); void ca_request_event(); -static int client_channel_exists(); #define BUFSTAT ca_printf("CAC: expected %d left %d\n",msgcnt,*pbufcnt); @@ -278,9 +277,21 @@ post_msg(hdrptr, pbufcnt, pnet_addr, piiu) case IOC_READ: case IOC_READ_BUILD: { - chid chan = (chid) hdrptr->m_pciu; + chid chan; unsigned size; + /* + * verify the channel id + */ + LOCK; + chan = bucketLookupItem(pBucket, hdrptr->m_cid); + UNLOCK; + if(!chan){ + ca_signal(ECA_INTERNAL, + "bad client channel id from server"); + break; + } + /* * ignore IOC_READ_BUILDS after * connection occurs @@ -337,79 +348,9 @@ post_msg(hdrptr, pbufcnt, pnet_addr, piiu) } case IOC_SEARCH: case IOC_BUILD: - { - chid chan = (chid) t_available; - struct ioc_in_use *chpiiu; - - /* - * ignore broadcast replies for deleted channels - * - * lock required for client_channel_exists() - * lock required around use of the sprintf buffer - */ - LOCK; - status = client_channel_exists(chan); - if (!status) { - sprintf( - sprintf_buf, - "Chid %x Search reply from %s", - chan, - host_from_addr(pnet_addr)); - ca_signal(ECA_NOCHANMSG, sprintf_buf); - break; - } - UNLOCK; - - chpiiu = &iiu[chan->iocix]; - - if (chan->paddr) { - - if (chpiiu->sock_addr.sin_addr.s_addr == - pnet_addr->s_addr) { - ca_printf(" "); -# ifdef UNIX - fflush(stdout); -# endif - } else { - char acc[128]; - char rej[128]; - - sprintf(acc, - "%s", - chpiiu->host_name_str); - sprintf(rej, - "%s", - host_from_addr(pnet_addr)); - LOCK; - sprintf( - sprintf_buf, - "Channel: %s Accepted: %s Rejected: %s ", - chan + 1, - acc, - rej); - ca_signal(ECA_DBLCHNL, sprintf_buf); - UNLOCK; - } -# ifdef IOC_READ_FOLLOWING_BUILD - /* - * IOC_BUILD messages always have a - * IOC_READ msg following. (IOC_BUILD - * messages are sometimes followed by - * error messages which are ignored - * on double replies) - */ - if (t_cmmd == IOC_BUILD){ - msgcnt += sizeof(struct extmsg) + - ntohs((hdrptr + 1)->m_postsize); - } -# endif - break; - } reconnect_channel(hdrptr, pnet_addr); - - break; - } + case IOC_READ_SYNC: piiu->outstanding_ack_count--; piiu->read_seq++; @@ -477,6 +418,12 @@ post_msg(hdrptr, pbufcnt, pnet_addr, piiu) } ellConcat(&free_event_list, &chix->eventq); ellDelete(&piiu->chidlist, chix); + status = bucketRemoveItem(pBucket, chix->cid, chix); + if(status != BUCKET_SUCCESS){ + ca_signal( + ECA_INTERNAL, + "bad id at channel delete"); + } free(chix); piiu->outstanding_ack_count--; if (!piiu->chidlist.count) @@ -545,8 +492,10 @@ post_msg(hdrptr, pbufcnt, pnet_addr, piiu) break; } + LOCK; + args.chid = bucketLookupItem(pBucket, hdrptr->m_cid); + UNLOCK; args.usr = ca_static->ca_exception_arg; - args.chid = (chid) hdrptr->m_pciu; args.type = ntohs(req->m_type); args.count = ntohs(req->m_count); args.addr = (void *) (req->m_available); @@ -592,14 +541,78 @@ static void reconnect_channel(hdrptr,pnet_addr) register struct extmsg *hdrptr; struct in_addr *pnet_addr; { - chid chan = (chid) hdrptr->m_available; + chid chan; unsigned short newiocix; evid pevent; int status; enum channel_state prev_cs; + struct ioc_in_use *chpiiu; + /* + * ignore broadcast replies for deleted channels + * + * lock required around use of the sprintf buffer + */ + LOCK; + chan = bucketLookupItem( + pBucket, + hdrptr->m_available); + if(!chan){ + sprintf( + sprintf_buf, + "Search reply from %s with server id %x", + host_from_addr(pnet_addr), + hdrptr->m_available); + ca_signal(ECA_NOCHANMSG, sprintf_buf); + UNLOCK; + return; + } + + chpiiu = &iiu[chan->iocix]; + + if (chan->state == cs_conn) { + + if (chpiiu->sock_addr.sin_addr.s_addr == + pnet_addr->s_addr) { + ca_printf(" "); +# ifdef UNIX + fflush(stdout); +# endif + } else { + char acc[128]; + char rej[128]; + + sprintf(acc, + "%s", + chpiiu->host_name_str); + sprintf(rej, + "%s", + host_from_addr(pnet_addr)); + sprintf( + sprintf_buf, + "Channel: %s Accepted: %s Rejected: %s ", + chan + 1, + acc, + rej); + ca_signal(ECA_DBLCHNL, sprintf_buf); + } +# ifdef IOC_READ_FOLLOWING_BUILD + /* + * IOC_BUILD messages always have a + * IOC_READ msg following. (IOC_BUILD + * messages are sometimes followed by + * error messages which are ignored + * on double replies) + */ + if (t_cmmd == IOC_BUILD){ + msgcnt += sizeof(struct extmsg) + + ntohs((hdrptr + 1)->m_postsize); + } +# endif + UNLOCK; + return; + } - LOCK; status = alloc_ioc ( pnet_addr, IPPROTO_TCP, @@ -609,13 +622,14 @@ struct in_addr *pnet_addr; ca_printf("CAC: ... %s ...\n", ca_message(status)); ca_printf("CAC: for %s on %s\n", chan+1, host_from_addr(pnet_addr)); ca_printf("CAC: ignored search reply- proceeding\n"); + UNLOCK; return; } /* Update rmt chid fields from extmsg fields */ chan->type = ntohs(hdrptr->m_type); chan->count = ntohs(hdrptr->m_count); - chan->paddr = hdrptr->m_pciu; + chan->id.sid = hdrptr->m_cid; if(chan->iocix != newiocix){ struct ioc_in_use *chpiiu; @@ -718,37 +732,3 @@ int lock; UNLOCK; } } - - - - -/* - * client_channel_exists() - * (usually will find it in the first piiu) - * - * LOCK should be on while in this routine - * - * iocix field in the chid block not used here because - * I dont trust the chid ptr yet. - */ -static int -client_channel_exists(chan) - chid chan; -{ - register struct ioc_in_use *piiu; - register struct ioc_in_use *pnext_iiu = &iiu[nxtiiu]; - int status; - - for (piiu = iiu; piiu < pnext_iiu; piiu++) { - /* - * ellFind returns the node number or ERROR - */ - status = ellFind(&piiu->chidlist, chan); - if (status != ERROR) { - return TRUE; - } - } - return FALSE; -} - -