Source code for pysqa.wrapper.lsf

import os
from typing import Optional, Union

import pandas
from jinja2 import Template

from pysqa.wrapper.abstract import SchedulerCommands

template = """\
#!/bin/bash
#BSUB -q queue
#BSUB -J {{job_name}}
#BSUB -o time.out
#BSUB -n {{cores}}
#BSUB -cwd {{working_directory}}
#BSUB -e error.out
{%- if run_time_max %}
#BSUB -W {{run_time_max}}
{%- endif %}
{%- if memory_max %}
#BSUB -M {{memory_max}}
{%- endif %}

{{command}}
"""


[docs] class LsfCommands(SchedulerCommands): @property def submit_job_command(self) -> list[str]: """Return the command to submit a job.""" return ["bsub"] @property def delete_job_command(self) -> list[str]: """Return the command to delete a job.""" return ["bkill"] @property def get_queue_status_command(self) -> list[str]: """Return the command to get the queue status.""" return ["bjobs"]
[docs] @staticmethod def get_job_id_from_output(queue_submit_output: str) -> int: """Extract the job ID from the queue submit output.""" return int(queue_submit_output.split("<")[1].split(">", maxsplit=1)[0])
[docs] @staticmethod def convert_queue_status(queue_status_output: str) -> pandas.DataFrame: """Convert the queue status output to a pandas DataFrame.""" job_id_lst, user_lst, status_lst, job_name_lst = [], [], [], [] line_split_lst = queue_status_output.split("\n") if len(line_split_lst) > 1: for line in line_split_lst[1:]: line_segments = line.split() if len(line_segments) > 1: job_id_lst.append(int(line_segments[0])) user_lst.append(line_segments[1]) status_lst.append(line_segments[2]) job_name_lst.append(line_segments[6]) df = pandas.DataFrame( { "jobid": job_id_lst, "user": user_lst, "jobname": job_name_lst, "status": status_lst, } ) df.loc[df.status == "RUN", "status"] = "running" df.loc[df.status == "PEND", "status"] = "pending" return df
[docs] @staticmethod def render_submission_template( command: str, submission_template: Union[str, Template] = template, job_name: str = "pysqa", working_directory: str = os.path.abspath("."), cores: int = 1, memory_max: Optional[Union[int, str]] = None, run_time_max: Optional[int] = None, dependency_list: Optional[list[int]] = None, **kwargs, ) -> str: """ Generate the job submission template. Args: command (str, optional): The command to be executed. job_name (str, optional): The job name. Defaults to "pysqa". working_directory (str, optional): The working directory. Defaults to ".". cores (int, optional): The number of cores. Defaults to 1. memory_max (int, optional): The maximum memory. Defaults to None. run_time_max (int, optional): The maximum run time. Defaults to None. dependency_list (list[int], optional): The list of dependency job IDs. Defaults to None. submission_template (str): Submission script template pysqa.wrapper.flux.template Returns: str: The rendered job submission template. """ return SchedulerCommands.render_submission_template( command=command, job_name=job_name, working_directory=working_directory, cores=cores, memory_max=memory_max, run_time_max=run_time_max, dependency_list=dependency_list, submission_template=submission_template, **kwargs, )