pointer => client id

This commit is contained in:
Jeff Hill
1993-09-09 16:46:48 +00:00
parent d079b80394
commit 2a812b8d2b
6 changed files with 198 additions and 217 deletions

View File

@@ -348,6 +348,10 @@ int ca_task_initialize
ca_static->ca_exception_func = ca_default_exception_handler;
ca_static->ca_exception_arg = NULL;
ca_static->ca_pBucket = bucketCreate(CLIENT_ID_WIDTH);
if(!ca_static->ca_pBucket)
abort();
status = broadcast_addr(&ca_static->ca_castaddr);
if(status == OK){
ca_static->ca_cast_available = TRUE;
@@ -954,14 +958,13 @@ int ca_build_and_connect
if (!chix)
abort();
chix->paddr = (void *) (strcnt + (char *) (chix + 1));
*(struct db_addr *) chix->paddr = tmp_paddr;
chix->id.paddr = (struct db_addr *)
(strcnt + (char *) (chix + 1));
*chix->id.paddr = tmp_paddr;
chix->puser = puser;
chix->connection_func = conn_func;
chix->type = ((struct db_addr *)
chix->paddr)->field_type;
chix->count = ((struct db_addr *)
chix->paddr)->no_elements;
chix->type = chix->id.paddr->field_type;
chix->count = chix->id.paddr->no_elements;
chix->iocix = LOCAL_IIU;
chix->state = cs_conn;
ellInit(&chix->eventq);
@@ -1002,6 +1005,7 @@ int ca_build_and_connect
status = ECA_NOCAST;
}
else{
/* allocate CHIU (channel in use) block */
/* also allocate enough for the channel name */
*chixptr = chix = (chid) malloc(sizeof(*chix) + strcnt);
@@ -1019,11 +1023,21 @@ int ca_build_and_connect
free((char *) chix);
goto exit;
}
chix->cid = CLIENT_ID_ALLOC;
status = bucketAddItem(pBucket, chix->cid, chix);
if(status != BUCKET_SUCCESS){
*chixptr = (chid) NULL;
free((char *) chix);
status = ECA_ALLOCMEM;
goto exit;
}
chix->puser = puser;
chix->connection_func = conn_func;
chix->type = TYPENOTCONN; /* invalid initial type */
chix->count = 0;/* invalid initial count */
chix->paddr = (void *) NULL; /* invalid initial paddr */
chix->type = TYPENOTCONN; /* invalid initial type */
chix->count = 0; /* invalid initial count */
chix->id.sid = ~0L; /* invalid initial server id */
/* save stuff for build retry if required */
chix->build_type = get_type;
@@ -1051,6 +1065,8 @@ int ca_build_and_connect
if (VALID_BUILD(chix))
SETPENDRECV;
}
status = ECA_NORMAL;
exit:
UNLOCK;
@@ -1094,10 +1110,10 @@ void build_msg(chix, reply_type)
}
mptr->m_cmmd = htons(cmd);
mptr->m_available = (int) chix;
mptr->m_available = chix->cid;
mptr->m_type = reply_type;
mptr->m_count = 0;
mptr->m_pciu = (void *) chix;
mptr->m_cid = chix->cid;
if (cmd == IOC_BUILD) {
/* msg header only on db read req */
@@ -1107,7 +1123,7 @@ void build_msg(chix, reply_type)
mptr->m_type = htons(chix->build_type);
mptr->m_count = htons(chix->build_count);
mptr->m_available = (int) chix->build_value;
mptr->m_pciu = 0;
mptr->m_cid = ~0L;
}
/*
* channel name string - forces a NULL at the end because
@@ -1162,7 +1178,7 @@ void *pvalue
if (chix->iocix == LOCAL_IIU) {
status = db_get_field(
chix->paddr,
chix->id.paddr,
type,
pvalue,
count,
@@ -1190,7 +1206,7 @@ void *pvalue
mptr->m_type = htons(type);
mptr->m_available = (long) pvalue;
mptr->m_count = htons(count);
mptr->m_pciu = chix->paddr;
mptr->m_cid = chix->id.sid;
CAC_ADD_MSG(piiu);
@@ -1254,7 +1270,7 @@ ca_array_get_callback
ev.chan = chix;
ev.type = type;
ev.count = count;
ca_event_handler(&ev, chix->paddr, NULL, NULL);
ca_event_handler(&ev, chix->id.paddr, NULL, NULL);
return ECA_NORMAL;
}
#endif
@@ -1331,7 +1347,7 @@ issue_get_callback(monix)
mptr->m_type = htons(monix->type);
mptr->m_available = (long) monix;
mptr->m_count = htons(count);
mptr->m_pciu = chix->paddr;
mptr->m_cid = chix->id.sid;
CAC_ADD_MSG(piiu);
@@ -1395,7 +1411,7 @@ void *pvalue;
int status;
if(chix->iocix == LOCAL_IIU){
status = db_put_field( chix->paddr,
status = db_put_field( chix->id.paddr,
type,
pvalue,
count);
@@ -1494,8 +1510,8 @@ void *pvalue;
mptr->m_cmmd = htons(IOC_WRITE);
mptr->m_type = htons(type);
mptr->m_count = htons(count);
mptr->m_pciu = chix->paddr;
mptr->m_available = (long) chix;
mptr->m_cid = chix->id.sid;
mptr->m_available = ~0L;
CAC_ADD_MSG(&iiu[chix->iocix]);
UNLOCK;
@@ -1721,7 +1737,7 @@ long mask;
if(chix->iocix == LOCAL_IIU){
status = db_add_event( evuser,
chix->paddr,
chix->id.paddr,
ca_event_handler,
monix,
mask,
@@ -1819,7 +1835,7 @@ ca_request_event(monix)
mptr->m_header.m_available = (long) monix;
mptr->m_header.m_type = htons(monix->type);
mptr->m_header.m_count = htons(count);
mptr->m_header.m_pciu = chix->paddr;
mptr->m_header.m_cid = chix->id.sid;
/* msg body */
htonf(&monix->p_delta, &mptr->m_info.m_hval);
@@ -2063,7 +2079,7 @@ evid monix;
mptr->m_available = (long) monix;
mptr->m_type = chix->type;
mptr->m_count = chix->count;
mptr->m_pciu = chix->paddr;
mptr->m_cid = chix->id.sid;
/*
* NOTE: I free the monitor block only
@@ -2108,6 +2124,7 @@ ca_clear_channel
chid chix;
#endif
{
int status;
register evid monix;
struct ioc_in_use *piiu = &iiu[chix->iocix];
register struct extmsg *mptr;
@@ -2172,6 +2189,12 @@ chid chix;
!piiu->chidlist.count){
close_ioc(piiu);
}
status = bucketRemoveItem(pBucket, chix->cid, chix);
if(status != BUCKET_SUCCESS){
ca_signal(
ECA_INTERNAL,
"bad id at channel delete");
}
free((char *) chix);
break; /* to unlock exit */
}
@@ -2191,7 +2214,7 @@ chid chix;
mptr->m_available = (int) chix;
mptr->m_type = 0;
mptr->m_count = 0;
mptr->m_pciu = chix->paddr;
mptr->m_cid = chix->id.sid;
/*
* NOTE: I free the chid and monitor blocks only after
@@ -2258,9 +2281,6 @@ int early;
}
/* Flush the send buffers */
LOCK;
cac_send_msg();
UNLOCK;
if(pndrecvcnt<1 && early){
return ECA_NORMAL;
@@ -2271,30 +2291,17 @@ int early;
*/
if((timeout*SYSFREQ)<LOCALTICKS && timeout != 0.0){
/*
* on UNIX call recv_msg() with zero timeout. on vxWorks
* and VMS recv_msg() need not be called.
*
* cac_send_msg() only calls recv_msg on UNIX if the
* buffer needs to be flushed.
*
*/
#ifdef UNIX
{
struct timeval notimeout;
/*
* locking only a comment on UNIX
*/
notimeout.tv_sec = 0;
notimeout.tv_usec = 0;
recv_msg_select(&notimeout);
}
#endif
LOCK;
manage_conn(!early);
if(pndrecvcnt>0 && early){
ca_pend_io_cleanup();
}
/*
* also takes care of outstanding recvs
* under UNIX
*/
cac_send_msg();
UNLOCK;
if(pndrecvcnt<1 && early){
@@ -2308,7 +2315,16 @@ int early;
beg_time = time(NULL);
while(TRUE){
#if defined(UNIX)
/*
* also takes care of outstanding recvs
* under UNIX
*/
LOCK;
manage_conn(TRUE);
cac_send_msg();
UNLOCK;
#if defined(UNIX)
{
struct timeval itimeout;
@@ -2318,11 +2334,11 @@ int early;
recv_msg_select(&itimeout);
UNLOCK;
}
#else
# if defined(vxWorks)
semTake(io_done_sem, LOCALTICKS);
# else
# if defined(VMS)
# else
# if defined(vxWorks)
semTake(io_done_sem, LOCALTICKS);
# else
# if defined(VMS)
{
int status;
unsigned int systim[2]={-LOCALTICKS,~0};
@@ -2339,15 +2355,12 @@ int early;
if(status != SS$_NORMAL)
lib$signal(status);
}
# else
# else
@@@@ dont compile in this case @@@@
# endif
# endif
#endif
# endif
# endif
# endif
LOCK;
manage_conn(TRUE);
UNLOCK;
if(pndrecvcnt<1 && early)
return ECA_NORMAL;
@@ -2359,6 +2372,7 @@ int early;
if(early){
ca_pend_io_cleanup();
}
cac_send_msg();
UNLOCK;
return ECA_TIMEOUT;
}
@@ -2662,7 +2676,7 @@ chid pchan;
}
*mptr = nullmsg;
mptr->m_cmmd = htons(IOC_CLAIM_CIU);
mptr->m_pciu = pchan->paddr;
mptr->m_cid = pchan->id.sid;
CAC_ADD_MSG(piiu);
piiu->send_needed = TRUE;
}

View File

@@ -20,6 +20,8 @@
/* .06 111892 joh tuned up cast retries */
/* .07 010493 joh print retry count when `<Trying>' */
/* .08 010493 joh removed `<Trying>' message */
/* .09 090293 joh removed flush from manage_conn */
/* (now handled by the send needed flag) */
/* */
/*_begin */
/************************************************************************/
@@ -152,11 +154,6 @@ char silent;
ca_signal(ECA_CHIDRETRY, sprintf_buf);
}
}
if(keepalive_cnt|retry_cnt){
cac_send_msg();
}
}

View File

@@ -608,10 +608,10 @@ void cac_send_msg()
register struct ioc_in_use *piiu;
int done;
int status;
int retry_count;
# define RETRY_INIT 100
retry_count = RETRY_INIT;
for(piiu=iiu; piiu<&iiu[nxtiiu]; piiu++){
piiu->send_retry_count = SEND_RETRY_COUNT_INIT;
}
if(!ca_static->ca_repeater_contacted)
notify_ca_repeater();
@@ -639,14 +639,11 @@ void cac_send_msg()
* frees up push pull deadlock only
* if recv not already in progress
*/
#if defined(UNIX)
# if defined(UNIX)
if(post_msg_active==0){
recv_msg_select(&notimeout);
}
#else
# if defined(vxWorks)
# endif
#endif
# endif
done = TRUE;
for(piiu=iiu; piiu<&iiu[nxtiiu]; piiu++){
@@ -656,7 +653,16 @@ void cac_send_msg()
status = cac_send_msg_piiu(piiu);
if(status<0){
done = FALSE;
if(piiu->send_retry_count == 0){
ca_signal(
ECA_DLCKREST,
piiu->host_name_str);
close_ioc(piiu);
}
else{
piiu->send_retry_count--;
done = FALSE;
}
}
}
@@ -676,32 +682,6 @@ void cac_send_msg()
break;
}
if(retry_count-- <= 0){
char *iocname;
struct in_addr *inaddr;
for(piiu=iiu; piiu<&iiu[nxtiiu]; piiu++){
if(piiu->send->stk){
inaddr = &piiu->sock_addr.sin_addr;
iocname = piiu->host_name_str;
#define CLOSE_ON_EXPIRED /* kill conn if we pend to long on it */
# ifdef CLOSE_ON_EXPIRED
ca_signal(
ECA_DLCKREST,
iocname);
close_ioc(piiu);
# else
ca_signal(
ECA_SERVBEHIND,
iocname);
# endif
}
}
# ifdef CLOSE_ON_EXPIRED
break;
# else
retry_count = RETRY_INIT;
# endif
}
TCPDELAY;
}
@@ -1015,6 +995,7 @@ struct ioc_in_use *piiu;
return;
}
piiu->send_retry_count = SEND_RETRY_COUNT_INIT;
rcvb->stk += byte_cnt;
@@ -1318,7 +1299,7 @@ struct ioc_in_use *piiu;
chix->type = TYPENOTCONN;
chix->count = 0;
chix->state = cs_prev_conn;
chix->paddr = NULL;
chix->id.sid = ~0L;
}
/*

View File

@@ -74,11 +74,9 @@ static char *iocinfhSccsId = "@(#)iocinf.h 1.15\t6/2/93";
# endif
#endif
# include <ellLib.h>
#ifndef INCos_depenh
# include <os_depen.h>
#endif
#include <bucketLib.h>
#include <ellLib.h>
#include <os_depen.h>
#ifndef min
#define min(A,B) ((A)>(B)?(B):(A))
@@ -126,6 +124,12 @@ typedef unsigned long ca_time;
#define MAX_CONTIGUOUS_MSG_COUNT 2
#define CLIENT_ID_WIDTH 20 /* bits (1 million before rollover) */
#define CLIENT_ID_COUNT (1<<CLIENT_ID_WIDTH)
#define CLIENT_ID_MASK (CLIENT_ID_COUNT-1)
#define CLIENT_ID_ALLOC (CLIENT_ID_MASK&nextBucketId++)
#define SEND_RETRY_COUNT_INIT 100
#define iiu (ca_static->ca_iiu)
#define pndrecvcnt (ca_static->ca_pndrecvcnt)
@@ -143,6 +147,8 @@ typedef unsigned long ca_time;
#define post_msg_active (ca_static->ca_post_msg_active)
#define send_msg_active (ca_static->ca_send_msg_active)
#define sprintf_buf (ca_static->ca_sprintf_buf)
#define pBucket (ca_static->ca_pBucket)
#define nextBucketId (ca_static->ca_nextBucketId)
#if defined(UNIX)
# define readch (ca_static->ca_readch)
@@ -189,6 +195,8 @@ struct ca_static{
short ca_cast_available;
struct in_addr ca_castaddr;
char ca_sprintf_buf[128];
BUCKET *ca_pBucket;
unsigned long ca_nextBucketId;
#if defined(UNIX)
fd_set ca_readch;
fd_set ca_excepch;
@@ -234,6 +242,7 @@ struct ca_static{
short send_needed; /* CA needs a send */
char host_name_str[32];
unsigned nconn_tries;
unsigned send_retry_count;
ca_time next_retry;
ca_time retry_delay;
#define MAXCONNTRIES 30

View File

@@ -92,7 +92,7 @@ struct extmsg {
unsigned short m_postsize; /* size of message extension */
unsigned short m_type; /* operation data type */
unsigned short m_count; /* operation data count */
void *m_pciu; /* ptr to server channel in use */
unsigned long m_cid; /* channel identifier */
unsigned long m_available; /* undefined message location for use
* by client processes */
};

View File

@@ -95,7 +95,6 @@ static char *sccsId = "@(#)service.c 1.17\t6/2/93";
static void reconnect_channel();
void ca_request_event();
static int client_channel_exists();
#define BUFSTAT ca_printf("CAC: expected %d left %d\n",msgcnt,*pbufcnt);
@@ -278,9 +277,21 @@ post_msg(hdrptr, pbufcnt, pnet_addr, piiu)
case IOC_READ:
case IOC_READ_BUILD:
{
chid chan = (chid) hdrptr->m_pciu;
chid chan;
unsigned size;
/*
* verify the channel id
*/
LOCK;
chan = bucketLookupItem(pBucket, hdrptr->m_cid);
UNLOCK;
if(!chan){
ca_signal(ECA_INTERNAL,
"bad client channel id from server");
break;
}
/*
* ignore IOC_READ_BUILDS after
* connection occurs
@@ -337,79 +348,9 @@ post_msg(hdrptr, pbufcnt, pnet_addr, piiu)
}
case IOC_SEARCH:
case IOC_BUILD:
{
chid chan = (chid) t_available;
struct ioc_in_use *chpiiu;
/*
* ignore broadcast replies for deleted channels
*
* lock required for client_channel_exists()
* lock required around use of the sprintf buffer
*/
LOCK;
status = client_channel_exists(chan);
if (!status) {
sprintf(
sprintf_buf,
"Chid %x Search reply from %s",
chan,
host_from_addr(pnet_addr));
ca_signal(ECA_NOCHANMSG, sprintf_buf);
break;
}
UNLOCK;
chpiiu = &iiu[chan->iocix];
if (chan->paddr) {
if (chpiiu->sock_addr.sin_addr.s_addr ==
pnet_addr->s_addr) {
ca_printf("<Extra> ");
# ifdef UNIX
fflush(stdout);
# endif
} else {
char acc[128];
char rej[128];
sprintf(acc,
"%s",
chpiiu->host_name_str);
sprintf(rej,
"%s",
host_from_addr(pnet_addr));
LOCK;
sprintf(
sprintf_buf,
"Channel: %s Accepted: %s Rejected: %s ",
chan + 1,
acc,
rej);
ca_signal(ECA_DBLCHNL, sprintf_buf);
UNLOCK;
}
# ifdef IOC_READ_FOLLOWING_BUILD
/*
* IOC_BUILD messages always have a
* IOC_READ msg following. (IOC_BUILD
* messages are sometimes followed by
* error messages which are ignored
* on double replies)
*/
if (t_cmmd == IOC_BUILD){
msgcnt += sizeof(struct extmsg) +
ntohs((hdrptr + 1)->m_postsize);
}
# endif
break;
}
reconnect_channel(hdrptr, pnet_addr);
break;
}
case IOC_READ_SYNC:
piiu->outstanding_ack_count--;
piiu->read_seq++;
@@ -477,6 +418,12 @@ post_msg(hdrptr, pbufcnt, pnet_addr, piiu)
}
ellConcat(&free_event_list, &chix->eventq);
ellDelete(&piiu->chidlist, chix);
status = bucketRemoveItem(pBucket, chix->cid, chix);
if(status != BUCKET_SUCCESS){
ca_signal(
ECA_INTERNAL,
"bad id at channel delete");
}
free(chix);
piiu->outstanding_ack_count--;
if (!piiu->chidlist.count)
@@ -545,8 +492,10 @@ post_msg(hdrptr, pbufcnt, pnet_addr, piiu)
break;
}
LOCK;
args.chid = bucketLookupItem(pBucket, hdrptr->m_cid);
UNLOCK;
args.usr = ca_static->ca_exception_arg;
args.chid = (chid) hdrptr->m_pciu;
args.type = ntohs(req->m_type);
args.count = ntohs(req->m_count);
args.addr = (void *) (req->m_available);
@@ -592,14 +541,78 @@ static void reconnect_channel(hdrptr,pnet_addr)
register struct extmsg *hdrptr;
struct in_addr *pnet_addr;
{
chid chan = (chid) hdrptr->m_available;
chid chan;
unsigned short newiocix;
evid pevent;
int status;
enum channel_state prev_cs;
struct ioc_in_use *chpiiu;
/*
* ignore broadcast replies for deleted channels
*
* lock required around use of the sprintf buffer
*/
LOCK;
chan = bucketLookupItem(
pBucket,
hdrptr->m_available);
if(!chan){
sprintf(
sprintf_buf,
"Search reply from %s with server id %x",
host_from_addr(pnet_addr),
hdrptr->m_available);
ca_signal(ECA_NOCHANMSG, sprintf_buf);
UNLOCK;
return;
}
chpiiu = &iiu[chan->iocix];
if (chan->state == cs_conn) {
if (chpiiu->sock_addr.sin_addr.s_addr ==
pnet_addr->s_addr) {
ca_printf("<Extra> ");
# ifdef UNIX
fflush(stdout);
# endif
} else {
char acc[128];
char rej[128];
sprintf(acc,
"%s",
chpiiu->host_name_str);
sprintf(rej,
"%s",
host_from_addr(pnet_addr));
sprintf(
sprintf_buf,
"Channel: %s Accepted: %s Rejected: %s ",
chan + 1,
acc,
rej);
ca_signal(ECA_DBLCHNL, sprintf_buf);
}
# ifdef IOC_READ_FOLLOWING_BUILD
/*
* IOC_BUILD messages always have a
* IOC_READ msg following. (IOC_BUILD
* messages are sometimes followed by
* error messages which are ignored
* on double replies)
*/
if (t_cmmd == IOC_BUILD){
msgcnt += sizeof(struct extmsg) +
ntohs((hdrptr + 1)->m_postsize);
}
# endif
UNLOCK;
return;
}
LOCK;
status = alloc_ioc (
pnet_addr,
IPPROTO_TCP,
@@ -609,13 +622,14 @@ struct in_addr *pnet_addr;
ca_printf("CAC: ... %s ...\n", ca_message(status));
ca_printf("CAC: for %s on %s\n", chan+1, host_from_addr(pnet_addr));
ca_printf("CAC: ignored search reply- proceeding\n");
UNLOCK;
return;
}
/* Update rmt chid fields from extmsg fields */
chan->type = ntohs(hdrptr->m_type);
chan->count = ntohs(hdrptr->m_count);
chan->paddr = hdrptr->m_pciu;
chan->id.sid = hdrptr->m_cid;
if(chan->iocix != newiocix){
struct ioc_in_use *chpiiu;
@@ -718,37 +732,3 @@ int lock;
UNLOCK;
}
}
/*
* client_channel_exists()
* (usually will find it in the first piiu)
*
* LOCK should be on while in this routine
*
* iocix field in the chid block not used here because
* I dont trust the chid ptr yet.
*/
static int
client_channel_exists(chan)
chid chan;
{
register struct ioc_in_use *piiu;
register struct ioc_in_use *pnext_iiu = &iiu[nxtiiu];
int status;
for (piiu = iiu; piiu < pnext_iiu; piiu++) {
/*
* ellFind returns the node number or ERROR
*/
status = ellFind(&piiu->chidlist, chan);
if (status != ERROR) {
return TRUE;
}
}
return FALSE;
}