diff --git a/src/rsrv/camessage.c b/src/rsrv/camessage.c new file mode 100644 index 000000000..39370c1dd --- /dev/null +++ b/src/rsrv/camessage.c @@ -0,0 +1,708 @@ +/* @(#)camessage.c + * $Id$ + * Author: Jeffrey O. Hill + * hill@luke.lanl.gov + * (505) 665 1831 + * Date: 5-88 + * + * Experimental Physics and Industrial Control System (EPICS) + * + * Copyright 1991, the Regents of the University of California, + * and the University of Chicago Board of Governors. + * + * This software was produced under U.S. Government contracts: + * (W-7405-ENG-36) at the Los Alamos National Laboratory, + * and (W-31-109-ENG-38) at Argonne National Laboratory. + * + * Initial development by: + * The Controls and Automation Group (AT-8) + * Ground Test Accelerator + * Accelerator Technology Division + * Los Alamos National Laboratory + * + * Co-developed with + * The Controls and Computing Group + * Accelerator Systems Division + * Advanced Photon Source + * Argonne National Laboratory + * + * Modification Log: + * ----------------- + * .01 joh 012290 placed break in ca_cancel_event search so entire + * list is not searched. + * .02 joh 011091 added missing break to error message send + * chix select switch + */ + +#include +#include +#include +#include +#include +#include +#include +#include +#include + + +static struct extmsg nill_msg; + +#define MPTOPADDR(MP) (&((struct channel_in_use *)(MP)->m_pciu)->addr) +#define RECORD_NAME(PADDR) (((struct db_addr *)(PADDR))->precord) + +void search_reply(); +void build_reply(); +void read_reply(); +void read_sync_reply(); +void event_cancel_reply(); +void clear_channel_reply(); +void send_err(); +void log_header(); + + +/* + * CAMESSSAGE() + * + * + */ +camessage(client, recv) + struct client *client; + struct message_buffer *recv; +{ + int nmsg = 0; + unsigned msgsize; + unsigned bytes_left; + int status; + FAST struct extmsg *mp; + FAST struct event_ext *pevext; + + + if (MPDEBUG == 1) + logMsg("Parsing %d(decimal) bytes\n", recv->cnt); + + bytes_left = recv->cnt; + while (bytes_left) { + + if(bytes_left < sizeof(*mp)) + return OK; + + mp = (struct extmsg *) &recv->buf[recv->stk]; + + msgsize = mp->m_postsize + sizeof(*mp); + + if(msgsize > bytes_left) + return OK; + + nmsg++; + + if (MPDEBUG == 1) + log_header(mp, nmsg); + + switch (mp->m_cmmd) { + case IOC_NOOP: /* verify TCP */ + break; + + case IOC_EVENT_ADD: + + FASTLOCK(&rsrv_free_eventq_lck); + pevext = (struct event_ext *) + lstGet(&rsrv_free_eventq); + FASTUNLOCK(&rsrv_free_eventq_lck); + if (!pevext) { + int size = db_sizeof_event_block() + + sizeof(*pevext); + pevext = + (struct event_ext *) malloc(size); + if (!pevext) { + LOCK_SEND(client); + send_err( + mp, + ECA_ALLOCMEM, + client, + RECORD_NAME(MPTOPADDR(mp))); + UNLOCK_SEND(client); + break; + } + } + pevext->msg = *mp; + pevext->mp = &pevext->msg; /* for speed- see + * IOC_READ */ + pevext->client = client; + pevext->send_lock = TRUE; + pevext->size = (mp->m_count - 1) + * dbr_value_size[mp->m_type] + + dbr_size[mp->m_type]; + + lstAdd( &((struct channel_in_use *) mp->m_pciu)->eventq, + pevext); + + status = db_add_event(client->evuser, + MPTOPADDR(mp), + read_reply, + pevext, + (unsigned) ((struct monops *) mp)->m_info.m_mask, + pevext + 1); + if (status == ERROR) { + LOCK_SEND(client); + send_err( + mp, + ECA_ADDFAIL, + client, + RECORD_NAME(MPTOPADDR(mp))); + UNLOCK_SEND(client); + } + /* + * allways send it once at event add + * + * Hold argument is supplied true so the send message + * buffer is not flushed once each call. + */ + read_reply(pevext, MPTOPADDR(mp), TRUE, NULL); + + break; + + case IOC_EVENT_CANCEL: + event_cancel_reply(mp, client); + break; + + case IOC_CLEAR_CHANNEL: + clear_channel_reply(mp, client); + break; + + case IOC_READ_NOTIFY: + case IOC_READ: + { + struct event_ext evext; + + pevext = &evext; + pevext->mp = mp; + pevext->client = client; + pevext->send_lock = TRUE; + if (mp->m_count == 1) + pevext->size = dbr_size[mp->m_type]; + else + pevext->size = (mp->m_count - 1) * + dbr_value_size[mp->m_type] + + dbr_size[mp->m_type]; + + /* + * Arguments to this routine organized in + * favor of the standard db event calling + * mechanism- routine(userarg, paddr). See + * events added above. + * + * Hold argument set true so the send message + * buffer is not flushed once each call. + */ + read_reply(pevext, MPTOPADDR(mp), TRUE, NULL); + break; + } + case IOC_SEARCH: + case IOC_BUILD: + build_reply(mp, client); + break; + case IOC_WRITE: + status = db_put_field( + MPTOPADDR(mp), + mp->m_type, + mp + 1, + mp->m_count + ); + if (status < 0) { + LOCK_SEND(client); + send_err( + mp, + ECA_PUTFAIL, + client, + RECORD_NAME(MPTOPADDR(mp))); + UNLOCK_SEND(client); + } + break; + case IOC_EVENTS_ON: + { + struct channel_in_use *pciu = + (struct channel_in_use *) & client->addrq; + + client->eventsoff = FALSE; + + while (pciu = (struct channel_in_use *) + pciu->node.next) { + + pevext = (struct event_ext *) + & pciu->eventq; + while (pevext = (struct event_ext *) + pevext->node.next){ + + if (pevext->modified) { + read_reply( + pevext, + MPTOPADDR(&pevext->msg), + TRUE, + NULL); + pevext->modified = FALSE; + } + } + } + break; + } + case IOC_EVENTS_OFF: + client->eventsoff = TRUE; + break; + case IOC_READ_SYNC: + read_sync_reply(mp, client); + break; + default: + log_header(mp, nmsg); + LOCK_SEND(client); + send_err(mp, ECA_INTERNAL, client, "Invalid Msg"); + UNLOCK_SEND(client); + return ERROR; + + } + + recv->stk += msgsize; + bytes_left = recv->cnt - recv->stk; + } + + + return OK; +} + + +/* + * + * clear_channel_reply() + * + * + */ +static void +clear_channel_reply(mp, client) +FAST struct extmsg *mp; +struct client *client; +{ + FAST struct extmsg *reply; + FAST struct event_ext *pevext; + FAST int status; + struct channel_in_use *pciu = (struct channel_in_use *) mp->m_pciu; + LIST *peventq = &pciu->eventq; + + for (pevext = (struct event_ext *) peventq->node.next; + pevext; + pevext = (struct event_ext *) pevext->node.next) { + status = db_cancel_event(pevext + 1); + if (status == ERROR) + taskSuspend(0); + lstDelete(peventq, pevext); + + FASTLOCK(&rsrv_free_eventq_lck); + lstAdd(&rsrv_free_eventq, pevext); + FASTUNLOCK(&rsrv_free_eventq_lck); + } + + /* + * send delete confirmed message + */ + LOCK_SEND(client); + reply = (struct extmsg *) ALLOC_MSG(client, 0); + if (!reply) { + UNLOCK_SEND(client); + taskSuspend(0); + } + *reply = *mp; + + END_MSG(client); + UNLOCK_SEND(client); + + lstDelete(&client->addrq, pciu); + FASTLOCK(&rsrv_free_addrq_lck); + lstAdd(&rsrv_free_addrq, pciu); + FASTUNLOCK(&rsrv_free_addrq_lck); + + return; +} + + + + +/* + * + * event_cancel_reply() + * + * + * Much more efficient now since the event blocks hang off the channel in use + * blocks not all together off the client block. + */ +static void +event_cancel_reply(mp, client) + FAST struct extmsg *mp; + struct client *client; +{ + FAST struct extmsg *reply; + FAST struct event_ext *pevext; + FAST int status; + LIST *peventq = + &((struct channel_in_use *) mp->m_pciu)->eventq; + + + for (pevext = (struct event_ext *) peventq->node.next; + pevext; + pevext = (struct event_ext *) pevext->node.next) + if (pevext->msg.m_available == mp->m_available) { + status = db_cancel_event(pevext + 1); + if (status == ERROR) + taskSuspend(0); + lstDelete(peventq, pevext); + + /* + * send delete confirmed message + */ + LOCK_SEND(client); + reply = (struct extmsg *) ALLOC_MSG(client, 0); + if (!reply) { + UNLOCK_SEND(client); + taskSuspend(0); + } + *reply = pevext->msg; + reply->m_postsize = 0; + + END_MSG(client); + UNLOCK_SEND(client); + + FASTLOCK(&rsrv_free_eventq_lck); + lstAdd(&rsrv_free_eventq, pevext); + FASTUNLOCK(&rsrv_free_eventq_lck); + + return; + } + /* + * Not Found- return an error message + */ + LOCK_SEND(client); + send_err(mp, ECA_BADMONID, client, NULL); + UNLOCK_SEND(client); + + return; +} + + + +/* + * + * read_reply() + * + * + */ +static void +read_reply(pevext, paddr, hold, pfl) +FAST struct event_ext *pevext; +FAST struct db_addr *paddr; +int hold; /* more on the way if true */ +db_field_log *pfl; +{ + FAST struct extmsg *mp = pevext->mp; + FAST struct client *client = pevext->client; + FAST struct extmsg *reply; + FAST int status; + FAST int strcnt; + + /* + * If flow control is on set modified and send for later + */ + if (client->eventsoff) { + pevext->modified = TRUE; + return; + } + if (pevext->send_lock) + LOCK_SEND(client); + + reply = (struct extmsg *) ALLOC_MSG(client, pevext->size); + if (!reply) { + send_err(mp, ECA_TOLARGE, client, RECORD_NAME(paddr)); + if (pevext->send_lock) + UNLOCK_SEND(client); + return; + } + *reply = *mp; + reply->m_postsize = pevext->size; + reply->m_pciu = (void *) ((struct channel_in_use *) mp->m_pciu)->chid; + status = db_get_field( + paddr, + mp->m_type, + reply + 1, + mp->m_count, + pfl); + if (status < 0) { + send_err(mp, ECA_GETFAIL, client, RECORD_NAME(paddr)); + log_header(mp, 0); + } + else{ + /* + * force string message size to be the true size rounded to even + * boundary + */ + if (mp->m_type == DBR_STRING && mp->m_count == 1) { + /* add 1 so that the string terminator will be shipped */ + strcnt = strlen(reply + 1) + 1; + reply->m_postsize = strcnt; + } + END_MSG(client); + + /* + * Ensures timely response for events, but does que + * them up like db requests when the OPI does not keep up. + */ + if (!hold) + cas_send_msg(client,FALSE); + } + + if (pevext->send_lock) + UNLOCK_SEND(client); + + return; +} + + +/* + * + * read_sync_reply() + * + * + */ +static void +read_sync_reply(mp, client) + FAST struct extmsg *mp; + struct client *client; +{ + FAST struct extmsg *reply; + + LOCK_SEND(client); + reply = (struct extmsg *) ALLOC_MSG(client, 0); + if (!reply) + taskSuspend(0); + + *reply = *mp; + + END_MSG(client); + + UNLOCK_SEND(client); + + return; +} + + +/* + * + * build_reply() + * + * + */ +static void +build_reply(mp, client) + FAST struct extmsg *mp; + struct client *client; +{ + LIST *addrq = &client->addrq; + FAST struct extmsg *search_reply; + FAST struct extmsg *get_reply; + FAST int status; + struct db_addr tmp_addr; + FAST struct channel_in_use *pchannel; + + void search_fail_reply(); + + + /* Exit quickly if channel not on this node */ + status = db_name_to_addr(mp->m_cmmd == IOC_BUILD ? mp + 2 : mp + 1, &tmp_addr); + if (status < 0) { + if (MPDEBUG == 1) + logMsg("Lookup for channel \"%s\" failed\n", mp + 1); + if (mp->m_type == DOREPLY) + search_fail_reply(mp, client); + return; + } + /* get block off free list if possible */ + FASTLOCK(&rsrv_free_addrq_lck); + pchannel = (struct channel_in_use *) lstGet(&rsrv_free_addrq); + FASTUNLOCK(&rsrv_free_addrq_lck); + if (!pchannel) { + pchannel = (struct channel_in_use *) calloc(1, sizeof(*pchannel)); + if (!pchannel) { + LOCK_SEND(client); + send_err(mp, ECA_ALLOCMEM, client, RECORD_NAME(&tmp_addr)); + UNLOCK_SEND(client); + return; + } + } + pchannel->addr = tmp_addr; + pchannel->chid = (void *) mp->m_pciu; + /* store the addr block in a Q so it can be deallocated */ + lstAdd(addrq, pchannel); + + + /* + * ALLOC_MSG allways allocs at least the sizeof extmsg Large + * requested size insures both messages sent in one reply NOTE: my + * UDP reliability schemes rely on both msgs in same reply Therefore + * the send buffer locked while both messages are placed + */ + LOCK_SEND(client); + if (mp->m_cmmd == IOC_BUILD) { + FAST short type = (mp + 1)->m_type; + FAST unsigned int count = (mp + 1)->m_count; + FAST unsigned int size; + + size = (count - 1) * dbr_value_size[type] + dbr_size[type]; + + get_reply = (struct extmsg *) ALLOC_MSG(client, size + sizeof(*mp)); + if (!get_reply) { + /* tell them that their request is to large */ + send_err(mp, ECA_TOLARGE, client, RECORD_NAME(&tmp_addr)); + } else { + struct event_ext evext; + + evext.mp = mp + 1; + /* pchannel ukn prior to connect */ + evext.mp->m_pciu = pchannel; + evext.client = client; + evext.send_lock = FALSE; + evext.size = size; + + /* + * Arguments to this routine organized in favor of + * the standard db event calling mechanism- + * routine(userarg, paddr). See events added above. + * Hold argument set true so the send message buffer + * is not flushed once each call. + */ + read_reply(&evext, &tmp_addr, TRUE, NULL); + } + } + search_reply = (struct extmsg *) ALLOC_MSG(client, 0); + if (!search_reply) + taskSuspend(); + + *search_reply = *mp; + search_reply->m_postsize = 0; + + /* this field for rmt machines where paddr invalid */ + search_reply->m_type = tmp_addr.field_type; + search_reply->m_count = tmp_addr.no_elements; + search_reply->m_pciu = (void *) pchannel; + + END_MSG(client); + UNLOCK_SEND(client); + + return; +} + + +/* search_fail_reply() + * + * Only when requested by the client + * send search failed reply + * + * + */ +static void +search_fail_reply(mp, client) + FAST struct extmsg *mp; + struct client *client; +{ + FAST struct extmsg *reply; + + LOCK_SEND(client); + reply = (struct extmsg *) ALLOC_MSG(client, 0); + if (!reply) { + taskSuspend(0); + } + *reply = *mp; + reply->m_cmmd = IOC_NOT_FOUND; + reply->m_postsize = 0; + + END_MSG(client); + UNLOCK_SEND(client); + +} + + +/* send_err() + * + * reflect error msg back to the client + * + * send buffer lock must be on while in this routine + * + */ +static void +send_err(curp, status, client, footnote) + struct extmsg *curp; + int status; + struct client *client; + char *footnote; +{ + FAST struct extmsg *reply; + FAST int size; + + /* + * force string post size to be the true size rounded to even + * boundary + */ + size = strlen(footnote)+1; + size += sizeof(*curp); + + reply = (struct extmsg *) ALLOC_MSG(client, size); + if (!reply){ + printf( "caserver: Unable to deliver err msg [%s]\n", + ca_message(status)); + return; + } + + *reply = nill_msg; + reply->m_cmmd = IOC_ERROR; + reply->m_available = status; + reply->m_postsize = size; + switch (curp->m_cmmd) { + case IOC_EVENT_ADD: + case IOC_EVENT_CANCEL: + case IOC_READ: + case IOC_READ_NOTIFY: + case IOC_SEARCH: + case IOC_BUILD: + case IOC_WRITE: + reply->m_pciu = (void *) + ((struct channel_in_use *) curp->m_pciu)->chid; + break; + case IOC_EVENTS_ON: + case IOC_EVENTS_OFF: + case IOC_READ_SYNC: + case IOC_SNAPSHOT: + default: + reply->m_pciu = (void *) NULL; + break; + } + *(reply + 1) = *curp; + strcpy(reply + 2, footnote); + + END_MSG(client); + +} + + +/* log_header() + * + * Debug aid - print the header part of a message. + * + */ +static void +log_header (mp, mnum) +FAST struct extmsg *mp; +{ + logMsg( "N=%d cmd=%d type=%d pstsize=%d paddr=%x avail=%x\n", + mnum, + mp->m_cmmd, + mp->m_type, + mp->m_postsize, + MPTOPADDR(mp), + mp->m_available); + if(mp->m_cmmd==IOC_WRITE && mp->m_type==DBF_STRING) + logMsg("The string written: %s \n",mp+1); +} diff --git a/src/rsrv/camsgtask.c b/src/rsrv/camsgtask.c new file mode 100644 index 000000000..f659e3094 --- /dev/null +++ b/src/rsrv/camsgtask.c @@ -0,0 +1,274 @@ +/* @(#)camsgtask.c + * $Id$ + * Author: Jeffrey O. Hill + * hill@luke.lanl.gov + * (505) 665 1831 + * Date: 6-88 + * + * Experimental Physics and Industrial Control System (EPICS) + * + * Copyright 1991, the Regents of the University of California, + * and the University of Chicago Board of Governors. + * + * This software was produced under U.S. Government contracts: + * (W-7405-ENG-36) at the Los Alamos National Laboratory, + * and (W-31-109-ENG-38) at Argonne National Laboratory. + * + * Initial development by: + * The Controls and Automation Group (AT-8) + * Ground Test Accelerator + * Accelerator Technology Division + * Los Alamos National Laboratory + * + * Co-developed with + * The Controls and Computing Group + * Accelerator Systems Division + * Advanced Photon Source + * Argonne National Laboratory + * + * Modification Log: + * ----------------- + * .01 joh 08--88 added broadcast switchover to TCP/IP + * .02 joh 041089 added new event passing + * .03 joh 060791 camsgtask() now returns info about + * partial messages + */ + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + + +/* + * + * camsgtask() + * + * CA server TCP client task (one spawned for each client) + */ +void camsgtask(sock) +FAST int sock; +{ + int nchars; + FAST int status; + FAST struct client *client = NULL; + struct sockaddr_in addr; + int i; + int true = TRUE; + + + /* + * see TCP(4P) this seems to make unsollicited single events much + * faster. I take care of queue up as load increases. + */ + status = setsockopt( + sock, + IPPROTO_TCP, + TCP_NODELAY, + &true, + sizeof true); + if(status == ERROR){ + logMsg("camsgtask: TCP_NODELAY option set failed\n"); + close(sock); + return; + } + + + /* + * turn on KEEPALIVE so if the client crashes + * this task will find out and exit + */ + status = setsockopt( + sock, + SOL_SOCKET, + SO_KEEPALIVE, + &true, + sizeof true); + if(status == ERROR){ + logMsg("camsgtask: SO_KEEPALIVE option set failed\n"); + close(sock); + return; + } + + nchars = recv( sock, + &addr, + sizeof(addr), + 0); + if ( nchars < sizeof(addr) ){ + logMsg("camsgtask: Protocol error\n"); + close(sock); + return; + } + + if(MPDEBUG==2){ + logMsg( "camsgtask: Recieved connection request\n"); + logMsg("from addr %x, udp port %x \n", + addr.sin_addr, + addr.sin_port); + } + + /* + * NOTE: The client structure used here does not have to be locked + * since this thread does not traverse the addr que or the client + * que. This thread is deleted prior to deletion of this client by + * terminate_one_client(). + * + * wait a reasonable length of time in case the cast server is busy + */ + for (i = 0; i < 10; i++) { + LOCK_CLIENTQ; + client = existing_client(&addr); + UNLOCK_CLIENTQ; + + if (client) + break; + + taskDelay(sysClkRateGet() * 5); /* 5 sec */ + } + + if (!client) { + logMsg("camsgtask: Unknown client (protocol error)\n"); + close(sock); + return; + } + + /* + * convert connection to TCP + */ + LOCK_SEND(client); + udp_to_tcp(client, sock); + UNLOCK_SEND(client); + + client->tid = taskIdSelf(); + + client->evuser = (struct event_user *) db_init_events(); + if (!client->evuser) { + logMsg("camsgtask: unable to init the event facility\n"); + free_client(client); + return; + } + status = db_start_events( + client->evuser, + CA_EVENT_NAME, + NULL, + NULL); + if (status == ERROR) { + logMsg("camsgtask: unable to start the event facility\n"); + free_client(client); + return; + } + + client->recv.cnt = 0; + while (TRUE) { + client->recv.stk = 0; + + nchars = recv( + sock, + &client->recv.buf[client->recv.cnt], + sizeof(client->recv.buf)-client->recv.cnt, + 0); + if(nchars<=0){ + if(MPDEBUG>0){ + logMsg("CA server: msg recv error\n"); + printErrno(errnoGet(taskIdSelf())); + } + break; + } + + client->recv.cnt += nchars; + status = camessage(client, &client->recv); + if(status == OK){ + unsigned bytes_left; + + bytes_left = client->recv.cnt - client->recv.stk; + + /* + * if there is a partial message + * align it with the start of the buffer + */ + if(bytes_left>0){ + char *pbuf; + + pbuf = client->recv.buf; + + /* + * overlapping regions handled + * by bcopy + */ + bcopy( pbuf + client->recv.stk, + pbuf, + bytes_left); + client->recv.cnt = bytes_left; + } + else{ + client->recv.cnt = 0; + } + }else{ + client->recv.cnt = 0; + } + + + /* + * allow message to batch up if more are comming + */ + status = ioctl(sock, FIONREAD, &nchars); + if (status == ERROR) { + printErrno(errnoGet(taskIdSelf())); + taskSuspend(0); + } + if (nchars == 0) + cas_send_msg(client, TRUE); + + /* + * dont hang around if there are no + * connections to process variables + */ + if (client->addrq.count == 0) + break; + } + + free_client(client); +} + + +/* + * + * read_entire_msg() + * + * + */ +static int read_entire_msg(sockfd, mp, tot) +FAST sockfd; /* socket file descriptor */ +FAST unsigned char *mp; /* where to read pointer */ +FAST unsigned int tot; /* size of entire message */ +{ + FAST unsigned int nchars; + FAST unsigned int total = 0; + int status; + + while(TRUE) { + nchars = recv( sockfd, + mp + total, + tot - total, + 0); + if(nchars <= 0){ + if(MPDEBUG == 2) { + logMsg("rsrv: msg recv error\n"); + printErrno(errnoGet(taskIdSelf())); + } + return ERROR; + } + + total += nchars; + if(total == tot) + return total; + else if(total > tot) + return ERROR; + } +} diff --git a/src/rsrv/caserverio.c b/src/rsrv/caserverio.c new file mode 100644 index 000000000..1f3775f4e --- /dev/null +++ b/src/rsrv/caserverio.c @@ -0,0 +1,143 @@ +/* @(#)caserverio.c + * $Id$ + * Author: Jeffrey O. Hill + * hill@luke.lanl.gov + * (505) 665 1831 + * Date: 060791 + * + * Experimental Physics and Industrial Control System (EPICS) + * + * Copyright 1991, the Regents of the University of California, + * and the University of Chicago Board of Governors. + * + * This software was produced under U.S. Government contracts: + * (W-7405-ENG-36) at the Los Alamos National Laboratory, + * and (W-31-109-ENG-38) at Argonne National Laboratory. + * + * Initial development by: + * The Controls and Automation Group (AT-8) + * Ground Test Accelerator + * Accelerator Technology Division + * Los Alamos National Laboratory + * + * Co-developed with + * The Controls and Computing Group + * Accelerator Systems Division + * Advanced Photon Source + * Argonne National Laboratory + * + * Modification Log: + * ----------------- + */ + +#include +#include +#include +#include +#include +#include +#include +#include + + +/* + * + * cas_send_msg() + * + * (channel access server send message) + */ +void cas_send_msg(pclient, lock_needed) +struct client *pclient; +int lock_needed; +{ + int status; + + if(pclient->disconnect){ + return; + } + + if(lock_needed){ + LOCK_SEND(pclient); + } + + if(pclient->send.stk){ + pclient->send.cnt = pclient->send.stk; + + status = sendto( + pclient->sock, + pclient->send.buf, + pclient->send.cnt, + NULL, + &pclient->addr, + sizeof(pclient->addr)); + if(status>=0){ + if(MPDEBUG==2){ + logMsg( "Sent a message of %d bytes\n", + pclient->send.cnt); + } + } + else{ + logMsg("caserver: client unreachable\n"); + logMsg("caserver: msg from vxWorks follows\n"); + printErrno(errnoGet(taskIdSelf())); + pclient->disconnect = TRUE; + } + + pclient->send.stk = 0; + } + + + if(lock_needed){ + UNLOCK_SEND(pclient); + } + + return; +} + + + +/* + * + * cas_alloc_msg() + * + * see also ALLOC_MSG()/END_MSG() in server.h + * + * (allocate space in the outgoing message buffer) + * + * send lock must be on while in this routine + * + * returns 1) a valid ptr to msg buffer space + * 2) NULL (msg will not fit) + */ +struct extmsg *cas_alloc_msg(pclient, extsize) +struct client *pclient; /* ptr to per client struct */ +unsigned extsize; /* extension size */ +{ + unsigned msgsize; + unsigned newstack; + + msgsize = extsize + sizeof(struct extmsg); + newstack = pclient->send.stk + msgsize; + if(newstack > pclient->send.maxstk){ + if(pclient->disconnect){ + pclient->send.stk = 0; + } + else{ + cas_send_msg(pclient, FALSE); + } + + newstack = pclient->send.stk + msgsize; + + /* + * If dosnt fit now it never will + */ + if(newstack > pclient->send.maxstk){ + return NULL; + } + } + + /* + * it fits END_MSG will push it on the stack + */ + return (struct extmsg *) &pclient->send.buf[pclient->send.stk]; +} diff --git a/src/rsrv/caservertask.c b/src/rsrv/caservertask.c new file mode 100644 index 000000000..92f37fc6e --- /dev/null +++ b/src/rsrv/caservertask.c @@ -0,0 +1,263 @@ +/* @(#)caservertask.c + * $Id$ + * Author: Jeffrey O. Hill + * hill@luke.lanl.gov + * (505) 665 1831 + * Date: 5-88 + * + * Experimental Physics and Industrial Control System (EPICS) + * + * Copyright 1991, the Regents of the University of California, + * and the University of Chicago Board of Governors. + * + * This software was produced under U.S. Government contracts: + * (W-7405-ENG-36) at the Los Alamos National Laboratory, + * and (W-31-109-ENG-38) at Argonne National Laboratory. + * + * Initial development by: + * The Controls and Automation Group (AT-8) + * Ground Test Accelerator + * Accelerator Technology Division + * Los Alamos National Laboratory + * + * Co-developed with + * The Controls and Computing Group + * Accelerator Systems Division + * Advanced Photon Source + * Argonne National Laboratory + * + * Modification Log: + * ----------------- + * .01 joh 030891 now saves old client structure for later reuse + */ + +#include +#include +#include +#include +#include +#include +#include +#include +#include + + +/* + * + * req_server() + * + * CA server task + * + * Waits for connections at the CA port and spawns a task to + * handle each of them + * + */ +void +req_server() +{ + struct sockaddr_in serverAddr; /* server's address */ + FAST struct client *client; + FAST int status; + FAST int i; + + if (IOC_sock != 0 && IOC_sock != ERROR) + if ((status = close(IOC_sock)) == ERROR) + logMsg("Unable to close open master socket\n"); + + /* + * Open the socket. Use ARPA Internet address format and stream + * sockets. Format described in . + */ + if ((IOC_sock = socket(AF_INET, SOCK_STREAM, 0)) == ERROR) { + logMsg("Socket creation error\n"); + printErrno(errnoGet()); + taskSuspend(0); + } + + /* Zero the sock_addr structure */ + bfill(&serverAddr, sizeof(serverAddr), 0); + serverAddr.sin_family = AF_INET; + serverAddr.sin_port = CA_SERVER_PORT; + + /* get server's Internet address */ + if (bind(IOC_sock, &serverAddr, sizeof(serverAddr)) == ERROR) { + logMsg("Bind error\n"); + printErrno(errnoGet()); + close(IOC_sock); + taskSuspend(0); + } + + /* listen and accept new connections */ + if (listen(IOC_sock, 10) == ERROR) { + logMsg("Listen error\n"); + printErrno(errnoGet()); + close(IOC_sock); + taskSuspend(0); + } + + while (TRUE) { + if ((i = accept(IOC_sock, NULL, 0)) == ERROR) { + logMsg("Accept error\n"); + printErrno(errnoGet()); + taskSuspend(0); + } else { + status = taskSpawn(CA_CLIENT_NAME, + CA_CLIENT_PRI, + CA_CLIENT_OPT, + CA_CLIENT_STACK, + camsgtask, + i); + if (status == ERROR) { + logMsg("Unable to spawn network server\n"); + printErrno(errnoGet()); + } + } + } +} + + +/* + * + * free_client() + * + */ +STATUS +free_client(client) +register struct client *client; +{ + if (client) { + /* remove it from the list of clients */ + /* list delete returns no status */ + LOCK_CLIENTQ; + lstDelete(&clientQ, client); + UNLOCK_CLIENTQ; + terminate_one_client(client); + LOCK_CLIENTQ; + lstAdd(&rsrv_free_clientQ, client); + UNLOCK_CLIENTQ; + } else { + LOCK_CLIENTQ; + while (client = (struct client *) lstGet(&clientQ)) + terminate_one_client(client); + + FASTLOCK(&rsrv_free_addrq_lck); + lstFree(&rsrv_free_addrq); + lstInit(&rsrv_free_addrq); + FASTUNLOCK(&rsrv_free_addrq_lck); + + FASTLOCK(&rsrv_free_eventq_lck); + lstFree(&rsrv_free_eventq); + lstInit(&rsrv_free_eventq); + FASTUNLOCK(&rsrv_free_eventq_lck); + + lstFree(&rsrv_free_clientQ); + + UNLOCK_CLIENTQ; + } +} + + +/* + * TERMINATE_ONE_CLIENT + */ +int +terminate_one_client(client) +register struct client *client; +{ + FAST int servertid = client->tid; + FAST int tmpsock = client->sock; + FAST int status; + FAST struct event_ext *pevext; + FAST struct channel_in_use *pciu; + + if (client->proto == IPPROTO_TCP) { + + logMsg("CA Connection %d Terminated\n", tmpsock); + + /* + * Server task deleted first since close() is not reentrant + */ + if (servertid != taskIdSelf() && servertid) + if (td(servertid) == ERROR) { /* delete server */ + if (errnoGet() != S_taskLib_TASK_ID_ERROR) + printErrno(errnoGet()); + } + pciu = (struct channel_in_use *) & client->addrq; + while (pciu = (struct channel_in_use *) pciu->node.next) + while (pevext = (struct event_ext *) lstGet(&pciu->eventq)) { + + status = db_cancel_event(pevext + 1); + if (status == ERROR) + taskSuspend(0); + FASTLOCK(&rsrv_free_eventq_lck); + lstAdd(&rsrv_free_eventq, pevext); + FASTUNLOCK(&rsrv_free_eventq_lck); + } + + if (client->evuser) { + status = db_close_events(client->evuser); + if (status == ERROR) + taskSuspend(0); + } + if (tmpsock != NONE) + if ((status = close(tmpsock)) == ERROR) /* close socket */ + logMsg("Unable to close open TCP client socket\n"); + } + FASTLOCK(&rsrv_free_addrq_lck); + /* free dbaddr str */ + lstExtract(&client->addrq, + client->addrq.node.next, + client->addrq.node.previous, + &rsrv_free_addrq); + FASTUNLOCK(&rsrv_free_addrq_lck); + + return OK; +} + + +/* + * client_stat() + * + */ +STATUS +client_stat() +{ + FAST struct client *client; + NODE *addr; + struct sockaddr_in *psaddr; + + + LOCK_CLIENTQ; + client = (struct client *) lstNext(&clientQ); + while (client) { + char *pproto; + + if(client->proto == IPPROTO_UDP){ + pproto = "UDP"; + } + else if(client->proto == IPPROTO_TCP){ + pproto = "TCP"; + } + else{ + pproto = "UKN"; + } + + printf("Socket %d Protocol %s\n", client->sock, pproto); + psaddr = &client->addr; + printf("\tRemote address %u.%u.%u.%u Remote port %d\n", + (psaddr->sin_addr.s_addr & 0xff000000) >> 24, + (psaddr->sin_addr.s_addr & 0x00ff0000) >> 16, + (psaddr->sin_addr.s_addr & 0x0000ff00) >> 8, + (psaddr->sin_addr.s_addr & 0x000000ff), + psaddr->sin_port); + printf("\tChannel count %d\n", lstCount(&client->addrq)); + addr = (NODE *) & client->addrq; + while (addr = lstNext(addr)) + printf("\t%s ", ((struct db_addr *) (addr + 1))->precord); + printf("\n"); + + client = (struct client *) lstNext(client); + } + UNLOCK_CLIENTQ; + return lstCount(&clientQ); +}