trace log removal, old codec files removed
This commit is contained in:
@@ -49,10 +49,7 @@ INC += codec.h
|
||||
LIBSRCS += blockingUDPTransport.cpp
|
||||
LIBSRCS += blockingUDPConnector.cpp
|
||||
LIBSRCS += beaconHandler.cpp
|
||||
LIBSRCS += blockingTCPTransport.cpp
|
||||
LIBSRCS += blockingClientTCPTransport.cpp
|
||||
LIBSRCS += blockingTCPConnector.cpp
|
||||
LIBSRCS += blockingServerTCPTransport.cpp
|
||||
LIBSRCS += simpleChannelSearchManagerImpl.cpp
|
||||
LIBSRCS += abstractResponseHandler.cpp
|
||||
LIBSRCS += blockingTCPAcceptor.cpp
|
||||
|
||||
@@ -1,240 +0,0 @@
|
||||
/**
|
||||
* Copyright - See the COPYRIGHT that is included with this distribution.
|
||||
* pvAccessCPP is distributed subject to a Software License Agreement found
|
||||
* in file LICENSE that is included with this distribution.
|
||||
*/
|
||||
|
||||
#include <pv/blockingTCP.h>
|
||||
#include <pv/introspectionRegistry.h>
|
||||
#include <pv/logger.h>
|
||||
|
||||
#include <pv/lock.h>
|
||||
|
||||
#include <set>
|
||||
#include <epicsTime.h>
|
||||
#include <sstream>
|
||||
|
||||
using namespace std;
|
||||
using namespace epics::pvData;
|
||||
|
||||
namespace epics {
|
||||
namespace pvAccess {
|
||||
|
||||
#define EXCEPTION_GUARD(code) try { code; } \
|
||||
catch (std::exception &e) { LOG(logLevelError, "Unhandled exception caught from code at %s:%d: %s", __FILE__, __LINE__, e.what()); } \
|
||||
catch (...) { LOG(logLevelError, "Unhandled exception caught from code at %s:%d.", __FILE__, __LINE__); }
|
||||
|
||||
BlockingClientTCPTransport::BlockingClientTCPTransport(
|
||||
Context::shared_pointer const & context, SOCKET channel,
|
||||
auto_ptr<ResponseHandler>& responseHandler, int receiveBufferSize,
|
||||
TransportClient::shared_pointer client, int8 /*remoteTransportRevision*/,
|
||||
float beaconInterval, int16 priority) :
|
||||
BlockingTCPTransport(context, channel, responseHandler, receiveBufferSize, priority),
|
||||
_connectionTimeout(beaconInterval*1000),
|
||||
_unresponsiveTransport(false),
|
||||
_verifyOrEcho(true)
|
||||
{
|
||||
// _autoDelete = false;
|
||||
|
||||
// initialize owners list, send queue
|
||||
acquire(client);
|
||||
|
||||
// use immediate for clients
|
||||
setFlushStrategy(DELAYED);
|
||||
|
||||
// setup connection timeout timer (watchdog)
|
||||
epicsTimeGetCurrent(&_aliveTimestamp);
|
||||
}
|
||||
|
||||
void BlockingClientTCPTransport::start()
|
||||
{
|
||||
TimerCallbackPtr tcb = std::tr1::dynamic_pointer_cast<TimerCallback>(shared_from_this());
|
||||
_context->getTimer()->schedulePeriodic(tcb, _connectionTimeout, _connectionTimeout);
|
||||
BlockingTCPTransport::start();
|
||||
}
|
||||
|
||||
BlockingClientTCPTransport::~BlockingClientTCPTransport() {
|
||||
}
|
||||
|
||||
void BlockingClientTCPTransport::callback() {
|
||||
epicsTimeStamp currentTime;
|
||||
epicsTimeGetCurrent(¤tTime);
|
||||
|
||||
_mutex.lock();
|
||||
// no exception expected here
|
||||
double diff = epicsTimeDiffInSeconds(¤tTime, &_aliveTimestamp);
|
||||
_mutex.unlock();
|
||||
|
||||
if(diff>2*_connectionTimeout) {
|
||||
unresponsiveTransport();
|
||||
}
|
||||
// use some k (3/4) to handle "jitter"
|
||||
else if(diff>=((3*_connectionTimeout)/4)) {
|
||||
// send echo
|
||||
TransportSender::shared_pointer transportSender = std::tr1::dynamic_pointer_cast<TransportSender>(shared_from_this());
|
||||
enqueueSendRequest(transportSender);
|
||||
}
|
||||
}
|
||||
|
||||
void BlockingClientTCPTransport::unresponsiveTransport() {
|
||||
Lock lock(_mutex);
|
||||
if(!_unresponsiveTransport) {
|
||||
_unresponsiveTransport = true;
|
||||
|
||||
TransportClientMap_t::iterator it = _owners.begin();
|
||||
for(; it!=_owners.end(); it++) {
|
||||
TransportClient::shared_pointer client = it->second.lock();
|
||||
if (client)
|
||||
{
|
||||
EXCEPTION_GUARD(client->transportUnresponsive());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
bool BlockingClientTCPTransport::acquire(TransportClient::shared_pointer const & client) {
|
||||
Lock lock(_mutex);
|
||||
if(_closed.get()) return false;
|
||||
|
||||
char ipAddrStr[48];
|
||||
ipAddrToDottedIP(&_socketAddress.ia, ipAddrStr, sizeof(ipAddrStr));
|
||||
LOG(logLevelDebug, "Acquiring transport to %s.", ipAddrStr);
|
||||
|
||||
_owners[client->getID()] = TransportClient::weak_pointer(client);
|
||||
//_owners.insert(TransportClient::weak_pointer(client));
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
// _mutex is held when this method is called
|
||||
void BlockingClientTCPTransport::internalClose(bool forced) {
|
||||
BlockingTCPTransport::internalClose(forced);
|
||||
|
||||
TimerCallbackPtr tcb = std::tr1::dynamic_pointer_cast<TimerCallback>(shared_from_this());
|
||||
_context->getTimer()->cancel(tcb);
|
||||
}
|
||||
|
||||
void BlockingClientTCPTransport::internalPostClose(bool forced) {
|
||||
BlockingTCPTransport::internalPostClose(forced);
|
||||
|
||||
// _owners cannot change when transport is closed
|
||||
closedNotifyClients();
|
||||
}
|
||||
|
||||
/**
|
||||
* Notifies clients about disconnect.
|
||||
*/
|
||||
void BlockingClientTCPTransport::closedNotifyClients() {
|
||||
|
||||
// check if still acquired
|
||||
size_t refs = _owners.size();
|
||||
if(refs>0) {
|
||||
char ipAddrStr[48];
|
||||
ipAddrToDottedIP(&_socketAddress.ia, ipAddrStr, sizeof(ipAddrStr));
|
||||
LOG(
|
||||
logLevelDebug,
|
||||
"Transport to %s still has %d client(s) active and closing...",
|
||||
ipAddrStr, refs);
|
||||
|
||||
TransportClientMap_t::iterator it = _owners.begin();
|
||||
for(; it!=_owners.end(); it++) {
|
||||
TransportClient::shared_pointer client = it->second.lock();
|
||||
if (client)
|
||||
{
|
||||
EXCEPTION_GUARD(client->transportClosed());
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
_owners.clear();
|
||||
}
|
||||
|
||||
//void BlockingClientTCPTransport::release(TransportClient::shared_pointer const & client) {
|
||||
void BlockingClientTCPTransport::release(pvAccessID clientID) {
|
||||
Lock lock(_mutex);
|
||||
if(_closed.get()) return;
|
||||
|
||||
char ipAddrStr[48];
|
||||
ipAddrToDottedIP(&_socketAddress.ia, ipAddrStr, sizeof(ipAddrStr));
|
||||
|
||||
LOG(logLevelDebug, "Releasing transport to %s.", ipAddrStr);
|
||||
|
||||
_owners.erase(clientID);
|
||||
//_owners.erase(TransportClient::weak_pointer(client));
|
||||
|
||||
// not used anymore, close it
|
||||
// TODO consider delayed destruction (can improve performance!!!)
|
||||
if(_owners.size()==0) close(); // TODO close(false)
|
||||
}
|
||||
|
||||
void BlockingClientTCPTransport::aliveNotification() {
|
||||
Lock guard(_mutex);
|
||||
epicsTimeGetCurrent(&_aliveTimestamp);
|
||||
if(_unresponsiveTransport) responsiveTransport();
|
||||
}
|
||||
|
||||
void BlockingClientTCPTransport::responsiveTransport() {
|
||||
Lock lock(_mutex);
|
||||
if(_unresponsiveTransport) {
|
||||
_unresponsiveTransport = false;
|
||||
|
||||
Transport::shared_pointer thisSharedPtr = shared_from_this();
|
||||
TransportClientMap_t::iterator it = _owners.begin();
|
||||
for(; it!=_owners.end(); it++) {
|
||||
TransportClient::shared_pointer client = it->second.lock();
|
||||
if (client)
|
||||
{
|
||||
EXCEPTION_GUARD(client->transportResponsive(thisSharedPtr));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void BlockingClientTCPTransport::changedTransport() {
|
||||
outgoingIR.reset();
|
||||
|
||||
Lock lock(_mutex);
|
||||
TransportClientMap_t::iterator it = _owners.begin();
|
||||
for(; it!=_owners.end(); it++) {
|
||||
TransportClient::shared_pointer client = it->second.lock();
|
||||
if (client)
|
||||
{
|
||||
EXCEPTION_GUARD(client->transportChanged());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void BlockingClientTCPTransport::send(ByteBuffer* buffer,
|
||||
TransportSendControl* control) {
|
||||
if(_verifyOrEcho) {
|
||||
/*
|
||||
* send verification response message
|
||||
*/
|
||||
|
||||
control->startMessage(CMD_CONNECTION_VALIDATION, 2*sizeof(int32)+sizeof(int16));
|
||||
|
||||
// receive buffer size
|
||||
buffer->putInt(static_cast<int32>(getReceiveBufferSize()));
|
||||
|
||||
// socket receive buffer size
|
||||
buffer->putInt(static_cast<int32>(getSocketReceiveBufferSize()));
|
||||
|
||||
// connection priority
|
||||
buffer->putShort(getPriority());
|
||||
|
||||
// send immediately
|
||||
control->flush(true);
|
||||
|
||||
_verifyOrEcho = false;
|
||||
}
|
||||
else {
|
||||
control->startMessage(CMD_ECHO, 0);
|
||||
// send immediately
|
||||
control->flush(true);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
@@ -1,136 +0,0 @@
|
||||
/**
|
||||
* Copyright - See the COPYRIGHT that is included with this distribution.
|
||||
* pvAccessCPP is distributed subject to a Software License Agreement found
|
||||
* in file LICENSE that is included with this distribution.
|
||||
*/
|
||||
|
||||
#include <pv/blockingTCP.h>
|
||||
#include <pv/remote.h>
|
||||
#include <pv/logger.h>
|
||||
|
||||
#include <pv/lock.h>
|
||||
#include <pv/byteBuffer.h>
|
||||
|
||||
/* standard */
|
||||
#include <map>
|
||||
|
||||
using namespace epics::pvData;
|
||||
using namespace std;
|
||||
|
||||
namespace epics {
|
||||
namespace pvAccess {
|
||||
|
||||
BlockingServerTCPTransport::BlockingServerTCPTransport(
|
||||
Context::shared_pointer const & context, SOCKET channel,
|
||||
auto_ptr<ResponseHandler>& responseHandler, int receiveBufferSize) :
|
||||
BlockingTCPTransport(context, channel, responseHandler, receiveBufferSize, PVA_DEFAULT_PRIORITY),
|
||||
_lastChannelSID(0)
|
||||
{
|
||||
// for performance testing
|
||||
setFlushStrategy(DELAYED);
|
||||
_delay = 0.000;
|
||||
|
||||
// NOTE: priority not yet known, default priority is used to register/unregister
|
||||
// TODO implement priorities in Reactor... not that user will
|
||||
// change it.. still getPriority() must return "registered" priority!
|
||||
|
||||
//start();
|
||||
}
|
||||
|
||||
BlockingServerTCPTransport::~BlockingServerTCPTransport() {
|
||||
}
|
||||
|
||||
void BlockingServerTCPTransport::destroyAllChannels() {
|
||||
Lock lock(_channelsMutex);
|
||||
if(_channels.size()==0) return;
|
||||
|
||||
char ipAddrStr[64];
|
||||
ipAddrToDottedIP(&_socketAddress.ia, ipAddrStr, sizeof(ipAddrStr));
|
||||
|
||||
LOG(
|
||||
logLevelDebug,
|
||||
"Transport to %s still has %u channel(s) active and closing...",
|
||||
ipAddrStr, (unsigned int)_channels.size());
|
||||
|
||||
map<pvAccessID, ServerChannel::shared_pointer>::iterator it = _channels.begin();
|
||||
for(; it!=_channels.end(); it++)
|
||||
it->second->destroy();
|
||||
|
||||
_channels.clear();
|
||||
}
|
||||
|
||||
void BlockingServerTCPTransport::internalClose(bool force) {
|
||||
Transport::shared_pointer thisSharedPtr = shared_from_this();
|
||||
BlockingTCPTransport::internalClose(force);
|
||||
destroyAllChannels();
|
||||
}
|
||||
|
||||
void BlockingServerTCPTransport::internalPostClose(bool forced) {
|
||||
BlockingTCPTransport::internalPostClose(forced);
|
||||
}
|
||||
|
||||
pvAccessID BlockingServerTCPTransport::preallocateChannelSID() {
|
||||
Lock lock(_channelsMutex);
|
||||
// search first free (theoretically possible loop of death)
|
||||
pvAccessID sid = ++_lastChannelSID;
|
||||
while(_channels.find(sid)!=_channels.end())
|
||||
sid = ++_lastChannelSID;
|
||||
return sid;
|
||||
}
|
||||
|
||||
void BlockingServerTCPTransport::registerChannel(pvAccessID sid, ServerChannel::shared_pointer const & channel) {
|
||||
Lock lock(_channelsMutex);
|
||||
_channels[sid] = channel;
|
||||
}
|
||||
|
||||
void BlockingServerTCPTransport::unregisterChannel(pvAccessID sid) {
|
||||
Lock lock(_channelsMutex);
|
||||
_channels.erase(sid);
|
||||
}
|
||||
|
||||
ServerChannel::shared_pointer BlockingServerTCPTransport::getChannel(pvAccessID sid) {
|
||||
Lock lock(_channelsMutex);
|
||||
|
||||
map<pvAccessID, ServerChannel::shared_pointer>::iterator it = _channels.find(sid);
|
||||
if(it!=_channels.end()) return it->second;
|
||||
|
||||
return ServerChannel::shared_pointer();
|
||||
}
|
||||
|
||||
int BlockingServerTCPTransport::getChannelCount() {
|
||||
Lock lock(_channelsMutex);
|
||||
return static_cast<int>(_channels.size());
|
||||
}
|
||||
|
||||
void BlockingServerTCPTransport::send(ByteBuffer* buffer,
|
||||
TransportSendControl* control) {
|
||||
|
||||
//
|
||||
// set byte order control message
|
||||
//
|
||||
|
||||
control->ensureBuffer(PVA_MESSAGE_HEADER_SIZE);
|
||||
buffer->putByte(PVA_MAGIC);
|
||||
buffer->putByte(PVA_VERSION);
|
||||
buffer->putByte(0x01 | ((EPICS_BYTE_ORDER == EPICS_ENDIAN_BIG) ? 0x80 : 0x00)); // control + big endian
|
||||
buffer->putByte(2); // set byte order
|
||||
buffer->putInt(0);
|
||||
|
||||
|
||||
//
|
||||
// send verification message
|
||||
//
|
||||
control->startMessage(CMD_CONNECTION_VALIDATION, 2*sizeof(int32));
|
||||
|
||||
// receive buffer size
|
||||
buffer->putInt(static_cast<int32>(getReceiveBufferSize()));
|
||||
|
||||
// socket receive buffer size
|
||||
buffer->putInt(static_cast<int32>(getSocketReceiveBufferSize()));
|
||||
|
||||
// send immediately
|
||||
control->flush(true);
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
@@ -40,584 +40,9 @@
|
||||
#include <pv/namedLockPattern.h>
|
||||
#include <pv/inetAddressUtil.h>
|
||||
|
||||
// not implemented anyway
|
||||
#define FLOW_CONTROL 0
|
||||
|
||||
namespace epics {
|
||||
namespace pvAccess {
|
||||
|
||||
//class MonitorSender;
|
||||
|
||||
enum ReceiveStage {
|
||||
READ_FROM_SOCKET, PROCESS_HEADER, PROCESS_PAYLOAD, UNDEFINED_STAGE
|
||||
};
|
||||
|
||||
class BlockingTCPTransport :
|
||||
public Transport,
|
||||
public TransportSendControl,
|
||||
public std::tr1::enable_shared_from_this<BlockingTCPTransport>
|
||||
{
|
||||
protected:
|
||||
BlockingTCPTransport(Context::shared_pointer const & context, SOCKET channel,
|
||||
std::auto_ptr<ResponseHandler>& responseHandler, int receiveBufferSize,
|
||||
epics::pvData::int16 priority);
|
||||
|
||||
public:
|
||||
virtual bool isClosed() {
|
||||
return _closed.get();
|
||||
}
|
||||
|
||||
virtual epics::pvData::int8 getRevision() const {
|
||||
return PVA_PROTOCOL_REVISION;
|
||||
}
|
||||
|
||||
virtual void setRemoteRevision(epics::pvData::int8 revision) {
|
||||
_remoteTransportRevision = revision;
|
||||
}
|
||||
|
||||
virtual void setRemoteTransportReceiveBufferSize(std::size_t remoteTransportReceiveBufferSize) {
|
||||
_remoteTransportReceiveBufferSize = remoteTransportReceiveBufferSize;
|
||||
}
|
||||
|
||||
virtual void setRemoteTransportSocketReceiveBufferSize(std::size_t socketReceiveBufferSize) {
|
||||
_remoteTransportSocketReceiveBufferSize = socketReceiveBufferSize;
|
||||
}
|
||||
|
||||
virtual epics::pvData::String getType() const {
|
||||
return epics::pvData::String("TCP");
|
||||
}
|
||||
|
||||
virtual void aliveNotification() {
|
||||
// noop
|
||||
}
|
||||
|
||||
virtual void changedTransport() {
|
||||
// noop
|
||||
}
|
||||
|
||||
virtual const osiSockAddr* getRemoteAddress() const {
|
||||
return &_socketAddress;
|
||||
}
|
||||
|
||||
virtual epics::pvData::int16 getPriority() const {
|
||||
return _priority;
|
||||
}
|
||||
|
||||
virtual std::size_t getReceiveBufferSize() const {
|
||||
return _socketBuffer->getSize();
|
||||
}
|
||||
|
||||
/**
|
||||
* Get remote transport receive buffer size (in bytes).
|
||||
* @return remote transport receive buffer size
|
||||
*/
|
||||
virtual std::size_t getRemoteTransportReceiveBufferSize() const {
|
||||
return _remoteTransportReceiveBufferSize;
|
||||
}
|
||||
|
||||
virtual std::size_t getSocketReceiveBufferSize() const;
|
||||
|
||||
virtual bool verify(epics::pvData::int32 timeoutMs) {
|
||||
return _verifiedEvent.wait(timeoutMs/1000.0);
|
||||
|
||||
//epics::pvData::Lock lock(_verifiedMutex);
|
||||
//return _verified;
|
||||
}
|
||||
|
||||
virtual void verified() {
|
||||
epics::pvData::Lock lock(_verifiedMutex);
|
||||
_verified = true;
|
||||
_verifiedEvent.signal();
|
||||
}
|
||||
|
||||
virtual void setRecipient(const osiSockAddr& /*sendTo*/) {
|
||||
// noop
|
||||
}
|
||||
|
||||
virtual void flush(bool lastMessageCompleted);
|
||||
virtual void startMessage(epics::pvData::int8 command, std::size_t ensureCapacity);
|
||||
virtual void endMessage();
|
||||
|
||||
virtual void flushSerializeBuffer() {
|
||||
flush(false);
|
||||
}
|
||||
|
||||
virtual void ensureBuffer(std::size_t size);
|
||||
|
||||
virtual void alignBuffer(std::size_t alignment);
|
||||
|
||||
virtual void ensureData(std::size_t size);
|
||||
|
||||
virtual void alignData(std::size_t alignment);
|
||||
|
||||
virtual bool directSerialize(epics::pvData::ByteBuffer *existingBuffer, const char* toSerialize,
|
||||
std::size_t elementCount, std::size_t elementSize);
|
||||
|
||||
virtual bool directDeserialize(epics::pvData::ByteBuffer *existingBuffer, char* deserializeTo,
|
||||
std::size_t elementCount, std::size_t elementSize);
|
||||
|
||||
void processReadIntoDirectBuffer(std::size_t bytesToRead);
|
||||
|
||||
virtual void close();
|
||||
|
||||
virtual void setByteOrder(int /*byteOrder*/)
|
||||
{
|
||||
// not used this this implementation
|
||||
}
|
||||
|
||||
FlushStrategy getFlushStrategy() {
|
||||
return _flushStrategy;
|
||||
}
|
||||
|
||||
void setFlushStrategy(FlushStrategy flushStrategy) {
|
||||
_flushStrategy = flushStrategy;
|
||||
}
|
||||
|
||||
//void requestFlush();
|
||||
|
||||
/**
|
||||
* Close and free connection resources.
|
||||
*/
|
||||
void freeConnectionResorces();
|
||||
|
||||
/**
|
||||
* Starts the receive and send threads
|
||||
*/
|
||||
virtual void start();
|
||||
|
||||
virtual void enqueueSendRequest(TransportSender::shared_pointer const & sender);
|
||||
|
||||
//void enqueueMonitorSendRequest(TransportSender::shared_pointer const & sender);
|
||||
|
||||
virtual void enqueueOnlySendRequest(TransportSender::shared_pointer const & sender);
|
||||
|
||||
virtual void flushSendQueue();
|
||||
|
||||
virtual void cachedSerialize(
|
||||
const std::tr1::shared_ptr<const epics::pvData::Field>& field, epics::pvData::ByteBuffer* buffer)
|
||||
{
|
||||
outgoingIR.serialize(field, buffer, this);
|
||||
}
|
||||
|
||||
virtual std::tr1::shared_ptr<const epics::pvData::Field>
|
||||
cachedDeserialize(epics::pvData::ByteBuffer* buffer)
|
||||
{
|
||||
return incomingIR.deserialize(buffer, this);
|
||||
}
|
||||
|
||||
protected:
|
||||
|
||||
virtual void processReadCached(bool nestedCall,
|
||||
ReceiveStage inStage, std::size_t requiredBytes);
|
||||
|
||||
/**
|
||||
* Called to any resources just before closing transport
|
||||
* @param[in] force flag indicating if forced (e.g. forced
|
||||
* disconnect) is required
|
||||
*/
|
||||
virtual void internalClose(bool force);
|
||||
|
||||
/**
|
||||
* Called to any resources just after closing transport and without any locks held on transport
|
||||
* @param[in] force flag indicating if forced (e.g. forced
|
||||
* disconnect) is required
|
||||
*/
|
||||
virtual void internalPostClose(bool force);
|
||||
|
||||
/**
|
||||
* Send a buffer through the transport.
|
||||
* NOTE: TCP sent buffer/sending has to be synchronized (not done by this method).
|
||||
* @param buffer[in] buffer to be sent
|
||||
* @return success indicator
|
||||
*/
|
||||
virtual bool send(epics::pvData::ByteBuffer* buffer);
|
||||
|
||||
virtual ~BlockingTCPTransport();
|
||||
|
||||
|
||||
#if FLOW_CONTROL
|
||||
/**
|
||||
* Default marker period.
|
||||
*/
|
||||
static const std::size_t MARKER_PERIOD = 1024;
|
||||
#endif
|
||||
static const std::size_t MAX_ENSURE_DATA_BUFFER_SIZE = 1024;
|
||||
|
||||
// TODO
|
||||
double _delay;
|
||||
|
||||
/****** finally initialized at construction time and after start (called by the same thread) ********/
|
||||
|
||||
/**
|
||||
* Corresponding channel.
|
||||
*/
|
||||
SOCKET _channel;
|
||||
|
||||
/**
|
||||
* Cached socket address.
|
||||
*/
|
||||
osiSockAddr _socketAddress;
|
||||
|
||||
/**
|
||||
* Priority.
|
||||
* NOTE: Priority cannot just be changed, since it is registered
|
||||
* in transport registry with given priority.
|
||||
*/
|
||||
epics::pvData::int16 _priority;
|
||||
// TODO to be implemeneted
|
||||
|
||||
/**
|
||||
* PVAS response handler.
|
||||
*/
|
||||
std::auto_ptr<ResponseHandler> _responseHandler;
|
||||
|
||||
// TODO review int vs std::size_t
|
||||
|
||||
/**
|
||||
* Send buffer size.
|
||||
*/
|
||||
std::size_t _maxPayloadSize;
|
||||
|
||||
/**
|
||||
* Send buffer size.
|
||||
*/
|
||||
int _socketSendBufferSize;
|
||||
|
||||
/**
|
||||
* Marker "period" in bytes (every X bytes marker should be set).
|
||||
*/
|
||||
epics::pvData::int64 _markerPeriodBytes;
|
||||
|
||||
|
||||
FlushStrategy _flushStrategy;
|
||||
|
||||
|
||||
epicsThreadId _rcvThreadId;
|
||||
|
||||
epicsThreadId _sendThreadId;
|
||||
|
||||
// TODO
|
||||
//MonitorSender* _monitorSender;
|
||||
|
||||
Context::shared_pointer _context;
|
||||
|
||||
bool _autoDelete;
|
||||
|
||||
|
||||
|
||||
/**** after verification ****/
|
||||
|
||||
/**
|
||||
* Remote side transport revision (minor).
|
||||
*/
|
||||
epics::pvData::int8 _remoteTransportRevision;
|
||||
|
||||
/**
|
||||
* Remote side transport receive buffer size.
|
||||
*/
|
||||
size_t _remoteTransportReceiveBufferSize;
|
||||
|
||||
/**
|
||||
* Remote side transport socket receive buffer size.
|
||||
*/
|
||||
size_t _remoteTransportSocketReceiveBufferSize;
|
||||
|
||||
|
||||
|
||||
/*** send thread only - no need to sync ***/
|
||||
// NOTE: now all send-related external calls are TransportSender IF
|
||||
// and its reference is only valid when called from send thread
|
||||
|
||||
// initialized at construction time
|
||||
std::deque<TransportSender::shared_pointer> _sendQueue;
|
||||
epics::pvData::Mutex _sendQueueMutex;
|
||||
|
||||
// initialized at construction time
|
||||
// std::deque<TransportSender::shared_pointer> _monitorSendQueue;
|
||||
// epics::pvData::Mutex _monitorMutex;
|
||||
|
||||
/**
|
||||
* Send buffer.
|
||||
*/
|
||||
epics::pvData::ByteBuffer* _sendBuffer;
|
||||
|
||||
#if FLOW_CONTROL
|
||||
/**
|
||||
* Next planned marker position.
|
||||
*/
|
||||
epics::pvData::int64 _nextMarkerPosition;
|
||||
#endif
|
||||
|
||||
/**
|
||||
* Send pending flag.
|
||||
*/
|
||||
bool _sendPending;
|
||||
|
||||
/**
|
||||
* Last message start position.
|
||||
*/
|
||||
int _lastMessageStartPosition;
|
||||
|
||||
epics::pvData::int8 _lastSegmentedMessageType;
|
||||
epics::pvData::int8 _lastSegmentedMessageCommand;
|
||||
|
||||
bool _flushRequested;
|
||||
|
||||
int _sendBufferSentPosition;
|
||||
|
||||
epics::pvData::int8 _byteOrderFlag;
|
||||
|
||||
/**
|
||||
* Outgoing (codes generated by this party) introspection registry.
|
||||
*/
|
||||
IntrospectionRegistry outgoingIR;
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
/*** receive thread only - no need to sync ***/
|
||||
|
||||
// initialized at construction time
|
||||
epics::pvData::ByteBuffer* _socketBuffer;
|
||||
|
||||
std::size_t _startPosition;
|
||||
|
||||
std::size_t _storedPayloadSize;
|
||||
std::size_t _storedPosition;
|
||||
std::size_t _storedLimit;
|
||||
|
||||
epics::pvData::int8 _version;
|
||||
epics::pvData::int8 _packetType;
|
||||
epics::pvData::int8 _command;
|
||||
std::size_t _payloadSize;
|
||||
|
||||
ReceiveStage _stage;
|
||||
|
||||
std::size_t _directPayloadRead;
|
||||
char * _directBuffer;
|
||||
|
||||
#if FLOW_CONTROL
|
||||
|
||||
/**
|
||||
* Total bytes received.
|
||||
*/
|
||||
epics::pvData::int64 _totalBytesReceived;
|
||||
#endif
|
||||
|
||||
/**
|
||||
* Incoming (codes generated by other party) introspection registry.
|
||||
*/
|
||||
IntrospectionRegistry incomingIR;
|
||||
|
||||
|
||||
|
||||
/*** send/receive thread shared ***/
|
||||
|
||||
/**
|
||||
* Connection status
|
||||
* NOTE: synced by _mutex
|
||||
*/
|
||||
AtomicBoolean _closed;
|
||||
|
||||
// NOTE: synced by _mutex
|
||||
bool _sendThreadExited;
|
||||
|
||||
epics::pvData::Mutex _mutex;
|
||||
|
||||
|
||||
bool _verified;
|
||||
epics::pvData::Mutex _verifiedMutex;
|
||||
|
||||
|
||||
|
||||
|
||||
epics::pvData::Event _sendQueueEvent;
|
||||
|
||||
epics::pvData::Event _verifiedEvent;
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
#if FLOW_CONTROL
|
||||
/**
|
||||
* Marker to send.
|
||||
* NOTE: synced by _flowControlMutex
|
||||
*/
|
||||
int _markerToSend;
|
||||
|
||||
/**
|
||||
* Total bytes sent.
|
||||
* NOTE: synced by _flowControlMutex
|
||||
*/
|
||||
epics::pvData::int64 _totalBytesSent;
|
||||
|
||||
/**
|
||||
* Calculated remote free buffer size.
|
||||
* NOTE: synced by _flowControlMutex
|
||||
*/
|
||||
epics::pvData::int64 _remoteBufferFreeSpace;
|
||||
|
||||
epics::pvData::Mutex _flowControlMutex;
|
||||
#endif
|
||||
|
||||
private:
|
||||
|
||||
/**
|
||||
* Internal method that clears and releases buffer.
|
||||
* sendLock and sendBufferLock must be hold while calling this method.
|
||||
*/
|
||||
void clearAndReleaseBuffer();
|
||||
|
||||
void endMessage(bool hasMoreSegments);
|
||||
|
||||
bool flush();
|
||||
|
||||
void processSendQueue();
|
||||
|
||||
static void rcvThreadRunner(void* param);
|
||||
|
||||
static void sendThreadRunner(void* param);
|
||||
|
||||
/**
|
||||
* Free all send buffers (return them to the cached buffer allocator).
|
||||
*/
|
||||
void freeSendBuffers();
|
||||
};
|
||||
|
||||
|
||||
class BlockingClientTCPTransport : public BlockingTCPTransport,
|
||||
public TransportSender,
|
||||
public epics::pvData::TimerCallback {
|
||||
|
||||
public:
|
||||
POINTER_DEFINITIONS(BlockingClientTCPTransport);
|
||||
|
||||
private:
|
||||
BlockingClientTCPTransport(Context::shared_pointer const & context, SOCKET channel,
|
||||
std::auto_ptr<ResponseHandler>& responseHandler, int receiveBufferSize,
|
||||
TransportClient::shared_pointer client, epics::pvData::int8 remoteTransportRevision,
|
||||
float beaconInterval, epics::pvData::int16 priority);
|
||||
|
||||
public:
|
||||
static shared_pointer create(Context::shared_pointer const & context, SOCKET channel,
|
||||
std::auto_ptr<ResponseHandler>& responseHandler, int receiveBufferSize,
|
||||
TransportClient::shared_pointer client, epics::pvData::int8 remoteTransportRevision,
|
||||
float beaconInterval, epics::pvData::int16 priority)
|
||||
{
|
||||
shared_pointer thisPointer(
|
||||
new BlockingClientTCPTransport(context, channel, responseHandler, receiveBufferSize,
|
||||
client, remoteTransportRevision, beaconInterval, priority)
|
||||
);
|
||||
thisPointer->start();
|
||||
return thisPointer;
|
||||
}
|
||||
|
||||
virtual void start();
|
||||
|
||||
virtual ~BlockingClientTCPTransport();
|
||||
|
||||
virtual void timerStopped() {
|
||||
// noop
|
||||
}
|
||||
|
||||
virtual void callback();
|
||||
|
||||
/**
|
||||
* Acquires transport.
|
||||
* @param client client (channel) acquiring the transport
|
||||
* @return <code>true</code> if transport was granted, <code>false</code> otherwise.
|
||||
*/
|
||||
virtual bool acquire(TransportClient::shared_pointer const & client);
|
||||
|
||||
/**
|
||||
* Releases transport.
|
||||
* @param client client (channel) releasing the transport
|
||||
*/
|
||||
virtual void release(pvAccessID clientId);
|
||||
//virtual void release(TransportClient::shared_pointer const & client);
|
||||
|
||||
/**
|
||||
* Alive notification.
|
||||
* This method needs to be called (by newly received data or beacon)
|
||||
* at least once in this period, if not echo will be issued
|
||||
* and if there is not response to it, transport will be considered as unresponsive.
|
||||
*/
|
||||
virtual void aliveNotification();
|
||||
|
||||
/**
|
||||
* Changed transport (server restared) notify.
|
||||
*/
|
||||
virtual void changedTransport();
|
||||
|
||||
virtual void lock() {
|
||||
// noop
|
||||
}
|
||||
|
||||
virtual void unlock() {
|
||||
// noop
|
||||
}
|
||||
|
||||
virtual void acquire() {
|
||||
// noop, since does not make sence on itself
|
||||
}
|
||||
|
||||
virtual void release() {
|
||||
// noop, since does not make sence on itself
|
||||
}
|
||||
|
||||
virtual void send(epics::pvData::ByteBuffer* buffer,
|
||||
TransportSendControl* control);
|
||||
|
||||
protected:
|
||||
|
||||
virtual void internalClose(bool force);
|
||||
virtual void internalPostClose(bool force);
|
||||
|
||||
private:
|
||||
|
||||
/**
|
||||
* Owners (users) of the transport.
|
||||
*/
|
||||
// TODO consider using TR1 hash map
|
||||
typedef std::map<pvAccessID, TransportClient::weak_pointer> TransportClientMap_t;
|
||||
TransportClientMap_t _owners;
|
||||
|
||||
/**
|
||||
* Connection timeout (no-traffic) flag.
|
||||
*/
|
||||
double _connectionTimeout;
|
||||
|
||||
/**
|
||||
* Unresponsive transport flag.
|
||||
*/
|
||||
bool _unresponsiveTransport;
|
||||
|
||||
/**
|
||||
* Timestamp of last "live" event on this transport.
|
||||
*/
|
||||
epicsTimeStamp _aliveTimestamp;
|
||||
|
||||
bool _verifyOrEcho;
|
||||
|
||||
/**
|
||||
* Unresponsive transport notify.
|
||||
*/
|
||||
void unresponsiveTransport();
|
||||
|
||||
/**
|
||||
* Notifies clients about disconnect.
|
||||
*/
|
||||
void closedNotifyClients();
|
||||
|
||||
/**
|
||||
* Responsive transport notify.
|
||||
*/
|
||||
void responsiveTransport();
|
||||
};
|
||||
|
||||
/**
|
||||
* Channel Access TCP connector.
|
||||
* @author <a href="mailto:matej.sekoranjaATcosylab.com">Matej Sekoranja</a>
|
||||
@@ -672,152 +97,6 @@ namespace epics {
|
||||
|
||||
};
|
||||
|
||||
class BlockingServerTCPTransport : public BlockingTCPTransport,
|
||||
public ChannelHostingTransport,
|
||||
public TransportSender {
|
||||
public:
|
||||
POINTER_DEFINITIONS(BlockingServerTCPTransport);
|
||||
|
||||
private:
|
||||
BlockingServerTCPTransport(Context::shared_pointer const & context, SOCKET channel,
|
||||
std::auto_ptr<ResponseHandler>& responseHandler, int receiveBufferSize);
|
||||
public:
|
||||
static shared_pointer create(Context::shared_pointer const & context, SOCKET channel,
|
||||
std::auto_ptr<ResponseHandler>& responseHandler, int receiveBufferSize)
|
||||
{
|
||||
shared_pointer thisPointer(
|
||||
new BlockingServerTCPTransport(context, channel, responseHandler, receiveBufferSize)
|
||||
);
|
||||
thisPointer->start();
|
||||
return thisPointer;
|
||||
}
|
||||
|
||||
virtual bool acquire(std::tr1::shared_ptr<TransportClient> const & /*client*/)
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
virtual void release(pvAccessID /*clientId*/) {}
|
||||
|
||||
/**
|
||||
* Preallocate new channel SID.
|
||||
* @return new channel server id (SID).
|
||||
*/
|
||||
virtual pvAccessID preallocateChannelSID();
|
||||
|
||||
/**
|
||||
* De-preallocate new channel SID.
|
||||
* @param sid preallocated channel SID.
|
||||
*/
|
||||
virtual void depreallocateChannelSID(pvAccessID /*sid*/) {
|
||||
// noop
|
||||
}
|
||||
|
||||
/**
|
||||
* Register a new channel.
|
||||
* @param sid preallocated channel SID.
|
||||
* @param channel channel to register.
|
||||
*/
|
||||
virtual void registerChannel(pvAccessID sid, ServerChannel::shared_pointer const & channel);
|
||||
|
||||
/**
|
||||
* Unregister a new channel (and deallocates its handle).
|
||||
* @param sid SID
|
||||
*/
|
||||
virtual void unregisterChannel(pvAccessID sid);
|
||||
|
||||
/**
|
||||
* Get channel by its SID.
|
||||
* @param sid channel SID
|
||||
* @return channel with given SID, <code>NULL</code> otherwise
|
||||
*/
|
||||
virtual ServerChannel::shared_pointer getChannel(pvAccessID sid);
|
||||
|
||||
/**
|
||||
* Get channel count.
|
||||
* @return channel count.
|
||||
*/
|
||||
virtual int getChannelCount();
|
||||
|
||||
virtual epics::pvData::PVField::shared_pointer getSecurityToken() {
|
||||
return epics::pvData::PVField::shared_pointer();
|
||||
}
|
||||
|
||||
virtual void lock() {
|
||||
// noop
|
||||
}
|
||||
|
||||
virtual void unlock() {
|
||||
// noop
|
||||
}
|
||||
|
||||
virtual void acquire() {
|
||||
// noop, since does not make sence on itself
|
||||
}
|
||||
|
||||
virtual void release() {
|
||||
// noop, since does not make sence on itself
|
||||
}
|
||||
|
||||
/**
|
||||
* Verify transport. Server side is self-verified.
|
||||
*/
|
||||
virtual bool verify(epics::pvData::int32 /*timeoutMs*/) {
|
||||
TransportSender::shared_pointer transportSender = std::tr1::dynamic_pointer_cast<TransportSender>(shared_from_this());
|
||||
enqueueSendRequest(transportSender);
|
||||
verified();
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* PVA connection validation request.
|
||||
* A server sends a validate connection message when it receives a new connection.
|
||||
* The message indicates that the server is ready to receive requests; the client must
|
||||
* not send any messages on the connection until it has received the validate connection message
|
||||
* from the server. No reply to the message is expected by the server.
|
||||
* The purpose of the validate connection message is two-fold:
|
||||
* It informs the client of the protocol version supported by the server.
|
||||
* It prevents the client from writing a request message to its local transport
|
||||
* buffers until after the server has acknowledged that it can actually process the
|
||||
* request. This avoids a race condition caused by the server's TCP/IP stack
|
||||
* accepting connections in its backlog while the server is in the process of shutting down:
|
||||
* if the client were to send a request in this situation, the request
|
||||
* would be lost but the client could not safely re-issue the request because that
|
||||
* might violate at-most-once semantics.
|
||||
* The validate connection message guarantees that a server is not in the middle
|
||||
* of shutting down when the server's TCP/IP stack accepts an incoming connection
|
||||
* and so avoids the race condition.
|
||||
* @see org.epics.ca.impl.remote.TransportSender#send(java.nio.ByteBuffer, org.epics.ca.impl.remote.TransportSendControl)
|
||||
*/
|
||||
virtual void send(epics::pvData::ByteBuffer* buffer,
|
||||
TransportSendControl* control);
|
||||
|
||||
virtual ~BlockingServerTCPTransport();
|
||||
|
||||
protected:
|
||||
|
||||
virtual void internalClose(bool force);
|
||||
virtual void internalPostClose(bool force);
|
||||
|
||||
private:
|
||||
/**
|
||||
* Last SID cache.
|
||||
*/
|
||||
pvAccessID _lastChannelSID;
|
||||
|
||||
/**
|
||||
* Channel table (SID -> channel mapping).
|
||||
*/
|
||||
std::map<pvAccessID, ServerChannel::shared_pointer> _channels;
|
||||
|
||||
epics::pvData::Mutex _channelsMutex;
|
||||
|
||||
/**
|
||||
* Destroy all channels.
|
||||
*/
|
||||
void destroyAllChannels();
|
||||
};
|
||||
|
||||
class ResponseHandlerFactory
|
||||
{
|
||||
public:
|
||||
|
||||
@@ -191,15 +191,12 @@ namespace pvAccess {
|
||||
LOG(logLevelDebug, "Error getting SO_SNDBUF: %s", strBuffer);
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Create transport, it registers itself to the registry.
|
||||
* Each transport should have its own response handler since it is not "shareable"
|
||||
*/
|
||||
// TODO it is shareable?!!! but code is not adopted to it...
|
||||
std::auto_ptr<ResponseHandler> responseHandler = _responseHandlerFactory->createResponseHandler();
|
||||
BlockingServerTCPTransportCodec::shared_pointer transport =
|
||||
BlockingServerTCPTransportCodec::create(
|
||||
detail::BlockingServerTCPTransportCodec::shared_pointer transport =
|
||||
detail::BlockingServerTCPTransportCodec::create(
|
||||
_context,
|
||||
newClient,
|
||||
responseHandler,
|
||||
|
||||
@@ -150,7 +150,7 @@ namespace epics {
|
||||
LOG(logLevelDebug, "Error getting SO_SNDBUF: %s", strBuffer);
|
||||
}
|
||||
|
||||
transport = BlockingClientTCPTransportCodec::create(
|
||||
transport = detail::BlockingClientTCPTransportCodec::create(
|
||||
context, socket, responseHandler, _receiveBufferSize, _socketSendBufferSize,
|
||||
client, transportRevision, _beaconInterval, priority);
|
||||
|
||||
|
||||
File diff suppressed because it is too large
Load Diff
@@ -21,7 +21,6 @@
|
||||
#include <stdexcept>
|
||||
#include <limits>
|
||||
|
||||
|
||||
#include <pv/codec.h>
|
||||
|
||||
using namespace epics::pvData;
|
||||
@@ -30,6 +29,7 @@ using namespace epics::pvAccess;
|
||||
|
||||
namespace epics {
|
||||
namespace pvAccess {
|
||||
namespace detail {
|
||||
|
||||
const std::size_t AbstractCodec::MAX_MESSAGE_PROCESS = 100;
|
||||
const std::size_t AbstractCodec::MAX_MESSAGE_SEND = 100;
|
||||
@@ -91,16 +91,10 @@ namespace epics {
|
||||
_sendBuffer->getSize() - 2*PVA_MESSAGE_HEADER_SIZE;
|
||||
_socketSendBufferSize = socketSendBufferSize;
|
||||
_blockingProcessQueue = blockingProcessQueue;
|
||||
LOG(logLevelTrace, "AbstractCodec constructed (threadId: %u)",
|
||||
epicsThreadGetIdSelf());
|
||||
}
|
||||
|
||||
|
||||
void AbstractCodec::processRead() {
|
||||
|
||||
LOG(logLevelTrace, "AbstractCodec::processRead: enter (threadId: %u)",
|
||||
epicsThreadGetIdSelf());
|
||||
|
||||
switch (_readMode)
|
||||
{
|
||||
case NORMAL:
|
||||
@@ -118,10 +112,6 @@ namespace epics {
|
||||
|
||||
void AbstractCodec::processHeader() {
|
||||
|
||||
LOG(logLevelTrace, "AbstractCodec::processHeader enter (threadId: %u)",
|
||||
epicsThreadGetIdSelf());
|
||||
|
||||
|
||||
// magic code
|
||||
int8_t magicCode = _socketBuffer->getByte();
|
||||
|
||||
@@ -153,10 +143,6 @@ namespace epics {
|
||||
|
||||
void AbstractCodec::processReadNormal() {
|
||||
|
||||
LOG(logLevelTrace,
|
||||
"AbstractCodec::processReadNormal enter (threadId: %u)",
|
||||
epicsThreadGetIdSelf());
|
||||
|
||||
try
|
||||
{
|
||||
std::size_t messageProcessCount = 0;
|
||||
@@ -286,10 +272,6 @@ namespace epics {
|
||||
|
||||
void AbstractCodec::processReadSegmented() {
|
||||
|
||||
LOG(logLevelTrace,
|
||||
"AbstractCodec::processReadSegmented enter (threadId: %u)",
|
||||
epicsThreadGetIdSelf());
|
||||
|
||||
while (true)
|
||||
{
|
||||
// read as much as available, but at least for a header
|
||||
@@ -335,18 +317,9 @@ namespace epics {
|
||||
std::size_t requiredBytes,
|
||||
bool persistent) {
|
||||
|
||||
LOG(logLevelTrace,
|
||||
"AbstractCodec::readToBuffer enter requiredBytes: %u,"
|
||||
" persistant: %d (threadId: %u)",
|
||||
requiredBytes, persistent, epicsThreadGetIdSelf());
|
||||
|
||||
// do we already have requiredBytes available?
|
||||
std::size_t remainingBytes = _socketBuffer->getRemaining();
|
||||
if (remainingBytes >= requiredBytes) {
|
||||
LOG(logLevelTrace,
|
||||
"AbstractCodec::readToBuffer requiredBytes: %u"
|
||||
" <= remainingBytes: %d (threadId: %u)",
|
||||
requiredBytes, remainingBytes);
|
||||
return true;
|
||||
}
|
||||
|
||||
@@ -378,17 +351,8 @@ namespace epics {
|
||||
{
|
||||
int bytesRead = read(_socketBuffer.get());
|
||||
|
||||
LOG(logLevelTrace,
|
||||
"AbstractCodec::readToBuffer READ BYTES: %d (threadId: %u)",
|
||||
bytesRead, epicsThreadGetIdSelf());
|
||||
|
||||
if (bytesRead < 0)
|
||||
{
|
||||
|
||||
LOG(logLevelTrace,
|
||||
"AbstractCodec::before close on bytesRead < 0 condition (threadId: %u)",
|
||||
epicsThreadGetIdSelf());
|
||||
|
||||
close();
|
||||
throw connection_closed_exception("bytesRead < 0");
|
||||
}
|
||||
@@ -418,11 +382,6 @@ namespace epics {
|
||||
|
||||
void AbstractCodec::ensureData(std::size_t size) {
|
||||
|
||||
LOG(logLevelTrace,
|
||||
"AbstractCodec::ensureData enter: size: %u (threadId: %u)",
|
||||
size, epicsThreadGetIdSelf());
|
||||
|
||||
|
||||
// enough of data?
|
||||
if (_socketBuffer->getRemaining() >= size)
|
||||
return;
|
||||
@@ -553,11 +512,6 @@ namespace epics {
|
||||
std::size_t value,
|
||||
std::size_t alignment) {
|
||||
|
||||
LOG(logLevelTrace,
|
||||
"AbstractCodec::alignedValue enter: value: %u, alignment:%u"
|
||||
" (threadId: %u)",
|
||||
value, alignment, epicsThreadGetIdSelf());
|
||||
|
||||
std::size_t k = (alignment - 1);
|
||||
return (value + k) & (~k);
|
||||
}
|
||||
@@ -565,10 +519,6 @@ namespace epics {
|
||||
|
||||
void AbstractCodec::alignData(std::size_t alignment) {
|
||||
|
||||
LOG(logLevelTrace,
|
||||
"AbstractCodec::alignData enter: alignment:%u (threadId: %u)",
|
||||
alignment, epicsThreadGetIdSelf());
|
||||
|
||||
std::size_t k = (alignment - 1);
|
||||
std::size_t pos = _socketBuffer->getPosition();
|
||||
std::size_t newpos = (pos + k) & (~k);
|
||||
@@ -592,10 +542,6 @@ namespace epics {
|
||||
|
||||
void AbstractCodec::alignBuffer(std::size_t alignment) {
|
||||
|
||||
LOG(logLevelTrace, "AbstractCodec::alignBuffer enter:"
|
||||
" alignment:%u (threadId: %u)",
|
||||
alignment, epicsThreadGetIdSelf());
|
||||
|
||||
std::size_t k = (alignment - 1);
|
||||
std::size_t pos = _sendBuffer->getPosition();
|
||||
std::size_t newpos = (pos + k) & (~k);
|
||||
@@ -612,11 +558,6 @@ namespace epics {
|
||||
epics::pvData::int8 command,
|
||||
std::size_t ensureCapacity) {
|
||||
|
||||
LOG(logLevelTrace,
|
||||
"AbstractCodec::startMessage enter: command:%x "
|
||||
" ensureCapacity:%u (threadId: %u)",
|
||||
command, ensureCapacity, epicsThreadGetIdSelf());
|
||||
|
||||
_lastMessageStartPosition =
|
||||
std::numeric_limits<size_t>::max(); // TODO revise this
|
||||
ensureBuffer(
|
||||
@@ -640,11 +581,6 @@ namespace epics {
|
||||
epics::pvData::int8 command,
|
||||
epics::pvData::int32 data) {
|
||||
|
||||
LOG(logLevelTrace,
|
||||
"AbstractCodec::putControlMessage enter: command:%x "
|
||||
"data:%d (threadId: %u)",
|
||||
command, data, epicsThreadGetIdSelf());
|
||||
|
||||
_lastMessageStartPosition =
|
||||
std::numeric_limits<size_t>::max(); // TODO revise this
|
||||
ensureBuffer(PVA_MESSAGE_HEADER_SIZE);
|
||||
@@ -657,20 +593,12 @@ namespace epics {
|
||||
|
||||
|
||||
void AbstractCodec::endMessage() {
|
||||
|
||||
LOG(logLevelTrace, "AbstractCodec::endMessage enter: (threadId: %u)",
|
||||
epicsThreadGetIdSelf());
|
||||
|
||||
endMessage(false);
|
||||
}
|
||||
|
||||
|
||||
void AbstractCodec::endMessage(bool hasMoreSegments) {
|
||||
|
||||
LOG(logLevelTrace,
|
||||
"AbstractCodec::endMessage enter: hasMoreSegments:%d (threadId: %u)",
|
||||
hasMoreSegments, epicsThreadGetIdSelf());
|
||||
|
||||
if (_lastMessageStartPosition != std::numeric_limits<size_t>::max())
|
||||
{
|
||||
std::size_t lastPayloadBytePosition = _sendBuffer->getPosition();
|
||||
@@ -739,10 +667,6 @@ namespace epics {
|
||||
|
||||
void AbstractCodec::ensureBuffer(std::size_t size) {
|
||||
|
||||
LOG(logLevelTrace,
|
||||
"AbstractCodec::ensureBuffer enter: size:%u (threadId: %u)",
|
||||
size, epicsThreadGetIdSelf());
|
||||
|
||||
if (_sendBuffer->getRemaining() >= size)
|
||||
return;
|
||||
|
||||
@@ -763,21 +687,12 @@ namespace epics {
|
||||
|
||||
// assumes startMessage was called (or header is in place), because endMessage(true) is later called that peeks and sets _lastSegmentedMessageType
|
||||
void AbstractCodec::flushSerializeBuffer() {
|
||||
|
||||
LOG(logLevelTrace,
|
||||
"AbstractCodec::flushSerializeBuffer enter: (threadId: %u)",
|
||||
epicsThreadGetIdSelf());
|
||||
|
||||
flush(false);
|
||||
}
|
||||
|
||||
|
||||
void AbstractCodec::flush(bool lastMessageCompleted) {
|
||||
|
||||
LOG(logLevelTrace,
|
||||
"AbstractCodec::flush enter: lastMessageCompleted:%d (threadId: %u)",
|
||||
lastMessageCompleted, epicsThreadGetIdSelf());
|
||||
|
||||
// automatic end
|
||||
endMessage(!lastMessageCompleted);
|
||||
|
||||
@@ -807,10 +722,6 @@ namespace epics {
|
||||
|
||||
void AbstractCodec::processWrite() {
|
||||
|
||||
LOG(logLevelTrace,
|
||||
"AbstractCodec::processWrite enter: (threadId: %u)",
|
||||
epicsThreadGetIdSelf());
|
||||
|
||||
// TODO catch ConnectionClosedException, InvalidStreamException?
|
||||
switch (_writeMode)
|
||||
{
|
||||
@@ -827,10 +738,6 @@ namespace epics {
|
||||
void AbstractCodec::send(ByteBuffer *buffer)
|
||||
{
|
||||
|
||||
LOG(logLevelTrace, "AbstractCodec::send enter: (threadId: %u)",
|
||||
epicsThreadGetIdSelf());
|
||||
|
||||
|
||||
// On Windows, limiting the buffer size is important to prevent
|
||||
// poor throughput performances when transferring large amount of
|
||||
// data. See Microsoft KB article KB823764.
|
||||
@@ -856,11 +763,13 @@ namespace epics {
|
||||
//int p = buffer.position();
|
||||
int bytesSent = write(buffer);
|
||||
|
||||
/*
|
||||
if (IS_LOGGABLE(logLevelTrace)) {
|
||||
hexDump(std::string("AbstractCodec::send WRITE"),
|
||||
(const int8 *)buffer->getArray(),
|
||||
buffer->getPosition(), buffer->getRemaining());
|
||||
}
|
||||
*/
|
||||
|
||||
if (bytesSent < 0)
|
||||
{
|
||||
@@ -894,10 +803,6 @@ namespace epics {
|
||||
void AbstractCodec::processSendQueue()
|
||||
{
|
||||
|
||||
LOG(logLevelTrace,
|
||||
"AbstractCodec::processSendQueue enter: (threadId: %u)",
|
||||
epicsThreadGetIdSelf());
|
||||
|
||||
{
|
||||
std::size_t senderProcessed = 0;
|
||||
while (senderProcessed++ < MAX_MESSAGE_SEND)
|
||||
@@ -935,22 +840,12 @@ namespace epics {
|
||||
|
||||
void AbstractCodec::clearSendQueue()
|
||||
{
|
||||
LOG(logLevelTrace,
|
||||
"AbstractCodec::clearSendQueue enter: (threadId: %u)",
|
||||
epicsThreadGetIdSelf());
|
||||
|
||||
_sendQueue.clean();
|
||||
}
|
||||
|
||||
|
||||
void AbstractCodec::enqueueSendRequest(
|
||||
TransportSender::shared_pointer const & sender) {
|
||||
|
||||
LOG(logLevelTrace,
|
||||
"AbstractCodec::enqueueSendRequest enter: sender is set:%d"
|
||||
" (threadId: %u)",
|
||||
(sender.get() != 0), epicsThreadGetIdSelf());
|
||||
|
||||
_sendQueue.put(sender);
|
||||
scheduleSend();
|
||||
}
|
||||
@@ -958,10 +853,6 @@ namespace epics {
|
||||
|
||||
void AbstractCodec::setSenderThread()
|
||||
{
|
||||
LOG(logLevelTrace,
|
||||
"AbstractCodec::setSenderThread enter: (threadId: %u)",
|
||||
epicsThreadGetIdSelf());
|
||||
|
||||
_senderThread = epicsThreadGetIdSelf();
|
||||
}
|
||||
|
||||
@@ -970,10 +861,6 @@ namespace epics {
|
||||
TransportSender::shared_pointer const & sender)
|
||||
{
|
||||
|
||||
LOG(logLevelTrace,
|
||||
"AbstractCodec::processSender enter: sender is set:%d (threadId: %u)",
|
||||
(sender.get() != 0), epicsThreadGetIdSelf);
|
||||
|
||||
ScopedLock lock(sender);
|
||||
|
||||
try {
|
||||
@@ -1007,11 +894,6 @@ namespace epics {
|
||||
TransportSender::shared_pointer const & sender,
|
||||
std::size_t requiredBufferSize) {
|
||||
|
||||
LOG(logLevelTrace,
|
||||
"AbstractCodec::enqueueSendRequest enter: sender is set:%d "
|
||||
"requiredBufferSize:%u (threadId: %u)",
|
||||
(sender.get() != 0), requiredBufferSize, epicsThreadGetIdSelf);
|
||||
|
||||
if (_senderThread == epicsThreadGetIdSelf() &&
|
||||
_sendQueue.empty() &&
|
||||
_sendBuffer->getRemaining() >= requiredBufferSize)
|
||||
@@ -1031,22 +913,12 @@ namespace epics {
|
||||
|
||||
|
||||
void AbstractCodec::setRecipient(osiSockAddr const & sendTo) {
|
||||
|
||||
LOG(logLevelTrace,
|
||||
"AbstractCodec::setRecipient enter: (threadId: %u)",
|
||||
epicsThreadGetIdSelf);
|
||||
|
||||
_sendTo = sendTo;
|
||||
}
|
||||
|
||||
|
||||
void AbstractCodec::setByteOrder(int byteOrder)
|
||||
{
|
||||
|
||||
LOG(logLevelTrace,
|
||||
"AbstractCodec::setByteOrder enter: byteOrder:%x (threadId: %u)",
|
||||
byteOrder, epicsThreadGetIdSelf());
|
||||
|
||||
_socketBuffer->setEndianess(byteOrder);
|
||||
// TODO sync
|
||||
_sendBuffer->setEndianess(byteOrder);
|
||||
@@ -1063,40 +935,21 @@ namespace epics {
|
||||
//
|
||||
|
||||
void BlockingAbstractCodec::readPollOne() {
|
||||
|
||||
LOG(logLevelTrace,
|
||||
"BlockingAbstractCodec::readPollOne enter: (threadId: %u)",
|
||||
epicsThreadGetIdSelf());
|
||||
|
||||
throw std::logic_error("should not be called for blocking IO");
|
||||
}
|
||||
|
||||
|
||||
void BlockingAbstractCodec::writePollOne() {
|
||||
|
||||
LOG(logLevelTrace,
|
||||
"BlockingAbstractCodec::writePollOne enter: (threadId: %u)",
|
||||
epicsThreadGetIdSelf());
|
||||
|
||||
throw std::logic_error("should not be called for blocking IO");
|
||||
}
|
||||
|
||||
|
||||
void BlockingAbstractCodec::close() {
|
||||
|
||||
LOG(logLevelTrace,
|
||||
"BlockingAbstractCodec::close enter: (threadId: %u)",
|
||||
epicsThreadGetIdSelf());
|
||||
|
||||
|
||||
if (_isOpen.getAndSet(false))
|
||||
{
|
||||
// always close in the same thread, same way, etc.
|
||||
// wakeup processSendQueue
|
||||
LOG(logLevelTrace,
|
||||
"BlockingAbstractCodec::close _sendQueue.waaaaakeup: "
|
||||
" (threadId: %u)",
|
||||
epicsThreadGetIdSelf());
|
||||
|
||||
// clean resources
|
||||
internalClose(true);
|
||||
@@ -1106,12 +959,6 @@ namespace epics {
|
||||
// post close
|
||||
internalPostClose(true);
|
||||
}
|
||||
else {
|
||||
LOG(logLevelTrace,
|
||||
"BlockingAbstractCodec::close NOT WAKING UP _sendQueue: "
|
||||
" (threadId: %u)",
|
||||
epicsThreadGetIdSelf());
|
||||
}
|
||||
}
|
||||
|
||||
void BlockingAbstractCodec::internalClose(bool /*force*/) {
|
||||
@@ -1121,20 +968,11 @@ namespace epics {
|
||||
}
|
||||
|
||||
bool BlockingAbstractCodec::terminated() {
|
||||
|
||||
LOG(logLevelTrace,
|
||||
"BlockingAbstractCodec::terminated enter: (threadId: %u)",
|
||||
epicsThreadGetIdSelf());
|
||||
|
||||
return !isOpen();
|
||||
}
|
||||
|
||||
|
||||
bool BlockingAbstractCodec::isOpen() {
|
||||
|
||||
LOG(logLevelTrace, "BlockingAbstractCodec::isOpen %d (threadId: %u)", _isOpen.get(),
|
||||
epicsThreadGetIdSelf());
|
||||
|
||||
return _isOpen.get();
|
||||
}
|
||||
|
||||
@@ -1142,9 +980,6 @@ namespace epics {
|
||||
// NOTE: must not be called from constructor (e.g. needs shared_from_this())
|
||||
void BlockingAbstractCodec::start() {
|
||||
|
||||
LOG(logLevelTrace, "BlockingAbstractCodec::start enter: (threadId: %u)",
|
||||
epicsThreadGetIdSelf());
|
||||
|
||||
_readThread = epicsThreadCreate(
|
||||
"BlockingAbstractCodec-readThread",
|
||||
epicsThreadPriorityMedium,
|
||||
@@ -1161,21 +996,12 @@ namespace epics {
|
||||
BlockingAbstractCodec::sendThread,
|
||||
this);
|
||||
|
||||
LOG(logLevelTrace,
|
||||
"BlockingAbstractCodec::start exit WITH readThread: %u,"
|
||||
" sendThread:%u (threadId: %u)",
|
||||
_readThread, _sendThread, epicsThreadGetIdSelf());
|
||||
|
||||
}
|
||||
|
||||
|
||||
void BlockingAbstractCodec::receiveThread(void *param)
|
||||
{
|
||||
|
||||
LOG(logLevelTrace,
|
||||
"BlockingAbstractCodec::receiveThread enter: (threadId: %u)",
|
||||
epicsThreadGetIdSelf());
|
||||
|
||||
BlockingAbstractCodec *bac = static_cast<BlockingAbstractCodec *>(param);
|
||||
Transport::shared_pointer ptr = bac->shared_from_this();
|
||||
|
||||
@@ -1190,23 +1016,14 @@ namespace epics {
|
||||
}
|
||||
}
|
||||
|
||||
LOG(logLevelTrace, "BlockingAbstractCodec::receiveThread"
|
||||
" EXIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIT: (threadId: %u)",
|
||||
epicsThreadGetIdSelf());
|
||||
|
||||
bac->_shutdownEvent.signal();
|
||||
|
||||
}
|
||||
|
||||
|
||||
void BlockingAbstractCodec::sendThread(void *param)
|
||||
{
|
||||
|
||||
LOG(logLevelTrace,
|
||||
"BlockingAbstractCodec::sendThread enter: (threadId: %u)",
|
||||
epicsThreadGetIdSelf());
|
||||
|
||||
BlockingAbstractCodec *bac = static_cast<BlockingAbstractCodec *>(param);
|
||||
BlockingAbstractCodec *bac = static_cast<BlockingAbstractCodec *>(param);
|
||||
Transport::shared_pointer ptr = bac->shared_from_this();
|
||||
|
||||
bac->setSenderThread();
|
||||
@@ -1222,38 +1039,16 @@ namespace epics {
|
||||
}
|
||||
}
|
||||
|
||||
LOG(logLevelTrace,
|
||||
"BlockingAbstractCodec::sendThread EXIIIIIIIIIIIIIIT"
|
||||
" while(bac->isOpen): (threadId: %u)",
|
||||
epicsThreadGetIdSelf());
|
||||
|
||||
|
||||
// wait read thread to die
|
||||
bac->_shutdownEvent.wait();
|
||||
|
||||
// call internal destroy
|
||||
LOG(logLevelTrace, "XXXXXXXXXXXXXXXXXXXXXXXXXXXX"
|
||||
"XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX (threadId: %u)",
|
||||
epicsThreadGetIdSelf());
|
||||
bac->internalDestroy();
|
||||
LOG(logLevelTrace, "YYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYY"
|
||||
"YYYYYYYYYYYYYYYYYYYYYYYYYYYY (threadId: %u)",
|
||||
epicsThreadGetIdSelf());
|
||||
|
||||
LOG(logLevelTrace,
|
||||
"BlockingAbstractCodec::sendThread EXIIIIT (threadId: %u)",
|
||||
epicsThreadGetIdSelf());
|
||||
|
||||
}
|
||||
|
||||
|
||||
void BlockingAbstractCodec::sendBufferFull(int tries) {
|
||||
|
||||
LOG(logLevelTrace,
|
||||
"BlockingAbstractCodec::sendBufferFull enter: tries: %d "
|
||||
"(threadId: %u)",
|
||||
tries, epicsThreadGetIdSelf());
|
||||
|
||||
// TODO constants
|
||||
epicsThreadSleep(std::max<double>(tries * 0.1, 1));
|
||||
}
|
||||
@@ -1309,18 +1104,11 @@ namespace epics {
|
||||
inetAddressToString(_socketAddress).c_str(), errStr);
|
||||
}
|
||||
|
||||
LOG(logLevelTrace,
|
||||
"BlockingSocketAbstractCodec constructed (threadId: %u)",
|
||||
epicsThreadGetIdSelf());
|
||||
}
|
||||
|
||||
// must be called only once, when there will be no operation on socket (e.g. just before tx/rx thread exists)
|
||||
void BlockingSocketAbstractCodec::internalDestroy() {
|
||||
|
||||
LOG(logLevelTrace,
|
||||
"BlockingSocketAbstractCodec::internalDestroy enter: (threadId: %u)",
|
||||
epicsThreadGetIdSelf());
|
||||
|
||||
if(_channel != INVALID_SOCKET) {
|
||||
epicsSocketDestroy(_channel);
|
||||
_channel = INVALID_SOCKET;
|
||||
@@ -1330,12 +1118,6 @@ namespace epics {
|
||||
|
||||
|
||||
void BlockingSocketAbstractCodec::invalidDataStreamHandler() {
|
||||
|
||||
LOG(logLevelTrace,
|
||||
"BlockingSocketAbstractCodec::invalidDataStreamHandler enter:"
|
||||
" (threadId: %u)",
|
||||
epicsThreadGetIdSelf());
|
||||
|
||||
close();
|
||||
}
|
||||
|
||||
@@ -1343,29 +1125,14 @@ namespace epics {
|
||||
int BlockingSocketAbstractCodec::write(
|
||||
epics::pvData::ByteBuffer *src) {
|
||||
|
||||
LOG(logLevelTrace,
|
||||
"BlockingSocketAbstractCodec::write enter: position:%u, "
|
||||
"remaining:%u (threadId: %u)",
|
||||
src->getPosition(), src->getRemaining(), epicsThreadGetIdSelf());
|
||||
|
||||
std::size_t remaining;
|
||||
while((remaining=src->getRemaining()) > 0) {
|
||||
|
||||
LOG(logLevelTrace,
|
||||
"BlockingSocketAbstractCodec::write before send"
|
||||
" (threadId: %u)",
|
||||
epicsThreadGetIdSelf());
|
||||
|
||||
|
||||
int bytesSent = ::send(_channel,
|
||||
&src->getArray()[src->getPosition()],
|
||||
remaining, 0);
|
||||
|
||||
LOG(logLevelTrace,
|
||||
"BlockingSocketAbstractCodec::write afer send, read:%d"
|
||||
" (threadId: %u)",
|
||||
bytesSent, epicsThreadGetIdSelf());
|
||||
|
||||
// NOTE: do not log here, you might override SOCKERRNO relevant to recv() operation above
|
||||
|
||||
if(unlikely(bytesSent<0)) {
|
||||
|
||||
@@ -1384,18 +1151,13 @@ namespace epics {
|
||||
|
||||
}
|
||||
|
||||
//TODO check what to return
|
||||
return -1;
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
||||
std::size_t BlockingSocketAbstractCodec::getSocketReceiveBufferSize()
|
||||
const {
|
||||
|
||||
LOG(logLevelTrace,
|
||||
"BlockingSocketAbstractCodec::getSocketReceiveBufferSize"
|
||||
" enter (threadId: %u)", epicsThreadGetIdSelf());
|
||||
|
||||
osiSocklen_t intLen = sizeof(int);
|
||||
char strBuffer[64];
|
||||
int socketRecvBufferSize;
|
||||
@@ -1407,46 +1169,24 @@ namespace epics {
|
||||
//LOG(logLevelDebug, "Error getting SO_SNDBUF: %s", strBuffer);
|
||||
}
|
||||
|
||||
LOG(logLevelTrace,
|
||||
"BlockingSocketAbstractCodec::getSocketReceiveBufferSize"
|
||||
" returning:%u (threadId: %u)", socketRecvBufferSize,
|
||||
epicsThreadGetIdSelf());
|
||||
|
||||
return socketRecvBufferSize;
|
||||
}
|
||||
|
||||
|
||||
int BlockingSocketAbstractCodec::read(epics::pvData::ByteBuffer* dst) {
|
||||
|
||||
LOG(logLevelTrace,
|
||||
"BlockingSocketAbstractCodec::read enter: "
|
||||
"read bytes:%u (threadId: %u)",
|
||||
dst->getRemaining(), epicsThreadGetIdSelf());
|
||||
|
||||
std::size_t remaining;
|
||||
while((remaining=dst->getRemaining()) > 0) {
|
||||
|
||||
// read
|
||||
std::size_t pos = dst->getPosition();
|
||||
|
||||
LOG(logLevelTrace,
|
||||
"BlockingSocketAbstractCodec::read before recv"
|
||||
" (threadId: %u)",
|
||||
epicsThreadGetIdSelf());
|
||||
|
||||
|
||||
int bytesRead = recv(_channel,
|
||||
(char*)(dst->getArray()+pos), remaining, 0);
|
||||
|
||||
// NOTE: do not log here, you might override SOCKERRNO relevant to recv() operation above
|
||||
|
||||
/*
|
||||
LOG(logLevelTrace,
|
||||
"BlockingSocketAbstractCodec::read after recv, read: %d",
|
||||
bytesRead," (threadId: %u)",
|
||||
epicsThreadGetIdSelf());
|
||||
|
||||
|
||||
if (IS_LOGGABLE(logLevelTrace)) {
|
||||
hexDump(std::string("READ"),
|
||||
(const int8 *)(dst->getArray()+pos), bytesRead);
|
||||
@@ -1459,11 +1199,6 @@ namespace epics {
|
||||
{
|
||||
int socketError = SOCKERRNO;
|
||||
|
||||
LOG(logLevelTrace,
|
||||
"BlockingSocketAbstractCodec::read SOCKERRNO %d", socketError,
|
||||
" (threadId: %u)",
|
||||
epicsThreadGetIdSelf());
|
||||
|
||||
// interrupted or timeout
|
||||
if (socketError == EINTR ||
|
||||
socketError == EAGAIN ||
|
||||
@@ -1497,28 +1232,15 @@ namespace epics {
|
||||
//register/unregister
|
||||
// TODO implement priorities in Reactor... not that user will
|
||||
// change it.. still getPriority() must return "registered" priority!
|
||||
|
||||
LOG(logLevelTrace,
|
||||
"BlockingServerTCPTransportCodec constructed (threadId: %u)",
|
||||
epicsThreadGetIdSelf());
|
||||
|
||||
}
|
||||
|
||||
|
||||
BlockingServerTCPTransportCodec::~BlockingServerTCPTransportCodec() {
|
||||
LOG(logLevelTrace,
|
||||
"BlockingServerTCPTransportCodec DESTRUCTED (threadId: %u)",
|
||||
epicsThreadGetIdSelf());
|
||||
}
|
||||
|
||||
|
||||
pvAccessID BlockingServerTCPTransportCodec::preallocateChannelSID() {
|
||||
|
||||
LOG(logLevelTrace,
|
||||
"BlockingServerTCPTransportCodec::preallocateChannelSID enter:"
|
||||
" (threadId: %u)",
|
||||
epicsThreadGetIdSelf());
|
||||
|
||||
Lock lock(_channelsMutex);
|
||||
// search first free (theoretically possible loop of death)
|
||||
pvAccessID sid = ++_lastChannelSID;
|
||||
@@ -1532,11 +1254,6 @@ namespace epics {
|
||||
pvAccessID sid,
|
||||
ServerChannel::shared_pointer const & channel) {
|
||||
|
||||
LOG(logLevelTrace,
|
||||
"BlockingServerTCPTransportCodec::registerChannel enter: sid:%d "
|
||||
" (threadId: %u)",
|
||||
channel->getSID(), epicsThreadGetIdSelf());
|
||||
|
||||
Lock lock(_channelsMutex);
|
||||
_channels[sid] = channel;
|
||||
|
||||
@@ -1545,11 +1262,6 @@ namespace epics {
|
||||
|
||||
void BlockingServerTCPTransportCodec::unregisterChannel(pvAccessID sid) {
|
||||
|
||||
LOG(logLevelTrace,
|
||||
"BlockingServerTCPTransportCodec::unregisterChannel enter:"
|
||||
" sid:%d (threadId: %u)",
|
||||
sid, epicsThreadGetIdSelf());
|
||||
|
||||
Lock lock(_channelsMutex);
|
||||
_channels.erase(sid);
|
||||
}
|
||||
@@ -1558,11 +1270,6 @@ namespace epics {
|
||||
ServerChannel::shared_pointer
|
||||
BlockingServerTCPTransportCodec::getChannel(pvAccessID sid) {
|
||||
|
||||
LOG(logLevelTrace,
|
||||
"BlockingServerTCPTransportCodec::getChannel enter:"
|
||||
" sid:%d (threadId: %u)",
|
||||
sid, epicsThreadGetIdSelf());
|
||||
|
||||
Lock lock(_channelsMutex);
|
||||
|
||||
std::map<pvAccessID, ServerChannel::shared_pointer>::iterator it =
|
||||
@@ -1576,11 +1283,6 @@ namespace epics {
|
||||
|
||||
int BlockingServerTCPTransportCodec::getChannelCount() {
|
||||
|
||||
LOG(logLevelTrace,
|
||||
"BlockingServerTCPTransportCodec::getChannelCount enter: "
|
||||
"(threadId: %u)",
|
||||
epicsThreadGetIdSelf());
|
||||
|
||||
Lock lock(_channelsMutex);
|
||||
return static_cast<int>(_channels.size());
|
||||
}
|
||||
@@ -1589,10 +1291,6 @@ namespace epics {
|
||||
void BlockingServerTCPTransportCodec::send(ByteBuffer* buffer,
|
||||
TransportSendControl* control) {
|
||||
|
||||
LOG(logLevelTrace,
|
||||
"BlockingServerTCPTransportCodec::send enter: (threadId: %u)",
|
||||
epicsThreadGetIdSelf());
|
||||
|
||||
//
|
||||
// set byte order control message
|
||||
//
|
||||
@@ -1893,7 +1591,6 @@ namespace epics {
|
||||
}
|
||||
|
||||
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -28,7 +28,6 @@
|
||||
#include <pv/timer.h>
|
||||
#include <pv/event.h>
|
||||
#include <pv/likely.h>
|
||||
#include <pv/logger.h>
|
||||
|
||||
#ifdef abstractCodecEpicsExportSharedSymbols
|
||||
# define epicsExportSharedSymbols
|
||||
@@ -44,6 +43,7 @@
|
||||
|
||||
namespace epics {
|
||||
namespace pvAccess {
|
||||
namespace detail {
|
||||
|
||||
// TODO replace mutex with atomic (CAS) operations
|
||||
template<typename T>
|
||||
@@ -80,24 +80,17 @@ namespace epics {
|
||||
*/
|
||||
~queue(void)
|
||||
{
|
||||
LOG(logLevelTrace,
|
||||
"queue::~queue DESTROY (threadId: %u)", epicsThreadGetIdSelf());
|
||||
}
|
||||
|
||||
|
||||
bool empty(void)
|
||||
{
|
||||
LOG(logLevelTrace,
|
||||
"queue::empty enter: (threadId: %u)", epicsThreadGetIdSelf());
|
||||
epics::pvData::Lock lock(_queueMutex);
|
||||
return _queue.empty();
|
||||
}
|
||||
|
||||
void clean()
|
||||
{
|
||||
LOG(logLevelTrace, "queue::clean enter: (threadId: %u)",
|
||||
epicsThreadGetIdSelf());
|
||||
|
||||
epics::pvData::Lock lock(_queueMutex);
|
||||
_queue.clear();
|
||||
}
|
||||
@@ -105,15 +98,8 @@ namespace epics {
|
||||
|
||||
void wakeup()
|
||||
{
|
||||
|
||||
LOG(logLevelTrace, "queue::wakeup enter: (threadId: %u)",
|
||||
epicsThreadGetIdSelf());
|
||||
|
||||
if (!_wakeup.getAndSet(true))
|
||||
{
|
||||
LOG(logLevelTrace,
|
||||
"queue::wakeup signaling on _queueEvent: (threadId: %u)",
|
||||
epicsThreadGetIdSelf());
|
||||
_queueEvent.signal();
|
||||
}
|
||||
}
|
||||
@@ -121,9 +107,6 @@ namespace epics {
|
||||
|
||||
void put(T const & elem)
|
||||
{
|
||||
LOG(logLevelTrace,
|
||||
"queue::put enter (threadId: %u)", epicsThreadGetIdSelf());
|
||||
|
||||
{
|
||||
epics::pvData::Lock lock(_queueMutex);
|
||||
_queue.push_back(elem);
|
||||
@@ -135,11 +118,6 @@ namespace epics {
|
||||
|
||||
T take(int timeOut)
|
||||
{
|
||||
|
||||
LOG(logLevelTrace,
|
||||
"queue::take enter timeOut:%d (threadId: %u)",
|
||||
timeOut, epicsThreadGetIdSelf());
|
||||
|
||||
while (true)
|
||||
{
|
||||
|
||||
@@ -149,10 +127,6 @@ namespace epics {
|
||||
{
|
||||
|
||||
if (timeOut < 0) {
|
||||
epics::pvAccess::LOG(logLevelTrace,
|
||||
"queue::take exit timeOut:%d (threadId: %u)",
|
||||
timeOut, epicsThreadGetIdSelf());
|
||||
|
||||
return T();
|
||||
}
|
||||
|
||||
@@ -160,45 +134,21 @@ namespace epics {
|
||||
{
|
||||
|
||||
if (timeOut == 0) {
|
||||
|
||||
LOG(logLevelTrace,
|
||||
"queue::take going to wait timeOut:%d (threadId: %u)",
|
||||
timeOut, epicsThreadGetIdSelf());
|
||||
|
||||
_queueEvent.wait();
|
||||
}
|
||||
else {
|
||||
|
||||
LOG(logLevelTrace,
|
||||
"queue::take going to wait timeOut:%d (threadId: %u)",
|
||||
timeOut, epicsThreadGetIdSelf());
|
||||
|
||||
_queueEvent.wait(timeOut);
|
||||
}
|
||||
|
||||
LOG(logLevelTrace,
|
||||
"queue::take waking up timeOut:%d (threadId: %u)",
|
||||
timeOut, epicsThreadGetIdSelf());
|
||||
|
||||
isEmpty = empty();
|
||||
if (isEmpty)
|
||||
{
|
||||
if (timeOut > 0) { // TODO spurious wakeup, but not critical
|
||||
LOG(logLevelTrace,
|
||||
"queue::take exit after being woken up timeOut:%d"
|
||||
" (threadId: %u)",
|
||||
timeOut, epicsThreadGetIdSelf());
|
||||
return T();
|
||||
}
|
||||
else // if (timeout == 0) cannot be negative
|
||||
{
|
||||
if (_wakeup.getAndSet(false)) {
|
||||
|
||||
LOG(logLevelTrace,
|
||||
"queue::take exit after being woken up timeOut:%d"
|
||||
" (threadId: %u)",
|
||||
timeOut, epicsThreadGetIdSelf());
|
||||
|
||||
return T();
|
||||
}
|
||||
}
|
||||
@@ -207,20 +157,9 @@ namespace epics {
|
||||
}
|
||||
else
|
||||
{
|
||||
|
||||
LOG(logLevelTrace,
|
||||
"queue::take obtaining lock for front element timeOut:%d"
|
||||
" (threadId: %u)",
|
||||
timeOut, epicsThreadGetIdSelf());
|
||||
|
||||
epics::pvData::Lock lock(_queueMutex);
|
||||
T sender = _queue.front();
|
||||
_queue.pop_front();
|
||||
|
||||
LOG(logLevelTrace,
|
||||
"queue::take exit with sender timeOut:%d (threadId: %u)",
|
||||
timeOut, epicsThreadGetIdSelf());
|
||||
|
||||
return sender;
|
||||
}
|
||||
}
|
||||
@@ -296,9 +235,6 @@ namespace epics {
|
||||
|
||||
virtual ~AbstractCodec()
|
||||
{
|
||||
LOG(logLevelTrace,
|
||||
"AbstractCodec::~AbstractCodec DESTROY (threadId: %u)",
|
||||
epicsThreadGetIdSelf());
|
||||
}
|
||||
|
||||
void alignBuffer(std::size_t alignment);
|
||||
@@ -351,7 +287,7 @@ namespace epics {
|
||||
std::tr1::shared_ptr<epics::pvData::ByteBuffer> _socketBuffer;
|
||||
std::tr1::shared_ptr<epics::pvData::ByteBuffer> _sendBuffer;
|
||||
|
||||
epics::pvAccess::queue<TransportSender::shared_pointer> _sendQueue;
|
||||
queue<TransportSender::shared_pointer> _sendQueue;
|
||||
|
||||
private:
|
||||
|
||||
@@ -473,11 +409,6 @@ namespace epics {
|
||||
|
||||
|
||||
void internalDestroy() {
|
||||
|
||||
LOG(logLevelTrace,
|
||||
"BlockingTCPTransportCodec::internalDestroy() enter: (threadId: %u)",
|
||||
epicsThreadGetIdSelf());
|
||||
|
||||
BlockingSocketAbstractCodec::internalDestroy();
|
||||
Transport::shared_pointer thisSharedPtr = this->shared_from_this();
|
||||
_context->getTransportRegistry()->remove(thisSharedPtr);
|
||||
@@ -488,12 +419,6 @@ namespace epics {
|
||||
|
||||
|
||||
void processControlMessage() {
|
||||
|
||||
LOG(logLevelTrace,
|
||||
"BlockingTCPTransportCodec::processControlMessage()"
|
||||
"enter: (threadId: %u)",
|
||||
epicsThreadGetIdSelf());
|
||||
|
||||
if (_command == 2)
|
||||
{
|
||||
// check 7-th bit
|
||||
@@ -503,12 +428,6 @@ namespace epics {
|
||||
|
||||
|
||||
void processApplicationMessage() {
|
||||
|
||||
LOG(logLevelTrace,
|
||||
"BlockingTCPTransportCodec::processApplicationMessage() enter:"
|
||||
" (threadId: %u)",
|
||||
epicsThreadGetIdSelf());
|
||||
|
||||
_responseHandler->handleResponse(&_socketAddress, shared_from_this(),
|
||||
_version, _command, _payloadSize, _socketBuffer.get());
|
||||
}
|
||||
@@ -535,37 +454,18 @@ namespace epics {
|
||||
|
||||
|
||||
void setRemoteRevision(epics::pvData::int8 revision) {
|
||||
|
||||
LOG(logLevelTrace,
|
||||
"BlockingTCPTransportCodec::setRemoteRevision() enter:"
|
||||
" revision: %d (threadId: %u)",
|
||||
revision, epicsThreadGetIdSelf());
|
||||
|
||||
_remoteTransportRevision = revision;
|
||||
}
|
||||
|
||||
|
||||
void setRemoteTransportReceiveBufferSize(
|
||||
std::size_t remoteTransportReceiveBufferSize) {
|
||||
|
||||
LOG(logLevelTrace,
|
||||
"BlockingTCPTransportCodec::setRemoteTransportReceiveBufferSize()"
|
||||
" enter: remoteTransportReceiveBufferSize:%u (threadId: %u)",
|
||||
remoteTransportReceiveBufferSize, epicsThreadGetIdSelf());
|
||||
|
||||
_remoteTransportReceiveBufferSize = remoteTransportReceiveBufferSize;
|
||||
}
|
||||
|
||||
|
||||
void setRemoteTransportSocketReceiveBufferSize(
|
||||
std::size_t socketReceiveBufferSize) {
|
||||
|
||||
LOG(logLevelTrace,
|
||||
"BlockingTCPTransportCodec::"
|
||||
"setRemoteTransportSocketReceiveBufferSize()"
|
||||
"enter: socketReceiveBufferSize:%u (threadId: %u)",
|
||||
socketReceiveBufferSize, epicsThreadGetIdSelf());
|
||||
|
||||
_remoteTransportSocketReceiveBufferSize = socketReceiveBufferSize;
|
||||
}
|
||||
|
||||
@@ -573,12 +473,6 @@ namespace epics {
|
||||
std::tr1::shared_ptr<const epics::pvData::Field>
|
||||
cachedDeserialize(epics::pvData::ByteBuffer* buffer)
|
||||
{
|
||||
|
||||
LOG(logLevelTrace,
|
||||
"BlockingTCPTransportCodec::cachedDeserialize() enter:"
|
||||
" (threadId: %u)",
|
||||
epicsThreadGetIdSelf());
|
||||
|
||||
return _incomingIR.deserialize(buffer, this);
|
||||
}
|
||||
|
||||
@@ -587,12 +481,6 @@ namespace epics {
|
||||
const std::tr1::shared_ptr<const epics::pvData::Field>& field,
|
||||
epics::pvData::ByteBuffer* buffer)
|
||||
{
|
||||
|
||||
LOG(logLevelTrace,
|
||||
"BlockingTCPTransportCodec::cachedSerialize() enter:"
|
||||
" (threadId: %u)",
|
||||
epicsThreadGetIdSelf());
|
||||
|
||||
_outgoingIR.serialize(field, buffer, this);
|
||||
}
|
||||
|
||||
@@ -602,11 +490,7 @@ namespace epics {
|
||||
const char* /*toSerialize*/,
|
||||
std::size_t /*elementCount*/, std::size_t /*elementSize*/)
|
||||
{
|
||||
|
||||
LOG(logLevelTrace,
|
||||
"BlockingTCPTransportCodec::directSerialize() enter: (threadId: %u)",
|
||||
epicsThreadGetIdSelf());
|
||||
|
||||
// TODO !!!!
|
||||
return false;
|
||||
}
|
||||
|
||||
@@ -614,12 +498,7 @@ namespace epics {
|
||||
bool directDeserialize(epics::pvData::ByteBuffer * /*existingBuffer*/,
|
||||
char* /*deserializeTo*/,
|
||||
std::size_t /*elementCount*/, std::size_t /*elementSize*/) {
|
||||
|
||||
LOG(logLevelTrace,
|
||||
"BlockingTCPTransportCodec::directDeserialize() enter:"
|
||||
" (threadId: %u)",
|
||||
epicsThreadGetIdSelf());
|
||||
|
||||
// TODO !!!
|
||||
return false;
|
||||
}
|
||||
|
||||
@@ -628,21 +507,11 @@ namespace epics {
|
||||
|
||||
|
||||
bool isClosed() {
|
||||
|
||||
LOG(logLevelTrace,
|
||||
"BlockingTCPTransportCodec::isClosed() enter: (threadId: %u)",
|
||||
epicsThreadGetIdSelf());
|
||||
|
||||
return !isOpen();
|
||||
}
|
||||
|
||||
|
||||
void activate() {
|
||||
|
||||
LOG(logLevelTrace,
|
||||
"BlockingTCPTransportCodec::activate() enter: (threadId: %u)",
|
||||
epicsThreadGetIdSelf());
|
||||
|
||||
Transport::shared_pointer thisSharedPtr = shared_from_this();
|
||||
_context->getTransportRegistry()->put(thisSharedPtr);
|
||||
|
||||
@@ -664,9 +533,6 @@ namespace epics {
|
||||
_remoteTransportReceiveBufferSize(MAX_TCP_RECV),
|
||||
_remoteTransportRevision(0), _priority(priority)
|
||||
{
|
||||
LOG(logLevelTrace,
|
||||
"BlockingTCPTransportCodec constructed: (threadId: %u)",
|
||||
epicsThreadGetIdSelf());
|
||||
}
|
||||
|
||||
Context::shared_pointer _context;
|
||||
@@ -720,12 +586,6 @@ namespace epics {
|
||||
|
||||
bool acquire(std::tr1::shared_ptr<TransportClient> const & client)
|
||||
{
|
||||
|
||||
LOG(logLevelTrace,
|
||||
"BlockingServerTCPTransportCodec::acquire() enter:"
|
||||
" client is set: %d (threadId: %u)",
|
||||
(client.get() != 0), epicsThreadGetIdSelf());
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
@@ -748,12 +608,6 @@ namespace epics {
|
||||
int getChannelCount();
|
||||
|
||||
epics::pvData::PVField::shared_pointer getSecurityToken() {
|
||||
|
||||
LOG(logLevelTrace,
|
||||
"BlockingServerTCPTransportCodec::getSecurityToken() enter:"
|
||||
" (threadId: %u)",
|
||||
epicsThreadGetIdSelf());
|
||||
|
||||
return epics::pvData::PVField::shared_pointer();
|
||||
}
|
||||
|
||||
@@ -766,12 +620,6 @@ namespace epics {
|
||||
}
|
||||
|
||||
bool verify(epics::pvData::int32 timeoutMs) {
|
||||
|
||||
LOG(logLevelTrace,
|
||||
"BlockingServerTCPTransportCodec::verify() enter: "
|
||||
"timeoutMs:%d (threadId: %u)",
|
||||
timeoutMs, epicsThreadGetIdSelf());
|
||||
|
||||
TransportSender::shared_pointer transportSender =
|
||||
std::tr1::dynamic_pointer_cast<TransportSender>(shared_from_this());
|
||||
enqueueSendRequest(transportSender);
|
||||
@@ -944,7 +792,8 @@ namespace epics {
|
||||
epics::pvData::Event _verifiedEvent;
|
||||
|
||||
};
|
||||
|
||||
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -23,12 +23,9 @@ pvAccessApp/mb/pvAccessMB.h
|
||||
pvAccessApp/remote/abstractResponseHandler.cpp
|
||||
pvAccessApp/remote/beaconHandler.cpp
|
||||
pvAccessApp/remote/beaconHandler.h
|
||||
pvAccessApp/remote/blockingClientTCPTransport.cpp
|
||||
pvAccessApp/remote/blockingServerTCPTransport.cpp
|
||||
pvAccessApp/remote/blockingTCP.h
|
||||
pvAccessApp/remote/blockingTCPAcceptor.cpp
|
||||
pvAccessApp/remote/blockingTCPConnector.cpp
|
||||
pvAccessApp/remote/blockingTCPTransport.cpp
|
||||
pvAccessApp/remote/blockingUDP.h
|
||||
pvAccessApp/remote/blockingUDPConnector.cpp
|
||||
pvAccessApp/remote/blockingUDPTransport.cpp
|
||||
|
||||
@@ -16,6 +16,7 @@
|
||||
#include <pv/current_function.h>
|
||||
|
||||
using namespace epics::pvData;
|
||||
using namespace epics::pvAccess::detail;
|
||||
|
||||
namespace epics {
|
||||
|
||||
|
||||
Reference in New Issue
Block a user