flow: Merged <feature> 'codec' to <develop> ('default').

This commit is contained in:
Matej Sekoranja
2014-03-26 14:07:51 +01:00
18 changed files with 5700 additions and 2472 deletions

View File

@@ -45,18 +45,17 @@ INC += channelSearchManager.h
INC += simpleChannelSearchManagerImpl.h
INC += transportRegistry.h
INC += serializationHelper.h
INC += codec.h
LIBSRCS += blockingUDPTransport.cpp
LIBSRCS += blockingUDPConnector.cpp
LIBSRCS += beaconHandler.cpp
LIBSRCS += blockingTCPTransport.cpp
LIBSRCS += blockingClientTCPTransport.cpp
LIBSRCS += blockingTCPConnector.cpp
LIBSRCS += blockingServerTCPTransport.cpp
LIBSRCS += simpleChannelSearchManagerImpl.cpp
LIBSRCS += abstractResponseHandler.cpp
LIBSRCS += blockingTCPAcceptor.cpp
LIBSRCS += transportRegistry.cpp
LIBSRCS += serializationHelper.cpp
LIBSRCS += codec.cpp
SRC_DIRS += $(PVACCESS)/remoteClient
INC += clientContextImpl.h

View File

@@ -1,240 +0,0 @@
/**
* Copyright - See the COPYRIGHT that is included with this distribution.
* pvAccessCPP is distributed subject to a Software License Agreement found
* in file LICENSE that is included with this distribution.
*/
#include <pv/blockingTCP.h>
#include <pv/introspectionRegistry.h>
#include <pv/logger.h>
#include <pv/lock.h>
#include <set>
#include <epicsTime.h>
#include <sstream>
using namespace std;
using namespace epics::pvData;
namespace epics {
namespace pvAccess {
// Runs `code`, logging -- but swallowing -- any exception, so that callbacks
// into client code can never kill the transport/timer threads.
#define EXCEPTION_GUARD(code) try { code; } \
    catch (std::exception &e) { LOG(logLevelError, "Unhandled exception caught from code at %s:%d: %s", __FILE__, __LINE__, e.what()); } \
    catch (...) { LOG(logLevelError, "Unhandled exception caught from code at %s:%d.", __FILE__, __LINE__); }
/**
 * Client-side TCP transport constructor.
 *
 * @param context           context the transport lives in
 * @param channel           already-connected socket
 * @param responseHandler   handler for incoming messages (ownership taken)
 * @param receiveBufferSize requested receive buffer size
 * @param client            first owner (channel) acquiring this transport
 * @param beaconInterval    server beacon period, used to derive the watchdog timeout
 * @param priority          transport priority (also its registry key)
 */
BlockingClientTCPTransport::BlockingClientTCPTransport(
        Context::shared_pointer const & context, SOCKET channel,
        auto_ptr<ResponseHandler>& responseHandler, int receiveBufferSize,
        TransportClient::shared_pointer client, int8 /*remoteTransportRevision*/,
        float beaconInterval, int16 priority) :
    // NOTE(review): _connectionTimeout is beaconInterval*1000 while callback()
    // compares it against a value in seconds -- confirm the intended unit.
    BlockingTCPTransport(context, channel, responseHandler, receiveBufferSize, priority),
    _connectionTimeout(beaconInterval*1000),
    _unresponsiveTransport(false),
    _verifyOrEcho(true)   // first send() emits the verification response, later ones echo
{
    // _autoDelete = false;
    // initialize owners list, send queue
    acquire(client);
    // NOTE(review): an older comment said "use immediate for clients", yet
    // DELAYED is set -- confirm which flush strategy is intended.
    setFlushStrategy(DELAYED);
    // setup connection timeout timer (watchdog)
    epicsTimeGetCurrent(&_aliveTimestamp);
}
/**
 * Starts the transport: registers this object as a periodic timer callback
 * (the connection watchdog) and then starts the base-class
 * receive/send threads.
 */
void BlockingClientTCPTransport::start()
{
    TimerCallbackPtr tcb = std::tr1::dynamic_pointer_cast<TimerCallback>(shared_from_this());
    // watchdog fires every _connectionTimeout (see callback())
    _context->getTimer()->schedulePeriodic(tcb, _connectionTimeout, _connectionTimeout);
    BlockingTCPTransport::start();
}
// Destructor: nothing to do here -- timer cancellation and owner
// notification happen in internalClose()/internalPostClose().
BlockingClientTCPTransport::~BlockingClientTCPTransport() {
}
/**
 * Periodic watchdog callback (runs on the context timer thread).
 * If no alive notification was seen for more than 2 * _connectionTimeout the
 * transport is flagged unresponsive; once 3/4 of the timeout has elapsed an
 * echo request is enqueued to probe the connection.
 */
void BlockingClientTCPTransport::callback() {
    epicsTimeStamp currentTime;
    epicsTimeGetCurrent(&currentTime);
    _mutex.lock();
    // no exception expected here
    double diff = epicsTimeDiffInSeconds(&currentTime, &_aliveTimestamp);
    _mutex.unlock();
    // NOTE(review): diff is in seconds but _connectionTimeout was set to
    // beaconInterval*1000 in the constructor -- looks like a seconds-vs-
    // milliseconds mismatch; confirm intended units.
    if(diff>2*_connectionTimeout) {
        unresponsiveTransport();
    }
    // use some k (3/4) to handle "jitter"
    else if(diff>=((3*_connectionTimeout)/4)) {
        // send echo: this transport is itself a TransportSender; its send()
        // emits CMD_ECHO once verification has been done
        TransportSender::shared_pointer transportSender = std::tr1::dynamic_pointer_cast<TransportSender>(shared_from_this());
        enqueueSendRequest(transportSender);
    }
}
/**
 * Marks the transport unresponsive (idempotent) and notifies every live
 * owner client via transportUnresponsive(); expired weak references are
 * skipped and callback exceptions are swallowed by EXCEPTION_GUARD.
 */
void BlockingClientTCPTransport::unresponsiveTransport() {
    Lock lock(_mutex);
    if(!_unresponsiveTransport) {
        _unresponsiveTransport = true;
        TransportClientMap_t::iterator it = _owners.begin();
        for(; it!=_owners.end(); it++) {
            TransportClient::shared_pointer client = it->second.lock();
            if (client)
            {
                EXCEPTION_GUARD(client->transportUnresponsive());
            }
        }
    }
}
/**
 * Registers a client (channel) as an owner of this transport.
 * @param client acquiring client
 * @return false when the transport is already closed, true otherwise
 */
bool BlockingClientTCPTransport::acquire(TransportClient::shared_pointer const & client) {
    Lock lock(_mutex);
    if(_closed.get()) return false;
    char ipAddrStr[48];
    ipAddrToDottedIP(&_socketAddress.ia, ipAddrStr, sizeof(ipAddrStr));
    LOG(logLevelDebug, "Acquiring transport to %s.", ipAddrStr);
    // keyed by client ID; stored as weak_ptr so owners do not keep the
    // transport's clients alive
    _owners[client->getID()] = TransportClient::weak_pointer(client);
    //_owners.insert(TransportClient::weak_pointer(client));
    return true;
}
// _mutex is held when this method is called
/**
 * Closes the base transport and cancels the watchdog timer registration.
 */
void BlockingClientTCPTransport::internalClose(bool forced) {
    BlockingTCPTransport::internalClose(forced);
    TimerCallbackPtr tcb = std::tr1::dynamic_pointer_cast<TimerCallback>(shared_from_this());
    _context->getTimer()->cancel(tcb);
}
/**
 * Post-close hook, called without any transport locks held: notifies the
 * owner clients that the transport is gone.
 */
void BlockingClientTCPTransport::internalPostClose(bool forced) {
    BlockingTCPTransport::internalPostClose(forced);
    // _owners cannot change when transport is closed
    closedNotifyClients();
}
/**
 * Notifies clients about disconnect.
 *
 * Called from internalPostClose() after the transport is closed, so
 * _owners can no longer change and no lock is required.  Every still-live
 * owner receives transportClosed() (exceptions swallowed by
 * EXCEPTION_GUARD); expired weak references are skipped.  The owner map is
 * cleared afterwards.
 */
void BlockingClientTCPTransport::closedNotifyClients() {
    // check if still acquired
    size_t refs = _owners.size();
    if(refs>0) {
        char ipAddrStr[48];
        ipAddrToDottedIP(&_socketAddress.ia, ipAddrStr, sizeof(ipAddrStr));
        LOG(
            logLevelDebug,
            "Transport to %s still has %u client(s) active and closing...",
            // FIX: was "%d" with a size_t argument -- undefined behavior on
            // LP64 platforms; cast to unsigned int + %u, matching
            // BlockingServerTCPTransport::destroyAllChannels().
            ipAddrStr, (unsigned int)refs);
        TransportClientMap_t::iterator it = _owners.begin();
        for(; it!=_owners.end(); it++) {
            TransportClient::shared_pointer client = it->second.lock();
            if (client)
            {
                EXCEPTION_GUARD(client->transportClosed());
            }
        }
    }
    _owners.clear();
}
//void BlockingClientTCPTransport::release(TransportClient::shared_pointer const & client) {
/**
 * Releases the transport on behalf of the given client (channel).
 * Closes the transport when the last owner is gone.
 * @param clientID id of the releasing client
 */
void BlockingClientTCPTransport::release(pvAccessID clientID) {
    Lock lock(_mutex);
    if(_closed.get()) return;
    char ipAddrStr[48];
    ipAddrToDottedIP(&_socketAddress.ia, ipAddrStr, sizeof(ipAddrStr));
    LOG(logLevelDebug, "Releasing transport to %s.", ipAddrStr);
    _owners.erase(clientID);
    //_owners.erase(TransportClient::weak_pointer(client));
    // not used anymore, close it
    // TODO consider delayed destruction (can improve performance!!!)
    if(_owners.size()==0) close(); // TODO close(false)
}
/**
 * Records "traffic seen" time for the watchdog and, if the transport had
 * been flagged unresponsive, flips it back to responsive.
 * NOTE(review): responsiveTransport() re-locks _mutex while guard is held;
 * this relies on the mutex being recursive -- confirm.
 */
void BlockingClientTCPTransport::aliveNotification() {
    Lock guard(_mutex);
    epicsTimeGetCurrent(&_aliveTimestamp);
    if(_unresponsiveTransport) responsiveTransport();
}
/**
 * Clears the unresponsive flag (idempotent) and notifies every live owner
 * client via transportResponsive(); expired weak references are skipped.
 */
void BlockingClientTCPTransport::responsiveTransport() {
    Lock lock(_mutex);
    if(_unresponsiveTransport) {
        _unresponsiveTransport = false;
        // owners receive a shared_ptr to this transport
        Transport::shared_pointer thisSharedPtr = shared_from_this();
        TransportClientMap_t::iterator it = _owners.begin();
        for(; it!=_owners.end(); it++) {
            TransportClient::shared_pointer client = it->second.lock();
            if (client)
            {
                EXCEPTION_GUARD(client->transportResponsive(thisSharedPtr));
            }
        }
    }
}
/**
 * "Transport changed" (server restart) notification: resets the outgoing
 * introspection registry and notifies every live owner client via
 * transportChanged().
 */
void BlockingClientTCPTransport::changedTransport() {
    // drop cached outgoing introspection codes -- presumably invalid after
    // a server restart (TODO confirm); note this happens before the lock
    outgoingIR.reset();
    Lock lock(_mutex);
    TransportClientMap_t::iterator it = _owners.begin();
    for(; it!=_owners.end(); it++) {
        TransportClient::shared_pointer client = it->second.lock();
        if (client)
        {
            EXCEPTION_GUARD(client->transportChanged());
        }
    }
}
/**
 * TransportSender callback, invoked on the send thread.
 * The first invocation sends the connection-validation response (receive
 * buffer size, socket receive buffer size, priority); every later
 * invocation -- triggered by the watchdog -- sends a zero-payload echo.
 */
void BlockingClientTCPTransport::send(ByteBuffer* buffer,
        TransportSendControl* control) {
    if(_verifyOrEcho) {
        /*
         * send verification response message
         */
        control->startMessage(CMD_CONNECTION_VALIDATION, 2*sizeof(int32)+sizeof(int16));
        // receive buffer size
        buffer->putInt(static_cast<int32>(getReceiveBufferSize()));
        // socket receive buffer size
        buffer->putInt(static_cast<int32>(getSocketReceiveBufferSize()));
        // connection priority
        buffer->putShort(getPriority());
        // send immediately
        control->flush(true);
        _verifyOrEcho = false;
    }
    else {
        control->startMessage(CMD_ECHO, 0);
        // send immediately
        control->flush(true);
    }
}
}
}

View File

@@ -1,136 +0,0 @@
/**
* Copyright - See the COPYRIGHT that is included with this distribution.
* pvAccessCPP is distributed subject to a Software License Agreement found
* in file LICENSE that is included with this distribution.
*/
#include <pv/blockingTCP.h>
#include <pv/remote.h>
#include <pv/logger.h>
#include <pv/lock.h>
#include <pv/byteBuffer.h>
/* standard */
#include <map>
using namespace epics::pvData;
using namespace std;
namespace epics {
namespace pvAccess {
/**
 * Server-side TCP transport constructor.
 * Registered with PVA_DEFAULT_PRIORITY since the real priority is only
 * learned from the client later.
 * @param context           context the transport lives in
 * @param channel           accepted socket
 * @param responseHandler   handler for incoming messages (ownership taken)
 * @param receiveBufferSize requested receive buffer size
 */
BlockingServerTCPTransport::BlockingServerTCPTransport(
        Context::shared_pointer const & context, SOCKET channel,
        auto_ptr<ResponseHandler>& responseHandler, int receiveBufferSize) :
    BlockingTCPTransport(context, channel, responseHandler, receiveBufferSize, PVA_DEFAULT_PRIORITY),
    _lastChannelSID(0)
{
    // for performance testing
    setFlushStrategy(DELAYED);
    _delay = 0.000;
    // NOTE: priority not yet known, default priority is used to register/unregister
    // TODO implement priorities in Reactor... not that user will
    // change it.. still getPriority() must return "registered" priority!
    //start();
}
// Destructor: channel teardown happens in internalClose()/destroyAllChannels().
BlockingServerTCPTransport::~BlockingServerTCPTransport() {
}
/**
 * Destroys every channel still hosted by this transport and clears the
 * SID -> channel table. Called while closing the transport.
 */
void BlockingServerTCPTransport::destroyAllChannels() {
    Lock lock(_channelsMutex);
    if(_channels.size()==0) return;
    char ipAddrStr[64];
    ipAddrToDottedIP(&_socketAddress.ia, ipAddrStr, sizeof(ipAddrStr));
    LOG(
        logLevelDebug,
        "Transport to %s still has %u channel(s) active and closing...",
        ipAddrStr, (unsigned int)_channels.size());
    map<pvAccessID, ServerChannel::shared_pointer>::iterator it = _channels.begin();
    for(; it!=_channels.end(); it++)
        it->second->destroy();
    _channels.clear();
}
/**
 * Closes the base transport and then destroys all hosted channels.
 */
void BlockingServerTCPTransport::internalClose(bool force) {
    // NOTE(review): this local shared_ptr is otherwise unused -- presumably
    // it keeps the transport alive for the duration of the close sequence;
    // confirm before removing.
    Transport::shared_pointer thisSharedPtr = shared_from_this();
    BlockingTCPTransport::internalClose(force);
    destroyAllChannels();
}
/**
 * Post-close hook: nothing server-specific to do beyond the base class.
 */
void BlockingServerTCPTransport::internalPostClose(bool forced) {
    BlockingTCPTransport::internalPostClose(forced);
}
/**
 * Preallocates the next free channel SID.
 * Scans forward from the last issued SID until an unused one is found
 * (theoretically this could loop forever if every id were taken).
 * @return a SID not currently present in the channel table
 */
pvAccessID BlockingServerTCPTransport::preallocateChannelSID() {
    Lock guard(_channelsMutex);
    pvAccessID candidate;
    do {
        candidate = ++_lastChannelSID;
    } while (_channels.count(candidate) != 0);
    return candidate;
}
/**
 * Registers a channel under the given SID (overwrites any existing entry
 * with the same SID).
 * @param sid preallocated channel SID
 * @param channel channel to register
 */
void BlockingServerTCPTransport::registerChannel(pvAccessID sid, ServerChannel::shared_pointer const & channel) {
    Lock lock(_channelsMutex);
    _channels[sid] = channel;
}
/**
 * Removes a channel from the SID table; no-op if the SID is unknown.
 * @param sid SID of the channel to remove
 */
void BlockingServerTCPTransport::unregisterChannel(pvAccessID sid) {
    Lock lock(_channelsMutex);
    _channels.erase(sid);
}
/**
 * Looks up a hosted channel by SID.
 * @param sid channel SID
 * @return the channel, or an empty shared_pointer when not found
 */
ServerChannel::shared_pointer BlockingServerTCPTransport::getChannel(pvAccessID sid) {
    Lock guard(_channelsMutex);
    std::map<pvAccessID, ServerChannel::shared_pointer>::iterator found =
        _channels.find(sid);
    return (found == _channels.end())
        ? ServerChannel::shared_pointer()
        : found->second;
}
/**
 * @return number of channels currently hosted by this transport.
 */
int BlockingServerTCPTransport::getChannelCount() {
    Lock lock(_channelsMutex);
    return static_cast<int>(_channels.size());
}
/**
 * TransportSender callback: emits the initial server handshake, enqueued
 * once from verify(). First a hand-built 8-byte "set byte order" control
 * header (flag byte advertises this host's endianness), then the
 * connection-validation message carrying the server's buffer sizes.
 */
void BlockingServerTCPTransport::send(ByteBuffer* buffer,
        TransportSendControl* control) {
    //
    // set byte order control message
    //
    control->ensureBuffer(PVA_MESSAGE_HEADER_SIZE);
    buffer->putByte(PVA_MAGIC);
    buffer->putByte(PVA_VERSION);
    buffer->putByte(0x01 | ((EPICS_BYTE_ORDER == EPICS_ENDIAN_BIG) ? 0x80 : 0x00)); // control + big endian
    buffer->putByte(2); // set byte order
    buffer->putInt(0);  // zero payload
    //
    // send verification message
    //
    control->startMessage(CMD_CONNECTION_VALIDATION, 2*sizeof(int32));
    // receive buffer size
    buffer->putInt(static_cast<int32>(getReceiveBufferSize()));
    // socket receive buffer size
    buffer->putInt(static_cast<int32>(getSocketReceiveBufferSize()));
    // send immediately
    control->flush(true);
}
}
}

View File

@@ -40,584 +40,9 @@
#include <pv/namedLockPattern.h>
#include <pv/inetAddressUtil.h>
// not implemented anyway
#define FLOW_CONTROL 0
namespace epics {
namespace pvAccess {
//class MonitorSender;
// Stages of the receive-path state machine (see processReadCached()).
enum ReceiveStage {
    READ_FROM_SOCKET, PROCESS_HEADER, PROCESS_PAYLOAD, UNDEFINED_STAGE
};
/**
 * Base blocking TCP transport, shared by the client and server sides.
 * Combines the Transport interface with TransportSendControl; owns the
 * socket, the send/receive buffers, the introspection registries and the
 * dedicated send/receive threads.
 */
class BlockingTCPTransport :
    public Transport,
    public TransportSendControl,
    public std::tr1::enable_shared_from_this<BlockingTCPTransport>
{
protected:
    BlockingTCPTransport(Context::shared_pointer const & context, SOCKET channel,
            std::auto_ptr<ResponseHandler>& responseHandler, int receiveBufferSize,
            epics::pvData::int16 priority);

public:
    virtual bool isClosed() {
        return _closed.get();
    }

    virtual epics::pvData::int8 getRevision() const {
        return PVA_PROTOCOL_REVISION;
    }

    virtual void setRemoteRevision(epics::pvData::int8 revision) {
        _remoteTransportRevision = revision;
    }

    virtual void setRemoteTransportReceiveBufferSize(std::size_t remoteTransportReceiveBufferSize) {
        _remoteTransportReceiveBufferSize = remoteTransportReceiveBufferSize;
    }

    virtual void setRemoteTransportSocketReceiveBufferSize(std::size_t socketReceiveBufferSize) {
        _remoteTransportSocketReceiveBufferSize = socketReceiveBufferSize;
    }

    virtual epics::pvData::String getType() const {
        return epics::pvData::String("TCP");
    }

    virtual void aliveNotification() {
        // noop (overridden on the client side)
    }

    virtual void changedTransport() {
        // noop (overridden on the client side)
    }

    virtual const osiSockAddr* getRemoteAddress() const {
        return &_socketAddress;
    }

    virtual epics::pvData::int16 getPriority() const {
        return _priority;
    }

    virtual std::size_t getReceiveBufferSize() const {
        return _socketBuffer->getSize();
    }

    /**
     * Get remote transport receive buffer size (in bytes).
     * @return remote transport receive buffer size
     */
    virtual std::size_t getRemoteTransportReceiveBufferSize() const {
        return _remoteTransportReceiveBufferSize;
    }

    virtual std::size_t getSocketReceiveBufferSize() const;

    // Blocks until verified() is signalled or the timeout expires.
    virtual bool verify(epics::pvData::int32 timeoutMs) {
        return _verifiedEvent.wait(timeoutMs/1000.0);
        //epics::pvData::Lock lock(_verifiedMutex);
        //return _verified;
    }

    virtual void verified() {
        epics::pvData::Lock lock(_verifiedMutex);
        _verified = true;
        _verifiedEvent.signal();
    }

    virtual void setRecipient(const osiSockAddr& /*sendTo*/) {
        // noop (connected TCP transport has a fixed peer)
    }

    virtual void flush(bool lastMessageCompleted);
    virtual void startMessage(epics::pvData::int8 command, std::size_t ensureCapacity);
    virtual void endMessage();

    virtual void flushSerializeBuffer() {
        flush(false);
    }

    virtual void ensureBuffer(std::size_t size);
    virtual void alignBuffer(std::size_t alignment);
    virtual void ensureData(std::size_t size);
    virtual void alignData(std::size_t alignment);

    virtual bool directSerialize(epics::pvData::ByteBuffer *existingBuffer, const char* toSerialize,
            std::size_t elementCount, std::size_t elementSize);
    virtual bool directDeserialize(epics::pvData::ByteBuffer *existingBuffer, char* deserializeTo,
            std::size_t elementCount, std::size_t elementSize);

    void processReadIntoDirectBuffer(std::size_t bytesToRead);

    virtual void close();

    virtual void setByteOrder(int /*byteOrder*/)
    {
        // not used in this implementation
    }

    FlushStrategy getFlushStrategy() {
        return _flushStrategy;
    }

    void setFlushStrategy(FlushStrategy flushStrategy) {
        _flushStrategy = flushStrategy;
    }

    //void requestFlush();

    /**
     * Close and free connection resources.
     * NOTE(review): name typo ("Resorces") -- renaming would break callers.
     */
    void freeConnectionResorces();

    /**
     * Starts the receive and send threads
     */
    virtual void start();

    virtual void enqueueSendRequest(TransportSender::shared_pointer const & sender);
    //void enqueueMonitorSendRequest(TransportSender::shared_pointer const & sender);
    virtual void enqueueOnlySendRequest(TransportSender::shared_pointer const & sender);
    virtual void flushSendQueue();

    // Serializes a Field via the outgoing introspection registry cache.
    virtual void cachedSerialize(
        const std::tr1::shared_ptr<const epics::pvData::Field>& field, epics::pvData::ByteBuffer* buffer)
    {
        outgoingIR.serialize(field, buffer, this);
    }

    // Deserializes a Field via the incoming introspection registry cache.
    virtual std::tr1::shared_ptr<const epics::pvData::Field>
    cachedDeserialize(epics::pvData::ByteBuffer* buffer)
    {
        return incomingIR.deserialize(buffer, this);
    }

protected:
    virtual void processReadCached(bool nestedCall,
            ReceiveStage inStage, std::size_t requiredBytes);

    /**
     * Called to any resources just before closing transport
     * @param[in] force flag indicating if forced (e.g. forced
     * disconnect) is required
     */
    virtual void internalClose(bool force);

    /**
     * Called to any resources just after closing transport and without any locks held on transport
     * @param[in] force flag indicating if forced (e.g. forced
     * disconnect) is required
     */
    virtual void internalPostClose(bool force);

    /**
     * Send a buffer through the transport.
     * NOTE: TCP sent buffer/sending has to be synchronized (not done by this method).
     * @param buffer[in] buffer to be sent
     * @return success indicator
     */
    virtual bool send(epics::pvData::ByteBuffer* buffer);

    virtual ~BlockingTCPTransport();

#if FLOW_CONTROL
    /**
     * Default marker period.
     */
    static const std::size_t MARKER_PERIOD = 1024;
#endif

    static const std::size_t MAX_ENSURE_DATA_BUFFER_SIZE = 1024;

    // TODO
    double _delay;

    /****** finally initialized at construction time and after start (called by the same thread) ********/

    /**
     * Corresponding channel.
     */
    SOCKET _channel;

    /**
     * Cached socket address.
     */
    osiSockAddr _socketAddress;

    /**
     * Priority.
     * NOTE: Priority cannot just be changed, since it is registered
     * in transport registry with given priority.
     */
    epics::pvData::int16 _priority;
    // TODO to be implemeneted

    /**
     * PVAS response handler.
     */
    std::auto_ptr<ResponseHandler> _responseHandler;

    // TODO review int vs std::size_t
    /**
     * Send buffer size.
     */
    std::size_t _maxPayloadSize;

    /**
     * Socket send buffer size.
     */
    int _socketSendBufferSize;

    /**
     * Marker "period" in bytes (every X bytes marker should be set).
     */
    epics::pvData::int64 _markerPeriodBytes;

    FlushStrategy _flushStrategy;

    epicsThreadId _rcvThreadId;
    epicsThreadId _sendThreadId;

    // TODO
    //MonitorSender* _monitorSender;

    Context::shared_pointer _context;

    bool _autoDelete;

    /**** after verification ****/

    /**
     * Remote side transport revision (minor).
     */
    epics::pvData::int8 _remoteTransportRevision;

    /**
     * Remote side transport receive buffer size.
     */
    size_t _remoteTransportReceiveBufferSize;

    /**
     * Remote side transport socket receive buffer size.
     */
    size_t _remoteTransportSocketReceiveBufferSize;

    /*** send thread only - no need to sync ***/
    // NOTE: now all send-related external calls are TransportSender IF
    // and its reference is only valid when called from send thread

    // initialized at construction time
    std::deque<TransportSender::shared_pointer> _sendQueue;
    epics::pvData::Mutex _sendQueueMutex;

    // initialized at construction time
    // std::deque<TransportSender::shared_pointer> _monitorSendQueue;
    // epics::pvData::Mutex _monitorMutex;

    /**
     * Send buffer.
     */
    epics::pvData::ByteBuffer* _sendBuffer;

#if FLOW_CONTROL
    /**
     * Next planned marker position.
     */
    epics::pvData::int64 _nextMarkerPosition;
#endif

    /**
     * Send pending flag.
     */
    bool _sendPending;

    /**
     * Last message start position.
     */
    int _lastMessageStartPosition;

    epics::pvData::int8 _lastSegmentedMessageType;
    epics::pvData::int8 _lastSegmentedMessageCommand;

    bool _flushRequested;

    int _sendBufferSentPosition;

    epics::pvData::int8 _byteOrderFlag;

    /**
     * Outgoing (codes generated by this party) introspection registry.
     */
    IntrospectionRegistry outgoingIR;

    /*** receive thread only - no need to sync ***/

    // initialized at construction time
    epics::pvData::ByteBuffer* _socketBuffer;

    std::size_t _startPosition;
    std::size_t _storedPayloadSize;
    std::size_t _storedPosition;
    std::size_t _storedLimit;

    // fields of the message header currently being processed
    epics::pvData::int8 _version;
    epics::pvData::int8 _packetType;
    epics::pvData::int8 _command;
    std::size_t _payloadSize;

    ReceiveStage _stage;

    std::size_t _directPayloadRead;
    char * _directBuffer;

#if FLOW_CONTROL
    /**
     * Total bytes received.
     */
    epics::pvData::int64 _totalBytesReceived;
#endif

    /**
     * Incoming (codes generated by other party) introspection registry.
     */
    IntrospectionRegistry incomingIR;

    /*** send/receive thread shared ***/

    /**
     * Connection status
     * NOTE: synced by _mutex
     */
    AtomicBoolean _closed;

    // NOTE: synced by _mutex
    bool _sendThreadExited;

    epics::pvData::Mutex _mutex;

    bool _verified;
    epics::pvData::Mutex _verifiedMutex;

    epics::pvData::Event _sendQueueEvent;
    epics::pvData::Event _verifiedEvent;

#if FLOW_CONTROL
    /**
     * Marker to send.
     * NOTE: synced by _flowControlMutex
     */
    int _markerToSend;

    /**
     * Total bytes sent.
     * NOTE: synced by _flowControlMutex
     */
    epics::pvData::int64 _totalBytesSent;

    /**
     * Calculated remote free buffer size.
     * NOTE: synced by _flowControlMutex
     */
    epics::pvData::int64 _remoteBufferFreeSpace;

    epics::pvData::Mutex _flowControlMutex;
#endif

private:
    /**
     * Internal method that clears and releases buffer.
     * sendLock and sendBufferLock must be hold while calling this method.
     */
    void clearAndReleaseBuffer();

    void endMessage(bool hasMoreSegments);

    bool flush();

    void processSendQueue();

    // entry points for the receive/send threads (param is the transport)
    static void rcvThreadRunner(void* param);
    static void sendThreadRunner(void* param);

    /**
     * Free all send buffers (return them to the cached buffer allocator).
     */
    void freeSendBuffers();
};
/**
 * Client-side blocking TCP transport: adds the connection watchdog
 * (TimerCallback), verification/echo sending (TransportSender) and owner
 * (TransportClient) bookkeeping on top of BlockingTCPTransport.
 */
class BlockingClientTCPTransport : public BlockingTCPTransport,
        public TransportSender,
        public epics::pvData::TimerCallback {
public:
    POINTER_DEFINITIONS(BlockingClientTCPTransport);

private:
    BlockingClientTCPTransport(Context::shared_pointer const & context, SOCKET channel,
            std::auto_ptr<ResponseHandler>& responseHandler, int receiveBufferSize,
            TransportClient::shared_pointer client, epics::pvData::int8 remoteTransportRevision,
            float beaconInterval, epics::pvData::int16 priority);

public:
    /**
     * Factory method: constructs the transport and calls start() (threads
     * and watchdog) before returning it.
     */
    static shared_pointer create(Context::shared_pointer const & context, SOCKET channel,
            std::auto_ptr<ResponseHandler>& responseHandler, int receiveBufferSize,
            TransportClient::shared_pointer client, epics::pvData::int8 remoteTransportRevision,
            float beaconInterval, epics::pvData::int16 priority)
    {
        shared_pointer thisPointer(
            new BlockingClientTCPTransport(context, channel, responseHandler, receiveBufferSize,
                    client, remoteTransportRevision, beaconInterval, priority)
        );
        thisPointer->start();
        return thisPointer;
    }

    virtual void start();

    virtual ~BlockingClientTCPTransport();

    virtual void timerStopped() {
        // noop
    }

    /** Periodic watchdog callback (unresponsive detection / echo probing). */
    virtual void callback();

    /**
     * Acquires transport.
     * @param client client (channel) acquiring the transport
     * @return <code>true</code> if transport was granted, <code>false</code> otherwise.
     */
    virtual bool acquire(TransportClient::shared_pointer const & client);

    /**
     * Releases transport.
     * @param clientId client (channel) releasing the transport
     */
    virtual void release(pvAccessID clientId);
    //virtual void release(TransportClient::shared_pointer const & client);

    /**
     * Alive notification.
     * This method needs to be called (by newly received data or beacon)
     * at least once in this period, if not echo will be issued
     * and if there is not response to it, transport will be considered as unresponsive.
     */
    virtual void aliveNotification();

    /**
     * Changed transport (server restarted) notify.
     */
    virtual void changedTransport();

    virtual void lock() {
        // noop
    }

    virtual void unlock() {
        // noop
    }

    virtual void acquire() {
        // noop, since it does not make sense on itself
    }

    virtual void release() {
        // noop, since it does not make sense on itself
    }

    /** TransportSender: emits the verification response or an echo message. */
    virtual void send(epics::pvData::ByteBuffer* buffer,
            TransportSendControl* control);

protected:
    virtual void internalClose(bool force);
    virtual void internalPostClose(bool force);

private:
    /**
     * Owners (users) of the transport.
     */
    // TODO consider using TR1 hash map
    typedef std::map<pvAccessID, TransportClient::weak_pointer> TransportClientMap_t;
    TransportClientMap_t _owners;

    /**
     * Connection timeout (no-traffic) flag.
     */
    double _connectionTimeout;

    /**
     * Unresponsive transport flag.
     */
    bool _unresponsiveTransport;

    /**
     * Timestamp of last "live" event on this transport.
     */
    epicsTimeStamp _aliveTimestamp;

    // true until the first verification response is sent; afterwards send()
    // emits echo messages
    bool _verifyOrEcho;

    /**
     * Unresponsive transport notify.
     */
    void unresponsiveTransport();

    /**
     * Notifies clients about disconnect.
     */
    void closedNotifyClients();

    /**
     * Responsive transport notify.
     */
    void responsiveTransport();
};
/**
* Channel Access TCP connector.
* @author <a href="mailto:matej.sekoranjaATcosylab.com">Matej Sekoranja</a>
@@ -672,152 +97,6 @@ namespace epics {
};
/**
 * Server-side blocking TCP transport: hosts server channels
 * (ChannelHostingTransport) and self-verifies by sending the byte-order
 * control message followed by the connection validation message.
 */
class BlockingServerTCPTransport : public BlockingTCPTransport,
        public ChannelHostingTransport,
        public TransportSender {
public:
    POINTER_DEFINITIONS(BlockingServerTCPTransport);

private:
    BlockingServerTCPTransport(Context::shared_pointer const & context, SOCKET channel,
            std::auto_ptr<ResponseHandler>& responseHandler, int receiveBufferSize);

public:
    /** Factory method: constructs the transport and calls start() before returning. */
    static shared_pointer create(Context::shared_pointer const & context, SOCKET channel,
            std::auto_ptr<ResponseHandler>& responseHandler, int receiveBufferSize)
    {
        shared_pointer thisPointer(
            new BlockingServerTCPTransport(context, channel, responseHandler, receiveBufferSize)
        );
        thisPointer->start();
        return thisPointer;
    }

    // Server-side transports are never client-acquirable.
    virtual bool acquire(std::tr1::shared_ptr<TransportClient> const & /*client*/)
    {
        return false;
    }

    virtual void release(pvAccessID /*clientId*/) {}

    /**
     * Preallocate new channel SID.
     * @return new channel server id (SID).
     */
    virtual pvAccessID preallocateChannelSID();

    /**
     * De-preallocate new channel SID.
     * @param sid preallocated channel SID.
     */
    virtual void depreallocateChannelSID(pvAccessID /*sid*/) {
        // noop
    }

    /**
     * Register a new channel.
     * @param sid preallocated channel SID.
     * @param channel channel to register.
     */
    virtual void registerChannel(pvAccessID sid, ServerChannel::shared_pointer const & channel);

    /**
     * Unregister a new channel (and deallocates its handle).
     * @param sid SID
     */
    virtual void unregisterChannel(pvAccessID sid);

    /**
     * Get channel by its SID.
     * @param sid channel SID
     * @return channel with given SID, <code>NULL</code> otherwise
     */
    virtual ServerChannel::shared_pointer getChannel(pvAccessID sid);

    /**
     * Get channel count.
     * @return channel count.
     */
    virtual int getChannelCount();

    // No security token support in this implementation.
    virtual epics::pvData::PVField::shared_pointer getSecurityToken() {
        return epics::pvData::PVField::shared_pointer();
    }

    virtual void lock() {
        // noop
    }

    virtual void unlock() {
        // noop
    }

    virtual void acquire() {
        // noop, since it does not make sense on itself
    }

    virtual void release() {
        // noop, since it does not make sense on itself
    }

    /**
     * Verify transport. Server side is self-verified: it enqueues its own
     * handshake (see send()) and immediately reports verified.
     */
    virtual bool verify(epics::pvData::int32 /*timeoutMs*/) {
        TransportSender::shared_pointer transportSender = std::tr1::dynamic_pointer_cast<TransportSender>(shared_from_this());
        enqueueSendRequest(transportSender);
        verified();
        return true;
    }

    /**
     * PVA connection validation request.
     * A server sends a validate connection message when it receives a new connection.
     * The message indicates that the server is ready to receive requests; the client must
     * not send any messages on the connection until it has received the validate connection message
     * from the server. No reply to the message is expected by the server.
     * The purpose of the validate connection message is two-fold:
     * It informs the client of the protocol version supported by the server.
     * It prevents the client from writing a request message to its local transport
     * buffers until after the server has acknowledged that it can actually process the
     * request. This avoids a race condition caused by the server's TCP/IP stack
     * accepting connections in its backlog while the server is in the process of shutting down:
     * if the client were to send a request in this situation, the request
     * would be lost but the client could not safely re-issue the request because that
     * might violate at-most-once semantics.
     * The validate connection message guarantees that a server is not in the middle
     * of shutting down when the server's TCP/IP stack accepts an incoming connection
     * and so avoids the race condition.
     * @see org.epics.ca.impl.remote.TransportSender#send(java.nio.ByteBuffer, org.epics.ca.impl.remote.TransportSendControl)
     */
    virtual void send(epics::pvData::ByteBuffer* buffer,
            TransportSendControl* control);

    virtual ~BlockingServerTCPTransport();

protected:
    virtual void internalClose(bool force);
    virtual void internalPostClose(bool force);

private:
    /**
     * Last SID cache.
     */
    pvAccessID _lastChannelSID;

    /**
     * Channel table (SID -> channel mapping).
     */
    std::map<pvAccessID, ServerChannel::shared_pointer> _channels;
    epics::pvData::Mutex _channelsMutex;

    /**
     * Destroy all channels.
     */
    void destroyAllChannels();
};
class ResponseHandlerFactory
{
public:

View File

@@ -5,6 +5,7 @@
*/
#include <pv/blockingTCP.h>
#include "codec.h"
#include <pv/remote.h>
#include <pv/logger.h>
@@ -180,17 +181,26 @@ namespace pvAccess {
}
// TODO tune buffer sizes?!
// get TCP send buffer size
osiSocklen_t intLen = sizeof(int);
int _socketSendBufferSize;
retval = getsockopt(newClient, SOL_SOCKET, SO_SNDBUF, (char *)&_socketSendBufferSize, &intLen);
if(retval<0) {
epicsSocketConvertErrnoToString(strBuffer, sizeof(strBuffer));
LOG(logLevelDebug, "Error getting SO_SNDBUF: %s", strBuffer);
}
/**
* Create transport, it registers itself to the registry.
* Each transport should have its own response handler since it is not "shareable"
*/
std::auto_ptr<ResponseHandler> responseHandler = _responseHandlerFactory->createResponseHandler();
BlockingServerTCPTransport::shared_pointer transport =
BlockingServerTCPTransport::create(
detail::BlockingServerTCPTransportCodec::shared_pointer transport =
detail::BlockingServerTCPTransportCodec::create(
_context,
newClient,
responseHandler,
_socketSendBufferSize,
_receiveBufferSize);
// validate connection

View File

@@ -8,6 +8,7 @@
#include <pv/remote.h>
#include <pv/namedLockPattern.h>
#include <pv/logger.h>
#include <pv/codec.h>
#include <epicsThread.h>
#include <osiSock.h>
@@ -78,8 +79,7 @@ namespace epics {
Context::shared_pointer context = _context.lock();
// first try to check cache w/o named lock...
Transport::shared_pointer tt = context->getTransportRegistry()->get("TCP", &address, priority);
BlockingClientTCPTransport::shared_pointer transport = std::tr1::static_pointer_cast<BlockingClientTCPTransport>(tt);
Transport::shared_pointer transport = context->getTransportRegistry()->get("TCP", &address, priority);
if(transport.get()) {
LOG(logLevelDebug,
"Reusing existing connection to PVA server: %s",
@@ -92,8 +92,7 @@ namespace epics {
if(lockAcquired) {
try {
// ... transport created during waiting in lock
tt = context->getTransportRegistry()->get("TCP", &address, priority);
transport = std::tr1::static_pointer_cast<BlockingClientTCPTransport>(tt);
transport = context->getTransportRegistry()->get("TCP", &address, priority);
if(transport.get()) {
LOG(logLevelDebug,
"Reusing existing connection to PVA server: %s",
@@ -141,8 +140,18 @@ namespace epics {
// create transport
// TODO introduce factory
transport = BlockingClientTCPTransport::create(
context, socket, responseHandler, _receiveBufferSize,
// get TCP send buffer size
osiSocklen_t intLen = sizeof(int);
int _socketSendBufferSize;
retval = getsockopt(socket, SOL_SOCKET, SO_SNDBUF, (char *)&_socketSendBufferSize, &intLen);
if(retval<0) {
char strBuffer[64];
epicsSocketConvertErrnoToString(strBuffer, sizeof(strBuffer));
LOG(logLevelDebug, "Error getting SO_SNDBUF: %s", strBuffer);
}
transport = detail::BlockingClientTCPTransportCodec::create(
context, socket, responseHandler, _receiveBufferSize, _socketSendBufferSize,
client, transportRevision, _beaconInterval, priority);
// verify

File diff suppressed because it is too large Load Diff

1613
pvAccessApp/remote/codec.cpp Normal file

File diff suppressed because it is too large Load Diff

800
pvAccessApp/remote/codec.h Normal file
View File

@@ -0,0 +1,800 @@
/**
* Copyright - See the COPYRIGHT that is included with this distribution.
* pvAccessCPP is distributed subject to a Software License Agreement found
* in file LICENSE that is included with this distribution.
*/
#ifndef CODEC_H_
#define CODEC_H_
#include <set>
#include <map>
#include <deque>
#ifdef epicsExportSharedSymbols
# define abstractCodecEpicsExportSharedSymbols
# undef epicsExportSharedSymbols
#endif
#include <shareLib.h>
#include <osdSock.h>
#include <osiSock.h>
#include <epicsTime.h>
#include <epicsThread.h>
#include <pv/byteBuffer.h>
#include <pv/pvType.h>
#include <pv/lock.h>
#include <pv/timer.h>
#include <pv/event.h>
#include <pv/likely.h>
#ifdef abstractCodecEpicsExportSharedSymbols
# define epicsExportSharedSymbols
# undef abstractCodecEpicsExportSharedSymbols
#endif
#include <pv/pvaConstants.h>
#include <pv/remote.h>
#include <pv/transportRegistry.h>
#include <pv/introspectionRegistry.h>
#include <pv/namedLockPattern.h>
#include <pv/inetAddressUtil.h>
namespace epics {
namespace pvAccess {
namespace detail {
// TODO replace mutex with atomic (CAS) operations
/**
 * Minimal mutex-guarded value holder offering atomic get and get-and-set.
 * NOTE: T must be constructible from 0 (used here for bool/int flags).
 */
template<typename T>
class AtomicValue
{
public:
    AtomicValue(): _value(0) {};
    /** Atomically stores value and returns the previous value. */
    T getAndSet(T value)
    {
        mutex.lock();
        T tmp = _value; _value = value;
        mutex.unlock();
        return tmp;
    }
    /** Returns a consistent snapshot of the stored value. */
    T get() { mutex.lock(); T tmp = _value; mutex.unlock(); return tmp; }
private:
    T _value;
    epics::pvData::Mutex mutex;
};
/**
 * Simple multi-producer/multi-consumer blocking queue used by the codec's
 * send path. Elements are taken FIFO; a consumer blocked in take(0) can be
 * released without an element via wakeup().
 */
template<typename T>
class queue {
public:
    queue(void) { }

    //TODO
    /*queue(queue const &T) = delete;
    queue(queue &&T) = delete;
    queue& operator=(const queue &T) = delete;
    */

    ~queue(void)
    {
    }

    /** @return true when no elements are queued (thread-safe snapshot). */
    bool empty(void)
    {
        epics::pvData::Lock lock(_queueMutex);
        return _queue.empty();
    }

    /** Discards all queued elements. */
    void clean()
    {
        epics::pvData::Lock lock(_queueMutex);
        _queue.clear();
    }

    /**
     * Releases one consumer blocked in take(0) without supplying an
     * element; the released caller returns a default-constructed T.
     */
    void wakeup()
    {
        if (!_wakeup.getAndSet(true))
        {
            _queueEvent.signal();
        }
    }

    /** Appends an element and signals any waiting consumer. */
    void put(T const & elem)
    {
        {
            epics::pvData::Lock lock(_queueMutex);
            _queue.push_back(elem);
        }
        _queueEvent.signal();
    }

    /**
     * Takes the next element.
     * @param timeOut  negative: return immediately (default T if empty);
     *                 zero: block until an element or wakeup() arrives;
     *                 positive: wait up to the given timeout.
     * @return next element, or a default-constructed T on
     *         timeout/wakeup/empty queue.
     */
    T take(int timeOut)
    {
        while (true)
        {
            bool isEmpty = empty();
            if (isEmpty)
            {
                if (timeOut < 0) {
                    return T();
                }
                while (isEmpty)
                {
                    if (timeOut == 0) {
                        _queueEvent.wait();
                    }
                    else {
                        _queueEvent.wait(timeOut);
                    }
                    isEmpty = empty();
                    if (isEmpty)
                    {
                        if (timeOut > 0) { // TODO spurious wakeup, but not critical
                            return T();
                        }
                        else // if (timeout == 0) cannot be negative
                        {
                            if (_wakeup.getAndSet(false)) {
                                return T();
                            }
                        }
                    }
                }
            }
            else
            {
                epics::pvData::Lock lock(_queueMutex);
                // FIX: re-check under the lock -- another consumer may have
                // drained the queue between the unlocked empty() above and
                // acquiring the lock here; calling front() on an empty
                // deque would be undefined behavior. On a lost race simply
                // re-evaluate from the top.
                if (_queue.empty())
                    continue;
                T sender = _queue.front();
                _queue.pop_front();
                return sender;
            }
        }
    }

private:
    std::deque<T> _queue;
    epics::pvData::Event _queueEvent;
    epics::pvData::Mutex _queueMutex;
    AtomicValue<bool> _wakeup;
    // NOTE(review): _stdMutex appears unused; kept to preserve the class
    // layout -- confirm before removing.
    epics::pvData::Mutex _stdMutex;
};
// Exception signalling an I/O error.
class io_exception: public std::runtime_error {
public:
explicit io_exception(const std::string &s): std::runtime_error(s) {}
};
// Exception signalling that the incoming byte stream is malformed.
class invalid_data_stream_exception: public std::runtime_error {
public:
explicit invalid_data_stream_exception(
const std::string &s): std::runtime_error(s) {}
};
// Exception signalling that the connection was closed.
class connection_closed_exception: public std::runtime_error {
public:
explicit connection_closed_exception(const std::string &s): std::runtime_error(s) {}
};
// Receive-side state: normal read, message split across reads, or a
// segmented (multi-part) message in progress.
enum ReadMode { NORMAL, SPLIT, SEGMENTED };
// Send-side state: draining the send queue, or waiting for a
// ready-to-write signal.
enum WriteMode { PROCESS_SEND_QUEUE, WAIT_FOR_READY_SIGNAL };
/**
 * Base class implementing the pvAccess wire codec: header parsing and
 * message reading on the receive side (normal / split / segmented
 * modes) and message assembly, alignment and flushing on the send
 * side.  Concrete subclasses supply the raw I/O (read/write/close)
 * and the threading model (readPollOne/writePollOne/scheduleSend/...).
 */
class AbstractCodec :
public TransportSendControl,
public Transport
{
public:
// Processing/sending limits and minimum buffer-capacity guarantees
// (values defined in codec.cpp).
static const std::size_t MAX_MESSAGE_PROCESS;
static const std::size_t MAX_MESSAGE_SEND;
static const std::size_t MAX_ENSURE_SIZE;
static const std::size_t MAX_ENSURE_DATA_SIZE;
static const std::size_t MAX_ENSURE_BUFFER_SIZE;
static const std::size_t MAX_ENSURE_DATA_BUFFER_SIZE;
AbstractCodec(
std::tr1::shared_ptr<epics::pvData::ByteBuffer> const & receiveBuffer,
std::tr1::shared_ptr<epics::pvData::ByteBuffer> const & sendBuffer,
int32_t socketSendBufferSize,
bool blockingProcessQueue);
// --- hooks implemented by concrete transports ---
virtual void processControlMessage() = 0;
virtual void processApplicationMessage() = 0;
virtual const osiSockAddr* getLastReadBufferSocketAddress() = 0;
virtual void invalidDataStreamHandler() = 0;
virtual void readPollOne()=0;
virtual void writePollOne() = 0;
virtual void scheduleSend() = 0;
virtual void sendCompleted() = 0;
virtual bool terminated() = 0;
// Raw I/O primitives provided by the concrete transport.
virtual int write(epics::pvData::ByteBuffer* src) = 0;
virtual int read(epics::pvData::ByteBuffer* dst) = 0;
virtual bool isOpen() = 0;
virtual void close() = 0;
virtual ~AbstractCodec()
{
}
// --- serialization helpers used while building outgoing messages ---
void alignBuffer(std::size_t alignment);
void ensureData(std::size_t size);
void alignData(std::size_t alignment);
void startMessage(
epics::pvData::int8 command,
std::size_t ensureCapacity);
void putControlMessage(
epics::pvData::int8 command,
epics::pvData::int32 data);
void endMessage();
void ensureBuffer(std::size_t size);
void flushSerializeBuffer();
void flush(bool lastMessageCompleted);
// --- driving loops, called from the transport's threads ---
void processWrite();
void processRead();
void processSendQueue();
void clearSendQueue();
void enqueueSendRequest(TransportSender::shared_pointer const & sender);
void enqueueSendRequest(TransportSender::shared_pointer const & sender,
std::size_t requiredBufferSize);
void setSenderThread();
void setRecipient(osiSockAddr const & sendTo);
void setByteOrder(int byteOrder);
// Rounds value up to the next multiple of alignment.
static std::size_t alignedValue(std::size_t value, std::size_t alignment);
protected:
// Called when the send buffer cannot be flushed; tries is the attempt count.
virtual void sendBufferFull(int tries) = 0;
void send(epics::pvData::ByteBuffer *buffer);
// Current receive mode and the last parsed message header fields.
ReadMode _readMode;
int8_t _version;
int8_t _flags;
int8_t _command;
int32_t _payloadSize; // TODO why not size_t?
epics::pvData::int32 _remoteTransportSocketReceiveBufferSize;
int64_t _totalBytesSent;
bool _blockingProcessQueue;
//TODO initialize union
osiSockAddr _sendTo;
epicsThreadId _senderThread;
WriteMode _writeMode;
bool _writeOpReady;
bool _lowLatency;
// Receive buffer (also used for deserialization) and send buffer.
std::tr1::shared_ptr<epics::pvData::ByteBuffer> _socketBuffer;
std::tr1::shared_ptr<epics::pvData::ByteBuffer> _sendBuffer;
// Pending senders, drained by processSendQueue().
queue<TransportSender::shared_pointer> _sendQueue;
private:
void processHeader();
void processReadNormal();
void postProcessApplicationMessage();
void processReadSegmented();
bool readToBuffer(std::size_t requiredBytes, bool persistent);
void endMessage(bool hasMoreSegments);
void processSender(
epics::pvAccess::TransportSender::shared_pointer const & sender);
// Saved buffer positions/limits used when a message is split across
// reads or segmented across multiple wire messages.
std::size_t _storedPayloadSize;
std::size_t _storedPosition;
std::size_t _storedLimit;
std::size_t _startPosition;
std::size_t _maxSendPayloadSize;
std::size_t _lastMessageStartPosition;
std::size_t _lastSegmentedMessageType;
int8_t _lastSegmentedMessageCommand;
std::size_t _nextMessagePayloadOffset;
epics::pvData::int8 _byteOrderFlag;
int32_t _socketSendBufferSize;
};
/**
 * Codec variant using blocking I/O driven by two dedicated threads
 * (receiveThread and sendThread, launched via start()).  Constructed
 * with blockingProcessQueue = true; the open flag is set immediately.
 */
class BlockingAbstractCodec:
public AbstractCodec,
public std::tr1::enable_shared_from_this<BlockingAbstractCodec>
{
public:
POINTER_DEFINITIONS(BlockingAbstractCodec);
BlockingAbstractCodec(
std::tr1::shared_ptr<epics::pvData::ByteBuffer> const & receiveBuffer,
std::tr1::shared_ptr<epics::pvData::ByteBuffer> const & sendBuffer,
int32_t socketSendBufferSize):
AbstractCodec(receiveBuffer, sendBuffer, socketSendBufferSize, true),
_readThread(0), _sendThread(0) { _isOpen.getAndSet(true);}
void readPollOne();
void writePollOne();
// No-ops: dedicated blocking threads need no explicit send scheduling.
void scheduleSend() {}
void sendCompleted() {}
void close();
bool terminated();
bool isOpen();
// Starts the receive and send threads (entry points below).
void start();
// Thread entry points; param is the codec instance.
static void receiveThread(void* param);
static void sendThread(void* param);
protected:
void sendBufferFull(int tries);
virtual void internalDestroy() = 0;
/**
 * Called to release any resources just before closing transport
 * @param[in] force flag indicating if forced (e.g. forced
 * disconnect) is required
 */
virtual void internalClose(bool force);
/**
 * Called to release any resources just after closing transport and without any locks held on transport
 * @param[in] force flag indicating if forced (e.g. forced
 * disconnect) is required
 */
virtual void internalPostClose(bool force);
private:
// Open flag; set true at construction, cleared when closing.
AtomicValue<bool> _isOpen;
volatile epicsThreadId _readThread;
volatile epicsThreadId _sendThread;
epics::pvData::Event _shutdownEvent;
};
/**
 * Blocking codec bound to a socket (_channel): provides the raw
 * read/write primitives over that socket.
 */
class BlockingSocketAbstractCodec:
public BlockingAbstractCodec
{
public:
BlockingSocketAbstractCodec(
SOCKET channel,
int32_t sendBufferSize,
int32_t receiveBufferSize);
// Raw socket read into dst (implementation in codec.cpp).
int read(epics::pvData::ByteBuffer* dst);
// Raw socket write from src (implementation in codec.cpp).
int write(epics::pvData::ByteBuffer* src);
const osiSockAddr* getLastReadBufferSocketAddress() { return &_socketAddress; }
void invalidDataStreamHandler();
std::size_t getSocketReceiveBufferSize() const;
protected:
void internalDestroy();
SOCKET _channel;
// Address associated with the socket; also reported as the remote
// address by subclasses.
osiSockAddr _socketAddress;
};
/**
 * TCP transport built on the socket codec.  Registers itself with the
 * context's TransportRegistry on activate(), removes itself on
 * destroy, dispatches application messages to the ResponseHandler and
 * handles the byte-order control message.
 */
class BlockingTCPTransportCodec :
public BlockingSocketAbstractCodec
{
public:
// Transport type identifier.
epics::pvData::String getType() const {
return epics::pvData::String("TCP");
}
void internalDestroy() {
BlockingSocketAbstractCodec::internalDestroy();
// Deregister from the context's transport registry.
Transport::shared_pointer thisSharedPtr = this->shared_from_this();
_context->getTransportRegistry()->remove(thisSharedPtr);
}
void changedTransport() {}
// Control command 2 carries the remote byte order in the sign bit
// (bit 7) of the flags byte: negative flags => big endian.
void processControlMessage() {
if (_command == 2)
{
// check 7-th bit
setByteOrder(_flags < 0 ? EPICS_ENDIAN_BIG : EPICS_ENDIAN_LITTLE);
}
}
// Hands a complete application message to the response handler.
void processApplicationMessage() {
_responseHandler->handleResponse(&_socketAddress, shared_from_this(),
_version, _command, _payloadSize, _socketBuffer.get());
}
const osiSockAddr* getRemoteAddress() const {
return &_socketAddress;
}
epics::pvData::int8 getRevision() const {
return PVA_PROTOCOL_REVISION;
}
std::size_t getReceiveBufferSize() const {
return _socketBuffer->getSize();
}
epics::pvData::int16 getPriority() const {
return _priority;
}
void setRemoteRevision(epics::pvData::int8 revision) {
_remoteTransportRevision = revision;
}
void setRemoteTransportReceiveBufferSize(
std::size_t remoteTransportReceiveBufferSize) {
_remoteTransportReceiveBufferSize = remoteTransportReceiveBufferSize;
}
void setRemoteTransportSocketReceiveBufferSize(
std::size_t socketReceiveBufferSize) {
_remoteTransportSocketReceiveBufferSize = socketReceiveBufferSize;
}
// Deserializes a Field via the incoming introspection registry cache.
std::tr1::shared_ptr<const epics::pvData::Field>
cachedDeserialize(epics::pvData::ByteBuffer* buffer)
{
return _incomingIR.deserialize(buffer, this);
}
// Serializes a Field via the outgoing introspection registry cache.
void cachedSerialize(
const std::tr1::shared_ptr<const epics::pvData::Field>& field,
epics::pvData::ByteBuffer* buffer)
{
// TODO !!!!
bool directSerialize(
epics::pvData::ByteBuffer * /*existingBuffer*/,
const char* /*toSerialize*/,
std::size_t /*elementCount*/, std::size_t /*elementSize*/)
{
// TODO !!!!
return false;
}
// Direct (zero-copy) deserialization not implemented; always declines.
bool directDeserialize(epics::pvData::ByteBuffer * /*existingBuffer*/,
char* /*deserializeTo*/,
std::size_t /*elementCount*/, std::size_t /*elementSize*/) {
// TODO !!!
return false;
}
void flushSendQueue() { };
bool isClosed() {
return !isOpen();
}
// Registers this transport with the context and starts the codec threads.
void activate() {
Transport::shared_pointer thisSharedPtr = shared_from_this();
_context->getTransportRegistry()->put(thisSharedPtr);
start();
}
protected:
BlockingTCPTransportCodec(
Context::shared_pointer const & context,
SOCKET channel,
std::auto_ptr<ResponseHandler>& responseHandler,
int32_t sendBufferSize,
int32_t receiveBufferSize,
epics::pvData::int16 priority
):
BlockingSocketAbstractCodec(channel, sendBufferSize, receiveBufferSize),
_context(context), _responseHandler(responseHandler),
_remoteTransportReceiveBufferSize(MAX_TCP_RECV),
_remoteTransportRevision(0), _priority(priority)
{
}
Context::shared_pointer _context;
// Per-direction introspection (Field) caches.
IntrospectionRegistry _incomingIR;
IntrospectionRegistry _outgoingIR;
private:
// Handler invoked for each application message (ownership taken from caller).
std::auto_ptr<ResponseHandler> _responseHandler;
size_t _remoteTransportReceiveBufferSize;
epics::pvData::int8 _remoteTransportRevision;
epics::pvData::int16 _priority;
};
/**
 * Server-side TCP transport.  Hosts server channels (SID -> channel
 * table) and acts as a TransportSender for its own protocol messages.
 * Instances are created via create(), which also activates the codec.
 */
class BlockingServerTCPTransportCodec :
public BlockingTCPTransportCodec,
public ChannelHostingTransport,
public TransportSender {
public:
POINTER_DEFINITIONS(BlockingServerTCPTransportCodec);
protected:
BlockingServerTCPTransportCodec(
Context::shared_pointer const & context,
SOCKET channel,
std::auto_ptr<ResponseHandler>& responseHandler,
int32_t sendBufferSize,
int32_t receiveBufferSize );
public:
// Factory: constructs, registers and starts a server transport.
static shared_pointer create(
Context::shared_pointer const & context,
SOCKET channel,
std::auto_ptr<ResponseHandler>& responseHandler,
int sendBufferSize,
int receiveBufferSize)
{
shared_pointer thisPointer(
new BlockingServerTCPTransportCodec(
context, channel, responseHandler,
sendBufferSize, receiveBufferSize)
);
thisPointer->activate();
return thisPointer;
}
public:
// Server transports are not acquired by clients; always refuses.
bool acquire(std::tr1::shared_ptr<TransportClient> const & client)
{
return false;
}
void release(pvAccessID /*clientId*/) {}
pvAccessID preallocateChannelSID();
void depreallocateChannelSID(pvAccessID /*sid*/) {
// noop
}
// SID -> channel table management (implementations in codec.cpp).
void registerChannel(
pvAccessID sid,
ServerChannel::shared_pointer const & channel);
void unregisterChannel(pvAccessID sid);
ServerChannel::shared_pointer getChannel(pvAccessID sid);
int getChannelCount();
// No security token on this transport; returns a null pointer.
epics::pvData::PVField::shared_pointer getSecurityToken() {
return epics::pvData::PVField::shared_pointer();
}
void lock() {
// noop
}
void unlock() {
// noop
}
// Server side: enqueues its verification message and immediately
// reports itself verified (timeoutMs is unused here).
bool verify(epics::pvData::int32 timeoutMs) {
TransportSender::shared_pointer transportSender =
std::tr1::dynamic_pointer_cast<TransportSender>(shared_from_this());
enqueueSendRequest(transportSender);
verified();
return true;
}
void verified() {
}
void aliveNotification() {
// noop on server-side
}
void send(epics::pvData::ByteBuffer* buffer,
TransportSendControl* control);
virtual ~BlockingServerTCPTransportCodec();
protected:
void destroyAllChannels();
virtual void internalClose(bool force);
private:
/**
 * Last SID cache.
 */
pvAccessID _lastChannelSID;
/**
 * Channel table (SID -> channel mapping).
 */
std::map<pvAccessID, ServerChannel::shared_pointer> _channels;
// Guards _channels (and, presumably, _lastChannelSID — confirm in codec.cpp).
epics::pvData::Mutex _channelsMutex;
};
/**
 * Client-side TCP transport.  Tracks its owning TransportClients,
 * performs connection verification, and uses a TimerCallback for
 * periodic liveness (echo/beacon-interval) handling.  Instances are
 * created via create(), which also activates the codec.
 */
class BlockingClientTCPTransportCodec :
public BlockingTCPTransportCodec,
public TransportSender,
public epics::pvData::TimerCallback {
public:
POINTER_DEFINITIONS(BlockingClientTCPTransportCodec);
protected:
BlockingClientTCPTransportCodec(
Context::shared_pointer const & context,
SOCKET channel,
std::auto_ptr<ResponseHandler>& responseHandler,
int32_t sendBufferSize,
int32_t receiveBufferSize,
TransportClient::shared_pointer const & client,
epics::pvData::int8 remoteTransportRevision,
float beaconInterval,
int16_t priority);
public:
// Factory: constructs, registers and starts a client transport.
static shared_pointer create(
Context::shared_pointer const & context,
SOCKET channel,
std::auto_ptr<ResponseHandler>& responseHandler,
int32_t sendBufferSize,
int32_t receiveBufferSize,
TransportClient::shared_pointer const & client,
int8_t remoteTransportRevision,
float beaconInterval,
int16_t priority )
{
shared_pointer thisPointer(
new BlockingClientTCPTransportCodec(
context, channel, responseHandler,
sendBufferSize, receiveBufferSize,
client, remoteTransportRevision,
beaconInterval, priority)
);
thisPointer->activate();
return thisPointer;
}
public:
void start();
virtual ~BlockingClientTCPTransportCodec();
virtual void timerStopped() {
// noop
}
// Periodic timer callback (TimerCallback interface).
virtual void callback();
// Adds a client as an owner of this transport.
bool acquire(TransportClient::shared_pointer const & client);
// Removes an owner by its client id.
void release(pvAccessID clientId);
void changedTransport();
void lock() {
// noop
}
void unlock() {
// noop
}
// Blocks up to timeoutMs for the server's verification response.
bool verify(epics::pvData::int32 timeoutMs);
void verified();
void aliveNotification();
void send(epics::pvData::ByteBuffer* buffer,
TransportSendControl* control);
protected:
virtual void internalClose(bool force);
virtual void internalPostClose(bool force);
private:
/**
 * Owners (users) of the transport.
 */
// TODO consider using TR1 hash map
typedef std::map<pvAccessID, TransportClient::weak_pointer> TransportClientMap_t;
TransportClientMap_t _owners;
/**
 * Connection timeout (no-traffic) flag.
 */
double _connectionTimeout;
/**
 * Unresponsive transport flag.
 */
bool _unresponsiveTransport;
/**
 * Timestamp of last "live" event on this transport.
 */
epicsTimeStamp _aliveTimestamp;
// Whether the next periodic message is a verify or an echo —
// presumably toggled after first verification; confirm in codec.cpp.
bool _verifyOrEcho;
/**
 * Unresponsive transport notify.
 */
void unresponsiveTransport();
/**
 * Notifies clients about disconnect.
 */
void closedNotifyClients();
/**
 * Responsive transport notify.
 */
void responsiveTransport();
epics::pvData::Mutex _mutex;
// Verification handshake state: flag, its guard, and the event that
// verify() waits on.
bool _verified;
epics::pvData::Mutex _verifiedMutex;
epics::pvData::Event _verifiedEvent;
};
}
}
}
#endif /* CODEC_H_ */

View File

@@ -4117,7 +4117,7 @@ TODO
auto_ptr<ResponseHandler> handler(new ClientResponseHandler(shared_from_this()));
Transport::shared_pointer t = m_connector->connect(client, handler, *serverAddress, minorRevision, priority);
// TODO !!!
static_pointer_cast<BlockingTCPTransport>(t)->setFlushStrategy(m_flushStrategy);
//static_pointer_cast<BlockingTCPTransport>(t)->setFlushStrategy(m_flushStrategy);
return t;
}
catch (...)

View File

@@ -204,15 +204,15 @@ void ServerContextImpl::internalInitialize()
_timer.reset(new Timer("pvAccess-server timer", lowerPriority));
_transportRegistry.reset(new TransportRegistry());
// setup broadcast UDP transport
initializeBroadcastTransport();
ServerContextImpl::shared_pointer thisServerContext = shared_from_this();
_acceptor.reset(new BlockingTCPAcceptor(thisServerContext, thisServerContext, _serverPort, _receiveBufferSize));
_serverPort = ntohs(_acceptor->getBindAddress()->ia.sin_port);
_beaconEmitter.reset(new BeaconEmitter(_broadcastTransport, thisServerContext));
// setup broadcast UDP transport
initializeBroadcastTransport();
_beaconEmitter.reset(new BeaconEmitter(_broadcastTransport, thisServerContext));
}
void ServerContextImpl::initializeBroadcastTransport()

View File

@@ -23,12 +23,9 @@ pvAccessApp/mb/pvAccessMB.h
pvAccessApp/remote/abstractResponseHandler.cpp
pvAccessApp/remote/beaconHandler.cpp
pvAccessApp/remote/beaconHandler.h
pvAccessApp/remote/blockingClientTCPTransport.cpp
pvAccessApp/remote/blockingServerTCPTransport.cpp
pvAccessApp/remote/blockingTCP.h
pvAccessApp/remote/blockingTCPAcceptor.cpp
pvAccessApp/remote/blockingTCPConnector.cpp
pvAccessApp/remote/blockingTCPTransport.cpp
pvAccessApp/remote/blockingUDP.h
pvAccessApp/remote/blockingUDPConnector.cpp
pvAccessApp/remote/blockingUDPTransport.cpp
@@ -40,6 +37,8 @@ pvAccessApp/remote/simpleChannelSearchManagerImpl.cpp
pvAccessApp/remote/simpleChannelSearchManagerImpl.h
pvAccessApp/remote/transportRegistry.cpp
pvAccessApp/remote/transportRegistry.h
pvAccessApp/remote/codec.cpp
pvAccessApp/remote/codec.h
pvAccessApp/remoteClient/clientContextImpl.cpp
pvAccessApp/remoteClient/clientContextImpl.h
pvAccessApp/remoteClient/clientContextImpl.h.orig
@@ -101,6 +100,8 @@ testApp/remote/testNTImage.cpp
testApp/remote/testRemoteClientImpl.cpp
testApp/remote/testServer.cpp
testApp/remote/testServerContext.cpp
testApp/remote/testChannelAccess.cpp
testApp/remote/testCodec.cpp
testApp/utils/testAtomicBoolean.cpp
testApp/utils/configurationTest.cpp
testApp/utils/testHexDump.cpp

View File

@@ -7,4 +7,6 @@
/home/msekoranja/epicsV4/pvAccessCPP/pvAccessApp/rpcService
/home/msekoranja/epicsV4/pvAccessCPP/pvAccessApp/server
/home/msekoranja/epicsV4/pvAccessCPP/pvAccessApp/utils
/home/msekoranja/epicsV4/pvAccessCPP/testApp/remote
/home/msekoranja/epicsV4/pvAccessCPP/testApp/remote
/home/msekoranja/epicsV4/pvAccessCPP/testApp/client
/home/msekoranja/epicsV4/pvAccessCPP/testApp/utils

View File

@@ -51,6 +51,12 @@ testChannelAccess_SRCS = testChannelAccess channelAccessIFTest
testChannelAccess_LIBS += pvAccess pvData pvMB Com
TESTS += testChannelAccess
TESTPROD_HOST += testCodec
testCodec_SRCS = testCodec
testCodec_LIBS += pvData pvAccess pvMB Com
TESTS += testCodec
PROD_HOST += pvget
pvget_SRCS += pvget.cpp
pvget_LIBS += pvAccess pvData pvMB Com

View File

@@ -432,7 +432,6 @@ void ChannelAccessIFTest::test_channel() {
testOk(!channel->isConnected(), "%s: yet again destroyed channel should not be connected ", CURRENT_FUNCTION);
testOk(channel->getConnectionState() == Channel::DESTROYED ,
"%s: yet again destroyed channel connection state DESTROYED ", CURRENT_FUNCTION);
}

View File

@@ -88,6 +88,7 @@ class ChannelAccessIFRemoteTest: public ChannelAccessIFTest {
MAIN(testChannelProvider)
{
SET_LOG_LEVEL(logLevelError);
ChannelAccessIFRemoteTest caRemoteTest;
return caRemoteTest.runAllTest();
}

3203
testApp/remote/testCodec.cpp Normal file

File diff suppressed because it is too large Load Diff

View File

@@ -1541,21 +1541,45 @@ public:
}
else if (m_channelName == "testSum") {
int a = pvArgument->getIntField("a")->get();
int b = pvArgument->getIntField("b")->get();
PVStructure::shared_pointer args(
(pvArgument->getStructure()->getID() == "uri:ev4:nt/2012/pwd:NTURI") ?
pvArgument->getStructureField("query") :
pvArgument
);
FieldCreatePtr fieldCreate = getFieldCreate();
const String helpText =
"Calculates a sum of two integer values.\n"
"Arguments:\n"
"\tint a\tfirst integer number\n"
"\tint b\tsecond integer number\n";
if (handleHelp(args, m_channelRPCRequester, helpText))
return;
StringArray fieldNames;
fieldNames.push_back("c");
FieldConstPtrArray fields;
fields.push_back(fieldCreate->createScalar(pvInt));
StructureConstPtr resultStructure = fieldCreate->createStructure(fieldNames, fields);
PVInt::shared_pointer pa = args->getSubField<PVInt>("a");
PVInt::shared_pointer pb = args->getSubField<PVInt>("b");
if (!pa || !pb)
{
PVStructure::shared_pointer nullPtr;
Status errorStatus(Status::STATUSTYPE_ERROR, "int a and int b arguments are required");
m_channelRPCRequester->requestDone(errorStatus, nullPtr);
return;
}
PVStructure::shared_pointer result = getPVDataCreate()->createPVStructure(resultStructure);
result->getIntField("c")->put(a+b);
int a = pa->get();
int b = pb->get();
m_channelRPCRequester->requestDone(Status::Ok, result);
FieldCreatePtr fieldCreate = getFieldCreate();
StringArray fieldNames;
fieldNames.push_back("c");
FieldConstPtrArray fields;
fields.push_back(fieldCreate->createScalar(pvInt));
StructureConstPtr resultStructure = fieldCreate->createStructure(fieldNames, fields);
PVStructure::shared_pointer result = getPVDataCreate()->createPVStructure(resultStructure);
result->getIntField("c")->put(a+b);
m_channelRPCRequester->requestDone(Status::Ok, result);
}
else if (m_channelName.find("testServerShutdown") == 0)