merge
This commit is contained in:
@@ -98,13 +98,14 @@ SearchTimer::SearchTimer(ChannelSearchManager* _chanSearchManager, int32 timerIn
|
||||
_requestPendingChannelsMutex(Mutex()),
|
||||
_mutex(Mutex())
|
||||
{
|
||||
|
||||
_timerNode = new TimerNode(this);
|
||||
}
|
||||
|
||||
SearchTimer::~SearchTimer()
|
||||
{
|
||||
if(_requestPendingChannels) delete _requestPendingChannels;
|
||||
if(_responsePendingChannels) delete _responsePendingChannels;
|
||||
if(_timerNode) delete _timerNode;
|
||||
}
|
||||
|
||||
void SearchTimer::shutdown()
|
||||
|
||||
@@ -29,16 +29,53 @@ namespace epics { namespace pvAccess {
|
||||
/**
|
||||
* SearchInstance.
|
||||
*/
|
||||
//TODO document
|
||||
class SearchInstance {
|
||||
public:
|
||||
/**
|
||||
* Destructor
|
||||
*/
|
||||
virtual ~SearchInstance() {};
|
||||
/**
|
||||
* Return channel ID.
|
||||
*
|
||||
* @return channel ID.
|
||||
*/
|
||||
virtual pvAccessID getChannelID() = 0;
|
||||
/**
|
||||
* Return channel name.
|
||||
*
|
||||
* @return channel channel name.
|
||||
*/
|
||||
virtual String getChannelName() = 0;
|
||||
/**
|
||||
* Removes the owner of this search instance.
|
||||
*/
|
||||
virtual void unsetListOwnership() = 0;
|
||||
/**
|
||||
* Adds this search instance into the provided list and sets it as the owner of this search instance.
|
||||
*
|
||||
* @param newOwner a list to which this search instance is added.
|
||||
* @param ownerMutex mutex belonging to the newOwner list. The mutex will be locked beofe any modification
|
||||
* to the list will be done.
|
||||
* @param index index of the owner (which is search timer index).
|
||||
*
|
||||
* @throws BaseException if the ownerMutex is NULL.
|
||||
*/
|
||||
virtual void addAndSetListOwnership(ArrayFIFO<SearchInstance*>* newOwner, Mutex* ownerMutex, int32 index) = 0;
|
||||
/**
|
||||
* Removes this search instance from the owner list and also removes the list as the owner of this
|
||||
* search instance.
|
||||
*
|
||||
* @throws BaseException if the ownerMutex is NULL.
|
||||
*/
|
||||
virtual void removeAndUnsetListOwnership() = 0;
|
||||
/**
|
||||
* Returns the index of the owner.
|
||||
*/
|
||||
virtual int32 getOwnerIndex() = 0;
|
||||
/**
|
||||
* Generates request message.
|
||||
*/
|
||||
virtual bool generateSearchRequestMessage(ByteBuffer* requestMessage, TransportSendControl* control) = 0;
|
||||
|
||||
/**
|
||||
|
||||
@@ -74,11 +74,12 @@ namespace epics {
|
||||
|
||||
enum MessageCommands {
|
||||
CMD_BEACON = 0, CMD_CONNECTION_VALIDATION = 1, CMD_ECHO = 2,
|
||||
CMD_SEARCH = 3, CMD_INTROSPECTION_SEARCH = 5,
|
||||
CMD_CREATE_CHANNEL = 7, CMD_DESTROY_CHANNEL = 8, CMD_GET = 10,
|
||||
CMD_PUT = 11, CMD_PUT_GET = 12, CMD_MONITOR = 13, CMD_ARRAY = 14,
|
||||
CMD_SEARCH = 3, CMD_SEARCH_RESPONSE = 4,
|
||||
CMD_INTROSPECTION_SEARCH = 5, CMD_CREATE_CHANNEL = 7,
|
||||
CMD_DESTROY_CHANNEL = 8, CMD_GET = 10, CMD_PUT = 11,
|
||||
CMD_PUT_GET = 12, CMD_MONITOR = 13, CMD_ARRAY = 14,
|
||||
CMD_CANCEL_REQUEST = 15, CMD_PROCESS = 16, CMD_GET_FIELD = 17,
|
||||
CMD_RPC = 20,
|
||||
CMD_MESSAGE = 18, CMD_MULTIPLE_DATA = 19, CMD_RPC = 20,
|
||||
};
|
||||
|
||||
/**
|
||||
|
||||
@@ -15,6 +15,7 @@
|
||||
#include <ellLib.h>
|
||||
#include <epicsAssert.h>
|
||||
#include <epicsException.h>
|
||||
#include <errlog.h>
|
||||
|
||||
/* standard */
|
||||
#include <vector>
|
||||
@@ -22,6 +23,8 @@
|
||||
#include <cstdlib>
|
||||
#include <sstream>
|
||||
#include <sys/socket.h>
|
||||
#include <sys/ioctl.h>
|
||||
#include <net/if.h>
|
||||
|
||||
using namespace std;
|
||||
using namespace epics::pvData;
|
||||
@@ -29,81 +32,128 @@ using namespace epics::pvData;
|
||||
namespace epics {
|
||||
namespace pvAccess {
|
||||
|
||||
/* copied from EPICS v3 ca/iocinf.cpp
|
||||
* removeDuplicateAddresses ()
|
||||
/* port of osiSockDiscoverBroadcastAddresses() in
|
||||
* epics/base/src/libCom/osi/os/default/osdNetIntf.c
|
||||
*/
|
||||
void removeDuplicateAddresses(ELLLIST *pDestList, ELLLIST *pSrcList,
|
||||
int silent) {
|
||||
ELLNODE *pRawNode;
|
||||
|
||||
while((pRawNode = ellGet(pSrcList))) {
|
||||
STATIC_ASSERT(offsetof(osiSockAddrNode, node)==0);
|
||||
osiSockAddrNode *pNode =
|
||||
reinterpret_cast<osiSockAddrNode *> (pRawNode);
|
||||
osiSockAddrNode *pTmpNode;
|
||||
|
||||
if(pNode->addr.sa.sa_family==AF_INET) {
|
||||
|
||||
pTmpNode = (osiSockAddrNode *)ellFirst (pDestList); // X aCC 749
|
||||
while(pTmpNode) {
|
||||
if(pTmpNode->addr.sa.sa_family==AF_INET) {
|
||||
if(pNode->addr.ia.sin_addr.s_addr
|
||||
==pTmpNode->addr.ia.sin_addr.s_addr
|
||||
&&pNode->addr.ia.sin_port
|
||||
==pTmpNode->addr.ia.sin_port) {
|
||||
if(!silent) {
|
||||
char buf[64];
|
||||
ipAddrToDottedIP(&pNode->addr.ia, buf,
|
||||
sizeof(buf));
|
||||
fprintf(
|
||||
stderr,
|
||||
"Warning: Duplicate EPICS CA Address list entry \"%s\" discarded\n",
|
||||
buf);
|
||||
}
|
||||
free(pNode);
|
||||
pNode = NULL;
|
||||
break;
|
||||
}
|
||||
}
|
||||
pTmpNode = (osiSockAddrNode *)ellNext (&pTmpNode->node); // X aCC 749
|
||||
}
|
||||
if(pNode) {
|
||||
ellAdd(pDestList, &pNode->node);
|
||||
}
|
||||
}
|
||||
else {
|
||||
ellAdd(pDestList, &pNode->node);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
InetAddrVector* getBroadcastAddresses(SOCKET sock) {
|
||||
ELLLIST bcastList;
|
||||
ELLLIST tmpList;
|
||||
osiSockAddr addr;
|
||||
static const unsigned nelem = 100;
|
||||
int status;
|
||||
struct ifconf ifconf;
|
||||
struct ifreq* pIfreqList;
|
||||
osiSockAddr* pNewNode;
|
||||
|
||||
ellInit ( &bcastList ); // X aCC 392
|
||||
ellInit ( &tmpList ); // X aCC 392
|
||||
InetAddrVector* retVector = new InetAddrVector();
|
||||
|
||||
addr.ia.sin_family = AF_UNSPEC;
|
||||
osiSockDiscoverBroadcastAddresses(&bcastList, sock, &addr);
|
||||
removeDuplicateAddresses(&tmpList, &bcastList, 1);
|
||||
// forcePort ( &bcastList, port ); // if needed copy from ca/iocinf.cpp
|
||||
|
||||
int size = ellCount(&bcastList );
|
||||
InetAddrVector* retVector = new InetAddrVector(size);
|
||||
|
||||
ELLNODE *pRawNode;
|
||||
|
||||
while((pRawNode = ellGet(&tmpList))) {
|
||||
osiSockAddrNode *pNode =
|
||||
reinterpret_cast<osiSockAddrNode *> (pRawNode);
|
||||
osiSockAddr* posa = new osiSockAddr;
|
||||
memcpy(posa, &(pNode->addr), sizeof(osiSockAddr));
|
||||
retVector->push_back(posa);
|
||||
free(pNode); // using free because it is allocated by calloc
|
||||
/*
|
||||
* use pool so that we avoid using too much stack space
|
||||
*
|
||||
* nelem is set to the maximum interfaces
|
||||
* on one machine here
|
||||
*/
|
||||
pIfreqList = new ifreq[nelem];
|
||||
if(!pIfreqList) {
|
||||
errlogSevPrintf(errlogMajor,
|
||||
"getBroadcastAddresses(): no memory to complete request");
|
||||
return retVector;
|
||||
}
|
||||
|
||||
// get number of interfaces
|
||||
ifconf.ifc_len = nelem*sizeof(ifreq);
|
||||
ifconf.ifc_req = pIfreqList;
|
||||
status = ioctl(sock, SIOCGIFCONF, &ifconf);
|
||||
if(status<0||ifconf.ifc_len==0) {
|
||||
errlogSevPrintf(
|
||||
errlogMinor,
|
||||
"getBroadcastAddresses(): unable to fetch network interface configuration");
|
||||
delete[] pIfreqList;
|
||||
return retVector;
|
||||
}
|
||||
|
||||
errlogPrintf("Found %d interfaces\n", ifconf.ifc_len);
|
||||
|
||||
for(int i = 0; i<=ifconf.ifc_len; i++) {
|
||||
/*
|
||||
* If its not an internet interface then dont use it
|
||||
*/
|
||||
if(pIfreqList[i].ifr_addr.sa_family!=AF_INET) continue;
|
||||
|
||||
status = ioctl(sock, SIOCGIFFLAGS, &pIfreqList[i]);
|
||||
if(status) {
|
||||
errlogSevPrintf(
|
||||
errlogMinor,
|
||||
"getBroadcastAddresses(): net intf flags fetch for \"%s\" failed",
|
||||
pIfreqList[i].ifr_name);
|
||||
continue;
|
||||
}
|
||||
|
||||
/*
|
||||
* dont bother with interfaces that have been disabled
|
||||
*/
|
||||
if(!(pIfreqList[i].ifr_flags&IFF_UP)) continue;
|
||||
|
||||
/*
|
||||
* dont use the loop back interface
|
||||
*/
|
||||
if(pIfreqList[i].ifr_flags&IFF_LOOPBACK) continue;
|
||||
|
||||
pNewNode = new osiSockAddr;
|
||||
if(pNewNode==NULL) {
|
||||
errlogSevPrintf(errlogMajor,
|
||||
"getBroadcastAddresses(): no memory available for configuration");
|
||||
delete[] pIfreqList;
|
||||
return retVector;
|
||||
}
|
||||
|
||||
/*
|
||||
* If this is an interface that supports
|
||||
* broadcast fetch the broadcast address.
|
||||
*
|
||||
* Otherwise if this is a point to point
|
||||
* interface then use the destination address.
|
||||
*
|
||||
* Otherwise CA will not query through the
|
||||
* interface.
|
||||
*/
|
||||
if(pIfreqList[i].ifr_flags&IFF_BROADCAST) {
|
||||
status = ioctl(sock, SIOCGIFBRDADDR, &pIfreqList[i]);
|
||||
if(status) {
|
||||
errlogSevPrintf(
|
||||
errlogMinor,
|
||||
"getBroadcastAddresses(): net intf \"%s\": bcast addr fetch fail",
|
||||
pIfreqList->ifr_name);
|
||||
delete pNewNode;
|
||||
continue;
|
||||
}
|
||||
pNewNode->sa = pIfreqList[i].ifr_broadaddr;
|
||||
}
|
||||
#ifdef IFF_POINTOPOINT
|
||||
else if(pIfreqList->ifr_flags&IFF_POINTOPOINT) {
|
||||
status = ioctl(sock, SIOCGIFDSTADDR, &pIfreqList[i]);
|
||||
if(status) {
|
||||
errlogSevPrintf(
|
||||
errlogMinor,
|
||||
"getBroadcastAddresses(): net intf \"%s\": pt to pt addr fetch fail",
|
||||
pIfreqList[i].ifr_name);
|
||||
delete pNewNode;
|
||||
continue;
|
||||
}
|
||||
pNewNode->sa = pIfreqList[i].ifr_dstaddr;
|
||||
}
|
||||
#endif
|
||||
else {
|
||||
errlogSevPrintf(
|
||||
errlogMinor,
|
||||
"getBroadcastAddresses(): net intf \"%s\": not point to point or bcast?",
|
||||
pIfreqList[i].ifr_name);
|
||||
delete pNewNode;
|
||||
continue;
|
||||
}
|
||||
|
||||
retVector->push_back(pNewNode);
|
||||
}
|
||||
|
||||
delete[] pIfreqList;
|
||||
|
||||
return retVector;
|
||||
}
|
||||
|
||||
|
||||
@@ -33,9 +33,7 @@ namespace epics {
|
||||
/**
|
||||
* returns a vector containing all the IPv4 broadcast addresses
|
||||
* on this machine. IPv6 doesn't have a local broadcast address.
|
||||
*
|
||||
* TODO Check implementation/rewrite this
|
||||
*/
|
||||
*/
|
||||
InetAddrVector* getBroadcastAddresses(SOCKET sock);
|
||||
|
||||
/**
|
||||
|
||||
@@ -34,10 +34,15 @@ public:
|
||||
virtual ~NamedLockPattern() {};
|
||||
/**
|
||||
* Acquire synchronization lock for named object.
|
||||
*
|
||||
* NOTE: Argument msecs is currently not supported due to
|
||||
* Darwin OS not supporting pthread_mutex_timedlock. May be changed in the future.
|
||||
*
|
||||
* @param name name of the object whose lock to acquire.
|
||||
* @param msec the number of milleseconds to wait.
|
||||
* An argument less than or equal to zero means not to wait at all.
|
||||
* @return <code>true</code> if acquired, <code>false</code> othwerwise.
|
||||
* NOTE: currently this routine always returns true. Look above for explanation.
|
||||
*/
|
||||
bool acquireSynchronizationObject(const Key name, const int64 msec);
|
||||
/**
|
||||
|
||||
@@ -8,7 +8,7 @@ namespace epics { namespace pvAccess {
|
||||
|
||||
ReferenceCountingLock::ReferenceCountingLock(): _references(1)
|
||||
{
|
||||
pthread_mutexattr_t mutexAttribute;
|
||||
/* pthread_mutexattr_t mutexAttribute;
|
||||
int32 retval = pthread_mutexattr_init(&mutexAttribute);
|
||||
if(retval != 0)
|
||||
{
|
||||
@@ -31,23 +31,29 @@ ReferenceCountingLock::ReferenceCountingLock(): _references(1)
|
||||
assert(false);
|
||||
}
|
||||
|
||||
pthread_mutexattr_destroy(&mutexAttribute);
|
||||
pthread_mutexattr_destroy(&mutexAttribute);*/
|
||||
}
|
||||
|
||||
ReferenceCountingLock::~ReferenceCountingLock()
|
||||
{
|
||||
pthread_mutex_destroy(&_mutex);
|
||||
// pthread_mutex_destroy(&_mutex);
|
||||
}
|
||||
|
||||
bool ReferenceCountingLock::acquire(int64 msecs)
|
||||
{
|
||||
#ifdef darwin
|
||||
// timedlock not supported by Darwin OS
|
||||
return (pthread_mutex_lock(&_mutex) == 0);
|
||||
#else
|
||||
struct timespec deltatime;
|
||||
deltatime.tv_sec = msecs / 1000;
|
||||
deltatime.tv_nsec = (msecs % 1000) * 1000;
|
||||
_mutex.lock();
|
||||
return true;
|
||||
/* struct timespec deltatime;
|
||||
if(msecs > 0)
|
||||
{
|
||||
deltatime.tv_sec = msecs / 1000;
|
||||
deltatime.tv_nsec = (msecs % 1000) * 1000;
|
||||
}
|
||||
else
|
||||
{
|
||||
deltatime.tv_sec = 0;
|
||||
deltatime.tv_nsec = 0;
|
||||
}
|
||||
|
||||
int32 retval = pthread_mutex_timedlock(&_mutex, &deltatime);
|
||||
if(retval == 0)
|
||||
@@ -55,35 +61,32 @@ bool ReferenceCountingLock::acquire(int64 msecs)
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
#endif
|
||||
*/
|
||||
}
|
||||
|
||||
void ReferenceCountingLock::release()
|
||||
{
|
||||
int retval = pthread_mutex_unlock(&_mutex);
|
||||
_mutex.unlock();
|
||||
/* int retval = pthread_mutex_unlock(&_mutex);
|
||||
if(retval != 0)
|
||||
{
|
||||
//string errMsg = "Error: pthread_mutex_unlock failed: " + string(strerror(retval));
|
||||
//TODO do something?
|
||||
}
|
||||
}*/
|
||||
}
|
||||
|
||||
int ReferenceCountingLock::increment()
|
||||
{
|
||||
//TODO does it really has to be atomic?
|
||||
return ++_references;
|
||||
// commented because linking depends on specific version of glibc library
|
||||
// on i386 target
|
||||
//return __sync_add_and_fetch(&_references,1);
|
||||
Lock guard(&_countMutex);
|
||||
++_references;
|
||||
return _references;
|
||||
}
|
||||
|
||||
int ReferenceCountingLock::decrement()
|
||||
{
|
||||
//TODO does it really has to be atomic?
|
||||
return --_references;
|
||||
// commented because linking depends on specific version of glibc library
|
||||
// on i386 target
|
||||
//return __sync_sub_and_fetch(&_references,1);
|
||||
Lock guard(&_countMutex);
|
||||
--_references;
|
||||
return _references;
|
||||
}
|
||||
|
||||
}}
|
||||
|
||||
@@ -11,6 +11,7 @@
|
||||
#include <string.h>
|
||||
#include <errno.h>
|
||||
|
||||
#include <lock.h>
|
||||
#include <pvType.h>
|
||||
#include <epicsAssert.h>
|
||||
|
||||
@@ -42,9 +43,15 @@ public:
|
||||
/**
|
||||
* Attempt to acquire lock.
|
||||
*
|
||||
* NOTE: Argument msecs is currently not supported due to
|
||||
* Darwin OS not supporting pthread_mutex_timedlock. May be changed in the future.
|
||||
*
|
||||
* @param msecs the number of milleseconds to wait.
|
||||
* An argument less than or equal to zero means not to wait at all.
|
||||
*
|
||||
* @return <code>true</code> if acquired, <code>false</code> otherwise.
|
||||
* NOTE: currently this routine always returns true. Look above for explanation.
|
||||
*
|
||||
*/
|
||||
bool acquire(int64 msecs);
|
||||
/**
|
||||
@@ -65,7 +72,9 @@ public:
|
||||
int decrement();
|
||||
private:
|
||||
int _references;
|
||||
pthread_mutex_t _mutex;
|
||||
Mutex _mutex;
|
||||
Mutex _countMutex;
|
||||
//pthread_mutex_t _mutex;
|
||||
|
||||
};
|
||||
|
||||
|
||||
@@ -5,12 +5,36 @@
|
||||
using namespace epics::pvData;
|
||||
using namespace epics::pvAccess;
|
||||
|
||||
class TestSearcInstance : public BaseSearchInstance
|
||||
{
|
||||
public:
|
||||
TestSearcInstance(string channelName, pvAccessID channelID): _channelID(channelID), _channelName(channelName) {}
|
||||
pvAccessID getChannelID() { return _channelID;};
|
||||
string getChannelName() {return _channelName;};
|
||||
void searchResponse(int8 minorRevision, osiSockAddr* serverAddress) {};
|
||||
private:
|
||||
pvAccessID _channelID;
|
||||
string _channelName;
|
||||
};
|
||||
|
||||
int main(int argc,char *argv[])
|
||||
{
|
||||
//ClientContextImpl* context = new ClientContextImpl();
|
||||
Context* context = 0; // TODO will crash...
|
||||
ChannelSearchManager* manager = new ChannelSearchManager(context);
|
||||
|
||||
// context->destroy();
|
||||
TestSearcInstance* chan1 = new TestSearcInstance("chan1", 1);
|
||||
manager->registerChannel(chan1);
|
||||
|
||||
sleep(3);
|
||||
|
||||
manager->cancel();
|
||||
|
||||
//context->destroy();
|
||||
getShowConstructDestruct()->constuctDestructTotals(stdout);
|
||||
|
||||
//if(chan1) delete chan1;
|
||||
if(manager) delete manager;
|
||||
if(context) delete context;
|
||||
return(0);
|
||||
}
|
||||
|
||||
@@ -9,7 +9,10 @@
|
||||
|
||||
#include <byteBuffer.h>
|
||||
#include <pvType.h>
|
||||
|
||||
#include <epicsAssert.h>
|
||||
#include <osiSock.h>
|
||||
|
||||
#include <iostream>
|
||||
#include <cstring>
|
||||
|
||||
@@ -123,8 +126,15 @@ int main(int argc, char *argv[]) {
|
||||
assert(strncmp(buff->getArray(), src, 16)==0);
|
||||
cout<<"\nPASSED!\n";
|
||||
|
||||
// TODO add test for 'getBroadcastAddresses'
|
||||
SOCKET socket = epicsSocketCreate(AF_INET, SOCK_STREAM, IPPROTO_TCP);
|
||||
InetAddrVector* broadcasts = getBroadcastAddresses(socket);
|
||||
cout<<"Broadcast addresses: "<<broadcasts->size()<<endl;
|
||||
for(size_t i = 0; i<broadcasts->size(); i++) {
|
||||
cout<<"Broadcast address: ";
|
||||
cout<<inetAddressToString(broadcasts->at(i), false)<<endl;
|
||||
}
|
||||
|
||||
delete broadcasts;
|
||||
delete addr;
|
||||
|
||||
return 0;
|
||||
|
||||
@@ -156,7 +156,7 @@ void* testWorker2(void* p)
|
||||
assert(namedGuard.acquireSynchronizationObject(addr,timeout));
|
||||
usleep(1);
|
||||
}
|
||||
#ifndef darwin
|
||||
|
||||
//this thread sleeps a while and gets timeout on lock
|
||||
{
|
||||
sleep(1);
|
||||
@@ -165,9 +165,11 @@ void* testWorker2(void* p)
|
||||
addr.ia.sin_port = 1;
|
||||
addr.ia.sin_family = AF_INET;
|
||||
NamedLock<osiSockAddr,comp_osiSockAddr> namedGuard(namedLockPattern);
|
||||
assert(!namedGuard.acquireSynchronizationObject(addr,timeout));
|
||||
//TODO swap next two lines this if timed lock used
|
||||
//assert(!namedGuard.acquireSynchronizationObject(addr,timeout));
|
||||
assert(namedGuard.acquireSynchronizationObject(addr,timeout));
|
||||
}
|
||||
#endif
|
||||
|
||||
return NULL;
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user