From f72e495a99d7798b373bd0d043c9e35195f00466 Mon Sep 17 00:00:00 2001 From: Matej Sekoranja Date: Tue, 11 Jan 2011 22:35:43 +0100 Subject: [PATCH] more responses implemented --- pvAccessApp/remote/blockingUDPConnector.cpp | 35 +++-- pvAccessApp/remote/remote.h | 16 ++ pvAccessApp/remoteClient/clientContextImpl.h | 5 + testApp/remote/testRemoteClientImpl.cpp | 153 ++++++++++++++++--- 4 files changed, 171 insertions(+), 38 deletions(-) diff --git a/pvAccessApp/remote/blockingUDPConnector.cpp b/pvAccessApp/remote/blockingUDPConnector.cpp index 3e97a93..94ade32 100644 --- a/pvAccessApp/remote/blockingUDPConnector.cpp +++ b/pvAccessApp/remote/blockingUDPConnector.cpp @@ -37,6 +37,24 @@ namespace epics { errStr); } + int optval = _broadcast ? true : false; + int retval = ::setsockopt(socket, SOL_SOCKET, SO_BROADCAST, &optval, + sizeof(optval)); + if(retval<0) errlogSevPrintf(errlogMajor, + "Error setting SO_BROADCAST: %s", strerror(errno)); +printf("_broadcast: %d\n", _broadcast); + + // set the socket options + //if (_reuseSocket) + // epicsSocketEnableAddressUseForDatagramFanout(socket); + + optval = _reuseSocket ? true : false; + retval = ::setsockopt(socket, SOL_SOCKET, SO_REUSEADDR, &optval, + sizeof(optval)); + if(retval<0) errlogSevPrintf(errlogMajor, + "Error setting SO_REUSEADDR: %s", strerror(errno)); +printf("_reuseSocket: %d\n", _reuseSocket); + /* from MSDN: * Note: If the setsockopt function is called before the bind * function, TCP/IP options will not be checked by using TCP/IP @@ -44,8 +62,9 @@ namespace epics { * call will always succeed, but the bind function call can fail * because of an early setsockopt call failing. */ + // still we need to set SO_REUSEADDR befire bind - int retval = ::bind(socket, (sockaddr*)&(bindAddress.sa), + retval = ::bind(socket, (sockaddr*)&(bindAddress.sa), sizeof(sockaddr)); if(retval<0) { errlogSevPrintf(errlogMajor, "Error binding socket: %s", @@ -53,20 +72,6 @@ namespace epics { THROW_BASE_EXCEPTION(strerror(errno)); } - // set the socket options - - int optval = _reuseSocket ? 1 : 0; - retval = ::setsockopt(socket, SOL_SOCKET, SO_REUSEADDR, &optval, - sizeof(optval)); - if(retval<0) errlogSevPrintf(errlogMajor, - "Error setting SO_REUSEADDR: %s", strerror(errno)); - - optval = _broadcast ? 1 : 0; - retval = ::setsockopt(socket, SOL_SOCKET, SO_BROADCAST, &optval, - sizeof(optval)); - if(retval<0) errlogSevPrintf(errlogMajor, - "Error setting SO_BROADCAST: %s", strerror(errno)); - // sockets are blocking by default return new BlockingUDPTransport(responseHandler, socket, diff --git a/pvAccessApp/remote/remote.h b/pvAccessApp/remote/remote.h index 496c818..dc238d2 100644 --- a/pvAccessApp/remote/remote.h +++ b/pvAccessApp/remote/remote.h @@ -533,6 +533,22 @@ namespace epics { */ virtual epics::pvData::Requester* getRequester() = 0; }; + + /** + * @author Matej Sekoranja + * @version $Id: DataResponse.java,v 1.1 2010/05/03 14:45:39 mrkraimer Exp $ + */ + class DataResponse : public ResponseRequest { + public: + /** + * Notification response. + * @param transport + * @param version + * @param payloadBuffer + */ + virtual void response(Transport* transport, int8 version, ByteBuffer* payloadBuffer) = 0; + + }; } } diff --git a/pvAccessApp/remoteClient/clientContextImpl.h b/pvAccessApp/remoteClient/clientContextImpl.h index 378abcf..439c84c 100644 --- a/pvAccessApp/remoteClient/clientContextImpl.h +++ b/pvAccessApp/remoteClient/clientContextImpl.h @@ -42,6 +42,11 @@ namespace epics { virtual void destroyChannel(ChannelImpl* channel, bool force) = 0; virtual ChannelImpl* createChannelInternal(String name, ChannelRequester* requester, short priority, InetAddrVector* addresses) = 0; + virtual ResponseRequest* getResponseRequest(pvAccessID ioid) = 0; + virtual pvAccessID registerResponseRequest(ResponseRequest* request) = 0; + virtual ResponseRequest* unregisterResponseRequest(ResponseRequest* request) = 0; + + virtual Transport* getTransport(TransportClient* client, osiSockAddr* serverAddress, int16 minorRevision, int16 priority) = 0; diff --git a/testApp/remote/testRemoteClientImpl.cpp b/testApp/remote/testRemoteClientImpl.cpp index 86429ca..640935d 100644 --- a/testApp/remote/testRemoteClientImpl.cpp +++ b/testApp/remote/testRemoteClientImpl.cpp @@ -23,6 +23,8 @@ #include #include +#include + using namespace epics::pvData; using namespace epics::pvAccess; @@ -411,37 +413,75 @@ typedef std::map IOIDResponseRequestMap; } }; - class DebugResponse : public AbstractClientResponseHandler, private epics::pvData::NoDefaultMethods { + class NoopResponse : public AbstractClientResponseHandler, private epics::pvData::NoDefaultMethods { public: /** * @param context */ - DebugResponse(ClientContextImpl* context) : - AbstractClientResponseHandler(context, "not implemented") + NoopResponse(ClientContextImpl* context, String description) : + AbstractClientResponseHandler(context, description) { } - virtual ~DebugResponse() { + virtual ~NoopResponse() { + } + }; + + + class BadResponse : public AbstractClientResponseHandler, private epics::pvData::NoDefaultMethods { + public: + /** + * @param context + */ + BadResponse(ClientContextImpl* context) : + AbstractClientResponseHandler(context, "Bad response") + { + } + + virtual ~BadResponse() { } virtual void handleResponse(osiSockAddr* responseFrom, Transport* transport, int8 version, int8 command, int payloadSize, epics::pvData::ByteBuffer* payloadBuffer) { - - char ipAddrStr[48]; - ipAddrToDottedIP(&responseFrom->ia, ipAddrStr, sizeof(ipAddrStr)); - ostringstream prologue; - prologue<<"Message [0x"<getArray(), - payloadBuffer->getPosition(), payloadSize); + char ipAddrStr[48]; + ipAddrToDottedIP(&responseFrom->ia, ipAddrStr, sizeof(ipAddrStr)); + errlogSevPrintf(errlogInfo, + "Undecipherable message (bad response type %d) from %s.", + command, ipAddrStr); } }; + + class DataResponseHandler : public AbstractClientResponseHandler, private epics::pvData::NoDefaultMethods { + public: + /** + * @param context + */ + DataResponseHandler(ClientContextImpl* context) : + AbstractClientResponseHandler(context, "Data response") + { + } + + virtual ~DataResponseHandler() { + } + + virtual void handleResponse(osiSockAddr* responseFrom, + Transport* transport, int8 version, int8 command, + int payloadSize, epics::pvData::ByteBuffer* payloadBuffer) + { + AbstractClientResponseHandler::handleResponse(responseFrom, transport, version, command, payloadSize, payloadBuffer); + + transport->ensureData(4); + DataResponse* nrr = dynamic_cast(_context->getResponseRequest(payloadBuffer->getInt())); + if (nrr) + nrr->response(transport, version, payloadBuffer); + } + }; + + class SearchResponseHandler : public AbstractClientResponseHandler, private epics::pvData::NoDefaultMethods { public: SearchResponseHandler(ClientContextImpl* context) : @@ -603,20 +643,20 @@ class ClientResponseHandler : public ResponseHandler, private epics::pvData::NoD * @param context */ ClientResponseHandler(ClientContextImpl* context) { - static ResponseHandler* badResponse = new DebugResponse(context); - static ResponseHandler* dataResponse = 0; //new DataResponseHandler(context); + ResponseHandler* badResponse = new BadResponse(context); + ResponseHandler* dataResponse = new DataResponseHandler(context); #define HANDLER_COUNT 28 m_handlerTable = new ResponseHandler*[HANDLER_COUNT]; m_handlerTable[ 0] = badResponse; // TODO new BeaconHandler(context), /* 0 */ m_handlerTable[ 1] = new ConnectionValidationHandler(context), /* 1 */ - m_handlerTable[ 2] = badResponse; // TODO new NoopResponse(context, "Echo"), /* 2 */ - m_handlerTable[ 3] = badResponse; // TODO new NoopResponse(context, "Search"), /* 3 */ + m_handlerTable[ 2] = new NoopResponse(context, "Echo"), /* 2 */ + m_handlerTable[ 3] = new NoopResponse(context, "Search"), /* 3 */ m_handlerTable[ 4] = new SearchResponseHandler(context), /* 4 */ - m_handlerTable[ 5] = badResponse; // TODO new NoopResponse(context, "Introspection search"), /* 5 */ + m_handlerTable[ 5] = new NoopResponse(context, "Introspection search"), /* 5 */ m_handlerTable[ 6] = dataResponse; /* 6 - introspection search */ m_handlerTable[ 7] = new CreateChannelHandler(context), /* 7 */ - m_handlerTable[ 8] = badResponse; // TODO new NoopResponse(context, "Destroy channel"), /* 8 */ // TODO it might be useful to implement this... + m_handlerTable[ 8] = new NoopResponse(context, "Destroy channel"), /* 8 */ // TODO it might be useful to implement this... m_handlerTable[ 9] = badResponse; /* 9 */ m_handlerTable[10] = dataResponse; /* 10 - get response */ m_handlerTable[11] = dataResponse; /* 11 - put response */ @@ -642,8 +682,6 @@ class ClientResponseHandler : public ResponseHandler, private epics::pvData::NoD Transport* transport, int8 version, int8 command, int payloadSize, ByteBuffer* payloadBuffer) { - int c = command+0; - std::cout << "received " << c << std::endl; if (command < 0 || command >= HANDLER_COUNT) { // TODO context.getLogger().fine("Invalid (or unsupported) command: " + command + "."); @@ -656,7 +694,7 @@ class ClientResponseHandler : public ResponseHandler, private epics::pvData::NoD } // delegate - m_handlerTable[c]->handleResponse(responseFrom, transport, version, command, payloadSize, payloadBuffer); + m_handlerTable[command]->handleResponse(responseFrom, transport, version, command, payloadSize, payloadBuffer); } }; @@ -1809,6 +1847,75 @@ class TestChannelImpl : public ChannelImpl { } + /** + * Searches for a response request with given channel IOID. + * @param ioid I/O ID. + * @return request response with given I/O ID. + */ + ResponseRequest* getResponseRequest(pvAccessID ioid) + { + /* + synchronized (pendingResponseRequests) + { + return (ResponseRequest)pendingResponseRequests.get(ioid); + } + */ + return 0; + } + + /** + * Register response request. + * @param request request to register. + * @return request ID (IOID). + */ + pvAccessID registerResponseRequest(ResponseRequest* request) + { + /* + synchronized (pendingResponseRequests) + { + int ioid = generateIOID(); + pendingResponseRequests.put(ioid, request); + return ioid; + } + */ + return 0; + } + + /** + * Unregister response request. + * @param request + * @return removed object, can be null + */ + ResponseRequest* unregisterResponseRequest(ResponseRequest* request) + { + /* + synchronized (pendingResponseRequests) + { + return (ResponseRequest)pendingResponseRequests.remove(request.getIOID()); + } + */ + return 0; + } + + /** + * Generate IOID. + * @return IOID. + */ + pvAccessID generateIOID() + { + /* + synchronized (pendingResponseRequests) + { + // search first free (theoretically possible loop of death) + while (pendingResponseRequests.get(++lastIOID) != null || lastIOID == CAConstants.CAJ_INVALID_IOID); + // reserve IOID + pendingResponseRequests.put(lastIOID, null); + return lastIOID; + } + */ + return 0; + } + /** * Get, or create if necessary, transport of given server address. * @param serverAddress required transport address