站长资讯网
最全最丰富的资讯网站

浅谈Node.js多进程模型中如何实现共享内存(代码详解)

本篇文章和大家探讨一下Node.js利用多个核心的方法–worker_threads模块提供的多线程模型,介绍一下Node.js多进程模型中实现共享内存的方法。

浅谈Node.js多进程模型中如何实现共享内存(代码详解)

Node.js 由于其单线程模型的设计,导致一个Node进程(的主线程)只能利用一个CPU核心,然而现在的机器基本上都是多核的,这造成了严重的性能浪费。通常来说,想要利用到多个核心一般有以下的方法:

  • 编写Node的C++插件扩充线程池,并在JS代码中将CPU耗时任务委托给其它线程处理。

  • 使用worker_threads模块提供的多线程模型(尚在实验阶段)。

  • 使用child_process 或者 cluster模块提供的多进程模型,每个进程都是一个独立的Node.js进程。

从易用、代码入侵性、稳定性的角度来说,多进程模型通常是首要的选择。【推荐学习:《nodejs 教程》】

Node.js cluster 多进程模型存在的问题

在cluster模块提供的多进程模型中,每个Node进程都是一个独立且完整的应用进程,有自己的内存空间,其它进程无法访问。因此虽然在项目启动时,所有Worker进程具有一致的状态和行为,但在之后的运行中无法保证其状态维持一致

例如,项目启动时有两个Worker进程,进程A和进程B,两个进程都声明了变量a=1。但之后项目接收到一个请求,Master进程将其分派给进程A来处理,这个请求将a的值变更为了2,那么此时进程A的内存空间中a=2,但是进程B的内存空间中a依旧是1。此时如果有个请求读取a的值,Master进程将这个请求分派给进程A和进程B时读取到的结果是不一致的,这就出现了一致性问题。

cluster模块在设计时并没有给出解决方案,而是要求Worker进程是无状态的,即程序员在写代码时不应该允许在处理请求时修改内存中的值,以此来保障所有Worker进程的一致性。然而在实践中总会有各种各样的情况需要写内存,比如记录用户的登录状态等,在许多企业的实践中,通常会把这些状态数据记录在外部,例如数据库、redis、消息队列、文件系统等,每次处理有状态请求时会读写外部存储空间。

这不失为一种有效的做法,然而这需要额外引入一个外部存储空间,同时还要自行处理多进程并发访问下的一致性问题,自行维护数据的生命周期(因为Node进程和维护在外部的数据并不是同步创建和销毁的),以及在高并发访问情况下的IO性能瓶颈(如果是存储在数据库等非内存环境中)。其实本质上来说,我们只是需要一个可供多个进程共享访问的空间罢了,并不需要持久化存储,这段空间的生命周期最好与Node进程强绑定,这样在使用时能省去不少麻烦。因此跨进程的共享内存就成了最适合在这种场景使用的方式。

Node.js 的共享内存

很遗憾Node本身并未提供共享内存的实现,因此我们可以看看npm仓库中第三方库的实现。这些库有些是通过C++插件扩充Node的函数实现的,有些是通过Node提供的IPC机制实现的,但很遗憾它们的实现都很简单,并未提供互斥访问、对象监听等功能,这使得使用者必须自己小心维护这段共享内存,否则就会导致时序问题。

转了一圈下来没找到我想要的。。。那就算了,我自己写一个。

共享内存的设计

首先我们必须理清楚到底需要个什么样的共享内存,我是根据我自身的需求出发(为了在项目中用它来存储跨进程访问的状态数据),同时兼顾通用性,因此会首先考虑以下几点:

  • 以JS对象为基本单位进行读写访问。

  • 能够进程间互斥访问,一个进程访问时,其它进程被阻塞。

  • 能够监听共享内存中的对象,当对象发生变化的时候监听的进程能被通知到。

  • 在满足上述条件的前提下,实现方式尽可能简单。

可以发现,其实我们并不需要操作系统层面的共享内存,只需要能够多个Node进程能访问同一个对象就行了,那么就可以在Node本身提供的机制上实现。可以使用Master进程的一段内存空间作为共享内存空间,Worker进程通过IPC将读写请求委托给Master进程,由Master进程进行读写,然后再通过IPC将结果返回给Worker进程。

为了让共享内存的使用方式在Master进程和Worker进程中一致,我们可以将对共享内存的操作抽离成一个接口,在Master进程和Worker进程中各自实现这个接口。类图如下图所示,用一个SharedMemory类作为抽象接口,在server.js入口文件中声明该对象。其在Master进程中实例化为Manager对象,在Worker进程中实例化为Worker对象。Manager对象来维护共享内存,并处理对共享内存的读写请求,而Worker对象则将读写请求发送到Master进程。

浅谈Node.js多进程模型中如何实现共享内存(代码详解)

可以使用Manager类中的一个属性作为共享内存对象,访问该对象的方式与访问普通JS对象的方式一致,然后再做一层封装,只暴露getsetremove等基本操作,避免该属性直接被修改。

由于Master进程会优先于所有Worker进程创建,因此,可以在Master进程中声明共享内存空间之后再创建Worker进程,以此来保证每个Worker进程创建后都可以立即访问共享内存。

为了使用简单,我们可以将SharedMemory设计成单例,这样每个进程中就只有一个实例,并可以在importSharedMemory之后直接使用。

代码实现

读写控制与IPC通信

首先实现对外接口SharedMemory类,这里没有使用让ManagerWorker继承SharedMemory的方式,而是让SharedMemory在实例化的时候返回一个ManagerWorker的实例,从而实现自动选择子类。

在Node 16中isPrimary替代了isMaster,这里为了兼容使用了两种写法。

// shared-memory.js class SharedMemory {   constructor() {     if (cluster.isMaster || cluster.isPrimary) {       return new Manager();     } else {       return new Worker();     }   } }

Manager负责管理共享内存空间,我们直接在Manager对象中增加__sharedMemory__属性,由于其本身也是JS对象,会被纳入JS的垃圾回收管理中,因此我们不需要进行内存清理、数据迁移等操作,使得实现上非常简洁。之后在__sharedMemory__之中定义setgetremove等标准操作来提供访问方式。

我们通过cluster.on('online', callback)来监听worker进程的创建事件,并在创建后立即用worker.on('message', callback)来监听来自worker进程的IPC通信,并把通信消息交给handle函数处理。

handle函数的职责是区分worker进程是想进行哪种操作,并取出操作的参数委托给对应的setgetremove函数(注意不是__sharedMemory__中的setgetremove)进行处理,并将处理后的结果返还给worker进程。

// manager.js const cluster = require('cluster');  class Manager {   constructor() {     this.__sharedMemory__ = {       set(key, value) {         this.memory[key] = value;       },       get(key) {         return this.memory[key];       },       remove(key) {         delete this.memory[key];       },       memory: {},     };      // Listen the messages from worker processes.     cluster.on('online', (worker) => {       worker.on('message', (data) => {         this.handle(data, worker);         return false;       });     });   }    handle(data, target) {     const args = data.value ? [data.key, data.value] : [data.key];     this[data.method](...args).then((value) => {       const msg = {         id: data.id, // workerId         uuid: data.uuid, // communicationID         value,       };       target.send(msg);     });   }    set(key, value) {     return new Promise((resolve) => {       this.__sharedMemory__.set(key, value);       resolve('OK');     });   }    get(key) {     return new Promise((resolve) => {       resolve(this.__sharedMemory__.get(key));     });   }    remove(key) {     return new Promise((resolve) => {       this.__sharedMemory__.remove(key);       resolve('OK');     });   } }

Worker自对象创建开始就使用process.on监听来自Master进程的返回消息(毕竟不能等消息发送出去以后再监听吧,那就来不及了)。至于__getCallbacks__对象的作用一会儿再说。此时Worker对象便创建完成。

之后项目运行到某个地方的时候,如果要访问共享内存,就会调用Workersetgetremove函数,它们又会调用handle函数将消息通过process.send发送到master进程,同时,将得到返回结果时要进行的操作记录在__getCallbacks__中。当结果返回时,会被之前在process.on中的函数监听到,并从__getCallbacks__中取出对应的回调函数,并执行。

因为访问共享内存的过程中会经过IPC,所以必定是异步操作,所以需要记录回调函数,不能实现成同步的方式,不然会阻塞原本的任务。

// worker.js const cluster = require('cluster'); const { v4: uuid4 } = require('uuid');  class Worker {   constructor() {     this.__getCallbacks__ = {};      process.on('message', (data) => {       const callback = this.__getCallbacks__[data.uuid];       if (callback && typeof callback === 'function') {         callback(data.value);       }       delete this.__getCallbacks__[data.uuid];     });   }    set(key, value) {     return new Promise((resolve) => {       this.handle('set', key, value, () => {         resolve();       });     });   }    get(key) {     return new Promise((resolve) => {       this.handle('get', key, null, (value) => {         resolve(value);       });     });   }    remove(key) {     return new Promise((resolve) => {       this.handle('remove', key, null, () => {         resolve();       });     });   }    handle(method, key, value, callback) {     const uuid = uuid4(); // 每次通信的uuid     process.send({       id: cluster.worker.id,       method,       uuid,       key,       value,     });     this.__getCallbacks__[uuid] = callback;   } }

一次共享内存访问的完整流程是:调用Workerset/get/remove函数 -> 调用Workerhandle函数,向master进程通信并将回调函数记录在__getCallbacks__ -> master进程监听到来自worker进程的消息 -> 调用Managerhandle函数 -> 调用Managerset/get/remove函数 -> 调用__sharedMemory__set/get/remove函数 -> 操作完成返回Managerset/get/remove函数 -> 操作完成返回handle函数 -> 向worker进程发送通信消息 -> worker进程监听到来自master进程的消息 -> 从__getCallbacks__中取出回调函数并执行。

互斥访问

到目前为止,我们已经实现了读写共享内存,但还没有结束,目前的共享内存是存在严重安全问题的。因为这个共享内存是可以所有进程同时访问的,然而我们并没有考虑并发访问时的时序问题。我们来看下面这个例子:

时间 进程A 进程B 共享内存中变量x的值
t0 0
t1 读取x(x=0) 0
t2 x1=x+1(x1=1) 读取x(x=0) 0
t3 将x1的值写回x x2=x+1(x2=1) 1
t4 将x2的值写回x 1

进程A和进程B的目的都是将x的值加1,理想情况下最后x的值应该是2,可是最后的结果却是1。这是因为进程B在t3时刻给x的值加1的时候,使用的是t2时刻读取出来的x的值,但此时从全局角度来看,这个值已经过期了,因为t3时刻x最新的值已经被进程A写为了1,可是进程B无法知道进程外部的变化,所以导致了t4时刻最后写回的值又覆盖掉了进程A写回的值,等于是进程A的行为被覆盖掉了。

在多线程、多进程和分布式中并发情况下的数据一致性问题是老大难问题了,这里不再展开讨论。

为了解决上述问题,我们必须实现进程间互斥访问某个对象,来避免同时操作一个对象,从而使进程可以进行原子操作,所谓原子操作就是不可被打断的一小段连续操作,为此需要引入锁的概念。由于读写均以对象为基本单位,因此锁的粒度设置为对象级别。在某一个进程(的某一任务)获取了某个对象的锁之后,其它要获取锁的进程(的任务)会被阻塞,直到锁被归还。而要进行写操作,则必须要先获取对象的锁。这样在获取到锁直到锁被释放的这段时间里,该对象在共享内存中的值不会被其它进程修改,从而导致错误。

Manager__sharedMemory__中加入locks属性,用来记录哪个对象的锁被拿走了,lockRequestQueues属性用来记录被阻塞的任务(正在等待锁的任务)。并增加getLock函数和releaseLock函数,用来申请和归还锁,以及handleLockRequest函数,用来使被阻塞的任务获得锁。在申请锁时,会先将回调函数记录到lockRequestQueues队尾(因为此时该对象的锁可能已被拿走),然后再调用handleLockRequest检查当前锁是否被拿走,若锁还在,则让队首的任务获得锁。归还锁时,先将__sharedMemory__.locks中对应的记录删掉,然后再调用handleLockRequest让队首的任务获得锁。

// manager.js const { v4: uuid4 } = require('uuid');  class Manager {   constructor() {     this.__sharedMemory__ = {       ...       locks: {},       lockRequestQueues: {},     };   }    getLock(key) {     return new Promise((resolve) => {       this.__sharedMemory__.lockRequestQueues[key] =         this.__sharedMemory__.lockRequestQueues[key] ?? [];       this.__sharedMemory__.lockRequestQueues[key].push(resolve);       this.handleLockRequest(key);     });   }    releaseLock(key, lockId) {     return new Promise((resolve) => {       if (lockId === this.__sharedMemory__.locks[key]) {         delete this.__sharedMemory__.locks[key];         this.handleLockRequest(key);       }       resolve('OK');     });   }    handleLockRequest(key) {     return new Promise((resolve) => {       if (         !this.__sharedMemory__.locks[key] &&         this.__sharedMemory__.lockRequestQueues[key]?.length > 0       ) {         const callback = this.__sharedMemory__.lockRequestQueues[key].shift();         const lockId = uuid4();         this.__sharedMemory__.locks[key] = lockId;         callback(lockId);       }       resolve();     });   }   ... }

Worker中,则是增加getLockreleaseLock两个函数,行为与getset类似,都是调用handle函数。

// worker.js class Worker {   getLock(key) {     return new Promise((resolve) => {       this.handle('getLock', key, null, (value) => {         resolve(value);       });     });   }    releaseLock(key, lockId) {     return new Promise((resolve) => {       this.handle('releaseLock', key, lockId, (value) => {         resolve(value);       });     });   }   ... }

监听对象

有时候我们需要监听某个对象值的变化,在单进程Node应用中这很容易做到,只需要重写对象的set属性就可以了,然而在多进程共享内存中,对象和监听者都不在一个进程中,这只能依赖Manager的实现。这里,我们选择了经典的观察者模式来实现监听共享内存中的对象。

浅谈Node.js多进程模型中如何实现共享内存(代码详解)

为此,我们先在__sharedMemory__中加入listeners属性,用来记录在对象值发生变化时监听者注册的回调函数。然后增加listen函数,其将监听回调函数记录到__sharedMemory__.listeners中,这个监听回调函数会将变化的值发送给对应的worker进程。最后,在setremove函数返回前调用notifyListener,将所有记录在__sharedMemory__.listeners中监听该对象的所有函数取出并调用。

// manager.js class Manager {   constructor() {     this.__sharedMemory__ = {       ...       listeners: {},     };   }    handle(data, target) {     if (data.method === 'listen') {       this.listen(data.key, (value) => {         const msg = {           isNotified: true,           id: data.id,           uuid: data.uuid,           value,         };         target.send(msg);       });     } else {       ...     }   }    notifyListener(key) {     const listeners = this.__sharedMemory__.listeners[key];     if (listeners?.length > 0) {       Promise.all(         listeners.map(           (callback) =>             new Promise((resolve) => {               callback(this.__sharedMemory__.get(key));               resolve();             })         )       );     }   }    set(key, value) {     return new Promise((resolve) => {       this.__sharedMemory__.set(key, value);       this.notifyListener(key);       resolve('OK');     });   }    remove(key) {     return new Promise((resolve) => {       this.__sharedMemory__.remove(key);       this.notifyListener(key);       resolve('OK');     });   }    listen(key, callback) {     if (typeof callback === 'function') {       this.__sharedMemory__.listeners[key] =         this.__sharedMemory__.listeners[key] ?? [];       this.__sharedMemory__.listeners[key].push(callback);     } else {       throw new Error('a listener must have a callback.');     }   }   ... }

Worker中由于监听操作与其它操作不一样,它是一次注册监听回调函数之后对象的值每次变化都会被通知,因此需要在增加一个__getListenerCallbacks__属性用来记录监听操作的回调函数,与__getCallbacks__不同,它里面的函数在收到master的回信之后不会删除。

// worker.js class Worker {   constructor() {     ...     this.__getListenerCallbacks__ = {};      process.on('message', (data) => {       if (data.isNotified) {         const callback = this.__getListenerCallbacks__[data.uuid];         if (callback && typeof callback === 'function') {           callback(data.value);         }       } else {         ...       }     });   }    handle(method, key, value, callback) {     ...     if (method === 'listen') {       this.__getListenerCallbacks__[uuid] = callback;     } else {       this.__getCallbacks__[uuid] = callback;     }   }    listen(key, callback) {     if (typeof callback === 'function') {       this.handle('listen', key, null, callback);     } else {       throw new Error('a listener must have a callback.');     }   }   ... }

LRU缓存

有时候我们需要用用内存作为缓存,但多进程中各进程的内存空间独立,不能共享,因此也需要用到共享内存。但是如果用共享内存中的一个对象作为缓存的话,由于每次IPC都需要传输整个缓存对象,会导致缓存对象不能太大(否则序列化和反序列化耗时太长),而且由于写缓存对象的操作需要加锁,进一步影响了性能,而原本我们使用缓存就是为了加快访问速度。其实在使用缓存的时候通常不会做复杂操作,大多数时候也不需要保障一致性,因此我们可以在Manager再增加一个共享内存__sharedLRUMemory__,其为一个lru-cache实例,并增加getLRUsetLRUremoveLRU函数,与setgetremove函数类似。

// manager.js const LRU = require('lru-cache');  class Manager {   constructor() {     ...     this.defaultLRUOptions = { max: 10000, maxAge: 1000 * 60 * 5 };     this.__sharedLRUMemory__ = new LRU(this.defaultLRUOptions);   }    getLRU(key) {     return new Promise((resolve) => {       resolve(this.__sharedLRUMemory__.get(key));     });   }    setLRU(key, value) {     return new Promise((resolve) => {       this.__sharedLRUMemory__.set(key, value);       resolve('OK');     });   }    removeLRU(key) {     return new Promise((resolve) => {       this.__sharedLRUMemory__.del(key);       resolve('OK');     });   }   ... }

Worker中也增加getLRUsetLRUremoveLRU函数。

// worker.js class Worker {   getLRU(key) {     return new Promise((resolve) => {       this.handle('getLRU', key, null, (value) => {         resolve(value);       });     });   }    setLRU(key, value) {     return new Promise((resolve) => {       this.handle('setLRU', key, value, () => {         resolve();       });     });   }    removeLRU(key) {     return new Promise((resolve) => {       this.handle('removeLRU', key, null, () => {         resolve();       });     });   }   ... }

共享内存的使用方式

目前共享内存的实现已发到npm仓库(文档和源代码在Github仓库,欢迎pull request和报bug),可以直接通过npm安装:

npm i cluster-shared-memory

下面的示例包含了基本使用方法:

const cluster = require('cluster'); // 引入模块时会根据当前进程 master 进程还是 worker 进程自动创建对应的 SharedMemory 对象 require('cluster-shared-memory');  if (cluster.isMaster) {   // 在 master 进程中 fork 子进程   for (let i = 0; i < 2; i++) {     cluster.fork();   } } else {   const sharedMemoryController = require('./src/shared-memory');   const obj = {     name: 'Tom',     age: 10,   };      // 写对象   await sharedMemoryController.set('myObj', obj);      // 读对象   const myObj = await sharedMemoryController.get('myObj');      // 互斥访问对象,首先获得对象的锁   const lockId = await sharedMemoryController.getLock('myObj');   const newObj = await sharedMemoryController.get('myObj');   newObj.age = newObj.age + 1;   await sharedMemoryController.set('myObj', newObj);   // 操作完之后释放锁   await sharedMemoryController.releaseLock('requestTimes', lockId);      // 或者使用 mutex 函数自动获取和释放锁   await sharedMemoryController.mutex('myObj', async () => {     const newObjM = await sharedMemoryController.get('myObj');     newObjM.age = newObjM.age + 1;     await sharedMemoryController.set('myObj', newObjM);   });      // 监听对象   sharedMemoryController.listen('myObj', (value) => {     console.log(`myObj: ${value}`);   });      //写LRU缓存   await sharedMemoryController.setLRU('cacheItem', {user: 'Tom'});      // 读对象   const cacheItem = await sharedMemoryController.getLRU('cacheItem'); }

缺点

这种实现目前尚有几个缺点:

  • 不能使用PM2的自动创建worker进程的功能。

由于PM2会使用自己的cluster模块的master进程的实现,而我们的共享内存模块需要在master进程维护一个内存空间,则不能使用PM2的实现,因此不能使用PM2的自动创建worker进程的功能。

  • 传输的对象必须可序列化,且不能太大。

  • 如果使用者在获取锁之后忘记释放,会导致其它进程一直被阻塞,这要求程序员有良好的代码习惯。

原文地址:https://juejin.cn/post/6992091006220894215

作者:FinalZJY

赞(0)
分享到: 更多 (0)
网站地图   沪ICP备18035694号-2    沪公网安备31011702889846号