add test for message encode/decode

in corner cases, encode_msg and decode_msg_frame fail

Change-Id: I28b3ddcdce80c7c5b71afe19b11bb73cd761f595
Reviewed-on: https://forge.frm2.tum.de/review/c/sine2020/secop/playground/+/22211
Tested-by: JenkinsCodeReview <bjoern_pedersen@frm2.tum.de>
Reviewed-by: Enrico Faulhaber <enrico.faulhaber@frm2.tum.de>
Reviewed-by: Markus Zolliker <markus.zolliker@psi.ch>
This commit is contained in:
2020-01-16 17:24:55 +01:00
parent 2d98fe8812
commit e623fe8287
2 changed files with 61 additions and 25 deletions

View File

@ -17,35 +17,26 @@
#
# Module authors:
# Enrico Faulhaber <enrico.faulhaber@frm2.tum.de>
# Markus Zolliker <markus.zolliker@psi.ch>
#
# *****************************************************************************
import json
EOL = b'\n'
SPACE = b' '
def encode_msg_frame(action, specifier=None, data=None):
""" encode a msg_tripel into an msg_frame, ready to be sent
""" encode a msg_triple into an msg_frame, ready to be sent
action (and optional specifier) are str strings,
data may be an json-yfied python object"""
action = action.encode('utf-8')
if specifier is None:
if data is None:
return b''.join((action, EOL))
# error_activate might have no specifier
specifier = ''
specifier = specifier.encode('utf-8')
if data:
data = json.dumps(data).encode('utf-8')
return b''.join((action, SPACE, specifier, SPACE, data, EOL))
return b''.join((action, SPACE, specifier, EOL))
msg = (action, specifier or '', '' if data is None else json.dumps(data))
return ' '.join(msg).strip().encode('utf-8') + EOL
def get_msg(_bytes):
"""try to deframe the next msg in (binary) input
always return a tupel (msg, remaining_input)
always return a tuple (msg, remaining_input)
msg may also be None
"""
if EOL not in _bytes:
@ -54,14 +45,7 @@ def get_msg(_bytes):
def decode_msg(msg):
"""decode the (binary) msg into a (str) msg_tripel"""
# check for leading/trailing CR and remove it
res = msg.split(b' ', 2)
action = res[0].decode('utf-8')
if len(res) == 1:
return action, None, None
specifier = res[1].decode('utf-8')
if len(res) == 2:
return action, specifier, None
data = json.loads(res[2].decode('utf-8'))
return action, specifier, data
"""decode the (binary) msg into a (str) msg_triple"""
res = msg.strip().decode('utf-8').split(' ', 2) + ['', '']
action, specifier, data = res[0:3]
return action, specifier or None, None if data == '' else json.loads(data)