這篇文章主要講解了“怎么使用Flink TableAPI和SQL /Elasticsearch”,文中的講解內容簡單清晰,易于學習與理解,下面請大家跟著小編的思路慢慢深入,一起來研究和學習“怎么使用Flink TableAPI和SQL /Elasticsearch”吧!
使用Tbale&SQL與Flink Elasticsearch Connector 連接器將數據寫入Elasticsearch引擎的索引
示例環境
java.version: 1.8.x flink.version: 1.11.1 elasticsearch:6.x
示例數據源 (項目碼云下載)
Flink 系例 之 搭建開發環境與數據
示例模塊 (pom.xml)
Flink 系例 之 TableAPI & SQL 與 示例模塊
InsertToEs.java
package com.flink.examples.elasticsearch;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.StatementSet;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
/**
* @Description 使用Tbale&SQL與Flink Elasticsearch連接器將數據寫入Elasticsearch引擎的索引
*/
public class InsertToEs {
/**
* Apache Flink 有兩種關系型 API 來做流批統一處理:Table API 和 SQL。
* 參考官方:https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/table/connectors/elasticsearch.html
*/
//參見屬性配置類:ElasticsearchValidator
static String table_sql = "CREATE TABLE my_users (\n" +
" user_id STRING,\n" +
" user_name STRING,\n" +
" uv BIGINT,\n" +
" pv BIGINT,\n" +
" PRIMARY KEY (user_id) NOT ENFORCED\n" +
") WITH (\n" +
" 'connector.type' = 'elasticsearch',\n" +
" 'connector.version' = '6',\n" +
" 'connector.property-version' = '1', \n" +
" 'connector.hosts' = 'http://192.168.110.35:9200',\n" +
" 'connector.index' = 'users',\n" +
" 'connector.document-type' = 'doc',\n" +
" 'format.type' = 'json',\n" +
" 'update-mode'='append' -- append|upsert\n" +
")";
public static void main(String[] args) {
//構建StreamExecutionEnvironment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//默認流時間方式
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
//構建EnvironmentSettings 并指定Blink Planner
EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
//構建StreamTableEnvironment
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, bsSettings);
//注冊kafka數據維表
tEnv.executeSql(table_sql);
//Elasticsearch connector 目前只支持了 sink,不支持 source 。不能SELECT elasticsearch table,因此只能通過insert的方式提交數據;
String sql = "insert into my_users (user_id,user_name,uv,pv) values('10003','tom',31,10)";
// TableResult tableResult = tEnv.executeSql(sql);
//第二種方式:聲明一個操作集合來執行sql
StatementSet stmtSet = tEnv.createStatementSet();
stmtSet.addInsertSql(sql);
TableResult tableResult = stmtSet.execute();
tableResult.print();
}
}打印結果
+-------------------------------------------+ | default_catalog.default_database.my_users | +-------------------------------------------+ | -1 | +-------------------------------------------+ 1 row in set
感謝各位的閱讀,以上就是“怎么使用Flink TableAPI和SQL /Elasticsearch”的內容了,經過本文的學習后,相信大家對怎么使用Flink TableAPI和SQL /Elasticsearch這一問題有了更深刻的體會,具體使用情況還需要大家實踐驗證。這里是億速云,小編將為大家推送更多相關知識點的文章,歡迎關注!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。