Dynamic Python Interface#
The pysqa package primarily defines one class, the QueueAdapter. It either loads its configuration from a configuration directory or initializes the corresponding adapter for the queuing system type given by queue_type, and it provides a high-level interface for interacting with the queuing system. The QueueAdapter can be imported using:
from pysqa import QueueAdapter
After the import, the class is initialized with the queuing system type specified by the queue_type parameter. In this example we use flux as the queuing system interface:
qa = QueueAdapter(queue_type="flux")
The configuration is explained in more detail in the documentation.
Submit job to queue#
Submit a job to the queue. If no queue is specified, the job is submitted to the default queue defined in the queue configuration:
queue_id = qa.submit_job(
job_name=None,
working_directory=".",
cores=1,
memory_max=None,
run_time_max=None,
dependency_list=None,
submission_template=None,
command="sleep 5",
)
queue_id
64156073984
The only required parameter is:
command: the command that is executed as part of the job.
Additional options for the submission of the job are:
job_name: the name of the job submitted to the queuing system.
working_directory: the working directory the job submitted to the queuing system is executed in.
cores: the number of cores used for the calculation. If the cores are not defined, the minimum number of cores defined for the selected queue is used.
memory_max: the memory used for the calculation.
run_time_max: the run time for the calculation. If the run time is not defined, the maximum run time defined for the selected queue is used.
dependency_list: other jobs the calculation depends on.
submission_template: the template submission script.
**kwargs: allows writing additional parameters to the job submission script if they are available in the corresponding template.
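The dependency_list option can be used to run jobs in sequence. The following is a minimal sketch, not part of pysqa: submit_chain is a hypothetical helper that only relies on the submit_job keywords command and dependency_list shown above, and it assumes that dependency_list accepts a list of queue ids given as strings.

```python
def submit_chain(adapter, commands, **submit_kwargs):
    """Submit commands in order, each job depending on the previous one.

    `adapter` is any object with a submit_job() method that accepts
    `command` and `dependency_list`, for example a QueueAdapter instance.
    This helper is hypothetical and not provided by pysqa.
    """
    queue_ids = []
    for command in commands:
        # The first job has no predecessor, so dependency_list stays None.
        # Assumption: queue ids are passed on as strings.
        dependency_list = [str(queue_ids[-1])] if queue_ids else None
        queue_id = adapter.submit_job(
            command=command,
            dependency_list=dependency_list,
            **submit_kwargs,
        )
        queue_ids.append(queue_id)
    return queue_ids
```

With the adapter from above, submit_chain(qa, ["sleep 5", "sleep 1"], cores=1) would submit two jobs, the second one waiting for the first.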
The submission script template can be imported directly from pysqa.wrapper.flux:
from pysqa.wrapper.flux import template
template.split("\n")
['#!/bin/bash',
'# flux: --job-name={{job_name}}',
'# flux: --env=CORES={{cores}}',
'# flux: --output=time.out',
'# flux: --error=error.out',
'# flux: -n {{cores}}',
'{%- if run_time_max %}',
'# flux: -t {{ [1, run_time_max // 60]|max }}',
'{%- endif %}',
'{%- if dependency %}',
'{%- for jobid in dependency %}',
'# flux: --dependency=afterok:{{jobid}}',
'{%- endfor %}',
'{%- endif %}',
'',
'{{command}}',
'']
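To make the template logic easier to follow, here is a plain-Python sketch of the header lines it produces. It is not how pysqa renders the template (the real template uses Jinja syntax); sketch_flux_header is a hypothetical function that mirrors the conditions shown above. The integer division by 60 with a lower bound of 1 suggests that run_time_max is given in seconds and converted to whole minutes, which is an assumption here.

```python
def sketch_flux_header(job_name, cores, run_time_max=None, dependency=None):
    """Mimic the header lines of the flux template shown above.

    Hypothetical illustration only; pysqa renders the real template.
    """
    lines = [
        "#!/bin/bash",
        f"# flux: --job-name={job_name}",
        f"# flux: --env=CORES={cores}",
        "# flux: --output=time.out",
        "# flux: --error=error.out",
        f"# flux: -n {cores}",
    ]
    if run_time_max:
        # Same arithmetic as {{ [1, run_time_max // 60]|max }}:
        # integer division by 60, but never less than 1.
        lines.append(f"# flux: -t {max(1, run_time_max // 60)}")
    # One afterok dependency line per job id, as in the template's for loop.
    for jobid in dependency or []:
        lines.append(f"# flux: --dependency=afterok:{jobid}")
    return lines
```

For example, a run_time_max of 90 yields the line "# flux: -t 1", and a run_time_max of 3600 yields "# flux: -t 60". If run_time_max is not set, no time limit line is written.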
Show jobs in queue#
Get status of all jobs currently handled by the queuing system:
qa.get_queue_status()
| | jobid | user | jobname | status |
|---|---|---|---|---|
| 0 | 64156073984 | jovyan | None | running |
With the additional parameter user, the list can be restricted to the jobs of a specific user.
Analogously, the jobs of the current user can be listed with:
qa.get_status_of_my_jobs()
| | jobid | user | jobname | status |
|---|---|---|---|---|
| 0 | 64156073984 | jovyan | None | running |
Finally, the status of a specific job with the queue id queue_id can be retrieved from the queuing system using:
qa.get_status_of_job(process_id=queue_id)
'running'
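A common pattern built on this call is to poll until a job is no longer active. The sketch below is not part of pysqa: wait_for_job is a hypothetical helper, and the set of active states ("pending" and "running") is an assumption, since only 'running' appears in the output above. Any other return value is treated as the end of the job.

```python
import time


def wait_for_job(get_status, queue_id, active_states=("pending", "running"),
                 interval=5.0, timeout=None):
    """Poll a status function until the job leaves the active states.

    `get_status` is a callable accepting process_id=..., for example
    qa.get_status_of_job. Hypothetical helper, not provided by pysqa.
    """
    start = time.monotonic()
    while True:
        status = get_status(process_id=queue_id)
        if status not in active_states:
            # Return whatever the queuing system reported last.
            return status
        if timeout is not None and time.monotonic() - start >= timeout:
            raise TimeoutError(f"job {queue_id} still {status!r} after {timeout} s")
        time.sleep(interval)
```

With the adapter from above, wait_for_job(qa.get_status_of_job, queue_id) would block until the job stops being reported as pending or running. The polling interval should be chosen so that it does not put unnecessary load on the queuing system.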
Delete job from queue#
Delete a job with the queue id queue_id from the queuing system:
qa.delete_job(process_id=queue_id)
''