
"""
@package pmsco.dispatch
calculation dispatcher.
@author Matthias Muntwiler
@copyright (c) 2015 by Paul Scherrer Institut @n
Licensed under the Apache License, Version 2.0 (the "License"); @n
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
"""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import os
import os.path
import datetime
import signal
import collections
import copy
import logging
from attrdict import AttrDict
from mpi4py import MPI
from pmsco.helpers import BraceMessage as BMsg
logger = logging.getLogger(__name__)
# messages sent from master to slaves
## master sends new assignment
## the message is a dictionary of model parameters
TAG_NEW_TASK = 1
## master calls end of calculation
## the message is empty
TAG_FINISH = 2
# messages sent from slaves to master
## slave reports new result
## the message is a dictionary of model parameters and results
TAG_NEW_RESULT = 1
## slave confirms end of calculation
## currently not used
TAG_FINISHED = 2
## slave has encountered an error, result is invalid
## the message contains the original task message
TAG_INVALID_RESULT = 3
## slave has encountered an error and is aborting
## the message is empty
TAG_ERROR_ABORTING = 4
## levels of calculation tasks
#
CALC_LEVELS = ('model', 'scan', 'sym', 'emit', 'region')
## intermediate sub-class of CalcID
#
# this class should not be instantiated.
# instead, use CalcID which provides some useful helper methods.
#
_CalcID = collections.namedtuple('_CalcID', CALC_LEVELS)
class CalcID(_CalcID):
"""
named tuple class to uniquely identify a calculation task.
this is a 5-tuple of indices, one index per task level.
    a non-negative index refers to a specific instance in the task hierarchy.
the ID is defined as a named tuple so that it can be used as key of a dictionary.
cf. @ref CalculationTask for further detail.
compared to a plain named tuple, the CalcID class provides additional helper methods and properties.
example constructor: CalcID(1, 2, 3, 4, 5).
"""
@property
def levels(self):
"""
level names.
this property returns the defined level names in a tuple.
this is the same as @ref CALC_LEVELS.
@return: tuple of level names
"""
return self._fields
@property
def level(self):
"""
specific level of a task, dictionary key form.
        this corresponds to the name of the last non-negative component.
@return: attribute name corresponding to the task level.
empty string if all members are negative (the root task).
"""
for k in reversed(self._fields):
if self.__getattribute__(k) >= 0:
return k
return ''
@property
def numeric_level(self):
"""
specific level of a task, numeric form.
        this corresponds to the index of the last non-negative value in the sequence of indices.
        @return: index corresponding to the significant task level component of the id.
        the value ranges from -1 to len(CALC_LEVELS) - 1.
it is -1 if all indices are negative (root task).
"""
for k in reversed(range(len(self))):
if self[k] >= 0:
return k
return -1
def collapse_levels(self, level):
"""
return a new CalcID that is collapsed at a specific level.
the method returns a new CalcID object where the indices below the given level are -1 (undefined).
this can be seen as collapsing the tree at the specified node (level).
@note because a CalcID is immutable, this method returns a new instance.
@param level: level at which to collapse.
the index at this level remains unchanged, lower ones are set to -1.
the level can be specified by attribute name (str) or numeric index (-1..4).
@raise ValueError if level is not numeric and not in CALC_LEVELS.
@return: new CalcID instance.
"""
try:
level = int(level)
except ValueError:
level = CALC_LEVELS.index(level)
assert -1 <= level < len(CALC_LEVELS)
mask = {l: -1 for (i, l) in enumerate(CALC_LEVELS) if i > level}
return self._replace(**mask)
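# usage sketch (illustrative, not part of the module):
#
#   cid = CalcID(1, 2, 3, 4, 5)
#   cid.level                     # 'region' (name of the last non-negative component)
#   cid.numeric_level             # 4
#   cid.collapse_levels('scan')   # CalcID(model=1, scan=2, sym=-1, emit=-1, region=-1)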
class CalculationTask(object):
"""
identifies a calculation task by index and model parameters.
given an object of this class, the project must be able to:
* produce calculation parameters,
* produce a cluster,
* gather results.
a calculation task is identified by:
@arg @c id.model structure number or iteration (handled by the mode module)
@arg @c id.scan scan number (handled by the project)
@arg @c id.sym symmetry number (handled by the project)
@arg @c id.emit emitter number (handled by the project)
@arg @c id.region region number (handled by the region handler)
    specified members must be greater than or equal to zero.
-1 is the wildcard which is used in parent tasks,
where, e.g., no specific symmetry is chosen.
the root task has the ID (-1, -1, -1, -1, -1).
"""
## @var id (CalcID)
# named tuple CalcID containing the 5-part calculation task identifier.
## @var parent_id (CalcID)
# named tuple CalcID containing the task identifier of the parent task.
## @var model (dict)
# dictionary containing the model parameters of the task.
#
# this is typically initialized to the parameters of the parent task,
# and varied at the level where the task ID was produced.
## @var file_root (string)
# file name without extension and index.
## @var file_ext (string)
# file name extension including dot.
#
# the extension is set by the scattering code interface.
# it must be passed back up the hierarchy.
## @var result_filename (string)
# name of the ETPI or ETPAI file that contains the result (intensity) data.
#
# this member is filled at the end of the calculation by MscoProcess.calc().
# the filename can be constructed given the base name, task ID, and extension.
# since this may be tedious, the filename must be returned here.
## @var modf_filename (string)
# name of the ETPI or ETPAI file that contains the resulting modulation function.
## @var result_valid (bool)
# validity status of the result file @ref result_filename.
#
# if True, the file must exist and contain valid data according to the task specification.
# the value is set True when a calculation task completes successfully.
# it may be reset later to invalidate the data if an error occurs during processing.
#
# validity of a parent task requires validity of all child tasks.
## @var rfac (float)
# r-factor value of the task result.
#
# the rfac field is written by @ref pmsco.project.Project.evaluate_result.
# the initial value is Not A Number.
## @var time (timedelta)
# execution time of the task.
#
# execution time is measured as wall time of a single calculation.
# in parent tasks, execution time is the sum of the children's execution time.
#
# this information may be used to plan the end of the program run or for statistics.
## @var files (dict)
# files generated by the task and their category
#
# dictionary key is the file name,
# value is the file category, e.g. 'cluster', 'atomic', etc.
#
# this information is used to automatically clean up unnecessary data files.
## @var region (dict)
# scan positions to substitute the ones from the original scan.
#
# this is used to distribute scans over multiple calculator processes,
# cf. e.g. @ref pmsco.handlers.EnergyRegionHandler.
#
# dictionary key must be the scan dimension 'e', 't', 'p', 'a'.
# the value is a numpy.ndarray containing the scan positions.
#
# the dictionary can be empty if the original scan shall be calculated at once.
def __init__(self):
"""
        create a new calculation task instance with the root task ID (all indices equal to -1).
"""
self.id = CalcID(-1, -1, -1, -1, -1)
self.parent_id = self.id
self.model = {}
self.file_root = ""
self.file_ext = ""
self.result_filename = ""
self.modf_filename = ""
self.result_valid = False
self.time = datetime.timedelta()
self.files = {}
self.region = {}
self.rfac = float('nan')
def __eq__(self, other):
"""
consider two tasks equal if they have the same ID.
EXPERIMENTAL
not clear whether this is a good idea.
we want this equality because the calculation may modify a task to return results.
yet, it should be considered the same task.
e.g., we want to find the task in the original task list.
"""
return isinstance(other, self.__class__) and self.id == other.id
def __hash__(self):
"""
the hash depends on the ID only.
"""
return hash(self.id)
def get_mpi_message(self):
"""
convert the task data to a format suitable for an MPI message.
        mpi4py does not reliably pickle custom objects,
        so we convert our data to basic types.
@return: (dict)
"""
        msg = dict(vars(self))  # copy: vars() returns the live __dict__, which must not be modified
msg['id'] = self.id._asdict()
msg['parent_id'] = self.parent_id._asdict()
return msg
def set_mpi_message(self, msg):
"""
set object attributes from MPI message.
@param msg: message created by get_mpi_message()
@return: None
"""
if isinstance(msg['id'], dict):
msg['id'] = CalcID(**msg['id'])
if isinstance(msg['parent_id'], dict):
msg['parent_id'] = CalcID(**msg['parent_id'])
for k, v in msg.items():
self.__setattr__(k, v)
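    # illustrative round trip between a task object and an MPI message:
    #
    #   msg = task.get_mpi_message()    # plain dict; id and parent_id as dicts
    #   restored = CalculationTask()
    #   restored.set_mpi_message(msg)   # id and parent_id are CalcID tuples again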
def format_filename(self, **overrides):
"""
format input or output file name including calculation index.
@param overrides optional keyword arguments override object fields.
the following keywords are handled: @c root, @c model, @c scan, @c sym, @c emit, @c region, @c ext.
@return a string consisting of the concatenation of the base name, the ID, and the extension.
"""
parts = self.id._asdict()
parts['root'] = self.file_root
parts['ext'] = self.file_ext
        parts.update(overrides)
filename = "{root}_{model}_{scan}_{sym}_{emit}_{region}{ext}".format(**parts)
return filename
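    # example (illustrative): for a task with id CalcID(1, 2, 3, 4, 5),
    # file_root 'out' and file_ext '.etpi':
    #   task.format_filename()          -> 'out_1_2_3_4_5.etpi'
    #   task.format_filename(region=-1) -> 'out_1_2_3_4_-1.etpi'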
def copy(self):
"""
create a copy of the task.
@return: new independent CalculationTask with the same attributes as the original one.
"""
return copy.deepcopy(self)
def change_id(self, **kwargs):
"""
change the ID of the task.
@param kwargs: keyword arguments to change specific parts of the ID.
@note instead of changing all parts of the ID, you may simply assign a new CalcID to the id member.
"""
self.id = self.id._replace(**kwargs)
@property
def level(self):
"""
specific level of a task, dictionary key form.
this corresponds to the name of the last positive component of self.id.
@return: attribute name corresponding to the task level.
empty string for the root task.
"""
return self.id.level
@property
def numeric_level(self):
"""
specific level of a task, numeric form.
this corresponds to the index of the last positive component of self.id.
@return: index corresponding to the significant task level component of the id.
-1 for the root task.
"""
return self.id.numeric_level
def add_task_file(self, name, category):
"""
register a file that was generated by the calculation task.
this information is used to automatically clean up unnecessary data files.
@param name: file name (optionally including a path).
@param category: file category, e.g. 'cluster', 'atomic', etc.
@return: None
"""
self.files[name] = category
def rename_task_file(self, old_filename, new_filename):
"""
rename a file.
update the file list after a file was renamed.
the method silently ignores if old_filename is not listed.
@param old_filename: old file name
@param new_filename: new file name
@return: None
"""
try:
self.files[new_filename] = self.files[old_filename]
del self.files[old_filename]
except KeyError:
logger.warning("CalculationTask.rename_task_file: could not rename file {0} to {1}".format(old_filename,
new_filename))
def remove_task_file(self, filename):
"""
remove a file from the list of generated data files.
the method silently ignores if filename is not listed.
the method removes the file from the internal list.
it does not delete the file.
@param filename: file name
@return: None
"""
try:
del self.files[filename]
except KeyError:
logger.warning("CalculationTask.remove_task_file: could not remove file {0}".format(filename))
class CachedCalculationMethod(object):
"""
decorator to cache results of expensive calculation functions.
this decorator can be used to transparently cache any expensive calculation result
that depends in a deterministic way on the calculation index.
for example, each cluster gets a unique calculation index.
if a cluster needs to be calculated repeatedly, it may be more efficient to cache it.
the method to decorate must have the following signature:
result = func(self, model, index).
    the index (neglecting the emitter and region levels) must identify the result completely and uniquely.
    the number of cached results is limited by the ttl (time to live) attribute.
    time is measured in cache misses:
    the ttl values of all cached items are decreased each time a requested calculation is not found in the cache (miss),
    and an item is evicted when its ttl falls below zero.
    on a cache hit, the corresponding item's ttl is reset.
    the target ttl can be specified as an optional parameter of the decorator.
"""
## @var _cache (dict)
#
# key = calculation index,
# value = function result
## @var _ttl (dict)
#
# key = calculation index,
# value (int) = remaining time to live
# where time is the number of subsequent cache misses.
## @var ttl (int)
#
# target time to live of cache items.
    # time is given by the number of cache misses.
def __init__(self, ttl=10):
super(CachedCalculationMethod, self).__init__()
self._cache = {}
self._ttl = {}
self.ttl = ttl
def __call__(self, func):
def wrapped_func(inst, model, index):
# note: _replace returns a new instance of the namedtuple
index = index._replace(emit=-1, region=-1)
cache_index = (id(inst), index.model, index.scan, index.sym)
            try:
                result = self._cache[cache_index]
            except KeyError:
                result = func(inst, model, index)
                self._expire()
                self._cache[cache_index] = result
                self._ttl[cache_index] = self.ttl
            else:
                # cache hit: reset the item's ttl (cf. class docstring)
                self._ttl[cache_index] = self.ttl
            return result
return wrapped_func
def _expire(self):
"""
decrease the remaining ttl of cache items and delete items whose ttl has fallen below 0.
@return: None
"""
for index in self._ttl:
self._ttl[index] -= 1
old_items = [index for index in self._ttl if self._ttl[index] < 0]
for index in old_items:
del self._ttl[index]
del self._cache[index]
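# usage sketch (illustrative): caching an expensive cluster calculation.
# `MyClusterGenerator` is a hypothetical class; the decorated method must have
# the signature func(self, model, index) described in the class docstring.
#
#   class MyClusterGenerator(object):
#       @CachedCalculationMethod(ttl=20)
#       def create_cluster(self, model, index):
#           cluster = ...   # expensive; deterministic in (model, scan, sym) of index
#           return cluster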
class MscoProcess(object):
"""
code shared by MscoMaster and MscoSlave.
mainly passing project parameters, handling OS signals,
calling an MSC calculation.
"""
## @var _finishing
# if True, the task loop should not accept new tasks.
#
# the loop still waits for the results of running calculations.
## @var _running
# while True, the task loop keeps running.
#
# if False, the loop will exit just before the next iteration.
# pending tasks and running calculations will not be waited for.
#
    # @attention make sure that all calculations are finished before resetting this flag.
# higher ranked processes may not exit if they do not receive the finish message.
## @var datetime_limit (datetime.datetime)
# date and time when the calculations should finish (regardless of result)
# because the process may get killed by the scheduler after this time.
#
# the default is 2 days after start.
def __init__(self, comm):
self._comm = comm
self._project = None
self._atomic_scattering = None
self._multiple_scattering = None
self._running = False
self._finishing = False
self.stop_signal = False
self.datetime_limit = datetime.datetime.now() + datetime.timedelta(days=2)
def setup(self, project):
self._project = project
self._atomic_scattering = project.atomic_scattering_factory()
self._multiple_scattering = project.multiple_scattering_factory()
self._running = False
self._finishing = False
self.stop_signal = False
try:
# signal handlers
signal.signal(signal.SIGTERM, self.receive_signal)
signal.signal(signal.SIGUSR1, self.receive_signal)
signal.signal(signal.SIGUSR2, self.receive_signal)
except AttributeError:
pass
except ValueError:
pass
if project.timedelta_limit:
self.datetime_limit = datetime.datetime.now() + project.timedelta_limit
# noinspection PyUnusedLocal
def receive_signal(self, signum, stack):
"""
sets the self.stop_signal flag,
which will terminate the optimization process
as soon as all slaves have finished their calculation.
"""
self.stop_signal = True
def run(self):
pass
def cleanup(self):
"""
clean up after all calculations.
@return: None
"""
pass
def calc(self, task):
"""
execute a single calculation.
* create the cluster and parameter objects.
* export the cluster for reference.
* choose the scan file.
* specify the output file name.
* call the calculation program.
* set task.result_filename, task.file_ext, task.time.
the function checks for some obvious errors, and skips the calculation if an error is detected, such as:
* missing atoms or emitters in the cluster.
@param task (CalculationTask) calculation task and identifier.
"""
s_model = str(task.model)
s_id = str(task.id)
logger.info("calling calculation %s", s_id)
logger.info("model %s", s_model)
start_time = datetime.datetime.now()
clu = self._create_cluster(task)
par = self._create_params(task)
scan = self._define_scan(task)
output_file = task.format_filename(ext="")
# check parameters and call the calculators
if clu.get_atom_count() >= 1:
self._calc_atomic(task, par, clu, scan, output_file)
else:
logger.error("empty cluster in calculation %s", s_id)
task.result_valid = False
if clu.get_emitter_count() >= 1:
self._calc_multiple(task, par, clu, scan, output_file)
else:
logger.error("no emitters in cluster of calculation %s.", s_id)
task.result_valid = False
task.time = datetime.datetime.now() - start_time
return task
def _define_scan(self, task):
"""
define the scan range.
@param task: CalculationTask with all attributes set for the calculation.
@return: pmsco.project.Scan object for the calculator.
"""
scan = self._project.scans[task.id.scan]
if task.region:
scan = scan.copy()
try:
scan.energies = task.region['e']
logger.debug(BMsg("substitute energy region"))
except KeyError:
pass
try:
scan.thetas = task.region['t']
logger.debug(BMsg("substitute theta region"))
except KeyError:
pass
try:
scan.phis = task.region['p']
logger.debug(BMsg("substitute phi region"))
except KeyError:
pass
try:
scan.alphas = task.region['a']
logger.debug(BMsg("substitute alpha region"))
except KeyError:
pass
return scan
def _create_cluster(self, task):
"""
generate the cluster for the given calculation task.
cluster generation is delegated to the project's cluster_generator object.
if the current task has region == 0,
the method also exports diagnostic clusters via the project's export_cluster() method.
the file name is formatted with the given task index except that region is -1.
if (in addition to region == 0) the current task has emit == 0 and cluster includes multiple emitters,
the method also exports the master cluster and full emitter list.
the file name is formatted with the given task index except that emitter and region are -1.
@param task: CalculationTask with all attributes set for the calculation.
@return: pmsco.cluster.Cluster object for the calculator.
"""
nem = self._project.cluster_generator.count_emitters(task.model, task.id)
clu = self._project.cluster_generator.create_cluster(task.model, task.id)
# overwrite atom classes only if they are at their default value
clu.init_atomclasses(field_or_value='t', default_only=True)
if task.id.region == 0:
file_index = task.id._replace(region=-1)
filename = task.format_filename(region=-1)
files = self._project.export_cluster(file_index, filename, clu)
task.files.update(files)
# master cluster
if nem > 1 and task.id.emit == 0:
master_index = task.id._replace(emit=-1, region=-1)
filename = task.format_filename(emit=-1, region=-1)
master_cluster = self._project.cluster_generator.create_cluster(task.model, master_index)
files = self._project.export_cluster(master_index, filename, master_cluster)
task.files.update(files)
return clu
def _create_params(self, task):
"""
generate the parameters list.
parameters generation is delegated to the project's create_params method.
@param task: CalculationTask with all attributes set for the calculation.
@return: pmsco.project.Params object for the calculator.
"""
par = self._project.create_params(task.model, task.id)
return par
def _calc_atomic(self, task, par, clu, scan, output_file):
"""
calculate the atomic scattering factors if necessary and link them to the cluster.
the method first calls the `before_atomic_scattering` project hook,
the atomic scattering calculator,
and finally the `after_atomic_scattering` hook.
this process updates the par and clu objects to link to the created files.
if any of the functions returns None, the par and clu objects are left unchanged.
@param task: CalculationTask with all attributes set for the calculation.
@param par: pmsco.project.Params object for the calculator.
its phase_files attribute is updated with the created scattering files.
the radial matrix elements are not changed (but may be in a future version).
@param clu: pmsco.cluster.Cluster object for the calculator.
the cluster is overwritten with the one returned by the calculator,
so that atom classes match the phase_files.
@return: None
"""
_par = copy.deepcopy(par)
_clu = copy.deepcopy(clu)
_par, _clu = self._project.before_atomic_scattering(task, _par, _clu)
if _clu is not None:
filename, files = self._atomic_scattering.run(_par, _clu, scan, output_file)
if files:
task.files.update(files)
_par, _clu = self._project.after_atomic_scattering(task, _par, _clu)
if _clu is not None:
par.phase_files = _par.phase_files
clu.copy_from(_clu)
def _calc_multiple(self, task, par, clu, scan, output_file):
"""
calculate the multiple scattering intensity.
@param task: CalculationTask with all attributes set for the calculation.
@param par: pmsco.project.Params object for the calculator.
@param clu: pmsco.cluster.Cluster object for the calculator.
@return: None
"""
task.result_filename, files = self._multiple_scattering.run(par, clu, scan, output_file)
if task.result_filename:
(root, ext) = os.path.splitext(task.result_filename)
task.file_ext = ext
task.result_valid = True
if files:
task.files.update(files)
class MscoMaster(MscoProcess):
"""
MscoMaster process for MSC calculations.
This class implements the main loop of the master (rank 0) process.
It sends calculation commands to the slaves, and dispatches the results
to the appropriate post-processing modules.
if there is only one process, the MscoMaster executes the calculations sequentially.
"""
## @var _pending_tasks (OrderedDict)
# CalculationTask objects of pending calculations.
# the dictionary keys are the task IDs.
## @var _running_tasks
# CalculationTask objects of currently running calculations.
# the dictionary keys are the task IDs.
## @var _complete_tasks
# CalculationTask objects of complete calculations.
#
# calculations are removed from the list when they are passed to the result handlers.
# the dictionary keys are the task IDs.
## @var _slaves
# total number of MPI slave ranks = number of calculator slots
## @var _idle_ranks
# list of ranks which are waiting to receive a task.
#
# list of int, default = []
## @var max_calculations
# maximum number of calculations
#
# if this limit is exceeded, the optimization will stop.
# the limit is meant to catch irregular situations such as run-time calculation errors or infinite loops.
## @var _calculations
# number of dispatched calculations
#
# if this number exceeds the @ref max_calculations, the optimization will stop.
## @var _running_slaves
# number of running slave ranks
#
# keeps track of active (idle or busy) slave ranks.
# it is used to make sure (if possible) that all slave tasks have finished before the master quits.
# the number is decremented when a slave quits due to an error or when the master sends a finish message.
## @var _min_queue_len
# if the queue length drops below this number, the dispatcher asks for the next round of tasks.
## @var _model_done
    # (bool) True if the model handler has returned an empty list of new tasks.
## @var _root_task
# (CalculationTask) root calculation task
#
# this is the root of the calculation tasks tree.
# it defines the initial model and the output file name.
# it is passed to the model handler during the main loop.
## @var task_handlers
# (AttrDict) dictionary of task handler objects
#
# the keys are the task levels 'model', 'scan', 'sym', 'emit' and 'region'.
# the values are handlers.TaskHandler objects.
# the objects can be accessed in attribute or dictionary notation.
def __init__(self, comm):
super(MscoMaster, self).__init__(comm)
self._pending_tasks = collections.OrderedDict()
self._running_tasks = collections.OrderedDict()
self._complete_tasks = collections.OrderedDict()
self._slaves = self._comm.Get_size() - 1
self._idle_ranks = []
self.max_calculations = 1000000
self._calculations = 0
self._running_slaves = 0
self._model_done = False
self._min_queue_len = self._slaves + 1
self._root_task = None
self.task_levels = list(CalcID._fields)
self.task_handlers = AttrDict()
def setup(self, project):
"""
initialize the process, handlers, root task, slave counting.
this method initializes the run-time attributes of the master process,
particularly the attributes that depend on the project.
it creates the root calculation task with the initial model defined by the project.
it creates and initializes the task handler objects according to the handler classes defined by the project.
the method notifies the handlers of the number of available slave processes (slots).
some of the tasks handlers adjust their branching according to the number of slots.
this mechanism may be used to balance the load between the task levels.
however, the current implementation is very coarse in this respect.
it advertises all slots to the model handler but a reduced number to the remaining handlers
depending on the operation mode.
the region handler receives a maximum of 4 slots except in single calculation mode.
in single calculation mode, all slots can be used by all handlers.
"""
super(MscoMaster, self).setup(project)
logger.debug("master entering setup")
self._running_slaves = self._slaves
self._idle_ranks = list(range(1, self._running_slaves + 1))
self._root_task = CalculationTask()
self._root_task.file_root = project.output_file
self._root_task.model = project.create_domain().start
for level in self.task_levels:
self.task_handlers[level] = project.handler_classes[level]()
self.task_handlers.model.datetime_limit = self.datetime_limit
slaves_adj = max(self._slaves, 1)
self.task_handlers.model.setup(project, slaves_adj)
if project.mode != "single":
            slaves_adj = max(slaves_adj // 2, 1)
self.task_handlers.scan.setup(project, slaves_adj)
self.task_handlers.sym.setup(project, slaves_adj)
self.task_handlers.emit.setup(project, slaves_adj)
if project.mode != "single":
slaves_adj = min(slaves_adj, 4)
self.task_handlers.region.setup(project, slaves_adj)
project.setup(self.task_handlers)
def run(self):
"""
main loop.
        calls the slaves, accepts and dispatches results.
setup() must be called before, cleanup() after.
"""
self._running = True
self._calculations = 0
logger.debug("master entering main loop")
# main task loop
while self._running:
logger.debug("new iteration of master main loop")
self._create_tasks()
self._dispatch_results()
if self._finishing:
self._dispatch_finish()
else:
self._dispatch_tasks()
self._receive_result()
self._check_finish()
logger.debug("master exiting main loop")
self._running = False
self._save_report()
def cleanup(self):
logger.debug("master entering cleanup")
for level in reversed(self.task_levels):
self.task_handlers[level].cleanup()
self._project.cleanup()
super(MscoMaster, self).cleanup()
def _dispatch_results(self):
"""
pass results through the post-processing modules.
"""
logger.debug("dispatching results of %u tasks", len(self._complete_tasks))
while self._complete_tasks:
__, task = self._complete_tasks.popitem(last=False)
self._dispatch_result(task)
def _dispatch_result(self, task):
"""
pass a result through the post-processing modules.
@param task: a CalculationTask object.
@return None
"""
level = task.level
if level:
logger.debug(BMsg("passing task {task} to {level} handler", task=str(task.id), level=level))
task = self.task_handlers[level].add_result(task)
if task:
self._dispatch_result(task)
else:
self._finishing = True
logger.debug(BMsg("root task {task} complete", task=str(task.id)))
def _create_tasks(self):
"""
have the model handler generate the next round of top-level calculation tasks.
the method calls the model handler repeatedly
until the pending tasks queue is filled up
to more than the minimum queue length.
@return: None
"""
logger.debug("creating new tasks from root")
while len(self._pending_tasks) < self._min_queue_len:
tasks = self.task_handlers.model.create_tasks(self._root_task)
logger.debug("model handler returned %u new tasks", len(tasks))
if not tasks:
self._model_done = True
break
for task in tasks:
self.add_model_task(task)
def _dispatch_tasks(self):
"""
send pending tasks to available slaves or master.
if there is only one process, the master executes one task, and returns.
"""
logger.debug("dispatching tasks to calculators")
if self._slaves > 0:
while not self._finishing:
try:
rank = self._idle_ranks.pop(0)
except IndexError:
break
try:
__, task = self._pending_tasks.popitem(last=False)
except KeyError:
self._idle_ranks.append(rank)
break
else:
logger.debug("assigning task %s to rank %u", str(task.id), rank)
self._running_tasks[task.id] = task
self._comm.send(task.get_mpi_message(), dest=rank, tag=TAG_NEW_TASK)
self._calculations += 1
else:
if not self._finishing:
try:
__, task = self._pending_tasks.popitem(last=False)
except KeyError:
pass
else:
logger.debug("executing task %s in master process", str(task.id))
self.calc(task)
self._calculations += 1
self._complete_tasks[task.id] = task
def _dispatch_finish(self):
"""
send all slave ranks a finish message.
"""
logger.debug("dispatch finish message to %u slaves", len(self._idle_ranks))
while self._idle_ranks:
rank = self._idle_ranks.pop()
logger.debug("send finish tag to rank %u", rank)
self._comm.send(None, dest=rank, tag=TAG_FINISH)
self._running_slaves -= 1
def _receive_result(self):
"""
wait for a message from another rank and process it.
"""
if self._running_slaves > 0:
logger.debug("waiting for calculation result")
s = MPI.Status()
data = self._comm.recv(source=MPI.ANY_SOURCE, tag=MPI.ANY_TAG, status=s)
if s.tag == TAG_NEW_RESULT:
task_id = self._accept_task_done(data)
self._idle_ranks.append(s.source)
logger.debug(BMsg("received result of task {0} from rank {1}", task_id, s.source))
elif s.tag == TAG_INVALID_RESULT:
task_id = self._accept_task_done(data)
self._idle_ranks.append(s.source)
logger.error(BMsg("received invalid result of task {0} from rank {1}", task_id, s.source))
elif s.tag == TAG_ERROR_ABORTING:
self._finishing = True
self._running_slaves -= 1
task_id = self._accept_task_done(data)
logger.error(BMsg("received abort signal from rank {1}", task_id, s.source))
def _accept_task_done(self, data):
"""
check the return message from a slave process and mark the task done.
if the message contains complete data of a running task, the corresponding CalculationTask object is returned.
@param data: a dictionary that can be imported into a CalculationTask object by the set_mpi_message() method.
@return: task ID (CalcID type) if the message contains the complete identification of a pending task,
None if the ID cannot be determined or is not in the list of running tasks.
"""
try:
task = CalculationTask()
task.set_mpi_message(data)
del self._running_tasks[task.id]
self._complete_tasks[task.id] = task
task_id = task.id
except (TypeError, IndexError, KeyError):
task_id = None
return task_id
def _check_finish(self):
"""
check whether the task loop is finished.
the task loop is finished on any of the following conditions:
* there are no pending or running tasks,
* a file named "finish_pmsco" exists in the working directory,
* a SIGUSR1, SIGUSR2, or SIGTERM signal was received,
* self.datetime_limit is exceeded, or
* self.max_calculations is exceeded.
self._finishing is set if any of these conditions is fulfilled.
self._running is reset if self._finishing is set and no calculation tasks are running.
@return: self._finishing
"""
if not self._finishing and (self._model_done and not self._pending_tasks and not self._running_tasks):
logger.warning("finish: model handler is done")
self._finishing = True
if not self._finishing and (self._calculations >= self.max_calculations):
logger.warning("finish: max. calculations (%u) exeeded", self.max_calculations)
self._finishing = True
if not self._finishing and self.stop_signal:
logger.warning("finish: stop signal received")
self._finishing = True
if not self._finishing and (datetime.datetime.now() > self.datetime_limit):
logger.warning("finish: time limit exceeded")
self._finishing = True
if not self._finishing and os.path.isfile("finish_pmsco"):
logger.warning("finish: finish_pmsco file detected")
self._finishing = True
if self._finishing and not self._running_slaves and not self._running_tasks:
logger.info("finish: all calculations finished")
self._running = False
return self._finishing
def _save_report(self):
"""
generate a final report.
this method is called at the end of the master loop.
it passes the call to @ref pmsco.handlers.ModelHandler.save_report.
@return: None
"""
self.task_handlers.model.save_report(self._root_task)
def add_model_task(self, task):
"""
add a new model task including all of its children to the task queue.
@param task (CalculationTask) task identifier and model parameters.
"""
scan_tasks = self.task_handlers.scan.create_tasks(task)
for scan_task in scan_tasks:
sym_tasks = self.task_handlers.sym.create_tasks(scan_task)
for sym_task in sym_tasks:
emitter_tasks = self.task_handlers.emit.create_tasks(sym_task)
for emitter_task in emitter_tasks:
region_tasks = self.task_handlers.region.create_tasks(emitter_task)
for region_task in region_tasks:
self._pending_tasks[region_task.id] = region_task
class MscoSlave(MscoProcess):
"""
MscoSlave process for MSC calculations.
This class implements the main loop of a slave (rank > 0) process.
It waits for assignments from the master process,
and runs one calculation after the other.
"""
## @var _errors
# number of errors (exceptions) encountered in calculation tasks.
#
# typically, a task is aborted when an exception is encountered.
def __init__(self, comm):
super(MscoSlave, self).__init__(comm)
self._errors = 0
self._max_errors = 5
def run(self):
"""
Waits for messages from the master and dispatches tasks.
"""
logger.debug("slave entering main loop")
s = MPI.Status()
self._running = True
while self._running:
logger.debug("waiting for message")
data = self._comm.recv(source=0, tag=MPI.ANY_TAG, status=s)
if s.tag == TAG_NEW_TASK:
logger.debug("received new task")
self.accept_task(data)
elif s.tag == TAG_FINISH:
logger.debug("received finish message")
self._running = False
logger.debug("slave exiting main loop")
def accept_task(self, data):
"""
Executes a calculation task and returns the result to the master.
        if a recoverable exception (a math, value, or key error) occurs,
        the method catches the exception and sends a failure message to the master.
if exceptions occur repeatedly, the slave aborts and sends an abort message to the master.
@param data: task message received from MPI.
"""
task = CalculationTask()
task.set_mpi_message(data)
logger.debug(BMsg("executing task {0} in slave process", task.id))
try:
result = self.calc(task)
self._errors = 0
except (ValueError, ArithmeticError, LookupError):
logger.exception(BMsg("unhandled exception in calculation task {0}", task.id))
self._errors += 1
if self._errors <= self._max_errors:
self._comm.send(data, dest=0, tag=TAG_INVALID_RESULT)
else:
logger.error("too many exceptions, aborting")
self._running = False
self._comm.send(data, dest=0, tag=TAG_ERROR_ABORTING)
else:
logger.debug(BMsg("sending result of task {0} to master", result.id))
self._comm.send(result.get_mpi_message(), dest=0, tag=TAG_NEW_RESULT)
def run_master(mpi_comm, project):
"""
initialize and run the master calculation loop.
a MscoMaster object is created.
the MscoMaster executes the calculation loop and dispatches the tasks.
this function must be called in the MPI rank 0 process only.
if an unhandled exception occurs, this function aborts the MPI communicator, killing all MPI processes.
the caller will not have a chance to handle the exception.
@param mpi_comm: MPI communicator (mpi4py.MPI.COMM_WORLD).
@param project: project instance (sub-class of project.Project).
"""
try:
master = MscoMaster(mpi_comm)
master.setup(project)
master.run()
master.cleanup()
except (SystemExit, KeyboardInterrupt):
mpi_comm.Abort()
raise
except Exception:
logger.exception("unhandled exception in master calculation loop.")
mpi_comm.Abort()
raise
def run_slave(mpi_comm, project):
"""
initialize and run the slave calculation loop.
a MscoSlave object is created.
the MscoSlave accepts tasks from rank 0 and runs the calculations.
this function must be called in MPI rank > 0 processes.
if an unhandled exception occurs, the slave process terminates.
unless it is a SystemExit or KeyboardInterrupt (where we expect that the master also receives the signal),
the MPI communicator is aborted, killing all MPI processes.
@param mpi_comm: MPI communicator (mpi4py.MPI.COMM_WORLD).
@param project: project instance (sub-class of project.Project).
"""
try:
slave = MscoSlave(mpi_comm)
slave.setup(project)
slave.run()
slave.cleanup()
except (SystemExit, KeyboardInterrupt):
raise
except Exception:
logger.exception("unhandled exception in slave calculation loop.")
mpi_comm.Abort()
raise
def run_calculations(project):
"""
initialize and run the main calculation loop.
depending on the MPI rank, the function branches into run_master() (rank 0) or run_slave() (rank > 0).
@param project: project instance (sub-class of project.Project).
"""
mpi_comm = MPI.COMM_WORLD
mpi_rank = mpi_comm.Get_rank()
if mpi_rank == 0:
logger.debug("MPI rank %u setting up master loop", mpi_rank)
run_master(mpi_comm, project)
else:
logger.debug("MPI rank %u setting up slave loop", mpi_rank)
run_slave(mpi_comm, project)
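# minimal driver sketch (illustrative): `MyProject` stands for a hypothetical
# sub-class of pmsco.project.Project; in a real run, the pmsco main module
# performs this setup. run under MPI, e.g. `mpiexec -n 4 python driver.py`.
#
#   from pmsco.dispatch import run_calculations
#   from my_project import MyProject   # hypothetical
#
#   project = MyProject()
#   run_calculations(project)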