# coding: utf-8
# Copyright (c) Max-Planck-Institut für Eisenforschung GmbH - Computational Materials Design (CM) Department
# Distributed under the terms of "New BSD License", see the LICENSE file.
import multiprocessing
import os
import posixpath
import shutil
import subprocess
from concurrent.futures import ProcessPoolExecutor
from datetime import datetime
from typing import List, Optional, Tuple
from pyiron_snippets.deprecate import deprecate
from pyiron_base.jobs.job.wrapper import JobWrapper
from pyiron_base.state import state
from pyiron_base.state.signal import catch_signals
from pyiron_base.utils.instance import static_isinstance
try:
import flux.job
flux_available = True
except ImportError:
flux_available = False
"""
The function job.run() inside pyiron is executed differently depending on the status of the job object. This module
introduces the most general run functions and how they are selected.
If an additional parameter is provided, then a specific run function is executed:
repair: run_job_with_parameter_repair
If no explicit parameter is provided the first implicit parameter is the job.status:
initialized: run_job_with_status_initialized
created: run_job_with_status_created
submitted: run_job_with_status_submitted
running: run_job_with_status_running
refresh: run_job_with_status_refresh
busy: run_job_with_status_busy
collect: run_job_with_status_collect
suspended: run_job_with_status_suspended
finished: run_job_with_status_finished
Afterwards inside the run_job_with_status_created() function the job is executed differently depending on the run mode
of the server object attached to the job object: job.server.run_mode
manual: run_job_with_runmode_manually
modal: run_job_with_runmode_modal
non_modal: run_job_with_runmode_non_modal
interactive: run_job_with_runmode_interactive
interactive_non_modal: run_job_with_runmode_interactive_non_modal
queue: run_job_with_runmode_queue
srun: run_job_with_runmode_srun
executor: run_job_with_runmode_executor
thread: only affects children of a GenericMaster
worker: only affects children of a GenericMaster
Finally, for jobs which call an external executable, the execution is implemented in a function as well:
execute_job_with_external_executable
"""
[docs]
class CalculateFunctionCaller:
    """
    Callable which bundles the three steps of a generic calculation: writing the input files, executing the
    executable and parsing the output.

    Args:
        write_input_funct (callable): Function called with the keyword arguments input_dict and working_directory
                                      to write the input files. Set to None to skip this step.
        collect_output_funct (callable): Function called with the keyword argument working_directory plus the
                                         entries of output_parameter_dict to parse the output. Set to None to
                                         skip this step.
    """

    __slots__ = ("write_input_funct", "collect_output_funct")

    def __init__(
        self,
        write_input_funct: callable = write_input_files_from_input_dict,
        collect_output_funct: callable = None,
    ):
        self.write_input_funct = write_input_funct
        self.collect_output_funct = collect_output_funct

    def __call__(
        self,
        working_directory: str,
        input_parameter_dict: dict,
        executable_script: str,
        shell_parameter: bool,
        cores: int = 1,
        threads: int = 1,
        gpus: int = 1,
        conda_environment_name: Optional[str] = None,
        conda_environment_path: Optional[str] = None,
        accept_crash: bool = False,
        accepted_return_codes: Optional[List[int]] = None,
        output_parameter_dict: Optional[dict] = None,
    ) -> Tuple[str, dict, bool]:
        """
        Generic calculate function, which writes the input files into the working_directory, executes the
        executable_script and parses the output using the output_parameter_dict.

        Args:
            working_directory (str): Directory the calculation is executed in. It is created if it does not exist.
            input_parameter_dict (dict): Dictionary with parameters for the write_input function. By default this is a
                                         hierarchical dictionary with two keys files_to_write and files_to_copy on the
                                         first level. The files_to_write dictionary contains the file names and their
                                         content as strings, while the files_to_copy dictionary contains the file names
                                         and the links to the files which should be copied.
            executable_script (str): Executable to be executed in the working directory.
            shell_parameter (bool): The shell parameter from the subprocess.Popen() function of the python standard
                                    library.
            cores (int): Number of cores forwarded to the command execution.
            threads (int): Number of threads forwarded to the command execution.
            gpus (int): Number of GPUs forwarded to the command execution.
            conda_environment_name (str): Name of a conda environment to execute the executable in.
            conda_environment_path (str): Path of a conda environment to execute the executable in.
            accept_crash (bool): Boolean flag to accept crashes.
            accepted_return_codes (list): List of accepted return codes. None is treated as an empty list.
            output_parameter_dict (dict): Additional parameters for the collect_output function. None is treated as
                                          an empty dictionary.

        Returns:
            str, dict, bool: Tuple consisting of the shell output (str), the parsed output (dict, or None when the
                             output was not parsed) and the crash flag reported by the command execution.
        """
        # None replaces the former mutable default arguments, which were shared between all calls.
        if accepted_return_codes is None:
            accepted_return_codes = []
        if output_parameter_dict is None:
            output_parameter_dict = {}
        os.makedirs(working_directory, exist_ok=True)
        if self.write_input_funct is not None:
            self.write_input_funct(
                input_dict=input_parameter_dict,
                working_directory=working_directory,
            )
        job_crashed, shell_output = execute_command_with_error_handling(
            executable=executable_script,
            shell=shell_parameter,
            working_directory=working_directory,
            cores=cores,
            threads=threads,
            gpus=gpus,
            conda_environment_name=conda_environment_name,
            conda_environment_path=conda_environment_path,
            accepted_return_codes=accepted_return_codes,
            accept_crash=accept_crash,
        )
        parsed_output = None
        # The output is only parsed when the run did not crash and a parser is available.
        if not job_crashed and self.collect_output_funct is not None:
            parsed_output = self.collect_output_funct(
                working_directory=working_directory,
                **output_parameter_dict,
            )
        return shell_output, parsed_output, job_crashed
# Parameter
[docs]
def run_job_with_parameter_repair(
    job: "pyiron_base.jobs.job.generic.GenericJob",
) -> None:
    """
    Run function selected when job.run() is called with the 'repair' parameter. The job is executed again through
    its _run_if_created() method.

    Args:
        job (GenericJob): pyiron job object
    """
    job._run_if_created()
# Job Status
[docs]
def run_job_with_status_initialized(
    job: "pyiron_base.jobs.job.generic.GenericJob", debug: bool = False
):
    """
    Run function selected for jobs with the status 'initialized'. The job is validated, saved and afterwards
    job.run() is called again, unless the job already exists.

    Args:
        job (GenericJob): pyiron job object
        debug (bool): Debug Mode - currently not used inside this function
    """
    job.validate_ready_to_run()
    if job.server.run_mode.queue:
        job.check_setup()
    if job.check_if_job_exists():
        print("job exists already and therefore was not created!")
        return
    job.save()
    # The PythonFunctionContainerJob can generate the job_name during job.save(). In particular, this can lead to
    # the job being reloaded from the database resulting in the job status to change from initialized to finished.
    # Skipping the job.run() prevents raising a warning by calling job.run() on an already finished job.
    already_completed = (
        job.status.finished or job.status.aborted or job.status.not_converged
    )
    if not already_completed:
        job.run()
[docs]
def run_job_with_status_created(job: "pyiron_base.jobs.job.generic.GenericJob") -> None:
    """
    Internal helper function the run if created function is called when the job status is 'created'. It sets the
    job status to 'submitted' and dispatches the execution depending on job.server.run_mode. If none of the run
    modes checked here is active, nothing is executed.

    Args:
        job (GenericJob): pyiron job object

    Raises:
        ValueError: In executor mode, if the number of GPUs per slot derived from job.server.gpus and
                    job.server.cores is negative.
    """
    job.status.submitted = True
    run_mode = job.server.run_mode

    # Different run modes
    # Worker jobs are handled like manual jobs. The former additional worker check in the non_modal branch could
    # never be reached, as this branch is evaluated first.
    if run_mode.manual or run_mode.worker:
        run_job_with_runmode_manually(job=job, _manually_print=True)
    elif run_mode.modal:
        job.run_static()
    elif run_mode.srun:
        run_job_with_runmode_srun(job=job)
    elif run_mode.executor:
        if job.server.gpus is not None:
            gpus_per_slot = int(job.server.gpus / job.server.cores)
            if gpus_per_slot < 0:
                raise ValueError(
                    "Both job.server.gpus and job.server.cores have to be greater than zero."
                )
        else:
            gpus_per_slot = None
        run_job_with_runmode_executor(
            job=job,
            executor=job.server.executor,
            gpus_per_slot=gpus_per_slot,
        )
    elif run_mode.non_modal or run_mode.thread:
        run_job_with_runmode_non_modal(job=job)
    elif run_mode.queue:
        job.run_if_scheduler()
    elif run_mode.interactive:
        job.run_if_interactive()
    elif run_mode.interactive_non_modal:
        job.run_if_interactive_non_modal()
[docs]
def run_job_with_status_submitted(
    job: "pyiron_base.jobs.job.generic.GenericJob",
) -> None:  # Submitted jobs are handled by the job wrapper!
    """
    Run function selected for jobs with the status 'submitted'. A queue job which is no longer waiting or running
    in the queue is either executed again (local queue adapter) or transferred back from the remote host (remote
    queue adapter). For all other jobs a short notice is printed.

    Args:
        job (GenericJob): pyiron job object
    """
    still_waiting = (
        not job.server.run_mode.queue
        or job.project.queue_check_job_is_waiting_or_running(job)
    )
    if still_waiting:
        print("Job " + str(job.job_id) + " is waiting in the que!")
    elif state.queue_adapter.remote_flag:
        job.transfer_from_remote()
    else:
        job.run(delete_existing_job=True)
[docs]
def run_job_with_status_running(job: "pyiron_base.jobs.job.generic.GenericJob") -> None:
    """
    Run function selected for jobs with the status 'running'. Queue jobs which are no longer waiting or running in
    the queue are executed again, interactive jobs are forwarded to their interactive run functions and for all
    other jobs a short notice is printed.

    Args:
        job (GenericJob): pyiron job object
    """
    lost_in_queue = (
        job.server.run_mode.queue
        and not job.project.queue_check_job_is_waiting_or_running(job)
    )
    if lost_in_queue:
        job.run(delete_existing_job=True)
        return
    if job.server.run_mode.interactive:
        job.run_if_interactive()
        return
    if job.server.run_mode.interactive_non_modal:
        job.run_if_interactive_non_modal()
        return
    print("Job " + str(job.job_id) + " is running!")
[docs]
def run_job_with_status_refresh(job: "pyiron_base.jobs.job.generic.GenericJob") -> None:
    """
    Run function selected for jobs with the status 'refresh'. The generic implementation does not support
    refreshing a job and always raises an exception.

    Args:
        job (GenericJob): pyiron job object

    Raises:
        NotImplementedError: Always, the message contains the job id.
    """
    message = "Refresh is not supported for this job type for job " + str(job.job_id)
    raise NotImplementedError(message)
[docs]
def run_job_with_status_busy(job: "pyiron_base.jobs.job.generic.GenericJob") -> None:
    """
    Internal helper function the run if busy function is called when the job status is 'busy'. The generic
    implementation does not support this status and always raises an exception.

    Args:
        job (GenericJob): pyiron job object

    Raises:
        NotImplementedError: Always, the message contains the job id.
    """
    # The message previously named 'Refresh', copied from run_job_with_status_refresh().
    raise NotImplementedError(
        "Busy is not supported for this job type for job " + str(job.job_id)
    )
[docs]
def run_job_with_status_collect(job: "pyiron_base.jobs.job.generic.GenericJob") -> None:
    """
    Run function selected for jobs with the status 'collect'. The output is parsed - either with the collect
    output function of a calculate-function job or with job.collect_output() - followed by the log files and the
    run time. If the status is still 'collect' afterwards, it is set to 'finished' or 'not_converged' depending on
    the convergence check. Finally the status is stored in the HDF5 file and the master job is updated.

    Args:
        job (GenericJob): pyiron job object
    """
    has_output_parser = (
        job._job_with_calculate_function and job._collect_output_funct is not None
    )
    if has_output_parser:
        output_dict = job._collect_output_funct(
            working_directory=job.working_directory, **job.get_output_parameter_dict()
        )
        job.save_output(output_dict=output_dict)
    else:
        job.collect_output()
    job.collect_logfiles()
    job.run_time_to_db()
    if job.status.collect:
        if job.convergence_check():
            if job._compress_by_default:
                job.compress()
            job.status.finished = True
        else:
            job.status.not_converged = True
    job._hdf5["status"] = job.status.string
    job.update_master()
[docs]
def run_job_with_status_suspended(
    job: "pyiron_base.jobs.job.generic.GenericJob",
) -> None:
    """
    Run function selected for jobs with the status 'suspended'. The status is switched to 'refresh' and job.run()
    is called again, so the job is handled by the run function for the 'refresh' status.

    Args:
        job (GenericJob): pyiron job object
    """
    job.status.refresh = True
    job.run()
[docs]
@deprecate(
    run_again="Either delete the job via job.remove() or use delete_existing_job=True.",
    version="0.4.0",
)
def run_job_with_status_finished(
    job: "pyiron_base.jobs.job.generic.GenericJob",
) -> None:
    """
    Run function selected for jobs with the status 'finished'. Instead of executing the job again, a warning is
    logged and the existing job is loaded from the HDF5 file.

    Args:
        job (GenericJob): pyiron job object
    """
    warning_message = (
        "The job {} is being loaded instead of running. To re-run use the argument "
        "'delete_existing_job=True in create_job'"
    ).format(job.job_name)
    job.logger.warning(warning_message)
    job.from_hdf()
# Run Modes
[docs]
def run_job_with_runmode_manually(
    job: "pyiron_base.jobs.job.generic.GenericJob", _manually_print: bool = True
) -> None:
    """
    Prepare a job for manual execution. For jobs with a calculate function the working directory is created and
    the input files are written. Optionally the command to start the job by hand is printed.

    Args:
        job (GenericJob): pyiron job object
        _manually_print (bool): [True/False] print command for execution - default=True
    """
    if job._job_with_calculate_function:
        job.project_hdf5.create_working_directory()
        write_input_files_from_input_dict(
            input_dict=job.get_input_parameter_dict(),
            working_directory=job.working_directory,
        )
    if not _manually_print:
        return
    abs_working = posixpath.abspath(job.project_hdf5.working_directory)
    # Without a database the job is addressed by its HDF5 location instead of its job id.
    if state.database.database_is_disabled:
        job_selector = " -f {} ' ".format(
            job.project_hdf5.file_name + job.project_hdf5.h5_path
        )
    else:
        job_selector = " -j {} ' ".format(job.job_id)
    instructions = (
        "You have selected to start the job manually. "
        "To run it, go into the working directory {} and "
        "call 'python -m pyiron_base.cli wrapper -p {}"
    ).format(abs_working, abs_working)
    print(instructions + job_selector)
[docs]
def run_job_with_runmode_modal(job: "pyiron_base.jobs.job.generic.GenericJob") -> None:
    """
    Run function for the modal run mode. The simulation is executed by calling job.run_static() and this function
    only returns once that call returned.

    Args:
        job (GenericJob): pyiron job object
    """
    job.run_static()
[docs]
def run_job_with_runmode_non_modal(
    job: "pyiron_base.jobs.job.generic.GenericJob",
) -> None:
    """
    Run function for the non modal run mode. The job is executed in the background by starting a
    multiprocessing.Process() which calls multiprocess_wrapper(). If the run mode is not non_modal, the process is
    additionally stored as job._process.

    Args:
        job (GenericJob): pyiron job object
    """
    working_directory = job.project_hdf5.working_directory
    if state.database.database_is_disabled:
        # Without a database the job is identified by its location inside the HDF5 file.
        job_id = None
        file_path = job.project_hdf5.file_name + job.project_hdf5.h5_path
        connection_string = None
    else:
        job_id = job.job_id
        file_path = None
        if state.database.using_local_database:
            connection_string = str(job.project.db.conn.engine.url)
        else:
            connection_string = None
    background_process = multiprocessing.Process(
        target=multiprocess_wrapper,
        # positional order: working_directory, job_id, file_path, debug, connection_string
        args=(working_directory, job_id, file_path, False, connection_string),
    )
    if job.master_id and job.server.run_mode.non_modal:
        # The local reference to the job is dropped before the process is started.
        del job
        background_process.start()
    elif job.server.run_mode.non_modal:
        background_process.start()
    else:
        job._process = background_process
        job._process.start()
[docs]
def run_job_with_runmode_queue(job: "pyiron_base.jobs.job.generic.GenericJob") -> None:
    """
    Run function for the queue run mode. The command calling the pyiron job wrapper is assembled and submitted to
    the queuing system via state.queue_adapter. On success the queue id is stored in job.server.queue_id and the
    server settings are written to the HDF5 file.

    Args:
        job (GenericJob): pyiron job object

    Raises:
        TypeError: If no queue adapter is defined.
        ValueError: If the submission did not return a queue id. The job status is set to aborted beforehand.
    """
    queue_adapter = state.queue_adapter
    if queue_adapter is None:
        raise TypeError("No queue adapter defined.")
    wrapper_call = "python -m pyiron_base.cli wrapper -p "
    if queue_adapter.remote_flag:
        remote_file = queue_adapter.convert_path_to_remote(
            path=job.project_hdf5.file_name
        )
        remote_directory = queue_adapter.convert_path_to_remote(
            path=job.working_directory
        )
        command = (
            wrapper_call
            + remote_directory
            + " -f "
            + remote_file
            + job.project_hdf5.h5_path
            + " --submit"
        )
        # For remote submission the input is written locally and the HDF5 file is copied to the remote host.
        job.project_hdf5.create_working_directory()
        job.write_input()
        queue_adapter.transfer_file_to_remote(
            file=job.project_hdf5.file_name, transfer_back=False
        )
    elif state.database.database_is_disabled:
        command = (
            wrapper_call
            + job.working_directory
            + " -f "
            + job.project_hdf5.file_name
            + job.project_hdf5.h5_path
        )
    else:
        command = wrapper_call + job.working_directory + " -j " + str(job.job_id)
    que_id = queue_adapter.submit_job(
        queue=job.server.queue,
        job_name="pi_" + str(job.job_id),
        working_directory=job.project_hdf5.working_directory,
        cores=job.server.cores,
        run_time_max=job.server.run_time,
        memory_max=job.server.memory_limit,
        command=command,
        **job.server.additional_arguments,
    )
    if que_id is None:
        job._logger.warning("Job aborted")
        job.status.aborted = True
        raise ValueError("run_queue.sh crashed")
    with job.server.unlocked():
        job.server.queue_id = que_id
    job.project_hdf5.write_dict(data_dict={"server": job.server.to_dict()})
    print("Queue system id: ", que_id)
    state.logger.debug("submitted %s", job.job_name)
    job._logger.debug("job status: %s", job.status)
    job._logger.info(
        "{}, status: {}, submitted: queue id {}".format(
            job.job_info_str, job.status, que_id
        )
    )
[docs]
def run_job_with_runmode_srun(job: "pyiron_base.jobs.job.generic.GenericJob") -> None:
    """
    Run function for the srun run mode. The pyiron job wrapper is started through srun in a separate process
    using subprocess.Popen(). This function does not wait for the process to finish.

    Args:
        job (GenericJob): pyiron job object

    Raises:
        ValueError: If a local database is used, as this is not supported in srun mode.
    """
    working_directory = job.project_hdf5.working_directory
    if not state.database.database_is_disabled:
        if state.database.using_local_database:
            raise ValueError(
                "run_job_with_runmode_srun() does not support local databases."
            )
        # The option previously read "- j " without a leading space and the integer job id was concatenated
        # directly, which raised a TypeError before any command could be started.
        command = (
            "srun python -m pyiron_base.cli wrapper -p "
            + working_directory
            + " -j "
            + str(job.job_id)
        )
    else:
        command = (
            "srun python -m pyiron_base.cli wrapper -p "
            + working_directory
            + " -f "
            + job.project_hdf5.file_name
            + job.project_hdf5.h5_path
        )
    os.makedirs(working_directory, exist_ok=True)
    # The local reference to the job is dropped before the external process is started.
    del job
    # NOTE(review): the output of the process is piped but never read here - confirm this is intended.
    subprocess.Popen(
        command,
        cwd=working_directory,
        shell=True,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
        universal_newlines=True,
        env=os.environ.copy(),
    )
[docs]
def run_job_with_runmode_executor(
    job: "pyiron_base.jobs.job.generic.GenericJob",
    executor: "concurrent.futures.Executor",
    gpus_per_slot: Optional[int] = None,
) -> None:
    """
    Run function for the executor run mode. The concurrent.futures interface enables the asynchronous execution
    of python functions: a task is submitted to an executor and a future object tracks its progress. The executor
    run mode offers the same for pyiron jobs, with the executor being set as an attribute of the server object:

    >>> job.server.executor = concurrent.futures.Executor()
    >>> job.run()
    >>> job.server.future.done()
    False
    >>> job.server.future.result()
    >>> job.server.future.done()
    True

    Depending on the type of the executor the job is forwarded to run_job_with_runmode_executor_flux() or
    run_job_with_runmode_executor_futures().

    Args:
        job (GenericJob): pyiron job object
        executor (concurrent.futures.Executor): executor class which implements the executor interface defined in the
                                                python concurrent.futures.Executor class.
        gpus_per_slot (int): number of GPUs per MPI rank, typically 1

    Raises:
        NotImplementedError: For GenericMaster jobs and for executors which are neither a flux.job.FluxExecutor
                             nor a concurrent.futures.ProcessPoolExecutor.
    """
    # The static check is used to avoid a circular import:
    #    runfunction -> GenericJob -> GenericMaster -> runfunction
    # This smells a bit, so if a better architecture is found in the future, use it
    # to avoid string-based specifications
    job_is_master = static_isinstance(
        obj=job,
        obj_type="pyiron_base.jobs.master.generic.GenericMaster",
    )
    if job_is_master:
        raise NotImplementedError(
            "Currently job.server.run_mode.executor does not support GenericMaster jobs."
        )
    if flux_available and isinstance(executor, flux.job.FluxExecutor):
        run_job_with_runmode_executor_flux(
            job=job, executor=executor, gpus_per_slot=gpus_per_slot
        )
        return
    if isinstance(executor, ProcessPoolExecutor):
        run_job_with_runmode_executor_futures(job=job, executor=executor)
        return
    raise NotImplementedError(
        "Currently only flux.job.FluxExecutor and concurrent.futures.ProcessPoolExecutor are supported."
    )
[docs]
def run_job_with_runmode_executor_futures(
    job: "pyiron_base.jobs.job.generic.GenericJob",
    executor: "concurrent.futures.Executor",
) -> None:
    """
    Interface for the ProcessPoolExecutor of the concurrent.futures module in the python standard library. The
    job is submitted by handing multiprocess_wrapper() to the executor and the resulting future object is stored
    as job.server.future. The ProcessPoolExecutor does not provide any resource management, so the user is
    responsible to keep track of the number of compute cores in use, as over-subscription can lead to low
    performance.

    The [ProcessPoolExecutor docs](https://docs.python.org/3/library/concurrent.futures.html#processpoolexecutor)
    state: "The __main__ module must be importable by worker subprocesses. This means that ProcessPoolExecutor will
    not work in the interactive interpreter." (i.e. Jupyter notebooks). For standard usage this is a non-issue, but
    for the edge case of job classes defined in-notebook (e.g. children of `PythonTemplateJob`), using the
    ProcessPoolExecutor will result in errors. To resolve this, relocate such classes to an importable .py file.

    >>> from concurrent.futures import ProcessPoolExecutor
    >>> job.server.executor = ProcessPoolExecutor()
    >>> job.server.future.done()
    False
    >>> job.server.future.result()
    >>> job.server.future.done()
    True

    Args:
        job (GenericJob): pyiron job object
        executor (concurrent.futures.Executor): executor class which implements the executor interface defined in the
                                                python concurrent.futures.Executor class.
    """
    database = state.database
    file_path, connection_string = None, None
    if database.database_is_disabled:
        # Without a database the job is identified by its location inside the HDF5 file.
        file_path = job.project_hdf5.file_name + job.project_hdf5.h5_path
    elif database.using_local_database:
        connection_string = str(job.project.db.conn.engine.url)
    wrapper_kwargs = dict(
        working_directory=job.project_hdf5.working_directory,
        job_id=job.job_id,
        file_path=file_path,
        debug=False,
        connection_string=connection_string,
    )
    job.server.future = executor.submit(multiprocess_wrapper, **wrapper_kwargs)
[docs]
def run_job_with_runmode_executor_flux(
    job: "pyiron_base.jobs.job.generic.GenericJob",
    executor: "concurrent.futures.Executor",
    gpus_per_slot: Optional[int] = None,
) -> None:
    """
    Interface for the flux.job.FluxExecutor executor. Flux is a hierarchical resource management. It can either be used to
    replace queuing systems like SLURM or be used as a user specific queuing system within an existing allocation.
    pyiron provides two interfaces to flux, this executor interface as well as a traditional queuing system interface
    via pysqa. This executor interface is designed for the development of asynchronous simulation protocols, while the
    traditional queuing system interface simplifies the transition from other queuing systems like SLURM. The usage
    is analogous to the concurrent.futures.Executor interface:

    >>> from flux.job import FluxExecutor
    >>> job.server.executor = FluxExecutor()
    >>> job.run()
    >>> job.server.future.done()
    False
    >>> job.server.future.result()
    >>> job.server.future.done()
    True

    A word of caution - flux is currently only available on Linux, for all other operating systems the ProcessPoolExecutor
    from the python standard library concurrent.futures is recommended. The advantage of flux over the ProcessPoolExecutor
    is that flux takes over the resource management, like monitoring how many cores are available while with the
    ProcessPoolExecutor this is left to the user.

    The function itself returns None, the future object created by the executor is stored as job.server.future.

    Args:
        job (GenericJob): pyiron job object
        executor (flux.job.FluxExecutor): flux executor class which implements the executor interface defined in the
                                          python concurrent.futures.Executor class.
        gpus_per_slot (int): number of GPUs per MPI rank, typically 1

    Raises:
        ModuleNotFoundError: If the flux python module could not be imported.
    """
    if not flux_available:
        # The separating space after "Linux;" was missing, so the two sentences ran together.
        raise ModuleNotFoundError(
            "No module named 'flux'. Running in flux mode is only available on Linux; "
            "For CPU jobs, please use `conda install -c conda-forge flux-core`; for "
            "GPU support you will additionally need "
            "`conda install -c conda-forge flux-sched libhwloc=*=cuda*`"
        )
    executable_str, job_name = _generate_flux_execute_string(
        job=job, database_is_disabled=state.database.database_is_disabled
    )
    # One slot with a single core is requested per core of the job, all on one node.
    jobspec = flux.job.JobspecV1.from_batch_command(
        jobname=job_name,
        script=executable_str,
        num_nodes=1,
        cores_per_slot=1,
        gpus_per_slot=gpus_per_slot,
        num_slots=job.server.cores,
    )
    jobspec.cwd = job.project_hdf5.working_directory
    jobspec.environment = dict(os.environ)
    job.server.future = executor.submit(jobspec)
[docs]
def run_time_decorator(func: callable) -> callable:
    """
    Decorator for run functions which stores the start time and the run time of the job in the database.

    If the database is enabled and the job has a job id, the entry "timestart" is updated before the function is
    called and the result of job._runtime() is written afterwards. Otherwise the function is just called.

    Args:
        func (callable): run function which takes the job object as its only argument

    Returns:
        callable: wrapped function with the same call signature and the metadata (name, docstring) of func
    """
    from functools import wraps

    # wraps() keeps __name__ and __doc__ of the decorated function, which were replaced by those of the wrapper.
    @wraps(func)
    def wrapper(job: "pyiron_base.jobs.job.generic.GenericJob"):
        if not state.database.database_is_disabled and job.job_id is not None:
            job.project.db.item_update({"timestart": datetime.now()}, job.job_id)
            output = func(job)
            job.project.db.item_update(job._runtime(), job.job_id)
        else:
            output = func(job)
        return output

    return wrapper
[docs]
def execute_subprocess(
    executable: str,
    shell: bool,
    working_directory: str,
    cores: int = 1,
    threads: int = 1,
    gpus: int = 1,
    conda_environment_name: Optional[str] = None,
    conda_environment_path: Optional[str] = None,
) -> str:
    """
    Run an executable in the working directory and return its combined standard output and standard error.

    The environment of the current process is forwarded to the subprocess, extended by the variables PYIRON_CORES,
    PYIRON_THREADS and PYIRON_GPUS.

    Args:
        executable (str): The executable command to run.
        shell (bool): If True, the command will be executed through the shell. Only used when no conda environment
                      is selected.
        working_directory (str): The working directory for the subprocess.
        cores (int, optional): Value of the PYIRON_CORES environment variable. Defaults to 1.
        threads (int, optional): Value of the PYIRON_THREADS environment variable. Defaults to 1.
        gpus (int, optional): Value of the PYIRON_GPUS environment variable. Defaults to 1.
        conda_environment_name (str, optional): The name of the conda environment to activate. Defaults to None.
        conda_environment_path (str, optional): The path to the conda environment to activate. Defaults to None.

    Returns:
        str: The output of the subprocess.

    Raises:
        subprocess.CalledProcessError: If the subprocess returns a non-zero exit status.

    Note:
        - Without a conda environment the command is executed using `subprocess.run`.
        - With `conda_environment_name` or `conda_environment_path` the command is executed using
          `conda_subprocess.run`. If both are given, only the environment name is used.
    """
    environment = {
        **os.environ,
        "PYIRON_CORES": str(cores),
        "PYIRON_THREADS": str(threads),
        "PYIRON_GPUS": str(gpus),
    }
    # Options shared by both ways of starting the process.
    run_options = dict(
        cwd=working_directory,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
        universal_newlines=True,
        check=True,
        env=environment,
    )
    use_conda = not (conda_environment_name is None and conda_environment_path is None)
    if use_conda:
        import conda_subprocess

        # The environment name takes precedence over the environment path.
        prefix_path = None if conda_environment_name is not None else conda_environment_path
        completed = conda_subprocess.run(
            executable,
            prefix_name=conda_environment_name,
            prefix_path=prefix_path,
            **run_options,
        )
    else:
        completed = subprocess.run(executable, shell=shell, **run_options)
    return completed.stdout
[docs]
@run_time_decorator
def execute_job_with_external_executable(
    job: "pyiron_base.jobs.job.generic.GenericJob",
) -> str:
    """
    Execute the external executable of a job. The shell output is written to the file error.out in the working
    directory and afterwards the finished job is handled, including the collection of the output.

    Args:
        job (GenericJob): pyiron job object

    Returns:
        str: output of the executable

    Raises:
        ValueError: If no executable is set. The job status is set to aborted beforehand.
    """
    job._logger.info(
        "{}, status: {}, run job (modal)".format(job.job_info_str, job.status)
    )
    if job.executable.executable_path == "":
        job.status.aborted = True
        job._hdf5["status"] = job.status.string
        raise ValueError("No executable set!")
    executable, shell = job.executable.get_input_for_subprocess_call(
        cores=job.server.cores, threads=job.server.threads, gpus=job.server.gpus
    )
    try:
        out = execute_subprocess(
            executable=executable,
            shell=shell,
            working_directory=job.working_directory,
            conda_environment_name=job.server.conda_environment_name,
            conda_environment_path=job.server.conda_environment_path,
            cores=job.server.cores,
            threads=job.server.threads,
            gpus=job.server.gpus,
        )
    except (subprocess.CalledProcessError, FileNotFoundError) as error:
        # handle_failed_job() either raises or reports whether the job counts as crashed.
        job_crashed, out = handle_failed_job(job=job, error=error)
    else:
        job_crashed = False

    job._logger.info(
        "{}, status: {}, output: {}".format(job.job_info_str, job.status, out)
    )
    error_out_path = posixpath.join(job.project_hdf5.working_directory, "error.out")
    with open(error_out_path, mode="w") as f_err:
        f_err.write(out)

    handle_finished_job(job=job, job_crashed=job_crashed, collect_output=True)
    return out
[docs]
def handle_finished_job(
    job: "pyiron_base.jobs.job.generic.GenericJob",
    job_crashed: bool = False,
    collect_output: bool = True,
) -> None:
    """
    Post-process a job after its execution: the input is set to read-only, optionally the output is collected by
    switching the status to 'collect' and calling job.run(), and crashed jobs are marked as aborted.

    Args:
        job (GenericJob): pyiron job object
        job_crashed (boolean): flag to indicate failed jobs
        collect_output (boolean): flag to indicate if the output should be collected
    """
    job.set_input_to_read_only()
    if collect_output:
        job.status.collect = True
        job.run()
    if not job_crashed:
        return
    # Crashed jobs are marked as aborted, in the job status as well as in the HDF5 file.
    job.status.aborted = True
    job._hdf5["status"] = job.status.string
[docs]
def handle_failed_job(
    job: "pyiron_base.jobs.job.generic.GenericJob", error: subprocess.SubprocessError
) -> Tuple[bool, str]:
    """
    Decide how to continue after the execution of a job raised an error. Errors with an accepted return code are
    not counted as crash, crashes are tolerated if job.server.accept_crash is set and in all other cases the job
    is aborted.

    Args:
        job (GenericJob): pyiron job object
        error (subprocess.SubprocessError): error of the subprocess executing the job

    Returns:
        boolean, str: job crashed and error message
    """
    if hasattr(error, "output"):
        shell_output = error.output
        if error.returncode in job.executable.accepted_return_codes:
            return False, shell_output
        if job.server.accept_crash:
            return True, shell_output
        # The crash is not accepted: log the output and store it in error.msg before aborting the job.
        job._logger.warning(error.output)
        message_file = posixpath.join(job.project_hdf5.working_directory, "error.msg")
        with open(message_file, "w") as f:
            f.write(error.output)
    # Reached for errors without output and for crashes which are not accepted.
    raise_runtimeerror_for_failed_job(job=job)
[docs]
def raise_runtimeerror_for_failed_job(
    job: "pyiron_base.jobs.job.generic.GenericJob",
) -> None:
    """
    Mark a job as aborted and raise a RuntimeError. The status is stored in the HDF5 file, the run time is written
    to the database and for non modal jobs the database connection is closed.

    Args:
        job (GenericJob): pyiron job object

    Raises:
        RuntimeError: Always.
    """
    job._logger.warning("Job aborted")
    job.status.aborted = True
    job._hdf5["status"] = job.status.string
    job.run_time_to_db()
    non_modal = job.server.run_mode.non_modal
    if non_modal:
        state.database.close_connection()
    raise RuntimeError("Job aborted")
[docs]
def multiprocess_wrapper(
    working_directory: str,
    job_id: Optional[int] = None,
    file_path: Optional[str] = None,
    debug: bool = False,
    connection_string: Optional[str] = None,
):
    """
    Entry point for running a job in a separate process. The job is loaded with the JobWrapper - either by its
    job id or by its location inside an HDF5 file - and executed by calling run_static().

    Args:
        working_directory (str): The working directory for the job.
        job_id (Optional[int], optional): The ID of the job. Defaults to None.
        file_path (Optional[str], optional): Path of the HDF5 file followed by the path of the job inside the
            file. Only used if job_id is None. Defaults to None.
        debug (bool, optional): Whether to run the job in debug mode. Defaults to False.
        connection_string (Optional[str], optional): The connection string for the job. Defaults to None.

    Raises:
        ValueError: If both job_id and file_path are None.

    Returns:
        None
    """
    if job_id is not None:
        job_wrap = JobWrapper(
            working_directory=str(working_directory),
            job_id=int(job_id),
            debug=debug,
            connection_string=connection_string,
        )
    elif file_path is not None:
        # Everything up to the first "/" after the last "." belongs to the file name, the rest is the HDF5 path.
        stem, _, tail = file_path.rpartition(".")
        extension, *h5_groups = tail.split("/")
        job_wrap = JobWrapper(
            working_directory,
            job_id=None,
            hdf5_file=stem + "." + extension,
            h5_path="/" + "/".join(h5_groups),
            debug=debug,
            connection_string=connection_string,
        )
    else:
        raise ValueError("Either job_id or file_path have to be not None.")
    wrapped_job = job_wrap.job
    with catch_signals(wrapped_job.signal_intercept):
        job_wrap.job.run_static()
[docs]
def execute_command_with_error_handling(
    executable: str,
    shell: bool,
    working_directory: str,
    cores: int = 1,
    threads: int = 1,
    gpus: int = 1,
    conda_environment_name: Optional[str] = None,
    conda_environment_path: Optional[str] = None,
    accepted_return_codes: Optional[List[int]] = None,
    accept_crash: bool = False,
) -> Tuple[bool, str]:
    """
    Execute command including error handling and support for execution in separate conda environment

    The shell output is written to the file error.out in the working directory. If the command fails with a
    return code which is not accepted and crashes are not accepted either, the output is written to the file
    error.msg instead and a RuntimeError is raised.

    Args:
        executable (str): Executable to be executed in the working directory.
        shell (bool): The shell parameter from the subprocess.Popen() function of the python standard library.
        working_directory (str): Directory the calculation is executed in.
        cores (int): Number of cores forwarded to execute_subprocess().
        threads (int): Number of threads forwarded to execute_subprocess().
        gpus (int): Number of GPUs forwarded to execute_subprocess().
        conda_environment_name (str): Name of a conda environment to execute the executable in.
        conda_environment_path (str): Path of a conda environment to execute the executable in.
        accepted_return_codes (list): List of accepted return codes. None is treated as an empty list.
        accept_crash (bool): Boolean flag to accept crashes.

    Returns:
        bool, str: boolean flag if the execution crashed with an accepted crash and string output of the command
                   line. Failures with an accepted return code are not counted as crash.

    Raises:
        RuntimeError: If the command failed and the failure is not accepted, or if the error does not provide any
                      output, e.g. when the executable was not found.
    """
    # None replaces the former mutable default argument, which was shared between all calls.
    if accepted_return_codes is None:
        accepted_return_codes = []
    job_crashed = False
    try:
        shell_output = execute_subprocess(
            executable=executable,
            shell=shell,
            working_directory=working_directory,
            conda_environment_name=conda_environment_name,
            conda_environment_path=conda_environment_path,
            cores=cores,
            threads=threads,
            gpus=gpus,
        )
    except (subprocess.CalledProcessError, FileNotFoundError) as error:
        if not hasattr(error, "output"):
            # A FileNotFoundError does not provide any shell output.
            raise RuntimeError("Job aborted") from error
        shell_output = error.output
        # An accepted return code is not a crash, in line with handle_failed_job(). Previously such runs were
        # flagged as crashed as well.
        if error.returncode not in accepted_return_codes:
            if not accept_crash:
                error_file = posixpath.join(working_directory, "error.msg")
                with open(error_file, "w") as f:
                    f.write(error.output)
                raise RuntimeError("External executable failed") from error
            job_crashed = True

    with open(posixpath.join(working_directory, "error.out"), mode="w") as f_err:
        f_err.write(shell_output)
    return job_crashed, shell_output
[docs]
@run_time_decorator
def execute_job_with_calculate_function(
    job: "pyiron_base.jobs.job.generic.GenericJob",
) -> None:
    """
    Execute a job which is defined by a calculate function. This is divided into three steps: (1) the generation
    of the calculate function and its inputs, (2) the execution of this function and (3) storing the output of
    this function in the HDF5 file.

    In future the execution of the calculate function might be transferred to a separate process, so the separation
    in these three distinct steps is necessary to simplify the submission to an external executor.

    Args:
        job (GenericJob): pyiron job object
    """
    try:
        calculate = job.get_calculate_function()
        shell_output, parsed_output, job_crashed = calculate(**job.calculate_kwargs)
    except RuntimeError:
        raise_runtimeerror_for_failed_job(job=job)
    else:
        job.set_input_to_read_only()
        if job_crashed:
            job.status.aborted = True
        else:
            job.save_output(output_dict=parsed_output, shell_output=shell_output)
            if job.convergence_check():
                if job._compress_by_default:
                    job.compress()
                job.status.finished = True
            else:
                job.status.not_converged = True
        job._hdf5["status"] = job.status.string
        job.update_master()
def _generate_flux_execute_string(
    job: "pyiron_base.jobs.job.generic.GenericJob", database_is_disabled: bool
) -> Tuple[str, str]:
    """
    Generate the bash script which calls the pyiron job wrapper and the job name for the submission to flux.

    Args:
        job (GenericJob): pyiron job object
        database_is_disabled (bool): If True the job is addressed by its HDF5 file and path, otherwise by its job id.

    Returns:
        str, str: content of the bash script and name of the job
    """
    from jinja2 import Template

    if database_is_disabled:
        script = Template(
            "#!/bin/bash\n"
            + "python -m pyiron_base.cli wrapper -p {{working_directory}} -f {{file_name}}{{h5_path}}"
        ).render(
            working_directory=job.working_directory,
            file_name=job.project_hdf5.file_name,
            h5_path=job.project_hdf5.h5_path,
        )
        return script, "pi_" + job.job_name
    script = Template(
        "#!/bin/bash\n"
        + "python -m pyiron_base.cli wrapper -p {{working_directory}} -j {{job_id}}"
    ).render(
        working_directory=job.working_directory,
        job_id=str(job.job_id),
    )
    return script, "pi_" + str(job.job_id)