pointer => server id

This commit is contained in:
Jeff Hill
1993-09-09 16:47:10 +00:00
parent 2a812b8d2b
commit f70fcd44f6
4 changed files with 341 additions and 175 deletions

View File

@@ -43,9 +43,10 @@
* detected- just disconnect instead
* .10 joh 050692 added new routine - cas_send_heartbeat()
* .11 joh 062492 dont allow flow control to turn off gets
* .12 joh 090893 converted pointer to server id
*/
static char *sccsId = "$Id$\t$Date$";
static char *sccsId = "$Id$";
#include <vxWorks.h>
#include <taskLib.h>
@@ -63,7 +64,7 @@ static char *sccsId = "$Id$\t$Date$";
static struct extmsg nill_msg;
#define MPTOPADDR(MP) (&((struct channel_in_use *)(MP)->m_pciu)->addr)
#define RECORD_NAME(PADDR) (((struct db_addr *)(PADDR))->precord)
LOCAL void clear_channel_reply(
@@ -110,6 +111,22 @@ struct extmsg *mp,
int mnum
);
LOCAL void event_add_action(
struct extmsg *mp,
struct client *client
);
LOCAL void logBadId(
struct client *client,
struct extmsg *mp
);
LOCAL struct channel_in_use *MPTOPCIU(
struct extmsg *mp
);
LOCAL unsigned long bucketID;
/*
* CAMESSAGE()
@@ -125,9 +142,15 @@ struct message_buffer *recv
unsigned msgsize;
unsigned bytes_left;
int status;
FAST struct extmsg *mp;
FAST struct event_ext *pevext;
struct extmsg *mp;
struct channel_in_use *pciu;
if(!pCaBucket){
pCaBucket = bucketCreate(NBBY*sizeof(mp->m_cid));
if(!pCaBucket){
return ERROR;
}
}
if (CASDEBUG > 2){
logMsg( "CAS: Parsing %d(decimal) bytes\n",
@@ -162,95 +185,7 @@ struct message_buffer *recv
break;
case IOC_EVENT_ADD:
FASTLOCK(&rsrv_free_eventq_lck);
pevext = (struct event_ext *)
ellGet(&rsrv_free_eventq);
FASTUNLOCK(&rsrv_free_eventq_lck);
if (!pevext) {
int size = db_sizeof_event_block()
+ sizeof(*pevext);
pevext =
(struct event_ext *) malloc(size);
if (!pevext) {
LOCK_CLIENT(client);
send_err(
mp,
ECA_ALLOCMEM,
client,
RECORD_NAME(MPTOPADDR(mp)));
UNLOCK_CLIENT(client);
break;
}
}
pevext->msg = *mp;
pevext->mp = &pevext->msg; /* for speed- see
* IOC_READ */
pevext->client = client;
pevext->send_lock = TRUE;
pevext->size = (mp->m_count - 1)
* dbr_value_size[mp->m_type] +
dbr_size[mp->m_type];
pevext->get = FALSE;
status = db_add_event(client->evuser,
MPTOPADDR(mp),
read_reply,
pevext,
(unsigned) ((struct monops *) mp)->m_info.m_mask,
(struct event_block *)(pevext+1));
if (status == ERROR) {
LOCK_CLIENT(client);
send_err(
mp,
ECA_ADDFAIL,
client,
RECORD_NAME(MPTOPADDR(mp)));
UNLOCK_CLIENT(client);
FASTLOCK(&rsrv_free_eventq_lck);
ellAdd((ELLLIST *)&rsrv_free_eventq, (ELLNODE *)pevext);
FASTUNLOCK(&rsrv_free_eventq_lck);
break;
}
/*
* Only add to the list if we can get enough
* memory. If not, attempts to delete it
* from the client will cause a warning message
* to be printed since it will not be found on
* the list.
*/
ellAdd( (ELLLIST *)&((struct channel_in_use *)mp->m_pciu)->eventq,
(ELLNODE *)pevext);
/*
* allways send it once at event add
*/
/*
* if the client program issues many monitors
* in a row then I recv when the send side
* of the socket would block. This prevents
* a application program initiated deadlock.
*
* However when I am reconnecting I reissue
* the monitors and I could get deadlocked.
* The client is blocked sending and the server
* task for the client is blocked sending in
* this case. I cant check the recv part of the
* socket in the client since I am still handling an
* outstanding recv ( they must be processed in order).
* I handle this problem in the server by using
* post_single_event() below instead of calling
* read_reply() in this module. This is a complete
* fix since a monitor setup is the only request
* soliciting a reply in the client which is
* issued from inside of service.c (from inside
* of the part of the ca client which services
* messages sent by the server).
*/
db_post_single_event((struct event_block *)(pevext+1));
event_add_action(mp, client);
break;
case IOC_EVENT_CANCEL:
@@ -266,15 +201,20 @@ struct message_buffer *recv
{
struct event_ext evext;
pevext = &evext;
pevext->mp = mp;
pevext->client = client;
pevext->send_lock = TRUE;
pevext->get = TRUE;
pciu = MPTOPCIU(mp);
if(!pciu){
logBadId(client, mp);
break;
}
evext.mp = mp;
evext.pciu = pciu;
evext.send_lock = TRUE;
evext.get = TRUE;
if (mp->m_count == 1)
pevext->size = dbr_size[mp->m_type];
evext.size = dbr_size[mp->m_type];
else
pevext->size = (mp->m_count - 1) *
evext.size = (mp->m_count - 1) *
dbr_value_size[mp->m_type] +
dbr_size[mp->m_type];
@@ -287,7 +227,7 @@ struct message_buffer *recv
* Hold argument set true so the send message
* buffer is not flushed once each call.
*/
read_reply(pevext, MPTOPADDR(mp), TRUE, NULL);
read_reply(&evext, &pciu->addr, TRUE, NULL);
break;
}
case IOC_SEARCH:
@@ -295,8 +235,13 @@ struct message_buffer *recv
build_reply(mp, client);
break;
case IOC_WRITE:
pciu = MPTOPCIU(mp);
if(!pciu){
logBadId(client, mp);
break;
}
status = db_put_field(
MPTOPADDR(mp),
&pciu->addr,
mp->m_type,
mp + 1,
mp->m_count
@@ -307,14 +252,14 @@ struct message_buffer *recv
mp,
ECA_PUTFAIL,
client,
RECORD_NAME(MPTOPADDR(mp)));
RECORD_NAME(pciu));
UNLOCK_CLIENT(client);
}
break;
case IOC_EVENTS_ON:
{
struct event_ext *pevext;
struct event_ext evext;
struct channel_in_use *pciu;
client->eventsoff = FALSE;
@@ -333,7 +278,7 @@ struct message_buffer *recv
evext.get = TRUE;
read_reply(
&evext,
MPTOPADDR(&pevext->msg),
&pciu->addr,
TRUE,
NULL);
pevext->modified = FALSE;
@@ -367,17 +312,8 @@ struct message_buffer *recv
* channel in use block prior to
* timeout must reconnect
*/
LOCK_CLIENT(prsrv_cast_client);
status = ellFind(
&prsrv_cast_client->addrq,
mp->m_pciu);
if(status >= 0){
ellDelete(
&prsrv_cast_client->addrq,
mp->m_pciu);
}
UNLOCK_CLIENT(prsrv_cast_client);
if(status < 0){
pciu = MPTOPCIU(mp);
if(pciu?pciu->client!=prsrv_cast_client:TRUE){
free_client(client);
logMsg("CAS: client timeout disconnect\n",
NULL,
@@ -388,10 +324,18 @@ struct message_buffer *recv
NULL);
exit(0);
}
LOCK_CLIENT(prsrv_cast_client);
ellDelete(
&prsrv_cast_client->addrq,
&pciu->node);
UNLOCK_CLIENT(prsrv_cast_client);
pciu->client = client;
LOCK_CLIENT(client);
ellAdd(&client->addrq, mp->m_pciu);
ellAdd(&client->addrq, &pciu->node);
UNLOCK_CLIENT(client);
break;
default:
logMsg("CAS: bad msg detected\n",
NULL,
@@ -432,6 +376,120 @@ struct message_buffer *recv
return OK;
}
/*
*
* event_add_action()
*
*/
LOCAL void event_add_action(
struct extmsg *mp,
struct client *client
)
{
struct channel_in_use *pciu;
struct event_ext *pevext;
int status;
pciu = MPTOPCIU(mp);
if(!pciu){
logBadId(client, mp);
return;
}
FASTLOCK(&rsrv_free_eventq_lck);
pevext = (struct event_ext *)
ellGet(&rsrv_free_eventq);
FASTUNLOCK(&rsrv_free_eventq_lck);
if (!pevext) {
int size = db_sizeof_event_block()
+ sizeof(*pevext);
pevext =
(struct event_ext *) malloc(size);
if (!pevext) {
LOCK_CLIENT(client);
send_err(
mp,
ECA_ALLOCMEM,
client,
RECORD_NAME(pciu));
UNLOCK_CLIENT(client);
return;
}
}
pevext->msg = *mp;
pevext->mp = &pevext->msg; /* for speed- see
* IOC_READ */
pevext->pciu = pciu;
pevext->send_lock = TRUE;
pevext->size = (mp->m_count - 1)
* dbr_value_size[mp->m_type] +
dbr_size[mp->m_type];
pevext->get = FALSE;
status = db_add_event(client->evuser,
&pciu->addr,
read_reply,
pevext,
(unsigned) ((struct monops *) mp)->m_info.m_mask,
(struct event_block *)(pevext+1));
if (status == ERROR) {
LOCK_CLIENT(client);
send_err(
mp,
ECA_ADDFAIL,
client,
RECORD_NAME(pciu));
UNLOCK_CLIENT(client);
FASTLOCK(&rsrv_free_eventq_lck);
ellAdd((ELLLIST *)&rsrv_free_eventq, (ELLNODE *)pevext);
FASTUNLOCK(&rsrv_free_eventq_lck);
return;
}
/*
* Only add to the list if we can get enough
* memory. If not, attempts to delete it
* from the client will cause a warning message
* to be printed since it will not be found on
* the list.
*/
ellAdd( (ELLLIST *)&pciu->eventq,
(ELLNODE *)pevext);
/*
* always send it once at event add
*/
/*
* if the client program issues many monitors
* in a row then I recv when the send side
* of the socket would block. This prevents
* a application program initiated deadlock.
*
* However when I am reconnecting I reissue
* the monitors and I could get deadlocked.
* The client is blocked sending and the server
* task for the client is blocked sending in
* this case. I cant check the recv part of the
* socket in the client since I am still handling an
* outstanding recv ( they must be processed in order).
* I handle this problem in the server by using
* post_single_event() below instead of calling
* read_reply() in this module. This is a complete
* fix since a monitor setup is the only request
* soliciting a reply in the client which is
* issued from inside of service.c (from inside
* of the part of the ca client which services
* messages sent by the server).
*/
db_post_single_event((struct event_block *)(pevext+1));
return;
}
/*
*
@@ -454,20 +512,11 @@ struct client *client
* Verify the channel
*
*/
pciu = (struct channel_in_use *) mp->m_pciu;
status = ellFind(
&client->addrq,
mp->m_pciu);
if(status < 0){
logMsg("CAS: Attempt to delete nonexistent channel ignored\n",
NULL,
NULL,
NULL,
NULL,
NULL,
NULL);
return;
}
pciu = MPTOPCIU(mp);
if(pciu?pciu->client!=client:TRUE){
logBadId(client, mp);
return;
}
while (pevext = (struct event_ext *) ellGet(&pciu->eventq)) {
@@ -497,6 +546,10 @@ struct client *client
UNLOCK_CLIENT(client);
FASTLOCK(&rsrv_free_addrq_lck);
status = bucketRemoveItem(pCaBucket, pciu->sid, pciu);
if(status != BUCKET_SUCCESS){
logBadId(client, mp);
}
ellAdd((ELLLIST *)&rsrv_free_addrq, (ELLNODE *)pciu);
FASTUNLOCK(&rsrv_free_addrq_lck);
@@ -519,11 +572,23 @@ struct extmsg *mp,
struct client *client
)
{
FAST struct extmsg *reply;
FAST struct event_ext *pevext;
ELLLIST *peventq =
&((struct channel_in_use *) mp->m_pciu)->eventq;
FAST int status;
struct extmsg *reply;
struct event_ext *pevext;
ELLLIST *peventq;
int status;
struct channel_in_use *pciu;
/*
*
* Verify the channel
*
*/
pciu = MPTOPCIU(mp);
if(pciu?pciu->client!=client:TRUE){
logBadId(client, mp);
return;
}
peventq = &pciu->eventq;
for (pevext = (struct event_ext *) peventq->node.next;
pevext;
@@ -580,11 +645,13 @@ int hold, /* more on the way if true */
void *pfl
)
{
FAST struct extmsg *mp = pevext->mp;
FAST struct client *client = pevext->client;
FAST struct extmsg *reply;
FAST int status;
FAST int strcnt;
struct extmsg *mp = pevext->mp;
struct client *client = pevext->pciu->client;
struct channel_in_use *pciu = pevext->pciu;
struct extmsg *reply;
int status;
int strcnt;
/*
* If flow control is on set modified and send for later
@@ -606,7 +673,7 @@ void *pfl
}
*reply = *mp;
reply->m_postsize = pevext->size;
reply->m_pciu = (void *) ((struct channel_in_use *) mp->m_pciu)->chid;
reply->m_cid = (unsigned long)pciu->cid;
status = db_get_field(
paddr,
mp->m_type,
@@ -683,13 +750,13 @@ struct extmsg *mp,
struct client *client
)
{
ELLLIST *addrq = &client->addrq;
FAST struct extmsg *search_reply;
FAST struct extmsg *get_reply;
FAST int status;
struct db_addr tmp_addr;
FAST struct channel_in_use *pchannel;
ELLLIST *addrq = &client->addrq;
struct extmsg *search_reply;
struct extmsg *get_reply;
int status;
struct db_addr tmp_addr;
struct channel_in_use *pchannel;
unsigned long sid;
/* Exit quickly if channel not on this node */
@@ -725,8 +792,25 @@ struct client *client
}
pchannel->ticks_at_creation = tickGet();
pchannel->addr = tmp_addr;
pchannel->chid = (void *) mp->m_pciu;
pchannel->client = client;
pchannel->cid = mp->m_cid;
/*
* allocate a server id and enter the channel pointer
* in the table
*/
FASTLOCK(&rsrv_free_addrq_lck);
sid = bucketID++;
pchannel->sid = sid;
status = bucketAddItem(pCaBucket, sid, pchannel);
FASTUNLOCK(&rsrv_free_addrq_lck);
if(status!=BUCKET_SUCCESS){
LOCK_CLIENT(client);
send_err(mp, ECA_ALLOCMEM, client, "No room for hash table");
UNLOCK_CLIENT(client);
free(pchannel);
return;
}
/*
* UDP reliability schemes rely on both msgs in same reply Therefore
@@ -761,9 +845,8 @@ struct client *client
struct event_ext evext;
evext.mp = mp + 1;
/* pchannel ukn prior to connect */
evext.mp->m_pciu = pchannel;
evext.client = client;
evext.pciu = pchannel;
evext.mp->m_cid = sid;
evext.send_lock = FALSE;
evext.size = size;
evext.get = TRUE;
@@ -794,7 +877,7 @@ struct client *client
/* this field for rmt machines where paddr invalid */
search_reply->m_type = tmp_addr.field_type;
search_reply->m_count = tmp_addr.no_elements;
search_reply->m_pciu = (void *) pchannel;
search_reply->m_cid = sid;
END_MSG(client);
UNLOCK_CLIENT(client);
@@ -846,8 +929,10 @@ struct client *client,
char *footnote
)
{
FAST struct extmsg *reply;
FAST int size;
struct channel_in_use *pciu;
int size;
struct extmsg *reply;
/*
* force string post size to be the true size rounded to even
@@ -875,15 +960,25 @@ char *footnote
case IOC_SEARCH:
case IOC_BUILD:
case IOC_WRITE:
reply->m_pciu = (void *)
((struct channel_in_use *) curp->m_pciu)->chid;
/*
*
* Verify the channel
*
*/
pciu = MPTOPCIU(curp);
if(pciu){
reply->m_cid = (unsigned long) pciu->cid;
}
else{
reply->m_cid = ~0L;
}
break;
case IOC_EVENTS_ON:
case IOC_EVENTS_OFF:
case IOC_READ_SYNC:
case IOC_SNAPSHOT:
default:
reply->m_pciu = (void *) NULL;
reply->m_cid = NULL;
break;
}
*(reply + 1) = *curp;
@@ -893,6 +988,18 @@ char *footnote
}
/*
* logBadId()
*/
LOCAL void logBadId(
struct client *client,
struct extmsg *mp
)
{
send_err(mp, ECA_INTERNAL, client, "ID lookup failed");
}
/* log_header()
*
@@ -904,12 +1011,16 @@ struct extmsg *mp,
int mnum
)
{
struct channel_in_use *pciu;
pciu = MPTOPCIU(mp);
logMsg( "CAS: N=%d cmd=%d type=%d pstsize=%d paddr=%x avail=%x\n",
mnum,
mp->m_cmmd,
mp->m_type,
mp->m_postsize,
(int)MPTOPADDR(mp),
(int)(pciu?&pciu->addr:NULL),
(int)mp->m_available);
if(mp->m_cmmd==IOC_WRITE && mp->m_type==DBF_STRING)
logMsg("CAS: The string written: %s \n",
@@ -948,3 +1059,21 @@ struct client *pc
return;
}
/*
* MPTOPCIU()
*
* used to be a macro
*/
LOCAL struct channel_in_use *MPTOPCIU(struct extmsg *mp)
{
struct channel_in_use *pciu;
FASTLOCK(&rsrv_free_addrq_lck);
pciu = bucketLookupItem(pCaBucket, mp->m_cid);
FASTUNLOCK(&rsrv_free_addrq_lck);
return pciu;
}

View File

@@ -43,7 +43,7 @@
* .11 joh 073093 added args to taskSpawn for v5.1 vxWorks
*/
static char *sccsId = "@(#)caservertask.c 1.13\t7/28/92";
static char *sccsId = "$Id$";
#include <vxWorks.h>
#include <ellLib.h>
@@ -239,11 +239,11 @@ int free_client(struct client *client)
*/
LOCAL int terminate_one_client(struct client *client)
{
FAST int servertid;
FAST int tmpsock;
FAST int status;
FAST struct event_ext *pevext;
FAST struct channel_in_use *pciu;
int servertid;
int tmpsock;
int status;
struct event_ext *pevext;
struct channel_in_use *pciu;
if (client->proto != IPPROTO_TCP) {
logMsg("CAS: non TCP client delete ignored\n",
@@ -293,6 +293,19 @@ LOCAL int terminate_one_client(struct client *client)
ellAdd((ELLLIST *)&rsrv_free_eventq, (ELLNODE *)pevext);
FASTUNLOCK(&rsrv_free_eventq_lck);
}
FASTLOCK(&rsrv_free_addrq_lck);
status = bucketRemoveItem(pCaBucket, pciu->sid, pciu);
FASTUNLOCK(&rsrv_free_addrq_lck);
if(status != BUCKET_SUCCESS){
logMsg(
"%s: Bad id=%d at close",
(int)__FILE__,
pciu->sid,
NULL,
NULL,
NULL,
NULL);
}
}
if (client->evuser) {
@@ -367,6 +380,13 @@ int client_stat(void)
ellCount(&rsrv_free_addrq),
ellCount(&rsrv_free_eventq));
if(pCaBucket){
printf( "The server's resource id conversion table:\n");
FASTLOCK(&rsrv_free_addrq_lck);
bucketShow(pCaBucket);
FASTUNLOCK(&rsrv_free_addrq_lck);
}
return ellCount(&clientQ);
}

View File

@@ -56,7 +56,7 @@
* pend which could lock up the cast server.
*/
static char *sccsId = "$Id$\t$Date$";
static char *sccsId = "@(#)cast_server.c 1.17\t8/5/93";
#include <vxWorks.h>
#include <ellLib.h>
@@ -318,6 +318,7 @@ clean_addrq(struct client *pclient)
unsigned long maxdelay = 0;
unsigned ndelete=0;
unsigned long timeout = TIMEOUT*sysClkRateGet();
int s;
current = tickGet();
@@ -337,6 +338,17 @@ clean_addrq(struct client *pclient)
if (delay > timeout) {
ellDelete((ELLLIST *)&pclient->addrq, (ELLNODE *)pciu);
FASTLOCK(&rsrv_free_addrq_lck);
s = bucketRemoveItem(pCaBucket, pciu->sid, pciu);
if(s != BUCKET_SUCCESS){
logMsg(
"%s Bad id at close",
(int)__FILE__,
NULL,
NULL,
NULL,
NULL,
NULL);
}
ellAdd((ELLLIST *)&rsrv_free_addrq, (ELLNODE *)pciu);
FASTUNLOCK(&rsrv_free_addrq_lck);
ndelete++;

View File

@@ -36,13 +36,14 @@
* .05 joh 103191 moved lock from msg buf to client structure
* .06 joh 050692 added declaration for cac_send_heartbeat()
* .07 joh 022492 added get flag to the event ext block
* .08 joh 090893 added sid field to channel in use block
*
*/
#ifndef INCLserverh
#define INCLserverh
static char *serverhSccsId = "$Id$\t$Date$";
static char *serverhSccsId = "$Id$";
#include <vxLib.h>
#include <ellLib.h>
@@ -52,6 +53,7 @@ static char *serverhSccsId = "$Id$\t$Date$";
#include <db_access.h>
#include <dbEvent.h>
#include <iocmsg.h>
#include <bucketLib.h>
struct message_buffer{
unsigned stk;
@@ -61,11 +63,11 @@ struct message_buffer{
};
struct client{
ELLNODE node;
ELLNODE node;
int sock;
int proto;
FAST_LOCK lock;
ELLLIST addrq;
ELLLIST addrq;
struct message_buffer send;
struct message_buffer recv;
struct sockaddr_in addr;
@@ -83,10 +85,12 @@ struct client{
*/
struct channel_in_use{
ELLNODE node;
struct db_addr addr;
ELLLIST eventq;
void *chid; /* the chid from the client saved here */
unsigned long ticks_at_creation; /* for UDP timeout */
struct db_addr addr;
struct client *client;
unsigned long cid; /* client id */
unsigned long sid; /* server id */
unsigned long ticks_at_creation; /* for UDP timeout */
};
@@ -98,10 +102,10 @@ struct event_ext{
ELLNODE node;
struct extmsg msg;
struct extmsg *mp; /* for speed (IOC_READ) */
struct client *client;
struct channel_in_use *pciu;
unsigned size; /* for speed */
char modified; /* mod & ev flw ctrl enbl */
char send_lock; /* lock send buffer */
unsigned size; /* for speed */
char get; /* T: get F: monitor */
};
@@ -126,6 +130,7 @@ GLBLTYPE ELLLIST rsrv_free_eventq;
GLBLTYPE FAST_LOCK rsrv_free_addrq_lck;
GLBLTYPE FAST_LOCK rsrv_free_eventq_lck;
GLBLTYPE struct client *prsrv_cast_client;
GLBLTYPE BUCKET *pCaBucket;
#define LOCK_CLIENT(CLIENT)\
FASTLOCK(&(CLIENT)->lock);