Source code for d6tflow

import warnings
import luigi
from luigi.task import flatten
import luigi.tools.deps
from luigi.util import inherits as luigi_inherits, requires as luigi_requires
from luigi.parameter import (
    Parameter,
    DateParameter, MonthParameter, YearParameter, DateHourParameter, DateMinuteParameter, DateSecondParameter,
    DateIntervalParameter, TimeDeltaParameter,
    IntParameter, FloatParameter, BoolParameter,
    TaskParameter, EnumParameter, DictParameter, ListParameter, TupleParameter,
    NumericalParameter, ChoiceParameter, OptionalParameter
)

from pathlib import Path

import d6tcollect
d6tcollect.submit = True

import d6tflow.targets, d6tflow.tasks, d6tflow.settings
import d6tflow.utils
from d6tflow.cache import data as data
import d6tflow.cache

print('Welcome to d6tflow! For Q&A see https://github.com/d6t/d6tflow')

def set_dir(dir=None):
    """
    Initialize d6tflow

    Args:
        dir (str): data output directory
    """
    if d6tflow.settings.isinitpipe:
        raise RuntimeError('Using d6tpipe, set dir via d6tflow.pipes.set_default()')
    if dir is None:
        dirpath = d6tflow.settings.dirpath
        dirpath.mkdir(exist_ok=True)
    else:
        dirpath = Path(dir)
    d6tflow.settings.dir = dir
    d6tflow.settings.dirpath = dirpath

    d6tflow.settings.isinit = True
    return dirpath

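# Example usage (illustrative sketch): point d6tflow at a custom data directory before
# running tasks. The directory name is hypothetical.
#
#   import d6tflow
#   d6tflow.set_dir('data/')  # task outputs will be saved under data/
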
@d6tcollect.collect
def preview(tasks, indent='', last=True, show_params=True, clip_params=False):
    """
    Preview task flows

    Args:
        tasks (obj, list): task or list of tasks
    """
    print('\n ===== Luigi Execution Preview ===== \n')
    if not isinstance(tasks, (list,)):
        tasks = [tasks]
    for t in tasks:
        print(d6tflow.utils.print_tree(t, indent=indent, last=last, show_params=show_params, clip_params=clip_params))
    print('\n ===== Luigi Execution Preview ===== \n')


@d6tcollect.collect
def run(tasks, forced=None, forced_all=False, forced_all_upstream=False, confirm=False, workers=1, abort=True,
        execution_summary=None, **kwargs):
    """
    Run tasks locally. See luigi.build for additional details

    Args:
        tasks (obj, list): task or list of tasks
        forced (list): list of forced tasks
        forced_all (bool): force all tasks
        forced_all_upstream (bool): force all tasks including upstream
        confirm (bool): confirm invalidating tasks
        workers (int): number of workers
        abort (bool): on errors raise exception
        execution_summary (bool): print execution summary
        kwargs: keywords to pass to luigi.build
    """
    if not isinstance(tasks, (list,)):
        tasks = [tasks]

    # If forced_all_upstream is True, the tasks get force-run by the invalidation below,
    # so skip marking them as forced here.
    if forced_all and not forced_all_upstream:
        forced = tasks
    if forced_all_upstream:
        for t in tasks:
            invalidate_upstream(t, confirm=confirm)
    if forced is not None:
        if not isinstance(forced, (list,)):
            forced = [forced]
        invalidate = []
        for tf in forced:
            for tup in tasks:
                invalidate.append(d6tflow.taskflow_downstream(tf, tup))
        invalidate = set().union(*invalidate)
        invalidate = {t for t in invalidate if t.complete()}
        if len(invalidate) > 0:
            if confirm:
                print('Forced tasks', invalidate)
                c = input('Confirm invalidating forced tasks (y/n)')
            else:
                c = 'y'
            if c == 'y':
                [t.invalidate(confirm=False) for t in invalidate]
            else:
                return None

    execution_summary = execution_summary if execution_summary is not None else d6tflow.settings.execution_summary
    opts = {**{'workers': workers, 'local_scheduler': True, 'log_level': d6tflow.settings.log_level}, **kwargs}
    if execution_summary and luigi.__version__ >= '3.0.0':
        opts['detailed_summary'] = True
    result = luigi.build(tasks, **opts)

    if isinstance(result, bool):
        success = result
    else:
        success = result.scheduling_succeeded
    if abort and not success:
        raise RuntimeError('Exception found running flow, check trace. For more details see '
                           'https://d6tflow.readthedocs.io/en/latest/run.html#debugging-failures')

    if execution_summary:
        if not isinstance(result, bool):
            print(result.summary_text)
    return result

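# Example usage (illustrative sketch): preview and run a flow. `TaskTrain` is a
# hypothetical user-defined task, not part of this module.
#
#   class TaskTrain(d6tflow.tasks.TaskPqPandas):
#       do_preprocess = d6tflow.BoolParameter(default=True)
#
#       def run(self):
#           ...  # compute output, then self.save(...)
#
#   d6tflow.preview(TaskTrain())               # show DAG and completion status
#   d6tflow.run(TaskTrain())                   # run the task and its dependencies
#   d6tflow.run(TaskTrain(), forced_all=True)  # force a rerun of all tasks
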
def taskflow_upstream(task, only_complete=False):
    """
    Get all upstream inputs for a task

    Args:
        task (obj): task
    """
    tasks = d6tflow.utils.traverse(task)
    if only_complete:
        tasks = [t for t in tasks if t.complete()]
    return tasks

def taskflow_downstream(task, task_downstream, only_complete=False):
    """
    Get all downstream outputs for a task

    Args:
        task (obj): task
        task_downstream (obj): downstream target task
    """
    tasks = luigi.tools.deps.find_deps(task_downstream, task.task_family)
    if only_complete:
        tasks = {t for t in tasks if t.complete()}
    return tasks

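# Example usage (illustrative sketch, assuming hypothetical tasks Task1 -> Task2 -> Task3):
#
#   d6tflow.taskflow_upstream(Task3())             # all tasks Task3 depends on, incl. Task3
#   d6tflow.taskflow_downstream(Task1(), Task3())  # tasks between Task1 and Task3
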
def invalidate_all(confirm=False):
    """
    Invalidate all tasks by deleting all files in data directory

    Args:
        confirm (bool): confirm operation
    """
    # record all tasks that run and their output vs files present
    raise NotImplementedError()

def invalidate_orphans(confirm=False):
    """
    Invalidate all unused task outputs

    Args:
        confirm (bool): confirm operation
    """
    # record all tasks that run and their output vs files present
    raise NotImplementedError()

def show(task):
    """
    Show task execution status

    Args:
        task (obj, list): task or list of tasks
    """
    preview(task)

def invalidate_upstream(task, confirm=False):
    """
    Invalidate all upstream tasks in a flow.

    For example, you have 3 dependent tasks. Normally you run Task3, but you have changed
    parameters for Task1. Invalidating Task3 checks the full DAG, finds that Task1 needs
    to be invalidated, and therefore invalidates Task2 and Task3 as well.

    Args:
        task (obj): task to invalidate. This should be an upstream task for which you want to check upstream dependencies for invalidation conditions
        confirm (bool): confirm operation
    """
    tasks = taskflow_upstream(task, only_complete=False)
    if len(tasks) == 0:
        print('no tasks to invalidate')
        return True
    if confirm:
        print('Completed tasks to invalidate:')
        for t in tasks:
            print(t)
        c = input('Confirm invalidating tasks (y/n)')
    else:
        c = 'y'
    if c == 'y':
        [t.invalidate(confirm=False) for t in tasks]

def invalidate_downstream(task, task_downstream, confirm=False):
    """
    Invalidate all downstream tasks in a flow.

    For example, you have 3 dependent tasks. Normally you run Task3, but you have changed
    parameters for Task1. Invalidating Task3 checks the full DAG, finds that Task1 needs
    to be invalidated, and therefore invalidates Task2 and Task3 as well.

    Args:
        task (obj): task to invalidate. This should be a downstream task for which you want to check downstream dependencies for invalidation conditions
        task_downstream (obj): downstream task target
        confirm (bool): confirm operation
    """
    tasks = taskflow_downstream(task, task_downstream, only_complete=True)
    if len(tasks) == 0:
        print('no tasks to invalidate')
        return True
    if confirm:
        print('Completed tasks to invalidate:')
        for t in tasks:
            print(t)
        c = input('Confirm invalidating tasks (y/n)')
    else:
        c = 'y'
    if c == 'y':
        [t.invalidate(confirm=False) for t in tasks]
        return True
    else:
        return False

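# Example usage (illustrative sketch, again assuming Task1 -> Task2 -> Task3): after
# changing Task1, invalidate everything that depends on it so it gets recomputed.
#
#   d6tflow.invalidate_upstream(Task3(), confirm=True)             # reset the full DAG below Task3
#   d6tflow.invalidate_downstream(Task1(), Task3(), confirm=True)  # reset Task1 and its dependents up to Task3
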
def clone_parent(cls):
    warnings.warn("This is replaced with `@d6tflow.requires()`", DeprecationWarning, stacklevel=2)

    def requires(self):
        return self.clone_parent()

    setattr(cls, 'requires', requires)
    return cls

# Like luigi.util.inherits but for handling dictionaries
class dict_inherits:
    def __init__(self, *tasks_to_inherit):
        super(dict_inherits, self).__init__()
        if not tasks_to_inherit:
            raise TypeError("tasks_to_inherit cannot be empty")
        # The first argument is the dictionary of tasks to inherit from
        self.tasks_to_inherit = tasks_to_inherit[0]

    def __call__(self, task_that_inherits):
        for task_to_inherit in self.tasks_to_inherit:
            for param_name, param_obj in self.tasks_to_inherit[task_to_inherit].get_params():
                # If the inheriting task does not have the parameter yet, add it
                if not hasattr(task_that_inherits, param_name):
                    setattr(task_that_inherits, param_name, param_obj)

        # Add dictionary functionality
        def clone_parents_dict(_self, **kwargs):
            return {
                task_to_inherit: _self.clone(cls=self.tasks_to_inherit[task_to_inherit], **kwargs)
                for task_to_inherit in self.tasks_to_inherit
            }

        task_that_inherits.clone_parents_dict = clone_parents_dict
        return task_that_inherits

# Like luigi.util.requires but for handling dictionaries
class dict_requires:
    def __init__(self, *tasks_to_require):
        super(dict_requires, self).__init__()
        if not tasks_to_require:
            raise TypeError("tasks_to_require cannot be empty")
        self.tasks_to_require = tasks_to_require[0]  # assign the dictionary

    def __call__(self, task_that_requires):
        task_that_requires = dict_inherits(self.tasks_to_require)(task_that_requires)

        def requires(_self):
            return _self.clone_parents_dict()

        task_that_requires.requires = requires
        return task_that_requires

def inherits(*tasks_to_inherit):
    # If the first argument is a dictionary, use the custom inherits decorator
    if isinstance(tasks_to_inherit[0], dict):
        return dict_inherits(*tasks_to_inherit)
    return luigi_inherits(*tasks_to_inherit)

def requires(*tasks_to_require):
    # If the first argument is a dictionary, use the custom requires decorator
    if isinstance(tasks_to_require[0], dict):
        return dict_requires(*tasks_to_require)
    return luigi_requires(*tasks_to_require)

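# Example usage (illustrative sketch): single-task and dict forms of the decorator.
# `TaskA`/`TaskB`/`TaskC` are hypothetical user tasks; the dict form is handled by
# dict_requires above.
#
#   @d6tflow.requires(TaskA)  # same as luigi.util.requires
#   class TaskB(d6tflow.tasks.TaskPqPandas):
#       def run(self):
#           df = self.inputLoad()
#           ...
#
#   @d6tflow.requires({'a': TaskA, 'b': TaskB})
#   class TaskC(d6tflow.tasks.TaskPqPandas):
#       def run(self):
#           df_a = self.input()['a'].load()  # inputs keyed by the dict names
#           ...
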
class Workflow(object):
    """
    The class is used to orchestrate tasks and define a task pipeline
    """

    def __init__(self, task=None, params=None, path=None, env=None):
        # Task list check
        if isinstance(task, (list,)):
            raise Exception('Task lists should be passed to the flow.run function.')

        # Set env
        if path is not None and env is not None:
            path = str(path) + f"/env={env}"
        elif env is not None:
            # Will override other tasks with this task's main path
            path = getattr(task, 'path', d6tflow.settings.dirpath)
            path = str(path) + f"/env={env}"

        # Set params
        self.params = {} if params is None else params
        self.params = self.params if path is None else dict(**self.params, **{'path': path})
        # Add flows to params
        self.params = dict(**self.params, **{'flows': {}})

        # Default task
        self.default_task = task

        # If task is set, try to send the flow path to all other tasks
        if task:
            if not isinstance(task, (list,)):
                task = [task]
            self._attach_to_tasks(task, path=path)

    def preview(self, tasks=None, indent='', last=True, show_params=True, clip_params=False):
        """
        Preview task flows with the workflow parameters

        Args:
            tasks (class, list): task class or list of task classes
        """
        if not isinstance(tasks, (list,)):
            tasks = [tasks]
        tasks_inst = [self.get_task(x) for x in tasks]
        return preview(tasks=tasks_inst, indent=indent, last=last, show_params=show_params, clip_params=clip_params)

    def run(self, tasks=None, forced=None, forced_all=False, forced_all_upstream=False, confirm=False, workers=1,
            abort=True, execution_summary=None, **kwargs):
        """
        Run tasks with the workflow parameters. See luigi.build for additional details

        Args:
            tasks (class, list): task class or list of task classes
            forced (list): list of forced tasks
            forced_all (bool): force all tasks
            forced_all_upstream (bool): force all tasks including upstream
            confirm (bool): confirm invalidating tasks
            workers (int): number of workers
            abort (bool): on errors raise exception
            execution_summary (bool): print execution summary
            kwargs: keywords to pass to luigi.build
        """
        if not isinstance(tasks, (list,)):
            tasks = [tasks]
        tasks_inst = [self.get_task(x) for x in tasks]

        # Before running, if a path/flow param is set, propagate it to all other tasks
        path_param = self.params['path'] if 'path' in self.params.keys() else None
        flow_param = self.params['flows'] if self.params['flows'] else None
        self._attach_to_tasks(tasks, flows=flow_param, path=path_param)

        return run(tasks_inst, forced=forced, forced_all=forced_all, forced_all_upstream=forced_all_upstream,
                   confirm=confirm, workers=workers, abort=abort, execution_summary=execution_summary, **kwargs)

    def outputLoad(self, task=None, keys=None, as_dict=False, cached=False):
        """
        Load output from task with the workflow parameters

        Args:
            task (class): task class
            keys (list): list of data to load
            as_dict (bool): return data as dict
            cached (bool): cache data in memory

        Returns:
            list or dict of all task output
        """
        return self.get_task(task).outputLoad(keys=keys, as_dict=as_dict, cached=cached)

    def outputPath(self, task=None):
        """
        Output the path for a given task

        Args:
            task (class): task class

        Returns:
            path or dict of all task paths
        """
        output = self.get_task(task).output()
        # If output is a dict, there are multiple outputs
        if type(output) is dict:
            for output_name, output_target in output.items():
                output[output_name] = output_target.path
            return output
        else:
            return output.path

    def complete(self, task=None):
        return self.get_task(task).complete()

    def output(self, task=None):
        return self.get_task(task).output()

    def outputLoadMeta(self, task=None):
        return self.get_task(task).outputLoadMeta()

    def outputLoadAll(self, task=None, keys=None, as_dict=False, cached=False):
        """
        Load all output from task with the workflow parameters

        Args:
            task (class): task class
            keys (list): list of data to load
            as_dict (bool): return data as dict
            cached (bool): cache data in memory

        Returns:
            dict of all task output
        """
        task_inst = self.get_task(task)
        data_dict = {}
        # Load the output of every upstream task, keyed by task class name
        for upstream_task in taskflow_upstream(task_inst):
            data_dict[type(upstream_task).__name__] = upstream_task.outputLoad(keys=keys, as_dict=as_dict,
                                                                               cached=cached)
        return data_dict

    def reset(self, task=None, confirm=False):
        task_inst = self.get_task(task)
        return task_inst.reset(confirm)

    def reset_downstream(self, task, task_downstream=None, confirm=False):
        """
        Invalidate all downstream tasks in a flow.

        For example, you have 3 dependent tasks. Normally you run Task3, but you have
        changed parameters for Task1. Invalidating Task3 checks the full DAG, finds that
        Task1 needs to be invalidated, and therefore invalidates Task2 and Task3 as well.

        Args:
            task (obj): task to invalidate. This should be a downstream task for which you want to check downstream dependencies for invalidation conditions
            task_downstream (obj): downstream task target
            confirm (bool): confirm operation
        """
        task_inst = self.get_task(task)
        task_downstream_inst = self.get_task(task_downstream)
        return invalidate_downstream(task_inst, task_downstream_inst, confirm)

    def reset_upstream(self, task, confirm=False):
        task_inst = self.get_task(task)
        return invalidate_upstream(task_inst, confirm)

    def set_default(self, task):
        """
        Set default task for the workflow object

        Args:
            task (obj): the task to be set as default task
        """
        self.default_task = task

    def get_task(self, task=None):
        """
        Get task with the workflow parameters

        Args:
            task (class): task class

        Returns:
            An instance of the task class with the workflow parameters
        """
        if task is None:
            if self.default_task is None:
                raise RuntimeError('no default tasks set')
            else:
                task = self.default_task
        return task(**self.params)

    # Add a flow to the params of the workflow
    def attach_flow(self, flow=None, flow_name="flow"):
        if self.params['flows']:
            self.params['flows'][flow_name] = flow
        else:
            self.params['flows'] = {flow_name: flow}

    # Attach flow/path to the tasks
    def _attach_to_tasks(self, tasks, flows=None, path=None):
        # Nothing to attach
        if not flows and not path:
            return
        for t_task in tasks:
            task_inst = self.get_task(t_task)
            upstream_tasks = taskflow_upstream(task_inst)
            # Override the param on all upstream tasks
            for temp_task in upstream_tasks:
                if flows:
                    temp_task.flows = self.params['flows']
                if path:
                    temp_task.path = self.params['path']

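# Example usage (illustrative sketch; `TaskTrain` and its parameter are hypothetical):
#
#   flow = d6tflow.Workflow(TaskTrain, params={'do_preprocess': False})
#   flow.preview()               # preview the DAG with the workflow parameters
#   flow.run()                   # run the default task and its dependencies
#   df = flow.outputLoad()       # load output of the default task
#   data = flow.outputLoadAll()  # load output of all upstream tasks
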
class WorkflowMulti(object):
    """
    A multi-experiment workflow defines multiple flows with separate parameters for each
    flow plus a default task. It is mandatory to define the flows and parameters for each
    of the flows.
    """

    def __init__(self, task=None, params=None, path=None, env=None):
        # Task list check
        if isinstance(task, (list,)):
            raise Exception('Task lists should be passed to the flow.run function.')

        self.params = params
        if params is not None and type(params) not in [dict, list]:
            raise Exception("Params has to be a dictionary with key defining the flow name or a list")
        if type(params) == dict:
            # A dict of lists defines a parameter grid; expand it into one experiment per combination
            if type(list(params.values())[0]) == list:
                self.params = d6tflow.utils.generate_exps_for_multi_param(params)
        if type(params) == list:
            params = {i: v for i, v in enumerate(params)}
            self.params = params
        if params is None or len(params.keys()) == 0:
            raise Exception("Need to pass task parameters or use d6tflow.Workflow")
        self.default_task = task
        if params is not None:
            self.workflow_objs = {k: Workflow(task=task, params=v, path=path, env=env)
                                  for k, v in self.params.items()}

    def run(self, tasks=None, flow=None, forced=None, forced_all=False, forced_all_upstream=False, confirm=False,
            workers=1, abort=True, execution_summary=None, **kwargs):
        """
        Run tasks with the workflow parameters for a flow. See luigi.build for additional details

        Args:
            flow (string): name of the flow/experiment to run. If nothing is passed, all flows are run
            tasks (class, list): task class or list of task classes
            forced (list): list of forced tasks
            forced_all (bool): force all tasks
            forced_all_upstream (bool): force all tasks including upstream
            confirm (bool): confirm invalidating tasks
            workers (int): number of workers
            abort (bool): on errors raise exception
            execution_summary (bool): print execution summary
            kwargs: keywords to pass to luigi.build
        """
        if flow is not None:
            return self.workflow_objs[flow].run(tasks=tasks, forced=forced, forced_all=forced_all,
                                                forced_all_upstream=forced_all_upstream, confirm=confirm,
                                                workers=workers, abort=abort, execution_summary=execution_summary,
                                                **kwargs)

        result = {}
        for exp_name in self.params.keys():
            result[exp_name] = self.workflow_objs[exp_name].run(tasks=tasks, forced=forced, forced_all=forced_all,
                                                                forced_all_upstream=forced_all_upstream,
                                                                confirm=confirm, workers=workers, abort=abort,
                                                                execution_summary=execution_summary, **kwargs)
        return result

    def outputLoad(self, task=None, flow=None, keys=None, as_dict=False, cached=False):
        """
        Load output from task with the workflow parameters for a flow

        Args:
            flow (string): name of the flow/experiment to load from. If nothing is passed, all flows are used
            task (class): task class
            keys (list): list of data to load
            as_dict (bool): return data as dict
            cached (bool): cache data in memory

        Returns:
            list or dict of all task output
        """
        if flow is not None:
            return self.workflow_objs[flow].outputLoad(task, keys, as_dict, cached)

        data = {}
        for exp_name in self.params.keys():
            data[exp_name] = self.workflow_objs[exp_name].outputLoad(task, keys, as_dict, cached)
        return data

    def outputPath(self, task=None, flow=None):
        """
        Output the path for a given task

        Args:
            task (class): task class
            flow (string): name of the flow/experiment. If nothing is passed, all flows are used

        Returns:
            path or dict of all task paths
        """
        if flow is not None:
            return self.workflow_objs[flow].outputPath(task)

        data = {}
        for exp_name in self.params.keys():
            data[exp_name] = self.workflow_objs[exp_name].outputPath(task)
        return data

    def outputLoadMeta(self, task=None, flow=None):
        if flow is not None:
            return self.workflow_objs[flow].outputLoadMeta(task)

        data = {}
        for exp_name in self.params.keys():
            data[exp_name] = self.workflow_objs[exp_name].outputLoadMeta(task)
        return data

    def outputLoadAll(self, task=None, flow=None, keys=None, as_dict=False, cached=False):
        """
        Load all output from task with the workflow parameters for a flow

        Args:
            flow (string): name of the flow/experiment to load from. If nothing is passed, all flows are used
            task (class): task class
            keys (list): list of data to load
            as_dict (bool): return data as dict
            cached (bool): cache data in memory

        Returns:
            dict of all task output
        """
        if flow is not None:
            return self.workflow_objs[flow].outputLoadAll(task, keys, as_dict, cached)

        data = {}
        for exp_name in self.params.keys():
            data[exp_name] = self.workflow_objs[exp_name].outputLoadAll(task, keys, as_dict, cached)
        return data

    def reset(self, task=None, flow=None, confirm=False):
        if flow is not None:
            return self.workflow_objs[flow].reset(task, confirm)
        return {exp_name: self.workflow_objs[exp_name].reset(task, confirm) for exp_name in self.params.keys()}

    def reset_downstream(self, task=None, flow=None, confirm=False):
        if flow is not None:
            return self.workflow_objs[flow].reset_downstream(task, confirm=confirm)
        return {exp_name: self.workflow_objs[exp_name].reset_downstream(task, confirm=confirm)
                for exp_name in self.params.keys()}

    def reset_upstream(self, task=None, flow=None, confirm=False):
        if flow is not None:
            return self.workflow_objs[flow].reset_upstream(task, confirm)
        return {exp_name: self.workflow_objs[exp_name].reset_upstream(task, confirm)
                for exp_name in self.params.keys()}

    def preview(self, tasks=None, flow=None, indent='', last=True, show_params=True, clip_params=False):
        """
        Preview task flows with the workflow parameters for a flow

        Args:
            flow (string): name of the flow/experiment to preview. If nothing is passed, all flows are previewed
            tasks (class, list): task class or list of task classes
        """
        if not isinstance(tasks, (list,)):
            tasks = [tasks]
        if flow is not None:
            return self.workflow_objs[flow].preview(tasks=tasks, indent=indent, last=last, show_params=show_params,
                                                    clip_params=clip_params)

        data = {}
        for exp_name in self.params.keys():
            data[exp_name] = self.workflow_objs[exp_name].preview(tasks=tasks, indent=indent, last=last,
                                                                  show_params=show_params, clip_params=clip_params)
        return data

    def set_default(self, task):
        """
        Set default task for the workflow. The default task is set for all the experiments

        Args:
            task (obj): the task to be set as default task
        """
        self.default_task = task
        for exp_name in self.params.keys():
            self.workflow_objs[exp_name].set_default(task)

    def get_task(self, task=None, flow=None):
        """
        Get task with the workflow parameters for a flow

        Args:
            flow (string): name of the flow/experiment. If nothing is passed, tasks for all flows are returned
            task (class): task class

        Returns:
            An instance of the task class with the workflow parameters
        """
        if task is None:
            if self.default_task is None:
                raise RuntimeError('no default tasks set')
            else:
                task = self.default_task
        if flow is None:
            return {exp_name: self.workflow_objs[exp_name].get_task(task) for exp_name in self.params.keys()}
        return self.workflow_objs[flow].get_task(task)

    def get_flow(self, flow):
        """
        Get flow by name

        Args:
            flow (string): name of the flow/experiment

        Returns:
            An instance of Workflow
        """
        return self.workflow_objs[flow]

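# Example usage (illustrative sketch): run the same DAG as two named experiments.
#
#   flow2 = d6tflow.WorkflowMulti(TaskTrain, params={'exp1': {'do_preprocess': False},
#                                                    'exp2': {'do_preprocess': True}})
#   flow2.run()                          # runs all experiments
#   df = flow2.outputLoad(flow='exp1')   # load output for one experiment
#   data = flow2.outputLoad()            # dict of outputs keyed by experiment name
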
from jinja2 import Template
class FlowExport(object):
    """
    Auto generate task files to quickly share workflows with others using d6tpipe.

    Args:
        tasks (obj): task or list of tasks to share
        flows (obj): flow or list of flows to get tasks from
        save (bool): save to tasks file
        path_export (str): filename for tasks to export
    """

    def __init__(self, tasks=None, flows=None, save=False, path_export='tasks_d6tpipe.py'):
        tasks = [] if tasks is None else tasks
        flows = [] if flows is None else flows
        if not isinstance(tasks, (list,)):
            tasks = [tasks]
        if not isinstance(flows, (list,)):
            flows = [flows]

        # Collect all upstream tasks from the given flows
        for flow in flows:
            task_inst = flow.get_task()
            t_tasks = taskflow_upstream(task_inst)
            for task in t_tasks:
                tasks.append(task)

        self.tasks = tasks
        self.save = save
        self.path_export = path_export

        # file templates
        self.tmpl_tasks = '''
import d6tflow
import luigi
import datetime

{% for task in tasks -%}
class {{task.name}}({{task.class}}):
    external=True
    persist={{task.obj.persist}}
    {% if task.path -%}
    path="{{task.path}}"
    {% endif -%}
    {% if task.obj.pipename -%}
    pipename={{task.obj.pipename}}
    {% endif -%}
    {% if task.obj.task_group -%}
    task_group="{{task.obj.task_group}}"
    {% endif -%}
    {% for param in task.params -%}
    {{param.name}}={{param.class}}(default={{param.value}})
    {% endfor %}
{% endfor %}
'''

    def generate(self):
        """
        Generate output files
        """
        tasksPrint = []
        for task in self.tasks:
            if getattr(task, 'export', True):
                class_ = next(c for c in type(task).__mro__ if 'd6tflow.tasks.' in str(c))  # type(task).__mro__[1]

                # Get path
                task_path = getattr(task, 'path', None)
                task_path = Path(task_path) if task_path else None

                # Create dict
                taskPrint = {
                    'name': task.__class__.__name__,
                    'class': class_.__module__ + "." + class_.__name__,
                    'obj': task,
                    'persist': task.persist,
                    'path': task_path,
                    'params': [{'name': param[0],
                                'class': f'{param[1].__class__.__module__}.{param[1].__class__.__name__}',
                                'value': repr(getattr(task, param[0]))}  # param[1]._default
                               for param in task.get_params()]
                }
                tasksPrint.append(taskPrint)

        # Move the first task to the end of the list
        tasksPrint[-1], tasksPrint[0:-1] = tasksPrint[0], tasksPrint[1:]

        # Print or save to file
        if not self.save:
            print(Template(self.tmpl_tasks).render(tasks=tasksPrint))
        else:
            # Write tasks
            with open(self.path_export, 'w') as fh:
                fh.write(Template(self.tmpl_tasks).render(tasks=tasksPrint))

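# Example usage (illustrative sketch): export the tasks of a workflow to a file that
# collaborators can import, e.g. together with d6tpipe.
#
#   e = d6tflow.FlowExport(flows=flow, save=True, path_export='tasks_d6tpipe.py')
#   e.generate()  # writes external task definitions to tasks_d6tpipe.py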