Node.js v18.18.2 文档


目录

网络流 API#

稳定性:1 - 实验性。

WHATWG Streams 标准的实现。

概述#

WHATWG Streams 标准(或“Web 流”)定义了用于处理流数据的 API。它类似于 Node.js Streams API,但出现较晚,并已成为跨许多 JavaScript 环境流式传输数据的“标准”API。

对象主要分为三种类型:

  • ReadableStream - 表示流数据源。
  • WritableStream - 表示流数据的目的地。
  • TransformStream - 表示转换流数据的算法。

示例ReadableStream#

此示例创建一个简单的ReadableStream,它 每秒推送一次当前的performance.now()时间戳。异步迭代用于从流中读取数据。

import {
  ReadableStream,
} from 'node:stream/web';

import {
  setInterval as every,
} from 'node:timers/promises';

import {
  performance,
} from 'node:perf_hooks';

const SECOND = 1000;

const stream = new ReadableStream({
  async start(controller) {
    for await (const _ of every(SECOND))
      controller.enqueue(performance.now());
  },
});

for await (const value of stream)
  console.log(value);const {
  ReadableStream,
} = require('node:stream/web');

const {
  setInterval: every,
} = require('node:timers/promises');

const {
  performance,
} = require('node:perf_hooks');

const SECOND = 1000;

const stream = new ReadableStream({
  async start(controller) {
    for await (const _ of every(SECOND))
      controller.enqueue(performance.now());
  },
});

(async () => {
  for await (const value of stream)
    console.log(value);
})();

应用程序编程接口#

类:ReadableStream#

new ReadableStream([underlyingSource [, strategy]])#
  • underlyingSource <对象>
    • start <Function>创建 ReadableStream时立即调用的用户定义函数。
    • pull <Function>当ReadableStream内部队列未满时重复调用的用户定义函数 。该操作可以是同步的或异步的。如果是异步的,则在满足之前返回的 Promise 之前,不会再次调用该函数。
    • cancel <Function>当取消 ReadableStream时调用的用户定义函数 。
      • reason <任意>
      • 返回:用undefined履行的 Promise 。
    • type <string>必须是'bytes'undefined
    • autoAllocateChunkSize <number>仅当type等于 'bytes'时使用。当设置为非零值时,视图缓冲区会自动分配给ReadableByteStreamController.byobRequest。如果未设置,则必须使用流的内部队列通过默认读取器ReadableStreamDefaultReader传输数据。
  • strategy <对象>
    • highWaterMark <number>应用反压之前的最大内部队列大小。
    • size <Function>用户定义的函数,用于识别每个数据块的大小。
readableStream.locked#

readableStream.locked属性默认为false ,当有事件读取器消耗流数据时,会切换为true

readableStream.cancel([reason])#
  • reason <任意>
  • 返回:取消完成后,用undefined履行 Promise 。
readableStream.getReader([options])#
import { ReadableStream } from 'node:stream/web';

const stream = new ReadableStream();

const reader = stream.getReader();

console.log(await reader.read());const { ReadableStream } = require('node:stream/web');

const stream = new ReadableStream();

const reader = stream.getReader();

reader.read().then(console.log);

导致readableStream.lockedtrue

readableStream.pipeThrough(transform[, options])#
  • transform <对象>
    • readable <ReadableStream> ReadableStream将从此 ReadableStream接收可能修改的数据推送到的ReadableStream
    • writable <WritableStream>ReadableStream的数据将写入的 WritableStream
  • options <对象>
    • preventAbort <boolean>true时,此ReadableStream中的错误 不会导致transform.writable中止。
    • preventCancel <boolean>true时,目标 transform.writable中的错误不会导致此ReadableStream被取消。
    • preventClose <boolean>true时,关闭此ReadableStream 不会导致transform.writable关闭。
    • signal <AbortSignal>允许使用<AbortController>取消数据传输。
  • 返回:<ReadableStream>来自transform.readable

将此<ReadableStream>连接到transform参数中提供的一对<ReadableStream><WritableStream>,以便将此<ReadableStream>中的数据写入到transform.writable,可能会进行转换,然后推送到transform.readable。配置管道后,将返回transform.readable

当管道操作处于事件状态时,导致readableStream.lockedtrue

import {
  ReadableStream,
  TransformStream,
} from 'node:stream/web';

const stream = new ReadableStream({
  start(controller) {
    controller.enqueue('a');
  },
});

const transform = new TransformStream({
  transform(chunk, controller) {
    controller.enqueue(chunk.toUpperCase());
  },
});

const transformedStream = stream.pipeThrough(transform);

for await (const chunk of transformedStream)
  console.log(chunk);const {
  ReadableStream,
  TransformStream,
} = require('node:stream/web');

const stream = new ReadableStream({
  start(controller) {
    controller.enqueue('a');
  },
});

const transform = new TransformStream({
  transform(chunk, controller) {
    controller.enqueue(chunk.toUpperCase());
  },
});

const transformedStream = stream.pipeThrough(transform);

(async () => {
  for await (const chunk of transformedStream)
    console.log(chunk);
})();
readableStream.pipeTo(destination[, options])#

当管道操作处于事件状态时,导致readableStream.lockedtrue

readableStream.tee()#

返回一对新的<ReadableStream>实例,此 ReadableStream的数据将被转发到该实例。每个人都会收到相同的数据。

导致readableStream.lockedtrue

readableStream.values([options])#

创建并返回一个可用于使用此 ReadableStream的数据的异步迭代器。

当异步迭代器处于事件状态时,导致readableStream.lockedtrue

import { Buffer } from 'node:buffer';

const stream = new ReadableStream(getSomeSource());

for await (const chunk of stream.values({ preventCancel: true }))
  console.log(Buffer.from(chunk).toString()); 
异步迭代#

<ReadableStream>对象支持使用 for await语法的异步迭代器协议。

import { Buffer } from 'node:buffer';

const stream = new ReadableStream(getSomeSource());

for await (const chunk of stream)
  console.log(Buffer.from(chunk).toString()); 

异步迭代器将消耗<ReadableStream>直到它终止。

默认情况下,如果异步迭代器提前退出(通过breakreturnthrow),则<ReadableStream>将被关闭。要防止自动关闭 <ReadableStream> 请使用readableStream.values() 方法获取异步迭代器并将preventCancel选项设置为 true

<ReadableStream>不得被锁定(即,它不得有现有的事件读取器)在异步迭代期间,<ReadableStream>将被锁定。

使用postMessage()转移#

可以使用<MessagePort>传输<ReadableStream>实例。

const stream = new ReadableStream(getReadableSourceSomehow());

const { port1, port2 } = new MessageChannel();

port1.onmessage = ({ data }) => {
  data.getReader().read().then((chunk) => {
    console.log(chunk);
  });
};

port2.postMessage(stream, [stream]); 

类:ReadableStreamDefaultReader#

默认情况下,调用不带参数的readableStream.getReader()将返回ReadableStreamDefaultReader的实例。默认读取器将通过流传递的数据块视为不透明值,这允许 <ReadableStream>通常可以处理任何 JavaScript 值。

new ReadableStreamDefaultReader(stream)#

创建一个锁定到给定<ReadableStream>的新<ReadableStreamDefaultReader>

readableStreamDefaultReader.cancel([reason])#
  • reason <任意>
  • 返回:用undefined履行的 Promise 。

取消<ReadableStream>并返回一个在取消基础流时实现的 Promise 。

readableStreamDefaultReader.closed#
  • 类型:<Promise>当关联的 <ReadableStream>被关闭或拒绝时,如果流发生错误或在流完成关闭之前释放了读取器的锁,则用 undefined实现。
readableStreamDefaultReader.read()#

从底层<ReadableStream>请求下一个数据块 ,并返回一个 Promise ,一旦数据可用,该 Promise 将被履行。

readableStreamDefaultReader.releaseLock()#

释放该读取器对底层<ReadableStream>的锁定。

类:ReadableStreamBYOBReader#

ReadableStreamBYOBReader是面向字节的<ReadableStream>的替代使用者( 当 ReadableStream已创建)。

BYOB是“自带缓冲区”的缩写。这种模式允许更有效地读取面向字节的数据,从而避免无关的复制。

import {
  open,
} from 'node:fs/promises';

import {
  ReadableStream,
} from 'node:stream/web';

import { Buffer } from 'node:buffer';

class Source {
  type = 'bytes';
  autoAllocateChunkSize = 1024;

  async start(controller) {
    this.file = await open(new URL(import.meta.url));
    this.controller = controller;
  }

  async pull(controller) {
    const view = controller.byobRequest?.view;
    const {
      bytesRead,
    } = await this.file.read({
      buffer: view,
      offset: view.byteOffset,
      length: view.byteLength,
    });

    if (bytesRead === 0) {
      await this.file.close();
      this.controller.close();
    }
    controller.byobRequest.respond(bytesRead);
  }
}

const stream = new ReadableStream(new Source());

async function read(stream) {
  const reader = stream.getReader({ mode: 'byob' });

  const chunks = [];
  let result;
  do {
    result = await reader.read(Buffer.alloc(100));
    if (result.value !== undefined)
      chunks.push(Buffer.from(result.value));
  } while (!result.done);

  return Buffer.concat(chunks);
}

const data = await read(stream);
console.log(Buffer.from(data).toString()); 
new ReadableStreamBYOBReader(stream)#

创建一个锁定到给定<ReadableStream>的新ReadableStreamBYOBReader

readableStreamBYOBReader.cancel([reason])#
  • reason <任意>
  • 返回:用undefined履行的 Promise 。

取消<ReadableStream>并返回一个在取消基础流时实现的 Promise 。

readableStreamBYOBReader.closed#
  • 类型:<Promise>当关联的 <ReadableStream>被关闭或拒绝时,如果流发生错误或在流完成关闭之前释放了读取器的锁,则用 undefined实现。
readableStreamBYOBReader.read(view)#

从底层<ReadableStream>请求下一个数据块 ,并返回一个 Promise ,一旦数据可用,该 Promise 将被履行。

不要将池化的<Buffer>对象实例传递给此方法。池化的Buffer对象是使用Buffer.allocUnsafe()Buffer.from()创建的,或者通常由各种node:fs模块回调返回。这些类型的Buffer使用共享的底层 <ArrayBuffer>对象,该对象包含来自所有池化Buffer实例的所有数据。当Buffer<TypedArray><DataView>传递到readableStreamBYOBReader.read()时,视图的基础ArrayBuffer分离,从而使所有可能存在的现有视图失效关于那个ArrayBuffer。这可能会给您的应用程序带来灾难性的后果。

readableStreamBYOBReader.releaseLock()#

释放该读取器对底层<ReadableStream>的锁定。

类:ReadableStreamDefaultController#

每个<ReadableStream>都有一个控制器,负责流队列的内部状态和管理。ReadableStreamDefaultController是非面向字节的ReadableStream的默认控制器实现。

readableStreamDefaultController.close()#

关闭与此控制器关联的<ReadableStream> 。

readableStreamDefaultController.desiredSize#

返回填充<ReadableStream>队列的剩余数据量。

readableStreamDefaultController.enqueue([chunk])#

将新的数据块追加到<ReadableStream>的队列中。

readableStreamDefaultController.error([error])#

发出错误信号,导致 <ReadableStream>出错并关闭。

类:ReadableByteStreamController#

每个<ReadableStream>都有一个控制器,负责流队列的内部状态和管理。ReadableByteStreamController用于面向字节的ReadableStream

readableByteStreamController.byobRequest#
readableByteStreamController.close()#

关闭与此控制器关联的<ReadableStream> 。

readableByteStreamController.desiredSize#

返回填充<ReadableStream>队列的剩余数据量。

readableByteStreamController.enqueue(chunk)#

将新的数据块追加到<ReadableStream>的队列中。

readableByteStreamController.error([error])#

发出错误信号,导致 <ReadableStream>出错并关闭。

类:ReadableStreamBYOBRequest#

在面向字节的流中使用ReadableByteStreamController以及使用 ReadableStreamBYOBReader时,readableByteStreamController.byobRequest属性提供对表示的ReadableStreamBYOBRequest实例的访问当前的读取请求。该对象用于访问已为读取请求填充而提供的ArrayBuffer / TypedArray ,并提供用于发出数据已提供信号的方法。

readableStreamBYOBRequest.respond(bytesWritten)#

表示bytesWritten个字节已写入readableStreamBYOBRequest.view

readableStreamBYOBRequest.respondWithNewView(view)#

表示请求已通过写入新的BufferTypedArrayDataView 的字节得到满足。

readableStreamBYOBRequest.view#

类:WritableStream#

WritableStream是流数据发送到的目的地。

import {
  WritableStream,
} from 'node:stream/web';

const stream = new WritableStream({
  write(chunk) {
    console.log(chunk);
  },
});

await stream.getWriter().write('Hello World'); 
new WritableStream([underlyingSink[, strategy]])#
  • underlyingSink <对象>
    • start <Function>创建 WritableStream时立即调用的用户定义函数。
    • write <Function>当一块数据写入WritableStream时调用的用户定义函数。
    • close <Function>关闭 WritableStream时调用的用户定义函数 。
      • 返回:用undefined履行的 Promise 。
    • abort <Function>调用以突然关闭WritableStream的用户定义函数。
      • reason <任意>
      • 返回:用undefined履行的 Promise 。
    • type <any> type选项保留供将来使用,并且必须未定义。
  • strategy <对象>
    • highWaterMark <number>应用反压之前的最大内部队列大小。
    • size <Function>用户定义的函数,用于识别每个数据块的大小。
writableStream.abort([reason])#
  • reason <任意>
  • 返回:用undefined履行的 Promise 。

突然终止WritableStream。所有排队的写入都将被取消,并且其关联的 Promise 将被拒绝。

writableStream.close()#
  • 返回:用undefined履行的 Promise 。

当不需要额外写入时关闭WritableStream

writableStream.getWriter()#

创建并创建一个新的写入器实例,可用于将数据写入WritableStream

writableStream.locked#

writableStream.locked属性默认为false ,当有一个事件的 writer 附加到此 WritableStream时,会切换为true

使用 postMessage() 传输#

可以使用<MessagePort>传输<WritableStream>实例。

const stream = new WritableStream(getWritableSinkSomehow());

const { port1, port2 } = new MessageChannel();

port1.onmessage = ({ data }) => {
  data.getWriter().write('hello');
};

port2.postMessage(stream, [stream]); 

类:WritableStreamDefaultWriter#

new WritableStreamDefaultWriter(stream)#

创建一个锁定到给定 WritableStream 的新WritableStreamDefaultWriter

writableStreamDefaultWriter.abort([reason])#
  • reason <任意>
  • 返回:用undefined履行的 Promise 。

突然终止WritableStream。所有排队的写入都将被取消,并且其关联的 Promise 将被拒绝。

writableStreamDefaultWriter.close()#
  • 返回:用undefined履行的 Promise 。

当不需要额外写入时关闭WritableStream

writableStreamDefaultWriter.closed#
  • 类型:<Promise>当关联的 <WritableStream>被关闭或拒绝时,如果流发生错误或在流完成关闭之前释放写入者的锁,则用 undefined实现。
writableStreamDefaultWriter.desiredSize#

填充<WritableStream>队列所需的数据量。

writableStreamDefaultWriter.ready#
  • type:当编写器准备好使用时,用undefined履行的 Promise 。
writableStreamDefaultWriter.releaseLock()#

释放该写入者对底层<ReadableStream>的锁定。

writableStreamDefaultWriter.write([chunk])#
  • chunk<任何>
  • 返回:用undefined履行的 Promise 。

将新的数据块追加到<WritableStream>的队列中。

类:WritableStreamDefaultController#

WritableStreamDefaultController管理<WritableStream>的内部状态。

writableStreamDefaultController.error([error])#

由用户代码调用以表示处理WritableStream数据时发生错误。调用时,<WritableStream>被中止,同时取消当前挂起的写入。

writableStreamDefaultController.signal#

类:TransformStream#

TransformStream<ReadableStream><WritableStream>组成,它们相互连接,以便写入到WritableStream 的数据在被推送到ReadableStream的队列。

import {
  TransformStream,
} from 'node:stream/web';

const transform = new TransformStream({
  transform(chunk, controller) {
    controller.enqueue(chunk.toUpperCase());
  },
});

await Promise.all([
  transform.writable.getWriter().write('A'),
  transform.readable.getReader().read(),
]); 
new TransformStream([transformer[, writableStrategy[, readableStrategy]]])#
transformStream.readable#
transformStream.writable#
使用 postMessage() 传输#

可以使用<MessagePort>传输<TransformStream>实例。

const stream = new TransformStream();

const { port1, port2 } = new MessageChannel();

port1.onmessage = ({ data }) => {
  const { writable, readable } = data;
  // ...
};

port2.postMessage(stream, [stream]); 

类:TransformStreamDefaultController#

TransformStreamDefaultController管理TransformStream的内部状态。

transformStreamDefaultController.desiredSize#

填充可读端队列所需的数据量。

transformStreamDefaultController.enqueue([chunk])#

将一块数据附加到可读端的队列中。

transformStreamDefaultController.error([reason])#

向可读和可写端发出信号,表明在处理转换数据时发生了错误,导致双方突然关闭。

transformStreamDefaultController.terminate()#

关闭传输的可读端并导致可写端因错误而突然关闭。

类:ByteLengthQueuingStrategy#

new ByteLengthQueuingStrategy(options)#
byteLengthQueuingStrategy.highWaterMark#
byteLengthQueuingStrategy.size#

类:CountQueuingStrategy#

new CountQueuingStrategy(options)#
countQueuingStrategy.highWaterMark#
countQueuingStrategy.size#

类:TextEncoderStream#

new TextEncoderStream()#

创建一个新的TextEncoderStream实例。

textEncoderStream.encoding#

TextEncoderStream实例支持的编码。

textEncoderStream.readable#
textEncoderStream.writable#

类:TextDecoderStream#

new TextDecoderStream([encoding[, options]])#
  • encoding <string>标识此TextDecoder实例支持的encoding默认值: 'utf-8'
  • options <对象>
    • fatal <boolean> true如果解码失败是致命的。
    • ignoreBOM <boolean>true时,TextDecoderStream将在解码结果中包含字节顺序标记。当false时,字节顺序标记将从输出中删除。仅当encoding'utf-8''utf-16be''utf-16le'时,才使用此选项。默认值: false

创建一个新的TextDecoderStream实例。

textDecoderStream.encoding#

TextDecoderStream实例支持的编码。

textDecoderStream.fatal#

如果解码错误导致抛出TypeError,则该值将为true

textDecoderStream.ignoreBOM#

如果解码结果包含字节顺序标记,则该值将为true

textDecoderStream.readable#
textDecoderStream.writable#

类:CompressionStream#

new CompressionStream(format)#
  • format <string> 'deflate''gzip'之一。
compressionStream.readable#
compressionStream.writable#

类:DecompressionStream#

new DecompressionStream(format)#
  • format <string> 'deflate''gzip'之一。
decompressionStream.readable#
decompressionStream.writable#

实用程序消费者#

实用程序消费者函数提供了消费流的通用选项。

它们可以通过以下方式访问:

import {
  arrayBuffer,
  blob,
  buffer,
  json,
  text,
} from 'node:stream/consumers';const {
  arrayBuffer,
  blob,
  buffer,
  json,
  text,
} = require('node:stream/consumers');
streamConsumers.arrayBuffer(stream)#
import { arrayBuffer } from 'node:stream/consumers';
import { Readable } from 'node:stream';
import { TextEncoder } from 'node:util';

const encoder = new TextEncoder();
const dataArray = encoder.encode('hello world from consumers!');

const readable = Readable.from(dataArray);
const data = await arrayBuffer(readable);
console.log(`from readable: ${data.byteLength}`);const { arrayBuffer } = require('node:stream/consumers');
const { Readable } = require('node:stream');
const { TextEncoder } = require('node:util');

const encoder = new TextEncoder();
const dataArray = encoder.encode('hello world from consumers!');
const readable = Readable.from(dataArray);
arrayBuffer(readable).then((data) => {
  console.log(`from readable: ${data.byteLength}`);
});
streamConsumers.blob(stream)#
import { blob } from 'node:stream/consumers';

const dataBlob = new Blob(['hello world from consumers!']);

const readable = dataBlob.stream();
const data = await blob(readable);
console.log(`from readable: ${data.size}`);const { blob } = require('node:stream/consumers');

const dataBlob = new Blob(['hello world from consumers!']);

const readable = dataBlob.stream();
blob(readable).then((data) => {
  console.log(`from readable: ${data.size}`);
});
streamConsumers.buffer(stream)#
import { buffer } from 'node:stream/consumers';
import { Readable } from 'node:stream';
import { Buffer } from 'node:buffer';

const dataBuffer = Buffer.from('hello world from consumers!');

const readable = Readable.from(dataBuffer);
const data = await buffer(readable);
console.log(`from readable: ${data.length}`);const { buffer } = require('node:stream/consumers');
const { Readable } = require('node:stream');
const { Buffer } = require('node:buffer');

const dataBuffer = Buffer.from('hello world from consumers!');

const readable = Readable.from(dataBuffer);
buffer(readable).then((data) => {
  console.log(`from readable: ${data.length}`);
});
streamConsumers.json(stream)#
import { json } from 'node:stream/consumers';
import { Readable } from 'node:stream';

const items = Array.from(
  {
    length: 100,
  },
  () => ({
    message: 'hello world from consumers!',
  }),
);

const readable = Readable.from(JSON.stringify(items));
const data = await json(readable);
console.log(`from readable: ${data.length}`);const { json } = require('node:stream/consumers');
const { Readable } = require('node:stream');

const items = Array.from(
  {
    length: 100,
  },
  () => ({
    message: 'hello world from consumers!',
  }),
);

const readable = Readable.from(JSON.stringify(items));
json(readable).then((data) => {
  console.log(`from readable: ${data.length}`);
});
streamConsumers.text(stream)#
import { text } from 'node:stream/consumers';
import { Readable } from 'node:stream';

const readable = Readable.from('Hello world from consumers!');
const data = await text(readable);
console.log(`from readable: ${data.length}`);const { text } = require('node:stream/consumers');
const { Readable } = require('node:stream');

const readable = Readable.from('Hello world from consumers!');
text(readable).then((data) => {
  console.log(`from readable: ${data.length}`);
});

NodeJS中文文档为Read dev Docs平台提供托管,中文NodeJS文档均由英文版NodeJS文档翻译,版权属于nodejs.org