handle SEC node connection properly in client
baseclient.py: - select was not used properly, creating a busy loop - added stop function in TCPConnection mainwindow.py: - fixed behaviour when a connection is broken: a message is shown, and the node is removed from the tree Change-Id: I7223dfd9ea027681aff089f2fa16e134a16a7b84 Reviewed-on: https://forge.frm2.tum.de/review/20922 Tested-by: JenkinsCodeReview <bjoern_pedersen@frm2.tum.de> Reviewed-by: Enrico Faulhaber <enrico.faulhaber@frm2.tum.de>
This commit is contained in:
@@ -68,7 +68,7 @@ class TCPConnection(object):
|
|||||||
self._readbuffer = queue.Queue(100)
|
self._readbuffer = queue.Queue(100)
|
||||||
io = socket.create_connection((self._host, self._port))
|
io = socket.create_connection((self._host, self._port))
|
||||||
io.setblocking(False)
|
io.setblocking(False)
|
||||||
io.settimeout(0.3)
|
self.stopflag = False
|
||||||
self._io = io
|
self._io = io
|
||||||
if self._thread and self._thread.is_alive():
|
if self._thread and self._thread.is_alive():
|
||||||
return
|
return
|
||||||
@@ -76,49 +76,60 @@ class TCPConnection(object):
|
|||||||
|
|
||||||
def _run(self):
|
def _run(self):
|
||||||
try:
|
try:
|
||||||
data = u''
|
data = b''
|
||||||
while True:
|
while not self.stopflag:
|
||||||
|
rlist, _, xlist = select([self._io], [], [self._io], 1)
|
||||||
|
if xlist:
|
||||||
|
# on some strange systems, a closed connection is indicated by
|
||||||
|
# an exceptional condition instead of "read ready" + "empty recv"
|
||||||
newdata = b''
|
newdata = b''
|
||||||
|
else:
|
||||||
|
if not rlist:
|
||||||
|
continue # check stopflag every second
|
||||||
|
# self._io is now ready to read some bytes
|
||||||
try:
|
try:
|
||||||
dlist = [self._io.fileno()]
|
|
||||||
rlist, wlist, xlist = select(dlist, dlist, dlist, 1)
|
|
||||||
if dlist[0] in rlist + wlist:
|
|
||||||
newdata = self._io.recv(1024)
|
newdata = self._io.recv(1024)
|
||||||
if dlist[0] in xlist:
|
except socket.error as err:
|
||||||
print("Problem: exception on socket, reconnecting!")
|
if err.args[0] == socket.EAGAIN:
|
||||||
for cb, arg in self.callbacks:
|
# if we receive an EAGAIN error, just continue
|
||||||
cb(arg)
|
continue
|
||||||
return
|
newdata = b''
|
||||||
except socket.timeout:
|
except Exception:
|
||||||
pass
|
newdata = b''
|
||||||
except Exception as err:
|
if not newdata: # no data on recv indicates a closed connection
|
||||||
print(err, "reconnecting")
|
raise IOError('%s:%d disconnected' % (self._host, self._port))
|
||||||
for cb, arg in self.callbacks:
|
lines = (data + newdata).split(b'\n')
|
||||||
cb(arg)
|
for line in lines[:-1]: # last line is incomplete or empty
|
||||||
return
|
|
||||||
data += newdata.decode('latin-1')
|
|
||||||
while '\n' in data:
|
|
||||||
line, data = data.split('\n', 1)
|
|
||||||
try:
|
try:
|
||||||
self._readbuffer.put(line.strip('\r'),
|
self._readbuffer.put(line.strip(b'\r').decode('utf-8'),
|
||||||
block=True,
|
block=True, timeout=1)
|
||||||
timeout=1)
|
|
||||||
except queue.Full:
|
except queue.Full:
|
||||||
self.log.debug('rcv queue full! dropping line: %r' %
|
self.log.debug('rcv queue full! dropping line: %r' % line)
|
||||||
line)
|
data = lines[-1]
|
||||||
finally:
|
except Exception as err:
|
||||||
self._thread = None
|
self.log.error(err)
|
||||||
|
|
||||||
def readline(self, block=False):
|
|
||||||
"""blocks until a full line was read and returns it"""
|
|
||||||
i = 10
|
|
||||||
while i:
|
|
||||||
try:
|
try:
|
||||||
return self._readbuffer.get(block=True, timeout=1)
|
self._io.shutdown(socket.SHUT_RDWR)
|
||||||
except queue.Empty:
|
except socket.error:
|
||||||
pass
|
pass
|
||||||
if not block:
|
try:
|
||||||
i -= 1
|
self._io.close()
|
||||||
|
except socket.error:
|
||||||
|
pass
|
||||||
|
for cb, args in self.callbacks:
|
||||||
|
cb(*args)
|
||||||
|
|
||||||
|
def readline(self, timeout=None):
|
||||||
|
"""blocks until a full line was read and returns it
|
||||||
|
|
||||||
|
returns None when connection is stopped"""
|
||||||
|
if self.stopflag:
|
||||||
|
return None
|
||||||
|
return self._readbuffer.get(block=True, timeout=timeout)
|
||||||
|
|
||||||
|
def stop(self):
|
||||||
|
self.stopflag = True
|
||||||
|
self._readbuffer.put(None) # terminate pending readline
|
||||||
|
|
||||||
def readable(self):
|
def readable(self):
|
||||||
return not self._readbuffer.empty()
|
return not self._readbuffer.empty()
|
||||||
@@ -239,6 +250,8 @@ class Client(object):
|
|||||||
|
|
||||||
while not self.stopflag:
|
while not self.stopflag:
|
||||||
line = self.connection.readline()
|
line = self.connection.readline()
|
||||||
|
if line is None: # connection stopped
|
||||||
|
break
|
||||||
self.connection_established = True
|
self.connection_established = True
|
||||||
self.log.debug('got answer %r' % line)
|
self.log.debug('got answer %r' % line)
|
||||||
if line.startswith(('SECoP', 'SINE2020&ISSE,SECoP')):
|
if line.startswith(('SECoP', 'SINE2020&ISSE,SECoP')):
|
||||||
@@ -396,8 +409,8 @@ class Client(object):
|
|||||||
self.callbacks.setdefault('%s:%s' % (module, parameter),
|
self.callbacks.setdefault('%s:%s' % (module, parameter),
|
||||||
set()).discard(cb)
|
set()).discard(cb)
|
||||||
|
|
||||||
def register_shutdown_callback(self, func, arg):
|
def register_shutdown_callback(self, func, *args):
|
||||||
self.connection.callbacks.append((func, arg))
|
self.connection.callbacks.append((func, args))
|
||||||
|
|
||||||
def communicate(self, msgtype, spec='', data=None):
|
def communicate(self, msgtype, spec='', data=None):
|
||||||
# only return the data portion....
|
# only return the data portion....
|
||||||
@@ -470,10 +483,11 @@ class Client(object):
|
|||||||
|
|
||||||
def quit(self):
|
def quit(self):
|
||||||
# after calling this the client is dysfunctional!
|
# after calling this the client is dysfunctional!
|
||||||
self.communicate(DISABLEEVENTSREQUEST)
|
# self.communicate(DISABLEEVENTSREQUEST)
|
||||||
self.stopflag = True
|
self.stopflag = True
|
||||||
|
self.connection.stop()
|
||||||
if self._thread and self._thread.is_alive():
|
if self._thread and self._thread.is_alive():
|
||||||
self.thread.join(self._thread)
|
self._thread.join(10)
|
||||||
|
|
||||||
def startup(self, _async=False):
|
def startup(self, _async=False):
|
||||||
self._issueDescribe()
|
self._issueDescribe()
|
||||||
|
|||||||
@@ -65,6 +65,7 @@ class QSECNode(SECNode, QObject):
|
|||||||
|
|
||||||
|
|
||||||
class MainWindow(QMainWindow):
|
class MainWindow(QMainWindow):
|
||||||
|
showMessageSignal = pyqtSignal(str, str)
|
||||||
|
|
||||||
def __init__(self, parent=None):
|
def __init__(self, parent=None):
|
||||||
super(MainWindow, self).__init__(parent)
|
super(MainWindow, self).__init__(parent)
|
||||||
@@ -83,6 +84,7 @@ class MainWindow(QMainWindow):
|
|||||||
self._paramCtrls = {}
|
self._paramCtrls = {}
|
||||||
self._topItems = {}
|
self._topItems = {}
|
||||||
self._currentWidget = self.splitter.widget(1).layout().takeAt(0)
|
self._currentWidget = self.splitter.widget(1).layout().takeAt(0)
|
||||||
|
self.showMessageSignal.connect(self.showMessage)
|
||||||
|
|
||||||
# add localhost (if available) and SEC nodes given as arguments
|
# add localhost (if available) and SEC nodes given as arguments
|
||||||
args = sys.argv[1:]
|
args = sys.argv[1:]
|
||||||
@@ -129,15 +131,17 @@ class MainWindow(QMainWindow):
|
|||||||
current.parent().text(0), current.text(0))
|
current.parent().text(0), current.text(0))
|
||||||
|
|
||||||
def _removeSubTree(self, toplevel_item):
|
def _removeSubTree(self, toplevel_item):
|
||||||
#....
|
self.treeWidget.invisibleRootItem().removeChild(toplevel_item)
|
||||||
pass
|
|
||||||
|
|
||||||
def _nodeDisconnected_callback(self, host):
|
def _nodeDisconnected_callback(self, host):
|
||||||
node = self._nodes[host]
|
node = self._nodes[host]
|
||||||
topItem = self._topItems[node]
|
self._removeSubTree(self._topItems[node])
|
||||||
self._removeSubTree(topItem)
|
del self._topItems[node]
|
||||||
node.quit()
|
node.quit()
|
||||||
QMessageBox(self.parent(), repr(host))
|
self.showMessageSignal.emit('connection closed', 'connection to %s closed' % host)
|
||||||
|
|
||||||
|
def showMessage(self, title, text):
|
||||||
|
QMessageBox.warning(self.parent(), title, text)
|
||||||
|
|
||||||
def _addNode(self, host):
|
def _addNode(self, host):
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user