# -*- coding: utf-8 -*-
import os
import re
import sys
import logging
import itertools
import time
import six
from .grid import Grid
from .grid import GridWorker
from .grid import GridQueue
if os.name == 'posix' and sys.version_info[0] < 3:
import subprocess32 as subprocess
else:
import subprocess
class Slurm(Grid):
"""This class enables the Workflow class to dispatch tasks to
SLURM. Use it like so:

.. code:: python

from anadama2 import Workflow
from anadama2.slurm import Slurm
powerup = Slurm(partition="general", tmpdir="/path/to/shared/tmpdir")
ctx = Workflow(grid=powerup)
ctx.do("wget "
"ftp://public-ftp.hmpdacc.org/"
"HMMCP/finalData/hmp1.v35.hq.otu.counts.bz2 "
"-O @{input/hmp1.v35.hq.otu.counts.bz2}")
# run on slurm with 200 MB of memory, 4 cores, and 60 minutes
t1 = ctx.grid_do("pbzip2 -d -p 4 < #{input/hmp1.v35.hq.otu.counts.bz2} "
"> @{input/hmp1.v35.hq.otu.counts}",
mem=200, cores=4, time=60)
# run on slurm on the serial_requeue partition
ctx.grid_add_task("some_huge_analysis {depends[0]} {targets[0]}",
depends=t1.targets, targets="output.txt",
mem=4000, cores=1, time=300, partition="serial_requeue")
ctx.go()

:param partition: The name of the SLURM partition to submit tasks to
:type partition: str
:param tmpdir: A directory in which to store temporary files. All
machines in the cluster must be able to read the contents of
this directory; :mod:`anadama2.picklerunner` is used to create
self-contained scripts for individual tasks, and ``srun`` is
called to run each script on the cluster.
:type tmpdir: str
:keyword benchmark_on: Option to turn on/off benchmarking
:type benchmark_on: bool
:keyword options: Grid specific options to apply to each job
:type options: list of str
:keyword environment: Commands to set up the environment, added to each grid job script
:type environment: list of str
"""
def __init__(self, partition, tmpdir, benchmark_on=None, options=None, environment=None):
super(Slurm, self).__init__("slurm", GridWorker, SLURMQueue(partition, benchmark_on, options, environment), tmpdir, benchmark_on)
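# A hedged usage sketch (not part of the original module): passing grid options and
# environment commands, assuming each option is an argument string appended after
# "#SBATCH" and each environment entry is a line added verbatim to the job script.
# The option and module names below are illustrative only.
#
#     from anadama2 import Workflow
#     from anadama2.slurm import Slurm
#     slurm = Slurm(partition="general", tmpdir="/path/to/shared/tmpdir",
#                   options=["--constraint=broadwell"],
#                   environment=["module load python"])
#     workflow = Workflow(grid=slurm)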
class SLURMQueue(GridQueue):
def __init__(self, partition, benchmark_on=None, options=None, environment=None):
super(SLURMQueue, self).__init__(partition, benchmark_on)
self.options=options
self.environment=environment
self.job_code_completed="COMPLETED"
self.job_code_cancelled="CANCELLED"
self.job_code_failed="FAILED"
self.job_code_timeout="TIMEOUT"
self.job_code_memkill="MEMKILL"
self.all_failed_codes=[self.job_code_failed,self.job_code_timeout,self.job_code_memkill,self.job_code_cancelled]
self.all_stopped_codes=[self.job_code_completed]+self.all_failed_codes
@staticmethod
def submit_command(grid_script):
return ["sbatch",grid_script]
def submit_template(self):
template = [
"#SBATCH -p ${partition}",
"#SBATCH -N 1 ",
"#SBATCH -n ${cpus}",
"#SBATCH -t ${time}",
"#SBATCH --mem ${memory}",
"#SBATCH -o ${output}",
"#SBATCH -e ${error}"]
# add user supplied options if provided
if self.options:
template+=["#SBATCH "+option for option in self.options]
# add user supplied environment commands if provided
if self.environment:
template+=self.environment
return template
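# A hedged illustration (assumed rendering, not taken from the base class) of how
# the "${...}" placeholders in the template above might be filled with
# string.Template substitution, assuming the queue can be constructed with just a
# partition name and that the keyword names match the placeholders:
#
#     from string import Template
#     keywords = {"partition": "general", "cpus": 4, "time": 60,
#                 "memory": 200, "output": "task1.out", "error": "task1.err"}
#     header = [Template(line).substitute(keywords)
#               for line in SLURMQueue("general").submit_template()]
#     # e.g. ["#SBATCH -p general", "#SBATCH -N 1 ", "#SBATCH -n 4", ...]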
def job_failed(self,status):
# check if the job has a status indicating that it failed
# This will capture "CANCELLED by 0" and the short form "CANCELLED+"
return status.startswith(self.job_code_cancelled) or status in self.all_failed_codes
def job_stopped(self,status):
# check if the job has a status which indicates it stopped running
# This will capture "CANCELLED by 0" and the short form "CANCELLED+"
return status.startswith(self.job_code_cancelled) or status in self.all_stopped_codes
def job_memkill(self, status, jobid, memory):
return status == self.job_code_memkill
def job_timeout(self, status, jobid, time):
return status == self.job_code_timeout
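# A hedged illustration of how the helpers above classify job states; the status
# strings are typical sacct values, assumed here rather than taken from the source,
# and the queue is assumed constructible with just a partition name:
#
#     q = SLURMQueue("general")
#     q.job_stopped("COMPLETED")       # True  (finished successfully)
#     q.job_failed("COMPLETED")        # False
#     q.job_failed("CANCELLED by 0")   # True  (prefix match on CANCELLED)
#     q.job_failed("TIMEOUT")          # True
#     q.job_stopped("RUNNING")         # False (still in the queue)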
def get_job_status_from_stderr(self, error_file, grid_job_status, grid_jobid):
# read the error file to see if any time or memory errors were reported
try:
slurm_errors=subprocess.check_output(["grep","-i",r"slurmstepd: error\|killed",error_file],universal_newlines=True).split("\n")
except (EnvironmentError, subprocess.CalledProcessError):
slurm_errors=[]
if slurm_errors:
# check for time or memory
if list(filter(lambda x: "TIME LIMIT" in x and self.job_code_cancelled in x, slurm_errors)):
logging.info("Slurm task %s cancelled due to time limit", grid_jobid)
# This has the slurm status of "TIMEOUT" from slurm sacct
grid_job_status=self.job_code_timeout
elif list(filter(lambda x: "exceeded memory limit" in x and "being killed" in x, slurm_errors)) or \
all([i in "\n".join(slurm_errors).lower() for i in ["exceeded","memory limit","killed"]]):
logging.info("Slurm task %s cancelled due to memory limit", grid_jobid)
# This has the slurm status of "CANCELLED by 0" from slurm sacct (short form is "CANCELLED+")
# It might also have the slurm status of FAILED
grid_job_status=self.job_code_memkill
return grid_job_status
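# A hedged example of the reclassification performed above; the error lines are
# representative slurmstepd messages, assumed rather than copied from a real log:
#
#     # stderr contains: "slurmstepd: error: *** JOB 123 ON node1 CANCELLED AT ... DUE TO TIME LIMIT ***"
#     #   -> contains both "CANCELLED" and "TIME LIMIT", so the status becomes TIMEOUT
#     # stderr contains: "slurmstepd: error: Job 123 exceeded memory limit ..., being killed"
#     #   -> contains "exceeded memory limit" and "being killed", so the status becomes MEMKILL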
def refresh_queue_status(self):
""" Get the latest status for the grid jobs using the same command for
jobs in the queue and for completed jobs to benchmark """
# Get the jobid, state, and resources for all jobs for the current user
stdout=self.run_grid_command_resubmit(["sacct","-o","JobID,State,AllocCPUs,Elapsed,MaxRSS"])
# remove the header information from the status lines
# split each line and remove empty lines
try:
info=filter(lambda x: x, [line.rstrip().split() for line in stdout.split("\n")[2:]])
except IndexError:
info=[]
# now merge the lines for each job so there is only a single item for each job
# the batch line includes the final MAXRSS
merged_info={}
for line in info:
if not "." in line[0]:
merged_info[line[0]]=line
elif ".ba" in line[0]:
try:
merged_info[line[0].split(".")[0]].append(line[-1])
except KeyError:
pass
info=merged_info.values()
return list(info)
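# A hedged sketch of what the merging above produces; the sacct output below is
# illustrative, not captured from a real cluster. Per-job lines are kept, and the
# MaxRSS column from the matching ".batch" line is appended to each one:
#
#     # raw sacct lines (after the two header rows are dropped):
#     #   1000        COMPLETED  4  00:10:03
#     #   1000.batch  COMPLETED  4  00:10:03  150000K
#     # merged result returned to the caller:
#     #   [["1000", "COMPLETED", "4", "00:10:03", "150000K"]]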