Skip to content

Commit fea1da4

Browse files
lpcox and Copilot committed
review: enforce single HALF-OPEN probe, injectable clock, preserve backend errors
Address all 5 existing review comments plus 2 additional findings:

1. HALF-OPEN probe tracking: add probeInFlight flag so only one probe passes in HALF-OPEN state; concurrent requests get ErrCircuitOpen. Add TestCircuitBreaker_HalfOpenBlocksConcurrentProbes.
2. Nil map guard: getCircuitBreaker initializes circuitBreakers map when nil, preventing panic in test constructors that bypass NewUnified.
3. Preserve backend error text: rate-limit detection now returns the original upstream error message via extractRateLimitErrorText instead of wrapping in ErrCircuitOpen. ErrCircuitOpen is only returned when cb.Allow() rejects the call before contacting the backend.
4. TOML-only doc comments: clarify that rate_limit_threshold and rate_limit_cooldown are not wired for stdin JSON config.
5. Injectable clock: replace time.Now() in circuit breaker with nowFunc field (default time.Now). Tests use deterministic fake time instead of flaky time.Sleep-based assertions.
6. Operator precedence: add explicit parentheses in isRateLimitText for the compound rate limit + 403 condition.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
1 parent 4b0ba32 commit fea1da4

4 files changed

Lines changed: 140 additions & 24 deletions

File tree

internal/config/config_core.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -219,11 +219,13 @@ type ServerConfig struct {
219219
// RateLimitThreshold is the number of consecutive rate-limit errors from this backend
220220
// that will trip the circuit breaker (transition CLOSED → OPEN). When OPEN, requests
221221
// are immediately rejected until the cooldown period elapses. Default: 3.
222+
// Supported in file-based config (TOML/JSON); stdin JSON config does not currently accept this field.
222223
RateLimitThreshold int `toml:"rate_limit_threshold" json:"rate_limit_threshold,omitempty"`
223224

224225
// RateLimitCooldown is the number of seconds the circuit breaker stays OPEN before
225226
// allowing a single probe request (transition OPEN → HALF-OPEN). If the probe
226227
// succeeds the circuit closes; if rate-limited again it re-opens. Default: 60.
228+
// Supported in file-based config (TOML/JSON); stdin JSON config does not currently accept this field.
227229
RateLimitCooldown int `toml:"rate_limit_cooldown" json:"rate_limit_cooldown,omitempty"`
228230
}
229231

internal/server/circuit_breaker.go

Lines changed: 43 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -61,11 +61,16 @@ type circuitBreaker struct {
6161
openedAt time.Time
6262
// resetAt is the time when the upstream rate limit resets, parsed from
6363
// the X-RateLimit-Reset header or the tool response message.
64-
resetAt time.Time
65-
serverID string
64+
resetAt time.Time
65+
probeInFlight bool
66+
serverID string
6667

6768
threshold int
6869
cooldown time.Duration
70+
71+
// nowFunc returns the current time. Defaults to time.Now; overridden in tests
72+
// to avoid flaky time.Sleep-based assertions.
73+
nowFunc func() time.Time
6974
}
7075

7176
// newCircuitBreaker creates a circuit breaker for the given server ID.
@@ -83,6 +88,7 @@ func newCircuitBreaker(serverID string, threshold int, cooldown time.Duration) *
8388
state: circuitClosed,
8489
threshold: threshold,
8590
cooldown: cooldown,
91+
nowFunc: time.Now,
8692
}
8793
}
8894

@@ -114,7 +120,7 @@ func (cb *circuitBreaker) Allow() error {
114120
case circuitOpen:
115121
// Check whether we should transition to HALF-OPEN.
116122
// We use the upstream reset time when available, otherwise the cooldown.
117-
now := time.Now()
123+
now := cb.nowFunc()
118124
var openUntil time.Time
119125
if !cb.resetAt.IsZero() && cb.resetAt.After(cb.openedAt) {
120126
openUntil = cb.resetAt
@@ -125,12 +131,18 @@ func (cb *circuitBreaker) Allow() error {
125131
logCircuitBreaker.Printf("server %q circuit breaker OPEN → HALF-OPEN after cooldown", cb.serverID)
126132
logger.LogInfo("backend", "circuit breaker for server %q transitioning OPEN → HALF-OPEN", cb.serverID)
127133
cb.state = circuitHalfOpen
128-
return nil // allow the probe
134+
cb.probeInFlight = true
135+
return nil // allow the single probe
129136
}
130137
return &ErrCircuitOpen{ServerID: cb.serverID, ResetAt: cb.resetAt}
131138

132139
case circuitHalfOpen:
133-
// One probe is allowed through; further requests are blocked.
140+
// Only one probe is allowed; further requests are blocked until the probe resolves.
141+
if cb.probeInFlight {
142+
return &ErrCircuitOpen{ServerID: cb.serverID, ResetAt: cb.resetAt}
143+
}
144+
// This shouldn't normally happen (probe resolved but state wasn't updated),
145+
// but allow through defensively.
134146
return nil
135147
}
136148

@@ -145,6 +157,7 @@ func (cb *circuitBreaker) RecordSuccess() {
145157

146158
prev := cb.state
147159
cb.consecutiveErrors = 0
160+
cb.probeInFlight = false
148161
if cb.state == circuitHalfOpen {
149162
cb.state = circuitClosed
150163
cb.resetAt = time.Time{}
@@ -163,6 +176,7 @@ func (cb *circuitBreaker) RecordRateLimit(resetAt time.Time) {
163176
defer cb.mu.Unlock()
164177

165178
cb.consecutiveErrors++
179+
cb.probeInFlight = false
166180
if !resetAt.IsZero() {
167181
cb.resetAt = resetAt
168182
}
@@ -171,7 +185,7 @@ func (cb *circuitBreaker) RecordRateLimit(resetAt time.Time) {
171185
case circuitClosed:
172186
if cb.consecutiveErrors >= cb.threshold {
173187
cb.state = circuitOpen
174-
cb.openedAt = time.Now()
188+
cb.openedAt = cb.nowFunc()
175189
logger.LogError("backend",
176190
"circuit breaker for server %q OPENED after %d consecutive rate-limit errors; resets at %s",
177191
cb.serverID, cb.consecutiveErrors, formatResetAt(cb.resetAt))
@@ -185,7 +199,7 @@ func (cb *circuitBreaker) RecordRateLimit(resetAt time.Time) {
185199
case circuitHalfOpen:
186200
// Probe failed — re-open the circuit.
187201
cb.state = circuitOpen
188-
cb.openedAt = time.Now()
202+
cb.openedAt = cb.nowFunc()
189203
logger.LogError("backend",
190204
"circuit breaker for server %q re-OPENED after probe was rate-limited; resets at %s",
191205
cb.serverID, formatResetAt(cb.resetAt))
@@ -213,6 +227,27 @@ func formatResetAt(t time.Time) string {
213227
return fmt.Sprintf("%s (in %s)", t.UTC().Format(time.RFC3339), time.Until(t).Round(time.Second))
214228
}
215229

230+
// extractRateLimitErrorText extracts the text content from a raw tool result
231+
// that has been identified as a rate-limit error. Returns the original backend
232+
// message so agents see the actual upstream error rather than a synthetic one.
233+
func extractRateLimitErrorText(result interface{}) string {
234+
m, ok := result.(map[string]interface{})
235+
if !ok {
236+
return "rate limit exceeded"
237+
}
238+
contents, _ := m["content"].([]interface{})
239+
for _, c := range contents {
240+
cm, ok := c.(map[string]interface{})
241+
if !ok {
242+
continue
243+
}
244+
if text, ok := cm["text"].(string); ok && text != "" {
245+
return text
246+
}
247+
}
248+
return "rate limit exceeded"
249+
}
250+
216251
// isRateLimitToolResult reports whether a raw tool call result indicates
217252
// a rate-limit error from the GitHub MCP server. It inspects the `isError`
218253
// flag and the text content for well-known rate-limit phrases.
@@ -251,7 +286,7 @@ func isRateLimitToolResult(result interface{}) (bool, time.Time) {
251286
func isRateLimitText(text string) bool {
252287
lower := strings.ToLower(text)
253288
return strings.Contains(lower, "rate limit exceeded") ||
254-
strings.Contains(lower, "rate limit") && strings.Contains(lower, "403") ||
289+
(strings.Contains(lower, "rate limit") && strings.Contains(lower, "403")) ||
255290
strings.Contains(lower, "api rate limit") ||
256291
strings.Contains(lower, "secondary rate limit") ||
257292
strings.Contains(lower, "too many requests")

internal/server/circuit_breaker_test.go

Lines changed: 84 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -61,15 +61,19 @@ func TestCircuitBreaker_SuccessResetsCounter(t *testing.T) {
6161
// TestCircuitBreaker_HalfOpenAfterCooldown verifies OPEN → HALF-OPEN transition.
6262
func TestCircuitBreaker_HalfOpenAfterCooldown(t *testing.T) {
6363
t.Parallel()
64-
// Use a very short cooldown so the test doesn't sleep long.
65-
cb := newCircuitBreaker("test", 1, 10*time.Millisecond)
64+
fakeNow := time.Date(2026, 1, 1, 0, 0, 0, 0, time.UTC)
65+
cb := newCircuitBreaker("test", 1, time.Minute)
66+
cb.nowFunc = func() time.Time { return fakeNow }
6667

6768
cb.RecordRateLimit(time.Time{})
6869
require.Equal(t, circuitOpen, cb.State(), "should be OPEN after 1 error")
6970

70-
// Wait for cooldown.
71-
time.Sleep(20 * time.Millisecond)
71+
// Before cooldown: still OPEN.
72+
fakeNow = fakeNow.Add(30 * time.Second)
73+
require.Error(t, cb.Allow(), "should reject before cooldown elapses")
7274

75+
// After cooldown: transitions to HALF-OPEN.
76+
fakeNow = fakeNow.Add(31 * time.Second)
7377
err := cb.Allow()
7478
assert.NoError(t, err, "should allow probe after cooldown")
7579
assert.Equal(t, circuitHalfOpen, cb.State(), "should be HALF-OPEN after cooldown")
@@ -78,12 +82,14 @@ func TestCircuitBreaker_HalfOpenAfterCooldown(t *testing.T) {
7882
// TestCircuitBreaker_HalfOpenClosesOnSuccess verifies HALF-OPEN → CLOSED on probe success.
7983
func TestCircuitBreaker_HalfOpenClosesOnSuccess(t *testing.T) {
8084
t.Parallel()
81-
cb := newCircuitBreaker("test", 1, 10*time.Millisecond)
85+
fakeNow := time.Date(2026, 1, 1, 0, 0, 0, 0, time.UTC)
86+
cb := newCircuitBreaker("test", 1, time.Minute)
87+
cb.nowFunc = func() time.Time { return fakeNow }
8288

8389
cb.RecordRateLimit(time.Time{})
8490
require.Equal(t, circuitOpen, cb.State())
8591

86-
time.Sleep(20 * time.Millisecond)
92+
fakeNow = fakeNow.Add(2 * time.Minute)
8793
require.NoError(t, cb.Allow()) // probe allowed
8894

8995
cb.RecordSuccess()
@@ -94,12 +100,14 @@ func TestCircuitBreaker_HalfOpenClosesOnSuccess(t *testing.T) {
94100
// TestCircuitBreaker_HalfOpenReOpensOnRateLimit verifies HALF-OPEN → OPEN on probe failure.
95101
func TestCircuitBreaker_HalfOpenReOpensOnRateLimit(t *testing.T) {
96102
t.Parallel()
97-
cb := newCircuitBreaker("test", 1, 10*time.Millisecond)
103+
fakeNow := time.Date(2026, 1, 1, 0, 0, 0, 0, time.UTC)
104+
cb := newCircuitBreaker("test", 1, time.Minute)
105+
cb.nowFunc = func() time.Time { return fakeNow }
98106

99107
cb.RecordRateLimit(time.Time{})
100108
require.Equal(t, circuitOpen, cb.State())
101109

102-
time.Sleep(20 * time.Millisecond)
110+
fakeNow = fakeNow.Add(2 * time.Minute)
103111
require.NoError(t, cb.Allow()) // probe allowed
104112

105113
cb.RecordRateLimit(time.Time{})
@@ -114,21 +122,54 @@ func TestCircuitBreaker_HalfOpenReOpensOnRateLimit(t *testing.T) {
114122
// TestCircuitBreaker_ResetAtFromHeader verifies the reset time from upstream is used.
115123
func TestCircuitBreaker_ResetAtFromHeader(t *testing.T) {
116124
t.Parallel()
117-
cb := newCircuitBreaker("test", 1, 60*time.Second)
118-
future := time.Now().Add(5 * time.Millisecond)
119-
cb.RecordRateLimit(future)
125+
fakeNow := time.Date(2026, 1, 1, 0, 0, 0, 0, time.UTC)
126+
cb := newCircuitBreaker("test", 1, time.Hour)
127+
cb.nowFunc = func() time.Time { return fakeNow }
128+
129+
resetAt := fakeNow.Add(30 * time.Second)
130+
cb.RecordRateLimit(resetAt)
120131
require.Equal(t, circuitOpen, cb.State())
121132

122133
// Before the reset time: still OPEN.
134+
fakeNow = fakeNow.Add(15 * time.Second)
123135
require.Error(t, cb.Allow())
124136

125-
// After the reset time: transitions to HALF-OPEN.
126-
time.Sleep(10 * time.Millisecond)
137+
// After the reset time: transitions to HALF-OPEN (before cooldown would elapse).
138+
fakeNow = fakeNow.Add(20 * time.Second)
127139
err := cb.Allow()
128140
assert.NoError(t, err, "should allow probe after reset time")
129141
assert.Equal(t, circuitHalfOpen, cb.State())
130142
}
131143

144+
// TestCircuitBreaker_HalfOpenBlocksConcurrentProbes verifies that only one probe is allowed in HALF-OPEN.
145+
func TestCircuitBreaker_HalfOpenBlocksConcurrentProbes(t *testing.T) {
146+
t.Parallel()
147+
fakeNow := time.Date(2026, 1, 1, 0, 0, 0, 0, time.UTC)
148+
cb := newCircuitBreaker("test", 1, time.Minute)
149+
cb.nowFunc = func() time.Time { return fakeNow }
150+
151+
cb.RecordRateLimit(time.Time{})
152+
require.Equal(t, circuitOpen, cb.State())
153+
154+
// Advance past cooldown to trigger HALF-OPEN.
155+
fakeNow = fakeNow.Add(2 * time.Minute)
156+
157+
// First Allow() should succeed (the probe).
158+
require.NoError(t, cb.Allow())
159+
assert.Equal(t, circuitHalfOpen, cb.State())
160+
161+
// Second Allow() should be rejected — probe is already in flight.
162+
err := cb.Allow()
163+
require.Error(t, err, "concurrent requests in HALF-OPEN should be rejected")
164+
var openErr *ErrCircuitOpen
165+
require.ErrorAs(t, err, &openErr)
166+
167+
// After the probe succeeds, requests should be allowed again.
168+
cb.RecordSuccess()
169+
assert.Equal(t, circuitClosed, cb.State())
170+
assert.NoError(t, cb.Allow())
171+
}
172+
132173
// TestCircuitBreaker_DefaultsApplied verifies zero-value config gets sensible defaults.
133174
func TestCircuitBreaker_DefaultsApplied(t *testing.T) {
134175
t.Parallel()
@@ -362,3 +403,33 @@ func TestParseRateLimitResetHeader(t *testing.T) {
362403
})
363404
}
364405
}
406+
407+
// TestExtractRateLimitErrorText verifies extraction of error text from backend results.
408+
func TestExtractRateLimitErrorText(t *testing.T) {
409+
t.Parallel()
410+
411+
t.Run("extracts text from standard rate-limit result", func(t *testing.T) {
412+
t.Parallel()
413+
result := map[string]interface{}{
414+
"isError": true,
415+
"content": []interface{}{
416+
map[string]interface{}{
417+
"type": "text",
418+
"text": "failed to search: 403 API rate limit exceeded [rate reset in 42s]",
419+
},
420+
},
421+
}
422+
assert.Equal(t, "failed to search: 403 API rate limit exceeded [rate reset in 42s]", extractRateLimitErrorText(result))
423+
})
424+
425+
t.Run("returns fallback for nil result", func(t *testing.T) {
426+
t.Parallel()
427+
assert.Equal(t, "rate limit exceeded", extractRateLimitErrorText(nil))
428+
})
429+
430+
t.Run("returns fallback for empty content", func(t *testing.T) {
431+
t.Parallel()
432+
result := map[string]interface{}{"isError": true, "content": []interface{}{}}
433+
assert.Equal(t, "rate limit exceeded", extractRateLimitErrorText(result))
434+
})
435+
}

internal/server/unified.go

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -386,6 +386,9 @@ func buildCircuitBreakers(cfg *config.Config) map[string]*circuitBreaker {
386386
// getCircuitBreaker returns the circuit breaker for serverID, creating one with
387387
// defaults if none exists (e.g., when called from tests that bypass NewUnified).
388388
func (us *UnifiedServer) getCircuitBreaker(serverID string) *circuitBreaker {
389+
if us.circuitBreakers == nil {
390+
us.circuitBreakers = make(map[string]*circuitBreaker)
391+
}
389392
if cb, ok := us.circuitBreakers[serverID]; ok {
390393
return cb
391394
}
@@ -588,9 +591,14 @@ func (us *UnifiedServer) callBackendTool(ctx context.Context, serverID, toolName
588591
cb.RecordRateLimit(resetAt)
589592
execSpan.SetAttributes(attribute.Bool("rate_limit.hit", true))
590593
httpStatusCode = 429
591-
// Return the original error message so the agent can see it.
592-
return newErrorCallToolResult(fmt.Errorf("backend server %q rate-limited: %w",
593-
serverID, &ErrCircuitOpen{ServerID: serverID, ResetAt: resetAt}))
594+
// Preserve the original backend error text so the agent sees the actual upstream
595+
// rate-limit details. ErrCircuitOpen is only returned when cb.Allow() rejects
596+
// the call before contacting the backend.
597+
errText := extractRateLimitErrorText(backendResult)
598+
return &sdk.CallToolResult{
599+
Content: []sdk.Content{&sdk.TextContent{Text: errText}},
600+
IsError: true,
601+
}, backendResult, nil
594602
}
595603
cb.RecordSuccess()
596604

0 commit comments

Comments
 (0)