Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
81 changes: 80 additions & 1 deletion src/forge/orchestrator/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
from forge.workflow.registry import create_default_router
from forge.workflow.router import WorkflowRouter
from forge.workflow.utils.comment_classifier import CommentType, classify_comment
from forge.workflow.utils.jira_status import post_status_comment

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -722,14 +723,21 @@ async def _handle_resume_event(
f"(not in epic_keys): {feedback[:100]}..."
)
elif current_node in task_phase_nodes:
# In task phase - check if it's a Task
# In task phase - comments may target a Task or its Epic.
if child_ticket_key in task_keys:
comment_ticket_key = child_ticket_key
comment_ticket_type = "task"
logger.info(
f"Detected Task-level comment on {comment_ticket_key}: "
f"{feedback[:100]}..."
)
elif child_ticket_key in epic_keys:
comment_ticket_key = child_ticket_key
comment_ticket_type = "epic"
logger.info(
f"Detected Epic-level task comment on {comment_ticket_key}: "
f"{feedback[:100]}..."
)
else:
logger.info(
f"Detected comment on child ticket {child_ticket_key} "
Expand Down Expand Up @@ -998,6 +1006,12 @@ async def _handle_resume_event(
updated_state["is_question"] = True
updated_state["feedback_comment"] = feedback
updated_state["revision_requested"] = False
await self._post_resume_ack_comment(
message.ticket_key,
signal_type="question",
current_node=current_node,
source_ticket_key=comment_ticket_key,
)
elif is_rejected and feedback:
updated_state["is_paused"] = False
updated_state["revision_requested"] = True
Expand All @@ -1011,6 +1025,12 @@ async def _handle_resume_event(
else:
updated_state["current_task_key"] = None
updated_state["current_epic_key"] = None
await self._post_resume_ack_comment(
message.ticket_key,
signal_type="revision",
current_node=current_node,
source_ticket_key=comment_ticket_key,
)
elif was_errored:
# Workflow has an error — auto-resume up to MAX_AUTO_RETRIES times,
# then require an explicit forge:retry label.
Expand Down Expand Up @@ -1070,6 +1090,65 @@ async def _handle_resume_event(

return updated_state

async def _post_resume_ack_comment(
self,
ticket_key: str,
signal_type: str,
current_node: str,
source_ticket_key: str | None = None,
) -> None:
"""Post a best-effort Jira acknowledgement for user-visible resume signals."""
stage = self._stage_label_for_node(current_node)
source_suffix = (
f" from {source_ticket_key}"
if source_ticket_key and source_ticket_key != ticket_key
else ""
)

if signal_type == "question":
message = (
f"Forge received your question about {stage}{source_suffix} "
"and is preparing an answer."
)
else:
message = (
f"Forge received your revision request for {stage}{source_suffix} "
"and is regenerating the artifact."
)

try:
jira = JiraClient()
try:
await post_status_comment(jira, ticket_key, message)
finally:
await jira.close()
except Exception as e:
logger.warning(f"Failed to post resume acknowledgement to {ticket_key}: {e}")

@staticmethod
def _stage_label_for_node(current_node: str) -> str:
"""Return a human-readable workflow stage for an approval/review node."""
node_to_stage = {
"prd_approval_gate": "the PRD",
"generate_prd": "the PRD",
"regenerate_prd": "the PRD",
"spec_approval_gate": "the spec",
"generate_spec": "the spec",
"regenerate_spec": "the spec",
"plan_approval_gate": "the plan",
"decompose_epics": "the plan",
"regenerate_all_epics": "the plan",
"update_single_epic": "the plan",
"rca_option_gate": "the RCA",
"plan_approval_gate_bug": "the plan",
"task_approval_gate": "the tasks",
"generate_tasks": "the tasks",
"regenerate_all_tasks": "the tasks",
"update_single_task": "the task",
"human_review_gate": "the implementation review",
}
return node_to_stage.get(current_node, "the current workflow stage")

@staticmethod
def _extract_text_from_adf(adf: dict) -> str:
"""Extract plain text from Atlassian Document Format."""
Expand Down
4 changes: 3 additions & 1 deletion src/forge/workflow/bug/graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,10 @@ def route_entry(state: BugState) -> str:
return "triage_check"
elif current_node == "triage_gate":
return "triage_gate"
elif current_node in ("analyze_bug", "regenerate_rca"):
elif current_node == "analyze_bug":
return "analyze_bug"
elif current_node == "regenerate_rca":
return "regenerate_rca"
elif current_node == "reflect_rca":
return "reflect_rca"
elif current_node in ("rca_option_gate", "rca_approval_gate"):
Expand Down
24 changes: 21 additions & 3 deletions src/forge/workflow/feature/graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,20 +87,32 @@ def route_by_ticket_type(state: FeatureState) -> str:
return shared

# Feature-specific resume mapping
if current_node in ("generate_prd", "regenerate_prd"):
if current_node == "generate_prd":
return "generate_prd"
elif current_node == "regenerate_prd":
return "regenerate_prd"
elif current_node == "prd_approval_gate":
return "prd_approval_gate"
elif current_node in ("generate_spec", "regenerate_spec"):
elif current_node == "generate_spec":
return "generate_spec"
elif current_node == "regenerate_spec":
return "regenerate_spec"
elif current_node == "spec_approval_gate":
return "spec_approval_gate"
elif current_node in ("decompose_epics", "regenerate_all_epics", "update_single_epic"):
elif current_node == "decompose_epics":
return "decompose_epics"
elif current_node == "regenerate_all_epics":
return "regenerate_all_epics"
elif current_node == "update_single_epic":
return "update_single_epic"
elif current_node == "plan_approval_gate":
return "plan_approval_gate"
elif current_node == "generate_tasks":
return "generate_tasks"
elif current_node == "regenerate_all_tasks":
return "regenerate_all_tasks"
elif current_node == "update_single_task":
return "update_single_task"
elif current_node == "task_approval_gate":
return "task_approval_gate"
elif current_node == "wait_for_ci_gate":
Expand Down Expand Up @@ -406,10 +418,16 @@ def build_feature_graph() -> StateGraph:
# Resume routing for Feature workflow - planning stages
"prd_approval_gate": "prd_approval_gate",
"generate_spec": "generate_spec",
"regenerate_prd": "regenerate_prd",
"spec_approval_gate": "spec_approval_gate",
"regenerate_spec": "regenerate_spec",
"decompose_epics": "decompose_epics",
"regenerate_all_epics": "regenerate_all_epics",
"update_single_epic": "update_single_epic",
"plan_approval_gate": "plan_approval_gate",
"generate_tasks": "generate_tasks",
"regenerate_all_tasks": "regenerate_all_tasks",
"update_single_task": "update_single_task",
"task_approval_gate": "task_approval_gate",
# Resume routing for Feature workflow - execution stages
"task_router": "task_router",
Expand Down
3 changes: 3 additions & 0 deletions src/forge/workflow/nodes/epic_decomposition.py
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,9 @@ async def decompose_epics(state: WorkflowState) -> WorkflowState:
**state,
"epic_keys": epic_keys,
"generation_context": generation_context,
"feedback_comment": None,
"revision_requested": False,
"current_epic_key": None,
"current_node": "plan_approval_gate",
"last_error": f"Partial Jira failure: {jira_error}" if jira_error else None,
}
Expand Down
4 changes: 4 additions & 0 deletions src/forge/workflow/nodes/task_generation.py
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,10 @@ async def generate_tasks(state: WorkflowState) -> WorkflowState:
**state,
"task_keys": all_task_keys,
"tasks_by_repo": tasks_by_repo,
"feedback_comment": None,
"revision_requested": False,
"current_task_key": None,
"current_epic_key": None,
"current_node": "task_approval_gate",
"last_error": f"Partial Jira failure: {jira_error}" if jira_error else None,
}
Expand Down
2 changes: 1 addition & 1 deletion tests/flows/bug_workflow/test_complete_bug_flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ class TestBugWorkflowResumeRouting:

@pytest.mark.parametrize("node,expected", [
("analyze_bug", "analyze_bug"),
("regenerate_rca", "analyze_bug"),
("regenerate_rca", "regenerate_rca"), # reruns cleanup+setup before analyze_bug
("rca_approval_gate", "rca_option_gate"), # backward compat: old gate maps to new
("setup_workspace", "setup_workspace"),
("implement_bug_fix", "implement_bug_fix"),
Expand Down
10 changes: 10 additions & 0 deletions tests/unit/orchestrator/gates/test_task_approval.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,16 @@ def test_routes_to_update_single_on_task_rejection(self, task_pending_state):

assert result == "update_single_task"

def test_routes_to_regenerate_all_on_epic_sourced_rejection(self, task_pending_state):
"""Epic-sourced task feedback uses full task regeneration."""
task_pending_state["current_epic_key"] = "TEST-124"
task_pending_state["feedback_comment"] = "Revise the tasks for this epic."
task_pending_state["revision_requested"] = True

result = route_task_approval(task_pending_state)

assert result == "regenerate_all_tasks"

def test_routes_to_end_when_pending(self, task_pending_state):
"""Pending Tasks without feedback routes to END."""
result = route_task_approval(task_pending_state)
Expand Down
71 changes: 69 additions & 2 deletions tests/unit/orchestrator/test_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,17 @@
class TestQuestionDetection:
"""Tests for Q&A mode question detection."""

@pytest.fixture(autouse=True)
def ack_comment_mocks(self):
"""Mock Jira acknowledgement posting for direct resume-event tests."""
mock_jira = AsyncMock()
mock_jira.close = AsyncMock()
with (
patch("forge.orchestrator.worker.JiraClient", return_value=mock_jira),
patch("forge.orchestrator.worker.post_status_comment", new_callable=AsyncMock) as post,
):
yield post

@pytest.fixture
def worker(self) -> OrchestratorWorker:
"""Create a worker instance for testing."""
Expand Down Expand Up @@ -68,7 +79,11 @@ def _make_message_with_comment(

@pytest.mark.asyncio
async def test_question_comment_sets_is_question_flag(
self, worker: OrchestratorWorker, base_message: QueueMessage, base_state: dict
self,
worker: OrchestratorWorker,
base_message: QueueMessage,
base_state: dict,
ack_comment_mocks,
):
"""Comments starting with ? set is_question flag."""
message = self._make_message_with_comment(base_message, "?Why REST instead of GraphQL?")
Expand All @@ -79,6 +94,10 @@ async def test_question_comment_sets_is_question_flag(
assert result["feedback_comment"] == "?Why REST instead of GraphQL?"
assert result["revision_requested"] is False
assert result["is_paused"] is False
ack_comment_mocks.assert_awaited_once()
ack_text = ack_comment_mocks.await_args.args[2]
assert "received your question" in ack_text
assert "the PRD" in ack_text

@pytest.mark.asyncio
async def test_forge_ask_comment_sets_is_question_flag(
Expand All @@ -98,7 +117,11 @@ async def test_forge_ask_comment_sets_is_question_flag(

@pytest.mark.asyncio
async def test_normal_feedback_still_works(
self, worker: OrchestratorWorker, base_message: QueueMessage, base_state: dict
self,
worker: OrchestratorWorker,
base_message: QueueMessage,
base_state: dict,
ack_comment_mocks,
):
"""Feedback comments with ! prefix trigger revision_requested."""
message = self._make_message_with_comment(
Expand All @@ -111,6 +134,50 @@ async def test_normal_feedback_still_works(
assert result["revision_requested"] is True
assert result["feedback_comment"] == "Please add more detail to the security section"
assert result["is_paused"] is False
ack_comment_mocks.assert_awaited_once()
ack_text = ack_comment_mocks.await_args.args[2]
assert "received your revision request" in ack_text
assert "regenerating" in ack_text

@pytest.mark.asyncio
async def test_task_phase_feedback_from_epic_sets_current_epic_key(
self,
worker: OrchestratorWorker,
base_message: QueueMessage,
base_state: dict,
ack_comment_mocks,
):
"""Comments on an Epic during task review preserve the Epic source."""
state = {
**base_state,
"current_node": "task_approval_gate",
"epic_keys": ["TEST-124"],
"task_keys": ["TEST-130"],
}
payload = {
**base_message.payload,
"source_ticket_key": "TEST-124",
"comment": {"body": "!Please revise the tasks for this epic"},
"changelog": {"items": []},
}
message = QueueMessage(
message_id=base_message.message_id,
event_id=base_message.event_id,
source=base_message.source,
event_type="comment_created",
ticket_key=base_message.ticket_key,
payload=payload,
)

result = await worker._handle_resume_event(message, state)

assert result["revision_requested"] is True
assert result["feedback_comment"] == "Please revise the tasks for this epic"
assert result["current_epic_key"] == "TEST-124"
assert result["current_task_key"] is None
ack_comment_mocks.assert_awaited_once()
ack_text = ack_comment_mocks.await_args.args[2]
assert "from TEST-124" in ack_text

@pytest.mark.asyncio
async def test_prd_label_change_to_approved_sets_approved_flag(
Expand Down
9 changes: 8 additions & 1 deletion tests/unit/orchestrator/test_worker_option_detection.py
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,14 @@ async def test_no_option_pattern_falls_through_to_revision(self, worker):
message = _make_option_message("!I think the analysis missed the real root cause.")
state = _make_rca_gate_state()

result = await worker._handle_resume_event(message, state)
mock_jira = AsyncMock()
mock_jira.close = AsyncMock()

with (
patch("forge.orchestrator.worker.JiraClient", return_value=mock_jira),
patch("forge.orchestrator.worker.post_status_comment", new_callable=AsyncMock),
):
result = await worker._handle_resume_event(message, state)

assert result["revision_requested"] is True
assert result["selected_fix_option"] is None
Expand Down
4 changes: 2 additions & 2 deletions tests/unit/workflow/bug/test_graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,8 @@ class TestRouteEntry:
("post_merge_summary", "post_merge_summary"),
# Backward compat: old rca_approval_gate value maps to rca_option_gate
("rca_approval_gate", "rca_option_gate"),
# regenerate_rca loops through analyze_bug
("regenerate_rca", "analyze_bug"),
# regenerate_rca performs cleanup before routing through analyze_bug
("regenerate_rca", "regenerate_rca"),
# Preserved existing nodes
("setup_workspace", "setup_workspace"),
("implement_bug_fix", "implement_bug_fix"),
Expand Down
Loading
Loading