From fe5ea9442c674eec8ce90496257b451a354ee6c8 Mon Sep 17 00:00:00 2001 From: miha_vitorovic Date: Tue, 28 Dec 2010 15:47:05 +0100 Subject: [PATCH] A working blockingUDPTransport with test. TODO: debug stopping listener thread. --- pvAccessApp/remote/blockingUDPTransport.cpp | 23 ++--- pvAccessApp/remote/remote.h | 2 +- pvAccessApp/utils/inetAddressUtil.cpp | 12 +-- testApp/Makefile | 1 + testApp/remote/Makefile | 17 ++++ testApp/remote/testBlockingUDPClnt.cpp | 98 +++++++++++++++++++++ testApp/remote/testBlockingUDPSrv.cpp | 95 ++++++++++++++++++++ 7 files changed, 232 insertions(+), 16 deletions(-) create mode 100644 testApp/remote/Makefile create mode 100644 testApp/remote/testBlockingUDPClnt.cpp create mode 100644 testApp/remote/testBlockingUDPSrv.cpp diff --git a/pvAccessApp/remote/blockingUDPTransport.cpp b/pvAccessApp/remote/blockingUDPTransport.cpp index 1ea101c..b8131b4 100644 --- a/pvAccessApp/remote/blockingUDPTransport.cpp +++ b/pvAccessApp/remote/blockingUDPTransport.cpp @@ -73,8 +73,8 @@ namespace epics { _closed = true; if(_bindAddress!=NULL) errlogSevPrintf(errlogInfo, - "UDP socket %s closed.", inetAddressToString( - _bindAddress).c_str()); + "UDP socket %s closed.", + inetAddressToString(_bindAddress).c_str()); int retval = ::close(_channel); @@ -92,7 +92,7 @@ namespace epics { sender->send(_sendBuffer, this); sender->unlock(); endMessage(); - if(_sendTo!=NULL) send(_sendBuffer, _sendTo); + send(_sendBuffer, _sendTo); } catch(...) { sender->unlock(); } @@ -108,10 +108,13 @@ namespace epics { } void BlockingUDPTransport::endMessage() { - int32 data = _lastMessageStartPosition+(16/8+2); - _sendBuffer->put((char*)&data, _sendBuffer->getPosition() - -_lastMessageStartPosition-CA_MESSAGE_HEADER_SIZE, - sizeof(int32)); + int oldPosition = _sendBuffer->getPosition(); + _sendBuffer->setPosition(_lastMessageStartPosition + +(sizeof(int16)+2)); + _sendBuffer->putInt(oldPosition-_lastMessageStartPosition + -CA_MESSAGE_HEADER_SIZE); + _sendBuffer->setPosition(oldPosition); + } void BlockingUDPTransport::processRead() { @@ -157,7 +160,7 @@ namespace epics { if(bytesRead>0) { // successfully got datagram bool ignore = false; - if(_ignoredAddresses!=NULL) for(int i = 0; i + if(_ignoredAddresses!=NULL) for(size_t i = 0; i <_ignoredAddresses->size(); i++) if(_ignoredAddresses->at(i)->ia.sin_addr.s_addr ==fromAddress.ia.sin_addr.s_addr) { @@ -255,7 +258,7 @@ namespace epics { bool BlockingUDPTransport::send(ByteBuffer* buffer, const osiSockAddr* address) { - if(address==NULL||_sendAddresses==NULL) return false; + if(address==NULL&&_sendAddresses==NULL) return false; if(address!=NULL) { buffer->flip(); @@ -270,7 +273,7 @@ namespace epics { } } else { - for(int i = 0; i<_sendAddresses->size(); i++) { + for(size_t i = 0; i<_sendAddresses->size(); i++) { buffer->flip(); int retval = sendto(_channel, buffer->getArray(), buffer->getLimit(), 0, diff --git a/pvAccessApp/remote/remote.h b/pvAccessApp/remote/remote.h index e8b0dc2..46f3509 100644 --- a/pvAccessApp/remote/remote.h +++ b/pvAccessApp/remote/remote.h @@ -197,7 +197,7 @@ namespace epics { * @param[in] payloadSize size of this message data available in the payloadBuffer. * @param[in] payloadBuffer message payload data. * Note that this might not be the only message in the buffer. - * Code must not manilupate buffer. + * Code must not manipulate buffer. */ virtual void handleResponse(osiSockAddr* responseFrom, Transport* transport, int8 version, int8 command, diff --git a/pvAccessApp/utils/inetAddressUtil.cpp b/pvAccessApp/utils/inetAddressUtil.cpp index 8c9de71..e513380 100644 --- a/pvAccessApp/utils/inetAddressUtil.cpp +++ b/pvAccessApp/utils/inetAddressUtil.cpp @@ -214,11 +214,13 @@ namespace epics { bool displayHex) { stringstream saddr; - saddr<<(int)((addr->ia.sin_addr.s_addr)>>24)<<'.'; - saddr<<((int)((addr->ia.sin_addr.s_addr)>>16)&0xFF)<<'.'; - saddr<<((int)((addr->ia.sin_addr.s_addr)>>8)&0xFF)<<'.'; - saddr<<((int)(addr->ia.sin_addr.s_addr)&0xFF); - if(addr->ia.sin_port>0) saddr<<":"<ia.sin_port; + int ipa = ntohl(addr->ia.sin_addr.s_addr); + + saddr<<((int)(ipa>>24)&0xFF)<<'.'; + saddr<<((int)(ipa>>16)&0xFF)<<'.'; + saddr<<((int)(ipa>>8)&0xFF)<<'.'; + saddr<<((int)ipa&0xFF); + if(addr->ia.sin_port>0) saddr<<":"<ia.sin_port); if(displayHex) saddr<<" ("<ia.sin_addr.s_addr))<<")"; diff --git a/testApp/Makefile b/testApp/Makefile index 700bd55..0b70107 100644 --- a/testApp/Makefile +++ b/testApp/Makefile @@ -2,4 +2,5 @@ TOP = .. include $(TOP)/configure/CONFIG DIRS += utils DIRS += client +DIRS += remote include $(TOP)/configure/RULES_DIRS diff --git a/testApp/remote/Makefile b/testApp/remote/Makefile new file mode 100644 index 0000000..8e5c932 --- /dev/null +++ b/testApp/remote/Makefile @@ -0,0 +1,17 @@ +TOP=../.. + +include $(TOP)/configure/CONFIG + +PROD_HOST += testBlockingUDPSrv +testBlockingUDPSrv_SRCS += testBlockingUDPSrv.cpp +testBlockingUDPSrv_LIBS += pvData pvAccess Com + +PROD_HOST += testBlockingUDPClnt +testBlockingUDPClnt_SRCS += testBlockingUDPClnt.cpp +testBlockingUDPClnt_LIBS += pvData pvAccess Com + + +include $(TOP)/configure/RULES +#---------------------------------------- +# ADD RULES AFTER THIS LINE + diff --git a/testApp/remote/testBlockingUDPClnt.cpp b/testApp/remote/testBlockingUDPClnt.cpp new file mode 100644 index 0000000..c874898 --- /dev/null +++ b/testApp/remote/testBlockingUDPClnt.cpp @@ -0,0 +1,98 @@ +/* + * testBlockingUDPClnt.cpp + * + * Created on: Dec 28, 2010 + * Author: Miha Vitorovic + */ + +#include "remote.h" +#include "blockingUDP.h" +#include "logger.h" +#include "inetAddressUtil.h" + +#include + +#include +#include + +using namespace epics::pvAccess; +using namespace epics::pvData; + +using std::cout; +using std::endl; +using std::sscanf; + +static osiSockAddr sendTo; + +class DummyResponseHandler : public ResponseHandler { +public: + virtual void handleResponse(osiSockAddr* responseFrom, + Transport* transport, int8 version, int8 command, int payloadSize, + ByteBuffer* payloadBuffer) { + } +}; + +class DummyTransportSender : public TransportSender { +public: + DummyTransportSender() { + for(int i = 0; i<20; i++) + data[i] = (char)(i+1); + count = 0; + } + + virtual void send(ByteBuffer* buffer, TransportSendControl* control) { + // set recipient + sendTo.ia.sin_family = AF_INET; + sendTo.ia.sin_port = htons(65000); + if(inet_aton("192.168.71.129",&sendTo.ia.sin_addr)==0) { + cout<<"error in inet_aton()"<setRecipient(&sendTo); + + // send the packet + count++; + control->startMessage((int8)(count+0x10), 0); + buffer->put(data, 0, count); + //control->endMessage(); + } + + virtual void lock() { + } + virtual void unlock() { + } +private: + char data[20]; + int count; +}; + +void testBlockingUDPSender() { + BlockingUDPConnector connector(false, NULL, true); + + DummyTransportSender dts; + DummyResponseHandler drh; + + osiSockAddr bindAddr; + + bindAddr.ia.sin_family = AF_INET; + bindAddr.ia.sin_port = htons(65001); + bindAddr.ia.sin_addr.s_addr = htonl(INADDR_ANY); + + Transport* transport = connector.connect(NULL, &drh, &bindAddr, 1, 50); + + cout<<"Sending 10 packets..."<enqueueSendRequest(&dts); + sleep(1); + } + +} + +int main(int argc, char *argv[]) { + createFileLogger("testBlockingUDPClnt.log"); + + testBlockingUDPSender(); + return (0); +} diff --git a/testApp/remote/testBlockingUDPSrv.cpp b/testApp/remote/testBlockingUDPSrv.cpp new file mode 100644 index 0000000..9df1d59 --- /dev/null +++ b/testApp/remote/testBlockingUDPSrv.cpp @@ -0,0 +1,95 @@ +/* + * blockingUDPTest.cpp + * + * Created on: Dec 28, 2010 + * Author: Miha Vitorovic + */ + +#include "remote.h" +#include "blockingUDP.h" +#include "logger.h" +#include "inetAddressUtil.h" +#include "hexDump.h" + +#include +#include + +using namespace epics::pvAccess; +using std::cout; +using std::endl; +using std::hex; +using std::dec; + +class DummyResponseHandler : public ResponseHandler { +public: + DummyResponseHandler() : + packets(0) { + } + + int getPackets() { + return packets; + } + + virtual void handleResponse(osiSockAddr* responseFrom, + Transport* transport, int8 version, int8 command, int payloadSize, + ByteBuffer* payloadBuffer); +private: + int packets; +}; + +void DummyResponseHandler::handleResponse(osiSockAddr* responseFrom, + Transport* transport, int8 version, int8 command, int payloadSize, + ByteBuffer* payloadBuffer) { + + packets++; + + std::ostringstream os; + + cout<<"Received new UDP datagram["<get(payload, 0, dataCount); + os<<"Payload ("<start(); + + cout<<"Waiting for 10 packets..."<