溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

Node.js 中如何使用異步迭代器

發布時間:2021-07-20 16:34:32 來源:億速云 閱讀:118 作者:Leah 欄目:web開發

這期內容當中小編將會給大家帶來有關Node.js 中如何使用異步迭代器,文章內容豐富且以專業的角度為大家分析和敘述,閱讀完這篇文章希望大家可以有所收獲。

在 Events 中使用 asyncIterator

Node.js v12.16.0 中新增了 events.on(emitter, eventName) 方法,返回一個迭代 eventName  事件的異步迭代器。

events.on() 示例 1

如下例所示, for await...of 循環只會輸出 Hello 當觸發 error 事件時會被 try catch 所捕獲。

const { on, EventEmitter } = require('events');  (async () => {   const ee = new EventEmitter();   const ite = on(ee, 'foo');    process.nextTick(() => {     ee.emit('foo', 'Hello');     ee.emit('error', new Error('unknown mistake.'))     ee.emit('foo', 'Node.js');   });    try {     for await (const event of ite) {       console.log(event); // prints ['Hello']     }   } catch (err) {     console.log(err.message); // unknown mistake.   } })();

上述示例,如果 EventEmitter 對象實例 ee 觸發了 error  事件,錯誤信息會被拋出并且退出循環,該實例注冊的所有事件偵聽器也會一并移除。

events.on() 示例 2

for await...of  內部塊的執行是同步的,每次只能處理一個事件,即使你接下來還有會立即執行的事件,也是如此。如果是需要并發執行的則不建議使用,這個原因會在下面解析  events.on() 源碼時給出答案。

如下所示,雖然事件是按順序同時觸發了兩次,但是在內部塊模擬了 2s 的延遲,下一次事件的處理也會得到延遲。

const ite = on(ee, 'foo');  process.nextTick(() => {   ee.emit('foo', 'Hello');   ee.emit('foo', 'Node.js');   // ite.return(); // 調用后可以結束 for await...of 的遍歷   // ite.throw() // 迭代器對象拋出一個錯誤 });  try {   for await (const event of ite) {     console.log(event); // prints ['Hello'] ['Node.js']     await sleep(2000);   } } catch (err) {   console.log(err.message); }  // Unreachable here console.log('這里將不會被執行');

上例中最后一句代碼是不會執行的,此時的迭代器會一直處于遍歷中,雖然上面兩個事件 emit  都觸發了,但是迭代器并沒有終止,什么時候終止呢?也就是當內部出現一些錯誤或我們手動調用可迭代對象的 return() 或 throw()  方法時迭代器才會終止。

events.on() 開啟一個 Node.js 服務器

之前一篇文章《“Hello Node.js” 這一次是你沒見過的寫法》寫過一段使用 events.on() 開啟一個 HTTP  服務器的代碼,在留言中當時有小伙伴對此提出疑惑,基于本章對異步迭代器在 events.on() 中使用的學習,可以很好的解釋。

相關代碼如下所示:

import { createServer as server } from 'http'; import { on } from 'events'; const ee = on(server().listen(3000), 'request'); for await (const [{ url }, res] of ee)   if (url === '/hello')     res.end('Hello Node.js!');   else     res.end('OK!');

以上代碼看似新穎,其核心實現就是使用 events.on() 返回 createServer() 對象 request 事件的異步可迭代對象,之后用  for await...of 語句遍歷,客戶端每一次請求,就相當于做了一次 ee.emit('request', Req, Res)。

由于內部塊的執行是同步的,下一次事件處理需要依賴上次事件完成才可以執行,對于一個 HTTP 服務器需要考慮并發的,請不要使用上面這種方式!

解析 Node.js 源碼對 events.on 異步迭代器的實現

events 模塊直接導出了 on() 方法,這個 on() 方法主要是將異步迭代器與事件的 EventEmitter  類的實例對象做了結合,實現還是很巧妙的,以下對核心源碼做下解釋,理解之后你完全也可以自己實現一個 events.on()。

  • 行 {1} ObjectSetPrototypeOf 是為對象設置一個新的原型,這個對象包含了 next()、return()、throw()  三個方法。

  • 行 {2} 根據異步可迭代協議,可迭代對象必須要包含一個 Symbol.asyncIterator  屬性,該屬性是一個無參數的函數,返回可迭代對象本身,也就是下面代碼中 SymbolAsyncIterator。

  • 行 {3} 新的原型就是 ObjectSetPrototypeOf 的第二個參數 AsyncIteratorPrototype。

  • 行 {4} eventTargetAgnosticAddListener 是對事件注冊監聽器,里面還是用的事件觸發器對象的 on() 方法  emitter.on(name, listener) 。

  • 行 {5} addErrorHandlerIfEventEmitter 判斷事件名如果不等于 'error' 同時注冊一個 error  事件的監聽器,具體實現同行 {4}。

  • 行 {6} eventHandler() 函數就是上面注冊的監聽器函數 listener  當有事件觸發時執行該監聽器函數,與異步迭代器的結合就在這里,當有新事件觸發時會從 unconsumedPromises  數組里取出第一個元素執行,如果理解異步迭代器實現標準你會發現 PromiseResolve(createIterResult(args, false))  就是異步迭代器對象 next() 方法返回值的標準定義。

下面繼續看 unconsumedPromises 從何而來。

module.exports = EventEmitter; module.exports.on = on;  function on(emitter, event) {   const unconsumedEvents = [];   const unconsumedPromises = [];   const iterator = ObjectSetPrototypeOf({ // {1}     next() { .... },     return() { ... },     throw(err) { ... },     [SymbolAsyncIterator]() { // {2}       return this;     }   }, AsyncIteratorPrototype); // {3}   eventTargetAgnosticAddListener(emitter, event, eventHandler); // {4}   if (event !== 'error') {     addErrorHandlerIfEventEmitter(emitter, errorHandler); // {5}   }   return iterator;                  function eventHandler(...args) { // {6}     const promise =  .shift();     if (promise) {       // 以下等價于 promise.resolve({ value: args, done: false });       PromiseResolve(createIterResult(args, false));     } else {       // for await...of 遍歷器內部塊的執行是同步的,所以每次只能處理 1 個事件,如果同時觸發多個事件,上次事件未完成剩下的事件會被保存至 unconsumedEvents 中,待上次事件完成后,遍歷器會自動調用 iterator 對象的 next() 方法,消費所有未處理的事件。       unconsumedEvents.push(args);     }   } }  function eventTargetAgnosticAddListener(emitter, name, listener, flags) {   ...   emitter.on(name, listener); }

以下是 iterator 對象的 next() 方法實現:

  • 行 {1} 首先消費未讀消息

  • 行 {2} 判斷如果是發生錯誤則拋出錯誤信息,例如 iterator 對象的 throw() 方法被調用后就會對 error 做賦值待下次遍歷器調用  next() 此處代碼就會被執行。

  • 行 {3} 如果迭代器對象完成,返回的 Promise 對象 done 屬性設置為 true,遍歷器也就結束了,變量 finished 是由  iterator 對象的 return() 方法被調用之后設置的。

  • 行 {4} 這個是上面提到的 unconsumedPromises 數據來源處,例如當我們執行 for await...of  語句遍歷異步迭代器對象時就會自動觸發 iterator 對象的 next() 方法,執行到行 {4} 處會創建一個 Promise 對象但是 resolve  并沒有被立即執行,而是先存放在 unconsumedPromises 數組中,所以在上面 #events.on() 示例 2# 提到一個問題,for  await...of 遍歷事件的異步迭代器對象時后面的代碼塊并不會被執行, 當我們觸發一個事件時才會在監聽器函數里執行這個 resolve  函數,此時才會被釋放,之后 for await...of 遍歷器會自動再次執行 next() 方法,然后 new 一個新的 Promise  反復循環,直到事件對象拋出 error 事件或執行 iterator 對象的 return() 方法。

const iterator = ObjectSetPrototypeOf({   next() {     // {1} 首先,我們會消費所有未讀消息     const value = unconsumedEvents.shift();     if (value) {       return PromiseResolve(createIterResult(value, false));     }      // {2} 如果發生一次 error 就會執行 Promise.reject 拋出一個錯誤,在這個錯誤發生后也會停止事件監聽。     if (error) {       const p = PromiseReject(error);       // Only the first element errors       error = null;       return p;     }      // {3} 如果迭代器對象完成,Promise.resolve done 設置為 true     if (finished) {       return PromiseResolve(createIterResult(undefined, true));     }      // {4} 等待直到一個事件發生     return new Promise(function(resolve, reject) {       unconsumedPromises.push({ resolve, reject });     });   }   ... }

在 Stream 中使用 asyncIterator

Node.js Stream 模塊的可讀流對象在 v10.0.0 版本試驗性的支持了 [Symbol.asyncIterator] 屬性,可以使用 for  await...of 語句遍歷可讀流對象,在 v11.14.0 版本以上已 LTS 支持。

異步迭代器 與 Readable

借助 fs 模塊創建一個可讀流對象 readable。

const fs = require('fs'); const readable = fs.createReadStream('./hello.txt', {   encoding: 'utf-8',   highWaterMark: 1 });

以往當我們讀取一個文件時,需要監聽 data 事件,拼接數據,在 end 事件里判斷完成,如下所示:

function readText(readable) {   let data = '';   return new Promise((resolve, reject) => {     readable.on('data', chunk => {       data += chunk;     })     readable.on('end', () => {       resolve(data);     });     readable.on('error', err => {       reject(err);     });   }) }

現在通過異步迭代器能以一種更簡單的方式實現,如下所示:

async function readText(readable) {   let data = '';   for await (const chunk of readable) {     data += chunk;   }   return data; }

現在我們可以調用 readText 做測試。

(async () => {   try {     const res = await readText(readable);     console.log(res); // Hello Node.js   } catch (err) {     console.log(err.message);   } })();

使用 for await...of 語句遍歷 readable,如果循環中因為 break 或 throw 一個錯誤而終止,則這個 Stream  也將被銷毀。

上述示例中 chunk 每次接收的值是根據創建可讀流時 highWaterMark 這個屬性決定的,為了能清晰的看到效果,在創建 readable  對象時我們指定了 highWaterMark 屬性為 1 每次只會讀取一個字符。

從 Node.js 源碼看 readable 是如何實現的 asyncIterator

與同步的迭代器遍歷語句 for...of 類似,用于 asyncIterator 異步迭代器遍歷的 for await...of  語句在循環內部會默認調用可迭代對象 readable 的 Symbol.asyncIterator() 方法得到一個異步迭代器對象,之后調用迭代器對象的  next() 方法獲取結果。

本文以 Node.js 源碼 v14.x 為例來看看源碼是如何實現的。當我們調用 fs.createReadStream()  創建一個可讀流對象時,對應的該方法內部會調用 ReadStream 構造函數

// https://github.com/nodejs/node/blob/v14.x/lib/fs.js#L2001 function createReadStream(path, options) {   lazyLoadStreams();   return new ReadStream(path, options); }

其實在 ReadStream 這個構造函數里沒有我們要找的,重點是它通過原型的方式繼承了 Stream 模塊的 Readable 構造函數。

function ReadStream(path, options) {   ...   Readable.call(this, options); }

那么現在我們重點來看看 Readable 這個構造函數的實現。

Readable 原型上定義了 SymbolAsyncIterator 屬性,該方法返回了一個由生成器函數創建的迭代器對象。

// for await...of 循環會調用 Readable.prototype[SymbolAsyncIterator] = function() {   let stream = this;   ...   const iter = createAsyncIterator(stream);   iter.stream = stream;   return iter; };  // 聲明一個創建異步迭代器對象的生成器函數 async function* createAsyncIterator(stream) {   let callback = nop;    function next(resolve) {     if (this === stream) {       callback();       callback = nop;     } else {       callback = resolve;     }   }    const state = stream._readableState;    let error = state.errored;   let errorEmitted = state.errorEmitted;   let endEmitted = state.endEmitted;   let closeEmitted = state.closeEmitted;     // error、end、close 事件控制了什么時候結束迭代器遍歷。   stream     .on('readable', next)     .on('error', function(err) {       error = err;       errorEmitted = true;       next.call(this);     })     .on('end', function() {       endEmitted = true;       next.call(this);     })     .on('close', function() {       closeEmitted = true;       next.call(this);     });    try {     while (true) {       // stream.read() 從內部緩沖拉取并返回數據。如果沒有可讀的數據,則返回 null       // readable 的 destroy() 方法被調用后 readable.destroyed 為 true,readable 即為下面的 stream 對象       const chunk = stream.destroyed ? null : stream.read();       if (chunk !== null) {         yield chunk; // 這里是關鍵,根據迭代器協議定義,迭代器對象要返回一個 next() 方法,使用 yield 返回了每一次的值       } else if (errorEmitted) {         throw error;       } else if (endEmitted) {         break;       } else if (closeEmitted) {         break;       } else {         await new Promise(next);       }     }   } catch (err) {     destroyImpl.destroyer(stream, err);     throw err;   } finally {     if (state.autoDestroy || !endEmitted) {       // TODO(ronag): ERR_PREMATURE_CLOSE?       destroyImpl.destroyer(stream, null);     }   } }

通過上面源碼可以看到可讀流的異步迭代器實現使用了生成器函數 Generator yield,那么對于 readable 對象遍歷除了 for  await...of 遍歷之外,其實也是可以直接使用調用生成器函數的 next() 方法也是可以的。

const ret = readable[Symbol.asyncIterator]() console.log(await ret.next()); // { value: 'H', done: false } console.log(await ret.next()); // { value: 'e', done: false }

異步迭代器與 Writeable

通過上面講解,我們知道了如何遍歷異步迭代器從 readable  對象獲取數據,但是你有沒有想過如何將一個異步迭代器對象傳送給可寫流?正是此處要講的。

從迭代器中創建可讀流

Node.js 流對象提供了一個實用方法 stream.Readable.from(),對于符合 Symbol.asyncIterator 或  Symbol.iterator 協議的可迭代對象(Iterable)會先創建一個可讀流對象 readable 之后從迭代器中構建 Node.js  可讀流。

以下是 從理解到實現輕松掌握 ES6 中的迭代器 一文中曾講解過的例子,r1 就是我們創建的可迭代對象。使用 stream.Readable.from()  方法則可以將可迭代對象構造為一個可讀流對象 readable。

function Range(start, end) {   this.id = start;   this.end = end; } Range.prototype[Symbol.asyncIterator] = async function* () {   while (this.id <= this.end) {     yield this.id++;   } } const r1 = new Range(0, 3); const readable = stream.Readable.from(r1); readable.on('data', chunk => {   console.log(chunk); // 0 1 2 3 });

傳送異步迭代器到可寫流

使用 pipeline 可以將一系列的流和生成器函數通過管道一起傳送,并在管道完成時獲取通知。

使用 util.promisify 將 pipeline 轉化為 promise 形式。

const util = require('util'); const pipeline = util.promisify(stream.pipeline); // 轉為 promise 形式  (async () => {   try {     const readable = stream.Readable.from(r1);     const writeable = fs.createWriteStream('range.txt');     await pipeline(       readable,       async function* (source) {         for await (const chunk of source) {           yield chunk.toString();         }       },       writeable     );     console.log('Pipeline 成功');   } catch (err) {     console.log(err.message);   } })()

在寫入數據時,傳入的 chunk 需是 String、Buffer、Uint8Array 類型,否則 writeable  對象在寫入數據時會報錯。由于我們自定義的可迭代對象 r1 里最終返回的值類型為 Number  在這里需要做次轉換,管道中間的生成器函數就是將每次接收到的值轉為字符串。

MongoDB 中使用 asyncIterator

除了上面我們講解的 Node.js 官方提供的幾個模塊之外,在 MongoDB 中也是支持異步迭代的,不過介紹這點的點資料很少,MongoDB  是通過一個游標的概念來實現的。

MongoDB 中的 cursor

本處以 Node.js 驅動 mongodb 模塊來介紹,當我們調用 db.collection.find() 這個方法返回的是一個  cursor(游標),如果想要訪問文檔那么我們需要迭代這個游標對象來完成,但是通常我們會直接使用 toArray() 這個方法來完成。

下面讓我們通過一段示例來看,現在我們有一個數據庫 example,一個集合 books,表里面有兩條記錄,如下所示:

Node.js 中如何使用異步迭代器

image.png

查詢 books 集合的所有數據,以下代碼中定義的 myCursor 變量就是游標對象,它不會自動進行迭代,可以使用游標對象的 hasNext()  方法檢測是否還有下一個,如果有則可以使用 next() 方法訪問數據。

通過以下日志記錄可以看到在第三次調用 hasNext() 時返回了 false,如果此時在調用 next()  就會報錯,游標已關閉,也就是已經沒有數據可遍歷了。

const MongoClient = require('mongodb').MongoClient; const dbConnectionUrl = 'mongodb://127.0.0.1:27017/example';  (async () => {   const client = await MongoClient.connect(dbConnectionUrl, { useUnifiedTopology: true });   const bookColl = client.db('example').collection('books');   const myCursor = await bookColl.find();     console.log(await myCursor.hasNext()); // true   console.log((await myCursor.next()).name); // 深入淺出Node.js   console.log(await myCursor.hasNext()); // true   console.log((await myCursor.next()).name); // Node.js實戰   console.log(await myCursor.hasNext()); // false   console.log((await myCursor.next()).name); // MongoError: Cursor is closed })()

直接調用 next() 也可檢測,如果還有值則返回該條記錄,否則 next() 方法返回 null。

console.log((await myCursor.next()).name); console.log((await myCursor.next()).name); console.log((await myCursor.next()));

MongoDB 異步迭代器實現源碼分析

MongoDB 中游標是以 hasNext() 返回 false 或 next() 返回為 null 來判斷是否達到游標尾部,與之不同的是在我們的  JavaScript 可迭代協議定義中是要有一個 Symbol.asyncIterator 屬性的迭代器對象,且迭代器對象是 { done, value }  的形式。

幸運的是 MongoDB Node.js 驅動已經幫助我們實現了這一功能,通過一段源碼來看在 MongoDB 中的實現。

  • find 方法

find 方法返回的是一個可迭代游標對象。

// https://github.com/mongodb/node-mongodb-native/blob/3.6/lib/collection.js#L470  Collection.prototype.find = deprecateOptions(   {     name: 'collection.find',     deprecatedOptions: DEPRECATED_FIND_OPTIONS,     optionsIndex: 1   },   function(query, options, callback) {     const cursor = this.s.topology.cursor(       new FindOperation(this, this.s.namespace, findCommand, newOptions),       newOptions     );      return cursor;   } );
  • CoreCursor

核心實現就在這里,這是一個游標的核心類,MongoDB Node.js 驅動程序中所有游標都是基于此,如果當前支持異步迭代器,則在 CoreCursor  的原型上設置 Symbol.asyncIterator 屬性,返回基于 Promise 實現的異步迭代器對象,這符合 JavaScript  中關于異步可迭代對象的標準定義。

// https://github.com/mongodb/node-mongodb-native/blob/3.6/lib/core/cursor.js#L610  if (SUPPORTS.ASYNC_ITERATOR) {   CoreCursor.prototype[Symbol.asyncIterator] = require('../async/async_iterator').asyncIterator; }
// https://github.com/mongodb/node-mongodb-native/blob/3.6/lib/async/async_iterator.js#L16  // async function* asyncIterator() { //   while (true) { //     const value = await this.next(); //     if (!value) { //       await this.close(); //       return; //     }  //     yield value; //   } // }  // TODO: change this to the async generator function above function asyncIterator() {   const cursor = this;    return {     next: function() {       return Promise.resolve()         .then(() => cursor.next())         .then(value => {           if (!value) {             return cursor.close().then(() => ({ value, done: true }));           }           return { value, done: false };         });     }   }; }

目前是默認使用的 Promise 的形式實現的,上面代碼中有段 TODO, Node.js  驅動關于異步迭代實現這塊可能后期會改為基于生成器函數的實現,這對我們使用是沒變化的.

使用 for await...of 遍歷可迭代對象 cursor

還是基于我們上面的示例,如果換成 for await...of 語句遍歷就簡單的多了。

const myCursor = await bookColl.find(); for await (val of myCursor) {   console.log(val.name); }

在 MongoDB 中的聚合管道中使用也是如此,就不再做過多分析了,如下所示:

const myCursor = await bookColl.aggregate(); for await (val of myCursor) {   console.log(val.name); }

對于遍歷龐大的數據集時,使用游標它會批量加載 MongoDB 中的數據,我們也不必擔心一次將所有的數據存在于服務器的內存中,造成內存壓力過大。

傳送 cursor 到可寫流

MongoDB 游標對象本身也是一個可迭代對象(Iterable),結合流模塊的 Readable.from()  則可轉化為可讀流對象,是可以通過流的方式進行寫入文件。

但是要注意 MongoDB 中的游標每次返回的是單條文檔記錄,是一個 Object  類型的,如果直接寫入,可寫流是會報參數類型錯誤的,因為可寫流默認是一個非對象模式(僅接受 String、Buffer、Unit8Array),所以才會看到在  pipeline 傳輸的中間又使用了生成器函數,將每次接收的數據塊處理為可寫流 Buffer 類型。

const myCursor = await bookColl.find(); const readable = stream.Readable.from(myCursor); await pipeline(   readable,   async function* (source) {     for await (const chunk of source) {       yield Buffer.from(JSON.stringify(chunk));     }   },   fs.createWriteStream('books.txt') );

上述就是小編為大家分享的Node.js 中如何使用異步迭代器了,如果剛好有類似的疑惑,不妨參照上述分析進行理解。如果想知道更多相關知識,歡迎關注億速云行業資訊頻道。

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

亚洲午夜精品一区二区_中文无码日韩欧免_久久香蕉精品视频_欧美主播一区二区三区美女