Functional Tasks

What are functional tasks?

Functional tasks provide a decorator-based way of defining tasks as plain functions.

How to create a functional task?

To define tasks, first create a Workflow() object.

import d6tflow
from d6tflow.functional import Workflow
flow = Workflow()

Decorate each function with flow.task, which takes a task class from d6tflow.tasks (such as TaskPqPandas) as its parameter.

@flow.task(d6tflow.tasks.TaskPqPandas)
def your_functional_task(task):
    print("Running a complicated task!!")

You might have noticed we provide a task parameter to the function above.

This is deliberate.

If you have worked with d6tflow.task before, you will remember the self parameter passed to the run() method.

Here task is exactly that. It provides all the methods available in d6tflow.task.Task.
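
To make the task-as-self idea concrete, here is a toy sketch in plain Python. It is not d6tflow's implementation: ToyTask and as_task are hypothetical names made up for this illustration. The only point is that a decorator can attach a function as a class's run() method, so the instance arrives as the function's first argument.

```python
class ToyTask:
    """Hypothetical stand-in for a task base class (not part of d6tflow)."""

    def __init__(self):
        self.saved = None

    def save(self, data):
        # Keep the output in memory so it can be inspected later.
        self.saved = data


def as_task(base_class):
    """Hypothetical decorator: turn a function into a subclass of base_class."""
    def decorator(func):
        # The function becomes the run() method, so `task` receives the instance.
        return type(func.__name__, (base_class,), {"run": func})
    return decorator


@as_task(ToyTask)
def double_numbers(task):
    task.save([n * 2 for n in range(3)])


instance = double_numbers()  # double_numbers is now a class
instance.run()               # calls the original function with task=instance
print(instance.saved)
```

Because the function is stored as run(), anything defined on the base class (here only save()) is reachable through task.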


Running a functional task

All functional tasks are run as d6tflow.task under the hood.

So you run them as you would run any d6tflow.task.

The Workflow() object comes with a run method that does exactly that.

flow.run(your_functional_task)

Below is a minimal example of a functional task that brings together everything mentioned above.

import d6tflow
from d6tflow.functional import Workflow
import pandas as pd

flow = Workflow()

@flow.task(d6tflow.tasks.TaskCache)
def sample_functional_task(task):
    df = pd.DataFrame({'a':range(3)})
    print("Functional task running!")
    task.save(df)

flow.run(sample_functional_task)

Additional decorators

Apply these decorators after @flow.task, that is, below it in the decorator stack.

  • @flow.persists
    • Takes a list of output names to be persisted for the flow task.

    • @flow.persists(['a1', 'a2'])
      
  • @flow.params
    • Takes keyword arguments mapping parameter names to their types, for use in the function body.

    • @flow.params(example_argument=d6tflow.IntParameter(default=42))
      
  • @flow.requires
    • Defines dependencies between flow tasks.

    • @flow.requires({"foo": func1, "bar": func2})
      @flow.requires(func1)
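
For readers unsure how a stack of decorators is processed: in plain Python, stacked decorators are applied from the bottom up, so the one written on top is applied last. The sketch below shows only that general Python behaviour. The tag decorator is hypothetical and is not part of d6tflow, and the labels merely mirror the decorator names listed above.

```python
applied = []  # records the order in which the decorators are applied


def tag(name):
    """Hypothetical decorator factory that logs when it is applied."""
    def decorator(func):
        applied.append(name)
        return func
    return decorator


@tag("task")      # written first, applied last
@tag("requires")
@tag("persists")  # written last, applied first
def example(task):
    return task


print(applied)
```

The recorded order is bottom to top: persists, then requires, then task.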
      

Example:

...
@flow.task(d6tflow.tasks.TaskCache)
@flow.requires({"a":get_data1, "b":get_data2})
@flow.persists(['aa'])
def example_function(task):
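    # inputLoad() returns the inputs keyed like the @flow.requires mapping
    inputs = task.inputLoad()
    a = inputs["a"]
    b = inputs["b"]
    print(a, b)
    output = pd.DataFrame({'a': range(4)})
    # Save under the name declared in @flow.persists
    task.save({'aa': output})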
...

Passing parameters to the run() method

We saw in an earlier section how to run functional tasks.

d6tflow also allows you to pass parameters to these functions dynamically using @flow.params().

Below is an example of passing a ‘multiplier’ parameter to a functional task.

@flow.task(d6tflow.tasks.TaskCache)
@flow.params(multiplier=d6tflow.IntParameter(default=0))
def print_parameter(task):
    print(task.multiplier)

flow.run(print_parameter, params={'multiplier':42})

In short, you declare the parameter name and its type with @flow.params, then pass the actual value through the params argument of the run() method.
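
The declare-then-override pattern can be modelled in a few lines of plain Python. This is a toy under stated assumptions, not d6tflow code: ToyWorkflow is a hypothetical class, it stores plain default values rather than parameter objects, and it ignores task classes, outputs and dependencies.

```python
from types import SimpleNamespace


class ToyWorkflow:
    """Hypothetical miniature workflow (not d6tflow) that only handles parameters."""

    def __init__(self):
        self.defaults = {}

    def params(self, **defaults):
        def decorator(func):
            # Remember the declared parameter names and their default values.
            self.defaults[func.__name__] = defaults
            return func
        return decorator

    def run(self, func, params=None):
        # Values passed at run time override the declared defaults.
        values = {**self.defaults.get(func.__name__, {}), **(params or {})}
        return func(SimpleNamespace(**values))


toy = ToyWorkflow()


@toy.params(multiplier=0)
def read_multiplier(task):
    return task.multiplier


print(toy.run(read_multiplier))                             # uses the default
print(toy.run(read_multiplier, params={"multiplier": 42}))  # run-time override
```

The function body reads task.multiplier the same way in both calls; only the value supplied to run() changes.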

Additional methods

Some of the functions in d6tflow are available on the Workflow() object too!

Here’s a list of them:

  • preview(function)
  • outputLoad(function)
  • run(functions_as_list)
  • reset(function)
  • outputLoadAll()

There is more! Here are some functions unique to the functional workflow.

  • add_global_params(example_argument=d6tflow.IntParameter(default=42))
  • resetAll()
  • delete(function)
  • deleteAll()