import warnings
import luigi
from luigi.task import flatten
import luigi.tools.deps
from luigi.util import inherits as luigi_inherits, requires as luigi_requires
from luigi.parameter import (
Parameter,
DateParameter, MonthParameter, YearParameter, DateHourParameter, DateMinuteParameter, DateSecondParameter,
DateIntervalParameter, TimeDeltaParameter,
IntParameter, FloatParameter, BoolParameter,
TaskParameter, EnumParameter, DictParameter, ListParameter, TupleParameter,
NumericalParameter, ChoiceParameter, OptionalParameter
)
from pathlib import Path
import d6tcollect
d6tcollect.submit = True
import d6tflow.targets, d6tflow.tasks, d6tflow.settings
import d6tflow.utils
from d6tflow.cache import data as data
import d6tflow.cache
print('Welcome to d6tflow! For Q&A see https://github.com/d6t/d6tflow')
[docs]def set_dir(dir=None):
"""
Initialize d6tflow
Args:
dir (str): data output directory
"""
if d6tflow.settings.isinitpipe:
raise RuntimeError('Using d6tpipe, set dir via d6tflow.pipes.set_default()')
if dir is None:
dirpath = d6tflow.settings.dirpath
dirpath.mkdir(exist_ok=True)
else:
dirpath = Path(dir)
d6tflow.settings.dir = dir
d6tflow.settings.dirpath = dirpath
d6tflow.settings.isinit = True
return dirpath
@d6tcollect.collect
def preview(tasks, indent='', last=True, show_params=True, clip_params=False):
"""
Preview task flows
Args:
tasks (obj, list): task or list of tasks
"""
print('\n ===== Luigi Execution Preview ===== \n')
if not isinstance(tasks, (list,)):
tasks = [tasks]
for t in tasks:
print(d6tflow.utils.print_tree(t, indent=indent, last=last, show_params=show_params, clip_params=clip_params))
print('\n ===== Luigi Execution Preview ===== \n')
@d6tcollect.collect
def run(tasks, forced=None, forced_all=False, forced_all_upstream=False, confirm=False, workers=1, abort=True,
execution_summary=None, **kwargs):
"""
Run tasks locally. See luigi.build for additional details
Args:
tasks (obj, list): task or list of tasks
forced (list): list of forced tasks
forced_all (bool): force all tasks
forced_all_upstream (bool): force all tasks including upstream
confirm (list): confirm invalidating tasks
workers (int): number of workers
abort (bool): on errors raise exception
execution_summary (bool): print execution summary
kwargs: keywords to pass to luigi.build
"""
if not isinstance(tasks, (list,)):
tasks = [tasks]
# if forced_all_upstream is true we are going to force run tasks anyway
# in the second if condition.
# So in this case we are going to skip running forced tasks.
if forced_all and not forced_all_upstream:
forced = tasks
if forced_all_upstream:
for t in tasks:
invalidate_upstream(t, confirm=confirm)
if forced is not None:
if not isinstance(forced, (list,)):
forced = [forced]
invalidate = []
for tf in forced:
for tup in tasks:
invalidate.append(d6tflow.taskflow_downstream(tf, tup))
invalidate = set().union(*invalidate)
invalidate = {t for t in invalidate if t.complete()}
if len(invalidate) > 0:
if confirm:
print('Forced tasks', invalidate)
c = input('Confirm invalidating forced tasks (y/n)')
else:
c = 'y'
if c == 'y':
[t.invalidate(confirm=False) for t in invalidate]
else:
return None
execution_summary = execution_summary if execution_summary is not None else d6tflow.settings.execution_summary
opts = {**{'workers': workers, 'local_scheduler': True, 'log_level': d6tflow.settings.log_level}, **kwargs}
if execution_summary and luigi.__version__ >= '3.0.0':
opts['detailed_summary'] = True
result = luigi.build(tasks, **opts)
if isinstance(result, bool):
success = result
else:
success = result.scheduling_succeeded
if abort and not success:
raise RuntimeError(
'Exception found running flow, check trace. For more details see https://d6tflow.readthedocs.io/en/latest/run.html#debugging-failures')
if execution_summary:
if not isinstance(result, bool):
print(result.summary_text)
return result
[docs]def taskflow_upstream(task, only_complete=False):
"""
Get all upstream inputs for a task
Args:
task (obj): task
"""
tasks = d6tflow.utils.traverse(task)
if only_complete:
tasks = [t for t in tasks if t.complete()]
return tasks
[docs]def taskflow_downstream(task, task_downstream, only_complete=False):
"""
Get all downstream outputs for a task
Args:
task (obj): task
task_downstream (obj): downstream target task
"""
tasks = luigi.tools.deps.find_deps(task_downstream, task.task_family)
if only_complete:
tasks = {t for t in tasks if t.complete()}
return tasks
[docs]def invalidate_all(confirm=False):
"""
Invalidate all tasks by deleting all files in data directory
Args:
confirm (bool): confirm operation
"""
# record all tasks that run and their output vs files present
raise NotImplementedError()
[docs]def invalidate_orphans(confirm=False):
"""
Invalidate all unused task outputs
Args:
confirm (bool): confirm operation
"""
# record all tasks that run and their output vs files present
raise NotImplementedError()
[docs]def show(task):
"""
Show task execution status
Args:
tasks (obj, list): task or list of tasks
"""
preview(task)
[docs]def invalidate_upstream(task, confirm=False):
"""
Invalidate all tasks upstream tasks in a flow.
For example, you have 3 dependant tasks. Normally you run Task3 but you've changed parameters for Task1. By invalidating Task3 it will check the full DAG and realize Task1 needs to be invalidated and therefore Task2 and Task3 also.
Args:
task (obj): task to invalidate. This should be an upstream task for which you want to check upstream dependencies for invalidation conditions
confirm (bool): confirm operation
"""
tasks = taskflow_upstream(task, only_complete=False)
if len(tasks) == 0:
print('no tasks to invalidate')
return True
if confirm:
print('Completed tasks to invalidate:')
for t in tasks:
print(t)
c = input('Confirm invalidating tasks (y/n)')
else:
c = 'y'
if c == 'y':
[t.invalidate(confirm=False) for t in tasks]
[docs]def invalidate_downstream(task, task_downstream, confirm=False):
"""
Invalidate all downstream tasks in a flow.
For example, you have 3 dependant tasks. Normally you run Task3 but you've changed parameters for Task1. By invalidating Task3 it will check the full DAG and realize Task1 needs to be invalidated and therefore Task2 and Task3 also.
Args:
task (obj): task to invalidate. This should be an downstream task for which you want to check downstream dependencies for invalidation conditions
task_downstream (obj): downstream task target
confirm (bool): confirm operation
"""
tasks = taskflow_downstream(task, task_downstream, only_complete=True)
if len(tasks) == 0:
print('no tasks to invalidate')
return True
if confirm:
print('Completed tasks to invalidate:')
for t in tasks:
print(t)
c = input('Confirm invalidating tasks (y/n)')
else:
c = 'y'
if c == 'y':
[t.invalidate(confirm=False) for t in tasks]
return True
else:
return False
[docs]def clone_parent(cls):
warnings.warn("This is replaced with `@d6tflow.requires()`", DeprecationWarning, stacklevel=2)
def requires(self):
return self.clone_parent()
setattr(cls, 'requires', requires)
return cls
# Like luigi.utils.inherits but for handling dictionaries
[docs]class dict_inherits:
def __init__(self, *tasks_to_inherit):
super(dict_inherits, self).__init__()
if not tasks_to_inherit:
raise TypeError("tasks_to_inherit cannot be empty")
# We know the first arg is a dict.
self.tasks_to_inherit = tasks_to_inherit[0]
def __call__(self, task_that_inherits):
for task_to_inherit in self.tasks_to_inherit:
for param_name, param_obj in self.tasks_to_inherit[task_to_inherit].get_params():
# Check if the parameter exists in the inheriting task
if not hasattr(task_that_inherits, param_name):
# If not, add it to the inheriting task
setattr(task_that_inherits, param_name, param_obj)
# adding dictionary functionality
def clone_parents_dict(_self, **kwargs):
return {
task_to_inherit: _self.clone(cls=self.tasks_to_inherit[task_to_inherit], **kwargs)
for task_to_inherit in self.tasks_to_inherit
}
task_that_inherits.clone_parents_dict = clone_parents_dict
return task_that_inherits
# Like luigi.utils.requires but for handling dictionaries
[docs]class dict_requires:
def __init__(self, *tasks_to_require):
super(dict_requires, self).__init__()
if not tasks_to_require:
raise TypeError("tasks_to_require cannot be empty")
self.tasks_to_require = tasks_to_require[0] # Assign the dictionary
def __call__(self, task_that_requires):
task_that_requires = dict_inherits(self.tasks_to_require)(task_that_requires)
def requires(_self):
return _self.clone_parents_dict()
task_that_requires.requires = requires
return task_that_requires
[docs]def inherits(*tasks_to_inherit):
if isinstance(tasks_to_inherit[0], dict):
return dict_inherits(*tasks_to_inherit)
return luigi_inherits(*tasks_to_inherit)
[docs]def requires(*tasks_to_require):
# Check the type; if a dictionary call our custom requires decorator
is_dict = isinstance(tasks_to_require[0], dict)
if is_dict:
return dict_requires(*tasks_to_require)
return luigi_requires(*tasks_to_require)
[docs]class Workflow(object):
"""
The class is used to orchestrate tasks and define a task pipeline
"""
def __init__(self, task=None, params=None, path=None, env=None):
# Task List Check
if isinstance(task, (list,)):
raise Exception('Task lists should be passed to the flow.run function.')
# Set Env
if path is not None and env is not None:
path = str(path) + f"/env={env}"
# Will overide other tasks with this task's main path
elif env is not None:
path = getattr(task, 'path', d6tflow.settings.dirpath)
path = str(path) + f"/env={env}"
# Set Params
self.params = {} if params is None else params
self.params = self.params if path is None else dict(**self.params, **{'path': path})
# Add flows to params
self.params = dict(**self.params, **{'flows': {}})
# Default Task
self.default_task = task
# If Task is set, Try to send Flow path to all other tasks
if task:
# Attach to tasks
if not isinstance(task, (list,)):
task = [task]
self._attach_to_tasks(task, path=path)
[docs] def preview(self, tasks=None, indent='', last=True, show_params=True, clip_params=False):
"""
Preview task flows with the workflow parameters
Args:
tasks (class, list): task class or list of tasks class
"""
if not isinstance(tasks, (list,)):
tasks = [tasks]
tasks_inst = [self.get_task(x) for x in tasks]
return preview(tasks=tasks_inst, indent=indent, last=last, show_params=show_params, clip_params=clip_params)
[docs] def run(self, tasks=None, forced=None, forced_all=False, forced_all_upstream=False, confirm=False, workers=1,
abort=True, execution_summary=None, **kwargs):
"""
Run tasks with the workflow parameters. See luigi.build for additional details
Args:
tasks (class, list): task class or list of tasks class
forced (list): list of forced tasks
forced_all (bool): force all tasks
forced_all_upstream (bool): force all tasks including upstream
confirm (list): confirm invalidating tasks
workers (int): number of workers
abort (bool): on errors raise exception
execution_summary (bool): print execution summary
kwargs: keywords to pass to luigi.build
"""
if not isinstance(tasks, (list,)):
tasks = [tasks]
tasks_inst = [self.get_task(x) for x in tasks]
# Before Running if Path/Flow Param is set, Set it to all other tasks
path_param = None
flow_param = None
if 'path' in self.params.keys():
path_param = self.params['path']
if self.params['flows']:
flow_param = self.params['flows']
# Attach to tasks
self._attach_to_tasks(tasks, flows=flow_param, path=path_param)
return run(tasks_inst, forced=forced, forced_all=forced_all, forced_all_upstream=forced_all_upstream,
confirm=confirm, workers=workers, abort=abort, execution_summary=execution_summary, **kwargs)
[docs] def outputLoad(self, task=None, keys=None, as_dict=False, cached=False):
"""
Load output from task with the workflow parameters
Args:
task (class): task class
keys (list): list of data to load
as_dict (bool): cache data in memory
cached (bool): cache data in memory
Returns: list or dict of all task output
"""
return self.get_task(task).outputLoad(keys=keys, as_dict=as_dict, cached=cached)
[docs] def outputPath(self, task=None):
"""
Ouputs the Path given a task
Args:
task (class): task class
Returns: list or dict of all task paths
"""
# Get Output
output = self.get_task(task).output()
# If Output is Dict, we have multiple outputs
if type(output) is dict:
# Get Paths
for output_name, output_target in output.items():
output[output_name] = output_target.path
return output
else:
return output.path
[docs] def complete(self, task=None):
return self.get_task(task).complete()
[docs] def output(self, task=None):
return self.get_task(task).output()
[docs] def outputLoadAll(self, task=None, keys=None, as_dict=False, cached=False):
"""
Load all output from task with the workflow parameters
Args:
task (class): task class
keys (list): list of data to load
as_dict (bool): cache data in memory
cached (bool): cache data in memory
Returns: list or dict of all task output
"""
task_inst = self.get_task(task)
data_dict = {}
tasks = taskflow_upstream(task_inst)
for task in tasks:
data_dict[type(task).__name__] = task.outputLoad(keys=keys, as_dict=as_dict, cached=cached)
return data_dict
[docs] def reset(self, task=None, confirm=False):
task_inst = self.get_task(task)
return task_inst.reset(confirm)
[docs] def reset_downstream(self, task, task_downstream=None, confirm=False):
"""
Invalidate all downstream tasks in a flow.
For example, you have 3 dependant tasks. Normally you run Task3 but you've changed parameters for Task1. By invalidating Task3 it will check the full DAG and realize Task1 needs to be invalidated and therefore Task2 and Task3 also.
Args:
task (obj): task to invalidate. This should be an downstream task for which you want to check downstream dependencies for invalidation conditions
task_downstream (obj): downstream task target
confirm (bool): confirm operation
"""
task_inst = self.get_task(task)
task_downstream_inst = self.get_task(task_downstream)
return invalidate_downstream(task_inst, task_downstream_inst, confirm)
[docs] def reset_upstream(self, task, confirm=False):
task_inst = self.get_task(task)
return invalidate_upstream(task_inst, confirm)
[docs] def set_default(self, task):
"""
Set default task for the workflow object
Args:
task(obj) The task to be set as a default task
"""
self.default_task = task
[docs] def get_task(self, task=None):
"""
Get task with the workflow parameters
Args:
task(class)
Retuns: An instance of task class with the workflow parameters
"""
if task is None:
if self.default_task is None:
raise RuntimeError('no default tasks set')
else:
task = self.default_task
return task(**self.params)
# Add a Flow to the Params of the Workflow
[docs] def attach_flow(self, flow=None, flow_name="flow"):
if self.params['flows']:
self.params['flows'][flow_name] = flow
else:
self.params['flows'] = {flow_name: flow}
# Attach Flow/Path to the Tasks
def _attach_to_tasks(self, tasks, flows=None, path=None):
# If Both not set
if not flows and not path:
return
# Get all paths
for t_task in tasks:
task_inst = self.get_task(t_task)
tasks = taskflow_upstream(task_inst)
# Overide param of all tasks
for temp_task in tasks:
if flows:
temp_task.flows = self.params['flows']
if path:
temp_task.path = self.params['path']
[docs]class WorkflowMulti(object):
"""
A multi experiment workflow can be defined with multiple flows and separate parameters for each flow and a default task. It is mandatory to define the flows and parameters for each of the flows.
"""
def __init__(self, task=None, params=None, path=None, env=None):
# Task List Check
if isinstance(task, (list,)):
raise Exception('Task lists should be passed to the flow.run function.')
self.params = params
if params is not None and type(params) not in [dict, list]:
raise Exception("Params has to be a dictionary with key defining the flow name or a list")
if type(params) == dict:
if type(list(params.values())[0]) == list:
self.params = d6tflow.utils.generate_exps_for_multi_param(params)
if type(params) == list:
params = {i: v for i, v in enumerate(params)}
self.params = params
if params is None or len(params.keys()) == 0:
raise Exception("Need to pass task parameters or use d6tflow.Workflow")
self.default_task = task
if params is not None:
self.workflow_objs = {k: Workflow(task=task, params=v, path=path, env=env) for k, v in self.params.items()}
[docs] def run(self, tasks=None, flow=None, forced=None, forced_all=False, forced_all_upstream=False, confirm=False,
workers=1, abort=True, execution_summary=None, **kwargs):
"""
Run tasks with the workflow parameters for a flow. See luigi.build for additional details
Args:
flow (string): The name of the experiment for which the flow is to be run. If nothing is passed, all the flows are run
tasks (class, list): task class or list of tasks class
forced (list): list of forced tasks
forced_all (bool): force all tasks
forced_all_upstream (bool): force all tasks including upstream
confirm (list): confirm invalidating tasks
workers (int): number of workers
abort (bool): on errors raise exception
execution_summary (bool): print execution summary
kwargs: keywords to pass to luigi.build
"""
if flow is not None:
return self.workflow_objs[flow].run(tasks=tasks, forced=forced, forced_all=forced_all,
forced_all_upstream=forced_all_upstream, confirm=confirm,
workers=workers,
abort=abort,
execution_summary=execution_summary, **kwargs)
result = {}
for exp_name in self.params.keys():
result[exp_name] = self.workflow_objs[exp_name].run(tasks, forced, forced_all, forced_all_upstream,
confirm, workers, abort,
execution_summary, **kwargs)
return result
[docs] def outputLoad(self, task=None, flow=None, keys=None, as_dict=False, cached=False):
"""
Load output from task with the workflow parameters for a flow
Args:
flow (string): The name of the experiment for which the flow is to be run. If nothing is passed, all the flows are run
task (class): task class
keys (list): list of data to load
as_dict (bool): cache data in memory
cached (bool): cache data in memory
Returns: list or dict of all task output
"""
if flow is not None:
return self.workflow_objs[flow].outputLoad(task, keys, as_dict, cached)
data = {}
for exp_name in self.params.keys():
data[exp_name] = self.workflow_objs[exp_name].outputLoad(task, keys, as_dict, cached)
return data
[docs] def outputPath(self, task=None, flow=None):
"""
Ouputs the Path given a task
Args:
task (class): task class
flow (string): The name of the experiment for which the flow is to be run. If nothing is passed, all the flows are run
Returns: list or dict of all task paths
"""
if flow is not None:
return self.workflow_objs[flow].outputPath(task)
data = {}
for exp_name in self.params.keys():
data[exp_name] = self.workflow_objs[exp_name].outputPath(task)
return data
[docs] def outputLoadAll(self, task=None, flow=None, keys=None, as_dict=False, cached=False):
"""
Load all output from task with the workflow parameters for a flow
Args:
flow (string): The name of the experiment for which the flow is to be run. If nothing is passed, all the flows are run
task (class): task class
keys (list): list of data to load
as_dict (bool): cache data in memory
cached (bool): cache data in memory
Returns: list or dict of all task output
"""
if flow is not None:
return self.workflow_objs[flow].outputLoadAll(task, keys, as_dict, cached)
data = {}
for exp_name in self.params.keys():
data[exp_name] = self.workflow_objs[exp_name].outputLoadAll(task, keys, as_dict, cached)
return data
[docs] def reset(self, task=None, flow=None, confirm=False):
if flow is not None:
return self.workflow_objs[flow].reset(task, confirm)
return {self.workflow_objs[exp_name].reset(task, confirm) for exp_name in self.params.keys()}
[docs] def reset_downstream(self, task=None, flow=None, confirm=False):
if flow is not None:
return self.workflow_objs[flow].reset(task, confirm)
return {self.workflow_objs[exp_name].reset_downstream(task, confirm=confirm) for exp_name in self.params.keys()}
[docs] def reset_upstream(self, task=None, flow=None, confirm=False):
if flow is not None:
return self.workflow_objs[flow].reset(task, confirm)
return {self.workflow_objs[exp_name].reset_upstream(task, confirm) for exp_name in self.params.keys()}
[docs] def preview(self, tasks=None, flow=None, indent='', last=True, show_params=True, clip_params=False):
"""
Preview task flows with the workflow parameters for a flow
Args:
flow (string): The name of the experiment for which the flow is to be run. If nothing is passed, all the flows are run
tasks (class, list): task class or list of tasks class
"""
if not isinstance(tasks, (list,)):
tasks = [tasks]
if flow is not None:
return self.workflow_objs[flow].preview(tasks)
data = {}
for exp_name in self.params.keys():
data[exp_name] = self.workflow_objs[exp_name].preview(tasks=tasks, indent=indent, last=last,
show_params=show_params, clip_params=clip_params)
return data
[docs] def set_default(self, task):
"""
Set default task for the workflow. The default task is set for all the experiments
Args:
task(obj) The task to be set as a default task
"""
self.default_task = task
for exp_name in self.params.keys():
self.workflow_objs[exp_name].set_default(task)
[docs] def get_task(self, task=None, flow=None):
"""
Get task with the workflow parameters for a flow
Args:
flow (string): The name of the experiment for which the flow is to be run. If nothing is passed, all the flows are run
task(class): task class
Retuns: An instance of task class with the workflow parameters
"""
if task is None:
if self.default_task is None:
raise RuntimeError('no default tasks set')
else:
task = self.default_task
if flow is None:
return {exp_name: self.workflow_objs[exp_name].get_task(task) for exp_name in self.params.keys()}
return self.workflow_objs[flow].get_task(task)
[docs] def get_flow(self, flow):
"""
Get flow by name
Args:
flow (string): The name of the experiment for which the flow is to be run. If nothing is passed, all the flows are run
Retuns: An instance of Workflow
"""
return self.workflow_objs[flow]
from jinja2 import Template
[docs]class FlowExport(object):
"""
Auto generate task files to quickly share workflows with others using d6tpipe.
Args:
tasks (obj): task or list of tasks to share
flows (obj): flow or list of flows to get tasks from.
save (bool): save to tasks file
path_export (str): filename for tasks to export.
"""
def __init__(self, tasks=None, flows=None, save=False, path_export='tasks_d6tpipe.py'):
tasks = [] if tasks is None else tasks
flows = [] if flows is None else flows
if not isinstance(tasks, (list,)):
tasks = [tasks]
if not isinstance(flows, (list,)):
flows = [flows]
for flow in flows:
task_inst = flow.get_task()
t_tasks = taskflow_upstream(task_inst)
for task in t_tasks:
tasks.append(task)
self.tasks = tasks
self.save = save
self.path_export = path_export
# file templates
self.tmpl_tasks = '''
import d6tflow
import luigi
import datetime
{% for task in tasks -%}
class {{task.name}}({{task.class}}):
external=True
persist={{task.obj.persist}}
{% if task.path -%}
path="{{task.path}}"
{% endif -%}
{% if task.obj.pipename -%}
pipename={{task.obj.pipename}}
{% endif -%}
{% if task.obj.task_group -%}
task_group="{{task.obj.task_group}}"
{% endif -%}
{% for param in task.params -%}
{{param.name}}={{param.class}}(default={{param.value}})
{% endfor %}
{% endfor %}
'''
[docs] def generate(self):
"""
Generate output files
"""
tasksPrint = []
for task in self.tasks:
if getattr(task, 'export', True):
class_ = next(c for c in type(task).__mro__ if 'd6tflow.tasks.' in str(c)) # type(task).__mro__[1]
# Get Path
task_path = getattr(task, 'path', None)
task_path = Path(task_path) if task_path else None
# Create Dict
taskPrint = {'name': task.__class__.__name__, 'class': class_.__module__ + "." + class_.__name__,
'obj': task, 'persist': task.persist, 'path': task_path,
'params': [{'name': param[0],
'class': f'{param[1].__class__.__module__}.{param[1].__class__.__name__}',
'value': repr(getattr(task,param[0]))} for param in task.get_params()]} # param[1]._default
tasksPrint.append(taskPrint)
tasksPrint[-1], tasksPrint[0:-1] = tasksPrint[0], tasksPrint[1:]
# Print or Save to File
if not self.save:
print(Template(self.tmpl_tasks).render(tasks=tasksPrint))
else:
# Write Tasks
with open(self.path_export, 'w') as fh:
fh.write(Template(self.tmpl_tasks).render(tasks=tasksPrint))