LCORE-: Wired compaction turn persistence into agentic query flows #1953
asimurka wants to merge 1 commit into
Walkthrough
The PR removes
Changes
Compaction Turn Persistence Refactor
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~60 minutes
🚥 Pre-merge checks | ✅ 5
✅ Passed checks (5 passed)
Actionable comments posted: 1
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@src/pydantic_ai_lightspeed/llamastack/_model.py`:
- Around line 246-277: The _patch_responses_create method mutates the shared
singleton client.responses.create method, causing a race condition where
concurrent LlamaStackResponsesModel instances overwrite each other's patches and
execute the wrong compaction context. Instead of directly assigning to
responses_api.create, implement per-instance method wrapping that preserves the
original create method behavior while maintaining instance-specific state
without mutating the shared client, or create a per-request client instance to
avoid sharing mutable state between concurrent requests.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: ASSERTIVE
Plan: Pro
Run ID: 3a16c48b-ee7d-4f57-a12e-b55fc68e3576
📒 Files selected for processing (10)
- src/app/endpoints/query.py
- src/app/endpoints/streaming_query.py
- src/models/common/turn_summary.py
- src/pydantic_ai_lightspeed/llamastack/__init__.py
- src/pydantic_ai_lightspeed/llamastack/_model.py
- src/utils/agents/query.py
- src/utils/agents/streaming.py
- src/utils/pydantic_ai.py
- tests/unit/app/endpoints/test_streaming_query.py
- tests/unit/utils/agents/test_streaming.py
💤 Files with no reviewable changes (2)
- tests/unit/app/endpoints/test_streaming_query.py
- src/models/common/turn_summary.py
📜 Review details
⏰ Context from checks skipped due to timeout. (2)
- GitHub Check: Konflux kflux-prd-rh02 / lightspeed-stack-0-6-on-pull-request
- GitHub Check: Konflux kflux-prd-rh02 / lightspeed-stack-on-pull-request
🧰 Additional context used
📓 Path-based instructions (4)
src/**/*.py
📄 CodeRabbit inference engine (AGENTS.md)
src/**/*.py: Use absolute imports for internal modules: `from authentication import get_auth_dependency`
Llama Stack imports: Use `from llama_stack_client import AsyncLlamaStackClient`
Check `constants.py` for shared constants before defining new ones
All modules must start with descriptive docstrings explaining purpose
Use `logger = get_logger(__name__)` from `log.py` for module logging
All functions must have complete type annotations for parameters and return types, use modern syntax (`str | int`), and include descriptive docstrings
Use snake_case with descriptive, action-oriented names for functions (`get_`, `validate_`, `check_`)
Avoid in-place parameter modification anti-patterns; return new data structures instead of modifying function parameters
Use `async def` for I/O operations and external API calls
Use standard log levels with clear purposes: `debug()` for diagnostic info, `info()` for program execution, `warning()` for unexpected events, `error()` for serious problems
All classes must have descriptive docstrings explaining purpose and use PascalCase with standard suffixes: `Configuration`, `Error`/`Exception`, `Resolver`, `Interface`
Abstract classes must use ABC with `@abstractmethod` decorators
Follow Google Python docstring conventions with required sections: Parameters, Returns, Raises, and Attributes for classes
Files:
- src/pydantic_ai_lightspeed/llamastack/__init__.py
- src/app/endpoints/query.py
- src/utils/pydantic_ai.py
- src/utils/agents/query.py
- src/utils/agents/streaming.py
- src/app/endpoints/streaming_query.py
- src/pydantic_ai_lightspeed/llamastack/_model.py
src/**/__init__.py
📄 CodeRabbit inference engine (AGENTS.md)
Package `__init__.py` files must contain brief package descriptions
Files:
src/pydantic_ai_lightspeed/llamastack/__init__.py
src/app/**/*.py
📄 CodeRabbit inference engine (AGENTS.md)
src/app/**/*.py: FastAPI dependencies: Import from `fastapi` module for `APIRouter`, `HTTPException`, `Request`, `status`, `Depends`
Use FastAPI `HTTPException` with appropriate status codes for API endpoints and handle `APIConnectionError` from Llama Stack
Files:
- src/app/endpoints/query.py
- src/app/endpoints/streaming_query.py
tests/**/*.py
📄 CodeRabbit inference engine (AGENTS.md)
tests/**/*.py: Use pytest for all unit and integration tests; do not use unittest
Use `pytest.mark.asyncio` marker for async tests
Files:
tests/unit/utils/agents/test_streaming.py
🧠 Learnings (2)
📚 Learning: 2026-01-14T09:37:51.612Z
Learnt from: asimurka
Repo: lightspeed-core/lightspeed-stack PR: 988
File: src/app/endpoints/query.py:319-339
Timestamp: 2026-01-14T09:37:51.612Z
Learning: In the lightspeed-stack repository, when provider_id == "azure", the Azure provider with provider_type "remote::azure" is guaranteed to be present in the providers list. Therefore, avoid defensive StopIteration handling for next() when locating the Azure provider in providers within src/app/endpoints/query.py. This change applies specifically to this file (or nearby provider lookup code) and relies on the invariant that the Azure provider exists; if the invariant could be violated, keep the existing StopIteration handling.
Applied to files:
src/app/endpoints/query.py
📚 Learning: 2026-04-06T20:18:07.852Z
Learnt from: major
Repo: lightspeed-core/lightspeed-stack PR: 1463
File: src/app/endpoints/rlsapi_v1.py:266-271
Timestamp: 2026-04-06T20:18:07.852Z
Learning: In the lightspeed-stack codebase, within `src/app/endpoints/` inference/MCP endpoints, treat `tools: Optional[list[Any]]` in MCP tool definitions as an intentional, consistent typing pattern (used across `query`, `responses`, `streaming_query`, `rlsapi_v1`). Do not raise or suggest this as a typing issue during code review; changing it in isolation could break endpoint typing consistency across the codebase.
Applied to files:
- src/app/endpoints/query.py
- src/app/endpoints/streaming_query.py
🔇 Additional comments (18)
src/app/endpoints/streaming_query.py (5)
337-342: LGTM!
350-361: LGTM!
507-512: LGTM!
872-895: LGTM!
701-706: This code path is unreachable dead code; the compacted branch in `generate_response` is never executed.
The `streaming_query_endpoint_handler` routes all requests through either `generate_response_with_compaction` (for compacted paths) or `generate_agent_response` (standard paths). The deprecated `generate_response` function is never called from the handler, and all test calls use the default `compacted=False`. Lines 701–706 are therefore unreachable. Either remove this dead code or mark it explicitly as such with a comment.
> Likely an incorrect or invalid review comment.
src/pydantic_ai_lightspeed/llamastack/_model.py (5)
1-26: LGTM!
28-47: LGTM!
49-90: LGTM!
91-138: LGTM!
171-190: Document the assumption about MCP argument delta format and consider adding validation.
Line 175 unconditionally appends `"}"` to combined MCP argument deltas. This assumes Llama Stack's MCP event stream always delivers arguments without the closing brace, but this assumption lacks documentation and validation.
If the buffered deltas already include the closing brace (or if the format changes), this produces malformed JSON. Add a comment explaining why the closing brace is required based on Llama Stack's MCP streaming behavior, and consider a defensive check: `combined_args.rstrip() + "}"` or verify the last buffered delta doesn't already contain `}`.
src/pydantic_ai_lightspeed/llamastack/__init__.py (1)
1-14: LGTM!
src/utils/pydantic_ai.py (2)
13-17: LGTM! Also applies to: 116-121
142-157: LGTM!
src/utils/agents/query.py (1)
280-333: LGTM!
src/utils/agents/streaming.py (2)
83-138: LGTM!
141-266: LGTM!
src/app/endpoints/query.py (1)
231-237: LGTM!
tests/unit/utils/agents/test_streaming.py (1)
530-566: LGTM!
+responses_api = self.client.responses
+original_create = responses_api.create

+async def create(*args: Any, **kwargs: Any) -> Any:
+    if (
+        self.compaction is not None
+        and "input" in kwargs
+        and self.compaction.original_input_persisted
+    ):
+        self.compaction.latest_round_input = cast(
+            ResponseInput, kwargs["input"]
+        )

+    result = await original_create(*args, **kwargs)

+    if kwargs.get("stream"):
+        return _NormalizedLlamaStackStream(
+            cast(AsyncStream[responses.ResponseStreamEvent], result),
+            self.compaction,
+        )

-if not isinstance(first_chunk, responses.ResponseCreatedEvent):
-    raise UnexpectedModelBehavior(
-        f"Expected ResponseCreatedEvent, got {type(first_chunk).__name__}"
+    if self.compaction is not None:
+        await append_turn_items_to_conversation(
+            self.compaction.client,
+            self.compaction.conversation_id,
+            self.compaction.latest_round_input,
+            cast(Sequence[Any], result.output),
+        )
+        self.compaction.original_input_persisted = True
+    return result

-yield OpenAIResponsesStreamedResponse(
-    model_request_parameters=model_request_parameters,
-    _model_name=first_chunk.response.model,
-    _model_settings=model_settings_cast,
-    _response=peekable,  # type: ignore[arg-type]
-    _provider_name=self._provider.name,
-    _provider_url=self._provider.base_url,
-    _provider_timestamp=(
-        number_to_datetime(first_chunk.response.created_at)
-        if first_chunk.response.created_at
-        else None
-    ),
-)
+responses_api.create = create  # type: ignore[method-assign]
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
# Check if multiple LlamaStackResponsesModel instances could share a client
rg -n 'LlamaStackResponsesModel\s*\(' --type py -B5 -A5 | head -80
# Check how clients are created/shared
rg -n 'AsyncLlamaStackClientHolder|get_client' --type py -C2 | head -60
Repository: lightspeed-core/lightspeed-stack
Length of output: 5717
🏁 Script executed:
#!/bin/bash
# Check how the client is obtained in LlamaStackResponsesModel __init__
sed -n '193,230p' src/pydantic_ai_lightspeed/llamastack/_model.py
# Check what parameters are passed to __init__
rg -n 'class LlamaStackResponsesModel' -A 35 --type py
Repository: lightspeed-core/lightspeed-stack
Length of output: 4763
🏁 Script executed:
#!/bin/bash
# Check if the client comes from parent class and how it's set
rg -n 'class OpenAIResponsesModel' --type py -A 30
# Check how client is passed to LlamaStackResponsesModel instantiation
sed -n '140,160p' src/utils/pydantic_ai.py
Repository: lightspeed-core/lightspeed-stack
Length of output: 650
🏁 Script executed:
#!/bin/bash
# Check AsyncLlamaStackClientHolder pattern
fd -e py -x grep -l 'AsyncLlamaStackClientHolder' {} \;
# Look at its implementation
rg -n 'class AsyncLlamaStackClientHolder' -A 20 --type py
Repository: lightspeed-core/lightspeed-stack
Length of output: 3572
🏁 Script executed:
#!/bin/bash
# Find OpenAIResponsesModel and see how it gets the client
rg -n 'class OpenAIResponsesModel' -A 50 --type py | head -100
# Check if self.client is set in __init__
rg -n 'self\.client\s*=' src/pydantic_ai_lightspeed/llamastack/_model.py | head -10
Repository: lightspeed-core/lightspeed-stack
Length of output: 58
🏁 Script executed:
#!/bin/bash
# Check pydantic-ai or parent package for OpenAIResponsesModel
fd -e py -x grep -l 'class OpenAIResponsesModel' {} \;
Repository: lightspeed-core/lightspeed-stack
Length of output: 58
🏁 Script executed:
#!/bin/bash
# Look at full _model.py to understand client initialization
wc -l src/pydantic_ai_lightspeed/llamastack/_model.py
head -150 src/pydantic_ai_lightspeed/llamastack/_model.py
Repository: lightspeed-core/lightspeed-stack
Length of output: 5774
🏁 Script executed:
#!/bin/bash
# Check if multiple LlamaStackResponsesModel instances could exist concurrently
# Look at how build_agent is called and if models persist across requests
rg -n 'build_agent|LlamaStackResponsesModel' src/app/endpoints/*.py | head -40
# Check if there are any uses that keep model instances alive across requests
rg -n 'self\.model|model =' src/app/endpoints/*.py -A 2 | head -60
Repository: lightspeed-core/lightspeed-stack
Length of output: 1862
🏁 Script executed:
#!/bin/bash
# Verify that pydantic-ai's OpenAIResponsesModel gets client from environment or passes it
# Check the documentation or imports to understand client handling
rg -n 'from pydantic_ai' src/pydantic_ai_lightspeed/llamastack/_model.py
# Check if there's any client parameter passed to OpenAIResponsesModel
rg -n 'OpenAIResponsesModel.*client' --type py
Repository: lightspeed-core/lightspeed-stack
Length of output: 188
🏁 Script executed:
#!/bin/bash
# Most critically: check if the patch is idempotent or if overlapping patches cause issues
# Look at what happens when _patch_responses_create is called multiple times
sed -n '246,277p' src/pydantic_ai_lightspeed/llamastack/_model.py
Repository: lightspeed-core/lightspeed-stack
Length of output: 1306
🏁 Script executed:
#!/bin/bash
# Check if requests are processed concurrently or serially
# Look at how endpoints handle incoming requests
rg -n 'async def.*responses|async def.*query' src/app/endpoints/responses.py | head -10
# Check if models are kept alive across requests or created fresh per request
rg -n 'build_agent' src/app/endpoints/responses.py -B5 -A5 | head -40
Repository: lightspeed-core/lightspeed-stack
Length of output: 118
🏁 Script executed:
#!/bin/bash
# Key question: when _patch_responses_create is called multiple times,
# does it create a chain of wrappers, or is it truly idempotent?
# The patch captures 'original_create' - what does it reference on second call?
python3 << 'PY'
# Simulate what happens with multiple patches
class API:
    def create(self):
        return "SDK"

class Model:
    def __init__(self, name, api):
        self.name = name
        self.api = api
        self._patch()

    def _patch(self):
        original = self.api.create

        def wrapper():
            print(f"{self.name} wrapper calling {original.__name__ if hasattr(original, '__name__') else 'previous wrapper'}")
            return original()

        self.api.create = wrapper

api = API()
m1 = Model("Model1", api)
print(f"After m1: {api.create()}")
m2 = Model("Model2", api)
print(f"After m2: {api.create()}")
PY
Repository: lightspeed-core/lightspeed-stack
Length of output: 195
🏁 Script executed:
#!/bin/bash
# The critical issue: check if the closure captures self.compaction correctly
# If two models patch concurrently, they both close over their own self.compaction
# but the second patch overwrites the first, so requests use the wrong compaction
# Look for any synchronization or request isolation mechanisms
rg -n 'asyncio.Lock|threading.Lock|asyncio.Semaphore' src/ --type py
rg -n 'context|Context' src/utils/pydantic_ai.py | head -20
Repository: lightspeed-core/lightspeed-stack
Length of output: 618
Client mutation is a critical issue with concurrent requests.
The _patch_responses_create method mutates the shared singleton AsyncLlamaStackClient.responses.create at model construction time. Since responses_endpoint_handler is async and can process concurrent requests, multiple LlamaStackResponsesModel instances can be created concurrently, each calling _patch_responses_create and overwriting the previous patch.
When Request A and Request B execute concurrently:
- Model A patches `client.responses.create` with a closure capturing `A.compaction`
- Model B immediately patches the same method, overwriting with a closure capturing `B.compaction`
- When Model A processes its response, it now executes Model B's patch and uses B's `compaction` context (wrong conversation_id, wrong input)
This race condition corrupts conversation state when multiple agents execute in parallel. Recommend either:
- Creating a per-request client instead of using the singleton
- Adding per-instance method wrapping that doesn't mutate shared state
- Synchronizing patch updates with locks
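The second option (per-instance wrapping that leaves the shared client untouched) could look roughly like the sketch below. Everything in it is a stand-in: `FakeResponsesAPI`, `CompactionState`, and `PerInstanceResponses` are hypothetical names, not classes from this PR, `llama_stack_client`, or pydantic-ai, and how such a wrapper would be handed to `LlamaStackResponsesModel` is left open.

```python
import asyncio
from dataclasses import dataclass, field
from typing import Any


class FakeResponsesAPI:
    """Stand-in for the shared client's `responses` resource (hypothetical)."""

    async def create(self, **kwargs: Any) -> dict[str, Any]:
        await asyncio.sleep(0)  # yield control so concurrent calls interleave
        return {"output": [f"echo:{kwargs['input']}"]}


@dataclass
class CompactionState:
    """Per-request state; the field names are illustrative only."""

    conversation_id: str
    persisted: list[tuple[str, Any]] = field(default_factory=list)


class PerInstanceResponses:
    """Wraps the shared API without assigning to any of its attributes."""

    def __init__(self, shared: FakeResponsesAPI, state: CompactionState) -> None:
        self._shared = shared
        self._state = state

    async def create(self, **kwargs: Any) -> dict[str, Any]:
        result = await self._shared.create(**kwargs)
        # Record the turn against this instance's own conversation only.
        self._state.persisted.append(
            (self._state.conversation_id, result["output"])
        )
        return result


async def run_two_requests() -> tuple[CompactionState, CompactionState]:
    shared = FakeResponsesAPI()  # one shared object, never mutated
    state_a = CompactionState("conv-A")
    state_b = CompactionState("conv-B")
    api_a = PerInstanceResponses(shared, state_a)
    api_b = PerInstanceResponses(shared, state_b)
    await asyncio.gather(api_a.create(input="a"), api_b.create(input="b"))
    return state_a, state_b


state_a, state_b = asyncio.run(run_two_requests())
```

Because each wrapper holds its own state and only reads from the shared object, two concurrent requests cannot observe each other's compaction context, and no lock is needed.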
Description
Wires explicit turn compaction for compacted turns in query agentic flows.
Type of change
Tools used to create PR
Identify any AI code assistants used in this PR (for transparency and review context)
Related Tickets & Documents
Checklist before requesting a review
Testing
Summary by CodeRabbit
Refactor
Tests