From f1f7ad27098fb99f442d8ed32245afd945488561 Mon Sep 17 00:00:00 2001 From: Matej Sekoranja Date: Sun, 9 Jan 2011 22:41:05 +0100 Subject: [PATCH 01/10] Context, ChannelSearchManager cyc. dep. resolved --- .../remote/blockingServerTCPTransport.cpp | 14 +- pvAccessApp/remote/blockingTCP.h | 14 +- pvAccessApp/remote/channelSearchManager.cpp | 8 +- pvAccessApp/remote/channelSearchManager.h | 212 +----------------- pvAccessApp/remote/remote.h | 101 ++++++++- testApp/remote/testBlockingTCPClnt.cpp | 2 + testApp/remote/testBlockingTCPSrv.cpp | 3 + testApp/remote/testChannelSearchManager.cpp | 6 +- testApp/remote/testRemoteClientImpl.cpp | 144 +----------- 9 files changed, 124 insertions(+), 380 deletions(-) diff --git a/pvAccessApp/remote/blockingServerTCPTransport.cpp b/pvAccessApp/remote/blockingServerTCPTransport.cpp index b051e2c..9048920 100644 --- a/pvAccessApp/remote/blockingServerTCPTransport.cpp +++ b/pvAccessApp/remote/blockingServerTCPTransport.cpp @@ -59,7 +59,7 @@ namespace epics { "Transport to %s still has %d channel(s) active and closing...", ipAddrStr, _channels->size()); - map::iterator it = _channels->begin(); + map::iterator it = _channels->begin(); for(; it!=_channels->end(); it++) it->second->destroy(); @@ -71,30 +71,30 @@ namespace epics { destroyAllChannels(); } - int BlockingServerTCPTransport::preallocateChannelSID() { + pvAccessID BlockingServerTCPTransport::preallocateChannelSID() { Lock lock(_channelsMutex); // search first free (theoretically possible loop of death) - int sid = ++_lastChannelSID; + pvAccessID sid = ++_lastChannelSID; while(_channels->find(sid)!=_channels->end()) sid = ++_lastChannelSID; return sid; } - void BlockingServerTCPTransport::registerChannel(int sid, + void BlockingServerTCPTransport::registerChannel(pvAccessID sid, ServerChannel* channel) { Lock lock(_channelsMutex); (*_channels)[sid] = channel; } - void BlockingServerTCPTransport::unregisterChannel(int sid) { + void BlockingServerTCPTransport::unregisterChannel(pvAccessID sid) { Lock lock(_channelsMutex); _channels->erase(sid); } - ServerChannel* BlockingServerTCPTransport::getChannel(int sid) { + ServerChannel* BlockingServerTCPTransport::getChannel(pvAccessID sid) { Lock lock(_channelsMutex); - map::iterator it = _channels->find(sid); + map::iterator it = _channels->find(sid); if(it!=_channels->end()) return it->second; return NULL; diff --git a/pvAccessApp/remote/blockingTCP.h b/pvAccessApp/remote/blockingTCP.h index d7302e3..b017341 100644 --- a/pvAccessApp/remote/blockingTCP.h +++ b/pvAccessApp/remote/blockingTCP.h @@ -547,13 +547,13 @@ namespace epics { * Preallocate new channel SID. * @return new channel server id (SID). */ - virtual int preallocateChannelSID(); + virtual pvAccessID preallocateChannelSID(); /** * De-preallocate new channel SID. * @param sid preallocated channel SID. */ - virtual void depreallocateChannelSID(int sid) { + virtual void depreallocateChannelSID(pvAccessID sid) { // noop } @@ -562,20 +562,20 @@ namespace epics { * @param sid preallocated channel SID. * @param channel channel to register. */ - virtual void registerChannel(int sid, ServerChannel* channel); + virtual void registerChannel(pvAccessID sid, ServerChannel* channel); /** * Unregister a new channel (and deallocates its handle). * @param sid SID */ - virtual void unregisterChannel(int sid); + virtual void unregisterChannel(pvAccessID sid); /** * Get channel by its SID. * @param sid channel SID * @return channel with given SID, NULL otherwise */ - virtual ServerChannel* getChannel(int sid); + virtual ServerChannel* getChannel(pvAccessID sid); /** * Get channel count. @@ -640,12 +640,12 @@ namespace epics { /** * Last SID cache. */ - volatile int _lastChannelSID; + volatile pvAccessID _lastChannelSID; /** * Channel table (SID -> channel mapping). */ - std::map* _channels; + std::map* _channels; Mutex* _channelsMutex; diff --git a/pvAccessApp/remote/channelSearchManager.cpp b/pvAccessApp/remote/channelSearchManager.cpp index 86aa3e1..7463c3d 100644 --- a/pvAccessApp/remote/channelSearchManager.cpp +++ b/pvAccessApp/remote/channelSearchManager.cpp @@ -408,7 +408,7 @@ const int64 ChannelSearchManager::MAX_SEARCH_PERIOD_LOWER_LIMIT = 60000; const int64 ChannelSearchManager::BEACON_ANOMALY_SEARCH_PERIOD = 5000; const int32 ChannelSearchManager::MAX_TIMERS = 18; -ChannelSearchManager::ChannelSearchManager(ClientContextImpl* context): +ChannelSearchManager::ChannelSearchManager(Context* context): _context(context), _canceled(false), _rttmean(MIN_RTT), @@ -435,7 +435,7 @@ ChannelSearchManager::ChannelSearchManager(ClientContextImpl* context): // create timers _timers = new SearchTimer*[numberOfTimers]; - for(int32 i = 0; i < numberOfTimers; i++) + for(int i = 0; i < numberOfTimers; i++) { _timers[i] = new SearchTimer(this, i, i > _beaconAnomalyTimerIndex, i != (numberOfTimers-1)); } @@ -446,7 +446,7 @@ ChannelSearchManager::ChannelSearchManager(ClientContextImpl* context): ChannelSearchManager::~ChannelSearchManager() { - for(int32 i = 0; i < _numberOfTimers; i++) + for(int i = 0; i < _numberOfTimers; i++) { if(_timers[i]) delete _timers[i]; } @@ -569,7 +569,7 @@ void ChannelSearchManager::flushSendBuffer() TimeStamp now; now.getCurrent(); _timeAtLastSend = now.getMilliseconds(); - _context->getSearchTransport()->send(_sendBuffer); + ((BlockingUDPTransport*)_context->getSearchTransport())->send(_sendBuffer); initializeSendBuffer(); } diff --git a/pvAccessApp/remote/channelSearchManager.h b/pvAccessApp/remote/channelSearchManager.h index cb5f3c3..62f208b 100644 --- a/pvAccessApp/remote/channelSearchManager.h +++ b/pvAccessApp/remote/channelSearchManager.h @@ -24,214 +24,6 @@ using namespace epics::pvData; namespace epics { namespace pvAccess { -typedef int32 pvAccessID; - -enum QoS { - /** - * Default behavior. - */ - DEFAULT = 0x00, - /** - * Require reply (acknowledgment for reliable operation). - */ - REPLY_REQUIRED = 0x01, - /** - * Best-effort option (no reply). - */ - BESY_EFFORT = 0x02, - /** - * Process option. - */ - PROCESS = 0x04, - /** - * Initialize option. - */ - INIT = 0x08, - /** - * Destroy option. - */ - DESTROY = 0x10, - /** - * Share data option. - */ - SHARE = 0x20, - /** - * Get. - */ - GET = 0x40, - /** - * Get-put. - */ - GET_PUT =0x80 -}; - - -//TODO this will be deleted -class ChannelImpl; -class ChannelSearchManager; -class ClientContextImpl : public ClientContext -{ - public: - - ClientContextImpl() - { - - } - - virtual Version* getVersion() { - return NULL; - } - - virtual ChannelProvider* getProvider() { - return NULL; - } - - Timer* getTimer() - { - return NULL; - } - - virtual void initialize() { - - } - - virtual void printInfo() { - - } - - virtual void printInfo(epics::pvData::StringBuilder out) { - - } - - virtual void destroy() - { - - } - - virtual void dispose() - { - - } - - BlockingUDPTransport* getSearchTransport() - { - return NULL; - } - - /** - * Searches for a channel with given channel ID. - * @param channelID CID. - * @return channel with given CID, 0 if non-existent. - */ - ChannelImpl* getChannel(pvAccessID channelID) - { - return NULL; - } - - - private: - ~ClientContextImpl() {}; - - void loadConfiguration() { - - } - - void internalInitialize() { - - - } - - void initializeUDPTransport() { - - } - - void internalDestroy() { - - } - - void destroyAllChannels() { - - } - - /** - * Check channel name. - */ - void checkChannelName(String& name) { - - } - - /** - * Check context state and tries to establish necessary state. - */ - void checkState() { - - } - - - - /** - * Generate Client channel ID (CID). - * @return Client channel ID (CID). - */ - pvAccessID generateCID() - { - return 0; - } - - /** - * Free generated channel ID (CID). - */ - void freeCID(int cid) - { - - } - - - /** - * Get, or create if necessary, transport of given server address. - * @param serverAddress required transport address - * @param priority process priority. - * @return transport for given address - */ - Transport* getTransport(TransportClient* client, osiSockAddr* serverAddress, int16 minorRevision, int16 priority) - { - - return NULL; - } - - /** - * Internal create channel. - */ - // TODO no minor version with the addresses - // TODO what if there is an channel with the same name, but on different host! - Channel* createChannelInternal(String name, ChannelRequester* requester, short priority, - InetAddrVector* addresses) { - return NULL; - } - - /** - * Destroy channel. - * @param channel - * @param force - * @throws CAException - * @throws IllegalStateException - */ - void destroyChannel(ChannelImpl* channel, bool force) { - - - } - - /** - * Get channel search manager. - * @return channel search manager. - */ - ChannelSearchManager* getChannelSearchManager() { - return NULL; - } -}; - - - //TODO check the const of paramerers /** @@ -432,7 +224,7 @@ public: * Constructor. * @param context */ - ChannelSearchManager(ClientContextImpl* context); + ChannelSearchManager(Context* context); /** * Constructor. * @param context @@ -486,7 +278,7 @@ private: /** * Context. */ - ClientContextImpl* _context; + Context* _context; /** * Canceled flag. */ diff --git a/pvAccessApp/remote/remote.h b/pvAccessApp/remote/remote.h index 46f7a12..9a8521e 100644 --- a/pvAccessApp/remote/remote.h +++ b/pvAccessApp/remote/remote.h @@ -31,6 +31,47 @@ namespace epics { TCP, UDP, SSL }; + enum QoS { + /** + * Default behavior. + */ + DEFAULT = 0x00, + /** + * Require reply (acknowledgment for reliable operation). + */ + REPLY_REQUIRED = 0x01, + /** + * Best-effort option (no reply). + */ + BESY_EFFORT = 0x02, + /** + * Process option. + */ + PROCESS = 0x04, + /** + * Initialize option. + */ + INIT = 0x08, + /** + * Destroy option. + */ + DESTROY = 0x10, + /** + * Share data option. + */ + SHARE = 0x20, + /** + * Get. + */ + GET = 0x40, + /** + * Get-put. + */ + GET_PUT =0x80 + }; + + typedef int32 pvAccessID; + enum MessageCommands { CMD_BEACON = 0, CMD_CONNECTION_VALIDATION = 1, CMD_ECHO = 2, CMD_SEARCH = 3, CMD_INTROSPECTION_SEARCH = 5, @@ -331,6 +372,11 @@ namespace epics { }; + class Channel; + + /** + * Not public IF, used by Transports, etc. + */ class Context { public: virtual ~Context() { @@ -346,6 +392,11 @@ namespace epics { * @return transport (virtual circuit) registry. */ virtual TransportRegistry* getTransportRegistry() =0; + + virtual Channel* getChannel(pvAccessID id) = 0; + + virtual Transport* getSearchTransport() = 0; + }; /** @@ -380,7 +431,7 @@ namespace epics { * Get channel SID. * @return channel SID. */ - virtual int getSID() =0; + virtual pvAccessID getSID() =0; /** * Destroy server channel. @@ -409,33 +460,33 @@ namespace epics { * Preallocate new channel SID. * @return new channel server id (SID). */ - virtual int preallocateChannelSID() =0; + virtual pvAccessID preallocateChannelSID() =0; /** * De-preallocate new channel SID. * @param sid preallocated channel SID. */ - virtual void depreallocateChannelSID(int sid) =0; + virtual void depreallocateChannelSID(pvAccessID sid) =0; /** * Register a new channel. * @param sid preallocated channel SID. * @param channel channel to register. */ - virtual void registerChannel(int sid, ServerChannel* channel) =0; + virtual void registerChannel(pvAccessID sid, ServerChannel* channel) =0; /** * Unregister a new channel (and deallocates its handle). * @param sid SID */ - virtual void unregisterChannel(int sid) =0; + virtual void unregisterChannel(pvAccessID sid) =0; /** * Get channel by its SID. * @param sid channel SID * @return channel with given SID, null otherwise */ - virtual ServerChannel* getChannel(int sid) =0; + virtual ServerChannel* getChannel(pvAccessID sid) =0; /** * Get channel count. @@ -443,6 +494,44 @@ namespace epics { */ virtual int getChannelCount() =0; }; + + /** + * A request that expects an response. + * Responses identified by its I/O ID. + * This interface needs to be extended (to provide method called on response). + * @author Matej Sekoranja + */ + class ResponseRequest { + public: + + /** + * Get I/O ID. + * @return ioid + */ + virtual pvAccessID getIOID() = 0; + + /** + * Timeout notification. + */ + virtual void timeout() = 0; + + /** + * Cancel response request (always to be called to complete/destroy). + */ + virtual void cancel() = 0; + + /** + * Report status to clients (e.g. disconnected). + * @param status to report. + */ + virtual void reportStatus(epics::pvData::Status* status) = 0; + + /** + * Get request requester. + * @return request requester. + */ + virtual epics::pvData::Requester* getRequester() = 0; + }; } } diff --git a/testApp/remote/testBlockingTCPClnt.cpp b/testApp/remote/testBlockingTCPClnt.cpp index 3f69392..a3bde97 100644 --- a/testApp/remote/testBlockingTCPClnt.cpp +++ b/testApp/remote/testBlockingTCPClnt.cpp @@ -42,6 +42,8 @@ public: virtual TransportRegistry* getTransportRegistry() { return _tr; } + virtual Channel* getChannel(epics::pvAccess::pvAccessID) { return 0; } + virtual Transport* getSearchTransport() { return 0; } private: TransportRegistry* _tr; Timer* _timer; diff --git a/testApp/remote/testBlockingTCPSrv.cpp b/testApp/remote/testBlockingTCPSrv.cpp index 704093a..6ab54f6 100644 --- a/testApp/remote/testBlockingTCPSrv.cpp +++ b/testApp/remote/testBlockingTCPSrv.cpp @@ -28,6 +28,9 @@ public: } virtual Timer* getTimer() { return _timer; } virtual TransportRegistry* getTransportRegistry() { return _tr; } + virtual Channel* getChannel(epics::pvAccess::pvAccessID) { return 0; } + virtual Transport* getSearchTransport() { return 0; } + private: TransportRegistry* _tr; Timer* _timer; diff --git a/testApp/remote/testChannelSearchManager.cpp b/testApp/remote/testChannelSearchManager.cpp index 780cda2..0798acf 100644 --- a/testApp/remote/testChannelSearchManager.cpp +++ b/testApp/remote/testChannelSearchManager.cpp @@ -5,14 +5,12 @@ using namespace epics::pvData; using namespace epics::pvAccess; - - int main(int argc,char *argv[]) { - ClientContextImpl* context = new ClientContextImpl(); + Context* context = 0; // TODO will crash... ChannelSearchManager* manager = new ChannelSearchManager(context); - context->destroy(); + // context->destroy(); getShowConstructDestruct()->constuctDestructTotals(stdout); return(0); } diff --git a/testApp/remote/testRemoteClientImpl.cpp b/testApp/remote/testRemoteClientImpl.cpp index f917251..4b0f311 100644 --- a/testApp/remote/testRemoteClientImpl.cpp +++ b/testApp/remote/testRemoteClientImpl.cpp @@ -19,6 +19,7 @@ #include #include #include +#include using namespace epics::pvData; using namespace epics::pvAccess; @@ -378,48 +379,6 @@ class MockMonitor : public Monitor, public MonitorElement -typedef int pvAccessID; - - -/** - * A request that expects an response. - * Responses identified by its I/O ID. - * This interface needs to be extended (to provide method called on response). - * @author Matej Sekoranja - */ -class ResponseRequest { - public: - - /** - * Get I/O ID. - * @return ioid - */ - virtual pvAccessID getIOID() = 0; - - /** - * Timeout notification. - */ - virtual void timeout() = 0; - - /** - * Cancel response request (always to be called to complete/destroy). - */ - virtual void cancel() = 0; - - /** - * Report status to clients (e.g. disconnected). - * @param status to report. - */ - virtual void reportStatus(Status* status) = 0; - - /** - * Get request requester. - * @return request requester. - */ - virtual Requester* getRequester() = 0; -}; - - // TODO consider std::unordered_map typedef std::map IOIDResponseRequestMap; @@ -568,68 +527,6 @@ class ClientResponseHandler : public ResponseHandler, private epics::pvData::NoD -#include - -class SearchInstance { - public: - - virtual pvAccessID getChannelID() = 0; - virtual String getChannelName() = 0; - virtual void unsetListOwnership() = 0; - virtual void addAndSetListOwnership(ArrayFIFO* newOwner, int index) = 0; - virtual void removeAndUnsetListOwnership() = 0; - virtual int getOwnerIndex() = 0; - virtual bool generateSearchRequestMessage(ByteBuffer* buffer, TransportSendControl* control) = 0; - - /** - * Search response from server (channel found). - * @param minorRevision server minor CA revision. - * @param serverAddress server address. - */ - virtual void searchResponse(int8 minorRevision, osiSockAddr* serverAddress) = 0; -}; - -// TODO (only to make to make it compile) -class BaseSearchInstance : public SearchInstance -{ - public: - virtual pvAccessID getChannelID() { return 0; } - virtual String getChannelName() { return ""; } - virtual void unsetListOwnership() {} - virtual void addAndSetListOwnership(ArrayFIFO* newOwner, int index) {} - virtual void removeAndUnsetListOwnership() {} - virtual int getOwnerIndex() { return 0; } - - virtual bool generateSearchRequestMessage(ByteBuffer* requestMessage, TransportSendControl* control) - { -const int DATA_COUNT_POSITION = CA_MESSAGE_HEADER_SIZE + sizeof(int32)/sizeof(int8) + 1; -const int PAYLOAD_POSITION = sizeof(int16)/sizeof(int8) + 2; - - int16 dataCount = requestMessage->getShort(DATA_COUNT_POSITION); - - dataCount++; - if(dataCount >= MAX_SEARCH_BATCH_COUNT) - { - return false; - } - - const string name = getChannelName(); - // not nice... - const int addedPayloadSize = sizeof(int32)/sizeof(int8) + (1 + sizeof(int32)/sizeof(int8) + name.length()); - - if(requestMessage->getRemaining() < addedPayloadSize) - { - return false; - } - - requestMessage->putInt(getChannelID()); - SerializeHelper::serializeString(name, requestMessage, control); - - requestMessage->putInt(PAYLOAD_POSITION, requestMessage->getPosition() - CA_MESSAGE_HEADER_SIZE); - requestMessage->putShort(DATA_COUNT_POSITION, dataCount); - return true; - }; -}; class BeaconHandlerImpl; @@ -670,43 +567,6 @@ public Context /* TODO */ - -class ChannelSearchManager { // tODO no default, etc. -ClientContextImpl* _context; - public: -ChannelSearchManager(ClientContextImpl* context): - _context(context) { -} - - - virtual void registerChannel(SearchInstance* channel) { - - ByteBuffer sendBuffer(100, EPICS_ENDIAN_BIG); - // new buffer - sendBuffer.clear(); - sendBuffer.putShort(CA_MAGIC_AND_VERSION); - sendBuffer.putByte((int8)0); // data - sendBuffer.putByte((int8)3); // search - sendBuffer.putInt(5); // "zero" payload - - sendBuffer.putInt(0); - - - sendBuffer.putByte((int8)0); - sendBuffer.putShort((int16)0); // count - - TCI tci; - - channel->generateSearchRequestMessage(&sendBuffer, &tci); - std::cout << "sending..." << sendBuffer.getPosition() << " bytes." << std::endl; - _context->getSearchTransport()->send(&sendBuffer); - - }; - virtual void unregisterChannel(SearchInstance* channel) {}; -}; - - - /** * Implementation of CAJ JCA Channel. * @author Matej Sekoranja @@ -927,7 +787,7 @@ class ChannelImpl : * Get client channel ID. * @return client channel ID. */ - pvAccessID getChannelID() const { + pvAccessID getChannelID() { return m_channelID; } From 596034d6c6f866a60ea4b5a1e303dd6efc9d0918 Mon Sep 17 00:00:00 2001 From: miha_vitorovic Date: Mon, 10 Jan 2011 11:23:31 +0100 Subject: [PATCH 02/10] Added debug build to the project Fixes to getBroadcastAddresses(). --- .cproject | 11 ++++- configure/CONFIG_SITE | 9 ++++- pvAccessApp/utils/inetAddressUtil.cpp | 56 +++++++++++++++++--------- testApp/utils/inetAddressUtilsTest.cpp | 2 + 4 files changed, 57 insertions(+), 21 deletions(-) diff --git a/.cproject b/.cproject index e91f269..0e0c996 100644 --- a/.cproject +++ b/.cproject @@ -307,7 +307,6 @@ make - all true true @@ -315,7 +314,6 @@ make - clean true true @@ -323,11 +321,20 @@ make + uninstall true true false + + make + + clean all DEBUG=1 + true + true + false + diff --git a/configure/CONFIG_SITE b/configure/CONFIG_SITE index 287667b..7f70b02 100644 --- a/configure/CONFIG_SITE +++ b/configure/CONFIG_SITE @@ -24,4 +24,11 @@ # take effect. #IOCS_APPL_TOP = -USR_LDFLAGS += -lpthread +ifeq ($(DEBUG),1) + DEBUG_CFLAGS=-O0 -g -ggdb +endif + +ifeq ($(EPICS_HOST_ARCH),linux-x86) + USR_LDFLAGS += -lpthread +endif + diff --git a/pvAccessApp/utils/inetAddressUtil.cpp b/pvAccessApp/utils/inetAddressUtil.cpp index 77eac47..7eeb41c 100644 --- a/pvAccessApp/utils/inetAddressUtil.cpp +++ b/pvAccessApp/utils/inetAddressUtil.cpp @@ -40,6 +40,8 @@ namespace epics { int status; struct ifconf ifconf; struct ifreq* pIfreqList; + struct ifreq* pifreq; + struct ifreq ifrBuff; osiSockAddr* pNewNode; InetAddrVector* retVector = new InetAddrVector(); @@ -62,39 +64,53 @@ namespace epics { ifconf.ifc_req = pIfreqList; status = ioctl(sock, SIOCGIFCONF, &ifconf); if(status<0||ifconf.ifc_len==0) { - errlogSevPrintf( - errlogMinor, + errlogSevPrintf(errlogMinor, "getBroadcastAddresses(): unable to fetch network interface configuration"); delete[] pIfreqList; return retVector; } - errlogPrintf("Found %d interfaces\n", ifconf.ifc_len); + int maxNodes = ifconf.ifc_len/sizeof(ifreq); + //errlogPrintf("Found %d interfaces\n", maxNodes); + + pifreq = pIfreqList; + + for(int i = 0; i<=maxNodes; i++) { + if(!(*pifreq->ifr_name)) break; + + if(i>0) { + size_t n = sizeof(sockaddr)+sizeof(pifreq->ifr_name); + if(nifr_addr.sa_family!=AF_INET) continue; - status = ioctl(sock, SIOCGIFFLAGS, &pIfreqList[i]); + strncpy(ifrBuff.ifr_name, pifreq->ifr_name, + sizeof(ifrBuff.ifr_name)); + status = ioctl(sock, SIOCGIFFLAGS, &ifrBuff); if(status) { errlogSevPrintf( errlogMinor, "getBroadcastAddresses(): net intf flags fetch for \"%s\" failed", - pIfreqList[i].ifr_name); + pifreq->ifr_name); continue; } /* * dont bother with interfaces that have been disabled */ - if(!(pIfreqList[i].ifr_flags&IFF_UP)) continue; + if(!(ifrBuff.ifr_flags&IFF_UP)) continue; /* * dont use the loop back interface */ - if(pIfreqList[i].ifr_flags&IFF_LOOPBACK) continue; + if(ifrBuff.ifr_flags&IFF_LOOPBACK) continue; pNewNode = new osiSockAddr; if(pNewNode==NULL) { @@ -114,37 +130,41 @@ namespace epics { * Otherwise CA will not query through the * interface. */ - if(pIfreqList[i].ifr_flags&IFF_BROADCAST) { - status = ioctl(sock, SIOCGIFBRDADDR, &pIfreqList[i]); + if(ifrBuff.ifr_flags&IFF_BROADCAST) { + strncpy(ifrBuff.ifr_name, pifreq->ifr_name, + sizeof(ifrBuff.ifr_name)); + status = ioctl(sock, SIOCGIFBRDADDR, &ifrBuff); if(status) { errlogSevPrintf( errlogMinor, "getBroadcastAddresses(): net intf \"%s\": bcast addr fetch fail", - pIfreqList->ifr_name); + pifreq->ifr_name); delete pNewNode; continue; } - pNewNode->sa = pIfreqList[i].ifr_broadaddr; + pNewNode->sa = ifrBuff.ifr_broadaddr; } #ifdef IFF_POINTOPOINT - else if(pIfreqList->ifr_flags&IFF_POINTOPOINT) { - status = ioctl(sock, SIOCGIFDSTADDR, &pIfreqList[i]); + else if(ifrBuff.ifr_flags&IFF_POINTOPOINT) { + strncpy(ifrBuff.ifr_name, pifreq->ifr_name, + sizeof(ifrBuff.ifr_name)); + status = ioctl(sock, SIOCGIFDSTADDR, &ifrBuff); if(status) { errlogSevPrintf( errlogMinor, "getBroadcastAddresses(): net intf \"%s\": pt to pt addr fetch fail", - pIfreqList[i].ifr_name); + pifreq->ifr_name); delete pNewNode; continue; } - pNewNode->sa = pIfreqList[i].ifr_dstaddr; + pNewNode->sa = ifrBuff.ifr_dstaddr; } #endif else { errlogSevPrintf( errlogMinor, "getBroadcastAddresses(): net intf \"%s\": not point to point or bcast?", - pIfreqList[i].ifr_name); + pifreq->ifr_name); delete pNewNode; continue; } diff --git a/testApp/utils/inetAddressUtilsTest.cpp b/testApp/utils/inetAddressUtilsTest.cpp index 5a594b7..8594254 100644 --- a/testApp/utils/inetAddressUtilsTest.cpp +++ b/testApp/utils/inetAddressUtilsTest.cpp @@ -6,6 +6,7 @@ */ #include "inetAddressUtil.h" +#include "logger.h" #include #include @@ -23,6 +24,7 @@ using std::stringstream; using std::hex; int main(int argc, char *argv[]) { + createFileLogger("inetAddresUtils.log"); InetAddrVector *vec; InetAddrVector *vec1; From df7a9fa0739780bd7e5721fb4f4322ff1f17fcdd Mon Sep 17 00:00:00 2001 From: miha_vitorovic Date: Mon, 10 Jan 2011 14:09:58 +0100 Subject: [PATCH 03/10] Added configuration to Context and fixed all the users. --- pvAccessApp/Makefile | 1 + pvAccessApp/remote/remote.h | 211 +++++----- pvAccessApp/server/responseHandlers.cpp | 25 +- pvAccessApp/server/responseHandlers.h | 10 +- pvAccessApp/server/serverContext.h | 5 +- testApp/remote/testBeaconEmitter.cpp | 28 +- testApp/remote/testBeaconHandler.cpp | 28 +- testApp/remote/testBlockingTCPClnt.cpp | 24 +- testApp/remote/testBlockingTCPSrv.cpp | 10 +- testApp/remote/testBlockingUDPClnt.cpp | 39 +- testApp/remote/testBlockingUDPSrv.cpp | 39 +- testApp/remote/testRemoteClientImpl.cpp | 519 ++++++++++++------------ 12 files changed, 532 insertions(+), 407 deletions(-) diff --git a/pvAccessApp/Makefile b/pvAccessApp/Makefile index bd26f53..2f35089 100644 --- a/pvAccessApp/Makefile +++ b/pvAccessApp/Makefile @@ -67,6 +67,7 @@ LIBSRCS += blockingTCPConnector.cpp LIBSRCS += blockingServerTCPTransport.cpp LIBSRCS += blockingTCPAcceptor.cpp LIBSRCS += channelSearchManager.cpp +LIBSRCS += abstractResponseHandler.cpp LIBRARY = pvAccess pvAccess_LIBS += Com diff --git a/pvAccessApp/remote/remote.h b/pvAccessApp/remote/remote.h index f5129d1..8a8aaea 100644 --- a/pvAccessApp/remote/remote.h +++ b/pvAccessApp/remote/remote.h @@ -11,7 +11,7 @@ #include "caConstants.h" #include "transportRegistry.h" #include "introspectionRegistry.h" -#include "serverContext.h" +#include "configuration.h" #include #include @@ -32,42 +32,42 @@ namespace epics { }; enum QoS { - /** - * Default behavior. - */ - DEFAULT = 0x00, - /** - * Require reply (acknowledgment for reliable operation). - */ - REPLY_REQUIRED = 0x01, - /** - * Best-effort option (no reply). - */ - BESY_EFFORT = 0x02, - /** - * Process option. - */ - PROCESS = 0x04, - /** - * Initialize option. - */ - INIT = 0x08, - /** - * Destroy option. - */ - DESTROY = 0x10, - /** - * Share data option. - */ - SHARE = 0x20, - /** - * Get. - */ - GET = 0x40, - /** - * Get-put. - */ - GET_PUT =0x80 + /** + * Default behavior. + */ + DEFAULT = 0x00, + /** + * Require reply (acknowledgment for reliable operation). + */ + REPLY_REQUIRED = 0x01, + /** + * Best-effort option (no reply). + */ + BESY_EFFORT = 0x02, + /** + * Process option. + */ + PROCESS = 0x04, + /** + * Initialize option. + */ + INIT = 0x08, + /** + * Destroy option. + */ + DESTROY = 0x10, + /** + * Share data option. + */ + SHARE = 0x20, + /** + * Get. + */ + GET = 0x40, + /** + * Get-put. + */ + GET_PUT = 0x80 }; typedef int32 pvAccessID; @@ -253,6 +253,35 @@ namespace epics { }; + class Channel; + + /** + * Not public IF, used by Transports, etc. + */ + class Context { + public: + virtual ~Context() { + } + /** + * Get timer. + * @return timer. + */ + virtual Timer* getTimer() = 0; + + /** + * Get transport (virtual circuit) registry. + * @return transport (virtual circuit) registry. + */ + virtual TransportRegistry* getTransportRegistry() = 0; + + virtual Channel* getChannel(pvAccessID id) = 0; + + virtual Transport* getSearchTransport() = 0; + + virtual Configuration* getConfiguration() = 0; + + }; + /** * Interface defining response handler. * @author Matej Sekoranja @@ -260,6 +289,10 @@ namespace epics { */ class ResponseHandler { public: + ResponseHandler(Context* context) : + _context(context) { + } + virtual ~ResponseHandler() { } @@ -277,6 +310,9 @@ namespace epics { handleResponse(osiSockAddr* responseFrom, Transport* transport, int8 version, int8 command, int payloadSize, epics::pvData::ByteBuffer* payloadBuffer) =0; + + protected: + Context* _context; }; /** @@ -289,9 +325,10 @@ namespace epics { /** * @param description */ - AbstractResponseHandler(String description) : - _description(description), _debug(true) { - //debug = System.getProperties().containsKey(CAConstants.CAJ_DEBUG); + AbstractResponseHandler(Context* context, String description) : + ResponseHandler(context), _description(description), _debug( + _context->getConfiguration()->getPropertyAsBoolean( + "PVACCESS_DEBUG", false)) { } virtual ~AbstractResponseHandler() { @@ -373,33 +410,6 @@ namespace epics { }; - class Channel; - - /** - * Not public IF, used by Transports, etc. - */ - class Context { - public: - virtual ~Context() { - } - /** - * Get timer. - * @return timer. - */ - virtual Timer* getTimer() =0; - - /** - * Get transport (virtual circuit) registry. - * @return transport (virtual circuit) registry. - */ - virtual TransportRegistry* getTransportRegistry() =0; - - virtual Channel* getChannel(pvAccessID id) = 0; - - virtual Transport* getSearchTransport() = 0; - - }; - /** * Interface defining reference counting transport IF. * @author Matej Sekoranja @@ -474,7 +484,8 @@ namespace epics { * @param sid preallocated channel SID. * @param channel channel to register. */ - virtual void registerChannel(pvAccessID sid, ServerChannel* channel) =0; + virtual void + registerChannel(pvAccessID sid, ServerChannel* channel) =0; /** * Unregister a new channel (and deallocates its handle). @@ -495,44 +506,44 @@ namespace epics { */ virtual int getChannelCount() =0; }; - + /** * A request that expects an response. - * Responses identified by its I/O ID. + * Responses identified by its I/O ID. * This interface needs to be extended (to provide method called on response). * @author Matej Sekoranja */ class ResponseRequest { - public: - - /** - * Get I/O ID. - * @return ioid - */ - virtual pvAccessID getIOID() = 0; - - /** - * Timeout notification. - */ - virtual void timeout() = 0; - - /** - * Cancel response request (always to be called to complete/destroy). - */ - virtual void cancel() = 0; - - /** - * Report status to clients (e.g. disconnected). - * @param status to report. - */ - virtual void reportStatus(epics::pvData::Status* status) = 0; - - /** - * Get request requester. - * @return request requester. - */ - virtual epics::pvData::Requester* getRequester() = 0; - }; + public: + + /** + * Get I/O ID. + * @return ioid + */ + virtual pvAccessID getIOID() = 0; + + /** + * Timeout notification. + */ + virtual void timeout() = 0; + + /** + * Cancel response request (always to be called to complete/destroy). + */ + virtual void cancel() = 0; + + /** + * Report status to clients (e.g. disconnected). + * @param status to report. + */ + virtual void reportStatus(epics::pvData::Status* status) = 0; + + /** + * Get request requester. + * @return request requester. + */ + virtual epics::pvData::Requester* getRequester() = 0; + }; } } diff --git a/pvAccessApp/server/responseHandlers.cpp b/pvAccessApp/server/responseHandlers.cpp index e3bbad5..c0ab2ae 100644 --- a/pvAccessApp/server/responseHandlers.cpp +++ b/pvAccessApp/server/responseHandlers.cpp @@ -24,23 +24,6 @@ using namespace epics::pvData; namespace epics { namespace pvAccess { - void AbstractResponseHandler::handleResponse(osiSockAddr* responseFrom, - Transport* transport, int8 version, int8 command, - int payloadSize, ByteBuffer* payloadBuffer) { - if(_debug) { - char ipAddrStr[48]; - ipAddrToA(&responseFrom->ia, ipAddrStr, sizeof(ipAddrStr)); - - ostringstream prologue; - prologue<<"Message [0x"<getArray(), - payloadBuffer->getPosition(), payloadSize); - } - } - void BadResponse::handleResponse(osiSockAddr* responseFrom, Transport* transport, int8 version, int8 command, int payloadSize, ByteBuffer* payloadBuffer) { @@ -57,15 +40,15 @@ namespace epics { } ServerResponseHandler::ServerResponseHandler(ServerContextImpl* context) : - _context(context) { + ResponseHandler(context) { BadResponse* badResponse = new BadResponse(context); _handlerTable = new ResponseHandler*[HANDLER_TABLE_LENGTH]; // TODO add real handlers, as they are developed - _handlerTable[0] = new NoopResponse(_context, "Beacon"); - _handlerTable[1] = new ConnectionValidationHandler(_context); - _handlerTable[2] = new EchoHandler(_context); + _handlerTable[0] = new NoopResponse(context, "Beacon"); + _handlerTable[1] = new ConnectionValidationHandler(context); + _handlerTable[2] = new EchoHandler(context); _handlerTable[3] = badResponse; _handlerTable[4] = badResponse; _handlerTable[5] = badResponse; diff --git a/pvAccessApp/server/responseHandlers.h b/pvAccessApp/server/responseHandlers.h index afbcdba..7aeab36 100644 --- a/pvAccessApp/server/responseHandlers.h +++ b/pvAccessApp/server/responseHandlers.h @@ -8,6 +8,7 @@ #ifndef RESPONSEHANDLERS_H_ #define RESPONSEHANDLERS_H_ +#include "serverContext.h" #include "remote.h" namespace epics { @@ -25,13 +26,11 @@ namespace epics { */ AbstractServerResponseHandler(ServerContextImpl* context, String description) : - AbstractResponseHandler(description), _context(context) { + AbstractResponseHandler(context, description) { } virtual ~AbstractServerResponseHandler() { } - protected: - ServerContextImpl* _context; }; /** @@ -77,11 +76,6 @@ namespace epics { */ ResponseHandler** _handlerTable; - /** - * Context instance. - */ - ServerContextImpl* _context; - }; /** diff --git a/pvAccessApp/server/serverContext.h b/pvAccessApp/server/serverContext.h index d535616..c01d52e 100644 --- a/pvAccessApp/server/serverContext.h +++ b/pvAccessApp/server/serverContext.h @@ -8,11 +8,12 @@ #ifndef SERVERCONTEXT_H_ #define SERVERCONTEXT_H_ +#include "remote.h" + namespace epics { namespace pvAccess { - - class ServerContextImpl { + class ServerContextImpl : public Context { }; diff --git a/testApp/remote/testBeaconEmitter.cpp b/testApp/remote/testBeaconEmitter.cpp index 0d3c904..c1a3026 100644 --- a/testApp/remote/testBeaconEmitter.cpp +++ b/testApp/remote/testBeaconEmitter.cpp @@ -18,6 +18,8 @@ using namespace epics::pvData; class DummyResponseHandler : public ResponseHandler { public: + DummyResponseHandler(Context* ctx) : ResponseHandler(ctx) {} + virtual void handleResponse(osiSockAddr* responseFrom, Transport* transport, int8 version, int8 command, int payloadSize, ByteBuffer* payloadBuffer) @@ -27,9 +29,33 @@ public: }; +class ContextImpl : public Context { +public: + ContextImpl() : + _tr(new TransportRegistry()), _timer(new Timer("server thread", + lowPriority)), _conf(new SystemConfigurationImpl()) { + } + virtual ~ContextImpl() { + delete _tr; + delete _timer; + } + virtual Timer* getTimer() { return _timer; } + virtual TransportRegistry* getTransportRegistry() { return _tr; } + virtual Channel* getChannel(epics::pvAccess::pvAccessID) { return 0; } + virtual Transport* getSearchTransport() { return 0; } + virtual Configuration* getConfiguration() { return _conf; } + +private: + TransportRegistry* _tr; + Timer* _timer; + Configuration* _conf; +}; + + void testBeaconEmitter() { - DummyResponseHandler drh; + ContextImpl ctx; + DummyResponseHandler drh(&ctx); /* SOCKET mysocket; if ((mysocket = socket (AF_INET, SOCK_DGRAM, 0)) == -1) { diff --git a/testApp/remote/testBeaconHandler.cpp b/testApp/remote/testBeaconHandler.cpp index 0ac231c..6690ab4 100644 --- a/testApp/remote/testBeaconHandler.cpp +++ b/testApp/remote/testBeaconHandler.cpp @@ -37,7 +37,7 @@ void decodeFromIPv6Address(ByteBuffer* buffer, osiSockAddr* address) class BeaconResponseHandler : public ResponseHandler { public: - BeaconResponseHandler() + BeaconResponseHandler(Context* ctx) : ResponseHandler(ctx) { _pvDataCreate = getPVDataCreate(); } @@ -103,9 +103,33 @@ private: }; +class ContextImpl : public Context { +public: + ContextImpl() : + _tr(new TransportRegistry()), _timer(new Timer("server thread", + lowPriority)), _conf(new SystemConfigurationImpl()) { + } + virtual ~ContextImpl() { + delete _tr; + delete _timer; + } + virtual Timer* getTimer() { return _timer; } + virtual TransportRegistry* getTransportRegistry() { return _tr; } + virtual Channel* getChannel(epics::pvAccess::pvAccessID) { return 0; } + virtual Transport* getSearchTransport() { return 0; } + virtual Configuration* getConfiguration() { return _conf; } + +private: + TransportRegistry* _tr; + Timer* _timer; + Configuration* _conf; +}; + + void testBeaconHandler() { - BeaconResponseHandler brh; + ContextImpl ctx; + BeaconResponseHandler brh(&ctx); BlockingUDPConnector connector(false, NULL, true); osiSockAddr bindAddr; diff --git a/testApp/remote/testBlockingTCPClnt.cpp b/testApp/remote/testBlockingTCPClnt.cpp index a3bde97..d845c1b 100644 --- a/testApp/remote/testBlockingTCPClnt.cpp +++ b/testApp/remote/testBlockingTCPClnt.cpp @@ -29,28 +29,30 @@ using std::sscanf; class ContextImpl : public Context { public: ContextImpl() : - _tr(new TransportRegistry()), _timer(new Timer("client thread", - lowPriority)) { + _tr(new TransportRegistry()), _timer(new Timer("server thread", + lowPriority)), _conf(new SystemConfigurationImpl()) { } virtual ~ContextImpl() { delete _tr; delete _timer; } - virtual Timer* getTimer() { - return _timer; - } - virtual TransportRegistry* getTransportRegistry() { - return _tr; - } - virtual Channel* getChannel(epics::pvAccess::pvAccessID) { return 0; } - virtual Transport* getSearchTransport() { return 0; } + virtual Timer* getTimer() { return _timer; } + virtual TransportRegistry* getTransportRegistry() { return _tr; } + virtual Channel* getChannel(epics::pvAccess::pvAccessID) { return 0; } + virtual Transport* getSearchTransport() { return 0; } + virtual Configuration* getConfiguration() { return _conf; } + private: TransportRegistry* _tr; Timer* _timer; + Configuration* _conf; }; class DummyResponseHandler : public ResponseHandler { public: + DummyResponseHandler(Context* ctx) : ResponseHandler(ctx) { + } + virtual void handleResponse(osiSockAddr* responseFrom, Transport* transport, int8 version, int8 command, int payloadSize, ByteBuffer* payloadBuffer) { @@ -110,7 +112,7 @@ void testBlockingTCPSender() { DummyTransportClient dtc; DummyTransportSender dts; - DummyResponseHandler drh; + DummyResponseHandler drh(&ctx); osiSockAddr srvAddr; diff --git a/testApp/remote/testBlockingTCPSrv.cpp b/testApp/remote/testBlockingTCPSrv.cpp index 6ab54f6..c7ced35 100644 --- a/testApp/remote/testBlockingTCPSrv.cpp +++ b/testApp/remote/testBlockingTCPSrv.cpp @@ -8,6 +8,7 @@ #include "blockingTCP.h" #include "remote.h" #include "logger.h" +#include "configuration.h" #include @@ -21,7 +22,8 @@ class ContextImpl : public Context { public: ContextImpl() : _tr(new TransportRegistry()), - _timer(new Timer("server thread", lowPriority)) {} + _timer(new Timer("server thread", lowPriority)), + _conf(new SystemConfigurationImpl()) {} virtual ~ContextImpl() { delete _tr; delete _timer; @@ -30,10 +32,12 @@ public: virtual TransportRegistry* getTransportRegistry() { return _tr; } virtual Channel* getChannel(epics::pvAccess::pvAccessID) { return 0; } virtual Transport* getSearchTransport() { return 0; } - + virtual Configuration* getConfiguration() { return _conf; } + private: TransportRegistry* _tr; Timer* _timer; + Configuration* _conf; }; void testServerConnections() { @@ -43,7 +47,7 @@ void testServerConnections() { 1024); cout<<"Press any key to stop the server..."; - char c = cin.peek(); + cin.peek(); delete srv; } diff --git a/testApp/remote/testBlockingUDPClnt.cpp b/testApp/remote/testBlockingUDPClnt.cpp index 06443aa..3e124d9 100644 --- a/testApp/remote/testBlockingUDPClnt.cpp +++ b/testApp/remote/testBlockingUDPClnt.cpp @@ -26,8 +26,44 @@ using std::sscanf; static osiSockAddr sendTo; +class ContextImpl : public Context { +public: + ContextImpl() : + _tr(new TransportRegistry()), _timer(new Timer("server thread", + lowPriority)), _conf(new SystemConfigurationImpl()) { + } + virtual ~ContextImpl() { + delete _tr; + delete _timer; + } + virtual Timer* getTimer() { + return _timer; + } + virtual TransportRegistry* getTransportRegistry() { + return _tr; + } + virtual Channel* getChannel(epics::pvAccess::pvAccessID) { + return 0; + } + virtual Transport* getSearchTransport() { + return 0; + } + virtual Configuration* getConfiguration() { + return _conf; + } + +private: + TransportRegistry* _tr; + Timer* _timer; + Configuration* _conf; +}; + class DummyResponseHandler : public ResponseHandler { public: + DummyResponseHandler(Context* ctx) : + ResponseHandler(ctx) { + } + virtual void handleResponse(osiSockAddr* responseFrom, Transport* transport, int8 version, int8 command, int payloadSize, ByteBuffer* payloadBuffer) { @@ -69,9 +105,10 @@ private: void testBlockingUDPSender() { BlockingUDPConnector connector(false, NULL, true); + ContextImpl ctx; DummyTransportSender dts; - DummyResponseHandler drh; + DummyResponseHandler drh(&ctx); osiSockAddr bindAddr; diff --git a/testApp/remote/testBlockingUDPSrv.cpp b/testApp/remote/testBlockingUDPSrv.cpp index 6b713bb..16612ac 100644 --- a/testApp/remote/testBlockingUDPSrv.cpp +++ b/testApp/remote/testBlockingUDPSrv.cpp @@ -21,10 +21,42 @@ using std::endl; using std::hex; using std::dec; +class ContextImpl : public Context { +public: + ContextImpl() : + _tr(new TransportRegistry()), _timer(new Timer("server thread", + lowPriority)), _conf(new SystemConfigurationImpl()) { + } + virtual ~ContextImpl() { + delete _tr; + delete _timer; + } + virtual Timer* getTimer() { + return _timer; + } + virtual TransportRegistry* getTransportRegistry() { + return _tr; + } + virtual Channel* getChannel(epics::pvAccess::pvAccessID) { + return 0; + } + virtual Transport* getSearchTransport() { + return 0; + } + virtual Configuration* getConfiguration() { + return _conf; + } + +private: + TransportRegistry* _tr; + Timer* _timer; + Configuration* _conf; +}; + class DummyResponseHandler : public ResponseHandler { public: - DummyResponseHandler() : - packets(0) { + DummyResponseHandler(Context* context) : + ResponseHandler(context), packets(0) { } int getPackets() { @@ -71,8 +103,9 @@ void DummyResponseHandler::handleResponse(osiSockAddr* responseFrom, void testBlockingUDPConnector() { BlockingUDPConnector connector(false, NULL, true); + ContextImpl ctx; - DummyResponseHandler drh; + DummyResponseHandler drh(&ctx); osiSockAddr bindAddr; diff --git a/testApp/remote/testRemoteClientImpl.cpp b/testApp/remote/testRemoteClientImpl.cpp index 4b0f311..987006d 100644 --- a/testApp/remote/testRemoteClientImpl.cpp +++ b/testApp/remote/testRemoteClientImpl.cpp @@ -20,6 +20,7 @@ #include #include #include +#include using namespace epics::pvData; using namespace epics::pvAccess; @@ -34,7 +35,7 @@ class ChannelImplProcess : public ChannelProcess ChannelProcessRequester* m_channelProcessRequester; PVStructure* m_pvStructure; PVScalar* m_valueField; - + private: ~ChannelImplProcess() { @@ -53,29 +54,29 @@ class ChannelImplProcess : public ChannelProcess Status* noValueFieldStatus = getStatusCreate()->createStatus(STATUSTYPE_ERROR, "no 'value' field"); m_channelProcessRequester->channelProcessConnect(noValueFieldStatus, this); delete noValueFieldStatus; - + // NOTE client must destroy this instance... // do not access any fields and return ASAP return; } - + if (field->getField()->getType() != scalar) { Status* notAScalarStatus = getStatusCreate()->createStatus(STATUSTYPE_ERROR, "'value' field not scalar type"); m_channelProcessRequester->channelProcessConnect(notAScalarStatus, this); delete notAScalarStatus; - + // NOTE client must destroy this instance…. // do not access any fields and return ASAP return; } - + m_valueField = static_cast(field); - - // TODO pvRequest + + // TODO pvRequest m_channelProcessRequester->channelProcessConnect(getStatusCreate()->getStatusOK(), this); } - + virtual void process(bool lastRequest) { switch (m_valueField->getScalar()->getScalarType()) @@ -147,19 +148,19 @@ class ChannelImplProcess : public ChannelProcess default: // noop break; - - } + + } m_channelProcessRequester->processDone(getStatusCreate()->getStatusOK()); - + if (lastRequest) destroy(); } - + virtual void destroy() { delete this; } - + }; @@ -176,7 +177,7 @@ class ChannelImplGet : public ChannelGet PVStructure* m_pvStructure; BitSet* m_bitSet; volatile bool m_first; - + private: ~ChannelImplGet() { @@ -190,10 +191,10 @@ class ChannelImplGet : public ChannelGet { PVDATA_REFCOUNT_MONITOR_CONSTRUCT(mockChannelGet); - // TODO pvRequest + // TODO pvRequest m_channelGetRequester->channelGetConnect(getStatusCreate()->getStatusOK(), this, m_pvStructure, m_bitSet); } - + virtual void get(bool lastRequest) { m_channelGetRequester->getDone(getStatusCreate()->getStatusOK()); @@ -202,17 +203,17 @@ class ChannelImplGet : public ChannelGet m_first = false; m_bitSet->set(0); // TODO } - + if (lastRequest) destroy(); } - + virtual void destroy() { delete m_bitSet; delete this; } - + }; @@ -231,7 +232,7 @@ class ChannelImplPut : public ChannelPut PVStructure* m_pvStructure; BitSet* m_bitSet; volatile bool m_first; - + private: ~ChannelImplPut() { @@ -245,17 +246,17 @@ class ChannelImplPut : public ChannelPut { PVDATA_REFCOUNT_MONITOR_CONSTRUCT(mockChannelPut); - // TODO pvRequest + // TODO pvRequest m_channelPutRequester->channelPutConnect(getStatusCreate()->getStatusOK(), this, m_pvStructure, m_bitSet); } - + virtual void put(bool lastRequest) { m_channelPutRequester->putDone(getStatusCreate()->getStatusOK()); if (lastRequest) destroy(); } - + virtual void get() { m_channelPutRequester->getDone(getStatusCreate()->getStatusOK()); @@ -266,7 +267,7 @@ class ChannelImplPut : public ChannelPut delete m_bitSet; delete this; } - + }; @@ -287,7 +288,7 @@ class MockMonitor : public Monitor, public MonitorElement volatile bool m_first; Mutex* m_lock; volatile int m_count; - + private: ~MockMonitor() { @@ -306,16 +307,16 @@ class MockMonitor : public Monitor, public MonitorElement PVDATA_REFCOUNT_MONITOR_CONSTRUCT(mockMonitor); m_changedBitSet->set(0); - - // TODO pvRequest + + // TODO pvRequest m_monitorRequester->monitorConnect(getStatusCreate()->getStatusOK(), this, const_cast(m_pvStructure->getStructure())); } - + virtual Status* start() { // fist monitor m_monitorRequester->monitorEvent(this); - + // client needs to delete status, so passing shared OK instance is not right thing to do return getStatusCreate()->createStatus(STATUSTYPE_OK, "Monitor started."); } @@ -346,24 +347,24 @@ class MockMonitor : public Monitor, public MonitorElement if (m_count) m_count--; } - + virtual void destroy() { delete stop(); - + delete m_lock; delete m_overrunBitSet; delete m_changedBitSet; delete this; } - + // ============ MonitorElement ============ - + virtual PVStructure* getPVStructure() { return m_pvStructure; } - + virtual BitSet* getChangedBitSet() { return m_changedBitSet; @@ -373,8 +374,8 @@ class MockMonitor : public Monitor, public MonitorElement { return m_overrunBitSet; } - - + + }; @@ -395,7 +396,8 @@ class ClientContextImpl; /** * @param context */ - DebugResponse() + DebugResponse(Context* ctx) : + ResponseHandler(ctx) { } @@ -419,7 +421,7 @@ class ClientContextImpl; hexDump(prologue.str(), "received", (const int8*)payloadBuffer->getArray(), payloadBuffer->getPosition(), payloadSize); - + } }; @@ -430,30 +432,30 @@ class ClientContextImpl; */ class ClientResponseHandler : public ResponseHandler, private epics::pvData::NoDefaultMethods { private: - + /** * Table of response handlers for each command ID. */ ResponseHandler** m_handlerTable; - /** - * Context instance. + /* + * Context instance is part of the response handler now */ - ClientContextImpl* m_context; - + //ClientContextImpl* m_context; + public: - + ~ClientResponseHandler() { delete[] m_handlerTable; } - + /** * @param context */ - ClientResponseHandler(ClientContextImpl* context) : m_context(context) { - static ResponseHandler* badResponse = new DebugResponse(); + ClientResponseHandler(ClientContextImpl* context) : ResponseHandler((Context*)context) { + static ResponseHandler* badResponse = new DebugResponse((Context*)context); static ResponseHandler* dataResponse = 0; //new DataResponseHandler(context); - + #define HANDLER_COUNT 28 m_handlerTable = new ResponseHandler*[HANDLER_COUNT]; m_handlerTable[ 0] = badResponse; // TODO new BeaconHandler(context), /* 0 */ @@ -502,7 +504,7 @@ class ClientResponseHandler : public ResponseHandler, private epics::pvData::NoD hexDump(buf, (const int8*)(payloadBuffer->getArray()), payloadBuffer->getPosition(), payloadSize); return; } - + // delegate m_handlerTable[c]->handleResponse(responseFrom, transport, version, command, payloadSize, payloadBuffer); } @@ -525,7 +527,7 @@ class ClientResponseHandler : public ResponseHandler, private epics::pvData::NoD }; - + class BeaconHandlerImpl; @@ -556,10 +558,10 @@ enum ContextState { */ CONTEXT_DESTROYED }; - + class ClientContextImpl : public ClientContext, -public Context /* TODO */ +public Context /* TODO */ { @@ -577,7 +579,7 @@ class ChannelImpl : public TransportSender, public BaseSearchInstance { private: - + /** * Context. */ @@ -587,12 +589,12 @@ class ChannelImpl : * Client channel ID. */ pvAccessID m_channelID; - + /** * Channel name. */ String m_name; - + /** * Channel requester. */ @@ -602,48 +604,48 @@ class ChannelImpl : * Process priority. */ short m_priority; - + /** * List of fixed addresses, if name resolution will be used. */ InetAddrVector* m_addresses; - + /** * Connection status. */ ConnectionState m_connectionState; - + /** - * List of all channel's pending requests (keys are subscription IDs). + * List of all channel's pending requests (keys are subscription IDs). */ IOIDResponseRequestMap m_responseRequests; - + /** - * Allow reconnection flag. + * Allow reconnection flag. */ bool m_allowCreation; - + /** * Reference counting. - * NOTE: synced on m_channelMutex. + * NOTE: synced on m_channelMutex. */ int m_references; - + /* ****************** */ - /* CA protocol fields */ + /* CA protocol fields */ /* ****************** */ - + /** * Server transport. */ Transport* m_transport; - + /** * Server channel ID. */ pvAccessID m_serverChannelID; - + /** * Context sync. mutex. */ @@ -651,20 +653,20 @@ class ChannelImpl : /** * Flag indicting what message to send. - */ + */ bool m_issueCreateMessage; - + // TODO mock PVStructure* m_pvStructure; - + private: - ~ChannelImpl() + ~ChannelImpl() { PVDATA_REFCOUNT_MONITOR_DESTRUCT(channel); } - + public: - + /** * Constructor. * @param context @@ -693,10 +695,10 @@ class ChannelImpl : m_issueCreateMessage(true) { PVDATA_REFCOUNT_MONITOR_CONSTRUCT(channel); - + // register before issuing search request m_context->registerChannel(this); - + // connect connect(); @@ -704,7 +706,7 @@ class ChannelImpl : // // mock - // + // ScalarType stype = pvDouble; String allProperties("alarm,timeStamp,display,control,valueAlarm"); @@ -713,14 +715,14 @@ class ChannelImpl : PVDouble *pvField = m_pvStructure->getDoubleField(String("value")); pvField->put(1.123); - + // already connected, report state m_requester->channelStateChange(this, CONNECTED); - - - + + + } - + virtual void destroy() { if (m_addresses) delete m_addresses; @@ -732,13 +734,13 @@ class ChannelImpl : { return getChannelName(); }; - - virtual void message(String message,MessageType messageType) + + virtual void message(String message,MessageType messageType) { - std::cout << "[" << getRequesterName() << "] message(" << message << ", " << messageTypeName[messageType] << ")" << std::endl; + std::cout << "[" << getRequesterName() << "] message(" << message << ", " << messageTypeName[messageType] << ")" << std::endl; } - virtual ChannelProvider* getProvider() + virtual ChannelProvider* getProvider() { return m_context->getProvider(); } @@ -790,7 +792,7 @@ class ChannelImpl : pvAccessID getChannelID() { return m_channelID; } - + void connect() { Lock guard(&m_channelMutex); // if not destroyed... @@ -807,14 +809,14 @@ class ChannelImpl : throw std::runtime_error("Channel destroyed."); else if (m_connectionState == CONNECTED) disconnect(false, true); - } - + } + /** * Create a channel, i.e. submit create channel request to the server. * This method is called after search is complete. * @param transport */ - void createChannel(Transport* transport) + void createChannel(Transport* transport) { Lock guard(&m_channelMutex); @@ -822,12 +824,12 @@ class ChannelImpl : if (!m_allowCreation) return; m_allowCreation = false; - + // check existing transport if (m_transport && m_transport != transport) { disconnectPendingIO(false); - + ReferenceCountingTransport* rct = dynamic_cast(m_transport); if (rct) rct->release(this); } @@ -837,11 +839,11 @@ class ChannelImpl : // this happens when server is slower (processing search requests) than client generating it return; } - + m_transport = transport; m_transport->enqueueSendRequest(this); } - + virtual void cancel() { // noop } @@ -867,7 +869,7 @@ class ChannelImpl : * sid might not be valid, this depends on protocol revision. * @param sid */ - void connectionCompleted(pvAccessID sid/*, rights*/) + void connectionCompleted(pvAccessID sid/*, rights*/) { Lock guard(&m_channelMutex); @@ -884,16 +886,16 @@ class ChannelImpl : // user might create monitors in listeners, so this has to be done before this can happen // however, it would not be nice if events would come before connection event is fired - // but this cannot happen since transport (TCP) is serving in this thread + // but this cannot happen since transport (TCP) is serving in this thread resubscribeSubscriptions(); setConnectionState(CONNECTED); allOK = true; } catch (...) { // noop - // TODO at least log something?? + // TODO at least log something?? } - + if (!allOK) { // end connection request @@ -908,10 +910,10 @@ class ChannelImpl : Lock guard(&m_channelMutex); if (m_connectionState == DESTROYED) throw std::runtime_error("Channel already destroyed."); - + // do destruction via context m_context->destroyChannel(this, force); - + } /** @@ -938,7 +940,7 @@ class ChannelImpl : m_references--; if (m_references > 0 && !force) return; - + // stop searching... m_context->getChannelSearchManager()->unregisterChannel(this); cancel(); @@ -958,7 +960,7 @@ class ChannelImpl : } setConnectionState(DESTROYED); - + // unregister m_context->unregisterChannel(this); } @@ -970,10 +972,10 @@ class ChannelImpl : */ void disconnect(bool initiateSearch, bool remoteDestroy) { Lock guard(&m_channelMutex); - + if (m_connectionState != CONNECTED && !m_transport) return; - + if (!initiateSearch) { // stop searching... m_context->getChannelSearchManager()->unregisterChannel(this); @@ -990,12 +992,12 @@ class ChannelImpl : m_issueCreateMessage = false; m_transport->enqueueSendRequest(this); } - + ReferenceCountingTransport* rct = dynamic_cast(m_transport); if (rct) rct->release(this); m_transport = 0; } - + if (initiateSearch) this->initiateSearch(); @@ -1009,7 +1011,7 @@ class ChannelImpl : Lock guard(&m_channelMutex); m_allowCreation = true; - + if (!m_addresses) m_context->getChannelSearchManager()->registerChannel(this); /* TODO @@ -1035,7 +1037,7 @@ class ChannelImpl : return; } } - + transport = m_context->getTransport(this, serverAddress, minorRevision, m_priority); if (!transport) { @@ -1060,7 +1062,7 @@ class ChannelImpl : if (m_connectionState == DISCONNECTED) { updateSubscriptions(); - + // reconnect using existing IDs, data connectionCompleted(m_serverChannelID/*, accessRights*/); } @@ -1073,7 +1075,7 @@ class ChannelImpl : // NOTE: 2 types of disconnected state - distinguish them setConnectionState(DISCONNECTED); - // ... CA notifies also w/ no access rights callback, although access right are not changed + // ... CA notifies also w/ no access rights callback, although access right are not changed } } @@ -1087,7 +1089,7 @@ class ChannelImpl : if (m_connectionState != connectionState) { m_connectionState = connectionState; - + //bool connectionStatusToReport = (connectionState == CONNECTED); //if (connectionStatusToReport != lastReportedConnectionState) { @@ -1102,16 +1104,16 @@ class ChannelImpl : // noop } - + virtual void send(ByteBuffer* buffer, TransportSendControl* control) { m_channelMutex.lock(); bool issueCreateMessage = m_issueCreateMessage; m_channelMutex.unlock(); - + if (issueCreateMessage) { control->startMessage((int8)7, 2+4); - + // count buffer->putShort((int16)1); // array of CIDs and names @@ -1150,9 +1152,9 @@ class ChannelImpl : { // TODO } - + /** - * Resubscribe subscriptions. + * Resubscribe subscriptions. */ // TODO to be called from non-transport thread !!!!!! void resubscribeSubscriptions() @@ -1161,7 +1163,7 @@ class ChannelImpl : } /** - * Update subscriptions. + * Update subscriptions. */ // TODO to be called from non-transport thread !!!!!! void updateSubscriptions() @@ -1202,7 +1204,7 @@ class ChannelImpl : // TODO return 0; } - + virtual ChannelRPC* createChannelRPC(ChannelRPCRequester *channelRPCRequester, epics::pvData::PVStructure *pvRequest) { @@ -1224,20 +1226,20 @@ class ChannelImpl : // TODO return 0; } - - - + + + virtual void printInfo() { String info; printInfo(&info); std::cout << info.c_str() << std::endl; } - + virtual void printInfo(epics::pvData::StringBuilder out) { //Lock lock(&m_channelMutex); //std::ostringstream ostr; //static String emptyString; - + out->append( "CHANNEL : "); out->append(m_name); out->append("\nSTATE : "); out->append(ConnectionStateNames[m_connectionState]); if (m_connectionState == CONNECTED) @@ -1248,72 +1250,72 @@ class ChannelImpl : out->append("\n"); } }; - + class ChannelProviderImpl; - + class ChannelImplFind : public ChannelFind { public: ChannelImplFind(ChannelProvider* provider) : m_provider(provider) { } - + virtual void destroy() { // one instance for all, do not delete at all } - + virtual ChannelProvider* getChannelProvider() { return m_provider; }; - + virtual void cancelChannelFind() { throw std::runtime_error("not supported"); } - + private: - + // only to be destroyed by it friend class ChannelProviderImpl; virtual ~ChannelImplFind() {} - - ChannelProvider* m_provider; + + ChannelProvider* m_provider; }; - + class ChannelProviderImpl : public ChannelProvider { public: - + ChannelProviderImpl(ClientContextImpl* context) : m_context(context) { } - + virtual epics::pvData::String getProviderName() { return "ChannelProviderImpl"; } - + virtual void destroy() { delete this; } - + virtual ChannelFind* channelFind( epics::pvData::String channelName, ChannelFindRequester *channelFindRequester) { m_context->checkChannelName(channelName); - + if (!channelFindRequester) throw std::runtime_error("null requester"); - + std::auto_ptr errorStatus(getStatusCreate()->createStatus(STATUSTYPE_ERROR, "not implemented", 0)); channelFindRequester->channelFindResult(errorStatus.get(), 0, false); return 0; } - + virtual Channel* createChannel( epics::pvData::String channelName, ChannelRequester *channelRequester, @@ -1321,7 +1323,7 @@ class ChannelImpl : { return createChannel(channelName, channelRequester, priority, emptyString); } - + virtual Channel* createChannel( epics::pvData::String channelName, ChannelRequester *channelRequester, @@ -1333,31 +1335,35 @@ class ChannelImpl : if (channel) channelRequester->channelCreated(getStatusCreate()->getStatusOK(), channel); return channel; - + // NOTE it's up to internal code to respond w/ error to requester and return 0 in case of errors } - + private: ~ChannelProviderImpl() {}; - + /* TODO static*/ String emptyString; ClientContextImpl* m_context; }; public: - - ClientContextImpl() : + + ClientContextImpl() : m_addressList(""), m_autoAddressList(true), m_connectionTimeout(30.0f), m_beaconPeriod(15.0f), m_broadcastPort(CA_BROADCAST_PORT), m_receiveBufferSize(MAX_TCP_RECV), m_timer(0), m_broadcastTransport(0), m_searchTransport(0), m_connector(0), m_transportRegistry(0), m_namedLocker(0), m_lastCID(0), m_lastIOID(0), m_channelSearchManager(0), m_version(new Version("CA Client", "cpp", 0, 0, 0, 1)), m_provider(new ChannelProviderImpl(this)), - m_contextState(CONTEXT_NOT_INITIALIZED) + m_contextState(CONTEXT_NOT_INITIALIZED), m_configuration(new SystemConfigurationImpl()) { loadConfiguration(); } - + + virtual Configuration* getConfiguration() { + return m_configuration; + } + virtual Version* getVersion() { return m_version; } @@ -1387,28 +1393,28 @@ class ChannelImpl : virtual void initialize() { Lock lock(&m_contextMutex); - + if (m_contextState == CONTEXT_DESTROYED) throw std::runtime_error("Context destroyed."); else if (m_contextState == CONTEXT_INITIALIZED) throw std::runtime_error("Context already initialized."); - + internalInitialize(); m_contextState = CONTEXT_INITIALIZED; } - + virtual void printInfo() { String info; printInfo(&info); std::cout << info.c_str() << std::endl; } - + virtual void printInfo(epics::pvData::StringBuilder out) { Lock lock(&m_contextMutex); std::ostringstream ostr; static String emptyString; - + out->append( "CLASS : ::epics::pvAccess::ClientContextImpl"); out->append("\nVERSION : "); out->append(m_version->getVersionString()); out->append("\nADDR_LIST : "); ostr << m_addressList; out->append(ostr.str()); ostr.str(emptyString); @@ -1434,7 +1440,7 @@ class ChannelImpl : } out->append("\n"); } - + virtual void destroy() { m_contextMutex.lock(); @@ -1444,21 +1450,21 @@ class ChannelImpl : m_contextMutex.unlock(); throw std::runtime_error("Context already destroyed."); } - - // go into destroyed state ASAP + + // go into destroyed state ASAP m_contextState = CONTEXT_DESTROYED; internalDestroy(); } - + virtual void dispose() { destroy(); - } - + } + private: ~ClientContextImpl() {}; - + void loadConfiguration() { // TODO /* @@ -1472,19 +1478,19 @@ class ChannelImpl : } void internalInitialize() { - + m_timer = new Timer("pvAccess-client timer", lowPriority); m_connector = new BlockingTCPConnector(this, m_receiveBufferSize, m_beaconPeriod); m_transportRegistry = new TransportRegistry(); m_namedLocker = new NamedLockPattern(); - + // setup UDP transport initializeUDPTransport(); // setup search manager m_channelSearchManager = new ChannelSearchManager(this); } - + /** * Initialized UDP transport (broadcast socket and repeater connection). */ @@ -1497,7 +1503,7 @@ class ChannelImpl : listenLocalAddress.ia.sin_family = AF_INET; listenLocalAddress.ia.sin_port = htons(m_broadcastPort); listenLocalAddress.ia.sin_addr.s_addr = htonl(INADDR_ANY); - + // where to send address InetAddrVector* broadcastAddresses = getSocketAddressList("192.168.1.255", m_broadcastPort); // TODO getBroadcastAddresses(broadcastPort) @@ -1505,20 +1511,20 @@ class ChannelImpl : /// TOD !!!! addresses !!!!! by pointer and not copied BlockingUDPConnector* broadcastConnector = new BlockingUDPConnector(true, broadcastAddresses, true); - + m_broadcastTransport = (BlockingUDPTransport*)broadcastConnector->connect( 0, new ClientResponseHandler(this), listenLocalAddress, CA_MINOR_PROTOCOL_REVISION, CA_DEFAULT_PRIORITY); BlockingUDPConnector* searchConnector = new BlockingUDPConnector(false, broadcastAddresses, true); - + // undefined address osiSockAddr undefinedAddress; undefinedAddress.ia.sin_family = AF_INET; undefinedAddress.ia.sin_port = htons(0); undefinedAddress.ia.sin_addr.s_addr = htonl(INADDR_ANY); - + m_searchTransport = (BlockingUDPTransport*)searchConnector->connect( 0, new ClientResponseHandler(this), undefinedAddress, CA_MINOR_PROTOCOL_REVISION, @@ -1531,7 +1537,7 @@ class ChannelImpl : InetAddrVector* appendList = 0; if (m_autoAddressList) appendList = m_broadcastTransport->getSendAddresses(); - + InetAddrVector* list = getSocketAddressList(m_addressList, m_broadcastPort, appendList); // TODO delete !!!! if (list && list->size()) { @@ -1549,40 +1555,41 @@ class ChannelImpl : // TODO } } - + void internalDestroy() { - + // stop searching if (m_channelSearchManager) delete m_channelSearchManager; //->destroy(); - + // stop timer - if (m_timer) + if (m_timer) delete m_timer; // // cleanup // - + // this will also close all CA transports destroyAllChannels(); - + // TODO destroy !!! if (m_broadcastTransport) delete m_broadcastTransport; //->destroy(true); if (m_searchTransport) delete m_searchTransport; //->destroy(true); - + if (m_namedLocker) delete m_namedLocker; if (m_transportRegistry) delete m_transportRegistry; if (m_connector) delete m_connector; + if (m_configuration) delete m_configuration; m_provider->destroy(); delete m_version; m_contextMutex.unlock(); delete this; } - + void destroyAllChannels() { // TODO } @@ -1637,18 +1644,18 @@ class ChannelImpl : ChannelImpl* getChannel(pvAccessID channelID) { Lock guard(&m_cidMapMutex); - CIDChannelMap::iterator it = m_channelsByCID.find(channelID); + CIDChannelMap::iterator it = m_channelsByCID.find(channelID); return (it == m_channelsByCID.end() ? 0 : it->second); } /** * Generate Client channel ID (CID). - * @return Client channel ID (CID). + * @return Client channel ID (CID). */ pvAccessID generateCID() { Lock guard(&m_cidMapMutex); - + // search first free (theoretically possible loop of death) while (m_channelsByCID.find(++m_lastCID) != m_channelsByCID.end()); // reserve CID @@ -1665,7 +1672,7 @@ class ChannelImpl : m_channelsByCID.erase(cid); } - + /** * Get, or create if necessary, transport of given server address. * @param serverAddress required transport address @@ -1684,11 +1691,11 @@ class ChannelImpl : { logger.log(Level.SEVERE, "Failed to create transport for: " + serverAddress, cex); } - */ + */ return 0; - + } - + /** * Internal create channel. */ @@ -1699,16 +1706,16 @@ class ChannelImpl : checkState(); checkChannelName(name); - + if (requester == 0) throw std::runtime_error("null requester"); - + if (priority < ChannelProvider::PRIORITY_MIN || priority > ChannelProvider::PRIORITY_MAX) throw std::range_error("priority out of bounds"); bool lockAcquired = true; // TODO namedLocker->acquireSynchronizationObject(name, LOCK_TIMEOUT); if (lockAcquired) - { + { try { pvAccessID cid = generateCID(); @@ -1718,10 +1725,10 @@ class ChannelImpl : // TODO return 0; } - // TODO namedLocker.releaseSynchronizationObject(name); + // TODO namedLocker.releaseSynchronizationObject(name); } else - { + { // TODO is this OK? throw std::runtime_error("Failed to obtain synchronization lock for '" + name + "', possible deadlock."); } @@ -1735,23 +1742,23 @@ class ChannelImpl : * @throws IllegalStateException */ void destroyChannel(ChannelImpl* channel, bool force) { - + String name = channel->getChannelName(); bool lockAcquired = true; //namedLocker->acquireSynchronizationObject(name, LOCK_TIMEOUT); if (lockAcquired) - { + { try - { + { channel->destroyChannel(force); } catch(...) { // TODO } - // TODO namedLocker->releaseSynchronizationObject(channel.getChannelName()); + // TODO namedLocker->releaseSynchronizationObject(channel.getChannelName()); } else - { - // TODO is this OK? + { + // TODO is this OK? throw std::runtime_error("Failed to obtain synchronization lock for '" + name + "', possible deadlock."); } } @@ -1763,15 +1770,15 @@ class ChannelImpl : ChannelSearchManager* getChannelSearchManager() { return m_channelSearchManager; } - + /** * A space-separated list of broadcast address for process variable name resolution. * Each address must be of the form: ip.number:port or host.name:port */ String m_addressList; - + /** - * Define whether or not the network interfaces should be discovered at runtime. + * Define whether or not the network interfaces should be discovered at runtime. */ bool m_autoAddressList; @@ -1782,22 +1789,22 @@ class ChannelImpl : * the server is no longer present on the network and disconnect. */ float m_connectionTimeout; - + /** * Period in second between two beacon signals. */ float m_beaconPeriod; - + /** * Broadcast (beacon, search) port number to listen to. */ int m_broadcastPort; - + /** * Receive buffer size (max size of payload). */ int m_receiveBufferSize; - + /** * Timer. */ @@ -1807,7 +1814,7 @@ class ChannelImpl : * Broadcast transport needed to listen for broadcasts. */ BlockingUDPTransport* m_broadcastTransport; - + /** * UDP transport needed for channel searches. */ @@ -1820,7 +1827,7 @@ class ChannelImpl : /** * CA transport (virtual circuit) registry. - * This registry contains all active transports - connections to CA servers. + * This registry contains all active transports - connections to CA servers. */ TransportRegistry* m_transportRegistry; @@ -1847,7 +1854,7 @@ class ChannelImpl : Mutex m_cidMapMutex; /** - * Last CID cache. + * Last CID cache. */ pvAccessID m_lastCID; @@ -1859,7 +1866,7 @@ class ChannelImpl : IOIDResponseRequestMap m_pendingResponseRequests; /** - * Last IOID cache. + * Last IOID cache. */ pvAccessID m_lastIOID; @@ -1875,7 +1882,7 @@ class ChannelImpl : // TODO consider std::unordered_map typedef std::map AddressBeaconHandlerMap; AddressBeaconHandlerMap m_beaconHandlers; - + /** * Version. */ @@ -1885,18 +1892,20 @@ class ChannelImpl : * Provider implementation. */ ChannelProviderImpl* m_provider; - + /** * Context state. */ ContextState m_contextState; - + /** * Context sync. mutex. */ Mutex m_contextMutex; friend class ChannelProviderImpl; + + Configuration* m_configuration; }; @@ -1905,7 +1914,7 @@ class ChannelFindRequesterImpl : public ChannelFindRequester virtual void channelFindResult(epics::pvData::Status *status,ChannelFind *channelFind,bool wasFound) { std::cout << "[ChannelFindRequesterImpl] channelFindResult(" - << status->toString() << ", ..., " << wasFound << ")" << std::endl; + << status->toString() << ", ..., " << wasFound << ")" << std::endl; } }; @@ -1915,10 +1924,10 @@ class ChannelRequesterImpl : public ChannelRequester { return "ChannelRequesterImpl"; }; - - virtual void message(String message,MessageType messageType) + + virtual void message(String message,MessageType messageType) { - std::cout << "[" << getRequesterName() << "] message(" << message << ", " << messageTypeName[messageType] << ")" << std::endl; + std::cout << "[" << getRequesterName() << "] message(" << message << ", " << messageTypeName[messageType] << ")" << std::endl; } virtual void channelCreated(epics::pvData::Status* status, Channel *channel) @@ -1926,7 +1935,7 @@ class ChannelRequesterImpl : public ChannelRequester std::cout << "channelCreated(" << status->toString() << ", " << (channel ? channel->getChannelName() : "(null)") << ")" << std::endl; } - + virtual void channelStateChange(Channel *c, ConnectionState connectionState) { std::cout << "channelStateChange(" << c->getChannelName() << ", " << ConnectionStateNames[connectionState] << ")" << std::endl; @@ -1939,10 +1948,10 @@ class GetFieldRequesterImpl : public GetFieldRequester { return "GetFieldRequesterImpl"; }; - - virtual void message(String message,MessageType messageType) + + virtual void message(String message,MessageType messageType) { - std::cout << "[" << getRequesterName() << "] message(" << message << ", " << messageTypeName[messageType] << ")" << std::endl; + std::cout << "[" << getRequesterName() << "] message(" << message << ", " << messageTypeName[messageType] << ")" << std::endl; } virtual void getDone(epics::pvData::Status *status,epics::pvData::FieldConstPtr field) @@ -1965,22 +1974,22 @@ class ChannelGetRequesterImpl : public ChannelGetRequester ChannelGet *m_channelGet; epics::pvData::PVStructure *m_pvStructure; epics::pvData::BitSet *m_bitSet; - + virtual String getRequesterName() { return "ChannelGetRequesterImpl"; }; - - virtual void message(String message,MessageType messageType) + + virtual void message(String message,MessageType messageType) { - std::cout << "[" << getRequesterName() << "] message(" << message << ", " << messageTypeName[messageType] << ")" << std::endl; + std::cout << "[" << getRequesterName() << "] message(" << message << ", " << messageTypeName[messageType] << ")" << std::endl; } virtual void channelGetConnect(epics::pvData::Status *status,ChannelGet *channelGet, epics::pvData::PVStructure *pvStructure,epics::pvData::BitSet *bitSet) { std::cout << "channelGetConnect(" << status->toString() << ")" << std::endl; - + // TODO sync m_channelGet = channelGet; m_pvStructure = pvStructure; @@ -2002,22 +2011,22 @@ class ChannelPutRequesterImpl : public ChannelPutRequester ChannelPut *m_channelPut; epics::pvData::PVStructure *m_pvStructure; epics::pvData::BitSet *m_bitSet; - + virtual String getRequesterName() { return "ChannelPutRequesterImpl"; }; - - virtual void message(String message,MessageType messageType) + + virtual void message(String message,MessageType messageType) { - std::cout << "[" << getRequesterName() << "] message(" << message << ", " << messageTypeName[messageType] << ")" << std::endl; + std::cout << "[" << getRequesterName() << "] message(" << message << ", " << messageTypeName[messageType] << ")" << std::endl; } virtual void channelPutConnect(epics::pvData::Status *status,ChannelPut *channelPut, epics::pvData::PVStructure *pvStructure,epics::pvData::BitSet *bitSet) { std::cout << "channelPutConnect(" << status->toString() << ")" << std::endl; - + // TODO sync m_channelPut = channelPut; m_pvStructure = pvStructure; @@ -2043,20 +2052,20 @@ class ChannelPutRequesterImpl : public ChannelPutRequester } }; - - + + class MonitorRequesterImpl : public MonitorRequester { virtual String getRequesterName() { return "MonitorRequesterImpl"; }; - - virtual void message(String message,MessageType messageType) + + virtual void message(String message,MessageType messageType) { - std::cout << "[" << getRequesterName() << "] message(" << message << ", " << messageTypeName[messageType] << ")" << std::endl; + std::cout << "[" << getRequesterName() << "] message(" << message << ", " << messageTypeName[messageType] << ")" << std::endl; } - + virtual void monitorConnect(Status* status, Monitor* monitor, Structure* structure) { std::cout << "monitorConnect(" << status->toString() << ")" << std::endl; @@ -2067,13 +2076,13 @@ class MonitorRequesterImpl : public MonitorRequester std::cout << str << std::endl; } } - + virtual void monitorEvent(Monitor* monitor) { std::cout << "monitorEvent" << std::endl; MonitorElement* element = monitor->poll(); - + String str("changed/overrun "); element->getChangedBitSet()->toString(&str); str += '/'; @@ -2081,35 +2090,35 @@ class MonitorRequesterImpl : public MonitorRequester str += '\n'; element->getPVStructure()->toString(&str); std::cout << str << std::endl; - + monitor->release(element); } - + virtual void unlisten(Monitor* monitor) { std::cout << "unlisten" << std::endl; } -}; +}; class ChannelProcessRequesterImpl : public ChannelProcessRequester { ChannelProcess *m_channelProcess; - + virtual String getRequesterName() { return "ProcessRequesterImpl"; }; - - virtual void message(String message,MessageType messageType) + + virtual void message(String message,MessageType messageType) { - std::cout << "[" << getRequesterName() << "] message(" << message << ", " << messageTypeName[messageType] << ")" << std::endl; + std::cout << "[" << getRequesterName() << "] message(" << message << ", " << messageTypeName[messageType] << ")" << std::endl; } virtual void channelProcessConnect(epics::pvData::Status *status,ChannelProcess *channelProcess) { std::cout << "channelProcessConnect(" << status->toString() << ")" << std::endl; - + // TODO sync m_channelProcess = channelProcess; } @@ -2126,33 +2135,33 @@ int main(int argc,char *argv[]) ClientContextImpl* context = new ClientContextImpl(); context->printInfo(); - context->initialize(); + context->initialize(); context->printInfo(); - + epicsThreadSleep ( 1.0 ); - + //ChannelFindRequesterImpl findRequester; //context->getProvider()->channelFind("something", &findRequester); - + ChannelRequesterImpl channelRequester; Channel* channel = context->getProvider()->createChannel("structureArrayTest", &channelRequester); channel->printInfo(); /* GetFieldRequesterImpl getFieldRequesterImpl; channel->getField(&getFieldRequesterImpl, "timeStamp.secondsPastEpoch"); - + ChannelGetRequesterImpl channelGetRequesterImpl; ChannelGet* channelGet = channel->createChannelGet(&channelGetRequesterImpl, 0); channelGet->get(false); channelGet->destroy(); - + ChannelPutRequesterImpl channelPutRequesterImpl; ChannelPut* channelPut = channel->createChannelPut(&channelPutRequesterImpl, 0); channelPut->get(); channelPut->put(false); channelPut->destroy(); - - + + MonitorRequesterImpl monitorRequesterImpl; Monitor* monitor = channel->createMonitor(&monitorRequesterImpl, 0); @@ -2165,20 +2174,20 @@ int main(int argc,char *argv[]) ChannelProcess* channelProcess = channel->createChannelProcess(&channelProcessRequester, 0); channelProcess->process(false); channelProcess->destroy(); - + status = monitor->stop(); std::cout << "monitor->stop() = " << status->toString() << std::endl; delete status; - - + + monitor->destroy(); */ epicsThreadSleep ( 10.0 ); channel->destroy(); - + context->destroy(); - + std::cout << "-----------------------------------------------------------------------" << std::endl; getShowConstructDestruct()->constuctDestructTotals(stdout); return(0); From d7852fa666f60ec1bb72d99ca2b72437f9e0b1b2 Mon Sep 17 00:00:00 2001 From: miha_vitorovic Date: Mon, 10 Jan 2011 14:11:27 +0100 Subject: [PATCH 04/10] Moved AbstractResponseHandler definition to a more logical place. --- .../remote/abstractResponseHandler.cpp | 42 +++++++++++++++++++ 1 file changed, 42 insertions(+) create mode 100644 pvAccessApp/remote/abstractResponseHandler.cpp diff --git a/pvAccessApp/remote/abstractResponseHandler.cpp b/pvAccessApp/remote/abstractResponseHandler.cpp new file mode 100644 index 0000000..b5399b0 --- /dev/null +++ b/pvAccessApp/remote/abstractResponseHandler.cpp @@ -0,0 +1,42 @@ +/* + * abstractResponseHandler.cpp + * + * Created on: Jan 10, 2011 + * Author: Miha Vitorovic + */ + +#include "remote.h" +#include "hexDump.h" + +#include + +#include + +#include + +using std::ostringstream; +using std::hex; + +using namespace epics::pvData; + +namespace epics { + namespace pvAccess { + + void AbstractResponseHandler::handleResponse(osiSockAddr* responseFrom, + Transport* transport, int8 version, int8 command, + int payloadSize, ByteBuffer* payloadBuffer) { + if(_debug) { + char ipAddrStr[48]; + ipAddrToA(&responseFrom->ia, ipAddrStr, sizeof(ipAddrStr)); + + ostringstream prologue; + prologue<<"Message [0x"<getArray(), + payloadBuffer->getPosition(), payloadSize); + } + } + } +} From 4e96a20766d5230757b7120573196145f5a60be5 Mon Sep 17 00:00:00 2001 From: miha_vitorovic Date: Mon, 10 Jan 2011 15:18:15 +0100 Subject: [PATCH 05/10] ArrayFIFO template specialization for pointers done. --- pvAccessApp/utils/arrayFIFO.h | 367 +++++++++++++++++++++++++++++++- testApp/utils/arrayFIFOTest.cpp | 136 +++++++++++- 2 files changed, 494 insertions(+), 9 deletions(-) diff --git a/pvAccessApp/utils/arrayFIFO.h b/pvAccessApp/utils/arrayFIFO.h index d449c56..c96e7c5 100644 --- a/pvAccessApp/utils/arrayFIFO.h +++ b/pvAccessApp/utils/arrayFIFO.h @@ -118,11 +118,11 @@ namespace epics { #ifdef ARRAY_FIFO_DEBUG void debugState() { //size_t mask = _size-1; - std::cout<<"h:"<<_head<<",t:"<<_tail<<",c:"<<_size; + std::cout<<"Simple, h:"<<_head<<",t:"<<_tail<<",c:"<<_size; std::cout<<",s:"< - int ArrayFIFO::MIN_INITIAL_CAPACITY = 8; + /* * * * * * * * * * * template implementation * * * * * * * * * * */ template void ArrayFIFO::arraycopy(T* src, size_t srcPos, T* dest, @@ -374,6 +370,361 @@ namespace epics { return false; } + /* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * + * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * + * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */ + + /** + * Template specialization for pointers + */ + template + class ArrayFIFO { + public: + /** + * Constructs an empty array deque with an initial capacity + * sufficient to hold the specified number of elements. Array FIFO + * is designed to hold objects allocated on the heap. + * + * @param[in] numElements lower bound on initial capacity of the + * deque. Default value is 16 elements. + */ + ArrayFIFO(size_t numElements = 16); + ~ArrayFIFO(); + + /** + * Inserts the specified element at the front of this deque. + * + * @param[in] e the element to add. + */ + void addFirst(const T* e); + + /** + * Inserts the specified element at the end of this deque. + * @param[in] e the element to add + */ + void addLast(const T* e); + + T* pollFirst(); + T* pollLast(); + T* peekFirst(); + T* peekLast(); + + /** + * Pushes an element onto the stack represented by this deque. In other + * words, inserts the element at the front of this deque. + * + * @param[in] e the element to push + */ + void push(const T* e); + + /** + * Pops an element from the stack represented by this deque. In other + * words, removes and returns the first element of this deque. + * + * @return the element at the front of this deque (which is the top + * of the stack represented by this deque), null if no element available. + */ + T* pop(); + + /** + * Looks at the object at the top of this stack without removing it + * from the stack. + * @return the object at the top of this stack (the last item + * of the Vector object). + */ + T* peek(); + + /** + * Returns the number of elements in this deque. + * @return the number of elements in this deque + */ + size_t size(); + + /** + * Returns true if this deque contains no elements. + * + * @return true if this deque contains no elements + */ + bool isEmpty(); + + /** + * Removes all of the elements from this deque. + * The deque will be empty after this call returns. + * + * @param freeElements tells the methods to automatically free + * the memory of all the elments in the FIFO. Default is {@code true} + */ + void clear(); + + /** + * Removes the first occurrence of the specified element in this + * deque (when traversing the deque from head to tail). + * If the deque does not contain the element, it is unchanged. + * More formally, removes the first element e such that + * o.equals(e) (if such an element exists). + * Returns true if this deque contained the specified element + * (or equivalently, if this deque changed as a result of the call). + * + * @param o element to be removed from this deque, if present + * @return true if the deque contained the specified element + */ + bool remove(const T* e); + +#ifdef ARRAY_FIFO_DEBUG + void debugState() { + //size_t mask = _size-1; + std::cout<<"Pointer, h:"<<_head<<",t:"<<_tail<<",c:"<<_size; + std::cout<<",s:"<This method is called delete rather than remove to emphasize + * that its semantics differ from those of {@link List#remove(int)}. + * + * @return true if elements moved backwards + */ + bool del(const size_t i); + + }; + + /* * * * * * * * * * * template implementation * * * * * * * * * * */ + + template + void ArrayFIFO::arraycopy(T** src, size_t srcPos, T** dest, + size_t destPos, size_t length) { + if(srcPos=0; i--) + dest[destPos+i] = src[srcPos+i]; + else + for(size_t i = 0; i + void ArrayFIFO::allocateElements(size_t numElements) { + _size = MIN_INITIAL_CAPACITY; + // Find the best power of two to hold elements. + // Tests "<=" because arrays aren't kept full. + if(numElements>=_size) { + _size = numElements; + _size |= (_size>>1); + _size |= (_size>>2); + _size |= (_size>>4); + _size |= (_size>>8); + _size |= (_size>>16); + _size++; + } + _elements = new T*[_size]; + } + + template + void ArrayFIFO::doubleCapacity() { + size_t p = _head; + size_t n = _size; + size_t r = n-p; // number of elements to the right of p + size_t newCapacity = n<<1; + T** a = new T*[newCapacity]; + arraycopy(_elements, p, a, 0, r); + arraycopy(_elements, 0, a, r, p); + delete[] _elements; + _elements = a; + _size = newCapacity; + _head = 0; + _tail = n; + + } + + template + ArrayFIFO::ArrayFIFO(size_t numElements) : + _head(0), _tail(0), _mutex() { + allocateElements(numElements); + } + + template + ArrayFIFO::~ArrayFIFO() { + delete[] _elements; + } + + template + void ArrayFIFO::addFirst(const T* e) { + Lock lock(&_mutex); + + _elements[_head = (_head-1)&(_size-1)] = const_cast(e); + if(_head==_tail) doubleCapacity(); + } + + template + void ArrayFIFO::addLast(const T* e) { + Lock lock(&_mutex); + + _elements[_tail] = const_cast(e); + if((_tail = (_tail+1)&(_size-1))==_head) doubleCapacity(); + } + + template + T* ArrayFIFO::pollFirst() { + Lock lock(&_mutex); + + if(isEmpty()) return NULL; + + T* result = _elements[_head]; // Element is null if deque empty + _head = (_head+1)&(_size-1); + return result; + } + + template + T* ArrayFIFO::pollLast() { + Lock lock(&_mutex); + + if(isEmpty()) return NULL; + + _tail = (_tail-1)&(_size-1); + return _elements[_tail]; + } + + template + T* ArrayFIFO::peekFirst() { + Lock lock(&_mutex); + + if(isEmpty()) return NULL; + + return _elements[_head]; + } + + template + T* ArrayFIFO::peekLast() { + Lock lock(&_mutex); + + if(isEmpty()) return NULL; + + return _elements[(_tail-1)&(_size-1)]; + } + + template + void ArrayFIFO::push(const T* e) { + addLast(e); + } + + template + T* ArrayFIFO::pop() { + return pollLast(); + } + + template + T* ArrayFIFO::peek() { + return peekFirst(); + } + + template + size_t ArrayFIFO::size() { + Lock lock(&_mutex); + + return (_tail-_head)&(_size-1); + } + + template + bool ArrayFIFO::isEmpty() { + Lock lock(&_mutex); + + return _head==_tail; + } + + template + void ArrayFIFO::clear() { + Lock lock(&_mutex); + + _head = _tail = 0; + } + + template + bool ArrayFIFO::del(const size_t i) { + // i is absolute index in the array + size_t mask = _size-1; + size_t h = _head; + size_t t = _tail; + size_t front = (i-h)&mask; + size_t back = (t-i)&mask; + + // Invariant: head <= i < tail mod circularity + if(front>=((t-h)&mask)) THROW_BASE_EXCEPTION( + "Illegal State Exception"); // concurrency problem!!! + + // Optimize for least element motion + if(front0) _elements[0] = _elements[mask]; + arraycopy(_elements, h, _elements, h+1, mask-h); + } + _head = (h+1)&mask; + + return false; + } + else { + if(i + bool ArrayFIFO::remove(const T* e) { + Lock lock(&_mutex); + + if(isEmpty()) return false; // nothing to do + + size_t mask = _size-1; + size_t i = _head; + while(i!=_tail) { + if(e==_elements[i]) { + del(i); + return true; + } + i = (i+1)&mask; + } + return false; + } + } } diff --git a/testApp/utils/arrayFIFOTest.cpp b/testApp/utils/arrayFIFOTest.cpp index 61cbfc7..88617e1 100644 --- a/testApp/utils/arrayFIFOTest.cpp +++ b/testApp/utils/arrayFIFOTest.cpp @@ -15,7 +15,9 @@ using namespace epics::pvAccess; using std::cout; using std::endl; -int main(int argc, char *argv[]) { +void testSimpleType() { + cout<<"\nTests for simple type template."< fifoInt; assert(fifoInt.size()==0); @@ -133,5 +135,137 @@ int main(int argc, char *argv[]) { assert(fifoInt.isEmpty()); cout<<"\nPASSED!\n"; +} + +void testPointerType() { + cout<<"\nTests for pointer type template."< fifoInt; + + assert(fifoInt.size()==0); + assert(fifoInt.isEmpty()); + + cout<<"Testing clear."< Date: Mon, 10 Jan 2011 15:37:25 +0100 Subject: [PATCH 06/10] GrowingCircularBuffer template specialization for pointers done. --- pvAccessApp/utils/growingCircularBuffer.h | 166 +++++++++++++++++--- testApp/utils/arrayFIFOTest.cpp | 138 ++++++++-------- testApp/utils/growingCircularBufferTest.cpp | 49 +++++- 3 files changed, 261 insertions(+), 92 deletions(-) diff --git a/pvAccessApp/utils/growingCircularBuffer.h b/pvAccessApp/utils/growingCircularBuffer.h index 6a993a1..63bb242 100644 --- a/pvAccessApp/utils/growingCircularBuffer.h +++ b/pvAccessApp/utils/growingCircularBuffer.h @@ -28,8 +28,8 @@ namespace epics { * Create a GrowingCircularBuffer with the given capacity. **/ GrowingCircularBuffer(size_t capacity = 16) : - _elements(new T[capacity]), _takePointer(0), _putPointer(0), _count(0), _size(capacity) - { + _elements(new T[capacity]), _takePointer(0), _putPointer(0), + _count(0), _size(capacity) { } ~GrowingCircularBuffer() { @@ -94,13 +94,9 @@ namespace epics { size_t length); }; - /* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * - * g++ requires template definition inside a header file. - * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */ - template - void GrowingCircularBuffer::arraycopy(T* src, size_t srcPos, T* dest, - size_t destPos, size_t length) { + void GrowingCircularBuffer::arraycopy(T* src, size_t srcPos, + T* dest, size_t destPos, size_t length) { if(srcPos=0; i--) dest[destPos+i] = src[srcPos+i]; @@ -111,17 +107,16 @@ namespace epics { template bool GrowingCircularBuffer::insert(const T x) { - if (_count == _size) - { + if(_count==_size) { // we are full, grow by factor 2 - T* newElements = new T[_size * 2]; + T* newElements = new T[_size*2]; // invariant: _takePointer < _size - size_t split = _size - _takePointer; - if (split > 0) - arraycopy(_elements, _takePointer, newElements, 0, split); - if (_takePointer != 0) - arraycopy(_elements, 0, newElements, split, _putPointer); + size_t split = _size-_takePointer; + if(split>0) arraycopy(_elements, _takePointer, newElements, 0, + split); + if(_takePointer!=0) arraycopy(_elements, 0, newElements, split, + _putPointer); _takePointer = 0; _putPointer = _size; @@ -132,8 +127,8 @@ namespace epics { _count++; _elements[_putPointer] = x; - if (++_putPointer >= _size) _putPointer = 0; - return _count == 1; + if(++_putPointer>=_size) _putPointer = 0; + return _count==1; } template @@ -146,6 +141,141 @@ namespace epics { return old; } + /* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * + * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * + * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */ + + /** + * Template specialization for pointers. + * Implementation of circular FIFO unbouded buffer. + * Instance is not thread safe. + * @author Miha Vitorovic + */ + template + class GrowingCircularBuffer { + public: + + /** + * Create a GrowingCircularBuffer with the given capacity. + **/ + GrowingCircularBuffer(size_t capacity = 16) : + _elements(new T*[capacity]), _takePointer(0), _putPointer(0), + _count(0), _size(capacity) { + } + + ~GrowingCircularBuffer() { + delete[] _elements; + } + + /** + * Get number of elements in the buffer. + * @return number of elements in the buffer. + */ + inline size_t size() { + return _count; + } + + /** + * Get current buffer capacity. + * @return buffer current capacity. + */ + inline size_t capacity() { + return _size; + } + + /** + * Insert a new element in to the buffer. + * If buffer full the buffer is doubled. + * + * @param x element to insert. + * @return true if first element. + */ + bool insert(const T* x); + + /** + * Extract the oldest element from the buffer. + * @return the oldest element from the buffer. + */ + T* extract(); + + private: + /** + * Array (circular buffer) of elements. + */ + T** _elements; + + /** + * Take (read) pointer. + */ + size_t _takePointer; + + /** + * Put (write) pointer. + */ + size_t _putPointer; + + /** + * Number of elements in the buffer. + */ + size_t _count; + + size_t _size; + + void arraycopy(T** src, size_t srcPos, T** dest, size_t destPos, + size_t length); + }; + + /* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * + * g++ requires template definition inside a header file. + * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */ + + template + void GrowingCircularBuffer::arraycopy(T** src, size_t srcPos, + T** dest, size_t destPos, size_t length) { + if(srcPos=0; i--) + dest[destPos+i] = src[srcPos+i]; + else + for(size_t i = 0; i + bool GrowingCircularBuffer::insert(const T* x) { + if(_count==_size) { + // we are full, grow by factor 2 + T** newElements = new T*[_size*2]; + + // invariant: _takePointer < _size + size_t split = _size-_takePointer; + if(split>0) arraycopy(_elements, _takePointer, newElements, 0, + split); + if(_takePointer!=0) arraycopy(_elements, 0, newElements, split, + _putPointer); + + _takePointer = 0; + _putPointer = _size; + _size *= 2; + delete[] _elements; + _elements = newElements; + } + _count++; + + _elements[_putPointer] = const_cast(x); + if(++_putPointer>=_size) _putPointer = 0; + return _count==1; + } + + template + T* GrowingCircularBuffer::extract() { + if(_count==0) return NULL; + + _count--; + T* old = _elements[_takePointer]; + if(++_takePointer>=_size) _takePointer = 0; + return old; + } + } } #endif /* GROWINGCIRCULARBUFFER_H_ */ diff --git a/testApp/utils/arrayFIFOTest.cpp b/testApp/utils/arrayFIFOTest.cpp index 88617e1..966d2c1 100644 --- a/testApp/utils/arrayFIFOTest.cpp +++ b/testApp/utils/arrayFIFOTest.cpp @@ -140,7 +140,7 @@ void testSimpleType() { void testPointerType() { cout<<"\nTests for pointer type template."< fifoInt; @@ -148,110 +148,110 @@ void testPointerType() { assert(fifoInt.isEmpty()); cout<<"Testing clear."< cb(CAPACITY); - cout<<"Testing circular buffer."< cb(CAPACITY); + int testVals[] = {0,1,2,3,4,5,6,7,8,9,11,12,13,14,15,16,17,18,19,20}; + + cout<<"Testing circular buffer pointer type."< Date: Mon, 10 Jan 2011 15:51:35 +0100 Subject: [PATCH 07/10] - Connecting to invalid server (port) now generates an error. - Removed usage 'ipAddrToA' from the code. Replaced with 'ipAddrToDottedIP'. --- pvAccessApp/remote/abstractResponseHandler.cpp | 2 +- .../remote/blockingClientTCPTransport.cpp | 6 +++--- .../remote/blockingServerTCPTransport.cpp | 2 +- pvAccessApp/remote/blockingTCPAcceptor.cpp | 10 ++++++---- pvAccessApp/remote/blockingTCPConnector.cpp | 17 ++++++++++++++--- pvAccessApp/server/responseHandlers.cpp | 2 +- testApp/remote/testRemoteClientImpl.cpp | 3 ++- 7 files changed, 28 insertions(+), 14 deletions(-) diff --git a/pvAccessApp/remote/abstractResponseHandler.cpp b/pvAccessApp/remote/abstractResponseHandler.cpp index b5399b0..2d5f0b3 100644 --- a/pvAccessApp/remote/abstractResponseHandler.cpp +++ b/pvAccessApp/remote/abstractResponseHandler.cpp @@ -27,7 +27,7 @@ namespace epics { int payloadSize, ByteBuffer* payloadBuffer) { if(_debug) { char ipAddrStr[48]; - ipAddrToA(&responseFrom->ia, ipAddrStr, sizeof(ipAddrStr)); + ipAddrToDottedIP(&responseFrom->ia, ipAddrStr, sizeof(ipAddrStr)); ostringstream prologue; prologue<<"Message [0x"<ia, ipAddrStr, sizeof(ipAddrStr)); + ipAddrToDottedIP(&_socketAddress->ia, ipAddrStr, sizeof(ipAddrStr)); errlogSevPrintf(errlogInfo, "Acquiring transport to %s.", ipAddrStr); _ownersMutex->lock(); @@ -127,7 +127,7 @@ namespace epics { int refs = _owners->size(); if(refs>0) { char ipAddrStr[48]; - ipAddrToA(&_socketAddress->ia, ipAddrStr, sizeof(ipAddrStr)); + ipAddrToDottedIP(&_socketAddress->ia, ipAddrStr, sizeof(ipAddrStr)); errlogSevPrintf( errlogInfo, "Transport to %s still has %d client(s) active and closing...", @@ -145,7 +145,7 @@ namespace epics { if(_closed) return; char ipAddrStr[48]; - ipAddrToA(&_socketAddress->ia, ipAddrStr, sizeof(ipAddrStr)); + ipAddrToDottedIP(&_socketAddress->ia, ipAddrStr, sizeof(ipAddrStr)); errlogSevPrintf(errlogInfo, "Releasing transport to %s.", ipAddrStr); diff --git a/pvAccessApp/remote/blockingServerTCPTransport.cpp b/pvAccessApp/remote/blockingServerTCPTransport.cpp index 9048920..042a2b8 100644 --- a/pvAccessApp/remote/blockingServerTCPTransport.cpp +++ b/pvAccessApp/remote/blockingServerTCPTransport.cpp @@ -52,7 +52,7 @@ namespace epics { if(_channels->size()==0) return; char ipAddrStr[64]; - ipAddrToA(&_socketAddress->ia, ipAddrStr, sizeof(ipAddrStr)); + ipAddrToDottedIP(&_socketAddress->ia, ipAddrStr, sizeof(ipAddrStr)); errlogSevPrintf( errlogInfo, diff --git a/pvAccessApp/remote/blockingTCPAcceptor.cpp b/pvAccessApp/remote/blockingTCPAcceptor.cpp index c5bfcb7..10c5f21 100644 --- a/pvAccessApp/remote/blockingTCPAcceptor.cpp +++ b/pvAccessApp/remote/blockingTCPAcceptor.cpp @@ -51,7 +51,7 @@ namespace epics { char strBuffer[64]; char ipAddrStr[48]; - ipAddrToA(&_bindAddress->ia, ipAddrStr, sizeof(ipAddrStr)); + ipAddrToDottedIP(&_bindAddress->ia, ipAddrStr, sizeof(ipAddrStr)); int tryCount = 0; while(tryCount<2) { @@ -150,7 +150,7 @@ namespace epics { void BlockingTCPAcceptor::handleEvents() { // rise level if port is assigned dynamically char ipAddrStr[48]; - ipAddrToA(&_bindAddress->ia, ipAddrStr, sizeof(ipAddrStr)); + ipAddrToDottedIP(&_bindAddress->ia, ipAddrStr, sizeof(ipAddrStr)); errlogSevPrintf(errlogInfo, "Accepting connections at %s.", ipAddrStr); @@ -183,7 +183,8 @@ namespace epics { _serverSocketChannel, &address.sa, &len); if(newClient!=INVALID_SOCKET) { // accept succeeded - ipAddrToA(&address.ia, ipAddrStr, sizeof(ipAddrStr)); + ipAddrToDottedIP(&address.ia, ipAddrStr, + sizeof(ipAddrStr)); errlogSevPrintf(errlogInfo, "Accepted connection from CA client: %s", ipAddrStr); @@ -273,7 +274,8 @@ namespace epics { if(_serverSocketChannel!=INVALID_SOCKET) { char ipAddrStr[48]; - ipAddrToA(&_bindAddress->ia, ipAddrStr, sizeof(ipAddrStr)); + ipAddrToDottedIP(&_bindAddress->ia, ipAddrStr, + sizeof(ipAddrStr)); errlogSevPrintf(errlogInfo, "Stopped accepting connections at %s.", ipAddrStr); diff --git a/pvAccessApp/remote/blockingTCPConnector.cpp b/pvAccessApp/remote/blockingTCPConnector.cpp index 4a066b6..5c3b021 100644 --- a/pvAccessApp/remote/blockingTCPConnector.cpp +++ b/pvAccessApp/remote/blockingTCPConnector.cpp @@ -38,7 +38,7 @@ namespace epics { if(tryCount>0) epicsThreadSleep(0.1); char strBuffer[64]; - ipAddrToA(&address.ia, strBuffer, sizeof(strBuffer)); + ipAddrToDottedIP(&address.ia, strBuffer, sizeof(strBuffer)); errlogSevPrintf(errlogInfo, "Opening socket to CA server %s, attempt %d.", @@ -73,7 +73,7 @@ namespace epics { SOCKET socket = INVALID_SOCKET; char ipAddrStr[64]; - ipAddrToA(&address.ia, ipAddrStr, sizeof(ipAddrStr)); + ipAddrToDottedIP(&address.ia, ipAddrStr, sizeof(ipAddrStr)); // first try to check cache w/o named lock... BlockingClientTCPTransport @@ -106,6 +106,17 @@ namespace epics { ipAddrStr); socket = tryConnect(address, 3); + // verify + if(socket==INVALID_SOCKET) { + errlogSevPrintf( + errlogMajor, + "Connection to CA server %s failed.", + ipAddrStr); + ostringstream temp; + temp<<"Failed to verify TCP connection to '"<waitUntilVerified(3.0)) { transport->close(true); errlogSevPrintf( - errlogInfo, + errlogMinor, "Connection to CA server %s failed to be validated, closing it.", ipAddrStr); ostringstream temp; diff --git a/pvAccessApp/server/responseHandlers.cpp b/pvAccessApp/server/responseHandlers.cpp index c0ab2ae..fe57284 100644 --- a/pvAccessApp/server/responseHandlers.cpp +++ b/pvAccessApp/server/responseHandlers.cpp @@ -31,7 +31,7 @@ namespace epics { transport, version, command, payloadSize, payloadBuffer); char ipAddrStr[48]; - ipAddrToA(&responseFrom->ia, ipAddrStr, sizeof(ipAddrStr)); + ipAddrToDottedIP(&responseFrom->ia, ipAddrStr, sizeof(ipAddrStr)); errlogSevPrintf(errlogInfo, "Undecipherable message (bad response type %d) from %s.", diff --git a/testApp/remote/testRemoteClientImpl.cpp b/testApp/remote/testRemoteClientImpl.cpp index 987006d..8ab748b 100644 --- a/testApp/remote/testRemoteClientImpl.cpp +++ b/testApp/remote/testRemoteClientImpl.cpp @@ -410,7 +410,8 @@ class ClientContextImpl; { char ipAddrStr[48]; std::cout << "ole" << std::endl; - ipAddrToA(&responseFrom->ia, ipAddrStr, sizeof(ipAddrStr)); + ipAddrToDottedIP(&responseFrom->ia, ipAddrStr, + sizeof(ipAddrStr)); std::cout << "ole2" << std::endl; ostringstream prologue; From 487882dbff727bb1d5bb72fa8552aefc8124c1b5 Mon Sep 17 00:00:00 2001 From: miha_vitorovic Date: Mon, 10 Jan 2011 16:13:23 +0100 Subject: [PATCH 08/10] - 'getBroadcastAddresses' now accepts a port as parameter which in then used in all returned broadcast addresses. - 'getBroadcastAddresses' now returns a default broadcast address (255.255.255.255) on failure, if no other broadcast address was found. --- pvAccessApp/utils/inetAddressUtil.cpp | 16 +++++++++++++++- pvAccessApp/utils/inetAddressUtil.h | 8 ++++++-- testApp/utils/inetAddressUtilsTest.cpp | 4 ++-- 3 files changed, 23 insertions(+), 5 deletions(-) diff --git a/pvAccessApp/utils/inetAddressUtil.cpp b/pvAccessApp/utils/inetAddressUtil.cpp index 7eeb41c..baded12 100644 --- a/pvAccessApp/utils/inetAddressUtil.cpp +++ b/pvAccessApp/utils/inetAddressUtil.cpp @@ -32,10 +32,19 @@ using namespace epics::pvData; namespace epics { namespace pvAccess { + void addDefaultBroadcastAddress(InetAddrVector* v, in_port_t p) { + osiSockAddr* pNewNode = new osiSockAddr; + pNewNode->ia.sin_family = AF_INET; + pNewNode->ia.sin_addr.s_addr = htonl(INADDR_BROADCAST); + pNewNode->ia.sin_port = htons(p); + v->push_back(pNewNode); + } + /* port of osiSockDiscoverBroadcastAddresses() in * epics/base/src/libCom/osi/os/default/osdNetIntf.c */ - InetAddrVector* getBroadcastAddresses(SOCKET sock) { + InetAddrVector* getBroadcastAddresses(SOCKET sock, + in_port_t defaultPort) { static const unsigned nelem = 100; int status; struct ifconf ifconf; @@ -56,6 +65,7 @@ namespace epics { if(!pIfreqList) { errlogSevPrintf(errlogMajor, "getBroadcastAddresses(): no memory to complete request"); + addDefaultBroadcastAddress(retVector, defaultPort); return retVector; } @@ -67,6 +77,7 @@ namespace epics { errlogSevPrintf(errlogMinor, "getBroadcastAddresses(): unable to fetch network interface configuration"); delete[] pIfreqList; + addDefaultBroadcastAddress(retVector, defaultPort); return retVector; } @@ -117,6 +128,8 @@ namespace epics { errlogSevPrintf(errlogMajor, "getBroadcastAddresses(): no memory available for configuration"); delete[] pIfreqList; + if(retVector->size()==0) addDefaultBroadcastAddress( + retVector, defaultPort); return retVector; } @@ -168,6 +181,7 @@ namespace epics { delete pNewNode; continue; } + pNewNode->ia.sin_port = htons(defaultPort); retVector->push_back(pNewNode); } diff --git a/pvAccessApp/utils/inetAddressUtil.h b/pvAccessApp/utils/inetAddressUtil.h index ee7f381..e8ee082 100644 --- a/pvAccessApp/utils/inetAddressUtil.h +++ b/pvAccessApp/utils/inetAddressUtil.h @@ -33,8 +33,12 @@ namespace epics { /** * returns a vector containing all the IPv4 broadcast addresses * on this machine. IPv6 doesn't have a local broadcast address. - */ - InetAddrVector* getBroadcastAddresses(SOCKET sock); + * Conversion of the defaultPort to network byte order performed by + * the function. + * TODO: Windows implementation of the function. + */ + InetAddrVector* getBroadcastAddresses(SOCKET sock, + in_port_t defaultPort); /** * Encode IPv4 address as IPv6 address. diff --git a/testApp/utils/inetAddressUtilsTest.cpp b/testApp/utils/inetAddressUtilsTest.cpp index 8594254..3ceae49 100644 --- a/testApp/utils/inetAddressUtilsTest.cpp +++ b/testApp/utils/inetAddressUtilsTest.cpp @@ -129,11 +129,11 @@ int main(int argc, char *argv[]) { cout<<"\nPASSED!\n"; SOCKET socket = epicsSocketCreate(AF_INET, SOCK_STREAM, IPPROTO_TCP); - InetAddrVector* broadcasts = getBroadcastAddresses(socket); + InetAddrVector* broadcasts = getBroadcastAddresses(socket,6678); cout<<"Broadcast addresses: "<size()<size(); i++) { cout<<"Broadcast address: "; - cout<at(i), false)<at(i))< Date: Mon, 10 Jan 2011 16:34:56 +0100 Subject: [PATCH 09/10] search req. accepted --- pvAccessApp/Makefile | 5 + pvAccessApp/remote/beaconHandler.h | 3 +- .../remote/blockingClientTCPTransport.cpp | 6 +- pvAccessApp/remote/blockingTCPConnector.cpp | 6 +- pvAccessApp/remote/channelSearchManager.cpp | 12 +- pvAccessApp/remote/channelSearchManager.h | 10 +- pvAccessApp/remoteClient/clientContextImpl.h | 50 ++++++ pvAccessApp/server/responseHandlers.cpp | 4 +- pvAccessApp/utils/arrayFIFO.h | 8 +- pvAccessApp/utils/inetAddressUtil.cpp | 57 +++++-- testApp/remote/testChannelSearchManager.cpp | 4 +- testApp/remote/testRemoteClientImpl.cpp | 160 ++++++++++++------ testApp/utils/inetAddressUtilsTest.cpp | 2 +- 13 files changed, 229 insertions(+), 98 deletions(-) create mode 100644 pvAccessApp/remoteClient/clientContextImpl.h diff --git a/pvAccessApp/Makefile b/pvAccessApp/Makefile index bd26f53..687ee94 100644 --- a/pvAccessApp/Makefile +++ b/pvAccessApp/Makefile @@ -68,6 +68,11 @@ LIBSRCS += blockingServerTCPTransport.cpp LIBSRCS += blockingTCPAcceptor.cpp LIBSRCS += channelSearchManager.cpp + +SRC_DIRS += $(PVACCESS)/remoteClient +INC += clientContextImpl.h + + LIBRARY = pvAccess pvAccess_LIBS += Com diff --git a/pvAccessApp/remote/beaconHandler.h b/pvAccessApp/remote/beaconHandler.h index 2a033a6..ed9b93a 100644 --- a/pvAccessApp/remote/beaconHandler.h +++ b/pvAccessApp/remote/beaconHandler.h @@ -18,8 +18,9 @@ using namespace epics::pvData; namespace epics { namespace pvAccess { - //TODO delete this + class ClientContextImpl; + /** * BeaconHandler */ diff --git a/pvAccessApp/remote/blockingClientTCPTransport.cpp b/pvAccessApp/remote/blockingClientTCPTransport.cpp index a18f7a5..b793374 100644 --- a/pvAccessApp/remote/blockingClientTCPTransport.cpp +++ b/pvAccessApp/remote/blockingClientTCPTransport.cpp @@ -97,7 +97,7 @@ namespace epics { if(_closed) return false; char ipAddrStr[48]; - ipAddrToA(&_socketAddress->ia, ipAddrStr, sizeof(ipAddrStr)); + ipAddrToDottedIP(&_socketAddress->ia, ipAddrStr, sizeof(ipAddrStr)); errlogSevPrintf(errlogInfo, "Acquiring transport to %s.", ipAddrStr); _ownersMutex->lock(); @@ -127,7 +127,7 @@ namespace epics { int refs = _owners->size(); if(refs>0) { char ipAddrStr[48]; - ipAddrToA(&_socketAddress->ia, ipAddrStr, sizeof(ipAddrStr)); + ipAddrToDottedIP(&_socketAddress->ia, ipAddrStr, sizeof(ipAddrStr)); errlogSevPrintf( errlogInfo, "Transport to %s still has %d client(s) active and closing...", @@ -145,7 +145,7 @@ namespace epics { if(_closed) return; char ipAddrStr[48]; - ipAddrToA(&_socketAddress->ia, ipAddrStr, sizeof(ipAddrStr)); + ipAddrToDottedIP(&_socketAddress->ia, ipAddrStr, sizeof(ipAddrStr)); errlogSevPrintf(errlogInfo, "Releasing transport to %s.", ipAddrStr); diff --git a/pvAccessApp/remote/blockingTCPConnector.cpp b/pvAccessApp/remote/blockingTCPConnector.cpp index 4a066b6..1bd09d4 100644 --- a/pvAccessApp/remote/blockingTCPConnector.cpp +++ b/pvAccessApp/remote/blockingTCPConnector.cpp @@ -38,7 +38,7 @@ namespace epics { if(tryCount>0) epicsThreadSleep(0.1); char strBuffer[64]; - ipAddrToA(&address.ia, strBuffer, sizeof(strBuffer)); + ipAddrToDottedIP(&address.ia, strBuffer, sizeof(strBuffer)); errlogSevPrintf(errlogInfo, "Opening socket to CA server %s, attempt %d.", @@ -73,7 +73,7 @@ namespace epics { SOCKET socket = INVALID_SOCKET; char ipAddrStr[64]; - ipAddrToA(&address.ia, ipAddrStr, sizeof(ipAddrStr)); + ipAddrToDottedIP(&address.ia, ipAddrStr, sizeof(ipAddrStr)); // first try to check cache w/o named lock... BlockingClientTCPTransport @@ -106,6 +106,8 @@ namespace epics { ipAddrStr); socket = tryConnect(address, 3); + if (socket == INVALID_SOCKET) + return 0; // use blocking channel // socket is blocking bya default diff --git a/pvAccessApp/remote/channelSearchManager.cpp b/pvAccessApp/remote/channelSearchManager.cpp index 992af77..0e93554 100644 --- a/pvAccessApp/remote/channelSearchManager.cpp +++ b/pvAccessApp/remote/channelSearchManager.cpp @@ -60,7 +60,7 @@ bool BaseSearchInstance::generateSearchRequestMessage(ByteBuffer* requestMessage return false; } - const string name = getChannelName(); + const String name = getSearchInstanceName(); // not nice... const int addedPayloadSize = sizeof(int32)/sizeof(int8) + (1 + sizeof(int32)/sizeof(int8) + name.length()); @@ -69,7 +69,7 @@ bool BaseSearchInstance::generateSearchRequestMessage(ByteBuffer* requestMessage return false; } - requestMessage->putInt(getChannelID()); + requestMessage->putInt(getSearchInstanceID()); SerializeHelper::serializeString(name, requestMessage, control); requestMessage->putInt(PAYLOAD_POSITION, requestMessage->getPosition() - CA_MESSAGE_HEADER_SIZE); @@ -484,17 +484,17 @@ void ChannelSearchManager::registerChannel(SearchInstance* channel) if(_canceled) return; //overrides if already registered - _channels[channel->getChannelID()] = channel; + _channels[channel->getSearchInstanceID()] = channel; _timers[0]->installChannel(channel); } void ChannelSearchManager::unregisterChannel(SearchInstance* channel) { Lock guard(&_mutex); - _channelsIter = _channels.find(channel->getChannelID()); + _channelsIter = _channels.find(channel->getSearchInstanceID()); if(_channelsIter != _channels.end()) { - _channels.erase(channel->getChannelID()); + _channels.erase(channel->getSearchInstanceID()); } channel->removeAndUnsetListOwnership(); @@ -508,7 +508,7 @@ void ChannelSearchManager::searchResponse(int32 cid, int32 seqNo, int8 minorRevi _channelsIter = _channels.find(cid); if(_channelsIter != _channels.end()) { - SearchInstance* si = _channelsIter->second; + si = _channelsIter->second; _channels.erase(_channelsIter); si->removeAndUnsetListOwnership(); } diff --git a/pvAccessApp/remote/channelSearchManager.h b/pvAccessApp/remote/channelSearchManager.h index c36f19e..5479825 100644 --- a/pvAccessApp/remote/channelSearchManager.h +++ b/pvAccessApp/remote/channelSearchManager.h @@ -40,13 +40,13 @@ public: * * @return channel ID. */ - virtual pvAccessID getChannelID() = 0; + virtual pvAccessID getSearchInstanceID() = 0; /** - * Return channel name. + * Return search instance, e.g. channel, name. * * @return channel channel name. */ - virtual String getChannelName() = 0; + virtual String getSearchInstanceName() = 0; /** * Removes the owner of this search instance. */ @@ -93,8 +93,8 @@ class BaseSearchInstance : public SearchInstance { public: virtual ~BaseSearchInstance() {}; - virtual pvAccessID getChannelID() = 0; - virtual string getChannelName() = 0; + virtual pvAccessID getSearchInstanceID() = 0; + virtual String getSearchInstanceName() = 0; virtual void unsetListOwnership(); virtual void addAndSetListOwnership(ArrayFIFO* newOwner, Mutex* ownerMutex, int32 index); virtual void removeAndUnsetListOwnership(); diff --git a/pvAccessApp/remoteClient/clientContextImpl.h b/pvAccessApp/remoteClient/clientContextImpl.h new file mode 100644 index 0000000..bf5bf1e --- /dev/null +++ b/pvAccessApp/remoteClient/clientContextImpl.h @@ -0,0 +1,50 @@ +/* + * clientContext.h + * + * Created on: Dec 21, 2010 + * Author: msekoran + */ + +#ifndef CLIENTCONTEXTIMPL_H_ +#define CLIENTCONTEXTIMPL_H_ + +#include +#include +#include + +namespace epics { + namespace pvAccess { + + class ChannelImpl : + public Channel , + public TransportClient, + public TransportSender, + public BaseSearchInstance + { + public: + virtual pvAccessID getChannelID() = 0; + virtual void destroyChannel(bool force) = 0; + + }; + + class ClientContextImpl : public ClientContext, public Context + { + public: + virtual ChannelSearchManager* getChannelSearchManager() = 0; + virtual void checkChannelName(String& name) = 0; + + virtual void registerChannel(ChannelImpl* channel) = 0; + virtual void unregisterChannel(ChannelImpl* channel) = 0; + + virtual void destroyChannel(ChannelImpl* channel, bool force) = 0; + virtual ChannelImpl* createChannelInternal(String name, ChannelRequester* requester, short priority, InetAddrVector* addresses) = 0; + + virtual Transport* getTransport(TransportClient* client, osiSockAddr* serverAddress, int16 minorRevision, int16 priority) = 0; + + + }; + + } +} + +#endif /* CLIENTCONTEXTIMPL_H_ */ diff --git a/pvAccessApp/server/responseHandlers.cpp b/pvAccessApp/server/responseHandlers.cpp index e3bbad5..f191c32 100644 --- a/pvAccessApp/server/responseHandlers.cpp +++ b/pvAccessApp/server/responseHandlers.cpp @@ -29,7 +29,7 @@ namespace epics { int payloadSize, ByteBuffer* payloadBuffer) { if(_debug) { char ipAddrStr[48]; - ipAddrToA(&responseFrom->ia, ipAddrStr, sizeof(ipAddrStr)); + ipAddrToDottedIP(&responseFrom->ia, ipAddrStr, sizeof(ipAddrStr)); ostringstream prologue; prologue<<"Message [0x"<ia, ipAddrStr, sizeof(ipAddrStr)); + ipAddrToDottedIP(&responseFrom->ia, ipAddrStr, sizeof(ipAddrStr)); errlogSevPrintf(errlogInfo, "Undecipherable message (bad response type %d) from %s.", diff --git a/pvAccessApp/utils/arrayFIFO.h b/pvAccessApp/utils/arrayFIFO.h index d449c56..6de4bdb 100644 --- a/pvAccessApp/utils/arrayFIFO.h +++ b/pvAccessApp/utils/arrayFIFO.h @@ -243,7 +243,7 @@ namespace epics { T ArrayFIFO::pollFirst() { Lock lock(&_mutex); - if(isEmpty()) THROW_BASE_EXCEPTION("ArrayFIFO empty"); + if(isEmpty()) return 0; T result = _elements[_head]; // Element is null if deque empty _head = (_head+1)&(_size-1); @@ -254,7 +254,7 @@ namespace epics { T ArrayFIFO::pollLast() { Lock lock(&_mutex); - if(isEmpty()) THROW_BASE_EXCEPTION("ArrayFIFO empty"); + if(isEmpty()) return 0; _tail = (_tail-1)&(_size-1); return _elements[_tail]; @@ -264,7 +264,7 @@ namespace epics { T ArrayFIFO::peekFirst() { Lock lock(&_mutex); - if(isEmpty()) THROW_BASE_EXCEPTION("ArrayFIFO empty"); + if(isEmpty()) return 0; return _elements[_head]; } @@ -273,7 +273,7 @@ namespace epics { T ArrayFIFO::peekLast() { Lock lock(&_mutex); - if(isEmpty()) THROW_BASE_EXCEPTION("ArrayFIFO empty"); + if(isEmpty()) return 0; return _elements[(_tail-1)&(_size-1)]; } diff --git a/pvAccessApp/utils/inetAddressUtil.cpp b/pvAccessApp/utils/inetAddressUtil.cpp index 77eac47..e9ab30f 100644 --- a/pvAccessApp/utils/inetAddressUtil.cpp +++ b/pvAccessApp/utils/inetAddressUtil.cpp @@ -60,6 +60,7 @@ namespace epics { // get number of interfaces ifconf.ifc_len = nelem*sizeof(ifreq); ifconf.ifc_req = pIfreqList; + memset(ifconf.ifc_req,0,ifconf.ifc_len); status = ioctl(sock, SIOCGIFCONF, &ifconf); if(status<0||ifconf.ifc_len==0) { errlogSevPrintf( @@ -69,32 +70,46 @@ namespace epics { return retVector; } - errlogPrintf("Found %d interfaces\n", ifconf.ifc_len); + struct ifreq* p = pIfreqList; + int maxNodes = ifconf.ifc_len/sizeof(ifreq); + for(int i = 0; iifr_name)) break; + //printf("[%i] plen %d name %s\n", i,p->ifr_addr.sa_len, p->ifr_name); + + + size_t n = p->ifr_addr.sa_len + sizeof(p->ifr_name); + if (n < sizeof(*p)) + p++; + else + p = (struct ifreq *)((char *)p + n); + - for(int i = 0; i<=ifconf.ifc_len; i++) { /* * If its not an internet interface then dont use it */ - if(pIfreqList[i].ifr_addr.sa_family!=AF_INET) continue; + if(p->ifr_addr.sa_family!=AF_INET) continue; - status = ioctl(sock, SIOCGIFFLAGS, &pIfreqList[i]); + struct ifreq ifrflags; + strncpy(ifrflags.ifr_name, p->ifr_name, + sizeof(ifrflags.ifr_name)); + status = ioctl(sock, SIOCGIFFLAGS, (char*)&ifrflags); if(status) { errlogSevPrintf( errlogMinor, "getBroadcastAddresses(): net intf flags fetch for \"%s\" failed", - pIfreqList[i].ifr_name); + p->ifr_name); continue; } /* * dont bother with interfaces that have been disabled */ - if(!(pIfreqList[i].ifr_flags&IFF_UP)) continue; + if(!(ifrflags.ifr_flags&IFF_UP)) continue; /* * dont use the loop back interface */ - if(pIfreqList[i].ifr_flags&IFF_LOOPBACK) continue; + if(ifrflags.ifr_flags&IFF_LOOPBACK) continue; pNewNode = new osiSockAddr; if(pNewNode==NULL) { @@ -114,37 +129,43 @@ namespace epics { * Otherwise CA will not query through the * interface. */ - if(pIfreqList[i].ifr_flags&IFF_BROADCAST) { - status = ioctl(sock, SIOCGIFBRDADDR, &pIfreqList[i]); + if(ifrflags.ifr_flags&IFF_BROADCAST) { + struct ifreq ifrflags; + strncpy(ifrflags.ifr_name, p->ifr_name, + sizeof(ifrflags.ifr_name)); + status = ioctl(sock, SIOCGIFBRDADDR, (char*)&ifrflags); if(status) { errlogSevPrintf( errlogMinor, "getBroadcastAddresses(): net intf \"%s\": bcast addr fetch fail", - pIfreqList->ifr_name); + p->ifr_name); delete pNewNode; continue; } - pNewNode->sa = pIfreqList[i].ifr_broadaddr; + pNewNode->sa = ifrflags.ifr_broadaddr; } #ifdef IFF_POINTOPOINT - else if(pIfreqList->ifr_flags&IFF_POINTOPOINT) { - status = ioctl(sock, SIOCGIFDSTADDR, &pIfreqList[i]); + else if(ifrflags.ifr_flags&IFF_POINTOPOINT) { + struct ifreq ifrflags; + strncpy(ifrflags.ifr_name, p->ifr_name, + sizeof(ifrflags.ifr_name)); + status = ioctl(sock, SIOCGIFDSTADDR, (char*)&ifrflags); if(status) { errlogSevPrintf( errlogMinor, "getBroadcastAddresses(): net intf \"%s\": pt to pt addr fetch fail", - pIfreqList[i].ifr_name); + p->ifr_name); delete pNewNode; continue; } - pNewNode->sa = pIfreqList[i].ifr_dstaddr; + pNewNode->sa = ifrflags.ifr_dstaddr; } #endif else { errlogSevPrintf( errlogMinor, "getBroadcastAddresses(): net intf \"%s\": not point to point or bcast?", - pIfreqList[i].ifr_name); + p->ifr_name); delete pNewNode; continue; } @@ -164,12 +185,16 @@ namespace epics { buffer->putShort(0); // next 16-bits are 1 buffer->putShort(0xFFFF); + buffer->putInt(ntohl(address->ia.sin_addr.s_addr)); + + /* // following IPv4 address in big-endian (network) byte order in_addr_t ipv4Addr = ntohl(address->ia.sin_addr.s_addr); buffer->putByte((int8)((ipv4Addr>>24)&0xFF)); buffer->putByte((int8)((ipv4Addr>>16)&0xFF)); buffer->putByte((int8)((ipv4Addr>>8)&0xFF)); buffer->putByte((int8)(ipv4Addr&0xFF)); + */ } osiSockAddr* intToIPv4Address(int32 addr) { diff --git a/testApp/remote/testChannelSearchManager.cpp b/testApp/remote/testChannelSearchManager.cpp index 5861a11..a459db2 100644 --- a/testApp/remote/testChannelSearchManager.cpp +++ b/testApp/remote/testChannelSearchManager.cpp @@ -9,8 +9,8 @@ class TestSearcInstance : public BaseSearchInstance { public: TestSearcInstance(string channelName, pvAccessID channelID): _channelID(channelID), _channelName(channelName) {} - pvAccessID getChannelID() { return _channelID;}; - string getChannelName() {return _channelName;}; + pvAccessID getSearchInstanceID() { return _channelID;}; + string getSearchInstanceName() {return _channelName;}; void searchResponse(int8 minorRevision, osiSockAddr* serverAddress) {}; private: pvAccessID _channelID; diff --git a/testApp/remote/testRemoteClientImpl.cpp b/testApp/remote/testRemoteClientImpl.cpp index 4b0f311..de251df 100644 --- a/testApp/remote/testRemoteClientImpl.cpp +++ b/testApp/remote/testRemoteClientImpl.cpp @@ -20,6 +20,7 @@ #include #include #include +#include using namespace epics::pvData; using namespace epics::pvAccess; @@ -388,7 +389,6 @@ typedef std::map IOIDResponseRequestMap; #define CALLBACK_GUARD(code) try { code } catch(...) { } -class ClientContextImpl; class DebugResponse : public ResponseHandler, private epics::pvData::NoDefaultMethods { public: @@ -406,14 +406,13 @@ class ClientContextImpl; Transport* transport, int8 version, int8 command, int payloadSize, epics::pvData::ByteBuffer* payloadBuffer) { + char ipAddrStr[48]; - std::cout << "ole" << std::endl; - ipAddrToA(&responseFrom->ia, ipAddrStr, sizeof(ipAddrStr)); - std::cout << "ole2" << std::endl; - + ipAddrToDottedIP(&responseFrom->ia, ipAddrStr, sizeof(ipAddrStr)); + ostringstream prologue; prologue<<"Message [0x"<ensureData(5); + int32 searchSequenceId = payloadBuffer->getInt(); + bool found = payloadBuffer->getByte() != 0; + if (!found) + return; + + transport->ensureData((128+2*16)/8); + + osiSockAddr serverAddress; + serverAddress.ia.sin_family = AF_INET; + + // 128-bit IPv6 address + /* + int8* byteAddress = new int8[16]; + for (int i = 0; i < 16; i++) + byteAddress[i] = payloadBuffer->getByte(); }; + */ + + // IPv4 compatible IPv6 address expected + // first 80-bit are 0 + if (payloadBuffer->getLong() != 0) return; + if (payloadBuffer->getShort() != 0) return; + if (payloadBuffer->getShort() != (int16)0xFFFF) return; + + // accept given address if explicitly specified by sender + serverAddress.ia.sin_addr.s_addr = htonl(payloadBuffer->getInt()); + if (serverAddress.ia.sin_addr.s_addr == INADDR_ANY) + serverAddress.ia.sin_addr = responseFrom->ia.sin_addr; + + serverAddress.ia.sin_port = htons(payloadBuffer->getShort()); + + // reads CIDs + ChannelSearchManager* csm = m_context->getChannelSearchManager(); + int16 count = payloadBuffer->getShort(); + for (int i = 0; i < count; i++) + { + transport->ensureData(4); + pvAccessID cid = payloadBuffer->getInt(); + csm->searchResponse(cid, searchSequenceId, version & 0x0F, &serverAddress); + } + + + } + }; + + /** * CA response handler - main handler which dispatches responses to appripriate handlers. * @author Matej Sekoranja @@ -460,7 +522,7 @@ class ClientResponseHandler : public ResponseHandler, private epics::pvData::NoD m_handlerTable[ 1] = badResponse; // TODO new ConnectionValidationHandler(context), /* 1 */ m_handlerTable[ 2] = badResponse; // TODO new NoopResponse(context, "Echo"), /* 2 */ m_handlerTable[ 3] = badResponse; // TODO new NoopResponse(context, "Search"), /* 3 */ - m_handlerTable[ 4] = badResponse; // TODO new SearchResponseHandler(context), /* 4 */ + m_handlerTable[ 4] = new SearchResponseHandler(context), /* 4 */ m_handlerTable[ 5] = badResponse; // TODO new NoopResponse(context, "Introspection search"), /* 5 */ m_handlerTable[ 6] = dataResponse; /* 6 - introspection search */ m_handlerTable[ 7] = badResponse; // TODO new CreateChannelHandler(context), /* 7 */ @@ -528,7 +590,6 @@ class ClientResponseHandler : public ResponseHandler, private epics::pvData::NoD -class BeaconHandlerImpl; @@ -558,8 +619,7 @@ enum ContextState { }; -class ClientContextImpl : public ClientContext, -public Context /* TODO */ +class TestClientContextImpl : public ClientContextImpl { @@ -571,11 +631,7 @@ public Context /* TODO */ * Implementation of CAJ JCA Channel. * @author Matej Sekoranja */ -class ChannelImpl : - public Channel , - public TransportClient, - public TransportSender, - public BaseSearchInstance { +class TestChannelImpl : public ChannelImpl { private: /** @@ -658,7 +714,7 @@ class ChannelImpl : PVStructure* m_pvStructure; private: - ~ChannelImpl() + ~TestChannelImpl() { PVDATA_REFCOUNT_MONITOR_DESTRUCT(channel); } @@ -672,7 +728,7 @@ class ChannelImpl : * @param listener * @throws CAException */ - ChannelImpl( + TestChannelImpl( ClientContextImpl* context, pvAccessID channelID, String name, @@ -700,25 +756,6 @@ class ChannelImpl : // connect connect(); - - - // - // mock - // - ScalarType stype = pvDouble; - String allProperties("alarm,timeStamp,display,control,valueAlarm"); - - m_pvStructure = getStandardPVField()->scalar( - 0,name,stype,allProperties); - PVDouble *pvField = m_pvStructure->getDoubleField(String("value")); - pvField->put(1.123); - - - // already connected, report state - m_requester->channelStateChange(this, CONNECTED); - - - } virtual void destroy() @@ -791,6 +828,14 @@ class ChannelImpl : return m_channelID; } + virtual pvAccessID getSearchInstanceID() { + return m_channelID; + } + + virtual String getSearchInstanceName() { + return m_name; + } + void connect() { Lock guard(&m_channelMutex); // if not destroyed... @@ -1027,8 +1072,7 @@ class ChannelImpl : if (transport) { // multiple defined PV or reconnect request (same server address) - // TOD !!!! if (!(*(transport->getRemoteAddress()) == *serverAddress)) - if (false) + if (sockAddrAreIdentical(transport->getRemoteAddress(), serverAddress)) { m_requester->message("More than one channel with name '" + m_name + "' detected, additional response from: " + inetAddressToString(serverAddress), warningMessage); @@ -1346,7 +1390,7 @@ class ChannelImpl : public: - ClientContextImpl() : + TestClientContextImpl() : m_addressList(""), m_autoAddressList(true), m_connectionTimeout(30.0f), m_beaconPeriod(15.0f), m_broadcastPort(CA_BROADCAST_PORT), m_receiveBufferSize(MAX_TCP_RECV), m_timer(0), m_broadcastTransport(0), m_searchTransport(0), m_connector(0), m_transportRegistry(0), @@ -1457,7 +1501,7 @@ class ChannelImpl : } private: - ~ClientContextImpl() {}; + ~TestClientContextImpl() {}; void loadConfiguration() { // TODO @@ -1499,8 +1543,15 @@ class ChannelImpl : listenLocalAddress.ia.sin_addr.s_addr = htonl(INADDR_ANY); // where to send address - InetAddrVector* broadcastAddresses = getSocketAddressList("192.168.1.255", m_broadcastPort); - // TODO getBroadcastAddresses(broadcastPort) + SOCKET socket = epicsSocketCreate(AF_INET, SOCK_DGRAM, 0); + InetAddrVector* broadcastAddresses = getBroadcastAddresses(socket); + cout<<"Broadcast addresses: "<size()<size(); i++) { + broadcastAddresses->at(i)->ia.sin_port = htons(m_broadcastPort); + cout<<"Broadcast address: "; + cout<at(i))<connect(client, new ClientResponseHandler(this), serverAddress, minorRevision, priority); + return m_connector->connect(client, new ClientResponseHandler(this), *serverAddress, minorRevision, priority); } - catch (ConnectionException cex) + catch (...) { - logger.log(Level.SEVERE, "Failed to create transport for: " + serverAddress, cex); + // TODO log + printf("failed to get transport\n"); + return 0; } - */ - return 0; - } /** @@ -1694,7 +1742,7 @@ class ChannelImpl : */ // TODO no minor version with the addresses // TODO what if there is an channel with the same name, but on different host! - Channel* createChannelInternal(String name, ChannelRequester* requester, short priority, + ChannelImpl* createChannelInternal(String name, ChannelRequester* requester, short priority, InetAddrVector* addresses) { // TODO addresses checkState(); @@ -1712,7 +1760,7 @@ class ChannelImpl : try { pvAccessID cid = generateCID(); - return new ChannelImpl(this, cid, name, requester, priority, addresses); + return new TestChannelImpl(this, cid, name, requester, priority, addresses); } catch(...) { // TODO @@ -1873,8 +1921,8 @@ class ChannelImpl : * Beacon handler map. */ // TODO consider std::unordered_map - typedef std::map AddressBeaconHandlerMap; - AddressBeaconHandlerMap m_beaconHandlers; +// typedef std::map AddressBeaconHandlerMap; +// AddressBeaconHandlerMap m_beaconHandlers; /** * Version. @@ -2123,7 +2171,7 @@ class ChannelProcessRequesterImpl : public ChannelProcessRequester int main(int argc,char *argv[]) { - ClientContextImpl* context = new ClientContextImpl(); + TestClientContextImpl* context = new TestClientContextImpl(); context->printInfo(); context->initialize(); @@ -2174,7 +2222,7 @@ int main(int argc,char *argv[]) monitor->destroy(); */ - epicsThreadSleep ( 10.0 ); + epicsThreadSleep ( 100.0 ); channel->destroy(); context->destroy(); diff --git a/testApp/utils/inetAddressUtilsTest.cpp b/testApp/utils/inetAddressUtilsTest.cpp index 5a594b7..0ff8f93 100644 --- a/testApp/utils/inetAddressUtilsTest.cpp +++ b/testApp/utils/inetAddressUtilsTest.cpp @@ -126,7 +126,7 @@ int main(int argc, char *argv[]) { assert(strncmp(buff->getArray(), src, 16)==0); cout<<"\nPASSED!\n"; - SOCKET socket = epicsSocketCreate(AF_INET, SOCK_STREAM, IPPROTO_TCP); + SOCKET socket = epicsSocketCreate(AF_INET, SOCK_DGRAM, 0); InetAddrVector* broadcasts = getBroadcastAddresses(socket); cout<<"Broadcast addresses: "<size()<size(); i++) { From 21951b885dbdcaaf1eeece9d26a9e46950947221 Mon Sep 17 00:00:00 2001 From: Matej Sekoranja Date: Mon, 10 Jan 2011 21:15:17 +0100 Subject: [PATCH 10/10] create channel w/ minor error on verification send-back --- pvAccessApp/remote/remote.h | 17 +-- pvAccessApp/remoteClient/clientContextImpl.h | 3 + pvAccessApp/server/responseHandlers.cpp | 3 +- pvAccessApp/server/responseHandlers.h | 7 +- testApp/remote/testBeaconEmitter.cpp | 2 +- testApp/remote/testBeaconHandler.cpp | 2 +- testApp/remote/testBlockingTCPClnt.cpp | 2 +- testApp/remote/testBlockingUDPClnt.cpp | 5 +- testApp/remote/testBlockingUDPSrv.cpp | 4 +- testApp/remote/testRemoteClientImpl.cpp | 117 ++++++++++++++++--- 10 files changed, 120 insertions(+), 42 deletions(-) diff --git a/pvAccessApp/remote/remote.h b/pvAccessApp/remote/remote.h index 8a8aaea..496c818 100644 --- a/pvAccessApp/remote/remote.h +++ b/pvAccessApp/remote/remote.h @@ -289,16 +289,9 @@ namespace epics { */ class ResponseHandler { public: - ResponseHandler(Context* context) : - _context(context) { - } - - virtual ~ResponseHandler() { - } - /** * Handle response. - * @param[in] responseFrom remote address of the responder, null if unknown. + * @param[in] responseFrom remote address of the responder, 0 if unknown. * @param[in] transport response source transport. * @param[in] version message version. * @param[in] payloadSize size of this message data available in the payloadBuffer. @@ -310,9 +303,6 @@ namespace epics { handleResponse(osiSockAddr* responseFrom, Transport* transport, int8 version, int8 command, int payloadSize, epics::pvData::ByteBuffer* payloadBuffer) =0; - - protected: - Context* _context; }; /** @@ -326,9 +316,8 @@ namespace epics { * @param description */ AbstractResponseHandler(Context* context, String description) : - ResponseHandler(context), _description(description), _debug( - _context->getConfiguration()->getPropertyAsBoolean( - "PVACCESS_DEBUG", false)) { + _description(description), + _debug(context->getConfiguration()->getPropertyAsBoolean("PVACCESS_DEBUG", false)) { } virtual ~AbstractResponseHandler() { diff --git a/pvAccessApp/remoteClient/clientContextImpl.h b/pvAccessApp/remoteClient/clientContextImpl.h index bf5bf1e..378abcf 100644 --- a/pvAccessApp/remoteClient/clientContextImpl.h +++ b/pvAccessApp/remoteClient/clientContextImpl.h @@ -24,6 +24,9 @@ namespace epics { public: virtual pvAccessID getChannelID() = 0; virtual void destroyChannel(bool force) = 0; + virtual void connectionCompleted(pvAccessID sid/*, rights*/) = 0; + virtual void createChannelFailed() = 0; + }; diff --git a/pvAccessApp/server/responseHandlers.cpp b/pvAccessApp/server/responseHandlers.cpp index fe57284..b96c659 100644 --- a/pvAccessApp/server/responseHandlers.cpp +++ b/pvAccessApp/server/responseHandlers.cpp @@ -39,8 +39,7 @@ namespace epics { } - ServerResponseHandler::ServerResponseHandler(ServerContextImpl* context) : - ResponseHandler(context) { + ServerResponseHandler::ServerResponseHandler(ServerContextImpl* context) { BadResponse* badResponse = new BadResponse(context); diff --git a/pvAccessApp/server/responseHandlers.h b/pvAccessApp/server/responseHandlers.h index 7aeab36..ccc3849 100644 --- a/pvAccessApp/server/responseHandlers.h +++ b/pvAccessApp/server/responseHandlers.h @@ -19,14 +19,15 @@ namespace epics { * @version $Id: AbstractServerResponseHandler.java,v 1.1 2010/05/03 14:45:39 mrkraimer Exp $ */ class AbstractServerResponseHandler : public AbstractResponseHandler { + protected: + ServerContextImpl* _context; public: /** * @param context * @param description */ - AbstractServerResponseHandler(ServerContextImpl* context, - String description) : - AbstractResponseHandler(context, description) { + AbstractServerResponseHandler(ServerContextImpl* context, String description) : + AbstractResponseHandler(context, description), _context(context) { } virtual ~AbstractServerResponseHandler() { diff --git a/testApp/remote/testBeaconEmitter.cpp b/testApp/remote/testBeaconEmitter.cpp index c1a3026..9e98d85 100644 --- a/testApp/remote/testBeaconEmitter.cpp +++ b/testApp/remote/testBeaconEmitter.cpp @@ -18,7 +18,7 @@ using namespace epics::pvData; class DummyResponseHandler : public ResponseHandler { public: - DummyResponseHandler(Context* ctx) : ResponseHandler(ctx) {} + DummyResponseHandler(Context* ctx) : ResponseHandler() {} virtual void handleResponse(osiSockAddr* responseFrom, Transport* transport, int8 version, int8 command, int payloadSize, diff --git a/testApp/remote/testBeaconHandler.cpp b/testApp/remote/testBeaconHandler.cpp index 6690ab4..0e5ec28 100644 --- a/testApp/remote/testBeaconHandler.cpp +++ b/testApp/remote/testBeaconHandler.cpp @@ -37,7 +37,7 @@ void decodeFromIPv6Address(ByteBuffer* buffer, osiSockAddr* address) class BeaconResponseHandler : public ResponseHandler { public: - BeaconResponseHandler(Context* ctx) : ResponseHandler(ctx) + BeaconResponseHandler(Context* ctx) : ResponseHandler() { _pvDataCreate = getPVDataCreate(); } diff --git a/testApp/remote/testBlockingTCPClnt.cpp b/testApp/remote/testBlockingTCPClnt.cpp index d845c1b..4340caa 100644 --- a/testApp/remote/testBlockingTCPClnt.cpp +++ b/testApp/remote/testBlockingTCPClnt.cpp @@ -50,7 +50,7 @@ private: class DummyResponseHandler : public ResponseHandler { public: - DummyResponseHandler(Context* ctx) : ResponseHandler(ctx) { + DummyResponseHandler(Context* ctx) : ResponseHandler() { } virtual void handleResponse(osiSockAddr* responseFrom, diff --git a/testApp/remote/testBlockingUDPClnt.cpp b/testApp/remote/testBlockingUDPClnt.cpp index 3e124d9..7fb490c 100644 --- a/testApp/remote/testBlockingUDPClnt.cpp +++ b/testApp/remote/testBlockingUDPClnt.cpp @@ -60,9 +60,8 @@ private: class DummyResponseHandler : public ResponseHandler { public: - DummyResponseHandler(Context* ctx) : - ResponseHandler(ctx) { - } + DummyResponseHandler(Context* ctx) + { } virtual void handleResponse(osiSockAddr* responseFrom, Transport* transport, int8 version, int8 command, int payloadSize, diff --git a/testApp/remote/testBlockingUDPSrv.cpp b/testApp/remote/testBlockingUDPSrv.cpp index 16612ac..5308428 100644 --- a/testApp/remote/testBlockingUDPSrv.cpp +++ b/testApp/remote/testBlockingUDPSrv.cpp @@ -55,8 +55,8 @@ private: class DummyResponseHandler : public ResponseHandler { public: - DummyResponseHandler(Context* context) : - ResponseHandler(context), packets(0) { + DummyResponseHandler(Context* context) + : packets(0) { } int getPackets() { diff --git a/testApp/remote/testRemoteClientImpl.cpp b/testApp/remote/testRemoteClientImpl.cpp index 5f8c362..e9ca92b 100644 --- a/testApp/remote/testRemoteClientImpl.cpp +++ b/testApp/remote/testRemoteClientImpl.cpp @@ -391,13 +391,33 @@ typedef std::map IOIDResponseRequestMap; - class DebugResponse : public ResponseHandler, private epics::pvData::NoDefaultMethods { + /** + * @author Matej Sekoranja + * @version $Id: AbstractServerResponseHandler.java,v 1.1 2010/05/03 14:45:39 mrkraimer Exp $ + */ + class AbstractClientResponseHandler : public AbstractResponseHandler { + protected: + ClientContextImpl* _context; + public: + /** + * @param context + * @param description + */ + AbstractClientResponseHandler(ClientContextImpl* context, String description) : + AbstractResponseHandler(context, description), _context(context) { + } + + virtual ~AbstractClientResponseHandler() { + } + }; + + class DebugResponse : public AbstractClientResponseHandler, private epics::pvData::NoDefaultMethods { public: /** * @param context */ - DebugResponse(Context* ctx) : - ResponseHandler(ctx) + DebugResponse(ClientContextImpl* context) : + AbstractClientResponseHandler(context, "not implemented") { } @@ -414,7 +434,6 @@ typedef std::map IOIDResponseRequestMap; ostringstream prologue; prologue<<"Message [0x"<getArray(), @@ -423,11 +442,10 @@ typedef std::map IOIDResponseRequestMap; } }; - class SearchResponseHandler : public ResponseHandler, private epics::pvData::NoDefaultMethods { - private: - ClientContextImpl* m_context; + class SearchResponseHandler : public AbstractClientResponseHandler, private epics::pvData::NoDefaultMethods { public: - SearchResponseHandler(ClientContextImpl* context) : ResponseHandler(context), m_context(context) + SearchResponseHandler(ClientContextImpl* context) : + AbstractClientResponseHandler(context, "Search response") { } @@ -438,7 +456,7 @@ typedef std::map IOIDResponseRequestMap; Transport* transport, int8 version, int8 command, int payloadSize, epics::pvData::ByteBuffer* payloadBuffer) { - // TODO super.handleResponse(responseFrom, transport, version, command, payloadSize, payloadBuffer); + AbstractClientResponseHandler::handleResponse(responseFrom, transport, version, command, payloadSize, payloadBuffer); transport->ensureData(5); int32 searchSequenceId = payloadBuffer->getInt(); @@ -472,7 +490,7 @@ typedef std::map IOIDResponseRequestMap; serverAddress.ia.sin_port = htons(payloadBuffer->getShort()); // reads CIDs - ChannelSearchManager* csm = m_context->getChannelSearchManager(); + ChannelSearchManager* csm = _context->getChannelSearchManager(); int16 count = payloadBuffer->getShort(); for (int i = 0; i < count; i++) { @@ -486,6 +504,75 @@ typedef std::map IOIDResponseRequestMap; }; + class ConnectionValidationHandler : public AbstractClientResponseHandler, private epics::pvData::NoDefaultMethods { + public: + ConnectionValidationHandler(ClientContextImpl* context) : + AbstractClientResponseHandler(context, "Connection validation") + { + } + + virtual ~ConnectionValidationHandler() { + } + + virtual void handleResponse(osiSockAddr* responseFrom, + Transport* transport, int8 version, int8 command, + int payloadSize, epics::pvData::ByteBuffer* payloadBuffer) + { + AbstractClientResponseHandler::handleResponse(responseFrom, transport, version, command, payloadSize, payloadBuffer); + + transport->ensureData(8); + transport->setRemoteTransportReceiveBufferSize(payloadBuffer->getInt()); + transport->setRemoteTransportSocketReceiveBufferSize(payloadBuffer->getInt()); + + transport->setRemoteMinorRevision(version); + + transport->enqueueSendRequest((TransportSender*)transport); + transport->verified(); + + } + }; + + class CreateChannelHandler : public AbstractClientResponseHandler, private epics::pvData::NoDefaultMethods { + public: + CreateChannelHandler(ClientContextImpl* context) : + AbstractClientResponseHandler(context, "Create channel") + { + } + + virtual ~CreateChannelHandler() { + } + + virtual void handleResponse(osiSockAddr* responseFrom, + Transport* transport, int8 version, int8 command, + int payloadSize, epics::pvData::ByteBuffer* payloadBuffer) + { + AbstractClientResponseHandler::handleResponse(responseFrom, transport, version, command, payloadSize, payloadBuffer); + + transport->ensureData(8); + pvAccessID cid = payloadBuffer->getInt(); + pvAccessID sid = payloadBuffer->getInt(); + // TODO... do not destroy OK + Status* status = transport->getIntrospectionRegistry()->deserializeStatus(payloadBuffer, transport); + + ChannelImpl* channel = static_cast(_context->getChannel(cid)); + if (channel) + { + // failed check + if (!status->isSuccess()) { + channel->createChannelFailed(); + return; + } + + //int16 acl = payloadBuffer->getShort(); + + channel->connectionCompleted(sid); + } + + } + }; + + + /** * CA response handler - main handler which dispatches responses to appripriate handlers. * @author Matej Sekoranja @@ -513,20 +600,20 @@ class ClientResponseHandler : public ResponseHandler, private epics::pvData::NoD /** * @param context */ - ClientResponseHandler(ClientContextImpl* context) : ResponseHandler((Context*)context) { - static ResponseHandler* badResponse = new DebugResponse((Context*)context); + ClientResponseHandler(ClientContextImpl* context) { + static ResponseHandler* badResponse = new DebugResponse(context); static ResponseHandler* dataResponse = 0; //new DataResponseHandler(context); #define HANDLER_COUNT 28 m_handlerTable = new ResponseHandler*[HANDLER_COUNT]; m_handlerTable[ 0] = badResponse; // TODO new BeaconHandler(context), /* 0 */ - m_handlerTable[ 1] = badResponse; // TODO new ConnectionValidationHandler(context), /* 1 */ + m_handlerTable[ 1] = new ConnectionValidationHandler(context), /* 1 */ m_handlerTable[ 2] = badResponse; // TODO new NoopResponse(context, "Echo"), /* 2 */ m_handlerTable[ 3] = badResponse; // TODO new NoopResponse(context, "Search"), /* 3 */ m_handlerTable[ 4] = new SearchResponseHandler(context), /* 4 */ m_handlerTable[ 5] = badResponse; // TODO new NoopResponse(context, "Introspection search"), /* 5 */ m_handlerTable[ 6] = dataResponse; /* 6 - introspection search */ - m_handlerTable[ 7] = badResponse; // TODO new CreateChannelHandler(context), /* 7 */ + m_handlerTable[ 7] = new CreateChannelHandler(context), /* 7 */ m_handlerTable[ 8] = badResponse; // TODO new NoopResponse(context, "Destroy channel"), /* 8 */ // TODO it might be useful to implement this... m_handlerTable[ 9] = badResponse; /* 9 */ m_handlerTable[10] = dataResponse; /* 10 - get response */ @@ -912,7 +999,7 @@ class TestChannelImpl : public ChannelImpl { * sid might not be valid, this depends on protocol revision. * @param sid */ - void connectionCompleted(pvAccessID sid/*, rights*/) + virtual void connectionCompleted(pvAccessID sid/*, rights*/) { Lock guard(&m_channelMutex);