Running Tasks and Managing Workflows

A workflow object is used to orchestrate tasks and define a task pipeline.

NB: the workflow object is the new, preferred way of interacting with workflows. Alternatively, legacy workflow describes the old way, which might help you better understand how everything works.

Define a workflow object

A workflow object is defined by passing the default task and the parameters for the pipeline. Both arguments are optional.

flow = d6tflow.Workflow(Task1, params)
flow = d6tflow.Workflow(Task1) # use default params

Note that you want to pass the task definition, not an instantiated task.

import tasks
flow = d6tflow.Workflow(tasks.Task1) # yes
flow = d6tflow.Workflow(tasks.Task1()) # no
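
Putting it together, a minimal sketch of a task plus its workflow; the task body and parameter value are illustrative, not part of the API:

import d6tflow
import pandas as pd

class Task1(d6tflow.tasks.TaskPqPandas): # task output saved as parquet
    do_preprocess = d6tflow.BoolParameter(default=True)

    def run(self):
        self.save(pd.DataFrame({'a': range(3)})) # persist the task output

flow = d6tflow.Workflow(Task1, {'do_preprocess': False}) # override the default parameter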

Previewing Task Execution Status

Running a task will automatically run all the upstream dependencies. Before running a workflow, you can preview which tasks will be run.

flow.preview() # default task
flow.preview(TaskTrain) # single task
flow.preview([TaskPreprocess, TaskTrain]) # multiple tasks

Running Multiple Tasks as Workflows

To run all tasks in a workflow, run the downstream task you want to complete. It will check if all the upstream dependencies are complete and if not it will run them intelligently for you.

flow.run() # default task
flow.run(TaskTrain) # single task
flow.run([TaskPreprocess, TaskTrain]) # multiple tasks

If your tasks are already complete, they will not rerun. You can force all tasks to rerun, although there are better alternatives; see below.

flow.run(forced_all_upstream=True, confirm=False) # use flow.reset() instead

How is a task marked complete?

Tasks are complete when task output exists. This is typically the existence of a file, database table or cache. See Task I/O Formats for how task output is stored to understand what needs to exist for a task to be complete.

flow.get_task().complete() # status
flow.get_task().output().path # where is output saved?
flow.get_task().output()['output1'].path # multiple outputs

Task Completion with Parameters

If a task has parameters, it needs to be run separately for each parameter setting to be considered complete. d6tflow.WorkflowMulti helps you do that.

flow = d6tflow.WorkflowMulti(Task1, {'flow1':{'do_preprocess':False},'flow2':{'do_preprocess':True}})
flow.run() # runs the flow once for each parameter set

Disable Dependency Checks

By default, for a task to be considered complete, all of its dependencies must be complete as well, not just the task itself. To check only the task itself without checking dependencies, set d6tflow.settings.check_dependencies=False

flow.reset(TaskGetData, confirm=False)
d6tflow.settings.check_dependencies=True # default
flow.preview() # TaskGetData is pending so all tasks are pending
'''
└─--[TaskTrain-{'do_preprocess': 'True'} (PENDING)]
   └─--[TaskPreprocess-{'do_preprocess': 'True'} (PENDING)]
      └─--[TaskGetData-{} (PENDING)]
'''
d6tflow.settings.check_dependencies=False # deactivate dependency checks
flow.preview()
'''
└─--[TaskTrain-{'do_preprocess': 'True'} (COMPLETE)]
   └─--[TaskPreprocess-{'do_preprocess': 'True'} (COMPLETE)]
      └─--[TaskGetData-{} (PENDING)]
'''
d6tflow.settings.check_dependencies=True # set back to default

Debugging Failures

If a task fails, it will show the stack trace. Look further up in the stack trace to find the line that caused the error. You can also set breakpoints in the task.

File "tasks.py", line 37, in run => error is here
    1/0
ZeroDivisionError: division by zero

[...] => look further up to find error

===== d6tflow Execution Summary =====
Scheduled 2 tasks of which:
* 1 complete ones were encountered:
    - 1 TaskPreprocess(do_preprocess=True)
* 1 failed:
    - 1 TaskTrain(do_preprocess=True)
This progress looks :( because there were failed tasks
===== d6tflow Execution Summary =====

 File
     raise RuntimeError('Exception found running flow, check trace')
RuntimeError: Exception found running flow, check trace

=> look further up to find error
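
To pause execution and inspect state before the failing line, you can drop into the debugger inside the task's run() method. A minimal sketch; the task body is illustrative and breakpoint() requires Python 3.7+:

import d6tflow

class TaskTrain(d6tflow.tasks.TaskPickle):

    def run(self):
        data = self.inputLoad() # load upstream output
        breakpoint() # drop into the debugger to inspect data before the error
        # ... rest of run()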

Rerun Tasks When You Make Changes

You have several options to force tasks to reset and rerun. See sections below on how to handle parameter, data and code changes.

# preferred way: reset a single task; with dependency checks on, downstream tasks become pending and rerun on the next flow.run()
flow.reset(TaskGetData, confirm=False) # remove confirm=False to be prompted before data is deleted

# force execution including upstream tasks
flow.run([TaskTrain], forced_all=True, confirm=False)

# force run everything
flow.run(forced_all_upstream=True, confirm=False)

When to reset and rerun tasks?

Typically you want to reset and rerun tasks when:

  • parameters changed
  • data changed
  • code changed

Handling Parameter Change

As long as the parameter is defined in the task, d6tflow will automatically rerun tasks with different parameters.

flow = d6tflow.WorkflowMulti(Task1, {'flow1':{'do_preprocess':False},'flow2':{'do_preprocess':True}})
flow.run() # executes 2 flows, one for each parameter set

For d6tflow to intelligently figure out which tasks to rerun, the parameter has to be defined in the task. The downstream task (TaskTrain) has to pass on the parameter to the upstream task (TaskPreprocess).

class TaskGetData(d6tflow.tasks.TaskPqPandas):
    # no parameter dependence
    pass

class TaskPreprocess(d6tflow.tasks.TaskCachePandas):  # save data in memory
    do_preprocess = d6tflow.BoolParameter(default=True) # parameter for preprocessing yes/no

@d6tflow.requires(TaskPreprocess)
class TaskTrain(d6tflow.tasks.TaskPickle):
    # parameter is passed upstream via @d6tflow.requires,
    # no need to define it again: do_preprocess = d6tflow.BoolParameter(default=True)
    pass

See [d6tflow docs for handling parameter inheritance](https://d6tflow.readthedocs.io/en/stable/api/d6tflow.util.html#using-inherits-and-requires-to-ease-parameter-pain)
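
With the inheritance above, a parameter passed at the workflow level reaches both tasks. A quick sketch using the classes just defined; the parameter value is illustrative:

flow = d6tflow.Workflow(TaskTrain, {'do_preprocess': False})
flow.run() # TaskPreprocess also receives do_preprocess=False via @d6tflow.requires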

Default Parameter Values in Config

As an alternative to inheriting parameters, you can define defaults in a config file. When you change the config, it will automatically rerun tasks.

class TaskPreprocess(d6tflow.tasks.TaskCachePandas):
    do_preprocess = d6tflow.BoolParameter(default=cfg.do_preprocess) # store default in config
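
Here cfg is assumed to be a small module imported in your tasks file; the name and contents are illustrative:

# cfg.py -- a central place for pipeline defaults
do_preprocess = True

# tasks.py
import cfg # change cfg.do_preprocess and affected tasks rerun on the next flow.run()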

Handling Data Change

Premium feature, request access at https://pipe.databolt.tech/gui/request-premium/. You can manually reset tasks if you know your data has changed.
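
For example, if you know the raw data behind TaskGetData has changed, reset it manually; dependency checks then mark the downstream tasks pending as well (task names follow the examples above):

flow.reset(TaskGetData, confirm=False) # invalidate the stale output
flow.run() # reruns TaskGetData and everything downstream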

Handling Code Change

Premium feature, request access at https://pipe.databolt.tech/gui/request-premium/. You can manually reset tasks if you know your code has changed.
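
Similarly, after editing a task's code, reset that task so the next run picks up the change (TaskTrain is illustrative):

flow.reset(TaskTrain, confirm=False) # output is stale after the code change
flow.run()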

Forcing a Single Task to Run

You can always run a single task by calling its run() function. This is useful during debugging. However, this will only run this one task and not take care of any upstream dependencies.

# forcing execution
flow.get_task().run()
# or
TaskTrain().run()

Hiding Execution Output

By default, the workflow execution summary is shown because it contains important information about which tasks were run and whether any failed. At times, e.g. during deployment, it can be desirable to not show the execution output.

d6tflow.settings.execution_summary = False # global
# or
flow.run(execution_summary=False) # at each run

While typically not necessary, you can change the log level to see additional log data. The default is WARNING. It is a global setting; modify it before you run the flow.

d6tflow.settings.log_level = 'WARNING' # 'DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL'