This commit is contained in:
Gasper Jansa
2010-12-29 10:17:28 +01:00
12 changed files with 676 additions and 110 deletions

View File

@@ -26,7 +26,6 @@
<option id="gnu.cpp.compiler.option.include.paths.1856220007" name="Include paths (-I)" superClass="gnu.cpp.compiler.option.include.paths" valueType="includePath">
<listOptionValue builtIn="false" value="/opt/epics/base/include"/>
<listOptionValue builtIn="false" value="/opt/epics/base/include/os/Linux"/>
<listOptionValue builtIn="false" value="&quot;${workspace_loc:/pvDataCPP/include}&quot;"/>
</option>
<inputType id="cdt.managedbuild.tool.gnu.cpp.compiler.input.1316200261" superClass="cdt.managedbuild.tool.gnu.cpp.compiler.input"/>
</tool>
@@ -45,7 +44,7 @@
</inputType>
</tool>
<tool id="cdt.managedbuild.tool.gnu.assembler.base.898309292" name="GCC Assembler" superClass="cdt.managedbuild.tool.gnu.assembler.base">
<option id="gnu.both.asm.option.include.paths.288017456" name="Include paths (-I)" superClass="gnu.both.asm.option.include.paths" valueType="includePath"/>
<option id="gnu.both.asm.option.include.paths.288017456" name="Include paths (-I)" superClass="gnu.both.asm.option.include.paths"/>
<inputType id="cdt.managedbuild.tool.gnu.assembler.input.342310237" superClass="cdt.managedbuild.tool.gnu.assembler.input"/>
</tool>
</toolChain>
@@ -308,7 +307,6 @@
<buildTargets>
<target name="all" path="" targetID="org.eclipse.cdt.build.MakeTargetBuilder">
<buildCommand>make</buildCommand>
<buildArguments/>
<buildTarget>all</buildTarget>
<stopOnError>true</stopOnError>
<useDefaultCommand>true</useDefaultCommand>
@@ -316,12 +314,19 @@
</target>
<target name="clean" path="" targetID="org.eclipse.cdt.build.MakeTargetBuilder">
<buildCommand>make</buildCommand>
<buildArguments/>
<buildTarget>clean</buildTarget>
<stopOnError>true</stopOnError>
<useDefaultCommand>true</useDefaultCommand>
<runAllBuilders>false</runAllBuilders>
</target>
<target name="uninstall" path="" targetID="org.eclipse.cdt.build.MakeTargetBuilder">
<buildCommand>make</buildCommand>
<buildArguments/>
<buildTarget>uninstall</buildTarget>
<stopOnError>true</stopOnError>
<useDefaultCommand>true</useDefaultCommand>
<runAllBuilders>false</runAllBuilders>
</target>
</buildTargets>
</storageModule>
</cconfiguration>

View File

@@ -39,8 +39,9 @@ LIBSRCS += CreateRequestFactory.cpp
SRC_DIRS += $(PVACCESS)/remote
INC += remote.h
INC += blockingUDPTransport.h
INC += blockingUDP.h
LIBSRCS += blockingUDPTransport.cpp
LIBSRCS += blockingUDPConnector.cpp
LIBRARY = pvAccess

View File

@@ -5,8 +5,8 @@
* Author: Miha Vitorovic
*/
#ifndef BLOCKINGUDPTRANSPORT_H_
#define BLOCKINGUDPTRANSPORT_H_
#ifndef BLOCKINGUDP_H_
#define BLOCKINGUDP_H_
/* pvAccess */
#include "remote.h"
@@ -16,10 +16,12 @@
/* pvData */
#include <noDefaultMethods.h>
#include <byteBuffer.h>
#include <lock.h>
/* EPICSv3 */
#include <osdSock.h>
#include <osiSock.h>
#include <epicsThread.h>
namespace epics {
namespace pvAccess {
@@ -28,18 +30,19 @@ namespace epics {
public Transport,
public TransportSendControl {
public:
BlockingUDPTransport(SOCKET channel, osiSockAddr* bindAddress,
BlockingUDPTransport(ResponseHandler* responseHandler,
SOCKET channel, osiSockAddr* bindAddress,
InetAddrVector* sendAddresses,
short remoteTransportRevision);
virtual ~BlockingUDPTransport();
virtual bool isClosed() const {
return closed;
return _closed;
}
virtual const osiSockAddr* getRemoteAddress() const {
return socketAddress;
return _socketAddress;
}
virtual const String getType() const {
@@ -55,17 +58,10 @@ namespace epics {
}
virtual int getReceiveBufferSize() const {
return receiveBuffer->getSize();
return _receiveBuffer->getSize();
}
virtual int getSocketReceiveBufferSize() const {
// Get value of the SO_RCVBUF option for this DatagramSocket,
// that is the buffer size used by the platform for input on
// this DatagramSocket.
// TODO: real implementation
return MAX_UDP_RECV;
}
virtual int getSocketReceiveBufferSize() const;
virtual int16 getPriority() const {
return CA_DEFAULT_PRIORITY;
@@ -108,7 +104,7 @@ namespace epics {
virtual void close(bool forced);
virtual void ensureData(int size) {
// TODO: implement
// TODO Auto-generated method stub
}
virtual void startMessage(int8 command, int ensureCapacity);
@@ -119,7 +115,7 @@ namespace epics {
}
virtual void setRecipient(const osiSockAddr* sendTo) {
this->sendTo = sendTo;
_sendTo = sendTo;
}
virtual void flushSerializeBuffer() {
@@ -135,7 +131,7 @@ namespace epics {
* @param addresses list of ignored addresses.
*/
void setIgnoredAddresses(InetAddrVector* addresses) {
ignoredAddresses = addresses;
_ignoredAddresses = addresses;
}
/**
@@ -143,15 +139,47 @@ namespace epics {
* @return ignored addresses.
*/
InetAddrVector* getIgnoredAddresses() const {
return ignoredAddresses;
return _ignoredAddresses;
}
bool send(ByteBuffer* buffer, const osiSockAddr* address = NULL);
/**
* Get list of send addresses.
* @return send addresses.
*/
InetAddrVector* getSendAddresses() {
return _sendAddresses;
}
/**
* Get bind address.
* @return bind address.
*/
osiSockAddr* getBindAddress() {
return _bindAddress;
}
/**
* Set list of send addresses.
* @param addresses list of send addresses, non-<code>null</code>.
*/
void setBroadcastAddresses(InetAddrVector* addresses) {
_sendAddresses = addresses;
}
protected:
bool closed;
bool _closed;
/**
* Response handler.
*/
ResponseHandler* _responseHandler;
virtual void processRead();
private:
static void threadRunner(void* param);
bool processBuffer(osiSockAddr* fromAddress,
ByteBuffer* receiveBuffer);
@@ -160,53 +188,102 @@ namespace epics {
/**
* Corresponding channel.
*/
SOCKET channel;
SOCKET _channel;
/**
* Cached socket address.
*/
osiSockAddr* socketAddress;
osiSockAddr* _socketAddress;
/**
* Bind address.
*/
osiSockAddr* bindAddress;
osiSockAddr* _bindAddress;
/**
* Send addresses.
*/
InetAddrVector* sendAddresses;
InetAddrVector* _sendAddresses;
/**
* Ignore addresses.
*/
InetAddrVector* ignoredAddresses;
InetAddrVector* _ignoredAddresses;
const osiSockAddr* sendTo;
const osiSockAddr* _sendTo;
/**
* Receive buffer.
*/
epics::pvData::ByteBuffer* receiveBuffer;
epics::pvData::ByteBuffer* _receiveBuffer;
/**
* Send buffer.
*/
epics::pvData::ByteBuffer* sendBuffer;
epics::pvData::ByteBuffer* _sendBuffer;
/**
* Last message start position.
*/
int lastMessageStartPosition;
int _lastMessageStartPosition;
/**
* Read buffer
*/
char* readBuffer;
char* _readBuffer;
/**
* Used for process sync.
*/
Mutex* _mutex;
/**
* Thread ID
*/
epicsThreadId _threadId;
};
class BlockingUDPConnector : public Connector, NoDefaultMethods {
public:
BlockingUDPConnector(bool reuseSocket,
InetAddrVector* sendAddresses, bool broadcast) :
_sendAddresses(sendAddresses), _reuseSocket(reuseSocket),
_broadcast(broadcast) {
}
virtual ~BlockingUDPConnector() {
// TODO: delete _sendAddresses here?
}
/**
* NOTE: transport client is ignored for broadcast (UDP).
*/
virtual Transport* connect(TransportClient* client,
ResponseHandler* responseHandler, osiSockAddr* bindAddress,
short transportRevision, short priority);
private:
/**
* Send address.
*/
InetAddrVector* _sendAddresses;
/**
* Reuse socket flag.
*/
bool _reuseSocket;
/**
* Broadcast flag.
*/
bool _broadcast;
};
}
}
#endif /* BLOCKINGUDPTRANSPORT_H_ */
#endif /* BLOCKINGUDP_H_ */

View File

@@ -0,0 +1,68 @@
/*
* blockingUDPConnector.cpp
*
* Created on: Dec 27, 2010
* Author: Miha Vitorovic
*/
/* pvAccess */
#include "blockingUDP.h"
#include "remote.h"
/* pvData */
#include <epicsException.h>
/* EPICSv3 */
#include <errlog.h>
/* standard */
#include <sys/types.h>
#include <sys/socket.h>
namespace epics {
namespace pvAccess {
Transport* BlockingUDPConnector::connect(TransportClient* client,
ResponseHandler* responseHandler, osiSockAddr* bindAddress,
short transportRevision, short priority) {
errlogSevPrintf(errlogInfo, "Creating datagram socket to: %s",
inetAddressToString(bindAddress).c_str());
SOCKET socket = ::socket(PF_INET, SOCK_DGRAM, 0);
/* from MSDN:
* Note: If the setsockopt function is called before the bind
* function, TCP/IP options will not be checked by using TCP/IP
* until the bind occurs. In this case, the setsockopt function
* call will always succeed, but the bind function call can fail
* because of an early setsockopt call failing.
*/
int retval = ::bind(socket, (sockaddr*)&(bindAddress->sa),
sizeof(sockaddr));
if(retval<0) {
errlogSevPrintf(errlogMajor, "Error binding socket: %s",
strerror(errno));
THROW_BASE_EXCEPTION(strerror(errno));
}
// set the socket options
int optval = _reuseSocket ? 1 : 0;
retval = ::setsockopt(socket, SOL_SOCKET, SO_REUSEADDR, &optval,
sizeof(optval));
if(retval<0) errlogSevPrintf(errlogMajor,
"Error binding socket: %s", strerror(errno));
optval = _broadcast ? 1 : 0;
retval = ::setsockopt(socket, SOL_SOCKET, SO_BROADCAST, &optval,
sizeof(optval));
// sockets are blocking by default
return new BlockingUDPTransport(responseHandler, socket,
bindAddress, _sendAddresses, transportRevision);
}
}
}

View File

@@ -1,27 +1,29 @@
/*
* blockingUDPTransport.cpp
/* blockingUDPTransport.cpp
*
* Created on: Dec 20, 2010
* Author: Miha Vitorovic
*/
/* pvAccess */
#include "blockingUDPTransport.h"
#include "blockingUDP.h"
#include "caConstants.h"
#include "inetAddressUtil.h"
/* pvData */
#include <byteBuffer.h>
#include <lock.h>
/* EPICSv3 */
#include <osdSock.h>
#include <osiSock.h>
#include <errlog.h>
#include <epicsThread.h>
/* standard */
#include <cstdio>
#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <poll.h>
#include <errno.h>
@@ -30,68 +32,89 @@ namespace epics {
using namespace epics::pvData;
BlockingUDPTransport::BlockingUDPTransport(SOCKET channel,
BlockingUDPTransport::BlockingUDPTransport(
ResponseHandler* responseHandler, SOCKET channel,
osiSockAddr* bindAddress, InetAddrVector* sendAddresses,
short remoteTransportRevision) {
this->channel = channel;
this->bindAddress = bindAddress;
this->sendAddresses = sendAddresses;
socketAddress = bindAddress;
// allocate receive buffer
receiveBuffer = new ByteBuffer(MAX_UDP_RECV);
// allocate send buffer and non-reentrant lock
sendBuffer = new ByteBuffer(MAX_UDP_SEND);
ignoredAddresses = NULL;
sendTo = NULL;
closed = false;
lastMessageStartPosition = 0;
readBuffer = new char[MAX_UDP_RECV];
short remoteTransportRevision) :
_closed(false), _responseHandler(responseHandler),
_channel(channel), _socketAddress(bindAddress),
_bindAddress(bindAddress), _sendAddresses(sendAddresses),
_ignoredAddresses(NULL), _sendTo(NULL), _receiveBuffer(
new ByteBuffer(MAX_UDP_RECV)), _sendBuffer(
new ByteBuffer(MAX_UDP_RECV)),
_lastMessageStartPosition(0), _readBuffer(
new char[MAX_UDP_RECV]), _mutex(new Mutex()),
_threadId(NULL) {
}
BlockingUDPTransport::~BlockingUDPTransport() {
delete receiveBuffer;
delete sendBuffer;
delete readBuffer;
close(true); // close the socket and stop the thread.
delete _receiveBuffer;
delete _sendBuffer;
delete _readBuffer;
delete _mutex;
}
void BlockingUDPTransport::start() {
// TODO implement
String threadName = "UDP-receive "+inetAddressToString(
_socketAddress);
errlogSevPrintf(errlogInfo, "Starting thread: %s",
threadName.c_str());
_threadId = epicsThreadCreate(threadName.c_str(),
epicsThreadPriorityMedium, epicsThreadGetStackSize(
epicsThreadStackMedium),
BlockingUDPTransport::threadRunner, this);
}
void BlockingUDPTransport::close(bool forced) {
if(closed) return;
closed = true;
if(_closed) return;
_closed = true;
if(bindAddress!=NULL) errlogSevPrintf(errlogInfo,
"UDP connection to %s closed.", inetAddressToString(
bindAddress).c_str());
if(_bindAddress!=NULL) errlogSevPrintf(errlogInfo,
"UDP socket %s closed.",
inetAddressToString(_bindAddress).c_str());
// TODO: finish implementation
int retval = ::close(_channel);
if(retval<0) errlogSevPrintf(errlogMajor, "Socket close error: %s",
strerror(errno));
}
void BlockingUDPTransport::enqueueSendRequest(TransportSender* sender) {
// TODO implement
Lock lock(_mutex);
_sendTo = NULL;
_sendBuffer->clear();
sender->lock();
try {
sender->send(_sendBuffer, this);
sender->unlock();
endMessage();
send(_sendBuffer, _sendTo);
} catch(...) {
sender->unlock();
}
}
void BlockingUDPTransport::startMessage(int8 command,
int ensureCapacity) {
lastMessageStartPosition = sendBuffer->getPosition();
sendBuffer->putShort(CA_MAGIC_AND_VERSION);
sendBuffer->putByte(0); // data
sendBuffer->putByte(command); // command
sendBuffer->putInt(0); // temporary zero payload
_lastMessageStartPosition = _sendBuffer->getPosition();
_sendBuffer->putShort(CA_MAGIC_AND_VERSION);
_sendBuffer->putByte(0); // data
_sendBuffer->putByte(command); // command
_sendBuffer->putInt(0); // temporary zero payload
}
void BlockingUDPTransport::endMessage() {
int32 data = lastMessageStartPosition+(16/8+2);
sendBuffer->put((char*)&data, sendBuffer->getPosition()
-lastMessageStartPosition-CA_MESSAGE_HEADER_SIZE,
sizeof(int32));
int oldPosition = _sendBuffer->getPosition();
_sendBuffer->setPosition(_lastMessageStartPosition
+(sizeof(int16)+2));
_sendBuffer->putInt(oldPosition-_lastMessageStartPosition
-CA_MESSAGE_HEADER_SIZE);
_sendBuffer->setPosition(oldPosition);
}
void BlockingUDPTransport::processRead() {
@@ -99,14 +122,14 @@ namespace epics {
// object's own thread.
pollfd pfd;
pfd.fd = channel;
pfd.fd = _channel;
pfd.events = POLLIN;
osiSockAddr fromAddress;
try {
while(!closed) {
while(!_closed) {
// we poll to prevent blocking indefinitely
/* From 'accept' man page:
@@ -118,46 +141,51 @@ namespace epics {
*/
int retval = poll(&pfd, 1, 100);
if(_closed) break; // if the dtor was called during wait
// none of the object properties are no longer valid.
if(retval>0) {
// activity on SOCKET
if(pfd.revents&POLLIN) {
// data ready to be read
receiveBuffer->clear();
_receiveBuffer->clear();
socklen_t addrStructSize = sizeof(sockaddr);
int bytesRead = recvfrom(channel, readBuffer,
int bytesRead = recvfrom(_channel, _readBuffer,
MAX_UDP_RECV, 0, (sockaddr*)&fromAddress,
&addrStructSize);
if(bytesRead>0) {
// successfully got datagram
bool ignore = false;
if(ignoredAddresses!=NULL) for(size_t i = 0; i
<ignoredAddresses->size(); i++)
if(ignoredAddresses->at(i)->ia.sin_addr.s_addr
if(_ignoredAddresses!=NULL) for(size_t i = 0; i
<_ignoredAddresses->size(); i++)
if(_ignoredAddresses->at(i)->ia.sin_addr.s_addr
==fromAddress.ia.sin_addr.s_addr) {
ignore = true;
break;
}
if(!ignore) {
receiveBuffer->put(
readBuffer,
_receiveBuffer->put(
_readBuffer,
0,
bytesRead
<receiveBuffer->getRemaining() ? bytesRead
: receiveBuffer->getRemaining());
<_receiveBuffer->getRemaining() ? bytesRead
: _receiveBuffer->getRemaining());
receiveBuffer->flip();
_receiveBuffer->flip();
processBuffer(&fromAddress, receiveBuffer);
processBuffer(&fromAddress, _receiveBuffer);
}
}
else {
// log a 'recvfrom' error
if(bytesRead==-1) errlogSevPrintf(errlogMajor,
"Socket recv error: %s", strerror(errno));
"Socket recv error: %s",
strerror(errno));
}
}
else {
@@ -182,13 +210,106 @@ namespace epics {
// TODO: catch all exceptions, and act accordingly
close(true);
}
char threadName[40];
epicsThreadGetName(_threadId, threadName, 40);
errlogSevPrintf(errlogInfo, "Thread '%s' exiting", threadName);
}
bool BlockingUDPTransport::processBuffer(osiSockAddr* fromAddress,
ByteBuffer* receiveBuffer) {
// TODO: implement
// handle response(s)
while(receiveBuffer->getRemaining()>=CA_MESSAGE_HEADER_SIZE) {
//
// read header
//
// first byte is CA_MAGIC
// second byte version - major/minor nibble
// check magic and version at once
short magicAndVersion = receiveBuffer->getShort();
if((short)(magicAndVersion&0xFFF0)!=CA_MAGIC_AND_MAJOR_VERSION) return false;
// only data for UDP
receiveBuffer->getByte();
// command ID and paylaod
int8 command = receiveBuffer->getByte();
int payloadSize = receiveBuffer->getInt();
int nextRequestPosition = receiveBuffer->getPosition()
+payloadSize;
// payload size check
if(nextRequestPosition>receiveBuffer->getLimit()) return false;
// handle
_responseHandler->handleResponse(fromAddress, this,
(int8)(magicAndVersion&0xFF), command, payloadSize,
_receiveBuffer);
// set position (e.g. in case handler did not read all)
receiveBuffer->setPosition(nextRequestPosition);
}
//all ok
return true;
}
bool BlockingUDPTransport::send(ByteBuffer* buffer,
const osiSockAddr* address) {
if(address==NULL&&_sendAddresses==NULL) return false;
if(address!=NULL) {
buffer->flip();
int retval =
sendto(_channel, buffer->getArray(),
buffer->getLimit(), 0, &(address->sa),
sizeof(sockaddr));
if(retval<0) {
errlogSevPrintf(errlogMajor, "Socket sendto error: %s",
strerror(errno));
return false;
}
}
else {
for(size_t i = 0; i<_sendAddresses->size(); i++) {
buffer->flip();
int retval = sendto(_channel, buffer->getArray(),
buffer->getLimit(), 0,
&(_sendAddresses->at(i)->sa), sizeof(sockaddr));
{
if(retval<0) errlogSevPrintf(errlogMajor,
"Socket sendto error: %s", strerror(errno));
return false;
}
}
}
return true;
}
int BlockingUDPTransport::getSocketReceiveBufferSize() const {
// Get value of the SO_RCVBUF option for this DatagramSocket,
// that is the buffer size used by the platform for input on
// this DatagramSocket.
int sockBufSize;
socklen_t intLen;
intLen = sizeof(int);
int retval = getsockopt(_channel, SOL_SOCKET, SO_RCVBUF,
&sockBufSize, &intLen);
if(retval<0) errlogSevPrintf(errlogMajor,
"Socket getsockopt SO_RCVBUF error: %s", strerror(errno));
return sockBufSize;
}
void BlockingUDPTransport::threadRunner(void* param) {
((BlockingUDPTransport*)param)->processRead();
}
}
}

View File

@@ -54,7 +54,7 @@ namespace epics {
* NOTE: these limitations allows efficient implementation.
*/
virtual void
send(ByteBuffer* buffer, TransportSendControl* control) =0;
send(ByteBuffer* buffer, TransportSendControl* control) =0;
virtual void lock() =0;
virtual void unlock() =0;
@@ -67,6 +67,9 @@ namespace epics {
*/
class Transport : public DeserializableControl {
public:
virtual ~Transport() {
}
/**
* Get remote address.
* @return remote address.
@@ -182,6 +185,82 @@ namespace epics {
};
/**
* Interface defining response handler.
* @author <a href="mailto:matej.sekoranjaATcosylab.com">Matej Sekoranja</a>
* @version $Id: ResponseHandler.java,v 1.1 2010/05/03 14:45:39 mrkraimer Exp $
*/
class ResponseHandler {
public:
/**
* Handle response.
* @param[in] responseFrom remote address of the responder, <code>null</code> if unknown.
* @param[in] transport response source transport.
* @param[in] version message version.
* @param[in] payloadSize size of this message data available in the <code>payloadBuffer</code>.
* @param[in] payloadBuffer message payload data.
* Note that this might not be the only message in the buffer.
* Code must not manipulate buffer.
*/
virtual void handleResponse(osiSockAddr* responseFrom,
Transport* transport, int8 version, int8 command,
int payloadSize, ByteBuffer* payloadBuffer) =0;
};
/**
* Client (user) of the transport.
* @author <a href="mailto:matej.sekoranjaATcosylab.com">Matej Sekoranja</a>
* @version $Id: TransportClient.java,v 1.1 2010/05/03 14:45:39 mrkraimer Exp $
*/
class TransportClient {
public:
/**
* Notification of unresponsive transport (e.g. no heartbeat detected) .
*/
virtual void transportUnresponsive() =0;
/**
* Notification of responsive transport (e.g. heartbeat detected again),
* called to discard <code>transportUnresponsive</code> notification.
* @param transport responsive transport.
*/
virtual void transportResponsive(Transport* transport) =0;
/**
* Notification of network change (server restarted).
*/
virtual void transportChanged() =0;
/**
* Notification of forcefully closed transport.
*/
virtual void transportClosed() =0;
};
/**
* Interface defining socket connector (Connector-Transport pattern).
* @author <a href="mailto:matej.sekoranjaATcosylab.com">Matej Sekoranja</a>
* @version $Id: Connector.java,v 1.1 2010/05/03 14:45:39 mrkraimer Exp $
*/
class Connector {
public:
/**
* Connect.
* @param[in] client client requesting connection (transport).
* @param[in] address address of the server.
* @param[in] responseHandler reponse handler.
* @param[in] transportRevision transport revision to be used.
* @param[in] priority process priority.
* @return transport instance.
* @throws ConnectionException
*/
virtual Transport* connect(TransportClient* client, ResponseHandler* responseHandler,
osiSockAddr* address, short transportRevision, short priority) =0;
};
}
}

View File

@@ -214,11 +214,13 @@ namespace epics {
bool displayHex) {
stringstream saddr;
saddr<<(int)((addr->ia.sin_addr.s_addr)>>24)<<'.';
saddr<<((int)((addr->ia.sin_addr.s_addr)>>16)&0xFF)<<'.';
saddr<<((int)((addr->ia.sin_addr.s_addr)>>8)&0xFF)<<'.';
saddr<<((int)(addr->ia.sin_addr.s_addr)&0xFF);
if(addr->ia.sin_port>0) saddr<<":"<<addr->ia.sin_port;
int ipa = ntohl(addr->ia.sin_addr.s_addr);
saddr<<((int)(ipa>>24)&0xFF)<<'.';
saddr<<((int)(ipa>>16)&0xFF)<<'.';
saddr<<((int)(ipa>>8)&0xFF)<<'.';
saddr<<((int)ipa&0xFF);
if(addr->ia.sin_port>0) saddr<<":"<<ntohs(addr->ia.sin_port);
if(displayHex) saddr<<" ("<<hex<<((uint32_t)(
addr->ia.sin_addr.s_addr))<<")";

View File

@@ -5,18 +5,20 @@
#ifndef INTROSPECTIONREGISTRY_H
#define INTROSPECTIONREGISTRY_H
#include <map>
#include <iostream>
#include <lock.h>
#include <pvIntrospect.h>
#include <pvData.h>
#include <byteBuffer.h>
#include <serialize.h>
#include <serializeHelper.h>
#include <status.h>
#include <standardField.h>
#include <epicsMutex.h>
#include "lock.h"
#include "pvIntrospect.h"
#include "pvData.h"
#include "byteBuffer.h"
#include "serialize.h"
#include "serializeHelper.h"
#include "status.h"
#include "standardField.h"
#include <map>
#include <iostream>
using namespace epics::pvData;
using namespace std;

View File

@@ -2,4 +2,5 @@ TOP = ..
include $(TOP)/configure/CONFIG
DIRS += utils
DIRS += client
DIRS += remote
include $(TOP)/configure/RULES_DIRS

17
testApp/remote/Makefile Normal file
View File

@@ -0,0 +1,17 @@
TOP=../..
include $(TOP)/configure/CONFIG
PROD_HOST += testBlockingUDPSrv
testBlockingUDPSrv_SRCS += testBlockingUDPSrv.cpp
testBlockingUDPSrv_LIBS += pvData pvAccess Com
PROD_HOST += testBlockingUDPClnt
testBlockingUDPClnt_SRCS += testBlockingUDPClnt.cpp
testBlockingUDPClnt_LIBS += pvData pvAccess Com
include $(TOP)/configure/RULES
#----------------------------------------
# ADD RULES AFTER THIS LINE

View File

@@ -0,0 +1,98 @@
/*
* testBlockingUDPClnt.cpp
*
* Created on: Dec 28, 2010
* Author: Miha Vitorovic
*/
#include "remote.h"
#include "blockingUDP.h"
#include "logger.h"
#include "inetAddressUtil.h"
#include <osiSock.h>
#include <iostream>
#include <cstdio>
using namespace epics::pvAccess;
using namespace epics::pvData;
using std::cout;
using std::endl;
using std::sscanf;
static osiSockAddr sendTo;
class DummyResponseHandler : public ResponseHandler {
public:
virtual void handleResponse(osiSockAddr* responseFrom,
Transport* transport, int8 version, int8 command, int payloadSize,
ByteBuffer* payloadBuffer) {
}
};
class DummyTransportSender : public TransportSender {
public:
DummyTransportSender() {
for(int i = 0; i<20; i++)
data[i] = (char)(i+1);
count = 0;
}
virtual void send(ByteBuffer* buffer, TransportSendControl* control) {
// set recipient
sendTo.ia.sin_family = AF_INET;
sendTo.ia.sin_port = htons(65000);
if(inet_aton("192.168.71.129",&sendTo.ia.sin_addr)==0) {
cout<<"error in inet_aton()"<<endl;
return;
}
control->setRecipient(&sendTo);
// send the packet
count++;
control->startMessage((int8)(count+0x10), 0);
buffer->put(data, 0, count);
//control->endMessage();
}
virtual void lock() {
}
virtual void unlock() {
}
private:
char data[20];
int count;
};
void testBlockingUDPSender() {
BlockingUDPConnector connector(false, NULL, true);
DummyTransportSender dts;
DummyResponseHandler drh;
osiSockAddr bindAddr;
bindAddr.ia.sin_family = AF_INET;
bindAddr.ia.sin_port = htons(65001);
bindAddr.ia.sin_addr.s_addr = htonl(INADDR_ANY);
Transport* transport = connector.connect(NULL, &drh, &bindAddr, 1, 50);
cout<<"Sending 10 packets..."<<endl;
for(int i = 0; i<10; i++) {
cout<<" Packet: "<<i+1<<endl;
transport->enqueueSendRequest(&dts);
sleep(1);
}
}
int main(int argc, char *argv[]) {
createFileLogger("testBlockingUDPClnt.log");
testBlockingUDPSender();
return (0);
}

View File

@@ -0,0 +1,95 @@
/*
* blockingUDPTest.cpp
*
* Created on: Dec 28, 2010
* Author: Miha Vitorovic
*/
#include "remote.h"
#include "blockingUDP.h"
#include "logger.h"
#include "inetAddressUtil.h"
#include "hexDump.h"
#include <iostream>
#include <sstream>
using namespace epics::pvAccess;
using std::cout;
using std::endl;
using std::hex;
using std::dec;
class DummyResponseHandler : public ResponseHandler {
public:
DummyResponseHandler() :
packets(0) {
}
int getPackets() {
return packets;
}
virtual void handleResponse(osiSockAddr* responseFrom,
Transport* transport, int8 version, int8 command, int payloadSize,
ByteBuffer* payloadBuffer);
private:
int packets;
};
void DummyResponseHandler::handleResponse(osiSockAddr* responseFrom,
Transport* transport, int8 version, int8 command, int payloadSize,
ByteBuffer* payloadBuffer) {
std::ostringstream os;
cout<<"Received new UDP datagram["<<packets+1<<"]..."<<endl;
cout<<"From: "<<inetAddressToString(responseFrom)<<endl;
cout<<"Version: 0x"<<hex<<(int)version<<endl;
cout<<"Command: 0x"<<hex<<(int)command<<endl;
cout<<"Payload size: "<<dec<<payloadSize<<endl;
char payload[50];
for(int i = 0; i<payloadSize;) {
int dataCount = payloadSize-i<50 ? payloadSize-i : 50;
payloadBuffer->get(payload, 0, dataCount);
os<<"Payload ("<<i<<"-"<<(dataCount-1)<<")";
hexDump(os.str(), (int8*)payload, dataCount);
i += dataCount;
}
cout<<endl<<endl;
packets++;
}
void testBlockingUDPConnector() {
BlockingUDPConnector connector(false, NULL, true);
DummyResponseHandler drh;
osiSockAddr bindAddr;
bindAddr.ia.sin_family = AF_INET;
bindAddr.ia.sin_port = htons(65000);
bindAddr.ia.sin_addr.s_addr = htonl(INADDR_ANY);
Transport* transport = connector.connect(NULL, &drh, &bindAddr, 1, 50);
((BlockingUDPTransport*)transport)->start();
cout<<"Waiting for 10 packets..."<<endl;
while(drh.getPackets()<10) {
sleep(1);
}
delete transport;
}
int main(int argc, char *argv[]) {
createFileLogger("testBlockingUDPSrv.log");
testBlockingUDPConnector();
return (0);
}