This commit is contained in:
Matej Sekoranja
2014-11-24 11:02:34 +01:00
9 changed files with 169 additions and 46 deletions

View File

@@ -105,12 +105,13 @@ struct ServerEntry {
typedef map<string, ServerEntry> ServerMap;
static ServerMap serverMap;
void processSearchResponse(osiSockAddr const & responseFrom, ByteBuffer & receiveBuffer)
// return true if new server response is recevived
bool processSearchResponse(osiSockAddr const & responseFrom, ByteBuffer & receiveBuffer)
{
// first byte is PVA_MAGIC
int8 magic = receiveBuffer.getByte();
if(magic != PVA_MAGIC)
return;
return false;
// second byte version
int8 version = receiveBuffer.getByte();
@@ -130,11 +131,11 @@ void processSearchResponse(osiSockAddr const & responseFrom, ByteBuffer & receiv
// command ID and paylaod
int8 command = receiveBuffer.getByte();
if (command != (int8)0x04)
return;
return false;
size_t payloadSize = receiveBuffer.getInt();
if (payloadSize < (12+4+16+2))
return;
return false;
epics::pvAccess::GUID guid;
@@ -146,7 +147,8 @@ void processSearchResponse(osiSockAddr const & responseFrom, ByteBuffer & receiv
serverAddress.ia.sin_family = AF_INET;
// 128-bit IPv6 address
if (!decodeAsIPv6Address(&receiveBuffer, &serverAddress)) return;
if (!decodeAsIPv6Address(&receiveBuffer, &serverAddress))
return false;
// accept given address if explicitly specified by sender
if (serverAddress.ia.sin_addr.s_addr == INADDR_ANY)
@@ -178,7 +180,12 @@ void processSearchResponse(osiSockAddr const & responseFrom, ByteBuffer & receiv
}
if (!found)
{
vec.push_back(serverAddress);
return true;
}
else
return false;
}
else
{
@@ -189,9 +196,9 @@ void processSearchResponse(osiSockAddr const & responseFrom, ByteBuffer & receiv
serverEntry.version = version;
serverMap[guidString] = serverEntry;
}
return;
return true;
}
}
bool discoverServers(double timeOut)
@@ -267,6 +274,26 @@ bool discoverServers(double timeOut)
return false;
}
// set timeout
#ifdef _WIN32
// ms
DWORD timeout = 250;
#else
struct timeval timeout;
memset(&timeout, 0, sizeof(struct timeval));
timeout.tv_sec = 0;
timeout.tv_usec = 250000;
#endif
status = ::setsockopt (socket, SOL_SOCKET, SO_RCVTIMEO,
(char*)&timeout, sizeof(timeout));
if (status)
{
char errStr[64];
epicsSocketConvertErrnoToString(errStr, sizeof(errStr));
fprintf(stderr, "Error setting SO_RCVTIMEO: %s\n", errStr);
return false;
}
osiSockAddr responseAddress;
osiSocklen_t sockLen = sizeof(sockaddr);
// read the actual socket info
@@ -319,28 +346,14 @@ bool discoverServers(double timeOut)
return false;
// set timeout in case message is not sent
struct timeval timeout;
memset(&timeout, 0, sizeof(struct timeval));
timeout.tv_sec = 1;
timeout.tv_usec = 0;
status = ::setsockopt (socket, SOL_SOCKET, SO_RCVTIMEO,
(char*)&timeout, sizeof(timeout));
if (status)
{
char errStr[64];
epicsSocketConvertErrnoToString(errStr, sizeof(errStr));
fprintf(stderr, "Error setting SO_RCVTIMEO: %s\n", errStr);
return false;
}
char rxbuff[1024];
ByteBuffer receiveBuffer(rxbuff, sizeof(rxbuff)/sizeof(char));
osiSockAddr fromAddress;
osiSocklen_t addrStructSize = sizeof(sockaddr);
int sendCount = 0;
while (true)
{
receiveBuffer.clear();
@@ -349,6 +362,7 @@ bool discoverServers(double timeOut)
int bytesRead = ::recvfrom(socket, (char*)receiveBuffer.getArray(),
receiveBuffer.getRemaining(), 0,
(sockaddr*)&fromAddress, &addrStructSize);
if (bytesRead > 0)
{
receiveBuffer.setPosition(bytesRead);
@@ -357,9 +371,9 @@ bool discoverServers(double timeOut)
processSearchResponse(fromAddress, receiveBuffer);
}
else if (status <= 0)
else
{
if (status == -1)
if (bytesRead == -1)
{
int socketError = SOCKERRNO;
@@ -367,19 +381,52 @@ bool discoverServers(double timeOut)
if (socketError == SOCK_EINTR ||
socketError == EAGAIN || // no alias in libCom
// windows times out with this
//socketError == SOCK_ETIMEDOUT ||
socketError == SOCK_ETIMEDOUT ||
socketError == SOCK_EWOULDBLOCK)
continue;
if (socketError == SOCK_ECONNREFUSED || // avoid spurious ECONNREFUSED in Linux
{
// OK
}
else if (socketError == SOCK_ECONNREFUSED || // avoid spurious ECONNREFUSED in Linux
socketError == SOCK_ECONNRESET) // or ECONNRESET in Windows
continue;
{
// OK
}
else
{
// unexpected error
char errStr[64];
epicsSocketConvertErrnoToString(errStr, sizeof(errStr));
fprintf(stderr, "Socket recv error: %s\n", errStr);
break;
}
char errStr[64];
epicsSocketConvertErrnoToString(errStr, sizeof(errStr));
fprintf(stderr, "Socket recv error: %s\n", errStr);
}
break;
if (++sendCount < 3)
{
// TODO duplicate code
bool oneOK = false;
for (size_t i = 0; i < broadcastAddresses->size(); i++)
{
// send the packet
status = ::sendto(socket, sendBuffer.getArray(), sendBuffer.getPosition(), 0,
&((*broadcastAddresses)[i].sa), sizeof(sockaddr));
if (status < 0)
{
char errStr[64];
epicsSocketConvertErrnoToString(errStr, sizeof(errStr));
fprintf(stderr, "Send error: %s\n", errStr);
}
else
oneOK = true;
}
if (!oneOK)
return false;
}
else
break;
}
}

View File

@@ -27,7 +27,7 @@ using std::string;
PVACCESS_REFCOUNT_MONITOR_DEFINE(caChannel);
CAChannel::shared_pointer CAChannel::create(ChannelProvider::shared_pointer const & channelProvider,
CAChannel::shared_pointer CAChannel::create(CAChannelProvider::shared_pointer const & channelProvider,
std::string const & channelName,
short priority,
ChannelRequester::shared_pointer const & channelRequester)
@@ -185,7 +185,7 @@ void CAChannel::disconnected()
}
CAChannel::CAChannel(std::string const & _channelName,
ChannelProvider::shared_pointer const & _channelProvider,
CAChannelProvider::shared_pointer const & _channelProvider,
ChannelRequester::shared_pointer const & _channelRequester) :
channelName(_channelName),
channelProvider(_channelProvider),
@@ -423,6 +423,8 @@ void CAChannel::message(std::string const & message,MessageType messageType)
void CAChannel::destroy()
{
threadAttach();
Lock lock(requestsMutex);
{
while (!requests.empty())
@@ -439,6 +441,11 @@ void CAChannel::destroy()
/* ---------------------------------------------------------- */
void CAChannel::threadAttach()
{
std::tr1::static_pointer_cast<CAChannelProvider>(channelProvider)->threadAttach();
}
void CAChannel::registerRequest(ChannelRequest::shared_pointer const & request)
{
Lock lock(requestsMutex);
@@ -871,6 +878,8 @@ void CAChannelGet::getDone(struct event_handler_args &args)
void CAChannelGet::get()
{
channel->threadAttach();
/*
From R3.14.12 onwards ca_array_get_callback() replies will give a CA client application the current number
of elements in an array field, provided they specified an element count of zero in their original request.
@@ -1176,6 +1185,8 @@ void CAChannelPut::putDone(struct event_handler_args &args)
void CAChannelPut::put(PVStructure::shared_pointer const & pvPutStructure,
BitSet::shared_pointer const & /*putBitSet*/)
{
channel->threadAttach();
doPut putFunc = doPutFuncTable[channel->getNativeType()];
if (putFunc)
{
@@ -1229,6 +1240,8 @@ void CAChannelPut::getDone(struct event_handler_args &args)
void CAChannelPut::get()
{
channel->threadAttach();
int result = ca_array_get_callback(getType, channel->getElementCount(),
channel->getChannelID(), ca_put_get_handler, this);
if (result == ECA_NORMAL)
@@ -1388,6 +1401,8 @@ void CAChannelMonitor::subscriptionEvent(struct event_handler_args &args)
epics::pvData::Status CAChannelMonitor::start()
{
channel->threadAttach();
/*
From R3.14.12 onwards when using the IOC server and the C++ client libraries monitor callbacks
replies will give a CA client application the current number of elements in an array field,
@@ -1419,6 +1434,8 @@ epics::pvData::Status CAChannelMonitor::start()
epics::pvData::Status CAChannelMonitor::stop()
{
channel->threadAttach();
int result = ca_clear_subscription(eventID);
if (result == ECA_NORMAL)
@@ -1466,6 +1483,8 @@ void CAChannelMonitor::cancel()
void CAChannelMonitor::destroy()
{
channel->threadAttach();
ca_clear_subscription(eventID);
// TODO

View File

@@ -12,6 +12,8 @@
/* for CA */
#include <cadef.h>
#include <caProvider.h>
namespace epics {
namespace pvAccess {
namespace ca {
@@ -24,7 +26,7 @@ class CAChannel :
public:
POINTER_DEFINITIONS(CAChannel);
static shared_pointer create(ChannelProvider::shared_pointer const & channelProvider,
static shared_pointer create(CAChannelProvider::shared_pointer const & channelProvider,
std::string const & channelName,
short priority,
ChannelRequester::shared_pointer const & channelRequester);
@@ -95,19 +97,21 @@ public:
/* ---------------------------------------------------------------- */
void threadAttach();
void registerRequest(ChannelRequest::shared_pointer const & request);
void unregisterRequest(ChannelRequest::shared_pointer const & request);
private:
CAChannel(std::string const & channelName,
ChannelProvider::shared_pointer const & channelProvider,
CAChannelProvider::shared_pointer const & channelProvider,
ChannelRequester::shared_pointer const & channelRequester);
void activate(short priority);
std::string channelName;
ChannelProvider::shared_pointer channelProvider;
CAChannelProvider::shared_pointer channelProvider;
ChannelRequester::shared_pointer channelRequester;
chid channelID;

View File

@@ -27,7 +27,7 @@ using namespace epics::pvAccess::ca;
std::string CAChannelProvider::PROVIDER_NAME = "ca";
CAChannelProvider::CAChannelProvider()
CAChannelProvider::CAChannelProvider() : current_context(0)
{
initialize();
}
@@ -75,6 +75,8 @@ Channel::shared_pointer CAChannelProvider::createChannel(
ChannelRequester::shared_pointer const & channelRequester,
short priority)
{
threadAttach();
static std::string emptyString;
return createChannel(channelName, channelRequester, priority, emptyString);
}
@@ -119,6 +121,11 @@ void CAChannelProvider::destroy()
ca_context_destroy();
}
void CAChannelProvider::threadAttach()
{
ca_attach_context(current_context);
}
void CAChannelProvider::registerChannel(Channel::shared_pointer const & channel)
{
Lock lock(channelsMutex);
@@ -140,6 +147,8 @@ void CAChannelProvider::initialize()
"to start channel access:") + ca_message(result));
}
current_context = ca_current_context();
// TODO create a ca_poll thread, if ca_disable_preemptive_callback
}

View File

@@ -7,6 +7,8 @@
#ifndef CAPROVIDER_H
#define CAPROVIDER_H
#include <cadef.h>
#include <pv/pvAccess.h>
#include <map>
@@ -57,6 +59,8 @@ public:
/* ---------------------------------------------------------------- */
void threadAttach();
void registerChannel(Channel::shared_pointer const & channel);
void unregisterChannel(Channel::shared_pointer const & channel);
@@ -64,6 +68,8 @@ private:
void initialize();
ca_client_context* current_context;
epics::pvData::Mutex channelsMutex;
// TODO std::unordered_map
// void* is not the nicest thing, but there is no fast weak_ptr==

View File

@@ -26,7 +26,7 @@
// TODO to be generated, etc.
#define EPICS_PVA_MAJOR_VERSION 4
#define EPICS_PVA_MINOR_VERSION 0
#define EPICS_PVA_MAINTENANCE_VERSION 2
#define EPICS_PVA_MAINTENANCE_VERSION 3
#define EPICS_PVA_DEVELOPMENT_FLAG 0
namespace epics {

View File

@@ -14,6 +14,8 @@
#endif
#include <sstream>
#include <time.h>
#include <stdlib.h>
#include <pv/responseHandlers.h>
#include <pv/remote.h>
@@ -27,8 +29,6 @@
#include <osiProcess.h>
#include <pv/logger.h>
#include <sstream>
#include <pv/pvAccessMB.h>
#include <pv/rpcServer.h>
#include <pv/security.h>
@@ -206,6 +206,8 @@ std::string ServerSearchHandler::SUPPORTED_PROTOCOL = "tcp";
ServerSearchHandler::ServerSearchHandler(ServerContextImpl::shared_pointer const & context) :
AbstractServerResponseHandler(context, "Search request"), _providers(context->getChannelProviders())
{
// initialize random seed with some random value
srand ( time(NULL) );
}
void ServerSearchHandler::handleResponse(osiSockAddr* responseFrom,
@@ -254,8 +256,11 @@ void ServerSearchHandler::handleResponse(osiSockAddr* responseFrom,
transport->ensureData(2);
const int32 count = payloadBuffer->getShort() & 0xFFFF;
// TODO DoS attack?
const bool responseRequired = (QOS_REPLY_REQUIRED & qosCode) != 0;
// TODO bloom filter or similar server selection (by GUID)
//
// locally broadcast if unicast (qosCode & 0x80 == 0x80)
//
@@ -306,12 +311,17 @@ void ServerSearchHandler::handleResponse(osiSockAddr* responseFrom,
{
if (allowed)
{
// TODO constant
#define MAX_SERVER_SEARCH_RESPONSE_DELAY_MS 100
double period = (rand() % MAX_SERVER_SEARCH_RESPONSE_DELAY_MS)/(double)1000;
ServerChannelFindRequesterImpl* pr = new ServerChannelFindRequesterImpl(_context, 1);
pr->set("", searchSequenceId, 0, responseAddress, true, true);
// TODO use std::make_shared
std::tr1::shared_ptr<ServerChannelFindRequesterImpl> tp(pr);
ChannelFindRequester::shared_pointer spr = tp;
spr->channelFindResult(Status::Ok, ChannelFind::shared_pointer(), false);
TimerCallback::shared_pointer tc = tp;
_context->getTimer()->scheduleAfterDelay(tc, period);
}
}
}
@@ -335,6 +345,16 @@ void ServerChannelFindRequesterImpl::clear()
_serverSearch = false;
}
void ServerChannelFindRequesterImpl::callback()
{
channelFindResult(Status::Ok, ChannelFind::shared_pointer(), false);
}
void ServerChannelFindRequesterImpl::timerStopped()
{
// noop
}
ServerChannelFindRequesterImpl* ServerChannelFindRequesterImpl::set(std::string name, int32 searchSequenceId, int32 cid, osiSockAddr const & sendTo,
bool responseRequired, bool serverSearch)
{

View File

@@ -7,6 +7,8 @@
#ifndef RESPONSEHANDLERS_H_
#define RESPONSEHANDLERS_H_
#include <pv/timer.h>
#include <pv/serverContext.h>
#include <pv/remote.h>
#include <pv/serverChannelImpl.h>
@@ -161,6 +163,7 @@ namespace pvAccess {
class ServerChannelFindRequesterImpl:
public ChannelFindRequester,
public TransportSender,
public epics::pvData::TimerCallback,
public std::tr1::enable_shared_from_this<ServerChannelFindRequesterImpl>
{
public:
@@ -170,9 +173,14 @@ namespace pvAccess {
ServerChannelFindRequesterImpl* set(std::string _name, epics::pvData::int32 searchSequenceId,
epics::pvData::int32 cid, osiSockAddr const & sendTo, bool responseRequired, bool serverSearch);
void channelFindResult(const epics::pvData::Status& status, ChannelFind::shared_pointer const & channelFind, bool wasFound);
void lock();
void lock();
void unlock();
void send(epics::pvData::ByteBuffer* buffer, TransportSendControl* control);
void callback();
void timerStopped();
private:
GUID _guid;
std::string _name;

View File

@@ -76,6 +76,16 @@ const Version& ServerContextImpl::getVersion()
return ServerContextImpl::VERSION;
}
/*
#ifdef WIN32
UUID uuid;
UuidCreate ( &uuid );
#else
uuid_t uuid;
uuid_generate_random ( uuid );
#endif
*/
void ServerContextImpl::generateGUID()
{
// TODO use UUID