new ByteBuffer support

This commit is contained in:
Matej Sekoranja
2011-09-08 11:15:36 +02:00
parent ee0d56fcae
commit 54d435972b
16 changed files with 67 additions and 27 deletions

View File

@@ -32,6 +32,9 @@ namespace epics {
_introspectionRegistry(true),
_lastChannelSID(0)
{
// for performance testing
// setSendQueueFlushStrategy(IMMEDIATE);
// NOTE: priority not yet known, default priority is used to register/unregister
// TODO implement priorities in Reactor... not that user will
// change it.. still getPriority() must return "registered" priority!

View File

@@ -317,7 +317,7 @@ namespace epics {
}
void BlockingTCPTransport::ensureBuffer(int size) {
if(_sendBuffer->getRemaining()>=size) return;
if((int)(_sendBuffer->getRemaining())>=size) return;
// too large for buffer...
if(_maxPayloadSize<size) {
@@ -329,7 +329,7 @@ namespace epics {
// TODO sync _closed
while(_sendBuffer->getRemaining()<size&&!_closed)
while(((int)_sendBuffer->getRemaining())<size&&!_closed)
flush(false);
if(_closed) THROW_BASE_EXCEPTION("transport closed");
@@ -338,11 +338,12 @@ namespace epics {
void BlockingTCPTransport::endMessage(bool hasMoreSegments) {
if(_lastMessageStartPosition>=0) {
// TODO align?
// set message size
_sendBuffer->putInt(_lastMessageStartPosition+sizeof(int16)+2,
_sendBuffer->getPosition()-_lastMessageStartPosition
-CA_MESSAGE_HEADER_SIZE);
int flagsPosition = _lastMessageStartPosition+sizeof(int16);
// set segmented bit
if(hasMoreSegments) {
@@ -387,7 +388,7 @@ namespace epics {
void BlockingTCPTransport::ensureData(int size) {
// enough of data?
if(_socketBuffer->getRemaining()>=size) return;
if(((int)_socketBuffer->getRemaining())>=size) return;
// too large for buffer...
if(_maxPayloadSize<size) {
@@ -441,7 +442,7 @@ namespace epics {
// TODO sync _closed
// add if missing...
if(!_closed&&_socketBuffer->getRemaining()<size)
if(!_closed&&((int)_socketBuffer->getRemaining())<size)
ensureData(size);
}
@@ -477,7 +478,7 @@ namespace epics {
// read at least requiredBytes bytes
int requiredPosition = (currentStartPosition+requiredBytes);
uintptr_t requiredPosition = (currentStartPosition+requiredBytes);
while(_socketBuffer->getPosition()<requiredPosition) {
// read
int pos = _socketBuffer->getPosition();
@@ -514,7 +515,7 @@ namespace epics {
if(_stage==PROCESS_HEADER) {
// ensure CAConstants.CA_MESSAGE_HEADER_SIZE bytes of data
if(_socketBuffer->getRemaining()<CA_MESSAGE_HEADER_SIZE)
if(((int)_socketBuffer->getRemaining())<CA_MESSAGE_HEADER_SIZE)
processReadCached(true, PROCESS_HEADER, CA_MESSAGE_HEADER_SIZE, false);
// first byte is CA_MAGIC
@@ -806,7 +807,7 @@ namespace epics {
if(_delay>0) epicsThreadSleep(_delay);
if(_sendQueue.empty()) {
// if (hasMonitors || sendBuffer.position() > CAConstants.CA_MESSAGE_HEADER_SIZE)
if(_sendBuffer->getPosition()>CA_MESSAGE_HEADER_SIZE)
if(((int)_sendBuffer->getPosition())>CA_MESSAGE_HEADER_SIZE)
_flushRequested = true;
else
_sendQueueEvent.wait();

View File

@@ -54,6 +54,7 @@ namespace epics {
// set receive timeout so that we do not have problems at shutdown (recvfrom would block)
struct timeval timeout;
bzero(&timeout, sizeof(struct timeval));
timeout.tv_sec = 1;
timeout.tv_usec = 0;
@@ -251,7 +252,7 @@ namespace epics {
bool BlockingUDPTransport::processBuffer(Transport::shared_pointer const & thisTransport, osiSockAddr& fromAddress, ByteBuffer* receiveBuffer) {
// handle response(s)
while(receiveBuffer->getRemaining()>=CA_MESSAGE_HEADER_SIZE) {
while((int)receiveBuffer->getRemaining()>=CA_MESSAGE_HEADER_SIZE) {
//
// read header
//
@@ -271,7 +272,7 @@ namespace epics {
int nextRequestPosition = receiveBuffer->getPosition() + payloadSize;
// payload size check
if(nextRequestPosition>receiveBuffer->getLimit()) return false;
if(nextRequestPosition>(int)receiveBuffer->getLimit()) return false;
// handle
_responseHandler->handleResponse(&fromAddress, thisTransport,

View File

@@ -82,7 +82,7 @@ bool BaseSearchInstance::generateSearchRequestMessage(ByteBuffer* requestMessage
// not nice...
const int addedPayloadSize = sizeof(int32)/sizeof(int8) + (1 + sizeof(int32)/sizeof(int8) + name.length());
if(requestMessage->getRemaining() < addedPayloadSize)
if(((int)requestMessage->getRemaining()) < addedPayloadSize)
{
return false;
}

View File

@@ -962,6 +962,6 @@ int main(int argc,char *argv[])
std::cout << "-----------------------------------------------------------------------" << std::endl;
epicsExitCallAtExits();
CDRMonitor::get().show(stdout);
CDRMonitor::get().show(stdout, true);
return(0);
}

View File

@@ -182,7 +182,7 @@ int main(int argc,char *argv[])
std::cout << "-----------------------------------------------------------------------" << std::endl;
epicsExitCallAtExits();
CDRMonitor::get().show(stdout);
CDRMonitor::get().show(stdout, true);
return 0;
}

View File

@@ -54,6 +54,10 @@ PROD_HOST += pvput
pvput_SRCS += pvput.cpp
pvput_LIBS += pvData pvAccess Com
#PROD_HOST += testGetPerformance
#testGetPerformance_SRCS += testGetPerformance.cpp
#testGetPerformance_LIBS += pvData pvAccess Com
include $(TOP)/configure/RULES
#----------------------------------------
# ADD RULES AFTER THIS LINE

View File

@@ -90,6 +90,6 @@ int main(int argc,char *argv[])
epicsThreadSleep ( 1.0 );
std::cout << "-----------------------------------------------------------------------" << std::endl;
epicsExitCallAtExits();
CDRMonitor::get().show(stdout);
CDRMonitor::get().show(stdout, true);
return(0);
}

View File

@@ -591,6 +591,6 @@ int main(int argc,char *argv[])
epicsThreadSleep ( 1.0 ); }
std::cout << "-----------------------------------------------------------------------" << std::endl;
epicsExitCallAtExits();
CDRMonitor::get().show(stdout);
CDRMonitor::get().show(stdout, true);
return(0);
}

View File

@@ -588,12 +588,43 @@ class MockChannel : public Channel {
PVDATA_REFCOUNT_MONITOR_CONSTRUCT(mockChannel);
ScalarType stype = pvDouble;
String allProperties("alarm,timeStamp,display,control,valueAlarm");
m_pvStructure.reset(getStandardPVField()->scalar(0,name,stype,allProperties));
PVDouble *pvField = m_pvStructure->getDoubleField(String("value"));
pvField->put(1.123);
if (m_name.find("array") == 0)
{
String allProperties("alarm,timeStamp,display,control");
m_pvStructure.reset(getStandardPVField()->scalarArray(0,name,pvDouble,allProperties));
PVDoubleArray *pvField = static_cast<PVDoubleArray*>(m_pvStructure->getScalarArrayField(String("value"), pvDouble));
int v = 0;
int ix = 0;
int COUNT = 1000;
pvField->setCapacity(1000*COUNT);
for (int n = 0; n < 1000; n++)
{
double array[COUNT];
for (int i = 0; i < COUNT; i++)
{
array[i] = v; v+=1.1;
}
pvField->put(ix, COUNT, array, 0);
ix += COUNT;
}
/*
printf("array prepared------------------------------------!!!\n");
String str;
pvField->toString(&str);
printf("%s\n", str.c_str());
printf("=============------------------------------------!!!\n");
*/
}
else
{
String allProperties("alarm,timeStamp,display,control,valueAlarm");
m_pvStructure.reset(getStandardPVField()->scalar(0,name,pvDouble,allProperties));
PVDouble *pvField = m_pvStructure->getDoubleField(String("value"));
pvField->put(1.123);
}
}
public:
@@ -1115,6 +1146,6 @@ int main(int argc, char *argv[])
epicsThreadSleep ( 1.0 );
std::cout << "-----------------------------------------------------------------------" << std::endl;
epicsExitCallAtExits();
CDRMonitor::get().show(stdout);
CDRMonitor::get().show(stdout, true);
return (0);
}

View File

@@ -94,6 +94,6 @@ int main(int argc, char *argv[])
cout << "Done" << endl;
epicsExitCallAtExits();
CDRMonitor::get().show(stdout);
CDRMonitor::get().show(stdout, true);
return (0);
}

View File

@@ -70,7 +70,7 @@ int main(int argc, char *argv[])
if(configProvider) delete configProvider;
epicsExitCallAtExits();
CDRMonitor::get().show(stdout);
CDRMonitor::get().show(stdout, true);
return 0;
}

View File

@@ -119,7 +119,7 @@ int main(int argc, char *argv[]) {
cout<<"Testing \"encodeAsIPv6Address\""<<endl;
ByteBuffer* buff = new ByteBuffer();
ByteBuffer* buff = new ByteBuffer(32);
char src[] = { (char)0, (char)0, (char)0, (char)0, (char)0, (char)0,
(char)0, (char)0, (char)0, (char)0, (char)0xFF, (char)0xFF,

View File

@@ -448,7 +448,7 @@ int main(int argc, char *argv[])
if(serverRegistry) delete serverRegistry;
epicsExitCallAtExits();
CDRMonitor::get().show(stdout);
CDRMonitor::get().show(stdout, true);
cout << "DONE" << endl;
return 0;
}

View File

@@ -223,7 +223,7 @@ int main(int argc, char *argv[])
}
epicsExitCallAtExits();
CDRMonitor::get().show(stdout);
CDRMonitor::get().show(stdout, true);
return 0;
}

View File

@@ -154,7 +154,7 @@ int main(int argc, char *argv[])
addrArray.clear();
if(registry) delete registry;
epicsExitCallAtExits();
CDRMonitor::get().show(stdout);
CDRMonitor::get().show(stdout, true);
return 0;
}