Merge pull request #17 from mdavidsaver/miscclean

Misc. cleanup and minor features
This commit is contained in:
Matej Sekoranja
2015-11-25 00:25:38 +01:00
12 changed files with 194 additions and 271 deletions

View File

@@ -99,7 +99,7 @@ class ChannelPipelineMonitorImpl :
{
Lock guard(m_freeQueueLock);
m_freeQueue.reserve(m_queueSize);
for (int32 i = 0; i < m_queueSize; i++)
for (size_t i = 0; i < m_queueSize; i++)
{
PVStructure::shared_pointer pvStructure = getPVDataCreate()->createPVStructure(structure);
MonitorElement::shared_pointer monitorElement(new MonitorElement(pvStructure));

View File

@@ -87,7 +87,7 @@ inline int sendto(int s, const char *buf, size_t len, int flags, const struct so
void BlockingUDPTransport::start() {
string threadName = "UDP-receive " + inetAddressToString(_bindAddress);
string threadName = "UDP-rx " + inetAddressToString(_bindAddress);
if (IS_LOGGABLE(logLevelTrace))
{
@@ -243,7 +243,17 @@ inline int sendto(int s, const char *buf, size_t len, int flags, const struct so
_receiveBuffer->flip();
processBuffer(thisTransport, fromAddress, _receiveBuffer.get());
try{
processBuffer(thisTransport, fromAddress, _receiveBuffer.get());
}catch(std::exception& e){
LOG(logLevelError,
"an exception caught while in UDP receiveThread at %s:%d: %s",
__FILE__, __LINE__, e.what());
} catch (...) {
LOG(logLevelError,
"unknown exception caught while in UDP receiveThread at %s:%d.",
__FILE__, __LINE__);
}
}
}
else if (unlikely(bytesRead == -1)) {
@@ -282,7 +292,7 @@ inline int sendto(int s, const char *buf, size_t len, int flags, const struct so
if (IS_LOGGABLE(logLevelTrace))
{
string threadName = "UDP-receive "+inetAddressToString(_bindAddress);
string threadName = "UDP-rx "+inetAddressToString(_bindAddress);
LOG(logLevelTrace, "Thread '%s' exiting.", threadName.c_str());
}

View File

@@ -1019,101 +1019,7 @@ namespace epics {
std::size_t elementCount, std::size_t elementSize)
{
return false;
// _socketBuffer == existingBuffer
/*
std::size_t bytesToBeDeserialized = elementCount*elementSize;
// TODO check if bytesToDeserialized < threshold that direct pays off?
_directPayloadRead = bytesToBeDeserialized;
_directBuffer = deserializeTo;
while (true)
{
// retrieve what's already in buffers
size_t availableBytes = min(_directPayloadRead, _socketBuffer->getRemaining());
existingBuffer->getArray(_directBuffer, availableBytes);
_directPayloadRead -= availableBytes;
if (_directPayloadRead == 0)
return true;
_directBuffer += availableBytes;
// subtract what was already processed
size_t pos = _socketBuffer->getPosition();
_storedPayloadSize -= pos -_storedPosition;
_storedPosition = pos;
// no more data and we have some payload left => read buffer
if (likely(_storedPayloadSize > 0))
{
size_t bytesToRead = std::min(_directPayloadRead, _storedPayloadSize);
processReadIntoDirectBuffer(bytesToRead);
// std::cout << "d: " << bytesToRead << std::endl;
_storedPayloadSize -= bytesToRead;
_directPayloadRead -= bytesToRead;
}
if (_directPayloadRead == 0)
return true;
_stage = PROCESS_HEADER;
processReadCached(true, UNDEFINED_STAGE, _directPayloadRead);
_storedPosition = _socketBuffer->getPosition();
_storedLimit = _socketBuffer->getLimit();
_socketBuffer->setLimit(
min(_storedPosition + _storedPayloadSize, _storedLimit)
);
}
return true;
*/
}
/*
void AbstractCodec::processReadIntoDirectBuffer(size_t bytesToRead)
{
while (bytesToRead > 0)
{
ssize_t bytesRead = recv(_channel, _directBuffer, bytesToRead, 0);
// std::cout << "d: " << bytesRead << std::endl;
if(unlikely(bytesRead<=0))
{
if (bytesRead<0)
{
int socketError = SOCKERRNO;
// interrupted or timeout
if (socketError == EINTR ||
socketError == EAGAIN ||
socketError == EWOULDBLOCK)
continue;
}
// error (disconnect, end-of-stream) detected
close();
THROW_BASE_EXCEPTION("bytesRead < 0");
return;
}
bytesToRead -= bytesRead;
_directBuffer += bytesRead;
}
}
*/
//
//
// BlockingAbstractCodec
@@ -1200,11 +1106,11 @@ namespace epics {
try {
bac->processRead();
} catch (std::exception &e) {
LOG(logLevelWarn,
LOG(logLevelError,
"an exception caught while in receiveThread at %s:%d: %s",
__FILE__, __LINE__, e.what());
} catch (...) {
LOG(logLevelWarn,
LOG(logLevelError,
"unknown exception caught while in receiveThread at %s:%d.",
__FILE__, __LINE__);
}
@@ -1228,11 +1134,6 @@ namespace epics {
bac->processWrite();
} catch (connection_closed_exception &cce) {
// noop
/*
LOG(logLevelDebug,
"connection closed by remote host while in sendThread at %s:%d: %s",
__FILE__, __LINE__, e.what());
*/
} catch (std::exception &e) {
LOG(logLevelWarn,
"an exception caught while in sendThread at %s:%d: %s",

View File

@@ -21,6 +21,14 @@
#include <osiSock.h>
#include <epicsTime.h>
#include <epicsThread.h>
#include <epicsVersion.h>
#ifdef EPICS_VERSION_INT
#if EPICS_VERSION_INT>=VERSION_INT(3,15,1,0)
#include <epicsAtomic.h>
#define PVA_CODEC_USE_ATOMIC
#endif
#endif
#include <pv/byteBuffer.h>
#include <pv/pvType.h>
@@ -48,8 +56,40 @@ namespace epics {
namespace pvAccess {
namespace detail {
// TODO replace mutex with atomic (CAS) operations
template<typename T>
#ifdef PVA_CODEC_USE_ATOMIC
#undef PVA_CODEC_USE_ATOMIC
template<typename T>
class AtomicValue
{
T val;
public:
AtomicValue() :val(0) {}
inline T getAndSet(T newval)
{
int oldval;
// epicsAtomic doesn't have unconditional swap
do {
oldval = epics::atomic::get(val);
}while(epics::atomic::compareAndSwap(val, oldval, newval)!=oldval);
return oldval;
}
inline T get() { return epics::atomic::get(val); }
};
// treat bool as int
template<>
class AtomicValue<bool>
{
AtomicValue<int> realval;
public:
inline bool getAndSet(bool newval)
{
return this->realval.getAndSet(newval?1:0)!=0;
}
inline bool get() { return !!this->realval.get(); }
};
#else
template<typename T>
class AtomicValue
{
public:
@@ -69,6 +109,7 @@ namespace epics {
T _value;
epics::pvData::Mutex mutex;
};
#endif
// TODO replace this queue with lock-free implementation

View File

@@ -181,7 +181,7 @@ void SimpleChannelSearchManagerImpl::initializeSendBuffer()
m_sendBuffer.putByte(PVA_MAGIC);
m_sendBuffer.putByte(PVA_VERSION);
m_sendBuffer.putByte((EPICS_BYTE_ORDER == EPICS_ENDIAN_BIG) ? 0x80 : 0x00); // data + 7-bit endianess
m_sendBuffer.putByte((int8_t)3); // search
m_sendBuffer.putByte(CMD_SEARCH);
m_sendBuffer.putInt(4+1+3+16+2+1); // "zero" payload
m_sendBuffer.putInt(m_sequenceNumber);

View File

@@ -2160,106 +2160,6 @@ namespace epics {
}
}
/*
virtual void response(Transport::shared_pointer const & transport, ByteBuffer* payloadBuffer) {
bool notify = false;
{
Lock guard(m_mutex);
// if in overrun mode, check if some is free
if (m_overrunInProgress)
{
MonitorElementPtr newElement = m_monitorQueue.getFree();
if (newElement.get() != 0)
{
// take new, put current in use
PVStructurePtr pvStructure = m_monitorElement->pvStructurePtr;
copyUnchecked(pvStructure, newElement->pvStructurePtr);
BitSetUtil::compress(m_monitorElement->changedBitSet, pvStructure);
BitSetUtil::compress(m_monitorElement->overrunBitSet, pvStructure);
m_monitorQueue.setUsed(m_monitorElement);
m_monitorElement = newElement;
notify = true;
m_overrunInProgress = false;
}
}
}
if (notify)
{
EXCEPTION_GUARD(m_callback->monitorEvent(shared_from_this()));
}
{
Lock guard(m_mutex);
// setup current fields
PVStructurePtr pvStructure = m_monitorElement->pvStructurePtr;
BitSet::shared_pointer changedBitSet = m_monitorElement->changedBitSet;
BitSet::shared_pointer overrunBitSet = m_monitorElement->overrunBitSet;
// special treatment if in overrun state
if (m_overrunInProgress)
{
// lazy init
if (m_bitSet1.get() == 0) m_bitSet1.reset(new BitSet(changedBitSet->size()));
if (m_bitSet2.get() == 0) m_bitSet2.reset(new BitSet(overrunBitSet->size()));
m_bitSet1->deserialize(payloadBuffer, transport.get());
pvStructure->deserialize(payloadBuffer, transport.get(), m_bitSet1.get());
m_bitSet2->deserialize(payloadBuffer, transport.get());
// OR local overrun
// TODO this does not work perfectly if bitSet is compressed !!!
// uncompressed bitSets should be used !!!
overrunBitSet->or_and(*(changedBitSet.get()), *(m_bitSet1.get()));
// OR remove change
*(changedBitSet.get()) |= *(m_bitSet1.get());
// OR remote overrun
*(overrunBitSet.get()) |= *(m_bitSet2.get());
}
else
{
// deserialize changedBitSet and data, and overrun bit set
changedBitSet->deserialize(payloadBuffer, transport.get());
pvStructure->deserialize(payloadBuffer, transport.get(), changedBitSet.get());
overrunBitSet->deserialize(payloadBuffer, transport.get());
}
// prepare next free (if any)
MonitorElementPtr newElement = m_monitorQueue.getFree();
if (newElement.get() == 0) {
m_overrunInProgress = true;
return;
}
// if there was overrun in progress we manipulated bitSets... compress them
if (m_overrunInProgress) {
BitSetUtil::compress(changedBitSet, pvStructure);
BitSetUtil::compress(overrunBitSet, pvStructure);
m_overrunInProgress = false;
}
copyUnchecked(pvStructure, newElement->pvStructurePtr);
m_monitorQueue.setUsed(m_monitorElement);
m_monitorElement = newElement;
}
EXCEPTION_GUARD(m_callback->monitorEvent(shared_from_this()));
}
*/
virtual void response(Transport::shared_pointer const & transport, ByteBuffer* payloadBuffer) {

View File

@@ -295,10 +295,9 @@ void ServerSearchHandler::handleResponse(osiSockAddr* responseFrom,
{
// TODO object pool!!!
int providerCount = _providers.size();
ServerChannelFindRequesterImpl* pr = new ServerChannelFindRequesterImpl(_context, providerCount);
pr->set(name, searchSequenceId, cid, responseAddress, responseRequired, false);
std::tr1::shared_ptr<ServerChannelFindRequesterImpl> tp(new ServerChannelFindRequesterImpl(_context, providerCount));
tp->set(name, searchSequenceId, cid, responseAddress, responseRequired, false);
// TODO use std::make_shared
std::tr1::shared_ptr<ServerChannelFindRequesterImpl> tp(pr);
ChannelFindRequester::shared_pointer spr = tp;
for (int i = 0; i < providerCount; i++)
@@ -314,11 +313,10 @@ void ServerSearchHandler::handleResponse(osiSockAddr* responseFrom,
#define MAX_SERVER_SEARCH_RESPONSE_DELAY_MS 100
double period = (rand() % MAX_SERVER_SEARCH_RESPONSE_DELAY_MS)/(double)1000;
ServerChannelFindRequesterImpl* pr = new ServerChannelFindRequesterImpl(_context, 1);
pr->set("", searchSequenceId, 0, responseAddress, true, true);
std::tr1::shared_ptr<ServerChannelFindRequesterImpl> tp(new ServerChannelFindRequesterImpl(_context, 1));
tp->set("", searchSequenceId, 0, responseAddress, true, true);
// TODO use std::make_shared
std::tr1::shared_ptr<ServerChannelFindRequesterImpl> tp(pr);
TimerCallback::shared_pointer tc = tp;
_context->getTimer()->scheduleAfterDelay(tc, period);
}
@@ -540,7 +538,7 @@ public:
PVStringArray::shared_pointer allChannelNames = result->getSubField<PVStringArray>("value");
ChannelListRequesterImpl::shared_pointer listListener(new ChannelListRequesterImpl());
std::vector<ChannelProvider::shared_pointer> providers = m_serverContext->getChannelProviders();
std::vector<ChannelProvider::shared_pointer>& providers = m_serverContext->getChannelProviders();
size_t providerCount = providers.size();
for (size_t i = 0; i < providerCount; i++)

View File

@@ -552,7 +552,7 @@ void ServerContextImpl::dispose()
}
catch(...)
{
// noop
std::cerr<<"Oh no, something when wrong in ServerContextImpl::dispose!\n";
}
}

View File

@@ -8,6 +8,8 @@
#include <pv/epicsException.h>
#include <osiSock.h>
#define epicsExportSharedSymbols
#include <pv/configuration.h>
@@ -242,8 +244,6 @@ void Properties::list()
SystemConfigurationImpl::SystemConfigurationImpl() :
_properties(new Properties())
{
_envParam.name = new char[256];
_envParam.pdflt = NULL;
// no exception, default value is taken
//_ibuffer.exceptions ( ifstream::failbit | ifstream::badbit );
//_obuffer.exceptions ( ifstream::failbit | ifstream::badbit );
@@ -251,7 +251,6 @@ SystemConfigurationImpl::SystemConfigurationImpl() :
SystemConfigurationImpl::~SystemConfigurationImpl()
{
if(_envParam.name) delete[] _envParam.name;
}
bool SystemConfigurationImpl::getPropertyAsBoolean(const string &name, const bool defaultValue)
@@ -332,8 +331,7 @@ float SystemConfigurationImpl::getPropertyAsDouble(const string &name, const dou
string SystemConfigurationImpl::getPropertyAsString(const string &name, const string &defaultValue)
{
strncpy(_envParam.name,name.c_str(),name.length() + 1);
const char* val = envGetConfigParamPtr(&_envParam);
const char* val = getenv(name.c_str());
if(val != NULL)
{
return _properties->getProperty(name, string(val));
@@ -341,10 +339,25 @@ string SystemConfigurationImpl::getPropertyAsString(const string &name, const st
return _properties->getProperty(name, defaultValue);
}
bool SystemConfigurationImpl::getPropertyAsAddress(const std::string& name, osiSockAddr* addr)
{
unsigned short dftport=0;
if(addr->sa.sa_family==AF_INET)
dftport = ntohs(addr->ia.sin_port);
std::string val(getPropertyAsString(name, ""));
if(val.empty()) return false;
addr->ia.sin_family = AF_INET;
if(aToIPAddr(val.c_str(), dftport, &addr->ia))
return false;
return true;
}
bool SystemConfigurationImpl::hasProperty(const string &key)
{
strncpy(_envParam.name,key.c_str(),key.length() + 1);
const char* val = envGetConfigParamPtr(&_envParam);
const char* val = getenv(key.c_str());
return (val != NULL) || _properties->hasProperty(key);
}

View File

@@ -31,6 +31,8 @@
#include <shareLib.h>
union osiSockAddr; // defined in osiSock;
namespace epics {
namespace pvAccess {
@@ -135,6 +137,17 @@ public:
* @return environment variable value as std::string or default value if it does not exist.
*/
virtual std::string getPropertyAsString(const std::string &name, const std::string &defaultValue) = 0;
/**
* Fetch and parse as a socket address and port number (address family set accordingly).
* At present only numeric addresses are parsed (eg. "127.0.0.1:4242").
*
* The storage pointed to be addr should be initialized with a default value, or zeroed.
*
* @param name name of the environment variable to return.
* @pram addr pointer to the address struct to be filled in
* @return true if addr now contains an address, false otherwise
*/
virtual bool getPropertyAsAddress(const std::string& name, osiSockAddr* addr) = 0;
virtual bool hasProperty(const std::string &name) = 0;
};
@@ -149,10 +162,10 @@ public:
float getPropertyAsFloat(const std::string &name, const float defaultValue);
float getPropertyAsDouble(const std::string &name, const double defaultValue);
std::string getPropertyAsString(const std::string &name, const std::string &defaultValue);
bool getPropertyAsAddress(const std::string& name, osiSockAddr* addr);
bool hasProperty(const std::string &name);
std::auto_ptr<Properties> _properties;
private:
ENV_PARAM _envParam;
std::istringstream _ibuffer;
std::ostringstream _obuffer;

View File

@@ -44,7 +44,7 @@ TESTSCRIPTS_HOST += $(TESTS:%=%.t)
#testHarness_SRCS += namedLockPatternTest.cpp
#TESTS += namedLockPatternTest
#TESTPROD_HOST += configurationTest
#configurationTest_SRCS += configurationTest.cpp
TESTPROD_HOST += configurationTest
configurationTest_SRCS += configurationTest.cpp
#testHarness_SRCS += configurationTest.cpp
#TESTS += configurationTest
TESTS += configurationTest

View File

@@ -4,13 +4,20 @@
*/
#include <pv/configuration.h>
#include <pv/CDRMonitor.h>
#include <iostream>
#include <string>
#include <memory>
#include <stdlib.h>
#include <epicsAssert.h>
#include <epicsExit.h>
#include <iostream>
#include <string>
#include <stdlib.h>
#include <envDefs.h>
#include <osiSock.h>
#include <epicsUnitTest.h>
#include <testMain.h>
#ifdef _WIN32
void setenv(char * a, char * b, int c)
@@ -25,62 +32,102 @@ using namespace epics::pvAccess;
using namespace epics::pvData;
using namespace std;
int main(int argc, char *argv[])
static void showEnv(const char *name)
{
SystemConfigurationImpl* configuration = new SystemConfigurationImpl();
bool boolProperty = configuration->getPropertyAsBoolean("boolProperty", true);
assert(boolProperty == true);
testDiag("%s = \"%s\"", name, getenv(name));
}
int32 intProperty = configuration->getPropertyAsInteger("intProperty", 1);
assert(intProperty == 1);
static void setEnv(const char *name, const char *val)
{
epicsEnvSet(name, val);
testDiag("%s = \"%s\"", name, getenv(name));
}
float floatProperty = configuration->getPropertyAsFloat("floatProperty", 3);
assert(floatProperty == 3);
static void showAddr(const osiSockAddr& addr)
{
char buf[40];
sockAddrToDottedIP(&addr.sa, buf, sizeof(buf));
testDiag("%s", buf);
}
double doubleProperty = configuration->getPropertyAsDouble("doubleProperty", -3);
assert(doubleProperty == -3);
#define TESTVAL(TYPE, VAL1, VAL2, VAL1S) do {\
showEnv(#TYPE "Property"); \
testOk1(configuration->getPropertyAs##TYPE(#TYPE "Property", VAL1) == VAL1); \
testOk1(configuration->getPropertyAs##TYPE(#TYPE "Property", VAL2) == VAL2); \
setEnv(#TYPE "Property", VAL1S); \
testOk1(configuration->getPropertyAs##TYPE(#TYPE "Property", VAL1) == VAL1); \
testOk1(configuration->getPropertyAs##TYPE(#TYPE "Property", VAL2) == VAL1); \
} while(0)
string stringProperty = configuration->getPropertyAsString("stringProperty", "string");
assert(stringProperty == string("string"));
ConfigurationProviderImpl* configProvider = ConfigurationFactory::getProvider();
configProvider->registerConfiguration("conf1",static_cast<Configuration*>(configuration));
MAIN(configurationTest)
{
testPlan(35);
testDiag("Default configuration");
Configuration::shared_pointer configuration(new SystemConfigurationImpl());
SystemConfigurationImpl* configurationOut = static_cast<SystemConfigurationImpl*>(configProvider->getConfiguration("conf1"));
assert(configurationOut == configuration);
TESTVAL(String, "one", "two", "one");
TESTVAL(Boolean, true, false, "true");
TESTVAL(Integer, 100, 321, "100");
TESTVAL(Float, 42.0e3, 44.0e3, "42.0e3");
TESTVAL(Double, 42.0e3, 44.0e3, "42.0e3");
intProperty = configuration->getPropertyAsInteger("intProperty", 2);
assert(intProperty == 1);
testDiag("IP Address w/o default or explicit port");
floatProperty = configuration->getPropertyAsFloat("floatProperty", 4);
assert(floatProperty == 3);
showEnv("AddressProperty");
osiSockAddr addr;
memset(&addr, 0, sizeof(addr));
addr.ia.sin_family = AF_INET+1; // something not IPv4
addr.ia.sin_port = htons(42);
doubleProperty = configuration->getPropertyAsDouble("doubleProperty", -4);
assert(doubleProperty == -3);
testOk1(configuration->getPropertyAsAddress("AddressProperty", &addr)==false);
setEnv("AddressProperty", "127.0.0.1"); // no port
testOk1(configuration->getPropertyAsAddress("AddressProperty", &addr)==true);
showAddr(addr);
stringProperty = configuration->getPropertyAsString("stringProperty", "string1");
assert(stringProperty == string("string"));
testOk1(addr.ia.sin_family==AF_INET);
testOk1(ntohl(addr.ia.sin_addr.s_addr)==INADDR_LOOPBACK);
testOk1(ntohs(addr.ia.sin_port)==0);
setenv("boolProperty1", "1", 1);
boolProperty = configuration->getPropertyAsInteger("boolProperty1", 0);
assert(boolProperty == true);
testDiag("IP Address w/ default port");
setenv("intProperty1", "45", 1);
intProperty = configuration->getPropertyAsInteger("intProperty1", 2);
assert(intProperty == 45);
memset(&addr, 0, sizeof(addr));
addr.ia.sin_family = AF_INET;
addr.ia.sin_port = htons(42);
setenv("floatProperty1", "22", 1);
floatProperty = configuration->getPropertyAsFloat("floatProperty1", 3);
assert(floatProperty == 22);
testOk1(configuration->getPropertyAsAddress("AddressProperty", &addr)==true);
showAddr(addr);
setenv("dobuleProperty1", "42", 1);
doubleProperty = configuration->getPropertyAsDouble("dobuleProperty1", -3);
assert(doubleProperty == 42);
testOk1(addr.ia.sin_family==AF_INET);
testOk1(ntohl(addr.ia.sin_addr.s_addr)==INADDR_LOOPBACK);
testOk1(ntohs(addr.ia.sin_port)==42);
if(configProvider) delete configProvider;
epicsExitCallAtExits();
CDRMonitor::get().show(stdout, true);
return 0;
testDiag("IP Address w/ default and explicit port");
setEnv("AddressProperty", "127.0.0.1:43"); // no port
testOk1(configuration->getPropertyAsAddress("AddressProperty", &addr)==true);
showAddr(addr);
memset(&addr, 0, sizeof(addr));
addr.ia.sin_family = AF_INET;
addr.ia.sin_port = htons(42);
testOk1(configuration->getPropertyAsAddress("AddressProperty", &addr)==true);
showAddr(addr);
testOk1(addr.ia.sin_family==AF_INET);
testOk1(ntohl(addr.ia.sin_addr.s_addr)==INADDR_LOOPBACK);
testOk1(ntohs(addr.ia.sin_port)==43);
testDiag("register with global configuration listings");
ConfigurationProvider::shared_pointer configProvider(ConfigurationFactory::getProvider());
configProvider->registerConfiguration("conf1", configuration);
Configuration::shared_pointer configurationOut(configProvider->getConfiguration("conf1"));
testOk1(configurationOut.get() == configuration.get());
return testDone();
}