Node.js v18.18.2 文档


目录

Stream 流[来源]#

稳定性:2 - 稳定

源代码: lib/stream.js

流是 Node.js 中处理流数据的抽象接口。node:stream模块提供了用于实现流接口的 API。

Node.js 提供了许多流对象。例如, 对 HTTP 服务器的请求process.stdout 都是流实例。

流可以是可读的、可写的或两者兼而有之。所有流都是 EventEmitter的实例。

要访问node:stream模块:

const stream = require('node:stream'); 

node:stream模块对于创建新类型的流实例非常有用。通常不需要使用node:stream模块来消费流。

本文件的组织#

本文档包含两个主要部分和第三个注释部分。第一部分解释如何在应用程序中使用现有的流。第二部分解释如何创建新类型的流。

流的类型#

Node.js 中有四种基本的流类型:

此外,该模块还包括实用函数 stream.pipeline()stream.finished()stream.Readable.from()stream.addAbortSignal()

Streams Promise API#

stream/promises API为返回Promise对象而不是使用回调的流提供了一组替代的异步实用函数。该 API 可通过require('node:stream/promises')require('node:stream').promises访问。

stream.pipeline(source[, ...transforms], destination[, options])#

stream.pipeline(streams[, options])#

const { pipeline } = require('node:stream/promises');
const fs = require('node:fs');
const zlib = require('node:zlib');

async function run() {
  await pipeline(
    fs.createReadStream('archive.tar'),
    zlib.createGzip(),
    fs.createWriteStream('archive.tar.gz'),
  );
  console.log('Pipeline succeeded.');
}

run().catch(console.error);import { pipeline } from 'node:stream/promises';
import { createReadStream, createWriteStream } from 'node:fs';
import { createGzip } from 'node:zlib';

await pipeline(
  createReadStream('archive.tar'),
  createGzip(),
  createWriteStream('archive.tar.gz'),
);
console.log('Pipeline succeeded.');

要使用AbortSignal,请将其作为最后一个参数传递到选项对象内。当信号中止时,将在底层管道上调用destroy ,并带有AbortError

const { pipeline } = require('node:stream/promises');
const fs = require('node:fs');
const zlib = require('node:zlib');

async function run() {
  const ac = new AbortController();
  const signal = ac.signal;

  setImmediate(() => ac.abort());
  await pipeline(
    fs.createReadStream('archive.tar'),
    zlib.createGzip(),
    fs.createWriteStream('archive.tar.gz'),
    { signal },
  );
}

run().catch(console.error); // AbortErrorimport { pipeline } from 'node:stream/promises';
import { createReadStream, createWriteStream } from 'node:fs';
import { createGzip } from 'node:zlib';

const ac = new AbortController();
const { signal } = ac;
setImmediate(() => ac.abort());
try {
  await pipeline(
    createReadStream('archive.tar'),
    createGzip(),
    createWriteStream('archive.tar.gz'),
    { signal },
  );
} catch (err) {
  console.error(err); // AbortError
}

pipeline API还支持异步生成器:

const { pipeline } = require('node:stream/promises');
const fs = require('node:fs');

async function run() {
  await pipeline(
    fs.createReadStream('lowercase.txt'),
    async function* (source, { signal }) {
      source.setEncoding('utf8');  // Work with strings rather than `Buffer`s.
      for await (const chunk of source) {
        yield await processChunk(chunk, { signal });
      }
    },
    fs.createWriteStream('uppercase.txt'),
  );
  console.log('Pipeline succeeded.');
}

run().catch(console.error);import { pipeline } from 'node:stream/promises';
import { createReadStream, createWriteStream } from 'node:fs';

await pipeline(
  createReadStream('lowercase.txt'),
  async function* (source, { signal }) {
    source.setEncoding('utf8');  // Work with strings rather than `Buffer`s.
    for await (const chunk of source) {
      yield await processChunk(chunk, { signal });
    }
  },
  createWriteStream('uppercase.txt'),
);
console.log('Pipeline succeeded.');

请记住处理传递到异步生成器的signal参数。特别是在异步生成器是管道源(即第一个参数)或管道永远不会完成的情况下。

const { pipeline } = require('node:stream/promises');
const fs = require('node:fs');

async function run() {
  await pipeline(
    async function* ({ signal }) {
      await someLongRunningfn({ signal });
      yield 'asd';
    },
    fs.createWriteStream('uppercase.txt'),
  );
  console.log('Pipeline succeeded.');
}

run().catch(console.error);import { pipeline } from 'node:stream/promises';
import fs from 'node:fs';
await pipeline(
  async function* ({ signal }) {
    await someLongRunningfn({ signal });
    yield 'asd';
  },
  fs.createWriteStream('uppercase.txt'),
);
console.log('Pipeline succeeded.');

pipeline API提供回调版本

stream.finished(stream[, options])#

const { finished } = require('node:stream/promises');
const fs = require('node:fs');

const rs = fs.createReadStream('archive.tar');

async function run() {
  await finished(rs);
  console.log('Stream is done reading.');
}

run().catch(console.error);
rs.resume(); // Drain the stream.import { finished } from 'node:stream/promises';
import { createReadStream } from 'node:fs';

const rs = createReadStream('archive.tar');

async function run() {
  await finished(rs);
  console.log('Stream is done reading.');
}

run().catch(console.error);
rs.resume(); // Drain the stream.

finished API提供回调版本

对象模式#

Node.js API 创建的所有流都专门对字符串和Buffer (或Uint8Array)对象进行操作。但是,流实现可以使用其他类型的 JavaScript 值( null除外,它在流中具有特殊用途)。此类流被认为以“对象模式”运行。

创建流时,使用objectMode选项将流实例切换为对象模式。尝试将现有流切换到对象模式是不安全的。

缓冲#

WritableReadable流都将数据存储在内部缓冲区中。

可能缓冲的数据量取决于传递到流构造函数中的highWaterMark选项。对于普通流,highWaterMark 选项指定总字节数。对于以对象模式运行的流,highWaterMark指定对象总数。

当实现调用 stream.push(chunk) 时,数据会缓冲在 Readable 流中。如果 Stream 的使用者不调用stream.read(),数据将位于内部队列中直到被消耗。

一旦内部读取缓冲区的总大小达到highWaterMark指定的阈值,流将暂时停止从底层资源读取数据,直到当前缓冲的数据可以被消耗(即流将停止调用用于填充读取缓冲区的内部readable._read()方法)。

当 重复调用writable.write(chunk)方法时,数据会缓冲在 Writable 流中。当内部写入缓冲区的总大小低于 highWaterMark设置的阈值时,调用writable.write()将返回true。一旦内部缓冲区的大小达到或超过highWaterMark, 就会返回false

stream API(尤其是stream.pipe()方法)的一个关键目标是将数据缓冲限制在可接受的水平,以便不同速度的源和目标不会耗尽可用内存。

highWaterMark选项是一个阈值,而不是限制:它指示流在停止请求更多数据之前缓冲的数据量。一般来说,它不强制执行严格的内存限制。特定的流实现可以选择强制执行更严格的限制,但这样做是可选的。

由于DuplexTransform流都是ReadableWritable,因此每个流都维护两个单独的内部缓冲区,用于读取和写入,从而允许每一方独立运作,同时保持适当且高效的数据流。例如, net.Socket实例是Duplex流,其Readable端允许使用套接字接收的数据,其Writable端允许写入数据插座。由于数据写入套接字的速率可能比接收数据的速率更快或更慢,因此每一侧都应独立于另一侧进行操作(和缓冲)。

内部缓冲机制是内部实现细节,可能随时更改。但是,对于某些高级实现,可以使用writable.writableBufferreadable.readableBuffer检索内部缓冲区。不鼓励使用这些未记录的属性。

流消费者的API#

几乎所有 Node.js 应用程序,无论多么简单,都以某种方式使用流。以下是在实现 HTTP 服务器的 Node.js 应用程序中使用流的示例:

const http = require('node:http');

const server = http.createServer((req, res) => {
  // `req` is an http.IncomingMessage, which is a readable stream.
  // `res` is an http.ServerResponse, which is a writable stream.

  let body = '';
  // Get the data as utf8 strings.
  // If an encoding is not set, Buffer objects will be received.
  req.setEncoding('utf8');

  // Readable streams emit 'data' events once a listener is added.
  req.on('data', (chunk) => {
    body += chunk;
  });

  // The 'end' event indicates that the entire body has been received.
  req.on('end', () => {
    try {
      const data = JSON.parse(body);
      // Write back something interesting to the user:
      res.write(typeof data);
      res.end();
    } catch (er) {
      // uh oh! bad json!
      res.statusCode = 400;
      return res.end(`error: ${er.message}`);
    }
  });
});

server.listen(1337);

// $ curl localhost:1337 -d "{}"
// object
// $ curl localhost:1337 -d "\"foo\""
// string
// $ curl localhost:1337 -d "not json"
// error: Unexpected token 'o', "not json" is not valid JSON 

Writable流(例如示例中的 res )公开用于将数据写入流的方法,例如write()end()

Readable流使用EventEmitter API 在可以从流中读取数据时通知应用程序代码。可以通过多种方式从流中读取可用数据。

WritableReadable流都以各种方式使用EventEmitter API 来传达流的当前状态。

DuplexTransform流都是WritableReadable

将数据写入流或从流中消耗数据的应用程序不需要直接实现流接口,并且通常没有理由调用require('node:stream')

希望实现新类型流的开发人员应参阅流实现者的 API部分。

可写流#

可写流是数据写入目的地的抽象。

Writable流的示例包括:

其中一些示例实际上是实现 Writable接口的 Duplex 流。

所有Writable流都实现由stream.Writable类定义的接口 。

虽然Writable流的特定实例可能有多种不同,但所有Writable流都遵循相同的基本使用模式,如下例所示:

const myStream = getWritableStreamSomehow();
myStream.write('some data');
myStream.write('some more data');
myStream.end('done writing data'); 
类:stream.Writable#
事件:'close'#

当流及其任何底层资源(例如文件描述符)关闭时,会发出'close'事件。该事件表明不会再发出任何事件,也不会进行进一步的计算。

如果 Writable流是使用emitClose选项创建的,则它始终会发出'close' 事件。

事件:'drain'#

如果对stream.write(chunk)调用返回false, 则当适合继续将数据写入流时,将发出'drain'事件。

// Write the data to the supplied writable stream one million times.
// Be attentive to back-pressure.
function writeOneMillionTimes(writer, data, encoding, callback) {
  let i = 1000000;
  write();
  function write() {
    let ok = true;
    do {
      i--;
      if (i === 0) {
        // Last time!
        writer.write(data, encoding, callback);
      } else {
        // See if we should continue, or wait.
        // Don't pass the callback, because we're not done yet.
        ok = writer.write(data, encoding);
      }
    } while (i > 0 && ok);
    if (i > 0) {
      // Had to stop early!
      // Write some more once it drains.
      writer.once('drain', write);
    }
  }
} 
事件:'error'#

如果写入或传输数据时发生错误,则会发出'error'事件。调用时,侦听器回调会传递一个Error参数。

当发出 'error'事件时,流将关闭,除非在创建流时将autoDestroy选项设置为false

'error'之后,除了'close'之外, 不应再发出其他事件(包括'error'事件)。

事件:'finish'#

调用stream.end()方法后,会发出'finish'事件,并且所有数据都已刷新到底层系统。

const writer = getWritableStreamSomehow();
for (let i = 0; i < 100; i++) {
  writer.write(`hello, #${i}!\n`);
}
writer.on('finish', () => {
  console.log('All writes are now complete.');
});
writer.end('This is the end\n'); 
事件:'pipe'#

当在可读流上调用 stream.pipe()方法时,会发出'pipe'事件,并将此可写流添加到其目标集。

const writer = getWritableStreamSomehow();
const reader = getReadableStreamSomehow();
writer.on('pipe', (src) => {
  console.log('Something is piping into the writer.');
  assert.equal(src, reader);
});
reader.pipe(writer); 
事件:'unpipe'#

当在Readable流上调用stream.unpipe()方法时,会发出'unpipe' 事件,从而从其目标集中删除此Writable

如果Writable流在Readable流通过管道传输到其中时发出错误, 也会发出此错误。

const writer = getWritableStreamSomehow();
const reader = getReadableStreamSomehow();
writer.on('unpipe', (src) => {
  console.log('Something has stopped piping into the writer.');
  assert.equal(src, reader);
});
reader.pipe(writer);
reader.unpipe(writer); 
writable.cork()#

writable.cork()方法强制将所有写入的数据缓冲在内存中。当调用stream.uncork()stream.end()方法时,缓冲的数据将被刷新。

writable.cork()的主要目的是适应将多个小块快速连续写入流的情况。writable.cork()不会立即将它们转发到底层目标,而是 缓冲所有块,直到调用 writable.uncork()为止,这会将它们全部传递到writable._writev()(如果存在)。这可以防止出现队头阻塞情况,即在等待处理第一个小块时缓冲数据。但是,使用writable.cork()而不实现 writable._writev()可能会对吞吐量产生不利影响。

另请参阅:writable.uncork()writable._writev()

writable.destroy([error])#

摧毁流。(可选)发出'error'事件,并发出'close' 事件(除非emitClose设置为false)。在此调用之后,可写流已结束,随后对write()end()的调用将导致ERR_STREAM_DESTROYED错误。这是一种破坏性的、直接的破坏流的方法。之前对 write() 的调用可能尚未耗尽,并且可能会触发ERR_STREAM_DESTROYED错误。如果数据应在关闭前刷新,请使用end()而不是 destroy,或者在销毁流之前等待'drain'事件。

const { Writable } = require('node:stream');

const myStream = new Writable();

const fooErr = new Error('foo error');
myStream.destroy(fooErr);
myStream.on('error', (fooErr) => console.error(fooErr.message)); // foo error 
const { Writable } = require('node:stream');

const myStream = new Writable();

myStream.destroy();
myStream.on('error', function wontHappen() {}); 
const { Writable } = require('node:stream');

const myStream = new Writable();
myStream.destroy();

myStream.write('foo', (error) => console.error(error.code));
// ERR_STREAM_DESTROYED 

一旦调用了destroy() ,任何进一步的调用都将成为空操作,并且除了_destroy()之外的任何进一步的错误都不会被发出为'error'

实现者不应重写此方法,而应实现writable._destroy()

writable.closed#

发出'close'之后是true

writable.destroyed#

调用writable.destroy()之后是 true

const { Writable } = require('node:stream');

const myStream = new Writable();

console.log(myStream.destroyed); // false
myStream.destroy();
console.log(myStream.destroyed); // true 
writable.end([chunk[, encoding]][, callback])#
  • chunk <字符串> | <缓冲区> | <Uint8Array> | <any>要写入的可选数据。对于不在对象模式下运行的流,chunk必须是字符串、BufferUint8Array。对于对象模式流,chunk可以是除null之外的任何 JavaScript 值。
  • encoding <string>如果chunk是字符串,则编码
  • callback <Function>流完成时的回调。
  • 返回:<此>

调用writable.end()方法表示不再有数据写入Writable。可选的chunkencoding参数允许在关闭流之前立即写入最后一个附加数据块。

在调用stream.end() 后 调用 stream.write() 方法将引发错误。

// Write 'hello, ' and then end with 'world!'.
const fs = require('node:fs');
const file = fs.createWriteStream('example.txt');
file.write('hello, ');
file.end('world!');
// Writing more now is not allowed! 
writable.setDefaultEncoding(encoding)#

writable.setDefaultEncoding()方法为 Writable流设置默认的 encoding

writable.uncork()#

writable.uncork()方法刷新自调用stream.cork()以来缓冲的所有数据 。

当使用writable.cork()writable.uncork()管理对流的写入缓冲时,请使用 process.nextTick()推迟对 writable.uncork()的调用。这样做可以对给定 Node.js 事件循环阶段内发生的所有 writable.write()调用进行批处理。

stream.cork();
stream.write('some ');
stream.write('data ');
process.nextTick(() => stream.uncork()); 

如果在流上多次调用writable.cork()方法,则必须调用相同次数的writable.uncork()来刷新缓冲数据。

stream.cork();
stream.write('some ');
stream.cork();
stream.write('data ');
process.nextTick(() => {
  stream.uncork();
  // The data will not be flushed until uncork() is called a second time.
  stream.uncork();
}); 

另请参阅:writable.cork()

writable.writable#

如果可以安全调用writable.write() ,则为true,这意味着流尚未被破坏、出错或结束。

writable.writableAborted#

稳定性:1 - 实验性

返回流在发出'finish'之前是否被破坏或出错。

writable.writableEnded#

调用writable.end()之后是 true此属性并不指示数据是否已刷新,为此请使用 writable.writableFinished代替。

writable.writableCorked#

为了完全解锁流而需要调用writable.uncork()的次数。

writable.errored#

如果流因错误而被破坏,则返回错误。

writable.writableFinished#

在发出 'finish'事件之前立即设置为true

writable.writableHighWaterMark#

返回创建此Writable时传递的highWaterMark的值。

writable.writableLength#

该属性包含队列中准备写入的字节(或对象)数。该值提供有关highWaterMark状态的内省数据。

writable.writableNeedDrain#

如果流的缓冲区已满并且流将发出'drain' ,则为true

writable.writableObjectMode#

给定 Writable 流的属性objectMode的 getter 。

writable.write(chunk[, encoding][, callback])#
  • chunk <字符串> | <缓冲区> | <Uint8Array> | <any>要写入的可选数据。对于不在对象模式下运行的流,chunk必须是字符串、BufferUint8Array。对于对象模式流,chunk可以是除null之外的任何 JavaScript 值。
  • encoding <字符串> | <null>如果chunk是字符串,则为编码。默认值: 'utf8'
  • callback <Function>刷新此数据块时的回调。
  • 返回:<boolean> false如果流希望调用代码在继续写入其他数据之前等待'drain'事件发出;否则true

writable.write()方法将一些数据写入流,并在数据完全处理后调用提供的callback 。如果发生错误,将调用callback ,并将错误作为其第一个参数。在发出'error'之前,异步调用callback

如果内部缓冲区小于在 承认 chunk后创建流时配置的highWaterMark,则返回值为 true 。如果返回false ,则应停止向流写入数据的进一步尝试,直到发出'drain'事件。

当流未耗尽时,对write()的调用将缓冲chunk,并返回 false。一旦所有当前缓冲的块都被耗尽(操作系统接受交付),就会发出'drain'事件。一旦write()返回 false,则在发出'drain'事件之前不要写入更多块。虽然允许在不耗尽的流上调用write(),但 Node.js 将缓冲所有写入的块,直到出现最大内存使用量,此时它将无条件中止。即使在中止之前,高内存使用率也会导致垃圾收集器性能不佳和 RSS 较高(即使不再需要内存,通常也不会将其释放回系统)。由于如果远程对等方不读取数据,TCP 套接字可能永远不会耗尽,因此编写不耗尽的套接字可能会导致远程可利用的漏洞。

对于Transform来说,在流未耗尽时写入数据尤其成问题,因为Transform流默认暂停,直到它们通过管道传输或'data'或添加了'readable'事件处理程序。

如果要写入的数据可以按需生成或获取,建议将逻辑封装在Readable中并使用 stream.pipe()。但是,如果首选调用write(),则可以使用 'drain'事件遵守背压并避免内存问题:

function write(data, cb) {
  if (!stream.write(data)) {
    stream.once('drain', cb);
  } else {
    process.nextTick(cb);
  }
}

// Wait for cb to be called before doing any other write.
write('hello', () => {
  console.log('Write completed, do more writes now.');
}); 

对象模式下的Writable流将始终忽略encoding参数。

可读流#

可读流是消费数据的抽象。

Readable流的示例包括:

所有Readable流都实现由stream.Readable类定义的接口 。

两种阅读模式#

Readable流有效地以两种模式之一运行:流动和暂停。这些模式与对象模式是分开的。Readable流可以处于对象模式,也可以不处于对象模式,无论它处于流动模式还是暂停模式。

所有Readable流开始时均处于暂停模式,但可以通过以下方式之一切换到流动模式:

Readable可以使用以下方法之一切换回暂停模式:

要记住的重要概念是,在提供使用或忽略数据的机制之前, Readable不会生成数据。如果消费机制被禁用或取消,Readable尝试 停止生成数据。

出于向后兼容性的原因,删除'data'事件处理程序不会 自动暂停流。此外,如果存在管道目的地,则调用stream.pause()将不能保证一旦这些目的地耗尽并请求更多数据,流将保持暂停状态。

如果Readable切换到流动模式并且没有消费者可用于处理数据,则该数据将会丢失。例如,当调用readable.resume()方法而没有附加到'data'事件的侦听器时,或者当从'data'事件处理程序中删除时,可能会发生这种情况。Stream 流。

添加'readable'事件处理程序会自动使流停止流动,并且必须通过 readable.read()消耗数据。如果删除'readable'事件处理程序,并且存在 'data'事件处理程序,则流将再次开始流动。

三种状态#

Readable流的“两种操作模式”是对Readable流实现中发生的更复杂的内部状态管理的简化抽象。

具体来说,在任何给定时间点,每个Readable都处于三种可能状态之一:

  • readable.readableFlowing === null
  • readable.readableFlowing === false
  • readable.readableFlowing === true

readable.readableFlowingnull时,不提供消费流数据的机制。因此,流不会生成数据。在此状态下,为'data'事件附加侦听器、调用 readable.pipe()方法或调用readable.resume()方法将切换 readable.readableFlowingtrue,导致Readable在生成数据时开始主动发出事件。

调用readable.pause()readable.unpipe()或接收背压将导致readable.readableFlowing被设置为false,暂时停止事件的流动,但不会停止数据的生成。在此状态下,附加'data'事件的侦听器不会将readable.readableFlowing切换到true

const { PassThrough, Writable } = require('node:stream');
const pass = new PassThrough();
const writable = new Writable();

pass.pipe(writable);
pass.unpipe(writable);
// readableFlowing is now false.

pass.on('data', (chunk) => { console.log(chunk.toString()); });
// readableFlowing is still false.
pass.write('ok');  // Will not emit 'data'.
pass.resume();     // Must be called to make stream emit 'data'.
// readableFlowing is now true. 

虽然readable.readableFlowingfalse,但数据可能会在流的内部缓冲区中累积。

选择一种 API 样式#

Readable流API 跨多个 Node.js 版本发展,并提供了多种使用流数据的方法。一般来说,开发人员应该选择 一种消费数据的方法,而不应该使用多种方法来消费单个流中的数据。具体来说,使用on('data')on('readable')pipe()或异步迭代器的组合可能会导致不直观的行为。

类:stream.Readable#
事件:'close'#

当流及其任何底层资源(例如文件描述符)关闭时,会发出'close'事件。该事件表明不会再发出任何事件,也不会进行进一步的计算。

如果Readable流是使用emitClose选项创建的,则它始终会发出'close' 事件。

事件:'data'#
  • chunk <缓冲区> | <字符串> | <any>数据块。对于不在对象模式下运行的流,块将是字符串或Buffer。对于对象模式下的流,块可以是除null之外的任何 JavaScript 值。

每当流将数据块的所有权交给使用者时,就会发出'data'事件。每当通过调用readable.pipe()readable.resume()或通过将侦听器回调附加到'data'事件将流切换为流动模式时,都可能会发生这种情况。每当调用 readable.read()方法并且有数据块可返回时,也会发出 'data' 事件。

'data'事件侦听器附加到尚未显式暂停的流会将流切换到流动模式。一旦有数据可用,就会立即传递。

如果使用readable.setEncoding()方法为流指定了默认编码,则侦听器回调将以字符串形式传递数据块 ;否则数据将作为 Buffer传递。

const readable = getReadableStreamSomehow();
readable.on('data', (chunk) => {
  console.log(`Received ${chunk.length} bytes of data.`);
}); 
事件:'end'#

当流中不再有数据可供使用时,会发出'end'事件。

除非数据完全消耗,否则不会发出'end'事件。这可以通过将流切换到流动模式或重复调用stream.read()直到所有数据都被消耗来完成。

const readable = getReadableStreamSomehow();
readable.on('data', (chunk) => {
  console.log(`Received ${chunk.length} bytes of data.`);
});
readable.on('end', () => {
  console.log('There will be no more data.');
}); 
事件:'error'#

'error'事件可以随时由Readable实现发出。通常,如果底层流由于底层内部故障而无法生成数据,或者当流实现尝试推送无效数据块时,可能会发生这种情况。

侦听器回调将传递单个Error对象。

事件:'pause'#

当调用 stream.pause()并且readableFlowing不是false时,会发出 'pause'事件。

事件:'readable'#

当有数据可供从流中读取或到达流末尾时,会发出'readable'事件。实际上, 'readable'事件表明流有新信息。如果数据可用,stream.read()将返回该数据。

const readable = getReadableStreamSomehow();
readable.on('readable', function() {
  // There is some data to read now.
  let data;

  while ((data = this.read()) !== null) {
    console.log(data);
  }
}); 

如果已到达流末尾,调用 stream.read()将返回null并触发'end' 事件。如果从未有任何数据要读取,也是如此。例如,在以下示例中,foo.txt是一个空文件:

const fs = require('node:fs');
const rr = fs.createReadStream('foo.txt');
rr.on('readable', () => {
  console.log(`readable: ${rr.read()}`);
});
rr.on('end', () => {
  console.log('end');
}); 

运行该脚本的输出是:

$ node test.js
readable: null
end 

在某些情况下,为'readable'事件附加侦听器将导致将一定量的数据读入内部缓冲区。

一般来说,readable.pipe()'data'事件机制比'readable'事件更容易理解。但是,处理'readable'可能会导致吞吐量增加。

如果同时使用'readable''data' ,则 'readable' 优先控制流量,即仅发出'data'当调用stream.read()时。readableFlowing属性 将变为false。如果在移除'readable'时存在'data'侦听器,则流将开始流动,即,将在不调用 .resume() 的情况下发出 'data'事件。

事件:'resume'#

当调用 stream.resume()readableFlowing不是true时,会发出 'resume'事件。

readable.destroy([error])#

摧毁流。(可选)发出'error'事件,并发出'close' 事件(除非emitClose设置为false)。在此调用之后,可读流将释放所有内部资源,并且对push() 的后续调用 将被忽略。

一旦调用了destroy() ,任何进一步的调用都将成为空操作,并且除了_destroy()之外的任何其他错误都不会被发出为'error'

实现者不应重写此方法,而应实现 readable._destroy()

readable.closed#

发出'close'之后是true

readable.destroyed#

调用readable.destroy()之后是 true

readable.isPaused()#

readable.isPaused()方法返回Readable的当前操作状态 。这主要由readable.pipe()方法背后的机制使用 。在大多数典型情况下,没有理由直接使用此方法。

const readable = new stream.Readable();

readable.isPaused(); // === false
readable.pause();
readable.isPaused(); // === true
readable.resume();
readable.isPaused(); // === false 
readable.pause()#

readable.pause()方法将导致处于流动模式的流停止发出'data'事件,从而切换出流动模式。任何可用的数据都将保留在内部缓冲区中。

const readable = getReadableStreamSomehow();
readable.on('data', (chunk) => {
  console.log(`Received ${chunk.length} bytes of data.`);
  readable.pause();
  console.log('There will be no additional data for 1 second.');
  setTimeout(() => {
    console.log('Now data will start flowing again.');
    readable.resume();
  }, 1000);
}); 

如果存在'readable'事件监听器,则readable.pause()方法无效 。

readable.pipe(destination[, options])#

readable.pipe()方法将Writable流附加到readable,使其自动切换到流动模式并将其所有数据推送到附加的Writable。数据流将被自动管理,以便目标Writable流不会被更快的 Readable流淹没。

以下示例将所有数据从readable传输到名为file.txt的文件中:

const fs = require('node:fs');
const readable = getReadableStreamSomehow();
const writable = fs.createWriteStream('file.txt');
// All the data from readable goes into 'file.txt'.
readable.pipe(writable); 

可以将多个Writable流附加到单个Readable 流。

readable.pipe()方法返回对目标流的引用,从而可以设置管道流链:

const fs = require('node:fs');
const zlib = require('node:zlib');
const r = fs.createReadStream('file.txt');
const z = zlib.createGzip();
const w = fs.createWriteStream('file.txt.gz');
r.pipe(z).pipe(w); 

默认情况下,当源 Readable 流发出'end' 时,在目标Writable流上调用stream.end(),以便目标不再是可写。要禁用此默认行为,可以将end选项作为false 传递,从而使目标流保持打开状态:

reader.pipe(writer, { end: false });
reader.on('end', () => {
  writer.end('Goodbye\n');
}); 

一个重要的警告是,如果Readable流在处理过程中发出错误,则Writable目标不会自动关闭。如果发生错误,则需要手动关闭每个流以防止内存泄漏。

无论指定的选项如何,在 Node.js 进程退出之前,process.stderrprocess.stdout Writable流都不会关闭。

readable.read([size])#

readable.read()方法从内部缓冲区读取数据并将其返回。如果没有数据可供读取,则返回null 。默认情况下,数据以Buffer对象的形式返回,除非使用readable.setEncoding()方法指定了编码或者流在对象模式下运行。

可选的size参数指定要读取的特定字节数。如果 无法读取size字节,则将返回null ,除非 流已结束,在这种情况下,将返回内部缓冲区中剩余的所有数据。

如果未指定size参数,则将返回内部缓冲区中包含的所有数据。

size参数必须小于或等于 1 GiB。

readable.read()方法只能在以暂停模式运行的Readable流上调用。在流动模式下,会自动调用readable.read(),直到内部缓冲区完全耗尽。

const readable = getReadableStreamSomehow();

// 'readable' may be triggered multiple times as data is buffered in
readable.on('readable', () => {
  let chunk;
  console.log('Stream is readable (new data received in buffer)');
  // Use a loop to make sure we read all currently available data
  while (null !== (chunk = readable.read())) {
    console.log(`Read ${chunk.length} bytes of data...`);
  }
});

// 'end' will be triggered once when there is no more data available
readable.on('end', () => {
  console.log('Reached end of stream.');
}); 

每次调用readable.read()都会返回一块数据,即null。这些块没有连接。需要while循环来消耗缓冲区中当前的所有数据。读取大文件时.read()可能会返回null,到目前为止已经消耗了所有缓冲的内容,但仍有更多数据尚未缓冲。在这种情况下,当缓冲区中有更多数据时,将发出新的'readable'事件。最后,当没有更多数据到来时,将发出'end'事件。

因此,要从readable读取文件的全部内容,有必要跨多个'readable'事件收集块:

const chunks = [];

readable.on('readable', () => {
  let chunk;
  while (null !== (chunk = readable.read())) {
    chunks.push(chunk);
  }
});

readable.on('end', () => {
  const content = chunks.join('');
}); 

对象模式下的Readable流将始终通过调用readable.read(size)返回单个项目,无论size参数的值如何 。

如果readable.read()方法返回一块数据,则也会发出'data'事件。

在发出'end'事件后调用stream.read([size]) 将返回null。不会引发运行时错误。

readable.readable#

如果可以安全调用readable.read() ,则为true,这意味着流尚未被销毁或发出'error''end'

readable.readableAborted#

稳定性:1 - 实验性

返回流在发出'end'之前是否被破坏或出错。

readable.readableDidRead#

稳定性:1 - 实验性

返回是否已发出'data'

readable.readableEncoding#

给定Readable 流的属性encoding 的 getter 。 可以使用readable.setEncoding()方法设置encoding属性。

readable.readableEnded#

当发出 'end'事件时,变为true

readable.errored#

如果流因错误而被破坏,则返回错误。

readable.readableFlowing#

此属性反映了Readable流的当前状态,如“三种状态”部分中所述。

readable.readableHighWaterMark#

返回创建此Readable时传递的highWaterMark的值。

readable.readableLength#

该属性包含队列中准备读取的字节(或对象)数量。该值提供有关highWaterMark状态的内省数据。

readable.readableObjectMode#

给定Readable 流的属性objectMode 的 getter 。

readable.resume()#

readable.resume()方法会导致显式暂停的Readable流恢复发出'data'事件,从而将流切换到流动模式。

readable.resume()方法可用于完全使用流中的数据,而无需实际处理任何数据:

getReadableStreamSomehow()
  .resume()
  .on('end', () => {
    console.log('Reached the end, but did not read anything.');
  }); 

如果存在'readable'事件侦听器,则readable.resume()方法无效 。

readable.setEncoding(encoding)#

readable.setEncoding()方法设置从Readable流读取的数据的字符编码。

默认情况下,不分配编码,流数据将作为Buffer对象返回 。设置编码会导致流数据作为指定编码的字符串返回,而不是作为Buffer 对象返回。例如,调用readable.setEncoding('utf8')将导致输出数据被解释为 UTF-8 数据,并作为字符串传递。调用 readable.setEncoding('hex')将导致数据以十六进制字符串格式编码。

Readable流将正确处理通过流传递的多字节字符,否则如果只是将其作为Buffer对象从流中拉出,这些字符将无法正确解码。

const readable = getReadableStreamSomehow();
readable.setEncoding('utf8');
readable.on('data', (chunk) => {
  assert.equal(typeof chunk, 'string');
  console.log('Got %d characters of string data:', chunk.length);
}); 
readable.unpipe([destination])#

readable.unpipe()方法分离先前使用stream.pipe()方法附加的Writable 流。

如果未指定destination ,则所有管道都会分离。

如果指定了destination,但没有为其设置管道,则该方法不执行任何操作。

const fs = require('node:fs');
const readable = getReadableStreamSomehow();
const writable = fs.createWriteStream('file.txt');
// All the data from readable goes into 'file.txt',
// but only for the first second.
readable.pipe(writable);
setTimeout(() => {
  console.log('Stop writing to file.txt.');
  readable.unpipe(writable);
  console.log('Manually close the file stream.');
  writable.end();
}, 1000); 
readable.unshift(chunk[, encoding])#
  • chunk <缓冲区> | <Uint8Array> | <字符串> | <空> | <any>要取消移至读取队列的数据块。对于不在对象模式下运行的流,chunk必须是字符串、BufferUint8Arraynull。对于对象模式流,chunk 可以是任何 JavaScript 值。
  • encoding <string>字符串块的编码。必须是有效的 Buffer编码,例如'utf8''ascii'

chunk作为null传递表示流结束 (EOF),其行为与readable.push(null)相同,之后无法写入更多数据。EOF 信号被放置在缓冲区的末尾,任何缓冲的数据仍将被刷新。

readable.unshift()方法将一块数据推回内部缓冲区。这在某些情况下非常有用,在这种情况下,流正在被代码消耗,而这些代码需要“取消消耗”从源中乐观地拉出的一些数据,以便可以将数据传递给其他方。

发出 'end' 事件后,无法调用stream.unshift(chunk)方法,否则将引发运行时错误。

经常使用stream.unshift()的开发者应该考虑改用Transform流。 有关详细信息,请参阅流实现者 API部分。

// Pull off a header delimited by \n\n.
// Use unshift() if we get too much.
// Call the callback with (error, header, stream).
const { StringDecoder } = require('node:string_decoder');
function parseHeader(stream, callback) {
  stream.on('error', callback);
  stream.on('readable', onReadable);
  const decoder = new StringDecoder('utf8');
  let header = '';
  function onReadable() {
    let chunk;
    while (null !== (chunk = stream.read())) {
      const str = decoder.write(chunk);
      if (str.includes('\n\n')) {
        // Found the header boundary.
        const split = str.split(/\n\n/);
        header += split.shift();
        const remaining = split.join('\n\n');
        const buf = Buffer.from(remaining, 'utf8');
        stream.removeListener('error', callback);
        // Remove the 'readable' listener before unshifting.
        stream.removeListener('readable', onReadable);
        if (buf.length)
          stream.unshift(buf);
        // Now the body of the message can be read from the stream.
        callback(null, header, stream);
        return;
      }
      // Still reading the header.
      header += str;
    }
  }
} 

stream.push(chunk)不同,stream.unshift(chunk)不会通过重置流的内部读取状态来结束读取过程。如果在读取期间调用readable.unshift() (即从自定义流的stream._read()实现中),这可能会导致意外结果。在调用readable.unshift()后立即调用 stream.push('')将适当地重置阅读状态,但是最好避免在执行操作的过程中调用readable.unshift()读。

readable.wrap(stream)#

在 Node.js 0.10 之前,流并未实现 当前定义的整个node:stream模块 API。(有关详细信息,请参阅兼容性。)

当使用发出'data'事件并具有 仅供参考的 stream.pause()方法的旧 Node.js 库时, readable.wrap()方法可用于创建使用旧流作为数据源的Readable流。

很少需要使用readable.wrap(),但提供该方法是为了方便与旧版 Node.js 应用程序和库进行交互。

const { OldReader } = require('./old-api-module.js');
const { Readable } = require('node:stream');
const oreader = new OldReader();
const myReader = new Readable().wrap(oreader);

myReader.on('readable', () => {
  myReader.read(); // etc.
}); 
readable[Symbol.asyncIterator]()#
const fs = require('node:fs');

async function print(readable) {
  readable.setEncoding('utf8');
  let data = '';
  for await (const chunk of readable) {
    data += chunk;
  }
  console.log(data);
}

print(fs.createReadStream('file')).catch(console.error); 

如果循环以breakreturnthrow终止,则流将被销毁。换句话说,迭代流将完全消耗该流。流将以等于highWaterMark 选项大小的块读取。在上面的代码示例中,如果文件的数据少于 64 KiB,则数据将位于单个块中,因为没有向fs.createReadStream()提供 highWaterMark选项。

readable[Symbol.asyncDispose]()#

稳定性:1 - 实验性

使用AbortError调用readable.destroy()并返回一个在流完成时履行的 Promise 。

readable.compose(stream[, options])#

稳定性:1 - 实验性

import { Readable } from 'node:stream';

async function* splitToWords(source) {
  for await (const chunk of source) {
    const words = String(chunk).split(' ');

    for (const word of words) {
      yield word;
    }
  }
}

const wordsStream = Readable.from(['this is', 'compose as operator']).compose(splitToWords);
const words = await wordsStream.toArray();

console.log(words); // prints ['this', 'is', 'compose', 'as', 'operator'] 

请参阅stream.compose了解更多信息。

readable.iterator([options])#

稳定性:1 - 实验性

  • options <对象>
    • destroyOnReturn <boolean>当设置为false时,在异步迭代器上调用 return,或使用breakreturnthrow不会销毁流。默认值:true
  • 返回:<AsyncIterator>以使用流。

如果returnbreakthrow,或者如果流在迭代期间发出错误,迭代器是否应销毁流。

const { Readable } = require('node:stream');

async function printIterator(readable) {
  for await (const chunk of readable.iterator({ destroyOnReturn: false })) {
    console.log(chunk); // 1
    break;
  }

  console.log(readable.destroyed); // false

  for await (const chunk of readable.iterator({ destroyOnReturn: false })) {
    console.log(chunk); // Will print 2 and then 3
  }

  console.log(readable.destroyed); // True, stream was totally consumed
}

async function printSymbolAsyncIterator(readable) {
  for await (const chunk of readable) {
    console.log(chunk); // 1
    break;
  }

  console.log(readable.destroyed); // true
}

async function showBoth() {
  await printIterator(Readable.from([1, 2, 3]));
  await printSymbolAsyncIterator(Readable.from([1, 2, 3]));
}

showBoth(); 
readable.map(fn[, options])#

稳定性:1 - 实验性

此方法允许映射流。将为流中的每个块调用fn函数。如果fn函数返回一个 Promise - 该 Promise 将在传递到结果流之前进行await编辑。

import { Readable } from 'node:stream';
import { Resolver } from 'node:dns/promises';

// With a synchronous mapper.
for await (const chunk of Readable.from([1, 2, 3, 4]).map((x) => x * 2)) {
  console.log(chunk); // 2, 4, 6, 8
}
// With an asynchronous mapper, making at most 2 queries at a time.
const resolver = new Resolver();
const dnsResults = Readable.from([
  'nodejs.org',
  'openjsf.org',
  'www.linuxfoundation.org',
]).map((domain) => resolver.resolve4(domain), { concurrency: 2 });
for await (const result of dnsResults) {
  console.log(result); // Logs the DNS result of resolver.resolve4.
} 
readable.filter(fn[, options])#

稳定性:1 - 实验性

此方法允许过滤流。对于流中的每个块, 将调用fn函数,如果它返回真值,则该块将被传递到结果流。如果fn函数返回一个 Promise - 该 Promise 将被await编辑。

import { Readable } from 'node:stream';
import { Resolver } from 'node:dns/promises';

// With a synchronous predicate.
for await (const chunk of Readable.from([1, 2, 3, 4]).filter((x) => x > 2)) {
  console.log(chunk); // 3, 4
}
// With an asynchronous predicate, making at most 2 queries at a time.
const resolver = new Resolver();
const dnsResults = Readable.from([
  'nodejs.org',
  'openjsf.org',
  'www.linuxfoundation.org',
]).filter(async (domain) => {
  const { address } = await resolver.resolve4(domain, { ttl: true });
  return address.ttl > 60;
}, { concurrency: 2 });
for await (const result of dnsResults) {
  // Logs domains with more than 60 seconds on the resolved dns record.
  console.log(result);
} 
readable.forEach(fn[, options])#

稳定性:1 - 实验性

此方法允许迭代流。对于流中的每个块, 将调用fn函数。如果fn函数返回一个 Promise - 该 Promise 将被await编辑。

此方法与for await...of循环的不同之处在于它可以选择同时处理块。此外,只能通过传递 signal 选项并中止相关的 AbortController 来停止 forEach 迭代,而for await...of可以停止与breakreturn。无论哪种情况,流都会被销毁。

此方法与监听'data'事件不同,它使用底层机器中的readable事件,并且可以限制并发fn调用的数量。

import { Readable } from 'node:stream';
import { Resolver } from 'node:dns/promises';

// With a synchronous predicate.
for await (const chunk of Readable.from([1, 2, 3, 4]).filter((x) => x > 2)) {
  console.log(chunk); // 3, 4
}
// With an asynchronous predicate, making at most 2 queries at a time.
const resolver = new Resolver();
const dnsResults = Readable.from([
  'nodejs.org',
  'openjsf.org',
  'www.linuxfoundation.org',
]).map(async (domain) => {
  const { address } = await resolver.resolve4(domain, { ttl: true });
  return address;
}, { concurrency: 2 });
await dnsResults.forEach((result) => {
  // Logs result, similar to `for await (const result of dnsResults)`
  console.log(result);
});
console.log('done'); // Stream has finished 
readable.toArray([options])#

稳定性:1 - 实验性

  • options <对象>
    • signal <AbortSignal>允许在信号中止时取消 toArray 操作。
  • 返回:<Promise>一个包含包含流内容的数组的 Promise。

该方法可以轻松获取流的内容。

由于此方法将整个流读入内存,因此它否定了流的优点。它旨在实现互操作性和便利性,而不是作为使用流的主要方式。

import { Readable } from 'node:stream';
import { Resolver } from 'node:dns/promises';

await Readable.from([1, 2, 3, 4]).toArray(); // [1, 2, 3, 4]

// Make dns queries concurrently using .map and collect
// the results into an array using toArray
const dnsResults = await Readable.from([
  'nodejs.org',
  'openjsf.org',
  'www.linuxfoundation.org',
]).map(async (domain) => {
  const { address } = await resolver.resolve4(domain, { ttl: true });
  return address;
}, { concurrency: 2 }).toArray(); 
readable.some(fn[, options])#

稳定性:1 - 实验性

此方法类似于Array.prototype.some ,并对流中的每个块调用fn ,直到等待的返回值为true(或任何真值)。一旦对块等待返回值的fn调用为真,流就会被销毁,并通过true履行 Promise 。如果对块的任何fn调用都没有返回真值,则通过false 履行 Promise 。

import { Readable } from 'node:stream';
import { stat } from 'node:fs/promises';

// With a synchronous predicate.
await Readable.from([1, 2, 3, 4]).some((x) => x > 2); // true
await Readable.from([1, 2, 3, 4]).some((x) => x < 0); // false

// With an asynchronous predicate, making at most 2 file checks at a time.
const anyBigFile = await Readable.from([
  'file1',
  'file2',
  'file3',
]).some(async (fileName) => {
  const stats = await stat(fileName);
  return stats.size > 1024 * 1024;
}, { concurrency: 2 });
console.log(anyBigFile); // `true` if any file in the list is bigger than 1MB
console.log('done'); // Stream has finished 
readable.find(fn[, options])#

稳定性:1 - 实验性

此方法与Array.prototype.find类似,并对流中的每个块调用fn以查找具有fn真值的块。一旦fn调用等待的返回值为真值,流就会被销毁,并通过fn返回真值的值来履行 Promise 。如果对块的所有 fn调用都返回虚假值,则通过undefined履行 Promise 。

import { Readable } from 'node:stream';
import { stat } from 'node:fs/promises';

// With a synchronous predicate.
await Readable.from([1, 2, 3, 4]).find((x) => x > 2); // 3
await Readable.from([1, 2, 3, 4]).find((x) => x > 0); // 1
await Readable.from([1, 2, 3, 4]).find((x) => x > 10); // undefined

// With an asynchronous predicate, making at most 2 file checks at a time.
const foundBigFile = await Readable.from([
  'file1',
  'file2',
  'file3',
]).find(async (fileName) => {
  const stats = await stat(fileName);
  return stats.size > 1024 * 1024;
}, { concurrency: 2 });
console.log(foundBigFile); // File name of large file, if any file in the list is bigger than 1MB
console.log('done'); // Stream has finished 
readable.every(fn[, options])#

稳定性:1 - 实验性

此方法类似于Array.prototype.every并在流中的每个块上调用fn以检查所有等待的返回值是否都是fn的真值。一旦对块等待返回值的fn调用为假,流就会被销毁,并通过false履行 Promise 。如果对块的所有fn调用都返回真值,则通过true履行 Promise 。

import { Readable } from 'node:stream';
import { stat } from 'node:fs/promises';

// With a synchronous predicate.
await Readable.from([1, 2, 3, 4]).every((x) => x > 2); // false
await Readable.from([1, 2, 3, 4]).every((x) => x > 0); // true

// With an asynchronous predicate, making at most 2 file checks at a time.
const allBigFiles = await Readable.from([
  'file1',
  'file2',
  'file3',
]).every(async (fileName) => {
  const stats = await stat(fileName);
  return stats.size > 1024 * 1024;
}, { concurrency: 2 });
// `true` if all files in the list are bigger than 1MiB
console.log(allBigFiles);
console.log('done'); // Stream has finished 
readable.flatMap(fn[, options])#

稳定性:1 - 实验性

此方法通过将给定的回调应用于流的每个块然后展平结果来返回一个新流。

可以从 fn返回一个流或另一个可迭代或异步可迭代,结果流将被合并(展平)到返回的流中。

import { Readable } from 'node:stream';
import { createReadStream } from 'node:fs';

// With a synchronous mapper.
for await (const chunk of Readable.from([1, 2, 3, 4]).flatMap((x) => [x, x])) {
  console.log(chunk); // 1, 1, 2, 2, 3, 3, 4, 4
}
// With an asynchronous mapper, combine the contents of 4 files
const concatResult = Readable.from([
  './1.mjs',
  './2.mjs',
  './3.mjs',
  './4.mjs',
]).flatMap((fileName) => createReadStream(fileName));
for await (const result of concatResult) {
  // This will contain the contents (all chunks) of all 4 files
  console.log(result);
} 
readable.drop(limit[, options])#

稳定性:1 - 实验性

此方法返回一个新的流,其中删除了前limit块。

import { Readable } from 'node:stream';

await Readable.from([1, 2, 3, 4]).drop(2).toArray(); // [3, 4] 
readable.take(limit[, options])#

稳定性:1 - 实验性

此方法返回包含前limit块的新流。

import { Readable } from 'node:stream';

await Readable.from([1, 2, 3, 4]).take(2).toArray(); // [1, 2] 
readable.asIndexedPairs([options])#

稳定性:1 - 实验性

此方法返回一个新流,其中包含与[index, chunk]形式的计数器配对的底层流块。第一个索引值为 0,每生成一个块,该索引值就会增加 1。

import { Readable } from 'node:stream';

const pairs = await Readable.from(['a', 'b', 'c']).asIndexedPairs().toArray();
console.log(pairs); // [[0, 'a'], [1, 'b'], [2, 'c']] 
readable.reduce(fn[, initial[, options]])#

稳定性:1 - 实验性

此方法按顺序对流的每个块调用fn,并将前一个元素的计算结果传递给它。它返回对减少的最终值的 Promise 。

如果没有提供initial值,则流的第一个块将用作初始值。如果流为空,则 Promise 会被带有 ERR_INVALID_ARGS code 属性的TypeError 拒绝。

import { Readable } from 'node:stream';
import { readdir, stat } from 'node:fs/promises';
import { join } from 'node:path';

const directoryPath = './src';
const filesInDir = await readdir(directoryPath);

const folderSize = await Readable.from(filesInDir)
  .reduce(async (totalSize, file) => {
    const { size } = await stat(join(directoryPath, file));
    return totalSize + size;
  }, 0);

console.log(folderSize); 

reducer 函数逐个元素地迭代流,这意味着没有concurrency参数或并行性。要同时执行reduce ,您可以将异步函数提取到readable.map方法。

import { Readable } from 'node:stream';
import { readdir, stat } from 'node:fs/promises';
import { join } from 'node:path';

const directoryPath = './src';
const filesInDir = await readdir(directoryPath);

const folderSize = await Readable.from(filesInDir)
  .map((file) => stat(join(directoryPath, file)), { concurrency: 2 })
  .reduce((totalSize, { size }) => totalSize + size, 0);

console.log(folderSize); 

双工和转换流#

类:stream.Duplex#

双工流是同时实现ReadableWritable接口的流。

Duplex流的示例包括:

duplex.allowHalfOpen#

如果false则当可读端结束时,流将自动结束可写端。最初由allowHalfOpen构造函数选项设置,默认为true

可以手动更改此设置以更改现有Duplex流实例的半打开行为 ,但必须在发出'end'事件之前更改。

类:stream.Transform#

转换流是Duplex流,其中输出以某种方式与输入相关。与所有Duplex流一样,Transform流同时实现ReadableWritable接口。

Transform流的示例包括:

transform.destroy([error])#

销毁流,并可以选择发出'error'事件。在此调用之后,转换流将释放所有内部资源。实现者不应重写此方法,而应实现 readable._destroy()Transform_destroy()的默认实现也会发出'close' ,除非emitClose设置为 false。

一旦调用了destroy(),任何进一步的调用都将是空操作,并且除了_destroy()之外的任何其他错误都不会被发出为'error'

stream.finished(stream[, options], callback)#

可读和/或可写流/网络流。

  • options <对象>

    • error <boolean>如果设置为false ,则对emit('error', err) 的调用不会被视为已完成。默认值: true
    • readable <boolean>当设置为false时,即使流可能仍然可读,也会在流结束时调用回调。 默认值: true
    • writable <boolean>当设置为false时,即使流可能仍然可写,也会在流结束时调用回调。 默认值: true
    • signal <AbortSignal>允许中止等待流完成。如果信号被中止,底层流也不会中止。回调将使用AbortError进行调用。此函数添加的所有已注册侦听器也将被删除。
    • cleanup <boolean>删除所有已注册的流侦听器。 默认值: false
  • callback <Function>带有可选错误参数的回调函数。

  • 返回:<Function>删除所有已注册侦听器的清理函数。

当流不再可读、可写或遇到错误或过早关闭事件时收到通知的函数。

const { finished } = require('node:stream');
const fs = require('node:fs');

const rs = fs.createReadStream('archive.tar');

finished(rs, (err) => {
  if (err) {
    console.error('Stream failed.', err);
  } else {
    console.log('Stream is done reading.');
  }
});

rs.resume(); // Drain the stream. 

在流被过早销毁(例如中止的 HTTP 请求)并且不会发出'end''finish' 的错误处理场景中特别有用。

finished API提供Promise 版本

stream.finished()在{ { { _ _ 1088}}}已被调用。这样做的原因是意外的'error'事件(由于不正确的流实现)不会导致意外崩溃。如果这是不需要的行为,则需要在回调中调用返回的清理函数:

const cleanup = finished(rs, (err) => {
  cleanup();
  // ...
}); 

stream.pipeline(source[, ...transforms], destination, callback)#

stream.pipeline(streams, callback)#

一种模块方法,用于在流和生成器之间进行管道传输,转发错误并正确清理并在管道完成时提供回调。

const { pipeline } = require('node:stream');
const fs = require('node:fs');
const zlib = require('node:zlib');

// Use the pipeline API to easily pipe a series of streams
// together and get notified when the pipeline is fully done.

// A pipeline to gzip a potentially huge tar file efficiently:

pipeline(
  fs.createReadStream('archive.tar'),
  zlib.createGzip(),
  fs.createWriteStream('archive.tar.gz'),
  (err) => {
    if (err) {
      console.error('Pipeline failed.', err);
    } else {
      console.log('Pipeline succeeded.');
    }
  },
); 

pipeline API提供了一个 Promise 版本

stream.pipeline()将在所有流上调用stream.destroy(err) ,除了:

  • Readable已发出'end''close'的流。
  • Writable已发出'finish''close'的流。

调用callback 后,stream.pipeline()在流上留下悬空事件侦听器。在失败后重用流的情况下,这可能会导致事件侦听器泄漏和吞没错误。如果最后一个流可读,则将删除悬空事件侦听器,以便稍后可以使用最后一个流。

当出现错误时, stream.pipeline()会关闭所有流。一旦IncomingRequestpipeline一起使用会破坏套接字而不发送预期响应,则可能会导致意外行为。请参阅下面的示例:

const fs = require('node:fs');
const http = require('node:http');
const { pipeline } = require('node:stream');

const server = http.createServer((req, res) => {
  const fileStream = fs.createReadStream('./fileNotExist.txt');
  pipeline(fileStream, res, (err) => {
    if (err) {
      console.log(err); // No such file
      // this message can't be sent once `pipeline` already destroyed the socket
      return res.end('error!!!');
    }
  });
}); 

stream.compose(...streams)#

稳定性:1 - stream.compose处于实验阶段。

将两个或多个流组合成一个Duplex流,该流写入第一个流并从最后一个流读取。使用stream.pipeline将每个提供的流通过管道输送到下一个流。如果任何流出错,则所有流都会被销毁,包括外部的Duplex流。

由于stream.compose返回一个新的流,该流又可以(并且应该)通过管道传输到其他流中,因此它可以实现组合。相反,当将流传递给stream.pipeline时,通常第一个流是可读流,最后一个流是可写流,形成闭路。

如果传递了Function它必须是一个采用source Iterable的工厂方法。

import { compose, Transform } from 'node:stream';

const removeSpaces = new Transform({
  transform(chunk, encoding, callback) {
    callback(null, String(chunk).replace(' ', ''));
  },
});

async function* toUpper(source) {
  for await (const chunk of source) {
    yield String(chunk).toUpperCase();
  }
}

let res = '';
for await (const buf of compose(removeSpaces, toUpper).end('hello world')) {
  res += buf;
}

console.log(res); // prints 'HELLOWORLD' 

stream.compose可用于将异步迭代、生成器和函数转换为流。

  • AsyncIterable转换为可读的Duplex。无法产生 null
  • AsyncGeneratorFunction转换为可读/可写转换Duplex。必须将源AsyncIterable作为第一个参数。无法产生 null
  • AsyncFunction转换为可写的Duplex。必须返回nullundefined
import { compose } from 'node:stream';
import { finished } from 'node:stream/promises';

// Convert AsyncIterable into readable Duplex.
const s1 = compose(async function*() {
  yield 'Hello';
  yield 'World';
}());

// Convert AsyncGenerator into transform Duplex.
const s2 = compose(async function*(source) {
  for await (const chunk of source) {
    yield String(chunk).toUpperCase();
  }
});

let res = '';

// Convert AsyncFunction into writable Duplex.
const s3 = compose(async function(source) {
  for await (const chunk of source) {
    res += chunk;
  }
});

await finished(compose(s1, s2, s3));

console.log(res); // prints 'HELLOWORLD' 

请参阅readable.compose(stream)stream.compose作为运算符。

stream.Readable.from(iterable[, options])#

  • iterable <Iterable>实现Symbol.asyncIteratorSymbol.iterator可迭代协议的对象。如果传递空值,则发出“错误”事件。
  • options <Object>提供给new stream.Readable([options])的选项。默认情况下,Readable.from()会将options.objectMode设置为true,除非通过将options.objectMode设置为false来明确选择退出。
  • 返回:<stream.Readable>

一种用于从迭代器创建可读流的实用方法。

const { Readable } = require('node:stream');

async function * generate() {
  yield 'hello';
  yield 'streams';
}

const readable = Readable.from(generate());

readable.on('data', (chunk) => {
  console.log(chunk);
}); 

出于性能原因,调用Readable.from(string)Readable.from(buffer)不会迭代字符串或缓冲区来匹配其他流语义。

如果包含 Promise 的Iterable对象作为参数传递,则可能会导致未处理的拒绝。

const { Readable } = require('node:stream');

Readable.from([
  new Promise((resolve) => setTimeout(resolve('1'), 1500)),
  new Promise((_, reject) => setTimeout(reject(new Error('2')), 1000)), // Unhandled rejection
]); 

stream.Readable.fromWeb(readableStream[, options])#

稳定性:1 - 实验性

stream.Readable.isDisturbed(stream)#

稳定性:1 - 实验性

返回流是否已被读取或取消。

stream.isErrored(stream)#

稳定性:1 - 实验性

返回流是否遇到错误。

stream.isReadable(stream)#

稳定性:1 - 实验性

返回流是否可读。

stream.Readable.toWeb(streamReadable[, options])#

稳定性:1 - 实验性

  • streamReadable <stream.Readable>
  • options <对象>
    • strategy <对象>
      • highWaterMark <number>在读取给定的 stream.Readable时应用反压之前的最大内部队列大小(创建的 ReadableStream ) 。如果未提供值,则将从给定的stream.Readable中获取值。
      • size <Function>计算给定数据块大小的函数。如果未提供值,则所有块的 大小将为1
  • 返回:<可读流>

stream.Writable.fromWeb(writableStream[, options])#

稳定性:1 - 实验性

stream.Writable.toWeb(streamWritable)#

稳定性:1 - 实验性

stream.Duplex.from(src)#

一种用于创建双工流的实用方法。

  • Stream将可写流转换为可写Duplex并将可读流转换为Duplex
  • Blob转换为可读的Duplex
  • string转换为可读的Duplex
  • ArrayBuffer转换为可读的Duplex
  • AsyncIterable转换为可读的Duplex。无法产生 null
  • AsyncGeneratorFunction转换为可读/可写转换 Duplex。必须将源AsyncIterable作为第一个参数。无法产生 null
  • AsyncFunction转换为可写的Duplex。必须返回nullundefined
  • Object ({ writable, readable })readablewritable转换为Stream,然后将它们合并为Duplex,其中 Duplex将写入writable并从readable读取。
  • Promise转换为可读的Duplex。值null被忽略。
  • ReadableStream转换为可读的Duplex
  • WritableStream转换为可写的Duplex
  • 返回:<流.Duplex>

如果包含 Promise 的Iterable对象作为参数传递,则可能会导致未处理的拒绝。

const { Duplex } = require('node:stream');

Duplex.from([
  new Promise((resolve) => setTimeout(resolve('1'), 1500)),
  new Promise((_, reject) => setTimeout(reject(new Error('2')), 1000)), // Unhandled rejection
]); 

stream.Duplex.fromWeb(pair[, options])#

稳定性:1 - 实验性

import { Duplex } from 'node:stream';
import {
  ReadableStream,
  WritableStream,
} from 'node:stream/web';

const readable = new ReadableStream({
  start(controller) {
    controller.enqueue('world');
  },
});

const writable = new WritableStream({
  write(chunk) {
    console.log('writable', chunk);
  },
});

const pair = {
  readable,
  writable,
};
const duplex = Duplex.fromWeb(pair, { encoding: 'utf8', objectMode: true });

duplex.write('hello');

for await (const chunk of duplex) {
  console.log('readable', chunk);
}const { Duplex } = require('node:stream');
const {
  ReadableStream,
  WritableStream,
} = require('node:stream/web');

const readable = new ReadableStream({
  start(controller) {
    controller.enqueue('world');
  },
});

const writable = new WritableStream({
  write(chunk) {
    console.log('writable', chunk);
  },
});

const pair = {
  readable,
  writable,
};
const duplex = Duplex.fromWeb(pair, { encoding: 'utf8', objectMode: true });

duplex.write('hello');
duplex.once('readable', () => console.log('readable', duplex.read()));

stream.Duplex.toWeb(streamDuplex)#

稳定性:1 - 实验性

import { Duplex } from 'node:stream';

const duplex = Duplex({
  objectMode: true,
  read() {
    this.push('world');
    this.push(null);
  },
  write(chunk, encoding, callback) {
    console.log('writable', chunk);
    callback();
  },
});

const { readable, writable } = Duplex.toWeb(duplex);
writable.getWriter().write('hello');

const { value } = await readable.getReader().read();
console.log('readable', value);const { Duplex } = require('node:stream');

const duplex = Duplex({
  objectMode: true,
  read() {
    this.push('world');
    this.push(null);
  },
  write(chunk, encoding, callback) {
    console.log('writable', chunk);
    callback();
  },
});

const { readable, writable } = Duplex.toWeb(duplex);
writable.getWriter().write('hello');

readable.getReader().read().then((result) => {
  console.log('readable', result.value);
});

stream.addAbortSignal(signal, stream)#

附加信号的流。

将 AbortSignal 附加到可读或可写流。这允许代码使用AbortController控制流销毁。

在与传递的AbortSignal相对应的 AbortController上调用abort 的行为方式与 在流上调用.destroy(new AbortError()) 的行为相同,并且controller.error(new AbortError())用于网络流。

const fs = require('node:fs');

const controller = new AbortController();
const read = addAbortSignal(
  controller.signal,
  fs.createReadStream(('object.json')),
);
// Later, abort the operation closing the stream
controller.abort(); 

或者使用带有可读流的AbortSignal作为异步迭代:

const controller = new AbortController();
setTimeout(() => controller.abort(), 10_000); // set a timeout
const stream = addAbortSignal(
  controller.signal,
  fs.createReadStream(('object.json')),
);
(async () => {
  try {
    for await (const chunk of stream) {
      await process(chunk);
    }
  } catch (e) {
    if (e.name === 'AbortError') {
      // The operation was cancelled
    } else {
      throw e;
    }
  }
})(); 

或者将AbortSignal与 ReadableStream 一起使用:

const controller = new AbortController();
const rs = new ReadableStream({
  start(controller) {
    controller.enqueue('hello');
    controller.enqueue('world');
    controller.close();
  },
});

addAbortSignal(controller.signal, rs);

finished(rs, (err) => {
  if (err) {
    if (err.name === 'AbortError') {
      // The operation was cancelled
    }
  }
});

const reader = rs.getReader();

reader.read().then(({ value, done }) => {
  console.log(value); // hello
  console.log(done); // false
  controller.abort();
}); 

stream.getDefaultHighWaterMark(objectMode)#

返回流使用的默认 highWaterMark。默认为16384 (16 KiB),或16表示objectMode

stream.setDefaultHighWaterMark(objectMode, value)#

设置流使用的默认 highWaterMark。

流实现者的 API#

node:stream模块API 旨在使使用 JavaScript 的原型继承模型轻松实现流成为可能。

首先,流开发人员将声明一个新的 JavaScript 类,该类扩展四个基本流类之一(stream.Writablestream.Readablestream.Duplexstream.Transform),确保它们调用适当的父类构造函数:

const { Writable } = require('node:stream');

class MyWritable extends Writable {
  constructor({ highWaterMark, ...options }) {
    super({ highWaterMark });
    // ...
  }
} 

扩展流时,请记住在将这些选项转发给基本构造函数之前用户可以并且应该提供哪些选项。例如,如果实现对 autoDestroyemitClose选项做出假设,则不允许用户覆盖这些选项。明确转发哪些选项,而不是隐式转发所有选项。

然后,新的流类必须根据所创建的流的类型实现一个或多个特定方法,如下图所示:

用例实施方法
只读Readable_read()
只写Writable_write() , _writev() , _final()
读写Duplex_read() _write() _writev() _final()
对写入的数据进行操作,然后读取结果Transform_transform() , _flush() , _final()

流的实现代码永远不应该调用供消费者使用的流的“公共”方法(如 流消费者的 API部分中所述)。这样做可能会导致使用流的应用程序代码产生不利的副作用。

避免覆盖公共方法,例如write()end()cork()uncork()read()destroy() ,或通过发出内部事件,例如'error''data''end''finish''close' .emit()。这样做可能会破坏当前和未来的流不变量,从而导致与其他流、流实用程序和用户期望的行为和/或兼容性问题。

简化施工#

对于许多简单的情况,可以在不依赖继承的情况下创建流。这可以通过直接创建 stream.Writablestream.Readablestream.Duplexstream.Transform对象的 实例并传递适当的方法作为构造函数选项来完成。

const { Writable } = require('node:stream');

const myWritable = new Writable({
  construct(callback) {
    // Initialize state and load resources...
  },
  write(chunk, encoding, callback) {
    // ...
  },
  destroy() {
    // Free resources...
  },
}); 

实现可写流#

stream.Writable类经过扩展以实现Writable流。

自定义Writable必须调用new stream.Writable([options]) 构造函数并实现writable._write()和/或writable._writev() 方法。

new stream.Writable([options])#
const { Writable } = require('node:stream');

class MyWritable extends Writable {
  constructor(options) {
    // Calls the stream.Writable() constructor.
    super(options);
    // ...
  }
} 

或者,当使用 ES6 之前风格的构造函数时:

const { Writable } = require('node:stream');
const util = require('node:util');

function MyWritable(options) {
  if (!(this instanceof MyWritable))
    return new MyWritable(options);
  Writable.call(this, options);
}
util.inherits(MyWritable, Writable); 

或者,使用简化的构造函数方法:

const { Writable } = require('node:stream');

const myWritable = new Writable({
  write(chunk, encoding, callback) {
    // ...
  },
  writev(chunks, callback) {
    // ...
  },
}); 

在与传递的AbortSignal对应的 AbortController上调用abort 的行为与 在可写流上调用.destroy(new AbortError())的行为相同。

const { Writable } = require('node:stream');

const controller = new AbortController();
const myWritable = new Writable({
  write(chunk, encoding, callback) {
    // ...
  },
  writev(chunks, callback) {
    // ...
  },
  signal: controller.signal,
});
// Later, abort the operation closing the stream
controller.abort(); 
writable._construct(callback)#
  • callback <Function>当流完成初始化时调用此函数(可选地带有错误参数)。

不得直接调用_construct()方法。它可能由子类实现,如果是这样,则仅由内部Writable类方法调用 。

此可选函数将在流构造函数返回后立即调用,延迟任何_write()_final()_destroy()调用,直到 callback叫做。这对于在使用流之前初始化状态或异步初始化资源很有用。

const { Writable } = require('node:stream');
const fs = require('node:fs');

class WriteStream extends Writable {
  constructor(filename) {
    super();
    this.filename = filename;
    this.fd = null;
  }
  _construct(callback) {
    fs.open(this.filename, (err, fd) => {
      if (err) {
        callback(err);
      } else {
        this.fd = fd;
        callback();
      }
    });
  }
  _write(chunk, encoding, callback) {
    fs.write(this.fd, chunk, callback);
  }
  _destroy(err, callback) {
    if (this.fd) {
      fs.close(this.fd, (er) => callback(er || err));
    } else {
      callback(err);
    }
  }
} 
writable._write(chunk, encoding, callback)#
  • chunk <缓冲区> | <字符串> | <any>要写入的 Buffer,从 传递给stream.write()string转换而来。如果流的 decodeStrings选项是false或者流在对象模式下运行,则块将不会被转换,并且将是传递给 stream.write()的任何内容。
  • encoding <string>如果块是字符串,则encoding是该字符串的字符编码。如果 chunk 是Buffer,或者流在对象模式下运行,则encoding可能会被忽略。
  • callback <Function>当所提供的块的处理完成时调用此函数(可选地带有错误参数)。

所有Writable流实现都必须提供 writable._write()和/或 writable._writev()方法来将数据发送到底层资源。

Transform流提供了自己的writable._write()实现 。

该函数不得由应用程序代码直接调用。它应该由子类实现,并且仅由内部Writable类方法调用。

callback函数必须在writable._write()内部同步调用或异步(即不同的滴答声)调用 ,以指示写入成功完成或因错误而失败。如果调用失败,则传递给callback 的第一个参数必须是Error对象;如果写入成功,则传递给null 的第一个参数必须是 null 对象。

在调用writable._write()和 调用 callback之间发生的所有对writable.write()的调用都会导致写入的数据被缓冲。当调用callback时,流可能会发出'drain' 事件。如果流实现能够一次处理多个数据块,则应实现writable._writev()方法。

如果decodeStrings属性在构造函数选项中显式设置为false ,则chunk将保留为传递给.write()的同一对象,并且可能是字符串而不是Buffer。这是为了支持对某些字符串数据编码进行优化处理的实现。在这种情况下,encoding参数将指示字符串的字符编码。否则,可以安全地忽略encoding参数。

writable._write()方法以下划线为前缀,因为它是定义它的类的内部函数,并且永远不应该由用户程序直接调用。

writable._writev(chunks, callback)#
  • chunks <Object[]>要写入的数据。该值是一个<Object>数组 ,每个数组代表要写入的离散数据块。这些对象的属性是:
    • chunk <缓冲区> | <string>包含要写入的数据的缓冲区实例或字符串。如果Writable是在 decodeStrings选项设置为false 的情况下创建的,并且字符串传递给write()
    • encoding <string> chunk的字符编码。如果chunkBuffer,则encoding将是'buffer'
  • callback <Function>当提供的块的处理完成时调用的回调函数(可选地带有错误参数)。

该函数不得由应用程序代码直接调用。它应该由子类实现,并且仅由内部Writable类方法调用。

writable._writev()方法可以在能够同时处理多个数据块的流实现中作为writable._write() 的补充或替代来实现。如果实现并且存在先前写入的缓冲数据,则将调用_writev()而不是_write()

writable._writev()方法以下划线为前缀,因为它是定义它的类的内部函数,并且永远不应该由用户程序直接调用。

writable._destroy(err, callback)#
  • err <错误>可能存在错误。
  • callback <Function>带有可选错误参数的回调函数。

_destroy()方法由writable.destroy()调用。它可以被子类覆盖,但不能直接调用。此外,一旦在 Promise 得到解决时执行,callback就不应该与 async/await 混合。

writable._final(callback)#
  • callback <Function>写入任何剩余数据后调用此函数(可选地带有错误参数)。

不得直接调用_final()方法。它可能由子类实现,如果是这样,则仅由内部Writable类方法调用 。

此可选函数将在流关闭之前调用,从而延迟 'finish'事件,直到调用callback 。这对于在流结束之前关闭资源或写入缓冲数据很有用。

写入时出错#

处理writable._write()writable._writev()writable._final()方法期间发生的错误必须通过调用回调并将错误作为第一个参数传递来传播。从这些方法中抛出Error或手动发出'error' 事件会导致未定义的行为。

如果在Writable发出错误时将Readable流通过管道输送到Writable 流,则Readable流将取消管道输送。

const { Writable } = require('node:stream');

const myWritable = new Writable({
  write(chunk, encoding, callback) {
    if (chunk.toString().indexOf('a') >= 0) {
      callback(new Error('chunk is invalid'));
    } else {
      callback();
    }
  },
}); 
可写流示例#

下面说明了一个相当简单(并且有些毫无意义)的自定义 Writable流实现。虽然这个特定的Writable流实例没有任何真正的特殊用途,但该示例说明了自定义Writable流实例的每个必需元素:

const { Writable } = require('node:stream');

class MyWritable extends Writable {
  _write(chunk, encoding, callback) {
    if (chunk.toString().indexOf('a') >= 0) {
      callback(new Error('chunk is invalid'));
    } else {
      callback();
    }
  }
} 
解码可写流中的缓冲区#

解码缓冲区是一项常见任务,例如,当使用输入为字符串的转换器时。当使用多字节字符编码(例如 UTF-8)时,这不是一个简单的过程。以下示例演示如何使用StringDecoderWritable解码多字节字符串。

const { Writable } = require('node:stream');
const { StringDecoder } = require('node:string_decoder');

class StringWritable extends Writable {
  constructor(options) {
    super(options);
    this._decoder = new StringDecoder(options && options.defaultEncoding);
    this.data = '';
  }
  _write(chunk, encoding, callback) {
    if (encoding === 'buffer') {
      chunk = this._decoder.write(chunk);
    }
    this.data += chunk;
    callback();
  }
  _final(callback) {
    this.data += this._decoder.end();
    callback();
  }
}

const euro = [[0xE2, 0x82], [0xAC]].map(Buffer.from);
const w = new StringWritable();

w.write('currency: ');
w.write(euro[0]);
w.end(euro[1]);

console.log(w.data); // currency: € 

实现可读流#

stream.Readable类被扩展以实现Readable流。

自定义Readable必须调用new stream.Readable([options]) 构造函数并实现readable._read()方法。

new stream.Readable([options])#
const { Readable } = require('node:stream');

class MyReadable extends Readable {
  constructor(options) {
    // Calls the stream.Readable(options) constructor.
    super(options);
    // ...
  }
} 

或者,当使用 ES6 之前风格的构造函数时:

const { Readable } = require('node:stream');
const util = require('node:util');

function MyReadable(options) {
  if (!(this instanceof MyReadable))
    return new MyReadable(options);
  Readable.call(this, options);
}
util.inherits(MyReadable, Readable); 

或者,使用简化的构造函数方法:

const { Readable } = require('node:stream');

const myReadable = new Readable({
  read(size) {
    // ...
  },
}); 

在与传递的AbortSignal相对应的 AbortController上调用 abort 的行为与 在创建的可读对象上调用.destroy(new AbortError()) 的行为相同。

const { Readable } = require('node:stream');
const controller = new AbortController();
const read = new Readable({
  read(size) {
    // ...
  },
  signal: controller.signal,
});
// Later, abort the operation closing the stream
controller.abort(); 
readable._construct(callback)#
  • callback <Function>当流完成初始化时调用此函数(可选地带有错误参数)。

不得直接调用_construct()方法。它可能由子类实现,如果是这样,则仅由内部Readable类方法调用 。

此可选函数将由流构造函数在下一个tick中安排,延迟任何_read()_destroy()调用,直到调用callback为止。这对于在使用流之前初始化状态或异步初始化资源很有用。

const { Readable } = require('node:stream');
const fs = require('node:fs');

class ReadStream extends Readable {
  constructor(filename) {
    super();
    this.filename = filename;
    this.fd = null;
  }
  _construct(callback) {
    fs.open(this.filename, (err, fd) => {
      if (err) {
        callback(err);
      } else {
        this.fd = fd;
        callback();
      }
    });
  }
  _read(n) {
    const buf = Buffer.alloc(n);
    fs.read(this.fd, buf, 0, n, null, (err, bytesRead) => {
      if (err) {
        this.destroy(err);
      } else {
        this.push(bytesRead > 0 ? buf.slice(0, bytesRead) : null);
      }
    });
  }
  _destroy(err, callback) {
    if (this.fd) {
      fs.close(this.fd, (er) => callback(er || err));
    } else {
      callback(err);
    }
  }
} 
readable._read(size)#

该函数不得由应用程序代码直接调用。它应该由子类实现,并且仅由内部Readable类方法调用。

所有Readable流实现都必须提供readable._read()方法的实现 ,以从底层资源获取数据。

当调用readable._read()时,如果数据可从资源中获取,则实现应开始使用 this.push(dataChunk)方法将该数据推送到读取队列中。一旦流准备好接受更多数据,每次调用 this.push(dataChunk)后,都会再次调用 _read()_read()可能会继续从资源读取并推送数据,直到readable.push()返回false。仅当_read()停止后再次调用时,它才会恢复将其他数据推送到队列中。

一旦调用了readable._read()方法,就不会再次调用该方法,直到通过readable.push() 方法推送更多数据。空数据(例如空缓冲区和字符串)不会导致 调用readable._read()

size参数是建议性的。对于“读取”是返回数据的单个操作的实现,可以使用size参数来确定要获取多少数据。其他实现可能会忽略此参数,并在数据可用时简单地提供数据。在调用stream.push(chunk)之前,无需“等待” size 字节可用。

readable._read()方法以下划线为前缀,因为它是定义它的类的内部函数,并且永远不应该由用户程序直接调用。

readable._destroy(err, callback)#
  • err <错误>可能存在错误。
  • callback <Function>带有可选错误参数的回调函数。

_destroy()方法由readable.destroy()调用。它可以被子类覆盖,但不能直接调用。

readable.push(chunk[, encoding])#
  • chunk <缓冲区> | <Uint8Array> | <字符串> | <空> | <any>要推入读取队列的数据块。对于不在对象模式下运行的流,chunk必须是字符串、BufferUint8Array。对于对象模式流,chunk可以是任何 JavaScript 值。
  • encoding <string>字符串块的编码。必须是有效的 Buffer编码,例如'utf8''ascii'
  • 返回:<boolean> true如果可以继续推送其他数据块;false否则。

chunkBufferUint8Arraystring时,数据的chunk将被添加到供流用户使用的内部队列。将chunk作为null传递表示流结束 (EOF),之后无法写入更多数据。

Readable处于暂停模式时, 当'readable'运行时,可以通过调用readable.read()方法读取 readable.push() 添加的数据事件被发出。

Readable在流动模式下运行时,通过readable.push()添加的数据 将通过发出'data'事件来传递。

readable.push()方法被设计得尽可能灵活。例如,当包装提供某种形式的暂停/恢复机制和数据回调的低级源时,低级源可以由自定义 Readable实例包装:

// `_source` is an object with readStop() and readStart() methods,
// and an `ondata` member that gets called when it has data, and
// an `onend` member that gets called when the data is over.

class SourceWrapper extends Readable {
  constructor(options) {
    super(options);

    this._source = getLowLevelSourceObject();

    // Every time there's data, push it into the internal buffer.
    this._source.ondata = (chunk) => {
      // If push() returns false, then stop reading from source.
      if (!this.push(chunk))
        this._source.readStop();
    };

    // When the source ends, push the EOF-signaling `null` chunk.
    this._source.onend = () => {
      this.push(null);
    };
  }
  // _read() will be called when the stream wants to pull more data in.
  // The advisory size argument is ignored in this case.
  _read(size) {
    this._source.readStart();
  }
} 

readable.push()方法用于将内容推入内部缓冲区。它可以由readable._read()方法驱动。

对于不以对象模式运行的流,如果readable.push()chunk参数 为undefined,它将被视为空字符串或缓冲区。请参阅readable.push('')了解更多信息。

读取时出错#

处理readable._read()期间发生的错误必须通过readable.destroy(err)方法传播。从readable._read()内抛出Error或手动发出 'error'事件会导致未定义的行为。

const { Readable } = require('node:stream');

const myReadable = new Readable({
  read(size) {
    const err = checkSomeErrorCondition();
    if (err) {
      this.destroy(err);
    } else {
      // Do some work.
    }
  },
}); 
计数流示例#

以下是Readable流的基本示例,该流按升序发出从 1 到 1,000,000 的数字,然后结束。

const { Readable } = require('node:stream');

class Counter extends Readable {
  constructor(opt) {
    super(opt);
    this._max = 1000000;
    this._index = 1;
  }

  _read() {
    const i = this._index++;
    if (i > this._max)
      this.push(null);
    else {
      const str = String(i);
      const buf = Buffer.from(str, 'ascii');
      this.push(buf);
    }
  }
} 

实现双工流#

Duplex流是同时实现ReadableWritable的流,例如 TCP 套接字连接。

由于 JavaScript 不支持多重继承,因此 扩展了stream.Duplex类以实现Duplex流(而不是扩展stream.Readable stream.Writable类)。

stream.Duplex类原型继承自stream.Readable并寄生于stream.Writable,但由于覆盖stream.Writable上的 Symbol.hasInstance

自定义Duplex必须调用new stream.Duplex([options])构造函数 并实现readable._read()writable._write() 方法。

new stream.Duplex(options)#
  • options <Object>传递给WritableReadable 构造函数。还具有以下字段:
    • allowHalfOpen <boolean>如果设置为false,则当可读端结束时,流将自动结束可写端。 默认值: true
    • readable <boolean>设置Duplex是否可读。 默认值: true
    • writable <boolean>设置Duplex是否可写。 默认值: true
    • readableObjectMode <boolean>将objectMode设置为流的可读部分。如果objectModetrue则无效。默认值: false
    • writableObjectMode <boolean>将objectMode设置为流的可写端。如果objectModetrue则无效。默认值: false
    • readableHighWaterMark <number>为流的可读端设置highWaterMark 。如果提供了highWaterMark则无效。
    • writableHighWaterMark <number>设置流的可写端highWaterMark 。如果提供了highWaterMark则无效。
const { Duplex } = require('node:stream');

class MyDuplex extends Duplex {
  constructor(options) {
    super(options);
    // ...
  }
} 

或者,当使用 ES6 之前风格的构造函数时:

const { Duplex } = require('node:stream');
const util = require('node:util');

function MyDuplex(options) {
  if (!(this instanceof MyDuplex))
    return new MyDuplex(options);
  Duplex.call(this, options);
}
util.inherits(MyDuplex, Duplex); 

或者,使用简化的构造函数方法:

const { Duplex } = require('node:stream');

const myDuplex = new Duplex({
  read(size) {
    // ...
  },
  write(chunk, encoding, callback) {
    // ...
  },
}); 

使用管道时:

const { Transform, pipeline } = require('node:stream');
const fs = require('node:fs');

pipeline(
  fs.createReadStream('object.json')
    .setEncoding('utf8'),
  new Transform({
    decodeStrings: false, // Accept string input rather than Buffers
    construct(callback) {
      this.data = '';
      callback();
    },
    transform(chunk, encoding, callback) {
      this.data += chunk;
      callback();
    },
    flush(callback) {
      try {
        // Make sure is valid json.
        JSON.parse(this.data);
        this.push(this.data);
        callback();
      } catch (err) {
        callback(err);
      }
    },
  }),
  fs.createWriteStream('valid-object.json'),
  (err) => {
    if (err) {
      console.error('failed', err);
    } else {
      console.log('completed');
    }
  },
); 
双工流示例#

下面说明了一个简单的Duplex流示例,该流包装了一个假设的较低级别源对象,可以向其中写入数据,也可以从中读取数据,尽管使用的 API 与 Node 不兼容.js 流。下面说明了一个简单的Duplex流示例,该流通过Writable接口缓冲传入的写入数据,并通过Readable接口读回数据。

const { Duplex } = require('node:stream');
const kSource = Symbol('source');

class MyDuplex extends Duplex {
  constructor(source, options) {
    super(options);
    this[kSource] = source;
  }

  _write(chunk, encoding, callback) {
    // The underlying source only deals with strings.
    if (Buffer.isBuffer(chunk))
      chunk = chunk.toString();
    this[kSource].writeSomeData(chunk);
    callback();
  }

  _read(size) {
    this[kSource].fetchSomeData(size, (data, encoding) => {
      this.push(Buffer.from(data, encoding));
    });
  }
} 

Duplex流最重要的方面是ReadableWritable端彼此独立运行,尽管共存于单个对象实例中。

对象模式双工流#

对于Duplex流,可以使用readableObjectMode专门为ReadableWritable侧设置 objectMode 和 分别为writableObjectMode选项。

例如,在下面的示例中,创建了一个新的Transform流(这是一种Duplex流),其对象模式Writable端接受在Readable侧转换为十六进制字符串的 JavaScript 数字。

const { Transform } = require('node:stream');

// All Transform streams are also Duplex Streams.
const myTransform = new Transform({
  writableObjectMode: true,

  transform(chunk, encoding, callback) {
    // Coerce the chunk to a number if necessary.
    chunk |= 0;

    // Transform the chunk into something else.
    const data = chunk.toString(16);

    // Push the data onto the readable queue.
    callback(null, '0'.repeat(data.length % 2) + data);
  },
});

myTransform.setEncoding('ascii');
myTransform.on('data', (chunk) => console.log(chunk));

myTransform.write(1);
// Prints: 01
myTransform.write(10);
// Prints: 0a
myTransform.write(100);
// Prints: 64 

实现转换流#

Transform流是一个 Duplex流,其中输出是根据输入以某种方式计算的。示例包括 压缩、加密或解密数据的zlib流或加密流。

不要求输出与输入大小相同、块数相同或同时到达。例如,一个Hash流只会有一个输出块,该输出块在输入结束时提供。zlib流将产生比其输入小得多或大得多的输出。

stream.Transform类被扩展以实现Transform流。

stream.Transform类原型继承自stream.Duplex并实现其自己版本的writable._write()readable._read()方法。自定义Transform实现必须 实现transform._transform()方法,也可以 实现transform._flush()方法。

使用Transform流时必须小心,因为如果 Readable上的输出写入到流中,则写入流的数据可能会导致流的 Writable端暂停。边没有被消耗。

new stream.Transform([options])#
const { Transform } = require('node:stream');

class MyTransform extends Transform {
  constructor(options) {
    super(options);
    // ...
  }
} 

或者,当使用 ES6 之前风格的构造函数时:

const { Transform } = require('node:stream');
const util = require('node:util');

function MyTransform(options) {
  if (!(this instanceof MyTransform))
    return new MyTransform(options);
  Transform.call(this, options);
}
util.inherits(MyTransform, Transform); 

或者,使用简化的构造函数方法:

const { Transform } = require('node:stream');

const myTransform = new Transform({
  transform(chunk, encoding, callback) {
    // ...
  },
}); 
事件:'end'#

'end'事件来自stream.Readable类。'end'事件在所有数据输出后发出,这是在调用 transform._flush()中的回调之后发生的。如果出现错误,则 不应发出'end'

事件:'finish'#

'finish'事件来自stream.Writable类。'finish'事件在调用stream.end()且所有块已被stream._transform()处理 后发出。如果出现错误,则不应发出'finish'

transform._flush(callback)#
  • callback <Function>刷新剩余数据时调用的回调函数(可选地带有错误参数和数据)。

该函数不得由应用程序代码直接调用。它应该由子类实现,并且仅由内部Readable类方法调用。

在某些情况下,转换操作可能需要在流末尾发出额外的数据位。例如,zlib压缩流将存储一定量的内部状态,用于最佳压缩输出。然而,当流结束时,需要刷新附加数据,以便压缩数据完整。

自定义Transform实现可以实现transform._flush() 方法。当不再有要消耗的写入数据时,但在发出'end'事件以指示Readable流结束 之前,将调用此函数。

transform._flush()实现中,可以根据需要调用transform.push()方法零次或多次。当刷新操作完成时,必须调用callback函数。

transform._flush()方法以下划线为前缀,因为它是定义它的类的内部函数,并且永远不应该由用户程序直接调用。

transform._transform(chunk, encoding, callback)#
  • chunk <缓冲区> | <字符串> | <any>要转换的 Buffer,从传递给stream.write()string转换而来。如果流的 decodeStrings选项是false或者流在对象模式下运行,则块将不会被转换,并且将是传递给 stream.write()的任何内容。
  • encoding <string>如果块是字符串,则这是编码类型。如果 chunk 是一个缓冲区,那么这是特殊值'buffer'。在这种情况下请忽略它。
  • callback <Function>处理提供的chunk后调用的回调函数(可选地带有错误参数和数据)。

该函数不得由应用程序代码直接调用。它应该由子类实现,并且仅由内部Readable类方法调用。

所有Transform流实现都必须提供_transform() 方法来接受输入并生成输出。transform._transform()实现 处理正在写入的字节,计算输出,然后使用transform.push()方法将该输出传递到可读部分。

transform.push()方法可以被调用零次或多次以从单个输入块生成输出,具体取决于作为该块的结果要输出多少。

任何给定的输入数据块都可能不会生成任何输出。

仅当当前块完全消耗时才必须调用callback函数。如果处理输入时发生错误,则传递给callback 的第一个参数必须是Error对象,否则为null。如果将第二个参数传递给callback,它将被转发到 transform.push()方法,但前提是第一个参数为 false。换句话说,以下内容是等效的:

transform.prototype._transform = function(data, encoding, callback) {
  this.push(data);
  callback();
};

transform.prototype._transform = function(data, encoding, callback) {
  callback(null, data);
}; 

transform._transform()方法以下划线为前缀,因为它是定义它的类的内部函数,并且永远不应该由用户程序直接调用。

transform._transform()永远不会被并行调用;流实现队列机制,要接收下一个块,必须同步或异步调用callback

类:stream.PassThrough#

stream.PassThrough类是Transform流的简单实现 ,它只是将输入字节传递到输出。其目的主要是用于示例和测试,但在某些用例中, stream.PassThrough可用作新型流的构建块。

补充注意#

流与异步生成器和异步迭代器的兼容性#

在 JavaScript 中异步生成器和迭代器的支持下,异步生成器此时实际上是一流的语言级流构造。

下面提供了将 Node.js 流与异步生成器和异步迭代器结合使用的一些常见互操作案例。

使用异步迭代器使用可读流#
(async function() {
  for await (const chunk of readable) {
    console.log(chunk);
  }
})(); 

异步迭代器在流上注册一个永久错误处理程序,以防止任何未处理的销毁后错误。

使用异步生成器创建可读流#

可以使用Readable.from()实用方法从异步生成器创建 Node.js 可读流:

const { Readable } = require('node:stream');

const ac = new AbortController();
const signal = ac.signal;

async function * generate() {
  yield 'a';
  await someLongRunningFn({ signal });
  yield 'b';
  yield 'c';
}

const readable = Readable.from(generate());
readable.on('close', () => {
  ac.abort();
});

readable.on('data', (chunk) => {
  console.log(chunk);
}); 
从异步迭代器通过管道传输到可写流#

从异步迭代器写入可写流时,请确保正确处理背压和错误。stream.pipeline()抽象了背压和背压相关错误的处理:

const fs = require('node:fs');
const { pipeline } = require('node:stream');
const { pipeline: pipelinePromise } = require('node:stream/promises');

const writable = fs.createWriteStream('./file');

const ac = new AbortController();
const signal = ac.signal;

const iterator = createIterator({ signal });

// Callback Pattern
pipeline(iterator, writable, (err, value) => {
  if (err) {
    console.error(err);
  } else {
    console.log(value, 'value returned');
  }
}).on('close', () => {
  ac.abort();
});

// Promise Pattern
pipelinePromise(iterator, writable)
  .then((value) => {
    console.log(value, 'value returned');
  })
  .catch((err) => {
    console.error(err);
    ac.abort();
  }); 

与旧版 Node.js 的兼容性#

在 Node.js 0.10 之前,Readable流接口更简单,但功能也较差且不太有用。

  • 'data' 事件将立即开始发出,而不是等待调用 stream.read() 方法 。需要执行一定量工作来决定如何处理数据的应用程序需要将读取的数据存储到缓冲区中,以便数据不会丢失。
  • stream.pause()方法是建议性的,而不是保证性的。这意味着即使流处于暂停状态,仍然需要准备好接收 'data'事件。

在 Node.js 0.10 中,添加了Readable类。为了向后兼容较旧的 Node.js 程序,当添加 'data'事件处理程序或调用stream.resume()方法时,Readable流会切换到“流动模式”叫。其效果是,即使不使用新的stream.read()方法和 'readable'事件,也不再需要担心丢失 'data'块。

虽然大多数应用程序将继续正常运行,但这会在以下情况下引入边缘情况:

例如,考虑以下代码:

// WARNING!  BROKEN!
net.createServer((socket) => {

  // We add an 'end' listener, but never consume the data.
  socket.on('end', () => {
    // It will never get here.
    socket.end('The message was received but was not processed.\n');
  });

}).listen(1337); 

在 Node.js 0.10 之前,传入的消息数据将被简单地丢弃。然而,在 Node.js 0.10 及更高版本中,套接字永远保持暂停状态。

这种情况的解决方法是调用 stream.resume()方法来开始数据流:

// Workaround.
net.createServer((socket) => {
  socket.on('end', () => {
    socket.end('The message was received but was not processed.\n');
  });

  // Start the flow of data, discarding it.
  socket.resume();
}).listen(1337); 

除了新的Readable流切换到流动模式之外,0.10 之前的样式流还可以使用 readable.wrap()方法包装在Readable 类中。

readable.read(0)#

在某些情况下,需要触发底层可读流机制的刷新,而不实际消耗任何数据。在这种情况下,可以调用readable.read(0),它将始终返回null

如果内部读取缓冲区低于highWaterMark,并且流当前未读取,则调用stream.read(0)将触发低级stream._read()调用。

虽然大多数应用程序几乎不需要这样做,但 Node.js 中有些情况会这样做,特别是在 Readable流类内部。

readable.push('')#

不建议使用readable.push('')

将零字节字符串BufferUint8Array推送到不处于对象模式的流会产生有趣的副作用。因为它是对readable.push()的调用 ,所以该调用将结束读取过程。但是,由于参数是空字符串,因此不会将任何数据添加到可读缓冲区中,因此用户无法使用任何内容。

调用readable.setEncoding()后出现 highWaterMark差异#

使用readable.setEncoding()将改变highWaterMark在非对象模式下的运行方式 。

通常,当前缓冲区的大小是根据 highWaterMark (以字节为单位)来衡量的。但是,在调用setEncoding()后,比较函数将开始测量缓冲区的大小(以字符为单位)

在使用latin1ascii的常见情况下,这不是问题。但建议在处理可能包含多字节字符的字符串时注意此行为。

NodeJS中文文档为Read dev Docs平台提供托管,中文NodeJS文档均由英文版NodeJS文档翻译,版权属于nodejs.org