Fixed core dumps. Transport client now finishes successfully.

This commit is contained in:
miha_vitorovic
2011-01-07 13:01:48 +01:00
parent b868736759
commit 3c03971939
6 changed files with 41 additions and 84 deletions

View File

@@ -33,7 +33,7 @@ namespace epics {
_introspectionRegistry(new IntrospectionRegistry(true)),
_lastChannelSID(0), _channels(
new map<int, ServerChannel*> ()), _channelsMutex(
new Mutex()), _notifyOnClose(NULL) {
new Mutex()) {
// NOTE: priority not yet known, default priority is used to register/unregister
// TODO implement priorities in Reactor... not that user will
// change it.. still getPriority() must return "registered" priority!
@@ -68,7 +68,6 @@ namespace epics {
void BlockingServerTCPTransport::internalClose(bool force) {
BlockingTCPTransport::internalClose(force);
if(_notifyOnClose!=NULL) _notifyOnClose->transportClosed(this);
destroyAllChannels();
}

View File

@@ -37,7 +37,6 @@ namespace epics {
namespace pvAccess {
class MonitorSender;
class BlockingServerTCPTransport;
enum ReceiveStage {
READ_FROM_SOCKET, PROCESS_HEADER, PROCESS_PAYLOAD, NONE
@@ -47,19 +46,6 @@ namespace epics {
IMMEDIATE, DELAYED, USER_CONTROLED
};
class TransportCloseNotification {
public:
virtual ~TransportCloseNotification() {
}
/**
* When transport closes, the owner will be notified through this
* callback
*/
virtual void
transportClosed(BlockingServerTCPTransport* transport) =0;
};
class BlockingTCPTransport : public Transport,
public TransportSendControl {
public:
@@ -67,8 +53,6 @@ namespace epics {
ResponseHandler* responseHandler, int receiveBufferSize,
int16 priority);
virtual ~BlockingTCPTransport();
virtual bool isClosed() const {
return _closed;
}
@@ -271,6 +255,8 @@ namespace epics {
*/
virtual bool send(epics::pvData::ByteBuffer* buffer);
virtual ~BlockingTCPTransport();
private:
/**
* Default marker period.
@@ -352,6 +338,8 @@ namespace epics {
Context* _context;
volatile bool _sendThreadRunning;
/**
* Internal method that clears and releases buffer.
* sendLock and sendBufferLock must be hold while calling this method.
@@ -387,8 +375,6 @@ namespace epics {
TransportClient* client, short remoteTransportRevision,
float beaconInterval, int16 priority);
virtual ~BlockingClientTCPTransport();
virtual void timerStopped() {
// noop
}
@@ -444,6 +430,8 @@ namespace epics {
virtual void internalClose(bool force);
virtual ~BlockingClientTCPTransport();
private:
/**
@@ -549,8 +537,6 @@ namespace epics {
BlockingServerTCPTransport(Context* context, SOCKET channel,
ResponseHandler* responseHandler, int receiveBufferSize);
virtual ~BlockingServerTCPTransport();
virtual IntrospectionRegistry* getIntrospectionRegistry() {
return _introspectionRegistry;
}
@@ -638,10 +624,6 @@ namespace epics {
virtual void send(epics::pvData::ByteBuffer* buffer,
TransportSendControl* control);
void addCloseNotification(TransportCloseNotification* notifyTarget) {
_notifyOnClose = notifyTarget;
}
protected:
/**
* Introspection registry.
@@ -650,6 +632,8 @@ namespace epics {
virtual void internalClose(bool force);
virtual ~BlockingServerTCPTransport();
private:
/**
* Last SID cache.
@@ -663,8 +647,6 @@ namespace epics {
Mutex* _channelsMutex;
TransportCloseNotification* _notifyOnClose;
/**
* Destroy all channels.
*/
@@ -676,7 +658,7 @@ namespace epics {
* @author <a href="mailto:matej.sekoranjaATcosylab.com">Matej Sekoranja</a>
* @version $Id: BlockingTCPAcceptor.java,v 1.1 2010/05/03 14:45:42 mrkraimer Exp $
*/
class BlockingTCPAcceptor : public TransportCloseNotification {
class BlockingTCPAcceptor {
public:
/**
@@ -705,8 +687,6 @@ namespace epics {
*/
void destroy();
virtual void transportClosed(BlockingServerTCPTransport* transport);
private:
/**
* Context instance.
@@ -735,10 +715,6 @@ namespace epics {
epicsThreadId _threadId;
std::set<BlockingServerTCPTransport*>* _connectedClients;
Mutex* _connectedClientsMutex;
/**
* Initialize connection acception.
* @return port where server is listening

View File

@@ -24,7 +24,6 @@
#include <poll.h>
using std::ostringstream;
using std::set;
namespace epics {
namespace pvAccess {
@@ -33,9 +32,7 @@ namespace epics {
int receiveBufferSize) :
_context(context), _bindAddress(NULL), _serverSocketChannel(
INVALID_SOCKET), _receiveBufferSize(receiveBufferSize),
_destroyed(false), _threadId(NULL), _connectedClients(
new set<BlockingServerTCPTransport*> ()),
_connectedClientsMutex(new Mutex()) {
_destroyed(false), _threadId(NULL) {
initialize(port);
}
@@ -43,22 +40,6 @@ namespace epics {
destroy();
if(_bindAddress!=NULL) delete _bindAddress;
_connectedClientsMutex->lock();
// go through all the connected clients, close them, and destroy
set<BlockingServerTCPTransport*>::iterator it =
_connectedClients->begin();
while(it!=_connectedClients->end()) {
BlockingServerTCPTransport* client = *it;
it++;
client->close(true);
delete client;
}
_connectedClients->clear();
delete _connectedClients;
_connectedClientsMutex->unlock();
delete _connectedClientsMutex;
}
int BlockingTCPAcceptor::initialize(in_port_t port) {
@@ -254,16 +235,9 @@ namespace epics {
errlogInfo,
"Connection to CA client %s failed to be validated, closing it.",
ipAddrStr);
delete transport;
return;
}
// store the new connected client
_connectedClientsMutex->lock();
_connectedClients->insert(transport);
transport->addCloseNotification(this);
_connectedClientsMutex->unlock();
errlogSevPrintf(errlogInfo,
"Serving to CA client: %s", ipAddrStr);
@@ -307,16 +281,5 @@ namespace epics {
}
}
void BlockingTCPAcceptor::transportClosed(
BlockingServerTCPTransport* transport) {
Lock lock(_connectedClientsMutex);
// remove the closed client from the list of connected clients
_connectedClients->erase(transport);
// release the memory
delete transport;
}
}
}

View File

@@ -87,7 +87,8 @@ namespace epics {
_rcvThreadId(NULL), _sendThreadId(NULL), _monitorSendQueue(
new GrowingCircularBuffer<TransportSender*> (100)),
_monitorSender(new MonitorSender(_monitorMutex,
_monitorSendQueue)), _context(context) {
_monitorSendQueue)), _context(context),
_sendThreadRunning(false) {
_socketBuffer = new ByteBuffer(max(MAX_TCP_RECV
+MAX_ENSURE_DATA_BUFFER_SIZE, receiveBufferSize));
@@ -141,6 +142,7 @@ namespace epics {
}
void BlockingTCPTransport::start() {
_sendThreadRunning = true;
String threadName = "TCP-receive "+inetAddressToString(
_socketAddress);
@@ -210,7 +212,10 @@ namespace epics {
void BlockingTCPTransport::internalClose(bool force) {
// close the socket
epicsSocketDestroy(_channel);
if(_channel!=INVALID_SOCKET) {
epicsSocketDestroy(_channel);
_channel = INVALID_SOCKET;
}
}
int BlockingTCPTransport::getSocketReceiveBufferSize() const {
@@ -462,11 +467,11 @@ namespace epics {
maxToRead, 0);
_socketBuffer->put(readBuffer, 0, bytesRead);
if(bytesRead<0) {
if(bytesRead<=0) {
// error (disconnect, end-of-stream) detected
close(true);
if(nestedCall) THROW_BASE_EXCEPTION(
if(bytesRead<0&&nestedCall) THROW_BASE_EXCEPTION(
"bytesRead < 0");
return;
@@ -834,12 +839,21 @@ namespace epics {
errlogSevPrintf(errlogInfo, "Connection to %s closed.",
inetAddressToString(_socketAddress).c_str());
epicsSocketDestroy(_channel);
if(_channel!=INVALID_SOCKET) {
epicsSocketDestroy(_channel);
_channel = INVALID_SOCKET;
}
}
void BlockingTCPTransport::rcvThreadRunner(void* param) {
((BlockingTCPTransport*)param)->processReadCached(false, NONE,
CA_MESSAGE_HEADER_SIZE, false);
BlockingTCPTransport* obj = (BlockingTCPTransport*)param;
obj->processReadCached(false, NONE, CA_MESSAGE_HEADER_SIZE, false);
while(obj->_sendThreadRunning)
epicsThreadSleep(0.1);
delete obj;
}
void BlockingTCPTransport::sendThreadRunner(void* param) {
@@ -848,6 +862,8 @@ namespace epics {
obj->processSendQueue();
obj->freeConnectionResorces();
obj->_sendThreadRunning = false;
}
void BlockingTCPTransport::enqueueSendRequest(TransportSender* sender) {

View File

@@ -32,7 +32,7 @@ namespace epics {
ipAddrToA(&responseFrom->ia, ipAddrStr, sizeof(ipAddrStr));
ostringstream prologue;
prologue<<endl<<"Message [0x"<<hex<<(int)command<<", v0x"<<hex;
prologue<<"Message [0x"<<hex<<(int)command<<", v0x"<<hex;
prologue<<(int)version<<"] received from "<<ipAddrStr;
hexDump(prologue.str(), _description,

View File

@@ -11,6 +11,9 @@
#include <sstream>
using namespace epics::pvData;
using std::stringstream;
using std::endl;
using std::cout;
namespace epics {
namespace pvAccess {
@@ -29,9 +32,9 @@ namespace epics {
void hexDump(const String prologue, const String name, const int8 *bs,
int start, int len) {
std::stringstream header;
stringstream header;
header<<prologue<<std::endl<<"Hexdump ["<<name<<"] size = "<<len;
header<<prologue<<endl<<"Hexdump ["<<name<<"] size = "<<len;
String out(header.str());
@@ -70,7 +73,7 @@ namespace epics {
}
out += chars;
std::cout<<out;
cout<<out<<endl;
}
/**