Source code for pyiron_base.project.archiving.export_archive

import os
import shutil
import tarfile
import tempfile
from typing import Optional

import pandas


[docs] def copy_files_to_archive( directory_to_transfer: str, archive_directory: str, compress: bool = True, copy_all_files: bool = False, arcname: Optional[str] = None, df: Optional[pandas.DataFrame] = None, ): """ Copy files from a directory to an archive, optionally compressing the archive. Args: directory_to_transfer (str): The directory containing the files to transfer. archive_directory (str): The destination directory for the archive. compress (bool): If True, compress the archive directory into a tarball. Default is True. copy_all_files (bool): If True, include all files in the archive, otherwise only .h5 files. Default is False. arcname (str): The name of the archive directory. Default is the name of the directory to transfer. df (DataFrame): DataFrame containing updated job information with new IDs and project paths. """ def copy_files( origin: str, destination: str, copy_all_files: bool = copy_all_files ): """ Copy files from the origin directory to the destination directory. Args: origin (str): The origin directory containing the files to copy. destination (str): The destination directory for the copied files. copy_all_files (bool): If True, include all files in the archive, otherwise only .h5 files. Default is False. """ if copy_all_files: shutil.copytree(origin, destination, dirs_exist_ok=True) else: copy_h5_files(origin, destination) assert isinstance(archive_directory, str) and ".tar.gz" not in archive_directory if arcname is None: arcname = os.path.relpath(os.path.abspath(archive_directory), os.getcwd()) if df is not None: df.to_csv(os.path.join(directory_to_transfer, "export.csv")) if not compress: copy_files(directory_to_transfer, os.path.join(archive_directory, arcname)) elif compress and copy_all_files: with tarfile.open(f"{archive_directory}.tar.gz", "w:gz") as tar: tar.add(directory_to_transfer, arcname=arcname) else: with tempfile.TemporaryDirectory() as temp_dir: # Copy files to the temporary directory dest = os.path.join( temp_dir, os.path.basename(directory_to_transfer.rstrip("/\\")) ) copy_files(directory_to_transfer, dest) # Compress the temporary directory into a tar.gz archive with tarfile.open(f"{archive_directory}.tar.gz", "w:gz") as tar: tar.add(dest, arcname=arcname) if df is not None: os.remove(os.path.join(directory_to_transfer, "export.csv"))
[docs] def copy_h5_files(src: str, dst: str) -> None: """ Copies all .h5 files from the source directory to the destination directory, preserving the directory structure. Args: src (str): The source directory from which .h5 files will be copied. dst (str): The destination directory where .h5 files will be copied to. This function traverses the source directory tree, identifies files with a .h5 extension, and copies them to the destination directory while maintaining the same directory structure. Non-.h5 files are ignored. """ for root, dirs, files in os.walk(src): for file in files: if file.endswith(".h5") or file == "export.csv": src_file = os.path.join(root, file) rel_path = os.path.relpath(root, src) dst_dir = os.path.join(dst, rel_path) os.makedirs(dst_dir, exist_ok=True) shutil.copy2(src_file, os.path.join(dst_dir, file))
[docs] def export_database(df: pandas.DataFrame) -> pandas.DataFrame: """ Export the project database to an archive directory. Args: df (DataFrame): pyiron job table containing job information. Returns: DataFrame: DataFrame containing updated job information with new IDs and project paths. """ job_translate_dict = { old_id: new_id for new_id, old_id in enumerate(sorted(df.id.values)) } df["id"] = df["id"].map(job_translate_dict) df["masterid"] = df["masterid"].map(job_translate_dict) df["parentid"] = df["parentid"].map(job_translate_dict) df["project"] = df["project"].map(lambda x: os.path.relpath(x, os.getcwd())) df.drop(columns=["projectpath"], inplace=True) return df