- 断言测试
- 异步上下文跟踪
- 异步钩子
- 缓冲(Buffer)
- C++ 插件
- 使用 Node-API 的 C/C++ 插件
- C++ 嵌入 Node环境
- 子进程(Child processes)
- 集群(Cluster)
- 命令行选项
- 控制台(Console)
- 核心包(Corepack)
- 加密(Crypto)
- 调试器(Debugger)
- 已弃用的 API
- 诊断通道(Diagnostics Channel)
- 域名系统(DNS)
- 域(Domain)
- 错误(Errors)
- 事件(Events)
- 文件系统(File system)
- 全局变量(Globals)
- HTTP
- HTTP/2
- HTTPS
- 检查器(Inspector)
- 国际化
- 模块:CommonJS 模块
- 模块:ECMAScript 模块
- 模块:
node:module
API - 模块:packages 模块
- 网络(Net)
- 系统(OS)
- 路径(Path)
- 性能挂钩(Performance hooks)
- 性能挂钩(Permissions)
- 进程(Process)
- Punycode 国际化域名编码
- 查询字符串(Query strings)
- 命令行库(Readline)
- REPL 交互式编程环境
- 诊断报告
- 单个可执行应用程序
- Stream 流
- 字符串解码器
- 单元测试
- 定时器(Timers)
- 传输层安全/SSL
- 跟踪事件
- TTY
- UDP/数据报
- URL
- 实用程序
- V8
- 虚拟机
- WebAssembly
- Web加密 API(Web Crypto API)
- 网络流 API(Web Streams API)
- 工作线程(Worker threads)
- zlib
Node.js v18.18.2 文档
- Node.js v18.18.2
-
►
目录
- 异步上下文跟踪
- 介绍
- 类别:
AsyncLocalStorage
new AsyncLocalStorage()
- 静态方法:
AsyncLocalStorage.bind(fn)
- 静态方法:
AsyncLocalStorage.snapshot()
asyncLocalStorage.disable()
asyncLocalStorage.getStore()
asyncLocalStorage.enterWith(store)
asyncLocalStorage.run(store, callback[, ...args])
asyncLocalStorage.exit(callback[, ...args])
- 与
async/await
一起使用 - 故障排除:上下文丢失
- 类:
AsyncResource
new AsyncResource(type[, options])
- 静态方法:
AsyncResource.bind(fn[, type[, thisArg]])
asyncResource.bind(fn[, thisArg])
asyncResource.runInAsyncScope(fn[, thisArg, ...args])
asyncResource.emitDestroy()
asyncResource.asyncId()
asyncResource.triggerAsyncId()
- Using
AsyncResource
for aWorker
thread pool - Integrating
AsyncResource
withEventEmitter
- 异步上下文跟踪
-
►
索引
- Assertion testing
- Asynchronous context tracking
- Async hooks
- Buffer
- C++ addons
- C/C++ addons with Node-API
- C++ embedder API
- Child processes
- Cluster
- Command-line options
- Console
- Corepack
- Crypto
- Debugger
- Deprecated APIs
- Diagnostics Channel
- DNS
- Domain
- Errors
- Events
- File system
- Globals
- HTTP
- HTTP/2
- HTTPS
- Inspector
- Internationalization
- Modules: CommonJS modules
- Modules: ECMAScript modules
- Modules:
node:module
API - Modules: Packages
- Net
- 系统(OS)
- 路径(Path)
- Performance hooks
- Permissions
- 进程(Process)
- Punycode
- Query strings
- 命令行库(Readline)
- REPL 交互式编程环境
- Report
- Single executable applications
- Stream
- String decoder
- Test runner
- Timers
- TLS/SSL
- Trace events
- TTY
- UDP/datagram
- URL
- Utilities
- V8
- VM
- WASI
- Web Crypto API
- Web Streams API
- Worker threads
- Zlib
- ► 其他版本
- ► 选项
目录
- 异步上下文跟踪
- 介绍
- 类别:
AsyncLocalStorage
new AsyncLocalStorage()
- 静态方法:
AsyncLocalStorage.bind(fn)
- 静态方法:
AsyncLocalStorage.snapshot()
asyncLocalStorage.disable()
asyncLocalStorage.getStore()
asyncLocalStorage.enterWith(store)
asyncLocalStorage.run(store, callback[, ...args])
asyncLocalStorage.exit(callback[, ...args])
- 与
async/await
一起使用 - 故障排除:上下文丢失
- 类:
AsyncResource
new AsyncResource(type[, options])
- 静态方法:
AsyncResource.bind(fn[, type[, thisArg]])
asyncResource.bind(fn[, thisArg])
asyncResource.runInAsyncScope(fn[, thisArg, ...args])
asyncResource.emitDestroy()
asyncResource.asyncId()
asyncResource.triggerAsyncId()
- 将
AsyncResource
用于Worker
线程池 - 将
AsyncResource
与EventEmitter
集成
异步上下文跟踪#
源代码: lib/async_hooks.js
介绍#
这些类用于关联状态并将其传播到回调和 Promise 链中。它们允许在 Web 请求的整个生命周期或任何其他异步持续时间内存储数据。它类似于其他语言中的线程本地存储。
AsyncLocalStorage
和AsyncResource
类是
node:async_hooks
模块的一部分:
import { AsyncLocalStorage, AsyncResource } from 'node:async_hooks';
const { AsyncLocalStorage, AsyncResource } = require('node:async_hooks');
类别:AsyncLocalStorage
#
此类创建通过异步操作保持一致的存储。
虽然您可以在node:async_hooks
模块之上创建自己的实现
,但应该首选AsyncLocalStorage
,因为它是一个高性能且内存安全的实现,涉及非重要的优化显然可以实施。
以下示例使用AsyncLocalStorage
构建一个简单的记录器,该记录器为传入的 HTTP 请求分配 ID,并将它们包含在每个请求中记录的消息中。
import http from 'node:http';
import { AsyncLocalStorage } from 'node:async_hooks';
const asyncLocalStorage = new AsyncLocalStorage();
function logWithId(msg) {
const id = asyncLocalStorage.getStore();
console.log(`${id !== undefined ? id : '-'}:`, msg);
}
let idSeq = 0;
http.createServer((req, res) => {
asyncLocalStorage.run(idSeq++, () => {
logWithId('start');
// Imagine any chain of async operations here
setImmediate(() => {
logWithId('finish');
res.end();
});
});
}).listen(8080);
http.get('http://localhost:8080');
http.get('http://localhost:8080');
// Prints:
// 0: start
// 1: start
// 0: finish
// 1: finish
const http = require('node:http');
const { AsyncLocalStorage } = require('node:async_hooks');
const asyncLocalStorage = new AsyncLocalStorage();
function logWithId(msg) {
const id = asyncLocalStorage.getStore();
console.log(`${id !== undefined ? id : '-'}:`, msg);
}
let idSeq = 0;
http.createServer((req, res) => {
asyncLocalStorage.run(idSeq++, () => {
logWithId('start');
// Imagine any chain of async operations here
setImmediate(() => {
logWithId('finish');
res.end();
});
});
}).listen(8080);
http.get('http://localhost:8080');
http.get('http://localhost:8080');
// Prints:
// 0: start
// 1: start
// 0: finish
// 1: finish
AsyncLocalStorage
的每个实例都维护一个独立的存储上下文。多个实例可以安全地同时存在,而不存在相互干扰数据的风险。
new AsyncLocalStorage()
#
创建AsyncLocalStorage
的新实例。仅在run()
调用内或enterWith()
调用后提供存储
。
静态方法:AsyncLocalStorage.bind(fn)
#
fn
<Function>绑定到当前执行上下文的函数。- 返回:<Function>在捕获的执行上下文中调用
fn
的新函数。
将给定函数绑定到当前执行上下文。
静态方法:AsyncLocalStorage.snapshot()
#
- 返回:<Function>签名为
(fn: (...args) : R, ...args) : R
的新函数。
捕获当前执行上下文并返回一个接受函数作为参数的函数。每当调用返回的函数时,它都会在捕获的上下文中调用传递给它的函数。
const asyncLocalStorage = new AsyncLocalStorage();
const runInAsyncScope = asyncLocalStorage.run(123, () => AsyncLocalStorage.snapshot());
const result = asyncLocalStorage.run(321, () => runInAsyncScope(() => asyncLocalStorage.getStore()));
console.log(result); // returns 123
AsyncLocalStorage.snapshot() 可以替换 AsyncResource 的使用以实现简单的异步上下文跟踪目的,例如:
class Foo {
#runInAsyncScope = AsyncLocalStorage.snapshot();
get() { return this.#runInAsyncScope(() => asyncLocalStorage.getStore()); }
}
const foo = asyncLocalStorage.run(123, () => new Foo());
console.log(asyncLocalStorage.run(321, () => foo.get())); // returns 123
asyncLocalStorage.disable()
#
禁用AsyncLocalStorage
的实例。对asyncLocalStorage.getStore()
的所有后续调用都将返回undefined
,直到
asyncLocalStorage.run()
或asyncLocalStorage.enterWith()
再次被调用。
当调用asyncLocalStorage.disable()
时,链接到该实例的所有当前上下文都将退出。
需要先
调用asyncLocalStorage.disable()
,然后才能对 asyncLocalStorage
进行垃圾收集。这不适用于asyncLocalStorage
提供的存储,因为这些对象与相应的异步资源一起被垃圾收集。
当当前进程不再使用asyncLocalStorage
时,使用此方法。
asyncLocalStorage.getStore()
#
- 返回:<任意>
返回当前商店。如果在通过调用asyncLocalStorage.run()
或asyncLocalStorage.enterWith()
初始化的异步上下文之外调用,则返回undefined
。
asyncLocalStorage.enterWith(store)
#
store
<任意>
转换到上下文以执行当前同步执行的剩余部分,然后通过任何后续异步调用保留存储。
例子:
const store = { id: 1 };
// Replaces previous store with the given store object
asyncLocalStorage.enterWith(store);
asyncLocalStorage.getStore(); // Returns the store object
someAsyncOperation(() => {
asyncLocalStorage.getStore(); // Returns the same object
});
此转换将在整个同步执行过程中持续进行。这意味着,例如,如果在事件处理程序中输入上下文,则后续事件处理程序也将在该上下文中运行,除非使用 AsyncResource
专门绑定到另一个上下文。这就是为什么
run()
应优先于enterWith()
,除非有充分的理由使用后一种方法。
const store = { id: 1 };
emitter.on('my-event', () => {
asyncLocalStorage.enterWith(store);
});
emitter.on('my-event', () => {
asyncLocalStorage.getStore(); // Returns the same object
});
asyncLocalStorage.getStore(); // Returns undefined
emitter.emit('my-event');
asyncLocalStorage.getStore(); // Returns the same object
asyncLocalStorage.run(store, callback[, ...args])
#
在上下文中同步运行函数并返回其返回值。在回调函数之外无法访问该存储。在回调中创建的任何异步操作都可以访问该存储。
可选的args
被传递给回调函数。
如果回调函数抛出错误,则run()
也会抛出错误。堆栈跟踪不受此调用的影响,并且上下文已退出。
例子:
const store = { id: 2 };
try {
asyncLocalStorage.run(store, () => {
asyncLocalStorage.getStore(); // Returns the store object
setTimeout(() => {
asyncLocalStorage.getStore(); // Returns the store object
}, 200);
throw new Error();
});
} catch (e) {
asyncLocalStorage.getStore(); // Returns undefined
// The error will be caught here
}
asyncLocalStorage.exit(callback[, ...args])
#
在上下文之外同步运行函数并返回其返回值。无法在回调函数或回调中创建的异步操作中访问存储。
回调函数内完成的任何getStore()
调用将始终返回undefined
。
可选的args
被传递给回调函数。
如果回调函数抛出错误,则exit()
也会抛出错误。堆栈跟踪不受此调用的影响,并且重新进入上下文。
例子:
// Within a call to run
try {
asyncLocalStorage.getStore(); // Returns the store object or value
asyncLocalStorage.exit(() => {
asyncLocalStorage.getStore(); // Returns undefined
throw new Error();
});
} catch (e) {
asyncLocalStorage.getStore(); // Returns the same object or value
// The error will be caught here
}
与async/await
一起使用#
如果在异步函数中,只有一个await
调用要在上下文中运行,则应使用以下模式:
async function fn() {
await asyncLocalStorage.run(new Map(), () => {
asyncLocalStorage.getStore().set('key', value);
return foo(); // The return value of foo will be awaited
});
}
在此示例中,存储仅在回调函数和foo
调用的函数中可用。在run
之外,调用getStore
将返回
undefined
。
故障排除:上下文丢失#
在大多数情况下,AsyncLocalStorage
可以正常工作。在极少数情况下,当前存储会在异步操作之一中丢失。
如果您的代码是基于回调的,则使用
util.promisify()
进行 Promise 就足够了,因此它开始使用本机 Promise 。
如果您需要使用基于回调的 API 或者您的代码采用自定义 thenable 实现,请使用AsyncResource
类将异步操作与正确的执行上下文关联起来。通过在您怀疑导致上下文丢失的调用之后记录asyncLocalStorage.getStore()
的内容,找到导致上下文丢失的函数调用。当代码记录undefined
时,最后调用的回调可能是导致上下文丢失的原因。
类:AsyncResource
#
类AsyncResource
旨在通过嵌入的异步资源进行扩展。使用此功能,用户可以轻松触发自己资源的生命周期事件。
当实例化AsyncResource
时,会触发init
钩子。
以下是AsyncResource
API 的概述。
import { AsyncResource, executionAsyncId } from 'node:async_hooks';
// AsyncResource() is meant to be extended. Instantiating a
// new AsyncResource() also triggers init. If triggerAsyncId is omitted then
// async_hook.executionAsyncId() is used.
const asyncResource = new AsyncResource(
type, { triggerAsyncId: executionAsyncId(), requireManualDestroy: false },
);
// Run a function in the execution context of the resource. This will
// * establish the context of the resource
// * trigger the AsyncHooks before callbacks
// * call the provided function `fn` with the supplied arguments
// * trigger the AsyncHooks after callbacks
// * restore the original execution context
asyncResource.runInAsyncScope(fn, thisArg, ...args);
// Call AsyncHooks destroy callbacks.
asyncResource.emitDestroy();
// Return the unique ID assigned to the AsyncResource instance.
asyncResource.asyncId();
// Return the trigger ID for the AsyncResource instance.
asyncResource.triggerAsyncId();
const { AsyncResource, executionAsyncId } = require('node:async_hooks');
// AsyncResource() is meant to be extended. Instantiating a
// new AsyncResource() also triggers init. If triggerAsyncId is omitted then
// async_hook.executionAsyncId() is used.
const asyncResource = new AsyncResource(
type, { triggerAsyncId: executionAsyncId(), requireManualDestroy: false },
);
// Run a function in the execution context of the resource. This will
// * establish the context of the resource
// * trigger the AsyncHooks before callbacks
// * call the provided function `fn` with the supplied arguments
// * trigger the AsyncHooks after callbacks
// * restore the original execution context
asyncResource.runInAsyncScope(fn, thisArg, ...args);
// Call AsyncHooks destroy callbacks.
asyncResource.emitDestroy();
// Return the unique ID assigned to the AsyncResource instance.
asyncResource.asyncId();
// Return the trigger ID for the AsyncResource instance.
asyncResource.triggerAsyncId();
new AsyncResource(type[, options])
#
用法示例:
class DBQuery extends AsyncResource {
constructor(db) {
super('DBQuery');
this.db = db;
}
getInfo(query, callback) {
this.db.get(query, (err, data) => {
this.runInAsyncScope(callback, null, err, data);
});
}
close() {
this.db = null;
this.emitDestroy();
}
}
静态方法:AsyncResource.bind(fn[, type[, thisArg]])
#
fn
<Function>绑定到当前执行上下文的函数。type
<string>与底层AsyncResource
关联的可选名称 。thisArg
<任意>
将给定函数绑定到当前执行上下文。
返回的函数将有一个asyncResource
属性,引用该函数绑定到的AsyncResource
。
asyncResource.bind(fn[, thisArg])
#
fn
<Function>绑定到当前AsyncResource
的函数。thisArg
<任意>
将要执行的给定函数绑定到此AsyncResource
的作用域。
返回的函数将有一个asyncResource
属性,引用该函数绑定到的AsyncResource
。
asyncResource.runInAsyncScope(fn[, thisArg, ...args])
#
fn
<Function>在此异步资源的执行上下文中调用的函数。thisArg
<any>用于函数调用的接收器。...args
<any>传递给函数的可选参数。
在异步资源的执行上下文中使用提供的参数调用提供的函数。这将建立上下文,在回调之前触发AsyncHooks,调用该函数,在回调后触发AsyncHooks,然后恢复原始执行上下文。
asyncResource.emitDestroy()
#
- 返回:<AsyncResource>对
asyncResource
的引用。
调用所有destroy
挂钩。这应该只被调用一次。如果多次调用就会抛出错误。这必须手动调用。如果资源由 GC 收集,则永远不会调用destroy
挂钩。
asyncResource.asyncId()
#
- 返回:<number>分配给资源的唯一
asyncId
。
asyncResource.triggerAsyncId()
#
- 返回:<number>与传递给
AsyncResource
构造函数相同的triggerAsyncId
。
将AsyncResource
用于Worker
线程池#
以下示例演示如何使用AsyncResource
类为Worker
池正确提供异步跟踪。其他资源池,例如数据库连接池,可以遵循类似的模型。
假设任务是添加两个数字,使用名为
task_processor.js
的文件,其内容如下:
import { parentPort } from 'node:worker_threads';
parentPort.on('message', (task) => {
parentPort.postMessage(task.a + task.b);
});
const { parentPort } = require('node:worker_threads');
parentPort.on('message', (task) => {
parentPort.postMessage(task.a + task.b);
});
围绕它的工作池可以使用以下结构:
import { AsyncResource } from 'node:async_hooks';
import { EventEmitter } from 'node:events';
import path from 'node:path';
import { Worker } from 'node:worker_threads';
const kTaskInfo = Symbol('kTaskInfo');
const kWorkerFreedEvent = Symbol('kWorkerFreedEvent');
class WorkerPoolTaskInfo extends AsyncResource {
constructor(callback) {
super('WorkerPoolTaskInfo');
this.callback = callback;
}
done(err, result) {
this.runInAsyncScope(this.callback, null, err, result);
this.emitDestroy(); // `TaskInfo`s are used only once.
}
}
export default class WorkerPool extends EventEmitter {
constructor(numThreads) {
super();
this.numThreads = numThreads;
this.workers = [];
this.freeWorkers = [];
this.tasks = [];
for (let i = 0; i < numThreads; i++)
this.addNewWorker();
// Any time the kWorkerFreedEvent is emitted, dispatch
// the next task pending in the queue, if any.
this.on(kWorkerFreedEvent, () => {
if (this.tasks.length > 0) {
const { task, callback } = this.tasks.shift();
this.runTask(task, callback);
}
});
}
addNewWorker() {
const worker = new Worker(new URL('task_processor.js', import.meta.url));
worker.on('message', (result) => {
// In case of success: Call the callback that was passed to `runTask`,
// remove the `TaskInfo` associated with the Worker, and mark it as free
// again.
worker[kTaskInfo].done(null, result);
worker[kTaskInfo] = null;
this.freeWorkers.push(worker);
this.emit(kWorkerFreedEvent);
});
worker.on('error', (err) => {
// In case of an uncaught exception: Call the callback that was passed to
// `runTask` with the error.
if (worker[kTaskInfo])
worker[kTaskInfo].done(err, null);
else
this.emit('error', err);
// Remove the worker from the list and start a new Worker to replace the
// current one.
this.workers.splice(this.workers.indexOf(worker), 1);
this.addNewWorker();
});
this.workers.push(worker);
this.freeWorkers.push(worker);
this.emit(kWorkerFreedEvent);
}
runTask(task, callback) {
if (this.freeWorkers.length === 0) {
// No free threads, wait until a worker thread becomes free.
this.tasks.push({ task, callback });
return;
}
const worker = this.freeWorkers.pop();
worker[kTaskInfo] = new WorkerPoolTaskInfo(callback);
worker.postMessage(task);
}
close() {
for (const worker of this.workers) worker.terminate();
}
}
const { AsyncResource } = require('node:async_hooks');
const { EventEmitter } = require('node:events');
const path = require('node:path');
const { Worker } = require('node:worker_threads');
const kTaskInfo = Symbol('kTaskInfo');
const kWorkerFreedEvent = Symbol('kWorkerFreedEvent');
class WorkerPoolTaskInfo extends AsyncResource {
constructor(callback) {
super('WorkerPoolTaskInfo');
this.callback = callback;
}
done(err, result) {
this.runInAsyncScope(this.callback, null, err, result);
this.emitDestroy(); // `TaskInfo`s are used only once.
}
}
class WorkerPool extends EventEmitter {
constructor(numThreads) {
super();
this.numThreads = numThreads;
this.workers = [];
this.freeWorkers = [];
this.tasks = [];
for (let i = 0; i < numThreads; i++)
this.addNewWorker();
// Any time the kWorkerFreedEvent is emitted, dispatch
// the next task pending in the queue, if any.
this.on(kWorkerFreedEvent, () => {
if (this.tasks.length > 0) {
const { task, callback } = this.tasks.shift();
this.runTask(task, callback);
}
});
}
addNewWorker() {
const worker = new Worker(path.resolve(__dirname, 'task_processor.js'));
worker.on('message', (result) => {
// In case of success: Call the callback that was passed to `runTask`,
// remove the `TaskInfo` associated with the Worker, and mark it as free
// again.
worker[kTaskInfo].done(null, result);
worker[kTaskInfo] = null;
this.freeWorkers.push(worker);
this.emit(kWorkerFreedEvent);
});
worker.on('error', (err) => {
// In case of an uncaught exception: Call the callback that was passed to
// `runTask` with the error.
if (worker[kTaskInfo])
worker[kTaskInfo].done(err, null);
else
this.emit('error', err);
// Remove the worker from the list and start a new Worker to replace the
// current one.
this.workers.splice(this.workers.indexOf(worker), 1);
this.addNewWorker();
});
this.workers.push(worker);
this.freeWorkers.push(worker);
this.emit(kWorkerFreedEvent);
}
runTask(task, callback) {
if (this.freeWorkers.length === 0) {
// No free threads, wait until a worker thread becomes free.
this.tasks.push({ task, callback });
return;
}
const worker = this.freeWorkers.pop();
worker[kTaskInfo] = new WorkerPoolTaskInfo(callback);
worker.postMessage(task);
}
close() {
for (const worker of this.workers) worker.terminate();
}
}
module.exports = WorkerPool;
如果没有WorkerPoolTaskInfo
对象添加的显式跟踪,回调似乎与各个Worker
对象关联。但是, Worker
的创建与任务的创建无关,并且不提供有关任务何时计划的信息。
该池可以按如下方式使用:
import WorkerPool from './worker_pool.js';
import os from 'node:os';
const pool = new WorkerPool(os.availableParallelism());
let finished = 0;
for (let i = 0; i < 10; i++) {
pool.runTask({ a: 42, b: 100 }, (err, result) => {
console.log(i, err, result);
if (++finished === 10)
pool.close();
});
}
const WorkerPool = require('./worker_pool.js');
const os = require('node:os');
const pool = new WorkerPool(os.availableParallelism());
let finished = 0;
for (let i = 0; i < 10; i++) {
pool.runTask({ a: 42, b: 100 }, (err, result) => {
console.log(i, err, result);
if (++finished === 10)
pool.close();
});
}
将AsyncResource
与EventEmitter
集成#
由EventEmitter
触发的事件侦听器可能会在与调用eventEmitter.on()
时事件的执行上下文不同的执行上下文中运行。
以下示例演示如何使用AsyncResource
类将事件侦听器与正确的执行上下文正确关联。相同的方法可以应用于Stream
或类似的事件驱动类。
import { createServer } from 'node:http';
import { AsyncResource, executionAsyncId } from 'node:async_hooks';
const server = createServer((req, res) => {
req.on('close', AsyncResource.bind(() => {
// Execution context is bound to the current outer scope.
}));
req.on('close', () => {
// Execution context is bound to the scope that caused 'close' to emit.
});
res.end();
}).listen(3000);
const { createServer } = require('node:http');
const { AsyncResource, executionAsyncId } = require('node:async_hooks');
const server = createServer((req, res) => {
req.on('close', AsyncResource.bind(() => {
// Execution context is bound to the current outer scope.
}));
req.on('close', () => {
// Execution context is bound to the scope that caused 'close' to emit.
});
res.end();
}).listen(3000);