Source code for pysqa.wrapper.slurm

import os
from typing import Optional, Union

import pandas
from jinja2 import Template

from pysqa.wrapper.abstract import SchedulerCommands

# Default Jinja2 submission script template for Slurm; it is the default value
# of ``submission_template`` in ``SlurmCommands.render_submission_template``.
#
# Placeholders always rendered: job_name, working_directory, partition, cores
# and command. The #SBATCH lines for run_time_max, dependency_list and
# memory_max are only emitted when the respective value is truthy.
#
# --time is rendered as max(1, run_time_max // 60).
# NOTE(review): presumably run_time_max is given in seconds and the bare
# number is interpreted by sbatch as minutes - confirm against callers.
#
# The "{%-" markers trim the whitespace (including the newline) in front of
# each tag, so skipped optional sections leave no blank lines in the script.
template = """\
#!/bin/bash
#SBATCH --output=time.out
#SBATCH --job-name={{job_name}}
#SBATCH --chdir={{working_directory}}
#SBATCH --get-user-env=L
#SBATCH --partition={{partition}}
{%- if run_time_max %}
#SBATCH --time={{ [1, run_time_max // 60]|max }}
{%- endif %}
{%- if dependency_list %}
#SBATCH --dependency=afterok:{{ dependency_list | join(',') }}
{%- endif %}
{%- if memory_max %}
#SBATCH --mem={{memory_max}}G
{%- endif %}
#SBATCH --ntasks={{cores}}

{{command}}
"""


class SlurmCommands(SchedulerCommands):
    """Slurm command lines, output parsers and submission script rendering."""

    @property
    def submit_job_command(self) -> list[str]:
        """Returns the command to submit a job to Slurm."""
        return ["sbatch", "--parsable"]

    @property
    def delete_job_command(self) -> list[str]:
        """Returns the command to delete a job from Slurm."""
        return ["scancel"]

    @property
    def get_queue_status_command(self) -> list[str]:
        """Returns the command to get the queue status from Slurm."""
        return ["squeue", "--format", "%A|%u|%t|%.15j|%Z", "--noheader"]

    @staticmethod
    def get_job_id_from_output(queue_submit_output: str) -> int:
        """
        Extracts the job ID from the output of the job submission command.

        The last whitespace-separated token of the last non-blank line is
        used, so both the ``--parsable`` output and a message ending in the
        job ID are handled.

        Args:
            queue_submit_output (str): The output of the submission command.

        Returns:
            int: The job ID.

        Raises:
            IndexError: If the output contains no non-blank line.
            ValueError: If the extracted token is not an integer.
        """
        non_blank_lines = [
            line for line in queue_submit_output.splitlines() if line.strip()
        ]
        last_token = non_blank_lines[-1].split()[-1]
        # "sbatch --parsable" prints "<jobid>;<cluster>" when a cluster name
        # is present, so only the part before the first ";" is the job ID.
        return int(last_token.split(";")[0])

    @staticmethod
    def convert_queue_status(queue_status_output: str) -> pandas.DataFrame:
        """
        Converts the queue status output from Slurm into a pandas DataFrame.

        Each non-blank line is expected to hold the five "|"-separated fields
        requested by ``get_queue_status_command``: job ID, user, status, job
        name and working directory.

        Args:
            queue_status_output (str): The output of the queue status command.

        Returns:
            pandas.DataFrame: One row per job with the columns "jobid",
                "user", "jobname", "status" and "working_directory". The
                status is lower-cased and "r" / "pd" are expanded to
                "running" / "pending".

        Raises:
            ValueError: If a line has fewer than five fields or the job ID is
                not an integer.
        """
        # maxsplit=4 keeps any "|" inside the trailing working directory
        # field intact; blank lines are skipped instead of failing to unpack.
        line_split_lst = [
            line.split("|", 4)
            for line in queue_status_output.splitlines()
            if line.strip()
        ]
        if line_split_lst:
            job_id_lst, user_lst, status_lst, job_name_lst, working_directory_lst = zip(
                *[
                    (int(jobid), user, status.lower(), jobname, working_directory)
                    for jobid, user, status, jobname, working_directory in line_split_lst
                ]
            )
        else:
            job_id_lst, user_lst, status_lst, job_name_lst, working_directory_lst = (
                (),
                (),
                (),
                (),
                (),
            )
        df = pandas.DataFrame(
            {
                "jobid": job_id_lst,
                "user": user_lst,
                "jobname": job_name_lst,
                "status": status_lst,
                "working_directory": working_directory_lst,
            }
        )
        df.loc[df.status == "r", "status"] = "running"
        df.loc[df.status == "pd", "status"] = "pending"
        return df

    @staticmethod
    def dependencies(dependency_list: list[str]) -> list[str]:
        """
        Returns the dependency options for job submission.

        Args:
            dependency_list (list[str]): The job IDs the new job depends on.
                Entries are converted with ``str()``, so integer job IDs are
                accepted as well.

        Returns:
            list[str]: A single ``--dependency=afterok:...`` option, or an
                empty list if ``dependency_list`` is None or empty.
        """
        if dependency_list is None or len(dependency_list) == 0:
            return []
        # str() per entry: render_submission_template annotates the same
        # argument as list[int], and str.join() rejects non-string items.
        job_ids = ",".join(str(job_id) for job_id in dependency_list)
        return ["--dependency=afterok:" + job_ids]

    @staticmethod
    def render_submission_template(
        command: str,
        submission_template: Union[str, Template] = template,
        job_name: str = "pysqa",
        working_directory: str = os.path.abspath("."),
        cores: int = 1,
        memory_max: Optional[Union[int, str]] = None,
        run_time_max: Optional[int] = None,
        dependency_list: Optional[list[int]] = None,
        **kwargs,
    ) -> str:
        """
        Generate the job submission template.

        Args:
            command (str): The command to be executed.
            submission_template (str, Template, optional): Submission script
                template. Defaults to the module-level Slurm ``template``.
            job_name (str, optional): The job name. Defaults to "pysqa".
            working_directory (str, optional): The working directory. Defaults
                to the absolute path of the current directory, evaluated once
                when the class is defined rather than on every call.
            cores (int, optional): The number of cores. Defaults to 1.
            memory_max (int, str, optional): The maximum memory. Defaults to
                None.
            run_time_max (int, optional): The maximum run time. Defaults to
                None.
            dependency_list (list[int], optional): The list of dependency job
                IDs. Defaults to None.
            **kwargs: Additional keyword arguments forwarded unchanged to
                ``SchedulerCommands.render_submission_template``, for example
                ``partition``, which the default template references.

        Returns:
            str: The rendered job submission template.
        """
        return SchedulerCommands.render_submission_template(
            command=command,
            job_name=job_name,
            working_directory=working_directory,
            cores=cores,
            memory_max=memory_max,
            run_time_max=run_time_max,
            dependency_list=dependency_list,
            submission_template=submission_template,
            **kwargs,
        )