# coding: utf-8
# Copyright (c) Max-Planck-Institut für Eisenforschung GmbH - Computational Materials Design (CM) Department
# Distributed under the terms of "New BSD License", see the LICENSE file.
"""
Classes to map Python objects to HDF5 data structures
"""
import importlib
import numbers
import os
import posixpath
import sys
from typing import Any, Dict, List, Optional, Set, Tuple, Union
import h5py
import numpy as np
import pandas
from h5io_browser import Pointer, read_nested_dict_from_hdf
from h5io_browser.base import (
_is_ragged_in_1st_dim_only,
_open_hdf,
_read_hdf,
_write_hdf5_with_json_support,
)
from pyiron_snippets.deprecate import deprecate
from pyiron_base.interfaces.has_groups import HasGroups
from pyiron_base.jobs.job.util import _get_safe_job_name
from pyiron_base.state import state
from pyiron_base.utils.instance import static_isinstance
__author__ = "Joerg Neugebauer, Jan Janssen"
__copyright__ = (
"Copyright 2020, Max-Planck-Institut für Eisenforschung GmbH - "
"Computational Materials Design (CM) Department"
)
__version__ = "1.0"
__maintainer__ = "Jan Janssen"
__email__ = "janssen@mpie.de"
__status__ = "production"
__date__ = "Sep 1, 2017"
# for historic reasons we write str(class) into the HDF 'TYPE' field of objects, so we need to parse this back out
def _extract_fully_qualified_name(type_field: str) -> str:
"""
Extracts the fully qualified name from the given type field.
Args:
type_field (str): The type field containing the fully qualified name.
Returns:
str: The extracted fully qualified name.
"""
return type_field.split("'")[1]
def _extract_module_class_name(type_field: str) -> Tuple[str, str]:
"""
Extracts the module path and class name from the given type field.
Args:
type_field (str): The type field containing the fully qualified name.
Returns:
Tuple[str, str]: The module path and class name.
"""
fully_qualified_path = _extract_fully_qualified_name(type_field)
return fully_qualified_path.rsplit(".", maxsplit=1)
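# Illustrative sketch (comment only, not executed): the 'TYPE' field stored in HDF5 is the repr
# of a class, e.g. "<class 'pyiron_base.jobs.job.generic.GenericJob'>". The helpers above then
# recover "pyiron_base.jobs.job.generic.GenericJob" and split it into the module path
# "pyiron_base.jobs.job.generic" and the class name "GenericJob".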
def _list_groups_and_nodes(hdf: h5py.File, h5_path: str) -> Tuple[List[str], List[str]]:
"""
Get the list of groups and list of nodes from an open HDF5 file
Args:
hdf (h5py.File): file handle of an open HDF5 file
h5_path (str): path inside the HDF5 file
Returns:
Tuple[List[str], List[str]]: list of groups and list of nodes
"""
groups = set()
nodes = set()
try:
h = hdf[h5_path]
for k in h.keys():
if isinstance(h[k], h5py.Group):
groups.add(k)
else:
nodes.add(k)
except KeyError:
pass
return list(groups), list(nodes)
def _import_class(module_path: str, class_name: str) -> type:
"""
Import given class from fully qualified name and return class object.
Args:
module_path (str): dotted module path of the class, e.g. "pyiron_base.jobs.job.generic"
class_name (str): name of the class inside that module, e.g. "GenericJob"
Returns:
type: class object of the given name
"""
# ugly dynamic import, but only needed to log the warning anyway
from pyiron_base.jobs.job.jobtype import JobTypeChoice
job_class_dict = JobTypeChoice().job_class_dict # access global singleton
if class_name in job_class_dict:
known_module_path = job_class_dict[class_name]
# entries in the job_class_dict are either module path strings or fully
# loaded class objects; in the latter case our work here is done and we
# just return the class
if isinstance(known_module_path, type):
return known_module_path
if module_path != known_module_path:
state.logger.info(
f'Using registered module "{known_module_path}" instead of custom/old module "{module_path}" to'
f' import job type "{class_name}"!'
)
module_path = known_module_path
try:
return getattr(
importlib.import_module(module_path),
class_name,
)
except ImportError:
import pyiron_base.project.maintenance
if module_path in pyiron_base.project.maintenance._MODULE_CONVERSION_DICT:
raise RuntimeError(
f"Could not import {class_name} from {module_path}, but module path known to have changed. "
"Call project.maintenance.local.update_hdf_types() to upgrade storage!"
) from None
else:
raise
def _to_object(
hdf: "FileHDFio", class_name: Optional[str] = None, **kwargs: Any
) -> Any:
"""
Load the full pyiron object from an HDF5 file
Args:
hdf (FileHDFio): The HDF5 file handle.
class_name (str, optional): If the 'TYPE' node is not available in the HDF5 file, a manual object type can be
set. Must be as reported by `str(type(obj))`.
**kwargs: Optional parameters to override init parameters.
Returns:
Any: Pyiron object of the given class_name.
"""
if "TYPE" not in hdf.list_nodes() and class_name is None:
raise ValueError("Objects can be only recovered from hdf5 if TYPE is given")
elif class_name is not None and class_name != hdf.get("TYPE"):
raise ValueError(
"Object type in hdf5-file must be identical to input parameter"
)
type_field = class_name or hdf.get("TYPE")
module_path, class_name = _extract_module_class_name(type_field)
class_object = _import_class(module_path, class_name)
# Backwards compatibility since the format of TYPE changed
if type_field != str(class_object):
hdf["TYPE"] = str(class_object)
if hasattr(class_object, "from_hdf_args"):
init_args = class_object.from_hdf_args(hdf)
else:
init_args = {}
init_args.update(kwargs)
obj = class_object(**init_args)
obj.from_hdf(hdf=hdf.open(".."), group_name=hdf.h5_path.split("/")[-1])
if static_isinstance(obj=obj, obj_type="pyiron_base.jobs.job.generic.GenericJob"):
module_name = module_path.split(".")[0]
module = importlib.import_module(module_name)
if hasattr(module, "Project"):
obj.project_hdf5._project = getattr(module, "Project")(
obj.project_hdf5.project.path
)
return obj
class FileHDFio(HasGroups, Pointer):
"""
Class that provides all information needed to access an HDF5 file. This class is based on h5io.py, which allows
one to get and put a large variety of jobs to/from HDF5.
Implements :class:`.HasGroups`. Groups are HDF groups in the file, nodes are HDF datasets.
Args:
file_name (str): absolute path of the HDF5 file
h5_path (str): absolute path inside the h5 path - starting from the root group
mode (str): mode : {'a', 'w', 'r', 'r+'}, default 'a'
See HDFStore docstring or tables.open_file for info about modes
.. attribute:: file_name
absolute path to the HDF5 file
.. attribute:: h5_path
path inside the HDF5 file - also stored as absolute path
.. attribute:: history
previously opened groups / folders
.. attribute:: file_exists
boolean if the HDF5 was already written
.. attribute:: base_name
name of the HDF5 file but without any file extension
.. attribute:: file_path
directory where the HDF5 file is located
.. attribute:: is_root
boolean if the HDF5 object is located at the root level of the HDF5 file
.. attribute:: is_open
boolean if the HDF5 file is currently opened - if an active file handler exists
.. attribute:: is_empty
boolean if the HDF5 file is empty
"""
def __init__(self, file_name: str, h5_path: str = "/", mode: str = "a") -> None:
Pointer.__init__(self=self, file_name=file_name, h5_path=h5_path)
self.history = []
self._filter = ["groups", "nodes", "objects"]
# MutableMapping Impl
def __contains__(self, item: str) -> bool:
"""
Check if an item exists in the HDF5 file.
Args:
item (str): path to the data or key of the data object
Returns:
bool: True if the item exists, False otherwise
"""
nodes_groups = self.list_all()
return item in nodes_groups["nodes"] or item in nodes_groups["groups"]
def __len__(self) -> int:
"""
Get the number of items in the HDF5 file.
Returns:
int: Number of items in the HDF5 file
"""
nodes_groups = self.list_all()
return len(nodes_groups["nodes"]) + len(nodes_groups["groups"])
def __iter__(self):
"""
Iterate over the keys in the HDF5 file.
Returns:
iter: Iterator over the keys in the HDF5 file
"""
return iter(self.keys())
def __getitem__(self, item: Union[str, slice]) -> Union[Dict, List, float, int]:
"""
Get/ read data from the HDF5 file.
Args:
item (str, slice): path to the data or key of the data object
Returns:
Union[Dict, List, float, int]: Data or data object
"""
if isinstance(item, slice):
if not (item.start or item.stop or item.step):
return self.values()
raise NotImplementedError("Implement if needed, e.g. for [:]")
else:
try:
# Fast path: a good fraction of accesses fetch a specific dataset that the caller knows exists in
# the file, so there is no point in first checking whether item is a group or a node, or, even
# worse, recursing when item contains '/'. In most cases _read_hdf will grab the correct data
# straight away, and if not we still check thoroughly below. Since list_nodes()/list_groups() each
# open the underlying file once, this reduces the number of file opens in the most likely case from
# 2 to 1 (1 to check whether the data is there and 1 to read it) and increases it in the worst case
# from 1 to 2 (1 to try to read it here and one more to verify it is not a group below).
return _read_hdf(
hdf_filehandle=self.file_name, h5_path=self._get_h5_path(item)
)
except (ValueError, OSError, RuntimeError, NotImplementedError):
# h5io couldn't find a dataset with name item, but there still might be a group with that name, which we
# check in the rest of the method
pass
item_lst = item.split("/")
if len(item_lst) == 1 and item_lst[0] != "..":
# if item in self.list_nodes() we would have caught it in the fast path above
if item in self.list_groups():
with self.open(item) as hdf_item:
obj = hdf_item.copy()
if self._is_convertable_dtype_object_array(obj):
obj = self._convert_dtype_obj_array(obj)
return obj
raise ValueError(
"Unknown item: {} {} {}".format(item, self.file_name, self.h5_path)
)
else:
if (
item_lst[0] == ""
): # item starting with '/', thus we have an absolute HDF5 path
item_abs_lst = os.path.normpath(item).replace("\\", "/").split("/")
else: # relative HDF5 path
# The self.h5_path is an absolute path (/h5_path/in/h5/file); however, to
# reach any directory above the root, we start with a
# relative path = ./h5_path/in/h5/file and add whatever we get as item.
# The normpath finally returns a path to the item which is relative to the hdf-root.
item_abs_lst = (
os.path.normpath(os.path.join("." + self.h5_path, item))
.replace("\\", "/")
.split("/")
)
# print('h5_path=', self.h5_path, 'item=', item, 'item_abs_lst=', item_abs_lst)
if item_abs_lst[0] == "." and len(item_abs_lst) == 1:
# Here, we are asked to return the root of the HDF5-file. The resulting self.path would be the
# same as the self.file_path and, thus, the path of the pyiron Project this HDF5-file belongs to:
return self.create_project_from_hdf5()
elif item_abs_lst[0] == "..":
# Here, we are asked to return a path above the root of the HDF5 file, i.e. a path inside its
# pyiron Project, thus we pass the relative path to the pyiron Project to handle it:
return self.create_project_from_hdf5()["/".join(item_abs_lst)]
else:
hdf_object = self.copy()
hdf_object.h5_path = "/".join(item_abs_lst[:-1])
return hdf_object[item_abs_lst[-1]]
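# Hedged usage sketch (comment only; the node/group names are hypothetical examples):
#   hdf["energy_tot"]            # fast path - read a node directly below the current h5_path
#   hdf["output/generic"]        # relative path containing '/' - resolved against self.h5_path
#   hdf["/full/h5/path"]         # absolute HDF5 path starting at the file root
#   hdf[".."]                    # above the file root - delegated to the pyiron Project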
# TODO: remove this function upon 1.0.0 release
@staticmethod
def _is_convertable_dtype_object_array(obj: np.ndarray) -> bool:
"""
Check if an object array is convertable to a different dtype.
Args:
obj (np.ndarray): Object array
Returns:
bool: True if the object array is convertable, False otherwise
"""
if isinstance(obj, np.ndarray) and obj.dtype == np.dtype(object):
first_element = obj[(0,) * obj.ndim]
last_element = obj[(-1,) * obj.ndim]
if (
isinstance(first_element, numbers.Number)
and isinstance(last_element, numbers.Number)
and not _is_ragged_in_1st_dim_only(obj)
):
return True
return False
# TODO: remove this function upon 1.0.0 release
@staticmethod
def _convert_dtype_obj_array(obj: np.ndarray) -> np.ndarray:
"""
Convert an object array to a different dtype.
Args:
obj (np.ndarray): Object array
Returns:
np.ndarray: Converted object array
"""
try:
result = np.array(obj.tolist())
except ValueError:
result = np.array(obj.tolist(), dtype=object)
if result.dtype != np.dtype(object):
state.logger.warning(
f"Deprecated data structure! "
f"Returned array was converted from dtype='O' to dtype={result.dtype} "
f"via `np.array(result.tolist())`.\n"
f"Please run rewrite_hdf5() (from a job: job.project_hdf5.rewrite_hdf5() ) to update this data! "
f"To update all your data run Project.maintenance.update.base_v0_3_to_v0_4('all')."
)
return result
else:
return obj
def __setitem__(
self,
key: str,
value: Union[pandas.DataFrame, pandas.Series, Dict, List, float, int],
) -> None:
"""
Store data inside the HDF5 file.
Args:
key (str): Key to store the data
value (Union[pandas.DataFrame, pandas.Series, Dict, List, float, int]): Data to store
"""
if hasattr(value, "to_hdf") & (
not isinstance(value, (pandas.DataFrame, pandas.Series))
):
value.to_hdf(self, key)
return
_write_hdf5_with_json_support(
hdf_filehandle=self.file_name,
h5_path=self._get_h5_path(key),
data=value,
)
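# Hedged usage sketch (comment only; the keys are hypothetical examples):
#   hdf["n_atoms"] = 8                     # plain values are written via h5io
#   hdf["cell"] = np.eye(3)                # numpy arrays likewise
#   hdf["structure"] = some_pyiron_object  # objects exposing to_hdf() serialize themselves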
@property
def base_name(self) -> str:
"""
Get the name of the HDF5 file without the file extension.
Returns:
str: Name of the HDF5 file without the file extension
"""
return ".".join(posixpath.basename(self.file_name).split(".")[:-1])
@property
def file_path(self) -> str:
"""
Get the directory where the HDF5 file is located.
Returns:
str: Directory where the HDF5 file is located
"""
return posixpath.dirname(self.file_name)
def get_size(self, hdf: "FileHDFio") -> float:
"""
Get the size of the groups inside the HDF5 file.
Args:
hdf (FileHDFio): HDF5 file
Returns:
float: Approximate size of the stored data in bytes, estimated via sys.getsizeof of the loaded nodes
"""
return sum([sys.getsizeof(hdf[p]) for p in hdf.list_nodes()]) + sum(
[self.get_size(hdf[p]) for p in hdf.list_groups()]
)
def copy(self) -> "FileHDFio":
"""
Copy the Python object which links to the HDF5 file - in contrast to copy_to() which copies the content of the
HDF5 file to a new location.
Returns:
FileHDFio: New FileHDFio object pointing to the same HDF5 file
"""
new_h5 = FileHDFio(file_name=self.file_name, h5_path=self.h5_path)
new_h5._filter = self._filter
return new_h5
def create_group(self, name: str, track_order: bool = False) -> "FileHDFio":
"""
Create an HDF5 group - similar to a folder in the filesystem - the HDF5 groups allow the users to structure
their data.
Args:
name (str): Name of the HDF5 group
track_order (bool): If False, the group tracks its elements in alphanumeric order,
if True, in insertion order
Returns:
FileHDFio: FileHDFio object pointing to the new group
"""
full_name = self._get_h5_path(name)
with _open_hdf(self.file_name, mode="a") as h:
try:
h.create_group(full_name, track_order=track_order)
except ValueError:
pass
h_new = self[name].copy()
return h_new
def remove_group(self) -> None:
"""
Remove an HDF5 group if it exists. If the group does not exist, no error is raised.
"""
try:
with _open_hdf(self.file_name, mode="a") as hdf_file:
del hdf_file[self.h5_path]
except KeyError:
pass
def open(self, h5_rel_path: str) -> "FileHDFio":
"""
Create an HDF5 group and enter this specific group. If the group exists in the HDF5 path,
only the h5_path is set correspondingly, otherwise the group is created first.
Args:
h5_rel_path (str): Relative path from the current HDF5 path - h5_path - to the new group
Returns:
FileHDFio: FileHDFio object pointing to the new group
"""
new_h5_path = self.copy()
if os.path.isabs(h5_rel_path):
raise ValueError(
"Absolute paths are not supported -> replace by relative path name!"
)
if h5_rel_path.strip() == ".":
h5_rel_path = ""
if h5_rel_path.strip() != "":
new_h5_path.h5_path = self._get_h5_path(h5_rel_path)
new_h5_path.history.append(h5_rel_path)
return new_h5_path
def close(self) -> None:
"""
Close the current HDF5 path and return to the path before the last open.
"""
path_lst = self.h5_path.split("/")
last = self.history[-1].strip()
if len(last) > 0:
hist_lst = last.split("/")
self.h5_path = "/".join(path_lst[: -len(hist_lst)])
if len(self.h5_path.strip()) == 0:
self.h5_path = "/"
del self.history[-1]
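# Hedged usage sketch (comment only; group/node names are hypothetical). open() works on a copy,
# so the original object keeps its h5_path; together with __exit__ below this allows use as a
# context manager, e.g.:
#   with hdf.open("input") as hdf_input:
#       hdf_input["parameter"] = 1.0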
def show_hdf(self) -> None:
"""
Iterate over the HDF5 data structure and generate a human-readable graph.
"""
self._walk()
def remove_file(self) -> None:
"""
Remove the HDF5 file with all the related content.
"""
if self.file_exists:
os.remove(self.file_name)
def get_from_table(self, path: str, name: str) -> Union[Dict, List, float, int]:
"""
Get a specific value from a pandas.DataFrame.
Args:
path (str): Relative path to the data object
name (str): Parameter key
Returns:
Union[Dict, List, float, int]: The value associated with the specific parameter key
"""
df_table = self.get(path)
keys = df_table["Parameter"]
if name in keys:
job_id = keys.index(name)
return df_table["Value"][job_id]
raise ValueError("Unknown name: {0}".format(name))
def get_pandas(self, name: str) -> pandas.DataFrame:
"""
Load a dictionary from the HDF5 file and convert it to a pandas DataFrame.
Args:
name (str): HDF5 node name
Returns:
pd.DataFrame: The dictionary as a pandas DataFrame object
"""
val = self.get(name)
if isinstance(val, dict):
df = pandas.DataFrame(val)
return df
def get(
self, key: str, default: Optional[object] = None
) -> Union[Dict, List, float, int]:
"""
Get data from the HDF5 file.
Args:
key (str): Path to the data or key of the data object
default (object): Default value to return if key doesn't exist
Returns:
Union[Dict, List, float, int]: Data or data object
"""
try:
return self.__getitem__(key)
except ValueError:
if default is not None:
return default
else:
raise
def put(
self,
key: str,
value: Union[pandas.DataFrame, pandas.Series, Dict, List, float, int],
) -> None:
"""
Store data inside the HDF5 file.
Args:
key (str): Key to store the data
value (Union[pandas.DataFrame, pandas.Series, Dict, List, float, int]): Data to store
"""
self.__setitem__(key=key, value=value)
def _list_all(self) -> Dict[str, List[str]]:
"""
List all groups and nodes of the HDF5 file - where groups are equivalent to directories and nodes to files.
Returns:
Dict[str, List[str]]: Dictionary with keys "groups" and "nodes" containing lists of groups and nodes
"""
if self.file_exists:
with _open_hdf(self.file_name) as hdf:
groups, nodes = _list_groups_and_nodes(hdf=hdf, h5_path=self.h5_path)
iopy_nodes = self._filter_io_objects(set(groups))
return {
"groups": sorted(list(set(groups) - iopy_nodes)),
"nodes": sorted(list((set(nodes) - set(groups)).union(iopy_nodes))),
}
else:
return {"groups": [], "nodes": []}
def _list_nodes(self) -> List[str]:
"""
List all nodes in the HDF5 file.
Returns:
List[str]: List of nodes in the HDF5 file
"""
return self.list_all()["nodes"]
def _list_groups(self) -> List[str]:
"""
List all groups in the HDF5 file.
Returns:
List[str]: List of groups in the HDF5 file
"""
return self.list_all()["groups"]
def listdirs(self) -> List[str]:
"""
Equivalent to os.listdir (consider groups as equivalent to dirs).
Returns:
List[str]: List of groups in pytables for the path self.h5_path
"""
return self.list_groups()
def list_dirs(self) -> List[str]:
"""
Equivalent to os.listdir (consider groups as equivalent to dirs).
Returns:
List[str]: List of groups in pytables for the path self.h5_path
"""
return self.list_groups()
def keys(self) -> List[str]:
"""
List all groups and nodes of the HDF5 file - where groups are equivalent to directories and nodes to files.
Returns:
List[str]: All groups and nodes
"""
list_all_dict = self.list_all()
return list_all_dict["nodes"] + list_all_dict["groups"]
def values(self) -> List[Union[Dict, List, float, int]]:
"""
List all values for all groups and nodes of the HDF5 file.
Returns:
List[Union[Dict, List, float, int]]: List of all values
"""
return [self[key] for key in self.keys()]
def items(self) -> List[Tuple[str, Union[Dict, List, float, int]]]:
"""
List all keys and values as items of all groups and nodes of the HDF5 file.
Returns:
List[Tuple[str, Union[Dict, List, float, int]]]: List of tuples (key, value)
"""
return [(key, self[key]) for key in self.keys()]
def groups(self) -> "FileHDFio":
"""
Filter HDF5 file by groups.
Returns:
FileHDFio: An HDF5 file which is filtered by groups
"""
new = self.copy()
new._filter = ["groups"]
return new
def nodes(self) -> "FileHDFio":
"""
Filter HDF5 file by nodes.
Returns:
FileHDFio: An HDF5 file which is filtered by nodes
"""
new = self.copy()
new._filter = ["nodes"]
return new
def hd_copy(
self,
hdf_old: "FileHDFio",
hdf_new: "FileHDFio",
exclude_groups: Optional[List[str]] = None,
exclude_nodes: Optional[List[str]] = None,
) -> "FileHDFio":
"""
Copy data from one HDF5 file to another.
Args:
hdf_old (FileHDFio): Source HDF5 file
hdf_new (FileHDFio): Destination HDF5 file
exclude_groups (List[str]): List of groups to exclude from the copy
exclude_nodes (List[str]): List of nodes to exclude from the copy
Returns:
FileHDFio: hdf_new, the destination HDF5 file
"""
if exclude_groups is None or len(exclude_groups) == 0:
exclude_groups_split = list()
group_list = hdf_old.list_groups()
else:
exclude_groups_split = [i.split("/", 1) for i in exclude_groups]
check_groups = [i[-1] for i in exclude_groups_split]
group_list = list(
(set(hdf_old.list_groups()) ^ set(check_groups))
& set(hdf_old.list_groups())
)
if exclude_nodes is None or len(exclude_nodes) == 0:
exclude_nodes_split = list()
node_list = hdf_old.list_nodes()
else:
exclude_nodes_split = [i.split("/", 1) for i in exclude_nodes]
check_nodes = [i[-1] for i in exclude_nodes_split]
node_list = list(
(set(hdf_old.list_nodes()) ^ set(check_nodes))
& set(hdf_old.list_nodes())
)
hdf_new.write_dict(data_dict={p: hdf_old[p] for p in node_list})
for p in group_list:
h_new = hdf_new.create_group(p)
ex_n = [e[-1] for e in exclude_nodes_split if p == e[0] or len(e) == 1]
ex_g = [e[-1] for e in exclude_groups_split if p == e[0] or len(e) == 1]
self.hd_copy(hdf_old[p], h_new, exclude_nodes=ex_n, exclude_groups=ex_g)
return hdf_new
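# Hedged usage sketch (comment only; group/node names are hypothetical):
#   hdf_new = hdf.hd_copy(hdf_old, hdf_new, exclude_groups=["interactive"], exclude_nodes=["status"])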
@deprecate(job_name="ignored!", exclude_groups="ignored!", exclude_nodes="ignored!")
def rewrite_hdf5(
self,
job_name: Optional[str] = None,
info: bool = False,
exclude_groups: Optional[List[str]] = None,
exclude_nodes: Optional[List[str]] = None,
) -> None:
"""
Rewrite the entire HDF5 file; this frees the space of previously deleted groups, which HDF5 does not reclaim automatically.
Args:
job_name (Optional[str]): Deprecated argument, ignored.
info (bool): Whether to give the information on how much space has been saved.
exclude_groups (Optional[List[str]]): List of groups to exclude from the copy.
exclude_nodes (Optional[List[str]]): List of nodes to exclude from the copy.
"""
if job_name is not None:
state.logger.warning(
"Specifying job_name is deprecated and ignored! Future versions will change signature."
)
file_name = self.file_name
new_file = file_name + "_rewrite"
self_hdf = FileHDFio(file_name=file_name)
hdf_new = FileHDFio(file_name=new_file, h5_path="/")
old_logger_level = state.logger.level
state.logger.level = 50
hdf_new = self.hd_copy(self_hdf, hdf_new)
state.logger.level = old_logger_level
if info:
print(
"compression rate from old to new: {}".format(
self.file_size(self_hdf) / self.file_size(hdf_new)
)
)
print(
"data size vs file size: {}".format(
self.get_size(hdf_new) / self.file_size(hdf_new)
)
)
self.remove_file()
os.rename(hdf_new.file_name, file_name)
def __str__(self) -> str:
"""
Machine readable string representation
Returns:
str: list all nodes and groups as string
"""
return self.__repr__()
def __repr__(self) -> str:
"""
Human readable string representation
Returns:
str: list all nodes and groups as string
"""
return str(self.list_all())
def __del__(self):
del self._file_name
del self.history
del self._h5_path
def __exit__(self, exc_type, exc_val, exc_tb):
"""
Compatibility function for the with statement
"""
self.close()
try:
self._store.close()
except AttributeError:
pass
def _read(self, item: str) -> Union[Dict, List, float, int]:
"""
Internal read function to read data from the HDF5 file
Args:
item (str): path to the data or key of the data object
Returns:
dict, list, float, int: data or data object
"""
return _read_hdf(hdf_filehandle=self.file_name, h5_path=self._get_h5_path(item))
def write_dict_to_hdf(self, data_dict: dict) -> None:
"""
Write a dictionary to HDF5
Args:
data_dict (dict): dictionary with objects which should be written to HDF5
"""
self.write_dict(data_dict=data_dict)
def read_dict_from_hdf(
self, group_paths: List[str] = [], recursive: bool = False
) -> dict:
"""
Read data from the HDF5 file into a dictionary - by default only the nodes directly below h5_path are read;
additional sub groups can be included using the group_paths parameter.
Args:
group_paths (List[str]): list of additional groups to be included in the dictionary, for example:
["input", "output", "output/generic"]
These groups are defined relative to the h5_path.
recursive (bool): Load all subgroups recursively
Returns:
Dict: The loaded data. Can be of any type supported by ``write_hdf5``.
"""
return read_nested_dict_from_hdf(
file_name=self.file_name,
h5_path=self.h5_path,
group_paths=group_paths,
recursive=recursive,
slash="ignore",
)
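# Hedged usage sketch (comment only; group names are hypothetical):
#   hdf.read_dict_from_hdf()                                         # nodes directly below h5_path
#   hdf.read_dict_from_hdf(group_paths=["input", "output/generic"])  # include selected sub groups
#   hdf.read_dict_from_hdf(recursive=True)                           # the full nested structure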
def create_project_from_hdf5(self) -> "Project":
"""
Internal function to create a pyiron project pointing to the directory where the HDF5 file is located.
Returns:
Project: pyiron project object
"""
from pyiron_base.project.generic import Project
return Project(path=self.file_path)
def _get_h5_path(self, name: str) -> str:
"""
Internal function to combine the current h5_path with the relative path
Args:
name (str): relative path
Returns:
str: combined path
"""
return posixpath.join(self.h5_path, name)
def _get_h5io_type(self, name: str) -> str:
"""
Internal function to get h5io type
Args:
name (str): HDF5 key
Returns:
str: h5io type
"""
with _open_hdf(self.file_name) as store:
return str(store[self.h5_path][name].attrs.get("TITLE", ""))
def _filter_io_objects(self, groups: Union[List[str], Set[str]]) -> Set[str]:
"""
Internal function to extract h5io objects (which have the same type as normal groups)
Args:
groups (list, set): list of groups (as obtained e.g. from listdirs())
Returns:
set: h5io objects
"""
h5io_types = (
"dict",
"list",
"tuple",
"pd_dataframe",
"pd_series",
"multiarray",
"json",
)
group_h5io = set(
[group for group in groups if self._get_h5io_type(group) in h5io_types]
)
return group_h5io
def _walk(self, level: int = 0) -> None:
"""
Internal helper function for show_hdf() - iterating over the HDF5 datastructure and generating a human readable
graph.
Args:
level (int): iteration level
"""
l_dict = self.list_all()
indent = level * " "
for node in l_dict["nodes"]:
print(indent + "node", node)
for group in l_dict["groups"]:
print(indent + "group: ", group)
with self.open(group) as hdf_group:
hdf_group._walk(level=level + 1)
class BaseHDFio:
"""
Dummy class to allow other code to type check if it received a ProjectHDFio
or DummyHDFio object. Usually this is used to check if it is safe to call
`to_object` on this object.
"""
pass
class ProjectHDFio(FileHDFio, BaseHDFio):
"""
The ProjectHDFio class connects the FileHDFio and the Project class. It is derived from the FileHDFio class, but in
addition a project object instance is located at self.project, enabling direct access to the database and other
project-related functionality, some of which is mapped to the ProjectHDFio class as well.
Args:
project (Project): pyiron Project the current HDF5 project is located in
file_name (str): name of the HDF5 file - in contrast to the FileHDFio object where file_name represents the
absolute path of the HDF5 file.
h5_path (str): absolute path inside the h5 path - starting from the root group
mode (str): mode : {'a', 'w', 'r', 'r+'}, default 'a'
See HDFStore docstring or tables.open_file for info about modes
Attributes:
.. attribute:: project
Project instance the ProjectHDFio object is located in
.. attribute:: root_path
the pyiron user directory, defined in the .pyiron configuration
.. attribute:: project_path
the relative path of the current project / folder starting from the root path
of the pyiron user directory
.. attribute:: path
the absolute path of the current project / folder plus the absolute path in the HDF5 file as one path
.. attribute:: file_name
absolute path to the HDF5 file
.. attribute:: h5_path
path inside the HDF5 file - also stored as absolute path
.. attribute:: history
previously opened groups / folders
.. attribute:: file_exists
boolean if the HDF5 was already written
.. attribute:: base_name
name of the HDF5 file but without any file extension
.. attribute:: file_path
directory where the HDF5 file is located
.. attribute:: is_root
boolean if the HDF5 object is located at the root level of the HDF5 file
.. attribute:: is_open
boolean if the HDF5 file is currently opened - if an active file handler exists
.. attribute:: is_empty
boolean if the HDF5 file is empty
.. attribute:: user
current unix/linux/windows user who is running pyiron
.. attribute:: sql_query
an SQL query to limit the jobs within the project to a subset which matches the SQL query.
.. attribute:: db
connection to the SQL database
.. attribute:: working_directory
working directory of the job is executed in - outside the HDF5 file
"""
def __init__(
self,
project: "pyiron_base.project.generic.Project",
file_name: str,
h5_path: Optional[str] = None,
mode: Optional[str] = None,
) -> None:
self._file_name = _get_safe_filename(file_name)
if h5_path is None:
h5_path = "/"
self._project = project.copy()
super(ProjectHDFio, self).__init__(
file_name=os.path.join(self._project.path, self._file_name).replace(
"\\", "/"
),
h5_path=h5_path,
mode=mode,
)
@property
def base_name(self) -> str:
"""
The absolute path of the current pyiron project - the absolute path on the file system, not including the HDF5
path.
Returns:
str: current project path
"""
return self._project.path
@property
def db(self) -> "DatabaseAccess":
"""
Get connection to the SQL database
Returns:
DatabaseAccess: database connection
"""
return self._project.db
@property
def path(self) -> str:
"""
Absolute path of the HDF5 group starting from the system root - combination of the absolute system path plus the
absolute path inside the HDF5 file starting from the root group.
Returns:
str: absolute path
"""
return os.path.join(self._project.path, self.h5_path[1:]).replace("\\", "/")
@property
def project(self) -> "pyiron_base.project.generic.Project":
"""
Get the project instance the ProjectHDFio object is located in
Returns:
Project: pyiron project
"""
return self._project
@property
def project_path(self) -> str:
"""
the relative path of the current project / folder starting from the root path
of the pyiron user directory
Returns:
str: relative path of the current project / folder
"""
return self._project.project_path
@property
def root_path(self) -> str:
"""
the pyiron user directory, defined in the .pyiron configuration
Returns:
str: pyiron user directory of the current project
"""
return self._project.root_path
@property
def sql_query(self) -> str:
"""
Get the SQL query for the project
Returns:
str: SQL query
"""
return self._project.sql_query
@sql_query.setter
def sql_query(self, new_query: str) -> None:
"""
Set the SQL query for the project
Args:
new_query (str): SQL query
"""
self._project.sql_query = new_query
@property
def user(self) -> str:
"""
Get current unix/linux/windows user who is running pyiron
Returns:
str: username
"""
return self._project.user
@property
def working_directory(self) -> str:
"""
Get the working directory of the current ProjectHDFio object. The working directory equals the path but it is
represented by the filesystem:
/absolute/path/to/the/file.h5/path/inside/the/hdf5/file
becomes:
/absolute/path/to/the/file_hdf5/path/inside/the/hdf5/file
Returns:
str: absolute path to the working directory
"""
project_full_path = "/".join(self.file_name.split("/")[:-1])
file_name = self.file_name.split("/")[-1]
if ".h5" in file_name:
file_name = file_name.split(".h5")[0]
file_name += "_hdf5"
if self.h5_path[0] == "/":
h5_path = self.h5_path[1:]
else:
h5_path = self.h5_path
return posixpath.join(project_full_path, file_name, h5_path)
@property
def _filter(self) -> str:
"""
Get project filter
Returns:
str: project filter
"""
return self._project._filter
@_filter.setter
def _filter(self, new_filter: str) -> None:
"""
Set project filter
Args:
new_filter (str): project filter
"""
self._project._filter = new_filter
@property
def _inspect_mode(self) -> bool:
"""
Check if inspect mode is activated
Returns:
bool: [True/False]
"""
return self._project._inspect_mode
@_inspect_mode.setter
def _inspect_mode(self, read_mode: bool) -> None:
"""
Activate or deactivate inspect mode
Args:
read_mode (bool): [True/False]
"""
self._project._inspect_mode = read_mode
@property
def name(self) -> str:
"""
Get the name of the HDF5 group.
Returns:
str: The name of the HDF5 group.
"""
return os.path.basename(self.h5_path)
def copy(self) -> "ProjectHDFio":
"""
Copy the ProjectHDFio object - copying just the Python object but maintaining the same pyiron path
Returns:
ProjectHDFio: copy of the ProjectHDFio object
"""
new_h5 = ProjectHDFio(
project=self._project, file_name=self._file_name, h5_path=self._h5_path
)
new_h5._filter = self._filter
return new_h5
def create_hdf(self, path: str, job_name: str) -> "ProjectHDFio":
"""
Create a ProjectHDFio object to store project-related information - for testing aggregated data
Args:
path (str): absolute path
job_name (str): name of the HDF5 container
Returns:
ProjectHDFio: HDF5 object
"""
return self._project.create_hdf(path=path, job_name=job_name)
def create_working_directory(self) -> None:
"""
Create the working directory on the file system if it does not exist already.
"""
os.makedirs(self.working_directory, exist_ok=True)
def to_object(self, class_name: Optional[str] = None, **kwargs) -> object:
"""
Load the full pyiron object from an HDF5 file
Args:
class_name(str, optional): if the 'TYPE' node is not available in
the HDF5 file a manual object type can be set,
must be as reported by `str(type(obj))`
**kwargs: optional parameters to override init
parameters
Returns:
pyiron object of the given class_name
"""
return _to_object(self, class_name, **kwargs)
def get_job_id(self, job_specifier: Union[str, int]) -> int:
"""
Get the job_id for the job referenced by job_specifier in the local project path from the database.
Args:
job_specifier (str, int): name of the job or job ID
Returns:
int: job ID of the job
"""
return self._project.get_job_id(job_specifier=job_specifier)
def inspect(self, job_specifier: Union[str, int]) -> "JobCore":
"""
Inspect an existing pyiron object - most commonly a job - from the database
Args:
job_specifier (str, int): name of the job or job ID
Returns:
JobCore: Access to the HDF5 object - not a GenericJob object - use load() instead.
"""
return self._project.inspect(job_specifier=job_specifier)
def load(
self, job_specifier: Union[str, int], convert_to_object: bool = True
) -> Union["GenericJob", "JobCore"]:
"""
Load an existing pyiron object - most commonly a job - from the database
Args:
job_specifier (str, int): name of the job or job ID
convert_to_object (bool): convert the object to a pyiron object or only access the HDF5 file - default=True
accessing only the HDF5 file is about an order of magnitude faster, but only
provides limited functionality. Compare the GenericJob object to JobCore object.
Returns:
GenericJob, JobCore: Either the full GenericJob object or just a reduced JobCore object
"""
return self._project.load(
job_specifier=job_specifier, convert_to_object=convert_to_object
)
def load_from_jobpath(
self,
job_id: Optional[int] = None,
db_entry: Optional[dict] = None,
convert_to_object: bool = True,
) -> Union["GenericJob", "JobCore"]:
"""
Internal function to load an existing job either based on the job ID or based on the database entry dictionary.
Args:
job_id (int, optional): Job ID - optional, but either the job_id or the db_entry is required.
db_entry (dict, optional): database entry dictionary - optional, but either the job_id or the db_entry is required.
convert_to_object (bool): convert the object to a pyiron object or only access the HDF5 file - default=True
accessing only the HDF5 file is about an order of magnitude faster, but only
provides limited functionality. Compare the GenericJob object to JobCore object.
Returns:
GenericJob, JobCore: Either the full GenericJob object or just a reduced JobCore object
"""
return self._project.load_from_jobpath(
job_id=job_id, db_entry=db_entry, convert_to_object=convert_to_object
)
def remove_job(
self, job_specifier: Union[str, int], _unprotect: bool = False
) -> None:
"""
Remove a single job from the project based on its job_specifier.
Args:
job_specifier (Union[str, int]): Name of the job or job ID.
_unprotect (bool): [True/False] Delete the job without validating the dependencies to other jobs.
Default is False.
"""
self._project.remove_job(job_specifier=job_specifier, _unprotect=_unprotect)
def create_project_from_hdf5(self) -> "Project":
"""
Internal function to create a pyiron project pointing to the directory where the HDF5 file is located.
Returns:
Project: pyiron project object
"""
return self._project.__class__(path=self.file_path)
class DummyHDFio(HasGroups, BaseHDFio):
"""
A dummy ProjectHDFio implementation to serialize objects into a dict
instead of an HDF5 file.
It is modeled after ProjectHDFio, but supports just enough methods to
successfully write objects.
After all desired objects have been written to it, you may extract a plain
dict from it with :meth:`.to_dict`.
A simple example for storing data containers:
>>> from pyiron_base import DataContainer, Project
>>> pr = Project(...)
>>> hdf = DummyHDFio(pr, '/', {})
>>> d = DataContainer({'a': 42, 'b':{'c':4, 'g':33}})
>>> d.to_hdf(hdf)
>>> hdf.to_dict()
{'READ_ONLY': False,
'a__index_0': 42,
'b__index_1': {
'READ_ONLY': False,
'c__index_0': 4,
'g__index_1': 33,
'NAME': 'DataContainer',
'TYPE': "<class
'pyiron_base.storage.datacontainer.DataContainer'>",
'OBJECT': 'DataContainer',
'VERSION': '0.1.0',
'HDF_VERSION': '0.2.0'
},
'NAME': 'DataContainer',
'TYPE': "<class
'pyiron_base.storage.datacontainer.DataContainer'>",
'OBJECT': 'DataContainer',
'VERSION': '0.1.0',
'HDF_VERSION': '0.2.0'}
"""
def __init__(
self,
project,
h5_path: str,
cont: Optional[dict] = None,
root: Optional["DummyHDFio"] = None,
):
"""
Args:
project (Project): the project this object should advertise itself as
belonging to; in practice it is rarely needed for
writing objects
h5_path (str): the path of the HDF group this object fakes
cont (dict, optional): dict of initial values that are written into this
group on creation
root (DummyHDFio, optional): if this object will be a child of
another one, the parent must be passed
here, to make hdf['..'] work.
"""
self._project = project
self._dict = {}
self._h5_path = h5_path
self._root = root
if cont is not None:
self.write_dict_to_hdf(cont)
def __getitem__(self, item: str) -> Union["DummyHDFio", Any]:
"""
Return a value from storage.
If `item` is in :meth:`.list_groups()` this returns another :class:`.DummyHDFio`.
Args:
item (str): name of value
Returns:
:class:`.DummyHDFio`: if `item` refers to a sub group
object: value that is stored under `item`
Raises:
ValueError: `item` is neither a node nor a sub group of this group
"""
try:
v = self._dict[item]
if isinstance(v, DummyHDFio) and v._empty():
raise KeyError()
else:
return v
except KeyError:
if item == "..":
return self._root
# compat with ProjectHDFio, which for some reason raises ValueError
raise ValueError(item) from None
def get(
self, key: Union[str, slice], default: Optional[object] = None
) -> Union[dict, list, float, int]:
"""
Internal wrapper function for __getitem__() - self[name]
Args:
key (str, slice): path to the data or key of the data object
default (object): default value to return if key doesn't exist
Returns:
dict, list, float, int: data or data object
"""
try:
return self[key]
except ValueError:
if default is not None:
return default
else:
raise
def __setitem__(self, item: str, value: Any) -> None:
self._dict[item] = value
def create_group(self, name: str) -> "DummyHDFio":
"""
Create a new sub group.
Args:
name (str): name of the new group
"""
if name == "..":
return self._root
d = self._dict.get(name, None)
if d is None:
self._dict[name] = d = type(self)(
self._project, os.path.join(self.h5_path, name), cont={}, root=self
)
elif isinstance(d, DummyHDFio):
pass
else:
raise RuntimeError(f"'{name}' is already a node!")
return d
def _list_nodes(self) -> List[str]:
return [k for k, v in self._dict.items() if not isinstance(v, DummyHDFio)]
def _list_groups(self) -> List[str]:
return [
k
for k, v in self._dict.items()
if isinstance(v, DummyHDFio) and not v._empty()
]
def __contains__(self, item) -> bool:
return item in self._dict
@property
def project(self) -> "Project":
if self._project is not None:
return self._project
else:
raise RuntimeError("No project set!")
@property
def h5_path(self) -> str:
return self._h5_path
def open(self, name: str) -> "DummyHDFio":
"""
Descend into a sub group.
If `name` does not exist yet, create a new group. Calling :meth:`~.close` on the returned object returns this
object.
Args:
name (str): name of sub group
Returns:
:class:`.DummyHDFio`: sub group
"""
# FIXME: what if name in self.list_nodes()
new = self.create_group(name)
new._prev = self
return new
def close(self) -> "DummyHDFio":
"""
Surface from a sub group.
If this object was not returned from a previous call to :meth:`.open` it returns itself silently.
"""
try:
return self._prev
except AttributeError:
return self
def __enter__(self) -> "DummyHDFio":
"""
Compatibility function for the with statement
"""
return self
def __exit__(self, exc_type, exc_val, exc_tb):
"""
Compatibility function for the with statement
"""
self.close()
def to_dict(self) -> dict:
"""
Convert the HDF5 data to a dictionary.
Returns:
dict: The converted dictionary.
"""
def unwrap(v):
if isinstance(v, DummyHDFio):
return v.to_dict()
return v
return {k: unwrap(v) for k, v in self._dict.items()}
def to_object(self, class_name: Optional[str] = None, **kwargs) -> object:
"""
Load the full pyiron object from an HDF5 file
Args:
class_name(str, optional): if the 'TYPE' node is not available in
the HDF5 file a manual object type can be set,
must be as reported by `str(type(obj))`
**kwargs: optional parameters to override init
parameters
Returns:
pyiron object of the given class_name
"""
return _to_object(self, class_name, **kwargs)
def _empty(self) -> bool:
"""
Check if the DummyHDFio object is empty.
Returns:
bool: True if the object is empty, False otherwise.
"""
if len(self._dict) == 0:
return True
return len(self.list_nodes()) == 0 and all(
self[g]._empty() for g in self.list_groups()
)
def write_dict_to_hdf(self, data_dict: dict) -> None:
"""
Write a dictionary to the HDF5 file.
Args:
data_dict (dict): The dictionary to be written to the HDF5 file.
"""
for k, v in data_dict.items():
if isinstance(v, dict):
g = self.create_group(k)
g.write_dict_to_hdf(v)
else:
self[k] = v
def read_dict_from_hdf(
self, group_paths: List[str] = [], recursive: bool = False
) -> Union[dict, Any]:
"""
Read data from the HDF5 file and return it as a dictionary.
Args:
group_paths (List[str]): List of group paths to read data from.
recursive (bool): If True, read data recursively from all groups.
Returns:
Union[dict, Any]: The read data as a dictionary, keyed by the entries of group_paths (or the full nested dict if recursive is True).
"""
if recursive:
return self.to_dict()
data = {}
for path in group_paths:
keys = path.split("/")
try:
d = self[keys[0]]
for key in keys[1:]:
d = d[key]
except (KeyError, ValueError):
d = None
if isinstance(d, DummyHDFio):
d = d.to_object()
data[path] = d
return data
def _get_safe_filename(file_name: str) -> str:
"""
Get a safe filename by replacing special characters and adding a file extension.
Args:
file_name (str): The original file name.
Returns:
str: The safe file name with a file extension.
"""
file_path_no_ext, file_ext = os.path.splitext(file_name)
file_path = os.path.dirname(file_path_no_ext)
file_name_no_ext = os.path.basename(file_path_no_ext)
file_name = os.path.join(
file_path, _get_safe_job_name(name=file_name_no_ext) + file_ext
)
file_name += ".h5" if not file_name.endswith(".h5") else ""
return file_name.replace("\\", "/")
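# Hedged examples (comment only; the exact character substitutions are delegated to
# _get_safe_job_name and are an assumption here):
#   _get_safe_filename("water MD")    -> "water_MD.h5"  (assuming spaces are replaced)
#   _get_safe_filename("water_MD.h5") -> "water_MD.h5"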