Makefile: renamed blockingUDP header, added blockingUDPConnector.cpp

blockingUDPTransport.cpp: fixed ctor, addedmutex to 'enqueueSendRequest', started using 'sys/socket.h'
remote.h: added TransportClient and Connector classes
introspectionRegistry.h: organized #includes
This commit is contained in:
miha_vitorovic
2010-12-28 08:59:25 +01:00
parent 15ad456930
commit 06842f7890
7 changed files with 208 additions and 40 deletions

View File

@@ -26,7 +26,6 @@
<option id="gnu.cpp.compiler.option.include.paths.1856220007" name="Include paths (-I)" superClass="gnu.cpp.compiler.option.include.paths" valueType="includePath">
<listOptionValue builtIn="false" value="/opt/epics/base/include"/>
<listOptionValue builtIn="false" value="/opt/epics/base/include/os/Linux"/>
<listOptionValue builtIn="false" value="&quot;${workspace_loc:/pvDataCPP/include}&quot;"/>
</option>
<inputType id="cdt.managedbuild.tool.gnu.cpp.compiler.input.1316200261" superClass="cdt.managedbuild.tool.gnu.cpp.compiler.input"/>
</tool>
@@ -45,7 +44,7 @@
</inputType>
</tool>
<tool id="cdt.managedbuild.tool.gnu.assembler.base.898309292" name="GCC Assembler" superClass="cdt.managedbuild.tool.gnu.assembler.base">
<option id="gnu.both.asm.option.include.paths.288017456" name="Include paths (-I)" superClass="gnu.both.asm.option.include.paths" valueType="includePath"/>
<option id="gnu.both.asm.option.include.paths.288017456" name="Include paths (-I)" superClass="gnu.both.asm.option.include.paths"/>
<inputType id="cdt.managedbuild.tool.gnu.assembler.input.342310237" superClass="cdt.managedbuild.tool.gnu.assembler.input"/>
</tool>
</toolChain>
@@ -308,7 +307,6 @@
<buildTargets>
<target name="all" path="" targetID="org.eclipse.cdt.build.MakeTargetBuilder">
<buildCommand>make</buildCommand>
<buildArguments/>
<buildTarget>all</buildTarget>
<stopOnError>true</stopOnError>
<useDefaultCommand>true</useDefaultCommand>
@@ -316,12 +314,19 @@
</target>
<target name="clean" path="" targetID="org.eclipse.cdt.build.MakeTargetBuilder">
<buildCommand>make</buildCommand>
<buildArguments/>
<buildTarget>clean</buildTarget>
<stopOnError>true</stopOnError>
<useDefaultCommand>true</useDefaultCommand>
<runAllBuilders>false</runAllBuilders>
</target>
<target name="uninstall" path="" targetID="org.eclipse.cdt.build.MakeTargetBuilder">
<buildCommand>make</buildCommand>
<buildArguments/>
<buildTarget>uninstall</buildTarget>
<stopOnError>true</stopOnError>
<useDefaultCommand>true</useDefaultCommand>
<runAllBuilders>false</runAllBuilders>
</target>
</buildTargets>
</storageModule>
</cconfiguration>

View File

@@ -37,8 +37,9 @@ LIBSRCS += CreateRequestFactory.cpp
SRC_DIRS += $(PVACCESS)/remote
INC += remote.h
INC += blockingUDPTransport.h
INC += blockingUDP.h
LIBSRCS += blockingUDPTransport.cpp
LIBSRCS += blockingUDPConnector.cpp
LIBRARY = pvAccess

View File

@@ -5,8 +5,8 @@
* Author: Miha Vitorovic
*/
#ifndef BLOCKINGUDPTRANSPORT_H_
#define BLOCKINGUDPTRANSPORT_H_
#ifndef BLOCKINGUDP_H_
#define BLOCKINGUDP_H_
/* pvAccess */
#include "remote.h"
@@ -16,6 +16,7 @@
/* pvData */
#include <noDefaultMethods.h>
#include <byteBuffer.h>
#include <lock.h>
/* EPICSv3 */
#include <osdSock.h>
@@ -230,9 +231,53 @@ namespace epics {
*/
char* _readBuffer;
/**
* Used for process sync.
*/
Mutex* _mutex;
};
class BlockingUDPConnector : public Connector, NoDefaultMethods {
public:
BlockingUDPConnector(bool reuseSocket,
InetAddrVector* sendAddresses, bool broadcast) :
_sendAddresses(sendAddresses), _reuseSocket(reuseSocket),
_broadcast(broadcast) {
}
virtual ~BlockingUDPConnector() {
// TODO: delete _sendAddresses here?
}
/**
* NOTE: transport client is ignored for broadcast (UDP).
*/
virtual Transport* connect(TransportClient* client,
ResponseHandler* responseHandler, osiSockAddr* bindAddress,
short transportRevision, short priority);
private:
/**
* Send address.
*/
InetAddrVector* _sendAddresses;
/**
* Reuse socket flag.
*/
bool _reuseSocket;
/**
* Broadcast flag.
*/
bool _broadcast;
};
}
}
#endif /* BLOCKINGUDPTRANSPORT_H_ */
#endif /* BLOCKINGUDP_H_ */

View File

@@ -0,0 +1,68 @@
/*
* blockingUDPConnector.cpp
*
* Created on: Dec 27, 2010
* Author: Miha Vitorovic
*/
/* pvAccess */
#include "blockingUDP.h"
#include "remote.h"
/* pvData */
#include <epicsException.h>
/* EPICSv3 */
#include <errlog.h>
/* standard */
#include <sys/types.h>
#include <sys/socket.h>
namespace epics {
namespace pvAccess {
Transport* BlockingUDPConnector::connect(TransportClient* client,
ResponseHandler* responseHandler, osiSockAddr* bindAddress,
short transportRevision, short priority) {
errlogSevPrintf(errlogInfo, "Creating datagram socket to: %s",
inetAddressToString(bindAddress).c_str());
SOCKET socket = ::socket(PF_INET, SOCK_DGRAM, 0);
/* from MSDN:
* Note: If the setsockopt function is called before the bind
* function, TCP/IP options will not be checked by using TCP/IP
* until the bind occurs. In this case, the setsockopt function
* call will always succeed, but the bind function call can fail
* because of an early setsockopt call failing.
*/
int retval = ::bind(socket, (sockaddr*)&(bindAddress->sa),
sizeof(sockaddr));
if(retval<0) {
errlogSevPrintf(errlogMajor, "Error binding socket: %s",
strerror(errno));
THROW_BASE_EXCEPTION(strerror(errno));
}
// set the socket options
int optval = 1; // true
retval = ::setsockopt(socket, SOL_SOCKET, SO_REUSEADDR, &optval,
sizeof(optval));
if(retval<0) errlogSevPrintf(errlogMajor,
"Error binding socket: %s", strerror(errno));
retval = ::setsockopt(socket, SOL_SOCKET, SO_BROADCAST, &optval,
sizeof(optval));
// sockets are blocking by default
return new BlockingUDPTransport(responseHandler, socket,
bindAddress, _sendAddresses, transportRevision);
}
}
}

View File

@@ -5,13 +5,14 @@
*/
/* pvAccess */
#include "blockingUDPTransport.h"
#include "blockingUDP.h"
#include "caConstants.h"
#include "inetAddressUtil.h"
/* pvData */
#include <byteBuffer.h>
#include <lock.h>
/* EPICSv3 */
#include <osdSock.h>
@@ -21,7 +22,8 @@
/* standard */
#include <cstdio>
#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <poll.h>
#include <errno.h>
@@ -33,31 +35,22 @@ namespace epics {
BlockingUDPTransport::BlockingUDPTransport(
ResponseHandler* responseHandler, SOCKET channel,
osiSockAddr* bindAddress, InetAddrVector* sendAddresses,
short remoteTransportRevision) {
_responseHandler = responseHandler;
_channel = channel;
_bindAddress = bindAddress;
_sendAddresses = sendAddresses;
_socketAddress = bindAddress;
// allocate receive buffer
_receiveBuffer = new ByteBuffer(MAX_UDP_RECV);
// allocate send buffer and non-reentrant lock
_sendBuffer = new ByteBuffer(MAX_UDP_SEND);
_ignoredAddresses = NULL;
_sendTo = NULL;
_closed = false;
_lastMessageStartPosition = 0;
_readBuffer = new char[MAX_UDP_RECV];
short remoteTransportRevision) :
_closed(false), _responseHandler(responseHandler),
_channel(channel), _socketAddress(bindAddress),
_bindAddress(bindAddress), _sendAddresses(sendAddresses),
_ignoredAddresses(NULL), _sendTo(NULL), _receiveBuffer(
new ByteBuffer(MAX_UDP_RECV)), _sendBuffer(
new ByteBuffer(MAX_UDP_RECV)),
_lastMessageStartPosition(0), _readBuffer(
new char[MAX_UDP_RECV]), _mutex(new Mutex()) {
}
BlockingUDPTransport::~BlockingUDPTransport() {
delete _receiveBuffer;
delete _sendBuffer;
delete _readBuffer;
delete _mutex;
}
void BlockingUDPTransport::start() {
@@ -87,7 +80,7 @@ namespace epics {
}
void BlockingUDPTransport::enqueueSendRequest(TransportSender* sender) {
// TODO: Java version uses synchronized. Why?
Lock lock(_mutex);
_sendTo = NULL;
_sendBuffer->clear();

View File

@@ -204,6 +204,60 @@ namespace epics {
int payloadSize, ByteBuffer* payloadBuffer) =0;
};
/**
* Client (user) of the transport.
* @author <a href="mailto:matej.sekoranjaATcosylab.com">Matej Sekoranja</a>
* @version $Id: TransportClient.java,v 1.1 2010/05/03 14:45:39 mrkraimer Exp $
*/
class TransportClient {
public:
/**
* Notification of unresponsive transport (e.g. no heartbeat detected) .
*/
virtual void transportUnresponsive() =0;
/**
* Notification of responsive transport (e.g. heartbeat detected again),
* called to discard <code>transportUnresponsive</code> notification.
* @param transport responsive transport.
*/
virtual void transportResponsive(Transport* transport) =0;
/**
* Notification of network change (server restarted).
*/
virtual void transportChanged() =0;
/**
* Notification of forcefully closed transport.
*/
virtual void transportClosed() =0;
};
/**
* Interface defining socket connector (Connector-Transport pattern).
* @author <a href="mailto:matej.sekoranjaATcosylab.com">Matej Sekoranja</a>
* @version $Id: Connector.java,v 1.1 2010/05/03 14:45:39 mrkraimer Exp $
*/
class Connector {
public:
/**
* Connect.
* @param[in] client client requesting connection (transport).
* @param[in] address address of the server.
* @param[in] responseHandler reponse handler.
* @param[in] transportRevision transport revision to be used.
* @param[in] priority process priority.
* @return transport instance.
* @throws ConnectionException
*/
virtual Transport* connect(TransportClient* client, ResponseHandler* responseHandler,
osiSockAddr* address, short transportRevision, short priority) =0;
};
}
}

View File

@@ -5,18 +5,20 @@
#ifndef INTROSPECTIONREGISTRY_H
#define INTROSPECTIONREGISTRY_H
#include <map>
#include <iostream>
#include <lock.h>
#include <pvIntrospect.h>
#include <pvData.h>
#include <byteBuffer.h>
#include <serialize.h>
#include <serializeHelper.h>
#include <status.h>
#include <standardField.h>
#include <epicsMutex.h>
#include "lock.h"
#include "pvIntrospect.h"
#include "pvData.h"
#include "byteBuffer.h"
#include "serialize.h"
#include "serializeHelper.h"
#include "status.h"
#include "standardField.h"
#include <map>
#include <iostream>
using namespace epics::pvData;
using namespace std;