import os
from typing import Callable, Optional, Union
import pandas
from jinja2 import Template
from pysqa.base.abstract import QueueAdapterAbstractClass
from pysqa.base.config import QueueAdapterWithConfig, Queues, read_config
from pysqa.base.core import QueueAdapterCore, execute_command
from pysqa.base.modular import ModularQueueAdapter
[docs]
class QueueAdapter(QueueAdapterAbstractClass):
"""
The goal of the QueueAdapter class is to make submitting to a queue system as easy as starting another sub process
locally.
Args:
directory (str): directory containing the queue.yaml files as well as corresponding jinja2 templates for the
individual queues.
Attributes:
.. attribute:: config
QueueAdapter configuration read from the queue.yaml file.
.. attribute:: queue_list
List of available queues
.. attribute:: queue_view
Pandas DataFrame representation of the available queues, read from queue.yaml.
.. attribute:: queues
Queues available for auto completion QueueAdapter().queues.<queue name> returns the queue name.
"""
[docs]
def __init__(
self,
directory: Optional[str] = None,
queue_type: Optional[str] = None,
execute_command: Callable = execute_command,
):
"""
Initialize the QueueAdapter.
Args:
directory (str): Directory containing the queue.yaml files and corresponding templates.
execute_command (Callable): Function to execute commands.
"""
if directory is not None:
queue_yaml = os.path.join(directory, "queue.yaml")
clusters_yaml = os.path.join(directory, "clusters.yaml")
if os.path.exists(queue_yaml):
self._queue_dict = {
"default": set_queue_adapter(
config=read_config(file_name=queue_yaml),
directory=directory,
execute_command=execute_command,
)
}
primary_queue = "default"
elif os.path.exists(clusters_yaml):
config = read_config(file_name=clusters_yaml)
self._queue_dict = {
k: set_queue_adapter(
config=read_config(file_name=os.path.join(directory, v)),
directory=directory,
execute_command=execute_command,
)
for k, v in config["cluster"].items()
}
primary_queue = config["cluster_primary"]
else:
raise ValueError(
"Neither a queue.yaml file nor a clusters.yaml file were found in "
+ directory
)
self._adapter = self._queue_dict[primary_queue]
elif queue_type is not None:
self._queue_dict = {}
self._adapter = QueueAdapterCore(
queue_type=queue_type.upper(),
execute_command=execute_command,
)
else:
raise ValueError()
[docs]
def list_clusters(self) -> list[str]:
"""
List available computing clusters for remote submission
Returns:
List of computing clusters
"""
return list(self._queue_dict.keys())
[docs]
def switch_cluster(self, cluster_name: str):
"""
Switch to a different computing cluster
Args:
cluster_name (str): name of the computing cluster
"""
self._adapter = self._queue_dict[cluster_name]
@property
def config(self) -> Union[dict, None]:
"""
Get the QueueAdapter configuration.
Returns:
dict: The QueueAdapter configuration.
"""
if isinstance(self._adapter, QueueAdapterWithConfig):
return self._adapter.config
else:
return None
@property
def ssh_delete_file_on_remote(self) -> bool:
"""
Get the value of ssh_delete_file_on_remote property.
Returns:
bool: The value of ssh_delete_file_on_remote property.
"""
if isinstance(self._adapter, QueueAdapterWithConfig):
return self._adapter.ssh_delete_file_on_remote
else:
return False
@property
def remote_flag(self) -> bool:
"""
Get the value of remote_flag property.
Returns:
bool: The value of remote_flag property.
"""
if isinstance(self._adapter, QueueAdapterWithConfig):
return self._adapter.remote_flag
else:
return False
@property
def queue_list(self) -> Union[list[str], None]:
"""
Get the list of available queues.
Returns:
List[str]: The list of available queues.
"""
if isinstance(self._adapter, QueueAdapterWithConfig):
return self._adapter.queue_list
else:
return None
@property
def queue_view(self) -> Union[pandas.DataFrame, None]:
"""
Get the Pandas DataFrame representation of the available queues.
Returns:
pandas.DataFrame: The Pandas DataFrame representation of the available queues.
"""
if isinstance(self._adapter, QueueAdapterWithConfig):
return self._adapter.queue_view
else:
return None
@property
def queues(self) -> Union[Queues, None]:
"""
Get the list of available queues.
Returns:
List[str]: The list of available queues.
"""
if isinstance(self._adapter, QueueAdapterWithConfig):
return self._adapter.queues
else:
return None
[docs]
def submit_job(
self,
queue: Optional[str] = None,
job_name: Optional[str] = None,
working_directory: Optional[str] = None,
cores: Optional[int] = None,
memory_max: Optional[Union[int, str]] = None,
run_time_max: Optional[int] = None,
dependency_list: Optional[list[int]] = None,
command: Optional[str] = None,
submission_template: Optional[Union[str, Template]] = None,
**kwargs,
) -> int:
"""
Submits command to the given queue.
Args:
queue (str/None): Name of the queue to submit to, must be one of the names configured for this adapter
(optional)
job_name (str/None): Name of the job for the underlying queuing system (optional)
working_directory (str/None): Directory to run the job in (optional)
cores (int/None): Number of hardware threads requested (optional)
memory_max (int/None): Amount of memory requested per node in GB (optional)
run_time_max (int/None): Maximum runtime in seconds (optional)
dependency_list(list[str]/None: Job ids of jobs to be completed before starting (optional)
command (str/None): shell command to run in the job
**kwargs: allows writing additional parameters to the job submission script if they are available in the
corresponding template.
Returns:
int: Job id received from the queuing system for the job which was submitted
"""
return self._adapter.submit_job(
queue=queue,
job_name=job_name,
working_directory=working_directory,
cores=cores,
memory_max=memory_max,
run_time_max=run_time_max,
dependency_list=dependency_list,
command=command,
submission_template=submission_template,
**kwargs,
)
[docs]
def enable_reservation(self, process_id: int) -> str:
"""
Enable reservation for a process.
Args:
process_id (int): The process id.
Returns:
str: The result of enabling reservation.
"""
return self._adapter.enable_reservation(process_id=process_id)
[docs]
def get_job_from_remote(self, working_directory: str):
"""
Get the results of the calculation - this is necessary when the calculation was executed on a remote host.
Args:
working_directory (str): The working directory.
"""
if isinstance(self._adapter, QueueAdapterWithConfig):
self._adapter.get_job_from_remote(working_directory=working_directory)
else:
raise TypeError()
[docs]
def transfer_file_to_remote(
self,
file: str,
transfer_back: bool = False,
delete_file_on_remote: bool = False,
):
"""
Transfer file from remote host to local host.
Args:
file (str): The file to transfer.
transfer_back (bool): Whether to transfer the file back.
delete_file_on_remote (bool): Whether to delete the file on the remote host.
"""
if isinstance(self._adapter, QueueAdapterWithConfig):
self._adapter.transfer_file(
file=file,
transfer_back=transfer_back,
delete_file_on_remote=delete_file_on_remote,
)
else:
raise TypeError()
[docs]
def convert_path_to_remote(self, path: str) -> str:
"""
Convert a local path to a remote path.
Args:
path (str): The local path.
Returns:
str: The remote path.
"""
if isinstance(self._adapter, QueueAdapterWithConfig):
return self._adapter.convert_path_to_remote(path=path)
else:
raise TypeError()
[docs]
def delete_job(self, process_id: int) -> str:
"""
Delete a job.
Args:
process_id (int): The process id.
Returns:
str: The result of deleting the job.
"""
return self._adapter.delete_job(process_id=process_id)
[docs]
def get_queue_status(self, user: Optional[str] = None) -> pandas.DataFrame:
"""
Get the status of the queue.
Args:
user (str/None): The user.
Returns:
pandas.DataFrame: The status of the queue.
"""
return self._adapter.get_queue_status(user=user)
[docs]
def get_status_of_my_jobs(self) -> pandas.DataFrame:
"""
Get the status of the user's jobs.
Returns:
pandas.DataFrame: The status of the user's jobs.
"""
return self._adapter.get_status_of_my_jobs()
[docs]
def get_status_of_job(self, process_id: int) -> str:
"""
Get the status of a job.
Args:
process_id: The process id.
Returns:
str: The status of the job. Possible values are ['running', 'pending', 'error'].
"""
return self._adapter.get_status_of_job(process_id=process_id)
[docs]
def get_status_of_jobs(self, process_id_lst: list[int]) -> list[str]:
"""
Get the status of multiple jobs.
Args:
process_id_lst: The list of process ids.
Returns:
List[str]: The status of the jobs. Possible values are ['running', 'pending', 'error', ...].
"""
return self._adapter.get_status_of_jobs(process_id_lst=process_id_lst)
[docs]
def check_queue_parameters(
self,
queue: str,
cores: int = 1,
run_time_max: Optional[int] = None,
memory_max: Optional[int] = None,
active_queue: Optional[dict] = None,
) -> tuple[
Union[float, int, None], Union[float, int, None], Union[float, int, str, None]
]:
"""
Check the parameters of a queue.
Args:
queue (str/None): The queue name.
cores (int): The number of cores.
run_time_max (int/None): The maximum runtime.
memory_max (int/None): The maximum memory.
active_queue (dict/None): The active queue.
Returns:
List: A list containing the checked parameters [cores, run_time_max, memory_max].
"""
if isinstance(self._adapter, QueueAdapterWithConfig):
return self._adapter.check_queue_parameters(
queue=queue,
cores=cores,
run_time_max=run_time_max,
memory_max=memory_max,
active_queue=active_queue,
)
else:
return cores, run_time_max, memory_max
[docs]
def set_queue_adapter(
config: dict, directory: str, execute_command: Callable = execute_command
):
"""
Initialize the queue adapter
Args:
config (dict): configuration for one cluster
directory (str): directory which contains the queue configurations
"""
if config["queue_type"] in ["SGE", "TORQUE", "SLURM", "LSF", "MOAB", "FLUX"]:
return QueueAdapterWithConfig(
config=config, directory=directory, execute_command=execute_command
)
elif config["queue_type"] in ["GENT"]:
return ModularQueueAdapter(
config=config, directory=directory, execute_command=execute_command
)
elif config["queue_type"] in ["REMOTE"]:
# The RemoteQueueAdapter has additional dependencies, namely paramiko and tqdm.
# By moving the import to this line it only fails when the user specifies the
# RemoteQueueAdapter in their pysqa configuration.
from pysqa.base.remote import RemoteQueueAdapter
return RemoteQueueAdapter(
config=config, directory=directory, execute_command=execute_command
)
else:
raise ValueError(
"The queue_type "
+ config["queue_type"]
+ " is not found in the list of supported queue types "
+ str(["SGE", "TORQUE", "SLURM", "LSF", "MOAB", "FLUX", "GENT", "REMOTE"])
)