some handlers added

This commit is contained in:
Gasper Jansa
2011-02-07 22:14:39 +01:00
parent bba69beb05
commit 0c855c7326
4 changed files with 404 additions and 52 deletions

View File

@@ -48,9 +48,9 @@ namespace epics {
_handlerTable[0] = new NoopResponse(context, "Beacon");
_handlerTable[1] = new ConnectionValidationHandler(context);
_handlerTable[2] = new EchoHandler(context);
_handlerTable[3] = badResponse;
_handlerTable[3] = new SearchHandler(context);
_handlerTable[4] = badResponse;
_handlerTable[5] = badResponse;
_handlerTable[5] = new IntrospectionSearchHandler(context);
_handlerTable[6] = badResponse;
_handlerTable[7] = badResponse;
_handlerTable[8] = badResponse;
@@ -79,6 +79,8 @@ namespace epics {
delete _handlerTable[0];
delete _handlerTable[1];
delete _handlerTable[2];
delete _handlerTable[3];
delete _handlerTable[5];
delete _handlerTable[27];
delete[] _handlerTable;
}
@@ -165,5 +167,254 @@ namespace epics {
transport->enqueueSendRequest(echoReply);
}
void IntrospectionSearchHandler::handleResponse(osiSockAddr* responseFrom,
Transport* transport, int8 version, int8 command,
int payloadSize, epics::pvData::ByteBuffer* payloadBuffer) {
AbstractServerResponseHandler::handleResponse(responseFrom,
transport, version, command, payloadSize, payloadBuffer);
THROW_BASE_EXCEPTION("not implemented");
}
SearchHandler::SearchHandler(ServerContextImpl* context) :
AbstractServerResponseHandler(context, "Introspection search request")
{
_provider = context->getChannelProvider();
_objectPool = new ChannelFindRequesterImplObjectPool(context);
}
SearchHandler::~SearchHandler()
{
if(_objectPool) delete _objectPool;
}
void SearchHandler::handleResponse(osiSockAddr* responseFrom,
Transport* transport, int8 version, int8 command,
int payloadSize, epics::pvData::ByteBuffer* payloadBuffer) {
AbstractServerResponseHandler::handleResponse(responseFrom,
transport, version, command, payloadSize, payloadBuffer);
transport->ensureData((sizeof(int32)+sizeof(int16))/sizeof(int8)+1);
const int32 searchSequenceId = payloadBuffer->getInt();
const int8 qosCode = payloadBuffer->getByte();
const int32 count = payloadBuffer->getShort() & 0xFFFF;
const boolean responseRequired = (QOS_REPLY_REQUIRED & qosCode) != 0;
for (int32 i = 0; i < count; i++)
{
transport->ensureData(sizeof(int32)/sizeof(int8));
const int32 cid = payloadBuffer->getInt();
const String name = SerializeHelper::deserializeString(payloadBuffer, transport);
// no name check here...
_provider->channelFind(name, _objectPool->get()->set(searchSequenceId, cid, responseFrom, responseRequired));
}
}
ChannelFindRequesterImpl::ChannelFindRequesterImpl(ServerContextImpl* context, ChannelFindRequesterImplObjectPool* objectPool) :
_sendTo(NULL),
_context(context),
_objectPool(objectPool)
{}
void ChannelFindRequesterImpl::clear()
{
Lock guard(_mutex);
_sendTo = NULL;
}
ChannelFindRequesterImpl* ChannelFindRequesterImpl::set(int32 searchSequenceId, int32 cid, osiSockAddr* sendTo, boolean responseRequired)
{
Lock guard(_mutex);
_searchSequenceId = searchSequenceId;
_cid = cid;
_sendTo = sendTo;
_responseRequired = responseRequired;
return this;
}
void ChannelFindRequesterImpl::channelFindResult(epics::pvData::Status* status, ChannelFind* channelFind, boolean wasFound)
{
// TODO status
Lock guard(_mutex);
if (wasFound || _responseRequired)
{
_wasFound = wasFound;
_context->getBroadcastTransport()->enqueueSendRequest(this);
}
}
void ChannelFindRequesterImpl::lock()
{
// noop
}
void ChannelFindRequesterImpl::unlock()
{
// noop
}
void ChannelFindRequesterImpl::acquire()
{
// noop
}
void ChannelFindRequesterImpl::release()
{
// noop
}
void ChannelFindRequesterImpl::send(ByteBuffer* buffer, TransportSendControl* control)
{
int32 count = 1;
control->startMessage((int8)4, (sizeof(int32)+sizeof(int8)+128+2*sizeof(int16)+count*sizeof(int32))/sizeof(8));
Lock guard(_mutex);
buffer->putInt(_searchSequenceId);
buffer->putByte(_wasFound ? (int8)1 : (int8)0);
// NOTE: is it possible (very likely) that address is any local address ::ffff:0.0.0.0
encodeAsIPv6Address(buffer, _context->getServerInetAddress());
buffer->putShort((int16)_context->getServerPort());
buffer->putShort((int16)count);
buffer->putInt(_cid);
control->setRecipient(*_sendTo);
// return this object to the pool
_objectPool->put(this);
}
ChannelFindRequesterImplObjectPool::ChannelFindRequesterImplObjectPool(ServerContextImpl* context) :
_context(context)
{}
ChannelFindRequesterImpl* ChannelFindRequesterImplObjectPool::get()
{
Lock guard(_mutex);
const int32 count = _elements.size();
if (count == 0)
{
return new ChannelFindRequesterImpl(_context, this);
}
else
{
ChannelFindRequesterImpl* channelFindRequesterImpl = _elements.back();
_elements.pop_back();
return channelFindRequesterImpl;
}
}
void ChannelFindRequesterImplObjectPool::put(ChannelFindRequesterImpl* element)
{
Lock guard(_mutex);
element->clear();
_elements.push_back(element);
}
ChannelRequesterImpl::ChannelRequesterImpl(Transport* transport, const String channelName, const int32 cid) :
_transport(transport),
_channelName(channelName),
_cid(cid),
_status(NULL),
_channel(NULL)
{
}
void ChannelRequesterImpl::channelCreated(Status* const status, Channel* const channel)
{
Lock guard(_mutex);
_status = status;
_channel = channel;
}
void ChannelRequesterImpl::channelStateChange(Channel* constc, const Channel::ConnectionState isConnected)
{
//noop
}
String ChannelRequesterImpl::getRequesterName()
{
//TODO
// return _transport-> + "/" + _cid;
}
void ChannelRequesterImpl::message(const String message, const epics::pvData::MessageType messageType)
{
// TODO
//System.err.println("[" + messageType + "] " + message);
}
void ChannelRequesterImpl::lock()
{
//noop
}
void ChannelRequesterImpl::unlock()
{
//noop
}
void ChannelRequesterImpl::send(ByteBuffer* buffer, TransportSendControl* control)
{
/* Channel* channel;
Status* status;
{
Lock guard(_mutex);
channel = _channel;
status = _status;
}
// error response
if (channel == NULL)
{
createChannelFailedResponse(buffer, control, status);
}
// OK
else
{
ServerChannelImpl serverChannel = NULL;
try
{
// NOTE: we do not explicitly check if transport OK
ChannelHostingTransport* casTransport = static_cast<ChannelHostingTransport*>(_transport);
//
// create a new channel instance
//
int sid = casTransport->preallocateChannelSID();
try
{
//TODO
serverChannel = new ServerChannelImpl(channel, _cid, sid, casTransport->getSecurityToken());
// ack allocation and register
casTransport->registerChannel(sid, serverChannel);
} catch (...)
{
// depreallocate and rethrow
casTransport->depreallocateChannelSID(sid);
throw;
}
control->startMessage((int8)7, 2*sizeof(int32)/sizeof(int8));
buffer->putInt(_cid);
buffer->putInt(sid);
_transport->getIntrospectionRegistry()->serializeStatus(buffer, control, status);
}
catch (...)
{
errlogPrintf(errlogMinor, "Exception caught when creating channel: %s", _channelName);
createChannelFailedResponse(buffer, control,
//BaseChannelRequester.statusCreate.createStatus(StatusType.FATAL, "failed to create channel", th));
// if (serverChannel != null)
// serverChannel.destroy();
}
}*/
}
}
}

View File

@@ -133,6 +133,130 @@ namespace epics {
int payloadSize, epics::pvData::ByteBuffer* payloadBuffer);
};
/**
* Introspection search request handler.
*/
class IntrospectionSearchHandler : public AbstractServerResponseHandler
{
public:
/**
* @param context
*/
IntrospectionSearchHandler(ServerContextImpl* context) :
AbstractServerResponseHandler(context, "Search request") {
}
virtual void handleResponse(osiSockAddr* responseFrom,
Transport* transport, int8 version, int8 command,
int payloadSize, epics::pvData::ByteBuffer* payloadBuffer);
};
/**
* Introspection search request handler.
*/
class ChannelFindRequesterImplObjectPool;
class SearchHandler : public AbstractServerResponseHandler
{
public:
/**
* @param context
*/
SearchHandler(ServerContextImpl* context);
~SearchHandler();
virtual void handleResponse(osiSockAddr* responseFrom,
Transport* transport, int8 version, int8 command,
int payloadSize, epics::pvData::ByteBuffer* payloadBuffer);
private:
ChannelProvider* _provider;
ChannelFindRequesterImplObjectPool* _objectPool;
};
class ChannelFindRequesterImpl: public ChannelFindRequester, public TransportSender
{
public:
ChannelFindRequesterImpl(ServerContextImpl* context, ChannelFindRequesterImplObjectPool* objectPool);
void clear();
ChannelFindRequesterImpl* set(int32 searchSequenceId, int32 cid, osiSockAddr* sendTo, boolean responseRequired);
void channelFindResult(epics::pvData::Status* status, ChannelFind* channelFind, boolean wasFound);
void lock();
void unlock();
void acquire();
void release();
void send(ByteBuffer* buffer, TransportSendControl* control);
private:
int32 _searchSequenceId;
int32 _cid;
osiSockAddr* _sendTo;
boolean _responseRequired;
boolean _wasFound;
epics::pvData::Mutex _mutex;
ServerContextImpl* _context;
ChannelFindRequesterImplObjectPool* _objectPool;
};
class ChannelFindRequesterImplObjectPool
{
public:
ChannelFindRequesterImplObjectPool(ServerContextImpl* context);
ChannelFindRequesterImpl* get();
void put(ChannelFindRequesterImpl* element);
private:
std::vector<ChannelFindRequesterImpl*> _elements;
epics::pvData::Mutex _mutex;
ServerContextImpl* _context;
};
/**
* Create channel request handler.
*/
class CreateChannelHandler : public AbstractServerResponseHandler
{
public:
/**
* @param context
*/
CreateChannelHandler(ServerContextImpl* context) :
AbstractServerResponseHandler(context, "Create channel request") {
}
virtual void handleResponse(osiSockAddr* responseFrom,
Transport* transport, int8 version, int8 command,
int payloadSize, epics::pvData::ByteBuffer* payloadBuffer);
private:
void disconnect(Transport* transport);
};
class ServerChannelImpl;
class ChannelRequesterImpl : public ChannelRequester, public TransportSender
{
public:
ChannelRequesterImpl(Transport* transport, const String channelName, const int32 cid);
void channelCreated(Status* const status, Channel* const channel);
void channelStateChange(Channel* const c, const Channel::ConnectionState isConnected);
String getRequesterName();
void message(const String message, const epics::pvData::MessageType messageType);
void lock();
void unlock();
void send(ByteBuffer* buffer, TransportSendControl* control);
private:
Transport* _transport;
const String _channelName;
const int32 _cid;
Status* _status;
Channel* _channel;
epics::pvData::Mutex _mutex;
void createChannelFailedResponse(ByteBuffer* buffer, TransportSendControl* control, Status* const status);
};
}
}

View File

@@ -35,9 +35,7 @@ ServerContextImpl::ServerContextImpl():
_acceptor(NULL),
_transportRegistry(NULL),
_channelAccess(NULL),
//TODO CAJ_DEFAULT_PROVIDER is not defined
_channelProviderName("local"),
//_channelProviderName(CAJ_DEFAULT_PROVIDER),
_channelProviderName(CAJ_DEFAULT_PROVIDER),
_channelProvider(NULL),
_beaconServerStatusProvider(NULL)
@@ -50,7 +48,6 @@ ServerContextImpl::~ServerContextImpl()
{
if(_beaconEmitter) delete _beaconEmitter;
if(_broadcastTransport) delete _broadcastTransport;
if(_broadcastConnector) delete _broadcastConnector;
if(_acceptor) delete _acceptor;
if(_transportRegistry) delete _transportRegistry;
if(_timer) delete _timer;
@@ -166,34 +163,32 @@ void ServerContextImpl::initializeBroadcastTransport()
// where to send address
SOCKET socket = epicsSocketCreate(AF_INET, SOCK_STREAM, IPPROTO_TCP);
InetAddrVector* broadcasts = getBroadcastAddresses(socket,_broadcastPort);
if (socket == INVALID_SOCKET)
{
THROW_BASE_EXCEPTION("Failed to initialize broadcast UDP transport");
}
auto_ptr<InetAddrVector> broadcastAddresses(getBroadcastAddresses(socket,_broadcastPort));
epicsSocketDestroy(socket);
_broadcastConnector = new BlockingUDPConnector(true, true);
_broadcastTransport = static_cast<BlockingUDPTransport*>(_broadcastConnector->connect(
auto_ptr<BlockingUDPConnector> broadcastConnector(new BlockingUDPConnector(true, true));
_broadcastTransport = static_cast<BlockingUDPTransport*>(broadcastConnector->connect(
NULL, new ServerResponseHandler(this),
listenLocalAddress, CA_MINOR_PROTOCOL_REVISION,
CA_DEFAULT_PRIORITY));
_broadcastTransport->setBroadcastAddresses(broadcasts);
if(broadcasts) delete broadcasts;
_broadcastTransport->setBroadcastAddresses(broadcastAddresses.get());
// set ignore address list
if (_ignoreAddressList.length() > 0)
if (!_ignoreAddressList.empty())
{
// we do not care about the port
InetAddrVector* list = getSocketAddressList(_ignoreAddressList, 0, NULL);
if (list != NULL && list->size() > 0)
auto_ptr<InetAddrVector> list(getSocketAddressList(_ignoreAddressList, 0, NULL));
if (list.get() != NULL && list->size() > 0)
{
_broadcastTransport->setIgnoredAddresses(list);
_broadcastTransport->setIgnoredAddresses(list.get());
}
if(list) delete list;
}
// set broadcast address list
if (_beaconAddressList.length() > 0)
if (!_beaconAddressList.empty())
{
// if auto is true, add it to specified list
InetAddrVector* appendList = NULL;
@@ -202,16 +197,19 @@ void ServerContextImpl::initializeBroadcastTransport()
appendList = _broadcastTransport->getSendAddresses();
}
InetAddrVector* list = getSocketAddressList(_beaconAddressList, _broadcastPort, appendList);
if (list != NULL && list->size() > 0)
auto_ptr<InetAddrVector> list(getSocketAddressList(_beaconAddressList, _broadcastPort, appendList));
if (list.get() != NULL && list->size() > 0)
{
_broadcastTransport->setBroadcastAddresses(list);
_broadcastTransport->setBroadcastAddresses(list.get());
}
if(list) delete list;
}
_broadcastTransport->start();
}
catch (std::exception& e)
{
THROW_BASE_EXCEPTION_CAUSE("Failed to initialize broadcast UDP transport", e);
}
catch (...)
{
THROW_BASE_EXCEPTION("Failed to initialize broadcast UDP transport");
@@ -251,6 +249,7 @@ void ServerContextImpl::run(int32 seconds)
// run...
_beaconEmitter->start();
//TODO review this
if(seconds == 0)
{
_runEvent.wait();
@@ -274,7 +273,6 @@ void ServerContextImpl::shutdown()
THROW_BASE_EXCEPTION("Context already destroyed.");
}
// notify to stop running...
_runEvent.signal();
}
@@ -317,13 +315,6 @@ void ServerContextImpl::internalDestroy()
_beaconEmitter->destroy();
}
// stop timer
if (_timer != NULL)
{
//TODO there is no stop in Timer
// _timer->stop();
}
// this will also destroy all channels
destroyAllTransports();
}
@@ -338,18 +329,18 @@ void ServerContextImpl::destroyAllTransports()
}
int32 size;
Transport** transports = _transportRegistry->toArray(size);
auto_ptr<Transport*> transports(_transportRegistry->toArray(size));
if (size == 0)
{
return;
}
errlogSevPrintf(errlogMajor, "Server context still has %d transport(s) active and closing...", size);
errlogSevPrintf(errlogInfo, "Server context still has %d transport(s) active and closing...", size);
for (int i = 0; i < size; i++)
{
Transport* transport = transports[i];
Transport* transport = transports.get()[i];
try
{
transport->close(true);
@@ -365,8 +356,6 @@ void ServerContextImpl::destroyAllTransports()
errlogSevPrintf(errlogMajor, "Unhandled exception caught from client code at %s:%d.", __FILE__, __LINE__);
}
}
delete[] transports;
}
void ServerContextImpl::printInfo()
@@ -502,19 +491,17 @@ TransportRegistry* ServerContextImpl::getTransportRegistry()
return _transportRegistry;
}
//TODO what with this?
Channel* ServerContextImpl::getChannel(pvAccessID id)
{
//TODO
return NULL;
}
//TODO what with this?
Transport* ServerContextImpl::getSearchTransport()
{
//TODO
return NULL;
}
}
}

View File

@@ -326,16 +326,6 @@ private:
*/
Timer* _timer;
/**
* Reactor.
*/
//Reactor _reactor;
/**
* Leader/followers thread pool.
*/
//LeaderFollowersThreadPool _leaderFollowersThreadPool;
/**
* Broadcast transport needed for channel searches.
*/