import os
from typing import Callable, Optional, Union
import pandas
import yaml
from jinja2 import Template
from jinja2.exceptions import TemplateSyntaxError
from pysqa.base.core import QueueAdapterCore, execute_command
from pysqa.base.validate import check_queue_parameters, value_error_if_none
try:
    from pysqa.base.models import validate_config
except ImportError:
    # pysqa.base.models is treated as optional: when it cannot be imported,
    # fall back to a no-op validator so the configuration is used as given.
    def validate_config(config: dict) -> dict:
        """
        Fallback configuration validator used when pysqa.base.models is unavailable.

        Args:
            config (dict): The queue adapter configuration.

        Returns:
            dict: The very same ``config`` object, neither validated nor copied.
        """
        return config
class Queues:
    """
    Queues is an abstract class simply to make the list of queues available for auto completion. This is mainly used in
    interactive environments like jupyter.

    ``dir()`` on an instance lists the queue names, and accessing a known queue name as an attribute returns that name.
    """

    def __init__(self, list_of_queues: list[str]):
        """
        Initialize the Queues object.

        Args:
            list_of_queues (list[str]): A list of queue names.
        """
        self._list_of_queues = list_of_queues

    def __getattr__(self, item: str) -> str:
        """
        Resolve an attribute name to the queue of the same name.

        Python only calls this when regular attribute lookup fails.

        Args:
            item (str): The name of the queue.

        Returns:
            str: The name of the queue (identical to ``item``).

        Raises:
            AttributeError: If the queue name is not in the list of queues.
        """
        # Read the backing list straight from the instance __dict__. Going through
        # ``self._list_of_queues`` would re-enter __getattr__ whenever that attribute
        # is not set yet (e.g. on objects created by copy/pickle without __init__),
        # recursing until RecursionError instead of raising AttributeError.
        list_of_queues = self.__dict__.get("_list_of_queues", ())
        if item in list_of_queues:
            return item
        raise AttributeError(
            f"{type(self).__name__!r} object has no queue or attribute {item!r}"
        )

    def __dir__(self) -> list[str]:
        """
        Get the list of queues.

        Returns:
            list[str]: A copy of the list of queues, so callers cannot mutate the internal list.
        """
        return list(self._list_of_queues)
class QueueAdapterWithConfig(QueueAdapterCore):
    """
    The goal of the QueueAdapter class is to make submitting to a queue system as easy as starting another sub process
    locally.

    Args:
        config (dict): Configuration for the QueueAdapter.
        directory (str): Directory containing the queue.yaml files as well as corresponding jinja2 templates for the
            individual queues. A leading "~" is expanded to the user's home directory.
        execute_command(funct): Function to execute commands.
    """

    def __init__(
        self,
        config: dict,
        directory: str = "~/.queues",
        execute_command: Callable = execute_command,
    ):
        """
        Validate the configuration, normalize the queue entries and load their templates.

        Args:
            config (dict): Configuration for the QueueAdapter; the keys "queue_type" and "queues" are read.
            directory (str, optional): Directory the "script" entries of the queues are resolved against.
                Defaults to "~/.queues".
            execute_command (Callable, optional): Function to execute commands.
        """
        self._config = validate_config(config)
        super().__init__(
            queue_type=self._config["queue_type"], execute_command=execute_command
        )
        # Both helpers modify the per-queue dictionaries inside self._config in place.
        self._fill_queue_dict(queue_lst_dict=self._config["queues"])
        self._load_templates(queue_lst_dict=self._config["queues"], directory=directory)
        self._queues = Queues(self.queue_list)
        self._remote_flag = False
        self._ssh_delete_file_on_remote = True

    @property
    def ssh_delete_file_on_remote(self) -> bool:
        """
        Get the value of ssh_delete_file_on_remote (initialized to True).

        Returns:
            bool: The value of ssh_delete_file_on_remote.
        """
        return self._ssh_delete_file_on_remote

    @property
    def remote_flag(self) -> bool:
        """
        Get the value of remote_flag (initialized to False).

        Returns:
            bool: The value of remote_flag.
        """
        return self._remote_flag

    @property
    def config(self) -> dict:
        """
        Get the QueueAdapter configuration.

        Returns:
            dict: The validated QueueAdapter configuration.
        """
        return self._config

    @property
    def queue_list(self) -> list:
        """
        Get the list of available queues.

        Returns:
            list: The names of the configured queues.
        """
        return list(self._config["queues"].keys())

    @property
    def queue_view(self) -> pandas.DataFrame:
        """
        Get the Pandas DataFrame representation of the available queues.

        Returns:
            pandas.DataFrame: One row per queue; the "script" and "template" columns are omitted.
        """
        # errors="ignore": the "script"/"template" columns only exist when at least one
        # queue defines them; without it DataFrame.drop raises KeyError for script-less configs.
        return pandas.DataFrame(self._config["queues"]).T.drop(
            ["script", "template"], axis=1, errors="ignore"
        )

    @property
    def queues(self) -> Queues:
        """
        Get the available queues.

        Returns:
            Queues: The available queues, exposed as attributes for auto completion.
        """
        return self._queues

    def get_job_from_remote(self, working_directory: str) -> None:
        """
        Get the results of the calculation - this is necessary when the calculation was executed on a remote host.

        Args:
            working_directory (str): The working directory where the calculation was executed.

        Raises:
            NotImplementedError: Always; not supported by this adapter.
        """
        raise NotImplementedError

    def convert_path_to_remote(self, path: str):
        """
        Converts a local file path to a remote file path.

        Args:
            path (str): The local file path to be converted.

        Returns:
            str: The converted remote file path.

        Raises:
            NotImplementedError: Always; not supported by this adapter.
        """
        raise NotImplementedError

    def transfer_file(
        self,
        file: str,
        transfer_back: bool = False,
        delete_file_on_remote: bool = False,
    ):
        """
        Transfer a file to a remote location.

        Args:
            file (str): The path of the file to be transferred.
            transfer_back (bool, optional): Whether to transfer the file back after processing. Defaults to False.
            delete_file_on_remote (bool, optional): Whether to delete the file on the remote location after transfer.
                Defaults to False.

        Raises:
            NotImplementedError: Always; not supported by this adapter.
        """
        raise NotImplementedError

    def check_queue_parameters(
        self,
        queue: Optional[str],
        cores: int = 1,
        run_time_max: Optional[int] = None,
        memory_max: Optional[Union[int, str]] = None,
        active_queue: Optional[dict] = None,
    ) -> tuple[
        Union[float, int, None], Union[float, int, None], Union[float, int, str, None]
    ]:
        """
        Check the parameters of a queue.

        Args:
            queue (str): The queue to check; only used when ``active_queue`` is None.
            cores (int, optional): The number of cores. Defaults to 1.
            run_time_max (int, optional): The maximum run time. Defaults to None.
            memory_max (int | str, optional): The maximum memory. Defaults to None.
            active_queue (dict, optional): The settings of the queue to check against. Defaults to None, in which
                case the settings of ``queue`` are looked up in the configuration.

        Returns:
            tuple: The checked (cores, run_time_max, memory_max), as returned by
                pysqa.base.validate.check_queue_parameters.

        Raises:
            KeyError: If ``active_queue`` is None and ``queue`` is not a configured queue.
        """
        if active_queue is None:
            active_queue = self._config["queues"][queue]
        return check_queue_parameters(
            active_queue=active_queue,
            cores=cores,
            run_time_max=run_time_max,
            memory_max=memory_max,
        )

    def _job_submission_template(
        self,
        queue: Optional[str] = None,
        submission_template: Optional[Union[str, Template]] = None,
        job_name: str = "job.py",
        working_directory: str = ".",
        cores: int = 1,
        memory_max: Optional[Union[int, str]] = None,
        run_time_max: Optional[int] = None,
        dependency_list: Optional[list[int]] = None,
        command: str = "",
        **kwargs,
    ) -> str:
        """
        Generate the job submission template.

        Args:
            queue (str, optional): The queue name. Defaults to the "queue_primary" entry of the configuration.
            submission_template (str | Template, optional): Ignored - the template loaded for ``queue`` is used.
            job_name (str, optional): The job name. Defaults to "job.py".
            working_directory (str, optional): The working directory. Defaults to ".".
            cores (int, optional): The number of cores. Defaults to 1.
            memory_max (int | str, optional): The maximum memory. Defaults to None.
            run_time_max (int, optional): The maximum run time. Defaults to None.
            dependency_list (list[int], optional): The list of dependency job IDs. Defaults to None.
            command (str, optional): The command to be executed. Defaults to "".
            **kwargs: Additional arguments forwarded to the parent implementation.

        Returns:
            str: The job submission template as produced by the parent class.

        Raises:
            ValueError: If the queue is unknown or no valid number of cores is returned by the parameter check.
                ``command`` is additionally checked by ``value_error_if_none``.
        """
        if queue is None:
            queue = self._config["queue_primary"]
        value_error_if_none(value=command)
        if queue not in self.queue_list:
            raise ValueError(
                "The queue "
                + queue
                + " was not found in the list of queues: "
                + str(self.queue_list)
            )
        active_queue = self._config["queues"][queue]
        cores_checked, run_time_max_checked, memory_max_checked = (
            self.check_queue_parameters(
                queue=None,
                cores=cores,
                run_time_max=run_time_max,
                memory_max=memory_max,
                active_queue=active_queue,
            )
        )
        if cores_checked is None:
            raise ValueError(
                f"No valid number of cores could be determined for the queue {queue} "
                f"(requested cores: {cores})."
            )
        return super()._job_submission_template(
            queue=None,
            submission_template=active_queue["template"],
            job_name=job_name,
            working_directory=working_directory,
            cores=int(cores_checked),
            # The checks may return floats; the templates expect whole numbers.
            memory_max=(
                int(memory_max_checked)
                if isinstance(memory_max_checked, float)
                else memory_max_checked
            ),
            run_time_max=(
                int(run_time_max_checked)
                if isinstance(run_time_max_checked, float)
                else run_time_max_checked
            ),
            dependency_list=dependency_list,
            command=command,
            **kwargs,
        )

    @staticmethod
    def _fill_queue_dict(queue_lst_dict: dict) -> None:
        """
        Fill missing keys in the queue dictionary with None values.

        Every queue lacking "cores_min", "cores_max", "run_time_max" or "memory_max" gets that key set to None;
        existing values are left untouched.

        Args:
            queue_lst_dict (dict): Mapping of queue name to queue settings; modified in place.
        """
        # A fixed key order (instead of iterating a set) keeps the resulting key order of
        # each queue dict deterministic across interpreter runs.
        for queue_dict in queue_lst_dict.values():
            for key in ("cores_min", "cores_max", "run_time_max", "memory_max"):
                queue_dict.setdefault(key, None)

    @staticmethod
    def _load_templates(queue_lst_dict: dict, directory: str = ".") -> None:
        """
        Load the queue templates from files and store them in the queue dictionary.

        For every queue with a non-None "script" entry, the file ``directory/script`` is read and compiled into a
        jinja2 Template stored under the queue's "template" key.

        Args:
            queue_lst_dict (dict): Mapping of queue name to queue settings; modified in place.
            directory (str, optional): The directory where the queue template files are located; a leading "~" is
                expanded. Defaults to ".".

        Raises:
            TemplateSyntaxError: If a template file is not valid jinja2; the message is prefixed with the script name.
        """
        # open() does not expand "~", so a directory like the "~/.queues" default of
        # __init__ would otherwise never be found.
        directory = os.path.expanduser(directory)
        for queue_dict in queue_lst_dict.values():
            script = queue_dict.get("script")
            if script is None:
                continue
            with open(os.path.join(directory, script)) as f:
                source = f.read()
            try:
                queue_dict["template"] = Template(source)
            except TemplateSyntaxError as error:
                raise TemplateSyntaxError(
                    message="File: " + script + " - " + error.message,
                    lineno=error.lineno,
                ) from error
def read_config(file_name: str = "queue.yaml") -> dict:
    """
    Read and parse a YAML configuration file.

    Args:
        file_name (str): The name of the YAML file to read.

    Returns:
        dict: The parsed configuration as a dictionary.
    """
    # Open in binary mode so PyYAML detects the encoding itself (UTF-8, or UTF-16 via
    # BOM) instead of decoding with the platform's locale encoding.
    # NOTE(review): FullLoader resolves more tags than SafeLoader; acceptable for the
    # user's own queue configuration, but do not point this at untrusted files.
    with open(file_name, "rb") as f:
        return yaml.load(f, Loader=yaml.FullLoader)