diff --git a/src/cas/example/Makefile.Vx.WorkInProgress b/src/cas/example/Makefile.Vx.WorkInProgress new file mode 100644 index 000000000..df941c9ee --- /dev/null +++ b/src/cas/example/Makefile.Vx.WorkInProgress @@ -0,0 +1,38 @@ + +CAS = ../../ +TOP = $(CAS)/../.. + +include $(TOP)/config/CONFIG_BASE + +CXXCMPLR = STRICT + +USR_INCLUDES = +USR_LDFLAGS = + +DEPLIBS_BASE = $(EPICS_BASE_LIB) +DEPLIBS = $(DEPLIBS_BASE)/libcas.a $(DEPLIBSWOCAS) + +SRCS.cc += ../vxEntry.cc +SRCS.cc += ../exServer.cc +SRCS.cc += ../exPV.cc +SRCS.cc += ../exSyncPV.cc +SRCS.cc += ../exAsyncPV.cc +SRCS.cc += ../exChannel.cc + +LIBOBJS += vxEntry.o +LIBOBJS += exServer.o +LIBOBJS += exPV.o +LIBOBJS += exSyncPV.o +LIBOBJS += exAsyncPV.o +LIBOBJS += exChannel.o + +LIBNAME = libexserver.o +include $(TOP)/config/RULES.Vx + +excas: $(OBJS) $(DEPLIBS) + $(LINK.cc) -o $@ $(OBJS) $(LDFLAGS) $(LDLIBS) + +clean:: + @$(RM) -rf Templates.DB + @$(RM) core + diff --git a/src/cas/example/README b/src/cas/example/README new file mode 100644 index 000000000..1ab19153f --- /dev/null +++ b/src/cas/example/README @@ -0,0 +1,13 @@ + +This directory contains an example ca server. The example server +exports 4 process variables (PVs): "fred", "freddy", "jane", and +"janet". "fred" and "jane" are synchronous PVs. "freddy" and "janet" +are asynchronous. Many ca servers will find that synchronous +variables will meet there needs and therefore will not require +the increased complexity associated with asynchronous PVs. The PVs in +the example server are updated periodically. Some random "noise" +is added to each PVs current value each time that it is updated. + +The example server does not so far implement enumerated data types. + + diff --git a/src/cas/generic/casAddr.h b/src/cas/generic/casAddr.h new file mode 100644 index 000000000..f72796ede --- /dev/null +++ b/src/cas/generic/casAddr.h @@ -0,0 +1,9 @@ + +#ifndef includeCASAddrH +#define includeCASAddrH + +#include "osiSock.h" +#include "addrList.h" + +#endif // includeCASAddrH + diff --git a/src/cas/generic/casAsyncExIOI.cc b/src/cas/generic/casAsyncExIOI.cc new file mode 100644 index 000000000..2d5eb8b8e --- /dev/null +++ b/src/cas/generic/casAsyncExIOI.cc @@ -0,0 +1,107 @@ +/* + * $Id$ + * + * Author Jeffrey O. Hill + * johill@lanl.gov + * 505 665 1831 + * + * Experimental Physics and Industrial Control System (EPICS) + * + * Copyright 1991, the Regents of the University of California, + * and the University of Chicago Board of Governors. + * + * This software was produced under U.S. Government contracts: + * (W-7405-ENG-36) at the Los Alamos National Laboratory, + * and (W-31-109-ENG-38) at Argonne National Laboratory. + * + * Initial development by: + * The Controls and Automation Group (AT-8) + * Ground Test Accelerator + * Accelerator Technology Division + * Los Alamos National Laboratory + * + * Co-developed with + * The Controls and Computing Group + * Accelerator Systems Division + * Advanced Photon Source + * Argonne National Laboratory + * + * + * History + * $Log$ + * + * + */ + + +#include +#include // casAsyncIOI in line func +#include // casChannelI in line func +#include // casOpaqueAddr in line func +#include // casCtx in line func +#include // casCoreClient in line func + +// +// casAsyncExIOI::casAsyncExIOI() +// +casAsyncExIOI::casAsyncExIOI(const casCtx &ctx, + casAsyncPVExistIO &ioIn) : + casAsyncIOI(*ctx.getClient(), ioIn), + msg(*ctx.getMsg()), + pOutDGIntfIO(ctx.getClient()->fetchOutIntf()), + dgOutAddr(ctx.getClient()->fetchRespAddr()) +{ + assert (&this->msg); + + if (this->msg.m_cmmd != CA_PROTO_CLAIM_CIU && + this->msg.m_cmmd != CA_PROTO_SEARCH) { + this->reportInvalidAsynchIO(this->msg.m_cmmd); + } + + this->client.installAsyncIO(*this); +} + +// +// casAsyncExIOI::~casAsyncExIOI() +// +casAsyncExIOI::~casAsyncExIOI() +{ + this->client.removeAsyncIO(*this); +} + +// +// casAsyncExIOI::postIOCompletion() +// +caStatus casAsyncExIOI::postIOCompletion(const pvExistReturn &retValIn) +{ + this->retVal = retValIn; + return this->postIOCompletionI(); +} + + +// +// casAsyncExIOI::cbFuncAsyncIO() +// (called when IO completion event reaches top of event queue) +// +caStatus casAsyncExIOI::cbFuncAsyncIO() +{ + caStatus status; + + if (this->msg.m_cmmd == CA_PROTO_SEARCH) { + // + // pass output DG address parameters + // + assert(this->pOutDGIntfIO); + status = this->client.asyncSearchResponse(*this->pOutDGIntfIO, + this->dgOutAddr.get(), this->msg, this->retVal); + } + else if (this->msg.m_cmmd == CA_PROTO_CLAIM_CIU) { + status = this->client.createChanResponse(this->msg, this->retVal); + } + else { + status = S_cas_internal; + } + + return status; +} + diff --git a/src/cas/generic/casAsyncRdIOI.cc b/src/cas/generic/casAsyncRdIOI.cc new file mode 100644 index 000000000..a21ecbaae --- /dev/null +++ b/src/cas/generic/casAsyncRdIOI.cc @@ -0,0 +1,141 @@ +/* + * $Id$ + * + * Author Jeffrey O. Hill + * johill@lanl.gov + * 505 665 1831 + * + * Experimental Physics and Industrial Control System (EPICS) + * + * Copyright 1991, the Regents of the University of California, + * and the University of Chicago Board of Governors. + * + * This software was produced under U.S. Government contracts: + * (W-7405-ENG-36) at the Los Alamos National Laboratory, + * and (W-31-109-ENG-38) at Argonne National Laboratory. + * + * Initial development by: + * The Controls and Automation Group (AT-8) + * Ground Test Accelerator + * Accelerator Technology Division + * Los Alamos National Laboratory + * + * Co-developed with + * The Controls and Computing Group + * Accelerator Systems Division + * Advanced Photon Source + * Argonne National Laboratory + * + * + * History + * $Log$ + * Revision 1.3 1996/09/04 20:13:16 jhill + * initialize new member - asyncIO + * + * Revision 1.2 1996/06/26 21:18:50 jhill + * now matches gdd api revisions + * + * Revision 1.1.1.1 1996/06/20 00:28:14 jhill + * ca server installation + * + * + */ + + +#include +#include // casAsyncIOI in line func +#include // casChannelI in line func +#include // casCtxI in line func + +// +// casAsyncRdIOI::casAsyncRdIOI() +// +casAsyncRdIOI::casAsyncRdIOI(const casCtx &ctx, casAsyncReadIO &ioIn) : + casAsyncIOI(*ctx.getClient(), ioIn), + msg(*ctx.getMsg()), + chan(*ctx.getChannel()), + pDD(NULL), + completionStatus(S_cas_internal) +{ + assert (&this->msg); + assert (&this->chan); + + if (this->msg.m_cmmd != CA_PROTO_READ && + this->msg.m_cmmd != CA_PROTO_READ_NOTIFY) { + this->reportInvalidAsynchIO(this->msg.m_cmmd); + } + + this->chan.installAsyncIO(*this); +} + +// +// casAsyncRdIOI::~casAsyncRdIOI() +// +casAsyncRdIOI::~casAsyncRdIOI() +{ + int gddStatus; + + this->lock(); + + this->chan.removeAsyncIO(*this); + + if (this->pDD) { + gddStatus = this->pDD->unreference(); + assert(!gddStatus); + } + + this->unlock(); +} + +// +// casAsyncRdIOI::postIOCompletion() +// +caStatus casAsyncRdIOI::postIOCompletion(caStatus completionStatusIn, + gdd &valueRead) +{ + int gddStatus; + + this->lock(); + this->pDD = &valueRead; + gddStatus = this->pDD->reference(); + this->unlock(); + + assert(!gddStatus); + + this->completionStatus = completionStatusIn; + return this->postIOCompletionI(); +} + +// +// casAsyncRdIOI::readOP() +// +int casAsyncRdIOI::readOP() +{ + return TRUE; // it is a read op +} + + +// +// casAsyncRdIOI::cbFuncAsyncIO() +// (called when IO completion event reaches top of event queue) +// +caStatus casAsyncRdIOI::cbFuncAsyncIO() +{ + caStatus status; + + if (this->msg.m_cmmd == CA_PROTO_READ) { + status = client.readResponse(&this->chan, this->msg, + this->pDD, this->completionStatus); + } + else if (this->msg.m_cmmd == CA_PROTO_READ_NOTIFY) { + status = client.readNotifyResponse(&this->chan, + this->msg, this->pDD, + this->completionStatus); + } + else { + status = S_cas_internal; + } + + return status; +} + diff --git a/src/cas/generic/casAsyncWtIOI.cc b/src/cas/generic/casAsyncWtIOI.cc new file mode 100644 index 000000000..d1a5a0b16 --- /dev/null +++ b/src/cas/generic/casAsyncWtIOI.cc @@ -0,0 +1,103 @@ +/* + * $Id$ + * + * Author Jeffrey O. Hill + * johill@lanl.gov + * 505 665 1831 + * + * Experimental Physics and Industrial Control System (EPICS) + * + * Copyright 1991, the Regents of the University of California, + * and the University of Chicago Board of Governors. + * + * This software was produced under U.S. Government contracts: + * (W-7405-ENG-36) at the Los Alamos National Laboratory, + * and (W-31-109-ENG-38) at Argonne National Laboratory. + * + * Initial development by: + * The Controls and Automation Group (AT-8) + * Ground Test Accelerator + * Accelerator Technology Division + * Los Alamos National Laboratory + * + * Co-developed with + * The Controls and Computing Group + * Accelerator Systems Division + * Advanced Photon Source + * Argonne National Laboratory + * + * + * History + * $Log$ + * + */ + + +#include +#include // casAsyncIOI in line func +#include // casChannelI in line func +#include // casCtx in line func + +// +// casAsyncWtIOI::casAsyncWtIOI() +// +casAsyncWtIOI::casAsyncWtIOI(const casCtx &ctx, casAsyncWriteIO &ioIn) : + casAsyncIOI(*ctx.getClient(), ioIn), + msg(*ctx.getMsg()), + chan(*ctx.getChannel()), + completionStatus(S_cas_internal) +{ + assert (&this->msg); + assert (&this->chan); + + if (this->msg.m_cmmd != CA_PROTO_WRITE && + this->msg.m_cmmd != CA_PROTO_WRITE_NOTIFY) { + this->reportInvalidAsynchIO(this->msg.m_cmmd); + } + + this->chan.installAsyncIO(*this); +} + +// +// casAsyncWtIOI::~casAsyncWtIOI() +// +casAsyncWtIOI::~casAsyncWtIOI() +{ + this->lock(); + this->chan.removeAsyncIO(*this); + this->unlock(); +} + +// +// casAsyncWtIOI::postIOCompletion() +// +caStatus casAsyncWtIOI::postIOCompletion(caStatus completionStatusIn) +{ + this->completionStatus = completionStatusIn; + return this->postIOCompletionI(); +} + + +// +// casAsyncWtIOI::cbFuncAsyncIO() +// (called when IO completion event reaches top of event queue) +// +caStatus casAsyncWtIOI::cbFuncAsyncIO() +{ + caStatus status; + + if (this->msg.m_cmmd == CA_PROTO_WRITE) { + status = client.writeResponse(&this->chan, this->msg, + this->completionStatus); + } + else if (this->msg.m_cmmd == CA_PROTO_WRITE_NOTIFY) { + status = client.writeNotifyResponse(&this->chan, + this->msg, this->completionStatus); + } + else { + status = S_cas_internal; + } + + return status; +} + diff --git a/src/cas/generic/casAsyncXXIO.cc b/src/cas/generic/casAsyncXXIO.cc new file mode 100644 index 000000000..2af260b30 --- /dev/null +++ b/src/cas/generic/casAsyncXXIO.cc @@ -0,0 +1,61 @@ +/* + * $Id$ + * + * Author Jeffrey O. Hill + * johill@lanl.gov + * 505 665 1831 + * + * Experimental Physics and Industrial Control System (EPICS) + * + * Copyright 1991, the Regents of the University of California, + * and the University of Chicago Board of Governors. + * + * This software was produced under U.S. Government contracts: + * (W-7405-ENG-36) at the Los Alamos National Laboratory, + * and (W-31-109-ENG-38) at Argonne National Laboratory. + * + * Initial development by: + * The Controls and Automation Group (AT-8) + * Ground Test Accelerator + * Accelerator Technology Division + * Los Alamos National Laboratory + * + * Co-developed with + * The Controls and Computing Group + * Accelerator Systems Division + * Advanced Photon Source + * Argonne National Laboratory + * + * + * History + * $Log$ + * + * + */ + +#include + +// +// This must be virtual so that derived destructor will +// be run indirectly. Therefore it cannot be inline. +// +casAsyncReadIO::~casAsyncReadIO() +{ +} + +// +// This must be virtual so that derived destructor will +// be run indirectly. Therefore it cannot be inline. +// +casAsyncWriteIO::~casAsyncWriteIO() +{ +} + +// +// This must be virtual so that derived destructor will +// be run indirectly. Therefore it cannot be inline. +// +casAsyncPVExistIO::~casAsyncPVExistIO() +{ +} + diff --git a/src/cas/generic/casCoreClientIL.h b/src/cas/generic/casCoreClientIL.h new file mode 100644 index 000000000..e6c5b062b --- /dev/null +++ b/src/cas/generic/casCoreClientIL.h @@ -0,0 +1,81 @@ +/* + * $Id$ + * + * Author Jeffrey O. Hill + * johill@lanl.gov + * 505 665 1831 + * + * Experimental Physics and Industrial Control System (EPICS) + * + * Copyright 1991, the Regents of the University of California, + * and the University of Chicago Board of Governors. + * + * This software was produced under U.S. Government contracts: + * (W-7405-ENG-36) at the Los Alamos National Laboratory, + * and (W-31-109-ENG-38) at Argonne National Laboratory. + * + * Initial development by: + * The Controls and Automation Group (AT-8) + * Ground Test Accelerator + * Accelerator Technology Division + * Los Alamos National Laboratory + * + * Co-developed with + * The Controls and Computing Group + * Accelerator Systems Division + * Advanced Photon Source + * Argonne National Laboratory + * + * + * History + * $Log$ + * + */ + + +#ifndef casCoreClientIL_h +#define casCoreClientIL_h + +#include "caServerIIL.h" // caServerI in line func +#include "casCtxIL.h" // casEventSys in line func + +// +// casCoreClient::getCAS() +// +inline caServerI &casCoreClient::getCAS() const +{ + return *this->ctx.getServer(); +} + +// +// casCoreClient::lookupRes() +// +inline casRes *casCoreClient::lookupRes(const caResId &idIn, casResType type) +{ + casRes *pRes; + pRes = this->ctx.getServer()->lookupRes(idIn, type); + return pRes; +} + +// +// casCoreClient::installAsyncIO() +// +inline void casCoreClient::installAsyncIO(casAsyncIOI &ioIn) +{ + this->osiLock(); + this->ioInProgList.add(ioIn); + this->osiUnlock(); +} + +// +// casCoreClient::removeAsyncIO() +// +inline void casCoreClient::removeAsyncIO(casAsyncIOI &ioIn) +{ + this->osiLock(); + this->ioInProgList.remove(ioIn); + this->osiUnlock(); +} + +#endif // casCoreClientIL_h + diff --git a/src/cas/generic/casCtx.cc b/src/cas/generic/casCtx.cc new file mode 100644 index 000000000..48660c59b --- /dev/null +++ b/src/cas/generic/casCtx.cc @@ -0,0 +1,19 @@ + +#include + +// +// casCtx::show() +// +void casCtx::show (unsigned level) const +{ + printf ("casCtx at %x\n", (unsigned) this); + if (level >= 1u) { + printf ("\tpMsg = %x\n", (unsigned) &this->msg); + printf ("\tpData = %x\n", (unsigned) pData); + printf ("\tpCAS = %x\n", (unsigned) pCAS); + printf ("\tpClient = %x\n", (unsigned) pClient); + printf ("\tpChannel = %x\n", (unsigned) pChannel); + printf ("\tpPV = %x\n", (unsigned) pPV); + } +} + diff --git a/src/cas/generic/casCtxIL.h b/src/cas/generic/casCtxIL.h new file mode 100644 index 000000000..7b360b092 --- /dev/null +++ b/src/cas/generic/casCtxIL.h @@ -0,0 +1,123 @@ + +#ifndef casCtxILh +#define casCtxILh + +// +// casCtx::casCtx() +// +inline casCtx::casCtx() : + pData(NULL), pCAS(NULL), pClient(NULL), + pChannel(NULL), pPV(NULL) +{ + memset(&this->msg, 0, sizeof(this->msg)); +} + +// +// casCtx::getMsg() +// +inline const caHdr *casCtx::getMsg() const +{ + return (const caHdr *) &this->msg; +} + +// +// casCtx::getData() +// +inline void *casCtx::getData() const +{ + return this->pData; +} + +// +// casCtx::getServer() +// +inline caServerI * casCtx::getServer() const +{ + return this->pCAS; +} + +// +// casCtx::getClient() +// +inline casCoreClient * casCtx::getClient() const +{ + return this->pClient; +} + +// +// casCtx::getPV() +// +inline casPVI * casCtx::getPV() const +{ + return this->pPV; +} + +// +// casCtx::getChannel() +// +inline casChannelI * casCtx::getChannel() const +{ + return this->pChannel; +} + +// +// casCtx::setMsg() +// (assumes incoming message is in network byte order) +// +inline void casCtx::setMsg(const char *pBuf) +{ + // + // copy as raw bytes in order to avoid + // alignment problems + // + memcpy (&this->msg, pBuf, sizeof(this->msg)); + this->msg.m_cmmd = ntohs (this->msg.m_cmmd); + this->msg.m_postsize = ntohs (this->msg.m_postsize); + this->msg.m_type = ntohs (this->msg.m_type); + this->msg.m_count = ntohs (this->msg.m_count); + this->msg.m_cid = ntohl (this->msg.m_cid); + this->msg.m_available = ntohl (this->msg.m_available); +} + +// +// casCtx::setData() +// +inline void casCtx::setData(void *p) +{ + this->pData = p; +} + +// +// casCtx::setServer() +// +inline void casCtx::setServer(caServerI *p) +{ + this->pCAS = p; +} + +// +// casCtx::setClient() +// +inline void casCtx::setClient(casCoreClient *p) +{ + this->pClient = p; +} + +// +// casCtx::setPV() +// +inline void casCtx::setPV(casPVI *p) +{ + this->pPV = p; +} + +// +// casCtx::setChannel() +// +inline void casCtx::setChannel(casChannelI *p) +{ + this->pChannel = p; +} + +#endif // casCtxILh + diff --git a/src/cas/generic/casOpaqueAddr.cc b/src/cas/generic/casOpaqueAddr.cc new file mode 100644 index 000000000..12ea0e60f --- /dev/null +++ b/src/cas/generic/casOpaqueAddr.cc @@ -0,0 +1,12 @@ + +#include +#include <> + + +casOpaqueAddr::checkSize sizeChecker; + +checkSize::checkSize() +{ + assert( sizeof(casOpaqueAddr::opaqueAddr) >= sizeof(caAddr)); +} + diff --git a/src/cas/generic/casOpaqueAddrIL.h b/src/cas/generic/casOpaqueAddrIL.h new file mode 100644 index 000000000..d20895f9a --- /dev/null +++ b/src/cas/generic/casOpaqueAddrIL.h @@ -0,0 +1,70 @@ + +#ifndef casOpaqueAddrILh +#define casOpaqueAddrILh + +// +// casOpaqueAddr::clear() +// +inline void casOpaqueAddr::clear() +{ + this->init = 0; +} + +// +// casOpaqueAddr::casOpaqueAddr() +// +inline casOpaqueAddr::casOpaqueAddr() +{ + this->clear(); +} + +// +// casOpaqueAddr::hasBeenInitialized() +// +inline int casOpaqueAddr::hasBeenInitialized() const +{ + return this->init; +} + +// +// casOpaqueAddr::set() +// +inline void casOpaqueAddr::set (const caAddr &addr) +{ + caAddr *p = (caAddr *) this->opaqueAddr; + + // + // see class casOpaqueAddr::checkSize + // for assert fail when + // sizeof(casOpaqueAddr::opaqueAddr) < sizeof(caAddr) + // + *p = addr; + this->init = TRUE; +} + +// +// casOpaqueAddr::set() +// +inline casOpaqueAddr::casOpaqueAddr (const caAddr &addr) +{ + this->set(addr); +} + +// +// casOpaqueAddr::get() +// +inline caAddr casOpaqueAddr::get () const +{ + caAddr *p = (caAddr *) this->opaqueAddr; + + assert(this->init); + // + // see class casOpaqueAddr::checkSize + // for assert fail when + // sizeof(casOpaqueAddr::opaqueAddr) < sizeof(caAddr) + // + return *p; +} + +#endif // casOpaqueAddrILh + diff --git a/src/cas/generic/dgInBuf.cc b/src/cas/generic/dgInBuf.cc new file mode 100644 index 000000000..c2596a960 --- /dev/null +++ b/src/cas/generic/dgInBuf.cc @@ -0,0 +1,28 @@ + +#include +#include + +// +// dgInBuf::~dgInBuf() +// (force virtual constructor) +// +dgInBuf::~dgInBuf() +{ +} + +// +// dgInBuf::xRecv () +// +xRecvStatus dgInBuf::xRecv (char *pBufIn, bufSizeT nBytesToRecv, + bufSizeT &nByesRecv) +{ + caAddr addr; + xRecvStatus stat; + + stat = this->xDGRecv (pBufIn, nBytesToRecv, nByesRecv, addr); + if (stat == xRecvOK) { + this->from.set(addr); + } + return stat; +} + diff --git a/src/cas/generic/dgInBufIL.h b/src/cas/generic/dgInBufIL.h new file mode 100644 index 000000000..c5897daae --- /dev/null +++ b/src/cas/generic/dgInBufIL.h @@ -0,0 +1,42 @@ + +#ifndef dgInBufILh +#define dgInBufILh + +#include +#include + +// +// dgInBuf::dgInBuf() +// +inline dgInBuf::dgInBuf (osiMutex &mutexIn, unsigned bufSizeIn) : + inBuf(mutexIn, bufSizeIn) +{ +} + +// +// dgInBuf::getSender() +// +inline const caAddr dgInBuf::getSender() const +{ + return this->from.get(); +} + +// +// dgInBuf::hasAddress() +// +inline int dgInBuf::hasAddress() const +{ + return this->from.hasBeenInitialized(); +} + +// +// dgInBuf::clear() +// +inline void dgInBuf::clear() +{ + this->from.clear(); + this->inBuf::clear(); +} + +#endif // dgInBufILh + diff --git a/src/cas/generic/dgOutBuf.cc b/src/cas/generic/dgOutBuf.cc new file mode 100644 index 000000000..2fd4bc15b --- /dev/null +++ b/src/cas/generic/dgOutBuf.cc @@ -0,0 +1,24 @@ + +#include +#include + +// +// dgOutBuf::~dgOutBuf() +// (force virtual destructor) +// +dgOutBuf::~dgOutBuf() +{ +} + +// +// dgOutBuf::xSend() +// +xSendStatus dgOutBuf::xSend (char *pBufIn, + bufSizeT nBytesAvailableToSend, bufSizeT nBytesNeedToBeSent, + bufSizeT &nBytesSent) +{ + assert(nBytesAvailableToSend>=nBytesNeedToBeSent); + return xDGSend(pBufIn, nBytesAvailableToSend, nBytesSent, + this->to.get()); +} + diff --git a/src/cas/generic/dgOutBufIL.h b/src/cas/generic/dgOutBufIL.h new file mode 100644 index 000000000..70e326078 --- /dev/null +++ b/src/cas/generic/dgOutBufIL.h @@ -0,0 +1,42 @@ + +#ifndef dgOutBufILh +#define dgOutBufILh + +#include +#include + +// +// dgOutBuf::dgOutBuf() +// +inline dgOutBuf::dgOutBuf (osiMutex &mutexIn, unsigned bufSizeIn) : + outBuf(mutexIn, bufSizeIn) +{ +} + +// +// dgOutBuf::getRecipient() +// +inline caAddr dgOutBuf::getRecipient() +{ + return this->to.get(); +} + +// +// dgOutBuf::setRecipient() +// +inline void dgOutBuf::setRecipient(const caAddr &addr) +{ + this->to.set(addr); +} + +// +// dgOutBuf::clear() +// +inline void dgOutBuf::clear() +{ + this->to.clear(); + this->outBuf::clear(); +} + +#endif // dgOutBufILh + diff --git a/src/cas/generic/inBufIL.h b/src/cas/generic/inBufIL.h new file mode 100644 index 000000000..3680bcd2e --- /dev/null +++ b/src/cas/generic/inBufIL.h @@ -0,0 +1,88 @@ + +#ifndef inBufILh +#define inBufILh + + +// +// inBuf::inBuf() +// +inline inBuf::inBuf(osiMutex &mutexIn, unsigned bufSizeIn) : + mutex(mutexIn), + pBuf(NULL), + bufSize(bufSizeIn), + bytesInBuffer(0u), + nextReadIndex(0u) +{ +} + +// +// inBuf::init() +// +inline caStatus inBuf::init() +{ + this->pBuf = new char [this->bufSize]; + if (!this->pBuf) { + this->bufSize = 0u; + return S_cas_noMemory; + } + return S_cas_success; +} + +// +// inBuf::bytesPresent() +// +inline bufSizeT inBuf::bytesPresent() const +{ + return this->bytesInBuffer-this->nextReadIndex; +} + +// +// inBuf::bytesAvailable() +// +inline bufSizeT inBuf::bytesAvailable() const +{ + bufSizeT bp; + bp = this->bytesPresent(); + bp += this->incommingBytesPresent(); + return bp; +} + +// +// inBuf::full() +// +inline aitBool inBuf::full() const +{ + if (this->bytesPresent()>=this->bufSize) { + return aitTrue; + } + return aitFalse; +} + +// +// inBuf::clear() +// +inline void inBuf::clear() +{ + this->bytesInBuffer = 0u; + this->nextReadIndex = 0u; +} + +// +// inBuf::msgPtr() +// +inline char *inBuf::msgPtr() const +{ + return &this->pBuf[this->nextReadIndex]; +} + +// +// inBuf::removeMsg() +// +inline void inBuf::removeMsg(unsigned nBytes) +{ + this->nextReadIndex += nBytes; + assert(this->nextReadIndex<=this->bytesInBuffer); +} + +#endif // inBufILh + diff --git a/src/cas/generic/mt/README b/src/cas/generic/mt/README new file mode 100644 index 000000000..73d5be3dd --- /dev/null +++ b/src/cas/generic/mt/README @@ -0,0 +1,5 @@ + +This directory contains files specific to the multi-threaded +version of the CA server + +- diff --git a/src/cas/generic/mt/ioBlocked.cc b/src/cas/generic/mt/ioBlocked.cc new file mode 100644 index 000000000..ef1113ef6 --- /dev/null +++ b/src/cas/generic/mt/ioBlocked.cc @@ -0,0 +1,75 @@ +// +// $Id$ +// Author: Jeff Hill +// This file implements a IO blocked list NOOP for multi-threaded systems +// +// $Log$ +// + +#include +#include + + +// +// ioBlocked::~ioBlocked() +// +ioBlocked::ioBlocked() : + pList(NULL) +{ +} + +// +// ioBlocked::~ioBlocked() +// +ioBlocked::~ioBlocked() +{ +} + +// +// ioBlocked::ioBlockedSignal() +// +void ioBlocked::ioBlockedSignal() +{ + printf("in virtual base ioBlocked::ioBlockedSignal() ?\n"); +} + +// +// ioBlockedList::ioBlockedList () +// +ioBlockedList::ioBlockedList () +{ +} + +// +// ioBlockedList::~ioBlockedList () +// (NOOP on MT system) +// +ioBlockedList::~ioBlockedList () +{ +} + +// +// ioBlockedList::signal() +// (NOOP on MT system) +// +void ioBlockedList::signal() +{ +} + +// +// ioBlockedList::removeItemFromIOBLockedList() +// (NOOP on MT system) +// +void ioBlockedList::removeItemFromIOBLockedList(ioBlocked &) +{ +} + +// +// ioBlockedList::addItemToIOBLockedList() +// (NOOP on MT system) +// +void ioBlockedList::addItemToIOBLockedList(ioBlocked &) +{ +} + + diff --git a/src/cas/generic/outBufIL.h b/src/cas/generic/outBufIL.h new file mode 100644 index 000000000..1ab94ebe7 --- /dev/null +++ b/src/cas/generic/outBufIL.h @@ -0,0 +1,32 @@ + +#ifndef outBufILh +#define outBufILh + +// +// outBuf::bytesPresent() +// number of bytes in the output queue +// +inline bufSizeT outBuf::bytesPresent() const +{ + return this->stack; +} + +// +// outBuf::bytesFree() +// number of bytes unused in the output queue +// +inline bufSizeT outBuf::bytesFree() const +{ + return this->bufSize - this->stack; +} + +// +// outBuf::clear() +// +inline void outBuf::clear() +{ + this->stack = 0u; +} + +#endif // outBufILh + diff --git a/src/cas/generic/st/README b/src/cas/generic/st/README new file mode 100644 index 000000000..f5851b1f6 --- /dev/null +++ b/src/cas/generic/st/README @@ -0,0 +1,5 @@ + +This directory contains files specific to the single-threaded +version of the CA server + +- diff --git a/src/cas/generic/st/caServerOS.cc b/src/cas/generic/st/caServerOS.cc new file mode 100644 index 000000000..dad710f98 --- /dev/null +++ b/src/cas/generic/st/caServerOS.cc @@ -0,0 +1,105 @@ + +/* + * + * caServerOS.c + * $Id$ + * + * + * $Log$ + * Revision 1.2 1996/08/05 19:28:49 jhill + * space became tab + * + * Revision 1.1.1.1 1996/06/20 00:28:06 jhill + * ca server installation + * + * + */ + + +// +// CA server +// +#include + +// +// casBeaconTimer +// +class casBeaconTimer : public osiTimer { +public: + casBeaconTimer (const osiTime &delay, caServerOS &osIn) : + osiTimer(delay), os (osIn) {} + void expire(); + const osiTime delay() const; + osiBool again() const; + const char *name() const; +private: + caServerOS &os; + +}; + +// +// aServerOS::operator -> () +// +inline caServerI * caServerOS::operator -> () +{ + return &this->cas; +} + +// +// casBeaconTimer::expire() +// +void casBeaconTimer::expire() +{ + os->sendBeacon (); +} + +// +// casBeaconTimer::again() +// +osiBool casBeaconTimer::again() const +{ + return osiTrue; +} + +// +// casBeaconTimer::delay() +// +const osiTime casBeaconTimer::delay() const +{ + return os->getBeaconPeriod(); +} + +// +// casBeaconTimer::name() +// +const char *casBeaconTimer::name() const +{ + return "casBeaconTimer"; +} + + +// +// caServerOS::init() +// +caStatus caServerOS::init() +{ + this->pBTmr = new casBeaconTimer((*this)->getBeaconPeriod(), *this); + if (!this->pBTmr) { + ca_printf("CAS: Unable to start server beacon\n"); + return S_cas_noMemory; + } + + return S_cas_success; +} + + +// +// caServerOS::~caServerOS() +// +caServerOS::~caServerOS() +{ + if (this->pBTmr) { + delete this->pBTmr; + } +} + diff --git a/src/cas/generic/st/casDGIntfOS.cc b/src/cas/generic/st/casDGIntfOS.cc new file mode 100644 index 000000000..8073efd52 --- /dev/null +++ b/src/cas/generic/st/casDGIntfOS.cc @@ -0,0 +1,102 @@ + +/* + * + * casDGIntfOS.cc + * $Id$ + * + * + */ + +// +// CA server +// +#include +#include // IO Depen in line func + +// +// casDGReadReg +// +class casDGReadReg : public fdReg { +public: + casDGReadReg (casDGIntfOS &osIn) : + os (osIn), fdReg (osIn.getFD(), fdrRead) {} + ~casDGReadReg (); + + void show (unsigned level) const; +private: + casDGIntfOS &os; + + void callBack (); +}; + + +// +// casDGIntfOS::casDGIntfOS() +// +casDGIntfOS::casDGIntfOS(casDGClient &clientIn) : + casDGIntfIO(clientIn), + pRdReg(NULL) +{ +} + + +// +// casDGIntfOS::~casDGIntfOS() +// +casDGIntfOS::~casDGIntfOS() +{ + if (this->pRdReg) { + delete this->pRdReg; + } +} + +// +// casDGIntfOS::show() +// +void casDGIntfOS::show(unsigned level) const +{ + printf ("casDGIntfOS at %x\n", (unsigned) this); + if (this->pRdReg) { + this->pRdReg->show(level); + } +} + + +/* + * casDGIntfOS::start() + */ +caStatus casDGIntfOS::start() +{ + this->pRdReg = new casDGReadReg (*this); + if (!this->pRdReg) { + return S_cas_noMemory; + } + return S_cas_success; +} + +// +// casDGReadReg::callBack() +// +void casDGReadReg::callBack() +{ + assert (os.pRdReg); + os.processDG(); +} + +// +// casDGReadReg::~casDGReadReg() +// +casDGReadReg::~casDGReadReg() +{ + this->os.pRdReg = NULL; +} + +// +// casDGReadReg::show() +// +void casDGReadReg::show(unsigned level) const +{ + this->fdReg::show(level); + printf("casDGReadReg at %x\n", (unsigned) this); +} + diff --git a/src/cas/generic/st/casDGOS.cc b/src/cas/generic/st/casDGOS.cc new file mode 100644 index 000000000..b04c0fd48 --- /dev/null +++ b/src/cas/generic/st/casDGOS.cc @@ -0,0 +1,156 @@ + +/* + * + * casDGOS.cc + * $Id$ + * + * + * $Log$ + * Revision 1.3 1996/09/16 18:27:50 jhill + * vxWorks port changes + * + * Revision 1.2 1996/08/05 19:29:25 jhill + * os depen code now smaller + * + * Revision 1.1.1.1 1996/06/20 00:28:06 jhill + * ca server installation + * + * + */ + +// +// CA server +// +#include +#include + +class casDGEvWakeup : public osiTimer { +public: + casDGEvWakeup(casDGOS &osIn) : + osiTimer(osiTime(0.0)), os(osIn) {} + ~casDGEvWakeup(); + void expire(); + void show (unsigned level) const; + + const char *name() const; +private: + casDGOS &os; +}; + +// +// casDGEvWakeup::~casDGEvWakeup() +// +casDGEvWakeup::~casDGEvWakeup() +{ + os.pEvWk = NULL; +} + +// +// casDGEvWakeup::name() +// +const char *casDGEvWakeup::name() const +{ + return "casDGEvWakeup"; +} + +// +// casDGEvWakeup::show() +// +void casDGEvWakeup::show(unsigned level) const +{ + this->osiTimer::show(level); + printf("casDGEvWakeup at %x\n", (unsigned) this); +} + +// +// casDGEvWakeup::expire() +// +void casDGEvWakeup::expire() +{ + casProcCond cond; + cond = this->os.eventSysProcess(); + if (cond != casProcOk) { + // + // if "this" is being used above this + // routine on the stack then problems + // will result if we delete "this" here + // + // delete &this->os; + ca_printf("DG event sys process failed\n"); + } +} + +// +// casDGOS::eventSignal() +// +void casDGOS::eventSignal() +{ + if (!this->pEvWk) { + this->pEvWk = new casDGEvWakeup(*this); + if (!this->pEvWk) { + errMessage(S_cas_noMemory, + "casDGOS::eventSignal()"); + } + } +} + +// +// casDGOS::eventFlush() +// +void casDGOS::eventFlush() +{ + this->flush(); +} + + +// +// casDGOS::casDGOS() +// +casDGOS::casDGOS(caServerI &cas) : + casDGIO(cas), + pEvWk(NULL) +{ +} + + +// +// casDGOS::~casDGOS() +// +casDGOS::~casDGOS() +{ + if (this->pEvWk) { + delete this->pEvWk; + } +} + +// +// casDGOS::show() +// +void casDGOS::show(unsigned level) const +{ + printf ("casDGOS at %lx\n", (unsigned long) this); + this->casDGClient::show(level); + if (this->pEvWk) { + this->pEvWk->show(level); + } +} + + +// +// casDGOS::processInput () +// - a noop +// +casProcCond casDGOS::processInput () +{ + return casProcOk; +} + +// +// casDGOS::start() +// - a noop +// +caStatus casDGOS::start() +{ + return S_cas_success; +} + diff --git a/src/cas/generic/st/casIntfOS.cc b/src/cas/generic/st/casIntfOS.cc new file mode 100644 index 000000000..604bdcbcd --- /dev/null +++ b/src/cas/generic/st/casIntfOS.cc @@ -0,0 +1,91 @@ + +/* + * + * casIntfOS.cc + * $Id$ + * + * + */ + + +// +// CA server +// +#include + +// +// casServerReg +// +class casServerReg : public fdReg { +public: + casServerReg (casIntfOS &osIn) : + os (osIn), fdReg (osIn.getFD(), fdrRead) {} + ~casServerReg (); +private: + casIntfOS &os; + + void callBack (); +}; + + +// +// casIntfOS::init() +// +caStatus casIntfOS::init(const caAddr &addrIn, casDGClient &dgClientIn, + int autoBeaconAddr, int addConfigBeaconAddr) +{ + caStatus stat; + + stat = this->casIntfIO::init(addrIn, dgClientIn, + autoBeaconAddr, addConfigBeaconAddr); + if (stat) { + return stat; + } + + this->setNonBlocking(); + + this->pRdReg = new casServerReg(*this); + if (!this->pRdReg) { + return S_cas_noMemory; + } + + return S_cas_success; +} + + +// +// casIntfOS::~casIntfOS() +// +casIntfOS::~casIntfOS() +{ + if (this->pRdReg) { + delete this->pRdReg; + } +} + +// +// casIntfOS::newDGIntfIO() +// +casDGIntfIO *casIntfOS::newDGIntfIO(casDGClient &dgClientIn) const +{ + return new casDGIntfOS(dgClientIn); +} + + +// +// casServerReg::callBack() +// +void casServerReg::callBack() +{ + assert(this->os.pRdReg); + this->os.cas.connectCB(this->os); +} + +// +// casServerReg::~casServerReg() +// +casServerReg::~casServerReg() +{ + this->os.pRdReg = NULL; +} + diff --git a/src/cas/generic/st/casOSD.h b/src/cas/generic/st/casOSD.h new file mode 100644 index 000000000..939ad9e3d --- /dev/null +++ b/src/cas/generic/st/casOSD.h @@ -0,0 +1,176 @@ +// +// $Id$ +// +// casOSD.h - Channel Access Server OS Dependent for posix +// +// +// Some BSD calls have crept in here +// +// $Log$ +// Revision 1.3 1996/09/04 22:04:07 jhill +// moved netdb.h include here +// +// Revision 1.2 1996/08/13 22:58:15 jhill +// fdMgr.h => fdmanager.h +// +// Revision 1.1.1.1 1996/06/20 00:28:06 jhill +// ca server installation +// +// + +#ifndef includeCASOSDH +#define includeCASOSDH + +#include +#include + +class caServerI; +class caServerOS; + +class casServerReg; +class casBeaconTimer; + +// +// caServerOS +// +class caServerOS { + friend class casServerReg; +public: + caServerOS (caServerI &casIn) : + cas (casIn), pBTmr (NULL) {} + caStatus init (); + ~caServerOS (); + + caStatus start (); + + inline caServerI * operator -> (); +private: + caServerI &cas; + casBeaconTimer *pBTmr; +}; + +class casDGClient; +class casDGReadReg; + +// +// casDGIntfOS +// +class casDGIntfOS : public casDGIntfIO { + friend class casDGReadReg; +public: + casDGIntfOS(casDGClient &client); + virtual ~casDGIntfOS(); + + caStatus start(); + + void show(unsigned level) const; + + void recvCB(); + void sendCB(); +private: + casDGReadReg *pRdReg; +}; + +// +// casIntfOS +// +class casIntfOS : public casIntfIO, public tsDLNode +{ + friend class casServerReg; +public: + casIntfOS (caServerI &casIn) : + cas (casIn), pRdReg (NULL) {} + caStatus init(const caAddr &addr, casDGClient &dgClientIn, + int autoBeaconAddr, int addConfigBeaconAddr); + virtual ~casIntfOS(); + + void recvCB (); + void sendCB () {}; // NOOP satifies template + + casDGIntfIO *newDGIntfIO (casDGClient &dgClientIn) const; +private: + caServerI &cas; + casServerReg *pRdReg; +}; + +class casStreamWriteReg; +class casStreamReadReg; +class casStreamEvWakeup; +class casStreamIOWakeup; + +// +// casStreamOS +// +class casStreamOS : public casStreamIO { + friend class casStreamReadReg; + friend class casStreamWriteReg; + friend class casStreamEvWakeup; + friend class casStreamIOWakeup; +public: + casStreamOS(caServerI &, const ioArgsToNewStreamIO &ioArgs); + caStatus init(); + ~casStreamOS(); + + // + // process any incomming messages + // + casProcCond processInput(); + caStatus start(); + + void recvCB(); + void sendCB(); + + void sendBlockSignal(); + + void ioBlockedSignal(); + + void eventSignal(); + void eventFlush(); + + void show (unsigned level) const; +private: + casStreamWriteReg *pWtReg; + casStreamReadReg *pRdReg; + casStreamEvWakeup *pEvWk; + casStreamIOWakeup *pIOWk; + unsigned sendBlocked:1; + // + // + // + inline void armSend (); + inline void armRecv (); + inline void disarmSend(); + inline void disarmRecv(); +}; + +class casDGEvWakeup; + +// +// casDGOS +// +class casDGOS : public casDGIO { + friend class casDGEvWakeup; +public: + casDGOS(caServerI &cas); + ~casDGOS(); + + // + // process any incomming messages + // + casProcCond processInput(); + + void sendBlockSignal() {} + + void eventSignal(); + void eventFlush(); + + void show(unsigned level) const; + + caStatus start(); +private: + casDGEvWakeup *pEvWk; +}; + +// no additions below this line +#endif // includeCASOSDH + diff --git a/src/cas/generic/st/casStreamOS.cc b/src/cas/generic/st/casStreamOS.cc new file mode 100644 index 000000000..74ee38068 --- /dev/null +++ b/src/cas/generic/st/casStreamOS.cc @@ -0,0 +1,602 @@ +// +// casStreamOS.cc +// $Id$ +// +// +// $Log$ +// Revision 1.1.1.1 1996/06/20 00:28:06 jhill +// ca server installation +// +// +// +// TO DO: +// o armRecv() and armSend() should return bad status when +// there isnt enough memory +// + +// +// CA server +// +#include +#include // casClient inline func +#include // inBuf inline func +#include // outBuf inline func + +// +// casStreamReadReg +// +class casStreamReadReg : public fdReg { +public: + inline casStreamReadReg (casStreamOS &osIn); + inline ~casStreamReadReg (); + + void show (unsigned level) const; +private: + casStreamOS &os; + + void callBack (); +}; + +// +// casStreamReadReg::casStreamReadReg() +// +inline casStreamReadReg::casStreamReadReg (casStreamOS &osIn) : + os (osIn), fdReg (osIn.getFD(), fdrRead) +{ +# if defined(DEBUG) + printf ("Read on %d\n", this->os.getFD()); + printf ("Recv backlog %u\n", + this->os.inBuf::bytesPresent()); + printf ("Send backlog %u\n", + this->os.outBuf::bytesPresent()); +# endif +} + +// +// casStreamReadReg::~casStreamReadReg +// +inline casStreamReadReg::~casStreamReadReg () +{ + this->os.pRdReg = NULL; +# if defined(DEBUG) + printf ("Read off %d\n", this->os.getFD()); + printf ("Recv backlog %u\n", + this->os.inBuf::bytesPresent()); + printf ("Send backlog %u\n", + this->os.outBuf::bytesPresent()); +# endif +} + +// +// casStreamWriteReg +// +class casStreamWriteReg : public fdReg { +public: + inline casStreamWriteReg (casStreamOS &osIn); + inline ~casStreamWriteReg (); + + void show (unsigned level) const; +private: + casStreamOS &os; + + void callBack (); +}; + +// +// casStreamWriteReg::casStreamWriteReg() +// +inline casStreamWriteReg::casStreamWriteReg (casStreamOS &osIn) : + os (osIn), fdReg (osIn.getFD(), fdrWrite, TRUE) +{ +# if defined(DEBUG) + printf ("Write on %d\n", this->os.getFD()); + printf ("Recv backlog %u\n", + this->os.inBuf::bytesPresent()); + printf ("Send backlog %u\n", + this->os.outBuf::bytesPresent()); +# endif +} + +// +// casStreamWriteReg::~casStreamWriteReg () +// +inline casStreamWriteReg::~casStreamWriteReg () +{ + this->os.pWtReg = NULL; +# if defined(DEBUG) + printf ("Write off %d\n", this->os.getFD()); + printf ("Recv backlog %u\n", + this->os.inBuf::bytesPresent()); + printf ("Send backlog %u\n", + this->os.outBuf::bytesPresent()); +# endif +} + +// +// class casStreamEvWakeup +// +class casStreamEvWakeup : public osiTimer { +public: + casStreamEvWakeup(casStreamOS &osIn) : + osiTimer(osiTime(0.0)), os(osIn) {} + ~casStreamEvWakeup(); + + void expire(); + + void show(unsigned level) const; + + const char *name() const; +private: + casStreamOS &os; +}; + +// +// casStreamEvWakeup::~casStreamEvWakeup() +// +casStreamEvWakeup::~casStreamEvWakeup() +{ + this->os.pEvWk = NULL; +} + +// +// casStreamEvWakeup::name() +// +const char *casStreamEvWakeup::name() const +{ + return "casStreamEvWakeup"; +} + +// +// casStreamEvWakeup::show() +// +void casStreamEvWakeup::show(unsigned level) const +{ + printf ("casStreamEvWakeup at %x {\n", (unsigned) this); + this->osiTimer::show(level); + printf ("}\n"); +} + +// +// casStreamEvWakeup::expire() +// +void casStreamEvWakeup::expire() +{ + casProcCond cond; + cond = this->os.casEventSys::process(); + if (cond != casProcOk) { + // + // if "this" is being used above this + // routine on the stack then problems + // will result if we delete "this" here + // + // delete &this->os; + ca_printf("strm event sys process failed\n"); + } +} + +// +// class casStreamIOWakeup +// +class casStreamIOWakeup : public osiTimer { +public: + casStreamIOWakeup(casStreamOS &osIn) : + osiTimer(osiTime(0.0)), os(osIn) {} + ~casStreamIOWakeup(); + + void expire(); + + void show(unsigned level) const; + + const char *name() const; +private: + casStreamOS &os; +}; + +// +// casStreamIOWakeup::~casStreamIOWakeup() +// +casStreamIOWakeup::~casStreamIOWakeup() +{ + this->os.pIOWk = NULL; +} + +// +// casStreamIOWakeup::name() +// +const char *casStreamIOWakeup::name() const +{ + return "casStreamIOWakeup"; +} + +// +// casStreamIOWakeup::show() +// +void casStreamIOWakeup::show(unsigned level) const +{ + printf ("casStreamIOWakeup at %x {\n", (unsigned) this); + this->osiTimer::show(level); + printf ("}\n"); +} + +// +// casStreamOS::armRecv () +// +inline void casStreamOS::armRecv() +{ + if (!this->pRdReg) { + if (this->inBuf::full()!=aitTrue) { + this->pRdReg = new casStreamReadReg(*this); + if (!this->pRdReg) { + errMessage(S_cas_noMemory, "armRecv()"); + } + } + } +} + +// +// casStreamIOWakeup::expire() +// +// Running this indirectly off of the timer queue +// guarantees that we will not call processInput() +// recursively +// +void casStreamIOWakeup::expire() +{ + // + // in case there is something in the input buffer + // and currently nothing to be read from TCP + // + this->os.processInput(); + + // + // in case recv is not armed, there is space in + // the input buffer, and there eventually will + // be something to read from TCP + // + this->os.armRecv(); +} + +// +// casStreamOS::disarmRecv() +// +inline void casStreamOS::disarmRecv() +{ + if (this->pRdReg) { + delete this->pRdReg; + } +} + +// +// casStreamOS::armSend() +// +inline void casStreamOS::armSend() +{ + if (this->outBuf::bytesPresent()==0u) { + return; + } + + if (!this->pWtReg) { + this->pWtReg = new casStreamWriteReg(*this); + if (!this->pWtReg) { + errMessage(S_cas_noMemory, "armSend() failed"); + } + + } +} + +// +// casStreamOS::disarmSend() +// +inline void casStreamOS::disarmSend () +{ + if (this->pWtReg) { + delete this->pWtReg; + } +} + +// +// casStreamOS::ioBlockedSignal() +// +void casStreamOS::ioBlockedSignal() +{ + if (!this->pIOWk) { + this->pIOWk = new casStreamIOWakeup(*this); + if (!this->pIOWk) { + errMessage(S_cas_noMemory, + "casStreamOS::ioBlockedSignal()"); + } + } +} + +// +// casStreamOS::eventSignal() +// +void casStreamOS::eventSignal() +{ + if (!this->pEvWk) { + this->pEvWk = new casStreamEvWakeup(*this); + if (!this->pEvWk) { + errMessage(S_cas_noMemory, + "casStreamOS::eventSignal()"); + } + } +} + +// +// casStreamOS::eventFlush() +// +void casStreamOS::eventFlush() +{ + this->armSend(); +} + + +// +// casStreamOS::casStreamOS() +// +casStreamOS::casStreamOS(caServerI &cas, const ioArgsToNewStreamIO &ioArgs) : + casStreamIO(cas, ioArgs), + pWtReg(NULL), + pRdReg(NULL), + pEvWk(NULL), + pIOWk(NULL), + sendBlocked(FALSE) +{ +} + +// +// casStreamOS::init() +// +caStatus casStreamOS::init() +{ + caStatus status; + + // + // init the base classes + // + status = this->casStreamIO::init(); + if (status) { + return status; + } + + this->xSetNonBlocking(); + + return S_cas_success; +} + + +// +// casStreamOS::~casStreamOS() +// +casStreamOS::~casStreamOS() +{ + // + // attempt to flush out any remaining messages + // + this->flush(); + + this->disarmSend(); + this->disarmRecv(); + + if (this->pEvWk) { + delete this->pEvWk; + } + + if (this->pIOWk) { + delete this->pIOWk; + } +} + +// +// casStreamOS::show() +// +void casStreamOS::show(unsigned level) const +{ + this->casStrmClient::show(level); + printf("casStreamOS at %x\n", (unsigned) this); + printf("\tsendBlocked = %d\n", this->sendBlocked); + if (this->pWtReg) { + this->pWtReg->show(level); + } + if (this->pRdReg) { + this->pRdReg->show(level); + } + if (this->pEvWk) { + this->pEvWk->show(level); + } +} + + +// +// casClientStart () +// +caStatus casStreamOS::start() +{ + this->armRecv(); + return S_cas_success; +} + + +// +// casStreamReadReg::show() +// +void casStreamReadReg::show(unsigned level) const +{ + this->fdReg::show(level); + printf ("casStreamReadReg at %x\n", (unsigned) this); +} + +// +// casStreamReadReg::callBack () +// +void casStreamReadReg::callBack () +{ + casFillCondition fillCond; + casProcCond procCond; + + assert (this->os.pRdReg); + + // + // copy in new messages + // + fillCond = os.fill(); + procCond = os.processInput(); + if (fillCond == casFillDisconnect || + procCond == casProcDisconnect) { + delete &this->os; + } + else if (os.inBuf::full()==aitTrue) { + // + // If there isnt any space then temporarily + // stop calling this routine until problem is resolved + // either by: + // (1) sending or + // (2) a blocked IO op unblocks + // + // (casStreamReadReg is _not_ a onceOnly fdReg - + // therefore an explicit delete is required here) + // + delete this; + } + // + // NO CODE HERE + // (see deletes above) + // +} + + +// +// casStreamOS::sendBlockSignal() +// +void casStreamOS::sendBlockSignal() +{ + this->sendBlocked=TRUE; + this->armSend(); +} + + + +// +// casStreamWriteReg::show() +// +void casStreamWriteReg::show(unsigned level) const +{ + this->fdReg::show (level); + printf ("casStreamWriteReg at %x\n", (unsigned) this); +} + + +// +// casStreamWriteReg::callBack() +// +void casStreamWriteReg::callBack() +{ + casFlushCondition flushCond; + casProcCond procCond; + + assert (os.pWtReg); + + // + // attempt to flush the output buffer + // + flushCond = os.flush(); + switch (flushCond) { + case casFlushCompleted: + case casFlushPartial: + if (os.sendBlocked) { + os.sendBlocked = FALSE; + } + break; + case casFlushNone: + break; + case casFlushDisconnect: + return; + break; + default: + assert(0); + } + + // + // If we are unable to flush out all of the events + // in casStreamEvWakeup::expire() because the + // client is slow then we must check again here when + // we _are_ able to write to see if additional events + // can be sent to the slow client. + // + procCond = this->os.casEventSys::process(); + if (procCond != casProcOk) { + ca_printf("strm event sys process failed\n"); + } + +# if defined(DEBUG) + printf ("write attempted on %d result was %d\n", + os.getFD(), flushCond); + printf ("Recv backlog %u\n", os.inBuf::bytesPresent()); + printf ("Send backlog %u\n", os.outBuf::bytesPresent()); +# endif + + // + // If we were able to send something then we need + // to process the input queue in case we were send + // blocked. + // + procCond = this->os.processInput(); + if (procCond == casProcDisconnect) { + delete &this->os; + } + else { + // + // anything left in the send buffer that + // still needs to be sent ? + // (once this starts sending it doesnt stop until + // the outgoing buf is empty) + // + if (flushCond!=casFlushCompleted) { + casStreamOS *pStrmOS = &this->os; + // + // force the delete now so that the + // arm will work + // + delete this; + pStrmOS->armSend(); + } + } + // + // NO CODE HERE + // (see deletes above) + // +} + +// +// casStreamOS::processInput() +// +casProcCond casStreamOS::processInput() +{ + caStatus status; + +# ifdef DEBUG + printf( + "Resp bytes to send=%d, Req bytes pending %d\n", + this->outBuf::bytesPresent(), + this->inBuf::bytesPresent()); +# endif + + status = this->processMsg(); + switch (status) { + case S_cas_sendBlocked: + case S_cas_partialMessage: + case S_cas_ioBlocked: + case S_cas_success: + if (this->inBuf::bytesAvailable()==0u) { + this->armSend (); + } + this->armRecv(); + return casProcOk; + break; + default: + errMessage (status, + "unexpected error processing client's input"); + return casProcDisconnect; + } +} + diff --git a/src/cas/generic/st/ioBlocked.cc b/src/cas/generic/st/ioBlocked.cc new file mode 100644 index 000000000..015ff7c27 --- /dev/null +++ b/src/cas/generic/st/ioBlocked.cc @@ -0,0 +1,103 @@ + +// +// $Id$ +// Author Jeff Hill +// +// IO Blocked list class +// (for single threaded version of the server) +// +// $Log$ +// + +#include + +#include +#include + + +// +// ioBlocked::~ioBlocked() +// +ioBlocked::ioBlocked() : + pList(NULL) +{ +} + +// +// ioBlocked::~ioBlocked() +// +ioBlocked::~ioBlocked() +{ +} + +// +// ioBlocked::ioBlockedSignal() +// +void ioBlocked::ioBlockedSignal() +{ + fprintf(stderr, "in virtual base ioBlocked::ioBlockedSignal() ?\n"); +} + +// +// ioBlockedList::ioBlockedList () +// +ioBlockedList::ioBlockedList () +{ +} + +// +// ioBlockedList::~ioBlockedList () +// +ioBlockedList::~ioBlockedList () +{ + ioBlocked *pB; + + while ( (pB = this->get ()) ) { + pB->pList = NULL; + } +} + +// +// ioBlockedList::signal() +// +// works from a temporary list to avoid problems +// where the virtual function adds items to the +// list +// +void ioBlockedList::signal() +{ + tsDLList tmp(*this); + ioBlocked *pB; + + while ( (pB = tmp.get ()) ) { + pB->pList = NULL; + pB->ioBlockedSignal (); + } +} + +// +// ioBlockedList::removeItemFromIOBLockedList() +// +void ioBlockedList::removeItemFromIOBLockedList(ioBlocked &item) +{ + if (item.pList==this) { + this->remove(item); + item.pList = NULL; + } +} + +// +// ioBlockedList::addItemToIOBLockedList() +// +void ioBlockedList::addItemToIOBLockedList(ioBlocked &item) +{ + if (item.pList==NULL) { + this->add(item); + item.pList = this; + } + else { + assert(item.pList == this); + } +} + + diff --git a/src/cas/generic/st/osiMutexCAS.h b/src/cas/generic/st/osiMutexCAS.h new file mode 100644 index 000000000..430afd5e8 --- /dev/null +++ b/src/cas/generic/st/osiMutexCAS.h @@ -0,0 +1,6 @@ + +// +// single threaded code NOOPs the mutex class +// +#include + diff --git a/src/cas/generic/templInst.cc b/src/cas/generic/templInst.cc new file mode 100644 index 000000000..2fb54bccb --- /dev/null +++ b/src/cas/generic/templInst.cc @@ -0,0 +1,28 @@ + +// +// $Id$ +// +// Explcit instantiation of template instances used by the server +// +// + +#include "casdef.h" +#include "resourceLib.h" +#include "resourceLib.cc" + +// +// Sun C++ 4.1 still appears to be lacking support in this area +// +#if !defined(__SUNPRO_CC) + // + // From Stroustrups's "The C++ Programming Language" + // Appendix A: r.14.9 + // + // This explicitly instantiates the template class's member + // functions into "templInst.o" + // + template class resTable ; + template class resTable ; + template class resTable ; +#endif + diff --git a/src/cas/io/bsdSocket/casDGIntfIO.cc b/src/cas/io/bsdSocket/casDGIntfIO.cc new file mode 100644 index 000000000..e7e15106c --- /dev/null +++ b/src/cas/io/bsdSocket/casDGIntfIO.cc @@ -0,0 +1,442 @@ +/* + * + * Author: Jeffrey O. Hill + * hill@luke.lanl.gov + * (505) 665 1831 + * + * Experimental Physics and Industrial Control System (EPICS) + * + * Copyright 1991, the Regents of the University of California, + * and the University of Chicago Board of Governors. + * + * This software was produced under U.S. Government contracts: + * (W-7405-ENG-36) at the Los Alamos National Laboratory, + * and (W-31-109-ENG-38) at Argonne National Laboratory. + * + * Initial development by: + * The Controls and Automation Group (AT-8) + * Ground Test Accelerator + * Accelerator Technology Division + * Los Alamos National Laboratory + * + * Co-developed with + * The Controls and Computing Group + * Accelerator Systems Division + * Advanced Photon Source + * Argonne National Laboratory + * + * History + */ + +// +// +// Should I fetch the MTU from the outgoing interface? +// +// + +#include + +// +// casDGIntfIO::casDGIntfIO() +// +casDGIntfIO::casDGIntfIO(casDGClient &clientIn) : + pAltOutIO(NULL), + client(clientIn), + sock(INVALID_SOCKET), + sockState(casOffLine), + connectWithThisPort(0) +{ + ellInit(&this->beaconAddrList); +} + +// +// casDGIntfIO::init() +// +caStatus casDGIntfIO::init(const caAddr &addr, unsigned connectWithThisPortIn, + int autoBeaconAddr, int addConfigBeaconAddr, + int useBroadcastAddr, casDGIntfIO *pAltOutIn) +{ + int yes = TRUE; + caAddr serverAddr; + int status; + unsigned short serverPort; + unsigned short beaconPort; + ELLLIST BCastAddrList; + + if (pAltOutIn) { + this->pAltOutIO = pAltOutIn; + } + else { + this->pAltOutIO = this; + } + + this->sock = socket (AF_INET, SOCK_DGRAM, IPPROTO_UDP); + if (this->sock == INVALID_SOCKET) { + errMessage(S_cas_noMemory, + "CAS: unable to create cast socket\n"); + return S_cas_noMemory; + } + + status = setsockopt( + this->sock, + SOL_SOCKET, + SO_BROADCAST, + (char *)&yes, + sizeof(yes)); + if (status<0) { + errMessage(S_cas_internal, + "CAS: unable to set up cast socket\n"); + return S_cas_internal; + } + + /* + * release the port in case we exit early. Also if + * on a kernel with MULTICAST mods then we can have + * two UDP servers on the same port number (requires + * setting SO_REUSEADDR prior to the bind step below). + */ + status = setsockopt( + this->sock, + SOL_SOCKET, + SO_REUSEADDR, + (char *) &yes, + sizeof (yes)); + if (status<0) { + errMessage(S_cas_internal, + "CAS: unable to set SO_REUSEADDR on UDP socket?\n"); + } + + // + // Fetch port configuration from EPICS environment variables + // + if (envParamIsEmpty(&EPICS_CAS_SERVER_PORT)) { + serverPort = caFetchPortConfig(&EPICS_CA_SERVER_PORT, + CA_SERVER_PORT); + } + else { + serverPort = caFetchPortConfig(&EPICS_CAS_SERVER_PORT, + CA_SERVER_PORT); + } + beaconPort = caFetchPortConfig(&EPICS_CA_REPEATER_PORT, + CA_REPEATER_PORT); + + /* + * discover beacon addresses associated with this interface + */ + serverAddr.in = addr.in; + if (autoBeaconAddr || useBroadcastAddr) { + ellInit(&BCastAddrList); + caDiscoverInterfaces( + &BCastAddrList, + this->sock, + beaconPort, + serverAddr.in.sin_addr); /* match addr */ + if (useBroadcastAddr) { + caAddrNode *pAddr; + if (ellCount(&BCastAddrList)!=1) { + return S_cas_noInterface; + } + pAddr = (caAddrNode *) ellFirst(&BCastAddrList); + serverAddr.in.sin_addr = pAddr->destAddr.in.sin_addr; + } + if (autoBeaconAddr) { + ellConcat(&this->beaconAddrList, &BCastAddrList); + } + else { + ellFree(&BCastAddrList); + } + } + + serverAddr.in.sin_port = htons (serverPort); + status = bind( + this->sock, + &serverAddr.sa, + sizeof (serverAddr.sa)); + if (status<0) { + errPrintf(S_cas_bindFail, + __FILE__, __LINE__, + "- bind UDP IP addr=%s port=%u failed because %s", + inet_ntoa(serverAddr.in.sin_addr), + (unsigned) serverPort, + strerror(SOCKERRNO)); + return S_cas_bindFail; + } + + if (addConfigBeaconAddr) { + + /* + * by default use EPICS_CA_ADDR_LIST for the + * beacon address list + */ + ENV_PARAM *pParam = &EPICS_CA_ADDR_LIST; + + if (envParamIsEmpty(&EPICS_CAS_INTF_ADDR_LIST) && + envParamIsEmpty(&EPICS_CAS_BEACON_ADDR_LIST)) { + pParam = &EPICS_CA_ADDR_LIST; + } + else { + pParam = &EPICS_CAS_BEACON_ADDR_LIST; + } + + /* + * add in the configured addresses + */ + caAddConfiguredAddr( + &this->beaconAddrList, + pParam, + this->sock, + beaconPort); + } + + this->connectWithThisPort = connectWithThisPortIn; + this->sockState=casOnLine; + + return S_cas_success; +} + +// +// use an initialize routine ? +// +casDGIntfIO::~casDGIntfIO() +{ + if(this->sock!=INVALID_SOCKET){ + socket_close(this->sock); + } + + ellFree(&this->beaconAddrList); +} + +// +// casDGIntfIO::clientHostName() +// +void casDGIntfIO::clientHostName (char *pBuf, unsigned /* bufSize */) const +{ + // + // should eventually get the address of the last DG + // received + // + pBuf[0] = '\0'; +} + +// +// casDGIntfIO::show() +// +void casDGIntfIO::osdShow (unsigned level) const +{ + printf ("casDGIntfIO at %x\n", (unsigned) this); + if (level>=1u) { + char buf[64]; + this->clientHostName(buf, sizeof(buf)); + printf("Client Host=%s\n", buf); + } +} + +// +// casDGIntfIO::xSetNonBlocking() +// +void casDGIntfIO::xSetNonBlocking() +{ + int status; + int yes = TRUE; + + if (this->sockState!=casOnLine) { + return; + } + + status = socket_ioctl(this->sock, FIONBIO, &yes); + if (status<0) { + ca_printf("%s:CAS: UDP non blocking IO set fail because \"%s\"\n", + __FILE__, strerror(SOCKERRNO)); + this->sockState = casOffLine; + } +} + +// +// casDGIntfIO::osdRecv() +// +xRecvStatus casDGIntfIO::osdRecv(char *pBuf, bufSizeT size, + bufSizeT &actualSize, caAddr &from) +{ + int status; + int addrSize; + + if (this->sockState!=casOnLine) { + return xRecvDisconnect; + } + + addrSize = sizeof(from.sa); + status = recvfrom(this->sock, pBuf, size, 0, + &from.sa, &addrSize); + if (status<0) { + if(SOCKERRNO == EWOULDBLOCK){ + actualSize = 0u; + return xRecvOK; + } + else { + ca_printf("CAS: UDP recv error was %s", + strerror(SOCKERRNO)); + actualSize = 0u; + return xRecvOK; + } + } + + actualSize = (bufSizeT) status; + return xRecvOK; +} + +// +// casDGIntfIO::osdSend() +// +xSendStatus casDGIntfIO::osdSend(const char *pBuf, bufSizeT size, + bufSizeT &actualSize, const caAddr &to) +{ + int status; + int anerrno; + + if (this->sockState!=casOnLine) { + return xSendDisconnect; + } + + if (size==0u) { + actualSize = 0u; + return xSendOK; + } + + // + // (char *) cast below is for brain dead wrs prototype + // + status = sendto(this->sock, (char *) pBuf, size, 0, + (sockaddr *) &to.sa, sizeof(to.sa)); + if (status>0) { + if (size != (unsigned) status) { + ca_printf ("CAS: partial UDP msg discarded??\n"); + } + } + else if (status==0) { + this->sockState = casOffLine; + ca_printf ("CAS: UDP send returns zero??\n"); + return xSendDisconnect; + } + else { + anerrno = SOCKERRNO; + + if (anerrno != EWOULDBLOCK) { + char buf[64]; + this->clientHostName(buf, sizeof(buf)); + ca_printf( + "CAS: UDP socket send to \"%s\" failed because \"%s\"\n", + buf, strerror(anerrno)); + } + } + + actualSize = size; + return xSendOK; +} + + +// +// casDGIntfIO::sendBeacon() +// +void casDGIntfIO::sendBeacon(char &msg, unsigned length, aitUint32 &m_avail) +{ + caAddrNode *pAddr; + int status; + + if (this->sockState!=casOnLine) { + return; + } + + for( pAddr = (caAddrNode *)ellFirst(&this->beaconAddrList); + pAddr; + pAddr = (caAddrNode *)ellNext(&pAddr->node)) { + + m_avail = htonl(pAddr->srcAddr.in.sin_addr.s_addr); + + status = sendto( + this->sock, + &msg, + length, + 0, + &pAddr->destAddr.sa, + sizeof(pAddr->destAddr.sa)); + if (status < 0) { + char buf[64]; + ipAddrToA(&pAddr->destAddr.in, buf, sizeof(buf)); + + ca_printf( + "CAS:beacon error was \"%s\" dest=%s sock=%d\n", + strerror(SOCKERRNO), + buf, + this->sock); + } + } +} + +// +// casDGIntfIO::optimumBufferSize() +// +bufSizeT casDGIntfIO::optimumBufferSize () +{ + +#if 1 + // + // must update client before the message size can be + // increased here + // + return MAX_UDP; +#else + int n; + int size; + int status; + + if (this->sockState!=casOnLine) { + return MAX_UDP; + } + + /* fetch the TCP send buffer size */ + n = sizeof(size); + status = getsockopt( + this->sock, + SOL_SOCKET, + SO_SNDBUF, + (char *)&size, + &n); + if(status < 0 || n != sizeof(size)){ + size = MAX_UDP; + } + + if (size<=0) { + size = MAX_UDP; + } + return (bufSizeT) size; +#endif +} + + +// +// casDGIntfIO::state() +// +casIOState casDGIntfIO::state() const +{ + return this->sockState; +} + +// +// casDGIntfIO::getFD() +// +int casDGIntfIO::getFD() const +{ + return this->sock; +} + +// +// casDGIntfIO::serverPortNumber() +// +// the server's port number +// (to be used for connection requests) +// +unsigned casDGIntfIO::serverPortNumber() +{ + return this->connectWithThisPort; +} + diff --git a/src/cas/io/bsdSocket/casIODIL.h b/src/cas/io/bsdSocket/casIODIL.h new file mode 100644 index 000000000..aea9286dc --- /dev/null +++ b/src/cas/io/bsdSocket/casIODIL.h @@ -0,0 +1,20 @@ + +#ifndef casIODILh +#define casIODILh + +// +// casDGIntfIO::processDG() +// +inline void casDGIntfIO::processDG() +{ + assert(this->pAltOutIO); + + // + // process the request DG and send a response DG + // if it is required + // + this->client.processDG(*this,*this->pAltOutIO); +} + +#endif // casIODILh + diff --git a/src/cas/io/bsdSocket/casIntfIO.cc b/src/cas/io/bsdSocket/casIntfIO.cc new file mode 100644 index 000000000..edfa65fbd --- /dev/null +++ b/src/cas/io/bsdSocket/casIntfIO.cc @@ -0,0 +1,271 @@ +// +// $Id$ +// +// Author Jeff Hill +// +// +// +// $Log$ +// + +#include + +// +// 5 appears to be a TCP/IP built in maximum +// +const unsigned caServerConnectPendQueueSize = 5u; + +// +// casIntfIO::casIntfIO() +// +casIntfIO::casIntfIO() : + pNormalUDP(NULL), + pBCastUDP(NULL), + sock(INVALID_SOCKET) +{ + memset(&addr, '\0', sizeof(addr)); +} + +// +// casIntfIO::init() +// +caStatus casIntfIO::init(const caAddr &addrIn, casDGClient &dgClientIn, + int autoBeaconAddr, int addConfigBeaconAddr) +{ + int yes = TRUE; + int status; + caStatus stat; + int addrSize; + + /* + * Setup the server socket + */ + this->sock = socket (AF_INET, SOCK_STREAM, IPPROTO_TCP); + if (this->sock==INVALID_SOCKET) { + return S_cas_noFD; + } + + /* + * release the port in case we exit early + */ + status = setsockopt ( + this->sock, + SOL_SOCKET, + SO_REUSEADDR, + (char *) &yes, + sizeof (yes)); + if (status<0) { + ca_printf("CAS: server set SO_REUSEADDR failed?\n", + strerror(SOCKERRNO)); + return S_cas_internal; + } + + this->addr = addrIn; + status = bind( + this->sock, + (sockaddr *) &this->addr.sa, + sizeof(this->addr)); + if (status<0) { + if (SOCKERRNO == EADDRINUSE) { + // + // force assignement of a default port + // (so the getsockname() call below will + // work correctly) + // + this->addr.in.sin_port = ntohs (0); + status = bind( + this->sock, + &this->addr.sa, + sizeof(this->addr)); + } + if (status<0) { + errPrintf(S_cas_bindFail, + __FILE__, __LINE__, + "- bind TCP IP addr=%s port=%u failed because %s", + inet_ntoa(this->addr.in.sin_addr), + ntohs(this->addr.in.sin_port), + strerror(SOCKERRNO)); + return S_cas_bindFail; + } + } + + addrSize = sizeof(this->addr); + status = getsockname(this->sock, &this->addr.sa, &addrSize); + if (status) { + ca_printf("CAS: getsockname() error %s\n", strerror(SOCKERRNO)); + return S_cas_internal; + } + + // + // be sure of this now so that we can fetch the IP + // address and port number later + // + assert (this->addr.sa.sa_family == AF_INET); + + status = listen(this->sock, caServerConnectPendQueueSize); + if(status < 0) { + ca_printf("CAS: listen() error %s\n", strerror(SOCKERRNO)); + return S_cas_internal; + } + + // + // set up a DG socket bound to the specified interface + // (or INADDR_ANY) + // + this->pNormalUDP = this->newDGIntfIO(dgClientIn); + if (!this->pNormalUDP) { + return S_cas_noMemory; + } + stat = this->pNormalUDP->init(addr, this->portNumber(), + autoBeaconAddr, addConfigBeaconAddr); + if (stat) { + return stat; + } + else { + this->pNormalUDP->start(); + } + + // + // If they are binding to a particular interface then + // we will also need to bind to the broadcast address + // for that interface (if it has one) + // + if (this->addr.in.sin_addr.s_addr != INADDR_ANY) { + this->pBCastUDP = this->newDGIntfIO(dgClientIn); + if (this->pBCastUDP) { + stat = this->pBCastUDP->init(addr, this->portNumber(), + FALSE, FALSE, TRUE, this->pNormalUDP); + if (stat) { + if (stat==S_cas_noInterface) { + delete this->pBCastUDP; + this->pBCastUDP = NULL; + } + else { + errMessage(stat, + "server will not receive broadcasts"); + } + } + else { + this->pBCastUDP->start(); + } + } + else { + errMessage(S_cas_noMemory, + "failed to create broadcast socket"); + } + } + return S_cas_success; +} + +// +// casIntfIO::~casIntfIO() +// +casIntfIO::~casIntfIO() +{ + if (this->sock != INVALID_SOCKET) { + socket_close(this->sock); + } + if (this->pNormalUDP) { + delete this->pNormalUDP; + } + if (this->pBCastUDP) { + delete this->pBCastUDP; + } +} + +// +// newStreamIO::newStreamClient() +// +casStreamOS *casIntfIO::newStreamClient(caServerI &cas) const +{ + caAddr newAddr; + SOCKET newSock; + int length; + casStreamOS *pOS; + + length = sizeof(newAddr.sa); + newSock = accept(this->sock, &newAddr.sa, &length); + if (newSock==INVALID_SOCKET) { + if (SOCKERRNO!=EWOULDBLOCK) { + ca_printf( + "CAS: %s accept error %s\n", + __FILE__, + strerror(SOCKERRNO)); + } + return NULL; + } + else if (sizeof(newAddr.sa)>(size_t)length) { + socket_close(newSock); + ca_printf("CAS: accept returned bad address len?\n"); + return NULL; + } + + ioArgsToNewStreamIO args; + args.addr = newAddr; + args.sock = newSock; + pOS = new casStreamOS(cas, args); + if (!pOS) { + socket_close(newSock); + } + return pOS; +} + +// +// casIntfIO::setNonBlocking() +// +void casIntfIO::setNonBlocking() +{ + int status; + int yes = TRUE; + + status = socket_ioctl(this->sock, FIONBIO, &yes); + if (status<0) { + ca_printf( + "%s:CAS: server non blocking IO set fail because \"%s\"\n", + __FILE__, strerror(SOCKERRNO)); + } +} + +// +// casIntfIO::getFD() +// +int casIntfIO::getFD() const +{ + return this->sock; +} + +// +// casIntfIO::show() +// +void casIntfIO::show(unsigned level) const +{ + if (level>2u) { + printf(" casIntfIO::sock = %d\n", this->sock); + } +} + +// +// casIntfIO::portNumber() +// +unsigned casIntfIO::portNumber() const +{ + return ntohs(this->addr.in.sin_port); +} + +// +// casIntfIO::requestBeacon() +// +void casIntfIO::requestBeacon() +{ + // + // the broadcast bound socket is not used here because + // it will have the wrong source address. This + // casDGIntfIO has a list of all beacon addresses + // that have been configured (no need to use + // this->pBCastUDP). + // + if (this->pNormalUDP) { + casDGClient::sendBeacon(*this->pNormalUDP); + } +} +