# coding: utf-8
# Copyright (c) Max-Planck-Institut für Eisenforschung GmbH - Computational Materials Design (CM) Department
# Distributed under the terms of "New BSD License", see the LICENSE file.
import codecs
import concurrent.futures
import json
import os
import types
from datetime import datetime
from typing import Any, List, Optional, Tuple, Union
import cloudpickle
import numpy as np
import pandas
from pandas.errors import EmptyDataError
from pyiron_snippets.deprecate import deprecate
from tqdm.auto import tqdm
from pyiron_base.jobs.job.extension import jobstatus
from pyiron_base.jobs.job.generic import GenericJob
from pyiron_base.storage.hdfio import FileHDFio
# Module metadata: authorship, copyright, version and release status of this file.
__author__ = "Uday Gajera, Jan Janssen, Joerg Neugebauer"
__copyright__ = (
    "Copyright 2020, Max-Planck-Institut für Eisenforschung GmbH - "
    "Computational Materials Design (CM) Department"
)
__version__ = "0.0.1"
__maintainer__ = "Jan Janssen"
__email__ = "janssen@mpie.de"
__status__ = "development"
__date__ = "Sep 1, 2018"
def _to_pickle(hdf: FileHDFio, key: str, value: Any) -> None:
    """
    Serialize an object with cloudpickle and store it as base64 encoded text.

    Args:
        hdf (FileHDFio): Container the encoded string is assigned to via ``hdf[key] = ...``.
        key (str): Key under which the encoded object is stored.
        value (Any): Object to serialize.
    """
    pickled = cloudpickle.dumps(obj=value, protocol=5, buffer_callback=None)
    # base64 turns the binary pickle into plain text that can be stored as a string
    encoded = codecs.encode(pickled, "base64")
    hdf[key] = encoded.decode()
def _from_pickle(hdf: FileHDFio, key: str) -> Any:
    """
    Read a base64 encoded pickle from ``hdf[key]`` and restore the object.

    The payload is unpickled with cloudpickle; if that raises a
    ``ModuleNotFoundError``, dill is imported and used instead.

    Args:
        hdf (FileHDFio): Container holding the encoded string.
        key (str): Key of the encoded object.

    Returns:
        Any: The restored object.
    """
    # NOTE(security): unpickling executes arbitrary code, only load data from trusted files
    payload = codecs.decode(hdf[key].encode(), "base64")
    try:
        return cloudpickle.loads(payload)
    except ModuleNotFoundError:
        import dill

        return dill.loads(payload)
def get_job_id(job):
    """
    Built-in table function returning the ID of a job.

    Args:
        job: Object exposing a ``job_id`` attribute.

    Returns:
        dict: Dictionary with the single key ``"job_id"``.
    """
    return dict(job_id=job.job_id)
class FunctionContainer(object):
    """
    Class which is able to append, store and retrieve a set of functions.

    User functions are registered with ``container[key] = function_or_expression``.
    System functions are passed to the constructor and are switched on by accessing
    them as an attribute (``container.<function name>``); ``get_job_id`` is always
    switched on.

    Args:
        system_function_lst (list/ None): Built-in functions which can be activated
    """

    def __init__(self, system_function_lst: Optional[List[callable]] = None):
        if system_function_lst is None:
            system_function_lst = []
        # key -> function returning ``{key: value}``
        self._user_function_dict = {}
        self._system_function_lst = system_function_lst
        # function name -> whether the system function is active
        self._system_function_dict = {
            func.__name__: False for func in self._system_function_lst
        }
        self._system_function_dict["get_job_id"] = True

    @property
    def _function_lst(self):
        """
        list: all active system functions followed by all user functions.
        """
        active_system_functions = [
            funct
            for funct in self._system_function_lst
            if self._system_function_dict.get(funct.__name__, False)
        ]
        return active_system_functions + list(self._user_function_dict.values())

    def _to_hdf(self, hdf: "FileHDFio") -> None:
        """
        Store the user and system function dictionaries in an HDF file.

        Args:
            hdf (FileHDFio): The HDF file object (any object supporting item assignment works).
        """
        _to_pickle(hdf=hdf, key="user_function_dict", value=self._user_function_dict)
        _to_pickle(
            hdf=hdf, key="system_function_dict", value=self._system_function_dict
        )

    def _from_hdf(self, hdf: "FileHDFio") -> None:
        """
        Restore the user and system function dictionaries from an HDF file.

        Args:
            hdf (FileHDFio): The HDF file object (any object supporting item access works).
        """
        self._user_function_dict = _from_pickle(hdf=hdf, key="user_function_dict")
        self._system_function_dict = _from_pickle(hdf=hdf, key="system_function_dict")

    def __setitem__(self, key: str, item: Union[str, types.FunctionType]) -> None:
        """
        Register a user function under `key`.

        Args:
            key (str): Name of the resulting table column
            item (str/ function): Either a function taking the job, or a string with a
                python expression in which the job is available as ``job``.

        Raises:
            TypeError: if `item` is neither a string nor a python function.
        """
        if isinstance(item, str):
            # NOTE(security): the expression is executed with eval(); only pass trusted strings.
            self._user_function_dict[key] = eval(
                'lambda job: {"' + key + '":' + item + "}"
            )
        elif isinstance(item, types.FunctionType):
            self._user_function_dict[key] = lambda job: {key: item(job)}
        else:
            raise TypeError("unsupported function type!")

    def __getitem__(self, key: str) -> callable:
        return self._user_function_dict[key]

    def __getattr__(self, name: str) -> bool:
        """
        Activate the system function called `name`.

        Returns:
            bool: always `True`, the new activation state of the system function.

        Raises:
            AttributeError: if `name` is not a known system function.
        """
        # __getattr__ only runs after the regular lookup failed.  Read the registry from
        # __dict__ directly: on an instance created without __init__ (copy, unpickling)
        # `self._system_function_dict` would call __getattr__ again and recurse forever.
        system_functions = self.__dict__.get("_system_function_dict", {})
        if name in system_functions:
            system_functions[name] = True
            return system_functions[name]
        # object defines no __getattr__ to delegate to, so raise the error explicitly
        raise AttributeError(
            f"'{type(self).__name__}' object has no attribute '{name}'"
        )

    def __dir__(self) -> list:
        return list(self._system_function_dict.keys())
class JobFilters(object):
    """
    Collection of predefined job filters.

    Every static method is a factory returning a function which takes a job and
    returns whether the job passes the filter.
    """

    @staticmethod
    def job_type(job_type: str) -> callable:
        """
        Build a filter comparing ``job.__name__`` with `job_type`.
        """

        def filter_job_type(job):
            return job.__name__ == job_type

        return filter_job_type

    @staticmethod
    def job_name_contains(job_name_segment: str) -> callable:
        """
        Build a filter checking that `job_name_segment` is contained in ``job.job_name``.
        """

        def filter_job_name_segment(job):
            return job_name_segment in job.job_name

        return filter_job_name_segment
class PyironTable:
    """
    Class for easy, efficient, and pythonic analysis of data from pyiron projects

    Args:
        project (pyiron_base.project.generic.Project): The project to analyze
        name (str): Name of the pyiron table
        system_function_lst (list/ None): List of built-in functions
        csv_file_name (str/ None): File used by :meth:`._load_csv`, by default
            ``project.path + name + ".csv"``
    """

    def __init__(
        self,
        project: "pyiron_base.project.generic.Project",
        name: Optional[str] = None,
        system_function_lst: Optional[List[callable]] = None,
        csv_file_name: Optional[str] = None,
    ):
        self._project = project
        self._df = pandas.DataFrame({})
        self.convert_to_object = False
        self._name = name
        # by default no job is filtered out, neither on the database nor on the job level
        self._db_filter_function = always_true_pandas
        self._filter_function = always_true
        self._filter = JobFilters()
        self._system_function_lst = system_function_lst
        self.add = FunctionContainer(system_function_lst=self._system_function_lst)
        self._csv_file = csv_file_name

    @property
    def filter(self) -> "JobFilters":
        """
        Object containing pre-defined filter functions

        Returns:
            pyiron.table.datamining.JobFilters: The object containing the filters
        """
        return self._filter

    @property
    def name(self) -> str:
        """
        Name of the table. Takes the project name if not specified

        Returns:
            str: Name of the table
        """
        if self._name is None:
            return self._project.name
        return self._name

    @property
    def db_filter_function(self) -> Union[callable, None]:
        """
        Function to filter the project database table before job specific functions are applied.

        The function must take a pyiron project table in the pandas.DataFrame format (project.job_table()) and return a
        boolean pandas.Series with the same number of rows as the project table

        Example:

        >>> def job_filter_function(df):
        >>>    return (df["chemicalformula"] == "H2") & (df["hamilton"] == "Vasp")
        >>> table.db_filter_function = job_filter_function
        """
        return self._db_filter_function

    @db_filter_function.setter
    def db_filter_function(self, funct: callable) -> None:
        self._db_filter_function = funct

    @property
    def filter_function(self) -> Union[callable, None]:
        """
        Function to filter each job before more expensive functions are applied

        Example:

        >>> def job_filter_function(job):
        >>>    return (job.status == "finished") & ("murn" in job.job_name)
        >>> table.filter_function = job_filter_function
        """
        return self._filter_function

    @filter_function.setter
    def filter_function(self, funct: callable):
        self._filter_function = funct

    def _get_new_functions(self, file: "FileHDFio") -> Tuple[List, List]:
        """
        Compare the currently registered functions with the ones stored in an HDF5 file.

        If the stored state cannot be read (``IndexError``, ``KeyError``, ``ValueError`` or
        ``TypeError``), two empty lists are returned.

        Args:
            file (FileHDFio): The HDF5 file to extract data from.

        Returns:
            Tuple[List, List]: A tuple containing two lists:
                - new_user_functions (List): Keys of user functions which are not in the stored state.
                - new_system_functions (List): Names of system functions which are active now but were
                  not active in the stored state.
        """
        try:
            stored_user_dict, stored_system_dict = self._get_data_from_hdf5(hdf=file)
            new_user_functions = [
                key
                for key in self.add._user_function_dict.keys()
                if key not in stored_user_dict.keys()
            ]
            # a system function that is unknown to the stored state counts as not active there
            new_system_functions = [
                k
                for k, v in self.add._system_function_dict.items()
                if v and not stored_system_dict.get(k, False)
            ]
        except (IndexError, KeyError, ValueError, TypeError):
            new_user_functions = []
            new_system_functions = []
        return new_user_functions, new_system_functions

    def create_table(
        self,
        file: "FileHDFio",
        job_status_list: List[str],
        executor: Optional["concurrent.futures.Executor"] = None,
        enforce_update: bool = False,
    ):
        """
        Create or update the table.

        If this method has been called before and there are new functions added to :attr:`.add`, apply them on the
        previously analyzed jobs.

        If this method has been called before and there are new jobs added to :attr:`.analysis_project`, apply all
        functions to them.

        The result is available via :meth:`.get_dataframe`.

        .. warning::
            The executor, if given, must not naively pickle the mapped functions or
            arguments, as PyironTable relies on lambda functions internally.  Use
            with executors that rely on dill or cloudpickle instead.  Pyiron
            provides such executors in the `executorlib` sub packages.

        Args:
            file (FileHDFio): HDF where the previous state of the table is stored
            job_status_list (list of str): only consider jobs with these statuses
            executor (concurrent.futures.Executor): executor for parallel execution
            enforce_update (bool): if True always regenerate the table completely.
        """
        if enforce_update:
            # All jobs are re-analysed below; drop the previous rows, otherwise the
            # regenerated rows would be appended to the old ones and every job would
            # show up twice.
            self._df = pandas.DataFrame({})
        else:
            # if there's new keys, apply the *new* functions to the old jobs and add the result as new columns
            new_user_functions, new_system_functions = self._get_new_functions(file)
            if len(new_user_functions) > 0 or len(new_system_functions) > 0:
                function_lst = [
                    self.add._user_function_dict[k] for k in new_user_functions
                ] + [
                    funct
                    for funct in self.add._system_function_lst
                    if funct.__name__ in new_system_functions
                ]
                df_new_keys = self._iterate_over_job_lst(
                    job_id_lst=self._get_job_ids(),
                    function_lst=function_lst,
                    executor=executor,
                )
                if len(df_new_keys) > 0:
                    self._df = pandas.concat([self._df, df_new_keys], axis="columns")

        # if there's new jobs, apply *all* functions to them and append the result as new rows
        new_jobs = self._collect_job_update_lst(
            job_status_list=job_status_list,
            job_stored_ids=None if enforce_update else self._get_job_ids(),
        )
        if len(new_jobs) > 0:
            df_new_ids = self._iterate_over_job_lst(
                job_id_lst=new_jobs,
                function_lst=self.add._function_lst,
                executor=executor,
            )
            if len(df_new_ids) > 0:
                self._df = pandas.concat([self._df, df_new_ids], ignore_index=True)

    def get_dataframe(self) -> pandas.DataFrame:
        """
        Returns:
            pandas.DataFrame: the table built by :meth:`.create_table`
        """
        return self._df

    def _list_nodes(self) -> list:
        """
        Returns:
            list: the column names of the table
        """
        return list(self._df.columns)

    def __getitem__(self, item: str) -> Union[np.ndarray, None]:
        """
        Get one column of the table.

        Args:
            item (str): column name

        Returns:
            numpy.ndarray/ None: the column as array, `None` if there is no such column
        """
        if item in self._list_nodes():
            return np.array(self._df[item])
        return None

    def __str__(self) -> str:
        return self._df.__str__()

    def __repr__(self) -> str:
        """
        Human readable string representation

        Returns:
            str: pandas Dataframe structure as string
        """
        return self._df.__repr__()

    @property
    def _file_name_csv(self) -> str:
        """
        Get the file name of the CSV file.

        Returns:
            str: The explicitly configured file name, otherwise ``project.path + name + ".csv"``.
        """
        if self._csv_file is None:
            return self._project.path + self.name + ".csv"
        return self._csv_file

    def _load_csv(self) -> None:
        """
        Load the table from the CSV file :attr:`._file_name_csv`.
        """
        self._df = pandas.read_csv(self._file_name_csv)

    @staticmethod
    def _get_data_from_hdf5(hdf: "FileHDFio") -> Tuple[dict, dict]:
        """
        Load user-defined and system function dictionaries from an HDF file.

        Args:
            hdf (FileHDFio): The HDF file object.

        Returns:
            Tuple[dict, dict]: A tuple containing two dictionaries:
                - temp_user_function_dict (dict): The user-defined function dictionary.
                - temp_system_function_dict (dict): The system function dictionary.
        """
        temp_user_function_dict = _from_pickle(hdf=hdf, key="user_function_dict")
        temp_system_function_dict = _from_pickle(hdf=hdf, key="system_function_dict")
        return temp_user_function_dict, temp_system_function_dict

    def _get_job_ids(self) -> np.ndarray:
        """
        Get the job IDs from the dataframe.

        Returns:
            np.ndarray: The ``job_id`` column, an empty array if the table has no rows.
        """
        if len(self._df) > 0:
            return self._df.job_id.values
        return np.array([])

    def _get_filtered_job_ids_from_project(self, recursive: bool = True) -> List[int]:
        """
        Get the IDs of all jobs of the project which pass :attr:`.db_filter_function`.

        Args:
            recursive (bool): Flag to indicate whether to include jobs from subprojects (default is True).

        Returns:
            List[int]: A list of filtered job IDs.
        """
        project_table = self._project.job_table(recursive=recursive)
        filter_funct = self.db_filter_function
        return project_table[filter_funct(project_table)]["id"].tolist()

    def _iterate_over_job_lst(
        self,
        job_id_lst: List,
        function_lst: List,
        executor: Optional["concurrent.futures.Executor"] = None,
    ) -> pandas.DataFrame:
        """
        Apply functions to job.

        Keys which a function did not provide for a job are set to `None` for that job.

        Args:
            job_id_lst (list of int): all job ids to analyze
            function_lst (list of functions): all functions to apply on jobs.  Must return a dictionary.
            executor (concurrent.futures.Executor): executor for parallel execution

        Returns:
            pandas.DataFrame: one row per job, built from the merged dicts of all functions
        """
        job_to_analyse_lst = [
            [
                self._project.db.get_item_by_id(job_id),
                function_lst,
                self.convert_to_object,
            ]
            for job_id in job_id_lst
        ]
        # executors offer the same map() interface as the builtin
        mapper = map if executor is None else executor.map
        diff_dict_lst = list(
            tqdm(
                mapper(_apply_list_of_functions_on_job, job_to_analyse_lst),
                total=len(job_to_analyse_lst),
            )
        )
        self.refill_dict(diff_dict_lst)
        return pandas.DataFrame(diff_dict_lst)

    @staticmethod
    def total_lst_of_keys(diff_dict_lst: List[dict]) -> set:
        """
        Get the set of all keys occurring in a list of dictionaries.
        """
        return {key for sub_dict in diff_dict_lst for key in sub_dict.keys()}

    def refill_dict(self, diff_dict_lst: List) -> None:
        """
        Ensure that all dictionaries in the list have the same keys.

        Keys that are not in a dict are set to `None`.  The dictionaries are modified in place.
        """
        total_key_lst = self.total_lst_of_keys(diff_dict_lst)
        for sub_dict in diff_dict_lst:
            for key in total_key_lst:
                sub_dict.setdefault(key, None)

    def _collect_job_update_lst(
        self, job_status_list: List, job_stored_ids: Optional[List] = None
    ) -> List:
        """
        Collect jobs to update the pyiron table.

        Jobs in `job_stored_ids` are ignored.

        Args:
            job_status_list (list): List of job status to consider
            job_stored_ids (list/ None): List of already analysed job ids

        Returns:
            list: List of job ids
        """
        job_id_lst = self._get_filtered_job_ids_from_project()
        if job_stored_ids is not None:
            # set lookup instead of scanning the stored ids once per project job
            stored_ids = set(job_stored_ids)
            job_id_lst = [job_id for job_id in job_id_lst if job_id not in stored_ids]

        job_update_lst = []
        for job_id in tqdm(job_id_lst, desc="Loading and filtering jobs"):
            try:
                job = self._project.inspect(job_id)
            except IndexError:
                # In case the job was deleted while the pyiron table is running
                job = None
            if (
                job is not None
                and job.status in job_status_list
                and self.filter_function(job)
            ):
                job_update_lst.append(job_id)
        return job_update_lst

    def _repr_html_(self) -> str:
        """
        Internal helper function to represent the table within the Jupyter Framework

        Returns:
            str: HTML representation of the underlying pandas DataFrame
        """
        return self._df._repr_html_()
class TableJob(GenericJob):
    """
    Since a project can have a large number of jobs, it is often necessary
    to “filter” the data to extract useful information. PyironTable is a tool
    that allows the user to do this efficiently.

    Example:

    >>> # Prepare random data
    >>> for T in T_range:
    >>>     lmp = pr.create.job.Lammps(('lmp', T))
    >>>     lmp.structure = pr.create.structure.bulk('Ni', cubic=True).repeat(5)
    >>>     lmp.calc_md(temperature=T)
    >>>     lmp.run()

    >>> def db_filter_function(job_table):
    >>>     return (job_table.status == "finished") & (job_table.hamilton == "Lammps")

    >>> def get_energy(job):
    >>>     return job["output/generic/energy_pot"][-1]

    >>> def get_temperature(job):
    >>>     return job['output/generic/temperature'][-1]

    >>> table.db_filter_function = db_filter_function
    >>> table.add["energy"] = get_energy
    >>> table.add["temperature"] = get_temperature
    >>> table.run()
    >>> table.get_dataframe()

    This returns a dataframe containing job-id, energy and temperature.

    Alternatively, the filter function can be applied on the job

    >>> def job_filter_function(job):
    >>>     return (job.status == "finished") & ("lmp" in job.job_name)

    >>> table.filter_function = job_filter_function
    """

    # built-in functions every table offers via :attr:`.add`
    _system_function_lst = [get_job_id]

    def __init__(self, project, job_name):
        super(TableJob, self).__init__(project, job_name)
        self.__version__ = "0.1"
        self.__hdf_version__ = "0.3.0"
        self._analysis_project = None
        self._pyiron_table = PyironTable(
            project=None,
            system_function_lst=self._system_function_lst,
            csv_file_name=os.path.join(self.working_directory, "pyirontable.csv"),
        )
        self._enforce_update = False
        self._job_status = ["finished"]
        self._job_with_calculate_function = True
        # the setter replaces the table created above with one bound to the project
        self.analysis_project = project.project

    @property
    def filter(self) -> JobFilters:
        """
        :class:`.JobFilters`: pre-defined filter functions of the underlying table
        """
        return self._pyiron_table.filter

    @property
    def db_filter_function(self) -> Union[callable, None]:
        """
        function: database level filter function

        The function should accept a dataframe, the job table of :attr:`~.analysis_project` and return a bool index into
        it.  Jobs where the index is `False` are excluded from the analysis.
        """
        return self._pyiron_table.db_filter_function

    @db_filter_function.setter
    def db_filter_function(self, funct: callable) -> None:
        self._pyiron_table.db_filter_function = funct

    @property
    def filter_function(self) -> Union[callable, None]:
        """
        function: job level filter function

        The function should accept a GenericJob or JobCore object and return a bool, if it returns `False` the job is
        excluded from the analysis.
        """
        return self._pyiron_table.filter_function

    @filter_function.setter
    def filter_function(self, funct: callable) -> None:
        self._pyiron_table.filter_function = funct

    @property
    def job_status(self) -> List[str]:
        """
        list of str: only jobs with status in this list are included in the table.
        """
        return self._job_status

    @job_status.setter
    def job_status(self, status: Union[str, List[str]]) -> None:
        if isinstance(status, str):
            status = [status]
        else:
            # materialize first: validating a one-shot iterable would leave nothing to store
            status = list(status)
        valid = jobstatus.job_status_lst
        for s in status:
            if s not in valid:
                raise ValueError(
                    f"'{s}' not a valid job status! Must be one of {valid}."
                )
        self._job_status = status

    @property
    def pyiron_table(self) -> PyironTable:
        """
        :class:`.PyironTable`: the table object doing the actual analysis
        """
        return self._pyiron_table

    @property
    @deprecate("Use analysis_project instead!")
    def ref_project(self) -> "pyiron_base.project.generic.Project":
        return self.analysis_project

    @ref_project.setter
    def ref_project(self, project: "pyiron_base.project.generic.Project") -> None:
        self.analysis_project = project

    @property
    def analysis_project(self) -> "pyiron_base.project.generic.Project":
        """
        :class:`.Project`: which pyiron project should be searched for jobs

        WARNING: setting this resets any previously added analysis and filter functions
        """
        return self._analysis_project

    @analysis_project.setter
    def analysis_project(self, project: "pyiron_base.project.generic.Project") -> None:
        self._analysis_project = project
        # a fresh table is created, functions and filters set on the old one are lost
        self._pyiron_table = PyironTable(
            project=self._analysis_project,
            system_function_lst=self._system_function_lst,
            csv_file_name=os.path.join(self.working_directory, "pyirontable.csv"),
        )

    @property
    def add(self) -> FunctionContainer:
        """
        Add a function to analyse job data

        Example:

        >>> def get_energy(job):
        >>>    return job["output/generic/energy_pot"][-1]
        >>> table.add["energy"] = get_energy
        """
        return self._pyiron_table.add

    @property
    def convert_to_object(self) -> bool:
        """
        bool: if `True` fully load jobs before passing them to functions, if `False` use inspect mode.
        """
        return self._pyiron_table.convert_to_object

    @convert_to_object.setter
    def convert_to_object(self, conv_to_obj: bool) -> None:
        self._pyiron_table.convert_to_object = conv_to_obj

    @property
    def enforce_update(self) -> bool:
        """
        bool: if `True` re-evaluate all function on all jobs when :meth:`.update_table` is called.
        """
        return self._enforce_update

    @enforce_update.setter
    def enforce_update(self, enforce: bool) -> None:
        """
        Set the enforce_update property.

        Enabling it on a finished job sets the job status back to created.

        Args:
            enforce (bool): If True, re-evaluate all functions on all jobs when update_table is called.

        Raises:
            TypeError: if `enforce` is not a boolean.
        """
        if not isinstance(enforce, bool):
            raise TypeError("enforce must be a boolean")
        self._enforce_update = enforce
        if enforce and self.status.finished:
            self.status.created = True

    def _save_output(self) -> None:
        """
        Save the pyiron table dataframe to the HDF5 output file.

        The dataframe is written to the ``table`` key below the job's ``output`` group.
        """
        with self.project_hdf5.open("output") as hdf5_output:
            self.pyiron_table._df.to_hdf(
                hdf5_output.file_name, key=hdf5_output.h5_path + "/table"
            )

    def _to_dict(self) -> dict:
        """
        Convert the TableJob object to a dictionary.

        Returns:
            dict: The TableJob object represented as a dictionary.
        """
        job_dict = super()._to_dict()
        job_dict["input/bool_dict"] = {
            "enforce_update": self._enforce_update,
            "convert_to_object": self._pyiron_table.convert_to_object,
        }
        if self._analysis_project is not None:
            job_dict["input/project"] = {
                "path": self._analysis_project.path,
                "user": self._analysis_project.user,
                "sql_query": self._analysis_project.sql_query,
                "filter": self._analysis_project._filter,
                "inspect_mode": self._analysis_project._inspect_mode,
            }
        # the function container only needs item assignment, so a plain dict collects its entries
        add_dict = {}
        self._pyiron_table.add._to_hdf(add_dict)
        for k, v in add_dict.items():
            job_dict["input/" + k] = v
        if self.pyiron_table._filter_function is not None:
            _to_pickle(job_dict, "input/filter", self.pyiron_table._filter_function)
        if self.pyiron_table._db_filter_function is not None:
            _to_pickle(
                job_dict, "input/db_filter", self.pyiron_table._db_filter_function
            )
        return job_dict

    def _from_dict(self, obj_dict: dict, version: str = None):
        """
        Restore the TableJob object from a dictionary.

        If the stored analysis project path no longer exists, a warning is logged and
        the analysis project is left unchanged.

        Args:
            obj_dict (dict): The TableJob object represented as a dictionary.
            version (str): The version of the object.
        """
        super()._from_dict(obj_dict=obj_dict, version=version)
        input_dict = obj_dict["input"]
        # the project has to be restored first: setting analysis_project creates a new
        # table and would discard filters and functions restored before it
        if "project" in input_dict.keys():
            project_dict = input_dict["project"]
            if os.path.exists(project_dict["path"]):
                project = self.project.__class__(
                    path=project_dict["path"],
                    user=project_dict["user"],
                    sql_query=project_dict["sql_query"],
                )
                project._filter = project_dict["filter"]
                project._inspect_mode = project_dict["inspect_mode"]
                self.analysis_project = project
            else:
                self._logger.warning(
                    f"Could not instantiate analysis_project, no such path {project_dict['path']}."
                )
        if "filter" in input_dict.keys():
            self.pyiron_table.filter_function = _from_pickle(input_dict, "filter")
        if "db_filter" in input_dict.keys():
            self.pyiron_table.db_filter_function = _from_pickle(
                input_dict, "db_filter"
            )
        bool_dict = input_dict["bool_dict"]
        self._enforce_update = bool_dict["enforce_update"]
        self._pyiron_table.convert_to_object = bool_dict["convert_to_object"]
        self._pyiron_table.add._from_hdf(input_dict)

    def to_hdf(
        self,
        hdf: Optional["pyiron_base.storage.hdfio.ProjectHDFio"] = None,
        group_name: Optional[str] = None,
    ) -> None:
        """
        Store pyiron table job in HDF5

        The table itself is only written if it is not empty.

        Args:
            hdf (Optional[ProjectHDFio]): The HDF5 file object.
            group_name (Optional[str]): The name of the group in the HDF5 file.
        """
        super(TableJob, self).to_hdf(hdf=hdf, group_name=group_name)
        if len(self.pyiron_table._df) != 0:
            self._save_output()

    def from_hdf(
        self,
        hdf: Optional["pyiron_base.storage.hdfio.ProjectHDFio"] = None,
        group_name: Optional[str] = None,
    ) -> None:
        """
        Restore pyiron table job from HDF5

        Args:
            hdf (Optional[ProjectHDFio]): The HDF5 file object.
            group_name (Optional[str]): The name of the group in the HDF5 file.
        """
        super(TableJob, self).from_hdf(hdf=hdf, group_name=group_name)
        hdf_version = self.project_hdf5.get("HDF_VERSION", "0.1.0")
        if hdf_version == "0.3.0":
            # current layout: the dataframe is stored as group "table" in the output
            with self.project_hdf5.open("output") as hdf5_output:
                if "table" in hdf5_output.list_groups():
                    self._pyiron_table._df = pandas.read_hdf(
                        hdf5_output.file_name, hdf5_output.h5_path + "/table"
                    )
        else:
            # other versions: CSV file in the working directory, else JSON node "table"
            pyiron_table = os.path.join(self.working_directory, "pyirontable.csv")
            if os.path.exists(pyiron_table):
                try:
                    self._pyiron_table._df = pandas.read_csv(pyiron_table)
                    self._pyiron_table._csv_file = pyiron_table
                except EmptyDataError:
                    # an empty CSV file leaves the table unchanged
                    pass
            else:
                with self.project_hdf5.open("output") as hdf5_output:
                    if "table" in hdf5_output.list_nodes():
                        self._pyiron_table._df = pandas.DataFrame(
                            json.loads(hdf5_output["table"])
                        )

    def validate_ready_to_run(self) -> None:
        """
        Validate if the job is ready to run.

        Raises:
            ValueError: If the analysis project is not defined.
        """
        if self._analysis_project is None:
            raise ValueError("Analysis project not defined!")

    def run_static(self) -> None:
        """
        Run the static analysis job.

        This method creates the working directory, updates the table, and sets the job status to finished.
        """
        self._create_working_directory()
        self.status.running = True
        self.update_table()
        self.status.finished = True

    @deprecate(job_status_list="Use TableJob.job_status instead!")
    def update_table(self, job_status_list: Optional[List[str]] = None) -> None:
        """
        Update the pyiron table object, add new columns if a new function was added or add new rows for new jobs.

        By default this function does not recompute already evaluated functions on already existing jobs.  To force a
        complete re-evaluation set :attr:`~.enforce_update` to `True`.

        Args:
            job_status_list (list/None): List of job status which are added to the table by default ["finished"].
                                         Deprecated, use :attr:`.job_status` instead!
        """
        if job_status_list is None:
            job_status_list = self.job_status
        if self.job_id is not None:
            self.project.db.item_update({"timestart": datetime.now()}, self.job_id)
        with self.project_hdf5.open("input") as hdf5_input:
            # more than one core without an explicit executor: fall back to the default executor
            if self._executor_type is None and self.server.cores > 1:
                self._executor_type = "executorlib.SingleNodeExecutor"
            create_kwargs = {
                "file": hdf5_input,
                "job_status_list": job_status_list,
                "enforce_update": self._enforce_update,
            }
            if self._executor_type is not None:
                with self._get_executor(max_workers=self.server.cores) as exe:
                    self._pyiron_table.create_table(executor=exe, **create_kwargs)
            else:
                self._pyiron_table.create_table(executor=None, **create_kwargs)
        self.to_hdf()
        self._pyiron_table._df.to_csv(
            os.path.join(self.working_directory, "pyirontable.csv"), index=False
        )
        # to_hdf() skips empty tables, the explicit call stores the table in any case
        self._save_output()
        self.run_time_to_db()

    def get_dataframe(self) -> pandas.DataFrame:
        """
        Returns aggregated results over all jobs.

        Returns:
            pandas.Dataframe
        """
        return self.pyiron_table._df
def always_true_pandas(job_table) -> "pandas.Series":
    """
    Default database filter which selects every row of the job table.

    Args:
        job_table (pandas.DataFrame): Input dataframe

    Returns:
        pandas.Series: One `True` per row, sharing the index of `job_table`
    """
    accept_all = [True] * len(job_table)
    return pandas.Series(accept_all, index=job_table.index)
def always_true(_):
    """
    Default job filter which accepts every job; the argument is ignored.

    Returns:
        bool: True
    """
    return True
def _apply_list_of_functions_on_job(input_parameters: Tuple) -> dict:
    """
    Apply a list of functions to one job and merge their results.

    The function takes a single argument so it can be used with ``map``.  A function
    which raises an exception is skipped: the exception is logged as a warning and
    the results of the other functions are kept.

    Args:
        input_parameters (tuple): ``(db_entry, function_lst, convert_to_object)`` -
            the database entry of the job, the functions to apply (each returning a
            dictionary) and whether the job is converted with ``to_object()`` first.

    Returns:
        dict: the merged dictionaries returned by the functions
    """
    # imported here rather than at module level, as in the original implementation
    from pyiron_snippets.logger import logger

    from pyiron_base.jobs.job.path import JobPath

    db_entry, function_lst, convert_to_object = input_parameters
    job = JobPath.from_db_entry(db_entry)
    if convert_to_object:
        job = job.to_object()
        job.set_input_to_read_only()
    diff_dict = {}
    for funct in function_lst:
        try:
            diff_dict.update(funct(job))
        except Exception as e:
            # Logger.warn() is a deprecated alias which newer Python versions removed;
            # calling it here would replace the log message with an AttributeError.
            logger.warning(f"Caught exception '{e}' when called on job {job.id}!")
    return diff_dict