Python Interface#

The pysqa package primarily defines a single class, the QueueAdapter. It loads the configuration from a configuration directory, initializes the corresponding adapter for the specific queuing system and provides a high-level interface for users to interact with the queuing system. The QueueAdapter can be imported using:

from pysqa import QueueAdapter

After the initial import, the class is initialized with the configuration directory specified by the directory parameter, which defaults to "~/.queues". In this example the configuration is loaded from the config/flux test directory:

qa = QueueAdapter(directory="config/flux")

This directory contains two types of files: a queue.yaml file, which contains the meta-data for the queuing system, and one or more shell script templates. In this example there is one shell script template named flux.sh. The configuration files are explained in more detail in the documentation.

!cat config/flux/queue.yaml
queue_type: FLUX
queue_primary: flux
queues:
  flux: {cores_max: 64, cores_min: 1, run_time_max: 172800, script: flux.sh}
!cat config/flux/flux.sh
#!/bin/bash
# flux:--job-name={{job_name}}
# flux: --env=CORES={{cores}}
# flux: --output=time.out
# flux: --error=error.out
# flux: -n {{cores}}
{%- if run_time_max %}
# flux: -t {{ [1, run_time_max // 60]|max }}
{%- endif %}

{{command}}

The queue.yaml files and some templates for the most common queuing systems are defined below. By default pysqa supports the following variables for the submission script templates:

  • job_name - the name of the calculation which appears on the queuing system

  • working_directory - the directory on the file system the calculation is executed in

  • cores - the number of cores used for the calculation

  • memory_max - the amount of memory requested for the total calculation

  • run_time_max - the run time requested for a given calculation - typically in seconds

  • command - the command which is executed on the queuing system

Beyond these standardized keywords, additional flags can be added to the template, which are then available through the Python interface.
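
As a minimal sketch, assume the flux.sh template were extended with an additional hypothetical line such as # flux: -N {{nodes}}; the matching value could then be handed over as a keyword argument to the submit_job() function introduced below:

# Hypothetical example: the "nodes" keyword is only accepted because the
# (assumed) extended template contains a matching {{nodes}} placeholder.
qa.submit_job(
    command="sleep 5",
    cores=2,
    nodes=1,  # rendered into the hypothetical {{nodes}} line of the template
)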

List available queues#

List the available queues as a list of queue names:

qa.queue_list
['flux']

List the available queues as a pandas DataFrame - this returns the information stored in the queue.yaml file as a pandas.DataFrame:

qa.queue_view
      cores_max  cores_min  run_time_max memory_max
flux         64          1        172800       None
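
Since queue_view returns a regular pandas.DataFrame, the stored limits can also be read programmatically. A minimal sketch, assuming the queue names form the DataFrame index as shown above:

# Look up the core limit of the flux queue directly from the DataFrame
max_cores = qa.queue_view.loc["flux", "cores_max"]
print(max_cores)  # expected to print 64 for the configuration shown above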

Submit job to queue#

Submit a job to the queue - if no queue is specified it is submitted to the default queue defined in the queue configuration:

queue_id = qa.submit_job(
    queue=None,
    job_name=None,
    working_directory=".",
    cores=None,
    memory_max=None,
    run_time_max=None,
    dependency_list=None,
    command='sleep 5',
)
queue_id
127607504896

The only required parameter is:

  • command - the command that is executed as part of the job

Additional options for the submission of the job are:

  • queue - the queue the job is submitted to. If this option is not defined the queue_primary defined in the configuration is used.

  • job_name - the name of the job submitted to the queuing system.

  • working_directory - the working directory the job submitted to the queuing system is executed in.

  • cores - the number of cores used for the calculation. If the cores are not defined the minimum number of cores defined for the selected queue is used.

  • memory_max - the memory used for the calculation.

  • run_time_max - the run time for the calculation. If the run time is not defined the maximum run time defined for the selected queue is used.

  • dependency_list - other jobs the calculation depends on; see the sketch after this list.

  • **kwargs - allows writing additional parameters to the job submission script if they are available in the corresponding template.
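
Putting these options together, a minimal sketch of a second submission that explicitly selects the flux queue and depends on the job submitted above could look like the following; whether job dependencies are actually honoured depends on the queuing system backend:

# Submit a follow-up job that waits for the first job to finish.
# cores and run_time_max are illustrative values within the limits
# defined in queue.yaml (cores_max: 64, run_time_max: 172800).
follow_up_id = qa.submit_job(
    queue="flux",
    job_name="follow_up",
    working_directory=".",
    cores=2,
    run_time_max=600,
    dependency_list=[queue_id],  # queue id returned by the previous submit_job() call
    command="sleep 5",
)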

Show jobs in queue#

Get status of all jobs currently handled by the queuing system:

qa.get_queue_status()
          jobid    user jobname   status
0  127607504896  jovyan    None  running

With the additional parameter user, the listing can be restricted to the jobs of a specific user.
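
For example, assuming the user name jovyan shown in the output above:

# List only the jobs of a single user (the user name is an assumption)
qa.get_queue_status(user="jovyan")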

Analogously, the jobs of the current user can be listed with:

qa.get_status_of_my_jobs()
          jobid    user jobname   status
0  127607504896  jovyan    None  running

Finally, the status of a specific job with the queue id queue_id can be retrieved from the queuing system using:

qa.get_status_of_job(process_id=queue_id)
'running'
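
Because get_status_of_job() returns a plain string, it can also be used to wait for a job to finish by polling. A minimal sketch, assuming the reported states include running and pending and that the call no longer returns one of these states once the job has left the queue:

import time

# Poll the queuing system until the job is neither running nor pending
while qa.get_status_of_job(process_id=queue_id) in ["running", "pending"]:
    time.sleep(5)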

Delete job from queue#

Delete a job with the queue id queue_id from the queuing system:

qa.delete_job(process_id=queue_id)
''