eliminate the UDP send thread

This commit is contained in:
Jeff Hill
2000-06-16 23:35:02 +00:00
parent 5f4e31c600
commit 2e67127f25
6 changed files with 168 additions and 244 deletions

View File

@@ -534,15 +534,18 @@ void cac::beaconNotify ( const inetAddrID &addr )
int status;
status = getsockname ( this->pudpiiu->sock, (struct sockaddr *) &saddr,
&saddr_length);
assert ( status >= 0 );
&saddr_length );
if ( status < 0 ) {
epicsPrintf ( "CAC: getsockname () error was \"%s\"\n", SOCKERRSTR (SOCKERRNO) );
return;
}
port = ntohs ( saddr.sin_port );
}
{
ca_real delay;
delay = (port&CA_RECAST_PORT_MASK);
delay = ( port & CA_RECAST_PORT_MASK );
delay /= MSEC_PER_SEC;
delay += CA_RECAST_DELAY;

View File

@@ -319,9 +319,6 @@ private:
*/
#define CA_RETRY_PERIOD 5 /* int sec to next keepalive */
#define N_REPEATER_TRIES_PRIOR_TO_MSG 50
#define REPEATER_TRY_PERIOD (1.0)
/*
* this determines the number of messages received
* without a delay in between before we go into
@@ -398,6 +395,7 @@ private:
class repeaterSubscribeTimer : public osiTimer {
public:
repeaterSubscribeTimer (udpiiu &iiu, osiTimerQueue &queue);
void confirmNotify ();
private:
virtual void expire ();
@@ -408,6 +406,9 @@ private:
virtual const char *name () const;
udpiiu &iiu;
unsigned attempts;
bool registered;
bool once;
};
class udpiiu : public netiiu {
@@ -421,10 +422,13 @@ public:
void addToChanList (nciu *chan);
void removeFromChanList (nciu *chan);
void disconnect (nciu *chan);
int recvMsg ();
int post_msg (const struct sockaddr_in *pnet_addr,
char *pInBuf, unsigned long blockSize);
int pushStreamMsg ( const caHdr *pmsg, const void *pext, bool BlockingOk );
int pushDatagramMsg (const caHdr *pMsg, const void *pExt, ca_uint16_t extsize);
void repeaterRegistrationMessage ( unsigned attemptNumber );
void flush ();
osiTime recvTime;
char xmitBuf[MAX_UDP];
@@ -432,16 +436,11 @@ public:
searchTimer searchTmr;
repeaterSubscribeTimer repeaterSubscribeTmr;
semMutexId xmitBufLock;
semBinaryId xmitSignal;
ELLLIST dest;
SOCKET sock;
semBinaryId recvThreadExitSignal;
semBinaryId sendThreadExitSignal;
unsigned nBytesInXmitBuf;
unsigned repeaterTries;
unsigned short repeaterPort;
bool contactRepeater;
bool repeaterContacted;
bool shutdownCmd;
// exceptions

View File

@@ -23,17 +23,17 @@ struct putCvrtBuf {
/*
* nciu::nciu ()
*/
nciu::nciu (cac *pcac, cacChannel &chan, const char *pNameIn) :
cacChannelIO (chan)
nciu::nciu ( cac *pcac, cacChannel &chan, const char *pNameIn ) :
cacChannelIO ( chan )
{
static const caar defaultAccessRights = { false, false };
size_t strcnt;
strcnt = strlen (pNameIn) + 1;
if ( strcnt > MAX_UDP - sizeof (caHdr) ) {
throwWithLocation ( caErrorCode (ECA_STRTOBIG) );
strcnt = strlen ( pNameIn ) + 1;
if ( strcnt > MAX_UDP - sizeof ( caHdr ) ) {
throwWithLocation ( caErrorCode ( ECA_STRTOBIG ) );
}
this->pNameStr = reinterpret_cast <char *> ( malloc (strcnt) );
this->pNameStr = reinterpret_cast <char *> ( malloc ( strcnt ) );
if ( ! this->pNameStr ) {
this->f_fullyConstructed = false;
return;
@@ -42,8 +42,6 @@ nciu::nciu (cac *pcac, cacChannel &chan, const char *pNameIn) :
pcac->lock ();
pcac->installChannel (*this);
this->typeCode = USHRT_MAX; /* invalid initial type */
this->count = 0; /* invalid initial count */
this->sid = UINT_MAX; /* invalid initial server id */
@@ -52,7 +50,8 @@ nciu::nciu (cac *pcac, cacChannel &chan, const char *pNameIn) :
this->previousConn = 0;
this->f_connected = false;
pcac->pudpiiu->addToChanList (this);
pcac->installChannel ( *this );
pcac->pudpiiu->addToChanList ( this );
/*
* reset broadcasted search counters
@@ -61,7 +60,7 @@ nciu::nciu (cac *pcac, cacChannel &chan, const char *pNameIn) :
this->f_fullyConstructed = true;
chan.attachIO (*this);
chan.attachIO ( *this );
pcac->unlock ();
}

View File

@@ -14,15 +14,25 @@
#include "iocinf.h"
repeaterSubscribeTimer::repeaterSubscribeTimer (udpiiu &iiuIn, osiTimerQueue &queueIn) :
osiTimer (queueIn), iiu (iiuIn)
repeaterSubscribeTimer::repeaterSubscribeTimer ( udpiiu &iiuIn, osiTimerQueue &queueIn ) :
osiTimer ( 10.0, queueIn ), iiu ( iiuIn ), attempts ( 0 ), registered ( false ), once (false)
{
}
void repeaterSubscribeTimer::expire ()
{
this->iiu.contactRepeater = 1u;
semBinaryGive (this->iiu.xmitSignal);
static const unsigned nTriesToMsg = 50;
this->attempts++;
if ( this->attempts > nTriesToMsg && ! this->once ) {
ca_printf (
"Unable to contact CA repeater after %u tries\n", nTriesToMsg);
ca_printf (
"Silence this message by starting a CA repeater daemon\n");
this->once = true;
}
this->iiu.repeaterRegistrationMessage ( this->attempts );
}
void repeaterSubscribeTimer::destroy ()
@@ -31,17 +41,12 @@ void repeaterSubscribeTimer::destroy ()
bool repeaterSubscribeTimer::again () const
{
if (this->iiu.repeaterContacted) {
return false;
}
else {
return true;
}
return ( ! this->registered );
}
double repeaterSubscribeTimer::delay () const
{
return REPEATER_TRY_PERIOD;
return 1.0;
}
void repeaterSubscribeTimer::show (unsigned /* level */ ) const
@@ -53,3 +58,7 @@ const char *repeaterSubscribeTimer::name () const
return "repeaterSubscribeTimer";
}
void repeaterSubscribeTimer::confirmNotify ()
{
this->registered = true;
}

View File

@@ -41,18 +41,30 @@ searchTimer::searchTimer (udpiiu &iiuIn, osiTimerQueue &queueIn) :
//
void searchTimer::reset ( double delayToNextTry )
{
LOCK (this->iiu.pcas);
this->retry = 0;
this->period = CA_RECAST_DELAY;
UNLOCK (this->iiu.pcas);
bool reschedule;
if ( delayToNextTry < CA_RECAST_DELAY ) {
delayToNextTry = CA_RECAST_DELAY;
}
if ( this->timeRemaining () > delayToNextTry ) {
this->reschedule (delayToNextTry);
debugPrintf ( ("reschedualed search timer for completion in %f sec\n", delayToNextTry) );
LOCK (this->iiu.pcas);
this->retry = 0;
if ( this->period > delayToNextTry ) {
reschedule = true;
}
else {
reschedule = false;
}
this->period = CA_RECAST_DELAY;
UNLOCK (this->iiu.pcas);
if ( reschedule ) {
this->reschedule ( delayToNextTry );
debugPrintf ( ("rescheduled search timer for completion in %f sec\n", delayToNextTry) );
}
else {
this->activate ( delayToNextTry );
debugPrintf ( ("if inactive, search timer started to completion in %f sec\n", delayToNextTry) );
}
}
@@ -105,7 +117,7 @@ void searchTimer::notifySearchResponse (nciu *pChan)
UNLOCK (this->iiu.pcas);
if (pChan->retrySeqNo == this->retrySeqNo) {
if ( pChan->retrySeqNo == this->retrySeqNo ) {
this->reschedule (0.0);
}
}
@@ -115,8 +127,8 @@ void searchTimer::notifySearchResponse (nciu *pChan)
//
void searchTimer::expire ()
{
tsDLIterBD<nciu> chan(0);
tsDLIterBD<nciu> firstChan(0);
tsDLIterBD <nciu> chan(0);
tsDLIterBD <nciu> firstChan(0);
int status;
unsigned nSent=0u;
@@ -255,24 +267,24 @@ void searchTimer::expire ()
* list (if successful)
*/
status = chan->searchMsg ();
if (status != ECA_NORMAL) {
if ( status != ECA_NORMAL ) {
nSent++;
if (nSent>=this->framesPerTry) {
if ( nSent >= this->framesPerTry ) {
break;
}
/* flush out the search request buffer */
semBinaryGive (this->iiu.xmitSignal);
/* try again */
// flush out the search request buffer
this->iiu.flush ();
// try again
status = chan->searchMsg ();
if (status != ECA_NORMAL) {
break;
}
}
if (this->searchTries<ULONG_MAX) {
if ( this->searchTries < ULONG_MAX ) {
this->searchTries++;
}
@@ -282,7 +294,7 @@ void searchTimer::expire ()
/*
* dont send any of the channels twice within one try
*/
if (chan==firstChan) {
if ( chan == firstChan ) {
/*
* add one to nSent because there may be
* one more partial frame to be sent
@@ -304,9 +316,9 @@ void searchTimer::expire ()
UNLOCK (this->iiu.pcas);
/* flush out the search request buffer */
semBinaryGive (this->iiu.xmitSignal);
// flush out the search request buffer
this->iiu.flush ();
debugPrintf ( ("sent %u delay sec=%f\n", nSent, this->period) );
}

View File

@@ -18,16 +18,16 @@
typedef void (*pProtoStubUDP) (udpiiu *piiu, caHdr *pMsg, const struct sockaddr_in *pnet_addr);
/*
* cac_udp_recv_msg ()
*/
LOCAL int cac_udp_recv_msg (udpiiu *piiu)
//
// udpiiu::recvMsg ()
//
int udpiiu::recvMsg ()
{
osiSockAddr src;
int src_size = sizeof (src);
int status;
status = recvfrom ( piiu->sock, piiu->recvBuf, sizeof (piiu->recvBuf), 0,
status = recvfrom ( this->sock, this->recvBuf, sizeof ( this->recvBuf ), 0,
&src.sa, &src_size );
if (status < 0) {
int errnoCpy = SOCKERRNO;
@@ -42,7 +42,7 @@ LOCAL int cac_udp_recv_msg (udpiiu *piiu)
return -1;
}
if ( errnoCpy == SOCK_EINTR ) {
if ( piiu->shutdownCmd ) {
if ( this->shutdownCmd ) {
return -1;
}
else {
@@ -62,8 +62,8 @@ LOCAL int cac_udp_recv_msg (udpiiu *piiu)
"Unexpected UDP recv error %s\n", SOCKERRSTR(errnoCpy));
}
else if (status > 0) {
status = piiu->post_msg ( &src.ia,
piiu->recvBuf, (unsigned long) status );
status = this->post_msg ( &src.ia,
this->recvBuf, (unsigned long) status );
if ( status != ECA_NORMAL ) {
char buf[64];
@@ -89,44 +89,24 @@ extern "C" void cacRecvThreadUDP (void *pParam)
int status;
do {
status = cac_udp_recv_msg (piiu);
status = piiu->recvMsg ();
} while ( status == 0 );
semBinaryGive (piiu->recvThreadExitSignal);
}
/*
* NOTIFY_CA_REPEATER()
*
* tell the cast repeater that another client needs fan out
*
* NOTES:
* 1) local communication only (no LAN traffic)
* udpiiu::repeaterRegistrationMessage ()
*
* register with the repeater
*/
void notify_ca_repeater (udpiiu *piiu)
void udpiiu::repeaterRegistrationMessage ( unsigned attemptNumber )
{
caHdr msg;
osiSockAddr saddr;
int status;
static int once = FALSE;
int len;
if (piiu->repeaterContacted) {
return;
}
if (piiu->repeaterTries > N_REPEATER_TRIES_PRIOR_TO_MSG ) {
if (!once) {
ca_printf (
"Unable to contact CA repeater after %d tries\n",
N_REPEATER_TRIES_PRIOR_TO_MSG);
ca_printf (
"Silence this message by starting a CA repeater daemon\n");
once = TRUE;
}
}
/*
* In 3.13 beta 11 and before the CA repeater calls local_addr()
* to determine a local address and does not allow registration
@@ -145,9 +125,9 @@ void notify_ca_repeater (udpiiu *piiu)
* either the loopback address or the address returned
* by local address (the first non-loopback address found)
*/
if (piiu->repeaterTries&1) {
saddr = osiLocalAddr (piiu->sock);
if (saddr.sa.sa_family != AF_INET) {
if ( attemptNumber & 1 ) {
saddr = osiLocalAddr ( this->sock );
if ( saddr.sa.sa_family != AF_INET ) {
/*
* use the loop back address to communicate with the CA repeater
* if this os does not have interface query capabilities
@@ -155,18 +135,18 @@ void notify_ca_repeater (udpiiu *piiu)
* this will only work with 3.13 beta 12 CA repeaters or later
*/
saddr.ia.sin_family = AF_INET;
saddr.ia.sin_addr.s_addr = htonl (INADDR_LOOPBACK);
saddr.ia.sin_port = htons (piiu->repeaterPort);
saddr.ia.sin_addr.s_addr = htonl ( INADDR_LOOPBACK );
saddr.ia.sin_port = htons ( this->repeaterPort );
}
}
else {
saddr.ia.sin_family = AF_INET;
saddr.ia.sin_addr.s_addr = htonl (INADDR_LOOPBACK);
saddr.ia.sin_port = htons (piiu->repeaterPort);
saddr.ia.sin_addr.s_addr = htonl ( INADDR_LOOPBACK );
saddr.ia.sin_port = htons ( this->repeaterPort );
}
memset ((char *)&msg, 0, sizeof(msg));
msg.m_cmmd = htons (REPEATER_REGISTER);
memset ( (char *) &msg, 0, sizeof (msg) );
msg.m_cmmd = htons ( REPEATER_REGISTER );
msg.m_available = saddr.ia.sin_addr.s_addr;
/*
@@ -186,13 +166,13 @@ void notify_ca_repeater (udpiiu *piiu)
* therefore restarted the CA repeater - and therefore
* moved it to an EPICS release that accepts this protocol)
*/
# if defined (DOES_NOT_ACCEPT_ZERO_LENGTH_UDP)
# if defined ( DOES_NOT_ACCEPT_ZERO_LENGTH_UDP )
len = sizeof (msg);
# else
len = 0;
# endif
status = sendto ( piiu->sock, (char *) &msg, len,
status = sendto ( this->sock, (char *) &msg, len,
0, (struct sockaddr *)&saddr, sizeof (saddr) );
if ( status < 0 ) {
int errnoCpy = SOCKERRNO;
@@ -207,88 +187,6 @@ void notify_ca_repeater (udpiiu *piiu)
SOCKERRSTR(errnoCpy));
}
}
piiu->repeaterTries++;
piiu->contactRepeater = 0u;
}
/*
* cacSendThreadUDP ()
*/
extern "C" void cacSendThreadUDP (void *pParam)
{
udpiiu *piiu = (udpiiu *) pParam;
while ( ! piiu->shutdownCmd ) {
int status;
if ( piiu->contactRepeater ) {
notify_ca_repeater (piiu);
}
semBinaryMustTake ( piiu->xmitSignal );
semMutexMustTake (piiu->xmitBufLock);
if (piiu->nBytesInXmitBuf > 0) {
osiSockAddrNode *pNode;
pNode = (osiSockAddrNode *) ellFirst (&piiu->dest);
while ( pNode ) {
assert ( piiu->nBytesInXmitBuf <= INT_MAX );
status = sendto ( piiu->sock, piiu->xmitBuf,
(int) piiu->nBytesInXmitBuf, 0,
&pNode->addr.sa, sizeof (pNode->addr.sa) );
if ( status <= 0 ) {
int localErrno = SOCKERRNO;
if (status==0) {
break;
}
if (localErrno == SOCK_SHUTDOWN) {
break;
}
else if ( localErrno == SOCK_ENOTSOCK ) {
break;
}
else if ( localErrno == SOCK_EBADF ) {
break;
}
else if ( localErrno == SOCK_EINTR ) {
if ( piiu->shutdownCmd ) {
break;
}
else {
continue;
}
}
else {
char buf[64];
ipAddrToA (&pNode->addr.ia, buf, sizeof (buf));
ca_printf (
"CAC: error = \"%s\" sending UDP msg to %s\n",
SOCKERRSTR(localErrno), buf);
break;
}
}
pNode = (osiSockAddrNode *) ellNext (&pNode->node);
}
piiu->nBytesInXmitBuf = 0u;
if ( status <= 0 ) {
break;
}
}
semMutexGive ( piiu->xmitBufLock );
}
semBinaryGive ( piiu->sendThreadExitSignal) ;
}
/*
@@ -421,9 +319,6 @@ udpiiu::udpiiu ( cac *pcac ) :
}
this->nBytesInXmitBuf = 0u;
this->contactRepeater = 0u;
this->repeaterContacted = 0u;
this->repeaterTries = 0u;
this->xmitBufLock = semMutexCreate ();
if (!this->xmitBufLock) {
@@ -438,24 +333,6 @@ udpiiu::udpiiu ( cac *pcac ) :
throwWithLocation ( noMemory () );
}
this->sendThreadExitSignal = semBinaryCreate (semEmpty);
if ( ! this->sendThreadExitSignal ) {
semBinaryDestroy (this->recvThreadExitSignal);
semMutexDestroy (this->xmitBufLock);
socket_close (this->sock);
throwWithLocation ( noMemory () );
}
this->xmitSignal = semBinaryCreate (semEmpty);
if ( ! this->xmitSignal ) {
ca_printf ("CA: unable to create xmit signal\n");
semBinaryDestroy (this->recvThreadExitSignal);
semBinaryDestroy (this->sendThreadExitSignal);
semMutexDestroy (this->xmitBufLock);
socket_close (this->sock);
throwWithLocation ( noMemory () );
}
/*
* load user and auto configured
* broadcast address list
@@ -481,35 +358,6 @@ udpiiu::udpiiu ( cac *pcac ) :
threadGetStackSize (threadStackMedium), cacRecvThreadUDP, this);
if (tid==0) {
ca_printf ("CA: unable to create UDP receive thread\n");
::shutdown (this->sock, SD_BOTH);
semBinaryDestroy (this->xmitSignal);
semBinaryDestroy (this->recvThreadExitSignal);
semBinaryDestroy (this->sendThreadExitSignal);
semMutexDestroy (this->xmitBufLock);
socket_close (this->sock);
throwWithLocation ( noMemory () );
}
}
{
unsigned priorityOfSelf = threadGetPrioritySelf ();
unsigned priorityOfSend;
threadId tid;
threadBoolStatus tbs;
tbs = threadHighestPriorityLevelBelow (priorityOfSelf, &priorityOfSend);
if ( tbs != tbsSuccess ) {
priorityOfSend = priorityOfSelf;
}
tid = threadCreate ( "CAC-UDP-send", priorityOfSend,
threadGetStackSize (threadStackMedium), cacSendThreadUDP, this );
if (tid==0) {
ca_printf ("CA: unable to create UDP transmitt thread\n");
::shutdown (this->sock, SD_BOTH);
semMutexMustTake (this->recvThreadExitSignal);
semBinaryDestroy (this->xmitSignal);
semBinaryDestroy (this->sendThreadExitSignal);
semBinaryDestroy (this->recvThreadExitSignal);
semMutexDestroy (this->xmitBufLock);
socket_close (this->sock);
@@ -517,8 +365,8 @@ udpiiu::udpiiu ( cac *pcac ) :
}
}
if (pcac->ca_fd_register_func) {
(*pcac->ca_fd_register_func) (pcac->ca_fd_register_arg, this->sock, TRUE);
if ( pcac->ca_fd_register_func ) {
( *pcac->ca_fd_register_func ) ( pcac->ca_fd_register_arg, this->sock, TRUE );
}
if ( ! repeater_installed (this) ) {
@@ -552,8 +400,6 @@ udpiiu::udpiiu ( cac *pcac ) :
ca_printf ("CA: unable to start CA repeater daemon detached process\n");
}
}
this->repeaterSubscribeTmr.reschedule ();
}
/*
@@ -576,14 +422,11 @@ udpiiu::~udpiiu ()
}
UNLOCK (this->pcas);
// wait for send and recv threads to exit
// wait for recv threads to exit
semBinaryMustTake (this->recvThreadExitSignal);
semBinaryMustTake (this->sendThreadExitSignal);
semBinaryDestroy (this->xmitSignal);
semMutexDestroy (this->xmitBufLock);
semBinaryDestroy (this->recvThreadExitSignal);
semBinaryDestroy (this->sendThreadExitSignal);
ellFree (&this->dest);
if (this->pcas->ca_fd_register_func) {
@@ -593,7 +436,7 @@ udpiiu::~udpiiu ()
}
/*
* udpiiu::sutdown ()
* udpiiu::shutdown ()
*/
void udpiiu::shutdown ()
{
@@ -613,7 +456,6 @@ void udpiiu::shutdown ()
}
}
UNLOCK (this->pcas);
semBinaryGive (this->xmitSignal);
}
/*
@@ -823,11 +665,7 @@ LOCAL void beacon_action ( udpiiu * piiu,
LOCAL void repeater_ack_action (udpiiu * piiu,
caHdr * /* pMsg */, const struct sockaddr_in * /* pnet_addr */)
{
piiu->repeaterContacted = 1u;
# ifdef DEBUG
ca_printf ( "CAC: repeater confirmation recv\n");
# endif
return;
piiu->repeaterSubscribeTmr.confirmNotify ();
}
/*
@@ -1055,7 +893,8 @@ int udpiiu::pushDatagramMsg (const caHdr *pMsg, const void *pExt, ca_uint16_t ex
}
semMutexMustTake (this->xmitBufLock);
if ( msgsize + this->nBytesInXmitBuf > sizeof (this->xmitBuf) ) {
if ( msgsize + this->nBytesInXmitBuf > sizeof ( this->xmitBuf ) ) {
semMutexGive (this->xmitBufLock);
return ECA_TOLARGE;
}
@@ -1069,11 +908,74 @@ int udpiiu::pushDatagramMsg (const caHdr *pMsg, const void *pExt, ca_uint16_t ex
}
pbufmsg->m_postsize = htons (allignedExtSize);
this->nBytesInXmitBuf += msgsize;
semMutexGive (this->xmitBufLock);
return ECA_NORMAL;
}
//
// udpiiu::flush ()
//
void udpiiu::flush ()
{
osiSockAddrNode *pNode;
semMutexMustTake (this->xmitBufLock);
pNode = (osiSockAddrNode *) ellFirst ( &this->dest );
while ( pNode ) {
int status;
assert ( this->nBytesInXmitBuf <= INT_MAX );
status = sendto ( this->sock, this->xmitBuf,
(int) this->nBytesInXmitBuf, 0,
&pNode->addr.sa, sizeof ( pNode->addr.sa ) );
if ( status != (int) this->nBytesInXmitBuf ) {
if ( status >= 0 ) {
ca_printf ( "CAC: UDP sendto () call returned strange xmit count?\n" );
break;
}
else {
int localErrno = SOCKERRNO;
if ( localErrno == SOCK_EINTR ) {
if ( this->shutdownCmd ) {
break;
}
else {
continue;
}
}
else if ( localErrno == SOCK_SHUTDOWN ) {
break;
}
else if ( localErrno == SOCK_ENOTSOCK ) {
break;
}
else if ( localErrno == SOCK_EBADF ) {
break;
}
else {
char buf[64];
ipAddrToA ( &pNode->addr.ia, buf, sizeof ( buf ) );
ca_printf (
"CAC: error = \"%s\" sending UDP msg to %s\n",
SOCKERRSTR ( localErrno ), buf);
break;
}
}
}
pNode = (osiSockAddrNode *) ellNext ( &pNode->node );
}
this->nBytesInXmitBuf = 0u;
semMutexGive ( this->xmitBufLock );
}
int udpiiu::pushStreamMsg ( const caHdr * /* pmsg */,
const void * /* pext */, bool /* blockingOk */ )
{