溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

如何基于Flink+ClickHouse 構建實時數據分析平臺

發布時間:2021-12-27 17:31:11 來源:億速云 閱讀:271 作者:柒染 欄目:大數據

今天就跟大家聊聊有關如何基于Flink+ClickHouse 構建實時數據分析平臺,可能很多人都不太了解,為了讓大家更加了解,小編給大家總結了以下內容,希望大家根據這篇文章可以有所收獲。

下面 主要介紹Flink-to-Hive 小時級場景和 Flink-to-ClickHouse 秒級場景。

一、業務場景與現狀分析

   

查詢的頁面分為離線查詢頁面和實時查詢頁面。今年所實現的改造是在實時查詢中接入了 ClickHouse 計算引擎。根據不同的業務場景,實時數據報表中會展現數據指標曲線圖和詳細的數據指標表。目前數據指標的采集和計算為每五分鐘一個時間窗口,當然也存在三分鐘或一分鐘的特殊情況。數據指標數據全部從 Kafka 實時數據中導出,并導入 ClickHouse 進行計算。

如何基于Flink+ClickHouse 構建實時數據分析平臺

 

二、Flink-to-Hive 小時級場景

1.小時級實現架構圖


如下圖所示,Database 中的 Binlog 導出到 Kafka,同時 Log Server 數據也會上報到 Kafka。所有數據實時落地到 Kafka 之后,通過 Flink 抽取到 HDFS。下圖中 HDFS 到 Hive 之間為虛線,即 Flink 并非直接落地到 Hive,Flink 落地到 HDFS 后,再落地到 Hive 的時間可能是小時級、半小時級甚至分鐘級,需要知道數據的 Event time 已經到何時,再觸發 alter table,add partition,add location 等,寫入其分區。

這時需要有一個程序監控當前 Flink 任務的數據時間已經消費到什么時候,如9點的數據,落地時需要查看 Kafka 中消費的數據是否已經到達9點,然后在 Hive 中觸發分區寫入。

如何基于Flink+ClickHouse 構建實時數據分析平臺


   

2.實現原理

主要使用了 Flink 高階版本的一個特性——StreamingFileSink。StreamingFileSink 主要有幾點功能。
  • 第一,  forBulkFormat 支持 avro、parquet 格式,即列式存儲格式。

  • 第二,  withBucketAssigner 自定義按數據時間分桶,此處會定義一個EventtimeBucket,既按數據時間進行數據落地到離線中。

  • 第三,  OnCheckPointRollingPolicy,根據 CheckPoint 時間進行數據落地,在一定的 CheckPoint 時間內數據落地并回穩。按照 CheckPoint 落地還有其它策略,如按照數據大小。

  • 第四,  StreamingFileSink 是 Exactly-Once 語義實現。

 
Flink 中有兩個 Exactly-Once 語義實現,第一個是 Kafka,第二個是 StreamingFileSink。下圖為 OnCheckPointRollingPolicy 設計的每10分鐘落地一次到HDFS文件中的 demo。

如何基于Flink+ClickHouse 構建實時數據分析平臺

 
■ 如何實現 Exactly-Once

下圖左側為一個簡單的二 PC 模型。Coordinator 發送一個 prepare,執行者開始觸發 ack 動作,Coordinator 收到 ack 所有消息后,所有 ack 開始觸發 commit,所有執行者進行落地,將其轉化到 Flink 的模型中,Source 收到 checkpoint barrier 流時,開始觸發一個 snapshot。

每個算子的 CheckPoint、snapshot 都完成之后,CheckPoint 會給 Job Manager 發送 notifyCheckpointComplete。下圖中二階段模型和 Flink 模型左側三條線部分是一致的。因此用 Flink 可以實現二階段提交協議。

如何基于Flink+ClickHouse 構建實時數據分析平臺

 
■ 如何使用 Flink 實現二階段提交協議  
 
首先,StreamingFileSink 實現兩個接口,CheckpointedFunction 和CheckpointListener。  CheckpointedFunction 實現 initializeState 和 snapshotState 函數。  CheckpointListener 是 notifyCheckpointComplete 的方法實現,因此這兩個接口可以實現二階段提交語義。

  • initializeState 


initializeState 在任務啟動時會觸發三個動作。  第一個是 commitPendingFile。  實時數據落地到 Hdfs 上有三個狀態。  第一個狀態是 in-progress ,正在進行狀態。  第二個狀態是 pending 狀態,第三個狀態是 finished 狀態。

 initializeState 在任務啟動時還會觸發 restoreInProgressFile,算子實時寫入。  如果 CheckPoint 還未成功時程序出現問題,再次啟動時 initializeState 會 commit PendingFile,然后采用 Hadoop 2.7+ 版本的 truncate 方式重置或截斷 in-progress 文件。

  • invoke 


實時寫入數據。
  • snapshotState 


觸發 CheckPoint 時會將 in-progress 文件轉化為 pending state,同時記錄數據長度(truncate 方式需要截斷長度)。  snapshotState 并非真正將數據寫入 HDFS,而是寫入 ListState。  Flink 在 Barrier 對齊狀態時內部實現 Exactly-Once 語義,但是實現外部端到端的 Exactly-Once 語義比較困難。  Flink 內部實現 Exactly-Once 通過 ListState,將數據全部存入 ListState,等待所有算子 CheckPoint 完成,再將 ListState 中的數據刷到 HDFS 中。

  • notifyCheckpointComplete 


notifyCheckpointComplete 會觸發 pending 到 finished state 的數據寫入。  實現方法是 rename,Streaming 不斷向 HDFS 寫入臨時文件,所有動作結束后通過 rename 動作寫成正式文件。

如何基于Flink+ClickHouse 構建實時數據分析平臺


   

3.跨集群多 nameservices

 
趣頭條的實時集群和離線集群是獨立的,離線集群有多套,實時集群目前有一套。通過實時集群寫入離線集群,會產生 HDFS nameservices 問題。在實時集群中將所有離線集群的 nameservices 用 namenode HA 的方式全部打入實時集群并不合適。那么如何在任務中通過實時集群提交到各個離線集群?

如下圖所示,在 Flink 任務的 resource 下面,在 HDFS 的 xml 中間加入 <final>。在 PropertyHong Kong 中添加 nameservices,如 stream 是實時集群的 namenode HA 配置,data 是即將寫入的離線集群的 namenode HA 配置。那么兩個集群中間的 HDFS set 不需要相互修改,直接可以在客戶端實現。

如何基于Flink+ClickHouse 構建實時數據分析平臺


   

4.多用戶寫入權限

   
 
實時要寫入離線 HDFS,可能會涉及用戶權限問題。實時提交的用戶已經定義好該用戶在所有程序中都是同一個用戶,但離線中是多用戶的,因此會造成實時和離線用戶不對等。趣頭條在 API 中添加了 withBucketUser 寫 HDFS。配置好 nameservices后,接下來只需要知道該 HDFS 路徑通過哪個用戶來寫,比如配置一個 stream 用戶寫入。

API 層級的好處是一個 Flink 程序可以指定多個不同的 HDFS 和不同的用戶。多用戶寫入的實現是在 Hadoop file system 中加一個 ugi.do as ,代理用戶。以上為趣頭條使用 Flink 方式進行實時數據同步到 Hive 的一些工作。其中可能會出現小文件問題,小文件是后臺程序進行定期 merge,如果 CheckPoint 間隔時間較短,如3分鐘一次,會出現大量小文件問題。

如何基于Flink+ClickHouse 構建實時數據分析平臺

 

三、Flink-to-ClickHouse 秒級場景


 
   

1.秒級實現架構圖

   
趣頭條目前有很多實時指標,平均每五分鐘或三分鐘計算一次,如果每一個實時指標用一個 Flink 任務,或者一個 Flink SQL 來寫,比如消費一個 Kafka Topic,需要計算其日活、新增、流程等等當用戶提出一個新需求時,需要改當前的 Flink 任務或者啟動一個新的 Flink 任務消費 Topic。

因此會出現 Flink 任務不斷修改或者不斷起新的 Flink 任務的問題。趣頭條嘗試在 Flink 后接入 ClickHouse,實現整體的 OLAP。下圖為秒級實現架構圖。從 Kafka 到 Flink,到 Hive,到 ClickHouse 集群,對接外部 Horizon(實時報表),QE(實時 adhoc 查詢),千尋(數據分析),用戶畫像(實時圈人)。

如何基于Flink+ClickHouse 構建實時數據分析平臺

   
   

2.Why Flink+ClickHouse

 
  • 指標實現 sql 化描述:分析師提出的指標基本都以 SQL 進行描述。

  • 指標的上下線互不影響:一個 Flink 任務消費 Topic,如果還需要其它指標,可以保證指標的上下線互不影響。

  • 數據可回溯,方便異常排查:當日活下降,需要回溯排查是哪些指標口徑的邏輯問題,比如是報的數據差異或是數據流 Kafka 掉了,或者是因為用戶沒有上報某個指標導致日活下降,而 Flink 則無法進行回溯。

  • 計算快,一個周期內完成所有指標計算:需要在五分鐘內將成百上千的所有維度的指標全部計算完成。

  • 支持實時流,分布式部署,運維簡單:支持 Kafka 數據實時流。


目前趣頭條 Flink 集群有 100+ 臺 32 核 128 G 3.5T SSD,日數據量 2000+ 億,日查詢量 21w+ 次,80% 查詢在 1s 內完成。下圖為單表測試結果。ClickHouse 單表測試速度快。但受制于架構,ClickHouse 的 Join 較弱。

如何基于Flink+ClickHouse 構建實時數據分析平臺


下圖是處理相對較為復雜的 SQL,count+group by+order by,ClickHouse 在 3.6s內完成 26 億數據計算。

如何基于Flink+ClickHouse 構建實時數據分析平臺

 
   

3.Why ClickHouse so Fast


 
ClickHouse 采用列式存儲 +LZ4、ZSTD 數據壓縮。其次,計算存儲結合本地化+向量化執行。Presto 數據可能存儲在 Hadoop 集群或者 HDFS 中,實時拉取數據進行計算。而 ClickHouse 計算存儲本地化是指每一臺計算機器存在本地 SSD 盤,只需要計算自己的數據,再進行節點合并。  同時,LSM merge tree+Index。  將數據寫入    ClickHouse 之后,會在后臺開始一個線程將數據進行 merge,做 Index 索引。  如建常見的 DT 索引和小時級數據索引,以提高查詢性能。  第四,SIMD+LLVM 優化。  SIMD 是單指令多數據集。  第五,SQL 語法及 UDF 完善。  ClickHouse 對此有很大需求。  在數據分析或者維度下拽時需要更高的特性,如時間窗口的一部分功能點。
 
  • Merge Tree:如下圖所示。第一層為實時數據寫入。后臺進行每一層級數據的merge。merge 時會進行數據排序,做 Index 索引。

  • ClickHouse Connector:ClickHouse 有兩個概念,Local table 和Distributed table。一般是寫 Local table ,讀 Distributed table。ClickHouse 一般以 5~10w一個批次進行數據寫入,5s一個周期。趣頭條還實現了 RoundRobinClickHouseDataSource。

  • BalancedClickHouseDataSource MySQL 中配置一個 IP 和端口號就可以寫入數據,而 BalancedClickHouseDataSource 需要寫 Local 表,因此必須要知道該集群有多少個 Local 表,每一個 Local 表的 IP 和端口號。如有一百臺機器,需要將一百臺機器的 IP 和端口號全部配置好,再進行寫入。BalancedClickHouseDataSource 有兩個 schedule。scheduleActualization和 scheduleConnectionsCleaning 。配置一百臺機器的 IP 和端口號,會出現某些機器不連接或者服務不響應問題,scheduleActualization 會定期發現機器無法連接的問題,觸發下線或刪除 IP 等動作。scheduleConnectionsCleaning 會定期清理 ClickHouse 中無用的 http 請求。


如何基于Flink+ClickHouse 構建實時數據分析平臺


 
  • RoundRobinClickHouseDataSource:趣頭條對BalancedClickHouseDataSource 進行加強的結果,實現了三個語義。testOnBorrow 設置為 true,嘗試 ping 看能否獲取連接。用 ClickHouse 寫入時是一個 batch,再將 testOnReturn 設置為 false,testWhileIdel 設置為true,填入官方 scheduleActualization 和 scheduleConnectionsCleaning 的功能。ClickHouse 后臺不斷進行 merge,如果 insert 過快使后臺 merge 速度變慢,跟不上 insert,出現報錯。因此需要盡量不斷往下寫,等寫完當前機器,再寫下一個機器,以5s間隔進行寫入,使 merge 速度能夠盡量與 insert 速度保持一致。


如何基于Flink+ClickHouse 構建實時數據分析平臺


   

4.Backfill


Flink 導入 ClickHouse,在數據查詢或展示報表時,會遇到一些問題,比如 Flink 任務出現故障、報錯或數據反壓等,或 ClickHouse 集群出現不可響應,zk 跟不上,insert 過快或集群負載等問題,這會導致整個任務出現問題。

如果流數據量突然暴增,啟動 Flink 可能出現一段時間內不斷追數據的情況,需要進行調整并行度等操作幫助 Flink 追數據。但這時已經出現數據積壓,若還要加大 Flink 并發度處理數據,ClickHouse 限制 insert 不能過快,否則會導致惡性循環。因此當 Flink 故障或 ClickHouse 集群故障時,等待 ClickHouse 集群恢復后,Flink 任務從最新數據開始消費,不再追過去一段時間的數據,通過 Hive 將數據導入到 ClickHouse。

由于之前已經通過 Kafka 將數據實時落地到 Hive,通過 Hive 將數據寫入 ClickHouse 中。ClickHouse 有分區,只需要將上一個小時的數據刪除,導入 Hive 的一小時數據,就可以繼續進行數據查詢操作。Backfill 提供了 Flink 任務小時級容錯以及 ClickHouse 集群小時級容錯機制。

如何基于Flink+ClickHouse 構建實時數據分析平臺


未來發展與思考

1.Connector SQL 化

目前, Flink-to-Hive 以及 Flink-to-ClickHouse 都是趣頭條較為固化的場景,只需指定 HDFS 路徑以及用戶,其余過程都可以通過 SQL 化描述。
 
   

2.Delta lake

Flink 是流批一體計算引擎,但是沒有流批一體的存儲。趣頭條會用 HBase、Kudu、Redis 等能夠與 Flink 實時交互的 KV 存儲進行數據計算。如計算新增問題,目前趣頭條的方案是需要將 Hive 歷史用戶刷到 Redis 或 HBase 中,與 Flink 進行實時交互判斷用戶是否新增。
但因為 Hive 中的數據和 Redis 中的數據是存儲為兩份數據。其次 Binlog 抽取數據會涉及 delete 動作,Hbase,Kudu 支持數據修改,定期回到 Hive 中。帶來的問題是 HBase,Kudu 中存在數據,Hive 又保存了一份數據,多出一份或多份數據。如果有流批一體的存儲支持上述場景,當 Flink 任務過來,可以與離線數據進行實時交互,包括實時查詢 Hive 數據等,可以實時判斷用戶是否新增,對數據進行實時修改、更新或 delete,也能支持 Hive 的批的動作存儲。

看完上述內容,你們對如何基于Flink+ClickHouse 構建實時數據分析平臺有進一步的了解嗎?如果還想了解更多知識或者相關內容,請關注億速云行業資訊頻道,感謝大家的支持。

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

亚洲午夜精品一区二区_中文无码日韩欧免_久久香蕉精品视频_欧美主播一区二区三区美女