Running Tasks and Managing Workflows¶
Previewing Task Execution Status¶
Running a task will automatically run all the upstream dependencies. Before running a workflow, you can preview which tasks will be run.
d6tflow.preview(TaskTrain()) # single task
d6tflow.preview([TaskPreprocess(),TaskTrain()]) # multiple tasks
Running Multiple Tasks as Workflows¶
To run all tasks in a workflow, run the downstream task you want to complete. d6tflow checks whether all upstream dependencies are complete and runs any that are not.
d6tflow.run(TaskTrain()) # single task
d6tflow.run([TaskPreprocess(),TaskTrain()]) # multiple tasks
If you get the error luigi.worker.TaskException: Can not schedule non-task <class 'TaskTrain'>, make sure you run an instantiated task object, d6tflow.run(TaskTrain()), not just the class, d6tflow.run(TaskTrain).
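The scheduling behavior described above can be pictured with a small stand-alone sketch. It is not d6tflow's or luigi's implementation: `ToyTask`, `toy_run` and the three task classes are hypothetical names, used only to illustrate that upstream dependencies run first, complete tasks are skipped, and a class passed in place of an instance is rejected.

```python
class ToyTask:
    """Hypothetical stand-in for a workflow task (not the d6tflow API)."""
    done = set()  # names of tasks whose output "exists"

    def requires(self):
        return []  # upstream dependencies; none by default

    def complete(self):
        return type(self).__name__ in ToyTask.done

    def run(self):
        ToyTask.done.add(type(self).__name__)


class GetData(ToyTask):
    pass


class Preprocess(ToyTask):
    def requires(self):
        return [GetData()]


class Train(ToyTask):
    def requires(self):
        return [Preprocess()]


def toy_run(task, log):
    """Run upstream dependencies first, then the task, skipping complete tasks."""
    if not isinstance(task, ToyTask):
        # analogous to the "non-task" error: a class was passed, not an instance
        raise TypeError(f"cannot schedule non-task {task!r}")
    if task.complete():
        return
    for dep in task.requires():
        toy_run(dep, log)
    task.run()
    log.append(type(task).__name__)


log = []
toy_run(Train(), log)
print(log)  # ['GetData', 'Preprocess', 'Train']
```

Calling `toy_run(Train(), log)` a second time runs nothing, because every task already reports complete.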
How is a task marked complete?¶
Tasks are complete when task output exists. This is typically the existence of a file, database table or cache. See Task I/O Formats for how task output is stored, to understand what needs to exist for a task to be complete.
TaskTrain().complete() # status
TaskTrain().output().path # where is output saved?
TaskTrain().output()['output1'].path # multiple outputs
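As a rough illustration of "complete means the output exists", here is a minimal sketch using only the standard library. `ToyFileTask` and its file name are hypothetical and do not reflect how d6tflow names or stores its output.

```python
import tempfile
from pathlib import Path


class ToyFileTask:
    """Hypothetical task whose completion is defined by its output file."""

    def __init__(self, directory):
        self.path = Path(directory) / "ToyFileTask-data.txt"

    def complete(self):
        return self.path.exists()  # complete if and only if the output exists

    def run(self):
        self.path.write_text("result")

    def reset(self):
        self.path.unlink()  # deleting the output makes the task pending again


with tempfile.TemporaryDirectory() as tmp:
    task = ToyFileTask(tmp)
    before = task.complete()       # nothing written yet
    task.run()
    after_run = task.complete()    # output file exists
    task.reset()
    after_reset = task.complete()  # output removed

print(before, after_run, after_reset)  # False True False
```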
Task Completion with Parameters¶
Completion is tracked separately for each parameter setting. If a task has parameters, it has to be run once for each combination of parameter values before it counts as complete for that combination.
d6tflow.run(TaskTrain()) # default param
TaskTrain().complete() # True
TaskTrain(do_preprocess=False).complete() # False
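One way to see why each parameter setting has its own completion status is to assume, as a simplification, that the output location is derived from the parameter values. The sketch below is a toy model under that assumption; `ToyParamTask` and its file naming are hypothetical.

```python
import tempfile
from pathlib import Path


class ToyParamTask:
    """Hypothetical task: the output location depends on the parameter value."""

    def __init__(self, directory, do_preprocess=True):
        self.do_preprocess = do_preprocess
        # one output file per parameter setting
        self.path = Path(directory) / f"ToyParamTask-do_preprocess={do_preprocess}.txt"

    def complete(self):
        return self.path.exists()

    def run(self):
        self.path.write_text(f"trained with do_preprocess={self.do_preprocess}")


with tempfile.TemporaryDirectory() as tmp:
    ToyParamTask(tmp).run()  # run with the default parameter only
    default_done = ToyParamTask(tmp).complete()
    other_done = ToyParamTask(tmp, do_preprocess=False).complete()

print(default_done, other_done)  # True False
```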
Disable Dependency Checks¶
By default, a task is complete only if all of its dependencies are complete as well, not just the task itself. To check only whether the task itself is complete, without checking dependencies, set d6tflow.settings.check_dependencies=False.
TaskGetData().reset(confirm=False)
d6tflow.settings.check_dependencies=True # default
d6tflow.preview(TaskTrain()) # TaskGetData is pending so all tasks are pending
'''
└─--[TaskTrain-{'do_preprocess': 'True'} (PENDING)]
   └─--[TaskPreprocess-{'do_preprocess': 'True'} (PENDING)]
      └─--[TaskGetData-{} (PENDING)]
'''
d6tflow.settings.check_dependencies=False # deactivate dependency checks
d6tflow.preview(TaskTrain())
'''
└─--[TaskTrain-{'do_preprocess': 'True'} (COMPLETE)]
   └─--[TaskPreprocess-{'do_preprocess': 'True'} (COMPLETE)]
      └─--[TaskGetData-{} (PENDING)]
'''
d6tflow.settings.check_dependencies=True # set to default
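The difference between the two previews can be mimicked with a toy model. This is only a sketch of the idea, not d6tflow's code: `ToyTask` and its `has_output` flag are hypothetical, and the `check_dependencies` argument stands in for the global setting.

```python
class ToyTask:
    """Hypothetical task with an explicit 'output exists' flag."""

    def __init__(self, name, upstream=(), has_output=False):
        self.name = name
        self.upstream = list(upstream)
        self.has_output = has_output

    def complete(self, check_dependencies=True):
        if not self.has_output:
            return False
        if check_dependencies:
            # complete only if every upstream task is complete as well
            return all(t.complete(check_dependencies=True) for t in self.upstream)
        return True


get_data = ToyTask("TaskGetData", has_output=False)  # output was reset
preprocess = ToyTask("TaskPreprocess", [get_data], has_output=True)
train = ToyTask("TaskTrain", [preprocess], has_output=True)

for flag in (True, False):
    status = {t.name: t.complete(check_dependencies=flag)
              for t in (train, preprocess, get_data)}
    print(f"check_dependencies={flag}: {status}")
```

With the check enabled all three tasks report as not complete; with it disabled, only the task whose own output is missing does.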
Debugging Failures¶
If a task fails, the stack trace is shown. The line that caused the error is usually not at the bottom of the output, so look further up in the stack trace to find it. You can also set breakpoints inside the task.
File "tasks.py", line 37, in run => error is here
1/0
ZeroDivisionError: division by zero
[...] => look further up to find error
===== Luigi Execution Summary =====
Scheduled 2 tasks of which:
* 1 complete ones were encountered:
- 1 TaskPreprocess(do_preprocess=True)
* 1 failed:
- 1 TaskTrain(do_preprocess=True)
This progress looks :( because there were failed tasks
===== Luigi Execution Summary =====
File
raise RuntimeError('Exception found running flow, check trace')
RuntimeError: Exception found running flow, check trace
=> look further up to find error
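When the trace is long, one generic Python approach is to call the failing task's run() directly and inspect the innermost traceback frame. The sketch below uses only the standard `traceback` module; `ToyTrain` is a hypothetical stand-in for a task with a bug, not a d6tflow class.

```python
import traceback


class ToyTrain:
    """Hypothetical task whose run() contains a bug."""

    def run(self):
        return 1 / 0  # the actual error


try:
    ToyTrain().run()  # call the task directly to get an unwrapped traceback
except ZeroDivisionError as exc:
    frames = traceback.extract_tb(exc.__traceback__)
    failing = frames[-1]  # innermost frame: where the exception was raised
    error_type = type(exc).__name__
    print(f"{error_type} raised in {failing.name}() at line {failing.lineno}")
```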
Rerun Tasks When You Make Changes¶
You have several options to force tasks to reset and rerun. See sections below on how to handle parameter, data and code changes.
# force execution including downstream tasks
d6tflow.run([TaskTrain()],force=[TaskGetData()])
# reset single task
TaskGetData().reset()
# reset all downstream tasks
d6tflow.invalidate_downstream(TaskGetData(), TaskTrain())
# reset all upstream tasks
d6tflow.invalidate_upstream(TaskTrain())
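To make "upstream" and "downstream" concrete, the following sketch computes both sets for the three example tasks from a plain dictionary. The `requires` mapping and the two helper functions are hypothetical; the sketch only shows which tasks lie on either side and does not model what d6tflow actually resets.

```python
# Hypothetical dependency graph: task name -> names of the tasks it requires
requires = {
    "TaskGetData": [],
    "TaskPreprocess": ["TaskGetData"],
    "TaskTrain": ["TaskPreprocess"],
}


def upstream_of(task):
    """All tasks that `task` depends on, directly or indirectly."""
    found = []
    for dep in requires[task]:
        for name in [dep] + upstream_of(dep):
            if name not in found:
                found.append(name)
    return found


def downstream_of(task):
    """All tasks that depend on `task`, directly or indirectly."""
    return [name for name in requires if task in upstream_of(name)]


print(upstream_of("TaskTrain"))      # ['TaskPreprocess', 'TaskGetData']
print(downstream_of("TaskGetData"))  # ['TaskPreprocess', 'TaskTrain']
```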
When to reset and rerun tasks?¶
Typically you want to reset and rerun tasks when:
- parameters changed
- data changed
- code changed
Handling Parameter Change¶
As long as the parameter is defined in the task, d6tflow will automatically rerun tasks with different parameters.
d6tflow.run([TaskTrain(do_preprocess=True)]) # first experiment
d6tflow.run([TaskTrain(do_preprocess=False)]) # another experiment
For d6tflow to intelligently figure out which tasks to rerun, the parameter has to be defined in the task. The downstream task (TaskTrain) has to pass on the parameter to the upstream task (TaskPreprocess).
import d6tflow
import luigi

class TaskGetData(d6tflow.tasks.TaskPqPandas):
    pass  # no parameter dependence

class TaskPreprocess(d6tflow.tasks.TaskCachePandas):  # save data in memory
    do_preprocess = luigi.BoolParameter(default=True)  # parameter for preprocessing yes/no

class TaskTrain(d6tflow.tasks.TaskPickle):
    do_preprocess = luigi.BoolParameter(default=True)  # same parameter as the upstream task

    def requires(self):
        # pass parameter upstream
        return TaskPreprocess(do_preprocess=self.do_preprocess)
See [luigi docs for handling parameter inheritance](https://luigi.readthedocs.io/en/stable/api/luigi.util.html#using-inherits-and-requires-to-ease-parameter-pain)
Default Parameter Values in Config¶
As an alternative to inheriting parameters, you can define defaults in a config file. When you change the config, tasks are rerun automatically. The DOWNSIDE is that previously saved data will be overwritten!
class TaskPreprocess(d6tflow.tasks.TaskCachePandas):
    do_preprocess = luigi.BoolParameter(default=cfg.do_preprocess) # store default in config
Avoid repeating parameters when referring to tasks¶
To run tasks and load their output for different parameters, you have to pass them to the task. Instead of hardcoding them each time, it is best to keep them in a dictionary and pass that to the task.
# avoid this
d6tflow.run(TaskTrain(do_preprocess=False, model='nnet'))
TaskTrain(do_preprocess=False, model='nnet').outputLoad()
# better
params = dict(do_preprocess=False, model='nnet')
d6tflow.run(TaskTrain(**params))
TaskTrain(**params).outputLoad()
Handling Data Change¶
In future releases, d6tflow will automatically detect data changes. For now you have to manually reset tasks.
Handling Code Change¶
Code changes likely lead to data changes. Code changes are difficult to detect and it is best if you manually force tasks to rerun.
Forcing a Single Task to Run¶
You can always run a single task by calling its run() function. This is useful during debugging. However, this runs only that one task: it does not run upstream dependencies or rerun any downstream tasks.
# forcing execution
TaskTrain().run()
Hiding Execution Output¶
By default, the workflow execution summary is shown, because it contains important information about which tasks were run and whether any failed. At times, e.g. during deployment, it can be desirable to hide the execution output.
d6tflow.settings.execution_summary = False # global
# or
d6tflow.run(Task(), execution_summary=False) # at each run
While typically not necessary, you can change the log level to see additional log data. The default is WARNING. It is a global setting, so modify it before you execute d6tflow.run().
d6tflow.settings.log_level = 'WARNING' # 'DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL'
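The level names listed above match the standard Python logging levels by name. Assuming the setting follows the usual threshold semantics (an assumption, not something stated here), the sketch below shows which messages a given level lets through, using only the standard `logging` module. The logger name is hypothetical.

```python
import logging

# Standard Python level names and their numeric thresholds
levels = ["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"]
thresholds = {name: getattr(logging, name) for name in levels}
print(thresholds)

logger = logging.getLogger("toy-workflow")  # hypothetical logger name
logger.setLevel("WARNING")

# Messages below the configured level are dropped
enabled = {name: logger.isEnabledFor(thresholds[name]) for name in levels}
print(enabled)
```

With the level at WARNING, DEBUG and INFO messages are filtered out; lowering the level to DEBUG lets all five through.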