Files
pvAccess/pvAccessApp/remote/simpleChannelSearchManagerImpl.cpp
2011-11-08 14:34:45 +01:00

298 lines
8.2 KiB
C++

/**
* Copyright - See the COPYRIGHT that is included with this distribution.
* pvAccessCPP is distributed subject to a Software License Agreement found
* in file LICENSE that is included with this distribution.
*/
#include <pv/simpleChannelSearchManagerImpl.h>
#include <pv/caConstants.h>
#include <pv/blockingUDP.h>
#include <stdlib.h>
#include <time.h>
#include <pv/timeStamp.h>
#include <vector>
using namespace std;
using namespace epics::pvData;
namespace epics {
namespace pvAccess {
// Absolute offset (in bytes) of the 16-bit search-instance count in the send buffer.
// It follows the message header, the 32-bit sequence number and the QoS byte,
// in the order written by initializeSendBuffer().
const int SimpleChannelSearchManagerImpl::DATA_COUNT_POSITION = CA_MESSAGE_HEADER_SIZE + sizeof(int32)/sizeof(int8) + 1;
// Absolute offset (in bytes) of the 32-bit payload-size field in the send buffer.
// It follows the four leading single-byte fields written by initializeSendBuffer().
const int SimpleChannelSearchManagerImpl::PAYLOAD_POSITION = sizeof(int16)/sizeof(int8) + 2;
// Base period of the periodic search timer, in seconds.
// The constructor adds a random jitter of +/- PERIOD_JITTER_MS milliseconds,
// i.e. 225ms +/- 25ms.
const double SimpleChannelSearchManagerImpl::ATOMIC_PERIOD = 0.225;
// Maximum absolute jitter (in milliseconds) applied to ATOMIC_PERIOD.
const int SimpleChannelSearchManagerImpl::PERIOD_JITTER_MS = 25;
// Counter value assigned to every registered search instance by boost().
const int SimpleChannelSearchManagerImpl::BOOST_VALUE = 1;
// Upper limit of the per-instance counter used by callback(); once reached, the
// counter is reset to MAX_FALLBACK_COUNT_VALUE.
// must be power of two (so that search is done)
const int SimpleChannelSearchManagerImpl::MAX_COUNT_VALUE = 1 << 7;
// Value the per-instance counter falls back to after reaching MAX_COUNT_VALUE.
const int SimpleChannelSearchManagerImpl::MAX_FALLBACK_COUNT_VALUE = (1 << 6) + 1;
// Number of frames callback() sends before pausing for DELAY_BETWEEN_FRAMES_MS.
const int SimpleChannelSearchManagerImpl::MAX_FRAMES_AT_ONCE = 10;
// Pause (in milliseconds) inserted by callback() after MAX_FRAMES_AT_ONCE frames.
const int SimpleChannelSearchManagerImpl::DELAY_BETWEEN_FRAMES_MS = 50;
// Creates the search manager for the given context: prepares an empty search
// message and schedules the periodic search timer with a randomized period.
SimpleChannelSearchManagerImpl::SimpleChannelSearchManagerImpl(Context::shared_pointer const & context) :
    m_context(context),
    m_canceled(),
    m_sequenceNumber(0),
    m_sendBuffer(MAX_UDP_SEND),
    m_channels(),
    m_timerNode(*this),
    m_lastTimeSent(),
    m_mockTransportSendControl(),
    m_mutex()
{
    // start with a ready-to-fill search message
    initializeSendBuffer();

    // seed the C random generator from the wall clock
    srand(time(NULL));

    // spread the clients in time: jitter is uniformly picked from
    // [-PERIOD_JITTER_MS, +PERIOD_JITTER_MS] milliseconds
    const int jitterMs = rand() % (2*PERIOD_JITTER_MS + 1) - PERIOD_JITTER_MS;
    const double period = ATOMIC_PERIOD + jitterMs / 1000.0;

    // first delay and repeat period are the same
    context->getTimer()->schedulePeriodic(m_timerNode, period, period);

    //new Thread(this, "pvAccess immediate-search").start();
}
// Destroying the manager cancels it (see cancel()).
SimpleChannelSearchManagerImpl::~SimpleChannelSearchManagerImpl()
{
    this->cancel();
}
// Marks the manager as canceled and cancels its timer node.
// Only the first call has any effect; later calls return without doing anything.
void SimpleChannelSearchManagerImpl::cancel()
{
    Lock guard(m_mutex);

    if (!m_canceled.get())
    {
        m_canceled.set();
        m_timerNode.cancel();
    }
}
// Returns the number of search instances currently registered.
int32_t SimpleChannelSearchManagerImpl::registeredCount()
{
    Lock guard(m_channelMutex);
    const int32_t count = static_cast<int32_t>(m_channels.size());
    return count;
}
// Adds (or replaces) the given search instance in the set of channels being
// searched for and resets its search counter to 1.
// Does nothing once the manager has been canceled.
void SimpleChannelSearchManagerImpl::registerSearchInstance(SearchInstance::shared_pointer const & channel)
{
    const bool canceled = m_canceled.get();
    if (canceled)
        return;

    Lock guard(m_channelMutex);

    // an already registered instance with the same ID is overridden
    const pvAccessID id = channel->getSearchInstanceID();
    m_channels[id] = channel;

    // restart the counter of this instance
    channel->getUserValue() = 1;
}
// Removes the given search instance from the set of channels being searched for.
// An instance that is not registered is silently ignored.
void SimpleChannelSearchManagerImpl::unregisterSearchInstance(SearchInstance::shared_pointer const & channel)
{
    Lock guard(m_channelMutex);

    // erase-by-key is a no-op when the ID is not present
    m_channels.erase(channel->getSearchInstanceID());
}
// Handles a search response for the channel identified by cid.
//
// cid           - ID of the search instance the response refers to
// seqNo         - sequence number of the response (not used here)
// minorRevision - forwarded unchanged to SearchInstance::searchResponse()
// serverAddress - forwarded unchanged to SearchInstance::searchResponse()
//
// If the instance is still registered it is removed from the search list and
// then notified. Otherwise the channel is looked up in the context so that
// duplicate responses can still be reported to it; nothing is done when the
// context no longer exists or does not know the channel.
// The notification is always made with m_channelMutex released.
void SimpleChannelSearchManagerImpl::searchResponse(pvAccessID cid, int32_t seqNo, int8_t minorRevision, osiSockAddr* serverAddress)
{
    Lock guard(m_channelMutex);
    std::map<pvAccessID,SearchInstance::shared_pointer>::iterator channelsIter = m_channels.find(cid);
    if(channelsIter == m_channels.end())
    {
        guard.unlock();

        // the context is held weakly; it may already be gone
        Context::shared_pointer context = m_context.lock();
        if (!context)
            return;

        // minor hack to enable duplicate reports
        SearchInstance::shared_pointer si = std::tr1::dynamic_pointer_cast<SearchInstance>(context->getChannel(cid));
        if (si)
            si->searchResponse(minorRevision, serverAddress);
    }
    else
    {
        // keep the instance alive after it is removed from the map
        SearchInstance::shared_pointer si = channelsIter->second;

        // remove from search list (reuse the iterator, no second lookup)
        m_channels.erase(channelsIter);
        guard.unlock();

        // then notify SearchInstance
        si->searchResponse(minorRevision, serverAddress);
    }
}
void SimpleChannelSearchManagerImpl::beaconAnomalyNotify()
{
boost();
callback();
}
// Resets m_sendBuffer to an empty search message carrying a new sequence
// number. The payload-size and count fields written here are later updated in
// place by generateSearchRequestMessage().
void SimpleChannelSearchManagerImpl::initializeSendBuffer()
{
    // original author's note: considered OK for now, since the sequence
    // number is only modified here
    m_sequenceNumber++;

    // start a new message from scratch
    m_sendBuffer.clear();

    // leading single-byte fields
    m_sendBuffer.putByte(CA_MAGIC);
    m_sendBuffer.putByte(CA_VERSION);
    // data + 7-bit endianness
    m_sendBuffer.putByte((EPICS_BYTE_ORDER == EPICS_ENDIAN_BIG) ? 0x80 : 0x00);
    // command: search
    m_sendBuffer.putByte(static_cast<int8_t>(3));

    // initial ("zero") payload size: 4-byte sequence number + 1 QoS byte
    m_sendBuffer.putInt(sizeof(int32_t)/sizeof(int8_t) + 1);

    m_sendBuffer.putInt(m_sequenceNumber);

    // the default QoS is always sent (a reply-required variant exists only as
    // commented-out code in the original)
    m_sendBuffer.putByte(static_cast<int8_t>(QOS_DEFAULT));

    // number of search instances in the message, none yet
    m_sendBuffer.putShort(static_cast<int16_t>(0));
}
void SimpleChannelSearchManagerImpl::flushSendBuffer()
{
Lock guard(m_mutex);
Transport::shared_pointer tt = m_context.lock()->getSearchTransport();
BlockingUDPTransport::shared_pointer ut = std::tr1::static_pointer_cast<BlockingUDPTransport>(tt);
ut->send(&m_sendBuffer); // TODO
initializeSendBuffer();
}
// Appends a search entry (instance ID + name) for the given channel to
// requestMessage and updates the payload-size and count fields in place.
//
// channel        - search instance to add
// requestMessage - search message being built
// control        - passed to SerializeHelper::serializeString()
//
// Returns false, leaving the message untouched, when the entry might not fit
// into the remaining space; true once the entry has been added.
bool SimpleChannelSearchManagerImpl::generateSearchRequestMessage(SearchInstance::shared_pointer const & channel,
        ByteBuffer* requestMessage, TransportSendControl* control)
{
    // count the message will carry once this entry is added
    const epics::pvData::int16 newCount =
        static_cast<epics::pvData::int16>(requestMessage->getShort(DATA_COUNT_POSITION) + 1);

    // note: a limit on the batch count (MAX_SEARCH_BATCH_COUNT) is disabled
    // in the original code

    const epics::pvData::String& name = channel->getSearchInstanceName();

    // not nice... space needed: 4-byte ID, plus 1 + 4 bytes reserved for the
    // string size, plus the characters of the name
    const int idSize = sizeof(int32)/sizeof(int8);
    const int stringSize = 1 + sizeof(int32)/sizeof(int8) + name.length();
    const int addedPayloadSize = idSize + stringSize;

    const int available = static_cast<int>(requestMessage->getRemaining());
    if (available < addedPayloadSize)
        return false;

    // the entry itself
    requestMessage->putInt(channel->getSearchInstanceID());
    SerializeHelper::serializeString(name, requestMessage, control);

    // patch the payload size (everything after the header) and the count
    requestMessage->putInt(PAYLOAD_POSITION, requestMessage->getPosition() - CA_MESSAGE_HEADER_SIZE);
    requestMessage->putShort(DATA_COUNT_POSITION, newCount);

    return true;
}
// Adds the given channel to the internal send buffer, sending the buffer when
// it is full and/or when a flush is requested.
//
// channel       - search instance to add
// allowNewFrame - when the buffer was full and had to be sent, retry adding
//                 the channel to the fresh buffer
// flush         - send the buffer after the channel has been handled
//
// Returns true when the buffer was sent at least once during this call.
bool SimpleChannelSearchManagerImpl::generateSearchRequestMessage(SearchInstance::shared_pointer const & channel,
        bool allowNewFrame, bool flush)
{
    Lock guard(m_mutex);

    const bool fitted = generateSearchRequestMessage(channel, &m_sendBuffer, &m_mockTransportSendControl);
    if (fitted)
    {
        // a frame goes out only if the caller asked for it
        if (flush)
            flushSendBuffer();
        return flush;
    }

    // buffer full: send what has been collected so far
    flushSendBuffer();

    // and, if permitted, put the channel into the fresh frame
    if (allowNewFrame)
        generateSearchRequestMessage(channel, &m_sendBuffer, &m_mockTransportSendControl);

    if (flush)
        flushSendBuffer();

    return true;
}
void SimpleChannelSearchManagerImpl::boost()
{
Lock guard(m_channelMutex);
std::map<pvAccessID,SearchInstance::shared_pointer>::iterator channelsIter = m_channels.begin();
for(; channelsIter != m_channels.end(); channelsIter++)
{
int32_t& userValue = channelsIter->second->getUserValue();
userValue = BOOST_VALUE;
}
}
void SimpleChannelSearchManagerImpl::callback()
{
// high-frequency beacon anomaly trigger guard
{
Lock guard(m_mutex);
epics::pvData::TimeStamp now;
now.getCurrent();
int64_t nowMS = now.getMilliseconds();
if (nowMS - m_lastTimeSent < 100)
return;
m_lastTimeSent = nowMS;
}
int frameSent = 0;
vector<SearchInstance::shared_pointer> toSend;
{
Lock guard(m_channelMutex);
toSend.reserve(m_channels.size());
std::map<pvAccessID,SearchInstance::shared_pointer>::iterator channelsIter = m_channels.begin();
for(; channelsIter != m_channels.end(); channelsIter++)
toSend.push_back(channelsIter->second);
}
vector<SearchInstance::shared_pointer>::iterator siter = toSend.begin();
for (; siter != toSend.end(); siter++)
{
int32_t& countValue = (*siter)->getUserValue();
bool skip = !isPowerOfTwo(countValue);
if (countValue == MAX_COUNT_VALUE)
countValue = MAX_FALLBACK_COUNT_VALUE;
else
countValue++;
// back-off
if (skip)
continue;
if (generateSearchRequestMessage(*siter, true, false))
frameSent++;
if (frameSent == MAX_FRAMES_AT_ONCE)
{
epicsThreadSleep(DELAY_BETWEEN_FRAMES_MS/(double)1000.0);
frameSent = 0;
}
}
flushSendBuffer();
}
// Returns true when x is a positive power of two (1, 2, 4, 8, ...).
bool SimpleChannelSearchManagerImpl::isPowerOfTwo(int32_t x)
{
    // zero and negative values never qualify
    if (x <= 0)
        return false;

    // a power of two has a single bit set, so clearing the lowest set bit
    // must leave nothing behind
    const int32_t withoutLowestBit = x & (x - 1);
    return withoutLowestBit == 0;
}
// Intentionally empty: nothing is done here.
// NOTE(review): presumably the stop notification of the timer callback
// interface used by m_timerNode - confirm against the class header.
void SimpleChannelSearchManagerImpl::timerStopped()
{
}
}}