熟悉 js 的朋友都知道,js 是单线程
的,在 Node 中,采用的是 多进程单线程 的模型。由于javascript单线程的限制,在多核服务器上,我们往往需要启动多个进程才能最大化服务器性能。
Node.js 进程集群可用于运行多个 Node.js 实例,这些实例可以在其应用程序线程之间分配工作负载。 当不需要进程隔离时,请改用 worker_threads
模块,它允许在单个 Node.js 实例中运行多个应用程序线程。
零、NodeJS多进程
- 进程总数,其中一个主进程,cpu 个数 x cpu 核数 个 子进程
- 无论 child_process 还是 cluster,都不是多线程模型,而是多进程模型
- 应对单线程问题,通常使用多进程的方式来模拟多线程
一、核心模块cluster集群
Node 在 V0.8 版本之后引入了 cluster模块,通过一个主进程 (master) 管理多个子进程 (worker) 的方式实现集群
。
集群模块可以轻松创建共享服务器端口的子进程。
cluster 底层是 child_process 模块,除了可以发送普通消息,还可以发送底层对象
TCP
、UDP
等,cluster
模块是child_process
模块和net
模块的组合应用。 cluster 启动时,内部会启动 TCP 服务器,将这个 TCP 服务器端 socket 的文件描述符发给工作进程。
在 cluster
模块应用中,一个主进程只能管理一组工作进程
,其运作模式没有 child_process
模块那么灵活,但是更加稳定:
1.cluster配置详情
1.1 引入cluster
const cluster = require('cluster')复
1.2 cluster常用属性
.isMaster
标识主进程, Node<16.isPrimary
标识主进程, Node>16.isWorker
标识子进程.worker
对当前工作进程对象的引用【子进程中】.workers
存储活动工作进程对象的哈希,以id
字段为键。 这样可以很容易地遍历所有工作进程。 它仅在主进程中可用。cluster.wokers[id] === worker
【主进程中】.settings
只读, cluster配置项。在调用 .setupPrimary()或.fork()方法之后,此设置对象将包含设置,包括默认值。之前为空对象。此对象不应手动更改或设置。
cluster.settings
配置项详情:
- `execArgv` <string[]>传给 Node.js 可执行文件的字符串参数列表。 **默认值:** `process.execArgv`。 - `exec` <string> 工作进程文件的文件路径。 **默认值:** `process.argv[1]`。 - `args` <string[]> 传给工作进程的字符串参数。 **默认值:**`process.argv.slice(2)`。 - `cwd` <string>工作进程的当前工作目录。 **默认值:** `undefined` (从父进程继承)。 - `serialization` <string>指定用于在进程之间发送消息的序列化类型。 可能的值为 `'json'` 和 `'advanced'`。 **默认值:** `false`。 - `silent` <boolean>是否将输出发送到父进程的标准输入输出。 **默认值:** `false`。 - `stdio` <Array>配置衍生进程的标准输入输出。 由于集群模块依赖 IPC 来运行,因此此配置必须包含 `'ipc'` 条目。 提供此选项时,它会覆盖 `silent`。 - `uid` <number>设置进程的用户标识。 - `gid` <number>设置进程的群组标识。 - `inspectPort` <number> | <Function> 设置工作进程的检查器端口。 这可以是数字,也可以是不带参数并返回数字的函数。 默认情况下,每个工作进程都有自己的端口,从主进程的 `process.debugPort` 开始递增。 - `windowsHide` <boolean> 隐藏通常在 Windows 系统上创建的衍生进程控制台窗口。 **默认值:** `false`。
1.3 cluster常用方法
.fork([env])
衍生新的工作进程【主进程中】.setupPrimary([settings])
Node>16.setupMaster([settings])
用于更改默认的 'fork' 行为,用后设置将出现在cluster.settings
中。任何设置更改只会影响未来对.fork()
的调用,而不会影响已经运行的工作进程。上述默认值仅适用于第一次调用。Node 小于 16【主进程中】.disconnect([callback])
当所有工作进程断开连接并关闭句柄时调用【主进程中】
1.4 cluster常用事件
为了让集群更加稳定和健壮,cluster
模块也暴露了许多事件:
'message'
事件, 当集群主进程接收到来自任何工作进程的消息时触发。'exit'
事件, 当任何工作进程死亡时,则集群模块将触发'exit'
事件。
cluster.on('exit', (worker, code, signal) => { console.log('worker %d died (%s). restarting...', worker.process.pid, signal || code); cluster.fork(); });
'listening'
事件,从工作进程调用listen()
后,当服务器上触发'listening'
事件时,则主进程中的cluster
也将触发'listening'
事件。
cluster.on('listening', (worker, address) => { console.log( `A worker is now connected to ${address.address}:${address.port}`); });
'fork'
事件,当新的工作进程被衍生时,则集群模块将触发'fork'
事件。
cluster.on('fork', (worker) => { timeouts[worker.id] = setTimeout(errorMsg, 2000); });
'setup'
事件,每次调用.setupPrimary()
时触发。disconnect
事件,在工作进程 IPC 通道断开连接后触发。 当工作进程正常退出、被杀死、或手动断开连接时
cluster.on('disconnect', (worker) => { console.log(`The worker #${worker.id} has disconnected`); });
1.5 Worker类
Worker
对象包含了工作进程的所有公共的信息和方法。 在主进程中,可以使用 cluster.workers
来获取它。 在工作进程中,可以使用 cluster.worker
来获取它。
1.5.1 worker常用属性
.id
工作进程标识,每个新的工作进程都被赋予了自己唯一的 id,此 id 存储在id
。当工作进程存活时,这是在cluster.workers
中索引它的键。.process
所有工作进程都是使用child_process.fork()
创建,此函数返回的对象存储为.process
。 在工作进程中,存储了全局的process
。
1.5.2 worker常用方法
.send(message[, sendHandle[, options]][, callback])
向工作进程或主进程发送消息,可选择使用句柄。在主进程中,这会向特定的工作进程发送消息。 它与ChildProcess.send()
相同。在工作进程中,这会向主进程发送消息。 它与process.send()
相同。.destroy()
.kill([signal])
此函数会杀死工作进程。kill()
函数在不等待正常断开连接的情况下杀死工作进程,它与worker.process.kill()
具有相同的行为。为了向后兼容,此方法别名为worker.destroy()
。.disconnect([callback])
发送给工作进程,使其调用自身的.disconnect()
将关闭所有服务器,等待那些服务器上的'close'
事件,然后断开 IPC 通道。.isConnect()
如果工作进程通过其 IPC 通道连接到其主进程,则此函数返回true
,否则返回false
。 工作进程在创建后连接到其主进程。.isDead()
如果工作进程已终止(由于退出或收到信号),则此函数返回true
。 否则,它返回false
。
1.5.3 worker常用事件
为了让集群更加稳定和健壮,cluster
模块也暴露了许多事件:
'message'
事件, 在工作进程中。
cluster.workers[id].on('message', messageHandler);
'exit'
事件, 当任何工作进程死亡时,则当前worker工作进程
对象将触发'exit'
事件。
if (cluster.isPrimary) { const worker = cluster.fork(); worker.on('exit', (code, signal) => { if (signal) { console.log(`worker was killed by signal: ${signal}`); } else if (code !== 0) { console.log(`worker exited with error code: ${code}`); } else { console.log('worker success!'); } }); }
'listening'
事件,从工作进程调用listen()
,对当前工作进程进行监听。
cluster.fork().on('listening', (address) => { // 工作进程正在监听 });
disconnect
事件,在工作进程 IPC 通道断开连接后触发。 当工作进程正常退出、被杀死、或手动断开连接时
cluster.fork().on('disconnect', () => { //限定于当前worker对象触发 });
2. 进程通信
Node中主进程和子进程之间通过进程间通信 (IPC) 实现进程间的通信,进程间通过 .send()
(a.send表示向a发送)方法发送消息,监听 message
事件收取信息,这是 cluster模块
通过集成 EventEmitter
实现的。还是一个简单的官网的进程间通信例子
- 子进程:
process.on('message')
、process.send()
- 父进程:
child.on('message')
、child.send()
# cluster.isMaster # cluster.fork() # cluster.workers # cluster.workers[id].on('message', messageHandler); # cluster.workers[id].send(); # process.on('message', messageHandler); # process.send(); const cluster = require('cluster'); const http = require('http'); # 主进程 if (cluster.isMaster) { // Keep track of http requests console.log(`Primary ${process.pid} is running`); let numReqs = 0; // Count requests function messageHandler(msg) { if (msg.cmd && msg.cmd === 'notifyRequest') { numReqs += 1; } } // Start workers and listen for messages containing notifyRequest // 开启多进程(cpu核心数) // 衍生工作进程。 const numCPUs = require('os').cpus().length; for (let i = 0; i < numCPUs; i++) { console.log(i) cluster.fork(); } // cluster worker 主进程与子进程通信 for (const id in cluster.workers) { // ***监听来自子进程的事件 cluster.workers[id].on('message', messageHandler); // ***向子进程发送 cluster.workers[id].send({ type: 'masterToWorker', from: 'master', data: { number: Math.floor(Math.random() * 50) } }); } cluster.on('exit', (worker, code, signal) => { console.log(`worker ${worker.process.pid} died`); }); } else { # 子进程 // 工作进程可以共享任何 TCP 连接 // 在本示例中,其是 HTTP 服务器 // Worker processes have a http server. http.Server((req, res) => { res.writeHead(200); res.end('hello worldn'); //****** !!!!Notify master about the request !!!!!!******* //****** 向process发送 process.send({ cmd: 'notifyRequest' }); //****** 监听从process来的 process.on('message', function(message) { // xxxxxxx }) }).listen(8000); console.log(`Worker ${process.pid} started`); }
2.1 句柄发送与还原
NodeJS 进程之间通信只有消息传递,不会真正的传递对象。
send()
方法在发送消息前,会将消息组装成 handle 和 message,这个 message 会经过 JSON.stringify
序列化,也就是说,传递句柄的时候,不会将整个对象传递过去,在 IPC 通道传输的都是字符串,传输后通过 JSON.parse
还原成对象。
2.2 监听共同端口
代码里有 app.listen(port)
在进行 fork 时,为什么多个进程可以监听同一个端口呢?
原因是主进程通过 send() 方法向多个子进程发送属于该主进程的一个服务对象的句柄,所以对于每一个子进程而言,它们在还原句柄之后,得到的服务对象是一样的,当网络请求向服务端发起时,进程服务是抢占式的,所以监听相同端口时不会引起异常。
- 看下端口被占用的情况:
# master.js const fork = require('child_process').fork; const cpus = require('os').cpus(); for (let i=0; i<cpus.length; i++) { const worker = fork('worker.js'); console.log('worker process created, pid: %s ppid: %s', worker.pid, process.pid); }
# worker.js const http = require('http'); http.createServer((req, res) => { res.end('I am worker, pid: ' + process.pid + ', ppid: ' + process.ppid); }).listen(3000);
以上代码示例,控制台执行
node master.js
只有一个 worker 可以监听到 3000 端口,其余将会抛出Error: listen EADDRINUSE :::3000
错误。
- 那么多进程模式下怎么实现多进程端口监听呢?答案还是有的,通过句柄传递 Node.js v0.5.9 版本之后支持进程间可
发送句柄
功能
/** * http://nodejs.cn/api/child_process.html#child_process_subprocess_send_message_sendhandle_options_callback * message * sendHandle */ subprocess.send(message, sendHandle)
当父子进程之间建立 IPC 通道之后,通过子进程对象的 send 方法发送消息,第二个参数 sendHandle 就是句柄,可以是 TCP套接字、TCP服务器、UDP套接字等
,为了解决上面多进程端口占用问题,我们将主进程的 socket 传递到子进程。
# master.js const fork = require('child_process').fork; const cpus = require('os').cpus(); const server = require('net').createServer(); server.listen(3000); process.title = 'node-master' for (let i=0; i<cpus.length; i++) { const worker = fork('worker.js'); # 句柄传递 worker.send('server', server); console.log('worker process created, pid: %s ppid: %s', worker.pid, process.pid); }
// worker.js let worker; process.title = 'node-worker' process.on('message', function (message, sendHandle) { if (message === 'server') { worker = sendHandle; worker.on('connection', function (socket) { console.log('I am worker, pid: ' + process.pid + ', ppid: ' + process.ppid) }); } });
验证一番,控制台执行 node master.js
2.3 进程负载均衡
了解 cluster
的话会知道,子进程是通过 cluster.fork()
创建的。在 linux 中,系统原生提供了 fork
方法,那么为什么 Node 选择自己实现 cluster模块
,而不是直接使用系统原生的方法?主要的原因是以下两点:
-
fork的进程监听同一端口会导致端口占用错误
-
fork的进程之间没有负载均衡,容易导致惊群现象
在 cluster模块
中,针对第一个问题,通过判断当前进程是否为 master进程
,若是,则监听端口,若不是则表示为 fork 的 worker进程
,不监听端口。
针对第二个问题,cluster模块
内置了负载均衡功能, master进程
负责监听端口接收请求,然后通过调度算法(默认为 Round-Robin,可以通过环境变量 NODE_CLUSTER_SCHED_POLICY
修改调度算法)分配给对应的 worker进程
。
3. 异常捕获
3.1 未捕获异常
当代码抛出了异常没有被捕获到时,进程将会退出,此时 Node.js 提供了 process.on('uncaughtException', handler)
接口来捕获它,但是当一个 Worker 进程遇到未捕获的异常时,它已经处于一个不确定状态,此时我们应该让这个进程优雅退出:
- 关闭异常 Worker 进程所有的 TCP Server(将已有的连接快速断开,且不再接收新的连接),断开和 Master 的 IPC 通道,不再接受新的用户请求。
- Master 立刻 fork 一个新的 Worker 进程,保证在线的『工人』总数不变。
- 异常 Worker 等待一段时间,处理完已经接受的请求后退出。
+---------+ +---------+ | Worker | | Master | +---------+ +----+----+ | uncaughtException | +------------+ | | | | +---------+ | <----------+ | | Worker | | | +----+----+ | disconnect | fork a new worker | +-------------------------> + ---------------------> | | wait... | | | exit | | +-------------------------> | | | | | die | | | | | |
3.2 OOM、系统异常
当一个进程出现异常导致 crash 或者 OOM 被系统杀死时,不像未捕获异常发生时我们还有机会让进程继续执行,只能够让当前进程直接退出,Master 立刻 fork 一个新的 Worker。
二、子进程
1. child_process模块
child_process 模块提供了衍生子进程的能力, 简单来说就是执行cmd命令的能力
。 默认情况下, stdin、 stdout 和 stderr 的管道会在父 Node.js 进程和衍生的子进程之间建立
。 这些管道具有有限的(且平台特定的)容量。 如果子进程写入 stdout 时超出该限制且没有捕获输出,则子进程会阻塞并等待管道缓冲区接受