o fixed deadlock occurring when access security callback blocks for

unresponsive client
o made data conversion more standalone in the client library
This commit is contained in:
Jeff Hill
2006-11-18 00:29:04 +00:00
parent a6d8cf0cec
commit f3f56fb2bc
5 changed files with 404 additions and 342 deletions

View File

@@ -580,35 +580,27 @@ LOCAL void read_reply ( void *pArg, struct dbAddr *paddr,
}
}
else {
# ifdef CONVERSION_REQUIRED
ca_uint32_t msgSize = pevext->size;
int cacStatus = caNetConvert (
pevext->msg.m_dataType, pPayload, pPayload,
TRUE /* host -> net format */, pevext->msg.m_count );
if ( cacStatus == ECA_NORMAL ) {
/*
* assert() is safe here because the type was
* checked by db_get_field()
*/
if ( pevext->msg.m_dataType >= NELEMENTS (cac_dbr_cvrt) ) {
memset ( pPayload, 0, pevext->size );
cas_set_header_cid ( pClient, ECA_GETFAIL );
* force string message size to be the true size rounded to even
* boundary
*/
if ( pevext->msg.m_dataType == DBR_STRING
&& pevext->msg.m_count == 1 ) {
/* add 1 so that the string terminator will be shipped */
strcnt = strlen ( (char *) pPayload ) + 1;
msgSize = strcnt;
}
else {
/* use type as index into conversion jumptable */
( *cac_dbr_cvrt[pevext->msg.m_dataType] )
( pPayload, pPayload, TRUE /* host -> net format */,
pevext->msg.m_count );
}
# endif
/*
* force string message size to be the true size rounded to even
* boundary
*/
if ( pevext->msg.m_dataType == DBR_STRING
&& pevext->msg.m_count == 1 ) {
/* add 1 so that the string terminator will be shipped */
strcnt = strlen ( (char *) pPayload ) + 1;
cas_commit_msg ( pClient, strcnt );
}
else {
cas_commit_msg ( pClient, pevext->size );
}
memset ( pPayload, 0, msgSize );
cas_set_header_cid ( pClient, cacStatus );
}
cas_commit_msg ( pClient, msgSize );
}
/*
@@ -644,12 +636,11 @@ LOCAL int read_action ( caHdrLargeArray *mp, void *pPayloadIn, struct client *pC
SEND_LOCK ( pClient );
# ifdef CONVERSION_REQUIRED
if ( mp->m_dataType >= NELEMENTS ( cac_dbr_cvrt ) ) {
send_err ( mp, ECA_BADTYPE, pClient, RECORD_NAME ( &pciu->addr ) );
SEND_UNLOCK ( pClient );
}
# endif
if ( INVALID_DB_REQ ( mp->m_dataType ) ) {
send_err ( mp, ECA_BADTYPE, pClient, RECORD_NAME ( &pciu->addr ) );
SEND_UNLOCK ( pClient );
return RSRV_ERROR;
}
payloadSize = dbr_size_n ( mp->m_dataType, mp->m_count );
status = cas_copy_in_header ( pClient, mp->m_cmmd, payloadSize,
@@ -687,12 +678,15 @@ LOCAL int read_action ( caHdrLargeArray *mp, void *pPayloadIn, struct client *pC
return RSRV_OK;
}
# ifdef CONVERSION_REQUIRED
/* use type as index into conversion jumptable */
(* cac_dbr_cvrt[mp->m_dataType])
( pPayload, pPayload, TRUE /* host -> net format */,
mp->m_count );
# endif
status = caNetConvert (
mp->m_dataType, pPayload, pPayload,
TRUE /* host -> net format */, mp->m_count );
if ( status != ECA_NORMAL ) {
send_err ( mp, status, pClient, RECORD_NAME ( &pciu->addr ) );
SEND_UNLOCK ( pClient );
return RSRV_OK;
}
/*
* force string message size to be the true size rounded to even
* boundary
@@ -781,27 +775,21 @@ LOCAL int write_action ( caHdrLargeArray *mp,
return RSRV_OK;
}
#ifdef CONVERSION_REQUIRED
if (mp->m_dataType >= NELEMENTS(cac_dbr_cvrt)) {
status = caNetConvert (
mp->m_dataType, pPayload, pPayload,
FALSE /* net -> host format */, mp->m_count );
if ( status != ECA_NORMAL ) {
log_header ("invalid data type", client, mp, pPayload, 0);
SEND_LOCK(client);
send_err(
mp,
ECA_PUTFAIL,
status,
client,
RECORD_NAME(&pciu->addr));
SEND_UNLOCK(client);
return RSRV_ERROR;
}
/* use type as index into conversion jumptable */
(* cac_dbr_cvrt[mp->m_dataType])
( pPayload,
pPayload,
FALSE, /* net -> host format */
mp->m_count);
#endif
asWritePvt = asTrapWriteBefore ( pciu->asClientPVT,
pciu->client->pUserName ? pciu->client->pUserName : "",
pciu->client->pHostName ? pciu->client->pHostName : "",
@@ -834,11 +822,28 @@ LOCAL int write_action ( caHdrLargeArray *mp,
LOCAL int host_name_action ( caHdrLargeArray *mp, void *pPayload,
struct client *client )
{
struct channel_in_use *pciu;
unsigned size;
char *pName;
char *pMalloc;
int status;
int chanCount;
epicsMutexMustLock ( client->chanListLock );
chanCount =
ellCount ( &client->chanList ) +
ellCount ( &client->chanPendingUpdateARList );
epicsMutexUnlock( client->chanListLock );
if ( chanCount != 0 ) {
SEND_LOCK ( client );
send_err(
mp,
ECA_INTERNAL,
client,
"attempts to use protocol to set host name "
"after creating first channel ignored by server" );
SEND_UNLOCK ( client );
return RSRV_OK;
}
pName = (char *) pPayload;
size = strlen(pName)+1;
@@ -877,39 +882,14 @@ LOCAL int host_name_action ( caHdrLargeArray *mp, void *pPayload,
size-1);
pMalloc[size-1]='\0';
epicsMutexMustLock(client->addrqLock);
pName = client->pHostName;
client->pHostName = pMalloc;
if(pName){
free(pName);
if ( pName ) {
free ( pName );
}
pciu = (struct channel_in_use *) client->addrq.node.next;
while(pciu){
status = asChangeClient(
pciu->asClientPVT,
asDbGetAsl ( &pciu->addr ),
client->pUserName ? client->pUserName : 0,
client->pHostName ? client->pHostName : 0 );
if(status != 0 && status != S_asLib_asNotActive){
epicsMutexUnlock(client->addrqLock);
log_header ("unable to install new host name into access security",
client, mp, pPayload, 0);
SEND_LOCK(client);
send_err(
mp,
ECA_INTERNAL,
client,
"unable to install new host name into access security");
SEND_UNLOCK(client);
return RSRV_ERROR;
}
pciu = (struct channel_in_use *) pciu->node.next;
}
epicsMutexUnlock(client->addrqLock);
DLOG(2, ( "CAS: host_name_action for \"%s\"\n",
client->pHostName ? client->pHostName : 0 ) );
DLOG (2, ( "CAS: host_name_action for \"%s\"\n",
client->pHostName ? client->pHostName : "" ) );
return RSRV_OK;
}
@@ -921,11 +901,28 @@ LOCAL int host_name_action ( caHdrLargeArray *mp, void *pPayload,
LOCAL int client_name_action ( caHdrLargeArray *mp, void *pPayload,
struct client *client )
{
struct channel_in_use *pciu;
unsigned size;
char *pName;
char *pMalloc;
int status;
int chanCount;
epicsMutexMustLock ( client->chanListLock );
chanCount =
ellCount ( &client->chanList ) +
ellCount ( &client->chanPendingUpdateARList );
epicsMutexUnlock( client->chanListLock );
if ( chanCount != 0 ) {
SEND_LOCK ( client );
send_err(
mp,
ECA_INTERNAL,
client,
"attempts to use protocol to set user name "
"after creating first channel ignored by server" );
SEND_UNLOCK ( client );
return RSRV_OK;
}
pName = (char *) pPayload;
size = strlen(pName)+1;
@@ -964,40 +961,12 @@ LOCAL int client_name_action ( caHdrLargeArray *mp, void *pPayload,
size-1);
pMalloc[size-1]='\0';
epicsMutexMustLock(client->addrqLock);
pName = client->pUserName;
client->pUserName = pMalloc;
if ( pName ) {
free ( pName );
}
pciu = (struct channel_in_use *) client->addrq.node.next;
while(pciu){
status = asChangeClient(
pciu->asClientPVT,
asDbGetAsl(&pciu->addr),
client->pUserName ? client->pUserName : "",
client->pHostName ? client->pHostName : "");
if(status != 0 && status != S_asLib_asNotActive){
epicsMutexUnlock(client->addrqLock);
log_header ("unable to install new user name into access security",
client, mp, pPayload, 0);
SEND_LOCK(client);
send_err(
mp,
ECA_INTERNAL,
client,
"unable to install new user name into access security");
SEND_UNLOCK(client);
return RSRV_ERROR;
}
pciu = (struct channel_in_use *) pciu->node.next;
}
epicsMutexUnlock(client->addrqLock);
DLOG (2, ( "CAS: client_name_action for \"%s\"\n",
client->pUserName ? client->pUserName : "" ) );
return RSRV_OK;
}
@@ -1070,17 +1039,88 @@ unsigned cid
return NULL;
}
epicsMutexMustLock(client->addrqLock);
ellAdd(&client->addrq, &pchannel->node);
epicsMutexUnlock(client->addrqLock);
epicsMutexMustLock( client->chanListLock );
pchannel->state = rsrvCS_inService;
ellAdd ( &client->chanList, &pchannel->node );
epicsMutexUnlock ( client->chanListLock );
return pchannel;
}
/*
* casAccessRightsCB()
*
* If access right state changes then inform the client.
*
*/
LOCAL void casAccessRightsCB(ASCLIENTPVT ascpvt, asClientStatus type)
{
struct client * pclient;
struct channel_in_use * pciu;
struct event_ext * pevext;
pciu = asGetClientPvt(ascpvt);
assert(pciu);
pclient = pciu->client;
assert(pclient);
if(pclient == prsrv_cast_client){
return;
}
switch(type)
{
case asClientCOAR:
{
unsigned sigReq = 0;
epicsMutexMustLock ( pclient->chanListLock );
if ( pciu->state == rsrvCS_inService ) {
ellDelete ( &pclient->chanList, &pciu->node );
pciu->state = rsrvCS_inServiceUpdatePendAR;
ellAdd ( &pclient->chanPendingUpdateARList, &pciu->node );
sigReq = 1;
}
epicsMutexUnlock ( pclient->chanListLock );
/*
* Update all event call backs
*/
epicsMutexMustLock(pclient->eventqLock);
for (pevext = (struct event_ext *) ellFirst(&pciu->eventq);
pevext;
pevext = (struct event_ext *) ellNext(&pevext->node)){
int readAccess = asCheckGet(pciu->asClientPVT);
if(pevext->pdbev && !readAccess){
db_post_single_event(pevext->pdbev);
db_event_disable(pevext->pdbev);
}
else if(pevext->pdbev && readAccess){
db_event_enable(pevext->pdbev);
db_post_single_event(pevext->pdbev);
}
}
epicsMutexUnlock(pclient->eventqLock);
if ( sigReq ) {
db_post_extra_labor( pclient->evuser );
}
break;
}
default:
break;
}
}
/*
* access_rights_reply()
*/
LOCAL void access_rights_reply ( struct channel_in_use *pciu )
LOCAL void access_rights_reply ( struct channel_in_use * pciu )
{
unsigned ar;
int v41;
@@ -1105,76 +1145,18 @@ LOCAL void access_rights_reply ( struct channel_in_use *pciu )
}
SEND_LOCK ( pciu->client );
status = cas_copy_in_header ( pciu->client, CA_PROTO_ACCESS_RIGHTS, 0,
status = cas_copy_in_header (
pciu->client, CA_PROTO_ACCESS_RIGHTS, 0,
0, 0, pciu->cid, ar, 0 );
/*
* OK to just ignore the request if the connection drops
*/
if ( status != ECA_NORMAL ) {
return;
if ( status == ECA_NORMAL ) {
cas_commit_msg ( pciu->client, 0u );
}
cas_commit_msg ( pciu->client, 0u );
SEND_UNLOCK ( pciu->client );
}
/*
* casAccessRightsCB()
*
* If access right state changes then inform the client.
*
*/
LOCAL void casAccessRightsCB(ASCLIENTPVT ascpvt, asClientStatus type)
{
struct client *pclient;
struct channel_in_use *pciu;
struct event_ext *pevext;
pciu = asGetClientPvt(ascpvt);
assert(pciu);
pclient = pciu->client;
assert(pclient);
if(pclient == prsrv_cast_client){
return;
}
switch(type)
{
case asClientCOAR:
access_rights_reply(pciu);
/*
* Update all event call backs
*/
epicsMutexMustLock(pclient->eventqLock);
for (pevext = (struct event_ext *) ellFirst(&pciu->eventq);
pevext;
pevext = (struct event_ext *) ellNext(&pevext->node)){
int readAccess;
readAccess = asCheckGet(pciu->asClientPVT);
if(pevext->pdbev && !readAccess){
db_post_single_event(pevext->pdbev);
db_event_disable(pevext->pdbev);
}
else if(pevext->pdbev && readAccess){
db_event_enable(pevext->pdbev);
db_post_single_event(pevext->pdbev);
}
}
epicsMutexUnlock(pclient->eventqLock);
break;
default:
break;
}
}
/*
* claim_ciu_action()
*/
@@ -1232,7 +1214,7 @@ LOCAL int claim_ciu_action ( caHdrLargeArray *mp,
}
}
else {
epicsMutexMustLock(prsrv_cast_client->addrqLock);
epicsMutexMustLock(prsrv_cast_client->chanListLock);
/*
* clients which dont claim their
* channel in use block prior to
@@ -1242,7 +1224,7 @@ LOCAL int claim_ciu_action ( caHdrLargeArray *mp,
if(!pciu){
errlogPrintf("CAS: client timeout disconnect id=%d\n",
mp->m_cid);
epicsMutexUnlock(prsrv_cast_client->addrqLock);
epicsMutexUnlock(prsrv_cast_client->chanListLock);
SEND_LOCK(client);
send_err(
mp,
@@ -1260,7 +1242,7 @@ LOCAL int claim_ciu_action ( caHdrLargeArray *mp,
if (pciu->client!=prsrv_cast_client) {
errlogPrintf("CAS: duplicate claim disconnect id=%d\n",
mp->m_cid);
epicsMutexUnlock(prsrv_cast_client->addrqLock);
epicsMutexUnlock(prsrv_cast_client->chanListLock);
SEND_LOCK(client);
send_err(
mp,
@@ -1278,14 +1260,15 @@ LOCAL int claim_ciu_action ( caHdrLargeArray *mp,
* who is claiming it
*/
ellDelete(
&prsrv_cast_client->addrq,
&prsrv_cast_client->chanList,
&pciu->node);
epicsMutexUnlock(prsrv_cast_client->addrqLock);
epicsMutexUnlock(prsrv_cast_client->chanListLock);
epicsMutexMustLock(prsrv_cast_client->addrqLock);
epicsMutexMustLock(client->chanListLock);
pciu->state = rsrvCS_inService;
pciu->client = client;
ellAdd(&client->addrq, &pciu->node);
epicsMutexUnlock(prsrv_cast_client->addrqLock);
ellAdd(&client->chanList, &pciu->node);
epicsMutexUnlock(client->chanListLock);
}
/*
@@ -1413,11 +1396,8 @@ LOCAL void write_notify_call_back(putNotify *ppn)
* write_notify_reply()
* (called by the CA server event task via the extra labor interface)
*/
void write_notify_reply ( void * pArg )
LOCAL void write_notify_reply ( struct client * pClient )
{
struct client * pClient = pArg;
SEND_LOCK(pClient);
while(TRUE){
caHdrLargeArray msgtmp;
void * asWritePvtTmp;
@@ -1463,6 +1443,7 @@ void write_notify_reply ( void * pArg )
* the channel id field is being abused to carry
* status here
*/
SEND_LOCK(pClient);
localStatus = cas_copy_in_header (
pClient, CA_PROTO_WRITE_NOTIFY,
0u, msgtmp.m_dataType, msgtmp.m_count, status,
@@ -1473,23 +1454,56 @@ void write_notify_reply ( void * pArg )
* Indicates corruption
*/
errlogPrintf("CA server corrupted - put call back(s) discarded\n");
SEND_UNLOCK ( pClient );
break;
}
/* commit the message */
cas_commit_msg ( pClient, 0u );
SEND_UNLOCK ( pClient );
}
cas_send_bs_msg ( pClient, FALSE );
SEND_UNLOCK ( pClient );
/*
* wakeup the TCP thread if it is waiting for a cb to complete
*/
epicsEventSignal ( pClient->blockSem );
}
/*
* sendAllUpdateAS()
*/
LOCAL void sendAllUpdateAS ( struct client *client )
{
struct channel_in_use *pciu;
epicsMutexMustLock ( client->chanListLock );
pciu = ( struct channel_in_use * )
ellGet ( & client->chanPendingUpdateARList );
while ( pciu ) {
access_rights_reply ( pciu );
pciu->state = rsrvCS_inService;
ellAdd ( & client->chanList, &pciu->node );
pciu = ( struct channel_in_use * )
ellGet ( & client->chanPendingUpdateARList );
}
epicsMutexUnlock( client->chanListLock );
}
/*
* rsrv_extra_labor()
* (called by the CA server event task via the extra labor interface)
*/
void rsrv_extra_labor ( void * pArg )
{
struct client * pClient = pArg;
write_notify_reply ( pClient );
sendAllUpdateAS ( pClient );
cas_send_bs_msg ( pClient, TRUE );
}
/*
* putNotifyErrorReply
*/
@@ -1741,16 +1755,16 @@ LOCAL int write_notify_action ( caHdrLargeArray *mp, void *pPayload,
pciu->pPutNotify->onExtraLaborQueue = FALSE;
pciu->pPutNotify->msg = *mp;
pciu->pPutNotify->dbPutNotify.nRequest = mp->m_count;
#ifdef CONVERSION_REQUIRED
/* use type as index into conversion jumptable */
(* cac_dbr_cvrt[mp->m_dataType])
( pPayload,
pciu->pPutNotify->dbPutNotify.pbuffer,
FALSE, /* net -> host format */
mp->m_count);
#else
memcpy(pciu->pPutNotify->dbPutNotify.pbuffer, pPayload, size);
#endif
status = caNetConvert (
mp->m_dataType, pPayload, pciu->pPutNotify->dbPutNotify.pbuffer,
FALSE /* net -> host format */, mp->m_count );
if ( status != ECA_NORMAL ) {
log_header ("invalid data type", client, mp, pPayload, 0);
putNotifyErrorReply ( client, mp, status );
return RSRV_ERROR;
}
status = dbPutNotifyMapType(&pciu->pPutNotify->dbPutNotify, mp->m_dataType);
if(status){
putNotifyErrorReply (client, mp, ECA_PUTFAIL);
@@ -1939,9 +1953,22 @@ LOCAL int clear_channel_reply ( caHdrLargeArray *mp,
cas_commit_msg ( client, 0u );
SEND_UNLOCK(client);
epicsMutexMustLock(client->addrqLock);
ellDelete(&client->addrq, &pciu->node);
epicsMutexUnlock(client->addrqLock);
epicsMutexMustLock ( client->chanListLock );
if ( pciu->state == rsrvCS_inService ) {
ellDelete ( &client->chanList, &pciu->node );
}
else if ( pciu->state == rsrvCS_inServiceUpdatePendAR ) {
ellDelete ( &client->chanPendingUpdateARList, &pciu->node );
}
else {
epicsMutexUnlock( client->chanListLock );
SEND_LOCK(client);
send_err(mp, ECA_INTERNAL, client,
"channel was in strange state or corrupted during cleanup");
SEND_UNLOCK(client);
return RSRV_ERROR;
}
epicsMutexUnlock( client->chanListLock );
/*
* remove from access control list
@@ -1950,16 +1977,20 @@ LOCAL int clear_channel_reply ( caHdrLargeArray *mp,
assert(status == 0 || status == S_asLib_asNotActive);
if(status != 0 && status != S_asLib_asNotActive){
errMessage(status, RECORD_NAME(&pciu->addr));
return RSRV_ERROR;
}
LOCK_CLIENTQ;
status = bucketRemoveItemUnsignedId (pCaBucket, &pciu->sid);
if(status != S_bucket_success){
UNLOCK_CLIENTQ;
errMessage (status, "Bad resource id during channel clear");
logBadId ( client, mp, pPayload );
return RSRV_ERROR;
}
rsrvChannelCount--;
UNLOCK_CLIENTQ;
freeListFree(rsrvChanFreeList, pciu);
return RSRV_OK;

View File

@@ -299,17 +299,54 @@ int epicsShareAPI rsrv_init (void)
return RSRV_OK;
}
LOCAL unsigned countChanListBytes (
struct client *client, ELLLIST * pList )
{
struct channel_in_use * pciu;
unsigned bytes_reserved = 0;
epicsMutexMustLock ( client->chanListLock );
pciu = ( struct channel_in_use * ) pList->node.next;
while ( pciu ) {
bytes_reserved += sizeof(struct channel_in_use);
bytes_reserved += sizeof(struct event_ext)*ellCount( &pciu->eventq );
bytes_reserved += rsrvSizeOfPutNotify ( pciu->pPutNotify );
pciu = ( struct channel_in_use * ) ellNext( &pciu->node );
}
epicsMutexUnlock ( client->chanListLock );
return bytes_reserved;
}
LOCAL void showChanList (
struct client * client, ELLLIST * pList )
{
unsigned i = 0u;
struct channel_in_use * pciu;
epicsMutexMustLock ( client->chanListLock );
pciu = (struct channel_in_use *) pList->node.next;
while ( pciu ){
printf( "\t%s(%d%c%c)",
pciu->addr.precord->name,
ellCount ( &pciu->eventq ),
asCheckGet ( pciu->asClientPVT ) ? 'r': '-',
rsrvCheckPut ( pciu ) ? 'w': '-' );
pciu = ( struct channel_in_use * ) ellNext ( &pciu->node );
if( ++i % 3u == 0u ) {
printf ( "\n" );
}
}
epicsMutexUnlock ( client->chanListLock );
}
/*
* log_one_client ()
*/
LOCAL void log_one_client (struct client *client, unsigned level)
{
int i;
struct channel_in_use *pciu;
char *pproto;
double send_delay;
double recv_delay;
unsigned bytes_reserved;
char *state[] = {"up", "down"};
epicsTimeStamp current;
char clientHostName[256];
@@ -337,9 +374,10 @@ LOCAL void log_one_client (struct client *client, unsigned level)
client->pUserName ? client->pUserName : "",
CA_MAJOR_PROTOCOL_REVISION,
client->minor_version_number,
ellCount(&client->addrq),
ellCount(&client->chanList) +
ellCount(&client->chanPendingUpdateARList),
client->priority );
if (level>=1) {
if ( level >= 1 ) {
printf ("\tTask Id=%p, Socket FD=%d\n",
(void *) client->tid, client->sock);
printf(
@@ -356,47 +394,26 @@ LOCAL void log_one_client (struct client *client, unsigned level)
client->recv.type == mbtLargeTCP ? " jumbo-recv-buf" : "");
}
if (level>=2u) {
bytes_reserved = 0;
if ( level >= 2u ) {
unsigned bytes_reserved = 0;
bytes_reserved += sizeof(struct client);
epicsMutexMustLock(client->addrqLock);
pciu = (struct channel_in_use *) client->addrq.node.next;
while (pciu){
bytes_reserved += sizeof(struct channel_in_use);
bytes_reserved += sizeof(struct event_ext)*ellCount(&pciu->eventq);
bytes_reserved += rsrvSizeOfPutNotify ( pciu->pPutNotify );
pciu = (struct channel_in_use *) ellNext(&pciu->node);
}
epicsMutexUnlock(client->addrqLock);
bytes_reserved += countChanListBytes (
client, & client->chanList );
bytes_reserved += countChanListBytes (
client, & client->chanPendingUpdateARList );
printf( "\t%d bytes allocated\n", bytes_reserved);
epicsMutexMustLock(client->addrqLock);
pciu = (struct channel_in_use *) client->addrq.node.next;
i=0;
while (pciu){
printf( "\t%s(%d%c%c)",
pciu->addr.precord->name,
ellCount(&pciu->eventq),
asCheckGet(pciu->asClientPVT)?'r':'-',
rsrvCheckPut(pciu)?'w':'-');
pciu = (struct channel_in_use *) ellNext(&pciu->node);
if( ++i % 3 == 0){
printf("\n");
}
}
epicsMutexUnlock(client->addrqLock);
showChanList ( client, & client->chanList );
showChanList ( client, & client->chanPendingUpdateARList );
printf("\n");
}
if (level >= 3u) {
if ( level >= 3u ) {
printf( "\tSend Lock\n");
epicsMutexShow(client->lock,1);
printf( "\tPut Notify Lock\n");
epicsMutexShow (client->putNotifyLock,1);
printf( "\tAddress Queue Lock\n");
epicsMutexShow (client->addrqLock,1);
epicsMutexShow (client->chanListLock,1);
printf( "\tEvent Queue Lock\n");
epicsMutexShow (client->eventqLock,1);
printf( "\tBlock Semaphore\n");
@@ -428,9 +445,7 @@ void epicsShareAPI casr (unsigned level)
printf("No clients connected.\n");
}
while (client) {
log_one_client(client, level);
client = (struct client *) ellNext(&client->node);
}
UNLOCK_CLIENTQ
@@ -535,8 +550,8 @@ void destroy_client ( struct client *client )
epicsMutexDestroy ( client->eventqLock );
}
if ( client->addrqLock ) {
epicsMutexDestroy ( client->addrqLock );
if ( client->chanListLock ) {
epicsMutexDestroy ( client->chanListLock );
}
if ( client->putNotifyLock ) {
@@ -562,10 +577,64 @@ void destroy_client ( struct client *client )
freeListFree ( rsrvClientFreeList, client );
}
LOCAL void destroyAllChannels (
struct client * client, ELLLIST * pList )
{
if ( !client->chanListLock || !client->eventqLock ) {
return;
}
while ( TRUE ) {
struct event_ext *pevext;
int status;
struct channel_in_use *pciu;
epicsMutexMustLock ( client->chanListLock );
pciu = (struct channel_in_use *) ellGet ( pList );
epicsMutexUnlock ( client->chanListLock );
if ( ! pciu ) {
break;
}
while ( TRUE ) {
/*
* AS state change could be using this list
*/
epicsMutexMustLock ( client->eventqLock );
pevext = (struct event_ext *) ellGet ( &pciu->eventq );
epicsMutexUnlock ( client->eventqLock );
if ( ! pevext ) {
break;
}
if ( pevext->pdbev ) {
db_cancel_event (pevext->pdbev);
}
freeListFree (rsrvEventFreeList, pevext);
}
rsrvFreePutNotify ( client, pciu->pPutNotify );
LOCK_CLIENTQ;
status = bucketRemoveItemUnsignedId ( pCaBucket, &pciu->sid);
rsrvChannelCount--;
UNLOCK_CLIENTQ;
if ( status != S_bucket_success ) {
errPrintf ( status, __FILE__, __LINE__,
"Bad id=%d at close", pciu->sid);
}
status = asRemoveClient(&pciu->asClientPVT);
if ( status && status != S_asLib_asNotActive ) {
printf ( "bad asRemoveClient() status was %x \n", status );
errPrintf ( status, __FILE__, __LINE__, "asRemoveClient" );
}
freeListFree ( rsrvChanFreeList, pciu );
}
}
void destroy_tcp_client ( struct client *client )
{
struct event_ext *pevext;
struct channel_in_use *pciu;
int status;
if ( CASDEBUG > 0 ) {
@@ -592,51 +661,8 @@ void destroy_tcp_client ( struct client *client )
assert ( ! status );
}
if ( client->addrqLock && client->eventqLock ) {
while ( TRUE ) {
epicsMutexMustLock ( client->addrqLock );
pciu = (struct channel_in_use *) ellGet ( & client->addrq );
epicsMutexUnlock ( client->addrqLock );
if ( ! pciu ) {
break;
}
while ( TRUE ) {
/*
* AS state change could be using this list
*/
epicsMutexMustLock ( client->eventqLock );
pevext = (struct event_ext *) ellGet ( &pciu->eventq );
epicsMutexUnlock ( client->eventqLock );
if ( ! pevext ) {
break;
}
if ( pevext->pdbev ) {
db_cancel_event (pevext->pdbev);
}
freeListFree (rsrvEventFreeList, pevext);
}
rsrvFreePutNotify ( client, pciu->pPutNotify );
LOCK_CLIENTQ;
status = bucketRemoveItemUnsignedId ( pCaBucket, &pciu->sid);
rsrvChannelCount--;
UNLOCK_CLIENTQ;
if ( status != S_bucket_success ) {
errPrintf ( status, __FILE__, __LINE__,
"Bad id=%d at close", pciu->sid);
}
status = asRemoveClient(&pciu->asClientPVT);
if ( status && status != S_asLib_asNotActive ) {
printf ( "bad asRemoveClient() status was %x \n", status );
errPrintf ( status, __FILE__, __LINE__, "asRemoveClient" );
}
freeListFree ( rsrvChanFreeList, pciu );
}
}
destroyAllChannels ( client, & client->chanList );
destroyAllChannels ( client, & client->chanPendingUpdateARList );
if ( client->evuser ) {
db_close_events (client->evuser);
@@ -679,18 +705,19 @@ struct client * create_client ( SOCKET sock, int proto )
client->blockSem = epicsEventCreate ( epicsEventEmpty );
client->lock = epicsMutexCreate();
client->putNotifyLock = epicsMutexCreate();
client->addrqLock = epicsMutexCreate();
client->chanListLock = epicsMutexCreate();
client->eventqLock = epicsMutexCreate();
if ( ! client->blockSem || ! client->lock || ! client->putNotifyLock ||
! client->addrqLock || ! client->eventqLock ) {
! client->chanListLock || ! client->eventqLock ) {
destroy_client ( client );
return NULL;
}
client->pUserName = NULL;
client->pHostName = NULL;
ellInit ( &client->addrq );
ellInit ( &client->putNotifyQue );
ellInit ( & client->chanList );
ellInit ( & client->chanPendingUpdateARList );
ellInit ( & client->putNotifyQue );
memset ( (char *)&client->addr, 0, sizeof (client->addr) );
client->tid = 0;
@@ -861,7 +888,7 @@ struct client *create_tcp_client ( SOCKET sock )
return NULL;
}
status = db_add_extra_labor_event (client->evuser, write_notify_reply, client);
status = db_add_extra_labor_event ( client->evuser, rsrv_extra_labor, client );
if (status != DB_EVENT_OK) {
errlogPrintf("CAS: unable to setup the event facility\n");
destroy_tcp_client (client);

View File

@@ -57,20 +57,20 @@
*/
LOCAL void clean_addrq()
{
struct channel_in_use *pciu;
struct channel_in_use *pnextciu;
epicsTimeStamp current;
double delay;
double maxdelay = 0;
unsigned ndelete=0;
double timeout = TIMEOUT;
int s;
struct channel_in_use * pciu;
struct channel_in_use * pnextciu;
epicsTimeStamp current;
double delay;
double maxdelay = 0;
unsigned ndelete=0;
double timeout = TIMEOUT;
int s;
epicsTimeGetCurrent(&current);
epicsTimeGetCurrent ( &current );
epicsMutexMustLock(prsrv_cast_client->addrqLock);
epicsMutexMustLock ( prsrv_cast_client->chanListLock );
pnextciu = (struct channel_in_use *)
prsrv_cast_client->addrq.node.next;
prsrv_cast_client->chanList.node.next;
while( (pciu = pnextciu) ) {
pnextciu = (struct channel_in_use *)pciu->node.next;
@@ -78,7 +78,7 @@ LOCAL void clean_addrq()
delay = epicsTimeDiffInSeconds(&current,&pciu->time_at_creation);
if (delay > timeout) {
ellDelete(&prsrv_cast_client->addrq, &pciu->node);
ellDelete(&prsrv_cast_client->chanList, &pciu->node);
LOCK_CLIENTQ;
s = bucketRemoveItemUnsignedId (
pCaBucket,
@@ -86,14 +86,18 @@ LOCAL void clean_addrq()
if(s){
errMessage (s, "Bad id at close");
}
rsrvChannelCount--;
else {
rsrvChannelCount--;
}
UNLOCK_CLIENTQ;
freeListFree(rsrvChanFreeList, pciu);
ndelete++;
if ( ! s ) {
freeListFree(rsrvChanFreeList, pciu);
ndelete++;
}
if(delay>maxdelay) maxdelay = delay;
}
}
epicsMutexUnlock(prsrv_cast_client->addrqLock);
epicsMutexUnlock ( prsrv_cast_client->chanListLock );
# ifdef DEBUG
if(ndelete){
@@ -262,7 +266,7 @@ void cast_server(void *pParm)
}
if (CASDEBUG>2)
count = ellCount (&prsrv_cast_client->addrq);
count = ellCount (&prsrv_cast_client->chanList);
status = camessage ( prsrv_cast_client );
if(status == RSRV_OK){
@@ -285,10 +289,10 @@ void cast_server(void *pParm)
}
if (CASDEBUG>2) {
if ( ellCount (&prsrv_cast_client->addrq) ) {
if ( ellCount (&prsrv_cast_client->chanList) ) {
errlogPrintf ("CAS: Fnd %d name matches (%d tot)\n",
ellCount(&prsrv_cast_client->addrq)-count,
ellCount(&prsrv_cast_client->addrq));
ellCount(&prsrv_cast_client->chanList)-count,
ellCount(&prsrv_cast_client->chanList));
}
}
}

View File

@@ -114,6 +114,21 @@ void rsrv_online_notify_task(void *pParm)
errlogPrintf ("CAS: online socket set up error\n");
epicsThreadSuspendSelf ();
}
{
/*
* this connect is to supress a warning message on Linux
* when we shutdown the read side of the socket. If it
* fails (and it will on old ip kernels) we just ignore
* the failure.
*/
osiSockAddr sockAddr;
sockAddr.ia.sin_family = AF_UNSPEC;
sockAddr.ia.sin_port = htons ( 0 );
sockAddr.ia.sin_addr.s_addr = htonl (0);
connect ( sock, & sockAddr.sa, sizeof ( sockAddr.sa ) );
shutdown ( sock, SHUT_RD );
}
memset((char *)&msg, 0, sizeof msg);
msg.m_cmmd = htons (CA_PROTO_RSRV_IS_UP);
@@ -261,28 +276,6 @@ void rsrv_online_notify_task(void *pParm)
pNode = (osiSockAddrNode *) pNode->node.next;
}
{
/*
* Connect to INADDR_NONE because a UDP connect to AF_UNSPEC
* only works with modern IP kernel.
* INADDR_NONE can never be a source address and therefore no
* messages can be received.
*/
osiSockAddr sockAddr;
memset ( &sockAddr, 0, sizeof ( sockAddr ) );
sockAddr.ia.sin_family = AF_INET;
sockAddr.ia.sin_addr.s_addr = INADDR_NONE;
sockAddr.ia.sin_port = htons ( port );
status = connect ( sock,
& sockAddr.sa, sizeof ( sockAddr.sa ) );
if ( status < 0 ) {
char sockErrBuf[64];
epicsSocketConvertErrnoToString ( sockErrBuf, sizeof ( sockErrBuf ) );
errlogPrintf ( "%s: CA beacon socket disconnect error was \"%s\"\n",
__FILE__, sockErrBuf );
}
}
epicsThreadSleep(delay);
if (delay<maxdelay) {
delay *= 2.0;

View File

@@ -80,9 +80,10 @@ typedef struct client {
struct message_buffer recv;
epicsMutexId lock;
epicsMutexId putNotifyLock;
epicsMutexId addrqLock;
epicsMutexId chanListLock;
epicsMutexId eventqLock;
ELLLIST addrq;
ELLLIST chanList;
ELLLIST chanPendingUpdateARList;
ELLLIST putNotifyQue;
struct sockaddr_in addr;
epicsTimeStamp time_at_last_send;
@@ -101,9 +102,14 @@ typedef struct client {
char disconnect; /* disconnect detected */
} client;
enum rsrvChanState {
rsrvCS_invalid,
rsrvCS_inService,
rsrvCS_inServiceUpdatePendAR };
/*
* per channel structure
* (stored in addrq off of a client block)
* (stored in chanList or chanPendingUpdateARList off of a client block)
*/
struct channel_in_use {
ELLNODE node;
@@ -115,6 +121,7 @@ struct channel_in_use {
epicsTimeStamp time_at_creation; /* for UDP timeout */
struct dbAddr addr;
ASCLIENTPVT asClientPVT;
enum rsrvChanState state;
};
/*
@@ -188,7 +195,7 @@ struct client *create_tcp_client ( SOCKET sock );
void destroy_tcp_client ( struct client * );
void casAttachThreadToClient ( struct client * );
int camessage ( struct client *client );
void write_notify_reply ( void *pArg );
void rsrv_extra_labor ( void * pArg );
int rsrvCheckPut ( const struct channel_in_use *pciu );
int rsrv_version_reply ( struct client *client );
void rsrvFreePutNotify ( struct client *pClient,