diff --git a/src/cas/generic/casClient.cc b/src/cas/generic/casClient.cc index 05229188b..933a47a9c 100644 --- a/src/cas/generic/casClient.cc +++ b/src/cas/generic/casClient.cc @@ -392,6 +392,7 @@ caStatus casClient::echoAction () const void * dp = this->ctx.getData(); void * pPayloadOut; + epicsGuard < epicsMutex > guard ( this->mutex ); caStatus status = this->out.copyInHeader ( mp->m_cmmd, mp->m_postsize, mp->m_dataType, mp->m_count, mp->m_cid, mp->m_available, & pPayloadOut ); @@ -413,6 +414,7 @@ caStatus casClient::versionAction () // send minor protocol revision to the client void casClient::sendVersion () { + epicsGuard < epicsMutex > guard ( this->mutex ); caStatus status = this->out.copyInHeader ( CA_PROTO_VERSION, 0, 0, CA_MINOR_PROTOCOL_REVISION, 0, 0, 0 ); if ( ! status ) { @@ -486,6 +488,7 @@ caStatus casClient::sendErr ( const caHdrLargeArray *curp, const int reportedSta } caHdr * pReqOut; + epicsGuard < epicsMutex > guard ( this->mutex ); caStatus status = this->out.copyInHeader ( CA_PROTO_ERROR, hdrSize + stringSize, 0, 0, cid, reportedStatus, reinterpret_cast ( & pReqOut ) ); @@ -612,8 +615,8 @@ void casClient::dumpMsg ( const caHdrLargeArray *mp, char pHostName[32]; this->hostName ( pHostName, sizeof ( pHostName) ); - errlogPrintf ( -"CAS: %s on %s at %s: cmd=%d cid=%s typ=%d cnt=%d psz=%d avail=%x\n", + fprintf ( stderr, +"CAS Request: %s on %s at %s: cmd=%d cid=%s typ=%d cnt=%d psz=%d avail=%x\n", pUserName, pHostName, pName, diff --git a/src/cas/generic/casDGClient.cc b/src/cas/generic/casDGClient.cc index 083a9e246..451b4422e 100644 --- a/src/cas/generic/casDGClient.cc +++ b/src/cas/generic/casDGClient.cc @@ -275,6 +275,7 @@ caStatus casDGClient::searchResponse ( const caHdrLargeArray & msg, } ca_uint16_t * pMinorVersion; + epicsGuard < epicsMutex > guard ( this->mutex ); status = this->out.copyInHeader ( CA_PROTO_SEARCH, sizeof ( *pMinorVersion ), serverPort, 0, serverAddr, msg.m_available, @@ -302,6 +303,7 @@ caStatus casDGClient::searchFailResponse ( const caHdrLargeArray * mp ) { int status; + epicsGuard < epicsMutex > guard ( this->mutex ); status = this->out.copyInHeader ( CA_PROTO_NOT_FOUND, 0, mp->m_dataType, mp->m_count, mp->m_cid, mp->m_available, 0 ); @@ -453,6 +455,8 @@ caStatus casDGClient::asyncSearchResponse ( const caNetAddr & outAddr, return S_cas_success; } + epicsGuard < epicsMutex > guard ( this->mutex ); + // // start a DG context in the output protocol stream // and grab the send lock @@ -507,6 +511,8 @@ caStatus casDGClient::processDG () break; } + epicsGuard < epicsMutex > guard ( this->mutex ); + // // start a DG context in the output protocol stream // and grab the send lock diff --git a/src/cas/generic/casStrmClient.cc b/src/cas/generic/casStrmClient.cc index 7a2a80279..42e0a1a3d 100644 --- a/src/cas/generic/casStrmClient.cc +++ b/src/cas/generic/casStrmClient.cc @@ -229,6 +229,8 @@ caStatus casStrmClient::readResponse ( casChannelI * pChan, const caHdrLargeArra return this->sendErrWithEpicsStatus ( & msg, status, ECA_GETFAIL ); } + epicsGuard < epicsMutex > guard ( this->mutex ); + void *pPayload; { unsigned payloadSize = dbr_size_n ( msg.m_dataType, msg.m_count ); @@ -350,6 +352,8 @@ caStatus casStrmClient::readNotifyResponse ( casChannelI * pChan, return this->readNotifyFailureResponse ( msg, ECA_GETFAIL ); } + epicsGuard < epicsMutex > guard ( this->mutex ); + void *pPayload; { unsigned size = dbr_size_n ( msg.m_dataType, msg.m_count ); @@ -398,6 +402,7 @@ caStatus casStrmClient::readNotifyResponse ( casChannelI * pChan, // caStatus casStrmClient::readNotifyFailureResponse ( const caHdrLargeArray & msg, const caStatus ECA_XXXX ) { + epicsGuard < epicsMutex > guard ( this->mutex ); assert ( ECA_XXXX != ECA_NORMAL ); void *pPayload; unsigned size = dbr_size_n ( msg.m_dataType, msg.m_count ); @@ -519,6 +524,7 @@ static smartGDDPointer createDBRDD ( unsigned dbrType, unsigned elemCount ) caStatus casStrmClient::monitorFailureResponse ( const caHdrLargeArray & msg, const caStatus ECA_XXXX ) { + epicsGuard < epicsMutex > guard ( this->mutex ); assert ( ECA_XXXX != ECA_NORMAL ); void *pPayload; unsigned size = dbr_size_n ( msg.m_dataType, msg.m_count ); @@ -538,6 +544,7 @@ caStatus casStrmClient::monitorFailureResponse ( const caHdrLargeArray & msg, caStatus casStrmClient::monitorResponse ( casChannelI & chan, const caHdrLargeArray & msg, const smartConstGDDPointer & pDesc, const caStatus completionStatus ) { + epicsGuard < epicsMutex > guard ( this->mutex ); void * pPayload; { ca_uint32_t size = dbr_size_n ( msg.m_dataType, msg.m_count ); @@ -788,6 +795,7 @@ caStatus casStrmClient::writeNotifyResponse( caStatus casStrmClient::writeNotifyResponseECA_XXX ( const caHdrLargeArray & msg, const caStatus ecaStatus ) { + epicsGuard < epicsMutex > guard ( this->mutex ); caStatus status = out.copyInHeader ( msg.m_cmmd, 0, msg.m_dataType, msg.m_count, ecaStatus, msg.m_available, 0 ); @@ -1087,6 +1095,8 @@ caStatus casStrmClient::createChanResponse ( const caHdrLargeArray & hdr, const caStatus casStrmClient::enumPostponedCreateChanResponse ( casChannelI & chan, const caHdrLargeArray & hdr, unsigned nativeTypeDBR ) { + epicsGuard < epicsMutex > guard ( this->mutex ); + // // We are allocating enough space for both the claim // response and the access rights response so that we know for @@ -1163,10 +1173,11 @@ caStatus casStrmClient::channelCreateFailedResp ( errMessage( S_cas_badParameter, "- or S_casApp_asyncCompletion was async IO competion code ?"); } - else { + else if ( status != S_casApp_pvNotFound ) { errMessage ( createStatus, "- Server unable to create a new PV"); } if ( CA_V46( this->minor_version_number ) ) { + epicsGuard < epicsMutex > guard ( this->mutex ); status = this->out.copyInHeader ( CA_PROTO_CLAIM_CIU_FAILED, 0, 0, 0, hdr.m_cid, 0, 0 ); if ( status ) { @@ -1197,7 +1208,7 @@ caStatus casStrmClient::disconnectChan ( caResId id ) caStatus createStatus; if ( CA_V47 ( this->minor_version_number ) ) { - + epicsGuard < epicsMutex > guard ( this->mutex ); status = this->out.copyInHeader ( CA_PROTO_SERVER_DISCONN, 0, 0, 0, id, 0, 0 ); if ( status ) { @@ -1359,6 +1370,7 @@ caStatus casStrmClient::clearChannelAction () // // send delete confirmed message // + epicsGuard < epicsMutex > guard ( this->mutex ); status = this->out.copyInHeader ( mp->m_cmmd, 0, mp->m_dataType, mp->m_count, mp->m_cid, mp->m_available, 0 ); @@ -1397,6 +1409,7 @@ caStatus casStrmClient::eventCancelAction () return S_cas_badResourceId; } + epicsGuard < epicsMutex > guard ( this->mutex ); int status = this->out.copyInHeader ( CA_PROTO_EVENT_ADD, 0, mp->m_dataType, mp->m_count, @@ -1440,6 +1453,8 @@ caStatus casStrmClient::noReadAccessEvent(casClientMon *pMon) falseReply.m_cid = pMon->getChannel().getCID(); falseReply.m_available = pMon->getClientId(); + epicsGuard < epicsMutex > guard ( this->mutex ); + status = this->allocMsg(size, &reply); if ( status ) { if( status == S_cas_hugeRequest ) { @@ -1479,20 +1494,19 @@ caStatus casStrmClient::readSyncAction() const caHdrLargeArray *mp = this->ctx.getMsg(); int status; + epicsGuard < casCoreClient > guard ( * this ); + // // This messages indicates that the client // timed out on a read so we must clear out // any pending asynchronous IO associated with // a read. // - { - epicsGuard < casCoreClient > guard ( * this ); - tsDLIter iter = this->chanList.firstIter (); - while ( iter.valid () ) { - iter->clearOutstandingReads (); - ++iter; - } - } + tsDLIter iter = this->chanList.firstIter (); + while ( iter.valid () ) { + iter->clearOutstandingReads (); + ++iter; + } status = this->out.copyInHeader ( mp->m_cmmd, 0, mp->m_dataType, mp->m_count, @@ -1533,6 +1547,7 @@ caStatus casStrmClient::accessRightsResponse(casChannelI *pciu) ar |= CA_PROTO_ACCESS_RIGHT_WRITE; } + epicsGuard < epicsMutex > guard ( this->mutex ); status = this->out.copyInHeader ( CA_PROTO_ACCESS_RIGHTS, 0, 0, 0, pciu->getCID(), ar, 0 ); if ( ! status ) { @@ -1952,6 +1967,7 @@ unsigned casStrmClient::getDebugLevel() const void casStrmClient::flush () { + epicsGuard < epicsMutex > guard ( this->mutex ); this->out.flush (); } diff --git a/src/cas/generic/inBuf.cc b/src/cas/generic/inBuf.cc index 83f8b3b2e..025166490 100644 --- a/src/cas/generic/inBuf.cc +++ b/src/cas/generic/inBuf.cc @@ -102,7 +102,7 @@ inBufClient::fillCondition inBuf::fill ( inBufClient::fillParameter parm ) this->client.hostName ( buf, sizeof ( buf ) ); - printf ("CAS: incoming %u byte msg from %s\n", + fprintf ( stderr, "CAS Incoming: %u byte msg from %s\n", bytesRecv, buf); } } @@ -139,7 +139,6 @@ const inBufCtx inBuf::pushCtx ( bufSizeT headerSize, // X aCC 361 bufSizeT inBuf::popCtx ( const inBufCtx &ctx ) // X aCC 361 { if ( ctx.stat==inBufCtx::pushCtxSuccess ) { - epicsGuard < epicsMutex > guard ( this->mutex ); bufSizeT bytesRemoved = this->nextReadIndex; this->pBuf = ctx.pBuf; this->bufSize = ctx.bufSize; diff --git a/src/cas/generic/inBufIL.h b/src/cas/generic/inBufIL.h index 0097490c2..16f977fe0 100644 --- a/src/cas/generic/inBufIL.h +++ b/src/cas/generic/inBufIL.h @@ -41,15 +41,6 @@ inline bool inBuf::full () const return false; } -// -// inBuf::clear() -// -inline void inBuf::clear () -{ - this->bytesInBuffer = 0u; - this->nextReadIndex = 0u; -} - // // inBuf::msgPtr() // diff --git a/src/cas/generic/outBuf.cc b/src/cas/generic/outBuf.cc index 1d88ef332..96c3637b9 100644 --- a/src/cas/generic/outBuf.cc +++ b/src/cas/generic/outBuf.cc @@ -51,10 +51,7 @@ caStatus outBuf::allocRawMsg ( bufSizeT msgsize, void **ppMsg ) msgsize = CA_MESSAGE_ALIGN ( msgsize ); - this->mutex.lock (); - if ( msgsize > this->bufSize ) { - this->mutex.unlock (); return S_cas_hugeRequest; } @@ -72,7 +69,6 @@ caStatus outBuf::allocRawMsg ( bufSizeT msgsize, void **ppMsg ) // and we will let select() take care of it // if ( this->stack > stackNeeded ) { - this->mutex.unlock(); this->client.sendBlockSignal(); return S_cas_sendBlocked; } @@ -174,8 +170,8 @@ void outBuf::commitMsg () this->commitRawMsg ( hdrSize + payloadSize ); if ( this->client.getDebugLevel() ) { - errlogPrintf ( - "CAS Response => cmd=%d id=%x typ=%d cnt=%d psz=%d avail=%x outBuf ptr=%p \n", + fprintf ( stderr, + "CAS Response: cmd=%d id=%x typ=%d cnt=%d psz=%d avail=%x outBuf ptr=%p \n", epicsNTOH16 ( mp->m_cmmd ), epicsNTOH32 ( mp->m_cid ), epicsNTOH16 ( mp->m_dataType ), elementCount, payloadSize, epicsNTOH32 ( mp->m_available ), static_cast ( mp ) ); @@ -210,8 +206,6 @@ outBufClient::flushCondition outBuf::flush ( bufSizeT spaceRequired ) bufSizeT nBytesRequired; outBufClient::flushCondition cond; - epicsGuard < epicsMutex > guard ( this->mutex ); - if ( this->ctxRecursCount > 0 ) { return outBufClient::flushNone; } @@ -251,7 +245,7 @@ outBufClient::flushCondition outBuf::flush ( bufSizeT spaceRequired ) if ( this->client.getDebugLevel () > 2u ) { char buf[64]; this->client.hostName ( buf, sizeof ( buf ) ); - errlogPrintf ( "CAS: Sent a %d byte reply to %s\n", + fprintf ( stderr, "CAS outgoing: %u byte reply to %s\n", nBytes, buf ); } } diff --git a/src/cas/generic/outBufIL.h b/src/cas/generic/outBufIL.h index 3f1500c83..e149a68b7 100644 --- a/src/cas/generic/outBufIL.h +++ b/src/cas/generic/outBufIL.h @@ -22,7 +22,6 @@ #define epicsExportSharedSymbols #endif - // // outBuf::bytesPresent () // number of bytes in the output queue @@ -34,38 +33,10 @@ inline bufSizeT outBuf::bytesPresent () const // This guarantees that any pushCtx() operation // in progress completes before another thread checks. // - epicsGuard < epicsMutex > locker ( this->mutex ); bufSizeT result = this->stack; return result; } -// -// outBuf::clear () -// -inline void outBuf::clear () -{ - // - // Note on use of lock here: - // This guarantees that any pushCtx() operation - // in progress completes before another thread - // clears. - // - epicsGuard < epicsMutex > locker ( this->mutex ); - this->stack = 0u; -} - -// -// outBuf::allocMsg () -// -// allocates space in the outgoing message buffer -// -// (if space is avilable this leaves the send lock applied) -// -//inline caStatus outBuf::allocMsg (bufSizeT extsize, caHdr **ppMsg) -//{ -// return this->allocRawMsg (extsize + sizeof(caHdr), (void **)ppMsg); -//} - // // outBuf::commitRawMsg() // @@ -73,8 +44,6 @@ inline void outBuf::commitRawMsg (bufSizeT size) { this->stack += size; assert ( this->stack <= this->bufSize ); - - this->mutex.unlock(); } // diff --git a/src/cas/generic/server.h b/src/cas/generic/server.h index 80b1f129c..13498434e 100644 --- a/src/cas/generic/server.h +++ b/src/cas/generic/server.h @@ -222,8 +222,6 @@ public: void removeMsg ( bufSizeT nBytes ); - void clear (); - char * msgPtr () const; // @@ -243,7 +241,6 @@ public: void expandBuffer (); private: - epicsMutex mutex; inBufClient & client; clientBufMemoryManager & memMgr; char * pBuf; @@ -336,10 +333,7 @@ public: const outBufCtx pushCtx ( bufSizeT headerSize, bufSizeT maxBodySize, void *&pHeader ); bufSizeT popCtx ( const outBufCtx & ); // returns actual size - void clear (); - private: - mutable epicsMutex mutex; outBufClient & client; clientBufMemoryManager & memMgr; char * pBuf;