Node.js 中的流(Stream)是出了名的难用甚至是难以理解。【视频教程推荐:nodejs视频教程 】
用 Dominic Tarr 的话来说:“流是 Node 中最好的,也是最容易被误解的想法。”即使是 Redux 的创建者和 React.js 的核心团队成员 Dan Abramov 也害怕 Node 流。
本文将帮助你了解流以及如何使用。不要害怕,你完全可以把它搞清楚!
什么是流(Stream)?
流(Stream)是为 Node.js 应用提供动力的基本概念之一。它们是数据处理方法,用于将输入的数据顺序读取或把数据写入输出。
流是一种以有效方式处理读写文件、网络通信或任何类型的端到端信息交换的方式。
流的处理方式非常独特,流不是像传统方式那样将文件一次全部读取到存储器中,而是逐段读取数据块并处理数据的内容,不将其全部保留在内存中。
这种方式使流在处理大量数据时非常强大,例如,文件的大小可能大于可用的内存空间,从而无法将整个文件读入内存进行处理。那是流的用武之地!
既能用流来处理较小的数据块,也可以读取较大的文件。
以 YouTube 或 Netflix 之类的“流媒体”服务为例:这些服务不会让你你立即下载视频和音频文件。取而代之的是,你的浏览器以连续的块流形式接收视频,从而使接收者几乎可以立即开始观看和收听。
但是,流不仅涉及处理媒体和大数据。它们还在代码中赋予了我们“可组合性”的力量。考虑可组合性的设计意味着能够以某种方式组合多个组件以产生相同类型的结果。在 Node.js 中,可以通过流在其他较小的代码段中传递数据,从而组成功能强大的代码段。
为什么使用流?
与其他数据处理方法相比,流基本上具有两个主要优点:
- 内存效率:你无需事先把大量数据加载到内存中即可进行处理
- 时间效率:得到数据后立即开始处所需的时间大大减少,不必等到整个有效数据全部发送完毕才开始处理
Node.js 中有 4 种流:
- 可写流:可以向其中写入数据的流。例如,
fs.createWriteStream()
使我们可以使用流将数据写入文件。 - 可读流:可从中读取数据的流。例如:
fs.createReadStream()
让我们读取文件的内容。 - 双工流(可读写的流):可读和可写的流。例如,
net.Socket
- Transform:可在写入和读取时修改或转换数据。例如在文件压缩的情况下,你可以在文件中写入压缩数据,也可以从文件中读取解压缩的数据。
如果你已经使用过 Node.js,则可能遇到过流。例如在基于 Node.js 的 HTTP 服务器中,request
是可读流,而 response
是可写流。你可能用过 fs
模块,该模块可让你用可读和可写文件流。每当使用 Express 时,你都在使用流与客户端进行交互,而且由于 TCP 套接字、TLS栈和其他连接都基于 Node.js,所以在每个可以使用的数据库连接驱动的程序中使用流。
实例
如何创建可读流?
首先需要可读性流,然后将其初始化。
const Stream = require('stream') const readableStream = new Stream.Readable()
现在,流已初始化,可以向其发送数据了:
readableStream.push('ping!') readableStream.push('pong!')
异步迭代器
强烈建议在使用流时配合异步迭代器(async iterator)。根据 Axel Rauschmayer 博士的说法,异步迭代是一种用于异步检索数据容器内容的协议(这意味着当前“任务”可以在检索项目之前被暂停)。另外必须提及的是,流异步迭代器实现使用内部的 readable
事件。
从可读流中读取时,可以使用异步迭代器:
import * as fs from 'fs'; async function logChunks(readable) { for await (const chunk of readable) { console.log(chunk); } } const readable = fs.createReadStream( 'tmp/test.txt', {encoding: 'utf8'}); logChunks(readable); // Output: // 'This is a test!n'
也可以用字符串收集可读流的内容:
import {Readable} from 'stream'; async function readableToString2(readable) { let result = ''; for await (const chunk of readable) { result += chunk; } return result; } const readable = Readable.from('Good morning!', {encoding: 'utf8'}); assert.equal(await readableToString2(readable), 'Good morning!');
注意,在这种情况下必须使用异步函数,因为我们想返回 Promise。
请切记不要将异步功能与 EventEmitter
混合使用,因为当前在事件处理程序中发出拒绝时,无法捕获拒绝,从而导致难以跟踪错误和内存泄漏。目前的最佳实践是始终将异步函数的内容包装在 try/catch 块中并处理错误,但这很容易出错。 这个 pull request 旨在解决一旦其落在 Node 核心上产生的问题。
要了解有关异步迭代的 Node.js 流的