From 87261382cf01c1c6a4961e55fec4ebd15e78f127 Mon Sep 17 00:00:00 2001 From: Enrico Faulhaber Date: Mon, 15 Oct 2018 14:24:34 +0200 Subject: [PATCH] remove Message objects + rewrite server startup Change-Id: Ide72fb915c3ca93c74edadd8952853508e677de7 Reviewed-on: https://forge.frm2.tum.de/review/19199 Tested-by: JenkinsCodeReview Reviewed-by: Enrico Faulhaber --- etc/amagnet.cfg | 7 +- etc/ccr12.cfg | 5 +- etc/cryo.cfg | 7 +- etc/demo.cfg | 8 +- etc/epics.cfg | 14 +- etc/sim.cfg | 5 +- etc/sim_mlz_amagnet.cfg | 7 +- etc/sim_mlz_cci3he1.cfg | 5 +- etc/sim_mlz_ccidu1.cfg | 5 +- etc/sim_mlz_ccr12.cfg | 5 +- etc/sim_mlz_ccr12_v2.cfg | 5 +- etc/sim_mlz_htf02.cfg | 5 +- etc/sim_mlz_stressihtf2.cfg | 5 +- etc/sim_mlz_stressihtf2_v2.cfg | 5 +- etc/stressihtf2.cfg | 5 +- etc/test.cfg | 5 +- secop/client/__init__.py | 15 +- secop/lib/enum.py | 2 +- secop/modules.py | 17 +- secop/protocol/dispatcher.py | 269 ++++++++++++--------------- secop/protocol/interface/__init__.py | 47 ++++- secop/protocol/interface/tcp.py | 114 ++++-------- secop/protocol/messages.py | 117 ------------ secop/server.py | 195 ++++++++----------- test/test_client_baseclient.py | 12 +- test/test_datatypes.py | 3 +- test/test_lib_enum.py | 3 +- test/test_modules.py | 17 +- test/test_params.py | 3 +- test/test_parse.py | 3 +- 30 files changed, 337 insertions(+), 578 deletions(-) diff --git a/etc/amagnet.cfg b/etc/amagnet.cfg index 7eeb826..80aca7c 100644 --- a/etc/amagnet.cfg +++ b/etc/amagnet.cfg @@ -1,4 +1,4 @@ -[equipment MLZ_amagnet(Garfield)] +[node MLZ_amagnet(Garfield)] description=MLZ-Amagnet . Water cooled magnet from ANTARES@MLZ. @@ -14,12 +14,9 @@ visibility=expert foo=bar [interface tcp] -interface=tcp +type=tcp bindto=0.0.0.0 bindport=10767 -# protocol to use for this interface -framing=eol -encoding=secop [module enable] class=secop_mlz.entangle.NamedDigitalOutput diff --git a/etc/ccr12.cfg b/etc/ccr12.cfg index b3384c7..4f65e0e 100644 --- a/etc/ccr12.cfg +++ b/etc/ccr12.cfg @@ -6,12 +6,9 @@ description = CCR12 box of MLZ Sample environment group [interface tcp] -interface=tcp +type=tcp bindto=0.0.0.0 bindport=10767 -# protocol to use for this interface -framing=eol -encoding=secop [module automatik] class=secop_mlz.entangle.NamedDigitalOutput diff --git a/etc/cryo.cfg b/etc/cryo.cfg index 26d2e4d..db30633 100644 --- a/etc/cryo.cfg +++ b/etc/cryo.cfg @@ -1,4 +1,4 @@ -[equipment cryo_7] +[node cryo_7] # set SEC-node properties description = short description . @@ -6,12 +6,9 @@ description = short description [interface tcp] -interface=tcp +type=tcp bindto=0.0.0.0 bindport=10769 -# protocol to use for this interface -framing=eol -encoding=secop [module cryo] diff --git a/etc/demo.cfg b/etc/demo.cfg index c39504e..10842af 100644 --- a/etc/demo.cfg +++ b/etc/demo.cfg @@ -1,13 +1,9 @@ -[equipment Equipment_ID_for_demonstration] +[node Equipment_ID_for_demonstration] description = virtual modules to play around with [interface tcp] -interface=tcp bindto=0.0.0.0 bindport=10767 -# protocol to use for this interface -framing=eol -encoding=secop [module heatswitch] class=secop_demo.modules.Switch @@ -23,7 +19,7 @@ class=secop_demo.modules.SampleTemp sensor = 'Q1329V7R3' ramp = 4 target = 10 -default = 10 +value = 10 [module tc1] class=secop_demo.modules.CoilTemp diff --git a/etc/epics.cfg b/etc/epics.cfg index feb7dd6..1342ce1 100644 --- a/etc/epics.cfg +++ b/etc/epics.cfg @@ -1,20 +1,10 @@ -[equipment see_demo_equipment] +[node see_demo_equipment] description=Do not use, it needs to be rewritten.... -[client] -connectto=0.0.0.0 -port=10767 -interface = tcp -framing=eol -encoding=secop - [interface testing] -interface=tcp +type=tcp bindto=0.0.0.0 bindport=10767 -# protocol to use for this interface -framing=eol -encoding=secop [module tc1] class=secop_demo.modules.CoilTemp diff --git a/etc/sim.cfg b/etc/sim.cfg index 60149ea..098459a 100644 --- a/etc/sim.cfg +++ b/etc/sim.cfg @@ -4,12 +4,9 @@ description=description of the simulation sec-node Testing simulation dummy setup. [interface tcp] -interface=tcp +type=tcp bindto=0.0.0.0 bindport=10767 -# protocol to use for this interface -framing=eol -encoding=secop [module sim] diff --git a/etc/sim_mlz_amagnet.cfg b/etc/sim_mlz_amagnet.cfg index 2c3f579..1dc516f 100644 --- a/etc/sim_mlz_amagnet.cfg +++ b/etc/sim_mlz_amagnet.cfg @@ -1,4 +1,4 @@ -[equipment SIM_MLZ_amagnet(Garfield)] +[node SIM_MLZ_amagnet(Garfield)] description=MLZ-Amagnet . Water cooled magnet from ANTARES@MLZ. @@ -14,12 +14,9 @@ visibility=expert foo=bar [interface tcp] -interface=tcp +type=tcp bindto=0.0.0.0 bindport=10767 -# protocol to use for this interface -framing=eol -encoding=secop [module enable] class=secop.simulation.SimWritable diff --git a/etc/sim_mlz_cci3he1.cfg b/etc/sim_mlz_cci3he1.cfg index 11b7219..811f0cd 100644 --- a/etc/sim_mlz_cci3he1.cfg +++ b/etc/sim_mlz_cci3he1.cfg @@ -7,12 +7,9 @@ description = [sim] cci3he box of MLZ Sample environment group .meaning={'T_regulation':{'T_cci3he1':300}, 'T_sample':{'T_cci3he1_A':300, 'T_cci3he1_B':280}} [interface tcp] -interface=tcp +type=tcp bindto=0.0.0.0 bindport=10767 -# protocol to use for this interface -framing=eol -encoding=secop [module T_cci3he1] class=secop.simulation.SimDrivable diff --git a/etc/sim_mlz_ccidu1.cfg b/etc/sim_mlz_ccidu1.cfg index a423493..7b2173d 100644 --- a/etc/sim_mlz_ccidu1.cfg +++ b/etc/sim_mlz_ccidu1.cfg @@ -7,12 +7,9 @@ description = [sim] ccidu box of MLZ Sample environment group .meaning={'T_regulation':{'T_ccidu1':300}, 'T_sample':{'T_ccidu1_A':300, 'T_ccidu1_B':280}} [interface tcp] -interface=tcp +type=tcp bindto=0.0.0.0 bindport=10767 -# protocol to use for this interface -framing=eol -encoding=secop [module T_ccidu1] class=secop.simulation.SimDrivable diff --git a/etc/sim_mlz_ccr12.cfg b/etc/sim_mlz_ccr12.cfg index 660b775..ba7d6ac 100644 --- a/etc/sim_mlz_ccr12.cfg +++ b/etc/sim_mlz_ccr12.cfg @@ -9,12 +9,9 @@ description = [sim] CCR12 box of MLZ Sample environment group .meaning={'T_regulation':{'T_ccr12':200, 'T_ccr12_stick':150, 'T_ccr12_tube':100}, 'T_sample':{'T_ccr12_B':100, 'T_ccr12_A':90, 'T_ccr12_D':20, 'T_ccr12_C':10}} [interface tcp] -interface=tcp +type=tcp bindto=0.0.0.0 bindport=10767 -# protocol to use for this interface -framing=eol -encoding=secop [module T_ccr12] class=secop.simulation.SimDrivable diff --git a/etc/sim_mlz_ccr12_v2.cfg b/etc/sim_mlz_ccr12_v2.cfg index 418602a..a9c5fba 100644 --- a/etc/sim_mlz_ccr12_v2.cfg +++ b/etc/sim_mlz_ccr12_v2.cfg @@ -9,12 +9,9 @@ description = [sim] CCR12 box of MLZ Sample environment group .meaning={'T_regulation':{'T_ccr12':200, 'T_ccr12_stick':150, 'T_ccr12_tube':100}, 'T_sample':{'T_ccr12_B':100, 'T_ccr12_A':90, 'T_ccr12_D':20, 'T_ccr12_C':10}} [interface tcp] -interface=tcp +type=tcp bindto=0.0.0.0 bindport=10767 -# protocol to use for this interface -framing=eol -encoding=secop [module T_ccr12] class=secop.simulation.SimDrivable diff --git a/etc/sim_mlz_htf02.cfg b/etc/sim_mlz_htf02.cfg index cd8bcb7..89c671a 100644 --- a/etc/sim_mlz_htf02.cfg +++ b/etc/sim_mlz_htf02.cfg @@ -6,12 +6,9 @@ description = [sim] htf02 box of MLZ Sample environment group .meaning={'T_regulation':{'T_htf02':100}, 'T_sample':{'T_htf02':100}} [interface tcp] -interface=tcp +type=tcp bindto=0.0.0.0 bindport=10767 -# protocol to use for this interface -framing=eol -encoding=secop [module T_htf02] class=secop.simulation.SimDrivable diff --git a/etc/sim_mlz_stressihtf2.cfg b/etc/sim_mlz_stressihtf2.cfg index 69cce83..045c9c1 100644 --- a/etc/sim_mlz_stressihtf2.cfg +++ b/etc/sim_mlz_stressihtf2.cfg @@ -6,12 +6,9 @@ description = [sim] Stressihtf2 box of MLZ Sample environment group .meaning={'T_regulation':{'T_stressihtf2':100}, 'T_sample':{'T_stressihtf2':100}} [interface tcp] -interface=tcp +type=tcp bindto=0.0.0.0 bindport=10767 -# protocol to use for this interface -framing=eol -encoding=secop [module T_stressihtf2] class=secop.simulation.SimDrivable diff --git a/etc/sim_mlz_stressihtf2_v2.cfg b/etc/sim_mlz_stressihtf2_v2.cfg index 5109f15..2f708a2 100644 --- a/etc/sim_mlz_stressihtf2_v2.cfg +++ b/etc/sim_mlz_stressihtf2_v2.cfg @@ -6,12 +6,9 @@ description = [sim] Stressihtf2 box of MLZ Sample environment group meaning={'T_regulation':{'T':100}, 'T_sample':{'T_sample':100}} [interface tcp] -interface=tcp +type=tcp bindto=0.0.0.0 bindport=10767 -# protocol to use for this interface -framing=eol -encoding=secop [module T] class=secop.simulation.SimDrivable diff --git a/etc/stressihtf2.cfg b/etc/stressihtf2.cfg index 4e4152c..7cc14a2 100644 --- a/etc/stressihtf2.cfg +++ b/etc/stressihtf2.cfg @@ -6,12 +6,9 @@ description = Stressihtf2 box of MLZ Sample environment group meaning={'T_regulation':{'T':100}, 'T_sample':{'T_sample':100}} [interface tcp] -interface=tcp +type=tcp bindto=0.0.0.0 bindport=10767 -# protocol to use for this interface -framing=eol -encoding=secop [module T] class=secop_mlz.entangle.TemperatureController diff --git a/etc/test.cfg b/etc/test.cfg index 04b29aa..238952e 100644 --- a/etc/test.cfg +++ b/etc/test.cfg @@ -9,12 +9,9 @@ description=description of the testing sec-node These texts are supposed to be possibly very long. [interface tcp] -interface=tcp +type=tcp bindto=0.0.0.0 bindport=10768 -# protocol to use for this interface -framing=eol -encoding=secop [module LN2] diff --git a/secop/client/__init__.py b/secop/client/__init__.py index ad7e5a0..585e4c9 100644 --- a/secop/client/__init__.py +++ b/secop/client/__init__.py @@ -37,8 +37,8 @@ except ImportError: import ConfigParser as configparser import mlzlog -from secop.protocol.interface.tcp import decode_msg, get_msg, encode_msg_frame -from secop.protocol.messages import EVENTREPLY, DESCRIPTIONREQUEST, Message +from secop.protocol.interface import decode_msg, get_msg, encode_msg_frame +from secop.protocol.messages import EVENTREPLY, DESCRIPTIONREQUEST class NameSpace(dict): @@ -135,12 +135,13 @@ class TCPConnection(object): break # no more messages to process if not origin: # empty string continue # ??? - msg = decode_msg(origin) + _ = decode_msg(origin) # construct msgObj from msg try: - msgObj = Message(*msg) - msgObj.origin = origin.decode('latin-1') - self.handle(msgObj) + #msgObj = Message(*msg) + #msgObj.origin = origin.decode('latin-1') + #self.handle(msgObj) + pass except Exception: # ??? what to do here? pass @@ -188,7 +189,7 @@ class Client(object): # XXX: further notification-callbacks needed ??? def populateNamespace(self, namespace): - self.connection.send(Message(DESCRIPTIONREQUEST)) + #self.connection.send(Message(DESCRIPTIONREQUEST)) # reply = self.connection.read() # self.log.info("found modules %r" % reply) # create proxies, populate cache.... diff --git a/secop/lib/enum.py b/secop/lib/enum.py index bcd5cf7..11024e4 100755 --- a/secop/lib/enum.py +++ b/secop/lib/enum.py @@ -28,7 +28,7 @@ try: text_type = unicode # Py2 except NameError: text_type = str # Py3 - + unicode = str # pylint: disable=redefined-builtin class EnumMember(object): """represents one member of an Enum diff --git a/secop/modules.py b/secop/modules.py index 8780558..83e691e 100644 --- a/secop/modules.py +++ b/secop/modules.py @@ -77,11 +77,11 @@ class Module(object): # reference to the dispatcher (used for sending async updates) DISPATCHER = None - def __init__(self, logger, cfgdict, modname, dispatcher): + def __init__(self, name, logger, cfgdict, srv): # remember the dispatcher object (for the async callbacks) - self.DISPATCHER = dispatcher + self.DISPATCHER = srv.dispatcher self.log = logger - self.name = modname + self.name = name # handle module properties # 1) make local copies of properties @@ -97,6 +97,8 @@ class Module(object): if k[0] == '.': if k[1:] in self.properties: self.properties[k[1:]] = cfgdict.pop(k) + elif k[1] == '_': + self.properties[k[1:]] = cfgdict.pop(k) else: raise ConfigError('Module %r has no property %r' % (self.name, k[1:])) @@ -109,8 +111,8 @@ class Module(object): mycls = self.__class__ myclassname = '%s.%s' % (mycls.__module__, mycls.__name__) self.properties['_implementation'] = myclassname - self.properties['interface_class'] = [ - b.__name__ for b in mycls.__mro__ if b.__module__.startswith('secop.modules')] + self.properties['interface_class'] = [[ + b.__name__ for b in mycls.__mro__ if b.__module__.startswith('secop.modules')][0]] # handle Features # XXX: todo @@ -138,7 +140,7 @@ class Module(object): elif hasattr(paramobj, propname): setattr(paramobj, propname, cfgdict.pop(k)) else: - raise ConfigError('Module %s: Parameter %r has not property %r!' % + raise ConfigError('Module %s: Parameter %r has no property %r!' % (self.name, paramname, propname)) # 3) check config for problems: @@ -167,12 +169,11 @@ class Module(object): # 5) 'apply' config: # pass values through the datatypes and store as attributes for k, v in cfgdict.items(): - if k == 'value': - continue # apply datatype, complain if type does not fit datatype = self.accessibles[k].datatype try: v = datatype.validate(v) + self.accessibles[k].default = v except (ValueError, TypeError): self.log.exception(formatExtendedStack()) raise diff --git a/secop/protocol/dispatcher.py b/secop/protocol/dispatcher.py index 4ac4d6a..951a0e1 100644 --- a/secop/protocol/dispatcher.py +++ b/secop/protocol/dispatcher.py @@ -41,11 +41,15 @@ from __future__ import print_function from time import time as currenttime import threading -from secop.protocol.messages import Message, EVENTREPLY, IDENTREQUEST -from secop.protocol.errors import SECOPError, NoSuchModuleError, \ - NoSuchCommandError, NoSuchParameterError, BadValueError, ReadonlyError -from secop.lib import formatExtendedStack, formatException -from secop.params import Parameter, Command +from secop.protocol.messages import EVENTREPLY, IDENTREQUEST, IDENTREPLY, \ + ENABLEEVENTSREPLY, DESCRIPTIONREPLY, WRITEREPLY, COMMANDREPLY, \ + DISABLEEVENTSREPLY, HEARTBEATREPLY + +from secop.protocol.errors import InternalError, NoSuchModuleError, \ + NoSuchCommandError, NoSuchParameterError, BadValueError, ReadonlyError, \ + ProtocolError + +from secop.params import Parameter try: unicode('a') @@ -56,9 +60,9 @@ except NameError: class Dispatcher(object): - def __init__(self, logger, options): + def __init__(self, name, logger, options, srv): # to avoid errors, we want to eat all options here - self.equipment_id = options[u'equipment_id'] + self.equipment_id = name self.nodeopts = {} for k in list(options): self.nodeopts[k] = options.pop(k) @@ -84,13 +88,12 @@ class Dispatcher(object): if reallyall: listeners = self._connections else: - if getattr(msg, u'command', None) is None: - eventname = u'%s:%s' % (msg.module, msg.parameter - if msg.parameter else u'value') - else: - eventname = u'%s:%s()' % (msg.module, msg.command) - listeners = self._subscriptions.get(eventname, set()).copy() - listeners.update(self._subscriptions.get(msg.module, set())) + # all subscribers to module:param + listeners = self._subscriptions.get(msg[1], set()).copy() + # all subscribers to module + module = msg[1].split(':', 1)[0] + listeners.update(self._subscriptions.get(module, set())) + # all generic subscribers listeners.update(self._active_connections) for conn in listeners: conn.queue_async_reply(msg) @@ -98,22 +101,26 @@ class Dispatcher(object): def announce_update(self, moduleobj, pname, pobj): """called by modules param setters to notify subscribers of new values """ - msg = Message(EVENTREPLY, module=moduleobj.name, parameter=pname) - msg.set_result(pobj.export_value(), dict(t=pobj.timestamp)) + msg = (EVENTREPLY, u'%s:%s' % (moduleobj.name, pname), [pobj.export_value(), dict(t=pobj.timestamp)]) self.broadcast_event(msg) - def subscribe(self, conn, modulename, pname=u'value'): + def subscribe(self, conn, modulename, pname=None): eventname = modulename if pname: eventname = u'%s:%s' % (modulename, pname) self._subscriptions.setdefault(eventname, set()).add(conn) - def unsubscribe(self, conn, modulename, pname=u'value'): - eventname = modulename + def unsubscribe(self, conn, modulename, pname=None): if pname: eventname = u'%s:%s' % (modulename, pname) + else: + eventname = modulename + # also remove 'more specific' subscriptions + for k, v in self._subscriptions.items(): + if k.startswith(u'%s:' % modulename): + v.discard(conn) if eventname in self._subscriptions: - self._subscriptions.setdefault(eventname, set()).discard(conn) + self._subscriptions[eventname].discard(conn) def add_connection(self, conn): """registers new connection""" @@ -125,6 +132,7 @@ class Dispatcher(object): self._connections.remove(conn) for _evt, conns in list(self._subscriptions.items()): conns.discard(conn) + self._active_connections.discard(conn) def register_module(self, moduleobj, modulename, export=True): self.log.debug(u'registering module %r as %s (export=%r)' % @@ -138,15 +146,17 @@ class Dispatcher(object): return self._modules[modulename] elif modulename in list(self._modules.values()): return modulename - raise NoSuchModuleError(module=unicode(modulename)) + raise NoSuchModuleError('Module does not exist on this SEC-Node!') def remove_module(self, modulename_or_obj): - moduleobj = self.get_module(modulename_or_obj) or modulename_or_obj + moduleobj = self.get_module(modulename_or_obj) modulename = moduleobj.name if modulename in self._export: self._export.remove(modulename) self._modules.pop(modulename) - # XXX: also clean _subscriptions + self._subscriptions.pop(modulename, None) + for k in [k for k in self._subscriptions if k.startswith(u'%s:' % modulename)]: + self._subscriptions.pop(k, None) def list_module_names(self): # return a copy of our list @@ -158,7 +168,7 @@ class Dispatcher(object): # omit export=False params! res = [] for aname, aobj in self.get_module(modulename).accessibles.items(): - if isinstance(aobj, Command) or aobj.export: + if aobj.export: res.extend([aname, aobj.for_export()]) self.log.debug(u'list accessibles for module %s -> %r' % (modulename, res)) @@ -190,33 +200,32 @@ class Dispatcher(object): moduleobj = self.get_module(modulename) if moduleobj is None: - raise NoSuchModuleError(module=modulename) + raise NoSuchModuleError('Module does not exist on this SEC-Node!') cmdspec = moduleobj.accessibles.get(command, None) if cmdspec is None: - raise NoSuchCommandError(module=modulename, command=command) - if len(cmdspec.datatype.argtypes) != len(arguments): - raise BadValueError( - module=modulename, - command=command, - reason=u'Wrong number of arguments!') + raise NoSuchCommandError('Module has no such command!') + num_args_required = len(cmdspec.datatype.argtypes) + if num_args_required != len(arguments): + raise BadValueError(u'Wrong number of arguments (need %d, got %d)!' % (num_args_required, len(arguments))) # now call func and wrap result as value # note: exceptions are handled in handle_request, not here! func = getattr(moduleobj, u'do_' + command) res = func(*arguments) + # XXX: pipe through cmdspec.datatype.result ? return res, dict(t=currenttime()) def _setParameterValue(self, modulename, pname, value): moduleobj = self.get_module(modulename) if moduleobj is None: - raise NoSuchModuleError(module=modulename) + raise NoSuchModuleError('Module does not exist on this SEC-Node!') pobj = moduleobj.accessibles.get(pname, None) if pobj is None or not isinstance(pobj, Parameter): - raise NoSuchParameterError(module=modulename, parameter=pname) + raise NoSuchParameterError('Module has no such parameter on this SEC-Node!') if pobj.readonly: - raise ReadonlyError(module=modulename, parameter=pname) + raise ReadonlyError('This parameter can not be changed remotely.') writefunc = getattr(moduleobj, u'write_%s' % pname, None) # note: exceptions are handled in handle_request, not here! @@ -231,11 +240,11 @@ class Dispatcher(object): def _getParameterValue(self, modulename, pname): moduleobj = self.get_module(modulename) if moduleobj is None: - raise NoSuchModuleError(module=modulename) + raise NoSuchModuleError('Module does not exist on this SEC-Node!') pobj = moduleobj.accessibles.get(pname, None) if pobj is None or not isinstance(pobj, Parameter): - raise NoSuchParameterError(module=modulename, parameter=pname) + raise NoSuchParameterError('Module has no such parameter on this SEC-Node!') readfunc = getattr(moduleobj, u'read_%s' % pname, None) if readfunc: @@ -253,155 +262,107 @@ class Dispatcher(object): def handle_request(self, conn, msg): """handles incoming request - will call 'queue.request(data)' on conn to send reply before returning + will call 'queue_async_request(data)' on conn or return reply """ - self.log.debug(u'Dispatcher: handling msg: %r' % msg) - # if there was an error in the frontend, bounce the resulting - # error msgObj directly back to the client - if msg.errorclass: - return msg + self.log.debug(u'Dispatcher: handling msg: %s' % repr(msg)) # play thread safe ! + # XXX: ONLY ONE REQUEST (per dispatcher) AT A TIME with self._lock: - if msg.action == IDENTREQUEST: - self.log.debug(u'Looking for handle_ident') - handler = self.handle_ident - else: - self.log.debug(u'Looking for handle_%s' % msg.action) - handler = getattr(self, u'handle_%s' % msg.action, None) + action, specifier, data = msg + # special case for *IDN? + if action == IDENTREQUEST: + action, specifier, data = 'ident', None, None + + self.log.debug(u'Looking for handle_%s' % action) + handler = getattr(self, u'handle_%s' % action, None) + if handler: - try: - reply = handler(conn, msg) - if reply: - conn.queue_reply(reply) - return None - except SECOPError as err: - self.log.exception(err) - msg.set_error(err.name, unicode(err), {})#u'traceback': formatException(), - #u'extended_stack':formatExtendedStack()}) - return msg - except (ValueError, TypeError) as err: - self.log.exception(err) - msg.set_error(u'BadValue', unicode(err), {u'traceback': formatException()}) - print(u'--------------------') - print(formatExtendedStack()) - print(u'====================') - return msg - except Exception as err: - self.log.exception(err) - msg.set_error(u'InternalError', unicode(err), {u'traceback': formatException()}) - print(u'--------------------') - print(formatExtendedStack()) - print(u'====================') - return msg + return handler(conn, specifier, data) else: - self.log.error(u'Can not handle msg %r' % msg) - msg.set_error(u'Protocol', u'unhandled msg', {}) - return msg + raise InternalError('unhandled message!') # now the (defined) handlers for the different requests - def handle_help(self, conn, msg): - msg.mkreply() - return msg + def handle_help(self, conn, specifier, data): + self.log.error('should have been handled in the interface!') - def handle_ident(self, conn, msg): - msg.mkreply() - return msg + def handle_ident(self, conn, specifier, data): + return (IDENTREPLY, None, None) - def handle_describe(self, conn, msg): - # XXX:collect descriptive data - msg.setvalue(u'specifier', u'.') - msg.setvalue(u'data', self.get_descriptive_data()) - msg.mkreply() - return msg + def handle_describe(self, conn, specifier, data): + return (DESCRIPTIONREPLY, '.', self.get_descriptive_data()) - def handle_read(self, conn, msg): - # XXX: trigger polling and force sending event - if not msg.parameter: - msg.parameter = u'value' - msg.set_result(*self._getParameterValue(msg.module, msg.parameter)) + def handle_read(self, conn, specifier, data): + if data: + raise ProtocolError('poll request don\'t take data!') + modulename, pname = specifier, u'value' + if ':' in specifier: + modulename, pname = specifier.split(':', 1) + # XXX: trigger polling and force sending event ??? + return (EVENTREPLY, specifier, list(self._getParameterValue(modulename, pname))) - #if conn in self._active_connections: - # return None # already send to myself - #if conn in self._subscriptions.get(msg.module, set()): - # return None # already send to myself - msg.mkreply() - return msg # send reply to inactive conns + def handle_change(self, conn, specifier, data): + modulename, pname = specifier, u'value' + if ':' in specifier: + modulename, pname = specifier.split(u':', 1) + return (WRITEREPLY, specifier, list(self._setParameterValue(modulename, pname, data))) - def handle_change(self, conn, msg): - # try to actually write XXX: should this be done asyncron? we could - # just return the reply in that case - if not msg.parameter: - msg.parameter = u'target' - msg.set_result(*self._setParameterValue(msg.module, msg.parameter, msg.data)) - - #if conn in self._active_connections: - # return None # already send to myself - #if conn in self._subscriptions.get(msg.module, set()): - # return None # already send to myself - msg.mkreply() - return msg # send reply to inactive conns - - def handle_do(self, conn, msg): + def handle_do(self, conn, specifier, data): # XXX: should this be done asyncron? we could just return the reply in # that case - if not msg.args: - msg.args = [] - # try to actually execute command - msg.set_result(*self._execute_command(msg.module, msg.command, msg.args)) + modulename, cmd = specifier.split(u':', 1) + return (COMMANDREPLY, specifier, list(self._execute_command(modulename, cmd, data))) - #if conn in self._active_connections: - # return None # already send to myself - #if conn in self._subscriptions.get(msg.module, set()): - # return None # already send to myself - msg.mkreply() - return msg # send reply to inactive conns + def handle_ping(self, conn, specifier, data): + if data: + raise ProtocolError('poll request don\'t take data!') + return (HEARTBEATREPLY, specifier, [None, {u't':currenttime()}]) - def handle_ping(self, conn, msg): - msg.setvalue(u'data', {u't':currenttime()}) - msg.mkreply() - return msg - - def handle_activate(self, conn, msg): - if msg.module: - if msg.module not in self._modules: - raise NoSuchModuleError() + def handle_activate(self, conn, specifier, data): + if data: + raise ProtocolError('activate request don\'t take data!') + if specifier: + modulename, pname = specifier, None + if ':' in specifier: + modulename, pname = specifier.split(u':', 1) + if modulename not in self._export: + raise NoSuchModuleError('Module does not exist on this SEC-Node!') + if pname and pname not in self.get_module(modulename).accessibles: + # what if we try to subscribe a command here ??? + raise NoSuchParameterError('Module has no such parameter on this SEC-Node!') # activate only ONE module - self.subscribe(conn, msg.specifier, u'') - modules = [msg.specifier] + self.subscribe(conn, modulename, pname) + modules = [(modulename, pname)] else: # activate all modules self._active_connections.add(conn) - modules = self._modules + modules = [(m, None) for m in self._export] - # send updates for all values. The first poll already happend before the server is active - for modulename in modules: + # send updates for all subscribed values. + # note: The initial poll already happend before the server is active + for modulename, pname in modules: moduleobj = self._modules.get(modulename, None) - if moduleobj is None: - self.log.error(u'activate: can not lookup module %r, skipping it' % modulename) + if pname: + pobj = moduleobj.accessibles[pname] + updmsg = (EVENTREPLY, u'%s:%s' % (modulename, pname), + [pobj.export_value(), dict(t=pobj.timestamp)]) + conn.queue_async_reply(updmsg) continue - for pname, pobj in moduleobj.accessibles.items(): + for pname, pobj in moduleobj.accessibles.items(): # pylint: disable=redefined-outer-name if not isinstance(pobj, Parameter): continue if not pobj.export: # XXX: handle export_as cases! continue # can not use announce_update here, as this will send to all clients - updmsg = Message(EVENTREPLY, module=moduleobj.name, parameter=pname) - updmsg.set_result(pobj.export_value(), dict(t=pobj.timestamp)) + updmsg = (EVENTREPLY, u'%s:%s' % (modulename, pname), + [pobj.export_value(), dict(t=pobj.timestamp)]) conn.queue_async_reply(updmsg) - msg.mkreply() - conn.queue_async_reply(msg) # should be sent AFTER all the ^^initial updates - return None + return (ENABLEEVENTSREPLY, specifier, None) if specifier else (ENABLEEVENTSREPLY, None, None) - def handle_deactivate(self, conn, msg): - if msg.specifier: - self.unsubscribe(conn, msg.specifier, u'') + def handle_deactivate(self, conn, specifier, data): + if specifier: + self.unsubscribe(conn, specifier) else: self._active_connections.discard(conn) # XXX: also check all entries in self._subscriptions? - msg.mkreply() - return msg - - def handle_error(self, conn, msg): - # is already an error-reply (came from interface frontend) -> just send it back - return msg + return (DISABLEEVENTSREPLY, None, None) diff --git a/secop/protocol/interface/__init__.py b/secop/protocol/interface/__init__.py index b5a8225..85a0fb0 100644 --- a/secop/protocol/interface/__init__.py +++ b/secop/protocol/interface/__init__.py @@ -19,12 +19,47 @@ # Enrico Faulhaber # # ***************************************************************************** -"""provide server interfaces to be used by clients""" -from __future__ import absolute_import -from .tcp import TCPServer +import json -INTERFACES = {'tcp': TCPServer, } +EOL = b'\n' +SPACE = b' ' -# for 'from protocol.interface import *' to only import the dict -__ALL__ = ['INTERFACES'] +def encode_msg_frame(action, specifier=None, data=None): + """ encode a msg_tripel into an msg_frame, ready to be sent + + action (and optional specifier) are unicode strings, + data may be an json-yfied python object""" + action = action.encode('utf-8') + if specifier is None: + # implicit: data is None + return b''.join((action, EOL)) + specifier = specifier.encode('utf-8') + if data: + data = json.dumps(data).encode('utf-8') + return b''.join((action, SPACE, specifier, SPACE, data, EOL)) + return b''.join((action, SPACE, specifier, EOL)) + + +def get_msg(_bytes): + """try to deframe the next msg in (binary) input + always return a tupel (msg, remaining_input) + msg may also be None + """ + if EOL not in _bytes: + return None, _bytes + return _bytes.split(EOL, 1) + + +def decode_msg(msg): + """decode the (binary) msg into a (unicode) msg_tripel""" + # check for leading/trailing CR and remove it + res = msg.split(b' ', 2) + action = res[0].decode('utf-8') + if len(res) == 1: + return action, None, None + specifier = res[1].decode('utf-8') + if len(res) == 2: + return action, specifier, None + data = json.loads(res[2].decode('utf-8')) + return action, specifier, data diff --git a/secop/protocol/interface/tcp.py b/secop/protocol/interface/tcp.py index c3df995..37f60a1 100644 --- a/secop/protocol/interface/tcp.py +++ b/secop/protocol/interface/tcp.py @@ -30,61 +30,19 @@ except ImportError: import SocketServer as socketserver # py2 from secop.lib import formatExtendedStack, formatException -from secop.protocol.messages import HELPREPLY, Message, HelpMessage +from secop.protocol.messages import HELPREQUEST, HELPREPLY, HelpMessage from secop.errors import SECoPError +from secop.protocol.interface import encode_msg_frame, get_msg, decode_msg DEF_PORT = 10767 -MAX_MESSAGE_SIZE = 1024 +MESSAGE_READ_SIZE = 1024 + -EOL = b'\n' CR = b'\r' SPACE = b' ' -def encode_msg_frame(action, specifier=None, data=None): - """ encode a msg_tripel into an msg_frame, ready to be sent - - action (and optional specifier) are unicode strings, - data may be an json-yfied python object""" - action = action.encode('utf-8') - if specifier is None: - # implicit: data is None - return b''.join((action, EOL)) - specifier = specifier.encode('utf-8') - if data: - data = data.encode('utf-8') - return b''.join((action, SPACE, specifier, SPACE, data, EOL)) - return b''.join((action, SPACE, specifier, EOL)) - - -def get_msg(_bytes): - """try to deframe the next msg in (binary) input - always return a tupel (msg, remaining_input) - msg may also be None - """ - if EOL not in _bytes: - return None, _bytes - return _bytes.split(EOL, 1) - - -def decode_msg(msg): - """decode the (binary) msg into a (unicode) msg_tripel""" - # check for leading/trailing CR and remove it - if msg and msg[0] == CR: - msg = msg[1:] - if msg and msg[-1] == CR: - msg = msg[:-1] - - res = msg.split(b' ', 2) - action = res[0].decode('utf-8') - if len(res) == 1: - return action, None, None - specifier = res[1].decode('utf-8') - if len(res) == 2: - return action, specifier, None - data = res[2].decode('utf-8') - return action, specifier, data class TCPRequestHandler(socketserver.BaseRequestHandler): @@ -118,10 +76,11 @@ class TCPRequestHandler(socketserver.BaseRequestHandler): # put frame(s) into framer to get bytestring # send bytestring outmsg = self._queue.popleft() - #outmsg.mkreply() - outdata = encode_msg_frame(*outmsg.serialize()) -# outframes = self.encoding.encode(outmsg) -# outdata = self.framing.encode(outframes) + if not outmsg: + outmsg = ('error','InternalError', ['', 'trying to send none-data', {}]) + if len(outmsg) > 3: + outmsg = ('error', 'InternalError', ['', 'bad message format', {'msg':outmsg}]) + outdata = encode_msg_frame(*outmsg) try: mysocket.sendall(outdata) except Exception: @@ -129,7 +88,7 @@ class TCPRequestHandler(socketserver.BaseRequestHandler): # XXX: improve: use polling/select here? try: - newdata = mysocket.recv(MAX_MESSAGE_SIZE) + newdata = mysocket.recv(MESSAGE_READ_SIZE) if not newdata: # no timeout error, but no new data -> connection closed return @@ -150,33 +109,39 @@ class TCPRequestHandler(socketserver.BaseRequestHandler): origin, data = get_msg(data) if origin is None: break # no more messages to process - if not origin: # empty string -> send help message + origin = origin.strip() + if origin and origin[0] == CR: + origin = origin[1:] + if origin and origin[-1] == CR: + origin = origin[:-1] + if origin in (HELPREQUEST, ''): # empty string -> send help message for idx, line in enumerate(HelpMessage.splitlines()): - msg = Message(HELPREPLY, specifier='%d' % idx) - msg.data = line - self.queue_async_reply(msg) + self.queue_async_reply((HELPREPLY, '%d' % (idx+1), line)) continue msg = decode_msg(origin) - # construct msgObj from msg + result = None try: - msgObj = Message(*msg) - msgObj.origin = origin.decode('latin-1') - msgObj = serverobj.dispatcher.handle_request(self, msgObj) + result = serverobj.dispatcher.handle_request(self, msg) + if (msg[0] == 'read') and result: + # read should only trigger async_replies + self.queue_async_reply(('error', 'InternalError', [origin, + 'read should only trigger async data units'])) except SECoPError as err: - msgObj.set_error(err.name, str(err), {'exception': formatException(), - 'traceback': formatExtendedStack()}) + result = ('error', err.name, [origin, str(err), {'exception': formatException(), + 'traceback': formatExtendedStack()}]) except Exception as err: # create Error Obj instead - msgObj.set_error(u'Internal', str(err), {'exception': formatException(), - 'traceback':formatExtendedStack()}) + result = ('error', 'InternalError', [origin, str(err), {'exception': formatException(), + 'traceback': formatExtendedStack()}]) print('--------------------') print(formatException()) print('--------------------') print(formatExtendedStack()) print('====================') - if msgObj: - self.queue_reply(msgObj) + if not result: + self.log.error('empty result upon msg %s' % repr(msg)) + self.queue_async_reply(result) def queue_async_reply(self, data): """called by dispatcher for async data units""" @@ -185,14 +150,6 @@ class TCPRequestHandler(socketserver.BaseRequestHandler): else: self.log.error('should async_queue empty data!') - def queue_reply(self, data): - """called by dispatcher to queue (sync) replies""" - # sync replies go first! - if data: - self._queue.appendleft(data) - else: - self.log.error('should queue empty data!') - def finish(self): """called when handle() terminates, i.e. the socket closed""" self.log.info('closing connection from %s:%d' % self.client_address) @@ -211,16 +168,17 @@ class TCPServer(socketserver.ThreadingTCPServer): daemon_threads = True allow_reuse_address = True - def __init__(self, logger, interfaceopts, dispatcher): - self.dispatcher = dispatcher + def __init__(self, name, logger, options, srv): + self.dispatcher =srv.dispatcher + self.name = name self.log = logger - bindto = interfaceopts.pop('bindto', 'localhost') - portnum = int(interfaceopts.pop('bindport', DEF_PORT)) + bindto = options.pop('bindto', 'localhost') + portnum = int(options.pop('bindport', DEF_PORT)) if ':' in bindto: bindto, _port = bindto.rsplit(':') portnum = int(_port) - self.log.info("TCPServer binding to %s:%d" % (bindto, portnum)) + self.log.info("TCPServer %s binding to %s:%d" % (name, bindto, portnum)) socketserver.ThreadingTCPServer.__init__( self, (bindto, portnum), TCPRequestHandler, bind_and_activate=True) self.log.info("TCPServer initiated") diff --git a/secop/protocol/messages.py b/secop/protocol/messages.py index 2a98d77..9200065 100644 --- a/secop/protocol/messages.py +++ b/secop/protocol/messages.py @@ -22,9 +22,6 @@ """Define SECoP Messages""" from __future__ import print_function -import json -from secop.protocol.errors import EXCEPTIONS - # allowed actions: IDENTREQUEST = u'*IDN?' # literal @@ -76,120 +73,6 @@ REQUEST2REPLY = { -class Message(object): - """base class for messages""" - origin = u'' - action = u'' - specifier = None - data = None - - # cooked versions - module = None - parameter = None - command = None - args = None - - # if set, these are used for generating the reply - qualifiers = None # will be rectified to dict() in __init__ - value = None # also the result of a command - - # if set, these are used for generating the error msg - errorclass = '' # -> specifier - errordescription = '' # -> data[1] (data[0] is origin) - errorinfo = {} # -> data[2] - - def __init__(self, action, specifier=None, data=None, **kwds): - self.qualifiers = {} - self.action = action - if data: - data = json.loads(data) - if specifier: - self.module = specifier - self.specifier = specifier - if ':' in specifier: - self.module, p = specifier.split(':',1) - if action in (COMMANDREQUEST, COMMANDREPLY): - self.command = p - # XXX: extract args? - self.args = data - else: - self.parameter = p - if data is not None: - self.data = data - elif data is not None: - self.data = data - # record extra values - self.__arguments = set() - for k, v in kwds.items(): - self.setvalue(k, v) - - def setvalue(self, key, value): - setattr(self, key, value) - self.__arguments.add(key) - - def setqualifier(self, key, value): - self.qualifiers[key] = value - - def __repr__(self): - return u'Message(%r' % self.action + \ - u', '.join('%s=%s' % (k, repr(getattr(self, k))) - for k in sorted(self.__arguments)) + u')' - - def serialize(self): - """return ,, triple""" - if self.errorclass: - for k in self.__arguments: - if k in (u'origin', u'errorclass', u'errorinfo', u'errordescription'): - if k in self.errorinfo: - del self.errorinfo[k] - continue - self.errorinfo[k] = getattr(self, k) - data = [self.origin, self.errordescription, self.errorinfo] - print(repr(data)) - return ERRORREPLY, self.errorclass, json.dumps(data) - elif self.value or self.qualifiers: - data = [self.value, self.qualifiers] - else: - data = self.data - - try: - data = json.dumps(data) if data else u'' - except TypeError: - print('Can not serialze: %s' % repr(data)) - data = u'none' - - if self.specifier: - specifier = self.specifier - else: - specifier = self.module - if self.parameter: - specifier = u'%s:%s' %(self.module, self.parameter) - if self.command: - specifier = u'%s:%s' %(self.module, self.command) - return self.action, specifier, data - - def mkreply(self): - self.action = REQUEST2REPLY.get(self.action, self.action) - - def set_error(self, errorclass, errordescription, errorinfo): - if errorclass not in EXCEPTIONS: - errordescription = '%s is not an official errorclass!\n%s' % (errorclass, errordescription) - errorclass = u'Internal' - # used to mark thes as an error message - # XXX: check errorclass for allowed values ! - self.setvalue(u'errorclass', errorclass) # a str - self.setvalue(u'errordescription', errordescription) # a str - self.setvalue(u'errorinfo', errorinfo) # a dict - self.action = ERRORREPLY - - def set_result(self, value, qualifiers): - # used to mark thes as an result reply message - self.setvalue(u'value', value) - self.qualifiers.update(qualifiers) - self.__arguments.add(u'qualifier') - - - HelpMessage = u"""Try one of the following: '%s' to query protocol version '%s' to read the description diff --git a/secop/server.py b/secop/server.py index 0d3dfc6..5235eba 100644 --- a/secop/server.py +++ b/secop/server.py @@ -27,6 +27,7 @@ import os import ast import time import threading +from collections import OrderedDict try: import configparser # py3 @@ -46,13 +47,22 @@ except ImportError: import daemon.pidfile as pidlockfile from secop.lib import get_class, formatException, getGeneralConfig -from secop.protocol.dispatcher import Dispatcher -from secop.protocol.interface import INTERFACES from secop.errors import ConfigError -class Server(object): +class Server(object): + # list allowed section prefixes + # if mapped dict does not exist -> section need a 'class' option + # otherwise a 'type' option is evaluatet and the class from the mapping dict used + # + # IMPORTANT: keep he order! (node MUST be first, as the others are referencing it!) + CFGSECTIONS = [ + # section_prefix, default type, mapping of selectable classes + ('node', None, {None: "protocol.dispatcher.Dispatcher"}), + ('module', None, None), + ('interface', "tcp", {"tcp": "protocol.interface.tcp.TCPServer"}), + ] def __init__(self, name, parent_logger=None): cfg = getGeneralConfig() @@ -97,9 +107,9 @@ class Server(object): self.log.info(u'startup done, handling transport messages') self._threads = set() - for _if in self._interfaces: - self.log.debug(u'starting thread for interface %r' % _if) - t = threading.Thread(target=_if.serve_forever) + for ifname, ifobj in self.interfaces.items(): + self.log.debug(u'starting thread for interface %r' % ifname) + t = threading.Thread(target=ifobj.serve_forever) t.daemon = True t.start() self._threads.add(t) @@ -122,108 +132,76 @@ class Server(object): self.log.error(u'Couldn\'t read cfg file !') raise ConfigError(u'Couldn\'t read cfg file %r' % self._cfgfile) - self._interfaces = [] - moduleopts = [] - interfaceopts = [] - equipment_id = None - nodeopts = [] - for section in parser.sections(): - if section.lower().startswith(u'module '): - # module section - # omit leading 'module ' string - devname = section[len(u'module '):] - devopts = dict(item for item in parser.items(section)) - if u'class' not in devopts: - self.log.error(u'Module %s needs a class option!') - raise ConfigError( - u'cfgfile %r: Module %s needs a class option!' % - (self._cfgfile, devname)) - # MAGIC: transform \n.\n into \n\n which are normally stripped - # by the ini parser - for k in devopts: - v = devopts[k] - while u'\n.\n' in v: - v = v.replace(u'\n.\n', u'\n\n') - devopts[k] = v - # try to import the class, raise if this fails - devopts[u'class'] = get_class(devopts[u'class']) - # all went well so far - moduleopts.append([devname, devopts]) - if section.lower().startswith(u'interface '): - # interface section - # omit leading 'interface ' string - ifname = section[len(u'interface '):] - ifopts = dict(item for item in parser.items(section)) - if u'interface' not in ifopts: - self.log.error(u'Interface %s needs an interface option!') - raise ConfigError( - u'cfgfile %r: Interface %s needs an interface option!' % - (self._cfgfile, ifname)) - # all went well so far - interfaceopts.append([ifname, ifopts]) - if section.lower().startswith(u'equipment ') or section.lower().startswith(u'node '): - if equipment_id is not None: - raise ConfigError(u'cfgfile %r: only one [node ] section allowed, found another [%s]!' % ( - self._cfgfile, section)) - # equipment/node settings - equipment_id = section.split(u' ', 1)[1].replace(u' ', u'_') - nodeopts = dict(item for item in parser.items(section)) - nodeopts[u'equipment_id'] = equipment_id - nodeopts[u'id'] = equipment_id - # MAGIC: transform \n.\n into \n\n which are normally stripped - # by the ini parser - for k in nodeopts: - v = nodeopts[k] - while u'\n.\n' in v: - v = v.replace(u'\n.\n', u'\n\n') - nodeopts[k] = v - if equipment_id is None: - self.log.error(u'Need a [node ] section, none found!') - raise ConfigError( - u'cfgfile %r: need an [node ] option!' % (self._cfgfile)) + for kind, devtype, classmapping in self.CFGSECTIONS: + kinds = u'%ss' % kind + objs = OrderedDict() + self.__dict__[kinds] = objs + for section in parser.sections(): + prefix = u'%s ' % kind + if section.lower().startswith(prefix): + name = section[len(prefix):] + opts = dict(item for item in parser.items(section)) + if u'class' in opts: + cls = opts.pop(u'class') + else: + if not classmapping: + self.log.error(u'%s %s needs a class option!' % (kind.title(), name)) + raise ConfigError(u'cfgfile %r: %s %s needs a class option!' % + (self._cfgfile, kind.title(), name)) + type_ = opts.pop(u'type', devtype) + cls = classmapping.get(type_, None) + if not cls: + self.log.error(u'%s %s needs a type option (select one of %s)!' % + (kind.title(), name, ', '.join(repr(r) for r in classmapping))) + raise ConfigError(u'cfgfile %r: %s %s needs a type option (select one of %s)!' % + (self._cfgfile, kind.title(), name, ', '.join(repr(r) for r in classmapping))) + # MAGIC: transform \n.\n into \n\n which are normally stripped + # by the ini parser + for k in opts: + v = opts[k] + while u'\n.\n' in v: + v = v.replace(u'\n.\n', u'\n\n') + try: + opts[k] = ast.literal_eval(v) + except Exception: + pass + opts[k] = v - self._dispatcher = self._buildObject( - u'Dispatcher', Dispatcher, nodeopts) - self._processInterfaceOptions(interfaceopts) - self._processModuleOptions(moduleopts) + # try to import the class, raise if this fails + self.log.debug(u'Creating %s %s ...' % (kind.title(), name)) + # cls.__init__ should pop all used args from options! + logname = u'dispatcher' if kind == u'node' else u'%s_%s' % (kind, name.lower()) + obj = get_class(cls)(name, self.log.getChild(logname), opts, self) + if opts: + raise ConfigError(u'%s %s: class %s: don\'t know how to handle option(s): %s' % + (kind, name, cls, u', '.join(opts))) - def _processModuleOptions(self, moduleopts): - # check modules opts by creating them - devs = [] - for devname, devopts in moduleopts: - devclass = devopts.pop(u'class') - # create module - self.log.debug(u'Creating Module %r' % devname) - export = devopts.pop(u'export', u'1') - export = export.lower() in (u'1', u'on', u'true', u'yes') - if u'default' in devopts: - devopts[u'value'] = devopts.pop(u'default') - # strip '" - for k, v in devopts.items(): - try: - devopts[k] = ast.literal_eval(v) - except Exception: - pass - devobj = devclass( - self.log.getChild(devname), devopts, devname, self._dispatcher) - devs.append([devname, devobj, export]) + # all went well so far + objs[name] = obj - # connect modules with dispatcher - for devname, devobj, export in devs: - self.log.info(u'registering module %r' % devname) - self._dispatcher.register_module(devobj, devname, export) + # following line is the reason for 'node' beeing the first entry in CFGSECTIONS + if len(self.nodes) != 1: + raise ConfigError(u'cfgfile %r: needs exactly one node section!' % self._cfgfile) + self.dispatcher = self.nodes.values()[0] + + # all objs created, now start them up and interconnect + for modname, modobj in self.modules.items(): + self.log.info(u'registering module %r' % modname) + self.dispatcher.register_module(modobj, modname, modobj.properties['export']) # also call early_init on the modules - devobj.early_init() + modobj.early_init() + # call init on each module after registering all - for _devname, devobj, _export in devs: - devobj.init_module() + for modname, modobj in self.modules.items(): + modobj.init_module() + starting_modules = set() finished_modules = Queue() - for _devname, devobj, _export in devs: - starting_modules.add(devobj) - devobj.start_module(started_callback=finished_modules.put) + for modname, modobj in self.modules.items(): + starting_modules.add(modobj) + modobj.start_module(started_callback=finished_modules.put) # remark: it is the module implementors responsibility to call started_callback # within reasonable time (using timeouts). If we find later, that this is not # enough, we might insert checking for a timeout here, and somehow set the remaining @@ -234,22 +212,3 @@ class Server(object): # use discard instead of remove here, catching the case when started_callback is called twice starting_modules.discard(finished) finished_modules.task_done() - - def _processInterfaceOptions(self, interfaceopts): - # eval interfaces - self._interfaces = [] - for ifname, ifopts in interfaceopts: - ifclass = ifopts.pop(u'interface') - ifclass = INTERFACES[ifclass] - interface = self._buildObject(ifname, ifclass, ifopts, - self._dispatcher) - self._interfaces.append(interface) - - def _buildObject(self, name, cls, options, *args): - self.log.debug(u'Creating %s ...' % name) - # cls.__init__ should pop all used args from options! - obj = cls(self.log.getChild(name.lower()), options, *args) - if options: - raise ConfigError(u'%s: don\'t know how to handle option(s): %s' % - (cls.__name__, u', '.join(options))) - return obj diff --git a/test/test_client_baseclient.py b/test/test_client_baseclient.py index b0a8834..adb28e7 100644 --- a/test/test_client_baseclient.py +++ b/test/test_client_baseclient.py @@ -20,13 +20,16 @@ # # ***************************************************************************** """test base client.""" +from __future__ import print_function + +import sys +from os import path +sys.path.insert(0, path.abspath(path.join(path.dirname(__file__), '..'))) + +from collections import OrderedDict import pytest -import sys -sys.path.insert(0, sys.path[0] + '/..') - -from collections import OrderedDict from secop.client.baseclient import Client # define Test-only connection object @@ -52,6 +55,7 @@ def clientobj(request): print (" TEARDOWN ClientObj") +# pylint: disable=redefined-outer-name def test_describing_data_decode(clientobj): assert OrderedDict( [('a', 1)]) == clientobj._decode_list_to_ordereddict(['a', 1]) diff --git a/test/test_datatypes.py b/test/test_datatypes.py index 453d323..d75c470 100644 --- a/test/test_datatypes.py +++ b/test/test_datatypes.py @@ -22,7 +22,8 @@ """test data types.""" import sys -sys.path.insert(0, sys.path[0] + '/..') +from os import path +sys.path.insert(0, path.abspath(path.join(path.dirname(__file__), '..'))) # no fixtures needed import pytest diff --git a/test/test_lib_enum.py b/test/test_lib_enum.py index fc844fe..ca93a72 100644 --- a/test/test_lib_enum.py +++ b/test/test_lib_enum.py @@ -22,7 +22,8 @@ """test Enum type.""" import sys -sys.path.insert(0, sys.path[0] + '/..') +from os import path +sys.path.insert(0, path.abspath(path.join(path.dirname(__file__), '..'))) # no fixtures needed import pytest diff --git a/test/test_modules.py b/test/test_modules.py index da83093..271680a 100644 --- a/test/test_modules.py +++ b/test/test_modules.py @@ -23,7 +23,8 @@ from __future__ import print_function import sys -sys.path.insert(0, sys.path[0] + '/..') +from os import path +sys.path.insert(0, path.abspath(path.join(path.dirname(__file__), '..'))) # no fixtures needed import pytest @@ -50,7 +51,11 @@ def test_Communicator(): announce_update = lambda self, m, pn, pv: print('%s:%s=%r' % (m.name, pn, pv)), ))() - o = Communicator(logger, {}, 'o1', dispatcher) + srv = type('ServerStub', (object,), dict( + dispatcher = dispatcher, + ))() + + o = Communicator('communicator',logger, {}, srv) o.early_init() o.init_module() q = queue.Queue() @@ -97,8 +102,12 @@ def test_ModuleMeta(): announce_update = lambda self, m, pn, pv: print('%s:%s=%r' % (m.name, pn, pv)), ))() - o1 = newclass(logger, {}, 'o1', dispatcher) - o2 = newclass(logger, {}, 'o1', dispatcher) + srv = type('ServerStub', (object,), dict( + dispatcher = dispatcher, + ))() + + o1 = newclass('o1', logger, {}, srv) + o2 = newclass('o2', logger, {}, srv) params_found= set() ctr_found = set() for obj in [o1, o2]: diff --git a/test/test_params.py b/test/test_params.py index 8b9a3bb..0136018 100644 --- a/test/test_params.py +++ b/test/test_params.py @@ -22,7 +22,8 @@ """test data types.""" import sys -sys.path.insert(0, sys.path[0] + '/..') +from os import path +sys.path.insert(0, path.abspath(path.join(path.dirname(__file__), '..'))) # no fixtures needed import pytest diff --git a/test/test_parse.py b/test/test_parse.py index 702b835..cb2a84b 100644 --- a/test/test_parse.py +++ b/test/test_parse.py @@ -22,7 +22,8 @@ """test data types.""" import sys -sys.path.insert(0, sys.path[0] + '/..') +from os import path +sys.path.insert(0, path.abspath(path.join(path.dirname(__file__), '..'))) from collections import OrderedDict