diff --git a/src/cas/generic/casClient.cc b/src/cas/generic/casClient.cc index 8c91702b2..ea9266e31 100644 --- a/src/cas/generic/casClient.cc +++ b/src/cas/generic/casClient.cc @@ -113,7 +113,7 @@ void casClient::loadProtoJumpTable() // (use of & here is more portable) // casClient::msgHandlers[CA_PROTO_VERSION] = - &casClient::noopAction; + &casClient::versionAction; casClient::msgHandlers[CA_PROTO_EVENT_ADD] = &casClient::eventAddAction; casClient::msgHandlers[CA_PROTO_EVENT_CANCEL] = @@ -388,9 +388,9 @@ caStatus casClient::echoAction () } /* - * casClient::noopAction() + * casClient::versionAction() */ -caStatus casClient::noopAction () +caStatus casClient::versionAction () { return S_cas_success; } diff --git a/src/cas/generic/casDGClient.cc b/src/cas/generic/casDGClient.cc index 1432712b6..d4caf37e6 100644 --- a/src/cas/generic/casDGClient.cc +++ b/src/cas/generic/casDGClient.cc @@ -45,8 +45,9 @@ // // casDGClient::casDGClient() // -casDGClient::casDGClient (caServerI &serverIn) : - casClient (serverIn, MAX_UDP_RECV+sizeof(cadg)) +casDGClient::casDGClient ( caServerI & serverIn ) : + casClient ( serverIn, MAX_UDP_RECV + sizeof ( cadg ) ), + seqNoOfReq ( 0 ) { } @@ -331,6 +332,25 @@ caStatus casDGClient::searchFailResponse ( const caHdrLargeArray * mp ) return S_cas_success; } +/* + * casDGClient::versionAction() + */ +caStatus casDGClient::versionAction () +{ + const caHdrLargeArray * mp = this->ctx.getMsg(); + + if ( mp->m_count != 0 ) { + this->minor_version_number = mp->m_count; + if ( CA_V411 ( mp->m_count ) ) { + this->seqNoOfReq = mp->m_cid; + } + else { + this->seqNoOfReq = 0; + } + } + return S_cas_success; +} + // // casDGClient::sendBeacon() // (implemented here because this has knowledge of the protocol) @@ -379,10 +399,23 @@ outBufClient::flushCondition casDGClient::xSend ( char *pBufIn, // X aCC 361 assert ( totalBytes+pHdr->cadg_nBytes <= nBytesAvailableToSend ); if ( pHdr->cadg_addr.isValid() ) { - stat = this->osdSend ( reinterpret_cast(pHdr+1), - pHdr->cadg_nBytes-sizeof(*pHdr), pHdr->cadg_addr ); + + char * pDG = reinterpret_cast < char * > ( pHdr + 1 ); + unsigned sizeDG = pHdr->cadg_nBytes - sizeof ( *pHdr ); + caHdr * pMsg = reinterpret_cast < caHdr * > ( pDG ); + assert ( ntohs ( pMsg->m_cmmd ) == CA_PROTO_VERSION ); + if ( CA_V411 ( this->minor_version_number ) ) { + pMsg->m_cid = htonl ( this->seqNoOfReq ); + pMsg->m_dataType = htons ( sequenceNoIsValid ); + } + else { + pDG += sizeof (caHdr); + sizeDG -= sizeof (caHdr); + } + + stat = this->osdSend ( pDG, sizeDG, pHdr->cadg_addr ); if ( stat != outBufClient::flushProgress ) { - break; + break; } } @@ -510,6 +543,9 @@ caStatus casDGClient::processDG () status = S_cas_sendBlocked; break; } + + // insert version header at the start of the reply message + this->sendVersion (); cadg *pRespHdr = reinterpret_cast (pRaw); @@ -528,6 +564,8 @@ caStatus casDGClient::processDG () } this->lastRecvAddr = pReqHdr->cadg_addr; + this->seqNoOfReq = 0; + this->minor_version_number = 0; status = this->processMsg (); dgInBytesConsumed = this->in.popCtx ( inctx ); @@ -542,7 +580,9 @@ caStatus casDGClient::processDG () // release the send lock // pRespHdr->cadg_nBytes = this->out.popCtx ( outctx ) + sizeof ( *pRespHdr ); - if ( pRespHdr->cadg_nBytes > sizeof ( *pRespHdr ) ) { + // if there are not additional messages passed the version header + // then discard the message + if ( pRespHdr->cadg_nBytes > sizeof ( *pRespHdr ) + sizeof (caHdr) ) { pRespHdr->cadg_addr = pReqHdr->cadg_addr; this->out.commitRawMsg ( pRespHdr->cadg_nBytes ); } diff --git a/src/cas/generic/casInternal.h b/src/cas/generic/casInternal.h index bc9450864..2cfe346ec 100644 --- a/src/cas/generic/casInternal.h +++ b/src/cas/generic/casInternal.h @@ -34,7 +34,7 @@ // #include "tsDLList.h" #include "resourceLib.h" -#define CA_MINOR_PROTOCOL_REVISION 10 +#define CA_MINOR_PROTOCOL_REVISION 11 #include "caProto.h" #include "smartGDDPointer.h" diff --git a/src/cas/generic/server.h b/src/cas/generic/server.h index 2d989a8d8..d2b67974c 100644 --- a/src/cas/generic/server.h +++ b/src/cas/generic/server.h @@ -553,7 +553,7 @@ private: // virtual caStatus uknownMessageAction () = 0; caStatus ignoreMsgAction (); - caStatus noopAction (); + virtual caStatus versionAction (); virtual caStatus eventAddAction (); virtual caStatus eventCancelAction (); virtual caStatus readAction (); @@ -756,6 +756,7 @@ protected: private: caNetAddr lastRecvAddr; + ca_uint32_t seqNoOfReq; // // one function for each CA request type @@ -786,6 +787,8 @@ private: virtual inBufClient::fillCondition osdRecv ( char *pBuf, bufSizeT nBytesReq, fillParameter parm, bufSizeT &nBytesActual, caNetAddr & addr ) = 0; + caStatus versionAction (); + // // cadg //