completed merge

This commit is contained in:
Matej Sekoranja
2014-06-10 00:09:08 +02:00
61 changed files with 3280 additions and 1955 deletions

View File

@@ -133,7 +133,7 @@ namespace epics {
if (magicCode != PVA_MAGIC)
{
LOG(logLevelError,
"Invalid header received from the client at %s:%d: %s,"
"Invalid header received from the client at %s:%d: %s.,"
" disconnecting...",
__FILE__, __LINE__, inetAddressToString(*getLastReadBufferSocketAddress()).c_str());
invalidDataStreamHandler();
@@ -155,7 +155,13 @@ namespace epics {
if (!readToBuffer(PVA_MESSAGE_HEADER_SIZE, false)) {
return;
}
/*
hexDump("Header", (const int8*)_socketBuffer->getArray(),
_socketBuffer->getPosition(), PVA_MESSAGE_HEADER_SIZE);
*/
// read header fields
processHeader();
bool isControl = ((_flags & 0x01) == 0x01);
@@ -174,7 +180,7 @@ namespace epics {
continue;
LOG(logLevelWarn,
"Not-a-frst segmented message received in normal mode"
"Not-a-first segmented message received in normal mode"
" from the client at %s:%d: %s, disconnecting...",
__FILE__, __LINE__, inetAddressToString(*getLastReadBufferSocketAddress()).c_str());
invalidDataStreamHandler();
@@ -402,7 +408,7 @@ namespace epics {
msg << "requested for buffer size " << size
<< ", but maximum " << MAX_ENSURE_DATA_SIZE << " is allowed.";
LOG(logLevelWarn,
"%s at %s:%d,", msg.str().c_str(), __FILE__, __LINE__);
"%s at %s:%d.,", msg.str().c_str(), __FILE__, __LINE__);
std::string s = msg.str();
throw std::invalid_argument(s);
}
@@ -688,7 +694,7 @@ namespace epics {
size << ", but only " << _maxSendPayloadSize << " available.";
std::string s = msg.str();
LOG(logLevelWarn,
"%s at %s:%d,", msg.str().c_str(), __FILE__, __LINE__);
"%s at %s:%d.,", msg.str().c_str(), __FILE__, __LINE__);
throw std::invalid_argument(s);
}
@@ -892,7 +898,7 @@ namespace epics {
std::ostringstream msg;
msg << "an exception caught while processing a send message: "
<< e.what();
LOG(logLevelDebug, "%s at %s:%d",
LOG(logLevelWarn, "%s at %s:%d.",
msg.str().c_str(), __FILE__, __LINE__);
try {
@@ -1165,15 +1171,13 @@ namespace epics {
{
try {
bac->processRead();
} catch (connection_closed_exception &cce) {
// noop
} catch (std::exception &e) {
LOG(logLevelWarn,
"an exception caught while in sendThread at %s:%d: %s",
__FILE__, __LINE__, e.what());
} catch (...) {
LOG(logLevelError,
"unknown exception caught while in sendThread at %s:%d",
LOG(logLevelWarn,
"unknown exception caught while in sendThread at %s:%d.",
__FILE__, __LINE__);
}
}
@@ -1195,14 +1199,19 @@ namespace epics {
try {
bac->processWrite();
} catch (connection_closed_exception &cce) {
// noop
// noop
/*
LOG(logLevelDebug,
"connection closed by remote host while in sendThread at %s:%d: %s",
__FILE__, __LINE__, e.what());
*/
} catch (std::exception &e) {
LOG(logLevelWarn,
"an exception caught while in sendThread at %s:%d: %s",
__FILE__, __LINE__, e.what());
} catch (...) {
LOG(logLevelError,
"unknown exception caught while in sendThread at %s:%d",
LOG(logLevelWarn,
"unknown exception caught while in sendThread at %s:%d.",
__FILE__, __LINE__);
}
}
@@ -1250,37 +1259,47 @@ namespace epics {
char errStr[64];
epicsSocketConvertErrnoToString(errStr, sizeof(errStr));
LOG(logLevelError,
"Error fetching socket remote address: %s",
"Error fetching socket remote address: %s.",
errStr);
}
// set receive timeout so that we do not have problems at
//shutdown (recvfrom would block)
struct timeval timeout;
memset(&timeout, 0, sizeof(struct timeval));
timeout.tv_sec = 1;
timeout.tv_usec = 0;
// TODO remove this and implement use epicsSocketSystemCallInterruptMechanismQuery
if (unlikely(::setsockopt (_channel, SOL_SOCKET, SO_RCVTIMEO,
(char*)&timeout, sizeof(timeout)) < 0))
{
char errStr[64];
epicsSocketConvertErrnoToString(errStr, sizeof(errStr));
LOG(logLevelError,
"Failed to set SO_RCVTIMEO for TDP socket %s: %s.",
inetAddressToString(_socketAddress).c_str(), errStr);
}
}
// must be called only once, when there will be no operation on socket (e.g. just before tx/rx thread exists)
void BlockingSocketAbstractCodec::internalDestroy() {
if(_channel != INVALID_SOCKET) {
epicsSocketDestroy(_channel);
_channel = INVALID_SOCKET;
}
if(_channel != INVALID_SOCKET) {
epicsSocketSystemCallInterruptMechanismQueryInfo info =
epicsSocketSystemCallInterruptMechanismQuery ();
switch ( info )
{
case esscimqi_socketCloseRequired:
epicsSocketDestroy ( _channel );
break;
case esscimqi_socketBothShutdownRequired:
{
/*int status =*/ ::shutdown ( _channel, SHUT_RDWR );
/*
if ( status ) {
char sockErrBuf[64];
epicsSocketConvertErrnoToString (
sockErrBuf, sizeof ( sockErrBuf ) );
LOG(logLevelDebug,
"TCP socket to %s failed to shutdown: %s.",
inetAddressToString(_socketAddress).c_str(), sockErrBuf);
}
*/
epicsSocketDestroy ( _channel );
}
break;
case esscimqi_socketSigAlarmRequired:
// not supported anymore anyway
default:
epicsSocketDestroy(_channel);
}
_channel = INVALID_SOCKET;
}
}
@@ -1393,6 +1412,38 @@ namespace epics {
}
void BlockingTCPTransportCodec::internalClose(bool force) {
BlockingSocketAbstractCodec::internalClose(force);
if (IS_LOGGABLE(logLevelDebug))
{
LOG(logLevelDebug,
"TCP socket to %s closed.",
inetAddressToString(_socketAddress).c_str());
}
}
bool BlockingTCPTransportCodec::verify(epics::pvData::int32 timeoutMs) {
return _verifiedEvent.wait(timeoutMs/1000.0);
}
void BlockingTCPTransportCodec::verified(epics::pvData::Status const & status) {
epics::pvData::Lock lock(_verifiedMutex);
if (IS_LOGGABLE(logLevelDebug) && !status.isOK())
{
char ipAddrStr[48];
ipAddrToDottedIP(&_socketAddress.ia, ipAddrStr, sizeof(ipAddrStr));
LOG(logLevelDebug, "Failed to verify connection to %s: %s.", ipAddrStr, status.getMessage().c_str());
// TODO stack dump
}
_verified = status.isSuccess();
_verifiedEvent.signal();
}
@@ -1407,7 +1458,7 @@ namespace epics {
int32_t receiveBufferSize) :
BlockingTCPTransportCodec(context, channel, responseHandler,
sendBufferSize, receiveBufferSize, PVA_DEFAULT_PRIORITY),
_lastChannelSID(0)
_lastChannelSID(0), _verifyOrVerified(false)
{
// NOTE: priority not yet known, default priority is used to
@@ -1473,33 +1524,59 @@ namespace epics {
void BlockingServerTCPTransportCodec::send(ByteBuffer* buffer,
TransportSendControl* control) {
//
// set byte order control message
//
if (!_verifyOrVerified)
{
_verifyOrVerified = true;
ensureBuffer(PVA_MESSAGE_HEADER_SIZE);
buffer->putByte(PVA_MAGIC);
buffer->putByte(PVA_VERSION);
buffer->putByte(
0x01 | ((EPICS_BYTE_ORDER == EPICS_ENDIAN_BIG)
? 0x80 : 0x00)); // control + big endian
buffer->putByte(2); // set byte order
buffer->putInt(0);
//
// set byte order control message
//
ensureBuffer(PVA_MESSAGE_HEADER_SIZE);
buffer->putByte(PVA_MAGIC);
buffer->putByte(PVA_VERSION);
buffer->putByte(
0x01 | ((EPICS_BYTE_ORDER == EPICS_ENDIAN_BIG)
? 0x80 : 0x00)); // control + big endian
buffer->putByte(2); // set byte order
buffer->putInt(0);
//
// send verification message
//
control->startMessage(CMD_CONNECTION_VALIDATION, 2*sizeof(int32));
//
// send verification message
//
control->startMessage(CMD_CONNECTION_VALIDATION, 4+2);
// receive buffer size
buffer->putInt(static_cast<int32>(getReceiveBufferSize()));
// receive buffer size
buffer->putInt(static_cast<int32>(getReceiveBufferSize()));
// socket receive buffer size
buffer->putInt(static_cast<int32>(getSocketReceiveBufferSize()));
// server introspection registy max size
// TODO
buffer->putShort(0x7FFF);
// send immediately
control->flush(true);
// list of authNZ plugin names
// TODO
buffer->putByte(0);
// send immediately
control->flush(true);
}
else
{
//
// send verified message
//
control->startMessage(CMD_CONNECTION_VALIDATED, 0);
{
Lock lock(_verificationStatusMutex);
_verificationStatus.serialize(buffer, control);
}
// send immediately
control->flush(true);
}
}
void BlockingServerTCPTransportCodec::destroyAllChannels() {
@@ -1549,8 +1626,7 @@ namespace epics {
sendBufferSize, receiveBufferSize, priority),
_connectionTimeout(beaconInterval*1000),
_unresponsiveTransport(false),
_verifyOrEcho(true),
_verified(false)
_verifyOrEcho(true)
{
// initialize owners list, send queue
acquire(client);
@@ -1694,7 +1770,7 @@ namespace epics {
{
char ipAddrStr[48];
ipAddrToDottedIP(&_socketAddress.ia, ipAddrStr, sizeof(ipAddrStr));
LOG(logLevelDebug, "Releasing transport to %s.", ipAddrStr);
LOG(logLevelDebug, "Releasing TCP transport to %s.", ipAddrStr);
}
_owners.erase(clientID);
@@ -1711,16 +1787,6 @@ namespace epics {
if(_unresponsiveTransport) responsiveTransport();
}
bool BlockingClientTCPTransportCodec::verify(epics::pvData::int32 timeoutMs) {
return _verifiedEvent.wait(timeoutMs/1000.0);
}
void BlockingClientTCPTransportCodec::verified() {
epics::pvData::Lock lock(_verifiedMutex);
_verified = true;
_verifiedEvent.signal();
}
void BlockingClientTCPTransportCodec::responsiveTransport() {
Lock lock(_mutex);
if(_unresponsiveTransport) {
@@ -1755,25 +1821,30 @@ namespace epics {
void BlockingClientTCPTransportCodec::send(ByteBuffer* buffer,
TransportSendControl* control) {
if(_verifyOrEcho) {
_verifyOrEcho = false;
/*
* send verification response message
*/
control->startMessage(CMD_CONNECTION_VALIDATION, 2*sizeof(int32)+sizeof(int16));
control->startMessage(CMD_CONNECTION_VALIDATION, 4+2+2);
// receive buffer size
buffer->putInt(static_cast<int32>(getReceiveBufferSize()));
// socket receive buffer size
buffer->putInt(static_cast<int32>(getSocketReceiveBufferSize()));
// max introspection registry size
// TODO
buffer->putShort(0x7FFF);
// connection priority
// QoS (aka connection priority)
buffer->putShort(getPriority());
// authNZ plugin name
// TODO
SerializeHelper::serializeString("", buffer, control);
// send immediately
control->flush(true);
_verifyOrEcho = false;
}
else {
control->startMessage(CMD_ECHO, 0);