# pmsco/handlers.py
"""
@package pmsco.handlers
project-independent task handlers for models, scans, domains, emitters and energies.
calculation tasks are organized in a hierarchical tree.
at each node, a task handler (feel free to find a better name)
creates a set of child tasks according to the optimization mode and requirements of the project.
at the end points of the tree, the tasks are ready to be sent to calculation program.
the handlers collect the results, and return one combined dataset per node.
the passing of tasks and results between handlers is managed by the processing loop.
<em>model handlers</em> define the model parameters used in calculations.
the parameters can be chosen according to user input, or according to a structural optimization algorithm.
a model handler class derives from the ModelHandler class.
the most simple one, SingleModelHandler, is implemented in this module.
it calculates the diffraction pattern of a single model with the start parameters given in the domain object.
the handlers of the structural optimizers are declared in separate modules.
<em>scan handlers</em> split a task into one child task per scan file.
scans are defined by the project.
the actual merging step from multiple scans into one result dataset is delegated to the project class.
<em>domain handlers</em> split a task into one child per domain.
domains are defined by the project.
the actual merging step from multiple domains into one result dataset is delegated to the project class.
<em>emitter handlers</em> split a task into one child per emitter configuration (inequivalent sets of emitting atoms).
emitter configurations are defined by the project.
the merging of calculation results of emitter configurations is delegated to the project class.
since emitters contribute incoherently to the diffraction pattern,
it should make no difference how the emitters are grouped and calculated.
code inspection and tests have shown that per-emitter results from EDAC can be simply added.
<em>energy handlers</em> may split a calculation task into multiple tasks
in order to take advantage of parallel processing.
while several classes of model handlers are available,
the default handlers for scans, domains, emitters and energies should be sufficient in most situations.
the scan and domain handlers call methods of the project class to invoke project-specific functionality.
@author Matthias Muntwiler, matthias.muntwiler@psi.ch
@copyright (c) 2015-21 by Paul Scherrer Institut @n
Licensed under the Apache License, Version 2.0 (the "License"); @n
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
"""
import datetime
from functools import reduce
import logging
import math
import numpy as np
import os
from pathlib import Path
from pmsco.compat import open
import pmsco.data as md
import pmsco.dispatch as dispatch
import pmsco.graphics.scan as mgs
from pmsco.helpers import BraceMessage as BMsg
logger = logging.getLogger(__name__)
class TaskHandler(object):
    """
    common ancestor for task handlers.

    this class defines the common interface of task handlers.
    """

    ## @var _project
    # (Project) project instance.

    ## @var _slots
    # (int) number of calculation slots (processes).
    #
    # for best efficiency, the number of generated tasks should be at least the number of slots,
    # but should not exceed a reasonably small multiple of it.

    ## @var _pending_tasks
    # (dict) pending tasks (created but not yet calculated), keyed by CalculationTask.id.

    ## @var _complete_tasks
    # (dict) complete tasks (calculation finished, parent not yet complete), keyed by CalculationTask.id.

    ## @var _parent_tasks
    # (dict) pending parent tasks, keyed by CalculationTask.id.

    ## @var _invalid_count (int)
    # accumulated total number of invalid results received.
    #
    # add_result() increments the counter whenever an invalid task is reported.
    # descendants may use the counter to terminate a hopeless calculation.

    def __init__(self):
        self._project = None
        self._slots = 0
        self._pending_tasks = {}
        self._parent_tasks = {}
        self._complete_tasks = {}
        self._invalid_count = 0

    def setup(self, project, slots):
        """
        initialize the handler with project data and the process environment.

        the dispatcher calls this method once before the calculation loop starts,
        so that the handler can initialize state that is not set in the constructor.

        @param project (Project) project instance.

        @param slots (int) number of calculation slots (processes).
            for best efficiency the number of tasks generated should be greater or equal the number of slots.
            it should not exceed N times the number of slots, where N is a reasonably small number.

        @return (int) number of children that create_tasks() will generate on average.
            a rough estimate or order of magnitude (if greater than 10) is fine.
            it is used to distribute processing slots across task levels,
            see pmsco.dispatch.MscoMaster.setup().
        """
        self._project = project
        self._slots = slots
        return 1

    def cleanup(self):
        """
        clean up whatever is necessary, e.g. close files.

        this method is called once after all calculations have finished.

        @return None
        """
        pass

    def create_tasks(self, parent_task):
        """
        create the next series of child tasks for the given parent task.

        the dispatcher calls this method when a new series of tasks should be generated.
        an empty list signals that no more tasks are to be calculated;
        processing finishes when all pending and running tasks are complete.

        @param parent_task (CalculationTask) task with initial model parameters.

        @return list of CalculationTask objects holding the parameters for the next calculations.
            the list must be empty if there are no more tasks.
        """
        return []

    def add_result(self, task):
        """
        collect and combine the results of tasks created by the same handler.

        as of this class, the method merely counts invalid results and
        registers the task's data files with the project's file tracker.
        collecting the tasks and combining their data must be implemented in sub-classes.

        @param task: (CalculationTask) calculation task that completed.

        @return parent task (CalculationTask) if the family is complete,
            None if the family is not complete yet.
            as of this class, the method always returns None.
        """
        # count boolean as 0/1: increments only for invalid results
        self._invalid_count += not task.result_valid
        self.track_files(task)
        return None

    def track_files(self, task):
        """
        register all task files with the file tracker of the project.

        @param task: CalculationTask object.
            the id and files attributes are required.

        @return None
        """
        model = task.id.model
        for file_path, category in task.files.items():
            self._project.files.add_file(file_path, model, category=category)

    def cleanup_files(self, keep=0):
        """
        delete uninteresting files.

        @param keep: minimum number of models to keep.
            0 (default): leave the decision to the project.

        @return None
        """
        self._project.cleanup_files(keep=keep)
class ModelHandler(TaskHandler):
    """
    abstract model handler.

    structural optimizers must be derived from this class and implement a loop on the model.
    """

    ## @var datetime_limit (datetime.datetime)
    # date and time when the model handler should finish (regardless of result)
    # because the process may get killed by the scheduler after this time.
    #
    # defaults to 100 days after creation of the handler.

    def __init__(self):
        super(ModelHandler, self).__init__()
        self.datetime_limit = datetime.datetime.now() + datetime.timedelta(days=100)

    def create_tasks(self, parent_task):
        """
        create tasks for the next population of models.

        the dispatcher calls this method repeatedly when the calculation queue runs empty,
        upon which the handler should create the next round of tasks,
        e.g. the next generation of a population.
        as few as one task may be created.
        an empty list signals the end of the optimization;
        processing finishes when all pending and running tasks are complete.

        @note it is not possible to hold back calculations, or to wait for results.
        the handler must either return a task, or signal the end of the optimization process.

        @param parent_task (CalculationTask) task with initial model parameters.

        @return list of CalculationTask objects holding the parameters for the next calculations.
            the list must be empty if there are no more tasks.
        """
        super(ModelHandler, self).create_tasks(parent_task)
        return []

    def add_result(self, task):
        """
        collect and combine results of a scan.

        this method is called by the dispatcher when all results for a scan are available.
        """
        super(ModelHandler, self).add_result(task)
        return None

    def save_report(self, root_task):
        """
        generate a final report of the optimization procedure.

        detailed calculation results are usually saved as soon as they become available.
        sub-classes may override this method to aggregate and summarize the results,
        generate plots, etc.
        in this class, the method does nothing.

        @note: implementations must add the path names of generated files to self._project.files.

        @param root_task: (CalculationTask) task with initial model parameters.

        @return: None
        """
        pass
class SingleModelHandler(ModelHandler):
    """
    single model calculation handler.

    this class runs a single calculation on the start parameters defined in the domain of the project.
    """

    def __init__(self):
        super(SingleModelHandler, self).__init__()
        # (dict) model parameters and r-factor of the calculated model,
        # filled in by add_result().
        self.result = {}

    def create_tasks(self, parent_task):
        """
        start one task with the start parameters.

        subsequent calls will return an empty task list.

        @param parent_task (CalculationTask) task with initial model parameters.

        @return list containing one CalculationTask on the first call, empty afterwards.
        """
        super(SingleModelHandler, self).create_tasks(parent_task)

        out_tasks = []
        # generate the child task only once:
        # after the first call there is either a pending or a complete task.
        if len(self._complete_tasks) + len(self._pending_tasks) == 0:
            parent_id = parent_task.id
            self._parent_tasks[parent_id] = parent_task

            new_task = parent_task.copy()
            new_task.change_id(model=0)
            new_task.parent_id = parent_id

            child_id = new_task.id
            self._pending_tasks[child_id] = new_task
            out_tasks.append(new_task)

        return out_tasks

    def add_result(self, task):
        """
        collect the end result of a single calculation.

        the SingleModelHandler runs calculations for a single model.
        this method assumes that it will be called just once.
        it returns the parent task to signal the end of the calculations.

        the result file is not deleted regardless of the files_to_delete project option.
        the task ID is removed from the file name.

        @param task: (CalculationTask) calculation task that completed.

        @return (CalculationTask) parent task.
        """
        super(SingleModelHandler, self).add_result(task)

        self._complete_tasks[task.id] = task
        del self._pending_tasks[task.id]
        parent_task = self._parent_tasks[task.parent_id]
        del self._parent_tasks[task.parent_id]

        parent_task.result_valid = task.result_valid
        parent_task.file_ext = task.file_ext
        # strip the task ID from the result file names of the parent
        parent_task.result_filename = parent_task.file_root + parent_task.file_ext
        modf_ext = ".modf" + parent_task.file_ext
        parent_task.modf_filename = parent_task.file_root + modf_ext

        self.result = task.model.copy()
        self.result['_rfac'] = task.rfac

        self._project.files.update_model_rfac(task.id.model, task.rfac)
        self._project.files.set_model_complete(task.id.model, True)

        parent_task.time = task.time

        return parent_task

    def save_report(self, root_task):
        """
        save model parameters and r-factor to a file.

        the file name is derived from the project's output_file with '.dat' extension.
        the file has a space-separated column format.
        the first line contains the parameter names.
        this is the same format as used by the swarm and grid handlers.

        @param root_task: (CalculationTask) the id.model attribute is used to register the generated files.

        @return: None
        """
        super(SingleModelHandler, self).save_report(root_task)

        # sort parameters case-insensitively by their full name.
        # bug fix: the previous key function compared only the first character,
        # leaving the order of parameters sharing an initial letter unspecified.
        keys = sorted(self.result, key=str.lower)
        vals = (str(self.result[key]) for key in keys)
        filename = Path(self._project.output_file).with_suffix(".dat")
        with open(filename, "w") as outfile:
            outfile.write("# ")
            outfile.write(" ".join(keys))
            outfile.write("\n")
            outfile.write(" ".join(vals))
            outfile.write("\n")

        self._project.files.add_file(filename, root_task.id.model, "report")
class ScanHandler(TaskHandler):
    """
    split the parameters into one set per scan and gather the results.

    the scan selection takes effect in MscoProcess.calc().
    """

    ## @var _pending_ids_per_parent
    # (dict) sets of pending child task IDs per parent.
    #
    # the dictionary keys are the task identifiers CalculationTask.id of the parent tasks,
    # the values are sets of all child CalculationTask.id belonging to the parent.

    ## @var _complete_ids_per_parent
    # (dict) sets of complete child task IDs per parent.
    #
    # the dictionary keys are the task identifiers CalculationTask.id of the parent tasks,
    # the values are sets of all child CalculationTask.id belonging to the parent.

    def __init__(self):
        super(ScanHandler, self).__init__()
        self._pending_ids_per_parent = {}
        self._complete_ids_per_parent = {}

    def setup(self, project, slots):
        """
        initialize the scan task handler and save processed experimental scans.

        @return (int) number of scans defined in the project.
        """
        super(ScanHandler, self).setup(project, slots)

        # save the modulation function of each scan for reference
        for (i_scan, scan) in enumerate(self._project.scans):
            if scan.modulation is not None:
                __, filename = os.path.split(scan.filename)
                pre, ext = os.path.splitext(filename)
                filename = "{pre}_{scan}.modf{ext}".format(pre=pre, ext=ext, scan=i_scan)
                filepath = os.path.join(self._project.output_dir, filename)
                md.save_data(filepath, scan.modulation)
                mgs.render_scan(filepath, data=scan.modulation)

        # save the combined experimental data and modulation function if available
        if project.combined_scan is not None:
            ext = md.format_extension(project.combined_scan)
            filename = Path(project.output_file).with_suffix(ext)
            md.save_data(filename, project.combined_scan)
        if project.combined_modf is not None:
            ext = md.format_extension(project.combined_modf)
            filename = Path(project.output_file).with_suffix(".modf" + ext)
            md.save_data(filename, project.combined_modf)

        return len(self._project.scans)

    def create_tasks(self, parent_task):
        """
        generate a calculation task for each scan of the given parent task.

        all scans share the model parameters.

        @return list of CalculationTask objects, with one element per scan.
            the scan index varies according to project.scans.
        """
        super(ScanHandler, self).create_tasks(parent_task)

        parent_id = parent_task.id
        self._parent_tasks[parent_id] = parent_task
        assert parent_id not in self._pending_ids_per_parent.keys()
        self._pending_ids_per_parent[parent_id] = set()
        self._complete_ids_per_parent[parent_id] = set()

        out_tasks = []
        for (i_scan, scan) in enumerate(self._project.scans):
            new_task = parent_task.copy()
            new_task.parent_id = parent_id
            new_task.change_id(scan=i_scan)

            child_id = new_task.id
            self._pending_tasks[child_id] = new_task
            self._pending_ids_per_parent[parent_id].add(child_id)

            out_tasks.append(new_task)

        if not out_tasks:
            logger.error("no scan tasks generated. your project must link to at least one scan file.")

        return out_tasks

    def add_result(self, task):
        """
        collect and combine the calculation results versus scan.

        * mark the task as complete
        * store its result for later
        * check whether this was the last pending task of the family (belonging to the same parent).

        the actual merging of data is delegated to the project's combine_scans() method.

        @param task: (CalculationTask) calculation task that completed.

        @return parent task (CalculationTask) if the family is complete. None if the family is not complete yet.
        """
        super(ScanHandler, self).add_result(task)

        self._complete_tasks[task.id] = task
        del self._pending_tasks[task.id]

        family_pending = self._pending_ids_per_parent[task.parent_id]
        family_complete = self._complete_ids_per_parent[task.parent_id]
        family_pending.remove(task.id)
        family_complete.add(task.id)

        # all scans complete?
        if len(family_pending) == 0:
            parent_task = self._parent_tasks[task.parent_id]
            parent_task.file_ext = task.file_ext
            parent_task.result_filename = parent_task.format_filename()
            modf_ext = ".modf" + parent_task.file_ext
            parent_task.modf_filename = parent_task.format_filename(ext=modf_ext)

            child_tasks = [self._complete_tasks[task_id] for task_id in sorted(family_complete)]
            # the parent result is valid only if all child results are valid
            # (idiomatic all() instead of reduce over a lambda)
            parent_task.result_valid = all(t.result_valid for t in child_tasks)
            # accumulate the child calculation times;
            # the family is non-empty here, so the explicit start value is safe
            child_times = [t.time for t in child_tasks]
            parent_task.time = sum(child_times[1:], child_times[0])

            if parent_task.result_valid:
                self._project.combine_scans(parent_task, child_tasks)
                self._project.evaluate_result(parent_task, child_tasks)
                self._project.files.add_file(parent_task.result_filename, parent_task.id.model, 'model')
                self._project.files.add_file(parent_task.modf_filename, parent_task.id.model, 'model')

            del self._pending_ids_per_parent[parent_task.id]
            del self._complete_ids_per_parent[parent_task.id]
            del self._parent_tasks[parent_task.id]

            return parent_task
        else:
            return None
class DomainHandler(TaskHandler):
    """
    split the parameters into one set per domain and gather the results.

    domains are defined by the project.
    the actual merging of the child results is delegated to the project's combine_domains() method.
    """

    ## @var _pending_ids_per_parent
    # (dict) sets of pending child task IDs per parent.
    #
    # the dictionary keys are the task identifiers CalculationTask.id of the parent tasks,
    # the values are sets of all child CalculationTask.id belonging to the parent.

    ## @var _complete_ids_per_parent
    # (dict) sets of complete child task IDs per parent.
    #
    # the dictionary keys are the task identifiers CalculationTask.id of the parent tasks,
    # the values are sets of all child CalculationTask.id belonging to the parent.

    def __init__(self):
        super(DomainHandler, self).__init__()
        self._pending_ids_per_parent = {}
        self._complete_ids_per_parent = {}

    def setup(self, project, slots):
        """
        initialize the domain task handler.

        @return (int) number of domains defined in the project.
        """
        super(DomainHandler, self).setup(project, slots)
        return len(self._project.domains)

    def create_tasks(self, parent_task):
        """
        generate a calculation task for each domain of the given parent task.

        all domains share the same model parameters.

        @return list of CalculationTask objects, with one element per domain.
            the domain index varies according to project.domains.
        """
        super(DomainHandler, self).create_tasks(parent_task)

        parent_id = parent_task.id
        self._parent_tasks[parent_id] = parent_task
        self._pending_ids_per_parent[parent_id] = set()
        self._complete_ids_per_parent[parent_id] = set()

        out_tasks = []
        for (i_dom, domain) in enumerate(self._project.domains):
            new_task = parent_task.copy()
            new_task.parent_id = parent_id
            new_task.change_id(domain=i_dom)

            child_id = new_task.id
            self._pending_tasks[child_id] = new_task
            self._pending_ids_per_parent[parent_id].add(child_id)

            out_tasks.append(new_task)

        if not out_tasks:
            logger.error("no domain tasks generated. your project must declare at least one domain.")

        return out_tasks

    def add_result(self, task):
        """
        collect and combine the calculation results versus domain.

        * mark the task as complete
        * store its result for later
        * check whether this was the last pending task of the family (belonging to the same parent).

        the actual merging of data is delegated to the project's combine_domains() method.

        @param task: (CalculationTask) calculation task that completed.

        @return parent task (CalculationTask) if the family is complete. None if the family is not complete yet.
        """
        super(DomainHandler, self).add_result(task)

        self._complete_tasks[task.id] = task
        del self._pending_tasks[task.id]

        family_pending = self._pending_ids_per_parent[task.parent_id]
        family_complete = self._complete_ids_per_parent[task.parent_id]
        family_pending.remove(task.id)
        family_complete.add(task.id)

        # all domains complete?
        if len(family_pending) == 0:
            parent_task = self._parent_tasks[task.parent_id]
            parent_task.file_ext = task.file_ext
            parent_task.result_filename = parent_task.format_filename()
            modf_ext = ".modf" + parent_task.file_ext
            parent_task.modf_filename = parent_task.format_filename(ext=modf_ext)

            child_tasks = [self._complete_tasks[task_id] for task_id in sorted(family_complete)]
            # the parent result is valid only if all child results are valid
            # (idiomatic all() instead of reduce over a lambda)
            parent_task.result_valid = all(t.result_valid for t in child_tasks)
            # accumulate the child calculation times;
            # the family is non-empty here, so the explicit start value is safe
            child_times = [t.time for t in child_tasks]
            parent_task.time = sum(child_times[1:], child_times[0])

            if parent_task.result_valid:
                self._project.combine_domains(parent_task, child_tasks)
                self._project.evaluate_result(parent_task, child_tasks)
                self._project.files.add_file(parent_task.result_filename, parent_task.id.model, 'scan')
                self._project.files.add_file(parent_task.modf_filename, parent_task.id.model, 'scan')
                # plot the combined modulation function against the experimental one
                graph_file = mgs.render_scan(parent_task.modf_filename,
                                             ref_data=self._project.scans[parent_task.id.scan].modulation)
                self._project.files.add_file(graph_file, parent_task.id.model, 'scan')

            del self._pending_ids_per_parent[parent_task.id]
            del self._complete_ids_per_parent[parent_task.id]
            del self._parent_tasks[parent_task.id]

            return parent_task
        else:
            return None
class EmitterHandler(TaskHandler):
    """
    the emitter handler distributes emitter configurations to calculation tasks and collects their results.
    """

    ## @var _pending_ids_per_parent
    # (dict) sets of pending child task IDs per parent.
    #
    # the dictionary keys are the task identifiers CalculationTask.id of the parent tasks,
    # the values are sets of all child CalculationTask.id belonging to the parent.

    ## @var _complete_ids_per_parent
    # (dict) sets of complete child task IDs per parent.
    #
    # the dictionary keys are the task identifiers CalculationTask.id of the parent tasks,
    # the values are sets of all child CalculationTask.id belonging to the parent.

    def __init__(self):
        super(EmitterHandler, self).__init__()
        self._pending_ids_per_parent = {}
        self._complete_ids_per_parent = {}

    def setup(self, project, slots):
        """
        initialize the emitter task handler.

        @return (int) estimated number of emitter configurations that the cluster generator will generate.
            the estimate is based on the start parameters, scan 0 and domain 0.
        """
        super(EmitterHandler, self).setup(project, slots)
        mock_model = self._project.model_space.start
        mock_index = dispatch.CalcID(-1, 0, 0, -1, -1)
        n_emitters = project.cluster_generator.count_emitters(mock_model, mock_index)
        return n_emitters

    def create_tasks(self, parent_task):
        """
        generate a calculation task for each emitter configuration of the given parent task.

        all emitters share the same model parameters.

        @return list of @ref pmsco.dispatch.CalculationTask objects with one element per emitter configuration
            if parallel processing is enabled.
            otherwise the list contains a single CalculationTask object with emitter index 0.
            the emitter index is used by the project's create_cluster method.
        """
        super(EmitterHandler, self).create_tasks(parent_task)

        parent_id = parent_task.id
        self._parent_tasks[parent_id] = parent_task
        self._pending_ids_per_parent[parent_id] = set()
        self._complete_ids_per_parent[parent_id] = set()

        # the cluster generator decides how many emitter configurations there are
        n_emitters = self._project.cluster_generator.count_emitters(parent_task.model, parent_task.id)

        out_tasks = []
        for em in range(n_emitters):
            new_task = parent_task.copy()
            new_task.parent_id = parent_id
            new_task.change_id(emit=em)

            child_id = new_task.id
            self._pending_tasks[child_id] = new_task
            self._pending_ids_per_parent[parent_id].add(child_id)

            out_tasks.append(new_task)

        if not out_tasks:
            logger.error("no emitter tasks generated. your project must declare at least one emitter configuration.")

        return out_tasks

    def add_result(self, task):
        """
        collect and combine the calculation results of inequivalent emitters.

        * mark the task as complete
        * store its result for later
        * check whether this was the last pending task of the family (belonging to the same parent).

        the actual merging of data is delegated to the project's combine_emitters() method.

        @param task: (CalculationTask) calculation task that completed.

        @return parent task (CalculationTask) if the family is complete. None if the family is not complete yet.
        """
        super(EmitterHandler, self).add_result(task)

        self._complete_tasks[task.id] = task
        del self._pending_tasks[task.id]

        family_pending = self._pending_ids_per_parent[task.parent_id]
        family_complete = self._complete_ids_per_parent[task.parent_id]
        family_pending.remove(task.id)
        family_complete.add(task.id)

        # all emitters complete?
        if len(family_pending) == 0:
            parent_task = self._parent_tasks[task.parent_id]
            parent_task.file_ext = task.file_ext
            parent_task.result_filename = parent_task.format_filename()
            modf_ext = ".modf" + parent_task.file_ext
            parent_task.modf_filename = parent_task.format_filename(ext=modf_ext)

            child_tasks = [self._complete_tasks[task_id] for task_id in sorted(family_complete)]
            # the parent result is valid only if all child results are valid
            # (idiomatic all() instead of reduce over a lambda)
            parent_task.result_valid = all(t.result_valid for t in child_tasks)
            # accumulate the child calculation times;
            # the family is non-empty here, so the explicit start value is safe
            child_times = [t.time for t in child_tasks]
            parent_task.time = sum(child_times[1:], child_times[0])

            if parent_task.result_valid:
                self._project.combine_emitters(parent_task, child_tasks)
                self._project.evaluate_result(parent_task, child_tasks)
                self._project.files.add_file(parent_task.result_filename, parent_task.id.model, 'domain')
                self._project.files.add_file(parent_task.modf_filename, parent_task.id.model, 'domain')
                # plot the combined modulation function against the experimental one
                graph_file = mgs.render_scan(parent_task.modf_filename,
                                             ref_data=self._project.scans[parent_task.id.scan].modulation)
                self._project.files.add_file(graph_file, parent_task.id.model, 'domain')

            del self._pending_ids_per_parent[parent_task.id]
            del self._complete_ids_per_parent[parent_task.id]
            del self._parent_tasks[parent_task.id]

            return parent_task
        else:
            return None
class RegionHandler(TaskHandler):
    """
    region handlers split a scan into a number of regions that can be calculated in parallel.

    this class is an abstract base class.
    it implements only common code to combine different regions into one result.
    """

    ## @var _pending_ids_per_parent
    # (dict) sets of pending child task IDs per parent.
    #
    # the dictionary keys are the task identifiers CalculationTask.id of the parent tasks,
    # the values are sets of all child CalculationTask.id belonging to the parent.

    ## @var _complete_ids_per_parent
    # (dict) sets of complete child task IDs per parent.
    #
    # the dictionary keys are the task identifiers CalculationTask.id of the parent tasks,
    # the values are sets of all child CalculationTask.id belonging to the parent.

    def __init__(self):
        super(RegionHandler, self).__init__()
        self._pending_ids_per_parent = {}
        self._complete_ids_per_parent = {}

    def add_result(self, task):
        """
        gather results of all regions that belong to the same parent.

        the actual merging of data is delegated to the project's combine_regions() method.

        @param task: (CalculationTask) calculation task that completed.

        @return parent task (CalculationTask) if the family is complete. None if the family is not complete yet.
        """
        super(RegionHandler, self).add_result(task)

        self._complete_tasks[task.id] = task
        del self._pending_tasks[task.id]

        family_pending = self._pending_ids_per_parent[task.parent_id]
        family_complete = self._complete_ids_per_parent[task.parent_id]
        family_pending.remove(task.id)
        family_complete.add(task.id)

        # all regions ready?
        if len(family_pending) == 0:
            parent_task = self._parent_tasks[task.parent_id]
            parent_task.file_ext = task.file_ext
            parent_task.result_filename = parent_task.format_filename()
            modf_ext = ".modf" + parent_task.file_ext
            parent_task.modf_filename = parent_task.format_filename(ext=modf_ext)

            child_tasks = [self._complete_tasks[task_id] for task_id in sorted(family_complete)]
            # the parent result is valid only if all child results are valid
            # (idiomatic all() instead of reduce over a lambda)
            parent_task.result_valid = all(t.result_valid for t in child_tasks)
            # accumulate the child calculation times;
            # the family is non-empty here, so the explicit start value is safe
            child_times = [t.time for t in child_tasks]
            parent_task.time = sum(child_times[1:], child_times[0])

            if parent_task.result_valid:
                self._project.combine_regions(parent_task, child_tasks)
                self._project.evaluate_result(parent_task, child_tasks)
                self._project.files.add_file(parent_task.result_filename, parent_task.id.model, "emitter")
                self._project.files.add_file(parent_task.modf_filename, parent_task.id.model, "emitter")

            del self._pending_ids_per_parent[parent_task.id]
            del self._complete_ids_per_parent[parent_task.id]
            del self._parent_tasks[parent_task.id]

            return parent_task
        else:
            return None
class SingleRegionHandler(RegionHandler):
    """
    trivial region handler.

    the whole parent task is identified as one region and calculated at once.
    """

    def create_tasks(self, parent_task):
        """
        generate one calculation task for the parent task.

        @return list of CalculationTask objects, with one element per region.
            the energy index enumerates the regions.
        """
        super(SingleRegionHandler, self).create_tasks(parent_task)

        parent_id = parent_task.id
        self._parent_tasks[parent_id] = parent_task
        self._pending_ids_per_parent[parent_id] = set()
        self._complete_ids_per_parent[parent_id] = set()

        # the single child covers the whole scan as region 0
        child = parent_task.copy()
        child.parent_id = parent_id
        child.change_id(region=0)

        self._pending_tasks[child.id] = child
        self._pending_ids_per_parent[parent_id].add(child.id)

        return [child]
class EnergyRegionHandler(RegionHandler):
    """
    split a scan into a number of energy regions that can be run in parallel.

    the purpose of this task handler is to save wall clock time on a multi-processor machine
    by splitting energy scans into smaller chunks.
    the handler distributes the processing slots to the scans proportional to their scan lengths
    so that all child tasks of the same parent finish approximately in the same time.
    pure angle scans are not split.

    to use this feature, the project assigns this class to its @ref pmsco.project.Project.handler_classes['region'].
    it is safe to use this handler for calculations that do not involve energy scans.

    the handler is best used for single calculations.
    in optimizations that calculate many models there is no advantage in using it
    (on the contrary, the overhead increases the total run time slightly.)
    """

    ## @var _slots_per_scan
    # (list of integers) number of processor slots assigned to each scan,
    # i.e. number of chunks to split a scan region into.
    #
    # the sequence has the same order as self._project.scans.

    def __init__(self):
        super(EnergyRegionHandler, self).__init__()
        self._slots_per_scan = []

    def setup(self, project, slots):
        """
        initialize the handler with project data and the process environment.

        this function distributes the processing slots to the scans.
        the slots are distributed proportional to the scan lengths of the energy scans
        so that all chunks have approximately the same size.
        the number of slots per scan is stored in @ref _slots_per_scan for later use by @ref create_tasks.

        @param project (Project) project instance.

        @param slots (int) number of calculation slots (processes).

        @return (int) average number of child tasks
        """
        super(EnergyRegionHandler, self).setup(project, slots)

        scan_lengths = [scan.energies.shape[0] for scan in self._project.scans]
        total_length = sum(scan_lengths)
        # robustness: avoid a ZeroDivisionError if the project has no scans
        # or all energy axes are empty; every scan then gets a single chunk.
        if total_length > 0:
            scale = min(1.0, float(self._slots) / total_length)
        else:
            scale = 0.0
        self._slots_per_scan = [max(1, int(round(length * scale))) for length in scan_lengths]

        for i, scan in enumerate(self._project.scans):
            logger.debug(BMsg("region handler: split scan {file} into {slots} chunks",
                              file=os.path.basename(scan.filename), slots=self._slots_per_scan[i]))

        if self._slots_per_scan:
            return max(int(sum(self._slots_per_scan) / len(self._slots_per_scan)), 1)
        else:
            return 1

    def create_tasks(self, parent_task):
        """
        generate a calculation task for each energy region of the given parent task.

        all child tasks share the model parameters.

        @return list of CalculationTask objects, with one element per region.
            the energy index enumerates the regions.
        """
        super(EnergyRegionHandler, self).create_tasks(parent_task)

        parent_id = parent_task.id
        self._parent_tasks[parent_id] = parent_task
        self._pending_ids_per_parent[parent_id] = set()
        self._complete_ids_per_parent[parent_id] = set()

        energies = self._project.scans[parent_id.scan].energies
        n_regions = self._slots_per_scan[parent_id.scan]
        regions = np.array_split(energies, n_regions)

        out_tasks = []
        for ireg, reg in enumerate(regions):
            new_task = parent_task.copy()
            new_task.parent_id = parent_id
            new_task.change_id(region=ireg)
            # restrict the energy range only if the scan is actually split
            if n_regions > 1:
                new_task.region['e'] = reg

            child_id = new_task.id
            self._pending_tasks[child_id] = new_task
            self._pending_ids_per_parent[parent_id].add(child_id)

            out_tasks.append(new_task)

        if not out_tasks:
            logger.error("no region tasks generated. this is probably a bug.")

        return out_tasks
def choose_region_handler_class(project):
    """
    choose a suitable region handler for the project.

    the function returns the EnergyRegionHandler class
    if the project includes an energy scan with at least 10 steps,
    otherwise the SingleRegionHandler.
    angle scans do not benefit from region splitting in EDAC.

    @param project: Project instance.

    @return: SingleRegionHandler or EnergyRegionHandler class.
    """
    splittable = any(scan.energies.shape[0] >= 10 for scan in project.scans)
    return EnergyRegionHandler if splittable else SingleRegionHandler