diff --git a/src/remote/blockingUDP.h b/src/remote/blockingUDP.h index 554204b..682ccec 100644 --- a/src/remote/blockingUDP.h +++ b/src/remote/blockingUDP.h @@ -46,19 +46,20 @@ namespace epics { public: POINTER_DEFINITIONS(BlockingUDPTransport); - private: BlockingUDPTransport(bool serverFlag, std::auto_ptr &responseHandler, SOCKET channel, osiSockAddr &bindAddress, short remoteTransportRevision); - public: + static shared_pointer create(bool serverFlag, std::auto_ptr& responseHandler, SOCKET channel, osiSockAddr& bindAddress, - short remoteTransportRevision) + short remoteTransportRevision) EPICS_DEPRECATED { shared_pointer thisPointer( - new BlockingUDPTransport(serverFlag, responseHandler, channel, bindAddress, remoteTransportRevision) + new BlockingUDPTransport(serverFlag, responseHandler, + channel, bindAddress, + remoteTransportRevision) ); return thisPointer; } @@ -69,6 +70,11 @@ namespace epics { return _closed.get(); } + void setReplyTransport(const Transport::shared_pointer& T) + { + _replyTransport = T; + } + virtual const osiSockAddr* getRemoteAddress() const { return &_remoteAddress; } @@ -324,6 +330,13 @@ namespace epics { */ SOCKET _channel; + /** When provided, this transport is used for replies (passed to handler) + * instead of *this. This feature is used in the situation where broadcast + * traffic is received on one socket, but a different socket must be used + * for unicast replies. + */ + Transport::shared_pointer _replyTransport; + /** * Bind address. */ diff --git a/src/remote/blockingUDPConnector.cpp b/src/remote/blockingUDPConnector.cpp index 10dc29f..7057f68 100644 --- a/src/remote/blockingUDPConnector.cpp +++ b/src/remote/blockingUDPConnector.cpp @@ -68,8 +68,8 @@ namespace epics { } // sockets are blocking by default - Transport::shared_pointer transport = BlockingUDPTransport::create(_serverFlag, - responseHandler, socket, bindAddress, transportRevision); + Transport::shared_pointer transport(new BlockingUDPTransport(_serverFlag, responseHandler, + socket, bindAddress, transportRevision)); return transport; } diff --git a/src/remote/blockingUDPTransport.cpp b/src/remote/blockingUDPTransport.cpp index 36dabdc..d8621b1 100644 --- a/src/remote/blockingUDPTransport.cpp +++ b/src/remote/blockingUDPTransport.cpp @@ -40,8 +40,7 @@ inline int sendto(int s, const char *buf, size_t len, int flags, const struct so PVACCESS_REFCOUNT_MONITOR_DEFINE(blockingUDPTransport); - BlockingUDPTransport::BlockingUDPTransport( - bool serverFlag, + BlockingUDPTransport::BlockingUDPTransport(bool serverFlag, auto_ptr& responseHandler, SOCKET channel, osiSockAddr& bindAddress, short /*remoteTransportRevision*/) : @@ -213,7 +212,8 @@ inline int sendto(int s, const char *buf, size_t len, int flags, const struct so osiSockAddr fromAddress; osiSocklen_t addrStructSize = sizeof(sockaddr); - Transport::shared_pointer thisTransport = shared_from_this(); + Transport::shared_pointer thisTransport = shared_from_this(), + replyTo(_replyTransport ? _replyTransport : thisTransport); try { @@ -249,7 +249,7 @@ inline int sendto(int s, const char *buf, size_t len, int flags, const struct so _receiveBuffer->flip(); try{ - processBuffer(thisTransport, fromAddress, _receiveBuffer.get()); + processBuffer(replyTo, fromAddress, _receiveBuffer.get()); }catch(std::exception& e){ LOG(logLevelError, "an exception caught while in UDP receiveThread at %s:%d: %s", @@ -304,7 +304,8 @@ inline int sendto(int s, const char *buf, size_t len, int flags, const struct so _shutdownEvent.signal(); } - bool BlockingUDPTransport::processBuffer(Transport::shared_pointer const & thisTransport, osiSockAddr& fromAddress, ByteBuffer* receiveBuffer) { + bool BlockingUDPTransport::processBuffer(Transport::shared_pointer const & replyTransport, + osiSockAddr& fromAddress, ByteBuffer* receiveBuffer) { // handle response(s) while(likely((int)receiveBuffer->getRemaining()>=PVA_MESSAGE_HEADER_SIZE)) { @@ -342,7 +343,7 @@ inline int sendto(int s, const char *buf, size_t len, int flags, const struct so if(unlikely(nextRequestPosition>receiveBuffer->getLimit())) return false; // handle - _responseHandler->handleResponse(&fromAddress, thisTransport, + _responseHandler->handleResponse(&fromAddress, replyTransport, version, command, payloadSize, _receiveBuffer.get());