Source code for pyiron_base.jobs.job.extension.server.queuestatus

# coding: utf-8
# Copyright (c) Max-Planck-Institut für Eisenforschung GmbH - Computational Materials Design (CM) Department
# Distributed under the terms of "New BSD License", see the LICENSE file.
"""
Set of functions to interact with the queuing system directly from within pyiron - optimized for the Sun grid engine.
"""

import time
from concurrent.futures import Future
from typing import List, Optional, Union

import numpy as np
import pandas

from pyiron_base.jobs.job.extension.jobstatus import job_status_finished_lst
from pyiron_base.state import state
from pyiron_base.utils.instance import static_isinstance

__author__ = "Jan Janssen"
__copyright__ = (
    "Copyright 2020, Max-Planck-Institut für Eisenforschung GmbH - "
    "Computational Materials Design (CM) Department"
)
__version__ = "1.0"
__maintainer__ = "Jan Janssen"
__email__ = "janssen@mpie.de"
__status__ = "production"
__date__ = "Sep 1, 2017"

QUEUE_SCRIPT_PREFIX = "pi_"


[docs] def queue_table( job_ids: Optional[List[int]] = None, working_directory_lst: Optional[List[str]] = None, project_only: bool = True, full_table: bool = False, ) -> pandas.DataFrame: """ Display the queuing system table as pandas.Dataframe Args: job_ids (list): check for a specific list of job IDs - empty list by default working_directory_lst (list): list of working directories to include - empty list by default project_only (bool): Query only for jobs within the current project - True by default full_table (bool): Return all entries from the queuing system without filtering - False by default Returns: pandas.DataFrame: Output from the queuing system - optimized for the Sun grid engine """ job_ids = [] if job_ids is None else job_ids working_directory_lst = ( [] if working_directory_lst is None else working_directory_lst ) if project_only and not job_ids and not working_directory_lst: return [] if state.queue_adapter is not None: if full_table: pandas.set_option("display.max_rows", None) pandas.set_option("display.max_columns", None) else: pandas.reset_option("display.max_rows") pandas.reset_option("display.max_columns") df = state.queue_adapter.get_status_of_my_jobs() if not project_only: return df[ [ True if QUEUE_SCRIPT_PREFIX in job_name else False for job_name in list(df.jobname) ] ] else: if len(job_ids) > len(working_directory_lst): job_name_lst = [QUEUE_SCRIPT_PREFIX + str(job_id) for job_id in job_ids] return df[ [ True if job_name in job_name_lst else False for job_name in list(df.jobname) ] ] else: if len(df) > 0 and "working_directory" in df.columns: return df[ [ any( [ working_dir.startswith(p) for p in working_directory_lst ] ) for working_dir in list(df.working_directory) ] ] else: return df else: return None
[docs] def queue_check_job_is_waiting_or_running( item: Union[int, "pyiron_base.jobs.job.generic.GenericJob"], ) -> Union[bool, None]: """ Check if a job is still listed in the queue system as either waiting or running. Args: item (int, GenericJob): Provide either the job_ID or the full hamiltonian Returns: bool: [True/False] """ que_id = validate_que_request(item) if state.queue_adapter is not None: return state.queue_adapter.get_status_of_job(process_id=que_id) in [ "pending", "running", ] else: return None
[docs] def queue_info_by_job_id(job_id: int) -> dict: """ Display the queuing system info of job by qstat | grep shell command as dictionary Args: job_id (int): query for a specific job_id Returns: dict: Dictionary with the output from the queuing system - optimized for the Sun grid engine """ if state.queue_adapter is not None: return state.queue_adapter.get_status_of_job(process_id=job_id) else: return None
[docs] def queue_is_empty() -> bool: """ Check if the queue table is currently empty - no more jobs to wait for. Returns: bool: True if the table is empty, else False - optimized for the Sun grid engine """ if state.queue_adapter is not None: return len(state.queue_adapter.get_status_of_my_jobs()) == 0 else: return True
[docs] def queue_delete_job( item: Union[int, "pyiron_base.jobs.job.generic.GenericJob"], ) -> Union[str, None]: """ Delete a job from the queuing system Args: item (int, pyiron_base.jobs.job.generic.GenericJob): Provide either the job_ID or the full hamiltonian Returns: str: Output from the queuing system as string - optimized for the Sun grid engine """ que_id = validate_que_request(item) if state.queue_adapter is not None: return state.queue_adapter.delete_job(process_id=que_id) else: return None
[docs] def queue_enable_reservation( item: Union[int, "pyiron_base.jobs.job.generic.GenericJob"], ) -> Union[str, None]: """ Enable a reservation for a particular job within the queuing system Args: item (int, pyiron_base.jobs.job.generic.GenericJob): Provide either the job_ID or the full hamiltonian Returns: str: Output from the queuing system as string - optimized for the Sun grid engine """ que_id = validate_que_request(item) if state.queue_adapter is not None: if isinstance(que_id, list): return [ state.queue_adapter.enable_reservation(process_id=q) for q in que_id ] else: return state.queue_adapter.enable_reservation(process_id=que_id) else: return None
[docs] def wait_for_job( job: "pyiron_base.jobs.job.generic.GenericJob", interval_in_s: int = 5, max_iterations: int = 100, ) -> None: """ Sleep until the job is finished but maximum interval_in_s * max_iterations seconds. Args: job (pyiron_base.job.utils.GenericJob): Job to wait for interval_in_s (int): interval when the job status is queried from the database - default 5 sec. max_iterations (int): maximum number of iterations - default 100 Raises: ValueError: max_iterations reached, job still running """ if job.status.string not in job_status_finished_lst: if ( state.queue_adapter is not None and state.queue_adapter.remote_flag and job.server.queue is not None ): finished = False for _ in range(max_iterations): if not queue_check_job_is_waiting_or_running(item=job): state.queue_adapter.transfer_file_to_remote( file=job.project_hdf5.file_name, transfer_back=True, delete_file_on_remote=False, ) status_hdf5 = job.project_hdf5["status"] job.status.string = status_hdf5 else: status_hdf5 = job.status.string if status_hdf5 in job_status_finished_lst: job.transfer_from_remote() finished = True break time.sleep(interval_in_s) if not finished: raise ValueError( "Maximum iterations reached, but the job was not finished." ) else: finished = False for _ in range(max_iterations): if state.database.database_is_disabled: job.project.db.update() job.refresh_job_status() if job.status.string in job_status_finished_lst: finished = True break elif isinstance(job.server.future, Future): try: job.server.future.result(timeout=interval_in_s) except TimeoutError: pass else: finished = job.server.future.done() break else: time.sleep(interval_in_s) if not finished: raise ValueError( "Maximum iterations reached, but the job was not finished." )
[docs] def wait_for_jobs( project: "pyiron_base.project.generic.Project", interval_in_s: int = 5, max_iterations: int = 100, recursive: bool = True, ignore_exceptions: bool = False, try_collecting: bool = False, ) -> None: """ Wait for the calculation in the project to be finished Args: project: Project instance the jobs is located in interval_in_s (int): interval when the job status is queried from the database - default 5 sec. max_iterations (int): maximum number of iterations - default 100 recursive (bool): search subprojects [True/False] - default=True ignore_exceptions (bool): ignore eventual exceptions when retrieving jobs - default=False try_collecting (bool): try to run collect for fetched jobs that don't have a status counting as finished - default=False Raises: ValueError: max_iterations reached, but jobs still running """ finished = False for _ in range(max_iterations): project.update_from_remote(recursive=True, ignore_exceptions=ignore_exceptions) project.refresh_job_status() df = project.job_table(recursive=recursive) if all(df.status.isin(job_status_finished_lst)): finished = True break time.sleep(interval_in_s) if not finished: raise ValueError("Maximum iterations reached, but the job was not finished.")
[docs] def update_from_remote( project: "pyiron_base.project.generic.Project", recursive: bool = True, ignore_exceptions: bool = False, try_collecting: bool = False, ) -> None: """ Update jobs from the remote server Args: project: Project instance the jobs is located in recursive (bool): search subprojects [True/False] - default=True ignore_exceptions (bool): ignore eventual exceptions when retrieving jobs - default=False try_collecting (bool): try to collect jobs that don't have a status counting as finished - default=False Returns: returns None if ignore_exceptions is False or when no error occured. returns a list with job ids when errors occured, but were ignored """ if state.queue_adapter is not None and state.queue_adapter.remote_flag: df_project = project.job_table(recursive=recursive) df_submitted = df_project[df_project.status == "submitted"] df_combined = df_project[df_project.status.isin(["running", "submitted"])] df_queue = state.queue_adapter.get_status_of_my_jobs() if ( len(df_queue) > 0 and len(df_queue[df_queue.jobname.str.contains(QUEUE_SCRIPT_PREFIX)]) > 0 ): df_queue = df_queue[df_queue.jobname.str.contains(QUEUE_SCRIPT_PREFIX)] df_queue["pyiron_id"] = df_queue.apply( lambda x: int(x["jobname"].split(QUEUE_SCRIPT_PREFIX)[-1]), axis=1 ) queue_running = df_queue[df_queue.status == "running"].pyiron_id.values jobs_now_running_lst = df_submitted.id.values[ np.isin(df_submitted.id.values, queue_running) ] project.db.set_job_status(status="running", job_id=jobs_now_running_lst) fetch_ids = df_combined.id.values[ np.isin(df_combined.id.values, df_queue.pyiron_id.values, invert=True) ] else: # handle empty pyiron queue case for fetching fetch_ids = df_combined.id.values failed_jobs = [] for job_id in fetch_ids: try: job = project.load(job_id) retrieve_job(job, try_collecting=try_collecting) except Exception as e: if ignore_exceptions: state.logger.warning( f"An error occurred while trying to retrieve job {job_id}\n" f"Error message: \n{e}" ) failed_jobs.append(job_id) else: raise e if len(failed_jobs) > 0: return failed_jobs
[docs] def retrieve_job( job: "pyiron_base.jobs.job.generic.GenericJob", try_collecting: bool = False ) -> None: """ Retrieve a job from remote server and check if it has a "finished status". Optionally try to collect its output. Args: job: pyiron job try_collecting (bool): whether to run collect if not finished - default=False Returns: returns None """ job.transfer_from_remote() if job.status in job_status_finished_lst: return if try_collecting: job.status.collect = True job.run()
[docs] def validate_que_request( item: Union[int, "pyiron_base.jobs.job.generic.GenericJob"], ) -> int: """ Internal function to convert the job_ID or hamiltonian to the queuing system ID. Args: item (int, pyiron_base.jobs.job.generic.GenericJob): Provide either the job_ID or the full hamiltonian Returns: int: queuing system ID """ if isinstance(item, int): que_id = item elif static_isinstance( item.__class__, "pyiron_base.jobs.master.generic.GenericMaster" ): if item.server.queue_id: que_id = item.server.queue_id else: queue_id_lst = [ item.project.load(child_id).server.queue_id for child_id in item.child_ids ] que_id = [queue_id for queue_id in queue_id_lst if queue_id is not None] if len(que_id) == 0: raise ValueError("This job does not have a queue ID.") elif static_isinstance(item.__class__, "pyiron_base.jobs.job.generic.GenericJob"): if item.server.queue_id: que_id = item.server.queue_id else: raise ValueError("This job does not have a queue ID.") elif static_isinstance(item.__class__, "pyiron_base.jobs.job.base.JobCore"): if "server" in item.project_hdf5.list_nodes(): server_hdf_dict = item.project_hdf5["server"] if "qid" in server_hdf_dict.keys(): que_id = server_hdf_dict["qid"] else: raise ValueError("This job does not have a queue ID.") else: raise ValueError("This job does not have a queue ID.") else: raise TypeError( "The queue can either query for IDs or for pyiron GenericJobObjects." ) return que_id