本篇文章带大家解读一下Node.js流源码,深入了解下Node可读流,看看其基本原理、使用方法与工作机制,希望对大家有所帮助!
1. 基本概念
1.1. 流的历史演变
流不是 Nodejs 特有的概念。 它们是几十年前在 Unix 操作系统中引入的,程序可以通过管道运算符(|)对流进行相互交互。
在基于Unix系统的MacOS以及Linux中都可以使用管道运算符(|),他可以将运算符左侧进程的输出转换成右侧的输入。
在Node中,我们使用传统的readFile去读取文件的话,会将文件从头到尾都读到内存中,当所有内容都被读取完毕之后才会对加载到内存中的文件内容进行统一处理。
这样做会有两个缺点:
-
内存方面:占用大量内存
-
时间方面:需要等待数据的整个有效负载都加载完才会开始处理数据
为了解决上述问题,Node.js效仿并实现了流的概念,在Node.js流中,一共有四种类型的流,他们都是Node.js中EventEmitter的实例:
-
可读流(Readable Stream)
-
可写流(Writable Stream)
-
可读可写全双工流(Duplex Stream)
-
转换流(Transform Stream)
为了深入学习这部分的内容,循序渐进的理解Node.js中流的概念,并且由于源码部分较为复杂,本人决定先从可读流开始学习这部分内容。
1.2. 什么是流(Stream)
流是一种抽象的数据结构,是数据的集合,其中存储的数据类型只能为以下类型(仅针对objectMode === false的情况):
- string
- Buffer
我们可以把流看作这些数据的集合,就像液体一样,我们先把这些液体保存在一个容器里(流的内部缓冲区BufferList),等到相应的事件触发的时候,我们再把里面的液体倒进管道里,并通知其他人在管道的另一侧拿自己的容器来接里面的液体进行处理。
1.3. 什么是可读流(Readable Stream)
可读流是流的一种类型,他有两种模式三种状态
两种读取模式:
-
流动模式:数据会从底层系统读取,并通过EventEmitter尽快的将数据传递给所注册的事件处理程序中
-
暂停模式:在这种模式下将不会读取数据,必须显示的调用Stream.read()方法来从流中读取数据
三种状态:
-
readableFlowing === null:不会产生数据,调用Stream.pipe()、Stream.resume会使其状态变为true,开始产生数据并主动触发事件
-
readableFlowing === false:此时会暂停数据的流动,但不会暂停数据的生成,因此会产生数据积压
-
readableFlowing === true:正常产生和消耗数据
2. 基本原理
2.1. 内部状态定义(ReadableState)
ReadableState
_readableState: ReadableState { objectMode: false, // 操作除了string、Buffer、null之外的其他类型的数据需要把这个模式打开 highWaterMark: 16384, // 水位限制,1024 * 16,默认16kb,超过这个限制则会停止调用_read()读数据到buffer中 buffer: BufferList { head: null, tail: null, length: 0 }, // Buffer链表,用于保存数据 length: 0, // 整个可读流数据的大小,如果是objectMode则与buffer.length相等 pipes: [], // 保存监听了该可读流的所有管道队列 flowing: null, // 可独流的状态 null、false、true ended: false, // 所有数据消费完毕 endEmitted: false, // 结束事件收否已发送 reading: false, // 是否正在读取数据 constructed: true, // 流在构造好之前或者失败之前,不能被销毁 sync: true, // 是否同步触发'readable'/'data'事件,或是等到下一个tick needReadable: false, // 是否需要发送readable事件 emittedReadable: false, // readable事件发送完毕 readableListening: false, // 是否有readable监听事件 resumeScheduled: false, // 是否调用过resume方法 errorEmitted: false, // 错误事件已发送 emitClose: true, // 流销毁时,是否发送close事件 autoDestroy: true, // 自动销毁,在'end'事件触发后被调用 destroyed: false, // 流是否已经被销毁 errored: null, // 标识流是否报错 closed: false, // 流是否已经关闭 closeEmitted: false, // close事件是否已发送 defaultEncoding: 'utf8', // 默认字符编码格式 awaitDrainWriters: null, // 指向监听了'drain'事件的writer引用,类型为null、Writable、Set<Writable> multiAwaitDrain: false, // 是否有多个writer等待drain事件 readingMore: false, // 是否可以读取