direct, i.e. no-copy, de/serialization support; prepared some IF for bulk send

This commit is contained in:
Matej Sekoranja
2013-04-12 21:55:25 +02:00
parent baeac17490
commit f72f89b4d2
13 changed files with 432 additions and 110 deletions
+313 -53
View File
@@ -84,7 +84,9 @@ namespace pvAccess {
_channel(channel),
_priority(priority),
_responseHandler(responseHandler),
#if FLOW_CONTROL
_markerPeriodBytes(MARKER_PERIOD),
#endif
_flushStrategy(DELAYED),
_rcvThreadId(0),
_sendThreadId(0),
@@ -96,7 +98,9 @@ namespace pvAccess {
_remoteTransportSocketReceiveBufferSize(MAX_TCP_RECV),
_sendQueue(),
//_monitorSendQueue(),
#if FLOW_CONTROL
_nextMarkerPosition(_markerPeriodBytes),
#endif
_sendPending(false),
_lastMessageStartPosition(0),
_lastSegmentedMessageType(0),
@@ -112,13 +116,20 @@ namespace pvAccess {
_command(0),
_payloadSize(0),
_stage(READ_FROM_SOCKET),
_directPayloadRead(0),
_directBuffer(0),
#if FLOW_CONTROL
_totalBytesReceived(0),
#endif
_closed(),
_sendThreadExited(false),
_verified(false),
_verified(false)
#if FLOW_CONTROL
,
_markerToSend(0),
_totalBytesSent(0),
_remoteBufferFreeSpace(INT64_MAX)
#endif
{
PVACCESS_REFCOUNT_MONITOR_CONSTRUCT(blockingTCPTransport);
@@ -227,10 +238,12 @@ namespace pvAccess {
}
void BlockingTCPTransport::clearAndReleaseBuffer() {
#if FLOW_CONTROL
// NOTE: take care that nextMarkerPosition is set right
// fix position to be correct when buffer is cleared
// do not include pre-buffered flow control message; not 100% correct, but OK
_nextMarkerPosition -= _sendBuffer->getPosition() - CA_MESSAGE_HEADER_SIZE;
#endif
_sendQueueMutex.lock();
_flushRequested = false;
@@ -240,12 +253,14 @@ namespace pvAccess {
_sendPending = false;
#if FLOW_CONTROL
// prepare ACK marker
_sendBuffer->putByte(CA_MAGIC);
_sendBuffer->putByte(CA_VERSION);
_sendBuffer->putByte(0x01 | _byteOrderFlag); // control data
_sendBuffer->putByte(1); // marker ACK
_sendBuffer->putInt(0);
#endif
}
void BlockingTCPTransport::close() {
@@ -311,12 +326,13 @@ namespace pvAccess {
endMessage(!lastMessageCompleted);
bool moreToSend = true;
// TODO closed check !!!
while(moreToSend) {
moreToSend = !flush();
// all sent, exit
if(!moreToSend) break;
// TODO check if this is OK
else if (_closed.get()) THROW_BASE_EXCEPTION("transport closed");
// TODO solve this sleep in a better way
epicsThreadSleep(0.01);
@@ -378,9 +394,20 @@ namespace pvAccess {
// alignBuffer(CA_ALIGNMENT);
// set paylaod size
_sendBuffer->putInt(_lastMessageStartPosition+sizeof(int16)+2,
_sendBuffer->getPosition()-_lastMessageStartPosition
-CA_MESSAGE_HEADER_SIZE);
const size_t payloadSize = _sendBuffer->getPosition()-_lastMessageStartPosition-CA_MESSAGE_HEADER_SIZE;
// TODO by spec?
// ignore empty segmented messages
if (payloadSize == 0 && _lastSegmentedMessageType != 0)
{
_sendBuffer->setPosition(_lastMessageStartPosition);
if (!hasMoreSegments)
_lastSegmentedMessageType = 0;
return;
}
_sendBuffer->putInt(_lastMessageStartPosition+sizeof(int16)+2, payloadSize);
int flagsPosition = _lastMessageStartPosition+sizeof(int16);
// set segmented bit
@@ -408,28 +435,31 @@ namespace pvAccess {
}
}
#if FLOW_CONTROL
// manage markers
int position = _sendBuffer->getPosition();
int bytesLeft = _sendBuffer->getRemaining();
if(unlikely(position>=_nextMarkerPosition &&
if(unlikely(position>=_nextMarkerPosition &&
bytesLeft>=CA_MESSAGE_HEADER_SIZE)) {
_sendBuffer->putByte(CA_MAGIC);
_sendBuffer->putByte(CA_VERSION);
_sendBuffer->putByte(0x01 | _byteOrderFlag); // control data
_sendBuffer->putByte(0); // marker
_sendBuffer->putInt((int)(_totalBytesSent+position+CA_MESSAGE_HEADER_SIZE));
s_sendBuffer->putInt((int)(_totalBytesSent+position+CA_MESSAGE_HEADER_SIZE));
_nextMarkerPosition = position+_markerPeriodBytes;
}
#endif
}
}
void BlockingTCPTransport::ensureData(size_t size) {
void BlockingTCPTransport::ensureData(size_t size) {
// enough of data?
if(likely(_socketBuffer->getRemaining()>=size)) return;
const size_t remainingBytes = _socketBuffer->getRemaining();
if (likely(remainingBytes>=size)) return;
// too large for buffer...
if(unlikely(MAX_ENSURE_DATA_BUFFER_SIZE<size)) {
if (unlikely(MAX_ENSURE_DATA_BUFFER_SIZE<size)) {
ostringstream temp;
temp<<"requested for buffer size "<<size<<", but only ";
temp<<MAX_ENSURE_DATA_BUFFER_SIZE<<" available.";
@@ -440,7 +470,8 @@ namespace pvAccess {
_storedPayloadSize -= _socketBuffer->getPosition()-_storedPosition;
// no more data and we have some payload left => read buffer
if(likely(_storedPayloadSize>=size)) {
if (likely(_storedPayloadSize>=size))
{
//LOG(logLevelInfo,
// "storedPayloadSize >= size, remaining: %d",
// _socketBuffer->getRemaining());
@@ -453,63 +484,264 @@ namespace pvAccess {
_socketBuffer->setLimit(min(_storedPosition+_storedPayloadSize,
_storedLimit));
}
else {
// copy remaining bytes, if any
int remainingBytes = _socketBuffer->getRemaining();
for(int i = 0; i<remainingBytes; i++)
else
{
// copy remaining bytes to safe region of buffer, if any
for(size_t i = 0; i<remainingBytes; i++)
_socketBuffer->putByte(i, _socketBuffer->getByte());
// read what is left
// extend limit to what was read
_socketBuffer->setLimit(_storedLimit);
_stage = PROCESS_HEADER;
processReadCached(true, UNDEFINED_STAGE, size-remainingBytes);
// copy before position
for(int i = remainingBytes-1, j = _socketBuffer->getPosition()
-1; i>=0; i--, j--)
_socketBuffer->putByte(j, _socketBuffer->getByte(i));
_startPosition = _socketBuffer->getPosition()-remainingBytes;
_socketBuffer->setPosition(_startPosition);
if (unlikely(remainingBytes > 0))
{
// copy saved back to before position
for(int i = static_cast<int>(remainingBytes-1), j = _socketBuffer->getPosition()-1;
i>=0;
i--, j--)
_socketBuffer->putByte(j, _socketBuffer->getByte(i));
_startPosition = _socketBuffer->getPosition()-remainingBytes;
_socketBuffer->setPosition(_startPosition);
_storedPosition = _startPosition;
}
else
{
_storedPosition = _socketBuffer->getPosition();
}
_storedPosition = _startPosition; //socketBuffer.position();
_storedLimit = _socketBuffer->getLimit();
_socketBuffer->setLimit(min(_storedPosition+_storedPayloadSize,
_storedLimit));
// add if missing...
// add if missing, since UNDEFINED_STAGE and return less...
if(unlikely(!_closed.get()&&(_socketBuffer->getRemaining()<size)))
ensureData(size);
}
if(unlikely(_closed.get())) THROW_BASE_EXCEPTION("transport closed");
if (unlikely(_closed.get())) THROW_BASE_EXCEPTION("transport closed");
}
void BlockingTCPTransport::alignData(size_t alignment) {
// not space optimal (always requires 7-bytes), but fast
if(unlikely(_socketBuffer->getRemaining()<(alignment-1)))
if (unlikely(_socketBuffer->getRemaining()<(alignment-1)))
ensureData(alignment-1);
_socketBuffer->align(alignment);
}
bool BlockingTCPTransport::directSerialize(ByteBuffer */*existingBuffer*/, const char* toSerialize,
std::size_t elementCount, std::size_t elementSize)
{
// TODO overflow check, size_t type, other is int32 for payloadSize header field !!!
// TODO do not ignore or new field in max message size in connection validation
std::size_t count = elementCount * elementSize;
// TODO find smart limit
// check if direct mode actually pays off
if (count < 1024)
return false;
// first end current message indicating the we will segment
endMessage(true);
// append segmented message header
startMessage(_lastSegmentedMessageCommand, 0);
// set segmented message size
_sendBuffer->putInt(_lastMessageStartPosition+sizeof(int16)+2, count);
// flush (TODO this is code is duplicated)
bool moreToSend = true;
while (moreToSend) {
moreToSend = !flush();
// all sent, exit
if(!moreToSend) break;
// TODO check if this is OK
else if (_closed.get()) THROW_BASE_EXCEPTION("transport closed");
// TODO solve this sleep in a better way
epicsThreadSleep(0.01);
}
_lastMessageStartPosition = _sendBuffer->getPosition();
// TODO think if alignment is preserved after...
try {
//LOG(logLevelInfo,
// "Sending (direct) %d bytes in the packet to %s.",
// count,
// inetAddressToString(_socketAddress).c_str());
const char* ptr = toSerialize;
while(count>0) {
ssize_t bytesSent = ::send(_channel,
ptr,
count, 0);
if(unlikely(bytesSent<0)) {
int socketError = SOCKERRNO;
// spurious EINTR check
if (socketError==SOCK_EINTR)
continue;
// TODO check this (copy below)... consolidate!!!
if (socketError==SOCK_ENOBUFS) {
// TODO improve this
epicsThreadSleep(0.01);
continue;
}
// connection lost
char errStr[64];
epicsSocketConvertErrnoToString(errStr, sizeof(errStr));
ostringstream temp;
temp<<"error in sending TCP data: "<<errStr;
//LOG(logLevelError, "%s", temp.str().c_str());
THROW_BASE_EXCEPTION(temp.str().c_str());
}
else if(unlikely(bytesSent==0)) {
// TODO WINSOCK indicates disconnect by returning zero here !!!
epicsThreadSleep(0.01);
continue;
}
ptr += bytesSent;
count -= bytesSent;
//LOG(logLevelInfo,
// "Sent (in this pass) %d bytes, remaining %d bytes.",
// bytesSent, count);
} // while
} catch(...) {
close();
throw;
}
// continue where we left before calling directSerialize
startMessage(_lastSegmentedMessageCommand, 0);
return true;
}
bool BlockingTCPTransport::directDeserialize(ByteBuffer *existingBuffer, char* deserializeTo,
std::size_t elementCount, std::size_t elementSize)
{
return false;
// _socketBuffer == existingBuffer
std::size_t bytesToBeDeserialized = elementCount*elementSize;
// TODO check if bytesToDeserialized < threshold that direct pays off?
_directPayloadRead = bytesToBeDeserialized;
_directBuffer = deserializeTo;
while (true)
{
// retrieve what's already in buffers
size_t availableBytes = min(_directPayloadRead, _socketBuffer->getRemaining());
existingBuffer->getArray(_directBuffer, availableBytes);
_directPayloadRead -= availableBytes;
if (_directPayloadRead == 0)
return true;
_directBuffer += availableBytes;
// subtract what was already processed
size_t pos = _socketBuffer->getPosition();
_storedPayloadSize -= pos -_storedPosition;
_storedPosition = pos;
// no more data and we have some payload left => read buffer
if (likely(_storedPayloadSize > 0))
{
size_t bytesToRead = std::min(_directPayloadRead, _storedPayloadSize);
processReadIntoDirectBuffer(bytesToRead);
// std::cout << "d: " << bytesToRead << std::endl;
_storedPayloadSize -= bytesToRead;
_directPayloadRead -= bytesToRead;
}
if (_directPayloadRead == 0)
return true;
_stage = PROCESS_HEADER;
processReadCached(true, UNDEFINED_STAGE, _directPayloadRead);
_storedPosition = _socketBuffer->getPosition();
_storedLimit = _socketBuffer->getLimit();
_socketBuffer->setLimit(
min(_storedPosition + _storedPayloadSize, _storedLimit)
);
}
return true;
}
void BlockingTCPTransport::processReadIntoDirectBuffer(size_t bytesToRead)
{
while (bytesToRead > 0)
{
ssize_t bytesRead = recv(_channel, _directBuffer, bytesToRead, 0);
// std::cout << "d: " << bytesRead << std::endl;
if(unlikely(bytesRead<=0))
{
if (bytesRead<0)
{
int socketError = SOCKERRNO;
// interrupted or timeout
if (socketError == EINTR ||
socketError == EAGAIN ||
socketError == EWOULDBLOCK)
continue;
}
// error (disconnect, end-of-stream) detected
close();
THROW_BASE_EXCEPTION("bytesRead < 0");
return;
}
bytesToRead -= bytesRead;
_directBuffer += bytesRead;
}
}
void BlockingTCPTransport::processReadCached(bool nestedCall,
ReceiveStage inStage, size_t requiredBytes) {
try {
// TODO we need to throw exception in nextedCall not just bail out!!!!
while(likely(!_closed.get())) {
if(_stage==READ_FROM_SOCKET||inStage!=UNDEFINED_STAGE) {
// add to bytes read
#if FLOW_CONTROL
int currentPosition = _socketBuffer->getPosition();
_totalBytesReceived += (currentPosition - _startPosition);
#endif
// preserve alignment
int currentStartPosition = _startPosition =
MAX_ENSURE_DATA_BUFFER_SIZE; // "TODO uncomment align" + (unsigned int)currentPosition % CA_ALIGNMENT;
// copy remaining bytes, if any
int remainingBytes = _socketBuffer->getRemaining();
int endPosition = currentStartPosition + remainingBytes;
// TODO memmove
for(int i = MAX_ENSURE_DATA_BUFFER_SIZE; i<endPosition; i++)
@@ -524,8 +756,13 @@ namespace pvAccess {
while(_socketBuffer->getPosition()<requiredPosition) {
// read
int pos = _socketBuffer->getPosition();
ssize_t bytesRead = recv(_channel, (char*)(_socketBuffer->getArray()+pos),
_socketBuffer->getRemaining(), 0);
// _socketBuffer->getRemaining(), 0);
// TODO we assume that caller is smart and requiredBytes > remainingBytes
// if in direct read mode, try to read only header so that rest can be read directly to direct buffers
(_directPayloadRead > 0 && inStage == PROCESS_HEADER) ? (requiredBytes-remainingBytes) : _socketBuffer->getRemaining(), 0);
//std::cout << "i: " << bytesRead << std::endl;
if(unlikely(bytesRead<=0)) {
@@ -551,7 +788,10 @@ namespace pvAccess {
_socketBuffer->setPosition(pos+bytesRead);
}
_socketBuffer->setLimit(_socketBuffer->getPosition());
std::size_t pos = _socketBuffer->getPosition();
_storedLimit = pos;
_socketBuffer->setLimit(pos);
_socketBuffer->setPosition(currentStartPosition);
/*
@@ -570,6 +810,10 @@ namespace pvAccess {
}
if(likely(_stage==PROCESS_HEADER)) {
// reveal what's already in buffer
_socketBuffer->setLimit(_storedLimit);
// ensure CAConstants.CA_MESSAGE_HEADER_SIZE bytes of data
if(unlikely(((int)_socketBuffer->getRemaining())<CA_MESSAGE_HEADER_SIZE))
processReadCached(true, PROCESS_HEADER, CA_MESSAGE_HEADER_SIZE);
@@ -607,19 +851,21 @@ namespace pvAccess {
else if(unlikely(type==1))
{
// control
// marker request sent
if (_command == CMD_SET_MARKER) {
#if FLOW_CONTROL
_flowControlMutex.lock();
if(_markerToSend==0)
_markerToSend = _payloadSize;
// TODO send back response
_flowControlMutex.unlock();
#endif
}
// marker received back
else if (_command == CMD_ACK_MARKER)
{
#if FLOW_CONTROL
_flowControlMutex.lock();
int difference = (int)_totalBytesSent-_payloadSize+CA_MESSAGE_HEADER_SIZE;
// overrun check
@@ -630,6 +876,7 @@ namespace pvAccess {
-difference;
// TODO if this is calculated wrong, this can be critical !!!
_flowControlMutex.unlock();
#endif
}
// set byte order
else if (_command == CMD_SET_ENDIANESS)
@@ -673,30 +920,36 @@ namespace pvAccess {
// if segmented, exit reading code
if(nestedCall&&notFirstSegment) return;
// NOTE: nested data (w/ payload) messages between segmented messages are not supported
_storedPosition = _socketBuffer->getPosition();
_storedLimit = _socketBuffer->getLimit();
_socketBuffer->setLimit(min(_storedPosition+_storedPayloadSize, _storedLimit));
try {
// handle response
Transport::shared_pointer thisPointer = shared_from_this();
_responseHandler->handleResponse(&_socketAddress,
thisPointer, _version, _command, _payloadSize,
_socketBuffer);
} catch(...) {
//noop // TODO print?
}
// ignore segmented messages with no payload
if (likely(!notFirstSegment || _payloadSize > 0))
{
// NOTE: nested data (w/ payload) messages between segmented messages are not supported
_storedPosition = _socketBuffer->getPosition();
_storedLimit = _socketBuffer->getLimit();
_socketBuffer->setLimit(min(_storedPosition+_storedPayloadSize, _storedLimit));
try {
// handle response
Transport::shared_pointer thisPointer = shared_from_this();
_responseHandler->handleResponse(&_socketAddress,
thisPointer, _version, _command, _payloadSize,
_socketBuffer);
} catch(...) {
//noop // TODO print?
}
_socketBuffer->setLimit(_storedLimit);
size_t newPosition = _storedPosition+_storedPayloadSize;
if(unlikely(newPosition>_storedLimit)) {
newPosition -= _storedLimit;
_socketBuffer->setPosition(_storedLimit);
processReadCached(true, PROCESS_PAYLOAD,newPosition);
newPosition += _startPosition;
}
_socketBuffer->setPosition(newPosition);
// TODO discard all possible segments?!!!
_socketBuffer->setLimit(_storedLimit);
size_t newPosition = _storedPosition+_storedPayloadSize;
if(unlikely(newPosition>_storedLimit)) {
newPosition -= _storedLimit;
_socketBuffer->setPosition(_storedLimit);
processReadCached(true, PROCESS_PAYLOAD,newPosition);
newPosition += _startPosition;
}
_socketBuffer->setPosition(newPosition);
// TODO discard all possible segments?!!!
_stage = PROCESS_HEADER;
@@ -720,6 +973,7 @@ namespace pvAccess {
// start sending from the start
_sendBufferSentPosition = 0;
#if FLOW_CONTROL
// if not set skip marker otherwise set it
_flowControlMutex.lock();
int markerValue = _markerToSend;
@@ -729,6 +983,7 @@ namespace pvAccess {
_sendBufferSentPosition = CA_MESSAGE_HEADER_SIZE;
else
_sendBuffer->putInt(4, markerValue);
#endif
}
bool success = false;
@@ -842,9 +1097,11 @@ namespace pvAccess {
buffer->setPosition(buffer->getPosition()+bytesSent);
#if FLOW_CONTROL
_flowControlMutex.lock();
_totalBytesSent += bytesSent;
_flowControlMutex.unlock();
#endif
// readjust limit
if(bytesToSend==maxBytesToSend) {
@@ -886,7 +1143,11 @@ namespace pvAccess {
if(_delay>0) epicsThreadSleep(_delay);
if(unlikely(_sendQueue.empty())) {
// if (hasMonitors || sendBuffer.position() > CAConstants.CA_MESSAGE_HEADER_SIZE)
#if FLOW_CONTROL
if(((int)_sendBuffer->getPosition())>CA_MESSAGE_HEADER_SIZE)
#else
if(((int)_sendBuffer->getPosition())>0)
#endif
_flushRequested = true;
else
_sendQueueEvent.wait();
@@ -928,7 +1189,6 @@ namespace pvAccess {
flush(true);
else
endMessage(false);// automatic end (to set payload)
} catch(std::exception &e) {
//LOG(logLevelError, "%s", e.what());
_sendBuffer->setPosition(_lastMessageStartPosition);