This commit is contained in:
Gasper Jansa
2011-01-03 14:47:42 +01:00
10 changed files with 54 additions and 59 deletions

View File

@@ -12,6 +12,7 @@
#include "caConstants.h"
#include "remote.h"
#include "growingCircularBuffer.h"
#include "transportRegistry.h"
/* pvData */
#include <byteBuffer.h>
@@ -23,8 +24,6 @@
#include <osdSock.h>
#include <osiSock.h>
using namespace epics::pvData;
namespace epics {
namespace pvAccess {
@@ -43,7 +42,7 @@ namespace epics {
public:
BlockingTCPTransport(SOCKET channel,
ResponseHandler* responseHandler, int receiveBufferSize,
short priority);
short priority, TransportRegistry* transportRegistry);
~BlockingTCPTransport();
@@ -99,7 +98,6 @@ namespace epics {
return _remoteTransportReceiveBufferSize;
}
virtual int getSocketReceiveBufferSize() const;
virtual bool isVerified() const {
@@ -178,7 +176,7 @@ namespace epics {
/**
* Send buffer.
*/
ByteBuffer* _sendBuffer;
epics::pvData::ByteBuffer* _sendBuffer;
/**
* Remote side transport revision (minor).
@@ -247,7 +245,7 @@ namespace epics {
* @param buffer[in] buffer to be sent
* @return success indicator
*/
virtual bool send(ByteBuffer* buffer);
virtual bool send(epics::pvData::ByteBuffer* buffer);
private:
/**
@@ -289,14 +287,14 @@ namespace epics {
*/
int _lastMessageStartPosition;
ByteBuffer* _socketBuffer;
epics::pvData::ByteBuffer* _socketBuffer;
int _startPosition;
Mutex* _mutex;
Mutex* _sendQueueMutex;
Mutex* _verifiedMutex;
Mutex* _monitorMutex;
epics::pvData::Mutex* _mutex;
epics::pvData::Mutex* _sendQueueMutex;
epics::pvData::Mutex* _verifiedMutex;
epics::pvData::Mutex* _monitorMutex;
ReceiveStage _stage;
@@ -328,6 +326,8 @@ namespace epics {
MonitorSender* _monitorSender;
TransportRegistry* _transportRegistry;
/**
* Internal method that clears and releases buffer.
* sendLock and sendBufferLock must be hold while calling this method.

View File

@@ -64,7 +64,7 @@ namespace epics {
BlockingTCPTransport::BlockingTCPTransport(SOCKET channel,
ResponseHandler* responseHandler, int receiveBufferSize,
short priority) :
short priority, TransportRegistry* transportRegistry) :
_closed(false), _channel(channel), _remoteTransportRevision(0),
_remoteTransportReceiveBufferSize(MAX_TCP_RECV),
_remoteTransportSocketReceiveBufferSize(MAX_TCP_RECV),
@@ -86,7 +86,8 @@ namespace epics {
_rcvThreadId(NULL), _sendThreadId(NULL), _monitorSendQueue(
new GrowingCircularBuffer<TransportSender*> (100)),
_monitorSender(new MonitorSender(_monitorMutex,
_monitorSendQueue)) {
_monitorSendQueue)), _transportRegistry(
transportRegistry) {
_socketBuffer = new ByteBuffer(max(MAX_TCP_RECV
+MAX_ENSURE_DATA_BUFFER_SIZE, receiveBufferSize));
@@ -122,8 +123,8 @@ namespace epics {
// prepare buffer
clearAndReleaseBuffer();
// TODO: add to registry
//context.getTransportRegistry().put(this);
// add to registry
_transportRegistry->put(this);
}
BlockingTCPTransport::~BlockingTCPTransport() {
@@ -193,8 +194,8 @@ namespace epics {
_closed = true;
// TODO remove from registry
//context.getTransportRegistry().remove(this);
// remove from registry
_transportRegistry->remove(this);
// clean resources
internalClose(force);
@@ -832,11 +833,7 @@ namespace epics {
errlogSevPrintf(errlogInfo, "Connection to %s closed.",
inetAddressToString(_socketAddress).c_str());
int retval = ::shutdown(_channel, SHUT_RDWR);
if(retval<0&&errno!=ENOTCONN) errlogSevPrintf(errlogMajor,
"Error closing socket to %s: %s", inetAddressToString(
_socketAddress).c_str(), strerror(errno));
epicsSocketDestroy(_channel);
}
void BlockingTCPTransport::rcvThreadRunner(void* param) {

View File

@@ -173,7 +173,7 @@ namespace epics {
static void threadRunner(void* param);
bool processBuffer(osiSockAddr* fromAddress,
ByteBuffer* receiveBuffer);
epics::pvData::ByteBuffer* receiveBuffer);
// Context only used for logging in this class
@@ -236,7 +236,8 @@ namespace epics {
};
class BlockingUDPConnector : public Connector, NoDefaultMethods {
class BlockingUDPConnector : public Connector,
epics::pvData::NoDefaultMethods {
public:
BlockingUDPConnector(bool reuseSocket,

View File

@@ -14,6 +14,7 @@
/* EPICSv3 */
#include <errlog.h>
#include <osiSock.h>
/* standard */
#include <sys/types.h>
@@ -28,7 +29,13 @@ namespace epics {
errlogSevPrintf(errlogInfo, "Creating datagram socket to: %s",
inetAddressToString(bindAddress).c_str());
SOCKET socket = ::socket(PF_INET, SOCK_DGRAM, 0);
SOCKET socket = epicsSocketCreate(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
if(socket==INVALID_SOCKET) {
char errStr[64];
epicsSocketConvertErrnoToString(errStr, sizeof(errStr));
errlogSevPrintf(errlogMajor, "Error creating socket: %s",
errStr);
}
/* from MSDN:
* Note: If the setsockopt function is called before the bind

View File

@@ -51,7 +51,7 @@ namespace epics {
close(true); // close the socket and stop the thread.
delete _receiveBuffer;
delete _sendBuffer;
delete _readBuffer;
delete[] _readBuffer;
delete _mutex;
}
@@ -76,10 +76,7 @@ namespace epics {
"UDP socket %s closed.",
inetAddressToString(_bindAddress).c_str());
int retval = ::close(_channel);
if(retval<0) errlogSevPrintf(errlogMajor, "Socket close error: %s",
strerror(errno));
epicsSocketDestroy(_channel);
}
void BlockingUDPTransport::enqueueSendRequest(TransportSender* sender) {

View File

@@ -20,8 +20,6 @@
namespace epics {
namespace pvAccess {
using namespace epics::pvData;
enum ProtocolType {
TCP, UDP, SSL
};
@@ -30,7 +28,7 @@ namespace epics {
* Interface defining transport send control.
* @author <a href="mailto:matej.sekoranjaATcosylab.com">Matej Sekoranja</a>
*/
class TransportSendControl : public SerializableControl {
class TransportSendControl : public epics::pvData::SerializableControl {
public:
virtual void startMessage(int8 command, int ensureCapacity) =0;
virtual void endMessage() =0;
@@ -56,7 +54,7 @@ namespace epics {
* NOTE: these limitations allows efficient implementation.
*/
virtual void
send(ByteBuffer* buffer, TransportSendControl* control) =0;
send(epics::pvData::ByteBuffer* buffer, TransportSendControl* control) =0;
virtual void lock() =0;
virtual void unlock() =0;
@@ -67,7 +65,7 @@ namespace epics {
* @author <a href="mailto:matej.sekoranjaATcosylab.com">Matej Sekoranja</a>
* @version $Id: Transport.java,v 1.1 2010/05/03 14:45:39 mrkraimer Exp $
*/
class Transport : public DeserializableControl {
class Transport : public epics::pvData::DeserializableControl {
public:
virtual ~Transport() {
}
@@ -210,7 +208,7 @@ namespace epics {
*/
virtual void handleResponse(osiSockAddr* responseFrom,
Transport* transport, int8 version, int8 command,
int payloadSize, ByteBuffer* payloadBuffer) =0;
int payloadSize, epics::pvData::ByteBuffer* payloadBuffer) =0;
};
/**

View File

@@ -25,21 +25,15 @@ namespace epics {
template<class T>
class ArrayFIFO {
public:
/**
* Constructs an empty array deque with an initial capacity
* sufficient to hold 16 elements. Array FIFO is designed to hold
* objects allocated on the heap.
*/
ArrayFIFO();
/**
* Constructs an empty array deque with an initial capacity
* sufficient to hold the specified number of elements. Array FIFO
* is designed to hold objects allocated on the heap.
*
* @param[in] numElements lower bound on initial capacity of the deque
* @param[in] numElements lower bound on initial capacity of the
* deque. Default value is 16 elements.
*/
ArrayFIFO(size_t numElements);
ArrayFIFO(size_t numElements = 16);
~ArrayFIFO();
/**
@@ -210,7 +204,7 @@ namespace epics {
T* a = new T[newCapacity];
arraycopy(_elements, p, a, 0, r);
arraycopy(_elements, 0, a, r, p);
delete _elements;
delete[] _elements;
_elements = a;
_size = newCapacity;
_head = 0;
@@ -218,12 +212,6 @@ namespace epics {
}
template<class T>
ArrayFIFO<T>::ArrayFIFO() :
_head(0), _tail(0), _size(16), _mutex() {
_elements = new T[16];
}
template<class T>
ArrayFIFO<T>::ArrayFIFO(size_t numElements) :
_head(0), _tail(0), _mutex() {
@@ -232,7 +220,7 @@ namespace epics {
template<class T>
ArrayFIFO<T>::~ArrayFIFO() {
delete _elements;
delete[] _elements;
}
template<class T>

View File

@@ -33,7 +33,7 @@ namespace epics {
}
~GrowingCircularBuffer() {
delete _elements;
delete[] _elements;
}
/**
@@ -126,7 +126,7 @@ namespace epics {
_takePointer = 0;
_putPointer = _size;
_size *= 2;
delete _elements;
delete[] _elements;
_elements = newElements;
}
_count++;

View File

@@ -44,10 +44,11 @@ public:
// set recipient
sendTo.ia.sin_family = AF_INET;
sendTo.ia.sin_port = htons(65000);
if(inet_aton("192.168.71.129",&sendTo.ia.sin_addr)==0) {
cout<<"error in inet_aton()"<<endl;
if(aToIPAddr("192.168.71.129", 65000, &sendTo.ia)<0) {
cout<<"error in aToIPAddr(...)"<<endl;
return;
}
control->setRecipient(&sendTo);
// send the packet

View File

@@ -8,9 +8,10 @@
#include "remote.h"
#include "blockingUDP.h"
#include "logger.h"
#include "inetAddressUtil.h"
#include "hexDump.h"
#include <osiSock.h>
#include <iostream>
#include <sstream>
@@ -43,7 +44,12 @@ void DummyResponseHandler::handleResponse(osiSockAddr* responseFrom,
std::ostringstream os;
cout<<"Received new UDP datagram["<<packets+1<<"]..."<<endl;
cout<<"From: "<<inetAddressToString(responseFrom)<<endl;
char ipAddressStr[24];
ipAddrToDottedIP(&responseFrom->ia, ipAddressStr, sizeof(ipAddressStr));
cout<<"From: "<<ipAddressStr<<endl;
cout<<"Version: 0x"<<hex<<(int)version<<endl;
cout<<"Command: 0x"<<hex<<(int)command<<endl;
cout<<"Payload size: "<<dec<<payloadSize<<endl;