An Airflow pipeline that detects its own failures, diagnoses the root cause with an AI agent, proposes a fix, and routes the result to Slack — no human in the loop until review time.
When a task fails, instead of just sending an alert, the system asks: what broke, can it be fixed automatically, and what does the on-call engineer actually need to do?
A task failure fires Airflow's on_failure_callback. The callback writes a structured log entry and hands off to the remediation agent, which runs an agentic loop backed by OpenAI gpt-4o with LangChain-declared tools. The agent reads recent logs, classifies the failure, checks for schema drift, and — if the error is a known pattern like a dropped column — generates a patched SQL query.
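The classification step can be sketched as a pattern match over recent log text. This is a minimal, self-contained illustration — the patterns and function name are hypothetical, not the actual contents of tools.py:

```python
import re

# Hypothetical log patterns for the four supported failure scenarios.
# The real tools.py classifier may use different signals entirely.
PATTERNS = {
    "schema_change": re.compile(r"column .+ does not exist", re.I),
    "type_mismatch": re.compile(r"cannot cast|invalid input syntax", re.I),
    "missing_file": re.compile(r"no such file|file not found", re.I),
    "db_timeout": re.compile(r"connection refused|timeout expired", re.I),
}

def classify_failure(log_text: str) -> str:
    """Return the first scenario whose pattern matches, else 'unknown'."""
    for scenario, pattern in PATTERNS.items():
        if pattern.search(log_text):
            return scenario
    return "unknown"
```

An "unknown" result falls through to the alert-only path, so an unrecognized error never produces a bogus auto-fix.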
The remediation agent then decides what to do with that output:
- Fix found and valid → posts a Slack message with the diagnosis, the patched query, and a direct link to open a merge request
- No actionable fix → posts a Slack alert with the diagnosis so the engineer has context before they even open the logs
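The routing rule above reduces to one branch on whether a validated patch exists. A sketch, with hypothetical field names and a placeholder merge-request URL:

```python
from dataclasses import dataclass
from typing import Optional

@dataclass
class Remediation:
    diagnosis: str
    patched_query: Optional[str] = None  # None when no auto-fix was found

def route(result: Remediation) -> dict:
    """Pick the Slack payload shape: fix proposal vs. plain alert."""
    if result.patched_query:
        return {
            "kind": "fix_proposal",
            "diagnosis": result.diagnosis,
            "patch": result.patched_query,
            # Placeholder URL — the real notifier links to the actual repo.
            "mr_link": "https://example.com/merge_requests/new",
        }
    return {"kind": "alert", "diagnosis": result.diagnosis}
```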
Every run is persisted to logs/remediation_history.json for audit.
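The audit trail is an append-only JSON file. A minimal sketch of what persisting a run might look like (function and field names are assumptions, not the project's actual API):

```python
import json
import time
from pathlib import Path

def record_run(entry: dict, path: Path = Path("logs/remediation_history.json")) -> list:
    """Append one remediation record to the JSON history file and
    return the full history. Creates logs/ on first use."""
    path.parent.mkdir(parents=True, exist_ok=True)
    history = json.loads(path.read_text()) if path.exists() else []
    history.append({**entry, "recorded_at": time.strftime("%Y-%m-%dT%H:%M:%S")})
    path.write_text(json.dumps(history, indent=2))
    return history
```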
See docs/architecture.md for a component-by-component breakdown.
| Scenario | Trigger | Auto-fixable |
|---|---|---|
| schema_change | Column removed from source table | Yes — NULL placeholder patch |
| type_mismatch | Wrong type cast in SQL | Yes — NULL placeholder patch |
| missing_file | Source CSV not found | No — alert only |
| db_timeout | Warehouse connection refused | No — alert only |
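The "NULL placeholder patch" for a dropped column amounts to substituting the missing column with `NULL AS <column>` so downstream consumers keep the same result shape. An illustrative sketch (the real query patcher in tools.py is more careful about SQL structure):

```python
import re

def patch_dropped_column(query: str, column: str) -> str:
    """Replace the first whole-word occurrence of a dropped column
    with a NULL placeholder that preserves the output schema."""
    return re.sub(
        rf"\b{re.escape(column)}\b",
        f"NULL AS {column}",
        query,
        count=1,
    )
```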
Prerequisites: Python 3.10+, a Slack bot token with the chat:write scope, and an OpenAI API key.
```bash
git clone <repo-url>
cd agentic-recovery-pipeline
python3 -m venv .venv && source .venv/bin/activate
pip install -r requirements.txt
cp .env.example .env  # fill in OPENAI_API_KEY, SLACK_BOT_TOKEN, SLACK_CHANNEL
```

Then run the simulation:

```bash
python scripts/simulate_failure.py --scenario schema_change
```

This writes a mock log entry and runs the full remediation workflow end-to-end. You will see the diagnosis and fix printed to stdout and a Slack message sent to your configured channel.
Deploy the dags/ directory to your Airflow DAGs folder. The on_failure_callback in dags/callbacks.py is already wired up in each DAG's default_args.
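The callback's job is only to capture context and hand off — the agent does the rest. A sketch of the hand-off shape, using the keys Airflow passes to failure callbacks (`task_instance`, `exception`); the hand-off call itself is hypothetical:

```python
def on_failure_callback(context: dict) -> dict:
    """Build a structured failure record from the Airflow callback
    context and hand it to the remediation agent."""
    ti = context["task_instance"]
    entry = {
        "dag_id": ti.dag_id,
        "task_id": ti.task_id,
        "error": str(context.get("exception")),
    }
    # run_remediation(entry)  # hypothetical entry point into the agent
    return entry
```

Keeping the callback this thin matters: it runs inside the Airflow worker, so anything slow (LLM calls, Slack posts) should happen downstream, not here.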
```bash
pytest tests/
```
```
diagnostic_agent.py       OpenAI gpt-4o agentic loop with tool calling
remediation_agent.py      fix validation, routing, and audit logging
slack_notifier.py         Slack Block Kit messages
tools.py                  log reader, schema detector, query patcher, classifier
dags/
  etl_pipeline.py         ETL DAG definitions with failure scenarios
  callbacks.py            Airflow on_failure_callback
config/
  settings.py             environment variable loading
scripts/
  simulate_failure.py     end-to-end simulation without Airflow
tests/
  test_tools.py
  test_remediation_agent.py
logs/                     per-DAG failure logs + remediation history (git-ignored)
docs/
  architecture.svg
  architecture.md
```