Makefile: included all the new sources

blockingClientTCPTransport.cpp: implementation

blockingTCP.h:
* class BlockingTCPTransport:
  - added Context to ctor
  - added 'virtual' declaration to overrides
  - 'priority' is now 'int16'
* added class 'BlockingClientTCPTransport'

blockingTCPConnector.cpp: implementation

blockingTCPTransport.cpp:
* removed 'transportRegistry' added 'context'

blockingUDP.h:
* added missing override 'getIntrospectionRegistry'
* 'BlockingUDPConnector::_priority' is now 'int16' instead of 'short'

blockingUDPConnector.cpp:
* 'connect' parameter priority is now 'int16' instead of 'short'
* fixed and added error logging

remote.h:
* added 'TransportRegistry' forward declaration
* added 'Transport::getIntrospectionRegistry' prototype
* changed 'Connector::connect' prototype parameter 'priority': 'short'->'int16'
* added 'Context' interface
* added 'ReferenceCountingTransport' interface

transportRegistry.h:
* added 'Transport' forward declaration

testRemoteClientImpl.cpp:
* added '#include <transportRegistry.h>' to fix compile error
* lots of auto-format changes

transportRegistryTest.cpp:
*
This commit is contained in:
miha_vitorovic
2011-01-04 11:58:00 +01:00
parent 9b99f6f389
commit bd1a4e2634
11 changed files with 775 additions and 158 deletions

View File

@@ -43,11 +43,15 @@ INC += blockingUDP.h
INC += beaconEmitter.h
INC += beaconServerStatusProvider.h
INC += beaconHandler.h
INC += blockingTCP.h
LIBSRCS += blockingUDPTransport.cpp
LIBSRCS += blockingUDPConnector.cpp
LIBSRCS += beaconEmitter.cpp
LIBSRCS += beaconServerStatusProvider.cpp
LIBSRCS += beaconHandler.cpp
LIBSRCS += blockingTCPTransport.cpp
LIBSRCS += blockingClientTCPTransport.cpp
LIBSRCS += blockingTCPConnector.cpp
LIBRARY = pvAccess
pvAccess_LIBS += Com

View File

@@ -0,0 +1,216 @@
/*
* BlockingClientTCPTransport.cpp
*
* Created on: Jan 3, 2011
* Author: Miha Vitorovic
*/
/* pvAccess */
#include "blockingTCP.h"
#include "introspectionRegistry.h"
/* pvData */
#include <lock.h>
/* EPICSv3 */
#include <errlog.h>
/* standard */
#include <set>
#include <epicsTime.h>
#include <sstream>
using std::set;
using namespace epics::pvData;
namespace epics {
namespace pvAccess {
BlockingClientTCPTransport::BlockingClientTCPTransport(
Context* context, SOCKET channel,
ResponseHandler* responseHandler, int receiveBufferSize,
TransportClient* client, short remoteTransportRevision,
float beaconInterval, int16 priority) :
BlockingTCPTransport(context, channel, responseHandler,
receiveBufferSize, priority), _introspectionRegistry(
new IntrospectionRegistry(false)), _owners(new set<
TransportClient*> ()), _connectionTimeout(beaconInterval
*1000), _unresponsiveTransport(false), _timerNode(
new TimerNode(this)), _mutex(new Mutex()), _ownersMutex(
new Mutex()), _verifyOrEcho(true) {
// initialize owners list, send queue
acquire(client);
// use immediate for clients
setSendQueueFlushStrategy(IMMEDIATE);
// setup connection timeout timer (watchdog)
epicsTimeGetCurrent(const_cast<epicsTimeStamp*> (&_aliveTimestamp));
context->getTimer()->schedulePeriodic(_timerNode, beaconInterval,
beaconInterval);
start();
}
BlockingClientTCPTransport::~BlockingClientTCPTransport() {
delete _introspectionRegistry;
delete _owners;
delete _timerNode;
delete _mutex;
delete _ownersMutex;
}
void BlockingClientTCPTransport::callback() {
epicsTimeStamp currentTime;
epicsTimeGetCurrent(&currentTime);
double diff = epicsTimeDiffInSeconds(&currentTime,
const_cast<epicsTimeStamp*> (&_aliveTimestamp));
if(diff>2*_connectionTimeout) {
unresponsiveTransport();
}
else if(diff>_connectionTimeout) {
// send echo
enqueueSendRequest(this);
}
}
void BlockingClientTCPTransport::unresponsiveTransport() {
if(!_unresponsiveTransport) {
_unresponsiveTransport = true;
Lock lock(_ownersMutex);
set<TransportClient*>::iterator it = _owners->begin();
for(; it!=_owners->end(); it++)
(*it)->transportUnresponsive();
}
}
bool BlockingClientTCPTransport::acquire(TransportClient* client) {
Lock lock(_mutex);
if(_closed) return false;
char ipAddrStr[48];
ipAddrToA(&_socketAddress->ia, ipAddrStr, sizeof(ipAddrStr));
errlogSevPrintf(errlogInfo, "Acquiring transport to %s.", ipAddrStr);
_ownersMutex->lock();
if(_closed) return false;
_owners->insert(client);
_ownersMutex->unlock();
return true;
}
void BlockingClientTCPTransport::internalClose(bool forced) {
BlockingTCPTransport::internalClose(forced);
_timerNode->cancel();
closedNotifyClients();
}
/**
* Notifies clients about disconnect.
*/
void BlockingClientTCPTransport::closedNotifyClients() {
Lock lock(_ownersMutex);
// check if still acquired
int refs = _owners->size();
if(refs>0) {
ostringstream temp;
char ipAddrStr[48];
ipAddrToA(&_socketAddress->ia, ipAddrStr, sizeof(ipAddrStr));
temp<<"Transport to "<<ipAddrStr<<" still has "<<refs;
temp<<" client(s) active and closing...";
errlogSevPrintf(errlogInfo, temp.str().c_str());
set<TransportClient*>::iterator it = _owners->begin();
for(; it!=_owners->end(); it++)
(*it)->transportClosed();
}
_owners->clear();
}
void BlockingClientTCPTransport::release(TransportClient* client) {
if(_closed) return;
char ipAddrStr[48];
ipAddrToA(&_socketAddress->ia, ipAddrStr, sizeof(ipAddrStr));
errlogSevPrintf(errlogInfo, "Releasing transport to %s.", ipAddrStr);
Lock lock(_ownersMutex);
_owners->erase(client);
// not used anymore
// TODO consider delayed destruction (can improve performance!!!)
if(_owners->size()==0) close(false);
}
void BlockingClientTCPTransport::aliveNotification() {
epicsTimeGetCurrent(const_cast<epicsTimeStamp*> (&_aliveTimestamp));
if(_unresponsiveTransport) responsiveTransport();
}
void BlockingClientTCPTransport::responsiveTransport() {
if(_unresponsiveTransport) {
_unresponsiveTransport = false;
Lock lock(_ownersMutex);
set<TransportClient*>::iterator it = _owners->begin();
for(; it!=_owners->end(); it++)
(*it)->transportResponsive(this);
}
}
void BlockingClientTCPTransport::changedTransport() {
_introspectionRegistry->reset();
Lock lock(_ownersMutex);
set<TransportClient*>::iterator it = _owners->begin();
for(; it!=_owners->end(); it++)
(*it)->transportChanged();
}
void BlockingClientTCPTransport::send(ByteBuffer* buffer,
TransportSendControl* control) {
if(_verifyOrEcho) {
/*
* send verification response message
*/
control->startMessage(1, 2*sizeof(int32)+sizeof(int16));
// receive buffer size
buffer->putInt(getReceiveBufferSize());
// socket receive buffer size
buffer->putInt(getSocketReceiveBufferSize());
// connection priority
buffer->putShort(getPriority());
// send immediately
control->flush(true);
_verifyOrEcho = false;
}
else {
control->startMessage(2, 0);
// send immediately
control->flush(true);
}
}
}
}

View File

@@ -13,16 +13,22 @@
#include "remote.h"
#include "growingCircularBuffer.h"
#include "transportRegistry.h"
#include "introspectionRegistry.h"
/* pvData */
#include <byteBuffer.h>
#include <pvType.h>
#include <lock.h>
#include <epicsThread.h>
#include <timer.h>
/* EPICSv3 */
#include <osdSock.h>
#include <osiSock.h>
#include <epicsTime.h>
/* standard */
#include <set>
namespace epics {
namespace pvAccess {
@@ -40,27 +46,27 @@ namespace epics {
class BlockingTCPTransport : public Transport,
public TransportSendControl {
public:
BlockingTCPTransport(SOCKET channel,
BlockingTCPTransport(Context* context, SOCKET channel,
ResponseHandler* responseHandler, int receiveBufferSize,
short priority, TransportRegistry* transportRegistry);
int16 priority);
~BlockingTCPTransport();
virtual ~BlockingTCPTransport();
bool isClosed() const {
virtual bool isClosed() const {
return _closed;
}
void setRemoteMinorRevision(int minorRevision) {
virtual void setRemoteMinorRevision(int8 minorRevision) {
_remoteTransportRevision = minorRevision;
}
void setRemoteTransportReceiveBufferSize(
virtual void setRemoteTransportReceiveBufferSize(
int remoteTransportReceiveBufferSize) {
_remoteTransportReceiveBufferSize
= remoteTransportReceiveBufferSize;
}
void setRemoteTransportSocketReceiveBufferSize(
virtual void setRemoteTransportSocketReceiveBufferSize(
int socketReceiveBufferSize) {
_remoteTransportSocketReceiveBufferSize
= socketReceiveBufferSize;
@@ -195,9 +201,10 @@ namespace epics {
/**
* Priority.
* NOTE: Priority cannot just be changed, since it is registered in transport registry with given priority.
* NOTE: Priority cannot just be changed, since it is registered
* in transport registry with given priority.
*/
short _priority;
int16 _priority;
// TODO to be implemeneted
/**
@@ -326,7 +333,7 @@ namespace epics {
MonitorSender* _monitorSender;
TransportRegistry* _transportRegistry;
Context* _context;
/**
* Internal method that clears and releases buffer.
@@ -352,6 +359,171 @@ namespace epics {
TransportSender* extractFromSendQueue();
};
class BlockingClientTCPTransport : public BlockingTCPTransport,
public TransportSender,
public epics::pvData::TimerCallback,
public ReferenceCountingTransport {
public:
BlockingClientTCPTransport(Context* context, SOCKET channel,
ResponseHandler* responseHandler, int receiveBufferSize,
TransportClient* client, short remoteTransportRevision,
float beaconInterval, int16 priority);
virtual ~BlockingClientTCPTransport();
virtual void timerStopped() {
// noop
}
virtual void callback();
/**
* Acquires transport.
* @param client client (channel) acquiring the transport
* @return <code>true</code> if transport was granted, <code>false</code> otherwise.
*/
virtual bool acquire(TransportClient* client);
virtual IntrospectionRegistry* getIntrospectionRegistry() {
return _introspectionRegistry;
}
/**
* Releases transport.
* @param client client (channel) releasing the transport
*/
virtual void release(TransportClient* client);
/**
* Alive notification.
* This method needs to be called (by newly received data or beacon)
* at least once in this period, if not echo will be issued
* and if there is not response to it, transport will be considered as unresponsive.
*/
virtual void aliveNotification();
/**
* Changed transport (server restared) notify.
*/
virtual void changedTransport();
virtual void lock() {
// noop
}
virtual void unlock() {
// noop
}
virtual void send(epics::pvData::ByteBuffer* buffer,
TransportSendControl* control);
protected:
/**
* Introspection registry.
*/
IntrospectionRegistry* _introspectionRegistry;
virtual void internalClose(bool force);
private:
/**
* Owners (users) of the transport.
*/
std::set<TransportClient*>* _owners;
/**
* Connection timeout (no-traffic) flag.
*/
double _connectionTimeout;
/**
* Unresponsive transport flag.
*/
volatile bool _unresponsiveTransport;
/**
* Timer task node.
*/
TimerNode* _timerNode;
/**
* Timestamp of last "live" event on this transport.
*/
volatile epicsTimeStamp _aliveTimestamp;
epics::pvData::Mutex* _mutex;
epics::pvData::Mutex* _ownersMutex;
bool _verifyOrEcho;
void unresponsiveTransport();
/**
* Notifies clients about disconnect.
*/
void closedNotifyClients();
/**
* Responsive transport notify.
*/
void responsiveTransport();
};
/**
* Channel Access TCP connector.
* @author <a href="mailto:matej.sekoranjaATcosylab.com">Matej Sekoranja</a>
* @version $Id: BlockingTCPConnector.java,v 1.1 2010/05/03 14:45:47 mrkraimer Exp $
*/
class BlockingTCPConnector : public Connector {
public:
BlockingTCPConnector(Context* context, int receiveBufferSize,
float beaconInterval);
virtual ~BlockingTCPConnector();
virtual Transport* connect(TransportClient* client,
ResponseHandler* responseHandler, osiSockAddr* address,
short transportRevision, int16 priority);
private:
/**
* Lock timeout
*/
static const int LOCK_TIMEOUT = 20*1000; // 20s
/**
* Context instance.
*/
Context* _context;
/**
* Context instance.
*/
//NamedLockPattern* _namedLocker;
/**
* Receive buffer size.
*/
int _receiveBufferSize;
/**
* Beacon interval.
*/
float _beaconInterval;
/**
* Tries to connect to the given address.
* @param[in] address
* @param[in] tries
* @return the SOCKET
* @throws IOException
*/
SOCKET tryConnect(osiSockAddr* address, int tries);
};
}
}

View File

@@ -0,0 +1,174 @@
/*
* blockingTCPConnector.cpp
*
* Created on: Jan 4, 2011
* Author: Miha Vitorovic
*/
#include "blockingTCP.h"
#include "remote.h"
#include <epicsThread.h>
#include <osiSock.h>
#include <errlog.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <sstream>
namespace epics {
namespace pvAccess {
BlockingTCPConnector::BlockingTCPConnector(Context* context,
int receiveBufferSize, float beaconInterval) :
_context(context), _receiveBufferSize(receiveBufferSize),
_beaconInterval(beaconInterval)
//TODO , _namedLocker(new NamedLockPattern())
{
}
BlockingTCPConnector::~BlockingTCPConnector() {
// TODO delete _namedLocker;
}
SOCKET BlockingTCPConnector::tryConnect(osiSockAddr* address, int tries) {
for(int tryCount = 0; tryCount<tries; tryCount++) {
// sleep for a while
if(tryCount>0) epicsThreadSleep(0.1);
char strBuffer[64];
ipAddrToA(&address->ia, strBuffer, sizeof(strBuffer));
errlogSevPrintf(errlogInfo,
"Opening socket to CA server %s, attempt %d.",
strBuffer, tryCount+1);
SOCKET socket = epicsSocketCreate(AF_INET, SOCK_STREAM,
IPPROTO_TCP);
if(socket==INVALID_SOCKET) {
epicsSocketConvertErrnoToString(strBuffer,
sizeof(strBuffer));
errlogSevPrintf(errlogMinor, "Socket create error: %s",
strBuffer);
}
else {
if(::connect(socket, &address->sa, sizeof(sockaddr))==0)
return socket;
else {
epicsSocketConvertErrnoToString(strBuffer,
sizeof(strBuffer));
errlogSevPrintf(errlogMinor,
"Socket connect error: %s", strBuffer);
}
}
}
return INVALID_SOCKET;
}
Transport* BlockingTCPConnector::connect(TransportClient* client,
ResponseHandler* responseHandler, osiSockAddr* address,
short transportRevision, int16 priority) {
SOCKET socket = INVALID_SOCKET;
char ipAddrStr[64];
ipAddrToA(&address->ia, ipAddrStr, sizeof(ipAddrStr));
// first try to check cache w/o named lock...
BlockingClientTCPTransport
* transport =
(BlockingClientTCPTransport*)(_context->getTransportRegistry()->get(
"TCP", address, priority));
if(transport!=NULL) {
errlogSevPrintf(errlogInfo,
"Reusing existing connection to CA server: %s",
ipAddrStr);
if(transport->acquire(client)) return transport;
}
bool lockAcquired = true;
// TODO comment out
//bool lockAcquired = _namedLocker->acquireSynchronizationObject(
// address, LOCK_TIMEOUT);
if(lockAcquired) {
try {
// ... transport created during waiting in lock
transport
= (BlockingClientTCPTransport*)(_context->getTransportRegistry()->get(
"TCP", address, priority));
if(transport!=NULL) {
errlogSevPrintf(errlogInfo,
"Reusing existing connection to CA server: %s",
ipAddrStr);
if(transport->acquire(client)) return transport;
}
errlogSevPrintf(errlogInfo, "Connecting to CA server: %s",
ipAddrStr);
socket = tryConnect(address, 3);
// use blocking channel
// socket is blocking bya default
//socket.configureBlocking(true);
// enable TCP_NODELAY (disable Nagle's algorithm)
int optval = 1; // true
int retval = ::setsockopt(socket, IPPROTO_TCP, TCP_NODELAY,
&optval, sizeof(optval));
if(retval<0) errlogSevPrintf(errlogMajor,
"Error setting TCP_NODELAY: %s", strerror(errno));
// enable TCP_KEEPALIVE
retval = ::setsockopt(socket, SOL_SOCKET, SO_KEEPALIVE,
&optval, sizeof(optval));
if(retval<0) errlogSevPrintf(errlogMinor,
"Error setting SO_KEEPALIVE: %s", strerror(errno));
// TODO tune buffer sizes?! Win32 defaults are 8k, which is OK
//socket.socket().setReceiveBufferSize();
//socket.socket().setSendBufferSize();
// create transport
transport = new BlockingClientTCPTransport(_context, socket,
responseHandler, _receiveBufferSize, client,
transportRevision, _beaconInterval, priority);
// verify
if(!transport->waitUntilVerified(3.0)) {
transport->close(true);
errlogSevPrintf(
errlogInfo,
"Connection to CA client %s failed to be validated, closing it.",
ipAddrStr);
ostringstream temp;
temp<<"Failed to verify TCP connection to '"<<ipAddrStr
<<"'.";
THROW_BASE_EXCEPTION(temp.str().c_str());
}
// TODO send security token
errlogSevPrintf(errlogInfo, "Connected to CA server: %s",
ipAddrStr);
return transport;
} catch(...) {
// close socket, if open
if(socket!=INVALID_SOCKET) epicsSocketDestroy(socket);
// TODO namedLocker.releaseSynchronizationObject(address);
throw;
}
}
else {
ostringstream temp;
temp<<"Failed to obtain synchronization lock for '"<<ipAddrStr;
temp<<"', possible deadlock.";
THROW_BASE_EXCEPTION(temp.str().c_str());
}
}
}
}

View File

@@ -62,9 +62,9 @@ namespace epics {
GrowingCircularBuffer<TransportSender*>* _monitorSendQueue;
};
BlockingTCPTransport::BlockingTCPTransport(SOCKET channel,
ResponseHandler* responseHandler, int receiveBufferSize,
short priority, TransportRegistry* transportRegistry) :
BlockingTCPTransport::BlockingTCPTransport(Context* context,
SOCKET channel, ResponseHandler* responseHandler,
int receiveBufferSize, int16 priority) :
_closed(false), _channel(channel), _remoteTransportRevision(0),
_remoteTransportReceiveBufferSize(MAX_TCP_RECV),
_remoteTransportSocketReceiveBufferSize(MAX_TCP_RECV),
@@ -86,8 +86,7 @@ namespace epics {
_rcvThreadId(NULL), _sendThreadId(NULL), _monitorSendQueue(
new GrowingCircularBuffer<TransportSender*> (100)),
_monitorSender(new MonitorSender(_monitorMutex,
_monitorSendQueue)), _transportRegistry(
transportRegistry) {
_monitorSendQueue)), _context(context) {
_socketBuffer = new ByteBuffer(max(MAX_TCP_RECV
+MAX_ENSURE_DATA_BUFFER_SIZE, receiveBufferSize));
@@ -124,7 +123,7 @@ namespace epics {
clearAndReleaseBuffer();
// add to registry
_transportRegistry->put(this);
_context->getTransportRegistry()->put(this);
}
BlockingTCPTransport::~BlockingTCPTransport() {
@@ -195,7 +194,7 @@ namespace epics {
_closed = true;
// remove from registry
_transportRegistry->remove(this);
_context->getTransportRegistry()->remove(this);
// clean resources
internalClose(force);

View File

@@ -17,6 +17,7 @@
#include <noDefaultMethods.h>
#include <byteBuffer.h>
#include <lock.h>
#include <epicsException.h>
/* EPICSv3 */
#include <osdSock.h>
@@ -160,6 +161,10 @@ namespace epics {
_sendAddresses = addresses;
}
virtual IntrospectionRegistry* getIntrospectionRegistry() {
THROW_BASE_EXCEPTION("not supported by UDP transport");
}
protected:
bool volatile _closed;
@@ -237,7 +242,7 @@ namespace epics {
};
class BlockingUDPConnector : public Connector,
epics::pvData::NoDefaultMethods {
public epics::pvData::NoDefaultMethods {
public:
BlockingUDPConnector(bool reuseSocket,
@@ -255,7 +260,7 @@ namespace epics {
*/
virtual Transport* connect(TransportClient* client,
ResponseHandler* responseHandler, osiSockAddr* bindAddress,
short transportRevision, short priority);
short transportRevision, int16 priority);
private:

View File

@@ -25,7 +25,7 @@ namespace epics {
Transport* BlockingUDPConnector::connect(TransportClient* client,
ResponseHandler* responseHandler, osiSockAddr* bindAddress,
short transportRevision, short priority) {
short transportRevision, int16 priority) {
errlogSevPrintf(errlogInfo, "Creating datagram socket to: %s",
inetAddressToString(bindAddress).c_str());
@@ -59,11 +59,13 @@ namespace epics {
retval = ::setsockopt(socket, SOL_SOCKET, SO_REUSEADDR, &optval,
sizeof(optval));
if(retval<0) errlogSevPrintf(errlogMajor,
"Error binding socket: %s", strerror(errno));
"Error setting SO_REUSEADDR: %s", strerror(errno));
optval = _broadcast ? 1 : 0;
retval = ::setsockopt(socket, SOL_SOCKET, SO_BROADCAST, &optval,
sizeof(optval));
if(retval<0) errlogSevPrintf(errlogMajor,
"Error setting SO_BROADCAST: %s", strerror(errno));
// sockets are blocking by default

View File

@@ -9,10 +9,13 @@
#define REMOTE_H_
#include "caConstants.h"
#include "transportRegistry.h"
#include "introspectionRegistry.h"
#include <serialize.h>
#include <pvType.h>
#include <byteBuffer.h>
#include <timer.h>
#include <osiSock.h>
#include <osdSock.h>
@@ -20,6 +23,8 @@
namespace epics {
namespace pvAccess {
class TransportRegistry;
enum ProtocolType {
TCP, UDP, SSL
};
@@ -54,7 +59,8 @@ namespace epics {
* NOTE: these limitations allows efficient implementation.
*/
virtual void
send(epics::pvData::ByteBuffer* buffer, TransportSendControl* control) =0;
send(epics::pvData::ByteBuffer* buffer,
TransportSendControl* control) =0;
virtual void lock() =0;
virtual void unlock() =0;
@@ -156,7 +162,7 @@ namespace epics {
* Get introspection registry for transport.
* @return <code>IntrospectionRegistry</code> instance.
*/
//virtual IntrospectionRegistry getIntrospectionRegistry() =0;
virtual IntrospectionRegistry* getIntrospectionRegistry() =0;
/**
* Close transport.
@@ -206,9 +212,10 @@ namespace epics {
* Note that this might not be the only message in the buffer.
* Code must not manipulate buffer.
*/
virtual void handleResponse(osiSockAddr* responseFrom,
Transport* transport, int8 version, int8 command,
int payloadSize, epics::pvData::ByteBuffer* payloadBuffer) =0;
virtual void
handleResponse(osiSockAddr* responseFrom, Transport* transport,
int8 version, int8 command, int payloadSize,
epics::pvData::ByteBuffer* payloadBuffer) =0;
};
/**
@@ -261,10 +268,46 @@ namespace epics {
*/
virtual Transport* connect(TransportClient* client,
ResponseHandler* responseHandler, osiSockAddr* address,
short transportRevision, short priority) =0;
short transportRevision, int16 priority) =0;
};
class Context {
public:
/**
* Get timer.
* @return timer.
*/
virtual Timer* getTimer() =0;
/**
* Get transport (virtual circuit) registry.
* @return transport (virtual circuit) registry.
*/
virtual TransportRegistry* getTransportRegistry() =0;
};
/**
* Interface defining reference counting transport IF.
* @author <a href="mailto:matej.sekoranjaATcosylab.com">Matej Sekoranja</a>
* @version $Id: ReferenceCountingTransport.java,v 1.1 2010/05/03 14:45:39 mrkraimer Exp $
*/
class ReferenceCountingTransport {
public:
/**
* Acquires transport.
* @param client client (channel) acquiring the transport
* @return <code>true</code> if transport was granted, <code>false</code> otherwise.
*/
virtual bool acquire(TransportClient* client) =0;
/**
* Releases transport.
* @param client client (channel) releasing the transport
*/
virtual void release(TransportClient* client) =0;
};
}
}

View File

@@ -24,6 +24,7 @@ using namespace std;
namespace epics { namespace pvAccess {
class Transport;
typedef std::map<const int16,Transport*> prioritiesMap_t;
typedef std::map<const int32,prioritiesMap_t*> transportsMap_t;

View File

@@ -1,7 +1,7 @@
/* testRemoteClientImpl.cpp */
/* Author: Matej Sekoranja Date: 2011.1.1 */
#include <transportRegistry.h>
#include <pvAccess.h>
#include <iostream>
#include <showConstructDestruct.h>
@@ -25,7 +25,7 @@ class ChannelImplProcess : public ChannelProcess
ChannelProcessRequester* m_channelProcessRequester;
PVStructure* m_pvStructure;
PVScalar* m_valueField;
private:
~ChannelImplProcess()
{
@@ -44,29 +44,29 @@ class ChannelImplProcess : public ChannelProcess
Status* noValueFieldStatus = getStatusCreate()->createStatus(STATUSTYPE_ERROR, "no 'value' field");
m_channelProcessRequester->channelProcessConnect(noValueFieldStatus, this);
delete noValueFieldStatus;
// NOTE client must destroy this instance...
// do not access any fields and return ASAP
return;
}
if (field->getField()->getType() != scalar)
{
Status* notAScalarStatus = getStatusCreate()->createStatus(STATUSTYPE_ERROR, "'value' field not scalar type");
m_channelProcessRequester->channelProcessConnect(notAScalarStatus, this);
delete notAScalarStatus;
// NOTE client must destroy this instance….
// do not access any fields and return ASAP
return;
}
m_valueField = static_cast<PVScalar*>(field);
// TODO pvRequest
// TODO pvRequest
m_channelProcessRequester->channelProcessConnect(getStatusCreate()->getStatusOK(), this);
}
virtual void process(bool lastRequest)
{
switch (m_valueField->getScalar()->getScalarType())
@@ -138,19 +138,19 @@ class ChannelImplProcess : public ChannelProcess
default:
// noop
break;
}
}
m_channelProcessRequester->processDone(getStatusCreate()->getStatusOK());
if (lastRequest)
destroy();
}
virtual void destroy()
{
delete this;
}
};
@@ -167,7 +167,7 @@ class ChannelImplGet : public ChannelGet
PVStructure* m_pvStructure;
BitSet* m_bitSet;
volatile bool m_first;
private:
~ChannelImplGet()
{
@@ -181,10 +181,10 @@ class ChannelImplGet : public ChannelGet
{
PVDATA_REFCOUNT_MONITOR_CONSTRUCT(mockChannelGet);
// TODO pvRequest
// TODO pvRequest
m_channelGetRequester->channelGetConnect(getStatusCreate()->getStatusOK(), this, m_pvStructure, m_bitSet);
}
virtual void get(bool lastRequest)
{
m_channelGetRequester->getDone(getStatusCreate()->getStatusOK());
@@ -193,17 +193,17 @@ class ChannelImplGet : public ChannelGet
m_first = false;
m_bitSet->set(0); // TODO
}
if (lastRequest)
destroy();
}
virtual void destroy()
{
delete m_bitSet;
delete this;
}
};
@@ -222,7 +222,7 @@ class ChannelImplPut : public ChannelPut
PVStructure* m_pvStructure;
BitSet* m_bitSet;
volatile bool m_first;
private:
~ChannelImplPut()
{
@@ -236,17 +236,17 @@ class ChannelImplPut : public ChannelPut
{
PVDATA_REFCOUNT_MONITOR_CONSTRUCT(mockChannelPut);
// TODO pvRequest
// TODO pvRequest
m_channelPutRequester->channelPutConnect(getStatusCreate()->getStatusOK(), this, m_pvStructure, m_bitSet);
}
virtual void put(bool lastRequest)
{
m_channelPutRequester->putDone(getStatusCreate()->getStatusOK());
if (lastRequest)
destroy();
}
virtual void get()
{
m_channelPutRequester->getDone(getStatusCreate()->getStatusOK());
@@ -257,7 +257,7 @@ class ChannelImplPut : public ChannelPut
delete m_bitSet;
delete this;
}
};
@@ -278,7 +278,7 @@ class MockMonitor : public Monitor, public MonitorElement
volatile bool m_first;
Mutex* m_lock;
volatile int m_count;
private:
~MockMonitor()
{
@@ -297,16 +297,16 @@ class MockMonitor : public Monitor, public MonitorElement
PVDATA_REFCOUNT_MONITOR_CONSTRUCT(mockMonitor);
m_changedBitSet->set(0);
// TODO pvRequest
// TODO pvRequest
m_monitorRequester->monitorConnect(getStatusCreate()->getStatusOK(), this, const_cast<Structure*>(m_pvStructure->getStructure()));
}
virtual Status* start()
{
// fist monitor
m_monitorRequester->monitorEvent(this);
// client needs to delete status, so passing shared OK instance is not right thing to do
return getStatusCreate()->createStatus(STATUSTYPE_OK, "Monitor started.");
}
@@ -337,24 +337,24 @@ class MockMonitor : public Monitor, public MonitorElement
if (m_count)
m_count--;
}
virtual void destroy()
{
delete stop();
delete m_lock;
delete m_overrunBitSet;
delete m_changedBitSet;
delete this;
}
// ============ MonitorElement ============
virtual PVStructure* getPVStructure()
{
return m_pvStructure;
}
virtual BitSet* getChangedBitSet()
{
return m_changedBitSet;
@@ -364,8 +364,8 @@ class MockMonitor : public Monitor, public MonitorElement
{
return m_overrunBitSet;
}
};
@@ -381,17 +381,17 @@ class ChannelImpl : public Channel {
ChannelRequester* m_requester;
String m_name;
String m_remoteAddress;
PVStructure* m_pvStructure;
private:
~ChannelImpl()
{
PVDATA_REFCOUNT_MONITOR_DESTRUCT(mockChannel);
}
public:
ChannelImpl(
ChannelProvider* provider,
ChannelRequester* requester,
@@ -403,8 +403,8 @@ class ChannelImpl : public Channel {
m_remoteAddress(remoteAddress)
{
PVDATA_REFCOUNT_MONITOR_CONSTRUCT(mockChannel);
ScalarType stype = pvDouble;
String allProperties("alarm,timeStamp,display,control,valueAlarm");
@@ -413,11 +413,11 @@ class ChannelImpl : public Channel {
PVDouble *pvField = m_pvStructure->getDoubleField(String("value"));
pvField->put(1.123);
// already connected, report state
m_requester->channelStateChange(this, CONNECTED);
}
virtual void destroy()
{
delete m_pvStructure;
@@ -428,13 +428,13 @@ class ChannelImpl : public Channel {
{
return getChannelName();
};
virtual void message(String message,MessageType messageType)
virtual void message(String message,MessageType messageType)
{
std::cout << "[" << getRequesterName() << "] message(" << message << ", " << messageTypeName[messageType] << ")" << std::endl;
std::cout << "[" << getRequesterName() << "] message(" << message << ", " << messageTypeName[messageType] << ")" << std::endl;
}
virtual ChannelProvider* getProvider()
virtual ChannelProvider* getProvider()
{
return m_provider;
}
@@ -502,7 +502,7 @@ class ChannelImpl : public Channel {
// TODO
return 0;
}
virtual ChannelRPC* createChannelRPC(ChannelRPCRequester *channelRPCRequester,
epics::pvData::PVStructure *pvRequest)
{
@@ -539,24 +539,24 @@ class ChannelImplFind : public ChannelFind
{
// one instance for all, do not delete at all
}
virtual ChannelProvider* getChannelProvider()
{
return m_provider;
};
virtual void cancelChannelFind()
{
throw std::runtime_error("not supported");
}
private:
// only to be destroyed by it
friend class ChannelProviderImpl;
virtual ~ChannelImplFind() {}
ChannelProvider* m_provider;
ChannelProvider* m_provider;
};
class ChannelProviderImpl : public ChannelProvider {
@@ -569,13 +569,13 @@ class ChannelProviderImpl : public ChannelProvider {
{
return "ChannelProviderImpl";
}
virtual void destroy()
{
delete m_mockChannelFind;
delete this;
}
virtual ChannelFind* channelFind(
epics::pvData::String channelName,
ChannelFindRequester *channelFindRequester)
@@ -605,23 +605,22 @@ class ChannelProviderImpl : public ChannelProvider {
return channel;
}
else
{
{
Status* errorStatus = getStatusCreate()->createStatus(STATUSTYPE_ERROR, "only local supported", 0);
channelRequester->channelCreated(errorStatus, 0);
delete errorStatus; // TODO guard from CB
return 0;
}
}
private:
~ChannelProviderImpl() {};
ChannelImplFind* m_mockChannelFind;
};
class TransportRegistry;
class ChannelSearchManager;
class BlockingTCPConnector;
class NamedLockPattern;
@@ -631,8 +630,8 @@ class BeaconHandlerImpl;
class ClientContextImpl : public ClientContext
{
public:
ClientContextImpl() :
ClientContextImpl() :
m_addressList(""), m_autoAddressList(true), m_connectionTimeout(30.0f), m_beaconPeriod(15.0f),
m_broadcastPort(CA_BROADCAST_PORT), m_receiveBufferSize(MAX_TCP_RECV), m_timer(0),
m_broadcastTransport(0), m_searchTransport(0), m_connector(0), m_transportRegistry(0),
@@ -641,7 +640,7 @@ class ClientContextImpl : public ClientContext
{
initialize();
}
virtual Version* getVersion() {
return m_version;
}
@@ -649,33 +648,33 @@ class ClientContextImpl : public ClientContext
virtual ChannelProvider* getProvider() {
return m_provider;
}
virtual void initialize() {
m_provider = new ChannelProviderImpl();
}
virtual void printInfo() {
String info;
printInfo(&info);
std::cout << info.c_str() << std::endl;
}
virtual void printInfo(epics::pvData::StringBuilder out) {
out->append(m_version->getVersionString());
}
virtual void destroy()
{
m_provider->destroy();
delete m_version;
delete this;
}
virtual void dispose()
{
destroy();
}
}
private:
~ClientContextImpl() {};
@@ -684,9 +683,9 @@ class ClientContextImpl : public ClientContext
* Each address must be of the form: ip.number:port or host.name:port
*/
String m_addressList;
/**
* Define whether or not the network interfaces should be discovered at runtime.
* Define whether or not the network interfaces should be discovered at runtime.
*/
bool m_autoAddressList;
@@ -697,22 +696,22 @@ class ClientContextImpl : public ClientContext
* the server is no longer present on the network and disconnect.
*/
float m_connectionTimeout;
/**
* Period in second between two beacon signals.
*/
float m_beaconPeriod;
/**
* Broadcast (beacon, search) port number to listen to.
*/
int m_broadcastPort;
/**
* Receive buffer size (max size of payload).
*/
int m_receiveBufferSize;
/**
* Timer.
*/
@@ -722,7 +721,7 @@ class ClientContextImpl : public ClientContext
* Broadcast transport needed to listen for broadcasts.
*/
BlockingUDPTransport* m_broadcastTransport;
/**
* UDP transport needed for channel searches.
*/
@@ -735,7 +734,7 @@ class ClientContextImpl : public ClientContext
/**
* CA transport (virtual circuit) registry.
* This registry contains all active transports - connections to CA servers.
* This registry contains all active transports - connections to CA servers.
*/
TransportRegistry* m_transportRegistry;
@@ -758,7 +757,7 @@ class ClientContextImpl : public ClientContext
typedef int pvAccessID;
/**
* Last CID cache.
* Last CID cache.
*/
pvAccessID m_lastCID;
@@ -770,7 +769,7 @@ typedef int pvAccessID;
IntResponseRequestMap m_pendingResponseRequests;
/**
* Last IOID cache.
* Last IOID cache.
*/
pvAccessID m_lastIOID;
@@ -786,7 +785,7 @@ typedef int pvAccessID;
// TODO consider std::unordered_map
typedef std::map<osiSockAddr, BeaconHandlerImpl*> AddressBeaconHandlerMap;
AddressBeaconHandlerMap m_beaconHandlers;
/**
* Version.
*/
@@ -804,7 +803,7 @@ class ChannelFindRequesterImpl : public ChannelFindRequester
virtual void channelFindResult(epics::pvData::Status *status,ChannelFind *channelFind,bool wasFound)
{
std::cout << "[ChannelFindRequesterImpl] channelFindResult("
<< status->toString() << ", ..., " << wasFound << ")" << std::endl;
<< status->toString() << ", ..., " << wasFound << ")" << std::endl;
}
};
@@ -814,10 +813,10 @@ class ChannelRequesterImpl : public ChannelRequester
{
return "ChannelRequesterImpl";
};
virtual void message(String message,MessageType messageType)
virtual void message(String message,MessageType messageType)
{
std::cout << "[" << getRequesterName() << "] message(" << message << ", " << messageTypeName[messageType] << ")" << std::endl;
std::cout << "[" << getRequesterName() << "] message(" << message << ", " << messageTypeName[messageType] << ")" << std::endl;
}
virtual void channelCreated(epics::pvData::Status* status, Channel *channel)
@@ -825,7 +824,7 @@ class ChannelRequesterImpl : public ChannelRequester
std::cout << "channelCreated(" << status->toString() << ", "
<< (channel ? channel->getChannelName() : "(null)") << ")" << std::endl;
}
virtual void channelStateChange(Channel *c, ConnectionState connectionState)
{
std::cout << "channelStateChange(" << c->getChannelName() << ", " << ConnectionStateNames[connectionState] << ")" << std::endl;
@@ -838,10 +837,10 @@ class GetFieldRequesterImpl : public GetFieldRequester
{
return "GetFieldRequesterImpl";
};
virtual void message(String message,MessageType messageType)
virtual void message(String message,MessageType messageType)
{
std::cout << "[" << getRequesterName() << "] message(" << message << ", " << messageTypeName[messageType] << ")" << std::endl;
std::cout << "[" << getRequesterName() << "] message(" << message << ", " << messageTypeName[messageType] << ")" << std::endl;
}
virtual void getDone(epics::pvData::Status *status,epics::pvData::FieldConstPtr field)
@@ -864,22 +863,22 @@ class ChannelGetRequesterImpl : public ChannelGetRequester
ChannelGet *m_channelGet;
epics::pvData::PVStructure *m_pvStructure;
epics::pvData::BitSet *m_bitSet;
virtual String getRequesterName()
{
return "ChannelGetRequesterImpl";
};
virtual void message(String message,MessageType messageType)
virtual void message(String message,MessageType messageType)
{
std::cout << "[" << getRequesterName() << "] message(" << message << ", " << messageTypeName[messageType] << ")" << std::endl;
std::cout << "[" << getRequesterName() << "] message(" << message << ", " << messageTypeName[messageType] << ")" << std::endl;
}
virtual void channelGetConnect(epics::pvData::Status *status,ChannelGet *channelGet,
epics::pvData::PVStructure *pvStructure,epics::pvData::BitSet *bitSet)
{
std::cout << "channelGetConnect(" << status->toString() << ")" << std::endl;
// TODO sync
m_channelGet = channelGet;
m_pvStructure = pvStructure;
@@ -901,22 +900,22 @@ class ChannelPutRequesterImpl : public ChannelPutRequester
ChannelPut *m_channelPut;
epics::pvData::PVStructure *m_pvStructure;
epics::pvData::BitSet *m_bitSet;
virtual String getRequesterName()
{
return "ChannelPutRequesterImpl";
};
virtual void message(String message,MessageType messageType)
virtual void message(String message,MessageType messageType)
{
std::cout << "[" << getRequesterName() << "] message(" << message << ", " << messageTypeName[messageType] << ")" << std::endl;
std::cout << "[" << getRequesterName() << "] message(" << message << ", " << messageTypeName[messageType] << ")" << std::endl;
}
virtual void channelPutConnect(epics::pvData::Status *status,ChannelPut *channelPut,
epics::pvData::PVStructure *pvStructure,epics::pvData::BitSet *bitSet)
{
std::cout << "channelPutConnect(" << status->toString() << ")" << std::endl;
// TODO sync
m_channelPut = channelPut;
m_pvStructure = pvStructure;
@@ -942,20 +941,20 @@ class ChannelPutRequesterImpl : public ChannelPutRequester
}
};
class MonitorRequesterImpl : public MonitorRequester
{
virtual String getRequesterName()
{
return "MonitorRequesterImpl";
};
virtual void message(String message,MessageType messageType)
virtual void message(String message,MessageType messageType)
{
std::cout << "[" << getRequesterName() << "] message(" << message << ", " << messageTypeName[messageType] << ")" << std::endl;
std::cout << "[" << getRequesterName() << "] message(" << message << ", " << messageTypeName[messageType] << ")" << std::endl;
}
virtual void monitorConnect(Status* status, Monitor* monitor, Structure* structure)
{
std::cout << "monitorConnect(" << status->toString() << ")" << std::endl;
@@ -966,13 +965,13 @@ class MonitorRequesterImpl : public MonitorRequester
std::cout << str << std::endl;
}
}
virtual void monitorEvent(Monitor* monitor)
{
std::cout << "monitorEvent" << std::endl;
MonitorElement* element = monitor->poll();
String str("changed/overrun ");
element->getChangedBitSet()->toString(&str);
str += '/';
@@ -980,35 +979,35 @@ class MonitorRequesterImpl : public MonitorRequester
str += '\n';
element->getPVStructure()->toString(&str);
std::cout << str << std::endl;
monitor->release(element);
}
virtual void unlisten(Monitor* monitor)
{
std::cout << "unlisten" << std::endl;
}
};
};
class ChannelProcessRequesterImpl : public ChannelProcessRequester
{
ChannelProcess *m_channelProcess;
virtual String getRequesterName()
{
return "ProcessRequesterImpl";
};
virtual void message(String message,MessageType messageType)
virtual void message(String message,MessageType messageType)
{
std::cout << "[" << getRequesterName() << "] message(" << message << ", " << messageTypeName[messageType] << ")" << std::endl;
std::cout << "[" << getRequesterName() << "] message(" << message << ", " << messageTypeName[messageType] << ")" << std::endl;
}
virtual void channelProcessConnect(epics::pvData::Status *status,ChannelProcess *channelProcess)
{
std::cout << "channelProcessConnect(" << status->toString() << ")" << std::endl;
// TODO sync
m_channelProcess = channelProcess;
}
@@ -1024,13 +1023,13 @@ int main(int argc,char *argv[])
{
ClientContextImpl* context = new ClientContextImpl();
context->printInfo();
ChannelFindRequesterImpl findRequester;
context->getProvider()->channelFind("something", &findRequester);
ChannelRequesterImpl channelRequester;
//Channel* noChannel
//Channel* noChannel
context->getProvider()->createChannel("test", &channelRequester, ChannelProvider::PRIORITY_DEFAULT, "over the rainbow");
Channel* channel = context->getProvider()->createChannel("test", &channelRequester);
@@ -1038,19 +1037,19 @@ int main(int argc,char *argv[])
/*
GetFieldRequesterImpl getFieldRequesterImpl;
channel->getField(&getFieldRequesterImpl, "timeStamp.secondsPastEpoch");
ChannelGetRequesterImpl channelGetRequesterImpl;
ChannelGet* channelGet = channel->createChannelGet(&channelGetRequesterImpl, 0);
channelGet->get(false);
channelGet->destroy();
ChannelPutRequesterImpl channelPutRequesterImpl;
ChannelPut* channelPut = channel->createChannelPut(&channelPutRequesterImpl, 0);
channelPut->get();
channelPut->put(false);
channelPut->destroy();
MonitorRequesterImpl monitorRequesterImpl;
Monitor* monitor = channel->createMonitor(&monitorRequesterImpl, 0);
@@ -1063,19 +1062,19 @@ int main(int argc,char *argv[])
ChannelProcess* channelProcess = channel->createChannelProcess(&channelProcessRequester, 0);
channelProcess->process(false);
channelProcess->destroy();
status = monitor->stop();
std::cout << "monitor->stop() = " << status->toString() << std::endl;
delete status;
monitor->destroy();
*/
channel->destroy();
context->destroy();
std::cout << "-----------------------------------------------------------------------" << std::endl;
getShowConstructDestruct()->constuctDestructTotals(stdout);
return(0);

View File

@@ -4,6 +4,7 @@
*/
#include "transportRegistry.h"
#include "introspectionRegistry.h"
#include "showConstructDestruct.h"
#include <epicsAssert.h>
@@ -39,6 +40,7 @@ namespace epics {
virtual void verified(){};
virtual void enqueueSendRequest(TransportSender* sender){};
virtual void ensureData(int) {};
virtual IntrospectionRegistry* getIntrospectionRegistry() {return NULL;};
private:
string _type;
int16 _priority;