BlockingUDPTransport: add replyTransport
allow replies to be sent out on a different socket. Needed when binding the an interface broadcast address. Also, expose the ctor and deprecate pointless create() method.
This commit is contained in:
@@ -46,19 +46,20 @@ namespace epics {
|
||||
public:
|
||||
POINTER_DEFINITIONS(BlockingUDPTransport);
|
||||
|
||||
private:
|
||||
BlockingUDPTransport(bool serverFlag,
|
||||
std::auto_ptr<ResponseHandler> &responseHandler,
|
||||
SOCKET channel, osiSockAddr &bindAddress,
|
||||
short remoteTransportRevision);
|
||||
public:
|
||||
|
||||
static shared_pointer create(bool serverFlag,
|
||||
std::auto_ptr<ResponseHandler>& responseHandler,
|
||||
SOCKET channel, osiSockAddr& bindAddress,
|
||||
short remoteTransportRevision)
|
||||
short remoteTransportRevision) EPICS_DEPRECATED
|
||||
{
|
||||
shared_pointer thisPointer(
|
||||
new BlockingUDPTransport(serverFlag, responseHandler, channel, bindAddress, remoteTransportRevision)
|
||||
new BlockingUDPTransport(serverFlag, responseHandler,
|
||||
channel, bindAddress,
|
||||
remoteTransportRevision)
|
||||
);
|
||||
return thisPointer;
|
||||
}
|
||||
@@ -69,6 +70,11 @@ namespace epics {
|
||||
return _closed.get();
|
||||
}
|
||||
|
||||
void setReplyTransport(const Transport::shared_pointer& T)
|
||||
{
|
||||
_replyTransport = T;
|
||||
}
|
||||
|
||||
virtual const osiSockAddr* getRemoteAddress() const {
|
||||
return &_remoteAddress;
|
||||
}
|
||||
@@ -324,6 +330,13 @@ namespace epics {
|
||||
*/
|
||||
SOCKET _channel;
|
||||
|
||||
/** When provided, this transport is used for replies (passed to handler)
|
||||
* instead of *this. This feature is used in the situation where broadcast
|
||||
* traffic is received on one socket, but a different socket must be used
|
||||
* for unicast replies.
|
||||
*/
|
||||
Transport::shared_pointer _replyTransport;
|
||||
|
||||
/**
|
||||
* Bind address.
|
||||
*/
|
||||
|
||||
@@ -68,8 +68,8 @@ namespace epics {
|
||||
}
|
||||
|
||||
// sockets are blocking by default
|
||||
Transport::shared_pointer transport = BlockingUDPTransport::create(_serverFlag,
|
||||
responseHandler, socket, bindAddress, transportRevision);
|
||||
Transport::shared_pointer transport(new BlockingUDPTransport(_serverFlag, responseHandler,
|
||||
socket, bindAddress, transportRevision));
|
||||
return transport;
|
||||
}
|
||||
|
||||
|
||||
@@ -40,8 +40,7 @@ inline int sendto(int s, const char *buf, size_t len, int flags, const struct so
|
||||
|
||||
PVACCESS_REFCOUNT_MONITOR_DEFINE(blockingUDPTransport);
|
||||
|
||||
BlockingUDPTransport::BlockingUDPTransport(
|
||||
bool serverFlag,
|
||||
BlockingUDPTransport::BlockingUDPTransport(bool serverFlag,
|
||||
auto_ptr<ResponseHandler>& responseHandler, SOCKET channel,
|
||||
osiSockAddr& bindAddress,
|
||||
short /*remoteTransportRevision*/) :
|
||||
@@ -213,7 +212,8 @@ inline int sendto(int s, const char *buf, size_t len, int flags, const struct so
|
||||
|
||||
osiSockAddr fromAddress;
|
||||
osiSocklen_t addrStructSize = sizeof(sockaddr);
|
||||
Transport::shared_pointer thisTransport = shared_from_this();
|
||||
Transport::shared_pointer thisTransport = shared_from_this(),
|
||||
replyTo(_replyTransport ? _replyTransport : thisTransport);
|
||||
|
||||
try {
|
||||
|
||||
@@ -249,7 +249,7 @@ inline int sendto(int s, const char *buf, size_t len, int flags, const struct so
|
||||
_receiveBuffer->flip();
|
||||
|
||||
try{
|
||||
processBuffer(thisTransport, fromAddress, _receiveBuffer.get());
|
||||
processBuffer(replyTo, fromAddress, _receiveBuffer.get());
|
||||
}catch(std::exception& e){
|
||||
LOG(logLevelError,
|
||||
"an exception caught while in UDP receiveThread at %s:%d: %s",
|
||||
@@ -304,7 +304,8 @@ inline int sendto(int s, const char *buf, size_t len, int flags, const struct so
|
||||
_shutdownEvent.signal();
|
||||
}
|
||||
|
||||
bool BlockingUDPTransport::processBuffer(Transport::shared_pointer const & thisTransport, osiSockAddr& fromAddress, ByteBuffer* receiveBuffer) {
|
||||
bool BlockingUDPTransport::processBuffer(Transport::shared_pointer const & replyTransport,
|
||||
osiSockAddr& fromAddress, ByteBuffer* receiveBuffer) {
|
||||
|
||||
// handle response(s)
|
||||
while(likely((int)receiveBuffer->getRemaining()>=PVA_MESSAGE_HEADER_SIZE)) {
|
||||
@@ -342,7 +343,7 @@ inline int sendto(int s, const char *buf, size_t len, int flags, const struct so
|
||||
if(unlikely(nextRequestPosition>receiveBuffer->getLimit())) return false;
|
||||
|
||||
// handle
|
||||
_responseHandler->handleResponse(&fromAddress, thisTransport,
|
||||
_responseHandler->handleResponse(&fromAddress, replyTransport,
|
||||
version, command, payloadSize,
|
||||
_receiveBuffer.get());
|
||||
|
||||
|
||||
Reference in New Issue
Block a user