vxWorks port changes

This commit is contained in:
Jeff Hill
1996-09-16 18:27:51 +00:00
parent 1aa384308d
commit d611fa2d75
26 changed files with 481 additions and 231 deletions

View File

@@ -29,6 +29,9 @@
*
* History
* $Log$
* Revision 1.2 1996/06/21 02:30:52 jhill
* solaris port
*
* Revision 1.1.1.1 1996/06/20 00:28:14 jhill
* ca server installation
*
@@ -115,6 +118,9 @@ void caServer::show(unsigned level)
if (this->pCAS) {
this->pCAS->show(level);
}
else {
printf("caServer:: no server internals attached\n");
}
}
//

View File

@@ -29,6 +29,9 @@
*
* History
* $Log$
* Revision 1.4 1996/09/04 20:12:04 jhill
* added arg to serverToolDebugFunc()
*
* Revision 1.3 1996/08/13 22:56:12 jhill
* added init for mutex class
*
@@ -61,13 +64,15 @@ void caServerI::show (unsigned level)
printf( "Channel Access Server Status V%d.%d\n",
CA_PROTOCOL_VERSION, CA_MINOR_VERSION);
this->lock();
this->osiMutex::show(level);
this->osiLock();
while ( (pClient = iter()) ) {
pClient->show(level);
}
this->dgClient.show(level);
this->unlock();
this->osiUnlock();
bytes_reserved = 0u;
#if 0
@@ -94,14 +99,14 @@ void caServerI::show (unsigned level)
#endif
printf(
"The server's integer resource id conversion table:\n");
this->lock();
this->osiLock();
this->uintResTable<casRes>::show(level);
this->unlock();
this->osiUnlock();
printf(
"The server's character string resource id conversion table:\n");
this->lock();
this->osiLock();
this->stringResTbl.show(level);
this->unlock();
this->osiUnlock();
}
// @@@@@@ caPrintAddrList(&destAddr);
@@ -193,7 +198,7 @@ caServerI::~caServerI()
{
casClient *pClient;
this->lock();
this->osiLock();
//
// delete all clients
@@ -202,6 +207,8 @@ caServerI::~caServerI()
delete pClient;
}
this->osiUnlock();
//
// verify that we didnt leak a PV
//
@@ -214,9 +221,9 @@ caServerI::~caServerI()
//
void caServerI::installClient(casStrmClient *pClient)
{
this->lock();
this->osiLock();
this->clientList.add(*pClient);
this->unlock();
this->osiUnlock();
}
@@ -225,9 +232,9 @@ void caServerI::installClient(casStrmClient *pClient)
//
void caServerI::removeClient(casStrmClient *pClient)
{
this->lock();
this->osiLock();
this->clientList.remove(*pClient);
this->unlock();
this->osiUnlock();
}

View File

@@ -29,6 +29,9 @@
*
* History
* $Log$
* Revision 1.1.1.1 1996/06/20 00:28:16 jhill
* ca server installation
*
*
*/
@@ -110,10 +113,10 @@ inline void caServerI::installPV (casPVI &pv)
{
int resLibStatus;
this->lock ();
this->osiLock ();
this->pvCount++;
resLibStatus = this->stringResTbl.add (pv);
this->unlock ();
this->osiUnlock ();
assert (resLibStatus==0);
}
@@ -124,11 +127,11 @@ inline void caServerI::removePV(casPVI &pv)
{
casPVI *pPV;
this->lock();
this->osiLock();
casVerify (this->pvCount>=1u);
this->pvCount--;
pPV = this->stringResTbl.remove (pv);
this->unlock();
this->osiUnlock();
casVerify (pPV!=0);
casVerify (pPV==&pv);
}

View File

@@ -29,6 +29,9 @@
*
* History
* $Log$
* Revision 1.2 1996/09/04 20:16:24 jhill
* moved operator -> here
*
* Revision 1.1.1.1 1996/06/20 00:28:16 jhill
* ca server installation
*
@@ -44,7 +47,7 @@
//
inline void casAsyncIOI::lock()
{
client.lock();
client.osiLock();
}
//
@@ -52,7 +55,7 @@ inline void casAsyncIOI::lock()
//
inline void casAsyncIOI::unlock()
{
client.unlock();
client.osiUnlock();
}
//

View File

@@ -29,6 +29,9 @@
*
* History
* $Log$
* Revision 1.3 1996/09/04 20:18:27 jhill
* moved operator -> here
*
* Revision 1.2 1996/07/01 19:56:10 jhill
* one last update prior to first release
*
@@ -55,7 +58,7 @@ inline casChannel * casChannelI::operator -> ()
//
inline void casChannelI::lock()
{
this->client.lock();
this->client.osiLock();
}
//
@@ -63,7 +66,7 @@ inline void casChannelI::lock()
//
inline void casChannelI::unlock()
{
this->client.unlock();
this->client.osiUnlock();
}
//

View File

@@ -29,6 +29,9 @@
*
* History
* $Log$
* Revision 1.2 1996/08/13 22:53:14 jhill
* changes for MVC++
*
* Revision 1.1.1.1 1996/06/20 00:28:16 jhill
* ca server installation
*
@@ -107,6 +110,8 @@ casCoreClient::~casCoreClient()
ca_printf ("CAS: Connection Terminated\n");
}
this->osiLock();
//
// cancel any pending asynchronous IO
//
@@ -120,6 +125,8 @@ casCoreClient::~casCoreClient()
delete pCurIO;
pCurIO = pNextIO;
}
this->osiUnlock();
}
//

View File

@@ -29,6 +29,9 @@
*
* History
* $Log$
* Revision 1.5 1996/09/04 20:19:47 jhill
* added missing byte swap on search reply port no
*
* Revision 1.4 1996/08/13 22:54:20 jhill
* fixed little endian problem
*
@@ -311,7 +314,8 @@ void casDGClient::process()
// force all replies to be sent to the client
// that made the request
//
this->clear();
this->inBuf::clear();
this->outBuf::clear();
//
// read in new input
@@ -319,12 +323,11 @@ void casDGClient::process()
fillCond = this->fill();
if (fillCond == casFillDisconnect) {
casVerify(0);
return;
}
//
// verify that we have a message to process
//
if (this->inBuf::bytesPresent()>0u) {
else if (this->inBuf::bytesPresent()>0u) {
//
// process the message
//
@@ -332,22 +335,20 @@ void casDGClient::process()
if (status) {
errMessage (status,
"unexpected error processing stateless protocol");
//
// clear the input buffer so this will
// not effect future input
//
this->clear();
}
else {
//
// force all replies to go to the sender
//
flushCond = this->flush();
if (flushCond!=casFlushCompleted) {
this->clear();
casVerify(0);
}
}
//
// force all replies to go to the sender
//
flushCond = this->flush();
if (flushCond!=casFlushCompleted) {
casVerify(0);
}
}
//
// clear the input/output buffers so replies
// are always sent to the sender of the request
//
this->inBuf::clear();
this->outBuf::clear();
}

View File

@@ -29,6 +29,9 @@
*
* History
* $Log$
* Revision 1.2 1996/07/24 22:00:49 jhill
* added pushOnToEventQueue()
*
* Revision 1.1.1.1 1996/06/20 00:28:15 jhill
* ca server installation
*
@@ -79,16 +82,18 @@ casEventSys::~casEventSys()
{
casEvent *pE;
this->mutex.lock();
/*
* They must cancel all active event blocks first
* all active event blocks must be canceled first
*/
assert (this->numEventBlocks==0);
this->mutex.osiLock();
while ( (pE = this->eventLogQue.get()) ) {
delete pE;
}
this->mutex.osiUnlock();
}
@@ -97,10 +102,10 @@ casEventSys::~casEventSys()
//
void casEventSys::installMonitor()
{
this->mutex.lock();
this->mutex.osiLock();
this->numEventBlocks++;
this->maxLogEntries += averageEventEntries;
this->mutex.unlock();
this->mutex.osiUnlock();
}
//
@@ -108,11 +113,11 @@ void casEventSys::installMonitor()
//
void casEventSys::removeMonitor()
{
this->mutex.lock();
this->mutex.osiLock();
assert (this->numEventBlocks>=1u);
this->numEventBlocks--;
this->maxLogEntries -= averageEventEntries;
this->mutex.unlock();
this->mutex.osiUnlock();
}
@@ -126,7 +131,7 @@ casProcCond casEventSys::process()
casProcCond cond = casProcOk;
unsigned long nAccepted = 0u;
this->mutex.lock();
this->mutex.osiLock();
while ( (pEvent = this->eventLogQue.get()) ) {
@@ -166,7 +171,7 @@ casProcCond casEventSys::process()
this->coreClient.eventFlush();
}
this->mutex.unlock();
this->mutex.osiUnlock();
return cond;
}

View File

@@ -29,6 +29,9 @@
*
* History
* $Log$
* Revision 1.1.1.1 1996/06/20 00:28:16 jhill
* ca server installation
*
*
*/
@@ -41,9 +44,9 @@
//
inline void casEventSys::addToEventQueue(casEvent &event)
{
this->mutex.lock();
this->mutex.osiLock();
this->eventLogQue.add(event);
this->mutex.unlock();
this->mutex.osiUnlock();
//
// wakes up the event queue consumer
//

View File

@@ -29,6 +29,9 @@
*
* History
* $Log$
* Revision 1.6 1996/09/04 20:21:41 jhill
* removed operator -> and added member pv
*
* Revision 1.5 1996/07/01 19:56:11 jhill
* one last update prior to first release
*
@@ -560,13 +563,14 @@ public:
inline aitBool okToBeginNewIO() const;
inline void lock();
inline void unlock();
private:
tsDLList<casPVListChan> chanList;
caServerI &cas;
casPV &pv;
unsigned nMonAttached;
unsigned nIOAttached;
inline void lock();
inline void unlock();
};

View File

@@ -29,6 +29,9 @@
*
* History
* $Log$
* Revision 1.4 1996/07/24 22:00:49 jhill
* added pushOnToEventQueue()
*
* Revision 1.3 1996/07/01 19:56:11 jhill
* one last update prior to first release
*
@@ -82,7 +85,7 @@ casMonitor::~casMonitor()
{
casCoreClient &client = this->ciu.getClient();
this->mutex.lock();
this->mutex.osiLock();
this->disable();
@@ -97,6 +100,8 @@ casMonitor::~casMonitor()
this->pModifiedValue = NULL;
}
this->ciu.deleteMonitor(*this);
this->mutex.osiUnlock();
}
//
@@ -106,24 +111,16 @@ void casMonitor::enable()
{
caStatus status;
this->mutex.lock();
if (this->enabled) {
this->mutex.unlock();
return;
this->mutex.osiLock();
if (!this->enabled && this->ciu->readAccess()) {
this->enabled = TRUE;
status = this->ciu.getPVI().registerEvent();
if (status) {
errMessage(status,
"Server tool failed to register event\n");
}
}
if (!this->ciu->readAccess()) {
this->mutex.unlock();
return;
}
this->enabled = TRUE;
status = this->ciu.getPVI().registerEvent();
if (status) {
errMessage(status,
"Server tool failed to register event\n");
}
this->mutex.unlock();
this->mutex.osiUnlock();
}
//
@@ -131,14 +128,12 @@ void casMonitor::enable()
//
void casMonitor::disable()
{
this->mutex.lock();
if (!this->enabled) {
this->mutex.unlock();
return;
this->mutex.osiLock();
if (this->enabled) {
this->enabled = FALSE;
this->ciu.getPVI().unregisterEvent();
}
this->enabled = FALSE;
this->ciu.getPVI().unregisterEvent();
this->mutex.unlock();
this->mutex.osiUnlock();
}
//
@@ -150,7 +145,7 @@ void casMonitor::push(gdd &newValue)
casMonEvent *pLog = NULL;
char full;
this->mutex.lock();
this->mutex.osiLock();
//
// get a new block if we havent exceeded quotas
@@ -205,7 +200,7 @@ void casMonitor::push(gdd &newValue)
client.addToEventQueue(*pLog);
this->mutex.unlock();
this->mutex.osiUnlock();
}
//
@@ -219,7 +214,7 @@ caStatus casMonitor::executeEvent(casMonEvent *pEV)
pVal = pEV->getValue ();
assert (pVal);
this->mutex.lock();
this->mutex.osiLock();
if (this->ciu.getClient().getEventsOff()==aitFalse) {
status = this->callBack (*pVal);
}
@@ -236,7 +231,7 @@ caStatus casMonitor::executeEvent(casMonEvent *pEV)
this->pModifiedValue = pVal;
status = S_cas_success;
}
this->mutex.unlock();
this->mutex.osiUnlock();
//
// if the event isnt accepted we will try
@@ -285,12 +280,12 @@ void casMonitor::show(unsigned level)
//
void casMonitor::postIfModified()
{
this->mutex.lock();
this->mutex.osiLock();
if (this->pModifiedValue) {
this->callBack (*this->pModifiedValue);
this->pModifiedValue->unreference ();
this->pModifiedValue = NULL;
}
this->mutex.unlock();
this->mutex.osiUnlock();
}

View File

@@ -29,6 +29,9 @@
*
* History
* $Log$
* Revision 1.1.1.1 1996/06/20 00:28:15 jhill
* ca server installation
*
*
*/
@@ -39,8 +42,9 @@
casMsgIO::casMsgIO()
{
elapsedAtLastSend = this->elapsedAtLastRecv
this->elapsedAtLastSend = this->elapsedAtLastRecv
= osiTime::getCurrent ();
this->blockingStatus = xIsBlocking;
}
casMsgIO::~casMsgIO()
@@ -81,15 +85,41 @@ xRecvStatus casMsgIO::xRecv(char *pBuf, bufSizeT nBytes, bufSizeT &nActualBytes)
return stat;
}
xSendStatus casMsgIO::xSend(char *pBuf, bufSizeT nBytes, bufSizeT &nActualBytes)
xSendStatus casMsgIO::xSend(char *pBuf, bufSizeT nBytesAvailableToSend,
bufSizeT nBytesNeedToBeSent, bufSizeT &nActualBytes)
{
xSendStatus stat;
bufSizeT nActualBytesDelta;
stat = this->osdSend(pBuf, nBytes, nActualBytes);
if (stat == xSendOK) {
this->elapsedAtLastSend = osiTime::getCurrent();
assert (nBytesAvailableToSend>=nBytesNeedToBeSent);
nActualBytes = 0u;
if (this->blockingStatus == xIsntBlocking) {
stat = this->osdSend(pBuf, nBytesAvailableToSend,
nActualBytes);
if (stat == xSendOK) {
this->elapsedAtLastSend = osiTime::getCurrent();
}
return stat;
}
return stat;
while (nBytesNeedToBeSent) {
stat = this->osdSend(pBuf, nBytesAvailableToSend,
nActualBytesDelta);
if (stat != xSendOK) {
return stat;
}
this->elapsedAtLastSend = osiTime::getCurrent();
nActualBytes += nActualBytesDelta;
if (nBytesNeedToBeSent>nActualBytesDelta) {
nBytesNeedToBeSent -= nActualBytesDelta;
}
else {
break;
}
}
return xSendOK;
}
void casMsgIO::sendBeacon(char & /*msg*/, bufSizeT /*length*/,
@@ -103,9 +133,9 @@ int casMsgIO::getFileDescriptor() const
return -1; // some os will not have file descriptors
}
void casMsgIO::setNonBlocking()
void casMsgIO::xSetNonBlocking()
{
printf("virtual base setNonBlocking() called?\n");
printf("virtual base casMsgIO::xSetNonBlocking() called?\n");
}
bufSizeT casMsgIO::incommingBytesPresent() const

View File

@@ -29,6 +29,9 @@
*
* History
* $Log$
* Revision 1.5 1996/09/04 20:23:59 jhill
* added operator ->
*
* Revision 1.4 1996/07/01 19:56:13 jhill
* one last update prior to first release
*
@@ -75,7 +78,13 @@ casPV * casPVI::operator -> () const
//
inline void casPVI::lock()
{
this->cas.lock();
//
// NOTE:
// if this lock becomes something else besides the
// server's lock then look carefully at the
// comment in casPVI::deleteSignal()
//
this->cas.osiLock();
}
//
@@ -83,7 +92,7 @@ inline void casPVI::lock()
//
inline void casPVI::unlock()
{
this->cas.unlock();
this->cas.osiUnlock();
}
//
@@ -148,13 +157,29 @@ inline void casPVI::unregisterIO()
//
inline void casPVI::deleteSignal()
{
this->lock();
caServerI &localCASRef(this->cas);
//
// We dont take the PV lock here because
// the PV may be destroyed and we must
// keep the lock unlock pairs consistent
// (because the PV's lock is really a ref
// to the server's lock)
//
// This is safe to do because we take the PV
// lock when we add a new channel (and the
// PV lock is realy the server's lock)
//
localCASRef.osiLock();
if (this->chanList.count()==0u) {
(*this)->destroy();
//
// !! dont access self after destroy !!
//
}
else {
this->unlock();
}
localCASRef.osiUnlock();
}
//

View File

@@ -29,6 +29,10 @@
*
* History
* $Log$
* Revision 1.9 1996/09/04 20:25:53 jhill
* use correct app type for exist test gdd, correct byte
* oder for mon mask, and efficient use of PV name gdd
*
* Revision 1.8 1996/08/05 23:22:57 jhill
* gddScaler => gddScalar
*
@@ -214,7 +218,7 @@ casStrmClient::~casStrmClient()
delete [] this->pHostName;
}
this->lock();
this->osiLock();
//
// delete all channel attached
@@ -228,6 +232,10 @@ casStrmClient::~casStrmClient()
pChan->clientDestroy();
pChan = pNextChan;
}
delete &this->msgIO;
this->osiUnlock();
}
//
@@ -812,7 +820,7 @@ caStatus casStrmClient::hostNameAction()
size-1);
pMalloc[size-1]='\0';
this->lock();
this->osiLock();
if (this->pHostName) {
delete [] this->pHostName;
@@ -823,7 +831,7 @@ caStatus casStrmClient::hostNameAction()
(*pciu)->setOwner(this->pUserName, this->pHostName);
}
this->unlock();
this->osiUnlock();
return S_cas_success;
}
@@ -857,7 +865,7 @@ caStatus casStrmClient::clientNameAction()
size-1);
pMalloc[size-1]='\0';
this->lock();
this->osiLock();
if (this->pUserName) {
delete [] this->pUserName;
}
@@ -866,7 +874,7 @@ caStatus casStrmClient::clientNameAction()
while ( (pciu = iter()) ) {
(*pciu)->setOwner(this->pUserName, this->pHostName);
}
this->unlock();
this->osiUnlock();
return S_cas_success;
}
@@ -1015,11 +1023,11 @@ caStatus casStrmClient::eventsOnAction ()
// perhaps this is to slow - perhaps there
// should be a queue of modified events
//
this->lock();
this->osiLock();
while ( (pciu = iter()) ) {
pciu->postAllModifiedEvents();
}
this->unlock();
this->osiUnlock();
return S_cas_success;
}
@@ -1639,13 +1647,13 @@ caStatus casStrmClient::createChanResponse(casChannelI *,
// prevent problems such as the PV being deleted before the
// channel references it
//
this->lock();
this->osiLock();
pCanonicalName->markConstant();
pPV = this->ctx.getServer()->createPV(*pCanonicalName);
if (!pPV) {
this->unlock();
this->osiUnlock();
return this->channelCreateFailed(&msg, S_cas_noMemory);
}
@@ -1656,12 +1664,12 @@ caStatus casStrmClient::createChanResponse(casChannelI *,
pChan = (*pPV)->createChannel(this->ctx,
this->pUserName, this->pHostName);
if (!pChan) {
this->unlock();
this->osiUnlock();
pPV->deleteSignal();
return this->channelCreateFailed(&msg, S_cas_noMemory);
}
this->unlock();
this->osiUnlock();
pChanI = (casChannelI *) pChan;
@@ -1711,7 +1719,7 @@ casPVI *caServerI::createPV (gdd &name)
name.getRef(pNameStr);
stringId id (pNameStr->string());
this->lock ();
this->osiLock ();
pPVI = this->stringResTbl.lookup (id);
if (!pPVI) {
@@ -1726,7 +1734,7 @@ casPVI *caServerI::createPV (gdd &name)
// lock shouldnt be released until we finish creating and
// installing the PV
//
this->unlock ();
this->osiUnlock ();
return pPVI;
}
@@ -1744,10 +1752,10 @@ inline aitBool caServerI::roomForNewChannel() const
//
void casStrmClient::installChannel(casChannelI &chan)
{
this->lock();
this->osiLock();
this->getCAS().installItem(chan);
this->chanList.add(chan);
this->unlock();
this->osiUnlock();
}
//
@@ -1757,10 +1765,10 @@ void casStrmClient::removeChannel(casChannelI &chan)
{
casRes *pRes;
this->lock();
this->osiLock();
pRes = this->getCAS().removeItem(chan);
assert (&chan == (casChannelI *)pRes);
this->chanList.remove(chan);
this->unlock();
this->osiUnlock();
}

View File

@@ -29,6 +29,9 @@
*
* History
* $Log$
* Revision 1.3 1996/09/04 20:27:01 jhill
* doccasdef.h
*
* Revision 1.2 1996/08/13 22:53:59 jhill
* fixed little endian problem
*
@@ -47,13 +50,12 @@
//
outBuf::outBuf(casMsgIO &virtualCircuit, osiMutex &mutexIn) :
io(virtualCircuit),
mutex(mutexIn)
mutex(mutexIn),
pBuf(NULL),
bufSize(io.optimumBufferSize()),
stack(0u)
{
assert(&io);
this->stack = 0u;
this->bufSize = 0u;
this->pBuf = NULL;
}
//
@@ -61,10 +63,8 @@ outBuf::outBuf(casMsgIO &virtualCircuit, osiMutex &mutexIn) :
//
caStatus outBuf::init()
{
this->bufSize = io.optimumBufferSize();
this->pBuf = new char [this->bufSize];
if (!this->pBuf) {
this->bufSize = 0u;
return S_cas_noMemory;
}
return S_cas_success;
@@ -94,30 +94,32 @@ caHdr **ppMsg
)
{
bufSizeT msgsize;
bufSizeT stackNeeded;
extsize = CA_MESSAGE_ALIGN(extsize);
msgsize = extsize + sizeof(caHdr);
if (msgsize>this->bufSize) {
if (msgsize>this->bufSize || this->pBuf==NULL) {
return S_cas_hugeRequest;
}
stackNeeded = this->bufSize - msgsize;
this->mutex.lock();
this->mutex.osiLock();
if (this->stack + msgsize > this->bufSize) {
if (this->stack > stackNeeded) {
/*
* Try to flush the output queue
*/
this->flush();
this->flush(casFlushSpecified, msgsize);
/*
* If this failed then the fd is nonblocking
* and we will let select() take care of it
*/
if (this->stack + msgsize > this->bufSize) {
this->mutex.unlock();
if (this->stack > stackNeeded) {
this->mutex.osiUnlock();
this->sendBlockSignal();
return S_cas_sendBlocked;
}
@@ -183,43 +185,69 @@ void outBuf::commitMsg ()
this->stack += sizeof(caHdr) + extSize;
assert (this->stack <= this->bufSize);
this->mutex.unlock();
this->mutex.osiUnlock();
}
//
// outBuf::flush()
//
casFlushCondition outBuf::flush()
casFlushCondition outBuf::flush(casFlushRequest req,
bufSizeT spaceRequired)
{
bufSizeT nBytes;
xSendStatus stat;
bufSizeT nBytes;
bufSizeT stackNeeded;
bufSizeT nBytesNeeded;
xSendStatus stat;
casFlushCondition cond;
if (this->stack<=0u) {
return casFlushCompleted;
}
stat = this->io.xSend(this->pBuf, this->stack, nBytes);
if (stat!=xSendOK) {
if (!this->pBuf) {
return casFlushDisconnect;
}
else if (nBytes >= this->stack) {
this->stack=0u;
return casFlushCompleted;
}
else if (nBytes) {
bufSizeT len;
len = this->stack-nBytes;
//
// memmove() is ok with overlapping buffers
//
memmove (this->pBuf, &this->pBuf[nBytes], len);
this->stack = len;
return casFlushPartial;
this->mutex.osiLock();
if (req==casFlushAll) {
nBytesNeeded = this->stack;
}
else {
return casFlushNone;
stackNeeded = this->bufSize - spaceRequired;
if (this->stack>stackNeeded) {
nBytesNeeded = this->stack - stackNeeded;
}
else {
nBytesNeeded = 0u;
}
}
stat = this->io.xSend(this->pBuf, this->stack,
nBytesNeeded, nBytes);
if (nBytes) {
bufSizeT len;
if (nBytes >= this->stack) {
this->stack=0u;
cond = casFlushCompleted;
}
else {
len = this->stack-nBytes;
//
// memmove() is ok with overlapping buffers
//
memmove (this->pBuf, &this->pBuf[nBytes], len);
this->stack = len;
cond = casFlushPartial;
}
}
else {
cond = casFlushNone;
}
if (stat!=xSendOK) {
cond = casFlushDisconnect;
}
this->mutex.osiUnlock();
return cond;
}
//

View File

@@ -29,6 +29,9 @@
*
* History
* $Log$
* Revision 1.10 1996/09/04 20:27:02 jhill
* doccasdef.h
*
* Revision 1.9 1996/08/13 22:56:14 jhill
* added init for mutex class
*
@@ -110,6 +113,7 @@ caStatus copyBetweenDD(gdd &dest, gdd &src);
//
enum xRecvStatus {xRecvOK, xRecvDisconnect};
enum xSendStatus {xSendOK, xSendDisconnect};
enum xBlockingStatus {xIsBlocking, xIsntBlocking};
enum casIOState {casOnLine, casOffLine};
typedef unsigned bufSizeT;
class casMsgIO {
@@ -129,8 +133,8 @@ public:
//
// device dependent recv
//
xSendStatus xSend (char *pBuf, bufSizeT nBytesToSend,
bufSizeT &nBytesSent);
xSendStatus xSend (char *pBuf, bufSizeT nBytesAvailableToSend,
bufSizeT nBytesNeedToBeSent, bufSizeT &nBytesSent);
xRecvStatus xRecv (char *pBuf, bufSizeT nBytesToRecv,
bufSizeT &nByesRecv);
@@ -140,7 +144,11 @@ public:
virtual casIOState state() const=0;
virtual void hostNameFromAddr (char *pBuf, unsigned bufSize)=0;
virtual int getFileDescriptor() const;
virtual void setNonBlocking();
void setNonBlocking()
{
this->xSetNonBlocking();
this->blockingStatus = xIsntBlocking;
}
//
// only for use with DG io
@@ -154,12 +162,14 @@ private:
//
osiTime elapsedAtLastSend;
osiTime elapsedAtLastRecv;
xBlockingStatus blockingStatus;
virtual xSendStatus osdSend (const char *pBuf,
bufSizeT nBytesReq, bufSizeT &nBytesActual) =0;
virtual xRecvStatus osdRecv (char *pBuf,
bufSizeT nBytesReq, bufSizeT &nBytesActual) =0;
virtual void osdShow (unsigned level) const = 0;
virtual void xSetNonBlocking();
};
#include <casOSD.h> // OS dependent
@@ -260,9 +270,9 @@ private:
//
inline void casEventSys::insertEventQueue(casEvent &insert, casEvent &prevEvent)
{
this->mutex.lock();
this->mutex.osiLock();
this->eventLogQue.insertAfter(insert, prevEvent);
this->mutex.unlock();
this->mutex.osiUnlock();
}
//
@@ -270,9 +280,9 @@ inline void casEventSys::insertEventQueue(casEvent &insert, casEvent &prevEvent)
//
inline void casEventSys::pushOnToEventQueue(casEvent &event)
{
this->mutex.lock();
this->mutex.osiLock();
this->eventLogQue.push(event);
this->mutex.unlock();
this->mutex.osiUnlock();
}
//
@@ -280,9 +290,9 @@ inline void casEventSys::pushOnToEventQueue(casEvent &event)
//
inline void casEventSys::removeFromEventQueue(casEvent &event)
{
this->mutex.lock();
this->mutex.osiLock();
this->eventLogQue.remove(event);
this->mutex.unlock();
this->mutex.osiUnlock();
}
//
@@ -469,6 +479,9 @@ enum casFlushCondition{
casFlushPartial,
casFlushCompleted,
casFlushDisconnect};
enum casFlushRequest{
casFlushAll,
casFlushSpecified};
class outBuf {
public:
@@ -480,13 +493,24 @@ public:
// number of bytes in the output queue?
//
bufSizeT bytesPresent() const
{ return this->stack; }
{
return this->stack;
}
//
// number of bytes unused in the output queue?
//
bufSizeT bytesFree() const
{
return this->bufSize - this->stack;
}
//
// flush output queue
// (returns the number of bytes sent)
//
casFlushCondition flush();
casFlushCondition flush(casFlushRequest req = casFlushAll,
bufSizeT spaceRequired=0u);
//
// allocate message buffer space
@@ -502,18 +526,25 @@ public:
//
// release an allocated message (but dont send it)
//
void discardMsg () { this->mutex.unlock(); };
void discardMsg ()
{
this->mutex.osiUnlock();
};
void show(unsigned level);
virtual unsigned getDebugLevel()=0;
virtual void sendBlockSignal()=0;
void clear()
{
this->stack = 0u;
}
private:
casMsgIO &io;
osiMutex &mutex;
char *pBuf;
bufSizeT bufSize;
const bufSizeT bufSize;
bufSizeT stack;
};
@@ -552,16 +583,16 @@ public:
void installAsyncIO(casAsyncIOI &ioIn)
{
this->lock();
this->osiLock();
this->ioInProgList.add(ioIn);
this->unlock();
this->osiUnlock();
}
void removeAsyncIO(casAsyncIOI &ioIn)
{
this->lock();
this->osiLock();
this->ioInProgList.remove(ioIn);
this->unlock();
this->osiUnlock();
}
casRes *lookupRes(const caResId &idIn, casResType type);
@@ -668,8 +699,8 @@ protected:
const void *dp, const char *pFileName,
const unsigned lineno);
private:
casMsgIO &msgIO;
private:
//
// dump message to stderr
@@ -822,7 +853,7 @@ private:
//
// casDGClient
//
class casDGClient : private casDGIO, public casClient {
class casDGClient : private casDGIO, private casClient {
public:
casDGClient (caServerI &serverIn);
@@ -857,9 +888,24 @@ public:
void destroy();
int getFD() const
{
return this->casClient::getFD();
}
protected:
void process();
casProcCond eventSysProcess()
{
return this->casEventSys::process();
}
casFlushCondition flush(casFlushRequest req = casFlushAll,
bufSizeT spaceRequired=0u)
{
return this->outBuf::flush(req,spaceRequired);
}
private:
void ioBlockedSignal(); // dummy

View File

@@ -176,9 +176,9 @@ void casDGIO::osdShow (unsigned level) const
}
//
// casDGIO::setNonBlocking()
// casDGIO::xSetNonBlocking()
//
void casDGIO::setNonBlocking()
void casDGIO::xSetNonBlocking()
{
int status;
int yes = TRUE;

View File

@@ -7,6 +7,9 @@
// Some BSD calls have crept in here
//
// $Log$
// Revision 1.4 1996/09/04 20:29:08 jhill
// removed os depen stuff
//
// Revision 1.3 1996/08/13 23:00:29 jhill
// removed include of netdb.h
//
@@ -84,7 +87,7 @@ public:
~casStreamIO();
int getFileDescriptor() const;
void setNonBlocking();
void xSetNonBlocking();
bufSizeT optimumBufferSize ();
casIOState state() const;
@@ -110,7 +113,7 @@ public:
~casDGIO();
int getFileDescriptor() const;
void setNonBlocking();
void xSetNonBlocking();
bufSizeT optimumBufferSize ();
void sendBeacon(char &msg, bufSizeT length,
aitUint32 &m_avail);

View File

@@ -5,6 +5,9 @@
//
//
// $Log$
// Revision 1.4 1996/07/24 22:03:36 jhill
// fixed net proto for gnu compiler
//
// Revision 1.3 1996/07/09 22:55:22 jhill
// added cast
//
@@ -23,8 +26,15 @@
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#ifndef SUNOS4 // the SUNOS4 prototypes are trad C - see bsdProto.h
//
// the SUNOS4 and vxWorks5.2 prototypes are trad C
//
#if !defined(SUNOS4) && !defined(vxWorks)
#include <arpa/inet.h>
#else
extern "C" {
char * inet_ntoa(struct in_addr);
}
#endif
@@ -32,10 +42,9 @@
// casStreamIO::casStreamIO()
//
casStreamIO::casStreamIO(const SOCKET s, const caAddr &a) :
sock(s), addr(a)
sockState(casOffLine), sock(s), addr(a)
{
assert (sock>=0);
this->sockState = casOffLine;
}
@@ -124,7 +133,8 @@ caStatus casStreamIO::init()
casStreamIO::~casStreamIO()
{
if (sock>=0) {
close(sock);
close(this->sock);
printf("closing sock=%d\n", this->sock);
}
}
@@ -132,7 +142,7 @@ casStreamIO::~casStreamIO()
//
// casStreamIO::osdSend()
//
xSendStatus casStreamIO::osdSend(const char *pBuf, bufSizeT nBytes,
xSendStatus casStreamIO::osdSend(const char *pBuf, bufSizeT nBytesReq,
bufSizeT &nBytesActual)
{
int status;
@@ -156,31 +166,29 @@ xSendStatus casStreamIO::osdSend(const char *pBuf, bufSizeT nBytes,
return xSendDisconnect;
}
if (nBytes<=0u) {
if (nBytesReq<=0u) {
nBytesActual = 0u;
return xSendOK;
}
status = send (
this->sock,
pBuf,
nBytes,
0);
if (status == 0) {
this->sockState = casOffLine;
status = send (
this->sock,
(char *) pBuf,
nBytesReq,
0);
if (status == 0) {
this->sockState = casOffLine;
return xSendDisconnect;
}
else if (status<0) {
int anerrno = SOCKERRNO;
}
else if (status<0) {
int anerrno = SOCKERRNO;
if (anerrno != EWOULDBLOCK) {
this->sockState = casOffLine;
if (anerrno != EWOULDBLOCK) {
this->sockState = casOffLine;
}
nBytesActual = 0u;
return xSendOK;
}
nBytesActual = (bufSizeT) status;
return xSendOK;
}
@@ -247,9 +255,9 @@ void casStreamIO::osdShow (unsigned level) const
//
// casStreamIO::setNonBlocking()
// casStreamIO::xSsetNonBlocking()
//
void casStreamIO::setNonBlocking()
void casStreamIO::xSetNonBlocking()
{
int status;
int yes = TRUE;

View File

@@ -6,6 +6,9 @@
*
*
* $Log$
* Revision 1.2 1996/08/05 19:29:25 jhill
* os depen code now smaller
*
* Revision 1.1.1.1 1996/06/20 00:28:06 jhill
* ca server installation
*
@@ -52,7 +55,7 @@ void casDGEvWakeup::show(unsigned level)
void casDGEvWakeup::expire()
{
casProcCond cond;
cond = this->os.casEventSys::process();
cond = this->os.eventSysProcess();
if (cond != casProcOk) {
//
// if "this" is being used above this

View File

@@ -10,8 +10,9 @@ public:
// (since g++ does not have exceptions)
//
int init() {return 0;}
void lock() {};
void unlock() {};
void osiLock() {}
void osiUnlock() {}
void show (unsigned) {}
private:
};

View File

@@ -5,6 +5,9 @@
*
*
* $Log$
* Revision 1.1 1996/09/04 22:06:43 jhill
* installed
*
* Revision 1.1.1.1 1996/06/20 00:28:06 jhill
* ca server installation
*
@@ -109,9 +112,9 @@ caServerOS::~caServerOS()
//
// caServer()
// caServerEntry()
//
void caServer(caServerI *pCAS)
void caServerEntry(caServerI *pCAS)
{
//
// forever

View File

@@ -6,6 +6,9 @@
*
*
* $Log$
* Revision 1.1 1996/09/04 22:06:45 jhill
* installed
*
* Revision 1.1.1.1 1996/06/20 00:28:06 jhill
* ca server installation
*
@@ -100,7 +103,7 @@ void casDGOS::show(unsigned level)
taskShow(this->clientTId, level);
}
if (taskIdVerify(this->eventTId) == OK) {
taskShow(this->eventTId, level);
printf("casDGOS task id = %x\n", this->eventTId);
}
if (this->eventSignalSem) {
semShow(this->eventSignalSem, level);
@@ -174,16 +177,11 @@ casProcCond casDGOS::processInput ()
//
int casDGServer (casDGOS *pDGOS)
{
caStatus status;
//
// block for the next DG until the connection closes
//
while (TRUE) {
status = pDGOS->processInput();
if (status) {
errMessage(status, "casDGServer (casDGOS *pDGOS)");
}
pDGOS->process();
}
}
@@ -202,7 +200,7 @@ int casDGEvent (casDGOS *pDGOS)
status = semTake(pDGOS->eventSignalSem, WAIT_FOREVER);
assert (status!=OK);
cond = pDGOS->casEventSys::process();
cond = pDGOS->eventSysProcess();
if (cond != casProcOk) {
printf("DG event sys process failed\n");
}

View File

@@ -7,6 +7,9 @@
// Some BSD calls have crept in here
//
// $Log$
// Revision 1.1 1996/09/04 22:06:46 jhill
// installed
//
// Revision 1.1.1.1 1996/06/20 00:28:06 jhill
// ca server installation
//
@@ -61,14 +64,14 @@ class caServerOS;
//
// vxWorks task entry
//
int caServerEntry(caServerI *pCAS);
void caServerEntry(caServerI *pCAS);
//
// caServerOS
//
class caServerOS {
friend class casServerReg;
friend int caServerEntry(caServerOS *pOS);
friend void caServerEntry(caServerI *pCAS);
public:
caServerOS (caServerI &casIn) :
cas (casIn), pBTmr (NULL), tid(ERROR) {}

View File

@@ -4,6 +4,9 @@
//
//
// $Log$
// Revision 1.1 1996/09/04 22:06:46 jhill
// installed
//
// Revision 1.1.1.1 1996/06/20 00:28:06 jhill
// ca server installation
//
@@ -23,7 +26,7 @@
//
void casStreamOS::ioBlockedSignal()
{
printf("in casStreamOS::ioBlockedSignal()\n");
printf("in casStreamOS::ioBlockedSignal() ?\n");
}
//
@@ -115,7 +118,7 @@ void casStreamOS::show(unsigned level)
taskShow(this->clientTId, level);
}
if (taskIdVerify(this->eventTId)==OK) {
taskShow(this->eventTId, level);
printf("casStreamOS task id %x\n", this->eventTId);
}
if (this->eventSignalSem) {
semShow(this->eventSignalSem, level);
@@ -217,10 +220,9 @@ int casStrmServer (casStreamOS *pStrmOS)
{
casFillCondition fillCond;
casProcCond procCond;
caStatus status;
//
// block for the next DG until the connection closes
// block for the next message until the connection closes
//
while (TRUE) {
//
@@ -237,20 +239,13 @@ int casStrmServer (casStreamOS *pStrmOS)
//
return OK;
}
else if (pStrmOS->inBuf::full()==aitTrue) {
//
// If there isnt any space then temporarily
// stop calling this routine until problem is resolved
// either by:
// (1) sending or
// (2) a blocked IO op unblocks
//
delete pStrmOS;
//
// NO CODE HERE
// (see delete above)
//
return OK;
//
// force the output buffer to flush prior to
// blocking for more input (if no input bytes are
// pending)
//
if (pStrmOS->bytesAvailable()<=0u) {
pStrmOS->flush();
}
}
@@ -269,7 +264,7 @@ int casStrmEvent(casStreamOS *pStrmOS)
//
while (TRUE) {
status = semTake(pStrmOS->eventSignalSem, WAIT_FOREVER);
assert (status!=OK);
assert (status==OK);
cond = pStrmOS->casEventSys::process();
if (cond != casProcOk) {

View File

@@ -3,9 +3,26 @@
// osiMutex - OS independent mutex
// (vxWorks version)
//
//
// NOTES:
// 1) epicsPrintf() is used in this file because we cant stand
// the logMsg() 8 arg API amd we dont want the messages from different
// tasks to co-mingle
//
#include <semLib.h>
#include <assert.h>
#include <epicsAssert.h>
#include <epicsPrint.h>
#ifdef DEBUG_OSIMUTEX
#include <stdio.h>
#endif
#ifdef DEBUG_OSIMUTEX
#define osiLock() osiLockI (__FILE__, __LINE__)
#define osiUnlock() osiUnlockI (__FILE__, __LINE__)
#endif
class osiMutex {
public:
@@ -24,26 +41,71 @@ public:
{
return -1;
}
# ifdef DEBUG_OSIMUTEX
epicsPrintf("created mutex at %lx\n",
(unsigned long) this->mutex);
# endif
return 0;
}
~osiMutex()
{
STATUS s;
s = semDelete (this->mutex);
assert (s==OK);
# ifdef DEBUG_OSIMUTEX
epicsPrintf("destroyed mutex at %lx\n",
(unsigned long) this->mutex);
# endif
}
void lock()
#ifdef DEBUG_OSIMUTEX
void osiLockI(const char *pFN, unsigned ln)
#else
void osiLock()
#endif
{
STATUS s;
if (!this->mutex) {
epicsPrintf(
"osiMutex: lock request before init was ignored\n");
return;
}
assert(this->mutex);
s = semTake (this->mutex, WAIT_FOREVER);
assert (s==OK);
# ifdef DEBUG_OSIMUTEX
epicsPrintf("L%lx in %s at %u\n",
(unsigned long) this->mutex,
pFN, ln);
# endif
}
void unlock()
#ifdef DEBUG_OSIMUTEX
void osiUnlockI(const char *pFN, unsigned ln)
#else
void osiUnlock()
#endif
{
STATUS s;
if (!this->mutex) {
epicsPrintf(
"osiMutex: unlock request before init was ignored\n");
return;
}
s = semGive (this->mutex);
assert (s==OK);
# ifdef DEBUG_OSIMUTEX
epicsPrintf("U%lx in %s at %d\n",
(unsigned long) this->mutex,
pFN, ln);
# endif
}
void show(unsigned level)
{
semShow(this->mutex, (int) level);
}
private:
SEM_ID mutex;