import os
import sys
import logging
import multiprocessing
import collections
import six
import string
import time
from .util import mkdirp
from .tracked import TrackedExecutable
LOG_FILE_NAME = "anadama.log"
[docs]def default(output_dir=None, log_level=None):
log = LOG_FILE_NAME
if not output_dir:
output_dir=os.getcwd()
log = os.path.abspath(os.path.join(output_dir, log))
return ReporterGroup([
LoggerReporter(log_level, log),
VerboseConsoleReporter()
])
[docs]class BaseReporter(object):
"""The base reporter defines functionality similar amongst all
reporters.
The workflow that executes the hooks defined can be accessed at
``self.run_context``.
"""
[docs] def started(self, run_context):
"""Executed when a run is started, usually when
:meth:`anadama2.workflow.Workflow.go` is executed.
"""
self.run_context = run_context
raise NotImplementedError()
[docs] def task_skipped(self, task_no):
"""Executed when anadama determines that a task needn't be run.
:param task_no: The task number of the task that is
skipped. To get the actual :class:`anadama2.Task` object
that's being skipped, do ``self.run_context.tasks[task_no]``.
:type task_no: int
"""
raise NotImplementedError()
[docs] def task_started(self, task_no):
"""Executed when anadama is just about to execute a task. These tasks
are in the queue and waiting for available resources.
:param task_no: The task number of the task that is
being started. To get the actual :class:`anadama2.Task` object
that's being executed, do ``self.run_context.tasks[task_no]``.
:type task_no: int
"""
raise NotImplementedError()
[docs] def task_running(self, task_no):
"""Executed when anadama is ready to run a task.
:param task_no: The task number of the task that is
being started. To get the actual :class:`anadama2.Task` object
that's being executed, do ``self.run_context.tasks[task_no]``.
:type task_no: int
"""
raise NotImplementedError()
[docs] def task_command(self, task_no):
"""Executed when anadama is ready to run a task. Logs the command
and version of any tracked executables.
:param task: The task number of the task that is
being started. To get the actual :class:`anadama2.Task` object
that's being executed, do ``self.run_context.tasks[task_no]``.
:type task_no: int
"""
pass
[docs] def task_failed(self, task_result):
"""Executed when a task fails.
:param task_no: The task number of the task that failed . To
get the actual :class:`anadama2.Task` object that failed, do
``self.run_context.tasks[task_no]``. To get the task result
of the task that failed,
do``self.run_context.task_results[task_no]``
:type task_no: int
"""
raise NotImplementedError()
[docs] def task_completed(self, task_result):
"""Executed when a task completes with no errors.
:param task_no: The task number of the task that succeeded . To
get the actual :class:`anadama2.Task` object that succeeded, do
``self.run_context.tasks[task_no]``. To get the task result
of the task that succeeded,
do``self.run_context.task_results[task_no]``
:type task_no: int
"""
raise NotImplementedError()
[docs] def finished(self):
"""Executed when a run finishes. This method is called whether there
are task failures or not.
"""
raise NotImplementedError()
[docs] def task_grid_status(self, task_no, grid_id, status_message):
"""Executed when anadama has grid information for a task. These messages
are reported when the status for a grid task has changed.
:param task_no: The task number of the task that is
being started. To get the actual :class:`anadama2.Task` object
that's being executed, do ``self.run_context.tasks[task_no]``.
:param grid_id: The id of the grid job.
:param status_message: The grid status message.
:type task_no: int
"""
raise NotImplementedError()
[docs] def task_grid_status_polling(self, task_no, grid_id, status_message):
"""Executed when anadama has grid information for a task at set polling
intervals. Status may repeat at each interval.
:param task_no: The task number of the task that is
being started. To get the actual :class:`anadama2.Task` object
that's being executed, do ``self.run_context.tasks[task_no]``.
:param grid_id: The id of the grid job.
:param status_message: The grid status message.
:type task_no: int
"""
raise NotImplementedError()
[docs]class ReporterGroup(BaseReporter):
"""Sometimes you want to use multiple reporters. For that, there is
ReporterGroup. Here's an example usage:
.. code:: python
from anadama2.reporters import ReporterGroup
my_grouped_reporter = ReporterGroup([custom_reporter_a,
custom_reporter_b,
custom_reporter_c])
...
ctx.go(reporter=my_grouped_reporter)
"""
def __init__(self, other_reporters):
self.reps = other_reporters
[docs] def started(self, ctx):
for r in self.reps:
r.started(ctx)
[docs] def task_skipped(self, task_no):
for r in self.reps:
r.task_skipped(task_no)
[docs] def task_started(self, task_no):
for r in self.reps:
r.task_started(task_no)
[docs] def task_running(self, task_no):
for r in self.reps:
r.task_running(task_no)
[docs] def task_command(self, task_no):
for r in self.reps:
r.task_command(task_no)
[docs] def task_failed(self, task_result):
for r in self.reps:
r.task_failed(task_result)
[docs] def task_completed(self, task_result):
for r in self.reps:
r.task_completed(task_result)
[docs] def finished(self):
for r in self.reps:
r.finished()
[docs] def task_grid_status(self, task_no, grid_id, status_message):
for r in self.reps:
r.task_grid_status(task_no, grid_id, status_message)
[docs] def task_grid_status_polling(self, task_no, grid_id, status_message):
for r in self.reps:
r.task_grid_status_polling(task_no, grid_id, status_message)
[docs]class ConsoleReporter(BaseReporter):
"""Prints out run progress to stderr.
An example readout is as follows:
::
(s)[ 1/ 6 - 16.67%] Track pre-existing dependencies
The readout is composed of five pieces of information:
1. The task status. That's the part in the parentheses.
* ``( )`` means that the task is currently being executed
* ``(+)`` means that the task finished successfully
* ``(s)`` means that the task was skipped
* ``(!)`` means that the task failed.
2. The current task number. That's the first number in the
square brackets.
3. The total number of tasks to be run or skipped. That's the
number after the forward slash.
4. The percent complete of the current run. That's the number
with a percent-sign next to it
5. The task name. That's the text that comes after the ending
square bracket. Remember that you can set the task name with
the ``name`` option to
:meth:`anadama2.workflow.Workflow.add_task`.
"""
msg_str = six.u("({:.1})[{:3}/{:3} - {:6.2f}%] {:.57}")
[docs] class stats:
skip = six.u("s")
fail = six.u("!")
done = six.u("+")
start = six.u(" ")
def __init__(self, *args, **kwargs):
self.failed_results = list()
self.n_open = 0
self.multithread_mode = False
def _msg(self, status, msg, c_r=False, visible=True):
if self.n_open > 1 and not self.multithread_mode:
self.multithread_mode = True
if visible is True:
sys.stderr.write(six.u('\n'))
s = self.msg_str.format(status, self.n_complete, self.n_tasks,
(float(self.n_complete)/self.n_tasks)*100,
six.u(msg))
if self.multithread_mode is True:
s += six.u("\n")
elif c_r:
self.n_open -= 1
s = six.u("\r") + s + six.u("\n")
else:
self.n_open += 1
if visible is True:
sys.stderr.write(s)
[docs] def started(self, ctx):
self.run_context = ctx
self.reset()
[docs] def task_started(self, task_no):
self._msg(self.stats.start, self.run_context.tasks[task_no].name,
visible=self.run_context.tasks[task_no].visible)
[docs] def task_skipped(self, task_no):
self.n_complete += 1
self._msg(self.stats.skip, self.run_context.tasks[task_no].name, True,
visible=self.run_context.tasks[task_no].visible)
[docs] def task_failed(self, task_result):
self.n_complete += 1
if task_result.task_no is None:
return
name = self.run_context.tasks[task_result.task_no].name
self.failed_results.append((name, task_result))
self._msg(self.stats.fail, name, True,
visible=self.run_context.tasks[task_result.task_no].visible)
[docs] def task_completed(self, task_result):
self.n_complete += 1
name = self.run_context.tasks[task_result.task_no].name
self._msg(self.stats.done, name, True,
visible=self.run_context.tasks[task_result.task_no].visible)
[docs] def finished(self):
sys.stderr.write(six.u("Run Finished\n"))
for name, result in self.failed_results:
sys.stderr.write(six.u("Task {} failed\n".format(result.task_no)))
sys.stderr.write(six.u(" Name: "+name+"\n"))
sys.stderr.write(six.u(" Original error: \n"))
for line in result.error.split("\n"):
sys.stderr.write(six.u(" "+line+"\n"))
self.reset()
[docs] def reset(self):
self.n_tasks = len(self.run_context.tasks)
self.n_complete = 0
self.failed = False
[docs]class VerboseConsoleReporter(BaseReporter):
"""Prints out verbose run progress to stdout.
An example readout is as follows:
::
DATE/TIME [ 0/18 - 0.00%] **Started ** Task 2: kneaddata
DATE/TIME [ 0/18 - 0.00%] **Started ** Task 0: kneaddata
DATE/TIME [ 1/18 - 5.56%] **Completed** Task 0: kneaddata
DATE/TIME [ 1/18 - 5.56%] **Started ** Task 4: metaphlan2.py
DATE/TIME [ 2/18 - 11.11%] **Completed** Task 2: kneaddata
The readout is composed of five pieces of information:
1. The date/time for the status message.
2. The status of all tasks. For example, [1/4 - 25%] indicates that
one task of the four total tasks have finished running. The
workflow is 25% complete.
3. The step the task has completed. Examples are "Started" and "Completed".
4. The task number.
5. The task description. This is the task name if set. If the task name is
the default then it is the first task action. This is the first command
and it is limited to the executable name. If it is a function, it will be the
name of the function.
"""
[docs] class stats:
skip = six.u("Skipped")
fail = six.u("Failed")
done = six.u("Completed")
start = six.u("Started")
ready = six.u("Ready")
grid_run = six.u("GridJob")
max_message_length = max(len(x) for x in [skip,fail,done,start,ready,grid_run])
def __init__(self, *args, **kwargs):
self.failed_results = list()
# Use a multiprocessing value so the total tasks completed is shared
# for the local multiprocessing tasks (is process and thread-safe)
self.n_complete = multiprocessing.Value('i',0)
# Set the max length for the total output line
self.max_length=120
def _msg(self, status, task_name, description, id, visible=True, grid_update=None):
# create a date/time string
s = time.strftime("(%b %d %H:%M:%S) ", time.localtime())
# create a message string for the current status
s += self.msg_str.format(self.n_complete.value, self.n_tasks,
(float(self.n_complete.value)/self.n_tasks)*100,
status, id, description)
# if command string is reduced, add ellipses
if len(description) > self.max_command_length:
s += " ..."
if grid_update:
s += self.grid_update_msg.format(grid_update[0], grid_update[1])
s += six.u("\n")
# only write if the task is visible
if visible is True:
sys.stdout.write(s)
def _increment_complete(self, task_no):
# update the number of completed visible tasks
if self.run_context.tasks[task_no].visible:
self.n_complete.value+=1
[docs] def started(self, ctx):
self.run_context = ctx
self.reset()
[docs] def task_started(self, task_no):
# This indicates the task is ready and waiting in the queue
self._msg(self.stats.ready, self.run_context.tasks[task_no].name,
self.run_context.tasks[task_no].description, task_no,
visible=self.run_context.tasks[task_no].visible)
[docs] def task_running(self, task_no):
self._msg(self.stats.start, self.run_context.tasks[task_no].name,
self.run_context.tasks[task_no].description, task_no,
visible=self.run_context.tasks[task_no].visible)
[docs] def task_skipped(self, task_no):
self._increment_complete(task_no)
self._msg(self.stats.skip, self.run_context.tasks[task_no].name,
self.run_context.tasks[task_no].description, task_no,
visible=self.run_context.tasks[task_no].visible)
[docs] def task_failed(self, task_result):
self._increment_complete(task_result.task_no)
if task_result.task_no is None:
return
name = self.run_context.tasks[task_result.task_no].name
self.failed_results.append((name, task_result))
self._msg(self.stats.fail, name, self.run_context.tasks[task_result.task_no].description,
task_result.task_no, visible=self.run_context.tasks[task_result.task_no].visible)
[docs] def task_completed(self, task_result):
self._increment_complete(task_result.task_no)
name = self.run_context.tasks[task_result.task_no].name
self._msg(self.stats.done, name, self.run_context.tasks[task_result.task_no].description,
task_result.task_no, visible=self.run_context.tasks[task_result.task_no].visible)
[docs] def task_grid_status(self, task_no, grid_id, status_message):
self._msg(self.stats.grid_run, self.run_context.tasks[task_no].name,
self.run_context.tasks[task_no].description, task_no, visible=True,
grid_update=[grid_id, status_message])
[docs] def task_grid_status_polling(self, task_no, grid_id, status_message):
self.task_grid_status(task_no, grid_id, status_message)
[docs] def finished(self):
sys.stdout.write(six.u("Run Finished\n"))
for name, result in self.failed_results:
sys.stdout.write(six.u("Task {} failed\n".format(result.task_no)))
sys.stdout.write(six.u(" Name: "+name+"\n"))
sys.stdout.write(six.u(" Original error: \n"))
for line in result.error.split("\n"):
print_line="".join(filter(lambda x: x in string.printable, line))
sys.stdout.write(six.u(" "+print_line+"\n"))
self.reset()
[docs] def reset(self):
# count the number of visible tasks
self.n_tasks = 0
for task in self.run_context.tasks:
if task.visible:
self.n_tasks+=1
# limit the full string length
max_task_length=len(str(self.n_tasks))
self.max_command_length = self.max_length -(20+max_task_length*3+20+self.stats.max_message_length)
self.msg_str = six.u("[{:"+str(max_task_length)+"}/{:"+str(max_task_length)+
"} - {:6.2f}%] **{:"+str(self.stats.max_message_length)+
"}** Task {:"+str(max_task_length)+
"}: {:."+str(self.max_command_length)+"}")
self.grid_update_msg = six.u(" <Grid JobId {:9}: {:.30}>")
self.n_complete.value = 0
self.failed = False
SHELL_COMMAND = "Executing with shell: "
VERSION_COMMAND = "Tracked executable version: "
[docs]class LoggerReporter(BaseReporter):
"""A reporter that uses :mod:`logging`.
:param loglevel_str: The logging level. Valid levels: subdebug,
debug, info, subwarning, warning, error
:type loglevel_str: str
:param logfile: The file to log to. Defaults to stdout.
:type logfile: str or file-like
:param fmt_str: The log format. See :mod:`logging` for more
information
:type fmt_str: str
"""
FORMAT = "%(asctime)s\t%(name)s\t%(funcName)s\t%(levelname)s: %(message)s"
def __init__(self, loglevel_str=None, logfile=None,
fmt_str=None, *args, **kwargs):
# create the log file folder if needed
if logfile:
mkdirp(os.path.dirname(logfile))
self.logger = logging.getLogger(self.__class__.__name__)
self.loglevel_str = loglevel_str.upper() or logging.WARNING
loglevel = getattr(logging, self.loglevel_str)
logkwds = {"format": fmt_str or self.FORMAT,
"level": loglevel }
if logfile and hasattr(logfile, "write"):
logkwds['stream'] = logfile
elif logfile and isinstance(logfile, six.string_types):
logkwds['filename'] = logfile
logging.basicConfig(**logkwds)
self.any_failed = False
self.start_log_message="task %i, %s : %s "
[docs] @classmethod
def read_log(cls,file,type,remove_paths=True):
""" Read the data from the log file """
# read all of the lines from the log
data=collections.OrderedDict()
with open(file) as file_handle:
lines=file_handle.readlines()
# look for commands, benchmarking, or executable version information
log_info={}
if type == "commands":
keyword=SHELL_COMMAND
for line in lines:
if keyword in line:
new_command=line.split(keyword)[-1].strip()
if remove_paths:
new_command=" ".join([os.path.split(i.rstrip(os.path.sep))[-1] for i in new_command.split(" ")])
data[new_command]=1
log_info=list(data.keys())
elif type == "benchmarking":
benchmarking_info={}
# read in the commands and also the benchmarking information
for i in range(len(lines)):
if "run_task_command" in lines[i]:
# get the id and the executable plus input item
id=lines[i].split()[-1].strip().replace(":","")
command_tokens=lines[i+1].strip().split()
executable=command_tokens[0]
try:
input=os.path.split(command_tokens[command_tokens.index("--input")+1])[-1]
except (ValueError, IndexError):
input="NA"
log_info[id]="\t".join([executable, input])
elif "Benchmark" in lines[i]:
# get the job id and benchmarking info
id=lines[i].split()[-1].strip().replace(":","")
info="\t".join([lines[j].strip().split(": ")[-1] for j in [i+1,i+2,i+3]])
benchmarking_info[id]=info
# set the log info to include id, executable, input and benchmarking
for id in benchmarking_info.keys():
if id in log_info:
log_info[id]=log_info[id]+"\t"+benchmarking_info[id]
elif type == "variables":
# read in the variables set for the workflow
# use the last variable values for workflow with more than one run
for line in lines:
if "started\tINFO:" in line and " = " in line:
data = line.rstrip().split(": ")[-1]
try:
variable, value = data.split(" = ")
log_info[variable]=value
except ValueError:
pass
else:
keyword = VERSION_COMMAND
# remove redundant version from strings since these are already
# identified as version information
format_output=lambda x: x.replace("Version:","").replace(", version","")
for line in lines:
if keyword in line:
data[format_output(line.split(keyword)[-1].strip())]=1
log_info=list(data.keys())
if not log_info:
log_info=["No {} found in log".format(type)]
return log_info
def _daginfo(self, task_no):
children = self.run_context.dag.successors(task_no)
parents = self.run_context.dag.predecessors(task_no)
msg = " {} parents: {}. {} children: {}."
return msg.format(len(parents), parents, len(children), children)
[docs] def log_event(self, msg, task_no, debug_msg=None):
visible=self.run_context.tasks[task_no].visible
description=self.run_context.tasks[task_no].description
# If in debug level and debug message is provided, append to end of message
if self.loglevel_str == "DEBUG" and debug_msg is not None:
msg=msg+" : "+debug_msg
# Write tasks that are not visible to log on debug level
if visible:
self.logger.info(self.start_log_message, task_no, description, msg)
else:
self.logger.debug(self.start_log_message, task_no, description, msg)
[docs] def started(self, ctx):
self.run_context = ctx
self.logger.info("Beginning AnADAMA run with %i tasks.",
len(self.run_context.tasks))
# if there are workflow variables set, add them to the log
try:
options=vars(ctx.vars.get_option_values())
except (AttributeError, TypeError):
options={}
if options:
self.logger.info("Workflow configuration options")
for name, value in options.items():
self.logger.info("{} = {}".format(name, value))
[docs] def task_skipped(self, task_no):
self.log_event("skipped", task_no, self._daginfo(task_no))
[docs] def task_started(self, task_no):
self.log_event("ready and waiting for resources", task_no, self._daginfo(task_no))
[docs] def task_running(self, task_no):
self.log_event("starting to run",task_no,self._daginfo(task_no))
[docs] def task_command(self, task_no):
# if a tracked executable is found, then log the version
for exe_depends in filter(lambda x: isinstance(x, TrackedExecutable), self.run_context.tasks[task_no].depends):
version = exe_depends.version()
if version:
self.logger.info(VERSION_COMMAND+" "+version)
# if a command, then log the shell command(s)
if not list(filter(lambda x: six.callable(x), self.run_context.tasks[task_no].actions)):
self.logger.info(SHELL_COMMAND+" "+" ".join(self.run_context.tasks[task_no].actions))
[docs] def task_failed(self, task_result):
self.logger.error(self.start_log_message,task_result.task_no,
self.run_context.tasks[task_result.task_no].description,
" Failed! Error message : {}".format(task_result.error))
self.any_failed = True
[docs] def task_grid_status(self, task_no, grid_id, status_message):
self.log_event(" grid job id {} has status {}".format(grid_id, status_message),
task_no, self.run_context.tasks[task_no].description)
[docs] def task_grid_status_polling(self, task_no, grid_id, status_message):
self.logger.debug(self.start_log_message,
task_no, self.run_context.tasks[task_no].description,
" grid job id {} has status {}".format(grid_id, status_message))
[docs] def task_completed(self, task_result):
self.log_event("completed successfully",task_result.task_no)
[docs] def finished(self):
if self.any_failed:
self.logger.error("AnADAMA run finished with errors.")
else:
self.logger.info("AnADAMA run finished.")
[docs]class WebhookReporter(BaseReporter):
"""TODO"""
pass