Node.js v18.18.2 文档


目录

异步上下文跟踪#

稳定性:2 - 稳定

源代码: lib/async_hooks.js

介绍#

这些类用于关联状态并将其传播到回调和 Promise 链中。它们允许在 Web 请求的整个生命周期或任何其他异步持续时间内存储数据。它类似于其他语言中的线程本地存储。

AsyncLocalStorageAsyncResource类是 node:async_hooks模块的一部分:

import { AsyncLocalStorage, AsyncResource } from 'node:async_hooks';const { AsyncLocalStorage, AsyncResource } = require('node:async_hooks');

类别:AsyncLocalStorage#

此类创建通过异步操作保持一致的存储。

虽然您可以在node:async_hooks模块之上创建自己的实现 ,但应该首选AsyncLocalStorage,因为它是一个高性能且内存安全的实现,涉及非重要的优化显然可以实施。

以下示例使用AsyncLocalStorage构建一个简单的记录器,该记录器为传入的 HTTP 请求分配 ID,并将它们包含在每个请求中记录的消息中。

import http from 'node:http';
import { AsyncLocalStorage } from 'node:async_hooks';

const asyncLocalStorage = new AsyncLocalStorage();

function logWithId(msg) {
  const id = asyncLocalStorage.getStore();
  console.log(`${id !== undefined ? id : '-'}:`, msg);
}

let idSeq = 0;
http.createServer((req, res) => {
  asyncLocalStorage.run(idSeq++, () => {
    logWithId('start');
    // Imagine any chain of async operations here
    setImmediate(() => {
      logWithId('finish');
      res.end();
    });
  });
}).listen(8080);

http.get('http://localhost:8080');
http.get('http://localhost:8080');
// Prints:
//   0: start
//   1: start
//   0: finish
//   1: finishconst http = require('node:http');
const { AsyncLocalStorage } = require('node:async_hooks');

const asyncLocalStorage = new AsyncLocalStorage();

function logWithId(msg) {
  const id = asyncLocalStorage.getStore();
  console.log(`${id !== undefined ? id : '-'}:`, msg);
}

let idSeq = 0;
http.createServer((req, res) => {
  asyncLocalStorage.run(idSeq++, () => {
    logWithId('start');
    // Imagine any chain of async operations here
    setImmediate(() => {
      logWithId('finish');
      res.end();
    });
  });
}).listen(8080);

http.get('http://localhost:8080');
http.get('http://localhost:8080');
// Prints:
//   0: start
//   1: start
//   0: finish
//   1: finish

AsyncLocalStorage的每个实例都维护一个独立的存储上下文。多个实例可以安全地同时存在,而不存在相互干扰数据的风险。

new AsyncLocalStorage()#

创建AsyncLocalStorage的新实例。仅在run()调用内或enterWith()调用后提供存储 。

静态方法:AsyncLocalStorage.bind(fn)#

稳定性:1 - 实验性

  • fn <Function>绑定到当前执行上下文的函数。
  • 返回:<Function>在捕获的执行上下文中调用fn的新函数。

将给定函数绑定到当前执行上下文。

静态方法:AsyncLocalStorage.snapshot()#

稳定性:1 - 实验性

  • 返回:<Function>签名为 (fn: (...args) : R, ...args) : R 的新函数。

捕获当前执行上下文并返回一个接受函数作为参数的函数。每当调用返回的函数时,它都会在捕获的上下文中调用传递给它的函数。

const asyncLocalStorage = new AsyncLocalStorage();
const runInAsyncScope = asyncLocalStorage.run(123, () => AsyncLocalStorage.snapshot());
const result = asyncLocalStorage.run(321, () => runInAsyncScope(() => asyncLocalStorage.getStore()));
console.log(result);  // returns 123 

AsyncLocalStorage.snapshot() 可以替换 AsyncResource 的使用以实现简单的异步上下文跟踪目的,例如:

class Foo {
  #runInAsyncScope = AsyncLocalStorage.snapshot();

  get() { return this.#runInAsyncScope(() => asyncLocalStorage.getStore()); }
}

const foo = asyncLocalStorage.run(123, () => new Foo());
console.log(asyncLocalStorage.run(321, () => foo.get())); // returns 123 

asyncLocalStorage.disable()#

稳定性:1 - 实验性

禁用AsyncLocalStorage的实例。对asyncLocalStorage.getStore() 的所有后续调用都将返回undefined,直到 asyncLocalStorage.run()asyncLocalStorage.enterWith()再次被调用。

当调用asyncLocalStorage.disable()时,链接到该实例的所有当前上下文都将退出。

需要先 调用asyncLocalStorage.disable(),然后才能对 asyncLocalStorage进行垃圾收集。这不适用于asyncLocalStorage提供的存储,因为这些对象与相应的异步资源一起被垃圾收集。

当当前进程不再使用asyncLocalStorage时,使用此方法。

asyncLocalStorage.getStore()#

返回当前商店。如果在通过调用asyncLocalStorage.run()asyncLocalStorage.enterWith()初始化的异步上下文之外调用,则返回undefined

asyncLocalStorage.enterWith(store)#

稳定性:1 - 实验性

转换到上下文以执行当前同步执行的剩余部分,然后通过任何后续异步调用保留存储。

例子:

const store = { id: 1 };
// Replaces previous store with the given store object
asyncLocalStorage.enterWith(store);
asyncLocalStorage.getStore(); // Returns the store object
someAsyncOperation(() => {
  asyncLocalStorage.getStore(); // Returns the same object
}); 

此转换将在整个同步执行过程中持续进行。这意味着,例如,如果在事件处理程序中输入上下文,则后续事件处理程序也将在该上下文中运行,除非使用 AsyncResource专门绑定到另一个上下文。这就是为什么 run()应优先于enterWith(),除非有充分的理由使用后一种方法。

const store = { id: 1 };

emitter.on('my-event', () => {
  asyncLocalStorage.enterWith(store);
});
emitter.on('my-event', () => {
  asyncLocalStorage.getStore(); // Returns the same object
});

asyncLocalStorage.getStore(); // Returns undefined
emitter.emit('my-event');
asyncLocalStorage.getStore(); // Returns the same object 

asyncLocalStorage.run(store, callback[, ...args])#

在上下文中同步运行函数并返回其返回值。在回调函数之外无法访问该存储。在回调中创建的任何异步操作都可以访问该存储。

可选的args被传递给回调函数。

如果回调函数抛出错误,则run()也会抛出错误。堆栈跟踪不受此调用的影响,并且上下文已退出。

例子:

const store = { id: 2 };
try {
  asyncLocalStorage.run(store, () => {
    asyncLocalStorage.getStore(); // Returns the store object
    setTimeout(() => {
      asyncLocalStorage.getStore(); // Returns the store object
    }, 200);
    throw new Error();
  });
} catch (e) {
  asyncLocalStorage.getStore(); // Returns undefined
  // The error will be caught here
} 

asyncLocalStorage.exit(callback[, ...args])#

稳定性:1 - 实验性

在上下文之外同步运行函数并返回其返回值。无法在回调函数或回调中创建的异步操作中访问存储。 回调函数内完成的任何getStore()调用将始终返回undefined

可选的args被传递给回调函数。

如果回调函数抛出错误,则exit()也会抛出错误。堆栈跟踪不受此调用的影响,并且重新进入上下文。

例子:

// Within a call to run
try {
  asyncLocalStorage.getStore(); // Returns the store object or value
  asyncLocalStorage.exit(() => {
    asyncLocalStorage.getStore(); // Returns undefined
    throw new Error();
  });
} catch (e) {
  asyncLocalStorage.getStore(); // Returns the same object or value
  // The error will be caught here
} 

async/await一起使用#

如果在异步函数中,只有一个await调用要在上下文中运行,则应使用以下模式:

async function fn() {
  await asyncLocalStorage.run(new Map(), () => {
    asyncLocalStorage.getStore().set('key', value);
    return foo(); // The return value of foo will be awaited
  });
} 

在此示例中,存储仅在回调函数和foo调用的函数中可用。在run之外,调用getStore将返回 undefined

故障排除:上下文丢失#

在大多数情况下,AsyncLocalStorage 可以正常工作。在极少数情况下,当前存储会在异步操作之一中丢失。

如果您的代码是基于回调的,则使用 util.promisify()进行 Promise 就足够了,因此它开始使用本机 Promise 。

如果您需要使用基于回调的 API 或者您的代码采用自定义 thenable 实现,请使用AsyncResource类将异步操作与正确的执行上下文关联起来。通过在您怀疑导致上下文丢失的调用之后记录asyncLocalStorage.getStore()的内容,找到导致上下文丢失的函数调用。当代码记录undefined时,最后调用的回调可能是导致上下文丢失的原因。

类:AsyncResource#

AsyncResource旨在通过嵌入的异步资源进行扩展。使用此功能,用户可以轻松触发自己资源的生命周期事件。

当实例化AsyncResource时,会触发init钩子。

以下是AsyncResource API 的概述。

import { AsyncResource, executionAsyncId } from 'node:async_hooks';

// AsyncResource() is meant to be extended. Instantiating a
// new AsyncResource() also triggers init. If triggerAsyncId is omitted then
// async_hook.executionAsyncId() is used.
const asyncResource = new AsyncResource(
  type, { triggerAsyncId: executionAsyncId(), requireManualDestroy: false },
);

// Run a function in the execution context of the resource. This will
// * establish the context of the resource
// * trigger the AsyncHooks before callbacks
// * call the provided function `fn` with the supplied arguments
// * trigger the AsyncHooks after callbacks
// * restore the original execution context
asyncResource.runInAsyncScope(fn, thisArg, ...args);

// Call AsyncHooks destroy callbacks.
asyncResource.emitDestroy();

// Return the unique ID assigned to the AsyncResource instance.
asyncResource.asyncId();

// Return the trigger ID for the AsyncResource instance.
asyncResource.triggerAsyncId();const { AsyncResource, executionAsyncId } = require('node:async_hooks');

// AsyncResource() is meant to be extended. Instantiating a
// new AsyncResource() also triggers init. If triggerAsyncId is omitted then
// async_hook.executionAsyncId() is used.
const asyncResource = new AsyncResource(
  type, { triggerAsyncId: executionAsyncId(), requireManualDestroy: false },
);

// Run a function in the execution context of the resource. This will
// * establish the context of the resource
// * trigger the AsyncHooks before callbacks
// * call the provided function `fn` with the supplied arguments
// * trigger the AsyncHooks after callbacks
// * restore the original execution context
asyncResource.runInAsyncScope(fn, thisArg, ...args);

// Call AsyncHooks destroy callbacks.
asyncResource.emitDestroy();

// Return the unique ID assigned to the AsyncResource instance.
asyncResource.asyncId();

// Return the trigger ID for the AsyncResource instance.
asyncResource.triggerAsyncId();

new AsyncResource(type[, options])#

  • type <string>异步事件的类型。
  • options <对象>
    • triggerAsyncId <number>创建此异步事件的执行上下文的 ID。默认值: executionAsyncId()
    • requireManualDestroy <boolean>如果设置为true, 则在对象被垃圾回收时禁用emitDestroy 。通常不需要设置(即使手动调用emitDestroy),除非检索资源的asyncId 且敏感 API 的emitDestroy被它调用。当设置为false时,仅当至少有一个事件的destroy挂钩时才会发生垃圾回收的 emitDestroy调用。 默认值: false

用法示例:

class DBQuery extends AsyncResource {
  constructor(db) {
    super('DBQuery');
    this.db = db;
  }

  getInfo(query, callback) {
    this.db.get(query, (err, data) => {
      this.runInAsyncScope(callback, null, err, data);
    });
  }

  close() {
    this.db = null;
    this.emitDestroy();
  }
} 

静态方法:AsyncResource.bind(fn[, type[, thisArg]])#

将给定函数绑定到当前执行上下文。

返回的函数将有一个asyncResource属性,引用该函数绑定到的AsyncResource

asyncResource.bind(fn[, thisArg])#

将要执行的给定函数绑定到此AsyncResource的作用域。

返回的函数将有一个asyncResource属性,引用该函数绑定到的AsyncResource

asyncResource.runInAsyncScope(fn[, thisArg, ...args])#

  • fn <Function>在此异步资源的执行上下文中调用的函数。
  • thisArg <any>用于函数调用的接收器。
  • ...args <any>传递给函数的可选参数。

在异步资源的执行上下文中使用提供的参数调用提供的函数。这将建立上下文,在回调之前触发AsyncHooks,调用该函数,在回调后触发AsyncHooks,然后恢复原始执行上下文。

asyncResource.emitDestroy()#

调用所有destroy挂钩。这应该只被调用一次。如果多次调用就会抛出错误。这必须手动调用。如果资源由 GC 收集,则永远不会调用destroy挂钩。

asyncResource.asyncId()#

  • 返回:<number>分配给资源的唯一asyncId

asyncResource.triggerAsyncId()#

  • 返回:<number>与传递给 AsyncResource构造函数相同的triggerAsyncId

AsyncResource用于Worker线程池#

以下示例演示如何使用AsyncResource类为Worker池正确提供异步跟踪。其他资源池,例如数据库连接池,可以遵循类似的模型。

假设任务是添加两个数字,使用名为 task_processor.js的文件,其内容如下:

import { parentPort } from 'node:worker_threads';
parentPort.on('message', (task) => {
  parentPort.postMessage(task.a + task.b);
});const { parentPort } = require('node:worker_threads');
parentPort.on('message', (task) => {
  parentPort.postMessage(task.a + task.b);
});

围绕它的工作池可以使用以下结构:

import { AsyncResource } from 'node:async_hooks';
import { EventEmitter } from 'node:events';
import path from 'node:path';
import { Worker } from 'node:worker_threads';

const kTaskInfo = Symbol('kTaskInfo');
const kWorkerFreedEvent = Symbol('kWorkerFreedEvent');

class WorkerPoolTaskInfo extends AsyncResource {
  constructor(callback) {
    super('WorkerPoolTaskInfo');
    this.callback = callback;
  }

  done(err, result) {
    this.runInAsyncScope(this.callback, null, err, result);
    this.emitDestroy();  // `TaskInfo`s are used only once.
  }
}

export default class WorkerPool extends EventEmitter {
  constructor(numThreads) {
    super();
    this.numThreads = numThreads;
    this.workers = [];
    this.freeWorkers = [];
    this.tasks = [];

    for (let i = 0; i < numThreads; i++)
      this.addNewWorker();

    // Any time the kWorkerFreedEvent is emitted, dispatch
    // the next task pending in the queue, if any.
    this.on(kWorkerFreedEvent, () => {
      if (this.tasks.length > 0) {
        const { task, callback } = this.tasks.shift();
        this.runTask(task, callback);
      }
    });
  }

  addNewWorker() {
    const worker = new Worker(new URL('task_processor.js', import.meta.url));
    worker.on('message', (result) => {
      // In case of success: Call the callback that was passed to `runTask`,
      // remove the `TaskInfo` associated with the Worker, and mark it as free
      // again.
      worker[kTaskInfo].done(null, result);
      worker[kTaskInfo] = null;
      this.freeWorkers.push(worker);
      this.emit(kWorkerFreedEvent);
    });
    worker.on('error', (err) => {
      // In case of an uncaught exception: Call the callback that was passed to
      // `runTask` with the error.
      if (worker[kTaskInfo])
        worker[kTaskInfo].done(err, null);
      else
        this.emit('error', err);
      // Remove the worker from the list and start a new Worker to replace the
      // current one.
      this.workers.splice(this.workers.indexOf(worker), 1);
      this.addNewWorker();
    });
    this.workers.push(worker);
    this.freeWorkers.push(worker);
    this.emit(kWorkerFreedEvent);
  }

  runTask(task, callback) {
    if (this.freeWorkers.length === 0) {
      // No free threads, wait until a worker thread becomes free.
      this.tasks.push({ task, callback });
      return;
    }

    const worker = this.freeWorkers.pop();
    worker[kTaskInfo] = new WorkerPoolTaskInfo(callback);
    worker.postMessage(task);
  }

  close() {
    for (const worker of this.workers) worker.terminate();
  }
}const { AsyncResource } = require('node:async_hooks');
const { EventEmitter } = require('node:events');
const path = require('node:path');
const { Worker } = require('node:worker_threads');

const kTaskInfo = Symbol('kTaskInfo');
const kWorkerFreedEvent = Symbol('kWorkerFreedEvent');

class WorkerPoolTaskInfo extends AsyncResource {
  constructor(callback) {
    super('WorkerPoolTaskInfo');
    this.callback = callback;
  }

  done(err, result) {
    this.runInAsyncScope(this.callback, null, err, result);
    this.emitDestroy();  // `TaskInfo`s are used only once.
  }
}

class WorkerPool extends EventEmitter {
  constructor(numThreads) {
    super();
    this.numThreads = numThreads;
    this.workers = [];
    this.freeWorkers = [];
    this.tasks = [];

    for (let i = 0; i < numThreads; i++)
      this.addNewWorker();

    // Any time the kWorkerFreedEvent is emitted, dispatch
    // the next task pending in the queue, if any.
    this.on(kWorkerFreedEvent, () => {
      if (this.tasks.length > 0) {
        const { task, callback } = this.tasks.shift();
        this.runTask(task, callback);
      }
    });
  }

  addNewWorker() {
    const worker = new Worker(path.resolve(__dirname, 'task_processor.js'));
    worker.on('message', (result) => {
      // In case of success: Call the callback that was passed to `runTask`,
      // remove the `TaskInfo` associated with the Worker, and mark it as free
      // again.
      worker[kTaskInfo].done(null, result);
      worker[kTaskInfo] = null;
      this.freeWorkers.push(worker);
      this.emit(kWorkerFreedEvent);
    });
    worker.on('error', (err) => {
      // In case of an uncaught exception: Call the callback that was passed to
      // `runTask` with the error.
      if (worker[kTaskInfo])
        worker[kTaskInfo].done(err, null);
      else
        this.emit('error', err);
      // Remove the worker from the list and start a new Worker to replace the
      // current one.
      this.workers.splice(this.workers.indexOf(worker), 1);
      this.addNewWorker();
    });
    this.workers.push(worker);
    this.freeWorkers.push(worker);
    this.emit(kWorkerFreedEvent);
  }

  runTask(task, callback) {
    if (this.freeWorkers.length === 0) {
      // No free threads, wait until a worker thread becomes free.
      this.tasks.push({ task, callback });
      return;
    }

    const worker = this.freeWorkers.pop();
    worker[kTaskInfo] = new WorkerPoolTaskInfo(callback);
    worker.postMessage(task);
  }

  close() {
    for (const worker of this.workers) worker.terminate();
  }
}

module.exports = WorkerPool;

如果没有WorkerPoolTaskInfo对象添加的显式跟踪,回调似乎与各个Worker 对象关联。但是, Worker的创建与任务的创建无关,并且不提供有关任务何时计划的信息。

该池可以按如下方式使用:

import WorkerPool from './worker_pool.js';
import os from 'node:os';

const pool = new WorkerPool(os.availableParallelism());

let finished = 0;
for (let i = 0; i < 10; i++) {
  pool.runTask({ a: 42, b: 100 }, (err, result) => {
    console.log(i, err, result);
    if (++finished === 10)
      pool.close();
  });
}const WorkerPool = require('./worker_pool.js');
const os = require('node:os');

const pool = new WorkerPool(os.availableParallelism());

let finished = 0;
for (let i = 0; i < 10; i++) {
  pool.runTask({ a: 42, b: 100 }, (err, result) => {
    console.log(i, err, result);
    if (++finished === 10)
      pool.close();
  });
}

AsyncResourceEventEmitter集成#

EventEmitter触发的事件侦听器可能会在与调用eventEmitter.on()时事件的执行上下文不同的执行上下文中运行。

以下示例演示如何使用AsyncResource类将事件侦听器与正确的执行上下文正确关联。相同的方法可以应用于Stream或类似的事件驱动类。

import { createServer } from 'node:http';
import { AsyncResource, executionAsyncId } from 'node:async_hooks';

const server = createServer((req, res) => {
  req.on('close', AsyncResource.bind(() => {
    // Execution context is bound to the current outer scope.
  }));
  req.on('close', () => {
    // Execution context is bound to the scope that caused 'close' to emit.
  });
  res.end();
}).listen(3000);const { createServer } = require('node:http');
const { AsyncResource, executionAsyncId } = require('node:async_hooks');

const server = createServer((req, res) => {
  req.on('close', AsyncResource.bind(() => {
    // Execution context is bound to the current outer scope.
  }));
  req.on('close', () => {
    // Execution context is bound to the scope that caused 'close' to emit.
  });
  res.end();
}).listen(3000);

NodeJS中文文档为Read dev Docs平台提供托管,中文NodeJS文档均由英文版NodeJS文档翻译,版权属于nodejs.org