merge + signed/unsigned comparison fix

This commit is contained in:
Matej Sekoranja
2010-12-27 10:12:09 +01:00
11 changed files with 654 additions and 54 deletions

View File

@@ -1 +1,10 @@
QtC-pvAccess.creator.user
syntax: glob
O.Common
O.linux-x86
syntax: regexp
^bin
^include

View File

@@ -33,6 +33,10 @@ LIBSRCS += ChannelAccessFactory.cpp
LIBSRCS += CreateRequestFactory.cpp
SRC_DIRS += $(PVACCESS)/remote
INC += remote.h
INC += blockingUDPTransport.h
LIBSRCS += blockingUDPTransport.cpp
LIBRARY = pvAccess

View File

@@ -8,10 +8,6 @@
#ifndef CONSTANTS_H_
#define CONSTANTS_H_
#include <pvIntrospect.h>
using namespace epics::pvData;
namespace epics {
namespace pvAccess {
@@ -71,7 +67,7 @@ namespace epics {
const int32 UNREASONABLE_CHANNEL_NAME_LENGTH = 500;
/** Invalid data type. */
const int16 INVALID_DATA_TYPE = (short)0xFFFF;
const int16 INVALID_DATA_TYPE = (int16)0xFFFF;
}
}

View File

@@ -0,0 +1,194 @@
/*
* blockingUDPTransport.cpp
*
* Created on: Dec 20, 2010
* Author: Miha Vitorovic
*/
/* pvAccess */
#include "blockingUDPTransport.h"
#include "caConstants.h"
#include "inetAddressUtil.h"
/* pvData */
#include <byteBuffer.h>
/* EPICSv3 */
#include <osdSock.h>
#include <osiSock.h>
#include <errlog.h>
/* standard */
#include <cstdio>
#include <unistd.h>
#include <poll.h>
#include <errno.h>
namespace epics {
namespace pvAccess {
using namespace epics::pvData;
BlockingUDPTransport::BlockingUDPTransport(SOCKET channel,
osiSockAddr* bindAddress, InetAddrVector* sendAddresses,
short remoteTransportRevision) {
this->channel = channel;
this->bindAddress = bindAddress;
this->sendAddresses = sendAddresses;
socketAddress = bindAddress;
// allocate receive buffer
receiveBuffer = new ByteBuffer(MAX_UDP_RECV);
// allocate send buffer and non-reentrant lock
sendBuffer = new ByteBuffer(MAX_UDP_SEND);
ignoredAddresses = NULL;
sendTo = NULL;
closed = false;
lastMessageStartPosition = 0;
readBuffer = new char[MAX_UDP_RECV];
}
BlockingUDPTransport::~BlockingUDPTransport() {
delete receiveBuffer;
delete sendBuffer;
delete readBuffer;
}
void BlockingUDPTransport::start() {
// TODO implement
}
void BlockingUDPTransport::close(bool forced) {
if(closed) return;
closed = true;
if(bindAddress!=NULL) errlogSevPrintf(errlogInfo,
"UDP connection to %s closed.", inetAddressToString(
bindAddress).c_str());
// TODO: finish implementation
}
void BlockingUDPTransport::enqueueSendRequest(TransportSender* sender) {
// TODO implement
}
void BlockingUDPTransport::startMessage(int8 command,
int ensureCapacity) {
lastMessageStartPosition = sendBuffer->getPosition();
sendBuffer->putShort(CA_MAGIC_AND_VERSION);
sendBuffer->putByte(0); // data
sendBuffer->putByte(command); // command
sendBuffer->putInt(0); // temporary zero payload
}
void BlockingUDPTransport::endMessage() {
int32 data = lastMessageStartPosition+(16/8+2);
sendBuffer->put((char*)&data, sendBuffer->getPosition()
-lastMessageStartPosition-CA_MESSAGE_HEADER_SIZE,
sizeof(int32));
}
void BlockingUDPTransport::processRead() {
// This function is always called from only one thread - this
// object's own thread.
pollfd pfd;
pfd.fd = channel;
pfd.events = POLLIN;
osiSockAddr fromAddress;
try {
while(!closed) {
// we poll to prevent blocking indefinitely
/* From 'accept' man page:
* In order to be notified of incoming connections on
* a socket, you can use select(2) or poll(2). A readable
* event will be delivered when a new connection is
* attempted and you may then call accept() to get a
* socket for that connection.
*/
int retval = poll(&pfd, 1, 100);
if(retval>0) {
// activity on SOCKET
if(pfd.revents&POLLIN) {
// data ready to be read
receiveBuffer->clear();
socklen_t addrStructSize = sizeof(sockaddr);
int bytesRead = recvfrom(channel, readBuffer,
MAX_UDP_RECV, 0, (sockaddr*)&fromAddress,
&addrStructSize);
if(bytesRead>0) {
// successfully got datagram
bool ignore = false;
if(ignoredAddresses!=NULL) for(size_t i = 0; i
<ignoredAddresses->size(); i++)
if(ignoredAddresses->at(i)->ia.sin_addr.s_addr
==fromAddress.ia.sin_addr.s_addr) {
ignore = true;
break;
}
if(!ignore) {
receiveBuffer->put(
readBuffer,
0,
bytesRead
<receiveBuffer->getRemaining() ? bytesRead
: receiveBuffer->getRemaining());
receiveBuffer->flip();
processBuffer(&fromAddress, receiveBuffer);
}
}
else {
// log a 'recvfrom' error
if(bytesRead==-1) errlogSevPrintf(errlogMajor,
"Socket recv error: %s", strerror(errno));
}
}
else {
// error (POLLERR, POLLHUP, or POLLNVAL)
if(pfd.revents&POLLERR) errlogSevPrintf(
errlogMajor, "Socket poll error (POLLERR)");
if(pfd.revents&POLLHUP) errlogSevPrintf(
errlogMinor, "Socket poll error (POLLHUP)");
if(pfd.revents&POLLNVAL) errlogSevPrintf(
errlogMajor,
"Socket poll error: server socket no longer bound.");
}
}
// retval == 0 : timeout
// retval < 0 : error
if(retval<0) errlogSevPrintf(errlogMajor,
"Socket poll error: %s", strerror(errno));
}
} catch(...) {
// TODO: catch all exceptions, and act accordingly
close(true);
}
}
bool BlockingUDPTransport::processBuffer(osiSockAddr* fromAddress,
ByteBuffer* receiveBuffer) {
// TODO: implement
return true;
}
}
}

View File

@@ -0,0 +1,212 @@
/*
* blockingUDPTransport.h
*
* Created on: Dec 20, 2010
* Author: Miha Vitorovic
*/
#ifndef BLOCKINGUDPTRANSPORT_H_
#define BLOCKINGUDPTRANSPORT_H_
/* pvAccess */
#include "remote.h"
#include "caConstants.h"
#include "inetAddressUtil.h"
/* pvData */
#include <noDefaultMethods.h>
#include <byteBuffer.h>
/* EPICSv3 */
#include <osdSock.h>
#include <osiSock.h>
namespace epics {
namespace pvAccess {
class BlockingUDPTransport : public epics::pvData::NoDefaultMethods,
public Transport,
public TransportSendControl {
public:
BlockingUDPTransport(SOCKET channel, osiSockAddr* bindAddress,
InetAddrVector* sendAddresses,
short remoteTransportRevision);
virtual ~BlockingUDPTransport();
virtual bool isClosed() const {
return closed;
}
virtual const osiSockAddr* getRemoteAddress() const {
return socketAddress;
}
virtual const String getType() const {
return String("UDP");
}
virtual int8 getMajorRevision() const {
return CA_MAJOR_PROTOCOL_REVISION;
}
virtual int8 getMinorRevision() const {
return CA_MINOR_PROTOCOL_REVISION;
}
virtual int getReceiveBufferSize() const {
return receiveBuffer->getSize();
}
virtual int getSocketReceiveBufferSize() const {
// Get value of the SO_RCVBUF option for this DatagramSocket,
// that is the buffer size used by the platform for input on
// this DatagramSocket.
// TODO: real implementation
return MAX_UDP_RECV;
}
virtual int16 getPriority() const {
return CA_DEFAULT_PRIORITY;
}
virtual void setRemoteMinorRevision(int8 minor) {
// noop
}
virtual void setRemoteTransportReceiveBufferSize(
int receiveBufferSize) {
// noop for UDP (limited by 64k; MAX_UDP_SEND for CA)
}
virtual void setRemoteTransportSocketReceiveBufferSize(
int socketReceiveBufferSize) {
// noop for UDP (limited by 64k; MAX_UDP_SEND for CA)
}
virtual void aliveNotification() {
// noop
}
virtual void changedTransport() {
// noop
}
virtual bool isVerified() const {
return false;
}
virtual void verified() {
// noop
}
virtual void enqueueSendRequest(TransportSender* sender);
void start();
virtual void close(bool forced);
virtual void ensureData(int size) {
// TODO: implement
}
virtual void startMessage(int8 command, int ensureCapacity);
virtual void endMessage();
virtual void flush(bool lastMessageCompleted) {
// noop since all UDP requests are sent immediately
}
virtual void setRecipient(const osiSockAddr* sendTo) {
this->sendTo = sendTo;
}
virtual void flushSerializeBuffer() {
// TODO Auto-generated method stub
}
virtual void ensureBuffer(int size) {
// noop
}
/**
* Set ignore list.
* @param addresses list of ignored addresses.
*/
void setIgnoredAddresses(InetAddrVector* addresses) {
ignoredAddresses = addresses;
}
/**
* Get list of ignored addresses.
* @return ignored addresses.
*/
InetAddrVector* getIgnoredAddresses() const {
return ignoredAddresses;
}
protected:
bool closed;
virtual void processRead();
private:
bool processBuffer(osiSockAddr* fromAddress,
ByteBuffer* receiveBuffer);
// Context only used for logging in this class
/**
* Corresponding channel.
*/
SOCKET channel;
/**
* Cached socket address.
*/
osiSockAddr* socketAddress;
/**
* Bind address.
*/
osiSockAddr* bindAddress;
/**
* Send addresses.
*/
InetAddrVector* sendAddresses;
/**
* Ignore addresses.
*/
InetAddrVector* ignoredAddresses;
const osiSockAddr* sendTo;
/**
* Receive buffer.
*/
epics::pvData::ByteBuffer* receiveBuffer;
/**
* Send buffer.
*/
epics::pvData::ByteBuffer* sendBuffer;
/**
* Last message start position.
*/
int lastMessageStartPosition;
/**
* Read buffer
*/
char* readBuffer;
};
}
}
#endif /* BLOCKINGUDPTRANSPORT_H_ */

188
pvAccessApp/remote/remote.h Normal file
View File

@@ -0,0 +1,188 @@
/*
* remote.h
*
* Created on: Dec 21, 2010
* Author: user
*/
#ifndef REMOTE_H_
#define REMOTE_H_
#include <serialize.h>
#include <pvType.h>
#include <byteBuffer.h>
#include <osiSock.h>
#include <osdSock.h>
namespace epics {
namespace pvAccess {
using namespace epics::pvData;
enum ProtocolType {
TCP, UDP, SSL
};
/**
* Interface defining transport send control.
* @author <a href="mailto:matej.sekoranjaATcosylab.com">Matej Sekoranja</a>
*/
class TransportSendControl : public SerializableControl {
public:
virtual void startMessage(int8 command, int ensureCapacity) =0;
virtual void endMessage() =0;
virtual void flush(bool lastMessageCompleted) =0;
virtual void setRecipient(const osiSockAddr* sendTo) =0;
};
/**
* Interface defining transport sender (instance sending data over transport).
* @author <a href="mailto:matej.sekoranjaATcosylab.com">Matej Sekoranja</a>
* @version $Id: TransportSender.java,v 1.1 2010/05/03 14:45:39 mrkraimer Exp $
*/
class TransportSender {
public:
/**
* Called by transport.
* By this call transport gives callee ownership over the buffer.
* Calls on <code>TransportSendControl</code> instance must be made from
* calling thread. Moreover, ownership is valid only for the time of call
* of this method.
* NOTE: these limitations allows efficient implementation.
*/
virtual void
send(ByteBuffer* buffer, TransportSendControl* control) =0;
virtual void lock() =0;
virtual void unlock() =0;
};
/**
* Interface defining transport (connection).
* @author <a href="mailto:matej.sekoranjaATcosylab.com">Matej Sekoranja</a>
* @version $Id: Transport.java,v 1.1 2010/05/03 14:45:39 mrkraimer Exp $
*/
class Transport : public DeserializableControl {
public:
/**
* Get remote address.
* @return remote address.
*/
virtual const osiSockAddr* getRemoteAddress() const =0;
/**
* Get protocol type (tcp, udp, ssl, etc.).
* @return protocol type.
*/
virtual const String getType() const =0;
/**
* Get context transport is living in.
* @return context transport is living in.
*/
//public Context getContext();
/**
* Transport protocol major revision.
* @return protocol major revision.
*/
virtual int8 getMajorRevision() const =0;
/**
* Transport protocol minor revision.
* @return protocol minor revision.
*/
virtual int8 getMinorRevision() const =0;
/**
* Get receive buffer size.
* @return receive buffer size.
*/
virtual int getReceiveBufferSize() const =0;
/**
* Get socket receive buffer size.
* @return socket receive buffer size.
*/
virtual int getSocketReceiveBufferSize() const =0;
/**
* Transport priority.
* @return protocol priority.
*/
virtual int16 getPriority() const =0;
/**
* Set remote transport protocol minor revision.
* @param minor protocol minor revision.
*/
virtual void setRemoteMinorRevision(int8 minor) =0;
/**
* Set remote transport receive buffer size.
* @param receiveBufferSize receive buffer size.
*/
virtual void setRemoteTransportReceiveBufferSize(
int receiveBufferSize) =0;
/**
* Set remote transport socket receive buffer size.
* @param socketReceiveBufferSize remote socket receive buffer size.
*/
virtual void setRemoteTransportSocketReceiveBufferSize(
int socketReceiveBufferSize) =0;
/**
* Notification transport that is still alive.
*/
virtual void aliveNotification() =0;
/**
* Notification that transport has changed.
*/
virtual void changedTransport() =0;
/**
* Get introspection registry for transport.
* @return <code>IntrospectionRegistry</code> instance.
*/
//virtual IntrospectionRegistry getIntrospectionRegistry() =0;
/**
* Close transport.
* @param force flag indicating force-full (e.g. remote disconnect) close.
*/
virtual void close(bool force) =0;
/**
* Check connection status.
* @return <code>true</code> if connected.
*/
virtual bool isClosed() const =0;
/**
* Get transport verification status.
* @return verification flag.
*/
virtual bool isVerified() const =0;
/**
* Notify transport that it is has been verified.
*/
virtual void verified() =0;
/**
* Enqueue send request.
* @param sender
*/
virtual void enqueueSendRequest(TransportSender* sender) =0;
};
}
}
#endif /* REMOTE_H_ */

View File

@@ -4,17 +4,23 @@
* Created on: Nov 12, 2010
* Author: Miha Vitorovic
*/
/* pvAccess */
#include "inetAddressUtil.h"
#include <vector>
/* pvData */
#include <byteBuffer.h>
/* EPICSv3 */
#include <osiSock.h>
#include <ellLib.h>
#include <epicsAssert.h>
#include <epicsException.h>
/* standard */
#include <vector>
#include <cstring>
#include <cstdlib>
#include <epicsAssert.h>
#include <byteBuffer.h>
#include <epicsException.h>
#include <sstream>
using namespace std;
using namespace epics::pvData;
@@ -204,5 +210,20 @@ namespace epics {
return iav;
}
const String inetAddressToString(const osiSockAddr *addr,
bool displayHex) {
stringstream saddr;
saddr<<(int)((addr->ia.sin_addr.s_addr)>>24)<<'.';
saddr<<((int)((addr->ia.sin_addr.s_addr)>>16)&0xFF)<<'.';
saddr<<((int)((addr->ia.sin_addr.s_addr)>>8)&0xFF)<<'.';
saddr<<((int)(addr->ia.sin_addr.s_addr)&0xFF);
if(addr->ia.sin_port>0) saddr<<":"<<addr->ia.sin_port;
if(displayHex) saddr<<" ("<<hex<<((uint32_t)(
addr->ia.sin_addr.s_addr))<<")";
return saddr.str();
}
}
}

View File

@@ -44,8 +44,7 @@ namespace epics {
* @param address address to encode.
*/
void
encodeAsIPv6Address(ByteBuffer* buffer,
const osiSockAddr* address);
encodeAsIPv6Address(ByteBuffer* buffer, const osiSockAddr* address);
/**
* Convert an integer into an IPv4 INET address.
@@ -69,17 +68,10 @@ namespace epics {
* @return array of <code>InetSocketAddress</code>.
*/
InetAddrVector* getSocketAddressList(String list, int defaultPort,
const InetAddrVector* appendList);
const InetAddrVector* appendList = NULL);
/**
* Parse space delimited addresss[:port] string and return array of <code>InetSocketAddress</code>.
* @param list space delimited addresss[:port] string.
* @param defaultPort port take if not specified.
* @return array of <code>InetSocketAddress</code>.
*/
InetAddrVector* getSocketAddressList(String list, int defaultPort) {
return getSocketAddressList(list, defaultPort, NULL);
}
const String inetAddressToString(const osiSockAddr *addr,
bool displayHex = false);
}
}

View File

@@ -4,7 +4,7 @@ include $(TOP)/configure/CONFIG
PROD_HOST += testChannelAccessFactory
testChannelAccessFactory_SRCS += testChannelAccessFactory.cpp
testChannelAccessFactory_LIBS += pvAccess Com
testChannelAccessFactory_LIBS += pvData pvAccess Com
PROD_HOST += testCreateRequest
testCreateRequest_SRCS += testCreateRequest.cpp

View File

@@ -4,29 +4,27 @@ include $(TOP)/configure/CONFIG
PROD_HOST += hexDumpTest
hexDumpTest_SRCS += hexDumpTest.cpp
hexDumpTest_LIBS += pvAccess
hexDumpTest_LIBS += pvAccess pvData
PROD_HOST += wildcharMatcherTest
wildcharMatcherTest_SRCS += wildcharMatcherTest.cpp
wildcharMatcherTest_LIBS += pvAccess Com
wildcharMatcherTest_LIBS += pvAccess pvData Com
PROD_HOST += arrayFIFOTest
arrayFIFOTest_SRCS += arrayFIFOTest.cpp
arrayFIFOTest_LIBS += pvAccess Com
arrayFIFOTest_LIBS += pvAccess pvData Com
PROD_HOST += growingCircularBufferTest
growingCircularBufferTest_SRCS += growingCircularBufferTest.cpp
growingCircularBufferTest_LIBS += pvAccess Com
growingCircularBufferTest_LIBS += pvAccess pvData Com
PROD_HOST += inetAddressUtilsTest
inetAddressUtilsTest_SRCS += inetAddressUtilsTest.cpp
inetAddressUtilsTest_LIBS += pvAccess Com pvData
pvData_DIR = $(PVDATA_HOME)/lib/$(EPICS_HOST_ARCH)
inetAddressUtilsTest_LIBS += pvAccess pvData Com
PROD_HOST += loggerTest
loggerTest_SRCS += loggerTest.cpp
loggerTest_LIBS += pvAccess Com
loggerTest_LIBS += pvAccess pvData Com
include $(TOP)/configure/RULES
#----------------------------------------

View File

@@ -11,7 +11,6 @@
#include <pvType.h>
#include <epicsAssert.h>
#include <iostream>
#include <sstream>
#include <cstring>
using namespace epics::pvAccess;
@@ -20,19 +19,6 @@ using std::endl;
using std::stringstream;
using std::hex;
String inetAddressToString(osiSockAddr *addr) {
stringstream saddr;
saddr<<(int)((addr->ia.sin_addr.s_addr)>>24)<<'.';
saddr<<((int)((addr->ia.sin_addr.s_addr)>>16)&0xFF)<<'.';
saddr<<((int)((addr->ia.sin_addr.s_addr)>>8)&0xFF)<<'.';
saddr<<((int)(addr->ia.sin_addr.s_addr)&0xFF);
if(addr->ia.sin_port>0) saddr<<":"<<addr->ia.sin_port;
saddr<<" ("<<hex<<((uint32_t)(addr->ia.sin_addr.s_addr))<<")";
return saddr.str();
}
int main(int argc, char *argv[]) {
InetAddrVector *vec;
@@ -48,19 +34,19 @@ int main(int argc, char *argv[]) {
assert(addr->ia.sin_family==AF_INET);
assert(addr->ia.sin_port==555);
assert(addr->ia.sin_addr.s_addr==(uint32_t)0x7F000001);
cout<<'\t'<<inetAddressToString(addr)<<endl;
cout<<'\t'<<inetAddressToString(addr, true)<<endl;
addr = vec->at(1);
assert(addr->ia.sin_family==AF_INET);
assert(addr->ia.sin_port==1234);
assert(addr->ia.sin_addr.s_addr==(uint32_t)0x0A0A0C0B);
cout<<'\t'<<inetAddressToString(addr)<<endl;
cout<<'\t'<<inetAddressToString(addr, true)<<endl;
addr = vec->at(2);
assert(addr->ia.sin_family==AF_INET);
assert(addr->ia.sin_port==555);
assert(addr->ia.sin_addr.s_addr==(uint32_t)0xC0A80304);
cout<<'\t'<<inetAddressToString(addr)<<endl;
cout<<'\t'<<inetAddressToString(addr, true)<<endl;
cout<<"\nPASSED!\n";
@@ -73,25 +59,25 @@ int main(int argc, char *argv[]) {
assert(addr->ia.sin_family==AF_INET);
assert(addr->ia.sin_port==6789);
assert(addr->ia.sin_addr.s_addr==(uint32_t)0xAC1037A0);
cout<<'\t'<<inetAddressToString(addr)<<endl;
cout<<'\t'<<inetAddressToString(addr, true)<<endl;
addr = vec1->at(1);
assert(addr->ia.sin_family==AF_INET);
assert(addr->ia.sin_port==555);
assert(addr->ia.sin_addr.s_addr==(uint32_t)0x7F000001);
cout<<'\t'<<inetAddressToString(addr)<<endl;
cout<<'\t'<<inetAddressToString(addr, true)<<endl;
addr = vec1->at(2);
assert(addr->ia.sin_family==AF_INET);
assert(addr->ia.sin_port==1234);
assert(addr->ia.sin_addr.s_addr==(uint32_t)0x0A0A0C0B);
cout<<'\t'<<inetAddressToString(addr)<<endl;
cout<<'\t'<<inetAddressToString(addr, true)<<endl;
addr = vec1->at(3);
assert(addr->ia.sin_family==AF_INET);
assert(addr->ia.sin_port==555);
assert(addr->ia.sin_addr.s_addr==(uint32_t)0xC0A80304);
cout<<'\t'<<inetAddressToString(addr)<<endl;
cout<<'\t'<<inetAddressToString(addr, true)<<endl;
cout<<"\nPASSED!\n";
@@ -107,12 +93,12 @@ int main(int argc, char *argv[]) {
cout<<"Testing \"intToIPv4Address\""<<endl;
addr = intToIPv4Address(0x7F000001);
assert(addr->ia.sin_family==AF_INET);
cout<<'\t'<<inetAddressToString(addr)<<endl;
cout<<'\t'<<inetAddressToString(addr, true)<<endl;
delete addr;
addr = intToIPv4Address(0x0A0A0C0B);
assert(addr->ia.sin_family==AF_INET);
cout<<'\t'<<inetAddressToString(addr)<<endl;
cout<<'\t'<<inetAddressToString(addr, true)<<endl;
cout<<"\nPASSED!\n";