import hashlib
import inspect
import re
import time
from concurrent.futures import Future
from typing import Dict, List, Optional, Tuple
import cloudpickle
from pyiron_base.jobs.job.extension.server.generic import Server
[docs]
class JobFuture(Future):
    """
    Future-like handle for a job object.

    The state of the future is not tracked by the base class; instead it is read from the wrapped job: done()
    reports the job status and result() reads the job output.
    """

    def __init__(self, job):
        """
        Args:
            job: job object which provides the attributes status, storage, project_hdf5 and output
        """
        super().__init__()
        self._job = job

    @property
    def job(self):
        """Job object wrapped by this future."""
        return self._job

    def done(self):
        """
        Returns:
            the value of job.status.finished
        """
        return self._job.status.finished

    def result(self, timeout: Optional[float] = None):
        """
        Reload the job storage and return the stored result.

        Args:
            timeout (float): accepted for signature compatibility with concurrent.futures.Future.result();
                             it is not used, the call never waits for the job.

        Returns:
            object: the "result" entry of the job output

        Raises:
            the object stored under the "error" key of the job output, when such a key is present
        """
        self._job.storage.from_hdf(hdf=self._job.project_hdf5)
        if "error" in self._job.output:
            raise self._job.output["error"]
        return self._job.output["result"]
[docs]
def draw(node_dict: Dict[str, object], edge_lst: List[List[str]]) -> None:
    """
    Render the graph defined by nodes and edges as SVG and display it.

    Args:
        node_dict (Dict[str, object]): Dictionary of nodes
        edge_lst (List[List[str]]): List of edges
    """
    import networkx as nx
    from IPython.display import SVG, display

    digraph = nx.DiGraph()
    for name, value in node_dict.items():
        # the label is the node name without its last "_"-separated part, followed by the node value
        prefix = str(name).rsplit("_", 1)[0]
        digraph.add_node(name, label=f"{prefix}={value!s}")
    for pair in edge_lst:
        # the edge is added from the second entry of the pair to the first one
        digraph.add_edge(pair[1], pair[0])
    agraph = nx.nx_agraph.to_agraph(digraph)
    svg_data = agraph.draw(prog="dot", format="svg")
    display(SVG(svg_data))
[docs]
def evaluate_function(funct: callable, input_dict: dict) -> object:
    """
    Call a python function with the arguments stored in the input dictionary.

    The dictionary is resolved with recursive_dict_resolve() first. The entry "kwargs" is merged into the keyword
    arguments and the entry "args" is passed as positional arguments.

    Args:
        funct (callable): the function to evaluate
        input_dict (dict): input dictionary

    Returns:
        object: output of the evaluated function
    """
    resolved = recursive_dict_resolve(input_dict=input_dict)
    if "kwargs" in resolved:
        resolved.update(resolved.pop("kwargs"))
    if "args" not in resolved:
        return funct(**resolved)
    positional = resolved.pop("args")
    return funct(*positional, **resolved)
[docs]
def get_function_parameter_dict(funct: callable) -> dict:
    """
    Get dictionary of parameters for a function

    Args:
        funct (callable): the function to get the parameters for

    Returns:
        dict: parameter names as keys with their default values, None for parameters without a default
    """
    # Identity check against the public sentinel: an equality comparison would call the __eq__ of the default
    # value, which can raise or return a non-boolean for some objects.
    return {
        name: None if param.default is inspect.Parameter.empty else param.default
        for name, param in inspect.signature(funct).parameters.items()
    }
[docs]
def get_graph(
    obj: object,
    obj_name: Optional[str] = None,
    nodes_dict: Optional[dict] = None,
    edges_lst: Optional[list] = None,
    link_node: Optional[str] = None,
) -> Tuple[dict, list]:
    """
    Get dictionary of nodes with node names as keys and node objects as values. In addition, generate a list of edges,
    consisting of pairs of node names which are linked together.

    Args:
        obj (object): Object to generate dictionary of nodes and list of edges
        obj_name (str): Name of the object
        nodes_dict (dict): Dictionary of nodes, extended in place; a new dictionary is created when omitted
        edges_lst (list): List of edges, extended in place; a new list is created when omitted
        link_node (str): Name of the node to link to

    Returns:
        dict, list: dictionary of nodes and list of edges
    """
    # Create the containers per call - mutable default arguments would be shared between independent calls.
    if nodes_dict is None:
        nodes_dict = {}
    if edges_lst is None:
        edges_lst = []

    node_name = get_node_name(node=obj, node_name=obj_name)
    nodes_dict[node_name] = obj
    if link_node is not None:
        edges_lst.append([link_node, node_name])

    # Collect the (name, object) pairs to descend into.
    if isinstance(obj, DelayedObject):
        children = []
        for key, value in obj._input.items():
            if key == "kwargs":
                # the entries of "kwargs" are linked directly to this node
                children.extend(value.items())
            else:
                children.append((key, value))
    elif isinstance(obj, dict) and any(
        isinstance(value, DelayedObject) for value in obj.values()
    ):
        children = list(obj.items())
    elif isinstance(obj, list) and any(isinstance(value, DelayedObject) for value in obj):
        children = [(str(index), value) for index, value in enumerate(obj)]
    else:
        children = []

    for child_name, child in children:
        # the recursive call extends nodes_dict and edges_lst in place
        get_graph(
            obj=child,
            obj_name=child_name,
            nodes_dict=nodes_dict,
            edges_lst=edges_lst,
            link_node=node_name,
        )
    return nodes_dict, edges_lst
[docs]
def get_hash(binary: bytes) -> str:
    """
    Compute the MD5 hex digest of a binary string after removing the jupyter kernel specification, so the hash is
    deterministic.

    Args:
        binary (bytes): binary string to hash

    Returns:
        str: hash of the binary string
    """
    # matches the bytes between "/ipykernel_" and a following "/"
    kernel_pattern = re.compile(b"(?<=/ipykernel_)(.*)(?=/)")
    sanitized = kernel_pattern.sub(b"", binary)
    digest = hashlib.md5(sanitized)
    return str(digest.hexdigest())
[docs]
def get_node_name(node: object, node_name: Optional[str] = None) -> str:
    """
    Get name of the node

    Without a predefined name the name is derived from the node: for a DelayedObject the name of the wrapped
    function, otherwise the __name__ attribute of the node or the string representation of its type. The hash of
    the pickled node is appended to the name when it can be computed.

    Args:
        node (object): Node to get the name for
        node_name (str): Name of the node in case it is already defined

    Returns:
        str: name of the node
    """
    if isinstance(node, DelayedObject) and node_name is None:
        try:
            node_name = node._function.__name__
        except (AttributeError, TypeError):
            # A callable without __name__ raises AttributeError on the lookup above, so it has to be caught for
            # the fallback to be reachable.
            node_name = str(node).replace("<", "").replace(" object at ", "")
    if node_name is None:
        try:
            node_name = node.__name__
        except AttributeError:
            node_name = str(type(node))
    try:
        return node_name + "_" + get_hash(binary=cloudpickle.dumps(node))
    except TypeError:
        # best effort: fall back to the plain name when the line above raises a TypeError
        return node_name
[docs]
def recursive_dict_resolve(input_dict: dict) -> dict:
    """
    Build a copy of the dictionary in which every DelayedObject is replaced by the return value of its pull()
    method. Nested dictionaries and lists are resolved recursively, all other values are taken over unchanged.

    Args:
        input_dict (dict): dictionary to recursively resolve

    Returns:
        dict: resolved dictionary
    """

    def _resolve(value):
        if isinstance(value, DelayedObject):
            return value.pull()
        if isinstance(value, dict):
            return recursive_dict_resolve(input_dict=value)
        if isinstance(value, list):
            # resolve the list as a dictionary keyed by position, then drop the keys again
            indexed = dict(enumerate(value))
            return list(recursive_dict_resolve(input_dict=indexed).values())
        return value

    return {key: _resolve(value) for key, value in input_dict.items()}
[docs]
class Selector:
    """
    Attribute based selection of a single output key or output file of an object.

    Accessing an attribute returns a copy of the wrapped object with either _output_file (selector "files") or
    _output_key (selector "output") set to the attribute name.
    """

    def __init__(self, obj: object, selector: str):
        """
        Args:
            obj (object): object providing __copy__() and the lists _output_file_lst and _output_key_lst
            selector (str): "files" or "output"
        """
        self._obj = obj
        self._selector = selector

    def __getattr__(self, name: str):
        """
        Args:
            name (str): name of the output file or output key to select

        Returns:
            copy of the wrapped object restricted to the selected file or key

        Raises:
            AttributeError: if the name is not listed for the active selector
        """
        # __getattr__ is also called on instances which were created without __init__, for example while being
        # copied or unpickled. Reading self._obj there would call __getattr__ again and recurse without end, so
        # the state is taken from the instance dictionary instead.
        state = self.__dict__
        if "_obj" not in state or "_selector" not in state:
            raise AttributeError(
                f"'{type(self).__name__}' object has no attribute '{name}'"
            )
        obj, selector = state["_obj"], state["_selector"]
        if selector == "files" and name in obj._output_file_lst:
            obj_copy = obj.__copy__()
            obj_copy._output_file = name
            return obj_copy
        if selector == "output" and name in obj._output_key_lst:
            obj_copy = obj.__copy__()
            obj_copy._output_key = name
            return obj_copy
        # raises the regular AttributeError for unknown names
        return self.__getattribute__(name)
[docs]
class DelayedObject:
    """
    Container for a function call which is evaluated later.

    The function and its bound arguments are stored at construction; pull() evaluates the function and returns
    the result, optionally restricted to an output key, an output file or a list index.
    """

    def __init__(
        self,
        function: callable,
        *args,
        output_key: Optional[str] = None,
        output_file: Optional[str] = None,
        output_file_lst: Optional[list] = None,
        output_key_lst: Optional[list] = None,
        list_length: Optional[int] = None,
        list_index: Optional[int] = None,
        input_prefix_key: Optional[str] = None,
        execute_in_working_directory: bool = False,
        **kwargs,
    ):
        """
        Args:
            function (callable): function to evaluate in pull()
            *args: positional arguments bound to the signature of the function
            output_key (str): key of the result to return
            output_file (str): name of the output file to return
            output_file_lst (list): names selectable via the "files" attribute, empty list when omitted
            output_key_lst (list): names selectable via the "output" attribute, empty list when omitted
            list_length (int): number of elements produced when iterating over the object
            list_index (int): index of the element of the result to return
            input_prefix_key (str): key of the input dictionary which is returned by the input property
            execute_in_working_directory (bool): flag forwarded to the job object before it is executed
            **kwargs: keyword arguments bound to the signature of the function
        """
        self._input = {}
        self._function = function
        try:
            self._input.update(
                inspect.signature(self._function).bind(*args, **kwargs).arguments
            )
        except TypeError:
            # Deliberately ignored: the input stays empty when the arguments do not match the signature.
            # __copy__() builds the object without arguments and assigns the input afterwards.
            pass
        self.__name__ = "DelayedObject"
        self._result = None
        self._python_function = None
        self._job = None
        self._server = Server()
        self._output_key = output_key
        self._output_file = output_file
        # create new lists per instance - mutable default arguments would be shared between all instances
        self._output_key_lst = [] if output_key_lst is None else output_key_lst
        self._output_file_lst = [] if output_file_lst is None else output_file_lst
        self._list_length = list_length
        self._list_index = list_index
        self._input_prefix_key = input_prefix_key
        self._execute_in_working_directory = execute_in_working_directory

    @property
    def input(self):
        """Input dictionary, or only its entry input_prefix_key when that key is set and present."""
        if self._input_prefix_key is not None and self._input_prefix_key in self._input:
            return self._input[self._input_prefix_key]
        else:
            return self._input

    @property
    def server(self):
        """Server object created for this delayed object."""
        return self._server

    @property
    def execute_in_working_directory(self) -> bool:
        """Flag which is assigned to the job object in pull() before the job is executed."""
        return self._execute_in_working_directory

    @execute_in_working_directory.setter
    def execute_in_working_directory(self, val: bool) -> None:
        self._execute_in_working_directory = bool(val)

    def draw(self):
        """Draw the graph of this object and its inputs."""
        node_dict, edge_lst = self.get_graph()
        draw(node_dict=node_dict, edge_lst=edge_lst)

    def get_python_result(self):
        """
        Select the requested part of the stored result.

        Returns:
            object: for a dictionary the entry output_key, for a list the element list_index or otherwise the
                    element output_key, for other results the attribute output_key of result.output. Without a
                    selection the whole result is returned.
        """
        if isinstance(self._result, dict) and self._output_key is not None:
            return self._result[str(self._output_key)]
        elif isinstance(self._result, list):
            if self._list_index is not None:
                return self._result[self._list_index]
            elif self._output_key is not None:
                return self._result[int(self._output_key)]
            else:
                return self._result
        elif self._output_key is not None:
            return getattr(self._result.output, self._output_key)
        else:
            return self._result

    def get_file_result(self):
        """Return the attribute output_file of result.files."""
        return getattr(self._result.files, self._output_file)

    def pull(self):
        """
        Evaluate the function unless a result is already stored and return the selected part of the result.

        Returns:
            object: the selected part of the result, or a JobFuture when the function returned a job object
                    which is neither finished nor aborted after run()

        Raises:
            the error stored in the job output when the job status is aborted
        """
        if self._result is None:
            if "_return_job_object" in inspect.signature(self._function).parameters:
                # the function returns a job object, which is executed here
                self._input.update(
                    {"_server_obj": self.server, "_return_job_object": True}
                )
                self._job = evaluate_function(
                    funct=self._function, input_dict=self._input
                )
                self._job.execute_in_working_directory = (
                    self._execute_in_working_directory
                )
                self._job.run()
                if self._job.status.finished:
                    self._result = self._job.output["result"]
                elif self._job.status.aborted:
                    raise self._job.output.error
                else:
                    return JobFuture(job=self._job)
            else:
                self._input.update({"_server_obj": self.server})
                self._result = evaluate_function(
                    funct=self._function, input_dict=self._input
                )
        if self._output_key is not None:
            return self.get_python_result()
        elif self._output_file is not None:
            return self.get_file_result()
        elif isinstance(self._result, list) and self._list_index is not None:
            return self._result[self._list_index]
        elif isinstance(self._result, dict) and self._list_index is not None:
            return self._result[str(self._list_index)]
        else:
            return self._result

    def get_graph(self):
        """
        Returns:
            dict, list: dictionary of nodes and list of edges starting from this object
        """
        return get_graph(obj=self, nodes_dict={}, edges_lst=[], link_node=None)

    def __copy__(self):
        """
        Create a new DelayedObject for the same function.

        The input dictionary, the result and the output lists are shared with the original, the server settings
        are transferred via to_dict() / from_dict().
        """
        obj_copy = DelayedObject(
            function=self._function,
            output_key=self._output_key,
            output_file=self._output_file,
            output_file_lst=self._output_file_lst,
            output_key_lst=self._output_key_lst,
            list_length=self._list_length,
            list_index=self._list_index,
            input_prefix_key=self._input_prefix_key,
        )
        obj_copy._python_function = self._python_function
        obj_copy._input = self._input
        obj_copy._result = self._result
        obj_copy._execute_in_working_directory = self._execute_in_working_directory
        obj_copy._server.from_dict(self._server.to_dict())
        return obj_copy

    def __getattr__(self, name):
        """Provide the attributes "files" and "output" as Selector objects."""
        if name in ["files", "output"]:
            return Selector(obj=self, selector=name)
        else:
            # raises the regular AttributeError for unknown names
            return self.__getattribute__(name)

    def __iter__(self):
        """
        Iterate over list_length copies of this object, each with its own list_index.

        Raises:
            TypeError: if list_length is not set or list_index is already set
        """
        # The check is done here rather than inside the generator, so the error is raised by iter() itself and
        # not only when the first element is requested.
        if self._list_length is None or self._list_index is not None:
            raise TypeError(
                "'DelayedObject' object is not iterable, when self._list_length = None."
            )
        return self._iter_elements()

    def _iter_elements(self):
        """Yield one copy of this object per list index."""
        for i in range(self._list_length):
            obj = self.__copy__()
            obj._list_index = i
            yield obj