Node.js v18.18.2 文档


目录

集群#

稳定性:2 - 稳定

源代码: lib/cluster.js

Node.js 进程集群可用于运行多个 Node.js 实例,这些实例可以在其应用程序线程之间分配工作负载。当不需要进程隔离时,请改用worker_threads模块,该模块允许在单个 Node.js 实例中运行多个应用程序线程。

集群模块允许轻松创建所有共享服务器端口的子进程。

import cluster from 'node:cluster';
import http from 'node:http';
import { availableParallelism } from 'node:os';
import process from 'node:process';

const numCPUs = availableParallelism();

if (cluster.isPrimary) {
  console.log(`Primary ${process.pid} is running`);

  // Fork workers.
  for (let i = 0; i < numCPUs; i++) {
    cluster.fork();
  }

  cluster.on('exit', (worker, code, signal) => {
    console.log(`worker ${worker.process.pid} died`);
  });
} else {
  // Workers can share any TCP connection
  // In this case it is an HTTP server
  http.createServer((req, res) => {
    res.writeHead(200);
    res.end('hello world\n');
  }).listen(8000);

  console.log(`Worker ${process.pid} started`);
}const cluster = require('node:cluster');
const http = require('node:http');
const numCPUs = require('node:os').availableParallelism();
const process = require('node:process');

if (cluster.isPrimary) {
  console.log(`Primary ${process.pid} is running`);

  // Fork workers.
  for (let i = 0; i < numCPUs; i++) {
    cluster.fork();
  }

  cluster.on('exit', (worker, code, signal) => {
    console.log(`worker ${worker.process.pid} died`);
  });
} else {
  // Workers can share any TCP connection
  // In this case it is an HTTP server
  http.createServer((req, res) => {
    res.writeHead(200);
    res.end('hello world\n');
  }).listen(8000);

  console.log(`Worker ${process.pid} started`);
}

运行 Node.js 现在将在工作人员之间共享端口 8000:

$ node server.js
Primary 3596 is running
Worker 4324 started
Worker 4520 started
Worker 6056 started
Worker 5644 started 

在 Windows 上,尚无法在工作线程中设置命名管道服务器。

怎么运行的#

工作进程是使用child_process.fork()方法生成的,以便它们可以通过 IPC 与父进程通信并来回传递服务器句柄。

集群模块支持两种分配传入连接的方法。

第一种(也是除 Windows 之外的所有平台上的默认方法)是循环方法,其中主进程侦听端口,接受新连接并以循环方式将它们分发给工作进程,其中有一些内置的聪明地避免工作进程过载。

第二种方法是主进程创建侦听套接字并将其发送给感兴趣的工作人员。然后工作人员直接接受传入连接。

从理论上讲,第二种方法应该提供最佳性能。然而在实践中,由于操作系统调度程序的反复无常,分配往往非常不平衡。据观察,在总共 8 个进程中,超过 70% 的连接仅在两个进程中结束。

由于server.listen()将大部分工作交给主进程,因此在三种情况下,普通 Node.js 进程和集群工作进程之间的行为有所不同:

  1. server.listen({fd: 7})因为消息被传递到主节点,所以父节点中的文件描述符 7将被监听,并将句柄传递给工作线程,而不是监听工作线程关于 7 号文件描述符引用的内容。
  2. server.listen(handle)显式侦听句柄将导致工作线程使用提供的句柄,而不是与主进程通信。
  3. server.listen(0)通常,这会导致服务器侦听随机端口。然而,在集群中,每个工作线程每次执行listen(0)时都会收到相同的“随机”端口。本质上,端口第一次是随机的,但此后是可预测的。要侦听唯一端口,请根据集群工作线程 ID 生成端口号。

Node.js 不提供路由逻辑。因此,设计一个应用程序非常重要,使其不会过度依赖内存中的数据对象来进行会话和登录等操作。

由于工作进程都是独立的进程,因此可以根据程序的需要杀死或重新生成它们,而不会影响其他工作进程。只要还有一些工作人员还活着,服务器就会继续接受连接。如果没有工作人员处于事件状态,则现有连接将被丢弃,新连接将被拒绝。然而,Node.js 不会自动管理工作线程的数量。应用程序有责任根据自己的需要来管理工作池。

尽管node:cluster模块的主要用例是网络,但它也可用于需要工作进程的其他用例。

类:Worker#

Worker对象包含有关工作人员的所有公共信息和方法。在初选中,可以使用cluster.workers获取。在工作线程中,可以使用cluster.worker获取。

事件:'disconnect'#

cluster.on('disconnect')事件类似,但特定于该工作人员。

cluster.fork().on('disconnect', () => {
  // Worker has disconnected
}); 

事件:'error'#

此事件与child_process.fork()提供的事件相同。

在工作线程中,也可以使用process.on('error')

事件:'exit'#

  • code <number>退出代码(如果正常退出)。
  • signal <string>导致进程被终止的信号名称(例如'SIGHUP' )。

cluster.on('exit')事件类似,但特定于该工作人员。

import cluster from 'node:cluster';

if (cluster.isPrimary) {
  const worker = cluster.fork();
  worker.on('exit', (code, signal) => {
    if (signal) {
      console.log(`worker was killed by signal: ${signal}`);
    } else if (code !== 0) {
      console.log(`worker exited with error code: ${code}`);
    } else {
      console.log('worker success!');
    }
  });
}const cluster = require('node:cluster');

if (cluster.isPrimary) {
  const worker = cluster.fork();
  worker.on('exit', (code, signal) => {
    if (signal) {
      console.log(`worker was killed by signal: ${signal}`);
    } else if (code !== 0) {
      console.log(`worker exited with error code: ${code}`);
    } else {
      console.log('worker success!');
    }
  });
}

事件:'listening'#

cluster.on('listening')事件类似,但特定于该工作人员。

cluster.fork().on('listening', (address) => {
  // Worker is listening
});cluster.fork().on('listening', (address) => {
  // Worker is listening
});

它不会在Worker体内排放。

事件:'message'#

cluster'message'事件类似,但特定于此工作线程。

在工作线程中,也可以使用process.on('message')

请参阅process事件:'message'

这是使用消息系统的示例。它在主进程中记录工作人员收到的 HTTP 请求数量:

import cluster from 'node:cluster';
import http from 'node:http';
import { availableParallelism } from 'node:os';
import process from 'node:process';

if (cluster.isPrimary) {

  // Keep track of http requests
  let numReqs = 0;
  setInterval(() => {
    console.log(`numReqs = ${numReqs}`);
  }, 1000);

  // Count requests
  function messageHandler(msg) {
    if (msg.cmd && msg.cmd === 'notifyRequest') {
      numReqs += 1;
    }
  }

  // Start workers and listen for messages containing notifyRequest
  const numCPUs = availableParallelism();
  for (let i = 0; i < numCPUs; i++) {
    cluster.fork();
  }

  for (const id in cluster.workers) {
    cluster.workers[id].on('message', messageHandler);
  }

} else {

  // Worker processes have a http server.
  http.Server((req, res) => {
    res.writeHead(200);
    res.end('hello world\n');

    // Notify primary about the request
    process.send({ cmd: 'notifyRequest' });
  }).listen(8000);
}const cluster = require('node:cluster');
const http = require('node:http');
const process = require('node:process');

if (cluster.isPrimary) {

  // Keep track of http requests
  let numReqs = 0;
  setInterval(() => {
    console.log(`numReqs = ${numReqs}`);
  }, 1000);

  // Count requests
  function messageHandler(msg) {
    if (msg.cmd && msg.cmd === 'notifyRequest') {
      numReqs += 1;
    }
  }

  // Start workers and listen for messages containing notifyRequest
  const numCPUs = require('node:os').availableParallelism();
  for (let i = 0; i < numCPUs; i++) {
    cluster.fork();
  }

  for (const id in cluster.workers) {
    cluster.workers[id].on('message', messageHandler);
  }

} else {

  // Worker processes have a http server.
  http.Server((req, res) => {
    res.writeHead(200);
    res.end('hello world\n');

    // Notify primary about the request
    process.send({ cmd: 'notifyRequest' });
  }).listen(8000);
}

事件:'online'#

cluster.on('online')事件类似,但特定于该工作人员。

cluster.fork().on('online', () => {
  // Worker is online
}); 

它不会在Worker体内排放。

worker.disconnect()#

在工作线程中,此函数将关闭所有服务器,等待这些服务器上的'close'事件,然后断开 IPC 通道。

在主节点中,内部消息会发送到工作线程,导致其 自行调用.disconnect()

导致设置.exitedAfterDisconnect

服务器关闭后,它将不再接受新连接,但连接可能会被任何其他侦听工作线程接受。现有连接将被允许照常关闭。当不再存在连接时,请参阅server.close(),工作线程的 IPC 通道将关闭,使其优雅地终止。

以上适用于服务器连接,客户端连接不会被worker自动关闭,并且disconnect不会等待它们关闭才退出。

在worker中,存在process.disconnect,但它不是这个函数;它是disconnect()

由于长期存在的服务器连接可能会阻止工作人员断开连接,因此发送消息可能很有用,因此可以采取应用程序特定的操作来关闭它们。实现超时也可能很有用,如果在一段时间后没有发出'disconnect'事件,则杀死工作线程。

if (cluster.isPrimary) {
  const worker = cluster.fork();
  let timeout;

  worker.on('listening', (address) => {
    worker.send('shutdown');
    worker.disconnect();
    timeout = setTimeout(() => {
      worker.kill();
    }, 2000);
  });

  worker.on('disconnect', () => {
    clearTimeout(timeout);
  });

} else if (cluster.isWorker) {
  const net = require('node:net');
  const server = net.createServer((socket) => {
    // Connections never end
  });

  server.listen(8000);

  process.on('message', (msg) => {
    if (msg === 'shutdown') {
      // Initiate graceful close of any connections to server
    }
  });
} 

worker.exitedAfterDisconnect#

如果工作线程由于.disconnect() 退出,则此属性为true 。如果工作人员以任何其他方式退出,则为false。如果工作线程尚未退出,则为undefined

布尔值worker.exitedAfterDisconnect允许区分自愿退出和意外退出,主节点可以根据此值选择不重生工作线程。

cluster.on('exit', (worker, code, signal) => {
  if (worker.exitedAfterDisconnect === true) {
    console.log('Oh, it was just voluntary – no need to worry');
  }
});

// kill worker
worker.kill(); 

worker.id#

每个新的worker都有自己唯一的id,这个id存储在 id中。

当工作线程处于事件状态时,这是在cluster.workers中对其进行索引的键 。

worker.isConnected()#

如果工作线程通过其 IPC 通道连接到其主节点,则此函数返回true ,否则返回 false。工作线程在创建后会连接到其主节点。发出'disconnect'事件后,它会断开连接。

worker.isDead()#

如果工作进程已终止(由于退出或收到信号),此函数将返回true 。否则,它返回false

import cluster from 'node:cluster';
import http from 'node:http';
import { availableParallelism } from 'node:os';
import process from 'node:process';

const numCPUs = availableParallelism();

if (cluster.isPrimary) {
  console.log(`Primary ${process.pid} is running`);

  // Fork workers.
  for (let i = 0; i < numCPUs; i++) {
    cluster.fork();
  }

  cluster.on('fork', (worker) => {
    console.log('worker is dead:', worker.isDead());
  });

  cluster.on('exit', (worker, code, signal) => {
    console.log('worker is dead:', worker.isDead());
  });
} else {
  // Workers can share any TCP connection. In this case, it is an HTTP server.
  http.createServer((req, res) => {
    res.writeHead(200);
    res.end(`Current process\n ${process.pid}`);
    process.kill(process.pid);
  }).listen(8000);
}const cluster = require('node:cluster');
const http = require('node:http');
const numCPUs = require('node:os').availableParallelism();
const process = require('node:process');

if (cluster.isPrimary) {
  console.log(`Primary ${process.pid} is running`);

  // Fork workers.
  for (let i = 0; i < numCPUs; i++) {
    cluster.fork();
  }

  cluster.on('fork', (worker) => {
    console.log('worker is dead:', worker.isDead());
  });

  cluster.on('exit', (worker, code, signal) => {
    console.log('worker is dead:', worker.isDead());
  });
} else {
  // Workers can share any TCP connection. In this case, it is an HTTP server.
  http.createServer((req, res) => {
    res.writeHead(200);
    res.end(`Current process\n ${process.pid}`);
    process.kill(process.pid);
  }).listen(8000);
}

worker.kill([signal])#

  • signal <string>要发送到工作进程的终止信号的名称。默认值: 'SIGTERM'

这个函数会杀死worker。在主工作线程中,它通过断开worker.process 的连接来实现此目的,一旦断开连接,就会使用 signal进行终止。在工作线程中,它通过使用signal终止进程来实现。

kill()函数会终止工作进程,而不等待正常断开连接,它的行为与worker.process.kill()相同。

为了向后兼容,此方法别名为worker.destroy()

在worker中,存在process.kill(),但它不是这个函数;它是kill()

worker.process#

所有工作线程都是使用child_process.fork()创建的,该函数返回的对象存储为.process。 在工作线程中,存储全局process

请参阅:子进程模块

如果'disconnect'事件发生在process并且.exitedAfterDisconnect不是true ,工作人员将调用process.exit(0)。这可以防止意外断开。

worker.send(message[, sendHandle[, options]][, callback])#

  • message <对象>
  • sendHandle <句柄>
  • options <Object> options参数(如果存在)是用于参数化某些类型句柄的发送的对象。options支持以下属性:
    • keepOpen <boolean>传递net.Socket实例时可以使用的值 。当true时,套接字在发送过程中保持打开状态。 默认值: false
  • callback <函数>
  • 返回:<布尔值>

向工作节点或主节点发送消息,可以选择使用句柄。

在主数据库中,这会向特定工作人员发送一条消息。它与ChildProcess.send()相同 。

在工作进程中,这会向主进程发送一条消息。它与process.send()相同 。

此示例将回显来自主节点的所有消息:

if (cluster.isPrimary) {
  const worker = cluster.fork();
  worker.send('hi there');

} else if (cluster.isWorker) {
  process.on('message', (msg) => {
    process.send(msg);
  });
} 

事件:'disconnect'#

在工作进程 IPC 通道断开连接后发出。当工作线程正常退出、被终止或手动断开连接(例如使用 worker.disconnect())时,可能会发生这种情况。

'disconnect''exit'事件之间可能存在延迟。这些事件可用于检测进程是否陷入清理过程或是否存在长期连接。

cluster.on('disconnect', (worker) => {
  console.log(`The worker #${worker.id} has disconnected`);
}); 

事件:'exit'#

当任何工作线程死亡时,集群模块将发出'exit'事件。

这可用于通过再次调用.fork()来重新启动工作线程。

cluster.on('exit', (worker, code, signal) => {
  console.log('worker %d died (%s). restarting...',
              worker.process.pid, signal || code);
  cluster.fork();
}); 

请参阅child_process事件:'exit'

事件:'fork'#

当一个新的工作进程被分叉时,集群模块将发出一个'fork'事件。这可用于记录工作人员事件并创建自定义超时。

const timeouts = [];
function errorMsg() {
  console.error('Something must be wrong with the connection ...');
}

cluster.on('fork', (worker) => {
  timeouts[worker.id] = setTimeout(errorMsg, 2000);
});
cluster.on('listening', (worker, address) => {
  clearTimeout(timeouts[worker.id]);
});
cluster.on('exit', (worker, code, signal) => {
  clearTimeout(timeouts[worker.id]);
  errorMsg();
}); 

事件:'listening'#

从工作线程调用listen()后,当服务器上发出'listening'事件时, 'listening'事件也会在cluster上发出在小学。

事件处理程序使用两个参数执行,worker包含工作对象,而address对象包含以下连接属性:addressportaddressType。如果工作人员正在侦听多个地址,这一点非常有用。

cluster.on('listening', (worker, address) => {
  console.log(
    `A worker is now connected to ${address.address}:${address.port}`);
}); 

addressType是以下之一:

  • 4 (TCPv4)
  • 6 (TCPv6)
  • -1(Unix 域套接字)
  • 'udp4''udp6'(UDPv4 或 UDPv6)

事件:'message'#

当集群主节点收到来自任何工作线程的消息时发出。

请参阅child_process事件:'message'

事件:'online'#

分叉一个新的工作人员后,该工作人员应该通过在线消息进行响应。当主节点收到在线消息时,它将发出此事件。'fork''online'之间的区别在于,当主进程派生一个工作线程时会发出 fork,而当工作线程运行时会发出'online'

cluster.on('online', (worker) => {
  console.log('Yay, the worker responded after it was forked');
}); 

事件:'setup'#

每次调用.setupPrimary()时发出。

settings对象是调用 .setupPrimary()时的 cluster.settings对象,并且仅供参考,因为可以多次调用.setupPrimary()在一个滴答声中。

如果准确性很重要,请使用cluster.settings

cluster.disconnect([callback])#

  • callback <Function>当所有工作线程断开连接且句柄关闭时调用。

cluster.workers中的每个工作线程调用.disconnect()

当它们断开连接时,所有内部句柄都将关闭,如果没有其他事件正在等待,则允许主进程优雅地终止。

该方法采用一个可选的回调参数,该参数将在完成时调用。

这只能从主进程中调用。

cluster.fork([env])#

产生一个新的工作进程。

这只能从主进程中调用。

cluster.isMaster#

稳定性:0 - 已弃用

已弃用cluster.isPrimary的别名。

cluster.isPrimary#

如果该进程是主进程,则为 true。这是由process.env.NODE_UNIQUE_ID决定的。如果process.env.NODE_UNIQUE_ID未定义,则isPrimarytrue

cluster.isWorker#

如果该进程不是主进程,则为 True(它是cluster.isPrimary的否定)。

cluster.schedulingPolicy#

调度策略,cluster.SCHED_RR用于循环,或 cluster.SCHED_NONE将其留给操作系统。这是一个全局设置,一旦第一个工作线程生成或调用.setupPrimary() (以先到者为准),就会被有效冻结。

SCHED_RR是除 Windows 之外的所有操作系统上的默认值。一旦 libuv 能够有效地分发 IOCP 句柄而不会对性能造成较大影响,Windows 将更改为SCHED_RR

cluster.schedulingPolicy也可以通过NODE_CLUSTER_SCHED_POLICY环境变量设置 。有效值为'rr''none'

cluster.settings#

  • <对象>
    • execArgv <string[]>传递给 Node.js 可执行文件的字符串参数列表。默认值: process.execArgv
    • exec <string>工作文件的文件路径。默认值: process.argv[1]
    • args <string[]>传递给工作器的字符串参数。 默认值: process.argv.slice(2)
    • cwd <string>工作进程的当前工作目录。默认值: undefined(从父进程继承)。
    • serialization <string>指定用于在进程之间发送消息的序列化类型。可能的值为'json''advanced'。有关更多详细信息,请参阅child_process的高级序列化默认值: false
    • silent <boolean>是否将输出发送到父级的 stdio。 默认值: false
    • stdio <Array>配置分叉进程的 stdio。由于集群模块依赖 IPC 来运行,因此此配置必须包含 'ipc'条目。当提供此选项时,它将覆盖silent。请参阅 child_process.spawn()stdio
    • uid <number>设置进程的用户身份。(参见setuid(2)。)
    • gid <number>设置进程的组标识。(参见setgid(2)。)
    • inspectPort <数字> | <函数>设置worker的检查器端口。这可以是一个数字,也可以是一个不带参数并返回数字的函数。默认情况下,每个工作线程都有自己的端口,从主端口的process.debugPort开始递增。
    • windowsHide <boolean>隐藏通常在 Windows 系统上创建的分叉进程控制台窗口。默认值: false

调用.setupPrimary()(或.fork())后,此设置对象将包含设置,包括默认值。

该对象不应手动更改或设置。

cluster.setupMaster([settings])#

稳定性:0 - 已弃用

已弃用.setupPrimary()的别名。

cluster.setupPrimary([settings])#

setupPrimary用于更改默认的“fork”行为。调用后,设置将显示在cluster.settings中。

任何设置更改只会影响将来对.fork()的调用,而不会影响已在运行的工作线程。

无法通过.setupPrimary()设置的工作线程的唯一属性是传递给.fork()env

上述默认值仅适用于第一次调用;以后调用的默认值是调用cluster.setupPrimary()时的当前值。

import cluster from 'node:cluster';

cluster.setupPrimary({
  exec: 'worker.js',
  args: ['--use', 'https'],
  silent: true,
});
cluster.fork(); // https worker
cluster.setupPrimary({
  exec: 'worker.js',
  args: ['--use', 'http'],
});
cluster.fork(); // http workerconst cluster = require('node:cluster');

cluster.setupPrimary({
  exec: 'worker.js',
  args: ['--use', 'https'],
  silent: true,
});
cluster.fork(); // https worker
cluster.setupPrimary({
  exec: 'worker.js',
  args: ['--use', 'http'],
});
cluster.fork(); // http worker

这只能从主进程中调用。

cluster.worker#

对当前工作对象的引用。在主进程中不可用。

import cluster from 'node:cluster';

if (cluster.isPrimary) {
  console.log('I am primary');
  cluster.fork();
  cluster.fork();
} else if (cluster.isWorker) {
  console.log(`I am worker #${cluster.worker.id}`);
}const cluster = require('node:cluster');

if (cluster.isPrimary) {
  console.log('I am primary');
  cluster.fork();
  cluster.fork();
} else if (cluster.isWorker) {
  console.log(`I am worker #${cluster.worker.id}`);
}

cluster.workers#

存储事件工作线程对象的哈希,以id字段为键。这使得循环所有工作人员变得很容易。它仅在主进程中可用。

在工作线程断开连接 退出后,该工作线程将从cluster.workers中删除。这两个事件之间的顺序无法提前确定。但是,可以保证从cluster.workers列表中删除 发生在最后一个'disconnect''exit'事件发出之前。

import cluster from 'node:cluster';

for (const worker of Object.values(cluster.workers)) {
  worker.send('big announcement to all workers');
}const cluster = require('node:cluster');

for (const worker of Object.values(cluster.workers)) {
  worker.send('big announcement to all workers');
}

NodeJS中文文档为Read dev Docs平台提供托管,中文NodeJS文档均由英文版NodeJS文档翻译,版权属于nodejs.org