This guide takes you from pip install to a job reaching completed.
Before the code, here is the operational model Awa is built around:
- inserting a job writes durable job state to Postgres, so enqueuing can live inside the same transaction as your application write
- workers claim runnable jobs, heartbeat while they execute, and rescue them if the worker dies
- retries, callback waits, and progress checkpoints are persisted in Postgres and exposed as one hydrated job snapshot instead of being held only in memory
- when you debug or operate the system, inspect the job first; the CLI and UI are designed around that read-only inspection path
That means “what happened?” is usually a database inspection question, not a worker-log archaeology exercise.
- PostgreSQL running locally or remotely
- Python 3.10+
- A database URL exported as
DATABASE_URL
Example local URL:
export DATABASE_URL=postgres://postgres:test@localhost:15432/awa_testpython -m venv .venv
source .venv/bin/activate
pip install awa-pgpython -m awa --database-url "$DATABASE_URL" migrateCreate quickstart.py:
import asyncio
import os
from dataclasses import dataclass
import awa
DATABASE_URL = os.environ["DATABASE_URL"]
@dataclass
class SendEmail:
to: str
subject: str
async def main() -> None:
client = awa.AsyncClient(DATABASE_URL)
@client.task(SendEmail, queue="email")
async def handle_email(job):
print(f"sending email to {job.args.to}: {job.args.subject}")
await client.start([("email", 2)])
job = await client.insert(
SendEmail(to="alice@example.com", subject="Welcome"),
queue="email",
)
await asyncio.sleep(1)
result = await client.get_job(job.id)
print(f"job {result.id} state = {result.state}")
await client.shutdown()
asyncio.run(main())python quickstart.pyExpected output is similar to:
sending email to alice@example.com: Welcome
job 1 state = completed
python -m awa --database-url "$DATABASE_URL" job list --queue email
python -m awa --database-url "$DATABASE_URL" job dump 1
python -m awa --database-url "$DATABASE_URL" job dump-run 1
python -m awa --database-url "$DATABASE_URL" queue statsjob dump gives you the whole job snapshot as JSON. job dump-run focuses on one attempt: the current attempt uses live row data, while historical attempts are reconstructed from the stored errors[] history.
The dashboard ships in a separate wheel so the default awa-pg install stays small for workers and producers. Install the [ui] extra to bring in the awa-cli binary that hosts it:
pip install 'awa-pg[ui]'
python -m awa --database-url "$DATABASE_URL" serve
# → http://127.0.0.1:3000python -m awa serve delegates to the awa serve binary (you can also call awa serve directly once the extra is installed). The UI is read-only when the database reports transaction_read_only = on (e.g. on a replica) or when --read-only is passed.
await client.migrate()runs migrations from Python instead of the CLI.awa.Clientprovides a synchronous API for worker/admin/direct-producer code — all methods are plain (e.g.,client.insert(...),client.migrate()).client.start()accepts tuple queue configs for hard-reserved mode and dict configs for weighted mode. See Configuration reference.
Most applications should keep using their normal database stack for business
tables. Use AsyncClient/Client for workers, admin calls, migrations, and
queue-only producers; when a web request already has a transaction, enqueue
through awa.bridge on that same connection/session.
Install the app database libraries you already use, for example:
pip install 'sqlalchemy[asyncio]' asyncpgThen enqueue in the same SQLAlchemy transaction as your application write:
from dataclasses import dataclass
from awa.bridge import insert_job
from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncSession
@dataclass
class SendEmail:
to: str
subject: str
async def create_order(session: AsyncSession, order_id: str, email: str) -> int:
async with session.begin():
await session.execute(
text("INSERT INTO orders (id, email) VALUES (:id, :email)"),
{"id": order_id, "email": email},
)
job = await insert_job(
session,
SendEmail(to=email, subject="Order confirmed"),
queue="email",
)
return job["id"]The same bridge supports asyncpg, psycopg3, SQLAlchemy, and Django; see Bridge Adapters for driver-specific examples.
By default, a queue is strict FIFO per (queue, priority). Operators can opt a contended queue into partitioned FIFO by raising awa.queue_meta.enqueue_shards — order is then preserved within each shard, but not across shards. If your producer enqueues jobs that must be processed in order (per-customer events, sequential workflow steps), pass ordering_key so they all land on one shard:
await client.insert(
UpdateCustomer(customer_id=42, payload=...),
queue="customer-updates",
ordering_key=b"customer-42", # str also accepted; UTF-8 encoded
)At the default enqueue_shards = 1 the key is ignored (everything is on shard 0 anyway). See ADR-025 for the partitioned-FIFO contract and docs/upgrade-0.5-to-0.6.md for the operator-side knob.
awa records 20+ metrics (throughput, pickup latency, in-flight jobs, rescues, …) on the Rust side. Python workers enable OTLP export by calling awa.init_telemetry(...) once before the worker starts:
import os
import awa
awa.init_telemetry(
os.environ["OTEL_EXPORTER_OTLP_ENDPOINT"], # e.g. http://localhost:4317
os.environ.get("OTEL_SERVICE_NAME", "my-service"),
)
# ... then build the client and start workers as normal.init_telemetry is idempotent; only the first call installs a provider. Call awa.shutdown_telemetry() at the end of short-lived scripts to flush pending metrics. See awa-python/examples/telemetry.py for a runnable example.