many changes

This commit is contained in:
Jeff Hill
2000-02-10 16:05:01 +00:00
parent aed8054f38
commit 0c7674036c
8 changed files with 2463 additions and 2712 deletions
+3 -2
View File
@@ -7,8 +7,9 @@ USR_CFLAGS += -UUNIX -DiocCore
rsrv_SRCS = \
caserverio.c caservertask.c camsgtask.c camessage.c \
rsrv_init.c cast_server.c online_notify.c globalsource.c
cast_server.c online_notify.c
LIBRARY = rsrv
DLL_LIBS = Db As Com
SYS_DLL_LIBS := ws2_32
include $(TOP)/configure/RULES
+1239 -1283
View File
File diff suppressed because it is too large Load Diff
+103 -269
View File
@@ -1,309 +1,143 @@
/*
* Author: Jeffrey O. Hill
* hill@luke.lanl.gov
* (505) 665 1831
* Date: 6-88
* Author: Jeffrey O. Hill
* hill@luke.lanl.gov
* (505) 665 1831
* Date: 6-88
*
* Experimental Physics and Industrial Control System (EPICS)
* Experimental Physics and Industrial Control System (EPICS)
*
* Copyright 1991, the Regents of the University of California,
* and the University of Chicago Board of Governors.
* Copyright 1991, the Regents of the University of California,
* and the University of Chicago Board of Governors.
*
* This software was produced under U.S. Government contracts:
* (W-7405-ENG-36) at the Los Alamos National Laboratory,
* and (W-31-109-ENG-38) at Argonne National Laboratory.
* This software was produced under U.S. Government contracts:
* (W-7405-ENG-36) at the Los Alamos National Laboratory,
* and (W-31-109-ENG-38) at Argonne National Laboratory.
*
* Initial development by:
* The Controls and Automation Group (AT-8)
* Ground Test Accelerator
* Accelerator Technology Division
* Los Alamos National Laboratory
* Initial development by:
* The Controls and Automation Group (AT-8)
* Ground Test Accelerator
* Accelerator Technology Division
* Los Alamos National Laboratory
*
* Co-developed with
* The Controls and Computing Group
* Accelerator Systems Division
* Advanced Photon Source
* Argonne National Laboratory
* Co-developed with
* The Controls and Computing Group
* Accelerator Systems Division
* Advanced Photon Source
* Argonne National Laboratory
*
* Modification Log:
* -----------------
* .01 joh 08--88 added broadcast switchover to TCP/IP
* .02 joh 041089 added new event passing
* .03 joh 060791 camsgtask() now returns info about
* partial messages
* .04 joh 071191 changes to recover from UDP port reuse
* by rebooted clients
* .05 joh 110691 print nil recv disconnect message only
* if debug is on
* .06 joh 021192 better diagnostics
* .07 joh 031692 disconnect on bad message
* .08 joh 111892 set TCP buffer size to be synergistic
* with CA buffer size
* .09 joh 111992 moved the event tasks prioity down
* (added new arg to db_start_events())
*/
static char *sccsId = "@(#) $Id$";
#include <stddef.h>
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <sys/types.h>
#include <errno.h>
#include "osiSock.h"
#include "tsStamp.h"
#include "os_depen.h"
#include "osiThread.h"
#include "errlog.h"
#include "ellLib.h"
#include "taskwd.h"
#include "db_access.h"
#include "server.h"
#include "osiSockResource.h"
/*
* camsgtask()
*
* camsgtask()
*
* CA server TCP client task (one spawned for each client)
* CA server TCP client task (one spawned for each client)
*/
int camsgtask(sock)
SOCKET sock;
void camsgtask (struct client *client)
{
int nchars;
int status;
struct client *client;
int true = TRUE;
int nchars;
int status;
int true = TRUE;
client->tid = threadGetIdSelf ();
client = NULL;
taskwdInsert (threadGetIdSelf(), NULL, NULL);
/*
* see TCP(4P) this seems to make unsollicited single events much
* faster. I take care of queue up as load increases.
*/
status = setsockopt(
sock,
IPPROTO_TCP,
TCP_NODELAY,
(char *)&true,
sizeof(true));
if(status == ERROR){
errlogPrintf("CAS: TCP_NODELAY option set failed\n");
socket_close(sock);
return ERROR;
}
while (TRUE) {
client->recv.stk = 0;
nchars = recv (client->sock, &client->recv.buf[client->recv.cnt],
(int)(sizeof(client->recv.buf)-client->recv.cnt), 0);
if (nchars==0){
if (CASDEBUG>0) {
errlogPrintf ("CAS: nill message disconnect\n");
}
break;
}
else if (nchars<0) {
int anerrno = SOCKERRNO;
/*
* normal conn lost conditions
*/
if ( (anerrno!=SOCK_ECONNABORTED&&
anerrno!=SOCK_ECONNRESET&&
anerrno!=SOCK_ETIMEDOUT)||
CASDEBUG>2) {
errlogPrintf ("CAS: client disconnect(errno=%d)\n", anerrno);
}
break;
}
/*
* turn on KEEPALIVE so if the client crashes
* this task will find out and exit
*/
status = setsockopt(
sock,
SOL_SOCKET,
SO_KEEPALIVE,
(char *)&true,
sizeof(true));
if(status == ERROR){
errlogPrintf("CAS: SO_KEEPALIVE option set failed\n");
socket_close(sock);
return ERROR;
}
tsStampGetCurrent (&client->time_at_last_recv);
client->recv.cnt += (unsigned long) nchars;
/*
* some concern that vxWorks will run out of mBuf's
* if this change is made
*
* joh 11-10-98
*/
#if 0
/*
* set TCP buffer sizes to be synergistic
* with CA internal buffering
*/
i = MAX_MSG_SIZE;
status = setsockopt(
sock,
SOL_SOCKET,
SO_SNDBUF,
(char *)&i,
sizeof(i));
if(status < 0){
errlogPrintf("CAS: SO_SNDBUF set failed\n");
socket_close(sock);
return ERROR;
}
i = MAX_MSG_SIZE;
status = setsockopt(
sock,
SOL_SOCKET,
SO_RCVBUF,
(char *)&i,
sizeof(i));
if(status < 0){
errlogPrintf("CAS: SO_RCVBUF set failed\n");
socket_close(sock);
return ERROR;
}
#endif
status = camessage (client, &client->recv);
if (status == 0) {
/*
* if there is a partial message
* align it with the start of the buffer
*/
if (client->recv.cnt >= client->recv.stk) {
unsigned bytes_left;
char *pbuf;
/*
* performed in two steps purely for
* historical reasons
*/
client = (struct client *) create_udp_client(NULL);
if (!client) {
errlogPrintf("CAS: client init failed\n");
socket_close(sock);
return ERROR;
}
bytes_left = client->recv.cnt - client->recv.stk;
taskwdInsert( threadGetIdSelf(),
NULL,
NULL);
pbuf = client->recv.buf;
status = udp_to_tcp(client, sock);
if(status<0){
errlogPrintf("CAS: TCP convert failed\n");
free_client(client);
return ERROR;
}
if(CASDEBUG>0){
char buf[64];
ipAddrToA (&client->addr, buf, sizeof(buf));
errlogPrintf( "CAS: conn req from %s\n",
(int) /* sic */ buf);
}
LOCK_CLIENTQ;
ellAdd(&clientQ, &client->node);
UNLOCK_CLIENTQ;
client->evuser = (struct event_user *) db_init_events();
if (!client->evuser) {
errlogPrintf("CAS: unable to init the event facility\n");
free_client(client);
return ERROR;
}
status = db_add_extra_labor_event(
client->evuser,
write_notify_reply,
client);
if(status == ERROR){
errlogPrintf("CAS: unable to setup the event facility\n");
free_client(client);
return ERROR;
}
status = db_start_events(
client->evuser,
"CAevent",
NULL,
NULL,
1); /* one priority notch lower */
if (status == ERROR) {
errlogPrintf("CAS: unable to start the event facility\n");
free_client(client);
return ERROR;
}
client->recv.cnt = 0ul;
while (TRUE) {
client->recv.stk = 0;
nchars = recv(
sock,
&client->recv.buf[client->recv.cnt],
(int)(sizeof(client->recv.buf)-client->recv.cnt),
0);
if (nchars==0){
if(CASDEBUG>0){
errlogPrintf("CAS: nill message disconnect\n");
}
break;
}
else if(nchars<0){
long anerrno;
anerrno = SOCKERRNO;
/*
* normal conn lost conditions
*/
if( (anerrno!=ECONNABORTED&&
anerrno!=ECONNRESET&&
anerrno!=ETIMEDOUT)||
CASDEBUG>2){
errlogPrintf(
"CAS: client disconnect(errno=%d)\n",
anerrno);
}
break;
}
tsStampGetCurrent(&client->time_at_last_recv);
client->recv.cnt += (unsigned long) nchars;
status = camessage(client, &client->recv);
if(status == OK){
/*
* if there is a partial message
* align it with the start of the buffer
*/
if (client->recv.cnt >= client->recv.stk) {
unsigned bytes_left;
char *pbuf;
bytes_left = client->recv.cnt - client->recv.stk;
pbuf = client->recv.buf;
/*
* overlapping regions handled
* by memmove
*/
memmove(pbuf,
pbuf + client->recv.stk,
bytes_left);
client->recv.cnt = bytes_left;
}
else{
client->recv.cnt = 0ul;
}
}else{
/*
* overlapping regions handled
* properly by memmove
*/
memmove (pbuf, pbuf + client->recv.stk, bytes_left);
client->recv.cnt = bytes_left;
}
else {
client->recv.cnt = 0ul;
}
}
else {
char buf[64];
client->recv.cnt = 0ul;
/*
* disconnect when there are severe message errors
*/
client->recv.cnt = 0ul;
/*
* disconnect when there are severe message errors
*/
ipAddrToA (&client->addr, buf, sizeof(buf));
errlogPrintf ("CAS: forcing disconnect from %s\n",
/* sic */ (int) buf);
break;
}
/*
* allow message to batch up if more are comming
*/
status = socket_ioctl(sock, FIONREAD, (int) &nchars);
if (status < 0) {
errlogPrintf("CAS: io ctl err %d\n",
SOCKERRNO);
cas_send_msg(client, TRUE);
}
else if (nchars == 0){
cas_send_msg(client, TRUE);
}
}
free_client(client);
return OK;
errlogPrintf ("CAS: forcing disconnect from %s\n", buf);
break;
}
/*
* allow message to batch up if more are comming
*/
status = socket_ioctl (client->sock, FIONREAD, &nchars);
if (status < 0) {
errlogPrintf("CAS: io ctl err %d\n",
SOCKERRNO);
cas_send_msg(client, TRUE);
}
else if (nchars == 0){
cas_send_msg(client, TRUE);
}
}
destroy_client (client);
}
+147 -178
View File
@@ -1,237 +1,206 @@
/*
* Author: Jeffrey O. Hill
* hill@luke.lanl.gov
* (505) 665 1831
* Date: 060791
/*
* Author: Jeffrey O. Hill
* hill@luke.lanl.gov
* (505) 665 1831
* Date: 060791
*
* Experimental Physics and Industrial Control System (EPICS)
* Experimental Physics and Industrial Control System (EPICS)
*
* Copyright 1991, the Regents of the University of California,
* and the University of Chicago Board of Governors.
* Copyright 1991, the Regents of the University of California,
* and the University of Chicago Board of Governors.
*
* This software was produced under U.S. Government contracts:
* (W-7405-ENG-36) at the Los Alamos National Laboratory,
* and (W-31-109-ENG-38) at Argonne National Laboratory.
* This software was produced under U.S. Government contracts:
* (W-7405-ENG-36) at the Los Alamos National Laboratory,
* and (W-31-109-ENG-38) at Argonne National Laboratory.
*
* Initial development by:
* The Controls and Automation Group (AT-8)
* Ground Test Accelerator
* Accelerator Technology Division
* Los Alamos National Laboratory
* Initial development by:
* The Controls and Automation Group (AT-8)
* Ground Test Accelerator
* Accelerator Technology Division
* Los Alamos National Laboratory
*
* Co-developed with
* The Controls and Computing Group
* Accelerator Systems Division
* Advanced Photon Source
* Argonne National Laboratory
*
* Modification Log:
* -----------------
* .01 joh 071591 log time of last io in the client structure
* .02 joh 091691 use greater than on the DEBUG level test
* .03 joh 110491 improved diagnostics
* .04 joh 021292 improved diagnostics
* .05 joh 022092 improved diagnostics
* .06 joh 031992 improved diagnostics
* Co-developed with
* The Controls and Computing Group
* Accelerator Systems Division
* Advanced Photon Source
* Argonne National Laboratory
*/
static char *sccsId = "@(#) $Id$";
#include <stddef.h>
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <errno.h>
#include <sys/types.h>
#include "osiSock.h"
#include "tsStamp.h"
#include "ellLib.h"
#include "errlog.h"
#include "server.h"
#include "osiSockResource.h"
/*
* cas_send_msg()
*
* cas_send_msg()
*
* (channel access server send message)
* (channel access server send message)
*/
void cas_send_msg(pclient, lock_needed)
struct client *pclient;
int lock_needed;
void cas_send_msg (struct client *pclient, int lock_needed)
{
int status;
int status;
if(CASDEBUG>2 && pclient->send.stk){
errlogPrintf( "CAS: Sending a message of %d bytes\n",
pclient->send.stk);
}
if (CASDEBUG>2 && pclient->send.stk) {
errlogPrintf ("CAS: Sending a message of %d bytes\n", pclient->send.stk);
}
if(pclient->disconnect){
if(CASDEBUG>2){
errlogPrintf("CAS: msg Discard for sock %d addr %x\n",
pclient->sock,
pclient->addr.sin_addr.s_addr);
}
return;
}
if (pclient->disconnect) {
if (CASDEBUG>2) {
errlogPrintf ("CAS: msg Discard for sock %d addr %x\n",
pclient->sock, pclient->addr.sin_addr.s_addr);
}
return;
}
if(lock_needed){
SEND_LOCK(pclient);
}
if(lock_needed){
SEND_LOCK(pclient);
}
if(pclient->send.stk){
if(pclient->send.stk){
#ifdef CONVERSION_REQUIRED
/* Convert all caHdr into net format.
* The remaining bytes must already be in
* net format, because here we have no clue
* how to convert them.
*/
char *buf;
unsigned long msg_size, num_bytes;
caHdr *mp;
/* Convert all caHdr into net format.
* The remaining bytes must already be in
* net format, because here we have no clue
* how to convert them.
*/
char *buf;
unsigned long msg_size, num_bytes;
caHdr *mp;
buf = (char *) pclient->send.buf;
num_bytes = pclient->send.stk;
buf = (char *) pclient->send.buf;
num_bytes = pclient->send.stk;
/* convert only if we have at least a complete caHdr */
while (num_bytes >= sizeof(caHdr))
{
mp = (caHdr *) buf;
/* convert only if we have at least a complete caHdr */
while (num_bytes >= sizeof(caHdr))
{
mp = (caHdr *) buf;
msg_size = sizeof (caHdr) + mp->m_postsize;
msg_size = sizeof (caHdr) + mp->m_postsize;
DLOG(3,"CAS: sending cmmd %d, postsize %d\n",
mp->m_cmmd, (int)mp->m_postsize,
0, 0, 0, 0);
DLOG(3,"CAS: sending cmmd %d, postsize %d\n",
mp->m_cmmd, (int)mp->m_postsize,
0, 0, 0, 0);
/* convert the complete header into host format */
mp->m_cmmd = htons (mp->m_cmmd);
mp->m_postsize = htons (mp->m_postsize);
mp->m_dataType = htons (mp->m_dataType);
mp->m_count = htons (mp->m_count);
mp->m_cid = htonl (mp->m_cid);
mp->m_available = htonl (mp->m_available);
/* convert the complete header into host format */
mp->m_cmmd = htons (mp->m_cmmd);
mp->m_postsize = htons (mp->m_postsize);
mp->m_dataType = htons (mp->m_dataType);
mp->m_count = htons (mp->m_count);
mp->m_cid = htonl (mp->m_cid);
mp->m_available = htonl (mp->m_available);
/* get next message: */
buf += msg_size;
num_bytes -= msg_size;
}
/* get next message: */
buf += msg_size;
num_bytes -= msg_size;
}
#endif
status = sendto(
pclient->sock,
pclient->send.buf,
pclient->send.stk,
NULL,
(struct sockaddr *)&pclient->addr,
sizeof(pclient->addr));
if( pclient->send.stk != (unsigned)status){
if(status < 0){
int anerrno;
char buf[64];
status = sendto (pclient->sock, pclient->send.buf, pclient->send.stk, 0,
(struct sockaddr *)&pclient->addr, sizeof(pclient->addr));
if( pclient->send.stk != (unsigned)status) {
if (status < 0) {
int anerrno;
char buf[64];
anerrno = SOCKERRNO;
anerrno = SOCKERRNO;
ipAddrToA (&pclient->addr, buf, sizeof(buf));
ipAddrToA (&pclient->addr, buf, sizeof(buf));
if(pclient->proto == IPPROTO_TCP) {
if( (anerrno!=ECONNABORTED&&
anerrno!=ECONNRESET&&
anerrno!=EPIPE&&
anerrno!=ETIMEDOUT)||
CASDEBUG>2){
if(pclient->proto == IPPROTO_TCP) {
if ( (anerrno!=SOCK_ECONNABORTED&&
anerrno!=SOCK_ECONNRESET&&
anerrno!=SOCK_EPIPE&&
anerrno!=SOCK_ETIMEDOUT)||
CASDEBUG>2){
errlogPrintf(
"CAS: TCP send to \"%s\" failed because \"%s\"\n",
(int)buf,
(int)SOCKERRSTR(anerrno));
}
pclient->disconnect = TRUE;
}
else if (pclient->proto == IPPROTO_UDP) {
errlogPrintf(
"CAS: UDP send to \"%s\" failed because \"%s\"\n",
(int)buf,
(int)SOCKERRSTR(anerrno));
}
else {
assert (0);
}
}
else{
errlogPrintf(
"CAS: blk sock partial send: req %d sent %d \n",
pclient->send.stk,
status);
}
}
errlogPrintf (
"CAS: TCP send to \"%s\" failed because \"%s\"\n",
buf, SOCKERRSTR(anerrno));
}
pclient->disconnect = TRUE;
}
else if (pclient->proto == IPPROTO_UDP) {
errlogPrintf(
"CAS: UDP send to \"%s\" failed because \"%s\"\n",
(int)buf,
(int)SOCKERRSTR(anerrno));
}
else {
assert (0);
}
}
else{
errlogPrintf(
"CAS: blk sock partial send: req %d sent %d \n",
pclient->send.stk,
status);
}
}
pclient->send.stk = 0;
tsStampGetCurrent(&pclient->time_at_last_send);
}
pclient->send.stk = 0;
tsStampGetCurrent (&pclient->time_at_last_send);
}
if(lock_needed){
SEND_UNLOCK(pclient);
}
if(lock_needed){
SEND_UNLOCK(pclient);
}
DLOG(3, "------------------------------\n\n", 0,0,0,0,0,0);
DLOG(3, "------------------------------\n\n", 0,0,0,0,0,0);
return;
return;
}
/*
*
* cas_alloc_msg()
* cas_alloc_msg()
*
* see also ALLOC_MSG()/END_MSG() in server.h
* see also ALLOC_MSG()/END_MSG() in server.h
*
* (allocate space in the outgoing message buffer)
* (allocate space in the outgoing message buffer)
*
* send lock must be on while in this routine
* send lock must be on while in this routine
*
* returns 1) a valid ptr to msg buffer space
* 2) NULL (msg will not fit)
*/
caHdr *cas_alloc_msg(pclient, extsize)
struct client *pclient; /* ptr to per client struct */
unsigned extsize; /* extension size */
* returns 1) a valid ptr to msg buffer space
* 2) NULL (msg will not fit)
*/
caHdr *cas_alloc_msg (struct client *pclient, unsigned extsize)
{
unsigned msgsize;
unsigned newstack;
extsize = CA_MESSAGE_ALIGN(extsize);
unsigned msgsize;
unsigned newstack;
extsize = CA_MESSAGE_ALIGN(extsize);
msgsize = extsize + sizeof(caHdr);
msgsize = extsize + sizeof(caHdr);
newstack = pclient->send.stk + msgsize;
if(newstack > pclient->send.maxstk){
if(pclient->disconnect){
pclient->send.stk = 0;
}
else{
cas_send_msg(pclient, FALSE);
}
newstack = pclient->send.stk + msgsize;
if(newstack > pclient->send.maxstk){
if(pclient->disconnect){
pclient->send.stk = 0;
}
else{
cas_send_msg (pclient, FALSE);
}
newstack = pclient->send.stk + msgsize;
newstack = pclient->send.stk + msgsize;
/*
* If dosnt fit now it never will
*/
if(newstack > pclient->send.maxstk){
return NULL;
}
}
/*
* If dosnt fit now it never will
*/
if(newstack > pclient->send.maxstk){
return NULL;
}
}
/*
* it fits END_MSG will push it on the stack
*/
return (caHdr *) &pclient->send.buf[pclient->send.stk];
/*
* it fits END_MSG will push it on the stack
*/
return (caHdr *) &pclient->send.buf[pclient->send.stk];
}
+578 -402
View File
File diff suppressed because it is too large Load Diff
+164 -310
View File
@@ -1,74 +1,56 @@
/*
* Author: Jeffrey O. Hill
* hill@luke.lanl.gov
* (505) 665 1831
* Date: 5-88
* Author: Jeffrey O. Hill
* hill@luke.lanl.gov
* (505) 665 1831
* Date: 5-88
*
* Experimental Physics and Industrial Control System (EPICS)
* Experimental Physics and Industrial Control System (EPICS)
*
* Copyright 1991, the Regents of the University of California,
* and the University of Chicago Board of Governors.
* Copyright 1991, the Regents of the University of California,
* and the University of Chicago Board of Governors.
*
* This software was produced under U.S. Government contracts:
* (W-7405-ENG-36) at the Los Alamos National Laboratory,
* and (W-31-109-ENG-38) at Argonne National Laboratory.
* This software was produced under U.S. Government contracts:
* (W-7405-ENG-36) at the Los Alamos National Laboratory,
* and (W-31-109-ENG-38) at Argonne National Laboratory.
*
* Initial development by:
* The Controls and Automation Group (AT-8)
* Ground Test Accelerator
* Accelerator Technology Division
* Los Alamos National Laboratory
* Initial development by:
* The Controls and Automation Group (AT-8)
* Ground Test Accelerator
* Accelerator Technology Division
* Los Alamos National Laboratory
*
* Co-developed with
* The Controls and Computing Group
* Accelerator Systems Division
* Advanced Photon Source
* Argonne National Laboratory
* Co-developed with
* The Controls and Computing Group
* Accelerator Systems Division
* Advanced Photon Source
* Argonne National Laboratory
*
* Modification Log:
* -----------------
* .00 joh 030191 Fixed cast server to not block on TCP
* .01 joh 030891 now reuses old client structure
* .02 joh 032091 allways flushes if the client changes and the old
* client has a TCP connection.(bug introduced by .00)
* .03 joh 071291 changes to avoid confusion when a rebooted
* client uses the same port number
* .04 joh 080591 changed printf() to a logMsg()
* .05 joh 082091 tick stamp init in create_udp_client()
* .06 joh 112291 dont change the address until after the flush
* .07 joh 112291 fixed the comments
* .08 joh 021192 better diagnostics
* Improvements
* ------------
* .01
* Dont send channel found message unless there is memory, a task slot,
* and a TCP socket available. Send a diagnostic instead.
* Or ... make the timeout shorter? This is only a problem if
* they persist in trying to make a connection after getting no
* response.
*
* Improvements
* ------------
* .01
* Dont send channel found message unless there is memory, a task slot,
* and a TCP socket available. Send a diagnostic instead.
* Or ... make the timeout shorter? This is only a problem if
* they persist in trying to make a connection after getting no
* response.
*
* Notes:
* ------
* .01
* Replies to broadcasts are not returned over
* an existing TCP connection to avoid a TCP
* pend which could lock up the cast server.
* Notes:
* ------
* .01
* Replies to broadcasts are not returned over
* an existing TCP connection to avoid a TCP
* pend which could lock up the cast server.
*/
static char *sccsId = "@(#) $Id$";
#include <stddef.h>
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <sys/types.h>
#include <errno.h>
#include "osiSock.h"
#include "tsStamp.h"
#include "os_depen.h"
#include "osiThread.h"
#include "errlog.h"
#include "ellLib.h"
@@ -77,13 +59,60 @@ static char *sccsId = "@(#) $Id$";
#include "envDefs.h"
#include "freeList.h"
#include "server.h"
#include "osiSockResource.h"
#define TIMEOUT 60.0 /* sec */
LOCAL void clean_addrq();
/*
* clean_addrq
*/
LOCAL void clean_addrq()
{
struct channel_in_use *pciu;
struct channel_in_use *pnextciu;
TS_STAMP current;
double delay;
double maxdelay = 0;
unsigned ndelete=0;
double timeout = TIMEOUT;
int s;
tsStampGetCurrent(&current);
semMutexMustTake(prsrv_cast_client->addrqLock);
pnextciu = (struct channel_in_use *)
prsrv_cast_client->addrq.node.next;
while( (pciu = pnextciu) ) {
pnextciu = (struct channel_in_use *)pciu->node.next;
delay = tsStampDiffInSeconds(&current,&pciu->time_at_creation);
if (delay > timeout) {
ellDelete(&prsrv_cast_client->addrq, &pciu->node);
LOCK_CLIENTQ;
s = bucketRemoveItemUnsignedId (
pCaBucket,
&pciu->sid);
if(s){
errMessage (s, "Bad id at close");
}
UNLOCK_CLIENTQ;
freeListFree(rsrvChanFreeList, pciu);
ndelete++;
if(delay>maxdelay) maxdelay = delay;
}
}
semMutexGive(prsrv_cast_client->addrqLock);
# ifdef DEBUG
if(ndelete){
epicsPrintf ("CAS: %d CA channels have expired after %f sec\n",
ndelete, maxdelay);
}
# endif
}
/*
* CAST_SERVER
*
@@ -92,60 +121,60 @@ LOCAL void clean_addrq();
*/
int cast_server(void)
{
struct sockaddr_in sin;
int status;
int count=0;
struct sockaddr_in new_recv_addr;
int recv_addr_size;
unsigned short port;
int nchars;
threadId tid;
struct sockaddr_in sin;
int status;
int count=0;
struct sockaddr_in new_recv_addr;
int recv_addr_size;
unsigned short port;
int nchars;
threadId tid;
taskwdInsert(threadGetIdSelf(),NULL,NULL);
taskwdInsert(threadGetIdSelf(),NULL,NULL);
port = caFetchPortConfig(&EPICS_CA_SERVER_PORT, CA_SERVER_PORT);
port = caFetchPortConfig(&EPICS_CA_SERVER_PORT, CA_SERVER_PORT);
recv_addr_size = sizeof(new_recv_addr);
recv_addr_size = sizeof(new_recv_addr);
if( IOC_cast_sock!=0 && IOC_cast_sock!=ERROR ) {
if( (status = socket_close(IOC_cast_sock)) == ERROR ) {
if( IOC_cast_sock!=0 && IOC_cast_sock!=INVALID_SOCKET ) {
if( (status = socket_close(IOC_cast_sock)) < 0 ) {
epicsPrintf ("CAS: Unable to close master cast socket\n");
}
}
/*
* Open the socket.
* Use ARPA Internet address format and datagram socket.
/*
* Open the socket.
* Use ARPA Internet address format and datagram socket.
*/
if((IOC_cast_sock = socket (AF_INET, SOCK_DGRAM, 0)) == ERROR){
epicsPrintf ("CAS: casts socket creation error\n");
threadSuspend(threadGetIdSelf());
}
if ((IOC_cast_sock = socket (AF_INET, SOCK_DGRAM, 0)) < 0) {
epicsPrintf ("CAS: cast socket creation error\n");
threadSuspend ();
}
/*
* some concern that vxWorks will run out of mBuf's
* if this change is made
*
* joh 11-10-98
*/
/*
* some concern that vxWorks will run out of mBuf's
* if this change is made
*
* joh 11-10-98
*/
#if 0
{
/*
*
* this allows for faster connects by queuing
* additional incomming UDP search frames
*
* this allocates a 32k buffer
* (uses a power of two)
*/
int size = 1u<<15u;
status = setsockopt (IOC_cast_sock, SOL_SOCKET,
SO_RCVBUF, (char *)&size, sizeof(size));
if (status<0) {
epicsPrintf ("CAS: unable to set cast socket size\n");
}
}
{
/*
*
* this allows for faster connects by queuing
* additional incomming UDP search frames
*
* this allocates a 32k buffer
* (uses a power of two)
*/
int size = 1u<<15u;
status = setsockopt (IOC_cast_sock, SOL_SOCKET,
SO_RCVBUF, (char *)&size, sizeof(size));
if (status<0) {
epicsPrintf ("CAS: unable to set cast socket size\n");
}
}
#endif
/* Zero the sock_addr structure */
@@ -155,16 +184,16 @@ int cast_server(void)
sin.sin_port = htons(port);
/* get server's Internet address */
if( bind(IOC_cast_sock, (struct sockaddr *)&sin, sizeof (sin)) == ERROR){
if( bind(IOC_cast_sock, (struct sockaddr *)&sin, sizeof (sin)) < 0){
epicsPrintf ("CAS: cast bind error\n");
socket_close (IOC_cast_sock);
threadSuspend(threadGetIdSelf());
threadSuspend ();
}
/* tell clients we are on line again */
tid = threadCreate("CAonline",threadPriorityChannelAccessClient-3,
threadGetStackSize(threadStackSmall),
(THREADFUNC)rsrv_online_notify_task,0);
threadGetStackSize(threadStackSmall),
(THREADFUNC)rsrv_online_notify_task,0);
if(tid == 0) {
epicsPrintf ("CAS: couldnt start up online notify task because \"%s\"\n",
strerror(errno));
@@ -176,13 +205,16 @@ int cast_server(void)
* possible
*
*/
while(TRUE){
prsrv_cast_client = create_udp_client(IOC_cast_sock);
if(prsrv_cast_client){
while (TRUE) {
prsrv_cast_client = create_base_client ();
if (prsrv_cast_client) {
break;
}
threadSleep(300.0);
threadSleep(300.0);
}
prsrv_cast_client->sock = IOC_cast_sock;
prsrv_cast_client->tid = threadGetIdSelf ();
while (TRUE) {
status = recvfrom (
@@ -195,12 +227,12 @@ int cast_server(void)
if (status<0) {
epicsPrintf ("CAS: UDP recv error (errno=%s)\n",
SOCKERRSTR(SOCKERRNO));
threadSleep(1.0);
threadSleep(1.0);
}
else {
prsrv_cast_client->recv.cnt = (unsigned long) status;
prsrv_cast_client->recv.stk = 0ul;
tsStampGetCurrent(&prsrv_cast_client->time_at_last_recv);
tsStampGetCurrent(&prsrv_cast_client->time_at_last_recv);
/*
* If we are talking to a new client flush to the old one
@@ -209,7 +241,7 @@ int cast_server(void)
*/
if (prsrv_cast_client->send.stk) {
status = memcmp( (void *)&prsrv_cast_client->addr, (void *)&new_recv_addr, recv_addr_size);
if(status){
if(status){
/*
* if the address is different
*/
@@ -221,20 +253,20 @@ int cast_server(void)
prsrv_cast_client->addr = new_recv_addr;
}
if(CASDEBUG>1){
char buf[40];
if (CASDEBUG>1) {
char buf[40];
ipAddrToA (&prsrv_cast_client->addr, buf, sizeof(buf));
epicsPrintf ("CAS: cast server msg of %d bytes from addr %s\n",
errlogPrintf ("CAS: cast server msg of %d bytes from addr %s\n",
prsrv_cast_client->recv.cnt, buf);
}
if(CASDEBUG>2)
count = ellCount(&prsrv_cast_client->addrq);
if (CASDEBUG>2)
count = ellCount (&prsrv_cast_client->addrq);
status = camessage(
prsrv_cast_client,&prsrv_cast_client->recv);
if(status == OK){
if(status == RSRV_OK){
if(prsrv_cast_client->recv.cnt !=
prsrv_cast_client->recv.stk){
char buf[40];
@@ -253,205 +285,27 @@ int cast_server(void)
epicsPrintf ("CAS: invalid (damaged?) UDP request from %s ?\n", buf);
}
if(CASDEBUG>2){
if(ellCount(&prsrv_cast_client->addrq)){
epicsPrintf ("CAS: Fnd %d name matches (%d tot)\n",
if (CASDEBUG>2) {
if ( ellCount (&prsrv_cast_client->addrq) ) {
errlogPrintf ("CAS: Fnd %d name matches (%d tot)\n",
ellCount(&prsrv_cast_client->addrq)-count,
ellCount(&prsrv_cast_client->addrq));
}
}
}
/*
* allow messages to batch up if more are comming
*/
status = socket_ioctl(IOC_cast_sock, FIONREAD, /* sic */(int) &nchars);
if(status == ERROR){
threadSuspend(threadGetIdSelf());
}
if(nchars == 0){
cas_send_msg(prsrv_cast_client, TRUE);
clean_addrq();
}
}
}
/*
* clean_addrq
*
*
*/
#define TIMEOUT 60.0 /* sec */
LOCAL void clean_addrq()
{
struct channel_in_use *pciu;
struct channel_in_use *pnextciu;
TS_STAMP current;
double delay;
double maxdelay = 0;
unsigned ndelete=0;
double timeout = TIMEOUT;
int s;
tsStampGetCurrent(&current);
semMutexMustTake(prsrv_cast_client->addrqLock);
pnextciu = (struct channel_in_use *)
prsrv_cast_client->addrq.node.next;
while( (pciu = pnextciu) ) {
pnextciu = (struct channel_in_use *)pciu->node.next;
delay = tsStampDiffInSeconds(&current,&pciu->time_at_creation);
if (delay > timeout) {
ellDelete(&prsrv_cast_client->addrq, &pciu->node);
LOCK_CLIENTQ;
s = bucketRemoveItemUnsignedId (
pCaBucket,
&pciu->sid);
if(s){
errMessage (s, "Bad id at close");
}
UNLOCK_CLIENTQ;
freeListFree(rsrvChanFreeList, pciu);
ndelete++;
if(delay>maxdelay) maxdelay = delay;
}
}
semMutexGive(prsrv_cast_client->addrqLock);
# ifdef DEBUG
if(ndelete){
epicsPrintf ("CAS: %d CA channels have expired after %f sec\n",
ndelete, maxdelay);
}
# endif
}
/*
* CREATE_UDP_CLIENT
*
*
*/
struct client *create_udp_client(SOCKET sock)
{
struct client *client;
client = freeListMalloc(rsrvClientFreeList);
if(!client){
epicsPrintf ("CAS: no space in pool for a new client\n");
return NULL;
}
if(CASDEBUG>2)
epicsPrintf ("CAS: Creating new udp client\n");
/*
* The following inits to zero done instead of a bfill since the send
* and recv buffers are large and don't need initialization.
*
* memset(client, 0, sizeof(*client));
*/
client->blockSem = semBinaryCreate(semEmpty);
if(!client->blockSem){
freeListFree(rsrvClientFreeList, client);
return NULL;
}
/*
* user name initially unknown
*/
client->pUserName = malloc(1);
if(!client->pUserName){
semBinaryDestroy(client->blockSem);
freeListFree(rsrvClientFreeList, client);
return NULL;
}
client->pUserName[0] = '\0';
/*
* host name initially unknown
*/
client->pHostName = malloc(1);
if(!client->pHostName){
semBinaryDestroy(client->blockSem);
free(client->pUserName);
freeListFree(rsrvClientFreeList, client);
return NULL;
}
client->pHostName[0] = '\0';
ellInit(&client->addrq);
ellInit(&client->putNotifyQue);
memset((char *)&client->addr, 0, sizeof(client->addr));
client->tid = threadGetIdSelf();
client->send.stk = 0ul;
client->send.cnt = 0ul;
client->recv.stk = 0ul;
client->recv.cnt = 0ul;
client->evuser = NULL;
client->disconnect = FALSE; /* for TCP only */
tsStampGetCurrent(&client->time_at_last_send);
tsStampGetCurrent(&client->time_at_last_recv);
client->proto = IPPROTO_UDP;
client->sock = sock;
client->minor_version_number = CA_UKN_MINOR_VERSION;
client->send.maxstk = MAX_UDP;
client->lock = semMutexMustCreate();
client->putNotifyLock = semMutexMustCreate();
client->addrqLock = semMutexMustCreate();
client->eventqLock = semMutexMustCreate();
client->recv.maxstk = ETHERNET_MAX_UDP;
return client;
}
/*
* UDP_TO_TCP
*
* send lock must be applied
*
*/
int udp_to_tcp(
struct client *client,
SOCKET sock
)
{
int status;
int addrSize;
if(CASDEBUG>2){
epicsPrintf ("CAS: converting udp client to tcp\n");
}
client->proto = IPPROTO_TCP;
client->send.maxstk = MAX_TCP;
client->recv.maxstk = MAX_TCP;
client->sock = sock;
client->tid = threadGetIdSelf();
addrSize = sizeof(client->addr);
status = getpeername(
sock,
(struct sockaddr *)&client->addr,
&addrSize);
if(status == ERROR){
epicsPrintf ("CAS: peer address fetch failed\n");
return ERROR;
/*
* allow messages to batch up if more are comming
*/
status = socket_ioctl(IOC_cast_sock, FIONREAD, &nchars);
if (status<0) {
errlogPrintf ("CA cast server: Unable to fetch N characters pending\n");
cas_send_msg (prsrv_cast_client, TRUE);
clean_addrq ();
}
return OK;
}
else if (nchars == 0) {
cas_send_msg (prsrv_cast_client, TRUE);
clean_addrq ();
}
}
}
+103 -102
View File
@@ -1,74 +1,65 @@
/*
* O N L I N E _ N O T I F Y . C
* $Id$
*
* tell CA clients this a server has joined the network
* tell CA clients this a server has joined the network
*
* Author: Jeffrey O. Hill
* hill@luke.lanl.gov
* (505) 665 1831
* Date: 103090
* Author: Jeffrey O. Hill
* hill@luke.lanl.gov
* (505) 665 1831
* Date: 103090
*
* Experimental Physics and Industrial Control System (EPICS)
* Experimental Physics and Industrial Control System (EPICS)
*
* Copyright 1991, the Regents of the University of California,
* and the University of Chicago Board of Governors.
* Copyright 1991, the Regents of the University of California,
* and the University of Chicago Board of Governors.
*
* This software was produced under U.S. Government contracts:
* (W-7405-ENG-36) at the Los Alamos National Laboratory,
* and (W-31-109-ENG-38) at Argonne National Laboratory.
* This software was produced under U.S. Government contracts:
* (W-7405-ENG-36) at the Los Alamos National Laboratory,
* and (W-31-109-ENG-38) at Argonne National Laboratory.
*
* Initial development by:
* The Controls and Automation Group (AT-8)
* Ground Test Accelerator
* Accelerator Technology Division
* Los Alamos National Laboratory
* Initial development by:
* The Controls and Automation Group (AT-8)
* Ground Test Accelerator
* Accelerator Technology Division
* Los Alamos National Laboratory
*
* Co-developed with
* The Controls and Computing Group
* Accelerator Systems Division
* Advanced Photon Source
* Argonne National Laboratory
* Co-developed with
* The Controls and Computing Group
* Accelerator Systems Division
* Advanced Photon Source
* Argonne National Laboratory
*
* History
* .00 joh 021192 better diagnostics
*/
static char *sccsId = "@(#) $Id$";
#include <stddef.h>
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <errno.h>
#define MAX_BLOCK_THRESHOLD 100000
/*
* EPICS includes
* EPICS includes
*/
#include "osiSock.h"
#include "osiThread.h"
#include "osiPoolStatus.h"
#include "tsStamp.h"
#include "errlog.h"
#include "envDefs.h"
#include "server.h"
/*
* RSRV_ONLINE_NOTIFY_TASK
* RSRV_ONLINE_NOTIFY_TASK
*/
int rsrv_online_notify_task()
{
caAddrNode *pNode;
double delay;
double maxdelay;
double delay;
double maxdelay;
long longStatus;
double maxPeriod;
caHdr msg;
struct sockaddr_in recv_addr;
SOCKET status;
int status;
SOCKET sock;
int true = TRUE;
unsigned short port;
@@ -76,23 +67,16 @@ int rsrv_online_notify_task()
taskwdInsert(threadGetIdSelf(),NULL,NULL);
longStatus = envGetDoubleConfigParam (
&EPICS_CA_BEACON_PERIOD,
&maxPeriod);
&EPICS_CA_BEACON_PERIOD, &maxPeriod);
if (longStatus || maxPeriod<=0.0) {
maxPeriod = 15.0;
epicsPrintf (
"EPICS \"%s\" float fetch failed\n",
EPICS_CA_BEACON_PERIOD.name);
epicsPrintf (
"Setting \"%s\" = %f\n",
EPICS_CA_BEACON_PERIOD.name,
maxPeriod);
epicsPrintf ("EPICS \"%s\" float fetch failed\n",
EPICS_CA_BEACON_PERIOD.name);
epicsPrintf ("Setting \"%s\" = %f\n",
EPICS_CA_BEACON_PERIOD.name, maxPeriod);
}
/*
* 1/50 second initial delay between beacons
*/
delay = .02;
delay = 0.02; /* initial beacon period in sec */
maxdelay = maxPeriod;
/*
@@ -100,34 +84,37 @@ int rsrv_online_notify_task()
* Use ARPA Internet address format and datagram socket.
* Format described in <sys/socket.h>.
*/
if((sock = socket (AF_INET, SOCK_DGRAM, 0)) == ERROR){
errlogPrintf("CAS: online socket creation error\n");
abort();
if( (sock = socket (AF_INET, SOCK_DGRAM, 0)) == SOCKET_ERROR){
errlogPrintf ("CAS: online socket creation error\n");
threadSuspend ();
}
status = setsockopt( sock,
SOL_SOCKET,
SO_BROADCAST,
(char *)&true,
sizeof(true));
if(status<0){
abort();
status = setsockopt (sock, SOL_SOCKET, SO_BROADCAST,
(char *)&true, sizeof(true));
if (status<0) {
errlogPrintf ("CAS: online socket set up error\n");
threadSuspend ();
}
memset((char *)&recv_addr, 0, sizeof recv_addr);
recv_addr.sin_family = AF_INET;
recv_addr.sin_addr.s_addr = htonl(INADDR_ANY); /* let slib pick lcl addr */
recv_addr.sin_port = htons(0); /* let slib pick port */
status = bind(sock, (struct sockaddr *)&recv_addr, sizeof recv_addr);
if(status<0)
abort();
#if 0
{
struct sockaddr_in recv_addr;
memset((char *)&recv_addr, 0, sizeof recv_addr);
recv_addr.sin_family = AF_INET;
recv_addr.sin_addr.s_addr = htonl(INADDR_ANY); /* let slib pick lcl addr */
recv_addr.sin_port = htons(0); /* let slib pick port */
status = bind(sock, (struct sockaddr *)&recv_addr, sizeof recv_addr);
if(status<0)
abort();
}
#endif
memset((char *)&msg, 0, sizeof msg);
msg.m_cmmd = htons (CA_PROTO_RSRV_IS_UP);
msg.m_count = htons (ca_server_port);
msg.m_available = htonl (INADDR_ANY);
ellInit(&beaconAddrList);
ellInit (&beaconAddrList);
/*
* load user and auto configured
@@ -137,49 +124,63 @@ int rsrv_online_notify_task()
caSetupBCastAddrList (&beaconAddrList, sock, port);
# ifdef DEBUG
caPrintAddrList(&beaconAddrList);
caPrintAddrList (&beaconAddrList);
# endif
while(TRUE){
int maxBlock;
while (TRUE) {
/*
* check max block and disable new channels
* if its to small
* check to see if we are running low on memory
* and disable new channels if so
*/
maxBlock = memFindMax();
if(maxBlock<MAX_BLOCK_THRESHOLD){
casBelowMaxBlockThresh = TRUE;
}
else{
casBelowMaxBlockThresh = FALSE;
}
pNode = (caAddrNode *) beaconAddrList.node.next;
while(pNode){
status = sendto(
sock,
(char *)&msg,
sizeof(msg),
0,
&pNode->destAddr.sa,
sizeof(pNode->destAddr.sa));
if(status < 0){
errlogPrintf( "%s: CA beacon error was \"%s\"\n",
(int) __FILE__,
(int) SOCKERRSTR(SOCKERRNO));
casSufficentSpaceInPool = osiSufficentSpaceInPool ();
pNode = (caAddrNode *) ellFirst (&beaconAddrList);
while (pNode) {
char buf[64];
status = connect (sock, &pNode->destAddr.sa, sizeof(pNode->destAddr.sa));
if (status<0) {
ipAddrToA (&pNode->destAddr.in, buf, sizeof(buf));
errlogPrintf ( "%s: CA beacon routing (connect to \"%s\") error was \"%s\"\n",
__FILE__, buf, SOCKERRSTR(SOCKERRNO));
}
else{
assert(status == sizeof(msg));
else {
struct sockaddr_in if_addr;
int size = sizeof (if_addr);
status = getsockname (sock, (struct sockaddr *) &if_addr, &size);
if (status<0) {
errlogPrintf ( "%s: CA beacon routing (getsockname) error was \"%s\"\n",
__FILE__, SOCKERRSTR(SOCKERRNO));
}
else if (if_addr.sin_family==AF_INET) {
msg.m_available = if_addr.sin_addr.s_addr;
ipAddrToA (&if_addr, buf, sizeof(buf));
printf ("**** Setting local address to \"%s\" - this may not work correctly ****\n", buf);
status = send (sock, (char *)&msg, sizeof(msg), 0);
if (status < 0) {
ipAddrToA (&pNode->destAddr.in, buf, sizeof(buf));
errlogPrintf ( "%s: CA beacon (send to \"%s\") error was \"%s\"\n",
__FILE__, buf, SOCKERRSTR(SOCKERRNO));
}
else {
assert (status == sizeof(msg));
}
}
}
pNode = (caAddrNode *)pNode->node.next;
}
threadSleep(delay);
if (delay<maxdelay) {
delay *= 2.0;
if(delay>maxdelay) delay = maxdelay;
}
if (delay<maxdelay) {
delay *= 2.0;
if (delay>maxdelay) {
delay = maxdelay;
}
}
}
}
+126 -166
View File
@@ -1,42 +1,29 @@
/*
* Author: Jeffrey O. Hill
* hill@luke.lanl.gov
* (505) 665 1831
* Date: 5-88
* Author: Jeffrey O. Hill
* hill@luke.lanl.gov
* (505) 665 1831
* Date: 5-88
*
* Experimental Physics and Industrial Control System (EPICS)
* Experimental Physics and Industrial Control System (EPICS)
*
* Copyright 1991, the Regents of the University of California,
* and the University of Chicago Board of Governors.
* Copyright 1991, the Regents of the University of California,
* and the University of Chicago Board of Governors.
*
* This software was produced under U.S. Government contracts:
* (W-7405-ENG-36) at the Los Alamos National Laboratory,
* and (W-31-109-ENG-38) at Argonne National Laboratory.
* This software was produced under U.S. Government contracts:
* (W-7405-ENG-36) at the Los Alamos National Laboratory,
* and (W-31-109-ENG-38) at Argonne National Laboratory.
*
* Initial development by:
* The Controls and Automation Group (AT-8)
* Ground Test Accelerator
* Accelerator Technology Division
* Los Alamos National Laboratory
* Initial development by:
* The Controls and Automation Group (AT-8)
* Ground Test Accelerator
* Accelerator Technology Division
* Los Alamos National Laboratory
*
* Co-developed with
* The Controls and Computing Group
* Accelerator Systems Division
* Advanced Photon Source
* Argonne National Laboratory
*
* Modification Log:
* -----------------
* .01 joh 060691 removed 4 byte count from the beginning of
* of each message
* .02 joh 060791 moved send_msg stuff into caserverio.c
* .03 joh 071291 moved time stamp from client to the
* channel in use block
* .04 joh 071591 added ticks at last io to the client structure
* .05 joh 103191 moved lock from msg buf to client structure
* .06 joh 050692 added declaration for cac_send_heartbeat()
* .07 joh 022492 added get flag to the event ext block
* .08 joh 090893 added sid field to channel in use block
* Co-developed with
* The Controls and Computing Group
* Accelerator Systems Division
* Advanced Photon Source
* Argonne National Laboratory
*
*/
@@ -51,8 +38,6 @@ static char *serverhSccsId = "@(#) $Id$";
# define HDRVERSIONID(NAME,VERS)
#endif /*CAS_VERSION_GLOBAL*/
#define APIENTRY
#include "epicsAssert.h"
#include "ellLib.h"
@@ -73,6 +58,9 @@ static char *serverhSccsId = "@(#) $Id$";
#include "asLib.h"
#include "asDbLib.h"
#define RSRV_OK 0
#define RSRV_ERROR (-1)
/*
* !! buf must be the first item in this structure !!
* This guarantees that buf will have 8 byte natural
@@ -101,70 +89,69 @@ static char *serverhSccsId = "@(#) $Id$";
* o four unsigned longs also take up a multiple of 8 bytes
* (usually 2).
* NOTE:
* o we should solve the above message alignment problems by
* allocating the message buffers
* o we should solve the above message alignment problems by
* allocating the message buffers
*
*/
struct message_buffer{
char buf[MAX_MSG_SIZE];
unsigned long stk;
unsigned long maxstk;
unsigned long cnt;
unsigned long pad0; /* force 8 byte alignement */
struct message_buffer {
char buf[MAX_MSG_SIZE];
unsigned long stk;
unsigned long maxstk;
unsigned long cnt;
unsigned long pad0; /* force 8 byte alignement */
};
struct client{
ELLNODE node;
struct message_buffer send;
struct message_buffer recv;
semMutexId lock;
semMutexId putNotifyLock;
semMutexId addrqLock;
semMutexId eventqLock;
ELLLIST addrq;
ELLLIST putNotifyQue;
struct sockaddr_in addr;
TS_STAMP time_at_last_send;
TS_STAMP time_at_last_recv;
void *evuser;
char *pUserName;
char *pHostName;
semBinaryId blockSem; /* used whenever the client blocks */
SOCKET sock;
int proto;
threadId tid;
unsigned minor_version_number;
char disconnect; /* disconnect detected */
struct client {
ELLNODE node;
struct message_buffer send;
struct message_buffer recv;
semMutexId lock;
semMutexId putNotifyLock;
semMutexId addrqLock;
semMutexId eventqLock;
ELLLIST addrq;
ELLLIST putNotifyQue;
struct sockaddr_in addr;
TS_STAMP time_at_last_send;
TS_STAMP time_at_last_recv;
void *evuser;
char *pUserName;
char *pHostName;
semBinaryId blockSem; /* used whenever the client blocks */
SOCKET sock;
int proto;
threadId tid;
unsigned minor_version_number;
char disconnect; /* disconnect detected */
};
/*
* for tracking db put notifies
*/
typedef struct rsrv_put_notify{
ELLNODE node;
PUTNOTIFY dbPutNotify;
caHdr msg;
unsigned long valueSize; /* size of block pointed to by dbPutNotify */
int busy; /* put notify in progress */
}RSRVPUTNOTIFY;
typedef struct rsrv_put_notify {
ELLNODE node;
PUTNOTIFY dbPutNotify;
caHdr msg;
unsigned long valueSize; /* size of block pointed to by dbPutNotify */
int busy; /* put notify in progress */
} RSRVPUTNOTIFY;
/*
* per channel structure
* (stored in addrq off of a client block)
*/
struct channel_in_use{
ELLNODE node;
ELLLIST eventq;
struct client *client;
RSRVPUTNOTIFY *pPutNotify; /* potential active put notify */
const unsigned cid; /* client id */
const unsigned sid; /* server id */
TS_STAMP time_at_creation; /* for UDP timeout */
struct dbAddr addr;
ASCLIENTPVT asClientPVT;
struct channel_in_use {
ELLNODE node;
ELLLIST eventq;
struct client *client;
RSRVPUTNOTIFY *pPutNotify; /* potential active put notify */
const unsigned cid; /* client id */
const unsigned sid; /* server id */
TS_STAMP time_at_creation; /* for UDP timeout */
struct dbAddr addr;
ASCLIENTPVT asClientPVT;
};
@@ -172,70 +159,55 @@ struct channel_in_use{
* Event block extension for channel access
* some things duplicated for speed
*/
struct event_ext{
ELLNODE node;
caHdr msg;
struct channel_in_use *pciu;
struct event_block *pdbev; /* ptr to db event block */
unsigned size; /* for speed */
unsigned mask;
char modified; /* mod & ev flw ctrl enbl */
char send_lock; /* lock send buffer */
struct event_ext {
ELLNODE node;
caHdr msg;
struct channel_in_use *pciu;
struct event_block *pdbev; /* ptr to db event block */
unsigned size; /* for speed */
unsigned mask;
char modified; /* mod & ev flw ctrl enbl */
char send_lock; /* lock send buffer */
};
/* NOTE: external used so they remember the state across loads */
#ifdef GLBLSOURCE
# define GLBLTYPE
# define GLBLTYPE_INIT(A)
/* NOTE: external used so they remember the state across loads */
#ifdef GLBLSOURCE
# define GLBLTYPE
# define GLBLTYPE_INIT(A)
#else
# define GLBLTYPE extern
# define GLBLTYPE_INIT(A)
# define GLBLTYPE extern
# define GLBLTYPE_INIT(A)
#endif
/*
* for debug-level dependent messages:
* for debug-level dependent messages:
*/
#ifdef DEBUG
# define DLOG(level, fmt, a1, a2, a3, a4, a5, a6) \
if (CASDEBUG > level) \
errlogPrintf (fmt, a1, a2, a3, a4, a5, a6)
# define DBLOCK(level, code) \
if (CASDEBUG > level) \
{ \
code; \
}
# define DLOG(level, fmt, a1, a2, a3, a4, a5, a6) \
if (CASDEBUG > level) errlogPrintf (fmt, a1, a2, a3, a4, a5, a6)
# define DBLOCK(level, code) \
if (CASDEBUG > level) { code; }
#else
# define DLOG(level, fmt, a1, a2, a3, a4, a5, a6)
# define DBLOCK(level, code)
# define DLOG(level, fmt, a1, a2, a3, a4, a5, a6)
# define DBLOCK(level, code)
#endif
GLBLTYPE int CASDEBUG;
GLBLTYPE SOCKET IOC_sock;
GLBLTYPE SOCKET IOC_cast_sock;
GLBLTYPE unsigned short ca_server_port;
GLBLTYPE ELLLIST clientQ; /* locked by clientQlock */
GLBLTYPE ELLLIST beaconAddrList;
GLBLTYPE semMutexId clientQlock;
GLBLTYPE struct client *prsrv_cast_client;
GLBLTYPE BUCKET *pCaBucket;
GLBLTYPE void *rsrvClientFreeList;
GLBLTYPE void *rsrvChanFreeList;
GLBLTYPE void *rsrvEventFreeList;
GLBLTYPE int CASDEBUG;
GLBLTYPE SOCKET IOC_sock;
GLBLTYPE SOCKET IOC_cast_sock;
GLBLTYPE unsigned short ca_server_port;
GLBLTYPE ELLLIST clientQ; /* locked by clientQlock */
GLBLTYPE ELLLIST beaconAddrList;
GLBLTYPE semMutexId clientQlock;
GLBLTYPE struct client *prsrv_cast_client;
GLBLTYPE BUCKET *pCaBucket;
GLBLTYPE void *rsrvClientFreeList;
GLBLTYPE void *rsrvChanFreeList;
GLBLTYPE void *rsrvEventFreeList;
#define CAS_HASH_TABLE_SIZE 4096
/*
* set true if max memory block drops below MAX_BLOCK_THRESHOLD
*/
#define MAX_BLOCK_THRESHOLD 100000
GLBLTYPE int casBelowMaxBlockThresh;
GLBLTYPE int casSufficentSpaceInPool;
#define SEND_LOCK(CLIENT) semMutexMustTake((CLIENT)->lock)
#define SEND_UNLOCK(CLIENT) semMutexGive((CLIENT)->lock)
@@ -244,48 +216,36 @@ GLBLTYPE int casBelowMaxBlockThresh;
((caHdr *) &(CLIENT)->send.buf[(CLIENT)->send.stk])
/*
* ALLOC_MSG get a ptr to space in the buffer
* END_MSG push a message onto the buffer stack
* ALLOC_MSG get a ptr to space in the buffer
* END_MSG push a message onto the buffer stack
*
*/
#define ALLOC_MSG(CLIENT, EXTSIZE) cas_alloc_msg(CLIENT, EXTSIZE)
#define ALLOC_MSG(CLIENT, EXTSIZE) cas_alloc_msg (CLIENT, EXTSIZE)
#define END_MSG(CLIENT)\
EXTMSGPTR(CLIENT)->m_postsize = CA_MESSAGE_ALIGN(EXTMSGPTR(CLIENT)->m_postsize),\
(CLIENT)->send.stk += sizeof(caHdr) + EXTMSGPTR(CLIENT)->m_postsize
#define LOCK_CLIENTQ semMutexMustTake (clientQlock);
#define UNLOCK_CLIENTQ semMutexGive (clientQlock);
#define LOCK_CLIENTQ semMutexMustTake(clientQlock)
#define UNLOCK_CLIENTQ semMutexGive(clientQlock)
struct client *existing_client();
int camsgtask();
void cas_send_msg();
caHdr *cas_alloc_msg();
int rsrv_online_notify_task();
void cac_send_heartbeat();
int client_stat(unsigned level);
void casr(unsigned level);
int req_server(void);
int cast_server(void);
int free_client(struct client *client);
struct client *create_udp_client(SOCKET sock);
int udp_to_tcp(struct client *client, SOCKET sock);
int camessage(
struct client *client,
struct message_buffer *recv
);
void cas_send_heartbeat(
struct client *pc
);
void write_notify_reply(void *pArg);
int client_stat (unsigned level);
void casr (unsigned level);
void camsgtask (struct client *client);
void cas_send_msg (struct client *pclient, int lock_needed);
caHdr *cas_alloc_msg (struct client *pclient, unsigned extsize);
int rsrv_online_notify_task (void);
void cac_send_heartbeat (void);
int cast_server (void);
struct client *create_base_client ();
int camessage (struct client *client,
struct message_buffer *recv);
void cas_send_heartbeat (struct client *pc);
void write_notify_reply (void *pArg);
int rsrvCheckPut (const struct channel_in_use *pciu);
struct client *create_client (SOCKET sock);
void destroy_client (struct client *client);
/*
* !!KLUDGE!!
@@ -294,7 +254,7 @@ int rsrvCheckPut (const struct channel_in_use *pciu);
* to include both dbAccess.h and db_access.h at the
* same time.
*/
#define S_db_Blocked (M_dbAccess|39) /*Request is Blocked*/
#define S_db_Blocked (M_dbAccess|39) /*Request is Blocked*/
#define S_db_Pending (M_dbAccess|37) /*Request is pending*/
#endif /*INCLserverh*/