Added MonitorHandler. Porting of BlockingTCPTransport now complete.

TODO: 1) check thread sync and "notify" in Java make C++ code as equivalent as possible.
2) Debug :)
This commit is contained in:
miha_vitorovic
2010-12-30 13:53:52 +01:00
parent 3fc6171075
commit 16aa5fba25
3 changed files with 71 additions and 6 deletions

View File

@@ -8,11 +8,13 @@
#include "blockingTCP.h"
#include "inetAddressUtil.h"
#include "growingCircularBuffer.h"
#include "caConstants.h"
/* pvData */
#include <lock.h>
#include <byteBuffer.h>
#include <epicsException.h>
#include <noDefaultMethods.h>
/* EPICSv3 */
#include <osdSock.h>
@@ -35,6 +37,31 @@ using std::ostringstream;
namespace epics {
namespace pvAccess {
class MonitorSender : public TransportSender, public NoDefaultMethods {
public:
MonitorSender(Mutex* monitorMutex, GrowingCircularBuffer<
TransportSender*>* monitorSendQueue) :
_monitorMutex(monitorMutex),
_monitorSendQueue(monitorSendQueue) {
}
virtual ~MonitorSender() {
}
virtual void lock() {
}
virtual void unlock() {
}
virtual void
send(ByteBuffer* buffer, TransportSendControl* control);
private:
Mutex* _monitorMutex;
GrowingCircularBuffer<TransportSender*>* _monitorSendQueue;
};
BlockingTCPTransport::BlockingTCPTransport(SOCKET channel,
ResponseHandler* responseHandler, int receiveBufferSize,
short priority) :
@@ -56,7 +83,10 @@ namespace epics {
_flushRequested(false), _sendBufferSentPosition(0),
_flushStrategy(DELAYED), _sendQueue(
new GrowingCircularBuffer<TransportSender*> (100)),
_rcvThreadId(NULL), _sendThreadId(NULL) {
_rcvThreadId(NULL), _sendThreadId(NULL), _monitorSendQueue(
new GrowingCircularBuffer<TransportSender*> (100)),
_monitorSender(new MonitorSender(_monitorMutex,
_monitorSendQueue)) {
_socketBuffer = new ByteBuffer(max(MAX_TCP_RECV
+MAX_ENSURE_DATA_BUFFER_SIZE, receiveBufferSize));
@@ -94,7 +124,6 @@ namespace epics {
// TODO: add to registry
//context.getTransportRegistry().put(this);
}
BlockingTCPTransport::~BlockingTCPTransport() {
@@ -122,8 +151,7 @@ namespace epics {
epicsThreadStackMedium),
BlockingTCPTransport::rcvThreadRunner, this);
threadName = "TCP-send "+inetAddressToString(
_socketAddress);
threadName = "TCP-send "+inetAddressToString(_socketAddress);
errlogSevPrintf(errlogInfo, "Starting thread: %s",
threadName.c_str());
@@ -133,7 +161,6 @@ namespace epics {
epicsThreadStackMedium),
BlockingTCPTransport::sendThreadRunner, this);
}
void BlockingTCPTransport::clearAndReleaseBuffer() {
@@ -342,7 +369,9 @@ namespace epics {
// no more data and we have some payload left => read buffer
if(_storedPayloadSize>=size) {
//System.out.println("storedPayloadSize >= size, remaining:" + socketBuffer.remaining());
//errlogSevPrintf(errlogInfo,
// "storedPayloadSize >= size, remaining: %d",
// _socketBuffer->getRemaining());
// just read up remaining payload
// since there is no data on the buffer, read to the beginning of it, at least size bytes
@@ -811,5 +840,31 @@ namespace epics {
_sendQueue->insert(sender);
}
void BlockingTCPTransport::enqueueMonitorSendRequest(
TransportSender* sender) {
Lock lock(_monitorMutex);
_monitorSendQueue->insert(sender);
if(_monitorSendQueue->size()==1) enqueueSendRequest(_monitorSender);
}
void MonitorSender::send(ByteBuffer* buffer,
TransportSendControl* control) {
control->startMessage(19, 0);
while(true) {
TransportSender* sender;
_monitorMutex->lock();
sender = _monitorSendQueue->extract();
_monitorMutex->unlock();
if(sender==NULL) {
control->ensureBuffer(sizeof(int32));
buffer->putInt(CAJ_INVALID_IOID);
break;
}
sender->send(buffer, control);
}
}
}
}