Source code for pyiron_base.jobs.job.wrapper

# coding: utf-8
# Copyright (c) Max-Planck-Institut für Eisenforschung GmbH - Computational Materials Design (CM) Department
# Distributed under the terms of "New BSD License", see the LICENSE file.
"""
The job wrapper is called from the run_job.py script, it restores the job from hdf5 and executes it.
"""

import logging
import os
from typing import Optional

from pyiron_base.database.filetable import (
    get_hamilton_from_file,
    get_hamilton_version_from_file,
    get_job_status_from_file,
)
from pyiron_base.project.generic import Project
from pyiron_base.state import state
from pyiron_base.state.signal import catch_signals

__author__ = "Joerg Neugebauer"
__copyright__ = (
    "Copyright 2020, Max-Planck-Institut für Eisenforschung GmbH - "
    "Computational Materials Design (CM) Department"
)
__version__ = "1.0"
__maintainer__ = "Jan Janssen"
__email__ = "janssen@mpie.de"
__status__ = "production"
__date__ = "Sep 1, 2017"


[docs] class JobWrapper(object): """ The job wrapper is called from the run_job.py script, it restores the job from hdf5 and executes it. Args: working_directory (str): working directory of the job job_id (int/ None): job ID hdf5_file (str): path to the HDF5 file of the job h5_path (str): path inside the HDF5 file to load the job submit_on_remote (bool): submit to queuing system on remote host debug (bool): enable debug mode [True/False] (optional) """
[docs] def __init__( self, working_directory: str, job_id: Optional[int] = None, hdf5_file: Optional["pyiron_base.storage.hdfio.ProjectHDFio"] = None, h5_path: Optional[str] = None, submit_on_remote: bool = False, debug: bool = False, connection_string: Optional[str] = None, collect: bool = False, ): self.working_directory = working_directory self._remote_flag = submit_on_remote self._collect = collect if connection_string is not None: state.database.open_local_sqlite_connection( connection_string=connection_string ) pr = Project(path=os.path.join(working_directory, "..", "..")) if job_id is not None: self.job = pr.load(int(job_id)) else: projectpath = state.database.top_path(hdf5_file) if projectpath is None: project = os.path.dirname(hdf5_file) else: project = os.path.relpath(os.path.dirname(hdf5_file), projectpath) job_name = h5_path[1:] self.job = pr.load_from_jobpath( job_id=None, db_entry={ "job": job_name, "subjob": h5_path, "projectpath": projectpath, "project": project + "/", "status": get_job_status_from_file( hdf5_file=hdf5_file, job_name=job_name ), "hamilton": get_hamilton_from_file( hdf5_file=hdf5_file, job_name=job_name ), "hamversion": get_hamilton_version_from_file( hdf5_file=hdf5_file, job_name=job_name ), }, convert_to_object=True, ) # setup logger self._logger = self.setup_logger(debug=debug)
[docs] @staticmethod def setup_logger(debug: bool = False) -> logging.Logger: """ Setup the error logger Args: debug (bool): the level of logging, enable debug mode [True/False] (optional) Returns: logger: logger object instance """ logger = logging.getLogger("pyiron_log") logger.setLevel(logging.INFO) if debug: logger.setLevel(logging.DEBUG) ch = logging.StreamHandler() ch.setLevel(logging.INFO) formatter = logging.Formatter( "%(asctime)s - %(name)s - %(levelname)s - %(message)s" ) ch.setFormatter(formatter) logger.addHandler(ch) return logger
[docs] def run(self) -> None: """ The job wrapper run command, sets the job status to 'running' and executes run_if_modal(). """ if self._remote_flag and self.job.server.queue is not None: self.job.run_if_scheduler() self.job.status.submitted = True elif self._collect: self.job.status.collect = True self.job.run() else: with catch_signals(self.job.signal_intercept): self.job.run_static()
[docs] def job_wrapper_function( working_directory: str, job_id: Optional[int] = None, file_path: Optional[str] = None, submit_on_remote: bool = False, debug: bool = False, collect: bool = False, ): """ Job Wrapper function - creates a JobWrapper object and calls run() on that object Args: working_directory (str): directory where the HDF5 file of the job is located job_id (int/ None): job id file_path (str): path to the HDF5 file debug (bool): enable debug mode submit_on_remote (bool): submit to queuing system on remote host collect (bool): collect output of calculation """ # always close the database connection in calculations on the cluster to avoid high number of concurrent # connections. state.database.close_connection() state.database.connection_timeout = 0 state.database.open_connection() if job_id is not None: job = JobWrapper( working_directory=working_directory, job_id=job_id, submit_on_remote=submit_on_remote, debug=debug, collect=collect, ) elif file_path is not None: hdf5_file = ( ".".join(file_path.split(".")[:-1]) + "." + file_path.split(".")[-1].split("/")[0] ) h5_path = "/".join(file_path.split(".")[-1].split("/")[1:]) job = JobWrapper( working_directory, job_id=None, hdf5_file=hdf5_file, h5_path="/" + h5_path, submit_on_remote=submit_on_remote, debug=debug, collect=collect, ) else: raise ValueError("Either job_id or file_path have to be not None.") job.run()