Flows¶
A Flow defines how tasks compose into a DAG. Use the @flow
decorator on a function whose body instantiates tasks and passes results to other tasks.
Basic Flow¶
from auto_workflow import task, flow
@task
def inc(x: int) -> int: return x + 1
@task
def add(a: int, b: int) -> int: return a + b
@flow
def two_step():
x = inc(1)
y = inc(x)
return add(x, y)
print(two_step.run())
Description Without Execution¶
print(two_step.describe())
# => {"flow": "two_step", "nodes": [...], "count": N}
Exporting Graph¶
print(two_step.export_dot()) # DOT format
print(two_step.export_graph()) # adjacency JSON
When using dynamic fan-out (fan_out(...)
), DOT export renders barrier nodes (fanout:n
) as
diamonds and wires dependencies through them. This removes any direct shortcut edges from the
original source to downstream consumers, ensuring the visual ordering matches execution.
Parameters¶
Pass runtime parameters:
@flow
def configured():
from auto_workflow.context import get_context
ctx = get_context()
n = ctx.params.get("n", 5)
# build tasks using n
...
configured.run(params={"n": 10})
Failure Policies¶
Configure at run time:
two_step.run(failure_policy="continue")
Options: fail_fast
(default), continue
, aggregate
.
Concurrency Limit¶
Limit simultaneous in-flight tasks:
two_step.run(max_concurrency=4)
Cancellation¶
A cancellation event is internally supported; external cooperative cancellation can be added by wrapping the scheduler (see Priority & Cancellation).