diff --git a/configure/CONFIG_SITE b/configure/CONFIG_SITE index b9b5a43..534d470 100644 --- a/configure/CONFIG_SITE +++ b/configure/CONFIG_SITE @@ -45,5 +45,7 @@ INSTALL_INCLUDE = $(INSTALL_LOCATION)/include/pv USR_INCLUDES += -I $(INSTALL_LOCATION)/include USR_CXXFLAGS += -Wall -Wextra +#USR_CPPFLAGS += -DPV_MB -g -ggdb + -include $(TOP)/configure/CONFIG_SITE.local -include $(TOP)/../CONFIG.local diff --git a/pvAccessApp/client/pvAccess.h b/pvAccessApp/client/pvAccess.h index 210756b..1755dc2 100644 --- a/pvAccessApp/client/pvAccess.h +++ b/pvAccessApp/client/pvAccess.h @@ -92,7 +92,6 @@ namespace pvAccess { /** * Base interface for all channel requests. - * @author mse */ class ChannelRequest : public epics::pvData::Destroyable, public Lockable, private epics::pvData::NoDefaultMethods { public: @@ -102,8 +101,6 @@ namespace pvAccess { /** * Request to put and get Array Data. * The data is either taken from or put in the PVArray returned by ChannelArrayRequester.channelArrayConnect. - * @author mrk - * */ class ChannelArray : public ChannelRequest{ public: @@ -136,8 +133,6 @@ namespace pvAccess { /** * The epics::pvData::Requester for a ChannelArray. - * @author mrk - * */ class ChannelArrayRequester : virtual public epics::pvData::Requester { public: @@ -175,7 +170,6 @@ namespace pvAccess { /** - * @author mrk * */ class ChannelFind : public epics::pvData::Destroyable, private epics::pvData::NoDefaultMethods { @@ -187,7 +181,6 @@ namespace pvAccess { }; /** - * @author mrk * */ class ChannelFindRequester { @@ -204,8 +197,6 @@ namespace pvAccess { /** * Request to get data from a channel. - * @author mrk - * */ class ChannelGet : public ChannelRequest { public: @@ -223,8 +214,6 @@ namespace pvAccess { /** * epics::pvData::Requester for channelGet. - * @author mrk - * */ class ChannelGetRequester : virtual public epics::pvData::Requester { public: @@ -250,8 +239,6 @@ namespace pvAccess { /** * ChannelProcess - request that a channel be processed.. - * @author mrk - * */ class ChannelProcess : public ChannelRequest { public: @@ -269,8 +256,6 @@ namespace pvAccess { /** * epics::pvData::Requester for channelProcess. - * @author mrk - * */ class ChannelProcessRequester : virtual public epics::pvData::Requester { public: @@ -294,8 +279,6 @@ namespace pvAccess { /** * Interface for a channel access put request. - * @author mrk - * */ class ChannelPut : public ChannelRequest { public: @@ -318,8 +301,6 @@ namespace pvAccess { /** * epics::pvData::Requester for ChannelPut. - * @author mrk - * */ class ChannelPutRequester : virtual public epics::pvData::Requester { public: @@ -352,8 +333,6 @@ namespace pvAccess { /** * Channel access put/get request. * The put is performed first, followed optionally by a process request, and then by a get request. - * @author mrk - * */ class ChannelPutGet : public ChannelRequest { public: @@ -381,8 +360,6 @@ namespace pvAccess { /** * epics::pvData::Requester for ChannelPutGet. - * @author mrk - * */ class ChannelPutGetRequester : virtual public epics::pvData::Requester { @@ -420,8 +397,6 @@ namespace pvAccess { /** * epics::pvData::Requester for channelGet. - * @author mrk - * */ class ChannelRPC : public ChannelRequest { public: @@ -439,8 +414,6 @@ namespace pvAccess { /** * epics::pvData::Requester for channelGet. - * @author mrk - * */ class ChannelRPCRequester : virtual public epics::pvData::Requester { public: @@ -464,8 +437,6 @@ namespace pvAccess { /** * epics::pvData::Requester for a getStructure request. - * @author mrk - * */ class GetFieldRequester : virtual public epics::pvData::Requester { public: @@ -486,8 +457,6 @@ namespace pvAccess { /** * Interface for accessing a channel. * A channel is created via a call to ChannelAccess.createChannel(String channelName). - * @author mrk - * @author msekoranja */ class Channel : public epics::pvData::Requester, @@ -662,8 +631,6 @@ namespace pvAccess { /** * Listener for connect state changes. - * @author mrk - * */ class ChannelRequester : public virtual epics::pvData::Requester { public: @@ -684,11 +651,16 @@ namespace pvAccess { virtual void channelStateChange(Channel::shared_pointer const & channel, Channel::ConnectionState connectionState) = 0; }; + /** + * @brief The FlushStrategy enum + */ + enum FlushStrategy { + IMMEDIATE, DELAYED, USER_CONTROLED + }; + /** * Interface implemented by code that can provide access to the record * to which a channel connects. - * @author mrk - * */ class ChannelProvider : public epics::pvData::Destroyable, private epics::pvData::NoDefaultMethods { public: @@ -742,12 +714,15 @@ namespace pvAccess { */ virtual Channel::shared_pointer createChannel(epics::pvData::String const & channelName,ChannelRequester::shared_pointer const & channelRequester, short priority, epics::pvData::String const & address) = 0; + + virtual void configure(epics::pvData::PVStructure::shared_pointer /*configuration*/) {}; + virtual void flush() {}; + virtual void poll() {}; + }; /** * Interface for locating channel providers. - * @author mrk - * */ class ChannelAccess : private epics::pvData::NoDefaultMethods { public: @@ -777,8 +752,6 @@ namespace pvAccess { /** * Interface for creating request structure. - * @author mse - * */ class CreateRequest { public: diff --git a/pvAccessApp/remote/blockingClientTCPTransport.cpp b/pvAccessApp/remote/blockingClientTCPTransport.cpp index 70c9869..660b0f2 100644 --- a/pvAccessApp/remote/blockingClientTCPTransport.cpp +++ b/pvAccessApp/remote/blockingClientTCPTransport.cpp @@ -40,7 +40,7 @@ namespace epics { acquire(client); // use immediate for clients - setSendQueueFlushStrategy(DELAYED); + setFlushStrategy(DELAYED); // setup connection timeout timer (watchdog) epicsTimeGetCurrent(&_aliveTimestamp); diff --git a/pvAccessApp/remote/blockingServerTCPTransport.cpp b/pvAccessApp/remote/blockingServerTCPTransport.cpp index e9be221..b8a5aec 100644 --- a/pvAccessApp/remote/blockingServerTCPTransport.cpp +++ b/pvAccessApp/remote/blockingServerTCPTransport.cpp @@ -27,7 +27,7 @@ namespace pvAccess { _lastChannelSID(0) { // for performance testing - setSendQueueFlushStrategy(DELAYED); + setFlushStrategy(DELAYED); _delay = 0.000; // NOTE: priority not yet known, default priority is used to register/unregister diff --git a/pvAccessApp/remote/blockingTCP.h b/pvAccessApp/remote/blockingTCP.h index 782ba1e..8af5f0f 100644 --- a/pvAccessApp/remote/blockingTCP.h +++ b/pvAccessApp/remote/blockingTCP.h @@ -29,6 +29,9 @@ #include #include +// not implemented anyway +#define FLOW_CONTROL 0 + namespace epics { namespace pvAccess { @@ -38,10 +41,6 @@ namespace epics { READ_FROM_SOCKET, PROCESS_HEADER, PROCESS_PAYLOAD, UNDEFINED_STAGE }; - enum SendQueueFlushStrategy { - IMMEDIATE, DELAYED, USER_CONTROLED - }; - class BlockingTCPTransport : public Transport, public TransportSendControl, @@ -140,6 +139,14 @@ namespace epics { virtual void alignData(std::size_t alignment); + virtual bool directSerialize(epics::pvData::ByteBuffer *existingBuffer, const char* toSerialize, + std::size_t elementCount, std::size_t elementSize); + + virtual bool directDeserialize(epics::pvData::ByteBuffer *existingBuffer, char* deserializeTo, + std::size_t elementCount, std::size_t elementSize); + + void processReadIntoDirectBuffer(std::size_t bytesToRead); + virtual void close(); virtual void setByteOrder(int /*byteOrder*/) @@ -147,11 +154,11 @@ namespace epics { // not used this this implementation } - SendQueueFlushStrategy getSendQueueFlushStrategy() { + FlushStrategy getFlushStrategy() { return _flushStrategy; } - void setSendQueueFlushStrategy(SendQueueFlushStrategy flushStrategy) { + void setFlushStrategy(FlushStrategy flushStrategy) { _flushStrategy = flushStrategy; } @@ -217,12 +224,12 @@ namespace epics { virtual ~BlockingTCPTransport(); - +#if FLOW_CONTROL /** * Default marker period. */ static const std::size_t MARKER_PERIOD = 1024; - +#endif static const std::size_t MAX_ENSURE_DATA_BUFFER_SIZE = 1024; // TODO @@ -271,7 +278,7 @@ namespace epics { epics::pvData::int64 _markerPeriodBytes; - SendQueueFlushStrategy _flushStrategy; + FlushStrategy _flushStrategy; epicsThreadId _rcvThreadId; @@ -315,18 +322,20 @@ namespace epics { epics::pvData::Mutex _sendQueueMutex; // initialized at construction time - std::deque _monitorSendQueue; - epics::pvData::Mutex _monitorMutex; +// std::deque _monitorSendQueue; +// epics::pvData::Mutex _monitorMutex; /** * Send buffer. */ epics::pvData::ByteBuffer* _sendBuffer; +#if FLOW_CONTROL /** * Next planned marker position. */ epics::pvData::int64 _nextMarkerPosition; +#endif /** * Send pending flag. @@ -375,11 +384,16 @@ namespace epics { ReceiveStage _stage; + std::size_t _directPayloadRead; + char * _directBuffer; + +#if FLOW_CONTROL + /** * Total bytes received. */ epics::pvData::int64 _totalBytesReceived; - +#endif /** * Incoming (codes generated by other party) introspection registry. @@ -416,7 +430,7 @@ namespace epics { - +#if FLOW_CONTROL /** * Marker to send. * NOTE: synced by _flowControlMutex @@ -436,7 +450,7 @@ namespace epics { epics::pvData::int64 _remoteBufferFreeSpace; epics::pvData::Mutex _flowControlMutex; - +#endif private: diff --git a/pvAccessApp/remote/blockingTCPTransport.cpp b/pvAccessApp/remote/blockingTCPTransport.cpp index 57881f8..65bbd99 100644 --- a/pvAccessApp/remote/blockingTCPTransport.cpp +++ b/pvAccessApp/remote/blockingTCPTransport.cpp @@ -84,7 +84,9 @@ namespace pvAccess { _channel(channel), _priority(priority), _responseHandler(responseHandler), +#if FLOW_CONTROL _markerPeriodBytes(MARKER_PERIOD), +#endif _flushStrategy(DELAYED), _rcvThreadId(0), _sendThreadId(0), @@ -96,7 +98,9 @@ namespace pvAccess { _remoteTransportSocketReceiveBufferSize(MAX_TCP_RECV), _sendQueue(), //_monitorSendQueue(), +#if FLOW_CONTROL _nextMarkerPosition(_markerPeriodBytes), +#endif _sendPending(false), _lastMessageStartPosition(0), _lastSegmentedMessageType(0), @@ -112,13 +116,20 @@ namespace pvAccess { _command(0), _payloadSize(0), _stage(READ_FROM_SOCKET), + _directPayloadRead(0), + _directBuffer(0), +#if FLOW_CONTROL _totalBytesReceived(0), +#endif _closed(), _sendThreadExited(false), - _verified(false), + _verified(false) +#if FLOW_CONTROL + , _markerToSend(0), _totalBytesSent(0), _remoteBufferFreeSpace(INT64_MAX) +#endif { PVACCESS_REFCOUNT_MONITOR_CONSTRUCT(blockingTCPTransport); @@ -227,10 +238,12 @@ namespace pvAccess { } void BlockingTCPTransport::clearAndReleaseBuffer() { +#if FLOW_CONTROL // NOTE: take care that nextMarkerPosition is set right // fix position to be correct when buffer is cleared // do not include pre-buffered flow control message; not 100% correct, but OK _nextMarkerPosition -= _sendBuffer->getPosition() - CA_MESSAGE_HEADER_SIZE; +#endif _sendQueueMutex.lock(); _flushRequested = false; @@ -240,12 +253,14 @@ namespace pvAccess { _sendPending = false; +#if FLOW_CONTROL // prepare ACK marker _sendBuffer->putByte(CA_MAGIC); _sendBuffer->putByte(CA_VERSION); _sendBuffer->putByte(0x01 | _byteOrderFlag); // control data _sendBuffer->putByte(1); // marker ACK _sendBuffer->putInt(0); +#endif } void BlockingTCPTransport::close() { @@ -311,12 +326,13 @@ namespace pvAccess { endMessage(!lastMessageCompleted); bool moreToSend = true; - // TODO closed check !!! while(moreToSend) { moreToSend = !flush(); // all sent, exit if(!moreToSend) break; + // TODO check if this is OK + else if (_closed.get()) THROW_BASE_EXCEPTION("transport closed"); // TODO solve this sleep in a better way epicsThreadSleep(0.01); @@ -378,9 +394,20 @@ namespace pvAccess { // alignBuffer(CA_ALIGNMENT); // set paylaod size - _sendBuffer->putInt(_lastMessageStartPosition+sizeof(int16)+2, - _sendBuffer->getPosition()-_lastMessageStartPosition - -CA_MESSAGE_HEADER_SIZE); + const size_t payloadSize = _sendBuffer->getPosition()-_lastMessageStartPosition-CA_MESSAGE_HEADER_SIZE; + + // TODO by spec? + // ignore empty segmented messages + if (payloadSize == 0 && _lastSegmentedMessageType != 0) + { + _sendBuffer->setPosition(_lastMessageStartPosition); + if (!hasMoreSegments) + _lastSegmentedMessageType = 0; + return; + + } + + _sendBuffer->putInt(_lastMessageStartPosition+sizeof(int16)+2, payloadSize); int flagsPosition = _lastMessageStartPosition+sizeof(int16); // set segmented bit @@ -408,28 +435,31 @@ namespace pvAccess { } } +#if FLOW_CONTROL // manage markers int position = _sendBuffer->getPosition(); int bytesLeft = _sendBuffer->getRemaining(); - if(unlikely(position>=_nextMarkerPosition && + if(unlikely(position>=_nextMarkerPosition && bytesLeft>=CA_MESSAGE_HEADER_SIZE)) { _sendBuffer->putByte(CA_MAGIC); _sendBuffer->putByte(CA_VERSION); _sendBuffer->putByte(0x01 | _byteOrderFlag); // control data _sendBuffer->putByte(0); // marker - _sendBuffer->putInt((int)(_totalBytesSent+position+CA_MESSAGE_HEADER_SIZE)); + s_sendBuffer->putInt((int)(_totalBytesSent+position+CA_MESSAGE_HEADER_SIZE)); _nextMarkerPosition = position+_markerPeriodBytes; } +#endif } } - void BlockingTCPTransport::ensureData(size_t size) { + void BlockingTCPTransport::ensureData(size_t size) { // enough of data? - if(likely(_socketBuffer->getRemaining()>=size)) return; + const size_t remainingBytes = _socketBuffer->getRemaining(); + if (likely(remainingBytes>=size)) return; // too large for buffer... - if(unlikely(MAX_ENSURE_DATA_BUFFER_SIZEgetPosition()-_storedPosition; // no more data and we have some payload left => read buffer - if(likely(_storedPayloadSize>=size)) { + if (likely(_storedPayloadSize>=size)) + { //LOG(logLevelInfo, // "storedPayloadSize >= size, remaining: %d", // _socketBuffer->getRemaining()); @@ -453,63 +484,264 @@ namespace pvAccess { _socketBuffer->setLimit(min(_storedPosition+_storedPayloadSize, _storedLimit)); } - else { - // copy remaining bytes, if any - int remainingBytes = _socketBuffer->getRemaining(); - for(int i = 0; iputByte(i, _socketBuffer->getByte()); - // read what is left + // extend limit to what was read _socketBuffer->setLimit(_storedLimit); _stage = PROCESS_HEADER; processReadCached(true, UNDEFINED_STAGE, size-remainingBytes); - // copy before position - for(int i = remainingBytes-1, j = _socketBuffer->getPosition() - -1; i>=0; i--, j--) - _socketBuffer->putByte(j, _socketBuffer->getByte(i)); - _startPosition = _socketBuffer->getPosition()-remainingBytes; - _socketBuffer->setPosition(_startPosition); + if (unlikely(remainingBytes > 0)) + { + // copy saved back to before position + for(int i = static_cast(remainingBytes-1), j = _socketBuffer->getPosition()-1; + i>=0; + i--, j--) + _socketBuffer->putByte(j, _socketBuffer->getByte(i)); + _startPosition = _socketBuffer->getPosition()-remainingBytes; + _socketBuffer->setPosition(_startPosition); + _storedPosition = _startPosition; + } + else + { + _storedPosition = _socketBuffer->getPosition(); + } - _storedPosition = _startPosition; //socketBuffer.position(); _storedLimit = _socketBuffer->getLimit(); _socketBuffer->setLimit(min(_storedPosition+_storedPayloadSize, _storedLimit)); - // add if missing... + // add if missing, since UNDEFINED_STAGE and return less... if(unlikely(!_closed.get()&&(_socketBuffer->getRemaining()getRemaining()<(alignment-1))) + if (unlikely(_socketBuffer->getRemaining()<(alignment-1))) ensureData(alignment-1); _socketBuffer->align(alignment); } + bool BlockingTCPTransport::directSerialize(ByteBuffer */*existingBuffer*/, const char* toSerialize, + std::size_t elementCount, std::size_t elementSize) + { + // TODO overflow check, size_t type, other is int32 for payloadSize header field !!! + // TODO do not ignore or new field in max message size in connection validation + std::size_t count = elementCount * elementSize; + + // TODO find smart limit + // check if direct mode actually pays off + if (count < 1024) + return false; + + // first end current message indicating the we will segment + endMessage(true); + + // append segmented message header + startMessage(_lastSegmentedMessageCommand, 0); + // set segmented message size + _sendBuffer->putInt(_lastMessageStartPosition+sizeof(int16)+2, count); + + // flush (TODO this is code is duplicated) + bool moreToSend = true; + while (moreToSend) { + moreToSend = !flush(); + + // all sent, exit + if(!moreToSend) break; + // TODO check if this is OK + else if (_closed.get()) THROW_BASE_EXCEPTION("transport closed"); + + // TODO solve this sleep in a better way + epicsThreadSleep(0.01); + } + _lastMessageStartPosition = _sendBuffer->getPosition(); + + // TODO think if alignment is preserved after... + + try { + //LOG(logLevelInfo, + // "Sending (direct) %d bytes in the packet to %s.", + // count, + // inetAddressToString(_socketAddress).c_str()); + const char* ptr = toSerialize; + while(count>0) { + ssize_t bytesSent = ::send(_channel, + ptr, + count, 0); + + if(unlikely(bytesSent<0)) { + + int socketError = SOCKERRNO; + + // spurious EINTR check + if (socketError==SOCK_EINTR) + continue; + + // TODO check this (copy below)... consolidate!!! + if (socketError==SOCK_ENOBUFS) { + // TODO improve this + epicsThreadSleep(0.01); + continue; + } + + // connection lost + + char errStr[64]; + epicsSocketConvertErrnoToString(errStr, sizeof(errStr)); + ostringstream temp; + temp<<"error in sending TCP data: "<getRemaining()); + existingBuffer->getArray(_directBuffer, availableBytes); + _directPayloadRead -= availableBytes; + + if (_directPayloadRead == 0) + return true; + + _directBuffer += availableBytes; + + // subtract what was already processed + size_t pos = _socketBuffer->getPosition(); + _storedPayloadSize -= pos -_storedPosition; + _storedPosition = pos; + + // no more data and we have some payload left => read buffer + if (likely(_storedPayloadSize > 0)) + { + size_t bytesToRead = std::min(_directPayloadRead, _storedPayloadSize); + processReadIntoDirectBuffer(bytesToRead); + // std::cout << "d: " << bytesToRead << std::endl; + _storedPayloadSize -= bytesToRead; + _directPayloadRead -= bytesToRead; + } + + if (_directPayloadRead == 0) + return true; + + _stage = PROCESS_HEADER; + processReadCached(true, UNDEFINED_STAGE, _directPayloadRead); + + _storedPosition = _socketBuffer->getPosition(); + _storedLimit = _socketBuffer->getLimit(); + _socketBuffer->setLimit( + min(_storedPosition + _storedPayloadSize, _storedLimit) + ); + + } + + return true; + } + + void BlockingTCPTransport::processReadIntoDirectBuffer(size_t bytesToRead) + { + while (bytesToRead > 0) + { + ssize_t bytesRead = recv(_channel, _directBuffer, bytesToRead, 0); + + // std::cout << "d: " << bytesRead << std::endl; + + if(unlikely(bytesRead<=0)) + { + + if (bytesRead<0) + { + int socketError = SOCKERRNO; + + // interrupted or timeout + if (socketError == EINTR || + socketError == EAGAIN || + socketError == EWOULDBLOCK) + continue; + } + + // error (disconnect, end-of-stream) detected + close(); + + THROW_BASE_EXCEPTION("bytesRead < 0"); + + return; + } + + bytesToRead -= bytesRead; + _directBuffer += bytesRead; + + } + } + void BlockingTCPTransport::processReadCached(bool nestedCall, ReceiveStage inStage, size_t requiredBytes) { try { + // TODO we need to throw exception in nextedCall not just bail out!!!! while(likely(!_closed.get())) { if(_stage==READ_FROM_SOCKET||inStage!=UNDEFINED_STAGE) { // add to bytes read +#if FLOW_CONTROL int currentPosition = _socketBuffer->getPosition(); _totalBytesReceived += (currentPosition - _startPosition); - +#endif // preserve alignment int currentStartPosition = _startPosition = MAX_ENSURE_DATA_BUFFER_SIZE; // "TODO uncomment align" + (unsigned int)currentPosition % CA_ALIGNMENT; // copy remaining bytes, if any int remainingBytes = _socketBuffer->getRemaining(); + int endPosition = currentStartPosition + remainingBytes; // TODO memmove for(int i = MAX_ENSURE_DATA_BUFFER_SIZE; igetPosition()getPosition(); + ssize_t bytesRead = recv(_channel, (char*)(_socketBuffer->getArray()+pos), - _socketBuffer->getRemaining(), 0); +// _socketBuffer->getRemaining(), 0); +// TODO we assume that caller is smart and requiredBytes > remainingBytes +// if in direct read mode, try to read only header so that rest can be read directly to direct buffers +(_directPayloadRead > 0 && inStage == PROCESS_HEADER) ? (requiredBytes-remainingBytes) : _socketBuffer->getRemaining(), 0); +//std::cout << "i: " << bytesRead << std::endl; if(unlikely(bytesRead<=0)) { @@ -551,7 +788,10 @@ namespace pvAccess { _socketBuffer->setPosition(pos+bytesRead); } - _socketBuffer->setLimit(_socketBuffer->getPosition()); + + std::size_t pos = _socketBuffer->getPosition(); + _storedLimit = pos; + _socketBuffer->setLimit(pos); _socketBuffer->setPosition(currentStartPosition); /* @@ -570,6 +810,10 @@ namespace pvAccess { } if(likely(_stage==PROCESS_HEADER)) { + + // reveal what's already in buffer + _socketBuffer->setLimit(_storedLimit); + // ensure CAConstants.CA_MESSAGE_HEADER_SIZE bytes of data if(unlikely(((int)_socketBuffer->getRemaining())getPosition(); - _storedLimit = _socketBuffer->getLimit(); - _socketBuffer->setLimit(min(_storedPosition+_storedPayloadSize, _storedLimit)); - try { - // handle response - Transport::shared_pointer thisPointer = shared_from_this(); - _responseHandler->handleResponse(&_socketAddress, - thisPointer, _version, _command, _payloadSize, - _socketBuffer); - } catch(...) { - //noop // TODO print? - } + // ignore segmented messages with no payload + if (likely(!notFirstSegment || _payloadSize > 0)) + { + + // NOTE: nested data (w/ payload) messages between segmented messages are not supported + _storedPosition = _socketBuffer->getPosition(); + _storedLimit = _socketBuffer->getLimit(); + _socketBuffer->setLimit(min(_storedPosition+_storedPayloadSize, _storedLimit)); + try { + // handle response + Transport::shared_pointer thisPointer = shared_from_this(); + _responseHandler->handleResponse(&_socketAddress, + thisPointer, _version, _command, _payloadSize, + _socketBuffer); + } catch(...) { + //noop // TODO print? + } + + _socketBuffer->setLimit(_storedLimit); + size_t newPosition = _storedPosition+_storedPayloadSize; + if(unlikely(newPosition>_storedLimit)) { + newPosition -= _storedLimit; + _socketBuffer->setPosition(_storedLimit); + processReadCached(true, PROCESS_PAYLOAD,newPosition); + newPosition += _startPosition; + } + _socketBuffer->setPosition(newPosition); + // TODO discard all possible segments?!!! - _socketBuffer->setLimit(_storedLimit); - size_t newPosition = _storedPosition+_storedPayloadSize; - if(unlikely(newPosition>_storedLimit)) { - newPosition -= _storedLimit; - _socketBuffer->setPosition(_storedLimit); - processReadCached(true, PROCESS_PAYLOAD,newPosition); - newPosition += _startPosition; } - _socketBuffer->setPosition(newPosition); - // TODO discard all possible segments?!!! _stage = PROCESS_HEADER; @@ -720,6 +973,7 @@ namespace pvAccess { // start sending from the start _sendBufferSentPosition = 0; +#if FLOW_CONTROL // if not set skip marker otherwise set it _flowControlMutex.lock(); int markerValue = _markerToSend; @@ -729,6 +983,7 @@ namespace pvAccess { _sendBufferSentPosition = CA_MESSAGE_HEADER_SIZE; else _sendBuffer->putInt(4, markerValue); +#endif } bool success = false; @@ -842,9 +1097,11 @@ namespace pvAccess { buffer->setPosition(buffer->getPosition()+bytesSent); +#if FLOW_CONTROL _flowControlMutex.lock(); _totalBytesSent += bytesSent; _flowControlMutex.unlock(); +#endif // readjust limit if(bytesToSend==maxBytesToSend) { @@ -886,7 +1143,11 @@ namespace pvAccess { if(_delay>0) epicsThreadSleep(_delay); if(unlikely(_sendQueue.empty())) { // if (hasMonitors || sendBuffer.position() > CAConstants.CA_MESSAGE_HEADER_SIZE) +#if FLOW_CONTROL if(((int)_sendBuffer->getPosition())>CA_MESSAGE_HEADER_SIZE) +#else + if(((int)_sendBuffer->getPosition())>0) +#endif _flushRequested = true; else _sendQueueEvent.wait(); @@ -928,7 +1189,6 @@ namespace pvAccess { flush(true); else endMessage(false);// automatic end (to set payload) - } catch(std::exception &e) { //LOG(logLevelError, "%s", e.what()); _sendBuffer->setPosition(_lastMessageStartPosition); diff --git a/pvAccessApp/remote/blockingUDP.h b/pvAccessApp/remote/blockingUDP.h index 9dcb8d2..c2d2f6b 100644 --- a/pvAccessApp/remote/blockingUDP.h +++ b/pvAccessApp/remote/blockingUDP.h @@ -119,6 +119,8 @@ namespace epics { virtual void enqueueSendRequest(TransportSender::shared_pointer const & sender); + virtual void flushSendQueue(); + void start(); virtual void close(); @@ -131,6 +133,18 @@ namespace epics { _receiveBuffer->align(alignment); } + virtual bool directSerialize(epics::pvData::ByteBuffer */*existingBuffer*/, const char* /*toSerialize*/, + std::size_t /*elementCount*/, std::size_t /*elementSize*/) + { + return false; + } + + virtual bool directDeserialize(epics::pvData::ByteBuffer */*existingBuffer*/, char* /*deserializeTo*/, + std::size_t /*elementCount*/, std::size_t /*elementSize*/) + { + return false; + } + virtual void startMessage(epics::pvData::int8 command, std::size_t ensureCapacity); virtual void endMessage(); diff --git a/pvAccessApp/remote/blockingUDPTransport.cpp b/pvAccessApp/remote/blockingUDPTransport.cpp index d84397a..da634ec 100644 --- a/pvAccessApp/remote/blockingUDPTransport.cpp +++ b/pvAccessApp/remote/blockingUDPTransport.cpp @@ -145,6 +145,12 @@ inline int sendto(int s, const char *buf, size_t len, int flags, const struct so } } + + void BlockingUDPTransport::flushSendQueue() + { + // noop (note different sent addresses are possible) + } + void BlockingUDPTransport::startMessage(int8 command, size_t /*ensureCapacity*/) { _lastMessageStartPosition = _sendBuffer->getPosition(); _sendBuffer->putByte(CA_MAGIC); diff --git a/pvAccessApp/remote/remote.h b/pvAccessApp/remote/remote.h index acef8d2..aac0956 100644 --- a/pvAccessApp/remote/remote.h +++ b/pvAccessApp/remote/remote.h @@ -241,6 +241,11 @@ namespace epics { */ virtual void enqueueSendRequest(TransportSender::shared_pointer const & sender) = 0; + /** + * Flush send queue (sent messages). + */ + virtual void flushSendQueue() = 0; + /** * Notify transport that it is has been verified. */ diff --git a/pvAccessApp/remote/simpleChannelSearchManagerImpl.h b/pvAccessApp/remote/simpleChannelSearchManagerImpl.h index 9cac07c..c793f28 100644 --- a/pvAccessApp/remote/simpleChannelSearchManagerImpl.h +++ b/pvAccessApp/remote/simpleChannelSearchManagerImpl.h @@ -32,6 +32,11 @@ public: // no cache field->serialize(buffer, this); } + virtual bool directSerialize(epics::pvData::ByteBuffer */*existingBuffer*/, const char* /*toSerialize*/, + std::size_t /*elementCount*/, std::size_t /*elementSize*/) + { + return false; + } }; diff --git a/pvAccessApp/remoteClient/clientContextImpl.cpp b/pvAccessApp/remoteClient/clientContextImpl.cpp index 63d1bd6..e298922 100644 --- a/pvAccessApp/remoteClient/clientContextImpl.cpp +++ b/pvAccessApp/remoteClient/clientContextImpl.cpp @@ -3105,6 +3105,27 @@ namespace epics { // NOTE it's up to internal code to respond w/ error to requester and return 0 in case of errors } + virtual void configure(epics::pvData::PVStructure::shared_pointer configuration) + { + std::tr1::shared_ptr context = m_context.lock(); + if (context.get()) + context->configure(configuration); + } + + virtual void flush() + { + std::tr1::shared_ptr context = m_context.lock(); + if (context.get()) + context->flush(); + } + + virtual void poll() + { + std::tr1::shared_ptr context = m_context.lock(); + if (context.get()) + context->poll(); + } + ~ChannelProviderImpl() {}; private: @@ -4506,6 +4527,25 @@ TODO } } + virtual void configure(epics::pvData::PVStructure::shared_pointer /*configuration*/) + { + // TODO + } + + virtual void flush() + { + // TODO not OK, since new object is created by toArray() call + std::auto_ptr transports = m_transportRegistry->toArray(); + TransportRegistry::transportVector_t::const_iterator iter = transports->begin(); + while (iter != transports->end()) + (*iter)->flushSendQueue(); + } + + virtual void poll() + { + // TODO + } + /** * Get channel search manager. * @return channel search manager. diff --git a/pvAccessApp/remoteClient/clientContextImpl.h b/pvAccessApp/remoteClient/clientContextImpl.h index 71bc010..d8fad15 100644 --- a/pvAccessApp/remoteClient/clientContextImpl.h +++ b/pvAccessApp/remoteClient/clientContextImpl.h @@ -108,6 +108,9 @@ namespace epics { virtual std::tr1::shared_ptr getBeaconHandler(osiSockAddr* responseFrom) = 0; + virtual void configure(epics::pvData::PVStructure::shared_pointer configuration) = 0; + virtual void flush() = 0; + virtual void poll() = 0; }; extern ClientContextImpl::shared_pointer createClientContextImpl(); diff --git a/pvAccessApp/rpcService/rpcServer.cpp b/pvAccessApp/rpcService/rpcServer.cpp index d71f6ca..b509f21 100644 --- a/pvAccessApp/rpcService/rpcServer.cpp +++ b/pvAccessApp/rpcService/rpcServer.cpp @@ -380,10 +380,10 @@ public: ChannelRequester::shared_pointer const & /*channelRequester*/, short /*priority*/, epics::pvData::String const & /*address*/) - { + { // this will never get called by the pvAccess server throw std::runtime_error("not supported"); - } + } void registerService(String const & serviceName, RPCService::shared_pointer const & service) {