小編給大家分享一下Flink State管理的示例分析,希望大家閱讀完這篇文章之后都有所收獲,下面讓我們一起去探討吧!
去重
比如上游的系統數據可能會有重復,落到下游系統時希望把重復的數據都去掉。去重需要先了解哪些數據來過,哪些數據還沒有來,也就是把所有的主鍵都記錄下來,當一條數據到來后,能夠看到在主鍵當中是否存在
窗口計算
比如統計每分鐘 Nginx 日志 API 被訪問了多少次。窗口是一分鐘計算一次,在窗口觸發前,如 08:00 ~ 08:01 這個窗口,前59秒的數據來了需要先放入內存,即需要把這個窗口之內的數據先保留下來,等到 8:01 時一分鐘后,再將整個窗口內觸發的數據輸出。未觸發的窗口數據也是一種狀態。
機器學習/深度學習
如訓練的模型以及當前模型的參數也是一種狀態,機器學習可能每次都用有一個數據集,需要在數據集上進行學習,對模型進行一個反饋。
訪問歷史數據
比如與昨天的數據進行對比,需要訪問一些歷史數據。如果每次從外部去讀,對資源的消耗可能比較大,所以也希望把這些歷史數據也放入狀態中做對比。
易用
Flink 提供了豐富的數據結構、多樣的狀態組織形式以及簡潔的擴展接口,讓狀態管理更加易用
高效
實時作業一般需要更低的延遲,一旦出現故障,恢復速度也需要更快;當處理能力不夠時,可以橫向擴展,同時在處理備份時,不影響作業本身處理性能;
可靠
Flink 提供了狀態持久化,包括不丟不重的語義以及具備自動的容錯能力,比如 HA,當節點掛掉后會自動拉起,不需要人工介入。
從狀態管理方式的方式來說,Managed State 由 Flink Runtime 管理,自動存儲,自動恢復,在內存管理上有優化;而 Raw State 需要用戶自己管理,需要自己序列化,Flink 不知道 State中存入的數據是什么結構,只有用戶自己知道,需要最終序列化為可存儲的數據結構。
從狀態數據結構來說,Managed State 支持已知的數據結構,如 Value、List、Map 等。而 Raw State只支持字節數組,所有狀態都要轉換為二進制字節數組才可以。
從推薦使用場景來說,Managed State 大多數情況下均可使用,而 Raw State 是當 Managed State 不夠用時,比如需要自定義Operator時,推薦使用 Raw State。
Keyed State 只能用在 KeyedStream 的算子中,即在整個程序中沒有 keyBy 的過程就沒有辦法使用 KeyedStream。
Operator State 可以用于所有算子,常用于 Source.由于 Operator State 沒有 Key,并發改變時需要選擇狀態如何重新分配。其中內置了 2 種分配方式:一種是均勻分配,另外一種是將所有 State 合并為全量 State 再分發給每個實例
Keyed State 通過 RuntimeContext 訪問,這需要 Operator 是一個Rich Function。Operator State 需要自己實現 CheckpointedFunction 或 ListCheckpointed 接口。在數據結構上,Keyed State 支持的數據結構,比如 ValueState、ListState、ReducingState、AggregatingState 和 MapState;而 Operator State 支持的數據結構相對較少,如 ListState。
ValueState 存儲單個值,比如 Wordcount,用 Word 當 Key,State 就是它的 Count。這里面的單個值可能是數值或者字符串,作為單個值,訪問接口可能有兩種,get 和 set。在 State 上體現的是 update(T) / T value()。
MapState 的狀態數據類型是 Map,在 State 上有 put、remove等。需要注意的是在 MapState 中的 key 和 Keyed state 中的 key 不是同一個。
ListState 狀態數據類型是 List,訪問接口如 add、update 等
ReducingState 和 AggregatingState 與 ListState 都是同一個父類,但狀態數據類型上是單個值,原因在于其中的 add 方法不是把當前的元素追加到列表中,而是把當前元素直接更新進了 Reducing 的結果中。
AggregatingState 的區別是在訪問接口,ReducingState 中 add(T)和 T get() 進去和出來的元素都是同一個類型,但在 AggregatingState 輸入的 IN,輸出的是 OUT。
Flink 狀態保存主要依靠 Checkpoint 機制,Checkpoint 會定時制作分布式快照,對程序中的狀態進行備份。
MemoryStateBackend
Checkpoint 的存儲,第一種是內存存儲,即 MemoryStateBackend,構造方法是設置最大的StateSize,選擇是否做異步快照,這種存儲狀態本身存儲在 TaskManager 節點也就是執行節點內存中的,因為內存有容量限制,所以單個 State maxStateSize 默認 5 M,且需要注意 maxStateSize <= akka.framesize 默認 10 M。Checkpoint 存儲在 JobManager 內存中,因此總大小不超過 JobManager 的內存。推薦使用的場景為:本地測試、幾乎無狀態的作業,比如 ETL、JobManager 不容易掛,或掛掉影響不大的情況。不推薦在生產場景使用。
FsStateBackend
另一種就是在文件系統上的 FsStateBackend ,構建方法是需要傳一個文件路徑和是否異步快照。State 依然在 TaskManager 內存中,但不會像 MemoryStateBackend 有 5 M 的設置上限,Checkpoint 存儲在外部文件系統(本地或 HDFS),打破了總大小 Jobmanager 內存的限制。容量限制上,單 TaskManager 上 State 總量不超過它的內存,總大小不超過配置的文件系統容量。推薦使用的場景、常規使用狀態的作業、例如分鐘級窗口聚合或 join、需要開啟HA的作業。
RocksDBStateBackend
還有一種存儲為 RocksDBStateBackend ,RocksDB 是一個 key/value 的內存存儲系統,和其他的 key/value 一樣,先將狀態放到內存中,如果內存快滿時,則寫入到磁盤中,但需要注意 RocksDB 不支持同步的 Checkpoint,構造方法中沒有同步快照這個選項。不過 RocksDB 支持增量的 Checkpoint,也是目前唯一增量 Checkpoint 的 Backend,意味著并不需要把所有 sst 文件上傳到 Checkpoint 目錄,僅需要上傳新生成的 sst 文件即可。它的 Checkpoint 存儲在外部文件系統(本地或HDFS),其容量限制只要單個 TaskManager 上 State 總量不超過它的內存+磁盤,單 Key最大 2G,總大小不超過配置的文件系統容量即可。推薦使用的場景為:超大狀態的作業,例如天級窗口聚合、需要開啟 HA 的作業、最好是對狀態讀寫性能要求不高的作業。
看完了這篇文章,相信你對“Flink State管理的示例分析”有了一定的了解,如果想了解更多相關知識,歡迎關注億速云行業資訊頻道,感謝各位的閱讀!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。