Postgres Connector¶
The Postgres connector provides a psycopg3-backed client with connection pooling and convenient helpers. Optional SQLAlchemy helpers are available.
Installation¶
Install optional extras:
poetry install -E connectors-postgres
poetry install -E connectors-sqlalchemy
# or install all connector extras
poetry install -E connectors-all
Quick usage¶
from auto_workflow.connectors.postgres import client
# Simple query
with client("default") as db:
    rows = db.query("select * from users where id = %s", (123,))
# SQLAlchemy session
with client("default").sqlalchemy_session() as session:
    # ORM usage here
    pass
# Reflect tables
md = client("default").sqlalchemy_reflect(schema="public", only=["users"])
users_table = md.tables["public.users"]
# Stream results (memory-friendly)
for row in client("default").query_iter("select * from big_table", size=1000):
    process(row)
# Bulk COPY (CSV)
import io
csv_data = io.BytesIO(b"a,b\n1,2\n")
client("default").copy_from("public.t", csv_data, columns=["a","b"], format="csv")
buf = io.BytesIO()
client("default").copy_to("public.t", buf, columns=["a","b"], format="csv")
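The streaming call above is described as memory-friendly. The sketch below shows the general batch-fetch pattern behind that kind of iterator, using the standard-library sqlite3 module as a stand-in so it runs without a Postgres server. The helper name iter_rows is hypothetical, and whether the connector itself uses fetchmany or a server-side cursor is not stated here.

```python
import sqlite3
from typing import Iterator


def iter_rows(conn: sqlite3.Connection, sql: str, size: int = 1000) -> Iterator[tuple]:
    """Yield rows one at a time while fetching them in batches of `size`.

    Hypothetical helper: it mirrors the shape of query_iter(sql, size=...),
    not the connector's actual implementation.
    """
    cur = conn.cursor()
    try:
        cur.execute(sql)
        while True:
            batch = cur.fetchmany(size)  # at most `size` rows held per batch
            if not batch:
                break
            yield from batch
    finally:
        cur.close()


# Stand-in database so the sketch is runnable anywhere.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE big_table (n INTEGER)")
conn.executemany("INSERT INTO big_table (n) VALUES (?)", [(i,) for i in range(2500)])

total = sum(row[0] for row in iter_rows(conn, "SELECT n FROM big_table", size=1000))
row_count = sum(1 for _ in iter_rows(conn, "SELECT n FROM big_table", size=1000))
```

The caller only ever sees single rows, so the batch size can be tuned without changing the consuming loop.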
End-to-end example (tasks + flow)¶
from auto_workflow import task, flow
from auto_workflow.connectors import postgres
PROFILE = "example"
@task
def setup(profile: str = PROFILE) -> str:
    t = "aw_docs_demo"
    with postgres.client(profile) as db:
        db.execute(f"CREATE TABLE IF NOT EXISTS {t} (v INT)")
    return t

@task
def insert_vals(tbl: str, profile: str = PROFILE):
    with postgres.client(profile) as db:
        db.executemany(f"INSERT INTO {tbl} (v) VALUES (%s)", [(i,) for i in range(5)])

@task
def sum_vals(tbl: str, profile: str = PROFILE) -> int:
    with postgres.client(profile) as db:
        return int(db.query_value(f"SELECT COALESCE(SUM(v),0) FROM {tbl}"))

@flow
def demo_flow(profile: str = PROFILE) -> int:
    t = setup(profile)
    insert_vals(t, profile)
    return sum_vals(t, profile)

if __name__ == "__main__":
    print(demo_flow.run(PROFILE))
Convenience helpers¶
- query_one(sql, params=None, timeout=None) -> dict | None: returns the first row, or None.
- query_value(sql, params=None, timeout=None) -> Any | None: returns the first column of the first row, or None.
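To make the return contracts of the two helpers concrete, here is a minimal sketch that applies the same rules to an in-memory list of dict rows. The functions first_row and first_value are hypothetical illustrations of the documented semantics, not the connector's code.

```python
from typing import Any, Optional


def first_row(rows: list) -> Optional[dict]:
    """Same contract as query_one: the first row, or None when there are no rows."""
    return rows[0] if rows else None


def first_value(rows: list) -> Optional[Any]:
    """Same contract as query_value: first column of the first row, or None."""
    row = first_row(rows)
    if row is None:
        return None
    # Dicts preserve insertion order, so the first value is the first column.
    return next(iter(row.values()), None)


rows = [{"id": 7, "name": "ada"}, {"id": 8, "name": "bob"}]
one = first_row(rows)
value = first_value(rows)
empty = first_value([])
```

Under this contract, None from the value helper can mean either "no rows" or "the first column was NULL"; a caller that needs to tell the two apart would use the row helper and inspect the row.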
Pool tuning and connection parameters¶
- Optional pool parameters (if provided in cfg): min_size, max_size, timeout.
- _conninfo includes application_name if provided.
- sslmode is included only when explicitly configured (aligns with SQLAlchemy URL behavior).
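The "include only when configured" rule for application_name and sslmode can be sketched as follows. This is an illustration under assumptions: build_conninfo is a hypothetical name, the cfg keys mirror the settings listed in this page, and the output is a libpq-style key=value string rather than whatever the connector actually builds internally.

```python
def build_conninfo(cfg: dict) -> str:
    """Build a libpq-style key=value string from a config mapping (illustrative only)."""
    candidates = {
        "host": cfg.get("host"),
        "port": cfg.get("port"),
        "dbname": cfg.get("database") or cfg.get("dbname"),
        "user": cfg.get("user"),
        "password": cfg.get("password"),
        # Optional settings: emitted only when explicitly configured.
        "application_name": cfg.get("application_name"),
        "sslmode": cfg.get("sslmode"),
    }

    def quote(value) -> str:
        text = str(value)
        # libpq requires single quotes around empty values or values containing
        # spaces, with backslashes and single quotes escaped by a backslash.
        if text == "" or any(ch in text for ch in " '\\"):
            text = "'" + text.replace("\\", "\\\\").replace("'", "\\'") + "'"
        return text

    return " ".join(f"{key}={quote(val)}" for key, val in candidates.items() if val is not None)


plain = build_conninfo({"host": "db", "database": "app", "user": "appuser"})
with_ssl = build_conninfo({"host": "db", "database": "app", "user": "appuser", "sslmode": "require"})
```

Leaving sslmode out of the string when it is unset lets the server and client defaults apply, instead of forcing a value the user never chose.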
Transactions¶
- All operations inside transaction() run on the same connection; nested transactions do not issue additional BEGIN/COMMIT.
- Statement timeouts use SET LOCAL statement_timeout per connection.
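from auto_workflow.connectors.postgres import client
with client("default") as db:
    # Read-only, deferrable serializable snapshot: only reads are allowed here
    # (an INSERT would fail in a read-only transaction).
    with db.transaction(isolation="serializable", readonly=True, deferrable=True):
        total = db.query_value("SELECT COUNT(*) FROM t")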
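The nesting rule described above (inner transaction blocks do not issue their own BEGIN/COMMIT) can be pictured as a depth counter. The sketch below is a stand-alone illustration that records statements in a list instead of talking to a database; the class name TxnTracker and its internals are hypothetical and are not the connector's implementation.

```python
from contextlib import contextmanager


class TxnTracker:
    """Records which transaction-control statements would be sent."""

    def __init__(self) -> None:
        self.depth = 0
        self.statements = []

    @contextmanager
    def transaction(self):
        outermost = self.depth == 0
        if outermost:
            self.statements.append("BEGIN")  # only the outermost block opens
        self.depth += 1
        try:
            yield self
        except Exception:
            self.depth -= 1
            if outermost:
                self.statements.append("ROLLBACK")
            raise
        else:
            self.depth -= 1
            if outermost:
                self.statements.append("COMMIT")  # only the outermost block closes


tracker = TxnTracker()
with tracker.transaction():
    tracker.statements.append("INSERT 1")
    with tracker.transaction():  # nested: no extra BEGIN/COMMIT
        tracker.statements.append("INSERT 2")
```

In this model an exception raised inside a nested block propagates outward, and the single ROLLBACK is issued by the outermost block.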
Pool lifecycle guidance¶
- The registry caches active clients per profile+config; avoid opening/closing per tiny operation in hot paths.
- Prefer:
  - with client(profile).connection() as conn: for scoped, multi-statement work, or
  - keep the client open; close on application shutdown.
Configuration¶
Connector config is read from your pyproject.toml under tool.auto_workflow.connectors, then overlaid by environment variables.
[tool.auto_workflow.connectors.postgres.default]
host = "db.example.com"
database = "app"
user = "appuser"
password = "secret://dev/db_password" # resolved via secrets provider
Environment overrides use the prefix AUTO_WORKFLOW_CONNECTORS_POSTGRES_<PROFILE>__. You can also provide a high-precedence JSON overlay via __JSON.
export AUTO_WORKFLOW_CONNECTORS_POSTGRES_DEFAULT__HOST=localhost
export AUTO_WORKFLOW_CONNECTORS_POSTGRES_DEFAULT__JSON='{"sslmode": "require"}'
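The layering described in this section (pyproject.toml values, then per-field environment variables, then the JSON overlay on top) can be sketched as a small merge function. Treat it as an illustration under assumptions: resolve_profile is a hypothetical name, and upper-casing the profile and lower-casing the field names are guesses based on the DEFAULT examples above.

```python
import json


def resolve_profile(base: dict, env: dict, profile: str) -> dict:
    """Merge file config, per-field env overrides, and the JSON overlay (illustrative)."""
    prefix = f"AUTO_WORKFLOW_CONNECTORS_POSTGRES_{profile.upper()}__"
    cfg = dict(base)  # lowest precedence: values from pyproject.toml
    overlay = {}
    for name, value in env.items():
        if not name.startswith(prefix):
            continue
        field = name[len(prefix):]
        if field == "JSON":
            overlay = json.loads(value)  # applied last, so it wins
        else:
            cfg[field.lower()] = value  # per-field override
    cfg.update(overlay)
    return cfg


base = {"host": "db.example.com", "database": "app", "user": "appuser"}
env = {
    "AUTO_WORKFLOW_CONNECTORS_POSTGRES_DEFAULT__HOST": "localhost",
    "AUTO_WORKFLOW_CONNECTORS_POSTGRES_DEFAULT__JSON": '{"sslmode": "require"}',
    "UNRELATED_VARIABLE": "ignored",
}
resolved = resolve_profile(base, env, "default")
```

Passing the environment in as a plain mapping (rather than reading os.environ inside the function) keeps the sketch easy to test.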
Required vs optional settings¶
You can configure the client using either a single DSN or individual fields.
- Option A (single variable): AUTO_WORKFLOW_CONNECTORS_POSTGRES_<PROFILE>__DSN
- Option B (individual fields): HOST, DATABASE (or DBNAME), USER; optional PASSWORD, PORT, SSLMODE, APPLICATION_NAME.
Minimal env examples (DEFAULT profile):
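# Option A: single DSN (individual fields are ignored when this is set)
export AUTO_WORKFLOW_CONNECTORS_POSTGRES_DEFAULT__DSN="postgresql://app:secret@db:5432/appdb"
# Option B: individual fields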
export AUTO_WORKFLOW_CONNECTORS_POSTGRES_DEFAULT__HOST=db
export AUTO_WORKFLOW_CONNECTORS_POSTGRES_DEFAULT__DATABASE=appdb
export AUTO_WORKFLOW_CONNECTORS_POSTGRES_DEFAULT__USER=app
export AUTO_WORKFLOW_CONNECTORS_POSTGRES_DEFAULT__PASSWORD=secret://dev/db_password
export AUTO_WORKFLOW_CONNECTORS_POSTGRES_DEFAULT__PORT=5432
export AUTO_WORKFLOW_CONNECTORS_POSTGRES_DEFAULT__SSLMODE=require
export AUTO_WORKFLOW_CONNECTORS_POSTGRES_DEFAULT__APPLICATION_NAME=auto-workflow
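The required-versus-optional rules above can be expressed as a small check that reports which settings are still missing. This is only a sketch: missing_settings and REQUIRED_FIELDS are hypothetical names, and the connector's own validation and error messages may differ.

```python
REQUIRED_FIELDS = ("host", "database", "user")


def missing_settings(cfg: dict) -> list:
    """Return the required settings that are absent from cfg (illustrative only)."""
    if cfg.get("dsn"):
        return []  # Option A: a DSN alone is sufficient
    normalized = dict(cfg)
    if not normalized.get("database") and normalized.get("dbname"):
        normalized["database"] = normalized["dbname"]  # DBNAME is an alias of DATABASE
    return [name for name in REQUIRED_FIELDS if not normalized.get(name)]


dsn_only = missing_settings({"dsn": "postgresql://app:secret@db:5432/appdb"})
fields_ok = missing_settings({"host": "db", "dbname": "appdb", "user": "app"})
incomplete = missing_settings({"host": "db"})
```

A check like this is useful at application start-up, where a clear list of missing settings is easier to act on than a connection error.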
Environment variables reference¶
- AUTO_WORKFLOW_CONNECTORS_POSTGRES_<PROFILE>__DSN: single connection string. If set, individual fields are ignored.
- AUTO_WORKFLOW_CONNECTORS_POSTGRES_<PROFILE>__HOST: required if not using DSN
- AUTO_WORKFLOW_CONNECTORS_POSTGRES_<PROFILE>__PORT: optional, defaults to 5432
- AUTO_WORKFLOW_CONNECTORS_POSTGRES_<PROFILE>__DATABASE: required if not using DSN
- AUTO_WORKFLOW_CONNECTORS_POSTGRES_<PROFILE>__DBNAME: alias of DATABASE
- AUTO_WORKFLOW_CONNECTORS_POSTGRES_<PROFILE>__USER: required if not using DSN
- AUTO_WORKFLOW_CONNECTORS_POSTGRES_<PROFILE>__PASSWORD: optional; supports secret://
- AUTO_WORKFLOW_CONNECTORS_POSTGRES_<PROFILE>__SSLMODE: optional
- AUTO_WORKFLOW_CONNECTORS_POSTGRES_<PROFILE>__APPLICATION_NAME: optional
- High-precedence JSON overlay: AUTO_WORKFLOW_CONNECTORS_POSTGRES_<PROFILE>__JSON