本篇內容介紹了“什么是Apache Beam”的有關知識,在實際案例的操作過程中,不少人都會遇到這樣的困境,接下來就讓小編帶領大家學習一下如何處理這些情況吧!希望大家仔細閱讀,能夠學有所成!
1. 概述
在本教程中,我們將介紹 Apache Beam 并探討其基本概念。我們將首先演示使用 Apache Beam 的用例和好處,然后介紹基本概念和術語。之后,我們將通過一個簡單的例子來說明 Apache Beam 的所有重要方面。
2. Apache Beam是個啥?
Apache Beam(Batch+strEAM)是一個用于批處理和流式數據處理作業的統一編程模型。它提供了一個軟件開發工具包,用于定義和構建數據處理管道以及執行這些管道的運行程序。
Apache Beam旨在提供一個可移植的編程層。事實上,Beam管道運行程序將數據處理管道轉換為與用戶選擇的后端兼容的API。目前,支持這些分布式處理后端有:
Apache Apex
Apache Flink
Apache Gearpump (incubating)
Apache Samza
Apache Spark
Google Cloud Dataflow
Hazelcast Jet
3. 為啥選擇 Apache Beam
Apache Beam 將批處理和流式數據處理融合在一起,而其他組件通常通過單獨的 API 來實現這一點 。因此,很容易將流式處理更改為批處理,反之亦然,例如,隨著需求的變化。
Apache Beam 提高了可移植性和靈活性。我們關注的是邏輯,而不是底層的細節。此外,我們可以隨時更改數據處理后端。
Apache Beam 可以使用 Java、Python、Go和 Scala等SDK。事實上,團隊中的每個人都可以使用他們選擇的語言。
4. 基本概念
使用 Apache Beam,我們可以構建工作流圖(管道)并執行它們。編程模型中的關鍵概念是:
PCollection–表示可以是固定批處理或數據流的數據集
PTransform–一種數據處理操作,它接受一個或多個 PCollections 并輸出零個或多個 PCollections。
Pipeline–表示 PCollection 和 PTransform 的有向無環圖,因此封裝了整個數據處理作業。
PipelineRunner–在指定的分布式處理后端上執行管道。
簡單地說,PipelineRunner 執行一個管道,管道由 PCollection 和 PTransform 組成。
5. 字數統計示例
現在我們已經學習了 Apache Beam 的基本概念,讓我們設計并測試一個單詞計數任務。
5.1 建造梁式管道
設計工作流圖是每個 Apache Beam 作業的第一步,單詞計數任務的步驟定義如下:
1.從原文中讀課文。
2.把課文分成單詞表。
3.所有單詞都小寫。
4.刪去標點符號。
5.過濾停止語。
6.統計唯一單詞數量。
為了實現這一點,我們需要使用 PCollection 和 PTransform 抽象將上述步驟轉換為 管道 。
5.2. 依賴
在實現工作流圖之前,先添加 Apache Beam的依賴項 到我們的項目:
<dependency> <groupId>org.apache.beam</groupId> <artifactId>beam-sdks-java-core</artifactId> <version>${beam.version}</version> </dependency>
Beam管道運行程序依賴于分布式處理后端來執行任務。我們添加 DirectRunner 作為運行時依賴項:
<dependency> <groupId>org.apache.beam</groupId> <artifactId>beam-runners-direct-java</artifactId> <version>${beam.version}</version> <scope>runtime</scope> </dependency>
與其他管道運行程序不同,DirectRunner 不需要任何額外的設置,這對初學者來說是個不錯的選擇。
5.3. 實現
Apache Beam 使用 Map-Reduce 編程范式 ( 類似 Java Stream)。講下面內容之前,最好 對 reduce(), filter(), count(), map(), 和 flatMap() 有個基礎概念和認識。
首先要做的事情就是 創建管道:
PipelineOptions options = PipelineOptionsFactory.create(); Pipeline p = Pipeline.create(options);
六步單詞計數任務:
PCollection<KV<String, Long>> wordCount = p .apply("(1) Read all lines", TextIO.read().from(inputFilePath)) .apply("(2) Flatmap to a list of words", FlatMapElements.into(TypeDescriptors.strings()) .via(line -> Arrays.asList(line.split("\\s")))) .apply("(3) Lowercase all", MapElements.into(TypeDescriptors.strings()) .via(word -> word.toLowerCase())) .apply("(4) Trim punctuations", MapElements.into(TypeDescriptors.strings()) .via(word -> trim(word))) .apply("(5) Filter stopwords", Filter.by(word -> !isStopWord(word))) .apply("(6) Count words", Count.perElement());
apply() 的第一個(可選)參數是一個String,它只是為了提高代碼的可讀性。下面是上述代碼中每個 apply() 的作用:
首先,我們使用 TextIO 逐行讀取輸入文本文件。
將每一行按空格分開,把它映射到一個單詞表上。
單詞計數不區分大小寫,所以我們將所有單詞都小寫。
之前,我們用空格分隔行,但是像“word!“和”word?"這樣的,就需要刪除標點符號。
像“is”和“by”這樣的停止詞在幾乎每一篇英語文章中都很常見,所以我們將它們刪除。
最后,我們使用內置函數 Count.perElement() 計算唯一單詞數量。
如前所述,管道是在分布式后端處理的。不可能在內存中的PCollection上迭代,因為它分布在多個后端。相反,我們將結果寫入外部數據庫或文件。
首先,我們將PCollection轉換為String。然后,使用TextIO編寫輸出:
wordCount.apply(MapElements.into(TypeDescriptors.strings()) .via(count -> count.getKey() + " --> " + count.getValue())) .apply(TextIO.write().to(outputFilePath));
現在管道 已經定義好了,接下來做個簡單的測試。
5.4. 運行測試
到目前為止,我們已為單詞計數任務定義了管道,現在運行管道:
p.run().waitUntilFinish();
在這行代碼中,Apache Beam 將把我們的任務發送到多個 DirectRunner 實例。因此,最后將生成幾個輸出文件。它們將包含以下內容:
... apache --> 3 beam --> 5 rocks --> 2 ...
在 Apache Beam 中定義和運行分布式作業是如此地簡單。為了進行比較,單詞計數實現在 Apache Spark, Apache Flink 和 Hazelcast-Jet 上也有
“什么是Apache Beam”的內容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業相關的知識可以關注億速云網站,小編將為大家輸出更多高質量的實用文章!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。