diff --git a/src/ca/access.c b/src/ca/access.c index b4f23cbcb..66161b0e6 100644 --- a/src/ca/access.c +++ b/src/ca/access.c @@ -195,9 +195,7 @@ struct ioc_in_use *piiu, unsigned extsize, struct extmsg **ppMsg ); -LOCAL int ca_add_task_variable(struct ca_static *ca_temp); #ifdef vxWorks -LOCAL void ca_task_exit_tcb(WIND_TCB *ptcb); LOCAL void ca_task_exit_tid(int tid); #else /*vxWorks*/ LOCAL void ca_process_exit(); @@ -221,7 +219,6 @@ chid chix, void *pvalue ); LOCAL void ca_put_notify_action(PUTNOTIFY *ppn); -LOCAL void ca_extra_event_labor(void *pArg); LOCAL void ca_default_exception_handler(struct exception_handler_args args); LOCAL int cac_push_msg( @@ -235,9 +232,7 @@ LOCAL void cac_wait_for_flush(IIU *piiu); #else /*__STDC__*/ LOCAL int cac_alloc_msg(); -LOCAL int ca_add_task_variable(); #ifdef vxWorks -LOCAL void ca_task_exit_tcb(); LOCAL void ca_task_exit_tid(); #else LOCAL void ca_process_exit(); @@ -250,7 +245,6 @@ LOCAL void ca_default_exception_handler(); LOCAL void create_udp_fd(); LOCAL int issue_ca_array_put(); LOCAL void ca_put_notify_action(); -LOCAL void ca_extra_event_labor(); LOCAL int cac_push_msg(); LOCAL void cac_wait_for_flush(); @@ -506,17 +500,6 @@ int ca_task_initialize() return status; } -#ifdef UNIX - /* - * dont allow disconnect to terminate process - * when running in UNIX enviroment - * - * allow error to be returned to sendto() - * instead of handling disconnect at interrupt - */ - signal(SIGPIPE,SIG_IGN); -#endif /*UNIX*/ - if (repeater_installed()==FALSE) { ca_spawn_repeater(); } @@ -526,14 +509,14 @@ int ca_task_initialize() if (!ca_temp) return ECA_ALLOCMEM; -#ifdef vxWorks - if (ca_add_task_variable(ca_temp)<0){ + /* + * os dependent + */ + status = cac_add_task_variable(ca_temp); + if(status != ECA_NORMAL){ free(ca_temp); - return ECA_ALLOCMEM; + return status; } -#else /* vxWorks */ - ca_static = ca_temp; -#endif /* vxWorks */ ca_static->ca_exception_func = ca_default_exception_handler; ca_static->ca_exception_arg = NULL; @@ -568,69 +551,16 @@ int ca_task_initialize() ellInit(&ca_static->ca_free_event_list); ellInit(&ca_static->ca_pend_read_list); ellInit(&ca_static->ca_pend_write_list); -#ifdef vxWorks - ellInit(&ca_static->ca_local_chidlist); - ellInit(&ca_static->ca_dbfree_ev_list); - ellInit(&ca_static->ca_lcl_buff_list); - ellInit(&ca_static->ca_taskVarList); - ellInit(&ca_static->ca_putNotifyQue); -#endif /* vxWorks */ + ca_static->ca_pBucket = bucketCreate(CLIENT_ID_WIDTH); assert(ca_static->ca_pBucket); -#ifdef VMS - { - status = lib$get_ef(&io_done_flag); - if (status != SS$_NORMAL) - lib$signal(status); + status = cac_os_depen_init(ca_static); + if(status != ECA_NORMAL){ + free(ca_static->ca_pUserName); + free(ca_static); + return status; } -#endif /*VMS*/ -#ifdef vxWorks - { - char name[15]; - int status; - - ca_static->ca_tid = taskIdSelf(); - - ca_static->ca_local_ticks = LOCALTICKS; - - ca_static->ca_client_lock = semMCreate(SEM_DELETE_SAFE); - assert(ca_static->ca_client_lock); - ca_static->ca_event_lock = semMCreate(SEM_DELETE_SAFE); - assert(ca_static->ca_event_lock); - ca_static->ca_putNotifyLock = semMCreate(SEM_DELETE_SAFE); - assert(ca_static->ca_putNotifyLock); - ca_static->ca_io_done_sem = semBCreate(SEM_Q_PRIORITY, SEM_EMPTY); - assert(ca_static->ca_io_done_sem); - ca_static->ca_blockSem = - semBCreate(SEM_Q_PRIORITY, SEM_EMPTY); - assert(ca_static->ca_blockSem); - - evuser = (void *) db_init_events(); - assert(evuser); - - status = db_add_extra_labor_event( - evuser, - ca_extra_event_labor, - ca_static); - assert(status==0); - strcpy(name, "EV "); - strncat( - name, - taskName(VXTHISTASKID), - sizeof(name) - strlen(name) - 1); - status = db_start_events( - evuser, - name, - ca_import, - taskIdSelf(), - -1); /* higher priority */ - assert(status == OK); - } -#endif /*vxWorks*/ -#ifdef VMS - setupConnectionTimer(); -#endif } return ECA_NORMAL; } @@ -813,84 +743,6 @@ char *pClientName; } - -/* - * CA_ADD_TASK_VARIABLE() - * - */ -#ifdef vxWorks -#ifdef __STDC__ -LOCAL int ca_add_task_variable(struct ca_static *ca_temp) -#else -LOCAL int ca_add_task_variable(ca_temp) -struct ca_static *ca_temp; -#endif -{ - static char ca_installed; - TVIU *ptviu; - int status; - -# if DEBUG - ca_printf("CAC: adding task variable\n"); -# endif - - status = taskVarGet(VXTHISTASKID, (int *)&ca_static); - if(status == OK){ - ca_printf("task variable already installed?\n"); - return ERROR; - } - - /* - * only one delete hook for all CA tasks - */ - if (vxTas(&ca_installed)) { - /* - * - * This guarantees that vxWorks's task - * variable delete (at task exit) handler runs - * after the CA task exit handler. This ensures - * that CA's task variable will still exist - * when it's exit handler runs. - * - * That is taskVarInit() must run prior to your - * taskDeleteHookAdd() if you use a task variable - * in a task exit handler. - */ -# if DEBUG - ca_printf("CAC: adding delete hook\n"); -# endif - - status = taskVarInit(); - if (status != OK) - return ERROR; - status = taskDeleteHookAdd((FUNCPTR)ca_task_exit_tcb); - if (status != OK) { - ca_printf("ca_init_task: could not add CA delete routine\n"); - return ERROR; - } - } - - ptviu = calloc(1, sizeof(*ptviu)); - if(!ptviu){ - return ERROR; - } - - ptviu->tid = taskIdSelf(); - ellAdd(&ca_temp->ca_taskVarList, &ptviu->node); - - status = taskVarAdd(VXTHISTASKID, (int *)&ca_static); - if (status != OK){ - free(ptviu); - return ERROR; - } - - ca_static = ca_temp; - - return OK; -} -#endif - - /* * CA_TASK_EXIT() @@ -915,34 +767,6 @@ int ca_task_exit } - -/* - * CA_TASK_EXIT_TCBX() - * - */ -#ifdef vxWorks -#ifdef __STDC__ -LOCAL void ca_task_exit_tcb(WIND_TCB *ptcb) -#else -LOCAL void ca_task_exit_tcb(ptcb) -WIND_TCB *ptcb; -#endif -{ -# if DEBUG - ca_printf("CAC: entering the exit handler %x\n", ptcb); -# endif - - /* - * NOTE: vxWorks provides no method at this time - * to get the task id from the ptcb so I am - * taking the liberty of using the ptcb as - * the task id - somthing which may not be true - * on future releases of vxWorks - */ - ca_task_exit_tid((int) ptcb); -} -#endif - /* * @@ -1974,81 +1798,6 @@ LOCAL void ca_put_notify_action(PUTNOTIFY *ppn) } #endif /*vxWorks*/ - -/* - * CA_EXTRA_EVENT_LABOR - */ -#ifdef vxWorks -LOCAL void ca_extra_event_labor(void *pArg) -{ - int status; - CACLIENTPUTNOTIFY *ppnb; - struct ca_static *pcas; - struct event_handler_args args; - - pcas = pArg; - - while(TRUE){ - /* - * independent lock used here in order to - * avoid any possibility of blocking - * the database (or indirectly blocking - * one client on another client). - */ - semTake(pcas->ca_putNotifyLock, WAIT_FOREVER); - ppnb = (CACLIENTPUTNOTIFY *)ellGet(&pcas->ca_putNotifyQue); - semGive(pcas->ca_putNotifyLock); - - /* - * break to loop exit - */ - if(!ppnb){ - break; - } - - /* - * setup arguments and call user's function - */ - args.usr = ppnb->caUserArg; - args.chid = ppnb->dbPutNotify.usrPvt; - args.type = ppnb->dbPutNotify.dbrType; - args.count = ppnb->dbPutNotify.nRequest; - args.dbr = NULL; - if(ppnb->dbPutNotify.status){ - if(ppnb->dbPutNotify.status == S_db_Blocked){ - args.status = ECA_PUTCBINPROG; - } - else{ - args.status = ECA_PUTFAIL; - } - } - else{ - args.status = ECA_NORMAL; - } - - LOCKEVENTS; - (*ppnb->caUserCallback) (args); - UNLOCKEVENTS; - - ppnb->busy = FALSE; - } - - /* - * wakeup the TCP thread if it is waiting for a cb to complete - */ - status = semGive(pcas->ca_blockSem); - if(status != OK){ - logMsg("CA block sem corrupted\n", - NULL, - NULL, - NULL, - NULL, - NULL, - NULL); - } - -} -#endif /*vxWorks*/ /* @@ -3329,9 +3078,6 @@ int lineno; */ if( !(ca_status & CA_M_SUCCESS) && CA_EXTRACT_SEVERITY(ca_status) != CA_K_WARNING ){ -# ifdef VMS - lib$signal(0); -# endif abort(); } @@ -3666,11 +3412,7 @@ void *arg; */ int ca_defunct() { -#ifdef VMS - SEVCHK(ECA_DEFUNCT, "you must have a VMS share image mismatch\n"); -#else SEVCHK(ECA_DEFUNCT, NULL); -#endif return ECA_DEFUNCT; } diff --git a/src/ca/iocinf.c b/src/ca/iocinf.c index f8579b044..372f9dfa8 100644 --- a/src/ca/iocinf.c +++ b/src/ca/iocinf.c @@ -85,9 +85,6 @@ LOCAL void cac_tcp_send_msg_piiu(struct ioc_in_use *piiu); LOCAL void cac_udp_send_msg_piiu(struct ioc_in_use *piiu); LOCAL void notify_ca_repeater(); LOCAL void udp_recv_msg(struct ioc_in_use *piiu); -#ifdef VMS -LOCAL void vms_recv_msg_ast(struct ioc_in_use *piiu); -#endif /*VMS*/ LOCAL void ca_process_tcp(struct ioc_in_use *piiu); LOCAL void ca_process_udp(struct ioc_in_use *piiu); LOCAL void ca_process_input_queue(); @@ -103,9 +100,6 @@ LOCAL void cac_tcp_send_msg_piiu(); LOCAL void cac_udp_send_msg_piiu(); LOCAL void notify_ca_repeater(); LOCAL void udp_recv_msg(); -#ifdef VMS -void vms_recv_msg_ast(); -#endif /*VMS*/ LOCAL void ca_process_tcp(); LOCAL void ca_process_udp(); LOCAL void ca_process_input_queue(); @@ -498,31 +492,16 @@ int net_proto; } - /* Set up recv thread for VMS */ -#if defined(VMS) - { - /* - * request to be informed of future IO - */ - status = sys$qio( - NULL, - sock, - IO$_RECEIVE, - &piiu->iosb, - vms_recv_msg_ast, - piiu, - &peek_ast_buf, - sizeof(peek_ast_buf), - MSG_PEEK, - &piiu->recvfrom, - sizeof(piiu->recvfrom), - NULL); - if(status != SS$_NORMAL){ - lib$signal(status); - exit(); - } - } -#endif + /* + * setup the recv thread + * (OS dependent) + */ + status = cac_setup_recv_thread(piiu); + if(status != ECA_NORMAL){ + free(piiu); + status = socket_close(sock); + return status; + } /* * add to the list of active IOCs @@ -828,11 +807,6 @@ int flags; unsigned long minfreespace; unsigned long freespace; - /* - * manage search timers and detect disconnects - */ - manage_conn(TRUE); - LOCK; piiu=(IIU *)iiuList.node.next; while(piiu){ @@ -927,6 +901,11 @@ int flags; } UNLOCK; + /* + * manage search timers and detect disconnects + */ + manage_conn(TRUE); + return status; } #endif @@ -1301,66 +1280,6 @@ int tid; } #endif /*vxWorks*/ - -/* - * - * VMS_RECV_MSG_AST() - * - * - */ -#ifdef VMS -#ifdef __STDC__ -LOCAL void vms_recv_msg_ast(struct ioc_in_use *piiu) -#else /*__STDC__*/ -LOCAL void vms_recv_msg_ast(piiu) -struct ioc_in_use *piiu; -#endif /*__STDC__*/ -{ - short io_status; - - io_status = piiu->iosb.status; - - if(io_status != SS$_NORMAL){ - close_ioc(piiu); - if(io_status != SS$_CANCEL) - lib$signal(io_status); - return; - } - - if(!ca_static->ca_repeater_contacted) - notify_ca_repeater(); - - if(piiu->conn_up){ - (*piiu->recvBytes)(piiu); - ca_process_input_queue(); - } - else{ - close_ioc(piiu); - return; - } - - /* - * request to be informed of future IO - */ - io_status = sys$qio( - NULL, - piiu->sock_chan, - IO$_RECEIVE, - &piiu->iosb, - vms_recv_msg_ast, - piiu, - &peek_ast_buf, - sizeof(peek_ast_buf), - MSG_PEEK, - &piiu->recvfrom, - sizeof(piiu->recvfrom), - NULL); - if(io_status != SS$_NORMAL) - lib$signal(io_status); - return; -} -#endif /*VMS*/ - /* * @@ -1754,9 +1673,6 @@ int contiguous; count = 0; } -#if 0 - printf("%d bytes available for writing\n", count); -#endif return count; } diff --git a/src/ca/iocinf.h b/src/ca/iocinf.h index 77f2b8b7d..7cf2a7417 100644 --- a/src/ca/iocinf.h +++ b/src/ca/iocinf.h @@ -58,7 +58,6 @@ static char *iocinfhSccsId = "$Id$"; /* * ANSI C includes */ -#include #include #include #include @@ -524,6 +523,7 @@ void freeBeaconHash(struct ca_static *ca_temp); void removeBeaconInetAddr(struct in_addr *pnet_addr); bhe *lookupBeaconInetAddr(struct in_addr *pnet_addr); bhe *createBeaconHashEntry(struct in_addr *pnet_addr); +int cac_setup_recv_thread(IIU *piiu); #else /*__STDC__*/ int ca_defunct(); @@ -570,6 +570,7 @@ void freeBeaconHash(); void removeBeaconInetAddr(); bhe *lookupBeaconInetAddr(); bhe *createBeaconHashEntry(); +int cac_setup_recv_thread(); #endif /*__STDC__*/ /* diff --git a/src/ca/posix_depen.c b/src/ca/posix_depen.c index 1e91e691c..79ff55c08 100644 --- a/src/ca/posix_depen.c +++ b/src/ca/posix_depen.c @@ -44,6 +44,41 @@ #include "iocinf.h" + +/* + * CAC_ADD_TASK_VARIABLE() + */ +#ifdef __STDC__ +int cac_add_task_variable(struct ca_static *ca_temp) +#else /*__STDC__*/ +int cac_add_task_variable(ca_temp) +struct ca_static *ca_temp; +#endif /*__STDC__*/ +{ + ca_static = ca_temp; + return ECA_NORMAL; +} + + +/* + * cac_os_depen_init() + */ +int cac_os_depen_init(struct ca_static *pcas) +{ + int status; + + /* + * dont allow disconnect to terminate process + * when running in UNIX enviroment + * + * allow error to be returned to sendto() + * instead of handling disconnect at interrupt + */ + signal(SIGPIPE,SIG_IGN); + + return ECA_NORMAL; +} + /* * @@ -130,3 +165,14 @@ void ca_spawn_repeater() } exit(0); } + + + +/* + * Setup recv thread + * (OS dependent) + */ +int cac_setup_recv_thread(IIU *piiu) +{ + return ECA_NORMAL; +} diff --git a/src/ca/vms_depen.c b/src/ca/vms_depen.c index 9ea8c3f8c..03dd33b53 100644 --- a/src/ca/vms_depen.c +++ b/src/ca/vms_depen.c @@ -49,7 +49,8 @@ #define CONNECTION_TIMER_ID 56 -void connectionTimer(void *astarg); +LOCAL void connectionTimer(void *astarg); +LOCAL void vms_recv_msg_ast(struct ioc_in_use *piiu); struct time{ int lval; @@ -60,17 +61,36 @@ struct time timer = {-10000000,-1}; /* - * + * CAC_ADD_TASK_VARIABLE() */ -void setupConnectionTimer() +int cac_add_task_variable(struct ca_static *ca_temp) { - struct time tmo; - int status; + ca_static = ca_temp; + return ECA_NORMAL; +} + + + +/* + * cac_os_depen_init() + */ +int cac_os_depen_init(struct ca_static *pcas) +{ + int status; + + status = lib$get_ef(&pcas->ca_io_done_flag); + if (status != SS$_NORMAL){ + lib$signal(status); + return ECA_INTERNAL; + } status = sys$setimr(NULL, &timer, connectionTimer, CONNECTION_TIMER_ID, 0); assert(status == SS$_NORMAL); + + return ECA_NORMAL } + /* * connectionTimer() @@ -207,3 +227,90 @@ void ca_spawn_repeater() } } + + +cac_setup_recv_thread(IIU *piiu) +{ + + /* + * request to be informed of future IO + */ + status = sys$qio( + NULL, + piiu->sock_chan, + IO$_RECEIVE, + &piiu->iosb, + vms_recv_msg_ast, + piiu, + &peek_ast_buf, + sizeof(peek_ast_buf), + MSG_PEEK, + &piiu->recvfrom, + sizeof(piiu->recvfrom), + NULL); + if(status != SS$_NORMAL){ + lib$signal(status); + return ECA_INTERNAL; + } + + return ECA_NORMAL; +} + + +/* + * + * VMS_RECV_MSG_AST() + * + * + */ +#ifdef __STDC__ +LOCAL void vms_recv_msg_ast(struct ioc_in_use *piiu) +#else /*__STDC__*/ +LOCAL void vms_recv_msg_ast(piiu) +struct ioc_in_use *piiu; +#endif /*__STDC__*/ +{ + short io_status; + + io_status = piiu->iosb.status; + + if(io_status != SS$_NORMAL){ + close_ioc(piiu); + if(io_status != SS$_CANCEL) + lib$signal(io_status); + return; + } + + if(!ca_static->ca_repeater_contacted) + notify_ca_repeater(); + + if(piiu->conn_up){ + (*piiu->recvBytes)(piiu); + ca_process_input_queue(); + } + else{ + close_ioc(piiu); + return; + } + + /* + * request to be informed of future IO + */ + io_status = sys$qio( + NULL, + piiu->sock_chan, + IO$_RECEIVE, + &piiu->iosb, + vms_recv_msg_ast, + piiu, + &peek_ast_buf, + sizeof(peek_ast_buf), + MSG_PEEK, + &piiu->recvfrom, + sizeof(piiu->recvfrom), + NULL); + if(io_status != SS$_NORMAL) + lib$signal(io_status); + return; +} + diff --git a/src/ca/vxWorks_depen.c b/src/ca/vxWorks_depen.c index ad7ab6ab1..ad2061dfa 100644 --- a/src/ca/vxWorks_depen.c +++ b/src/ca/vxWorks_depen.c @@ -34,7 +34,152 @@ #include "iocinf.h" #include "remLib.h" -void ca_repeater_task(); +LOCAL void ca_repeater_task(); +LOCAL void ca_task_exit_tcb(WIND_TCB *ptcb); +LOCAL void ca_extra_event_labor(void *pArg); + + +/* + * CAC_ADD_TASK_VARIABLE() + */ +int cac_add_task_variable(struct ca_static *ca_temp) +{ + static char ca_installed; + TVIU *ptviu; + int status; + +# if DEBUG + ca_printf("CAC: adding task variable\n"); +# endif + + status = taskVarGet(VXTHISTASKID, (int *)&ca_static); + if(status == OK){ + ca_printf("task variable already installed?\n"); + return ECA_INTERNAL; + } + + /* + * only one delete hook for all CA tasks + */ + if (vxTas(&ca_installed)) { + /* + * + * This guarantees that vxWorks's task + * variable delete (at task exit) handler runs + * after the CA task exit handler. This ensures + * that CA's task variable will still exist + * when it's exit handler runs. + * + * That is taskVarInit() must run prior to your + * taskDeleteHookAdd() if you use a task variable + * in a task exit handler. + */ +# if DEBUG + ca_printf("CAC: adding delete hook\n"); +# endif + + status = taskVarInit(); + if (status != OK) + return ECA_INTERNAL; + status = taskDeleteHookAdd((FUNCPTR)ca_task_exit_tcb); + if (status != OK) { + ca_printf("ca_init_task: could not add CA delete routine\n" +); + return ECA_INTERNAL; + } + } + + ptviu = calloc(1, sizeof(*ptviu)); + if(!ptviu){ + return ECA_INTERNAL; + } + + ptviu->tid = taskIdSelf(); + ellAdd(&ca_temp->ca_taskVarList, &ptviu->node); + + status = taskVarAdd(VXTHISTASKID, (int *)&ca_static); + if (status != OK){ + free(ptviu); + return ECA_INTERNAL; + } + + ca_static = ca_temp; + + return ECA_NORMAL; +} + + +/* + * CA_TASK_EXIT_TCBX() + * + */ +LOCAL void ca_task_exit_tcb(WIND_TCB *ptcb) +{ +# if DEBUG + ca_printf("CAC: entering the exit handler %x\n", ptcb); +# endif + + /* + * NOTE: vxWorks provides no method at this time + * to get the task id from the ptcb so I am + * taking the liberty of using the ptcb as + * the task id - somthing which may not be true + * on future releases of vxWorks + */ + ca_task_exit_tid((int) ptcb); +} + + +/* + * cac_os_depen_init() + */ +int cac_os_depen_init(struct ca_static *pcas) +{ + char name[15]; + int status; + + ellInit(&pcas->ca_local_chidlist); + ellInit(&pcas->ca_dbfree_ev_list); + ellInit(&pcas->ca_lcl_buff_list); + ellInit(&pcas->ca_taskVarList); + ellInit(&pcas->ca_putNotifyQue); + + pcas->ca_tid = taskIdSelf(); + pcas->ca_local_ticks = LOCALTICKS; + pcas->ca_client_lock = semMCreate(SEM_DELETE_SAFE); + assert(pcas->ca_client_lock); + pcas->ca_event_lock = semMCreate(SEM_DELETE_SAFE); + assert(pcas->ca_event_lock); + pcas->ca_putNotifyLock = semMCreate(SEM_DELETE_SAFE); + assert(pcas->ca_putNotifyLock); + pcas->ca_io_done_sem = semBCreate(SEM_Q_PRIORITY, SEM_EMPTY); + assert(pcas->ca_io_done_sem); + pcas->ca_blockSem = semBCreate(SEM_Q_PRIORITY, SEM_EMPTY); + assert(pcas->ca_blockSem); + + evuser = (void *) db_init_events(); + assert(evuser); + + status = db_add_extra_labor_event( + evuser, + ca_extra_event_labor, + pcas); + assert(status==0); + strcpy(name, "EV "); + strncat( + name, + taskName(VXTHISTASKID), + sizeof(name) - strlen(name) - 1); + status = db_start_events( + evuser, + name, + ca_import, + taskIdSelf(), + -1); /* higher priority */ + assert(status == OK); + + return ECA_NORMAL; +} /* @@ -245,3 +390,88 @@ void ca_repeater_task() ca_repeater(); } + +/* + * Setup recv thread + * (OS dependent) + */ +int cac_setup_recv_thread(IIU *piiu) +{ + return ECA_NORMAL; +} + + + +/* + * CA_EXTRA_EVENT_LABOR + */ +LOCAL void ca_extra_event_labor(void *pArg) +{ + int status; + CACLIENTPUTNOTIFY *ppnb; + struct ca_static *pcas; + struct event_handler_args args; + + pcas = pArg; + + while(TRUE){ + /* + * independent lock used here in order to + * avoid any possibility of blocking + * the database (or indirectly blocking + * one client on another client). + */ + semTake(pcas->ca_putNotifyLock, WAIT_FOREVER); + ppnb = (CACLIENTPUTNOTIFY *)ellGet(&pcas->ca_putNotifyQue); + semGive(pcas->ca_putNotifyLock); + + /* + * break to loop exit + */ + if(!ppnb){ + break; + } + + /* + * setup arguments and call user's function + */ + args.usr = ppnb->caUserArg; + args.chid = ppnb->dbPutNotify.usrPvt; + args.type = ppnb->dbPutNotify.dbrType; + args.count = ppnb->dbPutNotify.nRequest; + args.dbr = NULL; + if(ppnb->dbPutNotify.status){ + if(ppnb->dbPutNotify.status == S_db_Blocked){ + args.status = ECA_PUTCBINPROG; + } + else{ + args.status = ECA_PUTFAIL; + } + } + else{ + args.status = ECA_NORMAL; + } + + LOCKEVENTS; + (*ppnb->caUserCallback) (args); + UNLOCKEVENTS; + + ppnb->busy = FALSE; + } + + /* + * wakeup the TCP thread if it is waiting for a cb to complete + */ + status = semGive(pcas->ca_blockSem); + if(status != OK){ + logMsg("CA block sem corrupted\n", + NULL, + NULL, + NULL, + NULL, + NULL, + NULL); + } + +} +