Skip to content

Commit f99048e

Browse files
authored
fix: resolve batched request resolver defects hanging consumer fibers (#6139)
Signed-off-by: Marc MacLeod <marbemac+gh@gmail.com>
1 parent d4ed885 commit f99048e

File tree

3 files changed

+80
-13
lines changed

3 files changed

+80
-13
lines changed
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
---
2+
"effect": patch
3+
---
4+
5+
Fix batched request resolver defects causing consumer fibers to hang forever.
6+
7+
When a `RequestResolver.makeBatched` resolver died with a defect, the request `Deferred`s were never completed because the cleanup logic in `invokeWithInterrupt` used `flatMap` (which only runs on success). Changed to `ensuring` so uncompleted request entries are always resolved regardless of exit type.

packages/effect/src/internal/fiberRuntime.ts

Lines changed: 12 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -3722,7 +3722,7 @@ export const invokeWithInterrupt: <A, E, R>(
37223722
onInterrupt?: () => void
37233723
) =>
37243724
core.fiberIdWith((id) =>
3725-
core.flatMap(
3725+
ensuring(
37263726
core.flatMap(
37273727
forkDaemon(core.interruptible(self)),
37283728
(processing) =>
@@ -3770,19 +3770,18 @@ export const invokeWithInterrupt: <A, E, R>(
37703770
})
37713771
})
37723772
),
3773-
() =>
3774-
core.suspend(() => {
3775-
const residual = entries.flatMap((entry) => {
3776-
if (!entry.state.completed) {
3777-
return [entry]
3778-
}
3779-
return []
3780-
})
3781-
return core.forEachSequentialDiscard(
3782-
residual,
3783-
(entry) => complete(entry.request as any, core.exitInterrupt(id))
3784-
)
3773+
core.suspend(() => {
3774+
const residual = entries.flatMap((entry) => {
3775+
if (!entry.state.completed) {
3776+
return [entry]
3777+
}
3778+
return []
37853779
})
3780+
return core.forEachSequentialDiscard(
3781+
residual,
3782+
(entry) => complete(entry.request as any, core.exitInterrupt(id))
3783+
)
3784+
})
37863785
)
37873786
)
37883787

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
import { describe, it } from "@effect/vitest"
2+
import { assertTrue } from "@effect/vitest/utils"
3+
import * as Effect from "effect/Effect"
4+
import * as Exit from "effect/Exit"
5+
import * as Fiber from "effect/Fiber"
6+
import * as Request from "effect/Request"
7+
import * as RequestResolver from "effect/RequestResolver"
8+
9+
class GetValue extends Request.TaggedClass("GetValue")<string, never, { readonly id: number }> {}
10+
11+
describe("batched resolver defect", () => {
12+
// When a batched resolver dies with a defect, the request Deferreds are
13+
// never completed and consumers hang forever. The cleanup that completes
14+
// uncompleted entries only runs on success (flatMap/OP_ON_SUCCESS) but
15+
// should run on all exits.
16+
it.live("resolver defect should not hang consumers", () =>
17+
Effect.gen(function*() {
18+
const resolver = RequestResolver.makeBatched((_requests: Array<GetValue>) => Effect.die("boom"))
19+
20+
const fiber = yield* Effect.request(new GetValue({ id: 1 }), resolver).pipe(
21+
Effect.fork
22+
)
23+
24+
// Wait briefly then check if the fiber completed.
25+
// If the bug is present, the fiber hangs on deferredAwait forever.
26+
yield* Effect.sleep("500 millis")
27+
const poll = yield* Fiber.poll(fiber)
28+
29+
assertTrue(
30+
poll._tag === "Some",
31+
"Fiber should have completed — resolver defect must not leave consumers hanging"
32+
)
33+
34+
if (poll._tag === "Some") {
35+
assertTrue(Exit.isFailure(poll.value))
36+
}
37+
}))
38+
39+
it.live("resolver defect should not hang multiple consumers", () =>
40+
Effect.gen(function*() {
41+
const resolver = RequestResolver.makeBatched((_requests: Array<GetValue>) => Effect.die("boom"))
42+
43+
const fiber = yield* Effect.forEach(
44+
[1, 2, 3],
45+
(id) => Effect.request(new GetValue({ id }), resolver),
46+
{ batching: true, concurrency: "unbounded" }
47+
).pipe(Effect.fork)
48+
49+
yield* Effect.sleep("500 millis")
50+
const poll = yield* Fiber.poll(fiber)
51+
52+
assertTrue(
53+
poll._tag === "Some",
54+
"Fiber should have completed — resolver defect must not leave consumers hanging"
55+
)
56+
57+
if (poll._tag === "Some") {
58+
assertTrue(Exit.isFailure(poll.value))
59+
}
60+
}))
61+
})

0 commit comments

Comments
 (0)