# How to Implement Streams in Node.js
## Table of Contents
1. [Stream Basics](#1-stream-basics)
   - 1.1 [What Is a Stream](#11-what-is-a-stream)
   - 1.2 [Why Streams Are Needed](#12-why-streams-are-needed)
   - 1.3 [Streams vs. Traditional I/O](#13-streams-vs-traditional-io)
2. [Stream Types in Node.js](#2-stream-types-in-nodejs)
   - 2.1 [Readable Streams](#21-readable-streams)
   - 2.2 [Writable Streams](#22-writable-streams)
   - 2.3 [Duplex Streams](#23-duplex-streams)
   - 2.4 [Transform Streams](#24-transform-streams)
3. [Implementing Custom Streams](#3-implementing-custom-streams)
   - 3.1 [Implementing a Readable Stream](#31-implementing-a-readable-stream)
   - 3.2 [Implementing a Writable Stream](#32-implementing-a-writable-stream)
   - 3.3 [Implementing a Duplex Stream](#33-implementing-a-duplex-stream)
   - 3.4 [Implementing a Transform Stream](#34-implementing-a-transform-stream)
4. [Advanced Stream Usage](#4-advanced-stream-usage)
   - 4.1 [Pipelines](#41-pipelines)
   - 4.2 [Error Handling](#42-error-handling)
   - 4.3 [Performance Optimization](#43-performance-optimization)
5. [Practical Examples](#5-practical-examples)
   - 5.1 [Processing Large Files](#51-processing-large-files)
   - 5.2 [HTTP Streaming](#52-http-streaming)
   - 5.3 [Real-Time Data Processing](#53-real-time-data-processing)
6. [Common Problems and Solutions](#6-common-problems-and-solutions)
7. [Summary](#7-summary)
---
## 1. Stream Basics
### 1.1 What Is a Stream
A stream is Node.js's abstract interface for working with streaming data. Streams are collections of data, much like arrays or strings, with one key difference: instead of loading everything into memory at once, a stream lets you process the data chunk by chunk.
### 1.2 Why Streams Are Needed
The difference is easiest to see by contrasting a whole-file read with a streamed read:
```javascript
const fs = require('fs');

// Traditional approach: read the file in one shot
fs.readFile('largefile.txt', (err, data) => {
  // The entire file contents sit in memory
});

// Streaming approach
const readStream = fs.createReadStream('largefile.txt');
readStream.on('data', (chunk) => {
  // Only one small chunk is processed at a time
});
```
### 1.3 Streams vs. Traditional I/O

| Aspect | Traditional I/O | Stream |
| --- | --- | --- |
| Memory usage | High | Low |
| Processing speed | Slow (waits for all data) | Fast (starts immediately) |
| Best suited for | Small files | Large files / real-time data |
| Composability | Limited | High (pipes) |
## 2. Stream Types in Node.js
### 2.1 Readable Streams
A readable stream is a source of data, for example:
- file read streams
- HTTP requests
- standard input (stdin)
```javascript
const { Readable } = require('stream');

class MyReadable extends Readable {
  constructor(options) {
    super(options);
    this.data = ['a', 'b', 'c'];
    this.index = 0;
  }

  _read() {
    if (this.index < this.data.length) {
      this.push(this.data[this.index++]);
    } else {
      this.push(null); // signal the end of the stream
    }
  }
}

const readable = new MyReadable();
readable.on('data', (chunk) => {
  console.log(chunk.toString()); // a, b, c
});
```
### 2.2 Writable Streams
A writable stream is a destination for data, for example:
- file write streams
- HTTP responses
- standard output (stdout)
```javascript
const { Writable } = require('stream');

class MyWritable extends Writable {
  _write(chunk, encoding, callback) {
    console.log(`Writing: ${chunk.toString()}`);
    callback(); // signal that this write has completed
  }
}

const writable = new MyWritable();
writable.write('Hello');
writable.end('World');
```
### 2.3 Duplex Streams
A duplex stream is both readable and writable, for example:
- TCP sockets
- zlib streams
```javascript
const { Duplex } = require('stream');

class MyDuplex extends Duplex {
  _write(chunk, encoding, callback) {
    // Pass written data straight through to the readable side
    this.push(chunk);
    callback();
  }

  _final(callback) {
    this.push(null); // end the readable side once writing finishes
    callback();
  }

  _read(size) {
    // Nothing to pull here: data is pushed from _write as it arrives
  }
}

const duplex = new MyDuplex();
duplex.on('data', (chunk) => {
  console.log(`Received: ${chunk}`);
});
duplex.write('Hello');
```
### 2.4 Transform Streams
A transform stream is a special kind of duplex stream used to modify data as it passes through, for example:
- zlib compression/decompression
- crypto encryption/decryption
```javascript
const { Transform } = require('stream');

class UppercaseTransform extends Transform {
  _transform(chunk, encoding, callback) {
    this.push(chunk.toString().toUpperCase());
    callback();
  }
}

const transform = new UppercaseTransform();
transform.on('data', (chunk) => {
  console.log(chunk.toString()); // HELLO
});
transform.write('hello');
transform.end();
```
## 3. Implementing Custom Streams
### 3.1 Implementing a Readable Stream
```javascript
const { Readable } = require('stream');

class CounterStream extends Readable {
  constructor(limit, options) {
    super(options);
    this.limit = limit;
    this.count = 1;
  }

  _read() {
    if (this.count <= this.limit) {
      this.push(this.count.toString());
      this.count++;
    } else {
      this.push(null); // signal the end of the stream
    }
  }
}

const counter = new CounterStream(5);
counter.pipe(process.stdout); // prints: 12345
```
### 3.2 Implementing a Writable Stream
```javascript
const fs = require('fs');
const { Writable } = require('stream');

class FileWriter extends Writable {
  constructor(filename, options) {
    super(options);
    this.filename = filename;
  }

  _write(chunk, encoding, callback) {
    // Passing callback straight to appendFile reports success or failure
    fs.appendFile(this.filename, chunk, callback);
  }

  _final(callback) {
    console.log(`All data written to ${this.filename}`);
    callback();
  }
}

const writer = new FileWriter('output.txt');
writer.write('Hello ');
writer.end('World');
```
### 3.3 Implementing a Duplex Stream
```javascript
const { Duplex } = require('stream');

class EchoDuplex extends Duplex {
  constructor(options) {
    super(options);
    this.buffer = [];
    this.ended = false;
  }

  _write(chunk, encoding, callback) {
    this.buffer.push(chunk);
    this._drainBuffer();
    callback();
  }

  _final(callback) {
    this.ended = true;
    this._drainBuffer();
    callback();
  }

  _read(size) {
    this._drainBuffer();
  }

  _drainBuffer() {
    while (this.buffer.length) {
      if (!this.push(this.buffer.shift())) {
        return; // respect backpressure; _read will call us again
      }
    }
    // Only end the readable side once writing is finished
    if (this.ended) {
      this.push(null);
    }
  }
}

const echo = new EchoDuplex();
echo.pipe(process.stdout);
echo.write('Hello');
echo.end(' World');
```
### 3.4 Implementing a Transform Stream
```javascript
const { Transform } = require('stream');

class JSONParseTransform extends Transform {
  constructor(options) {
    // readableObjectMode lets us push parsed objects instead of buffers
    super({ ...options, readableObjectMode: true });
    this.buffer = '';
  }

  _transform(chunk, encoding, callback) {
    this.buffer += chunk;
    let boundary;
    try {
      // Each newline-terminated line is a complete JSON document
      while ((boundary = this.buffer.indexOf('\n')) !== -1) {
        const line = this.buffer.substring(0, boundary);
        this.buffer = this.buffer.substring(boundary + 1);
        if (line.trim()) {
          this.push(JSON.parse(line));
        }
      }
    } catch (err) {
      return callback(err);
    }
    callback();
  }

  _flush(callback) {
    // Parse whatever remains once the input ends
    if (this.buffer.trim()) {
      try {
        this.push(JSON.parse(this.buffer));
      } catch (err) {
        return callback(err);
      }
    }
    callback();
  }
}

const parser = new JSONParseTransform();
parser.on('data', (obj) => {
  console.log('Parsed:', obj);
});
parser.write('{"name":"Alice"}\n');
parser.write('{"age":30}\n');
parser.end();
```
## 4. Advanced Stream Usage
### 4.1 Pipelines
```javascript
const { pipeline } = require('stream');
const fs = require('fs');
const zlib = require('zlib');

// Chaining with .pipe()
fs.createReadStream('input.txt')
  .pipe(zlib.createGzip())
  .pipe(fs.createWriteStream('output.txt.gz'))
  .on('finish', () => console.log('Done'));

// Using pipeline (better error handling and cleanup)
pipeline(
  fs.createReadStream('input.txt'),
  zlib.createGzip(),
  fs.createWriteStream('output.txt.gz'),
  (err) => {
    if (err) {
      console.error('Pipeline failed:', err);
    } else {
      console.log('Pipeline succeeded');
    }
  }
);
```
### 4.2 Error Handling
```javascript
const { Readable, pipeline } = require('stream');

class ErrorReadable extends Readable {
  _read() {
    process.nextTick(() => {
      this.emit('error', new Error('Something went wrong'));
    });
  }
}

const stream = new ErrorReadable();

// Option 1: listen for the 'error' event
stream.on('error', (err) => {
  console.error('Error:', err.message);
});

// Option 2: let pipeline handle errors and cleanup for you
pipeline(
  stream,
  process.stdout,
  (err) => {
    if (err) {
      console.error('Error in pipeline:', err);
    }
  }
);
```
### 4.3 Performance Optimization
Respecting backpressure keeps a fast producer from overwhelming a slow consumer: stop calling write() when it returns false and resume on the 'drain' event.
```javascript
const { Writable } = require('stream');

class SlowWriter extends Writable {
  _write(chunk, encoding, callback) {
    console.log('Processing chunk...');
    setTimeout(callback, 1000); // simulate slow processing
  }
}

const writer = new SlowWriter();
let i = 0;

function writeData() {
  let ok = true;
  do {
    i++;
    if (i === 10) {
      writer.end('Last chunk');
    } else {
      ok = writer.write(`Chunk ${i}`);
    }
  } while (i < 10 && ok);
  if (i < 10) {
    // Buffer is full: wait for 'drain' before writing more
    writer.once('drain', writeData);
  }
}

writeData();
```
The highWaterMark option controls how much data the internal buffer holds before backpressure kicks in:
```javascript
const { Readable } = require('stream');

// Set the high-water mark to 10 bytes
const readable = new Readable({
  read() {},
  highWaterMark: 10
});

console.log(readable.readableHighWaterMark); // 10
```
## 5. Practical Examples
### 5.1 Processing Large Files
```javascript
const fs = require('fs');
const crypto = require('crypto');

// Hash a file of any size without loading it all into memory
function calculateFileHash(filePath) {
  return new Promise((resolve, reject) => {
    const hash = crypto.createHash('sha256');
    const stream = fs.createReadStream(filePath);
    stream.on('data', (chunk) => {
      hash.update(chunk);
    });
    stream.on('end', () => {
      resolve(hash.digest('hex'));
    });
    stream.on('error', reject);
  });
}

// Usage
calculateFileHash('largefile.iso')
  .then(hash => console.log('File hash:', hash))
  .catch(err => console.error('Error:', err));
```
### 5.2 HTTP Streaming
```javascript
const http = require('http');
const fs = require('fs');

const server = http.createServer((req, res) => {
  // Streaming response
  if (req.url === '/video') {
    const videoStream = fs.createReadStream('video.mp4');
    res.writeHead(200, {
      'Content-Type': 'video/mp4',
      'Content-Length': fs.statSync('video.mp4').size
    });
    videoStream.pipe(res);
  }
  // Streaming upload
  else if (req.url === '/upload' && req.method === 'POST') {
    const fileStream = fs.createWriteStream('uploaded.dat');
    req.pipe(fileStream);
    fileStream.on('finish', () => {
      res.end('File uploaded');
    });
  }
});

server.listen(3000);
```
### 5.3 Real-Time Data Processing
```javascript
const { Transform } = require('stream');
const WebSocket = require('ws');

// Create a WebSocket server
const wss = new WebSocket.Server({ port: 8080 });

// Custom transform stream that analyzes sensor data
class SensorAnalyzer extends Transform {
  constructor() {
    super({ objectMode: true });
  }

  _transform(chunk, encoding, callback) {
    try {
      const data = JSON.parse(chunk);
      // Attach analysis results
      data.timestamp = new Date().toISOString();
      data.alert = data.temperature > 30 ? 'HIGH' : 'NORMAL';
      this.push(JSON.stringify(data));
      callback();
    } catch (err) {
      callback(err);
    }
  }
}

wss.on('connection', (ws) => {
  const analyzer = new SensorAnalyzer();
  // Client message -> analysis -> response
  ws.on('message', (message) => {
    analyzer.write(message);
  });
  analyzer.on('data', (result) => {
    ws.send(result);
  });
  analyzer.on('error', (err) => {
    console.error('Analysis error:', err);
    ws.send(JSON.stringify({ error: err.message }));
  });
});
```
## 6. Common Problems and Solutions

| Problem | Solution |
| --- | --- |
| Memory leak | Clean up with destroy() or end() when a stream is no longer needed |
| Data loss | Respect write()'s return value and wait for the 'drain' event |
| Performance bottleneck | Tune highWaterMark for the workload |
| Uncaught errors | Attach an 'error' listener to every stream |
| Premature stream end | Use finished or pipeline to detect completion reliably |
## 7. Summary
Streams are Node.js's efficient tool for I/O, and they are especially well suited to large files and real-time data. By understanding the four basic stream types (readable, writable, duplex, transform) and how to implement them, developers can build high-performance, memory-efficient applications. The key takeaways are the four stream types, respecting backpressure, and using pipeline for robust error handling.
With the examples and explanations above, readers should be able to grasp the core concepts of Node.js streams and apply them flexibly in real projects.