Source code for d6tflow.functional

import d6tflow.tasks as tasks
import d6tflow
from functools import wraps
import pathlib
import d6tcollect


class Workflow:
    """
    Functional Flow class that acts as a manager of all flow steps.
    Defines all the decorators that can be used on flow functions.

    State kept on the instance:

    * ``steps``: maps a step name (the decorated function's ``__name__``)
      to the task class generated for it.
    * ``instantiated_tasks``: maps a step name to the task instance
      created by the most recent ``run`` of that step.
    * ``params_used``: maps a step name to the list of parameter dicts it
      has been instantiated with so far.
    * ``multi_params`` / ``multi_params_tasks``: the parameter sets passed
      to the last multi-params ``run`` and the task instantiated for each.
    """

    def __init__(self):
        self.funcs_to_run = []
        self.steps = {}
        self.instantiated_tasks = {}
        self.object_count = 0
        self.multi_params = None
        self.multi_params_tasks = {}
        self.params_used = {}

    def common_decorator(self, func):
        """
        Common decorator that all decorators use.

        Parameters
        ----------
        func : function
            The function being decorated, or the wrapper produced by a
            decorator lower in the stack.

        Returns
        -------
        function
            A wrapper carrying the metadata of ``func``.
        """
        # A function without ``__wrapped__`` has not passed through any of
        # our decorators yet, so it is the actual step function. Re-register
        # the pending task class under the function's name and install the
        # function as the class's ``run``.
        if '__wrapped__' not in func.__dict__:
            step_cls = self.steps[self.current_step]
            self.steps[func.__name__] = step_cls
            del self.steps[self.current_step]
            step_cls.__name__ = func.__name__
            setattr(step_cls, 'run', func)

        # Thanks to wraps, wrapper has all the metadata of func (and gains
        # ``__wrapped__``, which is what the check above relies on).
        @wraps(func)
        def wrapper(*args, **kwargs):
            return func(*args, **kwargs)

        return wrapper

    @d6tcollect._collectClass
    def task(self, task_type: d6tflow.tasks.TaskData):
        """
        Flow step decorator. Converts the decorated function into a flow step.
        Should be defined at the top of the decorator stack.

        Parameters
        ----------
        task_type : d6tflow.tasks.TaskData
            task_type should be a class which directly inherits from
            d6tflow.tasks.TaskData

        Raises
        ------
        AssertionError
            If the direct base class of ``task_type`` is not
            ``d6tflow.tasks.TaskData``.

        Example
        -------
        @flow.task(d6tflow.tasks.TaskCache)
        """
        # NOTE(review): ``assert`` is stripped under ``python -O``; kept
        # because callers may rely on AssertionError being raised here.
        assert task_type.__base__ == tasks.TaskData, "Invalid parameter. Parameter should be type defined in d6tflow.tasks"

        self.task_type = task_type
        # The class gets a temporary numeric name; common_decorator renames
        # it once the decorated function is known.
        step_name = str(self.object_count)
        self.steps[step_name] = type(step_name, (task_type,), {})
        self.current_step = step_name
        self.object_count += 1
        return self.common_decorator

    @d6tcollect._collectClass
    def requires(self, *args, **kwargs):
        """
        Flow requires decorator. Defines dependencies between flow steps.
        Internally calls d6tflow.requires.

        Parameters
        ----------
        *args : dict or function or multiple functions
            Either a single dict mapping a key to an already-decorated step
            function, or one or more already-decorated step functions.
        **kwargs
            Forwarded unchanged to d6tflow.requires.

        Examples
        --------
        @flow.requires({"foo": func1, "bar": func2})
        @flow.requires(func1)
        """
        if isinstance(args[0], dict):
            named = {
                key: self.steps[func.__name__]
                for key, func in args[0].items()
            }
            tasks_to_require = [named]
        else:
            tasks_to_require = [self.steps[func.__name__] for func in args]

        self.steps[self.current_step] = d6tflow.requires(
            *tasks_to_require, **kwargs)(self.steps[self.current_step])
        return self.common_decorator

    @d6tcollect._collectClass
    def params(self, **params):
        """
        Flow parameters decorator. Use this to add parameter(s) to a
        particular flow task.
        Also see flow.add_global_params() to add parameters globally

        Parameters
        ----------
        **params : keyword arguments of d6tflow parameters

        Example
        -------
        @flow.params(multiplier=d6tflow.IntParameter(default=0))
        """
        step_cls = self.steps[self.current_step]
        for param_name, param in params.items():
            setattr(step_cls, param_name, param)
        return self.common_decorator

    @d6tcollect._collectClass
    def persists(self, to_persist: list):
        """
        Flow persists decorator. Takes in a list of variables that need to
        be persisted for the flow step.

        Parameters
        ----------
        to_persist : list

        Example
        -------
        @flow.persists(['a1', 'a2'])
        """
        self.steps[self.current_step].persist = to_persist
        return self.common_decorator

    def preview(self, func_to_preview, params: dict = None):
        """
        Previews a flow step via d6tflow.preview.

        Parameters
        ----------
        func_to_preview : function
            flow step function
        params : dict, optional
            If given (and non-empty), the step is previewed with exactly
            these parameters. Otherwise it is previewed once for every
            parameter set recorded in ``params_used`` for the step, or with
            no parameters if none were recorded.
        """
        name = func_to_preview.__name__
        if params:
            d6tflow.preview(self.steps[name](**params))
            return

        params_history = self.params_used.get(name, None)
        if params_history:
            for used in params_history:
                d6tflow.preview(self.steps[name](**used))
        else:
            d6tflow.preview(self.steps[name]())

    @d6tcollect._collectClass
    def run(self, funcs_to_run, params: dict = None, multi_params: dict = None, *args, **kwargs):
        """
        Runs flow steps locally. See luigi.build for additional details

        Parameters
        ----------
        funcs_to_run : function or list of functions
        params : dict
            dictionary of parameters. Keys are param names and values are
            the values of params.
        multi_params : dict
            dictionary whose values are parameter dictionaries. When given,
            ``params`` is ignored and every function is run once per entry.
        *args, **kwargs
            Forwarded unchanged to d6tflow.run.

        Examples
        --------
        flow.run(func, params={'multiplier':2})
        flow.run([func1, func2], params={'multiplier':42})
        flow.run(func)
        """
        funcs_to_run = funcs_to_run if isinstance(
            funcs_to_run, list) else [funcs_to_run]

        if multi_params:
            self.multi_params = multi_params
            self.multi_params_tasks = {}
            for params_key in multi_params:
                for func in funcs_to_run:
                    self._instantiate([func], params=multi_params[params_key])
                    # Only one task is remembered per parameter set; with
                    # several functions the last one wins.
                    self.multi_params_tasks[params_key] = self.instantiated_tasks[func.__name__]
                    d6tflow.run(
                        self.multi_params_tasks[params_key], *args, **kwargs)
        else:
            # Reset to single params mode
            self.multi_params = None
            self.multi_params_tasks = {}
            self._instantiate(funcs_to_run, params=params)
            # Runs every task instantiated so far on this workflow, not only
            # the ones requested in this call.
            d6tflow.run(
                list(self.instantiated_tasks.values()), *args, **kwargs)

    def _instantiate(self, funcs_to_run: list, params=None):
        """Instantiate the task of each function with ``params`` and record it."""
        params = params if params else {}
        instantiated_tasks = {
            func_to_run.__name__: self.steps[func_to_run.__name__](**params)
            for func_to_run in funcs_to_run
        }
        self.instantiated_tasks.update(instantiated_tasks)
        self._update_params_used(funcs_to_run, params)

    def _update_params_used(self, funcs, params):
        """Append ``params`` to the parameter history of each function."""
        funcs = funcs if isinstance(funcs, list) else [funcs]
        for func in funcs:
            self.params_used.setdefault(func.__name__, []).append(params)

    def add_global_params(self, **params):
        """
        Adds params to flow functions.
        More like declares the params for further use.
        Only the steps registered at the time of the call are affected.

        Parameters
        ----------
        **params : keyword arguments
            param name mapped to a d6tflow parameter object

        Example
        -------
        flow.add_global_params(multiplier=d6tflow.IntParameter(default=0))
        """
        for step_cls in self.steps.values():
            for param_name, param in params.items():
                setattr(step_cls, param_name, param)

    def outputLoad(self, func_to_run, *args, **kwargs):
        """
        Loads all or several outputs from flow step.

        Args:
            func_to_run: flow step function (not consulted when the last
                run used ``multi_params``)
            *args, **kwargs: forwarded unchanged to the task's
                ``outputLoad`` (e.g. ``keys``, ``as_dict``, ``cached``)

        Returns: list or dict of all task output. After a multi-params run,
            a dict keyed by the ``multi_params`` keys.

        Raises:
            RuntimeError: in single-params mode, if the function has not
                been run yet.
        """
        if self.multi_params:
            output = {}
            for params_key in self.multi_params:
                task = self.multi_params_tasks[params_key]
                # NOTE(review): looks like leftover debug output; retained
                # so that console output stays unchanged.
                print(task)
                output[params_key] = task.outputLoad(*args, **kwargs)
            return output

        name = func_to_run.__name__
        if name in self.instantiated_tasks:
            return self.instantiated_tasks[name].outputLoad(*args, **kwargs)
        raise RuntimeError(
            f"The function {name} has not been run yet! Please run the function using WorkflowObject.run()")

    def outputLoadAll(self, func_to_run, *args, **kwargs):
        """
        Loads all output from flow task and its parents.

        Args:
            func_to_run: flow step function (not consulted when the last
                run used ``multi_params``)
            *args, **kwargs: forwarded unchanged to each task's
                ``outputLoad`` (e.g. ``keys``, ``as_dict``, ``cached``)

        Returns: dict mapping each upstream task's ``task_family`` to its
            output. After a multi-params run, a dict of such dicts keyed by
            the ``multi_params`` keys.

        Raises:
            RuntimeError: in single-params mode, if the function has not
                been run yet.
        """
        if self.multi_params:
            output = {}
            for params_key in self.multi_params:
                root_task = self.multi_params_tasks[params_key]
                # NOTE(review): looks like leftover debug output; retained
                # so that console output stays unchanged.
                print(root_task)
                output[params_key] = {}
                for task in d6tflow.taskflow_upstream(root_task):
                    output[params_key][task.task_family] = task.outputLoad(
                        *args, **kwargs)
            return output

        name = func_to_run.__name__
        if name in self.instantiated_tasks:
            upstream_tasks = d6tflow.taskflow_upstream(
                self.instantiated_tasks[name])
            return {
                task.task_family: task.outputLoad(*args, **kwargs)
                for task in upstream_tasks
            }
        raise RuntimeError(
            f"The function {name} has not been run yet! Please run the function using WorkflowObject.run()")

    def reset(self, func_to_reset, params=None, *args, **kwargs):
        """Resets a particular function. Use with `params` to reset function with the given parameters.
        If `params` is not used, `reset(func)` will reset the function with all the parameters run thus far

        Args:
            func_to_reset: flow step function, or its name as a string
            params (dict): parameters identifying the task to reset
            *args, **kwargs: forwarded unchanged to the task's ``reset``

        Returns: the result of the task's ``reset`` when `params` is given;
            otherwise a list of results, one per recorded parameter set, or
            None if the function has no recorded parameter sets.
        """
        name = func_to_reset if isinstance(
            func_to_reset, str) else func_to_reset.__name__

        if params:
            return self.steps[name](**params).reset(*args, **kwargs)

        params_history = self.params_used.get(name, None)
        if params_history:
            return [
                self.steps[name](**used).reset(*args, **kwargs)
                for used in params_history
            ]
        return None

    def resetAll(self, *args, **kwargs):
        """Resets all functions that are attached to the workflow object that have run at least once.

        ``*args`` and ``**kwargs`` are forwarded to each task's ``reset``.
        """
        for name in self.steps:
            # ``params`` is passed positionally: writing ``params=None``
            # ahead of ``*args`` would make the first extra positional
            # argument collide with it and raise TypeError.
            self.reset(name, None, *args, **kwargs)

    def delete(self, func_to_reset, *args, **kwargs):
        """Possibly dangerous! `delete(func)` will delete *all files* in the `data/func` directory of the given func.
        Useful if you want to delete all function related outputs.
        Consider using `reset(func, params)` to reset a specific func

        Args:
            func_to_reset: flow step function, or its name as a string
            *args, **kwargs: accepted but currently unused
        """
        name = func_to_reset if isinstance(
            func_to_reset, str) else func_to_reset.__name__
        # The task is instantiated without parameters only to locate its
        # output directory.
        task = self.steps[name]()
        path = task._getpath([])
        for entry in path.parent.glob('*'):
            # unlink() cannot remove directories; skip them instead of
            # aborting halfway through the clean-up.
            if entry.is_file():
                entry.unlink()

    def deleteAll(self, *args, **kwargs):
        """Possibly dangerous! Will delete all files in the `data/` directory of the functions attached to the workflow object.
        Useful if you want to delete all outputs even the once previously run.
        Consider using `resetAll()` if you want to only reset the functions with params you have run thus far

        ``*args`` and ``**kwargs`` are forwarded to ``delete``.
        """
        # The keys of ``steps`` are exactly what ``delete`` looks up, so use
        # them directly rather than instantiating each task for its family
        # name.
        for name in self.steps:
            self.delete(name, *args, **kwargs)