Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion doc/api/stream_iter.md
Original file line number Diff line number Diff line change
Expand Up @@ -1675,7 +1675,7 @@ Creates a classic [`stream.Writable`][] backed by a stream/iter Writer.

Each `_write()` / `_writev()` call attempts the Writer's synchronous method
first (`writeSync` / `writevSync`), falling back to the async method if the
sync path returns `false` or throws. Similarly, `_final()` tries `endSync()`
sync path returns `false`. Similarly, `_final()` tries `endSync()`
before `end()`. When the sync path succeeds, the callback is deferred via
`queueMicrotask` to preserve the async resolution contract.

Expand Down
67 changes: 16 additions & 51 deletions lib/internal/streams/iter/classic.js
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@ const {
const {
toAsyncStreamable: kToAsyncStreamable,
kValidatedSource,
kSyncWriteAccepted,
drainableProtocol,
} = require('internal/streams/iter/types');

Expand Down Expand Up @@ -765,41 +764,11 @@ function toWritable(writer) {
const hasEndSync = hasEnd &&
typeof writer.endSync === 'function';
const hasFail = typeof writer.fail === 'function';
const hasSyncWriteAccepted =
typeof writer[kSyncWriteAccepted] === 'function';

function syncWriteAccepted() {
return hasSyncWriteAccepted && writer[kSyncWriteAccepted]();
}

function finishAfterSyncBackpressure(cb) {
let ondrain;
try {
if (typeof writer[drainableProtocol] === 'function') {
ondrain = writer[drainableProtocol]();
}
} catch (err) {
cb(err);
return;
}
if (ondrain !== null && ondrain !== undefined) {
PromisePrototypeThen(ondrain, (drained) => {
if (drained === false) {
cb(new ERR_INVALID_STATE.TypeError('Stream closed by consumer'));
return;
}
cb();
}, cb);
return;
}
queueMicrotask(cb);
}

// Try-sync-first pattern: attempt the synchronous method and fall back to the
// async method if it returns false without accepting the data, or if it
// throws. When the sync path succeeds, the callback is deferred via
// queueMicrotask to preserve the async resolution contract that Writable
// internals expect from _write/_writev/_final callbacks.
// async method if it returns false (data not accepted synchronously).
// When the sync path succeeds, the callback is deferred via queueMicrotask
// to preserve the async resolution contract that Writable internals expect
// from _write/_writev/_final callbacks.

function _write(chunk, encoding, cb) {
const bytes = typeof chunk === 'string' ?
Expand All @@ -810,13 +779,10 @@ function toWritable(writer) {
queueMicrotask(cb);
return;
}
if (syncWriteAccepted()) {
// The chunk was accepted; false only signaled backpressure.
finishAfterSyncBackpressure(cb);
return;
}
} catch {
// Sync path threw -- fall through to async.
// WriteSync returned false: not accepted, fall through to async.
} catch (err) {
Comment thread
jasnell marked this conversation as resolved.
cb(err);
return;
}
}
try {
Expand All @@ -839,13 +805,10 @@ function toWritable(writer) {
queueMicrotask(cb);
return;
}
if (syncWriteAccepted()) {
// The chunks were accepted; false only signaled backpressure.
finishAfterSyncBackpressure(cb);
return;
}
} catch {
// Sync path threw -- fall through to async.
// WritevSync returned false: not accepted, fall through to async.
} catch (err) {
cb(err);
return;
}
}
try {
Expand All @@ -867,8 +830,10 @@ function toWritable(writer) {
queueMicrotask(cb);
return;
}
} catch {
// Sync path threw -- fall through to async.
// Result < 0: can't end synchronously, fall through to async.
} catch (err) {
cb(err);
return;
}
}
try {
Expand Down
45 changes: 35 additions & 10 deletions lib/internal/streams/iter/consumers.js
Original file line number Diff line number Diff line change
Expand Up @@ -480,6 +480,7 @@ function merge(...args) {
);
}

let primaryError;
try {
while (activeCount > 0 || ready.length > 0) {
signal?.throwIfAborted();
Expand All @@ -500,22 +501,46 @@ function merge(...args) {
});
}
}
} catch (err) {
primaryError = err;
} finally {
// Clean up: return all iterators
await SafePromiseAllReturnVoid(iterators, async (iterator) => {
if (iterator.return) {
try {
await iterator.return();
} catch {
// Ignore return errors
}
}
});
// Clean up: return all iterators. Cleanup errors are not
// swallowed - a broken iterator.return() (e.g., failing to
// release a resource) should be visible to the caller.
await cleanupIterators(iterators, primaryError);
}
},
};
}

async function cleanupIterators(iterators, primaryError) {
let cleanupError;
await SafePromiseAllReturnVoid(iterators, async (iterator) => {
if (iterator.return) {
try {
await iterator.return();
} catch (err) {
// Keep the first cleanup error encountered.
cleanupError ??= err;
}
}
});
if (cleanupError !== undefined) {
if (primaryError !== undefined) {
// Both a primary error and a cleanup error occurred.
// Wrap in SuppressedError so neither is lost:
// .error = primaryError, .suppressed = cleanupError.
// eslint-disable-next-line no-restricted-syntax
throw new SuppressedError(primaryError, cleanupError);
}
// No primary error - the cleanup error is the only error.
throw cleanupError;
}
if (primaryError !== undefined) {
throw primaryError;
}
}

module.exports = {
array,
arrayBuffer,
Expand Down
56 changes: 5 additions & 51 deletions lib/internal/streams/iter/pull.js
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,6 @@ const {
} = require('internal/streams/iter/utils');

const {
drainableProtocol,
kSyncWriteAcceptedOnFalse,
kValidatedSource,
kValidatedTransform,
toAsyncStreamable,
Expand Down Expand Up @@ -863,18 +861,6 @@ async function* createAsyncPipeline(source, transforms, signal) {
}
}

/**
* Check if a false sync write result means accepted backpressure.
* @param {object} writer - The writer whose sync method returned.
* @param {*} result - The return value from writeSync() or writevSync().
* @returns {boolean}
*/
function isAcceptedSyncWriteBackpressure(writer, result) {
return result === false &&
writer[kSyncWriteAcceptedOnFalse] === true &&
writer.desiredSize === 0;
}

// =============================================================================
// Public API: pull() and pullSync()
// =============================================================================
Expand Down Expand Up @@ -968,9 +954,7 @@ function pipeToSync(source, ...args) {
break;
}
if (hasWritevSync && batch.length > 1) {
const result = writer.writevSync(batch);
if (result === false &&
!isAcceptedSyncWriteBackpressure(writer, result)) {
if (writer.writevSync(batch) === false) {
break;
}
for (let i = 0; i < batch.length; i++) {
Expand All @@ -979,9 +963,7 @@ function pipeToSync(source, ...args) {
} else {
for (let i = 0; i < batch.length; i++) {
const chunk = batch[i];
const result = writer.writeSync(chunk);
if (result === false &&
!isAcceptedSyncWriteBackpressure(writer, result)) {
if (writer.writeSync(chunk) === false) {
canContinue = false;
break;
}
Expand Down Expand Up @@ -1037,28 +1019,13 @@ async function pipeTo(source, ...args) {
const hasWritevSync = typeof writer.writevSync === 'function';
const hasEndSync = typeof writer.endSync === 'function';

function waitForSyncBackpressure() {
const ondrain = writer[drainableProtocol];
return ondrain?.call(writer);
}

async function writeBatchAfterAcceptedBackpressure(batch, startIndex) {
await waitForSyncBackpressure();
await writeBatchAsyncFallback(batch, startIndex);
}

// Async fallback for writeBatch when sync write fails partway through.
// Continues writing from batch[startIndex] using async write().
async function writeBatchAsyncFallback(batch, startIndex) {
for (let i = startIndex; i < batch.length; i++) {
const chunk = batch[i];
const result = hasWriteSync && writer.writeSync(chunk);
if (result) {
if (hasWriteSync && writer.writeSync(chunk)) {
// Sync retry succeeded
} else if (isAcceptedSyncWriteBackpressure(writer, result)) {
totalBytes += TypedArrayPrototypeGetByteLength(chunk);
await waitForSyncBackpressure();
continue;
} else {
const result = writer.write(
chunk, signal ? { __proto__: null, signal } : undefined);
Expand All @@ -1075,14 +1042,7 @@ async function pipeTo(source, ...args) {
// is required. Callers must check: const p = writeBatch(b); if (p) await p;
function writeBatch(batch) {
if (hasWritev && batch.length > 1) {
const result = hasWritevSync && writer.writevSync(batch);
if (!result) {
if (isAcceptedSyncWriteBackpressure(writer, result)) {
for (let i = 0; i < batch.length; i++) {
totalBytes += TypedArrayPrototypeGetByteLength(batch[i]);
}
return waitForSyncBackpressure();
}
if (!hasWritevSync || !writer.writevSync(batch)) {
const opts = signal ? { __proto__: null, signal } : undefined;
const writevResult = writer.writev(batch, opts);
if (writevResult === undefined) {
Expand All @@ -1104,14 +1064,8 @@ async function pipeTo(source, ...args) {
}
for (let i = 0; i < batch.length; i++) {
const chunk = batch[i];
const result = hasWriteSync && writer.writeSync(chunk);
if (!result) {
if (isAcceptedSyncWriteBackpressure(writer, result)) {
totalBytes += TypedArrayPrototypeGetByteLength(chunk);
return writeBatchAfterAcceptedBackpressure(batch, i + 1);
}
if (!hasWriteSync || !writer.writeSync(chunk)) {
// Sync path failed at index i - fall back to async for the rest.
// Count bytes for chunks already written synchronously (0..i-1).
return writeBatchAsyncFallback(batch, i);
}
totalBytes += TypedArrayPrototypeGetByteLength(chunk);
Expand Down
48 changes: 2 additions & 46 deletions lib/internal/streams/iter/push.js
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,6 @@ const {

const {
drainableProtocol,
kSyncWriteAccepted,
kSyncWriteAcceptedOnFalse,
} = require('internal/streams/iter/types');

const {
Expand Down Expand Up @@ -369,19 +367,6 @@ class PushQueue {
this.#pendingEnd = pending;
}

/**
* Force-enqueue chunks into the slots buffer, bypassing capacity checks.
* Used by PushWriter.writeSync() for 'block' policy where the data is
* accepted but false is returned as a backpressure signal.
*/
forceEnqueue(chunks) {
this.#slots.push(chunks);
for (let i = 0; i < chunks.length; i++) {
this.#bytesWritten += TypedArrayPrototypeGetByteLength(chunks[i]);
}
this.#resolvePendingReads();
}

/**
* Wait for backpressure to clear (desiredSize > 0).
* @returns {Promise<void>}
Expand Down Expand Up @@ -563,16 +548,11 @@ class PushQueue {

class PushWriter {
#queue;
#syncWriteAccepted = false;

constructor(queue) {
this.#queue = queue;
}

[kSyncWriteAccepted]() {
return this.#syncWriteAccepted;
}

[drainableProtocol]() {
const desired = this.desiredSize;
if (desired === null) return null;
Expand All @@ -584,10 +564,6 @@ class PushWriter {
return this.#queue.desiredSize;
}

get [kSyncWriteAcceptedOnFalse]() {
return this.#queue.backpressurePolicy === 'block';
}

write(chunk, options) {
if (!options?.signal && this.#queue.canWriteSync()) {
const bytes = toUint8Array(chunk);
Expand All @@ -612,36 +588,16 @@ class PushWriter {
}

writeSync(chunk) {
this.#syncWriteAccepted = false;
const bytes = toUint8Array(chunk);
const result = this.#queue.writeSync([bytes]);
if (!result && this.#queue.backpressurePolicy === 'block' &&
this.#queue.desiredSize === 0) {
// Block policy: force-enqueue and return false as backpressure signal.
// Data IS accepted; false tells caller to slow down.
this.#queue.forceEnqueue([bytes]);
this.#syncWriteAccepted = true;
return false;
}
this.#syncWriteAccepted = result;
return result;
return this.#queue.writeSync([bytes]);
Comment thread
jasnell marked this conversation as resolved.
}

writevSync(chunks) {
this.#syncWriteAccepted = false;
if (!ArrayIsArray(chunks)) {
throw new ERR_INVALID_ARG_TYPE('chunks', 'Array', chunks);
}
const bytes = convertChunks(chunks);
const result = this.#queue.writeSync(bytes);
if (!result && this.#queue.backpressurePolicy === 'block' &&
this.#queue.desiredSize === 0) {
this.#queue.forceEnqueue(bytes);
this.#syncWriteAccepted = true;
return false;
}
this.#syncWriteAccepted = result;
return result;
return this.#queue.writeSync(bytes);
}

end(options) {
Expand Down
Loading
Loading