Skip to content

LCORE-: Wired compaction turn persistence into agentic query flows#1953

Draft
asimurka wants to merge 1 commit into
lightspeed-core:mainfrom
asimurka:integrate_compaction_to_agentic_flow
Draft

LCORE-: Wired compaction turn persistence into agentic query flows#1953
asimurka wants to merge 1 commit into
lightspeed-core:mainfrom
asimurka:integrate_compaction_to_agentic_flow

Conversation

@asimurka

@asimurka asimurka commented Jun 19, 2026

Copy link
Copy Markdown
Contributor

Description

Wires explicit turn compaction for compacted turns in query agentic flows.

Type of change

  • Refactor
  • New feature
  • Bug fix
  • CVE fix
  • Optimization
  • Documentation Update
  • Configuration Update
  • Bump-up service version
  • Bump-up dependent library
  • Bump-up library or tool used for development (does not change the final image)
  • CI configuration change
  • Konflux configuration change
  • Unit tests improvement
  • Integration tests improvement
  • End to end tests improvement
  • Benchmarks improvement

Tools used to create PR

Identify any AI code assistants used in this PR (for transparency and review context)

  • Assisted-by: (e.g., Claude, CodeRabbit, Ollama, etc., N/A if not used)
  • Generated by: (e.g., tool name and version; N/A if not used)

Related Tickets & Documents

  • Related Issue #
  • Closes #

Checklist before requesting a review

  • I have performed a self-review of my code.
  • PR has passed all pre-merge test jobs.
  • If it is a core feature, I have added thorough tests.

Testing

  • Please provide detailed steps to perform tests related to this code change.
  • How were the fix/results from this change verified? Please provide relevant screenshots or results.

Summary by CodeRabbit

  • Refactor

    • Modified how conversation history stores interaction data, removing structured output item persistence.
    • Refined original input handling in conversation compaction mode.
    • Optimized streaming response generation flow.
  • Tests

    • Updated test coverage to reflect changes in conversation persistence behavior.

@asimurka asimurka marked this pull request as draft June 19, 2026 12:41
@coderabbitai

coderabbitai Bot commented Jun 19, 2026

Copy link
Copy Markdown
Contributor

Review Change Stack

Walkthrough

The PR removes TurnSummary.output_items and migrates compacted-turn persistence out of the application layer into the model layer. A new CompactionTurnContext dataclass and _NormalizedLlamaStackStream wrapper replace the old request_stream override, handling event normalization and turn appending directly on stream completion. original_input is threaded from endpoints through agent utilities to build_agent.

Changes

Compaction Turn Persistence Refactor

Layer / File(s) Summary
Remove TurnSummary.output_items contract
src/models/common/turn_summary.py, src/app/endpoints/streaming_query.py
Drops output_items: list[OpenAIResponseOutput] from TurnSummary and removes all code that populated it from response_generator completion/failure terminals, store_compacted_turn calls, and the moderation-block path.
CompactionTurnContext and _NormalizedLlamaStackStream
src/pydantic_ai_lightspeed/llamastack/_model.py, src/pydantic_ai_lightspeed/llamastack/__init__.py
Adds CompactionTurnContext dataclass; rewrites stream wrapper to buffer early tool-call delta events, replay them on item announcement, consolidate MCP deltas, and call append_turn_items_to_conversation on ResponseCompletedEvent. Replaces request_stream override with _patch_responses_create client patching. Exports CompactionTurnContext from the package.
original_input threading through build_agent and agent utilities
src/utils/pydantic_ai.py, src/utils/agents/query.py, src/utils/agents/streaming.py
build_agent accepts original_input, constructs CompactionTurnContext when provided, and passes it to LlamaStackResponsesModel. retrieve_agent_response and retrieve_agent_response_generator receive original_input and use it in moderation-blocked conversation appends and in the build_agent call.
Endpoint original_input pass-through and tests
src/app/endpoints/query.py, src/app/endpoints/streaming_query.py, tests/unit/app/endpoints/test_streaming_query.py, tests/unit/utils/agents/test_streaming.py
query.py passes compaction.original_input unconditionally. streaming_query.py passes original_input=None on the non-compaction path and compacted_original_input via generate_response_with_compaction. Tests drop output_items assertions and add a new compacted-mode blocked-moderation assertion using original_input.

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~60 minutes

Possibly related PRs

  • lightspeed-core/lightspeed-stack#1936: Adds _FilteredResponseStream/request_stream buffering fix in _model.py that this PR directly replaces with the _NormalizedLlamaStackStream + client-patching approach.
  • lightspeed-core/lightspeed-stack#1870: Wires agent-based streaming through retrieve_agent_response_generatorgenerate_agent_response in streaming_query.py, the same path this PR extends with original_input threading.
  • lightspeed-core/lightspeed-stack#1919: Modifies retrieve_agent_response_generator and the moderation/persistence behavior in src/utils/agents/streaming.py, directly overlapping with this PR's original_input parameter addition.

Suggested reviewers

  • jrobertboos
  • tisnik
🚥 Pre-merge checks | ✅ 5
✅ Passed checks (5 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title check ✅ Passed The title directly describes the main change: wiring compaction turn persistence into agentic query flows, which aligns with all the modifications across query endpoints, model configuration, and agent utilities.
Docstring Coverage ✅ Passed Docstring coverage is 95.65% which is sufficient. The required threshold is 80.00%.
Linked Issues check ✅ Passed Check skipped because no linked issues were found for this pull request.
Out of Scope Changes check ✅ Passed Check skipped because no linked issues were found for this pull request.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
🧪 Generate unit tests (beta)
  • Create PR with unit tests
✨ Simplify code
  • Create PR with simplified code

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

@asimurka asimurka marked this pull request as ready for review June 22, 2026 06:36

@coderabbitai coderabbitai Bot left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@src/pydantic_ai_lightspeed/llamastack/_model.py`:
- Around line 246-277: The _patch_responses_create method mutates the shared
singleton client.responses.create method, causing a race condition where
concurrent LlamaStackResponsesModel instances overwrite each other's patches and
execute the wrong compaction context. Instead of directly assigning to
responses_api.create, implement per-instance method wrapping that preserves the
original create method behavior while maintaining instance-specific state
without mutating the shared client, or create a per-request client instance to
avoid sharing mutable state between concurrent requests.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: Path: .coderabbit.yaml

Review profile: ASSERTIVE

Plan: Pro

Run ID: 3a16c48b-ee7d-4f57-a12e-b55fc68e3576

📥 Commits

Reviewing files that changed from the base of the PR and between bcc47ed and 62527e3.

📒 Files selected for processing (10)
  • src/app/endpoints/query.py
  • src/app/endpoints/streaming_query.py
  • src/models/common/turn_summary.py
  • src/pydantic_ai_lightspeed/llamastack/__init__.py
  • src/pydantic_ai_lightspeed/llamastack/_model.py
  • src/utils/agents/query.py
  • src/utils/agents/streaming.py
  • src/utils/pydantic_ai.py
  • tests/unit/app/endpoints/test_streaming_query.py
  • tests/unit/utils/agents/test_streaming.py
💤 Files with no reviewable changes (2)
  • tests/unit/app/endpoints/test_streaming_query.py
  • src/models/common/turn_summary.py
📜 Review details
⏰ Context from checks skipped due to timeout. (2)
  • GitHub Check: Konflux kflux-prd-rh02 / lightspeed-stack-0-6-on-pull-request
  • GitHub Check: Konflux kflux-prd-rh02 / lightspeed-stack-on-pull-request
🧰 Additional context used
📓 Path-based instructions (4)
src/**/*.py

📄 CodeRabbit inference engine (AGENTS.md)

src/**/*.py: Use absolute imports for internal modules: from authentication import get_auth_dependency
Llama Stack imports: Use from llama_stack_client import AsyncLlamaStackClient
Check constants.py for shared constants before defining new ones
All modules must start with descriptive docstrings explaining purpose
Use logger = get_logger(__name__) from log.py for module logging
All functions must have complete type annotations for parameters and return types, use modern syntax (str | int), and include descriptive docstrings
Use snake_case with descriptive, action-oriented names for functions (get_, validate_, check_)
Avoid in-place parameter modification anti-patterns; return new data structures instead of modifying function parameters
Use async def for I/O operations and external API calls
Use standard log levels with clear purposes: debug() for diagnostic info, info() for program execution, warning() for unexpected events, error() for serious problems
All classes must have descriptive docstrings explaining purpose and use PascalCase with standard suffixes: Configuration, Error/Exception, Resolver, Interface
Abstract classes must use ABC with @abstractmethod decorators
Follow Google Python docstring conventions with required sections: Parameters, Returns, Raises, and Attributes for classes

Files:

  • src/pydantic_ai_lightspeed/llamastack/__init__.py
  • src/app/endpoints/query.py
  • src/utils/pydantic_ai.py
  • src/utils/agents/query.py
  • src/utils/agents/streaming.py
  • src/app/endpoints/streaming_query.py
  • src/pydantic_ai_lightspeed/llamastack/_model.py
src/**/__init__.py

📄 CodeRabbit inference engine (AGENTS.md)

Package __init__.py files must contain brief package descriptions

Files:

  • src/pydantic_ai_lightspeed/llamastack/__init__.py
src/app/**/*.py

📄 CodeRabbit inference engine (AGENTS.md)

src/app/**/*.py: FastAPI dependencies: Import from fastapi module for APIRouter, HTTPException, Request, status, Depends
Use FastAPI HTTPException with appropriate status codes for API endpoints and handle APIConnectionError from Llama Stack

Files:

  • src/app/endpoints/query.py
  • src/app/endpoints/streaming_query.py
tests/**/*.py

📄 CodeRabbit inference engine (AGENTS.md)

tests/**/*.py: Use pytest for all unit and integration tests; do not use unittest
Use pytest.mark.asyncio marker for async tests

Files:

  • tests/unit/utils/agents/test_streaming.py
🧠 Learnings (2)
📚 Learning: 2026-01-14T09:37:51.612Z
Learnt from: asimurka
Repo: lightspeed-core/lightspeed-stack PR: 988
File: src/app/endpoints/query.py:319-339
Timestamp: 2026-01-14T09:37:51.612Z
Learning: In the lightspeed-stack repository, when provider_id == "azure", the Azure provider with provider_type "remote::azure" is guaranteed to be present in the providers list. Therefore, avoid defensive StopIteration handling for next() when locating the Azure provider in providers within src/app/endpoints/query.py. This change applies specifically to this file (or nearby provider lookup code) and relies on the invariant that the Azure provider exists; if the invariant could be violated, keep the existing StopIteration handling.

Applied to files:

  • src/app/endpoints/query.py
📚 Learning: 2026-04-06T20:18:07.852Z
Learnt from: major
Repo: lightspeed-core/lightspeed-stack PR: 1463
File: src/app/endpoints/rlsapi_v1.py:266-271
Timestamp: 2026-04-06T20:18:07.852Z
Learning: In the lightspeed-stack codebase, within `src/app/endpoints/` inference/MCP endpoints, treat `tools: Optional[list[Any]]` in MCP tool definitions as an intentional, consistent typing pattern (used across `query`, `responses`, `streaming_query`, `rlsapi_v1`). Do not raise or suggest this as a typing issue during code review; changing it in isolation could break endpoint typing consistency across the codebase.

Applied to files:

  • src/app/endpoints/query.py
  • src/app/endpoints/streaming_query.py
🔇 Additional comments (18)
src/app/endpoints/streaming_query.py (5)

337-342: LGTM!


350-361: LGTM!


507-512: LGTM!


872-895: LGTM!


701-706: This code path is unreachable dead code; the compacted branch in generate_response is never executed.

The streaming_query_endpoint_handler routes all requests through either generate_response_with_compaction (for compacted paths) or generate_agent_response (standard paths). The deprecated generate_response function is never called from the handler, and all test calls use the default compacted=False. Lines 701–706 are therefore unreachable. Either remove this dead code or mark it explicitly as such with a comment.

			> Likely an incorrect or invalid review comment.
src/pydantic_ai_lightspeed/llamastack/_model.py (5)

1-26: LGTM!


28-47: LGTM!


49-90: LGTM!


91-138: LGTM!


171-190: Document the assumption about MCP argument delta format and consider adding validation.

Line 175 unconditionally appends "}" to combined MCP argument deltas. This assumes Llama Stack's MCP event stream always delivers arguments without the closing brace, but this assumption lacks documentation and validation.

If the buffered deltas already include the closing brace (or if the format changes), this produces malformed JSON. Add a comment explaining why the closing brace is required based on Llama Stack's MCP streaming behavior, and consider a defensive check: combined_args.rstrip() + "}" or verify the last buffered delta doesn't already contain }.

src/pydantic_ai_lightspeed/llamastack/__init__.py (1)

1-14: LGTM!

src/utils/pydantic_ai.py (2)

13-17: LGTM!

Also applies to: 116-121


142-157: LGTM!

src/utils/agents/query.py (1)

280-333: LGTM!

src/utils/agents/streaming.py (2)

83-138: LGTM!


141-266: LGTM!

src/app/endpoints/query.py (1)

231-237: LGTM!

tests/unit/utils/agents/test_streaming.py (1)

530-566: LGTM!

Comment on lines +246 to +277
responses_api = self.client.responses
original_create = responses_api.create

async def create(*args: Any, **kwargs: Any) -> Any:
if (
self.compaction is not None
and "input" in kwargs
and self.compaction.original_input_persisted
):
self.compaction.latest_round_input = cast(
ResponseInput, kwargs["input"]
)

result = await original_create(*args, **kwargs)

if kwargs.get("stream"):
return _NormalizedLlamaStackStream(
cast(AsyncStream[responses.ResponseStreamEvent], result),
self.compaction,
)

if not isinstance(first_chunk, responses.ResponseCreatedEvent):
raise UnexpectedModelBehavior(
f"Expected ResponseCreatedEvent, got {type(first_chunk).__name__}"
if self.compaction is not None:
await append_turn_items_to_conversation(
self.compaction.client,
self.compaction.conversation_id,
self.compaction.latest_round_input,
cast(Sequence[Any], result.output),
)
self.compaction.original_input_persisted = True
return result

yield OpenAIResponsesStreamedResponse(
model_request_parameters=model_request_parameters,
_model_name=first_chunk.response.model,
_model_settings=model_settings_cast,
_response=peekable, # type: ignore[arg-type]
_provider_name=self._provider.name,
_provider_url=self._provider.base_url,
_provider_timestamp=(
number_to_datetime(first_chunk.response.created_at)
if first_chunk.response.created_at
else None
),
)
responses_api.create = create # type: ignore[method-assign]

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Check if multiple LlamaStackResponsesModel instances could share a client
rg -n 'LlamaStackResponsesModel\s*\(' --type py -B5 -A5 | head -80

# Check how clients are created/shared
rg -n 'AsyncLlamaStackClientHolder|get_client' --type py -C2 | head -60

Repository: lightspeed-core/lightspeed-stack

Length of output: 5717


🏁 Script executed:

#!/bin/bash
# Check how the client is obtained in LlamaStackResponsesModel __init__
sed -n '193,230p' src/pydantic_ai_lightspeed/llamastack/_model.py

# Check what parameters are passed to __init__
rg -n 'class LlamaStackResponsesModel' -A 35 --type py

Repository: lightspeed-core/lightspeed-stack

Length of output: 4763


🏁 Script executed:

#!/bin/bash
# Check if the client comes from parent class and how it's set
rg -n 'class OpenAIResponsesModel' --type py -A 30

# Check how client is passed to LlamaStackResponsesModel instantiation
sed -n '140,160p' src/utils/pydantic_ai.py

Repository: lightspeed-core/lightspeed-stack

Length of output: 650


🏁 Script executed:

#!/bin/bash
# Check AsyncLlamaStackClientHolder pattern
fd -e py -x grep -l 'AsyncLlamaStackClientHolder' {} \;

# Look at its implementation
rg -n 'class AsyncLlamaStackClientHolder' -A 20 --type py

Repository: lightspeed-core/lightspeed-stack

Length of output: 3572


🏁 Script executed:

#!/bin/bash
# Find OpenAIResponsesModel and see how it gets the client
rg -n 'class OpenAIResponsesModel' -A 50 --type py | head -100

# Check if self.client is set in __init__
rg -n 'self\.client\s*=' src/pydantic_ai_lightspeed/llamastack/_model.py | head -10

Repository: lightspeed-core/lightspeed-stack

Length of output: 58


🏁 Script executed:

#!/bin/bash
# Check pydantic-ai or parent package for OpenAIResponsesModel
fd -e py -x grep -l 'class OpenAIResponsesModel' {} \;

Repository: lightspeed-core/lightspeed-stack

Length of output: 58


🏁 Script executed:

#!/bin/bash
# Look at full _model.py to understand client initialization
wc -l src/pydantic_ai_lightspeed/llamastack/_model.py
head -150 src/pydantic_ai_lightspeed/llamastack/_model.py

Repository: lightspeed-core/lightspeed-stack

Length of output: 5774


🏁 Script executed:

#!/bin/bash
# Check if multiple LlamaStackResponsesModel instances could exist concurrently
# Look at how build_agent is called and if models persist across requests
rg -n 'build_agent|LlamaStackResponsesModel' src/app/endpoints/*.py | head -40

# Check if there are any uses that keep model instances alive across requests
rg -n 'self\.model|model =' src/app/endpoints/*.py -A 2 | head -60

Repository: lightspeed-core/lightspeed-stack

Length of output: 1862


🏁 Script executed:

#!/bin/bash
# Verify that pydantic-ai's OpenAIResponsesModel gets client from environment or passes it
# Check the documentation or imports to understand client handling
rg -n 'from pydantic_ai' src/pydantic_ai_lightspeed/llamastack/_model.py

# Check if there's any client parameter passed to OpenAIResponsesModel
rg -n 'OpenAIResponsesModel.*client' --type py

Repository: lightspeed-core/lightspeed-stack

Length of output: 188


🏁 Script executed:

#!/bin/bash
# Most critically: check if the patch is idempotent or if overlapping patches cause issues
# Look at what happens when _patch_responses_create is called multiple times
sed -n '246,277p' src/pydantic_ai_lightspeed/llamastack/_model.py

Repository: lightspeed-core/lightspeed-stack

Length of output: 1306


🏁 Script executed:

#!/bin/bash
# Check if requests are processed concurrently or serially
# Look at how endpoints handle incoming requests
rg -n 'async def.*responses|async def.*query' src/app/endpoints/responses.py | head -10

# Check if models are kept alive across requests or created fresh per request
rg -n 'build_agent' src/app/endpoints/responses.py -B5 -A5 | head -40

Repository: lightspeed-core/lightspeed-stack

Length of output: 118


🏁 Script executed:

#!/bin/bash
# Key question: when _patch_responses_create is called multiple times,
# does it create a chain of wrappers, or is it truly idempotent?
# The patch captures 'original_create' - what does it reference on second call?

python3 << 'PY'
# Simulate what happens with multiple patches
class API:
    def create(self):
        return "SDK"

class Model:
    def __init__(self, name, api):
        self.name = name
        self.api = api
        self._patch()
    
    def _patch(self):
        original = self.api.create
        
        def wrapper():
            print(f"{self.name} wrapper calling {original.__name__ if hasattr(original, '__name__') else 'previous wrapper'}")
            return original()
        
        self.api.create = wrapper

api = API()
m1 = Model("Model1", api)
print(f"After m1: {api.create()}")

m2 = Model("Model2", api)
print(f"After m2: {api.create()}")
PY

Repository: lightspeed-core/lightspeed-stack

Length of output: 195


🏁 Script executed:

#!/bin/bash
# The critical issue: check if the closure captures self.compaction correctly
# If two models patch concurrently, they both close over their own self.compaction
# but the second patch overwrites the first, so requests use the wrong compaction

# Look for any synchronization or request isolation mechanisms
rg -n 'asyncio.Lock|threading.Lock|asyncio.Semaphore' src/ --type py
rg -n 'context|Context' src/utils/pydantic_ai.py | head -20

Repository: lightspeed-core/lightspeed-stack

Length of output: 618


Client mutation is a critical issue with concurrent requests.

The _patch_responses_create method mutates the shared singleton AsyncLlamaStackClient.responses.create at model construction time. Since responses_endpoint_handler is async and can process concurrent requests, multiple LlamaStackResponsesModel instances can be created concurrently, each calling _patch_responses_create and overwriting the previous patch.

When Request A and Request B execute concurrently:

  1. Model A patches client.responses.create with a closure capturing A.compaction
  2. Model B immediately patches the same method, overwriting with a closure capturing B.compaction
  3. When Model A processes its response, it now executes Model B's patch and uses B's compaction context (wrong conversation_id, wrong input)

This race condition corrupts conversation state when multiple agents execute in parallel. Recommend either:

  • Creating a per-request client instead of using the singleton
  • Adding per-instance method wrapping that doesn't mutate shared state
  • Synchronizing patch updates with locks
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/pydantic_ai_lightspeed/llamastack/_model.py` around lines 246 - 277, The
_patch_responses_create method mutates the shared singleton
client.responses.create method, causing a race condition where concurrent
LlamaStackResponsesModel instances overwrite each other's patches and execute
the wrong compaction context. Instead of directly assigning to
responses_api.create, implement per-instance method wrapping that preserves the
original create method behavior while maintaining instance-specific state
without mutating the shared client, or create a per-request client instance to
avoid sharing mutable state between concurrent requests.

@asimurka asimurka marked this pull request as draft June 22, 2026 08:42
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant