# coding: utf-8
# Copyright (c) Max-Planck-Institut für Eisenforschung GmbH - Computational Materials Design (CM) Department
# Distributed under the terms of "New BSD License", see the LICENSE file.
"""
The GenericMaster is the template class for all meta jobs
"""
import inspect
import textwrap
from functools import wraps
from typing import Union
from pyiron_snippets.deprecate import deprecate
from pyiron_base.interfaces.object import HasStorage
from pyiron_base.jobs.job.base import _doc_str_job_core_args
from pyiron_base.jobs.job.extension.jobstatus import job_status_finished_lst
from pyiron_base.jobs.job.generic import GenericJob, _doc_str_generic_job_attr
from pyiron_base.storage.datacontainer import DataContainer
from pyiron_base.storage.parameters import GenericParameters
__author__ = "Jan Janssen"
__copyright__ = (
"Copyright 2020, Max-Planck-Institut für Eisenforschung GmbH - "
"Computational Materials Design (CM) Department"
)
__version__ = "1.0"
__maintainer__ = "Jan Janssen"
__email__ = "janssen@mpie.de"
__status__ = "production"
__date__ = "Sep 1, 2017"
# Modular Docstrings
_doc_str_generic_master_attr_extra = """\
.. attribute:: child_names
Dictionary matching the child ID to the child job name.
"""
_doc_str_generic_master_class = """\
The GenericMaster is the template class for all meta jobs - meaning all jobs which contain multiple other jobs. It
defines the shared functionality of the different kind of job series.
"""
_doc_str_generic_master_attr = (
_doc_str_generic_job_attr + "\n" + _doc_str_generic_master_attr_extra
)
[docs]
class GenericMaster(GenericJob):
__doc__ = (
_doc_str_generic_master_class
+ "\n"
+ _doc_str_job_core_args
+ "\n"
+ _doc_str_generic_master_attr
)
[docs]
def __init__(self, project, job_name):
super(GenericMaster, self).__init__(project, job_name=job_name)
self._input = GenericParameters("parameters")
self._job_name_lst = []
self._job_object_dict = {}
self._child_id_func = None
self._child_id_func_str = None
self._job_with_calculate_function = True
@property
def child_names(self):
"""
Dictionary matching the child ID to the child job name
Returns:
dict: {child_id: child job name }
"""
return {
child_id: self.project.db.get_item_by_id(child_id)["job"]
for child_id in self.child_ids
}
@property
def child_ids(self):
"""
list of child job ids - only meta jobs have child jobs - jobs which list the meta job as their master
Returns:
list: list of child job ids
"""
if self._child_id_func is not None:
return self._child_id_func(self)
else:
return super(GenericMaster, self).child_ids
@property
def child_project(self):
"""
:class:`.Project`: project which holds the created child jobs
"""
if not self.server.new_hdf:
return self.project
else:
return self.project.open(self.job_name + "_hdf5")
@property
def input(self):
return self._input
@input.setter
def input(self, new_input: Union[DataContainer, GenericParameters, HasStorage]):
if isinstance(new_input, (DataContainer, GenericParameters, HasStorage)):
self._input = new_input
else:
raise TypeError(
f"Expected a DataContainer, GenericParameters or HasStorage object but got {new_input.__class__}"
)
[docs]
def child_hdf(self, job_name):
"""
Find correct HDF for new children. Depending on `self.server.new_hdf` this creates a new hdf file or creates
the group in the file of this job.
Args:
job_name (str): name of the new job
Returns:
:class:`.ProjectHDFio`: HDF file for new child job, can be assigned to its
:attr:`~.Generic.project_hdf5`
"""
if self.server.new_hdf:
return self.project_hdf5.create_hdf(
path=self.child_project.path, job_name=job_name
)
else:
return self.project_hdf5.open(job_name)
@property
def job_object_dict(self):
"""
internal cache of currently loaded jobs
Returns:
dict: Dictionary of currently loaded jobs
"""
return self._job_object_dict
@wraps(GenericJob.set_input_to_read_only)
def set_input_to_read_only(self):
super().set_input_to_read_only()
self._input.read_only = True
[docs]
def first_child_name(self):
"""
Get the name of the first child job
Returns:
str: name of the first child job
"""
return self.project.db.get_item_by_id(self.child_ids[0])["job"]
[docs]
def append(self, job):
"""
Append a job to the GenericMaster - just like you would append an element to a list.
Args:
job (GenericJob): job to append
"""
if self.status.initialized and not job.status.initialized:
raise ValueError(
"GenericMaster requires reference jobs to have status initialized, rather than ",
job.status.string,
)
if job.server.cores >= self.server.cores:
self.server.cores = job.server.cores
if job.job_name not in self._job_name_lst:
self._job_name_lst.append(job.job_name)
self._child_job_update_hdf(parent_job=self, child_job=job)
[docs]
def pop(self, i=-1):
"""
Pop a job from the GenericMaster - just like you would pop an element from a list
Args:
i (int): position of the job. (Default is last element, -1.)
Returns:
GenericJob: job
"""
job_name_to_return = self._job_name_lst[i]
job_to_return = self._load_all_child_jobs(
self._load_job_from_cache(job_name_to_return)
)
del self._job_name_lst[i]
with self.project_hdf5.open("input") as hdf5_input:
hdf5_input["job_list"] = self._job_name_lst
job_to_return.relocate_hdf5()
if isinstance(job_to_return, GenericMaster):
for sub_job in job_to_return._job_object_dict.values():
self._child_job_update_hdf(parent_job=job_to_return, child_job=sub_job)
job_to_return.status.initialized = True
return job_to_return
[docs]
def move_to(self, project):
"""
Move the content of the job including the HDF5 file to a new location
Args:
project (ProjectHDFio): project to move the job to
Returns:
JobCore: JobCore object pointing to the new location.
"""
if self._job_id is not None:
for child_id in self.child_ids:
child = self.project.load(child_id)
child.move_to(project.open(self.job_name + "_hdf5"))
super(GenericMaster, self).move_to(project)
def _after_generic_copy_to(self, original, new_database_entry, reloaded):
if reloaded:
return
if (
self.job_id is not None
and new_database_entry
and original._job_id is not None
):
for child_id in original.child_ids:
child = original.project.load(child_id)
new_child = child.copy_to(
project=self.project.open(self.job_name + "_hdf5"),
new_database_entry=new_database_entry,
)
if new_database_entry and child.parent_id:
new_child.parent_id = self.job_id
if new_database_entry and child.master_id:
new_child.master_id = self.job_id
[docs]
def update_master(self, force_update=True):
super().update_master(force_update=force_update)
update_master.__doc__ = GenericJob.update_master.__doc__
[docs]
def to_hdf(self, hdf=None, group_name=None):
"""
Store the GenericMaster in an HDF5 file
Args:
hdf (ProjectHDFio): HDF5 group object - optional
group_name (str): HDF5 subgroup name - optional
"""
super(GenericMaster, self).to_hdf(hdf=hdf, group_name=group_name)
with self.project_hdf5.open("input") as hdf5_input:
self.input.to_hdf(hdf5_input)
hdf5_input["job_list"] = self._job_name_lst
self._to_hdf_child_function(hdf=hdf5_input)
for job in self._job_object_dict.values():
job.to_hdf()
[docs]
def from_hdf(self, hdf=None, group_name=None):
"""
Restore the GenericMaster from an HDF5 file
Args:
hdf (ProjectHDFio): HDF5 group object - optional
group_name (str): HDF5 subgroup name - optional
"""
super(GenericMaster, self).from_hdf(hdf=hdf, group_name=group_name)
with self.project_hdf5.open("input") as hdf5_input:
self.input.from_hdf(hdf5_input)
job_list_tmp = hdf5_input["job_list"]
self._from_hdf_child_function(hdf=hdf5_input)
self._job_name_lst = job_list_tmp
self._job_object_dict = {
job_name: self._load_job_from_cache(job_name=job_name)
for job_name in job_list_tmp
}
[docs]
def set_child_id_func(self, child_id_func):
"""
Add an external function to derive a list of child IDs - experimental feature
Args:
child_id_func (Function): Python function which returns the list of child IDs
"""
self._child_id_func = child_id_func
self.save()
self.status.finished = True
[docs]
def get_child_cores(self):
"""
Calculate the currently active number of cores, by summarizing all childs which are neither finished nor
aborted.
Returns:
(int): number of cores used
"""
return sum(
[
int(db_entry["computer"].split("#")[1])
for db_entry in self.project.db.get_items_dict(
{"masterid": self.job_id}
)
if db_entry["status"] not in job_status_finished_lst
]
)
def __len__(self):
"""
Length of the GenericMaster equal the number of childs appended.
Returns:
int: length of the GenericMaster
"""
return len(self._job_name_lst)
@deprecate(
"Use job.output for results, job.files to access files; job.content to access HDF storage and "
"job.child_project to access children of master jobs."
)
def __getitem__(self, item):
"""
Get/ read data from the GenericMaster
Args:
item (str, slice): path to the data or key of the data object
Returns:
dict, list, float, int: data or data object
"""
child_id_lst = self.child_ids
child_name_lst = [
self.project.db.get_item_by_id(child_id)["job"]
for child_id in self.child_ids
]
if isinstance(item, int):
total_lst = self._job_name_lst + child_name_lst
item = total_lst[item]
return self._get_item_when_str(
item=item, child_id_lst=child_id_lst, child_name_lst=child_name_lst
)
def __getattr__(self, item):
"""
CHeck if a job with the specific name exists
Args:
item (str): name of the job
Returns:
"""
item_from_get_item = self.__getitem__(item=item)
if item_from_get_item is not None:
return item_from_get_item
else:
raise AttributeError(
"{} tried to find child job {}, but getattr failed to find the item.".format(
self.job_name, item
)
)
[docs]
def interactive_close(self):
"""Not implemented for MetaJobs."""
pass
[docs]
def interactive_fetch(self):
"""Not implemented for MetaJobs."""
pass
[docs]
def interactive_flush(self, path="generic", include_last_step=True):
"""Not implemented for MetaJobs."""
pass
[docs]
def run_if_interactive_non_modal(self):
"""Not implemented for MetaJobs."""
pass
def _run_if_busy(self):
"""Not implemented for MetaJobs."""
pass
def _load_all_child_jobs(self, job_to_load):
"""
Helper function to load all child jobs to memory - like it was done in the previous implementation
Args:
job_to_load (GenericJob): job to be reloaded
Returns:
GenericJob: job to be reloaded - including all the child jobs and their child jobs
"""
if isinstance(job_to_load, GenericMaster):
for sub_job_name in job_to_load._job_name_lst:
job_to_load._job_object_dict[sub_job_name] = self._load_all_child_jobs(
job_to_load._load_job_from_cache(sub_job_name)
)
return job_to_load
def _load_job_from_cache(self, job_name):
"""
Helper funcction to load a job either from the _job_object_dict or from the HDF5 file
Args:
job_name (str): name of the job
Returns:
GenericJob: the reloaded job
"""
if job_name in self._job_object_dict.keys():
return self._job_object_dict[job_name]
else:
ham_obj = self.project_hdf5[job_name].to_object(
project=self.project_hdf5,
job_name=job_name,
)
return ham_obj
def _to_hdf_child_function(self, hdf):
"""
Helper function to store the child function in HDF5
Args:
hdf: HDF5 file object
"""
hdf["job_list"] = self._job_name_lst
if self._child_id_func is not None:
try:
hdf["child_id_func"] = inspect.getsource(self._child_id_func)
except IOError:
hdf["child_id_func"] = self._child_id_func_str
else:
hdf["child_id_func"] = "None"
def _from_hdf_child_function(self, hdf):
"""
Helper function to load the child function from HDF5
Args:
hdf: HDF5 file object
"""
try:
child_id_func_str = hdf["child_id_func"]
except ValueError:
child_id_func_str = "None"
if child_id_func_str == "None":
self._child_id_func = None
else:
self._child_id_func_str = child_id_func_str
self._child_id_func = get_function_from_string(child_id_func_str)
def _get_item_when_str(self, item, child_id_lst, child_name_lst):
"""
Helper function for __get_item__ when item is type string
Args:
item (str):
child_id_lst (list): a list containing all child job ids
child_name_lst (list): a list containing the names of all child jobs
Returns:
anything
"""
name_lst = item.split("/")
item_obj = name_lst[0]
if item_obj in child_name_lst:
child_id = child_id_lst[child_name_lst.index(item_obj)]
if len(name_lst) > 1:
return self.project.inspect(child_id)["/".join(name_lst[1:])]
else:
return self.project.load(child_id)
elif item_obj in self._job_name_lst:
child = self._load_job_from_cache(job_name=item_obj)
if len(name_lst) == 1:
return child
else:
return child["/".join(name_lst[1:])]
else:
return super(GenericMaster, self).__getitem__(item)
def _child_job_update_hdf(self, parent_job, child_job):
"""
Args:
parent_job:
child_job:
"""
child_job.project_hdf5.file_name = parent_job.project_hdf5.file_name
child_job.project_hdf5.h5_path = (
parent_job.project_hdf5.h5_path + "/" + child_job.job_name
)
if isinstance(child_job, GenericMaster):
for sub_job_name in child_job._job_name_lst:
self._child_job_update_hdf(
parent_job=child_job,
child_job=child_job._load_job_from_cache(sub_job_name),
)
parent_job.job_object_dict[child_job.job_name] = child_job
def _executable_activate_mpi(self):
"""
Internal helper function to switch the executable to MPI mode
"""
pass
def _init_child_job(self, parent):
"""
Update our reference job.
Args:
parent (:class:`.GenericJob`): job instance that this job was created from
"""
self.ref_job = parent
[docs]
def get_function_from_string(function_str):
"""
Convert a string of source code to a function
Args:
function_str: function source code
Returns:
function:
"""
function_dedent_str = textwrap.dedent(function_str)
exec(function_dedent_str)
return eval(function_dedent_str.split("(")[0][4:])