# coding: utf-8
# Copyright (c) Max-Planck-Institut für Eisenforschung GmbH - Computational Materials Design (CM) Department
# Distributed under the terms of "New BSD License", see the LICENSE file.
"""
The JobCore the most fundamental pyiron job class.
"""
import copy
import math
import os
import posixpath
import shutil
import warnings
from typing import Any, Generator, List, Optional, Union
from pyiron_snippets.deprecate import deprecate
from pyiron_base.interfaces.has_groups import HasGroups
from pyiron_base.jobs.job.extension.files import FileBrowser
from pyiron_base.jobs.job.util import (
_copy_database_entry,
_copy_to_delete_existing,
_get_project_for_copy,
_get_safe_job_name,
_job_archive,
_job_compress,
_job_decompress,
_job_delete_files,
_job_delete_hdf,
_job_is_archived,
_job_is_compressed,
_job_list_files,
_job_read_file,
_job_remove_folder,
_job_unarchive,
_rename_job,
)
from pyiron_base.state import state
from pyiron_base.storage.hdfio import ProjectHDFio
__author__ = "Jan Janssen"
__copyright__ = (
"Copyright 2020, Max-Planck-Institut für Eisenforschung GmbH - "
"Computational Materials Design (CM) Department"
)
__version__ = "1.0"
__maintainer__ = "Jan Janssen"
__email__ = "janssen@mpie.de"
__status__ = "production"
__date__ = "Sep 1, 2017"
# Modular Docstrings
_doc_str_job_core_args = """\
Args:
project (ProjectHDFio): ProjectHDFio instance which points to the HDF5 file the job is stored in
job_name (str): name of the job, which has to be unique within the project
"""
_doc_str_job_core_attr = """\
Attributes:
.. attribute:: job_name
name of the job, which has to be unique within the project
.. attribute:: status
execution status of the job, can be one of the following [initialized, appended, created, submitted,
running, aborted, collect, suspended, refresh, busy, finished]
.. attribute:: job_id
unique id to identify the job in the pyiron database
.. attribute:: parent_id
job id of the predecessor job - the job which was executed before the current one in the current job series
.. attribute:: master_id
job id of the master job - a meta job which groups a series of jobs, which are executed either in parallel
or in serial.
.. attribute:: child_ids
list of child job ids - only meta jobs have child jobs - jobs which list the meta job as their master
.. attribute:: project
Project instance the jobs is located in
.. attribute:: project_hdf5
ProjectHDFio instance which points to the HDF5 file the job is stored in
.. attribute:: job_info_str
short string to describe the job by it is job_name and job ID - mainly used for logging
.. attribute:: working_directory
working directory of the job is executed in - outside the HDF5 file
.. attribute:: path
path to the job as a combination of absolute file system path and path within the HDF5 file.
"""
_doc_str_job_core_class = """\
The JobCore the most fundamental pyiron job class. From this class the GenericJob as well as the reduced
JobPath class are derived. While JobPath only provides access to the HDF5 file it is about one order faster.
Implements :class:`.HasGroups`. Groups are HDF groups in the HDF file associated with the job and any
child jobs, nodes are HDF dataset in the HDF file.
"""
[docs]
def recursive_load_from_hdf(project_hdf5: ProjectHDFio, item: str):
"""
Load given item from HDF, but check also for DataContainer along the way.
If `item` exists as is in HDF, return it, otherwise break it up along every slash and try to load a
:class:`~.DataContainer` and then try to index with the remainder of the path, i.e.
>>> recursive_load_from_hdf(hdf, 'my/path/to/value')
is equivalent to one of (in this order)
>>> hdf['my/path/to'].to_object()['value']
>>> hdf['my/path'].to_object()['to/value']
>>> hdf['my'].to_object()['path/to/value']
in case
>>> hdf['/my/path/to/value']
doesn't exist.
Args:
project_hdf5 (ProjectHDFio): HDF file to access
item (str): path to value, may contain `/`
Returns:
object: whatever was found in the HDF file
None: if nothing was found in the HDF file
"""
def successive_path_splits(name_lst: list) -> Generator:
"""
Yield successive split/joins of a path, i.e.
/a/b/c/d
gives
/a/b/c, d
/a/b, c/d
/a, b/c/d
"""
for i in range(1, len(name_lst)):
# where we are looking for the data container
container_path = "/".join(name_lst[:-i])
# where we are looking for data in the container
data_path = "/".join(name_lst[-i:])
yield container_path, data_path
try:
group = project_hdf5[item]
if (
isinstance(group, ProjectHDFio)
and "NAME" in group
and group["NAME"] == "DataContainer"
):
return group.to_object(lazy=True)
else:
return group
except ValueError:
pass
name_lst = item.split("/")
for container_path, data_path in successive_path_splits(name_lst):
try:
group = project_hdf5[container_path]
if (
isinstance(group, ProjectHDFio)
and "NAME" in group
and group["NAME"] == "DataContainer"
):
return group.to_object(lazy=True)[data_path]
except (ValueError, IndexError, KeyError):
# either group does not contain a data container or it is does, but it does not have the path we're
# looking for
pass
[docs]
class DatabaseProperties(object):
"""
Access the database entry of the job
"""
[docs]
def __init__(self, job_dict=None):
self._job_dict = job_dict
def __bool__(self):
return self._job_dict is not None
def __dir__(self):
return list(self._job_dict.keys())
def __getattr__(self, name):
if name in self._job_dict.keys():
return self._job_dict[name]
else:
raise AttributeError(name)
def __repr__(self):
return f"{self.__class__.__name__}({repr(self._job_dict)})"
[docs]
class HDF5Content(object):
"""
Access the HDF5 file of the job
"""
[docs]
def __init__(self, project_hdf5):
self._project_hdf5 = project_hdf5
def __getattr__(self, name):
try:
return self[name]
except KeyError:
raise AttributeError(name) from None
def __getitem__(self, item):
value = recursive_load_from_hdf(self._project_hdf5, item)
if value is not None:
return value
if item in self._project_hdf5.list_groups():
return HDF5Content(self._project_hdf5[item])
else:
raise KeyError(item)
def __dir__(self):
return self._project_hdf5.list_nodes() + self._project_hdf5.list_groups()
def __repr__(self):
return self._project_hdf5.__repr__()
[docs]
class JobCore(HasGroups):
__doc__ = (
_doc_str_job_core_class
+ "\n"
+ _doc_str_job_core_args
+ "\n"
+ _doc_str_job_core_attr
)
[docs]
def __init__(self, project: ProjectHDFio, job_name: str):
job_name = _get_safe_job_name(job_name)
self._name = job_name
self._hdf5 = project.open(self._name)
self._job_id = None
self._parent_id = None
self._master_id = None
self._status = None
self._import_directory = None
self._database_property = DatabaseProperties()
self._hdf5_content = HDF5Content(project_hdf5=self._hdf5)
self._files_to_remove = list()
self._files_to_compress = list()
@property
def content(self) -> HDF5Content:
return self._hdf5_content
@property
def files(self) -> FileBrowser:
return FileBrowser(working_directory=self.working_directory)
files.__doc__ = FileBrowser.__doc__
@property
def job_name(self) -> str:
"""
Get name of the job, which has to be unique within the project
Returns:
str: job name
"""
return self.name
@job_name.setter
def job_name(self, new_job_name: str) -> None:
"""
Set name of the job, which has to be unique within the project. When changing the job_name this also moves the
HDF5 file as the name of the HDF5 file is the job_name plus the extension *.h5
Args:
new_job_name (str): new job name
"""
self.name = new_job_name
@property
def name(self) -> str:
"""
Get name of the job, which has to be unique within the project
Returns:
str: job name
"""
return self._name
@name.setter
def name(self, new_job_name: str) -> None:
"""
Set name of the job, which has to be unique within the project. When changing the job_name this also moves the
HDF5 file as the name of the HDF5 file is the job_name plus the extension *.h5
Args:
new_job_name (str): new job name
"""
_rename_job(job=self, new_job_name=new_job_name)
@property
def status(self) -> str:
"""
Execution status of the job, can be one of the following [initialized, appended, created, submitted, running,
aborted, collect, suspended, refresh, busy, finished]
Returns:
(str/pyiron_base.job.jobstatus.JobStatus): status
"""
return self._status
@property
def job_id(self) -> int:
"""
Unique id to identify the job in the pyiron database
Returns:
int: job id
"""
if self._job_id is None and not state.database.database_is_disabled:
self._job_id = self.get_job_id()
return self._job_id
@property
def id(self) -> int:
"""
Unique id to identify the job in the pyiron database - use self.job_id instead
Returns:
int: job id
"""
return self.job_id
@property
def database_entry(self) -> DatabaseProperties:
if not bool(self._database_property):
self._database_property = DatabaseProperties(
job_dict=self.project.db.get_item_by_id(self.job_id)
)
return self._database_property
@property
def parent_id(self) -> int:
"""
Get job id of the predecessor job - the job which was executed before the current one in the current job series
Returns:
int: parent id
"""
if self._parent_id is None and self.job_id is not None:
return self.project.db.get_item_by_id(self.job_id)["parentid"]
return self._parent_id
@parent_id.setter
def parent_id(self, parent_id: int) -> None:
"""
Set job id of the predecessor job - the job which was executed before the current one in the current job series
Args:
parent_id (int): parent id
"""
if self.job_id is not None:
self.project.db.item_update({"parentid": parent_id}, self.job_id)
self._parent_id = parent_id
@property
def master_id(self) -> int:
"""
Get job id of the master job - a meta job which groups a series of jobs, which are executed either in parallel
or in serial.
Returns:
int: master id
"""
if self._master_id is None and self.job_id is not None:
return self.project.db.get_item_by_id(self.job_id)["masterid"]
return self._master_id
@master_id.setter
def master_id(self, master_id: int) -> None:
"""
Set job id of the master job - a meta job which groups a series of jobs, which are executed either in parallel
or in serial.
Args:
master_id (int): master id
"""
if self.job_id is not None:
self.project.db.item_update({"masterid": master_id}, self.job_id)
self._master_id = master_id
@property
def child_ids(self) -> list:
"""
list of child job ids - only meta jobs have child jobs - jobs which list the meta job as their master
Returns:
list: list of child job ids
"""
return self.project.get_child_ids(
job_specifier=self.job_name, project=self.project.project_path
)
@property
def project_hdf5(self) -> ProjectHDFio:
"""
Get the ProjectHDFio instance which points to the HDF5 file the job is stored in
Returns:
ProjectHDFio: HDF5 project
"""
return self._hdf5
@project_hdf5.setter
def project_hdf5(self, project: ProjectHDFio) -> None:
"""
Set the ProjectHDFio instance which points to the HDF5 file the job is stored in
Args:
project (ProjectHDFio): HDF5 project
"""
self._hdf5 = project.copy()
@property
def files_to_compress(self) -> list:
return self._files_to_compress or self.files.list()
@property
def files_to_remove(self) -> list:
return self._files_to_remove
[docs]
def relocate_hdf5(self, h5_path: Optional[str] = None):
"""
Relocate the hdf file. This function is needed when the child job is
spawned by a parent job (cf. pyiron_base.jobs.master.generic)
"""
if h5_path is None:
h5_path = "/" + self.job_name
self.project_hdf5.remove_group()
self.project_hdf5 = self.project_hdf5.__class__(
self.project, self.job_name, h5_path=h5_path
)
@property
def project(self) -> "pyiron_base.project.generic.Project":
"""
Project instance the jobs is located in
Returns:
Project: project the job is located in
"""
return self._hdf5.project
@property
def job_info_str(self) -> str:
"""
Short string to describe the job by it is job_name and job ID - mainly used for logging
Returns:
str: job info string
"""
return "job: {0} id: {1}".format(self._name, self.job_id)
@property
def working_directory(self) -> str:
"""
working directory of the job is executed in - outside the HDF5 file
Returns:
str: working directory
"""
return self.project_hdf5.working_directory
@property
def path(self) -> str:
"""
Absolute path of the HDF5 group starting from the system root - combination of the absolute system path plus the
absolute path inside the HDF5 file starting from the root group.
Returns:
str: absolute path
"""
return self.project_hdf5.path
[docs]
def check_if_job_exists(
self,
job_name: Optional[str] = None,
project: Optional[
Union[ProjectHDFio, "pyiron_base.project.generic.Project"]
] = None,
):
"""
Check if a job already exists in an specific project.
Args:
job_name (str): Job name (optional)
project (ProjectHDFio, Project): Project path (optional)
Returns:
(bool): True / False
"""
if job_name is None:
job_name = self.job_name
if project is None:
project = self._hdf5
where_dict = {
"job": str(job_name),
"project": str(project.project_path),
"subjob": str(project.h5_path),
}
if not state.database.database_is_disabled and self.project.db.get_items_dict(
where_dict, return_all_columns=False
):
return True
elif state.database.database_is_disabled and os.path.exists(
self.project_hdf5.file_name
):
return True
else:
return False
[docs]
def show_hdf(self) -> None:
"""
Iterating over the HDF5 datastructure and generating a human readable graph.
"""
self.project_hdf5.show_hdf()
[docs]
def get_from_table(self, path: str, name: str) -> Union[dict, list, float, int]:
"""
Get a specific value from a pandas.Dataframe
Args:
path (str): relative path to the data object
name (str): parameter key
Returns:
dict, list, float, int: the value associated to the specific parameter key
"""
return self.project_hdf5.get_from_table(path, name)
[docs]
def remove(self, _protect_childs: bool = True) -> None:
"""
Remove the job - this removes the HDF5 file, all data stored in the HDF5 file an the corresponding database entry.
Args:
_protect_childs (bool): [True/False] by default child jobs can not be deleted, to maintain the consistency
- default=True
"""
# When the Job is a GenericMaster, try to delete its children first.
if len(self.child_ids) > 0:
if _protect_childs:
if self._master_id is not None and not math.isnan(self._master_id):
state.logger.error(
"Job {0} is a child of a master job and cannot be deleted!".format(
str(self.job_id)
)
)
raise ValueError("Child jobs are protected and cannot be deleted!")
for job_id in self.child_ids:
job = self.project.inspect(job_id)
if len(job.child_ids) > 0:
job.remove(_protect_childs=False)
else:
self.project_hdf5.remove_job(job_id, _unprotect=True)
# After all children are deleted, remove the job itself.
self.remove_child()
[docs]
def remove_child(self) -> None:
"""
internal function to remove command that removes also child jobs.
Do never use this command, since it will destroy the integrity of your project.
"""
# Check if the job requires to be removed from the full object (This is the case for external Storage)
# TODO: remove this workaround once the database lookup is aware of external storage types.
requires_full_object = self._hdf5.get("REQUIRE_FULL_OBJ_FOR_RM", default=False)
if requires_full_object:
job = self.to_object()
job._before_generic_remove_child()
# Delete job from HPC-computing-queue if it is still running.
job_status = str(self.status)
if (
job_status in ["submitted", "running", "collect"]
and "server" in self.project_hdf5.list_nodes()
):
server_hdf_dict = self.project_hdf5["server"]
if "qid" in server_hdf_dict.keys() and server_hdf_dict["qid"] is not None:
self.project.queue_delete_job(server_hdf_dict["qid"])
# Delete working directory:
_job_delete_files(job=self)
# Delete HDF5 file
with self.project_hdf5.open("..") as hdf_parent:
hdf_groups = hdf_parent.list_groups()
if self.job_name in hdf_groups and len(hdf_groups) < 2:
_job_delete_hdf(job=self)
else:
with self.project_hdf5.open("..") as hdf_parent:
try:
del hdf_parent[self.job_name]
except (AttributeError, LookupError, KeyError, OSError):
print(
"This group does not exist in the HDF5 file {}".format(
self.job_name
)
)
_job_remove_folder(job=self)
# Delete database entry
if self.job_id is not None:
self.project.db.delete_item(self.job_id)
[docs]
def to_object(
self, object_type: Optional[str] = None, **qwargs
) -> "pyiron_base.job.generic.GenericJob":
"""
Load the full pyiron object from an HDF5 file
Args:
object_type: if the 'TYPE' node is not available in the HDF5 file a manual object type can be set - optional
**qwargs: optional parameters ['job_name', 'project'] - to specify the location of the HDF5 path
Returns:
GenericJob: pyiron object
"""
if self.project_hdf5.is_empty:
raise ValueError(
'The HDF5 file of this job with the job_name: "'
+ self.job_name
+ '" is empty, so it can not be loaded.'
)
return self.project_hdf5.to_object(object_type, **qwargs)
[docs]
def get(self, name: str, default: Optional[Any] = None) -> Any:
"""
Internal wrapper function for __getitem__() - self[name]
Args:
key (str, slice): path to the data or key of the data object
default (any, optional): return this if key cannot be found
Returns:
dict, list, float, int: data or data object
Raises:
ValueError: key cannot be found and default is not given
"""
try:
return self.__getitem__(name)
except ValueError:
if default is not None:
return default
raise
[docs]
def load(
self, job_specifier: Union[str, int], convert_to_object: bool = True
) -> Union["pyiron_base.job.generic.GenericJob", "JobCore"]:
"""
Load an existing pyiron object - most commonly a job - from the database
Args:
job_specifier (str, int): name of the job or job ID
convert_to_object (bool): convert the object to an pyiron object or only access the HDF5 file - default=True
accessing only the HDF5 file is about an order of magnitude faster, but only
provides limited functionality. Compare the GenericJob object to JobCore object.
Returns:
GenericJob, JobCore: Either the full GenericJob object or just a reduced JobCore object
"""
return self.project.load(
job_specifier=job_specifier, convert_to_object=convert_to_object
)
[docs]
def inspect(self, job_specifier: Union[str, int]) -> "JobCore":
"""
Inspect an existing pyiron object - most commonly a job - from the database
Args:
job_specifier (str, int): name of the job or job ID
Returns:
JobCore: Access to the HDF5 object - not a GenericJob object - use load() instead.
"""
return self.project.inspect(job_specifier=job_specifier)
[docs]
def is_master_id(self, job_id: int) -> bool:
"""
Check if the job ID job_id is the master ID for any child job
Args:
job_id (int): job ID of the master job
Returns:
bool: [True/False]
"""
return (
len(
[
job["id"]
for job in self.project.db.get_items_dict(
{"masterid": str(job_id)}, return_all_columns=False
)
]
)
> 0
)
[docs]
def get_job_id(
self, job_specifier: Optional[Union[str, int]] = None
) -> Union[int, None]:
"""
get the job_id for job named job_name in the local project path from database
Args:
job_specifier (str, int): name of the job or job ID
Returns:
int: job ID of the job
"""
if job_specifier is not None:
return self.project.get_job_id(job_specifier)
where_dict = {
"job": str(self._name),
"project": str(self.project_hdf5.project_path),
"subjob": str(self.project_hdf5.h5_path),
}
response = self.project.db.get_items_dict(where_dict, return_all_columns=False)
if len(response) > 0:
return response[-1]["id"]
return None
[docs]
@deprecate("use job.files.list()")
def list_files(self) -> list:
"""
List files inside the working directory
Args:
extension (str): filter by a specific extension
Returns:
list: list of file names
"""
return _job_list_files(self)
[docs]
def list_childs(self) -> list:
"""
List child jobs as JobPath objects - not loading the full GenericJob objects for each child
Returns:
list: list of child jobs
"""
return [self.project.inspect(child_id).job_name for child_id in self.child_ids]
def _list_groups(self) -> list:
return self.project_hdf5.list_groups() + self._list_ext_childs()
def _list_nodes(self) -> list:
return self.project_hdf5.list_nodes()
def _list_all(self) -> dict:
"""
List all groups and nodes of the HDF5 file - where groups are equivalent to directories and nodes to files.
Returns:
dict: {'groups': [list of groups], 'nodes': [list of nodes]}
"""
h5_dict = self.project_hdf5.list_all()
h5_dict["groups"] += self._list_ext_childs()
return h5_dict
[docs]
def copy(self) -> "JobCore":
"""
Copy the JobCore object which links to the HDF5 file
Returns:
JobCore: New FileHDFio object pointing to the same HDF5 file
"""
copied_self = copy.copy(self)
copied_self.reset_job_id()
return copied_self
def _internal_copy_to(
self,
project: Optional[
Union["JobCore", ProjectHDFio, "pyiron_base.project.generic.Project"]
] = None,
new_job_name: Optional[str] = None,
new_database_entry: bool = True,
copy_files: bool = True,
delete_existing_job: bool = False,
) -> "JobCore":
"""
Internal helper function for copy_to() which returns more
Args:
project (JobCore/ProjectHDFio/Project/None): The project to copy the job to.
(Default is None, use the same project.)
new_job_name (str): The new name to assign the duplicate job. Required if the project is `None` or the same
project as the copied job. (Default is None, try to keep the same name.)
new_database_entry (bool): [True/False] to create a new database entry - default True
copy_files (bool): [True/False] copy the files inside the working directory - default True
delete_existing_job (bool): [True/False] Delete existing job in case it exists already (Default is False.)
"""
# Check either a new project, a new job_name or both were specified.
if project is None and new_job_name is None:
raise ValueError("copy_to requires either a new project or a new_job_name.")
# Set the new job name
new_job_name = new_job_name or self.job_name
# The project variable can be JobCore/ProjectHDFio/Project,
# get a Project and a ProjectHDFio object.
file_project, hdf5_project = _get_project_for_copy(
job=self, project=project, new_job_name=new_job_name
)
# Check if the job exists already and either delete it or return it
job_return = _copy_to_delete_existing(
project_class=file_project,
job_name=new_job_name,
delete_job=delete_existing_job,
)
if job_return is not None:
return job_return, file_project, hdf5_project, True
# Create a new job by copying the current python object, move the content
# of the HDF5 file and then attach the new HDF5 link to the new python object.
new_job_core = self.copy()
new_job_core._name = new_job_name
new_job_core._hdf5 = hdf5_project
new_job_core._hdf5_content = HDF5Content(project_hdf5=hdf5_project)
new_job_core._master_id = self._master_id
new_job_core._parent_id = self._parent_id
new_job_core._master_id = self._master_id
new_job_core._status = self._status
new_job_core._create_working_directory()
if new_job_name == self.job_name:
self.project_hdf5.copy_to(destination=hdf5_project.open(".."))
else:
self.project_hdf5.copy_to(destination=hdf5_project, maintain_name=False)
# Update the database entry
if self.job_id is not None:
if new_database_entry:
_copy_database_entry(
new_job_core=new_job_core,
job_copied_id=self.job_id,
username=state.settings.login_user,
)
else:
new_job_core.reset_job_id(job_id=None)
# Copy files outside the HDF5 file
if copy_files and os.path.exists(self.working_directory):
wd_content = os.listdir(new_job_core.working_directory)
if len(wd_content) == 0:
os.rmdir(new_job_core.working_directory)
else:
raise RuntimeError(
f"Target directory for copy not empty! Content = {wd_content}."
)
shutil.copytree(self.working_directory, new_job_core.working_directory)
return new_job_core, file_project, hdf5_project, False
[docs]
def copy_to(
self,
project: Union["JobCore", ProjectHDFio, "pyiron_base.project.generic.Project"],
new_job_name: Optional[str] = None,
input_only: bool = False,
new_database_entry: bool = True,
copy_files: bool = True,
) -> "JobCore":
"""
Copy the content of the job including the HDF5 file to a new location
Args:
project (JobCore/ProjectHDFio/Project): project to copy the job to
new_job_name (str): The new name to assign the duplicate job. Required if the project is `None` or the same
project as the copied job. (Default is None, try to keep the same name.)
input_only (bool): [True/False] Whether to copy only the input. (Default is False.)
new_database_entry (bool): [True/False] Whether to create a new database entry. If input_only is True then
new_database_entry is False. (Default is True.)
copy_files (bool): [True/False] copy the files inside the working directory - default True
Returns:
JobCore: JobCore object pointing to the new location.
"""
# Update flags
if input_only and new_database_entry:
warnings.warn(
"input_only conflicts new_database_entry; setting new_database_entry=False"
)
new_database_entry = False
new_job_core, _, _, reloaded = self._internal_copy_to(
project=project,
new_job_name=new_job_name,
new_database_entry=new_database_entry,
copy_files=copy_files,
)
if reloaded:
return new_job_core
# Remove output if it should not be copied
if input_only:
for group in new_job_core.project_hdf5.list_groups():
if "output" in group:
del new_job_core.project_hdf5[
posixpath.join(new_job_core.project_hdf5.h5_path, group)
]
new_job_core._status = "initialized"
return new_job_core
[docs]
def move_to(self, project: ProjectHDFio) -> None:
"""
Move the content of the job including the HDF5 file to a new location
Args:
project (ProjectHDFio): project to move the job to
"""
delete_hdf5_after_copy = False
old_working_directory = self.working_directory
if not self.project_hdf5.file_exists:
delete_hdf5_after_copy = True
new_job = self.copy_to(project=project, new_database_entry=False)
if self.project_hdf5.file_exists:
if len(self.project_hdf5.h5_path.split("/")) == 2:
self.project_hdf5.remove_file()
else:
self.project_hdf5.remove_group()
self.project_hdf5 = new_job.project_hdf5.copy()
if self._job_id is not None:
self.project.db.item_update(
{
"subjob": self.project_hdf5.h5_path,
"projectpath": self.project_hdf5.root_path,
"project": self.project_hdf5.project_path,
},
self._job_id,
)
if delete_hdf5_after_copy:
if len(self.project_hdf5.h5_path.split("/")) == 2:
self.project_hdf5.remove_file()
else:
self.project_hdf5.remove_group()
if os.path.exists(old_working_directory):
shutil.rmtree(old_working_directory)
os.rmdir("/".join(old_working_directory.split("/")[:-1]))
[docs]
def rename(self, new_job_name: str) -> None:
"""
Rename the job - by changing the job name
Args:
new_job_name (str): new job name
"""
self.job_name = new_job_name
[docs]
def reset_job_id(self, job_id: Optional[int] = None) -> None:
"""
The reset_job_id function has to be implemented by the derived classes - usually the GenericJob class
Args:
job_id (int/ None):
"""
if job_id is not None:
job_id = int(job_id)
self._job_id = job_id
[docs]
def save(self) -> None:
"""
The save function has to be implemented by the derived classes - usually the GenericJob class
"""
raise NotImplementedError("save() should be implemented in the derived class")
[docs]
def to_hdf(
self, hdf: Optional[ProjectHDFio] = None, group_name: str = "group"
) -> None:
"""
Store object in hdf5 format - The function has to be implemented by the derived classes
- usually the GenericJob class
Args:
hdf (ProjectHDFio): Optional hdf5 file, otherwise self is used.
group_name (str): Optional hdf5 group in the hdf5 file.
"""
raise NotImplementedError("to_hdf() should be implemented in the derived class")
[docs]
def from_hdf(
self, hdf: Optional[ProjectHDFio] = None, group_name: str = "group"
) -> None:
"""
Restore object from hdf5 format - The function has to be implemented by the derived classes
- usually the GenericJob class
Args:
hdf (ProjectHDFio): Optional hdf5 file, otherwise self is used.
group_name (str): Optional hdf5 group in the hdf5 file.
"""
raise NotImplementedError(
"from_hdf() should be implemented in the derived class"
)
def __del__(self) -> None:
"""
The delete function is just implemented for compatibilty
"""
del self._name
del self._hdf5
del self._job_id
del self._parent_id
del self._master_id
del self._status
@deprecate(
"Use job.output for results, job.files to access files; job.content to access HDF storage and "
"job.child_project to access children of master jobs."
)
def __getitem__(self, item: str) -> Any:
"""
Get/read data from the HDF5 file, child jobs or access log files.
If the job is :method:`~.decompress`ed, item can also be a file name to
access the raw output file of that name of the job. See available file
with :method:`~.list_files()`.
`item` is first looked up in this jobs HDF5 file, then in the HDF5 files of any child jobs and finally it is
matched against any files in the job directory as described above.
If `item` doesn't match any value (i.e. `None` would be returned), but along its path a `DataContainer` is
located, it will be lazily loaded from HDF and then indexed with the remaineder of the path.
Args:
item (str, slice): path to the data or key of the data object
Returns:
dict, list, float, int, :class:`.DataContainer`, None: data or data object; if nothing is found None is returned
"""
# first try to access HDF5 directly to make the common case fast
value = recursive_load_from_hdf(self._hdf5, item)
if value is not None:
return value
# only try to read files when no slashes are present:
# downstream code will often do something like job['path/to/output'] to check if certain values exist and branch
# on that. In cases where they don't exists this would then trigger us to decompress the job files in memory on
# every check which slows down things a lot. Generally these value checks will be of the form output/.../...
# i.e. contain slashes and file access tend to be just the file name without slashes, so I separate those cases
# here like this. In those cases where we actually have sub directories in the job folders we can beef up the
# file browser.
if "/" not in item and item in self.files.list():
warnings.warn(
"Using __getitem__ on a job to access files in deprecated: use job.files instead!",
category=DeprecationWarning,
)
return _job_read_file(self, item)
name_lst = item.split("/")
item_obj = name_lst[0]
if item_obj in self._list_ext_childs():
# ToDo: Murn['strain_0.9'] - sucht im HDF5 file, dort gibt es aber die entsprechenden Gruppen noch nicht.
child = self._hdf5[self._name + "_hdf5/" + item_obj]
print("job get: ", self._name + "_jobs")
if len(name_lst) == 1:
return child
else:
return child["/".join(name_lst[1:])]
return None
def __setitem__(self, key: str, value: Any) -> None:
"""
Stores data
Args:
key (str): key to store in hdf (full path)
value (anything): value to store
"""
if not key.startswith("user/"):
raise ValueError(
"user defined paths+values must begin with user/, e.g. job['user/key'] = value"
)
self._hdf5[key] = value
def __delitem__(self, key: str) -> None:
"""
Delete item from the HDF5 file
Args:
key (str): key of the item to delete
"""
del self.project_hdf5[posixpath.join(self.project_hdf5.h5_path, key)]
def __repr__(self) -> str:
"""
Human readable string representation
Returns:
str: list all nodes and groups as string
"""
return str(self.list_all())
def _create_working_directory(self) -> None:
"""
internal function to create the working directory on the file system if it does not exist already.
"""
self.project_hdf5.create_working_directory()
def _list_ext_childs(self) -> list:
"""
internal function to list nodes excluding childs
Returns:
list: list of nodes without childs
"""
nodes = self.list_nodes()
childs = self.list_childs()
return list(set(childs) - set(nodes))
[docs]
def compress(
self,
files_to_compress: Optional[List[str]] = None,
files_to_remove: Optional[List[str]] = None,
) -> None:
"""
Compress the output files of a job object.
Args:
files_to_compress (list):
"""
if files_to_compress is None:
files_to_compress = self.files_to_compress
if files_to_remove is None:
files_to_remove = self.files_to_remove
else:
files_to_remove = []
_job_compress(
job=self,
files_to_compress=files_to_compress,
files_to_remove=files_to_remove,
)
[docs]
def decompress(self) -> None:
"""
Decompress the output files of a compressed job object.
"""
_job_decompress(job=self)
[docs]
def is_compressed(self) -> bool:
"""
Check if the job is already compressed or not.
Returns:
bool: [True/False]
"""
return _job_is_compressed(job=self)
[docs]
def self_archive(self) -> None:
"""
Compress HDF5 file of the job object to tar-archive
"""
_job_archive(job=self)
[docs]
def self_unarchive(self) -> None:
"""
Decompress HDF5 file of the job object from tar-archive
"""
_job_unarchive(job=self)
[docs]
def is_self_archived(self) -> bool:
"""
Check if the HDF5 file of the Job is compressed as tar-archive
Returns:
bool: [True/False]
"""
return _job_is_archived(job=self)