fix(waterdata): gate the chunked fan-out with a semaphore, not the connection pool#322
Draft
thodson-usgs wants to merge 1 commit into
Draft
fix(waterdata): gate the chunked fan-out with a semaphore, not the connection pool#322thodson-usgs wants to merge 1 commit into
thodson-usgs wants to merge 1 commit into
Conversation
442b4b3 to
2cf2e08
Compare
thodson-usgs
commented
Jun 12, 2026
thodson-usgs
left a comment
Collaborator
Author
There was a problem hiding this comment.
Please make these changes.
Collaborator
Author
|
@ehinman, this PR won't make sense until you're caught up on recent history. While you were away, I swapped |
ChunkedCall._run dispatched every pending sub-request into one asyncio.gather and relied on the shared httpx.AsyncClient connection pool as the only concurrency throttle (max_connections sized from API_USGS_CONCURRENT). That collides with the client's pool-acquire timeout (60 s, from HTTPX_DEFAULTS): a sub-request that can't get a connection waits in httpx's pool queue, and that wait is bounded by the pool-acquire timeout. So whenever every pooled connection stays busy past that window with none freeing — a batch of large, slowly-streaming pages is enough — the still-queued tail of the fan-out times out with httpx.PoolTimeout. Being a TransportError it burns the per-sub-request retry budget and ultimately surfaces as a bogus *resumable* ServiceInterrupted, telling the user to wait for an upstream that never saw the request. Gate each fetch attempt with an asyncio.Semaphore sized from API_USGS_CONCURRENT instead; the connection pool is now merely sized to match so in-flight sub-requests reuse keepalive connections. Parked sub-requests wait on the semaphore before they touch the pool, so no transport clock runs while they wait and the pool timeout reverts to its protective role (a genuinely wedged checkout). The slot is acquired per attempt, so a sub-request sleeping off a retry backoff doesn't hold one. "unbounded" degenerates to a semaphore sized at the plan total, so there is a single gated code path. Observable behavior is otherwise unchanged: same plan, same sub-request order, same resume semantics. Tests: - in-flight high-water-mark probe (parametrized capped/unbounded) — the fetch-level concurrency equals the cap, not the plan total; the capped case fails on the pre-fix code. - real-localhost-server end-to-end test — mock transports bypass the pool, so this drives the chunker's shared client against a slow server past a scaled-down pool timeout; reproduces the spurious resumable ServiceInterrupted on the pre-fix code and completes on this branch. Also raise the default API_USGS_CONCURRENT from 16 to 32 and correct the concurrency rationale: N caps how many of a chunked query's sub-requests are in flight at once (a client-side connection/latency knob), but does not affect the API rate limit -- a chunked call issues the same number of sub-requests regardless of N. Live testing showed the API serves 300 simultaneous requests without 5xx; heavy use is rate-limited by request volume (HTTP 429), mitigated by an API_USGS_PAT token. Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
1e70998 to
32f6918
Compare
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Problem
ChunkedCall._rundispatches every pending sub-request into oneasyncio.gatherand relies on the sharedhttpx.AsyncClient's connection pool as the only concurrency throttle (max_connections = API_USGS_CONCURRENT). That collides with the client's pool-acquire timeout (60 s, fromHTTPX_DEFAULTS):httpx.PoolTimeout.TransportError, thatPoolTimeoutburns the per-sub-request retry budget and ultimately surfaces as a bogus resumableServiceInterrupted, telling the user to wait for an upstream that never saw the request.Fix
Gate each fetch attempt with an
asyncio.Semaphoresized fromAPI_USGS_CONCURRENT; the connection pool is now merely sized to match so in-flight sub-requests reuse keepalive connections. Parked sub-requests wait on the semaphore before they touch the pool — no transport clock runs while they wait, so the pool-acquire timeout reverts to its protective role (a genuinely wedged checkout). The slot is acquired per attempt (inside the retry driver), so a sub-request sleeping off a retry backoff doesn't hold one.unboundeddegenerates to a semaphore sized at the plan total, so there is a single gated code path.Observable behavior is otherwise unchanged: same plan, same sub-request order, same resume semantics.
Why not just disable the pool timeout?
Setting
pool=Noneon the client would suppress the spurious failure but lose the stuck-checkout protection and leave dispatch breadth-first in one FIFO. The semaphore removes the failure mode, keeps the timeout meaningful, and bounds in-flight work explicitly.Tests
test_fan_out_in_flight_high_water_mark_is_the_cap(parametrized capped/unbounded) — the fetch-level concurrency equals the cap, not the plan total; the capped case fails on the pre-fix code.test_fan_out_outlives_pool_timeout_on_real_transport— mock transports bypass the pool, so this drives the chunker's shared client against a slow localhost server past a scaled-down pool timeout; it reproduces the spurious resumableServiceInterruptedon the pre-fix code and completes on this branch.Verified end-to-end against the live USGS API: with the pre-fix code 6/8 runs interrupted (in-flight peak 8, pool the only throttle); with the fix 0/8 (in-flight peak 2, semaphore gating). Full suite green,
ruffandmypy --strict dataretrieval/clean.🤖 Generated with Claude Code