# coding: utf-8
# Copyright (c) Max-Planck-Institut für Eisenforschung GmbH - Computational Materials Design (CM) Department
# Distributed under the terms of "New BSD License", see the LICENSE file.
"""
Server object class which is connected to each job containing the technical details how the job is executed.
"""
import numbers
import socket
from concurrent.futures import Executor, Future
from dataclasses import asdict, fields
from typing import Optional, Union
import pandas
from pyiron_dataclasses.v1.jobs.generic import Server as ServerDataClass
from pyiron_snippets.deprecate import deprecate
from pyiron_base.interfaces.has_dict import HasDict
from pyiron_base.interfaces.lockable import Lockable, sentinel
from pyiron_base.jobs.job.extension.server.runmode import Runmode
from pyiron_base.state import state
from pyiron_base.utils.instance import static_isinstance
__author__ = "Jan Janssen"
__copyright__ = (
"Copyright 2020, Max-Planck-Institut für Eisenforschung GmbH - "
"Computational Materials Design (CM) Department"
)
__version__ = "1.0"
__maintainer__ = "Jan Janssen"
__email__ = "janssen@mpie.de"
__status__ = "production"
__date__ = "Sep 1, 2017"
[docs]
class Server(
Lockable, HasDict
): # add the option to return the job id and the hold id to the server object
"""
Generic Server object to handle the execution environment for the job
Args:
host (str): hostname of the local machine
queue (str): queue name of the currently selected queue
cores (int): number of cores
run_mode (pyiron_base.server.runmode.Runmode): mode of the job ['modal', 'non_modal', 'queue', 'manual']
new_hdf (bool): create a new HDF5 file [True/False] - default=True
accept_crash (bool): ignore execution errors raised by external executables - default False
run_time (int): run time limit in seconds for the job to finish - required for HPC job schedulers
memory_limit (str): memory required
qid (int): Queuing system ID - ID received from the HPC job scheduler
additional_arguments (dict): Additional arguments for the HPC job scheduler
conda_environment_name (str): Name of the conda environment
conda_environment_path (str): Path to the conda environment
Attributes:
.. attribute:: host
the hostname of the current system.
.. attribute:: queue
the queue selected for a current simulation.
.. attribute:: cores
the number of cores selected for the current simulation.
.. attribute:: run_time
the run time in seconds selected for the current simulation.
.. attribute:: run_mode
the run mode of the job ['modal', 'non_modal', 'queue', 'manual']
.. attribute:: memory_limit
the maximum amount of RAM allocated for the calculation in GB
.. attribute:: new_hdf
defines whether a subjob should be stored in the same HDF5 file or in a new one.
.. attribute:: executor
the executor can be used to execute the job object
.. attribute:: future
the concurrent.futures.Future object for monitoring the execution of the job object
"""
[docs]
def __init__(
self,
host: Optional[str] = None,
queue: Optional[str] = None,
cores: int = 1,
threads: int = 1,
gpus: Optional[int] = None,
run_mode: str = "modal",
new_hdf: bool = True,
accept_crash: bool = False,
run_time: Optional[int] = None,
memory_limit: Optional[str] = None,
qid: Optional[int] = None,
additional_arguments: dict = {},
conda_environment_name: Optional[str] = None,
conda_environment_path: Optional[str] = None,
):
super().__init__()
self._data = ServerDataClass(
user=state.settings.login_user,
host=self._init_host(host=host),
run_mode=run_mode,
cores=cores,
gpus=gpus,
threads=threads,
new_hdf=new_hdf,
accept_crash=accept_crash,
run_time=run_time,
memory_limit=memory_limit,
queue=queue,
qid=qid,
additional_arguments=additional_arguments,
conda_environment_name=conda_environment_name,
conda_environment_path=conda_environment_path,
)
self._run_mode = Runmode()
self._executor: Union[Executor, None] = None
self._future: Union[Future, None] = None
self.queue = queue
self.run_mode = run_mode
@property
def accept_crash(self) -> bool:
return self._data.accept_crash
@accept_crash.setter
@sentinel
def accept_crash(self, accept: bool) -> None:
self._data.accept_crash = accept
@property
def additional_arguments(self) -> dict:
return self._data.additional_arguments
@additional_arguments.setter
@sentinel
def additional_arguments(self, additional_arguments: dict) -> None:
self._data.additional_arguments = additional_arguments
@property
def queue(self) -> Union[str, None]:
"""
The que selected for a current simulation
Returns:
(str): schedulers_name
"""
return self._data.queue
@queue.setter
@sentinel
def queue(self, new_scheduler: Union[str, None]) -> None:
"""
Set a queue for the current simulation, by choosing one of the options
listed in :attribute:`~.queue_list`.
Args:
new_scheduler (str/None): scheduler name, None resets to default
run_mode modal
"""
if new_scheduler is None:
self._data.queue = None
self.run_mode.modal = True
self.cores = 1
self.threads = 1
self._data.run_time = None
self.memory_limit = None
else:
if state.queue_adapter is not None:
(
cores,
run_time_max,
memory_max,
) = state.queue_adapter.check_queue_parameters(
queue=new_scheduler,
cores=self.cores,
run_time_max=self.run_time,
memory_max=self.memory_limit,
)
if self.cores is not None and cores != self.cores:
self._data.cores = cores
state.logger.warning(f"Updated the number of cores to: {cores}")
if self.run_time is not None and run_time_max != self.run_time:
self._data.run_time = run_time_max
state.logger.warning(
f"Updated the run time limit to: {run_time_max}"
)
if self.memory_limit is not None and memory_max != self.memory_limit:
self._data.memory_limit = memory_max
state.logger.warning(f"Updated the memory limit to: {memory_max}")
self._data.queue = new_scheduler
self.run_mode = "queue"
else:
raise TypeError(
"No queue adapter defined. Have you "
"configured in $PYIRONRESOURCES_PATHS/queues?"
)
@property
def queue_id(self) -> int:
"""
Get the queue ID - the ID in the queuing system is most likely not the same as the job ID.
Returns:
int: queue ID
"""
return self._data.qid
@queue_id.setter
@sentinel
def queue_id(self, qid: int) -> None:
"""
Set the queue ID
Args:
qid (int): queue ID
"""
self._data.qid = int(qid)
@property
def threads(self) -> int:
"""
The number of threads selected for the current simulation
Returns:
(int): number of threads
"""
return self._data.threads
@threads.setter
@sentinel
def threads(self, number_of_threads: int) -> None:
"""
The number of threads selected for the current simulation
Args:
number_of_threads (int): number of threads
"""
self._data.threads = number_of_threads
@property
def gpus(self) -> Union[int, None]:
"""
Total number of GPUs to use for this calculation.
Returns:
int: Total number of GPUs to use for this calculation.
"""
return self._data.gpus
@gpus.setter
@sentinel
def gpus(self, number_of_gpus: int) -> None:
"""
Total number of GPUs to use for this calculation.
Args:
number_of_gpus (int): Total number of GPUs to use for this calculation.
"""
self._data.gpus = number_of_gpus
@property
def cores(self) -> int:
"""
The number of cores selected for the current simulation
Returns:
(int): number of cores
"""
return self._data.cores
@cores.setter
@sentinel
def cores(self, new_cores: int) -> None:
"""
The number of cores selected for the current simulation
Args:
new_cores (int): number of cores
"""
if not isinstance(new_cores, numbers.Integral):
converted = int(new_cores)
# if the conversion truncated the number, raise error otherwise silently accept it
if new_cores != converted:
raise ValueError(f"cores must be an integer number, not {new_cores}!")
new_cores = converted
if state.queue_adapter is not None and self._data.queue is not None:
cores = state.queue_adapter.check_queue_parameters(
queue=self.queue,
cores=new_cores,
run_time_max=self.run_time,
memory_max=self.memory_limit,
)[0]
if cores != new_cores:
self._data.cores = cores
state.logger.warning(f"Updated the number of cores to: {cores}")
else:
self._data.cores = new_cores
else:
self._data.cores = new_cores
@property
def run_time(self) -> int:
"""
The run time in seconds selected for the current simulation
Returns:
(int): run time in seconds
"""
return self._data.run_time
@run_time.setter
@sentinel
def run_time(self, new_run_time: int) -> None:
"""
The run time in seconds selected for the current simulation
Args:
new_run_time (int): run time in seconds
Raises:
ValueError: if new_run_time cannot be converted to int
"""
new_run_time = int(new_run_time)
if state.queue_adapter is not None and self._data.queue is not None:
run_time_max = state.queue_adapter.check_queue_parameters(
queue=self.queue,
cores=self.cores,
run_time_max=new_run_time,
memory_max=self.memory_limit,
)[1]
if run_time_max != new_run_time:
self._data.run_time = run_time_max
state.logger.warning(f"Updated the run time limit to: {run_time_max}")
else:
self._data.run_time = new_run_time
else:
self._data.run_time = new_run_time
@property
def memory_limit(self) -> str:
return self._data.memory_limit
@memory_limit.setter
@sentinel
def memory_limit(self, limit: str) -> None:
if state.queue_adapter is not None and self._data.queue is not None:
memory_max = state.queue_adapter.check_queue_parameters(
queue=self.queue,
cores=self.cores,
run_time_max=self.run_time,
memory_max=limit,
)[2]
if memory_max != limit:
self._data.memory_limit = memory_max
state.logger.warning(f"Updated the memory limit to: {memory_max}")
else:
self._data.memory_limit = limit
else:
self._data.memory_limit = limit
@property
def run_mode(self) -> str:
"""
Get the run mode of the job
Returns:
(str/pyiron_base.server.runmode.Runmode): ['modal', 'non_modal', 'queue', 'manual', 'thread', 'worker',
'interactive', 'interactive_non_modal', 'srun', 'executor']
"""
return self._run_mode
@run_mode.setter
@sentinel
def run_mode(self, new_mode: str) -> None:
"""
Set the run mode of the job
Args:
new_mode (str): ['modal', 'non_modal', 'queue', 'manual', 'thread', 'worker', 'interactive',
'interactive_non_modal', 'srun', 'executor']
"""
self._run_mode.mode = new_mode
if new_mode == "queue":
if state.queue_adapter is None:
raise TypeError("No queue adapter defined.")
if self._data.queue is None:
self.queue = state.queue_adapter.config["queue_primary"]
@property
def new_hdf(self) -> bool:
"""
New_hdf5 defines whether a subjob should be stored in the same HDF5 file or in a new one.
Returns:
(bool): [True / False]
"""
return self._data.new_hdf
@new_hdf.setter
@sentinel
def new_hdf(self, new_hdf_bool: bool) -> None:
"""
New_hdf5 defines whether a subjob should be stored in the same HDF5 file or in a new one.
Args:
new_hdf_bool (bool): [True / False]
"""
if isinstance(new_hdf_bool, bool):
self._data.new_hdf = new_hdf_bool
else:
raise TypeError(
"The new_hdf5 is a boolean property, defining whether subjobs are stored in the same file."
)
@property
def queue_list(self) -> list:
"""
List the available Job scheduler provided by the system.
Returns:
(list)
"""
return self.list_queues()
@property
def queue_view(self) -> pandas.DataFrame:
"""
List the available Job scheduler provided by the system.
Returns:
(pandas.DataFrame)
"""
return self.view_queues()
@property
def executor(self) -> Union[Executor, None]:
"""
Executor to execute the job object this server object is attached to in the background.
Returns:
concurrent.futures.Executor:
"""
if not self.run_mode.executor and self._executor is not None:
self._executor = None
return self._executor
@executor.setter
@sentinel
def executor(self, exe: Union[Executor, None]):
"""
Executor to execute the job object this server object is attached to in the background.
Args:
exe (concurrent.futures.Executor):
"""
if isinstance(exe, Executor):
self.run_mode.executor = True
elif static_isinstance(exe, "flux.job.executor.FluxExecutor"):
self.run_mode.executor = True
elif exe is None and self.run_mode.executor:
self.run_mode.modal = True
elif exe is not None:
raise TypeError(
"The executor has to be derived from the concurrent.futures.Executor class."
)
self._executor = exe
@property
def future(self) -> Union[Future, None]:
"""
Python concurrent.futures.Future object to track the status of the execution of the job this server object is
attached to. This is an internal pyiron feature and most users never have to interact with the future object
directly.
Returns:
concurrent.futures.Future: future object to track the status of the execution
"""
return self._future
# We don't wrap future in sentinel, to allow it later to be dropped to
# None, once execution is finished
@future.setter
def future(self, future_obj: Future) -> None:
"""
Set a python concurrent.futures.Future object to track the status of the execution of the job this server object
is attached to. This is an internal pyiron feature and most users never have to interact with the future object
directly.
Args:
future_obj (concurrent.futures.Future): future object to track the status of the execution
"""
self._future = future_obj
@property
def conda_environment_name(self) -> str:
"""
Get name of the conda environment the job should be executed in.
Returns:
str: name of the conda environment
"""
return self._data.conda_environment_name
@conda_environment_name.setter
@sentinel
def conda_environment_name(self, environment_name: str) -> None:
"""
Set name of the conda environment the job should be executed in.
Args:
environment_name (str): name of the conda environment
"""
self._data.conda_environment_name = environment_name
@property
def conda_environment_path(self) -> str:
"""
Get path of the conda environment the job should be executed in.
Returns:
str: path of the conda environment
"""
return self._data.conda_environment_path
@conda_environment_path.setter
@sentinel
def conda_environment_path(self, environment_path: str) -> None:
"""
Set path of the conda environment the job should be executed in.
Args:
environment_path (str): path of the conda environment
"""
self._data.conda_environment_path = environment_path
[docs]
@staticmethod
def list_queues() -> list:
"""
List the available Job scheduler provided by the system.
Returns:
(list)
"""
if state.queue_adapter is not None:
return state.queue_adapter.queue_list
else:
return None
[docs]
@staticmethod
def view_queues() -> pandas.DataFrame:
"""
List the available Job scheduler provided by the system.
Returns:
(pandas.DataFrame)
"""
if state.queue_adapter is not None:
return state.queue_adapter.queue_view
else:
return None
def _to_dict(self) -> dict:
"""
Convert the Server object to a dictionary.
Returns:
dict: The Server object as a dictionary.
"""
self._data.run_mode = self._run_mode.mode
return asdict(self._data)
def _from_dict(self, obj_dict: dict, version: Optional[str] = None) -> None:
"""
Load the Server object from a dictionary.
Args:
obj_dict (dict): The dictionary containing the Server object data.
version (str, optional): The version of the Server object. Defaults to None.
"""
# backwards compatibility
if "new_h5" in obj_dict.keys():
obj_dict["new_hdf"] = obj_dict.pop("new_h5") == 1
for key in ["conda_environment_name", "conda_environment_path", "qid"]:
if key not in obj_dict.keys():
obj_dict[key] = None
if "accept_crash" not in obj_dict.keys():
obj_dict["accept_crash"] = False
if "additional_arguments" not in obj_dict.keys():
obj_dict["additional_arguments"] = {}
# Reload dataclass and discard unknown keys
server_fields = tuple(f.name for f in fields(ServerDataClass))
# force tuple otherwise dict complains about changing size
for key in tuple(obj_dict):
if key not in server_fields:
del obj_dict[key]
self._data = ServerDataClass(**obj_dict)
self._run_mode = Runmode(mode=self._data.run_mode)
[docs]
@deprecate(message="Use job.server.to_dict() instead of to_hdf()", version=0.9)
def to_hdf(
self,
hdf: "pyiron_base.storage.hdfio.ProjectHDFio",
group_name: Optional[str] = None,
) -> None:
"""
Store Server object in HDF5 file
Args:
hdf: HDF5 object
group_name (str): node name in the HDF5 file
"""
if group_name is not None:
with hdf.open(group_name) as hdf_group:
hdf_group["server"] = self.to_dict()
else:
hdf["server"] = self.to_dict()
[docs]
@deprecate(message="Use job.server.from_dict() instead of from_hdf()", version=0.9)
def from_hdf(
self,
hdf: "pyiron_base.storage.hdfio.ProjectHDFio",
group_name: Optional[str] = None,
) -> None:
"""
Recover Server object in HDF5 file
Args:
hdf: HDF5 object
group_name: node name in the HDF5 file
"""
if group_name is not None:
with hdf.open(group_name) as hdf_group:
self.from_dict(obj_dict=hdf_group["server"])
else:
self.from_dict(obj_dict=hdf["server"])
[docs]
def db_entry(self) -> str:
"""
connect all the info regarding the server into a single word that can be used e.g. as entry in a database
Returns:
(str): server info as single word
"""
if self.run_mode.queue:
server_lst = [self._data.host, str(self.cores), self.queue]
else:
server_lst = [self._data.host, str(self.cores)]
return self._data.user + "@" + "#".join(server_lst)
def __del__(self) -> None:
"""
Delete the Server object from memory
"""
del self._run_mode
del self._data
@staticmethod
def _init_host(host) -> Union[str, None]:
if host is None:
return socket.gethostname()
else:
return host