Python Interface#
The pysqa package primarily defines one class, the QueueAdapter. It loads the configuration from a configuration directory, initializes the corresponding adapter for the specific queuing system and provides a high-level interface for users to interact with the queuing system. The QueueAdapter can be imported using:
from pysqa import QueueAdapter
After the initial import the class is initialized using the configuration directory specified by the directory parameter, which defaults to "~/.queues". In this example we load the configuration from the config/flux directory:
qa = QueueAdapter(directory="config/flux")
This directory primarily contains two files: a queue.yaml file which contains the meta-data for the queuing system, and one or multiple shell script templates. In this example there is one shell script template named flux.sh. The configuration files are explained in more detail in the documentation.
!cat config/flux/queue.yaml
queue_type: FLUX
queue_primary: flux
queues:
flux: {cores_max: 64, cores_min: 1, run_time_max: 172800, script: flux.sh}
!cat config/flux/flux.sh
#!/bin/bash
# flux: --job-name={{job_name}}
# flux: --env=CORES={{cores}}
# flux: --output=time.out
# flux: --error=error.out
# flux: -n {{cores}}
{%- if run_time_max %}
# flux: -t {{ [1, run_time_max // 60]|max }}
{%- endif %}
{{command}}
The queue.yaml files and some templates for the most common queuing systems are defined below. By default pysqa supports the following variables for the submission script templates:
- job_name - the name of the calculation which appears on the queuing system
- working_directory - the directory on the file system the calculation is executed in
- cores - the number of cores used for the calculation
- memory_max - the amount of memory requested for the total calculation
- run_time_max - the run time requested for a given calculation - typically in seconds
- command - the command which is executed on the queuing system
Beyond these standardized keywords, additional flags can be added to the template, which are then available through the Python interface.
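To illustrate how these variables end up in the submission script, here is a minimal sketch of the substitution step. pysqa itself renders the templates with Jinja2; the plain string replacement, the shortened template and the variable values below are purely illustrative:

```python
# A shortened version of the flux.sh template shown above.
# NOTE: pysqa uses Jinja2 for rendering; plain str.replace is used
# here only to sketch what the substitution produces.
template = """#!/bin/bash
# flux: --job-name={{job_name}}
# flux: -n {{cores}}
{{command}}
"""

# Example values for the standard template variables.
variables = {
    "job_name": "my_calculation",
    "cores": "2",
    "command": "sleep 5",
}

script = template
for key, value in variables.items():
    script = script.replace("{{" + key + "}}", value)

print(script)
```

The rendered script contains the job name, the core count and the command in place of the placeholders, which is what the queuing system eventually receives.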
List available queues#
List available queues as list of queue names:
qa.queue_list
['flux']
List available queues as a pandas DataFrame - this returns the information stored in the queue.yaml file as a pandas.DataFrame:
qa.queue_view
| | cores_max | cores_min | run_time_max | memory_max |
|---|---|---|---|---|
| flux | 64 | 1 | 172800 | None |
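Since queue_view is an ordinary pandas.DataFrame, it can be filtered with the usual pandas operations. In the sketch below the table from above is rebuilt by hand, because no queuing system is attached here; with a configured QueueAdapter the same DataFrame comes from qa.queue_view directly:

```python
import pandas as pd

# Rebuild the queue_view table shown above by hand (illustrative only;
# normally this DataFrame is returned by qa.queue_view).
queue_view = pd.DataFrame(
    {
        "cores_max": [64],
        "cores_min": [1],
        "run_time_max": [172800],
        "memory_max": [None],
    },
    index=["flux"],
)

# Example: select all queues that allow at least 32 cores.
large_queues = queue_view[queue_view["cores_max"] >= 32]
print(large_queues.index.tolist())
```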
Submit job to queue#
Submit a job to the queue - if no queue is specified it is submitted to the default queue defined in the queue configuration:
queue_id = qa.submit_job(
queue=None,
job_name=None,
working_directory=".",
cores=None,
memory_max=None,
run_time_max=None,
dependency_list=None,
command='sleep 5',
)
queue_id
127607504896
The only required parameter is:

- command - the command that is executed as part of the job
Additional options for the submission of the job are:

- queue - the queue the job is submitted to. If this option is not defined the primary_queue defined in the configuration is used.
- job_name - the name of the job submitted to the queuing system.
- working_directory - the working directory the job submitted to the queuing system is executed in.
- cores - the number of cores used for the calculation. If the cores are not defined the minimum number of cores defined for the selected queue are used.
- memory_max - the memory used for the calculation.
- run_time_max - the run time for the calculation. If the run time is not defined the maximum run time defined for the selected queue is used.
- dependency_list - other jobs the calculation depends on.
- **kwargs - allows writing additional parameters to the job submission script if they are available in the corresponding template.
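The **kwargs pass-through can be sketched as follows. The {{account}} placeholder and the --bank directive below are hypothetical and not part of the standard variables; with such a line in the template, a call like qa.submit_job(command="sleep 5", account="my_project") would make account available during rendering. As before, plain string replacement stands in for pysqa's Jinja2 rendering:

```python
# Hypothetical extra keyword argument passed to submit_job; pysqa
# forwards any additional keyword arguments to the template engine.
extra_kwargs = {"account": "my_project"}

# Hypothetical template line with a custom {{account}} placeholder
# (neither the placeholder nor the --bank flag is part of the
# standard pysqa template shown above).
template_line = "# flux: --bank={{account}}"

rendered = template_line
for key, value in extra_kwargs.items():
    rendered = rendered.replace("{{" + key + "}}", value)

print(rendered)
```

Any placeholder that appears in the template but is not one of the standard variables has to be supplied this way, otherwise rendering the submission script fails or leaves the placeholder empty, depending on the template.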
Show jobs in queue#
Get status of all jobs currently handled by the queuing system:
qa.get_queue_status()
| | jobid | user | jobname | status |
|---|---|---|---|---|
| 0 | 127607504896 | jovyan | None | running |
With the additional parameter user, a specific user can be defined so that only the jobs of this specific user are listed.
Analogously, the jobs of the current user can be listed with:
qa.get_status_of_my_jobs()
| | jobid | user | jobname | status |
|---|---|---|---|---|
| 0 | 127607504896 | jovyan | None | running |
Finally, the status of a specific job with the queue id queue_id can be retrieved from the queuing system using:
qa.get_status_of_job(process_id=queue_id)
'running'
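A common pattern is to poll this status until the job has finished. The helper below is a sketch, not part of pysqa, and assumes that get_status_of_job returns None once the job is no longer known to the queuing system; the _FakeAdapter stands in for a real QueueAdapter so the loop can be demonstrated without a queuing system:

```python
import time


def wait_for_job(adapter, queue_id, sleep_interval=5.0):
    """Block until the job is no longer known to the queuing system.

    Assumes get_status_of_job() returns a status string (e.g.
    "running") while the job is queued or running, and None afterwards.
    """
    while adapter.get_status_of_job(process_id=queue_id) is not None:
        time.sleep(sleep_interval)


class _FakeAdapter:
    """Stand-in for QueueAdapter, used only to demonstrate the loop."""

    def __init__(self):
        self.calls = 0

    def get_status_of_job(self, process_id):
        self.calls += 1
        # Report "running" twice, then pretend the job has finished.
        return "running" if self.calls < 3 else None


adapter = _FakeAdapter()
wait_for_job(adapter, 127607504896, sleep_interval=0.0)
print(adapter.calls)
```

With a real QueueAdapter the same call would be wait_for_job(qa, queue_id), with a sleep_interval of a few seconds to avoid hammering the scheduler.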
Delete job from queue#
Delete a job with the queue id queue_id from the queuing system:
qa.delete_job(process_id=queue_id)
''