This commit is contained in:
Matej Sekoranja
2011-01-05 10:15:52 +01:00
21 changed files with 1599 additions and 42 deletions

View File

@@ -19,18 +19,25 @@ INC += inetAddressUtil.h
INC += logger.h
INC += introspectionRegistry.h
INC += transportRegistry.h
INC += namedLockPattern.h
INC += referenceCountingLock.h
LIBSRCS += hexDump.cpp
LIBSRCS += wildcharMatcher.cpp
LIBSRCS += inetAddressUtil.cpp
LIBSRCS += logger.cpp
LIBSRCS += introspectionRegistry.cpp
LIBSRCS += transportRegistry.cpp
LIBSRCS += namedLockPattern.cpp
LIBSRCS += referenceCountingLock.cpp
SRC_DIRS += $(PVACCESS)/client
INC += pvAccess.h
LIBSRCS += pvAccess.cpp
SRC_DIRS += $(PVACCESS)/server
INC += serverContext.h
SRC_DIRS += $(PVACCESS)/factory
LIBSRCS += ChannelAccessFactory.cpp
@@ -52,6 +59,9 @@ LIBSRCS += beaconHandler.cpp
LIBSRCS += blockingTCPTransport.cpp
LIBSRCS += blockingClientTCPTransport.cpp
LIBSRCS += blockingTCPConnector.cpp
LIBSRCS += blockingServerTCPTransport.cpp
LIBSRCS += blockingTCPAcceptor.cpp
LIBSRCS += responseHandlers.cpp
LIBRARY = pvAccess
pvAccess_LIBS += Com

View File

@@ -69,7 +69,7 @@ bool BeaconHandler::updateBeacon(int8 remoteTransportRevision, int64 timestamp,
void BeaconHandler::beaconArrivalNotify()
{
int32 size;
int32 size = 0;
//TODO TCP name must be get from somewhere not hardcoded
//TODO
Transport** transports = NULL;//_context->getTransportRegistry().get("TCP", _responseFrom, size);
@@ -83,12 +83,12 @@ void BeaconHandler::beaconArrivalNotify()
{
transports[i]->aliveNotification();
}
delete transports;
delete[] transports;
}
void BeaconHandler::changedTransport()
{
int32 size;
int32 size = 0;
//TODO TCP name must be get from somewhere not hardcoded
//TODO
Transport** transports = NULL;//_context->getTransportRegistry().get("TCP", _responseFrom, size);
@@ -102,7 +102,7 @@ void BeaconHandler::changedTransport()
{
transports[i]->changedTransport();
}
delete transports;
delete[] transports;
}
}}

View File

@@ -67,7 +67,6 @@ namespace epics { namespace pvAccess {
* Mutex
*/
Mutex _mutex;
/**
* Update beacon.
* @param remoteTransportRevision encoded (major, minor) revision.

View File

@@ -24,7 +24,7 @@ namespace epics { namespace pvAccess {
*/
BeaconServerStatusProvider(ServerContext* context);
/**
* Test Constructor (ohne context)
* Test Constructor (without context)
*/
BeaconServerStatusProvider();
/**

View File

@@ -0,0 +1,126 @@
/*
* blockingServerTCPTransport.cpp
*
* Created on: Jan 4, 2011
* Author: Miha Vitorovic
*/
/* pvAccess */
#include "blockingTCP.h"
#include "remote.h"
/* pvData */
#include <lock.h>
#include <byteBuffer.h>
/* EPICSv3 */
#include <errlog.h>
/* standard */
#include <map>
using namespace epics::pvData;
using std::map;
namespace epics {
namespace pvAccess {
BlockingServerTCPTransport::BlockingServerTCPTransport(
Context* context, SOCKET channel,
ResponseHandler* responseHandler, int receiveBufferSize) :
BlockingTCPTransport(context, channel, responseHandler,
receiveBufferSize, CA_DEFAULT_PRIORITY),
_introspectionRegistry(new IntrospectionRegistry(true)),
_lastChannelSID(0), _channels(
new map<int, ServerChannel*> ()), _channelsMutex(
new Mutex()) {
// NOTE: priority not yet known, default priority is used to register/unregister
// TODO implement priorities in Reactor... not that user will
// change it.. still getPriority() must return "registered" priority!
start();
}
BlockingServerTCPTransport::~BlockingServerTCPTransport() {
delete _introspectionRegistry;
delete _channels;
delete _channelsMutex;
}
void BlockingServerTCPTransport::destroyAllChannels() {
Lock lock(_channelsMutex);
if(_channels->size()==0) return;
char ipAddrStr[64];
ipAddrToA(&_socketAddress->ia, ipAddrStr, sizeof(ipAddrStr));
errlogSevPrintf(
errlogInfo,
"Transport to %s still has %d channel(s) active and closing...",
ipAddrStr, _channels->size());
map<int, ServerChannel*>::iterator it = _channels->begin();
for(; it!=_channels->end(); it++)
it->second->destroy();
_channels->clear();
}
void BlockingServerTCPTransport::internalClose(bool force) {
BlockingTCPTransport::internalClose(force);
destroyAllChannels();
}
int BlockingServerTCPTransport::preallocateChannelSID() {
Lock lock(_channelsMutex);
// search first free (theoretically possible loop of death)
int sid = ++_lastChannelSID;
while(_channels->find(sid)!=_channels->end())
sid = ++_lastChannelSID;
return sid;
}
void BlockingServerTCPTransport::registerChannel(int sid,
ServerChannel* channel) {
Lock lock(_channelsMutex);
(*_channels)[sid] = channel;
}
void BlockingServerTCPTransport::unregisterChannel(int sid) {
Lock lock(_channelsMutex);
_channels->erase(sid);
}
ServerChannel* BlockingServerTCPTransport::getChannel(int sid) {
Lock lock(_channelsMutex);
map<int, ServerChannel*>::iterator it = _channels->find(sid);
if(it!=_channels->end()) return it->second;
return NULL;
}
int BlockingServerTCPTransport::getChannelCount() {
Lock lock(_channelsMutex);
return _channels->size();
}
void BlockingServerTCPTransport::send(ByteBuffer* buffer,
TransportSendControl* control) {
//
// send verification message
//
control->startMessage(1, 2*sizeof(int32));
// receive buffer size
buffer->putInt(getReceiveBufferSize());
// socket receive buffer size
buffer->putInt(getSocketReceiveBufferSize());
// send immediately
control->flush(true);
}
}
}

View File

@@ -14,21 +14,23 @@
#include "growingCircularBuffer.h"
#include "transportRegistry.h"
#include "introspectionRegistry.h"
#include "namedLockPattern.h"
/* pvData */
#include <byteBuffer.h>
#include <pvType.h>
#include <lock.h>
#include <epicsThread.h>
#include <timer.h>
/* EPICSv3 */
#include <osdSock.h>
#include <osiSock.h>
#include <epicsTime.h>
#include <epicsThread.h>
/* standard */
#include <set>
#include <map>
namespace epics {
namespace pvAccess {
@@ -472,6 +474,20 @@ namespace epics {
void responsiveTransport();
};
/**
* comparator for osiSockAddr*
*/
struct comp_osiSockAddrPtr
{
bool operator()(osiSockAddr const *a, osiSockAddr const *b)
{
if (a->sa.sa_family < b->sa.sa_family) return true;
if ((a->sa.sa_family == b->sa.sa_family) && (a->ia.sin_addr.s_addr < b->ia.sin_addr.s_addr )) return true;
if ((a->sa.sa_family == b->sa.sa_family) && (a->ia.sin_addr.s_addr == b->ia.sin_addr.s_addr ) && ( a->ia.sin_port < b->ia.sin_port )) return true;
return false;
}
};
/**
* Channel Access TCP connector.
* @author <a href="mailto:matej.sekoranjaATcosylab.com">Matej Sekoranja</a>
@@ -485,8 +501,8 @@ namespace epics {
virtual ~BlockingTCPConnector();
virtual Transport* connect(TransportClient* client,
ResponseHandler* responseHandler, osiSockAddr* address,
short transportRevision, int16 priority);
ResponseHandler* responseHandler, osiSockAddr* address,
short transportRevision, int16 priority);
private:
/**
* Lock timeout
@@ -499,9 +515,9 @@ namespace epics {
Context* _context;
/**
* Context instance.
* named lock
*/
//NamedLockPattern* _namedLocker;
NamedLockPattern<const osiSockAddr*, comp_osiSockAddrPtr>* _namedLocker;
/**
* Receive buffer size.
@@ -524,6 +540,207 @@ namespace epics {
};
class BlockingServerTCPTransport : public BlockingTCPTransport,
public ChannelHostingTransport,
public TransportSender {
public:
BlockingServerTCPTransport(Context* context, SOCKET channel,
ResponseHandler* responseHandler, int receiveBufferSize);
virtual ~BlockingServerTCPTransport();
virtual IntrospectionRegistry* getIntrospectionRegistry() {
return _introspectionRegistry;
}
/**
* Preallocate new channel SID.
* @return new channel server id (SID).
*/
virtual int preallocateChannelSID();
/**
* De-preallocate new channel SID.
* @param sid preallocated channel SID.
*/
virtual void depreallocateChannelSID(int sid) {
// noop
}
/**
* Register a new channel.
* @param sid preallocated channel SID.
* @param channel channel to register.
*/
virtual void registerChannel(int sid, ServerChannel* channel);
/**
* Unregister a new channel (and deallocates its handle).
* @param sid SID
*/
virtual void unregisterChannel(int sid);
/**
* Get channel by its SID.
* @param sid channel SID
* @return channel with given SID, <code>NULL</code> otherwise
*/
virtual ServerChannel* getChannel(int sid);
/**
* Get channel count.
* @return channel count.
*/
virtual int getChannelCount();
virtual epics::pvData::PVField* getSecurityToken() {
return NULL;
}
virtual void lock() {
// noop
}
virtual void unlock() {
// noop
}
/**
* Verify transport. Server side is self-verified.
*/
void verify() {
enqueueSendRequest(this);
verified();
}
/**
* CA connection validation request.
* A server sends a validate connection message when it receives a new connection.
* The message indicates that the server is ready to receive requests; the client must
* not send any messages on the connection until it has received the validate connection message
* from the server. No reply to the message is expected by the server.
* The purpose of the validate connection message is two-fold:
* It informs the client of the protocol version supported by the server.
* It prevents the client from writing a request message to its local transport
* buffers until after the server has acknowledged that it can actually process the
* request. This avoids a race condition caused by the server's TCP/IP stack
* accepting connections in its backlog while the server is in the process of shutting down:
* if the client were to send a request in this situation, the request
* would be lost but the client could not safely re-issue the request because that
* might violate at-most-once semantics.
* The validate connection message guarantees that a server is not in the middle
* of shutting down when the server's TCP/IP stack accepts an incoming connection
* and so avoids the race condition.
* @see org.epics.ca.impl.remote.TransportSender#send(java.nio.ByteBuffer, org.epics.ca.impl.remote.TransportSendControl)
*/
virtual void send(epics::pvData::ByteBuffer* buffer,
TransportSendControl* control);
protected:
/**
* Introspection registry.
*/
IntrospectionRegistry* _introspectionRegistry;
virtual void internalClose(bool force);
private:
/**
* Last SID cache.
*/
volatile int _lastChannelSID;
/**
* Channel table (SID -> channel mapping).
*/
std::map<int, ServerChannel*>* _channels;
Mutex* _channelsMutex;
/**
* Destroy all channels.
*/
void destroyAllChannels();
};
/**
* Channel Access Server TCP acceptor.
* @author <a href="mailto:matej.sekoranjaATcosylab.com">Matej Sekoranja</a>
* @version $Id: BlockingTCPAcceptor.java,v 1.1 2010/05/03 14:45:42 mrkraimer Exp $
*/
class BlockingTCPAcceptor {
public:
/**
* @param context
* @param port
* @param receiveBufferSize
* @throws CAException
*/
BlockingTCPAcceptor(Context* context, int port,
int receiveBufferSize);
~BlockingTCPAcceptor();
void handleEvents();
/**
* Bind socket address.
* @return bind socket address, <code>null</code> if not binded.
*/
osiSockAddr* getBindAddress() {
return _bindAddress;
}
/**
* Destroy acceptor (stop listening).
*/
void destroy();
private:
/**
* Context instance.
*/
Context* _context;
/**
* Bind server socket address.
*/
osiSockAddr* _bindAddress;
/**
* Server socket channel.
*/
SOCKET _serverSocketChannel;
/**
* Receive buffer size.
*/
int _receiveBufferSize;
/**
* Destroyed flag.
*/
volatile bool _destroyed;
epicsThreadId _threadId;
/**
* Initialize connection acception.
* @return port where server is listening
*/
int initialize(in_port_t port);
/**
* Validate connection by sending a validation message request.
* @return <code>true</code> on success.
*/
bool validateConnection(BlockingServerTCPTransport* transport,
const char* address);
static void handleEventsRunner(void* param);
};
}
}

View File

@@ -0,0 +1,280 @@
/*
* blockingTCPAcceptor.cpp
*
* Created on: Jan 4, 2011
* Author: Miha Vitorovic
*/
/* pvAccess */
#include "blockingTCP.h"
#include "remote.h"
#include "serverContext.h"
/* pvData */
#include <epicsException.h>
/* EPICSv3 */
#include <errlog.h>
#include <osiSock.h>
#include <epicsThread.h>
/* standard */
#include <sstream>
#include <poll.h>
using std::ostringstream;
namespace epics {
namespace pvAccess {
BlockingTCPAcceptor::BlockingTCPAcceptor(Context* context, int port,
int receiveBufferSize) :
_context(context), _bindAddress(NULL), _serverSocketChannel(
INVALID_SOCKET), _receiveBufferSize(receiveBufferSize),
_destroyed(false), _threadId(NULL) {
initialize(port);
}
BlockingTCPAcceptor::~BlockingTCPAcceptor() {
if(_bindAddress!=NULL) delete _bindAddress;
}
int BlockingTCPAcceptor::initialize(in_port_t port) {
// specified bind address
_bindAddress = new osiSockAddr;
_bindAddress->ia.sin_family = AF_INET;
_bindAddress->ia.sin_port = htons(port);
_bindAddress->ia.sin_addr.s_addr = htonl(INADDR_ANY);
char strBuffer[64];
char ipAddrStr[48];
ipAddrToA(&_bindAddress->ia, ipAddrStr, sizeof(ipAddrStr));
int tryCount = 0;
while(tryCount<2) {
errlogSevPrintf(errlogInfo, "Creating acceptor to %s.",
ipAddrStr);
_serverSocketChannel = epicsSocketCreate(AF_INET, SOCK_STREAM,
IPPROTO_TCP);
if(_serverSocketChannel==INVALID_SOCKET) {
epicsSocketConvertErrnoToString(strBuffer,
sizeof(strBuffer));
ostringstream temp;
temp<<"Socket create error: "<<strBuffer;
errlogSevPrintf(errlogMajor, temp.str().c_str());
THROW_BASE_EXCEPTION(temp.str().c_str());
}
else {
// try to bind
int retval = ::bind(_serverSocketChannel,
&_bindAddress->sa, sizeof(sockaddr));
if(retval<0) {
epicsSocketConvertErrnoToString(strBuffer,
sizeof(strBuffer));
errlogSevPrintf(errlogMinor, "Socket bind error: %s",
strBuffer);
if(_bindAddress->ia.sin_port!=0) {
// failed to bind to specified bind address,
// try to get port dynamically, but only once
errlogSevPrintf(
errlogMinor,
"Configured TCP port %d is unavailable, trying to assign it dynamically.",
port);
_bindAddress->ia.sin_port = htons(0);
}
else {
::close(_serverSocketChannel);
break; // exit while loop
}
}
else { // if(retval<0)
// bind succeeded
// update bind address, if dynamically port selection was used
if(ntohs(_bindAddress->ia.sin_port)==0) {
socklen_t sockLen = sizeof(sockaddr);
// read the actual socket info
retval = ::getsockname(_serverSocketChannel,
&_bindAddress->sa, &sockLen);
if(retval<0) {
// error obtaining port number
epicsSocketConvertErrnoToString(strBuffer,
sizeof(strBuffer));
errlogSevPrintf(errlogMinor,
"getsockname error: %s", strBuffer);
}
else {
errlogSevPrintf(
errlogInfo,
"Using dynamically assigned TCP port %d.",
ntohs(_bindAddress->ia.sin_port));
}
}
retval = ::listen(_serverSocketChannel, 5);
if(retval<0) {
epicsSocketConvertErrnoToString(strBuffer,
sizeof(strBuffer));
ostringstream temp;
temp<<"Socket listen error: "<<strBuffer;
errlogSevPrintf(errlogMajor, temp.str().c_str());
THROW_BASE_EXCEPTION(temp.str().c_str());
}
_threadId
= epicsThreadCreate(
"TCP-acceptor",
epicsThreadPriorityMedium,
epicsThreadGetStackSize(
epicsThreadStackMedium),
BlockingTCPAcceptor::handleEventsRunner,
this);
// all OK, return
return ntohs(_bindAddress->ia.sin_port);
} // successful bind
} // successfully obtained socket
tryCount++;
} // while
ostringstream temp;
temp<<"Failed to create acceptor to "<<ipAddrStr;
THROW_BASE_EXCEPTION(temp.str().c_str());
}
void BlockingTCPAcceptor::handleEvents() {
// rise level if port is assigned dynamically
char ipAddrStr[48];
ipAddrToA(&_bindAddress->ia, ipAddrStr, sizeof(ipAddrStr));
errlogSevPrintf(errlogInfo, "Accepting connections at %s.",
ipAddrStr);
bool socketOpen = true;
char strBuffer[64];
pollfd sockets[1];
sockets[0].fd = _serverSocketChannel;
sockets[0].events = POLLIN;
while(!_destroyed&&socketOpen) {
int retval = ::poll(sockets, 1, 50);
if(retval<0) {
// error in poll
epicsSocketConvertErrnoToString(strBuffer,
sizeof(strBuffer));
errlogSevPrintf(errlogMajor, "socket poll error: %s",
strBuffer);
socketOpen = false;
}
else if(retval>0) {
// some event on a socket
if(sockets[0].revents&POLLIN!=0) {
// connection waiting
osiSockAddr address;
osiSocklen_t len = sizeof(sockaddr);
SOCKET newClient = epicsSocketAccept(
_serverSocketChannel, &address.sa, &len);
if(newClient!=INVALID_SOCKET) {
// accept succeeded
ipAddrToA(&address.ia, ipAddrStr, sizeof(ipAddrStr));
errlogSevPrintf(errlogInfo,
"Accepted connection from CA client: %s",
ipAddrStr);
// enable TCP_NODELAY (disable Nagle's algorithm)
int optval = 1; // true
retval = ::setsockopt(newClient, IPPROTO_TCP,
TCP_NODELAY, &optval, sizeof(int));
if(retval<0) {
epicsSocketConvertErrnoToString(strBuffer,
sizeof(strBuffer));
errlogSevPrintf(errlogMinor,
"Error setting TCP_NODELAY: %s",
strBuffer);
}
// enable TCP_KEEPALIVE
retval = ::setsockopt(newClient, SOL_SOCKET,
SO_KEEPALIVE, &optval, sizeof(int));
if(retval<0) {
epicsSocketConvertErrnoToString(strBuffer,
sizeof(strBuffer));
errlogSevPrintf(errlogMinor,
"Error setting SO_KEEPALIVE: %s",
strBuffer);
}
// TODO tune buffer sizes?!
//socket.socket().setReceiveBufferSize();
//socket.socket().setSendBufferSize();
// create transport
// each transport should have its own response handler since it is not "shareable"
BlockingServerTCPTransport
* transport =
new BlockingServerTCPTransport(
_context,
newClient,
new ServerResponseHandler(
(ServerContextImpl*)_context),
_receiveBufferSize);
// validate connection
if(!validateConnection(transport, ipAddrStr)) {
transport->close(true);
errlogSevPrintf(
errlogInfo,
"Connection to CA client %s failed to be validated, closing it.",
ipAddrStr);
return;
}
errlogSevPrintf(errlogInfo,
"Serving to CA client: %s", ipAddrStr);
}// accept succeeded
} // connection waiting
if(sockets[0].revents&(POLLERR|POLLHUP|POLLNVAL)!=0) {
errlogSevPrintf(errlogMajor,
"error on a socket: POLLERR|POLLHUP|POLLNVAL");
socketOpen = false;
}
} // some event on a socket
} // while
}
bool BlockingTCPAcceptor::validateConnection(
BlockingServerTCPTransport* transport, const char* address) {
try {
transport->verify();
return true;
} catch(...) {
errlogSevPrintf(errlogInfo, "Validation of %s failed.", address);
return false;
}
}
void BlockingTCPAcceptor::handleEventsRunner(void* param) {
((BlockingTCPAcceptor*)param)->handleEvents();
}
void BlockingTCPAcceptor::destroy() {
if(_destroyed) return;
_destroyed = true;
if(_serverSocketChannel!=INVALID_SOCKET) {
char ipAddrStr[48];
ipAddrToA(&_bindAddress->ia, ipAddrStr, sizeof(ipAddrStr));
errlogSevPrintf(errlogInfo,
"Stopped accepting connections at %s.", ipAddrStr);
epicsSocketDestroy(_serverSocketChannel);
}
}
}
}

View File

@@ -7,6 +7,7 @@
#include "blockingTCP.h"
#include "remote.h"
#include "namedLockPattern.h"
#include <epicsThread.h>
#include <osiSock.h>
@@ -21,14 +22,14 @@ namespace epics {
BlockingTCPConnector::BlockingTCPConnector(Context* context,
int receiveBufferSize, float beaconInterval) :
_context(context), _receiveBufferSize(receiveBufferSize),
_beaconInterval(beaconInterval)
//TODO , _namedLocker(new NamedLockPattern())
{
_context(context), _namedLocker(new NamedLockPattern<
const osiSockAddr*, comp_osiSockAddrPtr> ()),
_receiveBufferSize(receiveBufferSize), _beaconInterval(
beaconInterval) {
}
BlockingTCPConnector::~BlockingTCPConnector() {
// TODO delete _namedLocker;
delete _namedLocker;
}
SOCKET BlockingTCPConnector::tryConnect(osiSockAddr* address, int tries) {
@@ -86,10 +87,8 @@ namespace epics {
if(transport->acquire(client)) return transport;
}
bool lockAcquired = true;
// TODO comment out
//bool lockAcquired = _namedLocker->acquireSynchronizationObject(
// address, LOCK_TIMEOUT);
bool lockAcquired = _namedLocker->acquireSynchronizationObject(
address, LOCK_TIMEOUT);
if(lockAcquired) {
try {
// ... transport created during waiting in lock
@@ -115,13 +114,13 @@ namespace epics {
// enable TCP_NODELAY (disable Nagle's algorithm)
int optval = 1; // true
int retval = ::setsockopt(socket, IPPROTO_TCP, TCP_NODELAY,
&optval, sizeof(optval));
&optval, sizeof(int));
if(retval<0) errlogSevPrintf(errlogMajor,
"Error setting TCP_NODELAY: %s", strerror(errno));
// enable TCP_KEEPALIVE
retval = ::setsockopt(socket, SOL_SOCKET, SO_KEEPALIVE,
&optval, sizeof(optval));
&optval, sizeof(int));
if(retval<0) errlogSevPrintf(errlogMinor,
"Error setting SO_KEEPALIVE: %s", strerror(errno));
@@ -130,9 +129,10 @@ namespace epics {
//socket.socket().setSendBufferSize();
// create transport
transport = new BlockingClientTCPTransport(_context, socket,
responseHandler, _receiveBufferSize, client,
transportRevision, _beaconInterval, priority);
transport = new BlockingClientTCPTransport(_context,
socket, responseHandler, _receiveBufferSize,
client, transportRevision, _beaconInterval,
priority);
// verify
if(!transport->waitUntilVerified(3.0)) {
@@ -156,11 +156,10 @@ namespace epics {
} catch(...) {
// close socket, if open
if(socket!=INVALID_SOCKET) epicsSocketDestroy(socket);
// TODO namedLocker.releaseSynchronizationObject(address);
_namedLocker->releaseSynchronizationObject(address);
throw;
}
_namedLocker->releaseSynchronizationObject(address);
}
else {
ostringstream temp;

View File

@@ -11,11 +11,13 @@
#include "caConstants.h"
#include "transportRegistry.h"
#include "introspectionRegistry.h"
#include "serverContext.h"
#include <serialize.h>
#include <pvType.h>
#include <byteBuffer.h>
#include <timer.h>
#include <pvData.h>
#include <osiSock.h>
#include <osdSock.h>
@@ -29,12 +31,24 @@ namespace epics {
TCP, UDP, SSL
};
enum MessageCommands {
CMD_BEACON = 0, CMD_CONNECTION_VALIDATION = 1, CMD_ECHO = 2,
CMD_SEARCH = 3, CMD_INTROSPECTION_SEARCH = 5,
CMD_CREATE_CHANNEL = 7, CMD_DESTROY_CHANNEL = 8, CMD_GET = 10,
CMD_PUT = 11, CMD_PUT_GET = 12, CMD_MONITOR = 13, CMD_ARRAY = 14,
CMD_CANCEL_REQUEST = 15, CMD_PROCESS = 16, CMD_GET_FIELD = 17,
CMD_RPC = 20,
};
/**
* Interface defining transport send control.
* @author <a href="mailto:matej.sekoranjaATcosylab.com">Matej Sekoranja</a>
*/
class TransportSendControl : public epics::pvData::SerializableControl {
public:
virtual ~TransportSendControl() {
}
virtual void startMessage(int8 command, int ensureCapacity) =0;
virtual void endMessage() =0;
@@ -50,6 +64,8 @@ namespace epics {
*/
class TransportSender {
public:
virtual ~TransportSender() {}
/**
* Called by transport.
* By this call transport gives callee ownership over the buffer.
@@ -58,8 +74,7 @@ namespace epics {
* of this method.
* NOTE: these limitations allows efficient implementation.
*/
virtual void
send(epics::pvData::ByteBuffer* buffer,
virtual void send(epics::pvData::ByteBuffer* buffer,
TransportSendControl* control) =0;
virtual void lock() =0;
@@ -202,6 +217,8 @@ namespace epics {
*/
class ResponseHandler {
public:
virtual ~ResponseHandler() {}
/**
* Handle response.
* @param[in] responseFrom remote address of the responder, <code>null</code> if unknown.
@@ -213,9 +230,115 @@ namespace epics {
* Code must not manipulate buffer.
*/
virtual void
handleResponse(osiSockAddr* responseFrom, Transport* transport,
int8 version, int8 command, int payloadSize,
epics::pvData::ByteBuffer* payloadBuffer) =0;
handleResponse(osiSockAddr* responseFrom,
Transport* transport, int8 version, int8 command,
int payloadSize,
epics::pvData::ByteBuffer* payloadBuffer) =0;
};
/**
* Base (abstract) channel access response handler.
* @author <a href="mailto:matej.sekoranjaATcosylab.com">Matej Sekoranja</a>
* @version $Id: AbstractResponseHandler.java,v 1.1 2010/05/03 14:45:39 mrkraimer Exp $
*/
class AbstractResponseHandler : public ResponseHandler {
public:
/**
* @param description
*/
AbstractResponseHandler(String description) :
_description(description), _debug(true) {
//debug = System.getProperties().containsKey(CAConstants.CAJ_DEBUG);
}
virtual ~AbstractResponseHandler() {
}
virtual void handleResponse(osiSockAddr* responseFrom,
Transport* transport, int8 version, int8 command,
int payloadSize, epics::pvData::ByteBuffer* payloadBuffer);
protected:
/**
* Response hanlder description.
*/
String _description;
/**
* Debug flag.
*/
bool _debug;
};
/**
* @author <a href="mailto:matej.sekoranjaATcosylab.com">Matej Sekoranja</a>
* @version $Id: AbstractServerResponseHandler.java,v 1.1 2010/05/03 14:45:39 mrkraimer Exp $
*/
class AbstractServerResponseHandler : public AbstractResponseHandler {
public:
/**
* @param context
* @param description
*/
AbstractServerResponseHandler(ServerContextImpl* context,
String description) :
AbstractResponseHandler(description), _context(context) {
}
virtual ~AbstractServerResponseHandler() {
}
protected:
ServerContextImpl* _context;
};
/**
* Bad request handler.
* @author <a href="mailto:matej.sekoranjaATcosylab.com">Matej Sekoranja</a>
* @version $Id: BadResponse.java,v 1.1 2010/05/03 14:45:39 mrkraimer Exp $
*/
class BadResponse : public AbstractServerResponseHandler {
public:
/**
* @param context
*/
BadResponse(ServerContextImpl* context) :
AbstractServerResponseHandler(context, "Bad request") {
}
virtual ~BadResponse() {
}
virtual void handleResponse(osiSockAddr* responseFrom,
Transport* transport, int8 version, int8 command,
int payloadSize, epics::pvData::ByteBuffer* payloadBuffer);
};
/**
* CAS request handler - main handler which dispatches requests to appropriate handlers.
* @author <a href="mailto:matej.sekoranjaATcosylab.com">Matej Sekoranja</a>
* @version $Id: ServerResponseHandler.java,v 1.1 2010/05/03 14:45:48 mrkraimer Exp $
*/
class ServerResponseHandler : public ResponseHandler {
public:
ServerResponseHandler(ServerContextImpl* context);
virtual ~ServerResponseHandler();
virtual void handleResponse(osiSockAddr* responseFrom,
Transport* transport, int8 version, int8 command,
int payloadSize, epics::pvData::ByteBuffer* payloadBuffer);
private:
static const int HANDLER_TABLE_LENGTH = 28;
/**
* Table of response handlers for each command ID.
*/
ResponseHandler** _handlerTable;
/**
* Context instance.
*/
ServerContextImpl* _context;
};
/**
@@ -225,6 +348,8 @@ namespace epics {
*/
class TransportClient {
public:
virtual ~TransportClient();
/**
* Notification of unresponsive transport (e.g. no heartbeat detected) .
*/
@@ -256,6 +381,9 @@ namespace epics {
*/
class Connector {
public:
virtual ~Connector() {
}
/**
* Connect.
* @param[in] client client requesting connection (transport).
@@ -274,6 +402,8 @@ namespace epics {
class Context {
public:
virtual ~Context() {
}
/**
* Get timer.
* @return timer.
@@ -294,6 +424,8 @@ namespace epics {
*/
class ReferenceCountingTransport {
public:
virtual ~ReferenceCountingTransport() {}
/**
* Acquires transport.
* @param client client (channel) acquiring the transport
@@ -308,6 +440,78 @@ namespace epics {
virtual void release(TransportClient* client) =0;
};
class ServerChannel {
public:
virtual ~ServerChannel() {
}
/**
* Get channel SID.
* @return channel SID.
*/
virtual int getSID() =0;
/**
* Destroy server channel.
* This method MUST BE called if overriden.
*/
virtual void destroy() =0;
};
/**
* Interface defining a transport that hosts channels.
* @author <a href="mailto:matej.sekoranjaATcosylab.com">Matej Sekoranja</a>
* @version $Id: ChannelHostingTransport.java,v 1.1 2010/05/03 14:45:39 mrkraimer Exp $
*/
class ChannelHostingTransport {
public:
virtual ~ChannelHostingTransport() {
}
/**
* Get security token.
* @return security token, can be <code>null</code>.
*/
virtual epics::pvData::PVField* getSecurityToken() =0;
/**
* Preallocate new channel SID.
* @return new channel server id (SID).
*/
virtual int preallocateChannelSID() =0;
/**
* De-preallocate new channel SID.
* @param sid preallocated channel SID.
*/
virtual void depreallocateChannelSID(int sid) =0;
/**
* Register a new channel.
* @param sid preallocated channel SID.
* @param channel channel to register.
*/
virtual void registerChannel(int sid, ServerChannel* channel) =0;
/**
* Unregister a new channel (and deallocates its handle).
* @param sid SID
*/
virtual void unregisterChannel(int sid) =0;
/**
* Get channel by its SID.
* @param sid channel SID
* @return channel with given SID, <code>null</code> otherwise
*/
virtual ServerChannel* getChannel(int sid) =0;
/**
* Get channel count.
* @return channel count.
*/
virtual int getChannelCount() =0;
};
}
}

View File

@@ -0,0 +1,93 @@
/*
* responseHandlers.cpp
*
* Created on: Jan 4, 2011
* Author: Miha Vitorovic
*/
#include "remote.h"
#include "hexDump.h"
#include <byteBuffer.h>
#include <osiSock.h>
#include <errlog.h>
#include <sstream>
using std::ostringstream;
using std::hex;
using namespace epics::pvData;
namespace epics {
namespace pvAccess {
void AbstractResponseHandler::handleResponse(osiSockAddr* responseFrom,
Transport* transport, int8 version, int8 command,
int payloadSize, ByteBuffer* payloadBuffer) {
if(_debug) {
char ipAddrStr[48];
ipAddrToA(&responseFrom->ia, ipAddrStr, sizeof(ipAddrStr));
ostringstream prologue;
prologue<<"Message ["<<command<<", v"<<hex<<version;
prologue<<"] received from "<<ipAddrStr;
hexDump(prologue.str(), _description,
(const int8*)payloadBuffer->getArray(),
payloadBuffer->getPosition(), payloadSize);
}
}
void BadResponse::handleResponse(osiSockAddr* responseFrom,
Transport* transport, int8 version, int8 command,
int payloadSize, ByteBuffer* payloadBuffer) {
AbstractServerResponseHandler::handleResponse(responseFrom,
transport, version, command, payloadSize, payloadBuffer);
char ipAddrStr[48];
ipAddrToA(&responseFrom->ia, ipAddrStr, sizeof(ipAddrStr));
errlogSevPrintf(errlogInfo,
"Undecipherable message (bad response type %d) from %s.",
command, ipAddrStr);
}
ServerResponseHandler::ServerResponseHandler(ServerContextImpl* context) :
_context(context) {
BadResponse* badResponse = new BadResponse(context);
_handlerTable = new ResponseHandler*[HANDLER_TABLE_LENGTH];
_handlerTable[0] = badResponse;
}
ServerResponseHandler::~ServerResponseHandler() {
delete[] _handlerTable;
}
void ServerResponseHandler::handleResponse(osiSockAddr* responseFrom,
Transport* transport, int8 version, int8 command,
int payloadSize, ByteBuffer* payloadBuffer) {
if(command<0||command>=HANDLER_TABLE_LENGTH) {
errlogSevPrintf(errlogMinor,
"Invalid (or unsupported) command: %d.", command);
// TODO remove debug output
ostringstream name;
name<<"Invalid CA header "<<command;
name<<" + , its payload buffer";
hexDump(name.str(), (const int8*)payloadBuffer->getArray(),
payloadBuffer->getPosition(), payloadSize);
return;
}
// delegate
_handlerTable[command]->handleResponse(responseFrom, transport,
version, command, payloadSize, payloadBuffer);
}
}
}

View File

@@ -0,0 +1,23 @@
/*
* serverContext.h
*
* Created on: Jan 4, 2011
* Author: Miha Vitorovic
*/
#ifndef SERVERCONTEXT_H_
#define SERVERCONTEXT_H_
namespace epics {
namespace pvAccess {
class ServerContextImpl {
};
}
}
#endif /* SERVERCONTEXT_H_ */

View File

@@ -134,7 +134,6 @@ namespace epics {
_elements[_putPointer] = x;
if (++_putPointer >= _size) _putPointer = 0;
return _count == 1;
}
template<class T>

View File

@@ -0,0 +1,7 @@
/*
* namedLockPattern.cpp
*/
#include "namedLockPattern.h"
//NOTE NamedLockPattern is template so implementation is in header file

View File

@@ -0,0 +1,144 @@
/*
* namedLockPattern.h
*/
#ifndef NAMEDLOCKPATTERN_H
#define NAMEDLOCKPATTERN_H
#include <map>
#include <iostream>
#include <lock.h>
#include <pvType.h>
#include "referenceCountingLock.h"
using namespace std;
using namespace epics::pvData;
namespace epics { namespace pvAccess {
/**
* NamedLockPattern
*/
template <class Key, class Compare = less<Key> >
class NamedLockPattern
{
public:
/**
* Constructor.
*/
NamedLockPattern() {};
/**
* Destructor.
*/
virtual ~NamedLockPattern() {};
/**
* Acquire synchronization lock for named object.
* @param name name of the object whose lock to acquire.
* @param msec the number of milleseconds to wait.
* An argument less than or equal to zero means not to wait at all.
* @return <code>true</code> if acquired, <code>false</code> othwerwise.
*/
bool acquireSynchronizationObject(const Key name, const int64 msec);
/**
* Release synchronization lock for named object.
* @param name name of the object whose lock to release.
*/
void releaseSynchronizationObject(const Key name);
private:
Mutex _mutex;
std::map<const Key,ReferenceCountingLock*,Compare> _namedLocks;
typename std::map<const Key,ReferenceCountingLock*,Compare>::iterator _namedLocksIter;
/**
* Release synchronization lock for named object.
* @param name name of the object whose lock to release.
* @param release set to <code>false</code> if there is no need to call release
* on synchronization lock.
*/
void releaseSynchronizationObject(const Key name,const bool release);
};
template <class Key, class Compare>
bool NamedLockPattern<Key,Compare>::acquireSynchronizationObject(const Key name, const int64 msec)
{
ReferenceCountingLock* lock;
{ //due to guard
Lock guard(&_mutex);
_namedLocksIter = _namedLocks.find(name);
// get synchronization object
// none is found, create and return new one
// increment references
if(_namedLocksIter == _namedLocks.end())
{
lock = new ReferenceCountingLock();
_namedLocks[name] = lock;
}
else
{
lock = _namedLocksIter->second;
lock->increment();
}
} // end of guarded area
bool success = lock->acquire(msec);
if(!success)
{
releaseSynchronizationObject(name, false);
}
return success;
}
template <class Key, class Compare>
void NamedLockPattern<Key,Compare>::releaseSynchronizationObject(const Key name)
{
releaseSynchronizationObject(name, true);
}
template <class Key, class Compare>
void NamedLockPattern<Key,Compare>::releaseSynchronizationObject(const Key name,const bool release)
{
Lock guard(&_mutex);
ReferenceCountingLock* lock;
_namedLocksIter = _namedLocks.find(name);
// release lock
if (_namedLocksIter != _namedLocks.end())
{
lock = _namedLocksIter->second;
// release the lock
if (release)
{
lock->release();
}
// if there only one current lock exists
// remove it from the map
if (lock->decrement() <= 0)
{
_namedLocks.erase(_namedLocksIter);
delete lock;
}
}
}
template <class Key, class Compare>
class NamedLock : private NoDefaultMethods
{
public:
NamedLock(NamedLockPattern<Key,Compare>* namedLockPattern): _namedLockPattern(namedLockPattern) {}
bool acquireSynchronizationObject(const Key name, const int64 msec) {_name = name; return _namedLockPattern->acquireSynchronizationObject(name,msec);}
~NamedLock(){_namedLockPattern->releaseSynchronizationObject(_name);}
private:
Key _name;
NamedLockPattern<Key,Compare>* _namedLockPattern;
};
}}
#endif /* NAMEDLOCKPATTERN_H */

View File

@@ -0,0 +1,85 @@
/*
* namedLockPattern.cpp
*/
#include "referenceCountingLock.h"
namespace epics { namespace pvAccess {
ReferenceCountingLock::ReferenceCountingLock(): _references(1)
{
pthread_mutexattr_t mutexAttribute;
int32 retval = pthread_mutexattr_init(&mutexAttribute);
if(retval != 0)
{
//string errMsg = "Error: pthread_mutexattr_init failed: " + string(strerror(retval));
assert(true);
}
retval = pthread_mutexattr_settype(&mutexAttribute, PTHREAD_MUTEX_RECURSIVE);
if(retval == 0)
{
retval = pthread_mutex_init(&_mutex, &mutexAttribute);
if(retval != 0)
{
//string errMsg = "Error: pthread_mutex_init failed: " + string(strerror(retval));
assert(true);
}
}
else
{
//string errMsg = "Error: pthread_mutexattr_settype failed: " + string(strerror(retval));
assert(true);
}
pthread_mutexattr_destroy(&mutexAttribute);
}
ReferenceCountingLock::~ReferenceCountingLock()
{
pthread_mutex_destroy(&_mutex);
}
bool ReferenceCountingLock::acquire(int64 msecs)
{
struct timespec deltatime;
deltatime.tv_sec = msecs / 1000;
deltatime.tv_nsec = (msecs % 1000) * 1000;
int32 retval = pthread_mutex_timedlock(&_mutex, &deltatime);
if(retval == 0)
{
return true;
}
return false;
}
void ReferenceCountingLock::release()
{
int retval = pthread_mutex_unlock(&_mutex);
if(retval != 0)
{
string errMsg = "Error: pthread_mutex_unlock failed: " + string(strerror(retval));
//TODO do something?
}
}
int ReferenceCountingLock::increment()
{
//TODO does it really has to be atomic?
return ++_references;
// commented because linking depends on specific version of glibc library
// on i386 target
//return __sync_add_and_fetch(&_references,1);
}
int ReferenceCountingLock::decrement()
{
//TODO does it really has to be atomic?
return --_references;
// commented because linking depends on specific version of glibc library
// on i386 target
//return __sync_sub_and_fetch(&_references,1);
}
}}

View File

@@ -0,0 +1,74 @@
/*
* referenceCountingLock.h
*/
#ifndef REFERENCECOUNTINGLOCK_H
#define REFERENCECOUNTINGLOCK_H
#include <map>
#include <iostream>
#include <pthread.h>
#include <string.h>
#include <errno.h>
#include <pvType.h>
#include <epicsAssert.h>
using namespace std;
using namespace epics::pvData;
namespace epics { namespace pvAccess {
/**
* Reference counting mutex implementation w/ deadlock detection.
* Synchronization helper class used (intended for use) for activation/deactivation synchronization.
* This class enforces <code>attempt</code> method of acquiring the locks to prevent deadlocks.
* Class also offers reference counting.
* (NOTE: automatic lock counting was not implemented due to imperfect usage.)
*
*/
class ReferenceCountingLock
{
public:
/**
* Constructor of <code>ReferenceCountingLock</code>.
* After construction lock is free and reference count equals <code>1</code>.
*/
ReferenceCountingLock();
/**
* Destructor of <code>ReferenceCountingLock</code>.
*/
virtual ~ReferenceCountingLock();
/**
* Attempt to acquire lock.
*
* @param msecs the number of milleseconds to wait.
* An argument less than or equal to zero means not to wait at all.
* @return <code>true</code> if acquired, <code>false</code> otherwise.
*/
bool acquire(int64 msecs);
/**
* Release previously acquired lock.
*/
void release();
/**
* Increment number of references.
*
* @return number of references.
*/
int increment();
/**
* Decrement number of references.
*
* @return number of references.
*/
int decrement();
private:
int _references;
pthread_mutex_t _mutex;
};
}}
#endif /* NAMEDLOCKPATTERN_H */

View File

@@ -18,6 +18,9 @@ PROD_HOST += testBeaconEmitter
testBeaconEmitter_SRCS += testBeaconEmitter.cpp
testBeaconEmitter_LIBS += pvData pvAccess Com
PROD_HOST += testBeaconHandler
testBeaconHandler_SRCS += testBeaconHandler.cpp
testBeaconHandler_LIBS += pvData pvAccess Com
include $(TOP)/configure/RULES
#----------------------------------------

View File

@@ -42,9 +42,9 @@ void testBeaconEmitter()
osiSockAddr* addr = new osiSockAddr;
addr->ia.sin_family = AF_INET;
addr->ia.sin_port = htons(5067);
if(inet_aton("92.50.75.255",&addr->ia.sin_addr)==0) {
cout<<"error in inet_aton()"<<endl;
return;
if(inet_aton("255.255.255.255",&addr->ia.sin_addr)==0)
{
assert(false);
}
broadcastAddresses->push_back(addr);
BlockingUDPConnector connector(true, broadcastAddresses, true);

View File

@@ -6,6 +6,7 @@
#include "blockingUDP.h"
#include "beaconHandler.h"
#include "inetAddressUtil.h"
#include "introspectionRegistry.h"
#include <osiSock.h>
@@ -14,32 +15,105 @@
using namespace epics::pvAccess;
using namespace epics::pvData;
using namespace std;
void decodeFromIPv6Address(ByteBuffer* buffer, osiSockAddr* address)
{
// IPv4 compatible IPv6 address
// first 80-bit are 0
buffer->getLong();
buffer->getShort();
// next 16-bits are 1
buffer->getShort();
// following IPv4 address in big-endian (network) byte order
in_addr_t ipv4Addr = 0;
ipv4Addr |= (uint32)buffer->getByte() << 24;
ipv4Addr |= (uint32)buffer->getByte() << 16;
ipv4Addr |= (uint32)buffer->getByte() << 8;
ipv4Addr |= (uint32)buffer->getByte() << 0;
address->ia.sin_addr.s_addr = ipv4Addr;
}
class BeaconResponseHandler : public ResponseHandler
{
public:
BeaconResponseHandler()
{
_pvDataCreate = getPVDataCreate();
}
virtual void handleResponse(osiSockAddr* responseFrom,
Transport* transport, int8 version, int8 command, int payloadSize,
ByteBuffer* payloadBuffer)
{
cout << "DummyResponseHandler::handleResponse" << endl;
cout << "BeaconResponseHandler::handleResponse" << endl;
// reception timestamp
TimeStamp timestamp;
timestamp.getCurrent();
//TODO
//super.handleResponse(responseFrom, transport, version, command, payloadSize, payloadBuffer);
transport->ensureData((2*sizeof(int16)+2*sizeof(int32)+128)/sizeof(int8));
const int32 sequentalID = payloadBuffer->getShort() & 0x0000FFFF;
const TimeStamp startupTimestamp(payloadBuffer->getInt() & 0x00000000FFFFFFFFL,(int32)(payloadBuffer->getInt() & 0x00000000FFFFFFFFL));
// 128-bit IPv6 address
osiSockAddr address;
decodeFromIPv6Address(payloadBuffer, &address);
// get port
const int32 port = payloadBuffer->getShort() & 0xFFFF;
address.ia.sin_port = ntohs(port);
// accept given address if explicitly specified by sender
if (!ipv4AddressToInt(address))
{
responseFrom->ia.sin_port = port;
}
else
{
responseFrom->ia.sin_port = port;
responseFrom->ia.sin_addr.s_addr = address.ia.sin_addr.s_addr;
}
//org.epics.ca.client.impl.remote.BeaconHandler beaconHandler = context.getBeaconHandler(responseFrom);
// currently we care only for servers used by this context
//if (beaconHandler == null)
// return;
// extra data
PVFieldPtr data = NULL;
const FieldConstPtr field = IntrospectionRegistry::deserializeFull(payloadBuffer, transport);
if (field != NULL)
{
data = _pvDataCreate->createPVField(NULL, field);
data->deserialize(payloadBuffer, transport);
}
// notify beacon handler
//beaconHandler.beaconNotify(responseFrom, version, timestamp, startupTimestamp, sequentalID, data);
}
private:
PVDataCreate* _pvDataCreate;
BeaconHandler* _beaconHandler;
};
void testBeaconHandler()
{
BeacondResponseHandler brh;
BeaconResponseHandler brh;
BlockingUDPConnector connector(false, NULL, true);
DummyClientContext context;
osiSockAddr bindAddr;
bindAddr.ia.sin_family = AF_INET;
bindAddr.ia.sin_port = htons(5067);
bindAddr.ia.sin_addr.s_addr = htonl(INADDR_ANY);
Transport* transport = connector.connect(NULL, &brh, &bindAddr, 1, 50);
((BlockingUDPTransport*)transport)->start();
(static_cast<BlockingUDPTransport*>(transport))->start();
while(1) sleep(1);

View File

@@ -34,6 +34,10 @@ PROD_HOST += transportRegisterTest
transportRegisterTest_SRCS += transportRegistryTest.cpp
transportRegisterTest_LIBS += pvAccess Com pvData
PROD_HOST += namedLockPatternTest
namedLockPatternTest_SRCS += namedLockPatternTest.cpp
namedLockPatternTest_LIBS += pvAccess Com pvData
include $(TOP)/configure/RULES
#----------------------------------------
# ADD RULES AFTER THIS LINE

View File

@@ -0,0 +1,216 @@
/*
* namedLockPatternTest.cpp
*
*/
#include "namedLockPattern.h"
#include "showConstructDestruct.h"
#include <epicsAssert.h>
#include <iostream>
#include <osiSock.h>
using namespace epics::pvAccess;
using namespace std;
void testIntLockPattern()
{
int64 timeout = 100;
NamedLockPattern<int> namedLockPattern;
int name1 = 1;
assert(namedLockPattern.acquireSynchronizationObject(name1,timeout));
assert(namedLockPattern.acquireSynchronizationObject(name1,timeout));
namedLockPattern.releaseSynchronizationObject(name1);
namedLockPattern.releaseSynchronizationObject(name1);
int name2 = 2;
assert(namedLockPattern.acquireSynchronizationObject(name2,timeout));
namedLockPattern.releaseSynchronizationObject(name2);
}
void testIntPtrLockPattern()
{
int64 timeout = 100;
NamedLockPattern<int*> namedLockPattern;
int name1 = 1;
assert(namedLockPattern.acquireSynchronizationObject(&name1,timeout));
assert(namedLockPattern.acquireSynchronizationObject(&name1,timeout));
namedLockPattern.releaseSynchronizationObject(&name1);
namedLockPattern.releaseSynchronizationObject(&name1);
int name2 = 2;
assert(namedLockPattern.acquireSynchronizationObject(&name2,timeout));
namedLockPattern.releaseSynchronizationObject(&name2);
}
struct cmp_str
{
bool operator()(char const *a, char const *b)
{
return strcmp(a, b) < 0;
}
};
void testCharPtrLockPattern()
{
int64 timeout = 100;
NamedLockPattern<const char*,cmp_str> namedLockPattern;
string name1 = "lojze";
assert(namedLockPattern.acquireSynchronizationObject(name1.c_str(),timeout));
assert(namedLockPattern.acquireSynchronizationObject(name1.c_str(),timeout));
namedLockPattern.releaseSynchronizationObject(name1.c_str());
namedLockPattern.releaseSynchronizationObject(name1.c_str());
string name2 = "francka";
assert(namedLockPattern.acquireSynchronizationObject(name2.c_str(),timeout));
namedLockPattern.releaseSynchronizationObject(name2.c_str());
}
struct comp_osiSockAddrPtr
{
bool operator()(osiSockAddr const *a, osiSockAddr const *b)
{
if (a->sa.sa_family < b->sa.sa_family) return true;
if ((a->sa.sa_family == b->sa.sa_family) && (a->ia.sin_addr.s_addr < b->ia.sin_addr.s_addr )) return true;
if ((a->sa.sa_family == b->sa.sa_family) && (a->ia.sin_addr.s_addr == b->ia.sin_addr.s_addr ) && ( a->ia.sin_port < b->ia.sin_port )) return true;
return false;
}
};
void testOsiSockAddrLockPattern()
{
int64 timeout = 10000;
NamedLockPattern<const osiSockAddr*,comp_osiSockAddrPtr> namedLockPattern;
osiSockAddr name1;
name1.ia.sin_addr.s_addr = 1;
name1.ia.sin_port = 1;
name1.ia.sin_family = AF_INET;
assert(namedLockPattern.acquireSynchronizationObject(&name1,timeout));
assert(namedLockPattern.acquireSynchronizationObject(&name1,timeout));
namedLockPattern.releaseSynchronizationObject(&name1);
namedLockPattern.releaseSynchronizationObject(&name1);
osiSockAddr name2;
name2.ia.sin_addr.s_addr = 1;
name2.ia.sin_port = 1;
name2.ia.sin_family = AF_INET;
NamedLock<const osiSockAddr*,comp_osiSockAddrPtr> namedGuard(&namedLockPattern);
assert(namedGuard.acquireSynchronizationObject(&name1,timeout));
}
struct comp_osiSockAddr
{
bool operator()(osiSockAddr const a, osiSockAddr const b)
{
if (a.sa.sa_family < b.sa.sa_family) return true;
if ((a.sa.sa_family == b.sa.sa_family) && (a.ia.sin_addr.s_addr < b.ia.sin_addr.s_addr )) return true;
if ((a.sa.sa_family == b.sa.sa_family) && (a.ia.sin_addr.s_addr == b.ia.sin_addr.s_addr ) && ( a.ia.sin_port < b.ia.sin_port )) return true;
return false;
}
};
void* testWorker1(void* p)
{
int32 timeout = 1000;
const int32 max = 1000;
NamedLockPattern<osiSockAddr,comp_osiSockAddr>* namedLockPattern = (NamedLockPattern<osiSockAddr,comp_osiSockAddr>*)p;
for(int32 i = 0 ; i < max; i = i +2)
{
osiSockAddr addr;
addr.ia.sin_addr.s_addr = i;
addr.ia.sin_port = i;
addr.ia.sin_family = AF_INET;
NamedLock<osiSockAddr,comp_osiSockAddr> namedGuard(namedLockPattern);
assert(namedGuard.acquireSynchronizationObject(addr,timeout));
usleep(1);
}
//this one takes a lock, thread 2 will be slower and will get timeout
{ //due to namedGuard
osiSockAddr addr;
addr.ia.sin_addr.s_addr = 1;
addr.ia.sin_port = 1;
addr.ia.sin_family = AF_INET;
NamedLock<osiSockAddr,comp_osiSockAddr> namedGuard(namedLockPattern);
assert(namedGuard.acquireSynchronizationObject(addr,timeout));
sleep(5);
}
return NULL;
}
void* testWorker2(void* p)
{
int32 timeout = 1000;
const int32 max = 1000;
NamedLockPattern<osiSockAddr,comp_osiSockAddr>* namedLockPattern = (NamedLockPattern<osiSockAddr,comp_osiSockAddr>*)p;
for(int32 i = 1 ; i < max; i = i + 2)
{
osiSockAddr addr;
addr.ia.sin_addr.s_addr = i;
addr.ia.sin_port = i;
addr.ia.sin_family = AF_INET;
NamedLock<osiSockAddr,comp_osiSockAddr> namedGuard(namedLockPattern);
assert(namedGuard.acquireSynchronizationObject(addr,timeout));
usleep(1);
}
//this thread sleeps a while and gets timeout on lock
{
sleep(1);
osiSockAddr addr;
addr.ia.sin_addr.s_addr = 1;
addr.ia.sin_port = 1;
addr.ia.sin_family = AF_INET;
NamedLock<osiSockAddr,comp_osiSockAddr> namedGuard(namedLockPattern);
assert(!namedGuard.acquireSynchronizationObject(addr,timeout));
}
return NULL;
}
int main(int argc, char *argv[])
{
testIntLockPattern();
testIntPtrLockPattern();
testCharPtrLockPattern();
testOsiSockAddrLockPattern();
pthread_t _worker1Id;
pthread_t _worker2Id;
NamedLockPattern<osiSockAddr,comp_osiSockAddr> namedLockPattern;
//create two threads
int32 retval = pthread_create(&_worker1Id, NULL, testWorker1, &namedLockPattern);
if(retval != 0)
{
assert(true);
}
retval = pthread_create(&_worker2Id, NULL, testWorker2, &namedLockPattern);
if(retval != 0)
{
assert(true);
}
//wait for threads
retval = pthread_join(_worker1Id, NULL);
if(retval != 0)
{
assert(true);
}
retval = pthread_join(_worker2Id, NULL);
if(retval != 0)
{
assert(true);
}
getShowConstructDestruct()->constuctDestructTotals(stdout);
return 0;
}