|
| 1 | +""" |
| 2 | +AI orchestration: prompt generation and provider routing. |
| 3 | +""" |
| 4 | + |
| 5 | +from __future__ import annotations |
| 6 | + |
| 7 | +import json |
| 8 | +from typing import Any, Dict, Optional |
| 9 | + |
| 10 | +from .config import load_config |
| 11 | +from .ai_providers.openai_provider import get_openai_risk_assessment |
| 12 | +# Other providers (Google/Grok/DeepSeek) are imported lazily inside get_risk_assessment() |
| 13 | +# to keep optional dependencies graceful and avoid import errors if packages are not installed. |
| 14 | + |
| 15 | + |
| 16 | +def generate_ai_prompt(cve_details: str, cve_data: Dict[str, Any]) -> str: |
| 17 | + """ |
| 18 | + Build a deterministic, hallucination-resistant prompt. We: |
| 19 | + - Force exactly four numbered sections with strict formatting. |
| 20 | + - Emphasize evidential reasoning using provided data (CVE, KEV, EPSS, exploits). |
| 21 | + - Forbid inventing patches/links and require stating “Unknown” if uncertain. |
| 22 | + - Optimize for security triage usefulness (business impact + concrete actions). |
| 23 | + """ |
| 24 | + prompt = f""" |
| 25 | +You are a senior security analyst. Using ONLY the information provided below, produce EXACTLY four sections, each starting with the numeric header shown: |
| 26 | +
|
| 27 | +1. Risk Assessment |
| 28 | +Explain: vulnerability nature, affected components (if clear), preconditions, attacker effort, and impact on confidentiality, integrity, and availability. Reflect signals like CVSS/EPSS, CISA KEV status, public exploit presence (GitHub, Metasploit, Exploit‑DB, Nuclei) where available. If data is unclear, write “Unknown”. |
| 29 | +
|
| 30 | +2. Potential Attack Scenarios |
| 31 | +Describe at least one plausible end-to-end scenario: entry point, pre-auth vs post-auth, privilege required, lateral movement, and realistic outcomes (e.g., data theft, RCE, business downtime). If multiple paths exist, choose the highest-risk one and justify briefly. |
| 32 | +
|
| 33 | +3. Mitigation Recommendations |
| 34 | +Provide concrete, prioritized actions. Include: patch/update guidance if a vendor fix is referenced, interim mitigations (network controls, feature disablement, WAF rules, config hardening), and detection/monitoring ideas. Only cite links that are present in the provided references; do NOT invent URLs. If patch availability is unclear, say “Patch status: Unknown”. |
| 35 | +
|
| 36 | +4. Executive Summary |
| 37 | +Give a concise, two-paragraph summary for non-technical stakeholders: business risk, exploitation likelihood, and urgency. End with a clear call to action. |
| 38 | +
|
| 39 | +Strict formatting rules: |
| 40 | +- Plain text only. No bullet points, no dashes, no markdown, no emojis. |
| 41 | +- Each of the four headings must appear verbatim on its own line as above. |
| 42 | +- Separate paragraphs with a single blank line. |
| 43 | +- Do not add extra sections or a conclusion beyond the four sections. |
| 44 | +- If you are unsure, write “Unknown” rather than speculating. |
| 45 | +
|
| 46 | +CVE DETAILS: |
| 47 | +{cve_details} |
| 48 | +
|
| 49 | +FULL CVE DATA (for reference; use cautiously, do not copy raw JSON): |
| 50 | +{json.dumps(cve_data, indent=2)} |
| 51 | +""" |
| 52 | + return prompt |
| 53 | + |
| 54 | + |
| 55 | +def get_risk_assessment( |
| 56 | + ai_provider: Optional[str], |
| 57 | + cve_details: str, |
| 58 | + cve_data: Dict[str, Any], |
| 59 | + *, |
| 60 | + config: Optional[Dict[str, Any]] = None, |
| 61 | +) -> str: |
| 62 | + """ |
| 63 | + Route the prompt to the selected AI provider. If no provider is selected, return an explanatory message. |
| 64 | + """ |
| 65 | + if not ai_provider: |
| 66 | + return "❌ No AI provider selected." |
| 67 | + |
| 68 | + cfg = config or load_config() |
| 69 | + prompt = generate_ai_prompt(cve_details, cve_data) |
| 70 | + |
| 71 | + # Normalize common aliases |
| 72 | + normalized = { |
| 73 | + "openai": "openai", |
| 74 | + "chatgpt": "openai", |
| 75 | + "gpt": "openai", |
| 76 | + "google": "google", |
| 77 | + "gemini": "google", |
| 78 | + "grok": "grok", |
| 79 | + "xai": "grok", |
| 80 | + "deepseek": "deepseek", |
| 81 | + } |
| 82 | + provider = normalized.get(ai_provider.lower(), ai_provider.lower()) |
| 83 | + |
| 84 | + if provider == "openai": |
| 85 | + return get_openai_risk_assessment(prompt, cfg.get("openai_api_key")) |
| 86 | + |
| 87 | + if provider == "google": |
| 88 | + try: |
| 89 | + from .ai_providers.google_provider import get_google_risk_assessment |
| 90 | + except Exception as e: |
| 91 | + return f"❌ Google provider not available: {e}" |
| 92 | + return get_google_risk_assessment(prompt, cfg.get("google_ai_api_key")) |
| 93 | + |
| 94 | + if provider == "grok": |
| 95 | + try: |
| 96 | + from .ai_providers.grok_provider import get_grok_risk_assessment |
| 97 | + except Exception as e: |
| 98 | + return f"❌ Grok provider not available: {e}" |
| 99 | + return get_grok_risk_assessment(prompt, cfg.get("grok_api_key")) |
| 100 | + |
| 101 | + if provider == "deepseek": |
| 102 | + try: |
| 103 | + from .ai_providers.deepseek_provider import get_deepseek_risk_assessment |
| 104 | + except Exception as e: |
| 105 | + return f"❌ DeepSeek provider not available: {e}" |
| 106 | + return get_deepseek_risk_assessment(prompt, cfg.get("deepseek_api_key")) |
| 107 | + |
| 108 | + return "❌ Unknown AI provider selected." |
0 commit comments