codec: BlockingTCPClientTransportCodec

This commit is contained in:
Matej Sekoranja
2014-03-09 01:36:44 +01:00
parent eb12372535
commit f2ee8de758
2 changed files with 420 additions and 19 deletions

View File

@@ -1602,4 +1602,283 @@ namespace epics {
control->flush(true);
}
}
// TODO
/*
void BlockingServerTCPTransportCodec::destroyAllChannels() {
Lock lock(_channelsMutex);
if(_channels.size()==0) return;
char ipAddrStr[64];
ipAddrToDottedIP(&_socketAddress.ia, ipAddrStr, sizeof(ipAddrStr));
LOG(
logLevelDebug,
"Transport to %s still has %u channel(s) active and closing...",
ipAddrStr, (unsigned int)_channels.size());
map<pvAccessID, ServerChannel::shared_pointer>::iterator it = _channels.begin();
for(; it!=_channels.end(); it++)
it->second->destroy();
_channels.clear();
}
void BlockingServerTCPTransportCodec::internalClose(bool force) {
Transport::shared_pointer thisSharedPtr = shared_from_this();
BlockingTCPTransport::internalClose(force);
destroyAllChannels();
}
void BlockingServerTCPTransportCodec::internalPostClose(bool forced) {
BlockingTCPTransport::internalPostClose(forced);
}
*/
BlockingClientTCPTransportCodec::BlockingClientTCPTransportCodec(
Context::shared_pointer const & context,
SOCKET channel,
std::auto_ptr<ResponseHandler>& responseHandler,
int32_t sendBufferSize,
int32_t receiveBufferSize,
TransportClient::shared_pointer const & client,
epics::pvData::int8 remoteTransportRevision,
float beaconInterval,
int16_t priority ) :
BlockingTCPTransportCodec(context, channel, responseHandler,
sendBufferSize, receiveBufferSize, priority),
_connectionTimeout(beaconInterval*1000),
_unresponsiveTransport(false),
_verifyOrEcho(true),
_verified(false)
{
// initialize owners list, send queue
acquire(client);
// use immediate for clients
//setFlushStrategy(DELAYED);
// setup connection timeout timer (watchdog) - moved to start() method
epicsTimeGetCurrent(&_aliveTimestamp);
}
void BlockingClientTCPTransportCodec::start()
{
TimerCallbackPtr tcb = std::tr1::dynamic_pointer_cast<TimerCallback>(shared_from_this());
_context->getTimer()->schedulePeriodic(tcb, _connectionTimeout, _connectionTimeout);
BlockingTCPTransportCodec::start();
}
BlockingClientTCPTransportCodec::~BlockingClientTCPTransportCodec() {
}
void BlockingClientTCPTransportCodec::callback() {
epicsTimeStamp currentTime;
epicsTimeGetCurrent(&currentTime);
_mutex.lock();
// no exception expected here
double diff = epicsTimeDiffInSeconds(&currentTime, &_aliveTimestamp);
_mutex.unlock();
if(diff>2*_connectionTimeout) {
unresponsiveTransport();
}
// use some k (3/4) to handle "jitter"
else if(diff>=((3*_connectionTimeout)/4)) {
// send echo
TransportSender::shared_pointer transportSender = std::tr1::dynamic_pointer_cast<TransportSender>(shared_from_this());
enqueueSendRequest(transportSender);
}
}
#define EXCEPTION_GUARD(code) try { code; } \
catch (std::exception &e) { LOG(logLevelError, "Unhandled exception caught from code at %s:%d: %s", __FILE__, __LINE__, e.what()); } \
catch (...) { LOG(logLevelError, "Unhandled exception caught from code at %s:%d.", __FILE__, __LINE__); }
void BlockingClientTCPTransportCodec::unresponsiveTransport() {
Lock lock(_mutex);
if(!_unresponsiveTransport) {
_unresponsiveTransport = true;
TransportClientMap_t::iterator it = _owners.begin();
for(; it!=_owners.end(); it++) {
TransportClient::shared_pointer client = it->second.lock();
if (client)
{
EXCEPTION_GUARD(client->transportUnresponsive());
}
}
}
}
bool BlockingClientTCPTransportCodec::acquire(TransportClient::shared_pointer const & client) {
Lock lock(_mutex);
if(isClosed()) return false;
char ipAddrStr[48];
ipAddrToDottedIP(&_socketAddress.ia, ipAddrStr, sizeof(ipAddrStr));
LOG(logLevelDebug, "Acquiring transport to %s.", ipAddrStr);
_owners[client->getID()] = TransportClient::weak_pointer(client);
//_owners.insert(TransportClient::weak_pointer(client));
return true;
}
// _mutex is held when this method is called
void BlockingClientTCPTransportCodec::internalClose(bool forced) {
// TODO !!! BlockingTCPTransportCodec::internalClose(forced);
TimerCallbackPtr tcb = std::tr1::dynamic_pointer_cast<TimerCallback>(shared_from_this());
_context->getTimer()->cancel(tcb);
}
void BlockingClientTCPTransportCodec::internalPostClose(bool forced) {
// TODO !!! BlockingTCPTransportCodec::internalPostClose(forced);
// _owners cannot change when transport is closed
closedNotifyClients();
}
/**
* Notifies clients about disconnect.
*/
void BlockingClientTCPTransportCodec::closedNotifyClients() {
// check if still acquired
size_t refs = _owners.size();
if(refs>0) {
char ipAddrStr[48];
ipAddrToDottedIP(&_socketAddress.ia, ipAddrStr, sizeof(ipAddrStr));
LOG(
logLevelDebug,
"Transport to %s still has %d client(s) active and closing...",
ipAddrStr, refs);
TransportClientMap_t::iterator it = _owners.begin();
for(; it!=_owners.end(); it++) {
TransportClient::shared_pointer client = it->second.lock();
if (client)
{
EXCEPTION_GUARD(client->transportClosed());
}
}
}
_owners.clear();
}
//void BlockingClientTCPTransportCodec::release(TransportClient::shared_pointer const & client) {
void BlockingClientTCPTransportCodec::release(pvAccessID clientID) {
Lock lock(_mutex);
if(isClosed()) return;
char ipAddrStr[48];
ipAddrToDottedIP(&_socketAddress.ia, ipAddrStr, sizeof(ipAddrStr));
LOG(logLevelDebug, "Releasing transport to %s.", ipAddrStr);
_owners.erase(clientID);
//_owners.erase(TransportClient::weak_pointer(client));
// not used anymore, close it
// TODO consider delayed destruction (can improve performance!!!)
if(_owners.size()==0) close(); // TODO close(false)
}
void BlockingClientTCPTransportCodec::aliveNotification() {
Lock guard(_mutex);
epicsTimeGetCurrent(&_aliveTimestamp);
if(_unresponsiveTransport) responsiveTransport();
}
bool BlockingClientTCPTransportCodec::verify(epics::pvData::int32 timeoutMs) {
return _verifiedEvent.wait(timeoutMs/1000.0);
}
void BlockingClientTCPTransportCodec::verified() {
epics::pvData::Lock lock(_verifiedMutex);
_verified = true;
_verifiedEvent.signal();
}
void BlockingClientTCPTransportCodec::responsiveTransport() {
Lock lock(_mutex);
if(_unresponsiveTransport) {
_unresponsiveTransport = false;
Transport::shared_pointer thisSharedPtr = shared_from_this();
TransportClientMap_t::iterator it = _owners.begin();
for(; it!=_owners.end(); it++) {
TransportClient::shared_pointer client = it->second.lock();
if (client)
{
EXCEPTION_GUARD(client->transportResponsive(thisSharedPtr));
}
}
}
}
void BlockingClientTCPTransportCodec::changedTransport() {
_outgoingIR.reset();
Lock lock(_mutex);
TransportClientMap_t::iterator it = _owners.begin();
for(; it!=_owners.end(); it++) {
TransportClient::shared_pointer client = it->second.lock();
if (client)
{
EXCEPTION_GUARD(client->transportChanged());
}
}
}
void BlockingClientTCPTransportCodec::send(ByteBuffer* buffer,
TransportSendControl* control) {
if(_verifyOrEcho) {
/*
* send verification response message
*/
control->startMessage(CMD_CONNECTION_VALIDATION, 2*sizeof(int32)+sizeof(int16));
// receive buffer size
buffer->putInt(static_cast<int32>(getReceiveBufferSize()));
// socket receive buffer size
buffer->putInt(static_cast<int32>(getSocketReceiveBufferSize()));
// connection priority
buffer->putShort(getPriority());
// send immediately
control->flush(true);
_verifyOrEcho = false;
}
else {
control->startMessage(CMD_ECHO, 0);
// send immediately
control->flush(true);
}
}
}

View File

@@ -648,7 +648,7 @@ namespace epics {
):
BlockingSocketAbstractCodec(channel, sendBufferSize, receiveBufferSize),
_context(context), _responseHandler(responseHandler),
_verified(false), _remoteTransportReceiveBufferSize(MAX_TCP_RECV),
_remoteTransportReceiveBufferSize(MAX_TCP_RECV),
_remoteTransportRevision(0), _priority(priority)
{
LOG(logLevelTrace,
@@ -656,22 +656,18 @@ namespace epics {
epicsThreadGetIdSelf());
}
private:
Context::shared_pointer _context;
std::auto_ptr<ResponseHandler> _responseHandler;
bool _verified;
size_t _remoteTransportReceiveBufferSize;
epics::pvData::int8 _remoteTransportRevision;
epics::pvData::int16 _priority;
osiSockAddr _socketAddress;
epics::pvData::Mutex _verifiedMutex;
epics::pvData::Event _verifiedEvent;
IntrospectionRegistry _incomingIR;
IntrospectionRegistry _outgoingIR;
private:
std::auto_ptr<ResponseHandler> _responseHandler;
size_t _remoteTransportReceiveBufferSize;
epics::pvData::int8 _remoteTransportRevision;
epics::pvData::int16 _priority;
};
@@ -757,14 +753,6 @@ namespace epics {
// noop
}
void acquire() {
// noop, since does not make sence on itself
}
void release() {
// noop, since does not make sence on itself
}
bool verify(epics::pvData::int32 timeoutMs) {
LOG(logLevelTrace,
@@ -806,6 +794,140 @@ namespace epics {
epics::pvData::Mutex _channelsMutex;
};
class BlockingClientTCPTransportCodec :
public BlockingTCPTransportCodec,
public TransportSender,
public epics::pvData::TimerCallback {
public:
POINTER_DEFINITIONS(BlockingClientTCPTransportCodec);
protected:
BlockingClientTCPTransportCodec(
Context::shared_pointer const & context,
SOCKET channel,
std::auto_ptr<ResponseHandler>& responseHandler,
int32_t sendBufferSize,
int32_t receiveBufferSize,
TransportClient::shared_pointer const & client,
epics::pvData::int8 remoteTransportRevision,
float beaconInterval,
int16_t priority );
public:
static shared_pointer create(
Context::shared_pointer const & context,
SOCKET channel,
std::auto_ptr<ResponseHandler>& responseHandler,
int32_t sendBufferSize,
int32_t receiveBufferSize,
TransportClient::shared_pointer const & client,
int8_t remoteTransportRevision,
float beaconInterval,
int16_t priority )
{
shared_pointer thisPointer(
new BlockingClientTCPTransportCodec(
context, channel, responseHandler,
sendBufferSize, receiveBufferSize,
client, remoteTransportRevision,
beaconInterval, priority)
);
thisPointer->activate();
return thisPointer;
}
public:
void start();
virtual ~BlockingClientTCPTransportCodec();
virtual void timerStopped() {
// noop
}
virtual void callback();
bool acquire(TransportClient::shared_pointer const & client);
void release(pvAccessID clientId);
void changedTransport();
void lock() {
// noop
}
void unlock() {
// noop
}
bool verify(epics::pvData::int32 timeoutMs);
void verified();
void aliveNotification();
void send(epics::pvData::ByteBuffer* buffer,
TransportSendControl* control);
protected:
virtual void internalClose(bool force);
virtual void internalPostClose(bool force);
private:
/**
* Owners (users) of the transport.
*/
// TODO consider using TR1 hash map
typedef std::map<pvAccessID, TransportClient::weak_pointer> TransportClientMap_t;
TransportClientMap_t _owners;
/**
* Connection timeout (no-traffic) flag.
*/
double _connectionTimeout;
/**
* Unresponsive transport flag.
*/
bool _unresponsiveTransport;
/**
* Timestamp of last "live" event on this transport.
*/
epicsTimeStamp _aliveTimestamp;
bool _verifyOrEcho;
/**
* Unresponsive transport notify.
*/
void unresponsiveTransport();
/**
* Notifies clients about disconnect.
*/
void closedNotifyClients();
/**
* Responsive transport notify.
*/
void responsiveTransport();
epics::pvData::Mutex _mutex;
bool _verified;
epics::pvData::Mutex _verifiedMutex;
epics::pvData::Event _verifiedEvent;
};
}
}