Retries, Timeouts & Failure Handling¶
Robust workflows need resilient task execution. auto-workflow offers per-task retries with exponential backoff and jitter, per-task timeouts, and flow-level failure policies.
Task-Level Retries¶
@task(retries=3, retry_backoff=1.0, retry_jitter=0.3)
async def unreliable():
    ...
Delay sequence (ignoring jitter): 1.0s, 2.0s, and 4.0s before the first, second, and third retry. The 4.0s delay precedes the final attempt.
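The delay sequence above can be reproduced with a short calculation. This is a minimal sketch, not auto-workflow's implementation: the helper name `backoff_delays` is hypothetical, and it assumes the delay doubles on each retry and that jitter adds a uniform random amount between 0 and `jitter` seconds. The library's actual jitter formula may differ.

```python
import random

def backoff_delays(retries, backoff, jitter=0.0, rng=None):
    """Return the delay in seconds before each retry.

    Assumption: delay_n = backoff * 2**n, plus a uniform random amount
    in [0, jitter]. The real library may apply jitter differently.
    """
    rng = rng or random.Random()
    delays = []
    for attempt in range(retries):
        base = backoff * (2 ** attempt)  # doubles on every retry
        delays.append(base + rng.uniform(0.0, jitter) if jitter else base)
    return delays

print(backoff_delays(retries=3, backoff=1.0))  # [1.0, 2.0, 4.0]
```

Under this assumption, `retry_jitter=0.3` would place each delay somewhere between its base value and 0.3s above it, which spreads out retries from tasks that failed at the same moment.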
Timeout¶
@task(timeout=5)
async def slow(): ...
If the inner coroutine does not finish within 5 seconds, a custom TimeoutError is raised. A timeout counts as a failed attempt, so the task is retried if retries remain.
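How a timeout interacts with retries can be illustrated with plain asyncio. This is a sketch under assumptions: `run_with_timeout` is a hypothetical helper, it raises asyncio's own `TimeoutError` rather than the library's custom one, and it omits backoff delays for brevity.

```python
import asyncio

async def run_with_timeout(coro_fn, timeout, retries=0):
    """Call coro_fn() up to retries + 1 times, each bounded by `timeout` seconds."""
    last_error = None
    for _ in range(retries + 1):
        try:
            return await asyncio.wait_for(coro_fn(), timeout=timeout)
        except asyncio.TimeoutError as exc:
            last_error = exc  # a timed-out attempt counts as a failed attempt
    raise last_error

async def slow():
    await asyncio.sleep(0.2)
    return "done"

async def main():
    try:
        await run_with_timeout(slow, timeout=0.05, retries=1)
    except asyncio.TimeoutError:
        return "timed out after 2 attempts"

result = asyncio.run(main())
print(result)
```

Each attempt gets its own time budget here. Whether auto-workflow applies the timeout per attempt or across all attempts is not stated above, so treat that as an assumption of the sketch.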
Failure Policies (Flow Run)¶
Set at invocation:
flow.run(failure_policy="fail_fast") # default
flow.run(failure_policy="continue") # downstream tasks attempt even if upstream failed
flow.run(failure_policy="aggregate") # collect all failures, raise AggregateTaskError at end
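The difference between the three policies is easiest to see in a toy runner. The sketch below is not how auto-workflow schedules tasks: `run_all` and `AggregateError` are hypothetical stand-ins, tasks run sequentially, and storing the exception as a failed task's result is an assumption made for illustration.

```python
class AggregateError(Exception):
    """Stand-in for AggregateTaskError; holds every collected failure."""
    def __init__(self, errors):
        super().__init__(f"{len(errors)} task(s) failed")
        self.errors = errors

def run_all(tasks, failure_policy="fail_fast"):
    """Run (name, callable) pairs in order under the given policy."""
    results, errors = {}, []
    for name, fn in tasks:
        try:
            results[name] = fn()
        except Exception as exc:
            if failure_policy == "fail_fast":
                raise  # stop immediately; later tasks never run
            errors.append(exc)
            results[name] = exc  # continue/aggregate: record and move on
    if failure_policy == "aggregate" and errors:
        raise AggregateError(errors)  # surface all failures at the end
    return results

def ok():
    return 1

def boom():
    raise ValueError("boom")

tasks = [("a", boom), ("b", ok)]
print(run_all(tasks, "continue")["b"])  # 1
```

With `"fail_fast"` the same task list raises `ValueError` before `b` runs, and with `"aggregate"` both tasks run before a single `AggregateError` carrying one failure is raised.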
Error Surfaces¶
TaskExecutionError: wrapper for a task failure.
RetryExhaustedError: raised after the final attempt fails (unless the policy continues).
AggregateTaskError: contains a list of TaskExecutionError when using the aggregate policy.
Observability Hooks¶
Events emitted: task_started, task_retry, task_failed, task_succeeded.
Subscribe via:
from auto_workflow import subscribe

def on_retry(evt):
    if evt.get("task") == "unreliable":
        print("retrying", evt)

subscribe("task_retry", on_retry)
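A handler can also accumulate metrics rather than print. The sketch below counts retries per task using a small in-memory `EventBus`, a hypothetical stand-in so the example runs without the library. The `"attempt"` key in the payload is an assumption; only `"task"` appears in the example above.

```python
from collections import defaultdict

class EventBus:
    """Hypothetical in-memory stand-in for an event registry."""
    def __init__(self):
        self._handlers = defaultdict(list)

    def subscribe(self, event_name, handler):
        self._handlers[event_name].append(handler)

    def emit(self, event_name, payload):
        # Deliver the payload to every handler registered for this event.
        for handler in self._handlers[event_name]:
            handler(payload)

bus = EventBus()
retry_counts = defaultdict(int)

def count_retries(evt):
    retry_counts[evt.get("task")] += 1

bus.subscribe("task_retry", count_retries)
bus.emit("task_retry", {"task": "unreliable", "attempt": 1})  # "attempt" is assumed
bus.emit("task_retry", {"task": "unreliable", "attempt": 2})
print(retry_counts["unreliable"])  # 2
```

The same `count_retries` function could be passed to the library's `subscribe("task_retry", ...)`, provided the real event payload exposes the task name under `"task"` as the example above suggests.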