fixed vx exit handler problems

This commit is contained in:
Jeff Hill
1994-11-23 21:03:41 +00:00
parent ded5d21444
commit 3fee3eceff
12 changed files with 587 additions and 438 deletions

View File

@@ -461,81 +461,80 @@ int APIENTRY ca_task_initialize(void)
{
int status;
struct ca_static *ca_temp;
unsigned sec;
if (!ca_static) {
ca_temp = (struct ca_static *)
calloc(1, sizeof(*ca_temp));
if (!ca_temp)
return ECA_ALLOCMEM;
/*
* os dependent
*/
status = cac_add_task_variable(ca_temp);
if(status != ECA_NORMAL){
free(ca_temp);
return status;
}
ca_static->ca_exception_func = ca_default_exception_handler;
ca_static->ca_exception_arg = NULL;
/* record a default user name */
ca_static->ca_pUserName = localUserName();
if(!ca_static->ca_pUserName){
free(ca_static);
calloc(1, sizeof(*ca_temp));
if (!ca_temp) {
return ECA_ALLOCMEM;
}
/* record a default user name */
ca_static->ca_pHostName = localHostName();
if(!ca_static->ca_pHostName){
free(ca_static->ca_pUserName);
free(ca_static);
return ECA_ALLOCMEM;
}
/* init sync group facility */
ca_sg_init();
/*
* init broadcasted search counters
*/
ca_static->ca_search_retry = 0;
ca_static->ca_conn_next_retry = CA_CURRENT_TIME;
sec = CA_RECAST_DELAY;
ca_static->ca_conn_retry_delay.tv_sec = sec;
ca_static->ca_conn_retry_delay.tv_usec =
(CA_RECAST_DELAY-sec)*USEC_PER_SEC;
ellInit(&ca_static->ca_iiuList);
ellInit(&ca_static->ca_ioeventlist);
ellInit(&ca_static->ca_free_event_list);
ellInit(&ca_static->ca_pend_read_list);
ellInit(&ca_static->ca_pend_write_list);
ellInit(&ca_static->putCvrtBuf);
ca_static->ca_pSlowBucket =
bucketCreate(CLIENT_HASH_TBL_SIZE);
assert(ca_static->ca_pSlowBucket);
ca_static->ca_pFastBucket =
bucketCreate(CLIENT_HASH_TBL_SIZE);
assert(ca_static->ca_pFastBucket);
status = cac_os_depen_init(ca_static);
if(status != ECA_NORMAL){
free(ca_static->ca_pUserName);
free(ca_static);
return status;
}
if (repeater_installed()==FALSE) {
ca_spawn_repeater();
}
status = cac_os_depen_init (ca_temp);
return status;
}
return ECA_NORMAL;
}
/*
* ca_os_independent_init ()
*/
int ca_os_independent_init (void)
{
unsigned sec;
ca_static->ca_exception_func = ca_default_exception_handler;
ca_static->ca_exception_arg = NULL;
/* record a default user name */
ca_static->ca_pUserName = localUserName();
if(!ca_static->ca_pUserName){
free(ca_static);
return ECA_ALLOCMEM;
}
/* record a default user name */
ca_static->ca_pHostName = localHostName();
if(!ca_static->ca_pHostName){
free(ca_static->ca_pUserName);
free(ca_static);
return ECA_ALLOCMEM;
}
/* init sync group facility */
ca_sg_init();
/*
* init broadcasted search counters
*/
ca_static->ca_search_retry = 0;
ca_static->ca_conn_next_retry = CA_CURRENT_TIME;
sec = CA_RECAST_DELAY;
ca_static->ca_conn_retry_delay.tv_sec = sec;
ca_static->ca_conn_retry_delay.tv_usec =
(CA_RECAST_DELAY-sec)*USEC_PER_SEC;
ellInit(&ca_static->ca_iiuList);
ellInit(&ca_static->ca_ioeventlist);
ellInit(&ca_static->ca_free_event_list);
ellInit(&ca_static->ca_pend_read_list);
ellInit(&ca_static->ca_pend_write_list);
ellInit(&ca_static->putCvrtBuf);
ellInit(&ca_static->fdInfoFreeList);
ellInit(&ca_static->fdInfoList);
ca_static->ca_pSlowBucket =
bucketCreate(CLIENT_HASH_TBL_SIZE);
assert(ca_static->ca_pSlowBucket);
ca_static->ca_pFastBucket =
bucketCreate(CLIENT_HASH_TBL_SIZE);
assert(ca_static->ca_pFastBucket);
if (repeater_installed()==FALSE) {
ca_spawn_repeater();
}
return ECA_NORMAL;
}
@@ -580,17 +579,18 @@ LOCAL void create_udp_fd()
4096,
(FUNCPTR)cac_recv_task,
(int)taskIdCurrent,
NULL,
NULL,
NULL,
NULL,
NULL,
NULL,
NULL,
NULL,
NULL);
if(status<0)
0,
0,
0,
0,
0,
0,
0,
0,
0);
if (status<0) {
ca_signal(ECA_INTERNAL,NULL);
}
ca_static->recv_tid = status;
@@ -716,7 +716,10 @@ int APIENTRY ca_modify_user_name(char *pClientName)
*/
int APIENTRY ca_task_exit (void)
{
ca_process_exit(ca_static);
/*
* This indirectly calls ca_process_exit() below
*/
cac_os_depen_exit(ca_static);
return ECA_NORMAL;
}
@@ -726,13 +729,13 @@ int APIENTRY ca_task_exit (void)
/*
*
* CA_TASK_EXIT_TID() / CA_PROCESS_EXIT()
* attempts to release all resources alloc to a channel access client
*
* NOTE: on vxWorks if a CA task is deleted or crashes while a
* lock is set then a deadlock will occur when this routine is called.
* releases all resources alloc to a channel access client
*
* On multi thread os it is assumed that all threads are
* before calling this routine.
*
*/
void ca_process_exit(struct ca_static *ca_temp)
void ca_process_exit()
{
chid chix;
chid chixNext;
@@ -741,124 +744,127 @@ void ca_process_exit(struct ca_static *ca_temp)
IIU *piiu;
int status;
if (ca_temp) {
assert(ca_static);
/*
* after activity eliminated
* close all sockets before clearing chid blocks and remote
* event blocks
*/
piiu = (struct ioc_in_use *)
ca_temp->ca_iiuList.node.next;
while(piiu){
if(ca_temp->ca_fd_register_func){
(*ca_temp->ca_fd_register_func)(
ca_temp->ca_fd_register_arg,
piiu->sock_chan,
FALSE);
}
if (socket_close(piiu->sock_chan) < 0){
ca_signal(
ECA_INTERNAL,
"Corrupt iiu list- at close");
}
piiu = (struct ioc_in_use *) piiu->node.next;
LOCK;
/*
* after activity eliminated
* close all sockets before clearing chid blocks and remote
* event blocks
*/
piiu = (struct ioc_in_use *)
ca_static->ca_iiuList.node.next;
while(piiu){
if(ca_static->ca_fd_register_func){
(*ca_static->ca_fd_register_func)(
ca_static->ca_fd_register_arg,
piiu->sock_chan,
FALSE);
}
/*
* remove remote chid blocks and event blocks
*/
piiu = (struct ioc_in_use *)
ca_temp->ca_iiuList.node.next;
while(piiu){
chix = (chid) ellFirst(&piiu->chidlist);
while (chix) {
chixNext = (chid) ellNext (&chix->node);
clearChannelResources (ca_temp, chix->cid);
chix = chixNext;
}
/*
* free message body cache
*/
if(piiu->pCurData){
free(piiu->pCurData);
piiu->pCurData = NULL;
piiu->curDataMax = 0;
}
/*
* free address list
*/
ellFree(&piiu->destAddr);
piiu = (struct ioc_in_use *) piiu->node.next;
if (socket_close(piiu->sock_chan) < 0){
ca_signal(
ECA_INTERNAL,
"Corrupt iiu list- at close");
}
/* remove any pending read blocks */
monix = (evid) ellFirst(&ca_temp->ca_pend_read_list);
while (monix) {
monixNext = (evid) ellNext (&monix->node);
caIOBlockFree (ca_temp, monix);
monix = monixNext;
}
/* remove any pending write blocks */
monix = (evid) ellFirst(&ca_temp->ca_pend_write_list);
while (monix) {
monixNext = (evid) ellNext (&monix->node);
caIOBlockFree (ca_temp, monix);
monix = monixNext;
}
/* remove any pending io event blocks */
ellFree(&ca_temp->ca_ioeventlist);
/* remove put convert block free list */
ellFree(&ca_temp->putCvrtBuf);
/* reclaim sync group resources */
ca_sg_shutdown(ca_temp);
/* remove remote waiting ev blocks */
ellFree(&ca_temp->ca_free_event_list);
/*
* remove IOCs in use
*/
ellFree(&ca_temp->ca_iiuList);
/*
* free user name string
*/
if(ca_temp->ca_pUserName){
free(ca_temp->ca_pUserName);
}
/*
* free host name string
*/
if(ca_temp->ca_pHostName){
free(ca_temp->ca_pHostName);
}
/*
* free hash tables
*/
status = bucketFree(ca_temp->ca_pSlowBucket);
assert(status == BUCKET_SUCCESS);
status = bucketFree(ca_temp->ca_pFastBucket);
assert(status == BUCKET_SUCCESS);
/*
* free beacon hash table
*/
freeBeaconHash(ca_temp);
free((char *)ca_temp);
ca_static = (struct ca_static *) NULL;
piiu = (struct ioc_in_use *) piiu->node.next;
}
/*
* remove remote chid blocks and event blocks
*/
piiu = (struct ioc_in_use *)
ca_static->ca_iiuList.node.next;
while(piiu){
chix = (chid) ellFirst(&piiu->chidlist);
while (chix) {
chixNext = (chid) ellNext (&chix->node);
clearChannelResources (chix->cid);
chix = chixNext;
}
/*
* free message body cache
*/
if(piiu->pCurData){
free(piiu->pCurData);
piiu->pCurData = NULL;
piiu->curDataMax = 0;
}
/*
* free address list
*/
ellFree(&piiu->destAddr);
piiu = (struct ioc_in_use *) piiu->node.next;
}
/* remove any pending read blocks */
monix = (evid) ellFirst(&ca_static->ca_pend_read_list);
while (monix) {
monixNext = (evid) ellNext (&monix->node);
caIOBlockFree (monix);
monix = monixNext;
}
/* remove any pending write blocks */
monix = (evid) ellFirst(&ca_static->ca_pend_write_list);
while (monix) {
monixNext = (evid) ellNext (&monix->node);
caIOBlockFree (monix);
monix = monixNext;
}
/* remove any pending io event blocks */
ellFree(&ca_static->ca_ioeventlist);
/* remove put convert block free list */
ellFree(&ca_static->putCvrtBuf);
/* reclaim sync group resources */
ca_sg_shutdown(ca_static);
/* remove remote waiting ev blocks */
ellFree(&ca_static->ca_free_event_list);
/* free select context lists */
ellFree(&ca_static->fdInfoFreeList);
ellFree(&ca_static->fdInfoList);
/*
* remove IOCs in use
*/
ellFree(&ca_static->ca_iiuList);
/*
* free user name string
*/
if(ca_static->ca_pUserName){
free(ca_static->ca_pUserName);
}
/*
* free host name string
*/
if(ca_static->ca_pHostName){
free(ca_static->ca_pHostName);
}
/*
* free hash tables
*/
status = bucketFree(ca_static->ca_pSlowBucket);
assert(status == BUCKET_SUCCESS);
status = bucketFree(ca_static->ca_pFastBucket);
assert(status == BUCKET_SUCCESS);
/*
* free beacon hash table
*/
freeBeaconHash(ca_static);
UNLOCK;
}
@@ -1222,7 +1228,7 @@ void *arg
ev.chan = chix;
ev.type = type;
ev.count = count;
ca_event_handler(&ev, chix->id.paddr, NULL, NULL);
ca_event_handler(&ev, chix->id.paddr, 0, NULL);
return ECA_NORMAL;
}
#endif
@@ -1290,13 +1296,13 @@ LOCAL evid caIOBlockCreate(void)
/*
* caIOBlockFree()
*/
void caIOBlockFree(struct ca_static *pCAC, evid pIOBlock)
void caIOBlockFree(evid pIOBlock)
{
int status;
LOCK;
status = bucketRemoveItemUnsignedId(
pCAC->ca_pFastBucket,
ca_static->ca_pFastBucket,
&pIOBlock->id);
assert (status == BUCKET_SUCCESS);
pIOBlock->id = ~0U; /* this id always invalid */
@@ -1494,7 +1500,7 @@ void *usrarg
pvalue);
if(status != ECA_NORMAL){
if(chix->piiu){
caIOBlockFree(ca_static, monix);
caIOBlockFree(monix);
}
return status;
}
@@ -2458,7 +2464,7 @@ int APIENTRY ca_clear_channel (chid chix)
*/
if(old_chan_state != cs_conn){
UNLOCK;
clearChannelResources (ca_static, chix->cid);
clearChannelResources (chix->cid);
return ECA_NORMAL;
}
@@ -2491,7 +2497,7 @@ int APIENTRY ca_clear_channel (chid chix)
/*
* clearChannelResources()
*/
void clearChannelResources(struct ca_static *pCAC, unsigned id)
void clearChannelResources(unsigned id)
{
struct ioc_in_use *piiu;
chid chix;
@@ -2501,7 +2507,7 @@ void clearChannelResources(struct ca_static *pCAC, unsigned id)
LOCK;
chix = bucketLookupItemUnsignedId(pCAC->ca_pSlowBucket, &id);
chix = bucketLookupItemUnsignedId(ca_static->ca_pSlowBucket, &id);
assert ( chix!=NULL );
piiu = chix->piiu;
@@ -2510,15 +2516,15 @@ void clearChannelResources(struct ca_static *pCAC, unsigned id)
* remove any orphaned get callbacks for this
* channel
*/
for (monix = (evid) ellFirst (&pCAC->ca_pend_read_list);
for (monix = (evid) ellFirst (&ca_static->ca_pend_read_list);
monix;
monix = next) {
next = (evid) ellNext (&monix->node);
if (monix->chan == chix) {
ellDelete (
&pCAC->ca_pend_read_list,
&ca_static->ca_pend_read_list,
&monix->node);
caIOBlockFree (pCAC, monix);
caIOBlockFree (monix);
}
}
for (monix = (evid) ellFirst (&chix->eventq);
@@ -2526,11 +2532,11 @@ void clearChannelResources(struct ca_static *pCAC, unsigned id)
monix = next){
assert (monix->chan == chix);
next = (evid) ellNext (&monix->node);
caIOBlockFree(pCAC, monix);
caIOBlockFree(monix);
}
ellDelete (&piiu->chidlist, &chix->node);
status = bucketRemoveItemUnsignedId (
pCAC->ca_pSlowBucket, &chix->cid);
ca_static->ca_pSlowBucket, &chix->cid);
assert (status == BUCKET_SUCCESS);
free (chix);
if (!piiu->chidlist.count){

View File

@@ -471,7 +471,10 @@ int doacctst(char *pname)
SEVCHK(ca_modify_user_name("Willma"), NULL);
SEVCHK(ca_modify_host_name("Bed Rock"), NULL);
assert(conn_get_cb_count == 3);
if (conn_get_cb_count != 3){
printf ("!!!! Connect cb count = %d expected = 3 !!!!\n",
conn_get_cb_count);
}
printf("-- Put/Gets done- waiting for Events --\n");
status = ca_pend_event(10.0);

View File

@@ -36,6 +36,9 @@
/*
* cac_select_io()
*
* NOTE: on multithreaded systems this assumes that the
* local implementation of select is reentrant
*/
int cac_select_io(struct timeval *ptimeout, int flags)
{
@@ -43,15 +46,31 @@ int cac_select_io(struct timeval *ptimeout, int flags)
IIU *piiu;
unsigned long freespace;
int maxfd;
caFDInfo *pfdi;
LOCK;
pfdi = (caFDInfo *) ellGet(&ca_static->fdInfoFreeList);
if (!pfdi) {
pfdi = (caFDInfo *) calloc (1, sizeof(*pfdi));
if (!pfdi) {
ca_printf("CAC: no mem for select ctx?\n");
UNLOCK;
return -1;
}
}
ellAdd (&ca_static->fdInfoList, &pfdi->node);
UNLOCK;
FD_ZERO (&pfdi->readMask);
FD_ZERO (&pfdi->writeMask);
maxfd = 0;
for( piiu=(IIU *)iiuList.node.next;
for( piiu = (IIU *) iiuList.node.next;
piiu;
piiu=(IIU *)piiu->node.next){
piiu = (IIU *) piiu->node.next) {
if(!piiu->conn_up){
if (!piiu->conn_up) {
continue;
}
@@ -60,21 +79,23 @@ int cac_select_io(struct timeval *ptimeout, int flags)
* space for the maximum UDP message
*/
if (flags&CA_DO_RECVS) {
freespace = cacRingBufferWriteSize(&piiu->recv, TRUE);
if(freespace>=piiu->minfreespace){
maxfd = max(maxfd,piiu->sock_chan);
FD_SET(piiu->sock_chan,&readch);
freespace = cacRingBufferWriteSize (&piiu->recv, TRUE);
if (freespace>=piiu->minfreespace) {
maxfd = max (maxfd,piiu->sock_chan);
FD_SET (piiu->sock_chan, &pfdi->readMask);
}
}
if (flags&CA_DO_SENDS) {
if(cacRingBufferReadSize(&piiu->send, FALSE)>0){
maxfd = max(maxfd,piiu->sock_chan);
FD_SET(piiu->sock_chan,&writech);
if (cacRingBufferReadSize(&piiu->send, FALSE)>0) {
maxfd = max (maxfd,piiu->sock_chan);
FD_SET (piiu->sock_chan, &pfdi->writeMask);
}
}
}
UNLOCK;
pfdi->writeSave = pfdi->writeMask;
pfdi->readSave = pfdi->readMask;
#if 0
printf( "max fd=%d tv_usec=%d tv_sec=%d\n",
@@ -84,52 +105,64 @@ printf( "max fd=%d tv_usec=%d tv_sec=%d\n",
#endif
status = select(
maxfd+1,
&readch,
&writech,
&pfdi->readMask,
&pfdi->writeMask,
NULL,
ptimeout);
#if 0
printf("leaving select stat=%d errno=%d \n", status, MYERRNO);
#endif
if(status<0){
if(MYERRNO == EINTR){
if (status<0) {
if (MYERRNO == EINTR) {
}
else if(MYERRNO == EWOULDBLOCK){
else if (MYERRNO == EWOULDBLOCK) {
ca_printf("CAC: blocked at select ?\n");
}
else{
ca_printf(
else if (MYERRNO == ESRCH) {
}
else {
ca_printf (
"CAC: unexpected select fail: %s\n",
strerror(MYERRNO));
return status;
}
}
LOCK;
if(status>0){
for( piiu=(IIU *)iiuList.node.next;
LOCK;
if (status>0) {
for ( piiu = (IIU *) iiuList.node.next;
piiu;
piiu=(IIU *)piiu->node.next){
piiu = (IIU *) piiu->node.next) {
if(!piiu->conn_up){
if (!piiu->conn_up) {
continue;
}
if(flags&CA_DO_SENDS &&
FD_ISSET(piiu->sock_chan,&writech)){
if (FD_ISSET(piiu->sock_chan,&pfdi->writeMask)) {
(*piiu->sendBytes)(piiu);
}
if(flags&CA_DO_RECVS &&
FD_ISSET(piiu->sock_chan,&readch)){
#if 0
else{
if (FD_ISSET(piiu->sock_chan, &pfdi->writeSave)) {
if(FD_ISSET(piiu->sock_chan, &pfdi->readSave) &&
!FD_ISSET(piiu->sock_chan,&pfdi->readMask)) {
printf("Still waiting to send on %d with recv empty\n", piiu->sock_chan);
}
if(!FD_ISSET(piiu->sock_chan, &pfdi->readSave)){
printf("Still waiting to send on %d with no recv wait?\n", piiu->sock_chan);
}
}
}
#endif
if (FD_ISSET(piiu->sock_chan,&pfdi->readMask)) {
(*piiu->recvBytes)(piiu);
}
FD_CLR(piiu->sock_chan,&readch);
FD_CLR(piiu->sock_chan,&writech);
}
}
UNLOCK;
ellDelete (&ca_static->fdInfoList, &pfdi->node);
ellAdd (&ca_static->fdInfoFreeList, &pfdi->node);
UNLOCK;
return status;
}

View File

@@ -70,6 +70,9 @@ void manage_conn(int silent)
ca_real delay;
unsigned long idelay;
/*
* prevent recursion
*/
if(ca_static->ca_manage_conn_active){
return;
}
@@ -80,6 +83,7 @@ void manage_conn(int silent)
/*
* issue connection heartbeat
* (if we dont see a beacon)
*/
LOCK;
for( piiu = (IIU *) iiuList.node.next;
@@ -98,7 +102,7 @@ void manage_conn(int silent)
delay = cac_time_diff (
&current,
&piiu->timeAtSendBlock);
if(delay > CA_RETRY_PERIOD){
if(delay > CA_CONN_VERIFY_PERIOD){
TAG_CONN_DOWN(piiu);
continue;
}
@@ -174,6 +178,7 @@ void manage_conn(int silent)
* Stop here if there are not any disconnected channels
*/
if(!piiuCast) {
ca_static->ca_manage_conn_active = FALSE;
return;
}
if (piiuCast->chidlist.count == 0) {
@@ -183,10 +188,7 @@ void manage_conn(int silent)
if(ca_static->ca_conn_next_retry.tv_sec == CA_CURRENT_TIME.tv_sec &&
ca_static->ca_conn_next_retry.tv_usec == CA_CURRENT_TIME.tv_usec){
ca_static->ca_conn_next_retry =
cac_time_sum (
&current,
&ca_static->ca_conn_retry_delay);
ca_static->ca_conn_next_retry = current;
LOGRETRYINTERVAL
}

View File

@@ -40,6 +40,14 @@ static char *sccsId = "@(#) $Id$";
#include "iocinf.h"
/*
* Dont use ca_static based lock macros here because this is
* also called by the server. All locks required are applied at
* a higher level.
*/
#undef LOCK
#undef UNLOCK
/*
* local_addr()
@@ -140,6 +148,9 @@ int local_addr(int s, struct sockaddr_in *plcladdr)
*
* Load the list with the broadcast address for all
* interfaces found that support broadcast.
*
* LOCK should be applied here for (pList)
* (this is also called from the server)
*/
void caDiscoverInterfaces(ELLLIST *pList, int socket, int port)
{
@@ -255,10 +266,10 @@ void caDiscoverInterfaces(ELLLIST *pList, int socket, int port)
pNode->destAddr.inetAddr.sin_port = htons(port);
pNode->srcAddr.inetAddr = localAddr;
LOCK;
/*
* LOCK applied externally
*/
ellAdd(pList, &pNode->node);
UNLOCK;
}
free(pIfreqList);

View File

@@ -425,10 +425,16 @@ int net_proto
ca_signal(ECA_INTERNAL,"bind failed");
}
/*
* LOCK is for piiu->destAddr list
* (lock outside because this is used by the server also)
*/
LOCK;
caDiscoverInterfaces(
&piiu->destAddr,
sock,
CA_SERVER_PORT);
UNLOCK;
caAddConfiguredAddr(
&piiu->destAddr,
@@ -437,7 +443,8 @@ int net_proto
CA_SERVER_PORT);
cacRingBufferInit(&piiu->recv, sizeof(piiu->send.buf));
cacRingBufferInit(&piiu->send, min(MAX_UDP, sizeof(piiu->send.buf)));
cacRingBufferInit(&piiu->send, min(MAX_UDP,
sizeof(piiu->send.buf)));
strncpy(
piiu->host_name_str,
@@ -462,18 +469,6 @@ int net_proto
UNLOCKEVENTS;
}
/*
* setup the recv thread
* (OS dependent)
*/
status = cac_setup_recv_thread(piiu);
if(status != ECA_NORMAL){
free(piiu);
status = socket_close(sock);
return status;
}
/*
* add to the list of active IOCs
*/
@@ -1093,7 +1088,7 @@ LOCAL void ca_process_udp(struct ioc_in_use *piiu)
*
*
*/
void close_ioc(struct ioc_in_use *piiu)
void close_ioc (struct ioc_in_use *piiu)
{
caAddrNode *pNode;
chid chix;
@@ -1224,7 +1219,7 @@ void close_ioc(struct ioc_in_use *piiu)
*
* NOTE: potential race condition here can result
* in two copies of the repeater being spawned
* however the repeater detectes this, prints a message,
* however the repeater detects this, prints a message,
* and lets the other task start the repeater.
*
* QUESTION: is there a better way to test for a port in use?

View File

@@ -257,8 +257,6 @@ typedef struct caclient_put_notify{
#define pFastBucket (ca_static->ca_pFastBucket)
#define nextSlowBucketId (ca_static->ca_nextSlowBucketId)
#define nextFastBucketId (ca_static->ca_nextFastBucketId)
#define readch (ca_static->ca_readch)
#define writech (ca_static->ca_writech)
#if defined(vxWorks)
# define io_done_sem (ca_static->ca_io_done_sem)
@@ -370,6 +368,18 @@ typedef struct beaconHashEntry{
ca_real averagePeriod;
}bhe;
/*
* This struct allocated off of a free list
* so that the select() ctx is thread safe
*/
typedef struct {
ELLNODE node;
fd_set readMask;
fd_set writeMask;
fd_set writeSave;
fd_set readSave;
}caFDInfo;
struct ca_static{
ELLLIST ca_iiuList;
ELLLIST ca_ioeventlist;
@@ -381,20 +391,23 @@ struct ca_static{
ELLLIST activeCASGOP;
ELLLIST freeCASGOP;
ELLLIST putCvrtBuf;
ELLLIST fdInfoFreeList;
ELLLIST fdInfoList;
ca_time ca_conn_next_retry;
ca_time ca_conn_retry_delay;
ca_time ca_last_repeater_try;
fd_set ca_readch;
fd_set ca_writech;
long ca_pndrecvcnt;
unsigned long ca_nextSlowBucketId;
unsigned long ca_nextFastBucketId;
IIU *ca_piiuCast;
void (*ca_exception_func)();
void (*ca_exception_func)
(struct exception_handler_args);
void *ca_exception_arg;
void (*ca_connection_func)();
void (*ca_connection_func)
(struct connection_handler_args);
void *ca_connection_arg;
void (*ca_fd_register_func)();
void (*ca_fd_register_func)
(void *, SOCKET, int);
void *ca_fd_register_arg;
char *ca_pUserName;
char *ca_pHostName;
@@ -473,9 +486,9 @@ struct ca_static *ca_static;
*
*/
void cac_send_msg();
void cac_send_msg(void);
void cac_mux_io(struct timeval *ptimeout);
int repeater_installed();
int repeater_installed(void);
int search_msg(chid chix, int reply_type);
int ca_request_event(evid monix);
void ca_busy_message(struct ioc_in_use *piiu);
@@ -491,7 +504,7 @@ void manage_conn(int silent);
void mark_server_available(struct in_addr *pnet_addr);
void flow_control(struct ioc_in_use *piiu);
int broadcast_addr(struct in_addr *pcastaddr);
void ca_repeater();
void ca_repeater(void);
void cac_recv_task(int tid);
void cac_io_done(int lock);
void ca_sg_init(void);
@@ -509,8 +522,8 @@ int post_msg(
unsigned long blockSize
);
int alloc_ioc(
struct in_addr *pnet_addr,
struct ioc_in_use **ppiiu
struct in_addr *pnet_addr,
struct ioc_in_use **ppiiu
);
unsigned long cacRingBufferWrite(
struct ca_buffer *pRing,
@@ -530,9 +543,9 @@ unsigned long cacRingBufferReadSize(
struct ca_buffer *pBuf,
int contiguous);
char *localUserName();
char *localUserName(void);
char *localHostName();
char *localHostName(void);
int create_net_chan(
struct ioc_in_use **ppiiu,
@@ -540,36 +553,37 @@ struct in_addr *pnet_addr, /* only used by TCP connections */
int net_proto
);
int ca_check_for_fp();
int ca_check_for_fp(void);
int ca_os_independent_init (void);
void freeBeaconHash(struct ca_static *ca_temp);
void removeBeaconInetAddr(struct in_addr *pnet_addr);
bhe *lookupBeaconInetAddr(struct in_addr *pnet_addr);
bhe *createBeaconHashEntry(struct in_addr *pnet_addr);
int cac_setup_recv_thread(IIU *piiu);
void close_ioc(IIU *piiu);
void notify_ca_repeater();
void cac_clean_iiu_list();
void notify_ca_repeater(void);
void cac_clean_iiu_list(void);
void ca_process_input_queue();
void cac_flush_internal();
void ca_process_input_queue(void);
void cac_flush_internal(void);
void cac_block_for_io_completion(struct timeval *pTV);
void cac_block_for_sg_completion(CASG *pcasg, struct timeval *pTV);
void os_specific_sg_create(CASG *pcasg);
void os_specific_sg_delete(CASG *pcasg);
void os_specific_sg_io_complete(CASG *pcasg);
int cac_os_depen_init(struct ca_static *pcas);
void ca_process_exit(struct ca_static *ca_temp);
int cac_add_task_variable(struct ca_static *);
void ca_spawn_repeater();
void cac_os_depen_exit (struct ca_static *pcas);
void ca_process_exit();
void ca_spawn_repeater(void);
typedef void CACVRTFUNC(void *pSrc, void *pDest, int hton, unsigned long count);
void cac_gettimeval(struct timeval *pt);
/* returns A - B in floating secs */
ca_real cac_time_diff(ca_time *pTVA, ca_time *pTVB);
/* returns A + B in integer secs & integer usec */
ca_time cac_time_sum(ca_time *pTVA, ca_time *pTVB);
void caIOBlockFree(struct ca_static *pCAC, evid pIOBlock);
void clearChannelResources(struct ca_static *pCAC, unsigned id);
void caIOBlockFree(evid pIOBlock);
void clearChannelResources(unsigned id);
/*
* !!KLUDGE!!

View File

@@ -81,6 +81,11 @@ void cac_mux_io(struct timeval *ptimeout)
timeout = *ptimeout;
do{
/*
* manage search timers and detect disconnects
*/
manage_conn(TRUE);
newInput = FALSE;
do{
count = cac_select_io(
@@ -96,10 +101,6 @@ void cac_mux_io(struct timeval *ptimeout)
ca_process_input_queue();
/*
* manage search timers and detect disconnects
*/
manage_conn(TRUE);
}
while(newInput);
@@ -171,7 +172,9 @@ int cac_add_task_variable(struct ca_static *ca_temp)
*/
int cac_os_depen_init(struct ca_static *pcas)
{
int status;
int status;
ca_static = pcas;
/*
* dont allow disconnect to terminate process
@@ -182,7 +185,22 @@ int cac_os_depen_init(struct ca_static *pcas)
*/
signal(SIGPIPE,SIG_IGN);
return ECA_NORMAL;
status = ca_os_independent_init ();
return status;
}
/*
* cac_os_depen_exit ()
*/
void cac_os_depen_exit (struct ca_static *pcas)
{
ca_static = pcas;
ca_process_exit();
ca_static = NULL;
free ((char *)pcas);
}
@@ -264,17 +282,6 @@ void ca_spawn_repeater()
}
/*
* Setup recv thread
* (OS dependent)
*/
int cac_setup_recv_thread(IIU *piiu)
{
return ECA_NORMAL;
}
/*
* ca_printf()

View File

@@ -285,7 +285,7 @@ struct in_addr *pnet_addr
ellDelete(&pend_write_list, &monix->node);
UNLOCK;
caIOBlockFree(ca_static, monix);
caIOBlockFree(monix);
break;
@@ -357,7 +357,7 @@ struct in_addr *pnet_addr
LOCK;
ellDelete(&pend_read_list, &monix->node);
UNLOCK;
caIOBlockFree(ca_static, monix);
caIOBlockFree(monix);
break;
}
@@ -389,7 +389,7 @@ struct in_addr *pnet_addr
LOCK;
ellDelete(&monix->chan->eventq, &monix->node);
UNLOCK;
caIOBlockFree(ca_static, monix);
caIOBlockFree(monix);
break;
}
@@ -493,7 +493,7 @@ struct in_addr *pnet_addr
LOCK;
ellDelete(&pend_read_list, &pIOBlock->node);
UNLOCK;
caIOBlockFree(ca_static, pIOBlock);
caIOBlockFree(pIOBlock);
break;
}
case IOC_SEARCH:
@@ -526,7 +526,7 @@ struct in_addr *pnet_addr
break;
case IOC_CLEAR_CHANNEL:
clearChannelResources (ca_static, piiu->curMsg.m_available);
clearChannelResources (piiu->curMsg.m_available);
break;
case IOC_ERROR:
@@ -623,7 +623,7 @@ struct in_addr *pnet_addr
}
if (monix) {
caIOBlockFree(ca_static, monix);
caIOBlockFree(monix);
}
LOCK;

View File

@@ -84,6 +84,11 @@ void cac_mux_io(struct timeval *ptimeout)
timeout = *ptimeout;
do{
/*
* manage search timers and detect disconnects
*/
manage_conn(TRUE);
newInput = FALSE;
do{
count = cac_select_io(
@@ -99,10 +104,6 @@ void cac_mux_io(struct timeval *ptimeout)
ca_process_input_queue();
/*
* manage search timers and detect disconnects
*/
manage_conn(TRUE);
}
while(newInput);
@@ -151,16 +152,6 @@ void cac_block_for_sg_completion(pTV)
cac_mux_io(pTV);
}
/*
* CAC_ADD_TASK_VARIABLE()
*/
int cac_add_task_variable(struct ca_static *ca_temp)
{
ca_static = ca_temp;
return ECA_NORMAL;
}
/*
@@ -168,7 +159,26 @@ int cac_add_task_variable(struct ca_static *ca_temp)
*/
int cac_os_depen_init(struct ca_static *pcas)
{
return ECA_NORMAL;
int status;
ca_static = pcas;
status = ca_os_independent_init ();
return status;
}
/*
* cac_os_depen_exit ()
*/
void cac_os_depen_exit (struct ca_static *pcas)
{
ca_static = pcas;
ca_process_exit();
ca_static = NULL;
free ((char *)pcas);
}
@@ -298,12 +308,6 @@ void ca_spawn_repeater()
}
cac_setup_recv_thread(IIU *piiu)
{
return ECA_NORMAL;
}
/*
* caHostFromInetAddr()

View File

@@ -33,13 +33,16 @@
#include <stdarg.h>
#include <callback.h>
#include "iocinf.h"
#include "remLib.h"
LOCAL void ca_repeater_task();
LOCAL void ca_task_exit_tcb(WIND_TCB *ptcb);
LOCAL void ca_extra_event_labor(void *pArg);
LOCAL int cac_os_depen_exit(struct ca_static *pcas, int tid);
LOCAL int cac_os_depen_exit_tid (struct ca_static *pcas, int tid);
LOCAL int cac_add_task_variable (struct ca_static *ca_temp);
LOCAL void deleteCallBack(CALLBACK *pcb);
#define USEC_PER_SEC 1000000
@@ -105,7 +108,7 @@ void cac_mux_io(struct timeval *ptimeout)
do{
count = cac_select_io(
&timeout,
CA_DO_SENDS);
CA_DO_SENDS | CA_DO_RECVS);
timeout.tv_usec = 0;
timeout.tv_sec = 0;
}
@@ -199,7 +202,7 @@ void cac_block_for_sg_completion(CASG *pcasg, struct timeval *pTV)
/*
* CAC_ADD_TASK_VARIABLE()
*/
int cac_add_task_variable(struct ca_static *ca_temp)
LOCAL int cac_add_task_variable (struct ca_static *ca_temp)
{
static char ca_installed;
TVIU *ptviu;
@@ -210,7 +213,7 @@ int cac_add_task_variable(struct ca_static *ca_temp)
return status;
}
# if DEBUG
# ifdef DEBUG
ca_printf("CAC: adding task variable\n");
# endif
@@ -236,7 +239,7 @@ int cac_add_task_variable(struct ca_static *ca_temp)
* taskDeleteHookAdd() if you use a task variable
* in a task exit handler.
*/
# if DEBUG
# ifdef DEBUG
ca_printf("CAC: adding delete hook\n");
# endif
@@ -251,13 +254,12 @@ int cac_add_task_variable(struct ca_static *ca_temp)
}
}
ptviu = calloc(1, sizeof(*ptviu));
ptviu = (TVIU *) calloc(1, sizeof(*ptviu));
if(!ptviu){
return ECA_INTERNAL;
}
ptviu->tid = taskIdSelf();
ellAdd(&ca_temp->ca_taskVarList, &ptviu->node);
status = taskVarAdd(VXTHISTASKID, (int *)&ca_static);
if (status != OK){
@@ -266,6 +268,7 @@ int cac_add_task_variable(struct ca_static *ca_temp)
}
ca_static = ca_temp;
ellAdd(&ca_temp->ca_taskVarList, &ptviu->node);
return ECA_NORMAL;
}
@@ -277,9 +280,10 @@ int cac_add_task_variable(struct ca_static *ca_temp)
*/
LOCAL void ca_task_exit_tcb(WIND_TCB *ptcb)
{
int status;
struct ca_static *ca_temp;
# if DEBUG
# ifdef DEBUG
ca_printf("CAC: entering the exit handler %x\n", ptcb);
# endif
@@ -297,24 +301,28 @@ LOCAL void ca_task_exit_tcb(WIND_TCB *ptcb)
}
/*
* vxWorks specific shut down
* Add CA task var for the exit handler
*/
cac_os_depen_exit(ca_temp, (int) ptcb);
if (ptcb != taskIdCurrent) {
status = taskVarAdd (VXTHISTASKID, (int *)&ca_static);
if (status == ERROR){
ca_printf ("Couldnt add task var to CA exit task\n");
return;
}
}
/*
* normal CA sut down
* normal CA shut down
*/
ca_process_exit(ca_temp);
cac_os_depen_exit_tid (ca_temp, (int) ptcb);
/*
* remove semaphores here so that ca_process_exit()
* can use them.
*/
assert(semDelete(ca_temp->ca_client_lock)==OK);
assert(semDelete(ca_temp->ca_event_lock)==OK);
assert(semDelete(ca_temp->ca_putNotifyLock)==OK);
assert(semDelete(ca_temp->ca_io_done_sem)==OK);
assert(semDelete(ca_temp->ca_blockSem)==OK);
if (ptcb != taskIdCurrent) {
status = taskVarDelete(VXTHISTASKID, (int *)&ca_static);
if (status == ERROR){
ca_printf ("Couldnt remove task var from CA exit task\n");
return;
}
}
}
@@ -345,6 +353,16 @@ int cac_os_depen_init(struct ca_static *pcas)
pcas->ca_blockSem = semBCreate(SEM_Q_PRIORITY, SEM_EMPTY);
assert(pcas->ca_blockSem);
status = cac_add_task_variable (pcas);
if (status != ECA_NORMAL) {
return status;
}
status = ca_os_independent_init ();
if (status != ECA_NORMAL){
return status;
}
evuser = (void *) db_init_events();
assert(evuser);
@@ -371,26 +389,50 @@ int cac_os_depen_init(struct ca_static *pcas)
/*
* cac_os_depen_exit()
* cac_os_depen_exit ()
*/
LOCAL int cac_os_depen_exit(struct ca_static *pcas, int tid)
void cac_os_depen_exit (struct ca_static *pcas)
{
int status;
chid chix;
evid monix;
TVIU *ptviu;
cac_os_depen_exit_tid (pcas, 0);
}
/*
* cac_os_depen_exit_tid ()
*/
LOCAL int cac_os_depen_exit_tid (struct ca_static *pcas, int tid)
{
int status;
chid chix;
evid monix;
TVIU *ptviu;
CALLBACK *pcb;
# ifdef DEBUG
ca_printf("CAC: entering the exit routine %x %x\n",
tid, pcas);
# endif
ca_static = pcas;
LOCK;
/*
* stop the socket recv task
* (only after we get the LOCK here)
*/
if(taskIdVerify(pcas->recv_tid)==OK){
taskwdRemove(pcas->recv_tid);
if (taskIdVerify (pcas->recv_tid)==OK) {
taskwdRemove (pcas->recv_tid);
/*
* dont do a task suspend if the exit handler is
* running for this task - it botches vxWorks -
*/
if(pcas->recv_tid != tid){
taskSuspend(pcas->recv_tid);
if (pcas->recv_tid != tid) {
status = taskSuspend (pcas->recv_tid);
if (status<0) {
ca_printf ("taskSuspend() error = %s\n",
strerror (MYERRNO) );
}
}
}
@@ -399,24 +441,30 @@ LOCAL int cac_os_depen_exit(struct ca_static *pcas, int tid)
* (and put call backs)
*/
chix = (chid) & pcas->ca_local_chidlist.node;
while (chix = (chid) chix->node.next){
while (chix = (chid) chix->node.next) {
while (monix = (evid) ellGet(&chix->eventq)) {
status = db_cancel_event(monix + 1);
assert(status == OK);
free(monix);
}
if(chix->ppn){
if (chix->ppn) {
CACLIENTPUTNOTIFY *ppn;
ppn = chix->ppn;
if(ppn->busy){
dbNotifyCancel(&ppn->dbPutNotify);
if (ppn->busy) {
dbNotifyCancel (&ppn->dbPutNotify);
}
free(ppn);
free (ppn);
}
}
/*
* set ca_static for access.c
* (run this before deleting the task variable)
*/
ca_process_exit();
/*
* cancel task vars for other tasks so this
* only runs once
@@ -426,7 +474,11 @@ LOCAL int cac_os_depen_exit(struct ca_static *pcas, int tid)
*
* db_close_events() does not require a CA context.
*/
while(ptviu = (TVIU *)ellGet(&pcas->ca_taskVarList)){
while (ptviu = (TVIU *)ellGet(&pcas->ca_taskVarList)) {
# ifdef DEBUG
ca_printf("CAC: removing task var %x\n", ptviu->tid);
# endif
status = taskVarDelete(
ptviu->tid,
(int *)&ca_static);
@@ -438,9 +490,15 @@ LOCAL int cac_os_depen_exit(struct ca_static *pcas, int tid)
free(ptviu);
}
if(taskIdVerify(pcas->recv_tid)==OK){
if (taskIdVerify(pcas->recv_tid)==OK) {
if(pcas->recv_tid != tid){
taskDelete(pcas->recv_tid);
pcb = (CALLBACK *) calloc(1,sizeof(*pcb));
if (pcb) {
pcb->callback = deleteCallBack;
pcb->priority = priorityHigh;
pcb->user = (void *) pcas->recv_tid;
callbackRequest (pcb);
}
}
}
@@ -459,9 +517,40 @@ LOCAL int cac_os_depen_exit(struct ca_static *pcas, int tid)
ellFree(&pcas->ca_local_chidlist);
ellFree(&pcas->ca_dbfree_ev_list);
/*
* remove semaphores here so that ca_process_exit()
* can use them.
*/
assert(semDelete(pcas->ca_client_lock)==OK);
assert(semDelete(pcas->ca_event_lock)==OK);
assert(semDelete(pcas->ca_putNotifyLock)==OK);
assert(semDelete(pcas->ca_io_done_sem)==OK);
assert(semDelete(pcas->ca_blockSem)==OK);
ca_static = NULL;
free ((char *)pcas);
return ECA_NORMAL;
}
/*
* deleteCallBack()
*/
LOCAL void deleteCallBack(CALLBACK *pcb)
{
int status;
status = taskDelete ((int)pcb->user);
if (status < 0) {
ca_printf ("CAC: tak delete at exit failed: %s\n",
strerror(errno));
}
free (pcb);
}
/*
*
@@ -535,7 +624,7 @@ int ca_import(int tid)
return ECA_NORMAL;
}
ptviu = calloc(1, sizeof(*ptviu));
ptviu = (TVIU *) calloc(1, sizeof(*ptviu));
if(!ptviu){
return ECA_ALLOCMEM;
}
@@ -601,13 +690,11 @@ int ca_import_cancel(int tid)
*/
int ca_check_for_fp()
{
{
int options;
int options;
assert(taskOptionsGet(taskIdSelf(), &options) == OK);
if (!(options & VX_FP_TASK)) {
return ECA_NEEDSFP;
}
assert(taskOptionsGet(taskIdSelf(), &options) == OK);
if (!(options & VX_FP_TASK)) {
return ECA_NEEDSFP;
}
return ECA_NORMAL;
}
@@ -629,16 +716,16 @@ void ca_spawn_repeater()
CA_REPEATER_OPT,
CA_REPEATER_STACK,
(FUNCPTR)ca_repeater_task,
NULL,
NULL,
NULL,
NULL,
NULL,
NULL,
NULL,
NULL,
NULL,
NULL);
0,
0,
0,
0,
0,
0,
0,
0,
0,
0);
if (status < 0){
SEVCHK(ECA_NOREPEATER, NULL);
}
@@ -654,16 +741,6 @@ void ca_repeater_task()
ca_repeater();
}
/*
* Setup recv thread
* (OS dependent)
*/
int cac_setup_recv_thread(IIU *piiu)
{
return ECA_NORMAL;
}
/*
@@ -728,13 +805,13 @@ LOCAL void ca_extra_event_labor(void *pArg)
*/
status = semGive(pcas->ca_blockSem);
if(status != OK){
logMsg("CA block sem corrupted\n",
NULL,
NULL,
NULL,
NULL,
NULL,
NULL);
logMsg( "CA block sem corrupted\n",
0,
0,
0,
0,
0,
0);
}
}
@@ -760,6 +837,8 @@ void cac_recv_task(int tid)
* ca_task_exit() is called.
*/
while(TRUE){
manage_conn(TRUE);
timeout.tv_usec = 0;
timeout.tv_sec = 1;
@@ -770,8 +849,6 @@ void cac_recv_task(int tid)
CA_DO_RECVS);
ca_process_input_queue();
manage_conn(TRUE);
}
}

View File

@@ -79,7 +79,11 @@ void cac_mux_io(struct timeval *ptimeout)
timeout = *ptimeout;
do{
newInput = FALSE;
/*
* manage search timers and detect disconnects
*/
manage_conn(TRUE); newInput = FALSE;
do{
count = cac_select_io(
&timeout,
@@ -94,10 +98,6 @@ void cac_mux_io(struct timeval *ptimeout)
ca_process_input_queue();
/*
* manage search timers and detect disconnects
*/
manage_conn(TRUE);
}
while(newInput);
@@ -137,16 +137,6 @@ void cac_block_for_sg_completion(CASG *pcasg, struct timeval *pTV)
cac_mux_io(pTV);
}
/*
* CAC_ADD_TASK_VARIABLE()
*/
int cac_add_task_variable(struct ca_static *ca_temp)
{
ca_static = ca_temp;
return ECA_NORMAL;
}
/*
* cac_os_depen_init()
@@ -155,6 +145,8 @@ int cac_os_depen_init(struct ca_static *pcas)
{
int status;
ca_static = ca_temp;
/*
* dont allow disconnect to terminate process
* when running in UNIX enviroment
@@ -169,7 +161,22 @@ int cac_os_depen_init(struct ca_static *pcas)
assert (status==0);
# endif
return ECA_NORMAL;
status = ca_os_independent_init ();
return status;
}
/*
* cac_os_depen_exit ()
*/
void cac_os_depen_exit (struct ca_static *pcas)
{
ca_static = pcas;
ca_process_exit();
ca_static = NULL;
free ((char *)pcas);
}
@@ -223,16 +230,6 @@ void ca_spawn_repeater()
}
/*
* Setup recv thread
* (OS dependent)
*/
int cac_setup_recv_thread(IIU *piiu)
{
return ECA_NORMAL;
}
/*