Skip to content

Commit 8798a84

Browse files
authored
fix(effect): isolate scheduler runners per fiber (#6124)
1 parent 7103e24 commit 8798a84

9 files changed

Lines changed: 208 additions & 108 deletions

File tree

.changeset/giant-horses-clean.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
"effect": patch
3+
---
4+
5+
Fix scheduler task draining to isolate `AsyncLocalStorage` across fibers.

packages/cluster/test/Sharding.test.ts

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -96,10 +96,14 @@ describe.concurrent("Sharding", () => {
9696
yield* TestClock.adjust(1)
9797
const config = yield* ShardingConfig.ShardingConfig
9898
;(config as any).runnerAddress = Option.some(RunnerAddress.make("localhost", 1234))
99-
fiber.currentScheduler.scheduleTask(() => {
100-
fiber.unsafeInterruptAsFork(FiberId.none)
101-
Effect.runFork(testClock.adjust(30000))
102-
}, 0)
99+
fiber.currentScheduler.scheduleTask(
100+
() => {
101+
fiber.unsafeInterruptAsFork(FiberId.none)
102+
Effect.runFork(testClock.adjust(30000))
103+
},
104+
0,
105+
fiber
106+
)
103107
}).pipe(
104108
Effect.provide(TestShardingWithoutRunners.pipe(
105109
Layer.provide(Layer.scoped(

packages/effect/src/Scheduler.ts

Lines changed: 84 additions & 75 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,69 @@ export type Task = () => void
2121
*/
2222
export interface Scheduler {
2323
shouldYield(fiber: RuntimeFiber<unknown, unknown>): number | false
24-
scheduleTask(task: Task, priority: number): void
24+
scheduleTask(task: Task, priority: number, fiber?: RuntimeFiber<unknown, unknown>): void
25+
}
26+
27+
/**
28+
* @since 3.20.0
29+
* @category models
30+
*/
31+
export class SchedulerRunner {
32+
running = false
33+
tasks = new PriorityBuckets()
34+
35+
constructor(
36+
readonly scheduleDrain: (depth: number, drain: (depth: number) => void) => void
37+
) {}
38+
39+
private starveInternal = (depth: number) => {
40+
const tasks = this.tasks.buckets
41+
this.tasks.buckets = []
42+
for (const [_, toRun] of tasks) {
43+
for (let i = 0; i < toRun.length; i++) {
44+
toRun[i]()
45+
}
46+
}
47+
if (this.tasks.buckets.length === 0) {
48+
this.running = false
49+
} else {
50+
this.starve(depth)
51+
}
52+
}
53+
54+
private starve(depth = 0) {
55+
this.scheduleDrain(depth, this.starveInternal)
56+
}
57+
58+
scheduleTask(task: Task, priority: number) {
59+
this.tasks.scheduleTask(task, priority)
60+
if (!this.running) {
61+
this.running = true
62+
this.starve()
63+
}
64+
}
65+
/**
66+
* @since 3.20.0
67+
* @category constructors
68+
*/
69+
static cached(
70+
scheduleDrain: (depth: number, drain: (depth: number) => void) => void
71+
) {
72+
const fallback = new SchedulerRunner(scheduleDrain)
73+
const runners = new WeakMap<RuntimeFiber<unknown, unknown>, SchedulerRunner>()
74+
75+
return (fiber?: RuntimeFiber<unknown, unknown>) => {
76+
if (fiber === undefined) {
77+
return fallback
78+
}
79+
let runner = runners.get(fiber)
80+
if (runner === undefined) {
81+
runner = new SchedulerRunner(scheduleDrain)
82+
runners.set(fiber, runner)
83+
}
84+
return runner
85+
}
86+
}
2587
}
2688

2789
/**
@@ -62,14 +124,13 @@ export class PriorityBuckets<in out T = Task> {
62124
* @category constructors
63125
*/
64126
export class MixedScheduler implements Scheduler {
65-
/**
66-
* @since 2.0.0
67-
*/
68-
running = false
69-
/**
70-
* @since 2.0.0
71-
*/
72-
tasks = new PriorityBuckets()
127+
private readonly getRunner = SchedulerRunner.cached((depth, drain) => {
128+
if (depth >= this.maxNextTickBeforeTimer) {
129+
setTimeout(() => drain(0), 0)
130+
} else {
131+
Promise.resolve(void 0).then(() => drain(depth + 1))
132+
}
133+
})
73134

74135
constructor(
75136
/**
@@ -78,35 +139,6 @@ export class MixedScheduler implements Scheduler {
78139
readonly maxNextTickBeforeTimer: number
79140
) {}
80141

81-
/**
82-
* @since 2.0.0
83-
*/
84-
private starveInternal(depth: number) {
85-
const tasks = this.tasks.buckets
86-
this.tasks.buckets = []
87-
for (const [_, toRun] of tasks) {
88-
for (let i = 0; i < toRun.length; i++) {
89-
toRun[i]()
90-
}
91-
}
92-
if (this.tasks.buckets.length === 0) {
93-
this.running = false
94-
} else {
95-
this.starve(depth)
96-
}
97-
}
98-
99-
/**
100-
* @since 2.0.0
101-
*/
102-
private starve(depth = 0) {
103-
if (depth >= this.maxNextTickBeforeTimer) {
104-
setTimeout(() => this.starveInternal(0), 0)
105-
} else {
106-
Promise.resolve(void 0).then(() => this.starveInternal(depth + 1))
107-
}
108-
}
109-
110142
/**
111143
* @since 2.0.0
112144
*/
@@ -119,12 +151,8 @@ export class MixedScheduler implements Scheduler {
119151
/**
120152
* @since 2.0.0
121153
*/
122-
scheduleTask(task: Task, priority: number) {
123-
this.tasks.scheduleTask(task, priority)
124-
if (!this.running) {
125-
this.running = true
126-
this.starve()
127-
}
154+
scheduleTask(task: Task, priority: number, fiber?: RuntimeFiber<unknown, unknown>) {
155+
this.getRunner(fiber).scheduleTask(task, priority)
128156
}
129157
}
130158

@@ -155,9 +183,9 @@ export class SyncScheduler implements Scheduler {
155183
/**
156184
* @since 2.0.0
157185
*/
158-
scheduleTask(task: Task, priority: number) {
186+
scheduleTask(task: Task, priority: number, fiber?: RuntimeFiber<unknown, unknown>) {
159187
if (this.deferred) {
160-
defaultScheduler.scheduleTask(task, priority)
188+
defaultScheduler.scheduleTask(task, priority, fiber)
161189
} else {
162190
this.tasks.scheduleTask(task, priority)
163191
}
@@ -207,9 +235,9 @@ export class ControlledScheduler implements Scheduler {
207235
/**
208236
* @since 2.0.0
209237
*/
210-
scheduleTask(task: Task, priority: number) {
238+
scheduleTask(task: Task, priority: number, fiber?: RuntimeFiber<unknown, unknown>) {
211239
if (this.deferred) {
212-
defaultScheduler.scheduleTask(task, priority)
240+
defaultScheduler.scheduleTask(task, priority, fiber)
213241
} else {
214242
this.tasks.scheduleTask(task, priority)
215243
}
@@ -254,16 +282,16 @@ export const makeMatrix = (...record: Array<[number, Scheduler]>): Scheduler =>
254282
}
255283
return false
256284
},
257-
scheduleTask(task, priority) {
285+
scheduleTask(task, priority, fiber) {
258286
let scheduler: Scheduler | undefined = undefined
259287
for (const i of index) {
260288
if (priority >= i[0]) {
261289
scheduler = i[1]
262290
} else {
263-
return (scheduler ?? defaultScheduler).scheduleTask(task, priority)
291+
return (scheduler ?? defaultScheduler).scheduleTask(task, priority, fiber)
264292
}
265293
}
266-
return (scheduler ?? defaultScheduler).scheduleTask(task, priority)
294+
return (scheduler ?? defaultScheduler).scheduleTask(task, priority, fiber)
267295
}
268296
}
269297
}
@@ -298,31 +326,12 @@ export const makeBatched = (
298326
callback: (runBatch: () => void) => void,
299327
shouldYield: Scheduler["shouldYield"] = defaultShouldYield
300328
) => {
301-
let running = false
302-
const tasks = new PriorityBuckets()
303-
const starveInternal = () => {
304-
const tasksToRun = tasks.buckets
305-
tasks.buckets = []
306-
for (const [_, toRun] of tasksToRun) {
307-
for (let i = 0; i < toRun.length; i++) {
308-
toRun[i]()
309-
}
310-
}
311-
if (tasks.buckets.length === 0) {
312-
running = false
313-
} else {
314-
starve()
315-
}
316-
}
317-
318-
const starve = () => callback(starveInternal)
329+
const getRunner = SchedulerRunner.cached((_, drain) => {
330+
callback(() => drain(0))
331+
})
319332

320-
return make((task, priority) => {
321-
tasks.scheduleTask(task, priority)
322-
if (!running) {
323-
running = true
324-
starve()
325-
}
333+
return make((task, priority, fiber) => {
334+
getRunner(fiber).scheduleTask(task, priority)
326335
}, shouldYield)
327336
}
328337

packages/effect/src/internal/effect/circular.ts

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -70,14 +70,18 @@ class Semaphore {
7070
updateTakenUnsafe(fiber: Fiber.RuntimeFiber<any, any>, f: (n: number) => number): Effect.Effect<number> {
7171
this.taken = f(this.taken)
7272
if (this.waiters.size > 0) {
73-
fiber.getFiberRef(currentScheduler).scheduleTask(() => {
74-
const iter = this.waiters.values()
75-
let item = iter.next()
76-
while (item.done === false && this.free > 0) {
77-
item.value()
78-
item = iter.next()
79-
}
80-
}, fiber.getFiberRef(core.currentSchedulingPriority))
73+
fiber.getFiberRef(currentScheduler).scheduleTask(
74+
() => {
75+
const iter = this.waiters.values()
76+
let item = iter.next()
77+
while (item.done === false && this.free > 0) {
78+
item.value()
79+
item = iter.next()
80+
}
81+
},
82+
fiber.getFiberRef(core.currentSchedulingPriority),
83+
fiber
84+
)
8185
}
8286
return core.succeed(this.free)
8387
}
@@ -143,7 +147,7 @@ class Latch extends Effectable.Class<void> implements Effect.Latch {
143147
return core.void
144148
}
145149
this.scheduled = true
146-
fiber.currentScheduler.scheduleTask(this.flushWaiters, fiber.getFiberRef(core.currentSchedulingPriority))
150+
fiber.currentScheduler.scheduleTask(this.flushWaiters, fiber.getFiberRef(core.currentSchedulingPriority), fiber)
147151
return core.void
148152
}
149153
private flushWaiters = () => {

packages/effect/src/internal/fiberRuntime.ts

Lines changed: 26 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -708,7 +708,8 @@ export class FiberRuntime<in out A, in out E = never> extends Effectable.Class<A
708708
drainQueueLaterOnExecutor() {
709709
this.currentScheduler.scheduleTask(
710710
this.run,
711-
this.getFiberRef(core.currentSchedulingPriority)
711+
this.getFiberRef(core.currentSchedulingPriority),
712+
this
712713
)
713714
}
714715

@@ -2209,9 +2210,13 @@ export const forEachConcurrentDiscard = <A, X, E, R>(
22092210
const results = new Array()
22102211
const interruptAll = () =>
22112212
fibers.forEach((fiber) => {
2212-
fiber.currentScheduler.scheduleTask(() => {
2213-
fiber.unsafeInterruptAsFork(parent.id())
2214-
}, 0)
2213+
fiber.currentScheduler.scheduleTask(
2214+
() => {
2215+
fiber.unsafeInterruptAsFork(parent.id())
2216+
},
2217+
0,
2218+
fiber
2219+
)
22152220
})
22162221
const startOrder = new Array<FiberRuntime<Exit.Exit<X, E> | Effect.Blocked<X, E>>>()
22172222
const joinOrder = new Array<FiberRuntime<Exit.Exit<X, E> | Effect.Blocked<X, E>>>()
@@ -2234,12 +2239,16 @@ export const forEachConcurrentDiscard = <A, X, E, R>(
22342239
parent.currentRuntimeFlags,
22352240
fiberScope.globalScope
22362241
)
2237-
parent.currentScheduler.scheduleTask(() => {
2238-
if (interruptImmediately) {
2239-
fiber.unsafeInterruptAsFork(parent.id())
2240-
}
2241-
fiber.resume(runnable)
2242-
}, 0)
2242+
parent.currentScheduler.scheduleTask(
2243+
() => {
2244+
if (interruptImmediately) {
2245+
fiber.unsafeInterruptAsFork(parent.id())
2246+
}
2247+
fiber.resume(runnable)
2248+
},
2249+
0,
2250+
fiber
2251+
)
22432252
return fiber
22442253
}
22452254
const onInterruptSignal = () => {
@@ -2295,9 +2304,13 @@ export const forEachConcurrentDiscard = <A, X, E, R>(
22952304
startOrder.push(fiber)
22962305
fibers.add(fiber)
22972306
if (interrupted) {
2298-
fiber.currentScheduler.scheduleTask(() => {
2299-
fiber.unsafeInterruptAsFork(parent.id())
2300-
}, 0)
2307+
fiber.currentScheduler.scheduleTask(
2308+
() => {
2309+
fiber.unsafeInterruptAsFork(parent.id())
2310+
},
2311+
0,
2312+
fiber
2313+
)
23012314
}
23022315
fiber.addObserver((wrapped) => {
23032316
let exit: Exit.Exit<any, any> | core.Blocked

0 commit comments

Comments
 (0)