diff --git a/pvtoolsSrc/pvlist.cpp b/pvtoolsSrc/pvlist.cpp index cfdf2db..7b5d72a 100644 --- a/pvtoolsSrc/pvlist.cpp +++ b/pvtoolsSrc/pvlist.cpp @@ -105,12 +105,13 @@ struct ServerEntry { typedef map ServerMap; static ServerMap serverMap; -void processSearchResponse(osiSockAddr const & responseFrom, ByteBuffer & receiveBuffer) +// return true if new server response is recevived +bool processSearchResponse(osiSockAddr const & responseFrom, ByteBuffer & receiveBuffer) { // first byte is PVA_MAGIC int8 magic = receiveBuffer.getByte(); if(magic != PVA_MAGIC) - return; + return false; // second byte version int8 version = receiveBuffer.getByte(); @@ -130,11 +131,11 @@ void processSearchResponse(osiSockAddr const & responseFrom, ByteBuffer & receiv // command ID and paylaod int8 command = receiveBuffer.getByte(); if (command != (int8)0x04) - return; + return false; size_t payloadSize = receiveBuffer.getInt(); if (payloadSize < (12+4+16+2)) - return; + return false; epics::pvAccess::GUID guid; @@ -146,7 +147,8 @@ void processSearchResponse(osiSockAddr const & responseFrom, ByteBuffer & receiv serverAddress.ia.sin_family = AF_INET; // 128-bit IPv6 address - if (!decodeAsIPv6Address(&receiveBuffer, &serverAddress)) return; + if (!decodeAsIPv6Address(&receiveBuffer, &serverAddress)) + return false; // accept given address if explicitly specified by sender if (serverAddress.ia.sin_addr.s_addr == INADDR_ANY) @@ -178,7 +180,12 @@ void processSearchResponse(osiSockAddr const & responseFrom, ByteBuffer & receiv } if (!found) + { vec.push_back(serverAddress); + return true; + } + else + return false; } else { @@ -189,9 +196,9 @@ void processSearchResponse(osiSockAddr const & responseFrom, ByteBuffer & receiv serverEntry.version = version; serverMap[guidString] = serverEntry; - } - return; + return true; + } } bool discoverServers(double timeOut) @@ -267,6 +274,26 @@ bool discoverServers(double timeOut) return false; } + // set timeout +#ifdef _WIN32 + // ms + DWORD timeout = 250; +#else + struct timeval timeout; + memset(&timeout, 0, sizeof(struct timeval)); + timeout.tv_sec = 0; + timeout.tv_usec = 250000; +#endif + status = ::setsockopt (socket, SOL_SOCKET, SO_RCVTIMEO, + (char*)&timeout, sizeof(timeout)); + if (status) + { + char errStr[64]; + epicsSocketConvertErrnoToString(errStr, sizeof(errStr)); + fprintf(stderr, "Error setting SO_RCVTIMEO: %s\n", errStr); + return false; + } + osiSockAddr responseAddress; osiSocklen_t sockLen = sizeof(sockaddr); // read the actual socket info @@ -319,28 +346,14 @@ bool discoverServers(double timeOut) return false; - // set timeout in case message is not sent - struct timeval timeout; - memset(&timeout, 0, sizeof(struct timeval)); - timeout.tv_sec = 1; - timeout.tv_usec = 0; - - status = ::setsockopt (socket, SOL_SOCKET, SO_RCVTIMEO, - (char*)&timeout, sizeof(timeout)); - if (status) - { - char errStr[64]; - epicsSocketConvertErrnoToString(errStr, sizeof(errStr)); - fprintf(stderr, "Error setting SO_RCVTIMEO: %s\n", errStr); - return false; - } - char rxbuff[1024]; ByteBuffer receiveBuffer(rxbuff, sizeof(rxbuff)/sizeof(char)); osiSockAddr fromAddress; osiSocklen_t addrStructSize = sizeof(sockaddr); + int sendCount = 0; + while (true) { receiveBuffer.clear(); @@ -349,6 +362,7 @@ bool discoverServers(double timeOut) int bytesRead = ::recvfrom(socket, (char*)receiveBuffer.getArray(), receiveBuffer.getRemaining(), 0, (sockaddr*)&fromAddress, &addrStructSize); + if (bytesRead > 0) { receiveBuffer.setPosition(bytesRead); @@ -357,9 +371,9 @@ bool discoverServers(double timeOut) processSearchResponse(fromAddress, receiveBuffer); } - else if (status <= 0) + else { - if (status == -1) + if (bytesRead == -1) { int socketError = SOCKERRNO; @@ -367,19 +381,52 @@ bool discoverServers(double timeOut) if (socketError == SOCK_EINTR || socketError == EAGAIN || // no alias in libCom // windows times out with this - //socketError == SOCK_ETIMEDOUT || + socketError == SOCK_ETIMEDOUT || socketError == SOCK_EWOULDBLOCK) - continue; - - if (socketError == SOCK_ECONNREFUSED || // avoid spurious ECONNREFUSED in Linux + { + // OK + } + else if (socketError == SOCK_ECONNREFUSED || // avoid spurious ECONNREFUSED in Linux socketError == SOCK_ECONNRESET) // or ECONNRESET in Windows - continue; + { + // OK + } + else + { + // unexpected error + char errStr[64]; + epicsSocketConvertErrnoToString(errStr, sizeof(errStr)); + fprintf(stderr, "Socket recv error: %s\n", errStr); + break; + } - char errStr[64]; - epicsSocketConvertErrnoToString(errStr, sizeof(errStr)); - fprintf(stderr, "Socket recv error: %s\n", errStr); } - break; + + if (++sendCount < 3) + { + // TODO duplicate code + bool oneOK = false; + for (size_t i = 0; i < broadcastAddresses->size(); i++) + { + // send the packet + status = ::sendto(socket, sendBuffer.getArray(), sendBuffer.getPosition(), 0, + &((*broadcastAddresses)[i].sa), sizeof(sockaddr)); + if (status < 0) + { + char errStr[64]; + epicsSocketConvertErrnoToString(errStr, sizeof(errStr)); + fprintf(stderr, "Send error: %s\n", errStr); + } + else + oneOK = true; + } + + if (!oneOK) + return false; + + } + else + break; } } diff --git a/src/ca/caChannel.cpp b/src/ca/caChannel.cpp index 02a979e..e010506 100644 --- a/src/ca/caChannel.cpp +++ b/src/ca/caChannel.cpp @@ -27,7 +27,7 @@ using std::string; PVACCESS_REFCOUNT_MONITOR_DEFINE(caChannel); -CAChannel::shared_pointer CAChannel::create(ChannelProvider::shared_pointer const & channelProvider, +CAChannel::shared_pointer CAChannel::create(CAChannelProvider::shared_pointer const & channelProvider, std::string const & channelName, short priority, ChannelRequester::shared_pointer const & channelRequester) @@ -185,7 +185,7 @@ void CAChannel::disconnected() } CAChannel::CAChannel(std::string const & _channelName, - ChannelProvider::shared_pointer const & _channelProvider, + CAChannelProvider::shared_pointer const & _channelProvider, ChannelRequester::shared_pointer const & _channelRequester) : channelName(_channelName), channelProvider(_channelProvider), @@ -423,6 +423,8 @@ void CAChannel::message(std::string const & message,MessageType messageType) void CAChannel::destroy() { + threadAttach(); + Lock lock(requestsMutex); { while (!requests.empty()) @@ -439,6 +441,11 @@ void CAChannel::destroy() /* ---------------------------------------------------------- */ +void CAChannel::threadAttach() +{ + std::tr1::static_pointer_cast(channelProvider)->threadAttach(); +} + void CAChannel::registerRequest(ChannelRequest::shared_pointer const & request) { Lock lock(requestsMutex); @@ -871,6 +878,8 @@ void CAChannelGet::getDone(struct event_handler_args &args) void CAChannelGet::get() { + channel->threadAttach(); + /* From R3.14.12 onwards ca_array_get_callback() replies will give a CA client application the current number of elements in an array field, provided they specified an element count of zero in their original request. @@ -1176,6 +1185,8 @@ void CAChannelPut::putDone(struct event_handler_args &args) void CAChannelPut::put(PVStructure::shared_pointer const & pvPutStructure, BitSet::shared_pointer const & /*putBitSet*/) { + channel->threadAttach(); + doPut putFunc = doPutFuncTable[channel->getNativeType()]; if (putFunc) { @@ -1229,6 +1240,8 @@ void CAChannelPut::getDone(struct event_handler_args &args) void CAChannelPut::get() { + channel->threadAttach(); + int result = ca_array_get_callback(getType, channel->getElementCount(), channel->getChannelID(), ca_put_get_handler, this); if (result == ECA_NORMAL) @@ -1388,6 +1401,8 @@ void CAChannelMonitor::subscriptionEvent(struct event_handler_args &args) epics::pvData::Status CAChannelMonitor::start() { + channel->threadAttach(); + /* From R3.14.12 onwards when using the IOC server and the C++ client libraries monitor callbacks replies will give a CA client application the current number of elements in an array field, @@ -1419,6 +1434,8 @@ epics::pvData::Status CAChannelMonitor::start() epics::pvData::Status CAChannelMonitor::stop() { + channel->threadAttach(); + int result = ca_clear_subscription(eventID); if (result == ECA_NORMAL) @@ -1466,6 +1483,8 @@ void CAChannelMonitor::cancel() void CAChannelMonitor::destroy() { + channel->threadAttach(); + ca_clear_subscription(eventID); // TODO diff --git a/src/ca/caChannel.h b/src/ca/caChannel.h index 2966e7d..ebeb437 100644 --- a/src/ca/caChannel.h +++ b/src/ca/caChannel.h @@ -12,6 +12,8 @@ /* for CA */ #include +#include + namespace epics { namespace pvAccess { namespace ca { @@ -24,7 +26,7 @@ class CAChannel : public: POINTER_DEFINITIONS(CAChannel); - static shared_pointer create(ChannelProvider::shared_pointer const & channelProvider, + static shared_pointer create(CAChannelProvider::shared_pointer const & channelProvider, std::string const & channelName, short priority, ChannelRequester::shared_pointer const & channelRequester); @@ -95,19 +97,21 @@ public: /* ---------------------------------------------------------------- */ + void threadAttach(); + void registerRequest(ChannelRequest::shared_pointer const & request); void unregisterRequest(ChannelRequest::shared_pointer const & request); private: CAChannel(std::string const & channelName, - ChannelProvider::shared_pointer const & channelProvider, + CAChannelProvider::shared_pointer const & channelProvider, ChannelRequester::shared_pointer const & channelRequester); void activate(short priority); std::string channelName; - ChannelProvider::shared_pointer channelProvider; + CAChannelProvider::shared_pointer channelProvider; ChannelRequester::shared_pointer channelRequester; chid channelID; diff --git a/src/ca/caProvider.cpp b/src/ca/caProvider.cpp index de75c4b..7734334 100644 --- a/src/ca/caProvider.cpp +++ b/src/ca/caProvider.cpp @@ -27,7 +27,7 @@ using namespace epics::pvAccess::ca; std::string CAChannelProvider::PROVIDER_NAME = "ca"; -CAChannelProvider::CAChannelProvider() +CAChannelProvider::CAChannelProvider() : current_context(0) { initialize(); } @@ -75,6 +75,8 @@ Channel::shared_pointer CAChannelProvider::createChannel( ChannelRequester::shared_pointer const & channelRequester, short priority) { + threadAttach(); + static std::string emptyString; return createChannel(channelName, channelRequester, priority, emptyString); } @@ -119,6 +121,11 @@ void CAChannelProvider::destroy() ca_context_destroy(); } +void CAChannelProvider::threadAttach() +{ + ca_attach_context(current_context); +} + void CAChannelProvider::registerChannel(Channel::shared_pointer const & channel) { Lock lock(channelsMutex); @@ -140,6 +147,8 @@ void CAChannelProvider::initialize() "to start channel access:") + ca_message(result)); } + current_context = ca_current_context(); + // TODO create a ca_poll thread, if ca_disable_preemptive_callback } diff --git a/src/ca/caProvider.h b/src/ca/caProvider.h index 66f5999..c0beff5 100644 --- a/src/ca/caProvider.h +++ b/src/ca/caProvider.h @@ -7,6 +7,8 @@ #ifndef CAPROVIDER_H #define CAPROVIDER_H +#include + #include #include @@ -57,6 +59,8 @@ public: /* ---------------------------------------------------------------- */ + void threadAttach(); + void registerChannel(Channel::shared_pointer const & channel); void unregisterChannel(Channel::shared_pointer const & channel); @@ -64,6 +68,8 @@ private: void initialize(); + ca_client_context* current_context; + epics::pvData::Mutex channelsMutex; // TODO std::unordered_map // void* is not the nicest thing, but there is no fast weak_ptr== diff --git a/src/pva/pvaVersion.h b/src/pva/pvaVersion.h index c33bdf8..73d02a1 100644 --- a/src/pva/pvaVersion.h +++ b/src/pva/pvaVersion.h @@ -26,7 +26,7 @@ // TODO to be generated, etc. #define EPICS_PVA_MAJOR_VERSION 4 #define EPICS_PVA_MINOR_VERSION 0 -#define EPICS_PVA_MAINTENANCE_VERSION 2 +#define EPICS_PVA_MAINTENANCE_VERSION 3 #define EPICS_PVA_DEVELOPMENT_FLAG 0 namespace epics { diff --git a/src/server/responseHandlers.cpp b/src/server/responseHandlers.cpp index bf7b251..e0f783c 100644 --- a/src/server/responseHandlers.cpp +++ b/src/server/responseHandlers.cpp @@ -14,6 +14,8 @@ #endif #include +#include +#include #include #include @@ -27,8 +29,6 @@ #include #include -#include - #include #include #include @@ -206,6 +206,8 @@ std::string ServerSearchHandler::SUPPORTED_PROTOCOL = "tcp"; ServerSearchHandler::ServerSearchHandler(ServerContextImpl::shared_pointer const & context) : AbstractServerResponseHandler(context, "Search request"), _providers(context->getChannelProviders()) { + // initialize random seed with some random value + srand ( time(NULL) ); } void ServerSearchHandler::handleResponse(osiSockAddr* responseFrom, @@ -254,8 +256,11 @@ void ServerSearchHandler::handleResponse(osiSockAddr* responseFrom, transport->ensureData(2); const int32 count = payloadBuffer->getShort() & 0xFFFF; + // TODO DoS attack? const bool responseRequired = (QOS_REPLY_REQUIRED & qosCode) != 0; + // TODO bloom filter or similar server selection (by GUID) + // // locally broadcast if unicast (qosCode & 0x80 == 0x80) // @@ -306,12 +311,17 @@ void ServerSearchHandler::handleResponse(osiSockAddr* responseFrom, { if (allowed) { + // TODO constant + #define MAX_SERVER_SEARCH_RESPONSE_DELAY_MS 100 + double period = (rand() % MAX_SERVER_SEARCH_RESPONSE_DELAY_MS)/(double)1000; + ServerChannelFindRequesterImpl* pr = new ServerChannelFindRequesterImpl(_context, 1); pr->set("", searchSequenceId, 0, responseAddress, true, true); + // TODO use std::make_shared std::tr1::shared_ptr tp(pr); - ChannelFindRequester::shared_pointer spr = tp; - spr->channelFindResult(Status::Ok, ChannelFind::shared_pointer(), false); + TimerCallback::shared_pointer tc = tp; + _context->getTimer()->scheduleAfterDelay(tc, period); } } } @@ -335,6 +345,16 @@ void ServerChannelFindRequesterImpl::clear() _serverSearch = false; } +void ServerChannelFindRequesterImpl::callback() +{ + channelFindResult(Status::Ok, ChannelFind::shared_pointer(), false); +} + +void ServerChannelFindRequesterImpl::timerStopped() +{ + // noop +} + ServerChannelFindRequesterImpl* ServerChannelFindRequesterImpl::set(std::string name, int32 searchSequenceId, int32 cid, osiSockAddr const & sendTo, bool responseRequired, bool serverSearch) { diff --git a/src/server/responseHandlers.h b/src/server/responseHandlers.h index 35b21d6..c0bc95a 100644 --- a/src/server/responseHandlers.h +++ b/src/server/responseHandlers.h @@ -7,6 +7,8 @@ #ifndef RESPONSEHANDLERS_H_ #define RESPONSEHANDLERS_H_ +#include + #include #include #include @@ -161,6 +163,7 @@ namespace pvAccess { class ServerChannelFindRequesterImpl: public ChannelFindRequester, public TransportSender, + public epics::pvData::TimerCallback, public std::tr1::enable_shared_from_this { public: @@ -170,9 +173,14 @@ namespace pvAccess { ServerChannelFindRequesterImpl* set(std::string _name, epics::pvData::int32 searchSequenceId, epics::pvData::int32 cid, osiSockAddr const & sendTo, bool responseRequired, bool serverSearch); void channelFindResult(const epics::pvData::Status& status, ChannelFind::shared_pointer const & channelFind, bool wasFound); - void lock(); + + void lock(); void unlock(); void send(epics::pvData::ByteBuffer* buffer, TransportSendControl* control); + + void callback(); + void timerStopped(); + private: GUID _guid; std::string _name; diff --git a/src/server/serverContext.cpp b/src/server/serverContext.cpp index 460bcae..48c7812 100644 --- a/src/server/serverContext.cpp +++ b/src/server/serverContext.cpp @@ -76,6 +76,16 @@ const Version& ServerContextImpl::getVersion() return ServerContextImpl::VERSION; } +/* +#ifdef WIN32 + UUID uuid; + UuidCreate ( &uuid ); +#else + uuid_t uuid; + uuid_generate_random ( uuid ); +#endif +*/ + void ServerContextImpl::generateGUID() { // TODO use UUID