From 16aa5fba25c8149f5691904170bd4c4d7318738d Mon Sep 17 00:00:00 2001 From: miha_vitorovic Date: Thu, 30 Dec 2010 13:53:52 +0100 Subject: [PATCH] Added MonitorHandler. Porting of BlockingTCPTransport now complete. TODO: 1) check thread sync and "notify" in Java make C++ code as equivalent as possible. 2) Debug :) --- pvAccessApp/ca/caConstants.h | 2 + pvAccessApp/remote/blockingTCP.h | 8 +++ pvAccessApp/remote/blockingTCPTransport.cpp | 67 +++++++++++++++++++-- 3 files changed, 71 insertions(+), 6 deletions(-) diff --git a/pvAccessApp/ca/caConstants.h b/pvAccessApp/ca/caConstants.h index adf89d5..b3ef88b 100644 --- a/pvAccessApp/ca/caConstants.h +++ b/pvAccessApp/ca/caConstants.h @@ -73,6 +73,8 @@ namespace epics { /** Invalid data type. */ const int16 INVALID_DATA_TYPE = (int16)0xFFFF; + /** Invalid IOID. */ + const int32 CAJ_INVALID_IOID = 0; } } diff --git a/pvAccessApp/remote/blockingTCP.h b/pvAccessApp/remote/blockingTCP.h index a89f71e..ed13b5f 100644 --- a/pvAccessApp/remote/blockingTCP.h +++ b/pvAccessApp/remote/blockingTCP.h @@ -28,6 +28,8 @@ using namespace epics::pvData; namespace epics { namespace pvAccess { + class MonitorSender; + enum ReceiveStage { READ_FROM_SOCKET, PROCESS_HEADER, PROCESS_PAYLOAD, NONE }; @@ -155,6 +157,8 @@ namespace epics { virtual void enqueueSendRequest(TransportSender* sender); + void enqueueMonitorSendRequest(TransportSender* sender); + protected: /** * Connection status @@ -320,6 +324,10 @@ namespace epics { epicsThreadId _sendThreadId; + GrowingCircularBuffer* _monitorSendQueue; + + MonitorSender* _monitorSender; + /** * Internal method that clears and releases buffer. * sendLock and sendBufferLock must be hold while calling this method. diff --git a/pvAccessApp/remote/blockingTCPTransport.cpp b/pvAccessApp/remote/blockingTCPTransport.cpp index 0d89f81..6d06846 100644 --- a/pvAccessApp/remote/blockingTCPTransport.cpp +++ b/pvAccessApp/remote/blockingTCPTransport.cpp @@ -8,11 +8,13 @@ #include "blockingTCP.h" #include "inetAddressUtil.h" #include "growingCircularBuffer.h" +#include "caConstants.h" /* pvData */ #include #include #include +#include /* EPICSv3 */ #include @@ -35,6 +37,31 @@ using std::ostringstream; namespace epics { namespace pvAccess { + class MonitorSender : public TransportSender, public NoDefaultMethods { + public: + MonitorSender(Mutex* monitorMutex, GrowingCircularBuffer< + TransportSender*>* monitorSendQueue) : + _monitorMutex(monitorMutex), + _monitorSendQueue(monitorSendQueue) { + } + + virtual ~MonitorSender() { + } + + virtual void lock() { + } + + virtual void unlock() { + } + + virtual void + send(ByteBuffer* buffer, TransportSendControl* control); + + private: + Mutex* _monitorMutex; + GrowingCircularBuffer* _monitorSendQueue; + }; + BlockingTCPTransport::BlockingTCPTransport(SOCKET channel, ResponseHandler* responseHandler, int receiveBufferSize, short priority) : @@ -56,7 +83,10 @@ namespace epics { _flushRequested(false), _sendBufferSentPosition(0), _flushStrategy(DELAYED), _sendQueue( new GrowingCircularBuffer (100)), - _rcvThreadId(NULL), _sendThreadId(NULL) { + _rcvThreadId(NULL), _sendThreadId(NULL), _monitorSendQueue( + new GrowingCircularBuffer (100)), + _monitorSender(new MonitorSender(_monitorMutex, + _monitorSendQueue)) { _socketBuffer = new ByteBuffer(max(MAX_TCP_RECV +MAX_ENSURE_DATA_BUFFER_SIZE, receiveBufferSize)); @@ -94,7 +124,6 @@ namespace epics { // TODO: add to registry //context.getTransportRegistry().put(this); - } BlockingTCPTransport::~BlockingTCPTransport() { @@ -122,8 +151,7 @@ namespace epics { epicsThreadStackMedium), BlockingTCPTransport::rcvThreadRunner, this); - threadName = "TCP-send "+inetAddressToString( - _socketAddress); + threadName = "TCP-send "+inetAddressToString(_socketAddress); errlogSevPrintf(errlogInfo, "Starting thread: %s", threadName.c_str()); @@ -133,7 +161,6 @@ namespace epics { epicsThreadStackMedium), BlockingTCPTransport::sendThreadRunner, this); - } void BlockingTCPTransport::clearAndReleaseBuffer() { @@ -342,7 +369,9 @@ namespace epics { // no more data and we have some payload left => read buffer if(_storedPayloadSize>=size) { - //System.out.println("storedPayloadSize >= size, remaining:" + socketBuffer.remaining()); + //errlogSevPrintf(errlogInfo, + // "storedPayloadSize >= size, remaining: %d", + // _socketBuffer->getRemaining()); // just read up remaining payload // since there is no data on the buffer, read to the beginning of it, at least size bytes @@ -811,5 +840,31 @@ namespace epics { _sendQueue->insert(sender); } + void BlockingTCPTransport::enqueueMonitorSendRequest( + TransportSender* sender) { + Lock lock(_monitorMutex); + _monitorSendQueue->insert(sender); + if(_monitorSendQueue->size()==1) enqueueSendRequest(_monitorSender); + } + + void MonitorSender::send(ByteBuffer* buffer, + TransportSendControl* control) { + control->startMessage(19, 0); + + while(true) { + TransportSender* sender; + _monitorMutex->lock(); + sender = _monitorSendQueue->extract(); + _monitorMutex->unlock(); + + if(sender==NULL) { + control->ensureBuffer(sizeof(int32)); + buffer->putInt(CAJ_INVALID_IOID); + break; + } + sender->send(buffer, control); + } + } + } }