added comQueSendMsgMinder class

This commit is contained in:
Jeff Hill
2003-04-16 20:39:29 +00:00
parent a63da3e2a8
commit 59ca167c5d
2 changed files with 96 additions and 48 deletions

View File

@@ -35,6 +35,19 @@
#define comQueSendCopyDispatchSize 39
class epicsMutex;
template < class T > class epicsGuard;
class comQueSendMsgMinder {
public:
comQueSendMsgMinder (
class comQueSend &, epicsGuard < cacMutex > & );
~comQueSendMsgMinder ();
void commit ();
private:
class comQueSend * pSendQue;
};
//
// Notes.
// o calling popNextComBufToSend() will clear any uncommitted bytes
@@ -44,8 +57,6 @@ public:
comQueSend ( wireSendAdapter &, comBufMemoryManager & );
~comQueSend ();
void clear ();
void beginMsg ();
void commitMsg ();
unsigned occupiedBytes () const;
bool flushEarlyThreshold ( unsigned nBytesThisMsg ) const;
bool flushBlockThreshold ( unsigned nBytesThisMsg ) const;
@@ -53,7 +64,7 @@ public:
void pushUInt32 ( const ca_uint32_t value );
void pushFloat32 ( const ca_float32_t value );
void pushString ( const char *pVal, unsigned nChar );
void insertRequestHeader (
void insertRequestHeader (
ca_uint16_t request, ca_uint32_t payloadSize,
ca_uint16_t dataType, ca_uint32_t nElem, ca_uint32_t cid,
ca_uint32_t requestDependent, bool v49Ok );
@@ -69,7 +80,8 @@ private:
wireSendAdapter & wire;
unsigned nBytesPending;
typedef void ( comQueSend::*copyScalarFunc_t ) ( const void * pValue );
typedef void ( comQueSend::*copyScalarFunc_t ) (
const void * pValue );
static const copyScalarFunc_t dbrCopyScalar [comQueSendCopyDispatchSize];
void copy_dbr_string ( const void * pValue );
void copy_dbr_short ( const void * pValue );
@@ -77,9 +89,10 @@ private:
void copy_dbr_char ( const void * pValue );
void copy_dbr_long ( const void * pValue );
void copy_dbr_double ( const void * pValue );
void copy_dbr_invalid ( const void * pValue );
typedef void ( comQueSend::*copyVectorFunc_t ) (
const void *pValue, unsigned nElem );
const void * pValue, unsigned nElem );
static const copyVectorFunc_t dbrCopyVector [comQueSendCopyDispatchSize];
void copy_dbr_string ( const void *pValue, unsigned nElem );
void copy_dbr_short ( const void *pValue, unsigned nElem );
@@ -87,10 +100,16 @@ private:
void copy_dbr_char ( const void *pValue, unsigned nElem );
void copy_dbr_long ( const void *pValue, unsigned nElem );
void copy_dbr_double ( const void *pValue, unsigned nElem );
void copy_dbr_invalid ( const void * pValue, unsigned nElem );
void pushComBuf ( comBuf & );
comBuf * newComBuf ();
void clearUncommitted ();
void beginMsg ();
void commitMsg ();
void clearUncommitedMsg ();
friend class comQueSendMsgMinder;
//
// visual C++ versions 6 & 7 do not allow out of
@@ -137,6 +156,33 @@ private:
extern const char cacNillBytes[];
inline comQueSendMsgMinder::comQueSendMsgMinder (
class comQueSend & sendQueIn, epicsGuard < cacMutex > & ) :
pSendQue ( & sendQueIn )
{
sendQueIn.beginMsg ();
}
inline comQueSendMsgMinder::~comQueSendMsgMinder ()
{
if ( this->pSendQue ) {
this->pSendQue->clearUncommitedMsg ();
}
}
inline void comQueSendMsgMinder::commit ()
{
if ( this->pSendQue ) {
this->pSendQue->commitMsg ();
this->pSendQue = 0;
}
}
inline void comQueSend::beginMsg ()
{
this->pFirstUncommited = this->bufs.lastIter ();
}
inline void comQueSend::pushUInt16 ( const ca_uint16_t value )
{
this->push ( value );
@@ -180,14 +226,6 @@ inline bool comQueSend::flushEarlyThreshold ( unsigned nBytesThisMsg ) const
return ( this->nBytesPending + nBytesThisMsg > 4 * comBuf::capacityBytes () );
}
inline void comQueSend::beginMsg ()
{
if ( this->pFirstUncommited.valid() ) {
this->clearUncommitted ();
}
this->pFirstUncommited = this->bufs.lastIter ();
}
// wrapping this with a function avoids WRS T2.2 Cygnus GNU compiler bugs
inline comBuf * comQueSend::newComBuf ()
{

View File

@@ -158,6 +158,7 @@ unsigned tcpiiu::sendBytes ( const void *pBuf,
static_cast < const char * > (pBuf), (int) nBytesInBuf, 0 );
if ( status > 0 ) {
nBytes = static_cast <unsigned> ( status );
// printf("SEND: %u\n", nBytes );
break;
}
else {
@@ -843,7 +844,7 @@ bool tcpiiu::processIncoming (
/*
* tcpiiu::hostNameSetRequest ()
*/
void tcpiiu::hostNameSetRequest ( epicsGuard < cacMutex > & )
void tcpiiu::hostNameSetRequest ( epicsGuard < cacMutex > & locker )
{
if ( ! CA_V41 ( this->minorProtocolVersion ) ) {
return;
@@ -860,7 +861,7 @@ void tcpiiu::hostNameSetRequest ( epicsGuard < cacMutex > & )
this->flushRequest ();
}
this->sendQue.beginMsg ();
comQueSendMsgMinder minder ( this->sendQue, locker );
this->sendQue.pushUInt16 ( CA_PROTO_HOST_NAME ); // cmd
this->sendQue.pushUInt16 ( static_cast < ca_uint16_t > ( postSize ) ); // postsize
this->sendQue.pushUInt16 ( 0u ); // dataType
@@ -869,13 +870,13 @@ void tcpiiu::hostNameSetRequest ( epicsGuard < cacMutex > & )
this->sendQue.pushUInt32 ( 0u ); // available
this->sendQue.pushString ( pName, size );
this->sendQue.pushString ( cacNillBytes, postSize - size );
this->sendQue.commitMsg ();
minder.commit ();
}
/*
* tcpiiu::userNameSetRequest ()
*/
void tcpiiu::userNameSetRequest ( epicsGuard < cacMutex > & )
void tcpiiu::userNameSetRequest ( epicsGuard < cacMutex > & locker )
{
if ( ! CA_V41 ( this->minorProtocolVersion ) ) {
return;
@@ -890,7 +891,7 @@ void tcpiiu::userNameSetRequest ( epicsGuard < cacMutex > & )
this->flushRequest ();
}
this->sendQue.beginMsg ();
comQueSendMsgMinder minder ( this->sendQue, locker );
this->sendQue.pushUInt16 ( CA_PROTO_CLIENT_NAME ); // cmd
this->sendQue.pushUInt16 ( static_cast <epicsUInt16> ( postSize ) ); // postsize
this->sendQue.pushUInt16 ( 0u ); // dataType
@@ -899,40 +900,40 @@ void tcpiiu::userNameSetRequest ( epicsGuard < cacMutex > & )
this->sendQue.pushUInt32 ( 0u ); // available
this->sendQue.pushString ( pName, size );
this->sendQue.pushString ( cacNillBytes, postSize - size );
this->sendQue.commitMsg ();
minder.commit ();
}
void tcpiiu::disableFlowControlRequest ( epicsGuard < cacMutex > & )
void tcpiiu::disableFlowControlRequest ( epicsGuard < cacMutex > & locker )
{
if ( this->sendQue.flushEarlyThreshold ( 16u ) ) {
this->flushRequest ();
}
this->sendQue.beginMsg ();
comQueSendMsgMinder minder ( this->sendQue, locker );
this->sendQue.pushUInt16 ( CA_PROTO_EVENTS_ON ); // cmd
this->sendQue.pushUInt16 ( 0u ); // postsize
this->sendQue.pushUInt16 ( 0u ); // dataType
this->sendQue.pushUInt16 ( 0u ); // count
this->sendQue.pushUInt32 ( 0u ); // cid
this->sendQue.pushUInt32 ( 0u ); // available
this->sendQue.commitMsg ();
minder.commit ();
}
void tcpiiu::enableFlowControlRequest ( epicsGuard < cacMutex > & )
void tcpiiu::enableFlowControlRequest ( epicsGuard < cacMutex > & locker )
{
if ( this->sendQue.flushEarlyThreshold ( 16u ) ) {
this->flushRequest ();
}
this->sendQue.beginMsg ();
comQueSendMsgMinder minder ( this->sendQue, locker );
this->sendQue.pushUInt16 ( CA_PROTO_EVENTS_OFF ); // cmd
this->sendQue.pushUInt16 ( 0u ); // postsize
this->sendQue.pushUInt16 ( 0u ); // dataType
this->sendQue.pushUInt16 ( 0u ); // count
this->sendQue.pushUInt32 ( 0u ); // cid
this->sendQue.pushUInt32 ( 0u ); // available
this->sendQue.commitMsg ();
minder.commit ();
}
void tcpiiu::versionMessage ( epicsGuard < cacMutex > &,
void tcpiiu::versionMessage ( epicsGuard < cacMutex > & locker,
const cacChannel::priLev & priority )
{
assert ( priority <= 0xffff );
@@ -941,44 +942,46 @@ void tcpiiu::versionMessage ( epicsGuard < cacMutex > &,
this->flushRequest ();
}
this->sendQue.beginMsg ();
comQueSendMsgMinder minder ( this->sendQue, locker );
this->sendQue.pushUInt16 ( CA_PROTO_VERSION ); // cmd
this->sendQue.pushUInt16 ( 0u ); // old postsize field
this->sendQue.pushUInt16 ( static_cast <ca_uint16_t> ( priority ) ); // old dataType field
this->sendQue.pushUInt16 ( CA_MINOR_PROTOCOL_REVISION ); // old count field
this->sendQue.pushUInt32 ( 0u ); // ( old cid field )
this->sendQue.pushUInt32 ( 0u ); // ( old available field )
this->sendQue.commitMsg ();
minder.commit ();
}
void tcpiiu::echoRequest ( epicsGuard < cacMutex > & )
void tcpiiu::echoRequest ( epicsGuard < cacMutex > & locker )
{
if ( this->sendQue.flushEarlyThreshold ( 16u ) ) {
this->flushRequest ();
}
this->sendQue.beginMsg ();
comQueSendMsgMinder minder ( this->sendQue, locker );
this->sendQue.pushUInt16 ( CA_PROTO_ECHO ); // cmd
this->sendQue.pushUInt16 ( 0u ); // postsize
this->sendQue.pushUInt16 ( 0u ); // dataType
this->sendQue.pushUInt16 ( 0u ); // count
this->sendQue.pushUInt32 ( 0u ); // cid
this->sendQue.pushUInt32 ( 0u ); // available
this->sendQue.commitMsg ();
minder.commit ();
}
void tcpiiu::writeRequest ( epicsGuard < cacMutex > &,
void tcpiiu::writeRequest ( epicsGuard < cacMutex > & guard,
nciu &chan, unsigned type, unsigned nElem, const void *pValue )
{
if ( ! chan.connected () ) {
throw cacChannel::notConnected();
}
comQueSendMsgMinder minder ( this->sendQue, guard );
this->sendQue.insertRequestWithPayLoad ( CA_PROTO_WRITE,
type, nElem, chan.getSID(), chan.getCID(), pValue,
CA_V49 ( this->minorProtocolVersion ) );
minder.commit ();
}
void tcpiiu::writeNotifyRequest ( epicsGuard < cacMutex > &,
void tcpiiu::writeNotifyRequest ( epicsGuard < cacMutex > & guard,
nciu &chan, netWriteNotifyIO &io, unsigned type,
unsigned nElem, const void *pValue )
{
@@ -988,12 +991,14 @@ void tcpiiu::writeNotifyRequest ( epicsGuard < cacMutex > &,
if ( ! this->ca_v41_ok () ) {
throw cacChannel::unsupportedByService();
}
comQueSendMsgMinder minder ( this->sendQue, guard );
this->sendQue.insertRequestWithPayLoad ( CA_PROTO_WRITE_NOTIFY,
type, nElem, chan.getSID(), io.getId(), pValue,
CA_V49 ( this->minorProtocolVersion ) );
minder.commit ();
}
void tcpiiu::readNotifyRequest ( epicsGuard < cacMutex > &,
void tcpiiu::readNotifyRequest ( epicsGuard < cacMutex > & locker,
nciu & chan, netReadNotifyIO & io,
unsigned dataType, unsigned nElem )
{
@@ -1017,15 +1022,17 @@ void tcpiiu::readNotifyRequest ( epicsGuard < cacMutex > &,
if ( nElem > maxElem ) {
throw cacChannel::msgBodyCacheTooSmall ();
}
comQueSendMsgMinder minder ( this->sendQue, locker );
this->sendQue.insertRequestHeader (
CA_PROTO_READ_NOTIFY, 0u,
static_cast < ca_uint16_t > ( dataType ),
nElem, chan.getSID(), io.getId(),
CA_V49 ( this->minorProtocolVersion ) );
this->sendQue.commitMsg ();
minder.commit ();
}
void tcpiiu::createChannelRequest ( nciu & chan )
void tcpiiu::createChannelRequest (
nciu & chan, epicsGuard < cacMutex > & guard )
{
const char *pName;
unsigned nameLength;
@@ -1047,7 +1054,7 @@ void tcpiiu::createChannelRequest ( nciu & chan )
throw cacChannel::unsupportedByService();
}
this->sendQue.beginMsg ();
comQueSendMsgMinder minder ( this->sendQue, guard );
this->sendQue.pushUInt16 ( CA_PROTO_CLAIM_CIU ); // cmd
this->sendQue.pushUInt16 ( static_cast < epicsUInt16 > ( postCnt ) ); // postsize
this->sendQue.pushUInt16 ( 0u ); // dataType
@@ -1065,27 +1072,27 @@ void tcpiiu::createChannelRequest ( nciu & chan )
if ( postCnt > nameLength ) {
this->sendQue.pushString ( cacNillBytes, postCnt - nameLength );
}
this->sendQue.commitMsg ();
minder.commit ();
}
void tcpiiu::clearChannelRequest ( epicsGuard < cacMutex > &,
void tcpiiu::clearChannelRequest ( epicsGuard < cacMutex > & locker,
ca_uint32_t sid, ca_uint32_t cid )
{
this->sendQue.beginMsg ();
comQueSendMsgMinder minder ( this->sendQue, locker );
this->sendQue.pushUInt16 ( CA_PROTO_CLEAR_CHANNEL ); // cmd
this->sendQue.pushUInt16 ( 0u ); // postsize
this->sendQue.pushUInt16 ( 0u ); // dataType
this->sendQue.pushUInt16 ( 0u ); // count
this->sendQue.pushUInt32 ( sid ); // cid
this->sendQue.pushUInt32 ( cid ); // available
this->sendQue.commitMsg ();
minder.commit ();
}
//
// this routine return void because if this internally fails the best response
// is to try again the next time that we reconnect
//
void tcpiiu::subscriptionRequest ( epicsGuard < cacMutex > &,
void tcpiiu::subscriptionRequest ( epicsGuard < cacMutex > & locker,
nciu &chan, netSubscription & subscr )
{
if ( ! chan.connected() ) {
@@ -1109,6 +1116,7 @@ void tcpiiu::subscriptionRequest ( epicsGuard < cacMutex > &,
if ( nElem > maxElem ) {
throw cacChannel::msgBodyCacheTooSmall ();
}
comQueSendMsgMinder minder ( this->sendQue, locker );
this->sendQue.insertRequestHeader (
CA_PROTO_EVENT_ADD, 16u,
static_cast < ca_uint16_t > ( dataType ),
@@ -1121,19 +1129,20 @@ void tcpiiu::subscriptionRequest ( epicsGuard < cacMutex > &,
this->sendQue.pushFloat32 ( 0.0 ); // m_toval
this->sendQue.pushUInt16 ( static_cast < ca_uint16_t > ( mask ) ); // m_mask
this->sendQue.pushUInt16 ( 0u ); // m_pad
this->sendQue.commitMsg ();
minder.commit ();
}
void tcpiiu::subscriptionCancelRequest ( epicsGuard < cacMutex > &,
void tcpiiu::subscriptionCancelRequest ( epicsGuard < cacMutex > & locker,
nciu & chan, netSubscription & subscr )
{
comQueSendMsgMinder minder ( this->sendQue, locker );
this->sendQue.insertRequestHeader (
CA_PROTO_EVENT_CANCEL, 0u,
static_cast < ca_uint16_t > ( subscr.getType() ),
static_cast < ca_uint16_t > ( subscr.getCount() ),
chan.getSID(), subscr.getId(),
CA_V49 ( this->minorProtocolVersion ) );
this->sendQue.commitMsg ();
minder.commit ();
}
bool tcpiiu::flush ()
@@ -1270,12 +1279,13 @@ void tcpiiu::removeAllChannels (
}
}
void tcpiiu::installChannel ( epicsGuard < cacMutex > &, nciu & chan, unsigned sidIn,
void tcpiiu::installChannel ( epicsGuard < cacMutex > & guard,
nciu & chan, unsigned sidIn,
ca_uint16_t typeIn, arrayElementCount countIn )
{
this->channelList.add ( chan );
chan.searchReplySetUp ( *this, sidIn, typeIn, countIn );
chan.createChannelRequest ( *this );
chan.createChannelRequest ( *this, guard );
this->flushRequest ();
}