From 6917f6d468d72cb64ca92ad70d839c3488720e83 Mon Sep 17 00:00:00 2001 From: miha_vitorovic Date: Fri, 24 Dec 2010 10:37:16 +0100 Subject: [PATCH] caConstants.h: removing needless declarations. blockingUDPTransport.*: implementation - work in progress remote.h: added file inetAddressUtil.*: added 'inetAddressToString' function and fixed 'getSocketAddressList' declaration. inetAddressUtilsTest.c: using the new function. Makefile: added remote.h --- pvAccessApp/Makefile | 1 + pvAccessApp/ca/caConstants.h | 6 +- pvAccessApp/remote/blockingUDPTransport.cpp | 37 +++- pvAccessApp/remote/blockingUDPTransport.h | 109 +++++++++++- pvAccessApp/remote/remote.h | 188 ++++++++++++++++++++ pvAccessApp/utils/inetAddressUtil.cpp | 31 +++- pvAccessApp/utils/inetAddressUtil.h | 16 +- testApp/utils/inetAddressUtilsTest.cpp | 32 +--- 8 files changed, 364 insertions(+), 56 deletions(-) create mode 100644 pvAccessApp/remote/remote.h diff --git a/pvAccessApp/Makefile b/pvAccessApp/Makefile index 0c94ec2..286276c 100644 --- a/pvAccessApp/Makefile +++ b/pvAccessApp/Makefile @@ -32,6 +32,7 @@ LIBSRCS += ChannelAccessFactory.cpp SRC_DIRS += $(PVACCESS)/remote +INC += remote.h INC += blockingUDPTransport.h LIBSRCS += blockingUDPTransport.cpp diff --git a/pvAccessApp/ca/caConstants.h b/pvAccessApp/ca/caConstants.h index 947f28b..7956067 100644 --- a/pvAccessApp/ca/caConstants.h +++ b/pvAccessApp/ca/caConstants.h @@ -8,10 +8,6 @@ #ifndef CONSTANTS_H_ #define CONSTANTS_H_ -#include - -using namespace epics::pvData; - namespace epics { namespace pvAccess { @@ -71,7 +67,7 @@ namespace epics { const int32 UNREASONABLE_CHANNEL_NAME_LENGTH = 500; /** Invalid data type. */ - const int16 INVALID_DATA_TYPE = (short)0xFFFF; + const int16 INVALID_DATA_TYPE = (int16)0xFFFF; } } diff --git a/pvAccessApp/remote/blockingUDPTransport.cpp b/pvAccessApp/remote/blockingUDPTransport.cpp index 2b98bd9..d7b7552 100644 --- a/pvAccessApp/remote/blockingUDPTransport.cpp +++ b/pvAccessApp/remote/blockingUDPTransport.cpp @@ -9,6 +9,7 @@ #include "blockingUDPTransport.h" #include "caConstants.h" +#include "inetAddressUtil.h" /* pvData */ #include @@ -18,9 +19,9 @@ #include #include +/* standard */ #include - namespace epics { namespace pvAccess { @@ -28,7 +29,7 @@ namespace epics { BlockingUDPTransport::BlockingUDPTransport(SOCKET channel, osiSockAddr* bindAddress, osiSockAddr* sendAddresses, - short remoteTransportRevision) { + short remoteTransportRevision) { this->channel = channel; this->bindAddress = bindAddress; this->sendAddresses = sendAddresses; @@ -42,6 +43,7 @@ namespace epics { sendBuffer = new ByteBuffer(MAX_UDP_SEND); ignoredAddresses = NULL; + sendTo = NULL; closed = false; lastMessageStartPosition = 0; } @@ -56,14 +58,35 @@ namespace epics { } void BlockingUDPTransport::close(bool forced) { - if (closed) - return; + if(closed) return; closed = true; - if (bindAddress != NULL) - errlogSevPrintf( errlogInfo, "UDP connection to %d closed.", *bindAddress); + if(bindAddress!=NULL) errlogSevPrintf(errlogInfo, + "UDP connection to %s closed.", inetAddressToString( + bindAddress).c_str()); - //std::fclose(channel); + // TODO: finish implementation + + } + + void BlockingUDPTransport::enqueueSendRequest(TransportSender* sender) { + // TODO implement + } + + void BlockingUDPTransport::startMessage(int8 command, + int ensureCapacity) { + lastMessageStartPosition = sendBuffer->getPosition(); + sendBuffer->putShort(CA_MAGIC_AND_VERSION); + sendBuffer->putByte(0); // data + sendBuffer->putByte(command); // command + sendBuffer->putInt(0); // temporary zero payload + } + + void BlockingUDPTransport::endMessage() { + int32 data = lastMessageStartPosition+(16/8+2); + sendBuffer->put((char*)&data, sendBuffer->getPosition() + -lastMessageStartPosition-CA_MESSAGE_HEADER_SIZE, + sizeof(int32)); } } diff --git a/pvAccessApp/remote/blockingUDPTransport.h b/pvAccessApp/remote/blockingUDPTransport.h index e52fc55..9b5fe68 100644 --- a/pvAccessApp/remote/blockingUDPTransport.h +++ b/pvAccessApp/remote/blockingUDPTransport.h @@ -8,6 +8,9 @@ #ifndef BLOCKINGUDPTRANSPORT_H_ #define BLOCKINGUDPTRANSPORT_H_ +#include "remote.h" +#include "caConstants.h" + #include #include @@ -17,24 +20,120 @@ namespace epics { namespace pvAccess { - class BlockingUDPTransport : public epics::pvData::NoDefaultMethods { + class BlockingUDPTransport : public epics::pvData::NoDefaultMethods, + public Transport, + public TransportSendControl { public: BlockingUDPTransport(SOCKET channel, osiSockAddr* bindAddress, osiSockAddr* sendAddresses, short remoteTransportRevision); - ~BlockingUDPTransport(); + virtual ~BlockingUDPTransport(); - bool isClosed() { + virtual bool isClosed() const { return closed; } + virtual const osiSockAddr* getRemoteAddress() const { + return socketAddress; + } + + virtual const String getType() const { + return String("UDP"); + } + + virtual int8 getMajorRevision() const { + return CA_MAJOR_PROTOCOL_REVISION; + } + + virtual int8 getMinorRevision() const { + return CA_MINOR_PROTOCOL_REVISION; + } + + virtual int getReceiveBufferSize() const { + return receiveBuffer->getSize(); + } + + virtual int getSocketReceiveBufferSize() const { + // Get value of the SO_RCVBUF option for this DatagramSocket, + // that is the buffer size used by the platform for input on + // this DatagramSocket. + + // TODO: real implementation + return MAX_UDP_RECV; + } + + virtual int16 getPriority() const { + return CA_DEFAULT_PRIORITY; + } + + virtual void setRemoteMinorRevision(int8 minor) { + // noop + } + + virtual void setRemoteTransportReceiveBufferSize( + int receiveBufferSize) { + // noop for UDP (limited by 64k; MAX_UDP_SEND for CA) + } + + virtual void setRemoteTransportSocketReceiveBufferSize( + int socketReceiveBufferSize) { + // noop for UDP (limited by 64k; MAX_UDP_SEND for CA) + } + + virtual void aliveNotification() { + // noop + } + + virtual void changedTransport() { + // noop + } + + virtual bool isVerified() const { + return false; + } + + virtual void verified() { + // noop + } + + virtual void enqueueSendRequest(TransportSender* sender); + void start(); - void close(bool forced); + + virtual void close(bool forced); + + virtual void ensureData(int size) { + // TODO: implement + } + + virtual void startMessage(int8 command, int ensureCapacity); + virtual void endMessage(); + + virtual void flush(bool lastMessageCompleted) { + // noop since all UDP requests are sent immediately + } + + virtual void setRecipient(const osiSockAddr* sendTo) { + this->sendTo = sendTo; + } + + virtual void flushSerializeBuffer() { + // TODO Auto-generated method stub + } + + virtual void ensureBuffer(int size) { + // noop + } protected: bool closed; + virtual void processRead(); + private: + bool processBuffer(osiSockAddr* fromAddress, + ByteBuffer* receiveBuffer); + // Context only used for logging in this class /** @@ -62,6 +161,8 @@ namespace epics { */ osiSockAddr* ignoredAddresses; + const osiSockAddr* sendTo; + /** * Receive buffer. */ diff --git a/pvAccessApp/remote/remote.h b/pvAccessApp/remote/remote.h new file mode 100644 index 0000000..4681feb --- /dev/null +++ b/pvAccessApp/remote/remote.h @@ -0,0 +1,188 @@ +/* + * remote.h + * + * Created on: Dec 21, 2010 + * Author: user + */ + +#ifndef REMOTE_H_ +#define REMOTE_H_ + +#include +#include +#include + +#include +#include + +namespace epics { + namespace pvAccess { + + using namespace epics::pvData; + + enum ProtocolType { + TCP, UDP, SSL + }; + + /** + * Interface defining transport send control. + * @author Matej Sekoranja + */ + class TransportSendControl : public SerializableControl { + public: + virtual void startMessage(int8 command, int ensureCapacity) =0; + virtual void endMessage() =0; + + virtual void flush(bool lastMessageCompleted) =0; + + virtual void setRecipient(const osiSockAddr* sendTo) =0; + }; + + /** + * Interface defining transport sender (instance sending data over transport). + * @author Matej Sekoranja + * @version $Id: TransportSender.java,v 1.1 2010/05/03 14:45:39 mrkraimer Exp $ + */ + class TransportSender { + public: + /** + * Called by transport. + * By this call transport gives callee ownership over the buffer. + * Calls on TransportSendControl instance must be made from + * calling thread. Moreover, ownership is valid only for the time of call + * of this method. + * NOTE: these limitations allows efficient implementation. + */ + virtual void + send(ByteBuffer* buffer, TransportSendControl* control) =0; + + virtual void lock() =0; + virtual void unlock() =0; + }; + + /** + * Interface defining transport (connection). + * @author Matej Sekoranja + * @version $Id: Transport.java,v 1.1 2010/05/03 14:45:39 mrkraimer Exp $ + */ + class Transport : public DeserializableControl { + public: + /** + * Get remote address. + * @return remote address. + */ + virtual const osiSockAddr* getRemoteAddress() const =0; + + /** + * Get protocol type (tcp, udp, ssl, etc.). + * @return protocol type. + */ + virtual const String getType() const =0; + + /** + * Get context transport is living in. + * @return context transport is living in. + */ + //public Context getContext(); + + /** + * Transport protocol major revision. + * @return protocol major revision. + */ + virtual int8 getMajorRevision() const =0; + + /** + * Transport protocol minor revision. + * @return protocol minor revision. + */ + virtual int8 getMinorRevision() const =0; + + /** + * Get receive buffer size. + * @return receive buffer size. + */ + virtual int getReceiveBufferSize() const =0; + + /** + * Get socket receive buffer size. + * @return socket receive buffer size. + */ + virtual int getSocketReceiveBufferSize() const =0; + + /** + * Transport priority. + * @return protocol priority. + */ + virtual int16 getPriority() const =0; + + /** + * Set remote transport protocol minor revision. + * @param minor protocol minor revision. + */ + virtual void setRemoteMinorRevision(int8 minor) =0; + + /** + * Set remote transport receive buffer size. + * @param receiveBufferSize receive buffer size. + */ + virtual void setRemoteTransportReceiveBufferSize( + int receiveBufferSize) =0; + + /** + * Set remote transport socket receive buffer size. + * @param socketReceiveBufferSize remote socket receive buffer size. + */ + virtual void setRemoteTransportSocketReceiveBufferSize( + int socketReceiveBufferSize) =0; + + /** + * Notification transport that is still alive. + */ + virtual void aliveNotification() =0; + + /** + * Notification that transport has changed. + */ + virtual void changedTransport() =0; + + /** + * Get introspection registry for transport. + * @return IntrospectionRegistry instance. + */ + //virtual IntrospectionRegistry getIntrospectionRegistry() =0; + + /** + * Close transport. + * @param force flag indicating force-full (e.g. remote disconnect) close. + */ + virtual void close(bool force) =0; + + /** + * Check connection status. + * @return true if connected. + */ + virtual bool isClosed() const =0; + + /** + * Get transport verification status. + * @return verification flag. + */ + virtual bool isVerified() const =0; + + /** + * Notify transport that it is has been verified. + */ + virtual void verified() =0; + + /** + * Enqueue send request. + * @param sender + */ + virtual void enqueueSendRequest(TransportSender* sender) =0; + + }; + + } +} + +#endif /* REMOTE_H_ */ diff --git a/pvAccessApp/utils/inetAddressUtil.cpp b/pvAccessApp/utils/inetAddressUtil.cpp index c100d56..8c9de71 100644 --- a/pvAccessApp/utils/inetAddressUtil.cpp +++ b/pvAccessApp/utils/inetAddressUtil.cpp @@ -4,17 +4,23 @@ * Created on: Nov 12, 2010 * Author: Miha Vitorovic */ - +/* pvAccess */ #include "inetAddressUtil.h" -#include +/* pvData */ +#include + +/* EPICSv3 */ #include #include +#include +#include + +/* standard */ +#include #include #include -#include -#include -#include +#include using namespace std; using namespace epics::pvData; @@ -204,5 +210,20 @@ namespace epics { return iav; } + const String inetAddressToString(const osiSockAddr *addr, + bool displayHex) { + stringstream saddr; + + saddr<<(int)((addr->ia.sin_addr.s_addr)>>24)<<'.'; + saddr<<((int)((addr->ia.sin_addr.s_addr)>>16)&0xFF)<<'.'; + saddr<<((int)((addr->ia.sin_addr.s_addr)>>8)&0xFF)<<'.'; + saddr<<((int)(addr->ia.sin_addr.s_addr)&0xFF); + if(addr->ia.sin_port>0) saddr<<":"<ia.sin_port; + if(displayHex) saddr<<" ("<ia.sin_addr.s_addr))<<")"; + + return saddr.str(); + } + } } diff --git a/pvAccessApp/utils/inetAddressUtil.h b/pvAccessApp/utils/inetAddressUtil.h index 5b8c687..512cd86 100644 --- a/pvAccessApp/utils/inetAddressUtil.h +++ b/pvAccessApp/utils/inetAddressUtil.h @@ -44,8 +44,7 @@ namespace epics { * @param address address to encode. */ void - encodeAsIPv6Address(ByteBuffer* buffer, - const osiSockAddr* address); + encodeAsIPv6Address(ByteBuffer* buffer, const osiSockAddr* address); /** * Convert an integer into an IPv4 INET address. @@ -69,17 +68,10 @@ namespace epics { * @return array of InetSocketAddress. */ InetAddrVector* getSocketAddressList(String list, int defaultPort, - const InetAddrVector* appendList); + const InetAddrVector* appendList = NULL); - /** - * Parse space delimited addresss[:port] string and return array of InetSocketAddress. - * @param list space delimited addresss[:port] string. - * @param defaultPort port take if not specified. - * @return array of InetSocketAddress. - */ - InetAddrVector* getSocketAddressList(String list, int defaultPort) { - return getSocketAddressList(list, defaultPort, NULL); - } + const String inetAddressToString(const osiSockAddr *addr, + bool displayHex = false); } } diff --git a/testApp/utils/inetAddressUtilsTest.cpp b/testApp/utils/inetAddressUtilsTest.cpp index c2e84ee..16695e0 100644 --- a/testApp/utils/inetAddressUtilsTest.cpp +++ b/testApp/utils/inetAddressUtilsTest.cpp @@ -11,7 +11,6 @@ #include #include #include -#include #include using namespace epics::pvAccess; @@ -20,19 +19,6 @@ using std::endl; using std::stringstream; using std::hex; -String inetAddressToString(osiSockAddr *addr) { - stringstream saddr; - - saddr<<(int)((addr->ia.sin_addr.s_addr)>>24)<<'.'; - saddr<<((int)((addr->ia.sin_addr.s_addr)>>16)&0xFF)<<'.'; - saddr<<((int)((addr->ia.sin_addr.s_addr)>>8)&0xFF)<<'.'; - saddr<<((int)(addr->ia.sin_addr.s_addr)&0xFF); - if(addr->ia.sin_port>0) saddr<<":"<ia.sin_port; - saddr<<" ("<ia.sin_addr.s_addr))<<")"; - - return saddr.str(); -} - int main(int argc, char *argv[]) { InetAddrVector *vec; @@ -48,19 +34,19 @@ int main(int argc, char *argv[]) { assert(addr->ia.sin_family==AF_INET); assert(addr->ia.sin_port==555); assert(addr->ia.sin_addr.s_addr==(uint32_t)0x7F000001); - cout<<'\t'<at(1); assert(addr->ia.sin_family==AF_INET); assert(addr->ia.sin_port==1234); assert(addr->ia.sin_addr.s_addr==(uint32_t)0x0A0A0C0B); - cout<<'\t'<at(2); assert(addr->ia.sin_family==AF_INET); assert(addr->ia.sin_port==555); assert(addr->ia.sin_addr.s_addr==(uint32_t)0xC0A80304); - cout<<'\t'<ia.sin_family==AF_INET); assert(addr->ia.sin_port==6789); assert(addr->ia.sin_addr.s_addr==(uint32_t)0xAC1037A0); - cout<<'\t'<at(1); assert(addr->ia.sin_family==AF_INET); assert(addr->ia.sin_port==555); assert(addr->ia.sin_addr.s_addr==(uint32_t)0x7F000001); - cout<<'\t'<at(2); assert(addr->ia.sin_family==AF_INET); assert(addr->ia.sin_port==1234); assert(addr->ia.sin_addr.s_addr==(uint32_t)0x0A0A0C0B); - cout<<'\t'<at(3); assert(addr->ia.sin_family==AF_INET); assert(addr->ia.sin_port==555); assert(addr->ia.sin_addr.s_addr==(uint32_t)0xC0A80304); - cout<<'\t'<ia.sin_family==AF_INET); - cout<<'\t'<ia.sin_family==AF_INET); - cout<<'\t'<