"""Source code for pysqa.wrapper.sge."""
import os
from typing import Optional, Union
import defusedxml.ElementTree as ETree
import pandas
from jinja2 import Template
from pysqa.wrapper.abstract import SchedulerCommands
# Default Jinja2 template for the Sun Grid Engine submission script; it is the
# default value of ``submission_template`` in
# ``SunGridEngineCommands.render_submission_template``.
#
# Lines starting with ``#$`` are scheduler directives embedded in the bash
# script. The job name (-N), working directory (-wd) and the stdout/stderr file
# names (-o time.out / -e error.out) are always written; the parallel
# environment (-pe), memory limit (h_vmem) and run time limit (h_rt) directives
# are only written when ``cores``, ``memory_max`` and ``run_time_max``
# respectively are truthy.
#
# NOTE(review): the -pe line also reads ``partition``, which is not a named
# parameter of ``render_submission_template`` in this module, so it presumably
# has to be supplied through ``**kwargs`` -- confirm against the base class.
#
# The backslash after the opening quotes keeps the rendered script from
# starting with an empty line, so the shebang is the first line.
template = """\
#!/bin/bash
#$ -N {{job_name}}
#$ -wd {{working_directory}}
{%- if cores %}
#$ -pe {{partition}} {{cores}}
{%- endif %}
{%- if memory_max %}
#$ -l h_vmem={{memory_max}}
{%- endif %}
{%- if run_time_max %}
#$ -l h_rt={{run_time_max}}
{%- endif %}
#$ -o time.out
#$ -e error.out
{{command}}
"""
class SunGridEngineCommands(SchedulerCommands):
    """Command definitions and output parsers for Sun Grid Engine (SGE).

    The properties return the command line (as an argument list) for each
    scheduler action, and the static methods translate the scheduler output
    into Python objects or render the submission script.
    """

    @property
    def submit_job_command(self) -> list[str]:
        """Return the command to submit a job."""
        # "-terse" is required by get_job_id_from_output(), which expects the
        # submission output to start with the bare job id.
        return ["qsub", "-terse"]

    @property
    def delete_job_command(self) -> list[str]:
        """Return the command to delete a job."""
        return ["qdel"]

    @property
    def enable_reservation_command(self) -> list[str]:
        """Return the command to enable job reservation."""
        return ["qalter", "-R", "y"]

    @property
    def get_queue_status_command(self) -> list[str]:
        """Return the command to get the queue status."""
        # XML output is what convert_queue_status() parses.
        return ["qstat", "-xml"]

    @staticmethod
    def convert_queue_status(queue_status_output: str) -> pandas.DataFrame:
        """Convert the queue status output to a pandas DataFrame.

        The first two child elements of the XML root are read as job sections
        (the running jobs followed by the pending jobs); every child of a
        section is one job and every grandchild one field of that job.

        Args:
            queue_status_output: The XML output of the queue status command.

        Returns:
            A pandas DataFrame with the columns ``jobid`` (numeric), ``user``,
            ``jobname``, ``status`` and ``working_directory``. The states
            ``r``, ``qw`` and ``Eqw`` are reported as ``running``, ``pending``
            and ``error``; any other state is passed through unchanged.
            ``working_directory`` is always an empty string because it is not
            read from the queue status output. An empty DataFrame with the
            same columns is returned when the queue contains no jobs.
        """
        columns = ["jobid", "user", "jobname", "status", "working_directory"]
        state_names = {"r": "running", "qw": "pending", "Eqw": "error"}

        def leaf_to_dict(leaf):
            # One dictionary per job element: {field tag: field text}.
            return [{field.tag: field.text for field in job} for job in leaf]

        tree = ETree.fromstring(queue_status_output)
        # Slicing instead of indexing tree[0] / tree[1] tolerates output that
        # contains fewer than two sections.
        frames = [pandas.DataFrame(leaf_to_dict(leaf=section)) for section in tree[:2]]
        # Sections without jobs produce column-less frames; dropping them keeps
        # them out of the concatenation and lets the empty queue be detected.
        frames = [frame for frame in frames if not frame.empty]
        if not frames:
            # Empty queue: without this guard the attribute access on the
            # column-less merged frame below would raise an AttributeError.
            return pandas.DataFrame(columns=columns)

        df_merge = pandas.concat(frames, sort=True)
        return pandas.DataFrame(
            {
                "jobid": pandas.to_numeric(df_merge.JB_job_number),
                "user": df_merge.JB_owner,
                "jobname": df_merge.JB_name,
                "status": df_merge.state.replace(state_names),
                "working_directory": [""] * len(df_merge),
            }
        )

    @staticmethod
    def get_job_id_from_output(queue_submit_output: str) -> int:
        """Extract the job ID from the output of the job submission command.

        Args:
            queue_submit_output: The output printed by the submit command.

        Returns:
            The integer in front of the first "." of the stripped output, or
            the whole stripped output when it contains no ".".

        Raises:
            ValueError: If that part of the output is not an integer.
        """
        job_id, _, _ = queue_submit_output.strip().partition(".")
        return int(job_id)

    @staticmethod
    def render_submission_template(
        command: str,
        submission_template: Union[str, Template] = template,
        job_name: str = "pysqa",
        working_directory: str = os.path.abspath("."),
        cores: int = 1,
        memory_max: Optional[Union[int, str]] = None,
        run_time_max: Optional[int] = None,
        dependency_list: Optional[list[int]] = None,
        **kwargs,
    ) -> str:
        """
        Generate the job submission template.

        All arguments are forwarded unchanged to
        ``SchedulerCommands.render_submission_template``.

        Args:
            command (str): The command to be executed.
            submission_template (str or Template, optional): Submission script
                template. Defaults to pysqa.wrapper.sge.template.
            job_name (str, optional): The job name. Defaults to "pysqa".
            working_directory (str, optional): The working directory. Defaults
                to the absolute path of ".", which is resolved once when this
                method is defined rather than on every call.
            cores (int, optional): The number of cores. Defaults to 1.
            memory_max (int or str, optional): The maximum memory. Defaults to None.
            run_time_max (int, optional): The maximum run time. Defaults to None.
            dependency_list (list[int], optional): The list of dependency job
                IDs. Defaults to None. The default template of this module
                does not reference it.
            **kwargs: Additional keyword arguments passed on to the base
                implementation.

        Returns:
            str: The rendered job submission template.
        """
        return SchedulerCommands.render_submission_template(
            command=command,
            job_name=job_name,
            working_directory=working_directory,
            cores=cores,
            memory_max=memory_max,
            run_time_max=run_time_max,
            dependency_list=dependency_list,
            submission_template=submission_template,
            **kwargs,
        )