adjustments on client.SecopClient

- change self._state to self.state: may be read by calling code
- online and state attributes to be set after the nodStateChange
  callback in order to detect state changes
- remove ProxyClient.readParameter (no need to be implemented)
- reconnect thread must be killed on an external call to disconnect

Change-Id: I08d75dc8e29aa6e65a33ce36a911da4eaf0b62ef
Reviewed-on: https://forge.frm2.tum.de/review/c/sine2020/secop/playground/+/22551
Tested-by: JenkinsCodeReview <bjoern_pedersen@frm2.tum.de>
Reviewed-by: Markus Zolliker <markus.zolliker@psi.ch>
This commit is contained in:
zolliker 2020-02-27 14:46:45 +01:00
parent a5ad527378
commit 199ff0419a

View File

@ -100,7 +100,7 @@ class ProxyClient:
CALLBACK_NAMES = ('updateEvent', 'descriptiveDataChange', 'nodeStateChange', 'unhandledMessage')
online = False # connected or reconnecting since a short time
validate_data = False
_state = 'disconnected' # further possible values: 'connecting', 'reconnecting', 'connected'
state = 'disconnected' # further possible values: 'connecting', 'reconnecting', 'connected'
def __init__(self):
self.callbacks = {cbname: defaultdict(list) for cbname in self.CALLBACK_NAMES}
@ -141,7 +141,7 @@ class ProxyClient:
if mname == key:
cbfunc(mname, pname, *data)
elif cbname == 'nodeStateChange':
cbfunc(self.online, self._state)
cbfunc(self.online, self.state)
if kwds:
raise TypeError('unknown callback: %s' % (', '.join(kwds)))
@ -180,17 +180,15 @@ class ProxyClient:
self.readParameter(module, parameter)
return self.cache[module, parameter]
def readParameter(self, module, parameter):
"""forced read over connection"""
raise NotImplementedError()
class SecopClient(ProxyClient):
"""a general SECoP client"""
reconnect_timeout = 10
shutdown = False
_running = False
_shutdown = False
_rxthread = None
_txthread = None
_connthread = None
disconnect_time = 0 # time of last disconnect
secop_version = ''
descriptive_data = {}
@ -229,7 +227,7 @@ class SecopClient(ProxyClient):
else:
self._set_state(False, 'connecting')
deadline = time.time() + try_period
while True:
while not self._shutdown:
try:
self.io = AsynConn(self.uri) # timeout 1 sec
self.io.writeline(IDENTREQUEST.encode('utf-8'))
@ -242,6 +240,7 @@ class SecopClient(ProxyClient):
raise self.error_map('HardwareError')('bad answer to %s: %r' %
(IDENTREQUEST, self.secop_version))
# now its safe to do secop stuff
self._running = True
self._rxthread = mkthread(self.__rxthread)
self._txthread = mkthread(self.__txthread)
self.log.debug('connected to %s', self.uri)
@ -256,13 +255,14 @@ class SecopClient(ProxyClient):
# print(formatExtendedTraceback())
if time.time() > deadline:
# stay online for now, if activated
self._set_state(self.online and self.activate, 'disconnected')
self._set_state(self.online and self.activate)
raise
time.sleep(1)
self.log.info('%s ready', self.nodename)
if not self._shutdown:
self.log.info('%s ready', self.nodename)
def __txthread(self):
while not self.shutdown:
while self._running:
entry = self.txq.get()
if entry is None:
break
@ -281,10 +281,10 @@ class SecopClient(ProxyClient):
self.log.debug('TX: %r', line)
self.io.send(line)
self._txthread = None
self.disconnect()
self.disconnect(False)
def __rxthread(self):
while not self.shutdown:
while self._running:
try:
reply = self.io.readline()
if reply is None:
@ -340,10 +340,10 @@ class SecopClient(ProxyClient):
self.txq.put(self.pending.get())
self._rxthread = None
self.disconnect()
self.disconnect(False)
if self.activate:
self.log.info('reconnect to %s', self.uri)
mkthread(self._reconnect)
self.log.info('try to reconnect to %s', self.uri)
self._connthread = mkthread(self._reconnect)
else:
self.log.warning('%s disconnected', self.uri)
self._set_state(False, 'disconnected')
@ -354,10 +354,10 @@ class SecopClient(ProxyClient):
and trigger event when done and event is not None
"""
self.disconnect_time = time.time()
mkthread(self._reconnect, connected_callback)
self._connthread = mkthread(self._reconnect, connected_callback)
def _reconnect(self, connected_callback=None):
while True:
while not self._shutdown:
try:
self.connect()
if connected_callback:
@ -372,17 +372,25 @@ class SecopClient(ProxyClient):
if self.online: # was recently connected
self.disconnect_time = 0
self.log.warning('can not reconnect to %s (%r)' % (self.nodename, e))
self.log.info('continue trying to reconnect')
# self.log.warning(formatExtendedTraceback())
self._set_state(False, 'disconnected')
self._set_state(False)
time.sleep(self.reconnect_timeout)
else:
time.sleep(1)
self._connthread = None
def disconnect(self):
self.shutdown = True
def disconnect(self, shutdown=True):
self._running = False
if shutdown:
self._shutdown = True
self._set_state(False, 'shutdown')
if self._connthread: # wait for connection thread stopped
self._connthread.join()
self._connthread = None
self.disconnect_time = time.time()
if self._txthread:
self.txq.put(None) # shutdownmarker
self.txq.put(None) # shutdown marker
self._txthread.join()
self._txthread = None
if self._rxthread:
@ -404,7 +412,6 @@ class SecopClient(ProxyClient):
event.set()
except queue.Empty:
pass
self.shutdown = False
def _init_descriptive_data(self, data):
"""rebuild descriptive data"""
@ -456,11 +463,13 @@ class SecopClient(ProxyClient):
def _set_state(self, online, state=None):
# treat reconnecting as online!
self._state = state or self._state
self.online = online
self.callback(None, 'nodeStateChange', self.online, self._state)
state = state or self.state
self.callback(None, 'nodeStateChange', online, state)
for mname in self.modules:
self.callback(mname, 'nodeStateChange', self.online, self._state)
self.callback(mname, 'nodeStateChange', online, state)
# set online attribute after callbacks -> callback may check for old state
self.online = online
self.state = state
def queue_request(self, action, ident=None, data=None):
"""make a request"""
@ -492,6 +501,7 @@ class SecopClient(ProxyClient):
return self.get_reply(entry)
def readParameter(self, module, parameter):
"""forced read over connection"""
try:
self.request(READREQUEST, self.identifier[module, parameter])
except secop.errors.SECoPError: