Initial revision

This commit is contained in:
Jeff Hill
1991-06-25 13:01:41 +00:00
parent 49c339455c
commit 553e949496
4 changed files with 1388 additions and 0 deletions

708
src/rsrv/camessage.c Normal file
View File

@@ -0,0 +1,708 @@
/* @(#)camessage.c
* $Id$
* Author: Jeffrey O. Hill
* hill@luke.lanl.gov
* (505) 665 1831
* Date: 5-88
*
* Experimental Physics and Industrial Control System (EPICS)
*
* Copyright 1991, the Regents of the University of California,
* and the University of Chicago Board of Governors.
*
* This software was produced under U.S. Government contracts:
* (W-7405-ENG-36) at the Los Alamos National Laboratory,
* and (W-31-109-ENG-38) at Argonne National Laboratory.
*
* Initial development by:
* The Controls and Automation Group (AT-8)
* Ground Test Accelerator
* Accelerator Technology Division
* Los Alamos National Laboratory
*
* Co-developed with
* The Controls and Computing Group
* Accelerator Systems Division
* Advanced Photon Source
* Argonne National Laboratory
*
* Modification Log:
* -----------------
* .01 joh 012290 placed break in ca_cancel_event search so entire
* list is not searched.
* .02 joh 011091 added missing break to error message send
* chix select switch
*/
#include <vxWorks.h>
#include <taskLib.h>
#include <types.h>
#include <in.h>
#include <db_access.h>
#include <task_params.h>
#include <server.h>
#include <dblib.h>
#include <caerr.h>
static struct extmsg nill_msg;
#define MPTOPADDR(MP) (&((struct channel_in_use *)(MP)->m_pciu)->addr)
#define RECORD_NAME(PADDR) (((struct db_addr *)(PADDR))->precord)
void search_reply();
void build_reply();
void read_reply();
void read_sync_reply();
void event_cancel_reply();
void clear_channel_reply();
void send_err();
void log_header();
/*
* CAMESSSAGE()
*
*
*/
camessage(client, recv)
struct client *client;
struct message_buffer *recv;
{
int nmsg = 0;
unsigned msgsize;
unsigned bytes_left;
int status;
FAST struct extmsg *mp;
FAST struct event_ext *pevext;
if (MPDEBUG == 1)
logMsg("Parsing %d(decimal) bytes\n", recv->cnt);
bytes_left = recv->cnt;
while (bytes_left) {
if(bytes_left < sizeof(*mp))
return OK;
mp = (struct extmsg *) &recv->buf[recv->stk];
msgsize = mp->m_postsize + sizeof(*mp);
if(msgsize > bytes_left)
return OK;
nmsg++;
if (MPDEBUG == 1)
log_header(mp, nmsg);
switch (mp->m_cmmd) {
case IOC_NOOP: /* verify TCP */
break;
case IOC_EVENT_ADD:
FASTLOCK(&rsrv_free_eventq_lck);
pevext = (struct event_ext *)
lstGet(&rsrv_free_eventq);
FASTUNLOCK(&rsrv_free_eventq_lck);
if (!pevext) {
int size = db_sizeof_event_block()
+ sizeof(*pevext);
pevext =
(struct event_ext *) malloc(size);
if (!pevext) {
LOCK_SEND(client);
send_err(
mp,
ECA_ALLOCMEM,
client,
RECORD_NAME(MPTOPADDR(mp)));
UNLOCK_SEND(client);
break;
}
}
pevext->msg = *mp;
pevext->mp = &pevext->msg; /* for speed- see
* IOC_READ */
pevext->client = client;
pevext->send_lock = TRUE;
pevext->size = (mp->m_count - 1)
* dbr_value_size[mp->m_type] +
dbr_size[mp->m_type];
lstAdd( &((struct channel_in_use *) mp->m_pciu)->eventq,
pevext);
status = db_add_event(client->evuser,
MPTOPADDR(mp),
read_reply,
pevext,
(unsigned) ((struct monops *) mp)->m_info.m_mask,
pevext + 1);
if (status == ERROR) {
LOCK_SEND(client);
send_err(
mp,
ECA_ADDFAIL,
client,
RECORD_NAME(MPTOPADDR(mp)));
UNLOCK_SEND(client);
}
/*
* allways send it once at event add
*
* Hold argument is supplied true so the send message
* buffer is not flushed once each call.
*/
read_reply(pevext, MPTOPADDR(mp), TRUE, NULL);
break;
case IOC_EVENT_CANCEL:
event_cancel_reply(mp, client);
break;
case IOC_CLEAR_CHANNEL:
clear_channel_reply(mp, client);
break;
case IOC_READ_NOTIFY:
case IOC_READ:
{
struct event_ext evext;
pevext = &evext;
pevext->mp = mp;
pevext->client = client;
pevext->send_lock = TRUE;
if (mp->m_count == 1)
pevext->size = dbr_size[mp->m_type];
else
pevext->size = (mp->m_count - 1) *
dbr_value_size[mp->m_type] +
dbr_size[mp->m_type];
/*
* Arguments to this routine organized in
* favor of the standard db event calling
* mechanism- routine(userarg, paddr). See
* events added above.
*
* Hold argument set true so the send message
* buffer is not flushed once each call.
*/
read_reply(pevext, MPTOPADDR(mp), TRUE, NULL);
break;
}
case IOC_SEARCH:
case IOC_BUILD:
build_reply(mp, client);
break;
case IOC_WRITE:
status = db_put_field(
MPTOPADDR(mp),
mp->m_type,
mp + 1,
mp->m_count
);
if (status < 0) {
LOCK_SEND(client);
send_err(
mp,
ECA_PUTFAIL,
client,
RECORD_NAME(MPTOPADDR(mp)));
UNLOCK_SEND(client);
}
break;
case IOC_EVENTS_ON:
{
struct channel_in_use *pciu =
(struct channel_in_use *) & client->addrq;
client->eventsoff = FALSE;
while (pciu = (struct channel_in_use *)
pciu->node.next) {
pevext = (struct event_ext *)
& pciu->eventq;
while (pevext = (struct event_ext *)
pevext->node.next){
if (pevext->modified) {
read_reply(
pevext,
MPTOPADDR(&pevext->msg),
TRUE,
NULL);
pevext->modified = FALSE;
}
}
}
break;
}
case IOC_EVENTS_OFF:
client->eventsoff = TRUE;
break;
case IOC_READ_SYNC:
read_sync_reply(mp, client);
break;
default:
log_header(mp, nmsg);
LOCK_SEND(client);
send_err(mp, ECA_INTERNAL, client, "Invalid Msg");
UNLOCK_SEND(client);
return ERROR;
}
recv->stk += msgsize;
bytes_left = recv->cnt - recv->stk;
}
return OK;
}
/*
*
* clear_channel_reply()
*
*
*/
static void
clear_channel_reply(mp, client)
FAST struct extmsg *mp;
struct client *client;
{
FAST struct extmsg *reply;
FAST struct event_ext *pevext;
FAST int status;
struct channel_in_use *pciu = (struct channel_in_use *) mp->m_pciu;
LIST *peventq = &pciu->eventq;
for (pevext = (struct event_ext *) peventq->node.next;
pevext;
pevext = (struct event_ext *) pevext->node.next) {
status = db_cancel_event(pevext + 1);
if (status == ERROR)
taskSuspend(0);
lstDelete(peventq, pevext);
FASTLOCK(&rsrv_free_eventq_lck);
lstAdd(&rsrv_free_eventq, pevext);
FASTUNLOCK(&rsrv_free_eventq_lck);
}
/*
* send delete confirmed message
*/
LOCK_SEND(client);
reply = (struct extmsg *) ALLOC_MSG(client, 0);
if (!reply) {
UNLOCK_SEND(client);
taskSuspend(0);
}
*reply = *mp;
END_MSG(client);
UNLOCK_SEND(client);
lstDelete(&client->addrq, pciu);
FASTLOCK(&rsrv_free_addrq_lck);
lstAdd(&rsrv_free_addrq, pciu);
FASTUNLOCK(&rsrv_free_addrq_lck);
return;
}
/*
*
* event_cancel_reply()
*
*
* Much more efficient now since the event blocks hang off the channel in use
* blocks not all together off the client block.
*/
static void
event_cancel_reply(mp, client)
FAST struct extmsg *mp;
struct client *client;
{
FAST struct extmsg *reply;
FAST struct event_ext *pevext;
FAST int status;
LIST *peventq =
&((struct channel_in_use *) mp->m_pciu)->eventq;
for (pevext = (struct event_ext *) peventq->node.next;
pevext;
pevext = (struct event_ext *) pevext->node.next)
if (pevext->msg.m_available == mp->m_available) {
status = db_cancel_event(pevext + 1);
if (status == ERROR)
taskSuspend(0);
lstDelete(peventq, pevext);
/*
* send delete confirmed message
*/
LOCK_SEND(client);
reply = (struct extmsg *) ALLOC_MSG(client, 0);
if (!reply) {
UNLOCK_SEND(client);
taskSuspend(0);
}
*reply = pevext->msg;
reply->m_postsize = 0;
END_MSG(client);
UNLOCK_SEND(client);
FASTLOCK(&rsrv_free_eventq_lck);
lstAdd(&rsrv_free_eventq, pevext);
FASTUNLOCK(&rsrv_free_eventq_lck);
return;
}
/*
* Not Found- return an error message
*/
LOCK_SEND(client);
send_err(mp, ECA_BADMONID, client, NULL);
UNLOCK_SEND(client);
return;
}
/*
*
* read_reply()
*
*
*/
static void
read_reply(pevext, paddr, hold, pfl)
FAST struct event_ext *pevext;
FAST struct db_addr *paddr;
int hold; /* more on the way if true */
db_field_log *pfl;
{
FAST struct extmsg *mp = pevext->mp;
FAST struct client *client = pevext->client;
FAST struct extmsg *reply;
FAST int status;
FAST int strcnt;
/*
* If flow control is on set modified and send for later
*/
if (client->eventsoff) {
pevext->modified = TRUE;
return;
}
if (pevext->send_lock)
LOCK_SEND(client);
reply = (struct extmsg *) ALLOC_MSG(client, pevext->size);
if (!reply) {
send_err(mp, ECA_TOLARGE, client, RECORD_NAME(paddr));
if (pevext->send_lock)
UNLOCK_SEND(client);
return;
}
*reply = *mp;
reply->m_postsize = pevext->size;
reply->m_pciu = (void *) ((struct channel_in_use *) mp->m_pciu)->chid;
status = db_get_field(
paddr,
mp->m_type,
reply + 1,
mp->m_count,
pfl);
if (status < 0) {
send_err(mp, ECA_GETFAIL, client, RECORD_NAME(paddr));
log_header(mp, 0);
}
else{
/*
* force string message size to be the true size rounded to even
* boundary
*/
if (mp->m_type == DBR_STRING && mp->m_count == 1) {
/* add 1 so that the string terminator will be shipped */
strcnt = strlen(reply + 1) + 1;
reply->m_postsize = strcnt;
}
END_MSG(client);
/*
* Ensures timely response for events, but does que
* them up like db requests when the OPI does not keep up.
*/
if (!hold)
cas_send_msg(client,FALSE);
}
if (pevext->send_lock)
UNLOCK_SEND(client);
return;
}
/*
*
* read_sync_reply()
*
*
*/
static void
read_sync_reply(mp, client)
FAST struct extmsg *mp;
struct client *client;
{
FAST struct extmsg *reply;
LOCK_SEND(client);
reply = (struct extmsg *) ALLOC_MSG(client, 0);
if (!reply)
taskSuspend(0);
*reply = *mp;
END_MSG(client);
UNLOCK_SEND(client);
return;
}
/*
*
* build_reply()
*
*
*/
static void
build_reply(mp, client)
FAST struct extmsg *mp;
struct client *client;
{
LIST *addrq = &client->addrq;
FAST struct extmsg *search_reply;
FAST struct extmsg *get_reply;
FAST int status;
struct db_addr tmp_addr;
FAST struct channel_in_use *pchannel;
void search_fail_reply();
/* Exit quickly if channel not on this node */
status = db_name_to_addr(mp->m_cmmd == IOC_BUILD ? mp + 2 : mp + 1, &tmp_addr);
if (status < 0) {
if (MPDEBUG == 1)
logMsg("Lookup for channel \"%s\" failed\n", mp + 1);
if (mp->m_type == DOREPLY)
search_fail_reply(mp, client);
return;
}
/* get block off free list if possible */
FASTLOCK(&rsrv_free_addrq_lck);
pchannel = (struct channel_in_use *) lstGet(&rsrv_free_addrq);
FASTUNLOCK(&rsrv_free_addrq_lck);
if (!pchannel) {
pchannel = (struct channel_in_use *) calloc(1, sizeof(*pchannel));
if (!pchannel) {
LOCK_SEND(client);
send_err(mp, ECA_ALLOCMEM, client, RECORD_NAME(&tmp_addr));
UNLOCK_SEND(client);
return;
}
}
pchannel->addr = tmp_addr;
pchannel->chid = (void *) mp->m_pciu;
/* store the addr block in a Q so it can be deallocated */
lstAdd(addrq, pchannel);
/*
* ALLOC_MSG allways allocs at least the sizeof extmsg Large
* requested size insures both messages sent in one reply NOTE: my
* UDP reliability schemes rely on both msgs in same reply Therefore
* the send buffer locked while both messages are placed
*/
LOCK_SEND(client);
if (mp->m_cmmd == IOC_BUILD) {
FAST short type = (mp + 1)->m_type;
FAST unsigned int count = (mp + 1)->m_count;
FAST unsigned int size;
size = (count - 1) * dbr_value_size[type] + dbr_size[type];
get_reply = (struct extmsg *) ALLOC_MSG(client, size + sizeof(*mp));
if (!get_reply) {
/* tell them that their request is to large */
send_err(mp, ECA_TOLARGE, client, RECORD_NAME(&tmp_addr));
} else {
struct event_ext evext;
evext.mp = mp + 1;
/* pchannel ukn prior to connect */
evext.mp->m_pciu = pchannel;
evext.client = client;
evext.send_lock = FALSE;
evext.size = size;
/*
* Arguments to this routine organized in favor of
* the standard db event calling mechanism-
* routine(userarg, paddr). See events added above.
* Hold argument set true so the send message buffer
* is not flushed once each call.
*/
read_reply(&evext, &tmp_addr, TRUE, NULL);
}
}
search_reply = (struct extmsg *) ALLOC_MSG(client, 0);
if (!search_reply)
taskSuspend();
*search_reply = *mp;
search_reply->m_postsize = 0;
/* this field for rmt machines where paddr invalid */
search_reply->m_type = tmp_addr.field_type;
search_reply->m_count = tmp_addr.no_elements;
search_reply->m_pciu = (void *) pchannel;
END_MSG(client);
UNLOCK_SEND(client);
return;
}
/* search_fail_reply()
*
* Only when requested by the client
* send search failed reply
*
*
*/
static void
search_fail_reply(mp, client)
FAST struct extmsg *mp;
struct client *client;
{
FAST struct extmsg *reply;
LOCK_SEND(client);
reply = (struct extmsg *) ALLOC_MSG(client, 0);
if (!reply) {
taskSuspend(0);
}
*reply = *mp;
reply->m_cmmd = IOC_NOT_FOUND;
reply->m_postsize = 0;
END_MSG(client);
UNLOCK_SEND(client);
}
/* send_err()
*
* reflect error msg back to the client
*
* send buffer lock must be on while in this routine
*
*/
static void
send_err(curp, status, client, footnote)
struct extmsg *curp;
int status;
struct client *client;
char *footnote;
{
FAST struct extmsg *reply;
FAST int size;
/*
* force string post size to be the true size rounded to even
* boundary
*/
size = strlen(footnote)+1;
size += sizeof(*curp);
reply = (struct extmsg *) ALLOC_MSG(client, size);
if (!reply){
printf( "caserver: Unable to deliver err msg [%s]\n",
ca_message(status));
return;
}
*reply = nill_msg;
reply->m_cmmd = IOC_ERROR;
reply->m_available = status;
reply->m_postsize = size;
switch (curp->m_cmmd) {
case IOC_EVENT_ADD:
case IOC_EVENT_CANCEL:
case IOC_READ:
case IOC_READ_NOTIFY:
case IOC_SEARCH:
case IOC_BUILD:
case IOC_WRITE:
reply->m_pciu = (void *)
((struct channel_in_use *) curp->m_pciu)->chid;
break;
case IOC_EVENTS_ON:
case IOC_EVENTS_OFF:
case IOC_READ_SYNC:
case IOC_SNAPSHOT:
default:
reply->m_pciu = (void *) NULL;
break;
}
*(reply + 1) = *curp;
strcpy(reply + 2, footnote);
END_MSG(client);
}
/* log_header()
*
* Debug aid - print the header part of a message.
*
*/
static void
log_header (mp, mnum)
FAST struct extmsg *mp;
{
logMsg( "N=%d cmd=%d type=%d pstsize=%d paddr=%x avail=%x\n",
mnum,
mp->m_cmmd,
mp->m_type,
mp->m_postsize,
MPTOPADDR(mp),
mp->m_available);
if(mp->m_cmmd==IOC_WRITE && mp->m_type==DBF_STRING)
logMsg("The string written: %s \n",mp+1);
}

274
src/rsrv/camsgtask.c Normal file
View File

@@ -0,0 +1,274 @@
/* @(#)camsgtask.c
* $Id$
* Author: Jeffrey O. Hill
* hill@luke.lanl.gov
* (505) 665 1831
* Date: 6-88
*
* Experimental Physics and Industrial Control System (EPICS)
*
* Copyright 1991, the Regents of the University of California,
* and the University of Chicago Board of Governors.
*
* This software was produced under U.S. Government contracts:
* (W-7405-ENG-36) at the Los Alamos National Laboratory,
* and (W-31-109-ENG-38) at Argonne National Laboratory.
*
* Initial development by:
* The Controls and Automation Group (AT-8)
* Ground Test Accelerator
* Accelerator Technology Division
* Los Alamos National Laboratory
*
* Co-developed with
* The Controls and Computing Group
* Accelerator Systems Division
* Advanced Photon Source
* Argonne National Laboratory
*
* Modification Log:
* -----------------
* .01 joh 08--88 added broadcast switchover to TCP/IP
* .02 joh 041089 added new event passing
* .03 joh 060791 camsgtask() now returns info about
* partial messages
*/
#include <vxWorks.h>
#include <lstLib.h>
#include <types.h>
#include <socket.h>
#include <ioLib.h>
#include <in.h>
#include <tcp.h>
#include <task_params.h>
#include <db_access.h>
#include <server.h>
/*
*
* camsgtask()
*
* CA server TCP client task (one spawned for each client)
*/
void camsgtask(sock)
FAST int sock;
{
int nchars;
FAST int status;
FAST struct client *client = NULL;
struct sockaddr_in addr;
int i;
int true = TRUE;
/*
* see TCP(4P) this seems to make unsollicited single events much
* faster. I take care of queue up as load increases.
*/
status = setsockopt(
sock,
IPPROTO_TCP,
TCP_NODELAY,
&true,
sizeof true);
if(status == ERROR){
logMsg("camsgtask: TCP_NODELAY option set failed\n");
close(sock);
return;
}
/*
* turn on KEEPALIVE so if the client crashes
* this task will find out and exit
*/
status = setsockopt(
sock,
SOL_SOCKET,
SO_KEEPALIVE,
&true,
sizeof true);
if(status == ERROR){
logMsg("camsgtask: SO_KEEPALIVE option set failed\n");
close(sock);
return;
}
nchars = recv( sock,
&addr,
sizeof(addr),
0);
if ( nchars < sizeof(addr) ){
logMsg("camsgtask: Protocol error\n");
close(sock);
return;
}
if(MPDEBUG==2){
logMsg( "camsgtask: Recieved connection request\n");
logMsg("from addr %x, udp port %x \n",
addr.sin_addr,
addr.sin_port);
}
/*
* NOTE: The client structure used here does not have to be locked
* since this thread does not traverse the addr que or the client
* que. This thread is deleted prior to deletion of this client by
* terminate_one_client().
*
* wait a reasonable length of time in case the cast server is busy
*/
for (i = 0; i < 10; i++) {
LOCK_CLIENTQ;
client = existing_client(&addr);
UNLOCK_CLIENTQ;
if (client)
break;
taskDelay(sysClkRateGet() * 5); /* 5 sec */
}
if (!client) {
logMsg("camsgtask: Unknown client (protocol error)\n");
close(sock);
return;
}
/*
* convert connection to TCP
*/
LOCK_SEND(client);
udp_to_tcp(client, sock);
UNLOCK_SEND(client);
client->tid = taskIdSelf();
client->evuser = (struct event_user *) db_init_events();
if (!client->evuser) {
logMsg("camsgtask: unable to init the event facility\n");
free_client(client);
return;
}
status = db_start_events(
client->evuser,
CA_EVENT_NAME,
NULL,
NULL);
if (status == ERROR) {
logMsg("camsgtask: unable to start the event facility\n");
free_client(client);
return;
}
client->recv.cnt = 0;
while (TRUE) {
client->recv.stk = 0;
nchars = recv(
sock,
&client->recv.buf[client->recv.cnt],
sizeof(client->recv.buf)-client->recv.cnt,
0);
if(nchars<=0){
if(MPDEBUG>0){
logMsg("CA server: msg recv error\n");
printErrno(errnoGet(taskIdSelf()));
}
break;
}
client->recv.cnt += nchars;
status = camessage(client, &client->recv);
if(status == OK){
unsigned bytes_left;
bytes_left = client->recv.cnt - client->recv.stk;
/*
* if there is a partial message
* align it with the start of the buffer
*/
if(bytes_left>0){
char *pbuf;
pbuf = client->recv.buf;
/*
* overlapping regions handled
* by bcopy
*/
bcopy( pbuf + client->recv.stk,
pbuf,
bytes_left);
client->recv.cnt = bytes_left;
}
else{
client->recv.cnt = 0;
}
}else{
client->recv.cnt = 0;
}
/*
* allow message to batch up if more are comming
*/
status = ioctl(sock, FIONREAD, &nchars);
if (status == ERROR) {
printErrno(errnoGet(taskIdSelf()));
taskSuspend(0);
}
if (nchars == 0)
cas_send_msg(client, TRUE);
/*
* dont hang around if there are no
* connections to process variables
*/
if (client->addrq.count == 0)
break;
}
free_client(client);
}
/*
*
* read_entire_msg()
*
*
*/
static int read_entire_msg(sockfd, mp, tot)
FAST sockfd; /* socket file descriptor */
FAST unsigned char *mp; /* where to read pointer */
FAST unsigned int tot; /* size of entire message */
{
FAST unsigned int nchars;
FAST unsigned int total = 0;
int status;
while(TRUE) {
nchars = recv( sockfd,
mp + total,
tot - total,
0);
if(nchars <= 0){
if(MPDEBUG == 2) {
logMsg("rsrv: msg recv error\n");
printErrno(errnoGet(taskIdSelf()));
}
return ERROR;
}
total += nchars;
if(total == tot)
return total;
else if(total > tot)
return ERROR;
}
}

143
src/rsrv/caserverio.c Normal file
View File

@@ -0,0 +1,143 @@
/* @(#)caserverio.c
* $Id$
* Author: Jeffrey O. Hill
* hill@luke.lanl.gov
* (505) 665 1831
* Date: 060791
*
* Experimental Physics and Industrial Control System (EPICS)
*
* Copyright 1991, the Regents of the University of California,
* and the University of Chicago Board of Governors.
*
* This software was produced under U.S. Government contracts:
* (W-7405-ENG-36) at the Los Alamos National Laboratory,
* and (W-31-109-ENG-38) at Argonne National Laboratory.
*
* Initial development by:
* The Controls and Automation Group (AT-8)
* Ground Test Accelerator
* Accelerator Technology Division
* Los Alamos National Laboratory
*
* Co-developed with
* The Controls and Computing Group
* Accelerator Systems Division
* Advanced Photon Source
* Argonne National Laboratory
*
* Modification Log:
* -----------------
*/
#include <vxWorks.h>
#include <lstLib.h>
#include <types.h>
#include <socket.h>
#include <ioLib.h>
#include <in.h>
#include <tcp.h>
#include <server.h>
/*
*
* cas_send_msg()
*
* (channel access server send message)
*/
void cas_send_msg(pclient, lock_needed)
struct client *pclient;
int lock_needed;
{
int status;
if(pclient->disconnect){
return;
}
if(lock_needed){
LOCK_SEND(pclient);
}
if(pclient->send.stk){
pclient->send.cnt = pclient->send.stk;
status = sendto(
pclient->sock,
pclient->send.buf,
pclient->send.cnt,
NULL,
&pclient->addr,
sizeof(pclient->addr));
if(status>=0){
if(MPDEBUG==2){
logMsg( "Sent a message of %d bytes\n",
pclient->send.cnt);
}
}
else{
logMsg("caserver: client unreachable\n");
logMsg("caserver: msg from vxWorks follows\n");
printErrno(errnoGet(taskIdSelf()));
pclient->disconnect = TRUE;
}
pclient->send.stk = 0;
}
if(lock_needed){
UNLOCK_SEND(pclient);
}
return;
}
/*
*
* cas_alloc_msg()
*
* see also ALLOC_MSG()/END_MSG() in server.h
*
* (allocate space in the outgoing message buffer)
*
* send lock must be on while in this routine
*
* returns 1) a valid ptr to msg buffer space
* 2) NULL (msg will not fit)
*/
struct extmsg *cas_alloc_msg(pclient, extsize)
struct client *pclient; /* ptr to per client struct */
unsigned extsize; /* extension size */
{
unsigned msgsize;
unsigned newstack;
msgsize = extsize + sizeof(struct extmsg);
newstack = pclient->send.stk + msgsize;
if(newstack > pclient->send.maxstk){
if(pclient->disconnect){
pclient->send.stk = 0;
}
else{
cas_send_msg(pclient, FALSE);
}
newstack = pclient->send.stk + msgsize;
/*
* If dosnt fit now it never will
*/
if(newstack > pclient->send.maxstk){
return NULL;
}
}
/*
* it fits END_MSG will push it on the stack
*/
return (struct extmsg *) &pclient->send.buf[pclient->send.stk];
}

263
src/rsrv/caservertask.c Normal file
View File

@@ -0,0 +1,263 @@
/* @(#)caservertask.c
* $Id$
* Author: Jeffrey O. Hill
* hill@luke.lanl.gov
* (505) 665 1831
* Date: 5-88
*
* Experimental Physics and Industrial Control System (EPICS)
*
* Copyright 1991, the Regents of the University of California,
* and the University of Chicago Board of Governors.
*
* This software was produced under U.S. Government contracts:
* (W-7405-ENG-36) at the Los Alamos National Laboratory,
* and (W-31-109-ENG-38) at Argonne National Laboratory.
*
* Initial development by:
* The Controls and Automation Group (AT-8)
* Ground Test Accelerator
* Accelerator Technology Division
* Los Alamos National Laboratory
*
* Co-developed with
* The Controls and Computing Group
* Accelerator Systems Division
* Advanced Photon Source
* Argonne National Laboratory
*
* Modification Log:
* -----------------
* .01 joh 030891 now saves old client structure for later reuse
*/
#include <vxWorks.h>
#include <lstLib.h>
#include <taskLib.h>
#include <types.h>
#include <socket.h>
#include <in.h>
#include <db_access.h>
#include <task_params.h>
#include <server.h>
/*
*
* req_server()
*
* CA server task
*
* Waits for connections at the CA port and spawns a task to
* handle each of them
*
*/
void
req_server()
{
struct sockaddr_in serverAddr; /* server's address */
FAST struct client *client;
FAST int status;
FAST int i;
if (IOC_sock != 0 && IOC_sock != ERROR)
if ((status = close(IOC_sock)) == ERROR)
logMsg("Unable to close open master socket\n");
/*
* Open the socket. Use ARPA Internet address format and stream
* sockets. Format described in <sys/socket.h>.
*/
if ((IOC_sock = socket(AF_INET, SOCK_STREAM, 0)) == ERROR) {
logMsg("Socket creation error\n");
printErrno(errnoGet());
taskSuspend(0);
}
/* Zero the sock_addr structure */
bfill(&serverAddr, sizeof(serverAddr), 0);
serverAddr.sin_family = AF_INET;
serverAddr.sin_port = CA_SERVER_PORT;
/* get server's Internet address */
if (bind(IOC_sock, &serverAddr, sizeof(serverAddr)) == ERROR) {
logMsg("Bind error\n");
printErrno(errnoGet());
close(IOC_sock);
taskSuspend(0);
}
/* listen and accept new connections */
if (listen(IOC_sock, 10) == ERROR) {
logMsg("Listen error\n");
printErrno(errnoGet());
close(IOC_sock);
taskSuspend(0);
}
while (TRUE) {
if ((i = accept(IOC_sock, NULL, 0)) == ERROR) {
logMsg("Accept error\n");
printErrno(errnoGet());
taskSuspend(0);
} else {
status = taskSpawn(CA_CLIENT_NAME,
CA_CLIENT_PRI,
CA_CLIENT_OPT,
CA_CLIENT_STACK,
camsgtask,
i);
if (status == ERROR) {
logMsg("Unable to spawn network server\n");
printErrno(errnoGet());
}
}
}
}
/*
*
* free_client()
*
*/
STATUS
free_client(client)
register struct client *client;
{
if (client) {
/* remove it from the list of clients */
/* list delete returns no status */
LOCK_CLIENTQ;
lstDelete(&clientQ, client);
UNLOCK_CLIENTQ;
terminate_one_client(client);
LOCK_CLIENTQ;
lstAdd(&rsrv_free_clientQ, client);
UNLOCK_CLIENTQ;
} else {
LOCK_CLIENTQ;
while (client = (struct client *) lstGet(&clientQ))
terminate_one_client(client);
FASTLOCK(&rsrv_free_addrq_lck);
lstFree(&rsrv_free_addrq);
lstInit(&rsrv_free_addrq);
FASTUNLOCK(&rsrv_free_addrq_lck);
FASTLOCK(&rsrv_free_eventq_lck);
lstFree(&rsrv_free_eventq);
lstInit(&rsrv_free_eventq);
FASTUNLOCK(&rsrv_free_eventq_lck);
lstFree(&rsrv_free_clientQ);
UNLOCK_CLIENTQ;
}
}
/*
* TERMINATE_ONE_CLIENT
*/
int
terminate_one_client(client)
register struct client *client;
{
FAST int servertid = client->tid;
FAST int tmpsock = client->sock;
FAST int status;
FAST struct event_ext *pevext;
FAST struct channel_in_use *pciu;
if (client->proto == IPPROTO_TCP) {
logMsg("CA Connection %d Terminated\n", tmpsock);
/*
* Server task deleted first since close() is not reentrant
*/
if (servertid != taskIdSelf() && servertid)
if (td(servertid) == ERROR) { /* delete server */
if (errnoGet() != S_taskLib_TASK_ID_ERROR)
printErrno(errnoGet());
}
pciu = (struct channel_in_use *) & client->addrq;
while (pciu = (struct channel_in_use *) pciu->node.next)
while (pevext = (struct event_ext *) lstGet(&pciu->eventq)) {
status = db_cancel_event(pevext + 1);
if (status == ERROR)
taskSuspend(0);
FASTLOCK(&rsrv_free_eventq_lck);
lstAdd(&rsrv_free_eventq, pevext);
FASTUNLOCK(&rsrv_free_eventq_lck);
}
if (client->evuser) {
status = db_close_events(client->evuser);
if (status == ERROR)
taskSuspend(0);
}
if (tmpsock != NONE)
if ((status = close(tmpsock)) == ERROR) /* close socket */
logMsg("Unable to close open TCP client socket\n");
}
FASTLOCK(&rsrv_free_addrq_lck);
/* free dbaddr str */
lstExtract(&client->addrq,
client->addrq.node.next,
client->addrq.node.previous,
&rsrv_free_addrq);
FASTUNLOCK(&rsrv_free_addrq_lck);
return OK;
}
/*
* client_stat()
*
*/
STATUS
client_stat()
{
FAST struct client *client;
NODE *addr;
struct sockaddr_in *psaddr;
LOCK_CLIENTQ;
client = (struct client *) lstNext(&clientQ);
while (client) {
char *pproto;
if(client->proto == IPPROTO_UDP){
pproto = "UDP";
}
else if(client->proto == IPPROTO_TCP){
pproto = "TCP";
}
else{
pproto = "UKN";
}
printf("Socket %d Protocol %s\n", client->sock, pproto);
psaddr = &client->addr;
printf("\tRemote address %u.%u.%u.%u Remote port %d\n",
(psaddr->sin_addr.s_addr & 0xff000000) >> 24,
(psaddr->sin_addr.s_addr & 0x00ff0000) >> 16,
(psaddr->sin_addr.s_addr & 0x0000ff00) >> 8,
(psaddr->sin_addr.s_addr & 0x000000ff),
psaddr->sin_port);
printf("\tChannel count %d\n", lstCount(&client->addrq));
addr = (NODE *) & client->addrq;
while (addr = lstNext(addr))
printf("\t%s ", ((struct db_addr *) (addr + 1))->precord);
printf("\n");
client = (struct client *) lstNext(client);
}
UNLOCK_CLIENTQ;
return lstCount(&clientQ);
}