本篇文章和大家探讨一下Node.js利用多个核心的方法–worker_threads模块提供的多线程模型,介绍一下Node.js多进程模型中实现共享内存的方法。
Node.js 由于其单线程模型的设计,导致一个Node进程(的主线程)只能利用一个CPU核心,然而现在的机器基本上都是多核的,这造成了严重的性能浪费。通常来说,想要利用到多个核心一般有以下的方法:
-
编写Node的C++插件扩充线程池,并在JS代码中将CPU耗时任务委托给其它线程处理。
-
使用worker_threads模块提供的多线程模型(尚在实验阶段)。
-
使用child_process 或者 cluster模块提供的多进程模型,每个进程都是一个独立的Node.js进程。
从易用、代码入侵性、稳定性的角度来说,多进程模型通常是首要的选择。【推荐学习:《nodejs 教程》】
Node.js cluster 多进程模型存在的问题
在cluster模块提供的多进程模型中,每个Node进程都是一个独立且完整的应用进程,有自己的内存空间,其它进程无法访问。因此虽然在项目启动时,所有Worker进程具有一致的状态和行为,但在之后的运行中无法保证其状态维持一致。
例如,项目启动时有两个Worker进程,进程A和进程B,两个进程都声明了变量a=1。但之后项目接收到一个请求,Master进程将其分派给进程A来处理,这个请求将a的值变更为了2,那么此时进程A的内存空间中a=2,但是进程B的内存空间中a依旧是1。此时如果有个请求读取a的值,Master进程将这个请求分派给进程A和进程B时读取到的结果是不一致的,这就出现了一致性问题。
cluster模块在设计时并没有给出解决方案,而是要求Worker进程是无状态的,即程序员在写代码时不应该允许在处理请求时修改内存中的值,以此来保障所有Worker进程的一致性。然而在实践中总会有各种各样的情况需要写内存,比如记录用户的登录状态等,在许多企业的实践中,通常会把这些状态数据记录在外部,例如数据库、redis、消息队列、文件系统等,每次处理有状态请求时会读写外部存储空间。
这不失为一种有效的做法,然而这需要额外引入一个外部存储空间,同时还要自行处理多进程并发访问下的一致性问题,自行维护数据的生命周期(因为Node进程和维护在外部的数据并不是同步创建和销毁的),以及在高并发访问情况下的IO性能瓶颈(如果是存储在数据库等非内存环境中)。其实本质上来说,我们只是需要一个可供多个进程共享访问的空间罢了,并不需要持久化存储,这段空间的生命周期最好与Node进程强绑定,这样在使用时能省去不少麻烦。因此跨进程的共享内存就成了最适合在这种场景使用的方式。
Node.js 的共享内存
很遗憾Node本身并未提供共享内存的实现,因此我们可以看看npm仓库中第三方库的实现。这些库有些是通过C++插件扩充Node的函数实现的,有些是通过Node提供的IPC机制实现的,但很遗憾它们的实现都很简单,并未提供互斥访问、对象监听等功能,这使得使用者必须自己小心维护这段共享内存,否则就会导致时序问题。
转了一圈下来没找到我想要的。。。那就算了,我自己写一个。
共享内存的设计
首先我们必须理清楚到底需要个什么样的共享内存,我是根据我自身的需求出发(为了在项目中用它来存储跨进程访问的状态数据),同时兼顾通用性,因此会首先考虑以下几点:
-
以JS对象为基本单位进行读写访问。
-
能够进程间互斥访问,一个进程访问时,其它进程被阻塞。
-
能够监听共享内存中的对象,当对象发生变化的时候监听的进程能被通知到。
-
在满足上述条件的前提下,实现方式尽可能简单。
可以发现,其实我们并不需要操作系统层面的共享内存,只需要能够多个Node进程能访问同一个对象就行了,那么就可以在Node本身提供的机制上实现。可以使用Master进程的一段内存空间作为共享内存空间,Worker进程通过IPC将读写请求委托给Master进程,由Master进程进行读写,然后再通过IPC将结果返回给Worker进程。
为了让共享内存的使用方式在Master进程和Worker进程中一致,我们可以将对共享内存的操作抽离成一个接口,在Master进程和Worker进程中各自实现这个接口。类图如下图所示,用一个SharedMemory
类作为抽象接口,在server.js
入口文件中声明该对象。其在Master进程中实例化为Manager
对象,在Worker进程中实例化为Worker
对象。Manager
对象来维护共享内存,并处理对共享内存的读写请求,而Worker
对象则将读写请求发送到Master进程。
可以使用Manager
类中的一个属性作为共享内存对象,访问该对象的方式与访问普通JS对象的方式一致,然后再做一层封装,只暴露get
、set
、remove
等基本操作,避免该属性直接被修改。
由于Master进程会优先于所有Worker进程创建,因此,可以在Master进程中声明共享内存空间之后再创建Worker进程,以此来保证每个Worker进程创建后都可以立即访问共享内存。
为了使用简单,我们可以将SharedMemory
设计成单例,这样每个进程中就只有一个实例,并可以在import
了SharedMemory
之后直接使用。
代码实现
读写控制与IPC通信
首先实现对外接口SharedMemory
类,这里没有使用让Manager
和Worker
继承SharedMemory
的方式,而是让SharedMemory
在实例化的时候返回一个Manager
或Worker
的实例,从而实现自动选择子类。
在Node 16中
isPrimary
替代了isMaster
,这里为了兼容使用了两种写法。
// shared-memory.js class SharedMemory { constructor() { if (cluster.isMaster || cluster.isPrimary) { return new Manager(); } else { return new Worker(); } } }
Manager
负责管理共享内存空间,我们直接在Manager
对象中增加__sharedMemory__
属性,由于其本身也是JS对象,会被纳入JS的垃圾回收管理中,因此我们不需要进行内存清理、数据迁移等操作,使得实现上非常简洁。之后在__sharedMemory__
之中定义set
、get
、remove
等标准操作来提供访问方式。
我们通过cluster.on('online', callback)
来监听worker进程的创建事件,并在创建后立即用worker.on('message', callback)
来监听来自worker进程的IPC通信,并把通信消息交给handle
函数处理。
handle
函数的职责是区分worker进程是想进行哪种操作,并取出操作的参数委托给对应的set
、get
、remove
函数(注意不是__sharedMemory__
中的set
、get
、remove
)进行处理,并将处理后的结果返还给worker进程。
// manager.js const cluster = require('cluster'); class Manager { constructor() { this.__sharedMemory__ = { set(key, value) { this.memory[key] = value; }, get(key) { return this.memory[key]; }, remove(key) { delete this.memory[key]; }, memory: {}, }; // Listen the messages from worker processes. cluster.on('online', (worker) => { worker.on('message', (data) => { this.handle(data, worker); return false; }); }); } handle(data, target) { const args = data.value ? [data.key, data.value] : [data.key]; this[data.method](...args).then((value) => { const msg = { id: data.id, // workerId uuid: data.uuid, // communicationID value, }; target.send(msg); }); } set(key, value) { return new Promise((resolve) => { this.__sharedMemory__.set(key, value); resolve('OK'); }); } get(key) { return new Promise((resolve) => { resolve(this.__sharedMemory__.get(key)); }); } remove(key) { return new Promise((resolve) => { this.__sharedMemory__.remove(key); resolve('OK'); }); } }
Worker
自对象创建开始就使用process.on
监听来自Master进程的返回消息(毕竟不能等消息发送出去以后再监听吧,那就来不及了)。至于__getCallbacks__
对象的作用一会儿再说。此时Worker
对象便创建完成。
之后项目运行到某个地方的时候,如果要访问共享内存,就会调用Worker
的set
、get
、remove
函数,它们又会调用handle
函数将消息通过process.send
发送到master进程,同时,将得到返回结果时要进行的操作记录在__getCallbacks__
中。当结果返回时,会被之前在process.on
中的函数监听到,并从__getCallbacks__
中取出对应的回调函数,并执行。
因为访问共享内存的过程中会经过IPC,所以必定是异步操作,所以需要记录回调函数,不能实现成同步的方式,不然会阻塞原本的任务。
// worker.js const cluster = require('cluster'); const { v4: uuid4 } = require('uuid'); class Worker { constructor() { this.__getCallbacks__ = {}; process.on('message', (data) => { const callback = this.__getCallbacks__[data.uuid]; if (callback && typeof callback === 'function') { callback(data.value); } delete this.__getCallbacks__[data.uuid]; }); } set(key, value) { return new Promise((resolve) => { this.handle('set', key, value, () => { resolve(); }); }); } get(key) { return new Promise((resolve) => { this.handle('get', key, null, (value) => { resolve(value); }); }); } remove(key) { return new Promise((resolve) => { this.handle('remove', key, null, () => { resolve(); }); }); } handle(method, key, value, callback) { const uuid = uuid4(); // 每次通信的uuid process.send({ id: cluster.worker.id, method, uuid, key, value, }); this.__getCallbacks__[uuid] = callback; } }
一次共享内存访问的完整流程是:调用Worker
的set
/get
/remove
函数 -> 调用Worker
的handle
函数,向master进程通信并将回调函数记录在__getCallbacks__
-> master进程监听到来自worker进程的消息 -> 调用Manager
的handle
函数 -> 调用Manager
的set
/get
/remove
函数 -> 调用__sharedMemory__
的set
/get
/remove
函数 -> 操作完成返回Manager
的set
/get
/remove
函数 -> 操作完成返回handle
函数 -> 向worker进程发送通信消息 -> worker进程监听到来自master进程的消息 -> 从__getCallbacks__
中取出回调函数并执行。
互斥访问
到目前为止,我们已经实现了读写共享内存,但还没有结束,目前的共享内存是存在严重安全问题的。因为这个共享内存是可以所有进程同时访问的,然而我们并没有考虑并发访问时的时序问题。我们来看下面这个例子:
时间 | 进程A | 进程B | 共享内存中变量x的值 |
---|---|---|---|
t0 | 0 | ||
t1 | 读取x(x=0) | 0 | |
t2 | x1=x+1(x1=1) | 读取x(x=0) | 0 |
t3 | 将x1的值写回x | x2=x+1(x2=1) | 1 |
t4 | 将x2的值写回x | 1 |
进程A和进程B的目的都是将x的值加1,理想情况下最后x的值应该是2,可是最后的结果却是1。这是因为进程B在t3时刻给x的值加1的时候,使用的是t2时刻读取出来的x的值,但此时从全局角度来看,这个值已经过期了,因为t3时刻x最新的值已经被进程A写为了1,可是进程B无法知道进程外部的变化,所以导致了t4时刻最后写回的值又覆盖掉了进程A写回的值,等于是进程A的行为被覆盖掉了。
在多线程、多进程和分布式中并发情况下的数据一致性问题是老大难问题了,这里不再展开讨论。
为了解决上述问题,我们必须实现进程间互斥访问某个对象,来避免同时操作一个对象,从而使进程可以进行原子操作,所谓原子操作就是不可被打断的一小段连续操作,为此需要引入锁的概念。由于读写均以对象为基本单位,因此锁的粒度设置为对象级别。在某一个进程(的某一任务)获取了某个对象的锁之后,其它要获取锁的进程(的任务)会被阻塞,直到锁被归还。而要进行写操作,则必须要先获取对象的锁。这样在获取到锁直到锁被释放的这段时间里,该对象在共享内存中的值不会被其它进程修改,从而导致错误。
在Manager
的__sharedMemory__
中加入locks
属性,用来记录哪个对象的锁被拿走了,lockRequestQueues
属性用来记录被阻塞的任务(正在等待锁的任务)。并增加getLock
函数和releaseLock
函数,用来申请和归还锁,以及handleLockRequest
函数,用来使被阻塞的任务获得锁。在申请锁时,会先将回调函数记录到lockRequestQueues
队尾(因为此时该对象的锁可能已被拿走),然后再调用handleLockRequest
检查当前锁是否被拿走,若锁还在,则让队首的任务获得锁。归还锁时,先将__sharedMemory__.locks
中对应的记录删掉,然后再调用handleLockRequest
让队首的任务获得锁。
// manager.js const { v4: uuid4 } = require('uuid'); class Manager { constructor() { this.__sharedMemory__ = { ... locks: {}, lockRequestQueues: {}, }; } getLock(key) { return new Promise((resolve) => { this.__sharedMemory__.lockRequestQueues[key] = this.__sharedMemory__.lockRequestQueues[key] ?? []; this.__sharedMemory__.lockRequestQueues[key].push(resolve); this.handleLockRequest(key); }); } releaseLock(key, lockId) { return new Promise((resolve) => { if (lockId === this.__sharedMemory__.locks[key]) { delete this.__sharedMemory__.locks[key]; this.handleLockRequest(key); } resolve('OK'); }); } handleLockRequest(key) { return new Promise((resolve) => { if ( !this.__sharedMemory__.locks[key] && this.__sharedMemory__.lockRequestQueues[key]?.length > 0 ) { const callback = this.__sharedMemory__.lockRequestQueues[key].shift(); const lockId = uuid4(); this.__sharedMemory__.locks[key] = lockId; callback(lockId); } resolve(); }); } ... }
在Worker
中,则是增加getLock
和releaseLock
两个函数,行为与get
、set
类似,都是调用handle
函数。
// worker.js class Worker { getLock(key) { return new Promise((resolve) => { this.handle('getLock', key, null, (value) => { resolve(value); }); }); } releaseLock(key, lockId) { return new Promise((resolve) => { this.handle('releaseLock', key, lockId, (value) => { resolve(value); }); }); } ... }
监听对象
有时候我们需要监听某个对象值的变化,在单进程Node应用中这很容易做到,只需要重写对象的set
属性就可以了,然而在多进程共享内存中,对象和监听者都不在一个进程中,这只能依赖Manager
的实现。这里,我们选择了经典的观察者模式来实现监听共享内存中的对象。
为此,我们先在__sharedMemory__
中加入listeners
属性,用来记录在对象值发生变化时监听者注册的回调函数。然后增加listen
函数,其将监听回调函数记录到__sharedMemory__.listeners
中,这个监听回调函数会将变化的值发送给对应的worker进程。最后,在set
和remove
函数返回前调用notifyListener
,将所有记录在__sharedMemory__.listeners
中监听该对象的所有函数取出并调用。
// manager.js class Manager { constructor() { this.__sharedMemory__ = { ... listeners: {}, }; } handle(data, target) { if (data.method === 'listen') { this.listen(data.key, (value) => { const msg = { isNotified: true, id: data.id, uuid: data.uuid, value, }; target.send(msg); }); } else { ... } } notifyListener(key) { const listeners = this.__sharedMemory__.listeners[key]; if (listeners?.length > 0) { Promise.all( listeners.map( (callback) => new Promise((resolve) => { callback(this.__sharedMemory__.get(key)); resolve(); }) ) ); } } set(key, value) { return new Promise((resolve) => { this.__sharedMemory__.set(key, value); this.notifyListener(key); resolve('OK'); }); } remove(key) { return new Promise((resolve) => { this.__sharedMemory__.remove(key); this.notifyListener(key); resolve('OK'); }); } listen(key, callback) { if (typeof callback === 'function') { this.__sharedMemory__.listeners[key] = this.__sharedMemory__.listeners[key] ?? []; this.__sharedMemory__.listeners[key].push(callback); } else { throw new Error('a listener must have a callback.'); } } ... }
在Worker
中由于监听操作与其它操作不一样,它是一次注册监听回调函数之后对象的值每次变化都会被通知,因此需要在增加一个__getListenerCallbacks__
属性用来记录监听操作的回调函数,与__getCallbacks__
不同,它里面的函数在收到master的回信之后不会删除。
// worker.js class Worker { constructor() { ... this.__getListenerCallbacks__ = {}; process.on('message', (data) => { if (data.isNotified) { const callback = this.__getListenerCallbacks__[data.uuid]; if (callback && typeof callback === 'function') { callback(data.value); } } else { ... } }); } handle(method, key, value, callback) { ... if (method === 'listen') { this.__getListenerCallbacks__[uuid] = callback; } else { this.__getCallbacks__[uuid] = callback; } } listen(key, callback) { if (typeof callback === 'function') { this.handle('listen', key, null, callback); } else { throw new Error('a listener must have a callback.'); } } ... }
LRU缓存
有时候我们需要用用内存作为缓存,但多进程中各进程的内存空间独立,不能共享,因此也需要用到共享内存。但是如果用共享内存中的一个对象作为缓存的话,由于每次IPC都需要传输整个缓存对象,会导致缓存对象不能太大(否则序列化和反序列化耗时太长),而且由于写缓存对象的操作需要加锁,进一步影响了性能,而原本我们使用缓存就是为了加快访问速度。其实在使用缓存的时候通常不会做复杂操作,大多数时候也不需要保障一致性,因此我们可以在Manager
再增加一个共享内存__sharedLRUMemory__
,其为一个lru-cache
实例,并增加getLRU
、setLRU
、removeLRU
函数,与set
、get
、remove
函数类似。
// manager.js const LRU = require('lru-cache'); class Manager { constructor() { ... this.defaultLRUOptions = { max: 10000, maxAge: 1000 * 60 * 5 }; this.__sharedLRUMemory__ = new LRU(this.defaultLRUOptions); } getLRU(key) { return new Promise((resolve) => { resolve(this.__sharedLRUMemory__.get(key)); }); } setLRU(key, value) { return new Promise((resolve) => { this.__sharedLRUMemory__.set(key, value); resolve('OK'); }); } removeLRU(key) { return new Promise((resolve) => { this.__sharedLRUMemory__.del(key); resolve('OK'); }); } ... }
Worker
中也增加getLRU
、setLRU
、removeLRU
函数。
// worker.js class Worker { getLRU(key) { return new Promise((resolve) => { this.handle('getLRU', key, null, (value) => { resolve(value); }); }); } setLRU(key, value) { return new Promise((resolve) => { this.handle('setLRU', key, value, () => { resolve(); }); }); } removeLRU(key) { return new Promise((resolve) => { this.handle('removeLRU', key, null, () => { resolve(); }); }); } ... }
共享内存的使用方式
目前共享内存的实现已发到npm仓库(文档和源代码在Github仓库,欢迎pull request和报bug),可以直接通过npm安装:
npm i cluster-shared-memory
下面的示例包含了基本使用方法:
const cluster = require('cluster'); // 引入模块时会根据当前进程 master 进程还是 worker 进程自动创建对应的 SharedMemory 对象 require('cluster-shared-memory'); if (cluster.isMaster) { // 在 master 进程中 fork 子进程 for (let i = 0; i < 2; i++) { cluster.fork(); } } else { const sharedMemoryController = require('./src/shared-memory'); const obj = { name: 'Tom', age: 10, }; // 写对象 await sharedMemoryController.set('myObj', obj); // 读对象 const myObj = await sharedMemoryController.get('myObj'); // 互斥访问对象,首先获得对象的锁 const lockId = await sharedMemoryController.getLock('myObj'); const newObj = await sharedMemoryController.get('myObj'); newObj.age = newObj.age + 1; await sharedMemoryController.set('myObj', newObj); // 操作完之后释放锁 await sharedMemoryController.releaseLock('requestTimes', lockId); // 或者使用 mutex 函数自动获取和释放锁 await sharedMemoryController.mutex('myObj', async () => { const newObjM = await sharedMemoryController.get('myObj'); newObjM.age = newObjM.age + 1; await sharedMemoryController.set('myObj', newObjM); }); // 监听对象 sharedMemoryController.listen('myObj', (value) => { console.log(`myObj: ${value}`); }); //写LRU缓存 await sharedMemoryController.setLRU('cacheItem', {user: 'Tom'}); // 读对象 const cacheItem = await sharedMemoryController.getLRU('cacheItem'); }
缺点
这种实现目前尚有几个缺点:
-
不能使用PM2的自动创建worker进程的功能。
由于PM2会使用自己的
cluster
模块的master进程的实现,而我们的共享内存模块需要在master进程维护一个内存空间,则不能使用PM2的实现,因此不能使用PM2的自动创建worker进程的功能。
-
传输的对象必须可序列化,且不能太大。
-
如果使用者在获取锁之后忘记释放,会导致其它进程一直被阻塞,这要求程序员有良好的代码习惯。
原文地址:https://juejin.cn/post/6992091006220894215
作者:FinalZJY