From 59ca167c5d6eed7a2f18cdd0411c7ce7af81904c Mon Sep 17 00:00:00 2001 From: Jeff Hill Date: Wed, 16 Apr 2003 20:39:29 +0000 Subject: [PATCH] added comQueSendMsgMinder class --- src/ca/comQueSend.h | 66 ++++++++++++++++++++++++++++++-------- src/ca/tcpiiu.cpp | 78 +++++++++++++++++++++++++-------------------- 2 files changed, 96 insertions(+), 48 deletions(-) diff --git a/src/ca/comQueSend.h b/src/ca/comQueSend.h index 5d20170b9..b34349b57 100644 --- a/src/ca/comQueSend.h +++ b/src/ca/comQueSend.h @@ -35,6 +35,19 @@ #define comQueSendCopyDispatchSize 39 +class epicsMutex; +template < class T > class epicsGuard; + +class comQueSendMsgMinder { +public: + comQueSendMsgMinder ( + class comQueSend &, epicsGuard < cacMutex > & ); + ~comQueSendMsgMinder (); + void commit (); +private: + class comQueSend * pSendQue; +}; + // // Notes. // o calling popNextComBufToSend() will clear any uncommitted bytes @@ -44,8 +57,6 @@ public: comQueSend ( wireSendAdapter &, comBufMemoryManager & ); ~comQueSend (); void clear (); - void beginMsg (); - void commitMsg (); unsigned occupiedBytes () const; bool flushEarlyThreshold ( unsigned nBytesThisMsg ) const; bool flushBlockThreshold ( unsigned nBytesThisMsg ) const; @@ -53,7 +64,7 @@ public: void pushUInt32 ( const ca_uint32_t value ); void pushFloat32 ( const ca_float32_t value ); void pushString ( const char *pVal, unsigned nChar ); - void insertRequestHeader ( + void insertRequestHeader ( ca_uint16_t request, ca_uint32_t payloadSize, ca_uint16_t dataType, ca_uint32_t nElem, ca_uint32_t cid, ca_uint32_t requestDependent, bool v49Ok ); @@ -69,7 +80,8 @@ private: wireSendAdapter & wire; unsigned nBytesPending; - typedef void ( comQueSend::*copyScalarFunc_t ) ( const void * pValue ); + typedef void ( comQueSend::*copyScalarFunc_t ) ( + const void * pValue ); static const copyScalarFunc_t dbrCopyScalar [comQueSendCopyDispatchSize]; void copy_dbr_string ( const void * pValue ); void copy_dbr_short ( const void * pValue ); @@ -77,9 +89,10 @@ private: void copy_dbr_char ( const void * pValue ); void copy_dbr_long ( const void * pValue ); void copy_dbr_double ( const void * pValue ); + void copy_dbr_invalid ( const void * pValue ); typedef void ( comQueSend::*copyVectorFunc_t ) ( - const void *pValue, unsigned nElem ); + const void * pValue, unsigned nElem ); static const copyVectorFunc_t dbrCopyVector [comQueSendCopyDispatchSize]; void copy_dbr_string ( const void *pValue, unsigned nElem ); void copy_dbr_short ( const void *pValue, unsigned nElem ); @@ -87,10 +100,16 @@ private: void copy_dbr_char ( const void *pValue, unsigned nElem ); void copy_dbr_long ( const void *pValue, unsigned nElem ); void copy_dbr_double ( const void *pValue, unsigned nElem ); + void copy_dbr_invalid ( const void * pValue, unsigned nElem ); void pushComBuf ( comBuf & ); comBuf * newComBuf (); - void clearUncommitted (); + + void beginMsg (); + void commitMsg (); + void clearUncommitedMsg (); + + friend class comQueSendMsgMinder; // // visual C++ versions 6 & 7 do not allow out of @@ -137,6 +156,33 @@ private: extern const char cacNillBytes[]; +inline comQueSendMsgMinder::comQueSendMsgMinder ( + class comQueSend & sendQueIn, epicsGuard < cacMutex > & ) : + pSendQue ( & sendQueIn ) +{ + sendQueIn.beginMsg (); +} + +inline comQueSendMsgMinder::~comQueSendMsgMinder () +{ + if ( this->pSendQue ) { + this->pSendQue->clearUncommitedMsg (); + } +} + +inline void comQueSendMsgMinder::commit () +{ + if ( this->pSendQue ) { + this->pSendQue->commitMsg (); + this->pSendQue = 0; + } +} + +inline void comQueSend::beginMsg () +{ + this->pFirstUncommited = this->bufs.lastIter (); +} + inline void comQueSend::pushUInt16 ( const ca_uint16_t value ) { this->push ( value ); @@ -180,14 +226,6 @@ inline bool comQueSend::flushEarlyThreshold ( unsigned nBytesThisMsg ) const return ( this->nBytesPending + nBytesThisMsg > 4 * comBuf::capacityBytes () ); } -inline void comQueSend::beginMsg () -{ - if ( this->pFirstUncommited.valid() ) { - this->clearUncommitted (); - } - this->pFirstUncommited = this->bufs.lastIter (); -} - // wrapping this with a function avoids WRS T2.2 Cygnus GNU compiler bugs inline comBuf * comQueSend::newComBuf () { diff --git a/src/ca/tcpiiu.cpp b/src/ca/tcpiiu.cpp index 45eb4be85..2e751ca9d 100644 --- a/src/ca/tcpiiu.cpp +++ b/src/ca/tcpiiu.cpp @@ -158,6 +158,7 @@ unsigned tcpiiu::sendBytes ( const void *pBuf, static_cast < const char * > (pBuf), (int) nBytesInBuf, 0 ); if ( status > 0 ) { nBytes = static_cast ( status ); + // printf("SEND: %u\n", nBytes ); break; } else { @@ -843,7 +844,7 @@ bool tcpiiu::processIncoming ( /* * tcpiiu::hostNameSetRequest () */ -void tcpiiu::hostNameSetRequest ( epicsGuard < cacMutex > & ) +void tcpiiu::hostNameSetRequest ( epicsGuard < cacMutex > & locker ) { if ( ! CA_V41 ( this->minorProtocolVersion ) ) { return; @@ -860,7 +861,7 @@ void tcpiiu::hostNameSetRequest ( epicsGuard < cacMutex > & ) this->flushRequest (); } - this->sendQue.beginMsg (); + comQueSendMsgMinder minder ( this->sendQue, locker ); this->sendQue.pushUInt16 ( CA_PROTO_HOST_NAME ); // cmd this->sendQue.pushUInt16 ( static_cast < ca_uint16_t > ( postSize ) ); // postsize this->sendQue.pushUInt16 ( 0u ); // dataType @@ -869,13 +870,13 @@ void tcpiiu::hostNameSetRequest ( epicsGuard < cacMutex > & ) this->sendQue.pushUInt32 ( 0u ); // available this->sendQue.pushString ( pName, size ); this->sendQue.pushString ( cacNillBytes, postSize - size ); - this->sendQue.commitMsg (); + minder.commit (); } /* * tcpiiu::userNameSetRequest () */ -void tcpiiu::userNameSetRequest ( epicsGuard < cacMutex > & ) +void tcpiiu::userNameSetRequest ( epicsGuard < cacMutex > & locker ) { if ( ! CA_V41 ( this->minorProtocolVersion ) ) { return; @@ -890,7 +891,7 @@ void tcpiiu::userNameSetRequest ( epicsGuard < cacMutex > & ) this->flushRequest (); } - this->sendQue.beginMsg (); + comQueSendMsgMinder minder ( this->sendQue, locker ); this->sendQue.pushUInt16 ( CA_PROTO_CLIENT_NAME ); // cmd this->sendQue.pushUInt16 ( static_cast ( postSize ) ); // postsize this->sendQue.pushUInt16 ( 0u ); // dataType @@ -899,40 +900,40 @@ void tcpiiu::userNameSetRequest ( epicsGuard < cacMutex > & ) this->sendQue.pushUInt32 ( 0u ); // available this->sendQue.pushString ( pName, size ); this->sendQue.pushString ( cacNillBytes, postSize - size ); - this->sendQue.commitMsg (); + minder.commit (); } -void tcpiiu::disableFlowControlRequest ( epicsGuard < cacMutex > & ) +void tcpiiu::disableFlowControlRequest ( epicsGuard < cacMutex > & locker ) { if ( this->sendQue.flushEarlyThreshold ( 16u ) ) { this->flushRequest (); } - this->sendQue.beginMsg (); + comQueSendMsgMinder minder ( this->sendQue, locker ); this->sendQue.pushUInt16 ( CA_PROTO_EVENTS_ON ); // cmd this->sendQue.pushUInt16 ( 0u ); // postsize this->sendQue.pushUInt16 ( 0u ); // dataType this->sendQue.pushUInt16 ( 0u ); // count this->sendQue.pushUInt32 ( 0u ); // cid this->sendQue.pushUInt32 ( 0u ); // available - this->sendQue.commitMsg (); + minder.commit (); } -void tcpiiu::enableFlowControlRequest ( epicsGuard < cacMutex > & ) +void tcpiiu::enableFlowControlRequest ( epicsGuard < cacMutex > & locker ) { if ( this->sendQue.flushEarlyThreshold ( 16u ) ) { this->flushRequest (); } - this->sendQue.beginMsg (); + comQueSendMsgMinder minder ( this->sendQue, locker ); this->sendQue.pushUInt16 ( CA_PROTO_EVENTS_OFF ); // cmd this->sendQue.pushUInt16 ( 0u ); // postsize this->sendQue.pushUInt16 ( 0u ); // dataType this->sendQue.pushUInt16 ( 0u ); // count this->sendQue.pushUInt32 ( 0u ); // cid this->sendQue.pushUInt32 ( 0u ); // available - this->sendQue.commitMsg (); + minder.commit (); } -void tcpiiu::versionMessage ( epicsGuard < cacMutex > &, +void tcpiiu::versionMessage ( epicsGuard < cacMutex > & locker, const cacChannel::priLev & priority ) { assert ( priority <= 0xffff ); @@ -941,44 +942,46 @@ void tcpiiu::versionMessage ( epicsGuard < cacMutex > &, this->flushRequest (); } - this->sendQue.beginMsg (); + comQueSendMsgMinder minder ( this->sendQue, locker ); this->sendQue.pushUInt16 ( CA_PROTO_VERSION ); // cmd this->sendQue.pushUInt16 ( 0u ); // old postsize field this->sendQue.pushUInt16 ( static_cast ( priority ) ); // old dataType field this->sendQue.pushUInt16 ( CA_MINOR_PROTOCOL_REVISION ); // old count field this->sendQue.pushUInt32 ( 0u ); // ( old cid field ) this->sendQue.pushUInt32 ( 0u ); // ( old available field ) - this->sendQue.commitMsg (); + minder.commit (); } -void tcpiiu::echoRequest ( epicsGuard < cacMutex > & ) +void tcpiiu::echoRequest ( epicsGuard < cacMutex > & locker ) { if ( this->sendQue.flushEarlyThreshold ( 16u ) ) { this->flushRequest (); } - this->sendQue.beginMsg (); + comQueSendMsgMinder minder ( this->sendQue, locker ); this->sendQue.pushUInt16 ( CA_PROTO_ECHO ); // cmd this->sendQue.pushUInt16 ( 0u ); // postsize this->sendQue.pushUInt16 ( 0u ); // dataType this->sendQue.pushUInt16 ( 0u ); // count this->sendQue.pushUInt32 ( 0u ); // cid this->sendQue.pushUInt32 ( 0u ); // available - this->sendQue.commitMsg (); + minder.commit (); } -void tcpiiu::writeRequest ( epicsGuard < cacMutex > &, +void tcpiiu::writeRequest ( epicsGuard < cacMutex > & guard, nciu &chan, unsigned type, unsigned nElem, const void *pValue ) { if ( ! chan.connected () ) { throw cacChannel::notConnected(); } + comQueSendMsgMinder minder ( this->sendQue, guard ); this->sendQue.insertRequestWithPayLoad ( CA_PROTO_WRITE, type, nElem, chan.getSID(), chan.getCID(), pValue, CA_V49 ( this->minorProtocolVersion ) ); + minder.commit (); } -void tcpiiu::writeNotifyRequest ( epicsGuard < cacMutex > &, +void tcpiiu::writeNotifyRequest ( epicsGuard < cacMutex > & guard, nciu &chan, netWriteNotifyIO &io, unsigned type, unsigned nElem, const void *pValue ) { @@ -988,12 +991,14 @@ void tcpiiu::writeNotifyRequest ( epicsGuard < cacMutex > &, if ( ! this->ca_v41_ok () ) { throw cacChannel::unsupportedByService(); } + comQueSendMsgMinder minder ( this->sendQue, guard ); this->sendQue.insertRequestWithPayLoad ( CA_PROTO_WRITE_NOTIFY, type, nElem, chan.getSID(), io.getId(), pValue, CA_V49 ( this->minorProtocolVersion ) ); + minder.commit (); } -void tcpiiu::readNotifyRequest ( epicsGuard < cacMutex > &, +void tcpiiu::readNotifyRequest ( epicsGuard < cacMutex > & locker, nciu & chan, netReadNotifyIO & io, unsigned dataType, unsigned nElem ) { @@ -1017,15 +1022,17 @@ void tcpiiu::readNotifyRequest ( epicsGuard < cacMutex > &, if ( nElem > maxElem ) { throw cacChannel::msgBodyCacheTooSmall (); } + comQueSendMsgMinder minder ( this->sendQue, locker ); this->sendQue.insertRequestHeader ( CA_PROTO_READ_NOTIFY, 0u, static_cast < ca_uint16_t > ( dataType ), nElem, chan.getSID(), io.getId(), CA_V49 ( this->minorProtocolVersion ) ); - this->sendQue.commitMsg (); + minder.commit (); } -void tcpiiu::createChannelRequest ( nciu & chan ) +void tcpiiu::createChannelRequest ( + nciu & chan, epicsGuard < cacMutex > & guard ) { const char *pName; unsigned nameLength; @@ -1047,7 +1054,7 @@ void tcpiiu::createChannelRequest ( nciu & chan ) throw cacChannel::unsupportedByService(); } - this->sendQue.beginMsg (); + comQueSendMsgMinder minder ( this->sendQue, guard ); this->sendQue.pushUInt16 ( CA_PROTO_CLAIM_CIU ); // cmd this->sendQue.pushUInt16 ( static_cast < epicsUInt16 > ( postCnt ) ); // postsize this->sendQue.pushUInt16 ( 0u ); // dataType @@ -1065,27 +1072,27 @@ void tcpiiu::createChannelRequest ( nciu & chan ) if ( postCnt > nameLength ) { this->sendQue.pushString ( cacNillBytes, postCnt - nameLength ); } - this->sendQue.commitMsg (); + minder.commit (); } -void tcpiiu::clearChannelRequest ( epicsGuard < cacMutex > &, +void tcpiiu::clearChannelRequest ( epicsGuard < cacMutex > & locker, ca_uint32_t sid, ca_uint32_t cid ) { - this->sendQue.beginMsg (); + comQueSendMsgMinder minder ( this->sendQue, locker ); this->sendQue.pushUInt16 ( CA_PROTO_CLEAR_CHANNEL ); // cmd this->sendQue.pushUInt16 ( 0u ); // postsize this->sendQue.pushUInt16 ( 0u ); // dataType this->sendQue.pushUInt16 ( 0u ); // count this->sendQue.pushUInt32 ( sid ); // cid this->sendQue.pushUInt32 ( cid ); // available - this->sendQue.commitMsg (); + minder.commit (); } // // this routine return void because if this internally fails the best response // is to try again the next time that we reconnect // -void tcpiiu::subscriptionRequest ( epicsGuard < cacMutex > &, +void tcpiiu::subscriptionRequest ( epicsGuard < cacMutex > & locker, nciu &chan, netSubscription & subscr ) { if ( ! chan.connected() ) { @@ -1109,6 +1116,7 @@ void tcpiiu::subscriptionRequest ( epicsGuard < cacMutex > &, if ( nElem > maxElem ) { throw cacChannel::msgBodyCacheTooSmall (); } + comQueSendMsgMinder minder ( this->sendQue, locker ); this->sendQue.insertRequestHeader ( CA_PROTO_EVENT_ADD, 16u, static_cast < ca_uint16_t > ( dataType ), @@ -1121,19 +1129,20 @@ void tcpiiu::subscriptionRequest ( epicsGuard < cacMutex > &, this->sendQue.pushFloat32 ( 0.0 ); // m_toval this->sendQue.pushUInt16 ( static_cast < ca_uint16_t > ( mask ) ); // m_mask this->sendQue.pushUInt16 ( 0u ); // m_pad - this->sendQue.commitMsg (); + minder.commit (); } -void tcpiiu::subscriptionCancelRequest ( epicsGuard < cacMutex > &, +void tcpiiu::subscriptionCancelRequest ( epicsGuard < cacMutex > & locker, nciu & chan, netSubscription & subscr ) { + comQueSendMsgMinder minder ( this->sendQue, locker ); this->sendQue.insertRequestHeader ( CA_PROTO_EVENT_CANCEL, 0u, static_cast < ca_uint16_t > ( subscr.getType() ), static_cast < ca_uint16_t > ( subscr.getCount() ), chan.getSID(), subscr.getId(), CA_V49 ( this->minorProtocolVersion ) ); - this->sendQue.commitMsg (); + minder.commit (); } bool tcpiiu::flush () @@ -1270,12 +1279,13 @@ void tcpiiu::removeAllChannels ( } } -void tcpiiu::installChannel ( epicsGuard < cacMutex > &, nciu & chan, unsigned sidIn, +void tcpiiu::installChannel ( epicsGuard < cacMutex > & guard, + nciu & chan, unsigned sidIn, ca_uint16_t typeIn, arrayElementCount countIn ) { this->channelList.add ( chan ); chan.searchReplySetUp ( *this, sidIn, typeIn, countIn ); - chan.createChannelRequest ( *this ); + chan.createChannelRequest ( *this, guard ); this->flushRequest (); }