From 596034d6c6f866a60ea4b5a1e303dd6efc9d0918 Mon Sep 17 00:00:00 2001 From: miha_vitorovic Date: Mon, 10 Jan 2011 11:23:31 +0100 Subject: [PATCH 1/7] Added debug build to the project Fixes to getBroadcastAddresses(). --- .cproject | 11 ++++- configure/CONFIG_SITE | 9 ++++- pvAccessApp/utils/inetAddressUtil.cpp | 56 +++++++++++++++++--------- testApp/utils/inetAddressUtilsTest.cpp | 2 + 4 files changed, 57 insertions(+), 21 deletions(-) diff --git a/.cproject b/.cproject index e91f269..0e0c996 100644 --- a/.cproject +++ b/.cproject @@ -307,7 +307,6 @@ make - all true true @@ -315,7 +314,6 @@ make - clean true true @@ -323,11 +321,20 @@ make + uninstall true true false + + make + + clean all DEBUG=1 + true + true + false + diff --git a/configure/CONFIG_SITE b/configure/CONFIG_SITE index 287667b..7f70b02 100644 --- a/configure/CONFIG_SITE +++ b/configure/CONFIG_SITE @@ -24,4 +24,11 @@ # take effect. #IOCS_APPL_TOP = -USR_LDFLAGS += -lpthread +ifeq ($(DEBUG),1) + DEBUG_CFLAGS=-O0 -g -ggdb +endif + +ifeq ($(EPICS_HOST_ARCH),linux-x86) + USR_LDFLAGS += -lpthread +endif + diff --git a/pvAccessApp/utils/inetAddressUtil.cpp b/pvAccessApp/utils/inetAddressUtil.cpp index 77eac47..7eeb41c 100644 --- a/pvAccessApp/utils/inetAddressUtil.cpp +++ b/pvAccessApp/utils/inetAddressUtil.cpp @@ -40,6 +40,8 @@ namespace epics { int status; struct ifconf ifconf; struct ifreq* pIfreqList; + struct ifreq* pifreq; + struct ifreq ifrBuff; osiSockAddr* pNewNode; InetAddrVector* retVector = new InetAddrVector(); @@ -62,39 +64,53 @@ namespace epics { ifconf.ifc_req = pIfreqList; status = ioctl(sock, SIOCGIFCONF, &ifconf); if(status<0||ifconf.ifc_len==0) { - errlogSevPrintf( - errlogMinor, + errlogSevPrintf(errlogMinor, "getBroadcastAddresses(): unable to fetch network interface configuration"); delete[] pIfreqList; return retVector; } - errlogPrintf("Found %d interfaces\n", ifconf.ifc_len); + int maxNodes = ifconf.ifc_len/sizeof(ifreq); + //errlogPrintf("Found %d interfaces\n", maxNodes); + + pifreq = pIfreqList; + + for(int i = 0; i<=maxNodes; i++) { + if(!(*pifreq->ifr_name)) break; + + if(i>0) { + size_t n = sizeof(sockaddr)+sizeof(pifreq->ifr_name); + if(nifr_addr.sa_family!=AF_INET) continue; - status = ioctl(sock, SIOCGIFFLAGS, &pIfreqList[i]); + strncpy(ifrBuff.ifr_name, pifreq->ifr_name, + sizeof(ifrBuff.ifr_name)); + status = ioctl(sock, SIOCGIFFLAGS, &ifrBuff); if(status) { errlogSevPrintf( errlogMinor, "getBroadcastAddresses(): net intf flags fetch for \"%s\" failed", - pIfreqList[i].ifr_name); + pifreq->ifr_name); continue; } /* * dont bother with interfaces that have been disabled */ - if(!(pIfreqList[i].ifr_flags&IFF_UP)) continue; + if(!(ifrBuff.ifr_flags&IFF_UP)) continue; /* * dont use the loop back interface */ - if(pIfreqList[i].ifr_flags&IFF_LOOPBACK) continue; + if(ifrBuff.ifr_flags&IFF_LOOPBACK) continue; pNewNode = new osiSockAddr; if(pNewNode==NULL) { @@ -114,37 +130,41 @@ namespace epics { * Otherwise CA will not query through the * interface. */ - if(pIfreqList[i].ifr_flags&IFF_BROADCAST) { - status = ioctl(sock, SIOCGIFBRDADDR, &pIfreqList[i]); + if(ifrBuff.ifr_flags&IFF_BROADCAST) { + strncpy(ifrBuff.ifr_name, pifreq->ifr_name, + sizeof(ifrBuff.ifr_name)); + status = ioctl(sock, SIOCGIFBRDADDR, &ifrBuff); if(status) { errlogSevPrintf( errlogMinor, "getBroadcastAddresses(): net intf \"%s\": bcast addr fetch fail", - pIfreqList->ifr_name); + pifreq->ifr_name); delete pNewNode; continue; } - pNewNode->sa = pIfreqList[i].ifr_broadaddr; + pNewNode->sa = ifrBuff.ifr_broadaddr; } #ifdef IFF_POINTOPOINT - else if(pIfreqList->ifr_flags&IFF_POINTOPOINT) { - status = ioctl(sock, SIOCGIFDSTADDR, &pIfreqList[i]); + else if(ifrBuff.ifr_flags&IFF_POINTOPOINT) { + strncpy(ifrBuff.ifr_name, pifreq->ifr_name, + sizeof(ifrBuff.ifr_name)); + status = ioctl(sock, SIOCGIFDSTADDR, &ifrBuff); if(status) { errlogSevPrintf( errlogMinor, "getBroadcastAddresses(): net intf \"%s\": pt to pt addr fetch fail", - pIfreqList[i].ifr_name); + pifreq->ifr_name); delete pNewNode; continue; } - pNewNode->sa = pIfreqList[i].ifr_dstaddr; + pNewNode->sa = ifrBuff.ifr_dstaddr; } #endif else { errlogSevPrintf( errlogMinor, "getBroadcastAddresses(): net intf \"%s\": not point to point or bcast?", - pIfreqList[i].ifr_name); + pifreq->ifr_name); delete pNewNode; continue; } diff --git a/testApp/utils/inetAddressUtilsTest.cpp b/testApp/utils/inetAddressUtilsTest.cpp index 5a594b7..8594254 100644 --- a/testApp/utils/inetAddressUtilsTest.cpp +++ b/testApp/utils/inetAddressUtilsTest.cpp @@ -6,6 +6,7 @@ */ #include "inetAddressUtil.h" +#include "logger.h" #include #include @@ -23,6 +24,7 @@ using std::stringstream; using std::hex; int main(int argc, char *argv[]) { + createFileLogger("inetAddresUtils.log"); InetAddrVector *vec; InetAddrVector *vec1; From df7a9fa0739780bd7e5721fb4f4322ff1f17fcdd Mon Sep 17 00:00:00 2001 From: miha_vitorovic Date: Mon, 10 Jan 2011 14:09:58 +0100 Subject: [PATCH 2/7] Added configuration to Context and fixed all the users. --- pvAccessApp/Makefile | 1 + pvAccessApp/remote/remote.h | 211 +++++----- pvAccessApp/server/responseHandlers.cpp | 25 +- pvAccessApp/server/responseHandlers.h | 10 +- pvAccessApp/server/serverContext.h | 5 +- testApp/remote/testBeaconEmitter.cpp | 28 +- testApp/remote/testBeaconHandler.cpp | 28 +- testApp/remote/testBlockingTCPClnt.cpp | 24 +- testApp/remote/testBlockingTCPSrv.cpp | 10 +- testApp/remote/testBlockingUDPClnt.cpp | 39 +- testApp/remote/testBlockingUDPSrv.cpp | 39 +- testApp/remote/testRemoteClientImpl.cpp | 519 ++++++++++++------------ 12 files changed, 532 insertions(+), 407 deletions(-) diff --git a/pvAccessApp/Makefile b/pvAccessApp/Makefile index bd26f53..2f35089 100644 --- a/pvAccessApp/Makefile +++ b/pvAccessApp/Makefile @@ -67,6 +67,7 @@ LIBSRCS += blockingTCPConnector.cpp LIBSRCS += blockingServerTCPTransport.cpp LIBSRCS += blockingTCPAcceptor.cpp LIBSRCS += channelSearchManager.cpp +LIBSRCS += abstractResponseHandler.cpp LIBRARY = pvAccess pvAccess_LIBS += Com diff --git a/pvAccessApp/remote/remote.h b/pvAccessApp/remote/remote.h index f5129d1..8a8aaea 100644 --- a/pvAccessApp/remote/remote.h +++ b/pvAccessApp/remote/remote.h @@ -11,7 +11,7 @@ #include "caConstants.h" #include "transportRegistry.h" #include "introspectionRegistry.h" -#include "serverContext.h" +#include "configuration.h" #include #include @@ -32,42 +32,42 @@ namespace epics { }; enum QoS { - /** - * Default behavior. - */ - DEFAULT = 0x00, - /** - * Require reply (acknowledgment for reliable operation). - */ - REPLY_REQUIRED = 0x01, - /** - * Best-effort option (no reply). - */ - BESY_EFFORT = 0x02, - /** - * Process option. - */ - PROCESS = 0x04, - /** - * Initialize option. - */ - INIT = 0x08, - /** - * Destroy option. - */ - DESTROY = 0x10, - /** - * Share data option. - */ - SHARE = 0x20, - /** - * Get. - */ - GET = 0x40, - /** - * Get-put. - */ - GET_PUT =0x80 + /** + * Default behavior. + */ + DEFAULT = 0x00, + /** + * Require reply (acknowledgment for reliable operation). + */ + REPLY_REQUIRED = 0x01, + /** + * Best-effort option (no reply). + */ + BESY_EFFORT = 0x02, + /** + * Process option. + */ + PROCESS = 0x04, + /** + * Initialize option. + */ + INIT = 0x08, + /** + * Destroy option. + */ + DESTROY = 0x10, + /** + * Share data option. + */ + SHARE = 0x20, + /** + * Get. + */ + GET = 0x40, + /** + * Get-put. + */ + GET_PUT = 0x80 }; typedef int32 pvAccessID; @@ -253,6 +253,35 @@ namespace epics { }; + class Channel; + + /** + * Not public IF, used by Transports, etc. + */ + class Context { + public: + virtual ~Context() { + } + /** + * Get timer. + * @return timer. + */ + virtual Timer* getTimer() = 0; + + /** + * Get transport (virtual circuit) registry. + * @return transport (virtual circuit) registry. + */ + virtual TransportRegistry* getTransportRegistry() = 0; + + virtual Channel* getChannel(pvAccessID id) = 0; + + virtual Transport* getSearchTransport() = 0; + + virtual Configuration* getConfiguration() = 0; + + }; + /** * Interface defining response handler. * @author Matej Sekoranja @@ -260,6 +289,10 @@ namespace epics { */ class ResponseHandler { public: + ResponseHandler(Context* context) : + _context(context) { + } + virtual ~ResponseHandler() { } @@ -277,6 +310,9 @@ namespace epics { handleResponse(osiSockAddr* responseFrom, Transport* transport, int8 version, int8 command, int payloadSize, epics::pvData::ByteBuffer* payloadBuffer) =0; + + protected: + Context* _context; }; /** @@ -289,9 +325,10 @@ namespace epics { /** * @param description */ - AbstractResponseHandler(String description) : - _description(description), _debug(true) { - //debug = System.getProperties().containsKey(CAConstants.CAJ_DEBUG); + AbstractResponseHandler(Context* context, String description) : + ResponseHandler(context), _description(description), _debug( + _context->getConfiguration()->getPropertyAsBoolean( + "PVACCESS_DEBUG", false)) { } virtual ~AbstractResponseHandler() { @@ -373,33 +410,6 @@ namespace epics { }; - class Channel; - - /** - * Not public IF, used by Transports, etc. - */ - class Context { - public: - virtual ~Context() { - } - /** - * Get timer. - * @return timer. - */ - virtual Timer* getTimer() =0; - - /** - * Get transport (virtual circuit) registry. - * @return transport (virtual circuit) registry. - */ - virtual TransportRegistry* getTransportRegistry() =0; - - virtual Channel* getChannel(pvAccessID id) = 0; - - virtual Transport* getSearchTransport() = 0; - - }; - /** * Interface defining reference counting transport IF. * @author Matej Sekoranja @@ -474,7 +484,8 @@ namespace epics { * @param sid preallocated channel SID. * @param channel channel to register. */ - virtual void registerChannel(pvAccessID sid, ServerChannel* channel) =0; + virtual void + registerChannel(pvAccessID sid, ServerChannel* channel) =0; /** * Unregister a new channel (and deallocates its handle). @@ -495,44 +506,44 @@ namespace epics { */ virtual int getChannelCount() =0; }; - + /** * A request that expects an response. - * Responses identified by its I/O ID. + * Responses identified by its I/O ID. * This interface needs to be extended (to provide method called on response). * @author Matej Sekoranja */ class ResponseRequest { - public: - - /** - * Get I/O ID. - * @return ioid - */ - virtual pvAccessID getIOID() = 0; - - /** - * Timeout notification. - */ - virtual void timeout() = 0; - - /** - * Cancel response request (always to be called to complete/destroy). - */ - virtual void cancel() = 0; - - /** - * Report status to clients (e.g. disconnected). - * @param status to report. - */ - virtual void reportStatus(epics::pvData::Status* status) = 0; - - /** - * Get request requester. - * @return request requester. - */ - virtual epics::pvData::Requester* getRequester() = 0; - }; + public: + + /** + * Get I/O ID. + * @return ioid + */ + virtual pvAccessID getIOID() = 0; + + /** + * Timeout notification. + */ + virtual void timeout() = 0; + + /** + * Cancel response request (always to be called to complete/destroy). + */ + virtual void cancel() = 0; + + /** + * Report status to clients (e.g. disconnected). + * @param status to report. + */ + virtual void reportStatus(epics::pvData::Status* status) = 0; + + /** + * Get request requester. + * @return request requester. + */ + virtual epics::pvData::Requester* getRequester() = 0; + }; } } diff --git a/pvAccessApp/server/responseHandlers.cpp b/pvAccessApp/server/responseHandlers.cpp index e3bbad5..c0ab2ae 100644 --- a/pvAccessApp/server/responseHandlers.cpp +++ b/pvAccessApp/server/responseHandlers.cpp @@ -24,23 +24,6 @@ using namespace epics::pvData; namespace epics { namespace pvAccess { - void AbstractResponseHandler::handleResponse(osiSockAddr* responseFrom, - Transport* transport, int8 version, int8 command, - int payloadSize, ByteBuffer* payloadBuffer) { - if(_debug) { - char ipAddrStr[48]; - ipAddrToA(&responseFrom->ia, ipAddrStr, sizeof(ipAddrStr)); - - ostringstream prologue; - prologue<<"Message [0x"<getArray(), - payloadBuffer->getPosition(), payloadSize); - } - } - void BadResponse::handleResponse(osiSockAddr* responseFrom, Transport* transport, int8 version, int8 command, int payloadSize, ByteBuffer* payloadBuffer) { @@ -57,15 +40,15 @@ namespace epics { } ServerResponseHandler::ServerResponseHandler(ServerContextImpl* context) : - _context(context) { + ResponseHandler(context) { BadResponse* badResponse = new BadResponse(context); _handlerTable = new ResponseHandler*[HANDLER_TABLE_LENGTH]; // TODO add real handlers, as they are developed - _handlerTable[0] = new NoopResponse(_context, "Beacon"); - _handlerTable[1] = new ConnectionValidationHandler(_context); - _handlerTable[2] = new EchoHandler(_context); + _handlerTable[0] = new NoopResponse(context, "Beacon"); + _handlerTable[1] = new ConnectionValidationHandler(context); + _handlerTable[2] = new EchoHandler(context); _handlerTable[3] = badResponse; _handlerTable[4] = badResponse; _handlerTable[5] = badResponse; diff --git a/pvAccessApp/server/responseHandlers.h b/pvAccessApp/server/responseHandlers.h index afbcdba..7aeab36 100644 --- a/pvAccessApp/server/responseHandlers.h +++ b/pvAccessApp/server/responseHandlers.h @@ -8,6 +8,7 @@ #ifndef RESPONSEHANDLERS_H_ #define RESPONSEHANDLERS_H_ +#include "serverContext.h" #include "remote.h" namespace epics { @@ -25,13 +26,11 @@ namespace epics { */ AbstractServerResponseHandler(ServerContextImpl* context, String description) : - AbstractResponseHandler(description), _context(context) { + AbstractResponseHandler(context, description) { } virtual ~AbstractServerResponseHandler() { } - protected: - ServerContextImpl* _context; }; /** @@ -77,11 +76,6 @@ namespace epics { */ ResponseHandler** _handlerTable; - /** - * Context instance. - */ - ServerContextImpl* _context; - }; /** diff --git a/pvAccessApp/server/serverContext.h b/pvAccessApp/server/serverContext.h index d535616..c01d52e 100644 --- a/pvAccessApp/server/serverContext.h +++ b/pvAccessApp/server/serverContext.h @@ -8,11 +8,12 @@ #ifndef SERVERCONTEXT_H_ #define SERVERCONTEXT_H_ +#include "remote.h" + namespace epics { namespace pvAccess { - - class ServerContextImpl { + class ServerContextImpl : public Context { }; diff --git a/testApp/remote/testBeaconEmitter.cpp b/testApp/remote/testBeaconEmitter.cpp index 0d3c904..c1a3026 100644 --- a/testApp/remote/testBeaconEmitter.cpp +++ b/testApp/remote/testBeaconEmitter.cpp @@ -18,6 +18,8 @@ using namespace epics::pvData; class DummyResponseHandler : public ResponseHandler { public: + DummyResponseHandler(Context* ctx) : ResponseHandler(ctx) {} + virtual void handleResponse(osiSockAddr* responseFrom, Transport* transport, int8 version, int8 command, int payloadSize, ByteBuffer* payloadBuffer) @@ -27,9 +29,33 @@ public: }; +class ContextImpl : public Context { +public: + ContextImpl() : + _tr(new TransportRegistry()), _timer(new Timer("server thread", + lowPriority)), _conf(new SystemConfigurationImpl()) { + } + virtual ~ContextImpl() { + delete _tr; + delete _timer; + } + virtual Timer* getTimer() { return _timer; } + virtual TransportRegistry* getTransportRegistry() { return _tr; } + virtual Channel* getChannel(epics::pvAccess::pvAccessID) { return 0; } + virtual Transport* getSearchTransport() { return 0; } + virtual Configuration* getConfiguration() { return _conf; } + +private: + TransportRegistry* _tr; + Timer* _timer; + Configuration* _conf; +}; + + void testBeaconEmitter() { - DummyResponseHandler drh; + ContextImpl ctx; + DummyResponseHandler drh(&ctx); /* SOCKET mysocket; if ((mysocket = socket (AF_INET, SOCK_DGRAM, 0)) == -1) { diff --git a/testApp/remote/testBeaconHandler.cpp b/testApp/remote/testBeaconHandler.cpp index 0ac231c..6690ab4 100644 --- a/testApp/remote/testBeaconHandler.cpp +++ b/testApp/remote/testBeaconHandler.cpp @@ -37,7 +37,7 @@ void decodeFromIPv6Address(ByteBuffer* buffer, osiSockAddr* address) class BeaconResponseHandler : public ResponseHandler { public: - BeaconResponseHandler() + BeaconResponseHandler(Context* ctx) : ResponseHandler(ctx) { _pvDataCreate = getPVDataCreate(); } @@ -103,9 +103,33 @@ private: }; +class ContextImpl : public Context { +public: + ContextImpl() : + _tr(new TransportRegistry()), _timer(new Timer("server thread", + lowPriority)), _conf(new SystemConfigurationImpl()) { + } + virtual ~ContextImpl() { + delete _tr; + delete _timer; + } + virtual Timer* getTimer() { return _timer; } + virtual TransportRegistry* getTransportRegistry() { return _tr; } + virtual Channel* getChannel(epics::pvAccess::pvAccessID) { return 0; } + virtual Transport* getSearchTransport() { return 0; } + virtual Configuration* getConfiguration() { return _conf; } + +private: + TransportRegistry* _tr; + Timer* _timer; + Configuration* _conf; +}; + + void testBeaconHandler() { - BeaconResponseHandler brh; + ContextImpl ctx; + BeaconResponseHandler brh(&ctx); BlockingUDPConnector connector(false, NULL, true); osiSockAddr bindAddr; diff --git a/testApp/remote/testBlockingTCPClnt.cpp b/testApp/remote/testBlockingTCPClnt.cpp index a3bde97..d845c1b 100644 --- a/testApp/remote/testBlockingTCPClnt.cpp +++ b/testApp/remote/testBlockingTCPClnt.cpp @@ -29,28 +29,30 @@ using std::sscanf; class ContextImpl : public Context { public: ContextImpl() : - _tr(new TransportRegistry()), _timer(new Timer("client thread", - lowPriority)) { + _tr(new TransportRegistry()), _timer(new Timer("server thread", + lowPriority)), _conf(new SystemConfigurationImpl()) { } virtual ~ContextImpl() { delete _tr; delete _timer; } - virtual Timer* getTimer() { - return _timer; - } - virtual TransportRegistry* getTransportRegistry() { - return _tr; - } - virtual Channel* getChannel(epics::pvAccess::pvAccessID) { return 0; } - virtual Transport* getSearchTransport() { return 0; } + virtual Timer* getTimer() { return _timer; } + virtual TransportRegistry* getTransportRegistry() { return _tr; } + virtual Channel* getChannel(epics::pvAccess::pvAccessID) { return 0; } + virtual Transport* getSearchTransport() { return 0; } + virtual Configuration* getConfiguration() { return _conf; } + private: TransportRegistry* _tr; Timer* _timer; + Configuration* _conf; }; class DummyResponseHandler : public ResponseHandler { public: + DummyResponseHandler(Context* ctx) : ResponseHandler(ctx) { + } + virtual void handleResponse(osiSockAddr* responseFrom, Transport* transport, int8 version, int8 command, int payloadSize, ByteBuffer* payloadBuffer) { @@ -110,7 +112,7 @@ void testBlockingTCPSender() { DummyTransportClient dtc; DummyTransportSender dts; - DummyResponseHandler drh; + DummyResponseHandler drh(&ctx); osiSockAddr srvAddr; diff --git a/testApp/remote/testBlockingTCPSrv.cpp b/testApp/remote/testBlockingTCPSrv.cpp index 6ab54f6..c7ced35 100644 --- a/testApp/remote/testBlockingTCPSrv.cpp +++ b/testApp/remote/testBlockingTCPSrv.cpp @@ -8,6 +8,7 @@ #include "blockingTCP.h" #include "remote.h" #include "logger.h" +#include "configuration.h" #include @@ -21,7 +22,8 @@ class ContextImpl : public Context { public: ContextImpl() : _tr(new TransportRegistry()), - _timer(new Timer("server thread", lowPriority)) {} + _timer(new Timer("server thread", lowPriority)), + _conf(new SystemConfigurationImpl()) {} virtual ~ContextImpl() { delete _tr; delete _timer; @@ -30,10 +32,12 @@ public: virtual TransportRegistry* getTransportRegistry() { return _tr; } virtual Channel* getChannel(epics::pvAccess::pvAccessID) { return 0; } virtual Transport* getSearchTransport() { return 0; } - + virtual Configuration* getConfiguration() { return _conf; } + private: TransportRegistry* _tr; Timer* _timer; + Configuration* _conf; }; void testServerConnections() { @@ -43,7 +47,7 @@ void testServerConnections() { 1024); cout<<"Press any key to stop the server..."; - char c = cin.peek(); + cin.peek(); delete srv; } diff --git a/testApp/remote/testBlockingUDPClnt.cpp b/testApp/remote/testBlockingUDPClnt.cpp index 06443aa..3e124d9 100644 --- a/testApp/remote/testBlockingUDPClnt.cpp +++ b/testApp/remote/testBlockingUDPClnt.cpp @@ -26,8 +26,44 @@ using std::sscanf; static osiSockAddr sendTo; +class ContextImpl : public Context { +public: + ContextImpl() : + _tr(new TransportRegistry()), _timer(new Timer("server thread", + lowPriority)), _conf(new SystemConfigurationImpl()) { + } + virtual ~ContextImpl() { + delete _tr; + delete _timer; + } + virtual Timer* getTimer() { + return _timer; + } + virtual TransportRegistry* getTransportRegistry() { + return _tr; + } + virtual Channel* getChannel(epics::pvAccess::pvAccessID) { + return 0; + } + virtual Transport* getSearchTransport() { + return 0; + } + virtual Configuration* getConfiguration() { + return _conf; + } + +private: + TransportRegistry* _tr; + Timer* _timer; + Configuration* _conf; +}; + class DummyResponseHandler : public ResponseHandler { public: + DummyResponseHandler(Context* ctx) : + ResponseHandler(ctx) { + } + virtual void handleResponse(osiSockAddr* responseFrom, Transport* transport, int8 version, int8 command, int payloadSize, ByteBuffer* payloadBuffer) { @@ -69,9 +105,10 @@ private: void testBlockingUDPSender() { BlockingUDPConnector connector(false, NULL, true); + ContextImpl ctx; DummyTransportSender dts; - DummyResponseHandler drh; + DummyResponseHandler drh(&ctx); osiSockAddr bindAddr; diff --git a/testApp/remote/testBlockingUDPSrv.cpp b/testApp/remote/testBlockingUDPSrv.cpp index 6b713bb..16612ac 100644 --- a/testApp/remote/testBlockingUDPSrv.cpp +++ b/testApp/remote/testBlockingUDPSrv.cpp @@ -21,10 +21,42 @@ using std::endl; using std::hex; using std::dec; +class ContextImpl : public Context { +public: + ContextImpl() : + _tr(new TransportRegistry()), _timer(new Timer("server thread", + lowPriority)), _conf(new SystemConfigurationImpl()) { + } + virtual ~ContextImpl() { + delete _tr; + delete _timer; + } + virtual Timer* getTimer() { + return _timer; + } + virtual TransportRegistry* getTransportRegistry() { + return _tr; + } + virtual Channel* getChannel(epics::pvAccess::pvAccessID) { + return 0; + } + virtual Transport* getSearchTransport() { + return 0; + } + virtual Configuration* getConfiguration() { + return _conf; + } + +private: + TransportRegistry* _tr; + Timer* _timer; + Configuration* _conf; +}; + class DummyResponseHandler : public ResponseHandler { public: - DummyResponseHandler() : - packets(0) { + DummyResponseHandler(Context* context) : + ResponseHandler(context), packets(0) { } int getPackets() { @@ -71,8 +103,9 @@ void DummyResponseHandler::handleResponse(osiSockAddr* responseFrom, void testBlockingUDPConnector() { BlockingUDPConnector connector(false, NULL, true); + ContextImpl ctx; - DummyResponseHandler drh; + DummyResponseHandler drh(&ctx); osiSockAddr bindAddr; diff --git a/testApp/remote/testRemoteClientImpl.cpp b/testApp/remote/testRemoteClientImpl.cpp index 4b0f311..987006d 100644 --- a/testApp/remote/testRemoteClientImpl.cpp +++ b/testApp/remote/testRemoteClientImpl.cpp @@ -20,6 +20,7 @@ #include #include #include +#include using namespace epics::pvData; using namespace epics::pvAccess; @@ -34,7 +35,7 @@ class ChannelImplProcess : public ChannelProcess ChannelProcessRequester* m_channelProcessRequester; PVStructure* m_pvStructure; PVScalar* m_valueField; - + private: ~ChannelImplProcess() { @@ -53,29 +54,29 @@ class ChannelImplProcess : public ChannelProcess Status* noValueFieldStatus = getStatusCreate()->createStatus(STATUSTYPE_ERROR, "no 'value' field"); m_channelProcessRequester->channelProcessConnect(noValueFieldStatus, this); delete noValueFieldStatus; - + // NOTE client must destroy this instance... // do not access any fields and return ASAP return; } - + if (field->getField()->getType() != scalar) { Status* notAScalarStatus = getStatusCreate()->createStatus(STATUSTYPE_ERROR, "'value' field not scalar type"); m_channelProcessRequester->channelProcessConnect(notAScalarStatus, this); delete notAScalarStatus; - + // NOTE client must destroy this instance…. // do not access any fields and return ASAP return; } - + m_valueField = static_cast(field); - - // TODO pvRequest + + // TODO pvRequest m_channelProcessRequester->channelProcessConnect(getStatusCreate()->getStatusOK(), this); } - + virtual void process(bool lastRequest) { switch (m_valueField->getScalar()->getScalarType()) @@ -147,19 +148,19 @@ class ChannelImplProcess : public ChannelProcess default: // noop break; - - } + + } m_channelProcessRequester->processDone(getStatusCreate()->getStatusOK()); - + if (lastRequest) destroy(); } - + virtual void destroy() { delete this; } - + }; @@ -176,7 +177,7 @@ class ChannelImplGet : public ChannelGet PVStructure* m_pvStructure; BitSet* m_bitSet; volatile bool m_first; - + private: ~ChannelImplGet() { @@ -190,10 +191,10 @@ class ChannelImplGet : public ChannelGet { PVDATA_REFCOUNT_MONITOR_CONSTRUCT(mockChannelGet); - // TODO pvRequest + // TODO pvRequest m_channelGetRequester->channelGetConnect(getStatusCreate()->getStatusOK(), this, m_pvStructure, m_bitSet); } - + virtual void get(bool lastRequest) { m_channelGetRequester->getDone(getStatusCreate()->getStatusOK()); @@ -202,17 +203,17 @@ class ChannelImplGet : public ChannelGet m_first = false; m_bitSet->set(0); // TODO } - + if (lastRequest) destroy(); } - + virtual void destroy() { delete m_bitSet; delete this; } - + }; @@ -231,7 +232,7 @@ class ChannelImplPut : public ChannelPut PVStructure* m_pvStructure; BitSet* m_bitSet; volatile bool m_first; - + private: ~ChannelImplPut() { @@ -245,17 +246,17 @@ class ChannelImplPut : public ChannelPut { PVDATA_REFCOUNT_MONITOR_CONSTRUCT(mockChannelPut); - // TODO pvRequest + // TODO pvRequest m_channelPutRequester->channelPutConnect(getStatusCreate()->getStatusOK(), this, m_pvStructure, m_bitSet); } - + virtual void put(bool lastRequest) { m_channelPutRequester->putDone(getStatusCreate()->getStatusOK()); if (lastRequest) destroy(); } - + virtual void get() { m_channelPutRequester->getDone(getStatusCreate()->getStatusOK()); @@ -266,7 +267,7 @@ class ChannelImplPut : public ChannelPut delete m_bitSet; delete this; } - + }; @@ -287,7 +288,7 @@ class MockMonitor : public Monitor, public MonitorElement volatile bool m_first; Mutex* m_lock; volatile int m_count; - + private: ~MockMonitor() { @@ -306,16 +307,16 @@ class MockMonitor : public Monitor, public MonitorElement PVDATA_REFCOUNT_MONITOR_CONSTRUCT(mockMonitor); m_changedBitSet->set(0); - - // TODO pvRequest + + // TODO pvRequest m_monitorRequester->monitorConnect(getStatusCreate()->getStatusOK(), this, const_cast(m_pvStructure->getStructure())); } - + virtual Status* start() { // fist monitor m_monitorRequester->monitorEvent(this); - + // client needs to delete status, so passing shared OK instance is not right thing to do return getStatusCreate()->createStatus(STATUSTYPE_OK, "Monitor started."); } @@ -346,24 +347,24 @@ class MockMonitor : public Monitor, public MonitorElement if (m_count) m_count--; } - + virtual void destroy() { delete stop(); - + delete m_lock; delete m_overrunBitSet; delete m_changedBitSet; delete this; } - + // ============ MonitorElement ============ - + virtual PVStructure* getPVStructure() { return m_pvStructure; } - + virtual BitSet* getChangedBitSet() { return m_changedBitSet; @@ -373,8 +374,8 @@ class MockMonitor : public Monitor, public MonitorElement { return m_overrunBitSet; } - - + + }; @@ -395,7 +396,8 @@ class ClientContextImpl; /** * @param context */ - DebugResponse() + DebugResponse(Context* ctx) : + ResponseHandler(ctx) { } @@ -419,7 +421,7 @@ class ClientContextImpl; hexDump(prologue.str(), "received", (const int8*)payloadBuffer->getArray(), payloadBuffer->getPosition(), payloadSize); - + } }; @@ -430,30 +432,30 @@ class ClientContextImpl; */ class ClientResponseHandler : public ResponseHandler, private epics::pvData::NoDefaultMethods { private: - + /** * Table of response handlers for each command ID. */ ResponseHandler** m_handlerTable; - /** - * Context instance. + /* + * Context instance is part of the response handler now */ - ClientContextImpl* m_context; - + //ClientContextImpl* m_context; + public: - + ~ClientResponseHandler() { delete[] m_handlerTable; } - + /** * @param context */ - ClientResponseHandler(ClientContextImpl* context) : m_context(context) { - static ResponseHandler* badResponse = new DebugResponse(); + ClientResponseHandler(ClientContextImpl* context) : ResponseHandler((Context*)context) { + static ResponseHandler* badResponse = new DebugResponse((Context*)context); static ResponseHandler* dataResponse = 0; //new DataResponseHandler(context); - + #define HANDLER_COUNT 28 m_handlerTable = new ResponseHandler*[HANDLER_COUNT]; m_handlerTable[ 0] = badResponse; // TODO new BeaconHandler(context), /* 0 */ @@ -502,7 +504,7 @@ class ClientResponseHandler : public ResponseHandler, private epics::pvData::NoD hexDump(buf, (const int8*)(payloadBuffer->getArray()), payloadBuffer->getPosition(), payloadSize); return; } - + // delegate m_handlerTable[c]->handleResponse(responseFrom, transport, version, command, payloadSize, payloadBuffer); } @@ -525,7 +527,7 @@ class ClientResponseHandler : public ResponseHandler, private epics::pvData::NoD }; - + class BeaconHandlerImpl; @@ -556,10 +558,10 @@ enum ContextState { */ CONTEXT_DESTROYED }; - + class ClientContextImpl : public ClientContext, -public Context /* TODO */ +public Context /* TODO */ { @@ -577,7 +579,7 @@ class ChannelImpl : public TransportSender, public BaseSearchInstance { private: - + /** * Context. */ @@ -587,12 +589,12 @@ class ChannelImpl : * Client channel ID. */ pvAccessID m_channelID; - + /** * Channel name. */ String m_name; - + /** * Channel requester. */ @@ -602,48 +604,48 @@ class ChannelImpl : * Process priority. */ short m_priority; - + /** * List of fixed addresses, if name resolution will be used. */ InetAddrVector* m_addresses; - + /** * Connection status. */ ConnectionState m_connectionState; - + /** - * List of all channel's pending requests (keys are subscription IDs). + * List of all channel's pending requests (keys are subscription IDs). */ IOIDResponseRequestMap m_responseRequests; - + /** - * Allow reconnection flag. + * Allow reconnection flag. */ bool m_allowCreation; - + /** * Reference counting. - * NOTE: synced on m_channelMutex. + * NOTE: synced on m_channelMutex. */ int m_references; - + /* ****************** */ - /* CA protocol fields */ + /* CA protocol fields */ /* ****************** */ - + /** * Server transport. */ Transport* m_transport; - + /** * Server channel ID. */ pvAccessID m_serverChannelID; - + /** * Context sync. mutex. */ @@ -651,20 +653,20 @@ class ChannelImpl : /** * Flag indicting what message to send. - */ + */ bool m_issueCreateMessage; - + // TODO mock PVStructure* m_pvStructure; - + private: - ~ChannelImpl() + ~ChannelImpl() { PVDATA_REFCOUNT_MONITOR_DESTRUCT(channel); } - + public: - + /** * Constructor. * @param context @@ -693,10 +695,10 @@ class ChannelImpl : m_issueCreateMessage(true) { PVDATA_REFCOUNT_MONITOR_CONSTRUCT(channel); - + // register before issuing search request m_context->registerChannel(this); - + // connect connect(); @@ -704,7 +706,7 @@ class ChannelImpl : // // mock - // + // ScalarType stype = pvDouble; String allProperties("alarm,timeStamp,display,control,valueAlarm"); @@ -713,14 +715,14 @@ class ChannelImpl : PVDouble *pvField = m_pvStructure->getDoubleField(String("value")); pvField->put(1.123); - + // already connected, report state m_requester->channelStateChange(this, CONNECTED); - - - + + + } - + virtual void destroy() { if (m_addresses) delete m_addresses; @@ -732,13 +734,13 @@ class ChannelImpl : { return getChannelName(); }; - - virtual void message(String message,MessageType messageType) + + virtual void message(String message,MessageType messageType) { - std::cout << "[" << getRequesterName() << "] message(" << message << ", " << messageTypeName[messageType] << ")" << std::endl; + std::cout << "[" << getRequesterName() << "] message(" << message << ", " << messageTypeName[messageType] << ")" << std::endl; } - virtual ChannelProvider* getProvider() + virtual ChannelProvider* getProvider() { return m_context->getProvider(); } @@ -790,7 +792,7 @@ class ChannelImpl : pvAccessID getChannelID() { return m_channelID; } - + void connect() { Lock guard(&m_channelMutex); // if not destroyed... @@ -807,14 +809,14 @@ class ChannelImpl : throw std::runtime_error("Channel destroyed."); else if (m_connectionState == CONNECTED) disconnect(false, true); - } - + } + /** * Create a channel, i.e. submit create channel request to the server. * This method is called after search is complete. * @param transport */ - void createChannel(Transport* transport) + void createChannel(Transport* transport) { Lock guard(&m_channelMutex); @@ -822,12 +824,12 @@ class ChannelImpl : if (!m_allowCreation) return; m_allowCreation = false; - + // check existing transport if (m_transport && m_transport != transport) { disconnectPendingIO(false); - + ReferenceCountingTransport* rct = dynamic_cast(m_transport); if (rct) rct->release(this); } @@ -837,11 +839,11 @@ class ChannelImpl : // this happens when server is slower (processing search requests) than client generating it return; } - + m_transport = transport; m_transport->enqueueSendRequest(this); } - + virtual void cancel() { // noop } @@ -867,7 +869,7 @@ class ChannelImpl : * sid might not be valid, this depends on protocol revision. * @param sid */ - void connectionCompleted(pvAccessID sid/*, rights*/) + void connectionCompleted(pvAccessID sid/*, rights*/) { Lock guard(&m_channelMutex); @@ -884,16 +886,16 @@ class ChannelImpl : // user might create monitors in listeners, so this has to be done before this can happen // however, it would not be nice if events would come before connection event is fired - // but this cannot happen since transport (TCP) is serving in this thread + // but this cannot happen since transport (TCP) is serving in this thread resubscribeSubscriptions(); setConnectionState(CONNECTED); allOK = true; } catch (...) { // noop - // TODO at least log something?? + // TODO at least log something?? } - + if (!allOK) { // end connection request @@ -908,10 +910,10 @@ class ChannelImpl : Lock guard(&m_channelMutex); if (m_connectionState == DESTROYED) throw std::runtime_error("Channel already destroyed."); - + // do destruction via context m_context->destroyChannel(this, force); - + } /** @@ -938,7 +940,7 @@ class ChannelImpl : m_references--; if (m_references > 0 && !force) return; - + // stop searching... m_context->getChannelSearchManager()->unregisterChannel(this); cancel(); @@ -958,7 +960,7 @@ class ChannelImpl : } setConnectionState(DESTROYED); - + // unregister m_context->unregisterChannel(this); } @@ -970,10 +972,10 @@ class ChannelImpl : */ void disconnect(bool initiateSearch, bool remoteDestroy) { Lock guard(&m_channelMutex); - + if (m_connectionState != CONNECTED && !m_transport) return; - + if (!initiateSearch) { // stop searching... m_context->getChannelSearchManager()->unregisterChannel(this); @@ -990,12 +992,12 @@ class ChannelImpl : m_issueCreateMessage = false; m_transport->enqueueSendRequest(this); } - + ReferenceCountingTransport* rct = dynamic_cast(m_transport); if (rct) rct->release(this); m_transport = 0; } - + if (initiateSearch) this->initiateSearch(); @@ -1009,7 +1011,7 @@ class ChannelImpl : Lock guard(&m_channelMutex); m_allowCreation = true; - + if (!m_addresses) m_context->getChannelSearchManager()->registerChannel(this); /* TODO @@ -1035,7 +1037,7 @@ class ChannelImpl : return; } } - + transport = m_context->getTransport(this, serverAddress, minorRevision, m_priority); if (!transport) { @@ -1060,7 +1062,7 @@ class ChannelImpl : if (m_connectionState == DISCONNECTED) { updateSubscriptions(); - + // reconnect using existing IDs, data connectionCompleted(m_serverChannelID/*, accessRights*/); } @@ -1073,7 +1075,7 @@ class ChannelImpl : // NOTE: 2 types of disconnected state - distinguish them setConnectionState(DISCONNECTED); - // ... CA notifies also w/ no access rights callback, although access right are not changed + // ... CA notifies also w/ no access rights callback, although access right are not changed } } @@ -1087,7 +1089,7 @@ class ChannelImpl : if (m_connectionState != connectionState) { m_connectionState = connectionState; - + //bool connectionStatusToReport = (connectionState == CONNECTED); //if (connectionStatusToReport != lastReportedConnectionState) { @@ -1102,16 +1104,16 @@ class ChannelImpl : // noop } - + virtual void send(ByteBuffer* buffer, TransportSendControl* control) { m_channelMutex.lock(); bool issueCreateMessage = m_issueCreateMessage; m_channelMutex.unlock(); - + if (issueCreateMessage) { control->startMessage((int8)7, 2+4); - + // count buffer->putShort((int16)1); // array of CIDs and names @@ -1150,9 +1152,9 @@ class ChannelImpl : { // TODO } - + /** - * Resubscribe subscriptions. + * Resubscribe subscriptions. */ // TODO to be called from non-transport thread !!!!!! void resubscribeSubscriptions() @@ -1161,7 +1163,7 @@ class ChannelImpl : } /** - * Update subscriptions. + * Update subscriptions. */ // TODO to be called from non-transport thread !!!!!! void updateSubscriptions() @@ -1202,7 +1204,7 @@ class ChannelImpl : // TODO return 0; } - + virtual ChannelRPC* createChannelRPC(ChannelRPCRequester *channelRPCRequester, epics::pvData::PVStructure *pvRequest) { @@ -1224,20 +1226,20 @@ class ChannelImpl : // TODO return 0; } - - - + + + virtual void printInfo() { String info; printInfo(&info); std::cout << info.c_str() << std::endl; } - + virtual void printInfo(epics::pvData::StringBuilder out) { //Lock lock(&m_channelMutex); //std::ostringstream ostr; //static String emptyString; - + out->append( "CHANNEL : "); out->append(m_name); out->append("\nSTATE : "); out->append(ConnectionStateNames[m_connectionState]); if (m_connectionState == CONNECTED) @@ -1248,72 +1250,72 @@ class ChannelImpl : out->append("\n"); } }; - + class ChannelProviderImpl; - + class ChannelImplFind : public ChannelFind { public: ChannelImplFind(ChannelProvider* provider) : m_provider(provider) { } - + virtual void destroy() { // one instance for all, do not delete at all } - + virtual ChannelProvider* getChannelProvider() { return m_provider; }; - + virtual void cancelChannelFind() { throw std::runtime_error("not supported"); } - + private: - + // only to be destroyed by it friend class ChannelProviderImpl; virtual ~ChannelImplFind() {} - - ChannelProvider* m_provider; + + ChannelProvider* m_provider; }; - + class ChannelProviderImpl : public ChannelProvider { public: - + ChannelProviderImpl(ClientContextImpl* context) : m_context(context) { } - + virtual epics::pvData::String getProviderName() { return "ChannelProviderImpl"; } - + virtual void destroy() { delete this; } - + virtual ChannelFind* channelFind( epics::pvData::String channelName, ChannelFindRequester *channelFindRequester) { m_context->checkChannelName(channelName); - + if (!channelFindRequester) throw std::runtime_error("null requester"); - + std::auto_ptr errorStatus(getStatusCreate()->createStatus(STATUSTYPE_ERROR, "not implemented", 0)); channelFindRequester->channelFindResult(errorStatus.get(), 0, false); return 0; } - + virtual Channel* createChannel( epics::pvData::String channelName, ChannelRequester *channelRequester, @@ -1321,7 +1323,7 @@ class ChannelImpl : { return createChannel(channelName, channelRequester, priority, emptyString); } - + virtual Channel* createChannel( epics::pvData::String channelName, ChannelRequester *channelRequester, @@ -1333,31 +1335,35 @@ class ChannelImpl : if (channel) channelRequester->channelCreated(getStatusCreate()->getStatusOK(), channel); return channel; - + // NOTE it's up to internal code to respond w/ error to requester and return 0 in case of errors } - + private: ~ChannelProviderImpl() {}; - + /* TODO static*/ String emptyString; ClientContextImpl* m_context; }; public: - - ClientContextImpl() : + + ClientContextImpl() : m_addressList(""), m_autoAddressList(true), m_connectionTimeout(30.0f), m_beaconPeriod(15.0f), m_broadcastPort(CA_BROADCAST_PORT), m_receiveBufferSize(MAX_TCP_RECV), m_timer(0), m_broadcastTransport(0), m_searchTransport(0), m_connector(0), m_transportRegistry(0), m_namedLocker(0), m_lastCID(0), m_lastIOID(0), m_channelSearchManager(0), m_version(new Version("CA Client", "cpp", 0, 0, 0, 1)), m_provider(new ChannelProviderImpl(this)), - m_contextState(CONTEXT_NOT_INITIALIZED) + m_contextState(CONTEXT_NOT_INITIALIZED), m_configuration(new SystemConfigurationImpl()) { loadConfiguration(); } - + + virtual Configuration* getConfiguration() { + return m_configuration; + } + virtual Version* getVersion() { return m_version; } @@ -1387,28 +1393,28 @@ class ChannelImpl : virtual void initialize() { Lock lock(&m_contextMutex); - + if (m_contextState == CONTEXT_DESTROYED) throw std::runtime_error("Context destroyed."); else if (m_contextState == CONTEXT_INITIALIZED) throw std::runtime_error("Context already initialized."); - + internalInitialize(); m_contextState = CONTEXT_INITIALIZED; } - + virtual void printInfo() { String info; printInfo(&info); std::cout << info.c_str() << std::endl; } - + virtual void printInfo(epics::pvData::StringBuilder out) { Lock lock(&m_contextMutex); std::ostringstream ostr; static String emptyString; - + out->append( "CLASS : ::epics::pvAccess::ClientContextImpl"); out->append("\nVERSION : "); out->append(m_version->getVersionString()); out->append("\nADDR_LIST : "); ostr << m_addressList; out->append(ostr.str()); ostr.str(emptyString); @@ -1434,7 +1440,7 @@ class ChannelImpl : } out->append("\n"); } - + virtual void destroy() { m_contextMutex.lock(); @@ -1444,21 +1450,21 @@ class ChannelImpl : m_contextMutex.unlock(); throw std::runtime_error("Context already destroyed."); } - - // go into destroyed state ASAP + + // go into destroyed state ASAP m_contextState = CONTEXT_DESTROYED; internalDestroy(); } - + virtual void dispose() { destroy(); - } - + } + private: ~ClientContextImpl() {}; - + void loadConfiguration() { // TODO /* @@ -1472,19 +1478,19 @@ class ChannelImpl : } void internalInitialize() { - + m_timer = new Timer("pvAccess-client timer", lowPriority); m_connector = new BlockingTCPConnector(this, m_receiveBufferSize, m_beaconPeriod); m_transportRegistry = new TransportRegistry(); m_namedLocker = new NamedLockPattern(); - + // setup UDP transport initializeUDPTransport(); // setup search manager m_channelSearchManager = new ChannelSearchManager(this); } - + /** * Initialized UDP transport (broadcast socket and repeater connection). */ @@ -1497,7 +1503,7 @@ class ChannelImpl : listenLocalAddress.ia.sin_family = AF_INET; listenLocalAddress.ia.sin_port = htons(m_broadcastPort); listenLocalAddress.ia.sin_addr.s_addr = htonl(INADDR_ANY); - + // where to send address InetAddrVector* broadcastAddresses = getSocketAddressList("192.168.1.255", m_broadcastPort); // TODO getBroadcastAddresses(broadcastPort) @@ -1505,20 +1511,20 @@ class ChannelImpl : /// TOD !!!! addresses !!!!! by pointer and not copied BlockingUDPConnector* broadcastConnector = new BlockingUDPConnector(true, broadcastAddresses, true); - + m_broadcastTransport = (BlockingUDPTransport*)broadcastConnector->connect( 0, new ClientResponseHandler(this), listenLocalAddress, CA_MINOR_PROTOCOL_REVISION, CA_DEFAULT_PRIORITY); BlockingUDPConnector* searchConnector = new BlockingUDPConnector(false, broadcastAddresses, true); - + // undefined address osiSockAddr undefinedAddress; undefinedAddress.ia.sin_family = AF_INET; undefinedAddress.ia.sin_port = htons(0); undefinedAddress.ia.sin_addr.s_addr = htonl(INADDR_ANY); - + m_searchTransport = (BlockingUDPTransport*)searchConnector->connect( 0, new ClientResponseHandler(this), undefinedAddress, CA_MINOR_PROTOCOL_REVISION, @@ -1531,7 +1537,7 @@ class ChannelImpl : InetAddrVector* appendList = 0; if (m_autoAddressList) appendList = m_broadcastTransport->getSendAddresses(); - + InetAddrVector* list = getSocketAddressList(m_addressList, m_broadcastPort, appendList); // TODO delete !!!! if (list && list->size()) { @@ -1549,40 +1555,41 @@ class ChannelImpl : // TODO } } - + void internalDestroy() { - + // stop searching if (m_channelSearchManager) delete m_channelSearchManager; //->destroy(); - + // stop timer - if (m_timer) + if (m_timer) delete m_timer; // // cleanup // - + // this will also close all CA transports destroyAllChannels(); - + // TODO destroy !!! if (m_broadcastTransport) delete m_broadcastTransport; //->destroy(true); if (m_searchTransport) delete m_searchTransport; //->destroy(true); - + if (m_namedLocker) delete m_namedLocker; if (m_transportRegistry) delete m_transportRegistry; if (m_connector) delete m_connector; + if (m_configuration) delete m_configuration; m_provider->destroy(); delete m_version; m_contextMutex.unlock(); delete this; } - + void destroyAllChannels() { // TODO } @@ -1637,18 +1644,18 @@ class ChannelImpl : ChannelImpl* getChannel(pvAccessID channelID) { Lock guard(&m_cidMapMutex); - CIDChannelMap::iterator it = m_channelsByCID.find(channelID); + CIDChannelMap::iterator it = m_channelsByCID.find(channelID); return (it == m_channelsByCID.end() ? 0 : it->second); } /** * Generate Client channel ID (CID). - * @return Client channel ID (CID). + * @return Client channel ID (CID). */ pvAccessID generateCID() { Lock guard(&m_cidMapMutex); - + // search first free (theoretically possible loop of death) while (m_channelsByCID.find(++m_lastCID) != m_channelsByCID.end()); // reserve CID @@ -1665,7 +1672,7 @@ class ChannelImpl : m_channelsByCID.erase(cid); } - + /** * Get, or create if necessary, transport of given server address. * @param serverAddress required transport address @@ -1684,11 +1691,11 @@ class ChannelImpl : { logger.log(Level.SEVERE, "Failed to create transport for: " + serverAddress, cex); } - */ + */ return 0; - + } - + /** * Internal create channel. */ @@ -1699,16 +1706,16 @@ class ChannelImpl : checkState(); checkChannelName(name); - + if (requester == 0) throw std::runtime_error("null requester"); - + if (priority < ChannelProvider::PRIORITY_MIN || priority > ChannelProvider::PRIORITY_MAX) throw std::range_error("priority out of bounds"); bool lockAcquired = true; // TODO namedLocker->acquireSynchronizationObject(name, LOCK_TIMEOUT); if (lockAcquired) - { + { try { pvAccessID cid = generateCID(); @@ -1718,10 +1725,10 @@ class ChannelImpl : // TODO return 0; } - // TODO namedLocker.releaseSynchronizationObject(name); + // TODO namedLocker.releaseSynchronizationObject(name); } else - { + { // TODO is this OK? throw std::runtime_error("Failed to obtain synchronization lock for '" + name + "', possible deadlock."); } @@ -1735,23 +1742,23 @@ class ChannelImpl : * @throws IllegalStateException */ void destroyChannel(ChannelImpl* channel, bool force) { - + String name = channel->getChannelName(); bool lockAcquired = true; //namedLocker->acquireSynchronizationObject(name, LOCK_TIMEOUT); if (lockAcquired) - { + { try - { + { channel->destroyChannel(force); } catch(...) { // TODO } - // TODO namedLocker->releaseSynchronizationObject(channel.getChannelName()); + // TODO namedLocker->releaseSynchronizationObject(channel.getChannelName()); } else - { - // TODO is this OK? + { + // TODO is this OK? throw std::runtime_error("Failed to obtain synchronization lock for '" + name + "', possible deadlock."); } } @@ -1763,15 +1770,15 @@ class ChannelImpl : ChannelSearchManager* getChannelSearchManager() { return m_channelSearchManager; } - + /** * A space-separated list of broadcast address for process variable name resolution. * Each address must be of the form: ip.number:port or host.name:port */ String m_addressList; - + /** - * Define whether or not the network interfaces should be discovered at runtime. + * Define whether or not the network interfaces should be discovered at runtime. */ bool m_autoAddressList; @@ -1782,22 +1789,22 @@ class ChannelImpl : * the server is no longer present on the network and disconnect. */ float m_connectionTimeout; - + /** * Period in second between two beacon signals. */ float m_beaconPeriod; - + /** * Broadcast (beacon, search) port number to listen to. */ int m_broadcastPort; - + /** * Receive buffer size (max size of payload). */ int m_receiveBufferSize; - + /** * Timer. */ @@ -1807,7 +1814,7 @@ class ChannelImpl : * Broadcast transport needed to listen for broadcasts. */ BlockingUDPTransport* m_broadcastTransport; - + /** * UDP transport needed for channel searches. */ @@ -1820,7 +1827,7 @@ class ChannelImpl : /** * CA transport (virtual circuit) registry. - * This registry contains all active transports - connections to CA servers. + * This registry contains all active transports - connections to CA servers. */ TransportRegistry* m_transportRegistry; @@ -1847,7 +1854,7 @@ class ChannelImpl : Mutex m_cidMapMutex; /** - * Last CID cache. + * Last CID cache. */ pvAccessID m_lastCID; @@ -1859,7 +1866,7 @@ class ChannelImpl : IOIDResponseRequestMap m_pendingResponseRequests; /** - * Last IOID cache. + * Last IOID cache. */ pvAccessID m_lastIOID; @@ -1875,7 +1882,7 @@ class ChannelImpl : // TODO consider std::unordered_map typedef std::map AddressBeaconHandlerMap; AddressBeaconHandlerMap m_beaconHandlers; - + /** * Version. */ @@ -1885,18 +1892,20 @@ class ChannelImpl : * Provider implementation. */ ChannelProviderImpl* m_provider; - + /** * Context state. */ ContextState m_contextState; - + /** * Context sync. mutex. */ Mutex m_contextMutex; friend class ChannelProviderImpl; + + Configuration* m_configuration; }; @@ -1905,7 +1914,7 @@ class ChannelFindRequesterImpl : public ChannelFindRequester virtual void channelFindResult(epics::pvData::Status *status,ChannelFind *channelFind,bool wasFound) { std::cout << "[ChannelFindRequesterImpl] channelFindResult(" - << status->toString() << ", ..., " << wasFound << ")" << std::endl; + << status->toString() << ", ..., " << wasFound << ")" << std::endl; } }; @@ -1915,10 +1924,10 @@ class ChannelRequesterImpl : public ChannelRequester { return "ChannelRequesterImpl"; }; - - virtual void message(String message,MessageType messageType) + + virtual void message(String message,MessageType messageType) { - std::cout << "[" << getRequesterName() << "] message(" << message << ", " << messageTypeName[messageType] << ")" << std::endl; + std::cout << "[" << getRequesterName() << "] message(" << message << ", " << messageTypeName[messageType] << ")" << std::endl; } virtual void channelCreated(epics::pvData::Status* status, Channel *channel) @@ -1926,7 +1935,7 @@ class ChannelRequesterImpl : public ChannelRequester std::cout << "channelCreated(" << status->toString() << ", " << (channel ? channel->getChannelName() : "(null)") << ")" << std::endl; } - + virtual void channelStateChange(Channel *c, ConnectionState connectionState) { std::cout << "channelStateChange(" << c->getChannelName() << ", " << ConnectionStateNames[connectionState] << ")" << std::endl; @@ -1939,10 +1948,10 @@ class GetFieldRequesterImpl : public GetFieldRequester { return "GetFieldRequesterImpl"; }; - - virtual void message(String message,MessageType messageType) + + virtual void message(String message,MessageType messageType) { - std::cout << "[" << getRequesterName() << "] message(" << message << ", " << messageTypeName[messageType] << ")" << std::endl; + std::cout << "[" << getRequesterName() << "] message(" << message << ", " << messageTypeName[messageType] << ")" << std::endl; } virtual void getDone(epics::pvData::Status *status,epics::pvData::FieldConstPtr field) @@ -1965,22 +1974,22 @@ class ChannelGetRequesterImpl : public ChannelGetRequester ChannelGet *m_channelGet; epics::pvData::PVStructure *m_pvStructure; epics::pvData::BitSet *m_bitSet; - + virtual String getRequesterName() { return "ChannelGetRequesterImpl"; }; - - virtual void message(String message,MessageType messageType) + + virtual void message(String message,MessageType messageType) { - std::cout << "[" << getRequesterName() << "] message(" << message << ", " << messageTypeName[messageType] << ")" << std::endl; + std::cout << "[" << getRequesterName() << "] message(" << message << ", " << messageTypeName[messageType] << ")" << std::endl; } virtual void channelGetConnect(epics::pvData::Status *status,ChannelGet *channelGet, epics::pvData::PVStructure *pvStructure,epics::pvData::BitSet *bitSet) { std::cout << "channelGetConnect(" << status->toString() << ")" << std::endl; - + // TODO sync m_channelGet = channelGet; m_pvStructure = pvStructure; @@ -2002,22 +2011,22 @@ class ChannelPutRequesterImpl : public ChannelPutRequester ChannelPut *m_channelPut; epics::pvData::PVStructure *m_pvStructure; epics::pvData::BitSet *m_bitSet; - + virtual String getRequesterName() { return "ChannelPutRequesterImpl"; }; - - virtual void message(String message,MessageType messageType) + + virtual void message(String message,MessageType messageType) { - std::cout << "[" << getRequesterName() << "] message(" << message << ", " << messageTypeName[messageType] << ")" << std::endl; + std::cout << "[" << getRequesterName() << "] message(" << message << ", " << messageTypeName[messageType] << ")" << std::endl; } virtual void channelPutConnect(epics::pvData::Status *status,ChannelPut *channelPut, epics::pvData::PVStructure *pvStructure,epics::pvData::BitSet *bitSet) { std::cout << "channelPutConnect(" << status->toString() << ")" << std::endl; - + // TODO sync m_channelPut = channelPut; m_pvStructure = pvStructure; @@ -2043,20 +2052,20 @@ class ChannelPutRequesterImpl : public ChannelPutRequester } }; - - + + class MonitorRequesterImpl : public MonitorRequester { virtual String getRequesterName() { return "MonitorRequesterImpl"; }; - - virtual void message(String message,MessageType messageType) + + virtual void message(String message,MessageType messageType) { - std::cout << "[" << getRequesterName() << "] message(" << message << ", " << messageTypeName[messageType] << ")" << std::endl; + std::cout << "[" << getRequesterName() << "] message(" << message << ", " << messageTypeName[messageType] << ")" << std::endl; } - + virtual void monitorConnect(Status* status, Monitor* monitor, Structure* structure) { std::cout << "monitorConnect(" << status->toString() << ")" << std::endl; @@ -2067,13 +2076,13 @@ class MonitorRequesterImpl : public MonitorRequester std::cout << str << std::endl; } } - + virtual void monitorEvent(Monitor* monitor) { std::cout << "monitorEvent" << std::endl; MonitorElement* element = monitor->poll(); - + String str("changed/overrun "); element->getChangedBitSet()->toString(&str); str += '/'; @@ -2081,35 +2090,35 @@ class MonitorRequesterImpl : public MonitorRequester str += '\n'; element->getPVStructure()->toString(&str); std::cout << str << std::endl; - + monitor->release(element); } - + virtual void unlisten(Monitor* monitor) { std::cout << "unlisten" << std::endl; } -}; +}; class ChannelProcessRequesterImpl : public ChannelProcessRequester { ChannelProcess *m_channelProcess; - + virtual String getRequesterName() { return "ProcessRequesterImpl"; }; - - virtual void message(String message,MessageType messageType) + + virtual void message(String message,MessageType messageType) { - std::cout << "[" << getRequesterName() << "] message(" << message << ", " << messageTypeName[messageType] << ")" << std::endl; + std::cout << "[" << getRequesterName() << "] message(" << message << ", " << messageTypeName[messageType] << ")" << std::endl; } virtual void channelProcessConnect(epics::pvData::Status *status,ChannelProcess *channelProcess) { std::cout << "channelProcessConnect(" << status->toString() << ")" << std::endl; - + // TODO sync m_channelProcess = channelProcess; } @@ -2126,33 +2135,33 @@ int main(int argc,char *argv[]) ClientContextImpl* context = new ClientContextImpl(); context->printInfo(); - context->initialize(); + context->initialize(); context->printInfo(); - + epicsThreadSleep ( 1.0 ); - + //ChannelFindRequesterImpl findRequester; //context->getProvider()->channelFind("something", &findRequester); - + ChannelRequesterImpl channelRequester; Channel* channel = context->getProvider()->createChannel("structureArrayTest", &channelRequester); channel->printInfo(); /* GetFieldRequesterImpl getFieldRequesterImpl; channel->getField(&getFieldRequesterImpl, "timeStamp.secondsPastEpoch"); - + ChannelGetRequesterImpl channelGetRequesterImpl; ChannelGet* channelGet = channel->createChannelGet(&channelGetRequesterImpl, 0); channelGet->get(false); channelGet->destroy(); - + ChannelPutRequesterImpl channelPutRequesterImpl; ChannelPut* channelPut = channel->createChannelPut(&channelPutRequesterImpl, 0); channelPut->get(); channelPut->put(false); channelPut->destroy(); - - + + MonitorRequesterImpl monitorRequesterImpl; Monitor* monitor = channel->createMonitor(&monitorRequesterImpl, 0); @@ -2165,20 +2174,20 @@ int main(int argc,char *argv[]) ChannelProcess* channelProcess = channel->createChannelProcess(&channelProcessRequester, 0); channelProcess->process(false); channelProcess->destroy(); - + status = monitor->stop(); std::cout << "monitor->stop() = " << status->toString() << std::endl; delete status; - - + + monitor->destroy(); */ epicsThreadSleep ( 10.0 ); channel->destroy(); - + context->destroy(); - + std::cout << "-----------------------------------------------------------------------" << std::endl; getShowConstructDestruct()->constuctDestructTotals(stdout); return(0); From d7852fa666f60ec1bb72d99ca2b72437f9e0b1b2 Mon Sep 17 00:00:00 2001 From: miha_vitorovic Date: Mon, 10 Jan 2011 14:11:27 +0100 Subject: [PATCH 3/7] Moved AbstractResponseHandler definition to a more logical place. --- .../remote/abstractResponseHandler.cpp | 42 +++++++++++++++++++ 1 file changed, 42 insertions(+) create mode 100644 pvAccessApp/remote/abstractResponseHandler.cpp diff --git a/pvAccessApp/remote/abstractResponseHandler.cpp b/pvAccessApp/remote/abstractResponseHandler.cpp new file mode 100644 index 0000000..b5399b0 --- /dev/null +++ b/pvAccessApp/remote/abstractResponseHandler.cpp @@ -0,0 +1,42 @@ +/* + * abstractResponseHandler.cpp + * + * Created on: Jan 10, 2011 + * Author: Miha Vitorovic + */ + +#include "remote.h" +#include "hexDump.h" + +#include + +#include + +#include + +using std::ostringstream; +using std::hex; + +using namespace epics::pvData; + +namespace epics { + namespace pvAccess { + + void AbstractResponseHandler::handleResponse(osiSockAddr* responseFrom, + Transport* transport, int8 version, int8 command, + int payloadSize, ByteBuffer* payloadBuffer) { + if(_debug) { + char ipAddrStr[48]; + ipAddrToA(&responseFrom->ia, ipAddrStr, sizeof(ipAddrStr)); + + ostringstream prologue; + prologue<<"Message [0x"<getArray(), + payloadBuffer->getPosition(), payloadSize); + } + } + } +} From 4e96a20766d5230757b7120573196145f5a60be5 Mon Sep 17 00:00:00 2001 From: miha_vitorovic Date: Mon, 10 Jan 2011 15:18:15 +0100 Subject: [PATCH 4/7] ArrayFIFO template specialization for pointers done. --- pvAccessApp/utils/arrayFIFO.h | 367 +++++++++++++++++++++++++++++++- testApp/utils/arrayFIFOTest.cpp | 136 +++++++++++- 2 files changed, 494 insertions(+), 9 deletions(-) diff --git a/pvAccessApp/utils/arrayFIFO.h b/pvAccessApp/utils/arrayFIFO.h index d449c56..c96e7c5 100644 --- a/pvAccessApp/utils/arrayFIFO.h +++ b/pvAccessApp/utils/arrayFIFO.h @@ -118,11 +118,11 @@ namespace epics { #ifdef ARRAY_FIFO_DEBUG void debugState() { //size_t mask = _size-1; - std::cout<<"h:"<<_head<<",t:"<<_tail<<",c:"<<_size; + std::cout<<"Simple, h:"<<_head<<",t:"<<_tail<<",c:"<<_size; std::cout<<",s:"< - int ArrayFIFO::MIN_INITIAL_CAPACITY = 8; + /* * * * * * * * * * * template implementation * * * * * * * * * * */ template void ArrayFIFO::arraycopy(T* src, size_t srcPos, T* dest, @@ -374,6 +370,361 @@ namespace epics { return false; } + /* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * + * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * + * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */ + + /** + * Template specialization for pointers + */ + template + class ArrayFIFO { + public: + /** + * Constructs an empty array deque with an initial capacity + * sufficient to hold the specified number of elements. Array FIFO + * is designed to hold objects allocated on the heap. + * + * @param[in] numElements lower bound on initial capacity of the + * deque. Default value is 16 elements. + */ + ArrayFIFO(size_t numElements = 16); + ~ArrayFIFO(); + + /** + * Inserts the specified element at the front of this deque. + * + * @param[in] e the element to add. + */ + void addFirst(const T* e); + + /** + * Inserts the specified element at the end of this deque. + * @param[in] e the element to add + */ + void addLast(const T* e); + + T* pollFirst(); + T* pollLast(); + T* peekFirst(); + T* peekLast(); + + /** + * Pushes an element onto the stack represented by this deque. In other + * words, inserts the element at the front of this deque. + * + * @param[in] e the element to push + */ + void push(const T* e); + + /** + * Pops an element from the stack represented by this deque. In other + * words, removes and returns the first element of this deque. + * + * @return the element at the front of this deque (which is the top + * of the stack represented by this deque), null if no element available. + */ + T* pop(); + + /** + * Looks at the object at the top of this stack without removing it + * from the stack. + * @return the object at the top of this stack (the last item + * of the Vector object). + */ + T* peek(); + + /** + * Returns the number of elements in this deque. + * @return the number of elements in this deque + */ + size_t size(); + + /** + * Returns true if this deque contains no elements. + * + * @return true if this deque contains no elements + */ + bool isEmpty(); + + /** + * Removes all of the elements from this deque. + * The deque will be empty after this call returns. + * + * @param freeElements tells the methods to automatically free + * the memory of all the elments in the FIFO. Default is {@code true} + */ + void clear(); + + /** + * Removes the first occurrence of the specified element in this + * deque (when traversing the deque from head to tail). + * If the deque does not contain the element, it is unchanged. + * More formally, removes the first element e such that + * o.equals(e) (if such an element exists). + * Returns true if this deque contained the specified element + * (or equivalently, if this deque changed as a result of the call). + * + * @param o element to be removed from this deque, if present + * @return true if the deque contained the specified element + */ + bool remove(const T* e); + +#ifdef ARRAY_FIFO_DEBUG + void debugState() { + //size_t mask = _size-1; + std::cout<<"Pointer, h:"<<_head<<",t:"<<_tail<<",c:"<<_size; + std::cout<<",s:"<This method is called delete rather than remove to emphasize + * that its semantics differ from those of {@link List#remove(int)}. + * + * @return true if elements moved backwards + */ + bool del(const size_t i); + + }; + + /* * * * * * * * * * * template implementation * * * * * * * * * * */ + + template + void ArrayFIFO::arraycopy(T** src, size_t srcPos, T** dest, + size_t destPos, size_t length) { + if(srcPos=0; i--) + dest[destPos+i] = src[srcPos+i]; + else + for(size_t i = 0; i + void ArrayFIFO::allocateElements(size_t numElements) { + _size = MIN_INITIAL_CAPACITY; + // Find the best power of two to hold elements. + // Tests "<=" because arrays aren't kept full. + if(numElements>=_size) { + _size = numElements; + _size |= (_size>>1); + _size |= (_size>>2); + _size |= (_size>>4); + _size |= (_size>>8); + _size |= (_size>>16); + _size++; + } + _elements = new T*[_size]; + } + + template + void ArrayFIFO::doubleCapacity() { + size_t p = _head; + size_t n = _size; + size_t r = n-p; // number of elements to the right of p + size_t newCapacity = n<<1; + T** a = new T*[newCapacity]; + arraycopy(_elements, p, a, 0, r); + arraycopy(_elements, 0, a, r, p); + delete[] _elements; + _elements = a; + _size = newCapacity; + _head = 0; + _tail = n; + + } + + template + ArrayFIFO::ArrayFIFO(size_t numElements) : + _head(0), _tail(0), _mutex() { + allocateElements(numElements); + } + + template + ArrayFIFO::~ArrayFIFO() { + delete[] _elements; + } + + template + void ArrayFIFO::addFirst(const T* e) { + Lock lock(&_mutex); + + _elements[_head = (_head-1)&(_size-1)] = const_cast(e); + if(_head==_tail) doubleCapacity(); + } + + template + void ArrayFIFO::addLast(const T* e) { + Lock lock(&_mutex); + + _elements[_tail] = const_cast(e); + if((_tail = (_tail+1)&(_size-1))==_head) doubleCapacity(); + } + + template + T* ArrayFIFO::pollFirst() { + Lock lock(&_mutex); + + if(isEmpty()) return NULL; + + T* result = _elements[_head]; // Element is null if deque empty + _head = (_head+1)&(_size-1); + return result; + } + + template + T* ArrayFIFO::pollLast() { + Lock lock(&_mutex); + + if(isEmpty()) return NULL; + + _tail = (_tail-1)&(_size-1); + return _elements[_tail]; + } + + template + T* ArrayFIFO::peekFirst() { + Lock lock(&_mutex); + + if(isEmpty()) return NULL; + + return _elements[_head]; + } + + template + T* ArrayFIFO::peekLast() { + Lock lock(&_mutex); + + if(isEmpty()) return NULL; + + return _elements[(_tail-1)&(_size-1)]; + } + + template + void ArrayFIFO::push(const T* e) { + addLast(e); + } + + template + T* ArrayFIFO::pop() { + return pollLast(); + } + + template + T* ArrayFIFO::peek() { + return peekFirst(); + } + + template + size_t ArrayFIFO::size() { + Lock lock(&_mutex); + + return (_tail-_head)&(_size-1); + } + + template + bool ArrayFIFO::isEmpty() { + Lock lock(&_mutex); + + return _head==_tail; + } + + template + void ArrayFIFO::clear() { + Lock lock(&_mutex); + + _head = _tail = 0; + } + + template + bool ArrayFIFO::del(const size_t i) { + // i is absolute index in the array + size_t mask = _size-1; + size_t h = _head; + size_t t = _tail; + size_t front = (i-h)&mask; + size_t back = (t-i)&mask; + + // Invariant: head <= i < tail mod circularity + if(front>=((t-h)&mask)) THROW_BASE_EXCEPTION( + "Illegal State Exception"); // concurrency problem!!! + + // Optimize for least element motion + if(front0) _elements[0] = _elements[mask]; + arraycopy(_elements, h, _elements, h+1, mask-h); + } + _head = (h+1)&mask; + + return false; + } + else { + if(i + bool ArrayFIFO::remove(const T* e) { + Lock lock(&_mutex); + + if(isEmpty()) return false; // nothing to do + + size_t mask = _size-1; + size_t i = _head; + while(i!=_tail) { + if(e==_elements[i]) { + del(i); + return true; + } + i = (i+1)&mask; + } + return false; + } + } } diff --git a/testApp/utils/arrayFIFOTest.cpp b/testApp/utils/arrayFIFOTest.cpp index 61cbfc7..88617e1 100644 --- a/testApp/utils/arrayFIFOTest.cpp +++ b/testApp/utils/arrayFIFOTest.cpp @@ -15,7 +15,9 @@ using namespace epics::pvAccess; using std::cout; using std::endl; -int main(int argc, char *argv[]) { +void testSimpleType() { + cout<<"\nTests for simple type template."< fifoInt; assert(fifoInt.size()==0); @@ -133,5 +135,137 @@ int main(int argc, char *argv[]) { assert(fifoInt.isEmpty()); cout<<"\nPASSED!\n"; +} + +void testPointerType() { + cout<<"\nTests for pointer type template."< fifoInt; + + assert(fifoInt.size()==0); + assert(fifoInt.isEmpty()); + + cout<<"Testing clear."< Date: Mon, 10 Jan 2011 15:37:25 +0100 Subject: [PATCH 5/7] GrowingCircularBuffer template specialization for pointers done. --- pvAccessApp/utils/growingCircularBuffer.h | 166 +++++++++++++++++--- testApp/utils/arrayFIFOTest.cpp | 138 ++++++++-------- testApp/utils/growingCircularBufferTest.cpp | 49 +++++- 3 files changed, 261 insertions(+), 92 deletions(-) diff --git a/pvAccessApp/utils/growingCircularBuffer.h b/pvAccessApp/utils/growingCircularBuffer.h index 6a993a1..63bb242 100644 --- a/pvAccessApp/utils/growingCircularBuffer.h +++ b/pvAccessApp/utils/growingCircularBuffer.h @@ -28,8 +28,8 @@ namespace epics { * Create a GrowingCircularBuffer with the given capacity. **/ GrowingCircularBuffer(size_t capacity = 16) : - _elements(new T[capacity]), _takePointer(0), _putPointer(0), _count(0), _size(capacity) - { + _elements(new T[capacity]), _takePointer(0), _putPointer(0), + _count(0), _size(capacity) { } ~GrowingCircularBuffer() { @@ -94,13 +94,9 @@ namespace epics { size_t length); }; - /* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * - * g++ requires template definition inside a header file. - * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */ - template - void GrowingCircularBuffer::arraycopy(T* src, size_t srcPos, T* dest, - size_t destPos, size_t length) { + void GrowingCircularBuffer::arraycopy(T* src, size_t srcPos, + T* dest, size_t destPos, size_t length) { if(srcPos=0; i--) dest[destPos+i] = src[srcPos+i]; @@ -111,17 +107,16 @@ namespace epics { template bool GrowingCircularBuffer::insert(const T x) { - if (_count == _size) - { + if(_count==_size) { // we are full, grow by factor 2 - T* newElements = new T[_size * 2]; + T* newElements = new T[_size*2]; // invariant: _takePointer < _size - size_t split = _size - _takePointer; - if (split > 0) - arraycopy(_elements, _takePointer, newElements, 0, split); - if (_takePointer != 0) - arraycopy(_elements, 0, newElements, split, _putPointer); + size_t split = _size-_takePointer; + if(split>0) arraycopy(_elements, _takePointer, newElements, 0, + split); + if(_takePointer!=0) arraycopy(_elements, 0, newElements, split, + _putPointer); _takePointer = 0; _putPointer = _size; @@ -132,8 +127,8 @@ namespace epics { _count++; _elements[_putPointer] = x; - if (++_putPointer >= _size) _putPointer = 0; - return _count == 1; + if(++_putPointer>=_size) _putPointer = 0; + return _count==1; } template @@ -146,6 +141,141 @@ namespace epics { return old; } + /* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * + * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * + * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */ + + /** + * Template specialization for pointers. + * Implementation of circular FIFO unbouded buffer. + * Instance is not thread safe. + * @author Miha Vitorovic + */ + template + class GrowingCircularBuffer { + public: + + /** + * Create a GrowingCircularBuffer with the given capacity. + **/ + GrowingCircularBuffer(size_t capacity = 16) : + _elements(new T*[capacity]), _takePointer(0), _putPointer(0), + _count(0), _size(capacity) { + } + + ~GrowingCircularBuffer() { + delete[] _elements; + } + + /** + * Get number of elements in the buffer. + * @return number of elements in the buffer. + */ + inline size_t size() { + return _count; + } + + /** + * Get current buffer capacity. + * @return buffer current capacity. + */ + inline size_t capacity() { + return _size; + } + + /** + * Insert a new element in to the buffer. + * If buffer full the buffer is doubled. + * + * @param x element to insert. + * @return true if first element. + */ + bool insert(const T* x); + + /** + * Extract the oldest element from the buffer. + * @return the oldest element from the buffer. + */ + T* extract(); + + private: + /** + * Array (circular buffer) of elements. + */ + T** _elements; + + /** + * Take (read) pointer. + */ + size_t _takePointer; + + /** + * Put (write) pointer. + */ + size_t _putPointer; + + /** + * Number of elements in the buffer. + */ + size_t _count; + + size_t _size; + + void arraycopy(T** src, size_t srcPos, T** dest, size_t destPos, + size_t length); + }; + + /* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * + * g++ requires template definition inside a header file. + * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */ + + template + void GrowingCircularBuffer::arraycopy(T** src, size_t srcPos, + T** dest, size_t destPos, size_t length) { + if(srcPos=0; i--) + dest[destPos+i] = src[srcPos+i]; + else + for(size_t i = 0; i + bool GrowingCircularBuffer::insert(const T* x) { + if(_count==_size) { + // we are full, grow by factor 2 + T** newElements = new T*[_size*2]; + + // invariant: _takePointer < _size + size_t split = _size-_takePointer; + if(split>0) arraycopy(_elements, _takePointer, newElements, 0, + split); + if(_takePointer!=0) arraycopy(_elements, 0, newElements, split, + _putPointer); + + _takePointer = 0; + _putPointer = _size; + _size *= 2; + delete[] _elements; + _elements = newElements; + } + _count++; + + _elements[_putPointer] = const_cast(x); + if(++_putPointer>=_size) _putPointer = 0; + return _count==1; + } + + template + T* GrowingCircularBuffer::extract() { + if(_count==0) return NULL; + + _count--; + T* old = _elements[_takePointer]; + if(++_takePointer>=_size) _takePointer = 0; + return old; + } + } } #endif /* GROWINGCIRCULARBUFFER_H_ */ diff --git a/testApp/utils/arrayFIFOTest.cpp b/testApp/utils/arrayFIFOTest.cpp index 88617e1..966d2c1 100644 --- a/testApp/utils/arrayFIFOTest.cpp +++ b/testApp/utils/arrayFIFOTest.cpp @@ -140,7 +140,7 @@ void testSimpleType() { void testPointerType() { cout<<"\nTests for pointer type template."< fifoInt; @@ -148,110 +148,110 @@ void testPointerType() { assert(fifoInt.isEmpty()); cout<<"Testing clear."< cb(CAPACITY); - cout<<"Testing circular buffer."< cb(CAPACITY); + int testVals[] = {0,1,2,3,4,5,6,7,8,9,11,12,13,14,15,16,17,18,19,20}; + + cout<<"Testing circular buffer pointer type."< Date: Mon, 10 Jan 2011 15:51:35 +0100 Subject: [PATCH 6/7] - Connecting to invalid server (port) now generates an error. - Removed usage 'ipAddrToA' from the code. Replaced with 'ipAddrToDottedIP'. --- pvAccessApp/remote/abstractResponseHandler.cpp | 2 +- .../remote/blockingClientTCPTransport.cpp | 6 +++--- .../remote/blockingServerTCPTransport.cpp | 2 +- pvAccessApp/remote/blockingTCPAcceptor.cpp | 10 ++++++---- pvAccessApp/remote/blockingTCPConnector.cpp | 17 ++++++++++++++--- pvAccessApp/server/responseHandlers.cpp | 2 +- testApp/remote/testRemoteClientImpl.cpp | 3 ++- 7 files changed, 28 insertions(+), 14 deletions(-) diff --git a/pvAccessApp/remote/abstractResponseHandler.cpp b/pvAccessApp/remote/abstractResponseHandler.cpp index b5399b0..2d5f0b3 100644 --- a/pvAccessApp/remote/abstractResponseHandler.cpp +++ b/pvAccessApp/remote/abstractResponseHandler.cpp @@ -27,7 +27,7 @@ namespace epics { int payloadSize, ByteBuffer* payloadBuffer) { if(_debug) { char ipAddrStr[48]; - ipAddrToA(&responseFrom->ia, ipAddrStr, sizeof(ipAddrStr)); + ipAddrToDottedIP(&responseFrom->ia, ipAddrStr, sizeof(ipAddrStr)); ostringstream prologue; prologue<<"Message [0x"<ia, ipAddrStr, sizeof(ipAddrStr)); + ipAddrToDottedIP(&_socketAddress->ia, ipAddrStr, sizeof(ipAddrStr)); errlogSevPrintf(errlogInfo, "Acquiring transport to %s.", ipAddrStr); _ownersMutex->lock(); @@ -127,7 +127,7 @@ namespace epics { int refs = _owners->size(); if(refs>0) { char ipAddrStr[48]; - ipAddrToA(&_socketAddress->ia, ipAddrStr, sizeof(ipAddrStr)); + ipAddrToDottedIP(&_socketAddress->ia, ipAddrStr, sizeof(ipAddrStr)); errlogSevPrintf( errlogInfo, "Transport to %s still has %d client(s) active and closing...", @@ -145,7 +145,7 @@ namespace epics { if(_closed) return; char ipAddrStr[48]; - ipAddrToA(&_socketAddress->ia, ipAddrStr, sizeof(ipAddrStr)); + ipAddrToDottedIP(&_socketAddress->ia, ipAddrStr, sizeof(ipAddrStr)); errlogSevPrintf(errlogInfo, "Releasing transport to %s.", ipAddrStr); diff --git a/pvAccessApp/remote/blockingServerTCPTransport.cpp b/pvAccessApp/remote/blockingServerTCPTransport.cpp index 9048920..042a2b8 100644 --- a/pvAccessApp/remote/blockingServerTCPTransport.cpp +++ b/pvAccessApp/remote/blockingServerTCPTransport.cpp @@ -52,7 +52,7 @@ namespace epics { if(_channels->size()==0) return; char ipAddrStr[64]; - ipAddrToA(&_socketAddress->ia, ipAddrStr, sizeof(ipAddrStr)); + ipAddrToDottedIP(&_socketAddress->ia, ipAddrStr, sizeof(ipAddrStr)); errlogSevPrintf( errlogInfo, diff --git a/pvAccessApp/remote/blockingTCPAcceptor.cpp b/pvAccessApp/remote/blockingTCPAcceptor.cpp index c5bfcb7..10c5f21 100644 --- a/pvAccessApp/remote/blockingTCPAcceptor.cpp +++ b/pvAccessApp/remote/blockingTCPAcceptor.cpp @@ -51,7 +51,7 @@ namespace epics { char strBuffer[64]; char ipAddrStr[48]; - ipAddrToA(&_bindAddress->ia, ipAddrStr, sizeof(ipAddrStr)); + ipAddrToDottedIP(&_bindAddress->ia, ipAddrStr, sizeof(ipAddrStr)); int tryCount = 0; while(tryCount<2) { @@ -150,7 +150,7 @@ namespace epics { void BlockingTCPAcceptor::handleEvents() { // rise level if port is assigned dynamically char ipAddrStr[48]; - ipAddrToA(&_bindAddress->ia, ipAddrStr, sizeof(ipAddrStr)); + ipAddrToDottedIP(&_bindAddress->ia, ipAddrStr, sizeof(ipAddrStr)); errlogSevPrintf(errlogInfo, "Accepting connections at %s.", ipAddrStr); @@ -183,7 +183,8 @@ namespace epics { _serverSocketChannel, &address.sa, &len); if(newClient!=INVALID_SOCKET) { // accept succeeded - ipAddrToA(&address.ia, ipAddrStr, sizeof(ipAddrStr)); + ipAddrToDottedIP(&address.ia, ipAddrStr, + sizeof(ipAddrStr)); errlogSevPrintf(errlogInfo, "Accepted connection from CA client: %s", ipAddrStr); @@ -273,7 +274,8 @@ namespace epics { if(_serverSocketChannel!=INVALID_SOCKET) { char ipAddrStr[48]; - ipAddrToA(&_bindAddress->ia, ipAddrStr, sizeof(ipAddrStr)); + ipAddrToDottedIP(&_bindAddress->ia, ipAddrStr, + sizeof(ipAddrStr)); errlogSevPrintf(errlogInfo, "Stopped accepting connections at %s.", ipAddrStr); diff --git a/pvAccessApp/remote/blockingTCPConnector.cpp b/pvAccessApp/remote/blockingTCPConnector.cpp index 4a066b6..5c3b021 100644 --- a/pvAccessApp/remote/blockingTCPConnector.cpp +++ b/pvAccessApp/remote/blockingTCPConnector.cpp @@ -38,7 +38,7 @@ namespace epics { if(tryCount>0) epicsThreadSleep(0.1); char strBuffer[64]; - ipAddrToA(&address.ia, strBuffer, sizeof(strBuffer)); + ipAddrToDottedIP(&address.ia, strBuffer, sizeof(strBuffer)); errlogSevPrintf(errlogInfo, "Opening socket to CA server %s, attempt %d.", @@ -73,7 +73,7 @@ namespace epics { SOCKET socket = INVALID_SOCKET; char ipAddrStr[64]; - ipAddrToA(&address.ia, ipAddrStr, sizeof(ipAddrStr)); + ipAddrToDottedIP(&address.ia, ipAddrStr, sizeof(ipAddrStr)); // first try to check cache w/o named lock... BlockingClientTCPTransport @@ -106,6 +106,17 @@ namespace epics { ipAddrStr); socket = tryConnect(address, 3); + // verify + if(socket==INVALID_SOCKET) { + errlogSevPrintf( + errlogMajor, + "Connection to CA server %s failed.", + ipAddrStr); + ostringstream temp; + temp<<"Failed to verify TCP connection to '"<waitUntilVerified(3.0)) { transport->close(true); errlogSevPrintf( - errlogInfo, + errlogMinor, "Connection to CA server %s failed to be validated, closing it.", ipAddrStr); ostringstream temp; diff --git a/pvAccessApp/server/responseHandlers.cpp b/pvAccessApp/server/responseHandlers.cpp index c0ab2ae..fe57284 100644 --- a/pvAccessApp/server/responseHandlers.cpp +++ b/pvAccessApp/server/responseHandlers.cpp @@ -31,7 +31,7 @@ namespace epics { transport, version, command, payloadSize, payloadBuffer); char ipAddrStr[48]; - ipAddrToA(&responseFrom->ia, ipAddrStr, sizeof(ipAddrStr)); + ipAddrToDottedIP(&responseFrom->ia, ipAddrStr, sizeof(ipAddrStr)); errlogSevPrintf(errlogInfo, "Undecipherable message (bad response type %d) from %s.", diff --git a/testApp/remote/testRemoteClientImpl.cpp b/testApp/remote/testRemoteClientImpl.cpp index 987006d..8ab748b 100644 --- a/testApp/remote/testRemoteClientImpl.cpp +++ b/testApp/remote/testRemoteClientImpl.cpp @@ -410,7 +410,8 @@ class ClientContextImpl; { char ipAddrStr[48]; std::cout << "ole" << std::endl; - ipAddrToA(&responseFrom->ia, ipAddrStr, sizeof(ipAddrStr)); + ipAddrToDottedIP(&responseFrom->ia, ipAddrStr, + sizeof(ipAddrStr)); std::cout << "ole2" << std::endl; ostringstream prologue; From 487882dbff727bb1d5bb72fa8552aefc8124c1b5 Mon Sep 17 00:00:00 2001 From: miha_vitorovic Date: Mon, 10 Jan 2011 16:13:23 +0100 Subject: [PATCH 7/7] - 'getBroadcastAddresses' now accepts a port as parameter which in then used in all returned broadcast addresses. - 'getBroadcastAddresses' now returns a default broadcast address (255.255.255.255) on failure, if no other broadcast address was found. --- pvAccessApp/utils/inetAddressUtil.cpp | 16 +++++++++++++++- pvAccessApp/utils/inetAddressUtil.h | 8 ++++++-- testApp/utils/inetAddressUtilsTest.cpp | 4 ++-- 3 files changed, 23 insertions(+), 5 deletions(-) diff --git a/pvAccessApp/utils/inetAddressUtil.cpp b/pvAccessApp/utils/inetAddressUtil.cpp index 7eeb41c..baded12 100644 --- a/pvAccessApp/utils/inetAddressUtil.cpp +++ b/pvAccessApp/utils/inetAddressUtil.cpp @@ -32,10 +32,19 @@ using namespace epics::pvData; namespace epics { namespace pvAccess { + void addDefaultBroadcastAddress(InetAddrVector* v, in_port_t p) { + osiSockAddr* pNewNode = new osiSockAddr; + pNewNode->ia.sin_family = AF_INET; + pNewNode->ia.sin_addr.s_addr = htonl(INADDR_BROADCAST); + pNewNode->ia.sin_port = htons(p); + v->push_back(pNewNode); + } + /* port of osiSockDiscoverBroadcastAddresses() in * epics/base/src/libCom/osi/os/default/osdNetIntf.c */ - InetAddrVector* getBroadcastAddresses(SOCKET sock) { + InetAddrVector* getBroadcastAddresses(SOCKET sock, + in_port_t defaultPort) { static const unsigned nelem = 100; int status; struct ifconf ifconf; @@ -56,6 +65,7 @@ namespace epics { if(!pIfreqList) { errlogSevPrintf(errlogMajor, "getBroadcastAddresses(): no memory to complete request"); + addDefaultBroadcastAddress(retVector, defaultPort); return retVector; } @@ -67,6 +77,7 @@ namespace epics { errlogSevPrintf(errlogMinor, "getBroadcastAddresses(): unable to fetch network interface configuration"); delete[] pIfreqList; + addDefaultBroadcastAddress(retVector, defaultPort); return retVector; } @@ -117,6 +128,8 @@ namespace epics { errlogSevPrintf(errlogMajor, "getBroadcastAddresses(): no memory available for configuration"); delete[] pIfreqList; + if(retVector->size()==0) addDefaultBroadcastAddress( + retVector, defaultPort); return retVector; } @@ -168,6 +181,7 @@ namespace epics { delete pNewNode; continue; } + pNewNode->ia.sin_port = htons(defaultPort); retVector->push_back(pNewNode); } diff --git a/pvAccessApp/utils/inetAddressUtil.h b/pvAccessApp/utils/inetAddressUtil.h index ee7f381..e8ee082 100644 --- a/pvAccessApp/utils/inetAddressUtil.h +++ b/pvAccessApp/utils/inetAddressUtil.h @@ -33,8 +33,12 @@ namespace epics { /** * returns a vector containing all the IPv4 broadcast addresses * on this machine. IPv6 doesn't have a local broadcast address. - */ - InetAddrVector* getBroadcastAddresses(SOCKET sock); + * Conversion of the defaultPort to network byte order performed by + * the function. + * TODO: Windows implementation of the function. + */ + InetAddrVector* getBroadcastAddresses(SOCKET sock, + in_port_t defaultPort); /** * Encode IPv4 address as IPv6 address. diff --git a/testApp/utils/inetAddressUtilsTest.cpp b/testApp/utils/inetAddressUtilsTest.cpp index 8594254..3ceae49 100644 --- a/testApp/utils/inetAddressUtilsTest.cpp +++ b/testApp/utils/inetAddressUtilsTest.cpp @@ -129,11 +129,11 @@ int main(int argc, char *argv[]) { cout<<"\nPASSED!\n"; SOCKET socket = epicsSocketCreate(AF_INET, SOCK_STREAM, IPPROTO_TCP); - InetAddrVector* broadcasts = getBroadcastAddresses(socket); + InetAddrVector* broadcasts = getBroadcastAddresses(socket,6678); cout<<"Broadcast addresses: "<size()<size(); i++) { cout<<"Broadcast address: "; - cout<at(i), false)<at(i))<