/** * Copyright - See the COPYRIGHT that is included with this distribution. * pvAccessCPP is distributed subject to a Software License Agreement found * in file LICENSE that is included with this distribution. */ #ifdef _WIN32 #define NOMINMAX #endif #include #include #include #include #include #include #include #include #include #include #include #include using namespace epics::pvData; using namespace epics::pvAccess; namespace epics { namespace pvAccess { const std::size_t AbstractCodec::MAX_MESSAGE_PROCESS = 100; const std::size_t AbstractCodec::MAX_MESSAGE_SEND = 100; const std::size_t AbstractCodec::MAX_ENSURE_SIZE = 1024; const std::size_t AbstractCodec::MAX_ENSURE_DATA_SIZE = MAX_ENSURE_SIZE/2; const std::size_t AbstractCodec::MAX_ENSURE_BUFFER_SIZE = MAX_ENSURE_SIZE; const std::size_t AbstractCodec::MAX_ENSURE_DATA_BUFFER_SIZE = 1024; AbstractCodec::AbstractCodec( std::tr1::shared_ptr const & receiveBuffer, std::tr1::shared_ptr const & sendBuffer, int32_t socketSendBufferSize, bool blockingProcessQueue): //PROTECTED _readMode(NORMAL), _version(0), _flags(0), _command(0), _payloadSize(0), _remoteTransportSocketReceiveBufferSize(MAX_TCP_RECV), _totalBytesSent(0), _blockingProcessQueue(false), _senderThread(0), _writeMode(PROCESS_SEND_QUEUE), _writeOpReady(false),_lowLatency(false), _socketBuffer(receiveBuffer), _sendBuffer(sendBuffer), //PRIVATE _storedPayloadSize(0), _storedPosition(0), _startPosition(0), _maxSendPayloadSize(0), _lastMessageStartPosition(0),_lastSegmentedMessageType(0), _lastSegmentedMessageCommand(0), _nextMessagePayloadOffset(0), _byteOrderFlag(EPICS_BYTE_ORDER == EPICS_ENDIAN_BIG ? 0x80 : 0x00), _socketSendBufferSize(0) { if (receiveBuffer->getSize() < 2*MAX_ENSURE_SIZE) throw std::invalid_argument( "receiveBuffer.capacity() < 2*MAX_ENSURE_SIZE"); // require aligned buffer size //(not condition, but simplifies alignment code) if (receiveBuffer->getSize() % PVA_ALIGNMENT != 0) throw std::invalid_argument( "receiveBuffer.capacity() % PVAConstants.PVA_ALIGNMENT != 0"); if (sendBuffer->getSize() < 2*MAX_ENSURE_SIZE) throw std::invalid_argument("sendBuffer() < 2*MAX_ENSURE_SIZE"); // require aligned buffer size //(not condition, but simplifies alignment code) if (sendBuffer->getSize() % PVA_ALIGNMENT != 0) throw std::invalid_argument( "sendBuffer() % PVAConstants.PVA_ALIGNMENT != 0"); // initialize to be empty _socketBuffer->setPosition(_socketBuffer->getLimit()); _startPosition = _socketBuffer->getPosition(); // clear send _sendBuffer->clear(); // start msg + control _maxSendPayloadSize = _sendBuffer->getSize() - 2*PVA_MESSAGE_HEADER_SIZE; _socketSendBufferSize = socketSendBufferSize; _blockingProcessQueue = blockingProcessQueue; LOG(logLevelTrace, "AbstractCodec constructed (threadId: %u)", epicsThreadGetIdSelf()); } void AbstractCodec::processRead() { LOG(logLevelTrace, "AbstractCodec::processRead: enter (threadId: %u)", epicsThreadGetIdSelf()); switch (_readMode) { case NORMAL: processReadNormal(); break; case SEGMENTED: processReadSegmented(); break; case SPLIT: throw std::logic_error("ReadMode == SPLIT not supported"); } } void AbstractCodec::processHeader() { LOG(logLevelTrace, "AbstractCodec::processHeader enter (threadId: %u)", epicsThreadGetIdSelf()); // magic code int8_t magicCode = _socketBuffer->getByte(); // version _version = _socketBuffer->getByte(); // flags _flags = _socketBuffer->getByte(); // command _command = _socketBuffer->getByte(); // read payload size _payloadSize = _socketBuffer->getInt(); // check magic code if (magicCode != PVA_MAGIC) { LOG(logLevelError, "Invalid header received from the client at %s:%d: %d," " disconnecting...", __FILE__, __LINE__, getLastReadBufferSocketAddress()); invalidDataStreamHandler(); throw invalid_data_stream_exception("invalid header received"); } } void AbstractCodec::processReadNormal() { LOG(logLevelTrace, "AbstractCodec::processReadNormal enter (threadId: %u)", epicsThreadGetIdSelf()); try { std::size_t messageProcessCount = 0; while (messageProcessCount++ < MAX_MESSAGE_PROCESS) { // read as much as available, but at least for a header // readFromSocket checks if reading from socket is really necessary if (!readToBuffer(PVA_MESSAGE_HEADER_SIZE, false)) { return; } // read header fields processHeader(); bool isControl = ((_flags & 0x01) == 0x01); if (isControl) { processControlMessage(); } else { // segmented sanity check bool notFirstSegment = (_flags & 0x20) != 0; if (notFirstSegment) { LOG(logLevelWarn, "Not-a-frst segmented message received in normal mode" " from the client at %s:%d: %d, disconnecting...", __FILE__, __LINE__, getLastReadBufferSocketAddress()); invalidDataStreamHandler(); throw invalid_data_stream_exception( "not-a-first segmented message received in normal mode"); } _storedPayloadSize = _payloadSize; _storedPosition = _socketBuffer->getPosition(); _storedLimit = _socketBuffer->getLimit(); _socketBuffer->setLimit(std::min (_storedPosition + _storedPayloadSize, _storedLimit)); try { // handle response processApplicationMessage(); postProcessApplicationMessage(); } catch(...) //finally { if (!isOpen()) return; postProcessApplicationMessage(); throw; } } } } catch (invalid_data_stream_exception & ) { // noop, should be already handled (and logged) } catch (connection_closed_exception & ) { // noop, should be already handled (and logged) } } void AbstractCodec::postProcessApplicationMessage() { if (!isOpen()) return; // can be closed by now // isOpen() should be efficiently implemented while (true) //while (isOpen()) { // set position as whole message was read //(in case code haven't done so) std::size_t newPosition = alignedValue( _storedPosition + _storedPayloadSize, PVA_ALIGNMENT); // aligned buffer size ensures that there is enough space //in buffer, // however data might not be fully read // discard the rest of the packet if (newPosition > _storedLimit) { // processApplicationMessage() did not read up //quite some buffer // we only handle unused alignment bytes int bytesNotRead = newPosition - _socketBuffer->getPosition(); if (bytesNotRead < PVA_ALIGNMENT) { // make alignment bytes as real payload to enable SPLIT // no end-of-socket or segmented scenario can happen // due to aligned buffer size _storedPayloadSize += bytesNotRead; // reveal currently existing padding _socketBuffer->setLimit(_storedLimit); ensureData(bytesNotRead); _storedPayloadSize -= bytesNotRead; continue; } // TODO we do not handle this for now (maybe never) LOG(logLevelWarn, "unprocessed read buffer from client at %s:%d: %d," " disconnecting...", __FILE__, __LINE__, getLastReadBufferSocketAddress()); invalidDataStreamHandler(); throw invalid_data_stream_exception( "unprocessed read buffer"); } _socketBuffer->setLimit(_storedLimit); _socketBuffer->setPosition(newPosition); break; } } void AbstractCodec::processReadSegmented() { LOG(logLevelTrace, "AbstractCodec::processReadSegmented enter (threadId: %u)", epicsThreadGetIdSelf()); while (true) { // read as much as available, but at least for a header // readFromSocket checks if reading from socket is really necessary readToBuffer(PVA_MESSAGE_HEADER_SIZE, true); // read header fields processHeader(); bool isControl = ((_flags & 0x01) == 0x01); if (isControl) processControlMessage(); else { // last segment bit set (means in-between segment or last segment) // we expect this, no non-control messages between //segmented message are supported // NOTE: for now... it is easy to support non-semgented //messages between segmented messages bool notFirstSegment = (_flags & 0x20) != 0; if (!notFirstSegment) { LOG(logLevelWarn, "Not-a-first segmented message expected from the client at" " %s:%d: %d, disconnecting...", __FILE__, __LINE__, getLastReadBufferSocketAddress()); invalidDataStreamHandler(); throw new invalid_data_stream_exception( "not-a-first segmented message expected"); } _storedPayloadSize = _payloadSize; // return control to caller code return; } } } bool AbstractCodec::readToBuffer( std::size_t requiredBytes, bool persistent) { LOG(logLevelTrace, "AbstractCodec::readToBuffer enter requiredBytes: %u," " persistant: %d (threadId: %u)", requiredBytes, persistent, epicsThreadGetIdSelf()); // do we already have requiredBytes available? std::size_t remainingBytes = _socketBuffer->getRemaining(); if (remainingBytes >= requiredBytes) { return true; } // assumption: remainingBytes < MAX_ENSURE_DATA_BUFFER_SIZE && // requiredBytes < (socketBuffer.capacity() - PVA_ALIGNMENT) // // copy unread part to the beginning of the buffer // to make room for new data (as much as we can read) // NOTE: requiredBytes is expected to be small (order of 10 bytes) // // a new start position, we are careful to preserve alignment _startPosition = MAX_ENSURE_SIZE + _socketBuffer->getPosition() % PVA_ALIGNMENT; std::size_t endPosition = _startPosition + remainingBytes; for (std::size_t i = _startPosition; i < endPosition; i++) _socketBuffer->putByte(i, _socketBuffer->getByte()); // update buffer to the new position _socketBuffer->setLimit(_socketBuffer->getSize()); _socketBuffer->setPosition(endPosition); // read at least requiredBytes bytes std::size_t requiredPosition = _startPosition + requiredBytes; while (_socketBuffer->getPosition() < requiredPosition) { int bytesRead = read(_socketBuffer.get()); LOG(logLevelTrace, "AbstractCodec::readToBuffer READ BYTES: %d (threadId: %u)", bytesRead, epicsThreadGetIdSelf()); if (bytesRead < 0) { LOG(logLevelTrace, "AbstractCodec::before close (threadId: %u)", epicsThreadGetIdSelf()); close(); throw connection_closed_exception("bytesRead < 0"); } // non-blocking IO support else if (bytesRead == 0) { if (persistent) readPollOne(); else { // set pointers (aka flip) _socketBuffer->setLimit(_socketBuffer->getPosition()); _socketBuffer->setPosition(_startPosition); return false; } } } // set pointers (aka flip) _socketBuffer->setLimit(_socketBuffer->getPosition()); _socketBuffer->setPosition(_startPosition); return true; } void AbstractCodec::ensureData(std::size_t size) { LOG(logLevelTrace, "AbstractCodec::ensureData enter: size: %u (threadId: %u)", size, epicsThreadGetIdSelf()); // enough of data? if (_socketBuffer->getRemaining() >= size) return; // to large for buffer... if (size > MAX_ENSURE_DATA_SIZE) {// half for SPLIT, half for SEGMENTED std::ostringstream msg; msg << "requested for buffer size " << size << ", but maximum " << MAX_ENSURE_DATA_SIZE << " is allowed."; LOG(logLevelWarn, "%s at %s:%d,", msg.str().c_str(), __FILE__, __LINE__); std::string s = msg.str(); throw std::invalid_argument(s); } try { // subtract what was already processed std::size_t pos = _socketBuffer->getPosition(); _storedPayloadSize -= pos - _storedPosition; // SPLIT message case // no more data and we have some payload left => read buffer // NOTE: (storedPayloadSize >= size) does not work if size //spans over multiple messages if (_storedPayloadSize >= (_storedLimit-pos)) { // just read up remaining payload // this will move current (getPosition(); _storedLimit = _socketBuffer->getLimit(); _socketBuffer->setLimit( std::min( _storedPosition + _storedPayloadSize, _storedLimit)); // check needed, if not enough data is available or // we run into segmented message ensureData(size); } // SEGMENTED message case else { // TODO check flags //if (flags && SEGMENTED_FLAGS_MASK == 0) // throw IllegalStateException("segmented message expected, //but current message flag does not indicate it"); // copy remaining bytes of payload to safe area //[0 to MAX_ENSURE_DATA_BUFFER_SIZE/2), if any // remaining is relative to payload since buffer is //bounded from outside std::size_t remainingBytes = _socketBuffer->getRemaining(); for (std::size_t i = 0; i < remainingBytes; i++) _socketBuffer->putByte(i, _socketBuffer->getByte()); // restore limit (there might be some data already present //and readToBuffer needs to know real limit) _socketBuffer->setLimit(_storedLimit); // remember alignment offset of end of the message (to be restored) std::size_t storedAlignmentOffset = _socketBuffer->getPosition() % PVA_ALIGNMENT; // skip post-message alignment bytes if (storedAlignmentOffset > 0) { std::size_t toSkip = PVA_ALIGNMENT - storedAlignmentOffset; readToBuffer(toSkip, true); std::size_t currentPos = _socketBuffer->getPosition(); _socketBuffer->setPosition(currentPos + toSkip); } // we expect segmented message, we expect header // that (and maybe some control packets) needs to be "removed" // so that we get combined payload ReadMode storedMode = _readMode; _readMode = SEGMENTED; processRead(); _readMode = storedMode; // make sure we have all the data (maybe we run into SPLIT) readToBuffer(size - remainingBytes + storedAlignmentOffset, true); // skip storedAlignmentOffset bytes (sender should padded start of //segmented message) // SPLIT cannot mess with this, since start of the message, //i.e. current position, is always aligned _socketBuffer->setPosition( _socketBuffer->getPosition() + storedAlignmentOffset); // copy before position (i.e. start of the payload) for (int32_t i = remainingBytes - 1, j = _socketBuffer->getPosition() - 1; i >= 0; i--, j--) _socketBuffer->putByte(j, _socketBuffer->getByte(i)); _startPosition = _socketBuffer->getPosition() - remainingBytes; _socketBuffer->setPosition(_startPosition); _storedPayloadSize += remainingBytes - storedAlignmentOffset; _storedPosition = _startPosition; _storedLimit = _socketBuffer->getLimit(); _socketBuffer->setLimit( std::min( _storedPosition + _storedPayloadSize, _storedLimit)); // sequential small segmented messages in the buffer ensureData(size); } } catch (io_exception &) { try { close(); } catch (io_exception & ) { // noop, best-effort close } throw connection_closed_exception( "Failed to ensure data to read buffer."); } } std::size_t AbstractCodec::alignedValue( std::size_t value, std::size_t alignment) { LOG(logLevelTrace, "AbstractCodec::alignedValue enter: value: %u, alignment:%u" " (threadId: %u)", value, alignment, epicsThreadGetIdSelf()); std::size_t k = (alignment - 1); return (value + k) & (~k); } void AbstractCodec::alignData(std::size_t alignment) { LOG(logLevelTrace, "AbstractCodec::alignData enter: alignment:%u (threadId: %u)", alignment, epicsThreadGetIdSelf()); std::size_t k = (alignment - 1); std::size_t pos = _socketBuffer->getPosition(); std::size_t newpos = (pos + k) & (~k); if (pos == newpos) return; std::size_t diff = _socketBuffer->getLimit() - newpos; if (diff > 0) { _socketBuffer->setPosition(newpos); return; } ensureData(diff); // position has changed, recalculate newpos = (_socketBuffer->getPosition() + k) & (~k); _socketBuffer->setPosition(newpos); } void AbstractCodec::alignBuffer(std::size_t alignment) { LOG(logLevelTrace, "AbstractCodec::alignBuffer enter:" " alignment:%u (threadId: %u)", alignment, epicsThreadGetIdSelf()); std::size_t k = (alignment - 1); std::size_t pos = _sendBuffer->getPosition(); std::size_t newpos = (pos + k) & (~k); if (pos == newpos) return; // there is always enough of space // since sendBuffer capacity % PVA_ALIGNMENT == 0 _sendBuffer->setPosition(newpos); } void AbstractCodec::startMessage( epics::pvData::int8 command, std::size_t ensureCapacity) { LOG(logLevelTrace, "AbstractCodec::startMessage enter: command:%x " " ensureCapacity:%u (threadId: %u)", command, ensureCapacity, epicsThreadGetIdSelf()); _lastMessageStartPosition = std::numeric_limits::max(); // TODO revise this ensureBuffer( PVA_MESSAGE_HEADER_SIZE + ensureCapacity + _nextMessagePayloadOffset); _lastMessageStartPosition = _sendBuffer->getPosition(); _sendBuffer->putByte(PVA_MAGIC); _sendBuffer->putByte(PVA_VERSION); _sendBuffer->putByte( (_lastSegmentedMessageType | _byteOrderFlag)); // data + endian _sendBuffer->putByte(command); // command _sendBuffer->putInt(0); // temporary zero payload // apply offset if (_nextMessagePayloadOffset > 0) _sendBuffer->setPosition( _sendBuffer->getPosition() + _nextMessagePayloadOffset); } void AbstractCodec::putControlMessage( epics::pvData::int8 command, epics::pvData::int32 data) { LOG(logLevelTrace, "AbstractCodec::putControlMessage enter: command:%x " "data:%d (threadId: %u)", command, data, epicsThreadGetIdSelf()); _lastMessageStartPosition = std::numeric_limits::max(); // TODO revise this ensureBuffer(PVA_MESSAGE_HEADER_SIZE); _sendBuffer->putByte(PVA_MAGIC); _sendBuffer->putByte(PVA_VERSION); _sendBuffer->putByte((0x01 | _byteOrderFlag)); // control + endian _sendBuffer->putByte(command); // command _sendBuffer->putInt(data); // data } void AbstractCodec::endMessage() { LOG(logLevelTrace, "AbstractCodec::endMessage enter: (threadId: %u)", epicsThreadGetIdSelf()); endMessage(false); } void AbstractCodec::endMessage(bool hasMoreSegments) { LOG(logLevelTrace, "AbstractCodec::endMessage enter: hasMoreSegments:%d (threadId: %u)", hasMoreSegments, epicsThreadGetIdSelf()); if (_lastMessageStartPosition != std::numeric_limits::max()) { std::size_t lastPayloadBytePosition = _sendBuffer->getPosition(); // align alignBuffer(PVA_ALIGNMENT); // set paylaod size (non-aligned) std::size_t payloadSize = lastPayloadBytePosition - _lastMessageStartPosition - PVA_MESSAGE_HEADER_SIZE; _sendBuffer->putInt(_lastMessageStartPosition + 4, payloadSize); // set segmented bit if (hasMoreSegments) { // first segment if (_lastSegmentedMessageType == 0) { std::size_t flagsPosition = _lastMessageStartPosition + 2; epics::pvData::int8 type = _sendBuffer->getByte(flagsPosition); // set first segment bit _sendBuffer->putByte(flagsPosition, (type | 0x10)); // first + last segment bit == in-between segment _lastSegmentedMessageType = type | 0x30; _lastSegmentedMessageCommand = _sendBuffer->getByte(flagsPosition + 1); } _nextMessagePayloadOffset = lastPayloadBytePosition % PVA_ALIGNMENT; } else { // last segment if (_lastSegmentedMessageType != std::numeric_limits::max()) { std::size_t flagsPosition = _lastMessageStartPosition + 2; // set last segment bit (by clearing first segment bit) _sendBuffer->putByte(flagsPosition, (_lastSegmentedMessageType & 0xEF)); _lastSegmentedMessageType = 0; } _nextMessagePayloadOffset = 0; } // TODO /* // manage markers final int position = sendBuffer.position(); final int bytesLeft = sendBuffer.remaining(); if (position >= nextMarkerPosition && bytesLeft >= PVAConstants.PVA_MESSAGE_HEADER_SIZE) { sendBuffer.put(PVAConstants.PVA_MAGIC); sendBuffer.put(PVAConstants.PVA_VERSION); sendBuffer.put((byte)(0x01 | byteOrderFlag)); // control data sendBuffer.put((byte)0); // marker sendBuffer.putInt((int)(totalBytesSent + position + PVAConstants.PVA_MESSAGE_HEADER_SIZE)); nextMarkerPosition = position + markerPeriodBytes; } */ _lastMessageStartPosition = std::numeric_limits::max(); } } void AbstractCodec::ensureBuffer(std::size_t size) { LOG(logLevelTrace, "AbstractCodec::ensureBuffer enter: size:%u (threadId: %u)", size, epicsThreadGetIdSelf()); if (_sendBuffer->getRemaining() >= size) return; // too large for buffer... if (_maxSendPayloadSize < size) { std::ostringstream msg; msg << "requested for buffer size " << size << ", but only " << _maxSendPayloadSize << " available."; std::string s = msg.str(); LOG(logLevelWarn, "%s at %s:%d,", msg.str().c_str(), __FILE__, __LINE__); throw std::invalid_argument(s); } while (_sendBuffer->getRemaining() < size) flush(false); } void AbstractCodec::flushSerializeBuffer() { LOG(logLevelTrace, "AbstractCodec::flushSerializeBuffer enter: (threadId: %u)", epicsThreadGetIdSelf()); flush(false); } void AbstractCodec::flush(bool lastMessageCompleted) { LOG(logLevelTrace, "AbstractCodec::flush enter: lastMessageCompleted:%d (threadId: %u)", lastMessageCompleted, epicsThreadGetIdSelf()); // automatic end endMessage(!lastMessageCompleted); _sendBuffer->flip(); try { send(_sendBuffer.get()); } catch (io_exception &) { try { if (isOpen()) close(); } catch (io_exception &) { // noop, best-effort close } throw connection_closed_exception("Failed to send buffer."); } _sendBuffer->clear(); _lastMessageStartPosition = std::numeric_limits::max(); // start with last header if (!lastMessageCompleted && _lastSegmentedMessageType != 0) startMessage(_lastSegmentedMessageCommand, 0); } void AbstractCodec::processWrite() { LOG(logLevelTrace, "AbstractCodec::processWrite enter: (threadId: %u)", epicsThreadGetIdSelf()); // TODO catch ConnectionClosedException, InvalidStreamException? switch (_writeMode) { case PROCESS_SEND_QUEUE: processSendQueue(); break; case WAIT_FOR_READY_SIGNAL: _writeOpReady = true; break; } } void AbstractCodec::send(ByteBuffer *buffer) { LOG(logLevelTrace, "AbstractCodec::send enter: (threadId: %u)", epicsThreadGetIdSelf()); // On Windows, limiting the buffer size is important to prevent // poor throughput performances when transferring large amount of // data. See Microsoft KB article KB823764. // We do it also for other systems just to be safe. std::size_t maxBytesToSend = std::min( _socketSendBufferSize, _remoteTransportSocketReceiveBufferSize) / 2; std::size_t limit = buffer->getLimit(); std::size_t bytesToSend = limit - buffer->getPosition(); // limit sending if (bytesToSend > maxBytesToSend) { bytesToSend = maxBytesToSend; buffer->setLimit(buffer->getPosition() + bytesToSend); } int tries = 0; while (buffer->getRemaining() > 0) { //int p = buffer.position(); int bytesSent = write(buffer); if (IS_LOGGABLE(logLevelTrace)) { hexDump(std::string("AbstractCodec::send WRITE"), (const int8 *)buffer->getArray(), buffer->getPosition(), buffer->getRemaining()); } if (bytesSent < 0) { // connection lost close(); throw connection_closed_exception("bytesSent < 0"); } else if (bytesSent == 0) { sendBufferFull(tries++); continue; } _totalBytesSent += bytesSent; // readjust limit if (bytesToSend == maxBytesToSend) { bytesToSend = limit - buffer->getPosition(); if(bytesToSend > maxBytesToSend) bytesToSend = maxBytesToSend; buffer->setLimit(buffer->getPosition() + bytesToSend); } tries = 0; } } void AbstractCodec::processSendQueue() { LOG(logLevelTrace, "AbstractCodec::processSendQueue enter: (threadId: %u)", epicsThreadGetIdSelf()); try { std::size_t senderProcessed = 0; while (senderProcessed++ < MAX_MESSAGE_SEND) { TransportSender::shared_pointer sender = _sendQueue.take(-1); if (sender.get() == 0) { // flush if (_sendBuffer->getPosition() > 0) flush(true); sendCompleted(); // do not schedule sending if (_blockingProcessQueue) { if (terminated()) // termination break; sender = _sendQueue.take(0); // termination (we want to process even if shutdown) if (sender.get() == 0) break; } else return; } processSender(sender); } } //TODO MATEJ CHECK //InterruptedException ie catch (...) { // noop, allowed and expected in blocking } // flush if (_sendBuffer->getPosition() > 0) flush(true); } void AbstractCodec::clearSendQueue() { LOG(logLevelTrace, "AbstractCodec::clearSendQueue enter: (threadId: %u)", epicsThreadGetIdSelf()); _sendQueue.clean(); } void AbstractCodec::enqueueSendRequest( TransportSender::shared_pointer const & sender) { LOG(logLevelTrace, "AbstractCodec::enqueueSendRequest enter: sender is set:%d" " (threadId: %u)", (sender.get() != 0), epicsThreadGetIdSelf()); _sendQueue.put(sender); scheduleSend(); } void AbstractCodec::setSenderThread() { LOG(logLevelTrace, "AbstractCodec::setSenderThread enter: (threadId: %u)", epicsThreadGetIdSelf()); _senderThread = epicsThreadGetIdSelf(); } void AbstractCodec::processSender( TransportSender::shared_pointer const & sender) { LOG(logLevelTrace, "AbstractCodec::processSender enter: sender is set:%d (threadId: %u)", (sender.get() != 0), epicsThreadGetIdSelf); ScopedLock lock(sender); try { _lastMessageStartPosition = _sendBuffer->getPosition(); sender->send(_sendBuffer.get(), this); // automatic end (to set payload size) endMessage(false); } catch (std::exception &e ) { std::ostringstream msg; msg << "an exception caught while processing a send message: " << e.what(); LOG(logLevelWarn, "%s at %s:%d", msg.str().c_str(), __FILE__, __LINE__); try { close(); } catch (io_exception & ) { // noop } throw connection_closed_exception(msg.str()); } } void AbstractCodec::enqueueSendRequest( TransportSender::shared_pointer const & sender, std::size_t requiredBufferSize) { LOG(logLevelTrace, "AbstractCodec::enqueueSendRequest enter: sender is set:%d " "requiredBufferSize:%u (threadId: %u)", (sender.get() != 0), requiredBufferSize, epicsThreadGetIdSelf); if (_senderThread == epicsThreadGetIdSelf() && _sendQueue.empty() && _sendBuffer->getRemaining() >= requiredBufferSize) { processSender(sender); if (_sendBuffer->getPosition() > 0) { if (_lowLatency) flush(true); else scheduleSend(); } } else enqueueSendRequest(sender); } void AbstractCodec::setRecipient(osiSockAddr const & sendTo) { LOG(logLevelTrace, "AbstractCodec::setRecipient enter: (threadId: %u)", epicsThreadGetIdSelf); _sendTo = sendTo; } void AbstractCodec::setByteOrder(int byteOrder) { LOG(logLevelTrace, "AbstractCodec::setByteOrder enter: byteOrder:%x (threadId: %u)", byteOrder, epicsThreadGetIdSelf()); _socketBuffer->setEndianess(byteOrder); // TODO sync _sendBuffer->setEndianess(byteOrder); _byteOrderFlag = EPICS_ENDIAN_BIG == byteOrder ? 0x80 : 0x00; } // // // BlockingAbstractCodec // // // void BlockingAbstractCodec::readPollOne() { LOG(logLevelTrace, "BlockingAbstractCodec::readPollOne enter: (threadId: %u)", epicsThreadGetIdSelf()); throw std::logic_error("should not be called for blocking IO"); } void BlockingAbstractCodec::writePollOne() { LOG(logLevelTrace, "BlockingAbstractCodec::writePollOne enter: (threadId: %u)", epicsThreadGetIdSelf()); throw std::logic_error("should not be called for blocking IO"); } void BlockingAbstractCodec::close() { LOG(logLevelTrace, "BlockingAbstractCodec::close enter: (threadId: %u)", epicsThreadGetIdSelf()); if (_isOpen.getAndSet(false)) { // always close in the same thread, same way, etc. // wakeup processSendQueue LOG(logLevelTrace, "BlockingAbstractCodec::close _sendQueue.waaaaakeup: " " (threadId: %u)", epicsThreadGetIdSelf()); _sendQueue.wakeup(); } else { LOG(logLevelTrace, "BlockingAbstractCodec::close NOT WAKING UP _sendQueue: " " (threadId: %u)", epicsThreadGetIdSelf()); } } bool BlockingAbstractCodec::terminated() { LOG(logLevelTrace, "BlockingAbstractCodec::terminated enter: (threadId: %u)", epicsThreadGetIdSelf()); //TODO OPEN QUESTION TO MATEJ return !isOpen(); } bool BlockingAbstractCodec::isOpen() { LOG(logLevelTrace, "BlockingAbstractCodec::isOpen enter: (threadId: %u)", epicsThreadGetIdSelf()); return _isOpen.get(); } // NOTE: must not be called from constructor (e.g. needs shared_from_this()) void BlockingAbstractCodec::start() { LOG(logLevelTrace, "BlockingAbstractCodec::start enter: (threadId: %u)", epicsThreadGetIdSelf()); _readThread = epicsThreadCreate( "BlockingAbstractCodec-readThread", epicsThreadPriorityMedium, epicsThreadGetStackSize( epicsThreadStackMedium), BlockingAbstractCodec::receiveThread, this); _sendThread = epicsThreadCreate( "BlockingAbstractCodec-_sendThread", epicsThreadPriorityMedium, epicsThreadGetStackSize( epicsThreadStackMedium), BlockingAbstractCodec::sendThread, this); LOG(logLevelTrace, "BlockingAbstractCodec::start exit WITH readThread: %u," " sendThread:%u (threadId: %u)", _readThread, _sendThread, epicsThreadGetIdSelf()); } void BlockingAbstractCodec::receiveThread(void *param) { LOG(logLevelTrace, "BlockingAbstractCodec::receiveThread enter: (threadId: %u)", epicsThreadGetIdSelf()); BlockingAbstractCodec *bac = static_cast(param); Transport::shared_pointer ptr = bac->shared_from_this(); while (bac->isOpen()) { try { bac->processRead(); } catch (io_exception &e) { LOG(logLevelWarn, "an exception caught while in receiveThread at %s:%d: %s", __FILE__, __LINE__, e.what()); } } LOG(logLevelTrace, "BlockingAbstractCodec::receiveThread" " EXIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIT: (threadId: %u)", epicsThreadGetIdSelf()); } void BlockingAbstractCodec::sendThread(void *param) { LOG(logLevelTrace, "BlockingAbstractCodec::sendThread enter: (threadId: %u)", epicsThreadGetIdSelf()); BlockingAbstractCodec *bac = static_cast(param); Transport::shared_pointer ptr = bac->shared_from_this(); bac->setSenderThread(); while (bac->isOpen()) { try { bac->processWrite(); } catch (io_exception &e) { LOG(logLevelWarn, "an exception caught while in sendThread at %s:%d: %s", __FILE__, __LINE__, e.what()); } } LOG(logLevelTrace, "BlockingAbstractCodec::sendThread EXIIIIIIIIIIIIIIT" " while(bac->isOpen): (threadId: %u)", epicsThreadGetIdSelf()); // wait read thread to die //TODO epics join thread //readThread.join(); // TODO timeout //bac->_shutdownEvent.signal(); // call internal destroy LOG(logLevelTrace, "XXXXXXXXXXXXXXXXXXXXXXXXXXXX" "XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX (threadId: %u)", epicsThreadGetIdSelf()); bac->internalDestroy(); LOG(logLevelTrace, "YYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYY" "YYYYYYYYYYYYYYYYYYYYYYYYYYYY (threadId: %u)", epicsThreadGetIdSelf()); LOG(logLevelTrace, "BlockingAbstractCodec::sendThread EXIIIIT (threadId: %u)", epicsThreadGetIdSelf()); } void BlockingAbstractCodec::sendBufferFull(int tries) { LOG(logLevelTrace, "BlockingAbstractCodec::sendBufferFull enter: tries: %d " "(threadId: %u)", tries, epicsThreadGetIdSelf()); // TODO constants epicsThreadSleep(std::max(tries * 0.1, 1)); } // // // BlockingSocketAbstractCodec // // // BlockingSocketAbstractCodec::BlockingSocketAbstractCodec( SOCKET channel, int32_t sendBufferSize, int32_t receiveBufferSize): BlockingAbstractCodec( std::tr1::shared_ptr(new ByteBuffer((std::max((std::size_t)( MAX_TCP_RECV + MAX_ENSURE_DATA_BUFFER_SIZE), receiveBufferSize) + (PVA_ALIGNMENT - 1)) & (~(PVA_ALIGNMENT - 1)))), std::tr1::shared_ptr(new ByteBuffer((std::max((std::size_t)( MAX_TCP_RECV + MAX_ENSURE_DATA_BUFFER_SIZE), receiveBufferSize) + (PVA_ALIGNMENT - 1)) & (~(PVA_ALIGNMENT - 1)))), sendBufferSize), _channel(channel) { // get remote address osiSocklen_t saSize = sizeof(sockaddr); int retval = getpeername(_channel, &(_socketAddress.sa), &saSize); if(unlikely(retval<0)) { char errStr[64]; epicsSocketConvertErrnoToString(errStr, sizeof(errStr)); LOG(logLevelError, "Error fetching socket remote address: %s", errStr); } // set receive timeout so that we do not have problems at //shutdown (recvfrom would block) struct timeval timeout; memset(&timeout, 0, sizeof(struct timeval)); timeout.tv_sec = 1; timeout.tv_usec = 0; // TODO remove this and implement use epicsSocketSystemCallInterruptMechanismQuery if (unlikely(::setsockopt (_channel, SOL_SOCKET, SO_RCVTIMEO, (char*)&timeout, sizeof(timeout)) < 0)) { char errStr[64]; epicsSocketConvertErrnoToString(errStr, sizeof(errStr)); LOG(logLevelError, "Failed to set SO_RCVTIMEO for TDP socket %s: %s.", inetAddressToString(_socketAddress).c_str(), errStr); } LOG(logLevelTrace, "BlockingSocketAbstractCodec constructed (threadId: %u)", epicsThreadGetIdSelf()); } void BlockingSocketAbstractCodec::internalDestroy() { LOG(logLevelTrace, "BlockingSocketAbstractCodec::internalDestroy enter: (threadId: %u)", epicsThreadGetIdSelf()); if(_channel != INVALID_SOCKET) { epicsSocketDestroy(_channel); _channel = INVALID_SOCKET; } } void BlockingSocketAbstractCodec::invalidDataStreamHandler() { LOG(logLevelTrace, "BlockingSocketAbstractCodec::invalidDataStreamHandler enter:" " (threadId: %u)", epicsThreadGetIdSelf()); close(); } int BlockingSocketAbstractCodec::write( epics::pvData::ByteBuffer *src) { LOG(logLevelTrace, "BlockingSocketAbstractCodec::write enter: position:%u, " "remaining:%u (threadId: %u)", src->getPosition(), src->getRemaining(), epicsThreadGetIdSelf()); std::size_t remaining; while((remaining=src->getRemaining()) > 0) { LOG(logLevelTrace, "BlockingSocketAbstractCodec::write before send" " (threadId: %u)", epicsThreadGetIdSelf()); int bytesSent = ::send(_channel, &src->getArray()[src->getPosition()], remaining, 0); LOG(logLevelTrace, "BlockingSocketAbstractCodec::write afer send, read:%d" " (threadId: %u)", bytesSent, epicsThreadGetIdSelf()); if(unlikely(bytesSent<0)) { int socketError = SOCKERRNO; // spurious EINTR check if (socketError==SOCK_EINTR) continue; } if (bytesSent > 0) { src->setPosition(src->getPosition() + bytesSent); } return bytesSent; } //TODO check what to return return -1; } std::size_t BlockingSocketAbstractCodec::getSocketReceiveBufferSize() const { LOG(logLevelTrace, "BlockingSocketAbstractCodec::getSocketReceiveBufferSize" " enter (threadId: %u)", epicsThreadGetIdSelf()); osiSocklen_t intLen = sizeof(int); char strBuffer[64]; int socketRecvBufferSize; int retval = getsockopt(_channel, SOL_SOCKET, SO_RCVBUF, (char *)&socketRecvBufferSize, &intLen); if(retval<0) { epicsSocketConvertErrnoToString(strBuffer, sizeof(strBuffer)); //LOG(logLevelDebug, "Error getting SO_SNDBUF: %s", strBuffer); } LOG(logLevelTrace, "BlockingSocketAbstractCodec::getSocketReceiveBufferSize" " returning:%u (threadId: %u)", socketRecvBufferSize, epicsThreadGetIdSelf()); return socketRecvBufferSize; } int BlockingSocketAbstractCodec::read(epics::pvData::ByteBuffer* dst) { LOG(logLevelTrace, "BlockingSocketAbstractCodec::read enter: " "read bytes:%u (threadId: %u)", dst->getRemaining(), epicsThreadGetIdSelf()); std::size_t remaining; while((remaining=dst->getRemaining()) > 0) { // read std::size_t pos = dst->getPosition(); LOG(logLevelTrace, "BlockingSocketAbstractCodec::read before recv" " (threadId: %u)", epicsThreadGetIdSelf()); int bytesRead = recv(_channel, (char*)(dst->getArray()+pos), remaining, 0); LOG(logLevelTrace, "BlockingSocketAbstractCodec::read after recv, read: %d", bytesRead," (threadId: %u)", epicsThreadGetIdSelf()); if (IS_LOGGABLE(logLevelTrace)) { hexDump(std::string("READ"), (const int8 *)(dst->getArray()+pos), bytesRead); } if(unlikely(bytesRead<=0)) { if (bytesRead<0) { int socketError = SOCKERRNO; // interrupted or timeout if (socketError == EINTR || socketError == EAGAIN || socketError == EWOULDBLOCK) continue; } return -1; // 0 means connection loss for blocking transport, notify codec by returning -1 } dst->setPosition(dst->getPosition() + bytesRead); return bytesRead; } return 0; } BlockingServerTCPTransportCodec::BlockingServerTCPTransportCodec( Context::shared_pointer const & context, SOCKET channel, std::auto_ptr& responseHandler, int32_t sendBufferSize, int32_t receiveBufferSize) : BlockingTCPTransportCodec(context, channel, responseHandler, sendBufferSize, receiveBufferSize, PVA_DEFAULT_PRIORITY), _lastChannelSID(0) { // NOTE: priority not yet known, default priority is used to //register/unregister // TODO implement priorities in Reactor... not that user will // change it.. still getPriority() must return "registered" priority! LOG(logLevelTrace, "BlockingServerTCPTransportCodec constructed (threadId: %u)", epicsThreadGetIdSelf()); } BlockingServerTCPTransportCodec::~BlockingServerTCPTransportCodec() { LOG(logLevelTrace, "BlockingServerTCPTransportCodec DESTRUCTED (threadId: %u)", epicsThreadGetIdSelf()); } pvAccessID BlockingServerTCPTransportCodec::preallocateChannelSID() { LOG(logLevelTrace, "BlockingServerTCPTransportCodec::preallocateChannelSID enter:" " (threadId: %u)", epicsThreadGetIdSelf()); Lock lock(_channelsMutex); // search first free (theoretically possible loop of death) pvAccessID sid = ++_lastChannelSID; while(_channels.find(sid)!=_channels.end()) sid = ++_lastChannelSID; return sid; } void BlockingServerTCPTransportCodec::registerChannel( pvAccessID sid, ServerChannel::shared_pointer const & channel) { LOG(logLevelTrace, "BlockingServerTCPTransportCodec::registerChannel enter: sid:%d " " (threadId: %u)", channel->getSID(), epicsThreadGetIdSelf()); Lock lock(_channelsMutex); _channels[sid] = channel; } void BlockingServerTCPTransportCodec::unregisterChannel(pvAccessID sid) { LOG(logLevelTrace, "BlockingServerTCPTransportCodec::unregisterChannel enter:" " sid:%d (threadId: %u)", sid, epicsThreadGetIdSelf()); Lock lock(_channelsMutex); _channels.erase(sid); } ServerChannel::shared_pointer BlockingServerTCPTransportCodec::getChannel(pvAccessID sid) { LOG(logLevelTrace, "BlockingServerTCPTransportCodec::getChannel enter:" " sid:%d (threadId: %u)", sid, epicsThreadGetIdSelf()); Lock lock(_channelsMutex); std::map::iterator it = _channels.find(sid); if(it!=_channels.end()) return it->second; return ServerChannel::shared_pointer(); } int BlockingServerTCPTransportCodec::getChannelCount() { LOG(logLevelTrace, "BlockingServerTCPTransportCodec::getChannelCount enter: " "(threadId: %u)", epicsThreadGetIdSelf()); Lock lock(_channelsMutex); return static_cast(_channels.size()); } void BlockingServerTCPTransportCodec::send(ByteBuffer* buffer, TransportSendControl* control) { LOG(logLevelTrace, "BlockingServerTCPTransportCodec::send enter: (threadId: %u)", epicsThreadGetIdSelf()); // // set byte order control message // ensureBuffer(PVA_MESSAGE_HEADER_SIZE); buffer->putByte(PVA_MAGIC); buffer->putByte(PVA_VERSION); buffer->putByte( 0x01 | ((EPICS_BYTE_ORDER == EPICS_ENDIAN_BIG) ? 0x80 : 0x00)); // control + big endian buffer->putByte(2); // set byte order buffer->putInt(0); // // send verification message // control->startMessage(CMD_CONNECTION_VALIDATION, 2*sizeof(int32)); // receive buffer size buffer->putInt(static_cast(getReceiveBufferSize())); // socket receive buffer size buffer->putInt(static_cast(getSocketReceiveBufferSize())); // send immediately control->flush(true); } } }