A working blockingUDPTransport with test.

TODO: debug stopping listener thread.
This commit is contained in:
miha_vitorovic
2010-12-28 15:47:05 +01:00
parent 58f03384c2
commit fe5ea9442c
7 changed files with 232 additions and 16 deletions

View File

@@ -73,8 +73,8 @@ namespace epics {
_closed = true;
if(_bindAddress!=NULL) errlogSevPrintf(errlogInfo,
"UDP socket %s closed.", inetAddressToString(
_bindAddress).c_str());
"UDP socket %s closed.",
inetAddressToString(_bindAddress).c_str());
int retval = ::close(_channel);
@@ -92,7 +92,7 @@ namespace epics {
sender->send(_sendBuffer, this);
sender->unlock();
endMessage();
if(_sendTo!=NULL) send(_sendBuffer, _sendTo);
send(_sendBuffer, _sendTo);
} catch(...) {
sender->unlock();
}
@@ -108,10 +108,13 @@ namespace epics {
}
void BlockingUDPTransport::endMessage() {
int32 data = _lastMessageStartPosition+(16/8+2);
_sendBuffer->put((char*)&data, _sendBuffer->getPosition()
-_lastMessageStartPosition-CA_MESSAGE_HEADER_SIZE,
sizeof(int32));
int oldPosition = _sendBuffer->getPosition();
_sendBuffer->setPosition(_lastMessageStartPosition
+(sizeof(int16)+2));
_sendBuffer->putInt(oldPosition-_lastMessageStartPosition
-CA_MESSAGE_HEADER_SIZE);
_sendBuffer->setPosition(oldPosition);
}
void BlockingUDPTransport::processRead() {
@@ -157,7 +160,7 @@ namespace epics {
if(bytesRead>0) {
// successfully got datagram
bool ignore = false;
if(_ignoredAddresses!=NULL) for(int i = 0; i
if(_ignoredAddresses!=NULL) for(size_t i = 0; i
<_ignoredAddresses->size(); i++)
if(_ignoredAddresses->at(i)->ia.sin_addr.s_addr
==fromAddress.ia.sin_addr.s_addr) {
@@ -255,7 +258,7 @@ namespace epics {
bool BlockingUDPTransport::send(ByteBuffer* buffer,
const osiSockAddr* address) {
if(address==NULL||_sendAddresses==NULL) return false;
if(address==NULL&&_sendAddresses==NULL) return false;
if(address!=NULL) {
buffer->flip();
@@ -270,7 +273,7 @@ namespace epics {
}
}
else {
for(int i = 0; i<_sendAddresses->size(); i++) {
for(size_t i = 0; i<_sendAddresses->size(); i++) {
buffer->flip();
int retval = sendto(_channel, buffer->getArray(),
buffer->getLimit(), 0,

View File

@@ -197,7 +197,7 @@ namespace epics {
* @param[in] payloadSize size of this message data available in the <code>payloadBuffer</code>.
* @param[in] payloadBuffer message payload data.
* Note that this might not be the only message in the buffer.
* Code must not manilupate buffer.
* Code must not manipulate buffer.
*/
virtual void handleResponse(osiSockAddr* responseFrom,
Transport* transport, int8 version, int8 command,

View File

@@ -214,11 +214,13 @@ namespace epics {
bool displayHex) {
stringstream saddr;
saddr<<(int)((addr->ia.sin_addr.s_addr)>>24)<<'.';
saddr<<((int)((addr->ia.sin_addr.s_addr)>>16)&0xFF)<<'.';
saddr<<((int)((addr->ia.sin_addr.s_addr)>>8)&0xFF)<<'.';
saddr<<((int)(addr->ia.sin_addr.s_addr)&0xFF);
if(addr->ia.sin_port>0) saddr<<":"<<addr->ia.sin_port;
int ipa = ntohl(addr->ia.sin_addr.s_addr);
saddr<<((int)(ipa>>24)&0xFF)<<'.';
saddr<<((int)(ipa>>16)&0xFF)<<'.';
saddr<<((int)(ipa>>8)&0xFF)<<'.';
saddr<<((int)ipa&0xFF);
if(addr->ia.sin_port>0) saddr<<":"<<ntohs(addr->ia.sin_port);
if(displayHex) saddr<<" ("<<hex<<((uint32_t)(
addr->ia.sin_addr.s_addr))<<")";

View File

@@ -2,4 +2,5 @@ TOP = ..
include $(TOP)/configure/CONFIG
DIRS += utils
DIRS += client
DIRS += remote
include $(TOP)/configure/RULES_DIRS

17
testApp/remote/Makefile Normal file
View File

@@ -0,0 +1,17 @@
TOP=../..
include $(TOP)/configure/CONFIG
PROD_HOST += testBlockingUDPSrv
testBlockingUDPSrv_SRCS += testBlockingUDPSrv.cpp
testBlockingUDPSrv_LIBS += pvData pvAccess Com
PROD_HOST += testBlockingUDPClnt
testBlockingUDPClnt_SRCS += testBlockingUDPClnt.cpp
testBlockingUDPClnt_LIBS += pvData pvAccess Com
include $(TOP)/configure/RULES
#----------------------------------------
# ADD RULES AFTER THIS LINE

View File

@@ -0,0 +1,98 @@
/*
* testBlockingUDPClnt.cpp
*
* Created on: Dec 28, 2010
* Author: Miha Vitorovic
*/
#include "remote.h"
#include "blockingUDP.h"
#include "logger.h"
#include "inetAddressUtil.h"
#include <osiSock.h>
#include <iostream>
#include <cstdio>
using namespace epics::pvAccess;
using namespace epics::pvData;
using std::cout;
using std::endl;
using std::sscanf;
static osiSockAddr sendTo;
class DummyResponseHandler : public ResponseHandler {
public:
virtual void handleResponse(osiSockAddr* responseFrom,
Transport* transport, int8 version, int8 command, int payloadSize,
ByteBuffer* payloadBuffer) {
}
};
class DummyTransportSender : public TransportSender {
public:
DummyTransportSender() {
for(int i = 0; i<20; i++)
data[i] = (char)(i+1);
count = 0;
}
virtual void send(ByteBuffer* buffer, TransportSendControl* control) {
// set recipient
sendTo.ia.sin_family = AF_INET;
sendTo.ia.sin_port = htons(65000);
if(inet_aton("192.168.71.129",&sendTo.ia.sin_addr)==0) {
cout<<"error in inet_aton()"<<endl;
return;
}
control->setRecipient(&sendTo);
// send the packet
count++;
control->startMessage((int8)(count+0x10), 0);
buffer->put(data, 0, count);
//control->endMessage();
}
virtual void lock() {
}
virtual void unlock() {
}
private:
char data[20];
int count;
};
void testBlockingUDPSender() {
BlockingUDPConnector connector(false, NULL, true);
DummyTransportSender dts;
DummyResponseHandler drh;
osiSockAddr bindAddr;
bindAddr.ia.sin_family = AF_INET;
bindAddr.ia.sin_port = htons(65001);
bindAddr.ia.sin_addr.s_addr = htonl(INADDR_ANY);
Transport* transport = connector.connect(NULL, &drh, &bindAddr, 1, 50);
cout<<"Sending 10 packets..."<<endl;
for(int i = 0; i<10; i++) {
cout<<" Packet: "<<i+1<<endl;
transport->enqueueSendRequest(&dts);
sleep(1);
}
}
int main(int argc, char *argv[]) {
createFileLogger("testBlockingUDPClnt.log");
testBlockingUDPSender();
return (0);
}

View File

@@ -0,0 +1,95 @@
/*
* blockingUDPTest.cpp
*
* Created on: Dec 28, 2010
* Author: Miha Vitorovic
*/
#include "remote.h"
#include "blockingUDP.h"
#include "logger.h"
#include "inetAddressUtil.h"
#include "hexDump.h"
#include <iostream>
#include <sstream>
using namespace epics::pvAccess;
using std::cout;
using std::endl;
using std::hex;
using std::dec;
class DummyResponseHandler : public ResponseHandler {
public:
DummyResponseHandler() :
packets(0) {
}
int getPackets() {
return packets;
}
virtual void handleResponse(osiSockAddr* responseFrom,
Transport* transport, int8 version, int8 command, int payloadSize,
ByteBuffer* payloadBuffer);
private:
int packets;
};
void DummyResponseHandler::handleResponse(osiSockAddr* responseFrom,
Transport* transport, int8 version, int8 command, int payloadSize,
ByteBuffer* payloadBuffer) {
packets++;
std::ostringstream os;
cout<<"Received new UDP datagram["<<packets<<"]..."<<endl;
cout<<"From: "<<inetAddressToString(responseFrom)<<endl;
cout<<"Version: 0x"<<hex<<(int)version<<endl;
cout<<"Command: 0x"<<hex<<(int)command<<endl;
cout<<"Payload size: "<<dec<<payloadSize<<endl;
char payload[50];
for(int i = 0; i<payloadSize;) {
int dataCount = payloadSize-i<50 ? payloadSize-i : 50;
payloadBuffer->get(payload, 0, dataCount);
os<<"Payload ("<<i<<"-"<<(dataCount-1)<<")";
hexDump(os.str(), (int8*)payload, dataCount);
i += dataCount;
}
cout<<endl<<endl;
}
void testBlockingUDPConnector() {
BlockingUDPConnector connector(false, NULL, true);
DummyResponseHandler drh;
osiSockAddr bindAddr;
bindAddr.ia.sin_family = AF_INET;
bindAddr.ia.sin_port = htons(65000);
bindAddr.ia.sin_addr.s_addr = htonl(INADDR_ANY);
Transport* transport = connector.connect(NULL, &drh, &bindAddr, 1, 50);
((BlockingUDPTransport*)transport)->start();
cout<<"Waiting for 10 packets..."<<endl;
while(drh.getPackets()<10) {
sleep(1);
}
}
int main(int argc, char *argv[]) {
createFileLogger("testBlockingUDPSrv.log");
testBlockingUDPConnector();
return (0);
}