From 39739c8437af139b5df89c03e3851a69bb562be1 Mon Sep 17 00:00:00 2001 From: Matej Sekoranja Date: Wed, 2 Mar 2016 22:24:22 +0100 Subject: [PATCH] fixed tricky UDP transport replayTo issue --- src/remote/blockingUDPTransport.cpp | 22 +++++++++------------- src/remote/pv/blockingUDP.h | 7 +++++-- 2 files changed, 14 insertions(+), 15 deletions(-) diff --git a/src/remote/blockingUDPTransport.cpp b/src/remote/blockingUDPTransport.cpp index 775718a..7a9cf2e 100644 --- a/src/remote/blockingUDPTransport.cpp +++ b/src/remote/blockingUDPTransport.cpp @@ -218,8 +218,7 @@ inline int sendto(int s, const char *buf, size_t len, int flags, const struct so osiSockAddr fromAddress; osiSocklen_t addrStructSize = sizeof(sockaddr); - Transport::shared_pointer thisTransport = shared_from_this(), - replyTo(_replyTransport ? _replyTransport : thisTransport); + Transport::shared_pointer thisTransport = shared_from_this(); try { @@ -227,13 +226,6 @@ inline int sendto(int s, const char *buf, size_t len, int flags, const struct so size_t recvfrom_buffer_len =_receiveBuffer->getSize()-RECEIVE_BUFFER_PRE_RESERVE; while(!_closed.get()) { - // we poll to prevent blocking indefinitely - - // data ready to be read - _receiveBuffer->clear(); - // reserve some space for CMD_ORIGIN_TAG - _receiveBuffer->setPosition(RECEIVE_BUFFER_PRE_RESERVE); - int bytesRead = recvfrom(_channel, recvfrom_buffer_start, recvfrom_buffer_len, 0, (sockaddr*)&fromAddress, @@ -259,7 +251,7 @@ inline int sendto(int s, const char *buf, size_t len, int flags, const struct so _receiveBuffer->setLimit(RECEIVE_BUFFER_PRE_RESERVE+bytesRead); try{ - processBuffer(replyTo, fromAddress, _receiveBuffer.get()); + processBuffer(thisTransport, fromAddress, _receiveBuffer.get()); }catch(std::exception& e){ LOG(logLevelError, "an exception caught while in UDP receiveThread at %s:%d: %s", @@ -314,11 +306,13 @@ inline int sendto(int s, const char *buf, size_t len, int flags, const struct so _shutdownEvent.signal(); } - bool BlockingUDPTransport::processBuffer(Transport::shared_pointer const & replyTransport, + bool BlockingUDPTransport::processBuffer(Transport::shared_pointer const & transport, osiSockAddr& fromAddress, ByteBuffer* receiveBuffer) { + // handle response(s) while(likely((int)receiveBuffer->getRemaining()>=PVA_MESSAGE_HEADER_SIZE)) { + // // read header // @@ -398,7 +392,7 @@ inline int sendto(int s, const char *buf, size_t len, int flags, const struct so else { // handle - _responseHandler->handleResponse(&fromAddress, replyTransport, + _responseHandler->handleResponse(&fromAddress, transport, version, command, payloadSize, _receiveBuffer.get()); } @@ -668,8 +662,10 @@ inline int sendto(int s, const char *buf, size_t len, int flags, const struct so PVA_DEFAULT_PRIORITY)); /* The other wrinkle is that nothing should be sent from this second * socket. So replies are made through the unicast socket. - */ + * transport2->setReplyTransport(transport); + */ + // NOTE: search responses all always send from sendTransport if (ignoreAddressVector.get() && ignoreAddressVector->size()) transport2->setIgnoredAddresses(ignoreAddressVector.get()); diff --git a/src/remote/pv/blockingUDP.h b/src/remote/pv/blockingUDP.h index c7791bf..af124b0 100644 --- a/src/remote/pv/blockingUDP.h +++ b/src/remote/pv/blockingUDP.h @@ -70,10 +70,12 @@ namespace epics { return _closed.get(); } + /* void setReplyTransport(const Transport::shared_pointer& T) { _replyTransport = T; } + */ virtual const osiSockAddr* getRemoteAddress() const { return &_remoteAddress; @@ -369,12 +371,13 @@ namespace epics { */ SOCKET _channel; - /** When provided, this transport is used for replies (passed to handler) + /* When provided, this transport is used for replies (passed to handler) * instead of *this. This feature is used in the situation where broadcast * traffic is received on one socket, but a different socket must be used * for unicast replies. - */ + * Transport::shared_pointer _replyTransport; + */ /** * Bind address.