rsrv: redo initialization to support bind multiple interfaces.

consolidate most setup tasks in rsrv_init(),
which now spawns threads directly.
For each interface create 3-4 sockets,

* TCP listener
* UDP receiver (unicast)
* UDP receiver (broadcast optional)
* UDP beacon sender
This commit is contained in:
Michael Davidsaver
2016-01-11 20:59:07 -05:00
parent 8d5815ac95
commit 15307c4db6
13 changed files with 408 additions and 437 deletions
+368 -130
View File
@@ -59,83 +59,12 @@ epicsThreadPrivateId rsrvCurrentClient;
*/
static void req_server (void *pParm)
{
unsigned priorityOfSelf = epicsThreadGetPrioritySelf ();
unsigned priorityOfBeacons;
epicsThreadBooleanStatus tbs;
osiSockAddrNode *pNode;
struct sockaddr_in serverAddr; /* server's address */
osiSocklen_t addrSize = (osiSocklen_t) sizeof(struct sockaddr_in);
int status;
rsrv_iface_config *conf = pParm;
SOCKET IOC_sock;
epicsThreadId tid;
int portChange;
taskwdInsert ( epicsThreadGetIdSelf (), NULL, NULL );
assert (ellCount(&casIntfAddrList)>0);
pNode = (osiSockAddrNode *) ellFirst ( &casIntfAddrList );
memcpy ( &serverAddr, &pNode->addr.ia, addrSize );
/*
* Open the socket. Use ARPA Internet address format and stream
* sockets. Format described in <sys/socket.h>.
*/
if ( ( IOC_sock = epicsSocketCreate (AF_INET, SOCK_STREAM, 0) ) == INVALID_SOCKET ) {
errlogPrintf ("CAS: Socket creation error\n");
epicsThreadSuspendSelf ();
}
epicsSocketEnableAddressReuseDuringTimeWaitState ( IOC_sock );
/* get server's Internet address */
status = bind(IOC_sock, (struct sockaddr *) &serverAddr, addrSize);
if ( status < 0 ) {
if ( SOCKERRNO == SOCK_EADDRINUSE ) {
/*
* enable assignment of a default port
* (so the getsockname() call below will
* work correctly)
*/
serverAddr.sin_port = ntohs (0);
status = bind(IOC_sock, (struct sockaddr *) &serverAddr, addrSize);
}
if ( status < 0 ) {
char sockErrBuf[64];
epicsSocketConvertErrnoToString (
sockErrBuf, sizeof ( sockErrBuf ) );
errlogPrintf ( "CAS: Socket bind error was \"%s\"\n",
sockErrBuf );
epicsThreadSuspendSelf ();
}
portChange = 1;
}
else {
portChange = 0;
}
status = getsockname ( IOC_sock,
(struct sockaddr *)&serverAddr, &addrSize);
if ( status ) {
char sockErrBuf[64];
epicsSocketConvertErrnoToString (
sockErrBuf, sizeof ( sockErrBuf ) );
errlogPrintf ( "CAS: getsockname() error %s\n",
sockErrBuf );
epicsThreadSuspendSelf ();
}
ca_server_port = ntohs (serverAddr.sin_port);
if ( portChange ) {
errlogPrintf ( "cas warning: Configured TCP port was unavailable.\n");
errlogPrintf ( "cas warning: Using dynamically assigned TCP port %hu,\n",
ca_server_port );
errlogPrintf ( "cas warning: but now two or more servers share the same UDP port.\n");
errlogPrintf ( "cas warning: Depending on your IP kernel this server may not be\n" );
errlogPrintf ( "cas warning: reachable with UDP unicast (a host's IP in EPICS_CA_ADDR_LIST)\n" );
}
IOC_sock = conf->tcp;
/* listen and accept new connections */
if ( listen ( IOC_sock, 20 ) < 0 ) {
@@ -148,22 +77,6 @@ static void req_server (void *pParm)
epicsThreadSuspendSelf ();
}
tbs = epicsThreadHighestPriorityLevelBelow ( priorityOfSelf, &priorityOfBeacons );
if ( tbs != epicsThreadBooleanStatusSuccess ) {
priorityOfBeacons = priorityOfSelf;
}
beacon_startStopEvent = epicsEventMustCreate(epicsEventEmpty);
beacon_ctl = ctlPause;
tid = epicsThreadCreate ( "CAS-beacon", priorityOfBeacons,
epicsThreadGetStackSize (epicsThreadStackSmall),
rsrv_online_notify_task, 0 );
if ( tid == 0 ) {
epicsPrintf ( "CAS: unable to start beacon thread\n" );
}
epicsEventMustWait(beacon_startStopEvent);
epicsEventSignal(castcp_startStopEvent);
while (TRUE) {
@@ -216,6 +129,129 @@ static void req_server (void *pParm)
}
}
static
int tryBind(SOCKET sock, const osiSockAddr* addr, const char *name)
{
if(bind(sock, &addr->ia, sizeof(*addr))<0) {
char sockErrBuf[64];
if(errno!=SOCK_EADDRINUSE)
{
epicsSocketConvertErrnoToString (
sockErrBuf, sizeof ( sockErrBuf ) );
errlogPrintf ( "CAS: %s bind error: \"%s\"\n",
name, sockErrBuf );
epicsThreadSuspendSelf ();
}
return -1;
} else
return 0;
}
/* need to collect a set of TCP sockets, one for each interface,
* which are bound to the same TCP port number.
* Needed to avoid the complications and confusion of different TCP
* ports for each interface (name server and beacon sender would need
* to know this).
*/
static
SOCKET* rsrv_grap_tcp(unsigned short *port)
{
SOCKET *socks;
osiSockAddr scratch;
socks = mallocMustSucceed(ellCount(&casIntfAddrList)*sizeof(*socks), "rsrv_grap_tcp");
/* start with preferred port */
memset(&scratch, 0, sizeof(scratch));
scratch.ia.sin_family = AF_INET;
scratch.ia.sin_port = htons(*port);
while(1) {
ELLNODE *cur;
unsigned i, ok = 1;
for(i=0; i<ellCount(&casIntfAddrList); i++)
socks[i] = INVALID_SOCKET;
for (i=0, cur=ellFirst(&casIntfAddrList); cur; i++, cur=ellNext(cur))
{
SOCKET tcpsock;
osiSockAddr ifaceAddr = ((osiSockAddrNode *)cur)->addr;
scratch.ia.sin_addr = ifaceAddr.ia.sin_addr;
tcpsock = socks[i] = epicsSocketCreate (AF_INET, SOCK_STREAM, 0);
if(tcpsock==INVALID_SOCKET)
cantProceed("rsrv ran out of sockets during initialization");
epicsSocketEnableAddressReuseDuringTimeWaitState ( tcpsock );
if(bind(tcpsock, &scratch.sa, sizeof(scratch))==0) {
if(scratch.ia.sin_port==0) {
/* use first socket to pick a random port */
assert(i==0);
osiSocklen_t alen = sizeof(ifaceAddr);
if(getsockname(tcpsock, &ifaceAddr.sa, &alen)) {
char sockErrBuf[64];
epicsSocketConvertErrnoToString (
sockErrBuf, sizeof ( sockErrBuf ) );
errlogPrintf ( "CAS: getsockname error was \"%s\"\n",
sockErrBuf );
epicsThreadSuspendSelf ();
ok = 0;
break;
}
scratch.ia.sin_port = ifaceAddr.ia.sin_port;
assert(scratch.ia.sin_port!=0);
}
} else {
/* bind fails. React harshly to unexpected errors to avoid an infinite loop */
if(errno==SOCK_EADDRNOTAVAIL) {
char name[40];
ipAddrToDottedIP(&scratch.ia, name, sizeof(name));
printf("Skipping %s which is not an interface address\n", name);
ellDelete(&casIntfAddrList, cur);
free(cur);
ok = 0;
break;
}
if(errno!=SOCK_EADDRINUSE && errno!=SOCK_EADDRNOTAVAIL) {
char name[40];
char sockErrBuf[64];
epicsSocketConvertErrnoToString (
sockErrBuf, sizeof ( sockErrBuf ) );
ipAddrToDottedIP(&scratch.ia, name, sizeof(name));
errlogPrintf ( "CAS: Socket bind %s error was \"%s\"\n",
name, sockErrBuf );
epicsThreadSuspendSelf ();
}
ok = 0;
break;
}
}
if (ok) {
assert(scratch.ia.sin_port!=0);
*port = ntohs(scratch.ia.sin_port);
break;
} else {
for(i=0; i<ellCount(&casIntfAddrList); i++) {
/* cleanup any ports actually bound */
if(socks[i]!=INVALID_SOCKET) {
epicsSocketDestroy(socks[i]);
socks[i] = INVALID_SOCKET;
}
}
scratch.ia.sin_port=0; /* next iteration starts with a random port */
}
}
return socks;
}
static dbServer rsrv_server = {
ELLNODE_INIT,
"rsrv",
@@ -229,16 +265,14 @@ static dbServer rsrv_server = {
*/
int rsrv_init (void)
{
epicsThreadBooleanStatus tbs;
unsigned priorityOfConnectDaemon;
epicsThreadId tid;
long maxBytesAsALong;
long status;
unsigned short udp_port, beacon_port;
SOCKET *socks;
clientQlock = epicsMutexMustCreate();
ellInit ( &clientQ );
ellInit ( &clientQudp );
freeListInitPvt ( &rsrvClientFreeList, sizeof(struct client), 8 );
freeListInitPvt ( &rsrvChanFreeList, sizeof(struct channel_in_use), 512 );
freeListInitPvt ( &rsrvEventFreeList, sizeof(struct event_ext), 512 );
@@ -259,6 +293,16 @@ int rsrv_init (void)
ca_server_port = envGetInetPortConfigParam ( &EPICS_CA_SERVER_PORT,
(unsigned short) CA_SERVER_PORT );
}
udp_port = ca_server_port;
if (envGetConfigParamPtr(&EPICS_CAS_BEACON_PORT)) {
beacon_port = envGetInetPortConfigParam (&EPICS_CAS_BEACON_PORT,
(unsigned short) CA_REPEATER_PORT );
}
else {
beacon_port = envGetInetPortConfigParam (&EPICS_CA_REPEATER_PORT,
(unsigned short) CA_REPEATER_PORT );
}
status = envGetLongConfigParam ( &EPICS_CA_MAX_ARRAY_BYTES, &maxBytesAsALong );
if ( status || maxBytesAsALong < 0 ) {
@@ -301,35 +345,227 @@ int rsrv_init (void)
}
castcp_startStopEvent = epicsEventMustCreate(epicsEventEmpty);
casudp_startStopEvent = epicsEventMustCreate(epicsEventEmpty);
beacon_startStopEvent = epicsEventMustCreate(epicsEventEmpty);
castcp_ctl = ctlPause;
/*
* go down two levels so that we are below
* the TCP and event threads started on behalf
* of individual clients
/* Thread priorites
* Now starting per interface
* TCP Listener: epicsThreadPriorityCAServerLow-2
* Name receiver: epicsThreadPriorityCAServerLow-4
* Now starting global
* Beacon sender: epicsThreadPriorityCAServerLow-3
* Started later per TCP client
* TCP receiver: epicsThreadPriorityCAServerLow
* TCP sender : epicsThreadPriorityCAServerLow-1
*/
tbs = epicsThreadHighestPriorityLevelBelow (
epicsThreadPriorityCAServerLow, &priorityOfConnectDaemon );
if ( tbs == epicsThreadBooleanStatusSuccess ) {
tbs = epicsThreadHighestPriorityLevelBelow (
priorityOfConnectDaemon, &priorityOfConnectDaemon );
if ( tbs != epicsThreadBooleanStatusSuccess ) {
priorityOfConnectDaemon = epicsThreadPriorityCAServerLow;
{
unsigned i;
threadPrios[0] = epicsThreadPriorityCAServerLow;
for(i=1; i<NELEMENTS(threadPrios); i++)
{
if(epicsThreadBooleanStatusSuccess!=epicsThreadHighestPriorityLevelBelow(
threadPrios[i-1], &threadPrios[i]))
{
/* on failure use the lowest known */
threadPrios[i] = threadPrios[i-1];
}
}
}
else {
priorityOfConnectDaemon = epicsThreadPriorityCAServerLow;
{
unsigned short sport = ca_server_port;
socks = rsrv_grap_tcp(&sport);
if ( sport != ca_server_port ) {
ca_server_port = sport;
errlogPrintf ( "cas warning: Configured TCP port was unavailable.\n");
errlogPrintf ( "cas warning: Using dynamically assigned TCP port %hu,\n",
ca_server_port );
errlogPrintf ( "cas warning: but now two or more servers share the same UDP port.\n");
errlogPrintf ( "cas warning: Depending on your IP kernel this server may not be\n" );
errlogPrintf ( "cas warning: reachable with UDP unicast (a host's IP in EPICS_CA_ADDR_LIST)\n" );
}
}
tid = epicsThreadCreate ( "CAS-TCP",
priorityOfConnectDaemon,
epicsThreadGetStackSize(epicsThreadStackMedium),
req_server, 0);
if ( tid == 0 ) {
epicsPrintf ( "CAS: unable to start connection request thread\n" );
/* start servers (TCP and UDP(s) for each interface.
*/
{
ELLNODE *cur;
int i;
for (i=0, cur=ellFirst(&casIntfAddrList); cur; i++, cur=ellNext(cur))
{
char ifaceName[40];
rsrv_iface_config *conf;
conf = callocMustSucceed(1, sizeof(*conf), "rsrv_init");
conf->tcpAddr = ((osiSockAddrNode *)cur)->addr;
conf->tcpAddr.ia.sin_port = htons(ca_server_port);
conf->tcp = socks[i];
socks[i] = INVALID_SOCKET;
ipAddrToDottedIP (&conf->tcpAddr.ia, ifaceName, sizeof(ifaceName));
conf->udp = conf->udpbcast = conf->udpbeacon = INVALID_SOCKET;
/* create and bind UDP beacon socket */
conf->udpbeacon = epicsSocketCreate(AF_INET, SOCK_DGRAM, 0);
if(conf->udpbeacon==INVALID_SOCKET)
cantProceed("rsrv_init ran out of udp sockets for beacon at %s", ifaceName);
/* beacon sender binds to a random port, and won't actually receive anything */
conf->udpbeaconRx = conf->tcpAddr;
conf->udpbeaconRx.ia.sin_port = 0;
if(tryBind(conf->udpbeacon, &conf->udpbeaconRx, "UDP beacon socket"))
goto cleanup;
{
int intTrue = 1;
if (setsockopt (conf->udpbeacon, SOL_SOCKET, SO_BROADCAST,
(char *)&intTrue, sizeof(intTrue))<0) {
errlogPrintf ("CAS: online socket set up error\n");
epicsThreadSuspendSelf ();
}
/*
* this connect is to supress a warning message on Linux
* when we shutdown the read side of the socket. If it
* fails (and it will on old ip kernels) we just ignore
* the failure.
*/
osiSockAddr sockAddr;
sockAddr.ia.sin_family = AF_UNSPEC;
sockAddr.ia.sin_port = htons ( 0 );
sockAddr.ia.sin_addr.s_addr = htonl (0);
connect ( conf->udpbeacon, & sockAddr.sa, sizeof ( sockAddr.sa ) );
shutdown ( conf->udpbeacon, SHUT_RD );
}
/* find interface broadcast address */
{
ELLLIST bcastList = ELLLIST_INIT;
osiSockAddrNode *pNode;
osiSockDiscoverBroadcastAddresses (&bcastList,
conf->udpbeacon, &conf->udpbeaconRx); // match addr
if(ellCount(&bcastList)==0) {
cantProceed("Can't find broadcast address of interface %s\n", ifaceName);
} else if(ellCount(&bcastList)>1 && conf->udpbeaconRx.ia.sin_addr.s_addr!=htonl(INADDR_ANY)) {
printf("Interface %s has more than one broadcast address?\n", ifaceName);
}
pNode = (osiSockAddrNode*)ellFirst(&bcastList);
/* beacons are sent to a well known port w/ the iface bcast addr */
conf->udpbeaconTx = conf->udpbeaconRx;
conf->udpbeaconTx.ia.sin_addr = pNode->addr.ia.sin_addr;
conf->udpbeaconTx.ia.sin_port = htons(beacon_port);
if(connect(conf->udpbeacon, &conf->udpbeaconTx.sa, sizeof(conf->udpbeaconTx))!=0)
{
char sockErrBuf[64], buf[40];
epicsSocketConvertErrnoToString (
sockErrBuf, sizeof ( sockErrBuf ) );
ipAddrToDottedIP (&pNode->addr.ia, buf, sizeof(buf));
cantProceed( "%s: CA beacon routing (connect to \"%s\") error was \"%s\"\n",
__FILE__, buf, sockErrBuf);
}
/* TODO: free bcastList */
}
/* create and bind UDP name receiver socket(s) */
conf->udp = epicsSocketCreate(AF_INET, SOCK_DGRAM, 0);
if(conf->udp==INVALID_SOCKET)
cantProceed("rsrv_init ran out of udp sockets");
conf->udpAddr = conf->tcpAddr;
conf->udpAddr.ia.sin_port = htons(udp_port);
epicsSocketEnableAddressUseForDatagramFanout ( conf->udp );
if(tryBind(conf->udp, &conf->udpAddr, "UDP unicast socket"))
goto cleanup;
#if !defined(_WIN32)
/* An oddness of BSD sockets (not winsock) is that binding to
* INADDR_ANY will receive unicast and broadcast, but binding to
* a specific interface address receives only unicast. The trick
* is to bind a second socket to the interface broadcast address,
* which will then receive only broadcasts.
*/
if(conf->udpAddr.ia.sin_addr.s_addr!=htonl(INADDR_ANY)) {
conf->udpbcast = epicsSocketCreate(AF_INET, SOCK_DGRAM, 0);
if(conf->udpbcast==INVALID_SOCKET)
cantProceed("rsrv_init ran out of udp sockets for bcast");
conf->udpbcastAddr = conf->udpAddr;
conf->udpbcastAddr.ia.sin_addr = conf->udpbeaconTx.ia.sin_addr;
epicsSocketEnableAddressUseForDatagramFanout ( conf->udpbcast );
if(tryBind(conf->udpbcast, &conf->udpbcastAddr, "UDP Socket bcast"))
goto cleanup;
}
ellAdd(&servers, &conf->node);
#endif /* !defined(_WIN32) */
/* have all sockets, time to start some threads */
epicsThreadMustCreate("CAS-TCP", threadPrios[2],
epicsThreadGetStackSize(epicsThreadStackMedium),
&req_server, conf);
epicsEventMustWait(castcp_startStopEvent);
epicsThreadMustCreate("CAS-UDP", threadPrios[4],
epicsThreadGetStackSize(epicsThreadStackMedium),
&cast_server, conf);
epicsEventMustWait(casudp_startStopEvent);
#if !defined(_WIN32)
if(conf->udpbcast != INVALID_SOCKET) {
conf->startbcast = 1;
epicsThreadMustCreate("CAS-UDP2", threadPrios[4],
epicsThreadGetStackSize(epicsThreadStackMedium),
&cast_server, conf);
epicsEventMustWait(casudp_startStopEvent);
conf->startbcast = 0;
}
#endif /* !defined(_WIN32) */
continue;
cleanup:
epicsSocketDestroy(conf->tcp);
if(conf->udp!=INVALID_SOCKET) epicsSocketDestroy(conf->udp);
if(conf->udpbcast!=INVALID_SOCKET) epicsSocketDestroy(conf->udpbcast);
if(conf->udpbeacon!=INVALID_SOCKET) epicsSocketDestroy(conf->udpbeacon);
free(conf);
}
}
epicsEventMustWait(castcp_startStopEvent);
/* servers list is considered read-only from this point */
epicsThreadMustCreate("CAS-beacon", threadPrios[3],
epicsThreadGetStackSize(epicsThreadStackSmall),
&rsrv_online_notify_task, NULL);
epicsEventMustWait(beacon_startStopEvent);
return RSRV_OK;
}
@@ -498,30 +734,32 @@ void casr (unsigned level)
}
if (level>=2) {
client = (struct client *) ellNext ( &clientQudp.node );
rsrv_iface_config *client = (rsrv_iface_config *) ellFirst ( &servers );
while (client) {
struct sockaddr_in addr;
osiSocklen_t alen = sizeof(addr);
char buf[40];
if (!getsockname(client->udpRecv, (struct sockaddr*)&addr, &alen)) {
ipAddrToDottedIP (&addr, buf, sizeof(buf));
} else {
strcpy(buf, "<unknown>");
printf("Server interface\n");
ipAddrToDottedIP (&client->tcpAddr.ia, buf, sizeof(buf));
printf(" TCP listener %s\n", buf);
ipAddrToDottedIP (&client->udpAddr.ia, buf, sizeof(buf));
printf(" UDP receiver 1 %s\n", buf);
#if !defined(_WIN32)
if(client->udpbcast!=INVALID_SOCKET) {
ipAddrToDottedIP (&client->udpbcastAddr.ia, buf, sizeof(buf));
printf(" UDP receiver 2 %s\n", buf);
}
#endif
printf( "UDP Name Server: recvfrom %s", buf );
ipAddrToDottedIP (&client->udpbeaconRx.ia, buf, sizeof(buf));
printf(" UDP beacon socket bound %s\n", buf);
alen = sizeof(addr);
if (!getsockname(client->sock, (struct sockaddr*)&addr, &alen)) {
ipAddrToDottedIP (&addr, buf, sizeof(buf));
} else {
strcpy(buf, "<unknown>");
}
ipAddrToDottedIP (&client->udpbeaconTx.ia, buf, sizeof(buf));
printf(" UDP beacon destination %s\n", buf);
printf( " sendto %s\n", buf );
client = (struct client *) ellNext(&client->node);
client = (rsrv_iface_config *) ellNext(&client->node);
}
}
UNLOCK_CLIENTQ
+8 -64
View File
@@ -115,73 +115,23 @@ static void clean_addrq(struct client *client)
*/
void cast_server(void *pParm)
{
cast_config *conf = pParm;
osiSockAddr *paddrNode = &conf->pAddr;
struct sockaddr_in sin;
rsrv_iface_config *conf = pParm;
int status;
int count=0;
int mysocket=0;
struct sockaddr_in new_recv_addr;
osiSocklen_t recv_addr_size;
osiSockIoctl_t nchars;
SOCKET recv_sock;
SOCKET recv_sock, reply_sock;
struct client *client;
recv_addr_size = sizeof(new_recv_addr);
/*
* Open the socket.
* Use ARPA Internet address format and datagram socket.
*/
if ( ( recv_sock = epicsSocketCreate (AF_INET, SOCK_DGRAM, 0) ) == INVALID_SOCKET ) {
epicsPrintf ("CAS: cast socket creation error\n");
epicsThreadSuspendSelf ();
}
if(conf->reply_sock==INVALID_SOCKET) {
conf->reply_sock = recv_sock; /* assume that the socket capable of unicast send is created first */
mysocket = 1;
}
/*
* some concern that vxWorks will run out of mBuf's
* if this change is made
*
* joh 11-10-98
*/
#if 0
{
/*
*
* this allows for faster connects by queuing
* additional incomming UDP search frames
*
* this allocates a 32k buffer
* (uses a power of two)
*/
int size = 1u<<15u;
status = setsockopt (IOC_cast_sock, SOL_SOCKET,
SO_RCVBUF, (char *)&size, sizeof(size));
if (status<0) {
epicsPrintf ("CAS: unable to set cast socket size\n");
}
}
#endif
epicsSocketEnableAddressUseForDatagramFanout ( recv_sock );
memcpy(&sin, &paddrNode->ia, sizeof (sin));
/* get server's Internet address */
if( bind(recv_sock, (struct sockaddr *)&sin, sizeof (sin)) < 0){
char sockErrBuf[64];
epicsSocketConvertErrnoToString (
sockErrBuf, sizeof ( sockErrBuf ) );
epicsPrintf ("CAS: UDP server port bind error was \"%s\"\n", sockErrBuf );
epicsSocketDestroy ( recv_sock );
epicsThreadSuspendSelf ();
}
reply_sock = conf->udp;
if(conf->startbcast)
recv_sock = conf->udpbcast;
else
recv_sock = conf->udp;
/*
* setup new client structure but reuse old structure if
@@ -189,7 +139,7 @@ void cast_server(void *pParm)
*
*/
while ( TRUE ) {
client = create_client ( conf->reply_sock, IPPROTO_UDP );
client = create_client ( reply_sock, IPPROTO_UDP );
if ( client ) {
break;
}
@@ -197,11 +147,6 @@ void cast_server(void *pParm)
}
client->udpRecv = recv_sock;
assert(client->node.next==NULL && client->node.previous==NULL);
LOCK_CLIENTQ;
ellAdd ( &clientQudp, &client->node );
UNLOCK_CLIENTQ;
casAttachThreadToClient ( client );
/*
@@ -211,7 +156,6 @@ void cast_server(void *pParm)
/* these pointers become invalid after signaling casudp_startStopEvent */
conf = NULL;
paddrNode = NULL;
epicsEventSignal(casudp_startStopEvent);
+11 -240
View File
@@ -35,46 +35,19 @@
#define epicsExportSharedSymbols
#include "server.h"
/*
* forcePort ()
*/
static void forcePort (ELLLIST *pList, unsigned short port)
{
osiSockAddrNode *pNode;
pNode = (osiSockAddrNode *) ellFirst ( pList );
while ( pNode ) {
if ( pNode->addr.sa.sa_family == AF_INET ) {
pNode->addr.ia.sin_port = htons ( port );
}
pNode = (osiSockAddrNode *) ellNext ( &pNode->node );
}
}
/*
* RSRV_ONLINE_NOTIFY_TASK
*/
void rsrv_online_notify_task(void *pParm)
{
unsigned priorityOfSelf = epicsThreadGetPrioritySelf ();
osiSockAddrNode *pNode;
double delay;
double maxdelay;
long longStatus;
double maxPeriod;
caHdr msg;
int status;
SOCKET sock;
int intTrue = TRUE;
unsigned short port;
ca_uint32_t beaconCounter = 0;
char * pStr;
int autoBeaconAddr;
ELLLIST autoAddrList;
ELLNODE *cur;
char buf[16];
unsigned priorityOfUDP;
epicsThreadBooleanStatus tbs;
taskwdInsert (epicsThreadGetIdSelf(),NULL,NULL);
@@ -94,235 +67,33 @@ void rsrv_online_notify_task(void *pParm)
delay = 0.02; /* initial beacon period in sec */
maxdelay = maxPeriod;
/*
* Open the socket.
* Use ARPA Internet address format and datagram socket.
* Format described in <sys/socket.h>.
*/
if ( (sock = epicsSocketCreate (AF_INET, SOCK_DGRAM, 0)) == INVALID_SOCKET) {
errlogPrintf ("CAS: online socket creation error\n");
epicsThreadSuspendSelf ();
}
status = setsockopt (sock, SOL_SOCKET, SO_BROADCAST,
(char *)&intTrue, sizeof(intTrue));
if (status<0) {
errlogPrintf ("CAS: online socket set up error\n");
epicsThreadSuspendSelf ();
}
{
/*
* this connect is to supress a warning message on Linux
* when we shutdown the read side of the socket. If it
* fails (and it will on old ip kernels) we just ignore
* the failure.
*/
osiSockAddr sockAddr;
sockAddr.ia.sin_family = AF_UNSPEC;
sockAddr.ia.sin_port = htons ( 0 );
sockAddr.ia.sin_addr.s_addr = htonl (0);
connect ( sock, & sockAddr.sa, sizeof ( sockAddr.sa ) );
shutdown ( sock, SHUT_RD );
}
memset((char *)&msg, 0, sizeof msg);
msg.m_cmmd = htons (CA_PROTO_RSRV_IS_UP);
msg.m_count = htons (ca_server_port);
msg.m_dataType = htons (CA_MINOR_PROTOCOL_REVISION);
ellInit ( & beaconAddrList );
ellInit ( & autoAddrList );
pStr = envGetConfigParam(&EPICS_CAS_AUTO_BEACON_ADDR_LIST, sizeof(buf), buf);
if ( ! pStr ) {
pStr = envGetConfigParam(&EPICS_CA_AUTO_ADDR_LIST, sizeof(buf), buf);
}
if (pStr) {
if (strstr(pStr,"no")||strstr(pStr,"NO")) {
autoBeaconAddr = FALSE;
}
else if (strstr(pStr,"yes")||strstr(pStr,"YES")) {
autoBeaconAddr = TRUE;
}
else {
fprintf(stderr,
"CAS: EPICS_CA(S)_AUTO_ADDR_LIST = \"%s\"? Assuming \"YES\"\n", pStr);
autoBeaconAddr = TRUE;
}
}
else {
autoBeaconAddr = TRUE;
}
/*
* load user and auto configured
* broadcast address list
*/
if (envGetConfigParamPtr(&EPICS_CAS_BEACON_PORT)) {
port = envGetInetPortConfigParam (&EPICS_CAS_BEACON_PORT,
(unsigned short) CA_REPEATER_PORT );
}
else {
port = envGetInetPortConfigParam (&EPICS_CA_REPEATER_PORT,
(unsigned short) CA_REPEATER_PORT );
}
/*
* discover beacon addresses associated with this interface
*/
if ( autoBeaconAddr ) {
osiSockAddr addr;
ELLLIST tmpList;
ellInit ( &tmpList );
addr.ia.sin_family = AF_UNSPEC;
osiSockDiscoverBroadcastAddresses (&tmpList, sock, &addr);
forcePort ( &tmpList, port );
removeDuplicateAddresses ( &autoAddrList, &tmpList, 1 );
}
/*
* by default use EPICS_CA_ADDR_LIST for the
* beacon address list
*/
{
const ENV_PARAM *pParam;
if (envGetConfigParamPtr(&EPICS_CAS_INTF_ADDR_LIST) ||
envGetConfigParamPtr(&EPICS_CAS_BEACON_ADDR_LIST)) {
pParam = &EPICS_CAS_BEACON_ADDR_LIST;
}
else {
pParam = &EPICS_CA_ADDR_LIST;
}
/*
* add in the configured addresses
*/
addAddrToChannelAccessAddressList (
&autoAddrList, pParam, port, pParam == &EPICS_CA_ADDR_LIST );
}
removeDuplicateAddresses ( &beaconAddrList, &autoAddrList, 0 );
if ( ellCount ( &beaconAddrList ) == 0 ) {
errlogPrintf ("The CA server's beacon address list was empty after initialization?\n");
}
# ifdef DEBUG
printChannelAccessAddressList (&beaconAddrList);
# endif
tbs = epicsThreadHighestPriorityLevelBelow ( priorityOfSelf, &priorityOfUDP );
if ( tbs != epicsThreadBooleanStatusSuccess ) {
priorityOfUDP = priorityOfSelf;
}
casudp_startStopEvent = epicsEventMustCreate(epicsEventEmpty);
casudp_ctl = ctlPause;
for (cur=ellFirst(&casIntfAddrList); cur; cur=ellNext(cur))
{
/* casudp_startStopEvent ensures that this struct
* lives until the cast_server thread(s) are done with it.
*/
cast_config config;
config.pAddr = ((osiSockAddrNode *)cur)->addr;
config.reply_sock = INVALID_SOCKET;
epicsThreadMustCreate ( "CAS-UDP", priorityOfUDP,
epicsThreadGetStackSize (epicsThreadStackMedium),
cast_server, &config );
epicsEventMustWait(casudp_startStopEvent);
#if !defined(_WIN32)
/* An oddness of BSD sockets (not winsock) is that binding to
* INADDR_ANY will receive unicast and broadcast, but binding to
* a specific interface address receives only unicast. The trick
* is to bind a second socket to the interface broadcast address,
* which will then receive only broadcasts.
*/
if (config.pAddr.ia.sin_addr.s_addr != htonl(INADDR_ANY)) {
ELLLIST bcastList = ELLLIST_INIT;
osiSockDiscoverBroadcastAddresses (&bcastList,
sock, &config.pAddr); // match addr
if(ellCount(&bcastList)==0) {
errlogPrintf("CAS UDP: failed to find interface broadcast address\n");
} else {
osiSockAddrNode *pNode = (osiSockAddrNode*)ellFirst(&bcastList);
assert(config.pAddr.sa.sa_family==pNode->addr.sa.sa_family);
if (config.pAddr.ia.sin_addr.s_addr != pNode->addr.ia.sin_addr.s_addr) {
/* copy the address, keep the port */
config.pAddr.ia.sin_addr.s_addr = pNode->addr.ia.sin_addr.s_addr;
epicsThreadMustCreate ( "CAS-UDPB", priorityOfUDP,
epicsThreadGetStackSize (epicsThreadStackMedium),
cast_server, &config );
epicsEventMustWait(casudp_startStopEvent);
}
}
ellFree(&bcastList);
}
#endif
}
epicsEventSignal(beacon_startStopEvent);
while (TRUE) {
pNode = (osiSockAddrNode *) ellFirst (&beaconAddrList);
while (pNode) {
char buf[64];
ELLNODE *cur;
status = connect (sock, &pNode->addr.sa,
sizeof(pNode->addr.sa));
if (status<0) {
/* send beacon to each interface */
for(cur=ellFirst(&servers); cur; cur=ellNext(cur))
{
rsrv_iface_config *conf = CONTAINER(cur, rsrv_iface_config, node);
status = send (conf->udpbeacon, (char *)&msg, sizeof(msg), 0);
if (status < 0) {
char sockErrBuf[64];
epicsSocketConvertErrnoToString (
sockErrBuf, sizeof ( sockErrBuf ) );
ipAddrToDottedIP (&pNode->addr.ia, buf, sizeof(buf));
errlogPrintf ( "%s: CA beacon routing (connect to \"%s\") error was \"%s\"\n",
epicsSocketConvertErrnoToString ( sockErrBuf, sizeof ( sockErrBuf ) );
ipAddrToDottedIP (&conf->udpbeaconTx.ia, buf, sizeof(buf));
errlogPrintf ( "%s: CA beacon (send to \"%s\") error was \"%s\"\n",
__FILE__, buf, sockErrBuf);
}
else {
struct sockaddr_in if_addr;
osiSocklen_t size = sizeof (if_addr);
status = getsockname (sock, (struct sockaddr *) &if_addr, &size);
if (status<0) {
char sockErrBuf[64];
epicsSocketConvertErrnoToString ( sockErrBuf, sizeof ( sockErrBuf ) );
errlogPrintf ( "%s: CA beacon routing (getsockname) error was \"%s\"\n",
__FILE__, sockErrBuf);
}
else if (if_addr.sin_family==AF_INET) {
msg.m_available = if_addr.sin_addr.s_addr;
msg.m_cid = htonl ( beaconCounter );
status = send (sock, (char *)&msg, sizeof(msg), 0);
if (status < 0) {
char sockErrBuf[64];
epicsSocketConvertErrnoToString ( sockErrBuf, sizeof ( sockErrBuf ) );
ipAddrToDottedIP (&pNode->addr.ia, buf, sizeof(buf));
errlogPrintf ( "%s: CA beacon (send to \"%s\") error was \"%s\"\n",
__FILE__, buf, sockErrBuf);
}
else {
assert (status == sizeof(msg));
}
}
assert (status == sizeof(msg));
}
pNode = (osiSockAddrNode *) pNode->node.next;
}
epicsThreadSleep(delay);
+12 -3
View File
@@ -138,9 +138,16 @@ struct event_ext {
};
typedef struct {
osiSockAddr pAddr;
SOCKET reply_sock;
} cast_config;
ELLNODE node;
osiSockAddr tcpAddr, /* TCP listener endpoint */
udpAddr, /* UDP name unicast receiver endpoint */
udpbcastAddr, /* UDP name broadcast receiver endpoint */
udpbeaconRx, /* UDP beacon receiver endpoint ( doesn't actually receive ) */
udpbeaconTx; /* UDP beacon destination address */
SOCKET tcp, udp, udpbcast, udpbeacon;
unsigned int startbcast:1;
} rsrv_iface_config;
enum ctl {ctlInit, ctlRun, ctlPause, ctlExit};
@@ -167,6 +174,7 @@ GLBLTYPE int CASDEBUG;
GLBLTYPE unsigned short ca_server_port;
GLBLTYPE ELLLIST clientQ; /* (TCP clients) locked by clientQlock */
GLBLTYPE ELLLIST clientQudp; /* locked by clientQlock */
GLBLTYPE ELLLIST servers; /* rsrv_iface_config::node, read-only after rsrv_init() */
GLBLTYPE ELLLIST beaconAddrList;
GLBLTYPE ELLLIST casIntfAddrList;
GLBLTYPE epicsMutexId clientQlock;
@@ -187,6 +195,7 @@ GLBLTYPE volatile enum ctl casudp_ctl;
GLBLTYPE volatile enum ctl beacon_ctl;
GLBLTYPE volatile enum ctl castcp_ctl;
GLBLTYPE unsigned int threadPrios[5];
#define CAS_HASH_TABLE_SIZE 4096
+1
View File
@@ -41,6 +41,7 @@ typedef socklen_t osiSocklen_t;
#define SOCK_ECONNRESET ECONNRESET
#define SOCK_ETIMEDOUT ETIMEDOUT
#define SOCK_EADDRINUSE EADDRINUSE
#define SOCK_EADDRNOTAVAIL EADDRNOTAVAIL
#define SOCK_ECONNREFUSED ECONNREFUSED
#define SOCK_ECONNABORTED ECONNABORTED
#define SOCK_EINPROGRESS EINPROGRESS
+1
View File
@@ -43,6 +43,7 @@ typedef socklen_t osiSocklen_t;
#define SOCK_ECONNRESET ECONNRESET
#define SOCK_ETIMEDOUT ETIMEDOUT
#define SOCK_EADDRINUSE EADDRINUSE
#define SOCK_EADDRNOTAVAIL EADDRNOTAVAIL
#define SOCK_ECONNREFUSED ECONNREFUSED
#define SOCK_ECONNABORTED ECONNABORTED
#define SOCK_EINPROGRESS EINPROGRESS
+1
View File
@@ -51,6 +51,7 @@ typedef socklen_t osiSocklen_t;
#define SOCK_ECONNRESET ECONNRESET
#define SOCK_ETIMEDOUT ETIMEDOUT
#define SOCK_EADDRINUSE EADDRINUSE
#define SOCK_EADDRNOTAVAIL EADDRNOTAVAIL
#define SOCK_ECONNREFUSED ECONNREFUSED
#define SOCK_ECONNABORTED ECONNABORTED
#define SOCK_EINPROGRESS EINPROGRESS
+1
View File
@@ -49,6 +49,7 @@ typedef int osiSocklen_t;
#define SOCK_ECONNRESET WSAECONNRESET
#define SOCK_ETIMEDOUT WSAETIMEDOUT
#define SOCK_EADDRINUSE WSAEADDRINUSE
#define SOCK_EADDRNOTAVAIL WSAEADDRNOTAVAIL
#define SOCK_ECONNREFUSED WSAECONNREFUSED
#define SOCK_ECONNABORTED WSAECONNABORTED
#define SOCK_EINPROGRESS WSAEINPROGRESS
+1
View File
@@ -50,6 +50,7 @@ typedef int osiSocklen_t;
#define SOCK_ECONNRESET ECONNRESET
#define SOCK_ETIMEDOUT ETIMEDOUT
#define SOCK_EADDRINUSE EADDRINUSE
#define SOCK_EADDRNOTAVAIL EADDRNOTAVAIL
#define SOCK_ECONNREFUSED ECONNREFUSED
#define SOCK_ECONNABORTED ECONNABORTED
#define SOCK_EINPROGRESS EINPROGRESS
+1
View File
@@ -44,6 +44,7 @@ typedef socklen_t osiSocklen_t;
#define SOCK_ECONNRESET ECONNRESET
#define SOCK_ETIMEDOUT ETIMEDOUT
#define SOCK_EADDRINUSE EADDRINUSE
#define SOCK_EADDRNOTAVAIL EADDRNOTAVAIL
#define SOCK_ECONNREFUSED ECONNREFUSED
#define SOCK_ECONNABORTED ECONNABORTED
#define SOCK_EINPROGRESS EINPROGRESS
+1
View File
@@ -42,6 +42,7 @@ typedef socklen_t osiSocklen_t;
#define SOCK_ECONNRESET ECONNRESET
#define SOCK_ETIMEDOUT ETIMEDOUT
#define SOCK_EADDRINUSE EADDRINUSE
#define SOCK_EADDRNOTAVAIL EADDRNOTAVAIL
#define SOCK_ECONNREFUSED ECONNREFUSED
#define SOCK_ECONNABORTED ECONNABORTED
#define SOCK_EINPROGRESS EINPROGRESS
+1
View File
@@ -52,6 +52,7 @@ typedef int osiSockIoctl_t;
#define SOCK_ECONNRESET ECONNRESET
#define SOCK_ETIMEDOUT ETIMEDOUT
#define SOCK_EADDRINUSE EADDRINUSE
#define SOCK_EADDRNOTAVAIL EADDRNOTAVAIL
#define SOCK_ECONNREFUSED ECONNREFUSED
#define SOCK_ECONNABORTED ECONNABORTED
#define SOCK_EINPROGRESS EINPROGRESS
+1
View File
@@ -73,6 +73,7 @@ typedef int osiSocklen_t;
#define SOCK_ECONNRESET ECONNRESET
#define SOCK_ETIMEDOUT ETIMEDOUT
#define SOCK_EADDRINUSE EADDRINUSE
#define SOCK_EADDRNOTAVAIL EADDRNOTAVAIL
#define SOCK_ECONNREFUSED ECONNREFUSED
#define SOCK_ECONNABORTED ECONNABORTED
#define SOCK_EINPROGRESS EINPROGRESS