Merged changes from default branch

This commit is contained in:
Andrew Johnson
2014-04-18 16:58:06 -05:00
21 changed files with 2373 additions and 2086 deletions

View File

@@ -95,6 +95,7 @@ namespace epics {
}
// thows io_exception, connection_closed_exception, invalid_stream_exception
void AbstractCodec::processRead() {
switch (_readMode)
{
@@ -565,7 +566,8 @@ namespace epics {
void AbstractCodec::startMessage(
epics::pvData::int8 command,
std::size_t ensureCapacity) {
std::size_t ensureCapacity,
epics::pvData::int32 payloadSize) {
_lastMessageStartPosition =
std::numeric_limits<size_t>::max(); // TODO revise this
@@ -577,7 +579,7 @@ namespace epics {
_sendBuffer->putByte(
(_lastSegmentedMessageType | _byteOrderFlag)); // data + endian
_sendBuffer->putByte(command); // command
_sendBuffer->putInt(0); // temporary zero payload
_sendBuffer->putInt(payloadSize);
// apply offset
if (_nextMessagePayloadOffset > 0)
@@ -699,11 +701,7 @@ namespace epics {
flush(false);
}
void AbstractCodec::flush(bool lastMessageCompleted) {
// automatic end
endMessage(!lastMessageCompleted);
void AbstractCodec::flushSendBuffer() {
_sendBuffer->flip();
@@ -722,6 +720,15 @@ namespace epics {
_sendBuffer->clear();
_lastMessageStartPosition = std::numeric_limits<size_t>::max();
}
void AbstractCodec::flush(bool lastMessageCompleted) {
// automatic end
endMessage(!lastMessageCompleted);
// flush send buffer
flushSendBuffer();
// start with last header
if (!lastMessageCompleted && _lastSegmentedMessageType != 0)
@@ -729,9 +736,9 @@ namespace epics {
}
// thows io_exception, connection_closed_exception
void AbstractCodec::processWrite() {
// TODO catch ConnectionClosedException, InvalidStreamException?
switch (_writeMode)
{
case PROCESS_SEND_QUEUE:
@@ -885,7 +892,7 @@ namespace epics {
std::ostringstream msg;
msg << "an exception caught while processing a send message: "
<< e.what();
LOG(logLevelWarn, "%s at %s:%d",
LOG(logLevelDebug, "%s at %s:%d",
msg.str().c_str(), __FILE__, __LINE__);
try {
@@ -935,6 +942,146 @@ namespace epics {
}
bool AbstractCodec::directSerialize(ByteBuffer* /*existingBuffer*/, const char* toSerialize,
std::size_t elementCount, std::size_t elementSize)
{
// TODO overflow check of "size_t count", overflow int32 field of payloadSize header field
// TODO max message size in connection validation
std::size_t count = elementCount * elementSize;
// TODO find smart limit
// check if direct mode actually pays off
if (count < 64*1024)
return false;
//
// first end current message, and write a header of next "directly serialized" message
//
// first end current message indicating the we will segment
endMessage(true);
// append segmented message header with payloadSize == count
// TODO size_t to int32
startMessage(_lastSegmentedMessageCommand, 0, static_cast<int32>(count));
// flush
flushSendBuffer();
// TODO think if alignment is preserved after...
//
// send toSerialize buffer
//
ByteBuffer wrappedBuffer(const_cast<char*>(toSerialize), count);
send(&wrappedBuffer);
//
// continue where we left before calling directSerialize
//
startMessage(_lastSegmentedMessageCommand, 0);
return true;
}
bool AbstractCodec::directDeserialize(ByteBuffer *existingBuffer, char* deserializeTo,
std::size_t elementCount, std::size_t elementSize)
{
return false;
// _socketBuffer == existingBuffer
/*
std::size_t bytesToBeDeserialized = elementCount*elementSize;
// TODO check if bytesToDeserialized < threshold that direct pays off?
_directPayloadRead = bytesToBeDeserialized;
_directBuffer = deserializeTo;
while (true)
{
// retrieve what's already in buffers
size_t availableBytes = min(_directPayloadRead, _socketBuffer->getRemaining());
existingBuffer->getArray(_directBuffer, availableBytes);
_directPayloadRead -= availableBytes;
if (_directPayloadRead == 0)
return true;
_directBuffer += availableBytes;
// subtract what was already processed
size_t pos = _socketBuffer->getPosition();
_storedPayloadSize -= pos -_storedPosition;
_storedPosition = pos;
// no more data and we have some payload left => read buffer
if (likely(_storedPayloadSize > 0))
{
size_t bytesToRead = std::min(_directPayloadRead, _storedPayloadSize);
processReadIntoDirectBuffer(bytesToRead);
// std::cout << "d: " << bytesToRead << std::endl;
_storedPayloadSize -= bytesToRead;
_directPayloadRead -= bytesToRead;
}
if (_directPayloadRead == 0)
return true;
_stage = PROCESS_HEADER;
processReadCached(true, UNDEFINED_STAGE, _directPayloadRead);
_storedPosition = _socketBuffer->getPosition();
_storedLimit = _socketBuffer->getLimit();
_socketBuffer->setLimit(
min(_storedPosition + _storedPayloadSize, _storedLimit)
);
}
return true;
*/
}
/*
void AbstractCodec::processReadIntoDirectBuffer(size_t bytesToRead)
{
while (bytesToRead > 0)
{
ssize_t bytesRead = recv(_channel, _directBuffer, bytesToRead, 0);
// std::cout << "d: " << bytesRead << std::endl;
if(unlikely(bytesRead<=0))
{
if (bytesRead<0)
{
int socketError = SOCKERRNO;
// interrupted or timeout
if (socketError == EINTR ||
socketError == EAGAIN ||
socketError == EWOULDBLOCK)
continue;
}
// error (disconnect, end-of-stream) detected
close();
THROW_BASE_EXCEPTION("bytesRead < 0");
return;
}
bytesToRead -= bytesRead;
_directBuffer += bytesRead;
}
}
*/
//
//
@@ -1018,10 +1165,16 @@ namespace epics {
{
try {
bac->processRead();
} catch (io_exception &e) {
} catch (connection_closed_exception &cce) {
// noop
} catch (std::exception &e) {
LOG(logLevelWarn,
"an exception caught while in receiveThread at %s:%d: %s",
"an exception caught while in sendThread at %s:%d: %s",
__FILE__, __LINE__, e.what());
} catch (...) {
LOG(logLevelError,
"unknown exception caught while in sendThread at %s:%d",
__FILE__, __LINE__);
}
}
@@ -1041,10 +1194,16 @@ namespace epics {
{
try {
bac->processWrite();
} catch (io_exception &e) {
} catch (connection_closed_exception &cce) {
// noop
} catch (std::exception &e) {
LOG(logLevelWarn,
"an exception caught while in sendThread at %s:%d: %s",
__FILE__, __LINE__, e.what());
} catch (...) {
LOG(logLevelError,
"unknown exception caught while in sendThread at %s:%d",
__FILE__, __LINE__);
}
}
@@ -1143,6 +1302,8 @@ namespace epics {
// NOTE: do not log here, you might override SOCKERRNO relevant to recv() operation above
// TODO winsock return 0 on disconnect (blocking socket) ?
if(unlikely(bytesSent<0)) {
int socketError = SOCKERRNO;
@@ -1150,6 +1311,8 @@ namespace epics {
// spurious EINTR check
if (socketError==SOCK_EINTR)
continue;
else if (socketError==SOCK_ENOBUFS)
return 0;
}
if (bytesSent > 0) {
@@ -1211,10 +1374,11 @@ namespace epics {
{
int socketError = SOCKERRNO;
// TODO SOCK_ENOBUFS, for read?
// interrupted or timeout
if (socketError == EINTR ||
if (socketError == SOCK_EINTR ||
socketError == EAGAIN ||
socketError == EWOULDBLOCK)
socketError == SOCK_EWOULDBLOCK)
continue;
}
@@ -1229,6 +1393,12 @@ namespace epics {
}
BlockingServerTCPTransportCodec::BlockingServerTCPTransportCodec(
Context::shared_pointer const & context,
SOCKET channel,
@@ -1372,7 +1542,7 @@ namespace epics {
int32_t sendBufferSize,
int32_t receiveBufferSize,
TransportClient::shared_pointer const & client,
epics::pvData::int8 remoteTransportRevision,
epics::pvData::int8 /*remoteTransportRevision*/,
float beaconInterval,
int16_t priority ) :
BlockingTCPTransportCodec(context, channel, responseHandler,