From d611fa2d75d7c3f648470004cdd1973e283be9ed Mon Sep 17 00:00:00 2001 From: Jeff Hill Date: Mon, 16 Sep 1996 18:27:51 +0000 Subject: [PATCH] vxWorks port changes --- src/cas/generic/caServer.cc | 6 ++ src/cas/generic/caServerI.cc | 29 +++++--- src/cas/generic/caServerIIL.h | 11 +-- src/cas/generic/casAsyncIOIIL.h | 7 +- src/cas/generic/casChannelIIL.h | 7 +- src/cas/generic/casCoreClient.cc | 7 ++ src/cas/generic/casDGClient.cc | 37 +++++----- src/cas/generic/casEventSys.cc | 23 ++++--- src/cas/generic/casEventSysIL.h | 7 +- src/cas/generic/casInternal.h | 8 ++- src/cas/generic/casMonitor.cc | 57 +++++++--------- src/cas/generic/casMsgIO.cc | 46 ++++++++++--- src/cas/generic/casPVIIL.h | 37 ++++++++-- src/cas/generic/casStrmClient.cc | 42 +++++++----- src/cas/generic/outBuf.cc | 102 ++++++++++++++++++---------- src/cas/generic/server.h | 84 +++++++++++++++++------ src/cas/io/bsdSocket/casDGIO.cc | 4 +- src/cas/io/bsdSocket/casIOD.h | 7 +- src/cas/io/bsdSocket/casStreamIO.cc | 52 ++++++++------ src/cas/os/posix/casDGOS.cc | 5 +- src/cas/os/posix/osiMutex.h | 5 +- src/cas/os/vxWorks/caServerOS.cc | 7 +- src/cas/os/vxWorks/casDGOS.cc | 14 ++-- src/cas/os/vxWorks/casOSD.h | 7 +- src/cas/os/vxWorks/casStreamOS.cc | 33 ++++----- src/cas/os/vxWorks/osiMutex.h | 68 ++++++++++++++++++- 26 files changed, 481 insertions(+), 231 deletions(-) diff --git a/src/cas/generic/caServer.cc b/src/cas/generic/caServer.cc index 754093540..e247326e8 100644 --- a/src/cas/generic/caServer.cc +++ b/src/cas/generic/caServer.cc @@ -29,6 +29,9 @@ * * History * $Log$ + * Revision 1.2 1996/06/21 02:30:52 jhill + * solaris port + * * Revision 1.1.1.1 1996/06/20 00:28:14 jhill * ca server installation * @@ -115,6 +118,9 @@ void caServer::show(unsigned level) if (this->pCAS) { this->pCAS->show(level); } + else { + printf("caServer:: no server internals attached\n"); + } } // diff --git a/src/cas/generic/caServerI.cc b/src/cas/generic/caServerI.cc index fe6f7dc24..3dfdf4d3e 100644 --- a/src/cas/generic/caServerI.cc +++ b/src/cas/generic/caServerI.cc @@ -29,6 +29,9 @@ * * History * $Log$ + * Revision 1.4 1996/09/04 20:12:04 jhill + * added arg to serverToolDebugFunc() + * * Revision 1.3 1996/08/13 22:56:12 jhill * added init for mutex class * @@ -61,13 +64,15 @@ void caServerI::show (unsigned level) printf( "Channel Access Server Status V%d.%d\n", CA_PROTOCOL_VERSION, CA_MINOR_VERSION); - this->lock(); + this->osiMutex::show(level); + + this->osiLock(); while ( (pClient = iter()) ) { pClient->show(level); } this->dgClient.show(level); - this->unlock(); + this->osiUnlock(); bytes_reserved = 0u; #if 0 @@ -94,14 +99,14 @@ void caServerI::show (unsigned level) #endif printf( "The server's integer resource id conversion table:\n"); - this->lock(); + this->osiLock(); this->uintResTable::show(level); - this->unlock(); + this->osiUnlock(); printf( "The server's character string resource id conversion table:\n"); - this->lock(); + this->osiLock(); this->stringResTbl.show(level); - this->unlock(); + this->osiUnlock(); } // @@@@@@ caPrintAddrList(&destAddr); @@ -193,7 +198,7 @@ caServerI::~caServerI() { casClient *pClient; - this->lock(); + this->osiLock(); // // delete all clients @@ -202,6 +207,8 @@ caServerI::~caServerI() delete pClient; } + this->osiUnlock(); + // // verify that we didnt leak a PV // @@ -214,9 +221,9 @@ caServerI::~caServerI() // void caServerI::installClient(casStrmClient *pClient) { - this->lock(); + this->osiLock(); this->clientList.add(*pClient); - this->unlock(); + this->osiUnlock(); } @@ -225,9 +232,9 @@ void caServerI::installClient(casStrmClient *pClient) // void caServerI::removeClient(casStrmClient *pClient) { - this->lock(); + this->osiLock(); this->clientList.remove(*pClient); - this->unlock(); + this->osiUnlock(); } diff --git a/src/cas/generic/caServerIIL.h b/src/cas/generic/caServerIIL.h index 2d1357890..3d1d385e9 100644 --- a/src/cas/generic/caServerIIL.h +++ b/src/cas/generic/caServerIIL.h @@ -29,6 +29,9 @@ * * History * $Log$ + * Revision 1.1.1.1 1996/06/20 00:28:16 jhill + * ca server installation + * * */ @@ -110,10 +113,10 @@ inline void caServerI::installPV (casPVI &pv) { int resLibStatus; - this->lock (); + this->osiLock (); this->pvCount++; resLibStatus = this->stringResTbl.add (pv); - this->unlock (); + this->osiUnlock (); assert (resLibStatus==0); } @@ -124,11 +127,11 @@ inline void caServerI::removePV(casPVI &pv) { casPVI *pPV; - this->lock(); + this->osiLock(); casVerify (this->pvCount>=1u); this->pvCount--; pPV = this->stringResTbl.remove (pv); - this->unlock(); + this->osiUnlock(); casVerify (pPV!=0); casVerify (pPV==&pv); } diff --git a/src/cas/generic/casAsyncIOIIL.h b/src/cas/generic/casAsyncIOIIL.h index 112a4c7a5..73b2fecf6 100644 --- a/src/cas/generic/casAsyncIOIIL.h +++ b/src/cas/generic/casAsyncIOIIL.h @@ -29,6 +29,9 @@ * * History * $Log$ + * Revision 1.2 1996/09/04 20:16:24 jhill + * moved operator -> here + * * Revision 1.1.1.1 1996/06/20 00:28:16 jhill * ca server installation * @@ -44,7 +47,7 @@ // inline void casAsyncIOI::lock() { - client.lock(); + client.osiLock(); } // @@ -52,7 +55,7 @@ inline void casAsyncIOI::lock() // inline void casAsyncIOI::unlock() { - client.unlock(); + client.osiUnlock(); } // diff --git a/src/cas/generic/casChannelIIL.h b/src/cas/generic/casChannelIIL.h index a5d425e2a..78585290d 100644 --- a/src/cas/generic/casChannelIIL.h +++ b/src/cas/generic/casChannelIIL.h @@ -29,6 +29,9 @@ * * History * $Log$ + * Revision 1.3 1996/09/04 20:18:27 jhill + * moved operator -> here + * * Revision 1.2 1996/07/01 19:56:10 jhill * one last update prior to first release * @@ -55,7 +58,7 @@ inline casChannel * casChannelI::operator -> () // inline void casChannelI::lock() { - this->client.lock(); + this->client.osiLock(); } // @@ -63,7 +66,7 @@ inline void casChannelI::lock() // inline void casChannelI::unlock() { - this->client.unlock(); + this->client.osiUnlock(); } // diff --git a/src/cas/generic/casCoreClient.cc b/src/cas/generic/casCoreClient.cc index 0b83db437..6ab0c9805 100644 --- a/src/cas/generic/casCoreClient.cc +++ b/src/cas/generic/casCoreClient.cc @@ -29,6 +29,9 @@ * * History * $Log$ + * Revision 1.2 1996/08/13 22:53:14 jhill + * changes for MVC++ + * * Revision 1.1.1.1 1996/06/20 00:28:16 jhill * ca server installation * @@ -107,6 +110,8 @@ casCoreClient::~casCoreClient() ca_printf ("CAS: Connection Terminated\n"); } + this->osiLock(); + // // cancel any pending asynchronous IO // @@ -120,6 +125,8 @@ casCoreClient::~casCoreClient() delete pCurIO; pCurIO = pNextIO; } + + this->osiUnlock(); } // diff --git a/src/cas/generic/casDGClient.cc b/src/cas/generic/casDGClient.cc index be265f10a..33cdc162c 100644 --- a/src/cas/generic/casDGClient.cc +++ b/src/cas/generic/casDGClient.cc @@ -29,6 +29,9 @@ * * History * $Log$ + * Revision 1.5 1996/09/04 20:19:47 jhill + * added missing byte swap on search reply port no + * * Revision 1.4 1996/08/13 22:54:20 jhill * fixed little endian problem * @@ -311,7 +314,8 @@ void casDGClient::process() // force all replies to be sent to the client // that made the request // - this->clear(); + this->inBuf::clear(); + this->outBuf::clear(); // // read in new input @@ -319,12 +323,11 @@ void casDGClient::process() fillCond = this->fill(); if (fillCond == casFillDisconnect) { casVerify(0); - return; } // // verify that we have a message to process // - if (this->inBuf::bytesPresent()>0u) { + else if (this->inBuf::bytesPresent()>0u) { // // process the message // @@ -332,22 +335,20 @@ void casDGClient::process() if (status) { errMessage (status, "unexpected error processing stateless protocol"); - // - // clear the input buffer so this will - // not effect future input - // - this->clear(); - } - else { - // - // force all replies to go to the sender - // - flushCond = this->flush(); - if (flushCond!=casFlushCompleted) { - this->clear(); - casVerify(0); - } } + // + // force all replies to go to the sender + // + flushCond = this->flush(); + if (flushCond!=casFlushCompleted) { + casVerify(0); + } } + // + // clear the input/output buffers so replies + // are always sent to the sender of the request + // + this->inBuf::clear(); + this->outBuf::clear(); } diff --git a/src/cas/generic/casEventSys.cc b/src/cas/generic/casEventSys.cc index 5ce4d61c4..7d84d5aa0 100644 --- a/src/cas/generic/casEventSys.cc +++ b/src/cas/generic/casEventSys.cc @@ -29,6 +29,9 @@ * * History * $Log$ + * Revision 1.2 1996/07/24 22:00:49 jhill + * added pushOnToEventQueue() + * * Revision 1.1.1.1 1996/06/20 00:28:15 jhill * ca server installation * @@ -79,16 +82,18 @@ casEventSys::~casEventSys() { casEvent *pE; - this->mutex.lock(); - /* - * They must cancel all active event blocks first + * all active event blocks must be canceled first */ assert (this->numEventBlocks==0); + this->mutex.osiLock(); + while ( (pE = this->eventLogQue.get()) ) { delete pE; } + + this->mutex.osiUnlock(); } @@ -97,10 +102,10 @@ casEventSys::~casEventSys() // void casEventSys::installMonitor() { - this->mutex.lock(); + this->mutex.osiLock(); this->numEventBlocks++; this->maxLogEntries += averageEventEntries; - this->mutex.unlock(); + this->mutex.osiUnlock(); } // @@ -108,11 +113,11 @@ void casEventSys::installMonitor() // void casEventSys::removeMonitor() { - this->mutex.lock(); + this->mutex.osiLock(); assert (this->numEventBlocks>=1u); this->numEventBlocks--; this->maxLogEntries -= averageEventEntries; - this->mutex.unlock(); + this->mutex.osiUnlock(); } @@ -126,7 +131,7 @@ casProcCond casEventSys::process() casProcCond cond = casProcOk; unsigned long nAccepted = 0u; - this->mutex.lock(); + this->mutex.osiLock(); while ( (pEvent = this->eventLogQue.get()) ) { @@ -166,7 +171,7 @@ casProcCond casEventSys::process() this->coreClient.eventFlush(); } - this->mutex.unlock(); + this->mutex.osiUnlock(); return cond; } diff --git a/src/cas/generic/casEventSysIL.h b/src/cas/generic/casEventSysIL.h index 43631edc9..1ac47f25f 100644 --- a/src/cas/generic/casEventSysIL.h +++ b/src/cas/generic/casEventSysIL.h @@ -29,6 +29,9 @@ * * History * $Log$ + * Revision 1.1.1.1 1996/06/20 00:28:16 jhill + * ca server installation + * * */ @@ -41,9 +44,9 @@ // inline void casEventSys::addToEventQueue(casEvent &event) { - this->mutex.lock(); + this->mutex.osiLock(); this->eventLogQue.add(event); - this->mutex.unlock(); + this->mutex.osiUnlock(); // // wakes up the event queue consumer // diff --git a/src/cas/generic/casInternal.h b/src/cas/generic/casInternal.h index 9fec83843..74bc33570 100644 --- a/src/cas/generic/casInternal.h +++ b/src/cas/generic/casInternal.h @@ -29,6 +29,9 @@ * * History * $Log$ + * Revision 1.6 1996/09/04 20:21:41 jhill + * removed operator -> and added member pv + * * Revision 1.5 1996/07/01 19:56:11 jhill * one last update prior to first release * @@ -560,13 +563,14 @@ public: inline aitBool okToBeginNewIO() const; - inline void lock(); - inline void unlock(); private: tsDLList chanList; caServerI &cas; casPV &pv; unsigned nMonAttached; unsigned nIOAttached; + + inline void lock(); + inline void unlock(); }; diff --git a/src/cas/generic/casMonitor.cc b/src/cas/generic/casMonitor.cc index 303aad7ae..e38c16256 100644 --- a/src/cas/generic/casMonitor.cc +++ b/src/cas/generic/casMonitor.cc @@ -29,6 +29,9 @@ * * History * $Log$ + * Revision 1.4 1996/07/24 22:00:49 jhill + * added pushOnToEventQueue() + * * Revision 1.3 1996/07/01 19:56:11 jhill * one last update prior to first release * @@ -82,7 +85,7 @@ casMonitor::~casMonitor() { casCoreClient &client = this->ciu.getClient(); - this->mutex.lock(); + this->mutex.osiLock(); this->disable(); @@ -97,6 +100,8 @@ casMonitor::~casMonitor() this->pModifiedValue = NULL; } this->ciu.deleteMonitor(*this); + + this->mutex.osiUnlock(); } // @@ -106,24 +111,16 @@ void casMonitor::enable() { caStatus status; - this->mutex.lock(); - if (this->enabled) { - this->mutex.unlock(); - return; + this->mutex.osiLock(); + if (!this->enabled && this->ciu->readAccess()) { + this->enabled = TRUE; + status = this->ciu.getPVI().registerEvent(); + if (status) { + errMessage(status, + "Server tool failed to register event\n"); + } } - - if (!this->ciu->readAccess()) { - this->mutex.unlock(); - return; - } - - this->enabled = TRUE; - status = this->ciu.getPVI().registerEvent(); - if (status) { - errMessage(status, - "Server tool failed to register event\n"); - } - this->mutex.unlock(); + this->mutex.osiUnlock(); } // @@ -131,14 +128,12 @@ void casMonitor::enable() // void casMonitor::disable() { - this->mutex.lock(); - if (!this->enabled) { - this->mutex.unlock(); - return; + this->mutex.osiLock(); + if (this->enabled) { + this->enabled = FALSE; + this->ciu.getPVI().unregisterEvent(); } - this->enabled = FALSE; - this->ciu.getPVI().unregisterEvent(); - this->mutex.unlock(); + this->mutex.osiUnlock(); } // @@ -150,7 +145,7 @@ void casMonitor::push(gdd &newValue) casMonEvent *pLog = NULL; char full; - this->mutex.lock(); + this->mutex.osiLock(); // // get a new block if we havent exceeded quotas @@ -205,7 +200,7 @@ void casMonitor::push(gdd &newValue) client.addToEventQueue(*pLog); - this->mutex.unlock(); + this->mutex.osiUnlock(); } // @@ -219,7 +214,7 @@ caStatus casMonitor::executeEvent(casMonEvent *pEV) pVal = pEV->getValue (); assert (pVal); - this->mutex.lock(); + this->mutex.osiLock(); if (this->ciu.getClient().getEventsOff()==aitFalse) { status = this->callBack (*pVal); } @@ -236,7 +231,7 @@ caStatus casMonitor::executeEvent(casMonEvent *pEV) this->pModifiedValue = pVal; status = S_cas_success; } - this->mutex.unlock(); + this->mutex.osiUnlock(); // // if the event isnt accepted we will try @@ -285,12 +280,12 @@ void casMonitor::show(unsigned level) // void casMonitor::postIfModified() { - this->mutex.lock(); + this->mutex.osiLock(); if (this->pModifiedValue) { this->callBack (*this->pModifiedValue); this->pModifiedValue->unreference (); this->pModifiedValue = NULL; } - this->mutex.unlock(); + this->mutex.osiUnlock(); } diff --git a/src/cas/generic/casMsgIO.cc b/src/cas/generic/casMsgIO.cc index b2e31ad22..7f2391898 100644 --- a/src/cas/generic/casMsgIO.cc +++ b/src/cas/generic/casMsgIO.cc @@ -29,6 +29,9 @@ * * History * $Log$ + * Revision 1.1.1.1 1996/06/20 00:28:15 jhill + * ca server installation + * * */ @@ -39,8 +42,9 @@ casMsgIO::casMsgIO() { - elapsedAtLastSend = this->elapsedAtLastRecv + this->elapsedAtLastSend = this->elapsedAtLastRecv = osiTime::getCurrent (); + this->blockingStatus = xIsBlocking; } casMsgIO::~casMsgIO() @@ -81,15 +85,41 @@ xRecvStatus casMsgIO::xRecv(char *pBuf, bufSizeT nBytes, bufSizeT &nActualBytes) return stat; } -xSendStatus casMsgIO::xSend(char *pBuf, bufSizeT nBytes, bufSizeT &nActualBytes) +xSendStatus casMsgIO::xSend(char *pBuf, bufSizeT nBytesAvailableToSend, + bufSizeT nBytesNeedToBeSent, bufSizeT &nActualBytes) { xSendStatus stat; + bufSizeT nActualBytesDelta; - stat = this->osdSend(pBuf, nBytes, nActualBytes); - if (stat == xSendOK) { - this->elapsedAtLastSend = osiTime::getCurrent(); + assert (nBytesAvailableToSend>=nBytesNeedToBeSent); + + nActualBytes = 0u; + if (this->blockingStatus == xIsntBlocking) { + stat = this->osdSend(pBuf, nBytesAvailableToSend, + nActualBytes); + if (stat == xSendOK) { + this->elapsedAtLastSend = osiTime::getCurrent(); + } + return stat; } - return stat; + + while (nBytesNeedToBeSent) { + stat = this->osdSend(pBuf, nBytesAvailableToSend, + nActualBytesDelta); + if (stat != xSendOK) { + return stat; + } + + this->elapsedAtLastSend = osiTime::getCurrent(); + nActualBytes += nActualBytesDelta; + if (nBytesNeedToBeSent>nActualBytesDelta) { + nBytesNeedToBeSent -= nActualBytesDelta; + } + else { + break; + } + } + return xSendOK; } void casMsgIO::sendBeacon(char & /*msg*/, bufSizeT /*length*/, @@ -103,9 +133,9 @@ int casMsgIO::getFileDescriptor() const return -1; // some os will not have file descriptors } -void casMsgIO::setNonBlocking() +void casMsgIO::xSetNonBlocking() { - printf("virtual base setNonBlocking() called?\n"); + printf("virtual base casMsgIO::xSetNonBlocking() called?\n"); } bufSizeT casMsgIO::incommingBytesPresent() const diff --git a/src/cas/generic/casPVIIL.h b/src/cas/generic/casPVIIL.h index bd5702fd1..f17d1646d 100644 --- a/src/cas/generic/casPVIIL.h +++ b/src/cas/generic/casPVIIL.h @@ -29,6 +29,9 @@ * * History * $Log$ + * Revision 1.5 1996/09/04 20:23:59 jhill + * added operator -> + * * Revision 1.4 1996/07/01 19:56:13 jhill * one last update prior to first release * @@ -75,7 +78,13 @@ casPV * casPVI::operator -> () const // inline void casPVI::lock() { - this->cas.lock(); + // + // NOTE: + // if this lock becomes something else besides the + // server's lock then look carefully at the + // comment in casPVI::deleteSignal() + // + this->cas.osiLock(); } // @@ -83,7 +92,7 @@ inline void casPVI::lock() // inline void casPVI::unlock() { - this->cas.unlock(); + this->cas.osiUnlock(); } // @@ -148,13 +157,29 @@ inline void casPVI::unregisterIO() // inline void casPVI::deleteSignal() { - this->lock(); + caServerI &localCASRef(this->cas); + + // + // We dont take the PV lock here because + // the PV may be destroyed and we must + // keep the lock unlock pairs consistent + // (because the PV's lock is really a ref + // to the server's lock) + // + // This is safe to do because we take the PV + // lock when we add a new channel (and the + // PV lock is realy the server's lock) + // + localCASRef.osiLock(); + if (this->chanList.count()==0u) { (*this)->destroy(); + // + // !! dont access self after destroy !! + // } - else { - this->unlock(); - } + + localCASRef.osiUnlock(); } // diff --git a/src/cas/generic/casStrmClient.cc b/src/cas/generic/casStrmClient.cc index 993c8e047..89c9c3436 100644 --- a/src/cas/generic/casStrmClient.cc +++ b/src/cas/generic/casStrmClient.cc @@ -29,6 +29,10 @@ * * History * $Log$ + * Revision 1.9 1996/09/04 20:25:53 jhill + * use correct app type for exist test gdd, correct byte + * oder for mon mask, and efficient use of PV name gdd + * * Revision 1.8 1996/08/05 23:22:57 jhill * gddScaler => gddScalar * @@ -214,7 +218,7 @@ casStrmClient::~casStrmClient() delete [] this->pHostName; } - this->lock(); + this->osiLock(); // // delete all channel attached @@ -228,6 +232,10 @@ casStrmClient::~casStrmClient() pChan->clientDestroy(); pChan = pNextChan; } + + delete &this->msgIO; + + this->osiUnlock(); } // @@ -812,7 +820,7 @@ caStatus casStrmClient::hostNameAction() size-1); pMalloc[size-1]='\0'; - this->lock(); + this->osiLock(); if (this->pHostName) { delete [] this->pHostName; @@ -823,7 +831,7 @@ caStatus casStrmClient::hostNameAction() (*pciu)->setOwner(this->pUserName, this->pHostName); } - this->unlock(); + this->osiUnlock(); return S_cas_success; } @@ -857,7 +865,7 @@ caStatus casStrmClient::clientNameAction() size-1); pMalloc[size-1]='\0'; - this->lock(); + this->osiLock(); if (this->pUserName) { delete [] this->pUserName; } @@ -866,7 +874,7 @@ caStatus casStrmClient::clientNameAction() while ( (pciu = iter()) ) { (*pciu)->setOwner(this->pUserName, this->pHostName); } - this->unlock(); + this->osiUnlock(); return S_cas_success; } @@ -1015,11 +1023,11 @@ caStatus casStrmClient::eventsOnAction () // perhaps this is to slow - perhaps there // should be a queue of modified events // - this->lock(); + this->osiLock(); while ( (pciu = iter()) ) { pciu->postAllModifiedEvents(); } - this->unlock(); + this->osiUnlock(); return S_cas_success; } @@ -1639,13 +1647,13 @@ caStatus casStrmClient::createChanResponse(casChannelI *, // prevent problems such as the PV being deleted before the // channel references it // - this->lock(); + this->osiLock(); pCanonicalName->markConstant(); pPV = this->ctx.getServer()->createPV(*pCanonicalName); if (!pPV) { - this->unlock(); + this->osiUnlock(); return this->channelCreateFailed(&msg, S_cas_noMemory); } @@ -1656,12 +1664,12 @@ caStatus casStrmClient::createChanResponse(casChannelI *, pChan = (*pPV)->createChannel(this->ctx, this->pUserName, this->pHostName); if (!pChan) { - this->unlock(); + this->osiUnlock(); pPV->deleteSignal(); return this->channelCreateFailed(&msg, S_cas_noMemory); } - this->unlock(); + this->osiUnlock(); pChanI = (casChannelI *) pChan; @@ -1711,7 +1719,7 @@ casPVI *caServerI::createPV (gdd &name) name.getRef(pNameStr); stringId id (pNameStr->string()); - this->lock (); + this->osiLock (); pPVI = this->stringResTbl.lookup (id); if (!pPVI) { @@ -1726,7 +1734,7 @@ casPVI *caServerI::createPV (gdd &name) // lock shouldnt be released until we finish creating and // installing the PV // - this->unlock (); + this->osiUnlock (); return pPVI; } @@ -1744,10 +1752,10 @@ inline aitBool caServerI::roomForNewChannel() const // void casStrmClient::installChannel(casChannelI &chan) { - this->lock(); + this->osiLock(); this->getCAS().installItem(chan); this->chanList.add(chan); - this->unlock(); + this->osiUnlock(); } // @@ -1757,10 +1765,10 @@ void casStrmClient::removeChannel(casChannelI &chan) { casRes *pRes; - this->lock(); + this->osiLock(); pRes = this->getCAS().removeItem(chan); assert (&chan == (casChannelI *)pRes); this->chanList.remove(chan); - this->unlock(); + this->osiUnlock(); } diff --git a/src/cas/generic/outBuf.cc b/src/cas/generic/outBuf.cc index 85a79dea0..21ca8a128 100644 --- a/src/cas/generic/outBuf.cc +++ b/src/cas/generic/outBuf.cc @@ -29,6 +29,9 @@ * * History * $Log$ + * Revision 1.3 1996/09/04 20:27:01 jhill + * doccasdef.h + * * Revision 1.2 1996/08/13 22:53:59 jhill * fixed little endian problem * @@ -47,13 +50,12 @@ // outBuf::outBuf(casMsgIO &virtualCircuit, osiMutex &mutexIn) : io(virtualCircuit), - mutex(mutexIn) + mutex(mutexIn), + pBuf(NULL), + bufSize(io.optimumBufferSize()), + stack(0u) { assert(&io); - - this->stack = 0u; - this->bufSize = 0u; - this->pBuf = NULL; } // @@ -61,10 +63,8 @@ outBuf::outBuf(casMsgIO &virtualCircuit, osiMutex &mutexIn) : // caStatus outBuf::init() { - this->bufSize = io.optimumBufferSize(); this->pBuf = new char [this->bufSize]; if (!this->pBuf) { - this->bufSize = 0u; return S_cas_noMemory; } return S_cas_success; @@ -94,30 +94,32 @@ caHdr **ppMsg ) { bufSizeT msgsize; + bufSizeT stackNeeded; extsize = CA_MESSAGE_ALIGN(extsize); msgsize = extsize + sizeof(caHdr); - if (msgsize>this->bufSize) { + if (msgsize>this->bufSize || this->pBuf==NULL) { return S_cas_hugeRequest; } + stackNeeded = this->bufSize - msgsize; - this->mutex.lock(); + this->mutex.osiLock(); - if (this->stack + msgsize > this->bufSize) { + if (this->stack > stackNeeded) { /* * Try to flush the output queue */ - this->flush(); + this->flush(casFlushSpecified, msgsize); /* * If this failed then the fd is nonblocking * and we will let select() take care of it */ - if (this->stack + msgsize > this->bufSize) { - this->mutex.unlock(); + if (this->stack > stackNeeded) { + this->mutex.osiUnlock(); this->sendBlockSignal(); return S_cas_sendBlocked; } @@ -183,43 +185,69 @@ void outBuf::commitMsg () this->stack += sizeof(caHdr) + extSize; assert (this->stack <= this->bufSize); - this->mutex.unlock(); + this->mutex.osiUnlock(); } // // outBuf::flush() // -casFlushCondition outBuf::flush() +casFlushCondition outBuf::flush(casFlushRequest req, + bufSizeT spaceRequired) { - bufSizeT nBytes; - xSendStatus stat; + bufSizeT nBytes; + bufSizeT stackNeeded; + bufSizeT nBytesNeeded; + xSendStatus stat; + casFlushCondition cond; - if (this->stack<=0u) { - return casFlushCompleted; - } - - stat = this->io.xSend(this->pBuf, this->stack, nBytes); - if (stat!=xSendOK) { + if (!this->pBuf) { return casFlushDisconnect; } - else if (nBytes >= this->stack) { - this->stack=0u; - return casFlushCompleted; - } - else if (nBytes) { - bufSizeT len; - len = this->stack-nBytes; - // - // memmove() is ok with overlapping buffers - // - memmove (this->pBuf, &this->pBuf[nBytes], len); - this->stack = len; - return casFlushPartial; + this->mutex.osiLock(); + + if (req==casFlushAll) { + nBytesNeeded = this->stack; } else { - return casFlushNone; + stackNeeded = this->bufSize - spaceRequired; + if (this->stack>stackNeeded) { + nBytesNeeded = this->stack - stackNeeded; + } + else { + nBytesNeeded = 0u; + } } + + stat = this->io.xSend(this->pBuf, this->stack, + nBytesNeeded, nBytes); + if (nBytes) { + bufSizeT len; + + if (nBytes >= this->stack) { + this->stack=0u; + cond = casFlushCompleted; + } + else { + len = this->stack-nBytes; + // + // memmove() is ok with overlapping buffers + // + memmove (this->pBuf, &this->pBuf[nBytes], len); + this->stack = len; + cond = casFlushPartial; + } + } + else { + cond = casFlushNone; + } + + if (stat!=xSendOK) { + cond = casFlushDisconnect; + } + this->mutex.osiUnlock(); + return cond; + } // diff --git a/src/cas/generic/server.h b/src/cas/generic/server.h index 5e6d0d72e..6af5119e4 100644 --- a/src/cas/generic/server.h +++ b/src/cas/generic/server.h @@ -29,6 +29,9 @@ * * History * $Log$ + * Revision 1.10 1996/09/04 20:27:02 jhill + * doccasdef.h + * * Revision 1.9 1996/08/13 22:56:14 jhill * added init for mutex class * @@ -110,6 +113,7 @@ caStatus copyBetweenDD(gdd &dest, gdd &src); // enum xRecvStatus {xRecvOK, xRecvDisconnect}; enum xSendStatus {xSendOK, xSendDisconnect}; +enum xBlockingStatus {xIsBlocking, xIsntBlocking}; enum casIOState {casOnLine, casOffLine}; typedef unsigned bufSizeT; class casMsgIO { @@ -129,8 +133,8 @@ public: // // device dependent recv // - xSendStatus xSend (char *pBuf, bufSizeT nBytesToSend, - bufSizeT &nBytesSent); + xSendStatus xSend (char *pBuf, bufSizeT nBytesAvailableToSend, + bufSizeT nBytesNeedToBeSent, bufSizeT &nBytesSent); xRecvStatus xRecv (char *pBuf, bufSizeT nBytesToRecv, bufSizeT &nByesRecv); @@ -140,7 +144,11 @@ public: virtual casIOState state() const=0; virtual void hostNameFromAddr (char *pBuf, unsigned bufSize)=0; virtual int getFileDescriptor() const; - virtual void setNonBlocking(); + void setNonBlocking() + { + this->xSetNonBlocking(); + this->blockingStatus = xIsntBlocking; + } // // only for use with DG io @@ -154,12 +162,14 @@ private: // osiTime elapsedAtLastSend; osiTime elapsedAtLastRecv; + xBlockingStatus blockingStatus; virtual xSendStatus osdSend (const char *pBuf, bufSizeT nBytesReq, bufSizeT &nBytesActual) =0; virtual xRecvStatus osdRecv (char *pBuf, bufSizeT nBytesReq, bufSizeT &nBytesActual) =0; virtual void osdShow (unsigned level) const = 0; + virtual void xSetNonBlocking(); }; #include // OS dependent @@ -260,9 +270,9 @@ private: // inline void casEventSys::insertEventQueue(casEvent &insert, casEvent &prevEvent) { - this->mutex.lock(); + this->mutex.osiLock(); this->eventLogQue.insertAfter(insert, prevEvent); - this->mutex.unlock(); + this->mutex.osiUnlock(); } // @@ -270,9 +280,9 @@ inline void casEventSys::insertEventQueue(casEvent &insert, casEvent &prevEvent) // inline void casEventSys::pushOnToEventQueue(casEvent &event) { - this->mutex.lock(); + this->mutex.osiLock(); this->eventLogQue.push(event); - this->mutex.unlock(); + this->mutex.osiUnlock(); } // @@ -280,9 +290,9 @@ inline void casEventSys::pushOnToEventQueue(casEvent &event) // inline void casEventSys::removeFromEventQueue(casEvent &event) { - this->mutex.lock(); + this->mutex.osiLock(); this->eventLogQue.remove(event); - this->mutex.unlock(); + this->mutex.osiUnlock(); } // @@ -469,6 +479,9 @@ enum casFlushCondition{ casFlushPartial, casFlushCompleted, casFlushDisconnect}; +enum casFlushRequest{ + casFlushAll, + casFlushSpecified}; class outBuf { public: @@ -480,13 +493,24 @@ public: // number of bytes in the output queue? // bufSizeT bytesPresent() const - { return this->stack; } + { + return this->stack; + } + + // + // number of bytes unused in the output queue? + // + bufSizeT bytesFree() const + { + return this->bufSize - this->stack; + } // // flush output queue // (returns the number of bytes sent) // - casFlushCondition flush(); + casFlushCondition flush(casFlushRequest req = casFlushAll, + bufSizeT spaceRequired=0u); // // allocate message buffer space @@ -502,18 +526,25 @@ public: // // release an allocated message (but dont send it) // - void discardMsg () { this->mutex.unlock(); }; + void discardMsg () + { + this->mutex.osiUnlock(); + }; void show(unsigned level); virtual unsigned getDebugLevel()=0; virtual void sendBlockSignal()=0; + void clear() + { + this->stack = 0u; + } private: casMsgIO &io; osiMutex &mutex; char *pBuf; - bufSizeT bufSize; + const bufSizeT bufSize; bufSizeT stack; }; @@ -552,16 +583,16 @@ public: void installAsyncIO(casAsyncIOI &ioIn) { - this->lock(); + this->osiLock(); this->ioInProgList.add(ioIn); - this->unlock(); + this->osiUnlock(); } void removeAsyncIO(casAsyncIOI &ioIn) { - this->lock(); + this->osiLock(); this->ioInProgList.remove(ioIn); - this->unlock(); + this->osiUnlock(); } casRes *lookupRes(const caResId &idIn, casResType type); @@ -668,8 +699,8 @@ protected: const void *dp, const char *pFileName, const unsigned lineno); -private: casMsgIO &msgIO; +private: // // dump message to stderr @@ -822,7 +853,7 @@ private: // // casDGClient // -class casDGClient : private casDGIO, public casClient { +class casDGClient : private casDGIO, private casClient { public: casDGClient (caServerI &serverIn); @@ -857,9 +888,24 @@ public: void destroy(); + int getFD() const + { + return this->casClient::getFD(); + } + protected: void process(); + casProcCond eventSysProcess() + { + return this->casEventSys::process(); + } + + casFlushCondition flush(casFlushRequest req = casFlushAll, + bufSizeT spaceRequired=0u) + { + return this->outBuf::flush(req,spaceRequired); + } private: void ioBlockedSignal(); // dummy diff --git a/src/cas/io/bsdSocket/casDGIO.cc b/src/cas/io/bsdSocket/casDGIO.cc index aa65f1445..17c6385a8 100644 --- a/src/cas/io/bsdSocket/casDGIO.cc +++ b/src/cas/io/bsdSocket/casDGIO.cc @@ -176,9 +176,9 @@ void casDGIO::osdShow (unsigned level) const } // -// casDGIO::setNonBlocking() +// casDGIO::xSetNonBlocking() // -void casDGIO::setNonBlocking() +void casDGIO::xSetNonBlocking() { int status; int yes = TRUE; diff --git a/src/cas/io/bsdSocket/casIOD.h b/src/cas/io/bsdSocket/casIOD.h index 351a252f5..3286438b5 100644 --- a/src/cas/io/bsdSocket/casIOD.h +++ b/src/cas/io/bsdSocket/casIOD.h @@ -7,6 +7,9 @@ // Some BSD calls have crept in here // // $Log$ +// Revision 1.4 1996/09/04 20:29:08 jhill +// removed os depen stuff +// // Revision 1.3 1996/08/13 23:00:29 jhill // removed include of netdb.h // @@ -84,7 +87,7 @@ public: ~casStreamIO(); int getFileDescriptor() const; - void setNonBlocking(); + void xSetNonBlocking(); bufSizeT optimumBufferSize (); casIOState state() const; @@ -110,7 +113,7 @@ public: ~casDGIO(); int getFileDescriptor() const; - void setNonBlocking(); + void xSetNonBlocking(); bufSizeT optimumBufferSize (); void sendBeacon(char &msg, bufSizeT length, aitUint32 &m_avail); diff --git a/src/cas/io/bsdSocket/casStreamIO.cc b/src/cas/io/bsdSocket/casStreamIO.cc index 7e7e65fba..892ad9d4e 100644 --- a/src/cas/io/bsdSocket/casStreamIO.cc +++ b/src/cas/io/bsdSocket/casStreamIO.cc @@ -5,6 +5,9 @@ // // // $Log$ +// Revision 1.4 1996/07/24 22:03:36 jhill +// fixed net proto for gnu compiler +// // Revision 1.3 1996/07/09 22:55:22 jhill // added cast // @@ -23,8 +26,15 @@ #include #include #include -#ifndef SUNOS4 // the SUNOS4 prototypes are trad C - see bsdProto.h +// +// the SUNOS4 and vxWorks5.2 prototypes are trad C +// +#if !defined(SUNOS4) && !defined(vxWorks) #include +#else +extern "C" { +char * inet_ntoa(struct in_addr); +} #endif @@ -32,10 +42,9 @@ // casStreamIO::casStreamIO() // casStreamIO::casStreamIO(const SOCKET s, const caAddr &a) : - sock(s), addr(a) + sockState(casOffLine), sock(s), addr(a) { assert (sock>=0); - this->sockState = casOffLine; } @@ -124,7 +133,8 @@ caStatus casStreamIO::init() casStreamIO::~casStreamIO() { if (sock>=0) { - close(sock); + close(this->sock); +printf("closing sock=%d\n", this->sock); } } @@ -132,7 +142,7 @@ casStreamIO::~casStreamIO() // // casStreamIO::osdSend() // -xSendStatus casStreamIO::osdSend(const char *pBuf, bufSizeT nBytes, +xSendStatus casStreamIO::osdSend(const char *pBuf, bufSizeT nBytesReq, bufSizeT &nBytesActual) { int status; @@ -156,31 +166,29 @@ xSendStatus casStreamIO::osdSend(const char *pBuf, bufSizeT nBytes, return xSendDisconnect; } - - if (nBytes<=0u) { + if (nBytesReq<=0u) { nBytesActual = 0u; return xSendOK; } - status = send ( - this->sock, - pBuf, - nBytes, - 0); - if (status == 0) { - this->sockState = casOffLine; + status = send ( + this->sock, + (char *) pBuf, + nBytesReq, + 0); + if (status == 0) { + this->sockState = casOffLine; return xSendDisconnect; - } - else if (status<0) { - int anerrno = SOCKERRNO; + } + else if (status<0) { + int anerrno = SOCKERRNO; - if (anerrno != EWOULDBLOCK) { - this->sockState = casOffLine; + if (anerrno != EWOULDBLOCK) { + this->sockState = casOffLine; } nBytesActual = 0u; return xSendOK; } - nBytesActual = (bufSizeT) status; return xSendOK; } @@ -247,9 +255,9 @@ void casStreamIO::osdShow (unsigned level) const // -// casStreamIO::setNonBlocking() +// casStreamIO::xSsetNonBlocking() // -void casStreamIO::setNonBlocking() +void casStreamIO::xSetNonBlocking() { int status; int yes = TRUE; diff --git a/src/cas/os/posix/casDGOS.cc b/src/cas/os/posix/casDGOS.cc index 414ed512a..87aa8f4af 100644 --- a/src/cas/os/posix/casDGOS.cc +++ b/src/cas/os/posix/casDGOS.cc @@ -6,6 +6,9 @@ * * * $Log$ + * Revision 1.2 1996/08/05 19:29:25 jhill + * os depen code now smaller + * * Revision 1.1.1.1 1996/06/20 00:28:06 jhill * ca server installation * @@ -52,7 +55,7 @@ void casDGEvWakeup::show(unsigned level) void casDGEvWakeup::expire() { casProcCond cond; - cond = this->os.casEventSys::process(); + cond = this->os.eventSysProcess(); if (cond != casProcOk) { // // if "this" is being used above this diff --git a/src/cas/os/posix/osiMutex.h b/src/cas/os/posix/osiMutex.h index 7b855ad9e..eab6c23f9 100644 --- a/src/cas/os/posix/osiMutex.h +++ b/src/cas/os/posix/osiMutex.h @@ -10,8 +10,9 @@ public: // (since g++ does not have exceptions) // int init() {return 0;} - void lock() {}; - void unlock() {}; + void osiLock() {} + void osiUnlock() {} + void show (unsigned) {} private: }; diff --git a/src/cas/os/vxWorks/caServerOS.cc b/src/cas/os/vxWorks/caServerOS.cc index b8394802d..8a95f298c 100644 --- a/src/cas/os/vxWorks/caServerOS.cc +++ b/src/cas/os/vxWorks/caServerOS.cc @@ -5,6 +5,9 @@ * * * $Log$ + * Revision 1.1 1996/09/04 22:06:43 jhill + * installed + * * Revision 1.1.1.1 1996/06/20 00:28:06 jhill * ca server installation * @@ -109,9 +112,9 @@ caServerOS::~caServerOS() // -// caServer() +// caServerEntry() // -void caServer(caServerI *pCAS) +void caServerEntry(caServerI *pCAS) { // // forever diff --git a/src/cas/os/vxWorks/casDGOS.cc b/src/cas/os/vxWorks/casDGOS.cc index d5e5aa595..a04536600 100644 --- a/src/cas/os/vxWorks/casDGOS.cc +++ b/src/cas/os/vxWorks/casDGOS.cc @@ -6,6 +6,9 @@ * * * $Log$ + * Revision 1.1 1996/09/04 22:06:45 jhill + * installed + * * Revision 1.1.1.1 1996/06/20 00:28:06 jhill * ca server installation * @@ -100,7 +103,7 @@ void casDGOS::show(unsigned level) taskShow(this->clientTId, level); } if (taskIdVerify(this->eventTId) == OK) { - taskShow(this->eventTId, level); + printf("casDGOS task id = %x\n", this->eventTId); } if (this->eventSignalSem) { semShow(this->eventSignalSem, level); @@ -174,16 +177,11 @@ casProcCond casDGOS::processInput () // int casDGServer (casDGOS *pDGOS) { - caStatus status; - // // block for the next DG until the connection closes // while (TRUE) { - status = pDGOS->processInput(); - if (status) { - errMessage(status, "casDGServer (casDGOS *pDGOS)"); - } + pDGOS->process(); } } @@ -202,7 +200,7 @@ int casDGEvent (casDGOS *pDGOS) status = semTake(pDGOS->eventSignalSem, WAIT_FOREVER); assert (status!=OK); - cond = pDGOS->casEventSys::process(); + cond = pDGOS->eventSysProcess(); if (cond != casProcOk) { printf("DG event sys process failed\n"); } diff --git a/src/cas/os/vxWorks/casOSD.h b/src/cas/os/vxWorks/casOSD.h index 280cadb26..2903b69d9 100644 --- a/src/cas/os/vxWorks/casOSD.h +++ b/src/cas/os/vxWorks/casOSD.h @@ -7,6 +7,9 @@ // Some BSD calls have crept in here // // $Log$ +// Revision 1.1 1996/09/04 22:06:46 jhill +// installed +// // Revision 1.1.1.1 1996/06/20 00:28:06 jhill // ca server installation // @@ -61,14 +64,14 @@ class caServerOS; // // vxWorks task entry // -int caServerEntry(caServerI *pCAS); +void caServerEntry(caServerI *pCAS); // // caServerOS // class caServerOS { friend class casServerReg; - friend int caServerEntry(caServerOS *pOS); + friend void caServerEntry(caServerI *pCAS); public: caServerOS (caServerI &casIn) : cas (casIn), pBTmr (NULL), tid(ERROR) {} diff --git a/src/cas/os/vxWorks/casStreamOS.cc b/src/cas/os/vxWorks/casStreamOS.cc index cd5adb184..200147ef9 100644 --- a/src/cas/os/vxWorks/casStreamOS.cc +++ b/src/cas/os/vxWorks/casStreamOS.cc @@ -4,6 +4,9 @@ // // // $Log$ +// Revision 1.1 1996/09/04 22:06:46 jhill +// installed +// // Revision 1.1.1.1 1996/06/20 00:28:06 jhill // ca server installation // @@ -23,7 +26,7 @@ // void casStreamOS::ioBlockedSignal() { - printf("in casStreamOS::ioBlockedSignal()\n"); + printf("in casStreamOS::ioBlockedSignal() ?\n"); } // @@ -115,7 +118,7 @@ void casStreamOS::show(unsigned level) taskShow(this->clientTId, level); } if (taskIdVerify(this->eventTId)==OK) { - taskShow(this->eventTId, level); + printf("casStreamOS task id %x\n", this->eventTId); } if (this->eventSignalSem) { semShow(this->eventSignalSem, level); @@ -217,10 +220,9 @@ int casStrmServer (casStreamOS *pStrmOS) { casFillCondition fillCond; casProcCond procCond; - caStatus status; // - // block for the next DG until the connection closes + // block for the next message until the connection closes // while (TRUE) { // @@ -237,20 +239,13 @@ int casStrmServer (casStreamOS *pStrmOS) // return OK; } - else if (pStrmOS->inBuf::full()==aitTrue) { - // - // If there isnt any space then temporarily - // stop calling this routine until problem is resolved - // either by: - // (1) sending or - // (2) a blocked IO op unblocks - // - delete pStrmOS; - // - // NO CODE HERE - // (see delete above) - // - return OK; + // + // force the output buffer to flush prior to + // blocking for more input (if no input bytes are + // pending) + // + if (pStrmOS->bytesAvailable()<=0u) { + pStrmOS->flush(); } } @@ -269,7 +264,7 @@ int casStrmEvent(casStreamOS *pStrmOS) // while (TRUE) { status = semTake(pStrmOS->eventSignalSem, WAIT_FOREVER); - assert (status!=OK); + assert (status==OK); cond = pStrmOS->casEventSys::process(); if (cond != casProcOk) { diff --git a/src/cas/os/vxWorks/osiMutex.h b/src/cas/os/vxWorks/osiMutex.h index f3cd18624..f91bf81a5 100644 --- a/src/cas/os/vxWorks/osiMutex.h +++ b/src/cas/os/vxWorks/osiMutex.h @@ -3,9 +3,26 @@ // osiMutex - OS independent mutex // (vxWorks version) // +// +// NOTES: +// 1) epicsPrintf() is used in this file because we cant stand +// the logMsg() 8 arg API amd we dont want the messages from different +// tasks to co-mingle +// #include -#include +#include +#include + + +#ifdef DEBUG_OSIMUTEX +#include +#endif + +#ifdef DEBUG_OSIMUTEX +#define osiLock() osiLockI (__FILE__, __LINE__) +#define osiUnlock() osiUnlockI (__FILE__, __LINE__) +#endif class osiMutex { public: @@ -24,26 +41,71 @@ public: { return -1; } +# ifdef DEBUG_OSIMUTEX + epicsPrintf("created mutex at %lx\n", + (unsigned long) this->mutex); +# endif return 0; } + ~osiMutex() { STATUS s; s = semDelete (this->mutex); assert (s==OK); +# ifdef DEBUG_OSIMUTEX + epicsPrintf("destroyed mutex at %lx\n", + (unsigned long) this->mutex); +# endif } - void lock() + +#ifdef DEBUG_OSIMUTEX + void osiLockI(const char *pFN, unsigned ln) +#else + void osiLock() +#endif { STATUS s; + if (!this->mutex) { + epicsPrintf( + "osiMutex: lock request before init was ignored\n"); + return; + } assert(this->mutex); s = semTake (this->mutex, WAIT_FOREVER); assert (s==OK); +# ifdef DEBUG_OSIMUTEX + epicsPrintf("L%lx in %s at %u\n", + (unsigned long) this->mutex, + pFN, ln); +# endif } - void unlock() + +#ifdef DEBUG_OSIMUTEX + void osiUnlockI(const char *pFN, unsigned ln) +#else + void osiUnlock() +#endif { STATUS s; + + if (!this->mutex) { + epicsPrintf( + "osiMutex: unlock request before init was ignored\n"); + return; + } s = semGive (this->mutex); assert (s==OK); +# ifdef DEBUG_OSIMUTEX + epicsPrintf("U%lx in %s at %d\n", + (unsigned long) this->mutex, + pFN, ln); +# endif + } + + void show(unsigned level) + { + semShow(this->mutex, (int) level); } private: SEM_ID mutex;