溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

如何在Apache Flink中使用RocksDB狀態后端

發布時間:2021-12-23 10:40:26 來源:億速云 閱讀:1474 作者:柒染 欄目:大數據

如何在Apache Flink中使用RocksDB狀態后端,針對這個問題,這篇文章詳細介紹了相對應的分析和解答,希望可以幫助更多想解決這個問題的小伙伴找到更簡單易行的方法。

流處理應用程序通常是有狀態的,“記住”已處理事件中的信息,并使用它來影響進一步的事件處理。在Flink中,記住的信息,即狀態,被本地存儲在配置的狀態后端中。為了防止發生故障時丟失數據,狀態后端會定期將其內容的快照持久化到預先配置的持久存儲中。RocksDB狀態后端(即RocksDBStateBackend)是Flink中三個內置狀態后端之一。這篇博文將引導您了解使用RocksDB管理應用程序狀態的好處,解釋何時以及如何使用它,并澄清一些常見的誤解。

Flink中的狀態

為了更好地理解Flink中的狀態和狀態后端,區分飛行狀態(in-flight state)狀態快照(state snapshots)是很重要的。飛行狀態,也稱為工作狀態,是Flink作業正在處理的狀態。它總是本地存儲在內存中(有可能溢出到磁盤),并且在作業失敗時可能會丟失,而不會影響作業的可恢復性。狀態快照,即檢查點和保存點,存儲在遠程持久存儲器中,用于在作業失敗時恢復本地狀態。生產部署的適當狀態后端取決于可伸縮性、吞吐量和延遲要求。

什么是RocksDB?

認為RocksDB是一個分布式數據庫,需要在集群上運行并由專門的管理員管理,這是一個常見的誤解。RocksDB是一個用于快速存儲的可嵌入持久鍵值存儲。它通過Java本機接口(JNI)與Flink交互。下圖顯示了RocksDB在Flink集群節點中的位置。以下各節將詳細說明。

如何在Apache Flink中使用RocksDB狀態后端

Flink中的RocksDB

將RocksDB用作狀態后端所需的一切都捆綁在Apache Flink發行版中,包括本機共享庫:

$ jar -tvf lib/flink-dist_2.12-1.12.0.jar| grep librocksdbjni-linux64
8695334 Wed Nov 27 02:27:06 CET 2019 librocksdbjni-linux64.so

在運行時,RocksDB嵌入到TaskManager進程中。它在本機線程中運行并處理本地文件。例如,如果你的Flink集群中有一個配置了RocksDBStateBendback的作業,您將看到類似于下面的內容,其中32513是TaskManager進程ID。

$ ps -T -p 32513 | grep -i rocksdb
32513 32633 ?        00:00:00 rocksdb:low0
32513 32634 ?        00:00:00 rocksdb:high0

注意:該命令僅適用于Linux。對于其他操作系統,請參閱其文檔

什么時候使用RocksDBStateBackend

除了RocksDBStateBackend,Flink還有另外兩個內置的狀態后端:MemoryStateBackend和FsStateBackend。它們都是基于堆的,因為運行中的狀態存儲在JVM堆中。目前,讓我們忽略MemoryStateBackend,因為它只用于本地開發調試,不用于生產。

使用RocksDBStateBackend,運行中的狀態首先寫入堆外/本機內存,然后在達到配置的閾值時刷新到本地磁盤。這意味著RocksDBStateBackend可以支持大于總配置堆容量的狀態??梢源鎯υ赗ocksDBStateBackend中的狀態量僅受整個集群中可用磁盤空間的限制。此外,由于RocksDBStateBackend不使用JVM堆來存儲運行中的狀態,因此它不受JVM垃圾收集的影響,因此具有可預測的延遲。

除了完整的、自包含的狀態快照之外,RocksDBStateBackend還支持作為性能調優選項的增量檢查點。增量檢查點僅存儲自上次完成的檢查點以來發生的更改。與執行完整快照相比,這大大減少了檢查點時間。RocksDBStateBendback是當前唯一支持增量檢查點的狀態后端。

在以下情況下,RocksDB是一個不錯的選擇:

  • 作業的狀態超出了本地內存的容量(例如,長時間的窗口、大的KeyedState);

  • 你正在研究增量檢查點作為一種減少檢查點時間的方法;

  • 希望有更可預測的延遲,而不受JVM垃圾回收的影響

否則,如果應用程序的狀態很小或需要很低的延遲,則應該考慮FsStateBackend。根據經驗,RocksDBStateBackend比基于堆的狀態后端慢幾倍,因為它將鍵值對存儲為序列化的字節。這意味著任何狀態訪問(讀/寫)都需要經過一個跨JNI邊界的反序列化/序列化過程,這比直接使用堆上的狀態表示更昂貴。好處是,對于相同數量的狀態,與相應的堆上表示法相比,它的內存占用率較低。

如何使用RocksDBStateBackend

RocksDB完全嵌入到TaskManager進程中,并完全由TaskManager進程管理。RocksDBStateBackend可以在集群級別配置為整個集群的默認值,也可以在作業級別配置為單個作業的默認值。作業級配置優先于集群級配置。

集群級別

conf/flink-conf.yaml中添加以下配置:

state.backend: rocksdb
state.backend.incremental: true
state.checkpoints.dir: hdfs:///flink-checkpoints # location to store checkpoints

作業級別

創建StreamExecutionEnvironment后,將以下內容添加到作業的代碼中:

# 'env' is the created StreamExecutionEnvironment
# 'true' is to enable incremental checkpointing
env.setStateBackend(new RocksDBStateBackend("hdfs:///fink-checkpoints", true));  

注意:除了HDFS之外,如果在FLINK_HOME/plugins下添加了相應的依賴項,那么還可以使用其他本地或基于云的對象存儲。

最佳實踐和高級配置

我們希望這個概述能幫助您更好地理解RocksDB在Flink中的作用,以及如何使用RocksDBStateBackend成功地運行作業。最后,我們將探討一些最佳實踐和一些參考點,以便進一步進行故障診斷和性能調優。

狀態在RocksDB中的位置

如前所述,RocksDBStateBackend 中的運行中狀態會溢出到磁盤上的文件。這些文件位于Flink配置指定的state.backend.rocksdb.localdir目錄下。因為磁盤性能直接影響RocksDB的性能,所以建議將此目錄放在本地磁盤上。不鼓勵將其配置到基于網絡的遠程位置,如NFS或HDFS,因為寫入遠程磁盤通常比較慢。高可用性也不是飛行狀態(in-flight state)的要求。如果需要高磁盤吞吐量,則首選本地SSD磁盤。

狀態快照將持久化到遠程持久存儲。在狀態快照期間,TaskManager會對飛行中的狀態(in-flight state)進行快照并遠程存儲。將狀態快照傳輸到遠程存儲完全由TaskManager本身處理,而不需要狀態后端的參與。所以,state.checkpoints.dir 目錄或者您在代碼中為特定作業設置的參數可以是不同的位置,如本地HDFS集群或基于云的對象存儲,如Amazon S3、Azure Blob Storage、Google cloud Storage、Alibaba OSS等。

RocksDB故障診斷

要檢查RocksDB在生產中的行為,應該查找名為LOG的RocksDB日志文件。默認情況下,此日志文件與數據文件位于同一目錄中,即Flink配置指定的目錄state.backend.rocksdb.localdir。啟用時,RocksDB統計信息也會記錄在那里,以幫助診斷潛在的問題。有關更多信息,請查看RocksDB Wiki中的Troubleshooting Guide。如果你對RocksDB行為趨勢感興趣,可以考慮為你的Flink作業啟用RocksDB本機指標。

注意:從Flink1.10開始,通過將日志級別設置為HEADER,RocksDB日志記錄被有效地禁用。要啟用它,請查看How to get RocksDB’s LOG file back for advanced troubleshooting。

警告:在Flink中啟用RocksDB的原生指標可能會對您的工作產生負面影響。

從Flink 1.10開始,Flink默認將RocksDB的內存分配配置為每個任務槽的托管內存(managed memory)量。改善內存相關性能問題的主要機制是通過Flink配置taskmanager.memory.managed.sizetaskmanager.memory.managed.fraction增加Flink的托管內存。對于更細粒度的控制,應該首先通過設置state.backend.rocksdb.memory.managed為false,然后從以下Flink配置開始:state.backend.rocksdb.block.cache-size(與RocksDB中的塊大小相對應),state.backend.rocksdb.writebuffer.size(與RocksDB中的write_buffer_size相對應),以及state.backend.rocksdb.writebuffer.count(對應于RocksDB中的最大寫入緩沖區數)。有關更多詳細信息,請查看這篇關于如何在Flink中管理RocksDB內存大小的文章和RocksDB內存使用Wiki頁面。

在RocksDB中寫入或覆蓋數據時,RocksDB線程在后臺管理從內存到本地磁盤的刷新和數據壓縮。在多核CPU的機器上,應該通過設置Flink配置state.backend.rocksdb.thread.num(對應于RocksDB中的max_background_jobs)來增加后臺刷新和壓縮的并行性。對于生產設置來說,默認配置通常太小。如果你的工作經常從RocksDB讀取內容,那么應該考慮啟用布隆過濾器。

對于其他RocksDBStateBackend配置,請查看Flink文檔Advanced RocksDB State Backends Options。有關進一步的調優,請查看RocksDB Wiki中的RocksDB Tuning Guide。

RocksDB狀態后端(即RocksDBStateBackend)是Flink中捆綁的三種狀態后端之一,在配置流應用程序時是一個很好的選擇。它使可擴展的應用程序能夠保持高達數TB的狀態,并保證exactly-once。如果Flink作業的狀態太大,無法放入JVM堆中,或者你對增量檢查點很感興趣,或者希望有可預測的延遲,那么應該使用RocksDBStateBackend。由于RocksDB作為本機線程嵌入到TaskManager進程中,并且可以處理本地磁盤上的文件,RocksDBStateBackend支持開箱即用,無需更多設置和管理任何外部系統或進程。

關于如何在Apache Flink中使用RocksDB狀態后端問題的解答就分享到這里了,希望以上內容可以對大家有一定的幫助,如果你還有很多疑惑沒有解開,可以關注億速云行業資訊頻道了解更多相關知識。

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

亚洲午夜精品一区二区_中文无码日韩欧免_久久香蕉精品视频_欧美主播一区二区三区美女