d6tflow package

Submodules

Module contents

class d6tflow.FlowExport(tasks=None, flows=None, save=False, path_export='tasks_d6tpipe.py')[source]

Bases: object

Auto-generates task files to quickly share workflows with others via d6tpipe.

Parameters:
  • tasks (obj) – task or list of tasks to share
  • flows (obj) – flow or list of flows to get tasks from.
  • save (bool) – save to tasks file
  • path_export (str) – filename for tasks to export.
generate()[source]

Generate output files

class d6tflow.Workflow(task=None, params=None, path=None, env=None)[source]

Bases: object

The class is used to orchestrate tasks and define a task pipeline
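
For example, a minimal usage sketch (TaskGetData and its do_preprocess parameter are hypothetical):

import d6tflow
import pandas as pd

class TaskGetData(d6tflow.tasks.TaskPqPandas):  # hypothetical example task
    do_preprocess = d6tflow.BoolParameter(default=True)  # task parameter
    def run(self):
        df = pd.DataFrame({'a': range(3)})  # load or generate data here
        self.save(df)  # persist output as parquet

# bind the task and its parameters into a workflow, then execute
flow = d6tflow.Workflow(task=TaskGetData, params={'do_preprocess': False})
flow.run()
df = flow.outputLoad()  # load output of the default task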

attach_flow(flow=None, flow_name='flow')[source]
complete(task=None)[source]
get_task(task=None)[source]

Get task with the workflow parameters

Parameters:task (class) – task class

Returns: An instance of the task class with the workflow parameters

output(task=None)[source]
outputLoad(task=None, keys=None, as_dict=False, cached=False)[source]

Load output from task with the workflow parameters

Parameters:
  • task (class) – task class
  • keys (list) – list of data to load
  • as_dict (bool) – return output as a dictionary
  • cached (bool) – cache data in memory

Returns: list or dict of all task output

outputLoadAll(task=None, keys=None, as_dict=False, cached=False)[source]

Load output from the task and all its upstream tasks, with the workflow parameters

Parameters:
  • task (class) – task class
  • keys (list) – list of data to load
  • as_dict (bool) – return output as a dictionary
  • cached (bool) – cache data in memory

Returns: list or dict of all task output

outputLoadMeta(task=None)[source]
outputPath(task=None)[source]

Outputs the path given a task

Parameters:task (class) – task class

Returns: list or dict of all task paths

preview(tasks=None, indent='', last=True, show_params=True, clip_params=False)[source]

Preview task flows with the workflow parameters

Parameters:tasks (class, list) – task class or list of tasks class
reset(task=None, confirm=False)[source]
reset_downstream(task, task_downstream=None, confirm=False)[source]

Invalidate all downstream tasks in a flow.

For example, you have 3 dependent tasks. Normally you run Task3, but you've changed parameters for Task1. Invalidating Task3 checks the full DAG, realizes Task1 needs to be invalidated, and therefore invalidates Task2 and Task3 as well.

Parameters:
  • task (obj) – task to invalidate. This should be an upstream task for which you want to check downstream dependencies for invalidation conditions
  • task_downstream (obj) – downstream task target
  • confirm (bool) – confirm operation
reset_upstream(task, confirm=False)[source]
run(tasks=None, forced=None, forced_all=False, forced_all_upstream=False, confirm=False, workers=1, abort=True, execution_summary=None, **kwargs)[source]

Run tasks with the workflow parameters. See luigi.build for additional details

Parameters:
  • tasks (class, list) – task class or list of task classes
  • forced (list) – list of forced tasks
  • forced_all (bool) – force all tasks
  • forced_all_upstream (bool) – force all tasks including upstream
  • confirm (bool) – confirm invalidating tasks
  • workers (int) – number of workers
  • abort (bool) – on errors raise exception
  • execution_summary (bool) – print execution summary
  • kwargs – keywords to pass to luigi.build
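
For example, a sketch that forces a full rerun with two workers, reusing the flow object from the Workflow example above:

flow.run(forced_all=True, confirm=False, workers=2)  # rerun all tasks
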
set_default(task)[source]

Set default task for the workflow object

Parameters:task (obj) – task class to set as default
class d6tflow.WorkflowMulti(task=None, params=None, path=None, env=None)[source]

Bases: object

A multi-experiment workflow can be defined with multiple flows, separate parameters for each flow, and a default task. It is mandatory to define the flows and parameters for each of the flows.
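
For example, a sketch with one parameter set per experiment (TaskTrain is a hypothetical task):

flow2 = d6tflow.WorkflowMulti(
    task=TaskTrain,
    params={'experiment1': {'do_preprocess': False},
            'experiment2': {'do_preprocess': True}})
flow2.run()  # run the flow for every experiment
data = flow2.outputLoad(flow='experiment1')  # output for one experiment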

get_flow(flow)[source]

Get flow by name

Parameters:flow (string) – The name of the experiment whose flow to return

Returns: An instance of Workflow

get_task(task=None, flow=None)[source]

Get task with the workflow parameters for a flow

Parameters:
  • flow (string) – The name of the experiment to get the task for. If nothing is passed, tasks for all flows are returned
  • task (class) – task class

Returns: An instance of the task class with the workflow parameters

outputLoad(task=None, flow=None, keys=None, as_dict=False, cached=False)[source]

Load output from task with the workflow parameters for a flow

Parameters:
  • flow (string) – The name of the experiment to load output for. If nothing is passed, output is loaded for all flows
  • task (class) – task class
  • keys (list) – list of data to load
  • as_dict (bool) – return output as a dictionary
  • cached (bool) – cache data in memory

Returns: list or dict of all task output

outputLoadAll(task=None, flow=None, keys=None, as_dict=False, cached=False)[source]

Load output from the task and all its upstream tasks, with the workflow parameters for a flow

Parameters:
  • flow (string) – The name of the experiment to load output for. If nothing is passed, output is loaded for all flows
  • task (class) – task class
  • keys (list) – list of data to load
  • as_dict (bool) – return output as a dictionary
  • cached (bool) – cache data in memory

Returns: list or dict of all task output

outputLoadMeta(task=None, flow=None)[source]
outputPath(task=None, flow=None)[source]

Outputs the path given a task

Parameters:
  • task (class) – task class
  • flow (string) – The name of the experiment to get paths for. If nothing is passed, paths are returned for all flows

Returns: list or dict of all task paths

preview(tasks=None, flow=None, indent='', last=True, show_params=True, clip_params=False)[source]

Preview task flows with the workflow parameters for a flow

Parameters:
  • flow (string) – The name of the experiment to preview. If nothing is passed, all flows are previewed
  • tasks (class, list) – task class or list of task classes
reset(task=None, flow=None, confirm=False)[source]
reset_downstream(task=None, flow=None, confirm=False)[source]
reset_upstream(task=None, flow=None, confirm=False)[source]
run(tasks=None, flow=None, forced=None, forced_all=False, forced_all_upstream=False, confirm=False, workers=1, abort=True, execution_summary=None, **kwargs)[source]

Run tasks with the workflow parameters for a flow. See luigi.build for additional details

Parameters:
  • flow (string) – The name of the experiment for which the flow is to be run. If nothing is passed, all flows are run
  • tasks (class, list) – task class or list of task classes
  • forced (list) – list of forced tasks
  • forced_all (bool) – force all tasks
  • forced_all_upstream (bool) – force all tasks including upstream
  • confirm (bool) – confirm invalidating tasks
  • workers (int) – number of workers
  • abort (bool) – on errors raise exception
  • execution_summary (bool) – print execution summary
  • kwargs – keywords to pass to luigi.build
set_default(task)[source]

Set default task for the workflow. The default task is set for all the experiments

Parameters:task (obj) – task class to set as default
d6tflow.clone_parent(cls)[source]
class d6tflow.dict_inherits(*tasks_to_inherit)[source]

Bases: object

class d6tflow.dict_requires(*tasks_to_require)[source]

Bases: object

d6tflow.inherits(*tasks_to_inherit)[source]
d6tflow.invalidate_all(confirm=False)[source]

Invalidate all tasks by deleting all files in data directory

Parameters:confirm (bool) – confirm operation
d6tflow.invalidate_downstream(task, task_downstream, confirm=False)[source]

Invalidate all downstream tasks in a flow.

For example, you have 3 dependent tasks. Normally you run Task3, but you've changed parameters for Task1. Invalidating Task3 checks the full DAG, realizes Task1 needs to be invalidated, and therefore invalidates Task2 and Task3 as well.

Parameters:
  • task (obj) – task to invalidate. This should be an upstream task for which you want to check downstream dependencies for invalidation conditions
  • task_downstream (obj) – downstream task target
  • confirm (bool) – confirm operation
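
Continuing the example above, a sketch:

# Task1's parameters changed; Task3 is the task you normally run.
# Checking from Task3 invalidates Task1 and, via the DAG, Task2 and Task3.
d6tflow.invalidate_downstream(Task1(), Task3(), confirm=True)
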
d6tflow.invalidate_orphans(confirm=False)[source]

Invalidate all unused task outputs

Parameters:confirm (bool) – confirm operation
d6tflow.invalidate_upstream(task, confirm=False)[source]

Invalidate all upstream tasks in a flow.

For example, you have 3 dependent tasks. Normally you run Task3, but you've changed parameters for Task1. Invalidating Task3 checks the full DAG, realizes Task1 needs to be invalidated, and therefore invalidates Task2 and Task3 as well.

Parameters:
  • task (obj) – task to invalidate. This should be a downstream task for which you want to check upstream dependencies for invalidation conditions
  • confirm (bool) – confirm operation
d6tflow.requires(*tasks_to_require)[source]
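
A sketch of declaring a task dependency with this decorator (TaskGetData as in the Workflow example above):

@d6tflow.requires(TaskGetData)  # TaskProcess depends on TaskGetData
class TaskProcess(d6tflow.tasks.TaskPqPandas):
    def run(self):
        df = self.inputLoad()   # load the upstream task's output
        self.save(df.dropna())  # persist the processed output
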
d6tflow.set_dir(dir=None)[source]

Initialize d6tflow

Parameters:dir (str) – data output directory
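
For example:

d6tflow.set_dir('data/')  # task outputs will be written under data/
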
d6tflow.show(task)[source]

Show task execution status

Parameters:task (obj, list) – task or list of tasks
d6tflow.taskflow_downstream(task, task_downstream, only_complete=False)[source]

Get all downstream outputs for a task

Parameters:
  • task (obj) – task
  • task_downstream (obj) – downstream target task
d6tflow.taskflow_upstream(task, only_complete=False)[source]

Get all upstream inputs for a task

Parameters:task (obj) – task

d6tflow.tasks module

class d6tflow.tasks.TaskAggregator(*args, **kwargs)[source]

Bases: luigi.task.Task

Task which yields other tasks

NB: Use this class by implementing run(), which should do nothing but yield other tasks

example:

class TaskCollector(d6tflow.tasks.TaskAggregator):
    def run(self):
        yield Task1()
        yield Task2()
complete(cascade=True)[source]

If the task has any outputs, return True if all outputs exist. Otherwise, return False.

However, you may freely override this method with custom logic.

invalidate(confirm=True)[source]
output()[source]

The output that this Task produces.

The output of the Task determines if the Task needs to be run; the task is considered finished iff the outputs all exist. Subclasses should override this method to return a single Target or a list of Target instances.

Implementation note
If running multiple workers, the output must be a resource that is accessible by all workers, such as a DFS or database. Otherwise, workers might compute the same output since they don’t see the work done by other workers.

See Task.output

outputLoad(keys=None, as_dict=False, cached=False)[source]
reset(confirm=True)[source]
class d6tflow.tasks.TaskCSVGZPandas(*args, path=None, flows=None, **kwargs)[source]

Bases: d6tflow.tasks.TaskData

Task which saves to gzip-compressed CSV

target_class

alias of d6tflow.targets.CSVGZPandasTarget

target_ext = 'csv.gz'
class d6tflow.tasks.TaskCSVPandas(*args, path=None, flows=None, **kwargs)[source]

Bases: d6tflow.tasks.TaskData

Task which saves to CSV

target_class

alias of d6tflow.targets.CSVPandasTarget

target_ext = 'csv'
class d6tflow.tasks.TaskCache(*args, path=None, flows=None, **kwargs)[source]

Bases: d6tflow.tasks.TaskData

Task which saves to cache

target_class

alias of d6tflow.targets.CacheTarget

target_ext = 'cache'
class d6tflow.tasks.TaskCachePandas(*args, path=None, flows=None, **kwargs)[source]

Bases: d6tflow.tasks.TaskData

Task which saves to cache pandas dataframes

target_class

alias of d6tflow.targets.PdCacheTarget

target_ext = 'cache'
class d6tflow.tasks.TaskData(*args, path=None, flows=None, **kwargs)[source]

Bases: luigi.task.Task

Task which has data as input and output

Parameters:
  • target_class (obj) – target data format
  • target_ext (str) – file extension
  • persist (list) – list of string to identify data
  • data (dict) – data container for all outputs
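
For example, a sketch of a task with two named outputs (TaskTrainTest is hypothetical):

import pandas as pd
import d6tflow

class TaskTrainTest(d6tflow.tasks.TaskPqPandas):  # hypothetical task
    persist = ['train', 'test']  # two named outputs, saved separately
    def run(self):
        df = pd.DataFrame({'a': range(10)})
        # keys of the saved dict must match self.persist
        self.save({'train': df.iloc[:8], 'test': df.iloc[8:]})
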
complete(*args, **kwargs)
classmethod get_param_values(params, args, kwargs)[source]

Get the values of the parameters from the args and kwargs.

Parameters:
  • params – list of (param_name, Parameter).
  • args – positional arguments
  • kwargs – keyword arguments.
Returns:

list of (name, value) tuples, one for each parameter.

get_pipe()[source]

Get associated pipe object

get_pipename(*args, **kwargs)
inputLoad(keys=None, task=None, cached=False, as_dict=False)[source]

Load all or several outputs from task

Parameters:
  • keys (list) – list of data to load
  • task (str) – name of the input to load when the task requires multiple tasks, e.g. 'input1' for def requires(self): return {'input1': Task1(), 'input2': Task2()}
  • cached (bool) – cache data in memory
  • as_dict (bool) – if the inputs were saved as a dictionary, use this to return them as a dictionary

Returns: list or dict of all task output
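
For example, a sketch with multiple named dependencies (Task1 and Task2 are hypothetical):

@d6tflow.requires({'input1': Task1, 'input2': Task2})
class TaskMerge(d6tflow.tasks.TaskPqPandas):
    def run(self):
        df1 = self.inputLoad(task='input1')  # output of Task1
        df2 = self.inputLoad(task='input2')  # output of Task2
        self.save(df1.join(df2, lsuffix='_1'))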

invalidate(confirm=True)[source]

Reset a task, e.g. by deleting the output file

metaLoad()[source]
metaSave(data)[source]
metadata = None
output()[source]

Similar to luigi task output

outputLoad(keys=None, as_dict=False, cached=False)[source]

Load all or several outputs from task

Parameters:
  • keys (list) – list of data to load
  • as_dict (bool) – return output as a dictionary
  • cached (bool) – cache data in memory

Returns: list or dict of all task output

outputLoadAllMeta()[source]
outputLoadMeta()[source]
persist = ['data']
pull(**kwargs)[source]

Pull files from data repo

pull_preview(**kwargs)[source]

Preview pull files from data repo

push(**kwargs)[source]

Push files to data repo

push_preview(**kwargs)[source]

Preview push files to data repo

reset(confirm=True)[source]

Reset a task, e.g. by deleting the output file

save(data, **kwargs)[source]

Persist data to target

Parameters:data (dict) – data to save. Keys are the self.persist keys and values are the data
saveMeta(data)[source]
target_class

alias of d6tflow.targets.DataTarget

target_ext = 'ext'
class d6tflow.tasks.TaskExcelPandas(*args, path=None, flows=None, **kwargs)[source]

Bases: d6tflow.tasks.TaskData

Task which saves to Excel

target_class

alias of d6tflow.targets.ExcelPandasTarget

target_ext = 'xlsx'
class d6tflow.tasks.TaskJson(*args, path=None, flows=None, **kwargs)[source]

Bases: d6tflow.tasks.TaskData

Task which saves to json

target_class

alias of d6tflow.targets.JsonTarget

target_ext = 'json'
class d6tflow.tasks.TaskPickle(*args, path=None, flows=None, **kwargs)[source]

Bases: d6tflow.tasks.TaskData

Task which saves to pickle

target_class

alias of d6tflow.targets.PickleTarget

target_ext = 'pkl'
class d6tflow.tasks.TaskPqPandas(*args, path=None, flows=None, **kwargs)[source]

Bases: d6tflow.tasks.TaskData

Task which saves to parquet

target_class

alias of d6tflow.targets.PqPandasTarget

target_ext = 'parquet'

d6tflow.targets module

class d6tflow.targets.CSVGZPandasTarget(path=None)[source]

Bases: d6tflow.targets.CSVPandasTarget

Saves to gzip-compressed CSV, loads to pandas dataframe

save(*args, **kwargs)
class d6tflow.targets.CSVPandasTarget(path=None)[source]

Bases: d6tflow.targets.DataTarget

Saves to CSV, loads to pandas dataframe

load(cached=False, **kwargs)[source]

Load from csv to pandas dataframe

Parameters:
  • cached (bool) – keep data cached in memory
  • **kwargs – arguments to pass to pd.read_csv

Returns: pandas dataframe

save(*args, **kwargs)
class d6tflow.targets.CacheTarget(path=None, format=None, is_tmp=False)[source]

Bases: luigi.local_target.LocalTarget

Saves to in-memory cache, loads to python object

exists()[source]

Returns True if the path for this FileSystemTarget exists; False otherwise.

This method is implemented by using fs.

invalidate()[source]
load(cached=True)[source]

Load from in-memory cache

Returns: python object

save(*args, **kwargs)
class d6tflow.targets.DataTarget(path=None)[source]

Bases: d6tflow.targets._LocalPathTarget

Local target which saves in-memory data (eg dataframes) to persistent storage (eg files) and loads from storage to memory

This is an abstract class that you should extend.

load(fun, cached=False, **kwargs)[source]

Runs a function to load data from storage into memory

Parameters:
  • fun (function) – loading function
  • cached (bool) – keep data cached in memory
  • **kwargs – arguments to pass to fun

Returns: data object

save(df, fun, **kwargs)[source]

Runs a function to save data from memory into storage

Parameters:
  • df (obj) – data to save
  • fun (function) – saving function
  • **kwargs – arguments to pass to fun

Returns: filename
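
A sketch of a custom target following this pattern, assuming load and save delegate to the reader/writer functions as documented above (FeatherPandasTarget is hypothetical; the built-in targets below follow the same structure):

import pandas as pd
from d6tflow.targets import DataTarget

class FeatherPandasTarget(DataTarget):  # hypothetical custom target
    def load(self, cached=False, **kwargs):
        # DataTarget.load runs the reader function against self.path
        return super().load(pd.read_feather, cached, **kwargs)
    def save(self, df, **kwargs):
        # DataTarget.save runs the writer function against self.path
        return super().save(df, df.to_feather, **kwargs)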

class d6tflow.targets.ExcelPandasTarget(path=None)[source]

Bases: d6tflow.targets.DataTarget

Saves to Excel, loads to pandas dataframe

load(cached=False, **kwargs)[source]

Load from Excel to pandas dataframe

Parameters:
  • cached (bool) – keep data cached in memory
  • **kwargs – arguments to pass to pd.read_excel

Returns: pandas dataframe

save(*args, **kwargs)
class d6tflow.targets.JsonTarget(path=None)[source]

Bases: d6tflow.targets.DataTarget

Saves to json, loads to dict

load(cached=False, **kwargs)[source]

Load from json to dict

Parameters:
  • cached (bool) – keep data cached in memory
  • **kwargs – arguments to pass to json.load

Returns: dict

save(*args, **kwargs)
class d6tflow.targets.PdCacheTarget(path=None, format=None, is_tmp=False)[source]

Bases: d6tflow.targets.CacheTarget

class d6tflow.targets.PickleTarget(path=None)[source]

Bases: d6tflow.targets.DataTarget

Saves to pickle, loads to python obj

load(cached=False, **kwargs)[source]

Load from pickle to obj

Parameters:
  • cached (bool) – keep data cached in memory
  • **kwargs – arguments to pass to pickle.load

Returns: python object

save(*args, **kwargs)
class d6tflow.targets.PqPandasTarget(path=None)[source]

Bases: d6tflow.targets.DataTarget

Saves to parquet, loads to pandas dataframe

load(cached=False, **kwargs)[source]

Load from parquet to pandas dataframe

Parameters:
  • cached (bool) – keep data cached in memory
  • **kwargs – arguments to pass to pd.read_parquet

Returns: pandas dataframe

save(*args, **kwargs)

d6tflow.pipes module

d6tflow.pipes.all_pull(task, **kwargs)[source]

Pull for all upstream tasks in a flow

d6tflow.pipes.all_pull_preview(task, **kwargs)[source]

Pull preview for all upstream tasks in a flow

d6tflow.pipes.all_push(task, **kwargs)[source]

Push for all upstream tasks in a flow

d6tflow.pipes.all_push_preview(task, **kwargs)[source]

Push preview for all upstream tasks in a flow

d6tflow.pipes.get_dirpath(name=None)[source]

Get a pipe directory as a pathlib.Path

Parameters:name (str) – name of pipe
d6tflow.pipes.get_pipe(name=None)[source]

Get a pipe

Parameters:name (str) – name of pipe
Returns:pipe object
Return type:obj
d6tflow.pipes.init(default_pipe_name, profile=None, local_pipe=False, local_api=False, reset=False, api=None, set_dir=True, api_args=None, pipe_args=None)[source]

Initialize d6tpipe

Parameters:
  • default_pipe_name (str) – name of pipe to store results. Override by setting Task.pipe attribute
  • profile (str) – name of d6tpipe profile to get api if api not provided
  • local_pipe (bool) – use PipeLocal()
  • local_api (bool) – use APILocal()
  • reset (bool) – reset api and pipe connection
  • api (obj) – d6tpipe api object. if not provided will be loaded
  • set_dir (bool) – if True, set d6tflow directory to default pipe directory
  • api_args (dict) – arguments to pass to api
  • pipe_args (dict) – arguments to pass to pipe
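
A sketch of typical usage (pipe name hypothetical; TaskProcess as in the requires example above):

import d6tflow.pipes
d6tflow.pipes.init('my-project-data')  # store task outputs in this pipe
# ... define and run tasks ...
d6tflow.pipes.all_push(TaskProcess())  # push the flow's outputs to the repo
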
d6tflow.pipes.init_pipe(name=None, **kwargs)[source]
d6tflow.pipes.set_default(name)[source]

Set default pipe. Will also change d6tflow directory

Parameters:name (str) – name of pipe

d6tflow.functional module

class d6tflow.functional.Workflow[source]

Bases: object

Functional Flow class that acts as a manager of all flow steps. Defines all the decorators that can be used on flow functions.
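
A sketch of the functional API (function names and data are hypothetical):

import pandas as pd
import d6tflow
from d6tflow.functional import Workflow

flow = Workflow()

@flow.task(d6tflow.tasks.TaskCache)
def get_data(task):
    task.save(pd.DataFrame({'a': range(3)}))  # a flow step saves its output

@flow.requires(get_data)
@flow.task(d6tflow.tasks.TaskCache)
def double(task):
    df = task.inputLoad()  # load the upstream step's output
    task.save(df * 2)

flow.run(double)
out = flow.outputLoad(double)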

add_global_params(**params)[source]

Adds global params to flow functions, declaring them for later use.

Parameters:params (dict) – dictionary of param name and param type

Example

flow.add_global_params(multiplier=d6tflow.IntParameter(default=0))

delete(func_to_reset, *args, **kwargs)[source]

Possibly dangerous! delete(func) will delete all files in the data/func directory of the given func. Useful if you want to delete all function-related outputs. Consider using reset(func, params) to reset a specific func instead

deleteAll(*args, **kwargs)[source]

Possibly dangerous! Will delete all files in the data/ directory of the functions attached to the workflow object. Useful if you want to delete all outputs, even those from previous runs. Consider using resetAll() if you only want to reset the functions with params you have run thus far

outputLoad(func_to_run, *args, **kwargs)[source]

Loads all or several outputs from a flow step.

Parameters:
  • func_to_run – flow step function
  • keys (list) – list of data to load
  • as_dict (bool) – return output as a dictionary
  • cached (bool) – cache data in memory

Returns: list or dict of all task output

outputLoadAll(func_to_run, *args, **kwargs)[source]

Loads all output from a flow step and its parents.

Parameters:
  • func_to_run – flow step function
  • keys (list) – list of data to load
  • as_dict (bool) – return output as a dictionary
  • cached (bool) – cache data in memory

Returns: list or dict of all task output

params(*args, **kwargs)
persists(*args, **kwargs)
preview(func_to_preview, params: dict)[source]
requires(*args, **kwargs)
reset(func_to_reset, params=None, *args, **kwargs)[source]

Resets a particular function. Use with params to reset the function with the given parameters. If params is not passed, reset(func) will reset the function for all the parameters run thus far

resetAll(*args, **kwargs)[source]

Resets all functions that are attached to the workflow object that have run at least once.

run(*args, **kwargs)
task(*args, **kwargs)