first successful message test

This commit is contained in:
Matej Sekoranja
2011-01-09 20:07:03 +01:00
parent 8f0b4d8594
commit 5c5305635b
4 changed files with 239 additions and 46 deletions

View File

@@ -1,3 +1,4 @@
/* testRemoteClientImpl.cpp */
/* Author: Matej Sekoranja Date: 2011.1.1 */
@@ -13,6 +14,8 @@
#include <caConstants.h>
#include <timer.h>
#include <blockingUDP.h>
#include <blockingTCP.h>
#include <namedLockPattern.h>
#include <inetAddressUtil.h>
#include <hexDump.h>
#include <remote.h>
@@ -428,7 +431,38 @@ typedef std::map<pvAccessID, ResponseRequest*> IOIDResponseRequestMap;
class ClientContextImpl;
class DebugResponse : public ResponseHandler, private epics::pvData::NoDefaultMethods {
public:
/**
* @param context
*/
DebugResponse()
{
}
virtual ~DebugResponse() {
}
virtual void handleResponse(osiSockAddr* responseFrom,
Transport* transport, int8 version, int8 command,
int payloadSize, epics::pvData::ByteBuffer* payloadBuffer)
{
char ipAddrStr[48];
std::cout << "ole" << std::endl;
ipAddrToA(&responseFrom->ia, ipAddrStr, sizeof(ipAddrStr));
std::cout << "ole2" << std::endl;
ostringstream prologue;
prologue<<"Message [0x"<<hex<<(int)command<<", v0x"<<hex;
prologue<<(int)version<<"] received from "<<ipAddrStr;
std::cout << "ole / " << prologue.str() << std::endl;
hexDump(prologue.str(), "received",
(const int8*)payloadBuffer->getArray(),
payloadBuffer->getPosition(), payloadSize);
}
};
/**
* CA response handler - main handler which dispatches responses to appripriate handlers.
@@ -458,7 +492,7 @@ class ClientResponseHandler : public ResponseHandler, private epics::pvData::NoD
* @param context
*/
ClientResponseHandler(ClientContextImpl* context) : m_context(context) {
static ResponseHandler* badResponse = 0; //new BadResponse(context);
static ResponseHandler* badResponse = new DebugResponse();
static ResponseHandler* dataResponse = 0; //new DataResponseHandler(context);
#define HANDLER_COUNT 28
@@ -497,6 +531,8 @@ class ClientResponseHandler : public ResponseHandler, private epics::pvData::NoD
Transport* transport, int8 version, int8 command,
int payloadSize, ByteBuffer* payloadBuffer)
{
int c = command+0;
std::cout << "received " << c << std::endl;
if (command < 0 || command >= HANDLER_COUNT)
{
// TODO context.getLogger().fine("Invalid (or unsupported) command: " + command + ".");
@@ -509,12 +545,28 @@ class ClientResponseHandler : public ResponseHandler, private epics::pvData::NoD
}
// delegate
m_handlerTable[command]->handleResponse(responseFrom, transport, version, command, payloadSize, payloadBuffer);
m_handlerTable[c]->handleResponse(responseFrom, transport, version, command, payloadSize, payloadBuffer);
}
};
class TCI : public TransportSendControl {
public:
virtual void flushSerializeBuffer() {
}
virtual void ensureBuffer(int size) {
}
virtual void startMessage(int8 command, int ensureCapacity){}
virtual void endMessage() {}
virtual void flush(bool lastMessageCompleted) {}
virtual void setRecipient(const osiSockAddr& sendTo) {}
};
#include <arrayFIFO.h>
@@ -548,19 +600,37 @@ class BaseSearchInstance : public SearchInstance
virtual void removeAndUnsetListOwnership() {}
virtual int getOwnerIndex() { return 0; }
virtual bool generateSearchRequestMessage(ByteBuffer* buffer, TransportSendControl* control) { return false; };
virtual bool generateSearchRequestMessage(ByteBuffer* requestMessage, TransportSendControl* control)
{
const int DATA_COUNT_POSITION = CA_MESSAGE_HEADER_SIZE + sizeof(int32)/sizeof(int8) + 1;
const int PAYLOAD_POSITION = sizeof(int16)/sizeof(int8) + 2;
int16 dataCount = requestMessage->getShort(DATA_COUNT_POSITION);
dataCount++;
if(dataCount >= MAX_SEARCH_BATCH_COUNT)
{
return false;
}
const string name = getChannelName();
// not nice...
const int addedPayloadSize = sizeof(int32)/sizeof(int8) + (1 + sizeof(int32)/sizeof(int8) + name.length());
if(requestMessage->getRemaining() < addedPayloadSize)
{
return false;
}
requestMessage->putInt(getChannelID());
SerializeHelper::serializeString(name, requestMessage, control);
requestMessage->putInt(PAYLOAD_POSITION, requestMessage->getPosition() - CA_MESSAGE_HEADER_SIZE);
requestMessage->putShort(DATA_COUNT_POSITION, dataCount);
return true;
};
};
class ChannelSearchManager { // tODO no default, etc.
public:
virtual void registerChannel(SearchInstance* channel) = 0;
virtual void unregisterChannel(SearchInstance* channel) = 0;
};
class BlockingTCPConnector;
class NamedLockPattern;
class BeaconHandlerImpl;
@@ -591,9 +661,52 @@ enum ContextState {
};
class ClientContextImpl : public ClientContext
class ClientContextImpl : public ClientContext,
public Context /* TODO */
{
class ChannelSearchManager { // tODO no default, etc.
ClientContextImpl* _context;
public:
ChannelSearchManager(ClientContextImpl* context):
_context(context) {
}
virtual void registerChannel(SearchInstance* channel) {
ByteBuffer sendBuffer(100, EPICS_ENDIAN_BIG);
// new buffer
sendBuffer.clear();
sendBuffer.putShort(CA_MAGIC_AND_VERSION);
sendBuffer.putByte((int8)0); // data
sendBuffer.putByte((int8)3); // search
sendBuffer.putInt(5); // "zero" payload
sendBuffer.putInt(0);
sendBuffer.putByte((int8)0);
sendBuffer.putShort((int16)0); // count
TCI tci;
channel->generateSearchRequestMessage(&sendBuffer, &tci);
std::cout << "sending..." << sendBuffer.getPosition() << " bytes." << std::endl;
_context->getSearchTransport()->send(&sendBuffer);
};
virtual void unregisterChannel(SearchInstance* channel) {};
};
/**
* Implementation of CAJ JCA <code>Channel</code>.
* @author <a href="mailto:matej.sekoranjaATcosylab.com">Matej Sekoranja</a>
@@ -1196,7 +1309,6 @@ class ChannelImpl :
// TODO
}
virtual void getField(GetFieldRequester *requester,epics::pvData::String subField)
{
requester->getDone(getStatusCreate()->getStatusOK(),m_pvStructure->getSubField(subField)->getField());
@@ -1394,7 +1506,25 @@ class ChannelImpl :
Lock lock(&m_contextMutex);
return m_provider;
}
virtual Timer* getTimer()
{
Lock lock(&m_contextMutex);
return m_timer;
}
virtual TransportRegistry* getTransportRegistry()
{
Lock lock(&m_contextMutex);
return m_transportRegistry;
}
virtual BlockingUDPTransport* getSearchTransport()
{
Lock lock(&m_contextMutex);
return m_searchTransport;
}
virtual void initialize() {
Lock lock(&m_contextMutex);
@@ -1484,36 +1614,92 @@ class ChannelImpl :
void internalInitialize() {
m_timer = new Timer("pvAccess-client timer", lowPriority);
/* TODO
connector = new BlockingTCPConnector(this, receiveBufferSize, beaconPeriod);
transportRegistry = new TransportRegistry();
namedLocker = new NamedLockPattern();
*/
m_connector = new BlockingTCPConnector(this, m_receiveBufferSize, m_beaconPeriod);
m_transportRegistry = new TransportRegistry();
m_namedLocker = new NamedLockPattern<String>();
// setup UDP transport
initializeUDPTransport();
// TODO
// setup search manager
//channelSearchManager = new ChannelSearchManager(this);
m_channelSearchManager = new ChannelSearchManager(this);
}
void initializeUDPTransport() {
// TODO
}
/**
* Initialized UDP transport (broadcast socket and repeater connection).
*/
void initializeUDPTransport() {
// setup UDP transport
try
{
// where to bind (listen) address
osiSockAddr listenLocalAddress;
listenLocalAddress.ia.sin_family = AF_INET;
listenLocalAddress.ia.sin_port = htons(m_broadcastPort);
listenLocalAddress.ia.sin_addr.s_addr = htonl(INADDR_ANY);
// where to send address
InetAddrVector* broadcastAddresses = getSocketAddressList("192.168.1.255", m_broadcastPort);
// TODO getBroadcastAddresses(broadcastPort)
/// TOD !!!! addresses !!!!! by pointer and not copied
BlockingUDPConnector* broadcastConnector = new BlockingUDPConnector(true, broadcastAddresses, true);
m_broadcastTransport = (BlockingUDPTransport*)broadcastConnector->connect(
0, new ClientResponseHandler(this),
listenLocalAddress, CA_MINOR_PROTOCOL_REVISION,
CA_DEFAULT_PRIORITY);
BlockingUDPConnector* searchConnector = new BlockingUDPConnector(false, broadcastAddresses, true);
// undefined address
osiSockAddr undefinedAddress;
undefinedAddress.ia.sin_family = AF_INET;
undefinedAddress.ia.sin_port = htons(0);
undefinedAddress.ia.sin_addr.s_addr = htonl(INADDR_ANY);
m_searchTransport = (BlockingUDPTransport*)searchConnector->connect(
0, new ClientResponseHandler(this),
undefinedAddress, CA_MINOR_PROTOCOL_REVISION,
CA_DEFAULT_PRIORITY);
// set broadcast address list
if (!m_addressList.empty())
{
// if auto is true, add it to specified list
InetAddrVector* appendList = 0;
if (m_autoAddressList)
appendList = m_broadcastTransport->getSendAddresses();
InetAddrVector* list = getSocketAddressList(m_addressList, m_broadcastPort, appendList);
// TODO delete !!!!
if (list && list->size()) {
m_broadcastTransport->setBroadcastAddresses(list);
m_searchTransport->setBroadcastAddresses(list);
}
}
m_broadcastTransport->start();
m_searchTransport->start();
}
catch (...)
{
// TODO
}
}
void internalDestroy() {
// stop searching
/* TODO
if (m_channelSearchManager)
channelSearchManager->destroy();
*/
delete m_channelSearchManager; //->destroy();
// stop timer
if (m_timer)
delete m_timer;
//
// cleanup
//
@@ -1521,13 +1707,15 @@ class ChannelImpl :
// this will also close all CA transports
destroyAllChannels();
// close broadcast transport
/* TODO
// TODO destroy !!!
if (m_broadcastTransport)
m_broadcastTransport->destroy(true);
if (m_searchTransport != null)
m_searchTransport->destroy(true);
*/
delete m_broadcastTransport; //->destroy(true);
if (m_searchTransport)
delete m_searchTransport; //->destroy(true);
if (m_namedLocker) delete m_namedLocker;
if (m_transportRegistry) delete m_transportRegistry;
if (m_connector) delete m_connector;
m_provider->destroy();
delete m_version;
@@ -1779,7 +1967,7 @@ class ChannelImpl :
/**
* Context instance.
*/
NamedLockPattern* m_namedLocker;
NamedLockPattern<String>* m_namedLocker;
/**
* Context instance.
@@ -2077,13 +2265,17 @@ int main(int argc,char *argv[])
{
ClientContextImpl* context = new ClientContextImpl();
context->printInfo();
context->initialize();
context->printInfo();
epicsThreadSleep ( 1.0 );
ChannelFindRequesterImpl findRequester;
context->getProvider()->channelFind("something", &findRequester);
//ChannelFindRequesterImpl findRequester;
//context->getProvider()->channelFind("something", &findRequester);
ChannelRequesterImpl channelRequester;
Channel* channel = context->getProvider()->createChannel("test", &channelRequester);
Channel* channel = context->getProvider()->createChannel("structureArrayTest", &channelRequester);
channel->printInfo();
/*
GetFieldRequesterImpl getFieldRequesterImpl;
@@ -2122,6 +2314,7 @@ int main(int argc,char *argv[])
monitor->destroy();
*/
epicsThreadSleep ( 10.0 );
channel->destroy();
context->destroy();