This commit is contained in:
Gasper Jansa
2011-01-07 18:47:13 +01:00
24 changed files with 259 additions and 187 deletions

View File

@@ -307,13 +307,15 @@
<buildTargets>
<target name="all" path="" targetID="org.eclipse.cdt.build.MakeTargetBuilder">
<buildCommand>make</buildCommand>
<buildArguments/>
<buildTarget>all</buildTarget>
<stopOnError>true</stopOnError>
<useDefaultCommand>true</useDefaultCommand>
<runAllBuilders>true</runAllBuilders>
<runAllBuilders>false</runAllBuilders>
</target>
<target name="clean" path="" targetID="org.eclipse.cdt.build.MakeTargetBuilder">
<buildCommand>make</buildCommand>
<buildArguments/>
<buildTarget>clean</buildTarget>
<stopOnError>true</stopOnError>
<useDefaultCommand>true</useDefaultCommand>
@@ -321,7 +323,6 @@
</target>
<target name="uninstall" path="" targetID="org.eclipse.cdt.build.MakeTargetBuilder">
<buildCommand>make</buildCommand>
<buildArguments/>
<buildTarget>uninstall</buildTarget>
<stopOnError>true</stopOnError>
<useDefaultCommand>true</useDefaultCommand>

View File

@@ -39,6 +39,7 @@ namespace epics {
*1000), _unresponsiveTransport(false), _timerNode(
new TimerNode(this)), _mutex(new Mutex()), _ownersMutex(
new Mutex()), _verifyOrEcho(true) {
_autoDelete = false;
// initialize owners list, send queue
acquire(client);
@@ -188,7 +189,7 @@ namespace epics {
* send verification response message
*/
control->startMessage(1, 2*sizeof(int32)+sizeof(int16));
control->startMessage(CMD_CONNECTION_VALIDATION, 2*sizeof(int32)+sizeof(int16));
// receive buffer size
buffer->putInt(getReceiveBufferSize());
@@ -205,7 +206,7 @@ namespace epics {
_verifyOrEcho = false;
}
else {
control->startMessage(2, 0);
control->startMessage(CMD_ECHO, 0);
// send immediately
control->flush(true);
}

View File

@@ -33,7 +33,7 @@ namespace epics {
_introspectionRegistry(new IntrospectionRegistry(true)),
_lastChannelSID(0), _channels(
new map<int, ServerChannel*> ()), _channelsMutex(
new Mutex()), _notifyOnClose(NULL) {
new Mutex()) {
// NOTE: priority not yet known, default priority is used to register/unregister
// TODO implement priorities in Reactor... not that user will
// change it.. still getPriority() must return "registered" priority!
@@ -68,7 +68,6 @@ namespace epics {
void BlockingServerTCPTransport::internalClose(bool force) {
BlockingTCPTransport::internalClose(force);
if(_notifyOnClose!=NULL) _notifyOnClose->transportClosed(this);
destroyAllChannels();
}

View File

@@ -37,7 +37,6 @@ namespace epics {
namespace pvAccess {
class MonitorSender;
class BlockingServerTCPTransport;
enum ReceiveStage {
READ_FROM_SOCKET, PROCESS_HEADER, PROCESS_PAYLOAD, NONE
@@ -47,19 +46,6 @@ namespace epics {
IMMEDIATE, DELAYED, USER_CONTROLED
};
class TransportCloseNotification {
public:
virtual ~TransportCloseNotification() {
}
/**
* When transport closes, the owner will be notified through this
* callback
*/
virtual void
transportClosed(BlockingServerTCPTransport* transport) =0;
};
class BlockingTCPTransport : public Transport,
public TransportSendControl {
public:
@@ -67,8 +53,6 @@ namespace epics {
ResponseHandler* responseHandler, int receiveBufferSize,
int16 priority);
virtual ~BlockingTCPTransport();
virtual bool isClosed() const {
return _closed;
}
@@ -133,7 +117,7 @@ namespace epics {
_verified = true;
}
virtual void setRecipient(const osiSockAddr* sendTo) {
virtual void setRecipient(const osiSockAddr& sendTo) {
// noop
}
@@ -253,6 +237,8 @@ namespace epics {
volatile int64 _remoteBufferFreeSpace;
volatile bool _autoDelete;
virtual void processReadCached(bool nestedCall,
ReceiveStage inStage, int requiredBytes, bool addToBuffer);
@@ -271,6 +257,8 @@ namespace epics {
*/
virtual bool send(epics::pvData::ByteBuffer* buffer);
virtual ~BlockingTCPTransport();
private:
/**
* Default marker period.
@@ -352,6 +340,8 @@ namespace epics {
Context* _context;
volatile bool _sendThreadRunning;
/**
* Internal method that clears and releases buffer.
* sendLock and sendBufferLock must be hold while calling this method.
@@ -387,8 +377,6 @@ namespace epics {
TransportClient* client, short remoteTransportRevision,
float beaconInterval, int16 priority);
virtual ~BlockingClientTCPTransport();
virtual void timerStopped() {
// noop
}
@@ -444,6 +432,8 @@ namespace epics {
virtual void internalClose(bool force);
virtual ~BlockingClientTCPTransport();
private:
/**
@@ -502,7 +492,7 @@ namespace epics {
virtual ~BlockingTCPConnector();
virtual Transport* connect(TransportClient* client,
ResponseHandler* responseHandler, osiSockAddr* address,
ResponseHandler* responseHandler, osiSockAddr& address,
short transportRevision, int16 priority);
private:
/**
@@ -538,7 +528,7 @@ namespace epics {
* @return the SOCKET
* @throws IOException
*/
SOCKET tryConnect(osiSockAddr* address, int tries);
SOCKET tryConnect(osiSockAddr& address, int tries);
};
@@ -549,8 +539,6 @@ namespace epics {
BlockingServerTCPTransport(Context* context, SOCKET channel,
ResponseHandler* responseHandler, int receiveBufferSize);
virtual ~BlockingServerTCPTransport();
virtual IntrospectionRegistry* getIntrospectionRegistry() {
return _introspectionRegistry;
}
@@ -638,10 +626,6 @@ namespace epics {
virtual void send(epics::pvData::ByteBuffer* buffer,
TransportSendControl* control);
void addCloseNotification(TransportCloseNotification* notifyTarget) {
_notifyOnClose = notifyTarget;
}
protected:
/**
* Introspection registry.
@@ -650,6 +634,8 @@ namespace epics {
virtual void internalClose(bool force);
virtual ~BlockingServerTCPTransport();
private:
/**
* Last SID cache.
@@ -663,8 +649,6 @@ namespace epics {
Mutex* _channelsMutex;
TransportCloseNotification* _notifyOnClose;
/**
* Destroy all channels.
*/
@@ -676,7 +660,7 @@ namespace epics {
* @author <a href="mailto:matej.sekoranjaATcosylab.com">Matej Sekoranja</a>
* @version $Id: BlockingTCPAcceptor.java,v 1.1 2010/05/03 14:45:42 mrkraimer Exp $
*/
class BlockingTCPAcceptor : public TransportCloseNotification {
class BlockingTCPAcceptor {
public:
/**
@@ -705,8 +689,6 @@ namespace epics {
*/
void destroy();
virtual void transportClosed(BlockingServerTCPTransport* transport);
private:
/**
* Context instance.
@@ -735,10 +717,6 @@ namespace epics {
epicsThreadId _threadId;
std::set<BlockingServerTCPTransport*>* _connectedClients;
Mutex* _connectedClientsMutex;
/**
* Initialize connection acception.
* @return port where server is listening

View File

@@ -24,7 +24,6 @@
#include <poll.h>
using std::ostringstream;
using std::set;
namespace epics {
namespace pvAccess {
@@ -33,9 +32,7 @@ namespace epics {
int receiveBufferSize) :
_context(context), _bindAddress(NULL), _serverSocketChannel(
INVALID_SOCKET), _receiveBufferSize(receiveBufferSize),
_destroyed(false), _threadId(NULL), _connectedClients(
new set<BlockingServerTCPTransport*> ()),
_connectedClientsMutex(new Mutex()) {
_destroyed(false), _threadId(NULL) {
initialize(port);
}
@@ -43,22 +40,6 @@ namespace epics {
destroy();
if(_bindAddress!=NULL) delete _bindAddress;
_connectedClientsMutex->lock();
// go through all the connected clients, close them, and destroy
set<BlockingServerTCPTransport*>::iterator it =
_connectedClients->begin();
while(it!=_connectedClients->end()) {
BlockingServerTCPTransport* client = *it;
it++;
client->close(true);
delete client;
}
_connectedClients->clear();
delete _connectedClients;
_connectedClientsMutex->unlock();
delete _connectedClientsMutex;
}
int BlockingTCPAcceptor::initialize(in_port_t port) {
@@ -254,16 +235,9 @@ namespace epics {
errlogInfo,
"Connection to CA client %s failed to be validated, closing it.",
ipAddrStr);
delete transport;
return;
}
// store the new connected client
_connectedClientsMutex->lock();
_connectedClients->insert(transport);
transport->addCloseNotification(this);
_connectedClientsMutex->unlock();
errlogSevPrintf(errlogInfo,
"Serving to CA client: %s", ipAddrStr);
@@ -307,16 +281,5 @@ namespace epics {
}
}
void BlockingTCPAcceptor::transportClosed(
BlockingServerTCPTransport* transport) {
Lock lock(_connectedClientsMutex);
// remove the closed client from the list of connected clients
_connectedClients->erase(transport);
// release the memory
delete transport;
}
}
}

View File

@@ -32,13 +32,13 @@ namespace epics {
delete _namedLocker;
}
SOCKET BlockingTCPConnector::tryConnect(osiSockAddr* address, int tries) {
SOCKET BlockingTCPConnector::tryConnect(osiSockAddr& address, int tries) {
for(int tryCount = 0; tryCount<tries; tryCount++) {
// sleep for a while
if(tryCount>0) epicsThreadSleep(0.1);
char strBuffer[64];
ipAddrToA(&address->ia, strBuffer, sizeof(strBuffer));
ipAddrToA(&address.ia, strBuffer, sizeof(strBuffer));
errlogSevPrintf(errlogInfo,
"Opening socket to CA server %s, attempt %d.",
@@ -53,7 +53,7 @@ namespace epics {
strBuffer);
}
else {
if(::connect(socket, &address->sa, sizeof(sockaddr))==0)
if(::connect(socket, &address.sa, sizeof(sockaddr))==0)
return socket;
else {
epicsSocketConvertErrnoToString(strBuffer,
@@ -67,19 +67,19 @@ namespace epics {
}
Transport* BlockingTCPConnector::connect(TransportClient* client,
ResponseHandler* responseHandler, osiSockAddr* address,
ResponseHandler* responseHandler, osiSockAddr& address,
short transportRevision, int16 priority) {
SOCKET socket = INVALID_SOCKET;
char ipAddrStr[64];
ipAddrToA(&address->ia, ipAddrStr, sizeof(ipAddrStr));
ipAddrToA(&address.ia, ipAddrStr, sizeof(ipAddrStr));
// first try to check cache w/o named lock...
BlockingClientTCPTransport
* transport =
(BlockingClientTCPTransport*)(_context->getTransportRegistry()->get(
"TCP", address, priority));
"TCP", &address, priority));
if(transport!=NULL) {
errlogSevPrintf(errlogInfo,
"Reusing existing connection to CA server: %s",
@@ -88,13 +88,13 @@ namespace epics {
}
bool lockAcquired = _namedLocker->acquireSynchronizationObject(
address, LOCK_TIMEOUT);
&address, LOCK_TIMEOUT);
if(lockAcquired) {
try {
// ... transport created during waiting in lock
transport
= (BlockingClientTCPTransport*)(_context->getTransportRegistry()->get(
"TCP", address, priority));
"TCP", &address, priority));
if(transport!=NULL) {
errlogSevPrintf(errlogInfo,
"Reusing existing connection to CA server: %s",
@@ -156,10 +156,10 @@ namespace epics {
} catch(...) {
// close socket, if open
if(socket!=INVALID_SOCKET) epicsSocketDestroy(socket);
_namedLocker->releaseSynchronizationObject(address);
_namedLocker->releaseSynchronizationObject(&address);
throw;
}
_namedLocker->releaseSynchronizationObject(address);
_namedLocker->releaseSynchronizationObject(&address);
}
else {
ostringstream temp;

View File

@@ -72,22 +72,24 @@ namespace epics {
_priority(priority), _responseHandler(responseHandler),
_totalBytesReceived(0), _totalBytesSent(0),
_markerToSend(0), _verified(false), _remoteBufferFreeSpace(
LONG_LONG_MAX), _markerPeriodBytes(MARKER_PERIOD),
_nextMarkerPosition(_markerPeriodBytes),
_sendPending(false), _lastMessageStartPosition(0), _mutex(
new Mutex()), _sendQueueMutex(new Mutex()),
_verifiedMutex(new Mutex()), _monitorMutex(new Mutex()),
_stage(READ_FROM_SOCKET), _lastSegmentedMessageType(0),
_lastSegmentedMessageCommand(0), _storedPayloadSize(0),
_storedPosition(0), _storedLimit(0), _magicAndVersion(0),
_packetType(0), _command(0), _payloadSize(0),
_flushRequested(false), _sendBufferSentPosition(0),
_flushStrategy(DELAYED), _sendQueue(
LONG_LONG_MAX), _autoDelete(true),
_markerPeriodBytes(MARKER_PERIOD), _nextMarkerPosition(
_markerPeriodBytes), _sendPending(false),
_lastMessageStartPosition(0), _mutex(new Mutex()),
_sendQueueMutex(new Mutex()), _verifiedMutex(new Mutex()),
_monitorMutex(new Mutex()), _stage(READ_FROM_SOCKET),
_lastSegmentedMessageType(0), _lastSegmentedMessageCommand(
0), _storedPayloadSize(0), _storedPosition(0),
_storedLimit(0), _magicAndVersion(0), _packetType(0),
_command(0), _payloadSize(0), _flushRequested(false),
_sendBufferSentPosition(0), _flushStrategy(DELAYED),
_sendQueue(
new GrowingCircularBuffer<TransportSender*> (100)),
_rcvThreadId(NULL), _sendThreadId(NULL), _monitorSendQueue(
new GrowingCircularBuffer<TransportSender*> (100)),
_monitorSender(new MonitorSender(_monitorMutex,
_monitorSendQueue)), _context(context) {
_monitorSendQueue)), _context(context),
_sendThreadRunning(false) {
_socketBuffer = new ByteBuffer(max(MAX_TCP_RECV
+MAX_ENSURE_DATA_BUFFER_SIZE, receiveBufferSize));
@@ -141,6 +143,7 @@ namespace epics {
}
void BlockingTCPTransport::start() {
_sendThreadRunning = true;
String threadName = "TCP-receive "+inetAddressToString(
_socketAddress);
@@ -210,7 +213,10 @@ namespace epics {
void BlockingTCPTransport::internalClose(bool force) {
// close the socket
epicsSocketDestroy(_channel);
if(_channel!=INVALID_SOCKET) {
epicsSocketDestroy(_channel);
_channel = INVALID_SOCKET;
}
}
int BlockingTCPTransport::getSocketReceiveBufferSize() const {
@@ -462,11 +468,11 @@ namespace epics {
maxToRead, 0);
_socketBuffer->put(readBuffer, 0, bytesRead);
if(bytesRead<0) {
if(bytesRead<=0) {
// error (disconnect, end-of-stream) detected
close(true);
if(nestedCall) THROW_BASE_EXCEPTION(
if(bytesRead<0&&nestedCall) THROW_BASE_EXCEPTION(
"bytesRead < 0");
return;
@@ -834,12 +840,23 @@ namespace epics {
errlogSevPrintf(errlogInfo, "Connection to %s closed.",
inetAddressToString(_socketAddress).c_str());
epicsSocketDestroy(_channel);
if(_channel!=INVALID_SOCKET) {
epicsSocketDestroy(_channel);
_channel = INVALID_SOCKET;
}
}
void BlockingTCPTransport::rcvThreadRunner(void* param) {
((BlockingTCPTransport*)param)->processReadCached(false, NONE,
CA_MESSAGE_HEADER_SIZE, false);
BlockingTCPTransport* obj = (BlockingTCPTransport*)param;
obj->processReadCached(false, NONE, CA_MESSAGE_HEADER_SIZE, false);
if(obj->_autoDelete) {
while(obj->_sendThreadRunning)
epicsThreadSleep(0.1);
delete obj;
}
}
void BlockingTCPTransport::sendThreadRunner(void* param) {
@@ -848,15 +865,19 @@ namespace epics {
obj->processSendQueue();
obj->freeConnectionResorces();
obj->_sendThreadRunning = false;
}
void BlockingTCPTransport::enqueueSendRequest(TransportSender* sender) {
if(_closed) return;
Lock lock(_sendQueueMutex);
_sendQueue->insert(sender);
}
void BlockingTCPTransport::enqueueMonitorSendRequest(
TransportSender* sender) {
if(_closed) return;
Lock lock(_monitorMutex);
_monitorSendQueue->insert(sender);
if(_monitorSendQueue->size()==1) enqueueSendRequest(_monitorSender);

View File

@@ -32,7 +32,7 @@ namespace epics {
public TransportSendControl {
public:
BlockingUDPTransport(ResponseHandler* responseHandler,
SOCKET channel, osiSockAddr* bindAddress,
SOCKET channel, osiSockAddr& bindAddress,
InetAddrVector* sendAddresses,
short remoteTransportRevision);
@@ -107,8 +107,10 @@ namespace epics {
// noop since all UDP requests are sent immediately
}
virtual void setRecipient(const osiSockAddr* sendTo) {
_sendTo = sendTo;
virtual void setRecipient(const osiSockAddr& sendTo) {
if(_sendTo!=NULL) delete _sendTo;
_sendTo = new osiSockAddr;
memcpy(_sendTo, &sendTo, sizeof(osiSockAddr));
}
virtual void flushSerializeBuffer() {
@@ -135,7 +137,9 @@ namespace epics {
return _ignoredAddresses;
}
bool send(ByteBuffer* buffer, const osiSockAddr* address = NULL);
bool send(ByteBuffer* buffer, const osiSockAddr& address);
bool send(ByteBuffer* buffer);
/**
* Get list of send addresses.
@@ -149,7 +153,7 @@ namespace epics {
* Get bind address.
* @return bind address.
*/
osiSockAddr* getBindAddress() {
const osiSockAddr* getBindAddress() const {
return _bindAddress;
}
@@ -177,7 +181,7 @@ namespace epics {
private:
static void threadRunner(void* param);
bool processBuffer(osiSockAddr* fromAddress,
bool processBuffer(osiSockAddr& fromAddress,
epics::pvData::ByteBuffer* receiveBuffer);
// Context only used for logging in this class
@@ -207,7 +211,7 @@ namespace epics {
*/
InetAddrVector* _ignoredAddresses;
const osiSockAddr* _sendTo;
osiSockAddr* _sendTo;
/**
* Receive buffer.
@@ -259,7 +263,7 @@ namespace epics {
* NOTE: transport client is ignored for broadcast (UDP).
*/
virtual Transport* connect(TransportClient* client,
ResponseHandler* responseHandler, osiSockAddr* bindAddress,
ResponseHandler* responseHandler, osiSockAddr& bindAddress,
short transportRevision, int16 priority);
private:

View File

@@ -24,10 +24,10 @@ namespace epics {
namespace pvAccess {
Transport* BlockingUDPConnector::connect(TransportClient* client,
ResponseHandler* responseHandler, osiSockAddr* bindAddress,
ResponseHandler* responseHandler, osiSockAddr& bindAddress,
short transportRevision, int16 priority) {
errlogSevPrintf(errlogInfo, "Creating datagram socket to: %s",
inetAddressToString(bindAddress).c_str());
inetAddressToString(&bindAddress).c_str());
SOCKET socket = epicsSocketCreate(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
if(socket==INVALID_SOCKET) {
@@ -45,7 +45,7 @@ namespace epics {
* because of an early setsockopt call failing.
*/
int retval = ::bind(socket, (sockaddr*)&(bindAddress->sa),
int retval = ::bind(socket, (sockaddr*)&(bindAddress.sa),
sizeof(sockaddr));
if(retval<0) {
errlogSevPrintf(errlogMajor, "Error binding socket: %s",

View File

@@ -34,21 +34,28 @@ namespace epics {
BlockingUDPTransport::BlockingUDPTransport(
ResponseHandler* responseHandler, SOCKET channel,
osiSockAddr* bindAddress, InetAddrVector* sendAddresses,
osiSockAddr& bindAddress, InetAddrVector* sendAddresses,
short remoteTransportRevision) :
_closed(false), _responseHandler(responseHandler),
_channel(channel), _socketAddress(bindAddress),
_bindAddress(bindAddress), _sendAddresses(sendAddresses),
_channel(channel), _sendAddresses(sendAddresses),
_ignoredAddresses(NULL), _sendTo(NULL), _receiveBuffer(
new ByteBuffer(MAX_UDP_RECV)), _sendBuffer(
new ByteBuffer(MAX_UDP_RECV)),
_lastMessageStartPosition(0), _readBuffer(
new char[MAX_UDP_RECV]), _mutex(new Mutex()),
_threadId(NULL) {
_socketAddress = new osiSockAddr;
memcpy(_socketAddress, &bindAddress, sizeof(osiSockAddr));
_bindAddress = _socketAddress;
}
BlockingUDPTransport::~BlockingUDPTransport() {
close(true); // close the socket and stop the thread.
if(_sendTo!=NULL) delete _sendTo;
delete _socketAddress;
// _bindAddress equals _socketAddress
delete _receiveBuffer;
delete _sendBuffer;
delete[] _readBuffer;
@@ -89,7 +96,10 @@ namespace epics {
sender->send(_sendBuffer, this);
sender->unlock();
endMessage();
send(_sendBuffer, _sendTo);
if(_sendTo==NULL)
send(_sendBuffer);
else
send(_sendBuffer, *_sendTo);
} catch(...) {
sender->unlock();
}
@@ -172,7 +182,7 @@ namespace epics {
_receiveBuffer->flip();
processBuffer(&fromAddress, _receiveBuffer);
processBuffer(fromAddress, _receiveBuffer);
}
}
else {
@@ -210,7 +220,7 @@ namespace epics {
errlogSevPrintf(errlogInfo, "Thread '%s' exiting", threadName);
}
bool BlockingUDPTransport::processBuffer(osiSockAddr* fromAddress,
bool BlockingUDPTransport::processBuffer(osiSockAddr& fromAddress,
ByteBuffer* receiveBuffer) {
// handle response(s)
@@ -238,7 +248,7 @@ namespace epics {
if(nextRequestPosition>receiveBuffer->getLimit()) return false;
// handle
_responseHandler->handleResponse(fromAddress, this,
_responseHandler->handleResponse(&fromAddress, this,
(int8)(magicAndVersion&0xFF), command, payloadSize,
_receiveBuffer);
@@ -251,32 +261,32 @@ namespace epics {
}
bool BlockingUDPTransport::send(ByteBuffer* buffer,
const osiSockAddr* address) {
if(address==NULL&&_sendAddresses==NULL) return false;
const osiSockAddr& address) {
if(address!=NULL) {
buffer->flip();
int retval =
sendto(_channel, buffer->getArray(),
buffer->getLimit(), 0, &(address->sa),
sizeof(sockaddr));
if(retval<0) {
errlogSevPrintf(errlogMajor, "Socket sendto error: %s",
strerror(errno));
return false;
}
buffer->flip();
int retval = sendto(_channel, buffer->getArray(),
buffer->getLimit(), 0, &(address.sa), sizeof(sockaddr));
if(retval<0) {
errlogSevPrintf(errlogMajor, "Socket sendto error: %s",
strerror(errno));
return false;
}
else {
for(size_t i = 0; i<_sendAddresses->size(); i++) {
buffer->flip();
int retval = sendto(_channel, buffer->getArray(),
buffer->getLimit(), 0,
&(_sendAddresses->at(i)->sa), sizeof(sockaddr));
{
if(retval<0) errlogSevPrintf(errlogMajor,
"Socket sendto error: %s", strerror(errno));
return false;
}
return true;
}
bool BlockingUDPTransport::send(ByteBuffer* buffer) {
if(_sendAddresses==NULL) return false;
for(size_t i = 0; i<_sendAddresses->size(); i++) {
buffer->flip();
int retval = sendto(_channel, buffer->getArray(),
buffer->getLimit(), 0, &(_sendAddresses->at(i)->sa),
sizeof(sockaddr));
{
if(retval<0) errlogSevPrintf(errlogMajor,
"Socket sendto error: %s", strerror(errno));
return false;
}
}

View File

@@ -418,7 +418,7 @@ class MockTransportSendControl: public TransportSendControl
public:
void endMessage() {}
void flush(bool lastMessageCompleted) {}
void setRecipient(const osiSockAddr* sendTo) {}
void setRecipient(const osiSockAddr& sendTo) {}
void startMessage(int8 command, int32 ensureCapacity) {}
void ensureBuffer(int32 size) {}
void flushSerializeBuffer() {}

View File

@@ -54,7 +54,7 @@ namespace epics {
virtual void flush(bool lastMessageCompleted) =0;
virtual void setRecipient(const osiSockAddr* sendTo) =0;
virtual void setRecipient(const osiSockAddr& sendTo) =0;
};
/**
@@ -326,7 +326,7 @@ namespace epics {
* @throws ConnectionException
*/
virtual Transport* connect(TransportClient* client,
ResponseHandler* responseHandler, osiSockAddr* address,
ResponseHandler* responseHandler, osiSockAddr& address,
short transportRevision, int16 priority) =0;
};

View File

@@ -32,8 +32,8 @@ namespace epics {
ipAddrToA(&responseFrom->ia, ipAddrStr, sizeof(ipAddrStr));
ostringstream prologue;
prologue<<"Message ["<<command<<", v"<<hex<<version;
prologue<<"] received from "<<ipAddrStr;
prologue<<"Message [0x"<<hex<<(int)command<<", v0x"<<hex;
prologue<<(int)version<<"] received from "<<ipAddrStr;
hexDump(prologue.str(), _description,
(const int8*)payloadBuffer->getArray(),
@@ -63,9 +63,9 @@ namespace epics {
_handlerTable = new ResponseHandler*[HANDLER_TABLE_LENGTH];
// TODO add real handlers, as they are developed
_handlerTable[0] = badResponse;
_handlerTable[0] = new NoopResponse(_context, "Beacon");
_handlerTable[1] = new ConnectionValidationHandler(_context);
_handlerTable[2] = badResponse;
_handlerTable[2] = new EchoHandler(_context);
_handlerTable[3] = badResponse;
_handlerTable[4] = badResponse;
_handlerTable[5] = badResponse;
@@ -96,6 +96,8 @@ namespace epics {
ServerResponseHandler::~ServerResponseHandler() {
delete _handlerTable[0];
delete _handlerTable[1];
delete _handlerTable[2];
delete _handlerTable[27];
delete[] _handlerTable;
}
@@ -137,5 +139,42 @@ namespace epics {
//transport.setPriority(payloadBuffer.getShort());
}
class EchoTransportSender : public TransportSender {
public:
EchoTransportSender(osiSockAddr* echoFrom) {
memcpy(&_echoFrom, echoFrom, sizeof(osiSockAddr));
}
virtual void send(ByteBuffer* buffer, TransportSendControl* control) {
control->startMessage(CMD_ECHO, 0);
control->setRecipient(_echoFrom);
}
virtual void lock() {
}
virtual void unlock() {
delete this;
}
private:
osiSockAddr _echoFrom;
virtual ~EchoTransportSender() {
}
};
void EchoHandler::handleResponse(osiSockAddr* responseFrom,
Transport* transport, int8 version, int8 command,
int payloadSize, epics::pvData::ByteBuffer* payloadBuffer) {
AbstractServerResponseHandler::handleResponse(responseFrom,
transport, version, command, payloadSize, payloadBuffer);
EchoTransportSender* echoReply = new EchoTransportSender(
responseFrom);
// send back
transport->enqueueSendRequest(echoReply);
}
}
}

View File

@@ -103,6 +103,41 @@ namespace epics {
int payloadSize, epics::pvData::ByteBuffer* payloadBuffer);
};
/**
* NOOP response.
* @author <a href="mailto:matej.sekoranjaATcosylab.com">Matej Sekoranja</a>
* @version $Id: NoopResponse.java,v 1.1 2010/05/03 14:45:39 mrkraimer Exp $
*/
class NoopResponse : public AbstractServerResponseHandler {
public:
/**
* @param context
* @param description
*/
NoopResponse(ServerContextImpl* context, String description) :
AbstractServerResponseHandler(context, description) {
}
};
/**
* Echo request handler.
* @author <a href="mailto:matej.sekoranjaATcosylab.com">Matej Sekoranja</a>
* @version $Id: EchoHandler.java,v 1.1 2010/05/03 14:45:39 mrkraimer Exp $
*/
class EchoHandler : public AbstractServerResponseHandler {
public:
/**
* @param context
*/
EchoHandler(ServerContextImpl* context) :
AbstractServerResponseHandler(context, "Echo request") {
}
virtual void handleResponse(osiSockAddr* responseFrom,
Transport* transport, int8 version, int8 command,
int payloadSize, epics::pvData::ByteBuffer* payloadBuffer);
};
}
}

View File

@@ -11,6 +11,9 @@
#include <sstream>
using namespace epics::pvData;
using std::stringstream;
using std::endl;
using std::cout;
namespace epics {
namespace pvAccess {
@@ -29,9 +32,9 @@ namespace epics {
void hexDump(const String prologue, const String name, const int8 *bs,
int start, int len) {
std::stringstream header;
stringstream header;
header<<prologue<<std::endl<<"Hexdump ["<<name<<"] size = "<<len;
header<<prologue<<endl<<"Hexdump ["<<name<<"] size = "<<len;
String out(header.str());
@@ -70,7 +73,7 @@ namespace epics {
}
out += chars;
std::cout<<out;
cout<<out<<endl;
}
/**

View File

@@ -115,7 +115,7 @@ namespace epics {
// next 16-bits are 1
buffer->putShort(0xFFFF);
// following IPv4 address in big-endian (network) byte order
in_addr_t ipv4Addr = address->ia.sin_addr.s_addr;
in_addr_t ipv4Addr = ntohl(address->ia.sin_addr.s_addr);
buffer->putByte((int8)((ipv4Addr>>24)&0xFF));
buffer->putByte((int8)((ipv4Addr>>16)&0xFF));
buffer->putByte((int8)((ipv4Addr>>8)&0xFF));
@@ -166,7 +166,7 @@ namespace epics {
retAddr <<= 8;
retAddr |= byte;
return retAddr;
return htonl(retAddr);
}
InetAddrVector* getSocketAddressList(String list, int defaultPort,
@@ -198,7 +198,7 @@ namespace epics {
}
const String inetAddressToString(const osiSockAddr *addr,
bool displayHex) {
bool displayPort, bool displayHex) {
stringstream saddr;
int ipa = ntohl(addr->ia.sin_addr.s_addr);
@@ -207,9 +207,9 @@ namespace epics {
saddr<<((int)(ipa>>16)&0xFF)<<'.';
saddr<<((int)(ipa>>8)&0xFF)<<'.';
saddr<<((int)ipa&0xFF);
if(addr->ia.sin_port>0) saddr<<":"<<ntohs(addr->ia.sin_port);
if(displayHex) saddr<<" ("<<hex
<<ntohl(addr->ia.sin_addr.s_addr)<<")";
if(displayPort) saddr<<":"<<ntohs(addr->ia.sin_port);
if(displayHex) saddr<<" ("<<hex<<ntohl(addr->ia.sin_addr.s_addr)
<<")";
return saddr.str();
}

View File

@@ -71,7 +71,7 @@ namespace epics {
const InetAddrVector* appendList = NULL);
const String inetAddressToString(const osiSockAddr *addr,
bool displayHex = false);
bool displayPort = true, bool displayHex = false);
/* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */

View File

@@ -26,6 +26,15 @@ PROD_HOST += testChannelSearchManager
testChannelSearchManager_SRCS += testChannelSearchManager.cpp
testChannelSearchManager_LIBS += pvData pvAccess Com
PROD_HOST += testBlockingTCPSrv
testBlockingTCPSrv_SRCS += testBlockingTCPSrv.cpp
testBlockingTCPSrv_LIBS += pvData pvAccess Com
PROD_HOST += testBlockingTCPClnt
testBlockingTCPClnt_SRCS += testBlockingTCPClnt.cpp
testBlockingTCPClnt_LIBS += pvData pvAccess Com
include $(TOP)/configure/RULES
#----------------------------------------
# ADD RULES AFTER THIS LINE

View File

@@ -53,7 +53,7 @@ void testBeaconEmitter()
bindAddr.ia.sin_family = AF_INET;
bindAddr.ia.sin_port = htons(5066);
bindAddr.ia.sin_addr.s_addr = htonl(INADDR_ANY);
Transport* transport = connector.connect(NULL, &drh, &bindAddr, 1, 50);
Transport* transport = connector.connect(NULL, &drh, bindAddr, 1, 50);
cout<<"Sending beacons"<<endl;
BeaconEmitter beaconEmitter(transport, transport->getRemoteAddress());

View File

@@ -112,7 +112,7 @@ void testBeaconHandler()
bindAddr.ia.sin_family = AF_INET;
bindAddr.ia.sin_port = htons(5067);
bindAddr.ia.sin_addr.s_addr = htonl(INADDR_ANY);
Transport* transport = connector.connect(NULL, &brh, &bindAddr, 1, 50);
Transport* transport = connector.connect(NULL, &brh, bindAddr, 1, 50);
(static_cast<BlockingUDPTransport*>(transport))->start();
while(1) sleep(1);

View File

@@ -29,14 +29,19 @@ using std::sscanf;
class ContextImpl : public Context {
public:
ContextImpl() :
_tr(new TransportRegistry()),
_timer(new Timer("client thread", lowPriority)) {}
_tr(new TransportRegistry()), _timer(new Timer("client thread",
lowPriority)) {
}
virtual ~ContextImpl() {
delete _tr;
delete _timer;
}
virtual Timer* getTimer() { return _timer; }
virtual TransportRegistry* getTransportRegistry() { return _tr; }
virtual Timer* getTimer() {
return _timer;
}
virtual TransportRegistry* getTransportRegistry() {
return _tr;
}
private:
TransportRegistry* _tr;
Timer* _timer;
@@ -47,6 +52,8 @@ public:
virtual void handleResponse(osiSockAddr* responseFrom,
Transport* transport, int8 version, int8 command, int payloadSize,
ByteBuffer* payloadBuffer) {
if(command==CMD_CONNECTION_VALIDATION) transport->verified();
}
};
@@ -105,21 +112,23 @@ void testBlockingTCPSender() {
osiSockAddr srvAddr;
srvAddr.ia.sin_family = AF_INET;
//srvAddr.ia.sin_port = htons(CA_SERVER_PORT);
//srvAddr.ia.sin_family = AF_INET;
if(aToIPAddr("192.168.71.132", CA_SERVER_PORT, &srvAddr.ia)<0) {
cout<<"error in aToIPAddr(...)"<<endl;
return;
}
Transport* transport = connector.connect(&dtc, &drh, &srvAddr,
Transport* transport = connector.connect(&dtc, &drh, srvAddr,
CA_MAGIC_AND_VERSION, CA_DEFAULT_PRIORITY);
cout<<"Sending 10 messages..."<<endl;
for(int i = 0; i<10; i++) {
cout<<" Message: "<<i+1<<endl;
transport->enqueueSendRequest(&dts);
if(!transport->isClosed())
transport->enqueueSendRequest(&dts);
else
break;
sleep(1);
}

View File

@@ -15,6 +15,8 @@
#include <iostream>
#include <cstdio>
#define SRV_IP "192.168.71.132"
using namespace epics::pvAccess;
using namespace epics::pvData;
@@ -41,15 +43,13 @@ public:
}
virtual void send(ByteBuffer* buffer, TransportSendControl* control) {
// set recipient
sendTo.ia.sin_family = AF_INET;
sendTo.ia.sin_port = htons(65000);
if(aToIPAddr("192.168.71.129", 65000, &sendTo.ia)<0) {
// SRV_IP defined at the top of the this file
if(aToIPAddr(SRV_IP, 65000, &sendTo.ia)<0) {
cout<<"error in aToIPAddr(...)"<<endl;
return;
}
control->setRecipient(&sendTo);
control->setRecipient(sendTo);
// send the packet
count++;
@@ -79,7 +79,7 @@ void testBlockingUDPSender() {
bindAddr.ia.sin_port = htons(65001);
bindAddr.ia.sin_addr.s_addr = htonl(INADDR_ANY);
Transport* transport = connector.connect(NULL, &drh, &bindAddr, 1, 50);
Transport* transport = connector.connect(NULL, &drh, bindAddr, 1, 50);
cout<<"Sending 10 packets..."<<endl;

View File

@@ -80,7 +80,7 @@ void testBlockingUDPConnector() {
bindAddr.ia.sin_port = htons(65000);
bindAddr.ia.sin_addr.s_addr = htonl(INADDR_ANY);
Transport* transport = connector.connect(NULL, &drh, &bindAddr, 1, 50);
Transport* transport = connector.connect(NULL, &drh, bindAddr, 1, 50);
((BlockingUDPTransport*)transport)->start();

View File

@@ -63,7 +63,7 @@ int main(int argc, char *argv[]) {
assert(addr->ia.sin_port==htons(6789));
assert(addr->ia.sin_addr.s_addr==htonl(0xAC1037A0));
assert(inetAddressToString(addr)=="172.16.55.160:6789");
cout<<'\t'<<inetAddressToString(addr, true)<<endl;
cout<<'\t'<<inetAddressToString(addr, true)<<endl;
addr = vec1->at(1);
assert(addr->ia.sin_family==AF_INET);
@@ -100,13 +100,13 @@ cout<<'\t'<<inetAddressToString(addr, true)<<endl;
cout<<"Testing \"intToIPv4Address\""<<endl;
addr = intToIPv4Address(0x7F000001);
assert(addr->ia.sin_family==AF_INET);
assert(inetAddressToString(addr)=="127.0.0.1");
assert(inetAddressToString(addr)=="127.0.0.1:0");
cout<<'\t'<<inetAddressToString(addr, true)<<endl;
delete addr;
addr = intToIPv4Address(0x0A0A0C0B);
assert(addr->ia.sin_family==AF_INET);
assert(inetAddressToString(addr)=="10.10.12.11");
assert(inetAddressToString(addr)=="10.10.12.11:0");
cout<<'\t'<<inetAddressToString(addr, true)<<endl;
cout<<"\nPASSED!\n";