cleaned up locking (also avoid destroying a lock that we own on Linux)
This commit is contained in:
@@ -392,6 +392,7 @@ caStatus casClient::echoAction ()
|
||||
const void * dp = this->ctx.getData();
|
||||
void * pPayloadOut;
|
||||
|
||||
epicsGuard < epicsMutex > guard ( this->mutex );
|
||||
caStatus status = this->out.copyInHeader ( mp->m_cmmd, mp->m_postsize,
|
||||
mp->m_dataType, mp->m_count, mp->m_cid, mp->m_available,
|
||||
& pPayloadOut );
|
||||
@@ -413,6 +414,7 @@ caStatus casClient::versionAction ()
|
||||
// send minor protocol revision to the client
|
||||
void casClient::sendVersion ()
|
||||
{
|
||||
epicsGuard < epicsMutex > guard ( this->mutex );
|
||||
caStatus status = this->out.copyInHeader ( CA_PROTO_VERSION, 0,
|
||||
0, CA_MINOR_PROTOCOL_REVISION, 0, 0, 0 );
|
||||
if ( ! status ) {
|
||||
@@ -486,6 +488,7 @@ caStatus casClient::sendErr ( const caHdrLargeArray *curp, const int reportedSta
|
||||
}
|
||||
|
||||
caHdr * pReqOut;
|
||||
epicsGuard < epicsMutex > guard ( this->mutex );
|
||||
caStatus status = this->out.copyInHeader ( CA_PROTO_ERROR,
|
||||
hdrSize + stringSize, 0, 0, cid, reportedStatus,
|
||||
reinterpret_cast <void **> ( & pReqOut ) );
|
||||
@@ -612,8 +615,8 @@ void casClient::dumpMsg ( const caHdrLargeArray *mp,
|
||||
char pHostName[32];
|
||||
this->hostName ( pHostName, sizeof ( pHostName) );
|
||||
|
||||
errlogPrintf (
|
||||
"CAS: %s on %s at %s: cmd=%d cid=%s typ=%d cnt=%d psz=%d avail=%x\n",
|
||||
fprintf ( stderr,
|
||||
"CAS Request: %s on %s at %s: cmd=%d cid=%s typ=%d cnt=%d psz=%d avail=%x\n",
|
||||
pUserName,
|
||||
pHostName,
|
||||
pName,
|
||||
|
||||
@@ -275,6 +275,7 @@ caStatus casDGClient::searchResponse ( const caHdrLargeArray & msg,
|
||||
}
|
||||
|
||||
ca_uint16_t * pMinorVersion;
|
||||
epicsGuard < epicsMutex > guard ( this->mutex );
|
||||
status = this->out.copyInHeader ( CA_PROTO_SEARCH,
|
||||
sizeof ( *pMinorVersion ), serverPort, 0,
|
||||
serverAddr, msg.m_available,
|
||||
@@ -302,6 +303,7 @@ caStatus casDGClient::searchFailResponse ( const caHdrLargeArray * mp )
|
||||
{
|
||||
int status;
|
||||
|
||||
epicsGuard < epicsMutex > guard ( this->mutex );
|
||||
status = this->out.copyInHeader ( CA_PROTO_NOT_FOUND, 0,
|
||||
mp->m_dataType, mp->m_count, mp->m_cid, mp->m_available, 0 );
|
||||
|
||||
@@ -453,6 +455,8 @@ caStatus casDGClient::asyncSearchResponse ( const caNetAddr & outAddr,
|
||||
return S_cas_success;
|
||||
}
|
||||
|
||||
epicsGuard < epicsMutex > guard ( this->mutex );
|
||||
|
||||
//
|
||||
// start a DG context in the output protocol stream
|
||||
// and grab the send lock
|
||||
@@ -507,6 +511,8 @@ caStatus casDGClient::processDG ()
|
||||
break;
|
||||
}
|
||||
|
||||
epicsGuard < epicsMutex > guard ( this->mutex );
|
||||
|
||||
//
|
||||
// start a DG context in the output protocol stream
|
||||
// and grab the send lock
|
||||
|
||||
@@ -229,6 +229,8 @@ caStatus casStrmClient::readResponse ( casChannelI * pChan, const caHdrLargeArra
|
||||
return this->sendErrWithEpicsStatus ( & msg, status, ECA_GETFAIL );
|
||||
}
|
||||
|
||||
epicsGuard < epicsMutex > guard ( this->mutex );
|
||||
|
||||
void *pPayload;
|
||||
{
|
||||
unsigned payloadSize = dbr_size_n ( msg.m_dataType, msg.m_count );
|
||||
@@ -350,6 +352,8 @@ caStatus casStrmClient::readNotifyResponse ( casChannelI * pChan,
|
||||
return this->readNotifyFailureResponse ( msg, ECA_GETFAIL );
|
||||
}
|
||||
|
||||
epicsGuard < epicsMutex > guard ( this->mutex );
|
||||
|
||||
void *pPayload;
|
||||
{
|
||||
unsigned size = dbr_size_n ( msg.m_dataType, msg.m_count );
|
||||
@@ -398,6 +402,7 @@ caStatus casStrmClient::readNotifyResponse ( casChannelI * pChan,
|
||||
//
|
||||
caStatus casStrmClient::readNotifyFailureResponse ( const caHdrLargeArray & msg, const caStatus ECA_XXXX )
|
||||
{
|
||||
epicsGuard < epicsMutex > guard ( this->mutex );
|
||||
assert ( ECA_XXXX != ECA_NORMAL );
|
||||
void *pPayload;
|
||||
unsigned size = dbr_size_n ( msg.m_dataType, msg.m_count );
|
||||
@@ -519,6 +524,7 @@ static smartGDDPointer createDBRDD ( unsigned dbrType, unsigned elemCount )
|
||||
caStatus casStrmClient::monitorFailureResponse ( const caHdrLargeArray & msg,
|
||||
const caStatus ECA_XXXX )
|
||||
{
|
||||
epicsGuard < epicsMutex > guard ( this->mutex );
|
||||
assert ( ECA_XXXX != ECA_NORMAL );
|
||||
void *pPayload;
|
||||
unsigned size = dbr_size_n ( msg.m_dataType, msg.m_count );
|
||||
@@ -538,6 +544,7 @@ caStatus casStrmClient::monitorFailureResponse ( const caHdrLargeArray & msg,
|
||||
caStatus casStrmClient::monitorResponse ( casChannelI & chan, const caHdrLargeArray & msg,
|
||||
const smartConstGDDPointer & pDesc, const caStatus completionStatus )
|
||||
{
|
||||
epicsGuard < epicsMutex > guard ( this->mutex );
|
||||
void * pPayload;
|
||||
{
|
||||
ca_uint32_t size = dbr_size_n ( msg.m_dataType, msg.m_count );
|
||||
@@ -788,6 +795,7 @@ caStatus casStrmClient::writeNotifyResponse(
|
||||
caStatus casStrmClient::writeNotifyResponseECA_XXX (
|
||||
const caHdrLargeArray & msg, const caStatus ecaStatus )
|
||||
{
|
||||
epicsGuard < epicsMutex > guard ( this->mutex );
|
||||
caStatus status = out.copyInHeader ( msg.m_cmmd, 0,
|
||||
msg.m_dataType, msg.m_count, ecaStatus,
|
||||
msg.m_available, 0 );
|
||||
@@ -1087,6 +1095,8 @@ caStatus casStrmClient::createChanResponse ( const caHdrLargeArray & hdr, const
|
||||
caStatus casStrmClient::enumPostponedCreateChanResponse (
|
||||
casChannelI & chan, const caHdrLargeArray & hdr, unsigned nativeTypeDBR )
|
||||
{
|
||||
epicsGuard < epicsMutex > guard ( this->mutex );
|
||||
|
||||
//
|
||||
// We are allocating enough space for both the claim
|
||||
// response and the access rights response so that we know for
|
||||
@@ -1163,10 +1173,11 @@ caStatus casStrmClient::channelCreateFailedResp (
|
||||
errMessage( S_cas_badParameter,
|
||||
"- or S_casApp_asyncCompletion was async IO competion code ?");
|
||||
}
|
||||
else {
|
||||
else if ( status != S_casApp_pvNotFound ) {
|
||||
errMessage ( createStatus, "- Server unable to create a new PV");
|
||||
}
|
||||
if ( CA_V46( this->minor_version_number ) ) {
|
||||
epicsGuard < epicsMutex > guard ( this->mutex );
|
||||
status = this->out.copyInHeader ( CA_PROTO_CLAIM_CIU_FAILED, 0,
|
||||
0, 0, hdr.m_cid, 0, 0 );
|
||||
if ( status ) {
|
||||
@@ -1197,7 +1208,7 @@ caStatus casStrmClient::disconnectChan ( caResId id )
|
||||
caStatus createStatus;
|
||||
|
||||
if ( CA_V47 ( this->minor_version_number ) ) {
|
||||
|
||||
epicsGuard < epicsMutex > guard ( this->mutex );
|
||||
status = this->out.copyInHeader ( CA_PROTO_SERVER_DISCONN, 0,
|
||||
0, 0, id, 0, 0 );
|
||||
if ( status ) {
|
||||
@@ -1359,6 +1370,7 @@ caStatus casStrmClient::clearChannelAction ()
|
||||
//
|
||||
// send delete confirmed message
|
||||
//
|
||||
epicsGuard < epicsMutex > guard ( this->mutex );
|
||||
status = this->out.copyInHeader ( mp->m_cmmd, 0,
|
||||
mp->m_dataType, mp->m_count,
|
||||
mp->m_cid, mp->m_available, 0 );
|
||||
@@ -1397,6 +1409,7 @@ caStatus casStrmClient::eventCancelAction ()
|
||||
return S_cas_badResourceId;
|
||||
}
|
||||
|
||||
epicsGuard < epicsMutex > guard ( this->mutex );
|
||||
int status = this->out.copyInHeader (
|
||||
CA_PROTO_EVENT_ADD, 0,
|
||||
mp->m_dataType, mp->m_count,
|
||||
@@ -1440,6 +1453,8 @@ caStatus casStrmClient::noReadAccessEvent(casClientMon *pMon)
|
||||
falseReply.m_cid = pMon->getChannel().getCID();
|
||||
falseReply.m_available = pMon->getClientId();
|
||||
|
||||
epicsGuard < epicsMutex > guard ( this->mutex );
|
||||
|
||||
status = this->allocMsg(size, &reply);
|
||||
if ( status ) {
|
||||
if( status == S_cas_hugeRequest ) {
|
||||
@@ -1479,20 +1494,19 @@ caStatus casStrmClient::readSyncAction()
|
||||
const caHdrLargeArray *mp = this->ctx.getMsg();
|
||||
int status;
|
||||
|
||||
epicsGuard < casCoreClient > guard ( * this );
|
||||
|
||||
//
|
||||
// This messages indicates that the client
|
||||
// timed out on a read so we must clear out
|
||||
// any pending asynchronous IO associated with
|
||||
// a read.
|
||||
//
|
||||
{
|
||||
epicsGuard < casCoreClient > guard ( * this );
|
||||
tsDLIter <casChannelI> iter = this->chanList.firstIter ();
|
||||
while ( iter.valid () ) {
|
||||
iter->clearOutstandingReads ();
|
||||
++iter;
|
||||
}
|
||||
}
|
||||
tsDLIter <casChannelI> iter = this->chanList.firstIter ();
|
||||
while ( iter.valid () ) {
|
||||
iter->clearOutstandingReads ();
|
||||
++iter;
|
||||
}
|
||||
|
||||
status = this->out.copyInHeader ( mp->m_cmmd, 0,
|
||||
mp->m_dataType, mp->m_count,
|
||||
@@ -1533,6 +1547,7 @@ caStatus casStrmClient::accessRightsResponse(casChannelI *pciu)
|
||||
ar |= CA_PROTO_ACCESS_RIGHT_WRITE;
|
||||
}
|
||||
|
||||
epicsGuard < epicsMutex > guard ( this->mutex );
|
||||
status = this->out.copyInHeader ( CA_PROTO_ACCESS_RIGHTS, 0,
|
||||
0, 0, pciu->getCID(), ar, 0 );
|
||||
if ( ! status ) {
|
||||
@@ -1952,6 +1967,7 @@ unsigned casStrmClient::getDebugLevel() const
|
||||
|
||||
void casStrmClient::flush ()
|
||||
{
|
||||
epicsGuard < epicsMutex > guard ( this->mutex );
|
||||
this->out.flush ();
|
||||
}
|
||||
|
||||
|
||||
@@ -102,7 +102,7 @@ inBufClient::fillCondition inBuf::fill ( inBufClient::fillParameter parm )
|
||||
|
||||
this->client.hostName ( buf, sizeof ( buf ) );
|
||||
|
||||
printf ("CAS: incoming %u byte msg from %s\n",
|
||||
fprintf ( stderr, "CAS Incoming: %u byte msg from %s\n",
|
||||
bytesRecv, buf);
|
||||
}
|
||||
}
|
||||
@@ -139,7 +139,6 @@ const inBufCtx inBuf::pushCtx ( bufSizeT headerSize, // X aCC 361
|
||||
bufSizeT inBuf::popCtx ( const inBufCtx &ctx ) // X aCC 361
|
||||
{
|
||||
if ( ctx.stat==inBufCtx::pushCtxSuccess ) {
|
||||
epicsGuard < epicsMutex > guard ( this->mutex );
|
||||
bufSizeT bytesRemoved = this->nextReadIndex;
|
||||
this->pBuf = ctx.pBuf;
|
||||
this->bufSize = ctx.bufSize;
|
||||
|
||||
@@ -41,15 +41,6 @@ inline bool inBuf::full () const
|
||||
return false;
|
||||
}
|
||||
|
||||
//
|
||||
// inBuf::clear()
|
||||
//
|
||||
inline void inBuf::clear ()
|
||||
{
|
||||
this->bytesInBuffer = 0u;
|
||||
this->nextReadIndex = 0u;
|
||||
}
|
||||
|
||||
//
|
||||
// inBuf::msgPtr()
|
||||
//
|
||||
|
||||
@@ -51,10 +51,7 @@ caStatus outBuf::allocRawMsg ( bufSizeT msgsize, void **ppMsg )
|
||||
|
||||
msgsize = CA_MESSAGE_ALIGN ( msgsize );
|
||||
|
||||
this->mutex.lock ();
|
||||
|
||||
if ( msgsize > this->bufSize ) {
|
||||
this->mutex.unlock ();
|
||||
return S_cas_hugeRequest;
|
||||
}
|
||||
|
||||
@@ -72,7 +69,6 @@ caStatus outBuf::allocRawMsg ( bufSizeT msgsize, void **ppMsg )
|
||||
// and we will let select() take care of it
|
||||
//
|
||||
if ( this->stack > stackNeeded ) {
|
||||
this->mutex.unlock();
|
||||
this->client.sendBlockSignal();
|
||||
return S_cas_sendBlocked;
|
||||
}
|
||||
@@ -174,8 +170,8 @@ void outBuf::commitMsg ()
|
||||
this->commitRawMsg ( hdrSize + payloadSize );
|
||||
|
||||
if ( this->client.getDebugLevel() ) {
|
||||
errlogPrintf (
|
||||
"CAS Response => cmd=%d id=%x typ=%d cnt=%d psz=%d avail=%x outBuf ptr=%p \n",
|
||||
fprintf ( stderr,
|
||||
"CAS Response: cmd=%d id=%x typ=%d cnt=%d psz=%d avail=%x outBuf ptr=%p \n",
|
||||
epicsNTOH16 ( mp->m_cmmd ), epicsNTOH32 ( mp->m_cid ),
|
||||
epicsNTOH16 ( mp->m_dataType ), elementCount, payloadSize,
|
||||
epicsNTOH32 ( mp->m_available ), static_cast <const void *> ( mp ) );
|
||||
@@ -210,8 +206,6 @@ outBufClient::flushCondition outBuf::flush ( bufSizeT spaceRequired )
|
||||
bufSizeT nBytesRequired;
|
||||
outBufClient::flushCondition cond;
|
||||
|
||||
epicsGuard < epicsMutex > guard ( this->mutex );
|
||||
|
||||
if ( this->ctxRecursCount > 0 ) {
|
||||
return outBufClient::flushNone;
|
||||
}
|
||||
@@ -251,7 +245,7 @@ outBufClient::flushCondition outBuf::flush ( bufSizeT spaceRequired )
|
||||
if ( this->client.getDebugLevel () > 2u ) {
|
||||
char buf[64];
|
||||
this->client.hostName ( buf, sizeof ( buf ) );
|
||||
errlogPrintf ( "CAS: Sent a %d byte reply to %s\n",
|
||||
fprintf ( stderr, "CAS outgoing: %u byte reply to %s\n",
|
||||
nBytes, buf );
|
||||
}
|
||||
}
|
||||
|
||||
@@ -22,7 +22,6 @@
|
||||
#define epicsExportSharedSymbols
|
||||
#endif
|
||||
|
||||
|
||||
//
|
||||
// outBuf::bytesPresent ()
|
||||
// number of bytes in the output queue
|
||||
@@ -34,38 +33,10 @@ inline bufSizeT outBuf::bytesPresent () const
|
||||
// This guarantees that any pushCtx() operation
|
||||
// in progress completes before another thread checks.
|
||||
//
|
||||
epicsGuard < epicsMutex > locker ( this->mutex );
|
||||
bufSizeT result = this->stack;
|
||||
return result;
|
||||
}
|
||||
|
||||
//
|
||||
// outBuf::clear ()
|
||||
//
|
||||
inline void outBuf::clear ()
|
||||
{
|
||||
//
|
||||
// Note on use of lock here:
|
||||
// This guarantees that any pushCtx() operation
|
||||
// in progress completes before another thread
|
||||
// clears.
|
||||
//
|
||||
epicsGuard < epicsMutex > locker ( this->mutex );
|
||||
this->stack = 0u;
|
||||
}
|
||||
|
||||
//
|
||||
// outBuf::allocMsg ()
|
||||
//
|
||||
// allocates space in the outgoing message buffer
|
||||
//
|
||||
// (if space is avilable this leaves the send lock applied)
|
||||
//
|
||||
//inline caStatus outBuf::allocMsg (bufSizeT extsize, caHdr **ppMsg)
|
||||
//{
|
||||
// return this->allocRawMsg (extsize + sizeof(caHdr), (void **)ppMsg);
|
||||
//}
|
||||
|
||||
//
|
||||
// outBuf::commitRawMsg()
|
||||
//
|
||||
@@ -73,8 +44,6 @@ inline void outBuf::commitRawMsg (bufSizeT size)
|
||||
{
|
||||
this->stack += size;
|
||||
assert ( this->stack <= this->bufSize );
|
||||
|
||||
this->mutex.unlock();
|
||||
}
|
||||
|
||||
//
|
||||
|
||||
@@ -222,8 +222,6 @@ public:
|
||||
|
||||
void removeMsg ( bufSizeT nBytes );
|
||||
|
||||
void clear ();
|
||||
|
||||
char * msgPtr () const;
|
||||
|
||||
//
|
||||
@@ -243,7 +241,6 @@ public:
|
||||
void expandBuffer ();
|
||||
|
||||
private:
|
||||
epicsMutex mutex;
|
||||
inBufClient & client;
|
||||
clientBufMemoryManager & memMgr;
|
||||
char * pBuf;
|
||||
@@ -336,10 +333,7 @@ public:
|
||||
const outBufCtx pushCtx ( bufSizeT headerSize, bufSizeT maxBodySize, void *&pHeader );
|
||||
bufSizeT popCtx ( const outBufCtx & ); // returns actual size
|
||||
|
||||
void clear ();
|
||||
|
||||
private:
|
||||
mutable epicsMutex mutex;
|
||||
outBufClient & client;
|
||||
clientBufMemoryManager & memMgr;
|
||||
char * pBuf;
|
||||
|
||||
Reference in New Issue
Block a user