溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

?Flink SQL怎么用

發布時間:2021-12-31 10:22:32 來源:億速云 閱讀:249 作者:小新 欄目:大數據

這篇文章將為大家詳細講解有關Flink SQL怎么用,小編覺得挺實用的,因此分享給大家做個參考,希望大家閱讀完這篇文章后可以有所收獲。

Flink SQL 是 Flink 的核心模塊之一。作為一個分布式的 SQL 查詢引擎。Flink SQL 提供了各種異構數據源的聯合查詢。開發者可以很方便地在一個程序中通過 SQL 編寫復雜的分析查詢。通過 CBO 優化器、列式存儲、和代碼生成技術,Flink SQL 擁有非常高的查詢效率。同時借助于 Flink runtime 良好的容錯和擴展性,Flink SQL 可以輕松處理海量數據。

在保證優秀性能的同時,易用性是 1.11 版本 Flink SQL 的重頭戲。易用性的提升主要體現在以下幾個方面:


Create Table Like

在生產中,用戶常常有調整現有表定義的需求。例如用戶想在一些外部的表定義(例如 Hive metastore)基礎上追加 Flink 特有的一些定義比如 watermark。在 ETL 場景中,將多張表的數據合并到一張表,目標表的 schema 定義其實是上游表的合集,需要一種方便合并表定義的方式。
從 1.11 版本開始,Flink 提供了 LIKE 語法,用戶可以很方便的在已有的表定義上追加新的定義。

例如我們可以使用下面的語法給已有表 base_table 追加 watermark 定義:

  
    
  
  
  
CREATE [TEMPORARY] TABLE base_table (    id BIGINT,    name STRING,    tstmp TIMESTAMP,    PRIMARY KEY(id)) WITH (    'connector': 'kafka') CREATE [TEMPORARY] TABLE derived_table (    WATERMARK FOR tstmp AS tsmp - INTERVAL '5' SECOND)LIKE base_table;
           

           
這里 derived_table 表定義等價于如下定義:

  
    
  
  
  
CREATE [TEMPORARY] TABLE derived_table (    id BIGINT,    name STRING,    tstmp TIMESTAMP,    PRIMARY KEY(id),    WATERMARK FOR tstmp AS tsmp - INTERVAL '5' SECOND) WITH (    ‘connector’: ‘kafka’)
           
           
           
對比之下,新的語法省去了重復的 schema 定義,用戶只需要定義追加屬性,非常方便簡潔。

多屬性策略
有的小伙伴會問,原表和新表的屬性只是新增或追加嗎?如果我想覆蓋或者排除某些屬性該如何操作?這是一個好問題,Flink LIKE 語法提供了非常靈活的表屬性操作策略。

LIKE 語法支持使用不同的 keyword 對表屬性分類:

  • ALL:完整的表定義
  • CONSTRAINTS: primary keys, unique key 等約束
  • GENERATED: 主要指計算列和 watermark
  • OPTIONS: WITH (...) 語句內定義的 table options
  • PARTITIONS: 表分區信息

在不同的屬性分類上可以追加不同的屬性行為:

  • INCLUDING:包含(默認行為)
  • EXCLUDING:排除
  • OVERWRITING:覆蓋

下面這張表格說明了不同的分類屬性允許的行為:


INCLUDING    
EXCLUDING    
OVERWRITING    
ALL    
??    
??    
?    
CONSTRAINTS    
??    
??    
?    
PARTITIONS    
??    
??    
?    
GENERATED    
??    
??    
??    
OPTIONS    
??    
??    
??    

例如下面的語句:

  
    
  
  
  CREATE [TEMPORARY] TABLE base_table (    id BIGINT,    name STRING,    tstmp TIMESTAMP,    PRIMARY KEY(id)) WITH (    'connector': 'kafka',    'scan.startup.specific-offsets': 'partition:0,offset:42;partition:1,offset:300',    'format': 'json') CREATE [TEMPORARY] TABLE derived_table (    WATERMARK FOR tstmp AS tsmp - INTERVAL '5' SECOND)WITH (    'connector.starting-offset': '0')LIKE base_table (OVERWRITING OPTIONS, EXCLUDING CONSTRAINTS);

等價的表屬性定義為:

  
    
  
  
  
CREATE [TEMPORARY] TABLE derived_table (    id BIGINT,    name STRING,    tstmp TIMESTAMP,    WATERMARK FOR tstmp AS tsmp - INTERVAL '5' SECOND) WITH (    'connector': 'kafka',    'scan.startup.specific-offsets': 'partition:0,offset:42;partition:1,offset:300',    'format': 'json')
           
           
           
細節參見:  https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql/create.html#create-table

Dynamic Table Options

在生產中,調整參數是一個常見需求,很多的時候是臨時修改(比如通過終端查詢和展示),比如下面這張 Kafka 表:


 
 
 
 
   

 
 
 
create table kafka_table (  id bigint,  age int,  name STRING) WITH (  'connector' = 'kafka',  'topic' = 'employees',  'scan.startup.mode' = 'timestamp',  'scan.startup.timestamp-millis' = '123456',  'format' = 'csv',  'csv.ignore-parse-errors' = 'false')

在之前的版本,如果用戶有如下需求:

  • 用戶需要指定特性的消費時間戳,即修改 scan.startup.timestamp-millis 屬性
  • 用戶想忽略掉解析錯誤,需要將 format.ignore-parse-errors 改為 true
只能使用 ALTER TABLE 這樣的語句修改表的定義,從 1.11 開始,用戶可以通過動態參數的形式靈活地設置表的屬性參數,覆蓋或者追加原表的 WITH (...) 語句內定義的 table options。

基本語法為:

  
    
  
  
  
table_name /*+ OPTIONS('k1'='v1', 'aa.bb.cc'='v2') */
           
           
           
OPTIONS 內的鍵值對會覆蓋原表的 table options,用戶可以在各種 SQL 語境中使用這樣的語法,例如:

  
    
  
  
  
CREATE TABLE kafka_table1 (id BIGINT, name STRING, age INT) WITH (...);CREATE TABLE kafka_table2 (id BIGINT, name STRING, age INT) WITH (...);
-- override table options in query sourceselect id, name from kafka_table1 /*+ OPTIONS('scan.startup.mode'='earliest-offset') */;
-- override table options in joinselect * from    kafka_table1 /*+ OPTIONS('scan.startup.mode'='earliest-offset') */ t1    join    kafka_table2 /*+ OPTIONS('scan.startup.mode'='earliest-offset') */ t2    on t1.id = t2.id;
-- override table options for INSERT target tableinsert into kafka_table1 /*+ OPTIONS('sink.partitioner'='round-robin') */ select * from kafka_table2;
           

           
動態參數的使用沒有語境限制,只要是引用表的地方都可以追加定義。在指定的表后面追加的動態參數會自動追加到原表定義中,是不是很方便呢 :)

由于可能對查詢結果有影響,動態參數功能默認是關閉的, 使用下面的方式開啟該功能:

  
    
  
  
  
// instantiate table environmentTableEnvironment tEnv = ...// access flink configurationConfiguration configuration = tEnv.getConfig().getConfiguration();// set low-level key-value optionsconfiguration.setString("table.dynamic-table-options.enabled", "true");
           

           
細節參見:  https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql/hints.html

SQL API 改進

隨著 Flink SQL 支持的語句越來越豐富,老的 API 容易引起一些困惑:

  • 原先的 sqlUpdate() 方法傳遞 DDL 語句會立即執行,而 INSERT INTO 語句在調用 execute 方法時才會執行
  • Table 程序的執行入口不夠清晰,像 TableEnvironment.execute() 和 StreamExecutionEnvironment.execute() 都可以觸發 table 程序執行
  • execute 方法沒有返回值。像 SHOW TABLES 這樣的語句沒有很好地方式返回結果。另外,sqlUpdate 方法加入了越來越多的語句導致接口定義不清晰,sqlUpdate 可以執行 SHOW TABLES 就是一個反例
  • 在 Blink planner 一直提供多 sink 優化執行的能力,但是在 API 層沒有體現出來

1.11 重新梳理了 TableEnv 上的 sql 相關接口,提供了更清晰的執行語義,同時執行任意 sql 語句現在都有返回值,用戶可以通過新的 API 靈活的組織多行 sql 語句一起執行。

更清晰的執行語義
新的接口 TableEnvironment#executeSql 統一返回抽象 TableResult,用戶可以迭代 TableResult 拿到執行結果。根據執行語句的不同,返回結果的數據結構也有變化,比如 SELECT 語句會返回查詢結果,而 INSERT 語句會異步提交作業到集群。

組織多條語句一起執行
新的接口 TableEnvironment#createStatementSet 允許用戶添加多條 INSERT 語句并一起執行,在多 sink 場景,Blink planner 會針對性地對執行計劃做優化。

新舊 API 對比
一張表格感受新老 API 的變化:

sqlUpdate vs executeSql  

Current Interface

New Interface

tEnv.sqlUpdate("CREATE TABLE ...");

TableResult result = tEnv.executeSql("CREATE TABLE ...");

tEnv.sqlUpdate("INSERT INTO ... SELECT ...");

tEnv.execute("test");

TableResult result = tEnv.executeSql("INSERT INTO ... SELECT ...");

execute vs createStatementSet

Current Interface    
New Interface    
tEnv.sqlUpdate("insert into xx ...")
tEnv.sqlUpdate("insert into yy ...")
tEnv.execute("test")
StatementSet ss = tEnv.createStatementSet();
ss.addInsertSql("insert into xx ...");
ss.addInsertSql("insert into yy ...");
TableResult result = ss.execute();

   
tEnv.insertInto("sink1", table1)
tEnv.insertInto("sink2", table2)
tEnv.execute("test")
StatementSet ss = tEnv.createStatementSet();
ss.addInsert("sink1", table1);
ss.addInsert("sink2", table2);
TableResult result = ss.execute()
詳情參見:  https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=134745878

Hive 語法兼容加強

從 1.11 開始,Flink  SQL 將 Hive parser 模塊獨立出來,用以兼容 Hive 的語法,目前 DDL 層面,DB、Table、View、Function 相關的語法均已支持。搭配 HiveCatalog,Hive 的同學可以直接使用 Hive 的語法來進行相關的操作。

在使用 hive 語句之前需要設置正確的 Dialect:

  
    
  
  
  
EnvironmentSettings settings = EnvironmentSettings.newInstance()...build();TableEnvironment tableEnv = TableEnvironment.create(settings);// to use hive dialecttableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);// use the hive catalogtableEnv.registerCatalog(hiveCatalog.getName(), hiveCatalog);tableEnv.useCatalog(hiveCatalog.getName());
           

           
之后我們便可以使用 Hive 的語法來執行一些 DDL,例如最常見的建表操作:

  
    
  
  
  
create external table tbl1 (  d decimal(10,0),  ts timestamp)partitioned by (p string)location '%s'tblproperties('k1'='v1');  create table tbl2 (s struct<ts:timestamp,bin:binary>) stored as orc;
create table tbl3 (  m map<timestamp,binary>)partitioned by (p1 bigint, p2 tinyint)row format serde 'org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe';
create table tbl4 (  x int,  y smallint)row format delimited fields terminated by '|' lines terminated by '\n';
           

           
對于 DQL 的 Hive 語法兼容已經在規劃中,1.12 版本會兼容更多 query 語法 ~

詳情參見:  https://ci.apache.org/projects/flink/flink-docs-master/dev/table/hive/hive_dialect.html

更簡潔的 connector 屬性

1.11 重新規范了 connector 的屬性定義,新的屬性 key 更加直觀簡潔,和原有的屬性 key 相比主要做了如下改動:

  • 使用 connector 作為 connector 的類型 key,connector 版本信息直接放到 value 中,比如 0.11 的 kafka 為 kafka-0.11 
  • 去掉了其余屬性中多余的 connector 前綴
  • 使用 scan 和 sink 前綴標記 source 和 sink 專有屬性
  • format.type 精簡為 format ,同時 format 自身屬性使用 format 的值作為前綴,比如 csv format 的自身屬性使用 csv 統一作前綴

例如,1.11 Kafka 表的定義如下:

  
    
  
  
  CREATE TABLE kafkaTable ( user_id BIGINT, item_id BIGINT, category_id BIGINT, behavior STRING, ts TIMESTAMP(3)) WITH ( 'connector' = 'kafka', 'topic' = 'user_behavior', 'properties.bootstrap.servers' = 'localhost:9092', 'properties.group.id' = 'testGroup', 'format' = 'csv', 'scan.startup.mode' = 'earliest-offset')

詳情參見:  https://cwiki.apache.org/confluence/display/FLINK/FLIP-122%3A+New+Connector+Property+Keys+for+New+Factory

JDBC catalog

在之前的版本中,用戶只能通過顯示建表的方式創建關系型數據庫的鏡像表。用戶需要手動追蹤 Flink SQL 的表 schema 和數據庫的 schema 變更。在 1.11,Flink SQL 提供了一個 JDBC catalog 接口對接各種外部的數據庫系統,例如 Postgres、MySQL、MariaDB、AWS Aurora、etc。

當前 Flink 內置了 Postgres 的 catalog 實現,使用下面的代碼配置 JDBC catalog:

CREATE CATALOG mypg WITH(    'type' = 'jdbc',    'default-database' = '...',    'username' = '...',    'password' = '...',    'base-url' = '...');
USE CATALOG mypg;

用戶也可以實現 JDBCCatalog 接口定制其他數據庫的 catalog ~


Python UDF 增強

1.11 版本的 py-flink 在 python UDF 方面提供了很多增強,包括 DDL 的定義方式、支持了標量的向量化 python UDF,支持全套的 python UDF metrics 定義,以及在 SQL-CLI 中定義 python UDF。

DDL 定義 python UDF
1.10.0 版本引入了對 python UDF 的支持。但是僅僅支持 python table api 的方式。1.11 提供了 SQL DDL 的方式定義 python UDF, 用戶可以在 Java/Scala table API 以及 SQL-CLI 場景下使用。

例如,現在用戶可以使用如下方式定義 Java table API 程序使用 python UDF:

  
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();BatchTableEnvironment tEnv = BatchTableEnvironment.create(env);
tEnv.getConfig().getConfiguration().setString("python.files", "/home/my/test1.py");tEnv.getConfig().getConfiguration().setString("python.client.executable", "python3");tEnv.sqlUpdate("create temporary system function func1 as 'test1.func1' language python");Table table = tEnv.fromDataSet(env.fromElements("1", "2", "3")).as("str").select("func1(str)");
tEnv.toDataSet(table, String.class).collect();
   
向量化支持
向量化 Python  UDF 相較于普通函數大大提升了性能。用戶可以使用流行的 python 庫例如 Pandas、Numpy 來實現向量化的 python UDF。用戶只需在裝飾器 udf 中添加額外的參數 udf_type="pandas" 即可。
例如,下面的樣例展示了如何定義向量化的 Python 標量函數以及在 python table api 中的應用:

  
@udf(input_types=[DataTypes.BIGINT(), DataTypes.BIGINT()], result_type=DataTypes.BIGINT(), udf_type="pandas")def add(i, j):  return i + j
table_env = BatchTableEnvironment.create(env)
# register the vectorized Python scalar functiontable_env.register_function("add", add)
# use the vectorized Python scalar function in Python Table APImy_table.select("add(bigint, bigint)")
# use the vectorized Python scalar function in SQL APItable_env.sql_query("SELECT add(bigint, bigint) FROM MyTable")

關于“Flink SQL怎么用”這篇文章就分享到這里了,希望以上內容可以對大家有一定的幫助,使各位可以學到更多知識,如果覺得文章不錯,請把它分享出去讓更多的人看到。

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

sql
AI

亚洲午夜精品一区二区_中文无码日韩欧免_久久香蕉精品视频_欧美主播一区二区三区美女