Source code for jobrunner.jobrunner

"""
This module provides an abstraction layer to run jobs on high performance computers
using torque, grid, or locally with xargs.
"""

from __future__ import print_function
from __future__ import absolute_import

import os
import psutil
import subprocess
import sys


# Determine how to decode bytes
std_encoding = sys.stdout.encoding
if std_encoding is None:
    std_encoding = sys.stdin.encoding
if std_encoding is None:
    std_encoding = "utf-8"


[docs]class JobRunnerException(Exception): """Raised for fatal JobRunner errors"""
[docs]class JobRunner(object):
[docs] def __init__(self, hpc_type, strip_job_array_suffix=True, qsub_extra_params=None, exception_handler=None, verbose=False): """Initialize an hpc job runner object. Parameters ---------- hpc_type : str Type of job runner. Possible values are "grid", "slurm", "torque", and "local". strip_job_array_suffix : bool, optional defaults to True When true, the dot and array suffix in the job id is removed before returning the job id. qsub_extra_params : str, optional defaults to None Extra command line options passed to qsub or sbatch every time a job is submitted. exception_handler : function, optional defalts to None Function to be called in local mode only when an exception occurs while attempting to run an external process. The function will be called with the arguments (exc_type, exc_value, exc_traceback). verbose : bool, optional defaults to False When true, the job command lines are logged. Examples -------- >>> runner = JobRunner("foobar") Traceback (most recent call last): ValueError: hpc_type must be one of: "grid", "slurm", "torque", "local" """ hpc_type = hpc_type.lower() if hpc_type not in ["grid", "slurm", "torque", "local"]: raise ValueError('hpc_type must be one of: "grid", "slurm", "torque", "local"') self.hpc_type = hpc_type self.strip_job_array_suffix = strip_job_array_suffix self.qsub_extra_params = qsub_extra_params self.exception_handler = exception_handler self.verbose = verbose if hpc_type == 'grid': self.subtask_env_var_name = "SGE_TASK_ID" elif hpc_type == 'slurm': self.subtask_env_var_name = "SLURM_ARRAY_TASK_ID" elif hpc_type == 'torque': self.subtask_env_var_name = "PBS_ARRAYID"
def _make_qsub_command(self, job_name, log_file, wait_for=[], wait_for_array=[], slot_dependency=False, threads=1, parallel_environment=None, num_tasks=None, max_processes=None, exclusive=False, wall_clock_limit=None): """Create the command line to run a job on a computing cluster. Parameters ---------- job_name : str Job name that will appear in the job scheduler queue. log_file : str Path to the combined stdout / stderr log file. wait_for : str or list of str, optional defaults to empty list Single job id or list of jobs ids to wait for before beginning execution. wait_for_array : str or list of str, optional defaults to empty list Single array job id or list of array jobs ids to wait for before beginning execution. slot_dependency : bool, optional defaults to False Enforced for grid engine and slurm only. Ignored for all other schedulers. If true, the sub-tasks of the array job being submitted will be dependent on the completion of the corresponding sub-tasks of the jobs in the wait_for_array. Has no effect on the dependencies of non-array jobs. threads : int, optional defaults to 1 Number of CPU threads consumed by the job. parallel_environment : str, optional defaults to None Name of the grid engine parallel execution environment. This must be specified when consuming more than one thread on grid engine. Ununsed for any other job scheduler. num_tasks : int, optional defaults to None When specified, the job becomes an array job with num_tasks sub-tasks. max_processes : int, optional defaults to None If not None, it sets the maximium number of concurrent processes for an array job. exclusive : bool, optional, defaults to False Requests exclusive access to compute nodes to prevent other jobs from sharing the node resources. Enforced only on SLURM, silently ignored for all other schedulers. wall_clock_limit : str, optional, defaults to None Maximum run-time; string of the form HH:MM:SS. Returns ------- command_line : str Job submission command line for Grid, SLURM, or torque. Examples -------- >>> runner = JobRunner("local") >>> runner._make_qsub_command("JobName", "log") Traceback (most recent call last): ValueError: _make_qsub_command() does not support hpc type local # grid # ======= >>> runner = JobRunner("grid", qsub_extra_params="-q service.q") >>> runner._make_qsub_command("JobName", "log", "777", "888", threads=8, parallel_environment="mpi") 'qsub -terse -V -j y -cwd -N JobName -o log -hold_jid 777,888 -pe mpi 8 -q service.q' >>> runner._make_qsub_command("JobName", "log", ["666", "777"], ["888", "999"], threads=8, parallel_environment="mpi", wall_clock_limit="00:00:40") 'qsub -terse -V -j y -cwd -N JobName -o log -hold_jid 666,777,888,999 -pe mpi 8 -l h_rt=00:00:40 -q service.q' >>> runner._make_qsub_command("JobName", "log", ["666", "777"], ["888", "999"], slot_dependency=True, threads=8, parallel_environment="mpi") 'qsub -terse -V -j y -cwd -N JobName -o log -hold_jid 666,777 -hold_jid_ad 888,999 -pe mpi 8 -q service.q' # array job >>> runner._make_qsub_command("JobName", "log", ["666", "777"], ["888", "999"], slot_dependency=True, threads=8, parallel_environment="mpi", num_tasks=99, max_processes=2, wall_clock_limit="00:00:40") 'qsub -terse -t 1-99 -V -j y -cwd -N JobName -o log -hold_jid 666,777 -hold_jid_ad 888,999 -tc 2 -pe mpi 8 -l h_rt=00:00:40 -q service.q' # slurm # ======= >>> runner = JobRunner("slurm", qsub_extra_params="-p short.q") >>> runner._make_qsub_command("JobName", "log", "777", "888", threads=8) 'sbatch --parsable --export=ALL --job-name=JobName -o log --dependency=afterok:777:888 --cpus-per-task=8 -p short.q' >>> runner._make_qsub_command("JobName", "log", ["666", "777"], ["888", "999"], threads=8, wall_clock_limit="00:00:40") 'sbatch --parsable --export=ALL --job-name=JobName -o log --dependency=afterok:666:777:888:999 --cpus-per-task=8 --time 00:00:40 -p short.q' # array job >>> runner._make_qsub_command("JobName", "log", ["666", "777"], ["888", "999"], slot_dependency=False, threads=8, num_tasks=44, max_processes=2, wall_clock_limit="00:00:40") 'sbatch --parsable --array=1-44%2 --export=ALL --job-name=JobName -o log --dependency=afterok:666:777:888:999 --cpus-per-task=8 --time 00:00:40 -p short.q' >>> runner._make_qsub_command("JobName", "log", ["666", "777"], ["888", "999"], slot_dependency=True, threads=8, num_tasks=44, max_processes=2) 'sbatch --parsable --array=1-44%2 --export=ALL --job-name=JobName -o log --dependency=afterok:666:777,aftercorr:888:999 --cpus-per-task=8 -p short.q' # exclusive node access >>> runner._make_qsub_command("JobName", "log", ["666", "777"], ["888", "999"], slot_dependency=False, threads=8, num_tasks=44, max_processes=2, exclusive=True) 'sbatch --parsable --exclusive --array=1-44%2 --export=ALL --job-name=JobName -o log --dependency=afterok:666:777:888:999 --cpus-per-task=8 -p short.q' # torque # ======= >>> runner = JobRunner("torque", qsub_extra_params="-q short.q") >>> cmd = runner._make_qsub_command("JobName", "log", "777", "888", threads=8) >>> cmd == "qsub -V -j oe -d %s -N JobName -o log -W depend=afterok:777,afterokarray:888 -l nodes=1:ppn=8 -q short.q" % os.getcwd() True >>> cmd = runner._make_qsub_command("JobName", "log", ["666", "777"], ["888", "999"], threads=8, wall_clock_limit="00:00:40") >>> cmd == "qsub -V -j oe -d %s -N JobName -o log -W depend=afterok:666:777,afterokarray:888:999 -l nodes=1:ppn=8 -l walltime=00:00:40 -q short.q" % os.getcwd() True # array job >>> cmd = runner._make_qsub_command("JobName", "log", ["666", "777"], ["888", "999"], slot_dependency=True, threads=8, num_tasks=44, max_processes=2) >>> cmd == "qsub -t 1-44%%2 -V -j oe -d %s -N JobName -o log -W depend=afterok:666:777,afterokarray:888:999 -l nodes=1:ppn=8 -q short.q" % os.getcwd() True """ if self.hpc_type not in ["grid", "slurm", "torque"]: raise ValueError("_make_qsub_command() does not support hpc type %s" % self.hpc_type) if self.hpc_type == "grid": array_option = " -t 1-" + str(num_tasks) if num_tasks else "" qsub_command_line = "qsub -terse" + array_option + " -V -j y -cwd -N " + job_name + " -o " + log_file if isinstance(wait_for, str): wait_for = [wait_for] if isinstance(wait_for_array, str): wait_for_array = [wait_for_array] if not slot_dependency: wait_for.extend(wait_for_array) # combine lists if len(wait_for) > 0: qsub_command_line += " -hold_jid " + ','.join(wait_for) if slot_dependency and len(wait_for_array) > 0: qsub_command_line += " -hold_jid_ad " + ','.join(wait_for_array) if max_processes: qsub_command_line += " -tc " + str(max_processes) if threads > 1: if not parallel_environment: raise ValueError("You must use a parallel environment when consuming more than one thread on grid engine") qsub_command_line += " -pe " + parallel_environment + ' ' + str(threads) if wall_clock_limit: qsub_command_line += " -l h_rt=" + wall_clock_limit if self.qsub_extra_params: qsub_command_line += ' ' + self.qsub_extra_params return qsub_command_line if self.hpc_type == "slurm": exclusive_option = " --exclusive" if exclusive else "" max_processes_option = "%%%i" % max_processes if max_processes else "" array_option = " --array=1-" + str(num_tasks) + max_processes_option if num_tasks else "" qsub_command_line = "sbatch --parsable" + exclusive_option + array_option + " --export=ALL --job-name=" + job_name + " -o " + log_file if isinstance(wait_for, str): wait_for = [wait_for] if isinstance(wait_for_array, str): wait_for_array = [wait_for_array] if not slot_dependency: wait_for.extend(wait_for_array) # combine lists wait_for_array = [] dependencies = [] if len(wait_for) > 0: dependencies.append("afterok:" + ':'.join(wait_for)) if len(wait_for_array) > 0 and slot_dependency: dependencies.append("aftercorr:" + ':'.join(wait_for_array)) if len(dependencies) > 0: qsub_command_line += " --dependency=" + ','.join(dependencies) if threads > 1: qsub_command_line += " --cpus-per-task=" + str(threads) if wall_clock_limit: qsub_command_line += " --time " + wall_clock_limit if self.qsub_extra_params: qsub_command_line += ' ' + self.qsub_extra_params return qsub_command_line if self.hpc_type == "torque": max_processes_option = "%%%i" % max_processes if max_processes else "" array_option = " -t 1-" + str(num_tasks) + max_processes_option if num_tasks else "" qsub_command_line = "qsub" + array_option + " -V -j oe -d " + os.getcwd() + " -N " + job_name + " -o " + log_file if isinstance(wait_for, str): wait_for = [wait_for] if isinstance(wait_for_array, str): wait_for_array = [wait_for_array] dependencies = [] if len(wait_for) > 0: dependencies.append("afterok:" + ':'.join(wait_for)) if len(wait_for_array) > 0: dependencies.append("afterokarray:" + ':'.join(wait_for_array)) if len(dependencies) > 0: qsub_command_line += " -W depend=" + ','.join(dependencies) if threads > 1: qsub_command_line += " -l nodes=1:ppn=" + str(threads) if wall_clock_limit: qsub_command_line += " -l walltime=" + wall_clock_limit if self.qsub_extra_params: qsub_command_line += ' ' + self.qsub_extra_params return qsub_command_line
[docs] def run(self, command_line, job_name, log_file, wait_for=[], wait_for_array=[], threads=1, parallel_environment=None, exclusive=False, wall_clock_limit=None, quiet=False): """Run a non-array job. Stderr is redirected (joined) to stdout. Parameters ---------- command_line : str Command with all arguments to be executed. job_name : str Job name that will appear in the job scheduler queue. log_file : str Path to the combined stdout / stderr log file. wait_for : str or list of str, optional defaults to empty list Single job id or list of jobs ids to wait for before beginning execution. Ignored when running locally. wait_for_array : str or list of str, optional defaults to empty list Single array job id or list of array jobs ids to wait for before beginning execution. Ignored when running locally. threads : int, optional defaults to 1 Number of CPU threads consumed by the job, unused when running locally. parallel_environment : str, optional defaults to None Name of the grid engine parallel execution environment. This must be specified when consuming more than one thread on grid engine. Ununsed for any other job scheduler. exclusive : bool, optional, defaults to False Requests exclusive access to compute nodes to prevent other jobs from sharing the node resources. Enforced only on SLURM, silently ignored for all other schedulers. wall_clock_limit : str, optional, defaults to None Maximum run-time; string of the form HH:MM:SS. Ignored when running locally. quiet : bool, optional, defaults to False Controls whether the job stderr and stdout are written to stdout in addition to the log file. By default, the job stderr and stdout are written to both stdout and the log file. When True, the job stderr and stdout are written to the log file only. Returns ------- job_id : str Grid or torque job id. Returns '0' in local mode. Raises ------ CalledProcessError In local mode, non-zero exit codes will raise CalledProcessError and the exception will be routed to the exception handler installed during JobRunner initialization, if any. If no exception handler was specified, the exception is re-raised. Examples -------- >>> # Normal case - verify job id is '0', stdout and stderr written to log file >>> from tempfile import NamedTemporaryFile >>> fout = NamedTemporaryFile(delete=False, mode='w'); fout.close() >>> runner = JobRunner("local") >>> # Parenthesis are needed when the command line contains multiple commands separated by semicolon >>> job_id = runner.run("(echo text to stdout; echo text to stderr 1>&2)", "JobName", fout.name) >>> type(job_id) == type("this is a string") True >>> job_id '0' >>> f = open(fout.name); out = f.read(); f.close(); os.unlink(fout.name) >>> print(out.strip()) text to stdout text to stderr >>> # Error case, external program returns non-zero. >>> # Need to ignore exception details to work with both python2 and python3. >>> job_id = runner.run("exit 100", "JobName", "") # doctest: +IGNORE_EXCEPTION_DETAIL Traceback (most recent call last): CalledProcessError: Command 'set -o pipefail; exit 100 2>&1 | tee ' returned non-zero exit status 100 """ if self.hpc_type == "local": redirection = " > " + log_file + " 2>&1 " if quiet else " 2>&1 | tee " + log_file command_line = "set -o pipefail; " + command_line + redirection if self.verbose: print(command_line) # flush stdout to keep the unbuffered stderr in chronological order with stdout sys.stdout.flush() # Run command. Wait for command to complete. If the return code was zero then return, otherwise raise CalledProcessError try: subprocess.check_call(command_line, shell=True, executable="bash") except subprocess.CalledProcessError: if self.exception_handler: exc_type, exc_value, exc_traceback = sys.exc_info() self.exception_handler(exc_type, exc_value, exc_traceback) else: raise return '0' else: # grid, slurm, or torque qsub_command_line = self._make_qsub_command(job_name, log_file, wait_for, wait_for_array, threads=threads, parallel_environment=parallel_environment, exclusive=exclusive, wall_clock_limit=wall_clock_limit) # Run command and return its stdout output as a byte string. # If the return code was non-zero it raises a CalledProcessError. if self.hpc_type == "slurm": command_line = "'#!/bin/sh\\n'" + command_line shell_command_line = "echo -e " + command_line + " | " + qsub_command_line if self.verbose: print(shell_command_line) job_id = subprocess.check_output(shell_command_line, shell=True) if sys.version_info > (3,): job_id = job_id.decode(std_encoding) # Python 3 stdout is bytes, not str job_id = job_id.strip() if self.verbose: print("Job id=" + job_id) return job_id
[docs] def run_array(self, command_line, job_name, log_file, array_file, num_tasks=None, max_processes=None, wait_for=[], wait_for_array=[], slot_dependency=False, threads=1, parallel_environment=None, array_subshell=True, exclusive=False, wall_clock_limit=None, quiet=False): """Run an array of sub-tasks with the work of each task defined by a single line in the specified array_file. Parameters ---------- command_line : str Command to be executed with parameter placeholders of the form {1}, {2}, {3} ... job_name : str Job name that will appear in the job scheduler queue. log_file : str Path to the combined stdout / stderr log file. The sub-task number will be automatically appended. array_file : str Name of the file containing the arguments for each sub-task with one line per sub-task. The arguments for each sub-task are found at the line number corresponding to the sub-task number. The line is parsed and substituted into the command, replacing the parameter placeholders with the actual arguments. num_tasks : int, optional defaults to None Defines the number of subtasks in the job array. If not specified, the array_file must exist and the number of tasks will be equal to the number of lines in the file. Use this option when the array_file does not pre-exist and is created by a process that has not run yet. max_processes : int, optional defaults to None If None, the number of concurrent processes is limited to available CPU on an HPC and limited to the number of CPU cores when run locally. If not None, it sets the maximium number of concurrent processes for the array job. This works locally with xargs, and with grid and torque. wait_for : str or list of str, optional defaults to empty list Single job id or list of jobs ids to wait for before beginning execution. Ignored when running locally. wait_for_array : str or list of str, optional defaults to empty list Single array job id or list of array jobs ids to wait for before beginning execution. Ignored when running locally. slot_dependency : bool, optional defaults to False Ignored for all schedulers but grid engine. If true, the sub-tasks of the array job being submitted will be dependent on the completion of the corresponding sub-tasks of the jobs in the wait_for_array. Has no effect on the dependencies of non-array jobs. threads : int, optional defaults to 1 Number of CPU threads consumed by each sub-task of the job, unused when running locally. parallel_environment : str, optional defaults to None Name of the grid engine parallel execution environment. Ununsed for any other job scheduler. array_subshell : bool, optional defaults to True When true, HPC array job command lines are quoted and executed in a subshell. When running locally, this parameter is ignored -- commands are not quoted and always run in a subshell. exclusive : bool, optional, defaults to False Requests exclusive access to compute nodes to prevent other jobs from sharing the node resources. Enforced only on SLURM, silently ignored for all other schedulers. wall_clock_limit : str, optional, defaults to None Maximum run-time; string of the form HH:MM:SS. Ignored when running locally. quiet : bool, optional, defaults to False Controls whether the job stderr and stdout are written to stdout in addition to the log file. By default, the job stderr and stdout are written to both stdout and the log file. When True, the job stderr and stdout are written to the log file only. Returns ------- job_id : str Grid or torque job id. Returns '0' in local mode. Raises ------ JobRunnerException If the array_file is missing or empty, and num_tasks is not specified, JobRunnerException is raised. In local mode, non-zero exit codes will raise CalledProcessError and the exception will be routed to the exception handler installed during JobRunner initialization, if any. If no exception handler was specified, the exception is re-raised. """ # Determine the number of array slots if not num_tasks: if not os.path.isfile(array_file): raise JobRunnerException("The file %s does not exist.\nCannot start array job %s." % (array_file, job_name)) if os.path.getsize(array_file) == 0: raise JobRunnerException("The file %s is empty.\nCannot start array job %s." % (array_file, job_name)) with open(array_file) as f: num_tasks = sum(1 for line in f) if self.hpc_type == "grid": log_file += "-\\$TASK_ID" elif self.hpc_type == "slurm": log_file += "-%a" if self.hpc_type == "local": # Change parameter placeholder into bash variables ready to feed to bash through xargs for param_num in range(1, 10): placeholder = '{' + str(param_num) + '}' command_line = command_line.replace(placeholder, '$' + str(param_num)) # Use all CPU cores, if no limit requested if max_processes is None: max_processes = psutil.cpu_count() # Number the tasks with nl to get the task number into the log file suffix. # Allow up to 9 parameters per command. redirection = " > " + log_file + "-$0 2>&1'" if quiet else " 2>&1 | tee " + log_file + "-$0'" command_line = "head -n " + str(num_tasks) + " " + array_file + " | nl | xargs -P " + str(max_processes) + " -n 9 -L 1 bash -c + 'set -o pipefail; " + command_line + redirection if self.verbose: print(command_line) # flush stdout to keep the unbuffered stderr in chronological order with stdout sys.stdout.flush() # Run command. Wait for command to complete try: subprocess.check_call(command_line, shell=True, executable="bash") # If the return code is non-zero it raises a CalledProcessError except subprocess.CalledProcessError: if self.exception_handler: exc_type, exc_value, exc_traceback = sys.exc_info() self.exception_handler(exc_type, exc_value, exc_traceback) else: raise return '0' else: # grid, slurm, or torque qsub_command_line = self._make_qsub_command(job_name, log_file, wait_for, wait_for_array, slot_dependency, threads, parallel_environment, num_tasks, max_processes, exclusive=exclusive, wall_clock_limit=wall_clock_limit) if array_subshell: command_line = '"' + command_line + '"' compute_node_command = "qarrayrun --shell " + self.subtask_env_var_name + ' ' + array_file + ' ' + command_line else: compute_node_command = "qarrayrun " + self.subtask_env_var_name + ' ' + array_file + ' ' + command_line compute_node_command = "'" + compute_node_command + "'" if self.hpc_type == "slurm": compute_node_command = "'#!/bin/sh\\n'" + compute_node_command shell_command_line = "echo -e " + compute_node_command + " | " + qsub_command_line if self.verbose: print(shell_command_line) job_id = subprocess.check_output(shell_command_line, shell=True) # If the return code is non-zero it raises a CalledProcessError if sys.version_info > (3,): job_id = job_id.decode(std_encoding) # Python 3 stdout is bytes, not str job_id = job_id.strip() if self.strip_job_array_suffix: dot_idx = job_id.find('.') if dot_idx > 0: job_id = job_id[0: dot_idx] if self.verbose: print("Job id=" + job_id) return job_id