more responses implemented

This commit is contained in:
Matej Sekoranja
2011-01-11 22:35:43 +01:00
parent d30cb10439
commit f72e495a99
4 changed files with 171 additions and 38 deletions

View File

@@ -37,6 +37,24 @@ namespace epics {
errStr);
}
int optval = _broadcast ? true : false;
int retval = ::setsockopt(socket, SOL_SOCKET, SO_BROADCAST, &optval,
sizeof(optval));
if(retval<0) errlogSevPrintf(errlogMajor,
"Error setting SO_BROADCAST: %s", strerror(errno));
printf("_broadcast: %d\n", _broadcast);
// set the socket options
//if (_reuseSocket)
// epicsSocketEnableAddressUseForDatagramFanout(socket);
optval = _reuseSocket ? true : false;
retval = ::setsockopt(socket, SOL_SOCKET, SO_REUSEADDR, &optval,
sizeof(optval));
if(retval<0) errlogSevPrintf(errlogMajor,
"Error setting SO_REUSEADDR: %s", strerror(errno));
printf("_reuseSocket: %d\n", _reuseSocket);
/* from MSDN:
* Note: If the setsockopt function is called before the bind
* function, TCP/IP options will not be checked by using TCP/IP
@@ -44,8 +62,9 @@ namespace epics {
* call will always succeed, but the bind function call can fail
* because of an early setsockopt call failing.
*/
// still we need to set SO_REUSEADDR befire bind
int retval = ::bind(socket, (sockaddr*)&(bindAddress.sa),
retval = ::bind(socket, (sockaddr*)&(bindAddress.sa),
sizeof(sockaddr));
if(retval<0) {
errlogSevPrintf(errlogMajor, "Error binding socket: %s",
@@ -53,20 +72,6 @@ namespace epics {
THROW_BASE_EXCEPTION(strerror(errno));
}
// set the socket options
int optval = _reuseSocket ? 1 : 0;
retval = ::setsockopt(socket, SOL_SOCKET, SO_REUSEADDR, &optval,
sizeof(optval));
if(retval<0) errlogSevPrintf(errlogMajor,
"Error setting SO_REUSEADDR: %s", strerror(errno));
optval = _broadcast ? 1 : 0;
retval = ::setsockopt(socket, SOL_SOCKET, SO_BROADCAST, &optval,
sizeof(optval));
if(retval<0) errlogSevPrintf(errlogMajor,
"Error setting SO_BROADCAST: %s", strerror(errno));
// sockets are blocking by default
return new BlockingUDPTransport(responseHandler, socket,

View File

@@ -533,6 +533,22 @@ namespace epics {
*/
virtual epics::pvData::Requester* getRequester() = 0;
};
/**
* @author <a href="mailto:matej.sekoranjaATcosylab.com">Matej Sekoranja</a>
* @version $Id: DataResponse.java,v 1.1 2010/05/03 14:45:39 mrkraimer Exp $
*/
class DataResponse : public ResponseRequest {
public:
/**
* Notification response.
* @param transport
* @param version
* @param payloadBuffer
*/
virtual void response(Transport* transport, int8 version, ByteBuffer* payloadBuffer) = 0;
};
}
}

View File

@@ -42,6 +42,11 @@ namespace epics {
virtual void destroyChannel(ChannelImpl* channel, bool force) = 0;
virtual ChannelImpl* createChannelInternal(String name, ChannelRequester* requester, short priority, InetAddrVector* addresses) = 0;
virtual ResponseRequest* getResponseRequest(pvAccessID ioid) = 0;
virtual pvAccessID registerResponseRequest(ResponseRequest* request) = 0;
virtual ResponseRequest* unregisterResponseRequest(ResponseRequest* request) = 0;
virtual Transport* getTransport(TransportClient* client, osiSockAddr* serverAddress, int16 minorRevision, int16 priority) = 0;

View File

@@ -23,6 +23,8 @@
#include <clientContextImpl.h>
#include <configuration.h>
#include <errlog.h>
using namespace epics::pvData;
using namespace epics::pvAccess;
@@ -411,37 +413,75 @@ typedef std::map<pvAccessID, ResponseRequest*> IOIDResponseRequestMap;
}
};
class DebugResponse : public AbstractClientResponseHandler, private epics::pvData::NoDefaultMethods {
class NoopResponse : public AbstractClientResponseHandler, private epics::pvData::NoDefaultMethods {
public:
/**
* @param context
*/
DebugResponse(ClientContextImpl* context) :
AbstractClientResponseHandler(context, "not implemented")
NoopResponse(ClientContextImpl* context, String description) :
AbstractClientResponseHandler(context, description)
{
}
virtual ~DebugResponse() {
virtual ~NoopResponse() {
}
};
class BadResponse : public AbstractClientResponseHandler, private epics::pvData::NoDefaultMethods {
public:
/**
* @param context
*/
BadResponse(ClientContextImpl* context) :
AbstractClientResponseHandler(context, "Bad response")
{
}
virtual ~BadResponse() {
}
virtual void handleResponse(osiSockAddr* responseFrom,
Transport* transport, int8 version, int8 command,
int payloadSize, epics::pvData::ByteBuffer* payloadBuffer)
{
char ipAddrStr[48];
ipAddrToDottedIP(&responseFrom->ia, ipAddrStr, sizeof(ipAddrStr));
ostringstream prologue;
prologue<<"Message [0x"<<hex<<(int)command<<", v0x"<<hex;
prologue<<(int)version<<"] received from "<< ipAddrStr;
hexDump(prologue.str(), "received",
(const int8*)payloadBuffer->getArray(),
payloadBuffer->getPosition(), payloadSize);
char ipAddrStr[48];
ipAddrToDottedIP(&responseFrom->ia, ipAddrStr, sizeof(ipAddrStr));
errlogSevPrintf(errlogInfo,
"Undecipherable message (bad response type %d) from %s.",
command, ipAddrStr);
}
};
class DataResponseHandler : public AbstractClientResponseHandler, private epics::pvData::NoDefaultMethods {
public:
/**
* @param context
*/
DataResponseHandler(ClientContextImpl* context) :
AbstractClientResponseHandler(context, "Data response")
{
}
virtual ~DataResponseHandler() {
}
virtual void handleResponse(osiSockAddr* responseFrom,
Transport* transport, int8 version, int8 command,
int payloadSize, epics::pvData::ByteBuffer* payloadBuffer)
{
AbstractClientResponseHandler::handleResponse(responseFrom, transport, version, command, payloadSize, payloadBuffer);
transport->ensureData(4);
DataResponse* nrr = dynamic_cast<DataResponse*>(_context->getResponseRequest(payloadBuffer->getInt()));
if (nrr)
nrr->response(transport, version, payloadBuffer);
}
};
class SearchResponseHandler : public AbstractClientResponseHandler, private epics::pvData::NoDefaultMethods {
public:
SearchResponseHandler(ClientContextImpl* context) :
@@ -603,20 +643,20 @@ class ClientResponseHandler : public ResponseHandler, private epics::pvData::NoD
* @param context
*/
ClientResponseHandler(ClientContextImpl* context) {
static ResponseHandler* badResponse = new DebugResponse(context);
static ResponseHandler* dataResponse = 0; //new DataResponseHandler(context);
ResponseHandler* badResponse = new BadResponse(context);
ResponseHandler* dataResponse = new DataResponseHandler(context);
#define HANDLER_COUNT 28
m_handlerTable = new ResponseHandler*[HANDLER_COUNT];
m_handlerTable[ 0] = badResponse; // TODO new BeaconHandler(context), /* 0 */
m_handlerTable[ 1] = new ConnectionValidationHandler(context), /* 1 */
m_handlerTable[ 2] = badResponse; // TODO new NoopResponse(context, "Echo"), /* 2 */
m_handlerTable[ 3] = badResponse; // TODO new NoopResponse(context, "Search"), /* 3 */
m_handlerTable[ 2] = new NoopResponse(context, "Echo"), /* 2 */
m_handlerTable[ 3] = new NoopResponse(context, "Search"), /* 3 */
m_handlerTable[ 4] = new SearchResponseHandler(context), /* 4 */
m_handlerTable[ 5] = badResponse; // TODO new NoopResponse(context, "Introspection search"), /* 5 */
m_handlerTable[ 5] = new NoopResponse(context, "Introspection search"), /* 5 */
m_handlerTable[ 6] = dataResponse; /* 6 - introspection search */
m_handlerTable[ 7] = new CreateChannelHandler(context), /* 7 */
m_handlerTable[ 8] = badResponse; // TODO new NoopResponse(context, "Destroy channel"), /* 8 */ // TODO it might be useful to implement this...
m_handlerTable[ 8] = new NoopResponse(context, "Destroy channel"), /* 8 */ // TODO it might be useful to implement this...
m_handlerTable[ 9] = badResponse; /* 9 */
m_handlerTable[10] = dataResponse; /* 10 - get response */
m_handlerTable[11] = dataResponse; /* 11 - put response */
@@ -642,8 +682,6 @@ class ClientResponseHandler : public ResponseHandler, private epics::pvData::NoD
Transport* transport, int8 version, int8 command,
int payloadSize, ByteBuffer* payloadBuffer)
{
int c = command+0;
std::cout << "received " << c << std::endl;
if (command < 0 || command >= HANDLER_COUNT)
{
// TODO context.getLogger().fine("Invalid (or unsupported) command: " + command + ".");
@@ -656,7 +694,7 @@ class ClientResponseHandler : public ResponseHandler, private epics::pvData::NoD
}
// delegate
m_handlerTable[c]->handleResponse(responseFrom, transport, version, command, payloadSize, payloadBuffer);
m_handlerTable[command]->handleResponse(responseFrom, transport, version, command, payloadSize, payloadBuffer);
}
};
@@ -1809,6 +1847,75 @@ class TestChannelImpl : public ChannelImpl {
}
/**
* Searches for a response request with given channel IOID.
* @param ioid I/O ID.
* @return request response with given I/O ID.
*/
ResponseRequest* getResponseRequest(pvAccessID ioid)
{
/*
synchronized (pendingResponseRequests)
{
return (ResponseRequest)pendingResponseRequests.get(ioid);
}
*/
return 0;
}
/**
* Register response request.
* @param request request to register.
* @return request ID (IOID).
*/
pvAccessID registerResponseRequest(ResponseRequest* request)
{
/*
synchronized (pendingResponseRequests)
{
int ioid = generateIOID();
pendingResponseRequests.put(ioid, request);
return ioid;
}
*/
return 0;
}
/**
* Unregister response request.
* @param request
* @return removed object, can be <code>null</code>
*/
ResponseRequest* unregisterResponseRequest(ResponseRequest* request)
{
/*
synchronized (pendingResponseRequests)
{
return (ResponseRequest)pendingResponseRequests.remove(request.getIOID());
}
*/
return 0;
}
/**
* Generate IOID.
* @return IOID.
*/
pvAccessID generateIOID()
{
/*
synchronized (pendingResponseRequests)
{
// search first free (theoretically possible loop of death)
while (pendingResponseRequests.get(++lastIOID) != null || lastIOID == CAConstants.CAJ_INVALID_IOID);
// reserve IOID
pendingResponseRequests.put(lastIOID, null);
return lastIOID;
}
*/
return 0;
}
/**
* Get, or create if necessary, transport of given server address.
* @param serverAddress required transport address