Agentic Recovery Pipeline

An Airflow pipeline that detects its own failures, diagnoses the root cause with an AI agent, proposes a fix, and routes the result to Slack — with no human in the loop until review time.

When a task fails, instead of just sending an alert, the system asks: what broke, can it be fixed automatically, and what does the on-call engineer actually need to do?

Demo

How it works

A task failure fires Airflow's on_failure_callback. The callback writes a structured log entry and hands off to the remediation agent, which runs an agentic loop backed by OpenAI gpt-4o with LangChain-declared tools. The agent reads recent logs, classifies the failure, checks for schema drift, and — if the error is a known pattern like a dropped column — generates a patched SQL query.
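The classification step can be pictured as a pattern match over the failure log. This is only a sketch: the error patterns and the `classify_failure` name below are assumptions, not the actual rules in agent/tools.py.

```python
# Illustrative failure classifier; the regexes and labels are assumptions,
# not the repo's actual classification logic.
import re

FAILURE_PATTERNS = {
    "schema_change": re.compile(r"column .+ does not exist", re.I),
    "type_mismatch": re.compile(r"cannot cast|invalid input syntax", re.I),
    "missing_file": re.compile(r"no such file or directory", re.I),
    "db_timeout": re.compile(r"connection refused|timed out", re.I),
}

def classify_failure(log_text: str) -> str:
    """Return the first scenario whose pattern matches the log, else 'unknown'."""
    for label, pattern in FAILURE_PATTERNS.items():
        if pattern.search(log_text):
            return label
    return "unknown"
```

For example, `classify_failure('ERROR: column "email" does not exist')` returns `"schema_change"`.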

The remediation agent then decides what to do with that output:

  • Fix found and valid → posts a Slack message with the diagnosis, the patched query, and a direct link to open a merge request
  • No actionable fix → posts a Slack alert with the diagnosis so the engineer has context before they even open the logs

Every run is persisted to logs/remediation_history.json for audit.
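One way the audit trail could be appended, assuming remediation_history.json holds a JSON array of run records; the helper name and record fields are illustrative.

```python
# Hypothetical audit-log append; assumes the history file stores a JSON list.
import json
from pathlib import Path

def append_history(record: dict, path: str = "logs/remediation_history.json") -> list:
    """Append one remediation record to the JSON history file and return it."""
    p = Path(path)
    p.parent.mkdir(parents=True, exist_ok=True)
    history = json.loads(p.read_text()) if p.exists() else []
    history.append(record)
    p.write_text(json.dumps(history, indent=2))
    return history
```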

Architecture

[Architecture diagram: docs/architecture.svg]

See docs/architecture.md for a component-by-component breakdown.

Failure scenarios

Scenario        Trigger                           Auto-fixable
schema_change   Column removed from source table  Yes (NULL placeholder patch)
type_mismatch   Wrong type cast in SQL            Yes (NULL placeholder patch)
missing_file    Source CSV not found              No (alert only)
db_timeout      Warehouse connection refused      No (alert only)
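A minimal version of the NULL placeholder patch could look like the regex below. The repo's actual query patcher is agent-generated; this simplified sketch only illustrates the idea of keeping the SELECT list's shape by aliasing the dropped column to NULL.

```python
# Hypothetical NULL placeholder patch; the real patcher is agent-driven.
import re

def patch_dropped_column(sql: str, column: str) -> str:
    """Replace the first bare reference to `column` with `NULL AS column`."""
    return re.sub(rf"\b{re.escape(column)}\b", f"NULL AS {column}", sql, count=1)

patch_dropped_column("SELECT id, email, signup_date FROM users", "email")
# 'SELECT id, NULL AS email, signup_date FROM users'
```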

Setup

Prerequisites: Python 3.10+, a Slack bot token with chat:write scope, an OpenAI API key.

git clone <repo-url>
cd agentic-recovery-pipeline
python3 -m venv .venv && source .venv/bin/activate
pip install -r requirements.txt
cp .env.example .env   # fill in OPENAI_API_KEY, SLACK_BOT_TOKEN, SLACK_CHANNEL

Usage

Simulate a failure (no Airflow required)

python scripts/simulate_failure.py --scenario schema_change

This writes a mock log entry and runs the full remediation workflow end-to-end. You will see the diagnosis and fix printed to stdout and a Slack message sent to your configured channel.

Run with Airflow

Deploy the dags/ directory to your Airflow DAGs folder. The on_failure_callback in dags/callbacks.py is already wired up in each DAG's default_args.
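The wiring amounts to passing the callback through each DAG's default_args, roughly as below; `on_failure` here is a stand-in for the callback defined in dags/callbacks.py.

```python
# Illustrative default_args wiring; the callback body and owner are stand-ins.
def on_failure(context: dict) -> None:
    """Airflow invokes this with the task context; it hands off to the agent."""
    ...

default_args = {
    "owner": "data-eng",                # illustrative
    "retries": 0,
    "on_failure_callback": on_failure,  # fires on each task failure
}
# Each DAG is then declared with DAG(..., default_args=default_args).
```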

Run tests

pytest tests/

Project structure

agent/
  diagnostic_agent.py   OpenAI gpt-4o agentic loop with tool calling
  remediation_agent.py  fix validation, routing, and audit logging
  slack_notifier.py     Slack block kit messages
  tools.py              log reader, schema detector, query patcher, classifier
dags/
  etl_pipeline.py       ETL DAG definitions with failure scenarios
  callbacks.py          Airflow on_failure_callback
config/
  settings.py           environment variable loading
scripts/
  simulate_failure.py   end-to-end simulation without Airflow
tests/
  test_tools.py
  test_remediation_agent.py
logs/                   per-DAG failure logs + remediation history (git-ignored)
docs/
  architecture.svg
  architecture.md
