TCP vs UDP initialization order, logging handling

This commit is contained in:
Matej Sekoranja
2014-03-26 14:02:16 +01:00
parent 3c7bac27fd
commit b2d2734b58
4 changed files with 57 additions and 42 deletions

View File

@@ -131,9 +131,9 @@ namespace epics {
if (magicCode != PVA_MAGIC)
{
LOG(logLevelError,
"Invalid header received from the client at %s:%d: %d,"
"Invalid header received from the client at %s:%d: %s,"
" disconnecting...",
__FILE__, __LINE__, getLastReadBufferSocketAddress());
__FILE__, __LINE__, inetAddressToString(*getLastReadBufferSocketAddress()).c_str());
invalidDataStreamHandler();
throw invalid_data_stream_exception("invalid header received");
}
@@ -168,8 +168,8 @@ namespace epics {
{
LOG(logLevelWarn,
"Not-a-frst segmented message received in normal mode"
" from the client at %s:%d: %d, disconnecting...",
__FILE__, __LINE__, getLastReadBufferSocketAddress());
" from the client at %s:%d: %s, disconnecting...",
__FILE__, __LINE__, inetAddressToString(*getLastReadBufferSocketAddress()).c_str());
invalidDataStreamHandler();
throw invalid_data_stream_exception(
"not-a-first segmented message received in normal mode");
@@ -260,9 +260,9 @@ namespace epics {
// TODO we do not handle this for now (maybe never)
LOG(logLevelWarn,
"unprocessed read buffer from client at %s:%d: %d,"
"unprocessed read buffer from client at %s:%d: %s,"
" disconnecting...",
__FILE__, __LINE__, getLastReadBufferSocketAddress());
__FILE__, __LINE__, inetAddressToString(*getLastReadBufferSocketAddress()).c_str());
invalidDataStreamHandler();
throw invalid_data_stream_exception(
"unprocessed read buffer");
@@ -299,8 +299,8 @@ namespace epics {
{
LOG(logLevelWarn,
"Not-a-first segmented message expected from the client at"
" %s:%d: %d, disconnecting...",
__FILE__, __LINE__, getLastReadBufferSocketAddress());
" %s:%d: %s, disconnecting...",
__FILE__, __LINE__, inetAddressToString(*getLastReadBufferSocketAddress()).c_str());
invalidDataStreamHandler();
throw new invalid_data_stream_exception(
"not-a-first segmented message expected");
@@ -1162,14 +1162,17 @@ namespace epics {
const {
osiSocklen_t intLen = sizeof(int);
char strBuffer[64];
int socketRecvBufferSize;
int retval = getsockopt(_channel, SOL_SOCKET, SO_RCVBUF,
(char *)&socketRecvBufferSize, &intLen);
if(retval<0) {
epicsSocketConvertErrnoToString(strBuffer, sizeof(strBuffer));
//LOG(logLevelDebug, "Error getting SO_SNDBUF: %s", strBuffer);
if (IS_LOGGABLE(logLevelDebug))
{
char strBuffer[64];
epicsSocketConvertErrnoToString(strBuffer, sizeof(strBuffer));
LOG(logLevelDebug, "Error getting SO_SNDBUF: %s", strBuffer);
}
}
return socketRecvBufferSize;
@@ -1327,13 +1330,15 @@ namespace epics {
Lock lock(_channelsMutex);
if(_channels.size()==0) return;
char ipAddrStr[64];
ipAddrToDottedIP(&_socketAddress.ia, ipAddrStr, sizeof(ipAddrStr));
LOG(
logLevelDebug,
"Transport to %s still has %zd channel(s) active and closing...",
ipAddrStr, _channels.size());
if (IS_LOGGABLE(logLevelDebug))
{
char ipAddrStr[64];
ipAddrToDottedIP(&_socketAddress.ia, ipAddrStr, sizeof(ipAddrStr));
LOG(
logLevelDebug,
"Transport to %s still has %zd channel(s) active and closing...",
ipAddrStr, _channels.size());
}
std::map<pvAccessID, ServerChannel::shared_pointer>::iterator it = _channels.begin();
for(; it!=_channels.end(); it++)
@@ -1443,9 +1448,12 @@ namespace epics {
Lock lock(_mutex);
if(isClosed()) return false;
char ipAddrStr[48];
ipAddrToDottedIP(&_socketAddress.ia, ipAddrStr, sizeof(ipAddrStr));
LOG(logLevelDebug, "Acquiring transport to %s.", ipAddrStr);
if (IS_LOGGABLE(logLevelDebug))
{
char ipAddrStr[48];
ipAddrToDottedIP(&_socketAddress.ia, ipAddrStr, sizeof(ipAddrStr));
LOG(logLevelDebug, "Acquiring transport to %s.", ipAddrStr);
}
_owners[client->getID()] = TransportClient::weak_pointer(client);
//_owners.insert(TransportClient::weak_pointer(client));
@@ -1476,12 +1484,16 @@ namespace epics {
// check if still acquired
size_t refs = _owners.size();
if(refs>0) {
char ipAddrStr[48];
ipAddrToDottedIP(&_socketAddress.ia, ipAddrStr, sizeof(ipAddrStr));
LOG(
logLevelDebug,
"Transport to %s still has %d client(s) active and closing...",
ipAddrStr, refs);
if (IS_LOGGABLE(logLevelDebug))
{
char ipAddrStr[48];
ipAddrToDottedIP(&_socketAddress.ia, ipAddrStr, sizeof(ipAddrStr));
LOG(
logLevelDebug,
"Transport to %s still has %d client(s) active and closing...",
ipAddrStr, refs);
}
TransportClientMap_t::iterator it = _owners.begin();
for(; it!=_owners.end(); it++) {
@@ -1502,10 +1514,12 @@ namespace epics {
Lock lock(_mutex);
if(isClosed()) return;
char ipAddrStr[48];
ipAddrToDottedIP(&_socketAddress.ia, ipAddrStr, sizeof(ipAddrStr));
LOG(logLevelDebug, "Releasing transport to %s.", ipAddrStr);
if (IS_LOGGABLE(logLevelDebug))
{
char ipAddrStr[48];
ipAddrToDottedIP(&_socketAddress.ia, ipAddrStr, sizeof(ipAddrStr));
LOG(logLevelDebug, "Releasing transport to %s.", ipAddrStr);
}
_owners.erase(clientID);
//_owners.erase(TransportClient::weak_pointer(client));

View File

@@ -220,7 +220,7 @@ namespace epics {
virtual void processControlMessage() = 0;
virtual void processApplicationMessage() = 0;
virtual osiSockAddr getLastReadBufferSocketAddress() = 0;
virtual const osiSockAddr* getLastReadBufferSocketAddress() = 0;
virtual void invalidDataStreamHandler() = 0;
virtual void readPollOne()=0;
virtual void writePollOne() = 0;
@@ -383,7 +383,7 @@ namespace epics {
int read(epics::pvData::ByteBuffer* dst);
int write(epics::pvData::ByteBuffer* src);
osiSockAddr getLastReadBufferSocketAddress() { return _socketAddress;}
const osiSockAddr* getLastReadBufferSocketAddress() { return &_socketAddress; }
void invalidDataStreamHandler();
std::size_t getSocketReceiveBufferSize() const;
@@ -488,7 +488,7 @@ namespace epics {
bool directSerialize(
epics::pvData::ByteBuffer * /*existingBuffer*/,
const char* /*toSerialize*/,
std::size_t /*elementCount*/, std::size_t /*elementSize*/)
std::size_t /*elementCount*/, std::size_t /*elementSize*/)
{
// TODO !!!!
return false;

View File

@@ -204,15 +204,15 @@ void ServerContextImpl::internalInitialize()
_timer.reset(new Timer("pvAccess-server timer", lowerPriority));
_transportRegistry.reset(new TransportRegistry());
// setup broadcast UDP transport
initializeBroadcastTransport();
ServerContextImpl::shared_pointer thisServerContext = shared_from_this();
_acceptor.reset(new BlockingTCPAcceptor(thisServerContext, thisServerContext, _serverPort, _receiveBufferSize));
_serverPort = ntohs(_acceptor->getBindAddress()->ia.sin_port);
_beaconEmitter.reset(new BeaconEmitter(_broadcastTransport, thisServerContext));
// setup broadcast UDP transport
initializeBroadcastTransport();
_beaconEmitter.reset(new BeaconEmitter(_broadcastTransport, thisServerContext));
}
void ServerContextImpl::initializeBroadcastTransport()

View File

@@ -84,7 +84,8 @@ namespace epics {
_disconnected(false),
_forcePayloadRead(-1),
_readBuffer(new ByteBuffer(receiveBufferSize)),
_writeBuffer(sendBufferSize)
_writeBuffer(sendBufferSize),
_dummyAddress()
{
}
@@ -273,10 +274,9 @@ namespace epics {
return _sendBuffer;
}
osiSockAddr getLastReadBufferSocketAddress()
const osiSockAddr* getLastReadBufferSocketAddress()
{
osiSockAddr tmp = {{0}};
return tmp;
return &_dummyAddress;
}
void invalidDataStreamHandler() { _invalidDataStreamCount++; }
@@ -380,6 +380,7 @@ namespace epics {
std::auto_ptr<ReadPollOneCallback> _readPollOneCallback;
std::auto_ptr<WritePollOneCallback> _writePollOneCallback;
osiSockAddr _dummyAddress;
protected: