- 断言测试
- 异步上下文跟踪
- 异步钩子
- 缓冲(Buffer)
- C++ 插件
- 使用 Node-API 的 C/C++ 插件
- C++ 嵌入 Node环境
- 子进程(Child processes)
- 集群(Cluster)
- 命令行选项
- 控制台(Console)
- 核心包(Corepack)
- 加密(Crypto)
- 调试器(Debugger)
- 已弃用的 API
- 诊断通道(Diagnostics Channel)
- 域名系统(DNS)
- 域(Domain)
- 错误(Errors)
- 事件(Events)
- 文件系统(File system)
- 全局变量(Globals)
- HTTP
- HTTP/2
- HTTPS
- 检查器(Inspector)
- 国际化
- 模块:CommonJS 模块
- 模块:ECMAScript 模块
- 模块:
node:module
API - 模块:packages 模块
- 网络(Net)
- 系统(OS)
- 路径(Path)
- 性能挂钩(Performance hooks)
- 性能挂钩(Permissions)
- 进程(Process)
- Punycode 国际化域名编码
- 查询字符串(Query strings)
- 命令行库(Readline)
- REPL 交互式编程环境
- 诊断报告
- 单个可执行应用程序
- Stream 流
- 字符串解码器
- 单元测试
- 定时器(Timers)
- 传输层安全/SSL
- 跟踪事件
- TTY
- UDP/数据报
- URL
- 实用程序
- V8
- 虚拟机
- WebAssembly
- Web加密 API(Web Crypto API)
- 网络流 API(Web Streams API)
- 工作线程(Worker threads)
- zlib
Node.js v18.18.2 文档
- Node.js v18.18.2
-
►
目录
- 网络流 API
- 概述
- 应用程序编程接口
- 类:
ReadableStream
new ReadableStream([underlyingSource [, strategy]])
readableStream.locked
readableStream.cancel([reason])
readableStream.getReader([options])
readableStream.pipeThrough(transform[, options])
readableStream.pipeTo(destination[, options])
readableStream.tee()
readableStream.values([options])
- 异步迭代
- 使用
postMessage()
转移
- 类:
ReadableStreamDefaultReader
- 类:
ReadableStreamBYOBReader
- 类:
ReadableStreamDefaultController
- 类:
ReadableByteStreamController
- Class:
ReadableStreamBYOBRequest
- Class:
WritableStream
- Class:
WritableStreamDefaultWriter
new WritableStreamDefaultWriter(stream)
writableStreamDefaultWriter.abort([reason])
writableStreamDefaultWriter.close()
writableStreamDefaultWriter.closed
writableStreamDefaultWriter.desiredSize
writableStreamDefaultWriter.ready
writableStreamDefaultWriter.releaseLock()
writableStreamDefaultWriter.write([chunk])
- Class:
WritableStreamDefaultController
- Class:
TransformStream
- Class:
TransformStreamDefaultController
- Class:
ByteLengthQueuingStrategy
- Class:
CountQueuingStrategy
- Class:
TextEncoderStream
- Class:
TextDecoderStream
- Class:
CompressionStream
- Class:
DecompressionStream
- Utility Consumers
- 类:
- 网络流 API
-
►
索引
- Assertion testing
- Asynchronous context tracking
- Async hooks
- Buffer
- C++ addons
- C/C++ addons with Node-API
- C++ embedder API
- Child processes
- Cluster
- Command-line options
- Console
- Corepack
- Crypto
- Debugger
- Deprecated APIs
- Diagnostics Channel
- DNS
- Domain
- Errors
- Events
- File system
- Globals
- HTTP
- HTTP/2
- HTTPS
- Inspector
- Internationalization
- Modules: CommonJS modules
- Modules: ECMAScript modules
- Modules:
node:module
API - Modules: Packages
- Net
- 系统(OS)
- 路径(Path)
- Performance hooks
- Permissions
- 进程(Process)
- Punycode
- Query strings
- 命令行库(Readline)
- REPL 交互式编程环境
- Report
- Single executable applications
- Stream
- String decoder
- Test runner
- Timers
- TLS/SSL
- Trace events
- TTY
- UDP/datagram
- URL
- Utilities
- V8
- VM
- WASI
- Web Crypto API
- Web Streams API
- Worker threads
- Zlib
- ► 其他版本
- ► 选项
目录
- 网络流 API
- 概述
- 应用程序编程接口
- 类:
ReadableStream
new ReadableStream([underlyingSource [, strategy]])
readableStream.locked
readableStream.cancel([reason])
readableStream.getReader([options])
readableStream.pipeThrough(transform[, options])
readableStream.pipeTo(destination[, options])
readableStream.tee()
readableStream.values([options])
- 异步迭代
- 使用
postMessage()
转移
- 类:
ReadableStreamDefaultReader
- 类:
ReadableStreamBYOBReader
- 类:
ReadableStreamDefaultController
- 类:
ReadableByteStreamController
- 类:
ReadableStreamBYOBRequest
- 类:
WritableStream
- 类:
WritableStreamDefaultWriter
new WritableStreamDefaultWriter(stream)
writableStreamDefaultWriter.abort([reason])
writableStreamDefaultWriter.close()
writableStreamDefaultWriter.closed
writableStreamDefaultWriter.desiredSize
writableStreamDefaultWriter.ready
writableStreamDefaultWriter.releaseLock()
writableStreamDefaultWriter.write([chunk])
- 类:
WritableStreamDefaultController
- 类:
TransformStream
- 类:
TransformStreamDefaultController
- 类:
ByteLengthQueuingStrategy
- 类:
CountQueuingStrategy
- 类:
TextEncoderStream
- 类:
TextDecoderStream
- 类:
CompressionStream
- 类:
DecompressionStream
- 实用程序消费者
- 类:
网络流 API#
概述#
WHATWG Streams 标准(或“Web 流”)定义了用于处理流数据的 API。它类似于 Node.js Streams API,但出现较晚,并已成为跨许多 JavaScript 环境流式传输数据的“标准”API。
对象主要分为三种类型:
ReadableStream
- 表示流数据源。WritableStream
- 表示流数据的目的地。TransformStream
- 表示转换流数据的算法。
示例ReadableStream
#
此示例创建一个简单的ReadableStream
,它
每秒推送一次当前的performance.now()
时间戳。异步迭代用于从流中读取数据。
import {
ReadableStream,
} from 'node:stream/web';
import {
setInterval as every,
} from 'node:timers/promises';
import {
performance,
} from 'node:perf_hooks';
const SECOND = 1000;
const stream = new ReadableStream({
async start(controller) {
for await (const _ of every(SECOND))
controller.enqueue(performance.now());
},
});
for await (const value of stream)
console.log(value);
const {
ReadableStream,
} = require('node:stream/web');
const {
setInterval: every,
} = require('node:timers/promises');
const {
performance,
} = require('node:perf_hooks');
const SECOND = 1000;
const stream = new ReadableStream({
async start(controller) {
for await (const _ of every(SECOND))
controller.enqueue(performance.now());
},
});
(async () => {
for await (const value of stream)
console.log(value);
})();
应用程序编程接口#
类:ReadableStream
#
new ReadableStream([underlyingSource [, strategy]])
#
underlyingSource
<对象>start
<Function>创建ReadableStream
时立即调用的用户定义函数。controller
<ReadableStreamDefaultController> | <可读字节流控制器>- 返回:
undefined
或通过undefined
履行的 Promise 。
pull
<Function>当ReadableStream
内部队列未满时重复调用的用户定义函数 。该操作可以是同步的或异步的。如果是异步的,则在满足之前返回的 Promise 之前,不会再次调用该函数。controller
<ReadableStreamDefaultController> | <可读字节流控制器>- 返回:用
undefined
履行的 Promise 。
cancel
<Function>当取消ReadableStream
时调用的用户定义函数 。reason
<任意>- 返回:用
undefined
履行的 Promise 。
type
<string>必须是'bytes'
或undefined
。autoAllocateChunkSize
<number>仅当type
等于'bytes'
时使用。当设置为非零值时,视图缓冲区会自动分配给ReadableByteStreamController.byobRequest
。如果未设置,则必须使用流的内部队列通过默认读取器ReadableStreamDefaultReader
传输数据。
strategy
<对象>highWaterMark
<number>应用反压之前的最大内部队列大小。size
<Function>用户定义的函数,用于识别每个数据块的大小。
readableStream.locked
#
- 类型:<boolean>如果此<ReadableStream>有事件读取器,则
设置为
true
。
readableStream.locked
属性默认为false
,当有事件读取器消耗流数据时,会切换为true
。
readableStream.cancel([reason])
#
reason
<任意>- 返回:取消完成后,用
undefined
履行 Promise 。
readableStream.getReader([options])
#
options
<对象>mode
<字符串>'byob'
或undefined
- 返回:<ReadableStreamDefaultReader> | <ReadableStreamBYOBReader>
import { ReadableStream } from 'node:stream/web';
const stream = new ReadableStream();
const reader = stream.getReader();
console.log(await reader.read());
const { ReadableStream } = require('node:stream/web');
const stream = new ReadableStream();
const reader = stream.getReader();
reader.read().then(console.log);
导致readableStream.locked
为true
。
readableStream.pipeThrough(transform[, options])
#
transform
<对象>readable
<ReadableStream>ReadableStream
将从此ReadableStream
接收可能修改的数据推送到的ReadableStream
。writable
<WritableStream>此ReadableStream
的数据将写入的WritableStream
。
options
<对象>preventAbort
<boolean>当true
时,此ReadableStream
中的错误 不会导致transform.writable
中止。preventCancel
<boolean>当true
时,目标transform.writable
中的错误不会导致此ReadableStream
被取消。preventClose
<boolean>当true
时,关闭此ReadableStream
不会导致transform.writable
关闭。signal
<AbortSignal>允许使用<AbortController>取消数据传输。
- 返回:<ReadableStream>来自
transform.readable
。
将此<ReadableStream>连接到transform
参数中提供的一对<ReadableStream>和
<WritableStream>,以便将此<ReadableStream>中的数据写入到transform.writable
,可能会进行转换,然后推送到transform.readable
。配置管道后,将返回transform.readable
。
当管道操作处于事件状态时,导致readableStream.locked
为true
。
import {
ReadableStream,
TransformStream,
} from 'node:stream/web';
const stream = new ReadableStream({
start(controller) {
controller.enqueue('a');
},
});
const transform = new TransformStream({
transform(chunk, controller) {
controller.enqueue(chunk.toUpperCase());
},
});
const transformedStream = stream.pipeThrough(transform);
for await (const chunk of transformedStream)
console.log(chunk);
const {
ReadableStream,
TransformStream,
} = require('node:stream/web');
const stream = new ReadableStream({
start(controller) {
controller.enqueue('a');
},
});
const transform = new TransformStream({
transform(chunk, controller) {
controller.enqueue(chunk.toUpperCase());
},
});
const transformedStream = stream.pipeThrough(transform);
(async () => {
for await (const chunk of transformedStream)
console.log(chunk);
})();
readableStream.pipeTo(destination[, options])
#
destination
<WritableStream>此ReadableStream
的数据将写入其中的 <WritableStream>。options
<对象>preventAbort
<boolean>当true
时,此ReadableStream
中的错误 不会导致destination
中止。preventCancel
<boolean>当true
时,destination
中的错误 不会导致此ReadableStream
被取消。preventClose
<boolean>当true
时,关闭此ReadableStream
不会导致destination
关闭。signal
<AbortSignal>允许使用<AbortController>取消数据传输。
- 返回:用
undefined
履行的 Promise
当管道操作处于事件状态时,导致readableStream.locked
为true
。
readableStream.tee()
#
返回一对新的<ReadableStream>实例,此
ReadableStream
的数据将被转发到该实例。每个人都会收到相同的数据。
导致readableStream.locked
为true
。
readableStream.values([options])
#
options
<对象>preventCancel
<boolean>当true
时,防止 在异步迭代器突然终止时关闭 <ReadableStream> 。默认值:false
。
创建并返回一个可用于使用此
ReadableStream
的数据的异步迭代器。
当异步迭代器处于事件状态时,导致readableStream.locked
为true
。
import { Buffer } from 'node:buffer';
const stream = new ReadableStream(getSomeSource());
for await (const chunk of stream.values({ preventCancel: true }))
console.log(Buffer.from(chunk).toString());
异步迭代#
<ReadableStream>对象支持使用
for await
语法的异步迭代器协议。
import { Buffer } from 'node:buffer';
const stream = new ReadableStream(getSomeSource());
for await (const chunk of stream)
console.log(Buffer.from(chunk).toString());
异步迭代器将消耗<ReadableStream>直到它终止。
默认情况下,如果异步迭代器提前退出(通过break
、
return
或throw
),则<ReadableStream>将被关闭。要防止自动关闭 <ReadableStream> ,请使用readableStream.values()
方法获取异步迭代器并将preventCancel
选项设置为
true
。
<ReadableStream>不得被锁定(即,它不得有现有的事件读取器)。在异步迭代期间,<ReadableStream>将被锁定。
使用postMessage()
转移#
可以使用<MessagePort>传输<ReadableStream>实例。
const stream = new ReadableStream(getReadableSourceSomehow());
const { port1, port2 } = new MessageChannel();
port1.onmessage = ({ data }) => {
data.getReader().read().then((chunk) => {
console.log(chunk);
});
};
port2.postMessage(stream, [stream]);
类:ReadableStreamDefaultReader
#
默认情况下,调用不带参数的readableStream.getReader()
将返回ReadableStreamDefaultReader
的实例。默认读取器将通过流传递的数据块视为不透明值,这允许 <ReadableStream>通常可以处理任何 JavaScript 值。
new ReadableStreamDefaultReader(stream)
#
stream
<ReadableStream>
创建一个锁定到给定<ReadableStream>的新<ReadableStreamDefaultReader>。
readableStreamDefaultReader.cancel([reason])
#
reason
<任意>- 返回:用
undefined
履行的 Promise 。
取消<ReadableStream>并返回一个在取消基础流时实现的 Promise 。
readableStreamDefaultReader.closed
#
- 类型:<Promise>当关联的
<ReadableStream>被关闭或拒绝时,如果流发生错误或在流完成关闭之前释放了读取器的锁,则用
undefined
实现。
readableStreamDefaultReader.read()
#
从底层<ReadableStream>请求下一个数据块 ,并返回一个 Promise ,一旦数据可用,该 Promise 将被履行。
readableStreamDefaultReader.releaseLock()
#
释放该读取器对底层<ReadableStream>的锁定。
类:ReadableStreamBYOBReader
#
ReadableStreamBYOBReader
是面向字节的<ReadableStream>的替代使用者(
当
ReadableStream
已创建)。
BYOB
是“自带缓冲区”的缩写。这种模式允许更有效地读取面向字节的数据,从而避免无关的复制。
import {
open,
} from 'node:fs/promises';
import {
ReadableStream,
} from 'node:stream/web';
import { Buffer } from 'node:buffer';
class Source {
type = 'bytes';
autoAllocateChunkSize = 1024;
async start(controller) {
this.file = await open(new URL(import.meta.url));
this.controller = controller;
}
async pull(controller) {
const view = controller.byobRequest?.view;
const {
bytesRead,
} = await this.file.read({
buffer: view,
offset: view.byteOffset,
length: view.byteLength,
});
if (bytesRead === 0) {
await this.file.close();
this.controller.close();
}
controller.byobRequest.respond(bytesRead);
}
}
const stream = new ReadableStream(new Source());
async function read(stream) {
const reader = stream.getReader({ mode: 'byob' });
const chunks = [];
let result;
do {
result = await reader.read(Buffer.alloc(100));
if (result.value !== undefined)
chunks.push(Buffer.from(result.value));
} while (!result.done);
return Buffer.concat(chunks);
}
const data = await read(stream);
console.log(Buffer.from(data).toString());
new ReadableStreamBYOBReader(stream)
#
stream
<ReadableStream>
创建一个锁定到给定<ReadableStream>的新ReadableStreamBYOBReader
。
readableStreamBYOBReader.cancel([reason])
#
reason
<任意>- 返回:用
undefined
履行的 Promise 。
取消<ReadableStream>并返回一个在取消基础流时实现的 Promise 。
readableStreamBYOBReader.closed
#
- 类型:<Promise>当关联的
<ReadableStream>被关闭或拒绝时,如果流发生错误或在流完成关闭之前释放了读取器的锁,则用
undefined
实现。
readableStreamBYOBReader.read(view)
#
从底层<ReadableStream>请求下一个数据块 ,并返回一个 Promise ,一旦数据可用,该 Promise 将被履行。
不要将池化的<Buffer>对象实例传递给此方法。池化的Buffer
对象是使用Buffer.allocUnsafe()
或Buffer.from()
创建的,或者通常由各种node:fs
模块回调返回。这些类型的Buffer
使用共享的底层
<ArrayBuffer>对象,该对象包含来自所有池化Buffer
实例的所有数据。当Buffer
、<TypedArray>或<DataView>传递到readableStreamBYOBReader.read()
时,视图的基础ArrayBuffer
被分离,从而使所有可能存在的现有视图失效关于那个ArrayBuffer
。这可能会给您的应用程序带来灾难性的后果。
readableStreamBYOBReader.releaseLock()
#
释放该读取器对底层<ReadableStream>的锁定。
类:ReadableStreamDefaultController
#
每个<ReadableStream>都有一个控制器,负责流队列的内部状态和管理。ReadableStreamDefaultController
是非面向字节的ReadableStream
的默认控制器实现。
readableStreamDefaultController.close()
#
关闭与此控制器关联的<ReadableStream> 。
readableStreamDefaultController.desiredSize
#
- 类型:<数字>
返回填充<ReadableStream>队列的剩余数据量。
readableStreamDefaultController.enqueue([chunk])
#
chunk
<任意>
将新的数据块追加到<ReadableStream>的队列中。
readableStreamDefaultController.error([error])
#
error
<任意>
发出错误信号,导致 <ReadableStream>出错并关闭。
类:ReadableByteStreamController
#
每个<ReadableStream>都有一个控制器,负责流队列的内部状态和管理。ReadableByteStreamController
用于面向字节的ReadableStream
。
readableByteStreamController.byobRequest
#
readableByteStreamController.close()
#
关闭与此控制器关联的<ReadableStream> 。
readableByteStreamController.desiredSize
#
- 类型:<数字>
返回填充<ReadableStream>队列的剩余数据量。
readableByteStreamController.enqueue(chunk)
#
将新的数据块追加到<ReadableStream>的队列中。
readableByteStreamController.error([error])
#
error
<任意>
发出错误信号,导致 <ReadableStream>出错并关闭。
类:ReadableStreamBYOBRequest
#
在面向字节的流中使用ReadableByteStreamController
以及使用 ReadableStreamBYOBReader
时,readableByteStreamController.byobRequest
属性提供对表示的ReadableStreamBYOBRequest
实例的访问当前的读取请求。该对象用于访问已为读取请求填充而提供的ArrayBuffer
/ TypedArray
,并提供用于发出数据已提供信号的方法。
readableStreamBYOBRequest.respond(bytesWritten)
#
bytesWritten
<数字>
表示bytesWritten
个字节已写入readableStreamBYOBRequest.view
。
readableStreamBYOBRequest.respondWithNewView(view)
#
表示请求已通过写入新的Buffer
、TypedArray
或DataView
的字节得到满足。
readableStreamBYOBRequest.view
#
类:WritableStream
#
WritableStream
是流数据发送到的目的地。
import {
WritableStream,
} from 'node:stream/web';
const stream = new WritableStream({
write(chunk) {
console.log(chunk);
},
});
await stream.getWriter().write('Hello World');
new WritableStream([underlyingSink[, strategy]])
#
underlyingSink
<对象>start
<Function>创建WritableStream
时立即调用的用户定义函数。controller
<WritableStreamDefaultController>- 返回:
undefined
或通过undefined
履行的 Promise 。
write
<Function>当一块数据写入WritableStream
时调用的用户定义函数。chunk
<任意>controller
<WritableStreamDefaultController>- 返回:用
undefined
履行的 Promise 。
close
<Function>关闭WritableStream
时调用的用户定义函数 。- 返回:用
undefined
履行的 Promise 。
- 返回:用
abort
<Function>调用以突然关闭WritableStream
的用户定义函数。reason
<任意>- 返回:用
undefined
履行的 Promise 。
type
<any>type
选项保留供将来使用,并且必须未定义。
strategy
<对象>highWaterMark
<number>应用反压之前的最大内部队列大小。size
<Function>用户定义的函数,用于识别每个数据块的大小。
writableStream.abort([reason])
#
reason
<任意>- 返回:用
undefined
履行的 Promise 。
突然终止WritableStream
。所有排队的写入都将被取消,并且其关联的 Promise 将被拒绝。
writableStream.close()
#
- 返回:用
undefined
履行的 Promise 。
当不需要额外写入时关闭WritableStream
。
writableStream.getWriter()
#
创建并创建一个新的写入器实例,可用于将数据写入WritableStream
。
writableStream.locked
#
- 类型:<布尔值>
writableStream.locked
属性默认为false
,当有一个事件的 writer 附加到此
WritableStream
时,会切换为true
。
使用 postMessage() 传输#
可以使用<MessagePort>传输<WritableStream>实例。
const stream = new WritableStream(getWritableSinkSomehow());
const { port1, port2 } = new MessageChannel();
port1.onmessage = ({ data }) => {
data.getWriter().write('hello');
};
port2.postMessage(stream, [stream]);
类:WritableStreamDefaultWriter
#
new WritableStreamDefaultWriter(stream)
#
stream
<WritableStream>
创建一个锁定到给定
WritableStream
的新WritableStreamDefaultWriter
。
writableStreamDefaultWriter.abort([reason])
#
reason
<任意>- 返回:用
undefined
履行的 Promise 。
突然终止WritableStream
。所有排队的写入都将被取消,并且其关联的 Promise 将被拒绝。
writableStreamDefaultWriter.close()
#
- 返回:用
undefined
履行的 Promise 。
当不需要额外写入时关闭WritableStream
。
writableStreamDefaultWriter.closed
#
- 类型:<Promise>当关联的
<WritableStream>被关闭或拒绝时,如果流发生错误或在流完成关闭之前释放写入者的锁,则用
undefined
实现。
writableStreamDefaultWriter.desiredSize
#
- 类型:<数字>
填充<WritableStream>队列所需的数据量。
writableStreamDefaultWriter.ready
#
- type:当编写器准备好使用时,用
undefined
履行的 Promise 。
writableStreamDefaultWriter.releaseLock()
#
释放该写入者对底层<ReadableStream>的锁定。
writableStreamDefaultWriter.write([chunk])
#
chunk
:<任何>- 返回:用
undefined
履行的 Promise 。
将新的数据块追加到<WritableStream>的队列中。
类:WritableStreamDefaultController
#
WritableStreamDefaultController
管理<WritableStream>的内部状态。
writableStreamDefaultController.error([error])
#
error
<任意>
由用户代码调用以表示处理WritableStream
数据时发生错误。调用时,<WritableStream>将被中止,同时取消当前挂起的写入。
writableStreamDefaultController.signal
#
- 类型:<AbortSignal>一个
AbortSignal
,可用于在<WritableStream>中止时取消挂起的写入或关闭操作。
类:TransformStream
#
TransformStream
由<ReadableStream>和<WritableStream>组成,它们相互连接,以便写入到WritableStream
的数据在被推送到ReadableStream
的队列。
import {
TransformStream,
} from 'node:stream/web';
const transform = new TransformStream({
transform(chunk, controller) {
controller.enqueue(chunk.toUpperCase());
},
});
await Promise.all([
transform.writable.getWriter().write('A'),
transform.readable.getReader().read(),
]);
new TransformStream([transformer[, writableStrategy[, readableStrategy]]])
#
transformer
<对象>start
<Function>创建TransformStream
时立即调用的用户定义函数。controller
<TransformStreamDefaultController>- 返回:
undefined
或用undefined
履行的 Promise
transform
<Function>一个用户定义的函数,用于接收并可能修改写入transformStream.writable
的数据块,然后将其转发到transformStream.readable
。chunk
<任意>controller
<TransformStreamDefaultController>- 返回:用
undefined
履行的 Promise 。
flush
<Function>在TransformStream
的可写端关闭之前立即调用的用户定义函数,表示转换过程结束。controller
<TransformStreamDefaultController>- 返回:用
undefined
履行的 Promise 。
readableType
<any>readableType
选项保留供将来使用,并且必须为undefined
。writableType
<any>writableType
选项保留供将来使用,并且必须为undefined
。
writableStrategy
<对象>highWaterMark
<number>应用反压之前的最大内部队列大小。size
<Function>用户定义的函数,用于识别每个数据块的大小。
readableStrategy
<对象>highWaterMark
<number>应用反压之前的最大内部队列大小。size
<Function>用户定义的函数,用于识别每个数据块的大小。
transformStream.readable
#
- 类型:<可读流>
transformStream.writable
#
- 类型:<可写流>
使用 postMessage() 传输#
可以使用<MessagePort>传输<TransformStream>实例。
const stream = new TransformStream();
const { port1, port2 } = new MessageChannel();
port1.onmessage = ({ data }) => {
const { writable, readable } = data;
// ...
};
port2.postMessage(stream, [stream]);
类:TransformStreamDefaultController
#
TransformStreamDefaultController
管理TransformStream
的内部状态。
transformStreamDefaultController.desiredSize
#
- 类型:<数字>
填充可读端队列所需的数据量。
transformStreamDefaultController.enqueue([chunk])
#
chunk
<任意>
将一块数据附加到可读端的队列中。
transformStreamDefaultController.error([reason])
#
reason
<任意>
向可读和可写端发出信号,表明在处理转换数据时发生了错误,导致双方突然关闭。
transformStreamDefaultController.terminate()
#
关闭传输的可读端并导致可写端因错误而突然关闭。
类:ByteLengthQueuingStrategy
#
new ByteLengthQueuingStrategy(options)
#
byteLengthQueuingStrategy.highWaterMark
#
- 类型:<数字>
byteLengthQueuingStrategy.size
#
类:CountQueuingStrategy
#
new CountQueuingStrategy(options)
#
countQueuingStrategy.highWaterMark
#
- 类型:<数字>
countQueuingStrategy.size
#
类:TextEncoderStream
#
new TextEncoderStream()
#
创建一个新的TextEncoderStream
实例。
textEncoderStream.encoding
#
- 类型:<字符串>
TextEncoderStream
实例支持的编码。
textEncoderStream.readable
#
- 类型:<可读流>
textEncoderStream.writable
#
- 类型:<可写流>
类:TextDecoderStream
#
new TextDecoderStream([encoding[, options]])
#
创建一个新的TextDecoderStream
实例。
textDecoderStream.encoding
#
- 类型:<字符串>
TextDecoderStream
实例支持的编码。
textDecoderStream.fatal
#
- 类型:<布尔值>
如果解码错误导致抛出TypeError
,则该值将为true
。
textDecoderStream.ignoreBOM
#
- 类型:<布尔值>
如果解码结果包含字节顺序标记,则该值将为true
。
textDecoderStream.readable
#
- 类型:<可读流>
textDecoderStream.writable
#
- 类型:<可写流>
类:CompressionStream
#
new CompressionStream(format)
#
format
<string>'deflate'
或'gzip'
之一。
compressionStream.readable
#
- 类型:<可读流>
compressionStream.writable
#
- 类型:<可写流>
类:DecompressionStream
#
new DecompressionStream(format)
#
format
<string>'deflate'
或'gzip'
之一。
decompressionStream.readable
#
- 类型:<可读流>
decompressionStream.writable
#
- 类型:<可写流>
实用程序消费者#
实用程序消费者函数提供了消费流的通用选项。
它们可以通过以下方式访问:
import {
arrayBuffer,
blob,
buffer,
json,
text,
} from 'node:stream/consumers';
const {
arrayBuffer,
blob,
buffer,
json,
text,
} = require('node:stream/consumers');
streamConsumers.arrayBuffer(stream)
#
stream
<ReadableStream> | <流.可读> | <异步迭代器>- 返回:<Promise>通过包含流的完整内容的
ArrayBuffer
来实现。
import { arrayBuffer } from 'node:stream/consumers';
import { Readable } from 'node:stream';
import { TextEncoder } from 'node:util';
const encoder = new TextEncoder();
const dataArray = encoder.encode('hello world from consumers!');
const readable = Readable.from(dataArray);
const data = await arrayBuffer(readable);
console.log(`from readable: ${data.byteLength}`);
const { arrayBuffer } = require('node:stream/consumers');
const { Readable } = require('node:stream');
const { TextEncoder } = require('node:util');
const encoder = new TextEncoder();
const dataArray = encoder.encode('hello world from consumers!');
const readable = Readable.from(dataArray);
arrayBuffer(readable).then((data) => {
console.log(`from readable: ${data.byteLength}`);
});
streamConsumers.blob(stream)
#
stream
<ReadableStream> | <流.可读> | <异步迭代器>- 返回:<Promise>通过包含流的完整内容的<Blob>实现。
import { blob } from 'node:stream/consumers';
const dataBlob = new Blob(['hello world from consumers!']);
const readable = dataBlob.stream();
const data = await blob(readable);
console.log(`from readable: ${data.size}`);
const { blob } = require('node:stream/consumers');
const dataBlob = new Blob(['hello world from consumers!']);
const readable = dataBlob.stream();
blob(readable).then((data) => {
console.log(`from readable: ${data.size}`);
});
streamConsumers.buffer(stream)
#
stream
<ReadableStream> | <流.可读> | <异步迭代器>- 返回:<Promise>实现包含流的全部内容的<Buffer> 。
import { buffer } from 'node:stream/consumers';
import { Readable } from 'node:stream';
import { Buffer } from 'node:buffer';
const dataBuffer = Buffer.from('hello world from consumers!');
const readable = Readable.from(dataBuffer);
const data = await buffer(readable);
console.log(`from readable: ${data.length}`);
const { buffer } = require('node:stream/consumers');
const { Readable } = require('node:stream');
const { Buffer } = require('node:buffer');
const dataBuffer = Buffer.from('hello world from consumers!');
const readable = Readable.from(dataBuffer);
buffer(readable).then((data) => {
console.log(`from readable: ${data.length}`);
});
streamConsumers.json(stream)
#
stream
<ReadableStream> | <流.可读> | <异步迭代器>- 返回:<Promise>将流的内容解析为 UTF-8 编码字符串,然后通过
JSON.parse()
传递。
import { json } from 'node:stream/consumers';
import { Readable } from 'node:stream';
const items = Array.from(
{
length: 100,
},
() => ({
message: 'hello world from consumers!',
}),
);
const readable = Readable.from(JSON.stringify(items));
const data = await json(readable);
console.log(`from readable: ${data.length}`);
const { json } = require('node:stream/consumers');
const { Readable } = require('node:stream');
const items = Array.from(
{
length: 100,
},
() => ({
message: 'hello world from consumers!',
}),
);
const readable = Readable.from(JSON.stringify(items));
json(readable).then((data) => {
console.log(`from readable: ${data.length}`);
});
streamConsumers.text(stream)
#
stream
<ReadableStream> | <流.可读> | <异步迭代器>- 返回:<Promise>实现将流的内容解析为 UTF-8 编码字符串。
import { text } from 'node:stream/consumers';
import { Readable } from 'node:stream';
const readable = Readable.from('Hello world from consumers!');
const data = await text(readable);
console.log(`from readable: ${data.length}`);
const { text } = require('node:stream/consumers');
const { Readable } = require('node:stream');
const readable = Readable.from('Hello world from consumers!');
text(readable).then((data) => {
console.log(`from readable: ${data.length}`);
});