Source code for pysqa.base.core

import getpass
import importlib
import os
import subprocess
from typing import Callable, Optional, Union

import pandas
from jinja2 import Template

from pysqa.base.abstract import QueueAdapterAbstractClass
from pysqa.wrapper.abstract import SchedulerCommands

# Registry of the supported queuing systems. Each key is the upper-case queue
# type; the value names the wrapper class ("class_name") and the module that
# provides it ("module_name"). "REMOTE" has no wrapper, so both entries are None.
queue_type_dict: dict[str, dict[str, Union[str, None]]] = {
    "SGE": {"class_name": "SunGridEngineCommands", "module_name": "pysqa.wrapper.sge"},
    "TORQUE": {"class_name": "TorqueCommands", "module_name": "pysqa.wrapper.torque"},
    "SLURM": {"class_name": "SlurmCommands", "module_name": "pysqa.wrapper.slurm"},
    "LSF": {"class_name": "LsfCommands", "module_name": "pysqa.wrapper.lsf"},
    "MOAB": {"class_name": "MoabCommands", "module_name": "pysqa.wrapper.moab"},
    "GENT": {"class_name": "GentCommands", "module_name": "pysqa.wrapper.gent"},
    "REMOTE": {"class_name": None, "module_name": None},
    "FLUX": {"class_name": "FluxCommands", "module_name": "pysqa.wrapper.flux"},
}


def execute_command(
    commands: Union[str, list[str]],
    working_directory: Optional[str] = None,
    split_output: bool = True,
    shell: bool = False,
    error_filename: str = "pysqa.err",
) -> Union[str, list[str], None]:
    """
    Run a command with subprocess.check_output and return what it printed.

    Standard error is merged into standard output. When the command exits with a
    non-zero status, the captured output is written to an error file and None is
    returned instead of raising.

    Args:
        commands (str/list): The command(s) to be executed on the command line
        working_directory (str, optional): The directory where the command is executed. Defaults to None.
        split_output (bool, optional): Split the output at newline characters. Defaults to True.
        shell (bool, optional): Join a list of commands into a single string, which is then executed
            through the shell. Defaults to False.
        error_filename (str, optional): File the output is written to when the command fails; it is
            placed inside working_directory when one is given. Defaults to "pysqa.err".

    Returns:
        Union[str, List[str], None]: The output as a string, as a list of lines when split_output is set,
            or None when the command failed.
    """
    if shell and isinstance(commands, list):
        commands = " ".join(commands)
    # Whatever is not a list at this point is a single string and goes through the shell.
    run_in_shell = not isinstance(commands, list)
    try:
        output = subprocess.check_output(
            commands,
            cwd=working_directory,
            stderr=subprocess.STDOUT,
            universal_newlines=True,
            shell=run_in_shell,
        )
    except subprocess.CalledProcessError as error:
        error_path = (
            error_filename
            if working_directory is None
            else os.path.join(working_directory, error_filename)
        )
        with open(error_path, "w") as error_file:
            print(error.stdout, file=error_file)
        return None
    if split_output:
        return output.split("\n")
    return output
def get_queue_commands(queue_type: str) -> Union[SchedulerCommands, None]:
    """
    Instantiate the commands class that belongs to a queuing system.

    Args:
        queue_type (str): Type of the queuing system in capital letters

    Returns:
        SchedulerCommands: Instance of the queuing system commands class, or None when the
            queue type is registered without a wrapper module / class.

    Raises:
        ValueError: If queue_type is not a key of queue_type_dict.
    """
    if queue_type not in queue_type_dict:
        supported_types = str(list(queue_type_dict.keys()))
        raise ValueError(
            "The queue_type "
            + queue_type
            + " is not found in the list of supported queue types "
            + supported_types
        )
    entry = queue_type_dict[queue_type]
    class_name = entry["class_name"]
    module_name = entry["module_name"]
    if module_name is None or class_name is None:
        return None
    # The wrapper module is imported lazily, only for the requested queue type.
    wrapper_module = importlib.import_module(module_name)
    return getattr(wrapper_module, class_name)()
class QueueAdapterCore(QueueAdapterAbstractClass):
    """
    The goal of the QueueAdapter class is to make submitting to a queue system as easy as starting another sub process
    locally.

    Args:
        queue_type (str): Type of the queuing system in capital letters
        execute_command (funct): Function to execute commands.
    """

    def __init__(
        self,
        queue_type: str,
        execute_command: Callable = execute_command,
    ):
        """
        Load the scheduler commands and the default submission template for the queue type.

        Args:
            queue_type (str): Type of the queuing system in capital letters
            execute_command (funct): Function to execute commands.
        """
        self._commands = get_queue_commands(queue_type=queue_type)
        module_name = queue_type_dict[queue_type]["module_name"]
        # Queue types registered without a wrapper module (module_name is None) have no default
        # template; self._submission_template is intentionally left unset for them.
        if module_name is not None:
            self._submission_template = importlib.import_module(module_name).template
        self._execute_command_function = execute_command

    def submit_job(
        self,
        queue: Optional[str] = None,
        job_name: str = "pysqa",
        working_directory: Optional[str] = None,
        cores: int = 1,
        memory_max: Optional[Union[int, str]] = None,
        run_time_max: Optional[int] = None,
        dependency_list: Optional[list[int]] = None,
        command: str = "",
        submission_template: Optional[Union[str, Template]] = None,
        **kwargs,
    ) -> Union[int, None]:
        """
        Submit a job to the queue.

        The submission script is written to the working directory first and then handed to the
        submit command of the scheduler.

        Args:
            queue (str/None): The queue to submit the job to.
            job_name (str/None): The name of the job.
            working_directory (str/None): The working directory for the job.
            cores (int/None): The number of cores required for the job.
            memory_max (int/None): The maximum memory required for the job.
            run_time_max (int/None): The maximum run time for the job.
            dependency_list (list[str]/None): List of job dependencies.
            command (str): The command to execute for the job.
            submission_template (str/Template/None): Template of the submission script. Defaults to the
                template loaded from the scheduler wrapper module.
            **kwargs: Additional keyword arguments forwarded to the rendering of the submission template.

        Returns:
            int: The job ID, or None if the submit command returned no output or no scheduler commands
                are available.

        Raises:
            ValueError: If working_directory contains a space character.
        """
        if working_directory is not None and " " in working_directory:
            raise ValueError(
                "Whitespaces in the working_directory name are not supported!"
            )
        if submission_template is None:
            submission_template = self._submission_template
        working_directory, queue_script_path = self._write_queue_script(
            queue=queue,
            job_name=job_name,
            working_directory=working_directory,
            cores=cores,
            memory_max=memory_max,
            run_time_max=run_time_max,
            command=command,
            dependency_list=dependency_list,
            submission_template=submission_template,
            **kwargs,
        )
        out = self._execute_command(
            commands=self._list_command_to_be_executed(
                queue_script_path=queue_script_path
            ),
            working_directory=working_directory,
            split_output=False,
        )
        if out is not None and self._commands is not None:
            return self._commands.get_job_id_from_output(out)
        else:
            return None

    def enable_reservation(self, process_id: int) -> Union[str, None]:
        """
        Enable reservation for a process.

        Args:
            process_id (int): The process ID.

        Returns:
            str: The first line of the output of the enable reservation command, or None if no
                scheduler commands are available or the command returned no output.
        """
        if self._commands is not None:
            out = self._execute_command(
                commands=self._commands.enable_reservation_command + [str(process_id)],
                split_output=True,
            )
            if out is not None:
                return out[0]
        return None

    def delete_job(self, process_id: int) -> Union[str, None]:
        """
        Delete a job.

        Args:
            process_id (int): The process ID.

        Returns:
            str: The first line of the output of the delete job command, or None if no scheduler
                commands are available or the command returned no output.
        """
        if self._commands is not None:
            out = self._execute_command(
                commands=self._commands.delete_job_command + [str(process_id)],
                split_output=True,
            )
            if out is not None:
                return out[0]
        return None

    def get_queue_status(
        self, user: Optional[str] = None
    ) -> Union[pandas.DataFrame, None]:
        """
        Get the status of the queue.

        Args:
            user (str): The user to filter the queue status for.

        Returns:
            pandas.DataFrame: The queue status, restricted to the rows of the given user when one is
                specified. None if no scheduler commands are available or the status command
                returned no output.
        """
        if self._commands is None:
            return None
        out = self._execute_command(
            commands=self._commands.get_queue_status_command, split_output=False
        )
        # The executor signals a failed command with None; handle it like the other methods of
        # this class do instead of passing None on to the output parser.
        if out is None:
            return None
        df = self._commands.convert_queue_status(queue_status_output=out)
        if user is None:
            return df
        return df[df["user"] == user]

    def get_status_of_my_jobs(self) -> Union[pandas.DataFrame, None]:
        """
        Get the status of the user's jobs.

        Returns:
            pandas.DataFrame: The status of the user's jobs, or None if the queue status is not available.
        """
        return self.get_queue_status(user=self._get_user())

    def get_status_of_job(self, process_id: int) -> Union[str, None]:
        """
        Get the status of a job.

        Args:
            process_id (int): The process ID.

        Returns:
            str: The status of the job, or None if the job is not listed in the queue status or the
                queue status is not available.
        """
        df = self.get_queue_status()
        if df is not None:
            df_selected = df[df["jobid"] == process_id]["status"]
            if len(df_selected) != 0:
                return df_selected.values[0]
        return None

    def get_status_of_jobs(self, process_id_lst: list[int]) -> list[str]:
        """
        Get the status of multiple jobs.

        Args:
            process_id_lst (list[int]): List of process IDs.

        Returns:
            list[str]: List of job statuses. Jobs which are not listed in the queue status are reported
                as "finished". The list is empty if the queue status is not available.
        """
        df = self.get_queue_status()
        results_lst = []
        if df is not None:
            for process_id in process_id_lst:
                df_selected = df[df["jobid"] == process_id]["status"]
                if len(df_selected) != 0:
                    results_lst.append(df_selected.values[0])
                else:
                    results_lst.append("finished")
        return results_lst

    def _list_command_to_be_executed(self, queue_script_path: str) -> list:
        """
        Get the list of commands to be executed.

        Args:
            queue_script_path (str): The path to the queue script.

        Returns:
            list: The submit command followed by the path of the queue script, or an empty list if no
                scheduler commands are available.
        """
        if self._commands is not None:
            return self._commands.submit_job_command + [queue_script_path]
        else:
            return []

    def _execute_command(
        self,
        commands: Union[str, list[str]],
        working_directory: Optional[str] = None,
        split_output: bool = True,
        shell: bool = False,
        error_filename: str = "pysqa.err",
    ) -> Union[str, list[str], None]:
        """
        Execute a command or a list of commands with the execute function given to the constructor.

        Args:
            commands (Union[str, List[str]]): The command(s) to be executed.
            working_directory (Optional[str], optional): The working directory. Defaults to None.
            split_output (bool, optional): Whether to split the output into lines. Defaults to True.
            shell (bool, optional): Whether to use the shell to execute the command. Defaults to False.
            error_filename (str, optional): The name of the error file. Defaults to "pysqa.err".

        Returns:
            Union[str, List[str], None]: Whatever the execute function returns; the default
                execute_command returns the output as a string or list of lines, or None on failure.
        """
        return self._execute_command_function(
            commands=commands,
            working_directory=working_directory,
            split_output=split_output,
            shell=shell,
            error_filename=error_filename,
        )

    def _write_queue_script(
        self,
        queue: Optional[str] = None,
        submission_template: Optional[Union[str, Template]] = None,
        job_name: str = "pysqa",
        working_directory: Optional[str] = None,
        cores: int = 1,
        memory_max: Optional[Union[int, str]] = None,
        run_time_max: Optional[int] = None,
        dependency_list: Optional[list[int]] = None,
        command: str = "",
        **kwargs,
    ) -> tuple[str, str]:
        """
        Write the queue script to the file "run_queue.sh" in the working directory.

        The working directory is created if it does not exist yet.

        Args:
            queue (str/None): The queue name.
            submission_template (str/Template/None): Template of the submission script.
            job_name (str/None): The job name.
            working_directory (str/None): The working directory. Defaults to the current directory ".".
            cores (int/None): The number of cores.
            memory_max (int/None): The maximum memory.
            run_time_max (int/None): The maximum run time.
            dependency_list (list/None): The list of dependency job IDs.
            command (str): The command to be executed. A list of strings is concatenated without separator.
            **kwargs: Additional keyword arguments forwarded to the rendering of the submission template.

        Returns:
            Tuple[str, str]: A tuple containing the working directory and the path to the queue script file.
        """
        if isinstance(command, list):
            command = "".join(command)
        if working_directory is None:
            working_directory = "."
        queue_script = self._job_submission_template(
            queue=queue,
            submission_template=submission_template,
            job_name=job_name,
            working_directory=working_directory,
            cores=cores,
            memory_max=memory_max,
            run_time_max=run_time_max,
            dependency_list=dependency_list,
            command=command,
            **kwargs,
        )
        os.makedirs(working_directory, exist_ok=True)
        queue_script_path = os.path.join(working_directory, "run_queue.sh")
        with open(queue_script_path, "w") as f:
            # The rendered script is a single string, so write it in one call.
            f.write(queue_script)
        return working_directory, queue_script_path

    def _job_submission_template(
        self,
        queue: Optional[str] = None,
        submission_template: Optional[Union[str, Template]] = None,
        job_name: str = "pysqa",
        working_directory: str = ".",
        cores: int = 1,
        memory_max: Optional[Union[int, str]] = None,
        run_time_max: Optional[int] = None,
        dependency_list: Optional[list[int]] = None,
        command: str = "",
        **kwargs,
    ) -> str:
        """
        Generate the job submission template.

        Args:
            queue (str, optional): The queue name. Has to be None for this class. Defaults to None.
            submission_template (str/Template, optional): Template of the submission script. Defaults to
                the template loaded from the scheduler wrapper module.
            job_name (str, optional): The job name. Defaults to "pysqa".
            working_directory (str, optional): The working directory. Defaults to ".".
            cores (int, optional): The number of cores. Defaults to 1.
            memory_max (int, optional): The maximum memory. Defaults to None.
            run_time_max (int, optional): The maximum run time. Defaults to None.
            dependency_list (list[int], optional): The list of dependency job IDs. Defaults to None.
            command (str): The command to be executed. Defaults to "".
            **kwargs: Additional keyword arguments forwarded to the rendering of the submission template.

        Returns:
            str: The rendered job submission script, or an empty string if no scheduler commands are
                available.

        Raises:
            ValueError: If queue is not None.
        """
        if queue is not None:
            raise ValueError(
                "The queue argument is not supported by this adapter and has to be None, got: "
                + repr(queue)
            )
        if submission_template is None:
            submission_template = self._submission_template
        if self._commands is not None:
            return self._commands.render_submission_template(
                command=command,
                submission_template=submission_template,
                working_directory=working_directory,
                job_name=job_name,
                cores=cores,
                memory_max=memory_max,
                run_time_max=run_time_max,
                dependency_list=dependency_list,
                **kwargs,
            )
        else:
            return ""

    def _get_user(self) -> str:
        """
        Get the current user.

        Returns:
            str: The login name of the current user as reported by getpass.getuser().
        """
        return getpass.getuser()