diff --git a/pvAccessApp/Makefile b/pvAccessApp/Makefile index b093ac9..be47a78 100644 --- a/pvAccessApp/Makefile +++ b/pvAccessApp/Makefile @@ -38,8 +38,10 @@ LIBSRCS += CreateRequestFactory.cpp SRC_DIRS += $(PVACCESS)/remote INC += remote.h INC += blockingUDP.h +INC += blockingTCP.h LIBSRCS += blockingUDPTransport.cpp LIBSRCS += blockingUDPConnector.cpp +LIBSRCS += blockingTCPTransport.cpp LIBRARY = pvAccess diff --git a/pvAccessApp/ca/caConstants.h b/pvAccessApp/ca/caConstants.h index 7956067..adf89d5 100644 --- a/pvAccessApp/ca/caConstants.h +++ b/pvAccessApp/ca/caConstants.h @@ -8,6 +8,10 @@ #ifndef CONSTANTS_H_ #define CONSTANTS_H_ +#include + +using namespace epics::pvData; + namespace epics { namespace pvAccess { diff --git a/pvAccessApp/remote/blockingTCP.h b/pvAccessApp/remote/blockingTCP.h new file mode 100644 index 0000000..c588330 --- /dev/null +++ b/pvAccessApp/remote/blockingTCP.h @@ -0,0 +1,269 @@ +/* + * blockingTCP.h + * + * Created on: Dec 29, 2010 + * Author: Miha Vitorovic + */ + +#ifndef BLOCKINGTCP_H_ +#define BLOCKINGTCP_H_ + +/* pvAccess */ +#include "caConstants.h" +#include "remote.h" + +/* pvData */ +#include +#include +#include + +/* EPICSv3 */ +#include +#include + +using namespace epics::pvData; + +namespace epics { + namespace pvAccess { + + enum ReceiveStage { + READ_FROM_SOCKET, PROCESS_HEADER, PROCESS_PAYLOAD, NONE + }; + + class BlockingTCPTransport : public Transport, + public TransportSendControl { + public: + BlockingTCPTransport(SOCKET channel, + ResponseHandler* responseHandler, int receiveBufferSize, + short priority); + + bool isClosed() const { + return _closed; + } + + void setRemoteMinorRevision(int minorRevision) { + _remoteTransportRevision = minorRevision; + } + + void setRemoteTransportReceiveBufferSize( + int remoteTransportReceiveBufferSize) { + _remoteTransportReceiveBufferSize + = remoteTransportReceiveBufferSize; + } + + void setRemoteTransportSocketReceiveBufferSize( + int socketReceiveBufferSize) { + _remoteTransportSocketReceiveBufferSize + = socketReceiveBufferSize; + } + + virtual const String getType() const { + return String("TCP"); + } + + virtual void aliveNotification() { + // noop + } + + virtual void changedTransport() { + // noop + } + + virtual const osiSockAddr* getRemoteAddress() const { + return _socketAddress; + } + + virtual int16 getPriority() const { + return _priority; + } + + virtual int getReceiveBufferSize() const { + return _socketBuffer->getSize(); + } + + virtual int getSocketReceiveBufferSize() const; + + virtual bool isVerified() const { + Lock lock(_verifiedMutex); + return _verified; + } + + virtual void verified() { + Lock lock(_verifiedMutex); + _verified = true; + } + + virtual void setRecipient(const osiSockAddr* sendTo) { + // noop + } + + /** + * @param[in] timeout Timeout in seconds + */ + bool waitUntilVerified(double timeout); + + virtual void flush(bool lastMessageCompleted); + virtual void startMessage(int8 command, int ensureCapacity); + virtual void endMessage(); + + virtual void flushSerializeBuffer() { + flush(false); + } + + virtual void ensureBuffer(int size); + + virtual void ensureData(int size); + + protected: + /** + * Connection status + */ + bool volatile _closed; + + /** + * Corresponding channel. + */ + SOCKET _channel; + + /** + * Cached socket address. + */ + osiSockAddr* _socketAddress; + + /** + * Send buffer. + */ + ByteBuffer* _sendBuffer; + + /** + * Remote side transport revision (minor). + */ + int8 _remoteTransportRevision; + + /** + * Remote side transport receive buffer size. + */ + int _remoteTransportReceiveBufferSize; + + /** + * Remote side transport socket receive buffer size. + */ + int _remoteTransportSocketReceiveBufferSize; + + /** + * Priority. + * NOTE: Priority cannot just be changed, since it is registered in transport registry with given priority. + */ + short _priority; + // TODO to be implemeneted + + /** + * CAS response handler. + */ + ResponseHandler* _responseHandler; + + /** + * Read sync. object monitor. + */ + //Object _readMonitor = new Object(); + + /** + * Total bytes received. + */ + int64 volatile _totalBytesReceived; + + /** + * Total bytes sent. + */ + int64 volatile _totalBytesSent; + + /** + * Marker to send. + */ + int volatile _markerToSend; + + bool _verified; + + int64 volatile _remoteBufferFreeSpace; + + void processReadCached(bool nestedCall, ReceiveStage inStage, + int requiredBytes, bool addToBuffer); + + private: + /** + * Default marker period. + */ + static const int MARKER_PERIOD = 1024; + + static const int MAX_ENSURE_DATA_BUFFER_SIZE = 1024; + + /** + * Send buffer size. + */ + int _maxPayloadSize; + + /** + * Send buffer size. + */ + int _socketSendBufferSize; + + /** + * Marker "period" in bytes (every X bytes marker should be set). + */ + int64 _markerPeriodBytes; + + /** + * Next planned marker position. + */ + int64 _nextMarkerPosition; + + /** + * Send pending flag. + */ + bool _sendPending; + + /** + * Last message start position. + */ + int _lastMessageStartPosition; + + ByteBuffer* _socketBuffer; + + int _startPosition; + + Mutex* _mutex; + Mutex* _sendQueueMutex; + Mutex* _verifiedMutex; + Mutex* _monitorMutex; + + ReceiveStage _stage; + + int8 _lastSegmentedMessageType; + int8 _lastSegmentedMessageCommand; + + int _storedPayloadSize; + int _storedPosition; + int _storedLimit; + + short _magicAndVersion; + int8 _packetType; + int8 _command; + int _payloadSize; + + bool _flushRequested; + + /** + * Internal method that clears and releases buffer. + * sendLock and sendBufferLock must be hold while calling this method. + */ + void clearAndReleaseBuffer(); + + void endMessage(bool hasMoreSegments); + + bool flush(); + }; + + } +} + +#endif /* BLOCKINGTCP_H_ */ diff --git a/pvAccessApp/remote/blockingTCPTransport.cpp b/pvAccessApp/remote/blockingTCPTransport.cpp new file mode 100644 index 0000000..f2ad49a --- /dev/null +++ b/pvAccessApp/remote/blockingTCPTransport.cpp @@ -0,0 +1,518 @@ +/* + * blockingTCPTransport.cpp + * + * Created on: Dec 29, 2010 + * Author: Miha Vitorovic + */ + +#include "blockingTCP.h" +#include "inetAddressUtil.h" + +/* pvData */ +#include +#include +#include + +/* EPICSv3 */ +#include +#include +#include +#include + +/* standard */ +#include +#include +#include +#include + +using namespace epics::pvData; + +using std::max; +using std::min; +using std::ostringstream; + +namespace epics { + namespace pvAccess { + + BlockingTCPTransport::BlockingTCPTransport(SOCKET channel, + ResponseHandler* responseHandler, int receiveBufferSize, + short priority) : + _closed(false), _channel(channel), _remoteTransportRevision(0), + _remoteTransportReceiveBufferSize(MAX_TCP_RECV), + _remoteTransportSocketReceiveBufferSize(MAX_TCP_RECV), + _priority(priority), _responseHandler(responseHandler), + _totalBytesReceived(0), _totalBytesSent(0), + _markerToSend(0), _verified(false), _remoteBufferFreeSpace( + LONG_LONG_MAX), _markerPeriodBytes(MARKER_PERIOD), + _nextMarkerPosition(_markerPeriodBytes), + _sendPending(false), _lastMessageStartPosition(0), _mutex( + new Mutex()), _sendQueueMutex(new Mutex()), + _verifiedMutex(new Mutex()), _monitorMutex(new Mutex()), + _stage(READ_FROM_SOCKET), _lastSegmentedMessageType(0), + _lastSegmentedMessageCommand(0), _storedPayloadSize(0), + _storedPosition(0), _storedLimit(0), _magicAndVersion(0), + _packetType(0), _command(0), _payloadSize(0), + _flushRequested(false) { + + _socketBuffer = new ByteBuffer(max(MAX_TCP_RECV + +MAX_ENSURE_DATA_BUFFER_SIZE, receiveBufferSize)); + _socketBuffer->setPosition(_socketBuffer->getLimit()); + _startPosition = _socketBuffer->getPosition(); + + // allocate buffer + _sendBuffer = new ByteBuffer(_socketBuffer->getSize()); + _maxPayloadSize = _sendBuffer->getSize()-2*CA_MESSAGE_HEADER_SIZE; // one for header, one for flow control + + // get send buffer size + + + socklen_t intLen = sizeof(int); + + int retval = getsockopt(_channel, SOL_SOCKET, SO_SNDBUF, + &_socketSendBufferSize, &intLen); + if(retval<0) { + _socketSendBufferSize = MAX_TCP_RECV; + errlogSevPrintf(errlogMinor, + "Unable to retrieve socket send buffer size: %s", + strerror(errno)); + } + + socklen_t saSize = sizeof(sockaddr); + retval = getpeername(_channel, &(_socketAddress->sa), &saSize); + if(retval<0) { + errlogSevPrintf(errlogMajor, + "Error fetching socket remote address: %s", strerror( + errno)); + } + + // prepare buffer + clearAndReleaseBuffer(); + + // TODO: add to registry + //context.getTransportRegistry().put(this); + + } + + void BlockingTCPTransport::clearAndReleaseBuffer() { + // NOTE: take care that nextMarkerPosition is set right + // fix position to be correct when buffer is cleared + // do not include pre-buffered flow control message; not 100% correct, but OK + _nextMarkerPosition -= _sendBuffer->getPosition() + -CA_MESSAGE_HEADER_SIZE; + + _sendQueueMutex->lock(); + _flushRequested = false; + _sendQueueMutex->unlock(); + + _sendBuffer->clear(); + + _sendPending = false; + + // prepare ACK marker + _sendBuffer->putShort(CA_MAGIC_AND_VERSION); + _sendBuffer->putByte(1); // control data + _sendBuffer->putByte(1); // marker ACK + _sendBuffer->putInt(0); + } + + int BlockingTCPTransport::getSocketReceiveBufferSize() const { + // Get value of the SO_RCVBUF option for this DatagramSocket, + // that is the buffer size used by the platform for input on + // this DatagramSocket. + + int sockBufSize; + socklen_t intLen = sizeof(int); + + int retval = getsockopt(_channel, SOL_SOCKET, SO_RCVBUF, + &sockBufSize, &intLen); + if(retval<0) errlogSevPrintf(errlogMajor, + "Socket getsockopt SO_RCVBUF error: %s", strerror(errno)); + + return sockBufSize; + } + + bool BlockingTCPTransport::waitUntilVerified(double timeout) { + double internalTimeout = timeout; + bool internalVerified = false; + + _verifiedMutex->lock(); + internalVerified = _verified; + _verifiedMutex->unlock(); + + while(!internalVerified&&internalTimeout<=0) { + epicsThreadSleep(min(0.1, internalTimeout)); + internalTimeout -= 0.1; + + _verifiedMutex->lock(); + internalVerified = _verified; + _verifiedMutex->unlock(); + } + return internalVerified; + } + + void BlockingTCPTransport::flush(bool lastMessageCompleted) { + + // automatic end + endMessage(!lastMessageCompleted); + + bool moreToSend = true; + // TODO closed check !!! + while(moreToSend) { + moreToSend = !flush(); + + // all sent, exit + if(!moreToSend) break; + + // TODO solve this sleep in a better way + epicsThreadSleep(0.01); + } + + _lastMessageStartPosition = _sendBuffer->getPosition(); + // start with last header + if(!lastMessageCompleted&&_lastSegmentedMessageType!=0) startMessage( + _lastSegmentedMessageCommand, 0); + } + + void BlockingTCPTransport::startMessage(int8 command, + int ensureCapacity) { + _lastMessageStartPosition = -1; + ensureBuffer(CA_MESSAGE_HEADER_SIZE+ensureCapacity); + _lastMessageStartPosition = _sendBuffer->getPosition(); + _sendBuffer->putShort(CA_MAGIC_AND_VERSION); + _sendBuffer->putByte(_lastSegmentedMessageType); // data + _sendBuffer->putByte(command); // command + _sendBuffer->putInt(0); // temporary zero payload + + } + + void BlockingTCPTransport::endMessage() { + endMessage(false); + } + + void BlockingTCPTransport::ensureBuffer(int size) { + if(_sendBuffer->getRemaining()>=size) return; + + // too large for buffer... + if(_maxPayloadSizegetRemaining()=0) { + + // set message size + _sendBuffer->putInt(_lastMessageStartPosition+sizeof(int16)+2, + _sendBuffer->getPosition()-_lastMessageStartPosition + -CA_MESSAGE_HEADER_SIZE); + + int flagsPosition = _lastMessageStartPosition+sizeof(int16); + // set segmented bit + if(hasMoreSegments) { + // first segment + if(_lastSegmentedMessageType==0) { + int8 type = _sendBuffer->getByte(flagsPosition); + + // set first segment bit + _sendBuffer->putByte(flagsPosition, (int8)(type|0x10)); + + // first + last segment bit == in-between segment + _lastSegmentedMessageType = (int8)(type|0x30); + _lastSegmentedMessageCommand = _sendBuffer->getByte( + flagsPosition+1); + } + } + else { + // last segment + if(_lastSegmentedMessageType!=0) { + // set last segment bit (by clearing first segment bit) + _sendBuffer->putByte(flagsPosition, + (int8)(_lastSegmentedMessageType&0xEF)); + _lastSegmentedMessageType = 0; + } + } + + // manage markers + int position = _sendBuffer->getPosition(); + int bytesLeft = _sendBuffer->getRemaining(); + + if(position>=_nextMarkerPosition&&bytesLeft + >=CA_MESSAGE_HEADER_SIZE) { + _sendBuffer->putShort(CA_MAGIC_AND_VERSION); + _sendBuffer->putByte(1); // control data + _sendBuffer->putByte(0); // marker + _sendBuffer->putInt((int)(_totalBytesSent+position + +CA_MESSAGE_HEADER_SIZE)); + _nextMarkerPosition = position+_markerPeriodBytes; + } + } + } + + void BlockingTCPTransport::ensureData(int size) { + // enough of data? + if(_socketBuffer->getRemaining()>=size) return; + + // too large for buffer... + if(_maxPayloadSizegetPosition()-_storedPosition; + + // no more data and we have some payload left => read buffer + if(_storedPayloadSize>=size) { + //System.out.println("storedPayloadSize >= size, remaining:" + socketBuffer.remaining()); + + // just read up remaining payload + // since there is no data on the buffer, read to the beginning of it, at least size bytes + processReadCached(true, PROCESS_PAYLOAD, size, false); + _storedPosition = _socketBuffer->getPosition(); + _storedLimit = _socketBuffer->getLimit(); + _socketBuffer->setLimit(min(_storedPosition+_storedPayloadSize, + _storedLimit)); + } + else { + // copy remaining bytes, if any + int remainingBytes = _socketBuffer->getRemaining(); + for(int i = 0; iputByte(i, _socketBuffer->getByte()); + + // read what is left + _socketBuffer->setLimit(_storedLimit); + + _stage = PROCESS_HEADER; + processReadCached(true, NONE, size, false); + + // copy before position + for(int i = remainingBytes-1, j = _socketBuffer->getPosition() + -1; i>=0; i--, j--) + _socketBuffer->putByte(j, _socketBuffer->getByte(i)); + _startPosition = _socketBuffer->getPosition()-remainingBytes; + _socketBuffer->setPosition(_startPosition); + + _storedPosition = _startPosition; //socketBuffer.position(); + _storedLimit = _socketBuffer->getLimit(); + _socketBuffer->setLimit(min(_storedPosition+_storedPayloadSize, + _storedLimit)); + + // add if missing... + if(!_closed&&_socketBuffer->getRemaining()getPosition(); + _socketBuffer->setPosition( + _socketBuffer->getLimit()); + _socketBuffer->setLimit(_socketBuffer->getSize()); + } + else { + // add to bytes read + _totalBytesReceived + += (_socketBuffer->getPosition() + -_startPosition); + + // copy remaining bytes, if any + int remainingBytes = _socketBuffer->getRemaining(); + int endPosition = MAX_ENSURE_DATA_BUFFER_SIZE + +remainingBytes; + for(int i = MAX_ENSURE_DATA_BUFFER_SIZE; i + putByte(i, + _socketBuffer->getByte()); + + currentStartPosition = _startPosition + = MAX_ENSURE_DATA_BUFFER_SIZE; + _socketBuffer->setPosition( + MAX_ENSURE_DATA_BUFFER_SIZE+remainingBytes); + _socketBuffer->setLimit(_socketBuffer->getSize()); + } + + // read at least requiredBytes bytes + + int requiredPosition = (currentStartPosition + +requiredBytes); + while(_socketBuffer->getPosition()getRemaining()); + ssize_t bytesRead = recv(_channel, readBuffer, + maxToRead, 0); + _socketBuffer->put(readBuffer,0,maxToRead); + + if(bytesRead<0) { + // error (disconnect, end-of-stream) detected + close(true); + + if(nestedCall) THROW_BASE_EXCEPTION( + "bytesRead < 0"); + + return; + } + } + _socketBuffer->setLimit(_socketBuffer->getPosition()); + _socketBuffer->setPosition(currentStartPosition); + + // notify liveness + aliveNotification(); + + // exit + if(inStage!=NONE) return; + + _stage = PROCESS_HEADER; + } + + if(_stage==PROCESS_HEADER) { + // ensure CAConstants.CA_MESSAGE_HEADER_SIZE bytes of data + if(_socketBuffer->getRemaining()getShort(); + if((short)(_magicAndVersion&0xFFF0) + !=CA_MAGIC_AND_MAJOR_VERSION) { + // error... disconnect + errlogSevPrintf( + errlogMinor, + "Invalid header received from client %s, disconnecting...", + inetAddressToString(_socketAddress).c_str()); + close(true); + return; + } + + // data vs. control packet + _packetType = _socketBuffer->getByte(); + + // command + _command = _socketBuffer->getByte(); + + // read payload size + _payloadSize = _socketBuffer->getInt(); + + // data + int8 type = (int8)(_packetType&0x0F); + if(type==0) { + _stage = PROCESS_PAYLOAD; + } + else if(type==1) { + if(_command==0) { + if(_markerToSend==0) _markerToSend + = _payloadSize; // TODO send back response + } + else //if (command == 1) + { + int difference = (int)_totalBytesSent + -_payloadSize+CA_MESSAGE_HEADER_SIZE; + // overrun check + if(difference<0) difference += INT_MAX; + _remoteBufferFreeSpace + = _remoteTransportReceiveBufferSize + +_remoteTransportSocketReceiveBufferSize + -difference; + // TODO if this is calculated wrong, this can be critical !!! + } + + // no payload + //stage = ReceiveStage.PROCESS_HEADER; + continue; + } + else { + errlogSevPrintf( + errlogMajor, + "Unknown packet type %d, received from client %s, disconnecting...", + type, + inetAddressToString(_socketAddress).c_str()); + close(true); + return; + } + } + + if(_stage==PROCESS_PAYLOAD) { + // read header + int8 version = (int8)(_magicAndVersion&0xFF); + // last segment bit set (means in-between segment or last segment) + bool notFirstSegment = (_packetType&0x20)!=0; + + _storedPayloadSize = _payloadSize; + + // if segmented, exit reading code + if(nestedCall&¬FirstSegment) return; + + // NOTE: nested data (w/ payload) messages between segmented messages are not supported + _storedPosition = _socketBuffer->getPosition(); + _storedLimit = _socketBuffer->getLimit(); + _socketBuffer->setLimit(min(_storedPosition + +_storedPayloadSize, _storedLimit)); + try { + // handle response + _responseHandler->handleResponse(_socketAddress, + this, version, _command, _payloadSize, + _socketBuffer); + } catch(...) { + //noop + } + + /* + * Java finally start + */ + _socketBuffer->setLimit(_storedLimit); + int newPosition = _storedPosition+_storedPayloadSize; + if(newPosition>_storedLimit) { + newPosition -= _storedLimit; + _socketBuffer->setPosition(_storedLimit); + processReadCached(true, PROCESS_PAYLOAD, + newPosition, false); + newPosition += _startPosition; + } + _socketBuffer->setPosition(newPosition); + // TODO discard all possible segments?!!! + /* + * Java finally end + */ + + _stage = PROCESS_HEADER; + + continue; + } + + } + } catch(...) { + // close connection + close(true); + + if(nestedCall) throw; + } + } + + bool BlockingTCPTransport::flush() { + // TODO implement! + return true; + } + + } +} diff --git a/pvAccessApp/remote/blockingUDP.h b/pvAccessApp/remote/blockingUDP.h index 55dce30..3da9315 100644 --- a/pvAccessApp/remote/blockingUDP.h +++ b/pvAccessApp/remote/blockingUDP.h @@ -49,14 +49,6 @@ namespace epics { return String("UDP"); } - virtual int8 getMajorRevision() const { - return CA_MAJOR_PROTOCOL_REVISION; - } - - virtual int8 getMinorRevision() const { - return CA_MINOR_PROTOCOL_REVISION; - } - virtual int getReceiveBufferSize() const { return _receiveBuffer->getSize(); } @@ -169,7 +161,7 @@ namespace epics { } protected: - bool _closed; + bool volatile _closed; /** * Response handler. diff --git a/pvAccessApp/remote/blockingUDPTransport.cpp b/pvAccessApp/remote/blockingUDPTransport.cpp index b8131b4..fdb7cc0 100644 --- a/pvAccessApp/remote/blockingUDPTransport.cpp +++ b/pvAccessApp/remote/blockingUDPTransport.cpp @@ -108,12 +108,9 @@ namespace epics { } void BlockingUDPTransport::endMessage() { - int oldPosition = _sendBuffer->getPosition(); - _sendBuffer->setPosition(_lastMessageStartPosition - +(sizeof(int16)+2)); - _sendBuffer->putInt(oldPosition-_lastMessageStartPosition - -CA_MESSAGE_HEADER_SIZE); - _sendBuffer->setPosition(oldPosition); + _sendBuffer->putInt(_lastMessageStartPosition+(sizeof(int16)+2), + _sendBuffer->getPosition()-_lastMessageStartPosition + -CA_MESSAGE_HEADER_SIZE); } @@ -295,9 +292,7 @@ namespace epics { // this DatagramSocket. int sockBufSize; - socklen_t intLen; - - intLen = sizeof(int); + socklen_t intLen = sizeof(int); int retval = getsockopt(_channel, SOL_SOCKET, SO_RCVBUF, &sockBufSize, &intLen); diff --git a/pvAccessApp/remote/remote.h b/pvAccessApp/remote/remote.h index 5eb5813..d99ec9e 100644 --- a/pvAccessApp/remote/remote.h +++ b/pvAccessApp/remote/remote.h @@ -8,6 +8,8 @@ #ifndef REMOTE_H_ #define REMOTE_H_ +#include "caConstants.h" + #include #include #include @@ -92,13 +94,17 @@ namespace epics { * Transport protocol major revision. * @return protocol major revision. */ - virtual int8 getMajorRevision() const =0; + virtual int8 getMajorRevision() const { + return CA_MAJOR_PROTOCOL_REVISION; + } /** * Transport protocol minor revision. * @return protocol minor revision. */ - virtual int8 getMinorRevision() const =0; + virtual int8 getMinorRevision() const { + return CA_MINOR_PROTOCOL_REVISION; + } /** * Get receive buffer size. @@ -238,7 +244,6 @@ namespace epics { }; - /** * Interface defining socket connector (Connector-Transport pattern). * @author Matej Sekoranja @@ -256,8 +261,9 @@ namespace epics { * @return transport instance. * @throws ConnectionException */ - virtual Transport* connect(TransportClient* client, ResponseHandler* responseHandler, - osiSockAddr* address, short transportRevision, short priority) =0; + virtual Transport* connect(TransportClient* client, + ResponseHandler* responseHandler, osiSockAddr* address, + short transportRevision, short priority) =0; };