## General Examples
### ETL Mini-Pipeline

```python
from auto_workflow import task, flow, fan_out

@task
def extract_sources() -> list[str]:
    return ["source_a", "source_b"]

@task
def extract(name: str) -> dict:
    return {"name": name, "rows": [1, 2, 3]}

@task
def transform(batch: dict) -> dict:
    return {**batch, "rows": [r * 2 for r in batch["rows"]]}

@task(persist=True)
def load(batches: list[dict]) -> int:
    # pretend to write to warehouse
    return sum(len(b["rows"]) for b in batches)

@flow
def etl():
    srcs = extract_sources()
    extracted = fan_out(extract, srcs)
    transformed = fan_out(transform, extracted)
    return load(transformed)

print(etl.run())
```
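For intuition, here is a plain-Python sketch of what the two fan-out stages compute, assuming `fan_out(task, items)` behaves like mapping the task over the items and collecting the results. The `fan_out_equivalent` name is illustrative, not part of the library:

```python
# Plain-Python sketch of the pipeline's data flow (no scheduling, no framework).
def fan_out_equivalent() -> int:
    srcs = ["source_a", "source_b"]
    extracted = [{"name": n, "rows": [1, 2, 3]} for n in srcs]
    transformed = [{**b, "rows": [r * 2 for r in b["rows"]]} for b in extracted]
    return sum(len(b["rows"]) for b in transformed)

print(fan_out_equivalent())  # 6: two batches of three rows each
```

Under that assumption, `etl.run()` above prints the same `6`.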
Caching & Priority
@task(cache_ttl=300, priority=10)
def config(): return {"threshold": 5}
@task(priority=1)
def heavy(x: int): return compute(x)
@flow
def prioritized():
cfg = config()
results = [heavy(i) for i in range(5)]
return results, cfg
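A minimal usage sketch, assuming `cache_ttl=300` means the task's result is reused for 300 seconds and that `priority` only influences scheduling order, not results:

```python
# Run the flow twice in quick succession. If cache_ttl works as its name
# suggests, the second run reuses the cached config() result instead of
# recomputing it; both runs return the same values either way.
first = prioritized.run()
second = prioritized.run()
assert first == second
```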
### Secrets

```python
from auto_workflow import task
from auto_workflow.secrets import secret, StaticMappingSecrets, set_secrets_provider

set_secrets_provider(StaticMappingSecrets({"TOKEN": "abc"}))

@task
def use_secret():
    return secret("TOKEN")
```
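To exercise the task, wrap it in a flow as in the other examples. With the static provider installed above, `secret("TOKEN")` should resolve to `"abc"` (`show_secret` is just an illustrative name):

```python
from auto_workflow import flow

@flow
def show_secret():
    return use_secret()

print(show_secret.run())  # expected: "abc" from the static mapping
```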
### Timeout + Retry

```python
from auto_workflow import task

@task(retries=2, retry_backoff=1.0, timeout=3)
async def external_call():
    ...
```
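Reading the parameters naturally: cancel any attempt that exceeds 3 seconds, retry up to 2 more times, and back off 1.0 second between attempts. A more concrete sketch under that reading follows; `flaky_fetch` is hypothetical, not part of the library:

```python
import asyncio

from auto_workflow import task

@task(retries=2, retry_backoff=1.0, timeout=3)
async def flaky_fetch() -> str:
    # Stand-in for a network request; a real call that stalled past the
    # 3-second timeout would presumably be cancelled and retried.
    await asyncio.sleep(0.1)
    return "ok"
```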
### ArtifactRef Handling

```python
from auto_workflow import task
from auto_workflow.artifacts import get_store

@task(persist=True)
def produce_big():
    return list(range(100000))

@task
def consume(ref):
    data = get_store().get(ref)
    return len(data)
```
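Wiring the two tasks together in a flow. The assumption here, consistent with the snippet above, is that a `persist=True` task hands downstream tasks an `ArtifactRef` rather than the raw value:

```python
from auto_workflow import flow

@flow
def big_data():
    ref = produce_big()  # downstream sees an ArtifactRef, not the full list
    return consume(ref)

print(big_data.run())  # 100000
```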
### Custom Tracing

```python
import time
from contextlib import asynccontextmanager

from auto_workflow.tracing import set_tracer

class RecordingTracer:
    @asynccontextmanager
    async def span(self, name: str, **attrs):
        start = time.time()
        try:
            yield {"name": name, **attrs}
        finally:
            dur = (time.time() - start) * 1000.0
            print(f"span {name} attrs={attrs} dur_ms={dur:.2f}")

set_tracer(RecordingTracer())
```
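Once installed, the tracer should receive a span for each unit of work the runner executes. A minimal way to see output, assuming tasks and flows are traced (`traced_step` and `traced` are illustrative names):

```python
from auto_workflow import task, flow

@task
def traced_step() -> str:
    return "done"

@flow
def traced():
    return traced_step()

traced.run()  # RecordingTracer prints one line per span with its duration
```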
### Threaded Parallelism

```python
from auto_workflow import task, flow
import time

@task(run_in="thread")
def slow(i: int):
    time.sleep(0.05)
    return i

@flow
def parallel():
    return [slow(i) for i in range(4)]

print(parallel.run())
```
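A rough way to confirm the calls overlap is to check wall-clock time (timings are approximate and machine-dependent):

```python
import time

start = time.time()
parallel.run()
elapsed = time.time() - start
# Four 0.05 s sleeps run serially would take ~0.2 s; if run_in="thread"
# executes them concurrently, elapsed should be closer to 0.05 s.
print(f"elapsed: {elapsed:.3f}s")
```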