blockingTCPTransport - work in progress.

This commit is contained in:
miha_vitorovic
2010-12-29 15:01:42 +01:00
parent 15d2daef43
commit 2202d88020
7 changed files with 809 additions and 23 deletions
+2
View File
@@ -38,8 +38,10 @@ LIBSRCS += CreateRequestFactory.cpp
SRC_DIRS += $(PVACCESS)/remote
INC += remote.h
INC += blockingUDP.h
INC += blockingTCP.h
LIBSRCS += blockingUDPTransport.cpp
LIBSRCS += blockingUDPConnector.cpp
LIBSRCS += blockingTCPTransport.cpp
LIBRARY = pvAccess
+4
View File
@@ -8,6 +8,10 @@
#ifndef CONSTANTS_H_
#define CONSTANTS_H_
#include <pvType.h>
using namespace epics::pvData;
namespace epics {
namespace pvAccess {
+269
View File
@@ -0,0 +1,269 @@
/*
* blockingTCP.h
*
* Created on: Dec 29, 2010
* Author: Miha Vitorovic
*/
#ifndef BLOCKINGTCP_H_
#define BLOCKINGTCP_H_
/* pvAccess */
#include "caConstants.h"
#include "remote.h"
/* pvData */
#include <byteBuffer.h>
#include <pvType.h>
#include <lock.h>
/* EPICSv3 */
#include <osdSock.h>
#include <osiSock.h>
using namespace epics::pvData;
namespace epics {
namespace pvAccess {
enum ReceiveStage {
READ_FROM_SOCKET, PROCESS_HEADER, PROCESS_PAYLOAD, NONE
};
class BlockingTCPTransport : public Transport,
public TransportSendControl {
public:
BlockingTCPTransport(SOCKET channel,
ResponseHandler* responseHandler, int receiveBufferSize,
short priority);
bool isClosed() const {
return _closed;
}
void setRemoteMinorRevision(int minorRevision) {
_remoteTransportRevision = minorRevision;
}
void setRemoteTransportReceiveBufferSize(
int remoteTransportReceiveBufferSize) {
_remoteTransportReceiveBufferSize
= remoteTransportReceiveBufferSize;
}
void setRemoteTransportSocketReceiveBufferSize(
int socketReceiveBufferSize) {
_remoteTransportSocketReceiveBufferSize
= socketReceiveBufferSize;
}
virtual const String getType() const {
return String("TCP");
}
virtual void aliveNotification() {
// noop
}
virtual void changedTransport() {
// noop
}
virtual const osiSockAddr* getRemoteAddress() const {
return _socketAddress;
}
virtual int16 getPriority() const {
return _priority;
}
virtual int getReceiveBufferSize() const {
return _socketBuffer->getSize();
}
virtual int getSocketReceiveBufferSize() const;
virtual bool isVerified() const {
Lock lock(_verifiedMutex);
return _verified;
}
virtual void verified() {
Lock lock(_verifiedMutex);
_verified = true;
}
virtual void setRecipient(const osiSockAddr* sendTo) {
// noop
}
/**
* @param[in] timeout Timeout in seconds
*/
bool waitUntilVerified(double timeout);
virtual void flush(bool lastMessageCompleted);
virtual void startMessage(int8 command, int ensureCapacity);
virtual void endMessage();
virtual void flushSerializeBuffer() {
flush(false);
}
virtual void ensureBuffer(int size);
virtual void ensureData(int size);
protected:
/**
* Connection status
*/
bool volatile _closed;
/**
* Corresponding channel.
*/
SOCKET _channel;
/**
* Cached socket address.
*/
osiSockAddr* _socketAddress;
/**
* Send buffer.
*/
ByteBuffer* _sendBuffer;
/**
* Remote side transport revision (minor).
*/
int8 _remoteTransportRevision;
/**
* Remote side transport receive buffer size.
*/
int _remoteTransportReceiveBufferSize;
/**
* Remote side transport socket receive buffer size.
*/
int _remoteTransportSocketReceiveBufferSize;
/**
* Priority.
* NOTE: Priority cannot just be changed, since it is registered in transport registry with given priority.
*/
short _priority;
// TODO to be implemeneted
/**
* CAS response handler.
*/
ResponseHandler* _responseHandler;
/**
* Read sync. object monitor.
*/
//Object _readMonitor = new Object();
/**
* Total bytes received.
*/
int64 volatile _totalBytesReceived;
/**
* Total bytes sent.
*/
int64 volatile _totalBytesSent;
/**
* Marker to send.
*/
int volatile _markerToSend;
bool _verified;
int64 volatile _remoteBufferFreeSpace;
void processReadCached(bool nestedCall, ReceiveStage inStage,
int requiredBytes, bool addToBuffer);
private:
/**
* Default marker period.
*/
static const int MARKER_PERIOD = 1024;
static const int MAX_ENSURE_DATA_BUFFER_SIZE = 1024;
/**
* Send buffer size.
*/
int _maxPayloadSize;
/**
* Send buffer size.
*/
int _socketSendBufferSize;
/**
* Marker "period" in bytes (every X bytes marker should be set).
*/
int64 _markerPeriodBytes;
/**
* Next planned marker position.
*/
int64 _nextMarkerPosition;
/**
* Send pending flag.
*/
bool _sendPending;
/**
* Last message start position.
*/
int _lastMessageStartPosition;
ByteBuffer* _socketBuffer;
int _startPosition;
Mutex* _mutex;
Mutex* _sendQueueMutex;
Mutex* _verifiedMutex;
Mutex* _monitorMutex;
ReceiveStage _stage;
int8 _lastSegmentedMessageType;
int8 _lastSegmentedMessageCommand;
int _storedPayloadSize;
int _storedPosition;
int _storedLimit;
short _magicAndVersion;
int8 _packetType;
int8 _command;
int _payloadSize;
bool _flushRequested;
/**
* Internal method that clears and releases buffer.
* sendLock and sendBufferLock must be hold while calling this method.
*/
void clearAndReleaseBuffer();
void endMessage(bool hasMoreSegments);
bool flush();
};
}
}
#endif /* BLOCKINGTCP_H_ */
+518
View File
@@ -0,0 +1,518 @@
/*
* blockingTCPTransport.cpp
*
* Created on: Dec 29, 2010
* Author: Miha Vitorovic
*/
#include "blockingTCP.h"
#include "inetAddressUtil.h"
/* pvData */
#include <lock.h>
#include <byteBuffer.h>
#include <epicsException.h>
/* EPICSv3 */
#include <osdSock.h>
#include <osiSock.h>
#include <epicsThread.h>
#include <errlog.h>
/* standard */
#include <sys/types.h>
#include <sys/socket.h>
#include <algorithm>
#include <sstream>
using namespace epics::pvData;
using std::max;
using std::min;
using std::ostringstream;
namespace epics {
namespace pvAccess {
BlockingTCPTransport::BlockingTCPTransport(SOCKET channel,
ResponseHandler* responseHandler, int receiveBufferSize,
short priority) :
_closed(false), _channel(channel), _remoteTransportRevision(0),
_remoteTransportReceiveBufferSize(MAX_TCP_RECV),
_remoteTransportSocketReceiveBufferSize(MAX_TCP_RECV),
_priority(priority), _responseHandler(responseHandler),
_totalBytesReceived(0), _totalBytesSent(0),
_markerToSend(0), _verified(false), _remoteBufferFreeSpace(
LONG_LONG_MAX), _markerPeriodBytes(MARKER_PERIOD),
_nextMarkerPosition(_markerPeriodBytes),
_sendPending(false), _lastMessageStartPosition(0), _mutex(
new Mutex()), _sendQueueMutex(new Mutex()),
_verifiedMutex(new Mutex()), _monitorMutex(new Mutex()),
_stage(READ_FROM_SOCKET), _lastSegmentedMessageType(0),
_lastSegmentedMessageCommand(0), _storedPayloadSize(0),
_storedPosition(0), _storedLimit(0), _magicAndVersion(0),
_packetType(0), _command(0), _payloadSize(0),
_flushRequested(false) {
_socketBuffer = new ByteBuffer(max(MAX_TCP_RECV
+MAX_ENSURE_DATA_BUFFER_SIZE, receiveBufferSize));
_socketBuffer->setPosition(_socketBuffer->getLimit());
_startPosition = _socketBuffer->getPosition();
// allocate buffer
_sendBuffer = new ByteBuffer(_socketBuffer->getSize());
_maxPayloadSize = _sendBuffer->getSize()-2*CA_MESSAGE_HEADER_SIZE; // one for header, one for flow control
// get send buffer size
socklen_t intLen = sizeof(int);
int retval = getsockopt(_channel, SOL_SOCKET, SO_SNDBUF,
&_socketSendBufferSize, &intLen);
if(retval<0) {
_socketSendBufferSize = MAX_TCP_RECV;
errlogSevPrintf(errlogMinor,
"Unable to retrieve socket send buffer size: %s",
strerror(errno));
}
socklen_t saSize = sizeof(sockaddr);
retval = getpeername(_channel, &(_socketAddress->sa), &saSize);
if(retval<0) {
errlogSevPrintf(errlogMajor,
"Error fetching socket remote address: %s", strerror(
errno));
}
// prepare buffer
clearAndReleaseBuffer();
// TODO: add to registry
//context.getTransportRegistry().put(this);
}
void BlockingTCPTransport::clearAndReleaseBuffer() {
// NOTE: take care that nextMarkerPosition is set right
// fix position to be correct when buffer is cleared
// do not include pre-buffered flow control message; not 100% correct, but OK
_nextMarkerPosition -= _sendBuffer->getPosition()
-CA_MESSAGE_HEADER_SIZE;
_sendQueueMutex->lock();
_flushRequested = false;
_sendQueueMutex->unlock();
_sendBuffer->clear();
_sendPending = false;
// prepare ACK marker
_sendBuffer->putShort(CA_MAGIC_AND_VERSION);
_sendBuffer->putByte(1); // control data
_sendBuffer->putByte(1); // marker ACK
_sendBuffer->putInt(0);
}
int BlockingTCPTransport::getSocketReceiveBufferSize() const {
// Get value of the SO_RCVBUF option for this DatagramSocket,
// that is the buffer size used by the platform for input on
// this DatagramSocket.
int sockBufSize;
socklen_t intLen = sizeof(int);
int retval = getsockopt(_channel, SOL_SOCKET, SO_RCVBUF,
&sockBufSize, &intLen);
if(retval<0) errlogSevPrintf(errlogMajor,
"Socket getsockopt SO_RCVBUF error: %s", strerror(errno));
return sockBufSize;
}
bool BlockingTCPTransport::waitUntilVerified(double timeout) {
double internalTimeout = timeout;
bool internalVerified = false;
_verifiedMutex->lock();
internalVerified = _verified;
_verifiedMutex->unlock();
while(!internalVerified&&internalTimeout<=0) {
epicsThreadSleep(min(0.1, internalTimeout));
internalTimeout -= 0.1;
_verifiedMutex->lock();
internalVerified = _verified;
_verifiedMutex->unlock();
}
return internalVerified;
}
void BlockingTCPTransport::flush(bool lastMessageCompleted) {
// automatic end
endMessage(!lastMessageCompleted);
bool moreToSend = true;
// TODO closed check !!!
while(moreToSend) {
moreToSend = !flush();
// all sent, exit
if(!moreToSend) break;
// TODO solve this sleep in a better way
epicsThreadSleep(0.01);
}
_lastMessageStartPosition = _sendBuffer->getPosition();
// start with last header
if(!lastMessageCompleted&&_lastSegmentedMessageType!=0) startMessage(
_lastSegmentedMessageCommand, 0);
}
void BlockingTCPTransport::startMessage(int8 command,
int ensureCapacity) {
_lastMessageStartPosition = -1;
ensureBuffer(CA_MESSAGE_HEADER_SIZE+ensureCapacity);
_lastMessageStartPosition = _sendBuffer->getPosition();
_sendBuffer->putShort(CA_MAGIC_AND_VERSION);
_sendBuffer->putByte(_lastSegmentedMessageType); // data
_sendBuffer->putByte(command); // command
_sendBuffer->putInt(0); // temporary zero payload
}
void BlockingTCPTransport::endMessage() {
endMessage(false);
}
void BlockingTCPTransport::ensureBuffer(int size) {
if(_sendBuffer->getRemaining()>=size) return;
// too large for buffer...
if(_maxPayloadSize<size) {
ostringstream temp;
temp<<"requested for buffer size "<<size<<", but only ";
temp<<_maxPayloadSize<<" available.";
THROW_BASE_EXCEPTION(temp.str().c_str());
}
while(_sendBuffer->getRemaining()<size&&!_closed)
flush(false);
if(!_closed) THROW_BASE_EXCEPTION("transport closed");
}
void BlockingTCPTransport::endMessage(bool hasMoreSegments) {
if(_lastMessageStartPosition>=0) {
// set message size
_sendBuffer->putInt(_lastMessageStartPosition+sizeof(int16)+2,
_sendBuffer->getPosition()-_lastMessageStartPosition
-CA_MESSAGE_HEADER_SIZE);
int flagsPosition = _lastMessageStartPosition+sizeof(int16);
// set segmented bit
if(hasMoreSegments) {
// first segment
if(_lastSegmentedMessageType==0) {
int8 type = _sendBuffer->getByte(flagsPosition);
// set first segment bit
_sendBuffer->putByte(flagsPosition, (int8)(type|0x10));
// first + last segment bit == in-between segment
_lastSegmentedMessageType = (int8)(type|0x30);
_lastSegmentedMessageCommand = _sendBuffer->getByte(
flagsPosition+1);
}
}
else {
// last segment
if(_lastSegmentedMessageType!=0) {
// set last segment bit (by clearing first segment bit)
_sendBuffer->putByte(flagsPosition,
(int8)(_lastSegmentedMessageType&0xEF));
_lastSegmentedMessageType = 0;
}
}
// manage markers
int position = _sendBuffer->getPosition();
int bytesLeft = _sendBuffer->getRemaining();
if(position>=_nextMarkerPosition&&bytesLeft
>=CA_MESSAGE_HEADER_SIZE) {
_sendBuffer->putShort(CA_MAGIC_AND_VERSION);
_sendBuffer->putByte(1); // control data
_sendBuffer->putByte(0); // marker
_sendBuffer->putInt((int)(_totalBytesSent+position
+CA_MESSAGE_HEADER_SIZE));
_nextMarkerPosition = position+_markerPeriodBytes;
}
}
}
void BlockingTCPTransport::ensureData(int size) {
// enough of data?
if(_socketBuffer->getRemaining()>=size) return;
// too large for buffer...
if(_maxPayloadSize<size) {
ostringstream temp;
temp<<"requested for buffer size "<<size<<", but only ";
temp<<_maxPayloadSize<<" available.";
THROW_BASE_EXCEPTION(temp.str().c_str());
}
// subtract what was already processed
_storedPayloadSize -= _socketBuffer->getPosition()-_storedPosition;
// no more data and we have some payload left => read buffer
if(_storedPayloadSize>=size) {
//System.out.println("storedPayloadSize >= size, remaining:" + socketBuffer.remaining());
// just read up remaining payload
// since there is no data on the buffer, read to the beginning of it, at least size bytes
processReadCached(true, PROCESS_PAYLOAD, size, false);
_storedPosition = _socketBuffer->getPosition();
_storedLimit = _socketBuffer->getLimit();
_socketBuffer->setLimit(min(_storedPosition+_storedPayloadSize,
_storedLimit));
}
else {
// copy remaining bytes, if any
int remainingBytes = _socketBuffer->getRemaining();
for(int i = 0; i<remainingBytes; i++)
_socketBuffer->putByte(i, _socketBuffer->getByte());
// read what is left
_socketBuffer->setLimit(_storedLimit);
_stage = PROCESS_HEADER;
processReadCached(true, NONE, size, false);
// copy before position
for(int i = remainingBytes-1, j = _socketBuffer->getPosition()
-1; i>=0; i--, j--)
_socketBuffer->putByte(j, _socketBuffer->getByte(i));
_startPosition = _socketBuffer->getPosition()-remainingBytes;
_socketBuffer->setPosition(_startPosition);
_storedPosition = _startPosition; //socketBuffer.position();
_storedLimit = _socketBuffer->getLimit();
_socketBuffer->setLimit(min(_storedPosition+_storedPayloadSize,
_storedLimit));
// add if missing...
if(!_closed&&_socketBuffer->getRemaining()<size) ensureData(
size);
}
if(_closed) THROW_BASE_EXCEPTION("transport closed");
}
void BlockingTCPTransport::processReadCached(bool nestedCall,
ReceiveStage inStage, int requiredBytes, bool addToBuffer) {
try {
while(!_closed) {
if(_stage==READ_FROM_SOCKET||inStage!=NONE) {
int currentStartPosition;
if(addToBuffer) {
currentStartPosition = _socketBuffer->getPosition();
_socketBuffer->setPosition(
_socketBuffer->getLimit());
_socketBuffer->setLimit(_socketBuffer->getSize());
}
else {
// add to bytes read
_totalBytesReceived
+= (_socketBuffer->getPosition()
-_startPosition);
// copy remaining bytes, if any
int remainingBytes = _socketBuffer->getRemaining();
int endPosition = MAX_ENSURE_DATA_BUFFER_SIZE
+remainingBytes;
for(int i = MAX_ENSURE_DATA_BUFFER_SIZE; i
<endPosition; i++)
_socketBuffer->putByte(i,
_socketBuffer->getByte());
currentStartPosition = _startPosition
= MAX_ENSURE_DATA_BUFFER_SIZE;
_socketBuffer->setPosition(
MAX_ENSURE_DATA_BUFFER_SIZE+remainingBytes);
_socketBuffer->setLimit(_socketBuffer->getSize());
}
// read at least requiredBytes bytes
int requiredPosition = (currentStartPosition
+requiredBytes);
while(_socketBuffer->getPosition()<requiredPosition) {
// read
char readBuffer[MAX_TCP_RECV];
size_t maxToRead = min(MAX_TCP_RECV,
_socketBuffer->getRemaining());
ssize_t bytesRead = recv(_channel, readBuffer,
maxToRead, 0);
_socketBuffer->put(readBuffer,0,maxToRead);
if(bytesRead<0) {
// error (disconnect, end-of-stream) detected
close(true);
if(nestedCall) THROW_BASE_EXCEPTION(
"bytesRead < 0");
return;
}
}
_socketBuffer->setLimit(_socketBuffer->getPosition());
_socketBuffer->setPosition(currentStartPosition);
// notify liveness
aliveNotification();
// exit
if(inStage!=NONE) return;
_stage = PROCESS_HEADER;
}
if(_stage==PROCESS_HEADER) {
// ensure CAConstants.CA_MESSAGE_HEADER_SIZE bytes of data
if(_socketBuffer->getRemaining()<CA_MESSAGE_HEADER_SIZE) processReadCached(
true, PROCESS_HEADER, CA_MESSAGE_HEADER_SIZE,
false);
// first byte is CA_MAGIC
// second byte version - major/minor nibble
// check magic and version at once
_magicAndVersion = _socketBuffer->getShort();
if((short)(_magicAndVersion&0xFFF0)
!=CA_MAGIC_AND_MAJOR_VERSION) {
// error... disconnect
errlogSevPrintf(
errlogMinor,
"Invalid header received from client %s, disconnecting...",
inetAddressToString(_socketAddress).c_str());
close(true);
return;
}
// data vs. control packet
_packetType = _socketBuffer->getByte();
// command
_command = _socketBuffer->getByte();
// read payload size
_payloadSize = _socketBuffer->getInt();
// data
int8 type = (int8)(_packetType&0x0F);
if(type==0) {
_stage = PROCESS_PAYLOAD;
}
else if(type==1) {
if(_command==0) {
if(_markerToSend==0) _markerToSend
= _payloadSize; // TODO send back response
}
else //if (command == 1)
{
int difference = (int)_totalBytesSent
-_payloadSize+CA_MESSAGE_HEADER_SIZE;
// overrun check
if(difference<0) difference += INT_MAX;
_remoteBufferFreeSpace
= _remoteTransportReceiveBufferSize
+_remoteTransportSocketReceiveBufferSize
-difference;
// TODO if this is calculated wrong, this can be critical !!!
}
// no payload
//stage = ReceiveStage.PROCESS_HEADER;
continue;
}
else {
errlogSevPrintf(
errlogMajor,
"Unknown packet type %d, received from client %s, disconnecting...",
type,
inetAddressToString(_socketAddress).c_str());
close(true);
return;
}
}
if(_stage==PROCESS_PAYLOAD) {
// read header
int8 version = (int8)(_magicAndVersion&0xFF);
// last segment bit set (means in-between segment or last segment)
bool notFirstSegment = (_packetType&0x20)!=0;
_storedPayloadSize = _payloadSize;
// if segmented, exit reading code
if(nestedCall&&notFirstSegment) return;
// NOTE: nested data (w/ payload) messages between segmented messages are not supported
_storedPosition = _socketBuffer->getPosition();
_storedLimit = _socketBuffer->getLimit();
_socketBuffer->setLimit(min(_storedPosition
+_storedPayloadSize, _storedLimit));
try {
// handle response
_responseHandler->handleResponse(_socketAddress,
this, version, _command, _payloadSize,
_socketBuffer);
} catch(...) {
//noop
}
/*
* Java finally start
*/
_socketBuffer->setLimit(_storedLimit);
int newPosition = _storedPosition+_storedPayloadSize;
if(newPosition>_storedLimit) {
newPosition -= _storedLimit;
_socketBuffer->setPosition(_storedLimit);
processReadCached(true, PROCESS_PAYLOAD,
newPosition, false);
newPosition += _startPosition;
}
_socketBuffer->setPosition(newPosition);
// TODO discard all possible segments?!!!
/*
* Java finally end
*/
_stage = PROCESS_HEADER;
continue;
}
}
} catch(...) {
// close connection
close(true);
if(nestedCall) throw;
}
}
bool BlockingTCPTransport::flush() {
// TODO implement!
return true;
}
}
}
+1 -9
View File
@@ -49,14 +49,6 @@ namespace epics {
return String("UDP");
}
virtual int8 getMajorRevision() const {
return CA_MAJOR_PROTOCOL_REVISION;
}
virtual int8 getMinorRevision() const {
return CA_MINOR_PROTOCOL_REVISION;
}
virtual int getReceiveBufferSize() const {
return _receiveBuffer->getSize();
}
@@ -169,7 +161,7 @@ namespace epics {
}
protected:
bool _closed;
bool volatile _closed;
/**
* Response handler.
+4 -9
View File
@@ -108,12 +108,9 @@ namespace epics {
}
void BlockingUDPTransport::endMessage() {
int oldPosition = _sendBuffer->getPosition();
_sendBuffer->setPosition(_lastMessageStartPosition
+(sizeof(int16)+2));
_sendBuffer->putInt(oldPosition-_lastMessageStartPosition
-CA_MESSAGE_HEADER_SIZE);
_sendBuffer->setPosition(oldPosition);
_sendBuffer->putInt(_lastMessageStartPosition+(sizeof(int16)+2),
_sendBuffer->getPosition()-_lastMessageStartPosition
-CA_MESSAGE_HEADER_SIZE);
}
@@ -295,9 +292,7 @@ namespace epics {
// this DatagramSocket.
int sockBufSize;
socklen_t intLen;
intLen = sizeof(int);
socklen_t intLen = sizeof(int);
int retval = getsockopt(_channel, SOL_SOCKET, SO_RCVBUF,
&sockBufSize, &intLen);
+11 -5
View File
@@ -8,6 +8,8 @@
#ifndef REMOTE_H_
#define REMOTE_H_
#include "caConstants.h"
#include <serialize.h>
#include <pvType.h>
#include <byteBuffer.h>
@@ -92,13 +94,17 @@ namespace epics {
* Transport protocol major revision.
* @return protocol major revision.
*/
virtual int8 getMajorRevision() const =0;
virtual int8 getMajorRevision() const {
return CA_MAJOR_PROTOCOL_REVISION;
}
/**
* Transport protocol minor revision.
* @return protocol minor revision.
*/
virtual int8 getMinorRevision() const =0;
virtual int8 getMinorRevision() const {
return CA_MINOR_PROTOCOL_REVISION;
}
/**
* Get receive buffer size.
@@ -238,7 +244,6 @@ namespace epics {
};
/**
* Interface defining socket connector (Connector-Transport pattern).
* @author <a href="mailto:matej.sekoranjaATcosylab.com">Matej Sekoranja</a>
@@ -256,8 +261,9 @@ namespace epics {
* @return transport instance.
* @throws ConnectionException
*/
virtual Transport* connect(TransportClient* client, ResponseHandler* responseHandler,
osiSockAddr* address, short transportRevision, short priority) =0;
virtual Transport* connect(TransportClient* client,
ResponseHandler* responseHandler, osiSockAddr* address,
short transportRevision, short priority) =0;
};