o upgraded udp to use version and sequence numbers

This commit is contained in:
Jeff Hill
2002-06-25 22:18:11 +00:00
parent 729144f7c5
commit 7f8de741f0
4 changed files with 54 additions and 11 deletions
+3 -3
View File
@@ -113,7 +113,7 @@ void casClient::loadProtoJumpTable()
// (use of & here is more portable)
//
casClient::msgHandlers[CA_PROTO_VERSION] =
&casClient::noopAction;
&casClient::versionAction;
casClient::msgHandlers[CA_PROTO_EVENT_ADD] =
&casClient::eventAddAction;
casClient::msgHandlers[CA_PROTO_EVENT_CANCEL] =
@@ -388,9 +388,9 @@ caStatus casClient::echoAction ()
}
/*
* casClient::noopAction()
* casClient::versionAction()
*/
caStatus casClient::noopAction ()
caStatus casClient::versionAction ()
{
return S_cas_success;
}
+46 -6
View File
@@ -45,8 +45,9 @@
//
// casDGClient::casDGClient()
//
casDGClient::casDGClient (caServerI &serverIn) :
casClient (serverIn, MAX_UDP_RECV+sizeof(cadg))
casDGClient::casDGClient ( caServerI & serverIn ) :
casClient ( serverIn, MAX_UDP_RECV + sizeof ( cadg ) ),
seqNoOfReq ( 0 )
{
}
@@ -331,6 +332,25 @@ caStatus casDGClient::searchFailResponse ( const caHdrLargeArray * mp )
return S_cas_success;
}
/*
* casDGClient::versionAction()
*/
caStatus casDGClient::versionAction ()
{
const caHdrLargeArray * mp = this->ctx.getMsg();
if ( mp->m_count != 0 ) {
this->minor_version_number = mp->m_count;
if ( CA_V411 ( mp->m_count ) ) {
this->seqNoOfReq = mp->m_cid;
}
else {
this->seqNoOfReq = 0;
}
}
return S_cas_success;
}
//
// casDGClient::sendBeacon()
// (implemented here because this has knowledge of the protocol)
@@ -379,10 +399,23 @@ outBufClient::flushCondition casDGClient::xSend ( char *pBufIn, // X aCC 361
assert ( totalBytes+pHdr->cadg_nBytes <= nBytesAvailableToSend );
if ( pHdr->cadg_addr.isValid() ) {
stat = this->osdSend ( reinterpret_cast<char *>(pHdr+1),
pHdr->cadg_nBytes-sizeof(*pHdr), pHdr->cadg_addr );
char * pDG = reinterpret_cast < char * > ( pHdr + 1 );
unsigned sizeDG = pHdr->cadg_nBytes - sizeof ( *pHdr );
caHdr * pMsg = reinterpret_cast < caHdr * > ( pDG );
assert ( ntohs ( pMsg->m_cmmd ) == CA_PROTO_VERSION );
if ( CA_V411 ( this->minor_version_number ) ) {
pMsg->m_cid = htonl ( this->seqNoOfReq );
pMsg->m_dataType = htons ( sequenceNoIsValid );
}
else {
pDG += sizeof (caHdr);
sizeDG -= sizeof (caHdr);
}
stat = this->osdSend ( pDG, sizeDG, pHdr->cadg_addr );
if ( stat != outBufClient::flushProgress ) {
break;
break;
}
}
@@ -510,6 +543,9 @@ caStatus casDGClient::processDG ()
status = S_cas_sendBlocked;
break;
}
// insert version header at the start of the reply message
this->sendVersion ();
cadg *pRespHdr = reinterpret_cast <cadg *> (pRaw);
@@ -528,6 +564,8 @@ caStatus casDGClient::processDG ()
}
this->lastRecvAddr = pReqHdr->cadg_addr;
this->seqNoOfReq = 0;
this->minor_version_number = 0;
status = this->processMsg ();
dgInBytesConsumed = this->in.popCtx ( inctx );
@@ -542,7 +580,9 @@ caStatus casDGClient::processDG ()
// release the send lock
//
pRespHdr->cadg_nBytes = this->out.popCtx ( outctx ) + sizeof ( *pRespHdr );
if ( pRespHdr->cadg_nBytes > sizeof ( *pRespHdr ) ) {
// if there are not additional messages passed the version header
// then discard the message
if ( pRespHdr->cadg_nBytes > sizeof ( *pRespHdr ) + sizeof (caHdr) ) {
pRespHdr->cadg_addr = pReqHdr->cadg_addr;
this->out.commitRawMsg ( pRespHdr->cadg_nBytes );
}
+1 -1
View File
@@ -34,7 +34,7 @@
//
#include "tsDLList.h"
#include "resourceLib.h"
#define CA_MINOR_PROTOCOL_REVISION 10
#define CA_MINOR_PROTOCOL_REVISION 11
#include "caProto.h"
#include "smartGDDPointer.h"
+4 -1
View File
@@ -553,7 +553,7 @@ private:
//
virtual caStatus uknownMessageAction () = 0;
caStatus ignoreMsgAction ();
caStatus noopAction ();
virtual caStatus versionAction ();
virtual caStatus eventAddAction ();
virtual caStatus eventCancelAction ();
virtual caStatus readAction ();
@@ -756,6 +756,7 @@ protected:
private:
caNetAddr lastRecvAddr;
ca_uint32_t seqNoOfReq;
//
// one function for each CA request type
@@ -786,6 +787,8 @@ private:
virtual inBufClient::fillCondition osdRecv ( char *pBuf, bufSizeT nBytesReq,
fillParameter parm, bufSizeT &nBytesActual, caNetAddr & addr ) = 0;
caStatus versionAction ();
//
// cadg
//