Source code for pysqa.wrapper.torque

import os
import re
from typing import Optional, Union

import pandas
from jinja2 import Template

from pysqa.wrapper.abstract import SchedulerCommands

template = """\
#!/bin/bash
#PBS -l ncpus={{cores}}
#PBS -N {{job_name}}
{%- if memory_max %}
#PBS -l mem={{ memory_max| int }}GB
{%- endif %}
{%- if run_time_max %}
#PBS -l walltime={{run_time_max}} 
{%- endif %}
#PBS -l wd
{%- if dependency_list %}
#PBS -W depend=afterok:{{ dependency_list | join(':') }}
{%- endif %}
 
{{command}}
"""


[docs] class TorqueCommands(SchedulerCommands): @property def submit_job_command(self) -> list[str]: """Returns the command to submit a job.""" return ["qsub"] @property def delete_job_command(self) -> list[str]: """Returns the command to delete a job.""" return ["qdel"] @property def get_queue_status_command(self) -> list[str]: """Returns the command to get the queue status.""" return ["qstat", "-f"]
[docs] @staticmethod def get_job_id_from_output(queue_submit_output: str) -> int: """Extracts the job ID from the queue submit output. Args: queue_submit_output (str): The output of the queue submit command. Returns: int: The job ID. Raises: ValueError: If the job ID cannot be extracted. """ return int( queue_submit_output.splitlines()[-1].rstrip().lstrip().split(sep=".")[0] )
[docs] @staticmethod def convert_queue_status(queue_status_output: str) -> pandas.DataFrame: """Converts the queue status output into a pandas DataFrame. Args: queue_status_output (str): The output of the queue status command. Returns: pandas.DataFrame: The queue status as a DataFrame. """ lines = queue_status_output.split("\n") input_string = "".join(lines) # remove all whitespaces input_string = "".join(input_string.split()) # Extract the job ID, user, job name, status, and working directory for each running job regex_pattern_job_id = r"JobId:(.*?)Job_Name" job_id_lst = re.findall(regex_pattern_job_id, input_string) job_id_lst = [int(job_id_str.split(sep=".")[0]) for job_id_str in job_id_lst] regex_pattern_user = r"Job_Owner=(.*?)job_state" user_lst = re.findall(regex_pattern_user, input_string) user_lst = [user_str.split("@")[0] for user_str in user_lst] regex_pattern_job_name = r"Job_Name=(.*?)Job_Owner" job_name_lst = re.findall(regex_pattern_job_name, input_string) regex_pattern_status = r"job_state=(.*?)queue" status_lst = re.findall(regex_pattern_status, input_string) regex_pattern_working_directory = r"PBS_O_WORKDIR=(.*?),PBS" working_directory_lst = re.findall( regex_pattern_working_directory, input_string ) df = pandas.DataFrame( { "jobid": job_id_lst, "user": user_lst, "jobname": job_name_lst, "status": status_lst, "working_directory": working_directory_lst, } ) df["status"] = df["status"].apply( lambda x: "running" if x == "R" else "pending" ) return df
[docs] @staticmethod def render_submission_template( command: str, submission_template: Union[str, Template] = template, job_name: str = "pysqa", working_directory: str = os.path.abspath("."), cores: int = 1, memory_max: Optional[Union[int, str]] = None, run_time_max: Optional[int] = None, dependency_list: Optional[list[int]] = None, **kwargs, ) -> str: """ Generate the job submission template. Args: command (str, optional): The command to be executed. job_name (str, optional): The job name. Defaults to "pysqa". working_directory (str, optional): The working directory. Defaults to ".". cores (int, optional): The number of cores. Defaults to 1. memory_max (int, optional): The maximum memory. Defaults to None. run_time_max (int, optional): The maximum run time. Defaults to None. dependency_list (list[int], optional): The list of dependency job IDs. Defaults to None. submission_template (str): Submission script template pysqa.wrapper.flux.template Returns: str: The rendered job submission template. """ return SchedulerCommands.render_submission_template( command=command, job_name=job_name, working_directory=working_directory, cores=cores, memory_max=memory_max, run_time_max=run_time_max, dependency_list=dependency_list, submission_template=submission_template, **kwargs, )