溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

Flink Batch SQL 1.10 實踐

發布時間:2020-08-09 20:19:18 來源:ITPUB博客 閱讀:241 作者:芊寶寶最可愛 欄目:大數據

Flink作為流批統一的計算框架,在1.10中完成了大量batch相關的增強與改進。1.10可以說是第一個成熟的生產可用的Flink Batch SQL版本,它一掃之前Dataset的羸弱,從功能和性能上都有大幅改進,以下我從架構、外部系統集成、實踐三個方面進行闡述。

架構

Stack


Flink Batch SQL 1.10 實踐


首先來看下stack,在新的Blink planner中,batch也是架設在Transformation上的,這就意味著我們和Dataset完全沒有關系了:

  1. 我們可以盡可能的和streaming復用組件,復用代碼,有同一套行為。
  2. 如果想要Table/SQL的toDataset或者fromDataset,那就完全沒戲了。盡可能的在Table的層面來處理吧。
  3. 后續我們正在考慮在DataStream上構建BoundedStream,給DataStream帶來批處理的功能。

網絡模型


Flink Batch SQL 1.10 實踐


Batch模式就是在中間結果落盤,這個模式和典型的Batch處理是一致的,比如MapReduce/Spark/Tez。

Flink以前的網絡模型也分為Batch和Pipeline兩種,但是Batch模式只是支持上下游隔斷執行,也就是說資源用量可以不用同時滿足上下游共同的并發。但是另外一個關鍵點是Failover沒有對接好,1.9和1.10在這方面進行了改進,支持了單點的Failover。

建議在Batch時打開:

jobmanager.execution.failover-strategy = region

為了避免重啟過于頻繁導致JobMaster太忙了,可以把重啟間隔提高:

restart-strategy.fixed-delay.delay = 30 s

Batch模式的好處有:

  • 容錯好,可以單點恢復
  • 調度好,不管多少資源都可以運行
  • 性能差,中間數據需要落盤,強烈建議開啟壓縮
    taskmanager.network.blocking-shuffle.compression.enabled = true

Batch模式比較穩,適合傳統Batch作業,大作業。


Flink Batch SQL 1.10 實踐


Pipeline模式是Flink的傳統模式,它完全和Streaming作業用的是同一套代碼,其實社區里Impala和Presto也是類似的模式,純走網絡,需要處理反壓,不落盤,它主要的優缺點是:

  • 容錯差,只能全局重來
  • 調度差,你得保證有足夠的資源
  • 性能好,Pipeline執行,完全復用Stream,復用流控反壓等功能。

有條件可以考慮開啟Pipeline模式。

調度模型

Flink on Yarn支持兩種模式,Session模式和Per job模式,現在已經在調度層次高度統一了。

  1. Session模式沒有最大進程限制,當有Job需要資源時,它就會去Yarn申請新資源,當Session有空閑資源時,它就會給Job復用,所以它的模型和PerJob是基本一樣的。
  2. 唯一的不同只是:Session模式可以跨作業復用進程。

另外,如果想要更好的復用進程,可以考慮加大TaskManager的超時釋放:
resourcemanager.taskmanager-timeout = 900000

資源模型

先說說并發:

  1. 對Source來說:目前Hive的table是根據InputSplit來定需要多少并發的,它之后能Chain起來的Operators自然都是和source相同的并發。
  2. 對下游網絡傳輸過后的Operators(Tasks)來說:除了一定需要單并發的Task來說,其它Task全部統一并發,由table.exec.resource.default-parallelism統一控制。

我們在Blink內部實現了基于統計信息來推斷并發的功能,但是其實以上的策略在大部分場景就夠用了。

Manage內存


Flink Batch SQL 1.10 實踐


目前一個TaskManager里面含有多個Slot,在Batch作業中,一個Slot里只能運行一個Task (關閉SlotShare)。

對內存來說,單個TM會把Manage內存切分成Slot粒度,如果1個TM中有n個Slot,也就是Task能拿到1/n的manage內存。

我們在1.10做了重大的一個改進就是:Task中chain起來的各個operators按照比例來瓜分內存,所以現在配置的算子內存都是一個比例值,實際拿到的還要根據Slot的內存來瓜分。

這樣做的一個重要好處是:

  1. 不管當前Slot有多少內存,作業能都run起來,這大大提高了開箱即用。
  2. 不管當前Slot有多少內存,Operators都會把內存瓜分干凈,不會存在浪費的可能。

當然,為了運行的效率,我們一般建議單個Slot的manage內存應該大于500MB。

另一個事情,在1.10后,我們去除了OnHeap的manage內存,所以只有off-heap的manage內存。

外部系統集成

Hive

強烈推薦Hive Catalog + Hive,這也是目前批處理最成熟的架構。在1.10中,除了對以前功能的完善以外,其它做了幾件事:

  1. 多版本支持,支持Hive 1.X 2.X 3.X
  2. 完善了分區的支持,包括分區讀,動態/靜態分區寫,分區統計信息的支持。
  3. 集成Hive內置函數,可以通過以下方式來load:
    a)TableEnvironment.loadModule("hiveModule",new HiveModule("hiveVersion"))
  4. 優化了ORC的性能讀,使用向量化的讀取方式,但是目前只支持Hive 2+版本,且要求列沒有復雜類型。有沒有進行過優化差距在5倍量級。

兼容Streaming Connectors

得益于流批統一的架構,目前的流Connectors也能在batch上使用,比如HBase的Lookup和Sink、JDBC的Lookup和Sink、Elasticsearch的Sink,都可以在Batch無縫對接使用起來。

實踐

SQL-CLI

在1.10中,SQL-CLI也做了大量的改動,比如把SQL-CLI做了stateful,里面也支持了DDL,還支持了大量的DDL命令,給SQL-CLI暴露了很多TableEnvironment的能力,這讓用戶可以方便得多。后續,我們也需要對接JDBC的客戶端,讓用戶可以更好的對接外部工具。但是SQL-CLI仍然待繼續改進,比如目前仍然只支持Session模式,不支持Per Job模式。

編程方式

TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings
  .newInstance()
  .useBlinkPlanner()
  .inBatchMode()
  .build());

老的BatchTableEnv因為綁定了Dataset,而且區分Java和Scala,是不干凈的設計方式,所以Blink planner只支持新的TableEnv。

TableEnv注冊的source, sink, connector, functions,都是temporary的,重啟之后即失效了。如果需要持久化的object,考慮使用HiveCatalog。

tEnv.registerCatalog(“hive”, hiveCatalog);
tEnv.useCatalog(“hive”);

可以通過tEnv.sqlQuery來執行DML,這樣可以獲得一個Table,我們也通過collect來獲得小量的數據:

Table table = tEnv.sqlQuery(“SELECT COUNT(*) FROM MyTable”);
List<Row> results = TableUtils.collectToList(table);
System.out.println(results);

可以通過tEnv.sqlUpdate來執行DDL,但是目前并不支持創建hive的table,只能創建Flink類型的table:

tEnv.sqlUpdate(
   "CREATE TABLE myResult (" +
      "  cnt BIGINT"
      ") WITH (" +
      "  'connector.type'='jdbc'," 
         ……
      ")");

可以通過tEnv.sqlUpdate來執行insert語句,Insert到臨時表或者Catalog表中,比如insert到上面創建的臨時JDBC表中:

tEnv.sqlUpdate(“INSERT INTO myResult SELECT COUNT(*) FROM MyTable”);
tEnv.execute(“MyJob”);

當結果表是Hive表時,可以使用Overwrite語法,也可以使用靜態Partition的語法,這需要打開Hive的方言:

tEnv.getConfig().setSqlDialect(SqlDialect.HIVE);

結語

目前Flink batch SQL仍然在高速發展中,但是1.10已經是一個可用的版本了,它在功能上、性能上都有很大的提升,后續還有很多有意思的features,等待著大家一起去挖掘。

原文鏈接

本文為阿里云原創內容,未經允許不得轉載。

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

亚洲午夜精品一区二区_中文无码日韩欧免_久久香蕉精品视频_欧美主播一区二区三区美女