This commit is contained in:
Gasper Jansa
2011-01-24 21:47:36 +01:00
19 changed files with 1166 additions and 770 deletions

View File

@@ -1,3 +1,72 @@
pvAccessApp/ca/version.cpp
pvAccessApp/client/pvAccess.cpp
pvAccessApp/factory/ChannelAccessFactory.cpp
pvAccessApp/factory/CreateRequestFactory.cpp
pvAccessApp/remote/abstractResponseHandler.cpp
pvAccessApp/remote/beaconEmitter.cpp
pvAccessApp/remote/beaconHandler.cpp
pvAccessApp/remote/beaconServerStatusProvider.cpp
pvAccessApp/remote/blockingClientTCPTransport.cpp
pvAccessApp/remote/blockingServerTCPTransport.cpp
pvAccessApp/remote/blockingTCPAcceptor.cpp
pvAccessApp/remote/blockingTCPConnector.cpp
pvAccessApp/remote/blockingTCPTransport.cpp
pvAccessApp/remote/blockingUDPConnector.cpp
pvAccessApp/remote/blockingUDPTransport.cpp
pvAccessApp/remote/channelSearchManager.cpp
pvAccessApp/server/responseHandlers.cpp
pvAccessApp/utils/configuration.cpp
pvAccessApp/utils/hexDump.cpp
pvAccessApp/utils/inetAddressUtil.cpp
pvAccessApp/utils/introspectionRegistry.cpp
pvAccessApp/utils/logger.cpp
pvAccessApp/utils/namedLockPattern.cpp
pvAccessApp/utils/referenceCountingLock.cpp
pvAccessApp/utils/transportRegistry.cpp
pvAccessApp/utils/wildcharMatcher.cpp
testApp/client/MockClientImpl.cpp
testApp/client/testChannelAccessFactory.cpp
testApp/client/testCreateRequest.cpp
testApp/client/testMockClient.cpp
testApp/remote/testBeaconEmitter.cpp
testApp/remote/testBeaconHandler.cpp
testApp/remote/testBlockingTCPClnt.cpp
testApp/remote/testBlockingTCPSrv.cpp
testApp/remote/testBlockingUDPClnt.cpp
testApp/remote/testBlockingUDPSrv.cpp
testApp/remote/testChannelSearchManager.cpp
testApp/remote/testRemoteClientImpl.cpp
testApp/utils/arrayFIFOTest.cpp
testApp/utils/configurationTest.cpp
testApp/utils/growingCircularBufferTest.cpp
testApp/utils/hexDumpTest.cpp
testApp/utils/inetAddressUtilsTest.cpp
testApp/utils/introspectionRegistryTest.cpp
testApp/utils/loggerTest.cpp
testApp/utils/namedLockPatternTest.cpp
testApp/utils/transportRegistryTest.cpp
testApp/utils/wildcharMatcherTest.cpp
pvAccessApp/ca/caConstants.h
pvAccessApp/ca/version.h
pvAccessApp/client/pvAccess.h
pvAccessApp/client/ChannelAccessFactory.cpp
pvAccessApp/testClient/testChannelAccessFactory.cpp
pvAccessApp/remote/beaconEmitter.h
pvAccessApp/remote/beaconHandler.h
pvAccessApp/remote/beaconServerStatusProvider.h
pvAccessApp/remote/blockingTCP.h
pvAccessApp/remote/blockingUDP.h
pvAccessApp/remote/channelSearchManager.h
pvAccessApp/remote/remote.h
pvAccessApp/remoteClient/clientContextImpl.h
pvAccessApp/server/responseHandlers.h
pvAccessApp/server/serverContext.h
pvAccessApp/utils/arrayFIFO.h
pvAccessApp/utils/configuration.h
pvAccessApp/utils/growingCircularBuffer.h
pvAccessApp/utils/hexDump.h
pvAccessApp/utils/inetAddressUtil.h
pvAccessApp/utils/introspectionRegistry.h
pvAccessApp/utils/logger.h
pvAccessApp/utils/namedLockPattern.h
pvAccessApp/utils/referenceCountingLock.h
pvAccessApp/utils/transportRegistry.h
pvAccessApp/utils/wildcharMatcher.h

View File

@@ -1 +1,2 @@
pvAccessApp/client
include
../pvDataCPP/include

View File

@@ -142,10 +142,13 @@ class CreateRequestImpl : public CreateRequest {
requester->message("illegal option", errorMessage);
return false;
}
PVString* pvStringField = static_cast<PVString*>(getPVDataCreate()->createPVScalar(pvParent, token.substr(0, equalsPos), pvString));
pvStringField->put(token.substr(equalsPos+1));
pvParent->appendPVField(pvStringField);
if (equalsPos != std::string::npos)
{
PVString* pvStringField = static_cast<PVString*>(getPVDataCreate()->createPVScalar(pvParent, token.substr(0, equalsPos), pvString));
pvStringField->put(token.substr(equalsPos+1));
pvParent->appendPVField(pvStringField);
}
}
return true;
}
@@ -160,13 +163,14 @@ class CreateRequestImpl : public CreateRequest {
{
return getPVDataCreate()->createPVStructure(0, emptyString, 0);
}
size_t offsetRecord = request.find("record[");
size_t offsetField = request.find("field(");
size_t offsetPutField = request.find("putField(");
size_t offsetGetField = request.find("getField(");
PVStructure* pvStructure = getPVDataCreate()->createPVStructure(0, emptyString, 0);
if (offsetRecord != std::string::npos) {
size_t offsetBegin = request.find('[', offsetRecord);
size_t offsetEnd = request.find(']', offsetBegin);
@@ -199,7 +203,7 @@ class CreateRequestImpl : public CreateRequest {
delete pvStructure;
return 0;
}
pvStructure->appendPVField(pvStruct);
pvStructure->appendPVField(pvStruct);
}
if (offsetPutField != std::string::npos) {
size_t offsetBegin = request.find('(', offsetPutField);

View File

@@ -34,12 +34,10 @@ namespace epics {
float beaconInterval, int16 priority) :
BlockingTCPTransport(context, channel, responseHandler,
receiveBufferSize, priority), _introspectionRegistry(
new IntrospectionRegistry(false)), _owners(new set<
TransportClient*> ()), _connectionTimeout(beaconInterval
new IntrospectionRegistry(false)), _connectionTimeout(beaconInterval
*1000), _unresponsiveTransport(false), _timerNode(
new TimerNode(this)), _mutex(new Mutex()), _ownersMutex(
new Mutex()), _verifyOrEcho(true) {
_autoDelete = false;
new TimerNode(this)), _verifyOrEcho(true) {
// _autoDelete = false;
// initialize owners list, send queue
acquire(client);
@@ -58,11 +56,9 @@ namespace epics {
}
BlockingClientTCPTransport::~BlockingClientTCPTransport() {
printf("========== ~BlockingClientTCPTransport\n");
delete _introspectionRegistry;
delete _owners;
delete _timerNode;
delete _mutex;
delete _ownersMutex;
}
void BlockingClientTCPTransport::callback() {
@@ -84,27 +80,25 @@ namespace epics {
if(!_unresponsiveTransport) {
_unresponsiveTransport = true;
Lock lock(_ownersMutex);
set<TransportClient*>::iterator it = _owners->begin();
for(; it!=_owners->end(); it++)
Lock lock(&_ownersMutex);
set<TransportClient*>::iterator it = _owners.begin();
for(; it!=_owners.end(); it++)
(*it)->transportUnresponsive();
}
}
bool BlockingClientTCPTransport::acquire(TransportClient* client) {
Lock lock(_mutex);
Lock lock(&_mutex);
if(_closed) return false;
char ipAddrStr[48];
ipAddrToDottedIP(&_socketAddress->ia, ipAddrStr, sizeof(ipAddrStr));
ipAddrToDottedIP(&_socketAddress.ia, ipAddrStr, sizeof(ipAddrStr));
errlogSevPrintf(errlogInfo, "Acquiring transport to %s.", ipAddrStr);
_ownersMutex->lock();
if(_closed) return false;
_owners->insert(client);
_ownersMutex->unlock();
Lock lock2(&_ownersMutex);
// TODO double check? if(_closed) return false;
_owners.insert(client);
return true;
}
@@ -121,40 +115,40 @@ namespace epics {
* Notifies clients about disconnect.
*/
void BlockingClientTCPTransport::closedNotifyClients() {
Lock lock(_ownersMutex);
Lock lock(&_ownersMutex);
// check if still acquired
int refs = _owners->size();
int refs = _owners.size();
if(refs>0) {
char ipAddrStr[48];
ipAddrToDottedIP(&_socketAddress->ia, ipAddrStr, sizeof(ipAddrStr));
ipAddrToDottedIP(&_socketAddress.ia, ipAddrStr, sizeof(ipAddrStr));
errlogSevPrintf(
errlogInfo,
"Transport to %s still has %d client(s) active and closing...",
ipAddrStr, refs);
set<TransportClient*>::iterator it = _owners->begin();
for(; it!=_owners->end(); it++)
set<TransportClient*>::iterator it = _owners.begin();
for(; it!=_owners.end(); it++)
(*it)->transportClosed();
}
_owners->clear();
_owners.clear();
}
void BlockingClientTCPTransport::release(TransportClient* client) {
if(_closed) return;
char ipAddrStr[48];
ipAddrToDottedIP(&_socketAddress->ia, ipAddrStr, sizeof(ipAddrStr));
ipAddrToDottedIP(&_socketAddress.ia, ipAddrStr, sizeof(ipAddrStr));
errlogSevPrintf(errlogInfo, "Releasing transport to %s.", ipAddrStr);
Lock lock(_ownersMutex);
_owners->erase(client);
Lock lock(&_ownersMutex);
_owners.erase(client);
// not used anymore
// TODO consider delayed destruction (can improve performance!!!)
if(_owners->size()==0) close(false);
if(_owners.size()==0) close(false);
}
void BlockingClientTCPTransport::aliveNotification() {
@@ -165,20 +159,20 @@ namespace epics {
void BlockingClientTCPTransport::responsiveTransport() {
if(_unresponsiveTransport) {
_unresponsiveTransport = false;
Lock lock(_ownersMutex);
Lock lock(&_ownersMutex);
set<TransportClient*>::iterator it = _owners->begin();
for(; it!=_owners->end(); it++)
set<TransportClient*>::iterator it = _owners.begin();
for(; it!=_owners.end(); it++)
(*it)->transportResponsive(this);
}
}
void BlockingClientTCPTransport::changedTransport() {
_introspectionRegistry->reset();
Lock lock(_ownersMutex);
Lock lock(&_ownersMutex);
set<TransportClient*>::iterator it = _owners->begin();
for(; it!=_owners->end(); it++)
set<TransportClient*>::iterator it = _owners.begin();
for(; it!=_owners.end(); it++)
(*it)->transportChanged();
}

View File

@@ -31,9 +31,7 @@ namespace epics {
BlockingTCPTransport(context, channel, responseHandler,
receiveBufferSize, CA_DEFAULT_PRIORITY),
_introspectionRegistry(new IntrospectionRegistry(true)),
_lastChannelSID(0), _channels(
new map<int, ServerChannel*> ()), _channelsMutex(
new Mutex()) {
_lastChannelSID(0) {
// NOTE: priority not yet known, default priority is used to register/unregister
// TODO implement priorities in Reactor... not that user will
// change it.. still getPriority() must return "registered" priority!
@@ -43,27 +41,25 @@ namespace epics {
BlockingServerTCPTransport::~BlockingServerTCPTransport() {
delete _introspectionRegistry;
delete _channels;
delete _channelsMutex;
}
void BlockingServerTCPTransport::destroyAllChannels() {
Lock lock(_channelsMutex);
if(_channels->size()==0) return;
Lock lock(&_channelsMutex);
if(_channels.size()==0) return;
char ipAddrStr[64];
ipAddrToDottedIP(&_socketAddress->ia, ipAddrStr, sizeof(ipAddrStr));
ipAddrToDottedIP(&_socketAddress.ia, ipAddrStr, sizeof(ipAddrStr));
errlogSevPrintf(
errlogInfo,
"Transport to %s still has %u channel(s) active and closing...",
ipAddrStr, _channels->size());
ipAddrStr, _channels.size());
map<pvAccessID, ServerChannel*>::iterator it = _channels->begin();
for(; it!=_channels->end(); it++)
map<pvAccessID, ServerChannel*>::iterator it = _channels.begin();
for(; it!=_channels.end(); it++)
it->second->destroy();
_channels->clear();
_channels.clear();
}
void BlockingServerTCPTransport::internalClose(bool force) {
@@ -72,37 +68,37 @@ namespace epics {
}
pvAccessID BlockingServerTCPTransport::preallocateChannelSID() {
Lock lock(_channelsMutex);
Lock lock(&_channelsMutex);
// search first free (theoretically possible loop of death)
pvAccessID sid = ++_lastChannelSID;
while(_channels->find(sid)!=_channels->end())
while(_channels.find(sid)!=_channels.end())
sid = ++_lastChannelSID;
return sid;
}
void BlockingServerTCPTransport::registerChannel(pvAccessID sid,
ServerChannel* channel) {
Lock lock(_channelsMutex);
(*_channels)[sid] = channel;
Lock lock(&_channelsMutex);
_channels[sid] = channel;
}
void BlockingServerTCPTransport::unregisterChannel(pvAccessID sid) {
Lock lock(_channelsMutex);
_channels->erase(sid);
Lock lock(&_channelsMutex);
_channels.erase(sid);
}
ServerChannel* BlockingServerTCPTransport::getChannel(pvAccessID sid) {
Lock lock(_channelsMutex);
Lock lock(&_channelsMutex);
map<pvAccessID, ServerChannel*>::iterator it = _channels->find(sid);
if(it!=_channels->end()) return it->second;
map<pvAccessID, ServerChannel*>::iterator it = _channels.find(sid);
if(it!=_channels.end()) return it->second;
return NULL;
}
int BlockingServerTCPTransport::getChannelCount() {
Lock lock(_channelsMutex);
return _channels->size();
Lock lock(&_channelsMutex);
return _channels.size();
}
void BlockingServerTCPTransport::send(ByteBuffer* buffer,

View File

@@ -22,6 +22,7 @@
#include <pvType.h>
#include <lock.h>
#include <timer.h>
#include <event.h>
/* EPICSv3 */
#include <osdSock.h>
@@ -52,7 +53,7 @@ namespace epics {
BlockingTCPTransport(Context* context, SOCKET channel,
ResponseHandler* responseHandler, int receiveBufferSize,
int16 priority);
virtual bool isClosed() const {
return _closed;
}
@@ -86,7 +87,7 @@ namespace epics {
}
virtual const osiSockAddr* getRemoteAddress() const {
return _socketAddress;
return &_socketAddress;
}
virtual int16 getPriority() const {
@@ -108,12 +109,12 @@ namespace epics {
virtual int getSocketReceiveBufferSize() const;
virtual bool isVerified() const {
Lock lock(_verifiedMutex);
Lock lock(const_cast<epics::pvData::Mutex*>(&_verifiedMutex));
return _verified;
}
virtual void verified() {
Lock lock(_verifiedMutex);
Lock lock(&_verifiedMutex);
_verified = true;
}
@@ -148,7 +149,7 @@ namespace epics {
_flushStrategy = flushStrategy;
}
void requestFlush();
//void requestFlush();
/**
* Close and free connection resources.
@@ -165,80 +166,7 @@ namespace epics {
void enqueueMonitorSendRequest(TransportSender* sender);
protected:
/**
* Connection status
*/
bool volatile _closed;
/**
* Corresponding channel.
*/
SOCKET _channel;
/**
* Cached socket address.
*/
osiSockAddr* _socketAddress;
/**
* Send buffer.
*/
epics::pvData::ByteBuffer* _sendBuffer;
/**
* Remote side transport revision (minor).
*/
int8 _remoteTransportRevision;
/**
* Remote side transport receive buffer size.
*/
int _remoteTransportReceiveBufferSize;
/**
* Remote side transport socket receive buffer size.
*/
int _remoteTransportSocketReceiveBufferSize;
/**
* Priority.
* NOTE: Priority cannot just be changed, since it is registered
* in transport registry with given priority.
*/
int16 _priority;
// TODO to be implemeneted
/**
* CAS response handler.
*/
ResponseHandler* _responseHandler;
/**
* Read sync. object monitor.
*/
//Object _readMonitor = new Object();
/**
* Total bytes received.
*/
int64 volatile _totalBytesReceived;
/**
* Total bytes sent.
*/
int64 volatile _totalBytesSent;
/**
* Marker to send.
*/
volatile int _markerToSend;
volatile bool _verified;
volatile int64 _remoteBufferFreeSpace;
volatile bool _autoDelete;
virtual void processReadCached(bool nestedCall,
ReceiveStage inStage, int requiredBytes, bool addToBuffer);
@@ -259,7 +187,8 @@ namespace epics {
virtual ~BlockingTCPTransport();
private:
/**
* Default marker period.
*/
@@ -267,7 +196,32 @@ namespace epics {
static const int MAX_ENSURE_DATA_BUFFER_SIZE = 1024;
static const double delay = 0.01;
static const double _delay = 0.01;
/****** finally initialized at construction time and after start (called by the same thread) ********/
/**
* Corresponding channel.
*/
SOCKET _channel;
/**
* Cached socket address.
*/
osiSockAddr _socketAddress;
/**
* Priority.
* NOTE: Priority cannot just be changed, since it is registered
* in transport registry with given priority.
*/
int16 _priority;
// TODO to be implemeneted
/**
* CAS response handler.
*/
ResponseHandler* _responseHandler;
/**
* Send buffer size.
@@ -284,6 +238,58 @@ namespace epics {
*/
int64 _markerPeriodBytes;
SendQueueFlushStrategy _flushStrategy;
epicsThreadId _rcvThreadId;
epicsThreadId _sendThreadId;
MonitorSender* _monitorSender;
Context* _context;
bool _autoDelete;
/**** after verification ****/
/**
* Remote side transport revision (minor).
*/
int8 _remoteTransportRevision;
/**
* Remote side transport receive buffer size.
*/
int _remoteTransportReceiveBufferSize;
/**
* Remote side transport socket receive buffer size.
*/
int _remoteTransportSocketReceiveBufferSize;
/*** send thread only - no need to sync ***/
// NOTE: now all send-related external calls are TransportSender IF
// and its reference is only valid when called from send thread
// initialized at construction time
GrowingCircularBuffer<TransportSender*>* _sendQueue;
epics::pvData::Mutex _sendQueueMutex;
// initialized at construction time
GrowingCircularBuffer<TransportSender*>* _monitorSendQueue;
epics::pvData::Mutex _monitorMutex;
/**
* Send buffer.
*/
epics::pvData::ByteBuffer* _sendBuffer;
/**
* Next planned marker position.
*/
@@ -299,20 +305,28 @@ namespace epics {
*/
int _lastMessageStartPosition;
int8 _lastSegmentedMessageType;
int8 _lastSegmentedMessageCommand;
bool _flushRequested;
int _sendBufferSentPosition;
/*** receive thread only - no need to sync ***/
// initialized at construction time
epics::pvData::ByteBuffer* _socketBuffer;
int _startPosition;
epics::pvData::Mutex* _mutex;
epics::pvData::Mutex* _sendQueueMutex;
epics::pvData::Mutex* _verifiedMutex;
epics::pvData::Mutex* _monitorMutex;
ReceiveStage _stage;
int8 _lastSegmentedMessageType;
int8 _lastSegmentedMessageCommand;
int _storedPayloadSize;
int _storedPosition;
int _storedLimit;
@@ -322,26 +336,68 @@ namespace epics {
int8 _command;
int _payloadSize;
volatile bool _flushRequested;
ReceiveStage _stage;
int _sendBufferSentPosition;
/**
* Total bytes received.
*/
int64 _totalBytesReceived;
SendQueueFlushStrategy _flushStrategy;
GrowingCircularBuffer<TransportSender*>* _sendQueue;
epicsThreadId _rcvThreadId;
epicsThreadId _sendThreadId;
GrowingCircularBuffer<TransportSender*>* _monitorSendQueue;
MonitorSender* _monitorSender;
/*** send/receive thread shared ***/
Context* _context;
/**
* Connection status
* NOTE: synced by _mutex
*/
bool volatile _closed;
volatile bool _sendThreadRunning;
// NOTE: synced by _mutex
bool _sendThreadExited;
epics::pvData::Mutex _mutex;
bool _verified;
epics::pvData::Mutex _verifiedMutex;
Event _sendQueueEvent;
/**
* Marker to send.
* NOTE: synced by _flowControlMutex
*/
int _markerToSend;
/**
* Total bytes sent.
* NOTE: synced by _flowControlMutex
*/
int64 _totalBytesSent;
/**
* Calculated remote free buffer size.
* NOTE: synced by _flowControlMutex
*/
int64 _remoteBufferFreeSpace;
epics::pvData::Mutex _flowControlMutex;
private:
/**
* Internal method that clears and releases buffer.
* sendLock and sendBufferLock must be hold while calling this method.
@@ -439,7 +495,7 @@ namespace epics {
/**
* Owners (users) of the transport.
*/
std::set<TransportClient*>* _owners;
std::set<TransportClient*> _owners;
/**
* Connection timeout (no-traffic) flag.
@@ -461,8 +517,8 @@ namespace epics {
*/
volatile epicsTimeStamp _aliveTimestamp;
epics::pvData::Mutex* _mutex;
epics::pvData::Mutex* _ownersMutex;
epics::pvData::Mutex _mutex;
epics::pvData::Mutex _ownersMutex;
bool _verifyOrEcho;
@@ -645,9 +701,9 @@ namespace epics {
/**
* Channel table (SID -> channel mapping).
*/
std::map<pvAccessID, ServerChannel*>* _channels;
std::map<pvAccessID, ServerChannel*> _channels;
Mutex* _channelsMutex;
Mutex _channelsMutex;
/**
* Destroy all channels.
@@ -672,7 +728,7 @@ namespace epics {
BlockingTCPAcceptor(Context* context, int port,
int receiveBufferSize);
~BlockingTCPAcceptor();
virtual ~BlockingTCPAcceptor();
void handleEvents();

View File

@@ -65,7 +65,7 @@ namespace epics {
BlockingTCPTransport::BlockingTCPTransport(Context* context,
SOCKET channel, ResponseHandler* responseHandler,
int receiveBufferSize, int16 priority) :
_closed(false), _channel(channel), _socketAddress(new osiSockAddr),
_closed(false), _channel(channel),
_remoteTransportRevision(0),
_remoteTransportReceiveBufferSize(MAX_TCP_RECV),
_remoteTransportSocketReceiveBufferSize(MAX_TCP_RECV),
@@ -75,9 +75,7 @@ namespace epics {
LONG_LONG_MAX), _autoDelete(true),
_markerPeriodBytes(MARKER_PERIOD), _nextMarkerPosition(
_markerPeriodBytes), _sendPending(false),
_lastMessageStartPosition(0), _mutex(new Mutex()),
_sendQueueMutex(new Mutex()), _verifiedMutex(new Mutex()),
_monitorMutex(new Mutex()), _stage(READ_FROM_SOCKET),
_lastMessageStartPosition(0), _stage(READ_FROM_SOCKET),
_lastSegmentedMessageType(0), _lastSegmentedMessageCommand(
0), _storedPayloadSize(0), _storedPosition(0),
_storedLimit(0), _magicAndVersion(0), _packetType(0),
@@ -87,9 +85,9 @@ namespace epics {
new GrowingCircularBuffer<TransportSender*> (100)),
_rcvThreadId(NULL), _sendThreadId(NULL), _monitorSendQueue(
new GrowingCircularBuffer<TransportSender*> (100)),
_monitorSender(new MonitorSender(_monitorMutex,
_monitorSender(new MonitorSender(&_monitorMutex,
_monitorSendQueue)), _context(context),
_sendThreadRunning(false) {
_sendThreadExited(false) {
_socketBuffer = new ByteBuffer(max(MAX_TCP_RECV
+MAX_ENSURE_DATA_BUFFER_SIZE, receiveBufferSize), EPICS_ENDIAN_BIG);
@@ -114,7 +112,7 @@ namespace epics {
}
socklen_t saSize = sizeof(sockaddr);
retval = getpeername(_channel, &(_socketAddress->sa), &saSize);
retval = getpeername(_channel, &(_socketAddress.sa), &saSize);
if(retval<0) {
errlogSevPrintf(errlogMajor,
"Error fetching socket remote address: %s", strerror(
@@ -131,21 +129,19 @@ namespace epics {
BlockingTCPTransport::~BlockingTCPTransport() {
close(true);
delete _socketAddress;
delete _sendQueue;
delete _socketBuffer;
delete _sendBuffer;
delete _mutex;
delete _sendQueueMutex;
delete _verifiedMutex;
delete _monitorMutex;
delete _responseHandler;
}
void BlockingTCPTransport::start() {
_sendThreadRunning = true;
// TODO consuder epics::pvData::Thread
String threadName = "TCP-receive "+inetAddressToString(
_socketAddress);
_socketAddress);
errlogSevPrintf(errlogInfo, "Starting thread: %s",
threadName.c_str());
@@ -174,9 +170,9 @@ namespace epics {
_nextMarkerPosition -= _sendBuffer->getPosition()
-CA_MESSAGE_HEADER_SIZE;
_sendQueueMutex->lock();
_sendQueueMutex.lock();
_flushRequested = false;
_sendQueueMutex->unlock();
_sendQueueMutex.unlock();
_sendBuffer->clear();
@@ -190,12 +186,13 @@ namespace epics {
}
void BlockingTCPTransport::close(bool force) {
Lock lock(_mutex);
Lock lock(&_mutex);
// already closed check
if(_closed) return;
_closed = true;
printf("closing.\n");
// remove from registry
_context->getTransportRegistry()->remove(this);
@@ -203,12 +200,8 @@ namespace epics {
// clean resources
internalClose(force);
// threads cannot "wait" Epics, no need to notify
// TODO check alternatives to "wait"
// notify send queue
//synchronized (sendQueue) {
// sendQueue.notifyAll();
//}
_sendQueueEvent.signal();
}
void BlockingTCPTransport::internalClose(bool force) {
@@ -235,21 +228,22 @@ namespace epics {
return sockBufSize;
}
// TODO reimplement using Event
bool BlockingTCPTransport::waitUntilVerified(double timeout) {
double internalTimeout = timeout;
bool internalVerified = false;
_verifiedMutex->lock();
_verifiedMutex.lock();
internalVerified = _verified;
_verifiedMutex->unlock();
_verifiedMutex.unlock();
while(!internalVerified&&internalTimeout>0) {
epicsThreadSleep(min(0.1, internalTimeout));
internalTimeout -= 0.1;
_verifiedMutex->lock();
_verifiedMutex.lock();
internalVerified = _verified;
_verifiedMutex->unlock();
_verifiedMutex.unlock();
}
return internalVerified;
}
@@ -527,11 +521,14 @@ namespace epics {
}
else if(type==1) {
if(_command==0) {
if(_markerToSend==0) _markerToSend
= _payloadSize; // TODO send back response
_flowControlMutex.lock();
if(_markerToSend==0)
_markerToSend = _payloadSize; // TODO send back response
_flowControlMutex.unlock();
}
else //if (command == 1)
{
_flowControlMutex.lock();
int difference = (int)_totalBytesSent
-_payloadSize+CA_MESSAGE_HEADER_SIZE;
// overrun check
@@ -541,6 +538,7 @@ namespace epics {
+_remoteTransportSocketReceiveBufferSize
-difference;
// TODO if this is calculated wrong, this can be critical !!!
_flowControlMutex.unlock();
}
// no payload
@@ -576,7 +574,7 @@ namespace epics {
+_storedPayloadSize, _storedLimit));
try {
// handle response
_responseHandler->handleResponse(_socketAddress,
_responseHandler->handleResponse(&_socketAddress,
this, version, _command, _payloadSize,
_socketBuffer);
} catch(...) {
@@ -624,8 +622,10 @@ namespace epics {
_sendBufferSentPosition = 0;
// if not set skip marker otherwise set it
_flowControlMutex.lock();
int markerValue = _markerToSend;
_markerToSend = 0;
_flowControlMutex.unlock();
if(markerValue==0)
_sendBufferSentPosition = CA_MESSAGE_HEADER_SIZE;
else
@@ -657,7 +657,7 @@ namespace epics {
} catch(BaseException* e) {
String trace;
e->toString(trace);
errlogSevPrintf(errlogMajor, trace.c_str());
errlogSevPrintf(errlogMajor, "%s", trace.c_str());
// error, release lock
clearAndReleaseBuffer();
} catch(...) {
@@ -701,7 +701,7 @@ namespace epics {
// connection lost
ostringstream temp;
temp<<"error in sending TCP data: "<<strerror(errno);
errlogSevPrintf(errlogMajor, temp.str().c_str());
errlogSevPrintf(errlogMajor, "%s", temp.str().c_str());
THROW_BASE_EXCEPTION(temp.str().c_str());
}
else if(bytesSent==0) {
@@ -722,7 +722,9 @@ namespace epics {
buffer->setPosition(buffer->getPosition()+bytesSent);
_flowControlMutex.lock();
_totalBytesSent += bytesSent;
_flowControlMutex.unlock();
// readjust limit
if(bytesToSend==maxBytesToSend) {
@@ -748,18 +750,9 @@ namespace epics {
TransportSender* BlockingTCPTransport::extractFromSendQueue() {
TransportSender* retval;
_sendQueueMutex->lock();
try {
if(_sendQueue->size()>0)
retval = _sendQueue->extract();
else
retval = NULL;
} catch(...) {
// not expecting the exception here, but just to be safe
retval = NULL;
}
_sendQueueMutex->unlock();
_sendQueueMutex.lock();
retval = _sendQueue->extract();
_sendQueueMutex.unlock();
return retval;
}
@@ -768,23 +761,35 @@ namespace epics {
while(!_closed) {
TransportSender* sender;
// TODO race!
sender = extractFromSendQueue();
printf("extraced %d\n", sender);
// wait for new message
while(sender==NULL&&!_flushRequested&&!_closed) {
while(sender==NULL&&!_flushRequested/*&&!_closed*/) {
bool c;
_mutex.lock();
c = _closed;
printf("closed %d\n", c);
_mutex.unlock();
if (c)
break;
if(_flushStrategy==DELAYED) {
if(delay>0) epicsThreadSleep(delay);
if(_delay>0) epicsThreadSleep(_delay);
if(_sendQueue->size()==0) {
// if (hasMonitors || sendBuffer.position() > CAConstants.CA_MESSAGE_HEADER_SIZE)
if(_sendBuffer->getPosition()
>CA_MESSAGE_HEADER_SIZE)
if(_sendBuffer->getPosition()>CA_MESSAGE_HEADER_SIZE)
_flushRequested = true;
else
epicsThreadSleep(0);
_sendQueueEvent.wait();
}
}
else
epicsThreadSleep(0);
_sendQueueEvent.wait();
sender = extractFromSendQueue();
printf("extraced2 %d\n", sender);
}
// always do flush from this thread
@@ -813,7 +818,7 @@ namespace epics {
} catch(BaseException* e) {
String trace;
e->toString(trace);
errlogSevPrintf(errlogMajor, trace.c_str());
errlogSevPrintf(errlogMajor, "%s", trace.c_str());
_sendBuffer->setPosition(_lastMessageStartPosition);
} catch(...) {
_sendBuffer->setPosition(_lastMessageStartPosition);
@@ -823,13 +828,6 @@ namespace epics {
} // while(!_closed)
}
void BlockingTCPTransport::requestFlush() {
// needless lock, manipulating a single byte
//Lock lock(_sendQueueMutex);
if(_flushRequested) return;
_flushRequested = true;
}
void BlockingTCPTransport::freeSendBuffers() {
// TODO ?
}
@@ -849,53 +847,68 @@ namespace epics {
void BlockingTCPTransport::rcvThreadRunner(void* param) {
BlockingTCPTransport* obj = (BlockingTCPTransport*)param;
try{
obj->processReadCached(false, NONE, CA_MESSAGE_HEADER_SIZE, false);
} catch (...) {
printf("rcvThreadRunnner exception\n");
}
printf("rcvThreadRunner done, autodelete %d-\n", obj->_autoDelete);
if(obj->_autoDelete) {
while(obj->_sendThreadRunning)
while(true)
{
printf("waiting send thread to exit.\n");
bool exited;
obj->_mutex.lock();
exited = obj->_sendThreadExited;
obj->_mutex.unlock();
if (exited)
break;
epicsThreadSleep(0.1);
}
printf("deleting.\n");
delete obj;
}
}
void BlockingTCPTransport::sendThreadRunner(void* param) {
BlockingTCPTransport* obj = (BlockingTCPTransport*)param;
try {
obj->processSendQueue();
} catch (...) {
printf("sendThreadRunnner exception\n");
}
obj->freeConnectionResorces();
printf("exited.\n");
obj->_sendThreadRunning = false;
// TODO possible crash on unlock
obj->_mutex.lock();
obj->_sendThreadExited = true;
obj->_mutex.unlock();
}
void BlockingTCPTransport::enqueueSendRequest(TransportSender* sender) {
Lock lock(&_sendQueueMutex);
if(_closed) return;
Lock lock(_sendQueueMutex);
_sendQueue->insert(sender);
_sendQueueEvent.signal();
}
void BlockingTCPTransport::enqueueMonitorSendRequest(
TransportSender* sender) {
void BlockingTCPTransport::enqueueMonitorSendRequest(TransportSender* sender) {
Lock lock(&_monitorMutex);
if(_closed) return;
Lock lock(_monitorMutex);
_monitorSendQueue->insert(sender);
if(_monitorSendQueue->size()==1) enqueueSendRequest(_monitorSender);
}
void MonitorSender::send(ByteBuffer* buffer,
TransportSendControl* control) {
void MonitorSender::send(ByteBuffer* buffer, TransportSendControl* control) {
control->startMessage(19, 0);
while(true) {
TransportSender* sender;
_monitorMutex->lock();
if(_monitorSendQueue->size()>0)
try {
sender = _monitorSendQueue->extract();
} catch(...) {
sender = NULL;
}
sender = _monitorSendQueue->extract();
else
sender = NULL;
_monitorMutex->unlock();

View File

@@ -17,7 +17,7 @@
#include <noDefaultMethods.h>
#include <byteBuffer.h>
#include <lock.h>
#include <epicsException.h>
#include <event.h>
/* EPICSv3 */
#include <osdSock.h>
@@ -33,7 +33,6 @@ namespace epics {
public:
BlockingUDPTransport(ResponseHandler* responseHandler,
SOCKET channel, osiSockAddr& bindAddress,
InetAddrVector* sendAddresses,
short remoteTransportRevision);
virtual ~BlockingUDPTransport();
@@ -43,7 +42,7 @@ namespace epics {
}
virtual const osiSockAddr* getRemoteAddress() const {
return _socketAddress;
return &_bindAddress;
}
virtual const String getType() const {
@@ -97,7 +96,7 @@ namespace epics {
virtual void close(bool forced);
virtual void ensureData(int size) {
// TODO Auto-generated method stub
// noop
}
virtual void startMessage(int8 command, int ensureCapacity);
@@ -108,13 +107,12 @@ namespace epics {
}
virtual void setRecipient(const osiSockAddr& sendTo) {
if(_sendTo!=NULL) delete _sendTo;
_sendTo = new osiSockAddr;
memcpy(_sendTo, &sendTo, sizeof(osiSockAddr));
_sendToEnabled = true;
_sendTo = sendTo;
}
virtual void flushSerializeBuffer() {
// TODO Auto-generated method stub
// noop
}
virtual void ensureBuffer(int size) {
@@ -126,7 +124,15 @@ namespace epics {
* @param addresses list of ignored addresses.
*/
void setIgnoredAddresses(InetAddrVector* addresses) {
_ignoredAddresses = addresses;
if (addresses)
{
if (!_ignoredAddresses) _ignoredAddresses = new InetAddrVector;
*_ignoredAddresses = *addresses;
}
else
{
if (_ignoredAddresses) { delete _ignoredAddresses; _ignoredAddresses = 0; }
}
}
/**
@@ -154,7 +160,7 @@ namespace epics {
* @return bind address.
*/
const osiSockAddr* getBindAddress() const {
return _bindAddress;
return &_bindAddress;
}
/**
@@ -162,11 +168,19 @@ namespace epics {
* @param addresses list of send addresses, non-<code>null</code>.
*/
void setBroadcastAddresses(InetAddrVector* addresses) {
_sendAddresses = addresses;
if (addresses)
{
if (!_sendAddresses) _sendAddresses = new InetAddrVector;
*_sendAddresses = *addresses;
}
else
{
if (_sendAddresses) { delete _sendAddresses; _sendAddresses = 0; }
}
}
virtual IntrospectionRegistry* getIntrospectionRegistry() {
THROW_BASE_EXCEPTION("not supported by UDP transport");
return 0;
}
protected:
@@ -184,6 +198,8 @@ namespace epics {
bool processBuffer(osiSockAddr& fromAddress,
epics::pvData::ByteBuffer* receiveBuffer);
void close(bool forced, bool waitForThreadToComplete);
// Context only used for logging in this class
/**
@@ -191,15 +207,10 @@ namespace epics {
*/
SOCKET _channel;
/**
* Cached socket address.
*/
osiSockAddr* _socketAddress;
/**
* Bind address.
*/
osiSockAddr* _bindAddress;
osiSockAddr _bindAddress;
/**
* Send addresses.
@@ -211,8 +222,12 @@ namespace epics {
*/
InetAddrVector* _ignoredAddresses;
osiSockAddr* _sendTo;
/**
* Send address.
*/
osiSockAddr _sendTo;
bool _sendToEnabled;
/**
* Receive buffer.
*/
@@ -228,15 +243,12 @@ namespace epics {
*/
int _lastMessageStartPosition;
/**
* Read buffer
*/
char* _readBuffer;
/**
* Used for process sync.
*/
Mutex* _mutex;
Mutex _mutex;
Mutex _sendMutex;
Event _shutdownEvent;
/**
* Thread ID
@@ -245,18 +257,19 @@ namespace epics {
};
class BlockingUDPConnector : public Connector,
public epics::pvData::NoDefaultMethods {
class BlockingUDPConnector :
public Connector,
private epics::pvData::NoDefaultMethods {
public:
BlockingUDPConnector(bool reuseSocket,
InetAddrVector* sendAddresses, bool broadcast) :
_sendAddresses(sendAddresses), _reuseSocket(reuseSocket),
_broadcast(broadcast) {
BlockingUDPConnector(
bool reuseSocket,
bool broadcast) :
_reuseSocket(reuseSocket),
_broadcast(broadcast) {
}
virtual ~BlockingUDPConnector() {
// TODO: delete _sendAddresses here?
}
/**
@@ -268,11 +281,6 @@ namespace epics {
private:
/**
* Send address.
*/
InetAddrVector* _sendAddresses;
/**
* Reuse socket flag.
*/

View File

@@ -9,9 +9,6 @@
#include "blockingUDP.h"
#include "remote.h"
/* pvData */
#include <epicsException.h>
/* EPICSv3 */
#include <errlog.h>
#include <osiSock.h>
@@ -26,47 +23,40 @@ namespace epics {
Transport* BlockingUDPConnector::connect(TransportClient* client,
ResponseHandler* responseHandler, osiSockAddr& bindAddress,
short transportRevision, int16 priority) {
errlogSevPrintf(errlogInfo, "Creating datagram socket to: %s",
inetAddressToString(&bindAddress).c_str());
inetAddressToString(bindAddress).c_str());
SOCKET socket = epicsSocketCreate(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
if(socket==INVALID_SOCKET) {
char errStr[64];
epicsSocketConvertErrnoToString(errStr, sizeof(errStr));
errlogSevPrintf(errlogMajor, "Error creating socket: %s",
errStr);
errlogSevPrintf(errlogMajor, "Error creating socket: %s", errStr);
return 0;
}
int optval = _broadcast ? 1 : 0;
int retval = ::setsockopt(socket, SOL_SOCKET, SO_BROADCAST, &optval,
sizeof(optval));
if(retval<0) errlogSevPrintf(errlogMajor,
"Error setting SO_BROADCAST: %s", strerror(errno));
// set the socket options
int retval = ::setsockopt(socket, SOL_SOCKET, SO_BROADCAST, &optval, sizeof(optval));
if(retval<0)
{
errlogSevPrintf(errlogMajor, "Error setting SO_BROADCAST: %s", strerror(errno));
epicsSocketDestroy (socket);
return 0;
}
// set SO_REUSEADDR or SO_REUSEPORT, OS dependant
if (_reuseSocket)
epicsSocketEnableAddressUseForDatagramFanout(socket);
/*
optval = _reuseSocket ? 1 : 0;
// or SO_REUSEADDR, OS dependant
retval = ::setsockopt(socket, SOL_SOCKET, SO_REUSEPORT, &optval,
sizeof(optval));
if(retval<0) errlogSevPrintf(errlogMajor,
"Error setting SO_REUSEADDR: %s", strerror(errno));
*/
retval = ::bind(socket, (sockaddr*)&(bindAddress.sa),
sizeof(sockaddr));
retval = ::bind(socket, (sockaddr*)&(bindAddress.sa), sizeof(sockaddr));
if(retval<0) {
errlogSevPrintf(errlogMajor, "Error binding socket: %s",
strerror(errno));
THROW_BASE_EXCEPTION(strerror(errno));
errlogSevPrintf(errlogMajor, "Error binding socket: %s", strerror(errno));
epicsSocketDestroy (socket);
return 0;
}
// sockets are blocking by default
return new BlockingUDPTransport(responseHandler, socket,
bindAddress, _sendAddresses, transportRevision);
return new BlockingUDPTransport(responseHandler, socket, bindAddress, transportRevision);
}
}

View File

@@ -33,83 +33,86 @@ namespace epics {
BlockingUDPTransport::BlockingUDPTransport(
ResponseHandler* responseHandler, SOCKET channel,
osiSockAddr& bindAddress, InetAddrVector* sendAddresses,
osiSockAddr& bindAddress,
short remoteTransportRevision) :
_closed(false),
_responseHandler(responseHandler),
_channel(channel),
_sendAddresses(sendAddresses),
_ignoredAddresses(NULL),
_sendTo(NULL),
_receiveBuffer(new ByteBuffer(MAX_UDP_RECV,
EPICS_ENDIAN_BIG)),
_bindAddress(bindAddress),
_sendAddresses(0),
_ignoredAddresses(0),
_sendToEnabled(false),
_receiveBuffer(new ByteBuffer(MAX_UDP_RECV, EPICS_ENDIAN_BIG)),
_sendBuffer(new ByteBuffer(MAX_UDP_RECV, EPICS_ENDIAN_BIG)),
_lastMessageStartPosition(0), _readBuffer(
new char[MAX_UDP_RECV]), _mutex(new Mutex()),
_threadId(NULL) {
_socketAddress = new osiSockAddr;
memcpy(_socketAddress, &bindAddress, sizeof(osiSockAddr));
_bindAddress = _socketAddress;
_lastMessageStartPosition(0),
_threadId(0)
{
}
BlockingUDPTransport::~BlockingUDPTransport() {
close(true); // close the socket and stop the thread.
if(_sendTo!=NULL) delete _sendTo;
delete _socketAddress;
// _bindAddress equals _socketAddress
if (_sendAddresses) delete _sendAddresses;
if (_ignoredAddresses) delete _ignoredAddresses;
delete _receiveBuffer;
delete _sendBuffer;
delete[] _readBuffer;
delete _mutex;
delete _responseHandler;
}
void BlockingUDPTransport::start() {
String threadName = "UDP-receive "+inetAddressToString(
_socketAddress);
String threadName = "UDP-receive "+inetAddressToString(_bindAddress);
errlogSevPrintf(errlogInfo, "Starting thread: %s",
threadName.c_str());
errlogSevPrintf(errlogInfo, "Starting thread: %s",threadName.c_str());
_threadId = epicsThreadCreate(threadName.c_str(),
epicsThreadPriorityMedium, epicsThreadGetStackSize(
epicsThreadStackMedium),
epicsThreadPriorityMedium,
epicsThreadGetStackSize(epicsThreadStackMedium),
BlockingUDPTransport::threadRunner, this);
}
void BlockingUDPTransport::close(bool forced) {
if(_closed) return;
_closed = true;
close(forced, true);
}
if(_bindAddress!=NULL) errlogSevPrintf(errlogInfo,
void BlockingUDPTransport::close(bool forced, bool waitForThreadToComplete) {
{
Lock guard(&_mutex);
if(_closed) return;
_closed = true;
errlogSevPrintf(errlogInfo,
"UDP socket %s closed.",
inetAddressToString(_bindAddress).c_str());
epicsSocketDestroy(_channel);
epicsSocketDestroy(_channel);
}
// wait for send thread to exit cleanly
if (waitForThreadToComplete)
_shutdownEvent.wait(3.0);
}
void BlockingUDPTransport::enqueueSendRequest(TransportSender* sender) {
Lock lock(_mutex);
Lock lock(&_sendMutex);
_sendTo = NULL;
_sendToEnabled = false;
_sendBuffer->clear();
sender->lock();
try {
sender->send(_sendBuffer, this);
sender->unlock();
endMessage();
if(_sendTo==NULL)
if(!_sendToEnabled)
send(_sendBuffer);
else
send(_sendBuffer, *_sendTo);
send(_sendBuffer, _sendTo);
} catch(...) {
sender->unlock();
}
}
void BlockingUDPTransport::startMessage(int8 command,
int ensureCapacity) {
void BlockingUDPTransport::startMessage(int8 command, int ensureCapacity) {
_lastMessageStartPosition = _sendBuffer->getPosition();
_sendBuffer->putShort(CA_MAGIC_AND_VERSION);
_sendBuffer->putByte(0); // data
@@ -118,21 +121,29 @@ namespace epics {
}
void BlockingUDPTransport::endMessage() {
_sendBuffer->putInt(_lastMessageStartPosition+(sizeof(int16)+2),
_sendBuffer->getPosition()-_lastMessageStartPosition
-CA_MESSAGE_HEADER_SIZE);
_sendBuffer->putInt(
_lastMessageStartPosition+(sizeof(int16)+2),
_sendBuffer->getPosition()-_lastMessageStartPosition-CA_MESSAGE_HEADER_SIZE);
}
void BlockingUDPTransport::processRead() {
// This function is always called from only one thread - this
// object's own thread.
char _readBuffer[MAX_UDP_RECV];
osiSockAddr fromAddress;
try {
bool closed;
while(!_closed) {
_mutex.lock();
closed = _closed;
_mutex.unlock();
if (closed)
break;
// we poll to prevent blocking indefinitely
// data ready to be read
@@ -147,18 +158,23 @@ namespace epics {
if(bytesRead>0) {
// successfully got datagram
bool ignore = false;
if(_ignoredAddresses!=NULL) for(size_t i = 0; i
<_ignoredAddresses->size(); i++)
if(_ignoredAddresses->at(i)->ia.sin_addr.s_addr
if(_ignoredAddresses!=0)
{
for(size_t i = 0; i <_ignoredAddresses->size(); i++)
if(_ignoredAddresses->at(i).ia.sin_addr.s_addr
==fromAddress.ia.sin_addr.s_addr) {
ignore = true;
break;
}
}
if(!ignore) {
_receiveBuffer->put(_readBuffer, 0, bytesRead
<_receiveBuffer->getRemaining() ? bytesRead
: _receiveBuffer->getRemaining());
// TODO do not copy.... wrap the buffer!!!
_receiveBuffer->put(_readBuffer, 0,
bytesRead <_receiveBuffer->getRemaining() ?
bytesRead :
_receiveBuffer->getRemaining()
);
_receiveBuffer->flip();
@@ -166,22 +182,26 @@ namespace epics {
}
}
else {
// 0 == socket close
// 0 == socket remotely closed
// log a 'recvfrom' error
if(bytesRead==-1) errlogSevPrintf(errlogMajor,
if(!_closed && bytesRead==-1) errlogSevPrintf(errlogMajor,
"Socket recv error: %s", strerror(errno));
close(true, false);
break;
}
}
} catch(...) {
// TODO: catch all exceptions, and act accordingly
close(true);
close(true, false);
}
char threadName[40];
epicsThreadGetName(_threadId, threadName, 40);
errlogSevPrintf(errlogInfo, "Thread '%s' exiting", threadName);
_shutdownEvent.signal();
}
bool BlockingUDPTransport::processBuffer(osiSockAddr& fromAddress,
@@ -240,12 +260,12 @@ namespace epics {
}
bool BlockingUDPTransport::send(ByteBuffer* buffer) {
if(_sendAddresses==NULL) return false;
if(!_sendAddresses) return false;
for(size_t i = 0; i<_sendAddresses->size(); i++) {
buffer->flip();
int retval = sendto(_channel, buffer->getArray(),
buffer->getLimit(), 0, &(_sendAddresses->at(i)->sa),
buffer->getLimit(), 0, &(_sendAddresses->at(i).sa),
sizeof(sockaddr));
{
if(retval<0) errlogSevPrintf(errlogMajor,
@@ -262,7 +282,7 @@ namespace epics {
// that is the buffer size used by the platform for input on
// this DatagramSocket.
int sockBufSize;
int sockBufSize = -1;
socklen_t intLen = sizeof(int);
int retval = getsockopt(_channel, SOL_SOCKET, SO_RCVBUF,

View File

@@ -289,6 +289,8 @@ namespace epics {
*/
class ResponseHandler {
public:
virtual ~ResponseHandler() {}
/**
* Handle response.
* @param[in] responseFrom remote address of the responder, <code>0</code> if unknown.
@@ -504,6 +506,7 @@ namespace epics {
*/
class ResponseRequest {
public:
virtual ~ResponseRequest() {}
/**
* Get I/O ID.
@@ -540,6 +543,8 @@ namespace epics {
*/
class DataResponse : public ResponseRequest {
public:
virtual ~DataResponse() {}
/**
* Notification response.
* @param transport
@@ -559,6 +564,8 @@ namespace epics {
*/
class SubscriptionRequest /*: public ResponseRequest*/ {
public:
virtual ~SubscriptionRequest() {}
/**
* Update (e.g. after some time of unresponsiveness) - report current value.
*/

View File

@@ -66,10 +66,10 @@ namespace epics {
namespace pvAccess {
void addDefaultBroadcastAddress(InetAddrVector* v, in_port_t p) {
osiSockAddr* pNewNode = new osiSockAddr;
pNewNode->ia.sin_family = AF_INET;
pNewNode->ia.sin_addr.s_addr = htonl(INADDR_BROADCAST);
pNewNode->ia.sin_port = htons(p);
osiSockAddr pNewNode;
pNewNode.ia.sin_family = AF_INET;
pNewNode.ia.sin_addr.s_addr = htonl(INADDR_BROADCAST);
pNewNode.ia.sin_port = htons(p);
v->push_back(pNewNode);
}
@@ -84,7 +84,7 @@ namespace epics {
struct ifreq* pIfreqList;
struct ifreq* pifreq;
struct ifreq ifrBuff;
osiSockAddr* pNewNode;
osiSockAddr pNewNode;
InetAddrVector* retVector = new InetAddrVector();
@@ -157,16 +157,6 @@ namespace epics {
*/
if(ifrBuff.ifr_flags&IFF_LOOPBACK) continue;
pNewNode = new osiSockAddr;
if(pNewNode==NULL) {
errlogSevPrintf(errlogMajor,
"getBroadcastAddresses(): no memory available for configuration");
delete[] pIfreqList;
if(retVector->size()==0) addDefaultBroadcastAddress(
retVector, defaultPort);
return retVector;
}
/*
* If this is an interface that supports
* broadcast fetch the broadcast address.
@@ -186,10 +176,9 @@ namespace epics {
errlogMinor,
"getBroadcastAddresses(): net intf \"%s\": bcast addr fetch fail",
pifreq->ifr_name);
delete pNewNode;
continue;
}
pNewNode->sa = ifrBuff.ifr_broadaddr;
pNewNode.sa = ifrBuff.ifr_broadaddr;
}
#ifdef IFF_POINTOPOINT
else if(ifrBuff.ifr_flags&IFF_POINTOPOINT) {
@@ -201,10 +190,9 @@ namespace epics {
errlogMinor,
"getBroadcastAddresses(): net intf \"%s\": pt to pt addr fetch fail",
pifreq->ifr_name);
delete pNewNode;
continue;
}
pNewNode->sa = ifrBuff.ifr_dstaddr;
pNewNode.sa = ifrBuff.ifr_dstaddr;
}
#endif
else {
@@ -212,10 +200,9 @@ namespace epics {
errlogMinor,
"getBroadcastAddresses(): net intf \"%s\": not point to point or bcast?",
pifreq->ifr_name);
delete pNewNode;
continue;
}
pNewNode->ia.sin_port = htons(defaultPort);
pNewNode.ia.sin_port = htons(defaultPort);
retVector->push_back(pNewNode);
}
@@ -296,15 +283,15 @@ namespace epics {
size_t subEnd;
while((subEnd = list.find(' ', subStart))!=String::npos) {
String address = list.substr(subStart, (subEnd-subStart));
osiSockAddr* addr = new osiSockAddr;
aToIPAddr(address.c_str(), defaultPort, &addr->ia);
osiSockAddr addr;
aToIPAddr(address.c_str(), defaultPort, &addr.ia);
iav->push_back(addr);
subStart = list.find_first_not_of(" \t\r\n\v", subEnd);
}
if(subStart!=String::npos&&list.length()>0) {
osiSockAddr* addr = new osiSockAddr;
aToIPAddr(list.substr(subStart).c_str(), defaultPort, &addr->ia);
osiSockAddr addr;
aToIPAddr(list.substr(subStart).c_str(), defaultPort, &addr.ia);
iav->push_back(addr);
}
@@ -315,18 +302,18 @@ namespace epics {
return iav;
}
const String inetAddressToString(const osiSockAddr *addr,
const String inetAddressToString(const osiSockAddr &addr,
bool displayPort, bool displayHex) {
stringstream saddr;
int ipa = ntohl(addr->ia.sin_addr.s_addr);
int ipa = ntohl(addr.ia.sin_addr.s_addr);
saddr<<((int)(ipa>>24)&0xFF)<<'.';
saddr<<((int)(ipa>>16)&0xFF)<<'.';
saddr<<((int)(ipa>>8)&0xFF)<<'.';
saddr<<((int)ipa&0xFF);
if(displayPort) saddr<<":"<<ntohs(addr->ia.sin_port);
if(displayHex) saddr<<" ("<<hex<<ntohl(addr->ia.sin_addr.s_addr)
if(displayPort) saddr<<":"<<ntohs(addr.ia.sin_port);
if(displayHex) saddr<<" ("<<hex<<ntohl(addr.ia.sin_addr.s_addr)
<<")";
return saddr.str();

View File

@@ -28,7 +28,7 @@
namespace epics {
namespace pvAccess {
typedef std::vector<osiSockAddr*> InetAddrVector;
typedef std::vector<osiSockAddr> InetAddrVector;
/**
* returns a vector containing all the IPv4 broadcast addresses
@@ -72,7 +72,7 @@ namespace epics {
InetAddrVector* getSocketAddressList(epics::pvData::String list, int defaultPort,
const InetAddrVector* appendList = NULL);
const epics::pvData::String inetAddressToString(const osiSockAddr *addr,
const epics::pvData::String inetAddressToString(const osiSockAddr &addr,
bool displayPort = true, bool displayHex = false);
/* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */

View File

@@ -56,36 +56,26 @@ void testBeaconEmitter()
{
ContextImpl ctx;
DummyResponseHandler drh(&ctx);
/* SOCKET mysocket;
if ((mysocket = socket (AF_INET, SOCK_DGRAM, 0)) == -1)
{
assert(false);
}
InetAddrVector* broadcastAddresses = getBroadcastAddresses(mysocket);*/
SOCKET socket = epicsSocketCreate(AF_INET, SOCK_DGRAM, 0);
auto_ptr<InetAddrVector> broadcastAddresses(getBroadcastAddresses(socket, 5067));
epicsSocketDestroy (socket);
InetAddrVector* broadcastAddresses = new InetAddrVector;
osiSockAddr* addr = new osiSockAddr;
addr->ia.sin_family = AF_INET;
addr->ia.sin_port = htons(5067);
if(inet_aton("255.255.255.255",&addr->ia.sin_addr)==0)
{
assert(false);
}
broadcastAddresses->push_back(addr);
BlockingUDPConnector connector(true, broadcastAddresses, true);
BlockingUDPConnector connector(true, true);
osiSockAddr bindAddr;
bindAddr.ia.sin_family = AF_INET;
bindAddr.ia.sin_port = htons(5066);
bindAddr.ia.sin_addr.s_addr = htonl(INADDR_ANY);
Transport* transport = connector.connect(NULL, &drh, bindAddr, 1, 50);
BlockingUDPTransport* transport = (BlockingUDPTransport*)connector.connect(NULL, &drh, bindAddr, 1, 50);
transport->setBroadcastAddresses(broadcastAddresses.get());
cout<<"Sending beacons"<<endl;
BeaconEmitter beaconEmitter(transport, transport->getRemoteAddress());
beaconEmitter.start();
while(1) sleep(1);
epicsThreadSleep (60.0);
delete transport;
}

View File

@@ -57,15 +57,15 @@ public:
transport->ensureData((2*sizeof(int16)+2*sizeof(int32)+128)/sizeof(int8));
const int32 sequentalID = payloadBuffer->getShort() & 0x0000FFFF;
const TimeStamp startupTimestamp(payloadBuffer->getInt() & 0x00000000FFFFFFFFL,(int32)(payloadBuffer->getInt() & 0x00000000FFFFFFFFL));
/*const int32 sequentalID = */ payloadBuffer->getShort();
const TimeStamp startupTimestamp(payloadBuffer->getInt(),payloadBuffer->getInt());
// 128-bit IPv6 address
osiSockAddr address;
decodeFromIPv6Address(payloadBuffer, &address);
// get port
const int32 port = payloadBuffer->getShort() & 0xFFFF;
const int32 port = payloadBuffer->getShort();
address.ia.sin_port = ntohs(port);
// accept given address if explicitly specified by sender
@@ -130,7 +130,7 @@ void testBeaconHandler()
{
ContextImpl ctx;
BeaconResponseHandler brh(&ctx);
BlockingUDPConnector connector(false, NULL, true);
BlockingUDPConnector connector(false, true);
osiSockAddr bindAddr;
bindAddr.ia.sin_family = AF_INET;
@@ -139,7 +139,7 @@ void testBeaconHandler()
Transport* transport = connector.connect(NULL, &brh, bindAddr, 1, 50);
(static_cast<BlockingUDPTransport*>(transport))->start();
while(1) sleep(1);
epicsThreadSleep (60.0);
delete transport;
}

View File

@@ -5,17 +5,19 @@
* Author: Miha Vitorovic
*/
#include "remote.h"
#include "blockingUDP.h"
#include "logger.h"
#include "inetAddressUtil.h"
#include <remote.h>
#include <blockingUDP.h>
#include <logger.h>
#include <inetAddressUtil.h>
//#include <showConstructDestruct.h>
#include <osiSock.h>
#include <iostream>
#include <cstdio>
#define SRV_IP "192.168.71.132"
#define SRV_IP "127.0.0.1"
using namespace epics::pvAccess;
using namespace epics::pvData;
@@ -28,19 +30,16 @@ static osiSockAddr sendTo;
class ContextImpl : public Context {
public:
ContextImpl() :
_tr(new TransportRegistry()), _timer(new Timer("server thread",
lowPriority)), _conf(new SystemConfigurationImpl()) {
}
ContextImpl()
{}
virtual ~ContextImpl() {
delete _tr;
delete _timer;
}
virtual Timer* getTimer() {
return _timer;
return 0;
}
virtual TransportRegistry* getTransportRegistry() {
return _tr;
return 0;
}
virtual Channel* getChannel(epics::pvAccess::pvAccessID) {
return 0;
@@ -49,13 +48,8 @@ public:
return 0;
}
virtual Configuration* getConfiguration() {
return _conf;
return 0;
}
private:
TransportRegistry* _tr;
Timer* _timer;
Configuration* _conf;
};
class DummyResponseHandler : public ResponseHandler {
@@ -63,6 +57,8 @@ public:
DummyResponseHandler(Context* ctx)
{ }
virtual ~DummyResponseHandler() {}
virtual void handleResponse(osiSockAddr* responseFrom,
Transport* transport, int8 version, int8 command, int payloadSize,
ByteBuffer* payloadBuffer) {
@@ -97,7 +93,7 @@ private:
};
void testBlockingUDPSender() {
BlockingUDPConnector connector(false, NULL, true);
BlockingUDPConnector connector(false, true);
ContextImpl ctx;
DummyTransportSender dts;
@@ -130,8 +126,11 @@ void testBlockingUDPSender() {
}
int main(int argc, char *argv[]) {
createFileLogger("testBlockingUDPClnt.log");
// createFileLogger("testBlockingUDPClnt.log");
testBlockingUDPSender();
// std::cout << "-----------------------------------------------------------------------" << std::endl;
// getShowConstructDestruct()->constuctDestructTotals(stdout);
return (0);
}

View File

@@ -23,19 +23,15 @@ using std::dec;
class ContextImpl : public Context {
public:
ContextImpl() :
_tr(new TransportRegistry()), _timer(new Timer("server thread",
lowPriority)), _conf(new SystemConfigurationImpl()) {
}
ContextImpl() {}
virtual ~ContextImpl() {
delete _tr;
delete _timer;
}
virtual Timer* getTimer() {
return _timer;
return 0;
}
virtual TransportRegistry* getTransportRegistry() {
return _tr;
return 0;
}
virtual Channel* getChannel(epics::pvAccess::pvAccessID) {
return 0;
@@ -44,13 +40,8 @@ public:
return 0;
}
virtual Configuration* getConfiguration() {
return _conf;
return 0;
}
private:
TransportRegistry* _tr;
Timer* _timer;
Configuration* _conf;
};
class DummyResponseHandler : public ResponseHandler {
@@ -58,6 +49,8 @@ public:
DummyResponseHandler(Context* context)
: packets(0) {
}
virtual ~DummyResponseHandler() {}
int getPackets() {
return packets;
@@ -102,7 +95,7 @@ void DummyResponseHandler::handleResponse(osiSockAddr* responseFrom,
}
void testBlockingUDPConnector() {
BlockingUDPConnector connector(false, NULL, true);
BlockingUDPConnector connector(false, true);
ContextImpl ctx;
DummyResponseHandler drh(&ctx);
@@ -127,7 +120,7 @@ void testBlockingUDPConnector() {
}
int main(int argc, char *argv[]) {
createFileLogger("testBlockingUDPSrv.log");
// createFileLogger("testBlockingUDPSrv.log");
testBlockingUDPConnector();
return (0);

File diff suppressed because it is too large Load Diff

View File

@@ -32,25 +32,25 @@ int main(int argc, char *argv[]) {
assert(vec->size()==3);
osiSockAddr* addr;
osiSockAddr addr;
addr = vec->at(0);
assert(addr->ia.sin_family==AF_INET);
assert(addr->ia.sin_port==htons(555));
assert(addr->ia.sin_addr.s_addr==htonl(0x7F000001));
assert(addr.ia.sin_family==AF_INET);
assert(addr.ia.sin_port==htons(555));
assert(addr.ia.sin_addr.s_addr==htonl(0x7F000001));
assert(inetAddressToString(addr)=="127.0.0.1:555");
cout<<'\t'<<inetAddressToString(addr, true)<<endl;
addr = vec->at(1);
assert(addr->ia.sin_family==AF_INET);
assert(addr->ia.sin_port==htons(1234));
assert(addr->ia.sin_addr.s_addr==htonl(0x0A0A0C0B));
assert(addr.ia.sin_family==AF_INET);
assert(addr.ia.sin_port==htons(1234));
assert(addr.ia.sin_addr.s_addr==htonl(0x0A0A0C0B));
assert(inetAddressToString(addr)=="10.10.12.11:1234");
cout<<'\t'<<inetAddressToString(addr, true)<<endl;
addr = vec->at(2);
assert(addr->ia.sin_family==AF_INET);
assert(addr->ia.sin_port==htons(555));
assert(addr->ia.sin_addr.s_addr==htonl(0xC0A80304));
assert(addr.ia.sin_family==AF_INET);
assert(addr.ia.sin_port==htons(555));
assert(addr.ia.sin_addr.s_addr==htonl(0xC0A80304));
assert(inetAddressToString(addr)=="192.168.3.4:555");
cout<<'\t'<<inetAddressToString(addr, true)<<endl;
@@ -62,55 +62,58 @@ int main(int argc, char *argv[]) {
assert(vec1->size()==4);
addr = vec1->at(0);
assert(addr->ia.sin_family==AF_INET);
assert(addr->ia.sin_port==htons(6789));
assert(addr->ia.sin_addr.s_addr==htonl(0xAC1037A0));
assert(addr.ia.sin_family==AF_INET);
assert(addr.ia.sin_port==htons(6789));
assert(addr.ia.sin_addr.s_addr==htonl(0xAC1037A0));
assert(inetAddressToString(addr)=="172.16.55.160:6789");
cout<<'\t'<<inetAddressToString(addr, true)<<endl;
addr = vec1->at(1);
assert(addr->ia.sin_family==AF_INET);
assert(addr->ia.sin_port==htons(555));
assert(addr->ia.sin_addr.s_addr==htonl(0x7F000001));
assert(addr.ia.sin_family==AF_INET);
assert(addr.ia.sin_port==htons(555));
assert(addr.ia.sin_addr.s_addr==htonl(0x7F000001));
assert(inetAddressToString(addr)=="127.0.0.1:555");
cout<<'\t'<<inetAddressToString(addr, true)<<endl;
addr = vec1->at(2);
assert(addr->ia.sin_family==AF_INET);
assert(addr->ia.sin_port==htons(1234));
assert(addr->ia.sin_addr.s_addr==htonl(0x0A0A0C0B));
assert(addr.ia.sin_family==AF_INET);
assert(addr.ia.sin_port==htons(1234));
assert(addr.ia.sin_addr.s_addr==htonl(0x0A0A0C0B));
assert(inetAddressToString(addr)=="10.10.12.11:1234");
cout<<'\t'<<inetAddressToString(addr, true)<<endl;
addr = vec1->at(3);
assert(addr->ia.sin_family==AF_INET);
assert(addr->ia.sin_port==htons(555));
assert(addr->ia.sin_addr.s_addr==htonl(0xC0A80304));
assert(addr.ia.sin_family==AF_INET);
assert(addr.ia.sin_port==htons(555));
assert(addr.ia.sin_addr.s_addr==htonl(0xC0A80304));
assert(inetAddressToString(addr)=="192.168.3.4:555");
cout<<'\t'<<inetAddressToString(addr, true)<<endl;
cout<<"\nPASSED!\n";
cout<<"Testing \"ipv4AddressToInt\""<<endl;
assert(ipv4AddressToInt(*(vec->at(0)))==(int32)0x7F000001);
assert(ipv4AddressToInt(*(vec->at(1)))==(int32)0x0A0A0C0B);
assert(ipv4AddressToInt(*(vec->at(2)))==(int32)0xC0A80304);
assert(ipv4AddressToInt((vec->at(0)))==(int32)0x7F000001);
assert(ipv4AddressToInt((vec->at(1)))==(int32)0x0A0A0C0B);
assert(ipv4AddressToInt((vec->at(2)))==(int32)0xC0A80304);
cout<<"\nPASSED!\n";
delete vec;
delete vec1;
osiSockAddr* paddr;
cout<<"Testing \"intToIPv4Address\""<<endl;
addr = intToIPv4Address(0x7F000001);
assert(addr->ia.sin_family==AF_INET);
assert(inetAddressToString(addr)=="127.0.0.1:0");
cout<<'\t'<<inetAddressToString(addr, true)<<endl;
delete addr;
paddr = intToIPv4Address(0x7F000001);
assert(paddr->ia.sin_family==AF_INET);
assert(inetAddressToString(*paddr)=="127.0.0.1:0");
cout<<'\t'<<inetAddressToString(*paddr, true)<<endl;
delete paddr;
addr = intToIPv4Address(0x0A0A0C0B);
assert(addr->ia.sin_family==AF_INET);
assert(inetAddressToString(addr)=="10.10.12.11:0");
cout<<'\t'<<inetAddressToString(addr, true)<<endl;
paddr = intToIPv4Address(0x0A0A0C0B);
assert(paddr->ia.sin_family==AF_INET);
assert(inetAddressToString(*paddr)=="10.10.12.11:0");
cout<<'\t'<<inetAddressToString(*paddr, true)<<endl;
addr = *paddr;
delete paddr;
cout<<"\nPASSED!\n";
@@ -122,7 +125,7 @@ int main(int argc, char *argv[]) {
(char)0, (char)0, (char)0, (char)0, (char)0xFF, (char)0xFF,
(char)0x0A, (char)0x0A, (char)0x0C, (char)0x0B };
encodeAsIPv6Address(buff, addr);
encodeAsIPv6Address(buff, &addr);
assert(strncmp(buff->getArray(), src, 16)==0);
cout<<"\nPASSED!\n";
@@ -135,7 +138,6 @@ int main(int argc, char *argv[]) {
}
delete broadcasts;
delete addr;
return 0;
}