merge
This commit is contained in:
@@ -1670,10 +1670,13 @@ int main (int argc, char *argv[])
|
||||
serviceRequest = true;
|
||||
}
|
||||
|
||||
static string noAddress;
|
||||
|
||||
// PVs mode
|
||||
if (!serviceRequest)
|
||||
{
|
||||
vector<string> pvs;
|
||||
vector<string> pvsAddress;
|
||||
vector<string> providerNames;
|
||||
|
||||
if (validURI)
|
||||
@@ -1688,8 +1691,6 @@ int main (int argc, char *argv[])
|
||||
return 1;
|
||||
}
|
||||
|
||||
// authority = uri.host;
|
||||
|
||||
if (uri.path.length() <= 1)
|
||||
{
|
||||
std::cerr << "invalid URI, empty path" << std::endl;
|
||||
@@ -1699,6 +1700,7 @@ int main (int argc, char *argv[])
|
||||
|
||||
// skip trailing '/'
|
||||
pvs.push_back(uri.path.substr(1));
|
||||
pvsAddress.push_back(uri.host);
|
||||
providerNames.push_back(uri.protocol);
|
||||
}
|
||||
else
|
||||
@@ -1719,8 +1721,6 @@ int main (int argc, char *argv[])
|
||||
return 1;
|
||||
}
|
||||
|
||||
// authority = uri.host;
|
||||
|
||||
if (uri.path.length() <= 1)
|
||||
{
|
||||
std::cerr << "invalid URI, empty path" << std::endl;
|
||||
@@ -1730,11 +1730,13 @@ int main (int argc, char *argv[])
|
||||
|
||||
// skip trailing '/'
|
||||
pvs.push_back(uri.path.substr(1));
|
||||
pvsAddress.push_back(uri.host);
|
||||
providerNames.push_back(uri.protocol);
|
||||
}
|
||||
else
|
||||
{
|
||||
pvs.push_back(argv[optind]);
|
||||
pvsAddress.push_back(noAddress);
|
||||
providerNames.push_back(defaultProvider);
|
||||
}
|
||||
}
|
||||
@@ -1756,8 +1758,11 @@ int main (int argc, char *argv[])
|
||||
for (int n = 0; n < nPvs; n++)
|
||||
{
|
||||
shared_ptr<ChannelRequesterImpl> channelRequesterImpl(new ChannelRequesterImpl(quiet));
|
||||
// TODO no provider check
|
||||
channels[n] = getChannelProviderRegistry()->getProvider(providerNames[n])->createChannel(pvs[n], channelRequesterImpl);
|
||||
if (pvsAddress[n].empty())
|
||||
channels[n] = getChannelProviderRegistry()->getProvider(providerNames[n])->createChannel(pvs[n], channelRequesterImpl);
|
||||
else
|
||||
channels[n] = getChannelProviderRegistry()->getProvider(providerNames[n])->createChannel(pvs[n], channelRequesterImpl,
|
||||
ChannelProvider::PRIORITY_DEFAULT, pvsAddress[n]);
|
||||
}
|
||||
|
||||
// TODO maybe unify for nPvs == 1?!
|
||||
@@ -1787,6 +1792,7 @@ int main (int argc, char *argv[])
|
||||
else
|
||||
{
|
||||
string cn;
|
||||
string ca;
|
||||
string cp;
|
||||
|
||||
// read next channel name from stream
|
||||
@@ -1808,8 +1814,6 @@ int main (int argc, char *argv[])
|
||||
return 1;
|
||||
}
|
||||
|
||||
// authority = uri.host;
|
||||
|
||||
if (uri.path.length() <= 1)
|
||||
{
|
||||
std::cerr << "invalid URI, empty path" << std::endl;
|
||||
@@ -1819,19 +1823,24 @@ int main (int argc, char *argv[])
|
||||
|
||||
// skip trailing '/'
|
||||
cn = uri.path.substr(1);
|
||||
ca = uri.host;
|
||||
cp = uri.protocol;
|
||||
}
|
||||
else
|
||||
{
|
||||
// leave cn as it is, use default provider
|
||||
ca = noAddress;
|
||||
cp = defaultProvider;
|
||||
}
|
||||
|
||||
|
||||
|
||||
shared_ptr<ChannelRequesterImpl> channelRequesterImpl(new ChannelRequesterImpl(quiet));
|
||||
// TODO no provider check
|
||||
channel = getChannelProviderRegistry()->getProvider(cp)->createChannel(cn, channelRequesterImpl);
|
||||
if (ca.empty())
|
||||
channel = getChannelProviderRegistry()->getProvider(cp)->createChannel(cn, channelRequesterImpl);
|
||||
else
|
||||
channel = getChannelProviderRegistry()->getProvider(cp)->createChannel(cn, channelRequesterImpl,
|
||||
ChannelProvider::PRIORITY_DEFAULT, ca);
|
||||
}
|
||||
|
||||
if (monitor)
|
||||
|
||||
@@ -26,7 +26,7 @@
|
||||
// TODO to be generated, etc.
|
||||
#define EPICS_PVA_MAJOR_VERSION 4
|
||||
#define EPICS_PVA_MINOR_VERSION 0
|
||||
#define EPICS_PVA_MAINTENANCE_VERSION 1
|
||||
#define EPICS_PVA_MAINTENANCE_VERSION 2
|
||||
#define EPICS_PVA_DEVELOPMENT_FLAG 0
|
||||
|
||||
namespace epics {
|
||||
|
||||
@@ -2775,6 +2775,82 @@ namespace epics {
|
||||
}
|
||||
};
|
||||
|
||||
class SearchHandler : public AbstractClientResponseHandler, private epics::pvData::NoDefaultMethods {
|
||||
public:
|
||||
SearchHandler(ClientContextImpl::shared_pointer const & context) :
|
||||
AbstractClientResponseHandler(context, "Search")
|
||||
{
|
||||
}
|
||||
|
||||
virtual ~SearchHandler() {
|
||||
}
|
||||
|
||||
virtual void handleResponse(osiSockAddr* responseFrom,
|
||||
Transport::shared_pointer const & transport, int8 version, int8 command,
|
||||
size_t payloadSize, epics::pvData::ByteBuffer* payloadBuffer)
|
||||
{
|
||||
AbstractClientResponseHandler::handleResponse(responseFrom, transport, version, command, payloadSize, payloadBuffer);
|
||||
|
||||
transport->ensureData(4+1+3+16+2);
|
||||
|
||||
size_t startPosition = payloadBuffer->getPosition();
|
||||
|
||||
/*const int32 searchSequenceId =*/ payloadBuffer->getInt();
|
||||
const int8 qosCode = payloadBuffer->getByte();
|
||||
|
||||
// reserved part
|
||||
payloadBuffer->getByte();
|
||||
payloadBuffer->getShort();
|
||||
|
||||
osiSockAddr responseAddress;
|
||||
responseAddress.ia.sin_family = AF_INET;
|
||||
|
||||
// 128-bit IPv6 address
|
||||
if (!decodeAsIPv6Address(payloadBuffer, &responseAddress)) return;
|
||||
|
||||
// accept given address if explicitly specified by sender
|
||||
if (responseAddress.ia.sin_addr.s_addr == INADDR_ANY)
|
||||
responseAddress.ia.sin_addr = responseFrom->ia.sin_addr;
|
||||
|
||||
// NOTE: htons might be a macro (e.g. vxWorks)
|
||||
int16 port = payloadBuffer->getShort();
|
||||
responseAddress.ia.sin_port = htons(port);
|
||||
|
||||
// we ignore the rest, since we care only about data relevant
|
||||
// to do the local multicast
|
||||
|
||||
//
|
||||
// locally broadcast if unicast (qosCode & 0x80 == 0x80)
|
||||
//
|
||||
if ((qosCode & 0x80) == 0x80)
|
||||
{
|
||||
// TODO optimize
|
||||
ClientContextImpl::shared_pointer context = _context.lock();
|
||||
if (!context)
|
||||
return;
|
||||
|
||||
BlockingUDPTransport::shared_pointer bt =
|
||||
//context->getLocalMulticastTransport();
|
||||
std::tr1::dynamic_pointer_cast<BlockingUDPTransport>(context->getSearchTransport());
|
||||
|
||||
if (bt)
|
||||
{
|
||||
// clear unicast flag
|
||||
payloadBuffer->put(startPosition+4, (int8)(qosCode & ~0x80));
|
||||
|
||||
// update response address
|
||||
payloadBuffer->setPosition(startPosition+8);
|
||||
encodeAsIPv6Address(payloadBuffer, &responseAddress);
|
||||
|
||||
payloadBuffer->setPosition(payloadBuffer->getLimit()); // send will call flip()
|
||||
|
||||
bt->send(payloadBuffer, context->getLocalBroadcastAddress());
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
};
|
||||
|
||||
class BeaconResponseHandler : public AbstractClientResponseHandler, private epics::pvData::NoDefaultMethods {
|
||||
public:
|
||||
@@ -2801,7 +2877,8 @@ namespace epics {
|
||||
GUID guid;
|
||||
payloadBuffer->get(guid.value, 0, sizeof(guid.value));
|
||||
|
||||
int16 sequentalID = payloadBuffer->getShort();
|
||||
/*int8 qosCode =*/ payloadBuffer->getByte();
|
||||
int8 sequentalID = payloadBuffer->getByte();
|
||||
int16 changeCount = payloadBuffer->getShort();
|
||||
|
||||
osiSockAddr serverAddress;
|
||||
@@ -3013,7 +3090,7 @@ namespace epics {
|
||||
m_handlerTable[CMD_BEACON].reset(new BeaconResponseHandler(context)); /* 0 */
|
||||
m_handlerTable[CMD_CONNECTION_VALIDATION].reset(new ClientConnectionValidationHandler(context)); /* 1 */
|
||||
m_handlerTable[CMD_ECHO].reset(new NoopResponse(context, "Echo")); /* 2 */
|
||||
m_handlerTable[CMD_SEARCH].reset(new NoopResponse(context, "Search")); /* 3 */
|
||||
m_handlerTable[CMD_SEARCH].reset(new SearchHandler(context)); /* 3 */
|
||||
m_handlerTable[CMD_SEARCH_RESPONSE].reset(new SearchResponseHandler(context)); /* 4 */
|
||||
m_handlerTable[CMD_AUTHNZ].reset(new AuthNZHandler(context.get())); /* 5 */
|
||||
m_handlerTable[CMD_ACL_CHANGE].reset(new NoopResponse(context, "Access rights change")); /* 6 */
|
||||
@@ -3249,7 +3326,8 @@ namespace epics {
|
||||
*/
|
||||
class InternalChannelImpl :
|
||||
public ChannelImpl,
|
||||
public std::tr1::enable_shared_from_this<InternalChannelImpl>
|
||||
public std::tr1::enable_shared_from_this<InternalChannelImpl>,
|
||||
public TimerCallback
|
||||
{
|
||||
private:
|
||||
|
||||
@@ -3282,7 +3360,12 @@ namespace epics {
|
||||
* List of fixed addresses, if <code<0</code> name resolution will be used.
|
||||
*/
|
||||
auto_ptr<InetAddrVector> m_addresses;
|
||||
|
||||
|
||||
/**
|
||||
* @brief m_addressIndex Index of currently used address (rollover pointer in a list).
|
||||
*/
|
||||
int m_addressIndex;
|
||||
|
||||
/**
|
||||
* Connection status.
|
||||
*/
|
||||
@@ -3352,6 +3435,7 @@ namespace epics {
|
||||
m_requester(requester),
|
||||
m_priority(priority),
|
||||
m_addresses(addresses),
|
||||
m_addressIndex(0),
|
||||
m_connectionState(NEVER_CONNECTED),
|
||||
m_needSubscriptionUpdate(false),
|
||||
m_allowCreation(true),
|
||||
@@ -3593,6 +3677,8 @@ namespace epics {
|
||||
m_serverChannelID = sid;
|
||||
//setAccessRights(rights);
|
||||
|
||||
m_addressIndex = 0; // reset
|
||||
|
||||
// user might create monitors in listeners, so this has to be done before this can happen
|
||||
// however, it would not be nice if events would come before connection event is fired
|
||||
// but this cannot happen since transport (TCP) is serving in this thread
|
||||
@@ -3709,6 +3795,9 @@ namespace epics {
|
||||
|
||||
}
|
||||
|
||||
#define STATIC_SEARCH_BASE_DELAY_SEC 5
|
||||
#define STATIC_SEARCH_MAX_MULTIPLIER 10
|
||||
|
||||
/**
|
||||
* Initiate search (connect) procedure.
|
||||
*/
|
||||
@@ -3724,13 +3813,29 @@ namespace epics {
|
||||
}
|
||||
else if (!m_addresses->empty())
|
||||
{
|
||||
// TODO not only first !!!
|
||||
// TODO minor version !!!
|
||||
// TODO what to do if there is no channel, do not search in a loop!!! do this in other thread...!
|
||||
searchResponse(PVA_PROTOCOL_REVISION, &((*m_addresses)[0]));
|
||||
TimerCallback::shared_pointer tc = std::tr1::dynamic_pointer_cast<TimerCallback>(shared_from_this());
|
||||
m_context->getTimer()->scheduleAfterDelay(tc,
|
||||
(m_addressIndex / m_addresses->size())*STATIC_SEARCH_BASE_DELAY_SEC);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
virtual void callback() {
|
||||
// TODO cancellaction?!
|
||||
// TODO not in this timer thread !!!
|
||||
// TODO boost when a server (from address list) is started!!! IP vs address !!!
|
||||
int ix = m_addressIndex % m_addresses->size();
|
||||
m_addressIndex++;
|
||||
if (m_addressIndex >= static_cast<int>(m_addresses->size()*(STATIC_SEARCH_MAX_MULTIPLIER+1)))
|
||||
m_addressIndex = m_addresses->size()*STATIC_SEARCH_MAX_MULTIPLIER;
|
||||
|
||||
// NOTE: calls channelConnectFailed() on failure
|
||||
searchResponse(PVA_PROTOCOL_REVISION, &((*m_addresses)[ix]));
|
||||
}
|
||||
|
||||
virtual void timerStopped() {
|
||||
// noop
|
||||
}
|
||||
|
||||
virtual void searchResponse(int8 minorRevision, osiSockAddr* serverAddress) {
|
||||
Lock guard(m_channelMutex);
|
||||
Transport::shared_pointer transport = m_transport;
|
||||
@@ -4226,6 +4331,11 @@ TODO
|
||||
PVACCESS_REFCOUNT_MONITOR_DESTRUCT(remoteClientContext);
|
||||
};
|
||||
|
||||
virtual const osiSockAddr& getLocalBroadcastAddress() const
|
||||
{
|
||||
return m_localBroadcastAddress;
|
||||
}
|
||||
|
||||
private:
|
||||
|
||||
void loadConfiguration() {
|
||||
@@ -4331,6 +4441,39 @@ TODO
|
||||
return false;
|
||||
m_searchTransport->setSendAddresses(broadcastAddresses.get());
|
||||
|
||||
// TODO do not use searchBroadcast in future
|
||||
// setup local broadcasting
|
||||
// TODO configurable local NIF, address
|
||||
osiSockAddr loAddr;
|
||||
getLoopbackNIF(loAddr, "", 0);
|
||||
if (true)
|
||||
{
|
||||
try
|
||||
{
|
||||
//osiSockAddr group;
|
||||
aToIPAddr("224.0.0.128", m_broadcastPort, &m_localBroadcastAddress.ia);
|
||||
m_broadcastTransport->join(m_localBroadcastAddress, loAddr);
|
||||
|
||||
// NOTE: this disables usage of multicast addresses in EPICS_PVA_ADDR_LIST
|
||||
m_searchTransport->setMutlicastNIF(loAddr, true);
|
||||
|
||||
//InetAddrVector sendAddressList;
|
||||
//sendAddressList.push_back(group);
|
||||
//m_searchTransport->setSendAddresses(&sendAddressList);
|
||||
|
||||
LOG(logLevelDebug, "Local multicast enabled on %s using network interface %s.",
|
||||
inetAddressToString(m_localBroadcastAddress).c_str(), inetAddressToString(loAddr, false).c_str());
|
||||
}
|
||||
catch (std::exception& ex)
|
||||
{
|
||||
LOG(logLevelDebug, "Failed to initialize local multicast, funcionality disabled. Reason: %s.", ex.what());
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
LOG(logLevelDebug, "Failed to detect a loopback network interface, local multicast disabled.");
|
||||
}
|
||||
|
||||
// become active
|
||||
m_broadcastTransport->start();
|
||||
m_searchTransport->start();
|
||||
@@ -4844,6 +4987,8 @@ TODO
|
||||
TransportRegistry::transportVector_t m_flushTransports;
|
||||
|
||||
FlushStrategy m_flushStrategy;
|
||||
|
||||
osiSockAddr m_localBroadcastAddress;
|
||||
};
|
||||
|
||||
ClientContextImpl::shared_pointer createClientContextImpl()
|
||||
|
||||
@@ -128,6 +128,8 @@ namespace epics {
|
||||
virtual void poll() = 0;
|
||||
|
||||
virtual void destroy() = 0;
|
||||
|
||||
virtual const osiSockAddr& getLocalBroadcastAddress() const = 0;
|
||||
};
|
||||
|
||||
epicsShareExtern ClientContextImpl::shared_pointer createClientContextImpl();
|
||||
|
||||
@@ -78,8 +78,12 @@ void BeaconEmitter::send(ByteBuffer* buffer, TransportSendControl* control)
|
||||
control->startMessage((int8)0, 12+2+2+16+2);
|
||||
|
||||
buffer->put(_guid.value, 0, sizeof(_guid.value));
|
||||
buffer->putShort(_beaconSequenceID);
|
||||
|
||||
|
||||
// TODO qos/flags (e.g. multicast/unicast)
|
||||
buffer->putByte(0);
|
||||
|
||||
buffer->putByte(_beaconSequenceID);
|
||||
|
||||
// TODO for now fixed changeCount
|
||||
buffer->putShort(0);
|
||||
|
||||
@@ -124,7 +128,7 @@ void BeaconEmitter::start()
|
||||
|
||||
void BeaconEmitter::reschedule()
|
||||
{
|
||||
const double period = (_beaconSequenceID >= _beaconCountLimit) ? _slowBeaconPeriod : _fastBeaconPeriod;
|
||||
const double period = (_beaconSequenceID >= _beaconCountLimit) ? _slowBeaconPeriod : _fastBeaconPeriod;
|
||||
if (period > 0)
|
||||
{
|
||||
_timer->scheduleAfterDelay(shared_from_this(), period);
|
||||
|
||||
@@ -107,7 +107,7 @@ namespace epics { namespace pvAccess {
|
||||
/**
|
||||
* Beacon sequence ID.
|
||||
*/
|
||||
epics::pvData::int16 _beaconSequenceID;
|
||||
epics::pvData::int8 _beaconSequenceID;
|
||||
|
||||
/**
|
||||
* Server GUID.
|
||||
|
||||
@@ -146,8 +146,13 @@ InetAddrVector* getSocketAddressList(const std::string & list, int defaultPort,
|
||||
const InetAddrVector* appendList) {
|
||||
InetAddrVector* iav = new InetAddrVector();
|
||||
|
||||
// parse string
|
||||
// skip leading spaces
|
||||
size_t len = list.length();
|
||||
size_t subStart = 0;
|
||||
while (subStart < len && isspace(list[subStart]))
|
||||
subStart++;
|
||||
|
||||
// parse string
|
||||
size_t subEnd;
|
||||
while((subEnd = list.find(' ', subStart))!=std::string::npos) {
|
||||
string address = list.substr(subStart, (subEnd-subStart));
|
||||
@@ -157,7 +162,7 @@ InetAddrVector* getSocketAddressList(const std::string & list, int defaultPort,
|
||||
subStart = list.find_first_not_of(" \t\r\n\v", subEnd);
|
||||
}
|
||||
|
||||
if(subStart!=std::string::npos&&list.length()>0) {
|
||||
if(subStart!=std::string::npos && subStart<len) {
|
||||
osiSockAddr addr;
|
||||
if (aToIPAddr(list.substr(subStart).c_str(), defaultPort, &addr.ia) == 0)
|
||||
iav->push_back(addr);
|
||||
|
||||
@@ -12,6 +12,7 @@
|
||||
|
||||
#define TESTSERVERNOMAIN
|
||||
|
||||
#include <envDefs.h>
|
||||
#include <epicsExit.h>
|
||||
#include <epicsUnitTest.h>
|
||||
#include <testMain.h>
|
||||
@@ -93,6 +94,10 @@ class ChannelAccessIFRemoteTest: public ChannelAccessIFTest {
|
||||
|
||||
MAIN(testChannelAccess)
|
||||
{
|
||||
// note: this leaks memory (uses putenv)
|
||||
epicsEnvSet("EPICS_PVA_ADDR_LIST", "127.0.0.1");
|
||||
epicsEnvSet("EPICS_PVA_AUTO_ADDR_LIST", "0");
|
||||
|
||||
SET_LOG_LEVEL(logLevelError);
|
||||
ChannelAccessIFRemoteTest caRemoteTest;
|
||||
return caRemoteTest.runAllTest();
|
||||
|
||||
@@ -74,7 +74,38 @@ void test_getSocketAddressList()
|
||||
testOk1(htons(555) == addr.ia.sin_port);
|
||||
testOk1(htonl(0xC0A80304) == addr.ia.sin_addr.s_addr);
|
||||
testOk1("192.168.3.4:555" == inetAddressToString(addr));
|
||||
|
||||
|
||||
|
||||
// empty
|
||||
auto_ptr<InetAddrVector> vec2(getSocketAddressList("", 1111));
|
||||
testOk1(static_cast<size_t>(0) == vec2->size());
|
||||
|
||||
// just spaces
|
||||
auto_ptr<InetAddrVector> vec3(getSocketAddressList(" ", 1111));
|
||||
testOk1(static_cast<size_t>(0) == vec3->size());
|
||||
|
||||
// leading spaces
|
||||
auto_ptr<InetAddrVector> vec4(getSocketAddressList(" 127.0.0.1 10.10.12.11:1234 192.168.3.4", 555));
|
||||
|
||||
testOk1(static_cast<size_t>(3) == vec4->size());
|
||||
|
||||
addr = vec4->at(0);
|
||||
testOk1(AF_INET == addr.ia.sin_family);
|
||||
testOk1(htons(555) == addr.ia.sin_port);
|
||||
testOk1(htonl(0x7F000001) == addr.ia.sin_addr.s_addr);
|
||||
testOk1("127.0.0.1:555" == inetAddressToString(addr));
|
||||
|
||||
addr = vec4->at(1);
|
||||
testOk1(AF_INET == addr.ia.sin_family);
|
||||
testOk1(htons(1234) == addr.ia.sin_port);
|
||||
testOk1(htonl(0x0A0A0C0B) == addr.ia.sin_addr.s_addr);
|
||||
testOk1("10.10.12.11:1234" == inetAddressToString(addr));
|
||||
|
||||
addr = vec4->at(2);
|
||||
testOk1(AF_INET == addr.ia.sin_family);
|
||||
testOk1(htons(555) == addr.ia.sin_port);
|
||||
testOk1(htonl(0xC0A80304) == addr.ia.sin_addr.s_addr);
|
||||
testOk1("192.168.3.4:555" == inetAddressToString(addr));
|
||||
}
|
||||
|
||||
void test_ipv4AddressToInt()
|
||||
@@ -329,7 +360,7 @@ void test_multicastLoopback()
|
||||
|
||||
MAIN(testInetAddressUtils)
|
||||
{
|
||||
testPlan(68);
|
||||
testPlan(83);
|
||||
testDiag("Tests for InetAddress utils");
|
||||
|
||||
test_getSocketAddressList();
|
||||
|
||||
Reference in New Issue
Block a user