# Source code for pyiron_base.project.archiving.import_archive
import io
import os
import posixpath
import tarfile
import tempfile
from shutil import copytree
from typing import Tuple
import numpy as np
import pandas
from pyiron_base.state import state
from pyiron_base.utils.instance import static_isinstance
[docs]
def update_id_lst(record_lst: list, job_id_lst: list) -> list:
    """
    Translate archived master/parent references into entries of ``job_id_lst``.

    Every entry of ``record_lst`` is either missing (``None`` or NaN) or a
    number that is used as a position in ``job_id_lst``. Missing entries are
    mapped to ``None``; numeric entries are replaced by ``job_id_lst[int(entry)]``.

    Args:
        record_lst (list): Archived references, one per job. May be a plain
            list or a NumPy array (integer- or float-typed).
        job_id_lst (list): Job IDs, indexed by the archived reference.

    Returns:
        list: One entry per numeric or missing input entry, holding either
            ``None`` or the job ID looked up in ``job_id_lst``.

    Raises:
        IndexError: If a reference points outside ``job_id_lst``.
        TypeError: If an entry is of a type ``numpy.isnan`` cannot handle
            (for example a string).
    """
    updated_lst = []
    for record in record_lst:
        if record is None or np.isnan(record):
            updated_lst.append(None)
        # NumPy integer scalars (e.g. the elements of an int64 array) are not
        # instances of the builtin ``int``, so they have to be accepted
        # explicitly; otherwise they would be dropped and the result would no
        # longer line up with ``record_lst``.
        elif isinstance(record, (int, float, np.integer, np.floating)):
            updated_lst.append(job_id_lst[int(record)])
    return updated_lst
[docs]
def import_jobs(
    project_instance: "pyiron_base.project.generic.Project", archive_directory: str
):
    """
    Import jobs from an archive into a pyiron project.

    The archive files are copied into the project directory, the job table
    found in the archive is re-rooted onto the importing project, every job is
    registered in the project's database and finally the parent/master
    references are rewritten to the IDs returned by the database.

    Args:
        project_instance (pyiron_base.project.generic.Project): Project the
            jobs are imported into.
        archive_directory (str): Path to the archive directory or to a
            ``.tar.gz`` archive. A pyiron Project object is accepted as well,
            in which case its ``path`` is used.

    Raises:
        RuntimeError: If ``archive_directory`` is neither a string nor a
            pyiron Project object.
    """
    # Accept a Project object in place of a path string.
    if static_isinstance(
        obj=archive_directory.__class__,
        obj_type=[
            "pyiron_base.project.generic.Project",
        ],
    ):
        archive_directory = archive_directory.path
    elif not isinstance(archive_directory, str):
        raise RuntimeError(
            "The given path for importing from, does not have the correct"
            " format paths as string or pyiron Project objects are expected"
        )

    # Copy the archived files into the project directory.
    if archive_directory.endswith(".tar.gz"):
        with tempfile.TemporaryDirectory() as temp_dir:
            with tarfile.open(archive_directory, "r:gz") as tar:
                # Archives may originate from other users: where the
                # interpreter supports extraction filters, use the "data"
                # filter so members cannot escape temp_dir. Older
                # interpreters fall back to the unfiltered extraction.
                if hasattr(tarfile, "data_filter"):
                    tar.extractall(path=temp_dir, filter="data")
                else:
                    tar.extractall(path=temp_dir)
            # Must run while temp_dir still exists.
            df, common_path = transfer_files(
                origin_path=temp_dir, project_path=project_instance.path
            )
    else:
        df, common_path = transfer_files(
            origin_path=archive_directory, project_path=project_instance.path
        )

    pr_import = project_instance.open(os.curdir)
    # Re-root every job's project path: the part relative to the archive's
    # common path is appended to the importing project's project_path.
    df["project"] = [
        posixpath.normpath(
            posixpath.join(pr_import.project_path, posixpath.relpath(p, common_path))
        )
        + "/"
        for p in df["project"].values
    ]
    df["projectpath"] = len(df) * [pr_import.root_path]

    # Add jobs to database
    job_id_lst = []
    # dropna(axis=1) removes every column that contains a missing value.
    for entry in df.dropna(axis=1).to_dict(orient="records"):
        # The archived IDs are not passed on; parent and master references
        # are set in the second pass below.
        for tag in ["id", "parentid", "masterid"]:
            entry.pop(tag, None)
        for time_tag in ["timestart", "timestop"]:
            if time_tag in entry:
                entry[time_tag] = pandas.to_datetime(entry[time_tag])
        if "username" not in entry:
            entry["username"] = state.settings.login_user
        job_id = pr_import.db.add_item_dict(par_dict=entry, check_duplicates=True)
        job_id_lst.append(job_id)

    # Update parent and master ids
    for job_id, masterid, parentid in zip(
        job_id_lst,
        update_id_lst(record_lst=df["masterid"].values, job_id_lst=job_id_lst),
        update_id_lst(record_lst=df["parentid"].values, job_id_lst=job_id_lst),
    ):
        if masterid is not None or parentid is not None:
            pr_import.db.item_update(
                item_id=job_id, par_dict={"parentid": parentid, "masterid": masterid}
            )
[docs]
def transfer_files(origin_path: str, project_path: str) -> Tuple[pandas.DataFrame, str]:
    """
    Copy the archived project tree from the origin path into the project path.

    The job table is read from the origin directory, the longest common path
    of all entries in its ``project`` column is determined, and the directory
    at that common path (taken relative to ``origin_path``) is copied into
    ``project_path``. Existing target directories are reused.

    Args:
        origin_path (str): Path to the origin directory.
        project_path (str): Path to the project directory.

    Returns:
        Tuple[pandas.DataFrame, str]: The job table and the common path of its
            ``project`` entries.
    """
    job_table = get_dataframe(origin_path=origin_path)
    project_paths = list(job_table["project"])
    common_path = posixpath.commonpath(project_paths)
    source_directory = posixpath.join(origin_path, common_path)
    copytree(source_directory, project_path, dirs_exist_ok=True)
    return job_table, common_path
[docs]
def get_dataframe(
    origin_path: str, csv_file_name: str = "export.csv"
) -> pandas.DataFrame:
    """
    Load the job table from the csv file belonging to an archive.

    Args:
        origin_path (str): Path to the origin directory.
        csv_file_name (str): Name of the csv file.

    Returns:
        pandas.DataFrame: Job table, indexed by the first csv column.

    Raises:
        FileNotFoundError: If the csv file exists neither under its plain name
            nor anywhere below ``origin_path``.
    """
    if os.path.exists(csv_file_name):
        # Old archives keep the csv file outside of the archive directory;
        # checking the plain file name first guarantees backward compatibility.
        csv_path = csv_file_name
    else:
        # Otherwise take the first directory below origin_path containing it.
        csv_path = next(
            (
                os.path.join(folder, csv_file_name)
                for folder, _, file_names in os.walk(origin_path)
                if csv_file_name in file_names
            ),
            None,
        )
        if csv_path is None:
            raise FileNotFoundError(f"File: {csv_file_name} was not found.")
    return pandas.read_csv(csv_path, index_col=0)
[docs]
def inspect_csv(tar_path: str, csv_file: str = "export.csv") -> pandas.DataFrame:
    """
    Read the csv file stored inside a gzip-compressed tar archive.

    The archive members are scanned in order and the first regular file whose
    name is ``csv_file`` - either at the archive root or inside any
    directory - is parsed without extracting the archive to disk.

    Args:
        tar_path (str): Path to the tar archive.
        csv_file (str): Name of the csv file.

    Returns:
        pandas.DataFrame: Job table, indexed by the first csv column.

    Raises:
        FileNotFoundError: If no matching csv file is contained in the archive.
    """
    with tarfile.open(tar_path, mode="r:gz") as tar:
        for member in tar.getmembers():
            if not member.isfile():
                continue
            # Accept the csv file at the archive root (exact name) as well as
            # nested in a directory; requiring the "/" keeps files that merely
            # end with the same characters (e.g. "my_export.csv") out.
            if member.name != csv_file and not member.name.endswith(f"/{csv_file}"):
                continue
            extracted_file = tar.extractfile(member)
            if extracted_file:
                return pandas.read_csv(
                    io.StringIO(extracted_file.read().decode("utf-8")), index_col=0
                )
    raise FileNotFoundError(f"File: {csv_file} in {tar_path} was not found.")