diff --git a/src/rsrv/camessage.c b/src/rsrv/camessage.c index ddf8fb481..7503f660e 100644 --- a/src/rsrv/camessage.c +++ b/src/rsrv/camessage.c @@ -62,9 +62,6 @@ typedef struct rsrv_put_notify { ELLNODE node; putNotify dbPutNotify; caHdrLargeArray msg; - unsigned valueSize; /* size of block pointed to by dbPutNotify */ - char busy; /* put notify in progress */ - char onExtraLaborQueue; /* * Include a union of all scalar types * including fixed length strings so @@ -83,6 +80,10 @@ typedef struct rsrv_put_notify { dbr_long_t longval; dbr_double_t doubleval; } dbrScalarValue; + void * asWritePvt; + unsigned valueSize; /* size of block pointed to by dbPutNotify */ + char busy; /* put notify in progress */ + char onExtraLaborQueue; } RSRVPUTNOTIFY; /* @@ -813,6 +814,7 @@ LOCAL int write_action ( caHdrLargeArray *mp, mp->m_count); asTrapWriteAfter(asWritePvt); + if (dbStatus < 0) { SEND_LOCK(client); send_err( @@ -1370,23 +1372,8 @@ LOCAL int claim_ciu_action ( caHdrLargeArray *mp, */ LOCAL void write_notify_call_back(putNotify *ppn) { - struct client *pclient; - struct channel_in_use *pciu; - - /* - * we choose to suspend the task if there - * is an internal failure - */ - pciu = (struct channel_in_use *) ppn->usrPvt; - assert(pciu); - assert(pciu->pPutNotify); - - if(!pciu->pPutNotify->busy){ - errlogPrintf("Double DB put notify call back!!\n"); - return; - } - - pclient = pciu->client; + struct channel_in_use * pciu = (struct channel_in_use *) ppn->usrPvt; + struct client * pClient; /* * independent lock used here in order to @@ -1394,33 +1381,42 @@ LOCAL void write_notify_call_back(putNotify *ppn) * the database (or indirectly blocking * one client on another client). */ - epicsMutexMustLock(pclient->putNotifyLock); - ellAdd(&pclient->putNotifyQue, &pciu->pPutNotify->node); + assert(pciu); + assert(pciu->pPutNotify); + pClient = pciu->client; + epicsMutexMustLock(pClient->putNotifyLock); + + if ( ! pciu->pPutNotify->busy || pciu->pPutNotify->onExtraLaborQueue ) { + epicsMutexUnlock(pClient->putNotifyLock); + errlogPrintf("Double DB put notify call back!!\n"); + return; + } + + ellAdd(&pClient->putNotifyQue, &pciu->pPutNotify->node); pciu->pPutNotify->onExtraLaborQueue = TRUE; - epicsMutexUnlock(pclient->putNotifyLock); + + epicsMutexUnlock(pClient->putNotifyLock); /* * offload the labor for this to the * event task so that we never block * the db or another client. */ - db_post_extra_labor(pclient->evuser); + db_post_extra_labor(pClient->evuser); } /* * write_notify_reply() - * * (called by the CA server event task via the extra labor interface) */ -void write_notify_reply(void *pArg) +void write_notify_reply ( void * pArg ) { - RSRVPUTNOTIFY *ppnb; - struct client *pClient; - - pClient = pArg; + struct client * pClient = pArg; SEND_LOCK(pClient); while(TRUE){ + caHdrLargeArray msgtmp; + void * asWritePvtTmp; ca_uint32_t status; int localStatus; @@ -1431,34 +1427,42 @@ void write_notify_reply(void *pArg) * one client on another client). */ epicsMutexMustLock(pClient->putNotifyLock); - ppnb = (RSRVPUTNOTIFY *)ellGet(&pClient->putNotifyQue); - if ( ppnb ) { + { + RSRVPUTNOTIFY * ppnb = (RSRVPUTNOTIFY *) + ellGet ( &pClient->putNotifyQue ); + if ( ! ppnb ) { + epicsMutexUnlock(pClient->putNotifyLock); + break; + } + /* + * + * Map from DB status to CA status + * + */ + if ( ppnb->dbPutNotify.status != putNotifyOK ) { + status = ECA_PUTFAIL; + } + else{ + status = ECA_NORMAL; + } + msgtmp = ppnb->msg; + asWritePvtTmp = ppnb->asWritePvt; + ppnb->asWritePvt = 0; ppnb->onExtraLaborQueue = FALSE; + ppnb->busy = FALSE; } epicsMutexUnlock(pClient->putNotifyLock); - /* - * break to loop exit - */ - if(!ppnb){ - break; - } + + asTrapWriteAfter ( asWritePvtTmp ); /* - * - * Map from DB status to CA status - * * the channel id field is being abused to carry * status here */ - if(ppnb->dbPutNotify.status != putNotifyOK){ - status = ECA_PUTFAIL; - } - else{ - status = ECA_NORMAL; - } - localStatus = cas_copy_in_header ( pClient, CA_PROTO_WRITE_NOTIFY, - 0u, ppnb->msg.m_dataType, ppnb->msg.m_count, status, - ppnb->msg.m_available, 0 ); + localStatus = cas_copy_in_header ( + pClient, CA_PROTO_WRITE_NOTIFY, + 0u, msgtmp.m_dataType, msgtmp.m_count, status, + msgtmp.m_available, 0 ); if ( localStatus != ECA_NORMAL ) { /* * inability to aquire buffer space @@ -1470,7 +1474,6 @@ void write_notify_reply(void *pArg) /* commit the message */ cas_commit_msg ( pClient, 0u ); - ppnb->busy = FALSE; } cas_send_bs_msg ( pClient, FALSE ); @@ -1595,10 +1598,18 @@ void rsrvFreePutNotify ( client *pClient, struct rsrv_put_notify *pNotify ) { if ( pNotify ) { + char busyTmp; + void * asWritePvtTmp = 0; + + epicsMutexMustLock ( pClient->putNotifyLock ); + busyTmp = pNotify->busy; + epicsMutexUnlock ( pClient->putNotifyLock ); + /* - * if the put notify is outstanding then cancel it + * if any possiblity that the put notify is + * outstanding then cancel it */ - if ( pNotify->busy ) { + if ( busyTmp ) { dbNotifyCancel ( &pNotify->dbPutNotify ); } @@ -1607,8 +1618,15 @@ void rsrvFreePutNotify ( client *pClient, ellDelete ( &pClient->putNotifyQue, &pNotify->node ); } + busyTmp = pNotify->busy; + asWritePvtTmp = pNotify->asWritePvt; + pNotify->asWritePvt = 0; epicsMutexUnlock ( pClient->putNotifyLock ); + if ( busyTmp ) { + asTrapWriteAfter ( asWritePvtTmp ); + } + if ( pNotify->valueSize > sizeof(pNotify->dbrScalarValue) ) { free ( pNotify->dbPutNotify.pbuffer ); @@ -1651,17 +1669,48 @@ LOCAL int write_notify_action ( caHdrLargeArray *mp, void *pPayload, /* * serialize concurrent put notifies */ + epicsMutexMustLock(client->putNotifyLock); while(pciu->pPutNotify->busy){ + epicsMutexUnlock(client->putNotifyLock); status = epicsEventWaitWithTimeout(client->blockSem,60.0); - if(status != epicsEventWaitOK && pciu->pPutNotify->busy){ - log_header("put call back time out", client, - mp, pPayload, 0); - dbNotifyCancel(&pciu->pPutNotify->dbPutNotify); - pciu->pPutNotify->busy = FALSE; - putNotifyErrorReply (client, mp, ECA_PUTCBINPROG); - return RSRV_OK; + if ( status != epicsEventWaitOK ) { + char busyTmp; + void * asWritePvtTmp = 0; + + epicsMutexMustLock(client->putNotifyLock); + busyTmp = pciu->pPutNotify->busy; + epicsMutexUnlock(client->putNotifyLock); + + /* + * if any possibility of put notify still running + * then cancel it + */ + if ( busyTmp ) { + dbNotifyCancel(&pciu->pPutNotify->dbPutNotify); + } + epicsMutexMustLock(client->putNotifyLock); + busyTmp = pciu->pPutNotify->busy; + if ( busyTmp ) { + if ( pciu->pPutNotify->onExtraLaborQueue ) { + ellDelete ( &client->putNotifyQue, + &pciu->pPutNotify->node ); + } + pciu->pPutNotify->busy = FALSE; + asWritePvtTmp = pciu->pPutNotify->asWritePvt; + pciu->pPutNotify->asWritePvt = 0; + } + epicsMutexUnlock(client->putNotifyLock); + + if ( busyTmp ) { + log_header("put call back time out", client, + &pciu->pPutNotify->msg, pciu->pPutNotify->dbPutNotify.pbuffer, 0); + asTrapWriteAfter ( asWritePvtTmp ); + putNotifyErrorReply (client, &pciu->pPutNotify->msg, ECA_PUTCBINPROG); + } } + epicsMutexMustLock(client->putNotifyLock); } + epicsMutexUnlock(client->putNotifyLock); } else { pciu->pPutNotify = rsrvAllocPutNotify ( pciu ); @@ -1705,7 +1754,14 @@ LOCAL int write_notify_action ( caHdrLargeArray *mp, void *pPayload, return RSRV_OK; } + pciu->pPutNotify->asWritePvt = asTrapWriteBefore ( + pciu->asClientPVT, + pciu->client->pUserName ? pciu->client->pUserName : "", + pciu->client->pHostName ? pciu->client->pHostName : "", + (void *) &pciu->addr ); + dbPutNotify(&pciu->pPutNotify->dbPutNotify); + return RSRV_OK; }